drive: constrain list by filter #5023

Google Drive API allows for clauses like "modifiedTime > '2012-06-04T12:00:00'"
in the query param, so the filter flags --max-age and --min-age can be applied
directly at the directory listing phase rather than in a filter.
This is extremely helpful when we want to do an incremental backup of a remote
drive with many files but the number of recently changed file is small.

Co-authored-by: fotile96 <fotile96@users.noreply.github.com>
This commit is contained in:
Ivan Andreev 2020-12-02 19:20:58 +03:00
parent 729704bcb8
commit a2545066e2
5 changed files with 145 additions and 7 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/config/obscure"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/fspath"
@ -616,6 +617,7 @@ type Fs struct {
client *http.Client // authorized client
rootFolderID string // the id of the root folder
dirCache *dircache.DirCache // Map of directory path to directory id
lastQuery string // Last query string to check in unit tests
pacer *fs.Pacer // To pace the API calls
exportExtensions []string // preferred extensions to download docs
importMimeTypes []string // MIME types to convert to docs
@ -829,11 +831,31 @@ func (f *Fs) list(ctx context.Context, dirIDs []string, title string, directorie
if filesOnly {
query = append(query, fmt.Sprintf("mimeType!='%s'", driveFolderType))
}
list := f.svc.Files.List()
if len(query) > 0 {
list.Q(strings.Join(query, " and "))
// fmt.Printf("list Query = %q\n", query)
// Constrain query using filter if this remote is a sync/copy/walk source.
if fi, use := filter.GetConfig(ctx), filter.GetUseFilter(ctx); fi != nil && use {
queryByTime := func(op string, tm time.Time) {
if tm.IsZero() {
return
}
// https://developers.google.com/drive/api/v3/ref-search-terms#operators
// Query times use RFC 3339 format, default timezone is UTC
timeStr := tm.UTC().Format("2006-01-02T15:04:05")
term := fmt.Sprintf("(modifiedTime %s '%s' or mimeType = '%s')", op, timeStr, driveFolderType)
query = append(query, term)
}
queryByTime(">=", fi.ModTimeFrom)
queryByTime("<=", fi.ModTimeTo)
}
list := f.svc.Files.List()
queryString := strings.Join(query, " and ")
if queryString != "" {
list.Q(queryString)
// fs.Debugf(f, "list query: %q", queryString)
}
f.lastQuery = queryString // for unit tests
if f.opt.ListChunk > 0 {
list.PageSize(f.opt.ListChunk)
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime"
@ -17,8 +18,10 @@ import (
"github.com/pkg/errors"
_ "github.com/rclone/rclone/backend/local"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/sync"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests"
"github.com/rclone/rclone/lib/random"
@ -461,6 +464,81 @@ func (f *Fs) InternalTestCopyID(t *testing.T) {
})
}
// TestIntegration/FsMkdir/FsPutFiles/Internal/AgeQuery
func (f *Fs) InternalTestAgeQuery(t *testing.T) {
opt := &filter.Opt{}
err := opt.MaxAge.Set("1h")
assert.NoError(t, err)
flt, err := filter.NewFilter(opt)
assert.NoError(t, err)
defCtx := context.Background()
fltCtx := filter.ReplaceConfig(defCtx, flt)
testCtx1 := fltCtx
testCtx2 := filter.SetUseFilter(testCtx1, true)
testCtx3, testCancel := context.WithCancel(testCtx2)
testCtx4 := filter.SetUseFilter(testCtx3, false)
testCancel()
assert.False(t, filter.GetUseFilter(testCtx1))
assert.True(t, filter.GetUseFilter(testCtx2))
assert.True(t, filter.GetUseFilter(testCtx3))
assert.False(t, filter.GetUseFilter(testCtx4))
subRemote := fmt.Sprintf("%s:%s/%s", f.Name(), f.Root(), "agequery-testdir")
subFsResult, err := fs.NewFs(defCtx, subRemote)
require.NoError(t, err)
subFs, isDriveFs := subFsResult.(*Fs)
require.True(t, isDriveFs)
tempDir1, err := ioutil.TempDir("", "rclone-drive-agequery1-test")
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(tempDir1)
}()
tempFs1, err := fs.NewFs(defCtx, tempDir1)
require.NoError(t, err)
tempDir2, err := ioutil.TempDir("", "rclone-drive-agequery2-test")
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(tempDir2)
}()
tempFs2, err := fs.NewFs(defCtx, tempDir2)
require.NoError(t, err)
file1 := fstest.Item{ModTime: time.Now(), Path: "agequery.txt"}
_, _ = fstests.PutTestContents(defCtx, t, tempFs1, &file1, "abcxyz", true)
// validate sync/copy
const timeQuery = "(modifiedTime >= '"
assert.NoError(t, sync.CopyDir(defCtx, subFs, tempFs1, false))
assert.NotContains(t, subFs.lastQuery, timeQuery)
assert.NoError(t, sync.CopyDir(fltCtx, subFs, tempFs1, false))
assert.Contains(t, subFs.lastQuery, timeQuery)
assert.NoError(t, sync.CopyDir(fltCtx, tempFs2, subFs, false))
assert.Contains(t, subFs.lastQuery, timeQuery)
assert.NoError(t, sync.CopyDir(defCtx, tempFs2, subFs, false))
assert.NotContains(t, subFs.lastQuery, timeQuery)
// validate list/walk
devNull, errOpen := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
require.NoError(t, errOpen)
defer func() {
_ = devNull.Close()
}()
assert.NoError(t, operations.List(defCtx, subFs, devNull))
assert.NotContains(t, subFs.lastQuery, timeQuery)
assert.NoError(t, operations.List(fltCtx, subFs, devNull))
assert.Contains(t, subFs.lastQuery, timeQuery)
}
func (f *Fs) InternalTest(t *testing.T) {
// These tests all depend on each other so run them as nested tests
t.Run("DocumentImport", func(t *testing.T) {
@ -478,6 +556,7 @@ func (f *Fs) InternalTest(t *testing.T) {
t.Run("Shortcuts", f.InternalTestShortcuts)
t.Run("UnTrash", f.InternalTestUnTrash)
t.Run("CopyID", f.InternalTestCopyID)
t.Run("AgeQuery", f.InternalTestAgeQuery)
}
var _ fstests.InternalTester = (*Fs)(nil)

View File

@ -598,9 +598,9 @@ func (f *Filter) UsesDirectoryFilters() bool {
return true
}
// Context key for config
type configContextKeyType struct{}
// Context key for config
var configContextKey = configContextKeyType{}
// GetConfig returns the global or context sensitive config
@ -645,3 +645,29 @@ func ReplaceConfig(ctx context.Context, f *Filter) context.Context {
newCtx := context.WithValue(ctx, configContextKey, f)
return newCtx
}
// Context key for the "use filter" flag
type useFlagContextKeyType struct{}
var useFlagContextKey = useFlagContextKeyType{}
// GetUseFilter obtains the "use filter" flag from context
// The flag tells filter-aware backends (Drive) to constrain List using filter
func GetUseFilter(ctx context.Context) bool {
if ctx != nil {
if pVal := ctx.Value(useFlagContextKey); pVal != nil {
return *(pVal.(*bool))
}
}
return false
}
// SetUseFilter returns a context having (re)set the "use filter" flag
func SetUseFilter(ctx context.Context, useFilter bool) context.Context {
if useFilter == GetUseFilter(ctx) {
return ctx // Minimize depth of nested contexts
}
pVal := new(bool)
*pVal = useFilter
return context.WithValue(ctx, useFlagContextKey, pVal)
}

View File

@ -49,6 +49,7 @@ type Marcher interface {
}
// init sets up a march over opt.Fsrc, and opt.Fdst calling back callback for each match
// Note: this will flag filter-aware backends on the source side
func (m *March) init(ctx context.Context) {
ci := fs.GetConfig(ctx)
m.srcListDir = m.makeListDir(ctx, m.Fsrc, m.SrcIncludeAll)
@ -76,13 +77,15 @@ type listDirFn func(dir string) (entries fs.DirEntries, err error)
// makeListDir makes constructs a listing function for the given fs
// and includeAll flags for marching through the file system.
// Note: this will optionally flag filter-aware backends!
func (m *March) makeListDir(ctx context.Context, f fs.Fs, includeAll bool) listDirFn {
ci := fs.GetConfig(ctx)
fi := filter.GetConfig(ctx)
if !(ci.UseListR && f.Features().ListR != nil) && // !--fast-list active and
!(ci.NoTraverse && fi.HaveFilesFrom()) { // !(--files-from and --no-traverse)
return func(dir string) (entries fs.DirEntries, err error) {
return list.DirSorted(m.Ctx, f, includeAll, dir)
dirCtx := filter.SetUseFilter(m.Ctx, !includeAll) // make filter-aware backends constrain List
return list.DirSorted(dirCtx, f, includeAll, dir)
}
}
@ -98,7 +101,8 @@ func (m *March) makeListDir(ctx context.Context, f fs.Fs, includeAll bool) listD
mu.Lock()
defer mu.Unlock()
if !started {
dirs, dirsErr = walk.NewDirTree(m.Ctx, f, m.Dir, includeAll, ci.MaxDepth)
dirCtx := filter.SetUseFilter(m.Ctx, !includeAll) // make filter-aware backends constrain List
dirs, dirsErr = walk.NewDirTree(dirCtx, f, m.Dir, includeAll, ci.MaxDepth)
started = true
}
if dirsErr != nil {

View File

@ -57,10 +57,13 @@ type Func func(path string, entries fs.DirEntries, err error) error
// If --files-from and --no-traverse is set then a DirTree will be
// constructed with just those files in and then walked with WalkR
//
// Note: this will flag filter-aware backends!
//
// NB (f, path) to be replaced by fs.Dir at some point
func Walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel int, fn Func) error {
ci := fs.GetConfig(ctx)
fi := filter.GetConfig(ctx)
ctx = filter.SetUseFilter(ctx, !includeAll) // make filter-aware backends constrain List
if ci.NoTraverse && fi.HaveFilesFrom() {
return walkR(ctx, f, path, includeAll, maxLevel, fn, fi.MakeListR(ctx, f.NewObject))
}
@ -138,6 +141,8 @@ func (l ListType) Filter(in *fs.DirEntries) {
// This is implemented by using ListR on the backend if possible and
// efficient, otherwise by Walk.
//
// Note: this will flag filter-aware backends
//
// NB (f, path) to be replaced by fs.Dir at some point
func ListR(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel int, listType ListType, fn fs.ListRCallback) error {
fi := filter.GetConfig(ctx)
@ -152,10 +157,12 @@ func ListR(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel
fi.UsesDirectoryFilters() { // ...using any directory filters
return listRwalk(ctx, f, path, includeAll, maxLevel, listType, fn)
}
ctx = filter.SetUseFilter(ctx, !includeAll) // make filter-aware backends constrain List
return listR(ctx, f, path, includeAll, listType, fn, doListR, listType.Dirs() && f.Features().BucketBased)
}
// listRwalk walks the file tree for ListR using Walk
// Note: this will flag filter-aware backends (via Walk)
func listRwalk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel int, listType ListType, fn fs.ListRCallback) error {
var listErr error
walkErr := Walk(ctx, f, path, includeAll, maxLevel, func(path string, entries fs.DirEntries, err error) error {