diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index b18a2460b..dc4b0cb1e 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -49,6 +49,7 @@ const ( storageDefaultBaseURL = "blob.core.windows.net" defaultChunkSize = 4 * fs.MebiByte maxChunkSize = 100 * fs.MebiByte + uploadConcurrency = 4 defaultAccessTier = azblob.AccessTierNone maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing) // Default storage account, key and blob endpoint for emulator support, @@ -1492,6 +1493,48 @@ func increment(xs []byte) { } } +// poolWrapper wraps a pool.Pool as an azblob.TransferManager +type poolWrapper struct { + pool *pool.Pool + bufToken chan struct{} + runToken chan struct{} +} + +// newPoolWrapper creates an azblob.TransferManager that will use a +// pool.Pool with maximum concurrency as specified. +func (f *Fs) newPoolWrapper(concurrency int) azblob.TransferManager { + return &poolWrapper{ + pool: f.pool, + bufToken: make(chan struct{}, concurrency), + runToken: make(chan struct{}, concurrency), + } +} + +// Get implements TransferManager.Get(). +func (pw *poolWrapper) Get() []byte { + pw.bufToken <- struct{}{} + return pw.pool.Get() +} + +// Put implements TransferManager.Put(). +func (pw *poolWrapper) Put(b []byte) { + pw.pool.Put(b) + <-pw.bufToken +} + +// Run implements TransferManager.Run(). +func (pw *poolWrapper) Run(f func()) { + pw.runToken <- struct{}{} + go func() { + f() + <-pw.runToken + }() +} + +// Close implements TransferManager.Close(). +func (pw *poolWrapper) Close() { +} + // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned @@ -1538,9 +1581,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op putBlobOptions := azblob.UploadStreamToBlockBlobOptions{ BufferSize: int(o.fs.opt.ChunkSize), - MaxBuffers: 4, + MaxBuffers: uploadConcurrency, Metadata: o.meta, BlobHTTPHeaders: httpHeaders, + TransferManager: o.fs.newPoolWrapper(uploadConcurrency), } // Don't retry, return a retry error instead diff --git a/go.mod b/go.mod index 5a8b12a74..dcf1d9335 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( bazil.org/fuse v0.0.0-20200524192727-fb710f7dfd05 cloud.google.com/go v0.70.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 - github.com/Azure/azure-storage-blob-go v0.12.0 + github.com/Azure/azure-storage-blob-go v0.13.0 github.com/Azure/go-autorest/autorest/adal v0.9.8 github.com/Unknwon/goconfig v0.0.0-20200908083735-df7de6a44db8 github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e diff --git a/go.sum b/go.sum index ab4aedaf8..ec889127b 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVt github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-storage-blob-go v0.12.0 h1:7bFXA1QB+lOK2/ASWHhp6/vnxjaeeZq6t8w1Jyp0Iaw= github.com/Azure/azure-storage-blob-go v0.12.0/go.mod h1:A0u4VjtpgZJ7Y7um/+ix2DHBuEKFC6sEIlj0xc13a4Q= +github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc= +github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest/adal v0.9.2 h1:Aze/GQeAN1RRbGmnUJvUj+tFGBzFdIg3293/A9rbxC4=