From d07eb9f3002f5bfe2e11464d5651263d3614f2e8 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 2 Apr 2021 21:56:01 -0700 Subject: [PATCH] cli: added --safety=full|none flag to maintenance commands (#912) * cli: added --safety=full|none flag to maintenance commands This allows selection between safe, high-latency maintenance parameters which allow concurrent access (`full`) or low-latency which may be unsafe in certain situations when concurrent Kopia processes are running. This is a breaking change for advanced CLI commands, where it removes timing parameters and replaces them with single `--safety` option. * 'blob gc' * 'content rewrite' * 'snapshot gc' * pr renames * maintenance: fixed computation of safe time for --safety=none * maintenance: improved logging for blob gc * maintenance: do not rewrite truly short, densely packed packs * mechanical: pass eventual consistency settle time via CompactOptions * maintenance: add option to disable eventual consistency time buffers with --safety=none * maintenance: trigger flush at the end of snapshot gc * maintenance: reload indexes after compaction that drops deleted entries, this allows single-pass maintenance with --safety=none to delete all unused blobs * testing: allow debugging of integration tests inside VSCode * testing: added end-to-end maintenance test that verifies that full maintenance with --safety=none removes all data --- cli/app.go | 27 +++++++- cli/command_blob_gc.go | 21 +++---- cli/command_content_rewrite.go | 5 +- cli/command_maintenance_run.go | 3 +- cli/command_snapshot_gc.go | 11 ++-- internal/server/server.go | 2 +- repo/content/committed_read_manager.go | 15 +++-- repo/content/content_manager_indexes.go | 30 +++++++-- repo/content/index_blob_manager.go | 45 +++++++------- repo/content/index_blob_manager_test.go | 15 +++-- repo/maintenance/blob_gc.go | 33 +++------- repo/maintenance/blob_gc_test.go | 28 ++++----- repo/maintenance/content_rewrite.go | 22 ++++--- 
repo/maintenance/drop_deleted_contents.go | 7 ++- repo/maintenance/index_compaction.go | 5 +- repo/maintenance/maintenance_params.go | 12 ---- repo/maintenance/maintenance_run.go | 58 +++++++++-------- repo/maintenance/maintenance_run_test.go | 6 +- repo/maintenance/maintenance_safety.go | 62 +++++++++++++++++++ snapshot/snapshotgc/gc.go | 10 +-- .../snapshotmaintenance.go | 6 +- .../snapshotmaintenance_test.go | 18 +++--- tests/end_to_end_test/maintenance_test.go | 56 +++++++++++++++++ tests/end_to_end_test/snapshot_delete_test.go | 2 +- tests/end_to_end_test/snapshot_gc_test.go | 9 ++- tests/testenv/cli_test_env.go | 8 ++- 26 files changed, 324 insertions(+), 192 deletions(-) create mode 100644 repo/maintenance/maintenance_safety.go create mode 100644 tests/end_to_end_test/maintenance_test.go diff --git a/cli/app.go b/cli/app.go index f8f7f83da..4d883a3f1 100644 --- a/cli/app.go +++ b/cli/app.go @@ -48,6 +48,31 @@ aclCommands = serverCommands.Command("acl", "Manager server access control list entries") ) +var safetyByName = map[string]maintenance.SafetyParameters{ + "none": maintenance.SafetyNone, + "full": maintenance.SafetyFull, +} + +// safetyFlag defines a --safety=none|full flag that returns SafetyParameters. 
+func safetyFlag(c *kingpin.CmdClause) *maintenance.SafetyParameters { + var ( + result = maintenance.SafetyFull + str string + ) + + c.Flag("safety", "Safety level").Default("full").PreAction(func(pc *kingpin.ParseContext) error { + var ok bool + result, ok = safetyByName[str] + if !ok { + return errors.Errorf("unhandled safety level") + } + + return nil + }).EnumVar(&str, "full", "none") + + return &result +} + func helpFullAction(ctx *kingpin.ParseContext) error { _ = app.UsageForContextWithTemplate(ctx, 0, kingpin.DefaultUsageTemplate) @@ -213,7 +238,7 @@ func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error { Purpose: "maybeRunMaintenance", OnUpload: progress.UploadedBytes, }, func(w repo.DirectRepositoryWriter) error { - return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false) + return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false, maintenance.SafetyFull) }) var noe maintenance.NotOwnedError diff --git a/cli/command_blob_gc.go b/cli/command_blob_gc.go index 5b03d201d..313551e3c 100644 --- a/cli/command_blob_gc.go +++ b/cli/command_blob_gc.go @@ -11,26 +11,23 @@ ) var ( - blobGarbageCollectCommand = blobCommands.Command("gc", "Garbage-collect unused blobs") - blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String() - blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int() - blobGarbageCollectMinAge = blobGarbageCollectCommand.Flag("min-age", "Garbage-collect blobs with minimum age").Default("24h").Duration() - blobGarbageCollectSessionExpirationAge = blobGarbageCollectCommand.Flag("session-expiration-age", "Garbage-collect blobs belonging to sessions that have not been updated recently").Default("96h").Duration() - blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String() + blobGarbageCollectCommand = blobCommands.Command("gc", 
"Garbage-collect unused blobs") + blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String() + blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int() + blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String() + blobGarbageCollectSafety = safetyFlag(blobGarbageCollectCommand) ) func runBlobGarbageCollectCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error { advancedCommand(ctx) opts := maintenance.DeleteUnreferencedBlobsOptions{ - DryRun: *blobGarbageCollectCommandDelete != "yes", - MinAge: *blobGarbageCollectMinAge, - SessionExpirationAge: *blobGarbageCollectSessionExpirationAge, - Parallel: *blobGarbageCollectParallel, - Prefix: blob.ID(*blobGarbageCollectPrefix), + DryRun: *blobGarbageCollectCommandDelete != "yes", + Parallel: *blobGarbageCollectParallel, + Prefix: blob.ID(*blobGarbageCollectPrefix), } - n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts) + n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts, *blobGarbageCollectSafety) if err != nil { return errors.Wrap(err, "error deleting unreferenced blobs") } diff --git a/cli/command_content_rewrite.go b/cli/command_content_rewrite.go index b1ce08b86..267007f40 100644 --- a/cli/command_content_rewrite.go +++ b/cli/command_content_rewrite.go @@ -18,7 +18,7 @@ contentRewriteFormatVersion = contentRewriteCommand.Flag("format-version", "Rewrite contents using the provided format version").Default("-1").Int() contentRewritePackPrefix = contentRewriteCommand.Flag("pack-prefix", "Only rewrite contents from pack blobs with a given prefix").String() contentRewriteDryRun = contentRewriteCommand.Flag("dry-run", "Do not actually rewrite, only print what would happen").Short('n').Bool() - contentRewriteMinAge = contentRewriteCommand.Flag("min-age", "Only rewrite contents above given 
age").Default("1h").Duration() + contentRewriteSafety = safetyFlag(contentRewriteCommand) ) func runContentRewriteCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error { @@ -28,12 +28,11 @@ func runContentRewriteCommand(ctx context.Context, rep repo.DirectRepositoryWrit ContentIDRange: contentIDRange(), ContentIDs: toContentIDs(*contentRewriteIDs), FormatVersion: *contentRewriteFormatVersion, - MinAge: *contentRewriteMinAge, PackPrefix: blob.ID(*contentRewritePackPrefix), Parallel: *contentRewriteParallelism, ShortPacks: *contentRewriteShortPacks, DryRun: *contentRewriteDryRun, - }) + }, *contentRewriteSafety) } func toContentIDs(s []string) []content.ID { diff --git a/cli/command_maintenance_run.go b/cli/command_maintenance_run.go index 84c775cff..ccb05f1b3 100644 --- a/cli/command_maintenance_run.go +++ b/cli/command_maintenance_run.go @@ -12,6 +12,7 @@ maintenanceRunCommand = maintenanceCommands.Command("run", "Run repository maintenance").Default() maintenanceRunFull = maintenanceRunCommand.Flag("full", "Full maintenance").Bool() maintenanceRunForce = maintenanceRunCommand.Flag("force", "Run maintenance even if not owned (unsafe)").Hidden().Bool() + maintenanceRunSafety = safetyFlag(maintenanceRunCommand) ) func runMaintenanceCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error { @@ -20,7 +21,7 @@ func runMaintenanceCommand(ctx context.Context, rep repo.DirectRepositoryWriter) mode = maintenance.ModeFull } - return snapshotmaintenance.Run(ctx, rep, mode, *maintenanceRunForce) + return snapshotmaintenance.Run(ctx, rep, mode, *maintenanceRunForce, *maintenanceRunSafety) } func init() { diff --git a/cli/command_snapshot_gc.go b/cli/command_snapshot_gc.go index f5ad2dd0b..45671470a 100644 --- a/cli/command_snapshot_gc.go +++ b/cli/command_snapshot_gc.go @@ -7,20 +7,17 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" - "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/snapshot/snapshotgc" ) var ( - 
snapshotGCCommand = snapshotCommands.Command("gc", "Mark contents as deleted which are not used by any snapshot").Hidden() - snapshotGCMinContentAge = snapshotGCCommand.Flag("min-age", "Minimum content age to allow deletion").Default("24h").Duration() - snapshotGCDelete = snapshotGCCommand.Flag("delete", "Delete unreferenced contents").Bool() + snapshotGCCommand = snapshotCommands.Command("gc", "Mark contents as deleted which are not used by any snapshot").Hidden() + snapshotGCDelete = snapshotGCCommand.Flag("delete", "Delete unreferenced contents").Bool() + snapshotGCSafety = safetyFlag(snapshotGCCommand) ) func runSnapshotGCCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error { - st, err := snapshotgc.Run(ctx, rep, maintenance.SnapshotGCParams{ - MinContentAge: *snapshotGCMinContentAge, - }, *snapshotGCDelete) + st, err := snapshotgc.Run(ctx, rep, *snapshotGCDelete, *snapshotGCSafety) log(ctx).Infof("GC found %v unused contents (%v bytes)", st.UnusedCount, units.BytesStringBase2(st.UnusedBytes)) log(ctx).Infof("GC found %v unused contents that are too recent to delete (%v bytes)", st.TooRecentCount, units.BytesStringBase2(st.TooRecentBytes)) diff --git a/internal/server/server.go b/internal/server/server.go index ec2fa84cd..94f791db2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -506,7 +506,7 @@ func periodicMaintenanceOnce(ctx context.Context, rep repo.Repository) error { return repo.DirectWriteSession(ctx, dr, repo.WriteSessionOptions{ Purpose: "periodicMaintenanceOnce", }, func(w repo.DirectRepositoryWriter) error { - return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false) + return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false, maintenance.SafetyFull) }) } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index a7b4f969a..4d4135c12 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -311,14 +311,13 @@ 
func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca sm.committedContents = contentIndex sm.indexBlobManager = &indexBlobManagerImpl{ - st: sm.st, - encryptor: sm.encryptor, - hasher: sm.hasher, - timeNow: sm.timeNow, - ownWritesCache: owc, - listCache: listCache, - indexBlobCache: metadataCache, - maxEventualConsistencySettleTime: defaultEventualConsistencySettleTime, + st: sm.st, + encryptor: sm.encryptor, + hasher: sm.hasher, + timeNow: sm.timeNow, + ownWritesCache: owc, + listCache: listCache, + indexBlobCache: metadataCache, } return nil diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 6bdb0c35d..d0b22cefb 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -14,10 +14,19 @@ // CompactOptions provides options for compaction. type CompactOptions struct { - MaxSmallBlobs int - AllIndexes bool - DropDeletedBefore time.Time - DropContents []ID + MaxSmallBlobs int + AllIndexes bool + DropDeletedBefore time.Time + DropContents []ID + DisableEventualConsistencySafety bool +} + +func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration { + if co.DisableEventualConsistencySafety { + return 0 + } + + return defaultEventualConsistencySettleTime } // CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs. @@ -38,7 +47,16 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) return errors.Wrap(err, "error performing compaction") } - return bm.indexBlobManager.cleanup(ctx) + if err := bm.indexBlobManager.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil { + return errors.Wrap(err, "error cleaning up index blobs") + } + + // reload indexes after cleanup. 
+ if _, _, err := bm.loadPackIndexesUnlocked(ctx); err != nil { + return errors.Wrap(err, "error re-loading indexes") + } + + return nil } func (sm *SharedManager) getBlobsToCompact(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo { @@ -125,7 +143,7 @@ func (sm *SharedManager) compactIndexBlobs(ctx context.Context, indexBlobs []Ind outputs = append(outputs, compactedIndexBlob) - if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs); err != nil { + if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil { return errors.Wrap(err, "unable to register compaction") } diff --git a/repo/content/index_blob_manager.go b/repo/content/index_blob_manager.go index 9e5997e80..a2b44e7c8 100644 --- a/repo/content/index_blob_manager.go +++ b/repo/content/index_blob_manager.go @@ -19,15 +19,15 @@ type indexBlobManager interface { writeIndexBlob(ctx context.Context, data []byte, sessionID SessionID) (blob.Metadata, error) listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) - registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error - cleanup(ctx context.Context) error + registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error + cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error flushCache() } const ( + defaultEventualConsistencySettleTime = 1 * time.Hour compactionLogBlobPrefix = "m" cleanupBlobPrefix = "l" - defaultEventualConsistencySettleTime = 1 * time.Hour ) // compactionLogEntry represents contents of compaction log entry stored in `m` blob. 
@@ -56,14 +56,13 @@ type cleanupEntry struct { } type indexBlobManagerImpl struct { - st blob.Storage - hasher hashing.HashFunc - encryptor encryption.Encryptor - listCache *listCache - ownWritesCache ownWritesCache - timeNow func() time.Time - indexBlobCache contentCache - maxEventualConsistencySettleTime time.Duration + st blob.Storage + hasher hashing.HashFunc + encryptor encryption.Encryptor + listCache *listCache + ownWritesCache ownWritesCache + timeNow func() time.Time + indexBlobCache contentCache } func (m *indexBlobManagerImpl) listAndMergeOwnWrites(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) { @@ -138,7 +137,7 @@ func (m *indexBlobManagerImpl) flushCache() { m.listCache.deleteListCache(compactionLogBlobPrefix) } -func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error { +func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error { logEntryBytes, err := json.Marshal(&compactionLogEntry{ InputMetadata: inputs, OutputMetadata: outputs, @@ -162,7 +161,7 @@ func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, o formatLog(ctx).Debugf("compaction-log %v %v", compactionLogBlobMetadata.BlobID, compactionLogBlobMetadata.Timestamp) - if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil { + if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata, maxEventualConsistencySettleTime); err != nil { return errors.Wrap(err, "error deleting old index blobs") } @@ -271,7 +270,7 @@ func (m *indexBlobManagerImpl) getCleanupEntries(ctx context.Context, latestServ return results, nil } -func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata) error { +func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error { allCompactionLogBlobs, err := 
m.listCache.listBlobs(ctx, compactionLogBlobPrefix) if err != nil { return errors.Wrap(err, "error listing compaction log blobs") @@ -279,7 +278,7 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl // look for server-assigned timestamp of the compaction log entry we just wrote as a reference. // we're assuming server-generated timestamps are somewhat reasonable and time is moving - compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-m.maxEventualConsistencySettleTime) + compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-maxEventualConsistencySettleTime) compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff) log(ctx).Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff) @@ -289,7 +288,7 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl return errors.Wrap(err, "unable to get compaction log entries") } - indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries) + indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries, maxEventualConsistencySettleTime) // note that we must always delete index blobs first before compaction logs // otherwise we may inadvertedly resurrect an index blob that should have been removed. 
@@ -306,13 +305,13 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl return nil } -func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry) []blob.ID { +func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID { tmp := map[blob.ID]bool{} for _, cl := range entries { // are the input index blobs in this compaction eligble for deletion? - if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < m.maxEventualConsistencySettleTime { - log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, m.maxEventualConsistencySettleTime) + if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < maxEventualConsistencySettleTime { + log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, maxEventualConsistencySettleTime) continue } @@ -343,9 +342,9 @@ func (m *indexBlobManagerImpl) findCompactionLogBlobsToDelayCleanup(ctx context. return result } -func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry) (compactionLogs, cleanupBlobs []blob.ID) { +func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) { for k, e := range entries { - if e.age > m.maxEventualConsistencySettleTime { + if e.age >= maxEventualConsistencySettleTime { compactionLogs = append(compactionLogs, e.BlobIDs...) 
cleanupBlobs = append(cleanupBlobs, k) } @@ -391,7 +390,7 @@ func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Contex return nil } -func (m *indexBlobManagerImpl) cleanup(ctx context.Context) error { +func (m *indexBlobManagerImpl) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error { allCleanupBlobs, err := m.listCache.listBlobs(ctx, cleanupBlobPrefix) if err != nil { return errors.Wrap(err, "error listing cleanup blobs") @@ -413,7 +412,7 @@ func (m *indexBlobManagerImpl) cleanup(ctx context.Context) error { } // pick cleanup entries to delete that are old enough - compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries) + compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries, maxEventualConsistencySettleTime) if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogsToDelete); err != nil { return errors.Wrap(err, "unable to delete cleanup blobs") diff --git a/repo/content/index_blob_manager_test.go b/repo/content/index_blob_manager_test.go index 3384c7246..fec8a2679 100644 --- a/repo/content/index_blob_manager_test.go +++ b/repo/content/index_blob_manager_test.go @@ -496,7 +496,7 @@ func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) e inputs = append(inputs, bi.Metadata) } - if err := m.registerCompaction(ctx, inputs, outputs); err != nil { + if err := m.registerCompaction(ctx, inputs, outputs, testEventualConsistencySettleTime); err != nil { return errors.Wrap(err, "compaction error") } @@ -702,7 +702,7 @@ func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs [] t.Logf("compacting %v to %v", inputs, outputs) - err := m.registerCompaction(testlogging.Context(t), inputs, outputs) + err := m.registerCompaction(testlogging.Context(t), inputs, outputs, testEventualConsistencySettleTime) if err != nil { t.Fatalf("failed to write index blob: %v", err) } @@ -773,12 +773,11 @@ func 
newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, localTimeNow), localTimeNow, }, - indexBlobCache: passthroughContentCache{st}, - encryptor: enc, - hasher: hf, - listCache: lc, - timeNow: localTimeNow, - maxEventualConsistencySettleTime: testIndexBlobDeleteAge, + indexBlobCache: passthroughContentCache{st}, + encryptor: enc, + hasher: hf, + listCache: lc, + timeNow: localTimeNow, } return m diff --git a/repo/maintenance/blob_gc.go b/repo/maintenance/blob_gc.go index 3c4076d9b..1286d530e 100644 --- a/repo/maintenance/blob_gc.go +++ b/repo/maintenance/blob_gc.go @@ -2,7 +2,6 @@ import ( "context" - "time" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -14,38 +13,20 @@ "github.com/kopia/kopia/repo/content" ) -// defaultBlobGCMinAge is a default MinAge for blob GC. -// We're setting it to 2 hours to accommodate the fact that every repository -// will periodically flush its indexes more frequently than 1/hour. -const defaultBlobGCMinAge = 2 * time.Hour - -// treat sessions that have not been updated in 4 days as expired. -const defaultSessionExpirationAge = 4 * 24 * time.Hour - // DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm. type DeleteUnreferencedBlobsOptions struct { - Parallel int - Prefix blob.ID - MinAge time.Duration - SessionExpirationAge time.Duration // treat sessions that have not been written for more than X time as expired - DryRun bool + Parallel int + Prefix blob.ID + DryRun bool } // DeleteUnreferencedBlobs deletes old blobs that are no longer referenced by index entries. 
// nolint:gocyclo -func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedBlobsOptions) (int, error) { +func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedBlobsOptions, safety SafetyParameters) (int, error) { if opt.Parallel == 0 { opt.Parallel = 16 } - if opt.MinAge == 0 { - opt.MinAge = defaultBlobGCMinAge - } - - if opt.SessionExpirationAge == 0 { - opt.SessionExpirationAge = defaultSessionExpirationAge - } - const deleteQueueSize = 100 var unreferenced, deleted stats.CountSum @@ -91,14 +72,14 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite // iterate all pack blobs + session blobs and keep ones that are too young or // belong to alive sessions. if err := rep.ContentManager().IterateUnreferencedBlobs(ctx, prefixes, opt.Parallel, func(bm blob.Metadata) error { - if age := rep.Time().Sub(bm.Timestamp); age < opt.MinAge { - log(ctx).Debugf(" preserving %v because it's too new (age: %v bm: %v)", bm.BlobID, bm.Timestamp, age) + if age := rep.Time().Sub(bm.Timestamp); age < safety.BlobDeleteMinAge { + log(ctx).Debugf(" preserving %v because it's too new (age: %v<%v)", bm.BlobID, age, safety.BlobDeleteMinAge) return nil } sid := content.SessionIDFromBlobID(bm.BlobID) if s, ok := activeSessions[sid]; ok { - if age := rep.Time().Sub(s.CheckpointTime); age < opt.SessionExpirationAge { + if age := rep.Time().Sub(s.CheckpointTime); age < safety.SessionExpirationAge { log(ctx).Debugf(" preserving %v because it's part of an active session (%v)", bm.BlobID, sid) return nil } diff --git a/repo/maintenance/blob_gc_test.go b/repo/maintenance/blob_gc_test.go index 12d8d0241..9257cabc4 100644 --- a/repo/maintenance/blob_gc_test.go +++ b/repo/maintenance/blob_gc_test.go @@ -68,19 +68,21 @@ func TestDeleteUnreferencedBlobs(t *testing.T) { verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID2) // new blobs not will be deleted 
because of minimum age requirement - if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{ - MinAge: 1 * time.Hour, - }); err != nil { + if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyFull); err != nil { t.Fatal(err) } verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID1) verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID2) + // mixed safety parameters + safetyFastDeleteLongSessionExpiration := SafetyParameters{ + BlobDeleteMinAge: 1, + SessionExpirationAge: 4 * 24 * time.Hour, + } + // new blobs will be deleted - if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{ - MinAge: 1, - }); err != nil { + if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyNone); err != nil { t.Fatal(err) } @@ -105,9 +107,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) { CheckpointTime: ta.NowFunc()(), }) - if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{ - MinAge: 1, - }); err != nil { + if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration); err != nil { t.Fatal(err) } @@ -120,9 +120,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) { // now finish session 2 env.RepositoryWriter.BlobStorage().DeleteBlob(ctx, session2Marker) - if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{ - MinAge: 1, - }); err != nil { + if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration); err != nil { t.Fatal(err) } @@ -135,9 +133,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) { // now move time into the future making session 1 timed out ta.Advance(7 * 24 * time.Hour) - if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, 
DeleteUnreferencedBlobsOptions{ - MinAge: 1, - }); err != nil { + if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyFull); err != nil { t.Fatal(err) } @@ -155,7 +151,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) { } if diff := cmp.Diff(blobsBefore, blobsAfter); diff != "" { - t.Errorf("unexpected diff: %v", diff) + t.Fatalf("unexpected diff: %v", diff) } } diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index f343eeda3..7400a25ce 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -5,7 +5,6 @@ "runtime" "strings" "sync" - "time" "github.com/pkg/errors" @@ -15,14 +14,11 @@ "github.com/kopia/kopia/repo/content" ) -const defaultRewriteContentsMinAge = 2 * time.Hour - const parallelContentRewritesCPUMultiplier = 2 // RewriteContentsOptions provides options for RewriteContents. type RewriteContentsOptions struct { Parallel int - MinAge time.Duration ContentIDs []content.ID ContentIDRange content.IDRange PackPrefix blob.ID @@ -40,15 +36,11 @@ type contentInfoOrError struct { // RewriteContents rewrites contents according to provided criteria and creates new // blobs and index entries to point at the. 
-func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions) error { +func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions, safety SafetyParameters) error { if opt == nil { return errors.Errorf("missing options") } - if opt.MinAge == 0 { - opt.MinAge = defaultRewriteContentsMinAge - } - if opt.ShortPacks { log(ctx).Infof("Rewriting contents from short packs...") } else { @@ -90,7 +82,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt * } age := rep.Time().Sub(c.Timestamp()) - if age < opt.MinAge { + if age < safety.RewriteMinAge { log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.ID, c.Length, c.PackBlobID, optDeleted, age) continue } @@ -194,6 +186,16 @@ func(pi content.PackInfo) error { return nil } + blobMeta, err := rep.BlobReader().GetMetadata(ctx, pi.PackID) + if err != nil { + return errors.Wrapf(err, "unable to get blob metadata %v", pi.PackID) + } + + // pack is short but the content consumes most of it, ignore. + if pi.TotalSize > shortPackThresholdPercent*blobMeta.Length/100 { + return nil + } + for _, ci := range pi.ContentInfos { ch <- contentInfoOrError{Info: ci} } diff --git a/repo/maintenance/drop_deleted_contents.go b/repo/maintenance/drop_deleted_contents.go index 0ccfde893..bd6a38da7 100644 --- a/repo/maintenance/drop_deleted_contents.go +++ b/repo/maintenance/drop_deleted_contents.go @@ -9,11 +9,12 @@ ) // DropDeletedContents rewrites indexes while dropping deleted contents above certain age. 
-func DropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time) error { +func DropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time, safety SafetyParameters) error { log(ctx).Infof("Dropping contents deleted before %v", dropDeletedBefore) return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{ - AllIndexes: true, - DropDeletedBefore: dropDeletedBefore, + AllIndexes: true, + DropDeletedBefore: dropDeletedBefore, + DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety, }) } diff --git a/repo/maintenance/index_compaction.go b/repo/maintenance/index_compaction.go index ee5a3a595..fd2e46f79 100644 --- a/repo/maintenance/index_compaction.go +++ b/repo/maintenance/index_compaction.go @@ -10,10 +10,11 @@ const maxSmallBlobsForIndexCompaction = 8 // IndexCompaction rewrites index blobs to reduce their count but does not drop any contents. -func IndexCompaction(ctx context.Context, rep repo.DirectRepositoryWriter) error { +func IndexCompaction(ctx context.Context, rep repo.DirectRepositoryWriter, safety SafetyParameters) error { log(ctx).Infof("Compacting indexes...") return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{ - MaxSmallBlobs: maxSmallBlobsForIndexCompaction, + MaxSmallBlobs: maxSmallBlobsForIndexCompaction, + DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety, }) } diff --git a/repo/maintenance/maintenance_params.go b/repo/maintenance/maintenance_params.go index 0e5fff0e9..ce150acc5 100644 --- a/repo/maintenance/maintenance_params.go +++ b/repo/maintenance/maintenance_params.go @@ -20,15 +20,6 @@ type Params struct { QuickCycle CycleParams `json:"quick"` FullCycle CycleParams `json:"full"` - - SnapshotGC SnapshotGCParams `json:"snapshotGC"` -} - -// SnapshotGCParams contains parameters for Snapshot Garbage Collection -// NOTE: Due to layering, the implementation of Snapshot GC is outside of 
repository package -// but for simplicity we store it here. -type SnapshotGCParams struct { - MinContentAge time.Duration `json:"minAge"` } // DefaultParams represents default values of maintenance parameters. @@ -42,9 +33,6 @@ func DefaultParams() Params { Enabled: true, Interval: 1 * time.Hour, }, - SnapshotGC: SnapshotGCParams{ - MinContentAge: 24 * time.Hour, //nolint:gomnd - }, } } diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index facdcb6df..ba8e569df 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -14,16 +14,6 @@ "github.com/kopia/kopia/repo/logging" ) -// safetyMarginBetweenSnapshotGC is the minimal amount of time that must pass between snapshot -// GC cycles to allow all in-flight snapshots during earlier GC to be flushed to be flushed and -// visible to a following GC. The uploader will automatically create a checkpoint every 45 minutes, -// so ~1 hour should be enough but we're setting this to a higher conservative value for extra safety. -const ( - safetyMarginBetweenSnapshotGC = 4 * time.Hour - - extraSafetyMarginBeforeDroppingContentFromIndex = -1 * time.Hour -) - var log = logging.GetContextLoggerFunc("maintenance") // Mode describes the mode of maintenance to perfor. @@ -186,20 +176,20 @@ func RunExclusive(ctx context.Context, rep repo.DirectRepositoryWriter, mode Mod } // Run performs maintenance activities for a repository. 
-func Run(ctx context.Context, runParams RunParameters) error { +func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { switch runParams.Mode { case ModeQuick: - return runQuickMaintenance(ctx, runParams) + return runQuickMaintenance(ctx, runParams, safety) case ModeFull: - return runFullMaintenance(ctx, runParams) + return runFullMaintenance(ctx, runParams, safety) default: return errors.Errorf("unknown mode %q", runParams.Mode) } } -func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { +func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { // find 'q' packs that are less than 80% full and rewrite contents in them into // new consolidated packs, orphaning old packs in the process. if err := ReportRun(ctx, runParams.rep, "quick-rewrite-contents", func() error { @@ -207,7 +197,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { ContentIDRange: content.AllPrefixedIDs, PackPrefix: content.PackBlobIDPrefixSpecial, ShortPacks: true, - }) + }, safety) }); err != nil { return errors.Wrap(err, "error rewriting metadata contents") } @@ -216,7 +206,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { if err := ReportRun(ctx, runParams.rep, "quick-delete-blobs", func() error { _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ Prefix: content.PackBlobIDPrefixSpecial, - }) + }, safety) return err }); err != nil { return errors.Wrap(err, "error deleting unreferenced metadata blobs") @@ -224,7 +214,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { // consolidate many smaller indexes into fewer larger ones. 
if err := ReportRun(ctx, runParams.rep, "index-compaction", func() error { - return IndexCompaction(ctx, runParams.rep) + return IndexCompaction(ctx, runParams.rep, safety) }); err != nil { return errors.Wrap(err, "error performing index compaction") } @@ -232,19 +222,27 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { return nil } -func runFullMaintenance(ctx context.Context, runParams RunParameters) error { - s, err := GetSchedule(ctx, runParams.rep) - if err != nil { - return errors.Wrap(err, "unable to get schedule") +func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { + var safeDropTime time.Time + + if safety.RequireTwoGCCycles { + s, err := GetSchedule(ctx, runParams.rep) + if err != nil { + return errors.Wrap(err, "unable to get schedule") + } + + safeDropTime = findSafeDropTime(s.Runs["snapshot-gc"], safety) + } else { + safeDropTime = runParams.rep.Time() } - if safeDropTime := findSafeDropTime(s.Runs["snapshot-gc"]); !safeDropTime.IsZero() { + if !safeDropTime.IsZero() { log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime) // rewrite indexes by dropping content entries that have been marked // as deleted for a long time if err := ReportRun(ctx, runParams.rep, "full-drop-deleted-content", func() error { - return DropDeletedContents(ctx, runParams.rep, safeDropTime) + return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety) }); err != nil { return errors.Wrap(err, "error dropping deleted contents") } @@ -258,14 +256,14 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error { return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ ContentIDRange: content.AllIDs, ShortPacks: true, - }) + }, safety) }); err != nil { return errors.Wrap(err, "error rewriting contents in short packs") } // delete orphaned packs after some time. 
if err := ReportRun(ctx, runParams.rep, "full-delete-blobs", func() error { - _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}) + _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety) return err }); err != nil { return errors.Wrap(err, "error deleting unreferenced blobs") @@ -278,7 +276,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error { // deleted before that time, because at least two successful GC cycles have completed // and minimum required time between the GCs has passed. // -// The worst possible case we needf to handle is: +// The worst possible case we need to handle is: // // Step #1 - race between GC and snapshot creation: // @@ -297,8 +295,8 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error { // After Step 2 completes, we know for sure that all contents deleted before Step #1 has started // are safe to drop from the index because Step #2 has fixed them, as long as all snapshots that // were racing with snapshot GC in step #1 have flushed pending writes, hence the -// safetyMarginBetweenSnapshotGC. -func findSafeDropTime(runs []RunInfo) time.Time { +// safety.MarginBetweenSnapshotGC. +func findSafeDropTime(runs []RunInfo, safety SafetyParameters) time.Time { var successfulRuns []RunInfo for _, r := range runs { @@ -319,8 +317,8 @@ func findSafeDropTime(runs []RunInfo) time.Time { // Look for previous successful run such that the time between GCs exceeds the safety margin. 
for _, r := range successfulRuns[1:] { diff := -r.End.Sub(successfulRuns[0].Start) - if diff > safetyMarginBetweenSnapshotGC { - return r.Start.Add(extraSafetyMarginBeforeDroppingContentFromIndex) + if diff > safety.MarginBetweenSnapshotGC { + return r.Start.Add(-safety.DropContentFromIndexExtraMargin) } } diff --git a/repo/maintenance/maintenance_run_test.go b/repo/maintenance/maintenance_run_test.go index cd91183fa..380e3fa6b 100644 --- a/repo/maintenance/maintenance_run_test.go +++ b/repo/maintenance/maintenance_run_test.go @@ -45,7 +45,7 @@ func TestFindSafeDropTime(t *testing.T) { {Start: t0700, End: t0715, Success: true}, {Start: t1300, End: t1315, Success: true}, }, - wantTime: t0700.Add(extraSafetyMarginBeforeDroppingContentFromIndex), + wantTime: t0700.Add(-SafetyFull.DropContentFromIndexExtraMargin), }, // three runs spaced enough { @@ -54,7 +54,7 @@ func TestFindSafeDropTime(t *testing.T) { {Start: t0900, End: t0915, Success: true}, {Start: t1300, End: t1315, Success: true}, }, - wantTime: t0700.Add(extraSafetyMarginBeforeDroppingContentFromIndex), + wantTime: t0700.Add(-SafetyFull.DropContentFromIndexExtraMargin), }, // three runs spaced enough, not successful { @@ -68,7 +68,7 @@ func TestFindSafeDropTime(t *testing.T) { } for _, tc := range cases { - if got, want := findSafeDropTime(tc.runs), tc.wantTime; !got.Equal(want) { + if got, want := findSafeDropTime(tc.runs, SafetyFull), tc.wantTime; !got.Equal(want) { t.Errorf("invalid safe drop time for %v: %v, want %v", tc.runs, got, want) } } diff --git a/repo/maintenance/maintenance_safety.go b/repo/maintenance/maintenance_safety.go new file mode 100644 index 000000000..f151f6a02 --- /dev/null +++ b/repo/maintenance/maintenance_safety.go @@ -0,0 +1,62 @@ +package maintenance + +import "time" + +// SafetyParameters specifies timing parameters that affect safety of maintenance. +type SafetyParameters struct { + // Do not rewrite contents younger than this age. 
+ RewriteMinAge time.Duration
+
+ // Snapshot GC: MinContentAgeSubjectToGC is the minimum age of content to be subject to garbage collection.
+ MinContentAgeSubjectToGC time.Duration
+
+ // MarginBetweenSnapshotGC is the minimal amount of time that must pass between snapshot
+ // GC cycles to allow all in-flight snapshots during earlier GC to be flushed and
+ // visible to a following GC. The uploader will automatically create a checkpoint every 45 minutes,
+ // so ~1 hour should be enough but we're setting this to a higher conservative value for extra safety.
+ MarginBetweenSnapshotGC time.Duration
+
+ // RequireTwoGCCycles requires that two successful snapshot GC cycles, separated by at least
+ // MarginBetweenSnapshotGC, have completed before deleted contents may be dropped from indexes.
+ RequireTwoGCCycles bool
+
+ // DisableEventualConsistencySafety disables wait time to allow settling of eventually-consistent writes in blob stores.
+ DisableEventualConsistencySafety bool
+
+ // DropContentFromIndexExtraMargin is the amount of margin time before dropping deleted contents from indices.
+ DropContentFromIndexExtraMargin time.Duration
+
+ // Blob GC: Delete unused blobs above this age.
+ BlobDeleteMinAge time.Duration
+
+ // Blob GC: Drop incomplete session blobs above this age.
+ SessionExpirationAge time.Duration
+}
+
+// Supported safety levels.
+var (
+ // SafetyNone has safety parameters which allow full garbage collection without unnecessary
+ // delays, but it is safe only if no other kopia clients are running and storage backend is
+ // strongly consistent.
+ SafetyNone = SafetyParameters{
+ BlobDeleteMinAge: 0,
+ DropContentFromIndexExtraMargin: 0,
+ MarginBetweenSnapshotGC: 0,
+ MinContentAgeSubjectToGC: 0,
+ RewriteMinAge: 0,
+ SessionExpirationAge: 0,
+ RequireTwoGCCycles: false,
+ DisableEventualConsistencySafety: true,
+ }
+
+ // SafetyFull has default safety parameters which allow safe GC concurrent with snapshotting
+ // by other Kopia clients.
+ SafetyFull = SafetyParameters{ + BlobDeleteMinAge: 2 * time.Hour, //nolint:gomnd + DropContentFromIndexExtraMargin: time.Hour, + MarginBetweenSnapshotGC: 4 * time.Hour, //nolint:gomnd + MinContentAgeSubjectToGC: 24 * time.Hour, //nolint:gomnd + RewriteMinAge: 2 * time.Hour, //nolint:gomnd + SessionExpirationAge: 96 * time.Hour, //nolint:gomnd + RequireTwoGCCycles: true, + } +) diff --git a/snapshot/snapshotgc/gc.go b/snapshot/snapshotgc/gc.go index d515fe490..abe118b5b 100644 --- a/snapshot/snapshotgc/gc.go +++ b/snapshot/snapshotgc/gc.go @@ -74,17 +74,17 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma } // Run performs garbage collection on all the snapshots in the repository. -func Run(ctx context.Context, rep repo.DirectRepositoryWriter, params maintenance.SnapshotGCParams, gcDelete bool) (Stats, error) { +func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters) (Stats, error) { var st Stats err := maintenance.ReportRun(ctx, rep, "snapshot-gc", func() error { - return runInternal(ctx, rep, params, gcDelete, &st) + return runInternal(ctx, rep, gcDelete, safety, &st) }) return st, errors.Wrap(err, "error running snapshot gc") } -func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, params maintenance.SnapshotGCParams, gcDelete bool, st *Stats) error { +func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, st *Stats) error { var ( used sync.Map @@ -117,7 +117,7 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, params ma return nil } - if rep.Time().Sub(ci.Timestamp()) < params.MinContentAge { + if rep.Time().Sub(ci.Timestamp()) < safety.MinContentAgeSubjectToGC { log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp()) tooRecent.Add(int64(ci.Length)) return nil @@ -158,5 +158,5 @@ func runInternal(ctx 
context.Context, rep repo.DirectRepositoryWriter, params ma return errors.Errorf("Not deleting because '--delete' flag was not set") } - return nil + return errors.Wrap(rep.Flush(ctx), "flush error") } diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance.go b/snapshot/snapshotmaintenance/snapshotmaintenance.go index d379e6251..be3f4d22e 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance.go @@ -12,16 +12,16 @@ ) // Run runs the complete snapshot and repository maintenance. -func Run(ctx context.Context, dr repo.DirectRepositoryWriter, mode maintenance.Mode, force bool) error { +func Run(ctx context.Context, dr repo.DirectRepositoryWriter, mode maintenance.Mode, force bool, safety maintenance.SafetyParameters) error { return maintenance.RunExclusive(ctx, dr, mode, force, func(runParams maintenance.RunParameters) error { // run snapshot GC before full maintenance if runParams.Mode == maintenance.ModeFull { - if _, err := snapshotgc.Run(ctx, dr, runParams.Params.SnapshotGC, true); err != nil { + if _, err := snapshotgc.Run(ctx, dr, true, safety); err != nil { return errors.Wrap(err, "snapshot GC failure") } } - return maintenance.Run(ctx, runParams) + return maintenance.Run(ctx, runParams, safety) }) } diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go index 3cf638fce..3815c1371 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go @@ -55,10 +55,12 @@ func TestSnapshotGCSimple(t *testing.T) { mustFlush(t, th.RepositoryWriter) - // Advance time to force GC to mark as deleted the contents from the previous snapshot - th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour) + safety := maintenance.SafetyFull - err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true) + // Advance time to 
force GC to mark as deleted the contents from the previous snapshot + th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour) + + err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull) require.NoError(t, err) mustFlush(t, th.RepositoryWriter) @@ -109,8 +111,10 @@ func TestMaintenanceReuseDirManifest(t *testing.T) { mustFlush(t, th.RepositoryWriter) + safety := maintenance.SafetyFull + // Advance time to force GC to mark as deleted the contents from the previous snapshot - th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour) + th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour) r2 := th.openAnother(t) @@ -120,7 +124,7 @@ func TestMaintenanceReuseDirManifest(t *testing.T) { // interleaving snapshot and maintenance and delaying flushing as well to // create dangling references to contents that were in the previously // deleted snapshot and that are reused in this new snapshot. - err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true) + err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull) require.NoError(t, err) info, err := r2.(repo.DirectRepository).ContentReader().ContentInfo(ctx, content.ID(s2.RootObjectID())) @@ -145,8 +149,8 @@ func TestMaintenanceReuseDirManifest(t *testing.T) { require.NoError(t, err) // Run maintenance again - th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour) - err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true) + th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour) + err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, safety) require.NoError(t, err) mustFlush(t, th.RepositoryWriter) diff --git a/tests/end_to_end_test/maintenance_test.go b/tests/end_to_end_test/maintenance_test.go new file mode 100644 index 000000000..32bda4635 --- /dev/null +++ 
b/tests/end_to_end_test/maintenance_test.go @@ -0,0 +1,56 @@ +package endtoend_test + +import ( + "testing" + + "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/tests/testenv" +) + +func TestFullMaintenance(t *testing.T) { + t.Parallel() + + e := testenv.NewCLITest(t) + + e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir) + defer e.RunAndExpectSuccess(t, "repo", "disconnect") + + var snap snapshot.Manifest + + // after creation we'll have kopia.repository, 1 index + 1 pack blob + if got, want := e.RunAndExpectSuccess(t, "blob", "list"), 3; len(got) != want { + t.Fatalf("unexpected number of initial blobs: %v, want %v", got, want) + } + + mustParseJSONLines(t, e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1, "--json"), &snap) + e.RunAndExpectSuccess(t, "snapshot", "delete", string(snap.ID), "--delete") + + e.RunAndVerifyOutputLineCount(t, 0, "snapshot", "list") + + originalBlobCount := len(e.RunAndExpectSuccess(t, "blob", "list")) + + e.RunAndVerifyOutputLineCount(t, 0, "maintenance", "run", "--full") + + if got := len(e.RunAndExpectSuccess(t, "blob", "list")); got != originalBlobCount { + t.Fatalf("full maintenance is not expected to change any blobs due to safety margins (got %v, was %v)", got, originalBlobCount) + } + + // now rerun with --safety=none + e.RunAndExpectSuccess(t, "maintenance", "run", "--full", "--safety=none") + + if got := len(e.RunAndExpectSuccess(t, "blob", "list")); got >= originalBlobCount { + t.Fatalf("maintenance did not remove blobs: %v, had %v", got, originalBlobCount) + } + + // we're expecting to have 5 blobs: + // - kopia.maintenance + // - kopia.repository + // - 2 index blobs + // - 1 q blob + + const blobCountAfterFullWipeout = 5 + + if got, want := e.RunAndExpectSuccess(t, "blob", "list"), blobCountAfterFullWipeout; len(got) != want { + t.Fatalf("maintenance left unwanted blobs: %v, want %v", got, want) + } +} diff --git a/tests/end_to_end_test/snapshot_delete_test.go 
b/tests/end_to_end_test/snapshot_delete_test.go index 39cd08514..91c23538a 100644 --- a/tests/end_to_end_test/snapshot_delete_test.go +++ b/tests/end_to_end_test/snapshot_delete_test.go @@ -223,7 +223,7 @@ func TestSnapshotDeleteRestore(t *testing.T) { e.RunAndExpectFailure(t, "snapshot", "delete", snapID, "--delete") // garbage-collect to clean up the root object. - e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--min-age", "0s") + e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--safety=none") // Run a restore on the deleted snapshot's root ID. The root should be // marked as deleted but still recoverable diff --git a/tests/end_to_end_test/snapshot_gc_test.go b/tests/end_to_end_test/snapshot_gc_test.go index f2e14bf6a..b49affc2d 100644 --- a/tests/end_to_end_test/snapshot_gc_test.go +++ b/tests/end_to_end_test/snapshot_gc_test.go @@ -52,9 +52,14 @@ func TestSnapshotGC(t *testing.T) { // run verification e.RunAndExpectSuccess(t, "snapshot", "verify") - // garbage-collect in dry run mode + // garbage-collect in dry run mode - this will not fail because of default safety level + // which only looks at contents above certain age. e.RunAndExpectSuccess(t, "snapshot", "gc") + // garbage-collect in dry run mode - this will fail because of --safety=none + // makes contents subject to GC immediately but we're not specifying --delete flag. 
+ e.RunAndExpectFailure(t, "snapshot", "gc", "--safety=none") + // data block + directory block + manifest block + manifest block from manifest deletion var contentInfo []content.Info @@ -74,7 +79,7 @@ func TestSnapshotGC(t *testing.T) { time.Sleep(2 * time.Second) // garbage-collect for real, this time without age limit - e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--min-age", "0s") + e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--safety=none") // two contents are deleted expectedContentCount -= 2 diff --git a/tests/testenv/cli_test_env.go b/tests/testenv/cli_test_env.go index f78f8e11c..2252ba3ac 100644 --- a/tests/testenv/cli_test_env.go +++ b/tests/testenv/cli_test_env.go @@ -74,8 +74,12 @@ func NewCLITest(t *testing.T) *CLITest { exe := os.Getenv("KOPIA_EXE") if exe == "" { - // exe = "kopia" - t.Skip() + if os.Getenv("VSCODE_PID") != "" { + // we're launched from VSCode, use system-installed kopia executable. + exe = "kopia" + } else { + t.Skip() + } } // unset environment variables that disrupt tests when passed to subprocesses.