From 2c6213239aa5cf1cd78bb061cf8fa349131c4466 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 29 Aug 2017 21:37:36 -0700 Subject: [PATCH] added cache expiration to fscache.Cache, so that entries based on ObjectID are cached for a very long time and others expire almost immediately. This allows 'kopia mount all' to see newly created snapshots. --- internal/fscache/cache.go | 65 +++++++++++++++++++++------------- internal/fscache/cache_test.go | 63 ++++++++++++++++---------------- internal/fusemount/fusefs.go | 22 ++++++++++-- 3 files changed, 92 insertions(+), 58 deletions(-) diff --git a/internal/fscache/cache.go b/internal/fscache/cache.go index c05f95ea0..979fed974 100644 --- a/internal/fscache/cache.go +++ b/internal/fscache/cache.go @@ -2,42 +2,36 @@ package fscache import ( + "log" "sync" - "sync/atomic" + "time" "github.com/kopia/kopia/fs" ) type cacheEntry struct { - id int64 + id string prev *cacheEntry next *cacheEntry - entries fs.Entries + expireAfter time.Time + entries fs.Entries } // Cache maintains in-memory cache of recently-read data to speed up filesystem operations. type Cache struct { sync.Mutex - nextID int64 totalDirectoryEntries int maxDirectories int maxDirectoryEntries int - data map[int64]*cacheEntry + data map[string]*cacheEntry // Doubly-linked list of entries, in access time order head *cacheEntry tail *cacheEntry -} -// AllocateID allocates new unique ID to be used when referring to cached items. -func (c *Cache) AllocateID() int64 { - if c == nil { - return 0 - } - - return atomic.AddInt64(&c.nextID, 1) + debug bool } func (c *Cache) moveToHead(e *cacheEntry) { @@ -77,20 +71,37 @@ func (c *Cache) remove(e *cacheEntry) { } } +// Loader provides data to be stored in the cache. +type Loader func() (fs.Entries, error) + // GetEntries consults the cache and either retrieves the contents of directory listing from the cache // or invokes the provides callback and adds the results to cache. -func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) { +func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) { if c == nil { return cb() } c.Lock() - if v, ok := c.data[id]; ok { - c.moveToHead(v) - c.Unlock() - return v.entries, nil + if v, ok := c.data[id]; id != "" && ok { + if time.Now().Before(v.expireAfter) { + c.moveToHead(v) + c.Unlock() + if c.debug { + log.Printf("cache hit for %q (valid until %v)", id, v.expireAfter) + } + return v.entries, nil + } + + // time expired + if c.debug { + log.Printf("removing expired cache entry %q after %v", id, v.expireAfter) + } + c.removeEntryLocked(v) } + if c.debug { + log.Printf("cache miss for %q", id) + } raw, err := cb() if err != nil { return nil, err @@ -103,18 +114,16 @@ func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, } entry := &cacheEntry{ - id: id, - entries: raw, + id: id, + entries: raw, + expireAfter: time.Now().Add(expirationTime), } c.addToHead(entry) c.data[id] = entry c.totalDirectoryEntries += len(raw) for c.totalDirectoryEntries > c.maxDirectoryEntries || len(c.data) > c.maxDirectories { - toremove := c.tail - c.remove(toremove) - c.totalDirectoryEntries -= len(toremove.entries) - delete(c.data, toremove.id) + c.removeEntryLocked(c.tail) } c.Unlock() @@ -122,6 +131,12 @@ func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, return raw, nil } +func (c *Cache) removeEntryLocked(toremove *cacheEntry) { + c.remove(toremove) + c.totalDirectoryEntries -= len(toremove.entries) + delete(c.data, toremove.id) +} + // CacheOption modifies the behavior of FUSE node cache. type CacheOption func(c *Cache) @@ -142,7 +157,7 @@ func MaxCachedDirectoryEntries(count int) CacheOption { // NewCache creates FUSE node cache. func NewCache(options ...CacheOption) *Cache { c := &Cache{ - data: make(map[int64]*cacheEntry), + data: make(map[string]*cacheEntry), maxDirectories: 1000, maxDirectoryEntries: 100000, } diff --git a/internal/fscache/cache_test.go b/internal/fscache/cache_test.go index 8a34bb909..22ca6abb2 100644 --- a/internal/fscache/cache_test.go +++ b/internal/fscache/cache_test.go @@ -7,16 +7,19 @@ "reflect" "runtime" "testing" + "time" "github.com/kopia/kopia/fs" ) +const expirationTime = 10 * time.Hour + type cacheSource struct { - data map[int64]fs.Entries - callCounter map[int64]int + data map[string]fs.Entries + callCounter map[string]int } -func (cs *cacheSource) get(id int64) func() (fs.Entries, error) { +func (cs *cacheSource) get(id string) func() (fs.Entries, error) { return func() (fs.Entries, error) { cs.callCounter[id]++ d, ok := cs.data[id] @@ -28,7 +31,7 @@ func (cs *cacheSource) get(id int64) func() (fs.Entries, error) { } } -func (cs *cacheSource) setEntryCount(id int64, cnt int) { +func (cs *cacheSource) setEntryCount(id string, cnt int) { var fakeEntries fs.Entries var fakeEntry fs.Entry for i := 0; i < cnt; i++ { @@ -41,18 +44,18 @@ func (cs *cacheSource) setEntryCount(id int64, cnt int) { func newCacheSource() *cacheSource { return &cacheSource{ - data: make(map[int64]fs.Entries), - callCounter: make(map[int64]int), + data: make(map[string]fs.Entries), + callCounter: make(map[string]int), } } type cacheVerifier struct { cache *Cache cacheSource *cacheSource - lastCallCounter map[int64]int + lastCallCounter map[string]int } -func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) { +func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id string) { actual := cv.cacheSource.callCounter[id] expected := cv.lastCallCounter[id] + 1 if actual != expected { @@ -61,15 +64,15 @@ func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) { cv.reset() } -func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id int64) { +func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id string) { if !reflect.DeepEqual(cv.lastCallCounter, cv.cacheSource.callCounter) { t.Errorf(errorPrefix()+" unexpected call counters for %v, got %v, expected %v", id, cv.cacheSource.callCounter, cv.lastCallCounter) } cv.reset() } -func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...int64) { - var actualOrdering []int64 +func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...string) { + var actualOrdering []string var totalDirectoryEntries int var totalDirectories int for e := cv.cache.head; e != nil; e = e.next { @@ -109,7 +112,7 @@ func errorPrefix() string { } func (cv *cacheVerifier) reset() { - cv.lastCallCounter = make(map[int64]int) + cv.lastCallCounter = make(map[string]int) for k, v := range cv.cacheSource.callCounter { cv.lastCallCounter[k] = v } @@ -126,13 +129,13 @@ func TestCache(t *testing.T) { cs := newCacheSource() cv := cacheVerifier{cacheSource: cs, cache: c} - var id1 int64 = 1 - var id2 int64 = 2 - var id3 int64 = 3 - var id4 int64 = 4 - var id5 int64 = 5 - var id6 int64 = 6 - var id7 int64 = 7 + id1 := "1" + id2 := "2" + id3 := "3" + id4 := "4" + id5 := "5" + id6 := "6" + id7 := "7" cs.setEntryCount(id1, 3) cs.setEntryCount(id2, 3) cs.setEntryCount(id3, 3) @@ -144,57 +147,57 @@ func TestCache(t *testing.T) { cv.verifyCacheOrdering(t) // fetch id1 - c.GetEntries(id1, cs.get(id1)) + c.GetEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id1 again - cache hit, no change - c.GetEntries(id1, cs.get(id1)) + c.GetEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id2 - c.GetEntries(id2, cs.get(id2)) + c.GetEntries(id2, expirationTime, cs.get(id2)) cv.verifyCacheMiss(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id1 again - cache hit, id1 moved to the top of the LRU list - c.GetEntries(id1, cs.get(id1)) + c.GetEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1, id2) // fetch id2 again - c.GetEntries(id2, cs.get(id2)) + c.GetEntries(id2, expirationTime, cs.get(id2)) cv.verifyCacheHit(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id3 - c.GetEntries(id3, cs.get(id3)) + c.GetEntries(id3, expirationTime, cs.get(id3)) cv.verifyCacheMiss(t, id3) cv.verifyCacheOrdering(t, id3, id2, id1) // fetch id4 - c.GetEntries(id4, cs.get(id4)) + c.GetEntries(id4, expirationTime, cs.get(id4)) cv.verifyCacheMiss(t, id4) cv.verifyCacheOrdering(t, id4, id3) // fetch id1 again - c.GetEntries(id1, cs.get(id1)) + c.GetEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1, id4) // fetch id5, it's a big one that expels all but one - c.GetEntries(id5, cs.get(id5)) + c.GetEntries(id5, expirationTime, cs.get(id5)) cv.verifyCacheMiss(t, id5) cv.verifyCacheOrdering(t, id5, id1) // fetch id6 - c.GetEntries(id6, cs.get(id6)) + c.GetEntries(id6, expirationTime, cs.get(id6)) cv.verifyCacheMiss(t, id6) cv.verifyCacheOrdering(t, id6) // fetch id7 - c.GetEntries(id7, cs.get(id7)) + c.GetEntries(id7, expirationTime, cs.get(id7)) cv.verifyCacheMiss(t, id7) cv.verifyCacheOrdering(t, id6) } diff --git a/internal/fusemount/fusefs.go b/internal/fusemount/fusefs.go index 9ffdba179..79cae557d 100644 --- a/internal/fusemount/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -10,6 +10,10 @@ "io/ioutil" "os" "sort" + "time" + + "github.com/google/uuid" + "github.com/kopia/kopia/repo" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/internal/fscache" @@ -51,7 +55,8 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) { type fuseDirectoryNode struct { fuseNode - cacheID int64 + cacheID string + cacheExpiration time.Duration } func (dir *fuseDirectoryNode) directory() fs.Directory { @@ -77,7 +82,7 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse } func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) { - return dir.cache.GetEntries(dir.cacheID, func() (fs.Entries, error) { + return dir.cache.GetEntries(dir.cacheID, dir.cacheExpiration, func() (fs.Entries, error) { entries, err := dir.directory().Readdir() if err != nil { return nil, err @@ -142,7 +147,18 @@ func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) { } func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { - return &fuseDirectoryNode{fuseNode{dir, cache}, cache.AllocateID()} + var cacheID string + var cacheExpiration time.Duration + + if h, ok := dir.(repo.HasObjectID); ok { + cacheID = h.ObjectID().String() + cacheExpiration = 24 * time.Hour + } else { + cacheID = uuid.New().String() + cacheExpiration = 1 * time.Second + } + + return &fuseDirectoryNode{fuseNode{dir, cache}, cacheID, cacheExpiration} } // NewDirectoryNode returns FUSE Node for a given fs.Directory