From dc5a734522a35ee61ec732915bca48e0bd84c68b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Thu, 26 Jul 2018 15:45:41 +0200 Subject: [PATCH] drive: implement ListR --- backend/drive/drive.go | 308 ++++++++++++++++++++++++++++++++++------- 1 file changed, 259 insertions(+), 49 deletions(-) diff --git a/backend/drive/drive.go b/backend/drive/drive.go index ed465c7f0..6843a6854 100644 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -8,6 +8,7 @@ package drive // * files with / in name import ( + "bytes" "fmt" "io" "io/ioutil" @@ -29,6 +30,7 @@ import ( "github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fshttp" "github.com/ncw/rclone/fs/hash" + "github.com/ncw/rclone/fs/walk" "github.com/ncw/rclone/lib/dircache" "github.com/ncw/rclone/lib/oauthutil" "github.com/ncw/rclone/lib/pacer" @@ -88,7 +90,7 @@ var ( "text/tab-separated-values": "tsv", } extensionToMimeType map[string]string - partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType" + partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents" exportFormatsOnce sync.Once // make sure we fetch the export formats only once _exportFormats map[string][]string // allowed export mime-type conversions ) @@ -373,7 +375,7 @@ func containsString(slice []string, s string) bool { // If the user fn ever returns true then it early exits with found = true // // Search params: https://developers.google.com/drive/search-parameters -func (f *Fs) list(dirID string, title string, directoriesOnly bool, filesOnly bool, includeAll bool, fn listFn) (found bool, err error) { +func (f *Fs) list(dirIDs []string, title string, directoriesOnly bool, filesOnly bool, includeAll bool, fn listFn) (found bool, err error) { var query []string if !includeAll { q := "trashed=" + strconv.FormatBool(f.opt.TrashedOnly) @@ -385,11 +387,23 @@ func (f *Fs) list(dirID string, title string, directoriesOnly bool, filesOnly bo // Search with sharedWithMe will always return things listed in "Shared With Me" (without any parents) // We must not filter with parent when we try list "ROOT" with drive-shared-with-me // If we need to list file inside those shared folders, we must search it without sharedWithMe - if f.opt.SharedWithMe && dirID == f.rootFolderID { - query = append(query, "sharedWithMe=true") + parentsQuery := bytes.NewBufferString("(") + for _, dirID := range dirIDs { + if dirID == "" { + continue + } + if parentsQuery.Len() > 1 { + _, _ = parentsQuery.WriteString(" or ") + } + if f.opt.SharedWithMe && dirID == f.rootFolderID { + _, _ = parentsQuery.WriteString("sharedWithMe=true") + } else { + _, _ = fmt.Fprintf(parentsQuery, "'%s' in parents", dirID) + } } - if dirID != "" && !(f.opt.SharedWithMe && dirID == f.rootFolderID) { - query = append(query, fmt.Sprintf("'%s' in parents", dirID)) + if parentsQuery.Len() > 1 { + _ = parentsQuery.WriteByte(')') + query = append(query, parentsQuery.String()) } stem := "" if title != "" { @@ -736,7 +750,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) { // FindLeaf finds a directory of name leaf in the folder with ID pathID func (f *Fs) FindLeaf(pathID, leaf string) (pathIDOut string, found bool, err error) { // Find the leaf in pathID - found, err = f.list(pathID, leaf, true, false, false, func(item *drive.File) bool { + found, err = f.list([]string{pathID}, leaf, true, false, false, func(item *drive.File) bool { if item.Name == leaf { pathIDOut = item.Id return true @@ -848,45 +862,13 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { } var iErr error - _, err = f.list(directoryID, "", false, false, false, func(item *drive.File) bool { - remote := path.Join(dir, item.Name) - switch { - case item.MimeType == driveFolderType: - // cache the directory ID for later lookups - f.dirCache.Put(remote, item.Id) - when, _ := time.Parse(timeFormatIn, item.ModifiedTime) - d := fs.NewDir(remote, when).SetID(item.Id) - entries = append(entries, d) - case f.opt.AuthOwnerOnly && !isAuthOwned(item): - // ignore object - case item.Md5Checksum != "" || item.Size > 0: - // If item has MD5 sum or a length it is a file stored on drive - o, err := f.newObjectWithInfo(remote, item) - if err != nil { - iErr = err - return true - } - entries = append(entries, o) - case f.opt.SkipGdocs: - fs.Debugf(remote, "Skipping google document type %q", item.MimeType) - default: - // If item MimeType is in the ExportFormats then it is a google doc - extension, _, exportMimeType, isDocument := f.findExportFormat(item) - if !isDocument { - fs.Debugf(remote, "Ignoring unknown document type %q", item.MimeType) - break - } - if extension == "" { - fs.Debugf(remote, "No export formats found for %q", item.MimeType) - break - } - obj, err := f.newObjectWithInfo(remote+"."+extension, item) - if err != nil { - iErr = err - return true - } - obj.(*Object).setGdocsMetaData(item, extension, exportMimeType) - entries = append(entries, obj) + _, err = f.list([]string{directoryID}, "", false, false, false, func(item *drive.File) bool { + entry, err := f.itemToDirEntry(path.Join(dir, item.Name), item) + if err != nil { + return true + } + if entry != nil { + entries = append(entries, entry) } return false }) @@ -899,6 +881,233 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { return entries, nil } +// listRRunner will read dirIDs from the in channel, perform the file listing an call cb with each DirEntry. +// +// In each cycle, will wait up to 10ms to read up to grouping entries from the in channel. +// If an error occurs it will be send to the out channel and then return. Once the in channel is closed, +// nil is send to the out channel and the function returns. +func (f *Fs) listRRunner(wg *sync.WaitGroup, in <-chan string, out chan<- error, cb func(fs.DirEntry) error, grouping int) { + var dirs []string + + for dir := range in { + dirs = append(dirs[:0], dir) + wait := time.After(10 * time.Millisecond) + waitloop: + for i := 1; i < grouping; i++ { + select { + case d, ok := <-in: + if !ok { + break waitloop + } + dirs = append(dirs, d) + case <-wait: + break waitloop + } + } + var iErr error + _, err := f.list(dirs, "", false, false, false, func(item *drive.File) bool { + parentPath := "" + if len(item.Parents) > 0 { + p, ok := f.dirCache.GetInv(item.Parents[0]) + if ok { + parentPath = p + } + } + remote := path.Join(parentPath, item.Name) + entry, err := f.itemToDirEntry(remote, item) + if err != nil { + iErr = err + return true + } + + err = cb(entry) + if err != nil { + iErr = err + return true + } + return false + }) + for range dirs { + wg.Done() + } + + if iErr != nil { + out <- iErr + return + } + + if err != nil { + out <- err + return + } + } + out <- nil +} + +// ListR lists the objects and directories of the Fs starting +// from dir recursively into out. +// +// dir should be "" to start from the root, and should not +// have trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't +// found. +// +// It should call callback for each tranche of entries read. +// These need not be returned in any particular order. If +// callback returns an error then the listing will stop +// immediately. +// +// Don't implement this unless you have a more efficient way +// of listing recursively that doing a directory traversal. +func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { + const ( + grouping = 50 + inputBuffer = 1000 + ) + + err = f.dirCache.FindRoot(false) + if err != nil { + return err + } + directoryID, err := f.dirCache.FindDir(dir, false) + if err != nil { + return err + } + + mu := sync.Mutex{} // protects in and overflow + wg := sync.WaitGroup{} + in := make(chan string, inputBuffer) + out := make(chan error, fs.Config.Checkers) + list := walk.NewListRHelper(callback) + overfflow := []string{} + + cb := func(entry fs.DirEntry) error { + mu.Lock() + defer mu.Unlock() + if d, isDir := entry.(*fs.Dir); isDir && in != nil { + select { + case in <- d.ID(): + wg.Add(1) + default: + overfflow = append(overfflow, d.ID()) + } + } + return list.Add(entry) + } + + wg.Add(1) + in <- directoryID + + for i := 0; i < fs.Config.Checkers; i++ { + go f.listRRunner(&wg, in, out, cb, grouping) + } + go func() { + // wait until the all directories are processed + wg.Wait() + // if the input channel overflowed add the collected entries to the channel now + for len(overfflow) > 0 { + mu.Lock() + l := len(overfflow) + // only fill half of the channel to prevent entries beeing put into overfflow again + if l > inputBuffer/2 { + l = inputBuffer / 2 + } + wg.Add(l) + for _, d := range overfflow[:l] { + in <- d + } + overfflow = overfflow[l:] + mu.Unlock() + + // wait again for the completion of all directories + wg.Wait() + } + mu.Lock() + if in != nil { + // notify all workers to exit + close(in) + in = nil + } + mu.Unlock() + }() + // wait until the all workers to finish + for i := 0; i < fs.Config.Checkers; i++ { + e := <-out + mu.Lock() + // if one worker returns an error early, close the input so all other workers exit + if e != nil && in != nil { + err = e + close(in) + in = nil + } + mu.Unlock() + } + + close(out) + if err != nil { + return err + } + + return list.Flush() +} + +func (f *Fs) itemToDirEntry(remote string, item *drive.File) (fs.DirEntry, error) { + switch { + case item.MimeType == driveFolderType: + // cache the directory ID for later lookups + f.dirCache.Put(remote, item.Id) + when, _ := time.Parse(timeFormatIn, item.ModifiedTime) + d := fs.NewDir(remote, when).SetID(item.Id) + return d, nil + case f.opt.AuthOwnerOnly && !isAuthOwned(item): + // ignore object + case item.Md5Checksum != "" || item.Size > 0: + // If item has MD5 sum or a length it is a file stored on drive + o, err := f.newObjectWithInfo(remote, item) + if err != nil { + return nil, err + } + return o, nil + case f.opt.SkipGdocs: + fs.Debugf(remote, "Skipping google document type %q", item.MimeType) + default: + // If item MimeType is in the ExportFormats then it is a google doc + extension, _, exportMimeType, isDocument := f.findExportFormat(item) + if !isDocument { + fs.Debugf(remote, "Ignoring unknown document type %q", item.MimeType) + break + } + if extension == "" { + fs.Debugf(remote, "No export formats found for %q", item.MimeType) + break + } + o, err := f.newObjectWithInfo(remote+"."+extension, item) + if err != nil { + return nil, err + } + obj := o.(*Object) + obj.url = fmt.Sprintf("%sfiles/%s/export?mimeType=%s", f.svc.BasePath, item.Id, url.QueryEscape(exportMimeType)) + if f.opt.AlternateExport { + switch item.MimeType { + case "application/vnd.google-apps.drawing": + obj.url = fmt.Sprintf("https://docs.google.com/drawings/d/%s/export/%s", item.Id, extension) + case "application/vnd.google-apps.document": + obj.url = fmt.Sprintf("https://docs.google.com/document/d/%s/export?format=%s", item.Id, extension) + case "application/vnd.google-apps.spreadsheet": + obj.url = fmt.Sprintf("https://docs.google.com/spreadsheets/d/%s/export?format=%s", item.Id, extension) + case "application/vnd.google-apps.presentation": + obj.url = fmt.Sprintf("https://docs.google.com/presentation/d/%s/export/%s", item.Id, extension) + } + } + obj.isDocument = true + obj.mimeType = exportMimeType + obj.bytes = -1 + return o, nil + } + return nil, nil +} + // Creates a drive.File info from the parameters passed in and a half // finished Object which must have setMetaData called on it // @@ -996,7 +1205,7 @@ func (f *Fs) MergeDirs(dirs []fs.Directory) error { for _, srcDir := range dirs[1:] { // list the the objects infos := []*drive.File{} - _, err := f.list(srcDir.ID(), "", false, false, true, func(info *drive.File) bool { + _, err := f.list([]string{srcDir.ID()}, "", false, false, true, func(info *drive.File) bool { infos = append(infos, info) return false }) @@ -1064,7 +1273,7 @@ func (f *Fs) Rmdir(dir string) error { return err } var trashedFiles = false - found, err := f.list(directoryID, "", false, false, true, func(item *drive.File) bool { + found, err := f.list([]string{directoryID}, "", false, false, true, func(item *drive.File) bool { if !item.Trashed { fs.Debugf(dir, "Rmdir: contains file: %q", item.Name) return true @@ -1597,7 +1806,7 @@ func (o *Object) readMetaData() (err error) { return err } - found, err := o.fs.list(directoryID, leaf, false, true, false, func(item *drive.File) bool { + found, err := o.fs.list([]string{directoryID}, leaf, false, true, false, func(item *drive.File) bool { if item.Name == leaf { o.setMetaData(item) return true @@ -1870,6 +2079,7 @@ var ( _ fs.ChangeNotifier = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil) _ fs.PublicLinker = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) _ fs.MergeDirser = (*Fs)(nil) _ fs.Abouter = (*Fs)(nil) _ fs.Object = (*Object)(nil)