azureblob: implement OpenChunkWriter and multi-thread uploads #7056

This implements the OpenChunkWriter interface for azureblob which
enables multi-thread uploads.

This makes the memory controls of the s3 backend inoperative; they are
replaced with the global ones.

    --azureblob-memory-pool-flush-time
    --azureblob-memory-pool-use-mmap

By using the buffered reader this fixes excessive memory use when
uploading large files as it will share memory pages between all
readers.
This commit is contained in:
Nick Craig-Wood 2023-08-16 16:59:39 +01:00
parent 3dfcfc2caa
commit 0427177857
2 changed files with 298 additions and 273 deletions

View File

@ -5,7 +5,6 @@
package azureblob
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
@ -18,6 +17,7 @@ import (
"net/url"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
@ -33,7 +33,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
@ -46,10 +45,8 @@ import (
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/env"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
)
const (
@ -70,8 +67,6 @@ const (
emulatorAccount = "devstoreaccount1"
emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
memoryPoolUseMmap = false
)
var (
@ -337,17 +332,16 @@ to start uploading.`,
Advanced: true,
}, {
Name: "memory_pool_flush_time",
Default: memoryPoolFlushTime,
Default: fs.Duration(time.Minute),
Advanced: true,
Help: `How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.`,
Hide: fs.OptionHideBoth,
Help: `How often internal memory buffer pools will be flushed. (no longer used)`,
}, {
Name: "memory_pool_use_mmap",
Default: memoryPoolUseMmap,
Default: false,
Advanced: true,
Help: `Whether to use mmap buffers in internal memory pool.`,
Hide: fs.OptionHideBoth,
Help: `Whether to use mmap buffers in internal memory pool. (no longer used)`,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@ -432,8 +426,6 @@ type Options struct {
ArchiveTierDelete bool `config:"archive_tier_delete"`
UseEmulator bool `config:"use_emulator"`
DisableCheckSum bool `config:"disable_checksum"`
MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
Enc encoder.MultiEncoder `config:"encoding"`
PublicAccess string `config:"public_access"`
DirectoryMarkers bool `config:"directory_markers"`
@ -457,8 +449,6 @@ type Fs struct {
cache *bucket.Cache // cache for container creation status
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
pool *pool.Pool // memory pool
poolSize int64 // size of pages in memory pool
publicAccess container.PublicAccessType // Container Public Access Level
}
@ -671,13 +661,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
cache: bucket.NewCache(),
cntSVCcache: make(map[string]*container.Client, 1),
pool: pool.New(
time.Duration(opt.MemoryPoolFlushTime),
int(opt.ChunkSize),
ci.Transfers,
opt.MemoryPoolUseMmap,
),
poolSize: int64(opt.ChunkSize),
}
f.publicAccess = container.PublicAccessType(opt.PublicAccess)
f.setRoot(root)
@ -1594,19 +1577,6 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
return f.NewObject(ctx, remote)
}
func (f *Fs) getMemoryPool(size int64) *pool.Pool {
if size == int64(f.opt.ChunkSize) {
return f.pool
}
return pool.New(
time.Duration(f.opt.MemoryPoolFlushTime),
int(size),
f.ci.Transfers,
f.opt.MemoryPoolUseMmap,
)
}
// ------------------------------------------------------------
// Fs returns the parent Fs
@ -1982,8 +1952,8 @@ func (rs *readSeekCloser) Close() error {
return nil
}
// increment the slice passed in as LSB binary
func increment(xs []byte) {
// increment the array as LSB binary
func increment(xs *[8]byte) {
for i, digit := range xs {
newDigit := digit + 1
xs[i] = newDigit
@ -1994,22 +1964,43 @@ func increment(xs []byte) {
}
}
var warnStreamUpload sync.Once
// record chunk number and id for Close
type azBlock struct {
chunkNumber int
id string
}
// uploadMultipart uploads a file using multipart upload
// Implements the fs.ChunkWriter interface
type azChunkWriter struct {
chunkSize int64
size int64
f *Fs
ui uploadInfo
blocksMu sync.Mutex // protects the below
blocks []azBlock // list of blocks for finalize
binaryBlockID [8]byte // block counter as LSB first 8 bytes
o *Object
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
// Calculate correct partSize
partSize := o.fs.opt.ChunkSize
totalParts := -1
// make concurrency machinery
concurrency := o.fs.opt.UploadConcurrency
if concurrency < 1 {
concurrency = 1
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
tokens := pacer.NewTokenDispenser(concurrency)
ui, err := o.prepareUpload(ctx, src, options)
if err != nil {
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err)
}
// Calculate correct partSize
partSize := f.opt.ChunkSize
totalParts := -1
size := src.Size()
// Note that the max size of file is 4.75 TB (100 MB X 50,000
// blocks) and this is bigger than the max uncommitted block
@ -2023,13 +2014,13 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64,
// 195GB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
o.fs.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks))
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks))
})
} else {
partSize = chunksize.Calculator(o, size, blockblob.MaxBlocks, o.fs.opt.ChunkSize)
partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize)
if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) {
return fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
return -1, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
}
totalParts = int(fs.SizeSuffix(size) / partSize)
if fs.SizeSuffix(size)%partSize != 0 {
@ -2039,173 +2030,259 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64,
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, partSize)
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in, wrap := accounting.UnWrap(in)
// FIXME it would be nice to delete uncommitted blocks
// See: https://github.com/rclone/rclone/issues/5583
//
// However there doesn't seem to be an easy way of doing this other than
// by deleting the target.
//
// This means that a failed upload deletes the target which isn't ideal.
//
// Uploading a zero length blob and deleting it will remove the
// uncommitted blocks I think.
//
// Could check to see if a file exists already and if it
// doesn't then create a 0 length file and delete it to flush
// the uncommitted blocks.
//
// This is what azcopy does
// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962
// defer atexit.OnError(&err, func() {
// fs.Debugf(o, "Cancelling multipart upload")
// // Code goes here!
// })()
// Upload the chunks
var (
g, gCtx = errgroup.WithContext(ctx)
remaining = fs.SizeSuffix(size) // remaining size in file for logging only, -1 if size < 0
position = fs.SizeSuffix(0) // position in file
memPool = o.fs.getMemoryPool(int64(partSize)) // pool to get memory from
finished = false // set when we have read EOF
blocks []string // list of blocks for finalize
binaryBlockID = make([]byte, 8) // block counter as LSB first 8 bytes
)
for part := 0; !finished; part++ {
// Get a block of memory from the pool and a token which limits concurrency
tokens.Get()
buf := memPool.Get()
free := func() {
memPool.Put(buf) // return the buf
tokens.Put() // return the token
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
n, err := readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 { // end if no data
free()
break
}
finished = true
} else if err != nil {
free()
return fmt.Errorf("multipart upload failed to read source: %w", err)
}
buf = buf[:n]
// increment the blockID and save the blocks for finalize
increment(binaryBlockID)
blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
blocks = append(blocks, blockID)
// Transfer the chunk
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %d", part+1, totalParts, position, fs.SizeSuffix(size), len(buf))
g.Go(func() (err error) {
defer free()
// Upload the block, with MD5 for check
md5sum := md5.Sum(buf)
transactionalMD5 := md5sum[:]
err = o.fs.pacer.Call(func() (bool, error) {
bufferReader := bytes.NewReader(buf)
wrappedReader := wrap(bufferReader)
rs := readSeekCloser{wrappedReader, bufferReader}
options := blockblob.StageBlockOptions{
// Specify the transactional md5 for the body, to be validated by the service.
TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5),
}
_, err = blb.StageBlock(ctx, blockID, &rs, &options)
return o.fs.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("multipart upload failed to upload part: %w", err)
}
return nil
})
// ready for next block
if size >= 0 {
remaining -= partSize
}
position += partSize
chunkWriter := &azChunkWriter{
chunkSize: int64(partSize),
size: size,
f: f,
ui: ui,
o: o,
}
err = g.Wait()
fs.Debugf(o, "open chunk writer: started multipart upload")
return int64(partSize), chunkWriter, nil
}
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
if chunkNumber < 0 {
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
return -1, err
}
// Upload the block, with MD5 for check
m := md5.New()
currentChunkSize, err := io.Copy(m, reader)
if err != nil {
return err
return -1, err
}
options := blockblob.CommitBlockListOptions{
Metadata: o.getMetadata(),
Tier: parseTier(o.fs.opt.AccessTier),
HTTPHeaders: httpHeaders,
// If no data read, don't write the chunk
if currentChunkSize == 0 {
return 0, nil
}
md5sum := m.Sum(nil)
transactionalMD5 := md5sum[:]
// Finalise the upload session
err = o.fs.pacer.Call(func() (bool, error) {
_, err := blb.CommitBlockList(ctx, blocks, &options)
return o.fs.shouldRetry(ctx, err)
// increment the blockID and save the blocks for finalize
increment(&w.binaryBlockID)
blockID := base64.StdEncoding.EncodeToString(w.binaryBlockID[:])
// Save the blockID for the commit
w.blocksMu.Lock()
w.blocks = append(w.blocks, azBlock{
chunkNumber: chunkNumber,
id: blockID,
})
w.blocksMu.Unlock()
err = w.f.pacer.Call(func() (bool, error) {
// rewind the reader on retry and after reading md5
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
options := blockblob.StageBlockOptions{
// Specify the transactional md5 for the body, to be validated by the service.
TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5),
}
_, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options)
if err != nil {
if chunkNumber <= 8 {
return w.f.shouldRetry(ctx, err)
}
// retry all chunks once have done the first few
return true, err
}
return false, nil
})
if err != nil {
return fmt.Errorf("multipart upload failed to finalize: %w", err)
return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err)
}
fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes", chunkNumber+1, currentChunkSize)
return currentChunkSize, err
}
// Abort the multpart upload.
//
// FIXME it would be nice to delete uncommitted blocks.
//
// See: https://github.com/rclone/rclone/issues/5583
//
// However there doesn't seem to be an easy way of doing this other than
// by deleting the target.
//
// This means that a failed upload deletes the target which isn't ideal.
//
// Uploading a zero length blob and deleting it will remove the
// uncommitted blocks I think.
//
// Could check to see if a file exists already and if it doesn't then
// create a 0 length file and delete it to flush the uncommitted
// blocks.
//
// This is what azcopy does
// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962
func (w *azChunkWriter) Abort(ctx context.Context) error {
fs.Debugf(w.o, "multipart upload aborted (did nothing - see issue #5583)")
return nil
}
// Close and finalise the multipart upload
func (w *azChunkWriter) Close(ctx context.Context) (err error) {
// sort the completed parts by part number
sort.Slice(w.blocks, func(i, j int) bool {
return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber
})
// Create a list of block IDs
blockIDs := make([]string, len(w.blocks))
for i := range w.blocks {
blockIDs[i] = w.blocks[i].id
}
options := blockblob.CommitBlockListOptions{
Metadata: w.o.getMetadata(),
Tier: parseTier(w.f.opt.AccessTier),
HTTPHeaders: &w.ui.httpHeaders,
}
// Finalise the upload session
err = w.f.pacer.Call(func() (bool, error) {
_, err := w.ui.blb.CommitBlockList(ctx, blockIDs, &options)
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to complete multipart upload: %w", err)
}
fs.Debugf(w.o, "multipart upload finished")
return err
}
var warnStreamUpload sync.Once
// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) {
chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
OpenOptions: options,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
})
if err != nil {
return ui, err
}
return chunkWriter.(*azChunkWriter).ui, nil
}
// uploadSinglepart uploads a short blob using a single part upload
func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, ui uploadInfo) (err error) {
chunkSize := int64(o.fs.opt.ChunkSize)
// fs.Debugf(o, "Single part upload starting of object %d bytes", size)
if size > o.fs.poolSize || size < 0 {
return fmt.Errorf("internal error: single part upload size too big %d > %d", size, o.fs.opt.ChunkSize)
if size > chunkSize || size < 0 {
return fmt.Errorf("internal error: single part upload size too big %d > %d", size, chunkSize)
}
buf := o.fs.pool.Get()
defer o.fs.pool.Put(buf)
rw := multipart.NewRW()
defer fs.CheckClose(rw, &err)
n, err := readers.ReadFill(in, buf)
if err == nil {
// Check to see whether in is exactly len(buf) or bigger
var buf2 = []byte{0}
n2, err2 := readers.ReadFill(in, buf2)
if n2 != 0 || err2 != io.EOF {
return fmt.Errorf("single part upload read failed: object longer than expected (expecting %d but got > %d)", size, len(buf))
}
}
n, err := io.CopyN(rw, in, size+1)
if err != nil && err != io.EOF {
return fmt.Errorf("single part upload read failed: %w", err)
}
if int64(n) != size {
if n != size {
return fmt.Errorf("single part upload: expecting to read %d bytes but read %d", size, n)
}
b := bytes.NewReader(buf[:n])
rs := &readSeekCloser{Reader: b, Seeker: b}
rs := &readSeekCloser{Reader: rw, Seeker: rw}
options := blockblob.UploadOptions{
Metadata: o.getMetadata(),
Tier: parseTier(o.fs.opt.AccessTier),
HTTPHeaders: httpHeaders,
HTTPHeaders: &ui.httpHeaders,
}
// Don't retry, return a retry error instead
return o.fs.pacer.CallNoRetry(func() (bool, error) {
_, err = blb.Upload(ctx, rs, &options)
return o.fs.pacer.Call(func() (bool, error) {
// rewind the reader on retry
_, err = rs.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
_, err = ui.blb.Upload(ctx, rs, &options)
return o.fs.shouldRetry(ctx, err)
})
}
// Info needed for an upload
type uploadInfo struct {
blb *blockblob.Client
httpHeaders blob.HTTPHeaders
isDirMarker bool
}
// Prepare the object for upload
func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
container, containerPath := o.split()
if container == "" || containerPath == "" {
return ui, fmt.Errorf("can't upload to root - need a container")
}
// Create parent dir/bucket if not saving directory marker
_, isDirMarker := o.meta[dirMetaKey]
if !isDirMarker {
err = o.fs.mkdirParent(ctx, o.remote)
if err != nil {
return ui, err
}
}
// Update Mod time
o.updateMetadataWithModTime(src.ModTime(ctx))
if err != nil {
return ui, err
}
// Create the HTTP headers for the upload
ui.httpHeaders = blob.HTTPHeaders{
BlobContentType: pString(fs.MimeType(ctx, src)),
}
// Compute the Content-MD5 of the file. As we stream all uploads it
// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
if !o.fs.opt.DisableCheckSum {
if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
if err == nil {
ui.httpHeaders.BlobContentMD5 = sourceMD5bytes
} else {
fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
}
}
}
// Apply upload options (also allows one to overwrite content-type)
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "cache-control":
ui.httpHeaders.BlobCacheControl = pString(value)
case "content-disposition":
ui.httpHeaders.BlobContentDisposition = pString(value)
case "content-encoding":
ui.httpHeaders.BlobContentEncoding = pString(value)
case "content-language":
ui.httpHeaders.BlobContentLanguage = pString(value)
case "content-type":
ui.httpHeaders.BlobContentType = pString(value)
}
}
ui.blb = o.fs.getBlockBlobSVC(container, containerPath)
return ui, nil
}
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
@ -2221,80 +2298,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return errCantUpdateArchiveTierBlobs
}
}
container, containerPath := o.split()
if container == "" || containerPath == "" {
return fmt.Errorf("can't upload to root - need a container")
}
// Create parent dir/bucket if not saving directory marker
_, isDirMarker := o.meta[dirMetaKey]
if !isDirMarker {
err = o.fs.mkdirParent(ctx, o.remote)
if err != nil {
return err
}
}
// Update Mod time
fs.Debugf(nil, "o.meta = %+v", o.meta)
o.updateMetadataWithModTime(src.ModTime(ctx))
if err != nil {
return err
}
// Create the HTTP headers for the upload
httpHeaders := blob.HTTPHeaders{
BlobContentType: pString(fs.MimeType(ctx, src)),
}
// Compute the Content-MD5 of the file. As we stream all uploads it
// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
if !o.fs.opt.DisableCheckSum {
if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
if err == nil {
httpHeaders.BlobContentMD5 = sourceMD5bytes
} else {
fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
}
}
}
// Apply upload options (also allows one to overwrite content-type)
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "cache-control":
httpHeaders.BlobCacheControl = pString(value)
case "content-disposition":
httpHeaders.BlobContentDisposition = pString(value)
case "content-encoding":
httpHeaders.BlobContentEncoding = pString(value)
case "content-language":
httpHeaders.BlobContentLanguage = pString(value)
case "content-type":
httpHeaders.BlobContentType = pString(value)
}
}
blb := o.fs.getBlockBlobSVC(container, containerPath)
size := src.Size()
multipartUpload := size < 0 || size > o.fs.poolSize
multipartUpload := size < 0 || size > int64(o.fs.opt.ChunkSize)
var ui uploadInfo
fs.Debugf(nil, "o.meta = %+v", o.meta)
if multipartUpload {
err = o.uploadMultipart(ctx, in, size, blb, &httpHeaders)
ui, err = o.uploadMultipart(ctx, in, src, options...)
} else {
err = o.uploadSinglepart(ctx, in, size, blb, &httpHeaders)
ui, err = o.prepareUpload(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to prepare upload: %w", err)
}
err = o.uploadSinglepart(ctx, in, size, ui)
}
if err != nil {
return err
}
// Refresh metadata on object
if !isDirMarker {
if !ui.isDirMarker {
o.clearMetaData()
err = o.readMetaData(ctx)
if err != nil {
@ -2383,13 +2406,14 @@ func parseTier(tier string) *blob.AccessTier {
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Purger = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.Purger = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.OpenChunkWriter = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
)

View File

@ -20,17 +20,18 @@ func (f *Fs) InternalTest(t *testing.T) {
func TestIncrement(t *testing.T) {
for _, test := range []struct {
in []byte
want []byte
in [8]byte
want [8]byte
}{
{[]byte{0, 0, 0, 0}, []byte{1, 0, 0, 0}},
{[]byte{0xFE, 0, 0, 0}, []byte{0xFF, 0, 0, 0}},
{[]byte{0xFF, 0, 0, 0}, []byte{0, 1, 0, 0}},
{[]byte{0, 1, 0, 0}, []byte{1, 1, 0, 0}},
{[]byte{0xFF, 0xFF, 0xFF, 0xFE}, []byte{0, 0, 0, 0xFF}},
{[]byte{0xFF, 0xFF, 0xFF, 0xFF}, []byte{0, 0, 0, 0}},
{[8]byte{0, 0, 0, 0}, [8]byte{1, 0, 0, 0}},
{[8]byte{0xFE, 0, 0, 0}, [8]byte{0xFF, 0, 0, 0}},
{[8]byte{0xFF, 0, 0, 0}, [8]byte{0, 1, 0, 0}},
{[8]byte{0, 1, 0, 0}, [8]byte{1, 1, 0, 0}},
{[8]byte{0xFF, 0xFF, 0xFF, 0xFE}, [8]byte{0, 0, 0, 0xFF}},
{[8]byte{0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 1}},
{[8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 0, 0, 0}},
} {
increment(test.in)
increment(&test.in)
assert.Equal(t, test.want, test.in)
}
}