s3: implement Purge to purge versions and `backend cleanup-hidden`

This commit is contained in:
Nick Craig-Wood 2022-07-26 15:03:32 +01:00
parent 0ae171416f
commit 81d242473a
3 changed files with 129 additions and 0 deletions

View File

@ -37,6 +37,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/ncw/swift/v2"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
@ -3739,6 +3740,17 @@ Durations are parsed as per the rest of rclone, 2h, 7d, 7w etc.
Opts: map[string]string{
"max-age": "Max age of upload to delete",
},
}, {
Name: "cleanup-hidden",
Short: "Remove old versions of files.",
Long: `This command removes any old hidden versions of files
on a versions enabled bucket.
Note that you can use -i/--dry-run with this command to see what it
would do.
rclone backend cleanup-hidden s3:bucket/path/to/dir
`,
}, {
Name: "versioning",
Short: "Set/get versioning support for a bucket.",
@ -3843,6 +3855,8 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str
}
}
return nil, f.cleanUp(ctx, maxAge)
case "cleanup-hidden":
return nil, f.CleanUpHidden(ctx)
case "versioning":
return f.setGetVersioning(ctx, arg...)
default:
@ -4029,6 +4043,91 @@ func (f *Fs) CleanUp(ctx context.Context) (err error) {
return f.cleanUp(ctx, 24*time.Hour)
}
// purge deletes all the files and directories
//
// if oldOnly is true then it deletes only non current files.
//
// Implemented here so we can make sure we delete old versions.
func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error {
bucket, directory := f.split(dir)
if bucket == "" {
return errors.New("can't purge from root")
}
versioned := f.isVersioned(ctx)
if !versioned && oldOnly {
fs.Infof(f, "bucket is not versioned so not removing old versions")
return nil
}
var errReturn error
var checkErrMutex sync.Mutex
var checkErr = func(err error) {
if err == nil {
return
}
checkErrMutex.Lock()
defer checkErrMutex.Unlock()
if errReturn == nil {
errReturn = err
}
}
// Delete Config.Transfers in parallel
delChan := make(fs.ObjectsChan, f.ci.Transfers)
delErr := make(chan error, 1)
go func() {
delErr <- operations.DeleteFiles(ctx, delChan)
}()
checkErr(f.list(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, versioned, true, false, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error {
if isDirectory {
return nil
}
oi, err := f.newObjectWithInfo(ctx, remote, object, versionID)
if err != nil {
fs.Errorf(object, "Can't create object %+v", err)
return nil
}
tr := accounting.Stats(ctx).NewCheckingTransfer(oi)
// Work out whether the file is the current version or not
isCurrentVersion := !versioned || !version.Match(remote)
fs.Debugf(nil, "%q version %v", remote, version.Match(remote))
if oldOnly && isCurrentVersion {
// Check current version of the file
if object.Size == isDeleteMarker {
fs.Debugf(remote, "Deleting current version (id %q) as it is a delete marker", aws.StringValue(versionID))
delChan <- oi
} else {
fs.Debugf(remote, "Not deleting current version %q", aws.StringValue(versionID))
}
} else {
if object.Size == isDeleteMarker {
fs.Debugf(remote, "Deleting delete marker (id %q)", aws.StringValue(versionID))
} else {
fs.Debugf(remote, "Deleting (id %q)", aws.StringValue(versionID))
}
delChan <- oi
}
tr.Done(ctx, nil)
return nil
}))
close(delChan)
checkErr(<-delErr)
if !oldOnly {
checkErr(f.Rmdir(ctx, dir))
}
return errReturn
}
// Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context, dir string) error {
return f.purge(ctx, dir, false)
}
// CleanUpHidden deletes all the hidden files.
func (f *Fs) CleanUpHidden(ctx context.Context) error {
return f.purge(ctx, "", true)
}
// ------------------------------------------------------------
// Fs returns the parent Fs
@ -5042,6 +5141,7 @@ func (o *Object) Metadata(ctx context.Context) (metadata fs.Metadata, err error)
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Purger = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}

View File

@ -158,6 +158,20 @@ func (f *Fs) InternalTestVersions(t *testing.T) {
assert.Equal(t, int64(100), o.Size(), o.Remote())
})
})
t.Run("Cleanup", func(t *testing.T) {
require.NoError(t, f.CleanUpHidden(ctx))
items := append([]fstest.Item{newItem}, fstests.InternalTestFiles...)
fstest.CheckListing(t, f, items)
// Set --s3-versions for this test
f.opt.Versions = true
defer func() {
f.opt.Versions = false
}()
fstest.CheckListing(t, f, items)
})
// Purge gets tested later
}
func (f *Fs) InternalTest(t *testing.T) {

View File

@ -2777,6 +2777,21 @@ Options:
- "max-age": Max age of upload to delete
### cleanup-hidden
Remove old versions of files.
rclone backend cleanup-hidden remote: [options] [<arguments>+]
This command removes any old hidden versions of files
on a versions enabled bucket.
Note that you can use -i/--dry-run with this command to see what it
would do.
rclone backend cleanup-hidden s3:bucket/path/to/dir
### versioning
Set/get versioning support for a bucket.