From 50df8cec9ca00996a7a79369894cfc99c9867e35 Mon Sep 17 00:00:00 2001 From: Ivan Andreev Date: Tue, 12 Oct 2021 14:57:54 +0300 Subject: [PATCH] lib/kv: add key-value database api #5587 Add bolt-based key-value database support. Quick API description: https://github.com/rclone/rclone/pull/5587#issuecomment-942174768 --- docs/content/flags.md | 1 + fs/config.go | 2 + fs/config/configflags/configflags.go | 1 + lib/kv/bolt.go | 312 +++++++++++++++++++++++++++ lib/kv/types.go | 35 +++ lib/kv/unsupported.go | 40 ++++ 6 files changed, 391 insertions(+) create mode 100644 lib/kv/bolt.go create mode 100644 lib/kv/types.go create mode 100644 lib/kv/unsupported.go diff --git a/docs/content/flags.md b/docs/content/flags.md index 980ab5d18..0ad6aa7c4 100644 --- a/docs/content/flags.md +++ b/docs/content/flags.md @@ -71,6 +71,7 @@ These flags are available for every command. --include stringArray Include files matching pattern --include-from stringArray Read include patterns from file (use - to read from stdin) -i, --interactive Enable interactive mode + --kv-lock-time duration Maximum time to keep key-value database locked by process (default 1s) --log-file string Log everything to this file --log-format string Comma separated list of log format options (default "date,time") --log-level string Log level DEBUG|INFO|NOTICE|ERROR (default "NOTICE") diff --git a/fs/config.go b/fs/config.go index 5a4e44c71..0d43e04f9 100644 --- a/fs/config.go +++ b/fs/config.go @@ -132,6 +132,7 @@ type ConfigInfo struct { FsCacheExpireInterval time.Duration DisableHTTP2 bool HumanReadable bool + KvLockTime time.Duration // maximum time to keep key-value database locked by process } // NewConfig creates a new config with everything set to the default @@ -171,6 +172,7 @@ func NewConfig() *ConfigInfo { c.TrackRenamesStrategy = "hash" c.FsCacheExpireDuration = 300 * time.Second c.FsCacheExpireInterval = 60 * time.Second + c.KvLockTime = 1 * time.Second // Perform a simple check for debug flags to enable debug logging during the flag initialization for argIndex, arg := range os.Args { diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index bb4afbaa6..f6ba1c41a 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -136,6 +136,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) { flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "interval to check for expired remotes") flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport.") flags.BoolVarP(flagSet, &ci.HumanReadable, "human-readable", "", ci.HumanReadable, "Print numbers in a human-readable format. Sizes with suffix Ki|Mi|Gi|Ti|Pi.") + flags.DurationVarP(flagSet, &ci.KvLockTime, "kv-lock-time", "", ci.KvLockTime, "Maximum time to keep key-value database locked by process") } // ParseHeaders converts the strings passed in via the header flags into HTTPOptions diff --git a/lib/kv/bolt.go b/lib/kv/bolt.go new file mode 100644 index 000000000..44a59da7e --- /dev/null +++ b/lib/kv/bolt.go @@ -0,0 +1,312 @@ +//go:build !plan9 && !js +// +build !plan9,!js + +package kv + +import ( + "context" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/lib/encoder" + "go.etcd.io/bbolt" +) + +const ( + initTime = 24 * time.Hour // something reasonably long + dbFileMode = 0600 + dbDirMode = 0700 + queueSize = 2 +) + +// DB represents a key-value database +type DB struct { + name string + path string + facility string + refs int + bolt *bbolt.DB + mu sync.Mutex + canWrite bool + queue chan *request + lockTime time.Duration + idleTime time.Duration + openTime time.Duration + idleTimer *time.Timer + lockTimer *time.Timer +} + +var ( + dbMap = map[string]*DB{} + dbMut = sync.Mutex{} +) + +// Supported returns true on supported OSes +func Supported() bool { return true } + +// makeName makes a store name +func makeName(facility string, f fs.Fs) string { + var name string + if f != nil { + name = f.Name() + if idx := strings.Index(name, "{"); idx != -1 { + name = name[:idx] + } + name = encoder.OS.FromStandardPath(name) + name += "~" + } + return name + facility + ".bolt" +} + +// Start a new key-value database +func Start(ctx context.Context, facility string, f fs.Fs) (*DB, error) { + if db := Get(facility, f); db != nil { + return db, nil + } + + dir := filepath.Join(config.GetCacheDir(), "kv") + if err := os.MkdirAll(dir, dbDirMode); err != nil { + return nil, err + } + + name := makeName(facility, f) + lockTime := fs.GetConfig(ctx).KvLockTime + + db := &DB{ + name: name, + path: filepath.Join(dir, name), + facility: facility, + refs: 1, + lockTime: lockTime, + idleTime: lockTime / 4, + openTime: lockTime * 2, + idleTimer: time.NewTimer(initTime), + lockTimer: time.NewTimer(initTime), + queue: make(chan *request, queueSize), + } + + fi, err := os.Stat(db.path) + if strings.HasSuffix(os.Args[0], ".test") || (err == nil && fi.Size() == 0) { + _ = os.Remove(db.path) + fs.Infof(db.name, "drop cache remaining after unit test") + } + + if err = db.open(ctx, false); err != nil && err != ErrEmpty { + return nil, errors.Wrapf(err, "cannot open db: %s", db.path) + } + + // Initialization above was performed without locks.. + dbMut.Lock() + defer dbMut.Unlock() + if dbOther := dbMap[name]; dbOther != nil { + // Races between concurrent Start's are rare but possible, the 1st one wins. + _ = db.close() + return dbOther, nil + } + go db.loop() // Start queue handling + return db, nil +} + +// Get returns database record for given filesystem and facility +func Get(facility string, f fs.Fs) *DB { + name := makeName(facility, f) + dbMut.Lock() + db := dbMap[name] + if db != nil { + db.mu.Lock() + db.refs++ + db.mu.Unlock() + } + dbMut.Unlock() + return db +} + +// free database record +func (db *DB) free() { + dbMut.Lock() + db.mu.Lock() + db.refs-- + if db.refs <= 0 { + delete(dbMap, db.name) + } + db.mu.Unlock() + dbMut.Unlock() +} + +// Path returns database path +func (db *DB) Path() string { return db.path } + +var modeNames = map[bool]string{false: "reading", true: "writing"} + +func (db *DB) open(ctx context.Context, forWrite bool) (err error) { + if db.bolt != nil && (db.canWrite || !forWrite) { + return nil + } + _ = db.close() + + db.canWrite = forWrite + if !forWrite { + // mitigate https://github.com/etcd-io/bbolt/issues/98 + _, err = os.Stat(db.path) + if os.IsNotExist(err) { + return ErrEmpty + } + } + + opt := &bbolt.Options{ + Timeout: db.openTime, + ReadOnly: !forWrite, + } + openMode := modeNames[forWrite] + startTime := time.Now() + var bolt *bbolt.DB + retry := 1 + maxRetries := fs.GetConfig(ctx).LowLevelRetries + for { + bolt, err = bbolt.Open(db.path, dbFileMode, opt) + if err == nil || retry >= maxRetries { + break + } + fs.Debugf(db.name, "Retry #%d opening for %s: %v", retry, openMode, err) + retry++ + } + if err != nil { + return err + } + + fs.Debugf(db.name, "Opened for %s in %v", openMode, time.Since(startTime)) + _ = db.lockTimer.Reset(db.lockTime) + _ = db.idleTimer.Reset(db.idleTime) + db.bolt = bolt + return nil +} + +func (db *DB) close() (err error) { + if db.bolt != nil { + _ = db.lockTimer.Stop() + _ = db.idleTimer.Stop() + err = db.bolt.Close() + db.bolt = nil + fs.Debugf(db.name, "released") + } + return +} + +// loop over database operations sequentially +func (db *DB) loop() { + ctx := context.Background() + for db.queue != nil { + select { + case req := <-db.queue: + req.handle(ctx, db) + _ = db.idleTimer.Reset(db.idleTime) + case <-db.idleTimer.C: + _ = db.close() + case <-db.lockTimer.C: + _ = db.close() + } + } + db.free() +} + +// Do a key-value operation and return error when done +func (db *DB) Do(write bool, op Op) error { + if db.queue == nil { + return ErrInactive + } + r := &request{ + op: op, + wr: write, + } + r.wg.Add(1) + db.queue <- r + r.wg.Wait() + return r.err +} + +// request encapsulates a synchronous operation and its results +type request struct { + op Op + wr bool + err error + wg sync.WaitGroup +} + +// handle a key-value request with given DB +func (r *request) handle(ctx context.Context, db *DB) { + db.mu.Lock() + if op, stop := r.op.(*opStop); stop { + r.err = db.close() + if op.remove { + if err := os.Remove(db.path); !os.IsNotExist(err) { + r.err = err + } + } + db.queue = nil + } else { + r.err = db.execute(ctx, r.op, r.wr) + } + db.mu.Unlock() + r.wg.Done() +} + +// execute a key-value DB operation +func (db *DB) execute(ctx context.Context, op Op, write bool) error { + if err := db.open(ctx, write); err != nil { + return err + } + if write { + return db.bolt.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(db.facility)) + if err != nil || b == nil { + return ErrEmpty + } + return op.Do(ctx, &bucketAdapter{b}) + }) + } + return db.bolt.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(db.facility)) + if b == nil { + return ErrEmpty + } + return op.Do(ctx, &bucketAdapter{b}) + }) +} + +// bucketAdapter is a thin wrapper adapting kv.Bucket to bbolt.Bucket +type bucketAdapter struct { + *bbolt.Bucket +} + +func (b *bucketAdapter) Cursor() Cursor { + return b.Bucket.Cursor() +} + +// Stop a database loop, optionally removing the file +func (db *DB) Stop(remove bool) error { + return db.Do(false, &opStop{remove: remove}) +} + +// opStop: close database and stop operation loop +type opStop struct { + remove bool +} + +func (*opStop) Do(context.Context, Bucket) error { + return nil +} + +// Exit stops all databases +func Exit() { + dbMut.Lock() + for _, s := range dbMap { + _ = s.Stop(false) + } + dbMut.Unlock() +} diff --git a/lib/kv/types.go b/lib/kv/types.go new file mode 100644 index 000000000..f21957976 --- /dev/null +++ b/lib/kv/types.go @@ -0,0 +1,35 @@ +package kv + +import ( + "context" + + "github.com/pkg/errors" +) + +// package errors +var ( + ErrEmpty = errors.New("database empty") + ErrInactive = errors.New("database stopped") + ErrUnsupported = errors.New("unsupported on this OS") +) + +// Op represents a database operation +type Op interface { + Do(context.Context, Bucket) error +} + +// Bucket decouples bbolt.Bucket from key-val operations +type Bucket interface { + Get([]byte) []byte + Put([]byte, []byte) error + Delete([]byte) error + ForEach(func(bkey, data []byte) error) error + Cursor() Cursor +} + +// Cursor decouples bbolt.Cursor from key-val operations +type Cursor interface { + First() ([]byte, []byte) + Next() ([]byte, []byte) + Seek([]byte) ([]byte, []byte) +} diff --git a/lib/kv/unsupported.go b/lib/kv/unsupported.go new file mode 100644 index 000000000..4bede3fd9 --- /dev/null +++ b/lib/kv/unsupported.go @@ -0,0 +1,40 @@ +//go:build plan9 || js +// +build plan9 js + +package kv + +import ( + "context" + + "github.com/rclone/rclone/fs" +) + +// DB represents a key-value database +type DB struct{} + +// Supported returns true on supported OSes +func Supported() bool { return false } + +// Start a key-value database +func Start(ctx context.Context, facility string, f fs.Fs) (*DB, error) { + return nil, ErrUnsupported +} + +// Get returns database for given filesystem and facility +func Get(f fs.Fs, facility string) *DB { return nil } + +// Path returns database path +func (*DB) Path() string { return "UNSUPPORTED" } + +// Do submits a key-value request and waits for results +func (*DB) Do(write bool, op Op) error { + return ErrUnsupported +} + +// Stop a database loop, optionally removing the file +func (*DB) Stop(remove bool) error { + return ErrUnsupported +} + +// Exit stops all databases +func Exit() {}