swift: implement copying large objects

This commit is contained in:
nguyenhuuluan434 2021-03-02 15:58:14 +07:00 committed by Nick Craig-Wood
parent 12cd322643
commit d6870473a1
3 changed files with 153 additions and 8 deletions

View File

@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/ncw/swift/v2"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
@ -905,18 +906,125 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
srcContainer, srcPath := srcObj.split()
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil)
return shouldRetryHeaders(ctx, rxHeaders, err)
})
isLargeObject, err := srcObj.isLargeObject(ctx)
if err != nil {
return nil, err
}
if isLargeObject {
/*handle large object*/
err = copyLargeObject(ctx, f, srcObj, dstContainer, dstPath)
} else {
srcContainer, srcPath := srcObj.split()
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil)
return shouldRetryHeaders(ctx, rxHeaders, err)
})
}
if err != nil {
return nil, err
}
return f.NewObject(ctx, remote)
}
func copyLargeObject(ctx context.Context, f *Fs, src *Object, dstContainer string, dstPath string) error {
segmentsContainer := dstContainer + "_segments"
err := f.makeContainer(ctx, segmentsContainer)
if err != nil {
return err
}
segments, err := src.getSegmentsLargeObject(ctx)
if err != nil {
return err
}
if len(segments) == 0 {
return errors.New("could not copy object, list segments are empty")
}
nanoSeconds := time.Now().Nanosecond()
prefixSegment := fmt.Sprintf("%v/%v/%s", nanoSeconds, src.size, strings.ReplaceAll(uuid.New().String(), "-", ""))
copiedSegmentsLen := 10
for _, value := range segments {
if len(value) <= 0 {
continue
}
fragment := value[0]
if len(fragment) <= 0 {
continue
}
copiedSegmentsLen = len(value)
firstIndex := strings.Index(fragment, "/")
if firstIndex < 0 {
firstIndex = 0
} else {
firstIndex = firstIndex + 1
}
lastIndex := strings.LastIndex(fragment, "/")
if lastIndex < 0 {
lastIndex = len(fragment)
} else {
lastIndex = lastIndex - 1
}
prefixSegment = fragment[firstIndex:lastIndex]
break
}
copiedSegments := make([]string, copiedSegmentsLen)
defer handleCopyFail(ctx, f, segmentsContainer, copiedSegments, err)
for c, ss := range segments {
if len(ss) <= 0 {
continue
}
for _, s := range ss {
lastIndex := strings.LastIndex(s, "/")
if lastIndex <= 0 {
lastIndex = 0
} else {
lastIndex = lastIndex + 1
}
segmentName := dstPath + "/" + prefixSegment + "/" + s[lastIndex:]
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, c, s, segmentsContainer, segmentName, nil)
copiedSegments = append(copiedSegments, segmentName)
return shouldRetryHeaders(ctx, rxHeaders, err)
})
if err != nil {
return err
}
}
}
m := swift.Metadata{}
headers := m.ObjectHeaders()
headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s/%s", segmentsContainer, dstPath, prefixSegment))
headers["Content-Length"] = "0"
emptyReader := bytes.NewReader(nil)
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectPut(ctx, dstContainer, dstPath, emptyReader, true, "", src.contentType, headers)
return shouldRetryHeaders(ctx, rxHeaders, err)
})
return err
}
//remove copied segments when copy process failed
func handleCopyFail(ctx context.Context, f *Fs, segmentsContainer string, segments []string, err error) {
fs.Debugf(f, "handle copy segment fail")
if err == nil {
return
}
if len(segmentsContainer) == 0 {
fs.Debugf(f, "invalid segments container")
return
}
if len(segments) == 0 {
fs.Debugf(f, "segments is empty")
return
}
fs.Debugf(f, "action delete segments what copied")
for _, v := range segments {
_ = f.c.ObjectDelete(ctx, segmentsContainer, v)
}
}
// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.MD5)
@ -1244,7 +1352,8 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.
if segmentInfos == nil || len(segmentInfos) == 0 {
return
}
deleteChunks(ctx, o, segmentsContainer, segmentInfos)
_ctx := context.Background()
deleteChunks(_ctx, o, segmentsContainer, segmentInfos)
})()
for {
// can we read at least one byte?

View File

@ -80,6 +80,7 @@ func (f *Fs) InternalTest(t *testing.T) {
t.Run("NoChunk", f.testNoChunk)
t.Run("WithChunk", f.testWithChunk)
t.Run("WithChunkFail", f.testWithChunkFail)
t.Run("CopyLargeObject", f.testCopyLargeObject)
}
func (f *Fs) testWithChunk(t *testing.T) {
@ -154,4 +155,39 @@ func (f *Fs) testWithChunkFail(t *testing.T) {
require.Empty(t, objs)
}
func (f *Fs) testCopyLargeObject(t *testing.T) {
preConfChunkSize := f.opt.ChunkSize
preConfChunk := f.opt.NoChunk
f.opt.NoChunk = false
f.opt.ChunkSize = 1024 * fs.Byte
defer func() {
//restore old config after test
f.opt.ChunkSize = preConfChunkSize
f.opt.NoChunk = preConfChunk
}()
file := fstest.Item{
ModTime: fstest.Time("2020-12-31T04:05:06.499999999Z"),
Path: "large.txt",
Size: -1, // use unknown size during upload
}
const contentSize = 2048
contents := random.String(contentSize)
buf := bytes.NewBufferString(contents)
uploadHash := hash.NewMultiHasher()
in := io.TeeReader(buf, uploadHash)
file.Size = -1
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
ctx := context.TODO()
obj, err := f.Features().PutStream(ctx, in, obji)
require.NoError(t, err)
require.NotEmpty(t, obj)
remoteTarget := "large.txt (copy)"
objTarget, err := f.Features().Copy(ctx, obj, remoteTarget)
require.NoError(t, err)
require.NotEmpty(t, objTarget)
require.Equal(t, obj.Size(), objTarget.Size())
}
var _ fstests.InternalTester = (*Fs)(nil)

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/gabriel-vasile/mimetype v1.1.2
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/google/uuid v1.2.0
github.com/hanwen/go-fuse/v2 v2.0.3
github.com/iguanesolutions/go-systemd/v5 v5.0.0
github.com/jcmturner/gokrb5/v8 v8.4.2