azureblob, b2, drive: implement set upload cutoff for chunked upload tests

This commit is contained in:
Nick Craig-Wood 2018-10-13 22:45:17 +01:00
parent 6fea75afde
commit e1503add41
6 changed files with 94 additions and 19 deletions

View File

@ -256,6 +256,21 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return return
} }
func checkUploadCutoff(cs fs.SizeSuffix) error {
if cs > maxUploadCutoff {
return errors.Errorf("%v must be less than or equal to %v", cs, maxUploadCutoff)
}
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
}
return
}
// NewFs contstructs an Fs from the path, container:path // NewFs contstructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct // Parse config into Options struct
@ -265,8 +280,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
return nil, err return nil, err
} }
if opt.UploadCutoff > maxUploadCutoff { err = checkUploadCutoff(opt.UploadCutoff)
return nil, errors.Errorf("azure: upload cutoff (%v) must be less than or equal to %v", opt.UploadCutoff, maxUploadCutoff) if err != nil {
return nil, errors.Wrap(err, "azure: upload cutoff")
} }
err = checkUploadChunkSize(opt.ChunkSize) err = checkUploadChunkSize(opt.ChunkSize)
if err != nil { if err != nil {

View File

@ -27,4 +27,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs) return f.setUploadChunkSize(cs)
} }
var _ fstests.SetUploadChunkSizer = (*Fs)(nil) func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetUploadCutoffer = (*Fs)(nil)
)

View File

@ -293,6 +293,22 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
err = checkUploadChunkSize(cs) err = checkUploadChunkSize(cs)
if err == nil { if err == nil {
old, f.opt.ChunkSize = f.opt.ChunkSize, cs old, f.opt.ChunkSize = f.opt.ChunkSize, cs
f.fillBufferTokens() // reset the buffer tokens
}
return
}
func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
if cs < opt.ChunkSize {
return errors.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
}
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(&f.opt, cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
} }
return return
} }
@ -305,8 +321,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if opt.UploadCutoff < opt.ChunkSize { err = checkUploadCutoff(opt, opt.UploadCutoff)
return nil, errors.Errorf("b2: upload cutoff (%v) must be greater than or equal to chunk size (%v)", opt.UploadCutoff, opt.ChunkSize) if err != nil {
return nil, errors.Wrap(err, "b2: upload cutoff")
} }
err = checkUploadChunkSize(opt.ChunkSize) err = checkUploadChunkSize(opt.ChunkSize)
if err != nil { if err != nil {
@ -326,13 +343,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
opt.Endpoint = defaultEndpoint opt.Endpoint = defaultEndpoint
} }
f := &Fs{ f := &Fs{
name: name, name: name,
opt: *opt, opt: *opt,
bucket: bucket, bucket: bucket,
root: directory, root: directory,
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler), srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
bufferTokens: make(chan []byte, fs.Config.Transfers),
} }
f.features = (&fs.Features{ f.features = (&fs.Features{
ReadMimeType: true, ReadMimeType: true,
@ -345,10 +361,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f.srv.SetHeader(testModeHeader, testMode) f.srv.SetHeader(testModeHeader, testMode)
fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
} }
// Fill up the buffer tokens f.fillBufferTokens()
for i := 0; i < fs.Config.Transfers; i++ {
f.bufferTokens <- nil
}
err = f.authorizeAccount() err = f.authorizeAccount()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to authorize account") return nil, errors.Wrap(err, "failed to authorize account")
@ -456,6 +469,14 @@ func (f *Fs) clearUploadURL() {
f.uploadMu.Unlock() f.uploadMu.Unlock()
} }
// Fill up (or reset) the buffer tokens
func (f *Fs) fillBufferTokens() {
f.bufferTokens = make(chan []byte, fs.Config.Transfers)
for i := 0; i < fs.Config.Transfers; i++ {
f.bufferTokens <- nil
}
}
// getUploadBlock gets a block from the pool of size chunkSize // getUploadBlock gets a block from the pool of size chunkSize
func (f *Fs) getUploadBlock() []byte { func (f *Fs) getUploadBlock() []byte {
buf := <-f.bufferTokens buf := <-f.bufferTokens

View File

@ -14,7 +14,8 @@ func TestIntegration(t *testing.T) {
RemoteName: "TestB2:", RemoteName: "TestB2:",
NilObject: (*Object)(nil), NilObject: (*Object)(nil),
ChunkedUpload: fstests.ChunkedUploadConfig{ ChunkedUpload: fstests.ChunkedUploadConfig{
MinChunkSize: minChunkSize, MinChunkSize: minChunkSize,
NeedMultipleChunks: true,
}, },
}) })
} }
@ -23,4 +24,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs) return f.setUploadChunkSize(cs)
} }
var _ fstests.SetUploadChunkSizer = (*Fs)(nil) func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetUploadCutoffer = (*Fs)(nil)
)

View File

@ -806,6 +806,18 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return return
} }
func checkUploadCutoff(cs fs.SizeSuffix) error {
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
}
return
}
// NewFs contstructs an Fs from the path, container:path // NewFs contstructs an Fs from the path, container:path
func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) { func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct // Parse config into Options struct
@ -814,6 +826,10 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = checkUploadCutoff(opt.UploadCutoff)
if err != nil {
return nil, errors.Wrap(err, "drive: upload cutoff")
}
err = checkUploadChunkSize(opt.ChunkSize) err = checkUploadChunkSize(opt.ChunkSize)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "drive: chunk size") return nil, errors.Wrap(err, "drive: chunk size")

View File

@ -24,4 +24,11 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs) return f.setUploadChunkSize(cs)
} }
var _ fstests.SetUploadChunkSizer = (*Fs)(nil) func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetUploadCutoffer = (*Fs)(nil)
)