fix(general): run epoch maintenance for quick maintenance (#4185)

Changes:

* test that quick maintenance runs when epoch manager is enabled

* fix(general): run epoch maintenance for quick maintenance
  Change based on a known-to-be-safe portion of the changes proposed in #3901

* cleanup: pass epoch manager to `runTaskEpochMaintenanceQuick`
The caller needs to get the epoch manager to determine
whether or not the epoch manager is enabled. The caller
now passes the epoch manager to `runTaskEpochMaintenanceQuick`

* wrap the error inside runTaskEpochMaintenanceQuick
This commit is contained in:
Julio López
2024-10-21 12:06:50 -07:00
committed by GitHub
parent d494feedf3
commit 90c4a3c978
2 changed files with 230 additions and 23 deletions

View File

@@ -0,0 +1,217 @@
package maintenance_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
)
// Ensure quick maintenance runs when the epoch manager is enabled.
func TestQuickMaintenanceRunWithEpochManager(t *testing.T) {
t.Parallel()
ctx, env := repotesting.NewEnvironment(t, format.FormatVersion3)
// set the repository owner since it is not set by NewEnvironment
maintParams, err := maintenance.GetParams(ctx, env.Repository)
require.NoError(t, err)
co := env.Repository.ClientOptions()
require.NotZero(t, co)
maintParams.Owner = co.UsernameAtHost()
err = maintenance.SetParams(ctx, env.RepositoryWriter, maintParams)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.Flush(ctx))
// verify the owner was set
maintParams, err = maintenance.GetParams(ctx, env.Repository)
require.NoError(t, err)
require.Equal(t, co.UsernameAtHost(), maintParams.Owner)
// verify epoch manager is enabled
dr, isDirect := env.Repository.(repo.DirectRepository)
require.True(t, isDirect)
require.NotNil(t, dr)
fm := dr.FormatManager()
require.NotNil(t, fm)
mp, err := fm.GetMutableParameters(ctx)
require.NoError(t, err)
require.True(t, mp.EpochParameters.Enabled)
// verify quick maintenance has NOT run yet
sch, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
require.NoError(t, err)
require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
require.True(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
err = snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeQuick, false, maintenance.SafetyFull)
require.NoError(t, err)
// verify quick maintenance was run
sch, err = maintenance.GetSchedule(ctx, env.RepositoryWriter)
require.NoError(t, err)
require.NotEmpty(t, sch.Runs, "maintenance runs")
require.False(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
verifyEpochTasksRanInQuickMaintenance(t, ctx, env.RepositoryWriter)
}
func TestQuickMaintenanceAdvancesEpoch(t *testing.T) {
t.Parallel()
ft := faketime.NewAutoAdvance(time.Date(2024, time.October, 18, 0, 0, 0, 0, time.UTC), time.Second)
ctx, env := repotesting.NewEnvironment(t, format.FormatVersion3, repotesting.Options{
OpenOptions: func(o *repo.Options) {
o.TimeNowFunc = ft.NowFunc()
},
})
// set the repository owner since it is not set by NewEnvironment
maintParams, err := maintenance.GetParams(ctx, env.Repository)
require.NoError(t, err)
co := env.Repository.ClientOptions()
require.NotZero(t, co)
maintParams.Owner = co.UsernameAtHost()
maintenance.SetParams(ctx, env.RepositoryWriter, maintParams)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.Flush(ctx))
maintParams, err = maintenance.GetParams(ctx, env.Repository)
require.NoError(t, err)
require.Equal(t, co.UsernameAtHost(), maintParams.Owner)
// verify epoch manager is enabled
dr, isDirect := env.Repository.(repo.DirectRepository)
require.True(t, isDirect)
require.NotNil(t, dr)
fm := dr.FormatManager()
require.NotNil(t, fm)
mp, err := fm.GetMutableParameters(ctx)
require.NoError(t, err)
require.True(t, mp.EpochParameters.Enabled, "epoch manager not enabled")
emgr, enabled, err := dr.ContentReader().EpochManager(ctx)
require.NoError(t, err)
require.True(t, enabled, "epoch manager not enabled")
countThreshold := mp.EpochParameters.EpochAdvanceOnCountThreshold
epochDuration := mp.EpochParameters.MinEpochDuration
err = env.Repository.Refresh(ctx)
require.NoError(t, err)
// write countThreshold index blobs: writing an object & flushing creates
// an index blob
for c := range countThreshold {
err = repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) (err error) {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
require.NotNil(t, ow)
defer func() {
cerr := ow.Close()
err = errors.Join(err, cerr)
}()
_, err = fmt.Fprintf(ow, "%v-%v", 0, c) // epoch count, object count
if err != nil {
return err
}
_, err = ow.Result() // force content write
return err
})
require.NoError(t, err)
}
// advance time and write more index to force epoch advancement on maintenance
ft.Advance(epochDuration + time.Second)
ow := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
require.NotNil(t, ow)
_, err = fmt.Fprintf(ow, "%v-%v", 0, "last-object-in-epoch")
require.NoError(t, err)
_, err = ow.Result() // force content write
require.NoError(t, err)
err = ow.Close()
require.NoError(t, err)
// verify that there are enough index blobs to advance the epoch
epochSnap, err := emgr.Current(ctx)
require.NoError(t, err)
err = env.RepositoryWriter.Flush(ctx)
require.NoError(t, err)
require.Zero(t, epochSnap.WriteEpoch, "write epoch was advanced")
require.GreaterOrEqual(t, len(epochSnap.UncompactedEpochSets[0]), countThreshold, "not enough index blobs were written")
// verify quick maintenance has NOT run yet
sch, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
require.NoError(t, err)
require.True(t, sch.NextFullMaintenanceTime.IsZero(), "unexpected NextFullMaintenanceTime")
require.True(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
err = snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeQuick, false, maintenance.SafetyFull)
require.NoError(t, err)
verifyEpochTasksRanInQuickMaintenance(t, ctx, env.RepositoryWriter)
// verify epoch was advanced
err = emgr.Refresh(ctx)
require.NoError(t, err)
epochSnap, err = emgr.Current(ctx)
require.NoError(t, err)
require.Positive(t, epochSnap.WriteEpoch, "write epoch was NOT advanced")
}
func verifyEpochTasksRanInQuickMaintenance(t *testing.T, ctx context.Context, rep repo.DirectRepository) {
t.Helper()
// verify quick maintenance ran
sch, err := maintenance.GetSchedule(ctx, rep)
require.NoError(t, err)
require.False(t, sch.NextQuickMaintenanceTime.IsZero(), "unexpected NextQuickMaintenanceTime")
require.NotEmpty(t, sch.Runs, "quick maintenance did not run")
// note: this does not work => require.Contains(t, sch.Runs, maintenance.TaskEpochAdvance)
r, exists := sch.Runs[maintenance.TaskEpochAdvance]
require.True(t, exists)
require.NotEmpty(t, r)
r, exists = sch.Runs[maintenance.TaskEpochCompactSingle]
require.True(t, exists)
require.NotEmpty(t, r)
}

View File

@@ -252,21 +252,22 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters)
}
func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
_, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
s, err := GetSchedule(ctx, runParams.rep)
if err != nil {
return errors.Wrap(err, "unable to get schedule")
}
em, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if ok {
log(ctx).Debug("quick maintenance not required for epoch manager")
return nil
log(ctx).Debug("running quick epoch maintenance only")
return runTaskEpochMaintenanceQuick(ctx, em, runParams, s)
}
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}
s, err := GetSchedule(ctx, runParams.rep)
if err != nil {
return errors.Wrap(err, "unable to get schedule")
}
if shouldQuickRewriteContents(s, safety) {
// find 'q' packs that are less than 80% full and rewrite contents in them into
// new consolidated packs, orphaning old packs in the process.
@@ -299,10 +300,6 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
notDeletingOrphanedBlobs(ctx, s, safety)
}
if err := runTaskEpochMaintenanceQuick(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error running quick epoch maintenance tasks")
}
// consolidate many smaller indexes into fewer larger ones.
if err := runTaskIndexCompactionQuick(ctx, runParams, s, safety); err != nil {
return errors.Wrap(err, "error performing index compaction")
@@ -343,16 +340,7 @@ func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunPa
})
}
func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}
if !hasEpochManager {
return nil
}
func runTaskEpochMaintenanceQuick(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error {
err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Info("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
@@ -361,7 +349,9 @@ func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters,
return err
}
return runTaskEpochAdvance(ctx, em, runParams, s)
err = runTaskEpochAdvance(ctx, em, runParams, s)
return errors.Wrap(err, "error to advance epoch in quick epoch maintenance task")
}
func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s *Schedule) error {