From 61616ba8643b2fc1d073651a69381f3991c0ce77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Sat, 9 Feb 2019 21:52:15 +0100 Subject: [PATCH] pacer: make pacer more flexible Make the pacer package more flexible by extracting the pace calculation functions into a separate interface. This also allows to move features that require the fs package like logging and custom errors into the fs package. Also add a RetryAfterError sentinel error that can be used to signal a desired retry time to the Calculator. --- backend/amazonclouddrive/amazonclouddrive.go | 4 +- backend/azureblob/azureblob.go | 4 +- backend/b2/b2.go | 12 +- backend/box/box.go | 4 +- backend/drive/drive.go | 6 +- backend/dropbox/dropbox.go | 6 +- .../googlecloudstorage/googlecloudstorage.go | 4 +- backend/jottacloud/jottacloud.go | 4 +- backend/mega/mega.go | 4 +- backend/onedrive/onedrive.go | 4 +- backend/opendrive/opendrive.go | 4 +- backend/pcloud/pcloud.go | 4 +- backend/s3/s3.go | 4 +- backend/swift/swift.go | 4 +- backend/webdav/webdav.go | 4 +- backend/yandex/yandex.go | 4 +- fs/fs.go | 89 ++++ fs/fs_test.go | 51 ++ lib/pacer/pacer.go | 450 ++++++----------- lib/pacer/pacer_test.go | 461 +++++++----------- lib/pacer/pacers.go | 326 +++++++++++++ 21 files changed, 822 insertions(+), 631 deletions(-) create mode 100644 lib/pacer/pacers.go diff --git a/backend/amazonclouddrive/amazonclouddrive.go b/backend/amazonclouddrive/amazonclouddrive.go index a1288aa67..a61227e23 100644 --- a/backend/amazonclouddrive/amazonclouddrive.go +++ b/backend/amazonclouddrive/amazonclouddrive.go @@ -155,7 +155,7 @@ type Fs struct { noAuthClient *http.Client // unauthenticated http client root string // the path we are working on dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls trueRootID string // ID of true root directory tokenRenewer *oauthutil.Renew // renew the token on expiry } @@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, c: c, - pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer), + pacer: fs.NewPacer(pacer.NewAmazonCloudDrive(pacer.MinSleep(minSleep))), noAuthClient: fshttp.NewClient(fs.Config), } f.features = (&fs.Features{ diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index 5c9defb3f..51aad65c4 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -144,7 +144,7 @@ type Fs struct { containerOKMu sync.Mutex // mutex to protect container OK containerOK bool // true if we have created the container containerDeleted bool // true if we have deleted the container - pacer *pacer.Pacer // To pace and retry the API calls + pacer *fs.Pacer // To pace and retry the API calls uploadToken *pacer.TokenDispenser // control concurrency } @@ -347,7 +347,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { opt: *opt, container: container, root: directory, - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant).SetPacer(pacer.S3Pacer), + pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), client: fshttp.NewClient(fs.Config), } diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 6f3539f66..75dbd6783 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -167,7 +167,7 @@ type Fs struct { uploadMu sync.Mutex // lock for upload variable uploads []*api.GetUploadURLResponse // result of get upload URL calls authMu sync.Mutex // lock for authorizing the account - pacer *pacer.Pacer // To pace and retry the API calls + pacer *fs.Pacer // To pace and retry the API calls bufferTokens chan []byte // control concurrency of multipart uploads } @@ -251,13 +251,7 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) { fs.Errorf(f, "Malformed %s header %q: %v", retryAfterHeader, retryAfterString, err) } } - retryAfterDuration := time.Duration(retryAfter) * time.Second - if f.pacer.GetSleep() < retryAfterDuration { - fs.Debugf(f, "Setting sleep to %v after error: %v", retryAfterDuration, err) - // We set 1/2 the value here because the pacer will double it immediately - f.pacer.SetSleep(retryAfterDuration / 2) - } - return true, err + return true, pacer.RetryAfterError(err, time.Duration(retryAfter)*time.Second) } return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err } @@ -363,7 +357,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { bucket: bucket, root: directory, srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ ReadMimeType: true, diff --git a/backend/box/box.go b/backend/box/box.go index 3bf242e77..f6f4a629c 100644 --- a/backend/box/box.go +++ b/backend/box/box.go @@ -111,7 +111,7 @@ type Fs struct { features *fs.Features // optional features srv *rest.Client // the connection to the one drive server dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls tokenRenewer *oauthutil.Renew // renew the token on expiry uploadToken *pacer.TokenDispenser // control concurrency } @@ -260,7 +260,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, srv: rest.NewClient(oAuthClient).SetRoot(rootURL), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), } f.features = (&fs.Features{ diff --git a/backend/drive/drive.go b/backend/drive/drive.go index 3dfa1a51d..c45b8a35f 100644 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -426,7 +426,7 @@ type Fs struct { client *http.Client // authorized client rootFolderID string // the id of the root folder dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // To pace the API calls + pacer *fs.Pacer // To pace the API calls exportExtensions []string // preferred extensions to download docs importMimeTypes []string // MIME types to convert to docs isTeamDrive bool // true if this is a team drive @@ -789,8 +789,8 @@ func configTeamDrive(opt *Options, m configmap.Mapper, name string) error { } // newPacer makes a pacer configured for drive -func newPacer(opt *Options) *pacer.Pacer { - return pacer.New().SetMinSleep(time.Duration(opt.PacerMinSleep)).SetBurst(opt.PacerBurst).SetPacer(pacer.GoogleDrivePacer) +func newPacer(opt *Options) *fs.Pacer { + return fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(opt.PacerMinSleep), pacer.Burst(opt.PacerBurst))) } func getServiceAccountClient(opt *Options, credentialsData []byte) (*http.Client, error) { diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go index fa17ba97e..7d4748d97 100644 --- a/backend/dropbox/dropbox.go +++ b/backend/dropbox/dropbox.go @@ -160,7 +160,7 @@ type Fs struct { team team.Client // for the Teams API slashRoot string // root with "/" prefix, lowercase slashRootSlash string // root with "/" prefix and postfix, lowercase - pacer *pacer.Pacer // To pace the API calls + pacer *fs.Pacer // To pace the API calls ns string // The namespace we are using or "" for none } @@ -209,7 +209,7 @@ func shouldRetry(err error) (bool, error) { case auth.RateLimitAPIError: if e.RateLimitError.RetryAfter > 0 { fs.Debugf(baseErrString, "Too many requests or write operations. Trying again in %d seconds.", e.RateLimitError.RetryAfter) - time.Sleep(time.Duration(e.RateLimitError.RetryAfter) * time.Second) + err = pacer.RetryAfterError(err, time.Duration(e.RateLimitError.RetryAfter)*time.Second) } return true, err } @@ -273,7 +273,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { f := &Fs{ name: name, opt: *opt, - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } config := dropbox.Config{ LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index 5e7b65015..25866ff81 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -256,7 +256,7 @@ type Fs struct { bucket string // the bucket we are working on bucketOKMu sync.Mutex // mutex to protect bucket OK bucketOK bool // true if we have created the bucket - pacer *pacer.Pacer // To pace the API calls + pacer *fs.Pacer // To pace the API calls } // Object describes a storage object @@ -395,7 +395,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { bucket: bucket, root: directory, opt: *opt, - pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer), + pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))), } f.features = (&fs.Features{ ReadMimeType: true, diff --git a/backend/jottacloud/jottacloud.go b/backend/jottacloud/jottacloud.go index 99ab163aa..62bd394c2 100644 --- a/backend/jottacloud/jottacloud.go +++ b/backend/jottacloud/jottacloud.go @@ -190,7 +190,7 @@ type Fs struct { endpointURL string srv *rest.Client apiSrv *rest.Client - pacer *pacer.Pacer + pacer *fs.Pacer tokenRenewer *oauthutil.Renew // renew the token on expiry } @@ -403,7 +403,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { opt: *opt, srv: rest.NewClient(oAuthClient).SetRoot(rootURL), apiSrv: rest.NewClient(oAuthClient).SetRoot(apiURL), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ CaseInsensitive: true, diff --git a/backend/mega/mega.go b/backend/mega/mega.go index e61dd319c..be3d7f8f1 100644 --- a/backend/mega/mega.go +++ b/backend/mega/mega.go @@ -98,7 +98,7 @@ type Fs struct { opt Options // parsed config options features *fs.Features // optional features srv *mega.Mega // the connection to the server - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls rootNodeMu sync.Mutex // mutex for _rootNode _rootNode *mega.Node // root node - call findRoot to use this mkdirMu sync.Mutex // used to serialize calls to mkdir / rmdir @@ -217,7 +217,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, srv: srv, - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ DuplicateFiles: true, diff --git a/backend/onedrive/onedrive.go b/backend/onedrive/onedrive.go index ee2440b5a..21395f1ec 100644 --- a/backend/onedrive/onedrive.go +++ b/backend/onedrive/onedrive.go @@ -261,7 +261,7 @@ type Fs struct { features *fs.Features // optional features srv *rest.Client // the connection to the one drive server dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls tokenRenewer *oauthutil.Renew // renew the token on expiry driveID string // ID to use for querying Microsoft Graph driveType string // https://developer.microsoft.com/en-us/graph/docs/api-reference/v1.0/resources/drive @@ -475,7 +475,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { driveID: opt.DriveID, driveType: opt.DriveType, srv: rest.NewClient(oAuthClient).SetRoot(graphURL + "/drives/" + opt.DriveID), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ CaseInsensitive: true, diff --git a/backend/opendrive/opendrive.go b/backend/opendrive/opendrive.go index 5d8cc53a2..127fdc7e7 100644 --- a/backend/opendrive/opendrive.go +++ b/backend/opendrive/opendrive.go @@ -65,7 +65,7 @@ type Fs struct { opt Options // parsed options features *fs.Features // optional features srv *rest.Client // the connection to the server - pacer *pacer.Pacer // To pace and retry the API calls + pacer *fs.Pacer // To pace and retry the API calls session UserSessionInfo // contains the session data dirCache *dircache.DirCache // Map of directory path to directory id } @@ -144,7 +144,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.dirCache = dircache.New(root, "0", f) diff --git a/backend/pcloud/pcloud.go b/backend/pcloud/pcloud.go index 7a75932de..2caaa1444 100644 --- a/backend/pcloud/pcloud.go +++ b/backend/pcloud/pcloud.go @@ -95,7 +95,7 @@ type Fs struct { features *fs.Features // optional features srv *rest.Client // the connection to the server dirCache *dircache.DirCache // Map of directory path to directory id - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls tokenRenewer *oauthutil.Renew // renew the token on expiry } @@ -254,7 +254,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, srv: rest.NewClient(oAuthClient).SetRoot(rootURL), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.features = (&fs.Features{ CaseInsensitive: false, diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 8a545470c..f70d0b27b 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -782,7 +782,7 @@ type Fs struct { bucketOKMu sync.Mutex // mutex to protect bucket OK bucketOK bool // true if we have created the bucket bucketDeleted bool // true if we have deleted the bucket - pacer *pacer.Pacer // To pace the API calls + pacer *fs.Pacer // To pace the API calls srv *http.Client // a plain http client } @@ -1055,7 +1055,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { c: c, bucket: bucket, ses: ses, - pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer), + pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))), srv: fshttp.NewClient(fs.Config), } f.features = (&fs.Features{ diff --git a/backend/swift/swift.go b/backend/swift/swift.go index aa72adc57..1708db02b 100644 --- a/backend/swift/swift.go +++ b/backend/swift/swift.go @@ -216,7 +216,7 @@ type Fs struct { containerOK bool // true if we have created the container segmentsContainer string // container to store the segments (if any) in noCheckContainer bool // don't check the container before creating it - pacer *pacer.Pacer // To pace the API calls + pacer *fs.Pacer // To pace the API calls } // Object describes a swift object @@ -401,7 +401,7 @@ func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, n segmentsContainer: container + "_segments", root: directory, noCheckContainer: noCheckContainer, - pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.S3Pacer), + pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))), } f.features = (&fs.Features{ ReadMimeType: true, diff --git a/backend/webdav/webdav.go b/backend/webdav/webdav.go index 889da8cdf..91205b00f 100644 --- a/backend/webdav/webdav.go +++ b/backend/webdav/webdav.go @@ -101,7 +101,7 @@ type Fs struct { endpoint *url.URL // URL of the host endpointURL string // endpoint as a string srv *rest.Client // the connection to the one drive server - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls precision time.Duration // mod time precision canStream bool // set if can stream useOCMtime bool // set if can use X-OC-Mtime @@ -318,7 +318,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { endpoint: u, endpointURL: u.String(), srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetRoot(u.String()), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), precision: fs.ModTimeNotSupported, } f.features = (&fs.Features{ diff --git a/backend/yandex/yandex.go b/backend/yandex/yandex.go index 624c425e6..864cb25f4 100644 --- a/backend/yandex/yandex.go +++ b/backend/yandex/yandex.go @@ -93,7 +93,7 @@ type Fs struct { opt Options // parsed options features *fs.Features // optional features srv *rest.Client // the connection to the yandex server - pacer *pacer.Pacer // pacer for API calls + pacer *fs.Pacer // pacer for API calls diskRoot string // root path with "disk:/" container name } @@ -269,7 +269,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { name: name, opt: *opt, srv: rest.NewClient(oAuthClient).SetRoot(rootURL), - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } f.setRoot(root) f.features = (&fs.Features{ diff --git a/fs/fs.go b/fs/fs.go index f8ded9ed6..ffdd6cf36 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -16,8 +16,10 @@ import ( "github.com/ncw/rclone/fs/config/configmap" "github.com/ncw/rclone/fs/config/configstruct" + "github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fspath" "github.com/ncw/rclone/fs/hash" + "github.com/ncw/rclone/lib/pacer" "github.com/pkg/errors" ) @@ -1112,3 +1114,90 @@ func GetModifyWindow(fss ...Info) time.Duration { } return window } + +// Pacer is a simple wrapper around a pacer.Pacer with logging. +type Pacer struct { + *pacer.Pacer +} + +type logCalculator struct { + pacer.Calculator +} + +// NewPacer creates a Pacer for the given Fs and Calculator. +func NewPacer(c pacer.Calculator) *Pacer { + p := &Pacer{ + Pacer: pacer.New( + pacer.InvokerOption(pacerInvoker), + pacer.MaxConnectionsOption(Config.Checkers+Config.Transfers), + pacer.RetriesOption(Config.LowLevelRetries), + pacer.CalculatorOption(c), + ), + } + p.SetCalculator(c) + return p +} + +func (d *logCalculator) Calculate(state pacer.State) time.Duration { + type causer interface { + Cause() error + } + + if c, ok := state.LastError.(causer); ok { + state.LastError = c.Cause() + } else { + Logf("pacer", "Invalid error in fs.Pacer: %t", state.LastError) + } + oldSleepTime := state.SleepTime + newSleepTime := d.Calculator.Calculate(state) + if state.ConsecutiveRetries > 0 { + if newSleepTime != oldSleepTime { + Debugf("pacer", "Rate limited, increasing sleep to %v", newSleepTime) + } + } else { + if newSleepTime != oldSleepTime { + Debugf("pacer", "Reducing sleep to %v", newSleepTime) + } + } + return newSleepTime +} + +// SetCalculator sets the pacing algorithm. Don't modify the Calculator object +// afterwards, use the ModifyCalculator method when needed. +// +// It will choose the default algorithm if nil is passed in. +func (p *Pacer) SetCalculator(c pacer.Calculator) { + switch c.(type) { + case *logCalculator: + Logf("pacer", "Invalid Calculator in fs.Pacer.SetCalculator") + case nil: + c = &logCalculator{pacer.NewDefault()} + default: + c = &logCalculator{c} + } + + p.Pacer.SetCalculator(c) +} + +// ModifyCalculator calls the given function with the currently configured +// Calculator and the Pacer lock held. +func (p *Pacer) ModifyCalculator(f func(pacer.Calculator)) { + p.ModifyCalculator(func(c pacer.Calculator) { + switch _c := c.(type) { + case *logCalculator: + f(_c.Calculator) + default: + Logf("pacer", "Invalid Calculator in fs.Pacer: %t", c) + f(c) + } + }) +} + +func pacerInvoker(try, retries int, f pacer.Paced) (retry bool, err error) { + retry, err = f() + if retry { + Debugf("pacer", "low level retry %d/%d (error %v)", try, retries, err) + err = fserrors.RetryError(err) + } + return +} diff --git a/fs/fs_test.go b/fs/fs_test.go index 753614b0b..0fd15c22d 100644 --- a/fs/fs_test.go +++ b/fs/fs_test.go @@ -2,8 +2,15 @@ package fs import ( "strings" + "sync" "testing" + "time" + "github.com/stretchr/testify/require" + + "github.com/ncw/rclone/fs/fserrors" + "github.com/ncw/rclone/lib/pacer" + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/stretchr/testify/assert" ) @@ -70,3 +77,47 @@ func TestOption(t *testing.T) { err = d.Set("sdfsdf") assert.Error(t, err) } + +var errFoo = errors.New("foo") + +type dummyPaced struct { + retry bool + called int + wait *sync.Cond +} + +func (dp *dummyPaced) fn() (bool, error) { + if dp.wait != nil { + dp.wait.L.Lock() + dp.wait.Wait() + dp.wait.L.Unlock() + } + dp.called++ + return dp.retry, errFoo +} + +func TestPacerCall(t *testing.T) { + expectedCalled := Config.LowLevelRetries + if expectedCalled == 0 { + expectedCalled = 20 + Config.LowLevelRetries = expectedCalled + defer func() { + Config.LowLevelRetries = 0 + }() + } + p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond))) + + dp := &dummyPaced{retry: true} + err := p.Call(dp.fn) + require.Equal(t, expectedCalled, dp.called) + require.Implements(t, (*fserrors.Retrier)(nil), err) +} + +func TestPacerCallNoRetry(t *testing.T) { + p := NewPacer(pacer.NewDefault(pacer.MinSleep(1*time.Millisecond), pacer.MaxSleep(2*time.Millisecond))) + + dp := &dummyPaced{retry: true} + err := p.CallNoRetry(dp.fn) + require.Equal(t, 1, dp.called) + require.Implements(t, (*fserrors.Retrier)(nil), err) +} diff --git a/lib/pacer/pacer.go b/lib/pacer/pacer.go index 3914950ad..7e58230f5 100644 --- a/lib/pacer/pacer.go +++ b/lib/pacer/pacer.go @@ -2,74 +2,69 @@ package pacer import ( - "context" - "math/rand" "sync" "time" - "github.com/ncw/rclone/fs" - "github.com/ncw/rclone/fs/fserrors" - "golang.org/x/time/rate" + "github.com/ncw/rclone/lib/errors" ) -// Pacer state -type Pacer struct { - mu sync.Mutex // Protecting read/writes - minSleep time.Duration // minimum sleep time - maxSleep time.Duration // maximum sleep time - burst int // number of calls to send without rate limiting - limiter *rate.Limiter // rate limiter for the minsleep - decayConstant uint // decay constant - attackConstant uint // attack constant - pacer chan struct{} // To pace the operations - sleepTime time.Duration // Time to sleep for each transaction - retries int // Max number of retries - maxConnections int // Maximum number of concurrent connections - connTokens chan struct{} // Connection tokens - calculatePace func(bool) // switchable pacing algorithm - call with mu held - consecutiveRetries int // number of consecutive retries +// State represents the public Pacer state that will be passed to the +// configured Calculator +type State struct { + SleepTime time.Duration // current time to sleep before adding the pacer token back + ConsecutiveRetries int // number of consecutive retries, will be 0 when the last invoker call returned false + LastError error // the error returned by the last invoker call or nil } -// Type is for selecting different pacing algorithms -type Type int +// Calculator is a generic calculation function for a Pacer. +type Calculator interface { + // Calculate takes the current Pacer state and returns the sleep time after which + // the next Pacer call will be done. + Calculate(state State) time.Duration +} -const ( - // DefaultPacer is a truncated exponential attack and decay. - // - // On retries the sleep time is doubled, on non errors then - // sleeptime decays according to the decay constant as set - // with SetDecayConstant. - // - // The sleep never goes below that set with SetMinSleep or - // above that set with SetMaxSleep. - DefaultPacer = Type(iota) +// Pacer is the primary type of the pacer package. It allows to retry calls +// with a configurable delay in between. +type Pacer struct { + pacerOptions + mu sync.Mutex // Protecting read/writes + pacer chan struct{} // To pace the operations + connTokens chan struct{} // Connection tokens + state State +} +type pacerOptions struct { + maxConnections int // Maximum number of concurrent connections + retries int // Max number of retries + calculator Calculator // switchable pacing algorithm - call with mu held + invoker InvokerFunc // wrapper function used to invoke the target function +} - // AmazonCloudDrivePacer is a specialised pacer for Amazon Drive - // - // It implements a truncated exponential backoff strategy with - // randomization. Normally operations are paced at the - // interval set with SetMinSleep. On errors the sleep timer - // is set to 0..2**retries seconds. - // - // See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices - AmazonCloudDrivePacer +// InvokerFunc is the signature of the wrapper function used to invoke the +// target function in Pacer. +type InvokerFunc func(try, tries int, f Paced) (bool, error) - // GoogleDrivePacer is a specialised pacer for Google Drive - // - // It implements a truncated exponential backoff strategy with - // randomization. Normally operations are paced at the - // interval set with SetMinSleep. On errors the sleep timer - // is set to (2 ^ n) + random_number_milliseconds seconds - // - // See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff - GoogleDrivePacer +// Option can be used in New to configure the Pacer. +type Option func(*pacerOptions) - // S3Pacer is a specialised pacer for S3 - // - // It is basically the defaultPacer, but allows the sleep time to go to 0 - // when things are going well. - S3Pacer -) +// CalculatorOption sets a Calculator for the new Pacer. +func CalculatorOption(c Calculator) Option { + return func(p *pacerOptions) { p.calculator = c } +} + +// RetriesOption sets the retries number for the new Pacer. +func RetriesOption(retries int) Option { + return func(p *pacerOptions) { p.retries = retries } +} + +// MaxConnectionsOption sets the maximum connections number for the new Pacer. +func MaxConnectionsOption(maxConnections int) Option { + return func(p *pacerOptions) { p.maxConnections = maxConnections } +} + +// InvokerOption sets a InvokerFunc for the new Pacer. +func InvokerOption(invoker InvokerFunc) Option { + return func(p *pacerOptions) { p.invoker = invoker } +} // Paced is a function which is called by the Call and CallNoRetry // methods. It should return a boolean, true if it would like to be @@ -77,19 +72,27 @@ const ( // wrapped in a RetryError. type Paced func() (bool, error) -// New returns a Pacer with sensible defaults -func New() *Pacer { - p := &Pacer{ - maxSleep: 2 * time.Second, - decayConstant: 2, - attackConstant: 1, - retries: fs.Config.LowLevelRetries, - pacer: make(chan struct{}, 1), +// New returns a Pacer with sensible defaults. +func New(options ...Option) *Pacer { + opts := pacerOptions{ + maxConnections: 10, + retries: 3, } - p.sleepTime = p.minSleep - p.SetPacer(DefaultPacer) - p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers) - p.SetMinSleep(10 * time.Millisecond) + for _, o := range options { + o(&opts) + } + p := &Pacer{ + pacerOptions: opts, + pacer: make(chan struct{}, 1), + } + if p.calculator == nil { + p.SetCalculator(nil) + } + p.state.SleepTime = p.calculator.Calculate(p.state) + if p.invoker == nil { + p.invoker = invoke + } + p.SetMaxConnections(p.maxConnections) // Put the first pacing token in p.pacer <- struct{}{} @@ -97,54 +100,11 @@ func New() *Pacer { return p } -// SetSleep sets the current sleep time -func (p *Pacer) SetSleep(t time.Duration) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.sleepTime = t - return p -} - -// GetSleep gets the current sleep time -func (p *Pacer) GetSleep() time.Duration { - p.mu.Lock() - defer p.mu.Unlock() - return p.sleepTime -} - -// SetMinSleep sets the minimum sleep time for the pacer -func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.minSleep = t - p.sleepTime = p.minSleep - p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) - return p -} - -// SetBurst sets the burst with no limiting of the pacer -func (p *Pacer) SetBurst(n int) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.burst = n - p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) - return p -} - -// SetMaxSleep sets the maximum sleep time for the pacer -func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.maxSleep = t - p.sleepTime = p.minSleep - return p -} - // SetMaxConnections sets the maximum number of concurrent connections. // Setting the value to 0 will allow unlimited number of connections. // Should not be changed once you have started calling the pacer. // By default this will be set to fs.Config.Checkers. -func (p *Pacer) SetMaxConnections(n int) *Pacer { +func (p *Pacer) SetMaxConnections(n int) { p.mu.Lock() defer p.mu.Unlock() p.maxConnections = n @@ -156,61 +116,34 @@ func (p *Pacer) SetMaxConnections(n int) *Pacer { p.connTokens <- struct{}{} } } - return p } -// SetDecayConstant sets the decay constant for the pacer -// -// This is the speed the time falls back to the minimum after errors -// have occurred. -// -// bigger for slower decay, exponential. 1 is halve, 0 is go straight to minimum -func (p *Pacer) SetDecayConstant(decay uint) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.decayConstant = decay - return p -} - -// SetAttackConstant sets the attack constant for the pacer -// -// This is the speed the time grows from the minimum after errors have -// occurred. -// -// bigger for slower attack, 1 is double, 0 is go straight to maximum -func (p *Pacer) SetAttackConstant(attack uint) *Pacer { - p.mu.Lock() - defer p.mu.Unlock() - p.attackConstant = attack - return p -} - -// SetRetries sets the max number of tries for Call -func (p *Pacer) SetRetries(retries int) *Pacer { +// SetRetries sets the max number of retries for Call +func (p *Pacer) SetRetries(retries int) { p.mu.Lock() defer p.mu.Unlock() p.retries = retries - return p } -// SetPacer sets the pacing algorithm +// SetCalculator sets the pacing algorithm. Don't modify the Calculator object +// afterwards, use the ModifyCalculator method when needed. // -// It will choose the default algorithm if an incorrect value is -// passed in. -func (p *Pacer) SetPacer(t Type) *Pacer { +// It will choose the default algorithm if nil is passed in. +func (p *Pacer) SetCalculator(c Calculator) { p.mu.Lock() defer p.mu.Unlock() - switch t { - case AmazonCloudDrivePacer: - p.calculatePace = p.acdPacer - case GoogleDrivePacer: - p.calculatePace = p.drivePacer - case S3Pacer: - p.calculatePace = p.s3Pacer - default: - p.calculatePace = p.defaultPacer + if c == nil { + c = NewDefault() } - return p + p.calculator = c +} + +// ModifyCalculator calls the given function with the currently configured +// Calculator and the Pacer lock held. +func (p *Pacer) ModifyCalculator(f func(Calculator)) { + p.mu.Lock() + f(p.calculator) + p.mu.Unlock() } // Start a call to the API @@ -230,170 +163,29 @@ func (p *Pacer) beginCall() { p.mu.Lock() // Restart the timer - go func(sleepTime, minSleep time.Duration) { - // fs.Debugf(f, "New sleep for %v at %v", t, time.Now()) - // Sleep the minimum time with the rate limiter - if minSleep > 0 && sleepTime >= minSleep { - _ = p.limiter.Wait(context.Background()) - sleepTime -= minSleep - } - // Then sleep the remaining time - if sleepTime > 0 { - time.Sleep(sleepTime) - } + go func(t time.Duration) { + time.Sleep(t) p.pacer <- struct{}{} - }(p.sleepTime, p.minSleep) + }(p.state.SleepTime) p.mu.Unlock() } -// exponentialImplementation implements a exponentialImplementation up -// and down pacing algorithm -// -// See the description for DefaultPacer -// -// This should calculate a new sleepTime. It takes a boolean as to -// whether the operation should be retried or not. -// -// Call with p.mu held -func (p *Pacer) defaultPacer(retry bool) { - oldSleepTime := p.sleepTime - if retry { - if p.attackConstant == 0 { - p.sleepTime = p.maxSleep - } else { - p.sleepTime = (p.sleepTime << p.attackConstant) / ((1 << p.attackConstant) - 1) - } - if p.sleepTime > p.maxSleep { - p.sleepTime = p.maxSleep - } - if p.sleepTime != oldSleepTime { - fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime) - } - } else { - p.sleepTime = (p.sleepTime<> p.decayConstant - if p.sleepTime < p.minSleep { - p.sleepTime = p.minSleep - } - if p.sleepTime != oldSleepTime { - fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime) - } - } -} - -// acdPacer implements a truncated exponential backoff -// strategy with randomization for Amazon Drive -// -// See the description for AmazonCloudDrivePacer -// -// This should calculate a new sleepTime. It takes a boolean as to -// whether the operation should be retried or not. -// -// Call with p.mu held -func (p *Pacer) acdPacer(retry bool) { - consecutiveRetries := p.consecutiveRetries - if consecutiveRetries == 0 { - if p.sleepTime != p.minSleep { - p.sleepTime = p.minSleep - fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime) - } - } else { - if consecutiveRetries > 9 { - consecutiveRetries = 9 - } - // consecutiveRetries starts at 1 so - // maxSleep is 2**(consecutiveRetries-1) seconds - maxSleep := time.Second << uint(consecutiveRetries-1) - // actual sleep is random from 0..maxSleep - p.sleepTime = time.Duration(rand.Int63n(int64(maxSleep))) - if p.sleepTime < p.minSleep { - p.sleepTime = p.minSleep - } - fs.Debugf("pacer", "Rate limited, sleeping for %v (%d consecutive low level retries)", p.sleepTime, p.consecutiveRetries) - } -} - -// drivePacer implements a truncated exponential backoff strategy with -// randomization for Google Drive -// -// See the description for GoogleDrivePacer -// -// This should calculate a new sleepTime. It takes a boolean as to -// whether the operation should be retried or not. -// -// Call with p.mu held -func (p *Pacer) drivePacer(retry bool) { - consecutiveRetries := p.consecutiveRetries - if consecutiveRetries == 0 { - if p.sleepTime != p.minSleep { - p.sleepTime = p.minSleep - fs.Debugf("pacer", "Resetting sleep to minimum %v on success", p.sleepTime) - } - } else { - if consecutiveRetries > 5 { - consecutiveRetries = 5 - } - // consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16 - // maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds - p.sleepTime = time.Second< p.maxSleep { - p.sleepTime = p.maxSleep - } - if p.sleepTime != oldSleepTime { - fs.Debugf("pacer", "Rate limited, increasing sleep to %v", p.sleepTime) - } - } else { - p.sleepTime = (p.sleepTime<> p.decayConstant - if p.sleepTime < p.minSleep { - p.sleepTime = 0 - } - if p.sleepTime != oldSleepTime { - fs.Debugf("pacer", "Reducing sleep to %v", p.sleepTime) - } - } -} - // endCall implements the pacing algorithm // // This should calculate a new sleepTime. It takes a boolean as to // whether the operation should be retried or not. -func (p *Pacer) endCall(retry bool) { +func (p *Pacer) endCall(retry bool, err error) { if p.maxConnections > 0 { p.connTokens <- struct{}{} } p.mu.Lock() if retry { - p.consecutiveRetries++ + p.state.ConsecutiveRetries++ } else { - p.consecutiveRetries = 0 + p.state.ConsecutiveRetries = 0 } - p.calculatePace(retry) + p.state.LastError = err + p.state.SleepTime = p.calculator.Calculate(p.state) p.mu.Unlock() } @@ -402,15 +194,11 @@ func (p *Pacer) call(fn Paced, retries int) (err error) { var retry bool for i := 1; i <= retries; i++ { p.beginCall() - retry, err = fn() - p.endCall(retry) + retry, err = p.invoker(i, retries, fn) + p.endCall(retry, err) if !retry { break } - fs.Debugf("pacer", "low level retry %d/%d (error %v)", i, retries, err) - } - if retry { - err = fserrors.RetryError(err) } return err } @@ -436,3 +224,41 @@ func (p *Pacer) Call(fn Paced) (err error) { func (p *Pacer) CallNoRetry(fn Paced) error { return p.call(fn, 1) } + +func invoke(try, tries int, f Paced) (bool, error) { + return f() +} + +type retryAfterError struct { + error + retryAfter time.Duration +} + +func (r *retryAfterError) Error() string { + return r.error.Error() +} + +func (r *retryAfterError) Cause() error { + return r.error +} + +// RetryAfterError returns a wrapped error that can be used by Calculator implementations +func RetryAfterError(err error, retryAfter time.Duration) error { + return &retryAfterError{ + error: err, + retryAfter: retryAfter, + } +} + +// IsRetryAfter returns true if the the error or any of it's Cause's is an error +// returned by RetryAfterError. It also returns the associated Duration if possible. +func IsRetryAfter(err error) (retryAfter time.Duration, isRetryAfter bool) { + errors.Walk(err, func(err error) bool { + if r, ok := err.(*retryAfterError); ok { + retryAfter, isRetryAfter = r.retryAfter, true + return true + } + return false + }) + return +} diff --git a/lib/pacer/pacer_test.go b/lib/pacer/pacer_test.go index 1b2793f7a..4a7646530 100644 --- a/lib/pacer/pacer_test.go +++ b/lib/pacer/pacer_test.go @@ -1,181 +1,85 @@ package pacer import ( - "fmt" + "sync" "testing" "time" - "github.com/ncw/rclone/fs" - "github.com/ncw/rclone/fs/fserrors" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) func TestNew(t *testing.T) { const expectedRetries = 7 - fs.Config.LowLevelRetries = expectedRetries - p := New() - if p.minSleep != 10*time.Millisecond { - t.Errorf("minSleep") - } - if p.maxSleep != 2*time.Second { - t.Errorf("maxSleep") - } - if p.sleepTime != p.minSleep { - t.Errorf("sleepTime") - } - if p.retries != expectedRetries { - t.Errorf("retries want %v got %v", expectedRetries, p.retries) - } - if p.decayConstant != 2 { - t.Errorf("decayConstant") - } - if p.attackConstant != 1 { - t.Errorf("attackConstant") - } - if cap(p.pacer) != 1 { - t.Errorf("pacer 1") - } - if len(p.pacer) != 1 { - t.Errorf("pacer 2") - } - if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) { - t.Errorf("calculatePace") - } - if p.maxConnections != fs.Config.Checkers+fs.Config.Transfers { - t.Errorf("maxConnections") - } - if cap(p.connTokens) != fs.Config.Checkers+fs.Config.Transfers { - t.Errorf("connTokens") - } - if p.consecutiveRetries != 0 { - t.Errorf("consecutiveRetries") - } -} - -func TestSetSleep(t *testing.T) { - p := New().SetSleep(2 * time.Millisecond) - if p.sleepTime != 2*time.Millisecond { - t.Errorf("didn't set") - } -} - -func TestGetSleep(t *testing.T) { - p := New().SetSleep(2 * time.Millisecond) - if p.GetSleep() != 2*time.Millisecond { - t.Errorf("didn't get") - } -} - -func TestSetMinSleep(t *testing.T) { - p := New().SetMinSleep(1 * time.Millisecond) - if p.minSleep != 1*time.Millisecond { - t.Errorf("didn't set") - } -} - -func TestSetMaxSleep(t *testing.T) { - p := New().SetMaxSleep(100 * time.Second) - if p.maxSleep != 100*time.Second { - t.Errorf("didn't set") + const expectedConnections = 9 + p := New(RetriesOption(expectedRetries), MaxConnectionsOption(expectedConnections)) + if d, ok := p.calculator.(*Default); ok { + assert.Equal(t, 10*time.Millisecond, d.minSleep) + assert.Equal(t, 2*time.Second, d.maxSleep) + assert.Equal(t, d.minSleep, p.state.SleepTime) + assert.Equal(t, uint(2), d.decayConstant) + assert.Equal(t, uint(1), d.attackConstant) + } else { + t.Errorf("calculator") } + assert.Equal(t, expectedRetries, p.retries) + assert.Equal(t, 1, cap(p.pacer)) + assert.Equal(t, 1, len(p.pacer)) + assert.Equal(t, expectedConnections, p.maxConnections) + assert.Equal(t, expectedConnections, cap(p.connTokens)) + assert.Equal(t, 0, p.state.ConsecutiveRetries) } func TestMaxConnections(t *testing.T) { - p := New().SetMaxConnections(20) - if p.maxConnections != 20 { - t.Errorf("maxConnections") - } - if cap(p.connTokens) != 20 { - t.Errorf("connTokens") - } + p := New() + p.SetMaxConnections(20) + assert.Equal(t, 20, p.maxConnections) + assert.Equal(t, 20, cap(p.connTokens)) p.SetMaxConnections(0) - if p.maxConnections != 0 { - t.Errorf("maxConnections is not 0") - } - if p.connTokens != nil { - t.Errorf("connTokens is not nil") - } -} - -func TestSetDecayConstant(t *testing.T) { - p := New().SetDecayConstant(17) - if p.decayConstant != 17 { - t.Errorf("didn't set") - } + assert.Equal(t, 0, p.maxConnections) + assert.Nil(t, p.connTokens) } func TestDecay(t *testing.T) { - p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second) + c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second)) for _, test := range []struct { - in time.Duration + in State attackConstant uint want time.Duration }{ - {8 * time.Millisecond, 1, 4 * time.Millisecond}, - {1 * time.Millisecond, 0, time.Microsecond}, - {1 * time.Millisecond, 2, (3 * time.Millisecond) / 4}, - {1 * time.Millisecond, 3, (7 * time.Millisecond) / 8}, + {State{SleepTime: 8 * time.Millisecond}, 1, 4 * time.Millisecond}, + {State{SleepTime: 1 * time.Millisecond}, 0, 1 * time.Microsecond}, + {State{SleepTime: 1 * time.Millisecond}, 2, (3 * time.Millisecond) / 4}, + {State{SleepTime: 1 * time.Millisecond}, 3, (7 * time.Millisecond) / 8}, } { - p.sleepTime = test.in - p.SetDecayConstant(test.attackConstant) - p.defaultPacer(false) - got := p.sleepTime - if got != test.want { - t.Errorf("bad sleep want %v got %v", test.want, got) - } - } -} - -func TestSetAttackConstant(t *testing.T) { - p := New().SetAttackConstant(19) - if p.attackConstant != 19 { - t.Errorf("didn't set") + c.decayConstant = test.attackConstant + got := c.Calculate(test.in) + assert.Equal(t, test.want, got, "test: %+v", test) } } func TestAttack(t *testing.T) { - p := New().SetMinSleep(time.Microsecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second) + c := NewDefault(MinSleep(1*time.Microsecond), MaxSleep(1*time.Second)) for _, test := range []struct { - in time.Duration + in State attackConstant uint want time.Duration }{ - {1 * time.Millisecond, 1, 2 * time.Millisecond}, - {1 * time.Millisecond, 0, time.Second}, - {1 * time.Millisecond, 2, (4 * time.Millisecond) / 3}, - {1 * time.Millisecond, 3, (8 * time.Millisecond) / 7}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 1, 2 * time.Millisecond}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 0, 1 * time.Second}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2, (4 * time.Millisecond) / 3}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 3, (8 * time.Millisecond) / 7}, } { - p.sleepTime = test.in - p.SetAttackConstant(test.attackConstant) - p.defaultPacer(true) - got := p.sleepTime - if got != test.want { - t.Errorf("bad sleep want %v got %v", test.want, got) - } + c.attackConstant = test.attackConstant + got := c.Calculate(test.in) + assert.Equal(t, test.want, got, "test: %+v", test) } - } func TestSetRetries(t *testing.T) { - p := New().SetRetries(18) - if p.retries != 18 { - t.Errorf("didn't set") - } -} - -func TestSetPacer(t *testing.T) { - p := New().SetPacer(AmazonCloudDrivePacer) - if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.acdPacer) { - t.Errorf("calculatePace is not acdPacer") - } - p.SetPacer(GoogleDrivePacer) - if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.drivePacer) { - t.Errorf("calculatePace is not drivePacer") - } - p.SetPacer(DefaultPacer) - if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) { - t.Errorf("calculatePace is not defaultPacer") - } + p := New() + p.SetRetries(18) + assert.Equal(t, 18, p.retries) } // emptyTokens empties the pacer of all its tokens @@ -200,7 +104,7 @@ func waitForPace(p *Pacer, duration time.Duration) (when time.Time) { } func TestBeginCall(t *testing.T) { - p := New().SetMaxConnections(10).SetMinSleep(1 * time.Millisecond) + p := New(MaxConnectionsOption(10), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond)))) emptyTokens(p) go p.beginCall() if !waitForPace(p, 10*time.Millisecond).IsZero() { @@ -223,7 +127,7 @@ func TestBeginCall(t *testing.T) { } func TestBeginCallZeroConnections(t *testing.T) { - p := New().SetMaxConnections(0).SetMinSleep(1 * time.Millisecond) + p := New(MaxConnectionsOption(0), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond)))) emptyTokens(p) go p.beginCall() if !waitForPace(p, 10*time.Millisecond).IsZero() { @@ -241,155 +145,144 @@ func TestBeginCallZeroConnections(t *testing.T) { } func TestDefaultPacer(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second).SetDecayConstant(2) + c := NewDefault(MinSleep(1*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2)) for _, test := range []struct { - in time.Duration - retry bool + state State want time.Duration }{ - {time.Millisecond, true, 2 * time.Millisecond}, - {time.Second, true, time.Second}, - {(3 * time.Second) / 4, true, time.Second}, - {time.Second, false, 750 * time.Millisecond}, - {1000 * time.Microsecond, false, time.Millisecond}, - {1200 * time.Microsecond, false, time.Millisecond}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 1}, 2 * time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second}, + {State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second}, + {State{SleepTime: 1 * time.Second}, 750 * time.Millisecond}, + {State{SleepTime: 1000 * time.Microsecond}, 1 * time.Millisecond}, + {State{SleepTime: 1200 * time.Microsecond}, 1 * time.Millisecond}, } { - p.sleepTime = test.in - p.defaultPacer(test.retry) - got := p.sleepTime - if got != test.want { - t.Errorf("bad sleep want %v got %v", test.want, got) - } + got := c.Calculate(test.state) + assert.Equal(t, test.want, got, "test: %+v", test) } } func TestAmazonCloudDrivePacer(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetPacer(AmazonCloudDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2) + c := NewAmazonCloudDrive(MinSleep(1 * time.Millisecond)) // Do lots of times because of the random number! for _, test := range []struct { - in time.Duration - consecutiveRetries int - retry bool - want time.Duration + state State + want time.Duration }{ - {time.Millisecond, 0, true, time.Millisecond}, - {10 * time.Millisecond, 0, true, time.Millisecond}, - {1 * time.Second, 1, true, 500 * time.Millisecond}, - {1 * time.Second, 2, true, 1 * time.Second}, - {1 * time.Second, 3, true, 2 * time.Second}, - {1 * time.Second, 4, true, 4 * time.Second}, - {1 * time.Second, 5, true, 8 * time.Second}, - {1 * time.Second, 6, true, 16 * time.Second}, - {1 * time.Second, 7, true, 32 * time.Second}, - {1 * time.Second, 8, true, 64 * time.Second}, - {1 * time.Second, 9, true, 128 * time.Second}, - {1 * time.Second, 10, true, 128 * time.Second}, - {1 * time.Second, 11, true, 128 * time.Second}, + {State{SleepTime: 1 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond}, + {State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 0}, 1 * time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 500 * time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 1 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 2 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 4 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 8 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 32 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 8}, 64 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 9}, 128 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 10}, 128 * time.Second}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 11}, 128 * time.Second}, } { const n = 1000 var sum time.Duration // measure average time over n cycles for i := 0; i < n; i++ { - p.sleepTime = test.in - p.consecutiveRetries = test.consecutiveRetries - p.acdPacer(test.retry) - sum += p.sleepTime + sum += c.Calculate(test.state) } got := sum / n - //t.Logf("%+v: got = %v", test, got) - if got < (test.want*9)/10 || got > (test.want*11)/10 { - t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got) - } + assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v", test) } } func TestGoogleDrivePacer(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetPacer(GoogleDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2) // Do lots of times because of the random number! for _, test := range []struct { - in time.Duration - consecutiveRetries int - retry bool - want time.Duration + state State + want time.Duration }{ - {time.Millisecond, 0, true, time.Millisecond}, - {10 * time.Millisecond, 0, true, time.Millisecond}, - {1 * time.Second, 1, true, 1*time.Second + 500*time.Millisecond}, - {1 * time.Second, 2, true, 2*time.Second + 500*time.Millisecond}, - {1 * time.Second, 3, true, 4*time.Second + 500*time.Millisecond}, - {1 * time.Second, 4, true, 8*time.Second + 500*time.Millisecond}, - {1 * time.Second, 5, true, 16*time.Second + 500*time.Millisecond}, - {1 * time.Second, 6, true, 16*time.Second + 500*time.Millisecond}, - {1 * time.Second, 7, true, 16*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Millisecond}, 0}, + {State{SleepTime: 10 * time.Millisecond}, 0}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 2}, 2*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 3}, 4*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 4}, 8*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 5}, 16*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 6}, 16*time.Second + 500*time.Millisecond}, + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 7}, 16*time.Second + 500*time.Millisecond}, } { const n = 1000 var sum time.Duration // measure average time over n cycles for i := 0; i < n; i++ { - p.sleepTime = test.in - p.consecutiveRetries = test.consecutiveRetries - p.drivePacer(test.retry) - sum += p.sleepTime + c := NewGoogleDrive(MinSleep(1 * time.Millisecond)) + sum += c.Calculate(test.state) } got := sum / n - //t.Logf("%+v: got = %v", test, got) - if got < (test.want*9)/10 || got > (test.want*11)/10 { - t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got) + assert.False(t, got < (test.want*9)/10 || got > (test.want*11)/10, "test: %+v, got: %v", test, got) + } + + const minSleep = 2 * time.Millisecond + for _, test := range []struct { + calls int + want int + }{ + {1, 0}, + {9, 0}, + {10, 0}, + {11, 1}, + {12, 2}, + } { + c := NewGoogleDrive(MinSleep(minSleep), Burst(10)) + count := 0 + for i := 0; i < test.calls; i++ { + sleep := c.Calculate(State{}) + time.Sleep(sleep) + if sleep != 0 { + count++ + } } + assert.Equalf(t, test.want, count, "test: %+v, got: %v", test, count) } } func TestS3Pacer(t *testing.T) { - p := New().SetMinSleep(10 * time.Millisecond).SetPacer(S3Pacer).SetMaxSleep(time.Second).SetDecayConstant(2) + c := NewS3(MinSleep(10*time.Millisecond), MaxSleep(1*time.Second), DecayConstant(2)) for _, test := range []struct { - in time.Duration - retry bool + state State want time.Duration }{ - {0, true, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep - {10 * time.Millisecond, true, 20 * time.Millisecond}, //Another fail, double the backoff - {10 * time.Millisecond, false, 0}, //Things start going ok when we're at minSleep; should result in no sleep - {12 * time.Millisecond, false, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0 - {0, false, 0}, //Things have been going ok; not retrying should keep sleep at 0 - {time.Second, true, time.Second}, //Check maxSleep is enforced - {(3 * time.Second) / 4, true, time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep - {time.Second, false, 750 * time.Millisecond}, //Check decay from maxSleep - {48 * time.Millisecond, false, 36 * time.Millisecond}, //Check simple decay above minSleep + {State{SleepTime: 0, ConsecutiveRetries: 1}, 10 * time.Millisecond}, //Things were going ok, we failed once, back off to minSleep + {State{SleepTime: 10 * time.Millisecond, ConsecutiveRetries: 1}, 20 * time.Millisecond}, //Another fail, double the backoff + {State{SleepTime: 10 * time.Millisecond}, 0}, //Things start going ok when we're at minSleep; should result in no sleep + {State{SleepTime: 12 * time.Millisecond}, 0}, //*near* minsleep and going ok, decay would take below minSleep, should go to 0 + {State{SleepTime: 0}, 0}, //Things have been going ok; not retrying should keep sleep at 0 + {State{SleepTime: 1 * time.Second, ConsecutiveRetries: 1}, 1 * time.Second}, //Check maxSleep is enforced + {State{SleepTime: (3 * time.Second) / 4, ConsecutiveRetries: 1}, 1 * time.Second}, //Check attack heading to maxSleep doesn't exceed maxSleep + {State{SleepTime: 1 * time.Second}, 750 * time.Millisecond}, //Check decay from maxSleep + {State{SleepTime: 48 * time.Millisecond}, 36 * time.Millisecond}, //Check simple decay above minSleep } { - p.sleepTime = test.in - p.s3Pacer(test.retry) - got := p.sleepTime - if got != test.want { - t.Errorf("bad sleep for %v with retry %v: want %v got %v", test.in, test.retry, test.want, got) - } + got := c.Calculate(test.state) + assert.Equal(t, test.want, got, "test: %+v", test) } } func TestEndCall(t *testing.T) { - p := New().SetMaxConnections(5) + p := New(MaxConnectionsOption(5)) emptyTokens(p) - p.consecutiveRetries = 1 - p.endCall(true) - if len(p.connTokens) != 1 { - t.Errorf("Expecting 1 token") - } - if p.consecutiveRetries != 2 { - t.Errorf("Bad consecutive retries") - } + p.state.ConsecutiveRetries = 1 + p.endCall(true, nil) + assert.Equal(t, 1, len(p.connTokens)) + assert.Equal(t, 2, p.state.ConsecutiveRetries) } func TestEndCallZeroConnections(t *testing.T) { - p := New().SetMaxConnections(0) + p := New(MaxConnectionsOption(0)) emptyTokens(p) - p.consecutiveRetries = 1 - p.endCall(false) - if len(p.connTokens) != 0 { - t.Errorf("Expecting 0 token") - } - if p.consecutiveRetries != 0 { - t.Errorf("Bad consecutive retries") - } + p.state.ConsecutiveRetries = 1 + p.endCall(false, nil) + assert.Equal(t, 0, len(p.connTokens)) + assert.Equal(t, 0, p.state.ConsecutiveRetries) } var errFoo = errors.New("foo") @@ -397,67 +290,79 @@ var errFoo = errors.New("foo") type dummyPaced struct { retry bool called int + wait *sync.Cond } func (dp *dummyPaced) fn() (bool, error) { - dp.called++ + if dp.wait != nil { + dp.wait.L.Lock() + dp.called++ + dp.wait.Wait() + dp.wait.L.Unlock() + } else { + dp.called++ + } return dp.retry, errFoo } -func Test_callNoRetry(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond) +func TestCallFixed(t *testing.T) { + p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond)))) dp := &dummyPaced{retry: false} err := p.call(dp.fn, 10) - if dp.called != 1 { - t.Errorf("called want %d got %d", 1, dp.called) - } - if err != errFoo { - t.Errorf("err want %v got %v", errFoo, err) - } + assert.Equal(t, 1, dp.called) + assert.Equal(t, errFoo, err) } func Test_callRetry(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond) + p := New(CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond)))) dp := &dummyPaced{retry: true} err := p.call(dp.fn, 10) - if dp.called != 10 { - t.Errorf("called want %d got %d", 10, dp.called) - } - if err == errFoo { - t.Errorf("err didn't want %v got %v", errFoo, err) - } - _, ok := err.(fserrors.Retrier) - if !ok { - t.Errorf("didn't return a retry error") - } + assert.Equal(t, 10, dp.called) + assert.Equal(t, errFoo, err) } func TestCall(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20) + p := New(RetriesOption(20), CalculatorOption(NewDefault(MinSleep(1*time.Millisecond), MaxSleep(2*time.Millisecond)))) dp := &dummyPaced{retry: true} err := p.Call(dp.fn) - if dp.called != 20 { - t.Errorf("called want %d got %d", 20, dp.called) - } - _, ok := err.(fserrors.Retrier) - if !ok { - t.Errorf("didn't return a retry error") - } + assert.Equal(t, 20, dp.called) + assert.Equal(t, errFoo, err) } -func TestCallNoRetry(t *testing.T) { - p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20) +func TestCallParallel(t *testing.T) { + p := New(MaxConnectionsOption(3), RetriesOption(1), CalculatorOption(NewDefault(MinSleep(100*time.Microsecond), MaxSleep(1*time.Millisecond)))) - dp := &dummyPaced{retry: true} - err := p.CallNoRetry(dp.fn) - if dp.called != 1 { - t.Errorf("called want %d got %d", 1, dp.called) + wait := sync.NewCond(&sync.Mutex{}) + funcs := make([]*dummyPaced, 5) + for i := range funcs { + dp := &dummyPaced{wait: wait} + funcs[i] = dp + go func() { + assert.Equal(t, errFoo, p.CallNoRetry(dp.fn)) + }() } - _, ok := err.(fserrors.Retrier) - if !ok { - t.Errorf("didn't return a retry error") + time.Sleep(10 * time.Millisecond) + called := 0 + wait.L.Lock() + for _, dp := range funcs { + called += dp.called } + wait.L.Unlock() + + assert.Equal(t, 3, called) + wait.Broadcast() + time.Sleep(20 * time.Millisecond) + + called = 0 + wait.L.Lock() + for _, dp := range funcs { + called += dp.called + } + wait.L.Unlock() + + assert.Equal(t, 5, called) + wait.Broadcast() } diff --git a/lib/pacer/pacers.go b/lib/pacer/pacers.go new file mode 100644 index 000000000..9c42be198 --- /dev/null +++ b/lib/pacer/pacers.go @@ -0,0 +1,326 @@ +package pacer + +import ( + "math/rand" + "time" + + "golang.org/x/time/rate" +) + +type ( + // MinSleep configures the minimum sleep time of a Calculator + MinSleep time.Duration + // MaxSleep configures the maximum sleep time of a Calculator + MaxSleep time.Duration + // DecayConstant configures the decay constant time of a Calculator + DecayConstant uint + // AttackConstant configures the attack constant of a Calculator + AttackConstant uint + // Burst configures the number of API calls to allow without sleeping + Burst int +) + +// Default is a truncated exponential attack and decay. +// +// On retries the sleep time is doubled, on non errors then sleeptime decays +// according to the decay constant as set with SetDecayConstant. +// +// The sleep never goes below that set with SetMinSleep or above that set +// with SetMaxSleep. +type Default struct { + minSleep time.Duration // minimum sleep time + maxSleep time.Duration // maximum sleep time + decayConstant uint // decay constant + attackConstant uint // attack constant +} + +// DefaultOption is the interface implemented by all options for the Default Calculator +type DefaultOption interface { + ApplyDefault(*Default) +} + +// NewDefault creates a Calculator used by Pacer as the default. +func NewDefault(opts ...DefaultOption) *Default { + c := &Default{ + minSleep: 10 * time.Millisecond, + maxSleep: 2 * time.Second, + decayConstant: 2, + attackConstant: 1, + } + c.Update(opts...) + return c +} + +// Update applies the Calculator options. +func (c *Default) Update(opts ...DefaultOption) { + for _, opt := range opts { + opt.ApplyDefault(c) + } +} + +// ApplyDefault updates the value on the Calculator +func (o MinSleep) ApplyDefault(c *Default) { + c.minSleep = time.Duration(o) +} + +// ApplyDefault updates the value on the Calculator +func (o MaxSleep) ApplyDefault(c *Default) { + c.maxSleep = time.Duration(o) +} + +// ApplyDefault updates the value on the Calculator +func (o DecayConstant) ApplyDefault(c *Default) { + c.decayConstant = uint(o) +} + +// ApplyDefault updates the value on the Calculator +func (o AttackConstant) ApplyDefault(c *Default) { + c.attackConstant = uint(o) +} + +// Calculate takes the current Pacer state and return the wait time until the next try. +func (c *Default) Calculate(state State) time.Duration { + if t, ok := IsRetryAfter(state.LastError); ok { + if t < c.minSleep { + return c.minSleep + } + return t + } + + if state.ConsecutiveRetries > 0 { + sleepTime := c.maxSleep + if c.attackConstant != 0 { + sleepTime = (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1) + } + if sleepTime > c.maxSleep { + sleepTime = c.maxSleep + } + return sleepTime + } + sleepTime := (state.SleepTime<> c.decayConstant + if sleepTime < c.minSleep { + sleepTime = c.minSleep + } + return sleepTime +} + +// AmazonCloudDrive is a specialized pacer for Amazon Drive +// +// It implements a truncated exponential backoff strategy with randomization. +// Normally operations are paced at the interval set with SetMinSleep. On errors +// the sleep timer is set to 0..2**retries seconds. +// +// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices +type AmazonCloudDrive struct { + minSleep time.Duration // minimum sleep time +} + +// AmazonCloudDriveOption is the interface implemented by all options for the AmazonCloudDrive Calculator +type AmazonCloudDriveOption interface { + ApplyAmazonCloudDrive(*AmazonCloudDrive) +} + +// NewAmazonCloudDrive returns a new AmazonCloudDrive Calculator with default values +func NewAmazonCloudDrive(opts ...AmazonCloudDriveOption) *AmazonCloudDrive { + c := &AmazonCloudDrive{ + minSleep: 10 * time.Millisecond, + } + c.Update(opts...) + return c +} + +// Update applies the Calculator options. +func (c *AmazonCloudDrive) Update(opts ...AmazonCloudDriveOption) { + for _, opt := range opts { + opt.ApplyAmazonCloudDrive(c) + } +} + +// ApplyAmazonCloudDrive updates the value on the Calculator +func (o MinSleep) ApplyAmazonCloudDrive(c *AmazonCloudDrive) { + c.minSleep = time.Duration(o) +} + +// Calculate takes the current Pacer state and return the wait time until the next try. +func (c *AmazonCloudDrive) Calculate(state State) time.Duration { + if t, ok := IsRetryAfter(state.LastError); ok { + if t < c.minSleep { + return c.minSleep + } + return t + } + + consecutiveRetries := state.ConsecutiveRetries + if consecutiveRetries == 0 { + return c.minSleep + } + if consecutiveRetries > 9 { + consecutiveRetries = 9 + } + // consecutiveRetries starts at 1 so + // maxSleep is 2**(consecutiveRetries-1) seconds + maxSleep := time.Second << uint(consecutiveRetries-1) + // actual sleep is random from 0..maxSleep + sleepTime := time.Duration(rand.Int63n(int64(maxSleep))) + if sleepTime < c.minSleep { + sleepTime = c.minSleep + } + return sleepTime +} + +// GoogleDrive is a specialized pacer for Google Drive +// +// It implements a truncated exponential backoff strategy with randomization. +// Normally operations are paced at the interval set with SetMinSleep. On errors +// the sleep timer is set to (2 ^ n) + random_number_milliseconds seconds. +// +// See https://developers.google.com/drive/v2/web/handle-errors#exponential-backoff +type GoogleDrive struct { + minSleep time.Duration // minimum sleep time + burst int // number of requests without sleeping + limiter *rate.Limiter // rate limiter for the minSleep +} + +// GoogleDriveOption is the interface implemented by all options for the GoogleDrive Calculator +type GoogleDriveOption interface { + ApplyGoogleDrive(*GoogleDrive) +} + +// NewGoogleDrive returns a new GoogleDrive Calculator with default values +func NewGoogleDrive(opts ...GoogleDriveOption) *GoogleDrive { + c := &GoogleDrive{ + minSleep: 10 * time.Millisecond, + burst: 1, + } + c.Update(opts...) + return c +} + +// Update applies the Calculator options. +func (c *GoogleDrive) Update(opts ...GoogleDriveOption) { + for _, opt := range opts { + opt.ApplyGoogleDrive(c) + } + if c.burst <= 0 { + c.burst = 1 + } + c.limiter = rate.NewLimiter(rate.Every(c.minSleep), c.burst) +} + +// ApplyGoogleDrive updates the value on the Calculator +func (o MinSleep) ApplyGoogleDrive(c *GoogleDrive) { + c.minSleep = time.Duration(o) +} + +// ApplyGoogleDrive updates the value on the Calculator +func (o Burst) ApplyGoogleDrive(c *GoogleDrive) { + c.burst = int(o) +} + +// Calculate takes the current Pacer state and return the wait time until the next try. +func (c *GoogleDrive) Calculate(state State) time.Duration { + if t, ok := IsRetryAfter(state.LastError); ok { + if t < c.minSleep { + return c.minSleep + } + return t + } + + consecutiveRetries := state.ConsecutiveRetries + if consecutiveRetries == 0 { + return c.limiter.Reserve().Delay() + } + if consecutiveRetries > 5 { + consecutiveRetries = 5 + } + // consecutiveRetries starts at 1 so go from 1,2,3,4,5,5 => 1,2,4,8,16,16 + // maxSleep is 2**(consecutiveRetries-1) seconds + random milliseconds + return time.Second< 0 { + if c.attackConstant == 0 { + return c.maxSleep + } + if state.SleepTime == 0 { + return c.minSleep + } + sleepTime := (state.SleepTime << c.attackConstant) / ((1 << c.attackConstant) - 1) + if sleepTime > c.maxSleep { + sleepTime = c.maxSleep + } + return sleepTime + } + sleepTime := (state.SleepTime<> c.decayConstant + if sleepTime < c.minSleep { + sleepTime = 0 + } + return sleepTime +}