diff --git a/backend/b2/b2.go b/backend/b2/b2.go index f1b53f82e..5ec50f59e 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -1084,16 +1084,16 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { go func() { defer wg.Done() for object := range toBeDeleted { - accounting.Stats.Checking(object.Name) + accounting.Stats(ctx).Checking(object.Name) checkErr(f.deleteByID(object.ID, object.Name)) - accounting.Stats.DoneChecking(object.Name) + accounting.Stats(ctx).DoneChecking(object.Name) } }() } last := "" checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { if !isDirectory { - accounting.Stats.Checking(remote) + accounting.Stats(ctx).Checking(remote) if oldOnly && last != remote { if object.Action == "hide" { fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID) @@ -1109,7 +1109,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { toBeDeleted <- object } last = remote - accounting.Stats.DoneChecking(remote) + accounting.Stats(ctx).DoneChecking(remote) } return nil })) diff --git a/backend/local/local.go b/backend/local/local.go index e0e3925dd..caafde9a1 100644 --- a/backend/local/local.go +++ b/backend/local/local.go @@ -359,7 +359,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e err = errors.Wrapf(err, "failed to open directory %q", dir) fs.Errorf(dir, "%v", err) if isPerm { - accounting.Stats.Error(fserrors.NoRetryError(err)) + accounting.Stats(ctx).Error(fserrors.NoRetryError(err)) err = nil // ignore error but fail sync } return nil, err @@ -395,7 +395,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e if fierr != nil { err = errors.Wrapf(err, "failed to read directory %q", namepath) fs.Errorf(dir, "%v", fierr) - accounting.Stats.Error(fserrors.NoRetryError(fierr)) // fail the sync + accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync continue } fis = append(fis, fi) @@ -418,7 +418,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Skip bad symlinks err = fserrors.NoRetryError(errors.Wrap(err, "symlink")) fs.Errorf(newRemote, "Listing error: %v", err) - accounting.Stats.Error(err) + accounting.Stats(ctx).Error(err) continue } if err != nil { diff --git a/cmd/cmd.go b/cmd/cmd.go index ab8ecc5b2..1d6ba8f9a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -232,25 +232,25 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { for try := 1; try <= *retries; try++ { err = f() fs.CountError(err) - lastErr := accounting.Stats.GetLastError() + lastErr := accounting.GlobalStats().GetLastError() if err == nil { err = lastErr } - if !Retry || !accounting.Stats.Errored() { + if !Retry || !accounting.GlobalStats().Errored() { if try > 1 { fs.Errorf(nil, "Attempt %d/%d succeeded", try, *retries) } break } - if accounting.Stats.HadFatalError() { + if accounting.GlobalStats().HadFatalError() { fs.Errorf(nil, "Fatal error received - not attempting retries") break } - if accounting.Stats.Errored() && !accounting.Stats.HadRetryError() { + if accounting.GlobalStats().Errored() && !accounting.GlobalStats().HadRetryError() { fs.Errorf(nil, "Can't retry this error - not attempting retries") break } - if retryAfter := accounting.Stats.RetryAfter(); !retryAfter.IsZero() { + if retryAfter := accounting.GlobalStats().RetryAfter(); !retryAfter.IsZero() { d := retryAfter.Sub(time.Now()) if d > 0 { fs.Logf(nil, "Received retry after error - sleeping until %s (%v)", retryAfter.Format(time.RFC3339Nano), d) @@ -258,12 +258,12 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } } if lastErr != nil { - fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.Stats.GetErrors(), lastErr) + fs.Errorf(nil, "Attempt %d/%d failed with %d errors and: %v", try, *retries, accounting.GlobalStats().GetErrors(), lastErr) } else { - fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.Stats.GetErrors()) + fs.Errorf(nil, "Attempt %d/%d failed with %d errors", try, *retries, accounting.GlobalStats().GetErrors()) } if try < *retries { - accounting.Stats.ResetErrors() + accounting.GlobalStats().ResetErrors() } if *retriesInterval > 0 { time.Sleep(*retriesInterval) @@ -271,7 +271,7 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } stopStats() if err != nil { - nerrs := accounting.Stats.GetErrors() + nerrs := accounting.GlobalStats().GetErrors() if nerrs <= 1 { log.Printf("Failed to %s: %v", cmd.Name(), err) } else { @@ -279,8 +279,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } resolveExitCode(err) } - if showStats && (accounting.Stats.Errored() || *statsInterval > 0) { - accounting.Stats.Log() + if showStats && (accounting.GlobalStats().Errored() || *statsInterval > 0) { + accounting.GlobalStats().Log() } fs.Debugf(nil, "%d go routines active\n", runtime.NumGoroutine()) @@ -303,8 +303,8 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } } - if accounting.Stats.Errored() { - resolveExitCode(accounting.Stats.GetLastError()) + if accounting.GlobalStats().Errored() { + resolveExitCode(accounting.GlobalStats().GetLastError()) } } @@ -337,7 +337,7 @@ func StartStats() func() { for { select { case <-ticker.C: - accounting.Stats.Log() + accounting.GlobalStats().Log() case <-stopStats: ticker.Stop() return diff --git a/cmd/progress.go b/cmd/progress.go index e4d2f1de3..c08e126b6 100644 --- a/cmd/progress.go +++ b/cmd/progress.go @@ -93,7 +93,7 @@ func printProgress(logMessage string) { w, h = 80, 25 } _ = h - stats := strings.TrimSpace(accounting.Stats.String()) + stats := strings.TrimSpace(accounting.GlobalStats().String()) logMessage = strings.TrimSpace(logMessage) out := func(s string) { diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index c601e47d9..8202cfe89 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -214,7 +214,7 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er } // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) defer func() { tr.Done(err) }() @@ -313,7 +313,7 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose } // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) defer tr.Done(nil) return node.Size(), handle, nil diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index b8c8f393f..3c85efad8 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -187,7 +187,7 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string }() // Account the transfer - tr := accounting.Stats.NewTransfer(obj) + tr := accounting.Stats(r.Context()).NewTransfer(obj) defer tr.Done(nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer diff --git a/cmd/serve/httplib/serve/dir.go b/cmd/serve/httplib/serve/dir.go index 36ea35cc7..3d952e83e 100644 --- a/cmd/serve/httplib/serve/dir.go +++ b/cmd/serve/httplib/serve/dir.go @@ -75,7 +75,7 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) { // Serve serves a directory func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer - tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1) + tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1) defer tr.Done(nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index dba1ca8fa..425a3209d 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -75,7 +75,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } - tr := accounting.Stats.NewTransfer(o) + tr := accounting.Stats(r.Context()).NewTransfer(o) defer func() { tr.Done(err) }() diff --git a/cmd/serve/restic/restic.go b/cmd/serve/restic/restic.go index 530a1edb8..7f307de44 100644 --- a/cmd/serve/restic/restic.go +++ b/cmd/serve/restic/restic.go @@ -271,7 +271,7 @@ func (s *server) postObject(w http.ResponseWriter, r *http.Request, remote strin _, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now()) if err != nil { - accounting.Stats.Error(err) + accounting.Stats(r.Context()).Error(err) fs.Errorf(remote, "Post request rcat error: %v", err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) diff --git a/fs/accounting/inprogress.go b/fs/accounting/inprogress.go index 2f9e01491..debf51a7e 100644 --- a/fs/accounting/inprogress.go +++ b/fs/accounting/inprogress.go @@ -39,3 +39,14 @@ func (ip *inProgress) get(name string) *Account { defer ip.mu.Unlock() return ip.m[name] } + +// merge adds items from another inProgress +func (ip *inProgress) merge(m *inProgress) { + ip.mu.Lock() + defer ip.mu.Unlock() + m.mu.Lock() + defer m.mu.Unlock() + for key, val := range m.m { + ip.m[key] = val + } +} diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 743940780..8ecae96df 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -2,7 +2,6 @@ package accounting import ( "bytes" - "context" "fmt" "sort" "strings" @@ -14,60 +13,6 @@ import ( "github.com/ncw/rclone/fs/rc" ) -var ( - // Stats is global statistics counter - Stats = NewStats() -) - -func init() { - // Set the function pointer up in fs - fs.CountError = Stats.Error - - rc.Add(rc.Call{ - Path: "core/stats", - Fn: Stats.RemoteStats, - Title: "Returns stats about current transfers.", - Help: ` -This returns all available stats - - rclone rc core/stats - -Returns the following values: - -` + "```" + ` -{ - "speed": average speed in bytes/sec since start of the process, - "bytes": total transferred bytes since the start of the process, - "errors": number of errors, - "fatalError": whether there has been at least one FatalError, - "retryError": whether there has been at least one non-NoRetryError, - "checks": number of checked files, - "transfers": number of transferred files, - "deletes" : number of deleted files, - "elapsedTime": time in seconds since the start of the process, - "lastError": last occurred error, - "transferring": an array of currently active file transfers: - [ - { - "bytes": total transferred bytes for this file, - "eta": estimated time in seconds until file transfer completion - "name": name of the file, - "percentage": progress of the file transfer in percent, - "speed": speed in bytes/sec, - "speedAvg": speed in bytes/sec as an exponentially weighted moving average, - "size": size of the file in bytes - } - ], - "checking": an array of names of currently active file checks - [] -} -` + "```" + ` -Values for "transferring", "checking" and "lastError" are only assigned if data is available. -The value for "eta" is null if an eta cannot be determined. -`, - }) -} - // StatsInfo accounts all transfers type StatsInfo struct { mu sync.RWMutex @@ -102,7 +47,7 @@ func NewStats() *StatsInfo { } // RemoteStats returns stats for rc -func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { +func (s *StatsInfo) RemoteStats() (out rc.Params, err error) { out = make(rc.Params) s.mu.RLock() dt := s.totalDuration() @@ -237,7 +182,7 @@ func (s *StatsInfo) String() string { // checking and transferring have their own locking so read // here before lock to prevent deadlock on GetBytes transferring, checking := s.transferring.count(), s.checking.count() - transferringBytesDone, transferringBytesTotal := s.transferring.progress() + transferringBytesDone, transferringBytesTotal := s.transferring.progress(s) s.mu.RLock() @@ -325,10 +270,10 @@ Elapsed time: %10v // Add per transfer stats if required if !fs.Config.StatsOneLine { if !s.checking.empty() { - _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking) + _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress)) } if !s.transferring.empty() { - _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring) + _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress)) } } diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go new file mode 100644 index 000000000..089da4d4d --- /dev/null +++ b/fs/accounting/stats_groups.go @@ -0,0 +1,189 @@ +package accounting + +import ( + "context" + "sync" + + "github.com/ncw/rclone/fs/rc" + + "github.com/ncw/rclone/fs" +) + +const globalStats = "global_stats" + +var groups *statsGroups + +func remoteStats(ctx context.Context, in rc.Params) (rc.Params, error) { + // Check to see if we should filter by group. + group, err := in.GetString("group") + if rc.NotErrParamNotFound(err) { + return rc.Params{}, err + } + if group != "" { + return StatsGroup(group).RemoteStats() + } + + return groups.sum().RemoteStats() +} + +func init() { + // Init stats container + groups = newStatsGroups() + + // Set the function pointer up in fs + fs.CountError = GlobalStats().Error + + rc.Add(rc.Call{ + Path: "core/stats", + Fn: remoteStats, + Title: "Returns stats about current transfers.", + Help: ` +This returns all available stats + + rclone rc core/stats + +Returns the following values: + +` + "```" + ` +{ + "speed": average speed in bytes/sec since start of the process, + "bytes": total transferred bytes since the start of the process, + "errors": number of errors, + "fatalError": whether there has been at least one FatalError, + "retryError": whether there has been at least one non-NoRetryError, + "checks": number of checked files, + "transfers": number of transferred files, + "deletes" : number of deleted files, + "elapsedTime": time in seconds since the start of the process, + "lastError": last occurred error, + "transferring": an array of currently active file transfers: + [ + { + "bytes": total transferred bytes for this file, + "eta": estimated time in seconds until file transfer completion + "name": name of the file, + "percentage": progress of the file transfer in percent, + "speed": speed in bytes/sec, + "speedAvg": speed in bytes/sec as an exponentially weighted moving average, + "size": size of the file in bytes + } + ], + "checking": an array of names of currently active file checks + [] +} +` + "```" + ` +Values for "transferring", "checking" and "lastError" are only assigned if data is available. +The value for "eta" is null if an eta cannot be determined. +`, + }) +} + +type statsGroupCtx int64 + +const statsGroupKey statsGroupCtx = 1 + +// WithStatsGroup returns copy of the parent context with assigned group. +func WithStatsGroup(parent context.Context, group string) context.Context { + return context.WithValue(parent, statsGroupKey, group) +} + +// StatsGroupFromContext returns group from the context if it's available. +// Returns false if group is empty. +func StatsGroupFromContext(ctx context.Context) (string, bool) { + statsGroup, ok := ctx.Value(statsGroupKey).(string) + if statsGroup == "" { + ok = false + } + return statsGroup, ok +} + +// Stats gets stats by extracting group from context. +func Stats(ctx context.Context) *StatsInfo { + group, ok := StatsGroupFromContext(ctx) + if !ok { + return GlobalStats() + } + return StatsGroup(group) +} + +// StatsGroup gets stats by group name. +func StatsGroup(group string) *StatsInfo { + stats := groups.get(group) + if stats == nil { + return NewStatsGroup(group) + } + return stats +} + +// GlobalStats returns special stats used for global accounting. +func GlobalStats() *StatsInfo { + return StatsGroup(globalStats) +} + +// NewStatsGroup creates new stats under named group. +func NewStatsGroup(group string) *StatsInfo { + stats := NewStats() + groups.set(group, stats) + return stats +} + +// statsGroups holds a synchronized map of stats +type statsGroups struct { + mu sync.Mutex + m map[string]*StatsInfo +} + +// newStatsGroups makes a new statsGroups object +func newStatsGroups() *statsGroups { + return &statsGroups{ + m: make(map[string]*StatsInfo), + } +} + +// set marks the stats as belonging to a group +func (sg *statsGroups) set(group string, acc *StatsInfo) { + sg.mu.Lock() + defer sg.mu.Unlock() + sg.m[group] = acc +} + +// clear discards reference to group +func (sg *statsGroups) clear(group string) { + sg.mu.Lock() + defer sg.mu.Unlock() + delete(sg.m, group) +} + +// get gets the stats for group, or nil if not found +func (sg *statsGroups) get(group string) *StatsInfo { + sg.mu.Lock() + defer sg.mu.Unlock() + stats, ok := sg.m[group] + if !ok { + return nil + } + return stats +} + +// get gets the stats for group, or nil if not found +func (sg *statsGroups) sum() *StatsInfo { + sg.mu.Lock() + defer sg.mu.Unlock() + sum := NewStats() + for _, stats := range sg.m { + sum.bytes += stats.bytes + sum.errors += stats.errors + sum.fatalError = sum.fatalError || stats.fatalError + sum.retryError = sum.retryError || stats.retryError + sum.checks += stats.checks + sum.transfers += stats.transfers + sum.deletes += stats.deletes + sum.checking.merge(stats.checking) + sum.transferring.merge(stats.transferring) + sum.inProgress.merge(stats.inProgress) + if sum.lastError == nil && stats.lastError != nil { + sum.lastError = stats.lastError + } + } + return sum +} diff --git a/fs/accounting/stringset.go b/fs/accounting/stringset.go index 71744035d..78197e5aa 100644 --- a/fs/accounting/stringset.go +++ b/fs/accounting/stringset.go @@ -38,6 +38,17 @@ func (ss *stringSet) del(remote string) { ss.mu.Unlock() } +// merge adds items from another set +func (ss *stringSet) merge(m *stringSet) { + ss.mu.Lock() + m.mu.Lock() + for item := range m.items { + ss.items[item] = struct{}{} + } + m.mu.Unlock() + ss.mu.Unlock() +} + // empty returns whether the set has any items func (ss *stringSet) empty() bool { ss.mu.RLock() @@ -52,14 +63,14 @@ func (ss *stringSet) count() int { return len(ss.items) } -// Strings returns all the strings in the stringSet -func (ss *stringSet) Strings() []string { +// String returns string representation of set items. +func (ss *stringSet) String(progress *inProgress) string { ss.mu.RLock() defer ss.mu.RUnlock() - strings := make([]string, 0, len(ss.items)) + strngs := make([]string, 0, len(ss.items)) for name := range ss.items { var out string - if acc := Stats.inProgress.get(name); acc != nil { + if acc := progress.get(name); acc != nil { out = acc.String() } else { out = fmt.Sprintf("%*s: %s", @@ -68,24 +79,19 @@ func (ss *stringSet) Strings() []string { ss.name, ) } - strings = append(strings, " * "+out) + strngs = append(strngs, " * "+out) } - sorted := sort.StringSlice(strings) + sorted := sort.StringSlice(strngs) sorted.Sort() - return sorted -} - -// String returns all the file names in the stringSet joined by newline -func (ss *stringSet) String() string { - return strings.Join(ss.Strings(), "\n") + return strings.Join(sorted, "\n") } // progress returns total bytes read as well as the size. -func (ss *stringSet) progress() (totalBytes, totalSize int64) { +func (ss *stringSet) progress(stats *StatsInfo) (totalBytes, totalSize int64) { ss.mu.RLock() defer ss.mu.RUnlock() for name := range ss.items { - if acc := Stats.inProgress.get(name); acc != nil { + if acc := stats.inProgress.get(name); acc != nil { bytes, size := acc.progress() if size >= 0 && bytes >= 0 { totalBytes += bytes diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index e27662005..1700e0153 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -60,8 +60,8 @@ func TestMultithreadCopy(t *testing.T) { src, err := r.Fremote.NewObject(context.Background(), "file1") require.NoError(t, err) - accounting.Stats.ResetCounters() - tr := accounting.Stats.NewTransfer(src) + accounting.GlobalStats().ResetCounters() + tr := accounting.GlobalStats().NewTransfer(src) defer func() { tr.Done(err) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index a22cc78a5..3da4d5762 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -251,7 +251,7 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil) // It returns the destination object if possible. Note that this may // be nil. func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - tr := accounting.Stats.NewTransfer(src) + tr := accounting.Stats(ctx).NewTransfer(src) defer func() { tr.Done(err) }() @@ -281,13 +281,13 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj actionTaken = "Copied (server side copy)" if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { // Check transfer limit for server side copies - if fs.Config.MaxTransfer >= 0 && accounting.Stats.GetBytes() >= int64(fs.Config.MaxTransfer) { + if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) { return nil, accounting.ErrorMaxTransferLimitReached } newDst, err = doCopy(ctx, src, remote) if err == nil { dst = newDst - accounting.Stats.Bytes(dst.Size()) // account the bytes for the server side transfer + accounting.Stats(ctx).Bytes(dst.Size()) // account the bytes for the server side transfer } } else { err = fs.ErrorCantCopy @@ -428,9 +428,9 @@ func SameObject(src, dst fs.Object) bool { // It returns the destination object if possible. Note that this may // be nil. func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - accounting.Stats.Checking(src.Remote()) + accounting.Stats(ctx).Checking(src.Remote()) defer func() { - accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(ctx).DoneChecking(src.Remote()) }() newDst = dst if fs.Config.DryRun { @@ -501,8 +501,8 @@ func SuffixName(remote string) string { // If backupDir is set then it moves the file to there instead of // deleting func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) { - accounting.Stats.Checking(dst.Remote()) - numDeletes := accounting.Stats.Deletes(1) + accounting.Stats(ctx).Checking(dst.Remote()) + numDeletes := accounting.Stats(ctx).Deletes(1) if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete { return fserrors.FatalError(errors.New("--max-delete threshold reached")) } @@ -523,7 +523,7 @@ func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs } else if !fs.Config.DryRun { fs.Infof(dst, actioned) } - accounting.Stats.DoneChecking(dst.Remote()) + accounting.Stats(ctx).DoneChecking(dst.Remote()) return err } @@ -709,8 +709,8 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { // check to see if two objects are identical using the check function func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool) { - accounting.Stats.Checking(src.Remote()) - defer accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(ctx).Checking(src.Remote()) + defer accounting.Stats(ctx).DoneChecking(src.Remote()) if sizeDiffers(src, dst) { err := errors.Errorf("Sizes differ") fs.Errorf(src, "%v", err) @@ -797,7 +797,7 @@ func CheckFn(ctx context.Context, fdst, fsrc fs.Fs, check checkFn, oneway bool) fs.Logf(fsrc, "%d files missing", c.srcFilesMissing) } - fs.Logf(fdst, "%d differences found", accounting.Stats.GetErrors()) + fs.Logf(fdst, "%d differences found", accounting.Stats(ctx).GetErrors()) if c.noHashes > 0 { fs.Logf(fdst, "%d hashes could not be checked", c.noHashes) } @@ -854,7 +854,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", dst) } - tr1 := accounting.Stats.NewTransfer(dst) + tr1 := accounting.Stats(ctx).NewTransfer(dst) defer func() { tr1.Done(err) }() @@ -864,7 +864,7 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e if err != nil { return true, errors.Wrapf(err, "failed to open %q", src) } - tr2 := accounting.Stats.NewTransfer(dst) + tr2 := accounting.Stats(ctx).NewTransfer(dst) defer func() { tr2.Done(err) }() @@ -930,9 +930,9 @@ func List(ctx context.Context, f fs.Fs, w io.Writer) error { // Lists in parallel which may get them out of order func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error { return ListFn(ctx, f, func(o fs.Object) { - accounting.Stats.Checking(o.Remote()) + accounting.Stats(ctx).Checking(o.Remote()) modTime := o.ModTime(ctx) - accounting.Stats.DoneChecking(o.Remote()) + accounting.Stats(ctx).DoneChecking(o.Remote()) syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote()) }) } @@ -968,9 +968,9 @@ func DropboxHashSum(ctx context.Context, f fs.Fs, w io.Writer) error { // hashSum returns the human readable hash for ht passed in. This may // be UNSUPPORTED or ERROR. func hashSum(ctx context.Context, ht hash.Type, o fs.Object) string { - accounting.Stats.Checking(o.Remote()) + accounting.Stats(ctx).Checking(o.Remote()) sum, err := o.Hash(ctx, ht) - accounting.Stats.DoneChecking(o.Remote()) + accounting.Stats(ctx).DoneChecking(o.Remote()) if err == hash.ErrUnsupported { sum = "UNSUPPORTED" } else if err != nil { @@ -1167,7 +1167,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { var mu sync.Mutex return ListFn(ctx, f, func(o fs.Object) { var err error - tr := accounting.Stats.NewTransfer(o) + tr := accounting.Stats(ctx).NewTransfer(o) defer func() { tr.Done(err) }() @@ -1206,7 +1206,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { // Rcat reads data from the Reader until EOF and uploads it to a file on remote func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { - tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) defer func() { tr.Done(err) }() @@ -1527,7 +1527,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo if size >= 0 { var err error // Size known use Put - tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size) defer func() { tr.Done(err) }() @@ -1664,7 +1664,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str } return errors.Wrap(err, "error while attempting to move file to a temporary location") } - tr := accounting.Stats.NewTransfer(srcObj) + tr := accounting.Stats(ctx).NewTransfer(srcObj) defer func() { tr.Done(err) }() @@ -1711,11 +1711,11 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str _, err = Op(ctx, fdst, dstObj, dstFileName, srcObj) } else { - accounting.Stats.Checking(srcFileName) + accounting.Stats(ctx).Checking(srcFileName) if !cp { err = DeleteFile(ctx, srcObj) } - defer accounting.Stats.DoneChecking(srcFileName) + defer accounting.Stats(ctx).DoneChecking(srcFileName) } return err } diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 75f84373a..841a5a6c8 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -14,7 +14,7 @@ // fstest.CheckItems() before use. This make sure the directory // listing is now consistent and stops cascading errors. // -// Call accounting.Stats.ResetCounters() before every fs.Sync() as it +// Call accounting.GlobalStats().ResetCounters() before every fs.Sync() as it // uses the error count internally. package operations_test @@ -315,15 +315,15 @@ func testCheck(t *testing.T, checkFunction func(ctx context.Context, fdst, fsrc check := func(i int, wantErrors int64, wantChecks int64, oneway bool) { fs.Debugf(r.Fremote, "%d: Starting check test", i) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() var buf bytes.Buffer log.SetOutput(&buf) defer func() { log.SetOutput(os.Stderr) }() err := checkFunction(context.Background(), r.Fremote, r.Flocal, oneway) - gotErrors := accounting.Stats.GetErrors() - gotChecks := accounting.Stats.GetChecks() + gotErrors := accounting.GlobalStats().GetErrors() + gotChecks := accounting.GlobalStats().GetChecks() if wantErrors == 0 && err != nil { t.Errorf("%d: Got error when not expecting one: %v", i, err) } diff --git a/fs/rc/job.go b/fs/rc/jobs/job.go similarity index 65% rename from fs/rc/job.go rename to fs/rc/jobs/job.go index 98f0c7ce5..a35cc03a5 100644 --- a/fs/rc/job.go +++ b/fs/rc/jobs/job.go @@ -1,13 +1,18 @@ // Manage background jobs that the rc is running -package rc +package jobs import ( "context" + "fmt" "sync" "sync/atomic" "time" + "github.com/ncw/rclone/fs/rc" + + "github.com/ncw/rclone/fs/accounting" + "github.com/ncw/rclone/fs" "github.com/pkg/errors" ) @@ -16,13 +21,14 @@ import ( type Job struct { mu sync.Mutex ID int64 `json:"id"` + Group string `json:"group"` StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` Error string `json:"error"` Finished bool `json:"finished"` Success bool `json:"success"` Duration float64 `json:"duration"` - Output Params `json:"output"` + Output rc.Params `json:"output"` Stop func() `json:"-"` } @@ -96,11 +102,11 @@ func (jobs *Jobs) Get(ID int64) *Job { } // mark the job as finished -func (job *Job) finish(out Params, err error) { +func (job *Job) finish(out rc.Params, err error) { job.mu.Lock() job.EndTime = time.Now() if out == nil { - out = make(Params) + out = make(rc.Params) } job.Output = out job.Duration = job.EndTime.Sub(job.StartTime).Seconds() @@ -117,7 +123,7 @@ func (job *Job) finish(out Params, err error) { } // run the job until completion writing the return status -func (job *Job) run(ctx context.Context, fn Func, in Params) { +func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) { defer func() { if r := recover(); r != nil { job.finish(nil, errors.Errorf("panic received: %v", r)) @@ -126,37 +132,93 @@ func (job *Job) run(ctx context.Context, fn Func, in Params) { job.finish(fn(ctx, in)) } -// NewJob start a new Job off -func (jobs *Jobs) NewJob(fn Func, in Params) *Job { - ctx, cancel := context.WithCancel(context.Background()) +func getGroup(in rc.Params) string { + // Check to see if the group is set + group, err := in.GetString("_group") + if rc.NotErrParamNotFound(err) { + fs.Errorf(nil, "Can't get _group param %+v", err) + } + delete(in, "_group") + return group +} + +// NewAsyncJob start a new asynchronous Job off +func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { + id := atomic.AddInt64(&jobID, 1) + + group := getGroup(in) + if group == "" { + group = fmt.Sprintf("job/%d", id) + } + ctx := accounting.WithStatsGroup(context.Background(), group) + ctx, cancel := context.WithCancel(ctx) stop := func() { cancel() // Wait for cancel to propagate before returning. <-ctx.Done() } job := &Job{ - ID: atomic.AddInt64(&jobID, 1), + ID: id, + Group: group, StartTime: time.Now(), Stop: stop, } - go job.run(ctx, fn, in) jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() + go job.run(ctx, fn, in) return job - } -// StartJob starts a new job and returns a Param suitable for output -func StartJob(fn Func, in Params) (Params, error) { - job := running.NewJob(fn, in) - out := make(Params) +// NewSyncJob start a new synchronous Job off +func (jobs *Jobs) NewSyncJob(ctx context.Context, in rc.Params) (*Job, context.Context) { + id := atomic.AddInt64(&jobID, 1) + group := getGroup(in) + if group == "" { + group = fmt.Sprintf("job/%d", id) + } + ctxG := accounting.WithStatsGroup(ctx, fmt.Sprintf("job/%d", id)) + ctx, cancel := context.WithCancel(ctxG) + stop := func() { + cancel() + // Wait for cancel to propagate before returning. + <-ctx.Done() + } + job := &Job{ + ID: id, + Group: group, + StartTime: time.Now(), + Stop: stop, + } + jobs.mu.Lock() + jobs.jobs[job.ID] = job + jobs.mu.Unlock() + return job, ctx +} + +// StartAsyncJob starts a new job asynchronously and returns a Param suitable +// for output. +func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) { + job := running.NewAsyncJob(fn, in) + out := make(rc.Params) out["jobid"] = job.ID return out, nil } +// ExecuteJob executes new job synchronously and returns a Param suitable for +// output. +func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) { + job, ctx := running.NewSyncJob(ctx, in) + job.run(ctx, fn, in) + var err error + if !job.Success { + err = errors.New(job.Error) + } + return job.Output, job.ID, err +} + func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/status", Fn: rcJobStatus, Title: "Reads the status of the job ID", @@ -179,7 +241,7 @@ Results } // Returns the status of a job -func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { +func rcJobStatus(ctx context.Context, in rc.Params) (out rc.Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err @@ -190,8 +252,8 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { } job.mu.Lock() defer job.mu.Unlock() - out = make(Params) - err = Reshape(&out, job) + out = make(rc.Params) + err = rc.Reshape(&out, job) if err != nil { return nil, errors.Wrap(err, "reshape failed in job status") } @@ -199,7 +261,7 @@ func rcJobStatus(ctx context.Context, in Params) (out Params, err error) { } func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/list", Fn: rcJobList, Title: "Lists the IDs of the running jobs", @@ -212,14 +274,14 @@ Results } // Returns list of job ids. -func rcJobList(ctx context.Context, in Params) (out Params, err error) { - out = make(Params) +func rcJobList(ctx context.Context, in rc.Params) (out rc.Params, err error) { + out = make(rc.Params) out["jobids"] = running.IDs() return out, nil } func init() { - Add(Call{ + rc.Add(rc.Call{ Path: "job/stop", Fn: rcJobStop, Title: "Stop the running job", @@ -230,7 +292,7 @@ func init() { } // Stops the running job. -func rcJobStop(ctx context.Context, in Params) (out Params, err error) { +func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) { jobID, err := in.GetInt64("jobid") if err != nil { return nil, err @@ -241,7 +303,7 @@ func rcJobStop(ctx context.Context, in Params) (out Params, err error) { } job.mu.Lock() defer job.mu.Unlock() - out = make(Params) + out = make(rc.Params) job.Stop() return out, nil } diff --git a/fs/rc/job_test.go b/fs/rc/jobs/job_test.go similarity index 63% rename from fs/rc/job_test.go rename to fs/rc/jobs/job_test.go index 8c4c77838..a9372f13d 100644 --- a/fs/rc/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -1,4 +1,4 @@ -package rc +package jobs import ( "context" @@ -7,6 +7,7 @@ import ( "time" "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/rc" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,10 +37,10 @@ func TestJobsExpire(t *testing.T) { jobs := newJobs() jobs.expireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) - job := jobs.NewJob(func(ctx context.Context, in Params) (Params, error) { + job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { defer close(wait) return in, nil - }, Params{}) + }, rc.Params{}) <-wait assert.Equal(t, 1, len(jobs.jobs)) jobs.Expire() @@ -57,14 +58,14 @@ func TestJobsExpire(t *testing.T) { jobs.mu.Unlock() } -var noopFn = func(ctx context.Context, in Params) (Params, error) { +var noopFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { return nil, nil } func TestJobsIDs(t *testing.T) { jobs := newJobs() - job1 := jobs.NewJob(noopFn, Params{}) - job2 := jobs.NewJob(noopFn, Params{}) + job1 := jobs.NewAsyncJob(noopFn, rc.Params{}) + job2 := jobs.NewAsyncJob(noopFn, rc.Params{}) wantIDs := []int64{job1.ID, job2.ID} gotIDs := jobs.IDs() require.Equal(t, 2, len(gotIDs)) @@ -76,17 +77,22 @@ func TestJobsIDs(t *testing.T) { func TestJobsGet(t *testing.T) { jobs := newJobs() - job := jobs.NewJob(noopFn, Params{}) + job := jobs.NewAsyncJob(noopFn, rc.Params{}) assert.Equal(t, job, jobs.Get(job.ID)) assert.Nil(t, jobs.Get(123123123123)) } -var longFn = func(ctx context.Context, in Params) (Params, error) { +var longFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { time.Sleep(1 * time.Hour) return nil, nil } -var ctxFn = func(ctx context.Context, in Params) (Params, error) { +var shortFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { + time.Sleep(time.Millisecond) + return nil, nil +} + +var ctxFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -105,17 +111,17 @@ func sleepJob() { func TestJobFinish(t *testing.T) { jobs := newJobs() - job := jobs.NewJob(longFn, Params{}) + job := jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() assert.Equal(t, true, job.EndTime.IsZero()) - assert.Equal(t, Params(nil), job.Output) + assert.Equal(t, rc.Params(nil), job.Output) assert.Equal(t, 0.0, job.Duration) assert.Equal(t, "", job.Error) assert.Equal(t, false, job.Success) assert.Equal(t, false, job.Finished) - wantOut := Params{"a": 1} + wantOut := rc.Params{"a": 1} job.finish(wantOut, nil) assert.Equal(t, false, job.EndTime.IsZero()) @@ -125,18 +131,18 @@ func TestJobFinish(t *testing.T) { assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewJob(longFn, Params{}) + job = jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() job.finish(nil, nil) assert.Equal(t, false, job.EndTime.IsZero()) - assert.Equal(t, Params{}, job.Output) + assert.Equal(t, rc.Params{}, job.Output) assert.True(t, job.Duration >= floatSleepTime) assert.Equal(t, "", job.Error) assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewJob(longFn, Params{}) + job = jobs.NewAsyncJob(longFn, rc.Params{}) sleepJob() job.finish(wantOut, errors.New("potato")) @@ -152,14 +158,14 @@ func TestJobFinish(t *testing.T) { // part of NewJob, now just test the panic catching func TestJobRunPanic(t *testing.T) { wait := make(chan struct{}) - boom := func(ctx context.Context, in Params) (Params, error) { + boom := func(ctx context.Context, in rc.Params) (rc.Params, error) { sleepJob() defer close(wait) panic("boom") } jobs := newJobs() - job := jobs.NewJob(boom, Params{}) + job := jobs.NewAsyncJob(boom, rc.Params{}) <-wait runtime.Gosched() // yield to make sure job is updated @@ -176,7 +182,7 @@ func TestJobRunPanic(t *testing.T) { job.mu.Lock() assert.Equal(t, false, job.EndTime.IsZero()) - assert.Equal(t, Params{}, job.Output) + assert.Equal(t, rc.Params{}, job.Output) assert.True(t, job.Duration >= floatSleepTime) assert.Equal(t, "panic received: boom", job.Error) assert.Equal(t, false, job.Success) @@ -187,7 +193,7 @@ func TestJobRunPanic(t *testing.T) { func TestJobsNewJob(t *testing.T) { jobID = 0 jobs := newJobs() - job := jobs.NewJob(noopFn, Params{}) + job := jobs.NewAsyncJob(noopFn, rc.Params{}) assert.Equal(t, int64(1), job.ID) assert.Equal(t, job, jobs.Get(1)) assert.NotEmpty(t, job.Stop) @@ -195,19 +201,26 @@ func TestJobsNewJob(t *testing.T) { func TestStartJob(t *testing.T) { jobID = 0 - out, err := StartJob(longFn, Params{}) + out, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - assert.Equal(t, Params{"jobid": int64(1)}, out) + assert.Equal(t, rc.Params{"jobid": int64(1)}, out) +} + +func TestExecuteJob(t *testing.T) { + jobID = 0 + _, id, err := ExecuteJob(context.Background(), shortFn, rc.Params{}) + assert.NoError(t, err) + assert.Equal(t, int64(1), id) } func TestRcJobStatus(t *testing.T) { jobID = 0 - _, err := StartJob(longFn, Params{}) + _, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/status") + call := rc.Calls.Get("job/status") assert.NotNil(t, call) - in := Params{"jobid": 1} + in := rc.Params{"jobid": 1} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) @@ -216,12 +229,12 @@ func TestRcJobStatus(t *testing.T) { assert.Equal(t, false, out["finished"]) assert.Equal(t, false, out["success"]) - in = Params{"jobid": 123123123} + in = rc.Params{"jobid": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "job not found") - in = Params{"jobidx": 123123123} + in = rc.Params{"jobidx": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "Didn't find key") @@ -229,45 +242,88 @@ func TestRcJobStatus(t *testing.T) { func TestRcJobList(t *testing.T) { jobID = 0 - _, err := StartJob(longFn, Params{}) + _, err := StartAsyncJob(longFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/list") + call := rc.Calls.Get("job/list") assert.NotNil(t, call) - in := Params{} + in := rc.Params{} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) - assert.Equal(t, Params{"jobids": []int64{1}}, out) + assert.Equal(t, rc.Params{"jobids": []int64{1}}, out) } -func TestRcJobStop(t *testing.T) { +func TestRcAsyncJobStop(t *testing.T) { jobID = 0 - _, err := StartJob(ctxFn, Params{}) + _, err := StartAsyncJob(ctxFn, rc.Params{}) assert.NoError(t, err) - call := Calls.Get("job/stop") + call := rc.Calls.Get("job/stop") assert.NotNil(t, call) - in := Params{"jobid": 1} + in := rc.Params{"jobid": 1} out, err := call.Fn(context.Background(), in) require.NoError(t, err) require.Empty(t, out) - in = Params{"jobid": 123123123} + in = rc.Params{"jobid": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "job not found") - in = Params{"jobidx": 123123123} + in = rc.Params{"jobidx": 123123123} _, err = call.Fn(context.Background(), in) require.Error(t, err) assert.Contains(t, err.Error(), "Didn't find key") time.Sleep(10 * time.Millisecond) - call = Calls.Get("job/status") + call = rc.Calls.Get("job/status") assert.NotNil(t, call) - in = Params{"jobid": 1} + in = rc.Params{"jobid": 1} + out, err = call.Fn(context.Background(), in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, float64(1), out["id"]) + assert.Equal(t, "context canceled", out["error"]) + assert.Equal(t, true, out["finished"]) + assert.Equal(t, false, out["success"]) +} + +func TestRcSyncJobStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + jobID = 0 + _, id, err := ExecuteJob(ctx, ctxFn, rc.Params{}) + assert.Error(t, err) + assert.Equal(t, int64(1), id) + }() + + time.Sleep(10 * time.Millisecond) + + call := rc.Calls.Get("job/stop") + assert.NotNil(t, call) + in := rc.Params{"jobid": 1} + out, err := call.Fn(context.Background(), in) + require.NoError(t, err) + require.Empty(t, out) + + in = rc.Params{"jobid": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "job not found") + + in = rc.Params{"jobidx": 123123123} + _, err = call.Fn(context.Background(), in) + require.Error(t, err) + assert.Contains(t, err.Error(), "Didn't find key") + + cancel() + time.Sleep(10 * time.Millisecond) + + call = rc.Calls.Get("job/status") + assert.NotNil(t, call) + in = rc.Params{"jobid": 1} out, err = call.Fn(context.Background(), in) require.NoError(t, err) require.NotNil(t, out) diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 1a33be81a..ae803ed71 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -3,6 +3,8 @@ package rcserver import ( "encoding/json" + "flag" + "fmt" "mime" "net/http" "net/url" @@ -10,6 +12,10 @@ import ( "sort" "strings" + "github.com/skratchdot/open-golang/open" + + "github.com/ncw/rclone/fs/rc/jobs" + "github.com/ncw/rclone/cmd/serve/httplib" "github.com/ncw/rclone/cmd/serve/httplib/serve" "github.com/ncw/rclone/fs" @@ -18,7 +24,6 @@ import ( "github.com/ncw/rclone/fs/list" "github.com/ncw/rclone/fs/rc" "github.com/pkg/errors" - "github.com/skratchdot/open-golang/open" ) // Start the remote control server if configured @@ -79,7 +84,10 @@ func (s *Server) Serve() error { if user != "" || pass != "" { openURL.User = url.UserPassword(user, pass) } - _ = open.Start(openURL.String()) + // Don't open browser if serving in testing environment. + if flag.Lookup("test.v") == nil { + _ = open.Start(openURL.String()) + } } return nil } @@ -181,15 +189,16 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) writeError(path, in, w, err, http.StatusBadRequest) return } - delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) var out rc.Params if isAsync { - out, err = rc.StartJob(call.Fn, in) + out, err = jobs.StartAsyncJob(call.Fn, in) } else { - out, err = call.Fn(r.Context(), in) + var jobID int64 + out, jobID, err = jobs.ExecuteJob(r.Context(), call.Fn, in) + w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID)) } if err != nil { writeError(path, in, w, err, http.StatusInternalServerError) diff --git a/fs/rc/rcserver/rcserver_test.go b/fs/rc/rcserver/rcserver_test.go index 647b3620a..cdb7850aa 100644 --- a/fs/rc/rcserver/rcserver_test.go +++ b/fs/rc/rcserver/rcserver_test.go @@ -611,10 +611,7 @@ func TestRCAsync(t *testing.T) { ContentType: "application/json", Body: `{ "_async":true }`, Status: http.StatusOK, - Expected: `{ - "jobid": 1 -} -`, + Contains: regexp.MustCompile(`(?s)\{.*\"jobid\":.*\}`), }, { Name: "bad", URL: "rc/noop", diff --git a/fs/sync/sync.go b/fs/sync/sync.go index f96bdcd5b..98bd863a6 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -82,12 +82,12 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete dstEmptyDirs: make(map[string]fs.DirEntry), srcEmptyDirs: make(map[string]fs.DirEntry), noTraverse: fs.Config.NoTraverse, - toBeChecked: newPipe(accounting.Stats.SetCheckQueue, fs.Config.MaxBacklog), - toBeUploaded: newPipe(accounting.Stats.SetTransferQueue, fs.Config.MaxBacklog), + toBeChecked: newPipe(accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog), + toBeUploaded: newPipe(accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog), deleteFilesCh: make(chan fs.Object, fs.Config.Checkers), trackRenames: fs.Config.TrackRenames, commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), - toBeRenamed: newPipe(accounting.Stats.SetRenameQueue, fs.Config.MaxBacklog), + toBeRenamed: newPipe(accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog), trackRenamesCh: make(chan fs.Object, fs.Config.Checkers), } s.ctx, s.cancel = context.WithCancel(ctx) @@ -215,7 +215,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { return } src := pair.Src - accounting.Stats.Checking(src.Remote()) + accounting.Stats(s.ctx).Checking(src.Remote()) // Check to see if can store this if src.Storable() { NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir) @@ -256,7 +256,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) { } } } - accounting.Stats.DoneChecking(src.Remote()) + accounting.Stats(s.ctx).DoneChecking(src.Remote()) } } @@ -401,7 +401,7 @@ func (s *syncCopyMove) stopDeleters() { // checkSrcMap is clear then it assumes that the any source files that // have been found have been removed from dstFiles already. func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error { - if accounting.Stats.Errored() && !fs.Config.IgnoreErrors { + if accounting.Stats(s.ctx).Errored() && !fs.Config.IgnoreErrors { fs.Errorf(s.fdst, "%v", fs.ErrorNotDeleting) return fs.ErrorNotDeleting } @@ -437,7 +437,7 @@ func deleteEmptyDirectories(ctx context.Context, f fs.Fs, entriesMap map[string] if len(entriesMap) == 0 { return nil } - if accounting.Stats.Errored() && !fs.Config.IgnoreErrors { + if accounting.Stats(ctx).Errored() && !fs.Config.IgnoreErrors { fs.Errorf(f, "%v", fs.ErrorNotDeletingDirs) return fs.ErrorNotDeletingDirs } @@ -497,8 +497,8 @@ func copyEmptyDirectories(ctx context.Context, f fs.Fs, entries map[string]fs.Di } } - if accounting.Stats.Errored() { - fs.Debugf(f, "failed to copy %d directories", accounting.Stats.GetErrors()) + if accounting.Stats(ctx).Errored() { + fs.Debugf(f, "failed to copy %d directories", accounting.Stats(ctx).GetErrors()) } if okCount > 0 { @@ -587,12 +587,12 @@ func (s *syncCopyMove) makeRenameMap() { for obj := range in { // only create hash for dst fs.Object if its size could match if _, found := possibleSizes[obj.Size()]; found { - accounting.Stats.Checking(obj.Remote()) + accounting.Stats(s.ctx).Checking(obj.Remote()) hash := s.renameHash(obj) if hash != "" { s.pushRenameMap(hash, obj) } - accounting.Stats.DoneChecking(obj.Remote()) + accounting.Stats(s.ctx).DoneChecking(obj.Remote()) } } }() diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index d721601ed..7c91c677d 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -105,7 +105,7 @@ func TestSyncNoTraverse(t *testing.T) { file1 := r.WriteFile("sub dir/hello world", "hello world", t1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -306,24 +306,24 @@ func TestSyncBasedOnCheckSum(t *testing.T) { file1 := r.WriteFile("check sum", "-", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Change last modified date only file2 := r.WriteFile("check sum", "-", t2) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -340,24 +340,24 @@ func TestSyncSizeOnly(t *testing.T) { file1 := r.WriteFile("sizeonly", "potato", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Update mtime, md5sum but not length of file file2 := r.WriteFile("sizeonly", "POTATO", t2) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -374,24 +374,24 @@ func TestSyncIgnoreSize(t *testing.T) { file1 := r.WriteFile("ignore-size", "contents", t1) fstest.CheckItems(t, r.Flocal, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Fremote, file1) // Update size but not date of file file2 := r.WriteFile("ignore-size", "longer contents but same date", t1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred no files - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file2) fstest.CheckItems(t, r.Fremote, file1) } @@ -402,24 +402,24 @@ func TestSyncIgnoreTimes(t *testing.T) { file1 := r.WriteBoth(context.Background(), "existing", "potato", t1) fstest.CheckItems(t, r.Fremote, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly 0 files because the // files were identical. - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) fs.Config.IgnoreTimes = true defer func() { fs.Config.IgnoreTimes = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file even though the // files were identical. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file1) @@ -433,7 +433,7 @@ func TestSyncIgnoreExisting(t *testing.T) { fs.Config.IgnoreExisting = true defer func() { fs.Config.IgnoreExisting = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -441,7 +441,7 @@ func TestSyncIgnoreExisting(t *testing.T) { // Change everything r.WriteFile("existing", "newpotatoes", t2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // Items should not change @@ -488,7 +488,7 @@ func TestSyncIgnoreErrors(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() fs.CountError(errors.New("boom")) assert.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) @@ -532,7 +532,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) { fs.Config.DryRun = true defer func() { fs.Config.DryRun = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -541,7 +541,7 @@ func TestSyncAfterChangingModtimeOnly(t *testing.T) { fs.Config.DryRun = false - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -569,7 +569,7 @@ func TestSyncAfterChangingModtimeOnlyWithNoUpdateModTime(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -590,7 +590,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -598,7 +598,7 @@ func TestSyncDoesntUpdateModtime(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) // We should have transferred exactly one file, not set the mod time - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) } func TestSyncAfterAddingAFile(t *testing.T) { @@ -610,7 +610,7 @@ func TestSyncAfterAddingAFile(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1, file2) fstest.CheckItems(t, r.Fremote, file1) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1, file2) @@ -625,7 +625,7 @@ func TestSyncAfterChangingFilesSizeOnly(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -648,7 +648,7 @@ func TestSyncAfterChangingContentsOnly(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -664,7 +664,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileDryRun(t *testing.T) { file3 := r.WriteBoth(context.Background(), "empty space", "-", t2) fs.Config.DryRun = true - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) fs.Config.DryRun = false require.NoError(t, err) @@ -683,7 +683,7 @@ func TestSyncAfterRemovingAFileAndAddingAFile(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2, file3) fstest.CheckItems(t, r.Flocal, file1, file3) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1, file3) @@ -729,7 +729,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDir(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -798,7 +798,7 @@ func TestSyncAfterRemovingAFileAndAddingAFileSubDirWithErrors(t *testing.T) { fs.GetModifyWindow(r.Fremote), ) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() fs.CountError(errors.New("boom")) err := Sync(context.Background(), r.Fremote, r.Flocal, false) assert.Equal(t, fs.ErrorNotDeleting, err) @@ -876,7 +876,7 @@ func TestCopyDeleteBefore(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Flocal, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := CopyDir(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) @@ -899,14 +899,14 @@ func TestSyncWithExclude(t *testing.T) { filter.Active.Opt.MaxSize = -1 }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, file2, file1) // Now sync the other way round and check enormous doesn't get // deleted as it is excluded from the sync - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Flocal, r.Fremote, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2, file1, file3) @@ -929,14 +929,14 @@ func TestSyncWithExcludeAndDeleteExcluded(t *testing.T) { filter.Active.Opt.DeleteExcluded = false }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, file2) // Check sync the other way round to make sure enormous gets // deleted even though it is excluded - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Flocal, r.Fremote, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file2) @@ -971,7 +971,7 @@ func TestSyncWithUpdateOlder(t *testing.T) { fs.Config.ModifyWindow = oldModifyWindow }() - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Fremote, oneO, twoF, threeO, fourF, fiveF) @@ -995,7 +995,7 @@ func TestSyncWithTrackRenames(t *testing.T) { f1 := r.WriteFile("potato", "Potato Content", t1) f2 := r.WriteFile("yam", "Yam Content", t2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) fstest.CheckItems(t, r.Fremote, f1, f2) @@ -1004,7 +1004,7 @@ func TestSyncWithTrackRenames(t *testing.T) { // Now rename locally. f2 = r.RenameFile(f2, "yaml") - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false)) fstest.CheckItems(t, r.Fremote, f1, f2) @@ -1012,15 +1012,15 @@ func TestSyncWithTrackRenames(t *testing.T) { if canTrackRenames { if r.Fremote.Features().Move == nil || r.Fremote.Name() == "TestUnion" { // union remote can Move but returns CantMove error // If no server side Move, we are falling back to Copy + Delete - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 1 copy - assert.Equal(t, int64(4), accounting.Stats.GetChecks()) // 2 file checks + 1 move + 1 delete + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 1 copy + assert.Equal(t, int64(4), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move + 1 delete } else { - assert.Equal(t, int64(0), accounting.Stats.GetTransfers()) // 0 copy - assert.Equal(t, int64(3), accounting.Stats.GetChecks()) // 2 file checks + 1 move + assert.Equal(t, int64(0), accounting.GlobalStats().GetTransfers()) // 0 copy + assert.Equal(t, int64(3), accounting.GlobalStats().GetChecks()) // 2 file checks + 1 move } } else { - assert.Equal(t, int64(2), accounting.Stats.GetChecks()) // 2 file checks - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) // 0 copy + assert.Equal(t, int64(2), accounting.GlobalStats().GetChecks()) // 2 file checks + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) // 0 copy } } @@ -1049,7 +1049,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty fstest.CheckItems(t, FremoteMove, file2, file3) // Do server side move - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = MoveDir(context.Background(), FremoteMove, r.Fremote, testDeleteEmptyDirs, false) require.NoError(t, err) @@ -1076,7 +1076,7 @@ func testServerSideMove(t *testing.T, r *fstest.Run, withFilter, testDeleteEmpty } // Move it back to a new empty remote, dst does not exist this time - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = MoveDir(context.Background(), FremoteMove2, FremoteMove, testDeleteEmptyDirs, false) require.NoError(t, err) @@ -1439,7 +1439,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) { fdst, err := fs.NewFs(r.FremoteName + "/dst") require.NoError(t, err) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1464,7 +1464,7 @@ func testSyncBackupDir(t *testing.T, suffix string, suffixKeepExtension bool) { // This should delete three and overwrite one again, checking // the files got overwritten correctly in backup-dir - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), fdst, r.Flocal, false) require.NoError(t, err) @@ -1518,7 +1518,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) { fdst, err := fs.NewFs(r.FremoteName + "/dst") require.NoError(t, err) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one") require.NoError(t, err) err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two") @@ -1548,7 +1548,7 @@ func testSyncSuffix(t *testing.T, suffix string, suffixKeepExtension bool) { // This should delete three and overwrite one again, checking // the files got overwritten correctly in backup-dir - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = operations.CopyFile(context.Background(), fdst, r.Flocal, "one", "one") require.NoError(t, err) err = operations.CopyFile(context.Background(), fdst, r.Flocal, "two", "two") @@ -1594,13 +1594,13 @@ func TestSyncUTFNorm(t *testing.T) { file2 := r.WriteObject(context.Background(), Encoding2, "This is a old test", t2) fstest.CheckItems(t, r.Fremote, file2) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) // We should have transferred exactly one file, but kept the // normalized state of the file. - assert.Equal(t, int64(1), accounting.Stats.GetTransfers()) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) fstest.CheckItems(t, r.Flocal, file1) file1.Path = file2.Path fstest.CheckItems(t, r.Fremote, file1) @@ -1620,7 +1620,7 @@ func TestSyncImmutable(t *testing.T) { fstest.CheckItems(t, r.Fremote) // Should succeed - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -1632,7 +1632,7 @@ func TestSyncImmutable(t *testing.T) { fstest.CheckItems(t, r.Fremote, file1) // Should fail with ErrorImmutableModified and not modify local or remote files - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err = Sync(context.Background(), r.Fremote, r.Flocal, false) assert.EqualError(t, err, fs.ErrorImmutableModified.Error()) fstest.CheckItems(t, r.Flocal, file2) @@ -1659,7 +1659,7 @@ func TestSyncIgnoreCase(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2) // Should not copy files that are differently-cased but otherwise identical - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) require.NoError(t, err) fstest.CheckItems(t, r.Flocal, file1) @@ -1694,7 +1694,7 @@ func TestAbort(t *testing.T) { fstest.CheckItems(t, r.Flocal, file1, file2, file3) fstest.CheckItems(t, r.Fremote) - accounting.Stats.ResetCounters() + accounting.GlobalStats().ResetCounters() err := Sync(context.Background(), r.Fremote, r.Flocal, false) assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err) diff --git a/fstest/fstest.go b/fstest/fstest.go index 7464a4642..e7f67e3dd 100644 --- a/fstest/fstest.go +++ b/fstest/fstest.go @@ -274,7 +274,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs expectedDirs = filterEmptyDirs(t, items, expectedDirs) } is := NewItems(items) - oldErrors := accounting.Stats.GetErrors() + ctx := context.Background() + oldErrors := accounting.Stats(ctx).GetErrors() var objs []fs.Object var dirs []fs.Directory var err error @@ -283,7 +284,6 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs wantListing1, wantListing2 := makeListingFromItems(items) gotListing := "" listingOK := false - ctx := context.Background() for i := 1; i <= retries; i++ { objs, dirs, err = walk.GetAll(ctx, f, "", true, -1) if err != nil && err != fs.ErrorDirNotFound { @@ -317,8 +317,8 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs } is.Done(t) // Don't notice an error when listing an empty directory - if len(items) == 0 && oldErrors == 0 && accounting.Stats.GetErrors() == 1 { - accounting.Stats.ResetErrors() + if len(items) == 0 && oldErrors == 0 && accounting.Stats(ctx).GetErrors() == 1 { + accounting.Stats(ctx).ResetErrors() } // Check the directories if expectedDirs != nil { diff --git a/vfs/read.go b/vfs/read.go index fc3107cf8..7dd5b0775 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -71,7 +71,7 @@ func (fh *ReadFileHandle) openPending() (err error) { if err != nil { return err } - tr := accounting.Stats.NewTransfer(o) + tr := accounting.GlobalStats().NewTransfer(o) fh.done = tr.Done fh.r = tr.Account(r).WithBuffer() // account the transfer fh.opened = true diff --git a/vfs/read_write.go b/vfs/read_write.go index 84a57a74e..d5e43fbb1 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/ncw/rclone/fs" - "github.com/ncw/rclone/fs/accounting" "github.com/ncw/rclone/fs/log" "github.com/ncw/rclone/fs/operations" "github.com/ncw/rclone/lib/file" @@ -87,10 +86,6 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl // copy an object to or from the remote while accounting for it func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { if operations.NeedTransfer(context.TODO(), dst, src) { - tr := accounting.Stats.NewTransfer(src) - defer func() { - tr.Done(err) - }() newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) } else { newDst = dst