Added cache expiration to fscache.Cache, so that entries keyed by ObjectID are cached for a very long time (24 hours) and all others expire almost immediately (after 1 second).

This allows 'kopia mount all' to see newly created snapshots.
This commit is contained in:
Jarek Kowalski
2017-08-29 21:37:36 -07:00
parent 4b1547fec9
commit 2c6213239a
3 changed files with 92 additions and 58 deletions

View File

@@ -2,42 +2,36 @@
package fscache
import (
"log"
"sync"
"sync/atomic"
"time"
"github.com/kopia/kopia/fs"
)
type cacheEntry struct {
id int64
id string
prev *cacheEntry
next *cacheEntry
entries fs.Entries
expireAfter time.Time
entries fs.Entries
}
// Cache maintains in-memory cache of recently-read data to speed up filesystem operations.
type Cache struct {
sync.Mutex
nextID int64
totalDirectoryEntries int
maxDirectories int
maxDirectoryEntries int
data map[int64]*cacheEntry
data map[string]*cacheEntry
// Doubly-linked list of entries, in access time order
head *cacheEntry
tail *cacheEntry
}
// AllocateID allocates new unique ID to be used when referring to cached items.
func (c *Cache) AllocateID() int64 {
if c == nil {
return 0
}
return atomic.AddInt64(&c.nextID, 1)
debug bool
}
func (c *Cache) moveToHead(e *cacheEntry) {
@@ -77,20 +71,37 @@ func (c *Cache) remove(e *cacheEntry) {
}
}
// Loader is a callback that produces the directory entries to be stored in the cache.
// GetEntries calls it when the requested id has no usable cached entry, and calls it
// directly (bypassing caching) when the *Cache receiver is nil.
type Loader func() (fs.Entries, error)
// GetEntries consults the cache and either retrieves the contents of the directory listing from the cache
// or invokes the provided callback and adds the results to the cache.
func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) {
func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) {
if c == nil {
return cb()
}
c.Lock()
if v, ok := c.data[id]; ok {
c.moveToHead(v)
c.Unlock()
return v.entries, nil
if v, ok := c.data[id]; id != "" && ok {
if time.Now().Before(v.expireAfter) {
c.moveToHead(v)
c.Unlock()
if c.debug {
log.Printf("cache hit for %q (valid until %v)", id, v.expireAfter)
}
return v.entries, nil
}
// time expired
if c.debug {
log.Printf("removing expired cache entry %q after %v", id, v.expireAfter)
}
c.removeEntryLocked(v)
}
if c.debug {
log.Printf("cache miss for %q", id)
}
raw, err := cb()
if err != nil {
return nil, err
@@ -103,18 +114,16 @@ func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries,
}
entry := &cacheEntry{
id: id,
entries: raw,
id: id,
entries: raw,
expireAfter: time.Now().Add(expirationTime),
}
c.addToHead(entry)
c.data[id] = entry
c.totalDirectoryEntries += len(raw)
for c.totalDirectoryEntries > c.maxDirectoryEntries || len(c.data) > c.maxDirectories {
toremove := c.tail
c.remove(toremove)
c.totalDirectoryEntries -= len(toremove.entries)
delete(c.data, toremove.id)
c.removeEntryLocked(c.tail)
}
c.Unlock()
@@ -122,6 +131,12 @@ func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries,
return raw, nil
}
// removeEntryLocked drops a single entry from the cache and fixes up the
// bookkeeping that goes with it: the entry is handed to c.remove (defined
// elsewhere in this file; presumably unlinks it from the access-ordered list),
// its directory entries are subtracted from the running total, and its id is
// removed from the lookup map.
//
// It performs no locking itself; GetEntries invokes it between c.Lock() and
// c.Unlock().
func (c *Cache) removeEntryLocked(toremove *cacheEntry) {
	// Detach from the list first, exactly as the eviction loop always did.
	c.remove(toremove)

	// Give back the entry's share of the directory-entry budget.
	released := len(toremove.entries)
	c.totalDirectoryEntries = c.totalDirectoryEntries - released

	// Finally forget the id so later lookups miss.
	delete(c.data, toremove.id)
}
// CacheOption modifies the behavior of FUSE node cache.
// It is a function that receives the *Cache to configure; NewCache accepts
// any number of them as variadic arguments.
type CacheOption func(c *Cache)
@@ -142,7 +157,7 @@ func MaxCachedDirectoryEntries(count int) CacheOption {
// NewCache creates FUSE node cache.
func NewCache(options ...CacheOption) *Cache {
c := &Cache{
data: make(map[int64]*cacheEntry),
data: make(map[string]*cacheEntry),
maxDirectories: 1000,
maxDirectoryEntries: 100000,
}

View File

@@ -7,16 +7,19 @@
"reflect"
"runtime"
"testing"
"time"
"github.com/kopia/kopia/fs"
)
// expirationTime is the expiration duration passed to the Cache.GetEntries
// calls in TestCache.
const expirationTime = 10 * time.Hour
type cacheSource struct {
data map[int64]fs.Entries
callCounter map[int64]int
data map[string]fs.Entries
callCounter map[string]int
}
func (cs *cacheSource) get(id int64) func() (fs.Entries, error) {
func (cs *cacheSource) get(id string) func() (fs.Entries, error) {
return func() (fs.Entries, error) {
cs.callCounter[id]++
d, ok := cs.data[id]
@@ -28,7 +31,7 @@ func (cs *cacheSource) get(id int64) func() (fs.Entries, error) {
}
}
func (cs *cacheSource) setEntryCount(id int64, cnt int) {
func (cs *cacheSource) setEntryCount(id string, cnt int) {
var fakeEntries fs.Entries
var fakeEntry fs.Entry
for i := 0; i < cnt; i++ {
@@ -41,18 +44,18 @@ func (cs *cacheSource) setEntryCount(id int64, cnt int) {
func newCacheSource() *cacheSource {
return &cacheSource{
data: make(map[int64]fs.Entries),
callCounter: make(map[int64]int),
data: make(map[string]fs.Entries),
callCounter: make(map[string]int),
}
}
type cacheVerifier struct {
cache *Cache
cacheSource *cacheSource
lastCallCounter map[int64]int
lastCallCounter map[string]int
}
func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) {
func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id string) {
actual := cv.cacheSource.callCounter[id]
expected := cv.lastCallCounter[id] + 1
if actual != expected {
@@ -61,15 +64,15 @@ func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) {
cv.reset()
}
func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id int64) {
func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id string) {
if !reflect.DeepEqual(cv.lastCallCounter, cv.cacheSource.callCounter) {
t.Errorf(errorPrefix()+" unexpected call counters for %v, got %v, expected %v", id, cv.cacheSource.callCounter, cv.lastCallCounter)
}
cv.reset()
}
func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...int64) {
var actualOrdering []int64
func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...string) {
var actualOrdering []string
var totalDirectoryEntries int
var totalDirectories int
for e := cv.cache.head; e != nil; e = e.next {
@@ -109,7 +112,7 @@ func errorPrefix() string {
}
func (cv *cacheVerifier) reset() {
cv.lastCallCounter = make(map[int64]int)
cv.lastCallCounter = make(map[string]int)
for k, v := range cv.cacheSource.callCounter {
cv.lastCallCounter[k] = v
}
@@ -126,13 +129,13 @@ func TestCache(t *testing.T) {
cs := newCacheSource()
cv := cacheVerifier{cacheSource: cs, cache: c}
var id1 int64 = 1
var id2 int64 = 2
var id3 int64 = 3
var id4 int64 = 4
var id5 int64 = 5
var id6 int64 = 6
var id7 int64 = 7
id1 := "1"
id2 := "2"
id3 := "3"
id4 := "4"
id5 := "5"
id6 := "6"
id7 := "7"
cs.setEntryCount(id1, 3)
cs.setEntryCount(id2, 3)
cs.setEntryCount(id3, 3)
@@ -144,57 +147,57 @@ func TestCache(t *testing.T) {
cv.verifyCacheOrdering(t)
// fetch id1
c.GetEntries(id1, cs.get(id1))
c.GetEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheMiss(t, id1)
cv.verifyCacheOrdering(t, id1)
// fetch id1 again - cache hit, no change
c.GetEntries(id1, cs.get(id1))
c.GetEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheHit(t, id1)
cv.verifyCacheOrdering(t, id1)
// fetch id2
c.GetEntries(id2, cs.get(id2))
c.GetEntries(id2, expirationTime, cs.get(id2))
cv.verifyCacheMiss(t, id2)
cv.verifyCacheOrdering(t, id2, id1)
// fetch id1 again - cache hit, id1 moved to the top of the LRU list
c.GetEntries(id1, cs.get(id1))
c.GetEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheHit(t, id1)
cv.verifyCacheOrdering(t, id1, id2)
// fetch id2 again
c.GetEntries(id2, cs.get(id2))
c.GetEntries(id2, expirationTime, cs.get(id2))
cv.verifyCacheHit(t, id2)
cv.verifyCacheOrdering(t, id2, id1)
// fetch id3
c.GetEntries(id3, cs.get(id3))
c.GetEntries(id3, expirationTime, cs.get(id3))
cv.verifyCacheMiss(t, id3)
cv.verifyCacheOrdering(t, id3, id2, id1)
// fetch id4
c.GetEntries(id4, cs.get(id4))
c.GetEntries(id4, expirationTime, cs.get(id4))
cv.verifyCacheMiss(t, id4)
cv.verifyCacheOrdering(t, id4, id3)
// fetch id1 again
c.GetEntries(id1, cs.get(id1))
c.GetEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheMiss(t, id1)
cv.verifyCacheOrdering(t, id1, id4)
// fetch id5, it's a big one that expels all but one
c.GetEntries(id5, cs.get(id5))
c.GetEntries(id5, expirationTime, cs.get(id5))
cv.verifyCacheMiss(t, id5)
cv.verifyCacheOrdering(t, id5, id1)
// fetch id6
c.GetEntries(id6, cs.get(id6))
c.GetEntries(id6, expirationTime, cs.get(id6))
cv.verifyCacheMiss(t, id6)
cv.verifyCacheOrdering(t, id6)
// fetch id7
c.GetEntries(id7, cs.get(id7))
c.GetEntries(id7, expirationTime, cs.get(id7))
cv.verifyCacheMiss(t, id7)
cv.verifyCacheOrdering(t, id6)
}

View File

@@ -10,6 +10,10 @@
"io/ioutil"
"os"
"sort"
"time"
"github.com/google/uuid"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
@@ -51,7 +55,8 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) {
type fuseDirectoryNode struct {
fuseNode
cacheID int64
cacheID string
cacheExpiration time.Duration
}
func (dir *fuseDirectoryNode) directory() fs.Directory {
@@ -77,7 +82,7 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse
}
func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) {
return dir.cache.GetEntries(dir.cacheID, func() (fs.Entries, error) {
return dir.cache.GetEntries(dir.cacheID, dir.cacheExpiration, func() (fs.Entries, error) {
entries, err := dir.directory().Readdir()
if err != nil {
return nil, err
@@ -142,7 +147,18 @@ func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) {
}
func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node {
return &fuseDirectoryNode{fuseNode{dir, cache}, cache.AllocateID()}
var cacheID string
var cacheExpiration time.Duration
if h, ok := dir.(repo.HasObjectID); ok {
cacheID = h.ObjectID().String()
cacheExpiration = 24 * time.Hour
} else {
cacheID = uuid.New().String()
cacheExpiration = 1 * time.Second
}
return &fuseDirectoryNode{fuseNode{dir, cache}, cacheID, cacheExpiration}
}
// NewDirectoryNode returns FUSE Node for a given fs.Directory