mirror of
https://github.com/kopia/kopia.git
synced 2026-03-25 01:21:16 -04:00
refactor(general): upload tests (#4619)
- use 'require/assert' - refactor TestUploadMetadataCompression as a table test - allow tests to run in parallel - use t.Cleanup and add a missing cleanup - use maps.Clone
This commit is contained in:
@@ -7,9 +7,9 @@
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"maps"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -75,33 +75,27 @@ func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness
|
||||
storage, err := filesystem.New(ctx, &filesystem.Options{
|
||||
Path: repoDir,
|
||||
}, true)
|
||||
if err != nil {
|
||||
panic("cannot create storage directory: " + err.Error())
|
||||
}
|
||||
require.NoError(t, err, "cannot create storage directory")
|
||||
|
||||
faulty := blobtesting.NewFaultyStorage(storage)
|
||||
logged := bloblogging.NewWrapper(faulty, testlogging.Printf(t.Logf, "{STORAGE} "), "")
|
||||
rec := repotesting.NewReconnectableStorage(t, logged)
|
||||
|
||||
if initerr := repo.Initialize(ctx, rec, &repo.NewRepositoryOptions{}, masterPassword); initerr != nil {
|
||||
panic("unable to create repository: " + initerr.Error())
|
||||
}
|
||||
err = repo.Initialize(ctx, rec, &repo.NewRepositoryOptions{}, masterPassword)
|
||||
require.NoError(t, err, "unable to create repository")
|
||||
|
||||
t.Logf("repo dir: %v", repoDir)
|
||||
|
||||
configFile := filepath.Join(repoDir, ".kopia.config")
|
||||
if conerr := repo.Connect(ctx, configFile, rec, masterPassword, nil); conerr != nil {
|
||||
panic("unable to connect to repository: " + conerr.Error())
|
||||
}
|
||||
err = repo.Connect(ctx, configFile, rec, masterPassword, nil)
|
||||
require.NoError(t, err, "unable to connect to repository")
|
||||
|
||||
ft := faketime.NewTimeAdvance(time.Date(2018, time.February, 6, 0, 0, 0, 0, time.UTC))
|
||||
|
||||
rep, err := repo.Open(ctx, configFile, masterPassword, &repo.Options{
|
||||
TimeNowFunc: ft.NowFunc(),
|
||||
})
|
||||
if err != nil {
|
||||
panic("unable to open repository: " + err.Error())
|
||||
}
|
||||
require.NoError(t, err, "unable to open repository")
|
||||
|
||||
sourceDir := mockfs.NewDirectory()
|
||||
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
||||
@@ -124,9 +118,7 @@ func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness
|
||||
sourceDir.AddFile("d2/d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
||||
|
||||
_, w, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
||||
if err != nil {
|
||||
panic("writer creation error: " + err.Error())
|
||||
}
|
||||
require.NoError(t, err, "writer creation error")
|
||||
|
||||
th := &uploadTestHarness{
|
||||
sourceDir: sourceDir,
|
||||
@@ -141,10 +133,12 @@ func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness
|
||||
|
||||
//nolint:gocyclo
|
||||
func TestUpload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("Uploading s1")
|
||||
|
||||
@@ -153,83 +147,48 @@ func TestUpload(t *testing.T) {
|
||||
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
||||
|
||||
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload error")
|
||||
|
||||
t.Logf("s1: %v", s1.RootEntry)
|
||||
|
||||
t.Logf("Uploading s2")
|
||||
|
||||
s2, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload error")
|
||||
|
||||
if !objectIDsEqual(s2.RootObjectID(), s1.RootObjectID()) {
|
||||
t.Errorf("expected s1.RootObjectID==s2.RootObjectID, got %v and %v", s1.RootObjectID().String(), s2.RootObjectID().String())
|
||||
}
|
||||
|
||||
if got, want := atomic.LoadInt32(&s1.Stats.CachedFiles), int32(0); got != want {
|
||||
t.Errorf("unexpected s1 cached files: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
// All non-cached files from s1 are now cached and there are no non-cached files since nothing changed.
|
||||
if got, want := atomic.LoadInt32(&s2.Stats.CachedFiles), atomic.LoadInt32(&s1.Stats.NonCachedFiles); got != want {
|
||||
t.Errorf("unexpected s2 cached files: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := atomic.LoadInt32(&s2.Stats.NonCachedFiles), int32(0); got != want {
|
||||
t.Errorf("unexpected non-cached files: %v", got)
|
||||
}
|
||||
assert.Equal(t, s1.RootObjectID(), s2.RootObjectID(), "root object ids do not match")
|
||||
assert.Zero(t, atomic.LoadInt32(&s1.Stats.CachedFiles), "unexpected s1 cached files")
|
||||
assert.Equal(t, atomic.LoadInt32(&s1.Stats.NonCachedFiles), atomic.LoadInt32(&s2.Stats.CachedFiles),
|
||||
"unexpected s2 cached files: all non-cached files from s1 are now cached and there are no non-cached files since nothing changed")
|
||||
assert.Zero(t, atomic.LoadInt32(&s2.Stats.NonCachedFiles), "unexpected non-cached files")
|
||||
|
||||
// Add one more file, the s1.RootObjectID should change.
|
||||
th.sourceDir.AddFile("d2/d1/f3", []byte{1, 2, 3, 4, 5}, defaultPermissions)
|
||||
|
||||
s3, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload failed")
|
||||
|
||||
if objectIDsEqual(s2.RootObjectID(), s3.RootObjectID()) {
|
||||
t.Errorf("expected s3.RootObjectID!=s2.RootObjectID, got %v", s3.RootObjectID().String())
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&s3.Stats.NonCachedFiles) != 1 {
|
||||
// one file is not cached, which causes "./d2/d1/", "./d2/" and "./" to be changed.
|
||||
t.Errorf("unexpected s3 stats: %+v", s3.Stats)
|
||||
}
|
||||
assert.NotEqual(t, s2.RootObjectID(), s3.RootObjectID(), "expected s3.RootObjectID!=s2.RootObjectID")
|
||||
assert.Equal(t, int32(1), atomic.LoadInt32(&s3.Stats.NonCachedFiles), "unexpected s3 stats:", s3.Stats,
|
||||
"one file is not cached, which causes './d2/d1/', './d2/' and './' to be changed.")
|
||||
|
||||
// Now remove the added file, OID should be identical to the original before the file got added.
|
||||
th.sourceDir.Subdir("d2", "d1").Remove("f3")
|
||||
|
||||
s4, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload failed")
|
||||
|
||||
if !objectIDsEqual(s4.RootObjectID(), s1.RootObjectID()) {
|
||||
t.Errorf("expected s4.RootObjectID==s1.RootObjectID, got %v and %v", s4.RootObjectID(), s1.RootObjectID())
|
||||
}
|
||||
assert.Equal(t, s4.RootObjectID(), s1.RootObjectID(), "expected s4.RootObjectID==s1.RootObjectID")
|
||||
|
||||
// Everything is still cached.
|
||||
if atomic.LoadInt32(&s4.Stats.CachedFiles) != atomic.LoadInt32(&s1.Stats.NonCachedFiles) || atomic.LoadInt32(&s4.Stats.NonCachedFiles) != 0 {
|
||||
t.Errorf("unexpected s4 stats: %+v", s4.Stats)
|
||||
}
|
||||
assert.Equal(t, atomic.LoadInt32(&s4.Stats.CachedFiles), atomic.LoadInt32(&s1.Stats.NonCachedFiles), "unexpected s4 stats:", s4.Stats)
|
||||
assert.Zero(t, atomic.LoadInt32(&s4.Stats.NonCachedFiles), "unexpected s4 stats:", s4.Stats)
|
||||
|
||||
s5, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s3)
|
||||
if err != nil {
|
||||
t.Errorf("upload failed: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload failed")
|
||||
|
||||
if !objectIDsEqual(s4.RootObjectID(), s5.RootObjectID()) {
|
||||
t.Errorf("expected s4.RootObjectID==s5.RootObjectID, got %v and %v", s4.RootObjectID(), s5.RootObjectID())
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&s5.Stats.NonCachedFiles) != 0 {
|
||||
// no files are changed, but one file disappeared which caused "./d2/d1/", "./d2/" and "./" to be changed.
|
||||
t.Errorf("unexpected s5 stats: %+v", s5.Stats)
|
||||
}
|
||||
assert.Equal(t, s4.RootObjectID(), s5.RootObjectID(), "expected s4.RootObjectID==s5.RootObjectID")
|
||||
require.Zero(t, atomic.LoadInt32(&s5.Stats.NonCachedFiles), "unexpected s5 stats:", s5.Stats,
|
||||
"no files are changed, but one file disappeared which caused './d2/d1/', './d2/' and './' to be changed")
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
@@ -266,87 +225,85 @@ func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Reposi
|
||||
|
||||
for _, e := range entries {
|
||||
cid, _, ok := e.objectID.ContentID()
|
||||
require.True(t, ok)
|
||||
if !assert.True(t, ok) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !cid.HasPrefix() {
|
||||
continue
|
||||
}
|
||||
|
||||
info, err := rep.ContentInfo(ctx, cid)
|
||||
if err != nil {
|
||||
t.Errorf("failed to get content info: %v", err)
|
||||
}
|
||||
|
||||
require.Equal(t, comp, info.CompressionHeaderID)
|
||||
require.NoError(t, err, "failed to get content info for %v", cid)
|
||||
assert.Equal(t, comp, info.CompressionHeaderID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadMetadataCompression(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
buildPolicy := func(compressor compression.Name) *policy.Tree {
|
||||
return policy.BuildTree(map[string]*policy.Policy{
|
||||
".": {
|
||||
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
|
||||
CompressorName: compressor,
|
||||
},
|
||||
},
|
||||
}, policy.DefaultPolicy)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
policyTree *policy.Tree
|
||||
compressionID compression.HeaderID
|
||||
}{
|
||||
{
|
||||
name: "default metadata compression",
|
||||
policyTree: policy.BuildTree(nil, policy.DefaultPolicy),
|
||||
compressionID: compression.HeaderZstdFastest,
|
||||
},
|
||||
{
|
||||
name: "disable metadata compression",
|
||||
policyTree: buildPolicy("none"),
|
||||
compressionID: content.NoCompression,
|
||||
},
|
||||
{
|
||||
name: "enable metadata compression",
|
||||
policyTree: buildPolicy("gzip"),
|
||||
compressionID: compression.ByName["gzip"].HeaderID(),
|
||||
},
|
||||
}
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
t.Run("default metadata compression", func(t *testing.T) {
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
defer th.cleanup()
|
||||
u := NewUploader(th.repo)
|
||||
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
||||
|
||||
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
for _, tc := range cases {
|
||||
policyTree := tc.policyTree
|
||||
compID := tc.compressionID
|
||||
|
||||
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
|
||||
entries := findAllEntries(t, ctx, dir)
|
||||
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest)
|
||||
})
|
||||
t.Run("disable metadata compression", func(t *testing.T) {
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
defer th.cleanup()
|
||||
u := NewUploader(th.repo)
|
||||
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
||||
".": {
|
||||
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
|
||||
CompressorName: "none",
|
||||
},
|
||||
},
|
||||
}, policy.DefaultPolicy)
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
t.Cleanup(th.cleanup)
|
||||
u := NewUploader(th.repo)
|
||||
|
||||
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
|
||||
entries := findAllEntries(t, ctx, dir)
|
||||
verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression)
|
||||
})
|
||||
t.Run("set metadata compressor", func(t *testing.T) {
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
defer th.cleanup()
|
||||
u := NewUploader(th.repo)
|
||||
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
||||
".": {
|
||||
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
|
||||
CompressorName: "gzip",
|
||||
},
|
||||
},
|
||||
}, policy.DefaultPolicy)
|
||||
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
require.NoError(t, err, "upload error")
|
||||
|
||||
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
|
||||
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
|
||||
entries := findAllEntries(t, ctx, dir)
|
||||
verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID())
|
||||
})
|
||||
dir := snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory)
|
||||
entries := findAllEntries(t, ctx, dir)
|
||||
verifyMetadataCompressor(t, ctx, th.repo, entries, compID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.FailReaddir(errTest)
|
||||
|
||||
@@ -360,10 +317,12 @@ func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUploadDoesNotReportProgressForIgnoredFilesTwice(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
sourceDir := mockfs.NewDirectory()
|
||||
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
||||
@@ -401,10 +360,12 @@ func TestUploadDoesNotReportProgressForIgnoredFilesTwice(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_SubDirectoryReadFailureFailFast(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
||||
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
||||
@@ -427,10 +388,12 @@ func TestUpload_SubDirectoryReadFailureFailFast(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_SubDirectoryReadFailureIgnoredNoFailFast(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
||||
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
||||
@@ -457,10 +420,12 @@ func TestUpload_SubDirectoryReadFailureIgnoredNoFailFast(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_ErrorEntries(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.Subdir("d1").AddErrorEntry("some-unknown-entry", os.ModeIrregular, fs.ErrUnknown)
|
||||
th.sourceDir.Subdir("d1").AddErrorEntry("some-failed-entry", 0, errors.New("some-other-error"))
|
||||
@@ -538,9 +503,7 @@ func TestUpload_ErrorEntries(t *testing.T) {
|
||||
})
|
||||
|
||||
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
expectedErrors := entryPathToError{
|
||||
"d1/some-failed-entry": errors.New("some-other-error"),
|
||||
@@ -572,10 +535,12 @@ func verifyErrors(t *testing.T, man *snapshot.Manifest, wantFatalErrors, wantIgn
|
||||
}
|
||||
|
||||
func TestUpload_SubDirectoryReadFailureNoFailFast(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
||||
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
||||
@@ -597,10 +562,12 @@ func TestUpload_SubDirectoryReadFailureNoFailFast(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_SubDirectoryReadFailureSomeIgnoredNoFailFast(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
||||
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
||||
@@ -621,9 +588,7 @@ func TestUpload_SubDirectoryReadFailureSomeIgnoredNoFailFast(t *testing.T) {
|
||||
},
|
||||
}, policy.DefaultPolicy)
|
||||
|
||||
if got, want := policyTree.Child("d3").EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false), true; got != want {
|
||||
t.Fatalf("policy not effective")
|
||||
}
|
||||
require.True(t, policyTree.Child("d3").EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false), "policy not effective")
|
||||
|
||||
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
require.NoError(t, err)
|
||||
@@ -647,12 +612,14 @@ func (mp *mockProgress) FinishedFile(relativePath string, err error) {
|
||||
}
|
||||
|
||||
func TestUpload_FinishedFileProgress(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
mu := sync.Mutex{}
|
||||
filesFinished := 0
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("checking FinishedFile callbacks")
|
||||
|
||||
@@ -721,6 +688,8 @@ func TestUpload_FinishedFileProgress(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_SymlinkStats(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
@@ -760,10 +729,12 @@ func TestUpload_SymlinkStats(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUploadWithCheckpointing(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
|
||||
@@ -793,10 +764,7 @@ func TestUploadWithCheckpointing(t *testing.T) {
|
||||
|
||||
// Be paranoid and make a copy of the labels in the uploader so we know stuff
|
||||
// didn't change.
|
||||
u.CheckpointLabels = make(map[string]string, len(labels))
|
||||
for k, v := range labels {
|
||||
u.CheckpointLabels[k] = v
|
||||
}
|
||||
u.CheckpointLabels = maps.Clone(labels)
|
||||
|
||||
// inject a action into mock filesystem to trigger and wait for checkpoints at few places.
|
||||
// the places are not important, what's important that those are 3 separate points in time.
|
||||
@@ -817,32 +785,27 @@ func TestUploadWithCheckpointing(t *testing.T) {
|
||||
}
|
||||
|
||||
s, err := u.Upload(ctx, th.sourceDir, policyTree, si)
|
||||
if err != nil {
|
||||
t.Errorf("Upload error: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload error")
|
||||
|
||||
checkpoints, err := snapshot.ListSnapshots(ctx, th.repo, si)
|
||||
if err != nil {
|
||||
t.Fatalf("error listing snapshots: %v", err)
|
||||
}
|
||||
|
||||
require.NoError(t, err, "error listing snapshots")
|
||||
require.Len(t, checkpoints, len(dirsToCheckpointAt))
|
||||
|
||||
for _, cp := range checkpoints {
|
||||
if got, want := cp.IncompleteReason, IncompleteReasonCheckpoint; got != want {
|
||||
t.Errorf("unexpected incompleteReason %q, want %q", got, want)
|
||||
}
|
||||
|
||||
assert.Equal(t, IncompleteReasonCheckpoint, cp.IncompleteReason, "unexpected incompleteReason")
|
||||
assert.Equal(t, time.Duration(1), s.StartTime.Sub(cp.StartTime))
|
||||
|
||||
assert.Equal(t, labels, cp.Tags)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelUploadUploadsBlobsInParallel(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
u.ParallelUploads = 13
|
||||
|
||||
@@ -894,7 +857,6 @@ func TestParallelUploadUploadsBlobsInParallel(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, th.repo.Flush(ctx))
|
||||
|
||||
require.Positive(t, maxParallelCalls.Load())
|
||||
}
|
||||
|
||||
@@ -906,10 +868,12 @@ func randomBytes(n int64) []byte {
|
||||
}
|
||||
|
||||
func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("Uploading static directory with streaming file")
|
||||
|
||||
@@ -921,13 +885,10 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
|
||||
tmpContent := []byte("Streaming Temporary file content")
|
||||
|
||||
r, w, err := os.Pipe()
|
||||
if err != nil {
|
||||
t.Fatalf("error creating pipe file: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "error creating pipe file")
|
||||
|
||||
if _, err = w.Write(tmpContent); err != nil {
|
||||
t.Fatalf("error writing to pipe file: %v", err)
|
||||
}
|
||||
_, err = w.Write(tmpContent)
|
||||
require.NoError(t, err, "error writing to pipe file")
|
||||
|
||||
w.Close()
|
||||
|
||||
@@ -936,35 +897,24 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
|
||||
})
|
||||
|
||||
man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
||||
if err != nil {
|
||||
t.Fatalf("Upload error: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "upload error")
|
||||
|
||||
if got, want := atomic.LoadInt32(&man.Stats.CachedFiles), int32(0); got != want {
|
||||
t.Fatalf("unexpected manifest cached files: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := atomic.LoadInt32(&man.Stats.NonCachedFiles), int32(1); got != want {
|
||||
// one file is not cached
|
||||
t.Fatalf("unexpected manifest non-cached files: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := atomic.LoadInt32(&man.Stats.TotalDirectoryCount), int32(1); got != want {
|
||||
// must have one directory
|
||||
t.Fatalf("unexpected manifest directory count: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if got, want := atomic.LoadInt32(&man.Stats.TotalFileCount), int32(1); got != want {
|
||||
// must have one file
|
||||
t.Fatalf("unexpected manifest file count: %v, want %v", got, want)
|
||||
}
|
||||
require.Zero(t, atomic.LoadInt32(&man.Stats.CachedFiles), "unexpected manifest cached files")
|
||||
// one file is not cached
|
||||
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles), "unexpected manifest non-cached files")
|
||||
// must have one directory
|
||||
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalDirectoryCount), "unexpected manifest directory count")
|
||||
// must have one file
|
||||
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount), "unexpected manifest file count")
|
||||
}
|
||||
|
||||
func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
|
||||
@@ -992,11 +942,12 @@ func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T)
|
||||
|
||||
// Write out pending data so the below size check compares properly.
|
||||
require.NoError(t, th.repo.Flush(ctx), "flushing repo")
|
||||
|
||||
assert.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(14000))
|
||||
require.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(14000))
|
||||
}
|
||||
|
||||
func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpContent := []byte("Streaming Temporary file content")
|
||||
mt := time.Date(2021, 1, 2, 3, 4, 5, 0, time.UTC)
|
||||
|
||||
@@ -1025,10 +976,12 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
u.ForceHashPercentage = 0
|
||||
@@ -1071,10 +1024,12 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_StreamingDirectory(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("Uploading streaming directory with mock file")
|
||||
|
||||
@@ -1103,10 +1058,12 @@ func TestUpload_StreamingDirectory(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("Uploading streaming directory with some ignored mock files")
|
||||
|
||||
@@ -1180,10 +1137,12 @@ func (w *mockLogger) Sync() error {
|
||||
}
|
||||
|
||||
func TestParallelUploadDedup(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
t.Logf("Uploading static directory with streaming file")
|
||||
|
||||
@@ -1233,10 +1192,12 @@ func TestParallelUploadDedup(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestParallelUploadOfLargeFiles(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
u.ParallelUploads = 10
|
||||
@@ -1374,6 +1335,8 @@ type loggedAction struct {
|
||||
|
||||
//nolint:maintidx
|
||||
func TestUploadLogging(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sourceDir := mockfs.NewDirectory()
|
||||
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
||||
sourceDir.AddFile("f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
||||
@@ -1628,6 +1591,8 @@ func TestUploadLogging(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ml := &mockLogger{}
|
||||
|
||||
logUploader := zap.New(
|
||||
@@ -1663,7 +1628,7 @@ func TestUploadLogging(t *testing.T) {
|
||||
})
|
||||
th := newUploadTestHarness(ctx, t)
|
||||
|
||||
defer th.cleanup()
|
||||
t.Cleanup(th.cleanup)
|
||||
|
||||
u := NewUploader(th.repo)
|
||||
|
||||
@@ -1738,7 +1703,3 @@ func verifyLogDetails(t *testing.T, desc string, wantDetailKeys []string, keysAn
|
||||
sort.Strings(wantDetailKeys)
|
||||
require.Equal(t, wantDetailKeys, gotDetailKeys, "invalid details for "+desc)
|
||||
}
|
||||
|
||||
func objectIDsEqual(o1, o2 object.ID) bool {
|
||||
return reflect.DeepEqual(o1, o2)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user