From 7725f00cb0bd4ea58a7873495cb0d03de8ece9dc Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 25 Sep 2016 16:22:28 -0700 Subject: [PATCH] added caching to 'kopia mount' --- cmd/kopia/command_mount.go | 14 ++- fuse/cache.go | 156 +++++++++++++++++++++++++++++ fuse/cache_test.go | 200 +++++++++++++++++++++++++++++++++++++ fuse/fusefs.go | 24 +++-- 4 files changed, 381 insertions(+), 13 deletions(-) create mode 100644 fuse/cache.go create mode 100644 fuse/cache_test.go diff --git a/cmd/kopia/command_mount.go b/cmd/kopia/command_mount.go index 158042740..18b444d16 100644 --- a/cmd/kopia/command_mount.go +++ b/cmd/kopia/command_mount.go @@ -16,9 +16,11 @@ var ( mountCommand = app.Command("mount", "Mount repository object as a local filesystem.") - mountObjectID = mountCommand.Arg("path", "Identifier of the directory to mount.").Required().String() - mountPoint = mountCommand.Arg("mountPoint", "Mount point").Required().ExistingDir() - mountTraceFS = mountCommand.Flag("trace-fs", "Trace filesystem operations").Bool() + mountObjectID = mountCommand.Arg("path", "Identifier of the directory to mount.").Required().String() + mountPoint = mountCommand.Arg("mountPoint", "Mount point").Required().ExistingDir() + mountTraceFS = mountCommand.Flag("trace-fs", "Trace filesystem operations").Bool() + mountMaxCachedEntries = mountCommand.Flag("max-cached-entries", "Limit the number of cached directories").Default("100000").Int() + mountMaxCachedDirectories = mountCommand.Flag("max-cached-dirs", "Limit the number of cached directories").Default("100").Int() ) type root struct { @@ -50,7 +52,11 @@ func runMountCommand(context *kingpin.ParseContext) error { entry = loggingfs.Wrap(entry).(fs.Directory) } - rootNode := kopiafuse.NewDirectoryNode(entry) + cache := kopiafuse.NewCache( + kopiafuse.MaxCachedDirectories(*mountMaxCachedDirectories), + kopiafuse.MaxCachedDirectoryEntries(*mountMaxCachedEntries), + ) + rootNode := kopiafuse.NewDirectoryNode(entry, cache) fusefs.Serve(fuseConnection, &root{rootNode}) diff --git a/fuse/cache.go b/fuse/cache.go new file mode 100644 index 000000000..3e93c1f42 --- /dev/null +++ b/fuse/cache.go @@ -0,0 +1,156 @@ +// +build !windows + +// Package fuse implements FUSE filesystem nodes for mounting contents of filesystem stored in repository. +// +// The FUSE implementation used is from bazil.org/fuse +package fuse + +import ( + "sync" + "sync/atomic" + + "github.com/kopia/kopia/fs" +) + +type cacheEntry struct { + id int64 + prev *cacheEntry + next *cacheEntry + + entries fs.Entries +} + +// Cache maintains in-memory cache of recently-read data to speed up filesystem operations. +type Cache struct { + sync.Mutex + + nextID int64 + totalDirectoryEntries int + maxDirectories int + maxDirectoryEntries int + data map[int64]*cacheEntry + + // Doubly-linked list of entries, in access time order + head *cacheEntry + tail *cacheEntry +} + +func (c *Cache) allocateID() int64 { + if c == nil { + return 0 + } + + return atomic.AddInt64(&c.nextID, 1) +} + +func (c *Cache) moveToHead(e *cacheEntry) { + if e == c.head { + // Already at head, no change. + return + } + + c.remove(e) + c.addToHead(e) +} + +func (c *Cache) addToHead(e *cacheEntry) { + if c.head != nil { + e.next = c.head + c.head.prev = e + c.head = e + } else { + c.head = e + c.tail = e + } +} + +func (c *Cache) remove(e *cacheEntry) { + if e.prev == nil { + // First element. + c.head = e.next + } else { + e.prev.next = e.next + } + + if e.next == nil { + // Last element + c.tail = e.prev + } else { + e.next.prev = e.prev + } +} + +func (c *Cache) getEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) { + if c == nil { + return cb() + } + + c.Lock() + if v, ok := c.data[id]; ok { + c.moveToHead(v) + c.Unlock() + return v.entries, nil + } + + raw, err := cb() + if err != nil { + return nil, err + } + + if len(raw) > c.maxDirectoryEntries { + // no point caching since it would not fit anyway, just return it. + c.Unlock() + return raw, nil + } + + entry := &cacheEntry{ + id: id, + entries: raw, + } + c.addToHead(entry) + c.data[id] = entry + + c.totalDirectoryEntries += len(raw) + for c.totalDirectoryEntries > c.maxDirectoryEntries || len(c.data) > c.maxDirectories { + toremove := c.tail + c.remove(toremove) + c.totalDirectoryEntries -= len(toremove.entries) + delete(c.data, toremove.id) + } + + c.Unlock() + + return raw, nil +} + +// CacheOption modifies the behavior of FUSE node cache. +type CacheOption func(c *Cache) + +// MaxCachedDirectories configures cache to allow at most the given number of cached directories. +func MaxCachedDirectories(count int) CacheOption { + return func(c *Cache) { + c.maxDirectories = count + } +} + +// MaxCachedDirectoryEntries configures cache to allow at most the given number entries in cached directories. +func MaxCachedDirectoryEntries(count int) CacheOption { + return func(c *Cache) { + c.maxDirectoryEntries = count + } +} + +// NewCache creates FUSE node cache. +func NewCache(options ...CacheOption) *Cache { + c := &Cache{ + data: make(map[int64]*cacheEntry), + maxDirectories: 1000, + maxDirectoryEntries: 100000, + } + + for _, o := range options { + o(c) + } + + return c +} diff --git a/fuse/cache_test.go b/fuse/cache_test.go new file mode 100644 index 000000000..3325df3d8 --- /dev/null +++ b/fuse/cache_test.go @@ -0,0 +1,200 @@ +package fuse + +import ( + "errors" + "fmt" + "path/filepath" + "reflect" + "runtime" + "testing" + + "github.com/kopia/kopia/fs" +) + +type cacheSource struct { + data map[int64]fs.Entries + callCounter map[int64]int +} + +func (cs *cacheSource) get(id int64) func() (fs.Entries, error) { + return func() (fs.Entries, error) { + cs.callCounter[id]++ + d, ok := cs.data[id] + if !ok { + return nil, errors.New("no such id") + } + + return d, nil + } +} + +func (cs *cacheSource) setEntryCount(id int64, cnt int) { + var fakeEntries fs.Entries + var fakeEntry fs.Entry + for i := 0; i < cnt; i++ { + fakeEntries = append(fakeEntries, fakeEntry) + } + + cs.data[id] = fakeEntries + cs.callCounter[id] = 0 +} + +func newCacheSource() *cacheSource { + return &cacheSource{ + data: make(map[int64]fs.Entries), + callCounter: make(map[int64]int), + } +} + +type cacheVerifier struct { + cache *Cache + cacheSource *cacheSource + lastCallCounter map[int64]int +} + +func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) { + actual := cv.cacheSource.callCounter[id] + expected := cv.lastCallCounter[id] + 1 + if actual != expected { + t.Errorf(errorPrefix()+"invalid call counter for %v, got %v, expected %v", id, actual, expected) + } + cv.reset() +} + +func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id int64) { + if !reflect.DeepEqual(cv.lastCallCounter, cv.cacheSource.callCounter) { + t.Errorf(errorPrefix()+" unexpected call counters for %v, got %v, expected %v", id, cv.cacheSource.callCounter, cv.lastCallCounter) + } + cv.reset() +} + +func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...int64) { + var actualOrdering []int64 + var totalDirectoryEntries int + var totalDirectories int + for e := cv.cache.head; e != nil; e = e.next { + actualOrdering = append(actualOrdering, e.id) + totalDirectoryEntries += len(e.entries) + totalDirectories++ + } + + if cv.cache.totalDirectoryEntries != totalDirectoryEntries { + t.Errorf("invalid totalDirectoryEntries: %v, expected %v", cv.cache.totalDirectoryEntries, totalDirectoryEntries) + } + + if len(cv.cache.data) != totalDirectories { + t.Errorf("invalid total directories: %v, expected %v", len(cv.cache.data), totalDirectories) + } + + if !reflect.DeepEqual(actualOrdering, expectedOrdering) { + t.Errorf(errorPrefix()+"unexpected ordering: %v, expected: %v", actualOrdering, expectedOrdering) + } + + if totalDirectories > cv.cache.maxDirectories { + t.Errorf(errorPrefix()+"total directories exceeds limit: %v, expected %v", totalDirectories, cv.cache.maxDirectories) + } + + if totalDirectoryEntries > cv.cache.maxDirectoryEntries { + t.Errorf(errorPrefix()+"total directory entries exceeds limit: %v, expected %v", totalDirectoryEntries, cv.cache.maxDirectoryEntries) + } + +} + +func errorPrefix() string { + if _, fn, line, ok := runtime.Caller(2); ok { + return fmt.Sprintf("called from %v:%v: ", filepath.Base(fn), line) + } + + return "" +} + +func (cv *cacheVerifier) reset() { + cv.lastCallCounter = make(map[int64]int) + for k, v := range cv.cacheSource.callCounter { + cv.lastCallCounter[k] = v + } +} + +func TestCache(t *testing.T) { + c := NewCache( + MaxCachedDirectories(4), + MaxCachedDirectoryEntries(100), + ) + if len(c.data) != 0 || c.totalDirectoryEntries != 0 || c.head != nil || c.tail != nil { + t.Errorf("invalid initial state: %v %v %v %v", c.data, c.totalDirectoryEntries, c.head, c.tail) + } + + cs := newCacheSource() + cv := cacheVerifier{cacheSource: cs, cache: c} + var id1 int64 = 1 + var id2 int64 = 2 + var id3 int64 = 3 + var id4 int64 = 4 + var id5 int64 = 5 + var id6 int64 = 6 + var id7 int64 = 7 + cs.setEntryCount(id1, 3) + cs.setEntryCount(id2, 3) + cs.setEntryCount(id3, 3) + cs.setEntryCount(id4, 95) + cs.setEntryCount(id5, 70) + cs.setEntryCount(id6, 100) + cs.setEntryCount(id7, 101) + + cv.verifyCacheOrdering(t) + + // fetch id1 + c.getEntries(id1, cs.get(id1)) + cv.verifyCacheMiss(t, id1) + cv.verifyCacheOrdering(t, id1) + + // fetch id1 again - cache hit, no change + c.getEntries(id1, cs.get(id1)) + cv.verifyCacheHit(t, id1) + cv.verifyCacheOrdering(t, id1) + + // fetch id2 + c.getEntries(id2, cs.get(id2)) + cv.verifyCacheMiss(t, id2) + cv.verifyCacheOrdering(t, id2, id1) + + // fetch id1 again - cache hit, id1 moved to the top of the LRU list + c.getEntries(id1, cs.get(id1)) + cv.verifyCacheHit(t, id1) + cv.verifyCacheOrdering(t, id1, id2) + + // fetch id2 again + c.getEntries(id2, cs.get(id2)) + cv.verifyCacheHit(t, id2) + cv.verifyCacheOrdering(t, id2, id1) + + // fetch id3 + c.getEntries(id3, cs.get(id3)) + cv.verifyCacheMiss(t, id3) + cv.verifyCacheOrdering(t, id3, id2, id1) + + // fetch id4 + c.getEntries(id4, cs.get(id4)) + cv.verifyCacheMiss(t, id4) + cv.verifyCacheOrdering(t, id4, id3) + + // fetch id1 again + c.getEntries(id1, cs.get(id1)) + cv.verifyCacheMiss(t, id1) + cv.verifyCacheOrdering(t, id1, id4) + + // fetch id5, it's a big one that expels all but one + c.getEntries(id5, cs.get(id5)) + cv.verifyCacheMiss(t, id5) + cv.verifyCacheOrdering(t, id5, id1) + + // fetch id6 + c.getEntries(id6, cs.get(id6)) + cv.verifyCacheMiss(t, id6) + cv.verifyCacheOrdering(t, id6) + + // fetch id7 + c.getEntries(id7, cs.get(id7)) + cv.verifyCacheMiss(t, id7) + cv.verifyCacheOrdering(t, id6) +} diff --git a/fuse/fusefs.go b/fuse/fusefs.go index 869cc7959..5cd17f205 100644 --- a/fuse/fusefs.go +++ b/fuse/fusefs.go @@ -20,6 +20,7 @@ type fuseNode struct { entry fs.Entry + cache *Cache } func (n *fuseNode) Attr(ctx context.Context, a *fuse.Attr) error { @@ -48,6 +49,7 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) { type fuseDirectoryNode struct { fuseNode + cacheID int64 } func (dir *fuseDirectoryNode) directory() fs.Directory { @@ -69,12 +71,12 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse return nil, fuse.ENOENT } - return newFuseNode(e) + return newFuseNode(e, dir.cache) } func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) { - return dir.directory().Readdir() + return dir.cache.getEntries(dir.cacheID, func() (fs.Entries, error) { return dir.directory().Readdir() }) } func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { @@ -114,20 +116,24 @@ func (sl *fuseSymlinkNode) Readlink(ctx context.Context, req *fuse.ReadlinkReque return sl.entry.(fs.Symlink).Readlink() } -func newFuseNode(e fs.Entry) (fusefs.Node, error) { +func newFuseNode(e fs.Entry, cache *Cache) (fusefs.Node, error) { switch e := e.(type) { case fs.Directory: - return &fuseDirectoryNode{fuseNode{e}}, nil + return newDirectoryNode(e, cache), nil case fs.File: - return &fuseFileNode{fuseNode{e}}, nil + return &fuseFileNode{fuseNode{e, cache}}, nil case fs.Symlink: - return &fuseSymlinkNode{fuseNode{e}}, nil + return &fuseSymlinkNode{fuseNode{e, cache}}, nil default: return nil, fmt.Errorf("entry type not supported: %v", e.Metadata().Type) } } -// NewDirectoryNode returns FUSE Node for a given fs.Directory -func NewDirectoryNode(dir fs.Directory) fusefs.Node { - return &fuseDirectoryNode{fuseNode{dir}} +func newDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node { + return &fuseDirectoryNode{fuseNode{dir, cache}, cache.allocateID()} +} + +// NewDirectoryNode returns FUSE Node for a given fs.Directory +func NewDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node { + return newDirectoryNode(dir, cache) }