union: implement ListR

This commit is contained in:
Max Sum 2020-05-10 22:33:06 +08:00 committed by Nick Craig-Wood
parent f21e97001b
commit 54b16bd054
1 changed files with 63 additions and 30 deletions

View File

@ -580,6 +580,67 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
return f.mergeDirEntries(entriess)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
for _, u := range f.upstreams {
if u.Features().ListR == nil {
return errors.Errorf("ListR Unsupported for branch: %s", u.Name())
}
}
var entriess [][]upstream.Entry
errs := Errors(make([]error, len(f.upstreams)))
var mutex sync.Mutex
multithread(len(f.upstreams), func(i int) {
u := f.upstreams[i]
err := u.Features().ListR(ctx, dir, func(entries fs.DirEntries) error {
uEntries := make([]upstream.Entry, len(entries))
for j, e := range entries {
uEntries[j], _ = u.WrapEntry(e)
}
mutex.Lock()
entriess = append(entriess, uEntries)
mutex.Unlock()
return nil
})
if err != nil {
errs[i] = errors.Wrap(err, u.Name())
return
}
})
if len(errs) == len(errs.FilterNil()) {
errs = errs.Map(func(e error) error {
if errors.Cause(e) == fs.ErrorDirNotFound {
return nil
}
return e
})
if len(errs) == 0 {
return fs.ErrorDirNotFound
}
return errs.Err()
}
entries, err := f.mergeDirEntries(entriess)
if err != nil {
return err
}
return callback(entries)
}
// NewObject creates a new remote union file object
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
objs := make([]*upstream.Object, len(f.upstreams))
@ -749,36 +810,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
GetTier: true,
}).Fill(f)
for _, f := range upstreams {
if !f.IsWritable() {
continue
}
features = features.Mask(f) // Mask all writable upstream fs
}
// Really need the union of all upstreams for these, so
// re-instate and calculate separately.
features.ChangeNotify = f.ChangeNotify
features.DirCacheFlush = f.DirCacheFlush
// FIXME maybe should be masking the bools here?
// Clear ChangeNotify and DirCacheFlush if all are nil
clearChangeNotify := true
clearDirCacheFlush := true
for _, u := range f.upstreams {
uFeatures := u.Features()
if uFeatures.ChangeNotify != nil {
clearChangeNotify = false
}
if uFeatures.DirCacheFlush != nil {
clearDirCacheFlush = false
}
}
if clearChangeNotify {
features.ChangeNotify = nil
}
if clearDirCacheFlush {
features.DirCacheFlush = nil
features = features.Mask(f) // Mask all upstream fs
}
f.features = features
@ -825,4 +857,5 @@ var (
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
)