diff --git a/backend/drive/drive.go b/backend/drive/drive.go index f199858f0..799cbfddd 100755 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "text/template" "time" @@ -68,6 +69,8 @@ const ( minChunkSize = 256 * fs.KibiByte defaultChunkSize = 8 * fs.MebiByte partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents,webViewLink,shortcutDetails" + listRGrouping = 50 // number of IDs to search at once when using ListR + listRInputBuffer = 1000 // size of input buffer when using ListR ) // Globals @@ -558,6 +561,9 @@ type Fs struct { isTeamDrive bool // true if this is a team drive fileFields googleapi.Field // fields to fetch file info with m configmap.Mapper + grouping int32 // number of IDs to search at once in ListR - read with atomic + listRmu *sync.Mutex // protects listRempties + listRempties map[string]struct{} // IDs of supposedly empty directories which triggered grouping disable } type baseObject struct { @@ -1079,11 +1085,14 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) { } f := &Fs{ - name: name, - root: root, - opt: *opt, - pacer: newPacer(opt), - m: m, + name: name, + root: root, + opt: *opt, + pacer: newPacer(opt), + m: m, + grouping: listRGrouping, + listRmu: new(sync.Mutex), + listRempties: make(map[string]struct{}), } f.isTeamDrive = opt.TeamDriveID != "" f.fileFields = f.getFileFields() @@ -1634,15 +1643,17 @@ func (s listRSlices) Less(i, j int) bool { // In each cycle it will read up to grouping entries from the in channel without blocking. // If an error occurs it will be send to the out channel and then return. Once the in channel is closed, // nil is send to the out channel and the function returns. -func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan listREntry, out chan<- error, cb func(fs.DirEntry) error, grouping int) { +func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) { var dirs []string var paths []string + var grouping int32 for dir := range in { dirs = append(dirs[:0], dir.id) paths = append(paths[:0], dir.path) + grouping = atomic.LoadInt32(&f.grouping) waitloop: - for i := 1; i < grouping; i++ { + for i := int32(1); i < grouping; i++ { select { case d, ok := <-in: if !ok { @@ -1655,6 +1666,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list } listRSlices{dirs, paths}.Sort() var iErr error + foundItems := false _, err := f.list(ctx, dirs, "", false, false, false, func(item *drive.File) bool { // shared with me items have no parents when at the root if f.opt.SharedWithMe && len(item.Parents) == 0 && len(paths) == 1 && paths[0] == "" { @@ -1662,6 +1674,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list } for _, parent := range item.Parents { var i int + foundItems = true earlyExit := false // If only one item in paths then no need to search for the ID // assuming google drive is doing its job properly. @@ -1702,6 +1715,53 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list } return false }) + // Found no items in more than one directory. Retry these as + // individual directories This is to work around a bug in google + // drive where (A in parents) or (B in parents) returns nothing + // sometimes. See #3114, #4289 and + // https://issuetracker.google.com/issues/149522397 + if len(dirs) > 1 && !foundItems { + if atomic.SwapInt32(&f.grouping, 1) != 1 { + fs.Logf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs)) + } + var recycled = make([]listREntry, len(dirs)) + f.listRmu.Lock() + for i := range dirs { + recycled[i] = listREntry{id: dirs[i], path: paths[i]} + // Make a note of these dirs - if they all turn + // out to be empty then we can re-enable grouping + f.listRempties[dirs[i]] = struct{}{} + } + f.listRmu.Unlock() + // recycle these in the background so we don't deadlock + // the listR runners if they all get here + wg.Add(len(recycled)) + go func() { + for _, entry := range recycled { + in <- entry + } + fs.Debugf(f, "Recycled %d entries", len(recycled)) + }() + } + // If using a grouping of 1 and dir was empty then check to see if it + // is part of the group that caused grouping to be disabled. + if grouping == 1 && len(dirs) == 1 && !foundItems { + f.listRmu.Lock() + if _, found := f.listRempties[dirs[0]]; found { + // Remove the ID + delete(f.listRempties, dirs[0]) + // If no empties left => all the directories that + // triggered the grouping being set to 1 were actually + // empty so must have made a mistake + if len(f.listRempties) == 0 { + if atomic.SwapInt32(&f.grouping, listRGrouping) != listRGrouping { + fs.Logf(f, "Re-enabling ListR as previous detection was in error") + } + } + } + f.listRmu.Unlock() + } + for range dirs { wg.Done() } @@ -1736,11 +1796,6 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - const ( - grouping = 50 - inputBuffer = 1000 - ) - err = f.dirCache.FindRoot(ctx, false) if err != nil { return err @@ -1753,7 +1808,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( mu := sync.Mutex{} // protects in and overflow wg := sync.WaitGroup{} - in := make(chan listREntry, inputBuffer) + in := make(chan listREntry, listRInputBuffer) out := make(chan error, fs.Config.Checkers) list := walk.NewListRHelper(callback) overflow := []listREntry{} @@ -1766,6 +1821,9 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( job := listREntry{actualID(d.ID()), d.Remote()} select { case in <- job: + // Adding the wg after we've entered the item is + // safe here because we know when the callback + // is called we are holding a waitgroup. wg.Add(1) default: overflow = append(overflow, job) @@ -1779,7 +1837,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( in <- listREntry{directoryID, dir} for i := 0; i < fs.Config.Checkers; i++ { - go f.listRRunner(ctx, &wg, in, out, cb, grouping) + go f.listRRunner(ctx, &wg, in, out, cb) } go func() { // wait until the all directories are processed @@ -1789,8 +1847,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( mu.Lock() l := len(overflow) // only fill half of the channel to prevent entries being put into overflow again - if l > inputBuffer/2 { - l = inputBuffer / 2 + if l > listRInputBuffer/2 { + l = listRInputBuffer / 2 } wg.Add(l) for _, d := range overflow[:l] {