diff --git a/vfs/cache.go b/vfs/cache.go new file mode 100644 index 000000000..329c0ba98 --- /dev/null +++ b/vfs/cache.go @@ -0,0 +1,276 @@ +// This deals with caching of files locally + +package vfs + +import ( + "fmt" + "os" + "path" + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "github.com/djherbis/times" + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// CacheMode controls the functionality of the cache +type CacheMode byte + +// CacheMode options +const ( + CacheModeOff CacheMode = iota // cache nothing - return errors for writes which can't be satisfied + CacheModeMinimal // cache only the minimum, eg read/write opens + CacheModeWrites // cache all files opened with write intent + CacheModeFull // cache all files opened in any mode +) + +var cacheModeToString = []string{ + CacheModeOff: "off", + CacheModeMinimal: "minimal", + CacheModeWrites: "writes", + CacheModeFull: "full", +} + +// String turns a CacheMode into a string +func (l CacheMode) String() string { + if l >= CacheMode(len(cacheModeToString)) { + return fmt.Sprintf("CacheMode(%d)", l) + } + return cacheModeToString[l] +} + +// Set a CacheMode +func (l *CacheMode) Set(s string) error { + for n, name := range cacheModeToString { + if s != "" && name == s { + *l = CacheMode(n) + return nil + } + } + return errors.Errorf("Unknown cache mode level %q", s) +} + +// Type of the value +func (l *CacheMode) Type() string { + return "string" +} + +// cache opened files +type cache struct { + f fs.Fs // fs for the cache directory + opt *Options // vfs Options + root string // root of the cache directory + itemMu sync.Mutex // protects the next two maps + item map[string]*cacheItem // files in the cache +} + +// cacheItem is stored in the item map +type cacheItem struct { + opens int // number of times file is open + atime time.Time // last time file was accessed +} + +// newCacheItem returns an item for the cache +func newCacheItem() *cacheItem { + return &cacheItem{atime: time.Now()} +} + +// newCache creates a new cache heirachy for f +// +// This starts background goroutines which can be cancelled with the +// context passed in. +func newCache(ctx context.Context, f fs.Fs, opt *Options) (*cache, error) { + fRoot := filepath.FromSlash(f.Root()) + if runtime.GOOS == "windows" { + if strings.HasPrefix(fRoot, `\\?`) { + fRoot = fRoot[3:] + } + fRoot = strings.Replace(fRoot, ":", "", -1) + } + root := filepath.Join(fs.CacheDir, "vfs", f.Name(), fRoot) + fs.Debugf(nil, "vfs cache root is %q", root) + + f, err := fs.NewFs(root) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache remote") + } + + c := &cache{ + f: f, + opt: opt, + root: root, + item: make(map[string]*cacheItem), + } + + go c.cleaner(ctx) + + return c, nil +} + +// mkdir makes the directory for name in the cache and returns an os +// path for the file +func (c *cache) mkdir(name string) (string, error) { + parent := path.Dir(name) + if parent == "." { + parent = "" + } + leaf := path.Base(name) + parentPath := filepath.Join(c.root, filepath.FromSlash(parent)) + err := os.MkdirAll(parentPath, 0700) + if err != nil { + return "", errors.Wrap(err, "make cache directory failed") + } + return filepath.Join(parentPath, leaf), nil +} + +// _get gets name from the cache or creates a new one +// +// must be called with itemMu held +func (c *cache) _get(name string) *cacheItem { + item := c.item[name] + if item == nil { + item = newCacheItem() + c.item[name] = item + } + return item +} + +// get gets name from the cache or creates a new one +func (c *cache) get(name string) *cacheItem { + c.itemMu.Lock() + item := c._get(name) + c.itemMu.Unlock() + return item +} + +// updateTime sets the atime of the name to that passed in if it is +// newer than the existing or there isn't an existing time. +func (c *cache) updateTime(name string, when time.Time) { + c.itemMu.Lock() + item := c._get(name) + if when.Sub(item.atime) > 0 { + fs.Debugf(name, "updateTime: setting atime to %v", when) + item.atime = when + } + c.itemMu.Unlock() +} + +// open marks name as open +func (c *cache) open(name string) { + c.itemMu.Lock() + item := c._get(name) + item.opens++ + item.atime = time.Now() + c.itemMu.Unlock() +} + +// close marks name as closed +func (c *cache) close(name string) { + c.itemMu.Lock() + item := c._get(name) + item.opens-- + item.atime = time.Now() + if item.opens < 0 { + fs.Errorf(name, "cache: double close") + } + c.itemMu.Unlock() +} + +// cleanUp empties the cache of everything +func (c *cache) cleanUp() error { + return os.RemoveAll(c.root) +} + +// updateAtimes walks the cache updating any atimes it finds +func (c *cache) updateAtimes() error { + return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + if !fi.IsDir() { + // Find path relative to the cache root + name, err := filepath.Rel(c.root, osPath) + if err != nil { + return errors.Wrap(err, "filepath.Rel failed in updatAtimes") + } + // And convert into slashes + name = filepath.ToSlash(name) + + // Update the atime with that of the file + atime := times.Get(fi).AccessTime() + c.updateTime(name, atime) + } + return nil + }) +} + +// purgeOld gets rid of any files that are over age +func (c *cache) purgeOld(maxAge time.Duration) { + c.itemMu.Lock() + defer c.itemMu.Unlock() + cutoff := time.Now().Add(-maxAge) + for name, item := range c.item { + // If not locked and access time too long ago - delete the file + dt := item.atime.Sub(cutoff) + // fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt) + if item.opens == 0 && dt < 0 { + osPath := filepath.Join(c.root, filepath.FromSlash(name)) + err := os.Remove(osPath) + if err != nil { + fs.Errorf(name, "Failed to remove from cache: %v", err) + } else { + fs.Debugf(name, "Removed from cache") + } + // Remove the entry + delete(c.item, name) + } + } +} + +// clean empties the cache of stuff if it can +func (c *cache) clean() { + // Cache may be empty so end + _, err := os.Stat(c.root) + if os.IsNotExist(err) { + return + } + + fs.Debugf(nil, "Cleaning the cache") + + // first walk the FS to update the atimes + err = c.updateAtimes() + if err != nil { + fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err) + } + + // Now remove any files that are over age + c.purgeOld(c.opt.CacheMaxAge) + + // Now tidy up any empty directories + err = fs.Rmdirs(c.f, "") + if err != nil { + fs.Errorf(c.f, "Failed to remove empty directories from cache: %v", err) + } +} + +// cleaner calls clean at regular intervals +// +// doesn't return until context is cancelled +func (c *cache) cleaner(ctx context.Context) { + timer := time.NewTicker(c.opt.CachePollInterval) + defer timer.Stop() + for { + select { + case <-timer.C: + c.clean() + case <-ctx.Done(): + fs.Debugf(nil, "cache cleaner exiting") + return + } + } +} diff --git a/vfs/cache_test.go b/vfs/cache_test.go new file mode 100644 index 000000000..6b67a8dcf --- /dev/null +++ b/vfs/cache_test.go @@ -0,0 +1,144 @@ +package vfs + +import ( + "io/ioutil" + "log" + "os" + "path/filepath" + "testing" + "time" + + "github.com/djherbis/times" + "github.com/ncw/rclone/fstest" + "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6 +) + +// Check CacheMode it satisfies the pflag interface +var _ pflag.Value = (*CacheMode)(nil) + +func TestCacheModeString(t *testing.T) { + assert.Equal(t, "off", CacheModeOff.String()) + assert.Equal(t, "full", CacheModeFull.String()) + assert.Equal(t, "CacheMode(17)", CacheMode(17).String()) +} + +func TestCacheModeSet(t *testing.T) { + var m CacheMode + + err := m.Set("full") + assert.NoError(t, err) + assert.Equal(t, CacheModeFull, m) + + err = m.Set("potato") + assert.Error(t, err, "Unknown cache mode level") + + err = m.Set("") + assert.Error(t, err, "Unknown cache mode level") +} + +func TestCacheModeType(t *testing.T) { + var m CacheMode + assert.Equal(t, "string", m.Type()) +} + +func TestCacheNew(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := newCache(ctx, r.Fremote, &DefaultOpt) + require.NoError(t, err) + + assert.Contains(t, c.root, "vfs") + assert.Contains(t, c.f.Root(), filepath.Base(r.Fremote.Root())) + + // mkdir + p, err := c.mkdir("potato") + require.NoError(t, err) + assert.Equal(t, "potato", filepath.Base(p)) + + fi, err := os.Stat(filepath.Dir(p)) + require.NoError(t, err) + assert.True(t, fi.IsDir()) + + // get + item := c.get("potato") + item2 := c.get("potato") + assert.Equal(t, item, item2) + assert.WithinDuration(t, time.Now(), item.atime, time.Second) + + // updateTime + //.. before + t1 := time.Now().Add(-60 * time.Minute) + c.updateTime("potato", t1) + item = c.get("potato") + assert.NotEqual(t, t1, item.atime) + assert.Equal(t, 0, item.opens) + //..after + t2 := time.Now().Add(60 * time.Minute) + c.updateTime("potato", t2) + item = c.get("potato") + assert.Equal(t, t2, item.atime) + assert.Equal(t, 0, item.opens) + + // open + c.open("potato") + item = c.get("potato") + assert.WithinDuration(t, time.Now(), item.atime, time.Second) + assert.Equal(t, 1, item.opens) + + // write the file + err = ioutil.WriteFile(p, []byte("hello"), 0600) + require.NoError(t, err) + + // read its atime + fi, err = os.Stat(p) + assert.NoError(t, err) + atime := times.Get(fi).AccessTime() + + // updateAtimes + log.Printf("updateAtimes") + item = c.get("potato") + item.atime = time.Now().Add(-24 * time.Hour) + err = c.updateAtimes() + require.NoError(t, err) + item = c.get("potato") + assert.Equal(t, atime, item.atime) + + // try purging with file open + c.purgeOld(10 * time.Second) + _, err = os.Stat(p) + assert.NoError(t, err) + + // close + c.updateTime("potato", t2) + c.close("potato") + item = c.get("potato") + assert.WithinDuration(t, time.Now(), item.atime, time.Second) + assert.Equal(t, 0, item.opens) + + // try purging with file closed + c.purgeOld(10 * time.Second) + // ...nothing should happend + _, err = os.Stat(p) + assert.NoError(t, err) + + //.. purge again with -ve age + c.purgeOld(-10 * time.Second) + _, err = os.Stat(p) + assert.True(t, os.IsNotExist(err)) + + // clean - have tested the internals already + c.clean() + + // cleanup + err = c.cleanUp() + require.NoError(t, err) + _, err = os.Stat(c.root) + assert.True(t, os.IsNotExist(err)) +} diff --git a/vfs/dir.go b/vfs/dir.go index a75de3998..4af06ca6b 100644 --- a/vfs/dir.go +++ b/vfs/dir.go @@ -306,9 +306,12 @@ func (d *Dir) ReadDirAll() (items Nodes, err error) { return items, nil } +// accessModeMask masks off the read modes from the flags +const accessModeMask = (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) + // Open the directory according to the flags provided func (d *Dir) Open(flags int) (fd Handle, err error) { - rdwrMode := flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) + rdwrMode := flags & accessModeMask if rdwrMode != os.O_RDONLY { fs.Errorf(d, "Can only open directories read only") return nil, EPERM @@ -498,3 +501,8 @@ func (d *Dir) Fsync() error { func (d *Dir) VFS() *VFS { return d.vfs } + +// Truncate changes the size of the named file. +func (d *Dir) Truncate(size int64) error { + return ENOSYS +} diff --git a/vfs/dir_test.go b/vfs/dir_test.go index f1724572e..c79d5db5a 100644 --- a/vfs/dir_test.go +++ b/vfs/dir_test.go @@ -282,7 +282,7 @@ func TestDirCreate(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(0), file.Size()) - fd, err := file.Open(os.O_WRONLY) + fd, err := file.Open(os.O_WRONLY | os.O_CREATE) require.NoError(t, err) // FIXME Note that this fails with the current implementation diff --git a/vfs/file.go b/vfs/file.go index 75ac1073b..11832e18f 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -189,7 +189,7 @@ func (f *File) setObject(o fs.Object) { } // Wait for f.o to become non nil for a short time returning it or an -// error +// error. Use when opening a read handle. // // Call without the mutex held func (f *File) waitForValidObject() (o fs.Object, err error) { @@ -219,9 +219,8 @@ func (f *File) OpenRead() (fh *ReadFileHandle, err error) { // fs.Debugf(o, "File.OpenRead") fh, err = newReadFileHandle(f, o) - err = errors.Wrap(err, "open for read") - if err != nil { + err = errors.Wrap(err, "open for read") fs.Errorf(f, "File.OpenRead failed: %v", err) return nil, err } @@ -236,15 +235,32 @@ func (f *File) OpenWrite() (fh *WriteFileHandle, err error) { // fs.Debugf(o, "File.OpenWrite") fh, err = newWriteFileHandle(f.d, f, f.path()) - err = errors.Wrap(err, "open for write") - if err != nil { + err = errors.Wrap(err, "open for write") fs.Errorf(f, "File.OpenWrite failed: %v", err) return nil, err } return fh, nil } +// OpenRW open the file for read and write using a temporay file +// +// It uses the open flags passed in. +func (f *File) OpenRW(flags int) (fh *RWFileHandle, err error) { + if flags&accessModeMask != os.O_RDONLY && f.d.vfs.Opt.ReadOnly { + return nil, EROFS + } + // fs.Debugf(o, "File.OpenRW") + + fh, err = newRWFileHandle(f.d, f, f.path(), flags) + if err != nil { + err = errors.Wrap(err, "open for read write") + fs.Errorf(f, "File.OpenRW failed: %v", err) + return nil, err + } + return fh, nil +} + // Fsync the file // // Note that we don't do anything except return OK @@ -290,25 +306,85 @@ func (f *File) VFS() *VFS { } // Open a file according to the flags provided +// +// O_RDONLY open the file read-only. +// O_WRONLY open the file write-only. +// O_RDWR open the file read-write. +// +// O_APPEND append data to the file when writing. +// O_CREATE create a new file if none exists. +// O_EXCL used with O_CREATE, file must not exist +// O_SYNC open for synchronous I/O. +// O_TRUNC if possible, truncate file when opene +// +// We ignore O_SYNC and O_EXCL func (f *File) Open(flags int) (fd Handle, err error) { - rdwrMode := flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) - var read bool + var ( + write bool // if set need write support + read bool // if set need read support + rdwrMode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) + ) + + // Figure out the read/write intents switch { case rdwrMode == os.O_RDONLY: read = true - case rdwrMode == os.O_WRONLY || (rdwrMode == os.O_RDWR && (flags&os.O_TRUNC) != 0): - read = false + case rdwrMode == os.O_WRONLY: + write = true case rdwrMode == os.O_RDWR: - fs.Errorf(f, "Can't open for Read and Write") - return nil, EPERM + read = true + write = true default: fs.Errorf(f, "Can't figure out how to open with flags: 0x%X", flags) return nil, EPERM } - if read { - fd, err = f.OpenRead() + + // If append is set then set read to force OpenRW + if flags&os.O_APPEND != 0 { + read = true + } + + // If truncate is set then set write to force OpenRW + if flags&os.O_TRUNC != 0 { + write = true + } + + // FIXME discover if file is in cache or not? + + // Open the correct sort of handle + CacheMode := f.d.vfs.Opt.CacheMode + if read && write { + if CacheMode >= CacheModeMinimal { + fd, err = f.OpenRW(flags) + } else if flags&os.O_TRUNC != 0 { + fd, err = f.OpenWrite() + } else { + fs.Errorf(f, "Can't open for read and write without cache") + return nil, EPERM + } + } else if write { + if CacheMode >= CacheModeWrites { + fd, err = f.OpenRW(flags) + } else { + fd, err = f.OpenWrite() + } + } else if read { + if CacheMode >= CacheModeFull { + fd, err = f.OpenRW(flags) + } else { + fd, err = f.OpenRead() + } } else { - fd, err = f.OpenWrite() + fs.Errorf(f, "Can't figure out how to open with flags: 0x%X", flags) + return nil, EPERM } return fd, err } + +// Truncate changes the size of the named file. +func (f *File) Truncate(size int64) error { + if f.d.vfs.Opt.CacheMode >= CacheModeWrites { + } + // FIXME + return ENOSYS +} diff --git a/vfs/file_test.go b/vfs/file_test.go index 87ced3655..af3b29621 100644 --- a/vfs/file_test.go +++ b/vfs/file_test.go @@ -160,7 +160,7 @@ func TestFileOpen(t *testing.T) { _, file, _ := fileCreate(t, r) fd, err := file.Open(os.O_RDONLY) - assert.NoError(t, err) + require.NoError(t, err) _, ok := fd.(*ReadFileHandle) assert.True(t, ok) require.NoError(t, fd.Close()) @@ -171,12 +171,6 @@ func TestFileOpen(t *testing.T) { assert.True(t, ok) require.NoError(t, fd.Close()) - fd, err = file.Open(os.O_RDWR | os.O_TRUNC) - assert.NoError(t, err) - _, ok = fd.(*WriteFileHandle) - assert.True(t, ok) - require.NoError(t, fd.Close()) - fd, err = file.Open(os.O_RDWR) assert.Equal(t, EPERM, err) diff --git a/vfs/read_write.go b/vfs/read_write.go new file mode 100644 index 000000000..7301277ca --- /dev/null +++ b/vfs/read_write.go @@ -0,0 +1,406 @@ +package vfs + +import ( + "io" + "os" + "sync" + + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" +) + +// RWFileHandle is a handle that can be open for read and write. +// +// It will be open to a temporary file which, when closed, will be +// transferred to the remote. +type RWFileHandle struct { + *os.File + mu sync.Mutex + closed bool // set if handle has been closed + o fs.Object // may be nil + remote string + file *File + d *Dir + opened bool + flags int // open flags + osPath string // path to the file in the cache + writeCalled bool // if any Write() methods have been called +} + +// Check interfaces +var ( + _ io.Reader = (*RWFileHandle)(nil) + _ io.ReaderAt = (*RWFileHandle)(nil) + _ io.Writer = (*RWFileHandle)(nil) + _ io.WriterAt = (*RWFileHandle)(nil) + _ io.Seeker = (*RWFileHandle)(nil) + _ io.Closer = (*RWFileHandle)(nil) +) + +func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandle, err error) { + // Make a place for the file + osPath, err := d.vfs.cache.mkdir(remote) + if err != nil { + return nil, errors.Wrap(err, "open RW handle failed to make cache directory") + } + + fh = &RWFileHandle{ + o: f.o, + file: f, + d: d, + remote: remote, + flags: flags, + osPath: osPath, + } + return fh, nil +} + +// openPending opens the file if there is a pending open +// +// call with the lock held +func (fh *RWFileHandle) openPending(truncate bool) (err error) { + if fh.opened { + return nil + } + + rdwrMode := fh.flags & accessModeMask + + // if not truncating the file, need to read it first + if fh.flags&os.O_TRUNC == 0 && !truncate { + // Fetch the file if it hasn't changed + // FIXME retries + err = fs.CopyFile(fh.d.vfs.cache.f, fh.d.vfs.f, fh.remote, fh.remote) + if err != nil { + // if the object wasn't found AND O_CREATE is set then... + cause := errors.Cause(err) + notFound := cause == fs.ErrorObjectNotFound || cause == fs.ErrorDirNotFound + if notFound { + // Remove cached item if there is one + err = os.Remove(fh.osPath) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "open RW handle failed to delete stale cache file") + } + } + if notFound && fh.flags&os.O_CREATE != 0 { + // ...ignore error as we are about to create the file + } else { + return errors.Wrap(err, "open RW handle failed to cache file") + } + } + } else { + // Set the size to 0 since we are truncating + fh.file.setSize(0) + } + + if rdwrMode != os.O_RDONLY { + fh.file.addWriters(1) + } + + fs.Debugf(fh.remote, "Opening cached copy with flags=0x%02X", fh.flags) + fd, err := os.OpenFile(fh.osPath, fh.flags|os.O_CREATE, 0600) + if err != nil { + return errors.Wrap(err, "cache open file failed") + } + fh.File = fd + fh.opened = true + fh.d.vfs.cache.open(fh.osPath) + return nil +} + +// String converts it to printable +func (fh *RWFileHandle) String() string { + if fh == nil { + return "" + } + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.file == nil { + return "" + } + return fh.file.String() + " (rw)" +} + +// Node returns the Node assocuated with this - satisfies Noder interface +func (fh *RWFileHandle) Node() Node { + fh.mu.Lock() + defer fh.mu.Unlock() + return fh.file +} + +// close the file handle returning EBADF if it has been +// closed already. +// +// Must be called with fh.mu held +// +// Note that we leave the file around in the cache on error conditions +// to give the user a chance to recover it. +func (fh *RWFileHandle) close() (err error) { + defer fs.Trace(fh.remote, "")("err=%v", &err) + if fh.closed { + return ECLOSED + } + fh.closed = true + rdwrMode := fh.flags & accessModeMask + if !fh.opened { + // If read only then return + if rdwrMode == os.O_RDONLY { + return nil + } + // If we aren't creating or truncating the file then + // we haven't modified it so don't need to transfer it + if fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 { + return nil + } + // Otherwise open the file + // FIXME this could be more efficient + if err := fh.openPending(false); err != nil { + return err + } + } + if rdwrMode != os.O_RDONLY { + fh.file.addWriters(-1) + fi, err := fh.File.Stat() + if err != nil { + fs.Errorf(fh.remote, "Failed to stat cache file: %v", err) + } else { + fh.file.setSize(fi.Size()) + } + } + fh.d.vfs.cache.close(fh.osPath) + + // Close the underlying file + err = fh.File.Close() + if err != nil { + return err + } + + // FIXME measure whether we actually did any writes or not - + // no writes means no transfer? + if rdwrMode == os.O_RDONLY { + fs.Debugf(fh.remote, "read only so not transferring") + return nil + } + + // If write hasn't been called and we aren't creating or + // truncating the file then we haven't modified it so don't + // need to transfer it + if !fh.writeCalled && fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 { + fs.Debugf(fh.remote, "not modified so not transferring") + return nil + } + + // Transfer the temp file to the remote + // FIXME retries + if fh.d.vfs.Opt.CacheMode < CacheModeFull { + err = fs.MoveFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote) + } else { + err = fs.CopyFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote) + } + if err != nil { + err = errors.Wrap(err, "failed to transfer file from cache to remote") + fs.Errorf(fh.remote, "%v", err) + return err + } + + // FIXME get MoveFile to return this object + o, err := fh.d.vfs.f.NewObject(fh.remote) + if err != nil { + err = errors.Wrap(err, "failed to find object after transfer to remote") + fs.Errorf(fh.remote, "%v", err) + return err + } + fh.file.setObject(o) + fs.Debugf(o, "transferred to remote") + + return nil +} + +// Close closes the file +func (fh *RWFileHandle) Close() error { + fh.mu.Lock() + defer fh.mu.Unlock() + return fh.close() +} + +// Flush is called each time the file or directory is closed. +// Because there can be multiple file descriptors referring to a +// single opened file, Flush can be called multiple times. +func (fh *RWFileHandle) Flush() error { + fh.mu.Lock() + defer fh.mu.Unlock() + if !fh.opened { + return nil + } + if fh.closed { + fs.Debugf(fh.remote, "RWFileHandle.Flush nothing to do") + return nil + } + // fs.Debugf(fh.remote, "RWFileHandle.Flush") + if !fh.opened { + fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unopened handle") + return nil + } + + // If Write hasn't been called then ignore the Flush - Release + // will pick it up + if !fh.writeCalled { + fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unwritten handle") + return nil + } + err := fh.close() + if err != nil { + fs.Errorf(fh.remote, "RWFileHandle.Flush error: %v", err) + } else { + // fs.Debugf(fh.remote, "RWFileHandle.Flush OK") + } + return err +} + +// Release is called when we are finished with the file handle +// +// It isn't called directly from userspace so the error is ignored by +// the kernel +func (fh *RWFileHandle) Release() error { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + fs.Debugf(fh.remote, "RWFileHandle.Release nothing to do") + return nil + } + fs.Debugf(fh.remote, "RWFileHandle.Release closing") + err := fh.close() + if err != nil { + fs.Errorf(fh.remote, "RWFileHandle.Release error: %v", err) + } else { + // fs.Debugf(fh.remote, "RWFileHandle.Release OK") + } + return err +} + +// Size returns the size of the underlying file +func (fh *RWFileHandle) Size() int64 { + fh.mu.Lock() + defer fh.mu.Unlock() + if !fh.opened { + return fh.file.Size() + } + fi, err := fh.File.Stat() + if err != nil { + return 0 + } + return fi.Size() +} + +// Stat returns info about the file +func (fh *RWFileHandle) Stat() (os.FileInfo, error) { + fh.mu.Lock() + defer fh.mu.Unlock() + return fh.file, nil +} + +// Read bytes from the file +func (fh *RWFileHandle) Read(b []byte) (n int, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + return 0, ECLOSED + } + if err = fh.openPending(false); err != nil { + return n, err + } + return fh.File.Read(b) +} + +// ReadAt bytes from the file at off +func (fh *RWFileHandle) ReadAt(b []byte, off int64) (n int, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + return 0, ECLOSED + } + if err = fh.openPending(false); err != nil { + return n, err + } + return fh.File.ReadAt(b, off) +} + +// Seek to new file position +func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + return 0, ECLOSED + } + if err = fh.openPending(false); err != nil { + return ret, err + } + return fh.File.Seek(offset, whence) +} + +// writeFn general purpose write call +// +// Pass a closure to do the actual write +func (fh *RWFileHandle) writeFn(write func() error) (err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + return ECLOSED + } + if err = fh.openPending(false); err != nil { + return err + } + fh.writeCalled = true + err = write() + if err != nil { + return err + } + fi, err := fh.File.Stat() + if err != nil { + return errors.Wrap(err, "failed to stat cache file") + } + fh.file.setSize(fi.Size()) + return nil +} + +// Write bytes to the file +func (fh *RWFileHandle) Write(b []byte) (n int, err error) { + err = fh.writeFn(func() error { + n, err = fh.File.Write(b) + return err + }) + return n, err +} + +// WriteAt bytes to the file at off +func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) { + err = fh.writeFn(func() error { + n, err = fh.File.WriteAt(b, off) + return err + }) + return n, err +} + +// WriteString a string to the file +func (fh *RWFileHandle) WriteString(s string) (n int, err error) { + err = fh.writeFn(func() error { + n, err = fh.File.WriteString(s) + return err + }) + return n, err + +} + +// Truncate file to given size +func (fh *RWFileHandle) Truncate(size int64) (err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + return ECLOSED + } + if err = fh.openPending(size == 0); err != nil { + return err + } + fh.writeCalled = true + fh.file.setSize(size) + return fh.File.Truncate(size) +} diff --git a/vfs/read_write_test.go b/vfs/read_write_test.go new file mode 100644 index 000000000..49f0b46b2 --- /dev/null +++ b/vfs/read_write_test.go @@ -0,0 +1,432 @@ +package vfs + +import ( + "io" + "os" + "testing" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Open a file for write +func rwHandleCreateReadOnly(t *testing.T, r *fstest.Run) (*VFS, *RWFileHandle) { + vfs := New(r.Fremote, nil) + vfs.Opt.CacheMode = CacheModeFull + + file1 := r.WriteObject("dir/file1", "0123456789abcdef", t1) + fstest.CheckItems(t, r.Fremote, file1) + + h, err := vfs.OpenFile("dir/file1", os.O_RDONLY, 0777) + require.NoError(t, err) + fh, ok := h.(*RWFileHandle) + require.True(t, ok) + + return vfs, fh +} + +// Open a file for write +func rwHandleCreateWriteOnly(t *testing.T, r *fstest.Run) (*VFS, *RWFileHandle) { + vfs := New(r.Fremote, nil) + vfs.Opt.CacheMode = CacheModeFull + + h, err := vfs.OpenFile("file1", os.O_WRONLY|os.O_CREATE, 0777) + require.NoError(t, err) + fh, ok := h.(*RWFileHandle) + require.True(t, ok) + + return vfs, fh +} + +// read data from the string +func rwReadString(t *testing.T, fh *RWFileHandle, n int) string { + buf := make([]byte, n) + n, err := fh.Read(buf) + if err != io.EOF { + assert.NoError(t, err) + } + return string(buf[:n]) +} + +func TestRWFileHandleMethodsRead(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateReadOnly(t, r) + + // String + assert.Equal(t, "dir/file1 (rw)", fh.String()) + assert.Equal(t, "", (*RWFileHandle)(nil).String()) + assert.Equal(t, "", new(RWFileHandle).String()) + + // Node + node := fh.Node() + assert.Equal(t, "file1", node.Name()) + + // Size + assert.Equal(t, int64(16), fh.Size()) + + // Read 1 + assert.Equal(t, "0", rwReadString(t, fh, 1)) + + // Read remainder + assert.Equal(t, "123456789abcdef", rwReadString(t, fh, 256)) + + // Read EOF + buf := make([]byte, 16) + _, err := fh.Read(buf) + assert.Equal(t, io.EOF, err) + + // Stat + var fi os.FileInfo + fi, err = fh.Stat() + assert.NoError(t, err) + assert.Equal(t, int64(16), fi.Size()) + assert.Equal(t, "file1", fi.Name()) + + // Close + assert.False(t, fh.closed) + assert.Equal(t, nil, fh.Close()) + assert.True(t, fh.closed) + + // Close again + assert.Equal(t, ECLOSED, fh.Close()) +} + +func TestRWFileHandleSeek(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateReadOnly(t, r) + + assert.Equal(t, "0", rwReadString(t, fh, 1)) + + // 0 means relative to the origin of the file, + n, err := fh.Seek(5, 0) + assert.NoError(t, err) + assert.Equal(t, int64(5), n) + assert.Equal(t, "5", rwReadString(t, fh, 1)) + + // 1 means relative to the current offset + n, err = fh.Seek(-3, 1) + assert.NoError(t, err) + assert.Equal(t, int64(3), n) + assert.Equal(t, "3", rwReadString(t, fh, 1)) + + // 2 means relative to the end. + n, err = fh.Seek(-3, 2) + assert.NoError(t, err) + assert.Equal(t, int64(13), n) + assert.Equal(t, "d", rwReadString(t, fh, 1)) + + // Seek off the end + n, err = fh.Seek(100, 0) + assert.NoError(t, err) + + // Get the error on read + buf := make([]byte, 16) + l, err := fh.Read(buf) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, l) + + // Close + assert.Equal(t, nil, fh.Close()) +} + +func TestRWFileHandleReadAt(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateReadOnly(t, r) + + // read from start + buf := make([]byte, 1) + n, err := fh.ReadAt(buf, 0) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, "0", string(buf[:n])) + + // seek forwards + n, err = fh.ReadAt(buf, 5) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, "5", string(buf[:n])) + + // seek backwards + n, err = fh.ReadAt(buf, 1) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, "1", string(buf[:n])) + + // read exactly to the end + buf = make([]byte, 6) + n, err = fh.ReadAt(buf, 10) + require.NoError(t, err) + assert.Equal(t, 6, n) + assert.Equal(t, "abcdef", string(buf[:n])) + + // read off the end + buf = make([]byte, 256) + n, err = fh.ReadAt(buf, 10) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 6, n) + assert.Equal(t, "abcdef", string(buf[:n])) + + // read starting off the end + n, err = fh.ReadAt(buf, 100) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + + // Properly close the file + assert.NoError(t, fh.Close()) + + // check reading on closed file + n, err = fh.ReadAt(buf, 100) + assert.Equal(t, ECLOSED, err) +} + +func TestRWFileHandleFlushRead(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateReadOnly(t, r) + + // Check Flush does nothing if read not called + err := fh.Flush() + assert.NoError(t, err) + assert.False(t, fh.closed) + + // Read data + buf := make([]byte, 256) + n, err := fh.Read(buf) + assert.True(t, err == io.EOF || err == nil) + assert.Equal(t, 16, n) + + // Check Flush does nothing if read called + err = fh.Flush() + assert.NoError(t, err) + assert.False(t, fh.closed) + + // Check flush does nothing if called again + err = fh.Flush() + assert.NoError(t, err) + assert.False(t, fh.closed) + + // Properly close the file + assert.NoError(t, fh.Close()) +} + +func TestRWFileHandleReleaseRead(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateReadOnly(t, r) + + // Read data + buf := make([]byte, 256) + n, err := fh.Read(buf) + assert.True(t, err == io.EOF || err == nil) + assert.Equal(t, 16, n) + + // Check Release closes file + err = fh.Release() + assert.NoError(t, err) + assert.True(t, fh.closed) + + // Check Release does nothing if called again + err = fh.Release() + assert.NoError(t, err) + assert.True(t, fh.closed) +} + +/// ------------------------------------------------------------ + +func TestRWFileHandleMethodsWrite(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + vfs, fh := rwHandleCreateWriteOnly(t, r) + + // String + assert.Equal(t, "file1 (rw)", fh.String()) + assert.Equal(t, "", (*RWFileHandle)(nil).String()) + assert.Equal(t, "", new(RWFileHandle).String()) + + // Node + node := fh.Node() + assert.Equal(t, "file1", node.Name()) + + offset := func() int64 { + n, err := fh.Seek(0, 1) + require.NoError(t, err) + return n + } + + // Offset #1 + assert.Equal(t, int64(0), offset()) + assert.Equal(t, int64(0), node.Size()) + + // Size #1 + assert.Equal(t, int64(0), fh.Size()) + + // Write + n, err := fh.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + + // Offset #2 + assert.Equal(t, int64(5), offset()) + assert.Equal(t, int64(5), node.Size()) + + // Size #2 + assert.Equal(t, int64(5), fh.Size()) + + // WriteString + n, err = fh.WriteString(" world!") + assert.NoError(t, err) + assert.Equal(t, 7, n) + + // Stat + var fi os.FileInfo + fi, err = fh.Stat() + assert.NoError(t, err) + assert.Equal(t, int64(12), fi.Size()) + assert.Equal(t, "file1", fi.Name()) + + // Truncate + err = fh.Truncate(11) + assert.NoError(t, err) + + // Close + assert.NoError(t, fh.Close()) + + // Check double close + err = fh.Close() + assert.Equal(t, ECLOSED, err) + + // check vfs + root, err := vfs.Root() + checkListing(t, root, []string{"file1,11,false"}) + + // check the underlying r.Fremote but not the modtime + file1 := fstest.NewItem("file1", "hello world", t1) + fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, []string{}, fs.ModTimeNotSupported) +} + +func TestRWFileHandleWriteAt(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + vfs, fh := rwHandleCreateWriteOnly(t, r) + + offset := func() int64 { + n, err := fh.Seek(0, 1) + require.NoError(t, err) + return n + } + + // Preconditions + assert.Equal(t, int64(0), offset()) + assert.False(t, fh.writeCalled) + + // Write the data + n, err := fh.WriteAt([]byte("hello**"), 0) + assert.NoError(t, err) + assert.Equal(t, 7, n) + + // After write + assert.Equal(t, int64(0), offset()) + assert.True(t, fh.writeCalled) + + // Write more data + n, err = fh.WriteAt([]byte(" world"), 5) + assert.NoError(t, err) + assert.Equal(t, 6, n) + + // Close + assert.NoError(t, fh.Close()) + + // Check can't write on closed handle + n, err = fh.WriteAt([]byte("hello"), 0) + assert.Equal(t, ECLOSED, err) + assert.Equal(t, 0, n) + + // check vfs + root, err := vfs.Root() + checkListing(t, root, []string{"file1,11,false"}) + + // check the underlying r.Fremote but not the modtime + file1 := fstest.NewItem("file1", "hello world", t1) + fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, []string{}, fs.ModTimeNotSupported) +} + +func TestRWFileHandleWriteNoWrite(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + vfs, fh := rwHandleCreateWriteOnly(t, r) + + // Close the file without writing to it + err := fh.Close() + assert.NoError(t, err) + + // Create a different file (not in the cache) + h, err := vfs.OpenFile("file2", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0777) + require.NoError(t, err) + + // Close it with Flush and Release + err = h.Flush() + assert.NoError(t, err) + err = h.Release() + assert.NoError(t, err) + + // check vfs + root, err := vfs.Root() + checkListing(t, root, []string{"file1,0,false", "file2,0,false"}) + + // check the underlying r.Fremote but not the modtime + file1 := fstest.NewItem("file1", "", t1) + file2 := fstest.NewItem("file2", "", t1) + fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1, file2}, []string{}, fs.ModTimeNotSupported) +} + +func TestRWFileHandleFlushWrite(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateWriteOnly(t, r) + + // Check Flush does nothing if write not called + err := fh.Flush() + assert.NoError(t, err) + assert.False(t, fh.closed) + + // Write some data + n, err := fh.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + + // Check Flush closes file if write called + err = fh.Flush() + assert.NoError(t, err) + assert.True(t, fh.closed) + + // Check flush does nothing if called again + err = fh.Flush() + assert.NoError(t, err) + assert.True(t, fh.closed) +} + +func TestRWFileHandleReleaseWrite(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + _, fh := rwHandleCreateWriteOnly(t, r) + + // Write some data + n, err := fh.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + + // Check Release closes file + err = fh.Release() + assert.NoError(t, err) + assert.True(t, fh.closed) + + // Check Release does nothing if called again + err = fh.Release() + assert.NoError(t, err) + assert.True(t, fh.closed) +} diff --git a/vfs/vfs.go b/vfs/vfs.go index 40ac5a132..b34bcd4ee 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -27,21 +27,25 @@ import ( "time" "github.com/ncw/rclone/fs" + "golang.org/x/net/context" // switch to "context" when we stop supporting go1.6 ) // DefaultOpt is the default values uses for Opt var DefaultOpt = Options{ - NoModTime: false, - NoChecksum: false, - NoSeek: false, - DirCacheTime: 5 * 60 * time.Second, - PollInterval: time.Minute, - ReadOnly: false, - Umask: 0, - UID: ^uint32(0), // these values instruct WinFSP-FUSE to use the current user - GID: ^uint32(0), // overriden for non windows in mount_unix.go - DirPerms: os.FileMode(0777) | os.ModeDir, - FilePerms: os.FileMode(0666), + NoModTime: false, + NoChecksum: false, + NoSeek: false, + DirCacheTime: 5 * 60 * time.Second, + PollInterval: time.Minute, + ReadOnly: false, + Umask: 0, + UID: ^uint32(0), // these values instruct WinFSP-FUSE to use the current user + GID: ^uint32(0), // overriden for non windows in mount_unix.go + DirPerms: os.FileMode(0777) | os.ModeDir, + FilePerms: os.FileMode(0666), + CacheMode: CacheModeOff, + CacheMaxAge: 3600 * time.Second, + CachePollInterval: 60 * time.Second, } // Node represents either a directory (*Dir) or a file (*File) @@ -56,6 +60,7 @@ type Node interface { DirEntry() fs.DirEntry VFS() *VFS Open(flags int) (Handle, error) + Truncate(size int64) error } // Check interfaces @@ -84,12 +89,12 @@ var ( _ Noder = (*Dir)(nil) _ Noder = (*ReadFileHandle)(nil) _ Noder = (*WriteFileHandle)(nil) + _ Noder = (*RWFileHandle)(nil) _ Noder = (*DirHandle)(nil) ) -// Handle is the interface statisified by open files or directories. -// It is the methods on *os.File. Not all of them are supported. -type Handle interface { +// OsFiler is the methods on *os.File +type OsFiler interface { Chdir() error Chmod(mode os.FileMode) error Chown(uid, gid int) error @@ -107,10 +112,18 @@ type Handle interface { Write(b []byte) (n int, err error) WriteAt(b []byte, off int64) (n int, err error) WriteString(s string) (n int, err error) +} + +// Handle is the interface statisified by open files or directories. +// It is the methods on *os.File, plus a few more useful for FUSE +// filingsystems. Not all of them are supported. +type Handle interface { + OsFiler // Additional methods useful for FUSE filesystems Flush() error Release() error Node() Node + // Size() int64 } // baseHandle implements all the missing methods @@ -137,34 +150,42 @@ func (h baseHandle) Flush() (err error) { retu func (h baseHandle) Release() (err error) { return ENOSYS } func (h baseHandle) Node() Node { return nil } +//func (h baseHandle) Size() int64 { return 0 } + // Check interfaces var ( - _ Handle = (*baseHandle)(nil) - _ Handle = (*ReadFileHandle)(nil) - _ Handle = (*WriteFileHandle)(nil) - _ Handle = (*DirHandle)(nil) + _ OsFiler = (*os.File)(nil) + _ Handle = (*baseHandle)(nil) + _ Handle = (*ReadFileHandle)(nil) + _ Handle = (*WriteFileHandle)(nil) + _ Handle = (*DirHandle)(nil) ) // VFS represents the top level filing system type VFS struct { - f fs.Fs - root *Dir - Opt Options + f fs.Fs + root *Dir + Opt Options + cache *cache + cancel context.CancelFunc } // Options is options for creating the vfs type Options struct { - NoSeek bool // don't allow seeking if set - NoChecksum bool // don't check checksums if set - ReadOnly bool // if set VFS is read only - NoModTime bool // don't read mod times for files - DirCacheTime time.Duration // how long to consider directory listing cache valid - PollInterval time.Duration - Umask int - UID uint32 - GID uint32 - DirPerms os.FileMode - FilePerms os.FileMode + NoSeek bool // don't allow seeking if set + NoChecksum bool // don't check checksums if set + ReadOnly bool // if set VFS is read only + NoModTime bool // don't read mod times for files + DirCacheTime time.Duration // how long to consider directory listing cache valid + PollInterval time.Duration + Umask int + UID uint32 + GID uint32 + DirPerms os.FileMode + FilePerms os.FileMode + CacheMode CacheMode + CacheMaxAge time.Duration + CachePollInterval time.Duration } // New creates a new VFS and root directory. If opt is nil, then @@ -200,9 +221,32 @@ func New(f fs.Fs, opt *Options) *VFS { fs.Logf(f, "poll-interval is not supported by this remote") } } + + // Create the cache + ctx, cancel := context.WithCancel(context.Background()) + vfs.cancel = cancel + cache, err := newCache(ctx, f, &vfs.Opt) // FIXME pass on context or get from Opt? + if err != nil { + // FIXME + panic(fmt.Sprintf("failed to create local cache: %v", err)) + } + vfs.cache = cache return vfs } +// Shutdown stops any background go-routines +func (vfs *VFS) Shutdown() { + if vfs.cancel != nil { + vfs.cancel() + vfs.cancel = nil + } +} + +// CleanUp deletes the contents of the cache +func (vfs *VFS) CleanUp() error { + return vfs.cache.cleanUp() +} + // Root returns the root node func (vfs *VFS) Root() (*Dir, error) { // fs.Debugf(vfs.f, "Root()") diff --git a/vfs/vfsflags/vfsflags.go b/vfs/vfsflags/vfsflags.go index 265587cb3..5d1468128 100644 --- a/vfs/vfsflags/vfsflags.go +++ b/vfs/vfsflags/vfsflags.go @@ -19,5 +19,8 @@ func AddFlags(flags *pflag.FlagSet) { flags.DurationVarP(&Opt.DirCacheTime, "dir-cache-time", "", Opt.DirCacheTime, "Time to cache directory entries for.") flags.DurationVarP(&Opt.PollInterval, "poll-interval", "", Opt.PollInterval, "Time to wait between polling for changes. Must be smaller than dir-cache-time. Only on supported remotes. Set to 0 to disable.") flags.BoolVarP(&Opt.ReadOnly, "read-only", "", Opt.ReadOnly, "Mount read-only.") + flags.VarP(&Opt.CacheMode, "cache-mode", "", "Cache mode off|minimal|writes|full") + flags.DurationVarP(&Opt.CachePollInterval, "cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.") + flags.DurationVarP(&Opt.CacheMaxAge, "cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.") platformFlags(flags) }