diff --git a/backend/chunker/chunker_internal_test.go b/backend/chunker/chunker_internal_test.go index dee3657d9..fc3e958b0 100644 --- a/backend/chunker/chunker_internal_test.go +++ b/backend/chunker/chunker_internal_test.go @@ -759,7 +759,7 @@ func testFutureProof(t *testing.T, f *Fs) { // Rcat must fail in := io.NopCloser(bytes.NewBufferString("abc")) - robj, err := operations.Rcat(ctx, f, file, in, modTime) + robj, err := operations.Rcat(ctx, f, file, in, modTime, nil) assert.Nil(t, robj) assert.NotNil(t, err) if err != nil { diff --git a/cmd/rcat/rcat.go b/cmd/rcat/rcat.go index c7766265d..4328a8aa8 100644 --- a/cmd/rcat/rcat.go +++ b/cmd/rcat/rcat.go @@ -66,7 +66,7 @@ a lot of data, you're better off caching locally and then fdst, dstFileName := cmd.NewFsDstFile(args) cmd.Run(false, false, command, func() error { - _, err := operations.RcatSize(context.Background(), fdst, dstFileName, os.Stdin, size, time.Now()) + _, err := operations.RcatSize(context.Background(), fdst, dstFileName, os.Stdin, size, time.Now(), nil) return err }) }, diff --git a/cmd/serve/restic/restic.go b/cmd/serve/restic/restic.go index a0a521bb6..7dfed0e1e 100644 --- a/cmd/serve/restic/restic.go +++ b/cmd/serve/restic/restic.go @@ -295,7 +295,7 @@ func (s *Server) postObject(w http.ResponseWriter, r *http.Request, remote strin } } - o, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now()) + o, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now(), nil) if err != nil { err = accounting.Stats(r.Context()).Error(err) fs.Errorf(remote, "Post request rcat error: %v", err) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 26db4cc0d..4b4afd041 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -475,8 +475,16 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj } else { actionTaken = "Copied (Rcat, new)" } + // Make any metadata to pass to rcat + var meta fs.Metadata + if ci.Metadata { + meta, err = fs.GetMetadata(ctx, src) + if err != nil { + fs.Errorf(src, "Failed to read metadata: %v", err) + } + } // NB Rcat closes in0 - dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) + dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx), meta) newDst = dst } else { in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer @@ -1347,7 +1355,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { } // Rcat reads data from the Reader until EOF and uploads it to a file on remote -func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { +func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) { ci := fs.GetConfig(ctx) tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) defer func() { @@ -1386,7 +1394,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, opt.checkSum = true sums = hasher.Sums() } - src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst) + src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta) if !equal(ctx, src, dst, opt) { err = fmt.Errorf("corrupted on transfer") err = fs.CountError(err) @@ -1400,7 +1408,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, buf := make([]byte, ci.StreamingUploadCutoff) if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) - src := object.NewMemoryObject(dstFileName, modTime, buf[:n]) + src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta) return Copy(ctx, fdst, nil, dstFileName, src) } @@ -1433,7 +1441,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, return nil, err } - objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) + objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil).WithMetadata(meta) if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil { return dst, err } @@ -1442,7 +1450,22 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, } if !canStream { // copy dst (which is the local object we have just streamed to) to the remote - return Copy(ctx, fdst, nil, dstFileName, dst) + newCtx := ctx + if ci.Metadata && len(meta) != 0 { + // If we have metadata and we are setting it then use + // the --metadataset mechanism to supply it to Copy + var newCi *fs.ConfigInfo + newCtx, newCi = fs.AddConfig(ctx) + if len(newCi.MetadataSet) == 0 { + newCi.MetadataSet = meta + } else { + var newMeta fs.Metadata + newMeta.Merge(meta) + newMeta.Merge(newCi.MetadataSet) // --metadata-set takes priority + newCi.MetadataSet = newMeta + } + } + return Copy(newCtx, fdst, nil, dstFileName, dst) } return dst, nil } @@ -1724,7 +1747,7 @@ func NeedTransfer(ctx context.Context, dst, src fs.Object) bool { // RcatSize reads data from the Reader until EOF and uploads it to a file on remote. // Pass in size >=0 if known, <0 if not known -func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, size int64, modTime time.Time) (dst fs.Object, err error) { +func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, size int64, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) { var obj fs.Object if size >= 0 { @@ -1743,7 +1766,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo return nil, err } - info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst) + info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst).WithMetadata(meta) obj, err = fdst.Put(ctx, in, info) if err != nil { fs.Errorf(dstFileName, "Post request put error: %v", err) @@ -1752,7 +1775,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo } } else { // Size unknown use Rcat - obj, err = Rcat(ctx, fdst, dstFileName, in, modTime) + obj, err = Rcat(ctx, fdst, dstFileName, in, modTime, meta) if err != nil { fs.Errorf(dstFileName, "Post request rcat error: %v", err) @@ -1811,7 +1834,7 @@ func CopyURL(ctx context.Context, fdst fs.Fs, dstFileName string, url string, au return errors.New("CopyURL failed: file already exist") } } - dst, err = RcatSize(ctx, fdst, dstFileName, in, size, modTime) + dst, err = RcatSize(ctx, fdst, dstFileName, in, size, modTime, nil) return err }) return dst, err diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 8f8229c4f..b9be64902 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -1590,11 +1590,11 @@ func TestRcat(t *testing.T) { path2 := prefix + "big_file_from_pipe" in := io.NopCloser(strings.NewReader(data1)) - _, err := operations.Rcat(ctx, r.Fremote, path1, in, t1) + _, err := operations.Rcat(ctx, r.Fremote, path1, in, t1, nil) require.NoError(t, err) in = io.NopCloser(strings.NewReader(data2)) - _, err = operations.Rcat(ctx, r.Fremote, path2, in, t2) + _, err = operations.Rcat(ctx, r.Fremote, path2, in, t2, nil) require.NoError(t, err) file1 := fstest.NewItem(path1, data1, t1) @@ -1611,6 +1611,62 @@ func TestRcat(t *testing.T) { } } +func TestRcatMetadata(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + if !r.Fremote.Features().UserMetadata { + t.Skip("Skipping as destination doesn't support user metadata") + } + + test := func(disableUploadCutoff bool) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.Metadata = true + data := "this is some really nice test data with metadata" + path := "rcat_metadata" + + meta := fs.Metadata{ + "key": "value", + "sausage": "potato", + } + + if disableUploadCutoff { + ci.StreamingUploadCutoff = 0 + data += " uploadCutoff=0" + path += "_uploadcutoff0" + } + + fstest.CheckListing(t, r.Fremote, []fstest.Item{}) + + in := io.NopCloser(strings.NewReader(data)) + _, err := operations.Rcat(ctx, r.Fremote, path, in, t1, meta) + require.NoError(t, err) + + file := fstest.NewItem(path, data, t1) + r.CheckRemoteItems(t, file) + + o, err := r.Fremote.NewObject(ctx, path) + require.NoError(t, err) + gotMeta, err := fs.GetMetadata(ctx, o) + require.NoError(t, err) + // Check the specific user data we set is set + // Likey there will be other values + assert.Equal(t, "value", gotMeta["key"]) + assert.Equal(t, "potato", gotMeta["sausage"]) + + // Delete the test file + require.NoError(t, o.Remove(ctx)) + } + + t.Run("Normal", func(t *testing.T) { + test(false) + }) + t.Run("ViaDisk", func(t *testing.T) { + test(true) + }) +} + func TestRcatSize(t *testing.T) { ctx := context.Background() r := fstest.NewRun(t) @@ -1621,7 +1677,7 @@ func TestRcatSize(t *testing.T) { file2 := r.WriteFile("potato2", body, t2) // Test with known length bodyReader := io.NopCloser(strings.NewReader(body)) - obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime) + obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime, nil) require.NoError(t, err) assert.Equal(t, int64(len(body)), obj.Size()) assert.Equal(t, file1.Path, obj.Remote()) @@ -1629,7 +1685,7 @@ func TestRcatSize(t *testing.T) { // Test with unknown length bodyReader = io.NopCloser(strings.NewReader(body)) // reset Reader io.NopCloser(strings.NewReader(body)) - obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime) + obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime, nil) require.NoError(t, err) assert.Equal(t, int64(len(body)), obj.Size()) assert.Equal(t, file2.Path, obj.Remote()) @@ -1638,6 +1694,58 @@ func TestRcatSize(t *testing.T) { r.CheckRemoteItems(t, file1, file2) } +func TestRcatSizeMetadata(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + if !r.Fremote.Features().UserMetadata { + t.Skip("Skipping as destination doesn't support user metadata") + } + + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.Metadata = true + + meta := fs.Metadata{ + "key": "value", + "sausage": "potato", + } + + const body = "------------------------------------------------------------" + file1 := r.WriteFile("potato1", body, t1) + file2 := r.WriteFile("potato2", body, t2) + + // Test with known length + bodyReader := io.NopCloser(strings.NewReader(body)) + obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime, meta) + require.NoError(t, err) + assert.Equal(t, int64(len(body)), obj.Size()) + assert.Equal(t, file1.Path, obj.Remote()) + + // Test with unknown length + bodyReader = io.NopCloser(strings.NewReader(body)) // reset Reader + io.NopCloser(strings.NewReader(body)) + obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime, meta) + require.NoError(t, err) + assert.Equal(t, int64(len(body)), obj.Size()) + assert.Equal(t, file2.Path, obj.Remote()) + + // Check files exist + r.CheckRemoteItems(t, file1, file2) + + // Check metadata OK + for _, path := range []string{file1.Path, file2.Path} { + o, err := r.Fremote.NewObject(ctx, path) + require.NoError(t, err) + gotMeta, err := fs.GetMetadata(ctx, o) + require.NoError(t, err) + // Check the specific user data we set is set + // Likey there will be other values + assert.Equal(t, "value", gotMeta["key"]) + assert.Equal(t, "potato", gotMeta["sausage"]) + } +} + func TestCopyFileMaxTransfer(t *testing.T) { ctx := context.Background() ctx, ci := fs.AddConfig(ctx) diff --git a/fs/operations/rc.go b/fs/operations/rc.go index 39705b119..a4e15ea09 100644 --- a/fs/operations/rc.go +++ b/fs/operations/rc.go @@ -305,7 +305,7 @@ func rcSingleCommand(ctx context.Context, in rc.Params, name string, noRemote bo return nil, err } if p.FileName() != "" { - obj, err := Rcat(ctx, f, path.Join(remote, p.FileName()), p, time.Now()) + obj, err := Rcat(ctx, f, path.Join(remote, p.FileName()), p, time.Now(), nil) if err != nil { return nil, err } diff --git a/vfs/vfscache/downloaders/downloaders_test.go b/vfs/vfscache/downloaders/downloaders_test.go index 9fa425192..d079986b5 100644 --- a/vfs/vfscache/downloaders/downloaders_test.go +++ b/vfs/vfscache/downloaders/downloaders_test.go @@ -85,7 +85,7 @@ func TestDownloaders(t *testing.T) { // Write the test file in := io.NopCloser(readers.NewPatternReader(size)) - src, err := operations.RcatSize(ctx, r.Fremote, remote, in, size, time.Now()) + src, err := operations.RcatSize(ctx, r.Fremote, remote, in, size, time.Now(), nil) require.NoError(t, err) assert.Equal(t, size, src.Size()) diff --git a/vfs/write.go b/vfs/write.go index 4556ebe3e..e18f1d001 100644 --- a/vfs/write.go +++ b/vfs/write.go @@ -68,7 +68,7 @@ func (fh *WriteFileHandle) openPending() (err error) { pipeReader, fh.pipeWriter = io.Pipe() go func() { // NB Rcat deals with Stats.Transferring, etc. - o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now()) + o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now(), nil) if err != nil { fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err) }