From 9ac5c6de14af811e341ddce3c7771c6c481f5ee0 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Thu, 4 Jun 2020 09:33:50 +0100
Subject: [PATCH] vfs: cache: rework file downloader

- Download to multiple places at once in the stream
- Restart as necessary
- Timeout unused downloaders
- Close reader if too much data skipped
- Only use one file handle, as we now use the Item for writing
- Implement --vfs-read-chunk-size and --vfs-read-chunk-limit
- Fix deadlock between asyncbuffer and vfs cache downloader
- Fix display of the stream abandoned error, which should be ignored
---
 vfs/vfscache/downloader.go | 589 ++++++++++++++++++++++++-------------
 vfs/vfscache/item.go       | 233 +++++++++------
 2 files changed, 530 insertions(+), 292 deletions(-)

diff --git a/vfs/vfscache/downloader.go b/vfs/vfscache/downloader.go
index 28530fa00..f85406044 100644
--- a/vfs/vfscache/downloader.go
+++ b/vfs/vfscache/downloader.go
@@ -2,85 +2,305 @@ package vfscache
 
 import (
 	"context"
-	"io"
-	"io/ioutil"
-	"os"
 	"sync"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/asyncreader"
+	"github.com/rclone/rclone/fs/chunkedreader"
 	"github.com/rclone/rclone/fs/log"
-	"github.com/rclone/rclone/fs/operations"
-	"github.com/rclone/rclone/lib/file"
 	"github.com/rclone/rclone/lib/ranges"
-	"github.com/rclone/rclone/lib/readers"
 )
 
-// downloader represents a running download for a file
-type downloader struct {
-	// write only
-	mu sync.Mutex
+// FIXME implement max downloaders
+
+const (
+	// max time a downloader can be idle before closing itself
+	maxDownloaderIdleTime = 5 * time.Second
+	// max number of bytes a reader should skip over before closing it
+	maxSkipBytes = 1024 * 1024
+)
+
+// downloaders is a number of downloader~s and a queue of waiters
+// waiting for segments to be downloaded.
+type downloaders struct {
+	// Write once - no locking required
 	ctx    context.Context
 	item   *Item
 	src    fs.Object // source object
-	fcache fs.Fs     // destination Fs
+	remote string
+	fcache fs.Fs // destination Fs
 	osPath string
 
-	// per download
-	out         *os.File // file we are writing to
-	offset      int64    // current offset
-	waiters     []waiter
-	tr          *accounting.Transfer
-	in          *accounting.Account // input we are reading from
-	downloading bool                // whether the download thread is running
-	finished    chan struct{}       // closed when download finished
+	// Read write
+	mu      sync.Mutex
+	dls     []*downloader
+	waiters []waiter
 }
 
-// waiter is a range we are waiting for and a channel to signal
+// waiter is a range we are waiting for and a channel to signal when
+// the range is found
type waiter struct {
 	r       ranges.Range
 	errChan chan<- error
 }
 
-func newDownloader(item *Item, fcache fs.Fs, remote string, src fs.Object) (dl *downloader, err error) {
-	defer log.Trace(src, "remote=%q", remote)("dl=%+v, err=%v", &dl, &err)
+// downloader represents a running download for part of a file.
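+//
+// Each downloader runs in its own goroutine, reading sequentially
+// from start and writing into the cache file via Write. When it
+// catches up with maxOffset it waits to be kicked on to a larger
+// maxOffset, to be quit, or to time out after maxDownloaderIdleTime.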
+type downloader struct {
+	// Write once
+	dls  *downloaders   // parent structure
+	quit chan struct{}  // close to quit the downloader
+	wg   sync.WaitGroup // to keep track of downloader goroutine
+	kick chan struct{}  // kick the downloader when needed
 
-	dl = &downloader{
+	// Read write
+	mu        sync.Mutex
+	start     int64 // start offset
+	offset    int64 // current offset
+	maxOffset int64 // maximum offset we are reading to
+	tr        *accounting.Transfer
+	in        *accounting.Account // input we are reading from
+	skipped   int64               // number of bytes we have skipped sequentially
+	_closed   bool                // set to true if downloader is closed
+	stop      bool                // set to true if we have called _stop()
+}
+
+func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) {
+	dls = &downloaders{
 		ctx:    context.Background(),
 		item:   item,
 		src:    src,
+		remote: remote,
 		fcache: fcache,
 		osPath: item.c.toOSPath(remote),
 	}
+	return dls
+}
 
-	// make sure there is a cache file
-	_, err = os.Stat(dl.osPath)
-	if err == nil {
-		// do nothing
-	} else if os.IsNotExist(err) {
-		return nil, errors.New("vfs cache: internal error: newDownloader: called before Item.Open")
-		// fs.Debugf(src, "creating empty file")
-		// err = item._truncateToCurrentSize()
-		// if err != nil {
-		// 	return nil, errors.Wrap(err, "newDownloader: failed to create empty file")
-		// }
-	} else {
-		return nil, errors.Wrap(err, "newDownloader: failed to stat cache file")
+// Make a new downloader, starting it to download r
+//
+// call with lock held
+func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
+	defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
+
+	dl = &downloader{
+		kick:      make(chan struct{}, 1),
+		quit:      make(chan struct{}),
+		dls:       dls,
+		start:     r.Pos,
+		offset:    r.Pos,
+		maxOffset: r.End(),
 	}
+	err = dl.open(dl.offset)
+	if err != nil {
+		_ = dl.close(err)
+		return nil, errors.Wrap(err, "failed to open downloader")
+	}
+
+	dls.dls = append(dls.dls, dl)
+
+	dl.wg.Add(1)
+	go func() {
+		defer dl.wg.Done()
+		err := dl.download()
+		_ = dl.close(err)
+		if err != nil {
+			fs.Errorf(dl.dls.src, "Failed to download: %v", err)
+		}
+		err = dl.dls.kickWaiters()
+		if err != nil {
+			fs.Errorf(dl.dls.src, "Failed to kick waiters: %v", err)
+		}
+	}()
+	return dl, nil
 }
 
+// _removeClosed() removes any downloaders which are closed.
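+// It filters dls.dls in place, reusing the backing array, so no
+// allocation is needed.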
+//
+// Call with the mutex held
+func (dls *downloaders) _removeClosed() {
+	newDownloaders := dls.dls[:0]
+	for _, dl := range dls.dls {
+		if !dl.closed() {
+			newDownloaders = append(newDownloaders, dl)
+		}
+	}
+	dls.dls = newDownloaders
+}
+
+// Close all running downloaders and return any unfulfilled waiters
+// with inErr
+func (dls *downloaders) close(inErr error) (err error) {
+	dls.mu.Lock()
+	defer dls.mu.Unlock()
+	dls._removeClosed()
+	for _, dl := range dls.dls {
+		dls.mu.Unlock()
+		closeErr := dl.stopAndClose(inErr)
+		dls.mu.Lock()
+		if closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}
+	dls.dls = nil
+	dls._dispatchWaiters()
+	dls._closeWaiters(inErr)
+	return err
+}
+
+// Ensure a downloader is running to download r
+func (dls *downloaders) ensure(r ranges.Range) (err error) {
+	defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)
+
+	dls.mu.Lock()
+
+	errChan := make(chan error)
+	waiter := waiter{
+		r:       r,
+		errChan: errChan,
+	}
+
+	err = dls._ensureDownloader(r)
+	if err != nil {
+		dls.mu.Unlock()
+		return err
+	}
+
+	dls.waiters = append(dls.waiters, waiter)
+	dls.mu.Unlock()
+	return <-errChan
+}
+
 // close any waiters with the error passed in
 //
 // call with lock held
-func (dl *downloader) _closeWaiters(err error) {
-	for _, waiter := range dl.waiters {
+func (dls *downloaders) _closeWaiters(err error) {
+	for _, waiter := range dls.waiters {
 		waiter.errChan <- err
 	}
-	dl.waiters = nil
+	dls.waiters = nil
+}
+
+// ensure a downloader is running for the range if required. If one isn't found
+// then it starts it.
+//
+// call with lock held
+func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
+	// FIXME this window could be a different config var?
+	window := int64(fs.Config.BufferSize)
+
+	// We may be reopening a downloader after a failure here or
+	// doing a tentative prefetch so check to see that we haven't
+	// read some stuff already.
+	//
+	// Clip r to stuff which needs downloading
+	r = dls.item.findMissing(r)
+
+	// If the range is entirely present then we only need to start a
+	// downloader if the window isn't full.
+	if r.IsEmpty() {
+		// Make a new range which includes the window
+		rWindow := r
+		if rWindow.Size < window {
+			rWindow.Size = window
+		}
+		// Clip rWindow to stuff which needs downloading
+		rWindow = dls.item.findMissing(rWindow)
+		// If rWindow is empty then just return without starting a
+		// downloader as there is no data within the window which needs
+		// downloading.
+		if rWindow.IsEmpty() {
+			return nil
+		}
+		// Start downloading at the start of the unread window
+		r.Pos = rWindow.Pos
+		// But don't write anything for the moment
+		r.Size = 0
+	}
+
+	var dl *downloader
+	// Look through downloaders to find one in range
+	// If there isn't one then start a new one
+	dls._removeClosed()
+	for _, dl = range dls.dls {
+		start, maxOffset := dl.getRange()
+
+		// The downloader's offset to offset+window is the gap
+		// in which we would like to re-use this
+		// downloader. The downloader will never reach before
+		// start and maxOffset+window is too far away - we'd
+		// rather start another downloader.
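+		//
+		// For example, with the default 16 MiB window
+		// (--buffer-size), a downloader which started at 0 and
+		// has reached offset 100 MiB will be re-used for a read
+		// at 108 MiB, but not for one at 200 MiB.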
+		// fs.Debugf(nil, "r=%v start=%d, maxOffset=%d, found=%v", r, start, maxOffset, r.Pos >= start && r.Pos < maxOffset+window)
+		if r.Pos >= start && r.Pos < maxOffset+window {
+			// Found downloader which will soon have our data
+			dl.setRange(r)
+			return nil
+		}
+	}
+	// Downloader not found so start a new one
+	dl, err = dls._newDownloader(r)
+	if err != nil {
+		return errors.Wrap(err, "failed to start downloader")
+	}
+	return err
+}
+
+// ensure a downloader is running for the range if required. If one
+// isn't found then it starts it
+func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) {
+	dls.mu.Lock()
+	defer dls.mu.Unlock()
+	return dls._ensureDownloader(r)
+}
+
+// _dispatchWaiters() sends any waiters which have completed back to
+// their callers.
+//
+// Call with the mutex held
+func (dls *downloaders) _dispatchWaiters() {
+	if len(dls.waiters) == 0 {
+		return
+	}
+
+	newWaiters := dls.waiters[:0]
+	for _, waiter := range dls.waiters {
+		if dls.item.hasRange(waiter.r) {
+			waiter.errChan <- nil
+		} else {
+			newWaiters = append(newWaiters, waiter)
+		}
+	}
+	dls.waiters = newWaiters
+}
+
+// Send any waiters which have completed back to their callers and make sure
+// there is a downloader appropriate for each waiter
+func (dls *downloaders) kickWaiters() (err error) {
+	dls.mu.Lock()
+	defer dls.mu.Unlock()
+
+	dls._dispatchWaiters()
+
+	if len(dls.waiters) == 0 {
+		return nil
+	}
+
+	// Make sure each waiter has a downloader
+	// This is an O(waiters*downloaders) algorithm
+	// However the number of waiters and the number of downloaders
+	// are both expected to be small.
+	for _, waiter := range dls.waiters {
+		err = dls._ensureDownloader(waiter.r)
+		if err != nil {
+			fs.Errorf(dls.src, "Restart download failed: %v", err)
+		}
+	}
+
+	return nil
 }
 
 // Write writes len(p) bytes from p to the underlying data stream. It
@@ -91,221 +311,194 @@ func (dl *downloader) _closeWaiters(err error) {
 //
 // Implementations must not retain p.
 func (dl *downloader) Write(p []byte) (n int, err error) {
-	defer log.Trace(dl.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)
+	defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)
 
-	var (
-		// Range we wish to write
-		r = ranges.Range{Pos: dl.offset, Size: int64(len(p))}
-		curr    ranges.Range
-		present bool
-		nn      int
-	)
-
-	// Check to see what regions are already present
-	dl.mu.Lock()
-	defer dl.mu.Unlock()
-	dl.item.mu.Lock()
-	defer dl.item.mu.Unlock()
-
-	// Write the range out ignoring already written chunks
-	// FIXME might stop downloading if we are ignoring chunks?
-	for err == nil && !r.IsEmpty() {
-		curr, r, present = dl.item.info.Rs.Find(r)
-		if curr.Pos != dl.offset {
-			return n, errors.New("internal error: offset of range is wrong")
+	// Kick the waiters on exit if some bytes were received
+	defer func() {
+		if n <= 0 {
+			return
 		}
-		if present {
-			// if present want to skip this range
-			fs.Debugf(dl.src, "skip chunk offset=%d size=%d", dl.offset, curr.Size)
-			nn = int(curr.Size)
-			_, err = dl.out.Seek(curr.Size, io.SeekCurrent)
-			if err != nil {
-				nn = 0
+		if waitErr := dl.dls.kickWaiters(); waitErr != nil {
+			fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr)
+			if err == nil {
+				err = waitErr
 			}
+		}
-		} else {
-			// if range not present then we want to write it
-			fs.Debugf(dl.src, "write chunk offset=%d size=%d", dl.offset, curr.Size)
-			nn, err = dl.out.Write(p[:curr.Size])
-			dl.item.info.Rs.Insert(ranges.Range{Pos: dl.offset, Size: int64(nn)})
-		}
-		dl.offset += int64(nn)
-		p = p[nn:]
-		n += nn
-	}
-	if n > 0 {
-		if len(dl.waiters) > 0 {
-			newWaiters := dl.waiters[:0]
-			for _, waiter := range dl.waiters {
-				if dl.item.info.Rs.Present(waiter.r) {
-					waiter.errChan <- nil
-				} else {
-					newWaiters = append(newWaiters, waiter)
-				}
-			}
-			dl.waiters = newWaiters
-		}
-	}
-	if err != nil && err != io.EOF {
-		dl._closeWaiters(err)
-	}
-	return n, err
-}
-
-// start the download running from offset
-func (dl *downloader) start(offset int64) (err error) {
-	err = dl.open(offset)
-	if err != nil {
-		_ = dl.close(err)
-		return errors.Wrap(err, "failed to open downloader")
-	}
-
-	go func() {
-		err := dl.download()
-		_ = dl.close(err)
-		if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
-			fs.Errorf(dl.src, "Failed to download: %v", err)
-			// FIXME set an error here????
-		}
-	}()
-	return nil
+	}()
+	dl.mu.Lock()
+	defer dl.mu.Unlock()
+
+	// Wait here if we have reached maxOffset until
+	// - we are quitting
+	// - we get kicked
+	// - timeout happens
+	if dl.offset >= dl.maxOffset {
+		var timeout = time.NewTimer(maxDownloaderIdleTime)
+		dl.mu.Unlock()
+		select {
+		case <-dl.quit:
+			dl.mu.Lock()
+			timeout.Stop()
+		case <-dl.kick:
+			dl.mu.Lock()
+			timeout.Stop()
+		case <-timeout.C:
+			// stop any future reading
+			dl.mu.Lock()
+			if !dl.stop {
+				fs.Debugf(dl.dls.src, "stopping download thread as it timed out")
+				dl._stop()
+			}
+		}
+	}
+
+	n, skipped, err := dl.dls.item.writeAtNoOverwrite(p, dl.offset)
+	if skipped == n {
+		dl.skipped += int64(skipped)
+	} else {
+		dl.skipped = 0
+	}
+	dl.offset += int64(n)
+
+	// Kill this downloader if skipped too many bytes
+	if !dl.stop && dl.skipped > maxSkipBytes {
+		fs.Debugf(dl.dls.src, "stopping download thread as it has skipped %d bytes", dl.skipped)
+		dl._stop()
+	}
+	return n, err
 }
 
 // open the file from offset
 //
 // should be called on a fresh downloader
 func (dl *downloader) open(offset int64) (err error) {
-	defer log.Trace(dl.src, "offset=%d", offset)("err=%v", &err)
-	dl.finished = make(chan struct{})
-	defer close(dl.finished)
-	dl.downloading = true
-	dl.tr = accounting.Stats(dl.ctx).NewTransfer(dl.src)
+	defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
+	dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)
 
-	size := dl.src.Size()
+	size := dl.dls.src.Size()
 	if size < 0 {
 		// FIXME should just completely download these
 		return errors.New("can't open unknown sized file")
 	}
 
 	// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
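+	// Use a chunked reader so the source is read in progressively
+	// larger ranges (starting at --vfs-read-chunk-size, growing up
+	// to --vfs-read-chunk-limit) rather than requesting everything
+	// from offset in a single transfer.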
-	var rangeOption *fs.RangeOption
-	if offset > 0 {
-		rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
-	}
-	in0, err := operations.NewReOpen(dl.ctx, dl.src, fs.Config.LowLevelRetries, dl.item.c.hashOption, rangeOption)
+	// var rangeOption *fs.RangeOption
+	// if offset > 0 {
+	// 	rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
+	// }
+	// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
+
+	in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.item.c.opt.ChunkSize), int64(dl.dls.item.c.opt.ChunkSizeLimit))
+	_, err = in0.Seek(offset, 0)
 	if err != nil {
 		return errors.Wrap(err, "vfs reader: failed to open source file")
 	}
 	dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer
 
-	dl.out, err = file.OpenFile(dl.osPath, os.O_CREATE|os.O_WRONLY, 0700)
-	if err != nil {
-		return errors.Wrap(err, "vfs reader: failed to open cache file")
-	}
-	dl.offset = offset
-	err = file.SetSparse(dl.out)
-	if err != nil {
-		fs.Debugf(dl.src, "vfs reader: failed to set as a sparse file: %v", err)
-	}
-
-	_, err = dl.out.Seek(offset, io.SeekStart)
-	if err != nil {
-		return errors.Wrap(err, "vfs reader: failed to seek")
-	}
-
 	// FIXME set mod time
 	// FIXME check checksums
 
 	return nil
 }
 
-var errStop = errors.New("vfs downloader: reading stopped")
-
-// stop the downloader if running and close everything
-func (dl *downloader) stop() {
-	defer log.Trace(dl.src, "")("")
-
-	dl.mu.Lock()
-	if !dl.downloading || dl.in == nil {
-		dl.mu.Unlock()
-		return
-	}
-
-	// stop the downloader
-	dl.in.StopBuffering()
-	oldReader := dl.in.GetReader()
-	dl.in.UpdateReader(ioutil.NopCloser(readers.ErrorReader{Err: errStop}))
-	err := oldReader.Close()
-	if err != nil {
-		fs.Debugf(dl.src, "vfs downloader: stop close old failed: %v", err)
-	}
-
-	dl.mu.Unlock()
-
-	// wait for downloader to finish...
-	<-dl.finished
-}
-
+// close the downloader
 func (dl *downloader) close(inErr error) (err error) {
-	defer log.Trace(dl.src, "inErr=%v", err)("err=%v", &err)
-	dl.stop()
+	defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err)
+	checkErr := func(e error) {
+		if e == nil || errors.Cause(e) == asyncreader.ErrorStreamAbandoned {
+			return
+		}
+		err = e
+	}
 	dl.mu.Lock()
 	if dl.in != nil {
-		fs.CheckClose(dl.in, &err)
+		checkErr(dl.in.Close())
 		dl.in = nil
 	}
 	if dl.tr != nil {
 		dl.tr.Done(inErr)
 		dl.tr = nil
 	}
-	if dl.out != nil {
-		fs.CheckClose(dl.out, &err)
-		dl.out = nil
-	}
-	dl._closeWaiters(err)
-	dl.downloading = false
+	dl._closed = true
 	dl.mu.Unlock()
 	return nil
 }
 
-/*
-FIXME
-need gating at all the Read/Write sites
-need to pass in offset somehow and start the readfile off
-need to end when offset is reached
-need to be able to quit on demand
-Need offset to be passed to NewReOpen
-*/
-
-// fetch the (offset, size) block from the remote file
+// closed returns true if the downloader has been closed already
+func (dl *downloader) closed() bool {
+	dl.mu.Lock()
+	defer dl.mu.Unlock()
+	return dl._closed
+}
+
+// stop the downloader if running
+//
+// Call with the mutex held
+func (dl *downloader) _stop() {
+	defer log.Trace(dl.dls.src, "")("")
+
+	// exit if have already called _stop
+	if dl.stop {
+		return
+	}
+	dl.stop = true
+
+	// Signal quit now to unblock the downloader
+	close(dl.quit)
+
+	// stop the downloader by stopping the async reader buffering
+	// any more input. This causes all the stuff in the async
+	// buffer (which can be many MB) to be written to the disk
+	// before exiting.
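+	// Note that dl.in is not closed here - close() does that once
+	// the download goroutine has drained the buffer and returned.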
+	if dl.in != nil {
+		dl.in.StopBuffering()
+	}
+}
+
+// stop the downloader if running then close it with the error passed in
+func (dl *downloader) stopAndClose(inErr error) (err error) {
+	// Stop the downloader by closing its input
+	dl.mu.Lock()
+	dl._stop()
+	dl.mu.Unlock()
+	// wait for downloader to finish...
+	// do this without mutex as asyncreader
+	// calls back into Write() which needs the lock
+	dl.wg.Wait()
+	return dl.close(inErr)
+}
+
+// Start downloading to the local file starting at offset until maxOffset.
 func (dl *downloader) download() (err error) {
-	defer log.Trace(dl.src, "")("err=%v", &err)
+	defer log.Trace(dl.dls.src, "")("err=%v", &err)
 	_, err = dl.in.WriteTo(dl)
-	if err != nil {
+	if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
 		return errors.Wrap(err, "vfs reader: failed to write to cache file")
 	}
 	return nil
 }
 
-// ensure the range is present
-func (dl *downloader) ensure(r ranges.Range) (err error) {
-	defer log.Trace(dl.src, "r=%+v", r)("err=%v", &err)
-	errChan := make(chan error)
-	waiter := waiter{
-		r:       r,
-		errChan: errChan,
-	}
+// setRange makes sure the downloader is downloading the range passed in
+func (dl *downloader) setRange(r ranges.Range) {
 	dl.mu.Lock()
-	// FIXME racey - might have finished here
-	dl.waiters = append(dl.waiters, waiter)
+	maxOffset := r.End()
+	if maxOffset > dl.maxOffset {
+		dl.maxOffset = maxOffset
+		// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
+		select {
+		case dl.kick <- struct{}{}:
+		default:
+		}
+	}
 	dl.mu.Unlock()
-	return <-errChan
 }
 
-// ensure the range is present
-func (dl *downloader) running() bool {
+// get the current range this downloader is working on
+func (dl *downloader) getRange() (start, maxOffset int64) {
 	dl.mu.Lock()
 	defer dl.mu.Unlock()
-	return dl.downloading
+	return dl.start, dl.maxOffset
 }
diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go
index 132dddb9c..1d1d1779b 100644
--- a/vfs/vfscache/item.go
+++ b/vfs/vfscache/item.go
@@ -43,14 +43,14 @@ type Item struct {
 	// read only
 	c *Cache // cache this is part of
 
-	mu         sync.Mutex  // protect the variables
-	name       string      // name in the VFS
-	opens      int         // number of times file is open
-	downloader *downloader // if the file is being downloaded to cache
-	o          fs.Object   // object we are caching - may be nil
-	fd         *os.File    // handle we are using to read and write to the file
-	metaDirty  bool        // set if the info needs writeback
-	info       Info        // info about the file to persist to backing store
+	mu          sync.Mutex   // protect the variables
+	name        string       // name in the VFS
+	opens       int          // number of times file is open
+	downloaders *downloaders // a record of the downloaders in action - may be nil
+	o           fs.Object    // object we are caching - may be nil
+	fd          *os.File     // handle we are using to read and write to the file
+	metaDirty   bool         // set if the info needs writeback
+	info        Info         // info about the file to persist to backing store
 }
 
@@ -453,6 +453,9 @@ func (item *Item) Open(o fs.Object) (err error) {
 	// Relock the Item.mu for the return
 	item.mu.Lock()
 
+	// Create the downloaders
+	item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
+
 	return err
 }
 
@@ -514,32 +517,9 @@ func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) {
 func (item *Item) Close(storeFn StoreFn) (err error) {
 	defer log.Trace(item.o, "Item.Close")("err=%v", &err)
 	var (
-		downloader    *downloader
+		downloaders   *downloaders
 		syncWriteBack = item.c.opt.WriteBack <= 0
 	)
-	// FIXME need to unlock to kill downloader - should we
-	// re-arrange locking so this isn't necessary? maybe
-	// downloader should use the item mutex for locking? or put a
-	// finer lock on Rs?
-	//
-	// close downloader with mutex unlocked
-	// downloader.Write calls ensure which needs the lock
-	defer func() {
-		if downloader != nil {
-			closeErr := downloader.close(nil)
-			if closeErr != nil && err == nil {
-				err = closeErr
-			}
-		}
-		// save the metadata once more since it may be dirty
-		// after the downloader
-		item.mu.Lock()
-		saveErr := item._save()
-		if saveErr != nil && err == nil {
-			err = errors.Wrap(saveErr, "close failed to save item")
-		}
-		item.mu.Unlock()
-	}()
 	item.mu.Lock()
 	defer item.mu.Unlock()
@@ -554,21 +534,43 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
 
 	// Update the size on close
 	_, _ = item._getSize()
-	err = item._save()
-	if err != nil {
-		return errors.Wrap(err, "close failed to save item")
+
+	// Accumulate and log errors
+	checkErr := func(e error) {
+		if e != nil {
+			fs.Errorf(item.o, "vfs cache item close failed: %v", e)
+			if err == nil {
+				err = e
+			}
+		}
 	}
 
-	// close the downloader
-	downloader = item.downloader
-	item.downloader = nil
+	// Close the downloaders
+	if downloaders = item.downloaders; downloaders != nil {
+		item.downloaders = nil
+		// FIXME need to unlock to kill downloader - should we
+		// re-arrange locking so this isn't necessary? maybe
+		// downloader should use the item mutex for locking? or put a
+		// finer lock on Rs?
+		//
+		// downloader.Write calls ensure which needs the lock
+		// close downloader with mutex unlocked
+		item.mu.Unlock()
+		checkErr(downloaders.close(nil))
+		item.mu.Lock()
+	}
 
 	// close the file handle
 	if item.fd == nil {
-		return errors.New("vfs cache item: internal error: didn't Open file")
+		checkErr(errors.New("vfs cache item: internal error: didn't Open file"))
+	} else {
+		checkErr(item.fd.Close())
+		item.fd = nil
 	}
-	err = item.fd.Close()
-	item.fd = nil
+
+	// save the metadata once more since it may be dirty
+	// after the downloader
+	checkErr(item._save())
 
 	// if the item hasn't been changed but has been completed then
 	// set the modtime from the object otherwise set it from the info
@@ -585,7 +587,7 @@
 		fs.Debugf(item.name, "item changed - writeback in %v", item.c.opt.WriteBack)
 		if syncWriteBack {
 			// do synchronous writeback
-			err = item._store(context.Background(), storeFn)
+			checkErr(item._store(context.Background(), storeFn))
 		} else {
 			// asynchronous writeback
 			item.c.writeback.add(item, item.name, storeFn)
@@ -720,35 +722,33 @@ func (item *Item) remove(reason string) (wasWriting bool) {
 	return item._remove(reason)
 }
 
-// create a downloader for the item
-//
-// call with item mutex held
-func (item *Item) _newDownloader() (err error) {
-	// If no cached object then can't download
-	if item.o == nil {
-		return errors.New("vfs cache: internal error: tried to download nil object")
-	}
-	// If downloading the object already stop the downloader and restart it
-	if item.downloader != nil {
-		item.mu.Unlock()
-		_ = item.downloader.close(nil)
-		item.mu.Lock()
-		item.downloader = nil
-	}
-	item.downloader, err = newDownloader(item, item.c.fremote, item.name, item.o)
-	return err
-}
-
 // _present returns true if the whole file has been downloaded
 //
 // call with the lock held
 func (item *Item) _present() bool {
-	if item.downloader != nil && item.downloader.running() {
-		return false
-	}
 	return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
 }
 
+// hasRange returns true if the current ranges entirely include range
+func (item *Item) hasRange(r ranges.Range) bool {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	return item.info.Rs.Present(r)
+}
+
+// findMissing adjusts r, returning a new ranges.Range which only
+// contains the range which needs to be downloaded. This could be
+// empty - check with IsEmpty. It also adjusts the result to make
+// sure it is not larger than the file.
+func (item *Item) findMissing(r ranges.Range) (outr ranges.Range) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	outr = item.info.Rs.FindMissing(r)
+	// Clip returned block to size of file
+	outr.Clip(item.info.Size)
+	return outr
+}
+
 // ensure the range from offset, size is present in the backing file
 //
 // call with the item lock held
@@ -759,33 +759,21 @@ func (item *Item) _ensure(offset, size int64) (err error) {
 	}
 	r := ranges.Range{Pos: offset, Size: size}
 	present := item.info.Rs.Present(r)
-	downloader := item.downloader
 	fs.Debugf(nil, "looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
-	if present {
-		return nil
-	}
-	// FIXME pass in offset here to decide to seek?
-	err = item._newDownloader()
-	if err != nil {
-		return errors.Wrap(err, "Ensure: failed to start downloader")
-	}
-	downloader = item.downloader
-	if downloader == nil {
-		return errors.New("internal error: downloader is nil")
-	}
-	if !downloader.running() {
-		// FIXME need to make sure we start in the correct place because some of offset,size might exist
-		// FIXME this could stop an old download
-		item.mu.Unlock()
-		err = downloader.start(offset)
-		item.mu.Lock()
-		if err != nil {
-			return errors.Wrap(err, "Ensure: failed to run downloader")
-		}
-	}
 	item.mu.Unlock()
 	defer item.mu.Lock()
-	return item.downloader.ensure(r)
+	if present {
+		// This is a file we are writing so no downloaders needed
+		if item.downloaders == nil {
+			return nil
+		}
+		// Otherwise start the downloader for the future if required
+		return item.downloaders.ensureDownloader(r)
+	}
+	if item.downloaders == nil {
+		return errors.New("internal error: downloaders is nil")
+	}
+	return item.downloaders.ensure(r)
 }
 
 // _written marks the (offset, size) as present in the backing file
@@ -796,7 +784,7 @@
 // call with lock held
 func (item *Item) _written(offset, size int64) {
 	defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
-	item.info.Rs.Insert(ranges.Range{Pos: offset, Size: offset + size})
+	item.info.Rs.Insert(ranges.Range{Pos: offset, Size: size})
 	item.metaDirty = true
 }
 
@@ -868,6 +856,9 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
 	item.mu.Unlock()
 	// Do the writing with Item.mu unlocked
 	n, err = item.fd.WriteAt(b, off)
+	if err == nil && n != len(b) {
+		err = errors.Errorf("short write: tried to write %d but only %d written", len(b), n)
+	}
 	item.mu.Lock()
 	item._written(off, int64(n))
 	if n > 0 {
@@ -881,6 +872,60 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
 	return n, err
 }
 
+// writeAtNoOverwrite writes b to the file, but will not overwrite
+// already present ranges.
+//
+// This is used by the downloader to write bytes to the file
+//
+// It returns n, the total bytes processed, and skipped, the number
+// of bytes which were processed but not actually written to the file.
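+//
+// For example, writing 8 bytes at offset 0 when bytes 2-5 are already
+// present writes bytes 0-1 and 6-7, returning n=8 and skipped=4.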
+func (item *Item) writeAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
+	item.mu.Lock()
+
+	var (
+		// Range we wish to write
+		r = ranges.Range{Pos: off, Size: int64(len(b))}
+		// Ranges that we need to write
+		foundRanges = item.info.Rs.FindAll(r)
+		// Length of each write
+		nn int
+	)
+
+	// Write the range out ignoring already written chunks
+	fs.Debugf(item.name, "Ranges = %v", item.info.Rs)
+	for i := range foundRanges {
+		foundRange := &foundRanges[i]
+		fs.Debugf(item.name, "foundRange[%d] = %v", i, foundRange)
+		if foundRange.R.Pos != off {
+			err = errors.New("internal error: offset of range is wrong")
+			break
+		}
+		size := int(foundRange.R.Size)
+		if foundRange.Present {
+			// if present want to skip this range
+			fs.Debugf(item.name, "skip chunk offset=%d size=%d", off, size)
+			nn = size
+			skipped += size
+		} else {
+			// if range not present then we want to write it
+			fs.Debugf(item.name, "write chunk offset=%d size=%d", off, size)
+			nn, err = item.fd.WriteAt(b[:size], off)
+			if err == nil && nn != size {
+				err = errors.Errorf("downloader: short write: tried to write %d but only %d written", size, nn)
+			}
+			item._written(off, int64(nn))
+		}
+		off += int64(nn)
+		b = b[nn:]
+		n += nn
+		if err != nil {
+			break
+		}
+	}
+	item.mu.Unlock()
+	return n, skipped, err
+}
+
 // Sync commits the current contents of the file to stable storage. Typically,
 // this means flushing the file system's in-memory copy of recently written
 // data to disk.
@@ -904,11 +949,11 @@
 
 // rename the item
 func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
-	var downloader *downloader
+	var downloaders *downloaders
 
 	// close downloader with mutex unlocked
 	defer func() {
-		if downloader != nil {
-			_ = downloader.close(nil)
+		if downloaders != nil {
+			_ = downloaders.close(nil)
 		}
 	}()
 
@@ -916,8 +961,8 @@ func (item *Item) rename(name string, newName string, newObj fs.Object) (err err
 	defer item.mu.Unlock()
 
 	// stop downloader
-	downloader = item.downloader
-	item.downloader = nil
+	downloaders = item.downloaders
+	item.downloaders = nil
 
 	// Set internal state
 	item.name = newName
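
Editor's note: for readers new to the waiter mechanism this patch introduces, the sketch below shows the same idea in miniature: a reader blocks on a channel until a background writer has covered the byte range it asked for, mirroring downloaders.ensure, _dispatchWaiters and kickWaiters above. The types and names here (rng, tracker) are simplified illustrations, not rclone code, and the "present" test is reduced to a sequential high-water mark.

package main

import (
	"fmt"
	"sync"
	"time"
)

// rng is a byte range; waiter pairs it with a channel signalled when
// the range becomes available.
type rng struct{ pos, size int64 }

func (r rng) end() int64 { return r.pos + r.size }

type waiter struct {
	r       rng
	errChan chan<- error
}

// tracker stands in for the downloaders struct: it knows how much of
// the file is present and holds the queue of waiters.
type tracker struct {
	mu      sync.Mutex
	written int64 // data [0, written) is present - sequential for simplicity
	waiters []waiter
}

// ensure blocks until [r.pos, r.pos+r.size) is present, like
// downloaders.ensure.
func (t *tracker) ensure(r rng) error {
	errChan := make(chan error, 1)
	t.mu.Lock()
	if r.end() <= t.written {
		t.mu.Unlock()
		return nil // already present
	}
	t.waiters = append(t.waiters, waiter{r: r, errChan: errChan})
	t.mu.Unlock()
	return <-errChan // block until a write dispatches us
}

// write marks more data as present, then dispatches any satisfied
// waiters, like downloader.Write calling kickWaiters.
func (t *tracker) write(n int64) {
	t.mu.Lock()
	t.written += n
	newWaiters := t.waiters[:0] // filter in place, as _dispatchWaiters does
	for _, w := range t.waiters {
		if w.r.end() <= t.written {
			w.errChan <- nil // buffered, so never blocks
		} else {
			newWaiters = append(newWaiters, w)
		}
	}
	t.waiters = newWaiters
	t.mu.Unlock()
}

func main() {
	t := &tracker{}
	go func() { // simulated downloader writing 10 bytes at a time
		for i := 0; i < 10; i++ {
			time.Sleep(10 * time.Millisecond)
			t.write(10)
		}
	}()
	err := t.ensure(rng{pos: 40, size: 20}) // blocks until 60 bytes written
	fmt.Println("range available, err =", err)
}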