diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index c0359029e..473f74ece 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -61,6 +61,7 @@ import (
 	"github.com/rclone/rclone/lib/rest"
 	"github.com/rclone/rclone/lib/version"
 	"golang.org/x/net/http/httpguts"
+	"golang.org/x/sync/errgroup"
 )
 
 // The S3 providers
@@ -2185,10 +2186,10 @@ If empty it will default to the environment variable "AWS_PROFILE" or
 			Sensitive: true,
 		}, {
 			Name: "upload_concurrency",
-			Help: `Concurrency for multipart uploads.
+			Help: `Concurrency for multipart uploads and copies.
 
 This is the number of chunks of the same file that are uploaded
-concurrently.
+concurrently for multipart uploads and copies.
 
 If you are uploading small numbers of large files over high-speed links
 and these uploads do not fully utilize your bandwidth, then increasing
@@ -4507,10 +4508,20 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
 
 	fs.Debugf(src, "Starting multipart copy with %d parts", numParts)
 
-	var parts []*s3.CompletedPart
+	var (
+		parts   = make([]*s3.CompletedPart, numParts)
+		g, gCtx = errgroup.WithContext(ctx)
+	)
+	g.SetLimit(f.opt.UploadConcurrency)
 	for partNum := int64(1); partNum <= numParts; partNum++ {
-		if err := f.pacer.Call(func() (bool, error) {
-			partNum := partNum
+		// Fail fast, in case an errgroup managed function returns an error
+		// gCtx is cancelled. There is no point in uploading all the other parts.
+		if gCtx.Err() != nil {
+			break
+		}
+		partNum := partNum // for closure
+		g.Go(func() error {
+			var uout *s3.UploadPartCopyOutput
 			uploadPartReq := &s3.UploadPartCopyInput{}
 			//structs.SetFrom(uploadPartReq, copyReq)
 			setFrom_s3UploadPartCopyInput_s3CopyObjectInput(uploadPartReq, copyReq)
@@ -4519,18 +4530,24 @@ func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dst
 			uploadPartReq.PartNumber = &partNum
 			uploadPartReq.UploadId = uid
 			uploadPartReq.CopySourceRange = aws.String(calculateRange(partSize, partNum-1, numParts, srcSize))
-			uout, err := f.c.UploadPartCopyWithContext(ctx, uploadPartReq)
+			err := f.pacer.Call(func() (bool, error) {
+				uout, err = f.c.UploadPartCopyWithContext(gCtx, uploadPartReq)
+				return f.shouldRetry(gCtx, err)
+			})
 			if err != nil {
-				return f.shouldRetry(ctx, err)
+				return err
 			}
-			parts = append(parts, &s3.CompletedPart{
+			parts[partNum-1] = &s3.CompletedPart{
 				PartNumber: &partNum,
 				ETag:       uout.CopyPartResult.ETag,
-			})
-			return false, nil
-		}); err != nil {
-			return err
-		}
+			}
+			return nil
+		})
+	}
+
+	err = g.Wait()
+	if err != nil {
+		return err
 	}
 
 	return f.pacer.Call(func() (bool, error) {
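For reference, here is a minimal, self-contained sketch of the concurrency pattern this diff adopts: `errgroup.WithContext` for fail-fast cancellation, `SetLimit` to bound the number of parts in flight (the diff bounds it to `--s3-upload-concurrency`), and a pre-sized results slice indexed by part number so the goroutines never need a mutex. The `copyPart` helper and the `partCount`/`concurrency` parameters are hypothetical stand-ins for the real `UploadPartCopyWithContext` call and its inputs, not part of the diff.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// copyPart is a hypothetical stand-in for s3.UploadPartCopyWithContext;
// it returns the ETag of the copied part.
func copyPart(ctx context.Context, partNum int64) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err // respect cancellation from the errgroup
	}
	return fmt.Sprintf("etag-%d", partNum), nil
}

func copyAllParts(ctx context.Context, partCount int64, concurrency int) ([]string, error) {
	etags := make([]string, partCount) // one slot per part, so no locking is needed
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency) // at most `concurrency` parts in flight

	for partNum := int64(1); partNum <= partCount; partNum++ {
		// Fail fast: once any part fails, gCtx is cancelled and there is
		// no point queueing the remaining parts.
		if gCtx.Err() != nil {
			break
		}
		partNum := partNum // capture the loop variable for the closure
		g.Go(func() error {
			etag, err := copyPart(gCtx, partNum)
			if err != nil {
				return err // cancels gCtx for all other goroutines
			}
			etags[partNum-1] = etag // each goroutine writes a distinct index
			return nil
		})
	}

	// Wait blocks until every started part finishes and returns the first error.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return etags, nil
}

func main() {
	etags, err := copyAllParts(context.Background(), 8, 4)
	if err != nil {
		panic(err)
	}
	fmt.Println(etags)
}
```

Writing each result into `parts[partNum-1]` rather than appending matters once the loop body runs concurrently: `append` from multiple goroutines would both race on the slice header and scramble part order, and S3's `CompleteMultipartUpload` requires the parts list in ascending part-number order.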