mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 06:34:46 -04:00
Add JSON output flag for snapshot verify (#4644)
This commit is contained in:
@@ -26,6 +26,9 @@ type commandSnapshotVerify struct {
|
||||
|
||||
fileQueueLength int
|
||||
fileParallelism int
|
||||
|
||||
jo jsonOutput
|
||||
out textOutput
|
||||
}
|
||||
|
||||
func (c *commandSnapshotVerify) setup(svc appServices, parent commandParent) {
|
||||
@@ -42,6 +45,10 @@ func (c *commandSnapshotVerify) setup(svc appServices, parent commandParent) {
|
||||
cmd.Flag("file-queue-length", "Queue length for file verification").Default("20000").IntVar(&c.fileQueueLength)
|
||||
cmd.Flag("file-parallelism", "Parallelism for file verification").IntVar(&c.fileParallelism)
|
||||
cmd.Flag("verify-files-percent", "Randomly verify a percentage of files by downloading them [0.0 .. 100.0]").Default("0").Float64Var(&c.verifyCommandFilesPercent)
|
||||
|
||||
c.jo.setup(svc, cmd)
|
||||
c.out.setup(svc)
|
||||
|
||||
cmd.Action(svc.repositoryReaderAction(c.run))
|
||||
}
|
||||
|
||||
@@ -71,10 +78,26 @@ func (c *commandSnapshotVerify) run(ctx context.Context, rep repo.Repository) er
|
||||
}
|
||||
|
||||
v := snapshotfs.NewVerifier(ctx, rep, opts)
|
||||
defer v.ShowFinalStats(ctx)
|
||||
|
||||
defer func() {
|
||||
// Suppress final stats output if --json flag provided.
|
||||
if !c.jo.jsonOutput {
|
||||
v.ShowFinalStats(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
result, err := v.InParallel(ctx, c.makeVerifyWalkerFunc(ctx, rep))
|
||||
|
||||
if c.jo.jsonOutput {
|
||||
c.out.printStdout("%s\n", c.jo.jsonIndentedBytes(result, " "))
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *commandSnapshotVerify) makeVerifyWalkerFunc(ctx context.Context, rep repo.Repository) func(tw *snapshotfs.TreeWalker) error {
|
||||
return func(tw *snapshotfs.TreeWalker) error {
|
||||
manifests, err := c.loadSourceManifests(ctx, rep)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -127,7 +150,7 @@ func (c *commandSnapshotVerify) run(ctx context.Context, rep repo.Repository) er
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (c *commandSnapshotVerify) loadSourceManifests(ctx context.Context, rep repo.Repository) ([]*snapshot.Manifest, error) {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -11,6 +12,7 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/kopia/kopia/tests/testenv"
|
||||
)
|
||||
|
||||
@@ -76,4 +78,73 @@ func TestSnapshotVerify(t *testing.T) {
|
||||
|
||||
// Requesting a snapshot verify of a non-existent manifest ID results in error.
|
||||
env.RunAndExpectFailure(t, "snapshot", "verify", "not-a-manifest-id")
|
||||
|
||||
// Redo the above snapshot verify commands with the --json flag and check the outputs.
|
||||
|
||||
// Verifying everything is expected to fail.
|
||||
stdout, _ := env.RunAndExpectFailure(t, "snapshot", "verify", "--json")
|
||||
require.Len(t, stdout, 1)
|
||||
result := unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.NotZero(t, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 1)
|
||||
|
||||
// Verifying the untouched snapshot is expected to succeed.
|
||||
stdout = env.RunAndExpectSuccess(t, "snapshot", "verify", "--json", string(intactMan.ID))
|
||||
require.Len(t, stdout, 1)
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.Zero(t, result.ErrorCount)
|
||||
require.Empty(t, result.ErrorStrings)
|
||||
|
||||
// Verifying the corrupted snapshot is expected to fail.
|
||||
stdout, _ = env.RunAndExpectFailure(t, "snapshot", "verify", "--json", string(corruptMan1.ID))
|
||||
require.Len(t, stdout, 1)
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.NotZero(t, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 1)
|
||||
|
||||
// Verifying the corrupted snapshot is expected to fail.
|
||||
stdout, _ = env.RunAndExpectFailure(t, "snapshot", "verify", "--json", string(corruptMan2.ID))
|
||||
require.Len(t, stdout, 1)
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.NotZero(t, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 1)
|
||||
|
||||
// Find one matching error corresponding to the single corrupted contents.
|
||||
stdout, stderr, err = env.Run(t, true, "snapshot", "verify", "--json", "--max-errors", "3", string(corruptMan1.ID))
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, 1, strings.Count(strings.Join(stderr, "\n"), "error processing"))
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.Equal(t, 1, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 1)
|
||||
|
||||
// Find two matching errors in the verify output, corresponding to each
|
||||
// of the two corrupted contents.
|
||||
stdout, stderr, err = env.Run(t, true, "snapshot", "verify", "--json", "--max-errors", "3", string(corruptMan2.ID))
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, 2, strings.Count(strings.Join(stderr, "\n"), "error processing"))
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.NotZero(t, result.ProcessedObjectCount)
|
||||
require.Equal(t, 2, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 2)
|
||||
|
||||
// Requesting a snapshot verify of a non-existent manifest ID results in error.
|
||||
stdout, _ = env.RunAndExpectFailure(t, "snapshot", "verify", "--json", "not-a-manifest-id")
|
||||
result = unmarshalSnapVerify(t, stdout[0])
|
||||
require.Zero(t, result.ProcessedObjectCount)
|
||||
require.Equal(t, 1, result.ErrorCount)
|
||||
require.Len(t, result.ErrorStrings, 1)
|
||||
}
|
||||
|
||||
func unmarshalSnapVerify(t *testing.T, stdout string) *snapshotfs.VerifierResult {
|
||||
t.Helper()
|
||||
|
||||
r := &snapshotfs.VerifierResult{}
|
||||
require.NoError(t, json.Unmarshal([]byte(stdout), r))
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -58,6 +58,15 @@ func (w *TreeWalker) ReportError(ctx context.Context, entryPath string, err erro
|
||||
w.numErrors++
|
||||
}
|
||||
|
||||
// GetErrors returns a copy of the list of errors found during the tree walk, as well
|
||||
// as the total count of errors encountered.
|
||||
func (w *TreeWalker) GetErrors() (errs []error, numErrors int) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
return append([]error{}, w.errors...), w.numErrors
|
||||
}
|
||||
|
||||
// Err returns the error encountered when walking the tree.
|
||||
func (w *TreeWalker) Err() error {
|
||||
w.mu.Lock()
|
||||
|
||||
@@ -131,16 +131,24 @@ type VerifierOptions struct {
|
||||
BlobMap map[blob.ID]blob.Metadata
|
||||
}
|
||||
|
||||
// VerifierResult returns results from the verifier.
|
||||
type VerifierResult struct {
|
||||
ProcessedObjectCount int `json:"processedObjectCount"`
|
||||
ErrorCount int `json:"errorCount"`
|
||||
Errors []error `json:"-"`
|
||||
ErrorStrings []string `json:"errorStrings,omitempty"`
|
||||
}
|
||||
|
||||
// InParallel starts parallel verification and invokes the provided function which can
|
||||
// call Process() on in the provided TreeWalker.
|
||||
func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker) error) error {
|
||||
func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker) error) (*VerifierResult, error) {
|
||||
tw, twerr := NewTreeWalker(ctx, TreeWalkerOptions{
|
||||
Parallelism: v.opts.Parallelism,
|
||||
EntryCallback: v.verifyObject,
|
||||
MaxErrors: v.opts.MaxErrors,
|
||||
})
|
||||
if twerr != nil {
|
||||
return errors.Wrap(twerr, "tree walker")
|
||||
return nil, errors.Wrap(twerr, "tree walker")
|
||||
}
|
||||
defer tw.Close(ctx)
|
||||
|
||||
@@ -170,11 +178,33 @@ func (v *Verifier) InParallel(ctx context.Context, enqueue func(tw *TreeWalker)
|
||||
v.workersWG.Wait()
|
||||
v.fileWorkQueue = nil
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
twErrs, numErrors := tw.GetErrors()
|
||||
|
||||
errStrs := make([]string, 0, len(twErrs))
|
||||
for _, twErr := range twErrs {
|
||||
errStrs = append(errStrs, twErr.Error())
|
||||
}
|
||||
|
||||
return tw.Err()
|
||||
result := &VerifierResult{
|
||||
ProcessedObjectCount: int(v.processed.Load()),
|
||||
ErrorCount: numErrors,
|
||||
Errors: twErrs,
|
||||
ErrorStrings: errStrs,
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// In some circumstances, the enqueue function may return an error itself, for instance
|
||||
// if it failed to resolve the snapshot manifest from the ID.
|
||||
// Append that error to the result output and return.
|
||||
result.Errors = append(result.Errors, err)
|
||||
result.ErrorStrings = append(result.ErrorStrings, err.Error())
|
||||
result.ErrorCount++
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Otherwise return the tree walker error output along with result details.
|
||||
return result, tw.Err()
|
||||
}
|
||||
|
||||
// NewVerifier creates a verifier.
|
||||
|
||||
@@ -61,22 +61,36 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
|
||||
someErr := errors.New("some error")
|
||||
|
||||
require.ErrorIs(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err := v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
return someErr
|
||||
}), someErr)
|
||||
})
|
||||
require.ErrorIs(t, err, someErr)
|
||||
require.Equal(t, 1, result.ErrorCount)
|
||||
require.Len(t, result.Errors, 1)
|
||||
require.ErrorIs(t, result.Errors[0], someErr)
|
||||
|
||||
require.ErrorIs(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
return someErr
|
||||
}), someErr)
|
||||
})
|
||||
require.ErrorIs(t, err, someErr)
|
||||
require.Equal(t, 1, result.ErrorCount)
|
||||
require.Len(t, result.Errors, 1)
|
||||
require.ErrorIs(t, result.Errors[0], someErr)
|
||||
|
||||
require.NoError(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, result.ErrorCount)
|
||||
require.Empty(t, result.Errors)
|
||||
|
||||
require.NoError(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, result.ErrorCount)
|
||||
require.Empty(t, result.Errors)
|
||||
})
|
||||
|
||||
t.Run("FullFileReadsAndBlobMap", func(t *testing.T) {
|
||||
@@ -93,10 +107,13 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
|
||||
v := snapshotfs.NewVerifier(ctx, te2, opts)
|
||||
|
||||
require.NoError(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err := v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, result.ErrorCount)
|
||||
require.Empty(t, result.Errors)
|
||||
|
||||
// now remove all 'p' blobs from the blob map
|
||||
for k := range opts.BlobMap {
|
||||
@@ -105,10 +122,17 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
require.ErrorContains(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}), "encountered 3 errors")
|
||||
})
|
||||
require.ErrorContains(t, err, "encountered 3 errors")
|
||||
require.Equal(t, 3, result.ErrorCount)
|
||||
require.Len(t, result.Errors, 3)
|
||||
|
||||
for _, err := range result.Errors {
|
||||
require.ErrorContains(t, err, "is backed by missing blob")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("MaxErrors", func(t *testing.T) {
|
||||
@@ -125,10 +149,15 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
|
||||
v := snapshotfs.NewVerifier(ctx, te2, opts)
|
||||
|
||||
require.NoError(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
var result *snapshotfs.VerifierResult
|
||||
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, result.ErrorCount)
|
||||
require.Empty(t, result.Errors)
|
||||
|
||||
// now remove all 'p' blobs from the blob map
|
||||
for k := range opts.BlobMap {
|
||||
@@ -138,10 +167,14 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
}
|
||||
|
||||
// we have 3 errors but max==1
|
||||
require.ErrorContains(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}), "is backed by missing blob")
|
||||
})
|
||||
require.ErrorContains(t, err, "is backed by missing blob")
|
||||
require.Equal(t, 1, result.ErrorCount)
|
||||
require.Len(t, result.Errors, 1)
|
||||
require.ErrorContains(t, result.Errors[0], "is backed by missing blob")
|
||||
})
|
||||
|
||||
t.Run("FullFileReadsNoBlobMap", func(t *testing.T) {
|
||||
@@ -151,10 +184,13 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
}
|
||||
v := snapshotfs.NewVerifier(ctx, te2, opts)
|
||||
|
||||
require.NoError(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err := v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, result.ErrorCount)
|
||||
require.Empty(t, result.Errors)
|
||||
|
||||
blobs, err := blob.ListAllBlobs(ctx, te.RepositoryWriter.BlobReader(), "p")
|
||||
require.NoError(t, err)
|
||||
@@ -163,9 +199,17 @@ func TestSnapshotVerifier(t *testing.T) {
|
||||
require.NoError(t, te.RepositoryWriter.BlobStorage().DeleteBlob(ctx, bm.BlobID))
|
||||
}
|
||||
|
||||
require.ErrorContains(t, v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
result, err = v.InParallel(ctx, func(tw *snapshotfs.TreeWalker) error {
|
||||
tw.Process(ctx, snapshotfs.DirectoryEntry(te.Repository, obj1, nil), ".")
|
||||
return nil
|
||||
}), "encountered 3 errors")
|
||||
})
|
||||
|
||||
require.ErrorContains(t, err, "encountered 3 errors")
|
||||
require.Equal(t, 3, result.ErrorCount)
|
||||
require.Len(t, result.Errors, 3)
|
||||
|
||||
for _, err := range result.Errors {
|
||||
require.ErrorContains(t, err, "BLOB not found")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user