From d5d28a7513b0970196a5f99eb6a0856fa4ec09e6 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 22 Nov 2023 15:05:44 +0000 Subject: [PATCH] operations: fix overwrite of destination when multi-thread transfer fails Before this change, if a multithread upload failed (let's say the source became unavailable) rclone would finalise the file first before aborting the transfer. This caused the partial file to be written which would overwrite any existing files. This was fixed by making sure we Abort the transfer before Close-ing it. This updates the docs to encourage calling of Abort before Close and updates writerAtChunkWriter to make sure that works properly. This also reworks the tests to detect this and to make sure we upload and download to each multi-thread capable backend (we were only downloading before which isn't a full test). Fixes #7071 --- fs/features.go | 4 +- fs/operations/multithread.go | 25 +++- fs/operations/multithread_test.go | 216 +++++++++++++++++++++++++----- 3 files changed, 206 insertions(+), 39 deletions(-) diff --git a/fs/features.go b/fs/features.go index 991b0aceb..060c4cc37 100644 --- a/fs/features.go +++ b/fs/features.go @@ -664,10 +664,12 @@ type ChunkWriter interface { // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) - // Close complete chunked writer + // Close complete chunked writer finalising the file. Close(ctx context.Context) error // Abort chunk write + // + // You can and should call Abort without calling Close. Abort(ctx context.Context) error } diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index d9e83be42..46e66dfa4 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -165,9 +165,10 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, uploadCtx, cancel := context.WithCancel(ctx) defer cancel() + uploadedOK := false defer atexit.OnError(&err, func() { cancel() - if info.LeavePartsOnError { + if info.LeavePartsOnError || uploadedOK { return } fs.Debugf(src, "multi-thread copy: cancelling transfer on exit") @@ -226,13 +227,14 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, } err = g.Wait() - closeErr := chunkWriter.Close(ctx) if err != nil { return nil, err } - if closeErr != nil { - return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", closeErr) + err = chunkWriter.Close(ctx) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", err) } + uploadedOK = true // file is definitely uploaded OK so no need to abort obj, err := f.NewObject(ctx, remote) if err != nil { @@ -282,10 +284,11 @@ type writerAtChunkWriter struct { chunks int writeBufferSize int64 f fs.Fs + closed bool } // WriteChunk writes chunkNumber from reader -func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { +func (w *writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { fs.Debugf(w.remote, "writing chunk %v", chunkNumber) bytesToWrite := w.chunkSize @@ -316,12 +319,20 @@ func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, re } // Close the chunk writing -func (w writerAtChunkWriter) Close(ctx context.Context) error { +func (w *writerAtChunkWriter) Close(ctx context.Context) error { + if w.closed { + return nil + } + w.closed = true return w.writerAt.Close() } // Abort the chunk writing -func (w writerAtChunkWriter) Abort(ctx context.Context) error { +func (w *writerAtChunkWriter) Abort(ctx context.Context) error { + err := w.Close(ctx) + if err != nil { + fs.Errorf(w.remote, "multi-thread copy: failed to close file before aborting: %v", err) + } obj, err := w.f.NewObject(ctx, w.remote) if err != nil { return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err) diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index cd5f430f8..a61f34331 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -2,10 +2,16 @@ package operations import ( "context" + "errors" "fmt" + "io" + "sync" "testing" + "time" "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fstest/mockfs" "github.com/rclone/rclone/fstest/mockobject" "github.com/rclone/rclone/lib/random" @@ -108,45 +114,193 @@ func TestMultithreadCalculateNumChunks(t *testing.T) { } } +// Skip if not multithread, returning the chunkSize otherwise +func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int { + features := r.Fremote.Features() + if features.OpenChunkWriter == nil && features.OpenWriterAt == nil { + t.Skip("multithread writing not supported") + } + + // Only support one hash otherwise we end up spending a huge amount of CPU on hashing! + oldHashes := hash.SupportOnly([]hash.Type{r.Fremote.Hashes().GetOne()}) + t.Cleanup(func() { + _ = hash.SupportOnly(oldHashes) + }) + + ci := fs.GetConfig(ctx) + chunkSize := int(ci.MultiThreadChunkSize) + if features.OpenChunkWriter != nil { + //OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error) + const fileName = "chunksize-probe" + src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil) + info, writer, err := features.OpenChunkWriter(ctx, fileName, src) + require.NoError(t, err) + chunkSize = int(info.ChunkSize) + err = writer.Abort(ctx) + require.NoError(t, err) + } + return chunkSize +} + func TestMultithreadCopy(t *testing.T) { r := fstest.NewRun(t) ctx := context.Background() + chunkSize := skipIfNotMultithread(ctx, t, r) - for _, test := range []struct { - size int - streams int - }{ - {size: multithreadChunkSize*2 - 1, streams: 2}, - {size: multithreadChunkSize * 2, streams: 2}, - {size: multithreadChunkSize*2 + 1, streams: 2}, - } { - t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { - if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit { - t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit) + for _, upload := range []bool{false, true} { + for _, test := range []struct { + size int + streams int + }{ + {size: chunkSize*2 - 1, streams: 2}, + {size: chunkSize * 2, streams: 2}, + {size: chunkSize*2 + 1, streams: 2}, + } { + fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams) + t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) { + if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit { + t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit) + } + var ( + contents = random.String(test.size) + t1 = fstest.Time("2001-02-03T04:05:06.499999999Z") + file1 fstest.Item + src, dst fs.Object + err error + ) + + if upload { + file1 = r.WriteFile(fileName, contents, t1) + r.CheckRemoteItems(t) + r.CheckLocalItems(t, file1) + src, err = r.Flocal.NewObject(ctx, fileName) + } else { + file1 = r.WriteObject(ctx, fileName, contents, t1) + r.CheckRemoteItems(t, file1) + r.CheckLocalItems(t) + src, err = r.Fremote.NewObject(ctx, fileName) + } + require.NoError(t, err) + + accounting.GlobalStats().ResetCounters() + tr := accounting.GlobalStats().NewTransfer(src) + + defer func() { + tr.Done(ctx, err) + }() + + if upload { + dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr) + } else { + dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr) + } + + require.NoError(t, err) + assert.Equal(t, src.Size(), dst.Size()) + assert.Equal(t, fileName, dst.Remote()) + fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote)) + fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote)) + require.NoError(t, dst.Remove(ctx)) + require.NoError(t, src.Remove(ctx)) + }) + } + } +} + +type errorObject struct { + fs.Object + size int64 + wg *sync.WaitGroup +} + +// Open opens the file for read. Call Close() on the returned io.ReadCloser +// +// Remember this is called multiple times whenever the backend seeks (eg having read checksum) +func (o errorObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { + fs.Debugf(nil, "Open with options = %v", options) + rc, err := o.Object.Open(ctx, options...) + if err != nil { + return nil, err + } + // Return an error reader for the second segment + for _, option := range options { + if ropt, ok := option.(*fs.RangeOption); ok { + end := ropt.End + 1 + if end >= o.size { + // Give the other chunks a chance to start + time.Sleep(time.Second) + // Wait for chunks to upload first + o.wg.Wait() + fs.Debugf(nil, "Returning error reader") + return errorReadCloser{rc}, nil } - var err error - contents := random.String(test.size) - t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") - file1 := r.WriteObject(ctx, "file1", contents, t1) - r.CheckRemoteItems(t, file1) - r.CheckLocalItems(t) + } + } + o.wg.Add(1) + return wgReadCloser{rc, o.wg}, nil +} - src, err := r.Fremote.NewObject(ctx, "file1") - require.NoError(t, err) - accounting.GlobalStats().ResetCounters() - tr := accounting.GlobalStats().NewTransfer(src) +type errorReadCloser struct { + io.ReadCloser +} - defer func() { - tr.Done(ctx, err) - }() - dst, err := multiThreadCopy(ctx, r.Flocal, "file1", src, 2, tr) - require.NoError(t, err) - assert.Equal(t, src.Size(), dst.Size()) - assert.Equal(t, "file1", dst.Remote()) +func (rc errorReadCloser) Read(p []byte) (n int, err error) { + fs.Debugf(nil, "BOOM: simulated read failure") + return 0, errors.New("BOOM: simulated read failure") +} - fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote)) - require.NoError(t, dst.Remove(ctx)) - }) +type wgReadCloser struct { + io.ReadCloser + wg *sync.WaitGroup +} + +func (rc wgReadCloser) Close() (err error) { + rc.wg.Done() + return rc.ReadCloser.Close() +} + +// Make sure aborting the multi-thread copy doesn't overwrite an existing file. +func TestMultithreadCopyAbort(t *testing.T) { + r := fstest.NewRun(t) + ctx := context.Background() + chunkSize := skipIfNotMultithread(ctx, t, r) + size := 2*chunkSize + 1 + + if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit { + t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit) } + // first write a canary file which we are trying not to overwrite + const fileName = "test-multithread-abort" + contents := random.String(100) + t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") + canary := r.WriteObject(ctx, fileName, contents, t1) + r.CheckRemoteItems(t, canary) + + // Now write a local file to upload + file1 := r.WriteFile(fileName, random.String(size), t1) + r.CheckLocalItems(t, file1) + + src, err := r.Flocal.NewObject(ctx, fileName) + require.NoError(t, err) + accounting.GlobalStats().ResetCounters() + tr := accounting.GlobalStats().NewTransfer(src) + + defer func() { + tr.Done(ctx, err) + }() + wg := new(sync.WaitGroup) + dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr) + assert.Error(t, err) + assert.Nil(t, dst) + + if r.Fremote.Features().PartialUploads { + r.CheckRemoteItems(t) + + } else { + r.CheckRemoteItems(t, canary) + o, err := r.Fremote.NewObject(ctx, fileName) + require.NoError(t, err) + require.NoError(t, o.Remove(ctx)) + } }