From 92ec29fe3fccf039d16d140ffcc761ebd5761fb7 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 28 Mar 2014 17:56:04 +0000 Subject: [PATCH] Factor the generic code into fs and add some more intefaces --- drive/drive.go | 13 ++ fs/config.go | 24 ++- fs/fs.go | 178 +++--------------- fs/operations.go | 472 +++++++++++++++++++++++++++++++++++++++++++++++ local/local.go | 18 +- rclone.go | 403 +++++----------------------------------- s3/s3.go | 13 ++ swift/swift.go | 13 ++ 8 files changed, 630 insertions(+), 504 deletions(-) create mode 100644 fs/operations.go diff --git a/drive/drive.go b/drive/drive.go index 57ae0cb85..28406a10e 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -760,6 +760,19 @@ func (f *FsDrive) Purge() error { // ------------------------------------------------------------ +// Return the parent Fs +func (o *FsObjectDrive) Fs() fs.Fs { + return o.drive +} + +// Return a string version +func (o *FsObjectDrive) String() string { + if o == nil { + return "" + } + return o.remote +} + // Return the remote path func (o *FsObjectDrive) Remote() string { return o.remote diff --git a/fs/config.go b/fs/config.go index 6e481dae2..ebba90505 100644 --- a/fs/config.go +++ b/fs/config.go @@ -1,4 +1,5 @@ -// Read and write the config file +// Read, write and edit the config file + package fs import ( @@ -38,12 +39,14 @@ var ( checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.") transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.") configFile = pflag.StringP("config", "", ConfigPath, "Config file.") + dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes") ) // Filesystem config options type ConfigInfo struct { Verbose bool Quiet bool + DryRun bool ModifyWindow time.Duration Checkers int Transfers int @@ -67,6 +70,7 @@ func LoadConfig() { // FIXME read these from the config file too Config.Verbose = *verbose Config.Quiet = *quiet + Config.Quiet = *dryRun Config.ModifyWindow = *modifyWindow Config.Checkers = *checkers Config.Transfers = *transfers @@ -230,6 +234,22 @@ func RemoteConfig(name string) { } } +// Choose an option +func ChooseOption(o *Option) string { + fmt.Println(o.Help) + if len(o.Examples) > 0 { + var values []string + var help []string + for _, example := range o.Examples { + values = append(values, example.Value) + help = append(help, example.Help) + } + return Choose(o.Name, values, help, true) + } + fmt.Printf("%s> ", o.Name) + return ReadLine() +} + // Make a new remote func NewRemote(name string) { fmt.Printf("What type of source is it?\n") @@ -244,7 +264,7 @@ func NewRemote(name string) { log.Fatalf("Failed to find fs: %v", err) } for _, option := range fs.Options { - ConfigFile.SetValue(name, option.Name, option.Choose()) + ConfigFile.SetValue(name, option.Name, ChooseOption(&option)) } RemoteConfig(name) if OkRemote(name) { diff --git a/fs/fs.go b/fs/fs.go index 3fe3b8a3a..df5bcf08b 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -38,22 +38,6 @@ type OptionExample struct { Help string } -// Choose an option -func (o *Option) Choose() string { - fmt.Println(o.Help) - if len(o.Examples) > 0 { - var values []string - var help []string - for _, example := range o.Examples { - values = append(values, example.Value) - help = append(help, example.Help) - } - return Choose(o.Name, values, help, true) - } - fmt.Printf("%s> ", o.Name) - return ReadLine() -} - // Register a filesystem // // Fs modules should use this in an init() function @@ -92,11 +76,15 @@ type Fs interface { Precision() time.Duration } -// FIXME make f.Debugf... - // A filesystem like object which can either be a remote object or a // local file/directory type Object interface { + // String returns a description of the Object + String() string + + // Fs returns the Fs that this object is part of + Fs() Fs + // Remote returns the remote path Remote() string @@ -137,6 +125,14 @@ type ObjectsChan chan Object // A slice of Objects type Objects []Object +// A pair of Objects +type ObjectPair struct { + src, dst Object +} + +// A channel of ObjectPair +type ObjectPairChan chan ObjectPair + // A structure of directory/container/bucket lists type Dir struct { Name string // name of the directory @@ -186,19 +182,27 @@ func NewFs(path string) (Fs, error) { return fs.NewFs(configName, fsPath) } -// Write debuging output for this Object -func Debug(fs Object, text string, args ...interface{}) { +// Outputs log for object +func OutputLog(o interface{}, text string, args ...interface{}) { + description := "" + if x, ok := o.(fmt.Stringer); ok { + description = x.String() + ": " + } + out := fmt.Sprintf(text, args...) + log.Print(description + out) +} + +// Write debuging output for this Object or Fs +func Debug(o interface{}, text string, args ...interface{}) { if Config.Verbose { - out := fmt.Sprintf(text, args...) - log.Printf("%s: %s", fs.Remote(), out) + OutputLog(o, text, args...) } } -// Write log output for this Object -func Log(fs Object, text string, args ...interface{}) { +// Write log output for this Object or Fs +func Log(o interface{}, text string, args ...interface{}) { if !Config.Quiet { - out := fmt.Sprintf(text, args...) - log.Printf("%s: %s", fs.Remote(), out) + OutputLog(o, text, args...) } } @@ -210,125 +214,3 @@ func checkClose(c io.Closer, err *error) { *err = cerr } } - -// Work out modify window for fses passed in - sets Config.ModifyWindow -// -// This is the largest modify window of all the fses in use, and the -// user configured value -func CalculateModifyWindow(fs ...Fs) { - for _, f := range fs { - if f != nil { - precision := f.Precision() - if precision > Config.ModifyWindow { - Config.ModifyWindow = precision - } - } - } - if Config.Verbose { - log.Printf("Modify window is %s\n", Config.ModifyWindow) - } -} - -// Check the two files to see if the MD5sums are the same -// -// May return an error which will already have been logged -// -// If an error is returned it will return false -func CheckMd5sums(src, dst Object) (bool, error) { - srcMd5, err := src.Md5sum() - if err != nil { - Stats.Error() - Log(src, "Failed to calculate src md5: %s", err) - return false, err - } - dstMd5, err := dst.Md5sum() - if err != nil { - Stats.Error() - Log(dst, "Failed to calculate dst md5: %s", err) - return false, err - } - // Debug("Src MD5 %s", srcMd5) - // Debug("Dst MD5 %s", obj.Hash) - return srcMd5 == dstMd5, nil -} - -// Checks to see if the src and dst objects are equal by looking at -// size, mtime and MD5SUM -// -// If the src and dst size are different then it is considered to be -// not equal. -// -// If the size is the same and the mtime is the same then it is -// considered to be equal. This is the heuristic rsync uses when -// not using --checksum. -// -// If the size is the same and and mtime is different or unreadable -// and the MD5SUM is the same then the file is considered to be equal. -// In this case the mtime on the dst is updated. -// -// Otherwise the file is considered to be not equal including if there -// were errors reading info. -func Equal(src, dst Object) bool { - if src.Size() != dst.Size() { - Debug(src, "Sizes differ") - return false - } - - // Size the same so check the mtime - srcModTime := src.ModTime() - dstModTime := dst.ModTime() - dt := dstModTime.Sub(srcModTime) - ModifyWindow := Config.ModifyWindow - if dt >= ModifyWindow || dt <= -ModifyWindow { - Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime) - } else { - Debug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow) - return true - } - - // mtime is unreadable or different but size is the same so - // check the MD5SUM - same, _ := CheckMd5sums(src, dst) - if !same { - Debug(src, "Md5sums differ") - return false - } - - // Size and MD5 the same but mtime different so update the - // mtime of the dst object here - dst.SetModTime(srcModTime) - - Debug(src, "Size and MD5SUM of src and dst objects identical") - return true -} - -// Copy src object to f -func Copy(f Fs, src Object) { - in0, err := src.Open() - if err != nil { - Stats.Error() - Log(src, "Failed to open: %s", err) - return - } - in := NewAccount(in0) // account the transfer - - dst, err := f.Put(in, src.Remote(), src.ModTime(), src.Size()) - inErr := in.Close() - if err == nil { - err = inErr - } - if err != nil { - Stats.Error() - Log(src, "Failed to copy: %s", err) - if dst != nil { - Debug(dst, "Removing failed copy") - removeErr := dst.Remove() - if removeErr != nil { - Stats.Error() - Log(dst, "Failed to remove failed copy: %s", removeErr) - } - } - return - } - Debug(src, "Copied") -} diff --git a/fs/operations.go b/fs/operations.go new file mode 100644 index 000000000..02930b186 --- /dev/null +++ b/fs/operations.go @@ -0,0 +1,472 @@ +// Generic operations on filesystems and objects + +package fs + +import ( + "fmt" + "log" + "sync" +) + +// Work out modify window for fses passed in - sets Config.ModifyWindow +// +// This is the largest modify window of all the fses in use, and the +// user configured value +func CalculateModifyWindow(fs ...Fs) { + for _, f := range fs { + if f != nil { + precision := f.Precision() + if precision > Config.ModifyWindow { + Config.ModifyWindow = precision + } + } + } + Debug(fs[0], "Modify window is %s\n", Config.ModifyWindow) +} + +// Check the two files to see if the MD5sums are the same +// +// May return an error which will already have been logged +// +// If an error is returned it will return false +func CheckMd5sums(src, dst Object) (bool, error) { + srcMd5, err := src.Md5sum() + if err != nil { + Stats.Error() + Log(src, "Failed to calculate src md5: %s", err) + return false, err + } + dstMd5, err := dst.Md5sum() + if err != nil { + Stats.Error() + Log(dst, "Failed to calculate dst md5: %s", err) + return false, err + } + // Debug("Src MD5 %s", srcMd5) + // Debug("Dst MD5 %s", obj.Hash) + return srcMd5 == dstMd5, nil +} + +// Checks to see if the src and dst objects are equal by looking at +// size, mtime and MD5SUM +// +// If the src and dst size are different then it is considered to be +// not equal. +// +// If the size is the same and the mtime is the same then it is +// considered to be equal. This is the heuristic rsync uses when +// not using --checksum. +// +// If the size is the same and and mtime is different or unreadable +// and the MD5SUM is the same then the file is considered to be equal. +// In this case the mtime on the dst is updated. +// +// Otherwise the file is considered to be not equal including if there +// were errors reading info. +func Equal(src, dst Object) bool { + if src.Size() != dst.Size() { + Debug(src, "Sizes differ") + return false + } + + // Size the same so check the mtime + srcModTime := src.ModTime() + dstModTime := dst.ModTime() + dt := dstModTime.Sub(srcModTime) + ModifyWindow := Config.ModifyWindow + if dt >= ModifyWindow || dt <= -ModifyWindow { + Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime) + } else { + Debug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow) + return true + } + + // mtime is unreadable or different but size is the same so + // check the MD5SUM + same, _ := CheckMd5sums(src, dst) + if !same { + Debug(src, "Md5sums differ") + return false + } + + // Size and MD5 the same but mtime different so update the + // mtime of the dst object here + dst.SetModTime(srcModTime) + + Debug(src, "Size and MD5SUM of src and dst objects identical") + return true +} + +// Copy src object to f +func Copy(f Fs, src Object) { + in0, err := src.Open() + if err != nil { + Stats.Error() + Log(src, "Failed to open: %s", err) + return + } + in := NewAccount(in0) // account the transfer + + dst, err := f.Put(in, src.Remote(), src.ModTime(), src.Size()) + inErr := in.Close() + if err == nil { + err = inErr + } + if err != nil { + Stats.Error() + Log(src, "Failed to copy: %s", err) + if dst != nil { + Debug(dst, "Removing failed copy") + removeErr := dst.Remove() + if removeErr != nil { + Stats.Error() + Log(dst, "Failed to remove failed copy: %s", removeErr) + } + } + return + } + Debug(src, "Copied") +} + +// Check to see if src needs to be copied to dst and if so puts it in out +func checkOne(src, dst Object, out ObjectsChan) { + if dst == nil { + Debug(src, "Couldn't find local file - download") + out <- src + return + } + // Check to see if can store this + if !src.Storable() { + return + } + // Check to see if changed or not + if Equal(src, dst) { + Debug(src, "Unchanged skipping") + return + } + out <- src +} + +// Read FsObjects~s on in send to out if they need uploading +// +// FIXME potentially doing lots of MD5SUMS at once +func PairChecker(in ObjectPairChan, out ObjectsChan, wg *sync.WaitGroup) { + defer wg.Done() + for pair := range in { + src := pair.src + Stats.Checking(src) + checkOne(src, pair.dst, out) + Stats.DoneChecking(src) + } +} + +// Read FsObjects~s on in send to out if they need uploading +// +// FIXME potentially doing lots of MD5SUMS at once +func Checker(in, out ObjectsChan, fdst Fs, wg *sync.WaitGroup) { + defer wg.Done() + for src := range in { + Stats.Checking(src) + dst := fdst.NewFsObject(src.Remote()) + checkOne(src, dst, out) + Stats.DoneChecking(src) + } +} + +// Read FsObjects on in and copy them +func Copier(in ObjectsChan, fdst Fs, wg *sync.WaitGroup) { + defer wg.Done() + for src := range in { + Stats.Transferring(src) + Copy(fdst, src) + Stats.DoneTransferring(src) + } +} + +// Copies fsrc into fdst +func CopyFs(fdst, fsrc Fs) error { + err := fdst.Mkdir() + if err != nil { + Stats.Error() + return err + } + + to_be_checked := fsrc.List() + to_be_uploaded := make(ObjectsChan, Config.Transfers) + + var checkerWg sync.WaitGroup + checkerWg.Add(Config.Checkers) + for i := 0; i < Config.Checkers; i++ { + go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg) + } + + var copierWg sync.WaitGroup + copierWg.Add(Config.Transfers) + for i := 0; i < Config.Transfers; i++ { + go Copier(to_be_uploaded, fdst, &copierWg) + } + + Log(fdst, "Waiting for checks to finish") + checkerWg.Wait() + close(to_be_uploaded) + Log(fdst, "Waiting for transfers to finish") + copierWg.Wait() + + return nil +} + +// Delete all the files passed in the channel +func DeleteFiles(to_be_deleted ObjectsChan) { + var wg sync.WaitGroup + wg.Add(Config.Transfers) + var fs Fs + for i := 0; i < Config.Transfers; i++ { + go func() { + defer wg.Done() + for dst := range to_be_deleted { + fs = dst.Fs() + if Config.DryRun { + Debug(dst, "Not deleting as --dry-run") + } else { + Stats.Checking(dst) + err := dst.Remove() + Stats.DoneChecking(dst) + if err != nil { + Stats.Error() + Log(dst, "Couldn't delete: %s", err) + } else { + Debug(dst, "Deleted") + } + } + } + }() + } + + Log(fs, "Waiting for deletions to finish") + wg.Wait() +} + +// Syncs fsrc into fdst +func Sync(fdst, fsrc Fs) error { + err := fdst.Mkdir() + if err != nil { + Stats.Error() + return err + } + + Log(fdst, "Building file list") + + // Read the destination files first + // FIXME could do this in parallel and make it use less memory + delFiles := make(map[string]Object) + for dst := range fdst.List() { + delFiles[dst.Remote()] = dst + } + + // Read source files checking them off against dest files + to_be_checked := make(ObjectPairChan, Config.Transfers) + to_be_uploaded := make(ObjectsChan, Config.Transfers) + + var checkerWg sync.WaitGroup + checkerWg.Add(Config.Checkers) + for i := 0; i < Config.Checkers; i++ { + go PairChecker(to_be_checked, to_be_uploaded, &checkerWg) + } + + var copierWg sync.WaitGroup + copierWg.Add(Config.Transfers) + for i := 0; i < Config.Transfers; i++ { + go Copier(to_be_uploaded, fdst, &copierWg) + } + + go func() { + for src := range fsrc.List() { + remote := src.Remote() + dst, found := delFiles[remote] + if found { + delete(delFiles, remote) + to_be_checked <- ObjectPair{src, dst} + } else { + // No need to check doesn't exist + to_be_uploaded <- src + } + } + close(to_be_checked) + }() + + Log(fdst, "Waiting for checks to finish") + checkerWg.Wait() + close(to_be_uploaded) + Log(fdst, "Waiting for transfers to finish") + copierWg.Wait() + + if Stats.Errored() { + Log(fdst, "Not deleting files as there were IO errors") + return nil + } + + // Delete the spare files + toDelete := make(ObjectsChan, Config.Transfers) + go func() { + for _, fs := range delFiles { + toDelete <- fs + } + close(toDelete) + }() + DeleteFiles(toDelete) + return nil +} + +// Checks the files in fsrc and fdst according to Size and MD5SUM +func Check(fdst, fsrc Fs) error { + Log(fdst, "Building file list") + + // Read the destination files first + // FIXME could do this in parallel and make it use less memory + dstFiles := make(map[string]Object) + for dst := range fdst.List() { + dstFiles[dst.Remote()] = dst + } + + // Read the source files checking them against dstFiles + // FIXME could do this in parallel and make it use less memory + srcFiles := make(map[string]Object) + commonFiles := make(map[string][]Object) + for src := range fsrc.List() { + remote := src.Remote() + if dst, ok := dstFiles[remote]; ok { + commonFiles[remote] = []Object{dst, src} + delete(dstFiles, remote) + } else { + srcFiles[remote] = src + } + } + + Log(fdst, "%d files not in %v", len(dstFiles), fsrc) + for _, dst := range dstFiles { + Stats.Error() + Log(dst, "File not in %v", fsrc) + } + + Log(fsrc, "%d files not in %s", len(srcFiles), fdst) + for _, src := range srcFiles { + Stats.Error() + Log(src, "File not in %v", fdst) + } + + checks := make(chan []Object, Config.Transfers) + go func() { + for _, check := range commonFiles { + checks <- check + } + close(checks) + }() + + var checkerWg sync.WaitGroup + checkerWg.Add(Config.Checkers) + for i := 0; i < Config.Checkers; i++ { + go func() { + defer checkerWg.Done() + for check := range checks { + dst, src := check[0], check[1] + Stats.Checking(src) + if src.Size() != dst.Size() { + Stats.DoneChecking(src) + Stats.Error() + Log(src, "Sizes differ") + continue + } + same, err := CheckMd5sums(src, dst) + Stats.DoneChecking(src) + if err != nil { + continue + } + if !same { + Stats.Error() + Log(src, "Md5sums differ") + } + Debug(src, "OK") + } + }() + } + + Log(fdst, "Waiting for checks to finish") + checkerWg.Wait() + Log(fdst, "%d differences found", Stats.GetErrors()) + if Stats.GetErrors() > 0 { + return fmt.Errorf("%d differences found", Stats.GetErrors()) + } + return nil +} + +// List the Fs to stdout +// +// Lists in parallel which may get them out of order +func List(f Fs) error { + in := f.List() + var wg sync.WaitGroup + wg.Add(Config.Checkers) + for i := 0; i < Config.Checkers; i++ { + go func() { + defer wg.Done() + for o := range in { + Stats.Checking(o) + modTime := o.ModTime() + Stats.DoneChecking(o) + fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote()) + } + }() + } + wg.Wait() + return nil +} + +// List the directories/buckets/containers in the Fs to stdout +func ListDir(f Fs) error { + for dir := range f.ListDir() { + fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name) + } + return nil +} + +// Makes a destination directory or container +func Mkdir(f Fs) error { + err := f.Mkdir() + if err != nil { + Stats.Error() + return err + } + return nil +} + +// Removes a container but not if not empty +func Rmdir(f Fs) error { + if Config.DryRun { + Log(f, "Not deleting as dry run is set") + } else { + err := f.Rmdir() + if err != nil { + Stats.Error() + return err + } + } + return nil +} + +// Removes a container and all of its contents +// +// FIXME doesn't delete local directories +func Purge(f Fs) error { + if purger, ok := f.(Purger); ok { + err := purger.Purge() + if err != nil { + Stats.Error() + return err + } + } else { + DeleteFiles(f.List()) + log.Printf("Deleting path") + Rmdir(f) + } + return nil +} diff --git a/local/local.go b/local/local.go index b0b3a9af9..9b13cf0dd 100644 --- a/local/local.go +++ b/local/local.go @@ -33,6 +33,7 @@ type FsLocal struct { // FsObjectLocal represents a local filesystem object type FsObjectLocal struct { + local fs.Fs // The Fs this object is part of remote string // The remote path path string // The local path info os.FileInfo // Interface for file info @@ -57,7 +58,7 @@ func (f *FsLocal) String() string { // May return nil if an error occurred func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) fs.Object { path := filepath.Join(f.root, remote) - o := &FsObjectLocal{remote: remote, path: path} + o := &FsObjectLocal{local: f, remote: remote, path: path} if info != nil { o.info = info } else { @@ -162,7 +163,7 @@ func (f *FsLocal) ListDir() fs.DirChan { func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) { dstPath := filepath.Join(f.root, remote) // Temporary FsObject under construction - fs := &FsObjectLocal{remote: remote, path: dstPath} + fs := &FsObjectLocal{local: f, remote: remote, path: dstPath} dir := path.Dir(dstPath) err := os.MkdirAll(dir, 0770) @@ -260,6 +261,19 @@ func (f *FsLocal) readPrecision() (precision time.Duration) { // ------------------------------------------------------------ +// Return the parent Fs +func (o *FsObjectLocal) Fs() fs.Fs { + return o.local +} + +// Return a string version +func (o *FsObjectLocal) String() string { + if o == nil { + return "" + } + return o.remote +} + // Return the remote path func (o *FsObjectLocal) Remote() string { return o.remote diff --git a/rclone.go b/rclone.go index 8bd80a0cc..06dbb0325 100644 --- a/rclone.go +++ b/rclone.go @@ -10,7 +10,6 @@ import ( "runtime" "runtime/pprof" "strings" - "sync" "time" "github.com/ogier/pflag" @@ -27,351 +26,9 @@ import ( var ( // Flags cpuprofile = pflag.StringP("cpuprofile", "", "", "Write cpu profile to file") - dry_run = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes") statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats") ) -// A pair of fs.Objects -type PairFsObjects struct { - src, dst fs.Object -} - -type PairFsObjectsChan chan PairFsObjects - -// Check to see if src needs to be copied to dst and if so puts it in out -func checkOne(src, dst fs.Object, out fs.ObjectsChan) { - if dst == nil { - fs.Debug(src, "Couldn't find local file - download") - out <- src - return - } - // Check to see if can store this - if !src.Storable() { - return - } - // Check to see if changed or not - if fs.Equal(src, dst) { - fs.Debug(src, "Unchanged skipping") - return - } - out <- src -} - -// Read FsObjects~s on in send to out if they need uploading -// -// FIXME potentially doing lots of MD5SUMS at once -func PairChecker(in PairFsObjectsChan, out fs.ObjectsChan, wg *sync.WaitGroup) { - defer wg.Done() - for pair := range in { - src := pair.src - fs.Stats.Checking(src) - checkOne(src, pair.dst, out) - fs.Stats.DoneChecking(src) - } -} - -// Read FsObjects~s on in send to out if they need uploading -// -// FIXME potentially doing lots of MD5SUMS at once -func Checker(in, out fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) { - defer wg.Done() - for src := range in { - fs.Stats.Checking(src) - dst := fdst.NewFsObject(src.Remote()) - checkOne(src, dst, out) - fs.Stats.DoneChecking(src) - } -} - -// Read FsObjects on in and copy them -func Copier(in fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) { - defer wg.Done() - for src := range in { - fs.Stats.Transferring(src) - fs.Copy(fdst, src) - fs.Stats.DoneTransferring(src) - } -} - -// Copies fsrc into fdst -func CopyFs(fdst, fsrc fs.Fs) { - err := fdst.Mkdir() - if err != nil { - fs.Stats.Error() - log.Fatal("Failed to make destination") - } - - to_be_checked := fsrc.List() - to_be_uploaded := make(fs.ObjectsChan, fs.Config.Transfers) - - var checkerWg sync.WaitGroup - checkerWg.Add(fs.Config.Checkers) - for i := 0; i < fs.Config.Checkers; i++ { - go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg) - } - - var copierWg sync.WaitGroup - copierWg.Add(fs.Config.Transfers) - for i := 0; i < fs.Config.Transfers; i++ { - go Copier(to_be_uploaded, fdst, &copierWg) - } - - log.Printf("Waiting for checks to finish") - checkerWg.Wait() - close(to_be_uploaded) - log.Printf("Waiting for transfers to finish") - copierWg.Wait() -} - -// Delete all the files passed in the channel -func DeleteFiles(to_be_deleted fs.ObjectsChan) { - var wg sync.WaitGroup - wg.Add(fs.Config.Transfers) - for i := 0; i < fs.Config.Transfers; i++ { - go func() { - defer wg.Done() - for dst := range to_be_deleted { - if *dry_run { - fs.Debug(dst, "Not deleting as --dry-run") - } else { - fs.Stats.Checking(dst) - err := dst.Remove() - fs.Stats.DoneChecking(dst) - if err != nil { - fs.Stats.Error() - fs.Log(dst, "Couldn't delete: %s", err) - } else { - fs.Debug(dst, "Deleted") - } - } - } - }() - } - - log.Printf("Waiting for deletions to finish") - wg.Wait() -} - -// Syncs fsrc into fdst -func Sync(fdst, fsrc fs.Fs) { - err := fdst.Mkdir() - if err != nil { - fs.Stats.Error() - log.Fatal("Failed to make destination") - } - - log.Printf("Building file list") - - // Read the destination files first - // FIXME could do this in parallel and make it use less memory - delFiles := make(map[string]fs.Object) - for dst := range fdst.List() { - delFiles[dst.Remote()] = dst - } - - // Read source files checking them off against dest files - to_be_checked := make(PairFsObjectsChan, fs.Config.Transfers) - to_be_uploaded := make(fs.ObjectsChan, fs.Config.Transfers) - - var checkerWg sync.WaitGroup - checkerWg.Add(fs.Config.Checkers) - for i := 0; i < fs.Config.Checkers; i++ { - go PairChecker(to_be_checked, to_be_uploaded, &checkerWg) - } - - var copierWg sync.WaitGroup - copierWg.Add(fs.Config.Transfers) - for i := 0; i < fs.Config.Transfers; i++ { - go Copier(to_be_uploaded, fdst, &copierWg) - } - - go func() { - for src := range fsrc.List() { - remote := src.Remote() - dst, found := delFiles[remote] - if found { - delete(delFiles, remote) - to_be_checked <- PairFsObjects{src, dst} - } else { - // No need to check doesn't exist - to_be_uploaded <- src - } - } - close(to_be_checked) - }() - - log.Printf("Waiting for checks to finish") - checkerWg.Wait() - close(to_be_uploaded) - log.Printf("Waiting for transfers to finish") - copierWg.Wait() - - if fs.Stats.Errored() { - log.Printf("Not deleting files as there were IO errors") - return - } - - // Delete the spare files - toDelete := make(fs.ObjectsChan, fs.Config.Transfers) - go func() { - for _, fs := range delFiles { - toDelete <- fs - } - close(toDelete) - }() - DeleteFiles(toDelete) -} - -// Checks the files in fsrc and fdst according to Size and MD5SUM -func Check(fdst, fsrc fs.Fs) { - log.Printf("Building file list") - - // Read the destination files first - // FIXME could do this in parallel and make it use less memory - dstFiles := make(map[string]fs.Object) - for dst := range fdst.List() { - dstFiles[dst.Remote()] = dst - } - - // Read the source files checking them against dstFiles - // FIXME could do this in parallel and make it use less memory - srcFiles := make(map[string]fs.Object) - commonFiles := make(map[string][]fs.Object) - for src := range fsrc.List() { - remote := src.Remote() - if dst, ok := dstFiles[remote]; ok { - commonFiles[remote] = []fs.Object{dst, src} - delete(dstFiles, remote) - } else { - srcFiles[remote] = src - } - } - - log.Printf("Files in %s but not in %s", fdst, fsrc) - for remote := range dstFiles { - fs.Stats.Error() - log.Printf(remote) - } - - log.Printf("Files in %s but not in %s", fsrc, fdst) - for remote := range srcFiles { - fs.Stats.Error() - log.Printf(remote) - } - - checks := make(chan []fs.Object, fs.Config.Transfers) - go func() { - for _, check := range commonFiles { - checks <- check - } - close(checks) - }() - - var checkerWg sync.WaitGroup - checkerWg.Add(fs.Config.Checkers) - for i := 0; i < fs.Config.Checkers; i++ { - go func() { - defer checkerWg.Done() - for check := range checks { - dst, src := check[0], check[1] - fs.Stats.Checking(src) - if src.Size() != dst.Size() { - fs.Stats.DoneChecking(src) - fs.Stats.Error() - fs.Log(src, "Sizes differ") - continue - } - same, err := fs.CheckMd5sums(src, dst) - fs.Stats.DoneChecking(src) - if err != nil { - continue - } - if !same { - fs.Stats.Error() - fs.Log(src, "Md5sums differ") - } - fs.Debug(src, "OK") - } - }() - } - - log.Printf("Waiting for checks to finish") - checkerWg.Wait() - log.Printf("%d differences found", fs.Stats.GetErrors()) -} - -// List the Fs to stdout -// -// Lists in parallel which may get them out of order -func List(f, _ fs.Fs) { - in := f.List() - var wg sync.WaitGroup - wg.Add(fs.Config.Checkers) - for i := 0; i < fs.Config.Checkers; i++ { - go func() { - defer wg.Done() - for o := range in { - fs.Stats.Checking(o) - modTime := o.ModTime() - fs.Stats.DoneChecking(o) - fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote()) - } - }() - } - wg.Wait() -} - -// List the directories/buckets/containers in the Fs to stdout -func ListDir(f, _ fs.Fs) { - for dir := range f.ListDir() { - fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name) - } -} - -// Makes a destination directory or container -func mkdir(fdst, fsrc fs.Fs) { - err := fdst.Mkdir() - if err != nil { - fs.Stats.Error() - log.Fatalf("Mkdir failed: %s", err) - } -} - -// Removes a container but not if not empty -func rmdir(fdst, fsrc fs.Fs) { - if *dry_run { - log.Printf("Not deleting %s as --dry-run", fdst) - } else { - err := fdst.Rmdir() - if err != nil { - fs.Stats.Error() - log.Fatalf("Rmdir failed: %s", err) - } - } -} - -// Removes a container and all of its contents -// -// FIXME doesn't delete local directories -func purge(fdst, fsrc fs.Fs) { - if f, ok := fdst.(fs.Purger); ok { - err := f.Purge() - if err != nil { - fs.Stats.Error() - log.Fatalf("Purge failed: %s", err) - } - } else { - DeleteFiles(fdst.List()) - log.Printf("Deleting path") - rmdir(fdst, fsrc) - } -} - -// Edits the config file -func EditConfig(fdst, fsrc fs.Fs) { - fs.EditConfig() -} - type Command struct { Name string Help string @@ -403,7 +60,12 @@ var Commands = []Command{ Copy the source to the destination. Doesn't transfer unchanged files, testing first by modification time then by MD5SUM. Doesn't delete files from the destination.`, - Run: CopyFs, + Run: func(fdst, fsrc fs.Fs) { + err := fs.CopyFs(fdst, fsrc) + if err != nil { + log.Fatalf("Failed to copy: %v", err) + } + }, MinArgs: 2, MaxArgs: 2, }, @@ -416,7 +78,12 @@ var Commands = []Command{ MD5SUM. Deletes any files that exist in source that don't exist in destination. Since this can cause data loss, test first with the --dry-run flag.`, - Run: Sync, + Run: func(fdst, fsrc fs.Fs) { + err := fs.Sync(fdst, fsrc) + if err != nil { + log.Fatalf("Failed to sync: %v", err) + } + }, MinArgs: 2, MaxArgs: 2, }, @@ -425,7 +92,12 @@ var Commands = []Command{ ArgsHelp: "[remote://path]", Help: ` List all the objects in the the path.`, - Run: List, + Run: func(fdst, fsrc fs.Fs) { + err := fs.List(fdst) + if err != nil { + log.Fatalf("Failed to list: %v", err) + } + }, MinArgs: 1, MaxArgs: 1, }, @@ -434,7 +106,12 @@ var Commands = []Command{ ArgsHelp: "[remote://path]", Help: ` List all directoryes/objects/buckets in the the path.`, - Run: ListDir, + Run: func(fdst, fsrc fs.Fs) { + err := fs.ListDir(fdst) + if err != nil { + log.Fatalf("Failed to listdir: %v", err) + } + }, MinArgs: 1, MaxArgs: 1, }, @@ -443,7 +120,12 @@ var Commands = []Command{ ArgsHelp: "remote://path", Help: ` Make the path if it doesn't already exist`, - Run: mkdir, + Run: func(fdst, fsrc fs.Fs) { + err := fs.Mkdir(fdst) + if err != nil { + log.Fatalf("Failed to mkdir: %v", err) + } + }, MinArgs: 1, MaxArgs: 1, }, @@ -453,7 +135,12 @@ var Commands = []Command{ Help: ` Remove the path. Note that you can't remove a path with objects in it, use purge for that.`, - Run: rmdir, + Run: func(fdst, fsrc fs.Fs) { + err := fs.Rmdir(fdst) + if err != nil { + log.Fatalf("Failed to rmdir: %v", err) + } + }, MinArgs: 1, MaxArgs: 1, }, @@ -462,7 +149,12 @@ var Commands = []Command{ ArgsHelp: "remote://path", Help: ` Remove the path and all of its contents.`, - Run: purge, + Run: func(fdst, fsrc fs.Fs) { + err := fs.Purge(fdst) + if err != nil { + log.Fatalf("Failed to purge: %v", err) + } + }, MinArgs: 1, MaxArgs: 1, }, @@ -473,7 +165,12 @@ var Commands = []Command{ Checks the files in the source and destination match. It compares sizes and MD5SUMs and prints a report of files which don't match. It doesn't alter the source or destination.`, - Run: Check, + Run: func(fdst, fsrc fs.Fs) { + err := fs.Check(fdst, fsrc) + if err != nil { + log.Fatalf("Failed to check: %v", err) + } + }, MinArgs: 2, MaxArgs: 2, }, @@ -481,7 +178,9 @@ var Commands = []Command{ Name: "config", Help: ` Enter an interactive configuration session.`, - Run: EditConfig, + Run: func(fdst, fsrc fs.Fs) { + fs.EditConfig() + }, NoStats: true, }, { diff --git a/s3/s3.go b/s3/s3.go index 36689dff4..b2c7f0c03 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -331,6 +331,19 @@ func (f *FsS3) Precision() time.Duration { // ------------------------------------------------------------ +// Return the parent Fs +func (o *FsObjectS3) Fs() fs.Fs { + return o.s3 +} + +// Return a string version +func (o *FsObjectS3) String() string { + if o == nil { + return "" + } + return o.remote +} + // Return the remote path func (o *FsObjectS3) Remote() string { return o.remote diff --git a/swift/swift.go b/swift/swift.go index 8a92421cd..65604ecfa 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -245,6 +245,19 @@ func (fs *FsSwift) Precision() time.Duration { // ------------------------------------------------------------ +// Return the parent Fs +func (o *FsObjectSwift) Fs() fs.Fs { + return o.swift +} + +// Return a string version +func (o *FsObjectSwift) String() string { + if o == nil { + return "" + } + return o.remote +} + // Return the remote path func (o *FsObjectSwift) Remote() string { return o.remote