diff --git a/.gitignore b/.gitignore index 7aca43329..cde1c880c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ _junk/ rclone build docs/public +rclone.iml +.idea diff --git a/bin/make_manual.py b/bin/make_manual.py index a221e3423..5cb42f163 100755 --- a/bin/make_manual.py +++ b/bin/make_manual.py @@ -25,6 +25,7 @@ docs = [ "s3.md", "b2.md", "box.md", + "cache.md", "crypt.md", "dropbox.md", "ftp.md", diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 000000000..8f8c5357b --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,949 @@ +// +build !plan9 + +package cache + +import ( + "fmt" + "io" + "path" + "strconv" + "strings" + "sync" + "time" + + "os" + + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" + "golang.org/x/net/context" + "golang.org/x/time/rate" +) + +const ( + // DefCacheChunkSize is the default value for chunk size + DefCacheChunkSize = "5M" + // DefCacheInfoAge is the default value for object info age + DefCacheInfoAge = "6h" + // DefCacheChunkAge is the default value for chunk age duration + DefCacheChunkAge = "3h" + // DefCacheMetaAge is the default value for chunk age duration + DefCacheMetaAge = "3h" + // DefCacheReadRetries is the default value for read retries + DefCacheReadRetries = 3 + // DefCacheTotalWorkers is how many workers run in parallel to download chunks + DefCacheTotalWorkers = 4 + // DefCacheChunkNoMemory will enable or disable in-memory storage for chunks + DefCacheChunkNoMemory = false + // DefCacheRps limits the number of requests per second to the source FS + DefCacheRps = -1 + // DefWarmUpRatePerSeconds will apply a special config for warming up the cache + DefWarmUpRatePerSeconds = "3/20" + // DefCacheWrites will cache file data on writes through the cache + DefCacheWrites = false +) + +// Globals +var ( + // Flags + cacheDbPath = fs.StringP("cache-db-path", "", path.Join(path.Dir(fs.ConfigPath), "cache"), "Directory to cache DB") + cacheDbPurge = fs.BoolP("cache-db-purge", "", false, "Purge the cache DB before") + cacheChunkSize = fs.StringP("cache-chunk-size", "", DefCacheChunkSize, "The size of a chunk") + cacheInfoAge = fs.StringP("cache-info-age", "", DefCacheInfoAge, "How much time should object info be stored in cache") + cacheChunkAge = fs.StringP("cache-chunk-age", "", DefCacheChunkAge, "How much time should a chunk be in cache before cleanup") + cacheMetaAge = fs.StringP("cache-warm-up-age", "", DefCacheMetaAge, "How much time should data be cached during warm up") + cacheReadRetries = fs.IntP("cache-read-retries", "", DefCacheReadRetries, "How many times to retry a read from a cache storage") + cacheTotalWorkers = fs.IntP("cache-workers", "", DefCacheTotalWorkers, "How many workers should run in parallel to download chunks") + cacheChunkNoMemory = fs.BoolP("cache-chunk-no-memory", "", DefCacheChunkNoMemory, "Disable the in-memory cache for storing chunks during streaming") + cacheRps = fs.IntP("cache-rps", "", int(DefCacheRps), "Limits the number of requests per second to the source FS. -1 disables the rate limiter") + cacheWarmUp = fs.StringP("cache-warm-up-rps", "", DefWarmUpRatePerSeconds, "Format is X/Y = how many X opens per Y seconds should trigger the warm up mode. 
See the docs") + cacheStoreWrites = fs.BoolP("cache-writes", "", DefCacheWrites, "Will cache file data on writes through the FS") +) + +// Register with Fs +func init() { + fs.Register(&fs.RegInfo{ + Name: "cache", + Description: "Cache a remote", + NewFs: NewFs, + Options: []fs.Option{{ + Name: "remote", + Help: "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).", + }, { + Name: "chunk_size", + Help: "The size of a chunk. Lower value good for slow connections but can affect seamless reading. \nDefault: " + DefCacheChunkSize, + Examples: []fs.OptionExample{ + { + Value: "1m", + Help: "1MB", + }, { + Value: "5M", + Help: "5 MB", + }, { + Value: "10M", + Help: "10 MB", + }, + }, + Optional: true, + }, { + Name: "info_age", + Help: "How much time should object info (file size, file hashes etc) be stored in cache. Use a very high value if you don't plan on changing the source FS from outside the cache. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheInfoAge, + Examples: []fs.OptionExample{ + { + Value: "1h", + Help: "1 hour", + }, { + Value: "24h", + Help: "24 hours", + }, { + Value: "48h", + Help: "24 hours", + }, + }, + Optional: true, + }, { + Name: "chunk_age", + Help: "How much time should a chunk (file data) be stored in cache. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheChunkAge, + Examples: []fs.OptionExample{ + { + Value: "30s", + Help: "30 seconds", + }, { + Value: "1m", + Help: "1 minute", + }, { + Value: "1h30m", + Help: "1 hour and 30 minutes", + }, + }, + Optional: true, + }, { + Name: "warmup_age", + Help: "How much time should data be cached during warm up. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheMetaAge, + Examples: []fs.OptionExample{ + { + Value: "3h", + Help: "3 hours", + }, { + Value: "6h", + Help: "6 hours", + }, { + Value: "24h", + Help: "24 hours", + }, + }, + Optional: true, + }}, + }) +} + +// ChunkStorage is a storage type that supports only chunk operations (i.e in RAM) +type ChunkStorage interface { + // will check if the chunk is in storage. should be fast and not read the chunk itself if possible + HasChunk(cachedObject *Object, offset int64) bool + + // returns the chunk in storage. 
return an error if it's not + GetChunk(cachedObject *Object, offset int64) ([]byte, error) + + // add a new chunk + AddChunk(cachedObject *Object, data []byte, offset int64) error + + // AddChunkAhead adds a new chunk before caching an Object for it + AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error + + // if the storage can cleanup on a cron basis + // otherwise it can do a noop operation + CleanChunksByAge(chunkAge time.Duration) + + // if the storage can cleanup chunks after we no longer need them + // otherwise it can do a noop operation + CleanChunksByNeed(offset int64) +} + +// Storage is a storage type (Bolt) which needs to support both chunk and file based operations +type Storage interface { + ChunkStorage + + // will update/create a directory or an error if it's not found + AddDir(cachedDir *Directory) error + + // will return a directory with all the entries in it or an error if it's not found + GetDirEntries(cachedDir *Directory) (fs.DirEntries, error) + + // remove a directory and all the objects and chunks in it + RemoveDir(fp string) error + + // remove a directory and all the objects and chunks in it + ExpireDir(fp string) error + + // will return an object (file) or error if it doesn't find it + GetObject(cachedObject *Object) (err error) + + // add a new object to its parent directory + // the directory structure (all the parents of this object) is created if its not found + AddObject(cachedObject *Object) error + + // remove an object and all its chunks + RemoveObject(fp string) error + + // Stats returns stats about the cache storage + Stats() (map[string]map[string]interface{}, error) + + // if the storage can cleanup on a cron basis + // otherwise it can do a noop operation + CleanEntriesByAge(entryAge time.Duration) + + // Purge will flush the entire cache + Purge() +} + +// Fs represents a wrapped fs.Fs +type Fs struct { + fs.Fs + + name string + root string + features *fs.Features // optional features + cache Storage + + fileAge time.Duration + chunkSize int64 + chunkAge time.Duration + metaAge time.Duration + readRetries int + totalWorkers int + chunkMemory bool + warmUp bool + warmUpRate int + warmUpSec int + cacheWrites bool + originalTotalWorkers int + originalChunkMemory bool + + lastChunkCleanup time.Time + lastRootCleanup time.Time + lastOpenedEntries map[string]time.Time + cleanupMu sync.Mutex + warmupMu sync.Mutex + rateLimiter *rate.Limiter +} + +// NewFs contstructs an Fs from the path, container:path +func NewFs(name, rpath string) (fs.Fs, error) { + remote := fs.ConfigFileGet(name, "remote") + if strings.HasPrefix(remote, name+":") { + return nil, errors.New("can't point cache remote at itself - check the value of the remote setting") + } + + // Look for a file first + remotePath := path.Join(remote, rpath) + wrappedFs, wrapErr := fs.NewFs(remotePath) + + if wrapErr != fs.ErrorIsFile && wrapErr != nil { + return nil, errors.Wrapf(wrapErr, "failed to make remote %q to wrap", remotePath) + } + fs.Debugf(name, "wrapped %v:%v at root %v", wrappedFs.Name(), wrappedFs.Root(), rpath) + + var chunkSize fs.SizeSuffix + chunkSizeString := fs.ConfigFileGet(name, "chunk_size", DefCacheChunkSize) + if *cacheChunkSize != DefCacheChunkSize { + chunkSizeString = *cacheChunkSize + } + err := chunkSize.Set(chunkSizeString) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand chunk size", chunkSizeString) + } + infoAge := fs.ConfigFileGet(name, "info_age", DefCacheInfoAge) + if *cacheInfoAge != DefCacheInfoAge { + infoAge = 
*cacheInfoAge + } + infoDuration, err := time.ParseDuration(infoAge) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand duration", infoAge) + } + chunkAge := fs.ConfigFileGet(name, "chunk_age", DefCacheChunkAge) + if *cacheChunkAge != DefCacheChunkAge { + chunkAge = *cacheChunkAge + } + chunkDuration, err := time.ParseDuration(chunkAge) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand duration", chunkAge) + } + metaAge := fs.ConfigFileGet(name, "warmup_age", DefCacheChunkAge) + if *cacheMetaAge != DefCacheMetaAge { + metaAge = *cacheMetaAge + } + metaDuration, err := time.ParseDuration(metaAge) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand duration", metaAge) + } + warmupRps := strings.Split(*cacheWarmUp, "/") + warmupRate, err := strconv.Atoi(warmupRps[0]) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand warm up rate", *cacheWarmUp) + } + warmupSec, err := strconv.Atoi(warmupRps[1]) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand warm up seconds", *cacheWarmUp) + } + // configure cache backend + if *cacheDbPurge { + fs.Debugf(name, "Purging the DB") + } + f := &Fs{ + Fs: wrappedFs, + name: name, + root: rpath, + fileAge: infoDuration, + chunkSize: int64(chunkSize), + chunkAge: chunkDuration, + metaAge: metaDuration, + readRetries: *cacheReadRetries, + totalWorkers: *cacheTotalWorkers, + originalTotalWorkers: *cacheTotalWorkers, + chunkMemory: !*cacheChunkNoMemory, + originalChunkMemory: !*cacheChunkNoMemory, + warmUp: false, + warmUpRate: warmupRate, + warmUpSec: warmupSec, + cacheWrites: *cacheStoreWrites, + lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30), + lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30), + lastOpenedEntries: make(map[string]time.Time), + } + f.rateLimiter = rate.NewLimiter(rate.Limit(float64(*cacheRps)), f.totalWorkers) + + dbPath := *cacheDbPath + if path.Ext(dbPath) != "" { + dbPath = path.Dir(dbPath) + } + err = os.MkdirAll(dbPath, os.ModePerm) + if err != nil { + return nil, errors.Wrapf(err, "failed to create cache directory %v", dbPath) + } + + dbPath = path.Join(dbPath, name+".db") + fs.Infof(name, "Storage DB path: %v", dbPath) + f.cache = GetPersistent(dbPath, *cacheDbPurge) + if err != nil { + return nil, err + } + + fs.Infof(name, "Chunk Memory: %v", f.chunkMemory) + fs.Infof(name, "Chunk Size: %v", fs.SizeSuffix(f.chunkSize)) + fs.Infof(name, "Workers: %v", f.totalWorkers) + fs.Infof(name, "File Age: %v", f.fileAge.String()) + fs.Infof(name, "Chunk Age: %v", f.chunkAge.String()) + fs.Infof(name, "Cache Writes: %v", f.cacheWrites) + + go f.CleanUpCache(false) + + // TODO: Explore something here but now it's not something we want + // when writing from cache, source FS will send a notification and clear it out immediately + //setup dir notification + //doDirChangeNotify := wrappedFs.Features().DirChangeNotify + //if doDirChangeNotify != nil { + // doDirChangeNotify(func(dir string) { + // d := NewAbsDirectory(f, dir) + // d.Flush() + // fs.Infof(dir, "updated from notification") + // }, time.Second * 10) + //} + + f.features = (&fs.Features{ + CanHaveEmptyDirectories: true, + DuplicateFiles: false, // storage doesn't permit this + Purge: f.Purge, + Copy: f.Copy, + Move: f.Move, + DirMove: f.DirMove, + DirChangeNotify: nil, + DirCacheFlush: f.DirCacheFlush, + PutUnchecked: f.PutUnchecked, + CleanUp: f.CleanUp, + UnWrap: f.UnWrap, + }).Fill(f).Mask(wrappedFs) + + return f, wrapErr +} + +// Name of the remote (as passed into 
NewFs) +func (f *Fs) Name() string { + return f.name +} + +// Root of the remote (as passed into NewFs) +func (f *Fs) Root() string { + return f.root +} + +// Features returns the optional features of this Fs +func (f *Fs) Features() *fs.Features { + return f.features +} + +// String returns a description of the FS +func (f *Fs) String() string { + return fmt.Sprintf("%s:%s", f.name, f.root) +} + +// ChunkSize returns the configured chunk size +func (f *Fs) ChunkSize() int64 { + return f.chunkSize +} + +// originalSettingWorkers will return the original value of this config +func (f *Fs) originalSettingWorkers() int { + return f.originalTotalWorkers +} + +// originalSettingChunkNoMemory will return the original value of this config +func (f *Fs) originalSettingChunkNoMemory() bool { + return f.originalChunkMemory +} + +// InWarmUp says if cache warm up is active +func (f *Fs) InWarmUp() bool { + return f.warmUp +} + +// enableWarmUp will enable the warm up state of this cache along with the relevant settings +func (f *Fs) enableWarmUp() { + f.totalWorkers = 1 + f.chunkMemory = false + f.warmUp = true +} + +// disableWarmUp will disable the warm up state of this cache along with the relevant settings +func (f *Fs) disableWarmUp() { + f.totalWorkers = f.originalSettingWorkers() + f.chunkMemory = !f.originalSettingChunkNoMemory() + f.warmUp = false +} + +// NewObject finds the Object at remote. +func (f *Fs) NewObject(remote string) (fs.Object, error) { + co := NewObject(f, remote) + err := f.cache.GetObject(co) + if err == nil { + return co, nil + } + obj, err := f.Fs.NewObject(remote) + if err != nil { + return nil, err + } + co = ObjectFromOriginal(f, obj) + co.persist() + return co, nil +} + +// List the objects and directories in dir into entries +func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { + // clean cache + go f.CleanUpCache(false) + + cd := NewDirectory(f, dir) + entries, err = f.cache.GetDirEntries(cd) + if err != nil { + fs.Debugf(dir, "no dir entries in cache: %v", err) + } else if len(entries) == 0 { + // TODO: read empty dirs from source? + } else { + return entries, nil + } + + entries, err = f.Fs.List(dir) + if err != nil { + return nil, err + } + + var cachedEntries fs.DirEntries + for _, entry := range entries { + switch o := entry.(type) { + case fs.Object: + co := ObjectFromOriginal(f, o) + co.persist() + cachedEntries = append(cachedEntries, co) + case fs.Directory: + cd := DirectoryFromOriginal(f, o) + err = f.cache.AddDir(cd) + cachedEntries = append(cachedEntries, cd) + default: + err = errors.Errorf("Unknown object type %T", entry) + } + } + if err != nil { + fs.Errorf(dir, "err caching listing: %v", err) + } + + return cachedEntries, nil +} + +func (f *Fs) recurse(dir string, list *fs.ListRHelper) error { + entries, err := f.List(dir) + if err != nil { + return err + } + + for i := 0; i < len(entries); i++ { + innerDir, ok := entries[i].(fs.Directory) + if ok { + err := f.recurse(innerDir.Remote(), list) + if err != nil { + return err + } + } + + err := list.Add(entries[i]) + if err != nil { + return err + } + } + + return nil +} + +// ListR lists the objects and directories of the Fs starting +// from dir recursively into out. 
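+// If the wrapped remote supports ListR, entries are cached as they are
+// streamed through the callback; otherwise this falls back to a plain
+// recursive List and caches every object and directory it visits.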
+func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { + fs.Debugf(f, "list recursively from '%s'", dir) + + // we check if the source FS supports ListR + // if it does, we'll use that to get all the entries, cache them and return + do := f.Fs.Features().ListR + if do != nil { + return do(dir, func(entries fs.DirEntries) error { + // we got called back with a set of entries so let's cache them and call the original callback + for _, entry := range entries { + switch o := entry.(type) { + case fs.Object: + _ = f.cache.AddObject(ObjectFromOriginal(f, o)) + case fs.Directory: + _ = f.cache.AddDir(DirectoryFromOriginal(f, o)) + default: + return errors.Errorf("Unknown object type %T", entry) + } + } + + // call the original callback + return callback(entries) + }) + } + + // if we're here, we're gonna do a standard recursive traversal and cache everything + list := fs.NewListRHelper(callback) + err = f.recurse(dir, list) + if err != nil { + return err + } + + return list.Flush() +} + +// Mkdir makes the directory (container, bucket) +func (f *Fs) Mkdir(dir string) error { + err := f.Fs.Mkdir(dir) + if err != nil { + return err + } + if dir == "" && f.Root() == "" { // creating the root is possible but we don't need that cached as we have it already + fs.Debugf(dir, "skipping empty dir in cache") + return nil + } + fs.Infof(f, "create dir '%s'", dir) + + // make an empty dir + _ = f.cache.AddDir(NewDirectory(f, dir)) + + // clean cache + go f.CleanUpCache(false) + return nil +} + +// Rmdir removes the directory (container, bucket) if empty +func (f *Fs) Rmdir(dir string) error { + err := f.Fs.Rmdir(dir) + if err != nil { + return err + } + + _ = f.cache.RemoveDir(NewDirectory(f, dir).abs()) + + // clean cache + go f.CleanUpCache(false) + return nil +} + +// DirMove moves src, srcRemote to this remote at dstRemote +// using server side move operations. 
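+// After a successful move the cached parent directories of both the source
+// and the destination are expired and the cached source directory is removed,
+// so stale listings are not served from the cache.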
+func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { + do := f.Fs.Features().DirMove + if do == nil { + return fs.ErrorCantDirMove + } + srcFs, ok := src.(*Fs) + if !ok { + fs.Errorf(srcFs, "can't move directory - not same remote type") + return fs.ErrorCantDirMove + } + if srcFs.Fs.Name() != f.Fs.Name() { + fs.Errorf(srcFs, "can't move directory - not wrapping same remotes") + return fs.ErrorCantDirMove + } + fs.Infof(f, "move dir '%s'/'%s' -> '%s'", srcRemote, srcFs.Root(), dstRemote) + + err := do(src.Features().UnWrap(), srcRemote, dstRemote) + if err != nil { + return err + } + + srcDir := NewDirectory(srcFs, srcRemote) + // clear any likely dir cached + _ = f.cache.ExpireDir(srcDir.parentRemote()) + _ = f.cache.ExpireDir(NewDirectory(srcFs, dstRemote).parentRemote()) + // delete src dir + _ = f.cache.RemoveDir(srcDir.abs()) + + // clean cache + go f.CleanUpCache(false) + return nil +} + +// cacheReader will split the stream of a reader to be cached at the same time it is read by the original source +func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader)) { + // create the pipe and tee reader + pr, pw := io.Pipe() + tr := io.TeeReader(u, pw) + + // create channel to synchronize + done := make(chan bool) + defer close(done) + + go func() { + // notify the cache reader that we're complete after the source FS finishes + defer func() { + _ = pw.Close() + }() + // process original reading + originalRead(tr) + // signal complete + done <- true + }() + + go func() { + var offset int64 + for { + chunk := make([]byte, f.chunkSize) + readSize, err := io.ReadFull(pr, chunk) + // we ignore 3 failures which are ok: + // 1. EOF - original reading finished and we got a full buffer too + // 2. ErrUnexpectedEOF - original reading finished and partial buffer + // 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too + // if we have a different error: we're going to error out the original reading too and stop this + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe { + fs.Errorf(src, "error saving new data in cache. offset: %v, err: %v", offset, err) + _ = pr.CloseWithError(err) + break + } + // if we have some bytes we cache them + if readSize > 0 { + chunk = chunk[:readSize] + err2 := f.cache.AddChunkAhead(cleanPath(path.Join(f.root, src.Remote())), chunk, offset, f.metaAge) + if err2 != nil { + fs.Errorf(src, "error saving new data in cache '%v'", err2) + _ = pr.CloseWithError(err2) + break + } + offset += int64(readSize) + } + // stuff should be closed but let's be sure + if err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe { + _ = pr.Close() + break + } + } + + // signal complete + done <- true + }() + + // wait until both are done + for c := 0; c < 2; c++ { + <-done + } +} + +// Put in to the remote path with the modTime given of the given size +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + fs.Debugf(f, "put data at '%s'", src.Remote()) + + var err error + var obj fs.Object + if f.cacheWrites { + f.cacheReader(in, src, func(inn io.Reader) { + obj, err = f.Fs.Put(inn, src, options...) + }) + } else { + obj, err = f.Fs.Put(in, src, options...) 
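+		// write caching is disabled: the data went straight to the wrapped
+		// remote and only the resulting object metadata is persisted below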
+ } + + if err != nil { + fs.Errorf(src, "error saving in cache: %v", err) + return nil, err + } + cachedObj := ObjectFromOriginal(f, obj).persist() + + // clean cache + go f.CleanUpCache(false) + return cachedObj, nil +} + +// PutUnchecked uploads the object +func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + do := f.Fs.Features().PutUnchecked + if do == nil { + return nil, errors.New("can't PutUnchecked") + } + fs.Infof(f, "put data unchecked in '%s'", src.Remote()) + + var err error + var obj fs.Object + if f.cacheWrites { + f.cacheReader(in, src, func(inn io.Reader) { + obj, err = f.Fs.Put(inn, src, options...) + }) + } else { + obj, err = f.Fs.Put(in, src, options...) + } + + if err != nil { + fs.Errorf(src, "error saving in cache: %v", err) + return nil, err + } + cachedObj := ObjectFromOriginal(f, obj).persist() + + // clean cache + go f.CleanUpCache(false) + return cachedObj, nil +} + +// Copy src to this remote using server side copy operations. +func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Copy + if do == nil { + return nil, fs.ErrorCantCopy + } + + srcObj, ok := src.(*Object) + if !ok { + fs.Errorf(srcObj, "can't copy - not same remote type") + return nil, fs.ErrorCantCopy + } + if srcObj.CacheFs.Fs.Name() != f.Fs.Name() { + fs.Errorf(srcObj, "can't copy - not wrapping same remote types") + return nil, fs.ErrorCantCopy + } + + fs.Infof(f, "copy obj '%s' -> '%s'", srcObj.abs(), remote) + + // store in cache + if err := srcObj.refreshFromSource(); err != nil { + fs.Errorf(f, "can't move %v - %v", src, err) + return nil, fs.ErrorCantCopy + } + obj, err := do(srcObj.Object, remote) + if err != nil { + fs.Errorf(srcObj, "error moving in cache: %v", err) + return nil, err + } + + // persist new + cachedObj := ObjectFromOriginal(f, obj).persist() + _ = f.cache.ExpireDir(cachedObj.parentRemote()) + + // clean cache + go f.CleanUpCache(false) + return cachedObj, nil +} + +// Move src to this remote using server side move operations. +func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Move + if do == nil { + return nil, fs.ErrorCantMove + } + + srcObj, ok := src.(*Object) + if !ok { + fs.Errorf(srcObj, "can't move - not same remote type") + return nil, fs.ErrorCantMove + } + + if srcObj.CacheFs.Fs.Name() != f.Fs.Name() { + fs.Errorf(srcObj, "can't move - not wrapping same remote types") + return nil, fs.ErrorCantMove + } + + fs.Infof(f, "moving obj '%s' -> %s", srcObj.abs(), remote) + + // save in cache + if err := srcObj.refreshFromSource(); err != nil { + fs.Errorf(f, "can't move %v - %v", src, err) + return nil, fs.ErrorCantMove + } + obj, err := do(srcObj.Object, remote) + if err != nil { + fs.Errorf(srcObj, "error moving in cache: %v", err) + return nil, err + } + + // remove old + _ = f.cache.ExpireDir(srcObj.parentRemote()) + _ = f.cache.RemoveObject(srcObj.abs()) + + // persist new + cachedObj := ObjectFromOriginal(f, obj) + cachedObj.persist() + _ = f.cache.ExpireDir(cachedObj.parentRemote()) + + // clean cache + go f.CleanUpCache(false) + return cachedObj, nil +} + +// Hashes returns the supported hash sets. 
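+// They are passed through unchanged from the wrapped remote.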
+func (f *Fs) Hashes() fs.HashSet { + return f.Fs.Hashes() +} + +// Purge all files in the root and the root directory +func (f *Fs) Purge() error { + fs.Infof(f, "purging cache") + f.cache.Purge() + + f.warmupMu.Lock() + defer f.warmupMu.Unlock() + f.lastOpenedEntries = make(map[string]time.Time) + + do := f.Fs.Features().Purge + if do == nil { + return nil + } + + err := do() + if err != nil { + return err + } + + return nil +} + +// CleanUp the trash in the Fs +func (f *Fs) CleanUp() error { + f.CleanUpCache(false) + + do := f.Fs.Features().CleanUp + if do == nil { + return nil + } + + return do() +} + +// Stats returns stats about the cache storage +func (f *Fs) Stats() (map[string]map[string]interface{}, error) { + return f.cache.Stats() +} + +// OpenRateLimited will execute a closure under a rate limiter watch +func (f *Fs) OpenRateLimited(fn func() (io.ReadCloser, error)) (io.ReadCloser, error) { + var err error + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + start := time.Now() + + if err = f.rateLimiter.Wait(ctx); err != nil { + return nil, err + } + + elapsed := time.Since(start) + if elapsed > time.Second*2 { + fs.Debugf(f, "rate limited: %s", elapsed) + } + return fn() +} + +// CheckIfWarmupNeeded changes the FS settings during warmups +func (f *Fs) CheckIfWarmupNeeded(remote string) { + f.warmupMu.Lock() + defer f.warmupMu.Unlock() + + secondCount := time.Duration(f.warmUpSec) + rate := f.warmUpRate + + // clean up entries older than the needed time frame needed + for k, v := range f.lastOpenedEntries { + if time.Now().After(v.Add(time.Second * secondCount)) { + delete(f.lastOpenedEntries, k) + } + } + f.lastOpenedEntries[remote] = time.Now() + + // simple check for the current load + if len(f.lastOpenedEntries) >= rate && !f.warmUp { + fs.Infof(f, "turning on cache warmup") + f.enableWarmUp() + } else if len(f.lastOpenedEntries) < rate && f.warmUp { + fs.Infof(f, "turning off cache warmup") + f.disableWarmUp() + } +} + +// CleanUpCache will cleanup only the cache data that is expired +func (f *Fs) CleanUpCache(ignoreLastTs bool) { + f.cleanupMu.Lock() + defer f.cleanupMu.Unlock() + + if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(f.chunkAge/4)) { + fs.Infof("cache", "running chunks cleanup") + f.cache.CleanChunksByAge(f.chunkAge) + f.lastChunkCleanup = time.Now() + } + + if ignoreLastTs || time.Now().After(f.lastRootCleanup.Add(f.fileAge/4)) { + fs.Infof("cache", "running root cleanup") + f.cache.CleanEntriesByAge(f.fileAge) + f.lastRootCleanup = time.Now() + } +} + +// UnWrap returns the Fs that this Fs is wrapping +func (f *Fs) UnWrap() fs.Fs { + return f.Fs +} + +// DirCacheFlush flushes the dir cache +func (f *Fs) DirCacheFlush() { + _ = f.cache.RemoveDir("") +} + +func cleanPath(p string) string { + p = path.Clean(p) + if p == "." 
|| p == "/" { + p = "" + } + + return p +} + +// Check the interfaces are satisfied +var ( + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.CleanUpper = (*Fs)(nil) + _ fs.UnWrapper = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) +) diff --git a/cache/cache_internal_test.go b/cache/cache_internal_test.go new file mode 100644 index 000000000..02eed7f7e --- /dev/null +++ b/cache/cache_internal_test.go @@ -0,0 +1,653 @@ +// +build !plan9 + +package cache_test + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "path" + "strconv" + "sync" + "testing" + "time" + + "github.com/ncw/rclone/cache" + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest" + "github.com/ncw/rclone/local" + flag "github.com/spf13/pflag" + "github.com/stretchr/testify/require" +) + +var ( + WrapRemote = flag.String("wrap-remote", "", "Remote to wrap") + RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote") + SkipTimeouts = flag.Bool("skip-waits", false, "To skip tests that have wait times") + rootFs fs.Fs + boltDb *cache.Persistent + metaAge = time.Second * 30 + infoAge = time.Second * 10 + chunkAge = time.Second * 10 + okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this + workers = 2 + warmupRate = 3 + warmupSec = 10 +) + +// prepare the test server and return a function to tidy it up afterwards +func TestInternalInit(t *testing.T) { + var err error + + // delete the default path + dbPath := path.Join(path.Dir(fs.ConfigPath), "cache", *RemoteName+".db") + boltDb = cache.GetPersistent(dbPath, true) + fstest.Initialise() + + if len(*WrapRemote) == 0 { + *WrapRemote = "localInternal:/var/tmp/rclone-cache" + fs.ConfigFileSet("localInternal", "type", "local") + fs.ConfigFileSet("localInternal", "nounc", "true") + } + + remoteExists := false + for _, s := range fs.ConfigFileSections() { + if s == *RemoteName { + remoteExists = true + } + } + + if !remoteExists { + fs.ConfigFileSet(*RemoteName, "type", "cache") + fs.ConfigFileSet(*RemoteName, "remote", *WrapRemote) + fs.ConfigFileSet(*RemoteName, "chunk_size", "1024") + fs.ConfigFileSet(*RemoteName, "chunk_age", chunkAge.String()) + fs.ConfigFileSet(*RemoteName, "info_age", infoAge.String()) + } + + _ = flag.Set("cache-warm-up-age", metaAge.String()) + _ = flag.Set("cache-warm-up-rps", fmt.Sprintf("%v/%v", warmupRate, warmupSec)) + _ = flag.Set("cache-chunk-no-memory", "true") + _ = flag.Set("cache-workers", strconv.Itoa(workers)) + + // Instantiate root + rootFs, err = fs.NewFs(*RemoteName + ":") + _ = rootFs.Features().Purge() + require.NoError(t, err) + err = rootFs.Mkdir("") + require.NoError(t, err) + + // flush cache + _, err = getCacheFs(rootFs) + require.NoError(t, err) +} + +func TestInternalListRootAndInnerRemotes(t *testing.T) { + // Instantiate inner fs + innerFolder := "inner" + err := rootFs.Mkdir(innerFolder) + require.NoError(t, err) + innerFs, err := fs.NewFs(*RemoteName + ":" + innerFolder) + require.NoError(t, err) + + obj := writeObjectString(t, innerFs, "one", "content") + + listRoot, err := rootFs.List("") + require.NoError(t, err) + listRootInner, err := rootFs.List(innerFolder) + require.NoError(t, err) + listInner, err := innerFs.List("") + require.NoError(t, err) + + require.Lenf(t, listRoot, 1, "remote %v should have 1 entry", rootFs.Root()) + require.Lenf(t, listRootInner, 1, "remote %v should have 1 entry in %v", 
rootFs.Root(), innerFolder) + require.Lenf(t, listInner, 1, "remote %v should have 1 entry", innerFs.Root()) + + err = obj.Remove() + require.NoError(t, err) + + err = innerFs.Features().Purge() + require.NoError(t, err) + innerFs = nil +} + +func TestInternalObjWrapFsFound(t *testing.T) { + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + wrappedFs := cfs.UnWrap() + data := "content" + writeObjectString(t, wrappedFs, "second", data) + + listRoot, err := rootFs.List("") + require.NoError(t, err) + require.Lenf(t, listRoot, 1, "remote %v should have 1 entry", rootFs.Root()) + + co, err := rootFs.NewObject("second") + require.NoError(t, err) + r, err := co.Open() + require.NoError(t, err) + cachedData, err := ioutil.ReadAll(r) + require.NoError(t, err) + err = r.Close() + require.NoError(t, err) + + strCached := string(cachedData) + require.Equal(t, data, strCached) + + err = co.Remove() + require.NoError(t, err) + + listRoot, err = wrappedFs.List("") + require.NoError(t, err) + require.Lenf(t, listRoot, 0, "remote %v should have 0 entries: %v", wrappedFs.Root(), listRoot) +} + +func TestInternalObjNotFound(t *testing.T) { + reset(t) + obj, err := rootFs.NewObject("404") + require.Error(t, err) + require.Nil(t, obj) +} + +func TestInternalCachedWrittenContentMatches(t *testing.T) { + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + // create some rand test data + testData := make([]byte, (chunkSize*4 + chunkSize/2)) + testSize, err := rand.Read(testData) + require.Equal(t, len(testData), testSize, "data size doesn't match") + require.NoError(t, err) + + // write the object + o := writeObjectBytes(t, rootFs, "data.bin", testData) + require.Equal(t, o.Size(), int64(testSize)) + + // check sample of data from in-file + sampleStart := chunkSize / 2 + sampleEnd := chunkSize + testSample := testData[sampleStart:sampleEnd] + checkSample := readDataFromObj(t, o, sampleStart, sampleEnd, false) + require.Equal(t, int64(len(checkSample)), sampleEnd-sampleStart) + require.Equal(t, checkSample, testSample) +} + +func TestInternalCachedUpdatedContentMatches(t *testing.T) { + reset(t) + + // create some rand test data + testData1 := []byte(fstest.RandomString(100)) + testData2 := []byte(fstest.RandomString(200)) + + // write the object + o := updateObjectBytes(t, rootFs, "data.bin", testData1, testData2) + require.Equal(t, o.Size(), int64(len(testData2))) + + // check data from in-file + reader, err := o.Open() + require.NoError(t, err) + checkSample, err := ioutil.ReadAll(reader) + _ = reader.Close() + require.NoError(t, err) + require.Equal(t, checkSample, testData2) +} + +func TestInternalWrappedWrittenContentMatches(t *testing.T) { + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + reset(t) + + // create some rand test data + testData := make([]byte, (chunkSize*4 + chunkSize/2)) + testSize, err := rand.Read(testData) + require.Equal(t, len(testData), testSize) + require.NoError(t, err) + + // write the object + o := writeObjectBytes(t, cfs.UnWrap(), "data.bin", testData) + require.Equal(t, o.Size(), int64(testSize)) + + o2, err := rootFs.NewObject("data.bin") + require.NoError(t, err) + require.Equal(t, o2.Size(), o.Size()) + + // check sample of data from in-file + sampleStart := chunkSize / 2 + sampleEnd := chunkSize + testSample := testData[sampleStart:sampleEnd] + checkSample := readDataFromObj(t, o2, sampleStart, sampleEnd, false) + require.Equal(t, len(checkSample), 
len(testSample)) + + for i := 0; i < len(checkSample); i++ { + require.Equal(t, testSample[i], checkSample[i]) + } +} + +func TestInternalLargeWrittenContentMatches(t *testing.T) { + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + reset(t) + + // create some rand test data + testData := make([]byte, (chunkSize*10 + chunkSize/2)) + testSize, err := rand.Read(testData) + require.Equal(t, len(testData), testSize) + require.NoError(t, err) + + // write the object + o := writeObjectBytes(t, cfs.UnWrap(), "data.bin", testData) + require.Equal(t, o.Size(), int64(testSize)) + + o2, err := rootFs.NewObject("data.bin") + require.NoError(t, err) + require.Equal(t, o2.Size(), o.Size()) + + // check data from in-file + checkSample := readDataFromObj(t, o2, int64(0), int64(testSize), false) + require.Equal(t, len(checkSample), len(testData)) + + for i := 0; i < len(checkSample); i++ { + require.Equal(t, testData[i], checkSample[i], "byte: %d (%d), chunk: %d", int64(i)%chunkSize, i, int64(i)/chunkSize) + } +} + +func TestInternalWrappedFsChangeNotSeen(t *testing.T) { + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + // create some rand test data + co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) + + // update in the wrapped fs + o, err := cfs.UnWrap().NewObject(co.Remote()) + require.NoError(t, err) + err = o.SetModTime(co.ModTime().Truncate(time.Hour)) + require.NoError(t, err) + + // get a new instance from the cache + co2, err := rootFs.NewObject(o.Remote()) + require.NoError(t, err) + + require.NotEqual(t, o.ModTime(), co.ModTime()) + require.NotEqual(t, o.ModTime(), co2.ModTime()) + require.Equal(t, co.ModTime(), co2.ModTime()) +} + +func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + + cfs.DirCacheFlush() // flush the cache + + l, err := cfs.UnWrap().List("") + require.NoError(t, err) + require.Len(t, l, 1) + o := l[0] + + // get a new instance from the cache + co, err := rootFs.NewObject(o.Remote()) + require.NoError(t, err) + require.Equal(t, o.ModTime(), co.ModTime()) +} + +func TestInternalWarmUp(t *testing.T) { + if *SkipTimeouts { + t.Skip("--skip-waits set") + } + + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3)) + o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4)) + o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * 6)) + + _ = readDataFromObj(t, o1, 0, chunkSize, false) + _ = readDataFromObj(t, o2, 0, chunkSize, false) + + // validate a fresh chunk + expectedExpiry := time.Now().Add(chunkAge) + ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), 0) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) + + // validate that we entered a warm up state + _ = readDataFromObj(t, o3, 0, chunkSize, false) + require.True(t, cfs.InWarmUp()) + expectedExpiry = time.Now().Add(metaAge) + ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) + + // validate that we cooled down and exit warm up + // we wait for the cache to expire + t.Logf("Waiting 10 seconds for warm up to expire\n") + time.Sleep(time.Second * 10) + + _ = readDataFromObj(t, o3, chunkSize, chunkSize*2, false) + require.False(t, cfs.InWarmUp()) + expectedExpiry = time.Now().Add(chunkAge) + ts, 
err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) +} + +func TestInternalWarmUpInFlight(t *testing.T) { + if *SkipTimeouts { + t.Skip("--skip-waits set") + } + + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3)) + o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4)) + o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * int64(workers) * int64(2))) + + _ = readDataFromObj(t, o1, 0, chunkSize, false) + _ = readDataFromObj(t, o2, 0, chunkSize, false) + require.False(t, cfs.InWarmUp()) + + // validate that we entered a warm up state + _ = readDataFromObj(t, o3, 0, chunkSize, false) + require.True(t, cfs.InWarmUp()) + expectedExpiry := time.Now().Add(metaAge) + ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) + + checkSample := make([]byte, chunkSize) + reader, err := o3.Open(&fs.SeekOption{Offset: 0}) + require.NoError(t, err) + rs, ok := reader.(*cache.Handle) + require.True(t, ok) + + for i := 0; i <= workers; i++ { + _, _ = rs.Seek(int64(i)*chunkSize, 0) + _, err = io.ReadFull(reader, checkSample) + require.NoError(t, err) + + if i == workers { + require.False(t, rs.InWarmUp(), "iteration %v", i) + } else { + require.True(t, rs.InWarmUp(), "iteration %v", i) + } + } + _ = reader.Close() + require.True(t, cfs.InWarmUp()) + expectedExpiry = time.Now().Add(chunkAge) + ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize*int64(workers+1)) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) + + // validate that we cooled down and exit warm up + // we wait for the cache to expire + t.Logf("Waiting 10 seconds for warm up to expire\n") + time.Sleep(time.Second * 10) + + _ = readDataFromObj(t, o2, chunkSize, chunkSize*2, false) + require.False(t, cfs.InWarmUp()) + expectedExpiry = time.Now().Add(chunkAge) + ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), chunkSize) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) +} + +// TODO: this is bugged +//func TestInternalRateLimiter(t *testing.T) { +// reset(t) +// _ = flag.Set("cache-rps", "2") +// rootFs, err := fs.NewFs(*RemoteName + ":") +// require.NoError(t, err) +// defer func() { +// _ = flag.Set("cache-rps", "-1") +// rootFs, err = fs.NewFs(*RemoteName + ":") +// require.NoError(t, err) +// }() +// cfs, err := getCacheFs(rootFs) +// require.NoError(t, err) +// chunkSize := cfs.ChunkSize() +// +// // create some rand test data +// co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) +// +// doStuff(t, 5, time.Second, func() { +// r, err := co.Open(&fs.SeekOption{Offset: chunkSize + 1}) +// require.NoError(t, err) +// +// buf := make([]byte, chunkSize) +// totalRead, err := io.ReadFull(r, buf) +// require.NoError(t, err) +// require.Equal(t, len(buf), totalRead) +// _ = r.Close() +// }) +//} + +func TestInternalCacheWrites(t *testing.T) { + reset(t) + _ = flag.Set("cache-writes", "true") + rootFs, err := fs.NewFs(*RemoteName + ":") + require.NoError(t, err) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + // create some rand test data + co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) + expectedExpiry := time.Now().Add(metaAge) + ts, err := 
boltDb.GetChunkTs(path.Join(rootFs.Root(), co.Remote()), 0) + require.NoError(t, err) + require.WithinDuration(t, expectedExpiry, ts, okDiff) + + // reset fs + _ = flag.Set("cache-writes", "false") + rootFs, err = fs.NewFs(*RemoteName + ":") + require.NoError(t, err) +} + +func TestInternalExpiredChunkRemoved(t *testing.T) { + if *SkipTimeouts { + t.Skip("--skip-waits set") + } + + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + totalChunks := 20 + + // create some rand test data + co := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2)) + remote := co.Remote() + // cache all the chunks + _ = readDataFromObj(t, co, 0, co.Size(), false) + + // we wait for the cache to expire + t.Logf("Waiting %v for cache to expire\n", chunkAge.String()) + time.Sleep(chunkAge) + _, _ = cfs.List("") + time.Sleep(time.Second * 2) + + o, err := cfs.NewObject(remote) + require.NoError(t, err) + co2, ok := o.(*cache.Object) + require.True(t, ok) + require.False(t, boltDb.HasChunk(co2, 0)) +} + +func TestInternalExpiredEntriesRemoved(t *testing.T) { + if *SkipTimeouts { + t.Skip("--skip-waits set") + } + + reset(t) + cfs, err := getCacheFs(rootFs) + require.NoError(t, err) + + // create some rand test data + _ = writeObjectString(t, cfs, "one", "one content") + err = cfs.Mkdir("test") + require.NoError(t, err) + _ = writeObjectString(t, cfs, "test/second", "second content") + + objOne, err := cfs.NewObject("one") + require.NoError(t, err) + require.Equal(t, int64(len([]byte("one content"))), objOne.Size()) + + waitTime := infoAge + time.Second*2 + t.Logf("Waiting %v seconds for cache to expire\n", waitTime) + time.Sleep(infoAge) + + _, err = cfs.List("test") + require.NoError(t, err) + time.Sleep(time.Second * 2) + require.False(t, boltDb.HasEntry("one")) +} + +func TestInternalFinalise(t *testing.T) { + var err error + + err = rootFs.Features().Purge() + require.NoError(t, err) +} + +func writeObjectRandomBytes(t *testing.T, f fs.Fs, size int64) fs.Object { + remote := strconv.Itoa(rand.Int()) + ".bin" + // create some rand test data + testData := make([]byte, size) + testSize, err := rand.Read(testData) + require.Equal(t, size, int64(len(testData))) + require.Equal(t, size, int64(testSize)) + require.NoError(t, err) + + o := writeObjectBytes(t, f, remote, testData) + require.Equal(t, size, o.Size()) + + return o +} + +func writeObjectString(t *testing.T, f fs.Fs, remote, content string) fs.Object { + return writeObjectBytes(t, f, remote, []byte(content)) +} + +func writeObjectBytes(t *testing.T, f fs.Fs, remote string, data []byte) fs.Object { + in := bytes.NewReader(data) + modTime := time.Now() + objInfo := fs.NewStaticObjectInfo(remote, modTime, int64(len(data)), true, nil, f) + + obj, err := f.Put(in, objInfo) + require.NoError(t, err) + + return obj +} + +func updateObjectBytes(t *testing.T, f fs.Fs, remote string, data1 []byte, data2 []byte) fs.Object { + in1 := bytes.NewReader(data1) + in2 := bytes.NewReader(data2) + objInfo1 := fs.NewStaticObjectInfo(remote, time.Now(), int64(len(data1)), true, nil, f) + objInfo2 := fs.NewStaticObjectInfo(remote, time.Now(), int64(len(data2)), true, nil, f) + + obj, err := f.Put(in1, objInfo1) + require.NoError(t, err) + obj, err = f.NewObject(remote) + require.NoError(t, err) + err = obj.Update(in2, objInfo2) + + return obj +} + +func readDataFromObj(t *testing.T, co fs.Object, offset, end int64, useSeek bool) []byte { + var reader io.ReadCloser + var err error + size := end - offset + 
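+	// open the object at the requested offset and read back exactly `size`
+	// bytes through the cache handle, failing the test on a short read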
checkSample := make([]byte, size) + + reader, err = co.Open(&fs.SeekOption{Offset: offset}) + require.NoError(t, err) + + totalRead, err := io.ReadFull(reader, checkSample) + require.NoError(t, err) + _ = reader.Close() + require.Equal(t, int64(totalRead), size, "wrong data read size from file") + + return checkSample +} + +func doStuff(t *testing.T, times int, maxDuration time.Duration, stuff func()) { + var wg sync.WaitGroup + + for i := 0; i < times; i++ { + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(maxDuration / 2) + stuff() + time.Sleep(maxDuration / 2) + }() + } + + wg.Wait() +} + +func reset(t *testing.T) { + var err error + + err = rootFs.Features().Purge() + require.NoError(t, err) + + // Instantiate root + rootFs, err = fs.NewFs(*RemoteName + ":") + require.NoError(t, err) + err = rootFs.Mkdir("") + require.NoError(t, err) +} + +func getCacheFs(f fs.Fs) (*cache.Fs, error) { + cfs, ok := f.(*cache.Fs) + if ok { + return cfs, nil + } else { + if f.Features().UnWrap != nil { + cfs, ok := f.Features().UnWrap().(*cache.Fs) + if ok { + return cfs, nil + } + } + } + + return nil, fmt.Errorf("didn't found a cache fs") +} + +func getSourceFs(f fs.Fs) (fs.Fs, error) { + if f.Features().UnWrap != nil { + sfs := f.Features().UnWrap() + _, ok := sfs.(*cache.Fs) + if !ok { + return sfs, nil + } + + return getSourceFs(sfs) + } + + return nil, fmt.Errorf("didn't found a source fs") +} + +var ( + _ fs.Fs = (*cache.Fs)(nil) + _ fs.Fs = (*local.Fs)(nil) +) diff --git a/cache/cache_test.go b/cache/cache_test.go new file mode 100644 index 000000000..24df0d0d2 --- /dev/null +++ b/cache/cache_test.go @@ -0,0 +1,77 @@ +// Test Cache filesystem interface +// +// Automatically generated - DO NOT EDIT +// Regenerate with: make gen_tests + +// +build !plan9 + +package cache_test + +import ( + "testing" + + "github.com/ncw/rclone/cache" + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest/fstests" + _ "github.com/ncw/rclone/local" +) + +func TestSetup(t *testing.T) { + fstests.NilObject = fs.Object((*cache.Object)(nil)) + fstests.RemoteName = "TestCache:" +} + +// Generic tests for the Fs +func TestInit(t *testing.T) { fstests.TestInit(t) } +func TestFsString(t *testing.T) { fstests.TestFsString(t) } +func TestFsName(t *testing.T) { fstests.TestFsName(t) } +func TestFsRoot(t *testing.T) { fstests.TestFsRoot(t) } +func TestFsRmdirEmpty(t *testing.T) { fstests.TestFsRmdirEmpty(t) } +func TestFsRmdirNotFound(t *testing.T) { fstests.TestFsRmdirNotFound(t) } +func TestFsMkdir(t *testing.T) { fstests.TestFsMkdir(t) } +func TestFsMkdirRmdirSubdir(t *testing.T) { fstests.TestFsMkdirRmdirSubdir(t) } +func TestFsListEmpty(t *testing.T) { fstests.TestFsListEmpty(t) } +func TestFsListDirEmpty(t *testing.T) { fstests.TestFsListDirEmpty(t) } +func TestFsListRDirEmpty(t *testing.T) { fstests.TestFsListRDirEmpty(t) } +func TestFsNewObjectNotFound(t *testing.T) { fstests.TestFsNewObjectNotFound(t) } +func TestFsPutFile1(t *testing.T) { fstests.TestFsPutFile1(t) } +func TestFsPutError(t *testing.T) { fstests.TestFsPutError(t) } +func TestFsPutFile2(t *testing.T) { fstests.TestFsPutFile2(t) } +func TestFsUpdateFile1(t *testing.T) { fstests.TestFsUpdateFile1(t) } +func TestFsListDirFile2(t *testing.T) { fstests.TestFsListDirFile2(t) } +func TestFsListRDirFile2(t *testing.T) { fstests.TestFsListRDirFile2(t) } +func TestFsListDirRoot(t *testing.T) { fstests.TestFsListDirRoot(t) } +func TestFsListRDirRoot(t *testing.T) { fstests.TestFsListRDirRoot(t) } +func TestFsListSubdir(t *testing.T) { 
fstests.TestFsListSubdir(t) } +func TestFsListRSubdir(t *testing.T) { fstests.TestFsListRSubdir(t) } +func TestFsListLevel2(t *testing.T) { fstests.TestFsListLevel2(t) } +func TestFsListRLevel2(t *testing.T) { fstests.TestFsListRLevel2(t) } +func TestFsListFile1(t *testing.T) { fstests.TestFsListFile1(t) } +func TestFsNewObject(t *testing.T) { fstests.TestFsNewObject(t) } +func TestFsListFile1and2(t *testing.T) { fstests.TestFsListFile1and2(t) } +func TestFsNewObjectDir(t *testing.T) { fstests.TestFsNewObjectDir(t) } +func TestFsCopy(t *testing.T) { fstests.TestFsCopy(t) } +func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } +func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } +func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } +func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } +func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } +func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } +func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } +func TestObjectHashes(t *testing.T) { fstests.TestObjectHashes(t) } +func TestObjectModTime(t *testing.T) { fstests.TestObjectModTime(t) } +func TestObjectMimeType(t *testing.T) { fstests.TestObjectMimeType(t) } +func TestObjectSetModTime(t *testing.T) { fstests.TestObjectSetModTime(t) } +func TestObjectSize(t *testing.T) { fstests.TestObjectSize(t) } +func TestObjectOpen(t *testing.T) { fstests.TestObjectOpen(t) } +func TestObjectOpenSeek(t *testing.T) { fstests.TestObjectOpenSeek(t) } +func TestObjectPartialRead(t *testing.T) { fstests.TestObjectPartialRead(t) } +func TestObjectUpdate(t *testing.T) { fstests.TestObjectUpdate(t) } +func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) } +func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) } +func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) } +func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) } +func TestFsPutStream(t *testing.T) { fstests.TestFsPutStream(t) } +func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) } +func TestFinalise(t *testing.T) { fstests.TestFinalise(t) } diff --git a/cache/cache_unsupported.go b/cache/cache_unsupported.go new file mode 100644 index 000000000..05a39fa8b --- /dev/null +++ b/cache/cache_unsupported.go @@ -0,0 +1,6 @@ +// Build for cache for unsupported platforms to stop go complaining +// about "no buildable Go source files " + +// +build plan9 + +package cache diff --git a/cache/directory.go b/cache/directory.go new file mode 100644 index 000000000..e558f37bd --- /dev/null +++ b/cache/directory.go @@ -0,0 +1,126 @@ +// +build !plan9 + +package cache + +import ( + "time" + + "os" + "path" + "strings" + + "github.com/ncw/rclone/fs" +) + +// Directory is a generic dir that stores basic information about it +type Directory struct { + fs.Directory `json:"-"` + + CacheFs *Fs `json:"-"` // cache fs + Name string `json:"name"` // name of the directory + Dir string `json:"dir"` // abs path of the directory + CacheModTime int64 `json:"modTime"` // modification or creation time - IsZero for unknown + CacheSize int64 `json:"size"` // size of directory and contents or -1 if unknown + + CacheItems int64 `json:"items"` // number of objects or -1 for unknown + CacheType string `json:"cacheType"` // object type +} + +// NewDirectory builds an empty dir which will be used to unmarshal data in it +func NewDirectory(f *Fs, remote string) *Directory { + var cd 
*Directory + fullRemote := cleanPath(path.Join(f.Root(), remote)) + + // build a new one + dir := cleanPath(path.Dir(fullRemote)) + name := cleanPath(path.Base(fullRemote)) + cd = &Directory{ + CacheFs: f, + Name: name, + Dir: dir, + CacheModTime: time.Now().UnixNano(), + CacheSize: 0, + CacheItems: 0, + CacheType: "Directory", + } + + return cd +} + +// DirectoryFromOriginal builds one from a generic fs.Directory +func DirectoryFromOriginal(f *Fs, d fs.Directory) *Directory { + var cd *Directory + fullRemote := path.Join(f.Root(), d.Remote()) + + dir := cleanPath(path.Dir(fullRemote)) + name := cleanPath(path.Base(fullRemote)) + cd = &Directory{ + Directory: d, + CacheFs: f, + Name: name, + Dir: dir, + CacheModTime: d.ModTime().UnixNano(), + CacheSize: d.Size(), + CacheItems: d.Items(), + CacheType: "Directory", + } + + return cd +} + +// Fs returns its FS info +func (d *Directory) Fs() fs.Info { + return d.CacheFs +} + +// String returns a human friendly name for this object +func (d *Directory) String() string { + if d == nil { + return "" + } + return d.Remote() +} + +// Remote returns the remote path +func (d *Directory) Remote() string { + p := cleanPath(path.Join(d.Dir, d.Name)) + if d.CacheFs.Root() != "" { + p = strings.Replace(p, d.CacheFs.Root(), "", 1) + p = strings.TrimPrefix(p, string(os.PathSeparator)) + } + + return p +} + +// abs returns the absolute path to the dir +func (d *Directory) abs() string { + return cleanPath(path.Join(d.Dir, d.Name)) +} + +// parentRemote returns the absolute path parent remote +func (d *Directory) parentRemote() string { + absPath := d.abs() + if absPath == "" { + return "" + } + return cleanPath(path.Dir(absPath)) +} + +// ModTime returns the cached ModTime +func (d *Directory) ModTime() time.Time { + return time.Unix(0, d.CacheModTime) +} + +// Size returns the cached Size +func (d *Directory) Size() int64 { + return d.CacheSize +} + +// Items returns the cached Items +func (d *Directory) Items() int64 { + return d.CacheItems +} + +var ( + _ fs.Directory = (*Directory)(nil) +) diff --git a/cache/handle.go b/cache/handle.go new file mode 100644 index 000000000..0cc059a20 --- /dev/null +++ b/cache/handle.go @@ -0,0 +1,436 @@ +// +build !plan9 + +package cache + +import ( + "fmt" + "io" + "os" + "sync" + "time" + + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" +) + +// Handle is managing the read/write/seek operations on an open handle +type Handle struct { + cachedObject *Object + memory ChunkStorage + preloadQueue chan int64 + preloadOffset int64 + offset int64 + seenOffsets map[int64]bool + mu sync.Mutex + + ReadRetries int + TotalWorkers int + UseMemory bool + workers []*worker + chunkAge time.Duration + warmup bool + closed bool +} + +// NewObjectHandle returns a new Handle for an existing Object +func NewObjectHandle(o *Object) *Handle { + r := &Handle{ + cachedObject: o, + offset: 0, + preloadOffset: -1, // -1 to trigger the first preload + + ReadRetries: o.CacheFs.readRetries, + TotalWorkers: o.CacheFs.totalWorkers, + UseMemory: o.CacheFs.chunkMemory, + chunkAge: o.CacheFs.chunkAge, + warmup: o.CacheFs.InWarmUp(), + } + r.seenOffsets = make(map[int64]bool) + r.memory = NewMemory(o.CacheFs.chunkAge) + if o.CacheFs.InWarmUp() { + r.chunkAge = o.CacheFs.metaAge + } + + // create a larger buffer to queue up requests + r.preloadQueue = make(chan int64, r.TotalWorkers*10) + r.startReadWorkers() + return r +} + +// cacheFs is a convenience method to get the parent cache FS of the object's manager +func (r *Handle) cacheFs() *Fs { + 
return r.cachedObject.CacheFs +} + +// storage is a convenience method to get the persistent storage of the object's manager +func (r *Handle) storage() Storage { + return r.cacheFs().cache +} + +// String representation of this reader +func (r *Handle) String() string { + return r.cachedObject.abs() +} + +// InWarmUp says if this handle is in warmup mode +func (r *Handle) InWarmUp() bool { + return r.warmup +} + +// startReadWorkers will start the worker pool +func (r *Handle) startReadWorkers() { + if r.hasAtLeastOneWorker() { + return + } + for i := 0; i < r.TotalWorkers; i++ { + w := &worker{ + r: r, + ch: r.preloadQueue, + id: i, + } + go w.run() + + r.workers = append(r.workers, w) + } +} + +// queueOffset will send an offset to the workers if it's different from the last one +func (r *Handle) queueOffset(offset int64) { + if offset != r.preloadOffset { + r.preloadOffset = offset + previousChunksCounter := 0 + maxOffset := r.cacheFs().chunkSize * int64(r.cacheFs().originalSettingWorkers()) + + // clear the past seen chunks + // they will remain in our persistent storage but will be removed from transient + // so they need to be picked up by a worker + for k := range r.seenOffsets { + if k < offset { + r.seenOffsets[k] = false + + // we count how many continuous chunks were seen before + if offset >= maxOffset && k >= offset-maxOffset { + previousChunksCounter++ + } + } + } + + // if we read all the previous chunks that could have been preloaded + // we should then disable warm up setting for this handle + if r.warmup && previousChunksCounter >= r.cacheFs().originalSettingWorkers() { + r.TotalWorkers = r.cacheFs().originalSettingWorkers() + r.UseMemory = !r.cacheFs().originalSettingChunkNoMemory() + r.chunkAge = r.cacheFs().chunkAge + r.warmup = false + fs.Infof(r, "disabling warm up") + } + + for i := 0; i < r.TotalWorkers; i++ { + o := r.preloadOffset + r.cacheFs().chunkSize*int64(i) + if o < 0 || o >= r.cachedObject.Size() { + continue + } + if v, ok := r.seenOffsets[o]; ok && v { + continue + } + + r.seenOffsets[o] = true + r.preloadQueue <- o + } + } +} + +func (r *Handle) hasAtLeastOneWorker() bool { + oneWorker := false + for i := 0; i < len(r.workers); i++ { + if r.workers[i].isRunning() { + oneWorker = true + } + } + return oneWorker +} + +// getChunk is called by the FS to retrieve a specific chunk of known start and size from where it can find it +// it can be from transient or persistent cache +// it will also build the chunk from the cache's specific chunk boundaries and build the final desired chunk in a buffer +func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { + var data []byte + var err error + + // we reached the end of the file + if chunkStart >= r.cachedObject.Size() { + fs.Debugf(r, "reached EOF %v", chunkStart) + return nil, io.EOF + } + + // we calculate the modulus of the requested offset with the size of a chunk + offset := chunkStart % r.cacheFs().chunkSize + + // we align the start offset of the first chunk to a likely chunk in the storage + chunkStart = chunkStart - offset + r.queueOffset(chunkStart) + found := false + + // delete old chunks from memory + if r.UseMemory { + go r.memory.CleanChunksByNeed(chunkStart) + + data, err = r.memory.GetChunk(r.cachedObject, chunkStart) + if err == nil { + found = true + } + } + + if !found { + // we're gonna give the workers a chance to pickup the chunk + // and retry a couple of times + for i := 0; i < r.ReadRetries; i++ { + data, err = r.storage().GetChunk(r.cachedObject, chunkStart) + + if err == nil { 
+ found = true + break + } + + fs.Debugf(r, "%v: chunk retry storage: %v", chunkStart, i) + time.Sleep(time.Second) + } + } + + // not found in ram or + // the worker didn't managed to download the chunk in time so we abort and close the stream + if err != nil || len(data) == 0 || !found { + if !r.hasAtLeastOneWorker() { + fs.Errorf(r, "out of workers") + return nil, io.ErrUnexpectedEOF + } + + return nil, errors.Errorf("chunk not found %v", chunkStart) + } + + // first chunk will be aligned with the start + if offset > 0 { + data = data[int(offset):] + } + + return data, nil +} + +// Read a chunk from storage or len(p) +func (r *Handle) Read(p []byte) (n int, err error) { + r.mu.Lock() + defer r.mu.Unlock() + var buf []byte + + currentOffset := r.offset + buf, err = r.getChunk(currentOffset) + if err != nil && len(buf) == 0 { + return 0, io.EOF + } + readSize := copy(p, buf) + newOffset := currentOffset + int64(readSize) + r.offset = newOffset + if r.offset >= r.cachedObject.Size() { + err = io.EOF + } + + return readSize, err +} + +// Close will tell the workers to stop +func (r *Handle) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + if r.closed { + return errors.New("file already closed") + } + + close(r.preloadQueue) + r.closed = true + // wait for workers to complete their jobs before returning + waitCount := 3 + for i := 0; i < len(r.workers); i++ { + waitIdx := 0 + for r.workers[i].isRunning() && waitIdx < waitCount { + time.Sleep(time.Second) + waitIdx++ + } + } + + fs.Debugf(r, "cache reader closed %v", r.offset) + return nil +} + +// Seek will move the current offset based on whence and instruct the workers to move there too +func (r *Handle) Seek(offset int64, whence int) (int64, error) { + r.mu.Lock() + defer r.mu.Unlock() + + var err error + switch whence { + case os.SEEK_SET: + fs.Debugf(r, "moving offset set from %v to %v", r.offset, offset) + r.offset = offset + case os.SEEK_CUR: + fs.Debugf(r, "moving offset cur from %v to %v", r.offset, r.offset+offset) + r.offset += offset + case os.SEEK_END: + fs.Debugf(r, "moving offset end (%v) from %v to %v", r.cachedObject.Size(), r.offset, r.cachedObject.Size()+offset) + r.offset = r.cachedObject.Size() + offset + default: + err = errors.Errorf("cache: unimplemented seek whence %v", whence) + } + + chunkStart := r.offset - (r.offset % r.cacheFs().chunkSize) + if chunkStart >= r.cacheFs().chunkSize { + chunkStart = chunkStart - r.cacheFs().chunkSize + } + r.queueOffset(chunkStart) + + return r.offset, err +} + +type worker struct { + r *Handle + ch <-chan int64 + rc io.ReadCloser + id int + running bool + mu sync.Mutex +} + +// String is a representation of this worker +func (w *worker) String() string { + return fmt.Sprintf("worker-%v <%v>", w.id, w.r.cachedObject.Name) +} + +// reader will return a reader depending on the capabilities of the source reader: +// - if it supports seeking it will seek to the desired offset and return the same reader +// - if it doesn't support seeking it will close a possible existing one and open at the desired offset +// - if there's no reader associated with this worker, it will create one +func (w *worker) reader(offset, end int64) (io.ReadCloser, error) { + var err error + r := w.rc + if w.rc == nil { + r, err = w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) { + return w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset}) + }) + if err != nil { + return nil, err + } + return r, nil + } + + seekerObj, ok := r.(io.Seeker) + if ok 
{ + _, err = seekerObj.Seek(offset, os.SEEK_SET) + return r, err + } + + _ = w.rc.Close() + return w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) { + r, err = w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset}) + if err != nil { + return nil, err + } + return r, nil + }) +} + +func (w *worker) isRunning() bool { + w.mu.Lock() + defer w.mu.Unlock() + return w.running +} + +func (w *worker) setRunning(f bool) { + w.mu.Lock() + defer w.mu.Unlock() + w.running = f +} + +// run is the main loop for the worker which receives offsets to preload +func (w *worker) run() { + var err error + var data []byte + defer w.setRunning(false) + defer func() { + if w.rc != nil { + _ = w.rc.Close() + w.setRunning(false) + } + }() + + for { + chunkStart, open := <-w.ch + w.setRunning(true) + if chunkStart < 0 || !open { + break + } + + // skip if it exists + if w.r.UseMemory { + if w.r.memory.HasChunk(w.r.cachedObject, chunkStart) { + continue + } + + // add it in ram if it's in the persistent storage + data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart) + if err == nil { + err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart) + if err != nil { + fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) + } else { + continue + } + } + err = nil + } else { + if w.r.storage().HasChunk(w.r.cachedObject, chunkStart) { + continue + } + } + + chunkEnd := chunkStart + w.r.cacheFs().chunkSize + if chunkEnd > w.r.cachedObject.Size() { + chunkEnd = w.r.cachedObject.Size() + } + w.rc, err = w.reader(chunkStart, chunkEnd) + // we seem to be getting only errors so we abort + if err != nil { + fs.Errorf(w, "object open failed %v: %v", chunkStart, err) + return + } + + data = make([]byte, chunkEnd-chunkStart) + sourceRead := 0 + sourceRead, err = io.ReadFull(w.rc, data) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err) + return + } + if err == io.ErrUnexpectedEOF { + fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err) + } + data = data[:sourceRead] // reslice to remove extra garbage + fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart)) + + if w.r.UseMemory { + err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart) + if err != nil { + fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) + } + } + + err = w.r.storage().AddChunkAhead(w.r.cachedObject.abs(), data, chunkStart, w.r.chunkAge) + if err != nil { + fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err) + } + } +} + +// Check the interfaces are satisfied +var ( + _ io.ReadCloser = (*Handle)(nil) + _ io.Seeker = (*Handle)(nil) +) diff --git a/cache/object.go b/cache/object.go new file mode 100644 index 000000000..7a3621a63 --- /dev/null +++ b/cache/object.go @@ -0,0 +1,303 @@ +// +build !plan9 + +package cache + +import ( + "encoding/json" + "io" + "os" + "path" + "strings" + "sync" + "time" + + "strconv" + + "github.com/ncw/rclone/fs" +) + +// Object is a generic file like object that stores basic information about it +type Object struct { + fs.Object `json:"-"` + + CacheFs *Fs `json:"-"` // cache fs + Name string `json:"name"` // name of the directory + Dir string `json:"dir"` // abs path of the object + CacheModTime int64 `json:"modTime"` // modification or creation time - IsZero for unknown + CacheSize int64 `json:"size"` // size of directory and contents or -1 if unknown + CacheStorable bool `json:"storable"` // says whether this object can 
be stored + CacheType string `json:"cacheType"` + cacheHashes map[fs.HashType]string // all supported hashes cached + + refreshMutex sync.Mutex +} + +// NewObject builds one from a generic fs.Object +func NewObject(f *Fs, remote string) *Object { + fullRemote := path.Join(f.Root(), remote) + dir, name := path.Split(fullRemote) + + co := &Object{ + CacheFs: f, + Name: cleanPath(name), + Dir: cleanPath(dir), + CacheModTime: time.Now().UnixNano(), + CacheSize: 0, + CacheStorable: false, + CacheType: "Object", + } + return co +} + +// MarshalJSON is needed to override the hashes map (needed to support older versions of Go) +func (o *Object) MarshalJSON() ([]byte, error) { + hashes := make(map[string]string) + for k, v := range o.cacheHashes { + hashes[strconv.Itoa(int(k))] = v + } + + type Alias Object + return json.Marshal(&struct { + Hashes map[string]string `json:"hashes"` + *Alias + }{ + Alias: (*Alias)(o), + Hashes: hashes, + }) +} + +// UnmarshalJSON is needed to override the CacheHashes map (needed to support older versions of Go) +func (o *Object) UnmarshalJSON(b []byte) error { + type Alias Object + aux := &struct { + Hashes map[string]string `json:"hashes"` + *Alias + }{ + Alias: (*Alias)(o), + } + if err := json.Unmarshal(b, &aux); err != nil { + return err + } + + o.cacheHashes = make(map[fs.HashType]string) + for k, v := range aux.Hashes { + ht, _ := strconv.Atoi(k) + o.cacheHashes[fs.HashType(ht)] = v + } + + return nil +} + +// ObjectFromOriginal builds one from a generic fs.Object +func ObjectFromOriginal(f *Fs, o fs.Object) *Object { + var co *Object + fullRemote := cleanPath(path.Join(f.Root(), o.Remote())) + + dir, name := path.Split(fullRemote) + co = &Object{ + CacheFs: f, + Name: cleanPath(name), + Dir: cleanPath(dir), + CacheType: "Object", + } + co.updateData(o) + return co +} + +func (o *Object) updateData(source fs.Object) { + o.Object = source + o.CacheModTime = source.ModTime().UnixNano() + o.CacheSize = source.Size() + o.CacheStorable = source.Storable() + o.cacheHashes = make(map[fs.HashType]string) +} + +// Fs returns its FS info +func (o *Object) Fs() fs.Info { + return o.CacheFs +} + +// String returns a human friendly name for this object +func (o *Object) String() string { + if o == nil { + return "" + } + return o.Remote() +} + +// Remote returns the remote path +func (o *Object) Remote() string { + p := path.Join(o.Dir, o.Name) + if o.CacheFs.Root() != "" { + p = strings.Replace(p, o.CacheFs.Root(), "", 1) + p = strings.TrimPrefix(p, string(os.PathSeparator)) + } + + return p +} + +// abs returns the absolute path to the object +func (o *Object) abs() string { + return path.Join(o.Dir, o.Name) +} + +// parentRemote returns the absolute path of the parent remote +func (o *Object) parentRemote() string { + absPath := o.abs() + return cleanPath(path.Dir(absPath)) +} + +// ModTime returns the cached ModTime +func (o *Object) ModTime() time.Time { + return time.Unix(0, o.CacheModTime) +} + +// Size returns the cached Size +func (o *Object) Size() int64 { + return o.CacheSize +} + +// Storable returns the cached Storable +func (o *Object) Storable() bool { + return o.CacheStorable +} + +// refreshFromSource requests the original FS for the object in case it comes from a cached entry +func (o *Object) refreshFromSource() error { + o.refreshMutex.Lock() + defer o.refreshMutex.Unlock() + + if o.Object != nil { + return nil + } + + liveObject, err := o.CacheFs.Fs.NewObject(o.Remote()) + if err != nil { + fs.Errorf(o, "error refreshing object: %v", err) + 
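+		// the wrapped remote could not provide the object (it may have been removed outside the cache), so pass the error on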
return err + } + o.updateData(liveObject) + o.persist() + + return nil +} + +// SetModTime sets the ModTime of this object +func (o *Object) SetModTime(t time.Time) error { + if err := o.refreshFromSource(); err != nil { + return err + } + + err := o.Object.SetModTime(t) + if err != nil { + return err + } + + o.CacheModTime = t.UnixNano() + o.persist() + fs.Debugf(o.Fs(), "updated ModTime %v: %v", o, t) + + return nil +} + +// Open is used to request a specific part of the file using fs.RangeOption +func (o *Object) Open(options ...fs.OpenOption) (io.ReadCloser, error) { + if err := o.refreshFromSource(); err != nil { + return nil, err + } + o.CacheFs.CheckIfWarmupNeeded(o.Remote()) + + cacheReader := NewObjectHandle(o) + for _, option := range options { + switch x := option.(type) { + case *fs.SeekOption: + _, err := cacheReader.Seek(x.Offset, os.SEEK_SET) + if err != nil { + return cacheReader, err + } + } + } + + return cacheReader, nil +} + +// Update will change the object data +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + if err := o.refreshFromSource(); err != nil { + return err + } + fs.Infof(o, "updating object contents with size %v", src.Size()) + + // deleting cached chunks and info to be replaced with new ones + _ = o.CacheFs.cache.RemoveObject(o.abs()) + + err := o.Object.Update(in, src, options...) + if err != nil { + fs.Errorf(o, "error updating source: %v", err) + return err + } + + o.CacheModTime = src.ModTime().UnixNano() + o.CacheSize = src.Size() + o.cacheHashes = make(map[fs.HashType]string) + o.persist() + + return nil +} + +// Remove deletes the object from both the cache and the source +func (o *Object) Remove() error { + if err := o.refreshFromSource(); err != nil { + return err + } + err := o.Object.Remove() + if err != nil { + return err + } + fs.Infof(o, "removing object") + + _ = o.CacheFs.cache.RemoveObject(o.abs()) + return err +} + +// Hash requests a hash of the object and stores in the cache +// since it might or might not be called, this is lazy loaded +func (o *Object) Hash(ht fs.HashType) (string, error) { + if o.cacheHashes == nil { + o.cacheHashes = make(map[fs.HashType]string) + } + + cachedHash, found := o.cacheHashes[ht] + if found { + return cachedHash, nil + } + + if err := o.refreshFromSource(); err != nil { + return "", err + } + + liveHash, err := o.Object.Hash(ht) + if err != nil { + return "", err + } + + o.cacheHashes[ht] = liveHash + + o.persist() + fs.Debugf(o, "object hash cached: %v", liveHash) + + return liveHash, nil +} + +// persist adds this object to the persistent cache +func (o *Object) persist() *Object { + err := o.CacheFs.cache.AddObject(o) + if err != nil { + fs.Errorf(o, "failed to cache object: %v", err) + } + return o +} + +var ( + _ fs.Object = (*Object)(nil) +) diff --git a/cache/storage_memory.go b/cache/storage_memory.go new file mode 100644 index 000000000..e752c3914 --- /dev/null +++ b/cache/storage_memory.go @@ -0,0 +1,95 @@ +// +build !plan9 + +package cache + +import ( + "strconv" + "strings" + "time" + + "github.com/ncw/rclone/fs" + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" +) + +// Memory is a wrapper of transient storage for a go-cache store +type Memory struct { + ChunkStorage + + db *cache.Cache +} + +// NewMemory builds this cache storage +// defaultExpiration will set the expiry time of chunks in this storage +func NewMemory(defaultExpiration time.Duration) *Memory { + mem := &Memory{} + err := mem.Connect(defaultExpiration) + if err != nil { + 
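+		// Connect for the in-memory store currently never fails; this logging is defensive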
fs.Errorf("cache", "can't open ram connection: %v", err) + } + + return mem +} + +// Connect will create a connection for the storage +func (m *Memory) Connect(defaultExpiration time.Duration) error { + m.db = cache.New(defaultExpiration, -1) + return nil +} + +// HasChunk confirms the existence of a single chunk of an object +func (m *Memory) HasChunk(cachedObject *Object, offset int64) bool { + key := cachedObject.abs() + "-" + strconv.FormatInt(offset, 10) + _, found := m.db.Get(key) + return found +} + +// GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it +func (m *Memory) GetChunk(cachedObject *Object, offset int64) ([]byte, error) { + key := cachedObject.abs() + "-" + strconv.FormatInt(offset, 10) + var data []byte + + if x, found := m.db.Get(key); found { + data = x.([]byte) + return data, nil + } + + return nil, errors.Errorf("couldn't get cached object data at offset %v", offset) +} + +// AddChunk adds a new chunk of a cached object +func (m *Memory) AddChunk(cachedObject *Object, data []byte, offset int64) error { + return m.AddChunkAhead(cachedObject.abs(), data, offset, time.Second) +} + +// AddChunkAhead adds a new chunk of a cached object +func (m *Memory) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error { + key := fp + "-" + strconv.FormatInt(offset, 10) + m.db.Set(key, data, cache.DefaultExpiration) + + return nil +} + +// CleanChunksByAge will cleanup on a cron basis +func (m *Memory) CleanChunksByAge(chunkAge time.Duration) { + m.db.DeleteExpired() +} + +// CleanChunksByNeed will cleanup chunks after the FS passes a specific chunk +func (m *Memory) CleanChunksByNeed(offset int64) { + var items map[string]cache.Item + + items = m.db.Items() + for key := range items { + sepIdx := strings.LastIndex(key, "-") + keyOffset, err := strconv.ParseInt(key[sepIdx+1:], 10, 64) + if err != nil { + fs.Errorf("cache", "couldn't parse offset entry %v", key) + continue + } + + if keyOffset < offset { + m.db.Delete(key) + } + } +} diff --git a/cache/storage_persistent.go b/cache/storage_persistent.go new file mode 100644 index 000000000..9df75aa75 --- /dev/null +++ b/cache/storage_persistent.go @@ -0,0 +1,758 @@ +// +build !plan9 + +package cache + +import ( + "time" + + "bytes" + "encoding/binary" + "encoding/json" + "os" + "path" + "strconv" + "strings" + "sync" + + "io/ioutil" + "path/filepath" + + bolt "github.com/coreos/bbolt" + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" +) + +// Constants +const ( + RootBucket = "root" + RootTsBucket = "rootTs" + DataTsBucket = "dataTs" +) + +var boltMap = make(map[string]*Persistent) +var boltMapMx sync.RWMutex + +// GetPersistent returns a single instance for the specific store +func GetPersistent(dbPath string, refreshDb bool) *Persistent { + // read lock to check if it exists + boltMapMx.RLock() + if b, ok := boltMap[dbPath]; ok { + boltMapMx.RUnlock() + return b + } + boltMapMx.RUnlock() + + // write lock to create one but let's check a 2nd time + boltMapMx.Lock() + defer boltMapMx.Unlock() + if b, ok := boltMap[dbPath]; ok { + return b + } + + boltMap[dbPath] = newPersistent(dbPath, refreshDb) + return boltMap[dbPath] +} + +// Persistent is a wrapper of persistent storage for a bolt.DB file +type Persistent struct { + Storage + + dbPath string + dataPath string + db *bolt.DB + cleanupMux sync.Mutex +} + +// newPersistent builds a new wrapper and connects to the bolt.DB file +func newPersistent(dbPath string, refreshDb bool) *Persistent { + dataPath := 
strings.TrimSuffix(dbPath, filepath.Ext(dbPath)) + + b := &Persistent{ + dbPath: dbPath, + dataPath: dataPath, + } + + err := b.Connect(refreshDb) + if err != nil { + fs.Errorf(dbPath, "error opening storage cache: %v", err) + } + + return b +} + +// String will return a human friendly string for this DB (currently the dbPath) +func (b *Persistent) String() string { + return " " + b.dbPath +} + +// Connect creates a connection to the configured file +// refreshDb will delete the file before to create an empty DB if it's set to true +func (b *Persistent) Connect(refreshDb bool) error { + var db *bolt.DB + var err error + + if refreshDb { + err := os.Remove(b.dbPath) + if err != nil { + fs.Errorf(b, "failed to remove cache file: %v", err) + } + err = os.RemoveAll(b.dataPath) + if err != nil { + fs.Errorf(b, "failed to remove cache data: %v", err) + } + } + + err = os.MkdirAll(b.dataPath, os.ModePerm) + if err != nil { + return errors.Wrapf(err, "failed to create a data directory %q", b.dataPath) + } + db, err = bolt.Open(b.dbPath, 0644, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return errors.Wrapf(err, "failed to open a cache connection to %q", b.dbPath) + } + + _ = db.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte(RootBucket)) + _, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket)) + _, _ = tx.CreateBucketIfNotExists([]byte(DataTsBucket)) + + return nil + }) + + b.db = db + + return nil +} + +// getBucket prepares and cleans a specific path of the form: /var/tmp and will iterate through each path component +// to get to the nested bucket of the final part (in this example: tmp) +func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *bolt.Bucket { + cleanPath(dir) + + entries := strings.FieldsFunc(dir, func(c rune) bool { + return os.PathSeparator == c + }) + bucket := tx.Bucket([]byte(RootBucket)) + + for _, entry := range entries { + if createIfMissing { + bucket, _ = bucket.CreateBucketIfNotExists([]byte(entry)) + } else { + bucket = bucket.Bucket([]byte(entry)) + } + + if bucket == nil { + return nil + } + } + + return bucket +} + +// updateChunkTs is a convenience method to update a chunk timestamp to mark that it was used recently +func (b *Persistent) updateChunkTs(tx *bolt.Tx, path string, offset int64, t time.Duration) { + tsBucket := tx.Bucket([]byte(DataTsBucket)) + tsVal := path + "-" + strconv.FormatInt(offset, 10) + ts := time.Now().Add(t) + found := false + + // delete previous timestamps for the same object + c := tsBucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if bytes.Equal(v, []byte(tsVal)) { + if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found { + found = true + continue + } + err := c.Delete() + if err != nil { + fs.Debugf(path, "failed to clean chunk: %v", err) + } + } + } + if found { + return + } + + err := tsBucket.Put(itob(ts.UnixNano()), []byte(tsVal)) + if err != nil { + fs.Debugf(path, "failed to timestamp chunk: %v", err) + } +} + +// updateRootTs is a convenience method to update an object timestamp to mark that it was used recently +func (b *Persistent) updateRootTs(tx *bolt.Tx, path string, t time.Duration) { + tsBucket := tx.Bucket([]byte(RootTsBucket)) + ts := time.Now().Add(t) + found := false + + // delete previous timestamps for the same object + c := tsBucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if bytes.Equal(v, []byte(path)) { + if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found { + found = 
true + continue + } + err := c.Delete() + if err != nil { + fs.Debugf(path, "failed to clean object: %v", err) + } + } + } + if found { + return + } + + err := tsBucket.Put(itob(ts.UnixNano()), []byte(path)) + if err != nil { + fs.Debugf(path, "failed to timestamp chunk: %v", err) + } +} + +// AddDir will update a CachedDirectory metadata and all its entries +func (b *Persistent) AddDir(cachedDir *Directory) error { + return b.db.Update(func(tx *bolt.Tx) error { + bucket := b.getBucket(cachedDir.abs(), true, tx) + if bucket == nil { + return errors.Errorf("couldn't open bucket (%v)", cachedDir) + } + + encoded, err := json.Marshal(cachedDir) + if err != nil { + return errors.Errorf("couldn't marshal object (%v): %v", cachedDir, err) + } + err = bucket.Put([]byte("."), encoded) + if err != nil { + return err + } + + b.updateRootTs(tx, cachedDir.abs(), cachedDir.CacheFs.fileAge) + return nil + }) +} + +// GetDirEntries will return a CachedDirectory, its list of dir entries and/or an error if it encountered issues +func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error) { + var dirEntries fs.DirEntries + + err := b.db.View(func(tx *bolt.Tx) error { + bucket := b.getBucket(cachedDir.abs(), false, tx) + if bucket == nil { + return errors.Errorf("couldn't open bucket (%v)", cachedDir.abs()) + } + + val := bucket.Get([]byte(".")) + if val != nil { + err := json.Unmarshal(val, cachedDir) + if err != nil { + fs.Debugf(cachedDir.abs(), "error during unmarshalling obj: %v", err) + } + } else { + return errors.Errorf("missing cached dir: %v", cachedDir) + } + + c := bucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + // ignore metadata key: . + if bytes.Equal(k, []byte(".")) { + continue + } + entryPath := path.Join(cachedDir.Remote(), string(k)) + + if v == nil { // directory + // we try to find a cached meta for the dir + currentBucket := c.Bucket().Bucket(k) + if currentBucket == nil { + return errors.Errorf("couldn't open bucket (%v)", string(k)) + } + + metaKey := currentBucket.Get([]byte(".")) + d := NewDirectory(cachedDir.CacheFs, entryPath) + if metaKey != nil { //if we don't find it, we create an empty dir + err := json.Unmarshal(metaKey, d) + if err != nil { // if even this fails, we fallback to an empty dir + fs.Debugf(string(k), "error during unmarshalling obj: %v", err) + } + } + + dirEntries = append(dirEntries, d) + } else { // object + o := NewObject(cachedDir.CacheFs, entryPath) + err := json.Unmarshal(v, o) + if err != nil { + fs.Debugf(string(k), "error during unmarshalling obj: %v", err) + continue + } + + dirEntries = append(dirEntries, o) + } + } + + return nil + }) + + return dirEntries, err +} + +// RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it +func (b *Persistent) RemoveDir(fp string) error { + err := b.ExpireDir(fp) + + // delete chunks on disk + // safe to ignore as the files might not have been open + if err != nil { + _ = os.RemoveAll(path.Join(b.dataPath, fp)) + } + + return nil +} + +// ExpireDir will flush a CachedDirectory and all its objects from the objects +// chunks will remain as they are +func (b *Persistent) ExpireDir(fp string) error { + parentDir, dirName := path.Split(fp) + if fp == "" { + return b.db.Update(func(tx *bolt.Tx) error { + err := tx.DeleteBucket([]byte(RootBucket)) + if err != nil { + fs.Debugf(fp, "couldn't delete from cache: %v", err) + return err + } + err = tx.DeleteBucket([]byte(RootTsBucket)) + if err != nil { + fs.Debugf(fp, "couldn't delete from cache: 
%v", err) + return err + } + _, _ = tx.CreateBucketIfNotExists([]byte(RootBucket)) + _, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket)) + return nil + }) + } + + return b.db.Update(func(tx *bolt.Tx) error { + bucket := b.getBucket(cleanPath(parentDir), false, tx) + if bucket == nil { + return errors.Errorf("couldn't open bucket (%v)", fp) + } + // delete the cached dir + err := bucket.DeleteBucket([]byte(cleanPath(dirName))) + if err != nil { + fs.Debugf(fp, "couldn't delete from cache: %v", err) + } + return nil + }) +} + +// GetObject will return a CachedObject from its parent directory or an error if it doesn't find it +func (b *Persistent) GetObject(cachedObject *Object) (err error) { + return b.db.View(func(tx *bolt.Tx) error { + bucket := b.getBucket(cachedObject.Dir, false, tx) + if bucket == nil { + return errors.Errorf("couldn't open parent bucket for %v", cachedObject.Dir) + } + val := bucket.Get([]byte(cachedObject.Name)) + if val != nil { + return json.Unmarshal(val, cachedObject) + } + return errors.Errorf("couldn't find object (%v)", cachedObject.Name) + }) +} + +// AddObject will create a cached object in its parent directory +func (b *Persistent) AddObject(cachedObject *Object) error { + return b.db.Update(func(tx *bolt.Tx) error { + bucket := b.getBucket(cachedObject.Dir, true, tx) + if bucket == nil { + return errors.Errorf("couldn't open parent bucket for %v", cachedObject) + } + // cache Object Info + encoded, err := json.Marshal(cachedObject) + if err != nil { + return errors.Errorf("couldn't marshal object (%v) info: %v", cachedObject, err) + } + err = bucket.Put([]byte(cachedObject.Name), []byte(encoded)) + if err != nil { + return errors.Errorf("couldn't cache object (%v) info: %v", cachedObject, err) + } + b.updateRootTs(tx, cachedObject.abs(), cachedObject.CacheFs.fileAge) + return nil + }) +} + +// RemoveObject will delete a single cached object and all the chunks which belong to it +func (b *Persistent) RemoveObject(fp string) error { + parentDir, objName := path.Split(fp) + return b.db.Update(func(tx *bolt.Tx) error { + bucket := b.getBucket(cleanPath(parentDir), false, tx) + if bucket == nil { + return errors.Errorf("couldn't open parent bucket for %v", cleanPath(parentDir)) + } + err := bucket.Delete([]byte(cleanPath(objName))) + if err != nil { + fs.Debugf(fp, "couldn't delete obj from storage: %v", err) + } + // delete chunks on disk + // safe to ignore as the file might not have been open + _ = os.RemoveAll(path.Join(b.dataPath, fp)) + return nil + }) +} + +// HasEntry confirms the existence of a single entry (dir or object) +func (b *Persistent) HasEntry(remote string) bool { + dir, name := path.Split(remote) + dir = cleanPath(dir) + name = cleanPath(name) + + err := b.db.View(func(tx *bolt.Tx) error { + bucket := b.getBucket(dir, false, tx) + if bucket == nil { + return errors.Errorf("couldn't open parent bucket for %v", remote) + } + if f := bucket.Bucket([]byte(name)); f != nil { + return nil + } + if f := bucket.Get([]byte(name)); f != nil { + return nil + } + + return errors.Errorf("couldn't find object (%v)", remote) + }) + if err == nil { + return true + } + return false +} + +// HasChunk confirms the existence of a single chunk of an object +func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool { + fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10)) + if _, err := os.Stat(fp); !os.IsNotExist(err) { + return true + } + return false +} + +// GetChunk will retrieve a single chunk which belongs to a 
cached object or an error if it doesn't find it +func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error) { + p := cachedObject.abs() + var data []byte + + fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10)) + data, err := ioutil.ReadFile(fp) + if err != nil { + return nil, err + } + + d := cachedObject.CacheFs.chunkAge + if cachedObject.CacheFs.InWarmUp() { + d = cachedObject.CacheFs.metaAge + } + + err = b.db.Update(func(tx *bolt.Tx) error { + b.updateChunkTs(tx, p, offset, d) + return nil + }) + + return data, err +} + +// AddChunk adds a new chunk of a cached object +func (b *Persistent) AddChunk(cachedObject *Object, data []byte, offset int64) error { + t := cachedObject.CacheFs.chunkAge + if cachedObject.CacheFs.InWarmUp() { + t = cachedObject.CacheFs.metaAge + } + return b.AddChunkAhead(cachedObject.abs(), data, offset, t) +} + +// AddChunkAhead adds a new chunk before caching an Object for it +// see fs.cacheWrites +func (b *Persistent) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error { + _ = os.MkdirAll(path.Join(b.dataPath, fp), os.ModePerm) + + filePath := path.Join(b.dataPath, fp, strconv.FormatInt(offset, 10)) + err := ioutil.WriteFile(filePath, data, os.ModePerm) + if err != nil { + return err + } + + return b.db.Update(func(tx *bolt.Tx) error { + b.updateChunkTs(tx, fp, offset, t) + return nil + }) +} + +// CleanChunksByAge will cleanup on a cron basis +func (b *Persistent) CleanChunksByAge(chunkAge time.Duration) { + b.cleanupMux.Lock() + defer b.cleanupMux.Unlock() + var cntChunks int + + err := b.db.Update(func(tx *bolt.Tx) error { + min := itob(0) + max := itob(time.Now().UnixNano()) + + dataTsBucket := tx.Bucket([]byte(DataTsBucket)) + if dataTsBucket == nil { + return errors.Errorf("Couldn't open (%v) bucket", DataTsBucket) + } + // iterate through ts + c := dataTsBucket.Cursor() + for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { + if v == nil { + continue + } + // split to get (abs path - offset) + val := string(v[:]) + sepIdx := strings.LastIndex(val, "-") + pathCmp := val[:sepIdx] + offsetCmp := val[sepIdx+1:] + + // delete this ts entry + err := c.Delete() + if err != nil { + fs.Errorf(pathCmp, "failed deleting chunk ts during cleanup (%v): %v", val, err) + continue + } + + err = os.Remove(path.Join(b.dataPath, pathCmp, offsetCmp)) + if err == nil { + cntChunks = cntChunks + 1 + } + } + fs.Infof("cache", "deleted (%v) chunks", cntChunks) + return nil + }) + + if err != nil { + fs.Errorf("cache", "cleanup failed: %v", err) + } +} + +// CleanEntriesByAge will cleanup on a cron basis +func (b *Persistent) CleanEntriesByAge(entryAge time.Duration) { + b.cleanupMux.Lock() + defer b.cleanupMux.Unlock() + var cntEntries int + + err := b.db.Update(func(tx *bolt.Tx) error { + min := itob(0) + max := itob(time.Now().UnixNano()) + + rootTsBucket := tx.Bucket([]byte(RootTsBucket)) + if rootTsBucket == nil { + return errors.Errorf("Couldn't open (%v) bucket", rootTsBucket) + } + // iterate through ts + c := rootTsBucket.Cursor() + for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { + if v == nil { + continue + } + // get the path + absPath := string(v) + absDir, absName := path.Split(absPath) + + // delete this ts entry + err := c.Delete() + if err != nil { + fs.Errorf(absPath, "failed deleting object during cleanup: %v", err) + continue + } + + // search for the entry in the root bucket, skip it if it's not found + parentBucket := 
b.getBucket(cleanPath(absDir), false, tx) + if parentBucket == nil { + continue + } + _ = parentBucket.Delete([]byte(cleanPath(absName))) + cntEntries = cntEntries + 1 + } + fs.Infof("cache", "deleted (%v) entries", cntEntries) + return nil + }) + + if err != nil { + fs.Errorf("cache", "cleanup failed: %v", err) + } +} + +// CleanChunksByNeed is a noop for this implementation +func (b *Persistent) CleanChunksByNeed(offset int64) { + // noop: we want to clean a Bolt DB by time only +} + +// Stats returns a go map with the stats key values +func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { + r := make(map[string]map[string]interface{}) + r["data"] = make(map[string]interface{}) + r["data"]["oldest-ts"] = time.Now() + r["data"]["oldest-file"] = "" + r["data"]["newest-ts"] = time.Now() + r["data"]["newest-file"] = "" + r["data"]["total-chunks"] = 0 + r["files"] = make(map[string]interface{}) + r["files"]["oldest-ts"] = time.Now() + r["files"]["oldest-name"] = "" + r["files"]["newest-ts"] = time.Now() + r["files"]["newest-name"] = "" + r["files"]["total-files"] = 0 + + _ = b.db.View(func(tx *bolt.Tx) error { + dataTsBucket := tx.Bucket([]byte(DataTsBucket)) + rootTsBucket := tx.Bucket([]byte(RootTsBucket)) + + var totalDirs int + var totalFiles int + _ = b.iterateBuckets(tx.Bucket([]byte(RootBucket)), func(name string) { + totalDirs++ + }, func(key string, val []byte) { + totalFiles++ + }) + r["files"]["total-dir"] = totalDirs + r["files"]["total-files"] = totalFiles + + c := dataTsBucket.Cursor() + if k, v := c.First(); k != nil { + // split to get (abs path - offset) + val := string(v[:]) + p := val[:strings.LastIndex(val, "-")] + r["data"]["oldest-ts"] = time.Unix(0, btoi(k)) + r["data"]["oldest-file"] = p + } + if k, v := c.Last(); k != nil { + // split to get (abs path - offset) + val := string(v[:]) + p := val[:strings.LastIndex(val, "-")] + r["data"]["newest-ts"] = time.Unix(0, btoi(k)) + r["data"]["newest-file"] = p + } + + c = rootTsBucket.Cursor() + if k, v := c.First(); k != nil { + // split to get (abs path - offset) + r["files"]["oldest-ts"] = time.Unix(0, btoi(k)) + r["files"]["oldest-name"] = string(v) + } + if k, v := c.Last(); k != nil { + r["files"]["newest-ts"] = time.Unix(0, btoi(k)) + r["files"]["newest-name"] = string(v) + } + + return nil + }) + + return r, nil +} + +// Purge will flush the entire cache +func (b *Persistent) Purge() { + _ = b.db.Update(func(tx *bolt.Tx) error { + _ = tx.DeleteBucket([]byte(RootBucket)) + _ = tx.DeleteBucket([]byte(RootTsBucket)) + _ = tx.DeleteBucket([]byte(DataTsBucket)) + + _, _ = tx.CreateBucketIfNotExists([]byte(RootBucket)) + _, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket)) + _, _ = tx.CreateBucketIfNotExists([]byte(DataTsBucket)) + + return nil + }) + + err := os.RemoveAll(b.dataPath) + if err != nil { + fs.Errorf(b, "issue removing data folder: %v", err) + } + err = os.MkdirAll(b.dataPath, os.ModePerm) + if err != nil { + fs.Errorf(b, "issue removing data folder: %v", err) + } +} + +// GetChunkTs retrieves the current timestamp of this chunk +func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) { + var t time.Time + tsVal := path + "-" + strconv.FormatInt(offset, 10) + + err := b.db.View(func(tx *bolt.Tx) error { + tsBucket := tx.Bucket([]byte(DataTsBucket)) + c := tsBucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if bytes.Equal(v, []byte(tsVal)) { + t = time.Unix(0, btoi(k)) + return nil + } + } + return errors.Errorf("not found %v-%v", path, offset) + 
}) + + return t, err +} + +// GetRootTs retrieves the current timestamp of an object or dir +func (b *Persistent) GetRootTs(path string) (time.Time, error) { + var t time.Time + + err := b.db.View(func(tx *bolt.Tx) error { + tsBucket := tx.Bucket([]byte(RootTsBucket)) + c := tsBucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if bytes.Equal(v, []byte(path)) { + t = time.Unix(0, btoi(k)) + return nil + } + } + return errors.Errorf("not found %v", path) + }) + + return t, err +} + +func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string), kvFn func(key string, val []byte)) error { + err := b.db.View(func(tx *bolt.Tx) error { + var c *bolt.Cursor + if buk == nil { + c = tx.Cursor() + } else { + c = buk.Cursor() + } + for k, v := c.First(); k != nil; k, v = c.Next() { + if v == nil { + var buk2 *bolt.Bucket + if buk == nil { + buk2 = tx.Bucket(k) + } else { + buk2 = buk.Bucket(k) + } + + bucketFn(string(k)) + _ = b.iterateBuckets(buk2, bucketFn, kvFn) + } else { + kvFn(string(k), v) + } + } + return nil + }) + + return err +} + +// itob returns an 8-byte big endian representation of v. +func itob(v int64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) + return b +} + +func btoi(d []byte) int64 { + return int64(binary.BigEndian.Uint64(d)) +} + +// cloneBytes returns a copy of a given slice. +func cloneBytes(v []byte) []byte { + var clone = make([]byte, len(v)) + copy(clone, v) + return clone +} diff --git a/cmd/all/all.go b/cmd/all/all.go index 9c8ccc6a4..4def69c75 100644 --- a/cmd/all/all.go +++ b/cmd/all/all.go @@ -5,6 +5,7 @@ import ( // Active commands _ "github.com/ncw/rclone/cmd" _ "github.com/ncw/rclone/cmd/authorize" + _ "github.com/ncw/rclone/cmd/cachestats" _ "github.com/ncw/rclone/cmd/cat" _ "github.com/ncw/rclone/cmd/check" _ "github.com/ncw/rclone/cmd/cleanup" diff --git a/cmd/cachestats/cachestats.go b/cmd/cachestats/cachestats.go new file mode 100644 index 000000000..9130c5630 --- /dev/null +++ b/cmd/cachestats/cachestats.go @@ -0,0 +1,61 @@ +// +build !plan9 + +package cachestats + +import ( + "encoding/json" + "fmt" + + "github.com/ncw/rclone/cache" + "github.com/ncw/rclone/cmd" + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" + "github.com/spf13/cobra" +) + +func init() { + cmd.Root.AddCommand(commandDefinition) +} + +var commandDefinition = &cobra.Command{ + Use: "cachestats source:", + Short: `Print cache stats for a remote`, + Long: ` +Print cache stats for a remote in JSON format +`, + Run: func(command *cobra.Command, args []string) { + cmd.CheckArgs(1, 1, command, args) + + _, configName, _, err := fs.ParseRemote(args[0]) + if err != nil { + fs.Errorf("cachestats", "%s", err.Error()) + return + } + + if !fs.ConfigFileGetBool(configName, "read_only", false) { + fs.ConfigFileSet(configName, "read_only", "true") + defer fs.ConfigFileDeleteKey(configName, "read_only") + } + + fsrc := cmd.NewFsSrc(args) + cmd.Run(true, true, command, func() error { + var fsCache *cache.Fs + fsCache, ok := fsrc.(*cache.Fs) + if !ok { + fsCache, ok = fsrc.Features().UnWrap().(*cache.Fs) + if !ok { + return errors.Errorf("%s: is not a cache remote", fsrc.Name()) + } + } + m, err := fsCache.Stats() + + raw, err := json.MarshalIndent(m, "", " ") + if err != nil { + return err + } + + fmt.Printf("%s\n", string(raw)) + return nil + }) + }, +} diff --git a/cmd/cachestats/cachestats_unsupported.go b/cmd/cachestats/cachestats_unsupported.go new file mode 100644 index 000000000..aded32895 --- /dev/null +++ 
b/cmd/cachestats/cachestats_unsupported.go @@ -0,0 +1,6 @@ +// Build for cache for unsupported platforms to stop go complaining +// about "no buildable Go source files " + +// +build plan9 + +package cachestats diff --git a/docs/content/about.md b/docs/content/about.md index 4e3d1da11..4dd7cb91a 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -55,6 +55,7 @@ Features * [Check](/commands/rclone_check/) mode to check for file hash equality * Can sync to and from network, eg two different cloud accounts * Optional encryption ([Crypt](/crypt/)) + * Optional cache ([Cache](/cache/)) * Optional FUSE mount ([rclone mount](/commands/rclone_mount/)) Links diff --git a/docs/content/cache.md b/docs/content/cache.md new file mode 100644 index 000000000..1c7d56fdd --- /dev/null +++ b/docs/content/cache.md @@ -0,0 +1,309 @@ +--- +title: "Cache" +description: "Rclone docs for cache remote" +date: "2017-09-03" +--- + + Cache +----------------------------------------- + +The `cache` remote wraps another existing remote and stores file structure +and its data for long running tasks like `rclone mount`. + +To get started you just need to have an existing remote which can be configured +with `cache`. + +Here is an example of how to make a remote called `test-cache`. First run: + + rclone config + +This will guide you through an interactive setup process: + +``` +No remotes found - make a new one +n) New remote +r) Rename remote +c) Copy remote +s) Set configuration password +q) Quit config +n/r/c/s/q> n +name> remote +Type of storage to configure. +Choose a number from below, or type in your own value +... + 5 / Cache a remote + \ "cache" +... +Storage> 5 +Remote to cache. +Normally should contain a ':' and a path, eg "myremote:path/to/dir", +"myremote:bucket" or maybe "myremote:" (not recommended). +remote> local:/test +The size of a chunk. Lower value good for slow connections but can affect seamless reading. +Default: 5M +Choose a number from below, or type in your own value + 1 / 1MB + \ "1m" + 2 / 5 MB + \ "5M" + 3 / 10 MB + \ "10M" +chunk_size> 2 +How much time should object info (file size, file hashes etc) be stored in cache. Use a very high value if you don't plan on changing the source FS from outside the cache. +Accepted units are: "s", "m", "h". +Default: 5m +Choose a number from below, or type in your own value + 1 / 1 hour + \ "1h" + 2 / 24 hours + \ "24h" + 3 / 24 hours + \ "48h" +info_age> 2 +How much time should a chunk (file data) be stored in cache. +Accepted units are: "s", "m", "h". +Default: 3h +Choose a number from below, or type in your own value + 1 / 30 seconds + \ "30s" + 2 / 1 minute + \ "1m" + 3 / 1 hour and 30 minutes + \ "1h30m" +chunk_age> 3h +How much time should data be cached during warm up. +Accepted units are: "s", "m", "h". +Default: 24h +Choose a number from below, or type in your own value + 1 / 3 hours + \ "3h" + 2 / 6 hours + \ "6h" + 3 / 24 hours + \ "24h" +warmup_age> 3 +Remote config +-------------------- +[test-cache] +remote = local:/test +chunk_size = 5M +info_age = 24h +chunk_age = 3h +warmup_age = 24h +``` + +You can then use it like this, + +List directories in top level of your drive + + rclone lsd test-cache: + +List all the files in your drive + + rclone ls test-cache: + +To start a cached mount + + rclone mount --allow-other test-cache: /var/tmp/test-cache + +### Write Support ### + +Writes are supported through `cache`. +One caveat is that a mounted cache remote does not add any retry or fallback +mechanism to the upload operation. 
This will depend on the implementation +of the wrapped remote. + +One special case is `cache-writes`: when it is enabled, file data is cached +at the same time as the upload, making it available from the cache store +immediately once the upload is finished. + +### Read Features ### + +#### Multiple connections #### + +To counter the high latency between a local PC where rclone is running +and cloud providers, the cache remote can split a read into multiple smaller +chunk requests to the cloud provider and combine them locally, so that the +data is usually available just before the reader needs it. +This is similar to buffering when media files are played online. Rclone +will stay around the current read position but always try its best to stay +ahead and prepare the data in advance. + +#### Warmup mode #### + +A downside of running multiple requests against the cloud provider is +that you can easily reach a limit on how many requests or how much data +you can download from a cloud provider in a window of time. +For this reason, warm up mode is a state where `cache` changes its settings +to talk less with the cloud provider. + +To prevent a ban or a similar action from the cloud provider, `cache` will +keep track of all the open files and, when a configured threshold is passed +during intensive times, it will change its settings to warm up mode. + +Warm up mode is also disabled during single file streaming if `cache` sees +that the file is being read in sequence and its parts can be loaded in advance. + +Affected settings: +- `cache-chunk-no-memory`: _disabled_ +- `cache-workers`: _1_ +- file chunks will now be cached using `cache-warm-up-age` as a duration instead of the +regular `cache-chunk-age` + +### Known issues ### + +#### cache and crypt #### + +One common scenario is to keep your data encrypted in the cloud provider +using the `crypt` remote. `crypt` uses a similar technique to wrap around +an existing remote and handles this translation in a seamless way. + +There is an issue with wrapping the remotes in this order: +**cloud remote** -> **crypt** -> **cache** + +During testing, I experienced a lot of bans with the remotes in this order. +I suspect it might be related to how crypt opens files on the cloud provider +which makes it think we're downloading the full file instead of small chunks. +Organizing the remotes in this order yields better results: +**cloud remote** -> **cache** -> **crypt** + +### Specific options ### + +Here are the command line options specific to this cloud storage +system. + +#### --cache-db-path=PATH #### + +Path to where partial file data (chunks) and the file structure metadata +are stored locally. + +**Default**: `<rclone config dir>/cache` +**Example**: ~/.config/rclone/test-cache + +#### --cache-db-purge #### + +Flag to clear all the cached data for this remote before using it. + +**Default**: not set + +#### --cache-chunk-size=SIZE #### + +The size of a chunk (partial file data). Use lower numbers for slower +connections. + +**Default**: 5M + +#### --cache-info-age=DURATION #### + +How long to keep file structure information (directory listings, file size, +mod times etc) locally. + +If all write operations are done through `cache` then you can safely make +this value very large as the cache store will also be updated in real time. + +**Default**: 6h + +#### --cache-chunk-age=DURATION #### + +How long to keep file chunks (partial data) locally. + +Longer durations will result in larger cache stores as data will be cleared +less often. 
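+ +For example, assuming the `test-cache` remote configured above, chunks could +be kept around for a full day by mounting with (the value is only an illustration): + + rclone mount --cache-chunk-age 24h test-cache: /var/tmp/test-cache + 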
+ +**Default**: 3h + +#### --cache-warm-up-age=DURATION #### + +How long to keep file chunks (partial data) locally while warm up mode is active. + +If `cache` goes through intensive read periods, for example when it is being +scanned for information, this setting lets you keep that data around for longer. +Otherwise, it's safe to keep the same value as `cache-chunk-age`. + +**Default**: 3h + +#### --cache-read-retries=RETRIES #### + +How many times to retry a read from a cache storage. + +Since reading from a `cache` stream is independent from downloading file data, +readers can get to a point where there's no more data in the cache. +Most of the time this can indicate a connectivity issue if `cache` isn't +able to provide file data anymore. + +For really slow connections, increase this to a point where the stream is +able to provide data, but expect playback to be very stuttery. + +**Default**: 3 + +#### --cache-workers=WORKERS #### + +How many workers should run in parallel to download chunks. + +Higher values will mean more parallel processing (more CPU needed) and +more concurrent requests on the cloud provider. +This impacts several aspects, like the cloud provider API limits and the stress +on the hardware that rclone runs on, but it also means that streams will +be more fluid and data will be available to readers much faster. + +**Default**: 4 + +#### --cache-chunk-no-memory #### + +By default, `cache` will keep file data during streaming in RAM as well +to provide it to readers as fast as possible. + +This transient data is evicted as soon as it is read and the number of +chunks stored doesn't exceed the number of workers. However, depending +on other settings like `cache-chunk-size` and `cache-workers` this footprint +can increase if there are parallel streams too (multiple files being read +at the same time). + +If the hardware permits it, keep the in-memory cache (the default) for better +overall streaming performance; set this flag to disable it if RAM is scarce +on the local machine. + +**Default**: not set + +#### --cache-rps=NUMBER #### + +Some of the rclone remotes that `cache` will wrap already have back-off or limits +in place to not reach cloud provider limits. This setting is similar to that. +It places a hard limit on the number of requests per second that `cache` +will make to the cloud provider remote and tries to respect that value +by waiting between reads. + +If you find that you're getting banned or limited on the cloud provider +through cache and know that a smaller number of requests per second will +allow you to work with it then you can use this setting for that. + +A good balance of all the other settings and warm up times should make this +setting unnecessary but it is available for more special cases. + +**NOTE**: This will limit the number of requests during streams but other +API calls to the cloud provider like directory listings will still pass. + +**Default**: disabled (-1) + +#### --cache-warm-up-rps=RATE/SECONDS #### + +This setting controls when `cache` switches into warm up mode and back +out of it. + +`cache` keeps track of all open files and, when `RATE` files have been opened +within a window of `SECONDS`, it will activate warm up and change +its settings as explained in the `Warmup mode` section. + +When the number of files opened in the same window drops below `RATE`, +`cache` will disable this mode. 
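+ +As an illustration (not a recommendation), warm up would be triggered when at +least 5 files are opened within 30 seconds by mounting with: + + rclone mount --cache-warm-up-rps 5/30 test-cache: /var/tmp/test-cache + 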
+ +**Default**: 3/20 + +#### --cache-writes #### + +If you need to read files immediately after you upload them through `cache` +you can enable this flag to have their data stored in the cache store at the +same time during upload. + +**Default**: not set diff --git a/docs/content/docs.md b/docs/content/docs.md index e4d3d57ef..328ebd503 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -23,6 +23,7 @@ See the following for detailed instructions for * [Amazon S3](/s3/) * [Backblaze B2](/b2/) * [Box](/box/) + * [Cache](/cache/) * [Crypt](/crypt/) - to encrypt other remotes * [DigitalOcean Spaces](/s3/#digitalocean-spaces) * [Dropbox](/dropbox/) diff --git a/docs/layouts/chrome/navbar.html b/docs/layouts/chrome/navbar.html index 354dd26a2..2a40e1c02 100644 --- a/docs/layouts/chrome/navbar.html +++ b/docs/layouts/chrome/navbar.html @@ -53,6 +53,7 @@
  • Amazon S3
  • Backblaze B2
  • Box
  • + Cache
  • Crypt (encrypts the others)
  • Dropbox
  • FTP
diff --git a/fs/all/all.go b/fs/all/all.go index 99333fb6b..eed17a9cc 100644 --- a/fs/all/all.go +++ b/fs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/ncw/rclone/azureblob" _ "github.com/ncw/rclone/b2" _ "github.com/ncw/rclone/box" + _ "github.com/ncw/rclone/cache" _ "github.com/ncw/rclone/crypt" _ "github.com/ncw/rclone/drive" _ "github.com/ncw/rclone/dropbox" diff --git a/fs/test_all.go b/fs/test_all.go index 85d393f31..3fc6b1269 100644 --- a/fs/test_all.go +++ b/fs/test_all.go @@ -123,6 +123,11 @@ var ( SubDir: false, FastList: false, }, + { + Name: "TestCache:", + SubDir: true, + FastList: true, + }, } binary = "fs.test" // Flags diff --git a/fstest/fstests/gen_tests.go b/fstest/fstests/gen_tests.go index 23427d7d3..61482dec8 100644 --- a/fstest/fstests/gen_tests.go +++ b/fstest/fstests/gen_tests.go @@ -66,7 +66,7 @@ import ( "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest/fstests" "github.com/ncw/rclone/{{ .FsName }}" -{{ if eq .FsName "crypt" }} _ "github.com/ncw/rclone/local" +{{ if (or (eq .FsName "crypt") (eq .FsName "cache")) }} _ "github.com/ncw/rclone/local" {{end}}) func TestSetup{{ .Suffix }}(t *testing.T)() { @@ -166,5 +166,6 @@ func main() { generateTestProgram(t, fns, "AzureBlob", buildConstraint("go1.7")) generateTestProgram(t, fns, "Pcloud") generateTestProgram(t, fns, "Webdav") + generateTestProgram(t, fns, "Cache", buildConstraint("!plan9")) log.Printf("Done") }