diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go
index ba8e569df..526177acf 100644
--- a/repo/maintenance/maintenance_run.go
+++ b/repo/maintenance/maintenance_run.go
@@ -27,6 +27,20 @@
 	ModeAuto Mode = "auto" // run either quick or full if required by schedule
 )
 
+// TaskType identifies the type of a maintenance task.
+type TaskType string
+
+// Task IDs.
+const (
+	TaskSnapshotGarbageCollection = "snapshot-gc"
+	TaskDeleteOrphanedBlobsQuick  = "quick-delete-blobs"
+	TaskDeleteOrphanedBlobsFull   = "full-delete-blobs"
+	TaskRewriteContentsQuick      = "quick-rewrite-contents"
+	TaskRewriteContentsFull       = "full-rewrite-contents"
+	TaskDropDeletedContentsFull   = "full-drop-deleted-content"
+	TaskIndexCompaction           = "index-compaction"
+)
+
 // shouldRun returns Mode if repository is due for periodic maintenance.
 func shouldRun(ctx context.Context, rep repo.DirectRepository, p *Params) (Mode, error) {
 	if myUsername := rep.ClientOptions().UsernameAtHost(); p.Owner != myUsername {
@@ -190,30 +204,39 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters)
 }
 
 func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
-	// find 'q' packs that are less than 80% full and rewrite contents in them into
-	// new consolidated packs, orphaning old packs in the process.
-	if err := ReportRun(ctx, runParams.rep, "quick-rewrite-contents", func() error {
-		return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
-			ContentIDRange: content.AllPrefixedIDs,
-			PackPrefix:     content.PackBlobIDPrefixSpecial,
-			ShortPacks:     true,
-		}, safety)
-	}); err != nil {
-		return errors.Wrap(err, "error rewriting metadata contents")
+	s, err := GetSchedule(ctx, runParams.rep)
+	if err != nil {
+		return errors.Wrap(err, "unable to get schedule")
 	}
 
-	// delete orphaned 'q' packs after some time.
-	if err := ReportRun(ctx, runParams.rep, "quick-delete-blobs", func() error {
-		_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
-			Prefix: content.PackBlobIDPrefixSpecial,
-		}, safety)
-		return err
-	}); err != nil {
-		return errors.Wrap(err, "error deleting unreferenced metadata blobs")
+	if shouldRewriteContents(s) {
+		// find 'q' packs that are less than 80% full and rewrite contents in them into
+		// new consolidated packs, orphaning old packs in the process.
+		if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error {
+			return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
+				ContentIDRange: content.AllPrefixedIDs,
+				PackPrefix:     content.PackBlobIDPrefixSpecial,
+				ShortPacks:     true,
+			}, safety)
+		}); err != nil {
+			return errors.Wrap(err, "error rewriting metadata contents")
+		}
+	}
+
+	if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) {
+		// delete orphaned 'q' packs after some time.
+		if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error {
+			_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
+				Prefix: content.PackBlobIDPrefixSpecial,
+			}, safety)
+			return err
+		}); err != nil {
+			return errors.Wrap(err, "error deleting unreferenced metadata blobs")
+		}
 	}
 
 	// consolidate many smaller indexes into fewer larger ones.
-	if err := ReportRun(ctx, runParams.rep, "index-compaction", func() error {
+	if err := ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error {
 		return IndexCompaction(ctx, runParams.rep, safety)
 	}); err != nil {
 		return errors.Wrap(err, "error performing index compaction")
@@ -225,13 +248,13 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
 
 func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
 	var safeDropTime time.Time
 
-	if safety.RequireTwoGCCycles {
-		s, err := GetSchedule(ctx, runParams.rep)
-		if err != nil {
-			return errors.Wrap(err, "unable to get schedule")
-		}
+	s, err := GetSchedule(ctx, runParams.rep)
+	if err != nil {
+		return errors.Wrap(err, "unable to get schedule")
+	}
 
-		safeDropTime = findSafeDropTime(s.Runs["snapshot-gc"], safety)
+	if safety.RequireTwoGCCycles {
+		safeDropTime = findSafeDropTime(s.Runs[TaskSnapshotGarbageCollection], safety)
 	} else {
 		safeDropTime = runParams.rep.Time()
 	}
@@ -241,7 +264,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
 
 		// rewrite indexes by dropping content entries that have been marked
 		// as deleted for a long time
-		if err := ReportRun(ctx, runParams.rep, "full-drop-deleted-content", func() error {
+		if err := ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error {
 			return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
 		}); err != nil {
 			return errors.Wrap(err, "error dropping deleted contents")
@@ -250,28 +273,76 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
 		log(ctx).Infof("Not enough time has passed since previous successful Snapshot GC. Will try again next time.")
 	}
 
-	// find packs that are less than 80% full and rewrite contents in them into
-	// new consolidated packs, orphaning old packs in the process.
-	if err := ReportRun(ctx, runParams.rep, "full-rewrite-contents", func() error {
-		return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
-			ContentIDRange: content.AllIDs,
-			ShortPacks:     true,
-		}, safety)
-	}); err != nil {
-		return errors.Wrap(err, "error rewriting contents in short packs")
+	if shouldRewriteContents(s) {
+		// find packs that are less than 80% full and rewrite contents in them into
+		// new consolidated packs, orphaning old packs in the process.
+		if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error {
+			return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
+				ContentIDRange: content.AllIDs,
+				ShortPacks:     true,
+			}, safety)
+		}); err != nil {
+			return errors.Wrap(err, "error rewriting contents in short packs")
+		}
 	}
 
-	// delete orphaned packs after some time.
-	if err := ReportRun(ctx, runParams.rep, "full-delete-blobs", func() error {
-		_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety)
-		return err
-	}); err != nil {
-		return errors.Wrap(err, "error deleting unreferenced blobs")
+	if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) {
+		// delete orphaned packs after some time.
+		if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error {
+			_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety)
+			return err
+		}); err != nil {
+			return errors.Wrap(err, "error deleting unreferenced blobs")
+		}
 	}
 
 	return nil
 }
 
+// shouldRewriteContents returns true if it's currently ok to rewrite contents.
+// Since each content rewrite requires orphaned blobs to be deleted after some time passes,
+// we don't want to starve blob deletion by constantly doing rewrites.
+func shouldRewriteContents(s *Schedule) bool {
+	latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull], s.Runs[TaskRewriteContentsQuick])
+	latestBlobDeleteTime := maxEndTime(s.Runs[TaskDeleteOrphanedBlobsFull], s.Runs[TaskDeleteOrphanedBlobsQuick])
+
+	// never rewrote contents yet - safe to do so.
+	if latestContentRewriteEndTime.IsZero() {
+		return true
+	}
+
+	return latestBlobDeleteTime.After(latestContentRewriteEndTime)
+}
+
+// shouldDeleteOrphanedPacks returns true if it's ok to delete orphaned packs.
+// It is only safe to do so once enough time (MinRewriteToOrphanDeletionDelay) has passed
+// since the last content rewrite finished, to ensure other clients refresh their indexes.
+// Rewritten packs become orphaned immediately, but if we delete them too soon, clients
+// that still have old indexes cached may be trying to read the pre-rewrite blobs.
+func shouldDeleteOrphanedPacks(now time.Time, s *Schedule, safety SafetyParameters) bool {
+	latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull], s.Runs[TaskRewriteContentsQuick])
+
+	return now.After(latestContentRewriteEndTime.Add(safety.MinRewriteToOrphanDeletionDelay))
+}
+
+func maxEndTime(taskRuns ...[]RunInfo) time.Time {
+	var result time.Time
+
+	for _, tr := range taskRuns {
+		for _, r := range tr {
+			if !r.Success {
+				continue
+			}
+
+			if r.End.After(result) {
+				result = r.End
+			}
+		}
+	}
+
+	return result
+}
+
 // findSafeDropTime returns the latest timestamp for which it is safe to drop content entries
 // deleted before that time, because at least two successful GC cycles have completed
 // and minimum required time between the GCs has passed.
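To make the interplay of the two guards concrete, here is a minimal standalone sketch (illustrative only, not part of the patch): the trimmed-down RunInfo/Schedule types and the driver loop are invented for illustration, while the predicate logic mirrors shouldRewriteContents and shouldDeleteOrphanedPacks above, using the one-hour delay that SafetyFull configures. Successive maintenance cycles alternate between rewriting and deleting, skipping work while the index-refresh window is open.

package main

import (
	"fmt"
	"time"
)

// Trimmed-down stand-ins for the maintenance package types (illustration only).
type RunInfo struct {
	End     time.Time
	Success bool
}

type Schedule struct {
	Runs map[string][]RunInfo
}

const minRewriteToOrphanDeletionDelay = time.Hour // mirrors SafetyFull

func maxEndTime(taskRuns ...[]RunInfo) time.Time {
	var result time.Time

	for _, tr := range taskRuns {
		for _, r := range tr {
			if r.Success && r.End.After(result) {
				result = r.End
			}
		}
	}

	return result
}

// shouldRewrite mirrors shouldRewriteContents: rewrite only if a blob deletion
// finished after the latest rewrite (or no rewrite ever happened).
func shouldRewrite(s *Schedule) bool {
	lastRewrite := maxEndTime(s.Runs["full-rewrite-contents"])
	lastDelete := maxEndTime(s.Runs["full-delete-blobs"])

	return lastRewrite.IsZero() || lastDelete.After(lastRewrite)
}

// shouldDelete mirrors shouldDeleteOrphanedPacks: delete only after the
// index-refresh window following the latest rewrite has elapsed.
func shouldDelete(now time.Time, s *Schedule) bool {
	lastRewrite := maxEndTime(s.Runs["full-rewrite-contents"])

	return now.After(lastRewrite.Add(minRewriteToOrphanDeletionDelay))
}

func main() {
	s := &Schedule{Runs: map[string][]RunInfo{}}
	now := time.Date(2020, 1, 1, 7, 0, 0, 0, time.UTC)

	// five maintenance cycles, 30 minutes apart; prints:
	// 07:00 rewrite, 07:30 wait, 08:00 wait, 08:30 delete, 09:00 rewrite.
	for i := 0; i < 5; i++ {
		switch {
		case shouldRewrite(s):
			s.Runs["full-rewrite-contents"] = append(s.Runs["full-rewrite-contents"], RunInfo{End: now, Success: true})
			fmt.Println(now.Format("15:04"), "rewrite contents")
		case shouldDelete(now, s):
			s.Runs["full-delete-blobs"] = append(s.Runs["full-delete-blobs"], RunInfo{End: now, Success: true})
			fmt.Println(now.Format("15:04"), "delete orphaned blobs")
		default:
			fmt.Println(now.Format("15:04"), "wait for clients to refresh indexes")
		}

		now = now.Add(30 * time.Minute)
	}
}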
diff --git a/repo/maintenance/maintenance_run_test.go b/repo/maintenance/maintenance_run_test.go
index 380e3fa6b..3a19c0120 100644
--- a/repo/maintenance/maintenance_run_test.go
+++ b/repo/maintenance/maintenance_run_test.go
@@ -1,20 +1,136 @@
 package maintenance
 
 import (
+	"fmt"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/require"
 )
 
-func TestFindSafeDropTime(t *testing.T) {
-	var (
-		t0700 = time.Date(2020, 1, 1, 7, 0, 0, 0, time.UTC)
-		t0715 = time.Date(2020, 1, 1, 7, 15, 0, 0, time.UTC)
-		t0900 = time.Date(2020, 1, 1, 9, 0, 0, 0, time.UTC)
-		t0915 = time.Date(2020, 1, 1, 9, 15, 0, 0, time.UTC)
-		t1300 = time.Date(2020, 1, 1, 13, 0, 0, 0, time.UTC)
-		t1315 = time.Date(2020, 1, 1, 13, 15, 0, 0, time.UTC)
-	)
+var (
+	t0700 = time.Date(2020, 1, 1, 7, 0, 0, 0, time.UTC)
+	t0715 = time.Date(2020, 1, 1, 7, 15, 0, 0, time.UTC)
+	t0900 = time.Date(2020, 1, 1, 9, 0, 0, 0, time.UTC)
+	t0915 = time.Date(2020, 1, 1, 9, 15, 0, 0, time.UTC)
+	t1300 = time.Date(2020, 1, 1, 13, 0, 0, 0, time.UTC)
+	t1315 = time.Date(2020, 1, 1, 13, 15, 0, 0, time.UTC)
+)
+
+func TestShouldDeleteOrphanedBlobs(t *testing.T) {
+	now := t1315
+
+	cases := []struct {
+		runs   map[TaskType][]RunInfo
+		safety SafetyParameters
+		want   bool
+	}{
+		{
+			// no rewrites
+			runs:   map[TaskType][]RunInfo{},
+			safety: SafetyFull,
+			want:   true,
+		},
+		{
+			runs:   map[TaskType][]RunInfo{},
+			safety: SafetyNone,
+			want:   true,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				TaskRewriteContentsQuick: {
+					// old enough
+					{End: t0900, Success: true},
+				},
+			},
+			safety: SafetyFull,
+			want:   true,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				// recent but no safety, so will go through
+				TaskRewriteContentsFull: {
+					{End: t1300, Success: true},
+				},
+			},
+			safety: SafetyNone,
+			want:   true,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				// too recent for full safety
+				TaskRewriteContentsFull: {
+					{End: t1300, Success: true},
+				},
+			},
+			safety: SafetyFull,
+			want:   false,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+
+		t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
+			require.Equal(t, tc.want, shouldDeleteOrphanedPacks(now, &Schedule{
+				Runs: tc.runs,
+			}, tc.safety))
+		})
+	}
+}
+
+func TestShouldRewriteContents(t *testing.T) {
+	cases := []struct {
+		runs map[TaskType][]RunInfo
+		want bool
+	}{
+		{
+			runs: map[TaskType][]RunInfo{},
+			want: true,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				TaskDeleteOrphanedBlobsFull: {
+					RunInfo{Success: true, End: t0715},
+				},
+				TaskDeleteOrphanedBlobsQuick: {
+					RunInfo{Success: true, End: t0700},
+				},
+			},
+			want: true,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				TaskDeleteOrphanedBlobsQuick: {
+					RunInfo{Success: true, End: t0700},
+				},
+				TaskRewriteContentsFull: {
+					RunInfo{Success: true, End: t0715},
+				},
+			},
+			want: false,
+		},
+		{
+			runs: map[TaskType][]RunInfo{
+				TaskDeleteOrphanedBlobsQuick: {
+					RunInfo{Success: true, End: t0715},
+				},
+				TaskRewriteContentsFull: {
+					RunInfo{Success: true, End: t0700},
+				},
+			},
+			want: true,
+		},
+	}
+
+	for _, tc := range cases {
+		require.Equal(t, tc.want, shouldRewriteContents(&Schedule{
+			Runs: tc.runs,
+		}))
+	}
+}
+
+func TestFindSafeDropTime(t *testing.T) {
 	cases := []struct {
 		runs     []RunInfo
 		wantTime time.Time
diff --git a/repo/maintenance/maintenance_safety.go b/repo/maintenance/maintenance_safety.go
index f151f6a02..8437192de 100644
--- a/repo/maintenance/maintenance_safety.go
+++ b/repo/maintenance/maintenance_safety.go
@@ -30,6 +30,9 @@ type SafetyParameters struct {
 
 	// Blob GC: Drop incomplete session blobs above this age.
 	SessionExpirationAge time.Duration
+
+	// Minimum time that must pass after a content rewrite before we delete orphaned blobs.
+	MinRewriteToOrphanDeletionDelay time.Duration
 }
 
 // Supported safety levels.
@@ -58,5 +61,6 @@ type SafetyParameters struct {
 		RewriteMinAge:                   2 * time.Hour,  //nolint:gomnd
 		SessionExpirationAge:            96 * time.Hour, //nolint:gomnd
 		RequireTwoGCCycles:              true,
+		MinRewriteToOrphanDeletionDelay: time.Hour,
 	}
 )
diff --git a/repo/maintenance/maintenance_safety_test.go b/repo/maintenance/maintenance_safety_test.go
new file mode 100644
index 000000000..2546337d7
--- /dev/null
+++ b/repo/maintenance/maintenance_safety_test.go
@@ -0,0 +1,142 @@
+package maintenance_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/faketime"
+	"github.com/kopia/kopia/internal/repotesting"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/repo/maintenance"
+	"github.com/kopia/kopia/repo/object"
+	"github.com/kopia/kopia/snapshot/snapshotmaintenance"
+)
+
+func TestMaintenanceSafety(t *testing.T) {
+	ft := faketime.NewClockTimeWithOffset(0)
+
+	ctx, env := repotesting.NewEnvironment(t, repotesting.Options{
+		OpenOptions: func(o *repo.Options) {
+			o.TraceStorage = t.Logf
+			o.TimeNowFunc = ft.NowFunc()
+		},
+	})
+
+	anotherClient := env.MustConnectOpenAnother(t, func(o *repo.Options) {
+		o.TimeNowFunc = ft.NowFunc()
+	})
+
+	var objectID object.ID
+
+	// create an object that's immediately orphaned since nobody refers to it.
+	require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error {
+		ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
+		fmt.Fprintf(ow, "hello world")
+
+		var err error
+		objectID, err = ow.Result()
+
+		return err
+	}))
+
+	// create another object in a separate pack.
+	require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error {
+		ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
+		fmt.Fprintf(ow, "hello universe")
+
+		_, err := ow.Result()
+
+		return err
+	}))
+
+	// both the 'main' and 'another' clients can see it.
+	t.Logf("**** MAINTENANCE #1")
+	require.NoError(t, anotherClient.Refresh(ctx))
+	require.NoError(t, env.Repository.Refresh(ctx))
+	verifyContentDeletedState(ctx, t, env.Repository, objectID, false)
+	verifyObjectReadable(ctx, t, env.Repository, objectID)
+	verifyObjectReadable(ctx, t, anotherClient, objectID)
+
+	// maintenance has no effect since there was no previous GC.
+	require.NoError(t, snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull))
+	verifyContentDeletedState(ctx, t, env.Repository, objectID, false)
+
+	t.Logf("**** MAINTENANCE #2")
+
+	ft.Advance(25 * time.Hour)
+
+	// at this point there was a previous GC, so the content gets marked as deleted but is still readable.
+	require.NoError(t, snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull))
+	verifyContentDeletedState(ctx, t, env.Repository, objectID, true)
+
+	require.NoError(t, anotherClient.Refresh(ctx))
+	require.NoError(t, env.Repository.Refresh(ctx))
+	verifyObjectReadable(ctx, t, env.Repository, objectID)
+	verifyObjectReadable(ctx, t, anotherClient, objectID)
+
+	t.Logf("**** MAINTENANCE #3")
+	ft.Advance(4 * time.Hour)
+
+	// run maintenance again - this time we'll rewrite the two objects together.
+	require.NoError(t, anotherClient.Refresh(ctx))
+	require.NoError(t, env.Repository.Refresh(ctx))
+	require.NoError(t, snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull))
+
+	// the object is still readable using the main client because it has updated
+	// indexes after the rewrite.
+	require.NoError(t, env.Repository.Refresh(ctx))
+	verifyObjectReadable(ctx, t, env.Repository, objectID)
+
+	// verify that the object is still readable using another client, to ensure we did not
+	// immediately delete the blob that was rewritten.
+	verifyObjectReadable(ctx, t, anotherClient, objectID)
+
+	t.Logf("**** MAINTENANCE #4")
+	ft.Advance(4 * time.Hour)
+	require.NoError(t, snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull))
+	require.NoError(t, anotherClient.Refresh(ctx))
+	require.NoError(t, env.Repository.Refresh(ctx))
+	verifyObjectReadable(ctx, t, anotherClient, objectID)
+	verifyObjectReadable(ctx, t, env.Repository, objectID)
+
+	t.Logf("**** MAINTENANCE #5")
+	ft.Advance(4 * time.Hour)
+	require.NoError(t, snapshotmaintenance.Run(ctx, env.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull))
+	require.NoError(t, anotherClient.Refresh(ctx))
+	require.NoError(t, env.Repository.Refresh(ctx))
+	verifyObjectNotFound(ctx, t, env.Repository, objectID)
+	verifyObjectNotFound(ctx, t, anotherClient, objectID)
+}
+
+func verifyContentDeletedState(ctx context.Context, t *testing.T, rep repo.Repository, objectID object.ID, want bool) {
+	t.Helper()
+
+	cid, _, _ := objectID.ContentID()
+
+	require.NoError(t, repo.DirectWriteSession(ctx, rep.(repo.DirectRepository), repo.WriteSessionOptions{}, func(dw repo.DirectRepositoryWriter) error {
+		info, err := dw.ContentReader().ContentInfo(ctx, cid)
+		require.NoError(t, err)
+		require.Equal(t, want, info.Deleted)
+
+		return nil
+	}))
+}
+
+func verifyObjectReadable(ctx context.Context, t *testing.T, rep repo.Repository, objectID object.ID) {
+	t.Helper()
+
+	require.NoError(t, repo.WriteSession(ctx, rep, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error {
+		r, err := w.OpenObject(ctx, objectID)
+		require.NoError(t, err)
+		r.Close()
+
+		return nil
+	}))
+}
+
+func verifyObjectNotFound(ctx context.Context, t *testing.T, rep repo.Repository, objectID object.ID) {
+	t.Helper()
+
+	require.NoError(t, repo.WriteSession(ctx, rep, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error {
+		_, err := w.OpenObject(ctx, objectID)
+		require.ErrorIs(t, err, object.ErrObjectNotFound)
+
+		return nil
+	}))
+}
diff --git a/repo/maintenance/maintenance_schedule.go b/repo/maintenance/maintenance_schedule.go
index 7d2e7a65f..530159087 100644
--- a/repo/maintenance/maintenance_schedule.go
+++ b/repo/maintenance/maintenance_schedule.go
@@ -41,23 +41,23 @@ type Schedule struct {
 	NextFullMaintenanceTime  time.Time `json:"nextFullMaintenance"`
 	NextQuickMaintenanceTime time.Time `json:"nextQuickMaintenance"`
 
-	Runs map[string][]RunInfo `json:"runs"`
+	Runs map[TaskType][]RunInfo `json:"runs"`
 }
 
 // ReportRun adds the provided run information to the history and discards the oldest entries.
-func (s *Schedule) ReportRun(runType string, info RunInfo) {
+func (s *Schedule) ReportRun(taskType TaskType, info RunInfo) {
 	if s.Runs == nil {
-		s.Runs = map[string][]RunInfo{}
+		s.Runs = map[TaskType][]RunInfo{}
 	}
 
 	// insert as first item
-	history := append([]RunInfo{info}, s.Runs[runType]...)
+	history := append([]RunInfo{info}, s.Runs[taskType]...)
 
 	if len(history) > maxRetainedRunInfoPerRunType {
 		history = history[0:maxRetainedRunInfoPerRunType]
 	}
 
-	s.Runs[runType] = history
+	s.Runs[taskType] = history
 }
 
 func getAES256GCM(rep repo.DirectRepository) (cipher.AEAD, error) {
@@ -132,7 +132,7 @@ func SetSchedule(ctx context.Context, rep repo.DirectRepositoryWriter, s *Schedu
 }
 
 // ReportRun reports timing of a maintenance run and persists it in the repository.
-func ReportRun(ctx context.Context, rep repo.DirectRepositoryWriter, runType string, run func() error) error {
+func ReportRun(ctx context.Context, rep repo.DirectRepositoryWriter, taskType TaskType, s *Schedule, run func() error) error {
 	ri := RunInfo{
 		Start: rep.Time(),
 	}
@@ -147,12 +147,16 @@ func ReportRun(ctx context.Context, rep repo.DirectRepositoryWriter, runType str
 		ri.Success = true
 	}
 
-	s, err := GetSchedule(ctx, rep)
-	if err != nil {
-		log(ctx).Warningf("unable to get schedule")
+	if s == nil {
+		var err error
+
+		s, err = GetSchedule(ctx, rep)
+		if err != nil {
+			log(ctx).Warningf("unable to get schedule")
+		}
 	}
 
-	s.ReportRun(runType, ri)
+	s.ReportRun(taskType, ri)
 
 	if err := SetSchedule(ctx, rep, s); err != nil {
 		log(ctx).Warningf("unable to report run: %v", err)
diff --git a/snapshot/snapshotgc/gc.go b/snapshot/snapshotgc/gc.go
index abe118b5b..fb4481dff 100644
--- a/snapshot/snapshotgc/gc.go
+++ b/snapshot/snapshotgc/gc.go
@@ -77,7 +77,7 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma
 func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters) (Stats, error) {
 	var st Stats
 
-	err := maintenance.ReportRun(ctx, rep, "snapshot-gc", func() error {
+	err := maintenance.ReportRun(ctx, rep, maintenance.TaskSnapshotGarbageCollection, nil, func() error {
 		return runInternal(ctx, rep, gcDelete, safety, &st)
 	})
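Taken together, the signature change lets callers that already hold a Schedule thread it through ReportRun instead of forcing a second GetSchedule round trip, while passing nil (as snapshotgc.Run does above) keeps the old load-on-demand behavior. A sketch of the calling pattern follows; compactIndexes is a hypothetical wrapper written for illustration, but the maintenance APIs it calls are the ones from this diff.

package example

import (
	"context"

	"github.com/pkg/errors"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/maintenance"
)

// compactIndexes is a hypothetical caller - it illustrates the new ReportRun contract only.
func compactIndexes(ctx context.Context, rep repo.DirectRepositoryWriter) error {
	// load the schedule once...
	s, err := maintenance.GetSchedule(ctx, rep)
	if err != nil {
		return errors.Wrap(err, "unable to get schedule")
	}

	// ...and hand it to ReportRun so the run is recorded against the already-loaded
	// schedule; passing nil instead would make ReportRun fetch the schedule itself.
	return maintenance.ReportRun(ctx, rep, maintenance.TaskIndexCompaction, s, func() error {
		return maintenance.IndexCompaction(ctx, rep, maintenance.SafetyFull)
	})
}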