feat(snapshots): Allow users to dynamically create entries in a directory during an upload (#1996)

* Allow dynamic directory entries with virtualfs

* Tests for new virtualfs implementation

* Add escape hatch for estimator during upload

Some virtualfs.StreamingDirectory-s may not be able to (efficiently)
support iterating through entries multiple times. Make a way for the
estimator to ask if they support multiple iterations and skip the
directory if they do not.

* Expand Directory interface

Expand the Directory interface instead of making a new interface as it's
error-prone to ensure all wrapper types properly handle types that use
the new interface.

* Post-rebase fixes

* Make StreamingDirectory single iteration only

Simplify code and test slightly by not allowing users to declare a
StreamingDirectory that can be iterated through multiple times.

* Add better test for estimator ignoring stream dir

Previous test in uploader had a race condition, meaning it may not catch
all cases.

* Ignore atomic access in checklocks

The comparisons are known to happen only after all additions to the
variables in question have completed.

* Implement reviewer feedback

* Remove unused function parameter
This commit is contained in:
ashmrtn
2022-06-14 19:08:49 -07:00
committed by GitHub
parent 07bad72874
commit 61e651d30c
16 changed files with 404 additions and 18 deletions

View File

@@ -97,7 +97,7 @@ func (c *commandSnapshotEstimate) run(ctx context.Context, rep repo.Repository)
return errors.Wrapf(err, "error creating policy tree for %v", sourceInfo)
}
if err := snapshotfs.Estimate(ctx, rep, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil {
if err := snapshotfs.Estimate(ctx, dir, policyTree, &ep, c.maxExamplesPerBucket); err != nil {
return errors.Wrap(err, "error estimating")
}

View File

@@ -58,6 +58,9 @@ type Directory interface {
Entry
Child(ctx context.Context, name string) (Entry, error)
IterateEntries(ctx context.Context, cb func(context.Context, Entry) error) error
// SupportsMultipleIterations returns true if the Directory supports iterating
// through the entries multiple times. Otherwise it returns false.
SupportsMultipleIterations() bool
}
// DirectoryWithSummary is optionally implemented by Directory that provide summary.

View File

@@ -102,6 +102,10 @@ type filesystemErrorEntry struct {
err error
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For filesystemDirectory it always returns true.
func (fsd *filesystemDirectory) SupportsMultipleIterations() bool {
return true
}
func (fsd *filesystemDirectory) Size() int64 {
// force directory size to always be zero
return 0

View File

@@ -115,6 +115,10 @@ func (fsf *shallowFilesystemFile) Open(ctx context.Context) (fs.Reader, error) {
return nil, errors.New("shallowFilesystemFile.Open not supported")
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For shallowFilesystemDirectory it always returns false.
func (fsd *shallowFilesystemDirectory) SupportsMultipleIterations() bool {
return false
}
// Child always fails: looking up a child by name is not supported on a
// shallowFilesystemDirectory, so it returns a nil entry and an error.
func (fsd *shallowFilesystemDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
return nil, errors.New("shallowFilesystemDirectory.Child not supported")
}

View File

@@ -6,6 +6,7 @@
"errors"
"io"
"os"
"sync"
"time"
"github.com/kopia/kopia/fs"
@@ -83,6 +84,10 @@ func (sd *staticDirectory) IterateEntries(ctx context.Context, cb func(context.C
return nil
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For staticDirectory it always returns true.
func (sd *staticDirectory) SupportsMultipleIterations() bool {
return true
}
// NewStaticDirectory returns a virtual static directory.
func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory {
return &staticDirectory{
@@ -94,6 +99,67 @@ func NewStaticDirectory(name string, entries []fs.Entry) fs.Directory {
}
}
// streamingDirectory is a virtual directory whose entries are produced by a
// caller-supplied callback instead of being held in a list. getIterator hands
// the callback out once and then clears it, so the directory can only be
// iterated a single time.
type streamingDirectory struct {
virtualEntry
// Used to generate the next entry and execute the callback on it.
// +checklocks:mu
callback func(context.Context, func(context.Context, fs.Entry) error) error
// mu guards callback.
mu sync.Mutex
}
// errChildNotSupported is the error returned by streamingDirectory.Child.
var errChildNotSupported = errors.New("streamingDirectory.Child not supported")
// Child always returns a nil entry and errChildNotSupported; a
// streamingDirectory does not support looking up an entry by name.
func (sd *streamingDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
return nil, errChildNotSupported
}
var errIteratorAlreadyUsed = errors.New("cannot use streaming directory iterator more than once") // +checklocksignore: mu

// getIterator hands out the directory's entry-producing callback at most once.
// The stored callback is cleared while holding the lock, so any call that finds
// no callback stored receives errIteratorAlreadyUsed.
func (sd *streamingDirectory) getIterator() (func(context.Context, func(context.Context, fs.Entry) error) error, error) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	iterate := sd.callback
	if iterate == nil {
		return nil, errIteratorAlreadyUsed
	}

	// Mark the directory as consumed for every subsequent caller.
	sd.callback = nil

	return iterate, nil
}
// IterateEntries fetches the one-shot iterator and runs it, handing it callback
// so the iterator can invoke it for the entries it produces. If the iterator is
// no longer available, the error from getIterator is returned and callback is
// never invoked; otherwise the iterator's own result is returned unchanged.
func (sd *streamingDirectory) IterateEntries(ctx context.Context, callback func(context.Context, fs.Entry) error) error {
	iterate, err := sd.getIterator()
	if err != nil {
		return err
	}

	return iterate(ctx, callback)
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For streamingDirectory it always returns false.
func (sd *streamingDirectory) SupportsMultipleIterations() bool {
return false
}
// NewStreamingDirectory creates a virtual directory called name whose entries
// are produced on demand: executing IterateEntries on the result runs callback,
// passing it the per-entry function supplied to IterateEntries. The directory
// is given the default permissions with the directory mode bit set.
func NewStreamingDirectory(
	name string,
	callback func(context.Context, func(context.Context, fs.Entry) error) error,
) fs.Directory {
	base := virtualEntry{
		name: name,
		mode: defaultPermissions | os.ModeDir,
	}

	return &streamingDirectory{virtualEntry: base, callback: callback}
}
// virtualFile is an implementation of fs.StreamingFile with an io.Reader.
type virtualFile struct {
virtualEntry
@@ -130,6 +196,7 @@ func StreamingFileFromReader(name string, reader io.Reader) fs.StreamingFile {
// Compile-time assertions that the types below satisfy the fs interfaces they
// are declared against.
var (
_ fs.Directory = &staticDirectory{}
_ fs.Directory = &streamingDirectory{}
_ fs.StreamingFile = &virtualFile{}
_ fs.Entry = &virtualEntry{}
)

View File

@@ -1,13 +1,22 @@
package virtualfs
import (
"bytes"
"context"
"errors"
"os"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/testlogging"
)
const (
// testFileName is the entry name used for the streaming files created in these tests.
testFileName = "stream-file"
)
func TestStreamingFile(t *testing.T) {
@@ -25,21 +34,20 @@ func TestStreamingFile(t *testing.T) {
w.Close()
filename := "stream-file"
f := StreamingFileFromReader(filename, r)
f := StreamingFileFromReader(testFileName, r)
rootDir := NewStaticDirectory("root", []fs.Entry{f})
e, err := rootDir.Child(context.TODO(), filename)
e, err := rootDir.Child(testlogging.Context(t), testFileName)
if err != nil {
t.Fatalf("error getting child entry: %v", err)
}
if e.Name() != filename {
t.Fatalf("did not get expected filename: (actual) %v != %v (expected)", e.Name(), filename)
if e.Name() != testFileName {
t.Fatalf("did not get expected filename: (actual) %v != %v (expected)", e.Name(), testFileName)
}
entries, err := fs.GetAllEntries(context.TODO(), rootDir)
entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir)
if err != nil {
t.Fatalf("error getting dir entries %v", err)
}
@@ -49,7 +57,7 @@ func TestStreamingFile(t *testing.T) {
}
// Read and compare data
reader, err := f.GetReader(context.TODO())
reader, err := f.GetReader(testlogging.Context(t))
if err != nil {
t.Fatalf("error getting streaming file reader: %v", err)
}
@@ -80,11 +88,10 @@ func TestStreamingFileGetReader(t *testing.T) {
w.Close()
filename := "stream-file"
f := StreamingFileFromReader(filename, r)
f := StreamingFileFromReader(testFileName, r)
// Read and compare data
reader, err := f.GetReader(context.TODO())
reader, err := f.GetReader(testlogging.Context(t))
if err != nil {
t.Fatalf("error getting streaming file reader: %v", err)
}
@@ -100,7 +107,7 @@ func TestStreamingFileGetReader(t *testing.T) {
}
// Second call to GetReader must fail
_, err = f.GetReader(context.TODO())
_, err = f.GetReader(testlogging.Context(t))
if err == nil {
t.Fatal("expected error, got none")
}
@@ -109,3 +116,110 @@ func TestStreamingFileGetReader(t *testing.T) {
t.Fatalf("did not get expected error: (actual) %v != %v (expected)", err, errReaderAlreadyUsed)
}
}
// TestStreamingDirectory checks that a streaming directory yields the entry
// produced by its callback and that the entry's data can still be read
// afterwards.
func TestStreamingDirectory(t *testing.T) {
	// Back the streaming file with an in-memory reader holding the test data.
	content := []byte("Temporary file content")
	r := bytes.NewReader(content)

	f := StreamingFileFromReader(testFileName, r)

	rootDir := NewStreamingDirectory(
		"root",
		func(
			ctx context.Context,
			callback func(context.Context, fs.Entry) error,
		) error {
			return callback(ctx, f)
		},
	)

	entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir)
	require.NoError(t, err)

	// require rather than assert: entries[0] below would panic on an empty
	// slice instead of reporting a clean test failure.
	require.Len(t, entries, 1)

	e := entries[0]
	require.Equal(t, testFileName, e.Name())

	// Read and compare data
	reader, err := f.GetReader(testlogging.Context(t))
	require.NoError(t, err)

	result := make([]byte, len(content))
	_, err = reader.Read(result)
	require.NoError(t, err)

	assert.True(t, reflect.DeepEqual(result, content))
}
// TestStreamingDirectory_MultipleIterationsFails checks that the first
// iteration over a streaming directory succeeds and that a second one is
// rejected with errIteratorAlreadyUsed.
func TestStreamingDirectory_MultipleIterationsFails(t *testing.T) {
	// Back the streaming file with an in-memory reader holding the test data.
	content := []byte("Temporary file content")
	r := bytes.NewReader(content)

	f := StreamingFileFromReader(testFileName, r)

	rootDir := NewStreamingDirectory(
		"root",
		func(
			ctx context.Context,
			callback func(context.Context, fs.Entry) error,
		) error {
			return callback(ctx, f)
		},
	)

	entries, err := fs.GetAllEntries(testlogging.Context(t), rootDir)
	require.NoError(t, err)
	assert.Len(t, entries, 1)

	// Pin the specific sentinel so an unrelated failure cannot satisfy the test.
	_, err = fs.GetAllEntries(testlogging.Context(t), rootDir)
	assert.ErrorIs(t, err, errIteratorAlreadyUsed)
}
var errCallback = errors.New("callback error")

// TestStreamingDirectory_ReturnsCallbackError verifies that an error returned
// by the per-entry callback comes back out of IterateEntries.
func TestStreamingDirectory_ReturnsCallbackError(t *testing.T) {
	// In-memory streaming file handed to the callback; its data is never read here.
	data := []byte("Temporary file content")
	file := StreamingFileFromReader(testFileName, bytes.NewReader(data))

	produceEntries := func(ctx context.Context, callback func(context.Context, fs.Entry) error) error {
		return callback(ctx, file)
	}

	failingCallback := func(context.Context, fs.Entry) error {
		return errCallback
	}

	rootDir := NewStreamingDirectory("root", produceEntries)

	err := rootDir.IterateEntries(testlogging.Context(t), failingCallback)
	assert.ErrorIs(t, err, errCallback)
}
var errIteration = errors.New("iteration error")
func TestStreamingDirectory_ReturnsReadDirError(t *testing.T) {
rootDir := NewStreamingDirectory(
"root",
func(
ctx context.Context,
callback func(context.Context, fs.Entry) error,
) error {
return errIteration
},
)
err := rootDir.IterateEntries(testlogging.Context(t), func(context.Context, fs.Entry) error {
return nil
})
assert.ErrorIs(t, err, errIteration)
}

View File

@@ -265,6 +265,11 @@ func (imd *Directory) OnReaddir(cb func()) {
imd.onReaddir = cb
}
// SupportsMultipleIterations returns whether this directory can be iterated through multiple times.
// The mock directory always returns true.
func (imd *Directory) SupportsMultipleIterations() bool {
return true
}
// Child gets the named child of a directory.
func (imd *Directory) Child(ctx context.Context, name string) (fs.Entry, error) {
e := fs.FindByName(imd.children, name)
@@ -353,6 +358,21 @@ func NewDirectory() *Directory {
}
}
// NewFile builds a mock file called name with the given permissions. Its size
// is len(content), its modification time is DefaultModTime, and every call to
// its source function returns a fresh reader over content.
func NewFile(name string, content []byte, permissions os.FileMode) *File {
	meta := entry{
		name:    name,
		mode:    permissions,
		size:    int64(len(content)),
		modTime: DefaultModTime,
	}

	openContent := func() (ReaderSeekerCloser, error) {
		return readerSeekerCloser{bytes.NewReader(content)}, nil
	}

	return &File{entry: meta, source: openContent}
}
// ErrorEntry is mock in-memory implementation of fs.ErrorEntry.
type ErrorEntry struct {
entry

View File

@@ -103,8 +103,6 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr
return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body")
}
rep := rc.rep
resolvedRoot := filepath.Clean(ospath.ResolveUserFriendlyPath(req.Root, true))
e, err := localfs.NewEntry(resolvedRoot)
@@ -140,7 +138,7 @@ func handleEstimate(ctx context.Context, rc requestContext) (interface{}, *apiEr
ctrl.OnCancel(cancel)
// nolint:wrapcheck
return snapshotfs.Estimate(estimatectx, rep, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket)
return snapshotfs.Estimate(estimatectx, dir, policyTree, estimateTaskProgress{ctrl}, req.MaxExamplesPerBucket)
})
taskID := <-taskIDChan

View File

@@ -53,6 +53,10 @@ func (s *repositoryAllSources) LocalFilesystemPath() string {
return ""
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For repositoryAllSources it always returns true.
func (s *repositoryAllSources) SupportsMultipleIterations() bool {
return true
}
func (s *repositoryAllSources) Child(ctx context.Context, name string) (fs.Entry, error) {
// nolint:wrapcheck
return fs.IterateEntriesAndFindChild(ctx, s, name)

View File

@@ -11,7 +11,6 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/ignorefs"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
)
@@ -74,7 +73,7 @@ type EstimateProgress interface {
// Estimate walks the provided directory tree and invokes provided progress callback as it discovers
// items to be snapshotted.
func Estimate(ctx context.Context, rep repo.Repository, entry fs.Directory, policyTree *policy.Tree, progress EstimateProgress, maxExamplesPerBucket int) error {
func Estimate(ctx context.Context, entry fs.Directory, policyTree *policy.Tree, progress EstimateProgress, maxExamplesPerBucket int) error {
stats := &snapshot.Stats{}
ed := []string{}
ib := makeBuckets()
@@ -125,6 +124,10 @@ type processEntryError struct {
case fs.Directory:
atomic.AddInt32(&stats.TotalDirectoryCount, 1)
if !entry.SupportsMultipleIterations() {
return nil
}
progress.Processing(ctx, relativePath)
err := entry.IterateEntries(ctx, func(c context.Context, child fs.Entry) error {

View File

@@ -0,0 +1,69 @@
package snapshotfs_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
// fakeProgress is a test progress receiver for snapshotfs.Estimate. It carries
// the counts the test expects and compares them with the final stats it is given.
type fakeProgress struct {
// t is the test that assertion failures are reported against.
t *testing.T
// Expected final values of the corresponding snapshot.Stats counters.
expectedFiles int32
expectedDirectories int32
expectedErrors int32
}
// Processing is a no-op; the fake ignores per-path progress notifications.
func (p *fakeProgress) Processing(context.Context, string) {}
// Error is a no-op; the fake ignores individual error notifications.
func (p *fakeProgress) Error(context.Context, string, error, bool) {}
// Stats compares the final statistics with the expectations stored on p.
// Reports where final is false are ignored; the sample buckets and excluded
// directory list are not inspected.
//
// +checklocksignore.
func (p *fakeProgress) Stats(
	ctx context.Context,
	s *snapshot.Stats,
	includedFiles, excludedFiles snapshotfs.SampleBuckets,
	excludedDirs []string,
	final bool,
) {
	if !final {
		return
	}

	// assert.Equal takes (expected, actual); this order keeps the labels in
	// failure output correct.
	assert.Equal(p.t, p.expectedErrors, s.ErrorCount)
	assert.Equal(p.t, p.expectedFiles, s.TotalFileCount)
	assert.Equal(p.t, p.expectedDirectories, s.TotalDirectoryCount)
}
// TestEstimate_SkipsStreamingDirectory runs Estimate over a static root that
// contains one streaming directory and expects the final stats to show two
// directories and no files, i.e. the file inside the streaming directory is
// never visited.
func TestEstimate_SkipsStreamingDirectory(t *testing.T) {
	file := mockfs.NewFile("f1", []byte{1, 2, 3}, 0o777)

	// Streaming directory whose only entry is the mock file.
	streamDir := virtualfs.NewStreamingDirectory(
		"a-dir",
		func(ctx context.Context, callback func(context.Context, fs.Entry) error) error {
			return callback(ctx, file)
		},
	)

	rootDir := virtualfs.NewStaticDirectory("root", []fs.Entry{streamDir})

	policyTree := policy.BuildTree(nil, policy.DefaultPolicy)

	progress := &fakeProgress{
		t:                   t,
		expectedFiles:       0,
		expectedDirectories: 2,
		expectedErrors:      0,
	}

	err := snapshotfs.Estimate(testlogging.Context(t), rootDir, policyTree, progress, 1)
	require.NoError(t, err)
}

View File

@@ -113,6 +113,10 @@ func (rd *repositoryDirectory) Summary(ctx context.Context) (*fs.DirectorySummar
return rd.summary, nil
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For repositoryDirectory it always returns true.
func (rd *repositoryDirectory) SupportsMultipleIterations() bool {
return true
}
func (rd *repositoryDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
if err := rd.ensureDirEntriesLoaded(ctx); err != nil {
return nil, err

View File

@@ -57,6 +57,10 @@ func (s *sourceDirectories) LocalFilesystemPath() string {
return ""
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For sourceDirectories it always returns true.
func (s *sourceDirectories) SupportsMultipleIterations() bool {
return true
}
func (s *sourceDirectories) Child(ctx context.Context, name string) (fs.Entry, error) {
// nolint:wrapcheck
return fs.IterateEntriesAndFindChild(ctx, s, name)

View File

@@ -55,6 +55,10 @@ func (s *sourceSnapshots) LocalFilesystemPath() string {
return ""
}
// SupportsMultipleIterations reports whether the entries of this directory can
// be iterated more than once. For sourceSnapshots it always returns true.
func (s *sourceSnapshots) SupportsMultipleIterations() bool {
return true
}
func (s *sourceSnapshots) Child(ctx context.Context, name string) (fs.Entry, error) {
// nolint:wrapcheck
return fs.IterateEntriesAndFindChild(ctx, s, name)

View File

@@ -36,7 +36,7 @@ func (u *Uploader) scanDirectory(ctx context.Context, dir fs.Directory, policyTr
return res, nil
}
err := Estimate(ctx, u.repo, dir, policyTree, &res, 1)
err := Estimate(ctx, dir, policyTree, &res, 1)
return res, err
}

View File

@@ -15,6 +15,7 @@
"github.com/kylelemons/godebug/pretty"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/fs"
@@ -767,6 +768,93 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
}
}
// TestUpload_StreamingDirectory uploads a static root containing a single
// streaming directory with one mock file and checks the file and directory
// counters recorded in the resulting manifest.
func TestUpload_StreamingDirectory(t *testing.T) {
	ctx := testlogging.Context(t)
	th := newUploadTestHarness(ctx, t)

	defer th.cleanup()

	t.Logf("Uploading streaming directory with mock file")

	u := NewUploader(th.repo)

	policyTree := policy.BuildTree(nil, policy.DefaultPolicy)

	files := []fs.Entry{
		mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions),
	}

	staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
		virtualfs.NewStreamingDirectory(
			"stream-directory",
			// Feed every mock file to the callback, stopping at the first error.
			func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error {
				for _, f := range files {
					if err := callback(innerCtx, f); err != nil {
						return err
					}
				}

				return nil
			},
		),
	})

	man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
	require.NoError(t, err)

	// assert.Equal takes (expected, actual); this order keeps the labels in
	// failure output correct.
	assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles))
	assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles))
	assert.Equal(t, int32(2), atomic.LoadInt32(&man.Stats.TotalDirectoryCount))
	assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount))
}
// TestUpload_StreamingDirectoryWithIgnoredFile uploads a streaming directory
// that produces two mock files while the policy carries an ignore rule for
// "f2", and expects the manifest to count a single file.
func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) {
	ctx := testlogging.Context(t)
	th := newUploadTestHarness(ctx, t)

	defer th.cleanup()

	t.Logf("Uploading streaming directory with some ignored mock files")

	u := NewUploader(th.repo)

	policyTree := policy.BuildTree(map[string]*policy.Policy{
		".": {
			FilesPolicy: policy.FilesPolicy{
				IgnoreRules: []string{"f2"},
			},
		},
	}, policy.DefaultPolicy)

	files := []fs.Entry{
		mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions),
		mockfs.NewFile("f2", []byte{1, 2, 3, 4}, defaultPermissions),
	}

	staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
		virtualfs.NewStreamingDirectory(
			"stream-directory",
			// Feed every mock file to the callback, stopping at the first error.
			func(innerCtx context.Context, callback func(context.Context, fs.Entry) error) error {
				for _, f := range files {
					if err := callback(innerCtx, f); err != nil {
						return err
					}
				}

				return nil
			},
		),
	})

	man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
	require.NoError(t, err)

	// assert.Equal takes (expected, actual); this order keeps the labels in
	// failure output correct.
	assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles))
	assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles))
	assert.Equal(t, int32(2), atomic.LoadInt32(&man.Stats.TotalDirectoryCount))
	assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount))
}
type mockLogger struct {
logging.Logger