refactored FS cache into reusable wrapper that can be applied on top of any filesystem abstraction

This commit is contained in:
Jarek Kowalski
2017-09-04 16:27:46 -07:00
parent 8435ed4c80
commit b08a35e92b
9 changed files with 139 additions and 90 deletions

View File

@@ -5,11 +5,10 @@
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/cachefs"
"github.com/kopia/kopia/fs/loggingfs"
"github.com/kopia/kopia/fs/repofs"
"github.com/kopia/kopia/internal/fscache"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
@@ -54,16 +53,18 @@ func runMountCommand(context *kingpin.ParseContext) error {
entry = loggingfs.Wrap(entry).(fs.Directory)
}
cache := fscache.NewCache(
fscache.MaxCachedDirectories(*mountMaxCachedDirectories),
fscache.MaxCachedDirectoryEntries(*mountMaxCachedEntries),
)
cache := cachefs.NewCache(&cachefs.Options{
MaxCachedDirectories: *mountMaxCachedDirectories,
MaxCachedEntries: *mountMaxCachedEntries,
})
entry = cachefs.Wrap(entry, cache).(fs.Directory)
switch *mountMode {
case "FUSE":
return mountDirectoryFUSE(entry, *mountPoint, cache)
return mountDirectoryFUSE(entry, *mountPoint)
case "WEBDAV":
return mountDirectoryWebDAV(entry, *mountPoint, cache)
return mountDirectoryWebDAV(entry, *mountPoint)
default:
return fmt.Errorf("unsupported mode: %q", *mountMode)
}

View File

@@ -6,7 +6,6 @@
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
"github.com/kopia/kopia/internal/fusemount"
)
@@ -22,8 +21,8 @@ func (r *root) Root() (fusefs.Node, error) {
mountMode = mountCommand.Flag("mode", "Mount mode").Default("FUSE").Enum("WEBDAV", "FUSE")
)
func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error {
rootNode := fusemount.NewDirectoryNode(entry, cache)
func mountDirectoryFUSE(entry fs.Directory, mountPoint string) error {
rootNode := fusemount.NewDirectoryNode(entry)
fuseConnection, err := fuse.Mount(
mountPoint,

View File

@@ -6,13 +6,12 @@
"fmt"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
)
var (
mountMode = mountCommand.Flag("mode", "Mount mode").Default("WEBDAV").Enum("WEBDAV")
)
func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error {
func mountDirectoryFUSE(entry fs.Directory, mountPoint string) error {
return fmt.Errorf("FUSE is not supported")
}

View File

@@ -10,14 +10,13 @@
"golang.org/x/net/webdav"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
"github.com/kopia/kopia/internal/webdavmount"
)
func mountDirectoryWebDAV(entry fs.Directory, mountPoint string, cache *fscache.Cache) error {
func mountDirectoryWebDAV(entry fs.Directory, mountPoint string) error {
mux := http.NewServeMux()
mux.Handle("/", &webdav.Handler{
FileSystem: webdavmount.WebDAVFS(entry, cache),
FileSystem: webdavmount.WebDAVFS(entry),
LockSystem: webdav.NewMemLS(),
Logger: func(r *http.Request, err error) {
var maybeRange string

View File

@@ -1,5 +1,4 @@
// Package fscache implements in-memory cache of filesystem entries.
package fscache
package cachefs
import (
"log"
@@ -21,7 +20,7 @@ type cacheEntry struct {
// Cache maintains in-memory cache of recently-read data to speed up filesystem operations.
type Cache struct {
sync.Mutex
mu sync.Mutex
totalDirectoryEntries int
maxDirectories int
@@ -81,24 +80,24 @@ func (c *Cache) Readdir(d fs.Directory) (fs.Entries, error) {
if h, ok := d.(repo.HasObjectID); ok {
cacheID := h.ObjectID().String()
cacheExpiration := 24 * time.Hour
return c.GetEntries(cacheID, cacheExpiration, d.Readdir)
return c.getEntries(cacheID, cacheExpiration, d.Readdir)
}
return d.Readdir()
}
// GetEntries consults the cache and either retrieves the contents of directory listing from the cache
// getEntries consults the cache and either retrieves the contents of directory listing from the cache
// or invokes the provided callback and adds the results to cache.
func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) {
func (c *Cache) getEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) {
if c == nil {
return cb()
}
c.Lock()
c.mu.Lock()
if v, ok := c.data[id]; id != "" && ok {
if time.Now().Before(v.expireAfter) {
c.moveToHead(v)
c.Unlock()
c.mu.Unlock()
if c.debug {
log.Printf("cache hit for %q (valid until %v)", id, v.expireAfter)
}
@@ -122,7 +121,7 @@ func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (
if len(raw) > c.maxDirectoryEntries {
// no point caching since it would not fit anyway, just return it.
c.Unlock()
c.mu.Unlock()
return raw, nil
}
@@ -139,7 +138,7 @@ func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (
c.removeEntryLocked(c.tail)
}
c.Unlock()
c.mu.Unlock()
return raw, nil
}
@@ -150,34 +149,26 @@ func (c *Cache) removeEntryLocked(toremove *cacheEntry) {
delete(c.data, toremove.id)
}
// CacheOption modifies the behavior of FUSE node cache.
type CacheOption func(c *Cache)
// MaxCachedDirectories configures cache to allow at most the given number of cached directories.
func MaxCachedDirectories(count int) CacheOption {
return func(c *Cache) {
c.maxDirectories = count
}
// Options specifies behavior of filesystem Cache.
type Options struct {
MaxCachedDirectories int
MaxCachedEntries int
}
// MaxCachedDirectoryEntries configures cache to allow at most the given number entries in cached directories.
func MaxCachedDirectoryEntries(count int) CacheOption {
return func(c *Cache) {
c.maxDirectoryEntries = count
}
var defaultOptions = &Options{
MaxCachedDirectories: 1000,
MaxCachedEntries: 100000,
}
// NewCache creates FUSE node cache.
func NewCache(options ...CacheOption) *Cache {
c := &Cache{
// NewCache creates filesystem cache.
func NewCache(options *Options) *Cache {
if options == nil {
options = defaultOptions
}
return &Cache{
data: make(map[string]*cacheEntry),
maxDirectories: 1000,
maxDirectoryEntries: 100000,
maxDirectories: options.MaxCachedDirectories,
maxDirectoryEntries: options.MaxCachedEntries,
}
for _, o := range options {
o(c)
}
return c
}

View File

@@ -1,4 +1,4 @@
package fscache
package cachefs
import (
"errors"
@@ -119,10 +119,10 @@ func (cv *cacheVerifier) reset() {
}
func TestCache(t *testing.T) {
c := NewCache(
MaxCachedDirectories(4),
MaxCachedDirectoryEntries(100),
)
c := NewCache(&Options{
MaxCachedDirectories: 4,
MaxCachedEntries: 100,
})
if len(c.data) != 0 || c.totalDirectoryEntries != 0 || c.head != nil || c.tail != nil {
t.Errorf("invalid initial state: %v %v %v %v", c.data, c.totalDirectoryEntries, c.head, c.tail)
}
@@ -147,57 +147,57 @@ func TestCache(t *testing.T) {
cv.verifyCacheOrdering(t)
// fetch id1
c.GetEntries(id1, expirationTime, cs.get(id1))
c.getEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheMiss(t, id1)
cv.verifyCacheOrdering(t, id1)
// fetch id1 again - cache hit, no change
c.GetEntries(id1, expirationTime, cs.get(id1))
c.getEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheHit(t, id1)
cv.verifyCacheOrdering(t, id1)
// fetch id2
c.GetEntries(id2, expirationTime, cs.get(id2))
c.getEntries(id2, expirationTime, cs.get(id2))
cv.verifyCacheMiss(t, id2)
cv.verifyCacheOrdering(t, id2, id1)
// fetch id1 again - cache hit, id1 moved to the top of the LRU list
c.GetEntries(id1, expirationTime, cs.get(id1))
c.getEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheHit(t, id1)
cv.verifyCacheOrdering(t, id1, id2)
// fetch id2 again
c.GetEntries(id2, expirationTime, cs.get(id2))
c.getEntries(id2, expirationTime, cs.get(id2))
cv.verifyCacheHit(t, id2)
cv.verifyCacheOrdering(t, id2, id1)
// fetch id3
c.GetEntries(id3, expirationTime, cs.get(id3))
c.getEntries(id3, expirationTime, cs.get(id3))
cv.verifyCacheMiss(t, id3)
cv.verifyCacheOrdering(t, id3, id2, id1)
// fetch id4
c.GetEntries(id4, expirationTime, cs.get(id4))
c.getEntries(id4, expirationTime, cs.get(id4))
cv.verifyCacheMiss(t, id4)
cv.verifyCacheOrdering(t, id4, id3)
// fetch id1 again
c.GetEntries(id1, expirationTime, cs.get(id1))
c.getEntries(id1, expirationTime, cs.get(id1))
cv.verifyCacheMiss(t, id1)
cv.verifyCacheOrdering(t, id1, id4)
// fetch id5, it's a big one that expels all but one
c.GetEntries(id5, expirationTime, cs.get(id5))
c.getEntries(id5, expirationTime, cs.get(id5))
cv.verifyCacheMiss(t, id5)
cv.verifyCacheOrdering(t, id5, id1)
// fetch id6
c.GetEntries(id6, expirationTime, cs.get(id6))
c.getEntries(id6, expirationTime, cs.get(id6))
cv.verifyCacheMiss(t, id6)
cv.verifyCacheOrdering(t, id6)
// fetch id7
c.GetEntries(id7, expirationTime, cs.get(id7))
c.getEntries(id7, expirationTime, cs.get(id7))
cv.verifyCacheMiss(t, id7)
cv.verifyCacheOrdering(t, id6)
}

68
fs/cachefs/cachefs.go Normal file
View File

@@ -0,0 +1,68 @@
// Package cachefs implements a wrapper that caches filesystem actions.
package cachefs
import (
"github.com/kopia/kopia/fs"
)
// DirectoryCacher reads and potentially caches directory entries for a given directory.
//
// Implementations decide whether a listing is served from cache or read
// through d.Readdir(); callers treat both the same way.
type DirectoryCacher interface {
// Readdir returns the entries of d, possibly from a cache.
Readdir(d fs.Directory) (fs.Entries, error)
}
// context carries the shared DirectoryCacher that every wrapped entry in a
// subtree points back to, so nested wraps reuse one cacher instance.
type context struct {
cacher DirectoryCacher
}
// directory wraps an fs.Directory and routes Readdir through ctx.cacher,
// delegating all other fs.Directory behavior to the embedded value.
type directory struct {
ctx *context
fs.Directory
}
// Readdir lists the directory through the shared cacher and re-wraps each
// returned entry with the same context, so that directory reads anywhere in
// the subtree are also cached.
func (d *directory) Readdir() (fs.Entries, error) {
	entries, err := d.ctx.cacher.Readdir(d.Directory)
	if err != nil {
		// Propagate whatever the underlying listing returned alongside the error.
		return entries, err
	}

	wrapped := make(fs.Entries, len(entries))
	for i, entry := range entries {
		wrapped[i] = wrapWithContext(entry, d.ctx)
	}
	// err is known to be nil here; return it explicitly for clarity.
	return wrapped, nil
}
// file wraps an fs.File, delegating all behavior to the embedded value.
// NOTE(review): ctx is not used by any method visible here — presumably kept
// for symmetry with directory; confirm before removing.
type file struct {
ctx *context
fs.File
}
// symlink wraps an fs.Symlink, delegating all behavior to the embedded value.
// NOTE(review): ctx is not used by any method visible here — presumably kept
// for symmetry with directory; confirm before removing.
type symlink struct {
ctx *context
fs.Symlink
}
// Wrap returns an Entry that wraps another Entry and caches directory reads.
//
// The same cacher is shared by every entry reachable from e.
func Wrap(e fs.Entry, cacher DirectoryCacher) fs.Entry {
	shared := &context{cacher: cacher}
	return wrapWithContext(e, shared)
}
// wrapWithContext wraps a single entry so that directory listings anywhere in
// its subtree go through the cacher carried by c. Entries that are not
// directories, files or symlinks are returned unchanged.
func wrapWithContext(e fs.Entry, c *context) fs.Entry {
	switch e := e.(type) {
	case fs.Directory:
		// The wrapper types already satisfy the corresponding fs interfaces
		// and the return type is fs.Entry, so no explicit conversion is needed.
		return &directory{ctx: c, Directory: e}
	case fs.File:
		return &file{ctx: c, File: e}
	case fs.Symlink:
		return &symlink{ctx: c, Symlink: e}
	default:
		return e
	}
}
// Compile-time checks that the wrappers satisfy the fs interfaces.
// The (*T)(nil) form is the conventional idiom and avoids allocating at init.
var (
	_ fs.Directory = (*directory)(nil)
	_ fs.File      = (*file)(nil)
	_ fs.Symlink   = (*symlink)(nil)
)

View File

@@ -11,7 +11,6 @@
"os"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
@@ -21,7 +20,6 @@
type fuseNode struct {
entry fs.Entry
cache *fscache.Cache
}
func (n *fuseNode) Attr(ctx context.Context, a *fuse.Attr) error {
@@ -57,7 +55,7 @@ func (dir *fuseDirectoryNode) directory() fs.Directory {
}
func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fusefs.Node, error) {
entries, err := dir.readPossiblyCachedReaddir()
entries, err := dir.directory().Readdir()
if err != nil {
if os.IsNotExist(err) {
return nil, fuse.ENOENT
@@ -71,15 +69,11 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse
return nil, fuse.ENOENT
}
return newFuseNode(e, dir.cache)
}
func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) {
return dir.cache.Readdir(dir.directory())
return newFuseNode(e)
}
func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
entries, err := dir.readPossiblyCachedReaddir()
entries, err := dir.directory().Readdir()
if err != nil {
return nil, err
}
@@ -115,24 +109,24 @@ func (sl *fuseSymlinkNode) Readlink(ctx context.Context, req *fuse.ReadlinkReque
return sl.entry.(fs.Symlink).Readlink()
}
func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) {
func newFuseNode(e fs.Entry) (fusefs.Node, error) {
switch e := e.(type) {
case fs.Directory:
return newDirectoryNode(e, cache), nil
return newDirectoryNode(e), nil
case fs.File:
return &fuseFileNode{fuseNode{e, cache}}, nil
return &fuseFileNode{fuseNode{e}}, nil
case fs.Symlink:
return &fuseSymlinkNode{fuseNode{e, cache}}, nil
return &fuseSymlinkNode{fuseNode{e}}, nil
default:
return nil, fmt.Errorf("entry type not supported: %v", e.Metadata().Type)
}
}
func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node {
return &fuseDirectoryNode{fuseNode{dir, cache}}
func newDirectoryNode(dir fs.Directory) fusefs.Node {
return &fuseDirectoryNode{fuseNode{dir}}
}
// NewDirectoryNode returns FUSE Node for a given fs.Directory
func NewDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node {
return newDirectoryNode(dir, cache)
func NewDirectoryNode(dir fs.Directory) fusefs.Node {
return newDirectoryNode(dir)
}

View File

@@ -10,7 +10,6 @@
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/fscache"
"golang.org/x/net/context"
"golang.org/x/net/webdav"
)
@@ -88,7 +87,7 @@ type webdavDir struct {
}
func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) {
entries, err := d.w.cache.Readdir(d.entry)
entries, err := d.entry.Readdir()
if err != nil {
return nil, err
}
@@ -152,8 +151,7 @@ func (i webdavFileInfo) Sys() interface{} {
}
type webdavFS struct {
dir fs.Directory
cache *fscache.Cache
dir fs.Directory
}
func (w *webdavFS) Mkdir(ctx context.Context, path string, mode os.FileMode) error {
@@ -203,7 +201,7 @@ func (w *webdavFS) findEntry(path string) (fs.Entry, error) {
return nil, fmt.Errorf("%q not found in %q (not a directory)", p, strings.Join(parts[0:i], "/"))
}
entries, err := w.cache.Readdir(d)
entries, err := d.Readdir()
if err != nil {
return nil, err
}
@@ -230,6 +228,6 @@ func removeEmpty(s []string) []string {
}
// WebDAVFS returns a webdav.FileSystem implementation for a given directory.
func WebDAVFS(entry fs.Directory, cache *fscache.Cache) webdav.FileSystem {
return &webdavFS{entry, cache}
func WebDAVFS(entry fs.Directory) webdav.FileSystem {
return &webdavFS{entry}
}