From b08a35e92b80cd73f0558ac2237eac468eb87df6 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 4 Sep 2017 16:27:46 -0700 Subject: [PATCH] refactored FS cache into reusable wrapper that can be applied on top of any filesystem abstraction --- cli/command_mount.go | 17 ++--- cli/command_mount_fuse.go | 5 +- cli/command_mount_nofuse.go | 3 +- cli/command_mount_webdav.go | 5 +- {internal/fscache => fs/cachefs}/cache.go | 59 +++++++--------- .../fscache => fs/cachefs}/cache_test.go | 32 ++++----- fs/cachefs/cachefs.go | 68 +++++++++++++++++++ internal/fusemount/fusefs.go | 28 +++----- internal/webdavmount/webdavmount.go | 12 ++-- 9 files changed, 139 insertions(+), 90 deletions(-) rename {internal/fscache => fs/cachefs}/cache.go (72%) rename {internal/fscache => fs/cachefs}/cache_test.go (88%) create mode 100644 fs/cachefs/cachefs.go diff --git a/cli/command_mount.go b/cli/command_mount.go index 617d2a601..019bfaae1 100644 --- a/cli/command_mount.go +++ b/cli/command_mount.go @@ -5,11 +5,10 @@ "time" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/cachefs" "github.com/kopia/kopia/fs/loggingfs" "github.com/kopia/kopia/fs/repofs" - "github.com/kopia/kopia/internal/fscache" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -54,16 +53,18 @@ func runMountCommand(context *kingpin.ParseContext) error { entry = loggingfs.Wrap(entry).(fs.Directory) } - cache := fscache.NewCache( - fscache.MaxCachedDirectories(*mountMaxCachedDirectories), - fscache.MaxCachedDirectoryEntries(*mountMaxCachedEntries), - ) + cache := cachefs.NewCache(&cachefs.Options{ + MaxCachedDirectories: *mountMaxCachedDirectories, + MaxCachedEntries: *mountMaxCachedEntries, + }) + + entry = cachefs.Wrap(entry, cache).(fs.Directory) switch *mountMode { case "FUSE": - return mountDirectoryFUSE(entry, *mountPoint, cache) + return mountDirectoryFUSE(entry, *mountPoint) case "WEBDAV": - return mountDirectoryWebDAV(entry, *mountPoint, cache) + return mountDirectoryWebDAV(entry, *mountPoint) default: return fmt.Errorf("unsupported mode: %q", *mountMode) } diff --git a/cli/command_mount_fuse.go b/cli/command_mount_fuse.go index b2f4b9693..0f4a98cd0 100644 --- a/cli/command_mount_fuse.go +++ b/cli/command_mount_fuse.go @@ -6,7 +6,6 @@ "bazil.org/fuse" fusefs "bazil.org/fuse/fs" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/fscache" "github.com/kopia/kopia/internal/fusemount" ) @@ -22,8 +21,8 @@ func (r *root) Root() (fusefs.Node, error) { mountMode = mountCommand.Flag("mode", "Mount mode").Default("FUSE").Enum("WEBDAV", "FUSE") ) -func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { - rootNode := fusemount.NewDirectoryNode(entry, cache) +func mountDirectoryFUSE(entry fs.Directory, mountPoint string) error { + rootNode := fusemount.NewDirectoryNode(entry) fuseConnection, err := fuse.Mount( mountPoint, diff --git a/cli/command_mount_nofuse.go b/cli/command_mount_nofuse.go index af389af08..7a161b49c 100644 --- a/cli/command_mount_nofuse.go +++ b/cli/command_mount_nofuse.go @@ -6,13 +6,12 @@ "fmt" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/fscache" ) var ( mountMode = mountCommand.Flag("mode", "Mount mode").Default("WEBDAV").Enum("WEBDAV") ) -func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { +func mountDirectoryFUSE(entry fs.Directory, mountPoint string) error { return fmt.Errorf("FUSE is not supported") } diff --git a/cli/command_mount_webdav.go b/cli/command_mount_webdav.go index d7c32ba33..ed7d3424b 100644 --- a/cli/command_mount_webdav.go +++ b/cli/command_mount_webdav.go @@ -10,14 +10,13 @@ "golang.org/x/net/webdav" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/fscache" "github.com/kopia/kopia/internal/webdavmount" ) -func mountDirectoryWebDAV(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { +func mountDirectoryWebDAV(entry fs.Directory, mountPoint string) error { mux := http.NewServeMux() mux.Handle("/", &webdav.Handler{ - FileSystem: webdavmount.WebDAVFS(entry, cache), + FileSystem: webdavmount.WebDAVFS(entry), LockSystem: webdav.NewMemLS(), Logger: func(r *http.Request, err error) { var maybeRange string diff --git a/internal/fscache/cache.go b/fs/cachefs/cache.go similarity index 72% rename from internal/fscache/cache.go rename to fs/cachefs/cache.go index 8ed93604c..0e0329a28 100644 --- a/internal/fscache/cache.go +++ b/fs/cachefs/cache.go @@ -1,5 +1,4 @@ -// Package fscache implements in-memory cache of filesystem entries. -package fscache +package cachefs import ( "log" @@ -21,7 +20,7 @@ type cacheEntry struct { // Cache maintains in-memory cache of recently-read data to speed up filesystem operations. type Cache struct { - sync.Mutex + mu sync.Mutex totalDirectoryEntries int maxDirectories int @@ -81,24 +80,24 @@ func (c *Cache) Readdir(d fs.Directory) (fs.Entries, error) { if h, ok := d.(repo.HasObjectID); ok { cacheID := h.ObjectID().String() cacheExpiration := 24 * time.Hour - return c.GetEntries(cacheID, cacheExpiration, d.Readdir) + return c.getEntries(cacheID, cacheExpiration, d.Readdir) } return d.Readdir() } -// GetEntries consults the cache and either retrieves the contents of directory listing from the cache +// getEntries consults the cache and either retrieves the contents of directory listing from the cache // or invokes the provides callback and adds the results to cache. -func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) { +func (c *Cache) getEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) { if c == nil { return cb() } - c.Lock() + c.mu.Lock() if v, ok := c.data[id]; id != "" && ok { if time.Now().Before(v.expireAfter) { c.moveToHead(v) - c.Unlock() + c.mu.Unlock() if c.debug { log.Printf("cache hit for %q (valid until %v)", id, v.expireAfter) } @@ -122,7 +121,7 @@ func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) ( if len(raw) > c.maxDirectoryEntries { // no point caching since it would not fit anyway, just return it. - c.Unlock() + c.mu.Unlock() return raw, nil } @@ -139,7 +138,7 @@ func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) ( c.removeEntryLocked(c.tail) } - c.Unlock() + c.mu.Unlock() return raw, nil } @@ -150,34 +149,26 @@ func (c *Cache) removeEntryLocked(toremove *cacheEntry) { delete(c.data, toremove.id) } -// CacheOption modifies the behavior of FUSE node cache. -type CacheOption func(c *Cache) - -// MaxCachedDirectories configures cache to allow at most the given number of cached directories. -func MaxCachedDirectories(count int) CacheOption { - return func(c *Cache) { - c.maxDirectories = count - } +// Options specifies behavior of filesystem Cache. +type Options struct { + MaxCachedDirectories int + MaxCachedEntries int } -// MaxCachedDirectoryEntries configures cache to allow at most the given number entries in cached directories. -func MaxCachedDirectoryEntries(count int) CacheOption { - return func(c *Cache) { - c.maxDirectoryEntries = count - } +var defaultOptions = &Options{ + MaxCachedDirectories: 1000, + MaxCachedEntries: 100000, } -// NewCache creates FUSE node cache. -func NewCache(options ...CacheOption) *Cache { - c := &Cache{ +// NewCache creates filesystem cache. +func NewCache(options *Options) *Cache { + if options == nil { + options = defaultOptions + } + + return &Cache{ data: make(map[string]*cacheEntry), - maxDirectories: 1000, - maxDirectoryEntries: 100000, + maxDirectories: options.MaxCachedDirectories, + maxDirectoryEntries: options.MaxCachedEntries, } - - for _, o := range options { - o(c) - } - - return c } diff --git a/internal/fscache/cache_test.go b/fs/cachefs/cache_test.go similarity index 88% rename from internal/fscache/cache_test.go rename to fs/cachefs/cache_test.go index 22ca6abb2..a2c6a7904 100644 --- a/internal/fscache/cache_test.go +++ b/fs/cachefs/cache_test.go @@ -1,4 +1,4 @@ -package fscache +package cachefs import ( "errors" @@ -119,10 +119,10 @@ func (cv *cacheVerifier) reset() { } func TestCache(t *testing.T) { - c := NewCache( - MaxCachedDirectories(4), - MaxCachedDirectoryEntries(100), - ) + c := NewCache(&Options{ + MaxCachedDirectories: 4, + MaxCachedEntries: 100, + }) if len(c.data) != 0 || c.totalDirectoryEntries != 0 || c.head != nil || c.tail != nil { t.Errorf("invalid initial state: %v %v %v %v", c.data, c.totalDirectoryEntries, c.head, c.tail) } @@ -147,57 +147,57 @@ func TestCache(t *testing.T) { cv.verifyCacheOrdering(t) // fetch id1 - c.GetEntries(id1, expirationTime, cs.get(id1)) + c.getEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id1 again - cache hit, no change - c.GetEntries(id1, expirationTime, cs.get(id1)) + c.getEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id2 - c.GetEntries(id2, expirationTime, cs.get(id2)) + c.getEntries(id2, expirationTime, cs.get(id2)) cv.verifyCacheMiss(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id1 again - cache hit, id1 moved to the top of the LRU list - c.GetEntries(id1, expirationTime, cs.get(id1)) + c.getEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1, id2) // fetch id2 again - c.GetEntries(id2, expirationTime, cs.get(id2)) + c.getEntries(id2, expirationTime, cs.get(id2)) cv.verifyCacheHit(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id3 - c.GetEntries(id3, expirationTime, cs.get(id3)) + c.getEntries(id3, expirationTime, cs.get(id3)) cv.verifyCacheMiss(t, id3) cv.verifyCacheOrdering(t, id3, id2, id1) // fetch id4 - c.GetEntries(id4, expirationTime, cs.get(id4)) + c.getEntries(id4, expirationTime, cs.get(id4)) cv.verifyCacheMiss(t, id4) cv.verifyCacheOrdering(t, id4, id3) // fetch id1 again - c.GetEntries(id1, expirationTime, cs.get(id1)) + c.getEntries(id1, expirationTime, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1, id4) // fetch id5, it's a big one that expels all but one - c.GetEntries(id5, expirationTime, cs.get(id5)) + c.getEntries(id5, expirationTime, cs.get(id5)) cv.verifyCacheMiss(t, id5) cv.verifyCacheOrdering(t, id5, id1) // fetch id6 - c.GetEntries(id6, expirationTime, cs.get(id6)) + c.getEntries(id6, expirationTime, cs.get(id6)) cv.verifyCacheMiss(t, id6) cv.verifyCacheOrdering(t, id6) // fetch id7 - c.GetEntries(id7, expirationTime, cs.get(id7)) + c.getEntries(id7, expirationTime, cs.get(id7)) cv.verifyCacheMiss(t, id7) cv.verifyCacheOrdering(t, id6) } diff --git a/fs/cachefs/cachefs.go b/fs/cachefs/cachefs.go new file mode 100644 index 000000000..c79182cb6 --- /dev/null +++ b/fs/cachefs/cachefs.go @@ -0,0 +1,68 @@ +// Package cachefs implements a wrapper that caches filesystem actions. +package cachefs + +import ( + "github.com/kopia/kopia/fs" +) + +// DirectoryCacher reads and potentially caches directory entries for a given directory. +type DirectoryCacher interface { + Readdir(d fs.Directory) (fs.Entries, error) +} + +type context struct { + cacher DirectoryCacher +} + +type directory struct { + ctx *context + fs.Directory +} + +func (d *directory) Readdir() (fs.Entries, error) { + entries, err := d.ctx.cacher.Readdir(d.Directory) + if err != nil { + return entries, err + } + + wrapped := make(fs.Entries, len(entries)) + for i, entry := range entries { + wrapped[i] = wrapWithContext(entry, d.ctx) + } + return wrapped, err +} + +type file struct { + ctx *context + fs.File +} + +type symlink struct { + ctx *context + fs.Symlink +} + +// Wrap returns an Entry that wraps another Entry and caches directory reads. +func Wrap(e fs.Entry, cacher DirectoryCacher) fs.Entry { + return wrapWithContext(e, &context{cacher}) +} + +func wrapWithContext(e fs.Entry, opts *context) fs.Entry { + switch e := e.(type) { + case fs.Directory: + return fs.Directory(&directory{opts, e}) + + case fs.File: + return fs.File(&file{opts, e}) + + case fs.Symlink: + return fs.Symlink(&symlink{opts, e}) + + default: + return e + } +} + +var _ fs.Directory = &directory{} +var _ fs.File = &file{} +var _ fs.Symlink = &symlink{} diff --git a/internal/fusemount/fusefs.go b/internal/fusemount/fusefs.go index 6ec856e78..873ea8c6d 100644 --- a/internal/fusemount/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -11,7 +11,6 @@ "os" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/fscache" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" @@ -21,7 +20,6 @@ type fuseNode struct { entry fs.Entry - cache *fscache.Cache } func (n *fuseNode) Attr(ctx context.Context, a *fuse.Attr) error { @@ -57,7 +55,7 @@ func (dir *fuseDirectoryNode) directory() fs.Directory { } func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fusefs.Node, error) { - entries, err := dir.readPossiblyCachedReaddir() + entries, err := dir.directory().Readdir() if err != nil { if os.IsNotExist(err) { return nil, fuse.ENOENT @@ -71,15 +69,11 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse return nil, fuse.ENOENT } - return newFuseNode(e, dir.cache) -} - -func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) { - return dir.cache.Readdir(dir.directory()) + return newFuseNode(e) } func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { - entries, err := dir.readPossiblyCachedReaddir() + entries, err := dir.directory().Readdir() if err != nil { return nil, err } @@ -115,24 +109,24 @@ func (sl *fuseSymlinkNode) Readlink(ctx context.Context, req *fuse.ReadlinkReque return sl.entry.(fs.Symlink).Readlink() } -func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) { +func newFuseNode(e fs.Entry) (fusefs.Node, error) { switch e := e.(type) { case fs.Directory: - return newDirectoryNode(e, cache), nil + return newDirectoryNode(e), nil case fs.File: - return &fuseFileNode{fuseNode{e, cache}}, nil + return &fuseFileNode{fuseNode{e}}, nil case fs.Symlink: - return &fuseSymlinkNode{fuseNode{e, cache}}, nil + return &fuseSymlinkNode{fuseNode{e}}, nil default: return nil, fmt.Errorf("entry type not supported: %v", e.Metadata().Type) } } -func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { - return &fuseDirectoryNode{fuseNode{dir, cache}} +func newDirectoryNode(dir fs.Directory) fusefs.Node { + return &fuseDirectoryNode{fuseNode{dir}} } // NewDirectoryNode returns FUSE Node for a given fs.Directory -func NewDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { - return newDirectoryNode(dir, cache) +func NewDirectoryNode(dir fs.Directory) fusefs.Node { + return newDirectoryNode(dir) } diff --git a/internal/webdavmount/webdavmount.go b/internal/webdavmount/webdavmount.go index 53a65a800..513cb333d 100644 --- a/internal/webdavmount/webdavmount.go +++ b/internal/webdavmount/webdavmount.go @@ -10,7 +10,6 @@ "time" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/internal/fscache" "golang.org/x/net/context" "golang.org/x/net/webdav" ) @@ -88,7 +87,7 @@ type webdavDir struct { } func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) { - entries, err := d.w.cache.Readdir(d.entry) + entries, err := d.entry.Readdir() if err != nil { return nil, err } @@ -152,8 +151,7 @@ func (i webdavFileInfo) Sys() interface{} { } type webdavFS struct { - dir fs.Directory - cache *fscache.Cache + dir fs.Directory } func (w *webdavFS) Mkdir(ctx context.Context, path string, mode os.FileMode) error { @@ -203,7 +201,7 @@ func (w *webdavFS) findEntry(path string) (fs.Entry, error) { return nil, fmt.Errorf("%q not found in %q (not a directory)", p, strings.Join(parts[0:i], "/")) } - entries, err := w.cache.Readdir(d) + entries, err := d.Readdir() if err != nil { return nil, err } @@ -230,6 +228,6 @@ func removeEmpty(s []string) []string { } // WebDAVFS returns a webdav.FileSystem implementation for a given directory. -func WebDAVFS(entry fs.Directory, cache *fscache.Cache) webdav.FileSystem { - return &webdavFS{entry, cache} +func WebDAVFS(entry fs.Directory) webdav.FileSystem { + return &webdavFS{entry} }