diff --git a/cli/command_content_verify.go b/cli/command_content_verify.go
index 0048e7dd6..95375ad15 100644
--- a/cli/command_content_verify.go
+++ b/cli/command_content_verify.go
@@ -2,7 +2,6 @@
 
 import (
 	"context"
-	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -11,7 +10,6 @@
 
 	"github.com/kopia/kopia/internal/timetrack"
 	"github.com/kopia/kopia/repo"
-	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/content"
 )
 
@@ -38,28 +36,14 @@ func (c *commandContentVerify) setup(svc appServices, parent commandParent) {
 }
 
 func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepository) error {
-	downloadPercent := c.contentVerifyPercent
-
-	if c.contentVerifyFull {
-		downloadPercent = 100.0
-	}
-
-	blobMap, err := blob.ReadBlobMap(ctx, rep.BlobReader())
-	if err != nil {
-		return errors.Wrap(err, "unable to read blob map")
-	}
-
 	var (
-		verifiedCount atomic.Int32
-		successCount  atomic.Int32
-		errorCount    atomic.Int32
-		totalCount    atomic.Int32
+		totalCount atomic.Int32
+
+		wg sync.WaitGroup
 	)
 
 	subctx, cancel := context.WithCancel(ctx)
 
-	var wg sync.WaitGroup
-
 	// ensure we cancel estimation goroutine and wait for it before returning
 	defer func() {
 		cancel()
@@ -74,56 +58,50 @@ func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepositor
 		c.getTotalContentCount(subctx, rep, &totalCount)
 	}()
 
-	log(ctx).Info("Verifying all contents...")
-
 	rep.DisableIndexRefresh()
 
-	throttle := new(timetrack.Throttle)
+	var throttle timetrack.Throttle
+
 	est := timetrack.Start()
 
-	if err := rep.ContentReader().IterateContents(ctx, content.IterateOptions{
-		Range:          c.contentRange.contentIDRange(),
-		Parallel:       c.contentVerifyParallel,
-		IncludeDeleted: c.contentVerifyIncludeDeleted,
-	}, func(ci content.Info) error {
-		if err := c.contentVerify(ctx, rep.ContentReader(), ci, blobMap, downloadPercent); err != nil {
-			log(ctx).Errorf("error %v", err)
-			errorCount.Add(1)
-		} else {
-			successCount.Add(1)
-		}
+	if c.contentVerifyFull {
+		c.contentVerifyPercent = 100.0
+	}
 
-		verifiedCount.Add(1)
+	opts := content.VerifyOptions{
+		ContentIDRange:            c.contentRange.contentIDRange(),
+		ContentReadPercentage:     c.contentVerifyPercent,
+		IncludeDeletedContents:    c.contentVerifyIncludeDeleted,
+		ContentIterateParallelism: c.contentVerifyParallel,
+		ProgressCallbackInterval:  1,
 
-		if throttle.ShouldOutput(c.progressInterval) {
-			timings, ok := est.Estimate(float64(verifiedCount.Load()), float64(totalCount.Load()))
+		ProgressCallback: func(vps content.VerifyProgressStats) {
+			if !throttle.ShouldOutput(c.progressInterval) {
+				return
+			}
+
+			verifiedCount := vps.SuccessCount + vps.ErrorCount
+			timings, ok := est.Estimate(float64(verifiedCount), float64(totalCount.Load()))
 			if ok {
 				log(ctx).Infof("  Verified %v of %v contents (%.1f%%), %v errors, remaining %v, ETA %v",
-					verifiedCount.Load(),
+					verifiedCount,
 					totalCount.Load(),
 					timings.PercentComplete,
-					errorCount.Load(),
+					vps.ErrorCount,
 					timings.Remaining,
 					formatTimestamp(timings.EstimatedEndTime),
 				)
 			} else {
-				log(ctx).Infof("  Verified %v contents, %v errors, estimating...", verifiedCount.Load(), errorCount.Load())
+				log(ctx).Infof("  Verified %v contents, %v errors, estimating...", verifiedCount, vps.ErrorCount)
 			}
-		}
-
-		return nil
-	}); err != nil {
-		return errors.Wrap(err, "iterate contents")
+		},
 	}
 
-	log(ctx).Infof("Finished verifying %v contents, found %v errors.", verifiedCount.Load(), errorCount.Load())
-
-	ec := errorCount.Load()
-	if ec == 0 {
-		return nil
+	if err := rep.ContentReader().VerifyContents(ctx, opts); err != nil {
+		return errors.Wrap(err, "verify contents")
 	}
 
-	return errors.Errorf("encountered %v errors", ec)
+	return nil
 }
 
 func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep repo.DirectRepository, totalCount *atomic.Int32) {
@@ -146,25 +124,3 @@ func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep rep
 
 	totalCount.Store(tc)
 }
-
-func (c *commandContentVerify) contentVerify(ctx context.Context, r content.Reader, ci content.Info, blobMap map[blob.ID]blob.Metadata, downloadPercent float64) error {
-	bi, ok := blobMap[ci.PackBlobID]
-	if !ok {
-		return errors.Errorf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
-	}
-
-	if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
-		return errors.Errorf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
-	}
-
-	//nolint:gosec
-	if 100*rand.Float64() < downloadPercent {
-		if _, err := r.GetContent(ctx, ci.ContentID); err != nil {
-			return errors.Wrapf(err, "content %v is invalid", ci.ContentID)
-		}
-
-		return nil
-	}
-
-	return nil
-}
diff --git a/repo/content/content_reader.go b/repo/content/content_reader.go
index 91af67b71..4fc736709 100644
--- a/repo/content/content_reader.go
+++ b/repo/content/content_reader.go
@@ -20,4 +20,5 @@ type Reader interface {
 	IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error
 	ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error)
 	EpochManager(ctx context.Context) (*epoch.Manager, bool, error)
+	VerifyContents(ctx context.Context, o VerifyOptions) error
 }
diff --git a/repo/content/verify.go b/repo/content/verify.go
new file mode 100644
index 000000000..0ee774cf9
--- /dev/null
+++ b/repo/content/verify.go
@@ -0,0 +1,156 @@
+package content
+
+import (
+	"context"
+	"math/rand"
+	"sync/atomic"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/logging"
+)
+
+// VerifyOptions allows specifying the optional arguments for WriteManager.VerifyContents.
+type VerifyOptions struct {
+	ContentIDRange         IDRange // defaults to AllIDs when not specified
+	ContentReadPercentage  float64
+	IncludeDeletedContents bool
+
+	ContentIterateParallelism int
+
+	ProgressCallback func(VerifyProgressStats)
+	// Number of contents that need to be processed between calls to ProgressCallback.
+	// For example, with a ProgressCallbackInterval of 1000, ProgressCallback
+	// is called once for every 1000 contents that are processed.
+	ProgressCallbackInterval uint32
+}
+
+// VerifyProgressStats contains progress counters that are passed to the
+// progress callback used in WriteManager.VerifyContents.
+type VerifyProgressStats struct {
+	ErrorCount   uint32
+	SuccessCount uint32
+}
+
+// VerifyContents checks whether contents are backed by valid blobs.
+func (bm *WriteManager) VerifyContents(ctx context.Context, o VerifyOptions) error {
+	var v contentVerifier
+
+	return v.verifyContents(ctx, bm, o)
+}
+
+var errMissingPacks = errors.New("the repository is corrupted, it is missing pack blobs with index-referenced content")
+
+type contentVerifier struct {
+	bm *WriteManager
+
+	existingPacks map[blob.ID]blob.Metadata
+
+	progressCallback         func(VerifyProgressStats)
+	progressCallbackInterval uint32
+
+	contentReadProbability float64
+
+	// content verification stats
+	successCount atomic.Uint32
+	errorCount   atomic.Uint32
+
+	verifiedCount atomic.Uint32 // used for calling the progress callback at the specified interval.
+
+	log logging.Logger
+}
+
+func (v *contentVerifier) verifyContents(ctx context.Context, bm *WriteManager, o VerifyOptions) error {
+	existingPacks, err := blob.ReadBlobMap(ctx, bm.st)
+	if err != nil {
+		return errors.Wrap(err, "unable to get blob metadata map")
+	}
+
+	v.log = logging.Module("content/verify")(ctx)
+	v.bm = bm
+	v.existingPacks = existingPacks
+	v.progressCallback = o.ProgressCallback
+	v.contentReadProbability = max(o.ContentReadPercentage/100, 0) //nolint:mnd
+
+	if o.ProgressCallback != nil {
+		v.progressCallbackInterval = o.ProgressCallbackInterval
+	}
+
+	v.log.Info("Verifying contents...")
+
+	itOpts := IterateOptions{
+		Range:          o.ContentIDRange,
+		Parallel:       o.ContentIterateParallelism,
+		IncludeDeleted: o.IncludeDeletedContents,
+	}
+
+	cb := func(ci Info) error {
+		v.verify(ctx, ci)
+
+		return nil
+	}
+
+	err = bm.IterateContents(ctx, itOpts, cb)
+
+	ec := v.errorCount.Load()
+	contentCount := v.successCount.Load() + ec
+
+	v.log.Infof("Finished verifying %v contents, found %v errors.", contentCount, ec)
+
+	if err != nil {
+		return err
+	}
+
+	if ec != 0 {
+		return errors.Wrapf(errMissingPacks, "encountered %v errors", ec)
+	}
+
+	return nil
+}
+
+// verify checks a single content, updates the corresponding counter stats, and
+// may call the progress callback at the configured interval.
+func (v *contentVerifier) verify(ctx context.Context, ci Info) {
+	v.verifyContentImpl(ctx, ci)
+
+	count := v.verifiedCount.Add(1)
+
+	if v.progressCallbackInterval > 0 && count%v.progressCallbackInterval == 0 {
+		s := VerifyProgressStats{
+			SuccessCount: v.successCount.Load(),
+			ErrorCount:   v.errorCount.Load(),
+		}
+
+		v.progressCallback(s)
+	}
+}
+
+func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) {
+	bi, found := v.existingPacks[ci.PackBlobID]
+	if !found {
+		v.errorCount.Add(1)
+		v.log.Warnf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
+
+		return
+	}
+
+	if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
+		v.errorCount.Add(1)
+		v.log.Warnf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
+
+		return
+	}
+
+	//nolint:gosec
+	if v.contentReadProbability > 0 && rand.Float64() < v.contentReadProbability {
+		if _, err := v.bm.GetContent(ctx, ci.ContentID); err != nil {
+			v.errorCount.Add(1)
+			v.log.Warnf("content %v is invalid: %v", ci.ContentID, err)
+
+			return
+		}
+	}
+
+	v.successCount.Add(1)
+}
diff --git a/repo/content/verify_test.go b/repo/content/verify_test.go
new file mode 100644
index 000000000..e3e278674
--- /dev/null
+++ b/repo/content/verify_test.go
@@ -0,0 +1,309 @@
+package content
+
+import (
+	"bytes"
+	"encoding/binary"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/blobtesting"
+	"github.com/kopia/kopia/internal/epoch"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/internal/testlogging"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/format"
+)
+
+func newTestingMapStorage() blob.Storage {
+	data := blobtesting.DataMap{}
+	keyTime := map[blob.ID]time.Time{}
+
+	return blobtesting.NewMapStorage(data, keyTime, nil)
+}
+
+// newTestWriteManager is a helper to create a WriteManager for testing.
+func newTestWriteManager(t *testing.T, st blob.Storage) *WriteManager {
+	t.Helper()
+
+	fp := mustCreateFormatProvider(t, &format.ContentFormat{
+		Hash:       "HMAC-SHA256-128",
+		Encryption: "AES256-GCM-HMAC-SHA256",
+		HMACSecret: []byte("test-hmac"),
+		MasterKey:  []byte("0123456789abcdef0123456789abcdef"),
+		MutableParameters: format.MutableParameters{
+			Version:         2,
+			EpochParameters: epoch.DefaultParameters(),
+			IndexVersion:    index.Version2,
+			MaxPackSize:     1024 * 1024, // 1 MB
+		},
+	})
+
+	bm, err := NewManagerForTesting(testlogging.Context(t), st, fp, nil, nil)
+
+	require.NoError(t, err, "cannot create content write manager")
+
+	return bm
+}
+
+func TestVerifyContents_NoMissingPacks(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create pack by writing contents.
+	_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
+	require.NoError(t, err)
+
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{
+		ContentIterateParallelism: 1,
+	})
+
+	require.NoError(t, err, "verification should pass as the packs exist")
+}
+
+func TestVerifyContentToPackMapping_EnsureCallbackIsCalled(t *testing.T) {
+	const numberOfContents = 6
+
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create numberOfContents contents
+	var buf [4]byte
+
+	for i := range numberOfContents {
+		binary.LittleEndian.PutUint32(buf[:], uint32(i))
+		_, err := bm.WriteContent(ctx, gather.FromSlice(buf[:]), "", NoCompression)
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, bm.Flush(ctx))
+
+	var callbackCount atomic.Uint32 // use atomic to support higher parallelism
+
+	cb := func(st VerifyProgressStats) {
+		callbackCount.Add(1)
+	}
+
+	// verify that the callback is called twice (every numberOfContents / 2)
+	err := bm.VerifyContents(ctx, VerifyOptions{
+		ContentIterateParallelism: 1,
+		ProgressCallback:          cb,
+		ProgressCallbackInterval:  numberOfContents / 2,
+	})
+
+	require.NoError(t, err, "verification should pass as the pack exists")
+	require.EqualValues(t, 2, callbackCount.Load(), "unexpected callback call count")
+
+	// Delete the pack from storage so verification fails
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
+
+	callbackCount.Store(0)
+
+	// verify the callback is called when there are errors as well.
+	// verify that the callback is called twice (every numberOfContents / 2)
+	err = bm.VerifyContents(ctx, VerifyOptions{
+		ContentIterateParallelism: 1,
+		ProgressCallback:          cb,
+		ProgressCallbackInterval:  numberOfContents / 2,
+	})
+
+	require.Error(t, err, "verification should fail as the pack is missing")
+	require.EqualValues(t, 2, callbackCount.Load(), "unexpected callback call count")
+}
+
+func TestVerifyContents_Deleted(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create pack by writing contents.
+	cid, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello 1")), "", NoCompression)
+
+	require.NoError(t, err)
+	require.NoError(t, bm.Flush(ctx))
+
+	// get pack id
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	packID := blobs[0].BlobID
+
+	// write another content and delete the first content
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello 2")), "", NoCompression)
+	require.NoError(t, err)
+
+	err = bm.DeleteContent(ctx, cid)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{
+		IncludeDeletedContents: true,
+	})
+	require.NoError(t, err, "Verification should succeed")
+
+	// Delete the first pack from storage so verification fails
+	require.NoError(t, st.DeleteBlob(ctx, packID))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{
+		IncludeDeletedContents: false,
+	})
+	require.NoError(t, err, "Verification should succeed")
+
+	err = bm.VerifyContents(ctx, VerifyOptions{
+		IncludeDeletedContents: true,
+	})
+	require.Error(t, err, "Verification should fail when deleted contents are included and the pack for the deleted content is missing")
+}
+
+func TestVerifyContents_TruncatedPack(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create pack by writing contents.
+	_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
+	require.NoError(t, err)
+
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	// Truncate the pack so verification fails
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	require.NoError(t, st.PutBlob(ctx, blobs[0].BlobID, gather.Bytes{}, blob.PutOptions{}))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is truncated")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is truncated and non-prefixed contents are verified")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
+	require.NoError(t, err, "verification should succeed when a 'p' pack blob is truncated and prefixed contents are verified")
+}
+
+func TestVerifyContents_CorruptedPack(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create pack by writing contents.
+	_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
+	require.NoError(t, err)
+
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	// Corrupt the pack so verification fails
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	bid := blobs[0].BlobID
+
+	meta, err := st.GetMetadata(ctx, bid)
+	require.NoError(t, err)
+	require.NotZero(t, meta)
+
+	bSize := meta.Length
+	require.NotZero(t, bSize)
+
+	err = st.PutBlob(ctx, bid, gather.FromSlice(bytes.Repeat([]byte{1}, int(bSize))), blob.PutOptions{})
+	require.NoError(t, err)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentReadPercentage: 100})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is corrupted")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs, ContentReadPercentage: 100})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is corrupted and non-prefixed contents are verified")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs, ContentReadPercentage: 100})
+	require.NoError(t, err, "verification should succeed when a 'p' pack blob is corrupted and prefixed contents are verified")
+}
+
+func TestVerifyContents_MissingPackP(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create pack by writing contents.
+	_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
+	require.NoError(t, err)
+
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	// Delete pack so verification fails
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is missing")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
+	require.Error(t, err, "Verification should fail when a 'p' pack blob is missing and non-prefixed contents are verified")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
+	require.NoError(t, err, "verification should succeed when a 'p' pack blob is missing and prefixed contents are verified")
+}
+
+func TestVerifyContentToPackMapping_MissingPackQ(t *testing.T) {
+	st := newTestingMapStorage()
+	bm := newTestWriteManager(t, st)
+	ctx := testlogging.Context(t)
+
+	// Create a 'p' pack by writing a non-prefixed content
+	_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
+	require.NoError(t, err)
+
+	// Create a 'q' pack by writing a prefixed content
+	_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
+	require.NoError(t, err)
+
+	require.NoError(t, bm.Flush(ctx))
+
+	// Delete the pack with 'q' prefix so verification fails
+	blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixSpecial)
+	require.NoError(t, err)
+	require.Len(t, blobs, 1)
+	require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
+
+	err = bm.VerifyContents(ctx, VerifyOptions{})
+	require.Error(t, err, "verification should fail when a 'q' pack blob is missing")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
+	require.Error(t, err, "verification should fail when a 'q' pack blob is missing and prefixed contents are verified")
+	require.ErrorIs(t, err, errMissingPacks)
+
+	err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
+	require.NoError(t, err, "verification should succeed when a 'q' pack blob is missing and non-prefixed contents are verified")
+}
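
Usage sketch (not part of the patch above): one way a caller that already holds an open repo.DirectRepository might drive the new VerifyContents API with a progress callback. The package name, the verifyRepositoryContents helper, and the option values are illustrative assumptions; only the VerifyOptions fields and the rep.ContentReader().VerifyContents call come from the change itself.

package verifyexample

import (
	"context"
	"log"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/content"
)

// verifyRepositoryContents is a hypothetical helper; "rep" is assumed to be an
// already-open direct repository.
func verifyRepositoryContents(ctx context.Context, rep repo.DirectRepository) error {
	opts := content.VerifyOptions{
		ContentReadPercentage:     1, // additionally read back roughly 1% of contents
		IncludeDeletedContents:    true,
		ContentIterateParallelism: 8,
		ProgressCallbackInterval:  1000, // report once per 1000 verified contents
		ProgressCallback: func(s content.VerifyProgressStats) {
			log.Printf("verified %d contents, %d errors", s.SuccessCount+s.ErrorCount, s.ErrorCount)
		},
	}

	// Returns a non-nil error when any index-referenced content points at a
	// missing or too-short pack blob, or (with read-back enabled) cannot be read back.
	return rep.ContentReader().VerifyContents(ctx, opts)
}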