diff --git a/backend/qingstor/qingstor.go b/backend/qingstor/qingstor.go index 5492e79e9..5acb536c9 100644 --- a/backend/qingstor/qingstor.go +++ b/backend/qingstor/qingstor.go @@ -14,7 +14,6 @@ import ( "regexp" "strconv" "strings" - "sync" "time" "github.com/pkg/errors" @@ -24,6 +23,7 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/bucket" qsConfig "github.com/yunify/qingstor-sdk-go/v3/config" qsErr "github.com/yunify/qingstor-sdk-go/v3/request/errors" qs "github.com/yunify/qingstor-sdk-go/v3/service" @@ -146,16 +146,15 @@ type Options struct { // Fs represents a remote qingstor server type Fs struct { - name string // The name of the remote - root string // The root is a subdir, is a special object - opt Options // parsed options - features *fs.Features // optional features - svc *qs.Service // The connection to the qingstor server - zone string // The zone we are working on - bucket string // The bucket we are working on - bucketOKMu sync.Mutex // mutex to protect bucketOK and bucketDeleted - bucketOK bool // true if we have created the bucket - bucketDeleted bool // true if we have deleted the bucket + name string // The name of the remote + root string // The root is a subdir, is a special object + opt Options // parsed options + features *fs.Features // optional features + svc *qs.Service // The connection to the qingstor server + zone string // The zone we are working on + rootBucket string // bucket part of root (if any) + rootDirectory string // directory part of root (if any) + cache *bucket.Cache // cache for bucket creation status } // Object describes a qingstor object @@ -176,22 +175,23 @@ type Object struct { // ------------------------------------------------------------ -// Pattern to match a qingstor path -var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) - -// parseParse parses a qingstor 'url' -func qsParsePath(path string) (bucket, key string, err error) { - // Pattern to match a qingstor path - parts := matcher.FindStringSubmatch(path) - if parts == nil { - err = errors.Errorf("Couldn't parse bucket out of qingstor path %q", path) - } else { - bucket, key = parts[1], parts[2] - key = strings.Trim(key, "/") - } +// parsePath parses a remote 'url' +func parsePath(path string) (root string) { + root = strings.Trim(path, "/") return } +// split returns bucket and bucketPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) { + return bucket.Split(path.Join(f.root, rootRelativePath)) +} + +// split returns bucket and bucketPath from the object +func (o *Object) split() (bucket, bucketPath string) { + return o.fs.split(o.remote) +} + // Split an URL into three parts: protocol host and port func qsParseEndpoint(endpoint string) (protocol, host, port string, err error) { /* @@ -301,6 +301,12 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { return } +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = parsePath(root) + f.rootBucket, f.rootDirectory = bucket.Split(f.root) +} + // NewFs constructs an Fs from the path, bucket:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Parse config into Options struct @@ -317,10 +323,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, errors.Wrap(err, "qingstor: upload cutoff") } - bucket, key, err := qsParsePath(root) - if err != nil { - return nil, err - } svc, err := qsServiceConnection(opt) if err != nil { return nil, err @@ -331,36 +333,33 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } f := &Fs{ - name: name, - root: key, - opt: *opt, - svc: svc, - zone: opt.Zone, - bucket: bucket, + name: name, + opt: *opt, + svc: svc, + zone: opt.Zone, + cache: bucket.NewCache(), } + f.setRoot(root) f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + BucketBasedRootOK: true, }).Fill(f) - if f.root != "" { - if !strings.HasSuffix(f.root, "/") { - f.root += "/" - } - //Check to see if the object exists - bucketInit, err := svc.Bucket(bucket, opt.Zone) + if f.rootBucket != "" && f.rootDirectory != "" { + // Check to see if the object exists + bucketInit, err := svc.Bucket(f.rootBucket, opt.Zone) if err != nil { return nil, err } - _, err = bucketInit.HeadObject(key, &qs.HeadObjectInput{}) + _, err = bucketInit.HeadObject(f.rootDirectory, &qs.HeadObjectInput{}) if err == nil { - f.root = path.Dir(key) - if f.root == "." { - f.root = "" - } else { - f.root += "/" + newRoot := path.Dir(f.root) + if newRoot == "." { + newRoot = "" } + f.setRoot(newRoot) // return an error with an fs which points to the parent return f, fs.ErrorIsFile } @@ -375,18 +374,18 @@ func (f *Fs) Name() string { // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { - if f.root == "" { - return f.bucket - } - return f.bucket + "/" + f.root + return f.root } // String converts this Fs to a string func (f *Fs) String() string { - if f.root == "" { - return fmt.Sprintf("QingStor bucket %s", f.bucket) + if f.rootBucket == "" { + return fmt.Sprintf("QingStor root") } - return fmt.Sprintf("QingStor bucket %s root %s", f.bucket, f.root) + if f.rootDirectory == "" { + return fmt.Sprintf("QingStor bucket %s", f.rootBucket) + } + return fmt.Sprintf("QingStor bucket %s path %s", f.rootBucket, f.rootDirectory) } // Precision of the remote @@ -426,6 +425,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + dstBucket, dstPath := f.split(remote) err := f.Mkdir(ctx, "") if err != nil { return nil, err @@ -435,22 +435,21 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - srcFs := srcObj.fs - key := f.root + remote - source := path.Join("/"+srcFs.bucket, srcFs.root+srcObj.remote) + srcBucket, srcPath := srcObj.split() + source := path.Join("/", srcBucket, srcPath) - fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key) + // fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key) req := qs.PutObjectInput{ XQSCopySource: &source, } - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) + bucketInit, err := f.svc.Bucket(dstBucket, f.zone) if err != nil { return nil, err } - _, err = bucketInit.PutObject(key, &req) + _, err = bucketInit.PutObject(dstPath, &req) if err != nil { - fs.Debugf(f, "Copy Failed, API Error: %v", err) + // fs.Debugf(f, "Copy Failed, API Error: %v", err) return nil, err } return f.NewObject(ctx, remote) @@ -511,29 +510,27 @@ type listFn func(remote string, object *qs.KeyType, isDirectory bool) error // dir is the starting directory, "" for root // // Set recurse to read sub directories -func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) error { - prefix := f.root - if dir != "" { - prefix += dir + "/" +func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error { + if prefix != "" { + prefix += "/" + } + if directory != "" { + directory += "/" } - delimiter := "" if !recurse { delimiter = "/" } - maxLimit := int(listLimitSize) var marker *string - for { - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) + bucketInit, err := f.svc.Bucket(bucket, f.zone) if err != nil { return err } - // FIXME need to implement ALL loop req := qs.ListObjectsInput{ Delimiter: &delimiter, - Prefix: &prefix, + Prefix: &directory, Limit: &maxLimit, Marker: marker, } @@ -546,7 +543,6 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro } return err } - rootLength := len(f.root) if !recurse { for _, commonPrefix := range resp.CommonPrefixes { if commonPrefix == nil { @@ -554,15 +550,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro continue } remote := *commonPrefix - if !strings.HasPrefix(remote, f.root) { + if !strings.HasPrefix(remote, prefix) { fs.Logf(f, "Odd name received %q", remote) continue } - remote = remote[rootLength:] + remote = remote[len(prefix):] + if addBucket { + remote = path.Join(bucket, remote) + } if strings.HasSuffix(remote, "/") { remote = remote[:len(remote)-1] } - err = fn(remote, &qs.KeyType{Key: &remote}, true) if err != nil { return err @@ -572,11 +570,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro for _, object := range resp.Keys { key := qs.StringValue(object.Key) - if !strings.HasPrefix(key, f.root) { + if !strings.HasPrefix(key, prefix) { fs.Logf(f, "Odd name received %q", key) continue } - remote := key[rootLength:] + remote := key[len(prefix):] + if addBucket { + remote = path.Join(bucket, remote) + } err = fn(remote, object, false) if err != nil { return err @@ -613,20 +614,10 @@ func (f *Fs) itemToDirEntry(remote string, object *qs.KeyType, isDirectory bool) return o, nil } -// mark the bucket as being OK -func (f *Fs) markBucketOK() { - if f.bucket != "" { - f.bucketOKMu.Lock() - f.bucketOK = true - f.bucketDeleted = false - f.bucketOKMu.Unlock() - } -} - // listDir lists files and directories to out -func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { // List the objects and directories - err = f.list(ctx, dir, false, func(remote string, object *qs.KeyType, isDirectory bool) error { + err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *qs.KeyType, isDirectory bool) error { entry, err := f.itemToDirEntry(remote, object, isDirectory) if err != nil { return err @@ -640,7 +631,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er return nil, err } // bucket must be present if listing succeeded - f.markBucketOK() + f.cache.MarkOK(bucket) return entries, nil } @@ -675,10 +666,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if f.bucket == "" { + bucket, directory := f.split(dir) + if bucket == "" { return f.listBuckets(dir) } - return f.listDir(ctx, dir) + return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } // ListR lists the objects and directories of the Fs starting @@ -698,106 +690,98 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - if f.bucket == "" { - return fs.ErrorListBucketRequired - } + bucket, directory := f.split(dir) list := walk.NewListRHelper(callback) - err = f.list(ctx, dir, true, func(remote string, object *qs.KeyType, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) + listR := func(bucket, directory, prefix string, addBucket bool) error { + return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *qs.KeyType, isDirectory bool) error { + entry, err := f.itemToDirEntry(remote, object, isDirectory) + if err != nil { + return err + } + return list.Add(entry) + }) + } + if bucket == "" { + entries, err := f.listBuckets("") if err != nil { return err } - return list.Add(entry) - }) - if err != nil { - return err - } - // bucket must be present if listing succeeded - f.markBucketOK() - return list.Flush() -} - -// Check if the bucket exists -func (f *Fs) dirExists() (bool, error) { - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) - if err != nil { - return false, err - } - - _, err = bucketInit.Head() - if err == nil { - return true, nil - } - - if e, ok := err.(*qsErr.QingStorError); ok { - if e.StatusCode == http.StatusNotFound { - err = nil + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + bucket := entry.Remote() + err = listR(bucket, "", f.rootDirectory, true) + if err != nil { + return err + } + } + } else { + err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") + if err != nil { + return err } } - return false, err + // bucket must be present if listing succeeded + f.cache.MarkOK(bucket) + return list.Flush() } // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.bucketOK { - return nil - } - - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) - if err != nil { - return err - } - /* When delete a bucket, qingstor need about 60 second to sync status; - So, need wait for it sync end if we try to operation a just deleted bucket - */ - retries := 0 - for retries <= 120 { - statistics, err := bucketInit.GetStatistics() - if statistics == nil || err != nil { - break - } - switch *statistics.Status { - case "deleted": - fs.Debugf(f, "Wait for qingstor sync bucket status, retries: %d", retries) - time.Sleep(time.Second * 1) - retries++ - continue - default: - break - } - break - } - - if !f.bucketDeleted { - exists, err := f.dirExists() - if err == nil { - f.bucketOK = exists - } - if err != nil || exists { + bucket, _ := f.split(dir) + return f.cache.Create(bucket, func() error { + bucketInit, err := f.svc.Bucket(bucket, f.zone) + if err != nil { return err } - } - - _, err = bucketInit.Put() - if e, ok := err.(*qsErr.QingStorError); ok { - if e.StatusCode == http.StatusConflict { - err = nil + /* When delete a bucket, qingstor need about 60 second to sync status; + So, need wait for it sync end if we try to operation a just deleted bucket + */ + wasDeleted := false + retries := 0 + for retries <= 120 { + statistics, err := bucketInit.GetStatistics() + if statistics == nil || err != nil { + break + } + switch *statistics.Status { + case "deleted": + fs.Debugf(f, "Wait for qingstor bucket to be deleted, retries: %d", retries) + time.Sleep(time.Second * 1) + retries++ + wasDeleted = true + continue + default: + break + } + break } - } - if err == nil { - f.bucketOK = true - f.bucketDeleted = false - } - - return err + retries = 0 + for retries <= 120 { + _, err = bucketInit.Put() + if e, ok := err.(*qsErr.QingStorError); ok { + if e.StatusCode == http.StatusConflict { + if wasDeleted { + fs.Debugf(f, "Wait for qingstor bucket to be creatable, retries: %d", retries) + time.Sleep(time.Second * 1) + retries++ + continue + } + err = nil + } + } + break + } + return err + }, nil) } -// dirIsEmpty check if the bucket empty -func (f *Fs) dirIsEmpty() (bool, error) { - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) +// bucketIsEmpty check if the bucket empty +func (f *Fs) bucketIsEmpty(bucket string) (bool, error) { + bucketInit, err := f.svc.Bucket(bucket, f.zone) if err != nil { return true, err } @@ -815,71 +799,64 @@ func (f *Fs) dirIsEmpty() (bool, error) { // Rmdir delete a bucket func (f *Fs) Rmdir(ctx context.Context, dir string) error { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.root != "" || dir != "" { + bucket, directory := f.split(dir) + if bucket == "" || directory != "" { return nil } - - isEmpty, err := f.dirIsEmpty() + isEmpty, err := f.bucketIsEmpty(bucket) if err != nil { return err } if !isEmpty { - fs.Debugf(f, "The bucket %s you tried to delete not empty.", f.bucket) + // fs.Debugf(f, "The bucket %s you tried to delete not empty.", bucket) return errors.New("BucketNotEmpty: The bucket you tried to delete is not empty") } - - fs.Debugf(f, "Tried to delete the bucket %s", f.bucket) - bucketInit, err := f.svc.Bucket(f.bucket, f.zone) - if err != nil { - return err - } - retries := 0 - for retries <= 10 { - _, delErr := bucketInit.Delete() - if delErr != nil { - if e, ok := delErr.(*qsErr.QingStorError); ok { - switch e.Code { - // The status of "lease" takes a few seconds to "ready" when creating a new bucket - // wait for lease status ready - case "lease_not_ready": - fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries) - retries++ - time.Sleep(time.Second * 1) - continue - default: - err = e - break - } - } - } else { - err = delErr + return f.cache.Remove(bucket, func() error { + // fs.Debugf(f, "Deleting the bucket %s", bucket) + bucketInit, err := f.svc.Bucket(bucket, f.zone) + if err != nil { + return err } - break - } - - if err == nil { - f.bucketOK = false - f.bucketDeleted = true - } - return err + retries := 0 + for retries <= 10 { + _, delErr := bucketInit.Delete() + if delErr != nil { + if e, ok := delErr.(*qsErr.QingStorError); ok { + switch e.Code { + // The status of "lease" takes a few seconds to "ready" when creating a new bucket + // wait for lease status ready + case "lease_not_ready": + fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries) + retries++ + time.Sleep(time.Second * 1) + continue + default: + err = e + break + } + } + } else { + err = delErr + } + break + } + return err + }) } // readMetaData gets the metadata if it hasn't already been fetched // // it also sets the info func (o *Object) readMetaData() (err error) { - bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone) + bucket, bucketPath := o.split() + bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone) if err != nil { return err } - - key := o.fs.root + o.remote - fs.Debugf(o, "Read metadata of key: %s", key) - resp, err := bucketInit.HeadObject(key, &qs.HeadObjectInput{}) + // fs.Debugf(o, "Read metadata of key: %s", key) + resp, err := bucketInit.HeadObject(bucketPath, &qs.HeadObjectInput{}) if err != nil { - fs.Debugf(o, "Read metadata failed, API Error: %v", err) + // fs.Debugf(o, "Read metadata failed, API Error: %v", err) if e, ok := err.(*qsErr.QingStorError); ok { if e.StatusCode == http.StatusNotFound { return fs.ErrorObjectNotFound @@ -941,10 +918,10 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { return nil } // Copy the object to itself to update the metadata - key := o.fs.root + o.remote - sourceKey := path.Join("/", o.fs.bucket, key) + bucket, bucketPath := o.split() + sourceKey := path.Join("/", bucket, bucketPath) - bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone) + bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone) if err != nil { return err } @@ -953,19 +930,19 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { XQSCopySource: &sourceKey, ContentType: &mimeType, } - _, err = bucketInit.PutObject(key, &req) + _, err = bucketInit.PutObject(bucketPath, &req) return err } // Open opens the file for read. Call Close() on the returned io.ReadCloser func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { - bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone) + bucket, bucketPath := o.split() + bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone) if err != nil { return nil, err } - key := o.fs.root + o.remote req := qs.GetObjectInput{} fs.FixRangeOption(options, o.size) for _, option := range options { @@ -979,7 +956,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo } } } - resp, err := bucketInit.GetObject(key, &req) + resp, err := bucketInit.GetObject(bucketPath, &req) if err != nil { return nil, err } @@ -989,21 +966,21 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo // Update in to the object func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { // The maximum size of upload object is multipartUploadSize * MaxMultipleParts + bucket, bucketPath := o.split() err := o.fs.Mkdir(ctx, "") if err != nil { return err } - key := o.fs.root + o.remote // Guess the content type mimeType := fs.MimeType(ctx, src) req := uploadInput{ body: in, qsSvc: o.fs.svc, - bucket: o.fs.bucket, + bucket: bucket, zone: o.fs.zone, - key: key, + key: bucketPath, mimeType: mimeType, partSize: int64(o.fs.opt.ChunkSize), concurrency: o.fs.opt.UploadConcurrency, @@ -1027,13 +1004,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove this object func (o *Object) Remove(ctx context.Context) error { - bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone) + bucket, bucketPath := o.split() + bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone) if err != nil { return err } - - key := o.fs.root + o.remote - _, err = bucketInit.DeleteObject(key) + _, err = bucketInit.DeleteObject(bucketPath) return err }