mirror of
https://github.com/kopia/kopia.git
synced 2026-03-13 11:46:55 -04:00
maintenance: fixed possible starvation of full blob deletion by quick maintenance (#955)
* testing: added a test that simulates running auto maintenance over several weeks. This ensures all maintenance tasks run with reasonable frequency.
* testing: modified the time interval to 30 minutes, which uncovered a bug where full blob deletion was being starved
* maintenance: fixed full rewrite logic to allow a full rewrite after a quick rewrite
* maintenance: when performing quick maintenance after full maintenance, we sometimes need to do full blob deletion to ensure liveness
* maintenance: refactored to improve readability
This commit is contained in:
@@ -209,49 +209,48 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
|
||||
return errors.Wrap(err, "unable to get schedule")
|
||||
}
|
||||
|
||||
if shouldRewriteContents(s) {
|
||||
if shouldQuickRewriteContents(s) {
|
||||
// find 'q' packs that are less than 80% full and rewrite contents in them into
|
||||
// new consolidated packs, orphaning old packs in the process.
|
||||
if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error {
|
||||
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
|
||||
ContentIDRange: content.AllPrefixedIDs,
|
||||
PackPrefix: content.PackBlobIDPrefixSpecial,
|
||||
ShortPacks: true,
|
||||
}, safety)
|
||||
}); err != nil {
|
||||
if err := runTaskRewriteContentsQuick(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error rewriting metadata contents")
|
||||
}
|
||||
}
|
||||
|
||||
if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) {
|
||||
// delete orphaned 'q' packs after some time.
|
||||
if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
|
||||
Prefix: content.PackBlobIDPrefixSpecial,
|
||||
}, safety)
|
||||
return err
|
||||
}); err != nil {
|
||||
var err error
|
||||
|
||||
// time to delete orphaned blobs after last rewrite,
|
||||
// if the last rewrite was full (started as part of full maintenance) we must complete it by
|
||||
// running full orphaned blob deletion, otherwise next quick maintenance will start a quick rewrite
|
||||
// and we'd never delete blobs orphaned by full rewrite.
|
||||
if hadRecentFullRewrite(s) {
|
||||
err = runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety)
|
||||
} else {
|
||||
err = runTaskDeleteOrphanedBlobsQuick(ctx, runParams, s, safety)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced metadata blobs")
|
||||
}
|
||||
}
|
||||
|
||||
// consolidate many smaller indexes into fewer larger ones.
|
||||
if err := ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error {
|
||||
return IndexCompaction(ctx, runParams.rep, safety)
|
||||
}); err != nil {
|
||||
if err := runTaskIndexCompaction(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error performing index compaction")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
var safeDropTime time.Time
|
||||
func runTaskIndexCompaction(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return ReportRun(ctx, runParams.rep, TaskIndexCompaction, s, func() error {
|
||||
return IndexCompaction(ctx, runParams.rep, safety)
|
||||
})
|
||||
}
|
||||
|
||||
s, err := GetSchedule(ctx, runParams.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get schedule")
|
||||
}
|
||||
func runTaskDropDeletedContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
var safeDropTime time.Time
|
||||
|
||||
if safety.RequireTwoGCCycles {
|
||||
safeDropTime = findSafeDropTime(s.Runs[TaskSnapshotGarbageCollection], safety)
|
||||
@@ -259,39 +258,76 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
|
||||
safeDropTime = runParams.rep.Time()
|
||||
}
|
||||
|
||||
if !safeDropTime.IsZero() {
|
||||
log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime)
|
||||
|
||||
// rewrite indexes by dropping content entries that have been marked
|
||||
// as deleted for a long time
|
||||
if err := ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error {
|
||||
return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error dropping deleted contents")
|
||||
}
|
||||
} else {
|
||||
if safeDropTime.IsZero() {
|
||||
log(ctx).Infof("Not enough time has passed since previous successful Snapshot GC. Will try again next time.")
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldRewriteContents(s) {
|
||||
log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime)
|
||||
|
||||
return ReportRun(ctx, runParams.rep, TaskDropDeletedContentsFull, s, func() error {
|
||||
return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskRewriteContentsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return ReportRun(ctx, runParams.rep, TaskRewriteContentsQuick, s, func() error {
|
||||
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
|
||||
ContentIDRange: content.AllPrefixedIDs,
|
||||
PackPrefix: content.PackBlobIDPrefixSpecial,
|
||||
ShortPacks: true,
|
||||
}, safety)
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error {
|
||||
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
|
||||
ContentIDRange: content.AllIDs,
|
||||
ShortPacks: true,
|
||||
}, safety)
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
|
||||
Prefix: content.PackBlobIDPrefixSpecial,
|
||||
}, safety)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
s, err := GetSchedule(ctx, runParams.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get schedule")
|
||||
}
|
||||
|
||||
// rewrite indexes by dropping content entries that have been marked
|
||||
// as deleted for a long time
|
||||
if err := runTaskDropDeletedContentsFull(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error dropping deleted contents")
|
||||
}
|
||||
|
||||
if shouldFullRewriteContents(s) {
|
||||
// find packs that are less than 80% full and rewrite contents in them into
|
||||
// new consolidated packs, orphaning old packs in the process.
|
||||
if err := ReportRun(ctx, runParams.rep, TaskRewriteContentsFull, s, func() error {
|
||||
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
|
||||
ContentIDRange: content.AllIDs,
|
||||
ShortPacks: true,
|
||||
}, safety)
|
||||
}); err != nil {
|
||||
if err := runTaskRewriteContentsFull(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error rewriting contents in short packs")
|
||||
}
|
||||
}
|
||||
|
||||
if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) {
|
||||
// delete orphaned packs after some time.
|
||||
if err := ReportRun(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety)
|
||||
return err
|
||||
}); err != nil {
|
||||
if err := runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced blobs")
|
||||
}
|
||||
}
|
||||
@@ -302,7 +338,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
|
||||
// shouldRewriteContents returns true if it's currently ok to rewrite contents.
|
||||
// since each content rewrite will require deleting of orphaned blobs after some time passes,
|
||||
// we don't want to starve blob deletion by constantly doing rewrites.
|
||||
func shouldRewriteContents(s *Schedule) bool {
|
||||
func shouldQuickRewriteContents(s *Schedule) bool {
|
||||
latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull], s.Runs[TaskRewriteContentsQuick])
|
||||
latestBlobDeleteTime := maxEndTime(s.Runs[TaskDeleteOrphanedBlobsFull], s.Runs[TaskDeleteOrphanedBlobsQuick])
|
||||
|
||||
@@ -314,6 +350,23 @@ func shouldRewriteContents(s *Schedule) bool {
|
||||
return latestBlobDeleteTime.After(latestContentRewriteEndTime)
|
||||
}
|
||||
|
||||
// shouldFullRewriteContents returns true if it's currently ok to rewrite contents.
|
||||
// since each content rewrite will require deleting of orphaned blobs after some time passes,
|
||||
// we don't want to starve blob deletion by constantly doing rewrites.
|
||||
func shouldFullRewriteContents(s *Schedule) bool {
|
||||
// NOTE - we're not looking at TaskRewriteContentsQuick here, this allows full rewrite to sometimes
|
||||
// follow quick rewrite.
|
||||
latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull])
|
||||
latestBlobDeleteTime := maxEndTime(s.Runs[TaskDeleteOrphanedBlobsFull], s.Runs[TaskDeleteOrphanedBlobsQuick])
|
||||
|
||||
// never did rewrite - safe to do so.
|
||||
if latestContentRewriteEndTime.IsZero() {
|
||||
return true
|
||||
}
|
||||
|
||||
return latestBlobDeleteTime.After(latestContentRewriteEndTime)
|
||||
}
|
||||
|
||||
// shouldDeleteOrphanedPacks returns true if it's ok to delete orphaned packs.
|
||||
// it is only safe to do so after >1hr since the last content rewrite finished to ensure
|
||||
// other clients refresh their indexes.
|
||||
@@ -325,6 +378,10 @@ func shouldDeleteOrphanedPacks(now time.Time, s *Schedule, safety SafetyParamete
|
||||
return now.After(latestContentRewriteEndTime.Add(safety.MinRewriteToOrphanDeletionDelay))
|
||||
}
|
||||
|
||||
func hadRecentFullRewrite(s *Schedule) bool {
|
||||
return maxEndTime(s.Runs[TaskRewriteContentsFull]).After(maxEndTime(s.Runs[TaskRewriteContentsQuick]))
|
||||
}
|
||||
|
||||
func maxEndTime(taskRuns ...[]RunInfo) time.Time {
|
||||
var result time.Time
|
||||
|
||||
|
||||
@@ -81,12 +81,14 @@ func TestShouldDeleteOrphanedBlobs(t *testing.T) {
|
||||
|
||||
func TestShouldRewriteContents(t *testing.T) {
|
||||
cases := []struct {
|
||||
runs map[TaskType][]RunInfo
|
||||
want bool
|
||||
runs map[TaskType][]RunInfo
|
||||
wantFull bool
|
||||
wantQuick bool
|
||||
}{
|
||||
{
|
||||
runs: map[TaskType][]RunInfo{},
|
||||
want: true,
|
||||
runs: map[TaskType][]RunInfo{},
|
||||
wantFull: true,
|
||||
wantQuick: true,
|
||||
},
|
||||
{
|
||||
runs: map[TaskType][]RunInfo{
|
||||
@@ -97,7 +99,8 @@ func TestShouldRewriteContents(t *testing.T) {
|
||||
RunInfo{Success: true, End: t0700},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
wantFull: true,
|
||||
wantQuick: true,
|
||||
},
|
||||
{
|
||||
runs: map[TaskType][]RunInfo{
|
||||
@@ -108,7 +111,20 @@ func TestShouldRewriteContents(t *testing.T) {
|
||||
RunInfo{Success: true, End: t0715},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
wantFull: false,
|
||||
wantQuick: false,
|
||||
},
|
||||
{
|
||||
runs: map[TaskType][]RunInfo{
|
||||
TaskDeleteOrphanedBlobsQuick: {
|
||||
RunInfo{Success: true, End: t0700},
|
||||
},
|
||||
TaskRewriteContentsQuick: {
|
||||
RunInfo{Success: true, End: t0715},
|
||||
},
|
||||
},
|
||||
wantFull: true, // will be allowed despite quick run having just finished
|
||||
wantQuick: false,
|
||||
},
|
||||
{
|
||||
runs: map[TaskType][]RunInfo{
|
||||
@@ -119,12 +135,16 @@ func TestShouldRewriteContents(t *testing.T) {
|
||||
RunInfo{Success: true, End: t0700},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
wantFull: true,
|
||||
wantQuick: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
require.Equal(t, tc.want, shouldRewriteContents(&Schedule{
|
||||
require.Equal(t, tc.wantQuick, shouldQuickRewriteContents(&Schedule{
|
||||
Runs: tc.runs,
|
||||
}))
|
||||
require.Equal(t, tc.wantFull, shouldFullRewriteContents(&Schedule{
|
||||
Runs: tc.runs,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package snapshotmaintenance_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kylelemons/godebug/pretty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
@@ -181,6 +183,78 @@ func newTestHarness(t *testing.T) *testHarness {
|
||||
return th
|
||||
}
|
||||
|
||||
func TestMaintenanceAutoLiveness(t *testing.T) {
|
||||
ft := faketime.NewClockTimeWithOffset(0)
|
||||
|
||||
ctx, env := repotesting.NewEnvironment(t, repotesting.Options{
|
||||
OpenOptions: func(o *repo.Options) {
|
||||
o.TimeNowFunc = ft.NowFunc()
|
||||
},
|
||||
})
|
||||
|
||||
// create dummy snapshot.
|
||||
si := snapshot.SourceInfo{
|
||||
Host: "host",
|
||||
UserName: "user",
|
||||
Path: "/foo",
|
||||
}
|
||||
|
||||
dir := mockfs.NewDirectory()
|
||||
dir.AddDir("d1", defaultPermissions)
|
||||
dir.AddFile("d1/f2", []byte{1, 2, 3, 4}, defaultPermissions)
|
||||
|
||||
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(w repo.RepositoryWriter) error {
|
||||
_, err := createSnapshot(testlogging.Context(t), w, dir, si, "")
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to create snapshot")
|
||||
}
|
||||
|
||||
dp := maintenance.DefaultParams()
|
||||
dp.Owner = env.Repository.ClientOptions().UsernameAtHost()
|
||||
return maintenance.SetParams(ctx, w, &dp)
|
||||
}))
|
||||
|
||||
// simulate several weeks of triggering auto maintenance few times an hour.
|
||||
deadline := ft.NowFunc()().Add(21 * 24 * time.Hour)
|
||||
|
||||
for ft.NowFunc()().Before(deadline) {
|
||||
ft.Advance(30 * time.Minute)
|
||||
|
||||
t.Logf("running maintenance at %v", ft.NowFunc()())
|
||||
require.NoError(t, repo.DirectWriteSession(ctx, env.RepositoryWriter, repo.WriteSessionOptions{}, func(dw repo.DirectRepositoryWriter) error {
|
||||
return snapshotmaintenance.Run(context.Background(), dw, maintenance.ModeAuto, false, maintenance.SafetyFull)
|
||||
}))
|
||||
|
||||
// verify that at all points in time the last execution time of all tasks is in the last 48 hours.
|
||||
const maxTimeSinceLastRun = 48 * time.Hour
|
||||
|
||||
sched, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := ft.NowFunc()()
|
||||
|
||||
for k, v := range sched.Runs {
|
||||
if age := now.Sub(v[0].End); age > maxTimeSinceLastRun {
|
||||
if age > maxTimeSinceLastRun {
|
||||
t.Fatalf("at %v the last run of %v was too old (%v vs %v)", now, k, age, maxTimeSinceLastRun)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure all tasks executed at least once.
|
||||
sched, err := maintenance.GetSchedule(ctx, env.RepositoryWriter)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskDeleteOrphanedBlobsFull], maintenance.TaskDeleteOrphanedBlobsFull)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskDeleteOrphanedBlobsQuick], maintenance.TaskDeleteOrphanedBlobsQuick)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskDropDeletedContentsFull], maintenance.TaskDropDeletedContentsFull)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskIndexCompaction], maintenance.TaskIndexCompaction)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskRewriteContentsFull], maintenance.TaskRewriteContentsFull)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskRewriteContentsQuick], maintenance.TaskRewriteContentsQuick)
|
||||
require.NotEmpty(t, sched.Runs[maintenance.TaskSnapshotGarbageCollection], maintenance.TaskSnapshotGarbageCollection)
|
||||
}
|
||||
|
||||
func (th *testHarness) fakeTimeOpenRepoOption(o *repo.Options) {
|
||||
o.TimeNowFunc = th.fakeTime.NowFunc()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user