From 068cfdaa00f656d37a9a885427e5941484e39c5c Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 20 Aug 2020 11:47:27 +0100 Subject: [PATCH] drive: fix "panic: send on closed channel" when recycling dir entries In this commit: cbf3d43561358149 drive: fix missing items when listing using --fast-list / ListR We introduced a bug where under specific circumstances it could cause a "panic: send on closed channel". This was caused by: - rclone engaging the workaround from the commit above - one of the listing routines returning an error - this caused the `in` channel to be closed to stop the readers - however the workaround was recycling stuff into the `in` channel at the time - hence the panic on closed channel This fix factors out the sending to the `in` channel into `sendJob` and calls this both from the master go routine and the list runners. `sendJob` detects the `in` channel being closed properly and also deals correctly with contention on the `in` channel. Fixes #4511 --- backend/drive/drive.go | 55 +++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/backend/drive/drive.go b/backend/drive/drive.go index e1f0f1189..8e5f4b09f 100755 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -1643,7 +1643,7 @@ func (s listRSlices) Less(i, j int) bool { // In each cycle it will read up to grouping entries from the in channel without blocking. // If an error occurs it will be send to the out channel and then return. Once the in channel is closed, // nil is send to the out channel and the function returns. -func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) { +func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error, sendJob func(listREntry)) { var dirs []string var paths []string var grouping int32 @@ -1724,24 +1724,17 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listRE if atomic.SwapInt32(&f.grouping, 1) != 1 { fs.Debugf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs)) } - var recycled = make([]listREntry, len(dirs)) f.listRmu.Lock() for i := range dirs { - recycled[i] = listREntry{id: dirs[i], path: paths[i]} + // Requeue the jobs + job := listREntry{id: dirs[i], path: paths[i]} + sendJob(job) // Make a note of these dirs - if they all turn // out to be empty then we can re-enable grouping f.listRempties[dirs[i]] = struct{}{} } f.listRmu.Unlock() - // recycle these in the background so we don't deadlock - // the listR runners if they all get here - wg.Add(len(recycled)) - go func() { - for _, entry := range recycled { - in <- entry - } - fs.Debugf(f, "Recycled %d entries", len(recycled)) - }() + fs.Debugf(f, "Recycled %d entries", len(dirs)) } // If using a grouping of 1 and dir was empty then check to see if it // is part of the group that caused grouping to be disabled. @@ -1810,21 +1803,33 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( overflow := []listREntry{} listed := 0 - cb := func(entry fs.DirEntry) error { + // Send a job to the input channel if not closed. If the job + // won't fit then queue it in the overflow slice. + // + // This will not block if the channel is full. + sendJob := func(job listREntry) { mu.Lock() defer mu.Unlock() - if d, isDir := entry.(*fs.Dir); isDir && in != nil { - job := listREntry{actualID(d.ID()), d.Remote()} - select { - case in <- job: - // Adding the wg after we've entered the item is - // safe here because we know when the callback - // is called we are holding a waitgroup. - wg.Add(1) - default: - overflow = append(overflow, job) - } + if in == nil { + return } + wg.Add(1) + select { + case in <- job: + default: + overflow = append(overflow, job) + wg.Add(-1) + } + } + + // Send the entry to the caller, queueing any directories as new jobs + cb := func(entry fs.DirEntry) error { + if d, isDir := entry.(*fs.Dir); isDir { + job := listREntry{actualID(d.ID()), d.Remote()} + sendJob(job) + } + mu.Lock() + defer mu.Unlock() listed++ return list.Add(entry) } @@ -1833,7 +1838,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( in <- listREntry{directoryID, dir} for i := 0; i < fs.Config.Checkers; i++ { - go f.listRRunner(ctx, &wg, in, out, cb) + go f.listRRunner(ctx, &wg, in, out, cb, sendJob) } go func() { // wait until the all directories are processed