From 25703ad20ed34a8990fc02913e17a74692050d21 Mon Sep 17 00:00:00 2001 From: Manoj Ghosh Date: Mon, 21 Aug 2023 21:18:27 -0700 Subject: [PATCH] oracleobjectstorage: implement OpenChunkWriter and multi-thread uploads #7056 --- backend/oracleobjectstorage/multipart.go | 637 ++++++++++-------- backend/oracleobjectstorage/object.go | 125 +--- backend/oracleobjectstorage/options.go | 17 - .../oracleobjectstorage.go | 47 +- docs/content/oracleobjectstorage.md | 25 - fstest/test_all/config.yaml | 1 + 6 files changed, 390 insertions(+), 462 deletions(-) diff --git a/backend/oracleobjectstorage/multipart.go b/backend/oracleobjectstorage/multipart.go index 9dce5ff75..f5af23a11 100644 --- a/backend/oracleobjectstorage/multipart.go +++ b/backend/oracleobjectstorage/multipart.go @@ -4,43 +4,390 @@ package oracleobjectstorage import ( - "bytes" "context" "crypto/md5" "encoding/base64" + "encoding/hex" "fmt" "io" - "sort" - "strconv" + "strings" "sync" + "time" + + "github.com/ncw/swift/v2" + "github.com/rclone/rclone/lib/multipart" + "golang.org/x/net/http/httpguts" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/chunksize" - "github.com/rclone/rclone/lib/atexit" - "github.com/rclone/rclone/lib/pacer" - "github.com/rclone/rclone/lib/readers" - "golang.org/x/sync/errgroup" + "github.com/rclone/rclone/fs/hash" ) var warnStreamUpload sync.Once -func (o *Object) uploadMultipart( +// Info needed for an upload +type uploadInfo struct { + req *objectstorage.PutObjectRequest + md5sumHex string +} + +type objectChunkWriter struct { + chunkSize int64 + size int64 + f *Fs + bucket *string + key *string + uploadId *string + partsToCommit []objectstorage.CommitMultipartUploadPartDetails + partsToCommitMu sync.Mutex + existingParts map[int]objectstorage.MultipartUploadPartSummary + eTag string + md5sMu sync.Mutex + md5s []byte + ui uploadInfo + o *Object +} + +func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error { + _, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ + Open: o.fs, + Concurrency: o.fs.opt.UploadConcurrency, + LeavePartsOnError: o.fs.opt.LeavePartsOnError, + OpenOptions: options, + }) + return err +} + +// OpenChunkWriter returns the chunk size and a ChunkWriter +// +// Pass in the remote and the src object +// You can also use options to hint at the desired chunk size +func (f *Fs) OpenChunkWriter( ctx context.Context, - putReq *objectstorage.PutObjectRequest, - in io.Reader, - src fs.ObjectInfo) (err error) { - uploadID, uploadedParts, err := o.createMultipartUpload(ctx, putReq) + remote string, + src fs.ObjectInfo, + options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { + // Temporary Object under construction + o := &Object{ + fs: f, + remote: remote, + } + ui, err := o.prepareUpload(ctx, src, options) + if err != nil { + return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) + } + + uploadParts := f.opt.MaxUploadParts + if uploadParts < 1 { + uploadParts = 1 + } else if uploadParts > maxUploadParts { + uploadParts = maxUploadParts + } + size := src.Size() + + // calculate size of parts + chunkSize := f.opt.ChunkSize + + // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize + // buffers here (default 5 MiB). 
With a maximum number of parts (10,000) this will be a file of + // 48 GiB which seems like a not too unreasonable limit. + if size == -1 { + warnStreamUpload.Do(func() { + fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", + f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts))) + }) + } else { + chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize) + } + + uploadId, existingParts, err := o.createMultipartUpload(ctx, ui.req) + if err != nil { + return -1, nil, fmt.Errorf("create multipart upload request failed: %w", err) + } + bucketName, bucketPath := o.split() + chunkWriter := &objectChunkWriter{ + chunkSize: int64(chunkSize), + size: size, + f: f, + bucket: &bucketName, + key: &bucketPath, + uploadId: &uploadId, + existingParts: existingParts, + ui: ui, + o: o, + } + fs.Debugf(o, "open chunk writer: started multipart upload: %v", uploadId) + return int64(chunkSize), chunkWriter, err +} + +// WriteChunk will write chunk number with reader bytes, where chunk number >= 0 +func (w *objectChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) { + if chunkNumber < 0 { + err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber) + return -1, err + } + m := md5.New() + currentChunkSize, err := io.Copy(m, reader) + if err != nil { + return -1, err + } + // If no data read, don't write the chunk + if currentChunkSize == 0 { + return 0, nil + } + md5sumBinary := m.Sum([]byte{}) + w.addMd5(&md5sumBinary, int64(chunkNumber)) + md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) + + // Object storage requires 1 <= PartNumber <= 10000 + ossPartNumber := chunkNumber + 1 + if existing, ok := w.existingParts[ossPartNumber]; ok { + if md5sum == *existing.Md5 { + fs.Debugf(w.o, "matched uploaded part found, part num %d, skipping part, md5=%v", *existing.PartNumber, md5sum) + w.addCompletedPart(existing.PartNumber, existing.Etag) + return currentChunkSize, nil + } + } + req := objectstorage.UploadPartRequest{ + NamespaceName: common.String(w.f.opt.Namespace), + BucketName: w.bucket, + ObjectName: w.key, + UploadId: w.uploadId, + UploadPartNum: common.Int(ossPartNumber), + ContentLength: common.Int64(currentChunkSize), + ContentMD5: common.String(md5sum), + } + w.o.applyPartUploadOptions(w.ui.req, &req) + var resp objectstorage.UploadPartResponse + err = w.f.pacer.Call(func() (bool, error) { + // req.UploadPartBody = io.NopCloser(bytes.NewReader(buf)) + // rewind the reader on retry and after reading md5 + _, err = reader.Seek(0, io.SeekStart) + if err != nil { + return false, err + } + req.UploadPartBody = io.NopCloser(reader) + resp, err = w.f.srv.UploadPart(ctx, req) + if err != nil { + if ossPartNumber <= 8 { + return shouldRetry(ctx, resp.HTTPResponse(), err) + } + // retry all chunks once have done the first few + return true, err + } + return false, err + }) + if err != nil { + fs.Errorf(w.o, "multipart upload failed to upload part:%d err: %v", ossPartNumber, err) + return -1, fmt.Errorf("multipart upload failed to upload part: %w", err) + } + w.addCompletedPart(&ossPartNumber, resp.ETag) + return currentChunkSize, err + +} + +// add a part number and etag to the completed parts +func (w *objectChunkWriter) addCompletedPart(partNum *int, eTag *string) { + w.partsToCommitMu.Lock() + defer w.partsToCommitMu.Unlock() + w.partsToCommit = append(w.partsToCommit, objectstorage.CommitMultipartUploadPartDetails{ + PartNum: partNum, + Etag: eTag, + }) +} + +func (w 
*objectChunkWriter) Close(ctx context.Context) (err error) { + req := objectstorage.CommitMultipartUploadRequest{ + NamespaceName: common.String(w.f.opt.Namespace), + BucketName: w.bucket, + ObjectName: w.key, + UploadId: w.uploadId, + } + req.PartsToCommit = w.partsToCommit + var resp objectstorage.CommitMultipartUploadResponse + err = w.f.pacer.Call(func() (bool, error) { + resp, err = w.f.srv.CommitMultipartUpload(ctx, req) + // if multipart is corrupted, we will abort the uploadId + if isMultiPartUploadCorrupted(err) { + fs.Debugf(w.o, "multipart uploadId %v is corrupted, aborting...", *w.uploadId) + _ = w.Abort(ctx) + return false, err + } + return shouldRetry(ctx, resp.HTTPResponse(), err) + }) if err != nil { - fs.Errorf(o, "failed to create multipart upload-id err: %v", err) return err } - return o.uploadParts(ctx, putReq, in, src, uploadID, uploadedParts) + w.eTag = *resp.ETag + hashOfHashes := md5.Sum(w.md5s) + wantMultipartMd5 := fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(hashOfHashes[:]), len(w.partsToCommit)) + gotMultipartMd5 := *resp.OpcMultipartMd5 + if wantMultipartMd5 != gotMultipartMd5 { + fs.Errorf(w.o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) + return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) + } + fs.Debugf(w.o, "multipart upload %v md5 matched: expecting %s and got %s", *w.uploadId, wantMultipartMd5, gotMultipartMd5) + return nil +} + +func isMultiPartUploadCorrupted(err error) bool { + if err == nil { + return false + } + // Check if this oci-err object, and if it is multipart commit error + if ociError, ok := err.(common.ServiceError); ok { + // If it is a timeout then we want to retry that + if ociError.GetCode() == "InvalidUploadPart" { + return true + } + } + return false +} + +func (w *objectChunkWriter) Abort(ctx context.Context) error { + fs.Debugf(w.o, "Cancelling multipart upload") + err := w.o.fs.abortMultiPartUpload( + ctx, + w.bucket, + w.key, + w.uploadId) + if err != nil { + fs.Debugf(w.o, "Failed to cancel multipart upload: %v", err) + } else { + fs.Debugf(w.o, "canceled and aborted multipart upload: %v", *w.uploadId) + } + return err +} + +// addMd5 adds a binary md5 to the md5 calculated so far +func (w *objectChunkWriter) addMd5(md5binary *[]byte, chunkNumber int64) { + w.md5sMu.Lock() + defer w.md5sMu.Unlock() + start := chunkNumber * md5.Size + end := start + md5.Size + if extend := end - int64(len(w.md5s)); extend > 0 { + w.md5s = append(w.md5s, make([]byte, extend)...) 
+ } + copy(w.md5s[start:end], (*md5binary)[:]) +} + +func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) { + bucket, bucketPath := o.split() + + ui.req = &objectstorage.PutObjectRequest{ + NamespaceName: common.String(o.fs.opt.Namespace), + BucketName: common.String(bucket), + ObjectName: common.String(bucketPath), + } + + // Set the mtime in the metadata + modTime := src.ModTime(ctx) + // Fetch metadata if --metadata is in use + meta, err := fs.GetMetadataOptions(ctx, src, options) + if err != nil { + return ui, fmt.Errorf("failed to read metadata from source object: %w", err) + } + ui.req.OpcMeta = make(map[string]string, len(meta)+2) + // merge metadata into request and user metadata + for k, v := range meta { + pv := common.String(v) + k = strings.ToLower(k) + switch k { + case "cache-control": + ui.req.CacheControl = pv + case "content-disposition": + ui.req.ContentDisposition = pv + case "content-encoding": + ui.req.ContentEncoding = pv + case "content-language": + ui.req.ContentLanguage = pv + case "content-type": + ui.req.ContentType = pv + case "tier": + // ignore + case "mtime": + // mtime in meta overrides source ModTime + metaModTime, err := time.Parse(time.RFC3339Nano, v) + if err != nil { + fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err) + } else { + modTime = metaModTime + } + case "btime": + // write as metadata since we can't set it + ui.req.OpcMeta[k] = v + default: + ui.req.OpcMeta[k] = v + } + } + + // Set the mtime in the metadata + ui.req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime) + + // read the md5sum if available + // - for non-multipart + // - so we can add a ContentMD5 + // - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C + // - for multipart provided checksums aren't disabled + // - so we can add the md5sum in the metadata as metaMD5Hash + size := src.Size() + isMultipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) + var md5sumBase64 string + if !isMultipart || !o.fs.opt.DisableChecksum { + ui.md5sumHex, err = src.Hash(ctx, hash.MD5) + if err == nil && matchMd5.MatchString(ui.md5sumHex) { + hashBytes, err := hex.DecodeString(ui.md5sumHex) + if err == nil { + md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes) + if isMultipart && !o.fs.opt.DisableChecksum { + // Set the md5sum as metadata on the object if + // - a multipart upload + // - the ETag is not an MD5, e.g. when using SSE/SSE-C + // provided checksums aren't disabled + ui.req.OpcMeta[metaMD5Hash] = md5sumBase64 + } + } + } + } + // Set the content type if it isn't set already + if ui.req.ContentType == nil { + ui.req.ContentType = common.String(fs.MimeType(ctx, src)) + } + if size >= 0 { + ui.req.ContentLength = common.Int64(size) + } + if md5sumBase64 != "" { + ui.req.ContentMD5 = &md5sumBase64 + } + o.applyPutOptions(ui.req, options...) 
+ useBYOKPutObject(o.fs, ui.req) + if o.fs.opt.StorageTier != "" { + storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier) + if !ok { + return ui, fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) + } + ui.req.StorageTier = storageTier + } + // Check metadata keys and values are valid + for key, value := range ui.req.OpcMeta { + if !httpguts.ValidHeaderFieldName(key) { + fs.Errorf(o, "Dropping invalid metadata key %q", key) + delete(ui.req.OpcMeta, key) + } else if value == "" { + fs.Errorf(o, "Dropping nil metadata value for key %q", key) + delete(ui.req.OpcMeta, key) + } else if !httpguts.ValidHeaderFieldValue(value) { + fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key) + delete(ui.req.OpcMeta, key) + } + } + return ui, nil } func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorage.PutObjectRequest) ( - uploadID string, uploadedParts map[int]objectstorage.MultipartUploadPartSummary, err error) { + uploadID string, existingParts map[int]objectstorage.MultipartUploadPartSummary, err error) { bucketName, bucketPath := o.split() f := o.fs if f.opt.AttemptResumeUpload { @@ -48,10 +395,10 @@ func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorag resumeUploads, err := o.fs.findLatestMultipartUpload(ctx, bucketName, bucketPath) if err == nil && len(resumeUploads) > 0 { uploadID = *resumeUploads[0].UploadId - uploadedParts, err = f.listMultipartUploadParts(ctx, bucketName, bucketPath, uploadID) + existingParts, err = f.listMultipartUploadParts(ctx, bucketName, bucketPath, uploadID) if err == nil { fs.Debugf(o, "resuming with existing upload id: %v", uploadID) - return uploadID, uploadedParts, err + return uploadID, existingParts, err } } } @@ -75,260 +422,10 @@ func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorag return shouldRetry(ctx, resp.HTTPResponse(), err) }) if err != nil { - return "", nil, err + return "", existingParts, err } + existingParts = make(map[int]objectstorage.MultipartUploadPartSummary) uploadID = *resp.UploadId fs.Debugf(o, "created new upload id: %v", uploadID) - return uploadID, nil, err -} - -func (o *Object) uploadParts( - ctx context.Context, - putReq *objectstorage.PutObjectRequest, - in io.Reader, - src fs.ObjectInfo, - uploadID string, - uploadedParts map[int]objectstorage.MultipartUploadPartSummary) (err error) { - bucketName, bucketPath := o.split() - f := o.fs - - // make concurrency machinery - concurrency := f.opt.UploadConcurrency - if concurrency < 1 { - concurrency = 1 - } - - uploadParts := f.opt.MaxUploadParts - if uploadParts < 1 { - uploadParts = 1 - } else if uploadParts > maxUploadParts { - uploadParts = maxUploadParts - } - - // calculate size of parts - partSize := f.opt.ChunkSize - fileSize := src.Size() - - // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize - // buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of - // 48 GiB which seems like a not too unreasonable limit. 
- if fileSize == -1 { - warnStreamUpload.Do(func() { - fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", - f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts))) - }) - } else { - partSize = chunksize.Calculator(o, fileSize, uploadParts, f.opt.ChunkSize) - } - - uploadCtx, cancel := context.WithCancel(ctx) - defer atexit.OnError(&err, func() { - cancel() - if o.fs.opt.LeavePartsOnError { - return - } - fs.Debugf(o, "Cancelling multipart upload") - errCancel := o.fs.abortMultiPartUpload( - context.Background(), - bucketName, - bucketPath, - uploadID) - if errCancel != nil { - fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) - } else { - fs.Debugf(o, "canceled and aborted multipart upload: %v", uploadID) - } - })() - - var ( - g, gCtx = errgroup.WithContext(uploadCtx) - finished = false - partsMu sync.Mutex // to protect parts - parts []*objectstorage.CommitMultipartUploadPartDetails - off int64 - md5sMu sync.Mutex - md5s []byte - tokens = pacer.NewTokenDispenser(concurrency) - memPool = o.fs.getMemoryPool(int64(partSize)) - ) - - addMd5 := func(md5binary *[md5.Size]byte, partNum int64) { - md5sMu.Lock() - defer md5sMu.Unlock() - start := partNum * md5.Size - end := start + md5.Size - if extend := end - int64(len(md5s)); extend > 0 { - md5s = append(md5s, make([]byte, extend)...) - } - copy(md5s[start:end], (*md5binary)[:]) - } - - for partNum := int64(1); !finished; partNum++ { - // Get a block of memory from the pool and token which limits concurrency. - tokens.Get() - buf := memPool.Get() - - free := func() { - // return the memory and token - memPool.Put(buf) - tokens.Put() - } - - // Fail fast, in case an errgroup managed function returns an error - // gCtx is cancelled. There is no point in uploading all the other parts. 
- if gCtx.Err() != nil { - free() - break - } - - // Read the chunk - var n int - n, err = readers.ReadFill(in, buf) // this can never return 0, nil - if err == io.EOF { - if n == 0 && partNum != 1 { // end if no data and if not first chunk - free() - break - } - finished = true - } else if err != nil { - free() - return fmt.Errorf("multipart upload failed to read source: %w", err) - } - buf = buf[:n] - - partNum := partNum - fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(fileSize)) - off += int64(n) - g.Go(func() (err error) { - defer free() - partLength := int64(len(buf)) - - // create checksum of buffer for integrity checking - md5sumBinary := md5.Sum(buf) - addMd5(&md5sumBinary, partNum-1) - md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) - if uploadedPart, ok := uploadedParts[int(partNum)]; ok { - if md5sum == *uploadedPart.Md5 { - fs.Debugf(o, "matched uploaded part found, part num %d, skipping part, md5=%v", partNum, md5sum) - partsMu.Lock() - parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{ - PartNum: uploadedPart.PartNumber, - Etag: uploadedPart.Etag, - }) - partsMu.Unlock() - return nil - } - } - - req := objectstorage.UploadPartRequest{ - NamespaceName: common.String(o.fs.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - UploadId: common.String(uploadID), - UploadPartNum: common.Int(int(partNum)), - ContentLength: common.Int64(partLength), - ContentMD5: common.String(md5sum), - } - o.applyPartUploadOptions(putReq, &req) - var resp objectstorage.UploadPartResponse - err = f.pacer.Call(func() (bool, error) { - req.UploadPartBody = io.NopCloser(bytes.NewReader(buf)) - resp, err = f.srv.UploadPart(gCtx, req) - if err != nil { - if partNum <= int64(concurrency) { - return shouldRetry(gCtx, resp.HTTPResponse(), err) - } - // retry all chunks once have done the first batch - return true, err - } - partsMu.Lock() - parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{ - PartNum: common.Int(int(partNum)), - Etag: resp.ETag, - }) - partsMu.Unlock() - return false, nil - }) - if err != nil { - fs.Errorf(o, "multipart upload failed to upload part:%d err: %v", partNum, err) - return fmt.Errorf("multipart upload failed to upload part: %w", err) - } - return nil - }) - } - err = g.Wait() - if err != nil { - return err - } - - // sort the completed parts by part number - sort.Slice(parts, func(i, j int) bool { - return *parts[i].PartNum < *parts[j].PartNum - }) - - var resp objectstorage.CommitMultipartUploadResponse - resp, err = o.commitMultiPart(ctx, uploadID, parts) - if err != nil { - return err - } - fs.Debugf(o, "multipart upload %v committed.", uploadID) - hashOfHashes := md5.Sum(md5s) - wantMultipartMd5 := base64.StdEncoding.EncodeToString(hashOfHashes[:]) + "-" + strconv.Itoa(len(parts)) - gotMultipartMd5 := *resp.OpcMultipartMd5 - if wantMultipartMd5 != gotMultipartMd5 { - fs.Errorf(o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) - return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) - } - fs.Debugf(o, "multipart upload %v md5 matched: expecting %s and got %s", uploadID, wantMultipartMd5, gotMultipartMd5) - return nil -} - -// commits the multipart upload -func (o *Object) commitMultiPart(ctx context.Context, uploadID string, parts 
[]*objectstorage.CommitMultipartUploadPartDetails) (resp objectstorage.CommitMultipartUploadResponse, err error) { - bucketName, bucketPath := o.split() - req := objectstorage.CommitMultipartUploadRequest{ - NamespaceName: common.String(o.fs.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - UploadId: common.String(uploadID), - } - var partsToCommit []objectstorage.CommitMultipartUploadPartDetails - for _, part := range parts { - partsToCommit = append(partsToCommit, *part) - } - req.PartsToCommit = partsToCommit - err = o.fs.pacer.Call(func() (bool, error) { - resp, err = o.fs.srv.CommitMultipartUpload(ctx, req) - // if multipart is corrupted, we will abort the uploadId - if o.isMultiPartUploadCorrupted(err) { - fs.Debugf(o, "multipart uploadId %v is corrupted, aborting...", uploadID) - errCancel := o.fs.abortMultiPartUpload( - context.Background(), - bucketName, - bucketPath, - uploadID) - if errCancel != nil { - fs.Debugf(o, "Failed to abort multipart upload: %v, ignoring.", errCancel) - } else { - fs.Debugf(o, "aborted multipart upload: %v", uploadID) - } - return false, err - } - return shouldRetry(ctx, resp.HTTPResponse(), err) - }) - return resp, err -} - -func (o *Object) isMultiPartUploadCorrupted(err error) bool { - if err == nil { - return false - } - // Check if this ocierr object, and if it is multipart commit error - if ociError, ok := err.(common.ServiceError); ok { - // If it is a timeout then we want to retry that - if ociError.GetCode() == "InvalidUploadPart" { - return true - } - } - return false + return uploadID, existingParts, err } diff --git a/backend/oracleobjectstorage/object.go b/backend/oracleobjectstorage/object.go index b9158062c..fb9562b6d 100644 --- a/backend/oracleobjectstorage/object.go +++ b/backend/oracleobjectstorage/object.go @@ -17,8 +17,6 @@ import ( "strings" "time" - "golang.org/x/net/http/httpguts" - "github.com/ncw/swift/v2" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" @@ -390,7 +388,7 @@ func isZeroLength(streamReader io.Reader) bool { // Update an object if it has changed func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { - bucketName, bucketPath := o.split() + bucketName, _ := o.split() err = o.fs.makeBucket(ctx, bucketName) if err != nil { return err @@ -398,129 +396,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // determine if we like upload single or multipart. 
size := src.Size() - multipart := size >= int64(o.fs.opt.UploadCutoff) - + multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) if isZeroLength(in) { multipart = false } - - req := objectstorage.PutObjectRequest{ - NamespaceName: common.String(o.fs.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - } - - // Set the mtime in the metadata - modTime := src.ModTime(ctx) - // Fetch metadata if --metadata is in use - meta, err := fs.GetMetadataOptions(ctx, src, options) - if err != nil { - return fmt.Errorf("failed to read metadata from source object: %w", err) - } - req.OpcMeta = make(map[string]string, len(meta)+2) - // merge metadata into request and user metadata - for k, v := range meta { - pv := common.String(v) - k = strings.ToLower(k) - switch k { - case "cache-control": - req.CacheControl = pv - case "content-disposition": - req.ContentDisposition = pv - case "content-encoding": - req.ContentEncoding = pv - case "content-language": - req.ContentLanguage = pv - case "content-type": - req.ContentType = pv - case "tier": - // ignore - case "mtime": - // mtime in meta overrides source ModTime - metaModTime, err := time.Parse(time.RFC3339Nano, v) - if err != nil { - fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err) - } else { - modTime = metaModTime - } - case "btime": - // write as metadata since we can't set it - req.OpcMeta[k] = v - default: - req.OpcMeta[k] = v - } - } - - // Set the mtime in the metadata - req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime) - - // read the md5sum if available - // - for non-multipart - // - so we can add a ContentMD5 - // - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C - // - for multipart provided checksums aren't disabled - // - so we can add the md5sum in the metadata as metaMD5Hash - var md5sumBase64 string - var md5sumHex string - if !multipart || !o.fs.opt.DisableChecksum { - md5sumHex, err = src.Hash(ctx, hash.MD5) - if err == nil && matchMd5.MatchString(md5sumHex) { - hashBytes, err := hex.DecodeString(md5sumHex) - if err == nil { - md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes) - if multipart && !o.fs.opt.DisableChecksum { - // Set the md5sum as metadata on the object if - // - a multipart upload - // - the ETag is not an MD5, e.g. when using SSE/SSE-C - // provided checksums aren't disabled - req.OpcMeta[metaMD5Hash] = md5sumBase64 - } - } - } - } - // Set the content type if it isn't set already - if req.ContentType == nil { - req.ContentType = common.String(fs.MimeType(ctx, src)) - } - if size >= 0 { - req.ContentLength = common.Int64(size) - } - if md5sumBase64 != "" { - req.ContentMD5 = &md5sumBase64 - } - o.applyPutOptions(&req, options...) 
- useBYOKPutObject(o.fs, &req) - if o.fs.opt.StorageTier != "" { - storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier) - if !ok { - return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) - } - req.StorageTier = storageTier - } - // Check metadata keys and values are valid - for key, value := range req.OpcMeta { - if !httpguts.ValidHeaderFieldName(key) { - fs.Errorf(o, "Dropping invalid metadata key %q", key) - delete(req.OpcMeta, key) - } else if value == "" { - fs.Errorf(o, "Dropping nil metadata value for key %q", key) - delete(req.OpcMeta, key) - } else if !httpguts.ValidHeaderFieldValue(value) { - fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key) - delete(req.OpcMeta, key) - } - } - if multipart { - err = o.uploadMultipart(ctx, &req, in, src) + err = o.uploadMultipart(ctx, src, in) if err != nil { return err } } else { + ui, err := o.prepareUpload(ctx, src, options) + if err != nil { + return fmt.Errorf("failed to prepare upload: %w", err) + } var resp objectstorage.PutObjectResponse err = o.fs.pacer.CallNoRetry(func() (bool, error) { - req.PutObjectBody = io.NopCloser(in) - resp, err = o.fs.srv.PutObject(ctx, req) + ui.req.PutObjectBody = io.NopCloser(in) + resp, err = o.fs.srv.PutObject(ctx, *ui.req) return shouldRetry(ctx, resp.HTTPResponse(), err) }) if err != nil { diff --git a/backend/oracleobjectstorage/options.go b/backend/oracleobjectstorage/options.go index 6442e1d8c..3eeb6899e 100644 --- a/backend/oracleobjectstorage/options.go +++ b/backend/oracleobjectstorage/options.go @@ -20,8 +20,6 @@ const ( maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) minSleep = 10 * time.Millisecond defaultCopyTimeoutDuration = fs.Duration(time.Minute) - memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long - memoryPoolUseMmap = false ) const ( @@ -61,8 +59,6 @@ type Options struct { MaxUploadParts int `config:"max_upload_parts"` UploadConcurrency int `config:"upload_concurrency"` DisableChecksum bool `config:"disable_checksum"` - MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` - MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` CopyCutoff fs.SizeSuffix `config:"copy_cutoff"` CopyTimeout fs.Duration `config:"copy_timeout"` StorageTier string `config:"storage_tier"` @@ -223,19 +219,6 @@ copied in chunks of this size. The minimum is 0 and the maximum is 5 GiB.`, Default: fs.SizeSuffix(maxSizeForCopy), Advanced: true, - }, { - Name: "memory_pool_flush_time", - Default: memoryPoolFlushTime, - Advanced: true, - Help: `How often internal memory buffer pools will be flushed. - -Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. -This option controls how often unused buffers will be removed from the pool.`, - }, { - Name: "memory_pool_use_mmap", - Default: memoryPoolUseMmap, - Advanced: true, - Help: `Whether to use mmap buffers in internal memory pool.`, }, { Name: "copy_timeout", Help: `Timeout for copy. 
diff --git a/backend/oracleobjectstorage/oracleobjectstorage.go b/backend/oracleobjectstorage/oracleobjectstorage.go index 4ff321519..e1cc2980d 100644 --- a/backend/oracleobjectstorage/oracleobjectstorage.go +++ b/backend/oracleobjectstorage/oracleobjectstorage.go @@ -23,7 +23,6 @@ import ( "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" - "github.com/rclone/rclone/lib/pool" ) // Register with Fs @@ -50,7 +49,6 @@ type Fs struct { rootDirectory string // directory part of root (if any) cache *bucket.Cache // cache for bucket creation status pacer *fs.Pacer // To pace the API calls - pool *pool.Pool // memory pool } // NewFs Initialize backend @@ -82,12 +80,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e srv: objectStorageClient, cache: bucket.NewCache(), pacer: pc, - pool: pool.New( - time.Duration(opt.MemoryPoolFlushTime), - int(opt.ChunkSize), - opt.UploadConcurrency*ci.Transfers, - opt.MemoryPoolUseMmap, - ), } f.setRoot(root) f.features = (&fs.Features{ @@ -187,19 +179,6 @@ func (f *Fs) Hashes() hash.Set { return hash.Set(hash.MD5) } -func (f *Fs) getMemoryPool(size int64) *pool.Pool { - if size == int64(f.opt.ChunkSize) { - return f.pool - } - - return pool.New( - time.Duration(f.opt.MemoryPoolFlushTime), - int(size), - f.opt.UploadConcurrency*f.ci.Transfers, - f.opt.MemoryPoolUseMmap, - ) -} - // setRoot changes the root of the Fs func (f *Fs) setRoot(root string) { f.root = parsePath(root) @@ -339,7 +318,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck remote := *object.Name remote = f.opt.Enc.ToStandardPath(remote) if !strings.HasPrefix(remote, prefix) { - // fs.Debugf(f, "Odd name received %v", object.Name) continue } remote = remote[len(prefix):] @@ -579,15 +557,15 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { }) } -func (f *Fs) abortMultiPartUpload(ctx context.Context, bucketName, bucketPath, uploadID string) (err error) { - if uploadID == "" { +func (f *Fs) abortMultiPartUpload(ctx context.Context, bucketName, bucketPath, uploadID *string) (err error) { + if uploadID == nil || *uploadID == "" { return nil } request := objectstorage.AbortMultipartUploadRequest{ NamespaceName: common.String(f.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - UploadId: common.String(uploadID), + BucketName: bucketName, + ObjectName: bucketPath, + UploadId: uploadID, } err = f.pacer.Call(func() (bool, error) { resp, err := f.srv.AbortMultipartUpload(ctx, request) @@ -610,7 +588,7 @@ func (f *Fs) cleanUpBucket(ctx context.Context, bucket string, maxAge time.Durat if operations.SkipDestructive(ctx, what, "remove pending upload") { continue } - _ = f.abortMultiPartUpload(ctx, *upload.Bucket, *upload.Object, *upload.UploadId) + _ = f.abortMultiPartUpload(ctx, upload.Bucket, upload.Object, upload.UploadId) } } else { fs.Infof(f, "MultipartUpload doesn't have sufficient details to abort.") @@ -705,12 +683,13 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( // Check the interfaces are satisfied var ( - _ fs.Fs = &Fs{} - _ fs.Copier = &Fs{} - _ fs.PutStreamer = &Fs{} - _ fs.ListRer = &Fs{} - _ fs.Commander = &Fs{} - _ fs.CleanUpper = &Fs{} + _ fs.Fs = &Fs{} + _ fs.Copier = &Fs{} + _ fs.PutStreamer = &Fs{} + _ fs.ListRer = &Fs{} + _ fs.Commander = &Fs{} + _ fs.CleanUpper = &Fs{} + _ fs.OpenChunkWriter = &Fs{} _ fs.Object = &Object{} _ fs.MimeTyper = &Object{} diff --git 
a/docs/content/oracleobjectstorage.md b/docs/content/oracleobjectstorage.md index 002ab9781..48b0b4f17 100644 --- a/docs/content/oracleobjectstorage.md +++ b/docs/content/oracleobjectstorage.md @@ -507,31 +507,6 @@ Properties: - Type: SizeSuffix - Default: 4.656Gi -#### --oos-memory-pool-flush-time - -How often internal memory buffer pools will be flushed. - -Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. -This option controls how often unused buffers will be removed from the pool. - -Properties: - -- Config: memory_pool_flush_time -- Env Var: RCLONE_OOS_MEMORY_POOL_FLUSH_TIME -- Type: Duration -- Default: 1m0s - -#### --oos-memory-pool-use-mmap - -Whether to use mmap buffers in internal memory pool. - -Properties: - -- Config: memory_pool_use_mmap -- Env Var: RCLONE_OOS_MEMORY_POOL_USE_MMAP -- Type: bool -- Default: false - #### --oos-copy-timeout Timeout for copy. diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml index c8a35d1b7..99e2a1adf 100644 --- a/fstest/test_all/config.yaml +++ b/fstest/test_all/config.yaml @@ -428,3 +428,4 @@ backends: - TestIntegration/FsMkdir/FsEncoding/trailing_CR - TestIntegration/FsMkdir/FsEncoding/trailing_LF - TestIntegration/FsMkdir/FsEncoding/leading_HT + - TestIntegration/FsMkdir/FsPutFiles/FsPutStream/0
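
With this change the backend satisfies rclone's fs.OpenChunkWriter interface: Update() now calls uploadMultipart, which delegates the chunking, concurrency and retry loop to the shared lib/multipart helper (multipart.UploadMultipart), configured with upload_concurrency and leave_parts_on_error, while objectChunkWriter supplies the per-part work (WriteChunk), the commit (Close) and the cleanup path (Abort). The sketch below is illustrative only: it drives those three methods single-threaded, without the pacer, resume or MD5-of-MD5s verification shown in the patch. The chunkWriter interface and uploadSequentially function are local names invented for the example (their signatures mirror the objectChunkWriter methods above); they are not part of rclone.

package oosexample

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// chunkWriter mirrors the three methods objectChunkWriter implements in this
// patch. It is a local stand-in for the real rclone chunk writer interface.
type chunkWriter interface {
	WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error)
	Close(ctx context.Context) error
	Abort(ctx context.Context) error
}

// uploadSequentially is a simplified, single-threaded version of the loop that
// the shared multipart helper runs concurrently: cut the source into chunks,
// upload each one, then commit the upload, aborting it if any part fails.
func uploadSequentially(ctx context.Context, cw chunkWriter, data []byte, chunkSize int64) error {
	for n := 0; int64(n)*chunkSize < int64(len(data)); n++ {
		start := int64(n) * chunkSize
		end := start + chunkSize
		if end > int64(len(data)) {
			end = int64(len(data))
		}
		// Each chunk is passed as an io.ReadSeeker so it can be rewound on retry,
		// as WriteChunk does after computing the part MD5.
		if _, err := cw.WriteChunk(ctx, n, bytes.NewReader(data[start:end])); err != nil {
			_ = cw.Abort(ctx) // discard already-uploaded parts on failure
			return fmt.Errorf("chunk %d failed: %w", n, err)
		}
	}
	return cw.Close(ctx)
}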