diff --git a/cmd/mount/dir.go b/cmd/mount/dir.go index 4c7b02340..4b2a9f587 100644 --- a/cmd/mount/dir.go +++ b/cmd/mount/dir.go @@ -5,12 +5,11 @@ package mount import ( "os" "path" - "strings" - "sync" "time" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" + "github.com/ncw/rclone/cmd/mountlib" "github.com/ncw/rclone/fs" "github.com/pkg/errors" "golang.org/x/net/context" @@ -28,187 +27,13 @@ type DirEntry struct { // Dir represents a directory entry type Dir struct { - f fs.Fs - path string - modTime time.Time - mu sync.RWMutex // protects the following - read time.Time // time directory entry last read - items map[string]*DirEntry -} - -func newDir(f fs.Fs, fsDir *fs.Dir) *Dir { - return &Dir{ - f: f, - path: fsDir.Name, - modTime: fsDir.When, - } -} - -// ForgetAll ensures the directory and all its children are purged -// from the cache. -func (d *Dir) ForgetAll() { - d.ForgetPath("") -} - -// ForgetPath clears the cache for itself and all subdirectories if -// they match the given path. The path is specified relative from the -// directory it is called from. -// It is not possible to traverse the directory tree upwards, i.e. -// you cannot clear the cache for the Dir's ancestors or siblings. -func (d *Dir) ForgetPath(relativePath string) { - absPath := path.Join(d.path, relativePath) - if absPath == "." { - absPath = "" - } - - d.walk(absPath, func(dir *Dir) { - fs.Debugf(dir.path, "forgetting directory cache") - dir.read = time.Time{} - dir.items = nil - }) -} - -// walk runs a function on all directories whose path matches -// the given absolute one. It will be called on a directory's -// children first. It will not apply the function to parent -// nodes, regardless of the given path. -func (d *Dir) walk(absPath string, fun func(*Dir)) { - if d.items != nil { - for _, entry := range d.items { - if dir, ok := entry.node.(*Dir); ok { - dir.walk(absPath, fun) - } - } - } - - if d.path == absPath || absPath == "" || strings.HasPrefix(d.path, absPath+"/") { - d.mu.Lock() - defer d.mu.Unlock() - fun(d) - } -} - -// rename should be called after the directory is renamed -// -// Reset the directory to new state, discarding all the objects and -// reading everything again -func (d *Dir) rename(newParent *Dir, fsDir *fs.Dir) { - d.ForgetAll() - d.path = fsDir.Name - d.modTime = fsDir.When - d.read = time.Time{} -} - -// addObject adds a new object or directory to the directory -// -// note that we add new objects rather than updating old ones -func (d *Dir) addObject(o fs.BasicInfo, node fusefs.Node) *DirEntry { - item := &DirEntry{ - o: o, - node: node, - } - d.mu.Lock() - d.items[path.Base(o.Remote())] = item - d.mu.Unlock() - return item -} - -// delObject removes an object from the directory -func (d *Dir) delObject(leaf string) { - d.mu.Lock() - delete(d.items, leaf) - d.mu.Unlock() -} - -// read the directory -func (d *Dir) readDir() error { - d.mu.Lock() - defer d.mu.Unlock() - when := time.Now() - if d.read.IsZero() { - fs.Debugf(d.path, "Reading directory") - } else { - age := when.Sub(d.read) - if age < dirCacheTime { - return nil - } - fs.Debugf(d.path, "Re-reading directory (%v old)", age) - } - entries, err := fs.ListDirSorted(d.f, false, d.path) - if err == fs.ErrorDirNotFound { - // We treat directory not found as empty because we - // create directories on the fly - } else if err != nil { - return err - } - // NB when we re-read a directory after its cache has expired - // we drop the old files which should lead to correct - // behaviour but may not be very efficient. - - // Keep a note of the previous contents of the directory - oldItems := d.items - - // Cache the items by name - d.items = make(map[string]*DirEntry, len(entries)) - for _, entry := range entries { - switch item := entry.(type) { - case fs.Object: - obj := item - name := path.Base(obj.Remote()) - d.items[name] = &DirEntry{ - o: obj, - node: nil, - } - case *fs.Dir: - dir := item - name := path.Base(dir.Remote()) - // Use old dir value if it exists - if oldItem, ok := oldItems[name]; ok { - if _, ok := oldItem.o.(*fs.Dir); ok { - d.items[name] = oldItem - continue - } - } - d.items[name] = &DirEntry{ - o: dir, - node: nil, - } - default: - err = errors.Errorf("unknown type %T", item) - fs.Errorf(d.path, "readDir error: %v", err) - return err - } - } - d.read = when - return nil -} - -// lookup a single item in the directory -// -// returns fuse.ENOENT if not found. -func (d *Dir) lookup(leaf string) (*DirEntry, error) { - err := d.readDir() - if err != nil { - return nil, err - } - d.mu.RLock() - item, ok := d.items[leaf] - d.mu.RUnlock() - if !ok { - return nil, fuse.ENOENT - } - return item, nil -} - -// Check to see if a directory is empty -func (d *Dir) isEmpty() (bool, error) { - err := d.readDir() - if err != nil { - return false, err - } - d.mu.RLock() - defer d.mu.RUnlock() - return len(d.items) == 0, nil + *mountlib.Dir + // f fs.Fs + // path string + // modTime time.Time + // mu sync.RWMutex // protects the following + // read time.Time // time directory entry last read + // items map[string]*DirEntry } // Check interface satsified @@ -219,12 +44,13 @@ func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) error { a.Gid = gid a.Uid = uid a.Mode = os.ModeDir | dirPerms - a.Atime = d.modTime - a.Mtime = d.modTime - a.Ctime = d.modTime - a.Crtime = d.modTime + modTime := d.ModTime() + a.Atime = modTime + a.Mtime = modTime + a.Ctime = modTime + a.Crtime = modTime // FIXME include Valid so get some caching? - fs.Debugf(d.path, "Dir.Attr %+v", a) + // FIXME fs.Debugf(d.path, "Dir.Attr %+v", a) return nil } @@ -232,46 +58,18 @@ func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) error { var _ fusefs.NodeSetattrer = (*Dir)(nil) // Setattr handles attribute changes from FUSE. Currently supports ModTime only. -func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { +func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) { if noModTime { return nil } - d.mu.Lock() - defer d.mu.Unlock() - if req.Valid.MtimeNow() { - d.modTime = time.Now() + err = d.SetModTime(time.Now()) } else if req.Valid.Mtime() { - d.modTime = req.Mtime + err = d.SetModTime(req.Mtime) } - return nil -} - -// lookupNode calls lookup then makes sure the node is not nil in the DirEntry -func (d *Dir) lookupNode(leaf string) (item *DirEntry, err error) { - item, err = d.lookup(leaf) - if err != nil { - return nil, err - } - if item.node != nil { - return item, nil - } - var node fusefs.Node - switch x := item.o.(type) { - case fs.Object: - node, err = newFile(d, x), nil - case *fs.Dir: - node, err = newDir(d.f, x), nil - default: - err = errors.Errorf("unknown type %T", item) - } - if err != nil { - return nil, err - } - item = d.addObject(item.o, node) - return item, nil + return translateError(err) } // Check interface satisfied @@ -284,17 +82,17 @@ var _ fusefs.NodeRequestLookuper = (*Dir)(nil) // // Lookup need not to handle the names "." and "..". func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fusefs.Node, err error) { - path := path.Join(d.path, req.Name) - fs.Debugf(path, "Dir.Lookup") - item, err := d.lookupNode(req.Name) + mnode, err := d.Dir.Lookup(req.Name) if err != nil { - if err != fuse.ENOENT { - fs.Errorf(path, "Dir.Lookup error: %v", err) - } - return nil, err + return nil, translateError(err) } - fs.Debugf(path, "Dir.Lookup OK") - return item.node, nil + switch x := mnode.(type) { + case *mountlib.File: + return &File{x}, nil + case *mountlib.Dir: + return &Dir{x}, nil + } + panic("bad type") } // Check interface satisfied @@ -302,17 +100,13 @@ var _ fusefs.HandleReadDirAller = (*Dir)(nil) // ReadDirAll reads the contents of the directory func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) { - fs.Debugf(d.path, "Dir.ReadDirAll") - err = d.readDir() + items, err := d.Dir.ReadDirAll() if err != nil { - fs.Debugf(d.path, "Dir.ReadDirAll error: %v", err) - return nil, err + return nil, translateError(err) } - d.mu.RLock() - defer d.mu.RUnlock() - for _, item := range d.items { + for _, item := range items { var dirent fuse.Dirent - switch x := item.o.(type) { + switch x := item.Obj.(type) { case fs.Object: dirent = fuse.Dirent{ // Inode FIXME ??? @@ -326,13 +120,10 @@ func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) Name: path.Base(x.Remote()), } default: - err = errors.Errorf("unknown type %T", item) - fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err) - return nil, err + return nil, errors.Errorf("unknown type %T", item) } dirents = append(dirents, dirent) } - fs.Debugf(d.path, "Dir.ReadDirAll OK with %d entries", len(dirents)) return dirents, nil } @@ -340,39 +131,22 @@ var _ fusefs.NodeCreater = (*Dir)(nil) // Create makes a new file func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fusefs.Node, fusefs.Handle, error) { - path := path.Join(d.path, req.Name) - fs.Debugf(path, "Dir.Create") - src := newCreateInfo(d.f, path) - // This gets added to the directory when the file is written - file := newFile(d, nil) - fh, err := newWriteFileHandle(d, file, src) + file, fh, err := d.Dir.Create(req.Name) if err != nil { - fs.Errorf(path, "Dir.Create error: %v", err) - return nil, nil, err + return nil, nil, translateError(err) } - fs.Debugf(path, "Dir.Create OK") - return file, fh, nil + return &File{file}, &WriteFileHandle{fh}, err } var _ fusefs.NodeMkdirer = (*Dir)(nil) // Mkdir creates a new directory func (d *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fusefs.Node, error) { - path := path.Join(d.path, req.Name) - fs.Debugf(path, "Dir.Mkdir") - err := d.f.Mkdir(path) + dir, err := d.Dir.Mkdir(req.Name) if err != nil { - fs.Errorf(path, "Dir.Mkdir failed to create directory: %v", err) - return nil, err + return nil, translateError(err) } - fsDir := &fs.Dir{ - Name: path, - When: time.Now(), - } - dir := newDir(d.f, fsDir) - d.addObject(fsDir, dir) - fs.Debugf(path, "Dir.Mkdir OK") - return dir, nil + return &Dir{dir}, nil } var _ fusefs.NodeRemover = (*Dir)(nil) @@ -381,46 +155,10 @@ var _ fusefs.NodeRemover = (*Dir)(nil) // the receiver, which must be a directory. The entry to be removed // may correspond to a file (unlink) or to a directory (rmdir). func (d *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { - path := path.Join(d.path, req.Name) - fs.Debugf(path, "Dir.Remove") - item, err := d.lookupNode(req.Name) + err := d.Dir.Remove(req.Name) if err != nil { - fs.Errorf(path, "Dir.Remove error: %v", err) - return err + return translateError(err) } - switch x := item.o.(type) { - case fs.Object: - err = x.Remove() - if err != nil { - fs.Errorf(path, "Dir.Remove file error: %v", err) - return err - } - case *fs.Dir: - // Check directory is empty first - dir := item.node.(*Dir) - empty, err := dir.isEmpty() - if err != nil { - fs.Errorf(path, "Dir.Remove dir error: %v", err) - return err - } - if !empty { - // return fuse.ENOTEMPTY - doesn't exist though so use EEXIST - fs.Errorf(path, "Dir.Remove not empty") - return fuse.EEXIST - } - // remove directory - err = d.f.Rmdir(path) - if err != nil { - fs.Errorf(path, "Dir.Remove failed to remove directory: %v", err) - return err - } - default: - fs.Errorf(path, "Dir.Remove unknown type %T", item) - return errors.Errorf("unknown type %T", item) - } - // Remove the item from the directory listing - d.delObject(req.Name) - fs.Debugf(path, "Dir.Remove OK") return nil } @@ -429,82 +167,16 @@ var _ fusefs.NodeRenamer = (*Dir)(nil) // Rename the file func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs.Node) error { - oldPath := path.Join(d.path, req.OldName) destDir, ok := newDir.(*Dir) if !ok { - err := errors.Errorf("Unknown Dir type %T", newDir) - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err + return errors.Errorf("Unknown Dir type %T", newDir) } - newPath := path.Join(destDir.path, req.NewName) - fs.Debugf(oldPath, "Dir.Rename to %q", newPath) - oldItem, err := d.lookupNode(req.OldName) + + err := d.Dir.Rename(req.OldName, req.NewName, destDir.Dir) if err != nil { - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err - } - var newObj fs.BasicInfo - oldNode := oldItem.node - switch x := oldItem.o.(type) { - case fs.Object: - oldObject := x - // FIXME: could Copy then Delete if Move not available - // - though care needed if case insensitive... - doMove := d.f.Features().Move - if doMove == nil { - err := errors.Errorf("Fs %q can't rename files (no Move)", d.f) - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err - } - newObject, err := doMove(oldObject, newPath) - if err != nil { - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err - } - newObj = newObject - // Update the node with the new details - if oldNode != nil { - if oldFile, ok := oldNode.(*File); ok { - fs.Debugf(oldItem.o, "Updating file with %v %p", newObject, oldFile) - oldFile.rename(destDir, newObject) - } - } - case *fs.Dir: - doDirMove := d.f.Features().DirMove - if doDirMove == nil { - err := errors.Errorf("Fs %q can't rename directories (no DirMove)", d.f) - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err - } - srcRemote := x.Name - dstRemote := newPath - err = doDirMove(d.f, srcRemote, dstRemote) - if err != nil { - fs.Errorf(oldPath, "Dir.Rename error: %v", err) - return err - } - newDir := new(fs.Dir) - *newDir = *x - newDir.Name = newPath - newObj = newDir - // Update the node with the new details - if oldNode != nil { - if oldDir, ok := oldNode.(*Dir); ok { - fs.Debugf(oldItem.o, "Updating dir with %v %p", newDir, oldDir) - oldDir.rename(destDir, newDir) - } - } - default: - err = errors.Errorf("unknown type %T", oldItem) - fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err) - return err + return translateError(err) } - // Show moved - delete from old dir and add to new - d.delObject(req.OldName) - destDir.addObject(newObj, oldNode) - - fs.Debugf(newPath, "Dir.Rename renamed from %q", oldPath) return nil } @@ -512,8 +184,10 @@ func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs var _ fusefs.NodeFsyncer = (*Dir)(nil) // Fsync the directory -// -// Note that we don't do anything except return OK func (d *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + err := d.Dir.Fsync() + if err != nil { + return translateError(err) + } return nil } diff --git a/cmd/mount/file.go b/cmd/mount/file.go index 951fb6284..f48c284fc 100644 --- a/cmd/mount/file.go +++ b/cmd/mount/file.go @@ -3,48 +3,24 @@ package mount import ( - "sync" - "sync/atomic" "time" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" - "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/cmd/mountlib" "github.com/pkg/errors" "golang.org/x/net/context" ) // File represents a file type File struct { - size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned - d *Dir // parent directory - read only - mu sync.RWMutex // protects the following - o fs.Object // NB o may be nil if file is being written - writers int // number of writers for this file - pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written -} - -// newFile creates a new File -func newFile(d *Dir, o fs.Object) *File { - return &File{ - d: d, - o: o, - } -} - -// rename should be called to update f.o and f.d after a rename -func (f *File) rename(d *Dir, o fs.Object) { - f.mu.Lock() - f.o = o - f.d = d - f.mu.Unlock() -} - -// addWriters increments or decrements the writers -func (f *File) addWriters(n int) { - f.mu.Lock() - f.writers += n - f.mu.Unlock() + *mountlib.File + // size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned + // d *Dir // parent directory - read only + // mu sync.RWMutex // protects the following + // o fs.Object // NB o may be nil if file is being written + // writers int // number of writers for this file + // pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written } // Check interface satisfied @@ -52,32 +28,19 @@ var _ fusefs.Node = (*File)(nil) // Attr fills out the attributes for the file func (f *File) Attr(ctx context.Context, a *fuse.Attr) error { - f.mu.Lock() - defer f.mu.Unlock() + modTime, Size, Blocks, err := f.File.Attr(noModTime) + if err != nil { + return translateError(err) + } a.Gid = gid a.Uid = uid a.Mode = filePerms - // if o is nil it isn't valid yet, so return the size so far - if f.o == nil { - a.Size = uint64(atomic.LoadInt64(&f.size)) - if !noModTime && !f.pendingModTime.IsZero() { - a.Atime = f.pendingModTime - a.Mtime = f.pendingModTime - a.Ctime = f.pendingModTime - a.Crtime = f.pendingModTime - } - } else { - a.Size = uint64(f.o.Size()) - if !noModTime { - modTime := f.o.ModTime() - a.Atime = modTime - a.Mtime = modTime - a.Ctime = modTime - a.Crtime = modTime - } - } - a.Blocks = (a.Size + 511) / 512 - fs.Debugf(f.o, "File.Attr %+v", a) + a.Size = Size + a.Atime = modTime + a.Mtime = modTime + a.Ctime = modTime + a.Crtime = modTime + a.Blocks = Blocks return nil } @@ -89,83 +52,13 @@ func (f *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse if noModTime { return nil } - - f.mu.Lock() - defer f.mu.Unlock() - + var err error if req.Valid.MtimeNow() { - f.pendingModTime = time.Now() + err = f.File.SetModTime(time.Now()) } else if req.Valid.Mtime() { - f.pendingModTime = req.Mtime + err = f.File.SetModTime(req.Mtime) } - - if f.o != nil { - return f.applyPendingModTime() - } - - // queue up for later, hoping f.o becomes available - return nil -} - -// call with the mutex held -func (f *File) applyPendingModTime() error { - defer func() { f.pendingModTime = time.Time{} }() - - if f.pendingModTime.IsZero() { - return nil - } - - if f.o == nil { - return errors.New("Cannot apply ModTime, file object is not available") - } - - err := f.o.SetModTime(f.pendingModTime) - switch err { - case nil: - fs.Debugf(f.o, "File.applyPendingModTime OK") - case fs.ErrorCantSetModTime: - // do nothing, in order to not break "touch somefile" if it exists already - default: - fs.Errorf(f.o, "File.applyPendingModTime error: %v", err) - return err - } - - return nil -} - -// Update the size while writing -func (f *File) written(n int64) { - atomic.AddInt64(&f.size, n) -} - -// Update the object when written -func (f *File) setObject(o fs.Object) { - f.mu.Lock() - defer f.mu.Unlock() - f.o = o - _ = f.applyPendingModTime() - f.d.addObject(o, f) -} - -// Wait for f.o to become non nil for a short time returning it or an -// error -// -// Call without the mutex held -func (f *File) waitForValidObject() (o fs.Object, err error) { - for i := 0; i < 50; i++ { - f.mu.Lock() - o = f.o - writers := f.writers - f.mu.Unlock() - if o != nil { - return o, nil - } - if writers == 0 { - return nil, errors.New("can't open file - writer failed") - } - time.Sleep(100 * time.Millisecond) - } - return nil, fuse.ENOENT + return translateError(err) } // Check interface satisfied @@ -173,25 +66,19 @@ var _ fusefs.NodeOpener = (*File)(nil) // Open the file for read or write func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fh fusefs.Handle, err error) { - // if o is nil it isn't valid yet - o, err := f.waitForValidObject() - if err != nil { - return nil, err - } - fs.Debugf(o, "File.Open %v", req.Flags) - switch { case req.Flags.IsReadOnly(): if noSeek { resp.Flags |= fuse.OpenNonSeekable } - fh, err = newReadFileHandle(o) - err = errors.Wrap(err, "open for read") + var rfh *mountlib.ReadFileHandle + rfh, err = f.File.OpenRead() + fh = &ReadFileHandle{rfh} case req.Flags.IsWriteOnly() || (req.Flags.IsReadWrite() && (req.Flags&fuse.OpenTruncate) != 0): resp.Flags |= fuse.OpenNonSeekable - src := newCreateInfo(f.d.f, o.Remote()) - fh, err = newWriteFileHandle(f.d, f, src) - err = errors.Wrap(err, "open for write") + var wfh *mountlib.WriteFileHandle + wfh, err = f.File.OpenWrite() + fh = &WriteFileHandle{wfh} case req.Flags.IsReadWrite(): err = errors.New("can't open for read and write simultaneously") default: @@ -211,8 +98,7 @@ func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR */ if err != nil { - fs.Errorf(o, "File.Open failed: %v", err) - return nil, err + return nil, translateError(err) } return fh, nil } diff --git a/cmd/mount/fs.go b/cmd/mount/fs.go index e018a3c2a..206892da0 100644 --- a/cmd/mount/fs.go +++ b/cmd/mount/fs.go @@ -5,114 +5,42 @@ package mount import ( - "os" - "os/signal" - "syscall" - "time" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" + "github.com/ncw/rclone/cmd/mountlib" "github.com/ncw/rclone/fs" + "github.com/pkg/errors" "golang.org/x/net/context" ) // FS represents the top level filing system type FS struct { - f fs.Fs - rootDir *Dir + *mountlib.FS + f fs.Fs } // Check interface satistfied var _ fusefs.FS = (*FS)(nil) +// NewFS makes a new FS +func NewFS(f fs.Fs) *FS { + fsys := &FS{ + FS: mountlib.NewFS(f), + f: f, + } + if noSeek { + fsys.FS.NoSeek() + } + return fsys +} + // Root returns the root node func (f *FS) Root() (fusefs.Node, error) { - fs.Debugf(f.f, "Root()") - if f.rootDir == nil { - fsDir := &fs.Dir{ - Name: "", - When: time.Now(), - } - f.rootDir = newDir(f.f, fsDir) - } - return f.rootDir, nil -} - -// mountOptions configures the options from the command line flags -func mountOptions(device string) (options []fuse.MountOption) { - options = []fuse.MountOption{ - fuse.MaxReadahead(uint32(maxReadAhead)), - fuse.Subtype("rclone"), - fuse.FSName(device), fuse.VolumeName(device), - fuse.NoAppleDouble(), - fuse.NoAppleXattr(), - - // Options from benchmarking in the fuse module - //fuse.MaxReadahead(64 * 1024 * 1024), - //fuse.AsyncRead(), - FIXME this causes - // ReadFileHandle.Read error: read /home/files/ISOs/xubuntu-15.10-desktop-amd64.iso: bad file descriptor - // which is probably related to errors people are having - //fuse.WritebackCache(), - } - if allowNonEmpty { - options = append(options, fuse.AllowNonEmptyMount()) - } - if allowOther { - options = append(options, fuse.AllowOther()) - } - if allowRoot { - options = append(options, fuse.AllowRoot()) - } - if defaultPermissions { - options = append(options, fuse.DefaultPermissions()) - } - if readOnly { - options = append(options, fuse.ReadOnly()) - } - if writebackCache { - options = append(options, fuse.WritebackCache()) - } - return options -} - -// mount the file system -// -// The mount point will be ready when this returns. -// -// returns an error, and an error channel for the serve process to -// report an error when fusermount is called. -func mount(f fs.Fs, mountpoint string) (*FS, <-chan error, error) { - fs.Debugf(f, "Mounting on %q", mountpoint) - - filesys := &FS{ - f: f, - } - - c, err := fuse.Mount(mountpoint, mountOptions(f.Name()+":"+f.Root())...) + root, err := f.FS.Root() if err != nil { - return filesys, nil, err + return nil, translateError(err) } - server := fusefs.New(c, nil) - - // Serve the mount point in the background returning error to errChan - errChan := make(chan error, 1) - go func() { - err := server.Serve(filesys) - closeErr := c.Close() - if err == nil { - err = closeErr - } - errChan <- err - }() - - // check if the mount process has an error to report - <-c.Ready - if err := c.MountError; err != nil { - return filesys, nil, err - } - - filesys.startSignalHandler() - return filesys, errChan, nil + return &Dir{root}, nil } // Check interface satsified @@ -134,15 +62,21 @@ func (f *FS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.Sta return nil } -func (f *FS) startSignalHandler() { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGHUP) - go func() { - for { - <-sigChan - if f.rootDir != nil { - f.rootDir.ForgetAll() - } +// Translate errors from mountlib +func translateError(err error) error { + if err == nil { + return nil + } + cause := errors.Cause(err) + if mErr, ok := cause.(mountlib.Error); ok { + switch mErr { + case mountlib.ENOENT: + return fuse.ENOENT + case mountlib.ENOTEMPTY: + return fuse.EEXIST // return fuse.ENOTEMPTY - doesn't exist though so use EEXIST + case mountlib.EEXIST: + return fuse.EEXIST } - }() + } + return err } diff --git a/cmd/mount/mount.go b/cmd/mount/mount.go index 6a6212cdb..a6f45824e 100644 --- a/cmd/mount/mount.go +++ b/cmd/mount/mount.go @@ -12,6 +12,7 @@ import ( "time" "bazil.org/fuse" + fusefs "bazil.org/fuse/fs" "github.com/ncw/rclone/cmd" "github.com/ncw/rclone/fs" "github.com/pkg/errors" @@ -154,6 +155,83 @@ like this: }, } +// mountOptions configures the options from the command line flags +func mountOptions(device string) (options []fuse.MountOption) { + options = []fuse.MountOption{ + fuse.MaxReadahead(uint32(maxReadAhead)), + fuse.Subtype("rclone"), + fuse.FSName(device), fuse.VolumeName(device), + fuse.NoAppleDouble(), + fuse.NoAppleXattr(), + + // Options from benchmarking in the fuse module + //fuse.MaxReadahead(64 * 1024 * 1024), + //fuse.AsyncRead(), - FIXME this causes + // ReadFileHandle.Read error: read /home/files/ISOs/xubuntu-15.10-desktop-amd64.iso: bad file descriptor + // which is probably related to errors people are having + //fuse.WritebackCache(), + } + if allowNonEmpty { + options = append(options, fuse.AllowNonEmptyMount()) + } + if allowOther { + options = append(options, fuse.AllowOther()) + } + if allowRoot { + options = append(options, fuse.AllowRoot()) + } + if defaultPermissions { + options = append(options, fuse.DefaultPermissions()) + } + if readOnly { + options = append(options, fuse.ReadOnly()) + } + if writebackCache { + options = append(options, fuse.WritebackCache()) + } + return options +} + +// mount the file system +// +// The mount point will be ready when this returns. +// +// returns an error, and an error channel for the serve process to +// report an error when fusermount is called. +func mount(f fs.Fs, mountpoint string) (<-chan error, func() error, error) { + fs.Debugf(f, "Mounting on %q", mountpoint) + c, err := fuse.Mount(mountpoint, mountOptions(f.Name()+":"+f.Root())...) + if err != nil { + return nil, nil, err + } + + filesys := NewFS(f) + server := fusefs.New(c, nil) + + // Serve the mount point in the background returning error to errChan + errChan := make(chan error, 1) + go func() { + err := server.Serve(filesys) + closeErr := c.Close() + if err == nil { + err = closeErr + } + errChan <- err + }() + + // check if the mount process has an error to report + <-c.Ready + if err := c.MountError; err != nil { + return nil, nil, err + } + + unmount := func() error { + return fuse.Unmount(mountpoint) + } + + return errChan, unmount, nil +} + // Mount mounts the remote at mountpoint. // // If noModTime is set then it @@ -175,7 +253,7 @@ func Mount(f fs.Fs, mountpoint string) error { } // Mount it - _, errChan, err := mount(f, mountpoint) + errChan, unmount, err := mount(f, mountpoint) if err != nil { return errors.Wrap(err, "failed to mount FUSE fs") } @@ -189,7 +267,7 @@ func Mount(f fs.Fs, mountpoint string) error { break // Program abort: umount case <-sigChan: - err = fuse.Unmount(mountpoint) + err = unmount() } if err != nil { diff --git a/cmd/mount/mount_test.go b/cmd/mount/mount_test.go new file mode 100644 index 000000000..90dde768f --- /dev/null +++ b/cmd/mount/mount_test.go @@ -0,0 +1,28 @@ +package mount + +import ( + "testing" + + "github.com/ncw/rclone/cmd/mountlib/mounttest" +) + +func TestMain(m *testing.M) { mounttest.TestMain(m, mount, dirPerms, filePerms) } +func TestDirLs(t *testing.T) { mounttest.TestDirLs(t) } +func TestDirCreateAndRemoveDir(t *testing.T) { mounttest.TestDirCreateAndRemoveDir(t) } +func TestDirCreateAndRemoveFile(t *testing.T) { mounttest.TestDirCreateAndRemoveFile(t) } +func TestDirRenameFile(t *testing.T) { mounttest.TestDirRenameFile(t) } +func TestDirRenameEmptyDir(t *testing.T) { mounttest.TestDirRenameEmptyDir(t) } +func TestDirRenameFullDir(t *testing.T) { mounttest.TestDirRenameFullDir(t) } +func TestDirModTime(t *testing.T) { mounttest.TestDirModTime(t) } +func TestFileModTime(t *testing.T) { mounttest.TestFileModTime(t) } +func TestFileModTimeWithOpenWriters(t *testing.T) { mounttest.TestFileModTimeWithOpenWriters(t) } +func TestMount(t *testing.T) { mounttest.TestMount(t) } +func TestRoot(t *testing.T) { mounttest.TestRoot(t) } +func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) } +func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) } +func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) } +func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) } +func TestWriteFileWrite(t *testing.T) { mounttest.TestWriteFileWrite(t) } +func TestWriteFileOverwrite(t *testing.T) { mounttest.TestWriteFileOverwrite(t) } +func TestWriteFileDoubleClose(t *testing.T) { mounttest.TestWriteFileDoubleClose(t) } +func TestWriteFileFsync(t *testing.T) { mounttest.TestWriteFileFsync(t) } diff --git a/cmd/mount/read.go b/cmd/mount/read.go index 2fcb7ea41..8d6da0db5 100644 --- a/cmd/mount/read.go +++ b/cmd/mount/read.go @@ -3,48 +3,21 @@ package mount import ( - "io" - "sync" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" - "github.com/ncw/rclone/fs" - "github.com/pkg/errors" + "github.com/ncw/rclone/cmd/mountlib" "golang.org/x/net/context" ) // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { - mu sync.Mutex - closed bool // set if handle has been closed - r *fs.Account - o fs.Object - readCalled bool // set if read has been called - offset int64 - hash *fs.MultiHasher -} - -func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) { - r, err := o.Open() - if err != nil { - return nil, err - } - - var hash *fs.MultiHasher - if !noChecksum { - hash, err = fs.NewMultiHasherTypes(o.Fs().Hashes()) - if err != nil { - fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err) - } - } - - fh := &ReadFileHandle{ - o: o, - r: fs.NewAccount(r, o).WithBuffer(), // account the transfer - hash: hash, - } - fs.Stats.Transferring(fh.o.Remote()) - return fh, nil + *mountlib.ReadFileHandle + // mu sync.Mutex + // closed bool // set if handle has been closed + // r *fs.Account + // o fs.Object + // readCalled bool // set if read has been called + // offset int64 } // Check interface satisfied @@ -53,173 +26,24 @@ var _ fusefs.Handle = (*ReadFileHandle)(nil) // Check interface satisfied var _ fusefs.HandleReader = (*ReadFileHandle)(nil) -// seek to a new offset -// -// if reopen is true, then we won't attempt to use an io.Seeker interface -// -// Must be called with fh.mu held -func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { - fh.r.StopBuffering() // stop the background reading first - fh.hash = nil - oldReader := fh.r.GetReader() - r := oldReader - // Can we seek it directly? - if do, ok := oldReader.(io.Seeker); !reopen && ok { - fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) - _, err = do.Seek(offset, 0) - if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err) - return err - } - } else { - fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset) - // close old one - err = oldReader.Close() - if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) - } - // re-open with a seek - r, err = fh.o.Open(&fs.SeekOption{Offset: offset}) - if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) - return err - } - } - fh.r.UpdateReader(r) - fh.offset = offset - return nil -} - // Read from the file handle func (fh *ReadFileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) { - fh.mu.Lock() - defer fh.mu.Unlock() - fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", req.Size, req.Offset) - if fh.closed { - fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", errClosedFileHandle) - return errClosedFileHandle - } - doSeek := req.Offset != fh.offset - var n int - var newOffset int64 - retries := 0 - buf := make([]byte, req.Size) - doReopen := false - for { - if doSeek { - // Are we attempting to seek beyond the end of the - // file - if so just return EOF leaving the underlying - // file in an unchanged state. - if req.Offset >= fh.o.Size() { - fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", req.Offset, fh.o.Size()) - resp.Data = nil - return nil - } - // Otherwise do the seek - err = fh.seek(req.Offset, doReopen) - } else { - err = nil - } - if err == nil { - if req.Size > 0 { - fh.readCalled = true - } - // One exception to the above is if we fail to fully populate a - // page cache page; a read into page cache is always page aligned. - // Make sure we never serve a partial read, to avoid that. - n, err = io.ReadFull(fh.r, buf) - newOffset = fh.offset + int64(n) - // if err == nil && rand.Intn(10) == 0 { - // err = errors.New("random error") - // } - if err == nil { - break - } else if (err == io.ErrUnexpectedEOF || err == io.EOF) && newOffset == fh.o.Size() { - // Have read to end of file - reset error - err = nil - break - } - } - if retries >= fs.Config.LowLevelRetries { - break - } - retries++ - fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) - doSeek = true - doReopen = true - } + data, err := fh.ReadFileHandle.Read(int64(req.Size), req.Offset) if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err) - } else { - resp.Data = buf[:n] - fh.offset = newOffset - fs.Debugf(fh.o, "ReadFileHandle.Read OK") - - if fh.hash != nil { - _, err = fh.hash.Write(resp.Data) - if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err) - return err - } - } + return translateError(err) } - return err -} - -// close the file handle returning errClosedFileHandle if it has been -// closed already. -// -// Must be called with fh.mu held -func (fh *ReadFileHandle) close() error { - if fh.closed { - return errClosedFileHandle - } - fh.closed = true - fs.Stats.DoneTransferring(fh.o.Remote(), true) - - if err := fh.checkHash(); err != nil { - return err - } - - return fh.r.Close() + resp.Data = data + return nil } // Check interface satisfied var _ fusefs.HandleFlusher = (*ReadFileHandle)(nil) -func (fh *ReadFileHandle) checkHash() error { - if fh.hash == nil || !fh.readCalled || fh.offset < fh.o.Size() { - return nil - } - - for hashType, dstSum := range fh.hash.Sums() { - srcSum, err := fh.o.Hash(hashType) - if err != nil { - return err - } - if !fs.HashEquals(dstSum, srcSum) { - return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum) - } - } - - return nil -} - // Flush is called each time the file or directory is closed. // Because there can be multiple file descriptors referring to a // single opened file, Flush can be called multiple times. func (fh *ReadFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { - fh.mu.Lock() - defer fh.mu.Unlock() - fs.Debugf(fh.o, "ReadFileHandle.Flush") - - if err := fh.checkHash(); err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err) - return err - } - - fs.Debugf(fh.o, "ReadFileHandle.Flush OK") - return nil + return translateError(fh.ReadFileHandle.Flush()) } var _ fusefs.HandleReleaser = (*ReadFileHandle)(nil) @@ -229,18 +53,5 @@ var _ fusefs.HandleReleaser = (*ReadFileHandle)(nil) // It isn't called directly from userspace so the error is ignored by // the kernel func (fh *ReadFileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - fh.mu.Lock() - defer fh.mu.Unlock() - if fh.closed { - fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do") - return nil - } - fs.Debugf(fh.o, "ReadFileHandle.Release closing") - err := fh.close() - if err != nil { - fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err) - } else { - fs.Debugf(fh.o, "ReadFileHandle.Release OK") - } - return err + return translateError(fh.ReadFileHandle.Release()) } diff --git a/cmd/mount/write.go b/cmd/mount/write.go index a285edc07..78688d1d5 100644 --- a/cmd/mount/write.go +++ b/cmd/mount/write.go @@ -3,13 +3,11 @@ package mount import ( - "io" - "sync" + "errors" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" - "github.com/ncw/rclone/fs" - "github.com/pkg/errors" + "github.com/ncw/rclone/cmd/mountlib" "golang.org/x/net/context" ) @@ -17,120 +15,25 @@ var errClosedFileHandle = errors.New("Attempt to use closed file handle") // WriteFileHandle is an open for write handle on a File type WriteFileHandle struct { - mu sync.Mutex - closed bool // set if handle has been closed - remote string - pipeReader *io.PipeReader - pipeWriter *io.PipeWriter - o fs.Object - result chan error - file *File - writeCalled bool // set the first time Write() is called - hash *fs.MultiHasher + *mountlib.WriteFileHandle } // Check interface satisfied var _ fusefs.Handle = (*WriteFileHandle)(nil) -func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) { - var hash *fs.MultiHasher - if !noChecksum { - var err error - hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes()) - if err != nil { - fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err) - } - } - - fh := &WriteFileHandle{ - remote: src.Remote(), - result: make(chan error, 1), - file: f, - hash: hash, - } - - fh.pipeReader, fh.pipeWriter = io.Pipe() - r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer - go func() { - o, err := d.f.Put(r, src) - fh.o = o - fh.result <- err - }() - fh.file.addWriters(1) - fs.Stats.Transferring(fh.remote) - return fh, nil -} - // Check interface satisfied var _ fusefs.HandleWriter = (*WriteFileHandle)(nil) // Write data to the file handle func (fh *WriteFileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - fs.Debugf(fh.remote, "WriteFileHandle.Write len=%d", len(req.Data)) - fh.mu.Lock() - defer fh.mu.Unlock() - if fh.closed { - fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", errClosedFileHandle) - return errClosedFileHandle - } - fh.writeCalled = true - // FIXME should probably check the file isn't being seeked? - n, err := fh.pipeWriter.Write(req.Data) - resp.Size = n - fh.file.written(int64(n)) + n, err := fh.WriteFileHandle.Write(req.Data, req.Offset) if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err) - return err - } - fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) - if fh.hash != nil { - _, err = fh.hash.Write(req.Data) - if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err) - return err - } + return translateError(err) } + resp.Size = int(n) return nil } -// close the file handle returning errClosedFileHandle if it has been -// closed already. -// -// Must be called with fh.mu held -func (fh *WriteFileHandle) close() error { - if fh.closed { - return errClosedFileHandle - } - fh.closed = true - fs.Stats.DoneTransferring(fh.remote, true) - fh.file.addWriters(-1) - writeCloseErr := fh.pipeWriter.Close() - err := <-fh.result - readCloseErr := fh.pipeReader.Close() - if err == nil { - fh.file.setObject(fh.o) - err = writeCloseErr - } - if err == nil { - err = readCloseErr - } - if err == nil && fh.hash != nil { - for hashType, srcSum := range fh.hash.Sums() { - dstSum, err := fh.o.Hash(hashType) - if err != nil { - return err - } - if !fs.HashEquals(srcSum, dstSum) { - return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum) - } - } - } - return err -} - -// Check interface satisfied -var _ fusefs.HandleFlusher = (*WriteFileHandle)(nil) - // Flush is called on each close() of a file descriptor. So if a // filesystem wants to return write errors in close() and the file has // cached dirty data, this is a good place to write back data and @@ -147,23 +50,7 @@ var _ fusefs.HandleFlusher = (*WriteFileHandle)(nil) // Filesystems shouldn't assume that flush will always be called after // some writes, or that if will be called at all. func (fh *WriteFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { - fh.mu.Lock() - defer fh.mu.Unlock() - fs.Debugf(fh.remote, "WriteFileHandle.Flush") - // If Write hasn't been called then ignore the Flush - Release - // will pick it up - if !fh.writeCalled { - fs.Debugf(fh.remote, "WriteFileHandle.Flush ignoring flush on unwritten handle") - return nil - - } - err := fh.close() - if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.Flush error: %v", err) - } else { - fs.Debugf(fh.remote, "WriteFileHandle.Flush OK") - } - return err + return translateError(fh.WriteFileHandle.Flush()) } var _ fusefs.HandleReleaser = (*WriteFileHandle)(nil) @@ -173,18 +60,5 @@ var _ fusefs.HandleReleaser = (*WriteFileHandle)(nil) // It isn't called directly from userspace so the error is ignored by // the kernel func (fh *WriteFileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - fh.mu.Lock() - defer fh.mu.Unlock() - if fh.closed { - fs.Debugf(fh.remote, "WriteFileHandle.Release nothing to do") - return nil - } - fs.Debugf(fh.remote, "WriteFileHandle.Release closing") - err := fh.close() - if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err) - } else { - fs.Debugf(fh.remote, "WriteFileHandle.Release OK") - } - return err + return translateError(fh.WriteFileHandle.Release()) } diff --git a/cmd/mount/createinfo.go b/cmd/mountlib/createinfo.go similarity index 96% rename from cmd/mount/createinfo.go rename to cmd/mountlib/createinfo.go index ea5ad9e73..14b979d66 100644 --- a/cmd/mount/createinfo.go +++ b/cmd/mountlib/createinfo.go @@ -1,6 +1,4 @@ -// +build linux darwin freebsd - -package mount +package mountlib import ( "time" diff --git a/cmd/mountlib/dir.go b/cmd/mountlib/dir.go new file mode 100644 index 000000000..bc68b2fdf --- /dev/null +++ b/cmd/mountlib/dir.go @@ -0,0 +1,421 @@ +package mountlib + +import ( + "path" + "sync" + "time" + + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" +) + +var dirCacheTime = 60 * time.Second // FIXME needs to be settable + +// DirEntry describes the contents of a directory entry +// +// It can be a file or a directory +// +// node may be nil, but o may not +type DirEntry struct { + Obj fs.BasicInfo + Node Node +} + +// Dir represents a directory entry +type Dir struct { + fsys *FS + inode uint64 // inode number + f fs.Fs + path string + modTime time.Time + mu sync.RWMutex // protects the following + read time.Time // time directory entry last read + items map[string]*DirEntry +} + +func newDir(fsys *FS, f fs.Fs, fsDir *fs.Dir) *Dir { + return &Dir{ + fsys: fsys, + f: f, + path: fsDir.Name, + modTime: fsDir.When, + inode: NewInode(), + } +} + +// IsFile returns false for Dir - satisfies Node interface +func (d *Dir) IsFile() bool { + return false +} + +// Inode returns the inode number - satisfies Node interface +func (d *Dir) Inode() uint64 { + return d.inode +} + +// Node returns the Node assocuated with this - satisfies Noder interface +func (d *Dir) Node() Node { + return d +} + +// rename should be called after the directory is renamed +// +// Reset the directory to new state, discarding all the objects and +// reading everything again +func (d *Dir) rename(newParent *Dir, fsDir *fs.Dir) { + d.path = fsDir.Name + d.modTime = fsDir.When + d.items = nil + d.read = time.Time{} +} + +// addObject adds a new object or directory to the directory +// +// note that we add new objects rather than updating old ones +func (d *Dir) addObject(o fs.BasicInfo, node Node) *DirEntry { + item := &DirEntry{ + Obj: o, + Node: node, + } + d.mu.Lock() + d.items[path.Base(o.Remote())] = item + d.mu.Unlock() + return item +} + +// delObject removes an object from the directory +func (d *Dir) delObject(leaf string) { + d.mu.Lock() + delete(d.items, leaf) + d.mu.Unlock() +} + +// read the directory +func (d *Dir) readDir() error { + d.mu.Lock() + defer d.mu.Unlock() + when := time.Now() + if d.read.IsZero() { + fs.Debugf(d.path, "Reading directory") + } else { + age := when.Sub(d.read) + if age < dirCacheTime { + return nil + } + fs.Debugf(d.path, "Re-reading directory (%v old)", age) + } + entries, err := fs.ListDirSorted(d.f, false, d.path) + if err == fs.ErrorDirNotFound { + // We treat directory not found as empty because we + // create directories on the fly + } else if err != nil { + return err + } + // NB when we re-read a directory after its cache has expired + // we drop the old files which should lead to correct + // behaviour but may not be very efficient. + + // Keep a note of the previous contents of the directory + oldItems := d.items + + // Cache the items by name + d.items = make(map[string]*DirEntry, len(entries)) + for _, entry := range entries { + switch item := entry.(type) { + case fs.Object: + obj := item + name := path.Base(obj.Remote()) + d.items[name] = &DirEntry{ + Obj: obj, + Node: nil, + } + case *fs.Dir: + dir := item + name := path.Base(dir.Remote()) + // Use old dir value if it exists + if oldItem, ok := oldItems[name]; ok { + if _, ok := oldItem.Obj.(*fs.Dir); ok { + d.items[name] = oldItem + continue + } + } + d.items[name] = &DirEntry{ + Obj: dir, + Node: nil, + } + default: + err = errors.Errorf("unknown type %T", item) + fs.Errorf(d.path, "readDir error: %v", err) + return err + } + } + d.read = when + return nil +} + +// lookup a single item in the directory +// +// returns ENOENT if not found. +func (d *Dir) lookup(leaf string) (*DirEntry, error) { + err := d.readDir() + if err != nil { + return nil, err + } + d.mu.RLock() + item, ok := d.items[leaf] + d.mu.RUnlock() + if !ok { + return nil, ENOENT + } + return item, nil +} + +// Check to see if a directory is empty +func (d *Dir) isEmpty() (bool, error) { + err := d.readDir() + if err != nil { + return false, err + } + d.mu.RLock() + defer d.mu.RUnlock() + return len(d.items) == 0, nil +} + +// ModTime returns the modification time of the directory +func (d *Dir) ModTime() time.Time { + fs.Debugf(d.path, "Dir.ModTime %v", d.modTime) + return d.modTime +} + +// SetModTime sets the modTime for this dir +func (d *Dir) SetModTime(modTime time.Time) error { + d.mu.Lock() + defer d.mu.Unlock() + d.modTime = modTime + return nil +} + +// lookupNode calls lookup then makes sure the node is not nil in the DirEntry +func (d *Dir) lookupNode(leaf string) (item *DirEntry, err error) { + item, err = d.lookup(leaf) + if err != nil { + return nil, err + } + if item.Node != nil { + return item, nil + } + var node Node + switch x := item.Obj.(type) { + case fs.Object: + node, err = newFile(d, x), nil + case *fs.Dir: + node, err = newDir(d.fsys, d.f, x), nil + default: + err = errors.Errorf("unknown type %T", item) + } + if err != nil { + return nil, err + } + item = d.addObject(item.Obj, node) + return item, nil +} + +// Lookup looks up a specific entry in the receiver. +// +// Lookup should return a Node corresponding to the entry. If the +// name does not exist in the directory, Lookup should return ENOENT. +// +// Lookup need not to handle the names "." and "..". +func (d *Dir) Lookup(name string) (node Node, err error) { + path := path.Join(d.path, name) + fs.Debugf(path, "Dir.Lookup") + item, err := d.lookupNode(name) + if err != nil { + if err != ENOENT { + fs.Errorf(path, "Dir.Lookup error: %v", err) + } + return nil, err + } + fs.Debugf(path, "Dir.Lookup OK") + return item.Node, nil +} + +// ReadDirAll reads the contents of the directory +func (d *Dir) ReadDirAll() (items []*DirEntry, err error) { + fs.Debugf(d.path, "Dir.ReadDirAll") + err = d.readDir() + if err != nil { + fs.Debugf(d.path, "Dir.ReadDirAll error: %v", err) + return nil, err + } + d.mu.RLock() + defer d.mu.RUnlock() + for _, item := range d.items { + items = append(items, item) + } + fs.Debugf(d.path, "Dir.ReadDirAll OK with %d entries", len(items)) + return items, nil +} + +// Create makes a new file +func (d *Dir) Create(name string) (*File, *WriteFileHandle, error) { + path := path.Join(d.path, name) + fs.Debugf(path, "Dir.Create") + src := newCreateInfo(d.f, path) + // This gets added to the directory when the file is written + file := newFile(d, nil) + fh, err := newWriteFileHandle(d, file, src) + if err != nil { + fs.Errorf(path, "Dir.Create error: %v", err) + return nil, nil, err + } + fs.Debugf(path, "Dir.Create OK") + return file, fh, nil +} + +// Mkdir creates a new directory +func (d *Dir) Mkdir(name string) (*Dir, error) { + path := path.Join(d.path, name) + fs.Debugf(path, "Dir.Mkdir") + err := d.f.Mkdir(path) + if err != nil { + fs.Errorf(path, "Dir.Mkdir failed to create directory: %v", err) + return nil, err + } + fsDir := &fs.Dir{ + Name: path, + When: time.Now(), + } + dir := newDir(d.fsys, d.f, fsDir) + d.addObject(fsDir, dir) + fs.Debugf(path, "Dir.Mkdir OK") + return dir, nil +} + +// Remove removes the entry with the given name from +// the receiver, which must be a directory. The entry to be removed +// may correspond to a file (unlink) or to a directory (rmdir). +func (d *Dir) Remove(name string) error { + path := path.Join(d.path, name) + fs.Debugf(path, "Dir.Remove") + item, err := d.lookupNode(name) + if err != nil { + fs.Errorf(path, "Dir.Remove error: %v", err) + return err + } + switch x := item.Obj.(type) { + case fs.Object: + err = x.Remove() + if err != nil { + fs.Errorf(path, "Dir.Remove file error: %v", err) + return err + } + case *fs.Dir: + // Check directory is empty first + dir := item.Node.(*Dir) + empty, err := dir.isEmpty() + if err != nil { + fs.Errorf(path, "Dir.Remove dir error: %v", err) + return err + } + if !empty { + fs.Errorf(path, "Dir.Remove not empty") + return ENOTEMPTY + } + // remove directory + err = d.f.Rmdir(path) + if err != nil { + fs.Errorf(path, "Dir.Remove failed to remove directory: %v", err) + return err + } + default: + fs.Errorf(path, "Dir.Remove unknown type %T", item) + return errors.Errorf("unknown type %T", item) + } + // Remove the item from the directory listing + d.delObject(name) + fs.Debugf(path, "Dir.Remove OK") + return nil +} + +// Rename the file +func (d *Dir) Rename(oldName, newName string, destDir *Dir) error { + oldPath := path.Join(d.path, oldName) + newPath := path.Join(destDir.path, newName) + fs.Debugf(oldPath, "Dir.Rename to %q", newPath) + oldItem, err := d.lookupNode(oldName) + if err != nil { + fs.Errorf(oldPath, "Dir.Rename error: %v", err) + return err + } + var newObj fs.BasicInfo + oldNode := oldItem.Node + switch x := oldItem.Obj.(type) { + case fs.Object: + oldObject := x + // FIXME: could Copy then Delete if Move not available + // - though care needed if case insensitive... + doMove := d.f.Features().Move + if doMove == nil { + err := errors.Errorf("Fs %q can't rename files (no Move)", d.f) + fs.Errorf(oldPath, "Dir.Rename error: %v", err) + return err + } + newObject, err := doMove(oldObject, newPath) + if err != nil { + fs.Errorf(oldPath, "Dir.Rename error: %v", err) + return err + } + newObj = newObject + // Update the node with the new details + if oldNode != nil { + if oldFile, ok := oldNode.(*File); ok { + fs.Debugf(oldItem.Obj, "Updating file with %v %p", newObject, oldFile) + oldFile.rename(destDir, newObject) + } + } + case *fs.Dir: + doDirMove := d.f.Features().DirMove + if doDirMove == nil { + err := errors.Errorf("Fs %q can't rename directories (no DirMove)", d.f) + fs.Errorf(oldPath, "Dir.Rename error: %v", err) + return err + } + srcRemote := x.Name + dstRemote := newPath + err = doDirMove(d.f, srcRemote, dstRemote) + if err != nil { + fs.Errorf(oldPath, "Dir.Rename error: %v", err) + return err + } + newDir := new(fs.Dir) + *newDir = *x + newDir.Name = newPath + newObj = newDir + // Update the node with the new details + if oldNode != nil { + if oldDir, ok := oldNode.(*Dir); ok { + fs.Debugf(oldItem.Obj, "Updating dir with %v %p", newDir, oldDir) + oldDir.rename(destDir, newDir) + } + } + default: + err = errors.Errorf("unknown type %T", oldItem) + fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err) + return err + } + + // Show moved - delete from old dir and add to new + d.delObject(oldName) + destDir.addObject(newObj, oldNode) + + fs.Debugf(newPath, "Dir.Rename renamed from %q", oldPath) + return nil +} + +// Fsync the directory +// +// Note that we don't do anything except return OK +func (d *Dir) Fsync() error { + return nil +} diff --git a/cmd/mountlib/errors.go b/cmd/mountlib/errors.go new file mode 100644 index 000000000..430a07e60 --- /dev/null +++ b/cmd/mountlib/errors.go @@ -0,0 +1,35 @@ +// Cross platform errors + +package mountlib + +import "fmt" + +// Error describes low level errors in a cross platform way +type Error byte + +// Low level errors +const ( + OK Error = iota + ENOENT + ENOTEMPTY + EEXIST + ESPIPE + EBADF +) + +var errorNames = []string{ + OK: "Success", + ENOENT: "No such file or directory", + ENOTEMPTY: "Directory not empty", + EEXIST: "File exists", + ESPIPE: "Illegal seek", + EBADF: "Bad file descriptor", +} + +// Error renders the error as a string +func (e Error) Error() string { + if int(e) >= len(errorNames) { + return fmt.Sprintf("Low level error %d", e) + } + return errorNames[e] +} diff --git a/cmd/mountlib/file.go b/cmd/mountlib/file.go new file mode 100644 index 000000000..75785137d --- /dev/null +++ b/cmd/mountlib/file.go @@ -0,0 +1,203 @@ +package mountlib + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ncw/rclone/fs" + "github.com/pkg/errors" +) + +// File represents a file +type File struct { + inode uint64 // inode number + size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned + d *Dir // parent directory - read only + mu sync.RWMutex // protects the following + o fs.Object // NB o may be nil if file is being written + writers int // number of writers for this file + pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written +} + +// newFile creates a new File +func newFile(d *Dir, o fs.Object) *File { + return &File{ + d: d, + o: o, + inode: NewInode(), + } +} + +// IsFile returns true for File - satisfies Node interface +func (f *File) IsFile() bool { + return true +} + +// Inode returns the inode number - satisfies Node interface +func (f *File) Inode() uint64 { + return f.inode +} + +// Node returns the Node assocuated with this - satisfies Noder interface +func (f *File) Node() Node { + return f +} + +// rename should be called to update f.o and f.d after a rename +func (f *File) rename(d *Dir, o fs.Object) { + f.mu.Lock() + f.o = o + f.d = d + f.mu.Unlock() +} + +// addWriters increments or decrements the writers +func (f *File) addWriters(n int) { + f.mu.Lock() + f.writers += n + f.mu.Unlock() +} + +// Attr fills out the attributes for the file +func (f *File) Attr(noModTime bool) (modTime time.Time, Size, Blocks uint64, err error) { + f.mu.Lock() + defer f.mu.Unlock() + // if o is nil it isn't valid yet or there are writers, so return the size so far + if f.o == nil || f.writers != 0 { + Size = uint64(atomic.LoadInt64(&f.size)) + if !noModTime && !f.pendingModTime.IsZero() { + modTime = f.pendingModTime + } + } else { + Size = uint64(f.o.Size()) + if !noModTime { + modTime = f.o.ModTime() + } + } + Blocks = (Size + 511) / 512 + fs.Debugf(f.o, "File.Attr modTime=%v, Size=%d, Blocks=%v", modTime, Size, Blocks) + return +} + +// SetModTime sets the modtime for the file +func (f *File) SetModTime(modTime time.Time) error { + f.mu.Lock() + defer f.mu.Unlock() + + f.pendingModTime = modTime + + if f.o != nil { + return f.applyPendingModTime() + } + + // queue up for later, hoping f.o becomes available + return nil +} + +// call with the mutex held +func (f *File) applyPendingModTime() error { + defer func() { f.pendingModTime = time.Time{} }() + + if f.pendingModTime.IsZero() { + return nil + } + + if f.o == nil { + return errors.New("Cannot apply ModTime, file object is not available") + } + + err := f.o.SetModTime(f.pendingModTime) + switch err { + case nil: + fs.Debugf(f.o, "File.applyPendingModTime OK") + case fs.ErrorCantSetModTime: + // do nothing, in order to not break "touch somefile" if it exists already + default: + fs.Errorf(f.o, "File.applyPendingModTime error: %v", err) + return err + } + + return nil +} + +// Update the size while writing +func (f *File) setSize(n int64) { + atomic.StoreInt64(&f.size, n) +} + +// Update the object when written +func (f *File) setObject(o fs.Object) { + f.mu.Lock() + defer f.mu.Unlock() + f.o = o + _ = f.applyPendingModTime() + f.d.addObject(o, f) +} + +// Wait for f.o to become non nil for a short time returning it or an +// error +// +// Call without the mutex held +func (f *File) waitForValidObject() (o fs.Object, err error) { + for i := 0; i < 50; i++ { + f.mu.Lock() + o = f.o + writers := f.writers + f.mu.Unlock() + if o != nil { + return o, nil + } + if writers == 0 { + return nil, errors.New("can't open file - writer failed") + } + time.Sleep(100 * time.Millisecond) + } + return nil, ENOENT +} + +// OpenRead open the file for read +func (f *File) OpenRead() (fh *ReadFileHandle, err error) { + // if o is nil it isn't valid yet + o, err := f.waitForValidObject() + if err != nil { + return nil, err + } + fs.Debugf(o, "File.OpenRead") + + fh, err = newReadFileHandle(f, o, f.d.fsys.noSeek) + err = errors.Wrap(err, "open for read") + + if err != nil { + fs.Errorf(o, "File.OpenRead failed: %v", err) + return nil, err + } + return fh, nil +} + +// OpenWrite open the file for write +func (f *File) OpenWrite() (fh *WriteFileHandle, err error) { + // if o is nil it isn't valid yet + o, err := f.waitForValidObject() + if err != nil { + return nil, err + } + fs.Debugf(o, "File.OpenWrite") + + src := newCreateInfo(f.d.f, o.Remote()) + fh, err = newWriteFileHandle(f.d, f, src) + err = errors.Wrap(err, "open for write") + + if err != nil { + fs.Errorf(o, "File.OpenWrite failed: %v", err) + return nil, err + } + return fh, nil +} + +// Fsync the file +// +// Note that we don't do anything except return OK +func (f *File) Fsync() error { + return nil +} diff --git a/cmd/mountlib/fs.go b/cmd/mountlib/fs.go new file mode 100644 index 000000000..536fd2112 --- /dev/null +++ b/cmd/mountlib/fs.go @@ -0,0 +1,116 @@ +package mountlib + +import ( + "strings" + "sync/atomic" + "time" + + "github.com/ncw/rclone/fs" +) + +// Node represents either a *Dir or a *File +type Node interface { + IsFile() bool + Inode() uint64 +} + +var ( + _ Node = (*File)(nil) + _ Node = (*Dir)(nil) +) + +// Noder represents something which can return a node +type Noder interface { + Node() Node +} + +var ( + _ Noder = (*File)(nil) + _ Noder = (*Dir)(nil) + _ Noder = (*ReadFileHandle)(nil) + _ Noder = (*WriteFileHandle)(nil) +) + +// FS represents the top level filing system +type FS struct { + f fs.Fs + root *Dir + noSeek bool // don't allow seeking if set +} + +// NewFS creates a new filing system and root directory +func NewFS(f fs.Fs) *FS { + fsDir := &fs.Dir{ + Name: "", + When: time.Now(), + } + fsys := &FS{ + f: f, + } + fsys.root = newDir(fsys, f, fsDir) + return fsys +} + +// NoSeek disables seeking of files +func (fsys *FS) NoSeek() *FS { + fsys.noSeek = true + return fsys +} + +// Root returns the root node +func (fsys *FS) Root() (*Dir, error) { + fs.Debugf(fsys.f, "Root()") + return fsys.root, nil +} + +var inodeCount uint64 + +// NewInode creates a new unique inode number +func NewInode() (inode uint64) { + return atomic.AddUint64(&inodeCount, 1) +} + +// Lookup finds the Node by path starting from the root +func (f *FS) Lookup(path string) (node Node, err error) { + node = f.root + for path != "" { + i := strings.IndexRune(path, '/') + var name string + if i < 0 { + name, path = path, "" + } else { + name, path = path[:i], path[i+1:] + } + if name == "" { + continue + } + dir, ok := node.(*Dir) + if !ok { + // We need to look in a directory, but found a file + return nil, ENOENT + } + node, err = dir.Lookup(name) + if err != nil { + return nil, err + } + } + return +} + +// Statfs is called to obtain file system metadata. +// It should write that data to resp. +func (f *FS) Statfs() error { + /* FIXME + const blockSize = 4096 + const fsBlocks = (1 << 50) / blockSize + resp.Blocks = fsBlocks // Total data blocks in file system. + resp.Bfree = fsBlocks // Free blocks in file system. + resp.Bavail = fsBlocks // Free blocks in file system if you're not root. + resp.Files = 1E9 // Total files in file system. + resp.Ffree = 1E9 // Free files in file system. + resp.Bsize = blockSize // Block size + resp.Namelen = 255 // Maximum file name length? + resp.Frsize = blockSize // Fragment size, smallest addressable data size in the file system. + */ + return nil +} diff --git a/cmd/mount/dir_test.go b/cmd/mountlib/mounttest/dir.go similarity index 99% rename from cmd/mount/dir_test.go rename to cmd/mountlib/mounttest/dir.go index 50fe33447..527719ab4 100644 --- a/cmd/mount/dir_test.go +++ b/cmd/mountlib/mounttest/dir.go @@ -1,6 +1,6 @@ // +build linux darwin freebsd -package mount +package mounttest import ( "io/ioutil" diff --git a/cmd/mount/file_test.go b/cmd/mountlib/mounttest/file.go similarity index 98% rename from cmd/mount/file_test.go rename to cmd/mountlib/mounttest/file.go index 3e59bf58e..b936c10fe 100644 --- a/cmd/mount/file_test.go +++ b/cmd/mountlib/mounttest/file.go @@ -1,6 +1,6 @@ // +build linux darwin freebsd -package mount +package mounttest import ( "os" diff --git a/cmd/mount/fs_test.go b/cmd/mountlib/mounttest/fs.go similarity index 85% rename from cmd/mount/fs_test.go rename to cmd/mountlib/mounttest/fs.go index 53a722d2f..e943f5f91 100644 --- a/cmd/mount/fs_test.go +++ b/cmd/mountlib/mounttest/fs.go @@ -2,7 +2,7 @@ // Test suite for rclonefs -package mount +package mounttest import ( "flag" @@ -33,10 +33,22 @@ var ( LowLevelRetries = flag.Int("low-level-retries", 10, "Number of low level retries") ) +type ( + UnmountFn func() error + MountFn func(f fs.Fs, mountpoint string) (<-chan error, func() error, error) +) + +var ( + mountFn MountFn +) + // TestMain drives the tests -func TestMain(m *testing.M) { +func TestMain(m *testing.M, fn MountFn, dirPerms, filePerms os.FileMode) { + mountFn = fn flag.Parse() run = newRun() + run.dirPerms = dirPerms + run.filePerms = filePerms rc := m.Run() run.Finalise() os.Exit(rc) @@ -44,13 +56,14 @@ func TestMain(m *testing.M) { // Run holds the remotes for a test run type Run struct { - mountPath string - fremote fs.Fs - fremoteName string - cleanRemote func() - umountResult <-chan error - mountFS *FS - skip bool + mountPath string + fremote fs.Fs + fremoteName string + cleanRemote func() + umountResult <-chan error + umountFn UnmountFn + skip bool + dirPerms, filePerms os.FileMode } // run holds the master Run data @@ -103,7 +116,7 @@ func newRun() *Run { func (r *Run) mount() { log.Printf("mount %q %q", r.fremote, r.mountPath) var err error - r.mountFS, r.umountResult, err = mount(r.fremote, r.mountPath) + r.umountResult, r.umountFn, err = mountFn(r.fremote, r.mountPath) if err != nil { log.Printf("mount failed: %v", err) r.skip = true @@ -116,10 +129,17 @@ func (r *Run) umount() { log.Printf("FUSE not found so skipping umount") return } - log.Printf("Calling fusermount -u %q", r.mountPath) - err := exec.Command("fusermount", "-u", r.mountPath).Run() + /* + log.Printf("Calling fusermount -u %q", r.mountPath) + err := exec.Command("fusermount", "-u", r.mountPath).Run() + if err != nil { + log.Printf("fusermount failed: %v", err) + } + */ + log.Printf("Unmounting %q", r.mountPath) + err := r.umountFn() if err != nil { - log.Printf("fusermount failed: %v", err) + log.Fatalf("signal to umount failed: %v", err) } log.Printf("Waiting for umount") err = <-r.umountResult @@ -182,10 +202,10 @@ func (r *Run) readLocal(t *testing.T, dir dirMap, filepath string) { if fi.IsDir() { dir[name+"/"] = struct{}{} r.readLocal(t, dir, name) - assert.Equal(t, fi.Mode().Perm(), os.FileMode(dirPerms)) + assert.Equal(t, r.dirPerms, fi.Mode().Perm()) } else { dir[fmt.Sprintf("%s %d", name, fi.Size())] = struct{}{} - assert.Equal(t, fi.Mode().Perm(), os.FileMode(filePerms)) + assert.Equal(t, r.filePerms, fi.Mode().Perm()) } } } @@ -267,5 +287,5 @@ func TestRoot(t *testing.T) { fi, err := os.Lstat(run.mountPath) require.NoError(t, err) assert.True(t, fi.IsDir()) - assert.Equal(t, fi.Mode().Perm(), os.FileMode(dirPerms)) + assert.Equal(t, fi.Mode().Perm(), run.dirPerms) } diff --git a/cmd/mount/read_test.go b/cmd/mountlib/mounttest/read.go similarity index 99% rename from cmd/mount/read_test.go rename to cmd/mountlib/mounttest/read.go index 6757e53c2..d09c40d64 100644 --- a/cmd/mount/read_test.go +++ b/cmd/mountlib/mounttest/read.go @@ -1,6 +1,6 @@ // +build linux darwin freebsd -package mount +package mounttest import ( "io" diff --git a/cmd/mount/write_test.go b/cmd/mountlib/mounttest/write.go similarity index 95% rename from cmd/mount/write_test.go rename to cmd/mountlib/mounttest/write.go index ee838c53f..58e175ee4 100644 --- a/cmd/mount/write_test.go +++ b/cmd/mountlib/mounttest/write.go @@ -1,11 +1,12 @@ // +build linux darwin freebsd -package mount +package mounttest import ( "os" "syscall" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,6 +22,9 @@ func TestWriteFileNoWrite(t *testing.T) { err = fd.Close() assert.NoError(t, err) + // FIXME - wait for the Release on the file + time.Sleep(10 * time.Millisecond) + run.checkDir(t, "testnowrite 0") run.rm(t, "testnowrite") diff --git a/cmd/mountlib/read.go b/cmd/mountlib/read.go new file mode 100644 index 000000000..38d0610f9 --- /dev/null +++ b/cmd/mountlib/read.go @@ -0,0 +1,210 @@ +package mountlib + +import ( + "io" + "sync" + + "github.com/ncw/rclone/fs" +) + +// ReadFileHandle is an open for read file handle on a File +type ReadFileHandle struct { + mu sync.Mutex + closed bool // set if handle has been closed + r *fs.Account + o fs.Object + readCalled bool // set if read has been called + offset int64 + noSeek bool + file *File +} + +func newReadFileHandle(f *File, o fs.Object, noSeek bool) (*ReadFileHandle, error) { + r, err := o.Open() + if err != nil { + return nil, err + } + fh := &ReadFileHandle{ + o: o, + r: fs.NewAccount(r, o).WithBuffer(), // account the transfer + noSeek: noSeek, + file: f, + } + fs.Stats.Transferring(fh.o.Remote()) + return fh, nil +} + +// Node returns the Node assocuated with this - satisfies Noder interface +func (fh *ReadFileHandle) Node() Node { + return fh.file +} + +// seek to a new offset +// +// if reopen is true, then we won't attempt to use an io.Seeker interface +// +// Must be called with fh.mu held +func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { + if fh.noSeek { + return ESPIPE + } + fh.r.StopBuffering() // stop the background reading first + oldReader := fh.r.GetReader() + r := oldReader + // Can we seek it directly? + if do, ok := oldReader.(io.Seeker); !reopen && ok { + fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) + _, err = do.Seek(offset, 0) + if err != nil { + fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err) + return err + } + } else { + fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset) + // close old one + err = oldReader.Close() + if err != nil { + fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) + } + // re-open with a seek + r, err = fh.o.Open(&fs.SeekOption{Offset: offset}) + if err != nil { + fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) + return err + } + } + fh.r.UpdateReader(r) + fh.offset = offset + return nil +} + +// Read from the file handle +func (fh *ReadFileHandle) Read(reqSize, reqOffset int64) (respData []byte, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", reqSize, reqOffset) + if fh.closed { + fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", EBADF) + return nil, EBADF + } + doSeek := reqOffset != fh.offset + var n int + var newOffset int64 + retries := 0 + buf := make([]byte, reqSize) + doReopen := false + for { + if doSeek { + // Are we attempting to seek beyond the end of the + // file - if so just return EOF leaving the underlying + // file in an unchanged state. + if reqOffset >= fh.o.Size() { + fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", reqOffset, fh.o.Size()) + respData = nil + return nil, nil + } + // Otherwise do the seek + err = fh.seek(reqOffset, doReopen) + } else { + err = nil + } + if err == nil { + if reqSize > 0 { + fh.readCalled = true + } + // One exception to the above is if we fail to fully populate a + // page cache page; a read into page cache is always page aligned. + // Make sure we never serve a partial read, to avoid that. + n, err = io.ReadFull(fh.r, buf) + newOffset = fh.offset + int64(n) + // if err == nil && rand.Intn(10) == 0 { + // err = errors.New("random error") + // } + if err == nil { + break + } else if (err == io.ErrUnexpectedEOF || err == io.EOF) && newOffset == fh.o.Size() { + // Have read to end of file - reset error + err = nil + break + } + } + if retries >= fs.Config.LowLevelRetries { + break + } + retries++ + fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) + doSeek = true + doReopen = true + } + if err != nil { + fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err) + } else { + respData = buf[:n] + fh.offset = newOffset + fs.Debugf(fh.o, "ReadFileHandle.Read OK") + } + return respData, err +} + +// close the file handle returning EBADF if it has been +// closed already. +// +// Must be called with fh.mu held +func (fh *ReadFileHandle) close() error { + if fh.closed { + return EBADF + } + fh.closed = true + fs.Stats.DoneTransferring(fh.o.Remote(), true) + return fh.r.Close() +} + +// Flush is called each time the file or directory is closed. +// Because there can be multiple file descriptors referring to a +// single opened file, Flush can be called multiple times. +func (fh *ReadFileHandle) Flush() error { + fh.mu.Lock() + defer fh.mu.Unlock() + fs.Debugf(fh.o, "ReadFileHandle.Flush") + + // Ignore the Flush as there is nothing we can sensibly do and + // it seems quite common for Flush to be called from + // different threads each of which have read some data. + if false { + // If Read hasn't been called then ignore the Flush - Release + // will pick it up + if !fh.readCalled { + fs.Debugf(fh.o, "ReadFileHandle.Flush ignoring flush on unread handle") + return nil + + } + err := fh.close() + if err != nil { + fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err) + return err + } + } + fs.Debugf(fh.o, "ReadFileHandle.Flush OK") + return nil +} + +// Release is called when we are finished with the file handle +// +// It isn't called directly from userspace so the error is ignored by +// the kernel +func (fh *ReadFileHandle) Release() error { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do") + return nil + } + fs.Debugf(fh.o, "ReadFileHandle.Release closing") + err := fh.close() + if err != nil { + fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err) + } else { + fs.Debugf(fh.o, "ReadFileHandle.Release OK") + } + return err +} diff --git a/cmd/mountlib/write.go b/cmd/mountlib/write.go new file mode 100644 index 000000000..b335ad06c --- /dev/null +++ b/cmd/mountlib/write.go @@ -0,0 +1,158 @@ +package mountlib + +import ( + "io" + "sync" + + "github.com/ncw/rclone/fs" +) + +// WriteFileHandle is an open for write handle on a File +type WriteFileHandle struct { + mu sync.Mutex + closed bool // set if handle has been closed + remote string + pipeReader *io.PipeReader + pipeWriter *io.PipeWriter + o fs.Object + result chan error + file *File + writeCalled bool // set the first time Write() is called + offset int64 +} + +func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) { + fh := &WriteFileHandle{ + remote: src.Remote(), + result: make(chan error, 1), + file: f, + } + fh.pipeReader, fh.pipeWriter = io.Pipe() + r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer + go func() { + o, err := d.f.Put(r, src) + fh.o = o + fh.result <- err + }() + fh.file.addWriters(1) + fh.file.setSize(0) + fs.Stats.Transferring(fh.remote) + return fh, nil +} + +// Node returns the Node assocuated with this - satisfies Noder interface +func (fh *WriteFileHandle) Node() Node { + return fh.file +} + +// Write data to the file handle +func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err error) { + fs.Debugf(fh.remote, "WriteFileHandle.Write len=%d", len(data)) + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.offset != offset { + fs.Errorf(fh.remote, "WriteFileHandle.Write can't seek in file") + return 0, ESPIPE + } + if fh.closed { + fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", EBADF) + return 0, EBADF + } + fh.writeCalled = true + // FIXME should probably check the file isn't being seeked? + n, err := fh.pipeWriter.Write(data) + written = int64(n) + fh.offset += written + fh.file.setSize(fh.offset) + if err != nil { + fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err) + return 0, err + } + fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) + return written, nil +} + +// Returns the offset of the file pointer +func (fh *WriteFileHandle) Offset() (offset int64) { + return fh.offset +} + +// close the file handle returning EBADF if it has been +// closed already. +// +// Must be called with fh.mu held +func (fh *WriteFileHandle) close() error { + if fh.closed { + return EBADF + } + fh.closed = true + fs.Stats.DoneTransferring(fh.remote, true) + fh.file.addWriters(-1) + writeCloseErr := fh.pipeWriter.Close() + err := <-fh.result + readCloseErr := fh.pipeReader.Close() + if err == nil { + fh.file.setObject(fh.o) + err = writeCloseErr + } + if err == nil { + err = readCloseErr + } + return err +} + +// Flush is called on each close() of a file descriptor. So if a +// filesystem wants to return write errors in close() and the file has +// cached dirty data, this is a good place to write back data and +// return any errors. Since many applications ignore close() errors +// this is not always useful. +// +// NOTE: The flush() method may be called more than once for each +// open(). This happens if more than one file descriptor refers to an +// opened file due to dup(), dup2() or fork() calls. It is not +// possible to determine if a flush is final, so each flush should be +// treated equally. Multiple write-flush sequences are relatively +// rare, so this shouldn't be a problem. +// +// Filesystems shouldn't assume that flush will always be called after +// some writes, or that if will be called at all. +func (fh *WriteFileHandle) Flush() error { + fh.mu.Lock() + defer fh.mu.Unlock() + fs.Debugf(fh.remote, "WriteFileHandle.Flush") + // If Write hasn't been called then ignore the Flush - Release + // will pick it up + if !fh.writeCalled { + fs.Debugf(fh.remote, "WriteFileHandle.Flush ignoring flush on unwritten handle") + return nil + + } + err := fh.close() + if err != nil { + fs.Errorf(fh.remote, "WriteFileHandle.Flush error: %v", err) + } else { + fs.Debugf(fh.remote, "WriteFileHandle.Flush OK") + } + return err +} + +// Release is called when we are finished with the file handle +// +// It isn't called directly from userspace so the error is ignored by +// the kernel +func (fh *WriteFileHandle) Release() error { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.closed { + fs.Debugf(fh.remote, "WriteFileHandle.Release nothing to do") + return nil + } + fs.Debugf(fh.remote, "WriteFileHandle.Release closing") + err := fh.close() + if err != nil { + fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err) + } else { + fs.Debugf(fh.remote, "WriteFileHandle.Release OK") + } + return err +}