mirror of
https://github.com/kopia/kopia.git
synced 2026-04-21 06:28:01 -04:00
mounting webdav works, changed how FS directory caching is implemented to be based solely on ObjectIDs
This commit is contained in:
@@ -19,10 +19,14 @@ func mountDirectoryWebDAV(entry fs.Directory, mountPoint string, cache *fscache.
|
||||
FileSystem: webdavmount.WebDAVFS(entry, cache),
|
||||
LockSystem: webdav.NewMemLS(),
|
||||
Logger: func(r *http.Request, err error) {
|
||||
var maybeRange string
|
||||
if r := r.Header.Get("Range"); r != "" {
|
||||
maybeRange = " " + r
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("%v %v err: %v", r.Method, r.URL.RequestURI(), err)
|
||||
log.Printf("%v %v%v err: %v", r.Method, r.URL.RequestURI(), maybeRange, err)
|
||||
} else {
|
||||
log.Printf("%v %v OK", r.Method, r.URL.RequestURI())
|
||||
log.Printf("%v %v%v OK", r.Method, r.URL.RequestURI(), maybeRange)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
)
|
||||
|
||||
type cacheEntry struct {
|
||||
@@ -74,6 +75,18 @@ func (c *Cache) remove(e *cacheEntry) {
|
||||
// Loader provides data to be stored in the cache.
|
||||
type Loader func() (fs.Entries, error)
|
||||
|
||||
// Readdir reads the contents of a provided directory using ObjectID of a directory (if any) to cache
|
||||
// the results.
|
||||
func (c *Cache) Readdir(d fs.Directory) (fs.Entries, error) {
|
||||
if h, ok := d.(repo.HasObjectID); ok {
|
||||
cacheID := h.ObjectID().String()
|
||||
cacheExpiration := 24 * time.Hour
|
||||
return c.GetEntries(cacheID, cacheExpiration, d.Readdir)
|
||||
}
|
||||
|
||||
return d.Readdir()
|
||||
}
|
||||
|
||||
// GetEntries consults the cache and either retrieves the contents of directory listing from the cache
|
||||
// or invokes the provides callback and adds the results to cache.
|
||||
func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) {
|
||||
|
||||
@@ -9,11 +9,6 @@
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/kopia/kopia/repo"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/internal/fscache"
|
||||
@@ -55,8 +50,6 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) {
|
||||
|
||||
type fuseDirectoryNode struct {
|
||||
fuseNode
|
||||
cacheID string
|
||||
cacheExpiration time.Duration
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) directory() fs.Directory {
|
||||
@@ -82,18 +75,7 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) {
|
||||
return dir.cache.GetEntries(dir.cacheID, dir.cacheExpiration, func() (fs.Entries, error) {
|
||||
entries, err := dir.directory().Readdir()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Metadata().Name < entries[j].Metadata().Name
|
||||
})
|
||||
|
||||
return entries, nil
|
||||
})
|
||||
return dir.cache.Readdir(dir.directory())
|
||||
}
|
||||
|
||||
func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
|
||||
@@ -147,18 +129,7 @@ func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) {
|
||||
}
|
||||
|
||||
func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node {
|
||||
var cacheID string
|
||||
var cacheExpiration time.Duration
|
||||
|
||||
if h, ok := dir.(repo.HasObjectID); ok {
|
||||
cacheID = h.ObjectID().String()
|
||||
cacheExpiration = 24 * time.Hour
|
||||
} else {
|
||||
cacheID = uuid.New().String()
|
||||
cacheExpiration = 1 * time.Second
|
||||
}
|
||||
|
||||
return &fuseDirectoryNode{fuseNode{dir, cache}, cacheID, cacheExpiration}
|
||||
return &fuseDirectoryNode{fuseNode{dir, cache}}
|
||||
}
|
||||
|
||||
// NewDirectoryNode returns FUSE Node for a given fs.Directory
|
||||
|
||||
@@ -6,10 +6,9 @@
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/internal/fscache"
|
||||
"golang.org/x/net/context"
|
||||
@@ -21,8 +20,10 @@
|
||||
var _ webdav.File = (*webdavDir)(nil)
|
||||
|
||||
type webdavFile struct {
|
||||
repo.ObjectReader
|
||||
entry fs.File
|
||||
|
||||
mu sync.Mutex
|
||||
r fs.Reader
|
||||
}
|
||||
|
||||
func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {
|
||||
@@ -33,17 +34,61 @@ func (f *webdavFile) Stat() (os.FileInfo, error) {
|
||||
return webdavFileInfo{f.entry.Metadata()}, nil
|
||||
}
|
||||
|
||||
func (f *webdavFile) getReader() (fs.Reader, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if f.r == nil {
|
||||
r, err := f.entry.Open()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.r = r
|
||||
}
|
||||
|
||||
return f.r, nil
|
||||
}
|
||||
|
||||
func (f *webdavFile) Read(b []byte) (int, error) {
|
||||
r, err := f.getReader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return r.Read(b)
|
||||
}
|
||||
|
||||
func (f *webdavFile) Seek(offset int64, whence int) (int64, error) {
|
||||
r, err := f.getReader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return r.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (f *webdavFile) Write(b []byte) (int, error) {
|
||||
return 0, errors.New("read-only filesystem")
|
||||
}
|
||||
|
||||
func (f *webdavFile) Close() error {
|
||||
f.mu.Lock()
|
||||
r := f.r
|
||||
f.r = nil
|
||||
f.mu.Unlock()
|
||||
|
||||
if r != nil {
|
||||
return r.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type webdavDir struct {
|
||||
w *webdavFS
|
||||
entry fs.Directory
|
||||
}
|
||||
|
||||
func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) {
|
||||
log.Printf("ReadDir(%v)", d.entry.Metadata().Name)
|
||||
entries, err := d.entry.Readdir()
|
||||
entries, err := d.w.cache.Readdir(d.entry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -132,11 +177,9 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, flags int, mode os
|
||||
|
||||
switch f := f.(type) {
|
||||
case fs.Directory:
|
||||
log.Printf("OpenFile(%q) succeeded with directory: %v", path, f.Metadata())
|
||||
return &webdavDir{f}, nil
|
||||
return &webdavDir{w, f}, nil
|
||||
case fs.File:
|
||||
log.Printf("OpenFile(%q) succeeded with file: %v", path, f.Metadata())
|
||||
return &webdavFile{nil, f}, nil
|
||||
return &webdavFile{entry: f}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("can't open %q: not implemented", path)
|
||||
@@ -145,11 +188,9 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, flags int, mode os
|
||||
func (w *webdavFS) Stat(ctx context.Context, path string) (os.FileInfo, error) {
|
||||
e, err := w.findEntry(path)
|
||||
if err != nil {
|
||||
log.Printf("Stat(%q) failed with %v", path, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Printf("Stat(%q) success with %v", path, e.Metadata())
|
||||
return webdavFileInfo{e.Metadata()}, nil
|
||||
}
|
||||
|
||||
@@ -162,15 +203,11 @@ func (w *webdavFS) findEntry(path string) (fs.Entry, error) {
|
||||
return nil, fmt.Errorf("%q not found in %q (not a directory)", p, strings.Join(parts[0:i], "/"))
|
||||
}
|
||||
|
||||
entries, err := d.Readdir()
|
||||
entries, err := w.cache.Readdir(d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
log.Printf("%+v", e.Metadata())
|
||||
}
|
||||
|
||||
e = entries.FindByName(p)
|
||||
if e == nil {
|
||||
return nil, fmt.Errorf("%q not found in %q (not found)", p, strings.Join(parts[0:i], "/"))
|
||||
|
||||
@@ -85,7 +85,7 @@ func (r *objectReader) closeCurrentChunk() {
|
||||
r.currentChunkData = nil
|
||||
}
|
||||
|
||||
func (r *objectReader) findChunkIndexForOffset(offset int64) int {
|
||||
func (r *objectReader) findChunkIndexForOffset(offset int64) (int, error) {
|
||||
left := 0
|
||||
right := len(r.seekTable) - 1
|
||||
for left <= right {
|
||||
@@ -101,10 +101,10 @@ func (r *objectReader) findChunkIndexForOffset(offset int64) int {
|
||||
continue
|
||||
}
|
||||
|
||||
return middle
|
||||
return middle, nil
|
||||
}
|
||||
|
||||
panic("Unreachable code")
|
||||
return 0, fmt.Errorf("can't find chunk for offset %v", offset)
|
||||
}
|
||||
|
||||
func (r *objectReader) Seek(offset int64, whence int) (int64, error) {
|
||||
@@ -124,7 +124,10 @@ func (r *objectReader) Seek(offset int64, whence int) (int64, error) {
|
||||
offset = r.totalLength
|
||||
}
|
||||
|
||||
index := r.findChunkIndexForOffset(offset)
|
||||
index, err := r.findChunkIndexForOffset(offset)
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("invalid seek %v %v: %v", offset, whence, err)
|
||||
}
|
||||
|
||||
chunkStartOffset := r.seekTable[index].Start
|
||||
|
||||
|
||||
Reference in New Issue
Block a user