From d8984cd37f62a220ba784e634a3328b47c19c64f Mon Sep 17 00:00:00 2001 From: buengese Date: Sun, 17 Jan 2021 02:04:26 +0100 Subject: [PATCH] compress: correctly handle wrapping of remotes without PutStream Also fixes ObjectInfo wrapping for Hash and Size - fixes #4928 --- backend/compress/compress.go | 110 ++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 26 deletions(-) diff --git a/backend/compress/compress.go b/backend/compress/compress.go index 5cfd90324..ae12b4917 100644 --- a/backend/compress/compress.go +++ b/backend/compress/compress.go @@ -12,6 +12,8 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" + "os" "regexp" "strings" "time" @@ -27,6 +29,7 @@ import ( "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/operations" ) @@ -89,15 +92,26 @@ func init() { Level 0 turns off compression.`, Default: sgzip.DefaultCompression, Advanced: true, + }, { + Name: "ram_cache_limit", + Help: `Some remotes don't allow the upload of files with unknown size. + In this case the compressed file will need to be cached to determine + it's size. + + Files smaller than this limit will be cached in RAM, file larger than + this limit will be cached on disk`, + Default: fs.SizeSuffix(20 * 1024 * 1024), + Advanced: true, }}, }) } // Options defines the configuration for this backend type Options struct { - Remote string `config:"remote"` - CompressionMode string `config:"mode"` - CompressionLevel int `config:"level"` + Remote string `config:"remote"` + CompressionMode string `config:"mode"` + CompressionLevel int `config:"level"` + RAMCacheLimit fs.SizeSuffix `config:"ram_cache_limit"` } /*** FILESYSTEM FUNCTIONS ***/ @@ -416,8 +430,55 @@ type compressionResult struct { meta sgzip.GzipMetadata } +// replicating some of operations.Rcat functionality because we want to support remotes without streaming +// support and of course cannot know the size of a compressed file before compressing it. +func (f *Fs) rcat(ctx context.Context, dstFileName string, in io.ReadCloser, modTime time.Time, options []fs.OpenOption) (o fs.Object, err error) { + + // cache small files in memory and do normal upload + buf := make([]byte, f.opt.RAMCacheLimit) + if n, err := io.ReadFull(in, buf); err == io.EOF || err == io.ErrUnexpectedEOF { + src := object.NewStaticObjectInfo(dstFileName, modTime, int64(len(buf[:n])), false, nil, f.Fs) + return f.Fs.Put(ctx, bytes.NewBuffer(buf[:n]), src, options...) + } + + // Need to include what we allready read + in = &ReadCloserWrapper{ + Reader: io.MultiReader(bytes.NewReader(buf), in), + Closer: in, + } + + canStream := f.Fs.Features().PutStream != nil + if canStream { + src := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, f.Fs) + return f.Fs.Features().PutStream(ctx, in, src, options...) + } + + fs.Debugf(f, "Target remote doesn't support streaming uploads, creating temporary local file") + tempFile, err := ioutil.TempFile("", "rclone-press-") + defer func() { + // these errors should be relatively uncritical and the upload should've succeeded so it's okay-ish + // to ignore them + _ = tempFile.Close() + _ = os.Remove(tempFile.Name()) + }() + if err != nil { + return nil, errors.Wrap(err, "Failed to create temporary local FS to spool file") + } + if _, err = io.Copy(tempFile, in); err != nil { + return nil, errors.Wrap(err, "Failed to write temporary local file") + } + if _, err = tempFile.Seek(0, 0); err != nil { + return nil, err + } + finfo, err := tempFile.Stat() + if err != nil { + return nil, err + } + return f.Fs.Put(ctx, tempFile, object.NewStaticObjectInfo(dstFileName, modTime, finfo.Size(), false, nil, f.Fs)) +} + // Put a compressed version of a file. Returns a wrappable object and metadata. -func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) { +func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) { // Unwrap reader accounting in, wrap := accounting.UnWrap(in) @@ -471,7 +532,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o } // Transfer the data - o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), f.mode), src.Size()), options...) + o, err := f.rcat(ctx, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx), options) //o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx)) if err != nil { if o != nil { @@ -510,7 +571,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o } // Put an uncompressed version of a file. Returns a wrappable object and metadata. -func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) { +func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, put putFn, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) { // Unwrap the accounting, add our metadata hasher, then wrap it back on in, wrap := accounting.UnWrap(in) @@ -577,6 +638,8 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec // This function will put both the data and metadata for an Object. // putData is the function used for data, while putMeta is the function used for metadata. +// The putData function will only be used when the object is not compressible if the +// data is compressible this parameter will be ignored. func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) { // Put file then metadata @@ -584,9 +647,9 @@ func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.Ob var meta *ObjectMetadata var err error if compressible { - dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType) + dataObject, meta, err = f.putCompress(ctx, in, src, options, mimeType) } else { - dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType) + dataObject, meta, err = f.putUncompress(ctx, in, src, putData, options, mimeType) } if err != nil { return nil, err @@ -837,7 +900,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string func (f *Fs) CleanUp(ctx context.Context) error { do := f.Fs.Features().CleanUp if do == nil { - return errors.New("can't CleanUp") + return errors.New("can't CleanUp: not supported by underlying remote") } return do(ctx) } @@ -846,7 +909,7 @@ func (f *Fs) CleanUp(ctx context.Context) error { func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { do := f.Fs.Features().About if do == nil { - return nil, errors.New("About not supported") + return nil, errors.New("can't About: not supported by underlying remote") } return do(ctx) } @@ -922,7 +985,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) { do := f.Fs.Features().PublicLink if do == nil { - return "", errors.New("PublicLink not supported") + return "", errors.New("can't PublicLink: not supported by underlying remote") } o, err := f.NewObject(ctx, remote) if err != nil { @@ -1033,7 +1096,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } } } else { - // Function that updates object + // We can only support update when BOTH the old and the new object are uncompressed because only then + // the filesize will be known beforehand and name will stay the same update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { return o.Object, o.Object.Update(ctx, in, src, options...) } @@ -1121,7 +1185,7 @@ func (o *Object) String() string { func (o *Object) Remote() string { origFileName, _, _, err := processFileName(o.Object.Remote()) if err != nil { - fs.Errorf(o, "Could not get remote path for: %s", o.Object.Remote()) + fs.Errorf(o.f, "Could not get remote path for: %s", o.Object.Remote()) return o.Object.Remote() } return origFileName @@ -1161,15 +1225,19 @@ func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) { // multiple storage classes supported func (o *Object) SetTier(tier string) error { do, ok := o.Object.(fs.SetTierer) + mdo, ok := o.mo.(fs.SetTierer) if !ok { return errors.New("press: underlying remote does not support SetTier") } + if err := mdo.SetTier(tier); err != nil { + return err + } return do.SetTier(tier) } // GetTier returns storage tier or class of the Object func (o *Object) GetTier() string { - do, ok := o.Object.(fs.GetTierer) + do, ok := o.mo.(fs.GetTierer) if !ok { return "" } @@ -1272,10 +1340,7 @@ func (o *ObjectInfo) Remote() string { // Size returns the size of the file func (o *ObjectInfo) Size() int64 { - if o.size != -1 { - return o.size - } - return o.src.Size() + return o.size } // ModTime returns the modification time @@ -1286,14 +1351,7 @@ func (o *ObjectInfo) ModTime(ctx context.Context) time.Time { // Hash returns the selected checksum of the file // If no checksum is available it returns "" func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) { - if ht != hash.MD5 { - return "", hash.ErrUnsupported - } - value, err := o.src.Hash(ctx, ht) - if err == hash.ErrUnsupported { - return "", hash.ErrUnsupported - } - return value, err + return "", nil // cannot know the checksum } // ID returns the ID of the Object if known, or "" if not