From f8246b5a7ecae9abbdcedc81d5d69cd57d58165c Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 18 Jan 2013 23:21:02 +0000 Subject: [PATCH] Implement Precision interface in all filesystems * drive: add mime type on upload * drive: correct info on upload to fix crash * local: measure time precision by making a local file and using Chtimes * swift: move snet parameter here * core: add -modify-window and compute the optimum one * core: use modify window when checking times --- fs.go | 10 ++++++--- fs_drive.go | 25 +++++++++++++++++++-- fs_local.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++- fs_s3.go | 5 +++++ fs_swift.go | 6 +++++ notes.txt | 9 ++++---- swiftsync.go | 20 ++++++++++++++++- 7 files changed, 127 insertions(+), 11 deletions(-) diff --git a/fs.go b/fs.go index bfca0d3fd..7b167e526 100644 --- a/fs.go +++ b/fs.go @@ -32,6 +32,9 @@ type Fs interface { // Remove the directory (container, bucket) if empty Rmdir() error + + // Precision of the ModTimes in this Fs + Precision() time.Duration } // FIXME make f.Debugf... @@ -168,10 +171,11 @@ func Equal(src, dst FsObject) bool { // Size the same so check the mtime srcModTime := src.ModTime() dstModTime := dst.ModTime() - if !dstModTime.Equal(srcModTime) { - FsDebug(src, "Modification times differ: %v, %v", srcModTime, dstModTime) + dt := dstModTime.Sub(srcModTime) + if dt >= *modifyWindow || dt <= -*modifyWindow { + FsDebug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime) } else { - FsDebug(src, "Size and modification time the same") + FsDebug(src, "Size and modification time differ by %s (within %s)", dt, *modifyWindow) return true } diff --git a/fs_drive.go b/fs_drive.go index 8dc080f76..13c8635af 100644 --- a/fs_drive.go +++ b/fs_drive.go @@ -1,8 +1,15 @@ // Drive interface package main +// FIXME drive code is leaking goroutines somehow + +// FIXME use recursive listing not bound to directory for speed? + // FIXME list containers equivalent should list directories? +// FIXME list directory should list to channel for concurrency not +// append to array + // FIXME drive times only accurate to 1 ms (3 decimal places) // FIXME perhaps have a drive setup mode where we ask for all the @@ -24,8 +31,10 @@ import ( "fmt" "io" "log" + "mime" "net/http" "os" + "path" "regexp" "strings" "sync" @@ -458,12 +467,18 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 return nil, fmt.Errorf("Couldn't find or make directory: %s", err) } + // Guess the mime type + mimeType := mime.TypeByExtension(path.Ext(remote)) + if mimeType == "" { + mimeType = "application/octet-stream" + } + // Define the metadata for the file we are going to create. info := &drive.File{ Title: leaf, Description: leaf, Parents: []*drive.ParentReference{{Id: directoryId}}, - // FIXME set mimeType: + MimeType: mimeType, } // FIXME can't set modified date on initial upload as no @@ -483,12 +498,13 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 if err != nil { return nil, fmt.Errorf("Upload failed: %s", err) } + fs.info = info // Set modified date info.ModifiedDate = modTime.Format(time.RFC3339Nano) _, err = f.svc.Files.Update(info.Id, info).SetModifiedDate(true).Do() if err != nil { - return nil, fmt.Errorf("Failed to set mtime: %s", err) + return fs, fmt.Errorf("Failed to set mtime: %s", err) } return fs, nil } @@ -523,6 +539,11 @@ func (f *FsDrive) Rmdir() error { return nil } +// Return the precision +func (fs *FsDrive) Precision() time.Duration { + return time.Millisecond +} + // Purge deletes all the files and the container // // Returns an error if it isn't empty diff --git a/fs_local.go b/fs_local.go index ba9d8e8b6..33036fd5d 100644 --- a/fs_local.go +++ b/fs_local.go @@ -5,16 +5,20 @@ import ( "crypto/md5" "fmt" "io" + "io/ioutil" "log" "os" "path" "path/filepath" + "sync" "time" ) // FsLocal represents a local filesystem rooted at root type FsLocal struct { - root string + root string // The root directory + precisionOk sync.Once // Whether we need to read the precision + precision time.Duration // precision of local filesystem } // FsObjectLocal represents a local filesystem object @@ -144,6 +148,63 @@ func (f *FsLocal) Rmdir() error { return os.Remove(f.root) } +// Return the precision +func (f *FsLocal) Precision() (precision time.Duration) { + f.precisionOk.Do(func() { + f.precision = f.readPrecision() + }) + return f.precision +} + +// Read the precision +func (f *FsLocal) readPrecision() (precision time.Duration) { + // Default precision of 1s + precision = time.Second + + // Create temporary file and test it + fd, err := ioutil.TempFile("", "swiftsync") + if err != nil { + // If failed return 1s + // fmt.Println("Failed to create temp file", err) + return time.Second + } + path := fd.Name() + // fmt.Println("Created temp file", path) + fd.Close() + + // Delete it on return + defer func() { + // fmt.Println("Remove temp file") + os.Remove(path) + }() + + // Find the minimum duration we can detect + for duration := time.Duration(1); duration < time.Second; duration *= 10 { + // Current time with delta + t := time.Unix(time.Now().Unix(), int64(duration)) + err := Chtimes(path, t, t) + if err != nil { + // fmt.Println("Failed to Chtimes", err) + break + } + + // Read the actual time back + fi, err := os.Stat(path) + if err != nil { + // fmt.Println("Failed to Stat", err) + break + } + + // If it matches - have found the precision + // fmt.Println("compare", fi.ModTime(), t) + if fi.ModTime() == t { + // fmt.Println("Precision detected as", duration) + return duration + } + } + return +} + // ------------------------------------------------------------ // Return the remote path diff --git a/fs_s3.go b/fs_s3.go index 8f0586888..c9ef02607 100644 --- a/fs_s3.go +++ b/fs_s3.go @@ -244,6 +244,11 @@ func (f *FsS3) Rmdir() error { return f.b.DelBucket() } +// Return the precision +func (fs *FsS3) Precision() time.Duration { + return time.Nanosecond +} + // ------------------------------------------------------------ // Return the remote path diff --git a/fs_swift.go b/fs_swift.go index 844694f8f..8db951d36 100644 --- a/fs_swift.go +++ b/fs_swift.go @@ -39,6 +39,7 @@ var ( authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.") userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.") apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") + snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented ) // String converts this FsSwift to a string @@ -200,6 +201,11 @@ func (f *FsSwift) Rmdir() error { return f.c.ContainerDelete(f.container) } +// Return the precision +func (fs *FsSwift) Precision() time.Duration { + return time.Nanosecond +} + // ------------------------------------------------------------ // Return the remote path diff --git a/notes.txt b/notes.txt index 44eb1854e..101a37d64 100644 --- a/notes.txt +++ b/notes.txt @@ -1,7 +1,6 @@ Todo * Make a test suite which can run on all the given types of fs * Copy should use the sync code as it is more efficient at directory listing - * Drive needs a modify window of 1ms * Factor fses into own packages * FIXME: ls without an argument for buckets/containers? * FIXME: More -dry-run checks for object transfer @@ -19,8 +18,8 @@ Todo * Check the locking in swift module! * Windows paths? Do we need to translate / and \? * Make a fs.Errorf and count errors and log them at a different level - * add -modify-window flag - fs should keep knowledge of resolution * Add max object size to fs metadata - 5GB for swift, infinite for local, ? for s3 + * tie into -max-size flag Drive * Do we need the secrets or just the code? If just the code then @@ -33,11 +32,13 @@ Drive * It receives the token back * It displays the token to the user to paste in to the code * Should be https really - + * Sometimes get: Failed to copy: Upload failed: googleapi: Error 403: Rate Limit Exceeded + * quota is 100.0 requests/second/user + Ideas * could do encryption - put IV into metadata? * optimise remote copy container to another container using remote - copy if local is same as remote + copy if local is same as remote - use an optional Copier interface * Allow subpaths container:/sub/path * look at auth from env in s3 module - add to swift? * support diff --git a/swiftsync.go b/swiftsync.go index 31c3f439b..dafb2adff 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -19,13 +19,13 @@ import ( var ( // Flags cpuprofile = flag.String("cpuprofile", "", "Write cpu profile to file") - snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented verbose = flag.Bool("verbose", false, "Print lots more stuff") quiet = flag.Bool("quiet", false, "Print as little stuff as possible") dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes") checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") statsInterval = flag.Duration("stats", time.Minute*1, "Interval to print stats") + modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same") ) // A pair of FsObjects @@ -566,6 +566,23 @@ func main() { fsrc, fdst = fdst, fsrc } + // Work out modify window + if fsrc != nil { + precision := fsrc.Precision() + log.Printf("Source precision %s\n", precision) + if precision > *modifyWindow { + *modifyWindow = precision + } + } + if fdst != nil { + precision := fdst.Precision() + log.Printf("Destination precision %s\n", precision) + if precision > *modifyWindow { + *modifyWindow = precision + } + } + log.Printf("Modify window is %s\n", *modifyWindow) + // Print the stats every statsInterval go func() { ch := time.Tick(*statsInterval) @@ -579,6 +596,7 @@ func main() { if found.run != nil { found.run(fdst, fsrc) fmt.Println(stats) + log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine()) if stats.errors > 0 { os.Exit(1) }