googlephotos: implement batcher for uploads - fixes #6920

This commit is contained in:
Nick Craig-Wood 2023-09-13 11:57:52 +01:00
parent b94806a143
commit 3ab9077820
1 changed files with 124 additions and 31 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/log" "github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/lib/batcher"
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/pacer"
@ -71,6 +72,14 @@ var (
ClientSecret: obscure.MustReveal(rcloneEncryptedClientSecret), ClientSecret: obscure.MustReveal(rcloneEncryptedClientSecret),
RedirectURL: oauthutil.RedirectURL, RedirectURL: oauthutil.RedirectURL,
} }
// Configure the batcher
defaultBatcherOptions = batcher.Options{
MaxBatchSize: 50,
DefaultTimeoutSync: 1000 * time.Millisecond,
DefaultTimeoutAsync: 10 * time.Second,
DefaultBatchSizeAsync: 50,
}
) )
// Register with Fs // Register with Fs
@ -111,7 +120,7 @@ will count towards storage in your Google Account.`)
} }
return nil, fmt.Errorf("unknown state %q", config.State) return nil, fmt.Errorf("unknown state %q", config.State)
}, },
Options: append(oauthutil.SharedOptions, []fs.Option{{ Options: append(append(oauthutil.SharedOptions, []fs.Option{{
Name: "read_only", Name: "read_only",
Default: false, Default: false,
Help: `Set to make the Google Photos backend read only. Help: `Set to make the Google Photos backend read only.
@ -158,7 +167,7 @@ listings and won't be transferred.`,
Default: (encoder.Base | Default: (encoder.Base |
encoder.EncodeCrLf | encoder.EncodeCrLf |
encoder.EncodeInvalidUtf8), encoder.EncodeInvalidUtf8),
}}...), }}...), defaultBatcherOptions.FsOptions("")...),
}) })
} }
@ -169,6 +178,9 @@ type Options struct {
StartYear int `config:"start_year"` StartYear int `config:"start_year"`
IncludeArchived bool `config:"include_archived"` IncludeArchived bool `config:"include_archived"`
Enc encoder.MultiEncoder `config:"encoding"` Enc encoder.MultiEncoder `config:"encoding"`
BatchMode string `config:"batch_mode"`
BatchSize int `config:"batch_size"`
BatchTimeout fs.Duration `config:"batch_timeout"`
} }
// Fs represents a remote storage server // Fs represents a remote storage server
@ -187,6 +199,7 @@ type Fs struct {
uploadedMu sync.Mutex // to protect the below uploadedMu sync.Mutex // to protect the below
uploaded dirtree.DirTree // record of uploaded items uploaded dirtree.DirTree // record of uploaded items
createMu sync.Mutex // held when creating albums to prevent dupes createMu sync.Mutex // held when creating albums to prevent dupes
batcher *batcher.Batcher[uploadedItem, *api.MediaItem]
} }
// Object describes a storage object // Object describes a storage object
@ -312,6 +325,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
albums: map[bool]*albums{}, albums: map[bool]*albums{},
uploaded: dirtree.New(), uploaded: dirtree.New(),
} }
batcherOptions := defaultBatcherOptions
batcherOptions.Mode = f.opt.BatchMode
batcherOptions.Size = f.opt.BatchSize
batcherOptions.Timeout = time.Duration(f.opt.BatchTimeout)
f.batcher, err = batcher.New(ctx, f, f.commitBatch, batcherOptions)
if err != nil {
return nil, err
}
f.features = (&fs.Features{ f.features = (&fs.Features{
ReadMimeType: true, ReadMimeType: true,
}).Fill(ctx, f) }).Fill(ctx, f)
@ -781,6 +802,13 @@ func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.None) return hash.Set(hash.None)
} }
// Shutdown the backend, closing any background tasks and any
// cached connections.
func (f *Fs) Shutdown(ctx context.Context) error {
f.batcher.Shutdown()
return nil
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// Fs returns the parent Fs // Fs returns the parent Fs
@ -961,6 +989,82 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
return resp.Body, err return resp.Body, err
} }
// input to the batcher
type uploadedItem struct {
AlbumID string // desired album
UploadToken string // upload ID
}
// Commit a batch of items to albumID returning the errors in errors
func (f *Fs) commitBatchAlbumID(ctx context.Context, items []uploadedItem, results []*api.MediaItem, errors []error, albumID string) {
// Create the media item from an UploadToken, optionally adding to an album
opts := rest.Opts{
Method: "POST",
Path: "/mediaItems:batchCreate",
}
var request = api.BatchCreateRequest{
AlbumID: albumID,
}
itemsInBatch := 0
for i := range items {
if items[i].AlbumID == albumID {
request.NewMediaItems = append(request.NewMediaItems, api.NewMediaItem{
SimpleMediaItem: api.SimpleMediaItem{
UploadToken: items[i].UploadToken,
},
})
itemsInBatch++
}
}
var result api.BatchCreateResponse
var resp *http.Response
var err error
err = f.pacer.Call(func() (bool, error) {
resp, err = f.srv.CallJSON(ctx, &opts, request, &result)
return shouldRetry(ctx, resp, err)
})
if err != nil {
err = fmt.Errorf("failed to create media item: %w", err)
}
if err == nil && len(result.NewMediaItemResults) != itemsInBatch {
err = fmt.Errorf("bad response to BatchCreate expecting %d items but got %d", itemsInBatch, len(result.NewMediaItemResults))
}
j := 0
for i := range items {
if items[i].AlbumID == albumID {
if err == nil {
media := &result.NewMediaItemResults[j]
if media.Status.Code != 0 {
errors[i] = fmt.Errorf("upload failed: %s (%d)", media.Status.Message, media.Status.Code)
} else {
results[i] = &media.MediaItem
}
} else {
errors[i] = err
}
j++
}
}
}
// Called by the batcher to commit a batch
func (f *Fs) commitBatch(ctx context.Context, items []uploadedItem, results []*api.MediaItem, errors []error) (err error) {
// Discover all the AlbumIDs as we have to upload these separately
//
// Should maybe have one batcher per AlbumID
albumIDs := map[string]struct{}{}
for i := range items {
albumIDs[items[i].AlbumID] = struct{}{}
}
// batch the albums
for albumID := range albumIDs {
// errors returned in errors
f.commitBatchAlbumID(ctx, items, results, errors, albumID)
}
return nil
}
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
@ -1021,37 +1125,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return errors.New("empty upload token") return errors.New("empty upload token")
} }
// Create the media item from an UploadToken, optionally adding to an album uploaded := uploadedItem{
opts = rest.Opts{ AlbumID: albumID,
Method: "POST", UploadToken: uploadToken,
Path: "/mediaItems:batchCreate",
} }
var request = api.BatchCreateRequest{
AlbumID: albumID, // Save the upload into an album
NewMediaItems: []api.NewMediaItem{ var info *api.MediaItem
{ if o.fs.batcher.Batching() {
SimpleMediaItem: api.SimpleMediaItem{ info, err = o.fs.batcher.Commit(ctx, o.remote, uploaded)
UploadToken: uploadToken, } else {
}, errors := make([]error, 1)
}, results := make([]*api.MediaItem, 1)
}, err = o.fs.commitBatch(ctx, []uploadedItem{uploaded}, results, errors)
if err != nil {
err = errors[0]
info = results[0]
}
} }
var result api.BatchCreateResponse
err = o.fs.pacer.Call(func() (bool, error) { o.setMetaData(info)
resp, err = o.fs.srv.CallJSON(ctx, &opts, request, &result)
return shouldRetry(ctx, resp, err)
})
if err != nil {
return fmt.Errorf("failed to create media item: %w", err)
}
if len(result.NewMediaItemResults) != 1 {
return errors.New("bad response to BatchCreate wrong number of items")
}
mediaItemResult := result.NewMediaItemResults[0]
if mediaItemResult.Status.Code != 0 {
return fmt.Errorf("upload failed: %s (%d)", mediaItemResult.Status.Message, mediaItemResult.Status.Code)
}
o.setMetaData(&mediaItemResult.MediaItem)
// Add upload to internal storage // Add upload to internal storage
if pattern.isUpload { if pattern.isUpload {