refactor(general): move content verify functionality to the content package (#4827)

Move general functionality from the `content verify` CLI
command implementation to helpers in the content package.

The primary motivation is to allow reusing the content
verification functionality during maintenance.

A separate follow-up change extends content verification
with additional stats useful for debugging repository
corruption.

Overview of the changes:
- Relocation of the content verification functionality
  to the content package. The entry point is
  content.WriteManager.VerifyContents (see the usage sketch
  after this list). This is primarily code movement with
  no functional changes.
- Addition of unit tests for the content verification functionality
  by exercising content.WriteManager.VerifyContents.
- Minor functional change: changing the logging level from
  Error to Warn for the "inner loop" error messages. This allows
  filtering out these messages if needed, while still observing the
  error message that is logged for the overall operation.
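
For reference, below is a minimal, hypothetical usage sketch of the
relocated entry point. The package name, helper name, and option
values are illustrative only and are not part of this change:

```go
package example // hypothetical package, for illustration only

import (
	"context"
	"log"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/content"
)

// verifyRepositoryContents is an illustrative caller of the relocated
// verification entry point; the option values are examples, not defaults.
func verifyRepositoryContents(ctx context.Context, rep repo.DirectRepository) error {
	opts := content.VerifyOptions{
		// ContentIDRange is left unset, which defaults to all content IDs.
		ContentReadPercentage:     10,   // additionally read (download) ~10% of contents
		IncludeDeletedContents:    true, // also verify contents marked as deleted
		ContentIterateParallelism: 8,    // parallel content iteration workers
		ProgressCallbackInterval:  1000, // report progress once per 1000 verified contents
		ProgressCallback: func(s content.VerifyProgressStats) {
			log.Printf("verified %v contents, %v errors", s.SuccessCount+s.ErrorCount, s.ErrorCount)
		},
	}

	// VerifyContents returns a non-nil error when any content is not backed
	// by a valid pack blob, or when a sampled content fails to read.
	return rep.ContentReader().VerifyContents(ctx, opts)
}
```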
Author: Julio Lopez
Date: 2025-09-17 21:44:25 -07:00 (committed by GitHub)
Parent: 516029719d
Commit: fb7ecee534

4 changed files with 494 additions and 72 deletions

@@ -2,7 +2,6 @@
import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
@@ -11,7 +10,6 @@
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
)
@@ -38,28 +36,14 @@ func (c *commandContentVerify) setup(svc appServices, parent commandParent) {
}
func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepository) error {
downloadPercent := c.contentVerifyPercent
if c.contentVerifyFull {
downloadPercent = 100.0
}
blobMap, err := blob.ReadBlobMap(ctx, rep.BlobReader())
if err != nil {
return errors.Wrap(err, "unable to read blob map")
}
var (
verifiedCount atomic.Int32
successCount atomic.Int32
errorCount atomic.Int32
totalCount atomic.Int32
totalCount atomic.Int32
wg sync.WaitGroup
)
subctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
// ensure we cancel estimation goroutine and wait for it before returning
defer func() {
cancel()
@@ -74,56 +58,50 @@ func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepositor
c.getTotalContentCount(subctx, rep, &totalCount)
}()
log(ctx).Info("Verifying all contents...")
rep.DisableIndexRefresh()
throttle := new(timetrack.Throttle)
var throttle timetrack.Throttle
est := timetrack.Start()
if err := rep.ContentReader().IterateContents(ctx, content.IterateOptions{
Range: c.contentRange.contentIDRange(),
Parallel: c.contentVerifyParallel,
IncludeDeleted: c.contentVerifyIncludeDeleted,
}, func(ci content.Info) error {
if err := c.contentVerify(ctx, rep.ContentReader(), ci, blobMap, downloadPercent); err != nil {
log(ctx).Errorf("error %v", err)
errorCount.Add(1)
} else {
successCount.Add(1)
}
if c.contentVerifyFull {
c.contentVerifyPercent = 100.0
}
verifiedCount.Add(1)
opts := content.VerifyOptions{
ContentIDRange: c.contentRange.contentIDRange(),
ContentReadPercentage: c.contentVerifyPercent,
IncludeDeletedContents: c.contentVerifyIncludeDeleted,
ContentIterateParallelism: c.contentVerifyParallel,
ProgressCallbackInterval: 1,
if throttle.ShouldOutput(c.progressInterval) {
timings, ok := est.Estimate(float64(verifiedCount.Load()), float64(totalCount.Load()))
ProgressCallback: func(vps content.VerifyProgressStats) {
if !throttle.ShouldOutput(c.progressInterval) {
return
}
verifiedCount := vps.SuccessCount + vps.ErrorCount
timings, ok := est.Estimate(float64(verifiedCount), float64(totalCount.Load()))
if ok {
log(ctx).Infof(" Verified %v of %v contents (%.1f%%), %v errors, remaining %v, ETA %v",
verifiedCount.Load(),
verifiedCount,
totalCount.Load(),
timings.PercentComplete,
errorCount.Load(),
vps.ErrorCount,
timings.Remaining,
formatTimestamp(timings.EstimatedEndTime),
)
} else {
log(ctx).Infof(" Verified %v contents, %v errors, estimating...", verifiedCount.Load(), errorCount.Load())
log(ctx).Infof(" Verified %v contents, %v errors, estimating...", verifiedCount, vps.ErrorCount)
}
}
return nil
}); err != nil {
return errors.Wrap(err, "iterate contents")
},
}
log(ctx).Infof("Finished verifying %v contents, found %v errors.", verifiedCount.Load(), errorCount.Load())
ec := errorCount.Load()
if ec == 0 {
return nil
if err := rep.ContentReader().VerifyContents(ctx, opts); err != nil {
return errors.Wrap(err, "verify contents")
}
return errors.Errorf("encountered %v errors", ec)
return nil
}
func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep repo.DirectRepository, totalCount *atomic.Int32) {
@@ -146,25 +124,3 @@ func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep rep
totalCount.Store(tc)
}
func (c *commandContentVerify) contentVerify(ctx context.Context, r content.Reader, ci content.Info, blobMap map[blob.ID]blob.Metadata, downloadPercent float64) error {
bi, ok := blobMap[ci.PackBlobID]
if !ok {
return errors.Errorf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
}
if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
return errors.Errorf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
}
//nolint:gosec
if 100*rand.Float64() < downloadPercent {
if _, err := r.GetContent(ctx, ci.ContentID); err != nil {
return errors.Wrapf(err, "content %v is invalid", ci.ContentID)
}
return nil
}
return nil
}


@@ -20,4 +20,5 @@ type Reader interface {
IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error
ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error)
EpochManager(ctx context.Context) (*epoch.Manager, bool, error)
VerifyContents(ctx context.Context, o VerifyOptions) error
}

repo/content/verify.go (new file, +156 lines)

@@ -0,0 +1,156 @@
package content
import (
"context"
"math/rand"
"sync/atomic"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
// VerifyOptions specifies the optional arguments for WriteManager.VerifyContents.
type VerifyOptions struct {
ContentIDRange IDRange // defaults to AllIDs when not specified
ContentReadPercentage float64
IncludeDeletedContents bool
ContentIterateParallelism int
ProgressCallback func(VerifyProgressStats)
// Number of contents that need to be processed between calls to ProgressCallback.
// For example, with a ProgressCallbackInterval of 1000, ProgressCallback
// is called once for every 1000 contents that are processed.
ProgressCallbackInterval uint32
}
// VerifyProgressStats contains progress counters that are passed to the
// progress callback used in WriteManager.VerifyContents.
type VerifyProgressStats struct {
ErrorCount uint32
SuccessCount uint32
}
// VerifyContents checks whether contents are backed by valid blobs.
func (bm *WriteManager) VerifyContents(ctx context.Context, o VerifyOptions) error {
var v contentVerifier
return v.verifyContents(ctx, bm, o)
}
var errMissingPacks = errors.New("the repository is corrupted, it is missing pack blobs with index-referenced content")
type contentVerifier struct {
bm *WriteManager
existingPacks map[blob.ID]blob.Metadata
progressCallback func(VerifyProgressStats)
progressCallbackInterval uint32
contentReadProbability float64
// content verification stats
successCount atomic.Uint32
errorCount atomic.Uint32
verifiedCount atomic.Uint32 // used for calling the progress callback at the specified interval.
log logging.Logger
}
func (v *contentVerifier) verifyContents(ctx context.Context, bm *WriteManager, o VerifyOptions) error {
existingPacks, err := blob.ReadBlobMap(ctx, bm.st)
if err != nil {
return errors.Wrap(err, "unable to get blob metadata map")
}
v.log = logging.Module("content/verify")(ctx)
v.bm = bm
v.existingPacks = existingPacks
v.progressCallback = o.ProgressCallback
v.contentReadProbability = max(o.ContentReadPercentage/100, 0) //nolint:mnd
if o.ProgressCallback != nil {
v.progressCallbackInterval = o.ProgressCallbackInterval
}
v.log.Info("Verifying contents...")
itOpts := IterateOptions{
Range: o.ContentIDRange,
Parallel: o.ContentIterateParallelism,
IncludeDeleted: o.IncludeDeletedContents,
}
cb := func(ci Info) error {
v.verify(ctx, ci)
return nil
}
err = bm.IterateContents(ctx, itOpts, cb)
ec := v.errorCount.Load()
contentCount := v.successCount.Load() + ec
v.log.Infof("Finished verifying %v contents, found %v errors.", contentCount, ec)
if err != nil {
return err
}
if ec != 0 {
return errors.Wrapf(errMissingPacks, "encountered %v errors", ec)
}
return nil
}
// verify checks a single content, updates the corresponding counter stats,
// and may invoke the progress callback.
func (v *contentVerifier) verify(ctx context.Context, ci Info) {
v.verifyContentImpl(ctx, ci)
count := v.verifiedCount.Add(1)
if v.progressCallbackInterval > 0 && count%v.progressCallbackInterval == 0 {
s := VerifyProgressStats{
SuccessCount: v.successCount.Load(),
ErrorCount: v.errorCount.Load(),
}
v.progressCallback(s)
}
}
func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) {
bi, found := v.existingPacks[ci.PackBlobID]
if !found {
v.errorCount.Add(1)
v.log.Warnf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
return
}
if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
v.errorCount.Add(1)
v.log.Warnf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
return
}
//nolint:gosec
if v.contentReadProbability > 0 && rand.Float64() < v.contentReadProbability {
if _, err := v.bm.GetContent(ctx, ci.ContentID); err != nil {
v.errorCount.Add(1)
v.log.Warnf("content %v is invalid: %v", ci.ContentID, err)
return
}
}
v.successCount.Add(1)
}

repo/content/verify_test.go (new file, +309 lines)

@@ -0,0 +1,309 @@
package content
import (
"bytes"
"encoding/binary"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
)
func newTestingMapStorage() blob.Storage {
data := blobtesting.DataMap{}
keyTime := map[blob.ID]time.Time{}
return blobtesting.NewMapStorage(data, keyTime, nil)
}
// newTestWriteManager is a helper to create a WriteManager for testing.
func newTestWriteManager(t *testing.T, st blob.Storage) *WriteManager {
t.Helper()
fp := mustCreateFormatProvider(t, &format.ContentFormat{
Hash: "HMAC-SHA256-128",
Encryption: "AES256-GCM-HMAC-SHA256",
HMACSecret: []byte("test-hmac"),
MasterKey: []byte("0123456789abcdef0123456789abcdef"),
MutableParameters: format.MutableParameters{
Version: 2,
EpochParameters: epoch.DefaultParameters(),
IndexVersion: index.Version2,
MaxPackSize: 1024 * 1024, // 1 MB
},
})
bm, err := NewManagerForTesting(testlogging.Context(t), st, fp, nil, nil)
require.NoError(t, err, "cannot create content write manager")
return bm
}
func TestVerifyContents_NoMissingPacks(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create pack by writing contents.
_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
require.NoError(t, err)
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
err = bm.VerifyContents(ctx, VerifyOptions{
ContentIterateParallelism: 1,
})
require.NoError(t, err, "verification should pass as the packs exists")
}
func TestVerifyContentToPackMapping_EnsureCallbackIsCalled(t *testing.T) {
const numberOfContents = 6
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create numberOfContents contents
var buf [4]byte
for i := range numberOfContents {
binary.LittleEndian.PutUint32(buf[:], uint32(i))
_, err := bm.WriteContent(ctx, gather.FromSlice(buf[:]), "", NoCompression)
require.NoError(t, err)
}
require.NoError(t, bm.Flush(ctx))
var callbackCount atomic.Uint32 // use atomic to support higher parallelism
cb := func(st VerifyProgressStats) {
callbackCount.Add(1)
}
// verify that the callback is called twice (every numberOfContents / 2)
err := bm.VerifyContents(ctx, VerifyOptions{
ContentIterateParallelism: 1,
ProgressCallback: cb,
ProgressCallbackInterval: numberOfContents / 2,
})
require.NoError(t, err, "verification should pass as the packs exists")
require.EqualValues(t, 2, callbackCount.Load(), "unexpected callback call count")
// Delete the pack from storage so verification fails
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
require.NoError(t, err)
require.Len(t, blobs, 1)
require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
callbackCount.Store(0)
// verify the callback is called when there are errors as well.
// verify that the callback is called twice (every numberOfContents / 2)
err = bm.VerifyContents(ctx, VerifyOptions{
ContentIterateParallelism: 1,
ProgressCallback: cb,
ProgressCallbackInterval: numberOfContents / 2,
})
require.Error(t, err, "verification should fail as the pack is missing")
require.EqualValues(t, 2, callbackCount.Load(), "unexpected callback call count")
}
func TestVerifyContents_Deleted(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create pack by writing contents.
cid, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello 1")), "", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
// get pack id
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
require.NoError(t, err)
require.Len(t, blobs, 1)
packId := blobs[0].BlobID
// write another content and delete the first content
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello 2")), "", NoCompression)
require.NoError(t, err)
err = bm.DeleteContent(ctx, cid)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
err = bm.VerifyContents(ctx, VerifyOptions{
IncludeDeletedContents: true,
})
require.NoError(t, err, "Verification should succeed")
// Delete the first pack from storage so verification fails
require.NoError(t, st.DeleteBlob(ctx, packId))
err = bm.VerifyContents(ctx, VerifyOptions{
IncludeDeletedContents: false,
})
require.NoError(t, err, "Verification should succeed")
err = bm.VerifyContents(ctx, VerifyOptions{
IncludeDeletedContents: true,
})
require.Error(t, err, "Verification should fail when deleted contents are included and the pack for the deleted content is missing")
}
func TestVerifyContents_TruncatedPack(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create pack by writing contents.
_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
require.NoError(t, err)
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
// Truncate the pack so verification fails
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
require.NoError(t, err)
require.Len(t, blobs, 1)
require.NoError(t, st.PutBlob(ctx, blobs[0].BlobID, gather.Bytes{}, blob.PutOptions{}))
err = bm.VerifyContents(ctx, VerifyOptions{})
require.Error(t, err, "Verification should fail when a 'p' pack blob is truncated")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
require.Error(t, err, "Verification should fail when a 'p' pack blob is truncated and non-prefixed contents are verified")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
require.NoError(t, err, "verification should succeed when a 'p' pack blob is truncated and prefixed contents are verified")
}
func TestVerifyContents_CorruptedPack(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create pack by writing contents.
_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
require.NoError(t, err)
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
// Corrupt the pack so verification fails
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
require.NoError(t, err)
require.Len(t, blobs, 1)
bid := blobs[0].BlobID
meta, err := st.GetMetadata(ctx, bid)
require.NoError(t, err)
require.NotZero(t, meta)
bSize := meta.Length
require.NotZero(t, bSize)
err = st.PutBlob(ctx, bid, gather.FromSlice(bytes.Repeat([]byte{1}, int(bSize))), blob.PutOptions{})
require.NoError(t, err)
err = bm.VerifyContents(ctx, VerifyOptions{ContentReadPercentage: 100})
require.Error(t, err, "Verification should fail when a 'p' pack blob is corrupted")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs, ContentReadPercentage: 100})
require.Error(t, err, "Verification should fail when a 'p' pack blob is corrupted and non-prefixed contents are verified")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs, ContentReadPercentage: 100})
require.NoError(t, err, "verification should succeed when a 'p' pack blob is corrupted and prefixed contents are verified")
}
func TestVerifyContents_MissingPackP(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create pack by writing contents.
_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
require.NoError(t, err)
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
// Delete pack so verification fails
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixRegular)
require.NoError(t, err)
require.Len(t, blobs, 1)
require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
err = bm.VerifyContents(ctx, VerifyOptions{})
require.Error(t, err, "Verification should fail when a 'p' pack blob is missing")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
require.Error(t, err, "Verification should fail when a 'p' pack blob is missing and non-prefixed contents are verified")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
require.NoError(t, err, "verification should succeed when a 'p' pack blob is missing and prefixed contents are verified")
}
func TestVerifyContentToPackMapping_MissingPackQ(t *testing.T) {
st := newTestingMapStorage()
bm := newTestWriteManager(t, st)
ctx := testlogging.Context(t)
// Create a 'p' pack by writing a non-prefixed content
_, err := bm.WriteContent(ctx, gather.FromSlice([]byte("hello")), "", NoCompression)
require.NoError(t, err)
// Create a 'q' pack by writing a prefixed content
_, err = bm.WriteContent(ctx, gather.FromSlice([]byte("hello prefixed")), "k", NoCompression)
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
// Delete the pack with 'q' prefix so verification fails
blobs, err := blob.ListAllBlobs(ctx, st, PackBlobIDPrefixSpecial)
require.NoError(t, err)
require.Len(t, blobs, 1)
require.NoError(t, st.DeleteBlob(ctx, blobs[0].BlobID))
err = bm.VerifyContents(ctx, VerifyOptions{})
require.Error(t, err, "verification should fail when a 'q' pack blob is missing")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllPrefixedIDs})
require.Error(t, err, "verification should fail when a 'q' pack blob is missing and prefixed contents are verified")
require.ErrorIs(t, err, errMissingPacks)
err = bm.VerifyContents(ctx, VerifyOptions{ContentIDRange: index.AllNonPrefixedIDs})
require.NoError(t, err, "verification should succeed when a 'q' pack blob is missing and non-prefixed contents are verified")
}