From e8847b65ccfbb9bb9f68e19349ad793b00a267e8 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 2 Sep 2017 08:29:59 -0700 Subject: [PATCH] mounting webdav works, changed how FS directory caching is implemented to be based solely on ObjectIDs --- cli/command_mount_webdav.go | 8 +++- internal/fscache/cache.go | 13 ++++++ internal/fusemount/fusefs.go | 33 +------------- internal/webdavmount/webdavmount.go | 69 ++++++++++++++++++++++------- repo/object_reader.go | 11 +++-- 5 files changed, 81 insertions(+), 53 deletions(-) diff --git a/cli/command_mount_webdav.go b/cli/command_mount_webdav.go index d68cc8f16..68222ce24 100644 --- a/cli/command_mount_webdav.go +++ b/cli/command_mount_webdav.go @@ -19,10 +19,14 @@ func mountDirectoryWebDAV(entry fs.Directory, mountPoint string, cache *fscache. FileSystem: webdavmount.WebDAVFS(entry, cache), LockSystem: webdav.NewMemLS(), Logger: func(r *http.Request, err error) { + var maybeRange string + if r := r.Header.Get("Range"); r != "" { + maybeRange = " " + r + } if err != nil { - log.Printf("%v %v err: %v", r.Method, r.URL.RequestURI(), err) + log.Printf("%v %v%v err: %v", r.Method, r.URL.RequestURI(), maybeRange, err) } else { - log.Printf("%v %v OK", r.Method, r.URL.RequestURI()) + log.Printf("%v %v%v OK", r.Method, r.URL.RequestURI(), maybeRange) } }, }) diff --git a/internal/fscache/cache.go b/internal/fscache/cache.go index 979fed974..8ed93604c 100644 --- a/internal/fscache/cache.go +++ b/internal/fscache/cache.go @@ -7,6 +7,7 @@ "time" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/repo" ) type cacheEntry struct { @@ -74,6 +75,18 @@ func (c *Cache) remove(e *cacheEntry) { // Loader provides data to be stored in the cache. type Loader func() (fs.Entries, error) +// Readdir reads the contents of a provided directory using ObjectID of a directory (if any) to cache +// the results. +func (c *Cache) Readdir(d fs.Directory) (fs.Entries, error) { + if h, ok := d.(repo.HasObjectID); ok { + cacheID := h.ObjectID().String() + cacheExpiration := 24 * time.Hour + return c.GetEntries(cacheID, cacheExpiration, d.Readdir) + } + + return d.Readdir() +} + // GetEntries consults the cache and either retrieves the contents of directory listing from the cache // or invokes the provides callback and adds the results to cache. func (c *Cache) GetEntries(id string, expirationTime time.Duration, cb Loader) (fs.Entries, error) { diff --git a/internal/fusemount/fusefs.go b/internal/fusemount/fusefs.go index 79cae557d..6ec856e78 100644 --- a/internal/fusemount/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -9,11 +9,6 @@ "fmt" "io/ioutil" "os" - "sort" - "time" - - "github.com/google/uuid" - "github.com/kopia/kopia/repo" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/internal/fscache" @@ -55,8 +50,6 @@ func (f *fuseFileNode) ReadAll(ctx context.Context) ([]byte, error) { type fuseDirectoryNode struct { fuseNode - cacheID string - cacheExpiration time.Duration } func (dir *fuseDirectoryNode) directory() fs.Directory { @@ -82,18 +75,7 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse } func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) { - return dir.cache.GetEntries(dir.cacheID, dir.cacheExpiration, func() (fs.Entries, error) { - entries, err := dir.directory().Readdir() - if err != nil { - return nil, err - } - - sort.Slice(entries, func(i, j int) bool { - return entries[i].Metadata().Name < entries[j].Metadata().Name - }) - - return entries, nil - }) + return dir.cache.Readdir(dir.directory()) } func (dir *fuseDirectoryNode) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { @@ -147,18 +129,7 @@ func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) { } func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { - var cacheID string - var cacheExpiration time.Duration - - if h, ok := dir.(repo.HasObjectID); ok { - cacheID = h.ObjectID().String() - cacheExpiration = 24 * time.Hour - } else { - cacheID = uuid.New().String() - cacheExpiration = 1 * time.Second - } - - return &fuseDirectoryNode{fuseNode{dir, cache}, cacheID, cacheExpiration} + return &fuseDirectoryNode{fuseNode{dir, cache}} } // NewDirectoryNode returns FUSE Node for a given fs.Directory diff --git a/internal/webdavmount/webdavmount.go b/internal/webdavmount/webdavmount.go index fc41102d3..53a65a800 100644 --- a/internal/webdavmount/webdavmount.go +++ b/internal/webdavmount/webdavmount.go @@ -6,10 +6,9 @@ "log" "os" "strings" + "sync" "time" - "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/fs" "github.com/kopia/kopia/internal/fscache" "golang.org/x/net/context" @@ -21,8 +20,10 @@ var _ webdav.File = (*webdavDir)(nil) type webdavFile struct { - repo.ObjectReader entry fs.File + + mu sync.Mutex + r fs.Reader } func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) { @@ -33,17 +34,61 @@ func (f *webdavFile) Stat() (os.FileInfo, error) { return webdavFileInfo{f.entry.Metadata()}, nil } +func (f *webdavFile) getReader() (fs.Reader, error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.r == nil { + r, err := f.entry.Open() + if err != nil { + return nil, err + } + f.r = r + } + + return f.r, nil +} + +func (f *webdavFile) Read(b []byte) (int, error) { + r, err := f.getReader() + if err != nil { + return 0, err + } + return r.Read(b) +} + +func (f *webdavFile) Seek(offset int64, whence int) (int64, error) { + r, err := f.getReader() + if err != nil { + return 0, err + } + + return r.Seek(offset, whence) +} + func (f *webdavFile) Write(b []byte) (int, error) { return 0, errors.New("read-only filesystem") } +func (f *webdavFile) Close() error { + f.mu.Lock() + r := f.r + f.r = nil + f.mu.Unlock() + + if r != nil { + return r.Close() + } + + return nil +} + type webdavDir struct { + w *webdavFS entry fs.Directory } func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) { - log.Printf("ReadDir(%v)", d.entry.Metadata().Name) - entries, err := d.entry.Readdir() + entries, err := d.w.cache.Readdir(d.entry) if err != nil { return nil, err } @@ -132,11 +177,9 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, flags int, mode os switch f := f.(type) { case fs.Directory: - log.Printf("OpenFile(%q) succeeded with directory: %v", path, f.Metadata()) - return &webdavDir{f}, nil + return &webdavDir{w, f}, nil case fs.File: - log.Printf("OpenFile(%q) succeeded with file: %v", path, f.Metadata()) - return &webdavFile{nil, f}, nil + return &webdavFile{entry: f}, nil } return nil, fmt.Errorf("can't open %q: not implemented", path) @@ -145,11 +188,9 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, flags int, mode os func (w *webdavFS) Stat(ctx context.Context, path string) (os.FileInfo, error) { e, err := w.findEntry(path) if err != nil { - log.Printf("Stat(%q) failed with %v", path, err) return nil, err } - log.Printf("Stat(%q) success with %v", path, e.Metadata()) return webdavFileInfo{e.Metadata()}, nil } @@ -162,15 +203,11 @@ func (w *webdavFS) findEntry(path string) (fs.Entry, error) { return nil, fmt.Errorf("%q not found in %q (not a directory)", p, strings.Join(parts[0:i], "/")) } - entries, err := d.Readdir() + entries, err := w.cache.Readdir(d) if err != nil { return nil, err } - for _, e := range entries { - log.Printf("%+v", e.Metadata()) - } - e = entries.FindByName(p) if e == nil { return nil, fmt.Errorf("%q not found in %q (not found)", p, strings.Join(parts[0:i], "/")) diff --git a/repo/object_reader.go b/repo/object_reader.go index dabd494a5..7ca3fa02f 100644 --- a/repo/object_reader.go +++ b/repo/object_reader.go @@ -85,7 +85,7 @@ func (r *objectReader) closeCurrentChunk() { r.currentChunkData = nil } -func (r *objectReader) findChunkIndexForOffset(offset int64) int { +func (r *objectReader) findChunkIndexForOffset(offset int64) (int, error) { left := 0 right := len(r.seekTable) - 1 for left <= right { @@ -101,10 +101,10 @@ func (r *objectReader) findChunkIndexForOffset(offset int64) int { continue } - return middle + return middle, nil } - panic("Unreachable code") + return 0, fmt.Errorf("can't find chunk for offset %v", offset) } func (r *objectReader) Seek(offset int64, whence int) (int64, error) { @@ -124,7 +124,10 @@ func (r *objectReader) Seek(offset int64, whence int) (int64, error) { offset = r.totalLength } - index := r.findChunkIndexForOffset(offset) + index, err := r.findChunkIndexForOffset(offset) + if err != nil { + return -1, fmt.Errorf("invalid seek %v %v: %v", offset, whence, err) + } chunkStartOffset := r.seekTable[index].Start