feat(cli): extend stats for content verification (#4830)

Track and report errors separately according to the type of error:
- missing pack
- truncated pack
- unreadable content.

Add a counter stat for the contents that are
read and fully verified (via `GetContent`).

Count errors grouped by pack ID using a `CountersMap`.
This allows determining the number of referenced contents
that were missing in a particular pack.

Report the counter stats via structured logging.

---

Sample output:

$ kopia content verify --progress-interval=0.5s --download-percent=100
Listing blobs...
Listed 102 blobs.
Verifying contents...
  Verified 1 contents, 0 errors, estimating...
  Verified 279 contents, 0 errors, estimating...
  Verified 512 of 624 contents (82.1%), 0 errors, remaining 0s, ETA 2025-09-17 23:03:38 PDT
Finished verifying contents
verifyCounters:	{"verifiedContents":624,"totalErrorCount":0,"contentsInMissingPacks":0,\
"contentsInTruncatedPacks":0,"unreadableContents":0,"readContents":624,\
"missingPacks":0,"truncatedPacks":0,"corruptedPacks":0}
This commit is contained in:
Julio Lopez
2025-09-17 23:36:09 -07:00
committed by GitHub
parent b22e97b38c
commit 490fa4b5b3
3 changed files with 292 additions and 11 deletions

View File

@@ -0,0 +1,82 @@
package stats
import (
"sync"
"sync/atomic"
)
// CountersMap is a concurrency-safe map from keys of type K to uint32 counters.
// It allows increments and retrievals of counts from concurrent go routines
// without additional concurrency coordination.
// Added counters cannot be removed.
type CountersMap[K comparable] struct {
// Stores map[K]*atomic.Uint32
// The counter is stored as pointer so it can be updated with atomic operations.
data sync.Map
length atomic.Uint32
}
// Increment increases the counter for the specified key by 1.
// Returns true if the key already existed, false if it was newly created.
func (m *CountersMap[K]) Increment(key K) bool {
return m.add(key, 1)
}
// add increases the counter for the specified key by the value of v.
// Returns true if the key already existed, false if it was newly created.
// Note: if this function is directly exported at some point, then overflow
// checks should be performed.
func (m *CountersMap[K]) add(key K, v uint32) bool {
// Attempt looking for an already existing entry first to avoid spurious
// (value) allocations in workloads where the entry likely exists already.
actual, found := m.data.Load(key)
if !found {
actual, found = m.data.LoadOrStore(key, &atomic.Uint32{})
if !found {
m.length.Add(1)
}
}
actual.(*atomic.Uint32).Add(v) //nolint:forcetypeassert
return found
}
// Length returns the approximate number of keys in the map. The actual number
// of keys can be equal or larger to the returned value, but not less.
func (m *CountersMap[K]) Length() uint {
return uint(m.length.Load())
}
// Get returns the current value of the counter for key, or 0 when key does not exist.
func (m *CountersMap[K]) Get(key K) (uint32, bool) {
actual, ok := m.data.Load(key)
if !ok {
return 0, false // Key not found, return 0
}
return actual.(*atomic.Uint32).Load(), true //nolint:forcetypeassert
}
// Range iterates over all key/count pairs in the map, calling f for each item.
// If f returns false, iteration stops.
func (m *CountersMap[K]) Range(f func(key K, count uint32) bool) {
m.data.Range(func(k any, v any) bool {
return f(k.(K), v.(*atomic.Uint32).Load()) //nolint:forcetypeassert
})
}
// CountMap returns the current value of the counters. The counters do not
// correspond to a consistent snapshot of the map, the counters may change
// while the returned map is built.
func (m *CountersMap[K]) CountMap() map[K]uint32 {
r := map[K]uint32{}
m.Range(func(key K, count uint32) bool {
r[key] = count
return true
})
return r
}

View File

@@ -0,0 +1,155 @@
package stats
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
)
func TestConcurrentCountMap_Get_MissingKey(t *testing.T) {
var m CountersMap[string]
v, found := m.Get("missing")
require.False(t, found)
require.EqualValues(t, 0, v, "expected 0 for missing key")
}
func TestConcurrentCountMap_IncrementAndGet_NewAndExistingKey(t *testing.T) {
var m CountersMap[string]
found := m.Increment("foo")
require.False(t, found, "Increment on new key should return false")
got, found := m.Get("foo")
require.True(t, found)
require.EqualValues(t, 1, got, "expected 1 after first increment")
found = m.Increment("foo")
require.True(t, found, "Increment on existing key should return true")
got, found = m.Get("foo")
require.True(t, found)
require.EqualValues(t, 2, got, "expected 2 after second increment")
}
func TestConcurrentCountMap_Add(t *testing.T) {
var m CountersMap[int]
found := m.add(42, 5)
require.False(t, found, "Add on new key should return false to indicate 'key not previously found'")
v, found := m.Get(42)
require.EqualValues(t, 5, v, "expected 5 after add")
require.True(t, found)
found = m.add(42, 3)
require.True(t, found, "Add on existing key should return true")
v, found = m.Get(42)
require.EqualValues(t, 8, v, "expected 8 after second add")
require.True(t, found)
}
func TestConcurrentCountMap_Range(t *testing.T) {
var m CountersMap[string]
expectedMap := map[string]uint32{
"a": 1,
"b": 2,
"c": 3,
}
for k, v := range expectedMap {
require.False(t, m.add(k, v))
}
m.Range(func(key string, gotCount uint32) bool {
expectedCount, found := expectedMap[key]
require.True(t, found)
require.Equal(t, expectedCount, gotCount)
return true
})
}
func TestConcurrentCountMap_Length(t *testing.T) {
var m CountersMap[string]
require.False(t, m.add("a", 1))
require.False(t, m.add("b", 2))
require.EqualValues(t, 2, m.Length())
require.True(t, m.add("b", 2))
require.EqualValues(t, 2, m.Length())
require.False(t, m.add("c", 3))
require.EqualValues(t, 3, m.Length())
}
func TestConcurrentCountMap_Range_StopEarly(t *testing.T) {
var m CountersMap[string]
require.False(t, m.add("a", 1))
require.False(t, m.add("b", 2))
require.False(t, m.add("c", 3))
count := 0
m.Range(func(key string, v uint32) bool {
count++
return false // stop after first
})
require.Equal(t, 1, count, "Range should stop after one iteration")
}
func TestConcurrentCountMap_CountMap_Snapshot(t *testing.T) {
var m CountersMap[string]
require.False(t, m.add("x", 10))
require.False(t, m.add("y", 20))
expected := map[string]uint32{
"x": 10,
"y": 20,
}
require.Equal(t, expected, m.CountMap())
}
func TestConcurrentCountMap_ConcurrentIncrement(t *testing.T) {
const (
key = 7
concurrency = 8
inc = 1000
)
var m CountersMap[int]
var wg sync.WaitGroup
wg.Add(concurrency)
for range concurrency {
go func() {
for range inc {
m.Increment(key)
}
wg.Done()
}()
}
wg.Wait()
got, found := m.Get(key)
require.True(t, found)
require.EqualValues(t, concurrency*inc, got)
}

View File

@@ -7,6 +7,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/stats"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
@@ -53,8 +54,18 @@ type contentVerifier struct {
contentReadProbability float64
// content verification stats
successCount atomic.Uint32
errorCount atomic.Uint32
successCount atomic.Uint32
readContentCount atomic.Uint32
missingPackContentCount atomic.Uint32
truncatedPackContentCount atomic.Uint32
errorContentCount atomic.Uint32
// Per pack counts for content errors. Notice that a truncated pack, can
// also appear in the corruptedPacks map. A missing pack can only be in
// missingPacks, but not in the other sets.
missingPacks stats.CountersMap[blob.ID]
truncatedPacks stats.CountersMap[blob.ID]
corruptedPacks stats.CountersMap[blob.ID]
verifiedCount atomic.Uint32 // used for calling the progress callback at the specified interval.
@@ -93,17 +104,36 @@ func (v *contentVerifier) verifyContents(ctx context.Context, bm *WriteManager,
err = bm.IterateContents(ctx, itOpts, cb)
ec := v.errorCount.Load()
contentCount := v.successCount.Load() + ec
contentInMissingPackCount := v.missingPackContentCount.Load()
contentInTruncatedPackCount := v.truncatedPackContentCount.Load()
contentErrorCount := v.errorContentCount.Load()
totalErrorCount := contentInMissingPackCount + contentInTruncatedPackCount + contentErrorCount
v.log.Infof("Finished verifying %v contents, found %v errors.", contentCount, ec)
contentCount := v.verifiedCount.Load()
v.log.Info("Finished verifying contents")
v.log.Infow("verifyCounters:",
"verifiedContents", contentCount,
"totalErrorCount", totalErrorCount,
"contentsInMissingPacks", contentInMissingPackCount,
"contentsInTruncatedPacks", contentInTruncatedPackCount,
"unreadableContents", contentErrorCount,
"readContents", v.readContentCount.Load(),
"missingPacks", v.missingPacks.Length(),
"truncatedPacks", v.truncatedPacks.Length(),
"corruptedPacks", v.corruptedPacks.Length(),
)
logCountMap(v.log, "missingPack", &v.missingPacks)
logCountMap(v.log, "truncatedPack", &v.truncatedPacks)
logCountMap(v.log, "corruptedPack", &v.corruptedPacks)
if err != nil {
return err
}
if ec != 0 {
return errors.Wrapf(errMissingPacks, "encountered %v errors", ec)
if totalErrorCount != 0 {
return errors.Wrapf(errMissingPacks, "encountered %v errors", totalErrorCount)
}
return nil
@@ -119,7 +149,7 @@ func (v *contentVerifier) verify(ctx context.Context, ci Info) {
if v.progressCallbackInterval > 0 && count%v.progressCallbackInterval == 0 {
s := VerifyProgressStats{
SuccessCount: v.successCount.Load(),
ErrorCount: v.errorCount.Load(),
ErrorCount: v.missingPackContentCount.Load() + v.truncatedPackContentCount.Load() + v.errorContentCount.Load(),
}
v.progressCallback(s)
@@ -129,14 +159,16 @@ func (v *contentVerifier) verify(ctx context.Context, ci Info) {
func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) {
bi, found := v.existingPacks[ci.PackBlobID]
if !found {
v.errorCount.Add(1)
v.missingPackContentCount.Add(1)
v.missingPacks.Increment(ci.PackBlobID)
v.log.Warnf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
return
}
if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
v.errorCount.Add(1)
v.truncatedPackContentCount.Add(1)
v.truncatedPacks.Increment(ci.PackBlobID)
v.log.Warnf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
return
@@ -144,8 +176,11 @@ func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) {
//nolint:gosec
if v.contentReadProbability > 0 && rand.Float64() < v.contentReadProbability {
v.readContentCount.Add(1)
if _, err := v.bm.GetContent(ctx, ci.ContentID); err != nil {
v.errorCount.Add(1)
v.errorContentCount.Add(1)
v.corruptedPacks.Increment(ci.PackBlobID)
v.log.Warnf("content %v is invalid: %v", ci.ContentID, err)
return
@@ -154,3 +189,12 @@ func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) {
v.successCount.Add(1)
}
// nothing is logged for an empty map.
func logCountMap(log logging.Logger, mapName string, m *stats.CountersMap[blob.ID]) {
m.Range(func(packID blob.ID, contentCount uint32) bool {
log.Warnw(mapName, "packID", packID, "numberOfReferencedContents", contentCount)
return true
})
}