From e64d5b8eab2bf9855fe5d0a86113164b4e0c8ca4 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 1 Jul 2021 21:37:27 -0700 Subject: [PATCH] Fixed few subtle threading bugs uncovered by stress test and rewrote the test to be model-based (#1157) * testing: refactored logs directory management * content: fixed index mutex to be shared across all write sessions added mutex protection during writecontent/refresh race * testing: upload log artifacts * content: bump revision number after index has been added This fixes a bug where manifest manager in another session for the same open repository may not see a content added, because they will prematurely cache the incomplete set of contents. This took 2 weeks to find. * manifest: improved log output, fixed unnecessary mutex release * testing: rewrote stress test to be model-based and more precise --- .github/workflows/code-coverage.yml | 7 + .github/workflows/make.yml | 7 + .github/workflows/stress-test.yml | 33 + Makefile | 10 +- internal/testutil/tmpdir.go | 105 ++++ repo/content/committed_content_index.go | 8 +- repo/content/committed_read_manager.go | 6 + repo/content/content_manager.go | 21 +- repo/content/content_manager_indexes.go | 28 +- repo/manifest/committed_manifest_manager.go | 58 +- .../repomodel/content_set.go | 93 +++ .../repomodel/manifest_set.go | 93 +++ .../repomodel/open_repository_model.go | 28 + .../repomodel/repository_data_model.go | 30 + .../repomodel/repository_session_model.go | 47 ++ .../repository_stress_test.go | 592 ++++++++++++------ tests/stress_test/stress_test.go | 6 +- tests/testenv/cli_exe_runner.go | 30 +- tests/testenv/cli_test_env.go | 63 -- 19 files changed, 929 insertions(+), 336 deletions(-) create mode 100644 .github/workflows/stress-test.yml create mode 100644 tests/repository_stress_test/repomodel/content_set.go create mode 100644 tests/repository_stress_test/repomodel/manifest_set.go create mode 100644 tests/repository_stress_test/repomodel/open_repository_model.go create 
mode 100644 tests/repository_stress_test/repomodel/repository_data_model.go create mode 100644 tests/repository_stress_test/repomodel/repository_session_model.go diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index a9e64eddd..687b8fa3a 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -18,3 +18,10 @@ jobs: run: make test-with-coverage - name: Publish Coverage Results run: make ci-publish-coverage + - name: Upload Logs + uses: actions/upload-artifact@v2 + with: + name: logs + path: .logs/**/*.log + if-no-files-found: ignore + if: ${{ always() }} \ No newline at end of file diff --git a/.github/workflows/make.yml b/.github/workflows/make.yml index 8cb057d7c..7bfe0f818 100644 --- a/.github/workflows/make.yml +++ b/.github/workflows/make.yml @@ -103,6 +103,13 @@ jobs: - name: Integration Tests run: make -j2 ci-integration-tests continue-on-error: ${{ github.event_name != 'pull_request' }} + - name: Upload Logs + uses: actions/upload-artifact@v2 + with: + name: logs + path: .logs/**/*.log + if-no-files-found: ignore + if: ${{ always() }} - name: Upload Kopia Artifacts uses: actions/upload-artifact@v2 with: diff --git a/.github/workflows/stress-test.yml b/.github/workflows/stress-test.yml new file mode 100644 index 000000000..5ba5368ba --- /dev/null +++ b/.github/workflows/stress-test.yml @@ -0,0 +1,33 @@ +name: Stress Test +on: + push: + branches: [ master ] + tags: + - v* + pull_request: {} + schedule: + # run every 2 hours + - cron: '12 */2 * * *' +jobs: + endurance-test: + name: Stress Test + runs-on: ubuntu-latest + steps: + - name: Set up Go. 
+ uses: actions/setup-go@v2 + with: + go-version: ^1.16 + id: go + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Stress Test + run: make stress-test + - name: Upload Logs + uses: actions/upload-artifact@v2 + with: + name: logs + path: .logs/**/*.log + if-no-files-found: ignore + if: ${{ always() }} \ No newline at end of file diff --git a/Makefile b/Makefile index 1bc158842..a467c115c 100644 --- a/Makefile +++ b/Makefile @@ -157,7 +157,6 @@ ci-integration-tests: $(MAKE) integration-tests $(MAKE) integration-tests-index-v2 $(MAKE) robustness-tool-tests - $(MAKE) stress-test ci-publish-coverage: ifeq ($(GOOS)/$(GOARCH),linux/amd64) @@ -250,10 +249,13 @@ ifeq ($(GOOS)/$(GOARCH),linux/amd64) $(GO_TEST) -count=$(REPEAT_TEST) github.com/kopia/kopia/tests/tools/... github.com/kopia/kopia/tests/robustness/engine/... $(TEST_FLAGS) endif -stress_test: export KOPIA_LONG_STRESS_TEST=1 +stress-test: export KOPIA_STRESS_TEST=1 +stress-test: export KOPIA_DEBUG_MANIFEST_MANAGER=1 +stress-test: export KOPIA_LOGS_DIR=$(CURDIR)/.logs +stress-test: export KOPIA_KEEP_LOGS=1 stress-test: $(gotestsum) - $(GO_TEST) -count=$(REPEAT_TEST) -timeout 200s github.com/kopia/kopia/tests/stress_test - $(GO_TEST) -count=$(REPEAT_TEST) -timeout 200s github.com/kopia/kopia/tests/repository_stress_test + $(GO_TEST) -count=$(REPEAT_TEST) -timeout 3600s github.com/kopia/kopia/tests/stress_test + $(GO_TEST) -count=$(REPEAT_TEST) -timeout 3600s github.com/kopia/kopia/tests/repository_stress_test layering-test: ifneq ($(GOOS),windows) diff --git a/internal/testutil/tmpdir.go b/internal/testutil/tmpdir.go index b3d7575c8..db8ab7921 100644 --- a/internal/testutil/tmpdir.go +++ b/internal/testutil/tmpdir.go @@ -1,6 +1,7 @@ package testutil import ( + "fmt" "io/ioutil" "math/rand" "os" @@ -9,6 +10,14 @@ "testing" "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/clock" +) + +const ( + 
maxOutputLinesToLog = 4000 + logsDirPermissions = 0o750 ) var interestingLengths = []int{10, 50, 100, 240, 250, 260, 270} @@ -55,3 +64,99 @@ func TempDirectory(t *testing.T) string { return d } + +// TempLogDirectory returns a temporary directory used for storing logs. +// If KOPIA_LOGS_DIR is set, the directory is created under it; otherwise it is created under "kopia-logs" in os.TempDir(). +func TempLogDirectory(t *testing.T) string { + t.Helper() + + cleanName := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll( + t.Name(), + "/", "_"), "\\", "_"), ":", "_") + + t.Helper() + + logsBaseDir := os.Getenv("KOPIA_LOGS_DIR") + if logsBaseDir == "" { + logsBaseDir = filepath.Join(os.TempDir(), "kopia-logs") + } + + logsDir := filepath.Join(logsBaseDir, cleanName+"."+clock.Now().Local().Format("20060102150405")) + + require.NoError(t, os.MkdirAll(logsDir, logsDirPermissions)) + + t.Cleanup(func() { + if os.Getenv("KOPIA_KEEP_LOGS") != "" { + t.Logf("logs preserved in %v", logsDir) + return + } + + if t.Failed() && os.Getenv("KOPIA_DISABLE_LOG_DUMP_ON_FAILURE") == "" { + dumpLogs(t, logsDir) + } + + os.RemoveAll(logsDir) // nolint:errcheck + }) + + return logsDir +} + +func dumpLogs(t *testing.T, dirname string) { + t.Helper() + + entries, err := ioutil.ReadDir(dirname) + if err != nil { + t.Errorf("unable to read %v: %v", dirname, err) + + return + } + + for _, e := range entries { + if e.IsDir() { + dumpLogs(t, filepath.Join(dirname, e.Name())) + continue + } + + dumpLogFile(t, filepath.Join(dirname, e.Name())) + } +} + +func dumpLogFile(t *testing.T, fname string) { + t.Helper() + + // nolint:gosec + data, err := ioutil.ReadFile(fname) + if err != nil { + t.Error(err) + return + } + + t.Logf("LOG FILE: %v %v", fname, trimOutput(string(data))) +} + +func trimOutput(s string) string { + lines := splitLines(s) + if len(lines) <= maxOutputLinesToLog { + return s + } + + lines2 := append([]string(nil), lines[0:(maxOutputLinesToLog/2)]...) 
// nolint:gomnd + lines2 = append(lines2, fmt.Sprintf("/* %v lines removed */", len(lines)-maxOutputLinesToLog)) + lines2 = append(lines2, lines[len(lines)-(maxOutputLinesToLog/2):]...) // nolint:gomnd + + return strings.Join(lines2, "\n") +} + +func splitLines(s string) []string { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + + var result []string + for _, l := range strings.Split(s, "\n") { + result = append(result, strings.TrimRight(l, "\r")) + } + + return result +} diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index c6350874f..b77100eca 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -66,7 +66,11 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) { } func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data []byte, use bool) error { - atomic.AddInt64(&c.rev, 1) + // ensure we bump revision number AFTER this function + // doing it prematurely might confuse callers of revision() who may cache + // a set of old contents and associate it with new revision, before new contents + // are actually available. 
+ defer atomic.AddInt64(&c.rev, 1) if err := c.cache.addContentToCache(ctx, indexBlobID, data); err != nil { return errors.Wrap(err, "error adding content to cache") @@ -160,7 +164,7 @@ func (c *committedContentIndex) use(ctx context.Context, indexFiles []blob.ID) e return nil } - c.log.Debugf("use-indexes", indexFiles) + c.log.Debugf("use-indexes %v", indexFiles) mergedAndCombined, newInUse, err := c.merge(ctx, indexFiles) if err != nil { diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 39b4b5067..6cb65a947 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -5,6 +5,7 @@ "context" "os" "path/filepath" + "sync" "sync/atomic" "time" @@ -53,6 +54,11 @@ type SharedManager struct { enc *encryptedBlobMgr timeNow func() time.Time + // lock to protect the set of committed indexes + // shared lock will be acquired when writing new content to allow it to happen in parallel + // exclusive lock will be acquired during compaction or refresh. 
+ indexesLock sync.RWMutex + format FormattingOptions checkInvariantsOnUnlock bool writeFormatVersion int32 // format version to write diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 665214f04..786ba9989 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -375,6 +375,12 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { bm.onUpload(int64(len(data))) + // we must hold a lock between writing an index and adding index blob to committed contents index + // otherwise it is possible for concurrent compaction or refresh to forget about the blob we have just + // written + bm.indexesLock.RLock() + defer bm.indexesLock.RUnlock() + indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data, bm.currentSessionInfo.ID) if err != nil { return errors.Wrap(err, "error writing index blob") @@ -745,21 +751,6 @@ func (bm *WriteManager) unlock() { bm.mu.Unlock() } -// Refresh reloads the committed content indexes. -func (bm *WriteManager) Refresh(ctx context.Context) error { - bm.lock() - defer bm.unlock() - - bm.log.Debugf("Refresh started") - - t0 := clock.Now() - - err := bm.loadPackIndexesUnlocked(ctx) - bm.log.Debugf("Refresh completed in %v", clock.Since(t0)) - - return err -} - // SyncMetadataCache synchronizes metadata cache with metadata blobs in storage. 
func (bm *WriteManager) SyncMetadataCache(ctx context.Context) error { if cm, ok := bm.metadataCache.(*contentCacheForMetadata); ok { diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 479c1ba7c..b35382bcf 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -7,6 +7,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/repo/blob" ) @@ -29,21 +30,36 @@ func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration { return defaultEventualConsistencySettleTime } +// Refresh reloads the committed content indexes. +func (sm *SharedManager) Refresh(ctx context.Context) error { + sm.indexesLock.Lock() + defer sm.indexesLock.Unlock() + + sm.log.Debugf("Refresh started") + + t0 := clock.Now() + + err := sm.loadPackIndexesUnlocked(ctx) + sm.log.Debugf("Refresh completed in %v", clock.Since(t0)) + + return err +} + // CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs. -func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) error { +func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) error { // we must hold the lock here to avoid the race with Refresh() which can reload the // current set of indexes while we process them. - bm.mu.Lock() - defer bm.mu.Unlock() + sm.indexesLock.Lock() + defer sm.indexesLock.Unlock() - bm.log.Debugf("CompactIndexes(%+v)", opt) + sm.log.Debugf("CompactIndexes(%+v)", opt) - if err := bm.indexBlobManager.compact(ctx, opt); err != nil { + if err := sm.indexBlobManager.compact(ctx, opt); err != nil { return errors.Wrap(err, "error performing compaction") } // reload indexes after compaction. 
- if err := bm.loadPackIndexesUnlocked(ctx); err != nil { + if err := sm.loadPackIndexesUnlocked(ctx); err != nil { return errors.Wrap(err, "error re-loading indexes") } diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index c99d3fa3d..6deaaf9f7 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -5,9 +5,13 @@ "compress/gzip" "context" "encoding/json" + "fmt" + "os" + "sort" "sync" "github.com/pkg/errors" + "golang.org/x/exp/rand" "github.com/kopia/kopia/repo/content" ) @@ -16,6 +20,8 @@ type committedManifestManager struct { b contentManager + debugID string + cmmu sync.Mutex lastRevision int64 locked bool @@ -24,28 +30,48 @@ type committedManifestManager struct { } func (m *committedManifestManager) getCommittedEntryOrNil(ctx context.Context, id ID) (*manifestEntry, error) { - if err := m.ensureInitialized(ctx); err != nil { - return nil, err - } - m.lock() defer m.unlock() + if err := m.ensureInitializedLocked(ctx); err != nil { + return nil, err + } + return m.committedEntries[id], nil } -func (m *committedManifestManager) findCommittedEntries(ctx context.Context, labels map[string]string) (map[ID]*manifestEntry, error) { - if err := m.ensureInitialized(ctx); err != nil { - return nil, err +func (m *committedManifestManager) dump(ctx context.Context, prefix string) { + if m.debugID == "" { + return } + var keys []string + + for k := range m.committedEntries { + keys = append(keys, string(k)) + } + + sort.Strings(keys) + + log(ctx).Debugf(prefix+"["+m.debugID+"] committed keys %v: %v rev=%v", len(keys), keys, m.lastRevision) +} + +func (m *committedManifestManager) findCommittedEntries(ctx context.Context, labels map[string]string) (map[ID]*manifestEntry, error) { m.lock() defer m.unlock() + if err := m.ensureInitializedLocked(ctx); err != nil { + return nil, err + } + return findEntriesMatchingLabels(m.committedEntries, labels), nil } func (m 
*committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { + if len(entries) == 0 { + return nil, nil + } + m.lock() defer m.unlock() @@ -243,21 +269,25 @@ func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) { } } -func (m *committedManifestManager) ensureInitialized(ctx context.Context) error { - m.lock() - defer m.unlock() - +func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context) error { rev := m.b.Revision() if m.lastRevision == rev { + if m.debugID != "" { + log(ctx).Debugf("%v up-to-date rev=%v last=%v", m.debugID, rev, m.lastRevision) + } + return nil } + log(ctx).Debugf("reloading committed manifest contents: rev=%v last=%v", rev, m.lastRevision) + if err := m.loadCommittedContentsLocked(ctx); err != nil { return err } m.lastRevision = rev + m.dump(ctx, "after ensureInitialized: ") // it is possible that the content manager revision has changed while we were reading it, // that's ok - we read __some__ consistent set of data and next time we will invalidate again. @@ -301,8 +331,14 @@ func loadManifestContent(ctx context.Context, b contentManager, contentID conten } func newCommittedManager(b contentManager) *committedManifestManager { + debugID := "" + if os.Getenv("KOPIA_DEBUG_MANIFEST_MANAGER") != "" { + debugID = fmt.Sprintf("%x", rand.Int63()) + } + return &committedManifestManager{ b: b, + debugID: debugID, committedEntries: map[ID]*manifestEntry{}, committedContentIDs: map[content.ID]bool{}, } diff --git a/tests/repository_stress_test/repomodel/content_set.go b/tests/repository_stress_test/repomodel/content_set.go new file mode 100644 index 000000000..6ffd08259 --- /dev/null +++ b/tests/repository_stress_test/repomodel/content_set.go @@ -0,0 +1,93 @@ +package repomodel + +import ( + "math/rand" + "sync" + + "github.com/kopia/kopia/repo/content" +) + +// ContentSet represents a set of contents. 
+type ContentSet struct { + mu sync.Mutex + ids []content.ID +} + +// PickRandom picks one random content from the set or empty string. +func (s *ContentSet) PickRandom() content.ID { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.ids) == 0 { + return "" + } + + // nolint:gosec + return s.ids[rand.Intn(len(s.ids))] +} + +// Snapshot returns a new set holding an independent copy of all current IDs. +func (s *ContentSet) Snapshot() ContentSet { + s.mu.Lock() + defer s.mu.Unlock() + + return ContentSet{ + ids: append([]content.ID(nil), s.ids...), + } +} + +// Replace replaces all elements in the set with a copy of the provided ids. +func (s *ContentSet) Replace(ids []content.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = append([]content.ID(nil), ids...) +} + +// Add adds the provided items to the set. +func (s *ContentSet) Add(d ...content.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = append(s.ids, d...) +} + +// RemoveAll removes the provided items from the set. +func (s *ContentSet) RemoveAll(d ...content.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = removeAllContentIDs(s.ids, d) +} + +func removeAllContentIDs(a, b []content.ID) []content.ID { + var result []content.ID + + for _, v := range a { + found := false + + for _, v2 := range b { + if v2 == v { + found = true + break + } + } + + if !found { + result = append(result, v) + } + } + + return result +} + +// Clear removes all elements from the set and returns them as a new set. 
+func (s *ContentSet) Clear() ContentSet { + s.mu.Lock() + defer s.mu.Unlock() + + old := s.ids + s.ids = nil + + return ContentSet{ids: old} +} diff --git a/tests/repository_stress_test/repomodel/manifest_set.go b/tests/repository_stress_test/repomodel/manifest_set.go new file mode 100644 index 000000000..f04ecbb06 --- /dev/null +++ b/tests/repository_stress_test/repomodel/manifest_set.go @@ -0,0 +1,93 @@ +package repomodel + +import ( + "math/rand" + "sync" + + "github.com/kopia/kopia/repo/manifest" +) + +// ManifestSet represents a set of manifests. 
+type ManifestSet struct { + mu sync.Mutex + ids []manifest.ID +} + +// PickRandom picks one random manifest from the set or empty string. +func (s *ManifestSet) PickRandom() manifest.ID { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.ids) == 0 { + return "" + } + + // nolint:gosec + return s.ids[rand.Intn(len(s.ids))] +} + +// Snapshot returns a new set holding an independent copy of all current IDs. +func (s *ManifestSet) Snapshot() ManifestSet { + s.mu.Lock() + defer s.mu.Unlock() + + return ManifestSet{ + ids: append([]manifest.ID(nil), s.ids...), + } +} + +// Replace replaces all elements in the set with a copy of the provided ids. +func (s *ManifestSet) Replace(ids []manifest.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = append([]manifest.ID(nil), ids...) +} + +// Add adds the provided items to the set. +func (s *ManifestSet) Add(d ...manifest.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = append(s.ids, d...) +} + +// RemoveAll removes the provided items from the set. +func (s *ManifestSet) RemoveAll(d ...manifest.ID) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ids = removeAllManifestIDs(s.ids, d) +} + +func removeAllManifestIDs(a, b []manifest.ID) []manifest.ID { + var result []manifest.ID + + for _, v := range a { + found := false + + for _, v2 := range b { + if v2 == v { + found = true + break + } + } + + if !found { + result = append(result, v) + } + } + + return result +} + +// Clear removes all elements from the set and returns them as a new set. 
+func (s *ManifestSet) Clear() ManifestSet { + s.mu.Lock() + defer s.mu.Unlock() + + old := s.ids + s.ids = nil + + return ManifestSet{ids: old} +} diff --git a/tests/repository_stress_test/repomodel/open_repository_model.go b/tests/repository_stress_test/repomodel/open_repository_model.go new file mode 100644 index 000000000..1dc4e9e6c --- /dev/null +++ b/tests/repository_stress_test/repomodel/open_repository_model.go @@ -0,0 +1,28 @@ +package repomodel + +import "sync" + +// OpenRepository models the behavior of an open repository. 
+type OpenRepository struct { + RepoData *RepositoryData + + Contents ContentSet + Manifests ManifestSet + + EnableMaintenance bool + + mu sync.Mutex +} + +// Refresh refreshes the sets of committed contents and manifests from the repository. +func (o *OpenRepository) Refresh() { + o.Contents.Replace(o.RepoData.Contents.Snapshot().ids) + o.Manifests.Replace(o.RepoData.Manifests.Snapshot().ids) +} + +// NewSession creates a new model for a session to access a repository. +func (o *OpenRepository) NewSession() *RepositorySession { + return &RepositorySession{ + OpenRepo: o, + } +} diff --git a/tests/repository_stress_test/repomodel/repository_data_model.go b/tests/repository_stress_test/repomodel/repository_data_model.go new file mode 100644 index 000000000..6d23bf67a --- /dev/null +++ b/tests/repository_stress_test/repomodel/repository_data_model.go @@ -0,0 +1,30 @@ +// Package repomodel provides a simplified model of repository operation. +package repomodel + +import "sync/atomic" + +// RepositoryData models the data stored in the repository. +type RepositoryData struct { + Contents ContentSet + Manifests ManifestSet + + openCounter *int32 +} + +// OpenRepository returns an OpenRepository model based on the current snapshot of RepositoryData. +func (d *RepositoryData) OpenRepository() *OpenRepository { + return &OpenRepository{ + RepoData: d, + + Contents: d.Contents.Snapshot(), + Manifests: d.Manifests.Snapshot(), + EnableMaintenance: atomic.AddInt32(d.openCounter, 1) == 1, + } +} + +// NewRepositoryData creates a new RepositoryData model. 
+func NewRepositoryData() *RepositoryData { + return &RepositoryData{ + openCounter: new(int32), + } +} diff --git a/tests/repository_stress_test/repomodel/repository_session_model.go b/tests/repository_stress_test/repomodel/repository_session_model.go new file mode 100644 index 000000000..f5d3083e5 --- /dev/null +++ b/tests/repository_stress_test/repomodel/repository_session_model.go @@ -0,0 +1,47 @@ +package repomodel + +import ( + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/manifest" +) + +// RepositorySession models the behavior of a single session in a repository. +type RepositorySession struct { + OpenRepo *OpenRepository + + WrittenContents ContentSet + WrittenManifests ManifestSet +} + +// WriteContent adds the provided content ID to the model. +func (s *RepositorySession) WriteContent(cid content.ID) { + s.WrittenContents.Add(cid) +} + +// WriteManifest adds the provided manifest ID to the model. +func (s *RepositorySession) WriteManifest(mid manifest.ID) { + s.WrittenManifests.Add(mid) +} + +// Refresh refreshes the sets of committed contents and manifests from the repository. +func (s *RepositorySession) Refresh() { + s.OpenRepo.Refresh() +} + +// Flush flushes the changes written in this RepositorySession and makes them available +// to other sessions via the OpenRepository and RepositoryData models. +func (s *RepositorySession) Flush(wc *ContentSet, wm *ManifestSet) { + s.OpenRepo.mu.Lock() + defer s.OpenRepo.mu.Unlock() + + // data flushed is visible to other sessions in the same open repository. + s.OpenRepo.Contents.Add(wc.ids...) + s.OpenRepo.Manifests.Add(wm.ids...) + + // data flushed is visible to other sessions in other open repositories. + s.OpenRepo.RepoData.Contents.Add(wc.ids...) + s.OpenRepo.RepoData.Manifests.Add(wm.ids...) + + s.WrittenContents.RemoveAll(wc.ids...) + s.WrittenManifests.RemoveAll(wm.ids...) 
+} diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index a24938bb2..17738e65d 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -5,36 +5,194 @@ cryptorand "crypto/rand" "fmt" "io/ioutil" - "log" "math/rand" "os" "path/filepath" - "runtime" - "strings" - "sync" + "sync/atomic" "testing" "time" "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob/filesystem" "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/logging" + "github.com/kopia/kopia/tests/repository_stress_test/repomodel" ) +const ( + shortStressTestDuration = 10 * time.Second + longStressTestDuration = 120 * time.Second +) + +type actName string + +const ( + actWriteRandomContent = "writeRandomContent" + actReadPendingContent = "readPendingContent" + actReadFlushContent = "readFlushedContent" + + actListContents = "listContents" + actListAndReadAllContents = "listAndReadAllContents" + + actCompact = "compact" + actFlush = "flush" + actRefresh = "refresh" + + actReadPendingManifest = "readPendingManifest" + actReadFlushedManifest = "readFlushedManifest" + actWriteRandomManifest = "writeRandomManifest" +) + +var actions = map[actName]func(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error{ + actWriteRandomContent: writeRandomContent, + actReadPendingContent: readPendingContent, + actReadFlushContent: readFlushedContent, + actListContents: listContents, + actListAndReadAllContents: listAndReadAllContents, + actCompact: compact, + actFlush: flush, + actRefresh: refresh, + actReadPendingManifest: readPendingManifest, + actReadFlushedManifest: 
readFlushedManifest, + actWriteRandomManifest: writeRandomManifest, +} + +type StressOptions struct { + ConfigsPerRepository int + OpenRepositoriesPerConfig int + SessionsPerOpenRepository int + WorkersPerSession int + + ActionWeights map[actName]int +} + +var errSkipped = errors.Errorf("skipped") + const masterPassword = "foo-bar-baz-1234" -var ( - knownBlocks []content.ID - knownBlocksMutex sync.Mutex -) +func init() { + rand.Seed(clock.Now().UnixNano()) +} -func TestStressRepository(t *testing.T) { - if testing.Short() { - t.Skip("skipping stress test during short tests") +func TestStressRepositoryMixAll(t *testing.T) { + runStress(t, &StressOptions{ + ConfigsPerRepository: 2, + OpenRepositoriesPerConfig: 2, + SessionsPerOpenRepository: 2, + WorkersPerSession: 2, + + ActionWeights: map[actName]int{ + actWriteRandomContent: 10, + actReadPendingContent: 500, + actReadFlushContent: 500, + actListContents: 10, + actListAndReadAllContents: 10, + actCompact: 2, + actFlush: 10, + actRefresh: 20, + actReadPendingManifest: 300, + actReadFlushedManifest: 300, + actWriteRandomManifest: 20, + }, + }) +} + +func TestStressRepositoryRandomMix(t *testing.T) { + runStress(t, &StressOptions{ + ConfigsPerRepository: 1 + rand.Intn(3), + OpenRepositoriesPerConfig: 1 + rand.Intn(2), + SessionsPerOpenRepository: 1 + rand.Intn(2), + WorkersPerSession: 1 + rand.Intn(2), + + ActionWeights: map[actName]int{ + actWriteRandomContent: 1 + rand.Intn(100), + actReadPendingContent: 1 + rand.Intn(100), + actReadFlushContent: 1 + rand.Intn(100), + actListContents: 1 + rand.Intn(100), + actListAndReadAllContents: 1 + rand.Intn(100), + actCompact: 1 + rand.Intn(100), + actFlush: 1 + rand.Intn(100), + actRefresh: 1 + rand.Intn(100), + actReadPendingManifest: 1 + rand.Intn(100), + actReadFlushedManifest: 1 + rand.Intn(100), + actWriteRandomManifest: 1 + rand.Intn(100), + }, + }) +} + +func TestStressRepositoryManifests(t *testing.T) { + runStress(t, &StressOptions{ + ConfigsPerRepository: 1, + 
OpenRepositoriesPerConfig: 1, + SessionsPerOpenRepository: 2, + WorkersPerSession: 1, + + ActionWeights: map[actName]int{ + actCompact: 2, + actFlush: 10, + actRefresh: 20, + actReadPendingManifest: 300, + actReadFlushedManifest: 300, + actWriteRandomManifest: 1, + }, + }) +} + +func TestStressContentWriteHeavy(t *testing.T) { + runStress(t, &StressOptions{ + ConfigsPerRepository: 2, + OpenRepositoriesPerConfig: 2, + SessionsPerOpenRepository: 2, + WorkersPerSession: 2, + + ActionWeights: map[actName]int{ + actWriteRandomContent: 500, + actReadPendingContent: 10, + actReadFlushContent: 10, + actListContents: 10, + actListAndReadAllContents: 10, + actCompact: 2, + actFlush: 10, + actRefresh: 20, + }, + }) +} + +func TestStressContentReadHeavy(t *testing.T) { + runStress(t, &StressOptions{ + ConfigsPerRepository: 2, + OpenRepositoriesPerConfig: 2, + SessionsPerOpenRepository: 2, + WorkersPerSession: 2, + + ActionWeights: map[actName]int{ + actWriteRandomContent: 5, + actReadPendingContent: 500, + actReadFlushContent: 500, + actListContents: 500, + actListAndReadAllContents: 10, + actCompact: 2, + actFlush: 10, + actRefresh: 20, + }, + }) +} + +// nolint:thelper +func runStress(t *testing.T, opt *StressOptions) { + if os.Getenv("KOPIA_STRESS_TEST") == "" { + t.Skip("skipping stress test") } + t.Logf("running stress test with options: %#v", *opt) + ctx := testlogging.Context(t) tmpPath, err := ioutil.TempDir("", "kopia") @@ -51,8 +209,6 @@ func TestStressRepository(t *testing.T) { t.Logf("path: %v", tmpPath) storagePath := filepath.Join(tmpPath, "storage") - configFile1 := filepath.Join(tmpPath, "kopia1.config") - configFile2 := filepath.Join(tmpPath, "kopia2.config") assertNoError(t, os.MkdirAll(storagePath, 0o700)) @@ -64,281 +220,302 @@ func TestStressRepository(t *testing.T) { } // create repository - if err := repo.Initialize(ctx, st, &repo.NewRepositoryOptions{}, masterPassword); err != nil { + if err = repo.Initialize(ctx, st, &repo.NewRepositoryOptions{}, 
masterPassword); err != nil { t.Fatalf("unable to initialize repository: %v", err) } + var configFiles []string + // set up two parallel kopia connections, each with its own config file and cache. - if err := repo.Connect(ctx, configFile1, st, masterPassword, &repo.ConnectOptions{ - CachingOptions: content.CachingOptions{ - CacheDirectory: filepath.Join(tmpPath, "cache1"), - MaxCacheSizeBytes: 2000000000, - }, - }); err != nil { - t.Fatalf("unable to connect 1: %v", err) + for i := 0; i < opt.ConfigsPerRepository; i++ { + configFile := filepath.Join(tmpPath, fmt.Sprintf("kopia-%v.config", i)) + configFiles = append(configFiles, configFile) + + if err = repo.Connect(ctx, configFile, st, masterPassword, &repo.ConnectOptions{ + CachingOptions: content.CachingOptions{ + CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)), + MaxCacheSizeBytes: 2000000000, + }, + }); err != nil { + t.Fatalf("unable to connect %v: %v", configFile, err) + } } - if err := repo.Connect(ctx, configFile2, st, masterPassword, &repo.ConnectOptions{ - CachingOptions: content.CachingOptions{ - CacheDirectory: filepath.Join(tmpPath, "cache2"), - MaxCacheSizeBytes: 2000000000, - }, - }); err != nil { - t.Fatalf("unable to connect 2: %v", err) + stop := new(int32) + + eg, ctx := errgroup.WithContext(ctx) + + logDir := testutil.TempLogDirectory(t) + + rm := repomodel.NewRepositoryData() + + logFileName := filepath.Join(logDir, "workers.log") + logFile, err := os.Create(logFileName) + require.NoError(t, err) + t.Logf("log file: %v", logFileName) + + defer logFile.Close() + + for _, configFile := range configFiles { + configFile := configFile + + for i := 0; i < opt.OpenRepositoriesPerConfig; i++ { + i := i + + eg.Go(func() error { + log := logging.WithPrefix(fmt.Sprintf("%v::o%v", filepath.Base(configFile), i), logging.Broadcast{ + logging.Printf(func(msg string, args ...interface{}) { + fmt.Fprintf(logFile, clock.Now().Format("2006-01-02T15:04:05.000000Z07:00")+" "+msg+"\n", args...) 
+ })("test"), + }) + + ctx2 := logging.WithLogger(ctx, func(module string) logging.Logger { + return log + }) + + return longLivedRepositoryTest(ctx2, t, configFile, rm, log, opt, stop) + }) + } } - cancel := make(chan struct{}) + duration := shortStressTestDuration + if os.Getenv("CI") != "" && os.Getenv("IS_PULL_REQUEST") == "false" { + duration = longStressTestDuration + } - var wg sync.WaitGroup + time.Sleep(duration) + atomic.StoreInt32(stop, 1) - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg) - - wg.Add(1) - - go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg) - - time.Sleep(5 * time.Second) - close(cancel) - - wg.Wait() + require.NoError(t, eg.Wait()) } -func longLivedRepositoryTest(ctx context.Context, t *testing.T, cancel chan struct{}, configFile string, wg *sync.WaitGroup) { +func longLivedRepositoryTest(ctx context.Context, t *testing.T, configFile string, rm *repomodel.RepositoryData, log logging.Logger, opt *StressOptions, stop *int32) error { t.Helper() - defer wg.Done() + // important to call OpenRepository() before repo.Open() to ensure we're not seeing state + // added between repo.Open() and OpenRepository() + or := rm.OpenRepository() rep, err := repo.Open(ctx, configFile, masterPassword, &repo.Options{}) if err != nil { - t.Errorf("error opening repository: %v", err) - return + return errors.Wrap(err, "error opening repository") } + defer rep.Close(ctx) - _, w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, 
repo.WriteSessionOptions{Purpose: "longLivedRepositoryTest"}) - if err != nil { - t.Errorf("error opening writer: %v", err) - return + eg, ctx := errgroup.WithContext(ctx) + + for i := 0; i < opt.SessionsPerOpenRepository; i++ { + ors := or.NewSession() + + _, w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{ + Purpose: fmt.Sprintf("longLivedRepositoryTest-w%v", i), + }) + if err != nil { + return errors.Wrap(err, "error opening writer") + } + + for j := 0; j < opt.WorkersPerSession; j++ { + log2 := logging.WithPrefix(fmt.Sprintf("s%vw%v::", i, j), log) + + eg.Go(func() error { + return repositoryTest(ctx, t, stop, w, ors, log2, opt) + }) + } } - var wg2 sync.WaitGroup - - for i := 0; i < 4; i++ { - wg2.Add(1) - - go func() { - defer wg2.Done() - - repositoryTest(ctx, t, cancel, w) - }() - } - - wg2.Wait() + return eg.Wait() } -func repositoryTest(ctx context.Context, t *testing.T, cancel chan struct{}, rep repo.DirectRepositoryWriter) { +func repositoryTest(ctx context.Context, t *testing.T, stop *int32, rep repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger, opt *StressOptions) error { t.Helper() - workTypes := []*struct { - name string - fun func(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error - weight int - hitCount int - }{ - // {"reopen", reopen, 1, 0}, - {"writeRandomBlock", writeRandomBlock, 100, 0}, - {"writeRandomManifest", writeRandomManifest, 100, 0}, - {"readKnownBlock", readKnownBlock, 500, 0}, - {"listContents", listContents, 50, 0}, - {"listAndReadAllContents", listAndReadAllContents, 5, 0}, - {"readRandomManifest", readRandomManifest, 50, 0}, - {"compact", compact, 1, 0}, - {"refresh", refresh, 3, 0}, - {"flush", flush, 1, 0}, - } - var totalWeight int - for _, w := range workTypes { - totalWeight += w.weight + for _, w := range opt.ActionWeights { + totalWeight += w } - iter := 0 - - for { - select { - case <-cancel: - return - default: - } - - if iter%1000 == 
0 { - var bits []string - for _, w := range workTypes { - bits = append(bits, fmt.Sprintf("%v:%v", w.name, w.hitCount)) - } - - log.Printf("#%v %v %v goroutines", iter, strings.Join(bits, " "), runtime.NumGoroutine()) - } - iter++ - + for ctx.Err() == nil && atomic.LoadInt32(stop) == 0 { roulette := rand.Intn(totalWeight) - for _, w := range workTypes { - if roulette < w.weight { - w.hitCount++ + for act, weight := range opt.ActionWeights { + if roulette < weight { + if err := actions[act](ctx, rep, rs, log); err != nil { + if errors.Is(err, errSkipped) { + break + } - if err := w.fun(ctx, t, rep); err != nil { - w.hitCount++ - t.Errorf("error: %v", errors.Wrapf(err, "error running %v", w.name)) + log.Errorf("FAILED %v: %v", act, err) - return + return errors.Wrapf(err, "error running %v", act) } + log.Errorf("SUCCEEDED %v", act) + break } - roulette -= w.weight + roulette -= weight } } + + return nil } -func writeRandomBlock(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - +func writeRandomContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { data := make([]byte, 1000) cryptorand.Read(data) contentID, err := r.ContentManager().WriteContent(ctx, data, "", content.NoCompression) - if err == nil { - knownBlocksMutex.Lock() - if len(knownBlocks) >= 1000 { - n := rand.Intn(len(knownBlocks)) - knownBlocks[n] = contentID - } else { - knownBlocks = append(knownBlocks, contentID) - } - knownBlocksMutex.Unlock() + if err != nil { + return errors.Wrap(err, "WriteContent error") } - return err + log.Debugf("writeRandomContent(%v,%x)", contentID, data[0:16]) + + rs.WriteContent(contentID) + + return errors.Wrapf(err, "writeRandomContent(%v)", contentID) } -func readKnownBlock(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - - knownBlocksMutex.Lock() - if len(knownBlocks) == 0 { - knownBlocksMutex.Unlock() - return nil +func 
readPendingContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + contentID := rs.WrittenContents.PickRandom() + if contentID == "" { + return errSkipped } - contentID := knownBlocks[rand.Intn(len(knownBlocks))] - knownBlocksMutex.Unlock() + log.Debugf("readPendingContent(%v)", contentID) _, err := r.ContentReader().GetContent(ctx, contentID) - if err == nil || errors.Is(err, content.ErrContentNotFound) { + if err == nil { return nil } - return err + return errors.Wrapf(err, "readPendingContent(%v)", contentID) } -func listContents(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() +func readFlushedContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + contentID := rs.OpenRepo.Contents.PickRandom() + if contentID == "" { + return errSkipped + } - return r.ContentReader().IterateContents( + log.Debugf("readFlushedContent(%v)", contentID) + + _, err := r.ContentReader().GetContent(ctx, contentID) + if err == nil { + return nil + } + + return errors.Wrapf(err, "readFlushedContent(%v)", contentID) +} + +func listContents(ctx context.Context, r repo.DirectRepositoryWriter, _ *repomodel.RepositorySession, log logging.Logger) error { + log.Debugf("listContents()") + + return errors.Wrapf(r.ContentReader().IterateContents( ctx, content.IterateOptions{}, func(i content.Info) error { return nil }, - ) + ), "listContents()") } -func listAndReadAllContents(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() +func listAndReadAllContents(ctx context.Context, r repo.DirectRepositoryWriter, _ *repomodel.RepositorySession, log logging.Logger) error { + log.Debugf("listAndReadAllContents()") - return r.ContentReader().IterateContents( + return errors.Wrapf(r.ContentReader().IterateContents( ctx, content.IterateOptions{}, func(ci content.Info) error { cid := ci.GetContentID() _, 
err := r.ContentReader().GetContent(ctx, cid) if err != nil { - if errors.Is(err, content.ErrContentNotFound) && strings.HasPrefix(string(cid), "m") { - // this is ok, sometimes manifest manager will perform compaction and 'm' contents will be marked as deleted - return nil - } return errors.Wrapf(err, "error reading content %v", cid) } return nil - }) + }), "listAndReadAllContents()") } -func compact(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - - return r.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}) -} - -func flush(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - - return r.Flush(ctx) -} - -func refresh(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - - return r.Refresh(ctx) -} - -func readRandomManifest(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() - - manifests, err := r.FindManifests(ctx, nil) - if err != nil { - return err +func compact(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + if !rs.OpenRepo.EnableMaintenance { + return errSkipped } - if len(manifests) == 0 { + log.Debugf("compact()") + + return errors.Wrapf( + r.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}), + "compact()") +} + +func flush(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + log.Debugf("flush()") + + // capture contents and manifests we had before we start flushing. + // this is necessary since operations can proceed in parallel to Flush() which might add more data + // to the model. It would be incorrect to flush the latest state of the model + // because we don't know for sure if the corresponding repository data has indeed been flushed. 
+ wc := rs.WrittenContents.Snapshot() + wm := rs.WrittenManifests.Snapshot() + + if err := r.Flush(ctx); err != nil { + return errors.Wrap(err, "error flushing") + } + + // flush model after flushing the repository to communicate to other sessions that they can expect + // to see flushed items now. + rs.Flush(&wc, &wm) + + return nil +} + +func refresh(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + log.Debugf("refresh()") + + // refresh model before refreshing repository to guarantee that repository has at least all the items in + // the model (possibly more). + rs.Refresh() + + if err := r.Refresh(ctx); err != nil { + return errors.Wrap(err, "refresh error") + } + + return nil +} + +func readPendingManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + manifestID := rs.WrittenManifests.PickRandom() + if manifestID == "" { + return errSkipped + } + + log.Debugf("readPendingManifest(%v)", manifestID) + + _, err := r.GetManifest(ctx, manifestID, nil) + if err == nil { return nil } - n := rand.Intn(len(manifests)) - - _, err = r.GetManifest(ctx, manifests[n].ID, nil) - - return err + return errors.Wrapf(err, "readPendingManifest(%v)", manifestID) } -func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error { - t.Helper() +func readFlushedManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error { + manifestID := rs.OpenRepo.Manifests.PickRandom() + if manifestID == "" { + return errSkipped + } + log.Debugf("readFlushedManifest(%v)", manifestID) + + _, err := r.GetManifest(ctx, manifestID, nil) + if err == nil { + return nil + } + + return errors.Wrapf(err, "readFlushedManifest(%v)", manifestID) +} + +func writeRandomManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error 
{ key1 := fmt.Sprintf("key-%v", rand.Intn(10)) key2 := fmt.Sprintf("key-%v", rand.Intn(10)) val1 := fmt.Sprintf("val1-%v", rand.Intn(10)) @@ -347,7 +524,8 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectReposit content2 := fmt.Sprintf("content-%v", rand.Intn(10)) content1val := fmt.Sprintf("val1-%v", rand.Intn(10)) content2val := fmt.Sprintf("val2-%v", rand.Intn(10)) - _, err := r.PutManifest(ctx, map[string]string{ + + mid, err := r.PutManifest(ctx, map[string]string{ "type": key1, key1: val1, key2: val2, @@ -355,6 +533,12 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectReposit content1: content1val, content2: content2val, }) + if err != nil { + return err + } + + log.Debugf("writeRandomManifest(%v)", mid) + rs.WriteManifest(mid) return err } diff --git a/tests/stress_test/stress_test.go b/tests/stress_test/stress_test.go index b9e60bd6a..ea1340958 100644 --- a/tests/stress_test/stress_test.go +++ b/tests/stress_test/stress_test.go @@ -20,8 +20,8 @@ const goroutineCount = 16 func TestStressBlockManager(t *testing.T) { - if testing.Short() { - t.Skip("skipping stress test during short tests") + if os.Getenv("KOPIA_STRESS_TEST") == "" { + t.Skip("skipping stress test") } data := blobtesting.DataMap{} @@ -29,7 +29,7 @@ func TestStressBlockManager(t *testing.T) { memst := blobtesting.NewMapStorage(data, keyTimes, clock.Now) duration := 3 * time.Second - if os.Getenv("KOPIA_LONG_STRESS_TEST") != "" { + if os.Getenv("CI") != "" { duration = 30 * time.Second } diff --git a/tests/testenv/cli_exe_runner.go b/tests/testenv/cli_exe_runner.go index 5e5001b6f..035430ac8 100644 --- a/tests/testenv/cli_exe_runner.go +++ b/tests/testenv/cli_exe_runner.go @@ -8,7 +8,7 @@ "strings" "testing" - "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/testutil" ) // CLIExeRunner is a CLIExeRunner that invokes the commands via external executable. 
@@ -85,33 +85,7 @@ func NewExeRunner(t *testing.T) *CLIExeRunner { // unset environment variables that disrupt tests when passed to subprocesses. os.Unsetenv("KOPIA_PASSWORD") - cleanName := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll( - t.Name(), - "/", "_"), "\\", "_"), ":", "_") - - logsBaseDir := os.Getenv("KOPIA_LOGS_DIR") - if logsBaseDir == "" { - logsBaseDir = filepath.Join(os.TempDir(), "kopia-logs") - } - - logsDir := filepath.Join(logsBaseDir, cleanName+"."+clock.Now().Local().Format("20060102150405")) - - t.Cleanup(func() { - if t.Failed() { - t.Logf("FAILURE ABOVE ^^^^") - } - - if os.Getenv("KOPIA_KEEP_LOGS") != "" { - t.Logf("logs preserved in %v", logsDir) - return - } - - if t.Failed() && os.Getenv("KOPIA_DISABLE_LOG_DUMP_ON_FAILURE") == "" { - dumpLogs(t, logsDir) - } - - os.RemoveAll(logsDir) - }) + logsDir := testutil.TempLogDirectory(t) return &CLIExeRunner{ Exe: filepath.FromSlash(exe), diff --git a/tests/testenv/cli_test_env.go b/tests/testenv/cli_test_env.go index 38178086d..266f55ddf 100644 --- a/tests/testenv/cli_test_env.go +++ b/tests/testenv/cli_test_env.go @@ -3,9 +3,7 @@ import ( "bufio" - "fmt" "io" - "io/ioutil" "path/filepath" "runtime" "strings" @@ -22,8 +20,6 @@ const ( // TestRepoPassword is a password for repositories created in tests. TestRepoPassword = "qWQPJ2hiiLgWRRCr" - - maxOutputLinesToLog = 4000 ) // CLIRunner encapsulates running kopia subcommands for testing purposes. 
@@ -85,38 +81,6 @@ func NewCLITest(t *testing.T, runner CLIRunner) *CLITest { } } -func dumpLogs(t *testing.T, dirname string) { - t.Helper() - - entries, err := ioutil.ReadDir(dirname) - if err != nil { - t.Errorf("unable to read %v: %v", dirname, err) - - return - } - - for _, e := range entries { - if e.IsDir() { - dumpLogs(t, filepath.Join(dirname, e.Name())) - continue - } - - dumpLogFile(t, filepath.Join(dirname, e.Name())) - } -} - -func dumpLogFile(t *testing.T, fname string) { - t.Helper() - - data, err := ioutil.ReadFile(fname) - if err != nil { - t.Error(err) - return - } - - t.Logf("LOG FILE: %v %v", fname, trimOutput(string(data))) -} - // RunAndExpectSuccess runs the given command, expects it to succeed and returns its output lines. func (e *CLITest) RunAndExpectSuccess(t *testing.T, args ...string) []string { t.Helper() @@ -250,30 +214,3 @@ func (e *CLITest) Run(t *testing.T, expectedError bool, args ...string) (stdout, return stdout, stderr, gotErr } - -func trimOutput(s string) string { - lines := splitLines(s) - if len(lines) <= maxOutputLinesToLog { - return s - } - - lines2 := append([]string(nil), lines[0:(maxOutputLinesToLog/2)]...) - lines2 = append(lines2, fmt.Sprintf("/* %v lines removed */", len(lines)-maxOutputLinesToLog)) - lines2 = append(lines2, lines[len(lines)-(maxOutputLinesToLog/2):]...) - - return strings.Join(lines2, "\n") -} - -func splitLines(s string) []string { - s = strings.TrimSpace(s) - if s == "" { - return nil - } - - var result []string - for _, l := range strings.Split(s, "\n") { - result = append(result, strings.TrimRight(l, "\r")) - } - - return result -}