From c8d1b221e2c93bbb81c9d7a3f09dabbcd684931e Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 5 Oct 2023 03:45:44 +0100 Subject: [PATCH] refactor(repository): added fs.DirectoryIterator (#3365) * refactor(repository): added fs.DirectoryIterator This significantly reduces the number of small allocations while taking snapshots of lots of files, which leads to faster snapshots. ``` $ runbench --kopia-exe ~/go/bin/kopia \ --compare-to-exe ~/go/bin/kopia-baseline --min-duration 30s \ ./snapshot-linux-parallel-4.sh DIFF duration: current:5.1 baseline:5.8 change:-13.0 % DIFF repo_size: current:1081614127.6 baseline:1081615302.8 change:-0.0 % DIFF num_files: current:60.0 baseline:60.0 change:0% DIFF avg_heap_objects: current:4802666.0 baseline:4905741.8 change:-2.1 % DIFF avg_heap_bytes: current:737397275.2 baseline:715263289.6 change:+3.1 % DIFF avg_ram: current:215.0 baseline:211.5 change:+1.6 % DIFF max_ram: current:294.8 baseline:311.4 change:-5.3 % DIFF avg_cpu: current:167.3 baseline:145.3 change:+15.1 % DIFF max_cpu: current:227.2 baseline:251.0 change:-9.5 % ``` * changed `Next()` API * mechanical move of the iterator to its own file * clarified comment * pr feedback * mechanical move of all localfs dependencies on os.FileInfo to a separate file * Update fs/entry.go Co-authored-by: ashmrtn <3891298+ashmrtn@users.noreply.github.com> * Update fs/entry_dir_iterator.go Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> * doc: clarified valid results from Next() --------- Co-authored-by: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> --- cli/command_ls.go | 19 +- fs/cachefs/cache.go | 2 +- fs/entry.go | 90 +++++-- fs/entry_dir_iterator.go | 30 +++ fs/ignorefs/ignorefs.go | 85 ++++++- fs/ignorefs/ignorefs_test.go | 2 +- fs/localfs/local_fs.go | 226 +----------------- fs/localfs/local_fs_os.go | 159 ++++++++++++ fs/localfs/local_fs_test.go | 8 +- 
fs/localfs/localfs_benchmark_test.go | 2 +- fs/localfs/shallow_fs.go | 5 +- fs/virtualfs/virtualfs.go | 47 ++-- fs/virtualfs/virtualfs_test.go | 42 +--- internal/diff/diff_test.go | 12 +- internal/fusemount/fusefs.go | 21 +- internal/mockfs/mockfs.go | 14 +- internal/webdavmount/webdavmount.go | 36 +-- snapshot/snapshotfs/all_sources.go | 16 +- snapshot/snapshotfs/estimate.go | 37 ++- snapshot/snapshotfs/estimate_test.go | 4 +- snapshot/snapshotfs/repofs.go | 21 +- snapshot/snapshotfs/snapshot_tree_walker.go | 49 ++-- snapshot/snapshotfs/source_directories.go | 14 +- .../snapshotfs/source_directories_test.go | 2 +- snapshot/snapshotfs/source_snapshots.go | 14 +- snapshot/snapshotfs/upload.go | 39 ++- snapshot/snapshotfs/upload_test.go | 44 +--- 27 files changed, 525 insertions(+), 515 deletions(-) create mode 100644 fs/entry_dir_iterator.go create mode 100644 fs/localfs/local_fs_os.go diff --git a/cli/command_ls.go b/cli/command_ls.go index 7c4e53a77..250788554 100644 --- a/cli/command_ls.go +++ b/cli/command_ls.go @@ -54,9 +54,22 @@ func (c *commandList) run(ctx context.Context, rep repo.Repository) error { } func (c *commandList) listDirectory(ctx context.Context, d fs.Directory, prefix, indent string) error { - if err := d.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error { - return c.printDirectoryEntry(innerCtx, e, prefix, indent) - }); err != nil { + iter, err := d.Iterate(ctx) + if err != nil { + return err //nolint:wrapcheck + } + defer iter.Close() + + e, err := iter.Next(ctx) + for e != nil { + if err2 := c.printDirectoryEntry(ctx, e, prefix, indent); err2 != nil { + return err2 + } + + e, err = iter.Next(ctx) + } + + if err != nil { return err //nolint:wrapcheck } diff --git a/fs/cachefs/cache.go b/fs/cachefs/cache.go index 770233156..3cce38a1b 100644 --- a/fs/cachefs/cache.go +++ b/fs/cachefs/cache.go @@ -112,7 +112,7 @@ func(innerCtx context.Context) ([]fs.Entry, error) { return nil } - return d.IterateEntries(ctx, callback) 
//nolint:wrapcheck + return fs.IterateEntries(ctx, d, callback) //nolint:wrapcheck } func (c *Cache) getEntriesFromCacheLocked(ctx context.Context, id string) []fs.Entry { diff --git a/fs/entry.go b/fs/entry.go index 26f4dd3ba..f0248e4a1 100644 --- a/fs/entry.go +++ b/fs/entry.go @@ -59,13 +59,56 @@ type StreamingFile interface { // Directory represents contents of a directory. type Directory interface { Entry + Child(ctx context.Context, name string) (Entry, error) - IterateEntries(ctx context.Context, cb func(context.Context, Entry) error) error + Iterate(ctx context.Context) (DirectoryIterator, error) // SupportsMultipleIterations returns true if the Directory supports iterating // through the entries multiple times. Otherwise it returns false. SupportsMultipleIterations() bool } +// IterateEntries iterates entries in the provided directory and invokes the given callback for each entry +// or until the callback returns an error. +func IterateEntries(ctx context.Context, dir Directory, cb func(context.Context, Entry) error) error { + iter, err := dir.Iterate(ctx) + if err != nil { + return err //nolint:wrapcheck + } + + defer iter.Close() + + cur, err := iter.Next(ctx) + + for cur != nil { + if err2 := cb(ctx, cur); err2 != nil { + return err2 + } + + cur, err = iter.Next(ctx) + } + + return err //nolint:wrapcheck +} + +// DirectoryIterator iterates entries in a directory. +// +// The client is expected to call Next() in a loop until it returns a nil entry to signal +// end of iteration or until an error has occurred. +// +// Valid results: +// +// (nil,nil) - end of iteration, success +// (entry,nil) - iteration in progress, success +// (nil,err) - iteration stopped, failure +// +// The behavior of calling Next() after iteration has signaled its end is undefined. +// +// To release any resources associated with iteration the client must call Close(). 
+type DirectoryIterator interface { + Next(ctx context.Context) (Entry, error) + Close() +} + // DirectoryWithSummary is optionally implemented by Directory that provide summary. type DirectoryWithSummary interface { Summary(ctx context.Context) (*DirectorySummary, error) @@ -78,14 +121,22 @@ type ErrorEntry interface { ErrorInfo() error } -// GetAllEntries uses IterateEntries to return all entries in a Directory. +// GetAllEntries uses Iterate to return all entries in a Directory. func GetAllEntries(ctx context.Context, d Directory) ([]Entry, error) { entries := []Entry{} - err := d.IterateEntries(ctx, func(ctx context.Context, e Entry) error { - entries = append(entries, e) - return nil - }) + iter, err := d.Iterate(ctx) + if err != nil { + return nil, err //nolint:wrapcheck + } + + defer iter.Close() + + cur, err := iter.Next(ctx) + for cur != nil { + entries = append(entries, cur) + cur, err = iter.Next(ctx) + } return entries, err //nolint:wrapcheck } @@ -96,30 +147,27 @@ func GetAllEntries(ctx context.Context, d Directory) ([]Entry, error) { // IterateEntriesAndFindChild iterates through entries from a directory and returns one by name. // This is a convenience function that may be helpful in implementations of Directory.Child(). 
func IterateEntriesAndFindChild(ctx context.Context, d Directory, name string) (Entry, error) { - type errStop struct { - error + iter, err := d.Iterate(ctx) + if err != nil { + return nil, err //nolint:wrapcheck } - var result Entry + defer iter.Close() - err := d.IterateEntries(ctx, func(c context.Context, e Entry) error { - if result == nil && e.Name() == name { - result = e - return errStop{errors.New("")} + cur, err := iter.Next(ctx) + for cur != nil { + if cur.Name() == name { + return cur, nil } - return nil - }) - var stopped errStop - if err != nil && !errors.As(err, &stopped) { - return nil, errors.Wrap(err, "error reading directory") + cur, err = iter.Next(ctx) } - if result == nil { - return nil, ErrEntryNotFound + if err != nil { + return nil, err //nolint:wrapcheck } - return result, nil + return nil, ErrEntryNotFound } // MaxFailedEntriesPerDirectorySummary is the maximum number of failed entries per directory summary. diff --git a/fs/entry_dir_iterator.go b/fs/entry_dir_iterator.go new file mode 100644 index 000000000..f85577dbc --- /dev/null +++ b/fs/entry_dir_iterator.go @@ -0,0 +1,30 @@ +package fs + +import "context" + +type staticIterator struct { + cur int + entries []Entry + err error +} + +func (it *staticIterator) Close() { +} + +func (it *staticIterator) Next(ctx context.Context) (Entry, error) { + if it.cur < len(it.entries) { + v := it.entries[it.cur] + it.cur++ + + return v, it.err + } + + return nil, nil +} + +// StaticIterator returns a DirectoryIterator which returns the provided +// entries in order followed by a given final error. +// It is not safe to concurrently access directory iterator. 
+func StaticIterator(entries []Entry, err error) DirectoryIterator { + return &staticIterator{0, entries, err} +} diff --git a/fs/ignorefs/ignorefs.go b/fs/ignorefs/ignorefs.go index ec6dd30a3..477734406 100644 --- a/fs/ignorefs/ignorefs.go +++ b/fs/ignorefs/ignorefs.go @@ -5,6 +5,7 @@ "bufio" "context" "strings" + "sync" "github.com/pkg/errors" @@ -147,28 +148,81 @@ func (d *ignoreDirectory) DirEntryOrNil(ctx context.Context) (*snapshot.DirEntry return nil, nil } -func (d *ignoreDirectory) IterateEntries(ctx context.Context, callback func(ctx context.Context, entry fs.Entry) error) error { +type ignoreDirIterator struct { + //nolint:containedctx + ctx context.Context + d *ignoreDirectory + inner fs.DirectoryIterator + thisContext *ignoreContext +} + +func (i *ignoreDirIterator) Next(ctx context.Context) (fs.Entry, error) { + cur, err := i.inner.Next(ctx) + + for cur != nil { + //nolint:contextcheck + if wrapped, ok := i.d.maybeWrappedChildEntry(i.ctx, i.thisContext, cur); ok { + return wrapped, nil + } + + cur, err = i.inner.Next(ctx) + } + + return nil, err //nolint:wrapcheck +} + +func (i *ignoreDirIterator) Close() { + i.inner.Close() + + *i = ignoreDirIterator{} + ignoreDirIteratorPool.Put(i) +} + +func (d *ignoreDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { if d.skipCacheDirectory(ctx, d.relativePath, d.policyTree) { - return nil + return fs.StaticIterator(nil, nil), nil } thisContext, err := d.buildContext(ctx) if err != nil { - return err + return nil, err } - //nolint:wrapcheck - return d.Directory.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { - if wrapped, ok := d.maybeWrappedChildEntry(ctx, thisContext, e); ok { - return callback(ctx, wrapped) - } + inner, err := d.Directory.Iterate(ctx) + if err != nil { + return nil, err //nolint:wrapcheck + } - return nil - }) + it := ignoreDirIteratorPool.Get().(*ignoreDirIterator) //nolint:forcetypeassert + it.ctx = ctx + it.d = d + it.inner = inner + it.thisContext = 
thisContext + + return it, nil +} + +//nolint:gochecknoglobals +var ignoreDirectoryPool = sync.Pool{ + New: func() any { return &ignoreDirectory{} }, +} + +//nolint:gochecknoglobals +var ignoreDirIteratorPool = sync.Pool{ + New: func() any { return &ignoreDirIterator{} }, +} + +func (d *ignoreDirectory) Close() { + d.Directory.Close() + + *d = ignoreDirectory{} + ignoreDirectoryPool.Put(d) } func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignoreContext, e fs.Entry) (fs.Entry, bool) { - if !ic.shouldIncludeByName(ctx, d.relativePath+"/"+e.Name(), e, d.policyTree) { + s := d.relativePath + "/" + e.Name() + + if !ic.shouldIncludeByName(ctx, s, e, d.policyTree) { return nil, false } @@ -181,7 +235,14 @@ func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignore } if dir, ok := e.(fs.Directory); ok { - return &ignoreDirectory{d.relativePath + "/" + e.Name(), ic, d.policyTree.Child(e.Name()), dir}, true + id := ignoreDirectoryPool.Get().(*ignoreDirectory) //nolint:forcetypeassert + + id.relativePath = s + id.parentContext = ic + id.policyTree = d.policyTree.Child(e.Name()) + id.Directory = dir + + return id, true } return e, true diff --git a/fs/ignorefs/ignorefs_test.go b/fs/ignorefs/ignorefs_test.go index 7b1d18083..8a0207141 100644 --- a/fs/ignorefs/ignorefs_test.go +++ b/fs/ignorefs/ignorefs_test.go @@ -549,7 +549,7 @@ func walkTree(t *testing.T, dir fs.Directory) []string { walk = func(path string, d fs.Directory) error { output = append(output, path+"/") - return d.IterateEntries(testlogging.Context(t), func(innerCtx context.Context, e fs.Entry) error { + return fs.IterateEntries(testlogging.Context(t), d, func(innerCtx context.Context, e fs.Entry) error { relPath := path + "/" + e.Name() if subdir, ok := e.(fs.Directory); ok { diff --git a/fs/localfs/local_fs.go b/fs/localfs/local_fs.go index 16e05439e..44097746a 100644 --- a/fs/localfs/local_fs.go +++ b/fs/localfs/local_fs.go @@ -2,11 +2,8 @@ import ( "context" - 
"io" "os" "path/filepath" - "strings" - "sync" "time" "github.com/pkg/errors" @@ -14,11 +11,7 @@ "github.com/kopia/kopia/fs" ) -const ( - numEntriesToRead = 100 // number of directory entries to read in one shot - dirListingPrefetch = 200 // number of directory items to os.Lstat() in advance - paralellelStatGoroutines = 4 // how many goroutines to use when Lstat() on large directory -) +const numEntriesToRead = 100 // number of directory entries to read in one shot type filesystemEntry struct { name string @@ -71,20 +64,6 @@ func (e *filesystemEntry) LocalFilesystemPath() string { return e.fullPath() } -var _ os.FileInfo = (*filesystemEntry)(nil) - -func newEntry(fi os.FileInfo, prefix string) filesystemEntry { - return filesystemEntry{ - TrimShallowSuffix(fi.Name()), - fi.Size(), - fi.ModTime().UnixNano(), - fi.Mode(), - platformSpecificOwnerInfo(fi), - platformSpecificDeviceInfo(fi), - prefix, - } -} - type filesystemDirectory struct { filesystemEntry } @@ -111,167 +90,6 @@ func (fsd *filesystemDirectory) Size() int64 { return 0 } -func (fsd *filesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) { - fullPath := fsd.fullPath() - - st, err := os.Lstat(filepath.Join(fullPath, name)) - if err != nil { - if os.IsNotExist(err) { - return nil, fs.ErrEntryNotFound - } - - return nil, errors.Wrap(err, "unable to get child") - } - - return entryFromDirEntry(st, fullPath+string(filepath.Separator)), nil -} - -type entryWithError struct { - entry fs.Entry - err error -} - -func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) { - fi, err := os.Lstat(prefix + dirEntry.Name()) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - - return nil, errors.Wrap(err, "error reading directory") - } - - return entryFromDirEntry(fi, prefix), nil -} - -func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - fullPath := fsd.fullPath() - - f, direrr := 
os.Open(fullPath) //nolint:gosec - if direrr != nil { - return errors.Wrap(direrr, "unable to read directory") - } - defer f.Close() //nolint:errcheck - - childPrefix := fullPath + string(filepath.Separator) - - batch, err := f.ReadDir(numEntriesToRead) - if len(batch) == numEntriesToRead { - return fsd.iterateEntriesInParallel(ctx, f, childPrefix, batch, cb) - } - - for len(batch) > 0 { - for _, de := range batch { - e, err2 := toDirEntryOrNil(de, childPrefix) - if err2 != nil { - return err2 - } - - if e == nil { - continue - } - - if err3 := cb(ctx, e); err3 != nil { - return err3 - } - } - - batch, err = f.ReadDir(numEntriesToRead) - } - - if errors.Is(err, io.EOF) { - return nil - } - - return errors.Wrap(err, "error listing directory") -} - -//nolint:gocognit,gocyclo -func (fsd *filesystemDirectory) iterateEntriesInParallel(ctx context.Context, f *os.File, childPrefix string, batch []os.DirEntry, cb func(context.Context, fs.Entry) error) error { - inputCh := make(chan os.DirEntry, dirListingPrefetch) - outputCh := make(chan entryWithError, dirListingPrefetch) - - closed := make(chan struct{}) - defer close(closed) - - var workersWG sync.WaitGroup - - // start goroutines that will convert 'os.DirEntry' to 'entryWithError' - for i := 0; i < paralellelStatGoroutines; i++ { - workersWG.Add(1) - - go func() { - defer workersWG.Done() - - for { - select { - case <-closed: - return - - case de := <-inputCh: - e, err := toDirEntryOrNil(de, childPrefix) - outputCh <- entryWithError{entry: e, err: err} - } - } - }() - } - - var pending int - - for len(batch) > 0 { - for _, de := range batch { - // before pushing fetch from outputCh and invoke callbacks for all entries in it - invokeCallbacks: - for { - select { - case dwe := <-outputCh: - pending-- - - if dwe.err != nil { - return dwe.err - } - - if dwe.entry != nil { - if err := cb(ctx, dwe.entry); err != nil { - return err - } - } - - default: - break invokeCallbacks - } - } - - inputCh <- de - pending++ - } - - 
nextBatch, err := f.ReadDir(numEntriesToRead) - if err != nil && !errors.Is(err, io.EOF) { - //nolint:wrapcheck - return err - } - - batch = nextBatch - } - - for i := 0; i < pending; i++ { - dwe := <-outputCh - - if dwe.err != nil { - return dwe.err - } - - if dwe.entry != nil { - if err := cb(ctx, dwe.entry); err != nil { - return err - } - } - } - - return nil -} - type fileWithMetadata struct { *os.File } @@ -315,23 +133,6 @@ func dirPrefix(s string) string { return "" } -// NewEntry returns fs.Entry for the specified path, the result will be one of supported entry types: fs.File, fs.Directory, fs.Symlink -// or fs.UnsupportedEntry. -func NewEntry(path string) (fs.Entry, error) { - path = filepath.Clean(path) - - fi, err := os.Lstat(path) - if err != nil { - return nil, errors.Wrap(err, "unable to determine entry type") - } - - if path == "/" { - return entryFromDirEntry(fi, ""), nil - } - - return entryFromDirEntry(fi, dirPrefix(path)), nil -} - // Directory returns fs.Directory for the specified path. 
func Directory(path string) (fs.Directory, error) { e, err := NewEntry(path) @@ -353,31 +154,6 @@ func Directory(path string) (fs.Directory, error) { } } -func entryFromDirEntry(fi os.FileInfo, prefix string) fs.Entry { - isplaceholder := strings.HasSuffix(fi.Name(), ShallowEntrySuffix) - maskedmode := fi.Mode() & os.ModeType - - switch { - case maskedmode == os.ModeDir && !isplaceholder: - return newFilesystemDirectory(newEntry(fi, prefix)) - - case maskedmode == os.ModeDir && isplaceholder: - return newShallowFilesystemDirectory(newEntry(fi, prefix)) - - case maskedmode == os.ModeSymlink && !isplaceholder: - return newFilesystemSymlink(newEntry(fi, prefix)) - - case maskedmode == 0 && !isplaceholder: - return newFilesystemFile(newEntry(fi, prefix)) - - case maskedmode == 0 && isplaceholder: - return newShallowFilesystemFile(newEntry(fi, prefix)) - - default: - return newFilesystemErrorEntry(newEntry(fi, prefix), fs.ErrUnknown) - } -} - var ( _ fs.Directory = (*filesystemDirectory)(nil) _ fs.File = (*filesystemFile)(nil) diff --git a/fs/localfs/local_fs_os.go b/fs/localfs/local_fs_os.go new file mode 100644 index 000000000..755db1db0 --- /dev/null +++ b/fs/localfs/local_fs_os.go @@ -0,0 +1,159 @@ +package localfs + +import ( + "context" + "io" + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/fs" +) + +type filesystemDirectoryIterator struct { + dirHandle *os.File + childPrefix string + + currentIndex int + currentBatch []os.DirEntry +} + +func (it *filesystemDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) { + for { + // we're at the end of the current batch, fetch the next batch + if it.currentIndex >= len(it.currentBatch) { + batch, err := it.dirHandle.ReadDir(numEntriesToRead) + if err != nil && !errors.Is(err, io.EOF) { + // stop iteration + return nil, err //nolint:wrapcheck + } + + it.currentIndex = 0 + it.currentBatch = batch + + // got empty batch + if len(batch) == 0 { + return nil, nil + } + } + + 
n := it.currentIndex + it.currentIndex++ + + e, err := toDirEntryOrNil(it.currentBatch[n], it.childPrefix) + if err != nil { + // stop iteration + return nil, err + } + + if e == nil { + // go to the next item + continue + } + + return e, nil + } +} + +func (it *filesystemDirectoryIterator) Close() { + it.dirHandle.Close() //nolint:errcheck +} + +func (fsd *filesystemDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { + fullPath := fsd.fullPath() + + f, direrr := os.Open(fullPath) //nolint:gosec + if direrr != nil { + return nil, errors.Wrap(direrr, "unable to read directory") + } + + childPrefix := fullPath + string(filepath.Separator) + + return &filesystemDirectoryIterator{dirHandle: f, childPrefix: childPrefix}, nil +} + +func (fsd *filesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) { + fullPath := fsd.fullPath() + + st, err := os.Lstat(filepath.Join(fullPath, name)) + if err != nil { + if os.IsNotExist(err) { + return nil, fs.ErrEntryNotFound + } + + return nil, errors.Wrap(err, "unable to get child") + } + + return entryFromDirEntry(st, fullPath+string(filepath.Separator)), nil +} + +func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) { + fi, err := os.Lstat(prefix + dirEntry.Name()) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + + return nil, errors.Wrap(err, "error reading directory") + } + + return entryFromDirEntry(fi, prefix), nil +} + +// NewEntry returns fs.Entry for the specified path, the result will be one of supported entry types: fs.File, fs.Directory, fs.Symlink +// or fs.UnsupportedEntry. 
+func NewEntry(path string) (fs.Entry, error) { + path = filepath.Clean(path) + + fi, err := os.Lstat(path) + if err != nil { + return nil, errors.Wrap(err, "unable to determine entry type") + } + + if path == "/" { + return entryFromDirEntry(fi, ""), nil + } + + return entryFromDirEntry(fi, dirPrefix(path)), nil +} + +func entryFromDirEntry(fi os.FileInfo, prefix string) fs.Entry { + isplaceholder := strings.HasSuffix(fi.Name(), ShallowEntrySuffix) + maskedmode := fi.Mode() & os.ModeType + + switch { + case maskedmode == os.ModeDir && !isplaceholder: + return newFilesystemDirectory(newEntry(fi, prefix)) + + case maskedmode == os.ModeDir && isplaceholder: + return newShallowFilesystemDirectory(newEntry(fi, prefix)) + + case maskedmode == os.ModeSymlink && !isplaceholder: + return newFilesystemSymlink(newEntry(fi, prefix)) + + case maskedmode == 0 && !isplaceholder: + return newFilesystemFile(newEntry(fi, prefix)) + + case maskedmode == 0 && isplaceholder: + return newShallowFilesystemFile(newEntry(fi, prefix)) + + default: + return newFilesystemErrorEntry(newEntry(fi, prefix), fs.ErrUnknown) + } +} + +var _ os.FileInfo = (*filesystemEntry)(nil) + +func newEntry(fi os.FileInfo, prefix string) filesystemEntry { + return filesystemEntry{ + TrimShallowSuffix(fi.Name()), + fi.Size(), + fi.ModTime().UnixNano(), + fi.Mode(), + platformSpecificOwnerInfo(fi), + platformSpecificDeviceInfo(fi), + prefix, + } +} diff --git a/fs/localfs/local_fs_test.go b/fs/localfs/local_fs_test.go index 91af9a80b..0c66eb71c 100644 --- a/fs/localfs/local_fs_test.go +++ b/fs/localfs/local_fs_test.go @@ -147,7 +147,7 @@ func TestIterateNonExistent(t *testing.T) { ctx := testlogging.Context(t) - require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { t.Fatal("this won't be invoked") return nil }), os.ErrNotExist) @@ -168,7 +168,7 @@ func testIterate(t *testing.T, nFiles 
int) { names := map[string]int64{} - require.NoError(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + require.NoError(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { names[e.Name()] = e.Size() return nil })) @@ -179,7 +179,7 @@ func testIterate(t *testing.T, nFiles int) { cnt := 0 - require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { cnt++ if cnt == nFiles/10 { @@ -191,7 +191,7 @@ func testIterate(t *testing.T, nFiles int) { cnt = 0 - require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + require.ErrorIs(t, fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { cnt++ if cnt == nFiles-1 { diff --git a/fs/localfs/localfs_benchmark_test.go b/fs/localfs/localfs_benchmark_test.go index 6d5d2ad11..d185b002c 100644 --- a/fs/localfs/localfs_benchmark_test.go +++ b/fs/localfs/localfs_benchmark_test.go @@ -57,7 +57,7 @@ func benchmarkReadDirWithCount(b *testing.B, fileCount int) { for i := 0; i < b.N; i++ { dir, _ := localfs.Directory(td) - dir.IterateEntries(ctx, func(context.Context, fs.Entry) error { + fs.IterateEntries(ctx, dir, func(context.Context, fs.Entry) error { return nil }) } diff --git a/fs/localfs/shallow_fs.go b/fs/localfs/shallow_fs.go index 2990719d2..eec648711 100644 --- a/fs/localfs/shallow_fs.go +++ b/fs/localfs/shallow_fs.go @@ -124,9 +124,8 @@ func (fsd *shallowFilesystemDirectory) Child(ctx context.Context, name string) ( return nil, errors.New("shallowFilesystemDirectory.Child not supported") } -//nolint:revive -func (fsd *shallowFilesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - return errors.New("shallowFilesystemDirectory.IterateEntries not supported") +func (fsd *shallowFilesystemDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { + 
return nil, errors.New("shallowFilesystemDirectory.IterateEntries not supported") } var ( diff --git a/fs/virtualfs/virtualfs.go b/fs/virtualfs/virtualfs.go index 3fa7592f8..6d8b0e7e7 100644 --- a/fs/virtualfs/virtualfs.go +++ b/fs/virtualfs/virtualfs.go @@ -78,14 +78,8 @@ func (sd *staticDirectory) Child(ctx context.Context, name string) (fs.Entry, er return fs.IterateEntriesAndFindChild(ctx, sd, name) } -func (sd *staticDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - for _, e := range append([]fs.Entry{}, sd.entries...) { - if err := cb(ctx, e); err != nil { - return err - } - } - - return nil +func (sd *staticDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { + return fs.StaticIterator(append([]fs.Entry{}, sd.entries...), nil), nil } func (sd *staticDirectory) SupportsMultipleIterations() bool { @@ -105,10 +99,11 @@ func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory { type streamingDirectory struct { virtualEntry - // Used to generate the next entry and execute the callback on it. 
+ + mu sync.Mutex + // +checklocks:mu - callback func(context.Context, func(context.Context, fs.Entry) error) error - mu sync.Mutex + iter fs.DirectoryIterator } var errChildNotSupported = errors.New("streamingDirectory.Child not supported") @@ -119,48 +114,36 @@ func (sd *streamingDirectory) Child(ctx context.Context, _ string) (fs.Entry, er var errIteratorAlreadyUsed = errors.New("cannot use streaming directory iterator more than once") // +checklocksignore: mu -func (sd *streamingDirectory) getIterator() (func(context.Context, func(context.Context, fs.Entry) error) error, error) { +func (sd *streamingDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { sd.mu.Lock() defer sd.mu.Unlock() - if sd.callback == nil { + if sd.iter == nil { return nil, errIteratorAlreadyUsed } - cb := sd.callback - sd.callback = nil + it := sd.iter + sd.iter = nil - return cb, nil -} - -func (sd *streamingDirectory) IterateEntries( - ctx context.Context, - callback func(context.Context, fs.Entry) error, -) error { - cb, err := sd.getIterator() - if err != nil { - return err - } - - return cb(ctx, callback) + return it, nil } func (sd *streamingDirectory) SupportsMultipleIterations() bool { return false } -// NewStreamingDirectory returns a directory that will call the given function -// when IterateEntries is executed. +// NewStreamingDirectory returns a directory that will invoke the provided iterator +// on Iterate(). 
func NewStreamingDirectory( name string, - callback func(context.Context, func(context.Context, fs.Entry) error) error, + iter fs.DirectoryIterator, ) fs.Directory { return &streamingDirectory{ virtualEntry: virtualEntry{ name: name, mode: defaultPermissions | os.ModeDir, }, - callback: callback, + iter: iter, } } diff --git a/fs/virtualfs/virtualfs_test.go b/fs/virtualfs/virtualfs_test.go index 5ed2100b2..6497edc71 100644 --- a/fs/virtualfs/virtualfs_test.go +++ b/fs/virtualfs/virtualfs_test.go @@ -137,12 +137,7 @@ func TestStreamingDirectory(t *testing.T) { rootDir := NewStreamingDirectory( "root", - func( - ctx context.Context, - callback func(context.Context, fs.Entry) error, - ) error { - return callback(ctx, f) - }, + fs.StaticIterator([]fs.Entry{f}, nil), ) entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir) @@ -174,12 +169,7 @@ func TestStreamingDirectory_MultipleIterationsFails(t *testing.T) { rootDir := NewStreamingDirectory( "root", - func( - ctx context.Context, - callback func(context.Context, fs.Entry) error, - ) error { - return callback(ctx, f) - }, + fs.StaticIterator([]fs.Entry{f}, nil), ) entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir) @@ -202,35 +192,11 @@ func TestStreamingDirectory_ReturnsCallbackError(t *testing.T) { rootDir := NewStreamingDirectory( "root", - func( - ctx context.Context, - callback func(context.Context, fs.Entry) error, - ) error { - return callback(ctx, f) - }, + fs.StaticIterator([]fs.Entry{f}, nil), ) - err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error { + err := fs.IterateEntries(testlogging.Context(t), rootDir, func(context.Context, fs.Entry) error { return errCallback }) assert.ErrorIs(t, err, errCallback) } - -var errIteration = errors.New("iteration error") - -func TestStreamingDirectory_ReturnsReadDirError(t *testing.T) { - rootDir := NewStreamingDirectory( - "root", - func( - ctx context.Context, - callback func(context.Context, fs.Entry) 
error, - ) error { - return errIteration - }, - ) - - err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error { - return nil - }) - assert.ErrorIs(t, err, errIteration) -} diff --git a/internal/diff/diff_test.go b/internal/diff/diff_test.go index 30052cfeb..6c6df4931 100644 --- a/internal/diff/diff_test.go +++ b/internal/diff/diff_test.go @@ -45,16 +45,10 @@ type testDirectory struct { modtime time.Time } -func (d *testDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - for _, file := range d.files { - err := cb(ctx, file) - if err != nil { - return err - } - } - - return nil +func (d *testDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { + return fs.StaticIterator(d.files, nil), nil } + func (d *testDirectory) SupportsMultipleIterations() bool { return false } func (d *testDirectory) IsDir() bool { return true } func (d *testDirectory) LocalFilesystemPath() string { return d.name } diff --git a/internal/fusemount/fusefs.go b/internal/fusemount/fusefs.go index bbe117478..232d5fc69 100644 --- a/internal/fusemount/fusefs.go +++ b/internal/fusemount/fusefs.go @@ -167,13 +167,24 @@ func (dir *fuseDirectoryNode) Readdir(ctx context.Context) (gofusefs.DirStream, // TODO: Slice not required as DirStream is also an iterator. 
result := []fuse.DirEntry{} - err := dir.directory().IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error { + iter, err := dir.directory().Iterate(ctx) + if err != nil { + log(ctx).Errorf("error reading directory %v: %v", dir.entry.Name(), err) + return nil, syscall.EIO + } + + defer iter.Close() + + cur, err := iter.Next(ctx) + for cur != nil { result = append(result, fuse.DirEntry{ - Name: e.Name(), - Mode: entryToFuseMode(e), + Name: cur.Name(), + Mode: entryToFuseMode(cur), }) - return nil - }) + + cur, err = iter.Next(ctx) + } + if err != nil { log(ctx).Errorf("error reading directory %v: %v", dir.entry.Name(), err) return nil, syscall.EIO diff --git a/internal/mockfs/mockfs.go b/internal/mockfs/mockfs.go index 1d660bda9..7c55a7e83 100644 --- a/internal/mockfs/mockfs.go +++ b/internal/mockfs/mockfs.go @@ -303,23 +303,17 @@ func (imd *Directory) Child(ctx context.Context, name string) (fs.Entry, error) return nil, fs.ErrEntryNotFound } -// IterateEntries calls the given callback on each entry in the directory. -func (imd *Directory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { +// Iterate returns directory iterator. +func (imd *Directory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { if imd.readdirError != nil { - return imd.readdirError + return nil, imd.readdirError } if imd.onReaddir != nil { imd.onReaddir() } - for _, e := range append([]fs.Entry{}, imd.children...) { - if err := cb(ctx, e); err != nil { - return err - } - } - - return nil + return fs.StaticIterator(append([]fs.Entry{}, imd.children...), nil), nil } // File is an in-memory fs.File capable of simulating failures. 
diff --git a/internal/webdavmount/webdavmount.go b/internal/webdavmount/webdavmount.go index 092262b1a..57fa0e63c 100644 --- a/internal/webdavmount/webdavmount.go +++ b/internal/webdavmount/webdavmount.go @@ -102,38 +102,40 @@ type webdavDir struct { // webdavDir implements webdav.File but needs context ctx context.Context //nolint:containedctx - w *webdavFS - entry fs.Directory + w *webdavFS + info os.FileInfo + iter fs.DirectoryIterator } //nolint:gochecknoglobals var symlinksAreUnsupportedLogged = new(int32) -// TODO: (bug) This incorrectly truncates the entries in the directory and does not allow pagination. func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) { + ctx := d.ctx + var fis []os.FileInfo foundEntries := 0 - err := d.entry.IterateEntries(d.ctx, func(innerCtx context.Context, e fs.Entry) error { - if n > 0 && n <= foundEntries { - return nil + e, err := d.iter.Next(ctx) + for e != nil { + if n > 0 && foundEntries >= n { + break } foundEntries++ if _, isSymlink := e.(fs.Symlink); isSymlink { if atomic.AddInt32(symlinksAreUnsupportedLogged, 1) == 1 { - //nolint:contextcheck log(d.ctx).Errorf("Mounting directories containing symbolic links using WebDAV is not supported. 
The link entries will be skipped.") } - - return nil + } else { + fis = append(fis, &webdavFileInfo{e}) } - fis = append(fis, &webdavFileInfo{e}) - return nil - }) + e, err = d.iter.Next(ctx) + } + if err != nil { return nil, errors.Wrap(err, "error reading directory") } @@ -142,7 +144,7 @@ func (d *webdavDir) Readdir(n int) ([]os.FileInfo, error) { } func (d *webdavDir) Stat() (os.FileInfo, error) { - return webdavFileInfo{d.entry}, nil + return d.info, nil } func (d *webdavDir) Write(_ []byte) (int, error) { @@ -150,6 +152,7 @@ func (d *webdavDir) Write(_ []byte) (int, error) { } func (d *webdavDir) Close() error { + d.iter.Close() return nil } @@ -190,7 +193,12 @@ func (w *webdavFS) OpenFile(ctx context.Context, path string, _ int, _ os.FileMo switch f := f.(type) { case fs.Directory: - return &webdavDir{ctx, w, f}, nil + iter, err := f.Iterate(ctx) + if err != nil { + return nil, err //nolint:wrapcheck + } + + return &webdavDir{ctx, w, webdavFileInfo{f}, iter}, nil case fs.File: return &webdavFile{ctx: ctx, entry: f}, nil } diff --git a/snapshot/snapshotfs/all_sources.go b/snapshot/snapshotfs/all_sources.go index 1e1dd2765..9dab0d97e 100644 --- a/snapshot/snapshotfs/all_sources.go +++ b/snapshot/snapshotfs/all_sources.go @@ -65,10 +65,10 @@ func (s *repositoryAllSources) Child(ctx context.Context, name string) (fs.Entry return fs.IterateEntriesAndFindChild(ctx, s, name) } -func (s *repositoryAllSources) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { +func (s *repositoryAllSources) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { srcs, err := snapshot.ListSources(ctx, s.rep) if err != nil { - return errors.Wrap(err, "error listing sources") + return nil, errors.Wrap(err, "error listing sources") } users := map[string]bool{} @@ -85,19 +85,17 @@ func (s *repositoryAllSources) IterateEntries(ctx context.Context, cb func(conte name2safe = disambiguateSafeNames(name2safe) + var entries []fs.Entry + for u := range 
users { - e := &sourceDirectories{ + entries = append(entries, &sourceDirectories{ rep: s.rep, userHost: u, name: name2safe[u], - } - - if err2 := cb(ctx, e); err2 != nil { - return err2 - } + }) } - return nil + return fs.StaticIterator(entries, nil), nil } // AllSourcesEntry returns fs.Directory that contains the list of all snapshot sources found in the repository. diff --git a/snapshot/snapshotfs/estimate.go b/snapshot/snapshotfs/estimate.go index cd335c453..931e0afad 100644 --- a/snapshot/snapshotfs/estimate.go +++ b/snapshot/snapshotfs/estimate.go @@ -6,8 +6,6 @@ "path/filepath" "sync/atomic" - "github.com/pkg/errors" - "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/ignorefs" "github.com/kopia/kopia/internal/units" @@ -107,10 +105,6 @@ func Estimate(ctx context.Context, entry fs.Directory, policyTree *policy.Tree, } func estimate(ctx context.Context, relativePath string, entry fs.Entry, policyTree *policy.Tree, stats *snapshot.Stats, ib, eb SampleBuckets, ed *[]string, progress EstimateProgress, maxExamplesPerBucket int) error { - type processEntryError struct { - error - } - // see if the context got canceled select { case <-ctx.Done(): @@ -130,22 +124,26 @@ type processEntryError struct { progress.Processing(ctx, relativePath) - err := entry.IterateEntries(ctx, func(c context.Context, child fs.Entry) error { - defer child.Close() + iter, err := entry.Iterate(ctx) + if err == nil { + defer iter.Close() - if err2 := estimate(ctx, filepath.Join(relativePath, child.Name()), child, policyTree.Child(child.Name()), stats, ib, eb, ed, progress, maxExamplesPerBucket); err2 != nil { - return processEntryError{err2} + var child fs.Entry + + child, err = iter.Next(ctx) + for child != nil { + if err = estimate(ctx, filepath.Join(relativePath, child.Name()), child, policyTree.Child(child.Name()), stats, ib, eb, ed, progress, maxExamplesPerBucket); err != nil { + break + } + + child.Close() + child, err = iter.Next(ctx) } + } - return nil - }) + 
progress.Stats(ctx, stats, ib, eb, *ed, false) - var funcErr processEntryError if err != nil { - if errors.As(err, &funcErr) { - return funcErr.error - } - isIgnored := policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false) if isIgnored { @@ -155,9 +153,10 @@ type processEntryError struct { } progress.Error(ctx, relativePath, err, isIgnored) - } - progress.Stats(ctx, stats, ib, eb, *ed, false) + //nolint:wrapcheck + return err + } case fs.File: ib.add(relativePath, entry.Size(), maxExamplesPerBucket) diff --git a/snapshot/snapshotfs/estimate_test.go b/snapshot/snapshotfs/estimate_test.go index 358f3224c..4295b2be2 100644 --- a/snapshot/snapshotfs/estimate_test.go +++ b/snapshot/snapshotfs/estimate_test.go @@ -50,9 +50,7 @@ func TestEstimate_SkipsStreamingDirectory(t *testing.T) { rootDir := virtualfs.NewStaticDirectory("root", []fs.Entry{ virtualfs.NewStreamingDirectory( "a-dir", - func(ctx context.Context, callback func(context.Context, fs.Entry) error) error { - return callback(ctx, f) - }, + fs.StaticIterator([]fs.Entry{f}, nil), ), }) diff --git a/snapshot/snapshotfs/repofs.go b/snapshot/snapshotfs/repofs.go index e271efe1d..4f2f1d307 100644 --- a/snapshot/snapshotfs/repofs.go +++ b/snapshot/snapshotfs/repofs.go @@ -133,18 +133,18 @@ func (rd *repositoryDirectory) Child(ctx context.Context, name string) (fs.Entry return EntryFromDirEntry(rd.repo, de), nil } -func (rd *repositoryDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { +func (rd *repositoryDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { if err := rd.ensureDirEntriesLoaded(ctx); err != nil { - return err + return nil, err } + var entries []fs.Entry + for _, de := range rd.dirEntries { - if err := cb(ctx, EntryFromDirEntry(rd.repo, de)); err != nil { - return err - } + entries = append(entries, EntryFromDirEntry(rd.repo, de)) } - return nil + return fs.StaticIterator(entries, nil), nil } func (rd 
*repositoryDirectory) ensureDirEntriesLoaded(ctx context.Context) error { @@ -298,10 +298,13 @@ func SnapshotRoot(rep repo.Repository, man *snapshot.Manifest) (fs.Entry, error) func AutoDetectEntryFromObjectID(ctx context.Context, rep repo.Repository, oid object.ID, maybeName string) fs.Entry { if IsDirectoryID(oid) { dirEntry := DirectoryEntry(rep, oid, nil) - if err := dirEntry.IterateEntries(ctx, func(context.Context, fs.Entry) error { - return nil - }); err == nil { + + iter, err := dirEntry.Iterate(ctx) + if err == nil { + iter.Close() + repoFSLog(ctx).Debugf("%v auto-detected as directory", oid) + return dirEntry } } diff --git a/snapshot/snapshotfs/snapshot_tree_walker.go b/snapshot/snapshotfs/snapshot_tree_walker.go index 2ed301604..fd945a7f6 100644 --- a/snapshot/snapshotfs/snapshot_tree_walker.go +++ b/snapshot/snapshotfs/snapshot_tree_walker.go @@ -107,37 +107,42 @@ func (w *TreeWalker) processEntry(ctx context.Context, e fs.Entry, entryPath str } func (w *TreeWalker) processDirEntry(ctx context.Context, dir fs.Directory, entryPath string) { - type errStop struct { - error - } - var ag workshare.AsyncGroup[any] defer ag.Close() - err := dir.IterateEntries(ctx, func(c context.Context, ent fs.Entry) error { + iter, err := dir.Iterate(ctx) + if err != nil { + w.ReportError(ctx, entryPath, errors.Wrap(err, "error reading directory")) + + return + } + + defer iter.Close() + + ent, err := iter.Next(ctx) + for ent != nil { + ent2 := ent + if w.TooManyErrors() { - return errStop{errors.New("")} + break } - if w.alreadyProcessed(ctx, ent) { - return nil + if !w.alreadyProcessed(ctx, ent2) { + childPath := path.Join(entryPath, ent2.Name()) + + if ag.CanShareWork(w.wp) { + ag.RunAsync(w.wp, func(c *workshare.Pool[any], request any) { + w.processEntry(ctx, ent2, childPath) + }, nil) + } else { + w.processEntry(ctx, ent2, childPath) + } } - childPath := path.Join(entryPath, ent.Name()) + ent, err = iter.Next(ctx) + } - if ag.CanShareWork(w.wp) { - ag.RunAsync(w.wp, 
func(c *workshare.Pool[any], request any) { - w.processEntry(ctx, ent, childPath) - }, nil) - } else { - w.processEntry(ctx, ent, childPath) - } - - return nil - }) - - var stopped errStop - if err != nil && !errors.As(err, &stopped) { + if err != nil { w.ReportError(ctx, entryPath, errors.Wrap(err, "error reading directory")) } } diff --git a/snapshot/snapshotfs/source_directories.go b/snapshot/snapshotfs/source_directories.go index fdb0a6ea4..605dcf24c 100644 --- a/snapshot/snapshotfs/source_directories.go +++ b/snapshot/snapshotfs/source_directories.go @@ -69,10 +69,10 @@ func (s *sourceDirectories) Child(ctx context.Context, name string) (fs.Entry, e return fs.IterateEntriesAndFindChild(ctx, s, name) } -func (s *sourceDirectories) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { +func (s *sourceDirectories) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { sources0, err := snapshot.ListSources(ctx, s.rep) if err != nil { - return errors.Wrap(err, "unable to list sources") + return nil, errors.Wrap(err, "unable to list sources") } // step 1 - filter sources. @@ -95,15 +95,13 @@ func (s *sourceDirectories) IterateEntries(ctx context.Context, cb func(context. 
name2safe = disambiguateSafeNames(name2safe) - for _, src := range sources { - e := &sourceSnapshots{s.rep, src, name2safe[src.Path]} + var entries []fs.Entry - if err2 := cb(ctx, e); err2 != nil { - return err2 - } + for _, src := range sources { + entries = append(entries, &sourceSnapshots{s.rep, src, name2safe[src.Path]}) } - return nil + return fs.StaticIterator(entries, nil), nil } func disambiguateSafeNames(m map[string]string) map[string]string { diff --git a/snapshot/snapshotfs/source_directories_test.go b/snapshot/snapshotfs/source_directories_test.go index 3d17c6fd6..70410fc85 100644 --- a/snapshot/snapshotfs/source_directories_test.go +++ b/snapshot/snapshotfs/source_directories_test.go @@ -83,7 +83,7 @@ func iterateAllNames(ctx context.Context, t *testing.T, dir fs.Directory, prefix result := map[string]struct{}{} - err := dir.IterateEntries(ctx, func(innerCtx context.Context, ent fs.Entry) error { + err := fs.IterateEntries(ctx, dir, func(innerCtx context.Context, ent fs.Entry) error { if ent.IsDir() { result[prefix+ent.Name()+"/"] = struct{}{} childEntries := iterateAllNames(ctx, t, ent.(fs.Directory), prefix+ent.Name()+"/") diff --git a/snapshot/snapshotfs/source_snapshots.go b/snapshot/snapshotfs/source_snapshots.go index 478903f86..3e83de49a 100644 --- a/snapshot/snapshotfs/source_snapshots.go +++ b/snapshot/snapshotfs/source_snapshots.go @@ -67,12 +67,14 @@ func (s *sourceSnapshots) Child(ctx context.Context, name string) (fs.Entry, err return fs.IterateEntriesAndFindChild(ctx, s, name) } -func (s *sourceSnapshots) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { +func (s *sourceSnapshots) Iterate(ctx context.Context) (fs.DirectoryIterator, error) { manifests, err := snapshot.ListSnapshots(ctx, s.rep, s.src) if err != nil { - return errors.Wrap(err, "unable to list snapshots") + return nil, errors.Wrap(err, "unable to list snapshots") } + var entries []fs.Entry + for _, m := range manifests { name := 
m.StartTime.Format("20060102-150405") if m.IncompleteReason != "" { @@ -91,14 +93,10 @@ func (s *sourceSnapshots) IterateEntries(ctx context.Context, cb func(context.Co de.DirSummary = m.RootEntry.DirSummary } - e := EntryFromDirEntry(s.rep, de) - - if err2 := cb(ctx, e); err2 != nil { - return err2 - } + entries = append(entries, EntryFromDirEntry(s.rep, de)) } - return nil + return fs.StaticIterator(entries, nil), nil } var _ fs.Directory = (*sourceSnapshots)(nil) diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 910f7eb5e..92a4280b6 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -768,45 +768,42 @@ func (u *Uploader) processDirectoryEntries( prevDirs []fs.Directory, wg *workshare.AsyncGroup[*uploadWorkItem], ) error { - // processEntryError distinguishes an error thrown when attempting to read a directory. - type processEntryError struct { - error + iter, err := dir.Iterate(ctx) + if err != nil { + return dirReadError{err} } - err := dir.IterateEntries(ctx, func(ctx context.Context, entry fs.Entry) error { + defer iter.Close() + + entry, err := iter.Next(ctx) + + for entry != nil { + entry2 := entry + if u.IsCanceled() { return errCanceled } - entryRelativePath := path.Join(dirRelativePath, entry.Name()) + entryRelativePath := path.Join(dirRelativePath, entry2.Name()) if wg.CanShareWork(u.workerPool) { wg.RunAsync(u.workerPool, func(c *workshare.Pool[*uploadWorkItem], wi *uploadWorkItem) { - wi.err = u.processSingle(ctx, entry, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry) + wi.err = u.processSingle(ctx, entry2, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry) }, &uploadWorkItem{}) } else { - if err := u.processSingle(ctx, entry, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry); err != nil { - return processEntryError{err} + if 
err2 := u.processSingle(ctx, entry2, entryRelativePath, parentDirBuilder, policyTree, prevDirs, localDirPathOrEmpty, parentCheckpointRegistry); err2 != nil { + return err2 } } - return nil - }) - - if err == nil { - return nil + entry, err = iter.Next(ctx) } - var peError processEntryError - if errors.As(err, &peError) { - return peError.error + if err != nil { + return dirReadError{err} } - if errors.Is(err, errCanceled) { - return errCanceled - } - - return dirReadError{err} + return nil } //nolint:funlen diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 01a262b4f..8e86e53af 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -768,9 +768,7 @@ func TestUploadScanStopsOnContextCancel(t *testing.T) { }) result, err := u.scanDirectory(scanctx, th.sourceDir, nil) - if !errors.Is(err, scanctx.Err()) { - t.Fatalf("invalid scan error: %v", err) - } + require.ErrorIs(t, err, scanctx.Err()) if result.numFiles == 0 && result.totalFileSize == 0 { t.Fatalf("should have returned partial results, got zeros") @@ -801,21 +799,11 @@ func TestUploadScanIgnoresFiles(t *testing.T) { result2, err := u.scanDirectory(ctx, th.sourceDir, policyTree) require.NoError(t, err) - if result1.numFiles == 0 { - t.Fatalf("no files scanned") - } + require.NotEqual(t, result1.numFiles, 0) + require.NotEqual(t, result2.numFiles, 0) - if result2.numFiles == 0 { - t.Fatalf("no files scanned") - } - - if got, want := result2.numFiles, result1.numFiles; got >= want { - t.Fatalf("expected lower number of files %v, wanted %v", got, want) - } - - if got, want := result2.totalFileSize, result1.totalFileSize; got >= want { - t.Fatalf("expected lower file size %v, wanted %v", got, want) - } + require.Less(t, result2.numFiles, result1.numFiles) + require.Less(t, result2.totalFileSize, result1.totalFileSize) } func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) { @@ -1002,15 +990,7 @@ func TestUpload_StreamingDirectory(t 
*testing.T) { staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{ virtualfs.NewStreamingDirectory( "stream-directory", - func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error { - for _, f := range files { - if err := callback(innerCtx, f); err != nil { - return err - } - } - - return nil - }, + fs.StaticIterator(files, nil), ), }) @@ -1049,15 +1029,7 @@ func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) { staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{ virtualfs.NewStreamingDirectory( "stream-directory", - func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error { - for _, f := range files { - if err := callback(innerCtx, f); err != nil { - return err - } - } - - return nil - }, + fs.StaticIterator(files, nil), ), }) @@ -1225,7 +1197,7 @@ func TestParallelUploadOfLargeFiles(t *testing.T) { successCount := 0 - dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { if f, ok := e.(fs.File); ok { oid, err := object.ParseID(strings.TrimPrefix(f.(object.HasObjectID).ObjectID().String(), "I")) require.NoError(t, err)