diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go index a2fdb61ef..4e0ae5091 100644 --- a/dropbox/dropbox.go +++ b/dropbox/dropbox.go @@ -813,8 +813,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size if size != -1 { chunks = int(size/chunkSize) + 1 } - wc := &writeCounter{} - in := io.TeeReader(in0, wc) + in := fs.NewCountingReader(in0) fmtChunk := func(cur int, last bool) { if chunks == 0 && last { @@ -853,12 +852,12 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size // if the size is known, only upload full chunks. Remaining bytes are uploaded with // the UploadSessionFinish request. break - } else if chunks == 0 && wc.Written-cursor.Offset < uint64(chunkSize) { + } else if chunks == 0 && in.BytesRead()-cursor.Offset < uint64(chunkSize) { // if the size is unknown, upload as long as we can read full chunks from the reader. // The UploadSessionFinish request will not contain any payload. break } - cursor.Offset = wc.Written + cursor.Offset = in.BytesRead() fmtChunk(currentChunk, false) err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) @@ -871,7 +870,7 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size } // write the remains - cursor.Offset = wc.Written + cursor.Offset = in.BytesRead() args := &files.UploadSessionFinishArg{ Cursor: &cursor, Commit: commitInfo, @@ -929,17 +928,6 @@ func (o *Object) Remove() (err error) { return err } -type writeCounter struct { - Written uint64 -} - -// Write implements the io.Writer interface. -func (wc *writeCounter) Write(p []byte) (int, error) { - n := len(p) - wc.Written += uint64(n) - return n, nil -} - // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) diff --git a/fs/counting_reader.go b/fs/counting_reader.go new file mode 100644 index 000000000..20629dad3 --- /dev/null +++ b/fs/counting_reader.go @@ -0,0 +1,28 @@ +package fs + +import "io" + +// NewCountingReader returns a CountingReader, which will read from the given +// reader while keeping track of how many bytes were read. +func NewCountingReader(in io.Reader) *CountingReader { + return &CountingReader{in: in} +} + +// CountingReader holds a reader and a read count of how many bytes were read +// so far. +type CountingReader struct { + in io.Reader + read uint64 +} + +// Read reads from the underlying reader. +func (cr *CountingReader) Read(b []byte) (int, error) { + n, err := cr.in.Read(b) + cr.read += uint64(n) + return n, err +} + +// BytesRead returns how many bytes were read from the underlying reader so far. +func (cr *CountingReader) BytesRead() uint64 { + return cr.read +} diff --git a/yandex/yandex.go b/yandex/yandex.go index 5a08ae2ea..1cea449ce 100644 --- a/yandex/yandex.go +++ b/yandex/yandex.go @@ -421,6 +421,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs. return o, o.Update(in, src, options...) } +// PutStream uploads to the remote path with the modTime given of indeterminate size +func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return f.Put(in, src, options...) +} + // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(dir string) error { root := f.diskRoot @@ -565,8 +570,8 @@ func (o *Object) remotePath() string { // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - size := src.Size() +func (o *Object) Update(in0 io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + in := fs.NewCountingReader(in0) modTime := src.ModTime() remote := o.remotePath() @@ -581,7 +586,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio err := o.fs.yd.Upload(in, remote, overwrite, mimeType) if err == nil { //if file uploaded sucessfully then return metadata - o.bytes = uint64(size) + o.bytes = in.BytesRead() o.modTime = modTime o.md5sum = "" // according to unit tests after put the md5 is empty. //and set modTime of uploaded file @@ -648,9 +653,10 @@ func (o *Object) MimeType() string { // Check the interfaces are satisfied var ( - _ fs.Fs = (*Fs)(nil) - _ fs.Purger = (*Fs)(nil) - _ fs.ListRer = (*Fs)(nil) + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) //_ fs.Copier = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil) _ fs.Object = (*Object)(nil)