diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index e826cc788..00ed959e8 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -9,24 +9,22 @@ FIXME make searching for directory in id and file in id more efficient FIXME make the default for no files and no dirs be (FILE & FOLDER) so we ignore assets completely! - */ import ( "fmt" "io" "log" - "math/rand" "net/http" "regexp" "strings" - "sync" "time" "github.com/ncw/go-acd" "github.com/ncw/rclone/dircache" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/oauthutil" + "github.com/ncw/rclone/pacer" "golang.org/x/oauth2" ) @@ -40,8 +38,9 @@ const ( assetKind = "ASSET" statusAvailable = "AVAILABLE" timeFormat = time.RFC3339 // 2014-03-07T22:31:12.173Z - minBackoff = 1 * time.Second - maxBackoff = 256 * time.Second + minSleep = 100 * time.Millisecond + maxSleep = 256 * time.Second + decayConstant = 2 // bigger for slower decay, exponential ) // Globals @@ -57,7 +56,6 @@ var ( ClientSecret: fs.Reveal(rcloneClientSecret), RedirectURL: redirectURL, } - FIXME = fmt.Errorf("FIXME not implemented") ) // Register with Fs @@ -87,9 +85,7 @@ type FsAcd struct { c *acd.Client // the connection to the acd server root string // the path we are working on dirCache *dircache.DirCache // Map of directory path to directory id - - backoffLock sync.Mutex - backoff time.Duration // current backoff + pacer *pacer.Pacer // pacer for API calls } // FsObjectAcd describes a acd object @@ -127,6 +123,17 @@ func parsePath(path string) (root string) { return } +// shouldRetry returns a boolean as to whether this resp and err +// deserve to be retried. It returns the err as a convenience +func shouldRetry(resp *http.Response, err error) (bool, error) { + // FIXME retry other http codes? + // 409 conflict ? + if err != nil && resp != nil && resp.StatusCode == 429 { + return true, err + } + return false, err +} + // NewFs contstructs an FsAcd from the path, container:path func NewFs(name, root string) (fs.Fs, error) { root = parsePath(root) @@ -138,19 +145,28 @@ func NewFs(name, root string) (fs.Fs, error) { c := acd.NewClient(oAuthClient) c.UserAgent = fs.UserAgent f := &FsAcd{ - name: name, - root: root, - c: c, + name: name, + root: root, + c: c, + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), } // Update endpoints - _, _, err = f.c.Account.GetEndpoints() + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + _, resp, err = f.c.Account.GetEndpoints() + return shouldRetry(resp, err) + }) if err != nil { return nil, fmt.Errorf("Failed to get endpoints: %v", err) } // Get rootID - rootInfo, _, err := f.c.Nodes.GetRoot() + var rootInfo *acd.Folder + err = f.pacer.Call(func() (bool, error) { + rootInfo, resp, err = f.c.Nodes.GetRoot() + return shouldRetry(resp, err) + }) if err != nil || rootInfo.Id == nil { return nil, fmt.Errorf("Failed to get root: %v", err) } @@ -210,46 +226,16 @@ func (f *FsAcd) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } -// errChk checks the response and if it is a 429 error returns a -// RetryError, otherwise it returns just a plain error -// -// It also implements the backoff strategy -func (f *FsAcd) errChk(resp *http.Response, err error) error { - if err != nil && resp != nil && resp.StatusCode == 429 { - // Update backoff - f.backoffLock.Lock() - backoff := f.backoff - if backoff == 0 { - backoff = minBackoff - } else { - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } - f.backoff = backoff - f.backoffLock.Unlock() - // Sleep for the backoff time - sleepTime := time.Duration(rand.Int63n(int64(backoff))) - fs.Debug(f, "Retry error: backoff is now %v, sleeping for %v", backoff, sleepTime) - time.Sleep(sleepTime) - return fs.RetryError(err) - } - // Reset backoff on success - if err == nil { - f.backoffLock.Lock() - f.backoff = 0 - f.backoffLock.Unlock() - } - return err -} - // FindLeaf finds a directory of name leaf in the folder with ID pathId func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) { //fs.Debug(f, "FindLeaf(%q, %q)", pathId, leaf) folder := acd.FolderFromId(pathId, f.c.Nodes) - subFolder, _, err := folder.GetFolder(leaf) - // err = f.errChk(resp, err) + var resp *http.Response + var subFolder *acd.Folder + err = f.pacer.Call(func() (bool, error) { + subFolder, resp, err = folder.GetFolder(leaf) + return shouldRetry(resp, err) + }) if err != nil { if err == acd.ErrorNodeNotFound { //fs.Debug(f, "...Not found") @@ -269,15 +255,19 @@ func (f *FsAcd) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err // CreateDir makes a directory with pathId as parent and name leaf func (f *FsAcd) CreateDir(pathId, leaf string) (newId string, err error) { - //fs.Debug(f, "CreateDir(%q, %q)", pathId, leaf) + //fmt.Printf("CreateDir(%q, %q)\n", pathId, leaf) folder := acd.FolderFromId(pathId, f.c.Nodes) - info, _, err := folder.CreateFolder(leaf) - // err = f.errChk(resp, err) + var resp *http.Response + var info *acd.Folder + err = f.pacer.Call(func() (bool, error) { + info, resp, err = folder.CreateFolder(leaf) + return shouldRetry(resp, err) + }) if err != nil { - fs.Debug(f, "...Error %v", err) + //fmt.Printf("...Error %v\n", err) return "", err } - //fs.Debug(f, "...Id %q", *info.Id) + //fmt.Printf("...Id %q\n", *info.Id) return *info.Id, nil } @@ -310,8 +300,11 @@ func (f *FsAcd) listAll(dirId string, title string, directoriesOnly bool, filesO //var resp *http.Response OUTER: for { - nodes, _, err = f.c.Nodes.GetNodes(&opts) - // err = f.errChk(resp, err) + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + nodes, resp, err = f.c.Nodes.GetNodes(&opts) + return shouldRetry(resp, err) + }) if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't list files: %v", err) @@ -440,12 +433,14 @@ func (f *FsAcd) Put(in io.Reader, remote string, modTime time.Time, size int64) folder := acd.FolderFromId(directoryID, o.acd.c.Nodes) var info *acd.File var resp *http.Response - if size != 0 { - info, resp, err = folder.Put(in, leaf) - } else { - info, resp, err = folder.PutSized(in, size, leaf) - } - err = f.errChk(resp, err) + err = f.pacer.CallNoRetry(func() (bool, error) { + if size != 0 { + info, resp, err = folder.Put(in, leaf) + } else { + info, resp, err = folder.PutSized(in, size, leaf) + } + return shouldRetry(resp, err) + }) if err != nil { return nil, err } @@ -497,8 +492,10 @@ func (f *FsAcd) purgeCheck(check bool) error { node := acd.NodeFromId(rootID, f.c.Nodes) var resp *http.Response - resp, err = node.Trash() - err = f.errChk(resp, err) + err = f.pacer.Call(func() (bool, error) { + resp, err = node.Trash() + return shouldRetry(resp, err) + }) if err != nil { return err } @@ -599,8 +596,12 @@ func (o *FsObjectAcd) readMetaData() (err error) { return err } folder := acd.FolderFromId(directoryID, o.acd.c.Nodes) - info, _, err := folder.GetFile(leaf) - // err = o.acd.errChk(resp, err) + var resp *http.Response + var info *acd.File + err = o.acd.pacer.Call(func() (bool, error) { + info, resp, err = folder.GetFile(leaf) + return shouldRetry(resp, err) + }) if err != nil { fs.Debug(o, "Failed to read info: %s", err) return err @@ -643,8 +644,10 @@ func (o *FsObjectAcd) Storable() bool { func (o *FsObjectAcd) Open() (in io.ReadCloser, err error) { file := acd.File{Node: o.info} var resp *http.Response - in, resp, err = file.Open() - err = o.acd.errChk(resp, err) + err = o.acd.pacer.Call(func() (bool, error) { + in, resp, err = file.Open() + return shouldRetry(resp, err) + }) return in, err } @@ -656,12 +659,14 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error var info *acd.File var resp *http.Response var err error - if size != 0 { - info, resp, err = file.OverwriteSized(in, size) - } else { - info, resp, err = file.Overwrite(in) - } - err = o.acd.errChk(resp, err) + err = o.acd.pacer.CallNoRetry(func() (bool, error) { + if size != 0 { + info, resp, err = file.OverwriteSized(in, size) + } else { + info, resp, err = file.Overwrite(in) + } + return shouldRetry(resp, err) + }) if err != nil { return err } @@ -671,8 +676,12 @@ func (o *FsObjectAcd) Update(in io.Reader, modTime time.Time, size int64) error // Remove an object func (o *FsObjectAcd) Remove() error { - resp, err := o.info.Trash() - err = o.acd.errChk(resp, err) + var resp *http.Response + var err error + err = o.acd.pacer.Call(func() (bool, error) { + resp, err = o.info.Trash() + return shouldRetry(resp, err) + }) return err } diff --git a/drive/drive.go b/drive/drive.go index 2eb1eba13..64ad4e834 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -23,6 +23,7 @@ import ( "github.com/ncw/rclone/dircache" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/oauthutil" + "github.com/ncw/rclone/pacer" "github.com/spf13/pflag" ) @@ -82,14 +83,13 @@ func init() { // FsDrive represents a remote drive server type FsDrive struct { - name string // name of this remote - svc *drive.Service // the connection to the drive server - root string // the path we are working on - client *http.Client // authorized client - about *drive.About // information about the drive, including the root - dirCache *dircache.DirCache // Map of directory path to directory id - pacer chan struct{} // To pace the operations - sleepTime time.Duration // Time to sleep for each transaction + name string // name of this remote + svc *drive.Service // the connection to the drive server + root string // the path we are working on + client *http.Client // authorized client + about *drive.About // information about the drive, including the root + dirCache *dircache.DirCache // Map of directory path to directory id + pacer *pacer.Pacer // To pace the API calls } // FsObjectDrive describes a drive object @@ -120,46 +120,11 @@ func (f *FsDrive) String() string { return fmt.Sprintf("Google drive root '%s'", f.root) } -// Start a call to the drive API -// -// This must be called as a pair with endCall -// -// This waits for the pacer token -func (f *FsDrive) beginCall() { - // pacer starts with a token in and whenever we take one out - // XXX ms later we put another in. We could do this with a - // Ticker more accurately, but then we'd have to work out how - // not to run it when it wasn't needed - <-f.pacer - - // Restart the timer - go func(t time.Duration) { - // fs.Debug(f, "New sleep for %v at %v", t, time.Now()) - time.Sleep(t) - f.pacer <- struct{}{} - }(f.sleepTime) -} - -// End a call to the drive API -// -// Refresh the pace given an error that was returned. It returns a -// boolean as to whether the operation should be retried. -// -// See https://developers.google.com/drive/web/handle-errors -// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second -func (f *FsDrive) endCall(err error) bool { - again := false - oldSleepTime := f.sleepTime - if err == nil { - f.sleepTime = (f.sleepTime<> decayConstant - if f.sleepTime < minSleep { - f.sleepTime = minSleep - } - if f.sleepTime != oldSleepTime { - fs.Debug(f, "Reducing sleep to %v", f.sleepTime) - } - } else { - fs.Debug(f, "Error recived: %T %#v", err, err) +// shouldRetry determines whehter a given err rates being retried +func shouldRetry(err error) (again bool, errOut error) { + again = false + errOut = err + if err != nil { // Check for net error Timeout() if x, ok := err.(interface { Timeout() bool @@ -185,30 +150,7 @@ func (f *FsDrive) endCall(err error) bool { } } } - if again { - f.sleepTime *= 2 - if f.sleepTime > maxSleep { - f.sleepTime = maxSleep - } - if f.sleepTime != oldSleepTime { - fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime) - } - } - return again -} - -// Pace the remote operations to not exceed Google's limits and retry -// on 403 rate limit exceeded -// -// This calls fn, expecting it to place its error in perr -func (f *FsDrive) call(perr *error, fn func()) { - for { - f.beginCall() - fn() - if !f.endCall(*perr) { - break - } - } + return again, err } // parseParse parses a drive 'url' @@ -249,8 +191,9 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file OUTER: for { var files *drive.FileList - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { files, err = list.Do() + return shouldRetry(err) }) if err != nil { return false, fmt.Errorf("Couldn't list directory: %s", err) @@ -301,15 +244,11 @@ func NewFs(name, path string) (fs.Fs, error) { } f := &FsDrive{ - name: name, - root: root, - pacer: make(chan struct{}, 1), - sleepTime: minSleep, + name: name, + root: root, + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), } - // Put the first pacing token in - f.pacer <- struct{}{} - // Create a new authorized Drive client. f.client = oAuthClient f.svc, err = drive.New(f.client) @@ -318,8 +257,9 @@ func NewFs(name, path string) (fs.Fs, error) { } // Read About so we know the root path - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { f.about, err = f.svc.About.Get().Do() + return shouldRetry(err) }) if err != nil { return nil, fmt.Errorf("Couldn't read info about Drive: %s", err) @@ -411,8 +351,9 @@ func (f *FsDrive) CreateDir(pathId, leaf string) (newId string, err error) { Parents: []*drive.ParentReference{{Id: pathId}}, } var info *drive.File - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { info, err = f.svc.Files.Insert(createInfo).Do() + return shouldRetry(err) }) if err != nil { return "", err @@ -625,13 +566,12 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 if size == 0 || size < int64(driveUploadCutoff) { // Make the API request to upload metadata and file data. // Don't retry, return a retry error instead - f.beginCall() - info, err = f.svc.Files.Insert(createInfo).Media(in).Do() - if f.endCall(err) { - return o, fs.RetryErrorf("Upload failed - retry: %s", err) - } + err = f.pacer.CallNoRetry(func() (bool, error) { + info, err = f.svc.Files.Insert(createInfo).Media(in).Do() + return shouldRetry(err) + }) if err != nil { - return o, fmt.Errorf("Upload failed: %s", err) + return o, err } } else { // Upload the file in chunks @@ -658,8 +598,9 @@ func (f *FsDrive) Rmdir() error { return err } var children *drive.ChildList - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { children, err = f.svc.Children.List(f.dirCache.RootID()).MaxResults(10).Do() + return shouldRetry(err) }) if err != nil { return err @@ -669,12 +610,13 @@ func (f *FsDrive) Rmdir() error { } // Delete the directory if it isn't the root if f.root != "" { - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { if *driveUseTrash { _, err = f.svc.Files.Trash(f.dirCache.RootID()).Do() } else { err = f.svc.Files.Delete(f.dirCache.RootID()).Do() } + return shouldRetry(err) }) if err != nil { return err @@ -711,8 +653,9 @@ func (f *FsDrive) Copy(src fs.Object, remote string) (fs.Object, error) { } var info *drive.File - o.drive.call(&err, func() { + err = o.drive.pacer.Call(func() (bool, error) { info, err = o.drive.svc.Files.Copy(srcObj.id, createInfo).Do() + return shouldRetry(err) }) if err != nil { return nil, err @@ -735,12 +678,13 @@ func (f *FsDrive) Purge() error { if err != nil { return err } - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { if *driveUseTrash { _, err = f.svc.Files.Trash(f.dirCache.RootID()).Do() } else { err = f.svc.Files.Delete(f.dirCache.RootID()).Do() } + return shouldRetry(err) }) f.dirCache.ResetRoot() if err != nil { @@ -921,8 +865,9 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) { } // Set modified date var info *drive.File - o.drive.call(&err, func() { + err = o.drive.pacer.Call(func() (bool, error) { info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do() + return shouldRetry(err) }) if err != nil { fs.Stats.Error() @@ -949,8 +894,9 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) { } req.Header.Set("User-Agent", fs.UserAgent) var res *http.Response - o.drive.call(&err, func() { + err = o.drive.pacer.Call(func() (bool, error) { res, err = o.drive.client.Do(req) + return shouldRetry(err) }) if err != nil { return nil, err @@ -978,13 +924,12 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro var info *drive.File if size == 0 || size < int64(driveUploadCutoff) { // Don't retry, return a retry error instead - o.drive.beginCall() - info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do() - if o.drive.endCall(err) { - return fs.RetryErrorf("Update failed - retry: %s", err) - } + err = o.drive.pacer.CallNoRetry(func() (bool, error) { + info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do() + return shouldRetry(err) + }) if err != nil { - return fmt.Errorf("Update failed: %s", err) + return err } } else { // Upload the file in chunks @@ -1000,12 +945,13 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro // Remove an object func (o *FsObjectDrive) Remove() error { var err error - o.drive.call(&err, func() { + err = o.drive.pacer.Call(func() (bool, error) { if *driveUseTrash { _, err = o.drive.svc.Files.Trash(o.id).Do() } else { err = o.drive.svc.Files.Delete(o.id).Do() } + return shouldRetry(err) }) return err } diff --git a/drive/upload.go b/drive/upload.go index 3e91b8e5e..97599d589 100644 --- a/drive/upload.go +++ b/drive/upload.go @@ -78,12 +78,13 @@ func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *dri req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size)) req.Header.Set("User-Agent", fs.UserAgent) var res *http.Response - f.call(&err, func() { + err = f.pacer.Call(func() (bool, error) { res, err = f.client.Do(req) if err == nil { defer googleapi.CloseBody(res) err = googleapi.CheckResponse(res) } + return shouldRetry(err) }) if err != nil { return nil, err @@ -203,19 +204,19 @@ func (rx *resumableUpload) Upload() (*drive.File, error) { } // Transfer the chunk - for try := 1; try <= maxTries; try++ { - fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries) - rx.f.beginCall() + err = rx.f.pacer.Call(func() (bool, error) { + fs.Debug(rx.remote, "Sending chunk %d length %d", start, reqSize) StatusCode, err = rx.transferChunk(start, buf) - rx.f.endCall(err) + again, err := shouldRetry(err) if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK { - goto success + again = false + err = nil } - fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err) + return again, err + }) + if err != nil { + return nil, err } - fs.Debug(rx.remote, "Failed to send chunk") - return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err) - success: start += reqSize } diff --git a/pacer/pacer.go b/pacer/pacer.go new file mode 100644 index 000000000..1cc007184 --- /dev/null +++ b/pacer/pacer.go @@ -0,0 +1,152 @@ +// pacer is a utility package to make pacing and retrying API calls easy +package pacer + +import ( + "time" + + "github.com/ncw/rclone/fs" +) + +type Pacer struct { + minSleep time.Duration // minimum sleep time + maxSleep time.Duration // maximum sleep time + decayConstant uint // decay constant + pacer chan struct{} // To pace the operations + sleepTime time.Duration // Time to sleep for each transaction + retries int // Max number of retries +} + +// Paced is a function which is called by the Call and CallNoRetry +// methods. It should return a boolean, true if it would like to be +// retried, and an error. This error may be returned or returned +// wrapped in a RetryError. +type Paced func() (bool, error) + +// New returns a Pacer with sensible defaults +func New() *Pacer { + p := &Pacer{ + minSleep: 10 * time.Millisecond, + maxSleep: 2 * time.Second, + decayConstant: 2, + retries: 10, + pacer: make(chan struct{}, 1), + } + p.sleepTime = p.minSleep + + // Put the first pacing token in + p.pacer <- struct{}{} + + return p +} + +// SetMinSleep sets the minimum sleep time for the pacer +func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { + p.minSleep = t + p.sleepTime = p.minSleep + return p +} + +// SetMaxSleep sets the maximum sleep time for the pacer +func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { + p.maxSleep = t + p.sleepTime = p.minSleep + return p +} + +// SetDecayConstant sets the decay constant for the pacer +// +// This is the speed the time falls back to the minimum after errors +// have occurred. +// +// bigger for slower decay, exponential +func (p *Pacer) SetDecayConstant(decay uint) *Pacer { + p.decayConstant = decay + return p +} + +// SetRetries sets the max number of tries for Call +func (p *Pacer) SetRetries(retries int) *Pacer { + p.retries = retries + return p +} + +// Start a call to the API +// +// This must be called as a pair with endCall +// +// This waits for the pacer token +func (p *Pacer) beginCall() { + // pacer starts with a token in and whenever we take one out + // XXX ms later we put another in. We could do this with a + // Ticker more accurately, but then we'd have to work out how + // not to run it when it wasn't needed + <-p.pacer + + // Restart the timer + go func(t time.Duration) { + // fs.Debug(f, "New sleep for %v at %v", t, time.Now()) + time.Sleep(t) + p.pacer <- struct{}{} + }(p.sleepTime) +} + +// End a call to the API +// +// Refresh the pace given an error that was returned. It returns a +// boolean as to whether the operation should be retried. +func (p *Pacer) endCall(again bool) { + oldSleepTime := p.sleepTime + if again { + p.sleepTime *= 2 + if p.sleepTime > p.maxSleep { + p.sleepTime = p.maxSleep + } + if p.sleepTime != oldSleepTime { + fs.Debug("pacer", "Rate limited, increasing sleep to %v", p.sleepTime) + } + } else { + p.sleepTime = (p.sleepTime<> p.decayConstant + if p.sleepTime < p.minSleep { + p.sleepTime = p.minSleep + } + if p.sleepTime != oldSleepTime { + fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime) + } + } +} + +// call implements Call but with settable retries +func (p *Pacer) call(fn Paced, retries int) (err error) { + var again bool + for i := 0; i < retries; i++ { + p.beginCall() + again, err = fn() + p.endCall(again) + if !again { + break + } + } + if again { + err = fs.RetryError(err) + } + return err +} + +// Call paces the remote operations to not exceed the limits and retry +// on rate limit exceeded +// +// This calls fn, expecting it to return a retry flag and an +// error. This error may be returned wrapped in a RetryError if the +// number of retries is exceeded. +func (p *Pacer) Call(fn Paced) (err error) { + return p.call(fn, p.retries) +} + +// Pace the remote operations to not exceed Amazon's limits and return +// a retry error on rate limit exceeded +// +// This calls fn and wraps the output in a RetryError if it would like +// it to be retried +func (p *Pacer) CallNoRetry(fn Paced) error { + return p.call(fn, 1) +}