From 61e651d30c52fc76d6b6bd3371e9844a678d2af4 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Tue, 14 Jun 2022 19:08:49 -0700 Subject: [PATCH] feat(snapshots): Allow users to dynamically create entries in a directory during an upload (#1996) * Allow dynamic directory entries with virtualfs * Tests for new virtualfs implementation * Add escape hatch for estimator during upload Some virtualfs.StreamingDirectory-s may not be able to (efficiently) support iterating through entries multiple times. Make a way for the estimator to ask if they support multiple iterations and skip the directory if they do not. * Exapand Directory interface Expand the Directory interface instead of making a new interface as it's error-prone to ensure all wrapper types properly handle types that use the new interface. * Post-rebase fixes * Make StreamingDirectory single iteration only Simplify code and test slightly by not allowing users to declare a StreamingDirectory that can be iterated through multiple times. * Add better test for estimator ignoring stream dir Previous test in uploader had a race condition, meaning it may not catch all cases. * Ignore atomic access in checklocks Comparisons known to be done after all additions to the variables in question. * Implement reviewer feedback * Remove unused function parameter --- cli/command_snapshot_estimate.go | 2 +- fs/entry.go | 3 + fs/localfs/local_fs.go | 4 + fs/localfs/shallow_fs.go | 4 + fs/virtualfs/virtualfs.go | 67 +++++++++++ fs/virtualfs/virtualfs_test.go | 136 ++++++++++++++++++++-- internal/mockfs/mockfs.go | 20 ++++ internal/server/api_estimate.go | 4 +- snapshot/snapshotfs/all_sources.go | 4 + snapshot/snapshotfs/estimate.go | 7 +- snapshot/snapshotfs/estimate_test.go | 69 +++++++++++ snapshot/snapshotfs/repofs.go | 4 + snapshot/snapshotfs/source_directories.go | 4 + snapshot/snapshotfs/source_snapshots.go | 4 + snapshot/snapshotfs/upload_scan.go | 2 +- snapshot/snapshotfs/upload_test.go | 88 ++++++++++++++ 16 files changed, 404 insertions(+), 18 deletions(-) create mode 100644 snapshot/snapshotfs/estimate_test.go diff --git a/cli/command_snapshot_estimate.go b/cli/command_snapshot_estimate.go index 6bd7b120a..37e2f36c5 100644 --- a/cli/command_snapshot_estimate.go +++ b/cli/command_snapshot_estimate.go @@ -97,7 +97,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository) return errors.Wrapf(err, "error creating policy tree for %v", sourceInfo) } - if err := snapshotfs.Estimate(ctx, rep, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil { + if err := snapshotfs.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil { return errors.Wrap(err, "error estimating") } diff --git a/fs/entry.go b/fs/entry.go index 857ee89af..9170e5027 100644 --- a/fs/entry.go +++ b/fs/entry.go @@ -58,6 +58,9 @@ type Directory interface { Entry Child(ctx context.Context, name string) (Entry, error) IterateEntries(ctx context.Context, cb func(context.Context, Entry) error) error + // SupportsMultipleIterations returns true if the Directory supports iterating + // through the entries multiple times. Otherwise it returns false. + SupportsMultipleIterations() bool } // DirectoryWithSummary is optionally implemented by Directory that provide summary. diff --git a/fs/localfs/local_fs.go b/fs/localfs/local_fs.go index d18742db7..464473cf9 100644 --- a/fs/localfs/local_fs.go +++ b/fs/localfs/local_fs.go @@ -102,6 +102,10 @@ type filesystemErrorEntry struct { err error } +func (fsd *filesystemDirectory) SupportsMultipleIterations() bool { + return true +} + func (fsd *filesystemDirectory) Size() int64 { // force directory size to always be zero return 0 diff --git a/fs/localfs/shallow_fs.go b/fs/localfs/shallow_fs.go index 058da4fee..6595d1561 100644 --- a/fs/localfs/shallow_fs.go +++ b/fs/localfs/shallow_fs.go @@ -115,6 +115,10 @@ func (fsf *shallowFilesystemFile) Open(ctx context.Context) (fs.Reader, error) { return nil, errors.New("shallowFilesystemFile.Open not supported") } +func (fsd *shallowFilesystemDirectory) SupportsMultipleIterations() bool { + return false +} + func (fsd *shallowFilesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) { return nil, errors.New("shallowFilesystemDirectory.Child not supported") } diff --git a/fs/virtualfs/virtualfs.go b/fs/virtualfs/virtualfs.go index b2ce67062..aaf8c95d2 100644 --- a/fs/virtualfs/virtualfs.go +++ b/fs/virtualfs/virtualfs.go @@ -6,6 +6,7 @@ "errors" "io" "os" + "sync" "time" "github.com/kopia/kopia/fs" @@ -83,6 +84,10 @@ func (sd *staticDirectory) IterateEntries(ctx context.Context, cb func(context.C return nil } +func (sd *staticDirectory) SupportsMultipleIterations() bool { + return true +} + // NewStaticDirectory returns a virtual static directory. func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory { return &staticDirectory{ @@ -94,6 +99,67 @@ func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory { } } +type streamingDirectory struct { + virtualEntry + // Used to generate the next entry and execute the callback on it. + // +checklocks:mu + callback func(context.Context, func(context.Context, fs.Entry) error) error + mu sync.Mutex +} + +var errChildNotSupported = errors.New("streamingDirectory.Child not supported") + +func (sd *streamingDirectory) Child(ctx context.Context, name string) (fs.Entry, error) { + return nil, errChildNotSupported +} + +var errIteratorAlreadyUsed = errors.New("cannot use streaming directory iterator more than once") // +checklocksignore: mu + +func (sd *streamingDirectory) getIterator() (func(context.Context, func(context.Context, fs.Entry) error) error, error) { + sd.mu.Lock() + defer sd.mu.Unlock() + + if sd.callback == nil { + return nil, errIteratorAlreadyUsed + } + + cb := sd.callback + sd.callback = nil + + return cb, nil +} + +func (sd *streamingDirectory) IterateEntries( + ctx context.Context, + callback func(context.Context, fs.Entry) error, +) error { + cb, err := sd.getIterator() + if err != nil { + return err + } + + return cb(ctx, callback) +} + +func (sd *streamingDirectory) SupportsMultipleIterations() bool { + return false +} + +// NewStreamingDirectory returns a directory that will call the given function +// when IterateEntries is executed. +func NewStreamingDirectory( + name string, + callback func(context.Context, func(context.Context, fs.Entry) error) error, +) fs.Directory { + return &streamingDirectory{ + virtualEntry: virtualEntry{ + name: name, + mode: defaultPermissions | os.ModeDir, + }, + callback: callback, + } +} + // virtualFile is an implementation of fs.StreamingFile with an io.Reader. type virtualFile struct { virtualEntry @@ -130,6 +196,7 @@ func StreamingFileFromReader(name string, reader io.Reader) fs.StreamingFile { var ( _ fs.Directory = &staticDirectory{} + _ fs.Directory = &streamingDirectory{} _ fs.StreamingFile = &virtualFile{} _ fs.Entry = &virtualEntry{} ) diff --git a/fs/virtualfs/virtualfs_test.go b/fs/virtualfs/virtualfs_test.go index a88e5115e..e2431c421 100644 --- a/fs/virtualfs/virtualfs_test.go +++ b/fs/virtualfs/virtualfs_test.go @@ -1,13 +1,22 @@ package virtualfs import ( + "bytes" "context" "errors" "os" "reflect" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/internal/testlogging" +) + +const ( + testFileName = "stream-file" ) func TestStreamingFile(t *testing.T) { @@ -25,21 +34,20 @@ func TestStreamingFile(t *testing.T) { w.Close() - filename := "stream-file" - f := StreamingFileFromReader(filename, r) + f := StreamingFileFromReader(testFileName, r) rootDir := NewStaticDirectory("root", []fs.Entry{f}) - e, err := rootDir.Child(context.TODO(), filename) + e, err := rootDir.Child(testlogging.Context(t), testFileName) if err != nil { t.Fatalf("error getting child entry: %v", err) } - if e.Name() != filename { - t.Fatalf("did not get expected filename: (actual) %v != %v (expected)", e.Name(), filename) + if e.Name() != testFileName { + t.Fatalf("did not get expected filename: (actual) %v != %v (expected)", e.Name(), testFileName) } - entries, err := fs.GetAllEntries(context.TODO(), rootDir) + entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir) if err != nil { t.Fatalf("error getting dir entries %v", err) } @@ -49,7 +57,7 @@ func TestStreamingFile(t *testing.T) { } // Read and compare data - reader, err := f.GetReader(context.TODO()) + reader, err := f.GetReader(testlogging.Context(t)) if err != nil { t.Fatalf("error getting streaming file reader: %v", err) } @@ -80,11 +88,10 @@ func TestStreamingFileGetReader(t *testing.T) { w.Close() - filename := "stream-file" - f := StreamingFileFromReader(filename, r) + f := StreamingFileFromReader(testFileName, r) // Read and compare data - reader, err := f.GetReader(context.TODO()) + reader, err := f.GetReader(testlogging.Context(t)) if err != nil { t.Fatalf("error getting streaming file reader: %v", err) } @@ -100,7 +107,7 @@ func TestStreamingFileGetReader(t *testing.T) { } // Second call to GetReader must fail - _, err = f.GetReader(context.TODO()) + _, err = f.GetReader(testlogging.Context(t)) if err == nil { t.Fatal("expected error, got none") } @@ -109,3 +116,110 @@ func TestStreamingFileGetReader(t *testing.T) { t.Fatalf("did not get expected error: (actual) %v != %v (expected)", err, errReaderAlreadyUsed) } } + +func TestStreamingDirectory(t *testing.T) { + // Create a temporary file with test data + content := []byte("Temporary file content") + r := bytes.NewReader(content) + + f := StreamingFileFromReader(testFileName, r) + + rootDir := NewStreamingDirectory( + "root", + func( + ctx context.Context, + callback func(context.Context, fs.Entry) error, + ) error { + return callback(ctx, f) + }, + ) + + entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir) + require.NoError(t, err) + + assert.Len(t, entries, 1) + + e := entries[0] + require.Equal(t, e.Name(), testFileName) + + // Read and compare data + reader, err := f.GetReader(testlogging.Context(t)) + require.NoError(t, err) + + result := make([]byte, len(content)) + + _, err = reader.Read(result) + require.NoError(t, err) + + assert.True(t, reflect.DeepEqual(result, content)) +} + +func TestStreamingDirectory_MultipleIterationsFails(t *testing.T) { + // Create a temporary file with test data + content := []byte("Temporary file content") + r := bytes.NewReader(content) + + f := StreamingFileFromReader(testFileName, r) + + rootDir := NewStreamingDirectory( + "root", + func( + ctx context.Context, + callback func(context.Context, fs.Entry) error, + ) error { + return callback(ctx, f) + }, + ) + + entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir) + require.NoError(t, err) + + assert.Len(t, entries, 1) + + _, err = fs.GetAllEntries(testlogging.Context(t), rootDir) + assert.Error(t, err) +} + +var errCallback = errors.New("callback error") + +func TestStreamingDirectory_ReturnsCallbackError(t *testing.T) { + // Create a temporary file with test data + content := []byte("Temporary file content") + r := bytes.NewReader(content) + + f := StreamingFileFromReader(testFileName, r) + + rootDir := NewStreamingDirectory( + "root", + func( + ctx context.Context, + callback func(context.Context, fs.Entry) error, + ) error { + return callback(ctx, f) + }, + ) + + err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error { + return errCallback + }) + assert.ErrorIs(t, err, errCallback) +} + +var errIteration = errors.New("iteration error") + +func TestStreamingDirectory_ReturnsReadDirError(t *testing.T) { + rootDir := NewStreamingDirectory( + "root", + func( + ctx context.Context, + callback func(context.Context, fs.Entry) error, + ) error { + return errIteration + }, + ) + + err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error { + return nil + }) + assert.ErrorIs(t, err, errIteration) +} diff --git a/internal/mockfs/mockfs.go b/internal/mockfs/mockfs.go index da5a8c70d..779a37f75 100644 --- a/internal/mockfs/mockfs.go +++ b/internal/mockfs/mockfs.go @@ -265,6 +265,11 @@ func (imd *Directory) OnReaddir(cb func()) { imd.onReaddir = cb } +// SupportsMultipleIterations returns whether this directory can be iterated through multiple times. +func (imd *Directory) SupportsMultipleIterations() bool { + return true +} + // Child gets the named child of a directory. func (imd *Directory) Child(ctx context.Context, name string) (fs.Entry, error) { e := fs.FindByName(imd.children, name) @@ -353,6 +358,21 @@ func NewDirectory() *Directory { } } +// NewFile returns a new mock file with the given name, contents, and mode. +func NewFile(name string, content []byte, permissions os.FileMode) *File { + return &File{ + entry: entry{ + name: name, + mode: permissions, + size: int64(len(content)), + modTime: DefaultModTime, + }, + source: func() (ReaderSeekerCloser, error) { + return readerSeekerCloser{bytes.NewReader(content)}, nil + }, + } +} + // ErrorEntry is mock in-memory implementation of fs.ErrorEntry. type ErrorEntry struct { entry diff --git a/internal/server/api_estimate.go b/internal/server/api_estimate.go index ccf274474..972a29f7e 100644 --- a/internal/server/api_estimate.go +++ b/internal/server/api_estimate.go @@ -103,8 +103,6 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") } - rep := rc.rep - resolvedRoot := filepath.Clean(ospath.ResolveUserFriendlyPath(req.Root, true)) e, err := localfs.NewEntry(resolvedRoot) @@ -140,7 +138,7 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr ctrl.OnCancel(cancel) // nolint:wrapcheck - return snapshotfs.Estimate(estimatectx, rep, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket) + return snapshotfs.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket) }) taskID := <-taskIDChan diff --git a/snapshot/snapshotfs/all_sources.go b/snapshot/snapshotfs/all_sources.go index 44c848a87..037512027 100644 --- a/snapshot/snapshotfs/all_sources.go +++ b/snapshot/snapshotfs/all_sources.go @@ -53,6 +53,10 @@ func (s *repositoryAllSources) LocalFilesystemPath() string { return "" } +func (s *repositoryAllSources) SupportsMultipleIterations() bool { + return true +} + func (s *repositoryAllSources) Child(ctx context.Context, name string) (fs.Entry, error) { // nolint:wrapcheck return fs.IterateEntriesAndFindChild(ctx, s, name) diff --git a/snapshot/snapshotfs/estimate.go b/snapshot/snapshotfs/estimate.go index d1fc20220..3b356faf6 100644 --- a/snapshot/snapshotfs/estimate.go +++ b/snapshot/snapshotfs/estimate.go @@ -11,7 +11,6 @@ "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/ignorefs" "github.com/kopia/kopia/internal/units" - "github.com/kopia/kopia/repo" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" ) @@ -74,7 +73,7 @@ type EstimateProgress interface { // Estimate walks the provided directory tree and invokes provided progress callback as it discovers // items to be snapshotted. -func Estimate(ctx context.Context, rep repo.Repository, entry fs.Directory, policyTree *policy.Tree, progress EstimateProgress, maxExamplesPerBucket int) error { +func Estimate(ctx context.Context, entry fs.Directory, policyTree *policy.Tree, progress EstimateProgress, maxExamplesPerBucket int) error { stats := &snapshot.Stats{} ed := []string{} ib := makeBuckets() @@ -125,6 +124,10 @@ type processEntryError struct { case fs.Directory: atomic.AddInt32(&stats.TotalDirectoryCount, 1) + if !entry.SupportsMultipleIterations() { + return nil + } + progress.Processing(ctx, relativePath) err := entry.IterateEntries(ctx, func(c context.Context, child fs.Entry) error { diff --git a/snapshot/snapshotfs/estimate_test.go b/snapshot/snapshotfs/estimate_test.go new file mode 100644 index 000000000..358f3224c --- /dev/null +++ b/snapshot/snapshotfs/estimate_test.go @@ -0,0 +1,69 @@ +package snapshotfs_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" + "github.com/kopia/kopia/internal/mockfs" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/snapshotfs" +) + +type fakeProgress struct { + t *testing.T + expectedFiles int32 + expectedDirectories int32 + expectedErrors int32 +} + +func (p *fakeProgress) Processing(context.Context, string) {} + +func (p *fakeProgress) Error(context.Context, string, error, bool) {} + +// +checklocksignore. +func (p *fakeProgress) Stats( + ctx context.Context, + s *snapshot.Stats, + includedFiles, excludedFiles snapshotfs.SampleBuckets, + excludedDirs []string, + final bool, +) { + if !final { + return + } + + assert.Equal(p.t, s.ErrorCount, p.expectedErrors) + assert.Equal(p.t, s.TotalFileCount, p.expectedFiles) + assert.Equal(p.t, s.TotalDirectoryCount, p.expectedDirectories) +} + +func TestEstimate_SkipsStreamingDirectory(t *testing.T) { + f := mockfs.NewFile("f1", []byte{1, 2, 3}, 0o777) + + rootDir := virtualfs.NewStaticDirectory("root", []fs.Entry{ + virtualfs.NewStreamingDirectory( + "a-dir", + func(ctx context.Context, callback func(context.Context, fs.Entry) error) error { + return callback(ctx, f) + }, + ), + }) + + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + p := &fakeProgress{ + t: t, + expectedFiles: 0, + expectedDirectories: 2, + expectedErrors: 0, + } + + err := snapshotfs.Estimate(testlogging.Context(t), rootDir, policyTree, p, 1) + require.NoError(t, err) +} diff --git a/snapshot/snapshotfs/repofs.go b/snapshot/snapshotfs/repofs.go index 992a50e9a..f5e73ea3f 100644 --- a/snapshot/snapshotfs/repofs.go +++ b/snapshot/snapshotfs/repofs.go @@ -113,6 +113,10 @@ func (rd *repositoryDirectory) Summary(ctx context.Context) (*fs.DirectorySummar return rd.summary, nil } +func (rd *repositoryDirectory) SupportsMultipleIterations() bool { + return true +} + func (rd *repositoryDirectory) Child(ctx context.Context, name string) (fs.Entry, error) { if err := rd.ensureDirEntriesLoaded(ctx); err != nil { return nil, err diff --git a/snapshot/snapshotfs/source_directories.go b/snapshot/snapshotfs/source_directories.go index 3e07d52da..66794a5d8 100644 --- a/snapshot/snapshotfs/source_directories.go +++ b/snapshot/snapshotfs/source_directories.go @@ -57,6 +57,10 @@ func (s *sourceDirectories) LocalFilesystemPath() string { return "" } +func (s *sourceDirectories) SupportsMultipleIterations() bool { + return true +} + func (s *sourceDirectories) Child(ctx context.Context, name string) (fs.Entry, error) { // nolint:wrapcheck return fs.IterateEntriesAndFindChild(ctx, s, name) diff --git a/snapshot/snapshotfs/source_snapshots.go b/snapshot/snapshotfs/source_snapshots.go index 19d662564..6f0e03864 100644 --- a/snapshot/snapshotfs/source_snapshots.go +++ b/snapshot/snapshotfs/source_snapshots.go @@ -55,6 +55,10 @@ func (s *sourceSnapshots) LocalFilesystemPath() string { return "" } +func (s *sourceSnapshots) SupportsMultipleIterations() bool { + return true +} + func (s *sourceSnapshots) Child(ctx context.Context, name string) (fs.Entry, error) { // nolint:wrapcheck return fs.IterateEntriesAndFindChild(ctx, s, name) diff --git a/snapshot/snapshotfs/upload_scan.go b/snapshot/snapshotfs/upload_scan.go index 7ca309cd1..aa6b00b0e 100644 --- a/snapshot/snapshotfs/upload_scan.go +++ b/snapshot/snapshotfs/upload_scan.go @@ -36,7 +36,7 @@ func (u *Uploader) scanDirectory(ctx context.Context, dir fs.Directory, policyTr return res, nil } - err := Estimate(ctx, u.repo, dir, policyTree, &res, 1) + err := Estimate(ctx, dir, policyTree, &res, 1) return res, err } diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index b52a9ce01..3c764b5d2 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -15,6 +15,7 @@ "github.com/kylelemons/godebug/pretty" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/kopia/kopia/fs" @@ -767,6 +768,93 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) { } } +func TestUpload_StreamingDirectory(t *testing.T) { + ctx := testlogging.Context(t) + th := newUploadTestHarness(ctx, t) + + defer th.cleanup() + + t.Logf("Uploading streaming directory with mock file") + + u := NewUploader(th.repo) + + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + + files := []fs.Entry{ + mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions), + } + + staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{ + virtualfs.NewStreamingDirectory( + "stream-directory", + func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error { + for _, f := range files { + if err := callback(innerCtx, f); err != nil { + return err + } + } + + return nil + }, + ), + }) + + man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{}) + require.NoError(t, err) + + assert.Equal(t, atomic.LoadInt32(&man.Stats.CachedFiles), int32(0)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.NonCachedFiles), int32(1)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.TotalDirectoryCount), int32(2)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.TotalFileCount), int32(1)) +} + +func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) { + ctx := testlogging.Context(t) + th := newUploadTestHarness(ctx, t) + + defer th.cleanup() + + t.Logf("Uploading streaming directory with some ignored mock files") + + u := NewUploader(th.repo) + + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + FilesPolicy: policy.FilesPolicy{ + IgnoreRules: []string{"f2"}, + }, + }, + }, policy.DefaultPolicy) + + files := []fs.Entry{ + mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions), + mockfs.NewFile("f2", []byte{1, 2, 3, 4}, defaultPermissions), + } + + staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{ + virtualfs.NewStreamingDirectory( + "stream-directory", + func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error { + for _, f := range files { + if err := callback(innerCtx, f); err != nil { + return err + } + } + + return nil + }, + ), + }) + + man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{}) + require.NoError(t, err) + + assert.Equal(t, atomic.LoadInt32(&man.Stats.CachedFiles), int32(0)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.NonCachedFiles), int32(1)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.TotalDirectoryCount), int32(2)) + assert.Equal(t, atomic.LoadInt32(&man.Stats.TotalFileCount), int32(1)) +} + type mockLogger struct { logging.Logger