operations: use less memory when doing multithread uploads

For uploads which are coming from disk or going to disk or going to a
backend which doesn't need to seek except for retries this doesn't
buffer the input.

This dramatically reduces rclone's memory usage.

Fixes #7350
This commit is contained in:
Nick Craig-Wood 2023-10-08 18:09:53 +01:00
parent c0fb9ebfce
commit 1f9a79ef09
1 changed files with 29 additions and 19 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
)
@ -53,13 +52,13 @@ func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
// state for a multi-thread copy
type multiThreadCopyState struct {
ctx context.Context
partSize int64
size int64
src fs.Object
acc *accounting.Account
numChunks int
noSeek bool // set if sure the receiving fs won't seek the input
ctx context.Context
partSize int64
size int64
src fs.Object
acc *accounting.Account
numChunks int
noBuffering bool // set to read the input without buffering
}
// Copy a single chunk into place
@ -88,10 +87,11 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
defer fs.CheckClose(rc, &err)
var rs io.ReadSeeker
if mc.noSeek {
if mc.noBuffering {
// Read directly if we are sure we aren't going to seek
// and account with accounting
rs = readers.NoSeeker{Reader: mc.acc.WrapStream(rc)}
rc.SetAccounting(mc.acc.AccountRead)
rs = rc
} else {
// Read the chunk into buffered reader
rw := multipart.NewRW()
@ -130,15 +130,25 @@ func calculateNumChunks(size int64, chunkSize int64) int {
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption) (newDst fs.Object, err error) {
openChunkWriter := f.Features().OpenChunkWriter
ci := fs.GetConfig(ctx)
noseek := false
noBuffering := false
if openChunkWriter == nil {
openWriterAt := f.Features().OpenWriterAt
if openWriterAt == nil {
return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
}
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
// We don't seek the chunks with OpenWriterAt
noseek = true
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
noBuffering = true
} else if src.Fs().Features().IsLocal {
// If the source fs is local we don't need to buffer
fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk")
noBuffering = true
} else if f.Features().ChunkWriterDoesntSeek {
// If the destination Fs promises not to seek its chunks
// (except for retries) then we don't need buffering.
fs.Debugf(src, "multi-thread copy: disabling buffering because destination has set ChunkWriterDoesntSeek")
noBuffering = true
}
if src.Size() < 0 {
@ -192,12 +202,12 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
g.SetLimit(concurrency)
mc := &multiThreadCopyState{
ctx: gCtx,
size: src.Size(),
src: src,
partSize: info.ChunkSize,
numChunks: numChunks,
noSeek: noseek,
ctx: gCtx,
size: src.Size(),
src: src,
partSize: info.ChunkSize,
numChunks: numChunks,
noBuffering: noBuffering,
}
// Make accounting