rcat: preserve metadata when Copy falls back to Rcat

Before this change if we copied files of unknown size, then they lost
their metadata.

This was particularly noticeable using --s3-decompress.

This change adds metadata to Rcat and RcatSized and changes Copy to
pass the metadata in when it calls Rcat for an unknown sized input.

Fixes #6546
This commit is contained in:
Nick Craig-Wood 2022-11-08 17:42:18 +00:00
parent ec2024b907
commit 617c5d5e1b
8 changed files with 151 additions and 20 deletions

View File

@ -759,7 +759,7 @@ func testFutureProof(t *testing.T, f *Fs) {
// Rcat must fail // Rcat must fail
in := io.NopCloser(bytes.NewBufferString("abc")) in := io.NopCloser(bytes.NewBufferString("abc"))
robj, err := operations.Rcat(ctx, f, file, in, modTime) robj, err := operations.Rcat(ctx, f, file, in, modTime, nil)
assert.Nil(t, robj) assert.Nil(t, robj)
assert.NotNil(t, err) assert.NotNil(t, err)
if err != nil { if err != nil {

View File

@ -66,7 +66,7 @@ a lot of data, you're better off caching locally and then
fdst, dstFileName := cmd.NewFsDstFile(args) fdst, dstFileName := cmd.NewFsDstFile(args)
cmd.Run(false, false, command, func() error { cmd.Run(false, false, command, func() error {
_, err := operations.RcatSize(context.Background(), fdst, dstFileName, os.Stdin, size, time.Now()) _, err := operations.RcatSize(context.Background(), fdst, dstFileName, os.Stdin, size, time.Now(), nil)
return err return err
}) })
}, },

View File

@ -295,7 +295,7 @@ func (s *Server) postObject(w http.ResponseWriter, r *http.Request, remote strin
} }
} }
o, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now()) o, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now(), nil)
if err != nil { if err != nil {
err = accounting.Stats(r.Context()).Error(err) err = accounting.Stats(r.Context()).Error(err)
fs.Errorf(remote, "Post request rcat error: %v", err) fs.Errorf(remote, "Post request rcat error: %v", err)

View File

@ -475,8 +475,16 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
} else { } else {
actionTaken = "Copied (Rcat, new)" actionTaken = "Copied (Rcat, new)"
} }
// Make any metadata to pass to rcat
var meta fs.Metadata
if ci.Metadata {
meta, err = fs.GetMetadata(ctx, src)
if err != nil {
fs.Errorf(src, "Failed to read metadata: %v", err)
}
}
// NB Rcat closes in0 // NB Rcat closes in0
dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx), meta)
newDst = dst newDst = dst
} else { } else {
in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer
@ -1347,7 +1355,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
} }
// Rcat reads data from the Reader until EOF and uploads it to a file on remote // Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
ci := fs.GetConfig(ctx) ci := fs.GetConfig(ctx)
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1)
defer func() { defer func() {
@ -1386,7 +1394,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
opt.checkSum = true opt.checkSum = true
sums = hasher.Sums() sums = hasher.Sums()
} }
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst) src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta)
if !equal(ctx, src, dst, opt) { if !equal(ctx, src, dst, opt) {
err = fmt.Errorf("corrupted on transfer") err = fmt.Errorf("corrupted on transfer")
err = fs.CountError(err) err = fs.CountError(err)
@ -1400,7 +1408,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
buf := make([]byte, ci.StreamingUploadCutoff) buf := make([]byte, ci.StreamingUploadCutoff)
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
src := object.NewMemoryObject(dstFileName, modTime, buf[:n]) src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta)
return Copy(ctx, fdst, nil, dstFileName, src) return Copy(ctx, fdst, nil, dstFileName, src)
} }
@ -1433,7 +1441,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
return nil, err return nil, err
} }
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil) objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil).WithMetadata(meta)
if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil { if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil {
return dst, err return dst, err
} }
@ -1442,7 +1450,22 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
} }
if !canStream { if !canStream {
// copy dst (which is the local object we have just streamed to) to the remote // copy dst (which is the local object we have just streamed to) to the remote
return Copy(ctx, fdst, nil, dstFileName, dst) newCtx := ctx
if ci.Metadata && len(meta) != 0 {
// If we have metadata and we are setting it then use
// the --metadataset mechanism to supply it to Copy
var newCi *fs.ConfigInfo
newCtx, newCi = fs.AddConfig(ctx)
if len(newCi.MetadataSet) == 0 {
newCi.MetadataSet = meta
} else {
var newMeta fs.Metadata
newMeta.Merge(meta)
newMeta.Merge(newCi.MetadataSet) // --metadata-set takes priority
newCi.MetadataSet = newMeta
}
}
return Copy(newCtx, fdst, nil, dstFileName, dst)
} }
return dst, nil return dst, nil
} }
@ -1724,7 +1747,7 @@ func NeedTransfer(ctx context.Context, dst, src fs.Object) bool {
// RcatSize reads data from the Reader until EOF and uploads it to a file on remote. // RcatSize reads data from the Reader until EOF and uploads it to a file on remote.
// Pass in size >=0 if known, <0 if not known // Pass in size >=0 if known, <0 if not known
func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, size int64, modTime time.Time) (dst fs.Object, err error) { func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, size int64, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
var obj fs.Object var obj fs.Object
if size >= 0 { if size >= 0 {
@ -1743,7 +1766,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
return nil, err return nil, err
} }
info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst) info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst).WithMetadata(meta)
obj, err = fdst.Put(ctx, in, info) obj, err = fdst.Put(ctx, in, info)
if err != nil { if err != nil {
fs.Errorf(dstFileName, "Post request put error: %v", err) fs.Errorf(dstFileName, "Post request put error: %v", err)
@ -1752,7 +1775,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
} }
} else { } else {
// Size unknown use Rcat // Size unknown use Rcat
obj, err = Rcat(ctx, fdst, dstFileName, in, modTime) obj, err = Rcat(ctx, fdst, dstFileName, in, modTime, meta)
if err != nil { if err != nil {
fs.Errorf(dstFileName, "Post request rcat error: %v", err) fs.Errorf(dstFileName, "Post request rcat error: %v", err)
@ -1811,7 +1834,7 @@ func CopyURL(ctx context.Context, fdst fs.Fs, dstFileName string, url string, au
return errors.New("CopyURL failed: file already exist") return errors.New("CopyURL failed: file already exist")
} }
} }
dst, err = RcatSize(ctx, fdst, dstFileName, in, size, modTime) dst, err = RcatSize(ctx, fdst, dstFileName, in, size, modTime, nil)
return err return err
}) })
return dst, err return dst, err

View File

@ -1590,11 +1590,11 @@ func TestRcat(t *testing.T) {
path2 := prefix + "big_file_from_pipe" path2 := prefix + "big_file_from_pipe"
in := io.NopCloser(strings.NewReader(data1)) in := io.NopCloser(strings.NewReader(data1))
_, err := operations.Rcat(ctx, r.Fremote, path1, in, t1) _, err := operations.Rcat(ctx, r.Fremote, path1, in, t1, nil)
require.NoError(t, err) require.NoError(t, err)
in = io.NopCloser(strings.NewReader(data2)) in = io.NopCloser(strings.NewReader(data2))
_, err = operations.Rcat(ctx, r.Fremote, path2, in, t2) _, err = operations.Rcat(ctx, r.Fremote, path2, in, t2, nil)
require.NoError(t, err) require.NoError(t, err)
file1 := fstest.NewItem(path1, data1, t1) file1 := fstest.NewItem(path1, data1, t1)
@ -1611,6 +1611,62 @@ func TestRcat(t *testing.T) {
} }
} }
func TestRcatMetadata(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
if !r.Fremote.Features().UserMetadata {
t.Skip("Skipping as destination doesn't support user metadata")
}
test := func(disableUploadCutoff bool) {
ctx := context.Background()
ctx, ci := fs.AddConfig(ctx)
ci.Metadata = true
data := "this is some really nice test data with metadata"
path := "rcat_metadata"
meta := fs.Metadata{
"key": "value",
"sausage": "potato",
}
if disableUploadCutoff {
ci.StreamingUploadCutoff = 0
data += " uploadCutoff=0"
path += "_uploadcutoff0"
}
fstest.CheckListing(t, r.Fremote, []fstest.Item{})
in := io.NopCloser(strings.NewReader(data))
_, err := operations.Rcat(ctx, r.Fremote, path, in, t1, meta)
require.NoError(t, err)
file := fstest.NewItem(path, data, t1)
r.CheckRemoteItems(t, file)
o, err := r.Fremote.NewObject(ctx, path)
require.NoError(t, err)
gotMeta, err := fs.GetMetadata(ctx, o)
require.NoError(t, err)
// Check the specific user data we set is set
// Likey there will be other values
assert.Equal(t, "value", gotMeta["key"])
assert.Equal(t, "potato", gotMeta["sausage"])
// Delete the test file
require.NoError(t, o.Remove(ctx))
}
t.Run("Normal", func(t *testing.T) {
test(false)
})
t.Run("ViaDisk", func(t *testing.T) {
test(true)
})
}
func TestRcatSize(t *testing.T) { func TestRcatSize(t *testing.T) {
ctx := context.Background() ctx := context.Background()
r := fstest.NewRun(t) r := fstest.NewRun(t)
@ -1621,7 +1677,7 @@ func TestRcatSize(t *testing.T) {
file2 := r.WriteFile("potato2", body, t2) file2 := r.WriteFile("potato2", body, t2)
// Test with known length // Test with known length
bodyReader := io.NopCloser(strings.NewReader(body)) bodyReader := io.NopCloser(strings.NewReader(body))
obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime) obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime, nil)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, int64(len(body)), obj.Size()) assert.Equal(t, int64(len(body)), obj.Size())
assert.Equal(t, file1.Path, obj.Remote()) assert.Equal(t, file1.Path, obj.Remote())
@ -1629,7 +1685,7 @@ func TestRcatSize(t *testing.T) {
// Test with unknown length // Test with unknown length
bodyReader = io.NopCloser(strings.NewReader(body)) // reset Reader bodyReader = io.NopCloser(strings.NewReader(body)) // reset Reader
io.NopCloser(strings.NewReader(body)) io.NopCloser(strings.NewReader(body))
obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime) obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime, nil)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, int64(len(body)), obj.Size()) assert.Equal(t, int64(len(body)), obj.Size())
assert.Equal(t, file2.Path, obj.Remote()) assert.Equal(t, file2.Path, obj.Remote())
@ -1638,6 +1694,58 @@ func TestRcatSize(t *testing.T) {
r.CheckRemoteItems(t, file1, file2) r.CheckRemoteItems(t, file1, file2)
} }
func TestRcatSizeMetadata(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
if !r.Fremote.Features().UserMetadata {
t.Skip("Skipping as destination doesn't support user metadata")
}
ctx := context.Background()
ctx, ci := fs.AddConfig(ctx)
ci.Metadata = true
meta := fs.Metadata{
"key": "value",
"sausage": "potato",
}
const body = "------------------------------------------------------------"
file1 := r.WriteFile("potato1", body, t1)
file2 := r.WriteFile("potato2", body, t2)
// Test with known length
bodyReader := io.NopCloser(strings.NewReader(body))
obj, err := operations.RcatSize(ctx, r.Fremote, file1.Path, bodyReader, int64(len(body)), file1.ModTime, meta)
require.NoError(t, err)
assert.Equal(t, int64(len(body)), obj.Size())
assert.Equal(t, file1.Path, obj.Remote())
// Test with unknown length
bodyReader = io.NopCloser(strings.NewReader(body)) // reset Reader
io.NopCloser(strings.NewReader(body))
obj, err = operations.RcatSize(ctx, r.Fremote, file2.Path, bodyReader, -1, file2.ModTime, meta)
require.NoError(t, err)
assert.Equal(t, int64(len(body)), obj.Size())
assert.Equal(t, file2.Path, obj.Remote())
// Check files exist
r.CheckRemoteItems(t, file1, file2)
// Check metadata OK
for _, path := range []string{file1.Path, file2.Path} {
o, err := r.Fremote.NewObject(ctx, path)
require.NoError(t, err)
gotMeta, err := fs.GetMetadata(ctx, o)
require.NoError(t, err)
// Check the specific user data we set is set
// Likey there will be other values
assert.Equal(t, "value", gotMeta["key"])
assert.Equal(t, "potato", gotMeta["sausage"])
}
}
func TestCopyFileMaxTransfer(t *testing.T) { func TestCopyFileMaxTransfer(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, ci := fs.AddConfig(ctx) ctx, ci := fs.AddConfig(ctx)

View File

@ -305,7 +305,7 @@ func rcSingleCommand(ctx context.Context, in rc.Params, name string, noRemote bo
return nil, err return nil, err
} }
if p.FileName() != "" { if p.FileName() != "" {
obj, err := Rcat(ctx, f, path.Join(remote, p.FileName()), p, time.Now()) obj, err := Rcat(ctx, f, path.Join(remote, p.FileName()), p, time.Now(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -85,7 +85,7 @@ func TestDownloaders(t *testing.T) {
// Write the test file // Write the test file
in := io.NopCloser(readers.NewPatternReader(size)) in := io.NopCloser(readers.NewPatternReader(size))
src, err := operations.RcatSize(ctx, r.Fremote, remote, in, size, time.Now()) src, err := operations.RcatSize(ctx, r.Fremote, remote, in, size, time.Now(), nil)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, size, src.Size()) assert.Equal(t, size, src.Size())

View File

@ -68,7 +68,7 @@ func (fh *WriteFileHandle) openPending() (err error) {
pipeReader, fh.pipeWriter = io.Pipe() pipeReader, fh.pipeWriter = io.Pipe()
go func() { go func() {
// NB Rcat deals with Stats.Transferring, etc. // NB Rcat deals with Stats.Transferring, etc.
o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now()) o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now(), nil)
if err != nil { if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err) fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err)
} }