diff --git a/backend/googlephotos/googlephotos.go b/backend/googlephotos/googlephotos.go index c743ddec4..df031b606 100644 --- a/backend/googlephotos/googlephotos.go +++ b/backend/googlephotos/googlephotos.go @@ -29,6 +29,7 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/log" + "github.com/rclone/rclone/lib/batcher" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/pacer" @@ -71,6 +72,14 @@ var ( ClientSecret: obscure.MustReveal(rcloneEncryptedClientSecret), RedirectURL: oauthutil.RedirectURL, } + + // Configure the batcher + defaultBatcherOptions = batcher.Options{ + MaxBatchSize: 50, + DefaultTimeoutSync: 1000 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 50, + } ) // Register with Fs @@ -111,7 +120,7 @@ will count towards storage in your Google Account.`) } return nil, fmt.Errorf("unknown state %q", config.State) }, - Options: append(oauthutil.SharedOptions, []fs.Option{{ + Options: append(append(oauthutil.SharedOptions, []fs.Option{{ Name: "read_only", Default: false, Help: `Set to make the Google Photos backend read only. @@ -158,7 +167,7 @@ listings and won't be transferred.`, Default: (encoder.Base | encoder.EncodeCrLf | encoder.EncodeInvalidUtf8), - }}...), + }}...), defaultBatcherOptions.FsOptions("")...), }) } @@ -169,6 +178,9 @@ type Options struct { StartYear int `config:"start_year"` IncludeArchived bool `config:"include_archived"` Enc encoder.MultiEncoder `config:"encoding"` + BatchMode string `config:"batch_mode"` + BatchSize int `config:"batch_size"` + BatchTimeout fs.Duration `config:"batch_timeout"` } // Fs represents a remote storage server @@ -187,6 +199,7 @@ type Fs struct { uploadedMu sync.Mutex // to protect the below uploaded dirtree.DirTree // record of uploaded items createMu sync.Mutex // held when creating albums to prevent dupes + batcher *batcher.Batcher[uploadedItem, *api.MediaItem] } // Object describes a storage object @@ -312,6 +325,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e albums: map[bool]*albums{}, uploaded: dirtree.New(), } + batcherOptions := defaultBatcherOptions + batcherOptions.Mode = f.opt.BatchMode + batcherOptions.Size = f.opt.BatchSize + batcherOptions.Timeout = time.Duration(f.opt.BatchTimeout) + f.batcher, err = batcher.New(ctx, f, f.commitBatch, batcherOptions) + if err != nil { + return nil, err + } f.features = (&fs.Features{ ReadMimeType: true, }).Fill(ctx, f) @@ -781,6 +802,13 @@ func (f *Fs) Hashes() hash.Set { return hash.Set(hash.None) } +// Shutdown the backend, closing any background tasks and any +// cached connections. +func (f *Fs) Shutdown(ctx context.Context) error { + f.batcher.Shutdown() + return nil +} + // ------------------------------------------------------------ // Fs returns the parent Fs @@ -961,6 +989,82 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read return resp.Body, err } +// input to the batcher +type uploadedItem struct { + AlbumID string // desired album + UploadToken string // upload ID +} + +// Commit a batch of items to albumID returning the errors in errors +func (f *Fs) commitBatchAlbumID(ctx context.Context, items []uploadedItem, results []*api.MediaItem, errors []error, albumID string) { + // Create the media item from an UploadToken, optionally adding to an album + opts := rest.Opts{ + Method: "POST", + Path: "/mediaItems:batchCreate", + } + var request = api.BatchCreateRequest{ + AlbumID: albumID, + } + itemsInBatch := 0 + for i := range items { + if items[i].AlbumID == albumID { + request.NewMediaItems = append(request.NewMediaItems, api.NewMediaItem{ + SimpleMediaItem: api.SimpleMediaItem{ + UploadToken: items[i].UploadToken, + }, + }) + itemsInBatch++ + } + } + var result api.BatchCreateResponse + var resp *http.Response + var err error + err = f.pacer.Call(func() (bool, error) { + resp, err = f.srv.CallJSON(ctx, &opts, request, &result) + return shouldRetry(ctx, resp, err) + }) + if err != nil { + err = fmt.Errorf("failed to create media item: %w", err) + } + if err == nil && len(result.NewMediaItemResults) != itemsInBatch { + err = fmt.Errorf("bad response to BatchCreate expecting %d items but got %d", itemsInBatch, len(result.NewMediaItemResults)) + } + j := 0 + for i := range items { + if items[i].AlbumID == albumID { + if err == nil { + media := &result.NewMediaItemResults[j] + if media.Status.Code != 0 { + errors[i] = fmt.Errorf("upload failed: %s (%d)", media.Status.Message, media.Status.Code) + } else { + results[i] = &media.MediaItem + } + } else { + errors[i] = err + } + j++ + } + } +} + +// Called by the batcher to commit a batch +func (f *Fs) commitBatch(ctx context.Context, items []uploadedItem, results []*api.MediaItem, errors []error) (err error) { + // Discover all the AlbumIDs as we have to upload these separately + // + // Should maybe have one batcher per AlbumID + albumIDs := map[string]struct{}{} + for i := range items { + albumIDs[items[i].AlbumID] = struct{}{} + } + + // batch the albums + for albumID := range albumIDs { + // errors returned in errors + f.commitBatchAlbumID(ctx, items, results, errors, albumID) + } + return nil +} + // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned @@ -1021,37 +1125,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return errors.New("empty upload token") } - // Create the media item from an UploadToken, optionally adding to an album - opts = rest.Opts{ - Method: "POST", - Path: "/mediaItems:batchCreate", + uploaded := uploadedItem{ + AlbumID: albumID, + UploadToken: uploadToken, } - var request = api.BatchCreateRequest{ - AlbumID: albumID, - NewMediaItems: []api.NewMediaItem{ - { - SimpleMediaItem: api.SimpleMediaItem{ - UploadToken: uploadToken, - }, - }, - }, + + // Save the upload into an album + var info *api.MediaItem + if o.fs.batcher.Batching() { + info, err = o.fs.batcher.Commit(ctx, o.remote, uploaded) + } else { + errors := make([]error, 1) + results := make([]*api.MediaItem, 1) + err = o.fs.commitBatch(ctx, []uploadedItem{uploaded}, results, errors) + if err != nil { + err = errors[0] + info = results[0] + } } - var result api.BatchCreateResponse - err = o.fs.pacer.Call(func() (bool, error) { - resp, err = o.fs.srv.CallJSON(ctx, &opts, request, &result) - return shouldRetry(ctx, resp, err) - }) - if err != nil { - return fmt.Errorf("failed to create media item: %w", err) - } - if len(result.NewMediaItemResults) != 1 { - return errors.New("bad response to BatchCreate wrong number of items") - } - mediaItemResult := result.NewMediaItemResults[0] - if mediaItemResult.Status.Code != 0 { - return fmt.Errorf("upload failed: %s (%d)", mediaItemResult.Status.Message, mediaItemResult.Status.Code) - } - o.setMetaData(&mediaItemResult.MediaItem) + + o.setMetaData(info) // Add upload to internal storage if pattern.isUpload {