From 5de9278650584ddcb757fd82d28fdcede3927752 Mon Sep 17 00:00:00 2001 From: Martin Czygan <53705+miku@users.noreply.github.com> Date: Tue, 28 Jun 2022 13:51:59 +0200 Subject: [PATCH] fs/cache: make sure we call the Shutdown method on backends This change ensures we call the Shutdown method on backends when they drop out of the fs/cache and at program exit. Some backends implement the optional fs.Shutdowner interface. Until now, Shutdown is only checked and called, when a backend is wrapped (e.g. crypt, compress, ...). To have a general way to perform operations at the end of the backend lifecycle with proper error handling, we can call Shutdown at cache clear time. We add a finalize hook to the cache which will be called when values drop out of the cache. Previous discussion: https://forum.rclone.org/t/31336 --- cmd/cmd.go | 6 ++++++ fs/cache/cache.go | 5 +++++ lib/cache/cache.go | 36 +++++++++++++++++++++++++++++------- lib/cache/cache_test.go | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 5dd120be7..6209fad55 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -321,6 +321,12 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) { } } + // clear cache and shutdown backends + cache.Clear() + if lastErr := accounting.GlobalStats().GetLastError(); cmdErr == nil { + cmdErr = lastErr + } + // Log the final error message and exit if cmdErr != nil { nerrs := accounting.GlobalStats().GetErrors() diff --git a/fs/cache/cache.go b/fs/cache/cache.go index 3bdc8d37f..4e2c4638e 100644 --- a/fs/cache/cache.go +++ b/fs/cache/cache.go @@ -25,6 +25,11 @@ func createOnFirstUse() { c = cache.New() c.SetExpireDuration(ci.FsCacheExpireDuration) c.SetExpireInterval(ci.FsCacheExpireInterval) + c.SetFinalizer(func(value interface{}) { + if s, ok := value.(fs.Shutdowner); ok { + _ = fs.CountError(s.Shutdown(context.Background())) + } + }) }) } diff --git a/lib/cache/cache.go b/lib/cache/cache.go index f9533c137..b8d323ff1 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -16,6 +16,7 @@ type Cache struct { expireRunning bool expireDuration time.Duration // expire the cache entry when it is older than this expireInterval time.Duration // interval to run the cache expire + finalize func(value interface{}) } // New creates a new cache with the default expire duration and interval @@ -25,6 +26,7 @@ func New() *Cache { expireRunning: false, expireDuration: 300 * time.Second, expireInterval: 60 * time.Second, + finalize: func(_ interface{}) {}, } } @@ -154,7 +156,10 @@ func (c *Cache) GetMaybe(key string) (value interface{}, found bool) { // Returns true if the entry was found func (c *Cache) Delete(key string) bool { c.mu.Lock() - _, found := c.cache[key] + entry, found := c.cache[key] + if found { + c.finalize(entry.value) + } delete(c.cache, key) c.mu.Unlock() return found @@ -165,11 +170,13 @@ func (c *Cache) Delete(key string) bool { // Returns number of entries deleted func (c *Cache) DeletePrefix(prefix string) (deleted int) { c.mu.Lock() - for k := range c.cache { - if strings.HasPrefix(k, prefix) { - delete(c.cache, k) - deleted++ + for key, entry := range c.cache { + if !strings.HasPrefix(key, prefix) { + continue } + c.finalize(entry.value) + delete(c.cache, key) + deleted++ } c.mu.Unlock() return deleted @@ -183,12 +190,17 @@ func (c *Cache) Rename(oldKey, newKey string) (value interface{}, found bool) { c.mu.Lock() if newEntry, newFound := c.cache[newKey]; newFound { // If new entry is found use that + if _, oldFound := c.cache[oldKey]; oldFound { + // If there's an old entry, we drop it and also try shutdown. + c.finalize(c.cache[oldKey].value) + } delete(c.cache, oldKey) value, found = newEntry.value, newFound c.used(newEntry) } else if oldEntry, oldFound := c.cache[oldKey]; oldFound { // If old entry is found rename it to new and use that c.cache[newKey] = oldEntry + // No need to shutdown here, as value lives on under newKey delete(c.cache, oldKey) c.used(oldEntry) value, found = oldEntry.value, oldFound @@ -204,6 +216,7 @@ func (c *Cache) cacheExpire() { now := time.Now() for key, entry := range c.cache { if entry.pinCount <= 0 && now.Sub(entry.lastUsed) > c.expireDuration { + c.finalize(entry.value) delete(c.cache, key) } } @@ -218,10 +231,12 @@ func (c *Cache) cacheExpire() { // Clear removes everything from the cache func (c *Cache) Clear() { c.mu.Lock() - for k := range c.cache { - delete(c.cache, k) + for key, entry := range c.cache { + c.finalize(entry.value) + delete(c.cache, key) } c.mu.Unlock() + return } // Entries returns the number of entries in the cache @@ -231,3 +246,10 @@ func (c *Cache) Entries() int { c.mu.Unlock() return entries } + +// SetFinalizer sets a function to be called when a value drops out of the cache +func (c *Cache) SetFinalizer(finalize func(interface{})) { + c.mu.Lock() + c.finalize = finalize + c.mu.Unlock() +} diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 089db07ef..415e167c3 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -333,3 +333,37 @@ func TestCacheRename(t *testing.T) { assert.Equal(t, 1, c.Entries()) } + +func TestCacheFinalize(t *testing.T) { + c := New() + numCalled := 0 + c.SetFinalizer(func(v interface{}) { + numCalled++ + }) + create := func(path string) (interface{}, bool, error) { + return path, true, nil + } + _, _ = c.Get("ok", create) + assert.Equal(t, 0, numCalled) + c.Clear() + assert.Equal(t, 1, numCalled) + + _, _ = c.Get("ok", create) + c.Delete("ok") + assert.Equal(t, 2, numCalled) + + _, _ = c.Get("ok", create) + c.DeletePrefix("ok") + assert.Equal(t, 3, numCalled) + + _, _ = c.Get("old", create) + _, _ = c.Get("new", create) + c.Rename("old", "new") + assert.Equal(t, 4, numCalled) + + c.expireDuration = 1 * time.Millisecond + _, _ = c.Get("ok", create) + time.Sleep(2 * time.Millisecond) + c.cacheExpire() // "ok" and "new" fall out of cache + assert.Equal(t, 6, numCalled) +}