diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 07ed4c384..d68b340ba 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -5316,7 +5316,198 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 
 var warnStreamUpload sync.Once
 
-func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (wantETag, gotETag string, versionID *string, err error) {
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
+
+    // This duplicates part of the logic in Update, however it is required
+    // until the multipart upload is migrated to OpenChunkWriter/multi-thread operations completely.
+    // Temporary Object under construction
+    o := &Object{
+        fs:     f,
+        remote: remote,
+    }
+    req, _, err := o.buildS3Req(ctx, src, options)
+    if err != nil {
+        return -1, nil, fmt.Errorf("failed to build s3 request: %w", err)
+    }
+
+    //structs.SetFrom(&mReq, req)
+    var mReq s3.CreateMultipartUploadInput
+    setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req)
+
+    uploadParts := f.opt.MaxUploadParts
+    if uploadParts < 1 {
+        uploadParts = 1
+    } else if uploadParts > maxUploadParts {
+        uploadParts = maxUploadParts
+    }
+    size := src.Size()
+
+    // calculate size of parts
+    chunkSize := f.opt.ChunkSize
+
+    // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
+    // buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
+    // 48 GiB which seems like a not too unreasonable limit.
+    if size == -1 {
+        warnStreamUpload.Do(func() {
+            fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+                f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts)))
+        })
+    } else {
+        chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
+    }
+
+    mOut, err := f.c.CreateMultipartUploadWithContext(ctx, &mReq)
+    if err != nil {
+        return -1, nil, fmt.Errorf("CreateMultipartUpload failed: %w", err)
+    }
+
+    chunkWriter := &s3ChunkWriter{
+        ctx:                  ctx,
+        chunkSize:            int64(chunkSize),
+        size:                 src.Size(),
+        f:                    f,
+        bucket:               mOut.Bucket,
+        key:                  mOut.Key,
+        uploadId:             mOut.UploadId,
+        multiPartUploadInput: &mReq,
+        completedParts:       make([]*s3.CompletedPart, 0),
+    }
+    fs.Debugf(f, "open chunk writer: started multipart upload: %v", *mOut.UploadId)
+    return int64(chunkSize), chunkWriter, nil
+}
+
+type s3ChunkWriter struct {
+    ctx                  context.Context
+    chunkSize            int64
+    size                 int64
+    f                    *Fs
+    bucket               *string
+    key                  *string
+    uploadId             *string
+    multiPartUploadInput *s3.CreateMultipartUploadInput
+    completedPartsMu     sync.Mutex
+    completedParts       []*s3.CompletedPart
+    eTag                 string
+    versionID            string
+    md5sMu               sync.Mutex
+    md5s                 []byte
+}
+
+func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
+    if chunkNumber < 0 {
+        err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
+        return -1, err
+    }
+
+    addMd5 := func(md5binary []byte, chunkNumber int64) {
+        w.md5sMu.Lock()
+        defer w.md5sMu.Unlock()
+        start := chunkNumber * md5.Size
+        end := start + md5.Size
+        if extend := end - int64(len(w.md5s)); extend > 0 {
+            w.md5s = append(w.md5s, make([]byte, extend)...)
+        }
+        copy(w.md5s[start:end], md5binary)
+    }
+
+    // create checksum of buffer for integrity checking
+    // currently there is no way to calculate the md5 without reading the chunk a 2nd time (1st read is in uploadMultipart)
+    // possible in AWS SDK v2 with trailers?
+    m := md5.New()
+    currentChunkSize, err := io.Copy(m, reader)
+    if err != nil && err != io.EOF {
+        return -1, err
+    }
+    md5sumBinary := m.Sum(nil)
+    addMd5(md5sumBinary, int64(chunkNumber))
+    md5sum := base64.StdEncoding.EncodeToString(md5sumBinary)
+
+    // reset the reader after we calculated the md5
+    _, err = reader.Seek(0, io.SeekStart)
+    if err != nil {
+        return -1, err
+    }
+
+    // S3 requires 1 <= PartNumber <= 10000
+    s3PartNumber := aws.Int64(int64(chunkNumber + 1))
+    uploadPartReq := &s3.UploadPartInput{
+        Body:                 reader,
+        Bucket:               w.bucket,
+        Key:                  w.key,
+        PartNumber:           s3PartNumber,
+        UploadId:             w.uploadId,
+        ContentMD5:           &md5sum,
+        ContentLength:        aws.Int64(currentChunkSize),
+        RequestPayer:         w.multiPartUploadInput.RequestPayer,
+        SSECustomerAlgorithm: w.multiPartUploadInput.SSECustomerAlgorithm,
+        SSECustomerKey:       w.multiPartUploadInput.SSECustomerKey,
+        SSECustomerKeyMD5:    w.multiPartUploadInput.SSECustomerKeyMD5,
+    }
+    uout, err := w.f.c.UploadPartWithContext(w.ctx, uploadPartReq)
+    if err != nil {
+        fs.Errorf(w.f, "Failed to upload part: %v", err)
+        return -1, err
+    }
+
+    addCompletedPart := func(partNum *int64, eTag *string) {
+        w.completedPartsMu.Lock()
+        defer w.completedPartsMu.Unlock()
+        w.completedParts = append(w.completedParts, &s3.CompletedPart{
+            PartNumber: partNum,
+            ETag:       eTag,
+        })
+    }
+    addCompletedPart(s3PartNumber, uout.ETag)
+
+    fs.Debugf(w.f, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
+    return currentChunkSize, err
+}
+
+func (w *s3ChunkWriter) Abort() error {
+    _, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
+        Bucket:       w.bucket,
+        Key:          w.key,
+        UploadId:     w.uploadId,
+        RequestPayer: w.multiPartUploadInput.RequestPayer,
+    })
+    if err != nil {
+        fs.Errorf(w.f, "Failed to abort multipart upload: %v", err)
+    }
+    fs.Debugf(w.f, "multipart upload '%v' aborted", *w.uploadId)
+    return err
+}
+
+func (w *s3ChunkWriter) Close() error {
+    // sort the completed parts by part number
+    sort.Slice(w.completedParts, func(i, j int) bool {
+        return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
+    })
+    resp, err := w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
+        Bucket: w.bucket,
+        Key:    w.key,
+        MultipartUpload: &s3.CompletedMultipartUpload{
+            Parts: w.completedParts,
+        },
+        RequestPayer: w.multiPartUploadInput.RequestPayer,
+        UploadId:     w.uploadId,
+    })
+    if err != nil {
+        fs.Errorf(w.f, "Failed to complete multipart upload: %v", err)
+    }
+    if resp != nil {
+        if resp.ETag != nil {
+            w.eTag = *resp.ETag
+        }
+        if resp.VersionId != nil {
+            w.versionID = *resp.VersionId
+        }
+    }
+    fs.Debugf(w.f, "multipart upload '%v' closed", *w.uploadId)
+    return err
+}
+
+func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader) (wantETag, gotETag string, versionID *string, err error) {
     f := o.fs
 
     // make concurrency machinery
@@ -5325,45 +5516,19 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
         concurrency = 1
     }
     tokens := pacer.NewTokenDispenser(concurrency)
+    openChunkWriter := f.Features().OpenChunkWriter
 
-    uploadParts := f.opt.MaxUploadParts
-    if uploadParts < 1 {
-        uploadParts = 1
-    } else if uploadParts > maxUploadParts {
-        uploadParts = maxUploadParts
-    }
-
-    // calculate size of parts
-    partSize := f.opt.ChunkSize
-
-    // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
-    // buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
-    // 48 GiB which seems like a not too unreasonable limit.
-    if size == -1 {
-        warnStreamUpload.Do(func() {
-            fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
-                f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts)))
-        })
-    } else {
-        partSize = chunksize.Calculator(o, size, uploadParts, f.opt.ChunkSize)
-    }
-
-    memPool := f.getMemoryPool(int64(partSize))
-
-    var mReq s3.CreateMultipartUploadInput
-    //structs.SetFrom(&mReq, req)
-    setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req)
-    var cout *s3.CreateMultipartUploadOutput
+    var chunkWriter fs.ChunkWriter
+    var chunkSize int64
     err = f.pacer.Call(func() (bool, error) {
         var err error
-        cout, err = f.c.CreateMultipartUploadWithContext(ctx, &mReq)
+        chunkSize, chunkWriter, err = openChunkWriter(ctx, src.Remote(), src)
         return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
     }
-    uid := cout.UploadId
-
+    memPool := f.getMemoryPool(chunkSize)
     uploadCtx, cancel := context.WithCancel(ctx)
     defer atexit.OnError(&err, func() {
         cancel()
@@ -5372,12 +5537,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
         }
         fs.Debugf(o, "Cancelling multipart upload")
         errCancel := f.pacer.Call(func() (bool, error) {
-            _, err := f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
-                Bucket:       req.Bucket,
-                Key:          req.Key,
-                UploadId:     uid,
-                RequestPayer: req.RequestPayer,
-            })
+            err := chunkWriter.Abort()
             return f.shouldRetry(ctx, err)
         })
         if errCancel != nil {
@@ -5388,25 +5548,10 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
     var (
         g, gCtx  = errgroup.WithContext(uploadCtx)
         finished = false
-        partsMu  sync.Mutex // to protect parts
-        parts    []*s3.CompletedPart
         off      int64
-        md5sMu   sync.Mutex
-        md5s     []byte
     )
 
-    addMd5 := func(md5binary *[md5.Size]byte, partNum int64) {
-        md5sMu.Lock()
-        defer md5sMu.Unlock()
-        start := partNum * md5.Size
-        end := start + md5.Size
-        if extend := end - int64(len(md5s)); extend > 0 {
-            md5s = append(md5s, make([]byte, extend)...)
-        }
-        copy(md5s[start:end], (*md5binary)[:])
-    }
-
-    for partNum := int64(1); !finished; partNum++ {
+    for partNum := int64(0); !finished; partNum++ {
         // Get a block of memory from the pool and token which limits concurrency.
         tokens.Get()
         buf := memPool.Get()
@@ -5428,7 +5573,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
         var n int
         n, err = readers.ReadFill(in, buf) // this can never return 0, nil
         if err == io.EOF {
-            if n == 0 && partNum != 1 { // end if no data and if not first chunk
+            if n == 0 && partNum != 0 { // end if no data and if not first chunk
                 free()
                 break
             }
@@ -5440,32 +5585,12 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
         buf = buf[:n]
 
         partNum := partNum
-        fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size))
+        fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(src.Size()))
         off += int64(n)
         g.Go(func() (err error) {
             defer free()
-            partLength := int64(len(buf))
-
-            // create checksum of buffer for integrity checking
-            md5sumBinary := md5.Sum(buf)
-            addMd5(&md5sumBinary, partNum-1)
-            md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
-
             err = f.pacer.Call(func() (bool, error) {
-                uploadPartReq := &s3.UploadPartInput{
-                    Body:                 bytes.NewReader(buf),
-                    Bucket:               req.Bucket,
-                    Key:                  req.Key,
-                    PartNumber:           &partNum,
-                    UploadId:             uid,
-                    ContentMD5:           &md5sum,
-                    ContentLength:        &partLength,
-                    RequestPayer:         req.RequestPayer,
-                    SSECustomerAlgorithm: req.SSECustomerAlgorithm,
-                    SSECustomerKey:       req.SSECustomerKey,
-                    SSECustomerKeyMD5:    req.SSECustomerKeyMD5,
-                }
-                uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
+                _, err := chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf))
                 if err != nil {
                     if partNum <= int64(concurrency) {
                         return f.shouldRetry(gCtx, err)
                     }
@@ -5473,13 +5598,6 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
                     // retry all chunks once have done the first batch
                     return true, err
                 }
-                partsMu.Lock()
-                parts = append(parts, &s3.CompletedPart{
-                    PartNumber: &partNum,
-                    ETag:       uout.ETag,
-                })
-                partsMu.Unlock()
-
                 return false, nil
             })
             if err != nil {
@@ -5493,35 +5611,21 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
         return wantETag, gotETag, nil, err
     }
 
-    // sort the completed parts by part number
-    sort.Slice(parts, func(i, j int) bool {
-        return *parts[i].PartNumber < *parts[j].PartNumber
-    })
-
-    var resp *s3.CompleteMultipartUploadOutput
     err = f.pacer.Call(func() (bool, error) {
-        resp, err = f.c.CompleteMultipartUploadWithContext(uploadCtx, &s3.CompleteMultipartUploadInput{
-            Bucket: req.Bucket,
-            Key:    req.Key,
-            MultipartUpload: &s3.CompletedMultipartUpload{
-                Parts: parts,
-            },
-            RequestPayer: req.RequestPayer,
-            UploadId:     uid,
-        })
+        err := chunkWriter.Close()
         return f.shouldRetry(uploadCtx, err)
     })
     if err != nil {
         return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
     }
-    hashOfHashes := md5.Sum(md5s)
-    wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts))
-    if resp != nil {
-        if resp.ETag != nil {
-            gotETag = *resp.ETag
-        }
-        versionID = resp.VersionId
-    }
+
+    s3cw := chunkWriter.(*s3ChunkWriter)
+    gotETag = s3cw.eTag
+    versionID = aws.String(s3cw.versionID)
+
+    hashOfHashes := md5.Sum(s3cw.md5s)
+    wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(s3cw.completedParts))
+
     return wantETag, gotETag, versionID, nil
 }
 
@@ -5652,25 +5756,18 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
     return etag, lastModified, versionID, nil
 }
 
-// Update the Object from in with modTime and size
-func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
-    if o.fs.opt.VersionAt.IsSet() {
-        return errNotWithVersionAt
-    }
+func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (req *s3.PutObjectInput, md5sumHex string, err error) {
     bucket, bucketPath := o.split()
 
     // Create parent dir/bucket if not saving directory marker
     if !strings.HasSuffix(o.remote, "/") {
         err := o.fs.mkdirParent(ctx, o.remote)
         if err != nil {
-            return err
+            return nil, "", err
         }
     }
     modTime := src.ModTime(ctx)
-    size := src.Size()
-
-    multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
-
-    req := s3.PutObjectInput{
+    req = &s3.PutObjectInput{
         Bucket: &bucket,
         ACL:    stringPointerOrNil(o.fs.opt.ACL),
         Key:    &bucketPath,
     }
@@ -5679,7 +5776,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
     // Fetch metadata if --metadata is in use
     meta, err := fs.GetMetadataOptions(ctx, src, options)
     if err != nil {
-        return fmt.Errorf("failed to read metadata from source object: %w", err)
+        return nil, "", fmt.Errorf("failed to read metadata from source object: %w", err)
     }
     req.Metadata = make(map[string]*string, len(meta)+2)
     // merge metadata into request and user metadata
@@ -5731,7 +5828,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
     // - for multipart provided checksums aren't disabled
     // - so we can add the md5sum in the metadata as metaMD5Hash
     var md5sumBase64 string
-    var md5sumHex string
+    size := src.Size()
+    multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
     if !multipart || !o.fs.opt.DisableChecksum {
         md5sumHex, err = src.Hash(ctx, hash.MD5)
         if err == nil && matchMd5.MatchString(md5sumHex) {
        }
    }

-    // Set the content type it it isn't set already
+    // Set the content type if it isn't set already
     if req.ContentType == nil {
         req.ContentType = aws.String(fs.MimeType(ctx, src))
     }
@@ -5824,17 +5922,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
         }
     }
 
+    return req, md5sumHex, nil
+}
+
+// Update the Object from in with modTime and size
+func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
+    if o.fs.opt.VersionAt.IsSet() {
+        return errNotWithVersionAt
+    }
+    size := src.Size()
+    multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
+
     var wantETag string        // Multipart upload Etag to check
     var gotETag string         // Etag we got from the upload
     var lastModified time.Time // Time we got from the upload
     var versionID *string      // versionID we got from the upload
+    var err error
+    var md5sumHex string
+    var req *s3.PutObjectInput
     if multipart {
-        wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, &req, size, in)
+        wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, src, in)
     } else {
+        req, md5sumHex, err = o.buildS3Req(ctx, src, options)
+        if err != nil {
+            return fmt.Errorf("failed to build s3 request: %w", err)
+        }
+
         if o.fs.opt.UsePresignedRequest {
-            gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in)
+            gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, req, size, in)
         } else {
-            gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, &req, size, in)
+            gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, req, size, in)
         }
     }
     if err != nil {
@@ -5854,7 +5971,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
     if o.fs.opt.NoHead && size >= 0 {
         head = new(s3.HeadObjectOutput)
         //structs.SetFrom(head, &req)
-        setFrom_s3HeadObjectOutput_s3PutObjectInput(head, &req)
+        setFrom_s3HeadObjectOutput_s3PutObjectInput(head, req)
         head.ETag = &md5sumHex // doesn't matter quotes are missing
         head.ContentLength = &size
         // We get etag back from single and multipart upload so fill it in here
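
---

Reviewer note: for anyone reading this without the fs package to hand, the calling convention the refactor establishes is: open once, call WriteChunk per zero-based part, Close on success, Abort on failure. Below is a minimal, self-contained sketch of that sequence. The local chunkWriter interface and logChunkWriter stand-in are illustrative only — the real interface is fs.ChunkWriter, and the real loop in uploadMultipart above additionally does pacing, concurrency via errgroup, memory pooling, and uploads a single empty chunk for zero-length objects.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// chunkWriter mirrors the three methods of fs.ChunkWriter that this diff
// relies on; it is a local stand-in, not the rclone type.
type chunkWriter interface {
	WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error)
	Close() error
	Abort() error
}

// uploadInChunks drives a chunkWriter the way the refactored uploadMultipart
// does: fill a chunk-sized buffer, upload it under a zero-based chunk number,
// Close when the source is drained, Abort if anything fails.
func uploadInChunks(in io.Reader, chunkSize int64, w chunkWriter) (err error) {
	defer func() {
		if err != nil {
			_ = w.Abort() // best effort cleanup, mirroring the atexit handler above
		}
	}()
	buf := make([]byte, chunkSize)
	for chunkNumber := 0; ; chunkNumber++ {
		n, readErr := io.ReadFull(in, buf)
		if readErr == io.EOF {
			break // source exhausted exactly on a chunk boundary
		}
		if readErr != nil && readErr != io.ErrUnexpectedEOF {
			return readErr
		}
		if _, err = w.WriteChunk(chunkNumber, bytes.NewReader(buf[:n])); err != nil {
			return err
		}
		if readErr == io.ErrUnexpectedEOF {
			break // short final chunk
		}
	}
	return w.Close()
}

// logChunkWriter is a toy implementation that just counts parts.
type logChunkWriter struct{ parts int }

func (l *logChunkWriter) WriteChunk(n int, r io.ReadSeeker) (int64, error) {
	size, err := io.Copy(io.Discard, r)
	fmt.Printf("wrote chunk %d (%d bytes)\n", n+1, size)
	l.parts++
	return size, err
}

func (l *logChunkWriter) Close() error {
	fmt.Printf("completed upload of %d parts\n", l.parts)
	return nil
}

func (l *logChunkWriter) Abort() error {
	fmt.Println("aborted upload")
	return nil
}

func main() {
	src := strings.NewReader(strings.Repeat("x", 12))
	// 12 bytes with a 5 byte chunk size -> parts of 5, 5 and 2 bytes
	if err := uploadInChunks(src, 5, &logChunkWriter{}); err != nil {
		fmt.Println("upload failed:", err)
	}
}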
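The wantETag check at the end of uploadMultipart reproduces the ETag S3 itself computes for multipart uploads: the hex MD5 of the concatenated binary per-part MD5s, followed by "-" and the part count. That is why s3ChunkWriter collects each chunk's MD5 into w.md5s at a fixed offset per chunk number — the parts can finish out of order but the concatenation stays ordered. A standalone sketch of just that arithmetic (computeMultipartETag is an illustrative helper, not part of this change):

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// computeMultipartETag concatenates the binary MD5 of each part in part
// order, hashes the concatenation, and appends the part count — the same
// calculation as wantETag above.
func computeMultipartETag(parts [][]byte) string {
	md5s := make([]byte, 0, len(parts)*md5.Size)
	for _, part := range parts {
		sum := md5.Sum(part)
		md5s = append(md5s, sum[:]...)
	}
	hashOfHashes := md5.Sum(md5s)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts))
}

func main() {
	parts := [][]byte{[]byte("part one"), []byte("part two")}
	fmt.Println(computeMultipartETag(parts)) // prints "<32 hex digits>-2"
}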