From 598364ad0f9bac36e91c02455cdf2a36e51e3c6d Mon Sep 17 00:00:00 2001
From: Lesmiscore
Date: Tue, 12 Apr 2022 18:38:44 +0900
Subject: [PATCH] backend/internetarchive: add support for Internet Archive

This adds support for Internet Archive (archive.org) Items.
---
 README.md                                    |    1 +
 backend/all/all.go                           |    1 +
 backend/internetarchive/internetarchive.go   | 1132 +++++++++++++++++
 .../internetarchive/internetarchive_test.go  |   17 +
 bin/make_manual.py                           |    1 +
 docs/content/_index.md                       |    1 +
 docs/content/docs.md                         |    1 +
 docs/content/internetarchive.md              |  222 ++++
 docs/content/overview.md                     |    2 +
 docs/layouts/chrome/navbar.html              |    1 +
 fstest/test_all/config.yaml                  |    3 +
 11 files changed, 1382 insertions(+)
 create mode 100644 backend/internetarchive/internetarchive.go
 create mode 100644 backend/internetarchive/internetarchive_test.go
 create mode 100644 docs/content/internetarchive.md

diff --git a/README.md b/README.md
index 61a028606..307d95342 100644
--- a/README.md
+++ b/README.md
@@ -42,6 +42,7 @@ Rclone *("rsync for cloud storage")* is a command-line program to sync files and
   * HDFS (Hadoop Distributed Filesystem) [:page_facing_up:](https://rclone.org/hdfs/)
   * HTTP [:page_facing_up:](https://rclone.org/http/)
   * Hubic [:page_facing_up:](https://rclone.org/hubic/)
+  * Internet Archive [:page_facing_up:](https://rclone.org/internetarchive/)
   * Jottacloud [:page_facing_up:](https://rclone.org/jottacloud/)
   * IBM COS S3 [:page_facing_up:](https://rclone.org/s3/#ibm-cos-s3)
   * Koofr [:page_facing_up:](https://rclone.org/koofr/)
diff --git a/backend/all/all.go b/backend/all/all.go
index b10cbae38..a93909eca 100644
--- a/backend/all/all.go
+++ b/backend/all/all.go
@@ -22,6 +22,7 @@ import (
 	_ "github.com/rclone/rclone/backend/hdfs"
 	_ "github.com/rclone/rclone/backend/http"
 	_ "github.com/rclone/rclone/backend/hubic"
+	_ "github.com/rclone/rclone/backend/internetarchive"
 	_ "github.com/rclone/rclone/backend/jottacloud"
 	_ "github.com/rclone/rclone/backend/koofr"
 	_ "github.com/rclone/rclone/backend/local"
diff --git a/backend/internetarchive/internetarchive.go b/backend/internetarchive/internetarchive.go
new file mode 100644
index 000000000..ce994c23c
--- /dev/null
+++ b/backend/internetarchive/internetarchive.go
@@ -0,0 +1,1132 @@
+// Package internetarchive provides an interface to Internet Archive's Items
+// via their native API rather than using S3-compatible endpoints.
+package internetarchive + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "regexp" + "strconv" + "strings" + "time" + + "github.com/ncw/swift/v2" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/config/configstruct" + "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/fs/fshttp" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/bucket" + "github.com/rclone/rclone/lib/encoder" + "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/rest" +) + +// Register with Fs +func init() { + fs.Register(&fs.RegInfo{ + Name: "internetarchive", + Description: "Internet Archive", + NewFs: NewFs, + Options: []fs.Option{{ + Name: "access_key_id", + Help: "IAS3 Access Key.\n\nLeave blank for anonymous access.\nYou can find one here: https://archive.org/account/s3.php", + }, { + Name: "secret_access_key", + Help: "IAS3 Secret Key (password).\n\nLeave blank for anonymous access.", + }, { + // their official client (https://github.com/jjjake/internetarchive) hardcodes following the two + Name: "endpoint", + Help: "IAS3 Endpoint.\n\nLeave blank for default value.", + Default: "https://s3.us.archive.org", + Advanced: true, + }, { + Name: "front_endpoint", + Help: "Host of InternetArchive Frontend.\n\nLeave blank for default value.", + Default: "https://archive.org", + Advanced: true, + }, { + Name: "disable_checksum", + Help: `Don't ask the server to test against MD5 checksum calculated by rclone. +Normally rclone will calculate the MD5 checksum of the input before +uploading it so it can ask the server to check the object against checksum. +This is great for data integrity checking but can cause long delays for +large files to start uploading.`, + Default: true, + Advanced: true, + }, { + Name: "wait_archive", + Help: `Timeout for waiting the server's processing tasks (specifically archive and book_op) to finish. +Only enable if you need to be guaranteed to be reflected after write operations. +0 to disable waiting. No errors to be thrown in case of timeout.`, + Default: fs.Duration(0), + Advanced: true, + }, { + Name: config.ConfigEncoding, + Help: config.ConfigEncodingHelp, + Advanced: true, + Default: encoder.EncodeZero | + encoder.EncodeSlash | + encoder.EncodeLtGt | + encoder.EncodeCrLf | + encoder.EncodeDel | + encoder.EncodeCtl | + encoder.EncodeInvalidUtf8 | + encoder.EncodeDot, + }, + }}) +} + +// maximum size of an item. 
this is constant across all items +const iaItemMaxSize int64 = 1099511627776 + +// Options defines the configuration for this backend +type Options struct { + AccessKeyID string `config:"access_key_id"` + SecretAccessKey string `config:"secret_access_key"` + Endpoint string `config:"endpoint"` + FrontEndpoint string `config:"front_endpoint"` + DisableChecksum bool `config:"disable_checksum"` + WaitArchive fs.Duration `config:"wait_archive"` + Enc encoder.MultiEncoder `config:"encoding"` +} + +// Fs represents an IAS3 remote +type Fs struct { + name string // name of this remote + root string // the path we are working on if any + opt Options // parsed config options + features *fs.Features // optional features + srv *rest.Client // the connection to IAS3 + front *rest.Client // the connection to frontend + pacer *fs.Pacer // pacer for API calls + ctx context.Context +} + +// Object describes a file at IA +type Object struct { + fs *Fs // reference to Fs + remote string // the remote path + modTime time.Time // last modified time + size int64 // size of the file in bytes + md5 string // md5 hash of the file presented by the server + sha1 string // sha1 hash of the file presented by the server + crc32 string // crc32 of the file presented by the server +} + +// IAFile reprensents a subset of object in MetadataResponse.Files +type IAFile struct { + Name string `json:"name"` + // Source string `json:"source"` + Mtime string `json:"mtime"` + RcloneMtime json.RawMessage `json:"rclone-mtime"` + Size string `json:"size"` + Md5 string `json:"md5"` + Crc32 string `json:"crc32"` + Sha1 string `json:"sha1"` +} + +// MetadataResponse reprensents subset of the JSON object returned by (frontend)/metadata/ +type MetadataResponse struct { + Files []IAFile `json:"files"` + ItemSize int64 `json:"item_size"` +} + +// ModMetadataResponse represents response for amending metadata +type ModMetadataResponse struct { + // https://archive.org/services/docs/api/md-write.html#example + Success bool `json:"success"` + Error string `json:"error"` +} + +// Name of the remote (as passed into NewFs) +func (f *Fs) Name() string { + return f.name +} + +// Root of the remote (as passed into NewFs) +func (f *Fs) Root() string { + return f.root +} + +// String converts this Fs to a string +func (f *Fs) String() string { + bucket, file := f.split("") + if bucket == "" { + return "Internet Archive root" + } + if file == "" { + return fmt.Sprintf("Internet Archive item %s", bucket) + } + return fmt.Sprintf("Internet Archive item %s path %s", bucket, file) +} + +// Features returns the optional features of this Fs +func (f *Fs) Features() *fs.Features { + return f.features +} + +// Hashes returns type of hashes supported by IA +func (f *Fs) Hashes() hash.Set { + return hash.NewHashSet(hash.MD5, hash.SHA1, hash.CRC32) +} + +// Precision returns the precision of mtime that the server responds +func (f *Fs) Precision() time.Duration { + if f.opt.WaitArchive == 0 { + return fs.ModTimeNotSupported + } + return time.Nanosecond +} + +// retryErrorCodes is a slice of error codes that we will retry +// See: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html +var retryErrorCodes = []int{ + 429, // Too Many Requests + 500, // Internal Server Error - "We encountered an internal error. Please try again." 
+ 503, // Service Unavailable/Slow Down - "Reduce your request rate" +} + +// NewFs constructs an Fs from the path +func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) { + // Parse config into Options struct + opt := new(Options) + err := configstruct.Set(m, opt) + if err != nil { + return nil, err + } + + // Parse the endpoints + ep, err := url.Parse(opt.Endpoint) + if err != nil { + return nil, err + } + fe, err := url.Parse(opt.FrontEndpoint) + if err != nil { + return nil, err + } + + root = strings.Trim(root, "/") + + f := &Fs{ + name: name, + opt: *opt, + ctx: ctx, + } + f.setRoot(root) + f.features = (&fs.Features{ + BucketBased: true, + }).Fill(ctx, f) + + f.srv = rest.NewClient(fshttp.NewClient(ctx)) + f.srv.SetRoot(ep.String()) + + f.front = rest.NewClient(fshttp.NewClient(ctx)) + f.front.SetRoot(fe.String()) + + if opt.AccessKeyID != "" && opt.SecretAccessKey != "" { + auth := fmt.Sprintf("LOW %s:%s", opt.AccessKeyID, opt.SecretAccessKey) + f.srv.SetHeader("Authorization", auth) + f.front.SetHeader("Authorization", auth) + } + + f.pacer = fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(10*time.Millisecond))) + + // test if the root exists as a file + _, err = f.NewObject(ctx, "/") + if err == nil { + f.setRoot(betterPathDir(root)) + return f, fs.ErrorIsFile + } + return f, nil +} + +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = strings.Trim(root, "/") +} + +// Remote returns the remote path +func (o *Object) Remote() string { + return o.remote +} + +// ModTime is the last modified time (read-only) +func (o *Object) ModTime(ctx context.Context) time.Time { + return o.modTime +} + +// Size is the file length +func (o *Object) Size() int64 { + return o.size +} + +// Fs returns the parent Fs +func (o *Object) Fs() fs.Info { + return o.fs +} + +// Hash returns the hash value presented by IA +func (o *Object) Hash(ctx context.Context, ty hash.Type) (string, error) { + if ty == hash.MD5 { + return o.md5, nil + } + if ty == hash.SHA1 { + return o.sha1, nil + } + if ty == hash.CRC32 { + return o.crc32, nil + } + return "", hash.ErrUnsupported +} + +// Storable returns if this object is storable +func (o *Object) Storable() bool { + return true +} + +// SetModTime is not supported +func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) { + bucket, reqDir := o.split() + if bucket == "" { + return fs.ErrorCantSetModTime + } + if reqDir == "" { + return fs.ErrorCantSetModTime + } + + // https://archive.org/services/docs/api/md-write.html + var patch = []interface{}{ + // we should drop it first to clear all rclone-provided mtimes + struct { + Op string `json:"op"` + Path string `json:"path"` + }{"remove", "/rclone-mtime"}, + struct { + Op string `json:"op"` + Path string `json:"path"` + Value string `json:"value"` + }{"add", "/rclone-mtime", t.Format(time.RFC3339Nano)}, + } + res, err := json.Marshal(patch) + if err != nil { + return err + } + params := url.Values{} + params.Add("-target", fmt.Sprintf("files/%s", reqDir)) + params.Add("-patch", string(res)) + body := []byte(params.Encode()) + bodyLen := int64(len(body)) + + var resp *http.Response + var result ModMetadataResponse + // make a POST request to (frontend)/metadata/:item/ + opts := rest.Opts{ + Method: "POST", + Path: path.Join("/metadata/", bucket), + Body: bytes.NewReader(body), + ContentLength: &bodyLen, + ContentType: "application/x-www-form-urlencoded", + } + + err = o.fs.pacer.Call(func() (bool, error) { + resp, err = 
o.fs.front.CallJSON(ctx, &opts, nil, &result) + return o.fs.shouldRetry(resp, err) + }) + if err != nil { + return err + } + + if result.Success { + o.modTime = t + return nil + } + + return errors.New(result.Error) +} + +// List files and directories in a directory +func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { + bucket, reqDir := f.split(dir) + if bucket == "" { + if reqDir != "" { + return nil, fs.ErrorListBucketRequired + } + return entries, nil + } + grandparent := f.opt.Enc.ToStandardPath(strings.Trim(path.Join(bucket, reqDir), "/") + "/") + + allEntries, err := f.listAllUnconstrained(ctx, bucket) + if err != nil { + return entries, err + } + for _, ent := range allEntries { + obj, ok := ent.(*Object) + if ok && strings.HasPrefix(obj.remote, grandparent) { + path := trimPathPrefix(obj.remote, grandparent, f.opt.Enc) + if !strings.Contains(path, "/") { + obj.remote = trimPathPrefix(obj.remote, f.root, f.opt.Enc) + entries = append(entries, obj) + } + } + dire, ok := ent.(*fs.Dir) + if ok && strings.HasPrefix(dire.Remote(), grandparent) { + path := trimPathPrefix(dire.Remote(), grandparent, f.opt.Enc) + if !strings.Contains(path, "/") { + dire.SetRemote(trimPathPrefix(dire.Remote(), f.root, f.opt.Enc)) + entries = append(entries, dire) + } + } + } + + return entries, nil +} + +// Mkdir can't be performed on IA like git repositories +func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { + return nil +} + +// Rmdir as well, unless we're asked for recursive deletion +func (f *Fs) Rmdir(ctx context.Context, dir string) error { + return nil +} + +// NewObject finds the Object at remote. If it can't be found +// it returns the error fs.ErrorObjectNotFound. +func (f *Fs) NewObject(ctx context.Context, remote string) (ret fs.Object, err error) { + bucket, filepath := f.split(remote) + filepath = strings.Trim(filepath, "/") + if bucket == "" { + if filepath != "" { + return nil, fs.ErrorListBucketRequired + } + return nil, fs.ErrorIsDir + } + + grandparent := f.opt.Enc.ToStandardPath(strings.Trim(path.Join(bucket, filepath), "/")) + + allEntries, err := f.listAllUnconstrained(ctx, bucket) + if err != nil { + return nil, err + } + for _, ent := range allEntries { + obj, ok := ent.(*Object) + if ok && obj.remote == grandparent { + obj.remote = trimPathPrefix(obj.remote, f.root, f.opt.Enc) + return obj, nil + } + } + + return nil, fs.ErrorObjectNotFound +} + +// Put uploads a file +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o := &Object{ + fs: f, + remote: src.Remote(), + modTime: src.ModTime(ctx), + size: src.Size(), + } + + err := o.Update(ctx, in, src, options...) + if err == nil { + return o, nil + } + + return nil, err +} + +// PublicLink generates a public link to the remote path (usually readable by anyone) +func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (link string, err error) { + if strings.HasSuffix(remote, "/") { + return "", fs.ErrorCantShareDirectories + } + if _, err := f.NewObject(ctx, remote); err != nil { + return "", err + } + bucket, bucketPath := f.split(remote) + return path.Join(f.opt.FrontEndpoint, "/download/", bucket, bucketPath), nil +} + +// Copy src to this remote using server-side copy operations. 
+// +// This is stored with the remote path given +// +// It returns the destination Object and a possible error +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantCopy +func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Object, err error) { + dstBucket, dstPath := f.split(remote) + srcObj, ok := src.(*Object) + if !ok { + fs.Debugf(src, "Can't copy - not same remote type") + return nil, fs.ErrorCantCopy + } + srcBucket, srcPath := srcObj.split() + + if dstBucket == srcBucket && dstPath == srcPath { + // https://github.com/jjjake/internetarchive/blob/2456376533251df9d05e0a14d796ec1ced4959f5/internetarchive/cli/ia_copy.py#L68 + fs.Debugf(src, "Can't copy - the source and destination files cannot be the same!") + return nil, fs.ErrorCantCopy + } + + headers := map[string]string{ + "x-archive-auto-make-bucket": "1", + "x-archive-queue-derive": "0", + "x-archive-keep-old-version": "0", + "x-amz-copy-source": quotePath(path.Join("/", srcBucket, srcPath)), + "x-amz-metadata-directive": "COPY", + "x-archive-filemeta-sha1": srcObj.sha1, + "x-archive-filemeta-md5": srcObj.md5, + "x-archive-filemeta-crc32": srcObj.crc32, + "x-archive-filemeta-size": fmt.Sprint(srcObj.size), + // add this too for sure + "x-archive-filemeta-rclone-mtime": srcObj.modTime.Format(time.RFC3339Nano), + } + + // make a PUT request at (IAS3)/:item/:path without body + var resp *http.Response + opts := rest.Opts{ + Method: "PUT", + Path: "/" + url.PathEscape(path.Join(dstBucket, dstPath)), + ExtraHeaders: headers, + } + + err = f.pacer.Call(func() (bool, error) { + resp, err = f.srv.Call(ctx, &opts) + return f.shouldRetry(resp, err) + }) + if err != nil { + return nil, err + } + + // we can't update/find metadata here as IA will also + // queue server-side copy as well as upload/delete. + return f.waitFileUpload(ctx, trimPathPrefix(path.Join(dstBucket, dstPath), f.root, f.opt.Enc), f.getHashes(ctx, src), srcObj.size) +} + +// ListR lists the objects and directories of the Fs starting +// from dir recursively into out. +// +// dir should be "" to start from the root, and should not +// have trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't +// found. +// +// It should call callback for each tranche of entries read. +// These need not be returned in any particular order. If +// callback returns an error then the listing will stop +// immediately. +// +// Don't implement this unless you have a more efficient way +// of listing recursively than doing a directory traversal. 
+func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { + var allEntries, entries fs.DirEntries + bucket, reqDir := f.split(dir) + if bucket == "" { + if reqDir != "" { + return fs.ErrorListBucketRequired + } + return callback(entries) + } + grandparent := f.opt.Enc.ToStandardPath(strings.Trim(path.Join(bucket, reqDir), "/") + "/") + + allEntries, err = f.listAllUnconstrained(ctx, bucket) + if err != nil { + return err + } + for _, ent := range allEntries { + obj, ok := ent.(*Object) + if ok && strings.HasPrefix(obj.remote, grandparent) { + obj.remote = trimPathPrefix(obj.remote, f.root, f.opt.Enc) + entries = append(entries, obj) + } + dire, ok := ent.(*fs.Dir) + if ok && strings.HasPrefix(dire.Remote(), grandparent) { + dire.SetRemote(trimPathPrefix(dire.Remote(), f.root, f.opt.Enc)) + entries = append(entries, dire) + } + } + + return callback(entries) +} + +// CleanUp removes all files inside history/ +func (f *Fs) CleanUp(ctx context.Context) (err error) { + bucket, _ := f.split("/") + if bucket == "" { + return fs.ErrorListBucketRequired + } + entries, err := f.listAllUnconstrained(ctx, bucket) + if err != nil { + return err + } + + for _, ent := range entries { + obj, ok := ent.(*Object) + if ok && strings.HasPrefix(obj.remote, bucket+"/history/") { + err = obj.Remove(ctx) + if err != nil { + return err + } + } + // we can fully ignore directories, as they're just virtual entries to + // comply with rclone's requirement + } + + return nil +} + +// About returns things about remaining and used spaces +func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) { + bucket, _ := f.split("/") + if bucket == "" { + return nil, fs.ErrorListBucketRequired + } + + result, err := f.requestMetadata(ctx, bucket) + if err != nil { + return nil, err + } + + // perform low-level operation here since it's ridiculous to make 2 same requests + var historySize int64 + for _, ent := range result.Files { + if strings.HasPrefix(ent.Name, "history/") { + size := parseSize(ent.Size) + if size < 0 { + // parse error can be ignored since it's not fatal + continue + } + historySize += size + } + } + + usage := &fs.Usage{ + Total: fs.NewUsageValue(iaItemMaxSize), + Free: fs.NewUsageValue(iaItemMaxSize - result.ItemSize), + Used: fs.NewUsageValue(result.ItemSize), + Trashed: fs.NewUsageValue(historySize), // bytes in trash + } + return usage, nil +} + +// Open an object for read +func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { + var optionsFixed []fs.OpenOption + for _, opt := range options { + if optRange, ok := opt.(*fs.RangeOption); ok { + // Ignore range option if file is empty + if o.Size() == 0 && optRange.Start == 0 && optRange.End > 0 { + continue + } + } + optionsFixed = append(optionsFixed, opt) + } + + var resp *http.Response + // make a GET request to (frontend)/download/:item/:path + opts := rest.Opts{ + Method: "GET", + Path: path.Join("/download/", o.fs.root, o.fs.opt.Enc.FromStandardPath(o.remote)), + Options: optionsFixed, + } + err = o.fs.pacer.Call(func() (bool, error) { + resp, err = o.fs.front.Call(ctx, &opts) + return o.fs.shouldRetry(resp, err) + }) + if err != nil { + return nil, err + } + return resp.Body, nil +} + +// Update the Object from in with modTime and size +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { + bucket, bucketPath := o.split() + modTime := src.ModTime(ctx) + size := src.Size() + + // Set the mtime in 
the metadata
+	// the internetarchive backend sets it at the header level, as IAS3 has its own extensions outside X-Amz-
+	headers := map[string]string{
+		// https://github.com/jjjake/internetarchive/blob/2456376533251df9d05e0a14d796ec1ced4959f5/internetarchive/iarequest.py#L158
+		"x-amz-filemeta-rclone-mtime": modTime.Format(time.RFC3339Nano),
+
+		// we add some more headers for intuitive actions
+		"x-amz-auto-make-bucket":      "1",    // create the item if it does not exist, do nothing if it already exists
+		"x-archive-auto-make-bucket":  "1",    // same as above, in IAS3's original way
+		"x-archive-keep-old-version":  "0",    // do not keep old versions (a.k.a. the trash in other clouds)
+		"x-archive-meta-mediatype":    "data", // mark the media type of the uploaded file as "data"
+		"x-archive-queue-derive":      "0",    // skip the derivation process (e.g. encoding to smaller files, OCR on PDFs)
+		"x-archive-cascade-delete":    "1",    // enable "cascade delete" (delete all derived files in addition to the file itself)
+	}
+	if size >= 0 {
+		headers["Content-Length"] = fmt.Sprintf("%d", size)
+		headers["x-archive-size-hint"] = fmt.Sprintf("%d", size)
+	}
+
+	// read the md5sum if available
+	var md5sumHex string
+	if !o.fs.opt.DisableChecksum {
+		md5sumHex, err = src.Hash(ctx, hash.MD5)
+		if err == nil && matchMd5.MatchString(md5sumHex) {
+			// Set the md5sum in the header on the object if
+			// the user wants it
+			// https://github.com/jjjake/internetarchive/blob/245637653/internetarchive/item.py#L969
+			headers["Content-MD5"] = md5sumHex
+		}
+	}
+
+	// make a PUT request at (IAS3)/encoded(:item/:path)
+	var resp *http.Response
+	opts := rest.Opts{
+		Method:        "PUT",
+		Path:          "/" + url.PathEscape(path.Join(bucket, bucketPath)),
+		Body:          in,
+		ContentLength: &size,
+		ExtraHeaders:  headers,
+	}
+
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err = o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetry(resp, err)
+	})
+
+	// we can't update/find metadata here as IA will "ingest" the uploaded file(s)
+	// upon upload. (you can find its progress at https://archive.org/history/ItemNameHere )
+	// or we have to wait for finish?
(needs polling (frontend)/metadata/:item or scraping (frontend)/history/:item) + var newObj *Object + if err == nil { + newObj, err = o.fs.waitFileUpload(ctx, o.remote, o.fs.getHashes(ctx, src), size) + } else { + newObj = &Object{} + } + o.crc32 = newObj.crc32 + o.md5 = newObj.md5 + o.sha1 = newObj.sha1 + o.modTime = newObj.modTime + o.size = newObj.size + return err +} + +// Remove an object +func (o *Object) Remove(ctx context.Context) (err error) { + bucket, bucketPath := o.split() + + // make a DELETE request at (IAS3)/:item/:path + var resp *http.Response + opts := rest.Opts{ + Method: "DELETE", + Path: "/" + url.PathEscape(path.Join(bucket, bucketPath)), + } + + err = o.fs.pacer.Call(func() (bool, error) { + resp, err = o.fs.srv.Call(ctx, &opts) + return o.fs.shouldRetry(resp, err) + }) + + // deleting files can take bit longer as + // it'll be processed on same queue as uploads + if err == nil { + err = o.fs.waitDelete(ctx, bucket, bucketPath) + } + return err +} + +// String converts this Fs to a string +func (o *Object) String() string { + if o == nil { + return "" + } + return o.remote +} + +func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) { + if resp != nil { + for _, e := range retryErrorCodes { + if resp.StatusCode == e { + return true, err + } + } + } + // Ok, not an awserr, check for generic failure conditions + return fserrors.ShouldRetry(err), err +} + +var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`) + +// split returns bucket and bucketPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) { + bucketName, bucketPath = bucket.Split(path.Join(f.root, rootRelativePath)) + return f.opt.Enc.FromStandardName(bucketName), f.opt.Enc.FromStandardPath(bucketPath) +} + +// split returns bucket and bucketPath from the object +func (o *Object) split() (bucket, bucketPath string) { + return o.fs.split(o.remote) +} + +func (f *Fs) getHashes(ctx context.Context, src fs.ObjectInfo) map[hash.Type]string { + hashMap := map[hash.Type]string{} + for _, ty := range f.Hashes().Array() { + sum, err := src.Hash(ctx, ty) + if err != nil || sum == "" { + continue + } + hashMap[ty] = sum + } + return hashMap +} + +func (f *Fs) requestMetadata(ctx context.Context, bucket string) (result MetadataResponse, err error) { + var resp *http.Response + // make a GET request to (frontend)/metadata/:item/ + opts := rest.Opts{ + Method: "GET", + Path: path.Join("/metadata/", bucket), + } + + err = f.pacer.Call(func() (bool, error) { + resp, err = f.front.CallJSON(ctx, &opts, nil, &result) + return f.shouldRetry(resp, err) + }) + + return result, err +} + +// list up all files/directories without any filters +func (f *Fs) listAllUnconstrained(ctx context.Context, bucket string) (entries fs.DirEntries, err error) { + result, err := f.requestMetadata(ctx, bucket) + if err != nil { + return nil, err + } + + knownDirs := map[string]time.Time{ + "": time.Unix(0, 0), + } + for _, file := range result.Files { + dir := strings.Trim(betterPathDir(file.Name), "/") + nameWithBucket := path.Join(bucket, file.Name) + + mtimeTime := file.parseMtime() + + // populate children directories + child := dir + for { + if _, ok := knownDirs[child]; ok { + break + } + // directory + d := fs.NewDir(f.opt.Enc.ToStandardPath(path.Join(bucket, child)), mtimeTime) + entries = append(entries, d) + + knownDirs[child] = mtimeTime + child = strings.Trim(betterPathDir(child), "/") + } + if _, ok := knownDirs[betterPathDir(file.Name)]; !ok { + 
continue + } + + size := parseSize(file.Size) + + o := makeValidObject(f, f.opt.Enc.ToStandardPath(nameWithBucket), file, mtimeTime, size) + entries = append(entries, o) + } + + return entries, nil +} + +func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[hash.Type]string, newSize int64) (ret *Object, err error) { + bucket, bucketPath := f.split(reqPath) + + ret = &Object{ + fs: f, + remote: trimPathPrefix(path.Join(bucket, bucketPath), f.root, f.opt.Enc), + modTime: time.Unix(0, 0), + size: -1, + } + + if f.opt.WaitArchive == 0 { + // user doesn't want to poll, let's not + ret2, err := f.NewObject(ctx, reqPath) + if err == nil { + ret2, ok := ret2.(*Object) + if ok { + ret = ret2 + } + } + return ret, nil + } + + retC := make(chan struct { + *Object + error + }, 1) + go func() { + isFirstTime := true + existed := false + oldMtime := "" + oldCrc32 := "" + unreliablePassCount := 0 + for { + if !isFirstTime { + // depending on the queue, it takes time + time.Sleep(10 * time.Second) + } + metadata, err := f.requestMetadata(ctx, bucket) + if err != nil { + retC <- struct { + *Object + error + }{ret, err} + return + } + + var iaFile *IAFile + for _, f := range metadata.Files { + if f.Name == bucketPath { + iaFile = &f + break + } + } + if isFirstTime { + isFirstTime = false + existed = iaFile != nil + if iaFile != nil { + oldMtime = iaFile.Mtime + oldCrc32 = iaFile.Crc32 + } + } + if iaFile == nil { + continue + } + if !existed && !isFirstTime { + // fast path: file wasn't exited before + retC <- struct { + *Object + error + }{makeValidObject2(f, *iaFile, bucket), nil} + return + } + + hashMatched := true + for tt, sum := range newHashes { + if tt == hash.MD5 && !hash.Equals(iaFile.Md5, sum) { + hashMatched = false + break + } + if tt == hash.SHA1 && !hash.Equals(iaFile.Sha1, sum) { + hashMatched = false + break + } + if tt == hash.CRC32 && !hash.Equals(iaFile.Crc32, sum) { + hashMatched = false + break + } + } + if !hashMatched { + continue + } + if !compareSize(parseSize(iaFile.Size), newSize) { + continue + } + if hash.Equals(oldCrc32, iaFile.Crc32) && unreliablePassCount < 60 { + // the following two are based on a sort of "bad" assumption; + // what if the file is updated immediately, before polling? + // by limiting hits of these tests, avoid infinite loop + unreliablePassCount++ + continue + } + if hash.Equals(iaFile.Mtime, oldMtime) && unreliablePassCount < 60 { + unreliablePassCount++ + continue + } + + // voila! 
+ retC <- struct { + *Object + error + }{makeValidObject2(f, *iaFile, bucket), nil} + return + } + }() + + select { + case res := <-retC: + return res.Object, res.error + case <-time.After(time.Duration(f.opt.WaitArchive)): + return ret, nil + } +} + +func (f *Fs) waitDelete(ctx context.Context, bucket, bucketPath string) (err error) { + if f.opt.WaitArchive == 0 { + // user doesn't want to poll, let's not + return nil + } + + retC := make(chan error, 1) + go func() { + for { + metadata, err := f.requestMetadata(ctx, bucket) + if err != nil { + retC <- err + return + } + + found := false + for _, f := range metadata.Files { + if f.Name == bucketPath { + found = true + break + } + } + + if !found { + retC <- nil + return + } + + // depending on the queue, it takes time + time.Sleep(10 * time.Second) + } + }() + + select { + case res := <-retC: + return res + case <-time.After(time.Duration(f.opt.WaitArchive)): + return nil + } +} + +func makeValidObject(f *Fs, remote string, file IAFile, mtime time.Time, size int64) *Object { + return &Object{ + fs: f, + remote: remote, + modTime: mtime, + size: size, + md5: file.Md5, + crc32: file.Crc32, + sha1: file.Sha1, + } +} + +func makeValidObject2(f *Fs, file IAFile, bucket string) *Object { + mtimeTime := file.parseMtime() + + size := parseSize(file.Size) + + return makeValidObject(f, trimPathPrefix(path.Join(bucket, file.Name), f.root, f.opt.Enc), file, mtimeTime, size) +} + +func (file IAFile) parseMtime() (mtime time.Time) { + // method 1: use metadata added by rclone + var rmArray []string + // rclone-metadata can be an array or string + // try to deserialize it as array first + err := json.Unmarshal(file.RcloneMtime, &rmArray) + if err != nil { + // if not, it's a string + dst := new(string) + err = json.Unmarshal(file.RcloneMtime, dst) + if err == nil { + rmArray = []string{*dst} + } + } + // let's take the first value we can deserialize + for _, value := range rmArray { + mtime, err = time.Parse(time.RFC3339Nano, value) + if err == nil { + break + } + } + if err != nil { + // method 2: use metadata added by IAS3 + mtime, err = swift.FloatStringToTime(file.Mtime) + } + if err != nil { + // metadata files don't have some of the fields + mtime = time.Unix(0, 0) + } + return mtime +} + +func compareSize(a, b int64) bool { + if a < 0 || b < 0 { + // we won't compare if any of them is not known + return true + } + return a == b +} + +func parseSize(str string) int64 { + size, err := strconv.ParseInt(str, 10, 64) + if err != nil { + size = -1 + } + return size +} + +func betterPathDir(p string) string { + d := path.Dir(p) + if d == "." { + return "" + } + return d +} + +func betterPathClean(p string) string { + d := path.Clean(p) + if d == "." { + return "" + } + return d +} + +func trimPathPrefix(s, prefix string, enc encoder.MultiEncoder) string { + // we need to clean the paths to make tests pass! 
+ s = betterPathClean(s) + prefix = betterPathClean(prefix) + if s == prefix || s == prefix+"/" { + return "" + } + prefix = enc.ToStandardPath(strings.TrimRight(prefix, "/")) + return enc.ToStandardPath(strings.TrimPrefix(s, prefix+"/")) +} + +// mimicks urllib.parse.quote() on Python; exclude / from url.PathEscape +func quotePath(s string) string { + seg := strings.Split(s, "/") + newValues := []string{} + for _, v := range seg { + newValues = append(newValues, url.PathEscape(v)) + } + return strings.Join(newValues, "/") +} + +var ( + _ fs.Fs = &Fs{} + _ fs.Copier = &Fs{} + _ fs.ListRer = &Fs{} + _ fs.CleanUpper = &Fs{} + _ fs.PublicLinker = &Fs{} + _ fs.Abouter = &Fs{} + _ fs.Object = &Object{} +) diff --git a/backend/internetarchive/internetarchive_test.go b/backend/internetarchive/internetarchive_test.go new file mode 100644 index 000000000..2faa6f484 --- /dev/null +++ b/backend/internetarchive/internetarchive_test.go @@ -0,0 +1,17 @@ +// Test internetarchive filesystem interface +package internetarchive_test + +import ( + "testing" + + "github.com/rclone/rclone/backend/internetarchive" + "github.com/rclone/rclone/fstest/fstests" +) + +// TestIntegration runs integration tests against the remote +func TestIntegration(t *testing.T) { + fstests.Run(t, &fstests.Opt{ + RemoteName: "TestIA:lesmi-rclone-test/", + NilObject: (*internetarchive.Object)(nil), + }) +} diff --git a/bin/make_manual.py b/bin/make_manual.py index 474d49b9d..0afa5f2b5 100755 --- a/bin/make_manual.py +++ b/bin/make_manual.py @@ -48,6 +48,7 @@ docs = [ "hdfs.md", "http.md", "hubic.md", + "internetarchive.md", "jottacloud.md", "koofr.md", "mailru.md", diff --git a/docs/content/_index.md b/docs/content/_index.md index 083ad49c2..043cc4c00 100644 --- a/docs/content/_index.md +++ b/docs/content/_index.md @@ -127,6 +127,7 @@ WebDAV or S3, that work out of the box.) {{< provider name="HDFS" home="https://hadoop.apache.org/" config="/hdfs/" >}} {{< provider name="HTTP" home="https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol" config="/http/" >}} {{< provider name="Hubic" home="https://hubic.com/" config="/hubic/" >}} +{{< provider name="Internet Archive" home="https://archive.org/" config="/internetarchive/" >}} {{< provider name="Jottacloud" home="https://www.jottacloud.com/en/" config="/jottacloud/" >}} {{< provider name="IBM COS S3" home="http://www.ibm.com/cloud/object-storage" config="/s3/#ibm-cos-s3" >}} {{< provider name="Koofr" home="https://koofr.eu/" config="/koofr/" >}} diff --git a/docs/content/docs.md b/docs/content/docs.md index 5d0bb3544..8a072e67a 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -50,6 +50,7 @@ See the following for detailed instructions for * [HDFS](/hdfs/) * [HTTP](/http/) * [Hubic](/hubic/) + * [Internet Archive](/internetarchive/) * [Jottacloud](/jottacloud/) * [Koofr](/koofr/) * [Mail.ru Cloud](/mailru/) diff --git a/docs/content/internetarchive.md b/docs/content/internetarchive.md new file mode 100644 index 000000000..d0e937fdf --- /dev/null +++ b/docs/content/internetarchive.md @@ -0,0 +1,222 @@ +--- +title: "Internet Archive" +description: "Rclone docs for Internet Archive" +--- + +# {{< icon "fa fa-archive" >}} Internet Archive + +The Internet Archive backend utilizes Items on [archive.org](https://archive.org/) + +Refer to [IAS3 API documentation](https://archive.org/services/docs/api/ias3.html) for the API this backend uses. + +Paths are specified as `remote:bucket` (or `remote:` for the `lsd` +command.) You may put subdirectories in too, e.g. 
`remote:item/path/to/dir`.
+
+Once you have made a remote (see the configuration section below)
+you can use it like this:
+
+Unlike S3, listing all the items you have uploaded is not supported.
+
+Make a new item
+
+    rclone mkdir remote:item
+
+List the contents of an item
+
+    rclone ls remote:item
+
+Sync `/home/local/directory` to the remote item, deleting any excess
+files in the item.
+
+    rclone sync -i /home/local/directory remote:item
+
+## Notes
+Because of Internet Archive's architecture, write operations (and extra post-processing) are enqueued in a per-item queue. You can check an item's queue at https://catalogd.archive.org/history/item-name-here . Because of that, uploads and deletes do not show up immediately and take some time to become available.
+The per-item queue feeds into another queue, the Item Deriver Queue. [You can check the status of the Item Deriver Queue here.](https://catalogd.archive.org/catalog.php?whereami=1) This queue has a limit and may block you from uploading, or even deleting. For better behavior, avoid uploading a lot of small files.
+
+You can optionally wait for the server's processing to finish by setting a non-zero value for the `wait_archive` key.
+When rclone waits, it can perform its normal file comparison.
+Make sure to set a large enough value (e.g. `30m0s` for smaller files) as it can take a long time depending on the server's queue.
+
+## Configuration
+
+Here is an example of making an internetarchive configuration.
+
+First run
+
+    rclone config
+
+This will guide you through an interactive setup process.
+
+```
+No remotes found, make a new one?
+n) New remote
+s) Set configuration password
+q) Quit config
+n/s/q> n
+name> remote
+Option Storage.
+Type of storage to configure.
+Choose a number from below, or type in your own value.
+XX / InternetArchive Items
+   \ (internetarchive)
+Storage> internetarchive
+Option access_key_id.
+IAS3 Access Key.
+Leave blank for anonymous access.
+You can find one here: https://archive.org/account/s3.php
+Enter a value. Press Enter to leave empty.
+access_key_id> XXXX
+Option secret_access_key.
+IAS3 Secret Key (password).
+Leave blank for anonymous access.
+Enter a value. Press Enter to leave empty.
+secret_access_key> XXXX
+Edit advanced config?
+y) Yes
+n) No (default)
+y/n> y
+Option endpoint.
+IAS3 Endpoint.
+Leave blank for default value.
+Enter a string value. Press Enter for the default (https://s3.us.archive.org).
+endpoint>
+Option front_endpoint.
+Host of InternetArchive Frontend.
+Leave blank for default value.
+Enter a string value. Press Enter for the default (https://archive.org).
+front_endpoint>
+Option disable_checksum.
+Don't ask the server to test against MD5 checksum calculated by rclone.
+Normally rclone will calculate the MD5 checksum of the input before
+uploading it so it can ask the server to check the object against checksum.
+This is great for data integrity checking but can cause long delays for
+large files to start uploading.
+Enter a boolean value (true or false). Press Enter for the default (true).
+disable_checksum> true
+Option encoding.
+The encoding for the backend.
+See the [encoding section in the overview](/overview/#encoding) for more info.
+Enter a encoder.MultiEncoder value. Press Enter for the default (Slash,LtGt,CrLf,Del,Ctl,InvalidUtf8,Dot).
+encoding>
+Edit advanced config?
+y) Yes +n) No (default) +y/n> n +-------------------- +[remote] +type = internetarchive +access_key_id = XXXX +secret_access_key = XXXX +-------------------- +y) Yes this is OK (default) +e) Edit this remote +d) Delete this remote +y/e/d> y +``` + +{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/internetarchive/internetarchive.go then run make backenddocs" >}} +### Standard options + +Here are the standard options specific to internetarchive (Internet Archive). + +#### --internetarchive-access-key-id + +IAS3 Access Key. + +Leave blank for anonymous access. +You can find one here: https://archive.org/account/s3.php + +Properties: + +- Config: access_key_id +- Env Var: RCLONE_INTERNETARCHIVE_ACCESS_KEY_ID +- Type: string +- Required: false + +#### --internetarchive-secret-access-key + +IAS3 Secret Key (password). + +Leave blank for anonymous access. + +Properties: + +- Config: secret_access_key +- Env Var: RCLONE_INTERNETARCHIVE_SECRET_ACCESS_KEY +- Type: string +- Required: false + +### Advanced options + +Here are the advanced options specific to internetarchive (Internet Archive). + +#### --internetarchive-endpoint + +IAS3 Endpoint. + +Leave blank for default value. + +Properties: + +- Config: endpoint +- Env Var: RCLONE_INTERNETARCHIVE_ENDPOINT +- Type: string +- Default: "https://s3.us.archive.org" + +#### --internetarchive-front-endpoint + +Host of InternetArchive Frontend. + +Leave blank for default value. + +Properties: + +- Config: front_endpoint +- Env Var: RCLONE_INTERNETARCHIVE_FRONT_ENDPOINT +- Type: string +- Default: "https://archive.org" + +#### --internetarchive-disable-checksum + +Don't ask the server to test against MD5 checksum calculated by rclone. +Normally rclone will calculate the MD5 checksum of the input before +uploading it so it can ask the server to check the object against checksum. +This is great for data integrity checking but can cause long delays for +large files to start uploading. + +Properties: + +- Config: disable_checksum +- Env Var: RCLONE_INTERNETARCHIVE_DISABLE_CHECKSUM +- Type: bool +- Default: true + +#### --internetarchive-wait-archive + +Timeout for waiting the server's processing tasks (specifically archive and book_op) to finish. +Only enable if you need to be guaranteed to be reflected after write operations. +0 to disable waiting. No errors to be thrown in case of timeout. + +Properties: + +- Config: wait_archive +- Env Var: RCLONE_INTERNETARCHIVE_WAIT_ARCHIVE +- Type: Duration +- Default: 0s + +#### --internetarchive-encoding + +The encoding for the backend. + +See the [encoding section in the overview](/overview/#encoding) for more info. + +Properties: + +- Config: encoding +- Env Var: RCLONE_INTERNETARCHIVE_ENCODING +- Type: MultiEncoder +- Default: Slash,LtGt,CrLf,Del,Ctl,InvalidUtf8,Dot + +{{< rem autogenerated options stop >}} diff --git a/docs/content/overview.md b/docs/content/overview.md index c86b791b8..73974b65b 100644 --- a/docs/content/overview.md +++ b/docs/content/overview.md @@ -32,6 +32,7 @@ Here is an overview of the major features of each cloud storage system. | HDFS | - | Yes | No | No | - | | HTTP | - | No | No | No | R | | Hubic | MD5 | Yes | No | No | R/W | +| Internet Archive | MD5, SHA1, CRC32 | Yes | No | No | - | | Jottacloud | MD5 | Yes | Yes | No | R | | Koofr | MD5 | No | Yes | No | - | | Mail.ru Cloud | Mailru ⁶ | Yes | Yes | No | - | @@ -427,6 +428,7 @@ upon backend-specific capabilities. 
| HDFS | Yes | No | Yes | Yes | No | No | Yes | No | Yes | Yes | | HTTP | No | No | No | No | No | No | No | No | No | Yes | | Hubic | Yes † | Yes | No | No | No | Yes | Yes | No | Yes | No | +| Internet Archive | No | Yes | No | No | Yes | Yes | No | Yes | Yes | No | | Jottacloud | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | | Mail.ru Cloud | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | Yes | | Mega | Yes | No | Yes | Yes | Yes | No | No | Yes | Yes | Yes | diff --git a/docs/layouts/chrome/navbar.html b/docs/layouts/chrome/navbar.html index fe532b45a..552b976ca 100644 --- a/docs/layouts/chrome/navbar.html +++ b/docs/layouts/chrome/navbar.html @@ -73,6 +73,7 @@ HDFS (Hadoop Distributed Filesystem) HTTP Hubic + Internet Archive Jottacloud Koofr Mail.ru Cloud diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml index 81afe169e..ca36aa7d8 100644 --- a/fstest/test_all/config.yaml +++ b/fstest/test_all/config.yaml @@ -133,6 +133,9 @@ backends: - backend: "hubic" remote: "TestHubic:" fastlist: false + - backend: "internetarchive" + remote: "TestIA:lesmi-rclone-test/" + fastlist: true - backend: "jottacloud" remote: "TestJottacloud:" fastlist: true
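For reference, a minimal usage sketch of the backend added by this patch, complementing the interactive walkthrough in docs/content/internetarchive.md above. The remote name `ia` and the item `my-test-item` are placeholders, and the flag shown maps to the `wait_archive` option defined in backend/internetarchive/internetarchive.go:

```
# Non-interactive setup; keys come from https://archive.org/account/s3.php
rclone config create ia internetarchive access_key_id XXXX secret_access_key XXXX

# Upload a file, waiting up to 30 minutes for archive.org's task queue to settle
rclone copyto --internetarchive-wait-archive 30m /home/local/file.txt ia:my-test-item/file.txt

# List the item's contents
rclone ls ia:my-test-item
```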