diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 526177acf..1694e47af 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -209,49 +209,48 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa return errors.Wrap(err, "unable to get schedule") } - if shouldRewriteContents(s) { + if shouldQuickRewriteContents(s) { // find 'q' packs that are less than 80% full and rewrite contents in them into // new consolidated packs, orphaning old packs in the process. - if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error { - return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ - ContentIDRange: content.AllPrefixedIDs, - PackPrefix: content.PackBlobIDPrefixSpecial, - ShortPacks: true, - }, safety) - }); err != nil { + if err := runTaskRewriteContentsQuick(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error rewriting metadata contents") } } if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) { - // delete orphaned 'q' packs after some time. - if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error { - _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ - Prefix: content.PackBlobIDPrefixSpecial, - }, safety) - return err - }); err != nil { + var err error + + // time to delete orphaned blobs after last rewrite, + // if the last rewrite was full (started as part of full maintenance) we must complete it by + // running full orphaned blob deletion, otherwise next quick maintenance will start a quick rewrite + // and we'd never delete blobs orphaned by full rewrite. + if hadRecentFullRewrite(s) { + err = runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety) + } else { + err = runTaskDeleteOrphanedBlobsQuick(ctx, runParams, s, safety) + } + + if err != nil { return errors.Wrap(err, "error deleting unreferenced metadata blobs") } } // consolidate many smaller indexes into fewer larger ones. - if err := ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error { - return IndexCompaction(ctx, runParams.rep, safety) - }); err != nil { + if err := runTaskIndexCompaction(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error performing index compaction") } return nil } -func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { - var safeDropTime time.Time +func runTaskIndexCompaction(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + return ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error { + return IndexCompaction(ctx, runParams.rep, safety) + }) +} - s, err := GetSchedule(ctx, runParams.rep) - if err != nil { - return errors.Wrap(err, "unable to get schedule") - } +func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + var safeDropTime time.Time if safety.RequireTwoGCCycles { safeDropTime = findSafeDropTime(s.Runs[TaskSnapshotGarbageCollection], safety) @@ -259,39 +258,76 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf safeDropTime = runParams.rep.Time() } - if !safeDropTime.IsZero() { - log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime) - - // rewrite indexes by dropping content entries that have been marked - // as deleted for a long time - if err := ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error { - return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety) - }); err != nil { - return errors.Wrap(err, "error dropping deleted contents") - } - } else { + if safeDropTime.IsZero() { log(ctx).Infof("Not enough time has passed since previous successful Snapshot GC. Will try again next time.") + return nil } - if shouldRewriteContents(s) { + log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime) + + return ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error { + return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety) + }) +} + +func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + return ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error { + return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ + ContentIDRange: content.AllPrefixedIDs, + PackPrefix: content.PackBlobIDPrefixSpecial, + ShortPacks: true, + }, safety) + }) +} + +func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + return ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error { + return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ + ContentIDRange: content.AllIDs, + ShortPacks: true, + }, safety) + }) +} + +func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error { + _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety) + return err + }) +} + +func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error { + return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error { + _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ + Prefix: content.PackBlobIDPrefixSpecial, + }, safety) + return err + }) +} + +func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { + s, err := GetSchedule(ctx, runParams.rep) + if err != nil { + return errors.Wrap(err, "unable to get schedule") + } + + // rewrite indexes by dropping content entries that have been marked + // as deleted for a long time + if err := runTaskDropDeletedContentsFull(ctx, runParams, s, safety); err != nil { + return errors.Wrap(err, "error dropping deleted contents") + } + + if shouldFullRewriteContents(s) { // find packs that are less than 80% full and rewrite contents in them into // new consolidated packs, orphaning old packs in the process. - if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error { - return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ - ContentIDRange: content.AllIDs, - ShortPacks: true, - }, safety) - }); err != nil { + if err := runTaskRewriteContentsFull(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error rewriting contents in short packs") } } if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) { // delete orphaned packs after some time. - if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error { - _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety) - return err - }); err != nil { + if err := runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error deleting unreferenced blobs") } } @@ -302,7 +338,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf // shouldRewriteContents returns true if it's currently ok to rewrite contents. // since each content rewrite will require deleting of orphaned blobs after some time passes, // we don't want to starve blob deletion by constantly doing rewrites. -func shouldRewriteContents(s *Schedule) bool { +func shouldQuickRewriteContents(s *Schedule) bool { latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull], s.Runs[TaskRewriteContentsQuick]) latestBlobDeleteTime := maxEndTime(s.Runs[TaskDeleteOrphanedBlobsFull], s.Runs[TaskDeleteOrphanedBlobsQuick]) @@ -314,6 +350,23 @@ func shouldRewriteContents(s *Schedule) bool { return latestBlobDeleteTime.After(latestContentRewriteEndTime) } +// shouldFullRewriteContents returns true if it's currently ok to rewrite contents. +// since each content rewrite will require deleting of orphaned blobs after some time passes, +// we don't want to starve blob deletion by constantly doing rewrites. +func shouldFullRewriteContents(s *Schedule) bool { + // NOTE - we're not looking at TaskRewriteContentsQuick here, this allows full rewrite to sometimes + // follow quick rewrite. + latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull]) + latestBlobDeleteTime := maxEndTime(s.Runs[TaskDeleteOrphanedBlobsFull], s.Runs[TaskDeleteOrphanedBlobsQuick]) + + // never did rewrite - safe to do so. + if latestContentRewriteEndTime.IsZero() { + return true + } + + return latestBlobDeleteTime.After(latestContentRewriteEndTime) +} + // shouldDeleteOrphanedPacks returns true if it's ok to delete orphaned packs. // it is only safe to do so after >1hr since the last content rewrite finished to ensure // other clients refresh their indexes. @@ -325,6 +378,10 @@ func shouldDeleteOrphanedPacks(now time.Time, s *Schedule, safety SafetyParamete return now.After(latestContentRewriteEndTime.Add(safety.MinRewriteToOrphanDeletionDelay)) } +func hadRecentFullRewrite(s *Schedule) bool { + return maxEndTime(s.Runs[TaskRewriteContentsFull]).After(maxEndTime(s.Runs[TaskRewriteContentsQuick])) +} + func maxEndTime(taskRuns ...[]RunInfo) time.Time { var result time.Time diff --git a/repo/maintenance/maintenance_run_test.go b/repo/maintenance/maintenance_run_test.go index 3a19c0120..2592a226c 100644 --- a/repo/maintenance/maintenance_run_test.go +++ b/repo/maintenance/maintenance_run_test.go @@ -81,12 +81,14 @@ func TestShouldDeleteOrphanedBlobs(t *testing.T) { func TestShouldRewriteContents(t *testing.T) { cases := []struct { - runs map[TaskType][]RunInfo - want bool + runs map[TaskType][]RunInfo + wantFull bool + wantQuick bool }{ { - runs: map[TaskType][]RunInfo{}, - want: true, + runs: map[TaskType][]RunInfo{}, + wantFull: true, + wantQuick: true, }, { runs: map[TaskType][]RunInfo{ @@ -97,7 +99,8 @@ func TestShouldRewriteContents(t *testing.T) { RunInfo{Success: true, End: t0700}, }, }, - want: true, + wantFull: true, + wantQuick: true, }, { runs: map[TaskType][]RunInfo{ @@ -108,7 +111,20 @@ func TestShouldRewriteContents(t *testing.T) { RunInfo{Success: true, End: t0715}, }, }, - want: false, + wantFull: false, + wantQuick: false, + }, + { + runs: map[TaskType][]RunInfo{ + TaskDeleteOrphanedBlobsQuick: { + RunInfo{Success: true, End: t0700}, + }, + TaskRewriteContentsQuick: { + RunInfo{Success: true, End: t0715}, + }, + }, + wantFull: true, // will be allowed despite quick run having just finished + wantQuick: false, }, { runs: map[TaskType][]RunInfo{ @@ -119,12 +135,16 @@ func TestShouldRewriteContents(t *testing.T) { RunInfo{Success: true, End: t0700}, }, }, - want: true, + wantFull: true, + wantQuick: true, }, } for _, tc := range cases { - require.Equal(t, tc.want, shouldRewriteContents(&Schedule{ + require.Equal(t, tc.wantQuick, shouldQuickRewriteContents(&Schedule{ + Runs: tc.runs, + })) + require.Equal(t, tc.wantFull, shouldFullRewriteContents(&Schedule{ Runs: tc.runs, })) } diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go index 3815c1371..e05b62977 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go @@ -1,10 +1,12 @@ package snapshotmaintenance_test import ( + "context" "testing" "time" "github.com/kylelemons/godebug/pretty" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/kopia/kopia/fs" @@ -181,6 +183,78 @@ func newTestHarness(t *testing.T) *testHarness { return th } +func TestMaintenanceAutoLiveness(t *testing.T) { + ft := faketime.NewClockTimeWithOffset(0) + + ctx, env := repotesting.NewEnvironment(t, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ft.NowFunc() + }, + }) + + // create dummy snapshot. + si := snapshot.SourceInfo{ + Host: "host", + UserName: "user", + Path: "/foo", + } + + dir := mockfs.NewDirectory() + dir.AddDir("d1", defaultPermissions) + dir.AddFile("d1/f2", []byte{1, 2, 3, 4}, defaultPermissions) + + require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error { + _, err := createSnapshot(testlogging.Context(t), w, dir, si, "") + if err != nil { + return errors.Wrap(err, "unable to create snapshot") + } + + dp := maintenance.DefaultParams() + dp.Owner = env.Repository.ClientOptions().UsernameAtHost() + return maintenance.SetParams(ctx, w, &dp) + })) + + // simulate several weeks of triggering auto maintenance few times an hour. + deadline := ft.NowFunc()().Add(21 * 24 * time.Hour) + + for ft.NowFunc()().Before(deadline) { + ft.Advance(30 * time.Minute) + + t.Logf("running maintenance at %v", ft.NowFunc()()) + require.NoError(t, repo.DirectWriteSession(ctx, env.RepositoryWriter, repo.WriteSessionOptions{}, func(dw repo.DirectRepositoryWriter) error { + return snapshotmaintenance.Run(context.Background(), dw, maintenance.ModeAuto, false, maintenance.SafetyFull) + })) + + // verify that at all points in time the last execution time of all tasks is in the last 48 hours. + const maxTimeSinceLastRun = 48 * time.Hour + + sched, err := maintenance.GetSchedule(ctx, env.RepositoryWriter) + require.NoError(t, err) + + now := ft.NowFunc()() + + for k, v := range sched.Runs { + if age := now.Sub(v[0].End); age > maxTimeSinceLastRun { + if age > maxTimeSinceLastRun { + t.Fatalf("at %v the last run of %v was too old (%v vs %v)", now, k, age, maxTimeSinceLastRun) + } + } + } + } + + // make sure all tasks executed at least once. + sched, err := maintenance.GetSchedule(ctx, env.RepositoryWriter) + require.NoError(t, err) + + require.NotEmpty(t, sched.Runs[maintenance.TaskDeleteOrphanedBlobsFull], maintenance.TaskDeleteOrphanedBlobsFull) + require.NotEmpty(t, sched.Runs[maintenance.TaskDeleteOrphanedBlobsQuick], maintenance.TaskDeleteOrphanedBlobsQuick) + require.NotEmpty(t, sched.Runs[maintenance.TaskDropDeletedContentsFull], maintenance.TaskDropDeletedContentsFull) + require.NotEmpty(t, sched.Runs[maintenance.TaskIndexCompaction], maintenance.TaskIndexCompaction) + require.NotEmpty(t, sched.Runs[maintenance.TaskRewriteContentsFull], maintenance.TaskRewriteContentsFull) + require.NotEmpty(t, sched.Runs[maintenance.TaskRewriteContentsQuick], maintenance.TaskRewriteContentsQuick) + require.NotEmpty(t, sched.Runs[maintenance.TaskSnapshotGarbageCollection], maintenance.TaskSnapshotGarbageCollection) +} + func (th *testHarness) fakeTimeOpenRepoOption(o *repo.Options) { o.TimeNowFunc = th.fakeTime.NowFunc() }