diff --git a/docs/content/overview.md b/docs/content/overview.md
index 39ef9b143..f95a65145 100644
--- a/docs/content/overview.md
+++ b/docs/content/overview.md
@@ -30,7 +30,7 @@ Here is an overview of the major features of each cloud storage system.
 | Microsoft Azure Blob Storage | MD5         | Yes     | No               | No              | R/W       |
 | Microsoft OneDrive           | SHA1        | Yes     | Yes              | No              | R         |
 | Openstack Swift              | MD5         | Yes     | No               | No              | R/W       |
-| QingStor                     | -           | No      | No               | No              | R/W       |
+| QingStor                     | MD5         | No      | No               | No              | R/W       |
 | SFTP                         | MD5, SHA1 * | Yes     | Depends          | No              | -         |
 | Yandex Disk                  | MD5         | Yes     | No               | No              | R/W       |
 | The local filesystem         | All         | Yes     | Depends          | No              | -         |
diff --git a/qingstor/qingstor.go b/qingstor/qingstor.go
index 908937755..bb1698772 100644
--- a/qingstor/qingstor.go
+++ b/qingstor/qingstor.go
@@ -6,7 +6,6 @@
 package qingstor
 
 import (
-	"bytes"
 	"fmt"
 	"io"
 	"net/http"
@@ -65,6 +64,11 @@
 			Help: "The Shanghai (China) First Zone\nNeeds location constraint sh1a.",
 		},
+		{
+			Value: "gd2a",
+
+			Help: "The Guangdong (China) Second Zone\nNeeds location constraint gd2a.",
+		},
 		},
 	}, {
 		Name: "connection_retries",
@@ -75,11 +79,8 @@
 
 // Constants
 const (
-	listLimitSize       = 1000                   // Number of items to read at once
-	maxSizeForCopy      = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
-	maxSizeForPart      = 1024 * 1024 * 1024 * 1 // The maximum size of object we can Upload in Multipart Upload API
-	multipartUploadSize = 1024 * 1024 * 64       // The size of multipart upload object as once.
-	MaxMultipleParts    = 10000                  // The maximum number of upload multiple parts
+	listLimitSize  = 1000                   // Number of items to read at once
+	maxSizeForCopy = 1024 * 1024 * 1024 * 5 // The maximum size of object we can COPY
 )
 
 // Globals
@@ -193,6 +194,8 @@ func qsServiceConnection(name string) (*qs.Service, error) {
 		host = _host
 		if _port != "" {
 			port, _ = strconv.Atoi(_port)
+		} else if protocol == "http" {
+			port = 80
 		}
 	}
 
@@ -300,9 +303,8 @@ func (f *Fs) Precision() time.Duration {
 
 // Hashes returns the supported hash sets.
 func (f *Fs) Hashes() fs.HashSet {
-	//return fs.HashSet(fs.HashMD5)
-	//Not supported temporary
-	return fs.HashSet(fs.HashNone)
+	return fs.HashSet(fs.HashMD5)
+	//return fs.HashSet(fs.HashNone)
 }
 
 // Features returns the optional features of this Fs
@@ -631,6 +633,31 @@ func (f *Fs) Mkdir(dir string) error {
 		return nil
 	}
 
+	bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
+	if err != nil {
+		return err
+	}
+	/* After a bucket is deleted, QingStor needs about 60 seconds to sync its status,
+	so wait for that sync to finish before operating on a just-deleted bucket.
+	*/
+	retries := 0
+	for retries <= 120 {
+		statistics, err := bucketInit.GetStatistics()
+		if statistics == nil || err != nil {
+			break
+		}
+		switch *statistics.Status {
+		case "deleted":
+			fs.Debugf(f, "Waiting for QingStor to sync bucket status, retries: %d", retries)
+			time.Sleep(time.Second * 1)
+			retries++
+			continue
+		default:
+			break
+		}
+		break
+	}
+
 	if !f.bucketDeleted {
 		exists, err := f.dirExists()
 		if err == nil {
@@ -641,10 +668,6 @@ func (f *Fs) Mkdir(dir string) error {
 		}
 	}
 
-	bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
-	if err != nil {
-		return err
-	}
 	_, err = bucketInit.Put()
 	if e, ok := err.(*qsErr.QingStorError); ok {
 		if e.StatusCode == http.StatusConflict {
@@ -662,21 +685,17 @@
 
 // dirIsEmpty check if the bucket empty
 func (f *Fs) dirIsEmpty() (bool, error) {
-	limit := 8
 	bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
 	if err != nil {
 		return true, err
 	}
 
-	req := qs.ListObjectsInput{
-		Limit: &limit,
-	}
-	rsp, err := bucketInit.ListObjects(&req)
-
+	statistics, err := bucketInit.GetStatistics()
 	if err != nil {
-		return false, err
+		return true, err
 	}
-	if len(rsp.Keys) == 0 {
+
+	if *statistics.Count == 0 {
 		return true, nil
 	}
 	return false, nil
@@ -704,7 +723,30 @@ func (f *Fs) Rmdir(dir string) error {
 	if err != nil {
 		return err
 	}
-	_, err = bucketInit.Delete()
+	retries := 0
+	for retries <= 10 {
+		_, delErr := bucketInit.Delete()
+		if delErr != nil {
+			if e, ok := delErr.(*qsErr.QingStorError); ok {
+				switch e.Code {
+				// A newly created bucket's "lease" status takes a few seconds to become "ready";
+				// wait for the lease to be ready before deleting.
+				case "lease_not_ready":
+					fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries)
+					retries++
+					time.Sleep(time.Second * 1)
+					continue
+				default:
+					err = e
+					break
+				}
+			}
+		} else {
+			err = delErr
+		}
+		break
+	}
+
 	if err == nil {
 		f.bucketOK = false
 		f.bucketDeleted = true
@@ -839,85 +881,24 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
 		return err
 	}
 
-	bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
-	if err != nil {
-		return err
-	}
-	//Initiate Upload Multipart
 	key := o.fs.root + o.remote
-	var objectParts = []*qs.ObjectPartType{}
-	var uploadID *string
-	var partNumber int
-
-	defer func() {
-		if err != nil {
-			fs.Errorf(o, "Create Object Faild, API ERROR: %v", err)
-			// Abort Upload when init success and upload failed
-			if uploadID != nil {
-				fs.Debugf(o, "Abort Upload Multipart, upload_id: %s, objectParts: %+v", *uploadID, objectParts)
-				abortReq := qs.AbortMultipartUploadInput{
-					UploadID: uploadID,
-				}
-				_, _ = bucketInit.AbortMultipartUpload(key, &abortReq)
-			}
-		}
-	}()
-
-	fs.Debugf(o, "Initiate Upload Multipart, key: %s", key)
+	// Guess the content type
 	mimeType := fs.MimeType(src)
-	initReq := qs.InitiateMultipartUploadInput{
-		ContentType: &mimeType,
+
+	req := uploadInput{
+		body:     in,
+		qsSvc:    o.fs.svc,
+		bucket:   o.fs.bucket,
+		zone:     o.fs.zone,
+		key:      key,
+		mimeType: mimeType,
 	}
-	rsp, err := bucketInit.InitiateMultipartUpload(key, &initReq)
+	uploader := newUploader(&req)
+
+	err = uploader.upload()
 	if err != nil {
 		return err
 	}
-	uploadID = rsp.UploadID
-
-	// Create an new buffer
-	buffer := new(bytes.Buffer)
-
-	for {
-		size, er := io.CopyN(buffer, in, multipartUploadSize)
-		if er != nil && er != io.EOF {
-			err = fmt.Errorf("read upload data failed, error: %s", er)
-			return err
-		}
-		if size == 0 && partNumber > 0 {
-			break
-		}
-		// Upload Multipart Object
-		number := partNumber
-		req := qs.UploadMultipartInput{
-			PartNumber:    &number,
-			UploadID:      uploadID,
-			ContentLength: &size,
-			Body:          buffer,
-		}
-		fs.Debugf(o, "Upload Multipart, upload_id: %s, part_number: %d", *uploadID, number)
-		_, err = bucketInit.UploadMultipart(key, &req)
-		if err != nil {
-			return err
-		}
-		part := qs.ObjectPartType{
-			PartNumber: &number,
-			Size:       &size,
-		}
-		objectParts = append(objectParts, &part)
-		partNumber++
-	}
-
-	// Complete Multipart Upload
-	fs.Debugf(o, "Complete Upload Multipart, upload_id: %s, objectParts: %d", *uploadID, len(objectParts))
-	completeReq := qs.CompleteMultipartUploadInput{
-		UploadID:    uploadID,
-		ObjectParts: objectParts,
-	}
-	_, err = bucketInit.CompleteMultipartUpload(key, &completeReq)
-	if err != nil {
-		return err
-	}
-
 	// Read Metadata of object
 	err = o.readMetaData()
 	return err
diff --git a/qingstor/upload.go b/qingstor/upload.go
new file mode 100644
index 000000000..c2b631863
--- /dev/null
+++ b/qingstor/upload.go
@@ -0,0 +1,415 @@
+// Upload object to QingStor
+
+// +build !plan9
+
+package qingstor
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+	"hash"
+	"io"
+	"sort"
+	"sync"
+
+	"github.com/ncw/rclone/fs"
+	"github.com/pkg/errors"
+	qs "github.com/yunify/qingstor-sdk-go/service"
+)
+
+const (
+	maxSinglePartSize = 1024 * 1024 * 1024 * 5 // The maximum allowed size when uploading a single object to QingStor
+	maxMultiPartSize  = 1024 * 1024 * 1024 * 1 // The maximum allowed part size when uploading a part to QingStor
+	minMultiPartSize  = 1024 * 1024 * 4        // The minimum allowed part size when uploading a part to QingStor
+	maxMultiParts     = 10000                  // The maximum allowed number of parts in a multi-part upload
+)
+
+const (
+	defaultUploadPartSize    = 1024 * 1024 * 64 // The default part size to buffer chunks of a payload into.
+	defaultUploadConcurrency = 4                // The default number of goroutines to spin up when using multiPartUpload.
+)
+
+func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
+	for offset < len(b) && err == nil {
+		var n int
+		n, err = r.Read(b[offset:])
+		offset += n
+	}
+
+	return offset, err
+}
+
+// uploadInput contains all input for upload requests to QingStor.
+type uploadInput struct {
+	body           io.Reader
+	qsSvc          *qs.Service
+	mimeType       string
+	zone           string
+	bucket         string
+	key            string
+	partSize       int64
+	concurrency    int
+	maxUploadParts int
+}
+
+// uploader is an internal structure to manage an upload to QingStor.
+type uploader struct {
+	cfg        *uploadInput
+	totalSize  int64 // set to -1 if the size is not known
+	readerPos  int64 // current reader position
+	readerSize int64 // current reader content size
+}
+
+// newUploader creates a new uploader instance to upload objects to QingStor.
+func newUploader(in *uploadInput) *uploader {
+	u := &uploader{
+		cfg: in,
+	}
+	return u
+}
+
+// bucketInit creates a bucket controller for this upload
+func (u *uploader) bucketInit() (*qs.Bucket, error) {
+	bucketInit, err := u.cfg.qsSvc.Bucket(u.cfg.bucket, u.cfg.zone)
+	return bucketInit, err
+}
+
+// String converts uploader to a string
+func (u *uploader) String() string {
+	return fmt.Sprintf("QingStor bucket %s key %s", u.cfg.bucket, u.cfg.key)
+}
+
+// nextReader returns a seekable reader representing the next packet of data.
+// This operation increases the shared u.readerPos counter, but note that it
+// does not need to be wrapped in a mutex because nextReader is only called
+// from the main thread.
+func (u *uploader) nextReader() (io.ReadSeeker, int, error) {
+	type readerAtSeeker interface {
+		io.ReaderAt
+		io.ReadSeeker
+	}
+	switch r := u.cfg.body.(type) {
+	case readerAtSeeker:
+		var err error
+		n := u.cfg.partSize
+		if u.totalSize >= 0 {
+			bytesLeft := u.totalSize - u.readerPos
+
+			if bytesLeft <= u.cfg.partSize {
+				err = io.EOF
+				n = bytesLeft
+			}
+		}
+		reader := io.NewSectionReader(r, u.readerPos, n)
+		u.readerPos += n
+		u.readerSize = n
+		return reader, int(n), err
+
+	default:
+		part := make([]byte, u.cfg.partSize)
+		n, err := readFillBuf(r, part)
+		u.readerPos += int64(n)
+		u.readerSize = int64(n)
+		return bytes.NewReader(part[0:n]), n, err
+	}
+}
+
+// init will initialize all default options.
+func (u *uploader) init() {
+	if u.cfg.concurrency == 0 {
+		u.cfg.concurrency = defaultUploadConcurrency
+	}
+	if u.cfg.partSize == 0 {
+		u.cfg.partSize = defaultUploadPartSize
+	}
+	if u.cfg.maxUploadParts == 0 {
+		u.cfg.maxUploadParts = maxMultiParts
+	}
+	// Try to get the total size for some optimizations
+	u.totalSize = -1
+	switch r := u.cfg.body.(type) {
+	case io.Seeker:
+		pos, _ := r.Seek(0, 1)
+		defer func() {
+			_, _ = r.Seek(pos, 0)
+		}()
+
+		n, err := r.Seek(0, 2)
+		if err != nil {
+			return
+		}
+		u.totalSize = n
+
+		// Try to adjust partSize if it is too small and account for
+		// integer division truncation.
+		if u.totalSize/u.cfg.partSize >= int64(u.cfg.maxUploadParts) {
+			// Add one to the part size to account for remainders
+			// during the size calculation. e.g odd number of bytes.
+			u.cfg.partSize = (u.totalSize / int64(u.cfg.maxUploadParts)) + 1
+		}
+	}
+}
+
+// singlePartUpload uploads a single object whose content length is less than "defaultUploadPartSize"
+func (u *uploader) singlePartUpload(buf io.ReadSeeker) error {
+	bucketInit, _ := u.bucketInit()
+
+	req := qs.PutObjectInput{
+		ContentLength: &u.readerPos,
+		ContentType:   &u.cfg.mimeType,
+		Body:          buf,
+	}
+
+	_, err := bucketInit.PutObject(u.cfg.key, &req)
+	if err == nil {
+		fs.Debugf(u, "Upload single object finished")
+	}
+	return err
+}
+
+// upload uploads an object to QingStor
+func (u *uploader) upload() error {
+	u.init()
+
+	if u.cfg.partSize < minMultiPartSize {
+		return errors.Errorf("part size must be at least %d bytes", minMultiPartSize)
+	}
+
+	// Do one read to determine if we have more than one part
+	reader, _, err := u.nextReader()
+	if err == io.EOF { // single part
+		fs.Debugf(u, "Tried to upload a single object to QingStor")
+		return u.singlePartUpload(reader)
+	} else if err != nil {
+		return errors.Errorf("read upload data failed: %s", err)
+	}
+
+	fs.Debugf(u, "Tried to upload a multi-part object to QingStor")
+	mu := multiUploader{uploader: u}
+	return mu.multiPartUpload(reader)
+}
+
+// multiUploader is an internal structure to manage a specific multipart upload to QingStor.
+type multiUploader struct {
+	*uploader
+	wg          sync.WaitGroup
+	mtx         sync.Mutex
+	err         error
+	uploadID    *string
+	objectParts completedParts
+	hashMd5     hash.Hash
+}
+
+// chunk keeps track of a single chunk of data being sent to QingStor.
+type chunk struct {
+	buffer     io.ReadSeeker
+	partNumber int
+	size       int64
+}
+
+// completedParts is a wrapper to make parts sortable by their part number,
+// since QingStor requires this list to be sent in sorted order.
+type completedParts []*qs.ObjectPartType
+
+func (a completedParts) Len() int           { return len(a) }
+func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
+
+// String converts multiUploader to a string
+func (mu *multiUploader) String() string {
+	if uploadID := mu.uploadID; uploadID != nil {
+		return fmt.Sprintf("QingStor bucket %s key %s uploadID %s", mu.cfg.bucket, mu.cfg.key, *uploadID)
+	}
+	return fmt.Sprintf("QingStor bucket %s key %s", mu.cfg.bucket, mu.cfg.key)
+}
+
+// getErr is a thread-safe getter for the error object
+func (mu *multiUploader) getErr() error {
+	mu.mtx.Lock()
+	defer mu.mtx.Unlock()
+	return mu.err
+}
+
+// setErr is a thread-safe setter for the error object
+func (mu *multiUploader) setErr(e error) {
+	mu.mtx.Lock()
+	defer mu.mtx.Unlock()
+	mu.err = e
+}
+
+// readChunk runs in worker goroutines to pull chunks off of the ch channel
+// and send() them as UploadPart requests.
+func (mu *multiUploader) readChunk(ch chan chunk) {
+	defer mu.wg.Done()
+	for {
+		c, ok := <-ch
+		if !ok {
+			break
+		}
+		if mu.getErr() == nil {
+			if err := mu.send(c); err != nil {
+				mu.setErr(err)
+			}
+		}
+	}
+}
+
+// initiate starts a multipart upload and obtains the UploadID
+func (mu *multiUploader) initiate() error {
+	bucketInit, _ := mu.bucketInit()
+	req := qs.InitiateMultipartUploadInput{
+		ContentType: &mu.cfg.mimeType,
+	}
+	fs.Debugf(mu, "Tried to initiate a multi-part upload")
+	rsp, err := bucketInit.InitiateMultipartUpload(mu.cfg.key, &req)
+	if err == nil {
+		mu.uploadID = rsp.UploadID
+		mu.hashMd5 = md5.New()
+	}
+	return err
+}
+
+// send uploads a part to QingStor
+func (mu *multiUploader) send(c chunk) error {
+	bucketInit, _ := mu.bucketInit()
+	req := qs.UploadMultipartInput{
+		PartNumber:    &c.partNumber,
+		UploadID:      mu.uploadID,
+		ContentLength: &c.size,
+		Body:          c.buffer,
+	}
+	fs.Debugf(mu, "Tried to upload a part to QingStor, partNumber %d, partSize %d", c.partNumber, c.size)
+	_, err := bucketInit.UploadMultipart(mu.cfg.key, &req)
+	if err != nil {
+		return err
+	}
+	fs.Debugf(mu, "Upload part finished, partNumber %d, partSize %d", c.partNumber, c.size)
+
+	mu.mtx.Lock()
+	defer mu.mtx.Unlock()
+
+	_, _ = c.buffer.Seek(0, 0)
+	_, _ = io.Copy(mu.hashMd5, c.buffer)
+
+	parts := qs.ObjectPartType{PartNumber: &c.partNumber, Size: &c.size}
+	mu.objectParts = append(mu.objectParts, &parts)
+	return err
+}
+
+// list lists the ObjectParts of a multipart upload
+func (mu *multiUploader) list() error {
+	bucketInit, _ := mu.bucketInit()
+
+	req := qs.ListMultipartInput{
+		UploadID: mu.uploadID,
+	}
+	fs.Debugf(mu, "Tried to list parts of a multi-part upload")
+	rsp, err := bucketInit.ListMultipart(mu.cfg.key, &req)
+	if err == nil {
+		mu.objectParts = rsp.ObjectParts
+	}
+	return err
+}
+
+// complete completes a multipart upload
+func (mu *multiUploader) complete() error {
+	var err error
+	if err = mu.getErr(); err != nil {
+		return err
+	}
+	bucketInit, _ := mu.bucketInit()
+	//if err = mu.list(); err != nil {
+	//	return err
err + //} + //md5String := fmt.Sprintf("\"%s\"", hex.EncodeToString(mu.hashMd5.Sum(nil))) + + md5String := fmt.Sprintf("\"%x\"", mu.hashMd5.Sum(nil)) + sort.Sort(mu.objectParts) + req := qs.CompleteMultipartUploadInput{ + UploadID: mu.uploadID, + ObjectParts: mu.objectParts, + ETag: &md5String, + } + fs.Debugf(mu, "Tried to complete a multi-part") + _, err = bucketInit.CompleteMultipartUpload(mu.cfg.key, &req) + if err == nil { + fs.Debugf(mu, "Complete multi-part finished") + } + return err +} + +// abort abort an multipart upload +func (mu *multiUploader) abort() error { + var err error + bucketInit, _ := mu.bucketInit() + + if uploadID := mu.uploadID; uploadID != nil { + req := qs.AbortMultipartUploadInput{ + UploadID: uploadID, + } + fs.Debugf(mu, "Tried to abort a multi-part") + _, err = bucketInit.AbortMultipartUpload(mu.cfg.key, &req) + } + + return err +} + +// multiPartUpload upload a multiple object into QingStor +func (mu *multiUploader) multiPartUpload(firstBuf io.ReadSeeker) error { + var err error + //Initiate an multi-part upload + if err = mu.initiate(); err != nil { + return err + } + + ch := make(chan chunk, mu.cfg.concurrency) + for i := 0; i < mu.cfg.concurrency; i++ { + mu.wg.Add(1) + go mu.readChunk(ch) + } + + var partNumber int + ch <- chunk{partNumber: partNumber, buffer: firstBuf, size: mu.readerSize} + + for mu.getErr() == nil { + partNumber++ + // This upload exceeded maximum number of supported parts, error now. + if partNumber > mu.cfg.maxUploadParts || partNumber > maxMultiParts { + var msg string + if partNumber > mu.cfg.maxUploadParts { + msg = fmt.Sprintf("exceeded total allowed configured maxUploadParts (%d). "+ + "Adjust PartSize to fit in this limit", mu.cfg.maxUploadParts) + } else { + msg = fmt.Sprintf("exceeded total allowed QingStor limit maxUploadParts (%d). "+ + "Adjust PartSize to fit in this limit", maxMultiParts) + } + mu.setErr(errors.New(msg)) + break + } + + var reader io.ReadSeeker + var nextChunkLen int + reader, nextChunkLen, err = mu.nextReader() + if err != nil && err != io.EOF { + return err + } + if nextChunkLen == 0 && partNumber > 0 { + // No need to upload empty part, if file was empty to start + // with empty single part would of been created and never + // started multipart upload. + break + } + num := partNumber + ch <- chunk{partNumber: num, buffer: reader, size: mu.readerSize} + } + // Wait for all goroutines finish + close(ch) + mu.wg.Wait() + // Complete Multipart Upload + err = mu.complete() + if mu.getErr() != nil || err != nil { + _ = mu.abort() + } + return err +}