From 67572449180cebf90aceace7a40e26d8b2fdc861 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 11 May 2019 10:03:51 +0100 Subject: [PATCH] drive: use multipart resumable uploads for streaming and uploads in mount Before this change we used non multipart uploads for files of unknown size (streaming and uploads in mount). This is slower and less reliable and is not recommended by Google for files smaller than 5MB. After this change we use multipart resumable uploads for all files of unknown length. This will use an extra transaction so is less efficient for files under the chunk size, however the natural buffering in the operations.Rcat call specified by `--streaming-upload-cutoff` will overcome this. See: https://forum.rclone.org/t/upload-behaviour-and-speed-when-using-vfs-cache/9920/ --- backend/drive/drive.go | 4 +- backend/drive/upload.go | 81 +++++++++++++++++++---------------------- 2 files changed, 39 insertions(+), 46 deletions(-) diff --git a/backend/drive/drive.go b/backend/drive/drive.go index 6cc17e163..44f0a7fa8 100644 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -1851,7 +1851,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, } var info *drive.File - if size == 0 || size < int64(f.opt.UploadCutoff) { + if size >= 0 && size < int64(f.opt.UploadCutoff) { // Make the API request to upload metadata and file data. // Don't retry, return a retry error instead err = f.pacer.CallNoRetry(func() (bool, error) { @@ -2845,7 +2845,7 @@ func (o *baseObject) update(ctx context.Context, updateInfo *drive.File, uploadM src fs.ObjectInfo) (info *drive.File, err error) { // Make the API request to upload metadata and file data. size := src.Size() - if size == 0 || size < int64(o.fs.opt.UploadCutoff) { + if size >= 0 && size < int64(o.fs.opt.UploadCutoff) { // Don't retry, return a retry error instead err = o.fs.pacer.CallNoRetry(func() (bool, error) { info, err = o.fs.svc.Files.Update(o.id, updateInfo). diff --git a/backend/drive/upload.go b/backend/drive/upload.go index ebbe3ea16..559aa155a 100644 --- a/backend/drive/upload.go +++ b/backend/drive/upload.go @@ -11,16 +11,15 @@ package drive import ( + "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" - "regexp" "strconv" - "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/lib/readers" @@ -88,7 +87,9 @@ func (f *Fs) Upload(ctx context.Context, in io.Reader, size int64, contentType, }) req.Header.Set("Content-Type", "application/json; charset=UTF-8") req.Header.Set("X-Upload-Content-Type", contentType) - req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size)) + if size >= 0 { + req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size)) + } res, err = f.client.Do(req) if err == nil { defer googleapi.CloseBody(res) @@ -116,49 +117,19 @@ func (rx *resumableUpload) makeRequest(ctx context.Context, start int64, body io req, _ := http.NewRequest("POST", rx.URI, body) req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext req.ContentLength = reqSize + totalSize := "*" + if rx.ContentLength >= 0 { + totalSize = strconv.FormatInt(rx.ContentLength, 10) + } if reqSize != 0 { - req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength)) + req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, totalSize)) } else { - req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength)) + req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", totalSize)) } req.Header.Set("Content-Type", rx.MediaType) return req } -// rangeRE matches the transfer status response from the server. $1 is -// the last byte index uploaded. -var rangeRE = regexp.MustCompile(`^0\-(\d+)$`) - -// Query drive for the amount transferred so far -// -// If error is nil, then start should be valid -func (rx *resumableUpload) transferStatus(ctx context.Context) (start int64, err error) { - req := rx.makeRequest(ctx, 0, nil, 0) - res, err := rx.f.client.Do(req) - if err != nil { - return 0, err - } - defer googleapi.CloseBody(res) - if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK { - return rx.ContentLength, nil - } - if res.StatusCode != statusResumeIncomplete { - err = googleapi.CheckResponse(res) - if err != nil { - return 0, err - } - return 0, errors.Errorf("unexpected http return code %v", res.StatusCode) - } - Range := res.Header.Get("Range") - if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 { - start, err = strconv.ParseInt(m[1], 10, 64) - if err == nil { - return start, nil - } - } - return 0, errors.Errorf("unable to parse range %q", Range) -} - // Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil func (rx *resumableUpload) transferChunk(ctx context.Context, start int64, chunk io.ReadSeeker, chunkSize int64) (int, error) { _, _ = chunk.Seek(0, io.SeekStart) @@ -200,12 +171,34 @@ func (rx *resumableUpload) Upload(ctx context.Context) (*drive.File, error) { var StatusCode int var err error buf := make([]byte, int(rx.f.opt.ChunkSize)) - for start < rx.ContentLength { - reqSize := rx.ContentLength - start - if reqSize >= int64(rx.f.opt.ChunkSize) { - reqSize = int64(rx.f.opt.ChunkSize) + for finished := false; !finished; { + var reqSize int64 + var chunk io.ReadSeeker + if rx.ContentLength >= 0 { + // If size known use repeatable reader for smoother bwlimit + if start >= rx.ContentLength { + break + } + reqSize = rx.ContentLength - start + if reqSize >= int64(rx.f.opt.ChunkSize) { + reqSize = int64(rx.f.opt.ChunkSize) + } + chunk = readers.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize) + } else { + // If size unknown read into buffer + var n int + n, err = readers.ReadFill(rx.Media, buf) + if err == io.EOF { + // Send the last chunk with the correct ContentLength + // otherwise Google doesn't know we've finished + rx.ContentLength = start + int64(n) + finished = true + } else if err != nil { + return nil, err + } + reqSize = int64(n) + chunk = bytes.NewReader(buf[:reqSize]) } - chunk := readers.NewRepeatableLimitReaderBuffer(rx.Media, buf, reqSize) // Transfer the chunk err = rx.f.pacer.Call(func() (bool, error) {