From 90c4a3c978d2e6ce1c97184e4e6d419f00060734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Mon, 21 Oct 2024 12:06:50 -0700 Subject: [PATCH] fix(general): run epoch maintenance for quick maintenance (#4185) Changes: * test that quick maintenance runs when epoch manager is enabled * fix(general): run epoch maintenance for quick maintenance Change based on a known-to-be-safe portion of the changes proposed in #3901 * cleanup: pass epoch manager to `runTaskEpochMaintenanceQuick` The caller needs to get the epoch manager to determine whether or not the epoch manager is enabled. The caller now passes the epoch manager to `runTaskEpochMaintenanceQuick` * wrap the error inside runTaskEpochMaintenanceQuick --- repo/maintenance/maintenance_quick_test.go | 217 +++++++++++++++++++++ repo/maintenance/maintenance_run.go | 36 ++-- 2 files changed, 230 insertions(+), 23 deletions(-) create mode 100644 repo/maintenance/maintenance_quick_test.go diff --git a/repo/maintenance/maintenance_quick_test.go b/repo/maintenance/maintenance_quick_test.go new file mode 100644 index 000000000..7c5a4e309 --- /dev/null +++ b/repo/maintenance/maintenance_quick_test.go @@ -0,0 +1,217 @@ +package maintenance_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/maintenance" + "github.com/kopia/kopia/repo/object" + "github.com/kopia/kopia/snapshot/snapshotmaintenance" +) + +// Ensure quick maintenance runs when the epoch manager is enabled. 
+func TestQuickMaintenanceRunWithEpochManager(t *testing.T) {
+	t.Parallel()
+
+	ctx, env := repotesting.NewEnvironment(t, format.FormatVersion3)
+
+	// set the repository owner since it is not set by NewEnvironment
+	maintParams, err := maintenance.GetParams(ctx, env.Repository)
+	require.NoError(t, err)
+
+	co := env.Repository.ClientOptions()
+	require.NotZero(t, co)
+
+	maintParams.Owner = co.UsernameAtHost() // this client becomes the maintenance owner
+
+	err = maintenance.SetParams(ctx, env.RepositoryWriter, maintParams)
+	require.NoError(t, err)
+
+	require.NoError(t, env.RepositoryWriter.Flush(ctx)) // flush before reading the params back
+
+	// verify the owner was set
+	maintParams, err = maintenance.GetParams(ctx, env.Repository)
+	require.NoError(t, err)
+	require.Equal(t, co.UsernameAtHost(), maintParams.Owner)
+
+	// verify epoch manager is enabled (checked through the format manager's mutable parameters)
+	dr, isDirect := env.Repository.(repo.DirectRepository)
+	require.True(t, isDirect)
+	require.NotNil(t, dr)
+
+	fm := dr.FormatManager()
+	require.NotNil(t, fm)
+
+	mp, err := fm.GetMutableParameters(ctx)
+	require.NoError(t, err)
+	require.True(t, mp.EpochParameters.Enabled)
+
+	// verify quick maintenance has NOT run yet: both next-run times are still zero
+	sch, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
+
+	require.NoError(t, err)
+	require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
+	require.True(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
+
+	err = snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeQuick, false, maintenance.SafetyFull)
+	require.NoError(t, err)
+
+	// verify quick maintenance was run: runs are recorded and only the quick schedule is set
+	sch, err = maintenance.GetSchedule(ctx, env.RepositoryWriter)
+
+	require.NoError(t, err)
+
+	require.NotEmpty(t, sch.Runs, "maintenance runs")
+	require.False(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
+	require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
+
+	verifyEpochTasksRanInQuickMaintenance(t, ctx, env.RepositoryWriter) // both epoch tasks must be recorded
+}
+
+// TestQuickMaintenanceAdvancesEpoch writes enough index blobs to meet the epoch
+// advance count threshold and checks that quick maintenance advances the write epoch.
+func TestQuickMaintenanceAdvancesEpoch(t *testing.T) {
+	t.Parallel()
+
+	ft := faketime.NewAutoAdvance(time.Date(2024, time.October, 18, 0, 0, 0, 0, time.UTC), time.Second)
+	ctx, env := repotesting.NewEnvironment(t, format.FormatVersion3, repotesting.Options{
+		OpenOptions: func(o *repo.Options) {
+			o.TimeNowFunc = ft.NowFunc() // repository time comes from the fake clock
+		},
+	})
+
+	// set the repository owner since it is not set by NewEnvironment
+	maintParams, err := maintenance.GetParams(ctx, env.Repository)
+	require.NoError(t, err)
+
+	co := env.Repository.ClientOptions()
+	require.NotZero(t, co)
+
+	maintParams.Owner = co.UsernameAtHost()
+	err = maintenance.SetParams(ctx, env.RepositoryWriter, maintParams) // capture the error so the check below tests this call
+	require.NoError(t, err)
+
+	require.NoError(t, env.RepositoryWriter.Flush(ctx))
+
+	maintParams, err = maintenance.GetParams(ctx, env.Repository)
+	require.NoError(t, err)
+	require.Equal(t, co.UsernameAtHost(), maintParams.Owner)
+
+	// verify epoch manager is enabled
+	dr, isDirect := env.Repository.(repo.DirectRepository)
+	require.True(t, isDirect)
+	require.NotNil(t, dr)
+
+	fm := dr.FormatManager()
+	require.NotNil(t, fm)
+
+	mp, err := fm.GetMutableParameters(ctx)
+	require.NoError(t, err)
+	require.True(t, mp.EpochParameters.Enabled, "epoch manager not enabled")
+
+	emgr, enabled, err := dr.ContentReader().EpochManager(ctx)
+	require.NoError(t, err)
+	require.True(t, enabled, "epoch manager not enabled")
+
+	countThreshold := mp.EpochParameters.EpochAdvanceOnCountThreshold
+	epochDuration := mp.EpochParameters.MinEpochDuration
+
+	err = env.Repository.Refresh(ctx)
+	require.NoError(t, err)
+
+	// write countThreshold index blobs: writing an object & flushing creates an index blob
+	for c := range countThreshold {
+		err = repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) (err error) {
+			ow := w.NewObjectWriter(ctx, object.WriterOptions{})
+			require.NotNil(t, ow)
+
+			defer func() {
+				cerr := ow.Close()
+				err = errors.Join(err, cerr) // fold the Close error into the named result
+			}()
+
+			_, err = fmt.Fprintf(ow, "%v-%v", 0, c) // epoch count, object count
+			if err != nil {
+				return err
+			}
+
+			_, err = ow.Result() // force content write
+
+			return err
+		})
+
+		require.NoError(t, err)
+	}
+
+	// advance time and write more index to force epoch advancement on maintenance
+	ft.Advance(epochDuration + time.Second)
+	ow := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
+	require.NotNil(t, ow)
+
+	_, err = fmt.Fprintf(ow, "%v-%v", 0, "last-object-in-epoch")
+	require.NoError(t, err)
+
+	_, err = ow.Result() // force content write
+	require.NoError(t, err)
+
+	err = ow.Close()
+	require.NoError(t, err)
+
+	// verify that there are enough index blobs to advance the epoch
+	epochSnap, err := emgr.Current(ctx)
+	require.NoError(t, err)
+
+	err = env.RepositoryWriter.Flush(ctx)
+	require.NoError(t, err)
+
+	require.Zero(t, epochSnap.WriteEpoch, "write epoch was advanced") // still epoch 0 before maintenance
+	require.GreaterOrEqual(t, len(epochSnap.UncompactedEpochSets[0]), countThreshold, "not enough index blobs were written")
+
+	// verify quick maintenance has NOT run yet
+	sch, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
+	require.NoError(t, err)
+	require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
+	require.True(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
+
+	err = snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeQuick, false, maintenance.SafetyFull)
+	require.NoError(t, err)
+
+	verifyEpochTasksRanInQuickMaintenance(t, ctx, env.RepositoryWriter)
+
+	// verify epoch was advanced
+	err = emgr.Refresh(ctx)
+	require.NoError(t, err)
+
+	epochSnap, err = emgr.Current(ctx)
+	require.NoError(t, err)
+	require.Positive(t, epochSnap.WriteEpoch, "write epoch was NOT advanced")
+}
+
+// verifyEpochTasksRanInQuickMaintenance asserts the schedule records runs of both quick epoch maintenance tasks.
+func verifyEpochTasksRanInQuickMaintenance(t *testing.T, ctx context.Context, rep repo.DirectRepository) {
+	t.Helper()
+
+	// verify quick maintenance ran
+	sch, err := maintenance.GetSchedule(ctx, rep)
+	require.NoError(t, err)
+	require.False(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
+	require.NotEmpty(t, sch.Runs, "quick maintenance did not run")
+
+	// note: this does not work => require.Contains(t, sch.Runs, maintenance.TaskEpochAdvance)
+	r, exists := sch.Runs[maintenance.TaskEpochAdvance]
+	require.True(t, exists)
+	require.NotEmpty(t, r)
+
+	r, exists = sch.Runs[maintenance.TaskEpochCompactSingle]
+	require.True(t, exists)
+	require.NotEmpty(t, r)
+}
diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go
index 2431c0142..f2638d677 100644
--- a/repo/maintenance/maintenance_run.go
+++ b/repo/maintenance/maintenance_run.go
@@ -252,21 +252,22 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters)
 }
 
 func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
-	_, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
+	s, err := GetSchedule(ctx, runParams.rep)
+	if err != nil {
+		return errors.Wrap(err, "unable to get schedule")
+	}
+
+	em, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
 	if ok {
-		log(ctx).Debug("quick maintenance not required for epoch manager")
-		return nil
+		log(ctx).Debug("running quick epoch maintenance only")
+
+		return runTaskEpochMaintenanceQuick(ctx, em, runParams, s)
 	}
 
 	if emerr != nil {
 		return errors.Wrap(emerr, "epoch manager")
 	}
 
-	s, err := GetSchedule(ctx, runParams.rep)
-	if err != nil {
-		return errors.Wrap(err, "unable to get schedule")
-	}
-
 	if shouldQuickRewriteContents(s, safety) {
 		// find 'q' packs that are less than 80% full and rewrite contents in them into
 		// new consolidated packs, orphaning old packs in the process.
@@ -299,10 +300,6 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa notDeletingOrphanedBlobs(ctx, s, safety) } - if err := runTaskEpochMaintenanceQuick(ctx, runParams, s); err != nil { - return errors.Wrap(err, "error running quick epoch maintenance tasks") - } - // consolidate many smaller indexes into fewer larger ones. if err := runTaskIndexCompactionQuick(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error performing index compaction") @@ -343,16 +340,7 @@ func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunPa }) } -func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, s *Schedule) error { - em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx) - if emerr != nil { - return errors.Wrap(emerr, "epoch manager") - } - - if !hasEpochManager { - return nil - } - +func runTaskEpochMaintenanceQuick(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error { err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { log(ctx).Info("Compacting an eligible uncompacted epoch...") return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch") @@ -361,7 +349,9 @@ func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, return err } - return runTaskEpochAdvance(ctx, em, runParams, s) + err = runTaskEpochAdvance(ctx, em, runParams, s) + + return errors.Wrap(err, "error to advance epoch in quick epoch maintenance task") } func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s *Schedule) error {