From 8243ff8bc88c5d8593c66aca10c03bfec17f5965 Mon Sep 17 00:00:00 2001 From: Aleksandar Jankovic Date: Thu, 18 Jul 2019 12:13:54 +0200 Subject: [PATCH] accounting: isolate stats to groups Introduce stats groups that will isolate accounting for logically different transferring operations. That way multiple accounting operations can be done in parallel without interfering with each other stats. Using groups is optional. There is dedicated global stats that will be used by default if no group is specified. This is operating mode for CLI usage which is just fire and forget operation. For running rclone as rc http server each request will create it's own group. Also there is an option to specify your own group. --- backend/b2/b2.go | 8 +- backend/local/local.go | 6 +- cmd/cmd.go | 28 ++--- cmd/progress.go | 2 +- cmd/serve/ftp/ftp.go | 4 +- cmd/serve/http/http.go | 2 +- cmd/serve/httplib/serve/dir.go | 2 +- cmd/serve/httplib/serve/serve.go | 2 +- cmd/serve/restic/restic.go | 2 +- fs/accounting/inprogress.go | 11 ++ fs/accounting/stats.go | 63 +--------- fs/accounting/stats_groups.go | 189 ++++++++++++++++++++++++++++++ fs/accounting/stringset.go | 34 +++--- fs/operations/multithread_test.go | 4 +- fs/operations/operations.go | 46 ++++---- fs/operations/operations_test.go | 8 +- fs/rc/{ => jobs}/job.go | 112 ++++++++++++++---- fs/rc/{ => jobs}/job_test.go | 132 +++++++++++++++------ fs/rc/rcserver/rcserver.go | 19 ++- fs/rc/rcserver/rcserver_test.go | 5 +- fs/sync/sync.go | 22 ++-- fs/sync/sync_test.go | 116 +++++++++--------- fstest/fstest.go | 8 +- vfs/read.go | 2 +- vfs/read_write.go | 5 - 25 files changed, 551 insertions(+), 281 deletions(-) create mode 100644 fs/accounting/stats_groups.go rename fs/rc/{ => jobs}/job.go (65%) rename fs/rc/{ => jobs}/job_test.go (63%) diff --git a/backend/b2/b2.go b/backend/b2/b2.go index f1b53f82e..5ec50f59e 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -1084,16 +1084,16 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { go func() { defer wg.Done() for object := range toBeDeleted { - accounting.Stats.Checking(object.Name) + accounting.Stats(ctx).Checking(object.Name) checkErr(f.deleteByID(object.ID, object.Name)) - accounting.Stats.DoneChecking(object.Name) + accounting.Stats(ctx).DoneChecking(object.Name) } }() } last := "" checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { if !isDirectory { - accounting.Stats.Checking(remote) + accounting.Stats(ctx).Checking(remote) if oldOnly && last != remote { if object.Action == "hide" { fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID) @@ -1109,7 +1109,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { toBeDeleted <- object } last = remote - accounting.Stats.DoneChecking(remote) + accounting.Stats(ctx).DoneChecking(remote) } return nil })) diff --git a/backend/local/local.go b/backend/local/local.go index e0e3925dd..caafde9a1 100644 --- a/backend/local/local.go +++ b/backend/local/local.go @@ -359,7 +359,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e err = errors.Wrapf(err, "failed to open directory %q", dir) fs.Errorf(dir, "%v", err) if isPerm { - accounting.Stats.Error(fserrors.NoRetryError(err)) + accounting.Stats(ctx).Error(fserrors.NoRetryError(err)) err = nil // ignore error but fail sync } return nil, err @@ -395,7 +395,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e if fierr != nil { err = errors.Wrapf(err, "failed to read directory %q", namepath) fs.Errorf(dir, "%v", fierr) - accounting.Stats.Error(fserrors.NoRetryError(fierr)) // fail the sync + accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync continue } fis = append(fis, fi) @@ -418,7 +418,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Skip bad symlinks err = fserrors.NoRetryError(errors.Wrap(err, "symlink")) fs.Errorf(newRemote, "Listing error: %v", err) - accounting.Stats.Error(err) + accounting.Stats(ctx).Error(err) continue } if err != nil { diff --git a/cmd/cmd.go b/cmd/cmd.go index ab8ecc5b2..1d6ba8f9a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -232,25 +232,25 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { for try := 1; try <= *retries; try++ { err = f() fs.CountError(err) - lastErr := accounting.Stats.GetLastError() + lastErr := accounting.GlobalStats().GetLastError() if err == nil { err = lastErr } - if !Retry || !accounting.Stats.Errored() { + if !Retry || !accounting.GlobalStats().Errored() { if try > 1 { fs.Errorf(nil, "Attempt %d/%d succeeded", try, *retries) } break } - if accounting.Stats.HadFatalError() { + if accounting.GlobalStats().HadFatalError() { fs.Errorf(nil, "Fatal error received - not attempting retries") break } - if accounting.Stats.Errored() && !accounting.Stats.HadRetryError() { + if accounting.GlobalStats().Errored() && !accounting.GlobalStats().HadRetryError() { fs.Errorf(nil, "Can't retry this error - not attempting retries") break } - if retryAfter := accounting.Stats.RetryAfter(); !retryAfter.IsZero() { + if retryAfter := accounting.GlobalStats().RetryAfter(); !retryAfter.IsZero() { d := retryAfter.Sub(time.Now()) if d > 0 { fs.Logf(nil, "Received retry after error - sleeping until %s (%v)", retryAfter.Format(time.RFC3339Nano), d) @@ -258,12 +258,12 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } } if lastErr != nil { - fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.Stats.GetErrors(), lastErr) + fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.GlobalStats().GetErrors(), lastErr) } else { - fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.Stats.GetErrors()) + fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.GlobalStats().GetErrors()) } if try < *retries { - accounting.Stats.ResetErrors() + accounting.GlobalStats().ResetErrors() } if *retriesInterval > 0 { time.Sleep(*retriesInterval) @@ -271,7 +271,7 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } stopStats() if err != nil { - nerrs := accounting.Stats.GetErrors() + nerrs := accounting.GlobalStats().GetErrors() if nerrs <= 1 { log.Printf("Failed to %s: %v", cmd.Name(), err) } else { @@ -279,8 +279,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } resolveExitCode(err) } - if showStats && (accounting.Stats.Errored() || *statsInterval > 0) { - accounting.Stats.Log() + if showStats && (accounting.GlobalStats().Errored() || *statsInterval > 0) { + accounting.GlobalStats().Log() } fs.Debugf(nil, "%d go routines active\n", runtime.NumGoroutine()) @@ -303,8 +303,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } } - if accounting.Stats.Errored() { - resolveExitCode(accounting.Stats.GetLastError()) + if accounting.GlobalStats().Errored() { + resolveExitCode(accounting.GlobalStats().GetLastError()) } } @@ -337,7 +337,7 @@ func StartStats() func() { for { select { case <-ticker.C: - accounting.Stats.Log() + accounting.GlobalStats().Log() case <-stopStats: ticker.Stop() return diff --git a/cmd/progress.go b/cmd/progress.go index e4d2f1de3..c08e126b6 100644 --- a/cmd/progress.go +++ b/cmd/progress.go @@ -93,7 +93,7 @@ func printProgress(logMessage string) { w, h = 80, 25 } _ = h - stats := strings.TrimSpace(accounting.Stats.String()) + stats := strings.TrimSpace(accounting.GlobalStats().String()) logMessage = strings.TrimSpace(logMessage) out := func(s string) { diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index c601e47d9..8202cfe89 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -214,7 +214,7 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er } // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) defer func() { tr.Done(err) }() @@ -313,7 +313,7 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose } // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) defer tr.Done(nil) return node.Size(), handle, nil diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index b8c8f393f..3c85efad8 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -187,7 +187,7 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string }() // Account the transfer - tr := accounting.Stats.NewTransfer(obj) + tr := accounting.Stats(r.Context()).NewTransfer(obj) defer tr.Done(nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer diff --git a/cmd/serve/httplib/serve/dir.go b/cmd/serve/httplib/serve/dir.go index 36ea35cc7..3d952e83e 100644 --- a/cmd/serve/httplib/serve/dir.go +++ b/cmd/serve/httplib/serve/dir.go @@ -75,7 +75,7 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) { // Serve serves a directory func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1) + tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1) defer tr.Done(nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index dba1ca8fa..425a3209d 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -75,7 +75,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } - tr := accounting.Stats.NewTransfer(o) + tr := accounting.Stats(r.Context()).NewTransfer(o) defer func() { tr.Done(err) }() diff --git a/cmd/serve/restic/restic.go b/cmd/serve/restic/restic.go index 530a1edb8..7f307de44 100644 --- a/cmd/serve/restic/restic.go +++ b/cmd/serve/restic/restic.go @@ -271,7 +271,7 @@ func (s *server) postObject(w http.ResponseWriter, r *http.Request, remote strin _, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now()) if err != nil { - accounting.Stats.Error(err) + accounting.Stats(r.Context()).Error(err) fs.Errorf(remote, "Post request rcat error: %v", err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) diff --git a/fs/accounting/inprogress.go b/fs/accounting/inprogress.go index 2f9e01491..debf51a7e 100644 --- a/fs/accounting/inprogress.go +++ b/fs/accounting/inprogress.go @@ -39,3 +39,14 @@ func (ip *inProgress) get(name string) *Account { defer ip.mu.Unlock() return ip.m[name] } + +// merge adds items from another inProgress +func (ip *inProgress) merge(m *inProgress) { + ip.mu.Lock() + defer ip.mu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() + for key, val := range m.m { + ip.m[key] = val + } +} diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 743940780..8ecae96df 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -2,7 +2,6 @@ package accounting import ( "bytes" - "context" "fmt" "sort" "strings" @@ -14,60 +13,6 @@ import ( "github.com/ncw/rclone/fs/rc" ) -var ( - // Stats is global statistics counter - Stats = NewStats() -) - -func init() { - // Set the function pointer up in fs - fs.CountError = Stats.Error - - rc.Add(rc.Call{ - Path: "core/stats", - Fn: Stats.RemoteStats, - Title: "Returns stats about current transfers.", - Help: ` -This returns all available stats - - rclone rc core/stats - -Returns the following values: - -` + "```" + ` -{ - "speed": average speed in bytes/sec since start of the process, - "bytes": total transferred bytes since the start of the process, - "errors": number of errors, - "fatalError": whether there has been at least one FatalError, - "retryError": whether there has been at least one non-NoRetryError, - "checks": number of checked files, - "transfers": number of transferred files, - "deletes" : number of deleted files, - "elapsedTime": time in seconds since the start of the process, - "lastError": last occurred error, - "transferring": an array of currently active file transfers: - [ - { - "bytes": total transferred bytes for this file, - "eta": estimated time in seconds until file transfer completion - "name": name of the file, - "percentage": progress of the file transfer in percent, - "speed": speed in bytes/sec, - "speedAvg": speed in bytes/sec as an exponentially weighted moving average, - "size": size of the file in bytes - } - ], - "checking": an array of names of currently active file checks - [] -} -` + "```" + ` -Values for "transferring", "checking" and "lastError" are only assigned if data is available. -The value for "eta" is null if an eta cannot be determined. -`, - }) -} - // StatsInfo accounts all transfers type StatsInfo struct { mu sync.RWMutex @@ -102,7 +47,7 @@ func NewStats() *StatsInfo { } // RemoteStats returns stats for rc -func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { +func (s *StatsInfo) RemoteStats() (out rc.Params, err error) { out = make(rc.Params) s.mu.RLock() dt := s.totalDuration() @@ -237,7 +182,7 @@ func (s *StatsInfo) String() string { // checking and transferring have their own locking so read // here before lock to prevent deadlock on GetBytes transferring, checking := s.transferring.count(), s.checking.count() - transferringBytesDone, transferringBytesTotal := s.transferring.progress() + transferringBytesDone, transferringBytesTotal := s.transferring.progress(s) s.mu.RLock() @@ -325,10 +270,10 @@ Elapsed time: %10v // Add per transfer stats if required if !fs.Config.StatsOneLine { if !s.checking.empty() { - _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking) + _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress)) } if !s.transferring.empty() { - _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring) + _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress)) } } diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go new file mode 100644 index 000000000..089da4d4d --- /dev/null +++ b/fs/accounting/stats_groups.go @@ -0,0 +1,189 @@ +package accounting + +import ( + "context" + "sync" + + "github.com/ncw/rclone/fs/rc" + + "github.com/ncw/rclone/fs" +) + +const globalStats = "global_stats" + +var groups *statsGroups + +func remoteStats(ctx context.Context, in rc.Params) (rc.Params, error) { + // Check to see if we should filter by group. + group, err := in.GetString("group") + if rc.NotErrParamNotFound(err) { + return rc.Params{}, err + } + if group != "" { + return StatsGroup(group).RemoteStats() + } + + return groups.sum().RemoteStats() +} + +func init() { + // Init stats container + groups = newStatsGroups() + + // Set the function pointer up in fs + fs.CountError = GlobalStats().Error + + rc.Add(rc.Call{ + Path: "core/stats", + Fn: remoteStats, + Title: "Returns stats about current transfers.", + Help: ` +This returns all available stats + + rclone rc core/stats + +Returns the following values: + +` + "```" + ` +{ + "speed": average speed in bytes/sec since start of the process, + "bytes": total transferred bytes since the start of the process, + "errors": number of errors, + "fatalError": whether there has been at least one FatalError, + "retryError": whether there has been at least one non-NoRetryError, + "checks": number of checked files, + "transfers": number of transferred files, + "deletes" : number of deleted files, + "elapsedTime": time in seconds since the start of the process, + "lastError": last occurred error, + "transferring": an array of currently active file transfers: + [ + { + "bytes": total transferred bytes for this file, + "eta": estimated time in seconds until file transfer completion + "name": name of the file, + "percentage": progress of the file transfer in percent, + "speed": speed in bytes/sec, + "speedAvg": speed in bytes/sec as an exponentially weighted moving average, + "size": size of the file in bytes + } + ], + "checking": an array of names of currently active file checks + [] +} +` + "```" + ` +Values for "transferring", "checking" and "lastError" are only assigned if data is available. +The value for "eta" is null if an eta cannot be determined. +`, + }) +} + +type statsGroupCtx int64 + +const statsGroupKey statsGroupCtx = 1 + +// WithStatsGroup returns copy of the parent context with assigned group. +func WithStatsGroup(parent context.Context, group string) context.Context { + return context.WithValue(parent, statsGroupKey, group) +} + +// StatsGroupFromContext returns group from the context if it's available. +// Returns false if group is empty. +func StatsGroupFromContext(ctx context.Context) (string, bool) { + statsGroup, ok := ctx.Value(statsGroupKey).(string) + if statsGroup == "" { + ok = false + } + return statsGroup, ok +} + +// Stats gets stats by extracting group from context. +func Stats(ctx context.Context) *StatsInfo { + group, ok := StatsGroupFromContext(ctx) + if !ok { + return GlobalStats() + } + return StatsGroup(group) +} + +// StatsGroup gets stats by group name. +func StatsGroup(group string) *StatsInfo { + stats := groups.get(group) + if stats == nil { + return NewStatsGroup(group) + } + return stats +} + +// GlobalStats returns special stats used for global accounting. +func GlobalStats() *StatsInfo { + return StatsGroup(globalStats) +} + +// NewStatsGroup creates new stats under named group. +func NewStatsGroup(group string) *StatsInfo { + stats := NewStats() + groups.set(group, stats) + return stats +} + +// statsGroups holds a synchronized map of stats +type statsGroups struct { + mu sync.Mutex + m map[string]*StatsInfo +} + +// newStatsGroups makes a new statsGroups object +func newStatsGroups() *statsGroups { + return &statsGroups{ + m: make(map[string]*StatsInfo), + } +} + +// set marks the stats as belonging to a group +func (sg *statsGroups) set(group string, acc *StatsInfo) { + sg.mu.Lock() + defer sg.mu.Unlock() + sg.m[group] = acc +} + +// clear discards reference to group +func (sg *statsGroups) clear(group string) { + sg.mu.Lock() + defer sg.mu.Unlock() + delete(sg.m, group) +} + +// get gets the stats for group, or nil if not found +func (sg *statsGroups) get(group string) *StatsInfo { + sg.mu.Lock() + defer sg.mu.Unlock() + stats, ok := sg.m[group] + if !ok { + return nil + } + return stats +} + +// get gets the stats for group, or nil if not found +func (sg *statsGroups) sum() *StatsInfo { + sg.mu.Lock() + defer sg.mu.Unlock() + sum := NewStats() + for _, stats := range sg.m { + sum.bytes += stats.bytes + sum.errors += stats.errors + sum.fatalError = sum.fatalError || stats.fatalError + sum.retryError = sum.retryError || stats.retryError + sum.checks += stats.checks + sum.transfers += stats.transfers + sum.deletes += stats.deletes + sum.checking.merge(stats.checking) + sum.transferring.merge(stats.transferring) + sum.inProgress.merge(stats.inProgress) + if sum.lastError == nil && stats.lastError != nil { + sum.lastError = stats.lastError + } + } + return sum +} diff --git a/fs/accounting/stringset.go b/fs/accounting/stringset.go index 71744035d..78197e5aa 100644 --- a/fs/accounting/stringset.go +++ b/fs/accounting/stringset.go @@ -38,6 +38,17 @@ func (ss *stringSet) del(remote string) { ss.mu.Unlock() } +// merge adds items from another set +func (ss *stringSet) merge(m *stringSet) { + ss.mu.Lock() + m.mu.Lock() + for item := range m.items { + ss.items[item] = struct{}{} + } + m.mu.Unlock() + ss.mu.Unlock() +} + // empty returns whether the set has any items func (ss *stringSet) empty() bool { ss.mu.RLock() @@ -52,14 +63,14 @@ func (ss *stringSet) count() int { return len(ss.items) } -// Strings returns all the strings in the stringSet -func (ss *stringSet) Strings() []string { +// String returns string representation of set items. +func (ss *stringSet) String(progress *inProgress) string { ss.mu.RLock() defer ss.mu.RUnlock() - strings := make([]string, 0, len(ss.items)) + strngs := make([]string, 0, len(ss.items)) for name := range ss.items { var out string - if acc := Stats.inProgress.get(name); acc != nil { + if acc := progress.get(name); acc != nil { out = acc.String() } else { out = fmt.Sprintf("%*s: %s", @@ -68,24 +79,19 @@ func (ss *stringSet) Strings() []string { ss.name, ) } - strings = append(strings, " * "+out) + strngs = append(strngs, " * "+out) } - sorted := sort.StringSlice(strings) + sorted := sort.StringSlice(strngs) sorted.Sort() - return sorted -} - -// String returns all the file names in the stringSet joined by newline -func (ss *stringSet) String() string { - return strings.Join(ss.Strings(), "\n") + return strings.Join(sorted, "\n") } // progress returns total bytes read as well as the size. -func (ss *stringSet) progress() (totalBytes, totalSize int64) { +func (ss *stringSet) progress(stats *StatsInfo) (totalBytes, totalSize int64) { ss.mu.RLock() defer ss.mu.RUnlock() for name := range ss.items { - if acc := Stats.inProgress.get(name); acc != nil { + if acc := stats.inProgress.get(name); acc != nil { bytes, size := acc.progress() if size >= 0 && bytes >= 0 { totalBytes += bytes diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index e27662005..1700e0153 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -60,8 +60,8 @@ func TestMultithreadCopy(t *testing.T) { src, err := r.Fremote.NewObject(context.Background(), "file1") require.NoError(t, err) - accounting.Stats.ResetCounters() - tr := accounting.Stats.NewTransfer(src) + accounting.GlobalStats().ResetCounters() + tr := accounting.GlobalStats().NewTransfer(src) defer func() { tr.Done(err) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index a22cc78a5..3da4d5762 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -251,7 +251,7 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil) // It returns the destination object if possible. Note that this may // be nil. func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - tr := accounting.Stats.NewTransfer(src) + tr := accounting.Stats(ctx).NewTransfer(src) defer func() { tr.Done(err) }() @@ -281,13 +281,13 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj actionTaken = "Copied (server side copy)" if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { // Check transfer limit for server side copies - if fs.Config.MaxTransfer >= 0 && accounting.Stats.GetBytes() >= int64(fs.Config.MaxTransfer) { + if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) { return nil, accounting.ErrorMaxTransferLimitReached } newDst, err = doCopy(ctx, src, remote) if err == nil { dst = newDst - accounting.Stats.Bytes(dst.Size()) // account the bytes for the server side transfer + accounting.Stats(ctx).Bytes(dst.Size()) // account the bytes for the server side transfer } } else { err = fs.ErrorCantCopy @@ -428,9 +428,9 @@ func SameObject(src, dst fs.Object) bool { // It returns the destination object if possible. Note that this may // be nil. func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - accounting.Stats.Checking(src.Remote()) + accounting.Stats(ctx).Checking(src.Remote()) defer func() { - accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(ctx).DoneChecking(src.Remote()) }() newDst = dst if fs.Config.DryRun { @@ -501,8 +501,8 @@ func SuffixName(remote string) string { // If backupDir is set then it moves the file to there instead of // deleting func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) { - accounting.Stats.Checking(dst.Remote()) - numDeletes := accounting.Stats.Deletes(1) + accounting.Stats(ctx).Checking(dst.Remote()) + numDeletes := accounting.Stats(ctx).Deletes(1) if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete { return fserrors.FatalError(errors.New("--max-delete threshold reached")) } @@ -523,7 +523,7 @@ func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs } else if !fs.Config.DryRun { fs.Infof(dst, actioned) } - accounting.Stats.DoneChecking(dst.Remote()) + accounting.Stats(ctx).DoneChecking(dst.Remote()) return err } @@ -709,8 +709,8 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { // check to see if two objects are identical using the check function func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool) { - accounting.Stats.Checking(src.Remote()) - defer accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(ctx).Checking(src.Remote()) + defer accounting.Stats(ctx).DoneChecking(src.Remote()) if sizeDiffers(src, dst) { err := errors.Errorf("Sizes differ") fs.Errorf(src, "%v", err) @@ -797,7 +797,7 @@ func CheckFn(ctx context.Context, fdst, fsrc fs.Fs, check checkFn, oneway bool) fs.Logf(fsrc, "%d files missing", c.srcFilesMissing) } - fs.Logf(fdst, "%d differences found", accounting.Stats.GetErrors()) + fs.Logf(fdst, "%d differences found", accounting.Stats(ctx).GetErrors()) if c.noHashes > 0 { fs.Logf(fdst, "%d hashes could not be checked", c.noHashes) } @@ -854,7 +854,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", dst) } - tr1 := accounting.Stats.NewTransfer(dst) + tr1 := accounting.Stats(ctx).NewTransfer(dst) defer func() { tr1.Done(err) }() @@ -864,7 +864,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", src) } - tr2 := accounting.Stats.NewTransfer(dst) + tr2 := accounting.Stats(ctx).NewTransfer(dst) defer func() { tr2.Done(err) }() @@ -930,9 +930,9 @@ func List(ctx context.Context, f fs.Fs, w io.Writer) error { // Lists in parallel which may get them out of order func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error { return ListFn(ctx, f, func(o fs.Object) { - accounting.Stats.Checking(o.Remote()) + accounting.Stats(ctx).Checking(o.Remote()) modTime := o.ModTime(ctx) - accounting.Stats.DoneChecking(o.Remote()) + accounting.Stats(ctx).DoneChecking(o.Remote()) syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote()) }) } @@ -968,9 +968,9 @@ func DropboxHashSum(ctx context.Context, f fs.Fs, w io.Writer) error { // hashSum returns the human readable hash for ht passed in. This may // be UNSUPPORTED or ERROR. func hashSum(ctx context.Context, ht hash.Type, o fs.Object) string { - accounting.Stats.Checking(o.Remote()) + accounting.Stats(ctx).Checking(o.Remote()) sum, err := o.Hash(ctx, ht) - accounting.Stats.DoneChecking(o.Remote()) + accounting.Stats(ctx).DoneChecking(o.Remote()) if err == hash.ErrUnsupported { sum = "UNSUPPORTED" } else if err != nil { @@ -1167,7 +1167,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { var mu sync.Mutex return ListFn(ctx, f, func(o fs.Object) { var err error - tr := accounting.Stats.NewTransfer(o) + tr := accounting.Stats(ctx).NewTransfer(o) defer func() { tr.Done(err) }() @@ -1206,7 +1206,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { // Rcat reads data from the Reader until EOF and uploads it to a file on remote func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { - tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) defer func() { tr.Done(err) }() @@ -1527,7 +1527,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo if size >= 0 { var err error // Size known use Put - tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size) defer func() { tr.Done(err) }() @@ -1664,7 +1664,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str } return errors.Wrap(err, "error while attempting to move file to a temporary location") } - tr := accounting.Stats.NewTransfer(srcObj) + tr := accounting.Stats(ctx).NewTransfer(srcObj) defer func() { tr.Done(err) }() @@ -1711,11 +1711,11 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str _, err = Op(ctx, fdst, dstObj, dstFileName, srcObj) } else { - accounting.Stats.Checking(srcFileName) + accounting.Stats(ctx).Checking(srcFileName) if !cp { err = DeleteFile(ctx, srcObj) } - defer accounting.Stats.DoneChecking(srcFileName) + defer accounting.Stats(ctx).DoneChecking(srcFileName) } return err } diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 75f84373a..841a5a6c8 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -14,7 +14,7 @@ // fstest.CheckItems() before use. This make sure the directory // listing is now consistent and stops cascading errors. // -// Call accounting.Stats.ResetCounters() before every fs.Sync() as it +// Call accounting.GlobalStats().ResetCounters() before every fs.Sync() as it // uses the error count internally. package operations_test @@ -315,15 +315,15 @@ func testCheck(t *testing.T, checkFunction func(ctx context.Context, fdst, fsrc check := func(i int, wantErrors int64, wantChecks int64, oneway bool) { fs.Debugf(r.Fremote, "%d: Starting check test", i) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() var buf bytes.Buffer log.SetOutput(&buf) defer func() { log.SetOutput(os.Stderr) }() err := checkFunction(context.Background(), r.Fremote, r.Flocal, oneway) - gotErrors := accounting.Stats.GetErrors() - gotChecks := accounting.Stats.GetChecks() + gotErrors := accounting.GlobalStats().GetErrors() + gotChecks := accounting.GlobalStats().GetChecks() if wantErrors == 0 && err != nil { t.Errorf("%d: Got error when not expecting one: %v", i, err) } diff --git a/fs/rc/job.go b/fs/rc/jobs/job.go similarity index 65% rename from fs/rc/job.go rename to fs/rc/jobs/job.go index 98f0c7ce5..a35cc03a5 100644 --- a/fs/rc/job.go +++ b/fs/rc/jobs/job.go @@ -1,13 +1,18 @@ // Manage background jobs that the rc is running -package rc +package jobs import ( "context" + "fmt" "sync" "sync/atomic" "time" + "github.com/ncw/rclone/fs/rc" + + "github.com/ncw/rclone/fs/accounting" + "github.com/ncw/rclone/fs" "github.com/pkg/errors" ) @@ -16,13 +21,14 @@ import ( type Job struct { mu sync.Mutex ID int64 `json:"id"` + Group string `json:"group"` StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` Error string `json:"error"` Finished bool `json:"finished"` Success bool `json:"success"` Duration float64 `json:"duration"` - Output Params `json:"output"` + Output rc.Params `json:"output"` Stop func() `json:"-"` } @@ -96,11 +102,11 @@ func (jobs *Jobs) Get(ID int64) *Job { } // mark the job as finished -func (job *Job) finish(out Params, err error) { +func (job *Job) finish(out rc.Params, err error) { job.mu.Lock() job.EndTime = time.Now() if out == nil { - out = make(Params) + out = make(rc.Params) } job.Output = out job.Duration = job.EndTime.Sub(job.StartTime).Seconds() @@ -117,7 +123,7 @@ func (job *Job) finish(out Params, err error) { } // run the job until completion writing the return status -func (job *Job) run(ctx context.Context, fn Func, in Params) { +func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) { defer func() { if r := recover(); r != nil { job.finish(nil, errors.Errorf("panic received: %v", r)) @@ -126,37 +132,93 @@ func (job *Job) run(ctx context.Context, fn Func, in Params) { job.finish(fn(ctx, in)) } -// NewJob start a new Job off -func (jobs *Jobs) NewJob(fn Func, in Params) *Job { - ctx, cancel := context.WithCancel(context.Background()) +func getGroup(in rc.Params) string { + // Check to see if the group is set + group, err := in.GetString("_group") + if rc.NotErrParamNotFound(err) { + fs.Errorf(nil, "Can't get _group param %+v", err) + } + delete(in, "_group") + return group +} + +// NewAsyncJob start a new asynchronous Job off +func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { + id := atomic.AddInt64(&jobID, 1) + + group := getGroup(in) + if group == "" { + group = fmt.Sprintf("job/%d", id) + } + ctx := accounting.WithStatsGroup(context.Background(), group) + ctx, cancel := context.WithCancel(ctx) stop := func() { cancel() // Wait for cancel to propagate before returning. <-ctx.Done() } job := &Job{ - ID: atomic.AddInt64(&jobID, 1), + ID: id, + Group: group, StartTime: time.Now(), Stop: stop, } - go job.run(ctx, fn, in) jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() + go job.run(ctx, fn, in) return job - } -// StartJob starts a new job and returns a Param suitable for output -func StartJob(fn Func, in Params) (Params, error) { - job := running.NewJob(fn, in) - out := make(Params) +// NewSyncJob start a new synchronous Job off +func (jobs *Jobs) NewSyncJob(ctx context.Context, in rc.Params) (*Job, context.Context) { + id := atomic.AddInt64(&jobID, 1) + group := getGroup(in) + if group == "" { + group = fmt.Sprintf("job/%d", id) + } + ctxG := accounting.WithStatsGroup(ctx, fmt.Sprintf("job/%d", id)) + ctx, cancel := context.WithCancel(ctxG) + stop := func() { + cancel() + // Wait for cancel to propagate before returning. + <-ctx.Done() + } + job := &Job{ + ID: id, + Group: group, + StartTime: time.Now(), + Stop: stop, + } + jobs.mu.Lock() + jobs.jobs[job.ID] = job + jobs.mu.Unlock() + return job, ctx +} + +// StartAsyncJob starts a new job asynchronously and returns a Param suitable +// for output. +func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) { + job := running.NewAsyncJob(fn, in) + out := make(rc.Params) out["jobid"] = job.ID return out, nil } +// ExecuteJob executes new job synchronously and returns a Param suitable for +// output. +func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) { + job, ctx := running.NewSyncJob(ctx, in) + job.run(ctx, fn, in) + var err error + if !job.Success { + err = errors.New(job.Error) + } + return job.Output, job.ID, err +} + func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/status", Fn: rcJobStatus, Title: "Reads the status of the job ID", @@ -179,7 +241,7 @@ Results } // Returns the status of a job -func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { +func rcJobStatus(ctx context.Context, in rc.Params) (out rc.Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err @@ -190,8 +252,8 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { } job.mu.Lock() defer job.mu.Unlock() - out = make(Params) - err = Reshape(&out, job) + out = make(rc.Params) + err = rc.Reshape(&out, job) if err != nil { return nil, errors.Wrap(err, "reshape failed in job status") } @@ -199,7 +261,7 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { } func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/list", Fn: rcJobList, Title: "Lists the IDs of the running jobs", @@ -212,14 +274,14 @@ Results } // Returns list of job ids. -func rcJobList(ctx context.Context, in Params) (out Params, err error) { - out = make(Params) +func rcJobList(ctx context.Context, in rc.Params) (out rc.Params, err error) { + out = make(rc.Params) out["jobids"] = running.IDs() return out, nil } func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/stop", Fn: rcJobStop, Title: "Stop the running job", @@ -230,7 +292,7 @@ func init() { } // Stops the running job. -func rcJobStop(ctx context.Context, in Params) (out Params, err error) { +func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err @@ -241,7 +303,7 @@ func rcJobStop(ctx context.Context, in Params) (out Params, err error) { } job.mu.Lock() defer job.mu.Unlock() - out = make(Params) + out = make(rc.Params) job.Stop() return out, nil } diff --git a/fs/rc/job_test.go b/fs/rc/jobs/job_test.go similarity index 63% rename from fs/rc/job_test.go rename to fs/rc/jobs/job_test.go index 8c4c77838..a9372f13d 100644 --- a/fs/rc/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -1,4 +1,4 @@ -package rc +package jobs import ( "context" @@ -7,6 +7,7 @@ import ( "time" "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/rc" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,10 +37,10 @@ func TestJobsExpire(t *testing.T) { jobs := newJobs() jobs.expireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) - job := jobs.NewJob(func(ctx context.Context, in Params) (Params, error) { + job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { defer close(wait) return in, nil - }, Params{}) + }, rc.Params{}) <-wait assert.Equal(t, 1, len(jobs.jobs)) jobs.Expire() @@ -57,14 +58,14 @@ func TestJobsExpire(t *testing.T) { jobs.mu.Unlock() } -var noopFn = func(ctx context.Context, in Params) (Params, error) { +var noopFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { return nil, nil } func TestJobsIDs(t *testing.T) { jobs := newJobs() - job1 := jobs.NewJob(noopFn, Params{}) - job2 := jobs.NewJob(noopFn, Params{}) + job1 := jobs.NewAsyncJob(noopFn, rc.Params{}) + job2 := jobs.NewAsyncJob(noopFn, rc.Params{}) wantIDs := []int64{job1.ID, job2.ID} gotIDs := jobs.IDs() require.Equal(t, 2, len(gotIDs)) @@ -76,17 +77,22 @@ func TestJobsIDs(t *testing.T) { func TestJobsGet(t *testing.T) { jobs := newJobs() - job := jobs.NewJob(noopFn, Params{}) + job := jobs.NewAsyncJob(noopFn, rc.Params{}) assert.Equal(t, job, jobs.Get(job.ID)) assert.Nil(t, jobs.Get(123123123123)) } -var longFn = func(ctx context.Context, in Params) (Params, error) { +var longFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { time.Sleep(1 * time.Hour) return nil, nil } -var ctxFn = func(ctx context.Context, in Params) (Params, error) { +var shortFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { + time.Sleep(time.Millisecond) + return nil, nil +} + +var ctxFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -105,17 +111,17 @@ func sleepJob() { func TestJobFinish(t *testing.T) { jobs := newJobs() - job := jobs.NewJob(longFn, Params{}) + job := jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() assert.Equal(t, true, job.EndTime.IsZero()) - assert.Equal(t, Params(nil), job.Output) + assert.Equal(t, rc.Params(nil), job.Output) assert.Equal(t, 0.0, job.Duration) assert.Equal(t, "", job.Error) assert.Equal(t, false, job.Success) assert.Equal(t, false, job.Finished) - wantOut := Params{"a": 1} + wantOut := rc.Params{"a": 1} job.finish(wantOut, nil) assert.Equal(t, false, job.EndTime.IsZero()) @@ -125,18 +131,18 @@ func TestJobFinish(t *testing.T) { assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewJob(longFn, Params{}) + job = jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() job.finish(nil, nil) assert.Equal(t, false, job.EndTime.IsZero()) - assert.Equal(t, Params{}, job.Output) + assert.Equal(t, rc.Params{}, job.Output) assert.True(t, job.Duration >= floatSleepTime) assert.Equal(t, "", job.Error) assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewJob(longFn, Params{}) + job = jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() job.finish(wantOut, errors.New("potato")) @@ -152,14 +158,14 @@ func TestJobFinish(t *testing.T) { // part of NewJob, now just test the panic catching func TestJobRunPanic(t *testing.T) { wait := make(chan struct{}) - boom := func(ctx context.Context, in Params) (Params, error) { + boom := func(ctx context.Context, in rc.Params) (rc.Params, error) { sleepJob() defer close(wait) panic("boom") } jobs := newJobs() - job := jobs.NewJob(boom, Params{}) + job := jobs.NewAsyncJob(boom, rc.Params{}) <-wait runtime.Gosched() // yield to make sure job is updated @@ -176,7 +182,7 @@ func TestJobRunPanic(t *testing.T) { job.mu.Lock() assert.Equal(t, false, job.EndTime.IsZero()) - assert.Equal(t, Params{}, job.Output) + assert.Equal(t, rc.Params{}, job.Output) assert.True(t, job.Duration >= floatSleepTime) assert.Equal(t, "panic received: boom", job.Error) assert.Equal(t, false, job.Success) @@ -187,7 +193,7 @@ func TestJobRunPanic(t *testing.T) { func TestJobsNewJob(t *testing.T) { jobID = 0 jobs := newJobs() - job := jobs.NewJob(noopFn, Params{}) + job := jobs.NewAsyncJob(noopFn, rc.Params{}) assert.Equal(t, int64(1), job.ID) assert.Equal(t, job, jobs.Get(1)) assert.NotEmpty(t, job.Stop) @@ -195,19 +201,26 @@ func TestJobsNewJob(t *testing.T) { func TestStartJob(t *testing.T) { jobID = 0 - out, err := StartJob(longFn, Params{}) + out, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - assert.Equal(t, Params{"jobid": int64(1)}, out) + assert.Equal(t, rc.Params{"jobid": int64(1)}, out) +} + +func TestExecuteJob(t *testing.T) { + jobID = 0 + _, id, err := ExecuteJob(context.Background(), shortFn, rc.Params{}) + assert.NoError(t, err) + assert.Equal(t, int64(1), id) } func TestRcJobStatus(t *testing.T) { jobID = 0 - _, err := StartJob(longFn, Params{}) + _, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/status") + call := rc.Calls.Get("job/status") assert.NotNil(t, call) - in := Params{"jobid": 1} + in := rc.Params{"jobid": 1} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) @@ -216,12 +229,12 @@ func TestRcJobStatus(t *testing.T) { assert.Equal(t, false, out["finished"]) assert.Equal(t, false, out["success"]) - in = Params{"jobid": 123123123} + in = rc.Params{"jobid": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "job not found") - in = Params{"jobidx": 123123123} + in = rc.Params{"jobidx": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "Didn't find key") @@ -229,45 +242,88 @@ func TestRcJobStatus(t *testing.T) { func TestRcJobList(t *testing.T) { jobID = 0 - _, err := StartJob(longFn, Params{}) + _, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/list") + call := rc.Calls.Get("job/list") assert.NotNil(t, call) - in := Params{} + in := rc.Params{} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) - assert.Equal(t, Params{"jobids": []int64{1}}, out) + assert.Equal(t, rc.Params{"jobids": []int64{1}}, out) } -func TestRcJobStop(t *testing.T) { +func TestRcAsyncJobStop(t *testing.T) { jobID = 0 - _, err := StartJob(ctxFn, Params{}) + _, err := StartAsyncJob(ctxFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/stop") + call := rc.Calls.Get("job/stop") assert.NotNil(t, call) - in := Params{"jobid": 1} + in := rc.Params{"jobid": 1} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.Empty(t, out) - in = Params{"jobid": 123123123} + in = rc.Params{"jobid": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "job not found") - in = Params{"jobidx": 123123123} + in = rc.Params{"jobidx": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "Didn't find key") time.Sleep(10 * time.Millisecond) - call = Calls.Get("job/status") + call = rc.Calls.Get("job/status") assert.NotNil(t, call) - in = Params{"jobid": 1} + in = rc.Params{"jobid": 1} + out, err = call.Fn(context.Background(), in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, float64(1), out["id"]) + assert.Equal(t, "context canceled", out["error"]) + assert.Equal(t, true, out["finished"]) + assert.Equal(t, false, out["success"]) +} + +func TestRcSyncJobStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + jobID = 0 + _, id, err := ExecuteJob(ctx, ctxFn, rc.Params{}) + assert.Error(t, err) + assert.Equal(t, int64(1), id) + }() + + time.Sleep(10 * time.Millisecond) + + call := rc.Calls.Get("job/stop") + assert.NotNil(t, call) + in := rc.Params{"jobid": 1} + out, err := call.Fn(context.Background(), in) + require.NoError(t, err) + require.Empty(t, out) + + in = rc.Params{"jobid": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "job not found") + + in = rc.Params{"jobidx": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "Didn't find key") + + cancel() + time.Sleep(10 * time.Millisecond) + + call = rc.Calls.Get("job/status") + assert.NotNil(t, call) + in = rc.Params{"jobid": 1} out, err = call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 1a33be81a..ae803ed71 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -3,6 +3,8 @@ package rcserver import ( "encoding/json" + "flag" + "fmt" "mime" "net/http" "net/url" @@ -10,6 +12,10 @@ import ( "sort" "strings" + "github.com/skratchdot/open-golang/open" + + "github.com/ncw/rclone/fs/rc/jobs" + "github.com/ncw/rclone/cmd/serve/httplib" "github.com/ncw/rclone/cmd/serve/httplib/serve" "github.com/ncw/rclone/fs" @@ -18,7 +24,6 @@ import ( "github.com/ncw/rclone/fs/list" "github.com/ncw/rclone/fs/rc" "github.com/pkg/errors" - "github.com/skratchdot/open-golang/open" ) // Start the remote control server if configured @@ -79,7 +84,10 @@ func (s *Server) Serve() error { if user != "" || pass != "" { openURL.User = url.UserPassword(user, pass) } - _ = open.Start(openURL.String()) + // Don't open browser if serving in testing environment. + if flag.Lookup("test.v") == nil { + _ = open.Start(openURL.String()) + } } return nil } @@ -181,15 +189,16 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) writeError(path, in, w, err, http.StatusBadRequest) return } - delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) var out rc.Params if isAsync { - out, err = rc.StartJob(call.Fn, in) + out, err = jobs.StartAsyncJob(call.Fn, in) } else { - out, err = call.Fn(r.Context(), in) + var jobID int64 + out, jobID, err = jobs.ExecuteJob(r.Context(), call.Fn, in) + w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID)) } if err != nil { writeError(path, in, w, err, http.StatusInternalServerError) diff --git a/fs/rc/rcserver/rcserver_test.go b/fs/rc/rcserver/rcserver_test.go index 647b3620a..cdb7850aa 100644 --- a/fs/rc/rcserver/rcserver_test.go +++ b/fs/rc/rcserver/rcserver_test.go @@ -611,10 +611,7 @@ func TestRCAsync(t *testing.T) { ContentType: "application/json", Body: `{ "_async":true }`, Status: http.StatusOK, - Expected: `{ - "jobid": 1 -} -`, + Contains: regexp.MustCompile(`(?s)\{.*\"jobid\":.*\}`), }, { Name: "bad", URL: "rc/noop", diff --git a/fs/sync/sync.go b/fs/sync/sync.go index f96bdcd5b..98bd863a6 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -82,12 +82,12 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete dstEmptyDirs: make(map[string]fs.DirEntry), srcEmptyDirs: make(map[string]fs.DirEntry), noTraverse: fs.Config.NoTraverse, - toBeChecked: newPipe(accounting.Stats.SetCheckQueue, fs.Config.MaxBacklog), - toBeUploaded: newPipe(accounting.Stats.SetTransferQueue, fs.Config.MaxBacklog), + toBeChecked: newPipe(accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog), + toBeUploaded: newPipe(accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog), deleteFilesCh: make(chan fs.Object, fs.Config.Checkers), trackRenames: fs.Config.TrackRenames, commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), - toBeRenamed: newPipe(accounting.Stats.SetRenameQueue, fs.Config.MaxBacklog), + toBeRenamed: newPipe(accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog), trackRenamesCh: make(chan fs.Object, fs.Config.Checkers), } s.ctx, s.cancel = context.WithCancel(ctx) @@ -215,7 +215,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { return } src := pair.Src - accounting.Stats.Checking(src.Remote()) + accounting.Stats(s.ctx).Checking(src.Remote()) // Check to see if can store this if src.Storable() { NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir) @@ -256,7 +256,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { } } } - accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(s.ctx).DoneChecking(src.Remote()) } } @@ -401,7 +401,7 @@ func (s *syncCopyMove) stopDeleters() { // checkSrcMap is clear then it assumes that the any source files that // have been found have been removed from dstFiles already. func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error { - if accounting.Stats.Errored() && !fs.Config.IgnoreErrors { + if accounting.Stats(s.ctx).Errored() && !fs.Config.IgnoreErrors { fs.Errorf(s.fdst, "%v", fs.ErrorNotDeleting) return fs.ErrorNotDeleting } @@ -437,7 +437,7 @@ func deleteEmptyDirectories(ctx context.Context, f fs.Fs, entriesMap map[string] if len(entriesMap) == 0 { return nil } - if accounting.Stats.Errored() && !fs.Config.IgnoreErrors { + if accounting.Stats(ctx).Errored() && !fs.Config.IgnoreErrors { fs.Errorf(f, "%v", fs.ErrorNotDeletingDirs) return fs.ErrorNotDeletingDirs } @@ -497,8 +497,8 @@ func copyEmptyDirectories(ctx context.Context, f fs.Fs, entries map[string]fs.Di } } - if accounting.Stats.Errored() { - fs.Debugf(f, "failed to copy %d directories", accounting.Stats.GetErrors()) + if accounting.Stats(ctx).Errored() { + fs.Debugf(f, "failed to copy %d directories", accounting.Stats(ctx).GetErrors()) } if okCount > 0 { @@ -587,12 +587,12 @@ func (s *syncCopyMove) makeRenameMap() { for obj := range in { // only create hash for dst fs.Object if its size could match if _, found := possibleSizes[obj.Size()]; found { - accounting.Stats.Checking(obj.Remote()) + accounting.Stats(s.ctx).Checking(obj.Remote()) hash := s.renameHash(obj) if hash != "" { s.pushRenameMap(hash, obj) } - accounting.Stats.DoneChecking(obj.Remote()) + accounting.Stats(s.ctx).DoneChecking(obj.Remote()) } } }() diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index d721601ed..7c91c677d 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -105,7 +105,7 @@ func TestSyncNoTraverse(t *testing.T) { file1 := r.WriteFile("sub dir/hello world", "hello world", t1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -306,24 +306,24 @@ func TestSyncBasedOnCheckSum(t *testing.T) { file1 := r.WriteFile("check sum", "-", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Change last modified date only file2 := r.WriteFile("check sum", "-", t2) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -340,24 +340,24 @@ func TestSyncSizeOnly(t *testing.T) { file1 := r.WriteFile("sizeonly", "potato", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Update mtime, md5sum but not length of file file2 := r.WriteFile("sizeonly", "POTATO", t2) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -374,24 +374,24 @@ func TestSyncIgnoreSize(t *testing.T) { file1 := r.WriteFile("ignore-size", "contents", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Update size but not date of file file2 := r.WriteFile("ignore-size", "longer contents but same date", t1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -402,24 +402,24 @@ func TestSyncIgnoreTimes(t *testing.T) { file1 := r.WriteBoth(context.Background(), "existing", "potato", t1) fstest.CheckItems(t, r.Fremote, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly 0 files because the // files were identical. - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fs.Config.IgnoreTimes = true defer func() { fs.Config.IgnoreTimes = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file even though the // files were identical. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file1) @@ -433,7 +433,7 @@ func TestSyncIgnoreExisting(t *testing.T) { fs.Config.IgnoreExisting = true defer func() { fs.Config.IgnoreExisting = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -441,7 +441,7 @@ func TestSyncIgnoreExisting(t *testing.T) { // Change everything r.WriteFile("existing", "newpotatoes", t2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // Items should not change @@ -488,7 +488,7 @@ func TestSyncIgnoreErrors(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() fs.CountError(errors.New("boom")) assert.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) @@ -532,7 +532,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) { fs.Config.DryRun = true defer func() { fs.Config.DryRun = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -541,7 +541,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) { fs.Config.DryRun = false - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -569,7 +569,7 @@ func TestSyncAfterChangingModtimeOnlyWithNoUpdateModTime(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -590,7 +590,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -598,7 +598,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) // We should have transferred exactly one file, not set the mod time - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) } func TestSyncAfterAddingAFile(t *testing.T) { @@ -610,7 +610,7 @@ func TestSyncAfterAddingAFile(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1, file2) fstest.CheckItems(t, r.Fremote, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1, file2) @@ -625,7 +625,7 @@ func TestSyncAfterChangingFilesSizeOnly(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -648,7 +648,7 @@ func TestSyncAfterChangingContentsOnly(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -664,7 +664,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileDryRun(t *testing.T) { file3 := r.WriteBoth(context.Background(), "empty space", "-", t2) fs.Config.DryRun = true - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) fs.Config.DryRun = false require.NoError(t, err) @@ -683,7 +683,7 @@ func TestSyncAfterRemovingAFileAndAddingAFile(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3) fstest.CheckItems(t, r.Flocal, file1, file3) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1, file3) @@ -729,7 +729,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDir(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -798,7 +798,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDirWithErrors(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() fs.CountError(errors.New("boom")) err := Sync(context.Background(), r.Fremote, r.Flocal, false) assert.Equal(t, fs.ErrorNotDeleting, err) @@ -876,7 +876,7 @@ func TestCopyDeleteBefore(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := CopyDir(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -899,14 +899,14 @@ func TestSyncWithExclude(t *testing.T) { filter.Active.Opt.MaxSize = -1 }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, file2, file1) // Now sync the other way round and check enormous doesn't get // deleted as it is excluded from the sync - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Flocal, r.Fremote, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2, file1, file3) @@ -929,14 +929,14 @@ func TestSyncWithExcludeAndDeleteExcluded(t *testing.T) { filter.Active.Opt.DeleteExcluded = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, file2) // Check sync the other way round to make sure enormous gets // deleted even though it is excluded - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Flocal, r.Fremote, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -971,7 +971,7 @@ func TestSyncWithUpdateOlder(t *testing.T) { fs.Config.ModifyWindow = oldModifyWindow }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, oneO, twoF, threeO, fourF, fiveF) @@ -995,7 +995,7 @@ func TestSyncWithTrackRenames(t *testing.T) { f1 := r.WriteFile("potato", "Potato Content", t1) f2 := r.WriteFile("yam", "Yam Content", t2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) fstest.CheckItems(t, r.Fremote, f1, f2) @@ -1004,7 +1004,7 @@ func TestSyncWithTrackRenames(t *testing.T) { // Now rename locally. f2 = r.RenameFile(f2, "yaml") - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) fstest.CheckItems(t, r.Fremote, f1, f2) @@ -1012,15 +1012,15 @@ func TestSyncWithTrackRenames(t *testing.T) { if canTrackRenames { if r.Fremote.Features().Move == nil || r.Fremote.Name() == "TestUnion" { // union remote can Move but returns CantMove error // If no server side Move, we are falling back to Copy + Delete - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 1 copy - assert.Equal(t, int64(4), accounting.Stats.GetChecks()) // 2 file checks + 1 move + 1 delete + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 1 copy + assert.Equal(t, int64(4), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move + 1 delete } else { - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) // 0 copy - assert.Equal(t, int64(3), accounting.Stats.GetChecks()) // 2 file checks + 1 move + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) // 0 copy + assert.Equal(t, int64(3), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move } } else { - assert.Equal(t, int64(2), accounting.Stats.GetChecks()) // 2 file checks - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 0 copy + assert.Equal(t, int64(2), accounting.GlobalStats().GetChecks()) // 2 file checks + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 0 copy } } @@ -1049,7 +1049,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty fstest.CheckItems(t, FremoteMove, file2, file3) // Do server side move - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = MoveDir(context.Background(), FremoteMove, r.Fremote, testDeleteEmptyDirs, false) require.NoError(t, err) @@ -1076,7 +1076,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty } // Move it back to a new empty remote, dst does not exist this time - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = MoveDir(context.Background(), FremoteMove2, FremoteMove, testDeleteEmptyDirs, false) require.NoError(t, err) @@ -1439,7 +1439,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) { fdst, err := fs.NewFs(r.FremoteName + "/dst") require.NoError(t, err) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1464,7 +1464,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) { // This should delete three and overwrite one again, checking // the files got overwritten correctly in backup-dir - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1518,7 +1518,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) { fdst, err := fs.NewFs(r.FremoteName + "/dst") require.NoError(t, err) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one") require.NoError(t, err) err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two") @@ -1548,7 +1548,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) { // This should delete three and overwrite one again, checking // the files got overwritten correctly in backup-dir - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one") require.NoError(t, err) err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two") @@ -1594,13 +1594,13 @@ func TestSyncUTFNorm(t *testing.T) { file2 := r.WriteObject(context.Background(), Encoding2, "This is a old test", t2) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file, but kept the // normalized state of the file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file1) file1.Path = file2.Path fstest.CheckItems(t, r.Fremote, file1) @@ -1620,7 +1620,7 @@ func TestSyncImmutable(t *testing.T) { fstest.CheckItems(t, r.Fremote) // Should succeed - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -1632,7 +1632,7 @@ func TestSyncImmutable(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) // Should fail with ErrorImmutableModified and not modify local or remote files - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) assert.EqualError(t, err, fs.ErrorImmutableModified.Error()) fstest.CheckItems(t, r.Flocal, file2) @@ -1659,7 +1659,7 @@ func TestSyncIgnoreCase(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2) // Should not copy files that are differently-cased but otherwise identical - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -1694,7 +1694,7 @@ func TestAbort(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1, file2, file3) fstest.CheckItems(t, r.Fremote) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err) diff --git a/fstest/fstest.go b/fstest/fstest.go index 7464a4642..e7f67e3dd 100644 --- a/fstest/fstest.go +++ b/fstest/fstest.go @@ -274,7 +274,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs expectedDirs = filterEmptyDirs(t, items, expectedDirs) } is := NewItems(items) - oldErrors := accounting.Stats.GetErrors() + ctx := context.Background() + oldErrors := accounting.Stats(ctx).GetErrors() var objs []fs.Object var dirs []fs.Directory var err error @@ -283,7 +284,6 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs wantListing1, wantListing2 := makeListingFromItems(items) gotListing := "" listingOK := false - ctx := context.Background() for i := 1; i <= retries; i++ { objs, dirs, err = walk.GetAll(ctx, f, "", true, -1) if err != nil && err != fs.ErrorDirNotFound { @@ -317,8 +317,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs } is.Done(t) // Don't notice an error when listing an empty directory - if len(items) == 0 && oldErrors == 0 && accounting.Stats.GetErrors() == 1 { - accounting.Stats.ResetErrors() + if len(items) == 0 && oldErrors == 0 && accounting.Stats(ctx).GetErrors() == 1 { + accounting.Stats(ctx).ResetErrors() } // Check the directories if expectedDirs != nil { diff --git a/vfs/read.go b/vfs/read.go index fc3107cf8..7dd5b0775 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -71,7 +71,7 @@ func (fh *ReadFileHandle) openPending() (err error) { if err != nil { return err } - tr := accounting.Stats.NewTransfer(o) + tr := accounting.GlobalStats().NewTransfer(o) fh.done = tr.Done fh.r = tr.Account(r).WithBuffer() // account the transfer fh.opened = true diff --git a/vfs/read_write.go b/vfs/read_write.go index 84a57a74e..d5e43fbb1 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/ncw/rclone/fs" - "github.com/ncw/rclone/fs/accounting" "github.com/ncw/rclone/fs/log" "github.com/ncw/rclone/fs/operations" "github.com/ncw/rclone/lib/file" @@ -87,10 +86,6 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl // copy an object to or from the remote while accounting for it func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { if operations.NeedTransfer(context.TODO(), dst, src) { - tr := accounting.Stats.NewTransfer(src) - defer func() { - tr.Done(err) - }() newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) } else { newDst = dst