feat(snapshots): implement iteration for local filesystem (#1967)

When combined with #1963, it significantly reduces memory usage.

When backing up a Kopia enlistment containing various binaries (2.8 GB,
files: 74180, dirs: 12322):

Before: max memory 440MB, time 5.8s
After:  max memory 360MB, time 5.4s
This commit is contained in:
Jarek Kowalski
2022-05-26 14:32:16 -07:00
committed by GitHub
parent 6757489a00
commit bb9c2bf250
4 changed files with 175 additions and 105 deletions

View File

@@ -134,3 +134,6 @@ issues:
- text: "unwrapped: sig: func github.com/kopia/kopia/fs.ReaddirToIterate"
linters:
- wrapcheck
- text: "unwrapped: sig: func github.com/kopia/kopia/fs.IterateEntriesToReaddir"
linters:
- wrapcheck

View File

@@ -95,6 +95,20 @@ func ReaddirToIterate(ctx context.Context, d Directory, cb func(context.Context,
return nil
}
// IterateEntriesToReaddir is an adapter for a naive IterateEntries -> Readdir implementation.
// It accumulates all entries produced by d.IterateEntries into a sorted Entries slice and
// returns the slice together with whatever error the iteration reported.
func IterateEntriesToReaddir(ctx context.Context, d Directory) (Entries, error) {
	var collected Entries

	appendEntry := func(innerCtx context.Context, ent Entry) error {
		collected = append(collected, ent)
		return nil
	}

	iterErr := d.IterateEntries(ctx, appendEntry)

	// Sort unconditionally so callers get deterministic ordering even when
	// the iteration stopped early with an error.
	collected.Sort()

	return collected, iterErr // nolint:wrapcheck
}
// ReadDirAndFindChild reads all entries from a directory and returns one by name.
// This is a convenience function that may be helpful in implementations of Directory.Child().
func ReadDirAndFindChild(ctx context.Context, d Directory, name string) (Entry, error) {

View File

@@ -15,7 +15,6 @@
)
const (
numEntriesToReadFirst = 100 // number of directory entries to read in the first batch before parallelism kicks in.
numEntriesToRead = 100 // number of directory entries to read in one shot
dirListingPrefetch = 200 // number of directory items to os.Lstat() in advance
paralellelStatGoroutines = 4 // how many goroutines to use when Lstat() on large directory
@@ -141,155 +140,136 @@ func toDirEntryOrNil(dirEntry os.DirEntry, prefix string) (fs.Entry, error) {
return entryFromDirEntry(fi, prefix), nil
}
func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
return fs.ReaddirToIterate(ctx, fsd, cb)
func (fsd *filesystemDirectory) Readdir(ctx context.Context) (fs.Entries, error) {
return fs.IterateEntriesToReaddir(ctx, fsd)
}
func (fsd *filesystemDirectory) Readdir(ctx context.Context) (fs.Entries, error) {
func (fsd *filesystemDirectory) IterateEntries(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
fullPath := fsd.fullPath()
f, direrr := os.Open(fullPath) //nolint:gosec
if direrr != nil {
return nil, errors.Wrap(direrr, "unable to read directory")
return errors.Wrap(direrr, "unable to read directory")
}
defer f.Close() //nolint:errcheck,gosec
var entries fs.Entries
// read first batch of directory entries using Readdir() before parallelization.
firstBatch, firstBatchErr := f.ReadDir(numEntriesToReadFirst)
if firstBatchErr != nil && !errors.Is(firstBatchErr, io.EOF) {
return nil, errors.Wrap(firstBatchErr, "unable to read directory entries")
}
childPrefix := fullPath + string(filepath.Separator)
for _, de := range firstBatch {
e, err := toDirEntryOrNil(de, childPrefix)
if err != nil {
return nil, errors.Wrap(err, "error reading entry")
}
if e != nil {
entries = append(entries, e)
}
batch, err := f.ReadDir(numEntriesToRead)
if len(batch) == numEntriesToRead {
return fsd.iterateEntriesInParallel(ctx, f, childPrefix, batch, cb)
}
// first batch was complete with EOF, we're done here.
if errors.Is(firstBatchErr, io.EOF) {
entries.Sort()
return entries, nil
}
// first batch was shorter than expected, perform another read to make sure we get EOF.
if len(firstBatch) < numEntriesToRead {
secondBatch, secondBatchErr := f.ReadDir(numEntriesToRead)
if secondBatchErr != nil && !errors.Is(secondBatchErr, io.EOF) {
return nil, errors.Wrap(secondBatchErr, "unable to read directory entries")
}
// process results in case it's not EOF.
for _, de := range secondBatch {
e, err := toDirEntryOrNil(de, childPrefix)
if err != nil {
return nil, errors.Wrap(err, "error reading entry")
for len(batch) > 0 {
for _, de := range batch {
e, err2 := toDirEntryOrNil(de, childPrefix)
if err2 != nil {
return err2
}
if e != nil {
entries = append(entries, e)
}
}
// if we got EOF at this point, return.
if errors.Is(secondBatchErr, io.EOF) {
entries.Sort()
return entries, nil
}
}
return fsd.readRemainingDirEntriesInParallel(childPrefix, entries, f)
}
func (fsd *filesystemDirectory) readRemainingDirEntriesInParallel(childPrefix string, entries fs.Entries, f *os.File) (fs.Entries, error) {
// start feeding directory entries to dirEntryCh
dirEntryCh := make(chan os.DirEntry, dirListingPrefetch)
var readDirErr error
go func() {
defer close(dirEntryCh)
for {
des, err := f.ReadDir(numEntriesToRead)
for _, de := range des {
dirEntryCh <- de
}
if err == nil {
if e == nil {
continue
}
if errors.Is(err, io.EOF) {
break
if err3 := cb(ctx, e); err3 != nil {
return err3
}
readDirErr = err
break
}
}()
entriesCh := make(chan entryWithError, dirListingPrefetch)
batch, err = f.ReadDir(numEntriesToRead)
}
if errors.Is(err, io.EOF) {
return nil
}
return errors.Wrap(err, "error listing directory")
}
// nolint:gocognit,gocyclo
func (fsd *filesystemDirectory) iterateEntriesInParallel(ctx context.Context, f *os.File, childPrefix string, batch []os.DirEntry, cb func(context.Context, fs.Entry) error) error {
inputCh := make(chan os.DirEntry, dirListingPrefetch)
outputCh := make(chan entryWithError, dirListingPrefetch)
closed := make(chan struct{})
defer close(closed)
var workersWG sync.WaitGroup
// start goroutines that will convert 'os.DirEntry' to 'entryWithError'
for i := 0; i < paralellelStatGoroutines; i++ {
workersWG.Add(1)
go func() {
defer workersWG.Done()
for de := range dirEntryCh {
e, err := toDirEntryOrNil(de, childPrefix)
if err != nil {
entriesCh <- entryWithError{err: errors.Errorf("unable to stat directory entry %q: %v", de, err)}
continue
}
for {
select {
case <-closed:
return
if e != nil {
entriesCh <- entryWithError{entry: e}
case de := <-inputCh:
e, err := toDirEntryOrNil(de, childPrefix)
outputCh <- entryWithError{entry: e, err: err}
}
}
}()
}
// close entriesCh channel when all goroutines terminate
go func() {
workersWG.Wait()
close(entriesCh)
}()
var pending int
// drain the entriesCh into a slice and sort it
for len(batch) > 0 {
for _, de := range batch {
// before pushing fetch from outputCh and invoke callbacks for all entries in it
invokeCallbacks:
for {
select {
case dwe := <-outputCh:
pending--
for e := range entriesCh {
if e.err != nil {
// only return the first error
if readDirErr == nil {
readDirErr = e.err
if dwe.err != nil {
return dwe.err
}
if dwe.entry != nil {
if err := cb(ctx, dwe.entry); err != nil {
return err
}
}
default:
break invokeCallbacks
}
}
continue
inputCh <- de
pending++
}
entries = append(entries, e.entry)
nextBatch, err := f.ReadDir(numEntriesToRead)
if err != nil && !errors.Is(err, io.EOF) {
// nolint:wrapcheck
return err
}
batch = nextBatch
}
entries.Sort()
for i := 0; i < pending; i++ {
dwe := <-outputCh
// return any error encountered when listing or reading the directory
return entries, readDirErr
if dwe.err != nil {
return dwe.err
}
if dwe.entry != nil {
if err := cb(ctx, dwe.entry); err != nil {
return err
}
}
}
return nil
}
type fileWithMetadata struct {

View File

@@ -1,6 +1,7 @@
package localfs
import (
"context"
"fmt"
"os"
"path/filepath"
@@ -98,6 +99,78 @@ func TestFiles(t *testing.T) {
verifyChild(t, dir)
}
// TestIterate1000 exercises directory iteration over 1000 files — presumably
// enough to span multiple ReadDir batches; confirm against numEntriesToRead.
func TestIterate1000(t *testing.T) {
	testIterate(t, 1000)
}
// TestIterate10 exercises directory iteration over a small (10-file) directory.
func TestIterate10(t *testing.T) {
	testIterate(t, 10)
}
// TestIterateNonExistent verifies that iterating a directory which was removed
// after the fs.Directory was constructed returns os.ErrNotExist and never
// invokes the callback.
func TestIterateNonExistent(t *testing.T) {
	tmp := testutil.TempDirectory(t)

	dir, err := Directory(tmp)
	require.NoError(t, err)

	// Remove the underlying directory so the subsequent open during iteration fails.
	// (The original ignored this error; removing an empty temp dir must succeed.)
	require.NoError(t, os.Remove(tmp))

	ctx := testlogging.Context(t)

	require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
		t.Fatal("this won't be invoked")
		return nil
	}), os.ErrNotExist)
}
// testIterate creates nFiles 3-byte files in a temporary directory and verifies that
// IterateEntries visits each of them exactly once with the correct size, and that an
// error returned by the callback — early or late in the iteration — aborts the walk
// and is propagated to the caller.
// nolint:thelper
func testIterate(t *testing.T, nFiles int) {
	tmp := testutil.TempDirectory(t)

	// Each file contains exactly 3 bytes so the reported size can be checked below.
	for i := 0; i < nFiles; i++ {
		assertNoError(t, os.WriteFile(filepath.Join(tmp, fmt.Sprintf("f%v", i)), []byte{1, 2, 3}, 0o777))
	}

	dir, err := Directory(tmp)
	require.NoError(t, err)

	ctx := testlogging.Context(t)

	names := map[string]int64{}

	require.NoError(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
		names[e.Name()] = e.Size()
		return nil
	}))

	// Every file must be seen exactly once, with the size that was written.
	// (The original collected sizes but never verified them.)
	require.Len(t, names, nFiles)

	for name, size := range names {
		require.EqualValues(t, 3, size, "unexpected size for %v", name)
	}

	errTest := errors.New("test error")

	// An error returned early in the iteration must be propagated.
	cnt := 0
	require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
		cnt++
		if cnt == nFiles/10 {
			return errTest
		}
		return nil
	}), errTest)

	// An error returned near the end of the iteration must also be propagated.
	cnt = 0
	require.ErrorIs(t, dir.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
		cnt++
		if cnt == nFiles-1 {
			return errTest
		}
		return nil
	}), errTest)
}
func verifyChild(t *testing.T, dir fs.Directory) {
t.Helper()