From 490fa4b5b34fcfe4f7018ab9a0994f811eda9889 Mon Sep 17 00:00:00 2001 From: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> Date: Wed, 17 Sep 2025 23:36:09 -0700 Subject: [PATCH] feat(cli): extend stats for content verification (#4830) Track and report errors separately according to the type of error: - missing pack - truncated pack - unreadable content. Add a counter stat for the contents that are read and fully verified (via `GetContent`). Count errors grouped by pack ID using a `CountersMap`. This allows determining the number of referenced contents that were missing in a particular pack. Report the counter stats via structured logging. --- Sample output: $ kopia content verify --progress-interval=0.5s --download-percent=100 Listing blobs... Listed 102 blobs. Verifying contents... Verified 1 contents, 0 errors, estimating... Verified 279 contents, 0 errors, estimating... Verified 512 of 624 contents (82.1%), 0 errors, remaining 0s, ETA 2025-09-17 23:03:38 PDT Finished verifying contents verifyCounters: {"verifiedContents":624,"totalErrorCount":0,"contentsInMissingPacks":0,\ "contentsInTruncatedPacks":0,"unreadableContents":0,"readContents":624,\ "missingPacks":0,"truncatedPacks":0,"corruptedPacks":0} --- internal/stats/count_map.go | 82 ++++++++++++++++ internal/stats/count_map_test.go | 155 +++++++++++++++++++++++++++++++ repo/content/verify.go | 66 ++++++++++--- 3 files changed, 292 insertions(+), 11 deletions(-) create mode 100644 internal/stats/count_map.go create mode 100644 internal/stats/count_map_test.go diff --git a/internal/stats/count_map.go b/internal/stats/count_map.go new file mode 100644 index 000000000..a60244adf --- /dev/null +++ b/internal/stats/count_map.go @@ -0,0 +1,82 @@ +package stats + +import ( + "sync" + "sync/atomic" +) + +// CountersMap is a concurrency-safe map from keys of type K to uint32 counters. +// It allows increments and retrievals of counts from concurrent go routines +// without additional concurrency coordination. +// Added counters cannot be removed. +type CountersMap[K comparable] struct { + // Stores map[K]*atomic.Uint32 + // The counter is stored as pointer so it can be updated with atomic operations. + data sync.Map + length atomic.Uint32 +} + +// Increment increases the counter for the specified key by 1. +// Returns true if the key already existed, false if it was newly created. +func (m *CountersMap[K]) Increment(key K) bool { + return m.add(key, 1) +} + +// add increases the counter for the specified key by the value of v. +// Returns true if the key already existed, false if it was newly created. +// Note: if this function is directly exported at some point, then overflow +// checks should be performed. +func (m *CountersMap[K]) add(key K, v uint32) bool { + // Attempt looking for an already existing entry first to avoid spurious + // (value) allocations in workloads where the entry likely exists already. + actual, found := m.data.Load(key) + if !found { + actual, found = m.data.LoadOrStore(key, &atomic.Uint32{}) + if !found { + m.length.Add(1) + } + } + + actual.(*atomic.Uint32).Add(v) //nolint:forcetypeassert + + return found +} + +// Length returns the approximate number of keys in the map. The actual number +// of keys can be equal or larger to the returned value, but not less. +func (m *CountersMap[K]) Length() uint { + return uint(m.length.Load()) +} + +// Get returns the current value of the counter for key, or 0 when key does not exist. +func (m *CountersMap[K]) Get(key K) (uint32, bool) { + actual, ok := m.data.Load(key) + if !ok { + return 0, false // Key not found, return 0 + } + + return actual.(*atomic.Uint32).Load(), true //nolint:forcetypeassert +} + +// Range iterates over all key/count pairs in the map, calling f for each item. +// If f returns false, iteration stops. +func (m *CountersMap[K]) Range(f func(key K, count uint32) bool) { + m.data.Range(func(k any, v any) bool { + return f(k.(K), v.(*atomic.Uint32).Load()) //nolint:forcetypeassert + }) +} + +// CountMap returns the current value of the counters. The counters do not +// correspond to a consistent snapshot of the map, the counters may change +// while the returned map is built. +func (m *CountersMap[K]) CountMap() map[K]uint32 { + r := map[K]uint32{} + + m.Range(func(key K, count uint32) bool { + r[key] = count + + return true + }) + + return r +} diff --git a/internal/stats/count_map_test.go b/internal/stats/count_map_test.go new file mode 100644 index 000000000..839128539 --- /dev/null +++ b/internal/stats/count_map_test.go @@ -0,0 +1,155 @@ +package stats + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestConcurrentCountMap_Get_MissingKey(t *testing.T) { + var m CountersMap[string] + + v, found := m.Get("missing") + + require.False(t, found) + require.EqualValues(t, 0, v, "expected 0 for missing key") +} + +func TestConcurrentCountMap_IncrementAndGet_NewAndExistingKey(t *testing.T) { + var m CountersMap[string] + + found := m.Increment("foo") + require.False(t, found, "Increment on new key should return false") + + got, found := m.Get("foo") + require.True(t, found) + require.EqualValues(t, 1, got, "expected 1 after first increment") + + found = m.Increment("foo") + require.True(t, found, "Increment on existing key should return true") + + got, found = m.Get("foo") + require.True(t, found) + require.EqualValues(t, 2, got, "expected 2 after second increment") +} + +func TestConcurrentCountMap_Add(t *testing.T) { + var m CountersMap[int] + + found := m.add(42, 5) + require.False(t, found, "Add on new key should return false to indicate 'key not previously found'") + + v, found := m.Get(42) + + require.EqualValues(t, 5, v, "expected 5 after add") + require.True(t, found) + + found = m.add(42, 3) + require.True(t, found, "Add on existing key should return true") + + v, found = m.Get(42) + + require.EqualValues(t, 8, v, "expected 8 after second add") + require.True(t, found) +} + +func TestConcurrentCountMap_Range(t *testing.T) { + var m CountersMap[string] + + expectedMap := map[string]uint32{ + "a": 1, + "b": 2, + "c": 3, + } + + for k, v := range expectedMap { + require.False(t, m.add(k, v)) + } + + m.Range(func(key string, gotCount uint32) bool { + expectedCount, found := expectedMap[key] + + require.True(t, found) + require.Equal(t, expectedCount, gotCount) + + return true + }) +} + +func TestConcurrentCountMap_Length(t *testing.T) { + var m CountersMap[string] + + require.False(t, m.add("a", 1)) + require.False(t, m.add("b", 2)) + + require.EqualValues(t, 2, m.Length()) + require.True(t, m.add("b", 2)) + require.EqualValues(t, 2, m.Length()) + + require.False(t, m.add("c", 3)) + require.EqualValues(t, 3, m.Length()) +} + +func TestConcurrentCountMap_Range_StopEarly(t *testing.T) { + var m CountersMap[string] + + require.False(t, m.add("a", 1)) + require.False(t, m.add("b", 2)) + require.False(t, m.add("c", 3)) + + count := 0 + + m.Range(func(key string, v uint32) bool { + count++ + + return false // stop after first + }) + + require.Equal(t, 1, count, "Range should stop after one iteration") +} + +func TestConcurrentCountMap_CountMap_Snapshot(t *testing.T) { + var m CountersMap[string] + + require.False(t, m.add("x", 10)) + require.False(t, m.add("y", 20)) + + expected := map[string]uint32{ + "x": 10, + "y": 20, + } + + require.Equal(t, expected, m.CountMap()) +} + +func TestConcurrentCountMap_ConcurrentIncrement(t *testing.T) { + const ( + key = 7 + concurrency = 8 + inc = 1000 + ) + + var m CountersMap[int] + + var wg sync.WaitGroup + + wg.Add(concurrency) + + for range concurrency { + go func() { + for range inc { + m.Increment(key) + } + + wg.Done() + }() + } + + wg.Wait() + + got, found := m.Get(key) + + require.True(t, found) + require.EqualValues(t, concurrency*inc, got) +} diff --git a/repo/content/verify.go b/repo/content/verify.go index 0ee774cf9..82c05f15a 100644 --- a/repo/content/verify.go +++ b/repo/content/verify.go @@ -7,6 +7,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/stats" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" ) @@ -53,8 +54,18 @@ type contentVerifier struct { contentReadProbability float64 // content verification stats - successCount atomic.Uint32 - errorCount atomic.Uint32 + successCount atomic.Uint32 + readContentCount atomic.Uint32 + missingPackContentCount atomic.Uint32 + truncatedPackContentCount atomic.Uint32 + errorContentCount atomic.Uint32 + + // Per pack counts for content errors. Notice that a truncated pack, can + // also appear in the corruptedPacks map. A missing pack can only be in + // missingPacks, but not in the other sets. + missingPacks stats.CountersMap[blob.ID] + truncatedPacks stats.CountersMap[blob.ID] + corruptedPacks stats.CountersMap[blob.ID] verifiedCount atomic.Uint32 // used for calling the progress callback at the specified interval. @@ -93,17 +104,36 @@ func (v *contentVerifier) verifyContents(ctx context.Context, bm *WriteManager, err = bm.IterateContents(ctx, itOpts, cb) - ec := v.errorCount.Load() - contentCount := v.successCount.Load() + ec + contentInMissingPackCount := v.missingPackContentCount.Load() + contentInTruncatedPackCount := v.truncatedPackContentCount.Load() + contentErrorCount := v.errorContentCount.Load() + totalErrorCount := contentInMissingPackCount + contentInTruncatedPackCount + contentErrorCount - v.log.Infof("Finished verifying %v contents, found %v errors.", contentCount, ec) + contentCount := v.verifiedCount.Load() + + v.log.Info("Finished verifying contents") + v.log.Infow("verifyCounters:", + "verifiedContents", contentCount, + "totalErrorCount", totalErrorCount, + "contentsInMissingPacks", contentInMissingPackCount, + "contentsInTruncatedPacks", contentInTruncatedPackCount, + "unreadableContents", contentErrorCount, + "readContents", v.readContentCount.Load(), + "missingPacks", v.missingPacks.Length(), + "truncatedPacks", v.truncatedPacks.Length(), + "corruptedPacks", v.corruptedPacks.Length(), + ) + + logCountMap(v.log, "missingPack", &v.missingPacks) + logCountMap(v.log, "truncatedPack", &v.truncatedPacks) + logCountMap(v.log, "corruptedPack", &v.corruptedPacks) if err != nil { return err } - if ec != 0 { - return errors.Wrapf(errMissingPacks, "encountered %v errors", ec) + if totalErrorCount != 0 { + return errors.Wrapf(errMissingPacks, "encountered %v errors", totalErrorCount) } return nil @@ -119,7 +149,7 @@ func (v *contentVerifier) verify(ctx context.Context, ci Info) { if v.progressCallbackInterval > 0 && count%v.progressCallbackInterval == 0 { s := VerifyProgressStats{ SuccessCount: v.successCount.Load(), - ErrorCount: v.errorCount.Load(), + ErrorCount: v.missingPackContentCount.Load() + v.truncatedPackContentCount.Load() + v.errorContentCount.Load(), } v.progressCallback(s) @@ -129,14 +159,16 @@ func (v *contentVerifier) verify(ctx context.Context, ci Info) { func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) { bi, found := v.existingPacks[ci.PackBlobID] if !found { - v.errorCount.Add(1) + v.missingPackContentCount.Add(1) + v.missingPacks.Increment(ci.PackBlobID) v.log.Warnf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID) return } if int64(ci.PackOffset+ci.PackedLength) > bi.Length { - v.errorCount.Add(1) + v.truncatedPackContentCount.Add(1) + v.truncatedPacks.Increment(ci.PackBlobID) v.log.Warnf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID) return @@ -144,8 +176,11 @@ func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) { //nolint:gosec if v.contentReadProbability > 0 && rand.Float64() < v.contentReadProbability { + v.readContentCount.Add(1) + if _, err := v.bm.GetContent(ctx, ci.ContentID); err != nil { - v.errorCount.Add(1) + v.errorContentCount.Add(1) + v.corruptedPacks.Increment(ci.PackBlobID) v.log.Warnf("content %v is invalid: %v", ci.ContentID, err) return @@ -154,3 +189,12 @@ func (v *contentVerifier) verifyContentImpl(ctx context.Context, ci Info) { v.successCount.Add(1) } + +// nothing is logged for an empty map. +func logCountMap(log logging.Logger, mapName string, m *stats.CountersMap[blob.ID]) { + m.Range(func(packID blob.ID, contentCount uint32) bool { + log.Warnw(mapName, "packID", packID, "numberOfReferencedContents", contentCount) + + return true + }) +}