From 9bf3d3da4c3996dca64dca53aa4cc984227d34d8 Mon Sep 17 00:00:00 2001 From: Tim Gallant Date: Mon, 10 Feb 2020 01:01:28 -0800 Subject: [PATCH] fs: add UploadHeaders, DownloadHeaders to Update/Put/Open options --- fs/operations/multithread.go | 2 +- fs/operations/operations.go | 14 ++++++-- fs/operations/reopen.go | 65 +++++++++++++++++++++--------------- fs/operations/reopen_test.go | 2 +- 4 files changed, 52 insertions(+), 31 deletions(-) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 54c228583..c5bed49bf 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -71,7 +71,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) - rc, err := NewReOpen(ctx, mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries) + rc, err := NewReOpen(ctx, mc.src, fs.Config.LowLevelRetries, &fs.RangeOption{Start: start, End: end - 1}) if err != nil { return errors.Wrap(err, "multpart copy: failed to open source") } diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 2d3a9046e..4d525459d 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -403,7 +403,11 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj } } else { var in0 io.ReadCloser - in0, err = NewReOpen(ctx, src, hashOption, nil, fs.Config.LowLevelRetries) + options := []fs.OpenOption{hashOption} + for _, option := range fs.Config.DownloadHeaders { + options = append(options, option) + } + in0, err = NewReOpen(ctx, src, fs.Config.LowLevelRetries, options...) if err != nil { err = errors.Wrap(err, "failed to open source object") } else { @@ -424,12 +428,16 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj if src.Remote() != remote { wrappedSrc = NewOverrideRemote(src, remote) } + options := []fs.OpenOption{hashOption} + for _, option := range fs.Config.UploadHeaders { + options = append(options, option) + } if doUpdate { actionTaken = "Copied (replaced existing)" - err = dst.Update(ctx, in, wrappedSrc, hashOption) + err = dst.Update(ctx, in, wrappedSrc, options...) } else { actionTaken = "Copied (new)" - dst, err = f.Put(ctx, in, wrappedSrc, hashOption) + dst, err = f.Put(ctx, in, wrappedSrc, options...) } closeErr := in.Close() if err == nil { diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go index bd45f96a5..688c5d057 100644 --- a/fs/operations/reopen.go +++ b/fs/operations/reopen.go @@ -12,17 +12,16 @@ import ( // ReOpen is a wrapper for an object reader which reopens the stream on error type ReOpen struct { - ctx context.Context - mu sync.Mutex // mutex to protect the below - src fs.Object // object to open - hashOption *fs.HashesOption // option to pass to initial open - rangeOption *fs.RangeOption // option to pass to initial open - rc io.ReadCloser // underlying stream - read int64 // number of bytes read from this stream - maxTries int // maximum number of retries - tries int // number of retries we've had so far in this stream - err error // if this is set then Read/Close calls will return it - opened bool // if set then rc is valid and needs closing + ctx context.Context + mu sync.Mutex // mutex to protect the below + src fs.Object // object to open + options []fs.OpenOption // option to pass to initial open + rc io.ReadCloser // underlying stream + read int64 // number of bytes read from this stream + maxTries int // maximum number of retries + tries int // number of retries we've had so far in this stream + err error // if this is set then Read/Close calls will return it + opened bool // if set then rc is valid and needs closing } var ( @@ -36,13 +35,12 @@ var ( // // If rangeOption is set then this will applied when reading from the // start, and updated on retries. -func NewReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) { +func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) { h := &ReOpen{ - ctx: ctx, - src: src, - hashOption: hashOption, - rangeOption: rangeOption, - maxTries: maxTries, + ctx: ctx, + src: src, + maxTries: maxTries, + options: options, } h.mu.Lock() defer h.mu.Unlock() @@ -57,20 +55,35 @@ func NewReOpen(ctx context.Context, src fs.Object, hashOption *fs.HashesOption, // // we don't retry here as the Open() call will itself have low level retries func (h *ReOpen) open() error { - var optsArray [2]fs.OpenOption - var opts = optsArray[:0] - if h.read == 0 { - if h.rangeOption != nil { - opts = append(opts, h.rangeOption) + opts := []fs.OpenOption{} + var hashOption *fs.HashesOption + var rangeOption *fs.RangeOption + for _, option := range h.options { + switch option.(type) { + case *fs.HashesOption: + hashOption = option.(*fs.HashesOption) + case *fs.RangeOption: + rangeOption = option.(*fs.RangeOption) + case *fs.HTTPOption: + opts = append(opts, option) + default: + if option.Mandatory() { + fs.Logf(h.src, "Unsupported mandatory option: %v", option) + } } - if h.hashOption != nil { + } + if h.read == 0 { + if rangeOption != nil { + opts = append(opts, rangeOption) + } + if hashOption != nil { // put hashOption on if reading from the start, ditch otherwise - opts = append(opts, h.hashOption) + opts = append(opts, hashOption) } } else { - if h.rangeOption != nil { + if rangeOption != nil { // range to the read point - opts = append(opts, &fs.RangeOption{Start: h.rangeOption.Start + h.read, End: h.rangeOption.End}) + opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End}) } else { // seek to the read point opts = append(opts, &fs.SeekOption{Offset: h.read}) diff --git a/fs/operations/reopen_test.go b/fs/operations/reopen_test.go index dac33e3cd..0faecb812 100644 --- a/fs/operations/reopen_test.go +++ b/fs/operations/reopen_test.go @@ -74,7 +74,7 @@ func TestReOpen(t *testing.T) { breaks: breaks, } hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} - return NewReOpen(context.Background(), src, hashOption, rangeOption, maxRetries) + return NewReOpen(context.Background(), src, maxRetries, hashOption, rangeOption) } t.Run("Basics", func(t *testing.T) {