mirror of
https://github.com/kopia/kopia.git
synced 2026-05-18 19:54:37 -04:00
feat(general): add stats to maintenance run - CleanupMarkers (#4900)
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
"github.com/kopia/kopia/internal/contentlog/logparam"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/maintenancestats"
|
||||
)
|
||||
|
||||
// LatestEpoch represents the current epoch number in GetCompleteIndexSet.
|
||||
@@ -290,21 +291,21 @@ func (e *Manager) maxCleanupTime(cs CurrentSnapshot) time.Time {
|
||||
}
|
||||
|
||||
// CleanupMarkers removes superseded watermarks and epoch markers.
|
||||
func (e *Manager) CleanupMarkers(ctx context.Context) error {
|
||||
func (e *Manager) CleanupMarkers(ctx context.Context) (*maintenancestats.CleanupMarkersStats, error) {
|
||||
cs, err := e.committedState(ctx, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return e.cleanupInternal(ctx, cs, p)
|
||||
}
|
||||
|
||||
func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) error {
|
||||
func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) (*maintenancestats.CleanupMarkersStats, error) {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// find max timestamp recently written to the repository to establish storage clock.
|
||||
@@ -312,7 +313,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa
|
||||
// to this max time. This assumes that storage clock moves forward somewhat reasonably.
|
||||
maxTime := e.maxCleanupTime(cs)
|
||||
if maxTime.IsZero() {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// only delete blobs if a suitable replacement exists and has been written sufficiently
|
||||
@@ -320,18 +321,35 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Pa
|
||||
// may have not observed them yet.
|
||||
maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin)
|
||||
|
||||
var deletedEpochMarkers, deletedDeletionWaterMarks atomic.Int64
|
||||
|
||||
eg.Go(func() error {
|
||||
return e.cleanupEpochMarkers(ctx, cs)
|
||||
deleted, err := e.cleanupEpochMarkers(ctx, cs)
|
||||
deletedEpochMarkers.Store(int64(deleted))
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
return e.cleanupWatermarks(ctx, cs, p, maxReplacementTime)
|
||||
deleted, err := e.cleanupWatermarks(ctx, cs, p, maxReplacementTime)
|
||||
deletedDeletionWaterMarks.Store(int64(deleted))
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
return errors.Wrap(eg.Wait(), "error cleaning up index blobs")
|
||||
if err := eg.Wait(); err != nil {
|
||||
return nil, errors.Wrap(err, "error cleaning up index blobs")
|
||||
}
|
||||
|
||||
result := &maintenancestats.CleanupMarkersStats{
|
||||
DeletedEpochMarkerBlobCount: int(deletedEpochMarkers.Load()),
|
||||
DeletedDeletionWaterMarkBlobCount: int(deletedDeletionWaterMarks.Load()),
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) error {
|
||||
func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) (int, error) {
|
||||
// delete epoch markers for epoch < current-1
|
||||
var toDelete []blob.ID
|
||||
|
||||
@@ -345,13 +363,13 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e
|
||||
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker")
|
||||
return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker")
|
||||
}
|
||||
|
||||
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) error {
|
||||
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) (int, error) {
|
||||
var toDelete []blob.ID
|
||||
|
||||
for _, bm := range cs.DeletionWatermarkBlobs {
|
||||
@@ -369,7 +387,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs")
|
||||
return len(toDelete), errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs")
|
||||
}
|
||||
|
||||
// CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones.
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/logging"
|
||||
"github.com/kopia/kopia/repo/blob/readonly"
|
||||
"github.com/kopia/kopia/repo/maintenancestats"
|
||||
)
|
||||
|
||||
type fakeIndex struct {
|
||||
@@ -1319,9 +1320,10 @@ func TestCleanupMarkers_Empty(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
// this should be a no-op
|
||||
err := te.mgr.CleanupMarkers(ctx)
|
||||
stats, err := te.mgr.CleanupMarkers(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, stats)
|
||||
}
|
||||
|
||||
func TestCleanupMarkers_GetParametersError(t *testing.T) {
|
||||
@@ -1333,10 +1335,11 @@ func TestCleanupMarkers_GetParametersError(t *testing.T) {
|
||||
paramsError := errors.New("no parameters error")
|
||||
te.mgr.paramProvider = faultyParamsProvider{err: paramsError}
|
||||
|
||||
err := te.mgr.CleanupMarkers(ctx)
|
||||
stats, err := te.mgr.CleanupMarkers(ctx)
|
||||
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, paramsError)
|
||||
require.Nil(t, stats)
|
||||
}
|
||||
|
||||
func TestCleanupMarkers_FailToReadState(t *testing.T) {
|
||||
@@ -1349,9 +1352,10 @@ func TestCleanupMarkers_FailToReadState(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
err := te.mgr.CleanupMarkers(ctx)
|
||||
stats, err := te.mgr.CleanupMarkers(ctx)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, stats)
|
||||
}
|
||||
|
||||
func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) {
|
||||
@@ -1369,8 +1373,10 @@ func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, cs.WriteEpoch)
|
||||
|
||||
err = te.mgr.CleanupMarkers(ctx)
|
||||
stats, err := te.mgr.CleanupMarkers(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, stats)
|
||||
|
||||
require.NoError(t, te.mgr.Refresh(ctx))
|
||||
|
||||
@@ -1408,8 +1414,12 @@ func TestCleanupMarkers_CleanUpManyMarkers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, cs.EpochMarkerBlobs, epochsToAdvance)
|
||||
|
||||
err = te.mgr.CleanupMarkers(ctx)
|
||||
stats, err := te.mgr.CleanupMarkers(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, &maintenancestats.CleanupMarkersStats{
|
||||
DeletedEpochMarkerBlobCount: 3,
|
||||
DeletedDeletionWaterMarkBlobCount: 0,
|
||||
}, stats)
|
||||
|
||||
// is the epoch marker preserved?
|
||||
require.NoError(t, te.mgr.Refresh(ctx))
|
||||
|
||||
@@ -396,7 +396,9 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s
|
||||
err := ReportRun(ctx, runParams.rep, TaskEpochCleanupMarkers, s, func() (maintenancestats.Kind, error) {
|
||||
userLog(ctx).Info("Cleaning up unneeded epoch markers...")
|
||||
|
||||
return nil, errors.Wrap(em.CleanupMarkers(ctx), "error removing epoch markers")
|
||||
stats, err := em.CleanupMarkers(ctx)
|
||||
|
||||
return stats, errors.Wrap(err, "error removing epoch markers")
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -23,8 +23,15 @@ type Kind interface {
|
||||
Kind() string
|
||||
}
|
||||
|
||||
// ErrUnSupportedStatKindError is reported for unsupported stats kind.
|
||||
var ErrUnSupportedStatKindError = errors.New("unsupported stats kind")
|
||||
|
||||
// BuildExtra builds an Extra from maintenance statistics.
|
||||
func BuildExtra(stats Kind) (Extra, error) {
|
||||
if stats == nil {
|
||||
return Extra{}, errors.New("invalid stats")
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(stats)
|
||||
if err != nil {
|
||||
return Extra{}, errors.Wrapf(err, "error marshaling stats %v", stats)
|
||||
@@ -37,6 +44,19 @@ func BuildExtra(stats Kind) (Extra, error) {
|
||||
}
|
||||
|
||||
// BuildFromExtra builds maintenance statistics from an Extra and returns a Summarizer.
|
||||
func BuildFromExtra(_ Extra) (Summarizer, error) {
|
||||
return nil, nil
|
||||
func BuildFromExtra(stats Extra) (Summarizer, error) {
|
||||
var result Summarizer
|
||||
|
||||
switch stats.Kind {
|
||||
case cleanupMarkersStatsKind:
|
||||
result = &CleanupMarkersStats{}
|
||||
default:
|
||||
return nil, errors.Wrapf(ErrUnSupportedStatKindError, "invalid kind for stats %v", stats)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(stats.Data, result); err != nil {
|
||||
return nil, errors.Wrapf(err, "error unmarshaling raw stats %v of kind %s to %T", stats.Data, stats.Kind, result)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
126
repo/maintenancestats/builder_test.go
Normal file
126
repo/maintenancestats/builder_test.go
Normal file
@@ -0,0 +1,126 @@
|
||||
package maintenancestats
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type unmarshalable struct {
|
||||
Data string
|
||||
Channel chan int
|
||||
}
|
||||
|
||||
func (u *unmarshalable) Kind() string {
|
||||
return u.Data
|
||||
}
|
||||
|
||||
func TestBuildExtraSuccess(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
stats Kind
|
||||
expected Extra
|
||||
}{
|
||||
{
|
||||
name: "succeed",
|
||||
stats: &CleanupMarkersStats{
|
||||
DeletedEpochMarkerBlobCount: 10,
|
||||
DeletedDeletionWaterMarkBlobCount: 20,
|
||||
},
|
||||
expected: Extra{
|
||||
Kind: "cleanupMarkersStats",
|
||||
Data: []byte(`{"deletedEpochMarkerBlobCount":10,"deletedDeletionWaterMarkBlobCount":20}`),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
result, err := BuildExtra(tc.stats)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExtraError(t *testing.T) {
|
||||
um := unmarshalable{
|
||||
Data: "fake",
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
stats Kind
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "nil stats",
|
||||
expectedErr: "invalid stats",
|
||||
},
|
||||
{
|
||||
name: "marshal fails",
|
||||
stats: &um,
|
||||
expectedErr: "error marshaling stats &{fake <nil>}: json: unsupported type: chan int",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
result, err := BuildExtra(tc.stats)
|
||||
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
require.Equal(t, Extra{}, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildFromExtraSuccess(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
stats Extra
|
||||
expected Summarizer
|
||||
}{
|
||||
{
|
||||
name: "cleanupMarkersStats",
|
||||
stats: Extra{
|
||||
Kind: cleanupMarkersStatsKind,
|
||||
Data: []byte(`{"deletedEpochMarkerBlobCount":10,"deletedDeletionWaterMarkBlobCount":20}`),
|
||||
},
|
||||
expected: &CleanupMarkersStats{
|
||||
DeletedEpochMarkerBlobCount: 10,
|
||||
DeletedDeletionWaterMarkBlobCount: 20,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
result, err := BuildFromExtra(tc.stats)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildFromExtraError(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
stats Extra
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "unsupported kind",
|
||||
expectedErr: "invalid kind for stats { []}: unsupported stats kind",
|
||||
},
|
||||
{
|
||||
name: "unmarshal fails",
|
||||
stats: Extra{
|
||||
Kind: cleanupMarkersStatsKind,
|
||||
},
|
||||
expectedErr: "error unmarshaling raw stats [] of kind cleanupMarkersStats to *maintenancestats.CleanupMarkersStats: unexpected end of JSON input",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
result, err := BuildFromExtra(tc.stats)
|
||||
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
require.Nil(t, result)
|
||||
}
|
||||
}
|
||||
33
repo/maintenancestats/stats_cleanup_markers.go
Normal file
33
repo/maintenancestats/stats_cleanup_markers.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package maintenancestats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/kopia/kopia/internal/contentlog"
|
||||
)
|
||||
|
||||
const cleanupMarkersStatsKind = "cleanupMarkersStats"
|
||||
|
||||
// CleanupMarkersStats are the stats for cleaning up markers.
|
||||
type CleanupMarkersStats struct {
|
||||
DeletedEpochMarkerBlobCount int `json:"deletedEpochMarkerBlobCount"`
|
||||
DeletedDeletionWaterMarkBlobCount int `json:"deletedDeletionWaterMarkBlobCount"`
|
||||
}
|
||||
|
||||
// WriteValueTo writes the stats to JSONWriter.
|
||||
func (cs *CleanupMarkersStats) WriteValueTo(jw *contentlog.JSONWriter) {
|
||||
jw.BeginObjectField(cs.Kind())
|
||||
jw.IntField("deletedEpochMarkerBlobCount", cs.DeletedEpochMarkerBlobCount)
|
||||
jw.IntField("deletedDeletionWaterMarkBlobCount", cs.DeletedDeletionWaterMarkBlobCount)
|
||||
jw.EndObject()
|
||||
}
|
||||
|
||||
// Summary generates a human readable summary for the stats.
|
||||
func (cs *CleanupMarkersStats) Summary() string {
|
||||
return fmt.Sprintf("Cleaned up %v epoch markers and %v deletion watermarks", cs.DeletedEpochMarkerBlobCount, cs.DeletedDeletionWaterMarkBlobCount)
|
||||
}
|
||||
|
||||
// Kind returns the kind name for the stats.
|
||||
func (cs *CleanupMarkersStats) Kind() string {
|
||||
return cleanupMarkersStatsKind
|
||||
}
|
||||
Reference in New Issue
Block a user