From 3b6df718380c4e4c0d31c2503ee8c9009dda2f24 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 4 Jul 2020 17:20:54 +0100 Subject: [PATCH] accounting: refactor bwlimit code to allow for multiple slots --- fs/accounting/accounting.go | 7 +- fs/accounting/accounting_other.go | 2 +- fs/accounting/accounting_unix.go | 12 +- fs/accounting/token_bucket.go | 184 +++++++++++++++++------------ fs/accounting/token_bucket_test.go | 4 +- fs/config/config.go | 4 +- fs/sync/sync_test.go | 4 +- 7 files changed, 127 insertions(+), 90 deletions(-) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 105c72a20..7d3b6bfcc 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -10,7 +10,6 @@ import ( "unicode/utf8" "github.com/rclone/rclone/fs/rc" - "golang.org/x/time/rate" "github.com/pkg/errors" "github.com/rclone/rclone/fs" @@ -50,7 +49,7 @@ type Account struct { exit chan struct{} // channel that will be closed when transfer is finished withBuf bool // is using a buffered in - tokenBucket *rate.Limiter // per file bandwidth limiter (may be nil) + tokenBucket buckets // per file bandwidth limiter (may be nil) values accountValues } @@ -289,7 +288,7 @@ func (acc *Account) DryRun(n int64) { // Account for n bytes from the current file bandwidth limit (if any) func (acc *Account) limitPerFileBandwidth(n int) { acc.values.mu.Lock() - tokenBucket := acc.tokenBucket + tokenBucket := acc.tokenBucket[TokenBucketSlotAccounting] acc.values.mu.Unlock() if tokenBucket != nil { @@ -310,7 +309,7 @@ func (acc *Account) accountRead(n int) { acc.stats.Bytes(int64(n)) - limitBandwidth(n) + TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n) acc.limitPerFileBandwidth(n) } diff --git a/fs/accounting/accounting_other.go b/fs/accounting/accounting_other.go index 9a9cb2975..294677a3c 100644 --- a/fs/accounting/accounting_other.go +++ b/fs/accounting/accounting_other.go @@ -7,4 +7,4 @@ package accounting // startSignalHandler() is Unix specific and does nothing under non-Unix // platforms. -func startSignalHandler() {} +func (tb *tokenBucket) startSignalHandler() {} diff --git a/fs/accounting/accounting_unix.go b/fs/accounting/accounting_unix.go index 8165363df..a59fe83e1 100644 --- a/fs/accounting/accounting_unix.go +++ b/fs/accounting/accounting_unix.go @@ -14,7 +14,7 @@ import ( ) // startSignalHandler() sets a signal handler to catch SIGUSR2 and toggle throttling. -func startSignalHandler() { +func (tb *tokenBucket) startSignalHandler() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGUSR2) @@ -22,14 +22,14 @@ func startSignalHandler() { // This runs forever, but blocks until the signal is received. for { <-signals - tokenBucketMu.Lock() - bwLimitToggledOff = !bwLimitToggledOff - tokenBucket, prevTokenBucket = prevTokenBucket, tokenBucket + tb.mu.Lock() + tb.toggledOff = !tb.toggledOff + tb.curr, tb.prev = tb.prev, tb.curr s := "disabled" - if tokenBucket != nil { + if !tb.curr._isOff() { s = "enabled" } - tokenBucketMu.Unlock() + tb.mu.Unlock() fs.Logf(nil, "Bandwidth limit %s by user", s) } }() diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go index 547bfc6ad..5b9ff03e3 100644 --- a/fs/accounting/token_bucket.go +++ b/fs/accounting/token_bucket.go @@ -11,48 +11,79 @@ import ( "golang.org/x/time/rate" ) -// Globals -var ( - tokenBucketMu sync.Mutex // protects the token bucket variables - tokenBucket *rate.Limiter - prevTokenBucket = tokenBucket - bwLimitToggledOff = false - currLimitMu sync.Mutex // protects changes to the timeslot - currLimit fs.BwTimeSlot +// TokenBucket holds the global token bucket limiter +var TokenBucket tokenBucket + +// TokenBucketSlot is the type to select which token bucket to use +type TokenBucketSlot int + +// Slots for the token bucket +const ( + TokenBucketSlotAccounting TokenBucketSlot = iota + TokenBucketSlots ) +type buckets [TokenBucketSlots]*rate.Limiter + +// tokenBucket holds info about the rate limiters in use +type tokenBucket struct { + mu sync.RWMutex // protects the token bucket variables + curr buckets + prev buckets + toggledOff bool + currLimitMu sync.Mutex // protects changes to the timeslot + currLimit fs.BwTimeSlot +} + +// Return true if limit is disabled +// +// Call with lock held +func (bs *buckets) _isOff() bool { + return bs[0] == nil +} + +// Disable the limits +// +// Call with lock held +func (bs *buckets) _setOff() { + for i := range bs { + bs[i] = nil + } +} + const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request // make a new empty token bucket with the bandwidth given -func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter { - newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize) - // empty the bucket - err := newTokenBucket.WaitN(context.Background(), maxBurstSize) - if err != nil { - fs.Errorf(nil, "Failed to empty token bucket: %v", err) +func newTokenBucket(bandwidth fs.SizeSuffix) (newTokenBucket buckets) { + for i := range newTokenBucket { + newTokenBucket[i] = rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize) + // empty the bucket + err := newTokenBucket[i].WaitN(context.Background(), maxBurstSize) + if err != nil { + fs.Errorf(nil, "Failed to empty token bucket: %v", err) + } } return newTokenBucket } // StartTokenBucket starts the token bucket if necessary -func StartTokenBucket(ctx context.Context) { +func (tb *tokenBucket) StartTokenBucket(ctx context.Context) { + tb.mu.Lock() + defer tb.mu.Unlock() ci := fs.GetConfig(ctx) - currLimitMu.Lock() - currLimit := ci.BwLimit.LimitAt(time.Now()) - currLimitMu.Unlock() - - if currLimit.Bandwidth > 0 { - tokenBucket = newTokenBucket(currLimit.Bandwidth) - fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth) + tb.currLimit = ci.BwLimit.LimitAt(time.Now()) + if tb.currLimit.Bandwidth > 0 { + tb.curr = newTokenBucket(tb.currLimit.Bandwidth) + fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &tb.currLimit.Bandwidth) // Start the SIGUSR2 signal handler to toggle bandwidth. // This function does nothing in windows systems. - startSignalHandler() + tb.startSignalHandler() } } // StartTokenTicker creates a ticker to update the bandwidth limiter every minute. -func StartTokenTicker(ctx context.Context) { +func (tb *tokenBucket) StartTokenTicker(ctx context.Context) { ci := fs.GetConfig(ctx) // If the timetable has a single entry or was not specified, we don't need // a ticker to update the bandwidth. @@ -64,102 +95,109 @@ func StartTokenTicker(ctx context.Context) { go func() { for range ticker.C { limitNow := ci.BwLimit.LimitAt(time.Now()) - currLimitMu.Lock() + tb.currLimitMu.Lock() - if currLimit.Bandwidth != limitNow.Bandwidth { - tokenBucketMu.Lock() + if tb.currLimit.Bandwidth != limitNow.Bandwidth { + tb.mu.Lock() // If bwlimit is toggled off, the change should only // become active on the next toggle, which causes - // an exchange of tokenBucket <-> prevTokenBucket - var targetBucket **rate.Limiter - if bwLimitToggledOff { - targetBucket = &prevTokenBucket + // an exchange of tb.curr <-> tb.prev + var targetBucket *buckets + if tb.toggledOff { + targetBucket = &tb.prev } else { - targetBucket = &tokenBucket + targetBucket = &tb.curr } // Set new bandwidth. If unlimited, set tokenbucket to nil. if limitNow.Bandwidth > 0 { *targetBucket = newTokenBucket(limitNow.Bandwidth) - if bwLimitToggledOff { + if tb.toggledOff { fs.Logf(nil, "Scheduled bandwidth change. "+ "Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth) } else { fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth) } } else { - *targetBucket = nil + targetBucket._setOff() fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled") } - currLimit = limitNow - tokenBucketMu.Unlock() + tb.currLimit = limitNow + tb.mu.Unlock() } - currLimitMu.Unlock() + tb.currLimitMu.Unlock() } }() } -// limitBandwidth sleeps for the correct amount of time for the passage +// LimitBandwidth sleeps for the correct amount of time for the passage // of n bytes according to the current bandwidth limit -func limitBandwidth(n int) { - tokenBucketMu.Lock() +func (tb *tokenBucket) LimitBandwidth(i TokenBucketSlot, n int) { + tb.mu.RLock() // Limit the transfer speed if required - if tokenBucket != nil { - err := tokenBucket.WaitN(context.Background(), n) + if !tb.curr._isOff() { + err := tb.curr[i].WaitN(context.Background(), n) if err != nil { fs.Errorf(nil, "Token bucket error: %v", err) } } - tokenBucketMu.Unlock() + tb.mu.RUnlock() } // SetBwLimit sets the current bandwidth limit -func SetBwLimit(bandwidth fs.SizeSuffix) { - tokenBucketMu.Lock() - defer tokenBucketMu.Unlock() +func (tb *tokenBucket) SetBwLimit(bandwidth fs.SizeSuffix) { + tb.mu.Lock() + defer tb.mu.Unlock() if bandwidth > 0 { - tokenBucket = newTokenBucket(bandwidth) + tb.curr = newTokenBucket(bandwidth) fs.Logf(nil, "Bandwidth limit set to %v", bandwidth) } else { - tokenBucket = nil + tb.curr._setOff() fs.Logf(nil, "Bandwidth limit reset to unlimited") } } +// read and set the bandwidth limits +func (tb *tokenBucket) rcBwlimit(ctx context.Context, in rc.Params) (out rc.Params, err error) { + if in["rate"] != nil { + bwlimit, err := in.GetString("rate") + if err != nil { + return out, err + } + var bws fs.BwTimetable + err = bws.Set(bwlimit) + if err != nil { + return out, errors.Wrap(err, "bad bwlimit") + } + if len(bws) != 1 { + return out, errors.New("need exactly 1 bandwidth setting") + } + bw := bws[0] + tb.SetBwLimit(bw.Bandwidth) + } + tb.mu.RLock() + bytesPerSecond := int64(-1) + if !tb.curr._isOff() { + bytesPerSecond = int64(tb.curr[0].Limit()) + } + tb.mu.RUnlock() + out = rc.Params{ + "rate": fs.SizeSuffix(bytesPerSecond).String(), + "bytesPerSecond": bytesPerSecond, + } + return out, nil +} + // Remote control for the token bucket func init() { rc.Add(rc.Call{ Path: "core/bwlimit", Fn: func(ctx context.Context, in rc.Params) (out rc.Params, err error) { - if in["rate"] != nil { - bwlimit, err := in.GetString("rate") - if err != nil { - return out, err - } - var bws fs.BwTimetable - err = bws.Set(bwlimit) - if err != nil { - return out, errors.Wrap(err, "bad bwlimit") - } - if len(bws) != 1 { - return out, errors.New("need exactly 1 bandwidth setting") - } - bw := bws[0] - SetBwLimit(bw.Bandwidth) - } - bytesPerSecond := int64(-1) - if tokenBucket != nil { - bytesPerSecond = int64(tokenBucket.Limit()) - } - out = rc.Params{ - "rate": fs.SizeSuffix(bytesPerSecond).String(), - "bytesPerSecond": bytesPerSecond, - } - return out, nil + return TokenBucket.rcBwlimit(ctx, in) }, Title: "Set the bandwidth limit.", Help: ` diff --git a/fs/accounting/token_bucket_test.go b/fs/accounting/token_bucket_test.go index b2997fbfe..f7e5e9095 100644 --- a/fs/accounting/token_bucket_test.go +++ b/fs/accounting/token_bucket_test.go @@ -24,7 +24,7 @@ func TestRcBwLimit(t *testing.T) { "bytesPerSecond": int64(1048576), "rate": "1M", }, out) - assert.Equal(t, rate.Limit(1048576), tokenBucket.Limit()) + assert.Equal(t, rate.Limit(1048576), TokenBucket.curr[0].Limit()) // Query in = rc.Params{} @@ -45,7 +45,7 @@ func TestRcBwLimit(t *testing.T) { "bytesPerSecond": int64(-1), "rate": "off", }, out) - assert.Nil(t, tokenBucket) + assert.Nil(t, TokenBucket.curr[0]) // Query in = rc.Params{} diff --git a/fs/config/config.go b/fs/config/config.go index 7ad367d07..5eb02f6c0 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -229,10 +229,10 @@ func LoadConfig(ctx context.Context) { } // Start the token bucket limiter - accounting.StartTokenBucket(ctx) + accounting.TokenBucket.StartTokenBucket(ctx) // Start the bandwidth update ticker - accounting.StartTokenTicker(ctx) + accounting.TokenBucket.StartTokenTicker(ctx) // Start the transactions per second limiter accounting.StartLimitTPS(ctx) diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index 92bed0d7a..f691e9af6 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -1079,13 +1079,13 @@ func TestSyncWithMaxDuration(t *testing.T) { maxDuration := 250 * time.Millisecond ci.MaxDuration = maxDuration bytesPerSecond := 300 - accounting.SetBwLimit(fs.SizeSuffix(bytesPerSecond)) + accounting.TokenBucket.SetBwLimit(fs.SizeSuffix(bytesPerSecond)) oldTransfers := ci.Transfers ci.Transfers = 1 defer func() { ci.MaxDuration = 0 // reset back to default ci.Transfers = oldTransfers - accounting.SetBwLimit(fs.SizeSuffix(0)) + accounting.TokenBucket.SetBwLimit(fs.SizeSuffix(0)) }() // 5 files of 60 bytes at 60 bytes/s 5 seconds