From 3f03531303df2bb954dc9dcc25a43e12ea2860d3 Mon Sep 17 00:00:00 2001 From: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> Date: Tue, 23 Sep 2025 19:08:40 -0700 Subject: [PATCH] feat(general): content-to-pack consistency checks in maintenance (#4832) Probabilistically perform content-to-pack consistency checks in different maintenance phases. --- .../content_index_to_pack_check.go | 73 +++++++++++++++++++ repo/maintenance/index_compaction.go | 2 +- repo/maintenance/maintenance_run.go | 20 ++--- 3 files changed, 84 insertions(+), 11 deletions(-) create mode 100644 repo/maintenance/content_index_to_pack_check.go diff --git a/repo/maintenance/content_index_to_pack_check.go b/repo/maintenance/content_index_to_pack_check.go new file mode 100644 index 000000000..8c8b8c14f --- /dev/null +++ b/repo/maintenance/content_index_to_pack_check.go @@ -0,0 +1,73 @@ +package maintenance + +import ( + "context" + "math/rand/v2" + "os" + "strconv" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/content/index" +) + +// Checks the consistency of the mapping from content index entries to packs, +// to verify that all the referenced packs are present in storage. 
+func checkContentIndexToPacks(ctx context.Context, r content.Reader) error {
+	// Parallelism handed to VerifyContents for iterating over index entries.
+	const verifyContentsDefaultParallelism = 5
+
+	// The check covers every content ID, including deleted contents.
+	// NOTE(review): ContentReadPercentage of 0 presumably means no content
+	// payload is read, so only pack presence is verified; confirm against
+	// content.VerifyOptions.
+	opts := content.VerifyOptions{
+		ContentIDRange:            index.AllIDs,
+		ContentReadPercentage:     0,
+		IncludeDeletedContents:    true,
+		ContentIterateParallelism: verifyContentsDefaultParallelism,
+	}
+
+	if err := r.VerifyContents(ctx, opts); err != nil {
+		return errors.Wrap(err, "maintenance verify contents")
+	}
+
+	return nil
+}
+
+// shouldRunContentIndexVerify reports whether the content-index-to-pack check
+// should run for the current invocation.
+//
+// The check is opt-in: it returns false unless the
+// KOPIA_MAINTENANCE_CONTENT_VERIFY_PERCENTAGE environment variable is set to a
+// number, which is interpreted as the percentage of invocations for which the
+// function returns true. A value that cannot be parsed as a float is logged as
+// a warning and treated as "disabled".
+func shouldRunContentIndexVerify(ctx context.Context) bool {
+	const envName = "KOPIA_MAINTENANCE_CONTENT_VERIFY_PERCENTAGE"
+
+	v := os.Getenv(envName)
+	if v == "" {
+		return false
+	}
+
+	percentage, err := strconv.ParseFloat(v, 64)
+	if err != nil {
+		// %q already quotes the value, so it is not wrapped in additional quotes.
+		log(ctx).Warnf("The '%s' environment variable appears to have a non numeric value: %q, %s", envName, v, err)
+
+		return false
+	}
+
+	// rand.Float64 returns a value in [0.0, 1.0), so percentages <= 0 never
+	// select the check and percentages >= 100 always select it.
+	return rand.Float64() < percentage/100 //nolint:gosec
+}
+
+// reportRunAndMaybeCheckContentIndex executes run through ReportRun and, when
+// shouldRunContentIndexVerify selects this invocation, brackets run with a
+// content-index-to-pack check before and after it.
+//
+// A failing pre-check prevents run from being executed, and a failing run
+// skips the post-check. Check failures are annotated with the phase and the
+// task type, so that an inconsistency that existed before the task can be told
+// apart from one that appeared while the task was running.
+func reportRunAndMaybeCheckContentIndex(ctx context.Context, rep repo.DirectRepositoryWriter, taskType TaskType, s *Schedule, run func() error) error {
+	if !shouldRunContentIndexVerify(ctx) {
+		return ReportRun(ctx, rep, taskType, s, run)
+	}
+
+	return ReportRun(ctx, rep, taskType, s, func() error {
+		if err := checkContentIndexToPacks(ctx, rep.ContentReader()); err != nil {
+			return errors.Wrapf(err, "content index check before %v", taskType)
+		}
+
+		if err := run(); err != nil {
+			return err
+		}
+
+		// errors.Wrapf returns nil when the wrapped error is nil.
+		return errors.Wrapf(checkContentIndexToPacks(ctx, rep.ContentReader()), "content index check after %v", taskType)
+	})
+}
diff --git a/repo/maintenance/index_compaction.go b/repo/maintenance/index_compaction.go
index a8341ce09..7c21a5571 100644
--- a/repo/maintenance/index_compaction.go
+++ b/repo/maintenance/index_compaction.go
@@ -8,7 +8,7 @@
 
 // runTaskIndexCompactionQuick rewrites index blobs to reduce their count but does not drop any contents.
func runTaskIndexCompactionQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { - return ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskIndexCompaction, s, func() error { log(ctx).Info("Compacting indexes...") const maxSmallBlobsForIndexCompaction = 8 diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 1f4557b05..913b3299b 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -334,14 +334,14 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul } func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error { - return ReportRun(ctx, runParams.rep, TaskEpochAdvance, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochAdvance, s, func() error { log(ctx).Info("Cleaning up no-longer-needed epoch markers...") return errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker") }) } func runTaskEpochMaintenanceQuick(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error { - err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { + err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { log(ctx).Info("Compacting an eligible uncompacted epoch...") return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch") }) @@ -365,7 +365,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s } // compact a single epoch - if err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { + if err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { log(ctx).Info("Compacting an eligible uncompacted epoch...") return 
errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch") }); err != nil { @@ -377,7 +377,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s } // compact range - if err := ReportRun(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error { + if err := reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error { log(ctx).Info("Attempting to compact a range of epoch indexes ...") return errors.Wrap(em.MaybeGenerateRangeCheckpoint(ctx), "error creating epoch range indexes") @@ -395,7 +395,7 @@ func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s return err } - return ReportRun(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error { log(ctx).Info("Cleaning up old index blobs which have already been compacted...") return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error removing superseded epoch index blobs") }) @@ -417,13 +417,13 @@ func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime) - return ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error { return dropDeletedContents(ctx, runParams.rep, safeDropTime, safety) }) } func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { - return ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error { return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ ContentIDRange: index.AllPrefixedIDs, PackPrefix: content.PackBlobIDPrefixSpecial, @@ -433,7 
+433,7 @@ func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s } func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { - return ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error { return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ ContentIDRange: index.AllIDs, ShortPacks: true, @@ -442,7 +442,7 @@ func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s } func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { - return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error { _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ NotAfterTime: runParams.MaintenanceStartTime, Parallel: runParams.Params.ListParallelism, @@ -453,7 +453,7 @@ func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters } func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { - return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error { + return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error { _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ NotAfterTime: runParams.MaintenanceStartTime, Prefix: content.PackBlobIDPrefixSpecial,