backend/internetarchive: fix uploads that can take a very long time

* fill in empty values for non-wait mode
* add tracking metadata to observe file changes (see the sketch below)
* completely remove getHashes
* remove unreliable update checks

Closes #6150
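
The tracking metadata works around the fact that the size, MD5/SHA1/CRC32 and mtime reported by the IA metadata endpoint can still describe the previous version of a file while the upload queue is being processed, which is what made waitFileUpload spin for a very long time. Instead of comparing hashes, the backend now attaches a random token as file metadata and waits until that exact token is reported back. Below is a minimal standalone sketch of the idea, with hypothetical names (newTracker, fetchTrackers, waitForTracker); the real code uses lib/random, the x-(amz|archive)-filemeta-rclone-update-track headers and the pacer.

package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"
)

// newTracker returns a random token that gets attached to the upload as
// file metadata and is later looked for in the item's metadata.
func newTracker() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// fetchTrackers stands in for reading the rclone-update-track values of one
// file from (frontend)/metadata/:item; it is a stub for illustration only.
func fetchTrackers(path string) []string {
	return nil
}

// waitForTracker polls until the file reports the token we sent, which
// proves the returned metadata belongs to the new version, not a stale one.
func waitForTracker(path, tracker string, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		for _, v := range fetchTrackers(path) {
			if v == tracker {
				return nil
			}
		}
		time.Sleep(interval)
	}
	return errors.New("timed out waiting for the upload to settle")
}

func main() {
	tracker, err := newTracker()
	if err != nil {
		panic(err)
	}
	// in the backend the token travels as an x-amz-filemeta-rclone-update-track header
	fmt.Println("uploading with tracker", tracker)
	fmt.Println(waitForTracker("item/file.bin", tracker, 3*time.Second, time.Second))
}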
Lesmiscore 2022-05-08 05:54:14 +09:00 committed by GitHub
parent a3d4307892
commit 1d6d41fb91
1 changed file with 31 additions and 54 deletions

@@ -28,6 +28,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/random"
 	"github.com/rclone/rclone/lib/rest"
 )
@@ -129,6 +130,7 @@ type IAFile struct {
 	//	Source      string          `json:"source"`
 	Mtime       string          `json:"mtime"`
 	RcloneMtime json.RawMessage `json:"rclone-mtime"`
+	UpdateTrack json.RawMessage `json:"rclone-update-track"`
 	Size        string          `json:"size"`
 	Md5         string          `json:"md5"`
 	Crc32       string          `json:"crc32"`
@@ -294,7 +296,7 @@ func (o *Object) Storable() bool {
 	return true
 }

-// SetModTime is not supported
+// SetModTime sets modTime on a particular file
 func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) {
 	bucket, reqDir := o.split()
 	if bucket == "" {
@@ -483,6 +485,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 		return nil, fs.ErrorCantCopy
 	}

+	updateTracker := random.String(32)
 	headers := map[string]string{
 		"x-archive-auto-make-bucket": "1",
 		"x-archive-queue-derive":     "0",
@@ -494,7 +497,8 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 		"x-archive-filemeta-crc32":  srcObj.crc32,
 		"x-archive-filemeta-size":   fmt.Sprint(srcObj.size),
 		// add this too for sure
-		"x-archive-filemeta-rclone-mtime": srcObj.modTime.Format(time.RFC3339Nano),
+		"x-archive-filemeta-rclone-mtime":        srcObj.modTime.Format(time.RFC3339Nano),
+		"x-archive-filemeta-rclone-update-track": updateTracker,
 	}

 	// make a PUT request at (IAS3)/:item/:path without body
@@ -515,7 +519,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 	// we can't update/find metadata here as IA will also
 	// queue server-side copy as well as upload/delete.
-	return f.waitFileUpload(ctx, trimPathPrefix(path.Join(dstBucket, dstPath), f.root, f.opt.Enc), f.getHashes(ctx, src), srcObj.size)
+	return f.waitFileUpload(ctx, trimPathPrefix(path.Join(dstBucket, dstPath), f.root, f.opt.Enc), updateTracker, srcObj.size)
 }

 // ListR lists the objects and directories of the Fs starting
@@ -660,12 +664,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	bucket, bucketPath := o.split()
 	modTime := src.ModTime(ctx)
 	size := src.Size()
+	updateTracker := random.String(32)

 	// Set the mtime in the metadata
 	// internetarchive backend builds at header level as IAS3 has extension outside X-Amz-
 	headers := map[string]string{
 		// https://github.com/jjjake/internetarchive/blob/2456376533251df9d05e0a14d796ec1ced4959f5/internetarchive/iarequest.py#L158
-		"x-amz-filemeta-rclone-mtime": modTime.Format(time.RFC3339Nano),
+		"x-amz-filemeta-rclone-mtime":        modTime.Format(time.RFC3339Nano),
+		"x-amz-filemeta-rclone-update-track": updateTracker,

 		// we add some more headers for intuitive actions
 		"x-amz-auto-make-bucket": "1", // create an item if does not exist, do nothing if already
@@ -712,7 +718,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	// or we have to wait for finish? (needs polling (frontend)/metadata/:item or scraping (frontend)/history/:item)
 	var newObj *Object
 	if err == nil {
-		newObj, err = o.fs.waitFileUpload(ctx, o.remote, o.fs.getHashes(ctx, src), size)
+		newObj, err = o.fs.waitFileUpload(ctx, o.remote, updateTracker, size)
 	} else {
 		newObj = &Object{}
 	}
@@ -782,18 +788,6 @@ func (o *Object) split() (bucket, bucketPath string) {
 	return o.fs.split(o.remote)
 }

-func (f *Fs) getHashes(ctx context.Context, src fs.ObjectInfo) map[hash.Type]string {
-	hashMap := map[hash.Type]string{}
-	for _, ty := range f.Hashes().Array() {
-		sum, err := src.Hash(ctx, ty)
-		if err != nil || sum == "" {
-			continue
-		}
-		hashMap[ty] = sum
-	}
-	return hashMap
-}
-
 func (f *Fs) requestMetadata(ctx context.Context, bucket string) (result MetadataResponse, err error) {
 	var resp *http.Response
 	// make a GET request to (frontend)/metadata/:item/
@@ -852,7 +846,7 @@ func (f *Fs) listAllUnconstrained(ctx context.Context, bucket string) (entries f
 	return entries, nil
 }

-func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[hash.Type]string, newSize int64) (ret *Object, err error) {
+func (f *Fs) waitFileUpload(ctx context.Context, reqPath, tracker string, newSize int64) (ret *Object, err error) {
 	bucket, bucketPath := f.split(reqPath)

 	ret = &Object{
@@ -869,6 +863,10 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 		ret2, ok := ret2.(*Object)
 		if ok {
 			ret = ret2
+			ret.crc32 = ""
+			ret.md5 = ""
+			ret.sha1 = ""
+			ret.size = -1
 		}
 	}
 	return ret, nil
@@ -881,9 +879,6 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 	go func() {
 		isFirstTime := true
 		existed := false
-		oldMtime := ""
-		oldCrc32 := ""
-		unreliablePassCount := 0
 		for {
 			if !isFirstTime {
 				// depending on the queue, it takes time
@@ -908,10 +903,6 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 			if isFirstTime {
 				isFirstTime = false
 				existed = iaFile != nil
-				if iaFile != nil {
-					oldMtime = iaFile.Mtime
-					oldCrc32 = iaFile.Crc32
-				}
 			}
 			if iaFile == nil {
 				continue
@@ -925,38 +916,20 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 				return
 			}

-			hashMatched := true
-			for tt, sum := range newHashes {
-				if tt == hash.MD5 && !hash.Equals(iaFile.Md5, sum) {
-					hashMatched = false
-					break
-				}
-				if tt == hash.SHA1 && !hash.Equals(iaFile.Sha1, sum) {
-					hashMatched = false
-					break
-				}
-				if tt == hash.CRC32 && !hash.Equals(iaFile.Crc32, sum) {
-					hashMatched = false
+			fileTrackers, _ := listOrString(iaFile.UpdateTrack)
+			trackerMatch := false
+			for _, v := range fileTrackers {
+				if v == tracker {
+					trackerMatch = true
 					break
 				}
 			}
-			if !hashMatched {
+			if !trackerMatch {
 				continue
 			}
 			if !compareSize(parseSize(iaFile.Size), newSize) {
 				continue
 			}
-			if hash.Equals(oldCrc32, iaFile.Crc32) && unreliablePassCount < 60 {
-				// the following two are based on a sort of "bad" assumption;
-				// what if the file is updated immediately, before polling?
-				// by limiting hits of these tests, avoid infinite loop
-				unreliablePassCount++
-				continue
-			}
-			if hash.Equals(iaFile.Mtime, oldMtime) && unreliablePassCount < 60 {
-				unreliablePassCount++
-				continue
-			}

 			// voila!
 			retC <- struct {
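
The last hunk factors the array-or-string handling into listOrString, which the tracker check above already relies on: the values rclone stores under rclone-mtime and rclone-update-track can come back from the metadata endpoint either as a single JSON string or as an array of strings. A self-contained sketch of that decoding pattern (illustrative only, not the backend code itself):

package main

import (
	"encoding/json"
	"fmt"
)

// listOrString decodes a JSON value that may be either a string or an
// array of strings and always returns it as a slice.
func listOrString(raw json.RawMessage) ([]string, error) {
	var many []string
	if err := json.Unmarshal(raw, &many); err == nil {
		return many, nil // it was an array
	}
	var one string
	if err := json.Unmarshal(raw, &one); err != nil {
		return nil, err
	}
	return []string{one}, nil // it was a single string
}

func main() {
	for _, in := range []string{`"2022-05-08T05:54:14Z"`, `["a", "b"]`} {
		values, err := listOrString(json.RawMessage(in))
		fmt.Println(values, err)
	}
}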
@@ -1036,20 +1009,24 @@ func makeValidObject2(f *Fs, file IAFile, bucket string) *Object {
 	return makeValidObject(f, trimPathPrefix(path.Join(bucket, file.Name), f.root, f.opt.Enc), file, mtimeTime, size)
 }

-func (file IAFile) parseMtime() (mtime time.Time) {
-	// method 1: use metadata added by rclone
-	var rmArray []string
+func listOrString(jm json.RawMessage) (rmArray []string, err error) {
 	// rclone-metadata can be an array or string
 	// try to deserialize it as array first
-	err := json.Unmarshal(file.RcloneMtime, &rmArray)
+	err = json.Unmarshal(jm, &rmArray)
 	if err != nil {
 		// if not, it's a string
 		dst := new(string)
-		err = json.Unmarshal(file.RcloneMtime, dst)
+		err = json.Unmarshal(jm, dst)
 		if err == nil {
 			rmArray = []string{*dst}
 		}
 	}
+	return
+}
+
+func (file IAFile) parseMtime() (mtime time.Time) {
+	// method 1: use metadata added by rclone
+	rmArray, err := listOrString(file.RcloneMtime)
 	// let's take the first value we can deserialize
 	for _, value := range rmArray {
 		mtime, err = time.Parse(time.RFC3339Nano, value)