accounting: add context.Context #3257 #4685

This commit is contained in:
Nick Craig-Wood 2020-11-05 16:59:59 +00:00
parent e3fe31f7cb
commit 1fb6ad700f
29 changed files with 138 additions and 109 deletions

View File

@ -19,7 +19,7 @@ var (
)
func prepare(t *testing.T, root string) {
config.LoadConfig()
config.LoadConfig(context.Background())
// Configure the remote
config.FileSet(remoteName, "type", "alias")

View File

@ -1182,7 +1182,7 @@ func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error {
tr := accounting.Stats(ctx).NewCheckingTransfer(oi)
err = f.deleteByID(ctx, object.ID, object.Name)
checkErr(err)
tr.Done(err)
tr.Done(ctx, err)
}
}()
}
@ -1210,7 +1210,7 @@ func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error {
toBeDeleted <- object
}
last = remote
tr.Done(nil)
tr.Done(ctx, nil)
}
return nil
}))

View File

@ -47,7 +47,7 @@ func prepareServer(t *testing.T) (configmap.Simple, func()) {
ts := httptest.NewServer(handler)
// Configure the remote
config.LoadConfig()
config.LoadConfig(context.Background())
// fs.Config.LogLevel = fs.LogLevelDebug
// fs.Config.DumpHeaders = true
// fs.Config.DumpBodies = true

View File

@ -41,7 +41,7 @@ func startServer(t *testing.T, f fs.Fs) {
}
func TestInit(t *testing.T) {
config.LoadConfig()
config.LoadConfig(context.Background())
f, err := fs.NewFs(context.Background(), "testdata/files")
l, _ := f.List(context.Background(), "")

View File

@ -294,7 +294,7 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er
// Account the transfer
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
defer func() {
tr.Done(err)
tr.Done(d.s.ctx, err)
}()
for _, file := range dirEntries {
@ -392,7 +392,7 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose
// Account the transfer
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
defer tr.Done(nil)
defer tr.Done(d.s.ctx, nil)
return node.Size(), handle, nil
}

View File

@ -206,7 +206,7 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string
// Account the transfer
tr := accounting.Stats(r.Context()).NewTransfer(obj)
defer tr.Done(nil)
defer tr.Done(r.Context(), nil)
// FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer
// Serve the file

View File

@ -60,7 +60,7 @@ var (
func TestInit(t *testing.T) {
// Configure the remote
config.LoadConfig()
config.LoadConfig(context.Background())
// fs.Config.LogLevel = fs.LogLevelDebug
// fs.Config.DumpHeaders = true
// fs.Config.DumpBodies = true

View File

@ -225,7 +225,7 @@ const (
func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) {
// Account the transfer
tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1)
defer tr.Done(nil)
defer tr.Done(r.Context(), nil)
fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr)

View File

@ -77,7 +77,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) {
}
tr := accounting.Stats(r.Context()).NewTransfer(o)
defer func() {
tr.Done(err)
tr.Done(r.Context(), err)
}()
in := tr.Account(r.Context(), file) // account the transfer (no buffering)

View File

@ -28,8 +28,9 @@ var (
)
func TestNewAccountSizeName(t *testing.T) {
ctx := context.Background()
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
stats := NewStats(ctx)
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
assert.Equal(t, in, acc.in)
assert.Equal(t, acc, stats.inProgress.get("test"))
@ -42,10 +43,11 @@ func TestNewAccountSizeName(t *testing.T) {
}
func TestAccountWithBuffer(t *testing.T) {
ctx := context.Background()
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, -1, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, -1, "test")
assert.False(t, acc.HasBuffer())
acc.WithBuffer()
assert.True(t, acc.HasBuffer())
@ -54,7 +56,7 @@ func TestAccountWithBuffer(t *testing.T) {
require.True(t, ok)
assert.NoError(t, acc.Close())
acc = newAccountSizeName(context.Background(), stats, in, 1, "test")
acc = newAccountSizeName(ctx, stats, in, 1, "test")
acc.WithBuffer()
// should not have a buffer for a small size
_, ok = acc.in.(*asyncreader.AsyncReader)
@ -63,11 +65,12 @@ func TestAccountWithBuffer(t *testing.T) {
}
func TestAccountGetUpdateReader(t *testing.T) {
ctx := context.Background()
test := func(doClose bool) func(t *testing.T) {
return func(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 1, "test")
assert.Equal(t, in, acc.GetReader())
assert.Equal(t, acc, stats.inProgress.get("test"))
@ -78,7 +81,7 @@ func TestAccountGetUpdateReader(t *testing.T) {
}
in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc.UpdateReader(context.Background(), in2)
acc.UpdateReader(ctx, in2)
assert.Equal(t, in2, acc.GetReader())
assert.Equal(t, acc, stats.inProgress.get("test"))
@ -91,9 +94,10 @@ func TestAccountGetUpdateReader(t *testing.T) {
}
func TestAccountRead(t *testing.T) {
ctx := context.Background()
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 1, "test")
assert.True(t, acc.values.start.IsZero())
acc.values.mu.Lock()
@ -128,13 +132,14 @@ func TestAccountRead(t *testing.T) {
}
func testAccountWriteTo(t *testing.T, withBuffer bool) {
ctx := context.Background()
buf := make([]byte, 2*asyncreader.BufferSize+1)
for i := range buf {
buf[i] = byte(i % 251)
}
in := ioutil.NopCloser(bytes.NewBuffer(buf))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, int64(len(buf)), "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, int64(len(buf)), "test")
if withBuffer {
acc = acc.WithBuffer()
}
@ -172,9 +177,10 @@ func TestAccountWriteToWithBuffer(t *testing.T) {
}
func TestAccountString(t *testing.T) {
ctx := context.Background()
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 3, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 3, "test")
// FIXME not an exhaustive test!
@ -192,9 +198,10 @@ func TestAccountString(t *testing.T) {
// Test the Accounter interface methods on Account and accountStream
func TestAccountAccounter(t *testing.T) {
ctx := context.Background()
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 3, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 3, "test")
assert.True(t, in == acc.OldStream())
@ -250,6 +257,7 @@ func TestAccountAccounter(t *testing.T) {
}
func TestAccountMaxTransfer(t *testing.T) {
ctx := context.Background()
old := fs.Config.MaxTransfer
oldMode := fs.Config.CutoffMode
@ -260,8 +268,8 @@ func TestAccountMaxTransfer(t *testing.T) {
}()
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 1, "test")
var b = make([]byte, 10)
@ -277,8 +285,8 @@ func TestAccountMaxTransfer(t *testing.T) {
assert.True(t, fserrors.IsFatalError(err))
fs.Config.CutoffMode = fs.CutoffModeSoft
stats = NewStats()
acc = newAccountSizeName(context.Background(), stats, in, 1, "test")
stats = NewStats(ctx)
acc = newAccountSizeName(ctx, stats, in, 1, "test")
n, err = acc.Read(b)
assert.Equal(t, 10, n)
@ -292,6 +300,7 @@ func TestAccountMaxTransfer(t *testing.T) {
}
func TestAccountMaxTransferWriteTo(t *testing.T) {
ctx := context.Background()
old := fs.Config.MaxTransfer
oldMode := fs.Config.CutoffMode
@ -302,8 +311,8 @@ func TestAccountMaxTransferWriteTo(t *testing.T) {
}()
in := ioutil.NopCloser(readers.NewPatternReader(1024))
stats := NewStats()
acc := newAccountSizeName(context.Background(), stats, in, 1, "test")
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 1, "test")
var b bytes.Buffer
@ -313,9 +322,10 @@ func TestAccountMaxTransferWriteTo(t *testing.T) {
}
func TestAccountReadCtx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
stats := NewStats()
stats := NewStats(ctx)
acc := newAccountSizeName(ctx, stats, in, 1, "test")
var b = make([]byte, 10)

View File

@ -1,6 +1,7 @@
package accounting
import (
"context"
"sync"
"github.com/rclone/rclone/fs"
@ -13,7 +14,7 @@ type inProgress struct {
}
// newInProgress makes a new inProgress object
func newInProgress() *inProgress {
func newInProgress(ctx context.Context) *inProgress {
return &inProgress{
m: make(map[string]*Account, fs.Config.Transfers),
}

View File

@ -1,6 +1,8 @@
package accounting
import (
"context"
"github.com/prometheus/client_golang/prometheus"
)
@ -8,6 +10,7 @@ var namespace = "rclone_"
// RcloneCollector is a Prometheus collector for Rclone
type RcloneCollector struct {
ctx context.Context
bytesTransferred *prometheus.Desc
transferSpeed *prometheus.Desc
numOfErrors *prometheus.Desc
@ -21,8 +24,9 @@ type RcloneCollector struct {
}
// NewRcloneCollector make a new RcloneCollector
func NewRcloneCollector() *RcloneCollector {
func NewRcloneCollector(ctx context.Context) *RcloneCollector {
return &RcloneCollector{
ctx: ctx,
bytesTransferred: prometheus.NewDesc(namespace+"bytes_transferred_total",
"Total transferred bytes since the start of the Rclone process",
nil, nil,
@ -82,7 +86,7 @@ func (c *RcloneCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect is part of the Collector interface: https://godoc.org/github.com/prometheus/client_golang/prometheus#Collector
func (c *RcloneCollector) Collect(ch chan<- prometheus.Metric) {
s := groups.sum()
s := groups.sum(c.ctx)
s.mu.RLock()
ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes))

View File

@ -2,6 +2,7 @@ package accounting
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
@ -22,6 +23,7 @@ var startTime = time.Now()
// StatsInfo accounts all transfers
type StatsInfo struct {
mu sync.RWMutex
ctx context.Context
bytes int64
errors int64
lastError error
@ -49,11 +51,12 @@ type StatsInfo struct {
}
// NewStats creates an initialised StatsInfo
func NewStats() *StatsInfo {
func NewStats(ctx context.Context) *StatsInfo {
return &StatsInfo{
ctx: ctx,
checking: newTransferMap(fs.Config.Checkers, "checking"),
transferring: newTransferMap(fs.Config.Transfers, "transferring"),
inProgress: newInProgress(),
inProgress: newInProgress(ctx),
}
}
@ -332,10 +335,10 @@ func (s *StatsInfo) String() string {
// Add per transfer stats if required
if !fs.Config.StatsOneLine {
if !s.checking.empty() {
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress, s.transferring))
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.ctx, s.inProgress, s.transferring))
}
if !s.transferring.empty() {
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress, nil))
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.ctx, s.inProgress, nil))
}
}

View File

@ -59,10 +59,10 @@ func rcRemoteStats(ctx context.Context, in rc.Params) (rc.Params, error) {
return rc.Params{}, err
}
if group != "" {
return StatsGroup(group).RemoteStats()
return StatsGroup(ctx, group).RemoteStats()
}
return groups.sum().RemoteStats()
return groups.sum(ctx).RemoteStats()
}
func init() {
@ -129,9 +129,9 @@ func rcTransferredStats(ctx context.Context, in rc.Params) (rc.Params, error) {
out := make(rc.Params)
if group != "" {
out["transferred"] = StatsGroup(group).Transferred()
out["transferred"] = StatsGroup(ctx, group).Transferred()
} else {
out["transferred"] = groups.sum().Transferred()
out["transferred"] = groups.sum(ctx).Transferred()
}
return out, nil
@ -265,28 +265,28 @@ func Stats(ctx context.Context) *StatsInfo {
if !ok {
return GlobalStats()
}
return StatsGroup(group)
return StatsGroup(ctx, group)
}
// StatsGroup gets stats by group name.
func StatsGroup(group string) *StatsInfo {
func StatsGroup(ctx context.Context, group string) *StatsInfo {
stats := groups.get(group)
if stats == nil {
return NewStatsGroup(group)
return NewStatsGroup(ctx, group)
}
return stats
}
// GlobalStats returns special stats used for global accounting.
func GlobalStats() *StatsInfo {
return StatsGroup(globalStats)
return StatsGroup(context.Background(), globalStats)
}
// NewStatsGroup creates new stats under named group.
func NewStatsGroup(group string) *StatsInfo {
stats := NewStats()
func NewStatsGroup(ctx context.Context, group string) *StatsInfo {
stats := NewStats(ctx)
stats.group = group
groups.set(group, stats)
groups.set(ctx, group, stats)
return stats
}
@ -305,7 +305,7 @@ func newStatsGroups() *statsGroups {
}
// set marks the stats as belonging to a group
func (sg *statsGroups) set(group string, stats *StatsInfo) {
func (sg *statsGroups) set(ctx context.Context, group string, stats *StatsInfo) {
sg.mu.Lock()
defer sg.mu.Unlock()
@ -343,11 +343,11 @@ func (sg *statsGroups) names() []string {
}
// sum returns aggregate stats that contains summation of all groups.
func (sg *statsGroups) sum() *StatsInfo {
func (sg *statsGroups) sum(ctx context.Context) *StatsInfo {
sg.mu.Lock()
defer sg.mu.Unlock()
sum := NewStats()
sum := NewStats(ctx)
for _, stats := range sg.m {
stats.mu.RLock()
{

View File

@ -1,6 +1,7 @@
package accounting
import (
"context"
"fmt"
"runtime"
"testing"
@ -11,6 +12,7 @@ import (
)
func TestStatsGroupOperations(t *testing.T) {
ctx := context.Background()
t.Run("empty group returns nil", func(t *testing.T) {
t.Parallel()
@ -20,10 +22,10 @@ func TestStatsGroupOperations(t *testing.T) {
t.Run("set assigns stats to group", func(t *testing.T) {
t.Parallel()
stats := NewStats()
stats := NewStats(ctx)
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
sg.set(ctx, "test", stats)
sg.set(ctx, "test1", stats)
if len(sg.m) != len(sg.names()) || len(sg.m) != 2 {
t.Fatalf("Expected two stats got %d, %d", len(sg.m), len(sg.order))
}
@ -31,10 +33,10 @@ func TestStatsGroupOperations(t *testing.T) {
t.Run("get returns correct group", func(t *testing.T) {
t.Parallel()
stats := NewStats()
stats := NewStats(ctx)
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
sg.set(ctx, "test", stats)
sg.set(ctx, "test1", stats)
got := sg.get("test")
if got != stats {
t.Fatal("get returns incorrect stats")
@ -43,20 +45,20 @@ func TestStatsGroupOperations(t *testing.T) {
t.Run("sum returns correct values", func(t *testing.T) {
t.Parallel()
stats1 := NewStats()
stats1 := NewStats(ctx)
stats1.bytes = 5
stats1.errors = 6
stats1.oldDuration = time.Second
stats1.oldTimeRanges = []timeRange{{time.Now(), time.Now().Add(time.Second)}}
stats2 := NewStats()
stats2 := NewStats(ctx)
stats2.bytes = 10
stats2.errors = 12
stats2.oldDuration = 2 * time.Second
stats2.oldTimeRanges = []timeRange{{time.Now(), time.Now().Add(2 * time.Second)}}
sg := newStatsGroups()
sg.set("test1", stats1)
sg.set("test2", stats2)
sum := sg.sum()
sg.set(ctx, "test1", stats1)
sg.set(ctx, "test2", stats2)
sum := sg.sum(ctx)
assert.Equal(t, stats1.bytes+stats2.bytes, sum.bytes)
assert.Equal(t, stats1.errors+stats2.errors, sum.errors)
assert.Equal(t, stats1.oldDuration+stats2.oldDuration, sum.oldDuration)
@ -70,10 +72,10 @@ func TestStatsGroupOperations(t *testing.T) {
t.Run("delete removes stats", func(t *testing.T) {
t.Parallel()
stats := NewStats()
stats := NewStats(ctx)
sg := newStatsGroups()
sg.set("test", stats)
sg.set("test1", stats)
sg.set(ctx, "test", stats)
sg.set(ctx, "test1", stats)
sg.delete("test1")
if sg.get("test1") != nil {
t.Fatal("stats not deleted")
@ -95,7 +97,7 @@ func TestStatsGroupOperations(t *testing.T) {
runtime.ReadMemStats(&start)
for i := 0; i < count; i++ {
sg.set(fmt.Sprintf("test-%d", i), NewStats())
sg.set(ctx, fmt.Sprintf("test-%d", i), NewStats(ctx))
}
for i := 0; i < count; i++ {

View File

@ -1,6 +1,7 @@
package accounting
import (
"context"
"fmt"
"io"
"testing"
@ -67,7 +68,8 @@ func TestPercentage(t *testing.T) {
}
func TestStatsError(t *testing.T) {
s := NewStats()
ctx := context.Background()
s := NewStats(ctx)
assert.Equal(t, int64(0), s.GetErrors())
assert.False(t, s.HadFatalError())
assert.False(t, s.HadRetryError())
@ -132,6 +134,7 @@ func TestStatsError(t *testing.T) {
}
func TestStatsTotalDuration(t *testing.T) {
ctx := context.Background()
startTime := time.Now()
time1 := startTime.Add(-40 * time.Second)
time2 := time1.Add(10 * time.Second)
@ -139,7 +142,7 @@ func TestStatsTotalDuration(t *testing.T) {
time4 := time3.Add(10 * time.Second)
t.Run("Single completed transfer", func(t *testing.T) {
s := NewStats()
s := NewStats(ctx)
tr1 := &Transfer{
startedAt: time1,
completedAt: time2,
@ -158,7 +161,7 @@ func TestStatsTotalDuration(t *testing.T) {
})
t.Run("Single uncompleted transfer", func(t *testing.T) {
s := NewStats()
s := NewStats(ctx)
tr1 := &Transfer{
startedAt: time1,
}
@ -174,7 +177,7 @@ func TestStatsTotalDuration(t *testing.T) {
})
t.Run("Overlapping without ending", func(t *testing.T) {
s := NewStats()
s := NewStats(ctx)
tr1 := &Transfer{
startedAt: time2,
completedAt: time3,
@ -218,7 +221,7 @@ func TestStatsTotalDuration(t *testing.T) {
})
t.Run("Mixed completed and uncompleted transfers", func(t *testing.T) {
s := NewStats()
s := NewStats(ctx)
s.AddTransfer(&Transfer{
startedAt: time1,
completedAt: time2,
@ -382,6 +385,7 @@ func TestTimeRangeDuration(t *testing.T) {
}
func TestPruneTransfers(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
Name string
Transfers int
@ -406,7 +410,7 @@ func TestPruneTransfers(t *testing.T) {
MaxCompletedTransfers = test.Limit
defer func() { MaxCompletedTransfers = prevLimit }()
s := NewStats()
s := NewStats(ctx)
for i := int64(1); i <= int64(test.Transfers); i++ {
s.AddTransfer(&Transfer{
startedAt: time.Unix(i, 0),

View File

@ -35,7 +35,7 @@ func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
}
// StartTokenBucket starts the token bucket if necessary
func StartTokenBucket() {
func StartTokenBucket(ctx context.Context) {
currLimitMu.Lock()
currLimit := fs.Config.BwLimit.LimitAt(time.Now())
currLimitMu.Unlock()
@ -51,7 +51,7 @@ func StartTokenBucket() {
}
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
func StartTokenTicker() {
func StartTokenTicker(ctx context.Context) {
// If the timetable has a single entry or was not specified, we don't need
// a ticker to update the bandwidth.
if len(fs.Config.BwLimit) <= 1 {

View File

@ -86,7 +86,7 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking
// Done ends the transfer.
// Must be called after transfer is finished to run proper cleanups.
func (tr *Transfer) Done(err error) {
func (tr *Transfer) Done(ctx context.Context, err error) {
if err != nil {
err = tr.stats.Error(err)
@ -123,7 +123,7 @@ func (tr *Transfer) Done(err error) {
}
// Reset allows to switch the Account to another transfer method.
func (tr *Transfer) Reset() {
func (tr *Transfer) Reset(ctx context.Context) {
tr.mu.RLock()
acc := tr.acc
tr.acc = nil

View File

@ -1,6 +1,7 @@
package accounting
import (
"context"
"fmt"
"sort"
"strings"
@ -88,7 +89,7 @@ func (tm *transferMap) _sortedSlice() []*Transfer {
// String returns string representation of map items excluding any in
// exclude (if set).
func (tm *transferMap) String(progress *inProgress, exclude *transferMap) string {
func (tm *transferMap) String(ctx context.Context, progress *inProgress, exclude *transferMap) string {
tm.mu.RLock()
defer tm.mu.RUnlock()
stringList := make([]string, 0, len(tm.items))

View File

@ -4,6 +4,7 @@ package config
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
@ -113,7 +114,7 @@ func init() {
func getConfigData() *goconfig.ConfigFile {
if configFile == nil {
LoadConfig()
LoadConfig(context.Background())
}
return configFile
}
@ -212,7 +213,7 @@ func makeConfigPath() string {
}
// LoadConfig loads the config file
func LoadConfig() {
func LoadConfig(ctx context.Context) {
// Set RCLONE_CONFIG_DIR for backend config and subprocesses
_ = os.Setenv("RCLONE_CONFIG_DIR", filepath.Dir(ConfigPath))
@ -229,10 +230,10 @@ func LoadConfig() {
}
// Start the token bucket limiter
accounting.StartTokenBucket()
accounting.StartTokenBucket(ctx)
// Start the bandwidth update ticker
accounting.StartTokenTicker()
accounting.StartTokenTicker(ctx)
// Start the transactions per second limiter
fshttp.StartHTTPTokenBucket()

View File

@ -2,6 +2,7 @@ package config
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@ -15,6 +16,7 @@ import (
)
func testConfigFile(t *testing.T, configFileName string) func() {
ctx := context.Background()
configKey = nil // reset password
_ = os.Unsetenv("_RCLONE_CONFIG_KEY_FILE")
_ = os.Unsetenv("RCLONE_CONFIG_PASS")
@ -36,7 +38,7 @@ func testConfigFile(t *testing.T, configFileName string) func() {
fs.Config = &fs.ConfigInfo{}
configFile = nil
LoadConfig()
LoadConfig(ctx)
assert.Equal(t, []string{}, getConfigData().GetSectionList())
// Fake a remote

View File

@ -116,7 +116,7 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) {
tr := accounting.Stats(ctx).NewCheckingTransfer(src)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
if sizeDiffers(src, dst) {
err = errors.Errorf("Sizes differ")
@ -323,7 +323,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
}
tr1 := accounting.Stats(ctx).NewTransfer(dst)
defer func() {
tr1.Done(nil) // error handling is done by the caller
tr1.Done(ctx, nil) // error handling is done by the caller
}()
in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
@ -333,7 +333,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
}
tr2 := accounting.Stats(ctx).NewTransfer(dst)
defer func() {
tr2.Done(nil) // error handling is done by the caller
tr2.Done(ctx, nil) // error handling is done by the caller
}()
in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer

View File

@ -132,7 +132,7 @@ func TestMultithreadCopy(t *testing.T) {
tr := accounting.GlobalStats().NewTransfer(src)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
dst, err := multiThreadCopy(ctx, r.Flocal, "file1", src, 2, tr)
require.NoError(t, err)

View File

@ -359,7 +359,7 @@ func CommonHash(fa, fb fs.Info) (hash.Type, *fs.HashesOption) {
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
tr := accounting.Stats(ctx).NewTransfer(src)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
newDst = dst
if SkipDestructive(ctx, src, "copy") {
@ -401,7 +401,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
_ = in.Close()
}
if err == fs.ErrorCantCopy {
tr.Reset() // skip incomplete accounting - will be overwritten by the manual copy below
tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy below
}
} else {
err = fs.ErrorCantCopy
@ -478,7 +478,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
// Retry if err returned a retry error
if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) {
fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries)
tr.Reset() // skip incomplete accounting - will be overwritten by retry
tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry
continue
}
// otherwise finish
@ -550,7 +550,7 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
if err == nil {
accounting.Stats(ctx).Renames(1)
}
tr.Done(err)
tr.Done(ctx, err)
}()
newDst = dst
if SkipDestructive(ctx, src, "move") {
@ -627,7 +627,7 @@ func SuffixName(remote string) string {
func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) {
tr := accounting.Stats(ctx).NewCheckingTransfer(dst)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
numDeletes := accounting.Stats(ctx).Deletes(1)
if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete {
@ -817,7 +817,7 @@ func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error {
return ListFn(ctx, f, func(o fs.Object) {
tr := accounting.Stats(ctx).NewCheckingTransfer(o)
defer func() {
tr.Done(nil)
tr.Done(ctx, nil)
}()
modTime := o.ModTime(ctx)
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote())
@ -850,7 +850,7 @@ func hashSum(ctx context.Context, ht hash.Type, o fs.Object) (string, error) {
var err error
tr := accounting.Stats(ctx).NewCheckingTransfer(o)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
sum, err := o.Hash(ctx, ht)
if err == hash.ErrUnsupported {
@ -1058,7 +1058,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
var err error
tr := accounting.Stats(ctx).NewTransfer(o)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
opt := fs.RangeOption{Start: offset, End: -1}
size := o.Size()
@ -1100,7 +1100,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) {
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
in = tr.Account(ctx, in).WithBuffer()
@ -1447,7 +1447,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
// Size known use Put
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
body := ioutil.NopCloser(in) // we let the server close the body
in := tr.Account(ctx, body) // account the transfer (no buffering)
@ -1624,7 +1624,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
}
tr := accounting.Stats(ctx).NewTransfer(srcObj)
defer func() {
tr.Done(err)
tr.Done(ctx, err)
}()
tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj)
if err != nil {
@ -1673,7 +1673,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
if !cp {
err = DeleteFile(ctx, srcObj)
}
tr.Done(err)
tr.Done(ctx, err)
}
return err
}

View File

@ -42,7 +42,7 @@ var promHandler http.Handler
var onlyOnceWarningAllowOrigin sync.Once
func init() {
rcloneCollector := accounting.NewRcloneCollector()
rcloneCollector := accounting.NewRcloneCollector(context.Background())
prometheus.MustRegister(rcloneCollector)
promHandler = promhttp.Handler()
}

View File

@ -344,7 +344,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.W
}
}
}
tr.Done(err)
tr.Done(s.ctx, err)
}
}
@ -749,7 +749,7 @@ func (s *syncCopyMove) makeRenameMap() {
s.pushRenameMap(hash, obj)
}
tr.Done(nil)
tr.Done(s.ctx, nil)
}
}
}()

View File

@ -58,6 +58,7 @@ func init() {
// Initialise rclone for testing
func Initialise() {
ctx := context.Background()
// Never ask for passwords, fail instead.
// If your local config is encrypted set environment variable
// "RCLONE_CONFIG_PASS=hunter2" (or your password)
@ -68,7 +69,7 @@ func Initialise() {
if envConfig := os.Getenv("RCLONE_CONFIG"); envConfig != "" {
config.ConfigPath = envConfig
}
config.LoadConfig()
config.LoadConfig(ctx)
if *Verbose {
fs.Config.LogLevel = fs.LogLevelDebug
}

View File

@ -17,7 +17,7 @@ import (
// ReadFileHandle is an open for read file handle on a File
type ReadFileHandle struct {
baseHandle
done func(err error)
done func(ctx context.Context, err error)
mu sync.Mutex
cond *sync.Cond // cond lock for out of sequence reads
closed bool // set if handle has been closed
@ -414,7 +414,7 @@ func (fh *ReadFileHandle) close() error {
if fh.opened {
var err error
defer func() {
fh.done(err)
fh.done(context.TODO(), err)
}()
// Close first so that we have hashes
err = fh.r.Close()

View File

@ -553,7 +553,7 @@ func (dl *downloader) close(inErr error) (err error) {
dl.in = nil
}
if dl.tr != nil {
dl.tr.Done(inErr)
dl.tr.Done(dl.dls.ctx, inErr)
dl.tr = nil
}
dl._closed = true