From 4a1b644bfb3af26d80386a3df26316ec5fcd94cf Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Wed, 13 May 2020 21:29:21 +0100
Subject: [PATCH] azureblob: implement streaming of unknown sized files

See: https://forum.rclone.org/t/rclone-rcat-azure-blob-container-sas-token-403-error/16286/3
---
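As background for review, here is a minimal, self-contained sketch of the
streaming upload shape this patch introduces: read fixed-size buffers from a
stream of unknown length until EOF, stage each buffer as a block under an
incrementing 8 byte little-endian ID, then commit the list of IDs. This is an
illustration only: stageBlock is a hypothetical stand-in for azblob's
BlockBlobURL.StageBlock, and io.ReadFull approximates lib/readers.ReadFill
(which never returns 0, nil).

    package main

    import (
        "encoding/base64"
        "encoding/binary"
        "fmt"
        "io"
        "strings"
    )

    // streamUpload reads chunkSize buffers from in until EOF, "stages" each
    // one under an incrementing 8 byte little-endian block ID and returns
    // the staged IDs in order, ready to be committed as the block list.
    func streamUpload(in io.Reader, chunkSize int) ([]string, error) {
        var blocks []string
        buf := make([]byte, chunkSize)
        for part := uint64(1); ; part++ {
            // approximates readers.ReadFill: io.EOF means no data at all,
            // io.ErrUnexpectedEOF means a short (final) chunk
            n, err := io.ReadFull(in, buf)
            if n == 0 {
                break // stream ended exactly on a chunk boundary
            }
            if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
                return nil, err
            }
            binaryBlockID := make([]byte, 8) // all block IDs must be the same length
            binary.LittleEndian.PutUint64(binaryBlockID, part)
            blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
            blocks = append(blocks, blockID)
            stageBlock(blockID, buf[:n]) // stand-in for BlockBlobURL.StageBlock
            if err != nil {
                break // a short read means that was the final chunk
            }
        }
        return blocks, nil // the caller would now commit these as the block list
    }

    func stageBlock(id string, data []byte) {
        fmt.Printf("staged block %s with %d bytes\n", id, len(data))
    }

    func main() {
        blocks, err := streamUpload(strings.NewReader(strings.Repeat("x", 10)), 4)
        fmt.Println(blocks, err)
    }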
 backend/azureblob/azureblob.go               | 214 ++++++++++---------
 backend/azureblob/azureblob_internal_test.go |  17 ++
 docs/content/overview.md                     |   2 +-
 3 files changed, 128 insertions(+), 105 deletions(-)

diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 435c4659c..9a6568b42 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -9,7 +9,6 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/base64"
-	"encoding/binary"
 	"encoding/hex"
 	"fmt"
 	"io"
@@ -36,6 +35,8 @@ import (
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/pool"
+	"github.com/rclone/rclone/lib/readers"
+	"golang.org/x/sync/errgroup"
 )

 const (
@@ -857,6 +858,11 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
 	return fs, fs.Update(ctx, in, src, options...)
 }

+// PutStream uploads to the remote path with the modTime given of indeterminate size
+func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
+	return f.Put(ctx, in, src, options...)
+}
+
 // Mkdir creates the container if it doesn't exist
 func (f *Fs) Mkdir(ctx context.Context, dir string) error {
 	container, _ := f.split(dir)
@@ -1279,141 +1285,140 @@ type readSeeker struct {
 	io.Seeker
 }

+// increment the slice passed in as LSB binary
+func increment(xs []byte) {
+	for i, digit := range xs {
+		newDigit := digit + 1
+		xs[i] = newDigit
+		if newDigit >= digit {
+			// exit if no carry
+			break
+		}
+	}
+}
+
+var warnStreamUpload sync.Once
+
 // uploadMultipart uploads a file using multipart upload
 //
 // Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
-func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
+func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
 	// Calculate correct chunkSize
 	chunkSize := int64(o.fs.opt.ChunkSize)
-	var totalParts int64
-	for {
-		// Calculate number of parts
-		var remainder int64
-		totalParts, remainder = size/chunkSize, size%chunkSize
-		if remainder != 0 {
-			totalParts++
+	totalParts := -1
+
+	// Note that the max size of file is 4.75 TB (100 MB X 50,000
+	// blocks) and this is smaller than the max uncommitted block
+	// size (9.52 TB) so we do not need to part commit block lists
+	// or garbage collect uncommitted blocks.
+	//
+	// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
+
+	// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
+	// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
+	// 195GB which seems like a not too unreasonable limit.
+	if size == -1 {
+		warnStreamUpload.Do(func() {
+			fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				o.fs.opt.ChunkSize, fs.SizeSuffix(chunkSize*maxTotalParts))
+		})
+	} else {
+		// Adjust chunkSize until the number of parts is small enough.
+		if size/chunkSize >= maxTotalParts {
+			// Calculate partition size rounded up to the nearest MB
+			chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
 		}
-		if totalParts < maxTotalParts {
-			break
-		}
-		// Double chunk size if the number of parts is too big
-		chunkSize *= 2
 		if chunkSize > int64(maxChunkSize) {
 			return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2))
 		}
+		totalParts = int(size / chunkSize)
+		if size%chunkSize != 0 {
+			totalParts++
+		}
 	}
+
 	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))

-	// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
-	// Utilities are cloned from above example
-	// These helper functions convert a binary block ID to a base-64 string and vice versa
-	// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
-	blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
-	// These helper functions convert an int block ID to a base-64 string and vice versa
-	blockIDIntToBase64 := func(blockID uint64) string {
-		binaryBlockID := (&[8]byte{})[:] // All block IDs are 8 bytes long
-		binary.LittleEndian.PutUint64(binaryBlockID, blockID)
-		return blockIDBinaryToBase64(binaryBlockID)
-	}
-
-	// block ID variables
-	var (
-		rawID   uint64
-		blockID = "" // id in base64 encoded form
-		blocks  []string
-	)
-
-	// increment the blockID
-	nextID := func() {
-		rawID++
-		blockID = blockIDIntToBase64(rawID)
-		blocks = append(blocks, blockID)
-	}
-
-	// Get BlockBlobURL, we will use default pipeline here
-	blockBlobURL := blob.ToBlockBlobURL()
-	ctx := context.Background()
-	ac := azblob.LeaseAccessConditions{} // Use default lease access conditions
-
 	// unwrap the accounting from the input, we use wrap to put it
 	// back on after the buffering
 	in, wrap := accounting.UnWrap(in)

 	// Upload the chunks
-	remaining := size
-	position := int64(0)
-	errs := make(chan error, 1)
-	var wg sync.WaitGroup
-	memPool := o.fs.getMemoryPool(chunkSize)
-outer:
-	for part := 0; part < int(totalParts); part++ {
-		// Check any errors
-		select {
-		case err = <-errs:
-			break outer
-		default:
-		}
-
-		reqSize := remaining
-		if reqSize >= chunkSize {
-			reqSize = chunkSize
-		}
-
+	var (
+		g, gCtx       = errgroup.WithContext(ctx)
+		remaining     = size                           // remaining size in file for logging only, -1 if size < 0
+		position      = int64(0)                       // position in file
+		memPool       = o.fs.getMemoryPool(chunkSize)  // pool to get memory from
+		finished      = false                          // set when we have read EOF
+		blocks        []string                         // list of blocks for finalize
+		blockBlobURL  = blob.ToBlockBlobURL()          // Get BlockBlobURL, we will use default pipeline here
+		ac            = azblob.LeaseAccessConditions{} // Use default lease access conditions
+		binaryBlockID = make([]byte, 8)                // block counter as LSB first 8 bytes
+	)
+	for part := 0; !finished; part++ {
 		// Get a block of memory from the pool and a token which limits concurrency
 		o.fs.uploadToken.Get()
 		buf := memPool.Get()
-		buf = buf[:reqSize]

-		// Read the chunk
-		_, err = io.ReadFull(in, buf)
-		if err != nil {
-			err = errors.Wrap(err, "multipart upload failed to read source")
+		free := func() {
 			memPool.Put(buf)       // return the buf
 			o.fs.uploadToken.Put() // return the token
-			break outer
 		}

+		// Fail fast, in case an errgroup managed function returns an error
+		// gCtx is cancelled. There is no point in uploading all the other parts.
+		if gCtx.Err() != nil {
+			free()
+			break
+		}
+
+		// Read the chunk
+		n, err := readers.ReadFill(in, buf) // this can never return 0, nil
+		if err == io.EOF {
+			if n == 0 { // end if no data
+				free()
+				break
+			}
+			finished = true
+		} else if err != nil {
+			free()
+			return errors.Wrap(err, "multipart upload failed to read source")
+		}
+		buf = buf[:n]
+
+		// increment the blockID and save the blocks for finalize
+		increment(binaryBlockID)
+		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
+		blocks = append(blocks, blockID)
+
 		// Transfer the chunk
-		nextID()
-		wg.Add(1)
-		go func(part int, position int64, blockID string) {
-			defer wg.Done()
-			defer o.fs.uploadToken.Put()
-			defer memPool.Put(buf)
-			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
+		fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
+		g.Go(func() (err error) {
+			defer free()

 			// Upload the block, with MD5 for check
 			md5sum := md5.Sum(buf)
 			transactionalMD5 := md5sum[:]
-			err := o.fs.pacer.Call(func() (bool, error) {
+			err = o.fs.pacer.Call(func() (bool, error) {
 				bufferReader := bytes.NewReader(buf)
 				wrappedReader := wrap(bufferReader)
 				rs := readSeeker{wrappedReader, bufferReader}
 				_, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5)
 				return o.fs.shouldRetry(err)
 			})
 			if err != nil {
-				err = errors.Wrap(err, "multipart upload failed to upload part")
-				select {
-				case errs <- err:
-				default:
-				}
-				return
+				return errors.Wrap(err, "multipart upload failed to upload part")
 			}
-		}(part, position, blockID)
+			return nil
+		})

 		// ready for next block
-		remaining -= chunkSize
+		if size >= 0 {
+			remaining -= chunkSize
+		}
 		position += chunkSize
 	}
-	wg.Wait()
-	if err == nil {
-		select {
-		case err = <-errs:
-		default:
-		}
-	}
+	err = g.Wait()
 	if err != nil {
 		return err
 	}
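A quick aside on the arithmetic above (illustrative only, with maxTotalParts
and the default chunk size inlined rather than imported from the backend): for
a known size, the patch now rounds size/maxTotalParts up to the next whole MiB
in one step instead of repeatedly doubling the chunk size. For a 1 TiB file:

    package main

    import "fmt"

    func main() {
        const maxTotalParts = 50000 // Azure limit on blocks per blob
        size := int64(1) << 40      // 1 TiB upload with a known size
        chunkSize := int64(4 << 20) // 4 MiB default chunk size

        if size/chunkSize >= maxTotalParts {
            // round size/maxTotalParts up to the next whole MiB
            chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
        }
        totalParts := size / chunkSize
        if size%chunkSize != 0 {
            totalParts++
        }
        fmt.Printf("%d MiB chunks, %d parts\n", chunkSize>>20, totalParts)
        // Output: 21 MiB chunks, 49933 parts
    }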
@@ -1473,7 +1478,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	// is merged the SDK can't upload a single blob of exactly the chunk
 	// size, so upload with a multpart upload to work around.
 	// See: https://github.com/rclone/rclone/issues/2653
-	multipartUpload := size >= int64(o.fs.opt.UploadCutoff)
+	multipartUpload := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
 	if size == int64(o.fs.opt.ChunkSize) {
 		multipartUpload = true
 		fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size)
@@ -1483,7 +1488,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		if multipartUpload {
 			// If a large file upload in chunks
-			err = o.uploadMultipart(in, size, &blob, &httpHeaders)
+			err = o.uploadMultipart(ctx, in, size, &blob, &httpHeaders)
 		} else {
 			// Write a small blob in one transaction
 			blockBlobURL := blob.ToBlockBlobURL()
@@ -1568,12 +1573,13 @@ func (o *Object) GetTier() string {

 // Check the interfaces are satisfied
 var (
-	_ fs.Fs        = &Fs{}
-	_ fs.Copier    = &Fs{}
-	_ fs.Purger    = &Fs{}
-	_ fs.ListRer   = &Fs{}
-	_ fs.Object    = &Object{}
-	_ fs.MimeTyper = &Object{}
-	_ fs.GetTierer = &Object{}
-	_ fs.SetTierer = &Object{}
+	_ fs.Fs          = &Fs{}
+	_ fs.Copier      = &Fs{}
+	_ fs.PutStreamer = &Fs{}
+	_ fs.Purger      = &Fs{}
+	_ fs.ListRer     = &Fs{}
+	_ fs.Object      = &Object{}
+	_ fs.MimeTyper   = &Object{}
+	_ fs.GetTierer   = &Object{}
+	_ fs.SetTierer   = &Object{}
 )
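The TestIncrement cases added below pin down the carry and wraparound
behaviour of the new counter. A related property, shown here as an
illustration rather than as part of the patch, is that the 8 byte
little-endian counter always encodes to a fixed-length base64 string; the
removed helper's NOTE comment records that all block IDs for a blob must be
the same length.

    package main

    import (
        "encoding/base64"
        "fmt"
    )

    // increment is copied from the patch: it treats xs as a little-endian
    // binary counter and adds one, carrying between bytes.
    func increment(xs []byte) {
        for i, digit := range xs {
            newDigit := digit + 1
            xs[i] = newDigit
            if newDigit >= digit {
                // exit if no carry
                break
            }
        }
    }

    func main() {
        id := make([]byte, 8)
        for i := 0; i < 3; i++ {
            increment(id)
            fmt.Println(base64.StdEncoding.EncodeToString(id))
        }
        // Output:
        // AQAAAAAAAAA=
        // AgAAAAAAAAA=
        // AwAAAAAAAAA=
    }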
diff --git a/backend/azureblob/azureblob_internal_test.go b/backend/azureblob/azureblob_internal_test.go
index 0ccd006b6..17fcb6272 100644
--- a/backend/azureblob/azureblob_internal_test.go
+++ b/backend/azureblob/azureblob_internal_test.go
@@ -16,3 +16,20 @@ func (f *Fs) InternalTest(t *testing.T) {
 	enabled = f.Features().GetTier
 	assert.True(t, enabled)
 }
+
+func TestIncrement(t *testing.T) {
+	for _, test := range []struct {
+		in   []byte
+		want []byte
+	}{
+		{[]byte{0, 0, 0, 0}, []byte{1, 0, 0, 0}},
+		{[]byte{0xFE, 0, 0, 0}, []byte{0xFF, 0, 0, 0}},
+		{[]byte{0xFF, 0, 0, 0}, []byte{0, 1, 0, 0}},
+		{[]byte{0, 1, 0, 0}, []byte{1, 1, 0, 0}},
+		{[]byte{0xFF, 0xFF, 0xFF, 0xFE}, []byte{0, 0, 0, 0xFF}},
+		{[]byte{0xFF, 0xFF, 0xFF, 0xFF}, []byte{0, 0, 0, 0}},
+	} {
+		increment(test.in)
+		assert.Equal(t, test.want, test.in)
+	}
+}
diff --git a/docs/content/overview.md b/docs/content/overview.md
index 4adb7acb5..62b7ab63d 100644
--- a/docs/content/overview.md
+++ b/docs/content/overview.md
@@ -336,7 +336,7 @@ operations more efficient.
 | Mail.ru Cloud | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | Yes |
 | Mega | Yes | No | Yes | Yes | Yes | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | Yes |
 | Memory | No | Yes | No | No | No | Yes | Yes | No | No | No |
-| Microsoft Azure Blob Storage | Yes | Yes | No | No | No | Yes | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
+| Microsoft Azure Blob Storage | Yes | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
 | Microsoft OneDrive | Yes | Yes | Yes | Yes | No [#575](https://github.com/rclone/rclone/issues/575) | No | No | Yes | Yes | Yes |
 | OpenDrive | Yes | Yes | Yes | Yes | No | No | No | No | No | Yes |
 | Openstack Swift | Yes † | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | No |
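On the concurrency side, uploadMultipart now uses errgroup.WithContext in
place of the old WaitGroup plus hand-rolled error channel: the first part that
fails cancels gCtx, and the producer loop stops handing out new chunks. A
minimal sketch of that fail-fast shape (assuming golang.org/x/sync/errgroup is
available; the simulated failure stands in for a StageBlock error):

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        g, gCtx := errgroup.WithContext(context.Background())
        for part := 0; part < 10; part++ {
            if gCtx.Err() != nil {
                break // a previous part failed, so stop producing work
            }
            part := part // capture the loop variable for the closure
            g.Go(func() error {
                if part == 3 {
                    return fmt.Errorf("part %d: simulated upload failure", part)
                }
                fmt.Println("uploaded part", part)
                return nil
            })
        }
        // Wait returns the first error, mirroring err = g.Wait() in the patch
        if err := g.Wait(); err != nil {
            fmt.Println("multipart upload failed:", err)
        }
    }

With the patch applied, piped uploads such as
'cat big.file | rclone rcat azureblob:container/path' no longer need the size
up front, subject to the roughly 195 GB ceiling noted in uploadMultipart for
the default 4 MB chunk size.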