mirror of
https://github.com/kopia/kopia.git
synced 2026-05-18 19:54:37 -04:00
added caching to 'kopia mount'
This commit is contained in:
@@ -16,9 +16,11 @@
|
||||
var (
|
||||
mountCommand = app.Command("mount", "Mount repository object as a local filesystem.")
|
||||
|
||||
mountObjectID = mountCommand.Arg("path", "Identifier of the directory to mount.").Required().String()
|
||||
mountPoint = mountCommand.Arg("mountPoint", "Mount point").Required().ExistingDir()
|
||||
mountTraceFS = mountCommand.Flag("trace-fs", "Trace filesystem operations").Bool()
|
||||
mountObjectID = mountCommand.Arg("path", "Identifier of the directory to mount.").Required().String()
|
||||
mountPoint = mountCommand.Arg("mountPoint", "Mount point").Required().ExistingDir()
|
||||
mountTraceFS = mountCommand.Flag("trace-fs", "Trace filesystem operations").Bool()
|
||||
mountMaxCachedEntries = mountCommand.Flag("max-cached-entries", "Limit the number of cached directories").Default("100000").Int()
|
||||
mountMaxCachedDirectories = mountCommand.Flag("max-cached-dirs", "Limit the number of cached directories").Default("100").Int()
|
||||
)
|
||||
|
||||
type root struct {
|
||||
@@ -50,7 +52,11 @@ func runMountCommand(context *kingpin.ParseContext) error {
|
||||
entry = loggingfs.Wrap(entry).(fs.Directory)
|
||||
}
|
||||
|
||||
rootNode := kopiafuse.NewDirectoryNode(entry)
|
||||
cache := kopiafuse.NewCache(
|
||||
kopiafuse.MaxCachedDirectories(*mountMaxCachedDirectories),
|
||||
kopiafuse.MaxCachedDirectoryEntries(*mountMaxCachedEntries),
|
||||
)
|
||||
rootNode := kopiafuse.NewDirectoryNode(entry, cache)
|
||||
|
||||
fusefs.Serve(fuseConnection, &root{rootNode})
|
||||
|
||||
|
||||
156
fuse/cache.go
Normal file
156
fuse/cache.go
Normal file
@@ -0,0 +1,156 @@
|
||||
// +build !windows
|
||||
|
||||
// Package fuse implements FUSE filesystem nodes for mounting contents of filesystem stored in repository.
|
||||
//
|
||||
// The FUSE implementation used is from bazil.org/fuse
|
||||
package fuse
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
)
|
||||
|
||||
type cacheEntry struct {
|
||||
id int64
|
||||
prev *cacheEntry
|
||||
next *cacheEntry
|
||||
|
||||
entries fs.Entries
|
||||
}
|
||||
|
||||
// Cache maintains in-memory cache of recently-read data to speed up filesystem operations.
|
||||
type Cache struct {
|
||||
sync.Mutex
|
||||
|
||||
nextID int64
|
||||
totalDirectoryEntries int
|
||||
maxDirectories int
|
||||
maxDirectoryEntries int
|
||||
data map[int64]*cacheEntry
|
||||
|
||||
// Doubly-linked list of entries, in access time order
|
||||
head *cacheEntry
|
||||
tail *cacheEntry
|
||||
}
|
||||
|
||||
func (c *Cache) allocateID() int64 {
|
||||
if c == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return atomic.AddInt64(&c.nextID, 1)
|
||||
}
|
||||
|
||||
func (c *Cache) moveToHead(e *cacheEntry) {
|
||||
if e == c.head {
|
||||
// Already at head, no change.
|
||||
return
|
||||
}
|
||||
|
||||
c.remove(e)
|
||||
c.addToHead(e)
|
||||
}
|
||||
|
||||
func (c *Cache) addToHead(e *cacheEntry) {
|
||||
if c.head != nil {
|
||||
e.next = c.head
|
||||
c.head.prev = e
|
||||
c.head = e
|
||||
} else {
|
||||
c.head = e
|
||||
c.tail = e
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) remove(e *cacheEntry) {
|
||||
if e.prev == nil {
|
||||
// First element.
|
||||
c.head = e.next
|
||||
} else {
|
||||
e.prev.next = e.next
|
||||
}
|
||||
|
||||
if e.next == nil {
|
||||
// Last element
|
||||
c.tail = e.prev
|
||||
} else {
|
||||
e.next.prev = e.prev
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) getEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) {
|
||||
if c == nil {
|
||||
return cb()
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
if v, ok := c.data[id]; ok {
|
||||
c.moveToHead(v)
|
||||
c.Unlock()
|
||||
return v.entries, nil
|
||||
}
|
||||
|
||||
raw, err := cb()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(raw) > c.maxDirectoryEntries {
|
||||
// no point caching since it would not fit anyway, just return it.
|
||||
c.Unlock()
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
entry := &cacheEntry{
|
||||
id: id,
|
||||
entries: raw,
|
||||
}
|
||||
c.addToHead(entry)
|
||||
c.data[id] = entry
|
||||
|
||||
c.totalDirectoryEntries += len(raw)
|
||||
for c.totalDirectoryEntries > c.maxDirectoryEntries || len(c.data) > c.maxDirectories {
|
||||
toremove := c.tail
|
||||
c.remove(toremove)
|
||||
c.totalDirectoryEntries -= len(toremove.entries)
|
||||
delete(c.data, toremove.id)
|
||||
}
|
||||
|
||||
c.Unlock()
|
||||
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
// CacheOption modifies the behavior of FUSE node cache.
|
||||
type CacheOption func(c *Cache)
|
||||
|
||||
// MaxCachedDirectories configures cache to allow at most the given number of cached directories.
|
||||
func MaxCachedDirectories(count int) CacheOption {
|
||||
return func(c *Cache) {
|
||||
c.maxDirectories = count
|
||||
}
|
||||
}
|
||||
|
||||
// MaxCachedDirectoryEntries configures cache to allow at most the given number entries in cached directories.
|
||||
func MaxCachedDirectoryEntries(count int) CacheOption {
|
||||
return func(c *Cache) {
|
||||
c.maxDirectoryEntries = count
|
||||
}
|
||||
}
|
||||
|
||||
// NewCache creates FUSE node cache.
|
||||
func NewCache(options ...CacheOption) *Cache {
|
||||
c := &Cache{
|
||||
data: make(map[int64]*cacheEntry),
|
||||
maxDirectories: 1000,
|
||||
maxDirectoryEntries: 100000,
|
||||
}
|
||||
|
||||
for _, o := range options {
|
||||
o(c)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
200
fuse/cache_test.go
Normal file
200
fuse/cache_test.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package fuse
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
)
|
||||
|
||||
type cacheSource struct {
|
||||
data map[int64]fs.Entries
|
||||
callCounter map[int64]int
|
||||
}
|
||||
|
||||
func (cs *cacheSource) get(id int64) func() (fs.Entries, error) {
|
||||
return func() (fs.Entries, error) {
|
||||
cs.callCounter[id]++
|
||||
d, ok := cs.data[id]
|
||||
if !ok {
|
||||
return nil, errors.New("no such id")
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *cacheSource) setEntryCount(id int64, cnt int) {
|
||||
var fakeEntries fs.Entries
|
||||
var fakeEntry fs.Entry
|
||||
for i := 0; i < cnt; i++ {
|
||||
fakeEntries = append(fakeEntries, fakeEntry)
|
||||
}
|
||||
|
||||
cs.data[id] = fakeEntries
|
||||
cs.callCounter[id] = 0
|
||||
}
|
||||
|
||||
func newCacheSource() *cacheSource {
|
||||
return &cacheSource{
|
||||
data: make(map[int64]fs.Entries),
|
||||
callCounter: make(map[int64]int),
|
||||
}
|
||||
}
|
||||
|
||||
type cacheVerifier struct {
|
||||
cache *Cache
|
||||
cacheSource *cacheSource
|
||||
lastCallCounter map[int64]int
|
||||
}
|
||||
|
||||
func (cv *cacheVerifier) verifyCacheMiss(t *testing.T, id int64) {
|
||||
actual := cv.cacheSource.callCounter[id]
|
||||
expected := cv.lastCallCounter[id] + 1
|
||||
if actual != expected {
|
||||
t.Errorf(errorPrefix()+"invalid call counter for %v, got %v, expected %v", id, actual, expected)
|
||||
}
|
||||
cv.reset()
|
||||
}
|
||||
|
||||
func (cv *cacheVerifier) verifyCacheHit(t *testing.T, id int64) {
|
||||
if !reflect.DeepEqual(cv.lastCallCounter, cv.cacheSource.callCounter) {
|
||||
t.Errorf(errorPrefix()+" unexpected call counters for %v, got %v, expected %v", id, cv.cacheSource.callCounter, cv.lastCallCounter)
|
||||
}
|
||||
cv.reset()
|
||||
}
|
||||
|
||||
func (cv *cacheVerifier) verifyCacheOrdering(t *testing.T, expectedOrdering ...int64) {
|
||||
var actualOrdering []int64
|
||||
var totalDirectoryEntries int
|
||||
var totalDirectories int
|
||||
for e := cv.cache.head; e != nil; e = e.next {
|
||||
actualOrdering = append(actualOrdering, e.id)
|
||||
totalDirectoryEntries += len(e.entries)
|
||||
totalDirectories++
|
||||
}
|
||||
|
||||
if cv.cache.totalDirectoryEntries != totalDirectoryEntries {
|
||||
t.Errorf("invalid totalDirectoryEntries: %v, expected %v", cv.cache.totalDirectoryEntries, totalDirectoryEntries)
|
||||
}
|
||||
|
||||
if len(cv.cache.data) != totalDirectories {
|
||||
t.Errorf("invalid total directories: %v, expected %v", len(cv.cache.data), totalDirectories)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(actualOrdering, expectedOrdering) {
|
||||
t.Errorf(errorPrefix()+"unexpected ordering: %v, expected: %v", actualOrdering, expectedOrdering)
|
||||
}
|
||||
|
||||
if totalDirectories > cv.cache.maxDirectories {
|
||||
t.Errorf(errorPrefix()+"total directories exceeds limit: %v, expected %v", totalDirectories, cv.cache.maxDirectories)
|
||||
}
|
||||
|
||||
if totalDirectoryEntries > cv.cache.maxDirectoryEntries {
|
||||
t.Errorf(errorPrefix()+"total directory entries exceeds limit: %v, expected %v", totalDirectoryEntries, cv.cache.maxDirectoryEntries)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func errorPrefix() string {
|
||||
if _, fn, line, ok := runtime.Caller(2); ok {
|
||||
return fmt.Sprintf("called from %v:%v: ", filepath.Base(fn), line)
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func (cv *cacheVerifier) reset() {
|
||||
cv.lastCallCounter = make(map[int64]int)
|
||||
for k, v := range cv.cacheSource.callCounter {
|
||||
cv.lastCallCounter[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
c := NewCache(
|
||||
MaxCachedDirectories(4),
|
||||
MaxCachedDirectoryEntries(100),
|
||||
)
|
||||
if len(c.data) != 0 || c.totalDirectoryEntries != 0 || c.head != nil || c.tail != nil {
|
||||
t.Errorf("invalid initial state: %v %v %v %v", c.data, c.totalDirectoryEntries, c.head, c.tail)
|
||||
}
|
||||
|
||||
cs := newCacheSource()
|
||||
cv := cacheVerifier{cacheSource: cs, cache: c}
|
||||
var id1 int64 = 1
|
||||
var id2 int64 = 2
|
||||
var id3 int64 = 3
|
||||
var id4 int64 = 4
|
||||
var id5 int64 = 5
|
||||
var id6 int64 = 6
|
||||
var id7 int64 = 7
|
||||
cs.setEntryCount(id1, 3)
|
||||
cs.setEntryCount(id2, 3)
|
||||
cs.setEntryCount(id3, 3)
|
||||
cs.setEntryCount(id4, 95)
|
||||
cs.setEntryCount(id5, 70)
|
||||
cs.setEntryCount(id6, 100)
|
||||
cs.setEntryCount(id7, 101)
|
||||
|
||||
cv.verifyCacheOrdering(t)
|
||||
|
||||
// fetch id1
|
||||
c.getEntries(id1, cs.get(id1))
|
||||
cv.verifyCacheMiss(t, id1)
|
||||
cv.verifyCacheOrdering(t, id1)
|
||||
|
||||
// fetch id1 again - cache hit, no change
|
||||
c.getEntries(id1, cs.get(id1))
|
||||
cv.verifyCacheHit(t, id1)
|
||||
cv.verifyCacheOrdering(t, id1)
|
||||
|
||||
// fetch id2
|
||||
c.getEntries(id2, cs.get(id2))
|
||||
cv.verifyCacheMiss(t, id2)
|
||||
cv.verifyCacheOrdering(t, id2, id1)
|
||||
|
||||
// fetch id1 again - cache hit, id1 moved to the top of the LRU list
|
||||
c.getEntries(id1, cs.get(id1))
|
||||
cv.verifyCacheHit(t, id1)
|
||||
cv.verifyCacheOrdering(t, id1, id2)
|
||||
|
||||
// fetch id2 again
|
||||
c.getEntries(id2, cs.get(id2))
|
||||
cv.verifyCacheHit(t, id2)
|
||||
cv.verifyCacheOrdering(t, id2, id1)
|
||||
|
||||
// fetch id3
|
||||
c.getEntries(id3, cs.get(id3))
|
||||
cv.verifyCacheMiss(t, id3)
|
||||
cv.verifyCacheOrdering(t, id3, id2, id1)
|
||||
|
||||
// fetch id4
|
||||
c.getEntries(id4, cs.get(id4))
|
||||
cv.verifyCacheMiss(t, id4)
|
||||
cv.verifyCacheOrdering(t, id4, id3)
|
||||
|
||||
// fetch id1 again
|
||||
c.getEntries(id1, cs.get(id1))
|
||||
cv.verifyCacheMiss(t, id1)
|
||||
cv.verifyCacheOrdering(t, id1, id4)
|
||||
|
||||
// fetch id5, it's a big one that expels all but one
|
||||
c.getEntries(id5, cs.get(id5))
|
||||
cv.verifyCacheMiss(t, id5)
|
||||
cv.verifyCacheOrdering(t, id5, id1)
|
||||
|
||||
// fetch id6
|
||||
c.getEntries(id6, cs.get(id6))
|
||||
cv.verifyCacheMiss(t, id6)
|
||||
cv.verifyCacheOrdering(t, id6)
|
||||
|
||||
// fetch id7
|
||||
c.getEntries(id7, cs.get(id7))
|
||||
cv.verifyCacheMiss(t, id7)
|
||||
cv.verifyCacheOrdering(t, id6)
|
||||
}
|
||||
@@ -20,6 +20,7 @@
|
||||
|
||||
type fuseNode struct {
|
||||
entry fs.Entry
|
||||
cache *Cache
|
||||
}
|
||||
|
||||
func (n *fuseNode) Attr(ctx context.Context, a *fuse.Attr) error {
|
||||
@@ -48,6 +49,7 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) {
|
||||
|
||||
type fuseDirectoryNode struct {
|
||||
fuseNode
|
||||
cacheID int64
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) directory() fs.Directory {
|
||||
@@ -69,12 +71,12 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse
|
||||
return nil, fuse.ENOENT
|
||||
}
|
||||
|
||||
return newFuseNode(e)
|
||||
return newFuseNode(e, dir.cache)
|
||||
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) {
|
||||
return dir.directory().Readdir()
|
||||
return dir.cache.getEntries(dir.cacheID, func() (fs.Entries, error) { return dir.directory().Readdir() })
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
@@ -114,20 +116,24 @@ func (sl *fuseSymlinkNode) Readlink(ctx context.Context, req *fuse.ReadlinkReque
|
||||
return sl.entry.(fs.Symlink).Readlink()
|
||||
}
|
||||
|
||||
func newFuseNode(e fs.Entry) (fusefs.Node, error) {
|
||||
func newFuseNode(e fs.Entry, cache *Cache) (fusefs.Node, error) {
|
||||
switch e := e.(type) {
|
||||
case fs.Directory:
|
||||
return &fuseDirectoryNode{fuseNode{e}}, nil
|
||||
return newDirectoryNode(e, cache), nil
|
||||
case fs.File:
|
||||
return &fuseFileNode{fuseNode{e}}, nil
|
||||
return &fuseFileNode{fuseNode{e, cache}}, nil
|
||||
case fs.Symlink:
|
||||
return &fuseSymlinkNode{fuseNode{e}}, nil
|
||||
return &fuseSymlinkNode{fuseNode{e, cache}}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("entry type not supported: %v", e.Metadata().Type)
|
||||
}
|
||||
}
|
||||
|
||||
// NewDirectoryNode returns FUSE Node for a given fs.Directory
|
||||
func NewDirectoryNode(dir fs.Directory) fusefs.Node {
|
||||
return &fuseDirectoryNode{fuseNode{dir}}
|
||||
func newDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node {
|
||||
return &fuseDirectoryNode{fuseNode{dir, cache}, cache.allocateID()}
|
||||
}
|
||||
|
||||
// NewDirectoryNode returns FUSE Node for a given fs.Directory
|
||||
func NewDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node {
|
||||
return newDirectoryNode(dir, cache)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user