diff --git a/lib/pacer/pacer.go b/lib/pacer/pacer.go index 8d94e5fe8..6322ae86e 100644 --- a/lib/pacer/pacer.go +++ b/lib/pacer/pacer.go @@ -2,12 +2,14 @@ package pacer import ( + "context" "math/rand" "sync" "time" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/fserrors" + "golang.org/x/time/rate" ) // Pacer state @@ -15,6 +17,8 @@ type Pacer struct { mu sync.Mutex // Protecting read/writes minSleep time.Duration // minimum sleep time maxSleep time.Duration // maximum sleep time + burst int // number of calls to send without rate limiting + limiter *rate.Limiter // rate limiter for the minsleep decayConstant uint // decay constant attackConstant uint // attack constant pacer chan struct{} // To pace the operations @@ -76,7 +80,6 @@ type Paced func() (bool, error) // New returns a Pacer with sensible defaults func New() *Pacer { p := &Pacer{ - minSleep: 10 * time.Millisecond, maxSleep: 2 * time.Second, decayConstant: 2, attackConstant: 1, @@ -86,6 +89,7 @@ func New() *Pacer { p.sleepTime = p.minSleep p.SetPacer(DefaultPacer) p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers) + p.SetMinSleep(10 * time.Millisecond) // Put the first pacing token in p.pacer <- struct{}{} @@ -114,6 +118,16 @@ func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { defer p.mu.Unlock() p.minSleep = t p.sleepTime = p.minSleep + p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) + return p +} + +// SetBurst sets the burst with no limiting of the pacer +func (p *Pacer) SetBurst(n int) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() + p.burst = n + p.limiter = rate.NewLimiter(rate.Every(p.minSleep), p.burst) return p } @@ -216,11 +230,19 @@ func (p *Pacer) beginCall() { p.mu.Lock() // Restart the timer - go func(t time.Duration) { + go func(sleepTime, minSleep time.Duration) { // fs.Debugf(f, "New sleep for %v at %v", t, time.Now()) - time.Sleep(t) + // Sleep the minimum time with the rate limiter + if minSleep > 0 && sleepTime >= minSleep { + p.limiter.Wait(context.Background()) + sleepTime -= minSleep + } + // Then sleep the remaining time + if sleepTime > 0 { + time.Sleep(sleepTime) + } p.pacer <- struct{}{} - }(p.sleepTime) + }(p.sleepTime, p.minSleep) p.mu.Unlock() }