diff --git a/.golangci.yml b/.golangci.yml index 1191c12cd..2aac01796 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -134,3 +134,6 @@ issues: - text: "unwrapped: sig: func github.com/kopia/kopia/fs.ReaddirToIterate" linters: - wrapcheck + - text: "unwrapped: sig: func github.com/kopia/kopia/fs.IterateEntriesToReaddir" + linters: + - wrapcheck diff --git a/fs/entry.go b/fs/entry.go index f228f01e4..f48a1a9e2 100644 --- a/fs/entry.go +++ b/fs/entry.go @@ -95,6 +95,20 @@ func ReaddirToIterate(ctx context.Context, d Directory, cb func(context.Context, return nil } +// IterateEntriesToReaddir is an adapter for a naive IterateEntries -> Readdir implementation. +func IterateEntriesToReaddir(ctx context.Context, d Directory) (Entries, error) { + var entries Entries + + err := d.IterateEntries(ctx, func(ctx context.Context, e Entry) error { + entries = append(entries, e) + return nil + }) + + entries.Sort() + + return entries, err // nolint:wrapcheck +} + // ReadDirAndFindChild reads all entries from a directory and returns one by name. // This is a convenience function that may be helpful in implementations of Directory.Child(). func ReadDirAndFindChild(ctx context.Context, d Directory, name string) (Entry, error) { diff --git a/fs/localfs/local_fs.go b/fs/localfs/local_fs.go index 79da72e74..fe25d123c 100644 --- a/fs/localfs/local_fs.go +++ b/fs/localfs/local_fs.go @@ -15,7 +15,6 @@ ) const ( - numEntriesToReadFirst = 100 // number of directory entries to read in the first batch before parallelism kicks in. numEntriesToRead = 100 // number of directory entries to read in one shot dirListingPrefetch = 200 // number of directory items to os.Lstat() in advance paralellelStatGoroutines = 4 // how many goroutines to use when Lstat() on large directory @@ -141,155 +140,136 @@ func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) { return entryFromDirEntry(fi, prefix), nil } -func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - return fs.ReaddirToIterate(ctx, fsd, cb) +func (fsd *filesystemDirectory) Readdir(ctx context.Context) (fs.Entries, error) { + return fs.IterateEntriesToReaddir(ctx, fsd) } -func (fsd *filesystemDirectory) Readdir(ctx context.Context) (fs.Entries, error) { +func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error { fullPath := fsd.fullPath() f, direrr := os.Open(fullPath) //nolint:gosec if direrr != nil { - return nil, errors.Wrap(direrr, "unable to read directory") + return errors.Wrap(direrr, "unable to read directory") } defer f.Close() //nolint:errcheck,gosec - var entries fs.Entries - - // read first batch of directory entries using Readdir() before parallelization. - firstBatch, firstBatchErr := f.ReadDir(numEntriesToReadFirst) - if firstBatchErr != nil && !errors.Is(firstBatchErr, io.EOF) { - return nil, errors.Wrap(firstBatchErr, "unable to read directory entries") - } - childPrefix := fullPath + string(filepath.Separator) - for _, de := range firstBatch { - e, err := toDirEntryOrNil(de, childPrefix) - if err != nil { - return nil, errors.Wrap(err, "error reading entry") - } - - if e != nil { - entries = append(entries, e) - } + batch, err := f.ReadDir(numEntriesToRead) + if len(batch) == numEntriesToRead { + return fsd.iterateEntriesInParallel(ctx, f, childPrefix, batch, cb) } - // first batch was complete with EOF, we're done here. - if errors.Is(firstBatchErr, io.EOF) { - entries.Sort() - - return entries, nil - } - - // first batch was shorter than expected, perform another read to make sure we get EOF. - if len(firstBatch) < numEntriesToRead { - secondBatch, secondBatchErr := f.ReadDir(numEntriesToRead) - if secondBatchErr != nil && !errors.Is(secondBatchErr, io.EOF) { - return nil, errors.Wrap(secondBatchErr, "unable to read directory entries") - } - - // process results in case it's not EOF. - for _, de := range secondBatch { - e, err := toDirEntryOrNil(de, childPrefix) - if err != nil { - return nil, errors.Wrap(err, "error reading entry") + for len(batch) > 0 { + for _, de := range batch { + e, err2 := toDirEntryOrNil(de, childPrefix) + if err2 != nil { + return err2 } - if e != nil { - entries = append(entries, e) - } - } - - // if we got EOF at this point, return. - if errors.Is(secondBatchErr, io.EOF) { - entries.Sort() - - return entries, nil - } - } - - return fsd.readRemainingDirEntriesInParallel(childPrefix, entries, f) -} - -func (fsd *filesystemDirectory) readRemainingDirEntriesInParallel(childPrefix string, entries fs.Entries, f *os.File) (fs.Entries, error) { - // start feeding directory entries to dirEntryCh - dirEntryCh := make(chan os.DirEntry, dirListingPrefetch) - - var readDirErr error - - go func() { - defer close(dirEntryCh) - - for { - des, err := f.ReadDir(numEntriesToRead) - for _, de := range des { - dirEntryCh <- de - } - - if err == nil { + if e == nil { continue } - if errors.Is(err, io.EOF) { - break + if err3 := cb(ctx, e); err3 != nil { + return err3 } - - readDirErr = err - - break } - }() - entriesCh := make(chan entryWithError, dirListingPrefetch) + batch, err = f.ReadDir(numEntriesToRead) + } + + if errors.Is(err, io.EOF) { + return nil + } + + return errors.Wrap(err, "error listing directory") +} + +// nolint:gocognit,gocyclo +func (fsd *filesystemDirectory) iterateEntriesInParallel(ctx context.Context, f *os.File, childPrefix string, batch []os.DirEntry, cb func(context.Context, fs.Entry) error) error { + inputCh := make(chan os.DirEntry, dirListingPrefetch) + outputCh := make(chan entryWithError, dirListingPrefetch) + + closed := make(chan struct{}) + defer close(closed) var workersWG sync.WaitGroup + // start goroutines that will convert 'os.DirEntry' to 'entryWithError' for i := 0; i < paralellelStatGoroutines; i++ { workersWG.Add(1) go func() { defer workersWG.Done() - for de := range dirEntryCh { - e, err := toDirEntryOrNil(de, childPrefix) - if err != nil { - entriesCh <- entryWithError{err: errors.Errorf("unable to stat directory entry %q: %v", de, err)} - continue - } + for { + select { + case <-closed: + return - if e != nil { - entriesCh <- entryWithError{entry: e} + case de := <-inputCh: + e, err := toDirEntryOrNil(de, childPrefix) + outputCh <- entryWithError{entry: e, err: err} } } }() } - // close entriesCh channel when all goroutines terminate - go func() { - workersWG.Wait() - close(entriesCh) - }() + var pending int - // drain the entriesCh into a slice and sort it + for len(batch) > 0 { + for _, de := range batch { + // before pushing fetch from outputCh and invoke callbacks for all entries in it + invokeCallbacks: + for { + select { + case dwe := <-outputCh: + pending-- - for e := range entriesCh { - if e.err != nil { - // only return the first error - if readDirErr == nil { - readDirErr = e.err + if dwe.err != nil { + return dwe.err + } + + if dwe.entry != nil { + if err := cb(ctx, dwe.entry); err != nil { + return err + } + } + + default: + break invokeCallbacks + } } - continue + inputCh <- de + pending++ } - entries = append(entries, e.entry) + nextBatch, err := f.ReadDir(numEntriesToRead) + if err != nil && !errors.Is(err, io.EOF) { + // nolint:wrapcheck + return err + } + + batch = nextBatch } - entries.Sort() + for i := 0; i < pending; i++ { + dwe := <-outputCh - // return any error encountered when listing or reading the directory - return entries, readDirErr + if dwe.err != nil { + return dwe.err + } + + if dwe.entry != nil { + if err := cb(ctx, dwe.entry); err != nil { + return err + } + } + } + + return nil } type fileWithMetadata struct { diff --git a/fs/localfs/local_fs_test.go b/fs/localfs/local_fs_test.go index 521752b15..c201c77d1 100644 --- a/fs/localfs/local_fs_test.go +++ b/fs/localfs/local_fs_test.go @@ -1,6 +1,7 @@ package localfs import ( + "context" "fmt" "os" "path/filepath" @@ -98,6 +99,78 @@ func TestFiles(t *testing.T) { verifyChild(t, dir) } +func TestIterate1000(t *testing.T) { + testIterate(t, 1000) +} + +func TestIterate10(t *testing.T) { + testIterate(t, 10) +} + +func TestIterateNonExistent(t *testing.T) { + tmp := testutil.TempDirectory(t) + + dir, err := Directory(tmp) + require.NoError(t, err) + os.Remove(tmp) + + ctx := testlogging.Context(t) + + require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + t.Fatal("this won't be invoked") + return nil + }), os.ErrNotExist) +} + +// nolint:thelper +func testIterate(t *testing.T, nFiles int) { + tmp := testutil.TempDirectory(t) + + for i := 0; i < nFiles; i++ { + assertNoError(t, os.WriteFile(filepath.Join(tmp, fmt.Sprintf("f%v", i)), []byte{1, 2, 3}, 0o777)) + } + + dir, err := Directory(tmp) + require.NoError(t, err) + + ctx := testlogging.Context(t) + + names := map[string]int64{} + + require.NoError(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + names[e.Name()] = e.Size() + return nil + })) + + require.Len(t, names, nFiles) + + errTest := errors.New("test error") + + cnt := 0 + + require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + cnt++ + + if cnt == nFiles/10 { + return errTest + } + + return nil + }), errTest) + + cnt = 0 + + require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error { + cnt++ + + if cnt == nFiles-1 { + return errTest + } + + return nil + }), errTest) +} + func verifyChild(t *testing.T, dir fs.Directory) { t.Helper()