Robustness engine actions with stats and log (#685)

* Robustness engine actions with stats and logging

- Add actions to robustness engine
- Actions wrap other functional behavior and serve as a common interface for collecting stats
- Add stats for the engine, both per run and cumulative over time
- Add a log for actions that the engine has executed
- Add recovery logic to re-sync snapshot metadata after a possible failed engine run (e.g. if metadata wasn't properly persisted).

Current built-in actions:
- snapshot root directory
- restore random snapshot ID into a target restore path
- delete a random snapshot ID
- run GC
- write random files to the local data directory
- delete a random subdirectory under the local data directory
- delete files in a directory
- restore a snapshot ID into the local data directory

Actions are executed according to a set of options, which dictate the relative probabilities of picking a given action, along with ranges for action-specific parameters that can be randomized.
This commit is contained in:
Nick
2020-11-17 01:07:04 -08:00
committed by GitHub
parent ade7975212
commit 71dcbcf2e3
19 changed files with 1903 additions and 194 deletions

View File

@@ -205,9 +205,16 @@ endurance-tests: export KOPIA_EXE ?= $(KOPIA_INTEGRATION_EXE)
endurance-tests: build-integration-test-binary $(gotestsum)
$(GO_TEST) $(TEST_FLAGS) -count=1 -parallel $(PARALLEL) -timeout 3600s github.com/kopia/kopia/tests/endurance_test
robustness-tool-tests: $(gotestsum)
robustness-tests: export KOPIA_EXE ?= $(KOPIA_INTEGRATION_EXE)
robustness-tests: build-integration-test-binary $(gotestsum)
FIO_DOCKER_IMAGE=$(FIO_DOCKER_TAG) \
$(GO_TEST) $(TEST_FLAGS) -count=1 -timeout 90s github.com/kopia/kopia/tests/tools/...
$(GO_TEST) -count=1 github.com/kopia/kopia/tests/robustness/robustness_test $(TEST_FLAGS)
robustness-tool-tests: export KOPIA_EXE ?= $(KOPIA_INTEGRATION_EXE)
robustness-tool-tests: build-integration-test-binary $(gotestsum)
KOPIA_EXE=$(KOPIA_INTEGRATION_EXE) \
FIO_DOCKER_IMAGE=$(FIO_DOCKER_TAG) \
$(GO_TEST) -count=1 github.com/kopia/kopia/tests/tools/... github.com/kopia/kopia/tests/robustness/engine/... $(TEST_FLAGS)
stress-test: $(gotestsum)
KOPIA_LONG_STRESS_TEST=1 $(GO_TEST) -count=1 -timeout 200s github.com/kopia/kopia/tests/stress_test

View File

@@ -10,6 +10,7 @@
"io/ioutil"
"log"
"os"
"strconv"
"time"
"github.com/pkg/errors"
@@ -19,6 +20,11 @@
"github.com/kopia/kopia/tests/robustness/snapmeta"
)
const (
deleteLimitEnvKey = "LIVE_SNAP_DELETE_LIMIT"
defaultDeleteLimit = 10
)
// Checker is an object that can take snapshots and restore them, performing
// a validation for data consistency.
type Checker struct {
@@ -26,21 +32,33 @@ type Checker struct {
snapshotIssuer snap.Snapshotter
snapshotMetadataStore snapmeta.Store
validator Comparer
RecoveryMode bool
DeleteLimit int
}
// NewChecker instantiates a new Checker, returning its pointer. A temporary
// directory is created to mount restored data.
func NewChecker(snapIssuer snap.Snapshotter, snapmetaStore snapmeta.Store, validator Comparer) (*Checker, error) {
restoreDir, err := ioutil.TempDir("", "restore-data-")
func NewChecker(snapIssuer snap.Snapshotter, snapmetaStore snapmeta.Store, validator Comparer, restoreDir string) (*Checker, error) {
restoreDir, err := ioutil.TempDir(restoreDir, "restore-data-")
if err != nil {
return nil, err
}
delLimitStr := os.Getenv(deleteLimitEnvKey)
delLimit, err := strconv.Atoi(delLimitStr)
if err != nil {
log.Printf("using default delete limit %d", defaultDeleteLimit)
delLimit = defaultDeleteLimit
}
return &Checker{
RestoreDir: restoreDir,
snapshotIssuer: snapIssuer,
snapshotMetadataStore: snapmetaStore,
validator: validator,
RecoveryMode: false,
DeleteLimit: delLimit,
}, nil
}
@@ -53,7 +71,7 @@ func (chk *Checker) Cleanup() {
// GetSnapIDs gets the list of snapshot IDs being tracked by the checker's snapshot store.
func (chk *Checker) GetSnapIDs() []string {
return chk.snapshotMetadataStore.GetKeys()
return chk.snapshotMetadataStore.GetKeys(allSnapshotsIdxName)
}
// SnapshotMetadata holds metadata associated with a given snapshot.
@@ -73,18 +91,7 @@ func (chk *Checker) GetSnapshotMetadata(snapID string) (*SnapshotMetadata, error
// GetLiveSnapIDs gets the list of snapshot IDs being tracked by the checker's snapshot store
// that do not have a deletion time associated with them.
func (chk *Checker) GetLiveSnapIDs() []string {
snapIDs := chk.GetSnapIDs()
var ret []string
for _, snapID := range snapIDs {
deleted, err := chk.IsSnapshotIDDeleted(snapID)
if err == nil && !deleted {
ret = append(ret, snapID)
}
}
return ret
return chk.snapshotMetadataStore.GetKeys(liveSnapshotsIdxName)
}
// IsSnapshotIDDeleted reports whether the metadata associated with the provided snapshot ID
@@ -125,14 +132,44 @@ func (chk *Checker) VerifySnapshotMetadata() error {
for _, metaSnapID := range liveSnapsInMetadata {
if _, ok := liveMap[metaSnapID]; !ok {
log.Printf("Metadata present for snapID %v but not found in known metadata", metaSnapID)
errCount++
log.Printf("Metadata present for snapID %v but not found in list of repo snapshots", metaSnapID)
if chk.RecoveryMode {
chk.snapshotMetadataStore.Delete(metaSnapID)
chk.snapshotMetadataStore.RemoveFromIndex(metaSnapID, liveSnapshotsIdxName)
} else {
errCount++
}
}
}
var liveSnapsDeleted int
for _, liveSnapID := range liveSnapsInRepo {
if _, ok := metadataMap[liveSnapID]; !ok {
log.Printf("Live snapshot present for snapID %v but not found in known metadata", liveSnapID)
if _, ok := metadataMap[liveSnapID]; ok {
// Found live snapshot ID in the metadata. No recovery handling needed.
continue
}
log.Printf("Live snapshot present for snapID %v but not found in known metadata", liveSnapID)
if chk.RecoveryMode {
if liveSnapsDeleted >= chk.DeleteLimit {
log.Printf("delete limit (%v) reached", chk.DeleteLimit)
errCount++
}
// Might as well delete the snapshot since we don't have metadata for it
log.Printf("Deleting snapshot ID %s", liveSnapID)
err = chk.snapshotIssuer.DeleteSnapshot(liveSnapID)
if err != nil {
log.Printf("error deleting snapshot: %s", err)
errCount++
}
liveSnapsDeleted++
} else {
errCount++
}
}
@@ -173,6 +210,9 @@ func (chk *Checker) TakeSnapshot(ctx context.Context, sourceDir string) (snapID
return snapID, err
}
chk.snapshotMetadataStore.AddToIndex(snapID, allSnapshotsIdxName)
chk.snapshotMetadataStore.AddToIndex(snapID, liveSnapshotsIdxName)
return snapID, nil
}
@@ -211,6 +251,22 @@ func (chk *Checker) RestoreVerifySnapshot(ctx context.Context, snapID, destPath
return err
}
if ssMeta == nil && chk.RecoveryMode {
var b []byte
b, err = chk.validator.Gather(ctx, destPath)
if err != nil {
return err
}
ssMeta = &SnapshotMetadata{
SnapID: snapID,
ValidationData: b,
}
return chk.saveSnapshotMetadata(ssMeta)
}
err = chk.validator.Compare(ctx, destPath, ssMeta.ValidationData, reportOut)
if err != nil {
return err
@@ -219,6 +275,12 @@ func (chk *Checker) RestoreVerifySnapshot(ctx context.Context, snapID, destPath
return nil
}
const (
deletedSnapshotsIdxName = "deleted-snapshots-idx"
liveSnapshotsIdxName = "live-snapshots-idx"
allSnapshotsIdxName = "all-snapshots-idx"
)
// DeleteSnapshot performs the Snapshotter's DeleteSnapshot action, and
// marks the snapshot with the given snapshot ID as deleted.
func (chk *Checker) DeleteSnapshot(ctx context.Context, snapID string) error {
@@ -232,13 +294,17 @@ func (chk *Checker) DeleteSnapshot(ctx context.Context, snapID string) error {
return err
}
ssMeta.DeletionTime = clock.Now()
ssMeta.DeletionTime = time.Now()
ssMeta.ValidationData = nil
err = chk.saveSnapshotMetadata(ssMeta)
if err != nil {
return err
}
chk.snapshotMetadataStore.AddToIndex(ssMeta.SnapID, deletedSnapshotsIdxName)
chk.snapshotMetadataStore.RemoveFromIndex(ssMeta.SnapID, liveSnapshotsIdxName)
return nil
}

View File

@@ -0,0 +1,489 @@
// +build darwin,amd64 linux,amd64
package engine
import (
"bytes"
"context"
"errors"
"log"
"math/rand"
"strconv"
"strings"
"time"
"github.com/kopia/kopia/tests/tools/fio"
)
// Errors associated with action-picking.
var (
ErrNoActionPicked = errors.New("unable to pick an action with the action control options provided")
ErrInvalidOption = errors.New("invalid option setting")
)
// ExecAction executes the action denoted by the provided ActionKey.
// The action is repeated according to the ActionRepeaterField option
// (default defaultActionRepeats), stopping at the first error. Run and
// cumulative stats are updated, and a completed log entry is recorded
// unless the action was a no-op. It returns the output of the last
// repetition and the first error encountered (possibly ErrNoOp).
func (e *Engine) ExecAction(actionKey ActionKey, opts map[string]string) (map[string]string, error) {
	if opts == nil {
		opts = make(map[string]string)
	}

	// Count the attempt up front, for both this run and the lifetime totals.
	e.RunStats.ActionCounter++
	e.CumulativeStats.ActionCounter++
	log.Printf("Engine executing ACTION: name=%q actionCount=%v totActCount=%v t=%vs (%vs)", actionKey, e.RunStats.ActionCounter, e.CumulativeStats.ActionCounter, e.RunStats.getLifetimeSeconds(), e.getRuntimeSeconds())

	action := actions[actionKey]
	st := time.Now()

	logEntry := &LogEntry{
		StartTime:       st,
		EngineTimestamp: e.getTimestampS(),
		Action:          actionKey,
		ActionOpts:      opts,
	}

	// Execute the action n times
	err := ErrNoOp // Default to no-op error

	// TODO: return more than the last output
	var out map[string]string

	n := getOptAsIntOrDefault(ActionRepeaterField, opts, defaultActionRepeats)
	for i := 0; i < n; i++ {
		out, err = action.f(e, opts, logEntry)
		if err != nil {
			break
		}
	}

	// If error was just a no-op, don't bother logging the action
	switch {
	case errors.Is(err, ErrNoOp):
		e.RunStats.NoOpCount++
		e.CumulativeStats.NoOpCount++

		return out, err
	case err != nil:
		log.Printf("error=%q", err.Error())
	}

	// Lazily create the per-action stats entry on first use of this key.
	// NOTE(review): when PerActionStats itself is nil the creation is skipped
	// but Record is still invoked on the nil map's zero-value (*ActionStats)(nil)
	// entry below — confirm ActionStats.Record tolerates a nil receiver.
	if e.RunStats.PerActionStats != nil && e.RunStats.PerActionStats[actionKey] == nil {
		e.RunStats.PerActionStats[actionKey] = new(ActionStats)
	}

	if e.CumulativeStats.PerActionStats != nil && e.CumulativeStats.PerActionStats[actionKey] == nil {
		e.CumulativeStats.PerActionStats[actionKey] = new(ActionStats)
	}

	e.RunStats.PerActionStats[actionKey].Record(st, err)
	e.CumulativeStats.PerActionStats[actionKey].Record(st, err)
	e.EngineLog.AddCompleted(logEntry, err)

	return out, err
}
// RandomAction executes a random action picked by the relative weights given
// in actionOpts[ActionControlActionKey], or uniform probability if that
// key is not present in the input options. Errors from the executed action
// are passed through the engine's recovery logic before being returned.
func (e *Engine) RandomAction(actionOpts ActionOpts) error {
	picked := pickActionWeighted(actionOpts.getActionControlOpts(), actions)
	if picked == "" {
		return ErrNoActionPicked
	}

	_, execErr := e.ExecAction(picked, actionOpts[picked])

	return e.checkErrRecovery(execErr, actionOpts)
}
// checkErrRecovery attempts to recover from incomingErr. The only condition
// handled today is running out of disk space (unless disabled via the
// ThrowNoSpaceOnDeviceErrField control option): the engine purges the entire
// data directory and then restores a previous snapshot into it. Returns nil
// when recovery succeeded, otherwise the (possibly new) error.
func (e *Engine) checkErrRecovery(incomingErr error, actionOpts ActionOpts) (outgoingErr error) {
	outgoingErr = incomingErr

	if incomingErr == nil {
		return nil
	}

	ctrl := actionOpts.getActionControlOpts()

	if errIsNotEnoughSpace(incomingErr) && ctrl[ThrowNoSpaceOnDeviceErrField] == "" {
		// no space left on device
		// Delete everything in the data directory
		const hundredPcnt = 100

		deleteDirActionKey := DeleteDirectoryContentsActionKey
		deleteRootOpts := map[string]string{
			MaxDirDepthField:             strconv.Itoa(0),
			DeletePercentOfContentsField: strconv.Itoa(hundredPcnt),
		}

		_, outgoingErr = e.ExecAction(deleteDirActionKey, deleteRootOpts)

		if outgoingErr != nil {
			return outgoingErr
		}

		e.RunStats.DataPurgeCount++
		e.CumulativeStats.DataPurgeCount++

		// Restore a previous snapshot to the data directory
		restoreActionKey := RestoreIntoDataDirectoryActionKey
		_, outgoingErr = e.ExecAction(restoreActionKey, actionOpts[restoreActionKey])

		if errors.Is(outgoingErr, ErrNoOp) {
			// No snapshot was available to restore; treat that as recovered.
			outgoingErr = nil
		} else {
			// NOTE(review): this branch also runs when the restore failed
			// with a real (non-NoOp) error, so DataRestoreCount may count
			// failed restores — confirm that is intended.
			e.RunStats.DataRestoreCount++
			e.CumulativeStats.DataRestoreCount++
		}
	}

	if outgoingErr == nil {
		e.RunStats.ErrorRecoveryCount++
		e.CumulativeStats.ErrorRecoveryCount++
	}

	return outgoingErr
}
// List of action keys. Each key names a built-in action registered in the
// actions map; ActionControlActionKey is special — it indexes the options
// map holding the relative weights used when picking a random action.
const (
	ActionControlActionKey            ActionKey = "action-control"
	SnapshotRootDirActionKey          ActionKey = "snapshot-root"
	RestoreSnapshotActionKey          ActionKey = "restore-random-snapID"
	DeleteRandomSnapshotActionKey     ActionKey = "delete-random-snapID"
	WriteRandomFilesActionKey         ActionKey = "write-random-files"
	DeleteRandomSubdirectoryActionKey ActionKey = "delete-random-subdirectory"
	DeleteDirectoryContentsActionKey  ActionKey = "delete-files"
	RestoreIntoDataDirectoryActionKey ActionKey = "restore-into-data-dir"
	GCActionKey                       ActionKey = "run-gc"
)
// ActionOpts maps each action to the set of string option key/value pairs
// used when picking and running that action.
type ActionOpts map[ActionKey]map[string]string

// getActionControlOpts returns the caller-supplied action-control options
// when present, falling back to the built-in defaults otherwise.
func (actionOpts ActionOpts) getActionControlOpts() map[string]string {
	if actionOpts != nil {
		if ctrl := actionOpts[ActionControlActionKey]; ctrl != nil {
			return ctrl
		}
	}

	return defaultActionControls()
}
// Action is a unit of functionality that can be executed by
// the engine.
type Action struct {
	// f runs the action against the engine with the given options, recording
	// command details on the provided log entry. It returns an output map
	// (may be nil) and an error; ErrNoOp signals there was nothing to do.
	f func(eng *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error)
}

// ActionKey refers to an action that can be executed by the engine.
type ActionKey string
// actions is the registry of built-in actions, keyed by ActionKey.
var actions = map[ActionKey]Action{
	// Snapshot the entire local data directory via the checker.
	SnapshotRootDirActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			log.Printf("Creating snapshot of root directory %s", e.FileWriter.LocalDataDir)

			ctx := context.TODO()
			snapID, err := e.Checker.TakeSnapshot(ctx, e.FileWriter.LocalDataDir)

			setLogEntryCmdOpts(l, map[string]string{
				"snap-dir": e.FileWriter.LocalDataDir,
				"snapID":   snapID,
			})

			// Expose the new snapshot ID to callers even on error.
			return map[string]string{
				SnapshotIDField: snapID,
			}, err
		},
	},
	// Restore a snapshot (from opts, or a random live one) into the
	// checker's restore path and verify its contents.
	RestoreSnapshotActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			snapID, err := e.getSnapIDOptOrRandLive(opts)
			if err != nil {
				return nil, err
			}

			setLogEntryCmdOpts(l, map[string]string{"snapID": snapID})
			log.Printf("Restoring snapshot %s", snapID)

			ctx := context.Background()
			// Capture restore output; only surfaced in the log on failure.
			b := &bytes.Buffer{}

			err = e.Checker.RestoreSnapshot(ctx, snapID, b)
			if err != nil {
				log.Print(b.String())
			}

			return nil, err
		},
	},
	// Delete a snapshot (from opts, or a random live one).
	DeleteRandomSnapshotActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			snapID, err := e.getSnapIDOptOrRandLive(opts)
			if err != nil {
				return nil, err
			}

			log.Printf("Deleting snapshot %s", snapID)
			setLogEntryCmdOpts(l, map[string]string{"snapID": snapID})

			ctx := context.Background()
			err = e.Checker.DeleteSnapshot(ctx, snapID)

			return nil, err
		},
	},
	// Run garbage collection on the test repository.
	GCActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			return nil, e.TestRepo.RunGC()
		},
	},
	// Write randomly parameterized files (depth, size, count, dedupe%) into
	// the local data directory using fio.
	WriteRandomFilesActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			// Directory depth
			maxDirDepth := getOptAsIntOrDefault(MaxDirDepthField, opts, defaultMaxDirDepth)
			dirDepth := rand.Intn(maxDirDepth + 1) //nolint:gosec

			// File size range
			maxFileSizeB := getOptAsIntOrDefault(MaxFileSizeField, opts, defaultMaxFileSize)
			minFileSizeB := getOptAsIntOrDefault(MinFileSizeField, opts, defaultMinFileSize)

			// Number of files to write
			maxNumFiles := getOptAsIntOrDefault(MaxNumFilesPerWriteField, opts, defaultMaxNumFilesPerWrite)
			minNumFiles := getOptAsIntOrDefault(MinNumFilesPerWriteField, opts, defaultMinNumFilesPerWrite)
			numFiles := rand.Intn(maxNumFiles-minNumFiles+1) + minNumFiles //nolint:gosec

			// Dedup Percentage: pick a value on the [min, max] grid of
			// dedupStep-sized increments.
			maxDedupPcnt := getOptAsIntOrDefault(MaxDedupePercentField, opts, defaultMaxDedupePercent)
			minDedupPcnt := getOptAsIntOrDefault(MinDedupePercentField, opts, defaultMinDedupePercent)

			dedupStep := getOptAsIntOrDefault(DedupePercentStepField, opts, defaultDedupePercentStep)

			dedupPcnt := dedupStep * (rand.Intn(maxDedupPcnt/dedupStep-minDedupPcnt/dedupStep+1) + minDedupPcnt/dedupStep) //nolint:gosec

			blockSize := int64(defaultMinFileSize)

			fioOpts := fio.Options{}.
				WithFileSizeRange(int64(minFileSizeB), int64(maxFileSizeB)).
				WithNumFiles(numFiles).
				WithBlockSize(blockSize).
				WithDedupePercentage(dedupPcnt).
				WithNoFallocate()

			ioLimit := getOptAsIntOrDefault(IOLimitPerWriteAction, opts, defaultIOLimitPerWriteAction)

			if ioLimit > 0 {
				// Cap the write amount so the configured free-space floor is
				// preserved; abort with ErrCannotPerformIO if no room remains.
				freeSpaceLimitB := getOptAsIntOrDefault(FreeSpaceLimitField, opts, defaultFreeSpaceLimit)

				freeSpaceB, err := getFreeSpaceB(e.FileWriter.LocalDataDir)
				if err != nil {
					return nil, err
				}
				log.Printf("Free Space %v B, limit %v B, ioLimit %v B\n", freeSpaceB, freeSpaceLimitB, ioLimit)

				if int(freeSpaceB)-ioLimit < freeSpaceLimitB {
					ioLimit = int(freeSpaceB) - freeSpaceLimitB
					log.Printf("Cutting down I/O limit for space %v", ioLimit)

					if ioLimit <= 0 {
						return nil, ErrCannotPerformIO
					}
				}

				fioOpts = fioOpts.WithIOLimit(int64(ioLimit))
			}

			relBasePath := "."

			log.Printf("Writing files at depth %v (fileSize: %v-%v, numFiles: %v, blockSize: %v, dedupPcnt: %v, ioLimit: %v)\n", dirDepth, minFileSizeB, maxFileSizeB, numFiles, blockSize, dedupPcnt, ioLimit)

			setLogEntryCmdOpts(l, map[string]string{
				"dirDepth":    strconv.Itoa(dirDepth),
				"relBasePath": relBasePath,
			})

			// Also record the fio options on the log entry.
			for k, v := range fioOpts {
				l.CmdOpts[k] = v
			}

			return nil, e.FileWriter.WriteFilesAtDepthRandomBranch(relBasePath, dirDepth, fioOpts)
		},
	},
	// Delete one randomly chosen subdirectory at a random depth.
	DeleteRandomSubdirectoryActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			maxDirDepth := getOptAsIntOrDefault(MaxDirDepthField, opts, defaultMaxDirDepth)
			if maxDirDepth <= 0 {
				return nil, ErrInvalidOption
			}
			dirDepth := rand.Intn(maxDirDepth) + 1 //nolint:gosec

			log.Printf("Deleting directory at depth %v\n", dirDepth)
			setLogEntryCmdOpts(l, map[string]string{"dirDepth": strconv.Itoa(dirDepth)})

			err = e.FileWriter.DeleteDirAtDepth("", dirDepth)
			if errors.Is(err, fio.ErrNoDirFound) {
				// Nothing exists at the chosen depth; report a no-op.
				log.Print(err)
				return nil, ErrNoOp
			}

			return nil, err
		},
	},
	// Delete a percentage of the contents of a directory at a random depth.
	DeleteDirectoryContentsActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			maxDirDepth := getOptAsIntOrDefault(MaxDirDepthField, opts, defaultMaxDirDepth)
			dirDepth := rand.Intn(maxDirDepth + 1) //nolint:gosec

			pcnt := getOptAsIntOrDefault(DeletePercentOfContentsField, opts, defaultDeletePercentOfContents)

			log.Printf("Deleting %d%% of directory contents at depth %v\n", pcnt, dirDepth)

			setLogEntryCmdOpts(l, map[string]string{
				"dirDepth": strconv.Itoa(dirDepth),
				"percent":  strconv.Itoa(pcnt),
			})

			// Convert the integer percentage to the [0,1] fraction expected
			// by DeleteContentsAtDepth.
			const pcntConv = 100
			err = e.FileWriter.DeleteContentsAtDepth("", dirDepth, float32(pcnt)/pcntConv)

			if errors.Is(err, fio.ErrNoDirFound) {
				log.Print(err)
				return nil, ErrNoOp
			}

			return nil, err
		},
	},
	// Restore a snapshot directly into the local data directory.
	RestoreIntoDataDirectoryActionKey: {
		f: func(e *Engine, opts map[string]string, l *LogEntry) (out map[string]string, err error) {
			snapID, err := e.getSnapIDOptOrRandLive(opts)
			if err != nil {
				return nil, err
			}

			log.Printf("Restoring snap ID %v into data directory\n", snapID)
			setLogEntryCmdOpts(l, map[string]string{"snapID": snapID})

			b := &bytes.Buffer{}
			err = e.Checker.RestoreSnapshotToPath(context.Background(), snapID, e.FileWriter.LocalDataDir, b)
			if err != nil {
				log.Print(b.String())
				return nil, err
			}

			return nil, nil
		},
	},
}
// Action constants: default values used when the corresponding option
// field is absent from an action's options map.
const (
	defaultMaxDirDepth             = 20
	defaultMaxFileSize             = 1 * 1024 * 1024 * 1024 // 1GB
	defaultMinFileSize             = 4096
	defaultMaxNumFilesPerWrite     = 10000
	defaultMinNumFilesPerWrite     = 1
	defaultIOLimitPerWriteAction   = 0                 // A zero value does not impose any limit on IO
	defaultFreeSpaceLimit          = 100 * 1024 * 1024 // 100 MB
	defaultMaxDedupePercent        = 100
	defaultMinDedupePercent        = 0
	defaultDedupePercentStep       = 25
	defaultDeletePercentOfContents = 20
	defaultActionRepeats           = 1
)
// Option field names: the keys recognized in per-action options maps.
const (
	MaxDirDepthField         = "max-dir-depth"
	MaxFileSizeField         = "max-file-size"
	MinFileSizeField         = "min-file-size"
	MaxNumFilesPerWriteField = "max-num-files-per-write"
	MinNumFilesPerWriteField = "min-num-files-per-write"
	IOLimitPerWriteAction    = "io-limit-per-write"
	FreeSpaceLimitField      = "free-space-limit"
	MaxDedupePercentField    = "max-dedupe-percent"
	MinDedupePercentField    = "min-dedupe-percent"
	// NOTE(review): the value reads "dedupe-percent" rather than
	// "dedupe-percent-step"; changing it would break existing option maps,
	// so the mismatch is only documented here — confirm intended.
	DedupePercentStepField       = "dedupe-percent"
	ActionRepeaterField          = "repeat-action"
	ThrowNoSpaceOnDeviceErrField = "throw-no-space-error"
	DeletePercentOfContentsField = "delete-contents-percent"
	SnapshotIDField              = "snapshot-ID"
)
// getOptAsIntOrDefault returns the integer value stored under key in opts.
// It returns def when opts is nil, the key is absent or empty, or the value
// does not parse as a base-10 integer.
func getOptAsIntOrDefault(key string, opts map[string]string, def int) int {
	// Single map lookup (the original looked the key up twice); indexing a
	// nil map is safe in Go and yields ("", false).
	v, ok := opts[key]
	if !ok || v == "" {
		return def
	}

	n, err := strconv.Atoi(v)
	if err != nil {
		return def
	}

	return n
}
// defaultActionControls builds the default weight table for action picking:
// every registered action gets weight 1, except restoring into the data
// directory, which is disabled (weight 0) by default.
func defaultActionControls() map[string]string {
	ret := make(map[string]string, len(actions))

	for actionKey := range actions {
		weight := "1"

		if actionKey == RestoreIntoDataDirectoryActionKey {
			// Don't restore into data directory by default
			weight = "0"
		}

		ret[string(actionKey)] = weight
	}

	return ret
}
// pickActionWeighted randomly selects an action from actionList using the
// relative integer weights in actionControlOpts (keyed by action name, with
// a missing/unparsable weight treated as 0). It makes a single weighted
// reservoir-sampling pass: each candidate replaces the current pick with
// probability weight/runningSum, which gives every action an overall
// selection probability of weight/totalWeight. Returns the empty ActionKey
// when no action has a nonzero weight.
func pickActionWeighted(actionControlOpts map[string]string, actionList map[ActionKey]Action) ActionKey {
	var keepKey ActionKey

	sum := 0

	for actionName := range actionList {
		weight := getOptAsIntOrDefault(string(actionName), actionControlOpts, 0)
		if weight == 0 {
			continue
		}

		// NOTE(review): assumes weights are non-negative; a negative weight
		// could drive sum <= 0 and make rand.Intn panic — confirm options
		// are validated upstream.
		sum += weight
		// True with probability weight/sum.
		if rand.Intn(sum) < weight { //nolint:gosec
			keepKey = actionName
		}
	}

	return keepKey
}
// errIsNotEnoughSpace reports whether err indicates the device ran out of
// space, either via the engine's ErrCannotPerformIO sentinel or the OS
// "no space left on device" message embedded in the error text.
func errIsNotEnoughSpace(err error) bool {
	// Guard against a nil error: the original called err.Error()
	// unconditionally, which would panic if a caller ever passed nil.
	if err == nil {
		return false
	}

	return errors.Is(err, ErrCannotPerformIO) || strings.Contains(err.Error(), noSpaceOnDeviceMatchStr)
}
// getSnapIDOptOrRandLive returns the snapshot ID supplied in opts under
// SnapshotIDField, if any; otherwise it picks a random live snapshot ID
// from the checker. Returns ErrNoOp when no explicit ID is given and no
// live snapshots exist.
func (e *Engine) getSnapIDOptOrRandLive(opts map[string]string) (snapID string, err error) {
	snapID = opts[SnapshotIDField]
	if snapID != "" {
		return snapID, nil
	}

	snapIDList := e.Checker.GetLiveSnapIDs()
	if len(snapIDList) == 0 {
		return "", ErrNoOp
	}

	return snapIDList[rand.Intn(len(snapIDList))], nil //nolint:gosec
}

View File

@@ -6,10 +6,14 @@
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/kopia/kopia/tests/robustness/checker"
"github.com/kopia/kopia/tests/robustness/snap"
@@ -24,8 +28,16 @@
S3BucketNameEnvKey = "S3_BUCKET_NAME"
)
// ErrS3BucketNameEnvUnset is the error returned when the S3BucketNameEnvKey environment variable is not set.
var ErrS3BucketNameEnvUnset = fmt.Errorf("environment variable required: %v", S3BucketNameEnvKey)
var (
// ErrNoOp is thrown when an action could not do anything useful.
ErrNoOp = fmt.Errorf("no-op")
// ErrCannotPerformIO is returned if the engine determines there is not enough space
// to write files.
ErrCannotPerformIO = fmt.Errorf("cannot perform i/o")
// ErrS3BucketNameEnvUnset is the error returned when the S3BucketNameEnvKey environment variable is not set.
ErrS3BucketNameEnvUnset = fmt.Errorf("environment variable required: %v", S3BucketNameEnvKey)
noSpaceOnDeviceMatchStr = "no space left on device"
)
// Engine is the outer level testing framework for robustness testing.
type Engine struct {
@@ -34,6 +46,11 @@ type Engine struct {
MetaStore snapmeta.Persister
Checker *checker.Checker
cleanupRoutines []func()
baseDirPath string
RunStats Stats
CumulativeStats Stats
EngineLog Log
}
// NewEngine instantiates a new Engine and returns its pointer. It is
@@ -48,12 +65,19 @@ func NewEngine(workingDir string) (*Engine, error) {
return nil, err
}
e := new(Engine)
e := &Engine{
baseDirPath: baseDirPath,
RunStats: Stats{
RunCounter: 1,
CreationTime: time.Now(),
PerActionStats: make(map[ActionKey]*ActionStats),
},
}
// Fill the file writer
e.FileWriter, err = fio.NewRunner()
if err != nil {
e.Cleanup() //nolint:errcheck
e.CleanComponents()
return nil, err
}
@@ -62,7 +86,7 @@ func NewEngine(workingDir string) (*Engine, error) {
// Fill Snapshotter interface
kopiaSnapper, err := kopiarunner.NewKopiaSnapshotter(baseDirPath)
if err != nil {
e.Cleanup() //nolint:errcheck
e.CleanComponents()
return nil, err
}
@@ -72,7 +96,7 @@ func NewEngine(workingDir string) (*Engine, error) {
// Fill the snapshot store interface
snapStore, err := snapmeta.New(baseDirPath)
if err != nil {
e.Cleanup() //nolint:errcheck
e.CleanComponents()
return nil, err
}
@@ -80,12 +104,18 @@ func NewEngine(workingDir string) (*Engine, error) {
e.MetaStore = snapStore
err = e.setupLogging()
if err != nil {
e.CleanComponents()
return nil, err
}
// Create the data integrity checker
chk, err := checker.NewChecker(kopiaSnapper, snapStore, fswalker.NewWalkCompare())
chk, err := checker.NewChecker(kopiaSnapper, snapStore, fswalker.NewWalkCompare(), baseDirPath)
e.cleanupRoutines = append(e.cleanupRoutines, chk.Cleanup)
if err != nil {
e.Cleanup() //nolint:errcheck
e.CleanComponents()
return nil, err
}
@@ -96,18 +126,93 @@ func NewEngine(workingDir string) (*Engine, error) {
// Cleanup cleans up after each component of the test engine.
func (e *Engine) Cleanup() error {
defer e.cleanup()
// Perform a snapshot action to capture the state of the data directory
// at the end of the run
lastWriteEntry := e.EngineLog.FindLastThisRun(WriteRandomFilesActionKey)
lastSnapEntry := e.EngineLog.FindLastThisRun(SnapshotRootDirActionKey)
if lastWriteEntry != nil {
if lastSnapEntry == nil || lastSnapEntry.Idx < lastWriteEntry.Idx {
// Only force a final snapshot if the data tree has been modified since the last snapshot
e.ExecAction(SnapshotRootDirActionKey, make(map[string]string)) //nolint:errcheck
}
}
cleanupSummaryBuilder := new(strings.Builder)
cleanupSummaryBuilder.WriteString("\n================\n")
cleanupSummaryBuilder.WriteString("Cleanup Summary:\n\n")
cleanupSummaryBuilder.WriteString(e.Stats())
cleanupSummaryBuilder.WriteString("\n\n")
cleanupSummaryBuilder.WriteString(e.EngineLog.StringThisRun())
cleanupSummaryBuilder.WriteString("\n")
log.Print(cleanupSummaryBuilder.String())
e.RunStats.RunTime = time.Since(e.RunStats.CreationTime)
e.CumulativeStats.RunTime += e.RunStats.RunTime
defer e.CleanComponents()
if e.MetaStore != nil {
err := e.SaveLog()
if err != nil {
return err
}
err = e.SaveStats()
if err != nil {
return err
}
return e.MetaStore.FlushMetadata()
}
return nil
}
func (e *Engine) cleanup() {
// setupLogging redirects the standard logger to write to both stderr and a
// timestamped log file created inside the metadata store's persist directory.
func (e *Engine) setupLogging() error {
	dirPath := e.MetaStore.GetPersistDir()

	newLogPath := filepath.Join(dirPath, e.formatLogName())

	f, err := os.Create(newLogPath)
	if err != nil {
		return err
	}
	// NOTE(review): f is never closed here; the logger keeps writing to it
	// for the remainder of the process — confirm that is intended.

	// Write to both stderr and persistent log file
	wrt := io.MultiWriter(os.Stderr, f)
	log.SetOutput(wrt)

	return nil
}
// formatLogName derives the persistent log file name from the engine run's
// creation timestamp, e.g. "Log_2020_11_17_01_07_04".
func (e *Engine) formatLogName() string {
	const layout = "2006_01_02_15_04_05"
	return "Log_" + e.RunStats.CreationTime.Format(layout)
}
// CleanComponents cleans up each component part of the test engine.
func (e *Engine) CleanComponents() {
for _, f := range e.cleanupRoutines {
f()
if f != nil {
f()
}
}
os.RemoveAll(e.baseDirPath) //nolint:errcheck
}
// Init initializes the Engine to a repository location according to the environment setup.
// - If S3_BUCKET_NAME is set, initialize S3
// - Else initialize filesystem.
func (e *Engine) Init(ctx context.Context, testRepoPath, metaRepoPath string) error {
	// Read the bucket name once (the original consulted the environment in
	// both the case guard and the case body).
	if bucketName := os.Getenv(S3BucketNameEnvKey); bucketName != "" {
		return e.InitS3(ctx, bucketName, testRepoPath, metaRepoPath)
	}

	return e.InitFilesystem(ctx, testRepoPath, metaRepoPath)
}
@@ -115,48 +220,18 @@ func (e *Engine) cleanup() {
// is successful, the engine is populated with the metadata associated with the
// snapshot in that repo. A new repo will be created if one does not already
// exist.
func (e *Engine) InitS3(ctx context.Context, testRepoPath, metaRepoPath string) error {
bucketName := os.Getenv(S3BucketNameEnvKey)
if bucketName == "" {
return ErrS3BucketNameEnvUnset
}
func (e *Engine) InitS3(ctx context.Context, bucketName, testRepoPath, metaRepoPath string) error {
err := e.MetaStore.ConnectOrCreateS3(bucketName, metaRepoPath)
if err != nil {
return err
}
err = e.MetaStore.LoadMetadata()
if err != nil {
return err
}
err = e.TestRepo.ConnectOrCreateS3(bucketName, testRepoPath)
if err != nil {
return err
}
_, _, err = e.TestRepo.Run("policy", "set", "--global", "--keep-latest", strconv.Itoa(1<<31-1))
if err != nil {
return err
}
err = e.Checker.VerifySnapshotMetadata()
if err != nil {
return err
}
snapIDs := e.Checker.GetLiveSnapIDs()
if len(snapIDs) > 0 {
randSnapID := snapIDs[rand.Intn(len(snapIDs))] //nolint:gosec
err = e.Checker.RestoreSnapshotToPath(ctx, randSnapID, e.FileWriter.LocalDataDir, os.Stdout)
if err != nil {
return err
}
}
return nil
return e.init(ctx)
}
// InitFilesystem attempts to connect to a test repo and metadata repo on the local
@@ -169,30 +244,36 @@ func (e *Engine) InitFilesystem(ctx context.Context, testRepoPath, metaRepoPath
return err
}
err = e.MetaStore.LoadMetadata()
if err != nil {
return err
}
err = e.TestRepo.ConnectOrCreateFilesystem(testRepoPath)
if err != nil {
return err
}
err = e.Checker.VerifySnapshotMetadata()
return e.init(ctx)
}
func (e *Engine) init(ctx context.Context) error {
err := e.MetaStore.LoadMetadata()
if err != nil {
return err
}
snapIDs := e.Checker.GetSnapIDs()
if len(snapIDs) > 0 {
randSnapID := snapIDs[rand.Intn(len(snapIDs))] //nolint:gosec
err = e.Checker.RestoreSnapshotToPath(ctx, randSnapID, e.FileWriter.LocalDataDir, os.Stdout)
if err != nil {
return err
}
err = e.LoadStats()
if err != nil {
return err
}
return nil
e.CumulativeStats.RunCounter++
err = e.LoadLog()
if err != nil {
return err
}
_, _, err = e.TestRepo.Run("policy", "set", "--global", "--keep-latest", strconv.Itoa(1<<31-1), "--compression", "s2-default")
if err != nil {
return err
}
return e.Checker.VerifySnapshotMetadata()
}

View File

@@ -5,13 +5,23 @@
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/kopia/kopia/tests/robustness/snapmeta"
"github.com/kopia/kopia/tests/testenv"
"github.com/kopia/kopia/tests/tools/fio"
"github.com/kopia/kopia/tests/tools/fswalker"
@@ -19,10 +29,17 @@
)
var (
fsMetadataRepoPath = filepath.Join("/tmp", "metadata-repo")
s3MetadataRepoPath = filepath.Join("some/path", "metadata-repo")
fsDataRepoPath = filepath.Join("/tmp", "data-repo")
s3DataRepoPath = filepath.Join("some/path", "data-repo")
repoBaseDirName = "engine"
fsBasePath = "/tmp"
s3BasePath = ""
dataRepoPath = "unit-tests/data-repo"
metadataRepoPath = "unit-tests/metadata-repo"
fsRepoBaseDirPath = filepath.Join(fsBasePath, repoBaseDirName)
s3RepoBaseDirPath = filepath.Join(s3BasePath, repoBaseDirName)
fsMetadataRepoPath = filepath.Join(fsRepoBaseDirPath, metadataRepoPath)
s3MetadataRepoPath = filepath.Join(s3RepoBaseDirPath, metadataRepoPath)
fsDataRepoPath = filepath.Join(fsRepoBaseDirPath, dataRepoPath)
s3DataRepoPath = filepath.Join(s3RepoBaseDirPath, dataRepoPath)
)
func TestEngineWritefilesBasicFS(t *testing.T) {
@@ -36,18 +53,20 @@ func TestEngineWritefilesBasicFS(t *testing.T) {
defer func() {
cleanupErr := eng.Cleanup()
testenv.AssertNoError(t, cleanupErr)
os.RemoveAll(fsRepoBaseDirPath)
}()
ctx := context.TODO()
err = eng.InitFilesystem(ctx, fsDataRepoPath, fsMetadataRepoPath)
testenv.AssertNoError(t, err)
fileSize := int64(256 * 1024 * 1024)
fileSize := int64(256 * 1024)
numFiles := 10
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
fioOpts := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
err = eng.FileWriter.WriteFiles("", fioOpt)
err = eng.FileWriter.WriteFiles("", fioOpts)
testenv.AssertNoError(t, err)
snapIDs := eng.Checker.GetSnapIDs()
@@ -64,7 +83,74 @@ func TestEngineWritefilesBasicFS(t *testing.T) {
}
}
// randomString returns a hex encoding of n cryptographically random bytes
// (i.e. a string of 2n hex characters), used to build unique test resource
// names.
func randomString(n int) string {
	b := make([]byte, n)
	// The original silently discarded the error from io.ReadFull; a short
	// read would have produced predictable (zero-filled) "random" names.
	// A crypto/rand failure is unrecoverable in a test helper, so panic.
	if _, err := io.ReadFull(rand.Reader, b); err != nil {
		panic(err)
	}

	return hex.EncodeToString(b)
}
// makeTempS3Bucket creates a uniquely named S3 bucket for a test and returns
// its name along with a cleanup callback that empties and removes the
// bucket. The test is skipped when AWS credentials are not provided via the
// environment.
func makeTempS3Bucket(t *testing.T) (bucketName string, cleanupCB func()) {
	endpoint := "s3.amazonaws.com"
	accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")
	secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
	sessionToken := os.Getenv("AWS_SESSION_TOKEN")

	// NOTE(review): AWS_SESSION_TOKEN is also required, so long-lived
	// (non-STS) credentials cause a skip — confirm that is intended.
	if accessKeyID == "" || secretAccessKey == "" || sessionToken == "" {
		t.Skip("Skipping S3 tests if no creds provided")
	}

	secure := true
	region := ""
	cli, err := minio.NewWithCredentials(endpoint, credentials.NewStaticV4(accessKeyID, secretAccessKey, sessionToken), secure, region)
	testenv.AssertNoError(t, err)

	bucketName = fmt.Sprintf("engine-unit-tests-%s", randomString(4))
	err = cli.MakeBucket(bucketName, "")
	testenv.AssertNoError(t, err)

	return bucketName, func() {
		objNameCh := make(chan string)
		errCh := cli.RemoveObjects(bucketName, objNameCh)

		// Drain removal errors in the background while keys are fed below.
		go func() {
			for removeErr := range errCh {
				t.Errorf("error removing key %s from bucket: %s", removeErr.ObjectName, removeErr.Err)
			}
		}()

		recursive := true
		doneCh := make(chan struct{})

		defer close(doneCh)

		// Feed every key in the bucket to the remover, then signal done.
		for obj := range cli.ListObjects(bucketName, "", recursive, doneCh) {
			objNameCh <- obj.Key
		}

		close(objNameCh)

		// Bucket removal can fail until the deletions settle; retry briefly.
		retries := 10
		retryPeriod := 1 * time.Second

		var err error

		for retry := 0; retry < retries; retry++ {
			time.Sleep(retryPeriod)

			err = cli.RemoveBucket(bucketName)
			if err == nil {
				break
			}
		}
		testenv.AssertNoError(t, err)
	}
}
func TestWriteFilesBasicS3(t *testing.T) {
bucketName, cleanupCB := makeTempS3Bucket(t)
defer cleanupCB()
eng, err := NewEngine("")
if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
t.Skip(err)
@@ -78,15 +164,15 @@ func TestWriteFilesBasicS3(t *testing.T) {
}()
ctx := context.TODO()
err = eng.InitS3(ctx, s3DataRepoPath, s3MetadataRepoPath)
err = eng.InitS3(ctx, bucketName, s3DataRepoPath, s3MetadataRepoPath)
testenv.AssertNoError(t, err)
fileSize := int64(256 * 1024 * 1024)
fileSize := int64(256 * 1024)
numFiles := 10
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
fioOpts := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
err = eng.FileWriter.WriteFiles("", fioOpt)
err = eng.FileWriter.WriteFiles("", fioOpts)
testenv.AssertNoError(t, err)
snapIDs := eng.Checker.GetLiveSnapIDs()
@@ -104,6 +190,9 @@ func TestWriteFilesBasicS3(t *testing.T) {
}
func TestDeleteSnapshotS3(t *testing.T) {
bucketName, cleanupCB := makeTempS3Bucket(t)
defer cleanupCB()
eng, err := NewEngine("")
if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
t.Skip(err)
@@ -117,15 +206,15 @@ func TestDeleteSnapshotS3(t *testing.T) {
}()
ctx := context.TODO()
err = eng.InitS3(ctx, s3DataRepoPath, s3MetadataRepoPath)
err = eng.InitS3(ctx, bucketName, s3DataRepoPath, s3MetadataRepoPath)
testenv.AssertNoError(t, err)
fileSize := int64(256 * 1024 * 1024)
fileSize := int64(256 * 1024)
numFiles := 10
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
fioOpts := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
err = eng.FileWriter.WriteFiles("", fioOpt)
err = eng.FileWriter.WriteFiles("", fioOpts)
testenv.AssertNoError(t, err)
snapID, err := eng.Checker.TakeSnapshot(ctx, eng.FileWriter.LocalDataDir)
@@ -144,6 +233,9 @@ func TestDeleteSnapshotS3(t *testing.T) {
}
func TestSnapshotVerificationFail(t *testing.T) {
bucketName, cleanupCB := makeTempS3Bucket(t)
defer cleanupCB()
eng, err := NewEngine("")
if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
t.Skip(err)
@@ -157,13 +249,12 @@ func TestSnapshotVerificationFail(t *testing.T) {
}()
ctx := context.TODO()
err = eng.InitS3(ctx, s3DataRepoPath, s3MetadataRepoPath)
err = eng.InitS3(ctx, bucketName, s3DataRepoPath, s3MetadataRepoPath)
testenv.AssertNoError(t, err)
// Perform writes
fileSize := int64(256 * 1024 * 1024)
fileSize := int64(256 * 1024)
numFiles := 10
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
err = eng.FileWriter.WriteFiles("", fioOpt)
@@ -230,7 +321,7 @@ func TestDataPersistency(t *testing.T) {
testenv.AssertNoError(t, err)
// Perform writes
fileSize := int64(256 * 1024 * 1024)
fileSize := int64(256 * 1024)
numFiles := 10
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles)
@@ -254,7 +345,7 @@ func TestDataPersistency(t *testing.T) {
eng2, err := NewEngine("")
testenv.AssertNoError(t, err)
defer eng2.cleanup()
defer eng2.CleanComponents()
// Connect this engine to the same data and metadata repositories -
// expect that the snapshot taken above will be found in metadata,
@@ -263,8 +354,382 @@ func TestDataPersistency(t *testing.T) {
err = eng2.InitFilesystem(ctx, dataRepoPath, metadataRepoPath)
testenv.AssertNoError(t, err)
err = eng2.Checker.RestoreSnapshotToPath(ctx, snapID, eng2.FileWriter.LocalDataDir, os.Stdout)
testenv.AssertNoError(t, err)
// Compare the data directory of the second engine with the fingerprint
// of the snapshot taken earlier. They should match.
err = fswalker.NewWalkCompare().Compare(ctx, eng2.FileWriter.LocalDataDir, dataDirWalk.ValidationData, os.Stdout)
testenv.AssertNoError(t, err)
}
// TestPickActionWeighted checks that pickActionWeighted selects actions
// with frequencies proportional to their configured weights, within a
// 10% relative tolerance over a large number of trials. Keys that are
// weighted in the control options but absent from the action list must
// never be picked.
func TestPickActionWeighted(t *testing.T) {
	for _, tc := range []struct {
		name             string
		inputCtrlWeights map[string]float64
		inputActionList  map[ActionKey]Action
	}{
		{
			name: "basic uniform",
			inputCtrlWeights: map[string]float64{
				"A": 1,
				"B": 1,
				"C": 1,
			},
			inputActionList: map[ActionKey]Action{
				"A": {},
				"B": {},
				"C": {},
			},
		},
		{
			name: "basic weighted",
			inputCtrlWeights: map[string]float64{
				"A": 1,
				"B": 10,
				"C": 100,
			},
			inputActionList: map[ActionKey]Action{
				"A": {},
				"B": {},
				"C": {},
			},
		},
		{
			name: "include a zero weight",
			inputCtrlWeights: map[string]float64{
				"A": 1,
				"B": 0,
				"C": 1,
			},
			inputActionList: map[ActionKey]Action{
				"A": {},
				"B": {},
				"C": {},
			},
		},
		{
			name: "include an ActionKey that is not in the action list",
			inputCtrlWeights: map[string]float64{
				"A": 1,
				"B": 1,
				"C": 1,
				"D": 100,
			},
			inputActionList: map[ActionKey]Action{
				"A": {},
				"B": {},
				"C": {},
			},
		},
	} {
		t.Log(tc.name)

		weightsSum := 0.0
		inputCtrlOpts := make(map[string]string)

		for k, v := range tc.inputCtrlWeights {
			// Do not weight actions that are not expected in the results
			if _, ok := tc.inputActionList[ActionKey(k)]; !ok {
				continue
			}

			inputCtrlOpts[k] = strconv.Itoa(int(v))
			weightsSum += v
		}

		numTestLoops := 100000

		// Tally how often each action is picked over many trials.
		results := make(map[ActionKey]int, len(tc.inputCtrlWeights))
		for loop := 0; loop < numTestLoops; loop++ {
			results[pickActionWeighted(inputCtrlOpts, tc.inputActionList)]++
		}

		// Expected count for each key is its weight share of the total;
		// allow up to 10% relative error on the observed count.
		for actionKey, count := range results {
			p := tc.inputCtrlWeights[string(actionKey)] / weightsSum
			exp := p * float64(numTestLoops)

			errPcnt := math.Abs(exp-float64(count)) / exp
			if errPcnt > 0.1 {
				t.Errorf("Error in actual counts was above 10%% for %v (exp %v, actual %v)", actionKey, exp, count)
			}
		}
	}
}
// TestActionsFilesystem exercises the engine's random-action loop
// against filesystem-backed data and metadata repositories. Any
// outcome other than success or a deliberate no-op fails the test.
// Skipped when the kopia executable or fio environment is not set.
func TestActionsFilesystem(t *testing.T) {
	eng, err := NewEngine("")
	if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
		t.Skip(err)
	}

	testenv.AssertNoError(t, err)

	defer func() {
		cleanupErr := eng.Cleanup()
		testenv.AssertNoError(t, cleanupErr)

		os.RemoveAll(fsRepoBaseDirPath)
	}()

	ctx := context.TODO()
	err = eng.InitFilesystem(ctx, fsDataRepoPath, fsMetadataRepoPath)
	testenv.AssertNoError(t, err)

	// Pin file size, count, and dedupe so write actions are predictable
	// in cost; IO limit 0 means unlimited.
	actionOpts := ActionOpts{
		WriteRandomFilesActionKey: map[string]string{
			MaxDirDepthField:         "20",
			MaxFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MinFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MaxNumFilesPerWriteField: "10",
			MinNumFilesPerWriteField: "10",
			MaxDedupePercentField:    "100",
			MinDedupePercentField:    "100",
			DedupePercentStepField:   "1",
			IOLimitPerWriteAction:    "0",
		},
	}

	numActions := 10
	for loop := 0; loop < numActions; loop++ {
		err := eng.RandomAction(actionOpts)
		if !(err == nil || err == ErrNoOp) {
			t.Error("Hit error", err)
		}
	}
}
// TestActionsS3 exercises the engine's random-action loop against an
// S3-backed repository created in a temporary bucket. Any outcome
// other than success or a deliberate no-op fails the test. Skipped
// when the kopia executable, fio environment, or AWS creds are absent.
func TestActionsS3(t *testing.T) {
	bucketName, cleanupCB := makeTempS3Bucket(t)
	defer cleanupCB()

	eng, err := NewEngine("")
	if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
		t.Skip(err)
	}

	testenv.AssertNoError(t, err)

	defer func() {
		cleanupErr := eng.Cleanup()
		testenv.AssertNoError(t, cleanupErr)
	}()

	ctx := context.TODO()
	err = eng.InitS3(ctx, bucketName, s3DataRepoPath, s3MetadataRepoPath)
	testenv.AssertNoError(t, err)

	// Pin file size, count, and dedupe so write actions are predictable
	// in cost; IO limit 0 means unlimited.
	actionOpts := ActionOpts{
		WriteRandomFilesActionKey: map[string]string{
			MaxDirDepthField:         "20",
			MaxFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MinFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MaxNumFilesPerWriteField: "10",
			MinNumFilesPerWriteField: "10",
			MaxDedupePercentField:    "100",
			MinDedupePercentField:    "100",
			DedupePercentStepField:   "1",
			IOLimitPerWriteAction:    "0",
		},
	}

	numActions := 10
	for loop := 0; loop < numActions; loop++ {
		err := eng.RandomAction(actionOpts)
		if !(err == nil || err == ErrNoOp) {
			t.Error("Hit error", err)
		}
	}
}
// TestIOLimitPerWriteAction verifies that the io_limit option caps the
// amount of data a write action produces: without the limit the fio
// workload would take far longer than the timeout, with it the action
// must finish within the timeout.
func TestIOLimitPerWriteAction(t *testing.T) {
	// Instruct a write action to write an enormous amount of data
	// that should take longer than this timeout without "io_limit",
	// but finish in less time with "io_limit". Command instructs fio
	// to generate 100 files x 10 MB each = 1 GB of i/o. The limit is
	// set to 1 MB.
	const timeout = 10 * time.Second

	eng, err := NewEngine("")
	if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
		t.Skip(err)
	}

	testenv.AssertNoError(t, err)

	defer func() {
		cleanupErr := eng.Cleanup()
		testenv.AssertNoError(t, cleanupErr)

		os.RemoveAll(fsRepoBaseDirPath)
	}()

	ctx := context.TODO()
	err = eng.InitFilesystem(ctx, fsDataRepoPath, fsMetadataRepoPath)
	testenv.AssertNoError(t, err)

	// Zero out every action weight except the write action so the
	// single random action below is guaranteed to be a write.
	actionOpts := ActionOpts{
		ActionControlActionKey: map[string]string{
			string(SnapshotRootDirActionKey):          strconv.Itoa(0),
			string(RestoreSnapshotActionKey):          strconv.Itoa(0),
			string(DeleteRandomSnapshotActionKey):     strconv.Itoa(0),
			string(WriteRandomFilesActionKey):         strconv.Itoa(1),
			string(DeleteRandomSubdirectoryActionKey): strconv.Itoa(0),
		},
		WriteRandomFilesActionKey: map[string]string{
			MaxDirDepthField:         "2",
			MaxFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MinFileSizeField:         strconv.Itoa(10 * 1024 * 1024),
			MaxNumFilesPerWriteField: "100",
			MinNumFilesPerWriteField: "100",
			IOLimitPerWriteAction:    strconv.Itoa(1 * 1024 * 1024),
		},
	}

	st := time.Now()

	numActions := 1
	for loop := 0; loop < numActions; loop++ {
		err := eng.RandomAction(actionOpts)
		testenv.AssertNoError(t, err)
	}

	if time.Since(st) > timeout {
		t.Errorf("IO limit parameter did not cut down on the fio runtime")
	}
}
// TestStatsPersist verifies that engine stats saved to a metadata
// store survive a flush/load round trip: a second store connected to
// the same location must reproduce an identical stats report.
// Skipped when the kopia executable is not available.
func TestStatsPersist(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "stats-persist-test")
	testenv.AssertNoError(t, err)

	defer os.RemoveAll(tmpDir)

	snapStore, err := snapmeta.New(tmpDir)
	if errors.Is(err, kopiarunner.ErrExeVariableNotSet) {
		t.Skip(err)
	}

	testenv.AssertNoError(t, err)

	err = snapStore.ConnectOrCreateFilesystem(tmpDir)
	testenv.AssertNoError(t, err)

	// Seed non-trivial stats so the round trip exercises every field.
	actionstats := &ActionStats{
		Count:        120,
		TotalRuntime: 25 * time.Hour,
		MinRuntime:   5 * time.Minute,
		MaxRuntime:   35 * time.Minute,
	}

	creationTime := time.Now().Add(-time.Hour)

	eng := &Engine{
		MetaStore: snapStore,
		CumulativeStats: Stats{
			ActionCounter: 11235,
			CreationTime:  creationTime,
			PerActionStats: map[ActionKey]*ActionStats{
				ActionKey("some-action"): actionstats,
			},
			DataRestoreCount: 99,
		},
	}

	err = eng.SaveStats()
	testenv.AssertNoError(t, err)

	err = eng.MetaStore.FlushMetadata()
	testenv.AssertNoError(t, err)

	snapStoreNew, err := snapmeta.New(tmpDir)
	testenv.AssertNoError(t, err)

	// Connect to the same metadata store
	err = snapStoreNew.ConnectOrCreateFilesystem(tmpDir)
	testenv.AssertNoError(t, err)

	err = snapStoreNew.LoadMetadata()
	testenv.AssertNoError(t, err)

	engNew := &Engine{
		MetaStore: snapStoreNew,
	}

	err = engNew.LoadStats()
	testenv.AssertNoError(t, err)

	// The rendered stats reports must be identical after the round trip.
	if got, want := engNew.Stats(), eng.Stats(); got != want {
		t.Errorf("Stats do not match\n%v\n%v", got, want)
	}
}
// TestLogsPersist verifies that the engine's action log saved to a
// metadata store survives a flush/load round trip: a second store
// connected to the same location must reproduce an identical log
// string. Skipped when the kopia executable is not available.
func TestLogsPersist(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "logs-persist-test")
	testenv.AssertNoError(t, err)

	defer os.RemoveAll(tmpDir)

	snapStore, err := snapmeta.New(tmpDir)
	if errors.Is(err, kopiarunner.ErrExeVariableNotSet) {
		t.Skip(err)
	}

	testenv.AssertNoError(t, err)

	err = snapStore.ConnectOrCreateFilesystem(tmpDir)
	testenv.AssertNoError(t, err)

	// Seed a log with a single fully populated entry so the round trip
	// exercises every serialized field.
	log := Log{
		Log: []*LogEntry{
			{
				StartTime: time.Now().Add(-time.Hour),
				EndTime:   time.Now(),
				Action:    ActionKey("some action"),
				Error:     "some error",
				Idx:       11235,
				ActionOpts: map[string]string{
					"opt1": "opt1 value",
				},
				CmdOpts: map[string]string{
					"cmdOpt": "cmdOptVal",
				},
			},
		},
	}

	eng := &Engine{
		MetaStore: snapStore,
		EngineLog: log,
	}

	err = eng.SaveLog()
	testenv.AssertNoError(t, err)

	err = eng.MetaStore.FlushMetadata()
	testenv.AssertNoError(t, err)

	snapStoreNew, err := snapmeta.New(tmpDir)
	testenv.AssertNoError(t, err)

	// Connect to the same metadata store
	err = snapStoreNew.ConnectOrCreateFilesystem(tmpDir)
	testenv.AssertNoError(t, err)

	err = snapStoreNew.LoadMetadata()
	testenv.AssertNoError(t, err)

	engNew := &Engine{
		MetaStore: snapStoreNew,
	}

	err = engNew.LoadLog()
	testenv.AssertNoError(t, err)

	// The rendered logs must be identical after the round trip.
	if got, want := engNew.EngineLog.String(), eng.EngineLog.String(); got != want {
		t.Errorf("Logs do not match\n%v\n%v", got, want)
	}
}

View File

@@ -0,0 +1,126 @@
// +build darwin,amd64 linux,amd64
package engine
import (
"fmt"
"strings"
"time"
)
// Log keeps track of the actions taken by the engine.
type Log struct {
	// runOffset is the index of the first entry belonging to the
	// current run; entries before it were loaded from prior runs.
	runOffset int
	Log       []*LogEntry
}

// LogEntry is an entry for the engine log.
type LogEntry struct {
	StartTime       time.Time
	EndTime         time.Time
	EngineTimestamp int64
	Action          ActionKey
	Error           string
	Idx             int64
	ActionOpts      map[string]string
	CmdOpts         map[string]string
}

// String renders the entry as a single line: index, engine timestamp,
// start time, rounded duration, action, and error.
func (l *LogEntry) String() string {
	const timeResol = 100 * time.Millisecond

	return fmt.Sprintf("%4v t=%ds %s (%s): %v -> error=%s\n",
		l.Idx,
		l.EngineTimestamp,
		formatTime(l.StartTime),
		l.EndTime.Sub(l.StartTime).Round(timeResol),
		l.Action,
		l.Error,
	)
}

// formatTime renders a timestamp in the log's fixed reference layout.
func formatTime(tm time.Time) string {
	return tm.Format("2006/01/02 15:04:05 MST")
}
// StringThisRun returns a string of only the log entries generated
// by actions in this run of the engine.
func (elog *Log) StringThisRun() string {
	b := &strings.Builder{}

	// Entries before runOffset were loaded from previous runs.
	for _, entry := range elog.Log[elog.runOffset:] {
		b.WriteString(entry.String())
	}

	return b.String()
}
// String renders the entire log, prefixed with a header reporting the
// total number of entries.
func (elog *Log) String() string {
	var b strings.Builder

	fmt.Fprintf(&b, "Log size: %10v\n", len(elog.Log))
	fmt.Fprintf(&b, "========\n")

	for _, entry := range elog.Log {
		b.WriteString(entry.String())
	}

	return b.String()
}
// AddEntry adds a LogEntry to the Log, assigning it the next
// sequential index in the log.
func (elog *Log) AddEntry(l *LogEntry) {
	l.Idx = int64(len(elog.Log))
	elog.Log = append(elog.Log, l)
}
// AddCompleted finalizes a log entry at the time it is called
// and with the provided error, before adding it to the Log.
func (elog *Log) AddCompleted(logEntry *LogEntry, err error) {
	logEntry.EndTime = time.Now()

	if err != nil {
		logEntry.Error = err.Error()
	}

	// AddEntry always appends, so the log is guaranteed non-empty here;
	// the previous post-append empty check was unreachable dead code
	// and has been removed.
	elog.AddEntry(logEntry)
}
// FindLast finds the most recent log entry with the provided ActionKey,
// searching the full log (all runs). Returns nil if no entry matches.
func (elog *Log) FindLast(actionKey ActionKey) *LogEntry {
	return elog.findLastUntilIdx(actionKey, 0)
}
// FindLastThisRun finds the most recent log entry with the provided ActionKey,
// limited to the current run only. Returns nil if no entry matches.
func (elog *Log) FindLastThisRun(actionKey ActionKey) (found *LogEntry) {
	return elog.findLastUntilIdx(actionKey, elog.runOffset)
}
// findLastUntilIdx scans the log backward for the most recent entry
// matching actionKey, stopping before index limitIdx. Returns nil if
// no matching entry exists in that range.
func (elog *Log) findLastUntilIdx(actionKey ActionKey, limitIdx int) *LogEntry {
	for i := len(elog.Log) - 1; i >= limitIdx; i-- {
		if entry := elog.Log[i]; entry != nil && entry.Action == actionKey {
			return entry
		}
	}

	return nil
}
// setLogEntryCmdOpts records the command options used by an action on
// its log entry; a nil entry is silently ignored.
func setLogEntryCmdOpts(l *LogEntry, opts map[string]string) {
	if l == nil {
		return
	}

	l.CmdOpts = opts
}

View File

@@ -0,0 +1,77 @@
// +build darwin,amd64 linux,amd64
package engine
import (
"encoding/json"
"errors"
"time"
"github.com/kopia/kopia/tests/robustness/snapmeta"
)
// Keys under which the engine's cumulative stats and action log are
// persisted in the metadata store.
const (
	engineStatsStoreKey = "cumulative-engine-stats"
	engineLogsStoreKey  = "engine-logs"
)
// SaveLog serializes the engine Log as JSON and writes it to the
// metadata store under the engine-logs key.
func (e *Engine) SaveLog() error {
	raw, err := json.Marshal(e.EngineLog)
	if err != nil {
		return err
	}

	return e.MetaStore.Store(engineLogsStoreKey, raw)
}
// LoadLog loads the engine log from the metadata store. A missing log
// key is not an error: the engine may not have any historical log
// data, in which case the log starts empty.
func (e *Engine) LoadLog() error {
	b, err := e.MetaStore.Load(engineLogsStoreKey)
	if err != nil {
		if errors.Is(err, snapmeta.ErrKeyNotFound) {
			// Swallow key-not-found error. May not have historical logs
			return nil
		}

		return err
	}

	if err := json.Unmarshal(b, &e.EngineLog); err != nil {
		return err
	}

	// Mark where this run's entries begin so per-run reporting can be
	// separated from historical entries.
	e.EngineLog.runOffset = len(e.EngineLog.Log)

	// Previously this returned the (known-nil) err variable; return nil
	// explicitly for clarity.
	return nil
}
// SaveStats serializes the engine's cumulative Stats as JSON and
// writes them to the metadata store under the engine-stats key.
func (e *Engine) SaveStats() error {
	raw, err := json.Marshal(e.CumulativeStats)
	if err != nil {
		return err
	}

	return e.MetaStore.Store(engineStatsStoreKey, raw)
}
// LoadStats restores the engine's cumulative Stats from the metadata
// store. If no stats have ever been persisted, fresh cumulative stats
// are initialized instead and no error is returned.
func (e *Engine) LoadStats() error {
	b, err := e.MetaStore.Load(engineStatsStoreKey)
	if err == nil {
		return json.Unmarshal(b, &e.CumulativeStats)
	}

	if !errors.Is(err, snapmeta.ErrKeyNotFound) {
		return err
	}

	// No historical stats data exists; initialize the cumulative stats
	// as if the engine were brand new.
	e.CumulativeStats.PerActionStats = make(map[ActionKey]*ActionStats)
	e.CumulativeStats.CreationTime = time.Now()

	return nil
}

View File

@@ -0,0 +1,155 @@
// +build darwin,amd64 linux,amd64
package engine
import (
	"fmt"
	"sort"
	"strings"
	"time"
)
// Build metadata for the repository under test and for the test binary
// itself. NOTE(review): these look like they are intended to be
// injected at link time via -ldflags — confirm with the build scripts.
// They default to "unknown" when not overridden.
var (
	repoBuildTime   = "unknown"
	repoGitRevision = "unknown"
	repoGitBranch   = "unknown"
	testBuildTime   = "unknown"
	testGitRevision = "unknown"
	testGitBranch   = "unknown"
)
// Stats prints the engine stats, cumulative and from the current run.
// The report has three sections: build metadata, the cumulative action
// summary across all engine runs, and the summary for this run alone.
func (e *Engine) Stats() string {
	b := &strings.Builder{}

	fmt.Fprintln(b, "==================================")
	fmt.Fprintln(b, "Build Info")
	fmt.Fprintln(b, "==================================")
	fmt.Fprintf(b, " Repo build time: %25v\n", repoBuildTime)
	fmt.Fprintf(b, " Repo git revision: %25v\n", repoGitRevision)
	fmt.Fprintf(b, " Repo git branch: %25v\n", repoGitBranch)
	fmt.Fprintln(b, "")
	fmt.Fprintf(b, " Engine build time: %25v\n", testBuildTime)
	fmt.Fprintf(b, " Engine git revision: %25v\n", testGitRevision)
	fmt.Fprintf(b, " Engine git branch: %25v\n", testGitBranch)
	fmt.Fprintln(b, "")
	fmt.Fprintln(b, "==================================")
	fmt.Fprintln(b, "Engine Action Summary (Cumulative)")
	fmt.Fprintln(b, "==================================")
	fmt.Fprintf(b, " Engine runtime: %10vs\n", e.getRuntimeSeconds())
	fmt.Fprintln(b, "")
	fmt.Fprint(b, e.CumulativeStats.Stats())
	fmt.Fprintln(b, "")
	fmt.Fprintln(b, "==================================")
	fmt.Fprintln(b, "Engine Action Summary (This Run)")
	fmt.Fprintln(b, "==================================")
	fmt.Fprint(b, e.RunStats.Stats())
	fmt.Fprintln(b, "")

	return b.String()
}
// Stats tracks statistics during engine runtime. All fields are
// exported so the struct round-trips through JSON persistence in the
// metadata store (see SaveStats/LoadStats).
type Stats struct {
	RunCounter         int64
	ActionCounter      int64
	CreationTime       time.Time
	RunTime            time.Duration
	PerActionStats     map[ActionKey]*ActionStats

	DataRestoreCount   int64
	DataPurgeCount     int64
	ErrorRecoveryCount int64
	NoOpCount          int64
}
// Stats returns a string report of the engine's stats. Per-action
// entries are emitted in sorted key order so the report is
// deterministic across calls — Go map iteration order is random, which
// previously made repeated reports (and string comparisons of them,
// e.g. in TestStatsPersist) unstable when multiple actions were present.
func (stats *Stats) Stats() string {
	b := &strings.Builder{}

	fmt.Fprintln(b, "=============")
	fmt.Fprintln(b, "Stat summary")
	fmt.Fprintln(b, "=============")
	fmt.Fprintf(b, " Number of runs: %10v\n", stats.RunCounter)
	fmt.Fprintf(b, " Engine lifetime: %10vs\n", stats.getLifetimeSeconds())
	fmt.Fprintf(b, " Actions run: %10v\n", stats.ActionCounter)
	fmt.Fprintf(b, " Errors recovered: %10v\n", stats.ErrorRecoveryCount)
	fmt.Fprintf(b, " Data dir restores: %10v\n", stats.DataRestoreCount)
	fmt.Fprintf(b, " Data dir purges: %10v\n", stats.DataPurgeCount)
	fmt.Fprintf(b, " NoOp count: %10v\n", stats.NoOpCount)
	fmt.Fprintln(b, "")
	fmt.Fprintln(b, "=============")
	fmt.Fprintln(b, "Action stats")
	fmt.Fprintln(b, "=============")

	// Collect and sort the action keys for deterministic output.
	keys := make([]string, 0, len(stats.PerActionStats))
	for k := range stats.PerActionStats {
		keys = append(keys, string(k))
	}

	sort.Strings(keys)

	for _, k := range keys {
		actionKey := ActionKey(k)
		actionStat := stats.PerActionStats[actionKey]

		fmt.Fprintf(b, "%s:\n", actionKey)
		fmt.Fprintf(b, " Count: %10d\n", actionStat.Count)
		fmt.Fprintf(b, " Avg Runtime: %10v\n", actionStat.avgRuntimeString())
		fmt.Fprintf(b, " Max Runtime: %10vs\n", durationToSec(actionStat.MaxRuntime))
		fmt.Fprintf(b, " Min Runtime: %10vs\n", durationToSec(actionStat.MinRuntime))
		fmt.Fprintf(b, " Error Count: %10v\n", actionStat.ErrorCount)
		fmt.Fprintln(b, "")
	}

	return b.String()
}
// ActionStats tracks runtime statistics for an action.
type ActionStats struct {
Count int64
TotalRuntime time.Duration
MinRuntime time.Duration
MaxRuntime time.Duration
ErrorCount int64
}
// AverageRuntime returns the average run time for the action.
func (s *ActionStats) AverageRuntime() time.Duration {
return time.Duration(int64(s.TotalRuntime) / s.Count)
}
// Record folds the elapsed time since st and the outcome err into the
// action's running statistics: total/min/max runtime, invocation
// count, and error count.
func (s *ActionStats) Record(st time.Time, err error) {
	elapsed := time.Since(st)

	s.TotalRuntime += elapsed

	if elapsed > s.MaxRuntime {
		s.MaxRuntime = elapsed
	}

	// The first recording always sets the minimum.
	if s.Count == 0 || elapsed < s.MinRuntime {
		s.MinRuntime = elapsed
	}

	s.Count++

	if err != nil {
		s.ErrorCount++
	}
}
// getLifetimeSeconds returns the whole-second duration since the stats
// were first created.
func (stats *Stats) getLifetimeSeconds() int64 {
	return durationToSec(time.Since(stats.CreationTime))
}
func durationToSec(dur time.Duration) int64 {
return int64(dur.Round(time.Second).Seconds())
}
// avgRuntimeString renders the average runtime in whole seconds, or
// "--" when no runs have been recorded yet (which also avoids the
// divide-by-zero in AverageRuntime).
func (s *ActionStats) avgRuntimeString() string {
	if s.Count == 0 {
		return "--"
	}

	return fmt.Sprintf("%vs", durationToSec(s.AverageRuntime()))
}
// getTimestampS returns the engine's current timestamp, expressed as
// total engine runtime in seconds.
func (e *Engine) getTimestampS() int64 {
	return e.getRuntimeSeconds()
}

// getRuntimeSeconds returns cumulative engine runtime in seconds:
// accumulated runtime from prior runs plus the elapsed time of the
// current run.
func (e *Engine) getRuntimeSeconds() int64 {
	return durationToSec(e.CumulativeStats.RunTime + time.Since(e.RunStats.CreationTime))
}

View File

@@ -0,0 +1,19 @@
// +build darwin,amd64 linux,amd64
package engine
import (
"syscall"
)
// getFreeSpaceB returns the number of bytes available to unprivileged
// users on the filesystem containing path.
func getFreeSpaceB(path string) (uint64, error) {
	var stat syscall.Statfs_t

	if err := syscall.Statfs(path, &stat); err != nil {
		return 0, err
	}

	// Available blocks * size per block = available space in bytes
	return stat.Bavail * uint64(stat.Bsize), nil
}

View File

@@ -5,9 +5,13 @@
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"path"
"testing"
"time"
"github.com/kopia/kopia/tests/robustness/engine"
"github.com/kopia/kopia/tests/tools/fio"
@@ -17,38 +21,62 @@
var eng *engine.Engine
const (
fsDataPath = "/tmp/robustness-data"
fsMetadataPath = "/tmp/robustness-metadata"
s3DataPath = "robustness-data"
s3MetadataPath = "robustness-metadata"
dataSubPath = "robustness-data"
metadataSubPath = "robustness-metadata"
defaultTestDur = 5 * time.Minute
)
var (
randomizedTestDur = flag.Duration("rand-test-duration", defaultTestDur, "Set the duration for the randomized test")
repoPathPrefix = flag.String("repo-path-prefix", "", "Point the robustness tests at this path prefix")
)
func TestMain(m *testing.M) {
flag.Parse()
var err error
eng, err = engine.NewEngine("")
if err != nil {
log.Println("skipping robustness tests:", err)
if err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet) {
os.Exit(0)
}
switch {
case err == kopiarunner.ErrExeVariableNotSet || errors.Is(err, fio.ErrEnvNotSet):
fmt.Println("Skipping robustness tests if KOPIA_EXE is not set")
os.Exit(0)
case err != nil:
fmt.Printf("error on engine creation: %s\n", err.Error())
os.Exit(1)
}
switch {
case os.Getenv(engine.S3BucketNameEnvKey) != "":
eng.InitS3(context.Background(), s3DataPath, s3MetadataPath)
default:
eng.InitFilesystem(context.Background(), fsDataPath, fsMetadataPath)
dataRepoPath := path.Join(*repoPathPrefix, dataSubPath)
metadataRepoPath := path.Join(*repoPathPrefix, metadataSubPath)
// Try to reconcile metadata if it is out of sync with the repo state
eng.Checker.RecoveryMode = true
// Initialize the engine, connecting it to the repositories
err = eng.Init(context.Background(), dataRepoPath, metadataRepoPath)
if err != nil {
// Clean the temporary dirs from the file system, don't write out the
// metadata, in case there was an issue loading it
eng.CleanComponents()
fmt.Printf("error initializing engine for S3: %s\n", err.Error())
os.Exit(1)
}
// Restore a random snapshot into the data directory
_, err = eng.ExecAction(engine.RestoreIntoDataDirectoryActionKey, nil)
if err != nil && err != engine.ErrNoOp {
eng.Cleanup()
fmt.Printf("error restoring into the data directory: %s\n", err.Error())
os.Exit(1)
}
result := m.Run()
err = eng.Cleanup()
if err != nil {
panic(err)
log.Printf("error cleaning up the engine: %s\n", err.Error())
os.Exit(2)
}
os.Exit(result)

View File

@@ -3,77 +3,112 @@
package robustness
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/kopia/kopia/tests/robustness/engine"
"github.com/kopia/kopia/tests/testenv"
"github.com/kopia/kopia/tests/tools/fio"
)
func TestManySmallFiles(t *testing.T) {
fileSize := int64(4096)
numFiles := 100
fileSize := 4096
numFiles := 10000
fioOpt := fio.Options{}.WithFileSize(fileSize).WithNumFiles(numFiles).WithBlockSize(4096)
fileWriteOpts := map[string]string{
engine.MaxDirDepthField: strconv.Itoa(1),
engine.MaxFileSizeField: strconv.Itoa(fileSize),
engine.MinFileSizeField: strconv.Itoa(fileSize),
engine.MaxNumFilesPerWriteField: strconv.Itoa(numFiles),
engine.MinNumFilesPerWriteField: strconv.Itoa(numFiles),
}
err := eng.FileWriter.WriteFiles("", fioOpt)
_, err := eng.ExecAction(engine.WriteRandomFilesActionKey, fileWriteOpts)
testenv.AssertNoError(t, err)
ctx := context.TODO()
snapID, err := eng.Checker.TakeSnapshot(ctx, eng.FileWriter.LocalDataDir)
snapOut, err := eng.ExecAction(engine.SnapshotRootDirActionKey, nil)
testenv.AssertNoError(t, err)
output, err := ioutil.TempFile("", t.Name())
testenv.AssertNoError(t, err)
defer output.Close()
err = eng.Checker.RestoreSnapshot(ctx, snapID, output)
_, err = eng.ExecAction(engine.RestoreSnapshotActionKey, snapOut)
testenv.AssertNoError(t, err)
}
func TestModifyWorkload(t *testing.T) {
const (
numSnapshots = 10
numDirs = 10
maxOpsPerMod = 5
)
func TestOneLargeFile(t *testing.T) {
fileSize := 40 * 1024 * 1024
numFiles := 1
numFiles := 10
writeSize := int64(65536 * numFiles)
fioOpt := fio.Options{}.
WithDedupePercentage(35).
WithRandRepeat(false).
WithBlockSize(4096).
WithFileSize(writeSize).
WithNumFiles(numFiles)
fileWriteOpts := map[string]string{
engine.MaxDirDepthField: strconv.Itoa(1),
engine.MaxFileSizeField: strconv.Itoa(fileSize),
engine.MinFileSizeField: strconv.Itoa(fileSize),
engine.MaxNumFilesPerWriteField: strconv.Itoa(numFiles),
engine.MinNumFilesPerWriteField: strconv.Itoa(numFiles),
}
var resultIDs []string
_, err := eng.ExecAction(engine.WriteRandomFilesActionKey, fileWriteOpts)
testenv.AssertNoError(t, err)
ctx := context.Background()
snapOut, err := eng.ExecAction(engine.SnapshotRootDirActionKey, nil)
testenv.AssertNoError(t, err)
for snapNum := 0; snapNum < numSnapshots; snapNum++ {
opsThisLoop := rand.Intn(maxOpsPerMod) + 1
for mod := 0; mod < opsThisLoop; mod++ {
dirIdxToMod := rand.Intn(numDirs)
writeToDir := filepath.Join(t.Name(), fmt.Sprintf("dir%d", dirIdxToMod))
_, err = eng.ExecAction(engine.RestoreSnapshotActionKey, snapOut)
testenv.AssertNoError(t, err)
}
err := eng.FileWriter.WriteFiles(writeToDir, fioOpt)
testenv.AssertNoError(t, err)
// TestManySmallFilesAcrossDirecoryTree writes many small files spread
// across a deep directory tree via repeated write actions, then
// snapshots the root directory and restores that snapshot.
// NOTE(review): "Direcoty" is a typo in the test name; consider
// renaming to ...DirectoryTree in a follow-up.
func TestManySmallFilesAcrossDirecoryTree(t *testing.T) {
	// TODO: Test takes too long - need to address performance issues with fio writes
	fileSize := 4096
	numFiles := 1000
	filesPerWrite := 10
	actionRepeats := numFiles / filesPerWrite

	// Repeat a small write action enough times to reach numFiles total.
	fileWriteOpts := map[string]string{
		engine.MaxDirDepthField:         strconv.Itoa(15),
		engine.MaxFileSizeField:         strconv.Itoa(fileSize),
		engine.MinFileSizeField:         strconv.Itoa(fileSize),
		engine.MaxNumFilesPerWriteField: strconv.Itoa(filesPerWrite),
		engine.MinNumFilesPerWriteField: strconv.Itoa(filesPerWrite),
		engine.ActionRepeaterField:      strconv.Itoa(actionRepeats),
	}

	_, err := eng.ExecAction(engine.WriteRandomFilesActionKey, fileWriteOpts)
	testenv.AssertNoError(t, err)

	snapOut, err := eng.ExecAction(engine.SnapshotRootDirActionKey, nil)
	testenv.AssertNoError(t, err)

	_, err = eng.ExecAction(engine.RestoreSnapshotActionKey, snapOut)
	testenv.AssertNoError(t, err)
}
func TestRandomizedSmall(t *testing.T) {
st := time.Now()
opts := engine.ActionOpts{
engine.ActionControlActionKey: map[string]string{
string(engine.SnapshotRootDirActionKey): strconv.Itoa(2),
string(engine.RestoreSnapshotActionKey): strconv.Itoa(2),
string(engine.DeleteRandomSnapshotActionKey): strconv.Itoa(1),
string(engine.WriteRandomFilesActionKey): strconv.Itoa(8),
string(engine.DeleteRandomSubdirectoryActionKey): strconv.Itoa(1),
},
engine.WriteRandomFilesActionKey: map[string]string{
engine.IOLimitPerWriteAction: fmt.Sprintf("%d", 512*1024*1024),
engine.MaxNumFilesPerWriteField: strconv.Itoa(100),
engine.MaxFileSizeField: strconv.Itoa(64 * 1024 * 1024),
engine.MaxDirDepthField: strconv.Itoa(3),
},
}
for time.Since(st) <= *randomizedTestDur {
err := eng.RandomAction(opts)
if err == engine.ErrNoOp {
t.Log("Random action resulted in no-op")
err = nil
}
snapID, err := eng.Checker.TakeSnapshot(ctx, eng.FileWriter.LocalDataDir)
testenv.AssertNoError(t, err)
resultIDs = append(resultIDs, snapID)
}
for _, snapID := range resultIDs {
err := eng.Checker.RestoreSnapshot(ctx, snapID, nil)
testenv.AssertNoError(t, err)
}
}

View File

@@ -10,6 +10,7 @@ type Snapshotter interface {
CreateSnapshot(sourceDir string) (snapID string, err error)
RestoreSnapshot(snapID string, restoreDir string) error
DeleteSnapshot(snapID string) error
RunGC() error
ListSnapshots() ([]string, error)
Run(args ...string) (stdout, stderr string, err error)
}

View File

@@ -0,0 +1,36 @@
package snapmeta
// Index is a map of index name to the keys associated
// with that index name.
type Index map[string]map[string]struct{}

// AddToIndex adds a key to the index of the given name, creating the
// index bucket on first use.
func (idx Index) AddToIndex(key, indexName string) {
	bucket, ok := idx[indexName]
	if !ok {
		bucket = make(map[string]struct{})
		idx[indexName] = bucket
	}

	bucket[key] = struct{}{}
}

// RemoveFromIndex removes a key from the index of the given name;
// missing indexes and missing keys are no-ops.
func (idx Index) RemoveFromIndex(key, indexName string) {
	if bucket, ok := idx[indexName]; ok {
		delete(bucket, key)
	}
}

// GetKeys returns the list of keys associated with the given index
// name, or nil when the index does not exist or is empty. Ranging over
// a missing (nil) bucket is safe, so no existence check is needed.
func (idx Index) GetKeys(indexName string) (ret []string) {
	for k := range idx[indexName] {
		ret = append(ret, k)
	}

	return ret
}

View File

@@ -0,0 +1,32 @@
package snapmeta
import (
"testing"
)
// TestIndex covers the Index add/get/remove lifecycle: a key added
// under an index name must be returned by GetKeys, and must disappear
// after RemoveFromIndex.
func TestIndex(t *testing.T) {
	idx := Index{}

	const (
		snapshotIndexName = "snapshotIndex"
		snapIDKey         = "snapID1"
	)

	idx.AddToIndex(snapIDKey, snapshotIndexName)

	keys := idx.GetKeys(snapshotIndexName)
	if got, want := len(keys), 1; got != want {
		t.Fatalf("expected %v keys but got %v", want, got)
	}

	if got, want := keys[0], snapIDKey; got != want {
		t.Fatalf("expected %v but got %v", want, got)
	}

	idx.RemoveFromIndex(snapIDKey, snapshotIndexName)

	keys = idx.GetKeys(snapshotIndexName)
	if got, want := len(keys), 0; got != want {
		t.Fatalf("expected %v keys but got %v", want, got)
	}
}

View File

@@ -16,6 +16,7 @@
type kopiaMetadata struct {
*Simple
localMetadataDir string
persistenceDir string
snap *kopiarunner.KopiaSnapshotter
}
@@ -31,8 +32,14 @@ func New(baseDir string) (Persister, error) {
return nil, err
}
persistenceDir, err := ioutil.TempDir(localDir, "kopia-persistence-root")
if err != nil {
return nil, err
}
return &kopiaMetadata{
localMetadataDir: localDir,
persistenceDir: persistenceDir,
Simple: NewSimple(),
snap: snap,
}, nil
@@ -61,6 +68,8 @@ func (store *kopiaMetadata) ConnectOrCreateFilesystem(path string) error {
return store.snap.ConnectOrCreateFilesystem(path)
}
const metadataStoreFileName = "metadata-store-latest"
// LoadMetadata implements the DataPersister interface, restores the latest
// snapshot from the kopia repository and decodes its contents, populating
// its metadata on the snapshots residing in the target test repository.
@@ -76,21 +85,21 @@ func (store *kopiaMetadata) LoadMetadata() error {
lastSnapID := snapIDs[len(snapIDs)-1]
restorePath := filepath.Join(store.localMetadataDir, "kopia-metadata-latest")
err = store.snap.RestoreSnapshot(lastSnapID, restorePath)
err = store.snap.RestoreSnapshot(lastSnapID, store.persistenceDir)
if err != nil {
return err
}
defer os.Remove(restorePath) //nolint:errcheck
metadataPath := filepath.Join(store.persistenceDir, metadataStoreFileName)
f, err := os.Open(restorePath) //nolint:gosec
defer os.Remove(metadataPath) //nolint:errcheck
f, err := os.Open(metadataPath) //nolint:gosec
if err != nil {
return err
}
err = json.NewDecoder(f).Decode(&(store.Simple.m))
err = json.NewDecoder(f).Decode(&(store.Simple))
if err != nil {
return err
}
@@ -98,26 +107,34 @@ func (store *kopiaMetadata) LoadMetadata() error {
return nil
}
// GetPersistDir returns the path to the directory that will be persisted
// as a snapshot to the kopia repository.
func (store *kopiaMetadata) GetPersistDir() string {
return store.persistenceDir
}
// FlushMetadata implements the DataPersister interface, flushing the local
// metadata on the target test repo's snapshots to the metadata Kopia repository
// as a snapshot create.
func (store *kopiaMetadata) FlushMetadata() error {
f, err := ioutil.TempFile(store.localMetadataDir, "kopia-metadata-")
metadataPath := filepath.Join(store.persistenceDir, metadataStoreFileName)
f, err := os.Create(metadataPath)
if err != nil {
return err
}
defer func() {
f.Close() //nolint:errcheck
os.Remove(f.Name()) //nolint:errcheck
f.Close() //nolint:errcheck
os.Remove(metadataPath) //nolint:errcheck
}()
err = json.NewEncoder(f).Encode(store.Simple.m)
err = json.NewEncoder(f).Encode(store.Simple)
if err != nil {
return err
}
_, err = store.snap.CreateSnapshot(f.Name())
_, err = store.snap.CreateSnapshot(store.persistenceDir)
if err != nil {
return err
}

View File

@@ -1,6 +1,12 @@
package snapmeta
import "sync"
import (
"errors"
"sync"
)
// ErrKeyNotFound is returned when the store can't find the key provided.
var ErrKeyNotFound = errors.New("key not found")
var _ Store = &Simple{}
@@ -8,15 +14,17 @@
// snapshot metadata as a byte slice in a map in memory.
// A Simple should not be copied.
type Simple struct {
m map[string][]byte
mu sync.Mutex
Data map[string][]byte `json:"data"`
Idx Index `json:"idx"`
mu sync.Mutex
}
// NewSimple instantiates a new Simple snapstore and
// returns its pointer.
func NewSimple() *Simple {
return &Simple{
m: make(map[string][]byte),
Data: make(map[string][]byte),
Idx: Index(make(map[string]map[string]struct{})),
}
}
@@ -28,7 +36,7 @@ func (s *Simple) Store(key string, val []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
s.m[key] = buf
s.Data[key] = buf
return nil
}
@@ -38,14 +46,14 @@ func (s *Simple) Load(key string) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
if buf, found := s.m[key]; found {
if buf, found := s.Data[key]; found {
retBuf := make([]byte, len(buf))
_ = copy(retBuf, buf)
return retBuf, nil
}
return nil, nil
return nil, ErrKeyNotFound
}
// Delete implements the Storer interface Delete method.
@@ -53,19 +61,29 @@ func (s *Simple) Delete(key string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.m, key)
delete(s.Data, key)
}
// GetKeys implements the Storer interface GetKeys method.
func (s *Simple) GetKeys() []string {
// AddToIndex implements the Storer interface AddToIndex method.
func (s *Simple) AddToIndex(key, indexName string) {
s.mu.Lock()
defer s.mu.Unlock()
ret := make([]string, 0, len(s.m))
for k := range s.m {
ret = append(ret, k)
}
return ret
s.Idx.AddToIndex(key, indexName)
}
// RemoveFromIndex implements the Indexer interface RemoveFromIndex method.
func (s *Simple) RemoveFromIndex(key, indexName string) {
s.mu.Lock()
defer s.mu.Unlock()
s.Idx.RemoveFromIndex(key, indexName)
}
// GetKeys implements the Indexer interface GetKeys method.
func (s *Simple) GetKeys(indexName string) []string {
s.mu.Lock()
defer s.mu.Unlock()
return s.Idx.GetKeys(indexName)
}

View File

@@ -0,0 +1,25 @@
package snapmeta
import (
"testing"
)
func TestSimpleWithIndex(t *testing.T) {
simple := NewSimple()
storeKey := "key-to-store"
data := []byte("some stored data")
simple.Store(storeKey, data)
idxName := "index-name"
simple.AddToIndex(storeKey, idxName)
idxKeys := simple.GetKeys(idxName)
if got, want := len(idxKeys), 1; got != want {
t.Fatalf("expected %v keys but got %v", want, got)
}
if got, want := idxKeys[0], storeKey; got != want {
t.Fatalf("expected key %v but got %v", want, got)
}
}

View File

@@ -10,7 +10,14 @@ type Store interface {
Store(key string, val []byte) error
Load(key string) ([]byte, error)
Delete(key string)
GetKeys() []string
Indexer
}
// Indexer describes methods surrounding categorization of keys via a named index.
type Indexer interface {
AddToIndex(key, indexName string)
RemoveFromIndex(key, indexName string)
GetKeys(indexName string) (ret []string)
}
// Persister describes the ability to flush metadata
@@ -20,5 +27,6 @@ type Persister interface {
snap.RepoManager
LoadMetadata() error
FlushMetadata() error
GetPersistDir() string
Cleanup()
}

View File

@@ -38,6 +38,8 @@ func NewWalkCompare() *WalkCompare {
filterFileTimeDiffs,
isRootDirectoryRename,
dirSizeMightBeOffByBlockSizeMultiple,
ignoreGIDIfZero,
ignoreUIDIfZero,
},
}
}
@@ -204,6 +206,28 @@ func filterFileTimeDiffs(str string, mod fswalker.ActionData) bool {
return strings.Contains(str, "ctime:") || strings.Contains(str, "atime:") || strings.Contains(str, "mtime:")
}
func ignoreGIDIfZero(str string, mod fswalker.ActionData) bool {
if !strings.Contains(str, "gid:") {
return false
}
beforeGID := mod.Before.Stat.Gid
afterGID := mod.After.Stat.Gid
return beforeGID != afterGID && beforeGID == 0
}
func ignoreUIDIfZero(str string, mod fswalker.ActionData) bool {
if !strings.Contains(str, "uid:") {
return false
}
beforeUID := mod.Before.Stat.Uid
afterUID := mod.After.Stat.Uid
return beforeUID != afterUID && beforeUID == 0
}
func validateReport(report *fswalker.Report) error {
if len(report.Modified) > 0 {
return errors.New("files were modified")