diff --git a/docs/content/docs.md b/docs/content/docs.md index 4a1c341ab..52ba64a1d 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -879,6 +879,17 @@ This can have a modifier appended with a comma: - `ascending` or `asc` - order so that the smallest (or oldest) is processed first - `descending` or `desc` - order so that the largest (or newest) is processed first +- `mixed` - order so that the smallest is processed first for some threads and the largest for others + +If the modifier is `mixed` then it can have an optional percentage +(which defaults to `50`), eg `size,mixed,25` which means that 25% of +the threads should be taking the smallest items and 75% the +largest. The threads which take the smallest first will always take +the smallest first and likewise the largest first threads. The `mixed` +mode can be useful to minimise the transfer time when you are +transferring a mixture of large and small files - the large files are +guaranteed upload threads and bandwidth and the small files will be +processed continuously. If no modifier is supplied then the order is `ascending`. diff --git a/fs/sync/pipe.go b/fs/sync/pipe.go index 1e19e39c6..5e2b39b01 100644 --- a/fs/sync/pipe.go +++ b/fs/sync/pipe.go @@ -2,6 +2,7 @@ package sync import ( "context" + "strconv" "strings" "sync" @@ -25,17 +26,19 @@ type pipe struct { totalSize int64 stats func(items int, totalSize int64) less lessFn + fraction int } func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) { - less, err := newLess(orderBy) + less, fraction, err := newLess(orderBy) if err != nil { return nil, fserrors.FatalError(err) } p := &pipe{ - c: make(chan struct{}, maxBacklog), - stats: stats, - less: less, + c: make(chan struct{}, maxBacklog), + stats: stats, + less: less, + fraction: fraction, } if p.less != nil { deheap.Init(p) @@ -105,9 +108,12 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) { // Get a pair from the pipe // +// If fraction is > the mixed fraction set in the pipe then it gets it +// from the other end of the heap if order-by is in effect +// // It returns ok = false if the context was cancelled or Close() has // been called. -func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { +func (p *pipe) GetMax(ctx context.Context, fraction int) (pair fs.ObjectPair, ok bool) { if ctx.Err() != nil { return } @@ -125,8 +131,10 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { pair = p.queue[0] p.queue[0] = fs.ObjectPair{} // avoid memory leak p.queue = p.queue[1:] - } else { + } else if p.fraction < 0 || fraction < p.fraction { pair = deheap.Pop(p).(fs.ObjectPair) + } else { + pair = deheap.PopMax(p).(fs.ObjectPair) } size := pair.Src.Size() if size > 0 { @@ -140,6 +148,14 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { return pair, true } +// Get a pair from the pipe +// +// It returns ok = false if the context was cancelled or Close() has +// been called. +func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { + return p.GetMax(ctx, -1) +} + // Stats reads the number of items in the queue and the totalSize func (p *pipe) Stats() (items int, totalSize int64) { p.mu.Lock() @@ -160,14 +176,12 @@ func (p *pipe) Close() { // newLess returns a less function for the heap comparison or nil if // one is not required -func newLess(orderBy string) (less lessFn, err error) { +func newLess(orderBy string) (less lessFn, fraction int, err error) { + fraction = -1 if orderBy == "" { - return nil, nil + return nil, fraction, nil } parts := strings.Split(strings.ToLower(orderBy), ",") - if len(parts) > 2 { - return nil, errors.Errorf("bad --order-by string %q", orderBy) - } switch parts[0] { case "name": less = func(a, b fs.ObjectPair) bool { @@ -183,7 +197,7 @@ func newLess(orderBy string) (less lessFn, err error) { return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx)) } default: - return nil, errors.Errorf("unknown --order-by comparison %q", parts[0]) + return nil, fraction, errors.Errorf("unknown --order-by comparison %q", parts[0]) } descending := false if len(parts) > 1 { @@ -191,15 +205,27 @@ func newLess(orderBy string) (less lessFn, err error) { case "ascending", "asc": case "descending", "desc": descending = true + case "mixed": + fraction = 50 + if len(parts) > 2 { + fraction, err = strconv.Atoi(parts[2]) + if err != nil { + return nil, fraction, errors.Errorf("bad mixed fraction --order-by %q", parts[2]) + } + } + default: - return nil, errors.Errorf("unknown --order-by sort direction %q", parts[1]) + return nil, fraction, errors.Errorf("unknown --order-by sort direction %q", parts[1]) } } + if (fraction >= 0 && len(parts) > 3) || (fraction < 0 && len(parts) > 2) { + return nil, fraction, errors.Errorf("bad --order-by string %q", orderBy) + } if descending { oldLess := less less = func(a, b fs.ObjectPair) bool { return !oldLess(a, b) } } - return less, nil + return less, fraction, nil } diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go index 59506bcc1..84c9129e5 100644 --- a/fs/sync/pipe_test.go +++ b/fs/sync/pipe_test.go @@ -143,32 +143,37 @@ func TestPipeOrderBy(t *testing.T) { orderBy string swapped1 bool swapped2 bool + fraction int }{ - {"", false, true}, - {"size", false, false}, - {"name", true, true}, - {"modtime", false, true}, - {"size,ascending", false, false}, - {"name,asc", true, true}, - {"modtime,ascending", false, true}, - {"size,descending", true, true}, - {"name,desc", false, false}, - {"modtime,descending", true, false}, + {"", false, true, -1}, + {"size", false, false, -1}, + {"name", true, true, -1}, + {"modtime", false, true, -1}, + {"size,ascending", false, false, -1}, + {"name,asc", true, true, -1}, + {"modtime,ascending", false, true, -1}, + {"size,descending", true, true, -1}, + {"name,desc", false, false, -1}, + {"modtime,descending", true, false, -1}, + {"size,mixed,50", false, false, 25}, + {"size,mixed,51", true, true, 75}, } { t.Run(test.orderBy, func(t *testing.T) { p, err := newPipe(test.orderBy, stats, 10) require.NoError(t, err) - ok := p.Put(ctx, pair1) - assert.True(t, ok) - ok = p.Put(ctx, pair2) - assert.True(t, ok) - readAndCheck := func(swapped bool) { - readFirst, ok := p.Get(ctx) - assert.True(t, ok) - readSecond, ok := p.Get(ctx) - assert.True(t, ok) + var readFirst, readSecond fs.ObjectPair + var ok1, ok2 bool + if test.fraction < 0 { + readFirst, ok1 = p.Get(ctx) + readSecond, ok2 = p.Get(ctx) + } else { + readFirst, ok1 = p.GetMax(ctx, test.fraction) + readSecond, ok2 = p.GetMax(ctx, test.fraction) + } + assert.True(t, ok1) + assert.True(t, ok2) if swapped { assert.True(t, readFirst == pair2 && readSecond == pair1) @@ -177,6 +182,11 @@ func TestPipeOrderBy(t *testing.T) { } } + ok := p.Put(ctx, pair1) + assert.True(t, ok) + ok = p.Put(ctx, pair2) + assert.True(t, ok) + readAndCheck(test.swapped1) // insert other way round @@ -193,25 +203,37 @@ func TestPipeOrderBy(t *testing.T) { func TestNewLess(t *testing.T) { t.Run("blankOK", func(t *testing.T) { - less, err := newLess("") + less, _, err := newLess("") require.NoError(t, err) assert.Nil(t, less) }) t.Run("tooManyParts", func(t *testing.T) { - _, err := newLess("too,many,parts") + _, _, err := newLess("size,asc,toomanyparts") require.Error(t, err) assert.Contains(t, err.Error(), "bad --order-by string") }) + t.Run("tooManyParts2", func(t *testing.T) { + _, _, err := newLess("size,mixed,50,toomanyparts") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad --order-by string") + }) + + t.Run("badMixed", func(t *testing.T) { + _, _, err := newLess("size,mixed,32.7") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad mixed fraction") + }) + t.Run("unknownComparison", func(t *testing.T) { - _, err := newLess("potato") + _, _, err := newLess("potato") require.Error(t, err) assert.Contains(t, err.Error(), "unknown --order-by comparison") }) t.Run("unknownSortDirection", func(t *testing.T) { - _, err := newLess("name,sideways") + _, _, err := newLess("name,sideways") require.Error(t, err) assert.Contains(t, err.Error(), "unknown --order-by sort direction") }) @@ -227,19 +249,23 @@ func TestNewLess(t *testing.T) { orderBy string pair1LessPair2 bool pair2LessPair1 bool + wantFraction int }{ - {"size", true, false}, - {"name", false, true}, - {"modtime", false, false}, - {"size,ascending", true, false}, - {"name,asc", false, true}, - {"modtime,ascending", false, false}, - {"size,descending", false, true}, - {"name,desc", true, false}, - {"modtime,descending", true, true}, + {"size", true, false, -1}, + {"name", false, true, -1}, + {"modtime", false, false, -1}, + {"size,ascending", true, false, -1}, + {"name,asc", false, true, -1}, + {"modtime,ascending", false, false, -1}, + {"size,descending", false, true, -1}, + {"name,desc", true, false, -1}, + {"modtime,descending", true, true, -1}, + {"modtime,mixed", false, false, 50}, + {"modtime,mixed,30", false, false, 30}, } { t.Run(test.orderBy, func(t *testing.T) { - less, err := newLess(test.orderBy) + less, gotFraction, err := newLess(test.orderBy) + assert.Equal(t, test.wantFraction, gotFraction) require.NoError(t, err) require.NotNil(t, less) pair1LessPair2 := less(pair1, pair2) diff --git a/fs/sync/sync.go b/fs/sync/sync.go index c3887c9e6..8bab3bb97 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -241,10 +241,10 @@ func (s *syncCopyMove) currentError() error { // pairChecker reads Objects~s on in send to out if they need transferring. // // FIXME potentially doing lots of hashes at once -func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { +func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { defer wg.Done() for { - pair, ok := in.Get(s.ctx) + pair, ok := in.GetMax(s.ctx, fraction) if !ok { return } @@ -297,10 +297,10 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { // pairRenamer reads Objects~s on in and attempts to rename them, // otherwise it sends them out if they need transferring. -func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) { +func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { defer wg.Done() for { - pair, ok := in.Get(s.ctx) + pair, ok := in.GetMax(s.ctx, fraction) if !ok { return } @@ -316,11 +316,11 @@ func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) { } // pairCopyOrMove reads Objects on in and moves or copies them. -func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, wg *sync.WaitGroup) { +func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, fraction int, wg *sync.WaitGroup) { defer wg.Done() var err error for { - pair, ok := in.Get(s.ctx) + pair, ok := in.GetMax(s.ctx, fraction) if !ok { return } @@ -338,7 +338,8 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, func (s *syncCopyMove) startCheckers() { s.checkerWg.Add(fs.Config.Checkers) for i := 0; i < fs.Config.Checkers; i++ { - go s.pairChecker(s.toBeChecked, s.toBeUploaded, &s.checkerWg) + fraction := (100 * i) / fs.Config.Checkers + go s.pairChecker(s.toBeChecked, s.toBeUploaded, fraction, &s.checkerWg) } } @@ -353,7 +354,8 @@ func (s *syncCopyMove) stopCheckers() { func (s *syncCopyMove) startTransfers() { s.transfersWg.Add(fs.Config.Transfers) for i := 0; i < fs.Config.Transfers; i++ { - go s.pairCopyOrMove(s.ctx, s.toBeUploaded, s.fdst, &s.transfersWg) + fraction := (100 * i) / fs.Config.Transfers + go s.pairCopyOrMove(s.ctx, s.toBeUploaded, s.fdst, fraction, &s.transfersWg) } } @@ -371,7 +373,8 @@ func (s *syncCopyMove) startRenamers() { } s.renamerWg.Add(fs.Config.Checkers) for i := 0; i < fs.Config.Checkers; i++ { - go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, &s.renamerWg) + fraction := (100 * i) / fs.Config.Checkers + go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, fraction, &s.renamerWg) } }