feat(general): content-to-pack consistency checks in maintenance (#4832)

Probabilistically perform content-to-pack consistency checks in
different maintenance phases.
This commit is contained in:
Julio Lopez
2025-09-23 19:08:40 -07:00
committed by GitHub
parent aaae7e8004
commit 3f03531303
3 changed files with 84 additions and 11 deletions

View File

@@ -0,0 +1,73 @@
package maintenance
import (
"context"
"math/rand/v2"
"os"
"strconv"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
)
// Checks the consistency of the mapping from content index entries to packs,
// to verify that all the referenced packs are present in storage.
func checkContentIndexToPacks(ctx context.Context, r content.Reader) error {
const verifyContentsDefaultParallelism = 5
opts := content.VerifyOptions{
ContentIDRange: index.AllIDs,
ContentReadPercentage: 0,
IncludeDeletedContents: true,
ContentIterateParallelism: verifyContentsDefaultParallelism,
}
if err := r.VerifyContents(ctx, opts); err != nil {
return errors.Wrap(err, "maintenance verify contents")
}
return nil
}
func shouldRunContentIndexVerify(ctx context.Context) bool {
const envName = "KOPIA_MAINTENANCE_CONTENT_VERIFY_PERCENTAGE"
v := os.Getenv(envName)
if v == "" {
return false
}
percentage, err := strconv.ParseFloat(v, 64)
if err != nil {
log(ctx).Warnf("The '%s' environment variable appears to have a non numeric value: '%q', %s", envName, v, err)
return false
}
if rand.Float64() < percentage/100 { //nolint:gosec
return true
}
return false
}
func reportRunAndMaybeCheckContentIndex(ctx context.Context, rep repo.DirectRepositoryWriter, taskType TaskType, s *Schedule, run func() error) error {
if !shouldRunContentIndexVerify(ctx) {
return ReportRun(ctx, rep, taskType, s, run)
}
return ReportRun(ctx, rep, taskType, s, func() error {
if err := checkContentIndexToPacks(ctx, rep.ContentReader()); err != nil {
return err
}
if err := run(); err != nil {
return err
}
return checkContentIndexToPacks(ctx, rep.ContentReader())
})
}

View File

@@ -8,7 +8,7 @@
// runTaskIndexCompactionQuick rewrites index blobs to reduce their count but does not drop any contents.
func runTaskIndexCompactionQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskIndexCompaction, s, func() error {
log(ctx).Info("Compacting indexes...")
const maxSmallBlobsForIndexCompaction = 8

View File

@@ -334,14 +334,14 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
}
func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error {
return ReportRun(ctx, runParams.rep, TaskEpochAdvance, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochAdvance, s, func() error {
log(ctx).Info("Cleaning up no-longer-needed epoch markers...")
return errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker")
})
}
func runTaskEpochMaintenanceQuick(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error {
err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Info("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
})
@@ -365,7 +365,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s
}
// compact a single epoch
if err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
if err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Info("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
}); err != nil {
@@ -377,7 +377,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s
}
// compact range
if err := ReportRun(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error {
if err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error {
log(ctx).Info("Attempting to compact a range of epoch indexes ...")
return errors.Wrap(em.MaybeGenerateRangeCheckpoint(ctx), "error creating epoch range indexes")
@@ -395,7 +395,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s
return err
}
return ReportRun(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error {
log(ctx).Info("Cleaning up old index blobs which have already been compacted...")
return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error removing superseded epoch index blobs")
})
@@ -417,13 +417,13 @@ func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters
log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime)
return ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error {
return dropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
})
}
func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error {
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: index.AllPrefixedIDs,
PackPrefix: content.PackBlobIDPrefixSpecial,
@@ -433,7 +433,7 @@ func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s
}
func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error {
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: index.AllIDs,
ShortPacks: true,
@@ -442,7 +442,7 @@ func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s
}
func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error {
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
NotAfterTime: runParams.MaintenanceStartTime,
Parallel: runParams.Params.ListParallelism,
@@ -453,7 +453,7 @@ func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters
}
func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error {
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error {
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
NotAfterTime: runParams.MaintenanceStartTime,
Prefix: content.PackBlobIDPrefixSpecial,