From d613d99f7e8cc0b8b2361e156cd86714f4e48bcd Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 27 Aug 2017 15:02:56 -0700 Subject: [PATCH] beginnings of WebDAV mounting support, not really working yet, need to do have better caching --- cli/command_mount.go | 47 ++---- cli/command_mount_fuse.go | 41 +++++ cli/command_mount_nofuse.go | 18 +++ cli/command_mount_webdav.go | 51 ++++++ {fuse => internal/fscache}/cache.go | 12 +- {fuse => internal/fscache}/cache_test.go | 24 +-- {fuse => internal/fusemount}/fusefs.go | 17 +- internal/webdavmount/webdavmount.go | 198 +++++++++++++++++++++++ 8 files changed, 347 insertions(+), 61 deletions(-) create mode 100644 cli/command_mount_fuse.go create mode 100644 cli/command_mount_nofuse.go create mode 100644 cli/command_mount_webdav.go rename {fuse => internal/fscache}/cache.go (89%) rename {fuse => internal/fscache}/cache_test.go (92%) rename {fuse => internal/fusemount}/fusefs.go (83%) create mode 100644 internal/webdavmount/webdavmount.go diff --git a/cli/command_mount.go b/cli/command_mount.go index 3cbc9fc33..a177265f7 100644 --- a/cli/command_mount.go +++ b/cli/command_mount.go @@ -1,15 +1,13 @@ -// +build !windows - package cli import ( - "bazil.org/fuse" + "fmt" + "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/loggingfs" "github.com/kopia/kopia/fs/repofs" - fusefs "bazil.org/fuse/fs" - kopiafuse "github.com/kopia/kopia/fuse" + "github.com/kopia/kopia/internal/fscache" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -24,29 +22,8 @@ mountMaxCachedDirectories = mountCommand.Flag("max-cached-dirs", "Limit the number of cached directories").Default("100").Int() ) -type root struct { - fusefs.Node -} - -func (r *root) Root() (fusefs.Node, error) { - return r.Node, nil -} - func runMountCommand(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) - - fuseConnection, err := fuse.Mount( - *mountPoint, - fuse.ReadOnly(), - fuse.FSName("kopia"), - fuse.Subtype("kopia"), - fuse.VolumeName("Kopia"), - ) - - if err != nil { - return err - } - var entry fs.Directory if *mountObjectID == "all" { @@ -63,15 +40,19 @@ func runMountCommand(context *kingpin.ParseContext) error { entry = loggingfs.Wrap(entry).(fs.Directory) } - cache := kopiafuse.NewCache( - kopiafuse.MaxCachedDirectories(*mountMaxCachedDirectories), - kopiafuse.MaxCachedDirectoryEntries(*mountMaxCachedEntries), + cache := fscache.NewCache( + fscache.MaxCachedDirectories(*mountMaxCachedDirectories), + fscache.MaxCachedDirectoryEntries(*mountMaxCachedEntries), ) - rootNode := kopiafuse.NewDirectoryNode(entry, cache) - fusefs.Serve(fuseConnection, &root{rootNode}) - - return nil + switch *mountMode { + case "FUSE": + return mountDirectoryFUSE(entry, *mountPoint, cache) + case "WEBDAV": + return mountDirectoryWebDAV(entry, *mountPoint, cache) + default: + return fmt.Errorf("unsupported mode: %q", *mountMode) + } } func init() { diff --git a/cli/command_mount_fuse.go b/cli/command_mount_fuse.go new file mode 100644 index 000000000..8d272858d --- /dev/null +++ b/cli/command_mount_fuse.go @@ -0,0 +1,41 @@ +// +build !windows + +package cli + +import ( + "bazil.org/fuse" + fusefs "bazil.org/fuse/fs" + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/fscache" + "github.com/kopia/kopia/internal/fusemount" +) + +type root struct { + fusefs.Node +} + +func (r *root) Root() (fusefs.Node, error) { + return r.Node, nil +} + +var ( + mountMode = mountCommand.Flag("mode", "Mount mode").Default("FUSE").Enum("WEBDAV", "FUSE") +) + +func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { + rootNode := fusemount.NewDirectoryNode(entry, cache) + + fuseConnection, err := fuse.Mount( + mountPoint, + fuse.ReadOnly(), + fuse.FSName("kopia"), + fuse.Subtype("kopia"), + fuse.VolumeName("Kopia"), + ) + + if err != nil { + return err + } + + return fusefs.Serve(fuseConnection, &root{rootNode}) +} diff --git a/cli/command_mount_nofuse.go b/cli/command_mount_nofuse.go new file mode 100644 index 000000000..ae2d913e2 --- /dev/null +++ b/cli/command_mount_nofuse.go @@ -0,0 +1,18 @@ +// +build windows + +package cli + +import ( + "fmt" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/fscache" +) + +var ( + mountMode = mountCommand.Flag("mode", "Mount mode").Default(defaultMountMode()).Enum("WEBDAV") +) + +func mountDirectoryFUSE(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { + return fmt.Errorf("FUSE is not supported") +} diff --git a/cli/command_mount_webdav.go b/cli/command_mount_webdav.go new file mode 100644 index 000000000..d68cc8f16 --- /dev/null +++ b/cli/command_mount_webdav.go @@ -0,0 +1,51 @@ +package cli + +import ( + "context" + "fmt" + "log" + "net/http" + + "golang.org/x/net/webdav" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/fscache" + "github.com/kopia/kopia/internal/webdavmount" +) + +func mountDirectoryWebDAV(entry fs.Directory, mountPoint string, cache *fscache.Cache) error { + mux := http.NewServeMux() + mux.Handle("/", &webdav.Handler{ + FileSystem: webdavmount.WebDAVFS(entry, cache), + LockSystem: webdav.NewMemLS(), + Logger: func(r *http.Request, err error) { + if err != nil { + log.Printf("%v %v err: %v", r.Method, r.URL.RequestURI(), err) + } else { + log.Printf("%v %v OK", r.Method, r.URL.RequestURI()) + } + }, + }) + + s := http.Server{ + Addr: "127.0.0.1:9998", + Handler: mux, + } + + onCtrlC(func() { + s.Shutdown(context.Background()) + }) + + fmt.Printf("Server listening at http://%v/ Press Ctrl-C to shut down.\n", s.Addr) + + err := s.ListenAndServe() + if err == http.ErrServerClosed { + fmt.Println("Server shut down.") + + } + if err != http.ErrServerClosed { + return err + } + + return nil +} diff --git a/fuse/cache.go b/internal/fscache/cache.go similarity index 89% rename from fuse/cache.go rename to internal/fscache/cache.go index 3e93c1f42..d3b4edc24 100644 --- a/fuse/cache.go +++ b/internal/fscache/cache.go @@ -1,9 +1,5 @@ -// +build !windows - -// Package fuse implements FUSE filesystem nodes for mounting contents of filesystem stored in repository. -// -// The FUSE implementation used is from bazil.org/fuse -package fuse +// Package fscache implements in-memory cache of filesystem entries. +package fscache import ( "sync" @@ -35,7 +31,7 @@ type Cache struct { tail *cacheEntry } -func (c *Cache) allocateID() int64 { +func (c *Cache) AllocateID() int64 { if c == nil { return 0 } @@ -80,7 +76,7 @@ func (c *Cache) remove(e *cacheEntry) { } } -func (c *Cache) getEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) { +func (c *Cache) GetEntries(id int64, cb func() (fs.Entries, error)) (fs.Entries, error) { if c == nil { return cb() } diff --git a/fuse/cache_test.go b/internal/fscache/cache_test.go similarity index 92% rename from fuse/cache_test.go rename to internal/fscache/cache_test.go index 3325df3d8..8a34bb909 100644 --- a/fuse/cache_test.go +++ b/internal/fscache/cache_test.go @@ -1,4 +1,4 @@ -package fuse +package fscache import ( "errors" @@ -144,57 +144,57 @@ func TestCache(t *testing.T) { cv.verifyCacheOrdering(t) // fetch id1 - c.getEntries(id1, cs.get(id1)) + c.GetEntries(id1, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id1 again - cache hit, no change - c.getEntries(id1, cs.get(id1)) + c.GetEntries(id1, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1) // fetch id2 - c.getEntries(id2, cs.get(id2)) + c.GetEntries(id2, cs.get(id2)) cv.verifyCacheMiss(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id1 again - cache hit, id1 moved to the top of the LRU list - c.getEntries(id1, cs.get(id1)) + c.GetEntries(id1, cs.get(id1)) cv.verifyCacheHit(t, id1) cv.verifyCacheOrdering(t, id1, id2) // fetch id2 again - c.getEntries(id2, cs.get(id2)) + c.GetEntries(id2, cs.get(id2)) cv.verifyCacheHit(t, id2) cv.verifyCacheOrdering(t, id2, id1) // fetch id3 - c.getEntries(id3, cs.get(id3)) + c.GetEntries(id3, cs.get(id3)) cv.verifyCacheMiss(t, id3) cv.verifyCacheOrdering(t, id3, id2, id1) // fetch id4 - c.getEntries(id4, cs.get(id4)) + c.GetEntries(id4, cs.get(id4)) cv.verifyCacheMiss(t, id4) cv.verifyCacheOrdering(t, id4, id3) // fetch id1 again - c.getEntries(id1, cs.get(id1)) + c.GetEntries(id1, cs.get(id1)) cv.verifyCacheMiss(t, id1) cv.verifyCacheOrdering(t, id1, id4) // fetch id5, it's a big one that expels all but one - c.getEntries(id5, cs.get(id5)) + c.GetEntries(id5, cs.get(id5)) cv.verifyCacheMiss(t, id5) cv.verifyCacheOrdering(t, id5, id1) // fetch id6 - c.getEntries(id6, cs.get(id6)) + c.GetEntries(id6, cs.get(id6)) cv.verifyCacheMiss(t, id6) cv.verifyCacheOrdering(t, id6) // fetch id7 - c.getEntries(id7, cs.get(id7)) + c.GetEntries(id7, cs.get(id7)) cv.verifyCacheMiss(t, id7) cv.verifyCacheOrdering(t, id6) } diff --git a/fuse/fusefs.go b/internal/fusemount/fusefs.go similarity index 83% rename from fuse/fusefs.go rename to internal/fusemount/fusefs.go index 09babd124..9ffdba179 100644 --- a/fuse/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -1,9 +1,9 @@ // +build !windows -// Package fuse implements FUSE filesystem nodes for mounting contents of filesystem stored in repository. +// Package fusemount implements FUSE filesystem nodes for mounting contents of filesystem stored in repository. // // The FUSE implementation used is from bazil.org/fuse -package fuse +package fusemount import ( "fmt" @@ -12,6 +12,7 @@ "sort" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/fscache" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" @@ -21,7 +22,7 @@ type fuseNode struct { entry fs.Entry - cache *Cache + cache *fscache.Cache } func (n *fuseNode) Attr(ctx context.Context, a *fuse.Attr) error { @@ -76,7 +77,7 @@ func (dir *fuseDirectoryNode) Lookup(ctx context.Context, fileName string) (fuse } func (dir *fuseDirectoryNode) readPossiblyCachedReaddir() (fs.Entries, error) { - return dir.cache.getEntries(dir.cacheID, func() (fs.Entries, error) { + return dir.cache.GetEntries(dir.cacheID, func() (fs.Entries, error) { entries, err := dir.directory().Readdir() if err != nil { return nil, err @@ -127,7 +128,7 @@ func (sl *fuseSymlinkNode) Readlink(ctx context.Context, req *fuse.ReadlinkReque return sl.entry.(fs.Symlink).Readlink() } -func newFuseNode(e fs.Entry, cache *Cache) (fusefs.Node, error) { +func newFuseNode(e fs.Entry, cache *fscache.Cache) (fusefs.Node, error) { switch e := e.(type) { case fs.Directory: return newDirectoryNode(e, cache), nil @@ -140,11 +141,11 @@ func newFuseNode(e fs.Entry, cache *Cache) (fusefs.Node, error) { } } -func newDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node { - return &fuseDirectoryNode{fuseNode{dir, cache}, cache.allocateID()} +func newDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { + return &fuseDirectoryNode{fuseNode{dir, cache}, cache.AllocateID()} } // NewDirectoryNode returns FUSE Node for a given fs.Directory -func NewDirectoryNode(dir fs.Directory, cache *Cache) fusefs.Node { +func NewDirectoryNode(dir fs.Directory, cache *fscache.Cache) fusefs.Node { return newDirectoryNode(dir, cache) } diff --git a/internal/webdavmount/webdavmount.go b/internal/webdavmount/webdavmount.go new file mode 100644 index 000000000..fc41102d3 --- /dev/null +++ b/internal/webdavmount/webdavmount.go @@ -0,0 +1,198 @@ +package webdavmount + +import ( + "errors" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/kopia/kopia/repo" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/fscache" + "golang.org/x/net/context" + "golang.org/x/net/webdav" +) + +var _ os.FileInfo = webdavFileInfo{} +var _ webdav.File = (*webdavFile)(nil) +var _ webdav.File = (*webdavDir)(nil) + +type webdavFile struct { + repo.ObjectReader + entry fs.File +} + +func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) { + return nil, errors.New("not a directory") +} + +func (f *webdavFile) Stat() (os.FileInfo, error) { + return webdavFileInfo{f.entry.Metadata()}, nil +} + +func (f *webdavFile) Write(b []byte) (int, error) { + return 0, errors.New("read-only filesystem") +} + +type webdavDir struct { + entry fs.Directory +} + +func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) { + log.Printf("ReadDir(%v)", d.entry.Metadata().Name) + entries, err := d.entry.Readdir() + if err != nil { + return nil, err + } + if n > 0 && n < len(entries) { + entries = entries[0:n] + } + + var fis []os.FileInfo + for _, e := range entries { + fis = append(fis, &webdavFileInfo{e.Metadata()}) + } + return fis, nil +} + +func (d *webdavDir) Stat() (os.FileInfo, error) { + return webdavFileInfo{d.entry.Metadata()}, nil +} + +func (d *webdavDir) Write(b []byte) (int, error) { + return 0, errors.New("read-only filesystem") +} + +func (d *webdavDir) Close() error { + return nil +} + +func (d *webdavDir) Read(b []byte) (int, error) { + return 0, errors.New("not supported") +} + +func (d *webdavDir) Seek(int64, int) (int64, error) { + return 0, errors.New("not supported") +} + +type webdavFileInfo struct { + md *fs.EntryMetadata +} + +func (i webdavFileInfo) IsDir() bool { + return (i.md.FileMode() & os.ModeDir) != 0 +} + +func (i webdavFileInfo) ModTime() time.Time { + return i.md.ModTime +} + +func (i webdavFileInfo) Mode() os.FileMode { + return i.md.FileMode() +} + +func (i webdavFileInfo) Name() string { + return i.md.Name +} + +func (i webdavFileInfo) Size() int64 { + return i.md.FileSize +} + +func (i webdavFileInfo) Sys() interface{} { + return nil +} + +type webdavFS struct { + dir fs.Directory + cache *fscache.Cache +} + +func (w *webdavFS) Mkdir(ctx context.Context, path string, mode os.FileMode) error { + return fmt.Errorf("can't create %q: read-only filesystem", path) +} + +func (w *webdavFS) RemoveAll(ctx context.Context, path string) error { + return fmt.Errorf("can't remove %q: read-only filesystem", path) +} + +func (w *webdavFS) Rename(ctx context.Context, oldPath, newPath string) error { + return fmt.Errorf("can't rename %q to %q: read-only filesystem", oldPath, newPath) +} + +func (w *webdavFS) OpenFile(ctx context.Context, path string, flags int, mode os.FileMode) (webdav.File, error) { + f, err := w.findEntry(path) + if err != nil { + log.Printf("OpenFile(%q) failed with %v", path, err) + return nil, err + } + + switch f := f.(type) { + case fs.Directory: + log.Printf("OpenFile(%q) succeeded with directory: %v", path, f.Metadata()) + return &webdavDir{f}, nil + case fs.File: + log.Printf("OpenFile(%q) succeeded with file: %v", path, f.Metadata()) + return &webdavFile{nil, f}, nil + } + + return nil, fmt.Errorf("can't open %q: not implemented", path) +} + +func (w *webdavFS) Stat(ctx context.Context, path string) (os.FileInfo, error) { + e, err := w.findEntry(path) + if err != nil { + log.Printf("Stat(%q) failed with %v", path, err) + return nil, err + } + + log.Printf("Stat(%q) success with %v", path, e.Metadata()) + return webdavFileInfo{e.Metadata()}, nil +} + +func (w *webdavFS) findEntry(path string) (fs.Entry, error) { + parts := removeEmpty(strings.Split(path, "/")) + var e fs.Entry = w.dir + for i, p := range parts { + d, ok := e.(fs.Directory) + if !ok { + return nil, fmt.Errorf("%q not found in %q (not a directory)", p, strings.Join(parts[0:i], "/")) + } + + entries, err := d.Readdir() + if err != nil { + return nil, err + } + + for _, e := range entries { + log.Printf("%+v", e.Metadata()) + } + + e = entries.FindByName(p) + if e == nil { + return nil, fmt.Errorf("%q not found in %q (not found)", p, strings.Join(parts[0:i], "/")) + } + } + + return e, nil +} + +func removeEmpty(s []string) []string { + result := s[:0] + for _, e := range s { + if e == "" { + continue + } + result = append(result, e) + } + + return result +} + +// WebDAVFS returns a webdav.FileSystem implementation for a given directory. +func WebDAVFS(entry fs.Directory, cache *fscache.Cache) webdav.FileSystem { + return &webdavFS{entry, cache} +}