mirror of
https://github.com/kopia/kopia.git
synced 2026-03-27 10:32:08 -04:00
refactor(repository): added fs.DirectoryIterator (#3365)
* refactor(repository): added fs.DirectoryIterator

  This significantly reduces the number of small allocations while taking snapshots of lots of files, which leads to faster snapshots.

  ```
  $ runbench --kopia-exe ~/go/bin/kopia \
      --compare-to-exe ~/go/bin/kopia-baseline --min-duration 30s \
      ./snapshot-linux-parallel-4.sh
  DIFF duration: current:5.1 baseline:5.8 change:-13.0 %
  DIFF repo_size: current:1081614127.6 baseline:1081615302.8 change:-0.0 %
  DIFF num_files: current:60.0 baseline:60.0 change:0%
  DIFF avg_heap_objects: current:4802666.0 baseline:4905741.8 change:-2.1 %
  DIFF avg_heap_bytes: current:737397275.2 baseline:715263289.6 change:+3.1 %
  DIFF avg_ram: current:215.0 baseline:211.5 change:+1.6 %
  DIFF max_ram: current:294.8 baseline:311.4 change:-5.3 %
  DIFF avg_cpu: current:167.3 baseline:145.3 change:+15.1 %
  DIFF max_cpu: current:227.2 baseline:251.0 change:-9.5 %
  ```

* changed `Next()` API
* mechanical move of the iterator to its own file
* clarified comment
* pr feedback
* mechanical move of all localfs dependencies on os.FileInfo to a separate file
* Update fs/entry.go

  Co-authored-by: ashmrtn <3891298+ashmrtn@users.noreply.github.com>
* Update fs/entry_dir_iterator.go

  Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com>
* doc: clarified valid results from Next()

---------

Co-authored-by: ashmrtn <3891298+ashmrtn@users.noreply.github.com>
Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com>
This commit is contained in:
@@ -54,9 +54,22 @@ func (c *commandList) run(ctx context.Context, rep repo.Repository) error {
|
||||
}
|
||||
|
||||
func (c *commandList) listDirectory(ctx context.Context, d fs.Directory, prefix, indent string) error {
|
||||
if err := d.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error {
|
||||
return c.printDirectoryEntry(innerCtx, e, prefix, indent)
|
||||
}); err != nil {
|
||||
iter, err := d.Iterate(ctx)
|
||||
if err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
e, err := iter.Next(ctx)
|
||||
for e != nil {
|
||||
if err2 := c.printDirectoryEntry(ctx, e, prefix, indent); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
e, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ func(innerCtx context.Context) ([]fs.Entry, error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return d.IterateEntries(ctx, callback) //nolint:wrapcheck
|
||||
return fs.IterateEntries(ctx, d, callback) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (c *Cache) getEntriesFromCacheLocked(ctx context.Context, id string) []fs.Entry {
|
||||
|
||||
90
fs/entry.go
90
fs/entry.go
@@ -59,13 +59,56 @@ type StreamingFile interface {
|
||||
// Directory represents contents of a directory.
|
||||
type Directory interface {
|
||||
Entry
|
||||
|
||||
Child(ctx context.Context, name string) (Entry, error)
|
||||
IterateEntries(ctx context.Context, cb func(context.Context, Entry) error) error
|
||||
Iterate(ctx context.Context) (DirectoryIterator, error)
|
||||
// SupportsMultipleIterations returns true if the Directory supports iterating
|
||||
// through the entries multiple times. Otherwise it returns false.
|
||||
SupportsMultipleIterations() bool
|
||||
}
|
||||
|
||||
// IterateEntries iterates entries the provided directory and invokes given callback for each entry
|
||||
// or until the callback returns an error.
|
||||
func IterateEntries(ctx context.Context, dir Directory, cb func(context.Context, Entry) error) error {
|
||||
iter, err := dir.Iterate(ctx)
|
||||
if err != nil {
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
cur, err := iter.Next(ctx)
|
||||
|
||||
for cur != nil {
|
||||
if err2 := cb(ctx, cur); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
cur, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
// DirectoryIterator iterates entries in a directory.
|
||||
//
|
||||
// The client is expected to call Next() in a loop until it returns a nil entry to signal
|
||||
// end of iteration or until an error has occurred.
|
||||
//
|
||||
// Valid results:
|
||||
//
|
||||
// (nil,nil) - end of iteration, success
|
||||
// (entry,nil) - iteration in progress, success
|
||||
// (nil,err) - iteration stopped, failure
|
||||
//
|
||||
// The behavior of calling Next() after iteration has signaled its end is undefined.
|
||||
//
|
||||
// To release any resources associated with iteration the client must call Close().
|
||||
type DirectoryIterator interface {
|
||||
Next(ctx context.Context) (Entry, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
// DirectoryWithSummary is optionally implemented by Directory that provide summary.
|
||||
type DirectoryWithSummary interface {
|
||||
Summary(ctx context.Context) (*DirectorySummary, error)
|
||||
@@ -78,14 +121,22 @@ type ErrorEntry interface {
|
||||
ErrorInfo() error
|
||||
}
|
||||
|
||||
// GetAllEntries uses IterateEntries to return all entries in a Directory.
|
||||
// GetAllEntries uses Iterate to return all entries in a Directory.
|
||||
func GetAllEntries(ctx context.Context, d Directory) ([]Entry, error) {
|
||||
entries := []Entry{}
|
||||
|
||||
err := d.IterateEntries(ctx, func(ctx context.Context, e Entry) error {
|
||||
entries = append(entries, e)
|
||||
return nil
|
||||
})
|
||||
iter, err := d.Iterate(ctx)
|
||||
if err != nil {
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
cur, err := iter.Next(ctx)
|
||||
for cur != nil {
|
||||
entries = append(entries, cur)
|
||||
cur, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
return entries, err //nolint:wrapcheck
|
||||
}
|
||||
@@ -96,30 +147,27 @@ func GetAllEntries(ctx context.Context, d Directory) ([]Entry, error) {
|
||||
// IterateEntriesAndFindChild iterates through entries from a directory and returns one by name.
|
||||
// This is a convenience function that may be helpful in implementations of Directory.Child().
|
||||
func IterateEntriesAndFindChild(ctx context.Context, d Directory, name string) (Entry, error) {
|
||||
type errStop struct {
|
||||
error
|
||||
iter, err := d.Iterate(ctx)
|
||||
if err != nil {
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
var result Entry
|
||||
defer iter.Close()
|
||||
|
||||
err := d.IterateEntries(ctx, func(c context.Context, e Entry) error {
|
||||
if result == nil && e.Name() == name {
|
||||
result = e
|
||||
return errStop{errors.New("")}
|
||||
cur, err := iter.Next(ctx)
|
||||
for cur != nil {
|
||||
if cur.Name() == name {
|
||||
return cur, nil
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
var stopped errStop
|
||||
if err != nil && !errors.As(err, &stopped) {
|
||||
return nil, errors.Wrap(err, "error reading directory")
|
||||
cur, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
return nil, ErrEntryNotFound
|
||||
if err != nil {
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return nil, ErrEntryNotFound
|
||||
}
|
||||
|
||||
// MaxFailedEntriesPerDirectorySummary is the maximum number of failed entries per directory summary.
|
||||
|
||||
30
fs/entry_dir_iterator.go
Normal file
30
fs/entry_dir_iterator.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package fs
|
||||
|
||||
import "context"
|
||||
|
||||
type staticIterator struct {
|
||||
cur int
|
||||
entries []Entry
|
||||
err error
|
||||
}
|
||||
|
||||
func (it *staticIterator) Close() {
|
||||
}
|
||||
|
||||
func (it *staticIterator) Next(ctx context.Context) (Entry, error) {
|
||||
if it.cur < len(it.entries) {
|
||||
v := it.entries[it.cur]
|
||||
it.cur++
|
||||
|
||||
return v, it.err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// StaticIterator returns a DirectoryIterator which returns the provided
|
||||
// entries in order followed by a given final error.
|
||||
// It is not safe to concurrently access directory iterator.
|
||||
func StaticIterator(entries []Entry, err error) DirectoryIterator {
|
||||
return &staticIterator{0, entries, err}
|
||||
}
|
||||
@@ -5,6 +5,7 @@
|
||||
"bufio"
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -147,28 +148,81 @@ func (d *ignoreDirectory) DirEntryOrNil(ctx context.Context) (*snapshot.DirEntry
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d *ignoreDirectory) IterateEntries(ctx context.Context, callback func(ctx context.Context, entry fs.Entry) error) error {
|
||||
type ignoreDirIterator struct {
|
||||
//nolint:containedctx
|
||||
ctx context.Context
|
||||
d *ignoreDirectory
|
||||
inner fs.DirectoryIterator
|
||||
thisContext *ignoreContext
|
||||
}
|
||||
|
||||
func (i *ignoreDirIterator) Next(ctx context.Context) (fs.Entry, error) {
|
||||
cur, err := i.inner.Next(ctx)
|
||||
|
||||
for cur != nil {
|
||||
//nolint:contextcheck
|
||||
if wrapped, ok := i.d.maybeWrappedChildEntry(i.ctx, i.thisContext, cur); ok {
|
||||
return wrapped, nil
|
||||
}
|
||||
|
||||
cur, err = i.inner.Next(ctx)
|
||||
}
|
||||
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
func (i *ignoreDirIterator) Close() {
|
||||
i.inner.Close()
|
||||
|
||||
*i = ignoreDirIterator{}
|
||||
ignoreDirIteratorPool.Put(i)
|
||||
}
|
||||
|
||||
func (d *ignoreDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
if d.skipCacheDirectory(ctx, d.relativePath, d.policyTree) {
|
||||
return nil
|
||||
return fs.StaticIterator(nil, nil), nil
|
||||
}
|
||||
|
||||
thisContext, err := d.buildContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return d.Directory.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
if wrapped, ok := d.maybeWrappedChildEntry(ctx, thisContext, e); ok {
|
||||
return callback(ctx, wrapped)
|
||||
}
|
||||
inner, err := d.Directory.Iterate(ctx)
|
||||
if err != nil {
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
it := ignoreDirIteratorPool.Get().(*ignoreDirIterator) //nolint:forcetypeassert
|
||||
it.ctx = ctx
|
||||
it.d = d
|
||||
it.inner = inner
|
||||
it.thisContext = thisContext
|
||||
|
||||
return it, nil
|
||||
}
|
||||
|
||||
//nolint:gochecknoglobals
|
||||
var ignoreDirectoryPool = sync.Pool{
|
||||
New: func() any { return &ignoreDirectory{} },
|
||||
}
|
||||
|
||||
//nolint:gochecknoglobals
|
||||
var ignoreDirIteratorPool = sync.Pool{
|
||||
New: func() any { return &ignoreDirIterator{} },
|
||||
}
|
||||
|
||||
func (d *ignoreDirectory) Close() {
|
||||
d.Directory.Close()
|
||||
|
||||
*d = ignoreDirectory{}
|
||||
ignoreDirectoryPool.Put(d)
|
||||
}
|
||||
|
||||
func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignoreContext, e fs.Entry) (fs.Entry, bool) {
|
||||
if !ic.shouldIncludeByName(ctx, d.relativePath+"/"+e.Name(), e, d.policyTree) {
|
||||
s := d.relativePath + "/" + e.Name()
|
||||
|
||||
if !ic.shouldIncludeByName(ctx, s, e, d.policyTree) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
@@ -181,7 +235,14 @@ func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignore
|
||||
}
|
||||
|
||||
if dir, ok := e.(fs.Directory); ok {
|
||||
return &ignoreDirectory{d.relativePath + "/" + e.Name(), ic, d.policyTree.Child(e.Name()), dir}, true
|
||||
id := ignoreDirectoryPool.Get().(*ignoreDirectory) //nolint:forcetypeassert
|
||||
|
||||
id.relativePath = s
|
||||
id.parentContext = ic
|
||||
id.policyTree = d.policyTree.Child(e.Name())
|
||||
id.Directory = dir
|
||||
|
||||
return id, true
|
||||
}
|
||||
|
||||
return e, true
|
||||
|
||||
@@ -549,7 +549,7 @@ func walkTree(t *testing.T, dir fs.Directory) []string {
|
||||
walk = func(path string, d fs.Directory) error {
|
||||
output = append(output, path+"/")
|
||||
|
||||
return d.IterateEntries(testlogging.Context(t), func(innerCtx context.Context, e fs.Entry) error {
|
||||
return fs.IterateEntries(testlogging.Context(t), d, func(innerCtx context.Context, e fs.Entry) error {
|
||||
relPath := path + "/" + e.Name()
|
||||
|
||||
if subdir, ok := e.(fs.Directory); ok {
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -14,11 +11,7 @@
|
||||
"github.com/kopia/kopia/fs"
|
||||
)
|
||||
|
||||
const (
|
||||
numEntriesToRead = 100 // number of directory entries to read in one shot
|
||||
dirListingPrefetch = 200 // number of directory items to os.Lstat() in advance
|
||||
paralellelStatGoroutines = 4 // how many goroutines to use when Lstat() on large directory
|
||||
)
|
||||
const numEntriesToRead = 100 // number of directory entries to read in one shot
|
||||
|
||||
type filesystemEntry struct {
|
||||
name string
|
||||
@@ -71,20 +64,6 @@ func (e *filesystemEntry) LocalFilesystemPath() string {
|
||||
return e.fullPath()
|
||||
}
|
||||
|
||||
var _ os.FileInfo = (*filesystemEntry)(nil)
|
||||
|
||||
func newEntry(fi os.FileInfo, prefix string) filesystemEntry {
|
||||
return filesystemEntry{
|
||||
TrimShallowSuffix(fi.Name()),
|
||||
fi.Size(),
|
||||
fi.ModTime().UnixNano(),
|
||||
fi.Mode(),
|
||||
platformSpecificOwnerInfo(fi),
|
||||
platformSpecificDeviceInfo(fi),
|
||||
prefix,
|
||||
}
|
||||
}
|
||||
|
||||
type filesystemDirectory struct {
|
||||
filesystemEntry
|
||||
}
|
||||
@@ -111,167 +90,6 @@ func (fsd *filesystemDirectory) Size() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (fsd *filesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
|
||||
fullPath := fsd.fullPath()
|
||||
|
||||
st, err := os.Lstat(filepath.Join(fullPath, name))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, fs.ErrEntryNotFound
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "unable to get child")
|
||||
}
|
||||
|
||||
return entryFromDirEntry(st, fullPath+string(filepath.Separator)), nil
|
||||
}
|
||||
|
||||
type entryWithError struct {
|
||||
entry fs.Entry
|
||||
err error
|
||||
}
|
||||
|
||||
func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) {
|
||||
fi, err := os.Lstat(prefix + dirEntry.Name())
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "error reading directory")
|
||||
}
|
||||
|
||||
return entryFromDirEntry(fi, prefix), nil
|
||||
}
|
||||
|
||||
func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
fullPath := fsd.fullPath()
|
||||
|
||||
f, direrr := os.Open(fullPath) //nolint:gosec
|
||||
if direrr != nil {
|
||||
return errors.Wrap(direrr, "unable to read directory")
|
||||
}
|
||||
defer f.Close() //nolint:errcheck
|
||||
|
||||
childPrefix := fullPath + string(filepath.Separator)
|
||||
|
||||
batch, err := f.ReadDir(numEntriesToRead)
|
||||
if len(batch) == numEntriesToRead {
|
||||
return fsd.iterateEntriesInParallel(ctx, f, childPrefix, batch, cb)
|
||||
}
|
||||
|
||||
for len(batch) > 0 {
|
||||
for _, de := range batch {
|
||||
e, err2 := toDirEntryOrNil(de, childPrefix)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
if e == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err3 := cb(ctx, e); err3 != nil {
|
||||
return err3
|
||||
}
|
||||
}
|
||||
|
||||
batch, err = f.ReadDir(numEntriesToRead)
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "error listing directory")
|
||||
}
|
||||
|
||||
//nolint:gocognit,gocyclo
|
||||
func (fsd *filesystemDirectory) iterateEntriesInParallel(ctx context.Context, f *os.File, childPrefix string, batch []os.DirEntry, cb func(context.Context, fs.Entry) error) error {
|
||||
inputCh := make(chan os.DirEntry, dirListingPrefetch)
|
||||
outputCh := make(chan entryWithError, dirListingPrefetch)
|
||||
|
||||
closed := make(chan struct{})
|
||||
defer close(closed)
|
||||
|
||||
var workersWG sync.WaitGroup
|
||||
|
||||
// start goroutines that will convert 'os.DirEntry' to 'entryWithError'
|
||||
for i := 0; i < paralellelStatGoroutines; i++ {
|
||||
workersWG.Add(1)
|
||||
|
||||
go func() {
|
||||
defer workersWG.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-closed:
|
||||
return
|
||||
|
||||
case de := <-inputCh:
|
||||
e, err := toDirEntryOrNil(de, childPrefix)
|
||||
outputCh <- entryWithError{entry: e, err: err}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var pending int
|
||||
|
||||
for len(batch) > 0 {
|
||||
for _, de := range batch {
|
||||
// before pushing fetch from outputCh and invoke callbacks for all entries in it
|
||||
invokeCallbacks:
|
||||
for {
|
||||
select {
|
||||
case dwe := <-outputCh:
|
||||
pending--
|
||||
|
||||
if dwe.err != nil {
|
||||
return dwe.err
|
||||
}
|
||||
|
||||
if dwe.entry != nil {
|
||||
if err := cb(ctx, dwe.entry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
break invokeCallbacks
|
||||
}
|
||||
}
|
||||
|
||||
inputCh <- de
|
||||
pending++
|
||||
}
|
||||
|
||||
nextBatch, err := f.ReadDir(numEntriesToRead)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
//nolint:wrapcheck
|
||||
return err
|
||||
}
|
||||
|
||||
batch = nextBatch
|
||||
}
|
||||
|
||||
for i := 0; i < pending; i++ {
|
||||
dwe := <-outputCh
|
||||
|
||||
if dwe.err != nil {
|
||||
return dwe.err
|
||||
}
|
||||
|
||||
if dwe.entry != nil {
|
||||
if err := cb(ctx, dwe.entry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type fileWithMetadata struct {
|
||||
*os.File
|
||||
}
|
||||
@@ -315,23 +133,6 @@ func dirPrefix(s string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// NewEntry returns fs.Entry for the specified path, the result will be one of supported entry types: fs.File, fs.Directory, fs.Symlink
|
||||
// or fs.UnsupportedEntry.
|
||||
func NewEntry(path string) (fs.Entry, error) {
|
||||
path = filepath.Clean(path)
|
||||
|
||||
fi, err := os.Lstat(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to determine entry type")
|
||||
}
|
||||
|
||||
if path == "/" {
|
||||
return entryFromDirEntry(fi, ""), nil
|
||||
}
|
||||
|
||||
return entryFromDirEntry(fi, dirPrefix(path)), nil
|
||||
}
|
||||
|
||||
// Directory returns fs.Directory for the specified path.
|
||||
func Directory(path string) (fs.Directory, error) {
|
||||
e, err := NewEntry(path)
|
||||
@@ -353,31 +154,6 @@ func Directory(path string) (fs.Directory, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func entryFromDirEntry(fi os.FileInfo, prefix string) fs.Entry {
|
||||
isplaceholder := strings.HasSuffix(fi.Name(), ShallowEntrySuffix)
|
||||
maskedmode := fi.Mode() & os.ModeType
|
||||
|
||||
switch {
|
||||
case maskedmode == os.ModeDir && !isplaceholder:
|
||||
return newFilesystemDirectory(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == os.ModeDir && isplaceholder:
|
||||
return newShallowFilesystemDirectory(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == os.ModeSymlink && !isplaceholder:
|
||||
return newFilesystemSymlink(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == 0 && !isplaceholder:
|
||||
return newFilesystemFile(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == 0 && isplaceholder:
|
||||
return newShallowFilesystemFile(newEntry(fi, prefix))
|
||||
|
||||
default:
|
||||
return newFilesystemErrorEntry(newEntry(fi, prefix), fs.ErrUnknown)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
_ fs.Directory = (*filesystemDirectory)(nil)
|
||||
_ fs.File = (*filesystemFile)(nil)
|
||||
|
||||
159
fs/localfs/local_fs_os.go
Normal file
159
fs/localfs/local_fs_os.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package localfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
)
|
||||
|
||||
type filesystemDirectoryIterator struct {
|
||||
dirHandle *os.File
|
||||
childPrefix string
|
||||
|
||||
currentIndex int
|
||||
currentBatch []os.DirEntry
|
||||
}
|
||||
|
||||
func (it *filesystemDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) {
|
||||
for {
|
||||
// we're at the end of the current batch, fetch the next batch
|
||||
if it.currentIndex >= len(it.currentBatch) {
|
||||
batch, err := it.dirHandle.ReadDir(numEntriesToRead)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
// stop iteration
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
it.currentIndex = 0
|
||||
it.currentBatch = batch
|
||||
|
||||
// got empty batch
|
||||
if len(batch) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
n := it.currentIndex
|
||||
it.currentIndex++
|
||||
|
||||
e, err := toDirEntryOrNil(it.currentBatch[n], it.childPrefix)
|
||||
if err != nil {
|
||||
// stop iteration
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e == nil {
|
||||
// go to the next item
|
||||
continue
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (it *filesystemDirectoryIterator) Close() {
|
||||
it.dirHandle.Close() //nolint:errcheck
|
||||
}
|
||||
|
||||
func (fsd *filesystemDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
fullPath := fsd.fullPath()
|
||||
|
||||
f, direrr := os.Open(fullPath) //nolint:gosec
|
||||
if direrr != nil {
|
||||
return nil, errors.Wrap(direrr, "unable to read directory")
|
||||
}
|
||||
|
||||
childPrefix := fullPath + string(filepath.Separator)
|
||||
|
||||
return &filesystemDirectoryIterator{dirHandle: f, childPrefix: childPrefix}, nil
|
||||
}
|
||||
|
||||
func (fsd *filesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
|
||||
fullPath := fsd.fullPath()
|
||||
|
||||
st, err := os.Lstat(filepath.Join(fullPath, name))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, fs.ErrEntryNotFound
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "unable to get child")
|
||||
}
|
||||
|
||||
return entryFromDirEntry(st, fullPath+string(filepath.Separator)), nil
|
||||
}
|
||||
|
||||
func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) {
|
||||
fi, err := os.Lstat(prefix + dirEntry.Name())
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "error reading directory")
|
||||
}
|
||||
|
||||
return entryFromDirEntry(fi, prefix), nil
|
||||
}
|
||||
|
||||
// NewEntry returns fs.Entry for the specified path, the result will be one of supported entry types: fs.File, fs.Directory, fs.Symlink
|
||||
// or fs.UnsupportedEntry.
|
||||
func NewEntry(path string) (fs.Entry, error) {
|
||||
path = filepath.Clean(path)
|
||||
|
||||
fi, err := os.Lstat(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to determine entry type")
|
||||
}
|
||||
|
||||
if path == "/" {
|
||||
return entryFromDirEntry(fi, ""), nil
|
||||
}
|
||||
|
||||
return entryFromDirEntry(fi, dirPrefix(path)), nil
|
||||
}
|
||||
|
||||
func entryFromDirEntry(fi os.FileInfo, prefix string) fs.Entry {
|
||||
isplaceholder := strings.HasSuffix(fi.Name(), ShallowEntrySuffix)
|
||||
maskedmode := fi.Mode() & os.ModeType
|
||||
|
||||
switch {
|
||||
case maskedmode == os.ModeDir && !isplaceholder:
|
||||
return newFilesystemDirectory(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == os.ModeDir && isplaceholder:
|
||||
return newShallowFilesystemDirectory(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == os.ModeSymlink && !isplaceholder:
|
||||
return newFilesystemSymlink(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == 0 && !isplaceholder:
|
||||
return newFilesystemFile(newEntry(fi, prefix))
|
||||
|
||||
case maskedmode == 0 && isplaceholder:
|
||||
return newShallowFilesystemFile(newEntry(fi, prefix))
|
||||
|
||||
default:
|
||||
return newFilesystemErrorEntry(newEntry(fi, prefix), fs.ErrUnknown)
|
||||
}
|
||||
}
|
||||
|
||||
var _ os.FileInfo = (*filesystemEntry)(nil)
|
||||
|
||||
func newEntry(fi os.FileInfo, prefix string) filesystemEntry {
|
||||
return filesystemEntry{
|
||||
TrimShallowSuffix(fi.Name()),
|
||||
fi.Size(),
|
||||
fi.ModTime().UnixNano(),
|
||||
fi.Mode(),
|
||||
platformSpecificOwnerInfo(fi),
|
||||
platformSpecificDeviceInfo(fi),
|
||||
prefix,
|
||||
}
|
||||
}
|
||||
@@ -147,7 +147,7 @@ func TestIterateNonExistent(t *testing.T) {
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
||||
t.Fatal("this won't be invoked")
|
||||
return nil
|
||||
}), os.ErrNotExist)
|
||||
@@ -168,7 +168,7 @@ func testIterate(t *testing.T, nFiles int) {
|
||||
|
||||
names := map[string]int64{}
|
||||
|
||||
require.NoError(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
require.NoError(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
||||
names[e.Name()] = e.Size()
|
||||
return nil
|
||||
}))
|
||||
@@ -179,7 +179,7 @@ func testIterate(t *testing.T, nFiles int) {
|
||||
|
||||
cnt := 0
|
||||
|
||||
require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
||||
cnt++
|
||||
|
||||
if cnt == nFiles/10 {
|
||||
@@ -191,7 +191,7 @@ func testIterate(t *testing.T, nFiles int) {
|
||||
|
||||
cnt = 0
|
||||
|
||||
require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
||||
cnt++
|
||||
|
||||
if cnt == nFiles-1 {
|
||||
|
||||
@@ -57,7 +57,7 @@ func benchmarkReadDirWithCount(b *testing.B, fileCount int) {
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
dir, _ := localfs.Directory(td)
|
||||
dir.IterateEntries(ctx, func(context.Context, fs.Entry) error {
|
||||
fs.IterateEntries(ctx, dir, func(context.Context, fs.Entry) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -124,9 +124,8 @@ func (fsd *shallowFilesystemDirectory) Child(ctx context.Context, name string) (
|
||||
return nil, errors.New("shallowFilesystemDirectory.Child not supported")
|
||||
}
|
||||
|
||||
//nolint:revive
|
||||
func (fsd *shallowFilesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
return errors.New("shallowFilesystemDirectory.IterateEntries not supported")
|
||||
func (fsd *shallowFilesystemDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
return nil, errors.New("shallowFilesystemDirectory.IterateEntries not supported")
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -78,14 +78,8 @@ func (sd *staticDirectory) Child(ctx context.Context, name string) (fs.Entry, er
|
||||
return fs.IterateEntriesAndFindChild(ctx, sd, name)
|
||||
}
|
||||
|
||||
func (sd *staticDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
for _, e := range append([]fs.Entry{}, sd.entries...) {
|
||||
if err := cb(ctx, e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
func (sd *staticDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
return fs.StaticIterator(append([]fs.Entry{}, sd.entries...), nil), nil
|
||||
}
|
||||
|
||||
func (sd *staticDirectory) SupportsMultipleIterations() bool {
|
||||
@@ -105,10 +99,11 @@ func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory {
|
||||
|
||||
type streamingDirectory struct {
|
||||
virtualEntry
|
||||
// Used to generate the next entry and execute the callback on it.
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
// +checklocks:mu
|
||||
callback func(context.Context, func(context.Context, fs.Entry) error) error
|
||||
mu sync.Mutex
|
||||
iter fs.DirectoryIterator
|
||||
}
|
||||
|
||||
var errChildNotSupported = errors.New("streamingDirectory.Child not supported")
|
||||
@@ -119,48 +114,36 @@ func (sd *streamingDirectory) Child(ctx context.Context, _ string) (fs.Entry, er
|
||||
|
||||
var errIteratorAlreadyUsed = errors.New("cannot use streaming directory iterator more than once") // +checklocksignore: mu
|
||||
|
||||
func (sd *streamingDirectory) getIterator() (func(context.Context, func(context.Context, fs.Entry) error) error, error) {
|
||||
func (sd *streamingDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
sd.mu.Lock()
|
||||
defer sd.mu.Unlock()
|
||||
|
||||
if sd.callback == nil {
|
||||
if sd.iter == nil {
|
||||
return nil, errIteratorAlreadyUsed
|
||||
}
|
||||
|
||||
cb := sd.callback
|
||||
sd.callback = nil
|
||||
it := sd.iter
|
||||
sd.iter = nil
|
||||
|
||||
return cb, nil
|
||||
}
|
||||
|
||||
func (sd *streamingDirectory) IterateEntries(
|
||||
ctx context.Context,
|
||||
callback func(context.Context, fs.Entry) error,
|
||||
) error {
|
||||
cb, err := sd.getIterator()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cb(ctx, callback)
|
||||
return it, nil
|
||||
}
|
||||
|
||||
func (sd *streamingDirectory) SupportsMultipleIterations() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// NewStreamingDirectory returns a directory that will call the given function
|
||||
// when IterateEntries is executed.
|
||||
// NewStreamingDirectory returns a directory that will invoke the provided iterator
|
||||
// on Iterate().
|
||||
func NewStreamingDirectory(
|
||||
name string,
|
||||
callback func(context.Context, func(context.Context, fs.Entry) error) error,
|
||||
iter fs.DirectoryIterator,
|
||||
) fs.Directory {
|
||||
return &streamingDirectory{
|
||||
virtualEntry: virtualEntry{
|
||||
name: name,
|
||||
mode: defaultPermissions | os.ModeDir,
|
||||
},
|
||||
callback: callback,
|
||||
iter: iter,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -137,12 +137,7 @@ func TestStreamingDirectory(t *testing.T) {
|
||||
|
||||
rootDir := NewStreamingDirectory(
|
||||
"root",
|
||||
func(
|
||||
ctx context.Context,
|
||||
callback func(context.Context, fs.Entry) error,
|
||||
) error {
|
||||
return callback(ctx, f)
|
||||
},
|
||||
fs.StaticIterator([]fs.Entry{f}, nil),
|
||||
)
|
||||
|
||||
entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir)
|
||||
@@ -174,12 +169,7 @@ func TestStreamingDirectory_MultipleIterationsFails(t *testing.T) {
|
||||
|
||||
rootDir := NewStreamingDirectory(
|
||||
"root",
|
||||
func(
|
||||
ctx context.Context,
|
||||
callback func(context.Context, fs.Entry) error,
|
||||
) error {
|
||||
return callback(ctx, f)
|
||||
},
|
||||
fs.StaticIterator([]fs.Entry{f}, nil),
|
||||
)
|
||||
|
||||
entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir)
|
||||
@@ -202,35 +192,11 @@ func TestStreamingDirectory_ReturnsCallbackError(t *testing.T) {
|
||||
|
||||
rootDir := NewStreamingDirectory(
|
||||
"root",
|
||||
func(
|
||||
ctx context.Context,
|
||||
callback func(context.Context, fs.Entry) error,
|
||||
) error {
|
||||
return callback(ctx, f)
|
||||
},
|
||||
fs.StaticIterator([]fs.Entry{f}, nil),
|
||||
)
|
||||
|
||||
err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error {
|
||||
err := fs.IterateEntries(testlogging.Context(t), rootDir, func(context.Context, fs.Entry) error {
|
||||
return errCallback
|
||||
})
|
||||
assert.ErrorIs(t, err, errCallback)
|
||||
}
|
||||
|
||||
var errIteration = errors.New("iteration error")
|
||||
|
||||
func TestStreamingDirectory_ReturnsReadDirError(t *testing.T) {
|
||||
rootDir := NewStreamingDirectory(
|
||||
"root",
|
||||
func(
|
||||
ctx context.Context,
|
||||
callback func(context.Context, fs.Entry) error,
|
||||
) error {
|
||||
return errIteration
|
||||
},
|
||||
)
|
||||
|
||||
err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error {
|
||||
return nil
|
||||
})
|
||||
assert.ErrorIs(t, err, errIteration)
|
||||
}
|
||||
|
||||
@@ -45,16 +45,10 @@ type testDirectory struct {
|
||||
modtime time.Time
|
||||
}
|
||||
|
||||
func (d *testDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
for _, file := range d.files {
|
||||
err := cb(ctx, file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
func (d *testDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
return fs.StaticIterator(d.files, nil), nil
|
||||
}
|
||||
|
||||
func (d *testDirectory) SupportsMultipleIterations() bool { return false }
|
||||
func (d *testDirectory) IsDir() bool { return true }
|
||||
func (d *testDirectory) LocalFilesystemPath() string { return d.name }
|
||||
|
||||
@@ -167,13 +167,24 @@ func (dir *fuseDirectoryNode) Readdir(ctx context.Context) (gofusefs.DirStream,
|
||||
// TODO: Slice not required as DirStream is also an iterator.
|
||||
result := []fuse.DirEntry{}
|
||||
|
||||
err := dir.directory().IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error {
|
||||
iter, err := dir.directory().Iterate(ctx)
|
||||
if err != nil {
|
||||
log(ctx).Errorf("error reading directory %v: %v", dir.entry.Name(), err)
|
||||
return nil, syscall.EIO
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
cur, err := iter.Next(ctx)
|
||||
for cur != nil {
|
||||
result = append(result, fuse.DirEntry{
|
||||
Name: e.Name(),
|
||||
Mode: entryToFuseMode(e),
|
||||
Name: cur.Name(),
|
||||
Mode: entryToFuseMode(cur),
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
cur, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log(ctx).Errorf("error reading directory %v: %v", dir.entry.Name(), err)
|
||||
return nil, syscall.EIO
|
||||
|
||||
@@ -303,23 +303,17 @@ func (imd *Directory) Child(ctx context.Context, name string) (fs.Entry, error)
|
||||
return nil, fs.ErrEntryNotFound
|
||||
}
|
||||
|
||||
// IterateEntries calls the given callback on each entry in the directory.
|
||||
func (imd *Directory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
// Iterate returns directory iterator.
|
||||
func (imd *Directory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
if imd.readdirError != nil {
|
||||
return imd.readdirError
|
||||
return nil, imd.readdirError
|
||||
}
|
||||
|
||||
if imd.onReaddir != nil {
|
||||
imd.onReaddir()
|
||||
}
|
||||
|
||||
for _, e := range append([]fs.Entry{}, imd.children...) {
|
||||
if err := cb(ctx, e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return fs.StaticIterator(append([]fs.Entry{}, imd.children...), nil), nil
|
||||
}
|
||||
|
||||
// File is an in-memory fs.File capable of simulating failures.
|
||||
|
||||
@@ -102,38 +102,40 @@ type webdavDir struct {
|
||||
// webdavDir implements webdav.File but needs context
|
||||
ctx context.Context //nolint:containedctx
|
||||
|
||||
w *webdavFS
|
||||
entry fs.Directory
|
||||
w *webdavFS
|
||||
info os.FileInfo
|
||||
iter fs.DirectoryIterator
|
||||
}
|
||||
|
||||
//nolint:gochecknoglobals
|
||||
var symlinksAreUnsupportedLogged = new(int32)
|
||||
|
||||
// TODO: (bug) This incorrectly truncates the entries in the directory and does not allow pagination.
|
||||
func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) {
|
||||
ctx := d.ctx
|
||||
|
||||
var fis []os.FileInfo
|
||||
|
||||
foundEntries := 0
|
||||
|
||||
err := d.entry.IterateEntries(d.ctx, func(innerCtx context.Context, e fs.Entry) error {
|
||||
if n > 0 && n <= foundEntries {
|
||||
return nil
|
||||
e, err := d.iter.Next(ctx)
|
||||
for e != nil {
|
||||
if n > 0 && foundEntries >= n {
|
||||
break
|
||||
}
|
||||
|
||||
foundEntries++
|
||||
|
||||
if _, isSymlink := e.(fs.Symlink); isSymlink {
|
||||
if atomic.AddInt32(symlinksAreUnsupportedLogged, 1) == 1 {
|
||||
//nolint:contextcheck
|
||||
log(d.ctx).Errorf("Mounting directories containing symbolic links using WebDAV is not supported. The link entries will be skipped.")
|
||||
}
|
||||
|
||||
return nil
|
||||
} else {
|
||||
fis = append(fis, &webdavFileInfo{e})
|
||||
}
|
||||
|
||||
fis = append(fis, &webdavFileInfo{e})
|
||||
return nil
|
||||
})
|
||||
e, err = d.iter.Next(ctx)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error reading directory")
|
||||
}
|
||||
@@ -142,7 +144,7 @@ func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) {
|
||||
}
|
||||
|
||||
func (d *webdavDir) Stat() (os.FileInfo, error) {
|
||||
return webdavFileInfo{d.entry}, nil
|
||||
return d.info, nil
|
||||
}
|
||||
|
||||
func (d *webdavDir) Write(_ []byte) (int, error) {
|
||||
@@ -150,6 +152,7 @@ func (d *webdavDir) Write(_ []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (d *webdavDir) Close() error {
|
||||
d.iter.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -190,7 +193,12 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, _ int, _ os.FileMo
|
||||
|
||||
switch f := f.(type) {
|
||||
case fs.Directory:
|
||||
return &webdavDir{ctx, w, f}, nil
|
||||
iter, err := f.Iterate(ctx)
|
||||
if err != nil {
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
return &webdavDir{ctx, w, webdavFileInfo{f}, iter}, nil
|
||||
case fs.File:
|
||||
return &webdavFile{ctx: ctx, entry: f}, nil
|
||||
}
|
||||
|
||||
@@ -65,10 +65,10 @@ func (s *repositoryAllSources) Child(ctx context.Context, name string) (fs.Entry
|
||||
return fs.IterateEntriesAndFindChild(ctx, s, name)
|
||||
}
|
||||
|
||||
func (s *repositoryAllSources) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
func (s *repositoryAllSources) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
srcs, err := snapshot.ListSources(ctx, s.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error listing sources")
|
||||
return nil, errors.Wrap(err, "error listing sources")
|
||||
}
|
||||
|
||||
users := map[string]bool{}
|
||||
@@ -85,19 +85,17 @@ func (s *repositoryAllSources) IterateEntries(ctx context.Context, cb func(conte
|
||||
|
||||
name2safe = disambiguateSafeNames(name2safe)
|
||||
|
||||
var entries []fs.Entry
|
||||
|
||||
for u := range users {
|
||||
e := &sourceDirectories{
|
||||
entries = append(entries, &sourceDirectories{
|
||||
rep: s.rep,
|
||||
userHost: u,
|
||||
name: name2safe[u],
|
||||
}
|
||||
|
||||
if err2 := cb(ctx, e); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
return fs.StaticIterator(entries, nil), nil
|
||||
}
|
||||
|
||||
// AllSourcesEntry returns fs.Directory that contains the list of all snapshot sources found in the repository.
|
||||
|
||||
@@ -6,8 +6,6 @@
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/ignorefs"
|
||||
"github.com/kopia/kopia/internal/units"
|
||||
@@ -107,10 +105,6 @@ func Estimate(ctx context.Context, entry fs.Directory, policyTree *policy.Tree,
|
||||
}
|
||||
|
||||
func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTree *policy.Tree, stats *snapshot.Stats, ib, eb SampleBuckets, ed *[]string, progress EstimateProgress, maxExamplesPerBucket int) error {
|
||||
type processEntryError struct {
|
||||
error
|
||||
}
|
||||
|
||||
// see if the context got canceled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -130,22 +124,26 @@ type processEntryError struct {
|
||||
|
||||
progress.Processing(ctx, relativePath)
|
||||
|
||||
err := entry.IterateEntries(ctx, func(c context.Context, child fs.Entry) error {
|
||||
defer child.Close()
|
||||
iter, err := entry.Iterate(ctx)
|
||||
if err == nil {
|
||||
defer iter.Close()
|
||||
|
||||
if err2 := estimate(ctx, filepath.Join(relativePath, child.Name()), child, policyTree.Child(child.Name()), stats, ib, eb, ed, progress, maxExamplesPerBucket); err2 != nil {
|
||||
return processEntryError{err2}
|
||||
var child fs.Entry
|
||||
|
||||
child, err = iter.Next(ctx)
|
||||
for child != nil {
|
||||
if err = estimate(ctx, filepath.Join(relativePath, child.Name()), child, policyTree.Child(child.Name()), stats, ib, eb, ed, progress, maxExamplesPerBucket); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
child.Close()
|
||||
child, err = iter.Next(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
progress.Stats(ctx, stats, ib, eb, *ed, false)
|
||||
|
||||
var funcErr processEntryError
|
||||
if err != nil {
|
||||
if errors.As(err, &funcErr) {
|
||||
return funcErr.error
|
||||
}
|
||||
|
||||
isIgnored := policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false)
|
||||
|
||||
if isIgnored {
|
||||
@@ -155,9 +153,10 @@ type processEntryError struct {
|
||||
}
|
||||
|
||||
progress.Error(ctx, relativePath, err, isIgnored)
|
||||
}
|
||||
|
||||
progress.Stats(ctx, stats, ib, eb, *ed, false)
|
||||
//nolint:wrapcheck
|
||||
return err
|
||||
}
|
||||
|
||||
case fs.File:
|
||||
ib.add(relativePath, entry.Size(), maxExamplesPerBucket)
|
||||
|
||||
@@ -50,9 +50,7 @@ func TestEstimate_SkipsStreamingDirectory(t *testing.T) {
|
||||
rootDir := virtualfs.NewStaticDirectory("root", []fs.Entry{
|
||||
virtualfs.NewStreamingDirectory(
|
||||
"a-dir",
|
||||
func(ctx context.Context, callback func(context.Context, fs.Entry) error) error {
|
||||
return callback(ctx, f)
|
||||
},
|
||||
fs.StaticIterator([]fs.Entry{f}, nil),
|
||||
),
|
||||
})
|
||||
|
||||
|
||||
@@ -133,18 +133,18 @@ func (rd *repositoryDirectory) Child(ctx context.Context, name string) (fs.Entry
|
||||
return EntryFromDirEntry(rd.repo, de), nil
|
||||
}
|
||||
|
||||
func (rd *repositoryDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
func (rd *repositoryDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
if err := rd.ensureDirEntriesLoaded(ctx); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var entries []fs.Entry
|
||||
|
||||
for _, de := range rd.dirEntries {
|
||||
if err := cb(ctx, EntryFromDirEntry(rd.repo, de)); err != nil {
|
||||
return err
|
||||
}
|
||||
entries = append(entries, EntryFromDirEntry(rd.repo, de))
|
||||
}
|
||||
|
||||
return nil
|
||||
return fs.StaticIterator(entries, nil), nil
|
||||
}
|
||||
|
||||
func (rd *repositoryDirectory) ensureDirEntriesLoaded(ctx context.Context) error {
|
||||
@@ -298,10 +298,13 @@ func SnapshotRoot(rep repo.Repository, man *snapshot.Manifest) (fs.Entry, error)
|
||||
func AutoDetectEntryFromObjectID(ctx context.Context, rep repo.Repository, oid object.ID, maybeName string) fs.Entry {
|
||||
if IsDirectoryID(oid) {
|
||||
dirEntry := DirectoryEntry(rep, oid, nil)
|
||||
if err := dirEntry.IterateEntries(ctx, func(context.Context, fs.Entry) error {
|
||||
return nil
|
||||
}); err == nil {
|
||||
|
||||
iter, err := dirEntry.Iterate(ctx)
|
||||
if err == nil {
|
||||
iter.Close()
|
||||
|
||||
repoFSLog(ctx).Debugf("%v auto-detected as directory", oid)
|
||||
|
||||
return dirEntry
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,37 +107,42 @@ func (w *TreeWalker) processEntry(ctx context.Context, e fs.Entry, entryPath str
|
||||
}
|
||||
|
||||
func (w *TreeWalker) processDirEntry(ctx context.Context, dir fs.Directory, entryPath string) {
|
||||
type errStop struct {
|
||||
error
|
||||
}
|
||||
|
||||
var ag workshare.AsyncGroup[any]
|
||||
defer ag.Close()
|
||||
|
||||
err := dir.IterateEntries(ctx, func(c context.Context, ent fs.Entry) error {
|
||||
iter, err := dir.Iterate(ctx)
|
||||
if err != nil {
|
||||
w.ReportError(ctx, entryPath, errors.Wrap(err, "error reading directory"))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
ent, err := iter.Next(ctx)
|
||||
for ent != nil {
|
||||
ent2 := ent
|
||||
|
||||
if w.TooManyErrors() {
|
||||
return errStop{errors.New("")}
|
||||
break
|
||||
}
|
||||
|
||||
if w.alreadyProcessed(ctx, ent) {
|
||||
return nil
|
||||
if !w.alreadyProcessed(ctx, ent2) {
|
||||
childPath := path.Join(entryPath, ent2.Name())
|
||||
|
||||
if ag.CanShareWork(w.wp) {
|
||||
ag.RunAsync(w.wp, func(c *workshare.Pool[any], request any) {
|
||||
w.processEntry(ctx, ent2, childPath)
|
||||
}, nil)
|
||||
} else {
|
||||
w.processEntry(ctx, ent2, childPath)
|
||||
}
|
||||
}
|
||||
|
||||
childPath := path.Join(entryPath, ent.Name())
|
||||
ent, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
if ag.CanShareWork(w.wp) {
|
||||
ag.RunAsync(w.wp, func(c *workshare.Pool[any], request any) {
|
||||
w.processEntry(ctx, ent, childPath)
|
||||
}, nil)
|
||||
} else {
|
||||
w.processEntry(ctx, ent, childPath)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
var stopped errStop
|
||||
if err != nil && !errors.As(err, &stopped) {
|
||||
if err != nil {
|
||||
w.ReportError(ctx, entryPath, errors.Wrap(err, "error reading directory"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,10 +69,10 @@ func (s *sourceDirectories) Child(ctx context.Context, name string) (fs.Entry, e
|
||||
return fs.IterateEntriesAndFindChild(ctx, s, name)
|
||||
}
|
||||
|
||||
func (s *sourceDirectories) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
func (s *sourceDirectories) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
sources0, err := snapshot.ListSources(ctx, s.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list sources")
|
||||
return nil, errors.Wrap(err, "unable to list sources")
|
||||
}
|
||||
|
||||
// step 1 - filter sources.
|
||||
@@ -95,15 +95,13 @@ func (s *sourceDirectories) IterateEntries(ctx context.Context, cb func(context.
|
||||
|
||||
name2safe = disambiguateSafeNames(name2safe)
|
||||
|
||||
for _, src := range sources {
|
||||
e := &sourceSnapshots{s.rep, src, name2safe[src.Path]}
|
||||
var entries []fs.Entry
|
||||
|
||||
if err2 := cb(ctx, e); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
for _, src := range sources {
|
||||
entries = append(entries, &sourceSnapshots{s.rep, src, name2safe[src.Path]})
|
||||
}
|
||||
|
||||
return nil
|
||||
return fs.StaticIterator(entries, nil), nil
|
||||
}
|
||||
|
||||
func disambiguateSafeNames(m map[string]string) map[string]string {
|
||||
|
||||
@@ -83,7 +83,7 @@ func iterateAllNames(ctx context.Context, t *testing.T, dir fs.Directory, prefix
|
||||
|
||||
result := map[string]struct{}{}
|
||||
|
||||
err := dir.IterateEntries(ctx, func(innerCtx context.Context, ent fs.Entry) error {
|
||||
err := fs.IterateEntries(ctx, dir, func(innerCtx context.Context, ent fs.Entry) error {
|
||||
if ent.IsDir() {
|
||||
result[prefix+ent.Name()+"/"] = struct{}{}
|
||||
childEntries := iterateAllNames(ctx, t, ent.(fs.Directory), prefix+ent.Name()+"/")
|
||||
|
||||
@@ -67,12 +67,14 @@ func (s *sourceSnapshots) Child(ctx context.Context, name string) (fs.Entry, err
|
||||
return fs.IterateEntriesAndFindChild(ctx, s, name)
|
||||
}
|
||||
|
||||
func (s *sourceSnapshots) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
func (s *sourceSnapshots) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
|
||||
manifests, err := snapshot.ListSnapshots(ctx, s.rep, s.src)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list snapshots")
|
||||
return nil, errors.Wrap(err, "unable to list snapshots")
|
||||
}
|
||||
|
||||
var entries []fs.Entry
|
||||
|
||||
for _, m := range manifests {
|
||||
name := m.StartTime.Format("20060102-150405")
|
||||
if m.IncompleteReason != "" {
|
||||
@@ -91,14 +93,10 @@ func (s *sourceSnapshots) IterateEntries(ctx context.Context, cb func(context.Co
|
||||
de.DirSummary = m.RootEntry.DirSummary
|
||||
}
|
||||
|
||||
e := EntryFromDirEntry(s.rep, de)
|
||||
|
||||
if err2 := cb(ctx, e); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
entries = append(entries, EntryFromDirEntry(s.rep, de))
|
||||
}
|
||||
|
||||
return nil
|
||||
return fs.StaticIterator(entries, nil), nil
|
||||
}
|
||||
|
||||
var _ fs.Directory = (*sourceSnapshots)(nil)
|
||||
|
||||
@@ -768,45 +768,42 @@ func (u *Uploader) processDirectoryEntries(
|
||||
prevDirs []fs.Directory,
|
||||
wg *workshare.AsyncGroup[*uploadWorkItem],
|
||||
) error {
|
||||
// processEntryError distinguishes an error thrown when attempting to read a directory.
|
||||
type processEntryError struct {
|
||||
error
|
||||
iter, err := dir.Iterate(ctx)
|
||||
if err != nil {
|
||||
return dirReadError{err}
|
||||
}
|
||||
|
||||
err := dir.IterateEntries(ctx, func(ctx context.Context, entry fs.Entry) error {
|
||||
defer iter.Close()
|
||||
|
||||
entry, err := iter.Next(ctx)
|
||||
|
||||
for entry != nil {
|
||||
entry2 := entry
|
||||
|
||||
if u.IsCanceled() {
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
entryRelativePath := path.Join(dirRelativePath, entry.Name())
|
||||
entryRelativePath := path.Join(dirRelativePath, entry2.Name())
|
||||
|
||||
if wg.CanShareWork(u.workerPool) {
|
||||
wg.RunAsync(u.workerPool, func(c *workshare.Pool[*uploadWorkItem], wi *uploadWorkItem) {
|
||||
wi.err = u.processSingle(ctx, entry, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry)
|
||||
wi.err = u.processSingle(ctx, entry2, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry)
|
||||
}, &uploadWorkItem{})
|
||||
} else {
|
||||
if err := u.processSingle(ctx, entry, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry); err != nil {
|
||||
return processEntryError{err}
|
||||
if err2 := u.processSingle(ctx, entry2, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
entry, err = iter.Next(ctx)
|
||||
}
|
||||
|
||||
var peError processEntryError
|
||||
if errors.As(err, &peError) {
|
||||
return peError.error
|
||||
if err != nil {
|
||||
return dirReadError{err}
|
||||
}
|
||||
|
||||
if errors.Is(err, errCanceled) {
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
return dirReadError{err}
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint:funlen
|
||||
|
||||
@@ -768,9 +768,7 @@ func TestUploadScanStopsOnContextCancel(t *testing.T) {
|
||||
})
|
||||
|
||||
result, err := u.scanDirectory(scanctx, th.sourceDir, nil)
|
||||
if !errors.Is(err, scanctx.Err()) {
|
||||
t.Fatalf("invalid scan error: %v", err)
|
||||
}
|
||||
require.ErrorIs(t, err, scanctx.Err())
|
||||
|
||||
if result.numFiles == 0 && result.totalFileSize == 0 {
|
||||
t.Fatalf("should have returned partial results, got zeros")
|
||||
@@ -801,21 +799,11 @@ func TestUploadScanIgnoresFiles(t *testing.T) {
|
||||
result2, err := u.scanDirectory(ctx, th.sourceDir, policyTree)
|
||||
require.NoError(t, err)
|
||||
|
||||
if result1.numFiles == 0 {
|
||||
t.Fatalf("no files scanned")
|
||||
}
|
||||
require.NotEqual(t, result1.numFiles, 0)
|
||||
require.NotEqual(t, result2.numFiles, 0)
|
||||
|
||||
if result2.numFiles == 0 {
|
||||
t.Fatalf("no files scanned")
|
||||
}
|
||||
|
||||
if got, want := result2.numFiles, result1.numFiles; got >= want {
|
||||
t.Fatalf("expected lower number of files %v, wanted %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := result2.totalFileSize, result1.totalFileSize; got >= want {
|
||||
t.Fatalf("expected lower file size %v, wanted %v", got, want)
|
||||
}
|
||||
require.Less(t, result2.numFiles, result1.numFiles)
|
||||
require.Less(t, result2.totalFileSize, result1.totalFileSize)
|
||||
}
|
||||
|
||||
func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
|
||||
@@ -1002,15 +990,7 @@ func TestUpload_StreamingDirectory(t *testing.T) {
|
||||
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
||||
virtualfs.NewStreamingDirectory(
|
||||
"stream-directory",
|
||||
func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error {
|
||||
for _, f := range files {
|
||||
if err := callback(innerCtx, f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
fs.StaticIterator(files, nil),
|
||||
),
|
||||
})
|
||||
|
||||
@@ -1049,15 +1029,7 @@ func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) {
|
||||
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
||||
virtualfs.NewStreamingDirectory(
|
||||
"stream-directory",
|
||||
func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error {
|
||||
for _, f := range files {
|
||||
if err := callback(innerCtx, f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
fs.StaticIterator(files, nil),
|
||||
),
|
||||
})
|
||||
|
||||
@@ -1225,7 +1197,7 @@ func TestParallelUploadOfLargeFiles(t *testing.T) {
|
||||
|
||||
successCount := 0
|
||||
|
||||
dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
|
||||
fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
||||
if f, ok := e.(fs.File); ok {
|
||||
oid, err := object.ParseID(strings.TrimPrefix(f.(object.HasObjectID).ObjectID().String(), "I"))
|
||||
require.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user