From 12405f9f41493a56832321ec21d226b1da4ac428 Mon Sep 17 00:00:00 2001 From: Stefan Breunig Date: Sat, 16 Sep 2017 22:49:08 +0200 Subject: [PATCH] fuse: re-use rcat to support uploads for all remotes (fixes #1672) --- cmd/mountlib/mount.go | 7 ------- cmd/mountlib/write.go | 43 +++++-------------------------------------- cmd/rcat/rcat.go | 3 ++- fs/operations.go | 28 +++++++++++++--------------- fs/operations_test.go | 4 ++-- 5 files changed, 22 insertions(+), 63 deletions(-) diff --git a/cmd/mountlib/mount.go b/cmd/mountlib/mount.go index 0fde56a5f..a78e88916 100644 --- a/cmd/mountlib/mount.go +++ b/cmd/mountlib/mount.go @@ -143,13 +143,6 @@ Assuming only one rclone instance is running, you can reset the cache like this: kill -SIGHUP $(pidof rclone) - -### Bugs ### - - * All the remotes should work for read, but some may not for write - * those which need to know the size in advance won't - eg B2 - * maybe should pass in size as -1 to mean work it out - * Or put in an an upload cache to cache the files on disk first `, Run: func(command *cobra.Command, args []string) { cmd.CheckArgs(2, 2, command, args) diff --git a/cmd/mountlib/write.go b/cmd/mountlib/write.go index 951712b0e..a273c06ff 100644 --- a/cmd/mountlib/write.go +++ b/cmd/mountlib/write.go @@ -3,9 +3,9 @@ package mountlib import ( "io" "sync" + "time" "github.com/ncw/rclone/fs" - "github.com/pkg/errors" ) // WriteFileHandle is an open for write handle on a File @@ -19,41 +19,28 @@ type WriteFileHandle struct { file *File writeCalled bool // set the first time Write() is called offset int64 - hash *fs.MultiHasher } func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) { - var hash *fs.MultiHasher - if !f.d.fsys.noChecksum { - var err error - hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes()) - if err != nil { - fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err) - } - } - fh := &WriteFileHandle{ remote: src.Remote(), result: make(chan error, 1), file: f, - hash: hash, } var pipeReader *io.PipeReader pipeReader, fh.pipeWriter = io.Pipe() go func() { - r := fs.NewAccountSizeName(pipeReader, 0, src.Remote()).WithBuffer() // account the transfer - o, err := d.f.Put(r, src) + o, err := fs.Rcat(d.f, src.Remote(), pipeReader, time.Now()) if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.New Put failed: %v", err) + fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err) } - // Close the Account and thus the pipeReader so the pipeWriter fails with ErrClosedPipe - _ = r.Close() + // Close the pipeReader so the pipeWriter fails with ErrClosedPipe + _ = pipeReader.Close() fh.o = o fh.result <- err }() fh.file.addWriters(1) fh.file.setSize(0) - fs.Stats.Transferring(fh.remote) return fh, nil } @@ -87,7 +74,6 @@ func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err return 0, EBADF } fh.writeCalled = true - // FIXME should probably check the file isn't being seeked? n, err := fh.pipeWriter.Write(data) written = int64(n) fh.offset += written @@ -97,13 +83,6 @@ func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err return 0, err } // fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) - if fh.hash != nil { - _, err = fh.hash.Write(data[:n]) - if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err) - return written, err - } - } return written, nil } @@ -121,7 +100,6 @@ func (fh *WriteFileHandle) close() error { return EBADF } fh.closed = true - fs.Stats.DoneTransferring(fh.remote, true) fh.file.addWriters(-1) writeCloseErr := fh.pipeWriter.Close() err := <-fh.result @@ -129,17 +107,6 @@ func (fh *WriteFileHandle) close() error { fh.file.setObject(fh.o) err = writeCloseErr } - if err == nil && fh.hash != nil { - for hashType, srcSum := range fh.hash.Sums() { - dstSum, err := fh.o.Hash(hashType) - if err != nil { - return err - } - if !fs.HashEquals(srcSum, dstSum) { - return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum) - } - } - } return err } diff --git a/cmd/rcat/rcat.go b/cmd/rcat/rcat.go index 0d6be6aee..3552b1f55 100644 --- a/cmd/rcat/rcat.go +++ b/cmd/rcat/rcat.go @@ -50,7 +50,8 @@ a lot of data, you're better off caching locally and then fdst, dstFileName := cmd.NewFsDstFile(args) cmd.Run(false, false, command, func() error { - return fs.Rcat(fdst, dstFileName, os.Stdin, time.Now()) + _, err := fs.Rcat(fdst, dstFileName, os.Stdin, time.Now()) + return err }) }, } diff --git a/fs/operations.go b/fs/operations.go index b80a03a0a..2a08f2d7a 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -1561,8 +1561,7 @@ func Cat(f Fs, w io.Writer, offset, count int64) error { } // Rcat reads data from the Reader until EOF and uploads it to a file on remote -func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (err error) { - +func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (dst Object, err error) { Stats.Transferring(dstFileName) defer func() { Stats.DoneTransferring(dstFileName, err == nil) @@ -1574,7 +1573,7 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er hashOption := &HashesOption{Hashes: fdst.Hashes()} hash, err := NewMultiHasherTypes(fdst.Hashes()) if err != nil { - return err + return nil, err } readCounter := NewCountingReader(in0) trackingIn := io.TeeReader(readCounter, hash) @@ -1599,13 +1598,13 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er objInfo := NewStaticObjectInfo(dstFileName, modTime, int64(n), false, nil, nil) if Config.DryRun { Logf("stdin", "Not uploading as --dry-run") - return nil + return nil, nil } dst, err := fdst.Put(in, objInfo, hashOption) if err != nil { - return err + return dst, err } - return compare(dst) + return dst, compare(dst) } in := ioutil.NopCloser(io.MultiReader(bytes.NewReader(buf), trackingIn)) @@ -1615,7 +1614,7 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file") tmpLocalFs, err := temporaryLocalFs() if err != nil { - return errors.Wrap(err, "Failed to create temporary local FS to spool file") + return nil, errors.Wrap(err, "Failed to create temporary local FS to spool file") } defer func() { err := Purge(tmpLocalFs) @@ -1632,21 +1631,20 @@ func Rcat(fdst Fs, dstFileName string, in0 io.ReadCloser, modTime time.Time) (er Logf("stdin", "Not uploading as --dry-run") // prevents "broken pipe" errors _, err = io.Copy(ioutil.Discard, in) - return err + return nil, err } objInfo := NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) - tmpObj, err := fStreamTo.Features().PutStream(in, objInfo, hashOption) - if err != nil { - return err + if dst, err = fStreamTo.Features().PutStream(in, objInfo, hashOption); err != nil { + return dst, err } - if err = compare(tmpObj); err != nil { - return err + if err = compare(dst); err != nil { + return dst, err } if !canStream { - return Copy(fdst, nil, dstFileName, tmpObj) + return dst, Copy(fdst, nil, dstFileName, dst) } - return nil + return dst, nil } // Rmdirs removes any empty directories (or directories only diff --git a/fs/operations_test.go b/fs/operations_test.go index d5da8ae22..de7335670 100644 --- a/fs/operations_test.go +++ b/fs/operations_test.go @@ -754,11 +754,11 @@ func TestRcat(t *testing.T) { path2 := prefix + "big_file_from_pipe" in := ioutil.NopCloser(strings.NewReader(data1)) - err := fs.Rcat(r.fremote, path1, in, t1) + _, err := fs.Rcat(r.fremote, path1, in, t1) require.NoError(t, err) in = ioutil.NopCloser(strings.NewReader(data2)) - err = fs.Rcat(r.fremote, path2, in, t2) + _, err = fs.Rcat(r.fremote, path2, in, t2) require.NoError(t, err) file1 := fstest.NewItem(path1, data1, t1)