From 91b54aafccc3aed01a5389864e8ef3902286b406 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 18 Jan 2024 16:44:13 +0000 Subject: [PATCH] rc: add srcFs and dstFs to core/stats and core/transferred stats Before this change it wasn't possible to see where transfers were going from and to in core/stats and core/transferred. When use in rclone mount in particular this made interpreting the stats very hard. --- cmd/serve/ftp/ftp.go | 4 +- cmd/serve/http/http.go | 2 +- docs/content/rc.md | 12 +++-- fs/accounting/accounting.go | 7 +-- fs/accounting/stats.go | 16 ++++-- fs/accounting/transfer.go | 32 +++++++++--- fs/accounting/transfer_test.go | 66 +++++++++++++++++++++++++ fs/accounting/transfermap.go | 6 +-- fs/operations/check.go | 6 +-- fs/operations/copy.go | 2 +- fs/operations/multithread_test.go | 4 +- fs/operations/operations.go | 12 ++--- lib/http/serve/dir.go | 2 +- lib/http/serve/serve.go | 2 +- vfs/read.go | 2 +- vfs/vfscache/downloaders/downloaders.go | 2 +- 16 files changed, 137 insertions(+), 40 deletions(-) create mode 100644 fs/accounting/transfer_test.go diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index 3c24c3583..2750e6458 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -339,7 +339,7 @@ func (d *driver) ListDir(sctx *ftp.Context, path string, callback func(iofs.File } // Account the transfer - tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size(), d.f, nil) defer func() { tr.Done(d.ctx, err) }() @@ -448,7 +448,7 @@ func (d *driver) GetFile(sctx *ftp.Context, path string, offset int64) (size int } // Account the transfer - tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) + tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size(), d.f, nil) defer tr.Done(d.ctx, nil) return node.Size(), handle, nil diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index 6af35ccd9..e3e373acc 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -297,7 +297,7 @@ func (s *HTTP) serveFile(w http.ResponseWriter, r *http.Request, remote string) }() // Account the transfer - tr := accounting.Stats(r.Context()).NewTransfer(obj) + tr := accounting.Stats(r.Context()).NewTransfer(obj, nil) defer tr.Done(r.Context(), nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer diff --git a/docs/content/rc.md b/docs/content/rc.md index 7e4ed11d0..9fb42917a 100644 --- a/docs/content/rc.md +++ b/docs/content/rc.md @@ -847,12 +847,15 @@ Returns the following values: [ { "bytes": total transferred bytes for this file, - "eta": estimated time in seconds until file transfer completion + "eta": estimated time in seconds until file transfer completion (may be nil) "name": name of the file, "percentage": progress of the file transfer in percent, "speed": average speed over the whole transfer in bytes per second, "speedAvg": current speed in bytes per second as an exponentially weighted moving average, "size": size of the file in bytes + "group": stats group this transfer is part of + "srcFs": name of the source remote (not present if not known) + "dstFs": name of the destination remote (not present if not known) } ], "checking": an array of names of currently active file checks @@ -904,9 +907,12 @@ Returns the following values: "size": size of the file in bytes, "bytes": total transferred bytes for this file, "checked": if the transfer is only checked (skipped, deleted), - "timestamp": integer representing millisecond unix epoch, + "started_at": time the transfer was started at (RFC3339 format, eg `"2000-01-01T01:00:00.085742121Z"`), + "completed_at": time the transfer was completed at (RFC3339 format, only present if transfer is completed), "error": string description of the error (empty if successful), - "jobid": id of the job that this transfer belongs to + "group": string representing which stats group this is part of, + "srcFs": name of the source remote (not present if not known), + "dstFs": name of the destination remote (not present if not known), } ] } diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 41eb71225..2aa8c3219 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -539,9 +539,8 @@ func (acc *Account) String() string { ) } -// rcStats produces remote control stats for this file -func (acc *Account) rcStats() (out rc.Params) { - out = make(rc.Params) +// rcStats adds remote control stats for this file +func (acc *Account) rcStats(out rc.Params) { a, b := acc.progress() out["bytes"] = a out["size"] = b @@ -563,8 +562,6 @@ func (acc *Account) rcStats() (out rc.Params) { } out["percentage"] = percentageDone out["group"] = acc.stats.group - - return out } // OldStream returns the top io.Reader diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 3d89defd4..393b6711a 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -775,16 +775,24 @@ func (s *StatsInfo) GetTransfers() int64 { } // NewTransfer adds a transfer to the stats from the object. -func (s *StatsInfo) NewTransfer(obj fs.DirEntry) *Transfer { - tr := newTransfer(s, obj) +// +// The obj is uses as the srcFs, the dstFs must be supplied +func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer { + var srcFs fs.Fs + if oi, ok := obj.(fs.ObjectInfo); ok { + if f, ok := oi.Fs().(fs.Fs); ok { + srcFs = f + } + } + tr := newTransfer(s, obj, srcFs, dstFs) s.transferring.add(tr) s.startAverageLoop() return tr } // NewTransferRemoteSize adds a transfer to the stats based on remote and size. -func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer { - tr := newTransferRemoteSize(s, remote, size, false, "") +func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer { + tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs) s.transferring.add(tr) s.startAverageLoop() return tr diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index 5860eefb2..f2bc67761 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -21,6 +21,8 @@ type TransferSnapshot struct { CompletedAt time.Time `json:"completed_at,omitempty"` Error error `json:"-"` Group string `json:"group"` + SrcFs string `json:"srcFs,omitempty"` + DstFs string `json:"dstFs,omitempty"` } // MarshalJSON implements json.Marshaler interface. @@ -51,6 +53,8 @@ type Transfer struct { startedAt time.Time checking bool what string // what kind of transfer this is + srcFs fs.Fs // source Fs - may be nil + dstFs fs.Fs // destination Fs - may be nil // Protects all below // @@ -65,15 +69,15 @@ type Transfer struct { // newCheckingTransfer instantiates new checking of the object. func newCheckingTransfer(stats *StatsInfo, obj fs.DirEntry, what string) *Transfer { - return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true, what) + return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true, what, nil, nil) } // newTransfer instantiates new transfer. -func newTransfer(stats *StatsInfo, obj fs.DirEntry) *Transfer { - return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false, "") +func newTransfer(stats *StatsInfo, obj fs.DirEntry, srcFs, dstFs fs.Fs) *Transfer { + return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false, "", srcFs, dstFs) } -func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool, what string) *Transfer { +func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool, what string, srcFs, dstFs fs.Fs) *Transfer { tr := &Transfer{ stats: stats, remote: remote, @@ -81,6 +85,8 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking startedAt: time.Now(), checking: checking, what: what, + srcFs: srcFs, + dstFs: dstFs, } stats.AddTransfer(tr) return tr @@ -178,7 +184,7 @@ func (tr *Transfer) Snapshot() TransferSnapshot { if tr.acc != nil { b, s = tr.acc.progress() } - return TransferSnapshot{ + snapshot := TransferSnapshot{ Name: tr.remote, Checked: tr.checking, Size: s, @@ -188,12 +194,26 @@ func (tr *Transfer) Snapshot() TransferSnapshot { Error: tr.err, Group: tr.stats.group, } + if tr.srcFs != nil { + snapshot.SrcFs = fs.ConfigString(tr.srcFs) + } + if tr.dstFs != nil { + snapshot.DstFs = fs.ConfigString(tr.dstFs) + } + return snapshot } // rcStats returns stats for the transfer suitable for the rc func (tr *Transfer) rcStats() rc.Params { - return rc.Params{ + out := rc.Params{ "name": tr.remote, // no locking needed to access this "size": tr.size, } + if tr.srcFs != nil { + out["srcFs"] = fs.ConfigString(tr.srcFs) + } + if tr.dstFs != nil { + out["dstFs"] = fs.ConfigString(tr.dstFs) + } + return out } diff --git a/fs/accounting/transfer_test.go b/fs/accounting/transfer_test.go new file mode 100644 index 000000000..ec5c11e78 --- /dev/null +++ b/fs/accounting/transfer_test.go @@ -0,0 +1,66 @@ +package accounting + +import ( + "context" + "errors" + "io" + "testing" + + "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/fstest/mockfs" + "github.com/rclone/rclone/fstest/mockobject" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTransfer(t *testing.T) { + ctx := context.Background() + s := NewStats(ctx) + + o := mockobject.Object("obj") + srcFs, err := mockfs.NewFs(ctx, "srcFs", "srcFs", nil) + require.NoError(t, err) + dstFs, err := mockfs.NewFs(ctx, "dstFs", "dstFs", nil) + require.NoError(t, err) + + tr := newTransfer(s, o, srcFs, dstFs) + + t.Run("Snapshot", func(t *testing.T) { + snap := tr.Snapshot() + assert.Equal(t, "obj", snap.Name) + assert.Equal(t, int64(0), snap.Size) + assert.Equal(t, int64(0), snap.Bytes) + assert.Equal(t, false, snap.Checked) + assert.Equal(t, false, snap.StartedAt.IsZero()) + assert.Equal(t, true, snap.CompletedAt.IsZero()) + assert.Equal(t, nil, snap.Error) + assert.Equal(t, "", snap.Group) + assert.Equal(t, "srcFs:srcFs", snap.SrcFs) + assert.Equal(t, "dstFs:dstFs", snap.DstFs) + }) + + t.Run("Done", func(t *testing.T) { + tr.Done(ctx, io.EOF) + snap := tr.Snapshot() + assert.Equal(t, "obj", snap.Name) + assert.Equal(t, int64(0), snap.Size) + assert.Equal(t, int64(0), snap.Bytes) + assert.Equal(t, false, snap.Checked) + assert.Equal(t, false, snap.StartedAt.IsZero()) + assert.Equal(t, false, snap.CompletedAt.IsZero()) + assert.Equal(t, true, errors.Is(snap.Error, io.EOF)) + assert.Equal(t, "", snap.Group) + assert.Equal(t, "srcFs:srcFs", snap.SrcFs) + assert.Equal(t, "dstFs:dstFs", snap.DstFs) + }) + + t.Run("rcStats", func(t *testing.T) { + out := tr.rcStats() + assert.Equal(t, rc.Params{ + "name": "obj", + "size": int64(0), + "srcFs": "srcFs:srcFs", + "dstFs": "dstFs:dstFs", + }, out) + }) +} diff --git a/fs/accounting/transfermap.go b/fs/accounting/transfermap.go index 518124ffc..41409eece 100644 --- a/fs/accounting/transfermap.go +++ b/fs/accounting/transfermap.go @@ -159,11 +159,11 @@ func (tm *transferMap) rcStats(progress *inProgress) (t []rc.Params) { tm.mu.RLock() defer tm.mu.RUnlock() for _, tr := range tm._sortedSlice() { + out := tr.rcStats() // basic stats if acc := progress.get(tr.remote); acc != nil { - t = append(t, acc.rcStats()) - } else { - t = append(t, tr.rcStats()) + acc.rcStats(out) // add extended stats if have acc } + t = append(t, out) } return t } diff --git a/fs/operations/check.go b/fs/operations/check.go index 328be2128..2279b3fe6 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -341,7 +341,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo if err != nil { return true, fmt.Errorf("failed to open %q: %w", dst, err) } - tr1 := accounting.Stats(ctx).NewTransfer(dst) + tr1 := accounting.Stats(ctx).NewTransfer(dst, nil) defer func() { tr1.Done(ctx, nil) // error handling is done by the caller }() @@ -351,7 +351,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo if err != nil { return true, fmt.Errorf("failed to open %q: %w", src, err) } - tr2 := accounting.Stats(ctx).NewTransfer(dst) + tr2 := accounting.Stats(ctx).NewTransfer(dst, nil) defer func() { tr2.Done(ctx, nil) // error handling is done by the caller }() @@ -501,7 +501,7 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool, if in, err = Open(ctx, obj); err != nil { return } - tr := accounting.Stats(ctx).NewTransfer(obj) + tr := accounting.Stats(ctx).NewTransfer(obj, nil) in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer defer func() { tr.Done(ctx, nil) // will close the stream diff --git a/fs/operations/copy.go b/fs/operations/copy.go index 596c7ed04..4425c4974 100644 --- a/fs/operations/copy.go +++ b/fs/operations/copy.go @@ -369,7 +369,7 @@ func (c *copy) copy(ctx context.Context) (newDst fs.Object, err error) { // be nil. func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { ci := fs.GetConfig(ctx) - tr := accounting.Stats(ctx).NewTransfer(src) + tr := accounting.Stats(ctx).NewTransfer(src, f) defer func() { tr.Done(ctx, err) }() diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index a61f34331..59ff59d9d 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -183,7 +183,7 @@ func TestMultithreadCopy(t *testing.T) { require.NoError(t, err) accounting.GlobalStats().ResetCounters() - tr := accounting.GlobalStats().NewTransfer(src) + tr := accounting.GlobalStats().NewTransfer(src, nil) defer func() { tr.Done(ctx, err) @@ -284,7 +284,7 @@ func TestMultithreadCopyAbort(t *testing.T) { src, err := r.Flocal.NewObject(ctx, fileName) require.NoError(t, err) accounting.GlobalStats().ResetCounters() - tr := accounting.GlobalStats().NewTransfer(src) + tr := accounting.GlobalStats().NewTransfer(src, nil) defer func() { tr.Done(ctx, err) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 78b271765..91d7bd226 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -382,7 +382,7 @@ func move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs. ci := fs.GetConfig(ctx) var tr *accounting.Transfer if isTransfer { - tr = accounting.Stats(ctx).NewTransfer(src) + tr = accounting.Stats(ctx).NewTransfer(src, fdst) } else { tr = accounting.Stats(ctx).NewCheckingTransfer(src, "moving") } @@ -814,7 +814,7 @@ func HashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag // Setup: Define accounting, open the file with NewReOpen to provide restarts, account for the transfer, and setup a multi-hasher with the appropriate type // Execution: io.Copy file to hasher, get hash and encode in hex - tr := accounting.Stats(ctx).NewTransfer(o) + tr := accounting.Stats(ctx).NewTransfer(o, nil) defer func() { tr.Done(ctx, err) }() @@ -1106,7 +1106,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b ci := fs.GetConfig(ctx) return ListFn(ctx, f, func(o fs.Object) { var err error - tr := accounting.Stats(ctx).NewTransfer(o) + tr := accounting.Stats(ctx).NewTransfer(o, nil) defer func() { tr.Done(ctx, err) }() @@ -1157,7 +1157,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b // Rcat reads data from the Reader until EOF and uploads it to a file on remote func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) { ci := fs.GetConfig(ctx) - tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst) defer func() { tr.Done(ctx, err) }() @@ -1603,7 +1603,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo if size >= 0 { var err error // Size known use Put - tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size) + tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size, nil, fdst) defer func() { tr.Done(ctx, err) }() @@ -1807,7 +1807,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str logger(ctx, TransferError, nil, tmpObjFail, err) return fmt.Errorf("error while attempting to move file to a temporary location: %w", err) } - tr := accounting.Stats(ctx).NewTransfer(srcObj) + tr := accounting.Stats(ctx).NewTransfer(srcObj, fdst) defer func() { tr.Done(ctx, err) }() diff --git a/lib/http/serve/dir.go b/lib/http/serve/dir.go index 00d4579c5..1cfcc454b 100644 --- a/lib/http/serve/dir.go +++ b/lib/http/serve/dir.go @@ -224,7 +224,7 @@ const ( // Serve serves a directory func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer - tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1) + tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1, nil, nil) defer tr.Done(r.Context(), nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/lib/http/serve/serve.go b/lib/http/serve/serve.go index 6f97d0d8a..21f3c3322 100644 --- a/lib/http/serve/serve.go +++ b/lib/http/serve/serve.go @@ -79,7 +79,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } - tr := accounting.Stats(r.Context()).NewTransfer(o) + tr := accounting.Stats(r.Context()).NewTransfer(o, nil) defer func() { tr.Done(r.Context(), err) }() diff --git a/vfs/read.go b/vfs/read.go index 8bedeae91..f890143c8 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -79,7 +79,7 @@ func (fh *ReadFileHandle) openPending() (err error) { if err != nil { return err } - tr := accounting.GlobalStats().NewTransfer(o) + tr := accounting.GlobalStats().NewTransfer(o, nil) fh.done = tr.Done fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer fh.opened = true diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index b12c107b5..b0dd6b508 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -518,7 +518,7 @@ loop: // should be called on a fresh downloader func (dl *downloader) open(offset int64) (err error) { // defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err) - dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src) + dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src, nil) size := dl.dls.src.Size() if size < 0 {