diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 021040de5..fa05c3e5b 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -256,6 +256,21 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }
 
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+	if cs > maxUploadCutoff {
+		return errors.Errorf("%v must be less than or equal to %v", cs, maxUploadCutoff)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // NewFs contstructs an Fs from the path, container:path
 func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	// Parse config into Options struct
@@ -265,8 +280,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		return nil, err
 	}
 
-	if opt.UploadCutoff > maxUploadCutoff {
-		return nil, errors.Errorf("azure: upload cutoff (%v) must be less than or equal to %v", opt.UploadCutoff, maxUploadCutoff)
+	err = checkUploadCutoff(opt.UploadCutoff)
+	if err != nil {
+		return nil, errors.Wrap(err, "azure: upload cutoff")
 	}
 	err = checkUploadChunkSize(opt.ChunkSize)
 	if err != nil {
diff --git a/backend/azureblob/azureblob_test.go b/backend/azureblob/azureblob_test.go
index bce127333..dcfd898c7 100644
--- a/backend/azureblob/azureblob_test.go
+++ b/backend/azureblob/azureblob_test.go
@@ -27,4 +27,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
-var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
diff --git a/backend/b2/b2.go b/backend/b2/b2.go
index 829de2971..77c2c5277 100644
--- a/backend/b2/b2.go
+++ b/backend/b2/b2.go
@@ -293,6 +293,22 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	err = checkUploadChunkSize(cs)
 	if err == nil {
 		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+		f.fillBufferTokens() // reset the buffer tokens
+	}
+	return
+}
+
+func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
+	if cs < opt.ChunkSize {
+		return errors.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(&f.opt, cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
 	}
 	return
 }
@@ -305,8 +321,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, err
 	}
-	if opt.UploadCutoff < opt.ChunkSize {
-		return nil, errors.Errorf("b2: upload cutoff (%v) must be greater than or equal to chunk size (%v)", opt.UploadCutoff, opt.ChunkSize)
+	err = checkUploadCutoff(opt, opt.UploadCutoff)
+	if err != nil {
+		return nil, errors.Wrap(err, "b2: upload cutoff")
 	}
 	err = checkUploadChunkSize(opt.ChunkSize)
 	if err != nil {
@@ -326,13 +343,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		opt.Endpoint = defaultEndpoint
 	}
 	f := &Fs{
-		name:         name,
-		opt:          *opt,
-		bucket:       bucket,
-		root:         directory,
-		srv:          rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
-		pacer:        pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
-		bufferTokens: make(chan []byte, fs.Config.Transfers),
+		name:   name,
+		opt:    *opt,
+		bucket: bucket,
+		root:   directory,
+		srv:    rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
+		pacer:  pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
 	}
 	f.features = (&fs.Features{
 		ReadMimeType:  true,
@@ -345,10 +361,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		f.srv.SetHeader(testModeHeader, testMode)
 		fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
 	}
-	// Fill up the buffer tokens
-	for i := 0; i < fs.Config.Transfers; i++ {
-		f.bufferTokens <- nil
-	}
+	f.fillBufferTokens()
 	err = f.authorizeAccount()
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to authorize account")
@@ -456,6 +469,14 @@ func (f *Fs) clearUploadURL() {
 	f.uploadMu.Unlock()
 }
 
+// Fill up (or reset) the buffer tokens
+func (f *Fs) fillBufferTokens() {
+	f.bufferTokens = make(chan []byte, fs.Config.Transfers)
+	for i := 0; i < fs.Config.Transfers; i++ {
+		f.bufferTokens <- nil
+	}
+}
+
 // getUploadBlock gets a block from the pool of size chunkSize
 func (f *Fs) getUploadBlock() []byte {
 	buf := <-f.bufferTokens
diff --git a/backend/b2/b2_test.go b/backend/b2/b2_test.go
index 033d6c750..11c49f22d 100644
--- a/backend/b2/b2_test.go
+++ b/backend/b2/b2_test.go
@@ -14,7 +14,8 @@ func TestIntegration(t *testing.T) {
 		RemoteName: "TestB2:",
 		NilObject:  (*Object)(nil),
 		ChunkedUpload: fstests.ChunkedUploadConfig{
-			MinChunkSize: minChunkSize,
+			MinChunkSize:       minChunkSize,
+			NeedMultipleChunks: true,
 		},
 	})
 }
@@ -23,4 +24,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
-var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
diff --git a/backend/drive/drive.go b/backend/drive/drive.go
index 139a72223..72f477190 100644
--- a/backend/drive/drive.go
+++ b/backend/drive/drive.go
@@ -806,6 +806,18 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }
 
+func checkUploadCutoff(cs fs.SizeSuffix) error {
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
 // NewFs contstructs an Fs from the path, container:path
 func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
 	// Parse config into Options struct
@@ -814,6 +826,10 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, err
 	}
+	err = checkUploadCutoff(opt.UploadCutoff)
+	if err != nil {
+		return nil, errors.Wrap(err, "drive: upload cutoff")
+	}
 	err = checkUploadChunkSize(opt.ChunkSize)
 	if err != nil {
 		return nil, errors.Wrap(err, "drive: chunk size")
diff --git a/backend/drive/drive_test.go b/backend/drive/drive_test.go
index f30899ac0..e8cbb31f3 100644
--- a/backend/drive/drive_test.go
+++ b/backend/drive/drive_test.go
@@ -24,4 +24,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
-var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
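Each test file follows the same pattern: an exported SetUploadCutoff wrapper around the internal setter plus a compile-time assertion that the Fs satisfies fstests.SetUploadCutoffer. The fstests side is not part of this patch, so the following is only a sketch: the interface shape is inferred from the wrapper methods above, and the lowerCutoffForTest helper (and the import path, which varies by rclone version) is hypothetical, illustrating how an integration test could drop the cutoff so small test objects take the chunked-upload path and then restore the original value.

// Sketch only: inferred from the SetUploadCutoff methods added in this
// patch; the authoritative definitions live in fstest/fstests.
package fstests

import "github.com/ncw/rclone/fs" // assumed import path for this era of rclone

// SetUploadCutoffer is implemented by backends whose upload cutoff can be
// changed at runtime for testing (see the assertions in the *_test.go files).
type SetUploadCutoffer interface {
	// SetUploadCutoff sets a new cutoff and returns the previous value.
	SetUploadCutoff(fs.SizeSuffix) (fs.SizeSuffix, error)
}

// lowerCutoffForTest is a hypothetical helper: it lowers the cutoff so that
// small test objects exercise the multipart/chunked upload path, and returns
// a restore function that puts the original value back.
func lowerCutoffForTest(f fs.Fs, cutoff fs.SizeSuffix) (restore func(), err error) {
	do, ok := f.(SetUploadCutoffer)
	if !ok {
		return func() {}, nil // backend cannot adjust its cutoff; nothing to do
	}
	old, err := do.SetUploadCutoff(cutoff)
	if err != nil {
		return nil, err
	}
	return func() { _, _ = do.SetUploadCutoff(old) }, nil
}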