From ed7ef85a2d800ca26cb0c4185717bc49a5fd8d63 Mon Sep 17 00:00:00 2001 From: lyndon-li <98304688+Lyndon-Li@users.noreply.github.com> Date: Fri, 31 Oct 2025 14:11:15 +0800 Subject: [PATCH] feat(general): add stats to maintenance run - AdvanceEpoch (#4937) --- internal/epoch/epoch_manager.go | 19 +++++--- internal/epoch/epoch_manager_test.go | 47 ++++++++++++++------ repo/maintenance/maintenance_run.go | 7 ++- repo/maintenancestats/builder.go | 2 + repo/maintenancestats/builder_test.go | 22 +++++++++ repo/maintenancestats/stats_advance_epoch.go | 38 ++++++++++++++++ 6 files changed, 115 insertions(+), 20 deletions(-) create mode 100644 repo/maintenancestats/stats_advance_epoch.go diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 522233907..83d4b8204 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -745,21 +745,30 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { // MaybeAdvanceWriteEpoch writes a new write epoch marker when a new write // epoch should be started, otherwise it does not do anything. -func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) error { +func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) (*maintenancestats.AdvanceEpochStats, error) { p, err := e.getParameters(ctx) if err != nil { - return err + return nil, err } e.mu.Lock() cs := e.lastKnownState e.mu.Unlock() - if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) { - return errors.Wrap(e.advanceEpochMarker(ctx, cs), "error advancing epoch") + result := &maintenancestats.AdvanceEpochStats{ + CurrentEpoch: cs.WriteEpoch, } - return nil + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) { + if err := e.advanceEpochMarker(ctx, cs); err != nil { + return nil, errors.Wrap(err, "error advancing epoch") + } + + result.CurrentEpoch++ + result.WasAdvanced = true + } + + return result, nil } func (e *Manager) advanceEpochMarker(ctx context.Context, cs CurrentSnapshot) error { diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index ad5742d8e..3535fd9e7 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -621,9 +621,11 @@ func TestMaybeAdvanceEpoch_Empty(t *testing.T) { te.verifyCurrentWriteEpoch(t, 0) // this should be a no-op - err := te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, 0, stats.CurrentEpoch) + require.False(t, stats.WasAdvanced) // check current epoch again te.verifyCurrentWriteEpoch(t, 0) @@ -669,9 +671,11 @@ func TestMaybeAdvanceEpoch(t *testing.T) { require.NoError(t, err) te.verifyCurrentWriteEpoch(t, 0) - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, 1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) err = te.mgr.Refresh(ctx) // force state refresh @@ -696,10 +700,11 @@ func TestMaybeAdvanceEpoch_GetParametersError(t *testing.T) { paramsError := errors.New("no parameters error") te.mgr.paramProvider = faultyParamsProvider{err: paramsError} - err := te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.Error(t, err) require.ErrorIs(t, err, paramsError) + require.Nil(t, stats) } func TestMaybeAdvanceEpoch_Error(t *testing.T) { @@ -739,10 +744,11 @@ func TestMaybeAdvanceEpoch_Error(t *testing.T) { te.faultyStorage.AddFaults(blobtesting.MethodPutBlob, fault.New().ErrorInstead(berr)) - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.Error(t, err) require.ErrorIs(t, err, berr) + require.Nil(t, stats) } func TestForceAdvanceEpoch(t *testing.T) { @@ -920,7 +926,7 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) { idxCount := p.GetEpochAdvanceOnCountThreshold() // Create sufficient indexes blobs and move clock forward to advance epoch. - for range 4 { + for j := range 4 { for i := range idxCount { if i == idxCount-1 { // Advance the time so that the difference in times for writes will force @@ -931,7 +937,14 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) { te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i)) } - require.NoError(t, te.mgr.MaybeAdvanceWriteEpoch(ctx)) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) + require.NoError(t, err) + require.Equal(t, j+1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) + + err = te.mgr.Refresh(ctx) // force state refresh + + require.NoError(t, err) } compactionError := errors.New("test compaction error") @@ -976,8 +989,10 @@ func TestMaybeCompactSingleEpoch(t *testing.T) { te.verifyCurrentWriteEpoch(t, j) - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, j+1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) err = te.mgr.Refresh(ctx) // force state refresh @@ -1085,7 +1100,7 @@ func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) { var k int // Create sufficient indexes blobs and move clock forward to advance epoch. - for range epochsToWrite { + for j := range epochsToWrite { for i := range idxCount { if i == idxCount-1 { // Advance the time so that the difference in times for writes will force @@ -1098,8 +1113,10 @@ func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) { k++ } - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, j+1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) err = te.mgr.Refresh(ctx) require.NoError(t, err) @@ -1136,7 +1153,7 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) { epochsToWrite := p.FullCheckpointFrequency + 3 idxCount := p.GetEpochAdvanceOnCountThreshold() // Create sufficient indexes blobs and move clock forward to advance epoch. - for range epochsToWrite { + for j := range epochsToWrite { for i := range idxCount { if i == idxCount-1 { // Advance the time so that the difference in times for writes will force @@ -1147,8 +1164,10 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) { te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k)) } - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, j+1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) err = te.mgr.Refresh(ctx) require.NoError(t, err) @@ -1189,7 +1208,7 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) { epochsToWrite := p.FullCheckpointFrequency + 3 idxCount := p.GetEpochAdvanceOnCountThreshold() // Create sufficient indexes blobs and move clock forward to advance epoch. - for range epochsToWrite { + for j := range epochsToWrite { for i := range idxCount { if i == idxCount-1 { // Advance the time so that the difference in times for writes will force @@ -1200,8 +1219,10 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) { te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k)) } - err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + stats, err := te.mgr.MaybeAdvanceWriteEpoch(ctx) require.NoError(t, err) + require.Equal(t, j+1, stats.CurrentEpoch) + require.True(t, stats.WasAdvanced) err = te.mgr.Refresh(ctx) require.NoError(t, err) diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 8681ead5b..69a4d66f0 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -342,8 +342,11 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error { return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochAdvance, s, func() (maintenancestats.Kind, error) { - userLog(ctx).Info("Cleaning up no-longer-needed epoch markers...") - return nil, errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker") + userLog(ctx).Info("Advancing epoch markers...") + + stats, err := em.MaybeAdvanceWriteEpoch(ctx) + + return stats, errors.Wrap(err, "error advancing epoch marker") }) } diff --git a/repo/maintenancestats/builder.go b/repo/maintenancestats/builder.go index 12b594df7..7305d71ec 100644 --- a/repo/maintenancestats/builder.go +++ b/repo/maintenancestats/builder.go @@ -54,6 +54,8 @@ func BuildFromExtra(stats Extra) (Summarizer, error) { result = &CleanupSupersededIndexesStats{} case generateRangeCheckpointStatsKind: result = &GenerateRangeCheckpointStats{} + case advanceEpochStatsKind: + result = &AdvanceEpochStats{} default: return nil, errors.Wrapf(ErrUnSupportedStatKindError, "invalid kind for stats %v", stats) } diff --git a/repo/maintenancestats/builder_test.go b/repo/maintenancestats/builder_test.go index 595f27786..849f9711e 100644 --- a/repo/maintenancestats/builder_test.go +++ b/repo/maintenancestats/builder_test.go @@ -44,6 +44,17 @@ func TestBuildExtraSuccess(t *testing.T) { Data: []byte(`{"rangeMinEpoch":3,"rangeMaxEpoch":5}`), }, }, + { + name: "advanceEpochStats", + stats: &AdvanceEpochStats{ + CurrentEpoch: 3, + WasAdvanced: true, + }, + expected: Extra{ + Kind: advanceEpochStatsKind, + Data: []byte(`{"currentEpoch":3,"wasAdvanced":true}`), + }, + }, } for _, tc := range cases { @@ -123,6 +134,17 @@ func TestBuildFromExtraSuccess(t *testing.T) { RangeMaxEpoch: 5, }, }, + { + name: "advanceEpochStats", + stats: Extra{ + Kind: advanceEpochStatsKind, + Data: []byte(`{"currentEpoch":3,"wasAdvanced":true}`), + }, + expected: &AdvanceEpochStats{ + CurrentEpoch: 3, + WasAdvanced: true, + }, + }, } for _, tc := range cases { diff --git a/repo/maintenancestats/stats_advance_epoch.go b/repo/maintenancestats/stats_advance_epoch.go new file mode 100644 index 000000000..76fdd9602 --- /dev/null +++ b/repo/maintenancestats/stats_advance_epoch.go @@ -0,0 +1,38 @@ +package maintenancestats + +import ( + "fmt" + + "github.com/kopia/kopia/internal/contentlog" +) + +const advanceEpochStatsKind = "advanceEpochStats" + +// AdvanceEpochStats are the stats for advancing write epoch. +type AdvanceEpochStats struct { + CurrentEpoch int `json:"currentEpoch"` + WasAdvanced bool `json:"wasAdvanced"` +} + +// WriteValueTo writes the stats to JSONWriter. +func (as *AdvanceEpochStats) WriteValueTo(jw *contentlog.JSONWriter) { + jw.BeginObjectField(as.Kind()) + jw.IntField("currentEpoch", as.CurrentEpoch) + jw.BoolField("wasAdvanced", as.WasAdvanced) + jw.EndObject() +} + +// Summary generates a human readable summary for the stats. +func (as *AdvanceEpochStats) Summary() string { + var message string + if as.WasAdvanced { + message = fmt.Sprintf("Advanced epoch to %v", as.CurrentEpoch) + } + + return message +} + +// Kind returns the kind name for the stats. +func (as *AdvanceEpochStats) Kind() string { + return advanceEpochStatsKind +}