feat(general): maintenance stats for drop deleted contents and compact indexes (#4948)

This commit is contained in:
lyndon-li
2025-11-05 03:40:20 +08:00
committed by GitHub
parent 4ab8ecc341
commit 83bd4d45da
16 changed files with 136 additions and 47 deletions

View File

@@ -4,6 +4,8 @@
"context"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content/indexblob"
)
@@ -46,6 +48,7 @@ func (c *commandIndexOptimize) runOptimizeCommand(ctx context.Context, rep repo.
opt.DropDeletedBefore = rep.Time().Add(-age)
}
//nolint:wrapcheck
return rep.ContentManager().CompactIndexes(ctx, opt)
_, err = rep.ContentManager().CompactIndexes(ctx, opt)
return errors.Wrap(err, "error optimizing indexes")
}

View File

@@ -239,10 +239,10 @@ func (e *Manager) Current(ctx context.Context) (CurrentSnapshot, error) {
// AdvanceDeletionWatermark moves the deletion watermark time to a given timestamp
// this causes all deleted content entries before given time to be treated as non-existent.
func (e *Manager) AdvanceDeletionWatermark(ctx context.Context, ts time.Time) error {
func (e *Manager) AdvanceDeletionWatermark(ctx context.Context, ts time.Time) (bool, error) {
cs, err := e.committedState(ctx, 0)
if err != nil {
return err
return false, err
}
if ts.Before(cs.DeletionWatermark) {
@@ -252,18 +252,18 @@ func (e *Manager) AdvanceDeletionWatermark(ctx context.Context, ts time.Time) er
logparam.Time("deletionWatermark", cs.DeletionWatermark),
)
return nil
return false, nil
}
blobID := blob.ID(fmt.Sprintf("%v%v", string(DeletionWatermarkBlobPrefix), ts.Unix()))
if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("deletion-watermark")), blob.PutOptions{}); err != nil {
return errors.Wrap(err, "error writing deletion watermark")
return false, errors.Wrap(err, "error writing deletion watermark")
}
e.Invalidate()
return nil
return true, nil
}
// Refresh refreshes information about current epoch.

View File

@@ -851,8 +851,14 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
if indexNum%13 == 0 {
ts := te.ft.NowFunc()().Truncate(time.Second)
require.NoError(t, te.mgr.AdvanceDeletionWatermark(ctx, ts))
require.NoError(t, te.mgr.AdvanceDeletionWatermark(ctx, ts.Add(-time.Second)))
advanced, err := te.mgr.AdvanceDeletionWatermark(ctx, ts)
require.NoError(t, err)
require.True(t, advanced)
advanced, err = te.mgr.AdvanceDeletionWatermark(ctx, ts.Add(-time.Second))
require.NoError(t, err)
require.False(t, advanced)
lastDeletionWatermark = ts
}
}

View File

@@ -13,6 +13,7 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/maintenancestats"
)
// Refresh reloads the committed content indexes.
@@ -39,7 +40,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
}
// CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs.
func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.CompactOptions) error {
func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.CompactOptions) (*maintenancestats.CompactIndexesStats, error) {
// we must hold the lock here to avoid the race with Refresh() which can reload the
// current set of indexes while we process them.
sm.indexesLock.Lock()
@@ -53,19 +54,20 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.Compa
ibm, err := sm.indexBlobManager(ctx)
if err != nil {
return err
return nil, err
}
if err := ibm.Compact(ctx, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
stats, err := ibm.Compact(ctx, opt)
if err != nil {
return nil, errors.Wrap(err, "error performing compaction")
}
// reload indexes after compaction.
if err := sm.loadPackIndexesLocked(ctx); err != nil {
return errors.Wrap(err, "error re-loading indexes")
return nil, errors.Wrap(err, "error re-loading indexes")
}
return nil
return stats, nil
}
// ParseIndexBlob loads entries in a given index blob and returns them.

View File

@@ -442,10 +442,12 @@ func (s *contentManagerSuite) TestIndexCompactionDropsContent(t *testing.T) {
bm = s.newTestContentManagerWithCustomTime(t, st, timeFunc)
// this drops deleted entries, including from index #1
require.NoError(t, bm.CompactIndexes(ctx, indexblob.CompactOptions{
_, err := bm.CompactIndexes(ctx, indexblob.CompactOptions{
DropDeletedBefore: deleteThreshold,
AllIndexes: true,
}))
})
require.NoError(t, err)
require.NoError(t, bm.Flush(ctx))
require.NoError(t, bm.CloseShared(ctx))
@@ -522,7 +524,7 @@ func (s *contentManagerSuite) TestContentManagerConcurrency(t *testing.T) {
validateIndexCount(t, data, 4, 0)
if err := bm4.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if _, err := bm4.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("compaction error: %v", err)
}
@@ -540,7 +542,7 @@ func (s *contentManagerSuite) TestContentManagerConcurrency(t *testing.T) {
verifyContent(ctx, t, bm5, bm2content, seededRandomData(32, 100))
verifyContent(ctx, t, bm5, bm3content, seededRandomData(33, 100))
if err := bm5.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if _, err := bm5.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("compaction error: %v", err)
}
}
@@ -1961,7 +1963,7 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion for
// make sure we can read everything
verifyContentManagerDataSet(ctx, t, mgr, dataSet)
if err := mgr.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if _, err := mgr.CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Fatalf("unable to compact indexes: %v", err)
}

View File

@@ -8,13 +8,14 @@
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/maintenancestats"
)
// Manager is the API of index blob manager as used by content manager.
type Manager interface {
WriteIndexBlobs(ctx context.Context, data []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error)
ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error)
Compact(ctx context.Context, opts CompactOptions) error
Compact(ctx context.Context, opts CompactOptions) (*maintenancestats.CompactIndexesStats, error)
Invalidate()
}

View File

@@ -16,6 +16,7 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenancestats"
)
// V0IndexBlobPrefix is the prefix for all legacy (v0) index blobs.
@@ -148,28 +149,35 @@ func (m *ManagerV0) Invalidate() {
// Compact performs compaction of index blobs by merging smaller ones into larger
// and registering compaction and cleanup blobs in the repository.
func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error {
func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) (*maintenancestats.CompactIndexesStats, error) {
indexBlobs, _, err := m.ListActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing active index blobs")
return nil, errors.Wrap(err, "error listing active index blobs")
}
mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
return nil, errors.Wrap(mperr, "mutable parameters")
}
blobsToCompact := m.getBlobsToCompact(ctx, indexBlobs, opt, mp)
if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
compacted, err := m.compactIndexBlobs(ctx, blobsToCompact, opt)
if err != nil {
return nil, errors.Wrap(err, "error performing compaction")
}
if err := m.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "error cleaning up index blobs")
return nil, errors.Wrap(err, "error cleaning up index blobs")
}
return nil
if compacted {
return &maintenancestats.CompactIndexesStats{
DroppedContentsDeletedBefore: opt.DropDeletedBefore,
}, nil
}
return nil, nil
}
func (m *ManagerV0) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
@@ -489,14 +497,14 @@ func (m *ManagerV0) getBlobsToCompact(ctx context.Context, indexBlobs []Metadata
return nonCompactedBlobs
}
func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata, opt CompactOptions) error {
func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata, opt CompactOptions) (bool, error) {
if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
return nil
return false, nil
}
mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
return false, errors.Wrap(mperr, "mutable parameters")
}
bld := make(index.Builder)
@@ -510,7 +518,7 @@ func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata
blobparam.BlobMetadataList("superseded", indexBlob.Superseded))
if err := addIndexBlobsToBuilder(ctx, m.enc, bld.Add, indexBlob.BlobID); err != nil {
return errors.Wrap(err, "error adding index to builder")
return false, errors.Wrap(err, "error adding index to builder")
}
inputs = append(inputs, indexBlob.Metadata)
@@ -522,23 +530,23 @@ func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata
dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, DefaultIndexShardSize)
if err != nil {
return errors.Wrap(err, "unable to build an index")
return false, errors.Wrap(err, "unable to build an index")
}
defer cleanupShards()
compactedIndexBlobs, err := m.WriteIndexBlobs(ctx, dataShards, "")
if err != nil {
return errors.Wrap(err, "unable to write compacted indexes")
return false, errors.Wrap(err, "unable to write compacted indexes")
}
outputs = append(outputs, compactedIndexBlobs...)
if err := m.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "unable to register compaction")
return false, errors.Wrap(err, "unable to register compaction")
}
return nil
return true, nil
}
func (m *ManagerV0) dropContentsFromBuilder(ctx context.Context, bld index.Builder, opt CompactOptions) {

View File

@@ -15,6 +15,7 @@
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/maintenancestats"
)
// ManagerV1 is the append-only implementation of indexblob.Manager
@@ -61,12 +62,23 @@ func (m *ManagerV1) Invalidate() {
}
// Compact advances the deletion watermark.
func (m *ManagerV1) Compact(ctx context.Context, opt CompactOptions) error {
func (m *ManagerV1) Compact(ctx context.Context, opt CompactOptions) (*maintenancestats.CompactIndexesStats, error) {
if opt.DropDeletedBefore.IsZero() {
return nil
return nil, nil
}
return errors.Wrap(m.epochMgr.AdvanceDeletionWatermark(ctx, opt.DropDeletedBefore), "error advancing deletion watermark")
advanced, err := m.epochMgr.AdvanceDeletionWatermark(ctx, opt.DropDeletedBefore)
if err != nil {
return nil, errors.Wrap(err, "error advancing deletion watermark")
}
if !advanced {
return nil, nil
}
return &maintenancestats.CompactIndexesStats{
DroppedContentsDeletedBefore: opt.DropDeletedBefore,
}, nil
}
// CompactEpoch compacts the provided index blobs and writes a new set of blobs.

View File

@@ -8,10 +8,11 @@
"github.com/kopia/kopia/internal/contentlog/logparam"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/maintenancestats"
)
// dropDeletedContents rewrites indexes while dropping deleted contents above certain age.
func dropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time, safety SafetyParameters) error {
func dropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time, safety SafetyParameters) (*maintenancestats.CompactIndexesStats, error) {
ctx = contentlog.WithParams(ctx,
logparam.String("span:drop-deleted-contents", contentlog.RandomSpanID()))

View File

@@ -17,7 +17,7 @@ func runTaskIndexCompactionQuick(ctx context.Context, runParams RunParameters, s
const maxSmallBlobsForIndexCompaction = 8
return nil, runParams.rep.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{
return runParams.rep.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{
MaxSmallBlobs: maxSmallBlobsForIndexCompaction,
DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety,
})

View File

@@ -444,7 +444,7 @@ func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters
contentlog.Log1(ctx, log, "Found safe time to drop indexes", logparam.Time("safeDropTime", safeDropTime))
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() (maintenancestats.Kind, error) {
return nil, dropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
return dropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
})
}

View File

@@ -58,6 +58,8 @@ func BuildFromExtra(stats Extra) (Summarizer, error) {
result = &AdvanceEpochStats{}
case compactSingleEpochStatsKind:
result = &CompactSingleEpochStats{}
case compactIndexesStatsKind:
result = &CompactIndexesStats{}
default:
return nil, errors.Wrapf(ErrUnSupportedStatKindError, "invalid kind for stats %v", stats)
}

View File

@@ -67,6 +67,16 @@ func TestBuildExtraSuccess(t *testing.T) {
Data: []byte(`{"supersededIndexBlobCount":3,"supersededIndexTotalSize":4096,"epoch":1}`),
},
},
{
name: "CompactIndexesStats",
stats: &CompactIndexesStats{
DroppedContentsDeletedBefore: time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC),
},
expected: Extra{
Kind: compactIndexesStatsKind,
Data: []byte(`{"droppedContentsDeletedBefore":"2025-01-01T00:00:00Z"}`),
},
},
}
for _, tc := range cases {
@@ -169,6 +179,16 @@ func TestBuildFromExtraSuccess(t *testing.T) {
Epoch: 1,
},
},
{
name: "CompactIndexesStats",
stats: Extra{
Kind: compactIndexesStatsKind,
Data: []byte(`{"droppedContentsDeletedBefore":"2025-01-01T00:00:00Z"}`),
},
expected: &CompactIndexesStats{
DroppedContentsDeletedBefore: time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC),
},
},
}
for _, tc := range cases {

View File

@@ -0,0 +1,32 @@
package maintenancestats
import (
"fmt"
"time"
"github.com/kopia/kopia/internal/contentlog"
)
// compactIndexesStatsKind is the kind name that identifies CompactIndexesStats.
// It is the value returned by CompactIndexesStats.Kind and the name of the
// object field emitted by CompactIndexesStats.WriteValueTo.
const compactIndexesStatsKind = "compactIndexesStats"
// CompactIndexesStats are the stats reported by index compaction, which is
// also the operation that drops deleted contents.
type CompactIndexesStats struct {
// DroppedContentsDeletedBefore is the cutoff time recorded for the run; it is
// serialized to JSON as "droppedContentsDeletedBefore".
// NOTE(review): producers appear to copy this from the compaction options'
// drop threshold, so it may be the zero time when compaction ran without
// one - confirm against the index blob managers.
DroppedContentsDeletedBefore time.Time `json:"droppedContentsDeletedBefore"`
}
// WriteValueTo writes the stats to JSONWriter as a single object field named
// after the stats kind, holding the "droppedContentsDeletedBefore" timestamp.
//
// A nil receiver writes nothing. A nil *CompactIndexesStats that has been
// stored in an interface value still compares non-nil and can have its methods
// invoked, so the receiver must be checked before its fields are read.
func (cs *CompactIndexesStats) WriteValueTo(jw *contentlog.JSONWriter) {
	if cs == nil {
		// Return before BeginObjectField so the writer is left untouched and
		// no half-written field is emitted.
		return
	}

	jw.BeginObjectField(cs.Kind())
	jw.TimeField("droppedContentsDeletedBefore", cs.DroppedContentsDeletedBefore)
	jw.EndObject()
}
// Summary generates a human readable summary for the stats.
//
// It returns "Dropped contents deleted before <time>" when a cutoff time was
// recorded. For a nil receiver, or when DroppedContentsDeletedBefore is the
// zero time, it returns "No contents dropped based on deletion time" instead
// of panicking or reporting a meaningless year-0001 cutoff.
func (cs *CompactIndexesStats) Summary() string {
	// A nil *CompactIndexesStats wrapped in an interface is still callable, so
	// guard the dereference; a zero cutoff means no age-based drop was requested.
	if cs == nil || cs.DroppedContentsDeletedBefore.IsZero() {
		return "No contents dropped based on deletion time"
	}

	return fmt.Sprintf("Dropped contents deleted before %v", cs.DroppedContentsDeletedBefore)
}
// Kind returns the kind name for the stats, which is always the
// compactIndexesStatsKind constant. It does not read the receiver.
func (cs *CompactIndexesStats) Kind() string {
return compactIndexesStatsKind
}

View File

@@ -133,7 +133,7 @@ func (s *formatSpecificTestSuite) TestPackingSimple(t *testing.T) {
verify(ctx, t, env.RepositoryWriter, oid2a, []byte(content2), "packed-object-2")
verify(ctx, t, env.RepositoryWriter, oid3a, []byte(content3), "packed-object-3")
if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if _, err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("optimize error: %v", err)
}
@@ -143,7 +143,7 @@ func (s *formatSpecificTestSuite) TestPackingSimple(t *testing.T) {
verify(ctx, t, env.RepositoryWriter, oid2a, []byte(content2), "packed-object-2")
verify(ctx, t, env.RepositoryWriter, oid3a, []byte(content3), "packed-object-3")
if err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
if _, err := env.RepositoryWriter.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("optimize error: %v", err)
}

View File

@@ -439,9 +439,9 @@ func compact(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.R
log.Debug("compact()")
return errors.Wrapf(
r.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1}),
"compact()")
_, err := r.ContentManager().CompactIndexes(ctx, indexblob.CompactOptions{MaxSmallBlobs: 1})
return errors.Wrapf(err, "compact()")
}
func flush(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {