From f102ef216103636b44a8445fa1ead7a270e2029f Mon Sep 17 00:00:00 2001 From: Ivan Andreev Date: Tue, 12 Oct 2021 15:16:39 +0300 Subject: [PATCH] hasher: add hasher backend #5587 --- backend/all/all.go | 1 + backend/hasher/commands.go | 179 +++++++++ backend/hasher/hasher.go | 508 +++++++++++++++++++++++++ backend/hasher/hasher_internal_test.go | 78 ++++ backend/hasher/hasher_test.go | 38 ++ backend/hasher/kv.go | 315 +++++++++++++++ backend/hasher/object.go | 305 +++++++++++++++ 7 files changed, 1424 insertions(+) create mode 100644 backend/hasher/commands.go create mode 100644 backend/hasher/hasher.go create mode 100644 backend/hasher/hasher_internal_test.go create mode 100644 backend/hasher/hasher_test.go create mode 100644 backend/hasher/kv.go create mode 100644 backend/hasher/object.go diff --git a/backend/all/all.go b/backend/all/all.go index cad76f6ad..f278e4ed7 100644 --- a/backend/all/all.go +++ b/backend/all/all.go @@ -18,6 +18,7 @@ import ( _ "github.com/rclone/rclone/backend/ftp" _ "github.com/rclone/rclone/backend/googlecloudstorage" _ "github.com/rclone/rclone/backend/googlephotos" + _ "github.com/rclone/rclone/backend/hasher" _ "github.com/rclone/rclone/backend/hdfs" _ "github.com/rclone/rclone/backend/http" _ "github.com/rclone/rclone/backend/hubic" diff --git a/backend/hasher/commands.go b/backend/hasher/commands.go new file mode 100644 index 000000000..99101daba --- /dev/null +++ b/backend/hasher/commands.go @@ -0,0 +1,179 @@ +package hasher + +import ( + "context" + "path" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fs/fspath" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/lib/kv" +) + +// Command the backend to run a named command +// +// The command run is name +// args may be used to read arguments from +// opts may be used to read optional arguments from +// +// The result should be capable of being JSON encoded +// If it is a string or a []string it will be shown to the user +// otherwise it will be JSON encoded and shown to the user like that +func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[string]string) (out interface{}, err error) { + switch name { + case "drop": + return nil, f.db.Stop(true) + case "dump", "fulldump": + return nil, f.dbDump(ctx, name == "fulldump", "") + case "import", "stickyimport": + sticky := name == "stickyimport" + if len(arg) != 2 { + return nil, errors.New("please provide checksum type and path to sum file") + } + return nil, f.dbImport(ctx, arg[0], arg[1], sticky) + default: + return nil, fs.ErrorCommandNotFound + } +} + +var commandHelp = []fs.CommandHelp{{ + Name: "drop", + Short: "Drop cache", + Long: `Completely drop checksum cache. +Usage Example: + rclone backend drop hasher: +`, +}, { + Name: "dump", + Short: "Dump the database", + Long: "Dump cache records covered by the current remote", +}, { + Name: "fulldump", + Short: "Full dump of the database", + Long: "Dump all cache records in the database", +}, { + Name: "import", + Short: "Import a SUM file", + Long: `Amend hash cache from a SUM file and bind checksums to files by size/time. +Usage Example: + rclone backend import hasher:subdir md5 /path/to/sum.md5 +`, +}, { + Name: "stickyimport", + Short: "Perform fast import of a SUM file", + Long: `Fill hash cache from a SUM file without verifying file fingerprints. 
+Usage Example: + rclone backend stickyimport hasher:subdir md5 remote:path/to/sum.md5 +`, +}} + +func (f *Fs) dbDump(ctx context.Context, full bool, root string) error { + if root == "" { + remoteFs, err := cache.Get(ctx, f.opt.Remote) + if err != nil { + return err + } + root = fspath.JoinRootPath(remoteFs.Root(), f.Root()) + } + op := &kvDump{ + full: full, + root: root, + path: f.db.Path(), + fs: f, + } + err := f.db.Do(false, op) + if err == kv.ErrEmpty { + fs.Infof(op.path, "empty") + err = nil + } + return err +} + +func (f *Fs) dbImport(ctx context.Context, hashName, sumRemote string, sticky bool) error { + var hashType hash.Type + if err := hashType.Set(hashName); err != nil { + return err + } + if hashType == hash.None { + return errors.New("please provide a valid hash type") + } + if !f.suppHashes.Contains(hashType) { + return errors.New("unsupported hash type") + } + if !f.keepHashes.Contains(hashType) { + fs.Infof(nil, "Need not import hashes of this type") + return nil + } + + _, sumPath, err := fspath.SplitFs(sumRemote) + if err != nil { + return err + } + sumFs, err := cache.Get(ctx, sumRemote) + switch err { + case fs.ErrorIsFile: + // ok + case nil: + return errors.Errorf("not a file: %s", sumRemote) + default: + return err + } + + sumObj, err := sumFs.NewObject(ctx, path.Base(sumPath)) + if err != nil { + return errors.Wrap(err, "cannot open sum file") + } + hashes, err := operations.ParseSumFile(ctx, sumObj) + if err != nil { + return errors.Wrap(err, "failed to parse sum file") + } + + if sticky { + rootPath := f.Fs.Root() + for remote, hashVal := range hashes { + key := path.Join(rootPath, remote) + hashSums := operations.HashSums{hashName: hashVal} + if err := f.putRawHashes(ctx, key, anyFingerprint, hashSums); err != nil { + fs.Errorf(nil, "%s: failed to import: %v", remote, err) + } + } + fs.Infof(nil, "Summary: %d checksum(s) imported", len(hashes)) + return nil + } + + const longImportThreshold = 100 + if len(hashes) > longImportThreshold { + fs.Infof(nil, "Importing %d checksums. 
Please wait...", len(hashes)) + } + + doneCount := 0 + err = operations.ListFn(ctx, f, func(obj fs.Object) { + remote := obj.Remote() + hash := hashes[remote] + hashes[remote] = "" // mark as handled + o, ok := obj.(*Object) + if ok && hash != "" { + if err := o.putHashes(ctx, hashMap{hashType: hash}); err != nil { + fs.Errorf(nil, "%s: failed to import: %v", remote, err) + } + accounting.Stats(ctx).NewCheckingTransfer(obj).Done(ctx, err) + doneCount++ + } + }) + if err != nil { + fs.Errorf(nil, "Import failed: %v", err) + } + skipCount := 0 + for remote, emptyOrDone := range hashes { + if emptyOrDone != "" { + fs.Infof(nil, "Skip vanished object: %s", remote) + skipCount++ + } + } + fs.Infof(nil, "Summary: %d imported, %d skipped", doneCount, skipCount) + return err +} diff --git a/backend/hasher/hasher.go b/backend/hasher/hasher.go new file mode 100644 index 000000000..1a7df38fc --- /dev/null +++ b/backend/hasher/hasher.go @@ -0,0 +1,508 @@ +// Package hasher implements a checksum handling overlay backend +package hasher + +import ( + "context" + "encoding/gob" + "fmt" + "io" + "path" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/config/configstruct" + "github.com/rclone/rclone/fs/fspath" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/kv" +) + +// Register with Fs +func init() { + fs.Register(&fs.RegInfo{ + Name: "hasher", + Description: "Better checksums for other remotes", + NewFs: NewFs, + CommandHelp: commandHelp, + Options: []fs.Option{{ + Name: "remote", + Required: true, + Help: "Remote to cache checksums for (e.g. myRemote:path).", + }, { + Name: "hashes", + Default: fs.CommaSepList{"md5", "sha1"}, + Advanced: false, + Help: "Comma separated list of supported checksum types.", + }, { + Name: "max_age", + Advanced: false, + Default: fs.DurationOff, + Help: "Maximum time to keep checksums in cache (0 = no cache, off = cache forever).", + }, { + Name: "auto_size", + Advanced: true, + Default: fs.SizeSuffix(0), + Help: "Auto-update checksum for files smaller than this size (disabled by default).", + }}, + }) +} + +// Options defines the configuration for this backend +type Options struct { + Remote string `config:"remote"` + Hashes fs.CommaSepList `config:"hashes"` + AutoSize fs.SizeSuffix `config:"auto_size"` + MaxAge fs.Duration `config:"max_age"` +} + +// Fs represents a wrapped fs.Fs +type Fs struct { + fs.Fs + name string + root string + wrapper fs.Fs + features *fs.Features + opt *Options + db *kv.DB + // fingerprinting + fpTime bool // true if using time in fingerprints + fpHash hash.Type // hash type to use in fingerprints or None + // hash types triaged by groups + suppHashes hash.Set // all supported checksum types + passHashes hash.Set // passed directly to the base without caching + slowHashes hash.Set // passed to the base and then cached + autoHashes hash.Set // calculated in-house and cached + keepHashes hash.Set // checksums to keep in cache (slow + auto) +} + +var warnExperimental sync.Once + +// NewFs constructs an Fs from the remote:path string +func NewFs(ctx context.Context, fsname, rpath string, cmap configmap.Mapper) (fs.Fs, error) { + if !kv.Supported() { + return nil, errors.New("hasher is not supported on this OS") + } + warnExperimental.Do(func() { + fs.Infof(nil, "Hasher is EXPERIMENTAL!") + }) + + opt := &Options{} + err := configstruct.Set(cmap, opt) + if err != nil { + 
return nil, err + } + + if strings.HasPrefix(opt.Remote, fsname+":") { + return nil, errors.New("can't point remote at itself") + } + remotePath := fspath.JoinRootPath(opt.Remote, rpath) + baseFs, err := cache.Get(ctx, remotePath) + if err != nil && err != fs.ErrorIsFile { + return nil, errors.Wrapf(err, "failed to derive base remote %q", opt.Remote) + } + + f := &Fs{ + Fs: baseFs, + name: fsname, + root: rpath, + opt: opt, + } + baseFeatures := baseFs.Features() + f.fpTime = baseFs.Precision() != fs.ModTimeNotSupported + + if baseFeatures.SlowHash { + f.slowHashes = f.Fs.Hashes() + } else { + f.passHashes = f.Fs.Hashes() + f.fpHash = f.passHashes.GetOne() + } + + f.suppHashes = f.passHashes + f.suppHashes.Add(f.slowHashes.Array()...) + + for _, hashName := range opt.Hashes { + var ht hash.Type + if err := ht.Set(hashName); err != nil { + return nil, errors.Errorf("invalid token %q in hash string %q", hashName, opt.Hashes.String()) + } + if !f.slowHashes.Contains(ht) { + f.autoHashes.Add(ht) + } + f.keepHashes.Add(ht) + f.suppHashes.Add(ht) + } + + fs.Debugf(f, "Groups by usage: cached %s, passed %s, auto %s, slow %s, supported %s", + f.keepHashes, f.passHashes, f.autoHashes, f.slowHashes, f.suppHashes) + + var nilSet hash.Set + if f.keepHashes == nilSet { + return nil, errors.New("configured hash_names have nothing to keep in cache") + } + + if f.opt.MaxAge > 0 { + gob.Register(hashRecord{}) + db, err := kv.Start(ctx, "hasher", f.Fs) + if err != nil { + return nil, err + } + f.db = db + } + + stubFeatures := &fs.Features{ + CanHaveEmptyDirectories: true, + IsLocal: true, + ReadMimeType: true, + WriteMimeType: true, + } + f.features = stubFeatures.Fill(ctx, f).Mask(ctx, f.Fs).WrapsFs(f, f.Fs) + + cache.PinUntilFinalized(f.Fs, f) + return f, err +} + +// +// Filesystem +// + +// Name of the remote (as passed into NewFs) +func (f *Fs) Name() string { return f.name } + +// Root of the remote (as passed into NewFs) +func (f *Fs) Root() string { return f.root } + +// Features returns the optional features of this Fs +func (f *Fs) Features() *fs.Features { return f.features } + +// Hashes returns the supported hash sets. +func (f *Fs) Hashes() hash.Set { return f.suppHashes } + +// String returns a description of the FS +// The "hasher::" prefix is a distinctive feature. +func (f *Fs) String() string { + return fmt.Sprintf("hasher::%s:%s", f.name, f.root) +} + +// UnWrap returns the Fs that this Fs is wrapping +func (f *Fs) UnWrap() fs.Fs { return f.Fs } + +// WrapFs returns the Fs that is wrapping this Fs +func (f *Fs) WrapFs() fs.Fs { return f.wrapper } + +// SetWrapper sets the Fs that is wrapping this Fs +func (f *Fs) SetWrapper(wrapper fs.Fs) { f.wrapper = wrapper } + +// Wrap base entries into hasher entries. +func (f *Fs) wrapEntries(baseEntries fs.DirEntries) (hashEntries fs.DirEntries, err error) { + hashEntries = baseEntries[:0] // work inplace + for _, entry := range baseEntries { + switch x := entry.(type) { + case fs.Object: + hashEntries = append(hashEntries, f.wrapObject(x, nil)) + default: + hashEntries = append(hashEntries, entry) // trash in - trash out + } + } + return hashEntries, nil +} + +// List the objects and directories in dir into entries. +func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { + if entries, err = f.Fs.List(ctx, dir); err != nil { + return nil, err + } + return f.wrapEntries(entries) +} + +// ListR lists the objects and directories recursively into out. 
+func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { + return f.Fs.Features().ListR(ctx, dir, func(baseEntries fs.DirEntries) error { + hashEntries, err := f.wrapEntries(baseEntries) + if err != nil { + return err + } + return callback(hashEntries) + }) +} + +// Purge a directory +func (f *Fs) Purge(ctx context.Context, dir string) error { + if do := f.Fs.Features().Purge; do != nil { + if err := do(ctx, dir); err != nil { + return err + } + err := f.db.Do(true, &kvPurge{ + dir: path.Join(f.Fs.Root(), dir), + }) + if err != nil { + fs.Errorf(f, "Failed to purge some hashes: %v", err) + } + return nil + } + return fs.ErrorCantPurge +} + +// PutStream uploads to the remote path with undeterminate size. +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + if do := f.Fs.Features().PutStream; do != nil { + _ = f.pruneHash(src.Remote()) + oResult, err := do(ctx, in, src, options...) + return f.wrapObject(oResult, err), err + } + return nil, errors.New("PutStream not supported") +} + +// PutUnchecked uploads the object, allowing duplicates. +func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + if do := f.Fs.Features().PutUnchecked; do != nil { + _ = f.pruneHash(src.Remote()) + oResult, err := do(ctx, in, src, options...) + return f.wrapObject(oResult, err), err + } + return nil, errors.New("PutUnchecked not supported") +} + +// pruneHash deletes hash for a path +func (f *Fs) pruneHash(remote string) error { + return f.db.Do(true, &kvPrune{ + key: path.Join(f.Fs.Root(), remote), + }) +} + +// CleanUp the trash in the Fs +func (f *Fs) CleanUp(ctx context.Context) error { + if do := f.Fs.Features().CleanUp; do != nil { + return do(ctx) + } + return errors.New("CleanUp not supported") +} + +// About gets quota information from the Fs +func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { + if do := f.Fs.Features().About; do != nil { + return do(ctx) + } + return nil, errors.New("About not supported") +} + +// ChangeNotify calls the passed function with a path that has had changes. +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { + if do := f.Fs.Features().ChangeNotify; do != nil { + do(ctx, notifyFunc, pollIntervalChan) + } +} + +// UserInfo returns info about the connected user +func (f *Fs) UserInfo(ctx context.Context) (map[string]string, error) { + if do := f.Fs.Features().UserInfo; do != nil { + return do(ctx) + } + return nil, fs.ErrorNotImplemented +} + +// Disconnect the current user +func (f *Fs) Disconnect(ctx context.Context) error { + if do := f.Fs.Features().Disconnect; do != nil { + return do(ctx) + } + return fs.ErrorNotImplemented +} + +// MergeDirs merges the contents of all the directories passed +// in into the first one and rmdirs the other directories. 
+func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error { + if do := f.Fs.Features().MergeDirs; do != nil { + return do(ctx, dirs) + } + return errors.New("MergeDirs not supported") +} + +// DirCacheFlush resets the directory cache - used in testing +// as an optional interface +func (f *Fs) DirCacheFlush() { + if do := f.Fs.Features().DirCacheFlush; do != nil { + do() + } +} + +// PublicLink generates a public link to the remote path (usually readable by anyone) +func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) { + if do := f.Fs.Features().PublicLink; do != nil { + return do(ctx, remote, expire, unlink) + } + return "", errors.New("PublicLink not supported") +} + +// Copy src to this remote using server-side copy operations. +func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Copy + if do == nil { + return nil, fs.ErrorCantCopy + } + o, ok := src.(*Object) + if !ok { + return nil, fs.ErrorCantCopy + } + oResult, err := do(ctx, o.Object, remote) + return f.wrapObject(oResult, err), err +} + +// Move src to this remote using server-side move operations. +func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Move + if do == nil { + return nil, fs.ErrorCantMove + } + o, ok := src.(*Object) + if !ok { + return nil, fs.ErrorCantMove + } + oResult, err := do(ctx, o.Object, remote) + if err != nil { + return nil, err + } + _ = f.db.Do(true, &kvMove{ + src: path.Join(f.Fs.Root(), src.Remote()), + dst: path.Join(f.Fs.Root(), remote), + dir: false, + fs: f, + }) + return f.wrapObject(oResult, nil), nil +} + +// DirMove moves src, srcRemote to this remote at dstRemote using server-side move operations. +func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { + do := f.Fs.Features().DirMove + if do == nil { + return fs.ErrorCantDirMove + } + srcFs, ok := src.(*Fs) + if !ok { + return fs.ErrorCantDirMove + } + err := do(ctx, srcFs.Fs, srcRemote, dstRemote) + if err == nil { + _ = f.db.Do(true, &kvMove{ + src: path.Join(srcFs.Fs.Root(), srcRemote), + dst: path.Join(f.Fs.Root(), dstRemote), + dir: true, + fs: f, + }) + } + return err +} + +// Shutdown the backend, closing any background tasks and any cached connections. +func (f *Fs) Shutdown(ctx context.Context) (err error) { + err = f.db.Stop(false) + if do := f.Fs.Features().Shutdown; do != nil { + if err2 := do(ctx); err2 != nil { + err = err2 + } + } + return +} + +// NewObject finds the Object at remote. 
+func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + o, err := f.Fs.NewObject(ctx, remote) + return f.wrapObject(o, err), err +} + +// +// Object +// + +// Object represents a composite file wrapping one or more data chunks +type Object struct { + fs.Object + f *Fs +} + +// Wrap base object into hasher object +func (f *Fs) wrapObject(o fs.Object, err error) *Object { + if err != nil || o == nil { + return nil + } + return &Object{Object: o, f: f} +} + +// Fs returns read only access to the Fs that this object is part of +func (o *Object) Fs() fs.Info { return o.f } + +// UnWrap returns the wrapped Object +func (o *Object) UnWrap() fs.Object { return o.Object } + +// Return a string version +func (o *Object) String() string { + if o == nil { + return "" + } + return o.Object.String() +} + +// ID returns the ID of the Object if possible +func (o *Object) ID() string { + if doer, ok := o.Object.(fs.IDer); ok { + return doer.ID() + } + return "" +} + +// GetTier returns the Tier of the Object if possible +func (o *Object) GetTier() string { + if doer, ok := o.Object.(fs.GetTierer); ok { + return doer.GetTier() + } + return "" +} + +// SetTier set the Tier of the Object if possible +func (o *Object) SetTier(tier string) error { + if doer, ok := o.Object.(fs.SetTierer); ok { + return doer.SetTier(tier) + } + return errors.New("SetTier not supported") +} + +// MimeType of an Object if known, "" otherwise +func (o *Object) MimeType(ctx context.Context) string { + if doer, ok := o.Object.(fs.MimeTyper); ok { + return doer.MimeType(ctx) + } + return "" +} + +// Check the interfaces are satisfied +var ( + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.Commander = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.CleanUpper = (*Fs)(nil) + _ fs.UnWrapper = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) + _ fs.Abouter = (*Fs)(nil) + _ fs.Wrapper = (*Fs)(nil) + _ fs.MergeDirser = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) + _ fs.PublicLinker = (*Fs)(nil) + _ fs.UserInfoer = (*Fs)(nil) + _ fs.Disconnecter = (*Fs)(nil) + _ fs.Shutdowner = (*Fs)(nil) + _ fs.Object = (*Object)(nil) + _ fs.ObjectUnWrapper = (*Object)(nil) + _ fs.IDer = (*Object)(nil) + _ fs.SetTierer = (*Object)(nil) + _ fs.GetTierer = (*Object)(nil) + _ fs.MimeTyper = (*Object)(nil) +) diff --git a/backend/hasher/hasher_internal_test.go b/backend/hasher/hasher_internal_test.go new file mode 100644 index 000000000..3ec7a11f9 --- /dev/null +++ b/backend/hasher/hasher_internal_test.go @@ -0,0 +1,78 @@ +package hasher + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config/obscure" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" + "github.com/rclone/rclone/lib/kv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func putFile(ctx context.Context, t *testing.T, f fs.Fs, name, data string) fs.Object { + mtime1 := fstest.Time("2001-02-03T04:05:06.499999999Z") + item := fstest.Item{Path: name, ModTime: mtime1} + _, o := fstests.PutTestContents(ctx, t, f, &item, data, true) + require.NotNil(t, o) + return o +} + +func (f *Fs) testUploadFromCrypt(t *testing.T) { + // make a temporary local remote + tempRoot, err := fstest.LocalRemote() + require.NoError(t, err) + defer func() { + _ = 
os.RemoveAll(tempRoot) + }() + + // make a temporary crypt remote + ctx := context.Background() + pass := obscure.MustObscure("crypt") + remote := fmt.Sprintf(":crypt,remote=%s,password=%s:", tempRoot, pass) + cryptFs, err := fs.NewFs(ctx, remote) + require.NoError(t, err) + + // make a test file on the crypt remote + const dirName = "from_crypt_1" + const fileName = dirName + "/file_from_crypt_1" + const longTime = fs.ModTimeNotSupported + src := putFile(ctx, t, cryptFs, fileName, "doggy froggy") + + // ensure that hash does not exist yet + _ = f.pruneHash(fileName) + hashType := f.keepHashes.GetOne() + hash, err := f.getRawHash(ctx, hashType, fileName, anyFingerprint, longTime) + assert.Error(t, err) + assert.Empty(t, hash) + + // upload file to hasher + in, err := src.Open(ctx) + require.NoError(t, err) + dst, err := f.Put(ctx, in, src) + require.NoError(t, err) + assert.NotNil(t, dst) + + // check that hash was created + hash, err = f.getRawHash(ctx, hashType, fileName, anyFingerprint, longTime) + assert.NoError(t, err) + assert.NotEmpty(t, hash) + //t.Logf("hash is %q", hash) + _ = operations.Purge(ctx, f, dirName) +} + +// InternalTest dispatches all internal tests +func (f *Fs) InternalTest(t *testing.T) { + if !kv.Supported() { + t.Skip("hasher is not supported on this OS") + } + t.Run("UploadFromCrypt", f.testUploadFromCrypt) +} + +var _ fstests.InternalTester = (*Fs)(nil) diff --git a/backend/hasher/hasher_test.go b/backend/hasher/hasher_test.go new file mode 100644 index 000000000..09d60f33d --- /dev/null +++ b/backend/hasher/hasher_test.go @@ -0,0 +1,38 @@ +package hasher_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/rclone/rclone/backend/hasher" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" + "github.com/rclone/rclone/lib/kv" + + _ "github.com/rclone/rclone/backend/all" // for integration tests +) + +// TestIntegration runs integration tests against the remote +func TestIntegration(t *testing.T) { + if !kv.Supported() { + t.Skip("hasher is not supported on this OS") + } + opt := fstests.Opt{ + RemoteName: *fstest.RemoteName, + NilObject: (*hasher.Object)(nil), + UnimplementableFsMethods: []string{ + "OpenWriterAt", + }, + UnimplementableObjectMethods: []string{}, + } + if *fstest.RemoteName == "" { + tempDir := filepath.Join(os.TempDir(), "rclone-hasher-test") + opt.ExtraConfig = []fstests.ExtraConfigItem{ + {Name: "TestHasher", Key: "type", Value: "hasher"}, + {Name: "TestHasher", Key: "remote", Value: tempDir}, + } + opt.RemoteName = "TestHasher:" + } + fstests.Run(t, &opt) +} diff --git a/backend/hasher/kv.go b/backend/hasher/kv.go new file mode 100644 index 000000000..881661b4f --- /dev/null +++ b/backend/hasher/kv.go @@ -0,0 +1,315 @@ +package hasher + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/lib/kv" +) + +const ( + timeFormat = "2006-01-02T15:04:05.000000000-0700" + anyFingerprint = "*" +) + +type hashMap map[hash.Type]string + +type hashRecord struct { + Fp string // fingerprint + Hashes operations.HashSums + Created time.Time +} + +func (r *hashRecord) encode(key string) ([]byte, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(r); err != nil { + fs.Debugf(key, "hasher encoding %v: %v", r, err) + return nil, err + } + return buf.Bytes(), nil +} + +func (r *hashRecord) decode(key string, 
data []byte) error { + if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(r); err != nil { + fs.Debugf(key, "hasher decoding %q failed: %v", data, err) + return err + } + return nil +} + +// kvPrune: prune a single hash +type kvPrune struct { + key string +} + +func (op *kvPrune) Do(ctx context.Context, b kv.Bucket) error { + return b.Delete([]byte(op.key)) +} + +// kvPurge: delete a subtree +type kvPurge struct { + dir string +} + +func (op *kvPurge) Do(ctx context.Context, b kv.Bucket) error { + dir := op.dir + if !strings.HasSuffix(dir, "/") { + dir += "/" + } + var items []string + cur := b.Cursor() + bkey, _ := cur.Seek([]byte(dir)) + for bkey != nil { + key := string(bkey) + if !strings.HasPrefix(key, dir) { + break + } + items = append(items, key[len(dir):]) + bkey, _ = cur.Next() + } + nerr := 0 + for _, sub := range items { + if err := b.Delete([]byte(dir + sub)); err != nil { + nerr++ + } + } + fs.Debugf(dir, "%d hashes purged, %d failed", len(items)-nerr, nerr) + return nil +} + +// kvMove: assign hashes to new path +type kvMove struct { + src string + dst string + dir bool + fs *Fs +} + +func (op *kvMove) Do(ctx context.Context, b kv.Bucket) error { + src, dst := op.src, op.dst + if !op.dir { + err := moveHash(b, src, dst) + fs.Debugf(op.fs, "moving cached hash %s to %s (err: %v)", src, dst, err) + return err + } + + if !strings.HasSuffix(src, "/") { + src += "/" + } + if !strings.HasSuffix(dst, "/") { + dst += "/" + } + + var items []string + cur := b.Cursor() + bkey, _ := cur.Seek([]byte(src)) + for bkey != nil { + key := string(bkey) + if !strings.HasPrefix(key, src) { + break + } + items = append(items, key[len(src):]) + bkey, _ = cur.Next() + } + + nerr := 0 + for _, suffix := range items { + srcKey, dstKey := src+suffix, dst+suffix + err := moveHash(b, srcKey, dstKey) + fs.Debugf(op.fs, "Rename cache record %s -> %s (err: %v)", srcKey, dstKey, err) + if err != nil { + nerr++ + } + } + fs.Debugf(op.fs, "%d hashes moved, %d failed", len(items)-nerr, nerr) + return nil +} + +func moveHash(b kv.Bucket, src, dst string) error { + data := b.Get([]byte(src)) + err := b.Delete([]byte(src)) + if err != nil || len(data) == 0 { + return err + } + return b.Put([]byte(dst), data) +} + +// kvGet: get single hash from database +type kvGet struct { + key string + fp string + hash string + val string + age time.Duration +} + +func (op *kvGet) Do(ctx context.Context, b kv.Bucket) error { + data := b.Get([]byte(op.key)) + if len(data) == 0 { + return errors.New("no record") + } + var r hashRecord + if err := r.decode(op.key, data); err != nil { + return errors.New("invalid record") + } + if !(r.Fp == anyFingerprint || op.fp == anyFingerprint || r.Fp == op.fp) { + return errors.New("fingerprint changed") + } + if time.Since(r.Created) > op.age { + return errors.New("record timed out") + } + if r.Hashes != nil { + op.val = r.Hashes[op.hash] + } + return nil +} + +// kvPut: set hashes for an object by key +type kvPut struct { + key string + fp string + hashes operations.HashSums + age time.Duration +} + +func (op *kvPut) Do(ctx context.Context, b kv.Bucket) (err error) { + data := b.Get([]byte(op.key)) + var r hashRecord + if len(data) > 0 { + err = r.decode(op.key, data) + if err != nil || r.Fp != op.fp || time.Since(r.Created) > op.age { + r.Hashes = nil + } + } + if len(r.Hashes) == 0 { + r.Created = time.Now() + r.Hashes = operations.HashSums{} + r.Fp = op.fp + } + + for hashType, hashVal := range op.hashes { + r.Hashes[hashType] = hashVal + } + if data, err = r.encode(op.key); err != 
nil { + return errors.Wrap(err, "marshal failed") + } + if err = b.Put([]byte(op.key), data); err != nil { + return errors.Wrap(err, "put failed") + } + return err +} + +// kvDump: dump the database. +// Note: long dump can cause concurrent operations to fail. +type kvDump struct { + full bool + root string + path string + fs *Fs + num int + total int +} + +func (op *kvDump) Do(ctx context.Context, b kv.Bucket) error { + f, baseRoot, dbPath := op.fs, op.root, op.path + + if op.full { + total := 0 + num := 0 + _ = b.ForEach(func(bkey, data []byte) error { + total++ + key := string(bkey) + include := (baseRoot == "" || key == baseRoot || strings.HasPrefix(key, baseRoot+"/")) + var r hashRecord + if err := r.decode(key, data); err != nil { + fs.Errorf(nil, "%s: invalid record: %v", key, err) + return nil + } + fmt.Println(f.dumpLine(&r, key, include, nil)) + if include { + num++ + } + return nil + }) + fs.Infof(dbPath, "%d records out of %d", num, total) + op.num, op.total = num, total // for unit tests + return nil + } + + num := 0 + cur := b.Cursor() + var bkey, data []byte + if baseRoot != "" { + bkey, data = cur.Seek([]byte(baseRoot)) + } else { + bkey, data = cur.First() + } + for bkey != nil { + key := string(bkey) + if !(baseRoot == "" || key == baseRoot || strings.HasPrefix(key, baseRoot+"/")) { + break + } + var r hashRecord + if err := r.decode(key, data); err != nil { + fs.Errorf(nil, "%s: invalid record: %v", key, err) + continue + } + if key = strings.TrimPrefix(key[len(baseRoot):], "/"); key == "" { + key = "/" + } + fmt.Println(f.dumpLine(&r, key, true, nil)) + num++ + bkey, data = cur.Next() + } + fs.Infof(dbPath, "%d records", num) + op.num = num // for unit tests + return nil +} + +func (f *Fs) dumpLine(r *hashRecord, path string, include bool, err error) string { + var status string + switch { + case !include: + status = "ext" + case err != nil: + status = "bad" + case r.Fp == anyFingerprint: + status = "stk" + default: + status = "ok " + } + + var hashes []string + for _, hashType := range f.keepHashes.Array() { + hashName := hashType.String() + hashVal := r.Hashes[hashName] + if hashVal == "" || err != nil { + hashVal = "-" + } + hashVal = fmt.Sprintf("%-*s", hash.Width(hashType), hashVal) + hashes = append(hashes, hashName+":"+hashVal) + } + hashesStr := strings.Join(hashes, " ") + + age := time.Since(r.Created).Round(time.Second) + if age > 24*time.Hour { + age = age.Round(time.Hour) + } + if err != nil { + age = 0 + } + ageStr := age.String() + if strings.HasSuffix(ageStr, "h0m0s") { + ageStr = strings.TrimSuffix(ageStr, "0m0s") + } + + return fmt.Sprintf("%s %s %9s %s", status, hashesStr, ageStr, path) +} diff --git a/backend/hasher/object.go b/backend/hasher/object.go new file mode 100644 index 000000000..3c6e5af78 --- /dev/null +++ b/backend/hasher/object.go @@ -0,0 +1,305 @@ +package hasher + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "path" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/operations" +) + +// obtain hash for an object +func (o *Object) getHash(ctx context.Context, hashType hash.Type) (string, error) { + maxAge := time.Duration(o.f.opt.MaxAge) + if maxAge <= 0 { + return "", nil + } + fp := o.fingerprint(ctx) + if fp == "" { + return "", errors.New("fingerprint failed") + } + return o.f.getRawHash(ctx, hashType, o.Remote(), fp, maxAge) +} + +// obtain hash for a path +func (f *Fs) getRawHash(ctx context.Context, hashType hash.Type, remote, fp string, 
age time.Duration) (string, error) { + key := path.Join(f.Fs.Root(), remote) + op := &kvGet{ + key: key, + fp: fp, + hash: hashType.String(), + age: age, + } + err := f.db.Do(false, op) + return op.val, err +} + +// put new hashes for an object +func (o *Object) putHashes(ctx context.Context, rawHashes hashMap) error { + if o.f.opt.MaxAge <= 0 { + return nil + } + fp := o.fingerprint(ctx) + if fp == "" { + return nil + } + key := path.Join(o.f.Fs.Root(), o.Remote()) + hashes := operations.HashSums{} + for hashType, hashVal := range rawHashes { + hashes[hashType.String()] = hashVal + } + return o.f.putRawHashes(ctx, key, fp, hashes) +} + +// set hashes for a path without any validation +func (f *Fs) putRawHashes(ctx context.Context, key, fp string, hashes operations.HashSums) error { + return f.db.Do(true, &kvPut{ + key: key, + fp: fp, + hashes: hashes, + age: time.Duration(f.opt.MaxAge), + }) +} + +// Hash returns the selected checksum of the file or "" if unavailable. +func (o *Object) Hash(ctx context.Context, hashType hash.Type) (hashVal string, err error) { + f := o.f + if f.passHashes.Contains(hashType) { + fs.Debugf(o, "pass %s", hashType) + return o.Object.Hash(ctx, hashType) + } + if !f.suppHashes.Contains(hashType) { + fs.Debugf(o, "unsupp %s", hashType) + return "", hash.ErrUnsupported + } + if hashVal, err = o.getHash(ctx, hashType); err != nil { + fs.Debugf(o, "getHash: %v", err) + err = nil + hashVal = "" + } + if hashVal != "" { + fs.Debugf(o, "cached %s = %q", hashType, hashVal) + return hashVal, nil + } + if f.slowHashes.Contains(hashType) { + fs.Debugf(o, "slow %s", hashType) + hashVal, err = o.Object.Hash(ctx, hashType) + if err == nil && hashVal != "" && f.keepHashes.Contains(hashType) { + if err = o.putHashes(ctx, hashMap{hashType: hashVal}); err != nil { + fs.Debugf(o, "putHashes: %v", err) + err = nil + } + } + return hashVal, err + } + if f.autoHashes.Contains(hashType) && o.Size() < int64(f.opt.AutoSize) { + _ = o.updateHashes(ctx) + if hashVal, err = o.getHash(ctx, hashType); err != nil { + fs.Debugf(o, "auto %s = %q (%v)", hashType, hashVal, err) + err = nil + } + } + return hashVal, err +} + +// updateHashes performs implicit "rclone hashsum --download" and updates cache. +func (o *Object) updateHashes(ctx context.Context) error { + r, err := o.Open(ctx) + if err != nil { + fs.Infof(o, "update failed (open): %v", err) + return err + } + defer func() { + _ = r.Close() + }() + if _, err = io.Copy(ioutil.Discard, r); err != nil { + fs.Infof(o, "update failed (copy): %v", err) + return err + } + return nil +} + +// Update the object with the given data, time and size. +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + _ = o.f.pruneHash(src.Remote()) + return o.Object.Update(ctx, in, src, options...) +} + +// Remove an object. +func (o *Object) Remove(ctx context.Context) error { + _ = o.f.pruneHash(o.Remote()) + return o.Object.Remove(ctx) +} + +// SetModTime sets the modification time of the file. +// Also prunes the cache entry when modtime changes so that +// touching a file will trigger checksum recalculation even +// on backends that don't provide modTime with fingerprint. +func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error { + if mtime != o.Object.ModTime(ctx) { + _ = o.f.pruneHash(o.Remote()) + } + return o.Object.SetModTime(ctx, mtime) +} + +// Open opens the file for read. +// Full reads will also update object hashes. 
+func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (r io.ReadCloser, err error) { + size := o.Size() + var offset, limit int64 = 0, -1 + for _, option := range options { + switch opt := option.(type) { + case *fs.SeekOption: + offset = opt.Offset + case *fs.RangeOption: + offset, limit = opt.Decode(size) + } + } + if offset < 0 { + return nil, errors.New("invalid offset") + } + if limit < 0 { + limit = size - offset + } + if r, err = o.Object.Open(ctx, options...); err != nil { + return nil, err + } + if offset != 0 || limit < size { + // It's a partial read + return r, err + } + return o.f.newHashingReader(ctx, r, func(sums hashMap) { + if err := o.putHashes(ctx, sums); err != nil { + fs.Infof(o, "auto hashing error: %v", err) + } + }) +} + +// Put data into the remote path with given modTime and size +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + var ( + o *Object + common hash.Set + rehash bool + hashes hashMap + ) + if fsrc := src.Fs(); fsrc != nil { + common = fsrc.Hashes().Overlap(f.keepHashes) + // Rehash if source does not have all required hashes or hashing is slow + rehash = fsrc.Features().SlowHash || common != f.keepHashes + } + + wrapIn := in + if rehash { + r, err := f.newHashingReader(ctx, in, func(sums hashMap) { + hashes = sums + }) + fs.Debugf(src, "Rehash in-fly due to incomplete or slow source set %v (err: %v)", common, err) + if err == nil { + wrapIn = r + } else { + rehash = false + } + } + + _ = f.pruneHash(src.Remote()) + oResult, err := f.Fs.Put(ctx, wrapIn, src, options...) + o = f.wrapObject(oResult, err) + if o == nil { + return nil, err + } + + if !rehash { + hashes = hashMap{} + for _, ht := range common.Array() { + if h, e := src.Hash(ctx, ht); e == nil && h != "" { + hashes[ht] = h + } + } + } + if len(hashes) > 0 { + err := o.putHashes(ctx, hashes) + fs.Debugf(o, "Applied %d source hashes, err: %v", len(hashes), err) + } + return o, err +} + +type hashingReader struct { + rd io.Reader + hasher *hash.MultiHasher + fun func(hashMap) +} + +func (f *Fs) newHashingReader(ctx context.Context, rd io.Reader, fun func(hashMap)) (*hashingReader, error) { + hasher, err := hash.NewMultiHasherTypes(f.keepHashes) + if err != nil { + return nil, err + } + hr := &hashingReader{ + rd: rd, + hasher: hasher, + fun: fun, + } + return hr, nil +} + +func (r *hashingReader) Read(p []byte) (n int, err error) { + n, err = r.rd.Read(p) + if err != nil && err != io.EOF { + r.hasher = nil + } + if r.hasher != nil { + if _, errHash := r.hasher.Write(p[:n]); errHash != nil { + r.hasher = nil + err = errHash + } + } + if err == io.EOF && r.hasher != nil { + r.fun(r.hasher.Sums()) + r.hasher = nil + } + return +} + +func (r *hashingReader) Close() error { + if rc, ok := r.rd.(io.ReadCloser); ok { + return rc.Close() + } + return nil +} + +// Return object fingerprint or empty string in case of errors +// +// Note that we can't use the generic `fs.Fingerprint` here because +// this fingerprint is used to pick _derived hashes_ that are slow +// to calculate or completely unsupported by the base remote. +// +// The hasher fingerprint must be based on `fsHash`, the first _fast_ +// hash supported _by the underlying remote_ (if there is one), +// while `fs.Fingerprint` would select a hash _produced by hasher_ +// creating unresolvable fingerprint loop. 
+func (o *Object) fingerprint(ctx context.Context) string { + size := o.Object.Size() + timeStr := "-" + if o.f.fpTime { + timeStr = o.Object.ModTime(ctx).UTC().Format(timeFormat) + if timeStr == "" { + return "" + } + } + hashStr := "-" + if o.f.fpHash != hash.None { + var err error + hashStr, err = o.Object.Hash(ctx, o.f.fpHash) + if hashStr == "" || err != nil { + return "" + } + } + return fmt.Sprintf("%d,%s,%s", size, timeStr, hashStr) +}
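
For reference, a minimal rclone.conf entry for the new backend, based on the options
registered in hasher.go above (the remote name "Hasher1" and the wrapped path
"myRemote:path" are placeholders; hashes and max_age are shown at their defaults):

    [Hasher1]
    type = hasher
    remote = myRemote:path
    hashes = md5,sha1
    max_age = off

With such an entry, ordinary commands like "rclone hashsum md5 Hasher1:" serve
checksums from the cache where available, and the backend commands defined in
commands.go ("rclone backend drop|dump|fulldump|import|stickyimport Hasher1:")
manage the cache database.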