mirror of
https://github.com/kopia/kopia.git
synced 2026-05-10 07:44:01 -04:00
Cleanup nits: - get error handling policy upfront and improve readability in uploader - update error message - update field documentation and update flag description - remove unused function - const `isWindows` and remove redundant condition check - add `getEnvVarBool` helper - refactor common helper for mockfs.AddError* functions, and add mockfs.AddErrorEntry<Type> wrappers for clarity. - removed list of skipped tests from gotestsum summary
1822 lines
53 KiB
Go
1822 lines
53 KiB
Go
package upload
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"maps"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
|
|
"github.com/kopia/kopia/fs"
|
|
"github.com/kopia/kopia/fs/localfs"
|
|
"github.com/kopia/kopia/fs/virtualfs"
|
|
"github.com/kopia/kopia/internal/blobtesting"
|
|
"github.com/kopia/kopia/internal/clock"
|
|
"github.com/kopia/kopia/internal/faketime"
|
|
"github.com/kopia/kopia/internal/mockfs"
|
|
"github.com/kopia/kopia/internal/repotesting"
|
|
"github.com/kopia/kopia/internal/testlogging"
|
|
"github.com/kopia/kopia/internal/testutil"
|
|
"github.com/kopia/kopia/repo"
|
|
"github.com/kopia/kopia/repo/blob/filesystem"
|
|
bloblogging "github.com/kopia/kopia/repo/blob/logging"
|
|
"github.com/kopia/kopia/repo/compression"
|
|
"github.com/kopia/kopia/repo/content"
|
|
"github.com/kopia/kopia/repo/logging"
|
|
"github.com/kopia/kopia/repo/object"
|
|
"github.com/kopia/kopia/snapshot"
|
|
"github.com/kopia/kopia/snapshot/policy"
|
|
"github.com/kopia/kopia/snapshot/snapshotfs"
|
|
)
|
|
|
|
// masterPassword is the repository password the test harness passes to
// repo.Initialize, repo.Connect and repo.Open.
const masterPassword = "foofoofoofoofoofoofoofoo"

// defaultPermissions is the permission mode given to the mock files and
// directories created by the tests.
const defaultPermissions = 0o777
|
|
|
|
type uploadTestHarness struct {
|
|
sourceDir *mockfs.Directory
|
|
repoDir string
|
|
repo repo.RepositoryWriter
|
|
ft *faketime.TimeAdvance
|
|
faulty *blobtesting.FaultyStorage
|
|
}
|
|
|
|
// errTest is the sentinel error injected into mock directories (via
// FailReaddir) and matched against the errors reported by uploads.
var errTest = errors.New("test error")
|
|
|
|
// entryPathToError maps a failed entry's path (as reported in
// DirSummary.FailedEntries) to the error whose message is expected for it.
type entryPathToError = map[string]error
|
|
|
|
func (th *uploadTestHarness) cleanup() {
|
|
os.RemoveAll(th.repoDir)
|
|
}
|
|
|
|
func newUploadTestHarness(ctx context.Context, t *testing.T) *uploadTestHarness {
|
|
t.Helper()
|
|
|
|
repoDir := testutil.TempDirectory(t)
|
|
|
|
storage, err := filesystem.New(ctx, &filesystem.Options{
|
|
Path: repoDir,
|
|
}, true)
|
|
require.NoError(t, err, "cannot create storage directory")
|
|
|
|
faulty := blobtesting.NewFaultyStorage(storage)
|
|
logged := bloblogging.NewWrapper(faulty, testlogging.Printf(t.Logf, "{STORAGE} "), nil, "")
|
|
rec := repotesting.NewReconnectableStorage(t, logged)
|
|
|
|
err = repo.Initialize(ctx, rec, &repo.NewRepositoryOptions{}, masterPassword)
|
|
require.NoError(t, err, "unable to create repository")
|
|
|
|
t.Logf("repo dir: %v", repoDir)
|
|
|
|
configFile := filepath.Join(repoDir, ".kopia.config")
|
|
err = repo.Connect(ctx, configFile, rec, masterPassword, nil)
|
|
require.NoError(t, err, "unable to connect to repository")
|
|
|
|
ft := faketime.NewTimeAdvance(time.Date(2018, time.February, 6, 0, 0, 0, 0, time.UTC))
|
|
|
|
rep, err := repo.Open(ctx, configFile, masterPassword, &repo.Options{
|
|
TimeNowFunc: ft.NowFunc(),
|
|
})
|
|
require.NoError(t, err, "unable to open repository")
|
|
|
|
sourceDir := mockfs.NewDirectory()
|
|
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("f3", []byte{1, 2, 3, 4, 5}, defaultPermissions)
|
|
|
|
sourceDir.AddDir("d1", defaultPermissions)
|
|
sourceDir.AddDir("d1/d1", defaultPermissions)
|
|
sourceDir.AddDir("d1/d2", defaultPermissions)
|
|
sourceDir.AddDir("d2", defaultPermissions)
|
|
sourceDir.AddDir("d2/d1", defaultPermissions)
|
|
|
|
// Prepare directory contents.
|
|
sourceDir.AddFile("d1/d1/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("d1/d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("d1/d2/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("d1/d2/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("d2/d1/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("d2/d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
|
|
_, w, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "test"})
|
|
require.NoError(t, err, "writer creation error")
|
|
|
|
th := &uploadTestHarness{
|
|
sourceDir: sourceDir,
|
|
repoDir: repoDir,
|
|
repo: w,
|
|
ft: ft,
|
|
faulty: faulty,
|
|
}
|
|
|
|
return th
|
|
}
|
|
|
|
//nolint:gocyclo
|
|
func TestUpload(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("Uploading s1")
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err, "upload error")
|
|
|
|
t.Logf("s1: %v", s1.RootEntry)
|
|
t.Logf("Uploading s2")
|
|
|
|
s2, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
|
require.NoError(t, err, "upload error")
|
|
|
|
assert.Equal(t, s1.RootObjectID(), s2.RootObjectID(), "root object ids do not match")
|
|
assert.Zero(t, atomic.LoadInt32(&s1.Stats.CachedFiles), "unexpected s1 cached files")
|
|
assert.Equal(t, atomic.LoadInt32(&s1.Stats.NonCachedFiles), atomic.LoadInt32(&s2.Stats.CachedFiles),
|
|
"unexpected s2 cached files: all non-cached files from s1 are now cached and there are no non-cached files since nothing changed")
|
|
assert.Zero(t, atomic.LoadInt32(&s2.Stats.NonCachedFiles), "unexpected non-cached files")
|
|
|
|
// Add one more file, the s1.RootObjectID should change.
|
|
th.sourceDir.AddFile("d2/d1/f3", []byte{1, 2, 3, 4, 5}, defaultPermissions)
|
|
|
|
s3, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
|
require.NoError(t, err, "upload failed")
|
|
|
|
assert.NotEqual(t, s2.RootObjectID(), s3.RootObjectID(), "expected s3.RootObjectID!=s2.RootObjectID")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&s3.Stats.NonCachedFiles), "unexpected s3 stats:", s3.Stats,
|
|
"one file is not cached, which causes './d2/d1/', './d2/' and './' to be changed.")
|
|
|
|
// Now remove the added file, OID should be identical to the original before the file got added.
|
|
th.sourceDir.Subdir("d2", "d1").Remove("f3")
|
|
|
|
s4, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s1)
|
|
require.NoError(t, err, "upload failed")
|
|
|
|
assert.Equal(t, s4.RootObjectID(), s1.RootObjectID(), "expected s4.RootObjectID==s1.RootObjectID")
|
|
|
|
// Everything is still cached.
|
|
assert.Equal(t, atomic.LoadInt32(&s4.Stats.CachedFiles), atomic.LoadInt32(&s1.Stats.NonCachedFiles), "unexpected s4 stats:", s4.Stats)
|
|
assert.Zero(t, atomic.LoadInt32(&s4.Stats.NonCachedFiles), "unexpected s4 stats:", s4.Stats)
|
|
|
|
s5, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}, s3)
|
|
require.NoError(t, err, "upload failed")
|
|
|
|
assert.Equal(t, s4.RootObjectID(), s5.RootObjectID(), "expected s4.RootObjectID==s5.RootObjectID")
|
|
require.Zero(t, atomic.LoadInt32(&s5.Stats.NonCachedFiles), "unexpected s5 stats:", s5.Stats,
|
|
"no files are changed, but one file disappeared which caused './d2/d1/', './d2/' and './' to be changed")
|
|
}
|
|
|
|
type entry struct {
|
|
name string
|
|
objectID object.ID
|
|
}
|
|
|
|
// findAllEntries recursively iterates over all the dirs and returns list of file entries.
|
|
func findAllEntries(t *testing.T, ctx context.Context, dir fs.Directory) []entry {
|
|
t.Helper()
|
|
|
|
entries := []entry{}
|
|
|
|
fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
|
oid, err := object.ParseID(testutil.EnsureType[object.HasObjectID](t, e).ObjectID().String())
|
|
require.NoError(t, err)
|
|
|
|
entries = append(entries, entry{
|
|
name: e.Name(),
|
|
objectID: oid,
|
|
})
|
|
if e.IsDir() {
|
|
entries = append(entries, findAllEntries(t, ctx, testutil.EnsureType[fs.Directory](t, e))...)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return entries
|
|
}
|
|
|
|
func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Repository, entries []entry, comp compression.HeaderID) {
|
|
t.Helper()
|
|
|
|
for _, e := range entries {
|
|
cid, _, ok := e.objectID.ContentID()
|
|
if !assert.True(t, ok) {
|
|
continue
|
|
}
|
|
|
|
if !cid.HasPrefix() {
|
|
continue
|
|
}
|
|
|
|
info, err := rep.ContentInfo(ctx, cid)
|
|
require.NoError(t, err, "failed to get content info for %v", cid)
|
|
assert.Equal(t, comp, info.CompressionHeaderID)
|
|
}
|
|
}
|
|
|
|
func TestUploadMetadataCompression(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
buildPolicy := func(compressor compression.Name) *policy.Tree {
|
|
return policy.BuildTree(map[string]*policy.Policy{
|
|
".": {
|
|
MetadataCompressionPolicy: policy.MetadataCompressionPolicy{
|
|
CompressorName: compressor,
|
|
},
|
|
},
|
|
}, policy.DefaultPolicy)
|
|
}
|
|
|
|
cases := []struct {
|
|
name string
|
|
policyTree *policy.Tree
|
|
compressionID compression.HeaderID
|
|
}{
|
|
{
|
|
name: "default metadata compression",
|
|
policyTree: policy.BuildTree(nil, policy.DefaultPolicy),
|
|
compressionID: compression.HeaderZstdFastest,
|
|
},
|
|
{
|
|
name: "disable metadata compression",
|
|
policyTree: buildPolicy("none"),
|
|
compressionID: content.NoCompression,
|
|
},
|
|
{
|
|
name: "enable metadata compression",
|
|
policyTree: buildPolicy("gzip"),
|
|
compressionID: compression.ByName["gzip"].HeaderID(),
|
|
},
|
|
}
|
|
|
|
ctx := testlogging.Context(t)
|
|
|
|
for _, tc := range cases {
|
|
policyTree := tc.policyTree
|
|
compID := tc.compressionID
|
|
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
th := newUploadTestHarness(ctx, t)
|
|
t.Cleanup(th.cleanup)
|
|
u := NewUploader(th.repo)
|
|
|
|
s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err, "upload error")
|
|
|
|
dir := testutil.EnsureType[fs.Directory](t, snapshotfs.EntryFromDirEntry(th.repo, s1.RootEntry))
|
|
entries := findAllEntries(t, ctx, dir)
|
|
verifyMetadataCompressor(t, ctx, th.repo, entries, compID)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.FailReaddir(errTest)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
s, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.ErrorIs(t, err, errTest)
|
|
require.Nil(t, s)
|
|
}
|
|
|
|
func TestUploadDoesNotReportProgressForIgnoredFilesTwice(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
sourceDir := mockfs.NewDirectory()
|
|
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("f3", []byte{1, 2, 3, 4, 5}, defaultPermissions)
|
|
|
|
sourceDir.AddDir("d1", defaultPermissions)
|
|
sourceDir.AddFile("d1/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
|
|
sourceDir.AddDir("d2", defaultPermissions)
|
|
sourceDir.AddFile("d2/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("d2/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
|
|
u := NewUploader(th.repo)
|
|
cup := &CountingUploadProgress{}
|
|
u.Progress = cup
|
|
u.OverrideEntryLogDetail = policy.NewLogDetail(10)
|
|
u.OverrideDirLogDetail = policy.NewLogDetail(10)
|
|
|
|
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
|
".": {
|
|
FilesPolicy: policy.FilesPolicy{
|
|
IgnoreRules: []string{"d2", "f2"},
|
|
},
|
|
},
|
|
}, policy.DefaultPolicy)
|
|
|
|
_, err := u.Upload(ctx, sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
// make sure ignored counter is only incremented by 1, even though we process each directory twice
|
|
// - once during estimation and once during upload.
|
|
require.EqualValues(t, 1, cup.counters.TotalExcludedFiles)
|
|
require.EqualValues(t, 1, cup.counters.TotalExcludedDirs)
|
|
}
|
|
|
|
func TestUpload_SubDirectoryReadFailureFailFast(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
|
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ParallelUploads = 1
|
|
u.FailFast = true
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
require.NotEmpty(t, man.IncompleteReason, "snapshot not marked as incomplete")
|
|
|
|
// will have one error because we're canceling early.
|
|
verifyErrors(t, man, 1, 0, entryPathToError{
|
|
"d1": errTest,
|
|
})
|
|
}
|
|
|
|
func TestUpload_SubDirectoryReadFailureIgnoredNoFailFast(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
|
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
trueValue := policy.OptionalBool(true)
|
|
|
|
policyTree := policy.BuildTree(nil, &policy.Policy{
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &trueValue,
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
},
|
|
})
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
// 0 failed, 2 ignored
|
|
verifyErrors(t, man, 0, 2, entryPathToError{
|
|
"d1": errTest,
|
|
"d2/d1": errTest,
|
|
})
|
|
}
|
|
|
|
func TestUpload_ErrorEntries(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.Subdir("d1").AddErrorEntryIrregular("some-unknown-entry", 0, fs.ErrUnknown)
|
|
th.sourceDir.Subdir("d1").AddErrorEntryFile("some-failed-entry", 0, errors.New("some-other-error"))
|
|
th.sourceDir.Subdir("d2").AddErrorEntryIrregular("another-failed-entry", 0, errors.New("another-error"))
|
|
|
|
trueValue := policy.OptionalBool(true)
|
|
falseValue := policy.OptionalBool(false)
|
|
|
|
cases := []struct {
|
|
desc string
|
|
rootEntry fs.Entry
|
|
ehp policy.ErrorHandlingPolicy
|
|
wantFatalErrors int
|
|
wantIgnoredErrors int
|
|
}{
|
|
{
|
|
desc: "default ignore rules",
|
|
rootEntry: th.sourceDir,
|
|
ehp: policy.ErrorHandlingPolicy{},
|
|
wantFatalErrors: 2,
|
|
wantIgnoredErrors: 0, // unknown entries are completely skipped when IgnoreUnknownTypes is true (default)
|
|
},
|
|
{
|
|
desc: "ignore both unknown types and other errors",
|
|
rootEntry: th.sourceDir,
|
|
ehp: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &trueValue,
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
IgnoreUnknownTypes: &trueValue,
|
|
},
|
|
wantFatalErrors: 0,
|
|
wantIgnoredErrors: 2, // only the two non-unknown errors are counted as ignored
|
|
},
|
|
{
|
|
desc: "ignore no errors",
|
|
rootEntry: th.sourceDir,
|
|
ehp: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &falseValue,
|
|
IgnoreDirectoryErrors: &falseValue,
|
|
IgnoreUnknownTypes: &falseValue,
|
|
},
|
|
wantFatalErrors: 3,
|
|
wantIgnoredErrors: 0,
|
|
},
|
|
{
|
|
desc: "ignore unknown type errors",
|
|
rootEntry: th.sourceDir,
|
|
ehp: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &falseValue,
|
|
IgnoreDirectoryErrors: &falseValue,
|
|
IgnoreUnknownTypes: &trueValue,
|
|
},
|
|
wantFatalErrors: 2,
|
|
wantIgnoredErrors: 0, // unknown entries are completely skipped when IgnoreUnknownTypes is true
|
|
},
|
|
{
|
|
desc: "ignore errors except unknown type errors",
|
|
rootEntry: th.sourceDir,
|
|
ehp: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &trueValue,
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
IgnoreUnknownTypes: &falseValue,
|
|
},
|
|
wantFatalErrors: 1,
|
|
wantIgnoredErrors: 2,
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, &policy.Policy{
|
|
ErrorHandlingPolicy: tc.ehp,
|
|
})
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
expectedErrors := entryPathToError{
|
|
"d1/some-failed-entry": errors.New("some-other-error"),
|
|
"d2/another-failed-entry": errors.New("another-error"),
|
|
}
|
|
|
|
// Only expect unknown entry in failed entries if IgnoreUnknownTypes is false
|
|
if tc.ehp.IgnoreUnknownTypes != nil && !tc.ehp.IgnoreUnknownTypes.OrDefault(true) {
|
|
expectedErrors["d1/some-unknown-entry"] = errors.New("unknown or unsupported entry type")
|
|
}
|
|
|
|
verifyErrors(t, man, tc.wantFatalErrors, tc.wantIgnoredErrors, expectedErrors)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpload_ErrorEntryChildPolicy(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
// Add a dir-typed error entry, a file-typed error entry, and an unknown-typed error entry under d1.
|
|
th.sourceDir.Subdir("d1").AddErrorEntryDir("dir-err", 0, errors.New("dir-error"))
|
|
th.sourceDir.Subdir("d1").AddErrorEntryFile("file-err", 0, errors.New("file-error"))
|
|
th.sourceDir.Subdir("d1").AddErrorEntryIrregular("unknown-err", 0, fs.ErrUnknown)
|
|
|
|
trueValue := policy.OptionalBool(true)
|
|
falseValue := policy.OptionalBool(false)
|
|
|
|
cases := []struct {
|
|
desc string
|
|
defined map[string]*policy.Policy
|
|
defaultPolicy *policy.Policy
|
|
wantFatalErrors int
|
|
wantIgnoredErrors int
|
|
wantErrors entryPathToError
|
|
}{
|
|
{
|
|
desc: "child policy ignores dir errors only",
|
|
defined: map[string]*policy.Policy{
|
|
"./d1/dir-err": {
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
IgnoreFileErrors: &falseValue,
|
|
},
|
|
},
|
|
},
|
|
defaultPolicy: &policy.Policy{
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreDirectoryErrors: &falseValue,
|
|
IgnoreFileErrors: &falseValue,
|
|
},
|
|
},
|
|
wantFatalErrors: 1, // file-err is fatal (uses default policy)
|
|
wantIgnoredErrors: 1, // dir-err is ignored (uses child policy)
|
|
// unknown-err is silently skipped (IgnoreUnknownTypes defaults to true)
|
|
wantErrors: entryPathToError{
|
|
"d1/dir-err": errors.New("dir-error"),
|
|
"d1/file-err": errors.New("file-error"),
|
|
},
|
|
},
|
|
{
|
|
desc: "child policy ignores file errors only",
|
|
defined: map[string]*policy.Policy{
|
|
"./d1/file-err": {
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreDirectoryErrors: &falseValue,
|
|
IgnoreFileErrors: &trueValue,
|
|
},
|
|
},
|
|
},
|
|
defaultPolicy: &policy.Policy{
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreDirectoryErrors: &falseValue,
|
|
IgnoreFileErrors: &falseValue,
|
|
},
|
|
},
|
|
wantFatalErrors: 1, // dir-err is fatal (uses default policy)
|
|
wantIgnoredErrors: 1, // file-err is ignored (uses child policy)
|
|
// unknown-err is silently skipped (IgnoreUnknownTypes defaults to true)
|
|
wantErrors: entryPathToError{
|
|
"d1/dir-err": errors.New("dir-error"),
|
|
"d1/file-err": errors.New("file-error"),
|
|
},
|
|
},
|
|
{
|
|
desc: "child policy disables unknown type ignore",
|
|
defined: map[string]*policy.Policy{
|
|
"./d1/unknown-err": {
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreUnknownTypes: &falseValue,
|
|
},
|
|
},
|
|
},
|
|
defaultPolicy: &policy.Policy{
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
IgnoreFileErrors: &trueValue,
|
|
},
|
|
},
|
|
wantFatalErrors: 1, // unknown-err is fatal (child policy overrides default)
|
|
wantIgnoredErrors: 2, // dir-err and file-err are ignored (default policy)
|
|
wantErrors: entryPathToError{
|
|
"d1/dir-err": errors.New("dir-error"),
|
|
"d1/file-err": errors.New("file-error"),
|
|
"d1/unknown-err": fs.ErrUnknown,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(tc.defined, tc.defaultPolicy)
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
verifyErrors(t, man, tc.wantFatalErrors, tc.wantIgnoredErrors, tc.wantErrors)
|
|
})
|
|
}
|
|
}
|
|
|
|
func verifyErrors(t *testing.T, man *snapshot.Manifest, wantFatalErrors, wantIgnoredErrors int, wantErrors entryPathToError) {
|
|
t.Helper()
|
|
|
|
require.Equal(t, wantFatalErrors, man.RootEntry.DirSummary.FatalErrorCount, "invalid number of fatal errors")
|
|
require.Equal(t, wantIgnoredErrors, man.RootEntry.DirSummary.IgnoredErrorCount, "invalid number of ignored errors")
|
|
|
|
failedEntries := man.RootEntry.DirSummary.FailedEntries
|
|
for _, failedEntry := range failedEntries {
|
|
wantErr, ok := wantErrors[failedEntry.EntryPath]
|
|
require.True(t, ok, "expected error for entry path not found: %s", failedEntry.EntryPath)
|
|
require.Contains(t, failedEntry.Error, wantErr.Error())
|
|
}
|
|
}
|
|
|
|
func TestUpload_SubDirectoryReadFailureNoFailFast(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
|
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
// make sure we have 2 errors
|
|
require.Equal(t, 2, man.RootEntry.DirSummary.FatalErrorCount)
|
|
|
|
verifyErrors(t, man, 2, 0, entryPathToError{
|
|
"d1": errTest,
|
|
"d2/d1": errTest,
|
|
})
|
|
}
|
|
|
|
func TestUpload_SubDirectoryReadFailureSomeIgnoredNoFailFast(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
th.sourceDir.Subdir("d1").FailReaddir(errTest)
|
|
th.sourceDir.Subdir("d2").Subdir("d1").FailReaddir(errTest)
|
|
th.sourceDir.AddDir("d3", defaultPermissions)
|
|
th.sourceDir.Subdir("d3").FailReaddir(errTest)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
trueValue := policy.OptionalBool(true)
|
|
|
|
// set up a policy tree where errors from d3 are ignored.
|
|
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
|
"./d3": {
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &trueValue,
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
},
|
|
},
|
|
}, policy.DefaultPolicy)
|
|
|
|
require.True(t, policyTree.Child("d3").EffectivePolicy().ErrorHandlingPolicy.IgnoreDirectoryErrors.OrDefault(false), "policy not effective")
|
|
|
|
man, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
verifyErrors(t, man, 2, 1, entryPathToError{
|
|
"d1": errTest,
|
|
"d2/d1": errTest,
|
|
"d3": errTest,
|
|
})
|
|
}
|
|
|
|
type mockProgress struct {
|
|
Progress
|
|
finishedFileCheck func(string, error)
|
|
}
|
|
|
|
func (mp *mockProgress) FinishedFile(relativePath string, err error) {
|
|
defer mp.Progress.FinishedFile(relativePath, err)
|
|
|
|
mp.finishedFileCheck(relativePath, err)
|
|
}
|
|
|
|
func TestUpload_FinishedFileProgress(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
mu := sync.Mutex{}
|
|
filesFinished := 0
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("checking FinishedFile callbacks")
|
|
|
|
root := mockfs.NewDirectory()
|
|
root.AddFile("f1", []byte{'1', '2', '3'}, 0o777)
|
|
root.AddFileWithSource("f2", 0o777, func() (mockfs.ReaderSeekerCloser, error) {
|
|
return nil, assert.AnError
|
|
})
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ForceHashPercentage = 0
|
|
u.Progress = &mockProgress{
|
|
Progress: u.Progress,
|
|
finishedFileCheck: func(relativePath string, err error) {
|
|
defer func() {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
filesFinished++
|
|
}()
|
|
|
|
assert.Contains(t, []string{"f1", "f2"}, filepath.Base(relativePath))
|
|
|
|
if strings.Contains(relativePath, "f2") {
|
|
require.Error(t, err)
|
|
return
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
},
|
|
}
|
|
|
|
trueValue := policy.OptionalBool(true)
|
|
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
|
".": {
|
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
|
IgnoreFileErrors: &trueValue,
|
|
IgnoreDirectoryErrors: &trueValue,
|
|
},
|
|
},
|
|
}, policy.DefaultPolicy)
|
|
|
|
man, err := u.Upload(ctx, root, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.ErrorCount), "ErrorCount")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.IgnoredErrorCount), "IgnoredErrorCount")
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles), "CachedFiles")
|
|
assert.Equal(t, int32(2), atomic.LoadInt32(&man.Stats.NonCachedFiles), "NonCachedFiles")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalDirectoryCount), "TotalDirectoryCount")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount), "TotalFileCount")
|
|
assert.Equal(t, 2, filesFinished, "FinishedFile calls")
|
|
|
|
// Upload a second time to check for cached files.
|
|
filesFinished = 0
|
|
man, err = u.Upload(ctx, root, policyTree, snapshot.SourceInfo{}, man)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.ErrorCount), "ErrorCount")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.IgnoredErrorCount), "IgnoredErrorCount")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.CachedFiles), "CachedFiles")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles), "NonCachedFiles")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalDirectoryCount), "TotalDirectoryCount")
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.TotalFileCount), "TotalFileCount")
|
|
assert.Equal(t, 2, filesFinished, "FinishedFile calls")
|
|
}
|
|
|
|
func TestUpload_SymlinkStats(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
root := mockfs.NewDirectory()
|
|
root.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
|
root.AddDir("d1", defaultPermissions)
|
|
root.AddDir("d1/d1", defaultPermissions)
|
|
root.AddFile("d1/d1/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
root.AddSymlink("s1", "d1/d1/f1", defaultPermissions)
|
|
root.AddSymlink("s2", "f1", defaultPermissions)
|
|
root.AddSymlink("s3", "d1", defaultPermissions)
|
|
|
|
u := NewUploader(th.repo)
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
// First upload of the root directory.
|
|
man1, err := u.Upload(ctx, root, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
// Expect the directory summary to have the correct breakdown of files and symlinks.
|
|
require.Equal(t, int64(3), man1.RootEntry.DirSummary.TotalSymlinkCount, "Directory summary TotalSymlinkCount")
|
|
require.Equal(t, int64(2), man1.RootEntry.DirSummary.TotalFileCount, "Directory summary TotalSymlinkCount")
|
|
|
|
// Expect the directory summary total file size to match the stats total file size.
|
|
require.Equal(t, atomic.LoadInt64(&man1.Stats.TotalFileSize), man1.RootEntry.DirSummary.TotalFileSize, "Total file size")
|
|
|
|
// Upload a second time to check the stats from cached files.
|
|
man2, err := u.Upload(ctx, root, policyTree, snapshot.SourceInfo{}, man1)
|
|
require.NoError(t, err)
|
|
|
|
// Expect total file count for the second upload to be zero - all files are cached.
|
|
require.Equal(t, int32(0), atomic.LoadInt32(&man2.Stats.TotalFileCount), "Directory summary TotalFileCount")
|
|
|
|
// Expect the directory summary to have the correct breakdown of files and symlinks.
|
|
require.Equal(t, int64(3), man2.RootEntry.DirSummary.TotalSymlinkCount, "Directory summary TotalSymlinkCount")
|
|
require.Equal(t, int64(2), man2.RootEntry.DirSummary.TotalFileCount, "Directory summary TotalSymlinkCount")
|
|
}
|
|
|
|
func TestUploadWithCheckpointing(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
fakeTicker := make(chan time.Time)
|
|
|
|
// inject fake ticker that we can control externally instead of through time passage.
|
|
u.getTicker = func(d time.Duration) <-chan time.Time {
|
|
return fakeTicker
|
|
}
|
|
|
|
// create a channel that will be sent to whenever checkpoint completes.
|
|
u.checkpointFinished = make(chan struct{})
|
|
u.disableEstimation = true
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
si := snapshot.SourceInfo{
|
|
UserName: "user",
|
|
Host: "host",
|
|
Path: "path",
|
|
}
|
|
|
|
labels := map[string]string{
|
|
"shape": "square",
|
|
"color": "red",
|
|
}
|
|
|
|
// Be paranoid and make a copy of the labels in the uploader so we know stuff
|
|
// didn't change.
|
|
u.CheckpointLabels = maps.Clone(labels)
|
|
|
|
// inject a action into mock filesystem to trigger and wait for checkpoints at few places.
|
|
// the places are not important, what's important that those are 3 separate points in time.
|
|
dirsToCheckpointAt := []*mockfs.Directory{
|
|
th.sourceDir.Subdir("d1"),
|
|
th.sourceDir.Subdir("d2"),
|
|
th.sourceDir.Subdir("d1").Subdir("d2"),
|
|
}
|
|
|
|
for _, d := range dirsToCheckpointAt {
|
|
d.OnReaddir(func() {
|
|
t.Logf("onReadDir %v %s", d.Name(), debug.Stack())
|
|
// trigger checkpoint
|
|
fakeTicker <- clock.Now()
|
|
// wait for checkpoint
|
|
<-u.checkpointFinished
|
|
})
|
|
}
|
|
|
|
s, err := u.Upload(ctx, th.sourceDir, policyTree, si)
|
|
require.NoError(t, err, "upload error")
|
|
|
|
checkpoints, err := snapshot.ListSnapshots(ctx, th.repo, si)
|
|
require.NoError(t, err, "error listing snapshots")
|
|
require.Len(t, checkpoints, len(dirsToCheckpointAt))
|
|
|
|
for _, cp := range checkpoints {
|
|
assert.Equal(t, IncompleteReasonCheckpoint, cp.IncompleteReason, "unexpected incompleteReason")
|
|
assert.Equal(t, s.StartTime, cp.StartTime, "checkpoint start time is expected to match snapshot time")
|
|
assert.Falsef(t, s.EndTime.Before(cp.EndTime), "snapshot end time (%s) is before checkpoint end time (%s), snapshot end time should be equal or after checkpoint time ", s.EndTime.Format(time.RFC3339Nano), cp.EndTime.Format(time.RFC3339Nano))
|
|
assert.Equal(t, labels, cp.Tags)
|
|
}
|
|
}
|
|
|
|
func TestParallelUploadUploadsBlobsInParallel(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ParallelUploads = 13
|
|
|
|
// no faults for first blob write - session marker.
|
|
th.faulty.AddFault(blobtesting.MethodPutBlob)
|
|
|
|
var currentParallelCalls, maxParallelCalls atomic.Int32
|
|
|
|
// measure concurrency of PutBlob calls
|
|
th.faulty.AddFault(blobtesting.MethodPutBlob).Repeat(10).Before(func() {
|
|
v := currentParallelCalls.Add(1)
|
|
maxParallelism := maxParallelCalls.Load()
|
|
|
|
if v > maxParallelism {
|
|
maxParallelCalls.CompareAndSwap(maxParallelism, v)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
currentParallelCalls.Add(-1)
|
|
})
|
|
|
|
// create a channel that will be sent to whenever checkpoint completes.
|
|
u.checkpointFinished = make(chan struct{})
|
|
u.disableEstimation = true
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
require.Equal(t, 13, u.effectiveParallelFileReads(policyTree.EffectivePolicy()))
|
|
|
|
si := snapshot.SourceInfo{
|
|
UserName: "user",
|
|
Host: "host",
|
|
Path: "path",
|
|
}
|
|
|
|
// add a bunch of very large files which can be hashed in parallel and will trigger parallel
|
|
// uploads
|
|
th.sourceDir.AddFile("d1/large1", randomBytes(1e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d1/large2", randomBytes(2e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d1/large3", randomBytes(2e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d1/large4", randomBytes(1e7), defaultPermissions)
|
|
|
|
th.sourceDir.AddFile("d2/large1", randomBytes(1e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d2/large2", randomBytes(1e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d2/large3", randomBytes(1e7), defaultPermissions)
|
|
th.sourceDir.AddFile("d2/large4", randomBytes(1e7), defaultPermissions)
|
|
|
|
_, err := u.Upload(ctx, th.sourceDir, policyTree, si)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, th.repo.Flush(ctx))
|
|
require.Positive(t, maxParallelCalls.Load())
|
|
}
|
|
|
|
// randomBytes returns a newly allocated slice of n bytes filled from crypto/rand.
// It panics if the random source reports an error, so callers never receive a
// buffer that was not filled.
func randomBytes(n int64) []byte {
	b := make([]byte, n)

	if _, err := rand.Read(b); err != nil {
		panic("randomBytes: reading random data: " + err.Error())
	}

	return b
}
|
|
|
|
func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("Uploading static directory with streaming file")
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
// Create a temporary pipe file with test data
|
|
tmpContent := []byte("Streaming Temporary file content")
|
|
|
|
r, w, err := os.Pipe()
|
|
require.NoError(t, err, "error creating pipe file")
|
|
|
|
_, err = w.Write(tmpContent)
|
|
require.NoError(t, err, "error writing to pipe file")
|
|
|
|
w.Close()
|
|
|
|
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
virtualfs.StreamingFileFromReader("stream-file", r),
|
|
})
|
|
|
|
man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err, "upload error")
|
|
|
|
require.Zero(t, atomic.LoadInt32(&man.Stats.CachedFiles), "unexpected manifest cached files")
|
|
// one file is not cached
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles), "unexpected manifest non-cached files")
|
|
// must have one directory
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalDirectoryCount), "unexpected manifest directory count")
|
|
// must have one file
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount), "unexpected manifest file count")
|
|
}
|
|
|
|
func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
pol := *policy.DefaultPolicy
|
|
pol.CompressionPolicy.CompressorName = "pgzip"
|
|
|
|
policyTree := policy.BuildTree(nil, &pol)
|
|
|
|
// Create a temporary file with test data. Want something compressible but
|
|
// small so we don't trigger dedupe.
|
|
tmpContent := []byte(strings.Repeat("a", 4096))
|
|
r := io.NopCloser(bytes.NewReader(tmpContent))
|
|
|
|
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
virtualfs.StreamingFileFromReader("stream-file", r),
|
|
})
|
|
|
|
man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles), "cached file count")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles), "non-cached file count")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalDirectoryCount), "directory count")
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount), "total file count")
|
|
|
|
// Write out pending data so the below size check compares properly.
|
|
require.NoError(t, th.repo.Flush(ctx), "flushing repo")
|
|
require.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(14000))
|
|
}
|
|
|
|
func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
tmpContent := []byte("Streaming Temporary file content")
|
|
mt := time.Date(2021, 1, 2, 3, 4, 5, 0, time.UTC)
|
|
|
|
cases := []struct {
|
|
desc string
|
|
getFile func() fs.StreamingFile
|
|
cachedFiles int32
|
|
uploadedFiles int32
|
|
}{
|
|
{
|
|
desc: "CurrentTime",
|
|
getFile: func() fs.StreamingFile {
|
|
return virtualfs.StreamingFileFromReader("a", io.NopCloser(bytes.NewReader(tmpContent)))
|
|
},
|
|
cachedFiles: 0,
|
|
uploadedFiles: 1,
|
|
},
|
|
{
|
|
desc: "FixedTime",
|
|
getFile: func() fs.StreamingFile {
|
|
return virtualfs.StreamingFileWithModTimeFromReader("a", mt, io.NopCloser(bytes.NewReader(tmpContent)))
|
|
},
|
|
cachedFiles: 1,
|
|
uploadedFiles: 0,
|
|
},
|
|
}
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ForceHashPercentage = 0
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
tc.getFile(),
|
|
})
|
|
|
|
// First snapshot should upload all files/directories.
|
|
man1, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
require.Equal(t, int32(0), atomic.LoadInt32(&man1.Stats.CachedFiles))
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.NonCachedFiles))
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalDirectoryCount))
|
|
require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalFileCount))
|
|
require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man1.Stats.TotalFileSize))
|
|
|
|
// wait a little bit to ensure clock moves forward which is not always the case on Windows.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Rebuild tree because reader only works once.
|
|
staticRoot = virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
tc.getFile(),
|
|
})
|
|
|
|
// Second upload may find some cached files depending on timestamps.
|
|
man2, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{}, man1)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man2.Stats.TotalDirectoryCount))
|
|
assert.Equal(t, tc.cachedFiles, atomic.LoadInt32(&man2.Stats.CachedFiles))
|
|
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.NonCachedFiles))
|
|
// Cached files don't count towards the total file count.
|
|
assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.TotalFileCount))
|
|
require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man2.Stats.TotalFileSize))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpload_StreamingDirectory(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("Uploading streaming directory with mock file")
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
|
|
|
files := []fs.Entry{
|
|
mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions),
|
|
}
|
|
|
|
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
virtualfs.NewStreamingDirectory(
|
|
"stream-directory",
|
|
fs.StaticIterator(files, nil),
|
|
),
|
|
})
|
|
|
|
man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles))
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles))
|
|
assert.Equal(t, int32(2), atomic.LoadInt32(&man.Stats.TotalDirectoryCount))
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount))
|
|
}
|
|
|
|
func TestUpload_StreamingDirectoryWithIgnoredFile(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("Uploading streaming directory with some ignored mock files")
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
policyTree := policy.BuildTree(map[string]*policy.Policy{
|
|
".": {
|
|
FilesPolicy: policy.FilesPolicy{
|
|
IgnoreRules: []string{"f2"},
|
|
},
|
|
},
|
|
}, policy.DefaultPolicy)
|
|
|
|
files := []fs.Entry{
|
|
mockfs.NewFile("f1", []byte{1, 2, 3}, defaultPermissions),
|
|
mockfs.NewFile("f2", []byte{1, 2, 3, 4}, defaultPermissions),
|
|
}
|
|
|
|
staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{
|
|
virtualfs.NewStreamingDirectory(
|
|
"stream-directory",
|
|
fs.StaticIterator(files, nil),
|
|
),
|
|
})
|
|
|
|
man, err := u.Upload(ctx, staticRoot, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&man.Stats.CachedFiles))
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.NonCachedFiles))
|
|
assert.Equal(t, int32(2), atomic.LoadInt32(&man.Stats.TotalDirectoryCount))
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&man.Stats.TotalFileCount))
|
|
}
|
|
|
|
// mockLogger is an in-memory log sink: its Write method parses each record
// into a loggedAction and appends it to logged unless ignore reports true.
type mockLogger struct {
|
|
// logged holds the records kept by Write, in the order they were written.
logged []loggedAction
|
|
}
|
|
|
|
func (w *mockLogger) Write(p []byte) (int, error) {
|
|
n := len(p)
|
|
|
|
parts := strings.SplitN(strings.TrimSpace(string(p)), "\t", 2)
|
|
|
|
var la loggedAction
|
|
|
|
la.msg = parts[0]
|
|
|
|
if len(parts) == 2 {
|
|
if err := json.Unmarshal([]byte(parts[1]), &la.keysAndValues); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
if !w.ignore(la) {
|
|
w.logged = append(w.logged, la)
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
func (w *mockLogger) ignore(la loggedAction) bool {
|
|
switch {
|
|
case strings.HasPrefix(la.msg, "uploading"):
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Sync does nothing and always returns nil.
func (w *mockLogger) Sync() error {
|
|
return nil
|
|
}
|
|
|
|
func TestParallelUploadDedup(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
t.Logf("Uploading static directory with streaming file")
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ParallelUploads = 10
|
|
|
|
pol := *policy.DefaultPolicy
|
|
pol.CompressionPolicy.CompressorName = "pgzip"
|
|
|
|
policyTree := policy.BuildTree(nil, &pol)
|
|
|
|
testutil.SkipTestOnCIUnlessLinuxAMD64(t)
|
|
td := testutil.TempDirectory(t)
|
|
|
|
// 10 identical non-compressible files, 50MB each
|
|
var files []*os.File
|
|
|
|
for i := range 10 {
|
|
f, cerr := os.Create(filepath.Join(td, fmt.Sprintf("file-%v", i)))
|
|
require.NoError(t, cerr)
|
|
|
|
files = append(files, f)
|
|
}
|
|
|
|
for range 1000 {
|
|
buf := make([]byte, 50000)
|
|
rand.Read(buf)
|
|
|
|
for _, f := range files {
|
|
_, werr := f.Write(buf)
|
|
require.NoError(t, werr)
|
|
}
|
|
}
|
|
|
|
for _, f := range files {
|
|
f.Close()
|
|
}
|
|
|
|
srcdir, err := localfs.Directory(td)
|
|
require.NoError(t, err)
|
|
|
|
_, err = u.Upload(ctx, srcdir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
// we wrote 500 MB, which can be deduped to 50MB, repo size must be less than 51MB
|
|
require.Less(t, testutil.MustGetTotalDirSize(t, th.repoDir), int64(51000000))
|
|
}
|
|
|
|
func TestParallelUploadOfLargeFiles(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := testlogging.Context(t)
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
u.ParallelUploads = 10
|
|
|
|
pol := *policy.DefaultPolicy
|
|
|
|
// change policies so that all files above this size are uploaded in parallel
|
|
// use an unusual number so that it's easy to spot.
|
|
const chunkSize = 10203040
|
|
|
|
// future reader, the chunk size must be greater than 4 MiB to make sure splitters are
|
|
// not used in degenerate form.
|
|
require.Greater(t, chunkSize, 4<<20)
|
|
|
|
n := policy.OptionalInt64(chunkSize)
|
|
pol.UploadPolicy.ParallelUploadAboveSize = &n
|
|
|
|
policyTree := policy.BuildTree(nil, &pol)
|
|
|
|
testutil.SkipTestOnCIUnlessLinuxAMD64(t)
|
|
td := testutil.TempDirectory(t)
|
|
|
|
// Write 2 x 50MB files
|
|
var files []*os.File
|
|
|
|
for i := range 2 {
|
|
f, cerr := os.Create(filepath.Join(td, fmt.Sprintf("file-%v", i)))
|
|
require.NoError(t, cerr)
|
|
|
|
files = append(files, f)
|
|
}
|
|
|
|
for range 1000 {
|
|
buf := make([]byte, 50000)
|
|
|
|
for _, f := range files {
|
|
rand.Read(buf)
|
|
|
|
_, werr := f.Write(buf)
|
|
require.NoError(t, werr)
|
|
}
|
|
}
|
|
|
|
for _, f := range files {
|
|
f.Close()
|
|
}
|
|
|
|
srcdir, err := localfs.Directory(td)
|
|
require.NoError(t, err)
|
|
|
|
man, err := u.Upload(ctx, srcdir, policyTree, snapshot.SourceInfo{})
|
|
require.NoError(t, err)
|
|
|
|
t.Logf("man: %v", man.RootObjectID())
|
|
|
|
dir := testutil.EnsureType[fs.Directory](t, snapshotfs.EntryFromDirEntry(th.repo, man.RootEntry))
|
|
successCount := 0
|
|
|
|
fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error {
|
|
if f, ok := e.(fs.File); ok {
|
|
hoid := testutil.EnsureType[object.HasObjectID](t, f)
|
|
|
|
oids := hoid.ObjectID().String()
|
|
|
|
oid, err := object.ParseID(strings.TrimPrefix(oids, "I"))
|
|
require.NoError(t, err, "failed to parse object id", oids)
|
|
|
|
entries, err := object.LoadIndexObject(ctx, testutil.EnsureType[repo.DirectRepositoryWriter](t, th.repo).ContentManager(), oid)
|
|
require.NoError(t, err, "failed to parse indirect object id", oid)
|
|
|
|
// ensure that index object contains breakpoints at all multiples of 'chunkSize'.
|
|
// Because we picked unusual chunkSize, this proves that uploads happened individually
|
|
// and were concatenated
|
|
for offset := int64(0); offset < f.Size(); offset += chunkSize {
|
|
verifyContainsOffset(t, entries, chunkSize)
|
|
|
|
successCount++
|
|
}
|
|
|
|
verifyFileContent(t, f, filepath.Join(td, f.Name()))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
// make sure we actually tested something
|
|
require.Positive(t, successCount)
|
|
}
|
|
|
|
func verifyFileContent(t *testing.T, f1Entry fs.File, f2Name string) {
|
|
t.Helper()
|
|
|
|
f1, err := f1Entry.Open(testlogging.Context(t))
|
|
require.NoError(t, err)
|
|
|
|
defer f1.Close()
|
|
|
|
f2, err := os.Open(f2Name)
|
|
require.NoError(t, err)
|
|
|
|
defer f2.Close()
|
|
|
|
buf1 := make([]byte, 1e6)
|
|
buf2 := make([]byte, 1e6)
|
|
|
|
for {
|
|
n1, err1 := f1.Read(buf1)
|
|
n2, err2 := f2.Read(buf2)
|
|
|
|
if errors.Is(err1, io.EOF) {
|
|
require.ErrorIs(t, err2, io.EOF)
|
|
return
|
|
}
|
|
|
|
require.NoError(t, err1)
|
|
require.NoError(t, err2)
|
|
|
|
require.Equal(t, buf1[0:n1], buf2[0:n2])
|
|
}
|
|
}
|
|
|
|
func verifyContainsOffset(t *testing.T, entries []object.IndirectObjectEntry, want int64) {
|
|
t.Helper()
|
|
|
|
for _, e := range entries {
|
|
if e.Start == want {
|
|
return
|
|
}
|
|
}
|
|
|
|
t.Fatalf("entry set %v does not contain offset %v", entries, want)
|
|
}
|
|
|
|
// loggedAction is a single parsed log record captured by mockLogger.
type loggedAction struct {
|
|
// msg is the text before the first tab of the log line.
msg string
|
|
// keysAndValues is the JSON object decoded from the text after the first
// tab; it is left nil when the line contains no tab.
keysAndValues map[string]any
|
|
}
|
|
|
|
//nolint:maintidx
|
|
func TestUploadLogging(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
sourceDir := mockfs.NewDirectory()
|
|
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddFile("f3", []byte{1, 2, 3, 4, 5}, defaultPermissions)
|
|
sourceDir.AddSymlink("f4", "f2", defaultPermissions)
|
|
sourceDir.AddErrorEntryFile("f5", defaultPermissions, errors.New("some error"))
|
|
|
|
sourceDir.AddDir("d1", defaultPermissions)
|
|
sourceDir.AddDir("d1/d3", defaultPermissions)
|
|
sourceDir.AddFile("d1/d3/f1", []byte{1, 2, 3}, defaultPermissions)
|
|
sourceDir.AddFile("d1/d3/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
|
sourceDir.AddSymlink("d1/d3/f3", "f1", defaultPermissions)
|
|
|
|
cases := []struct {
|
|
desc string
|
|
globalLoggingPolicy *policy.LoggingPolicy
|
|
globalFilesPolicy *policy.FilesPolicy
|
|
dirLogDetail *policy.LogDetail
|
|
entryLogDetail *policy.LogDetail
|
|
wantEntries []string
|
|
wantEntriesSecond []string
|
|
wantDetailKeys map[string][]string
|
|
}{
|
|
{
|
|
desc: "override-logging disabled",
|
|
dirLogDetail: policy.NewLogDetail(0),
|
|
entryLogDetail: policy.NewLogDetail(0),
|
|
wantEntries: []string{
|
|
// errors are always logged
|
|
"error f5",
|
|
},
|
|
wantEntriesSecond: []string{
|
|
// errors are always logged
|
|
"error f5",
|
|
},
|
|
wantDetailKeys: map[string][]string{
|
|
"cached": {"dur", "path"},
|
|
"error": {"error", "path"},
|
|
},
|
|
},
|
|
{
|
|
desc: "override-minimal logging",
|
|
dirLogDetail: policy.NewLogDetail(1),
|
|
entryLogDetail: policy.NewLogDetail(1),
|
|
wantEntries: []string{
|
|
"snapshotted file d1/d3/f1",
|
|
"snapshotted file d1/d3/f2",
|
|
"snapshotted symlink d1/d3/f3",
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"snapshotted file f1",
|
|
"snapshotted file f2",
|
|
"snapshotted file f3",
|
|
"snapshotted symlink f4",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
wantEntriesSecond: []string{
|
|
"cached d1/d3/f1",
|
|
"cached d1/d3/f2",
|
|
"cached d1/d3/f3",
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"cached f1",
|
|
"cached f2",
|
|
"cached f3",
|
|
"cached f4",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
// at level 1 only durations and paths are logged
|
|
wantDetailKeys: map[string][]string{
|
|
"cached": {"dur", "path"},
|
|
"error": {"path", "error", "dur"},
|
|
"snapshotted file": {"path", "dur"},
|
|
"snapshotted directory": {"path", "dur"},
|
|
"snapshotted symlink": {"path", "dur"},
|
|
},
|
|
},
|
|
// only directories are logged
|
|
{
|
|
desc: "override-directory-only-logging",
|
|
dirLogDetail: policy.NewLogDetail(policy.LogDetailMax),
|
|
entryLogDetail: policy.NewLogDetail(0),
|
|
wantEntries: []string{
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
wantEntriesSecond: []string{
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
// at level 10 a lot of details are logged.
|
|
wantDetailKeys: map[string][]string{
|
|
"error": {"path", "error"},
|
|
"snapshotted directory": {"dirs", "dur", "errors", "files", "mtime", "oid", "path", "size"},
|
|
},
|
|
},
|
|
// only entries are scheduled.
|
|
{
|
|
desc: "override-entry-only-logging",
|
|
dirLogDetail: policy.NewLogDetail(0),
|
|
entryLogDetail: policy.NewLogDetail(policy.LogDetailMax),
|
|
wantEntries: []string{
|
|
"snapshotted file d1/d3/f1",
|
|
"snapshotted file d1/d3/f2",
|
|
"snapshotted symlink d1/d3/f3",
|
|
"snapshotted file f1",
|
|
"snapshotted file f2",
|
|
"snapshotted file f3",
|
|
"snapshotted symlink f4",
|
|
"error f5",
|
|
},
|
|
wantEntriesSecond: []string{
|
|
"cached d1/d3/f1",
|
|
"cached d1/d3/f2",
|
|
"cached d1/d3/f3",
|
|
"cached f1",
|
|
"cached f2",
|
|
"cached f3",
|
|
"cached f4",
|
|
"error f5",
|
|
},
|
|
// at level 10 a lot of details are logged.
|
|
wantDetailKeys: map[string][]string{
|
|
"cached": {"dur", "mtime", "oid", "path", "size"},
|
|
"error": {"dur", "error", "path"},
|
|
"snapshotted file": {"dur", "mtime", "oid", "path", "size"},
|
|
"snapshotted symlink": {"dur", "mtime", "oid", "path", "size"},
|
|
},
|
|
},
|
|
{
|
|
desc: "default-policy",
|
|
wantDetailKeys: map[string][]string{
|
|
"cached": {"dur", "mtime", "oid", "path", "size"},
|
|
"error": {"error", "path"},
|
|
"snapshotted file": {"dur", "path", "size"},
|
|
"snapshotted symlink": {"dur", "path", "size"},
|
|
"snapshotted directory": {"dirs", "dur", "errors", "files", "path", "size"},
|
|
},
|
|
wantEntries: []string{
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
// cache hits are not logged
|
|
wantEntriesSecond: []string{
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
},
|
|
{
|
|
desc: "global-logging-policy",
|
|
globalLoggingPolicy: &policy.LoggingPolicy{
|
|
Directories: policy.DirLoggingPolicy{
|
|
Snapshotted: policy.NewLogDetail(3),
|
|
},
|
|
Entries: policy.EntryLoggingPolicy{
|
|
Snapshotted: policy.NewLogDetail(3),
|
|
},
|
|
},
|
|
wantDetailKeys: map[string][]string{
|
|
"cached": {"dur", "mtime", "oid", "path", "size"},
|
|
"error": {"dur", "error", "path"},
|
|
"snapshotted file": {"dur", "path", "size"},
|
|
"snapshotted symlink": {"dur", "path", "size"},
|
|
"snapshotted directory": {"dur", "path", "size"},
|
|
},
|
|
wantEntries: []string{
|
|
"snapshotted file d1/d3/f1",
|
|
"snapshotted file d1/d3/f2",
|
|
"snapshotted symlink d1/d3/f3",
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"snapshotted file f1",
|
|
"snapshotted file f2",
|
|
"snapshotted file f3",
|
|
"snapshotted symlink f4",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
// cache hits are not logged
|
|
wantEntriesSecond: []string{
|
|
"snapshotted directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
},
|
|
{
|
|
desc: "complex-logging-policy",
|
|
globalLoggingPolicy: &policy.LoggingPolicy{
|
|
Directories: policy.DirLoggingPolicy{
|
|
Snapshotted: policy.NewLogDetail(3),
|
|
Ignored: policy.NewLogDetail(4),
|
|
},
|
|
Entries: policy.EntryLoggingPolicy{
|
|
Ignored: policy.NewLogDetail(3),
|
|
Snapshotted: policy.NewLogDetail(3),
|
|
CacheMiss: policy.NewLogDetail(4),
|
|
CacheHit: policy.NewLogDetail(5),
|
|
},
|
|
},
|
|
globalFilesPolicy: &policy.FilesPolicy{
|
|
IgnoreRules: []string{"f1", "d3"},
|
|
},
|
|
wantDetailKeys: map[string][]string{
|
|
"cache miss": {"mode", "mtime", "path", "size"},
|
|
"cached": {"dur", "path", "size"},
|
|
"error": {"dur", "error", "path"},
|
|
"snapshotted file": {"dur", "path", "size"},
|
|
"snapshotted symlink": {"dur", "path", "size"},
|
|
"snapshotted directory": {"dur", "path", "size"},
|
|
"ignored directory": {"dur", "path"},
|
|
"ignored": {"dur", "path"},
|
|
},
|
|
wantEntries: []string{
|
|
"ignored directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"ignored f1",
|
|
"snapshotted file f2",
|
|
"snapshotted file f3",
|
|
"snapshotted symlink f4",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
wantEntriesSecond: []string{
|
|
"ignored directory d1/d3",
|
|
"snapshotted directory d1",
|
|
"ignored f1",
|
|
"cached f2",
|
|
"cached f3",
|
|
"cached f4",
|
|
"error f5",
|
|
"snapshotted directory .",
|
|
},
|
|
},
|
|
}
|
|
|
|
sourceInfo := snapshot.SourceInfo{
|
|
Host: "somehost",
|
|
UserName: "someuser",
|
|
Path: "/somepath",
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ml := &mockLogger{}
|
|
|
|
logUploader := zap.New(
|
|
zapcore.NewCore(
|
|
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
|
|
// Keys can be anything except the empty string.
|
|
TimeKey: zapcore.OmitKey,
|
|
LevelKey: zapcore.OmitKey,
|
|
NameKey: zapcore.OmitKey,
|
|
CallerKey: zapcore.OmitKey,
|
|
FunctionKey: zapcore.OmitKey,
|
|
MessageKey: "M",
|
|
StacktraceKey: "S",
|
|
LineEnding: zapcore.DefaultLineEnding,
|
|
EncodeLevel: zapcore.CapitalLevelEncoder,
|
|
EncodeTime: zapcore.ISO8601TimeEncoder,
|
|
EncodeDuration: zapcore.StringDurationEncoder,
|
|
EncodeCaller: zapcore.ShortCallerEncoder,
|
|
}),
|
|
ml,
|
|
zapcore.DebugLevel,
|
|
),
|
|
).Sugar()
|
|
|
|
ctx := testlogging.Context(t)
|
|
ctx = logging.WithLogger(ctx, func(module string) logging.Logger {
|
|
if module == "uploader" {
|
|
// only capture logs from the uploader, not the estimator.
|
|
return logUploader
|
|
}
|
|
|
|
return logging.NullLogger
|
|
})
|
|
th := newUploadTestHarness(ctx, t)
|
|
|
|
t.Cleanup(th.cleanup)
|
|
|
|
u := NewUploader(th.repo)
|
|
|
|
// make sure uploads are strictly sequential to get predictable log output.
|
|
u.ParallelUploads = 1
|
|
|
|
pol := *policy.DefaultPolicy
|
|
pol.OSSnapshotPolicy.VolumeShadowCopy.Enable = policy.NewOSSnapshotMode(policy.OSSnapshotNever)
|
|
|
|
if p := tc.globalLoggingPolicy; p != nil {
|
|
pol.LoggingPolicy = *p
|
|
}
|
|
|
|
if p := tc.globalFilesPolicy; p != nil {
|
|
pol.FilesPolicy = *p
|
|
}
|
|
|
|
policy.SetPolicy(ctx, th.repo, policy.GlobalPolicySourceInfo, &pol)
|
|
|
|
u.OverrideDirLogDetail = tc.dirLogDetail
|
|
u.OverrideEntryLogDetail = tc.entryLogDetail
|
|
|
|
ml.logged = nil
|
|
|
|
polTree, err := policy.TreeForSource(ctx, th.repo, sourceInfo)
|
|
require.NoError(t, err)
|
|
|
|
man1, err := u.Upload(ctx, sourceDir, polTree, sourceInfo)
|
|
require.NoError(t, err)
|
|
|
|
var gotEntries []string
|
|
|
|
for _, l := range ml.logged {
|
|
gotEntries = append(gotEntries, fmt.Sprintf("%v %v", l.msg, l.keysAndValues["path"]))
|
|
|
|
assert.Contains(t, tc.wantDetailKeys, l.msg)
|
|
verifyLogDetails(t, l.msg, tc.wantDetailKeys[l.msg], l.keysAndValues)
|
|
}
|
|
|
|
require.Equal(t, tc.wantEntries, gotEntries)
|
|
|
|
ml.logged = nil
|
|
|
|
// run second upload with previous manifest to trigger cache.
|
|
_, err = u.Upload(ctx, sourceDir, polTree, sourceInfo, man1)
|
|
require.NoError(t, err)
|
|
|
|
var gotEntriesSecond []string
|
|
|
|
for _, l := range ml.logged {
|
|
gotEntriesSecond = append(gotEntriesSecond, fmt.Sprintf("%v %v", l.msg, l.keysAndValues["path"]))
|
|
|
|
require.Contains(t, tc.wantDetailKeys, l.msg)
|
|
verifyLogDetails(t, "second "+l.msg, tc.wantDetailKeys[l.msg], l.keysAndValues)
|
|
}
|
|
|
|
require.Equal(t, tc.wantEntriesSecond, gotEntriesSecond)
|
|
})
|
|
}
|
|
}
|
|
|
|
func verifyLogDetails(t *testing.T, desc string, wantDetailKeys []string, keysAndValues map[string]any) {
|
|
t.Helper()
|
|
|
|
var gotDetailKeys []string
|
|
|
|
for k := range keysAndValues {
|
|
gotDetailKeys = append(gotDetailKeys, k)
|
|
}
|
|
|
|
sort.Strings(gotDetailKeys)
|
|
sort.Strings(wantDetailKeys)
|
|
require.Equal(t, wantDetailKeys, gotDetailKeys, "invalid details for "+desc)
|
|
}
|