From 9a066460f1572644ef52843be10a554a1e67c8c5 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 23 Dec 2012 09:32:33 +0000 Subject: [PATCH] Make downloads work in parallel --- swiftsync.go | 78 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/swiftsync.go b/swiftsync.go index f93be1bfa..88a65f3c5 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -37,7 +37,7 @@ var ( userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.") apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") - uploaders = flag.Int("uploaders", 4, "Number of uploaders to run in parallel.") + transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") ) // A filesystem like object which can either be a remote object or a @@ -193,7 +193,7 @@ func (fs *FsObject) put(c *swift.Connection, container string) { fs.Debugf("Uploaded") } -// Stat a FsObject info info +// Stat a FsObject into info func (fs *FsObject) lstat() error { info, err := os.Lstat(fs.path) fs.info = info @@ -249,15 +249,24 @@ func walk(root string) FsObjectsChan { // Read FsObjects on in and write them to out if they need uploading // // FIXME potentially doing lots of MD5SUMS at once -func checker(c *swift.Connection, container string, in, out FsObjectsChan, wg *sync.WaitGroup) { +func checker(c *swift.Connection, container string, in, out FsObjectsChan, upload bool, wg *sync.WaitGroup) { defer wg.Done() for fs := range in { + if !upload { + _ = fs.lstat() + if fs.info == nil { + fs.Debugf("Couldn't find local file - download") + out <- fs + continue + } + } + // Check to see if can store this if !fs.storable() { continue } // Check to see if changed or not - if fs.Equal(c, container, true) { + if fs.Equal(c, container, upload) { fs.Debugf("Unchanged skipping") continue } @@ -278,17 +287,17 @@ func upload(c *swift.Connection, args []string) { root, container := args[0], args[1] mkdir(c, []string{container}) to_be_checked := walk(root) - to_be_uploaded := make(FsObjectsChan, *uploaders) + to_be_uploaded := make(FsObjectsChan, *transfers) var checkerWg sync.WaitGroup checkerWg.Add(*checkers) for i := 0; i < *checkers; i++ { - go checker(c, container, to_be_checked, to_be_uploaded, &checkerWg) + go checker(c, container, to_be_checked, to_be_uploaded, true, &checkerWg) } var uploaderWg sync.WaitGroup - uploaderWg.Add(*uploaders) - for i := 0; i < *uploaders; i++ { + uploaderWg.Add(*transfers) + for i := 0; i < *transfers; i++ { go uploader(c, container, to_be_uploaded, &uploaderWg) } @@ -344,12 +353,17 @@ func (fs *FsObject) get(c *swift.Connection, container string) { } } + +// Read FsObjects on in and download them +func downloader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) { + defer wg.Done() + for fs := range in { + fs.get(c, container) + } +} + // Syncs a container into a directory // -// FIXME don't want to update the modification times on the -// remote server if they are different - want to modify the local -// file! -// // FIXME need optional stat in FsObject and to be able to make FsObjects from ObjectsAll // // FIXME should download and stat many at once @@ -366,26 +380,34 @@ func download(c *swift.Connection, args []string) { log.Fatalf("Couldn't make directory %q: %s", root, err) } + to_be_checked := make(FsObjectsChan, *checkers) + to_be_downloaded := make(FsObjectsChan, *transfers) + + var checkerWg sync.WaitGroup + checkerWg.Add(*checkers) + for i := 0; i < *checkers; i++ { + go checker(c, container, to_be_checked, to_be_downloaded, false, &checkerWg) + } + + var downloaderWg sync.WaitGroup + downloaderWg.Add(*transfers) + for i := 0; i < *transfers; i++ { + go downloader(c, container, to_be_downloaded, &downloaderWg) + } + for i := range objects { object := &objects[i] filepath := path.Join(root, object.Name) - fs := FsObject{remote: object.Name, path: filepath} - _ = fs.lstat() - if fs.info == nil { - fs.Debugf("Couldn't find local file - download") - } else { - fs.Debugf("Found local file - checking") - if !fs.storable() { - fs.Debugf("Not overwriting different type local file") - continue - } - if fs.Equal(c, container, false) { - fs.Debugf("Skip: not changed") - continue - } - } - fs.get(c, container) + to_be_checked <- &FsObject{remote: object.Name, path: filepath} } + close(to_be_checked) + + log.Printf("Waiting for checks to finish") + checkerWg.Wait() + close(to_be_downloaded) + log.Printf("Waiting for downloads to finish") + downloaderWg.Wait() + } // Lists the containers