sync: implement --order-by xxx,mixed

This commit is contained in:
Nick Craig-Wood 2020-03-13 21:12:22 +00:00
parent 1e3d899db8
commit c227a90b52
4 changed files with 122 additions and 56 deletions

View File

@ -879,6 +879,17 @@ This can have a modifier appended with a comma:
- `ascending` or `asc` - order so that the smallest (or oldest) is processed first - `ascending` or `asc` - order so that the smallest (or oldest) is processed first
- `descending` or `desc` - order so that the largest (or newest) is processed first - `descending` or `desc` - order so that the largest (or newest) is processed first
- `mixed` - order so that the smallest is processed first for some threads and the largest for others
If the modifier is `mixed` then it can have an optional percentage
(which defaults to `50`), e.g. `size,mixed,25`, which means that 25% of
the threads should be taking the smallest items and 75% the
largest. Each thread keeps its role for the whole run: a thread which
takes the smallest first will always take the smallest first, and
likewise a thread which takes the largest first will always take the
largest first. The `mixed` mode can be useful to minimise the transfer
time when you are transferring a mixture of large and small files - the
large files are guaranteed upload threads and bandwidth and the small
files will be processed continuously.
If no modifier is supplied then the order is `ascending`. If no modifier is supplied then the order is `ascending`.

View File

@ -2,6 +2,7 @@ package sync
import ( import (
"context" "context"
"strconv"
"strings" "strings"
"sync" "sync"
@ -25,17 +26,19 @@ type pipe struct {
totalSize int64 totalSize int64
stats func(items int, totalSize int64) stats func(items int, totalSize int64)
less lessFn less lessFn
fraction int
} }
func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) { func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) {
less, err := newLess(orderBy) less, fraction, err := newLess(orderBy)
if err != nil { if err != nil {
return nil, fserrors.FatalError(err) return nil, fserrors.FatalError(err)
} }
p := &pipe{ p := &pipe{
c: make(chan struct{}, maxBacklog), c: make(chan struct{}, maxBacklog),
stats: stats, stats: stats,
less: less, less: less,
fraction: fraction,
} }
if p.less != nil { if p.less != nil {
deheap.Init(p) deheap.Init(p)
@ -105,9 +108,12 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
// Get a pair from the pipe // Get a pair from the pipe
// //
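// If fraction is >= the mixed fraction set in the pipe then it gets it
// from the other end of the heap if order-by is in effect. If the
// mixed fraction set in the pipe is negative (mixed mode not in use)
// then it always gets it from the normal end of the heap.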
//
// It returns ok = false if the context was cancelled or Close() has // It returns ok = false if the context was cancelled or Close() has
// been called. // been called.
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { func (p *pipe) GetMax(ctx context.Context, fraction int) (pair fs.ObjectPair, ok bool) {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }
@ -125,8 +131,10 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
pair = p.queue[0] pair = p.queue[0]
p.queue[0] = fs.ObjectPair{} // avoid memory leak p.queue[0] = fs.ObjectPair{} // avoid memory leak
p.queue = p.queue[1:] p.queue = p.queue[1:]
} else { } else if p.fraction < 0 || fraction < p.fraction {
pair = deheap.Pop(p).(fs.ObjectPair) pair = deheap.Pop(p).(fs.ObjectPair)
} else {
pair = deheap.PopMax(p).(fs.ObjectPair)
} }
size := pair.Src.Size() size := pair.Src.Size()
if size > 0 { if size > 0 {
@ -140,6 +148,14 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
return pair, true return pair, true
} }
// Get a pair from the pipe
//
// It calls GetMax with a fraction of -1, which is always less than
// any non-negative mixed fraction, so when order-by is in effect the
// pair is always taken with Pop and never with PopMax.
//
// It returns ok = false if the context was cancelled or Close() has
// been called.
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
return p.GetMax(ctx, -1)
}
// Stats reads the number of items in the queue and the totalSize // Stats reads the number of items in the queue and the totalSize
func (p *pipe) Stats() (items int, totalSize int64) { func (p *pipe) Stats() (items int, totalSize int64) {
p.mu.Lock() p.mu.Lock()
@ -160,14 +176,12 @@ func (p *pipe) Close() {
// newLess returns a less function for the heap comparison or nil if // newLess returns a less function for the heap comparison or nil if
// one is not required // one is not required
func newLess(orderBy string) (less lessFn, err error) { func newLess(orderBy string) (less lessFn, fraction int, err error) {
fraction = -1
if orderBy == "" { if orderBy == "" {
return nil, nil return nil, fraction, nil
} }
parts := strings.Split(strings.ToLower(orderBy), ",") parts := strings.Split(strings.ToLower(orderBy), ",")
if len(parts) > 2 {
return nil, errors.Errorf("bad --order-by string %q", orderBy)
}
switch parts[0] { switch parts[0] {
case "name": case "name":
less = func(a, b fs.ObjectPair) bool { less = func(a, b fs.ObjectPair) bool {
@ -183,7 +197,7 @@ func newLess(orderBy string) (less lessFn, err error) {
return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx)) return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx))
} }
default: default:
return nil, errors.Errorf("unknown --order-by comparison %q", parts[0]) return nil, fraction, errors.Errorf("unknown --order-by comparison %q", parts[0])
} }
descending := false descending := false
if len(parts) > 1 { if len(parts) > 1 {
@ -191,15 +205,27 @@ func newLess(orderBy string) (less lessFn, err error) {
case "ascending", "asc": case "ascending", "asc":
case "descending", "desc": case "descending", "desc":
descending = true descending = true
case "mixed":
fraction = 50
if len(parts) > 2 {
fraction, err = strconv.Atoi(parts[2])
if err != nil {
return nil, fraction, errors.Errorf("bad mixed fraction --order-by %q", parts[2])
}
}
default: default:
return nil, errors.Errorf("unknown --order-by sort direction %q", parts[1]) return nil, fraction, errors.Errorf("unknown --order-by sort direction %q", parts[1])
} }
} }
if (fraction >= 0 && len(parts) > 3) || (fraction < 0 && len(parts) > 2) {
return nil, fraction, errors.Errorf("bad --order-by string %q", orderBy)
}
if descending { if descending {
oldLess := less oldLess := less
less = func(a, b fs.ObjectPair) bool { less = func(a, b fs.ObjectPair) bool {
return !oldLess(a, b) return !oldLess(a, b)
} }
} }
return less, nil return less, fraction, nil
} }

View File

@ -143,32 +143,37 @@ func TestPipeOrderBy(t *testing.T) {
orderBy string orderBy string
swapped1 bool swapped1 bool
swapped2 bool swapped2 bool
fraction int
}{ }{
{"", false, true}, {"", false, true, -1},
{"size", false, false}, {"size", false, false, -1},
{"name", true, true}, {"name", true, true, -1},
{"modtime", false, true}, {"modtime", false, true, -1},
{"size,ascending", false, false}, {"size,ascending", false, false, -1},
{"name,asc", true, true}, {"name,asc", true, true, -1},
{"modtime,ascending", false, true}, {"modtime,ascending", false, true, -1},
{"size,descending", true, true}, {"size,descending", true, true, -1},
{"name,desc", false, false}, {"name,desc", false, false, -1},
{"modtime,descending", true, false}, {"modtime,descending", true, false, -1},
{"size,mixed,50", false, false, 25},
{"size,mixed,51", true, true, 75},
} { } {
t.Run(test.orderBy, func(t *testing.T) { t.Run(test.orderBy, func(t *testing.T) {
p, err := newPipe(test.orderBy, stats, 10) p, err := newPipe(test.orderBy, stats, 10)
require.NoError(t, err) require.NoError(t, err)
ok := p.Put(ctx, pair1)
assert.True(t, ok)
ok = p.Put(ctx, pair2)
assert.True(t, ok)
readAndCheck := func(swapped bool) { readAndCheck := func(swapped bool) {
readFirst, ok := p.Get(ctx) var readFirst, readSecond fs.ObjectPair
assert.True(t, ok) var ok1, ok2 bool
readSecond, ok := p.Get(ctx) if test.fraction < 0 {
assert.True(t, ok) readFirst, ok1 = p.Get(ctx)
readSecond, ok2 = p.Get(ctx)
} else {
readFirst, ok1 = p.GetMax(ctx, test.fraction)
readSecond, ok2 = p.GetMax(ctx, test.fraction)
}
assert.True(t, ok1)
assert.True(t, ok2)
if swapped { if swapped {
assert.True(t, readFirst == pair2 && readSecond == pair1) assert.True(t, readFirst == pair2 && readSecond == pair1)
@ -177,6 +182,11 @@ func TestPipeOrderBy(t *testing.T) {
} }
} }
ok := p.Put(ctx, pair1)
assert.True(t, ok)
ok = p.Put(ctx, pair2)
assert.True(t, ok)
readAndCheck(test.swapped1) readAndCheck(test.swapped1)
// insert other way round // insert other way round
@ -193,25 +203,37 @@ func TestPipeOrderBy(t *testing.T) {
func TestNewLess(t *testing.T) { func TestNewLess(t *testing.T) {
t.Run("blankOK", func(t *testing.T) { t.Run("blankOK", func(t *testing.T) {
less, err := newLess("") less, _, err := newLess("")
require.NoError(t, err) require.NoError(t, err)
assert.Nil(t, less) assert.Nil(t, less)
}) })
t.Run("tooManyParts", func(t *testing.T) { t.Run("tooManyParts", func(t *testing.T) {
_, err := newLess("too,many,parts") _, _, err := newLess("size,asc,toomanyparts")
require.Error(t, err) require.Error(t, err)
assert.Contains(t, err.Error(), "bad --order-by string") assert.Contains(t, err.Error(), "bad --order-by string")
}) })
t.Run("tooManyParts2", func(t *testing.T) {
_, _, err := newLess("size,mixed,50,toomanyparts")
require.Error(t, err)
assert.Contains(t, err.Error(), "bad --order-by string")
})
t.Run("badMixed", func(t *testing.T) {
_, _, err := newLess("size,mixed,32.7")
require.Error(t, err)
assert.Contains(t, err.Error(), "bad mixed fraction")
})
t.Run("unknownComparison", func(t *testing.T) { t.Run("unknownComparison", func(t *testing.T) {
_, err := newLess("potato") _, _, err := newLess("potato")
require.Error(t, err) require.Error(t, err)
assert.Contains(t, err.Error(), "unknown --order-by comparison") assert.Contains(t, err.Error(), "unknown --order-by comparison")
}) })
t.Run("unknownSortDirection", func(t *testing.T) { t.Run("unknownSortDirection", func(t *testing.T) {
_, err := newLess("name,sideways") _, _, err := newLess("name,sideways")
require.Error(t, err) require.Error(t, err)
assert.Contains(t, err.Error(), "unknown --order-by sort direction") assert.Contains(t, err.Error(), "unknown --order-by sort direction")
}) })
@ -227,19 +249,23 @@ func TestNewLess(t *testing.T) {
orderBy string orderBy string
pair1LessPair2 bool pair1LessPair2 bool
pair2LessPair1 bool pair2LessPair1 bool
wantFraction int
}{ }{
{"size", true, false}, {"size", true, false, -1},
{"name", false, true}, {"name", false, true, -1},
{"modtime", false, false}, {"modtime", false, false, -1},
{"size,ascending", true, false}, {"size,ascending", true, false, -1},
{"name,asc", false, true}, {"name,asc", false, true, -1},
{"modtime,ascending", false, false}, {"modtime,ascending", false, false, -1},
{"size,descending", false, true}, {"size,descending", false, true, -1},
{"name,desc", true, false}, {"name,desc", true, false, -1},
{"modtime,descending", true, true}, {"modtime,descending", true, true, -1},
{"modtime,mixed", false, false, 50},
{"modtime,mixed,30", false, false, 30},
} { } {
t.Run(test.orderBy, func(t *testing.T) { t.Run(test.orderBy, func(t *testing.T) {
less, err := newLess(test.orderBy) less, gotFraction, err := newLess(test.orderBy)
assert.Equal(t, test.wantFraction, gotFraction)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, less) require.NotNil(t, less)
pair1LessPair2 := less(pair1, pair2) pair1LessPair2 := less(pair1, pair2)

View File

@ -241,10 +241,10 @@ func (s *syncCopyMove) currentError() error {
// pairChecker reads object pairs from in and sends them to out if they need transferring. // pairChecker reads object pairs from in and sends them to out if they need transferring.
// //
// FIXME potentially doing lots of hashes at once // FIXME potentially doing lots of hashes at once
func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
pair, ok := in.Get(s.ctx) pair, ok := in.GetMax(s.ctx, fraction)
if !ok { if !ok {
return return
} }
@ -297,10 +297,10 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
// pairRenamer reads object pairs from in and attempts to rename them, // pairRenamer reads object pairs from in and attempts to rename them,
// otherwise it sends them out if they need transferring. // otherwise it sends them out if they need transferring.
func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) { func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
pair, ok := in.Get(s.ctx) pair, ok := in.GetMax(s.ctx, fraction)
if !ok { if !ok {
return return
} }
@ -316,11 +316,11 @@ func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) {
} }
// pairCopyOrMove reads Objects on in and moves or copies them. // pairCopyOrMove reads Objects on in and moves or copies them.
func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, wg *sync.WaitGroup) { func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, fraction int, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
var err error var err error
for { for {
pair, ok := in.Get(s.ctx) pair, ok := in.GetMax(s.ctx, fraction)
if !ok { if !ok {
return return
} }
@ -338,7 +338,8 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
func (s *syncCopyMove) startCheckers() { func (s *syncCopyMove) startCheckers() {
s.checkerWg.Add(fs.Config.Checkers) s.checkerWg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ { for i := 0; i < fs.Config.Checkers; i++ {
go s.pairChecker(s.toBeChecked, s.toBeUploaded, &s.checkerWg) fraction := (100 * i) / fs.Config.Checkers
go s.pairChecker(s.toBeChecked, s.toBeUploaded, fraction, &s.checkerWg)
} }
} }
@ -353,7 +354,8 @@ func (s *syncCopyMove) stopCheckers() {
func (s *syncCopyMove) startTransfers() { func (s *syncCopyMove) startTransfers() {
s.transfersWg.Add(fs.Config.Transfers) s.transfersWg.Add(fs.Config.Transfers)
for i := 0; i < fs.Config.Transfers; i++ { for i := 0; i < fs.Config.Transfers; i++ {
go s.pairCopyOrMove(s.ctx, s.toBeUploaded, s.fdst, &s.transfersWg) fraction := (100 * i) / fs.Config.Transfers
go s.pairCopyOrMove(s.ctx, s.toBeUploaded, s.fdst, fraction, &s.transfersWg)
} }
} }
@ -371,7 +373,8 @@ func (s *syncCopyMove) startRenamers() {
} }
s.renamerWg.Add(fs.Config.Checkers) s.renamerWg.Add(fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ { for i := 0; i < fs.Config.Checkers; i++ {
go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, &s.renamerWg) fraction := (100 * i) / fs.Config.Checkers
go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, fraction, &s.renamerWg)
} }
} }