Fixed few subtle threading bugs uncovered by stress test and rewrote the test to be model-based (#1157)

* testing: refactored logs directory management

* content: fixed index mutex to be shared across all write sessions

added mutex protection during writecontent/refresh race

* testing: upload log artifacts

* content: bump revision number after index has been added

This fixes a bug where a manifest manager in another session of
the same open repository may not see newly added content, because it
prematurely caches an incomplete set of contents.

This took 2 weeks to find.

* manifest: improved log output, fixed unnecessary mutex release

* testing: rewrote stress test to be model-based and more precise
This commit is contained in:
Jarek Kowalski
2021-07-01 21:37:27 -07:00
committed by GitHub
parent d73e0d60ce
commit e64d5b8eab
19 changed files with 929 additions and 336 deletions

View File

@@ -18,3 +18,10 @@ jobs:
run: make test-with-coverage
- name: Publish Coverage Results
run: make ci-publish-coverage
- name: Upload Logs
uses: actions/upload-artifact@v2
with:
name: logs
path: .logs/**/*.log
if-no-files-found: ignore
if: ${{ always() }}

View File

@@ -103,6 +103,13 @@ jobs:
- name: Integration Tests
run: make -j2 ci-integration-tests
continue-on-error: ${{ github.event_name != 'pull_request' }}
- name: Upload Logs
uses: actions/upload-artifact@v2
with:
name: logs
path: .logs/**/*.log
if-no-files-found: ignore
if: ${{ always() }}
- name: Upload Kopia Artifacts
uses: actions/upload-artifact@v2
with:

33
.github/workflows/stress-test.yml vendored Normal file
View File

@@ -0,0 +1,33 @@
name: Stress Test
on:
push:
branches: [ master ]
tags:
- v*
pull_request: {}
schedule:
# run every 2 hours
- cron: '12 */2 * * *'
jobs:
endurance-test:
name: Stress Test
runs-on: ubuntu-latest
steps:
- name: Set up Go.
uses: actions/setup-go@v2
with:
go-version: ^1.16
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Stress Test
run: make stress-test
- name: Upload Logs
uses: actions/upload-artifact@v2
with:
name: logs
path: .logs/**/*.log
if-no-files-found: ignore
if: ${{ always() }}

View File

@@ -157,7 +157,6 @@ ci-integration-tests:
$(MAKE) integration-tests
$(MAKE) integration-tests-index-v2
$(MAKE) robustness-tool-tests
$(MAKE) stress-test
ci-publish-coverage:
ifeq ($(GOOS)/$(GOARCH),linux/amd64)
@@ -250,10 +249,13 @@ ifeq ($(GOOS)/$(GOARCH),linux/amd64)
$(GO_TEST) -count=$(REPEAT_TEST) github.com/kopia/kopia/tests/tools/... github.com/kopia/kopia/tests/robustness/engine/... $(TEST_FLAGS)
endif
stress_test: export KOPIA_LONG_STRESS_TEST=1
stress-test: export KOPIA_STRESS_TEST=1
stress-test: export KOPIA_DEBUG_MANIFEST_MANAGER=1
stress-test: export KOPIA_LOGS_DIR=$(CURDIR)/.logs
stress-test: export KOPIA_KEEP_LOGS=1
stress-test: $(gotestsum)
$(GO_TEST) -count=$(REPEAT_TEST) -timeout 200s github.com/kopia/kopia/tests/stress_test
$(GO_TEST) -count=$(REPEAT_TEST) -timeout 200s github.com/kopia/kopia/tests/repository_stress_test
$(GO_TEST) -count=$(REPEAT_TEST) -timeout 3600s github.com/kopia/kopia/tests/stress_test
$(GO_TEST) -count=$(REPEAT_TEST) -timeout 3600s github.com/kopia/kopia/tests/repository_stress_test
layering-test:
ifneq ($(GOOS),windows)

View File

@@ -1,6 +1,7 @@
package testutil
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
@@ -9,6 +10,14 @@
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/clock"
)
const (
maxOutputLinesToLog = 4000
logsDirPermissions = 0o750
)
var interestingLengths = []int{10, 50, 100, 240, 250, 260, 270}
@@ -55,3 +64,99 @@ func TempDirectory(t *testing.T) string {
return d
}
// TempLogDirectory creates and returns a per-test directory for storing logs.
//
// The directory is created under $KOPIA_LOGS_DIR when that variable is set,
// otherwise under "kopia-logs" in the OS temporary directory. Its name is the
// test name (with '/', '\' and ':' replaced by '_') followed by a local
// timestamp.
//
// On test cleanup:
//   - if KOPIA_KEEP_LOGS is set, the directory is preserved and its location
//     is logged;
//   - otherwise, if the test failed and KOPIA_DISABLE_LOG_DUMP_ON_FAILURE is
//     not set, all log files are dumped to the test output;
//   - the directory is then removed.
func TempLogDirectory(t *testing.T) string {
	t.Helper()

	// make the test name safe to use as a single path component.
	cleanName := strings.NewReplacer("/", "_", "\\", "_", ":", "_").Replace(t.Name())

	logsBaseDir := os.Getenv("KOPIA_LOGS_DIR")
	if logsBaseDir == "" {
		logsBaseDir = filepath.Join(os.TempDir(), "kopia-logs")
	}

	logsDir := filepath.Join(logsBaseDir, cleanName+"."+clock.Now().Local().Format("20060102150405"))

	require.NoError(t, os.MkdirAll(logsDir, logsDirPermissions))

	t.Cleanup(func() {
		if os.Getenv("KOPIA_KEEP_LOGS") != "" {
			t.Logf("logs preserved in %v", logsDir)
			return
		}

		if t.Failed() && os.Getenv("KOPIA_DISABLE_LOG_DUMP_ON_FAILURE") == "" {
			dumpLogs(t, logsDir)
		}

		// best-effort removal, nothing useful can be done on failure.
		os.RemoveAll(logsDir) // nolint:errcheck
	})

	return logsDir
}
// dumpLogs walks dirname recursively and writes the contents of every file
// found to the test log. A directory that cannot be read is reported as a test
// error and skipped.
func dumpLogs(t *testing.T, dirname string) {
	t.Helper()

	items, readErr := ioutil.ReadDir(dirname)
	if readErr != nil {
		t.Errorf("unable to read %v: %v", dirname, readErr)
		return
	}

	for _, item := range items {
		fullPath := filepath.Join(dirname, item.Name())

		if item.IsDir() {
			dumpLogs(t, fullPath)
		} else {
			dumpLogFile(t, fullPath)
		}
	}
}
// dumpLogFile writes the (possibly trimmed) contents of the named file to the
// test log. A read failure is reported as a test error.
func dumpLogFile(t *testing.T, fname string) {
	t.Helper()

	// nolint:gosec
	contents, readErr := ioutil.ReadFile(fname)
	if readErr != nil {
		t.Error(readErr)
		return
	}

	trimmed := trimOutput(string(contents))

	t.Logf("LOG FILE: %v %v", fname, trimmed)
}
// trimOutput limits s to at most maxOutputLinesToLog lines of content.
// Shorter input is returned unchanged; longer input keeps the first and last
// maxOutputLinesToLog/2 lines with a marker line in between stating how many
// lines were dropped.
func trimOutput(s string) string {
	all := splitLines(s)
	if len(all) <= maxOutputLinesToLog {
		return s
	}

	const half = maxOutputLinesToLog / 2 // nolint:gomnd

	kept := make([]string, 0, maxOutputLinesToLog+1)
	kept = append(kept, all[:half]...)
	kept = append(kept, fmt.Sprintf("/* %v lines removed */", len(all)-maxOutputLinesToLog))
	kept = append(kept, all[len(all)-half:]...)

	return strings.Join(kept, "\n")
}
// splitLines trims surrounding whitespace from s and splits it into lines on
// "\n", removing any trailing "\r" characters from each line. It returns nil
// when s contains only whitespace.
func splitLines(s string) []string {
	trimmed := strings.TrimSpace(s)
	if trimmed == "" {
		return nil
	}

	lines := strings.Split(trimmed, "\n")
	for i := range lines {
		lines[i] = strings.TrimRight(lines[i], "\r")
	}

	return lines
}

View File

@@ -66,7 +66,11 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
}
func (c *committedContentIndex) addIndexBlob(ctx context.Context, indexBlobID blob.ID, data []byte, use bool) error {
atomic.AddInt64(&c.rev, 1)
// ensure we bump revision number AFTER this function
// doing it prematurely might confuse callers of revision() who may cache
// a set of old contents and associate it with new revision, before new contents
// are actually available.
defer atomic.AddInt64(&c.rev, 1)
if err := c.cache.addContentToCache(ctx, indexBlobID, data); err != nil {
return errors.Wrap(err, "error adding content to cache")
@@ -160,7 +164,7 @@ func (c *committedContentIndex) use(ctx context.Context, indexFiles []blob.ID) e
return nil
}
c.log.Debugf("use-indexes", indexFiles)
c.log.Debugf("use-indexes %v", indexFiles)
mergedAndCombined, newInUse, err := c.merge(ctx, indexFiles)
if err != nil {

View File

@@ -5,6 +5,7 @@
"context"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
@@ -53,6 +54,11 @@ type SharedManager struct {
enc *encryptedBlobMgr
timeNow func() time.Time
// lock to protect the set of committed indexes
// shared lock will be acquired when writing new content to allow it to happen in parallel
// exclusive lock will be acquired during compaction or refresh.
indexesLock sync.RWMutex
format FormattingOptions
checkInvariantsOnUnlock bool
writeFormatVersion int32 // format version to write

View File

@@ -375,6 +375,12 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
bm.onUpload(int64(len(data)))
// we must hold a lock between writing an index and adding index blob to committed contents index
// otherwise it is possible for concurrent compaction or refresh to forget about the blob we have just
// written
bm.indexesLock.RLock()
defer bm.indexesLock.RUnlock()
indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data, bm.currentSessionInfo.ID)
if err != nil {
return errors.Wrap(err, "error writing index blob")
@@ -745,21 +751,6 @@ func (bm *WriteManager) unlock() {
bm.mu.Unlock()
}
// Refresh reloads the committed content indexes.
func (bm *WriteManager) Refresh(ctx context.Context) error {
bm.lock()
defer bm.unlock()
bm.log.Debugf("Refresh started")
t0 := clock.Now()
err := bm.loadPackIndexesUnlocked(ctx)
bm.log.Debugf("Refresh completed in %v", clock.Since(t0))
return err
}
// SyncMetadataCache synchronizes metadata cache with metadata blobs in storage.
func (bm *WriteManager) SyncMetadataCache(ctx context.Context) error {
if cm, ok := bm.metadataCache.(*contentCacheForMetadata); ok {

View File

@@ -7,6 +7,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/repo/blob"
)
@@ -29,21 +30,36 @@ func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration {
return defaultEventualConsistencySettleTime
}
// Refresh reloads the committed content indexes while holding indexesLock
// exclusively, and logs how long the reload took.
func (sm *SharedManager) Refresh(ctx context.Context) error {
	sm.indexesLock.Lock()
	defer sm.indexesLock.Unlock()

	sm.log.Debugf("Refresh started")

	started := clock.Now()
	loadErr := sm.loadPackIndexesUnlocked(ctx)

	sm.log.Debugf("Refresh completed in %v", clock.Since(started))

	return loadErr
}
// CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs.
func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) error {
func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) error {
// we must hold the lock here to avoid the race with Refresh() which can reload the
// current set of indexes while we process them.
bm.mu.Lock()
defer bm.mu.Unlock()
sm.indexesLock.Lock()
defer sm.indexesLock.Unlock()
bm.log.Debugf("CompactIndexes(%+v)", opt)
sm.log.Debugf("CompactIndexes(%+v)", opt)
if err := bm.indexBlobManager.compact(ctx, opt); err != nil {
if err := sm.indexBlobManager.compact(ctx, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
}
// reload indexes after compaction.
if err := bm.loadPackIndexesUnlocked(ctx); err != nil {
if err := sm.loadPackIndexesUnlocked(ctx); err != nil {
return errors.Wrap(err, "error re-loading indexes")
}

View File

@@ -5,9 +5,13 @@
"compress/gzip"
"context"
"encoding/json"
"fmt"
"os"
"sort"
"sync"
"github.com/pkg/errors"
"golang.org/x/exp/rand"
"github.com/kopia/kopia/repo/content"
)
@@ -16,6 +20,8 @@
type committedManifestManager struct {
b contentManager
debugID string
cmmu sync.Mutex
lastRevision int64
locked bool
@@ -24,28 +30,48 @@ type committedManifestManager struct {
}
func (m *committedManifestManager) getCommittedEntryOrNil(ctx context.Context, id ID) (*manifestEntry, error) {
if err := m.ensureInitialized(ctx); err != nil {
return nil, err
}
m.lock()
defer m.unlock()
if err := m.ensureInitializedLocked(ctx); err != nil {
return nil, err
}
return m.committedEntries[id], nil
}
func (m *committedManifestManager) findCommittedEntries(ctx context.Context, labels map[string]string) (map[ID]*manifestEntry, error) {
if err := m.ensureInitialized(ctx); err != nil {
return nil, err
// dump logs the sorted IDs of all committed manifest entries together with
// the last observed revision, prefixed with prefix and tagged with debugID.
// It does nothing when debugID is empty.
//
// It reads committedEntries and lastRevision without taking any lock itself.
func (m *committedManifestManager) dump(ctx context.Context, prefix string) {
	if m.debugID == "" {
		return
	}

	keys := make([]string, 0, len(m.committedEntries))
	for k := range m.committedEntries {
		keys = append(keys, string(k))
	}

	sort.Strings(keys)

	// prefix and debugID are passed as arguments rather than spliced into the
	// format string, so a '%' in either cannot corrupt the output.
	log(ctx).Debugf("%v[%v] committed keys %v: %v rev=%v", prefix, m.debugID, len(keys), keys, m.lastRevision)
}
func (m *committedManifestManager) findCommittedEntries(ctx context.Context, labels map[string]string) (map[ID]*manifestEntry, error) {
m.lock()
defer m.unlock()
if err := m.ensureInitializedLocked(ctx); err != nil {
return nil, err
}
return findEntriesMatchingLabels(m.committedEntries, labels), nil
}
func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
if len(entries) == 0 {
return nil, nil
}
m.lock()
defer m.unlock()
@@ -243,21 +269,25 @@ func (m *committedManifestManager) mergeEntryLocked(e *manifestEntry) {
}
}
func (m *committedManifestManager) ensureInitialized(ctx context.Context) error {
m.lock()
defer m.unlock()
func (m *committedManifestManager) ensureInitializedLocked(ctx context.Context) error {
rev := m.b.Revision()
if m.lastRevision == rev {
if m.debugID != "" {
log(ctx).Debugf("%v up-to-date rev=%v last=%v", m.debugID, rev, m.lastRevision)
}
return nil
}
log(ctx).Debugf("reloading committed manifest contents: rev=%v last=%v", rev, m.lastRevision)
if err := m.loadCommittedContentsLocked(ctx); err != nil {
return err
}
m.lastRevision = rev
m.dump(ctx, "after ensureInitialized: ")
// it is possible that the content manager revision has changed while we were reading it,
// that's ok - we read __some__ consistent set of data and next time we will invalidate again.
@@ -301,8 +331,14 @@ func loadManifestContent(ctx context.Context, b contentManager, contentID conten
}
func newCommittedManager(b contentManager) *committedManifestManager {
debugID := ""
if os.Getenv("KOPIA_DEBUG_MANIFEST_MANAGER") != "" {
debugID = fmt.Sprintf("%x", rand.Int63())
}
return &committedManifestManager{
b: b,
debugID: debugID,
committedEntries: map[ID]*manifestEntry{},
committedContentIDs: map[content.ID]bool{},
}

View File

@@ -0,0 +1,93 @@
package repomodel
import (
"math/rand"
"sync"
"github.com/kopia/kopia/repo/content"
)
// ContentSet represents a set of contents.
// Its methods lock mu while accessing ids.
type ContentSet struct {
// mu guards ids.
mu sync.Mutex
// ids holds the content IDs currently in the set.
ids []content.ID
}
// PickRandom returns a randomly chosen content ID from the set,
// or an empty ID when the set has no elements.
func (s *ContentSet) PickRandom() content.ID {
	s.mu.Lock()
	defer s.mu.Unlock()

	n := len(s.ids)
	if n == 0 {
		return ""
	}

	// nolint:gosec
	idx := rand.Intn(n)

	return s.ids[idx]
}
// Snapshot returns a new ContentSet holding a copy of the IDs currently in s.
func (s *ContentSet) Snapshot() ContentSet {
	s.mu.Lock()
	defer s.mu.Unlock()

	copied := append([]content.ID(nil), s.ids...)

	return ContentSet{ids: copied}
}
// Replace replaces all elements in the set with a copy of ids.
// The caller's slice is not retained.
func (s *ContentSet) Replace(ids []content.ID) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// copy the argument (not s.ids), otherwise the call would be a no-op.
	s.ids = append([]content.ID(nil), ids...)
}
// Add adds the provided items to the set.
// Items are appended as given, without checking for duplicates.
func (s *ContentSet) Add(d ...content.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append(s.ids, d...)
}
// RemoveAll removes the provided items from the set.
// Every occurrence of each listed ID is removed; IDs not in the set are ignored.
func (s *ContentSet) RemoveAll(d ...content.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = removeAllContentIDs(s.ids, d)
}
// removeAllContentIDs returns the elements of a that do not appear in b,
// preserving their order. The result is nil when no element remains.
func removeAllContentIDs(a, b []content.ID) []content.ID {
	var kept []content.ID

nextCandidate:
	for _, candidate := range a {
		for _, excluded := range b {
			if excluded == candidate {
				continue nextCandidate
			}
		}

		kept = append(kept, candidate)
	}

	return kept
}
// Clear empties the set and returns a ContentSet holding the elements
// that were removed.
func (s *ContentSet) Clear() ContentSet {
	s.mu.Lock()
	defer s.mu.Unlock()

	var previous []content.ID

	previous, s.ids = s.ids, nil

	return ContentSet{ids: previous}
}

View File

@@ -0,0 +1,93 @@
package repomodel
import (
"math/rand"
"sync"
"github.com/kopia/kopia/repo/manifest"
)
// ManifestSet represents a set of manifests.
// Its methods lock mu while accessing ids.
type ManifestSet struct {
// mu guards ids.
mu sync.Mutex
// ids holds the manifest IDs currently in the set.
ids []manifest.ID
}
// PickRandom returns a randomly chosen manifest ID from the set,
// or an empty ID when the set has no elements.
func (s *ManifestSet) PickRandom() manifest.ID {
	s.mu.Lock()
	defer s.mu.Unlock()

	n := len(s.ids)
	if n == 0 {
		return ""
	}

	// nolint:gosec
	idx := rand.Intn(n)

	return s.ids[idx]
}
// Snapshot returns a new ManifestSet holding a copy of the IDs currently in s.
func (s *ManifestSet) Snapshot() ManifestSet {
	s.mu.Lock()
	defer s.mu.Unlock()

	copied := append([]manifest.ID(nil), s.ids...)

	return ManifestSet{ids: copied}
}
// Replace replaces all elements in the set with a copy of ids.
// The caller's slice is not retained.
func (s *ManifestSet) Replace(ids []manifest.ID) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// copy the argument (not s.ids), otherwise the call would be a no-op.
	s.ids = append([]manifest.ID(nil), ids...)
}
// Add adds the provided items to the set.
// Items are appended as given, without checking for duplicates.
func (s *ManifestSet) Add(d ...manifest.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append(s.ids, d...)
}
// RemoveAll removes the provided items from the set.
// Every occurrence of each listed ID is removed; IDs not in the set are ignored.
func (s *ManifestSet) RemoveAll(d ...manifest.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = removeAllManifestIDs(s.ids, d)
}
// removeAllManifestIDs returns the elements of a that do not appear in b,
// preserving their order. The result is nil when no element remains.
func removeAllManifestIDs(a, b []manifest.ID) []manifest.ID {
	var kept []manifest.ID

nextCandidate:
	for _, candidate := range a {
		for _, excluded := range b {
			if excluded == candidate {
				continue nextCandidate
			}
		}

		kept = append(kept, candidate)
	}

	return kept
}
// Clear empties the set and returns a ManifestSet holding the elements
// that were removed.
func (s *ManifestSet) Clear() ManifestSet {
	s.mu.Lock()
	defer s.mu.Unlock()

	var previous []manifest.ID

	previous, s.ids = s.ids, nil

	return ManifestSet{ids: previous}
}

View File

@@ -0,0 +1,28 @@
package repomodel
import "sync"
// OpenRepository models the behavior of an open repository.
type OpenRepository struct {
// RepoData is the repository data model this open repository was created from.
RepoData *RepositoryData
// Contents and Manifests hold the IDs this open repository is expected to see;
// they are replaced by Refresh and extended by RepositorySession.Flush.
Contents ContentSet
Manifests ManifestSet
// EnableMaintenance is set by RepositoryData.OpenRepository only for the first
// OpenRepository created from a given RepositoryData.
EnableMaintenance bool
// mu is held by RepositorySession.Flush while it publishes flushed IDs.
mu sync.Mutex
}
// Refresh replaces the Contents and Manifests of this open repository
// with snapshots of the sets currently held by RepoData.
func (o *OpenRepository) Refresh() {
	latestContents := o.RepoData.Contents.Snapshot()
	o.Contents.Replace(latestContents.ids)

	latestManifests := o.RepoData.Manifests.Snapshot()
	o.Manifests.Replace(latestManifests.ids)
}
// NewSession creates a new session model bound to this open repository.
func (o *OpenRepository) NewSession() *RepositorySession {
	session := new(RepositorySession)
	session.OpenRepo = o

	return session
}

View File

@@ -0,0 +1,30 @@
// Package repomodel provides a simplified model of repository operation.
package repomodel
import "sync/atomic"
// RepositoryData models the data stored in the repository.
type RepositoryData struct {
// Contents and Manifests hold the IDs that have been flushed to the repository.
Contents ContentSet
Manifests ManifestSet
// openCounter counts calls to OpenRepository; it is updated atomically.
openCounter *int32
}
// OpenRepository returns a new OpenRepository model seeded with snapshots of
// the contents and manifests currently in d. EnableMaintenance is set only on
// the first model returned for a given RepositoryData.
func (d *RepositoryData) OpenRepository() *OpenRepository {
	opened := &OpenRepository{RepoData: d}

	opened.Contents = d.Contents.Snapshot()
	opened.Manifests = d.Manifests.Snapshot()
	opened.EnableMaintenance = atomic.AddInt32(d.openCounter, 1) == 1

	return opened
}
// NewRepositoryData creates an empty RepositoryData model whose open counter
// starts at zero.
func NewRepositoryData() *RepositoryData {
	var opened int32

	return &RepositoryData{openCounter: &opened}
}

View File

@@ -0,0 +1,47 @@
package repomodel
import (
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
)
// RepositorySession models the behavior of a single session in a repository.
type RepositorySession struct {
// OpenRepo is the open repository model this session belongs to.
OpenRepo *OpenRepository
// WrittenContents and WrittenManifests hold IDs written in this session
// that have not yet been removed by Flush.
WrittenContents ContentSet
WrittenManifests ManifestSet
}
// WriteContent adds the provided content ID to the model.
// The ID is recorded in WrittenContents, the session's pending set.
func (s *RepositorySession) WriteContent(cid content.ID) {
s.WrittenContents.Add(cid)
}
// WriteManifest adds the provided manifest ID to the model.
// The ID is recorded in WrittenManifests, the session's pending set.
func (s *RepositorySession) WriteManifest(mid manifest.ID) {
s.WrittenManifests.Add(mid)
}
// Refresh refreshes the set of committed contents and manifests from the repository
// by delegating to the Refresh method of the owning OpenRepository.
func (s *RepositorySession) Refresh() {
s.OpenRepo.Refresh()
}
// Flush publishes the given contents and manifests: they are added to the
// owning OpenRepository and to its RepositoryData, then removed from this
// session's pending sets. The OpenRepository mutex is held for the duration.
func (s *RepositorySession) Flush(wc *ContentSet, wm *ManifestSet) {
	openRepo := s.OpenRepo

	openRepo.mu.Lock()
	defer openRepo.mu.Unlock()

	// flushed data becomes visible to other sessions in the same open repository.
	openRepo.Contents.Add(wc.ids...)
	openRepo.Manifests.Add(wm.ids...)

	// flushed data becomes visible to sessions in other open repositories.
	repoData := openRepo.RepoData
	repoData.Contents.Add(wc.ids...)
	repoData.Manifests.Add(wm.ids...)

	// flushed items are no longer pending in this session.
	s.WrittenContents.RemoveAll(wc.ids...)
	s.WrittenManifests.RemoveAll(wm.ids...)
}

View File

@@ -5,36 +5,194 @@
cryptorand "crypto/rand"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/tests/repository_stress_test/repomodel"
)
// Durations for which stress workers are left running before being told to stop.
const (
// shortStressTestDuration is the default duration.
shortStressTestDuration = 10 * time.Second
// longStressTestDuration is used when CI is set and IS_PULL_REQUEST is "false".
longStressTestDuration = 120 * time.Second
)
// actName identifies a stress test action; it is the key type of the actions
// map and of StressOptions.ActionWeights.
type actName string
// Names of the available stress test actions.
const (
actWriteRandomContent = "writeRandomContent"
actReadPendingContent = "readPendingContent"
actReadFlushContent = "readFlushedContent"
actListContents = "listContents"
actListAndReadAllContents = "listAndReadAllContents"
actCompact = "compact"
actFlush = "flush"
actRefresh = "refresh"
actReadPendingManifest = "readPendingManifest"
actReadFlushedManifest = "readFlushedManifest"
actWriteRandomManifest = "writeRandomManifest"
)
// actions maps each action name to the function implementing it. Each function
// receives the repository writer, the session model and a logger.
var actions = map[actName]func(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error{
actWriteRandomContent: writeRandomContent,
actReadPendingContent: readPendingContent,
actReadFlushContent: readFlushedContent,
actListContents: listContents,
actListAndReadAllContents: listAndReadAllContents,
actCompact: compact,
actFlush: flush,
actRefresh: refresh,
actReadPendingManifest: readPendingManifest,
actReadFlushedManifest: readFlushedManifest,
actWriteRandomManifest: writeRandomManifest,
}
// StressOptions describes the shape of a stress test run.
type StressOptions struct {
// ConfigsPerRepository is the number of config files (each with its own cache
// directory) connected to the repository.
ConfigsPerRepository int
// OpenRepositoriesPerConfig is the number of times each config file is opened.
OpenRepositoriesPerConfig int
// SessionsPerOpenRepository is the number of direct writers created per open repository.
SessionsPerOpenRepository int
// WorkersPerSession is the number of worker goroutines sharing each writer.
WorkersPerSession int
// ActionWeights gives the relative weight with which each action is picked;
// actions that are not listed are never run.
ActionWeights map[actName]int
}
// errSkipped is returned by an action that had nothing to operate on;
// the worker loop does not treat it as a failure.
var errSkipped = errors.Errorf("skipped")
const masterPassword = "foo-bar-baz-1234"
var (
knownBlocks []content.ID
knownBlocksMutex sync.Mutex
)
func init() {
rand.Seed(clock.Now().UnixNano())
}
func TestStressRepository(t *testing.T) {
if testing.Short() {
t.Skip("skipping stress test during short tests")
// TestStressRepositoryMixAll runs every action type with fixed weights that
// favor reads of contents and manifests over writes.
func TestStressRepositoryMixAll(t *testing.T) {
runStress(t, &StressOptions{
ConfigsPerRepository: 2,
OpenRepositoriesPerConfig: 2,
SessionsPerOpenRepository: 2,
WorkersPerSession: 2,
ActionWeights: map[actName]int{
actWriteRandomContent: 10,
actReadPendingContent: 500,
actReadFlushContent: 500,
actListContents: 10,
actListAndReadAllContents: 10,
actCompact: 2,
actFlush: 10,
actRefresh: 20,
actReadPendingManifest: 300,
actReadFlushedManifest: 300,
actWriteRandomManifest: 20,
},
})
}
// TestStressRepositoryRandomMix runs every action type with randomly chosen
// topology sizes and action weights (each weight between 1 and 100).
func TestStressRepositoryRandomMix(t *testing.T) {
runStress(t, &StressOptions{
ConfigsPerRepository: 1 + rand.Intn(3),
OpenRepositoriesPerConfig: 1 + rand.Intn(2),
SessionsPerOpenRepository: 1 + rand.Intn(2),
WorkersPerSession: 1 + rand.Intn(2),
ActionWeights: map[actName]int{
actWriteRandomContent: 1 + rand.Intn(100),
actReadPendingContent: 1 + rand.Intn(100),
actReadFlushContent: 1 + rand.Intn(100),
actListContents: 1 + rand.Intn(100),
actListAndReadAllContents: 1 + rand.Intn(100),
actCompact: 1 + rand.Intn(100),
actFlush: 1 + rand.Intn(100),
actRefresh: 1 + rand.Intn(100),
actReadPendingManifest: 1 + rand.Intn(100),
actReadFlushedManifest: 1 + rand.Intn(100),
actWriteRandomManifest: 1 + rand.Intn(100),
},
})
}
// TestStressRepositoryManifests exercises only manifest actions plus compact,
// flush and refresh, using two sessions on a single open repository.
func TestStressRepositoryManifests(t *testing.T) {
runStress(t, &StressOptions{
ConfigsPerRepository: 1,
OpenRepositoriesPerConfig: 1,
SessionsPerOpenRepository: 2,
WorkersPerSession: 1,
ActionWeights: map[actName]int{
actCompact: 2,
actFlush: 10,
actRefresh: 20,
actReadPendingManifest: 300,
actReadFlushedManifest: 300,
actWriteRandomManifest: 1,
},
})
}
// TestStressContentWriteHeavy exercises content actions only, with content
// writes weighted far above every other action.
func TestStressContentWriteHeavy(t *testing.T) {
runStress(t, &StressOptions{
ConfigsPerRepository: 2,
OpenRepositoriesPerConfig: 2,
SessionsPerOpenRepository: 2,
WorkersPerSession: 2,
ActionWeights: map[actName]int{
actWriteRandomContent: 500,
actReadPendingContent: 10,
actReadFlushContent: 10,
actListContents: 10,
actListAndReadAllContents: 10,
actCompact: 2,
actFlush: 10,
actRefresh: 20,
},
})
}
// TestStressContentReadHeavy exercises content actions only, with content
// reads and listing weighted far above writes.
func TestStressContentReadHeavy(t *testing.T) {
runStress(t, &StressOptions{
ConfigsPerRepository: 2,
OpenRepositoriesPerConfig: 2,
SessionsPerOpenRepository: 2,
WorkersPerSession: 2,
ActionWeights: map[actName]int{
actWriteRandomContent: 5,
actReadPendingContent: 500,
actReadFlushContent: 500,
actListContents: 500,
actListAndReadAllContents: 10,
actCompact: 2,
actFlush: 10,
actRefresh: 20,
},
})
}
// nolint:thelper
func runStress(t *testing.T, opt *StressOptions) {
if os.Getenv("KOPIA_STRESS_TEST") == "" {
t.Skip("skipping stress test")
}
t.Logf("running stress test with options: %#v", *opt)
ctx := testlogging.Context(t)
tmpPath, err := ioutil.TempDir("", "kopia")
@@ -51,8 +209,6 @@ func TestStressRepository(t *testing.T) {
t.Logf("path: %v", tmpPath)
storagePath := filepath.Join(tmpPath, "storage")
configFile1 := filepath.Join(tmpPath, "kopia1.config")
configFile2 := filepath.Join(tmpPath, "kopia2.config")
assertNoError(t, os.MkdirAll(storagePath, 0o700))
@@ -64,281 +220,302 @@ func TestStressRepository(t *testing.T) {
}
// create repository
if err := repo.Initialize(ctx, st, &repo.NewRepositoryOptions{}, masterPassword); err != nil {
if err = repo.Initialize(ctx, st, &repo.NewRepositoryOptions{}, masterPassword); err != nil {
t.Fatalf("unable to initialize repository: %v", err)
}
var configFiles []string
// set up two parallel kopia connections, each with its own config file and cache.
if err := repo.Connect(ctx, configFile1, st, masterPassword, &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: filepath.Join(tmpPath, "cache1"),
MaxCacheSizeBytes: 2000000000,
},
}); err != nil {
t.Fatalf("unable to connect 1: %v", err)
for i := 0; i < opt.ConfigsPerRepository; i++ {
configFile := filepath.Join(tmpPath, fmt.Sprintf("kopia-%v.config", i))
configFiles = append(configFiles, configFile)
if err = repo.Connect(ctx, configFile, st, masterPassword, &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)),
MaxCacheSizeBytes: 2000000000,
},
}); err != nil {
t.Fatalf("unable to connect %v: %v", configFile, err)
}
}
if err := repo.Connect(ctx, configFile2, st, masterPassword, &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: filepath.Join(tmpPath, "cache2"),
MaxCacheSizeBytes: 2000000000,
},
}); err != nil {
t.Fatalf("unable to connect 2: %v", err)
stop := new(int32)
eg, ctx := errgroup.WithContext(ctx)
logDir := testutil.TempLogDirectory(t)
rm := repomodel.NewRepositoryData()
logFileName := filepath.Join(logDir, "workers.log")
logFile, err := os.Create(logFileName)
require.NoError(t, err)
t.Logf("log file: %v", logFileName)
defer logFile.Close()
for _, configFile := range configFiles {
configFile := configFile
for i := 0; i < opt.OpenRepositoriesPerConfig; i++ {
i := i
eg.Go(func() error {
log := logging.WithPrefix(fmt.Sprintf("%v::o%v", filepath.Base(configFile), i), logging.Broadcast{
logging.Printf(func(msg string, args ...interface{}) {
fmt.Fprintf(logFile, clock.Now().Format("2006-01-02T15:04:05.000000Z07:00")+" "+msg+"\n", args...)
})("test"),
})
ctx2 := logging.WithLogger(ctx, func(module string) logging.Logger {
return log
})
return longLivedRepositoryTest(ctx2, t, configFile, rm, log, opt, stop)
})
}
}
cancel := make(chan struct{})
duration := shortStressTestDuration
if os.Getenv("CI") != "" && os.Getenv("IS_PULL_REQUEST") == "false" {
duration = longStressTestDuration
}
var wg sync.WaitGroup
time.Sleep(duration)
atomic.StoreInt32(stop, 1)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile1, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg)
wg.Add(1)
go longLivedRepositoryTest(ctx, t, cancel, configFile2, &wg)
time.Sleep(5 * time.Second)
close(cancel)
wg.Wait()
require.NoError(t, eg.Wait())
}
func longLivedRepositoryTest(ctx context.Context, t *testing.T, cancel chan struct{}, configFile string, wg *sync.WaitGroup) {
func longLivedRepositoryTest(ctx context.Context, t *testing.T, configFile string, rm *repomodel.RepositoryData, log logging.Logger, opt *StressOptions, stop *int32) error {
t.Helper()
defer wg.Done()
// important to call OpenRepository() before repo.Open() to ensure we're not seeing state
// added between repo.Open() and OpenRepository()
or := rm.OpenRepository()
rep, err := repo.Open(ctx, configFile, masterPassword, &repo.Options{})
if err != nil {
t.Errorf("error opening repository: %v", err)
return
return errors.Wrap(err, "error opening repository")
}
defer rep.Close(ctx)
_, w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{Purpose: "longLivedRepositoryTest"})
if err != nil {
t.Errorf("error opening writer: %v", err)
return
eg, ctx := errgroup.WithContext(ctx)
for i := 0; i < opt.SessionsPerOpenRepository; i++ {
ors := or.NewSession()
_, w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{
Purpose: fmt.Sprintf("longLivedRepositoryTest-w%v", i),
})
if err != nil {
return errors.Wrap(err, "error opening writer")
}
for j := 0; j < opt.WorkersPerSession; j++ {
log2 := logging.WithPrefix(fmt.Sprintf("s%vw%v::", i, j), log)
eg.Go(func() error {
return repositoryTest(ctx, t, stop, w, ors, log2, opt)
})
}
}
var wg2 sync.WaitGroup
for i := 0; i < 4; i++ {
wg2.Add(1)
go func() {
defer wg2.Done()
repositoryTest(ctx, t, cancel, w)
}()
}
wg2.Wait()
return eg.Wait()
}
func repositoryTest(ctx context.Context, t *testing.T, cancel chan struct{}, rep repo.DirectRepositoryWriter) {
func repositoryTest(ctx context.Context, t *testing.T, stop *int32, rep repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger, opt *StressOptions) error {
t.Helper()
workTypes := []*struct {
name string
fun func(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error
weight int
hitCount int
}{
// {"reopen", reopen, 1, 0},
{"writeRandomBlock", writeRandomBlock, 100, 0},
{"writeRandomManifest", writeRandomManifest, 100, 0},
{"readKnownBlock", readKnownBlock, 500, 0},
{"listContents", listContents, 50, 0},
{"listAndReadAllContents", listAndReadAllContents, 5, 0},
{"readRandomManifest", readRandomManifest, 50, 0},
{"compact", compact, 1, 0},
{"refresh", refresh, 3, 0},
{"flush", flush, 1, 0},
}
var totalWeight int
for _, w := range workTypes {
totalWeight += w.weight
for _, w := range opt.ActionWeights {
totalWeight += w
}
iter := 0
for {
select {
case <-cancel:
return
default:
}
if iter%1000 == 0 {
var bits []string
for _, w := range workTypes {
bits = append(bits, fmt.Sprintf("%v:%v", w.name, w.hitCount))
}
log.Printf("#%v %v %v goroutines", iter, strings.Join(bits, " "), runtime.NumGoroutine())
}
iter++
for ctx.Err() == nil && atomic.LoadInt32(stop) == 0 {
roulette := rand.Intn(totalWeight)
for _, w := range workTypes {
if roulette < w.weight {
w.hitCount++
for act, weight := range opt.ActionWeights {
if roulette < weight {
if err := actions[act](ctx, rep, rs, log); err != nil {
if errors.Is(err, errSkipped) {
break
}
if err := w.fun(ctx, t, rep); err != nil {
w.hitCount++
t.Errorf("error: %v", errors.Wrapf(err, "error running %v", w.name))
log.Errorf("FAILED %v: %v", act, err)
return
return errors.Wrapf(err, "error running %v", act)
}
log.Errorf("SUCCEEDED %v", act)
break
}
roulette -= w.weight
roulette -= weight
}
}
return nil
}
func writeRandomBlock(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
func writeRandomContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
data := make([]byte, 1000)
cryptorand.Read(data)
contentID, err := r.ContentManager().WriteContent(ctx, data, "", content.NoCompression)
if err == nil {
knownBlocksMutex.Lock()
if len(knownBlocks) >= 1000 {
n := rand.Intn(len(knownBlocks))
knownBlocks[n] = contentID
} else {
knownBlocks = append(knownBlocks, contentID)
}
knownBlocksMutex.Unlock()
if err != nil {
return errors.Wrap(err, "WriteContent error")
}
return err
log.Debugf("writeRandomContent(%v,%x)", contentID, data[0:16])
rs.WriteContent(contentID)
return errors.Wrapf(err, "writeRandomContent(%v)", contentID)
}
func readKnownBlock(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
knownBlocksMutex.Lock()
if len(knownBlocks) == 0 {
knownBlocksMutex.Unlock()
return nil
func readPendingContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
contentID := rs.WrittenContents.PickRandom()
if contentID == "" {
return errSkipped
}
contentID := knownBlocks[rand.Intn(len(knownBlocks))]
knownBlocksMutex.Unlock()
log.Debugf("readPendingContent(%v)", contentID)
_, err := r.ContentReader().GetContent(ctx, contentID)
if err == nil || errors.Is(err, content.ErrContentNotFound) {
if err == nil {
return nil
}
return err
return errors.Wrapf(err, "readPendingContent(%v)", contentID)
}
func listContents(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
func readFlushedContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
contentID := rs.OpenRepo.Contents.PickRandom()
if contentID == "" {
return errSkipped
}
return r.ContentReader().IterateContents(
log.Debugf("readFlushedContent(%v)", contentID)
_, err := r.ContentReader().GetContent(ctx, contentID)
if err == nil {
return nil
}
return errors.Wrapf(err, "readFlushedContent(%v)", contentID)
}
func listContents(ctx context.Context, r repo.DirectRepositoryWriter, _ *repomodel.RepositorySession, log logging.Logger) error {
log.Debugf("listContents()")
return errors.Wrapf(r.ContentReader().IterateContents(
ctx,
content.IterateOptions{},
func(i content.Info) error { return nil },
)
), "listContents()")
}
func listAndReadAllContents(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
func listAndReadAllContents(ctx context.Context, r repo.DirectRepositoryWriter, _ *repomodel.RepositorySession, log logging.Logger) error {
log.Debugf("listAndReadAllContents()")
return r.ContentReader().IterateContents(
return errors.Wrapf(r.ContentReader().IterateContents(
ctx,
content.IterateOptions{},
func(ci content.Info) error {
cid := ci.GetContentID()
_, err := r.ContentReader().GetContent(ctx, cid)
if err != nil {
if errors.Is(err, content.ErrContentNotFound) && strings.HasPrefix(string(cid), "m") {
// this is ok, sometimes manifest manager will perform compaction and 'm' contents will be marked as deleted
return nil
}
return errors.Wrapf(err, "error reading content %v", cid)
}
return nil
})
}), "listAndReadAllContents()")
}
// compact invokes CompactIndexes on the repository's content manager with
// MaxSmallBlobs set to 1 and returns whatever error that call produces.
func compact(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
	t.Helper()

	opts := content.CompactOptions{MaxSmallBlobs: 1}

	return r.ContentManager().CompactIndexes(ctx, opts)
}
// flush calls Flush on the repository writer and passes its error through unchanged.
func flush(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
	t.Helper()

	if err := r.Flush(ctx); err != nil {
		return err
	}

	return nil
}
// refresh calls Refresh on the repository writer and passes its error through unchanged.
func refresh(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
	t.Helper()

	if err := r.Refresh(ctx); err != nil {
		return err
	}

	return nil
}
func readRandomManifest(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
manifests, err := r.FindManifests(ctx, nil)
if err != nil {
return err
func compact(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
if !rs.OpenRepo.EnableMaintenance {
return errSkipped
}
if len(manifests) == 0 {
log.Debugf("compact()")
return errors.Wrapf(
r.ContentManager().CompactIndexes(ctx, content.CompactOptions{MaxSmallBlobs: 1}),
"compact()")
}
// flush flushes the repository and, on success, records the flush in the model session.
//
// The written contents and manifests are snapshotted before r.Flush is called and only
// those snapshots are handed to rs.Flush afterwards. If r.Flush fails, the wrapped error
// is returned and rs.Flush is not called.
func flush(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
	log.Debugf("flush()")

	// Take the snapshots up front: other operations can proceed in parallel to Flush()
	// and add more data to the model. Flushing the latest model state would be incorrect
	// because there is no way to know whether the corresponding repository data has
	// actually been flushed.
	pendingContents := rs.WrittenContents.Snapshot()
	pendingManifests := rs.WrittenManifests.Snapshot()

	err := r.Flush(ctx)
	if err != nil {
		return errors.Wrap(err, "error flushing")
	}

	// The model is flushed only after the repository, which tells other sessions
	// that they can expect to see the flushed items from now on.
	rs.Flush(&pendingContents, &pendingManifests)

	return nil
}
// refresh refreshes the model session first and the repository second.
//
// A failure of r.Refresh is returned wrapped with "refresh error"; otherwise nil is returned.
func refresh(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
	log.Debugf("refresh()")

	// The model goes first so that, once the repository has been refreshed, it is
	// guaranteed to hold at least every item in the model (possibly more).
	rs.Refresh()

	err := r.Refresh(ctx)
	if err == nil {
		return nil
	}

	return errors.Wrap(err, "refresh error")
}
func readPendingManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
manifestID := rs.WrittenManifests.PickRandom()
if manifestID == "" {
return errSkipped
}
log.Debugf("readPendingManifest(%v)", manifestID)
_, err := r.GetManifest(ctx, manifestID, nil)
if err == nil {
return nil
}
n := rand.Intn(len(manifests))
_, err = r.GetManifest(ctx, manifests[n].ID, nil)
return err
return errors.Wrapf(err, "readPendingManifest(%v)", manifestID)
}
func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectRepositoryWriter) error {
t.Helper()
// readFlushedManifest picks a random manifest ID from rs.OpenRepo.Manifests and
// reads it through the repository.
//
// It returns errSkipped when the pick yields an empty ID, nil when GetManifest
// succeeds, and the wrapped GetManifest error otherwise.
func readFlushedManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
	id := rs.OpenRepo.Manifests.PickRandom()
	if id == "" {
		// nothing to pick from, so this action is reported as skipped.
		return errSkipped
	}

	log.Debugf("readFlushedManifest(%v)", id)

	if _, err := r.GetManifest(ctx, id, nil); err != nil {
		return errors.Wrapf(err, "readFlushedManifest(%v)", id)
	}

	return nil
}
func writeRandomManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
key1 := fmt.Sprintf("key-%v", rand.Intn(10))
key2 := fmt.Sprintf("key-%v", rand.Intn(10))
val1 := fmt.Sprintf("val1-%v", rand.Intn(10))
@@ -347,7 +524,8 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectReposit
content2 := fmt.Sprintf("content-%v", rand.Intn(10))
content1val := fmt.Sprintf("val1-%v", rand.Intn(10))
content2val := fmt.Sprintf("val2-%v", rand.Intn(10))
_, err := r.PutManifest(ctx, map[string]string{
mid, err := r.PutManifest(ctx, map[string]string{
"type": key1,
key1: val1,
key2: val2,
@@ -355,6 +533,12 @@ func writeRandomManifest(ctx context.Context, t *testing.T, r repo.DirectReposit
content1: content1val,
content2: content2val,
})
if err != nil {
return err
}
log.Debugf("writeRandomManifest(%v)", mid)
rs.WriteManifest(mid)
return err
}

View File

@@ -20,8 +20,8 @@
const goroutineCount = 16
func TestStressBlockManager(t *testing.T) {
if testing.Short() {
t.Skip("skipping stress test during short tests")
if os.Getenv("KOPIA_STRESS_TEST") == "" {
t.Skip("skipping stress test")
}
data := blobtesting.DataMap{}
@@ -29,7 +29,7 @@ func TestStressBlockManager(t *testing.T) {
memst := blobtesting.NewMapStorage(data, keyTimes, clock.Now)
duration := 3 * time.Second
if os.Getenv("KOPIA_LONG_STRESS_TEST") != "" {
if os.Getenv("CI") != "" {
duration = 30 * time.Second
}

View File

@@ -8,7 +8,7 @@
"strings"
"testing"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/testutil"
)
// CLIExeRunner is a CLI runner that invokes the commands via external executable.
@@ -85,33 +85,7 @@ func NewExeRunner(t *testing.T) *CLIExeRunner {
// unset environment variables that disrupt tests when passed to subprocesses.
os.Unsetenv("KOPIA_PASSWORD")
cleanName := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(
t.Name(),
"/", "_"), "\\", "_"), ":", "_")
logsBaseDir := os.Getenv("KOPIA_LOGS_DIR")
if logsBaseDir == "" {
logsBaseDir = filepath.Join(os.TempDir(), "kopia-logs")
}
logsDir := filepath.Join(logsBaseDir, cleanName+"."+clock.Now().Local().Format("20060102150405"))
t.Cleanup(func() {
if t.Failed() {
t.Logf("FAILURE ABOVE ^^^^")
}
if os.Getenv("KOPIA_KEEP_LOGS") != "" {
t.Logf("logs preserved in %v", logsDir)
return
}
if t.Failed() && os.Getenv("KOPIA_DISABLE_LOG_DUMP_ON_FAILURE") == "" {
dumpLogs(t, logsDir)
}
os.RemoveAll(logsDir)
})
logsDir := testutil.TempLogDirectory(t)
return &CLIExeRunner{
Exe: filepath.FromSlash(exe),

View File

@@ -3,9 +3,7 @@
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"runtime"
"strings"
@@ -22,8 +20,6 @@
const (
// TestRepoPassword is a password for repositories created in tests.
TestRepoPassword = "qWQPJ2hiiLgWRRCr"
maxOutputLinesToLog = 4000
)
// CLIRunner encapsulates running kopia subcommands for testing purposes.
@@ -85,38 +81,6 @@ func NewCLITest(t *testing.T, runner CLIRunner) *CLITest {
}
}
// dumpLogs recursively walks dirname, descending into subdirectories and passing
// every non-directory entry to dumpLogFile.
//
// If dirname cannot be listed, the failure is reported with t.Errorf and nothing is dumped.
func dumpLogs(t *testing.T, dirname string) {
	t.Helper()

	entries, err := ioutil.ReadDir(dirname)
	if err != nil {
		t.Errorf("unable to read %v: %v", dirname, err)
		return
	}

	for _, entry := range entries {
		fullPath := filepath.Join(dirname, entry.Name())

		if entry.IsDir() {
			dumpLogs(t, fullPath)
		} else {
			dumpLogFile(t, fullPath)
		}
	}
}
// dumpLogFile reads fname and writes its contents, shortened by trimOutput, to the test log.
//
// A read failure is reported with t.Error and nothing is logged.
func dumpLogFile(t *testing.T, fname string) {
	t.Helper()

	contents, readErr := ioutil.ReadFile(fname)
	if readErr != nil {
		t.Error(readErr)
		return
	}

	trimmed := trimOutput(string(contents))
	t.Logf("LOG FILE: %v %v", fname, trimmed)
}
// RunAndExpectSuccess runs the given command, expects it to succeed and returns its output lines.
func (e *CLITest) RunAndExpectSuccess(t *testing.T, args ...string) []string {
t.Helper()
@@ -250,30 +214,3 @@ func (e *CLITest) Run(t *testing.T, expectedError bool, args ...string) (stdout,
return stdout, stderr, gotErr
}
// trimOutput shortens s when it has more than maxOutputLinesToLog lines.
//
// Short input is returned unchanged. Longer input is reduced to its first and last
// maxOutputLinesToLog/2 lines (as produced by splitLines), joined with "\n" and
// separated by a marker line stating how many lines were removed.
func trimOutput(s string) string {
	all := splitLines(s)
	total := len(all)

	if total <= maxOutputLinesToLog {
		return s
	}

	half := maxOutputLinesToLog / 2

	kept := make([]string, 0, 2*half+1)
	kept = append(kept, all[:half]...)
	kept = append(kept, fmt.Sprintf("/* %v lines removed */", total-maxOutputLinesToLog))
	kept = append(kept, all[total-half:]...)

	return strings.Join(kept, "\n")
}
// splitLines breaks s into lines.
//
// Leading and trailing whitespace of the whole input is removed first; if nothing
// remains, nil is returned. The rest is split on "\n" and any trailing "\r"
// characters are stripped from each line, so CRLF input yields clean lines.
func splitLines(s string) []string {
	trimmed := strings.TrimSpace(s)
	if trimmed == "" {
		return nil
	}

	parts := strings.Split(trimmed, "\n")
	lines := make([]string, 0, len(parts))

	for _, part := range parts {
		lines = append(lines, strings.TrimRight(part, "\r"))
	}

	return lines
}