From e5aa92c9222d8d999c9bd0a20c2b6074aab43f5a Mon Sep 17 00:00:00 2001 From: Robert Thomas <31854736+wolveix@users.noreply.github.com> Date: Wed, 24 Feb 2021 09:33:31 +0000 Subject: [PATCH] dropbox: add polling support - fixes #2949 This implements polling support for the Dropbox backend. The Dropbox SDK dependency had to be updated due to an auth issue, which was fixed on Jan 12 2021. A secondary internal Dropbox service was created to handle unauthorized SDK requests, as is necessary when using the ListFolderLongpoll function/endpoint. The config variable was renamed to cfg to avoid potential conflicts with the imported config package. --- backend/dropbox/dropbox.go | 170 ++++++++++++++++++++++++++++++++++--- go.mod | 2 +- go.sum | 5 +- 3 files changed, 163 insertions(+), 14 deletions(-) diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go index b61ef5b3f..456c7cc69 100755 --- a/backend/dropbox/dropbox.go +++ b/backend/dropbox/dropbox.go @@ -219,11 +219,11 @@ shared folder.`, // as invalid characters. // Testing revealed names with trailing spaces and the DEL character don't work. // Also encode invalid UTF-8 bytes as json doesn't handle them properly. - Default: (encoder.Base | + Default: encoder.Base | encoder.EncodeBackSlash | encoder.EncodeDel | encoder.EncodeRightSpace | - encoder.EncodeInvalidUtf8), + encoder.EncodeInvalidUtf8, }}...), }) } @@ -242,8 +242,10 @@ type Fs struct { name string // name of this remote root string // the path we are working on opt Options // parsed options + ci *fs.ConfigInfo // global config features *fs.Features // optional features srv files.Client // the connection to the dropbox server + svc files.Client // the connection to the dropbox server (unauthorized) sharing sharing.Client // as above, but for generating sharing links users users.Client // as above, but for accessing user information team team.Client // for the Teams API @@ -367,22 +369,29 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, errors.Wrap(err, "failed to configure dropbox") } + ci := fs.GetConfig(ctx) + f := &Fs{ name: name, opt: *opt, + ci: ci, pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } - config := dropbox.Config{ + cfg := dropbox.Config{ LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo Client: oAuthClient, // maybe??? HeaderGenerator: f.headerGenerator, } + // unauthorized config for endpoints that fail with auth + ucfg := dropbox.Config{ + LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo + } + // NOTE: needs to be created pre-impersonation so we can look up the impersonated user - f.team = team.New(config) + f.team = team.New(cfg) if opt.Impersonate != "" { - user := team.UserSelectorArg{ Email: opt.Impersonate, } @@ -397,12 +406,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, errors.Wrapf(err, "invalid dropbox team member: %q", opt.Impersonate) } - config.AsMemberID = memberIds[0].MemberInfo.Profile.MemberProfile.TeamMemberId + cfg.AsMemberID = memberIds[0].MemberInfo.Profile.MemberProfile.TeamMemberId } - f.srv = files.New(config) - f.sharing = sharing.New(config) - f.users = users.New(config) + f.srv = files.New(cfg) + f.svc = files.New(ucfg) + f.sharing = sharing.New(cfg) + f.users = users.New(cfg) f.features = (&fs.Features{ CaseInsensitive: true, ReadMimeType: false, @@ -661,7 +671,7 @@ func (f *Fs) findSharedFolder(name string) (id string, err error) { return "", fs.ErrorDirNotFound } -// mountSharedFolders mount a shared folder to the root namespace +// mountSharedFolder mount a shared folder to the root namespace func (f *Fs) mountSharedFolder(id string) error { arg := sharing.MountFolderArg{ SharedFolderId: id, @@ -673,7 +683,7 @@ func (f *Fs) mountSharedFolder(id string) error { return err } -// listSharedFolders lists shared the user as access to (note this means individual +// listReceivedFiles lists shared the user as access to (note this means individual // files not files contained in shared folders) func (f *Fs) listReceivedFiles() (entries fs.DirEntries, err error) { started := false @@ -1191,6 +1201,144 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) { return usage, nil } +// ChangeNotify calls the passed function with a path that has had changes. +// If the implementation uses polling, it should adhere to the given interval. +// +// Automatically restarts itself in case of unexpected behavior of the remote. +// +// Close the returned channel to stop being notified. +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { + go func() { + // get the StartCursor early so all changes from now on get processed + startCursor, err := f.changeNotifyCursor() + if err != nil { + fs.Infof(f, "Failed to get StartCursor: %s", err) + } + var ticker *time.Ticker + var tickerC <-chan time.Time + for { + select { + case pollInterval, ok := <-pollIntervalChan: + if !ok { + if ticker != nil { + ticker.Stop() + } + return + } + if ticker != nil { + ticker.Stop() + ticker, tickerC = nil, nil + } + if pollInterval != 0 { + ticker = time.NewTicker(pollInterval) + tickerC = ticker.C + } + case <-tickerC: + if startCursor == "" { + startCursor, err = f.changeNotifyCursor() + if err != nil { + fs.Infof(f, "Failed to get StartCursor: %s", err) + continue + } + } + fs.Debugf(f, "Checking for changes on remote") + startCursor, err = f.changeNotifyRunner(ctx, notifyFunc, startCursor) + if err != nil { + fs.Infof(f, "Change notify listener failure: %s", err) + } + } + } + }() +} + +func (f *Fs) changeNotifyCursor() (cursor string, err error) { + var startCursor *files.ListFolderGetLatestCursorResult + + err = f.pacer.Call(func() (bool, error) { + startCursor, err = f.srv.ListFolderGetLatestCursor(&files.ListFolderArg{Path: f.opt.Enc.FromStandardPath(f.slashRoot), Recursive: true}) + return shouldRetry(err) + }) + if err != nil { + return + } + return startCursor.Cursor, nil +} + +func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), startCursor string) (newCursor string, err error) { + cursor := startCursor + var res *files.ListFolderLongpollResult + + // Dropbox sets a timeout range of 30 - 480 + timeout := uint64(f.ci.Timeout / time.Second) + if timeout > 480 { + timeout = 480 + } + + err = f.pacer.Call(func() (bool, error) { + args := files.ListFolderLongpollArg{ + Cursor: cursor, + Timeout: timeout, + } + + res, err = f.svc.ListFolderLongpoll(&args) + return shouldRetry(err) + }) + if err != nil { + return + } + + if !res.Changes { + return cursor, nil + } + + if res.Backoff != 0 { + fs.Debugf(f, "Waiting to poll for %d seconds", res.Backoff) + time.Sleep(time.Duration(res.Backoff) * time.Second) + } + + for { + var changeList *files.ListFolderResult + + arg := files.ListFolderContinueArg{ + Cursor: cursor, + } + err = f.pacer.Call(func() (bool, error) { + changeList, err = f.srv.ListFolderContinue(&arg) + return shouldRetry(err) + }) + if err != nil { + return "", errors.Wrap(err, "list continue") + } + cursor = changeList.Cursor + var entryType fs.EntryType + for _, entry := range changeList.Entries { + entryPath := "" + switch info := entry.(type) { + case *files.FolderMetadata: + entryType = fs.EntryDirectory + entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash) + case *files.FileMetadata: + entryType = fs.EntryObject + entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash) + case *files.DeletedMetadata: + entryType = fs.EntryObject + entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash) + default: + fs.Errorf(entry, "dropbox ChangeNotify: ignoring unknown EntryType %T", entry) + continue + } + + if entryPath != "" { + notifyFunc(entryPath, entryType) + } + } + if !changeList.HasMore { + break + } + } + return cursor, nil +} + // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { return hash.Set(DbHashType) diff --git a/go.mod b/go.mod index 61958e4ed..2b87b984c 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/colinmarc/hdfs/v2 v2.2.0 github.com/coreos/go-semver v0.3.0 github.com/dop251/scsu v0.0.0-20200422003335-8fadfb689669 - github.com/dropbox/dropbox-sdk-go-unofficial v5.6.0+incompatible + github.com/dropbox/dropbox-sdk-go-unofficial v1.0.1-0.20210114204226-41fdcdae8a53 github.com/gabriel-vasile/mimetype v1.1.2 github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-querystring v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3ca16fe04..25b46b5aa 100644 --- a/go.sum +++ b/go.sum @@ -175,8 +175,8 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dop251/scsu v0.0.0-20200422003335-8fadfb689669 h1:e28M2/odOZjMc1J2ZZwgex6NM9+aqr1nMlTqPLayxbk= github.com/dop251/scsu v0.0.0-20200422003335-8fadfb689669/go.mod h1:Gth7Xev0h28tuTayG4HlTZy90IXhiDgV2+MLtJzjpP0= -github.com/dropbox/dropbox-sdk-go-unofficial v5.6.0+incompatible h1:DtumzkLk2zZ2SeElEr+VNz+zV7l+BTe509cV4sKPXbM= -github.com/dropbox/dropbox-sdk-go-unofficial v5.6.0+incompatible/go.mod h1:lr+LhMM3F6Y3lW1T9j2U5l7QeuWm87N9+PPXo3yH4qY= +github.com/dropbox/dropbox-sdk-go-unofficial v1.0.1-0.20210114204226-41fdcdae8a53 h1:HQ0F1AdtiOOtx4fv1bYYOBTrwQwxJh2tCWouwmvUjyo= +github.com/dropbox/dropbox-sdk-go-unofficial v1.0.1-0.20210114204226-41fdcdae8a53/go.mod h1:6zG+Yst2Q7BA8rp69tmHlCnt7BxeCyj3rno0B7hYq8k= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -816,6 +816,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=