From e3e4e09bc83b63483e36a33af0f95f99d6a4a451 Mon Sep 17 00:00:00 2001 From: lyndon-li <98304688+Lyndon-Li@users.noreply.github.com> Date: Sat, 25 Oct 2025 04:49:40 +0800 Subject: [PATCH] feat(general): add stats to maintenance run - CleanupMarkers (#4900) --- internal/epoch/epoch_manager.go | 44 ++++-- internal/epoch/epoch_manager_test.go | 20 ++- repo/maintenance/maintenance_run.go | 4 +- .../maintenancestats/{types.go => builder.go} | 24 +++- repo/maintenancestats/builder_test.go | 126 ++++++++++++++++++ .../maintenancestats/stats_cleanup_markers.go | 33 +++++ 6 files changed, 230 insertions(+), 21 deletions(-) rename repo/maintenancestats/{types.go => builder.go} (58%) create mode 100644 repo/maintenancestats/builder_test.go create mode 100644 repo/maintenancestats/stats_cleanup_markers.go diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 46f7b8d7c..368407f02 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -19,6 +19,7 @@ "github.com/kopia/kopia/internal/contentlog/logparam" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/maintenancestats" ) // LatestEpoch represents the current epoch number in GetCompleteIndexSet. @@ -290,21 +291,21 @@ func (e *Manager) maxCleanupTime(cs CurrentSnapshot) time.Time { } // CleanupMarkers removes superseded watermarks and epoch markers. -func (e *Manager) CleanupMarkers(ctx context.Context) error { +func (e *Manager) CleanupMarkers(ctx context.Context) (*maintenancestats.CleanupMarkersStats, error) { cs, err := e.committedState(ctx, 0) if err != nil { - return err + return nil, err } p, err := e.getParameters(ctx) if err != nil { - return err + return nil, err } return e.cleanupInternal(ctx, cs, p) } -func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) error { +func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) (*maintenancestats.CleanupMarkersStats, error) { eg, ctx := errgroup.WithContext(ctx) // find max timestamp recently written to the repository to establish storage clock. @@ -312,7 +313,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa // to this max time. This assumes that storage clock moves forward somewhat reasonably. maxTime := e.maxCleanupTime(cs) if maxTime.IsZero() { - return nil + return nil, nil } // only delete blobs if a suitable replacement exists and has been written sufficiently @@ -320,18 +321,35 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa // may have not observed them yet. maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin) + var deletedEpochMarkers, deletedDeletionWaterMarks atomic.Int64 + eg.Go(func() error { - return e.cleanupEpochMarkers(ctx, cs) + deleted, err := e.cleanupEpochMarkers(ctx, cs) + deletedEpochMarkers.Store(int64(deleted)) + + return err }) eg.Go(func() error { - return e.cleanupWatermarks(ctx, cs, p, maxReplacementTime) + deleted, err := e.cleanupWatermarks(ctx, cs, p, maxReplacementTime) + deletedDeletionWaterMarks.Store(int64(deleted)) + + return err }) - return errors.Wrap(eg.Wait(), "error cleaning up index blobs") + if err := eg.Wait(); err != nil { + return nil, errors.Wrap(err, "error cleaning up index blobs") + } + + result := &maintenancestats.CleanupMarkersStats{ + DeletedEpochMarkerBlobCount: int(deletedEpochMarkers.Load()), + DeletedDeletionWaterMarkBlobCount: int(deletedDeletionWaterMarks.Load()), + } + + return result, nil } -func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) error { +func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) (int, error) { // delete epoch markers for epoch < current-1 var toDelete []blob.ID @@ -345,13 +363,13 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e p, err := e.getParameters(ctx) if err != nil { - return err + return 0, err } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker") + return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker") } -func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) error { +func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) (int, error) { var toDelete []blob.ID for _, bm := range cs.DeletionWatermarkBlobs { @@ -369,7 +387,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p * } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs") + return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs") } // CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones. diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 810f752f8..78032cbc0 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -25,6 +25,7 @@ "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/logging" "github.com/kopia/kopia/repo/blob/readonly" + "github.com/kopia/kopia/repo/maintenancestats" ) type fakeIndex struct { @@ -1319,9 +1320,10 @@ func TestCleanupMarkers_Empty(t *testing.T) { ctx := testlogging.Context(t) // this should be a no-op - err := te.mgr.CleanupMarkers(ctx) + stats, err := te.mgr.CleanupMarkers(ctx) require.NoError(t, err) + require.Nil(t, stats) } func TestCleanupMarkers_GetParametersError(t *testing.T) { @@ -1333,10 +1335,11 @@ func TestCleanupMarkers_GetParametersError(t *testing.T) { paramsError := errors.New("no parameters error") te.mgr.paramProvider = faultyParamsProvider{err: paramsError} - err := te.mgr.CleanupMarkers(ctx) + stats, err := te.mgr.CleanupMarkers(ctx) require.Error(t, err) require.ErrorIs(t, err, paramsError) + require.Nil(t, stats) } func TestCleanupMarkers_FailToReadState(t *testing.T) { @@ -1349,9 +1352,10 @@ func TestCleanupMarkers_FailToReadState(t *testing.T) { cancel() - err := te.mgr.CleanupMarkers(ctx) + stats, err := te.mgr.CleanupMarkers(ctx) require.Error(t, err) + require.Nil(t, stats) } func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) { @@ -1369,8 +1373,10 @@ func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, cs.WriteEpoch) - err = te.mgr.CleanupMarkers(ctx) + stats, err := te.mgr.CleanupMarkers(ctx) + require.NoError(t, err) + require.Nil(t, stats) require.NoError(t, te.mgr.Refresh(ctx)) @@ -1408,8 +1414,12 @@ func TestCleanupMarkers_CleanUpManyMarkers(t *testing.T) { require.NoError(t, err) require.Len(t, cs.EpochMarkerBlobs, epochsToAdvance) - err = te.mgr.CleanupMarkers(ctx) + stats, err := te.mgr.CleanupMarkers(ctx) require.NoError(t, err) + require.Equal(t, &maintenancestats.CleanupMarkersStats{ + DeletedEpochMarkerBlobCount: 3, + DeletedDeletionWaterMarkBlobCount: 0, + }, stats) // is the epoch marker preserved? require.NoError(t, te.mgr.Refresh(ctx)) diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 33cd434ba..7e7895264 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -396,7 +396,9 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s err := ReportRun(ctx, runParams.rep, TaskEpochCleanupMarkers, s, func() (maintenancestats.Kind, error) { userLog(ctx).Info("Cleaning up unneeded epoch markers...") - return nil, errors.Wrap(em.CleanupMarkers(ctx), "error removing epoch markers") + stats, err := em.CleanupMarkers(ctx) + + return stats, errors.Wrap(err, "error removing epoch markers") }) if err != nil { return err diff --git a/repo/maintenancestats/types.go b/repo/maintenancestats/builder.go similarity index 58% rename from repo/maintenancestats/types.go rename to repo/maintenancestats/builder.go index 6242cd8ec..e1994db29 100644 --- a/repo/maintenancestats/types.go +++ b/repo/maintenancestats/builder.go @@ -23,8 +23,15 @@ type Kind interface { Kind() string } +// ErrUnSupportedStatKindError is reported for unsupported stats kind. +var ErrUnSupportedStatKindError = errors.New("unsupported stats kind") + // BuildExtra builds an Extra from maintenance statistics. func BuildExtra(stats Kind) (Extra, error) { + if stats == nil { + return Extra{}, errors.New("invalid stats") + } + bytes, err := json.Marshal(stats) if err != nil { return Extra{}, errors.Wrapf(err, "error marshaling stats %v", stats) @@ -37,6 +44,19 @@ func BuildExtra(stats Kind) (Extra, error) { } // BuildFromExtra builds maintenance statistics from an Extra and returns a Summarizer. -func BuildFromExtra(_ Extra) (Summarizer, error) { - return nil, nil +func BuildFromExtra(stats Extra) (Summarizer, error) { + var result Summarizer + + switch stats.Kind { + case cleanupMarkersStatsKind: + result = &CleanupMarkersStats{} + default: + return nil, errors.Wrapf(ErrUnSupportedStatKindError, "invalid kind for stats %v", stats) + } + + if err := json.Unmarshal(stats.Data, result); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling raw stats %v of kind %s to %T", stats.Data, stats.Kind, result) + } + + return result, nil } diff --git a/repo/maintenancestats/builder_test.go b/repo/maintenancestats/builder_test.go new file mode 100644 index 000000000..1e8ee7752 --- /dev/null +++ b/repo/maintenancestats/builder_test.go @@ -0,0 +1,126 @@ +package maintenancestats + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type unmarshalable struct { + Data string + Channel chan int +} + +func (u *unmarshalable) Kind() string { + return u.Data +} + +func TestBuildExtraSuccess(t *testing.T) { + cases := []struct { + name string + stats Kind + expected Extra + }{ + { + name: "succeed", + stats: &CleanupMarkersStats{ + DeletedEpochMarkerBlobCount: 10, + DeletedDeletionWaterMarkBlobCount: 20, + }, + expected: Extra{ + Kind: "cleanupMarkersStats", + Data: []byte(`{"deletedEpochMarkerBlobCount":10,"deletedDeletionWaterMarkBlobCount":20}`), + }, + }, + } + + for _, tc := range cases { + result, err := BuildExtra(tc.stats) + + require.NoError(t, err) + require.Equal(t, tc.expected, result) + } +} + +func TestBuildExtraError(t *testing.T) { + um := unmarshalable{ + Data: "fake", + } + + cases := []struct { + name string + stats Kind + expectedErr string + }{ + { + name: "nil stats", + expectedErr: "invalid stats", + }, + { + name: "marshal fails", + stats: &um, + expectedErr: "error marshaling stats &{fake }: json: unsupported type: chan int", + }, + } + + for _, tc := range cases { + result, err := BuildExtra(tc.stats) + + require.EqualError(t, err, tc.expectedErr) + require.Equal(t, Extra{}, result) + } +} + +func TestBuildFromExtraSuccess(t *testing.T) { + cases := []struct { + name string + stats Extra + expected Summarizer + }{ + { + name: "cleanupMarkersStats", + stats: Extra{ + Kind: cleanupMarkersStatsKind, + Data: []byte(`{"deletedEpochMarkerBlobCount":10,"deletedDeletionWaterMarkBlobCount":20}`), + }, + expected: &CleanupMarkersStats{ + DeletedEpochMarkerBlobCount: 10, + DeletedDeletionWaterMarkBlobCount: 20, + }, + }, + } + + for _, tc := range cases { + result, err := BuildFromExtra(tc.stats) + + require.NoError(t, err) + require.Equal(t, tc.expected, result) + } +} + +func TestBuildFromExtraError(t *testing.T) { + cases := []struct { + name string + stats Extra + expectedErr string + }{ + { + name: "unsupported kind", + expectedErr: "invalid kind for stats { []}: unsupported stats kind", + }, + { + name: "unmarshal fails", + stats: Extra{ + Kind: cleanupMarkersStatsKind, + }, + expectedErr: "error unmarshaling raw stats [] of kind cleanupMarkersStats to *maintenancestats.CleanupMarkersStats: unexpected end of JSON input", + }, + } + + for _, tc := range cases { + result, err := BuildFromExtra(tc.stats) + + require.EqualError(t, err, tc.expectedErr) + require.Nil(t, result) + } +} diff --git a/repo/maintenancestats/stats_cleanup_markers.go b/repo/maintenancestats/stats_cleanup_markers.go new file mode 100644 index 000000000..8c59983aa --- /dev/null +++ b/repo/maintenancestats/stats_cleanup_markers.go @@ -0,0 +1,33 @@ +package maintenancestats + +import ( + "fmt" + + "github.com/kopia/kopia/internal/contentlog" +) + +const cleanupMarkersStatsKind = "cleanupMarkersStats" + +// CleanupMarkersStats are the stats for cleaning up markers. +type CleanupMarkersStats struct { + DeletedEpochMarkerBlobCount int `json:"deletedEpochMarkerBlobCount"` + DeletedDeletionWaterMarkBlobCount int `json:"deletedDeletionWaterMarkBlobCount"` +} + +// WriteValueTo writes the stats to JSONWriter. +func (cs *CleanupMarkersStats) WriteValueTo(jw *contentlog.JSONWriter) { + jw.BeginObjectField(cs.Kind()) + jw.IntField("deletedEpochMarkerBlobCount", cs.DeletedEpochMarkerBlobCount) + jw.IntField("deletedDeletionWaterMarkBlobCount", cs.DeletedDeletionWaterMarkBlobCount) + jw.EndObject() +} + +// Summary generates a human readable summary for the stats. +func (cs *CleanupMarkersStats) Summary() string { + return fmt.Sprintf("Cleaned up %v epoch markers and %v deletion watermarks", cs.DeletedEpochMarkerBlobCount, cs.DeletedDeletionWaterMarkBlobCount) +} + +// Kind returns the kind name for the stats. +func (cs *CleanupMarkersStats) Kind() string { + return cleanupMarkersStatsKind +}