diff --git a/README.md b/README.md index bf8dd1698..7439ae75a 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Please see [the full list of all storage providers and their features](https://r * Optional encryption ([Crypt](https://rclone.org/crypt/)) * Optional cache ([Cache](https://rclone.org/cache/)) * Optional FUSE mount ([rclone mount](https://rclone.org/commands/rclone_mount/)) + * Multi-threaded downloads to local disk ## Installation & documentation diff --git a/docs/content/about.md b/docs/content/about.md index b2d9f0bd6..7483aef6d 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -65,6 +65,7 @@ Features * ([Cache](/cache/)) backend * ([Union](/union/)) backend * Optional FUSE mount ([rclone mount](/commands/rclone_mount/)) + * Multi-threaded downloads to local disk Links diff --git a/docs/content/docs.md b/docs/content/docs.md index ab0e6249f..8a0446237 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -674,6 +674,49 @@ if you are reading and writing to an OS X filing system this will be This command line flag allows you to override that computed default. +### --multi-thread-cutoff=SIZE ### + +When downloading files to the local backend above this size, rclone +will use multiple threads to download the file. (default 250M) + +Rclone preallocates the file (using `fallocate(FALLOC_FL_KEEP_SIZE)` +on unix or `NTSetInformationFile` on Windows both of which takes no +time) then each thread writes directly into the file at the correct +place. This means that rclone won't create fragmented or sparse files +and there won't be any assembly time at the end of the transfer. + +The number of threads used to dowload is controlled by +`--multi-thread-streams`. + +Use `-vv` if you wish to see info about the threads. + +This will work with the `sync`/`copy`/`move` commands and friends +`copyto`/`moveto`. Multi thread downloads will be used with `rclone +mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or +above. + +**NB** that this **only** works for a local destination but will work +with any source. + +### --multi-thread-streams=N ### + +When using multi thread downloads (see above `--multi-thread-cutoff`) +this sets the maximum number of streams to use. Set to `0` to disable +multi thread downloads. (Default 4) + +Exactly how many streams rclone uses for the download depends on the +size of the file. To calculate the number of download streams Rclone +divides the size of the file by the `--multi-thread-cutoff` and rounds +up, up to the maximum set with `--multi-thread-streams`. + +So if `--multi-thread-cutoff 250MB` and `--multi-thread-streams 4` are +in effect (the defaults): + +- 0MB.250MB files will be downloaded with 1 stream +- 250MB..500MB files will be downloaded with 2 streams +- 500MB..750MB files will be downloaded with 3 streams +- 750MB+ files will be downloaded with 4 streams + ### --no-gzip-encoding ### Don't set `Accept-Encoding: gzip`. This means that rclone won't ask diff --git a/fs/config.go b/fs/config.go index f3745b1a1..f6b8eb1d9 100644 --- a/fs/config.go +++ b/fs/config.go @@ -93,6 +93,8 @@ type ConfigInfo struct { CaCert string // Client Side CA ClientCert string // Client Side Cert ClientKey string // Client Side Key + MultiThreadCutoff SizeSuffix + MultiThreadStreams int } // NewConfig creates a new config with everything set to the default @@ -124,6 +126,8 @@ func NewConfig() *ConfigInfo { c.MaxBacklog = 10000 // We do not want to set the default here. We use this variable being empty as part of the fall-through of options. // c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - " + c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024) + c.MultiThreadStreams = 4 return c } diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index c060ea078..57a36001c 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -95,6 +95,8 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.StringVarP(flagSet, &fs.Config.CaCert, "ca-cert", "", fs.Config.CaCert, "CA certificate used to verify servers") flags.StringVarP(flagSet, &fs.Config.ClientCert, "client-cert", "", fs.Config.ClientCert, "Client SSL certificate (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") + flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") + flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") } // SetFlags converts any flags into config which weren't straight forward diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go new file mode 100644 index 000000000..bbb913287 --- /dev/null +++ b/fs/operations/multithread.go @@ -0,0 +1,171 @@ +package operations + +import ( + "context" + "io" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/accounting" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +const ( + multithreadChunkSize = 64 << 10 + multithreadChunkSizeMask = multithreadChunkSize - 1 + multithreadBufferSize = 32 * 1024 +) + +// state for a multi-thread copy +type multiThreadCopyState struct { + ctx context.Context + partSize int64 + size int64 + wc fs.WriterAtCloser + src fs.Object + acc *accounting.Account + streams int +} + +// Copy a single stream into place +func (mc *multiThreadCopyState) copyStream(stream int) (err error) { + defer func() { + if err != nil { + fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err) + } + }() + start := int64(stream) * mc.partSize + if start >= mc.size { + return nil + } + end := start + mc.partSize + if end > mc.size { + end = mc.size + } + + fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) + + rc, err := newReOpen(mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries) + if err != nil { + return errors.Wrap(err, "multpart copy: failed to open source") + } + defer fs.CheckClose(rc, &err) + + // Copy the data + buf := make([]byte, multithreadBufferSize) + offset := start + for { + // Check if context cancelled and exit if so + if mc.ctx.Err() != nil { + return mc.ctx.Err() + } + nr, er := rc.Read(buf) + if nr > 0 { + err = mc.acc.AccountRead(nr) + if err != nil { + return errors.Wrap(err, "multpart copy: accounting failed") + } + nw, ew := mc.wc.WriteAt(buf[0:nr], offset) + if nw > 0 { + offset += int64(nw) + } + if ew != nil { + return errors.Wrap(ew, "multpart copy: write failed") + } + if nr != nw { + return errors.Wrap(io.ErrShortWrite, "multpart copy") + } + } + if er != nil { + if er != io.EOF { + return errors.Wrap(er, "multpart copy: read failed") + } + break + } + } + + if offset != end { + return errors.Errorf("multpart copy: wrote %d bytes but expected to write %d", offset-start, end-start) + } + + fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) + return nil +} + +// Calculate the chunk sizes and updated number of streams +func (mc *multiThreadCopyState) calculateChunks() { + partSize := mc.size / int64(mc.streams) + // Round partition size up so partSize * streams >= size + if (mc.size % int64(mc.streams)) != 0 { + partSize++ + } + // round partSize up to nearest multithreadChunkSize boundary + mc.partSize = (partSize + multithreadChunkSizeMask) &^ multithreadChunkSizeMask + // recalculate number of streams + mc.streams = int(mc.size / mc.partSize) + // round streams up so partSize * streams >= size + if (mc.size % mc.partSize) != 0 { + mc.streams++ + } +} + +// Copy src to (f, remote) using streams download threads and the OpenWriterAt feature +func multiThreadCopy(f fs.Fs, remote string, src fs.Object, streams int) (newDst fs.Object, err error) { + openWriterAt := f.Features().OpenWriterAt + if openWriterAt == nil { + return nil, errors.New("multi-thread copy: OpenWriterAt not supported") + } + if src.Size() < 0 { + return nil, errors.New("multi-thread copy: can't copy unknown sized file") + } + if src.Size() == 0 { + return nil, errors.New("multi-thread copy: can't copy zero sized file") + } + + g, ctx := errgroup.WithContext(context.Background()) + mc := &multiThreadCopyState{ + ctx: ctx, + size: src.Size(), + src: src, + streams: streams, + } + mc.calculateChunks() + + // Make accounting + mc.acc = accounting.NewAccount(nil, src) + defer fs.CheckClose(mc.acc, &err) + + // create write file handle + mc.wc, err = openWriterAt(remote, mc.size) + if err != nil { + return nil, errors.Wrap(err, "multpart copy: failed to open destination") + } + defer fs.CheckClose(mc.wc, &err) + + fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize)) + for stream := 0; stream < mc.streams; stream++ { + stream := stream + g.Go(func() (err error) { + return mc.copyStream(stream) + }) + } + err = g.Wait() + if err != nil { + return nil, err + } + + obj, err := f.NewObject(remote) + if err != nil { + return nil, errors.Wrap(err, "multi-thread copy: failed to find object after copy") + } + + err = obj.SetModTime(src.ModTime()) + switch err { + case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete: + default: + return nil, errors.Wrap(err, "multi-thread copy: failed to set modification time") + } + + fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize)) + return obj, nil +} diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go new file mode 100644 index 000000000..4ebc54484 --- /dev/null +++ b/fs/operations/multithread_test.go @@ -0,0 +1,70 @@ +package operations + +import ( + "fmt" + "testing" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMultithreadCalculateChunks(t *testing.T) { + for _, test := range []struct { + size int64 + streams int + wantPartSize int64 + wantStreams int + }{ + {size: 1, streams: 10, wantPartSize: multithreadChunkSize, wantStreams: 1}, + {size: 1 << 20, streams: 1, wantPartSize: 1 << 20, wantStreams: 1}, + {size: 1 << 20, streams: 2, wantPartSize: 1 << 19, wantStreams: 2}, + {size: (1 << 20) + 1, streams: 2, wantPartSize: (1 << 19) + multithreadChunkSize, wantStreams: 2}, + {size: (1 << 20) - 1, streams: 2, wantPartSize: (1 << 19), wantStreams: 2}, + } { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + mc := &multiThreadCopyState{ + size: test.size, + streams: test.streams, + } + mc.calculateChunks() + assert.Equal(t, test.wantPartSize, mc.partSize) + assert.Equal(t, test.wantStreams, mc.streams) + }) + } +} + +func TestMultithreadCopy(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + for _, test := range []struct { + size int + streams int + }{ + {size: multithreadChunkSize*2 - 1, streams: 2}, + {size: multithreadChunkSize * 2, streams: 2}, + {size: multithreadChunkSize*2 + 1, streams: 2}, + } { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + contents := fstest.RandomString(test.size) + t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") + file1 := r.WriteObject("file1", contents, t1) + fstest.CheckItems(t, r.Fremote, file1) + fstest.CheckItems(t, r.Flocal) + + src, err := r.Fremote.NewObject("file1") + require.NoError(t, err) + + dst, err := multiThreadCopy(r.Flocal, "file1", src, 2) + require.NoError(t, err) + assert.Equal(t, src.Size(), dst.Size()) + assert.Equal(t, "file1", dst.Remote()) + + fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.ModTimeNotSupported) + require.NoError(t, dst.Remove()) + }) + } + +} diff --git a/fs/operations/operations.go b/fs/operations/operations.go index ab2d7fd49..820737841 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -292,38 +292,56 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec } // If can't server side copy, do it manually if err == fs.ErrorCantCopy { - var in0 io.ReadCloser - in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries) - if err != nil { - err = errors.Wrap(err, "failed to open source object") - } else { - if src.Size() == -1 { - // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. - if doUpdate { - actionTaken = "Copied (Rcat, replaced existing)" - } else { - actionTaken = "Copied (Rcat, new)" - } - dst, err = Rcat(f, remote, in0, src.ModTime()) - newDst = dst + if doOpenWriterAt := f.Features().OpenWriterAt; doOpenWriterAt != nil && src.Size() >= int64(fs.Config.MultiThreadCutoff) && fs.Config.MultiThreadStreams > 1 { + // Number of streams proportional to size + streams := src.Size() / int64(fs.Config.MultiThreadCutoff) + // With maximum + if streams > int64(fs.Config.MultiThreadStreams) { + streams = int64(fs.Config.MultiThreadStreams) + } + if streams < 2 { + streams = 2 + } + dst, err = multiThreadCopy(f, remote, src, int(streams)) + if doUpdate { + actionTaken = "Multi-thread Copied (replaced existing)" } else { - in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer - var wrappedSrc fs.ObjectInfo = src - // We try to pass the original object if possible - if src.Remote() != remote { - wrappedSrc = &overrideRemoteObject{Object: src, remote: remote} - } - if doUpdate { - actionTaken = "Copied (replaced existing)" - err = dst.Update(in, wrappedSrc, hashOption) - } else { - actionTaken = "Copied (new)" - dst, err = f.Put(in, wrappedSrc, hashOption) - } - closeErr := in.Close() - if err == nil { + actionTaken = "Multi-thread Copied (new)" + } + } else { + var in0 io.ReadCloser + in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries) + if err != nil { + err = errors.Wrap(err, "failed to open source object") + } else { + if src.Size() == -1 { + // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. + if doUpdate { + actionTaken = "Copied (Rcat, replaced existing)" + } else { + actionTaken = "Copied (Rcat, new)" + } + dst, err = Rcat(f, remote, in0, src.ModTime()) newDst = dst - err = closeErr + } else { + in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer + var wrappedSrc fs.ObjectInfo = src + // We try to pass the original object if possible + if src.Remote() != remote { + wrappedSrc = &overrideRemoteObject{Object: src, remote: remote} + } + if doUpdate { + actionTaken = "Copied (replaced existing)" + err = dst.Update(in, wrappedSrc, hashOption) + } else { + actionTaken = "Copied (new)" + dst, err = f.Put(in, wrappedSrc, hashOption) + } + closeErr := in.Close() + if err == nil { + newDst = dst + err = closeErr + } } } }