mirror of
https://github.com/kopia/kopia.git
synced 2026-05-14 09:47:35 -04:00
cli: added --safety=full|none flag to maintenance commands (#912)
* cli: added --safety=full|none flag to maintenance commands This allows selection between safe, high-latency maintenance parameters which allow concurrent access (`full`) or low-latency which may be unsafe in certain situations when concurrent Kopia processes are running. This is a breaking change for advanced CLI commands, where it removes timing parameters and replaces them with single `--safety` option. * 'blob gc' * 'content rewrite' * 'snapshot gc' * pr renames * maintenance: fixed computation of safe time for --safety=none * maintenance: improved logging for blob gc * maintenance: do not rewrite truly short, densely packed packs * mechanical: pass eventual consistency settle time via CompactOptions * maintenance: add option to disable eventual consistency time buffers with --safety=none * maintenance: trigger flush at the end of snapshot gc * maintenance: reload indexes after compaction that drops deleted entries, this allows single-pass maintenance with --safety=none to delete all unused blobs * testing: allow debugging of integration tests inside VSCode * testing: added end-to-end maintenance test that verifies that full maintenance with --safety=none removes all data
This commit is contained in:
27
cli/app.go
27
cli/app.go
@@ -48,6 +48,31 @@
|
||||
aclCommands = serverCommands.Command("acl", "Manager server access control list entries")
|
||||
)
|
||||
|
||||
var safetyByName = map[string]maintenance.SafetyParameters{
|
||||
"none": maintenance.SafetyNone,
|
||||
"full": maintenance.SafetyFull,
|
||||
}
|
||||
|
||||
// safetyFlag defines a --safety=none|full flag that returns SafetyParameters.
|
||||
func safetyFlag(c *kingpin.CmdClause) *maintenance.SafetyParameters {
|
||||
var (
|
||||
result = maintenance.SafetyFull
|
||||
str string
|
||||
)
|
||||
|
||||
c.Flag("safety", "Safety level").Default("full").PreAction(func(pc *kingpin.ParseContext) error {
|
||||
var ok bool
|
||||
result, ok = safetyByName[str]
|
||||
if !ok {
|
||||
return errors.Errorf("unhandled safety level")
|
||||
}
|
||||
|
||||
return nil
|
||||
}).EnumVar(&str, "full", "none")
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
func helpFullAction(ctx *kingpin.ParseContext) error {
|
||||
_ = app.UsageForContextWithTemplate(ctx, 0, kingpin.DefaultUsageTemplate)
|
||||
|
||||
@@ -213,7 +238,7 @@ func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error {
|
||||
Purpose: "maybeRunMaintenance",
|
||||
OnUpload: progress.UploadedBytes,
|
||||
}, func(w repo.DirectRepositoryWriter) error {
|
||||
return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false)
|
||||
return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false, maintenance.SafetyFull)
|
||||
})
|
||||
|
||||
var noe maintenance.NotOwnedError
|
||||
|
||||
@@ -11,26 +11,23 @@
|
||||
)
|
||||
|
||||
var (
|
||||
blobGarbageCollectCommand = blobCommands.Command("gc", "Garbage-collect unused blobs")
|
||||
blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String()
|
||||
blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int()
|
||||
blobGarbageCollectMinAge = blobGarbageCollectCommand.Flag("min-age", "Garbage-collect blobs with minimum age").Default("24h").Duration()
|
||||
blobGarbageCollectSessionExpirationAge = blobGarbageCollectCommand.Flag("session-expiration-age", "Garbage-collect blobs belonging to sessions that have not been updated recently").Default("96h").Duration()
|
||||
blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String()
|
||||
blobGarbageCollectCommand = blobCommands.Command("gc", "Garbage-collect unused blobs")
|
||||
blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String()
|
||||
blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int()
|
||||
blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String()
|
||||
blobGarbageCollectSafety = safetyFlag(blobGarbageCollectCommand)
|
||||
)
|
||||
|
||||
func runBlobGarbageCollectCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
advancedCommand(ctx)
|
||||
|
||||
opts := maintenance.DeleteUnreferencedBlobsOptions{
|
||||
DryRun: *blobGarbageCollectCommandDelete != "yes",
|
||||
MinAge: *blobGarbageCollectMinAge,
|
||||
SessionExpirationAge: *blobGarbageCollectSessionExpirationAge,
|
||||
Parallel: *blobGarbageCollectParallel,
|
||||
Prefix: blob.ID(*blobGarbageCollectPrefix),
|
||||
DryRun: *blobGarbageCollectCommandDelete != "yes",
|
||||
Parallel: *blobGarbageCollectParallel,
|
||||
Prefix: blob.ID(*blobGarbageCollectPrefix),
|
||||
}
|
||||
|
||||
n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts)
|
||||
n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts, *blobGarbageCollectSafety)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced blobs")
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
contentRewriteFormatVersion = contentRewriteCommand.Flag("format-version", "Rewrite contents using the provided format version").Default("-1").Int()
|
||||
contentRewritePackPrefix = contentRewriteCommand.Flag("pack-prefix", "Only rewrite contents from pack blobs with a given prefix").String()
|
||||
contentRewriteDryRun = contentRewriteCommand.Flag("dry-run", "Do not actually rewrite, only print what would happen").Short('n').Bool()
|
||||
contentRewriteMinAge = contentRewriteCommand.Flag("min-age", "Only rewrite contents above given age").Default("1h").Duration()
|
||||
contentRewriteSafety = safetyFlag(contentRewriteCommand)
|
||||
)
|
||||
|
||||
func runContentRewriteCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
@@ -28,12 +28,11 @@ func runContentRewriteCommand(ctx context.Context, rep repo.DirectRepositoryWrit
|
||||
ContentIDRange: contentIDRange(),
|
||||
ContentIDs: toContentIDs(*contentRewriteIDs),
|
||||
FormatVersion: *contentRewriteFormatVersion,
|
||||
MinAge: *contentRewriteMinAge,
|
||||
PackPrefix: blob.ID(*contentRewritePackPrefix),
|
||||
Parallel: *contentRewriteParallelism,
|
||||
ShortPacks: *contentRewriteShortPacks,
|
||||
DryRun: *contentRewriteDryRun,
|
||||
})
|
||||
}, *contentRewriteSafety)
|
||||
}
|
||||
|
||||
func toContentIDs(s []string) []content.ID {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
maintenanceRunCommand = maintenanceCommands.Command("run", "Run repository maintenance").Default()
|
||||
maintenanceRunFull = maintenanceRunCommand.Flag("full", "Full maintenance").Bool()
|
||||
maintenanceRunForce = maintenanceRunCommand.Flag("force", "Run maintenance even if not owned (unsafe)").Hidden().Bool()
|
||||
maintenanceRunSafety = safetyFlag(maintenanceRunCommand)
|
||||
)
|
||||
|
||||
func runMaintenanceCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
@@ -20,7 +21,7 @@ func runMaintenanceCommand(ctx context.Context, rep repo.DirectRepositoryWriter)
|
||||
mode = maintenance.ModeFull
|
||||
}
|
||||
|
||||
return snapshotmaintenance.Run(ctx, rep, mode, *maintenanceRunForce)
|
||||
return snapshotmaintenance.Run(ctx, rep, mode, *maintenanceRunForce, *maintenanceRunSafety)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -7,20 +7,17 @@
|
||||
|
||||
"github.com/kopia/kopia/internal/units"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
"github.com/kopia/kopia/snapshot/snapshotgc"
|
||||
)
|
||||
|
||||
var (
|
||||
snapshotGCCommand = snapshotCommands.Command("gc", "Mark contents as deleted which are not used by any snapshot").Hidden()
|
||||
snapshotGCMinContentAge = snapshotGCCommand.Flag("min-age", "Minimum content age to allow deletion").Default("24h").Duration()
|
||||
snapshotGCDelete = snapshotGCCommand.Flag("delete", "Delete unreferenced contents").Bool()
|
||||
snapshotGCCommand = snapshotCommands.Command("gc", "Mark contents as deleted which are not used by any snapshot").Hidden()
|
||||
snapshotGCDelete = snapshotGCCommand.Flag("delete", "Delete unreferenced contents").Bool()
|
||||
snapshotGCSafety = safetyFlag(snapshotGCCommand)
|
||||
)
|
||||
|
||||
func runSnapshotGCCommand(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
st, err := snapshotgc.Run(ctx, rep, maintenance.SnapshotGCParams{
|
||||
MinContentAge: *snapshotGCMinContentAge,
|
||||
}, *snapshotGCDelete)
|
||||
st, err := snapshotgc.Run(ctx, rep, *snapshotGCDelete, *snapshotGCSafety)
|
||||
|
||||
log(ctx).Infof("GC found %v unused contents (%v bytes)", st.UnusedCount, units.BytesStringBase2(st.UnusedBytes))
|
||||
log(ctx).Infof("GC found %v unused contents that are too recent to delete (%v bytes)", st.TooRecentCount, units.BytesStringBase2(st.TooRecentBytes))
|
||||
|
||||
@@ -506,7 +506,7 @@ func periodicMaintenanceOnce(ctx context.Context, rep repo.Repository) error {
|
||||
return repo.DirectWriteSession(ctx, dr, repo.WriteSessionOptions{
|
||||
Purpose: "periodicMaintenanceOnce",
|
||||
}, func(w repo.DirectRepositoryWriter) error {
|
||||
return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false)
|
||||
return snapshotmaintenance.Run(ctx, w, maintenance.ModeAuto, false, maintenance.SafetyFull)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -311,14 +311,13 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
|
||||
sm.committedContents = contentIndex
|
||||
|
||||
sm.indexBlobManager = &indexBlobManagerImpl{
|
||||
st: sm.st,
|
||||
encryptor: sm.encryptor,
|
||||
hasher: sm.hasher,
|
||||
timeNow: sm.timeNow,
|
||||
ownWritesCache: owc,
|
||||
listCache: listCache,
|
||||
indexBlobCache: metadataCache,
|
||||
maxEventualConsistencySettleTime: defaultEventualConsistencySettleTime,
|
||||
st: sm.st,
|
||||
encryptor: sm.encryptor,
|
||||
hasher: sm.hasher,
|
||||
timeNow: sm.timeNow,
|
||||
ownWritesCache: owc,
|
||||
listCache: listCache,
|
||||
indexBlobCache: metadataCache,
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -14,10 +14,19 @@
|
||||
|
||||
// CompactOptions provides options for compaction.
|
||||
type CompactOptions struct {
|
||||
MaxSmallBlobs int
|
||||
AllIndexes bool
|
||||
DropDeletedBefore time.Time
|
||||
DropContents []ID
|
||||
MaxSmallBlobs int
|
||||
AllIndexes bool
|
||||
DropDeletedBefore time.Time
|
||||
DropContents []ID
|
||||
DisableEventualConsistencySafety bool
|
||||
}
|
||||
|
||||
func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration {
|
||||
if co.DisableEventualConsistencySafety {
|
||||
return 0
|
||||
}
|
||||
|
||||
return defaultEventualConsistencySettleTime
|
||||
}
|
||||
|
||||
// CompactIndexes performs compaction of index blobs ensuring that # of small index blobs is below opt.maxSmallBlobs.
|
||||
@@ -38,7 +47,16 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions)
|
||||
return errors.Wrap(err, "error performing compaction")
|
||||
}
|
||||
|
||||
return bm.indexBlobManager.cleanup(ctx)
|
||||
if err := bm.indexBlobManager.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
|
||||
return errors.Wrap(err, "error cleaning up index blobs")
|
||||
}
|
||||
|
||||
// reload indexes after cleanup.
|
||||
if _, _, err := bm.loadPackIndexesUnlocked(ctx); err != nil {
|
||||
return errors.Wrap(err, "error re-loading indexes")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sm *SharedManager) getBlobsToCompact(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo {
|
||||
@@ -125,7 +143,7 @@ func (sm *SharedManager) compactIndexBlobs(ctx context.Context, indexBlobs []Ind
|
||||
|
||||
outputs = append(outputs, compactedIndexBlob)
|
||||
|
||||
if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs); err != nil {
|
||||
if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
|
||||
return errors.Wrap(err, "unable to register compaction")
|
||||
}
|
||||
|
||||
|
||||
@@ -19,15 +19,15 @@ type indexBlobManager interface {
|
||||
writeIndexBlob(ctx context.Context, data []byte, sessionID SessionID) (blob.Metadata, error)
|
||||
listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error)
|
||||
getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error)
|
||||
registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error
|
||||
cleanup(ctx context.Context) error
|
||||
registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error
|
||||
cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error
|
||||
flushCache()
|
||||
}
|
||||
|
||||
const (
|
||||
defaultEventualConsistencySettleTime = 1 * time.Hour
|
||||
compactionLogBlobPrefix = "m"
|
||||
cleanupBlobPrefix = "l"
|
||||
defaultEventualConsistencySettleTime = 1 * time.Hour
|
||||
)
|
||||
|
||||
// compactionLogEntry represents contents of compaction log entry stored in `m` blob.
|
||||
@@ -56,14 +56,13 @@ type cleanupEntry struct {
|
||||
}
|
||||
|
||||
type indexBlobManagerImpl struct {
|
||||
st blob.Storage
|
||||
hasher hashing.HashFunc
|
||||
encryptor encryption.Encryptor
|
||||
listCache *listCache
|
||||
ownWritesCache ownWritesCache
|
||||
timeNow func() time.Time
|
||||
indexBlobCache contentCache
|
||||
maxEventualConsistencySettleTime time.Duration
|
||||
st blob.Storage
|
||||
hasher hashing.HashFunc
|
||||
encryptor encryption.Encryptor
|
||||
listCache *listCache
|
||||
ownWritesCache ownWritesCache
|
||||
timeNow func() time.Time
|
||||
indexBlobCache contentCache
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) listAndMergeOwnWrites(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) {
|
||||
@@ -138,7 +137,7 @@ func (m *indexBlobManagerImpl) flushCache() {
|
||||
m.listCache.deleteListCache(compactionLogBlobPrefix)
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error {
|
||||
func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
|
||||
logEntryBytes, err := json.Marshal(&compactionLogEntry{
|
||||
InputMetadata: inputs,
|
||||
OutputMetadata: outputs,
|
||||
@@ -162,7 +161,7 @@ func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, o
|
||||
|
||||
formatLog(ctx).Debugf("compaction-log %v %v", compactionLogBlobMetadata.BlobID, compactionLogBlobMetadata.Timestamp)
|
||||
|
||||
if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil {
|
||||
if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata, maxEventualConsistencySettleTime); err != nil {
|
||||
return errors.Wrap(err, "error deleting old index blobs")
|
||||
}
|
||||
|
||||
@@ -271,7 +270,7 @@ func (m *indexBlobManagerImpl) getCleanupEntries(ctx context.Context, latestServ
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata) error {
|
||||
func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
|
||||
allCompactionLogBlobs, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error listing compaction log blobs")
|
||||
@@ -279,7 +278,7 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl
|
||||
|
||||
// look for server-assigned timestamp of the compaction log entry we just wrote as a reference.
|
||||
// we're assuming server-generated timestamps are somewhat reasonable and time is moving
|
||||
compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-m.maxEventualConsistencySettleTime)
|
||||
compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-maxEventualConsistencySettleTime)
|
||||
compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff)
|
||||
|
||||
log(ctx).Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff)
|
||||
@@ -289,7 +288,7 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl
|
||||
return errors.Wrap(err, "unable to get compaction log entries")
|
||||
}
|
||||
|
||||
indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries)
|
||||
indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries, maxEventualConsistencySettleTime)
|
||||
|
||||
// note that we must always delete index blobs first before compaction logs
|
||||
// otherwise we may inadvertedly resurrect an index blob that should have been removed.
|
||||
@@ -306,13 +305,13 @@ func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob bl
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry) []blob.ID {
|
||||
func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID {
|
||||
tmp := map[blob.ID]bool{}
|
||||
|
||||
for _, cl := range entries {
|
||||
// are the input index blobs in this compaction eligble for deletion?
|
||||
if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < m.maxEventualConsistencySettleTime {
|
||||
log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, m.maxEventualConsistencySettleTime)
|
||||
if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < maxEventualConsistencySettleTime {
|
||||
log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, maxEventualConsistencySettleTime)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -343,9 +342,9 @@ func (m *indexBlobManagerImpl) findCompactionLogBlobsToDelayCleanup(ctx context.
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry) (compactionLogs, cleanupBlobs []blob.ID) {
|
||||
func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) {
|
||||
for k, e := range entries {
|
||||
if e.age > m.maxEventualConsistencySettleTime {
|
||||
if e.age >= maxEventualConsistencySettleTime {
|
||||
compactionLogs = append(compactionLogs, e.BlobIDs...)
|
||||
cleanupBlobs = append(cleanupBlobs, k)
|
||||
}
|
||||
@@ -391,7 +390,7 @@ func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Contex
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *indexBlobManagerImpl) cleanup(ctx context.Context) error {
|
||||
func (m *indexBlobManagerImpl) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error {
|
||||
allCleanupBlobs, err := m.listCache.listBlobs(ctx, cleanupBlobPrefix)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error listing cleanup blobs")
|
||||
@@ -413,7 +412,7 @@ func (m *indexBlobManagerImpl) cleanup(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// pick cleanup entries to delete that are old enough
|
||||
compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries)
|
||||
compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries, maxEventualConsistencySettleTime)
|
||||
|
||||
if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogsToDelete); err != nil {
|
||||
return errors.Wrap(err, "unable to delete cleanup blobs")
|
||||
|
||||
@@ -496,7 +496,7 @@ func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) e
|
||||
inputs = append(inputs, bi.Metadata)
|
||||
}
|
||||
|
||||
if err := m.registerCompaction(ctx, inputs, outputs); err != nil {
|
||||
if err := m.registerCompaction(ctx, inputs, outputs, testEventualConsistencySettleTime); err != nil {
|
||||
return errors.Wrap(err, "compaction error")
|
||||
}
|
||||
|
||||
@@ -702,7 +702,7 @@ func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs []
|
||||
|
||||
t.Logf("compacting %v to %v", inputs, outputs)
|
||||
|
||||
err := m.registerCompaction(testlogging.Context(t), inputs, outputs)
|
||||
err := m.registerCompaction(testlogging.Context(t), inputs, outputs, testEventualConsistencySettleTime)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to write index blob: %v", err)
|
||||
}
|
||||
@@ -773,12 +773,11 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f
|
||||
blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, localTimeNow),
|
||||
localTimeNow,
|
||||
},
|
||||
indexBlobCache: passthroughContentCache{st},
|
||||
encryptor: enc,
|
||||
hasher: hf,
|
||||
listCache: lc,
|
||||
timeNow: localTimeNow,
|
||||
maxEventualConsistencySettleTime: testIndexBlobDeleteAge,
|
||||
indexBlobCache: passthroughContentCache{st},
|
||||
encryptor: enc,
|
||||
hasher: hf,
|
||||
listCache: lc,
|
||||
timeNow: localTimeNow,
|
||||
}
|
||||
|
||||
return m
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -14,38 +13,20 @@
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
)
|
||||
|
||||
// defaultBlobGCMinAge is a default MinAge for blob GC.
|
||||
// We're setting it to 2 hours to accommodate the fact that every repository
|
||||
// will periodically flush its indexes more frequently than 1/hour.
|
||||
const defaultBlobGCMinAge = 2 * time.Hour
|
||||
|
||||
// treat sessions that have not been updated in 4 days as expired.
|
||||
const defaultSessionExpirationAge = 4 * 24 * time.Hour
|
||||
|
||||
// DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm.
|
||||
type DeleteUnreferencedBlobsOptions struct {
|
||||
Parallel int
|
||||
Prefix blob.ID
|
||||
MinAge time.Duration
|
||||
SessionExpirationAge time.Duration // treat sessions that have not been written for more than X time as expired
|
||||
DryRun bool
|
||||
Parallel int
|
||||
Prefix blob.ID
|
||||
DryRun bool
|
||||
}
|
||||
|
||||
// DeleteUnreferencedBlobs deletes old blobs that are no longer referenced by index entries.
|
||||
// nolint:gocyclo
|
||||
func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedBlobsOptions) (int, error) {
|
||||
func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedBlobsOptions, safety SafetyParameters) (int, error) {
|
||||
if opt.Parallel == 0 {
|
||||
opt.Parallel = 16
|
||||
}
|
||||
|
||||
if opt.MinAge == 0 {
|
||||
opt.MinAge = defaultBlobGCMinAge
|
||||
}
|
||||
|
||||
if opt.SessionExpirationAge == 0 {
|
||||
opt.SessionExpirationAge = defaultSessionExpirationAge
|
||||
}
|
||||
|
||||
const deleteQueueSize = 100
|
||||
|
||||
var unreferenced, deleted stats.CountSum
|
||||
@@ -91,14 +72,14 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
// iterate all pack blobs + session blobs and keep ones that are too young or
|
||||
// belong to alive sessions.
|
||||
if err := rep.ContentManager().IterateUnreferencedBlobs(ctx, prefixes, opt.Parallel, func(bm blob.Metadata) error {
|
||||
if age := rep.Time().Sub(bm.Timestamp); age < opt.MinAge {
|
||||
log(ctx).Debugf(" preserving %v because it's too new (age: %v bm: %v)", bm.BlobID, bm.Timestamp, age)
|
||||
if age := rep.Time().Sub(bm.Timestamp); age < safety.BlobDeleteMinAge {
|
||||
log(ctx).Debugf(" preserving %v because it's too new (age: %v<%v)", bm.BlobID, age, safety.BlobDeleteMinAge)
|
||||
return nil
|
||||
}
|
||||
|
||||
sid := content.SessionIDFromBlobID(bm.BlobID)
|
||||
if s, ok := activeSessions[sid]; ok {
|
||||
if age := rep.Time().Sub(s.CheckpointTime); age < opt.SessionExpirationAge {
|
||||
if age := rep.Time().Sub(s.CheckpointTime); age < safety.SessionExpirationAge {
|
||||
log(ctx).Debugf(" preserving %v because it's part of an active session (%v)", bm.BlobID, sid)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -68,19 +68,21 @@ func TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID2)
|
||||
|
||||
// new blobs not will be deleted because of minimum age requirement
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{
|
||||
MinAge: 1 * time.Hour,
|
||||
}); err != nil {
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyFull); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID1)
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID2)
|
||||
|
||||
// mixed safety parameters
|
||||
safetyFastDeleteLongSessionExpiration := SafetyParameters{
|
||||
BlobDeleteMinAge: 1,
|
||||
SessionExpirationAge: 4 * 24 * time.Hour,
|
||||
}
|
||||
|
||||
// new blobs will be deleted
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{
|
||||
MinAge: 1,
|
||||
}); err != nil {
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyNone); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -105,9 +107,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
CheckpointTime: ta.NowFunc()(),
|
||||
})
|
||||
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{
|
||||
MinAge: 1,
|
||||
}); err != nil {
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -120,9 +120,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
// now finish session 2
|
||||
env.RepositoryWriter.BlobStorage().DeleteBlob(ctx, session2Marker)
|
||||
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{
|
||||
MinAge: 1,
|
||||
}); err != nil {
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -135,9 +133,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
// now move time into the future making session 1 timed out
|
||||
ta.Advance(7 * 24 * time.Hour)
|
||||
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{
|
||||
MinAge: 1,
|
||||
}); err != nil {
|
||||
if _, err = DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, DeleteUnreferencedBlobsOptions{}, SafetyFull); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -155,7 +151,7 @@ func TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(blobsBefore, blobsAfter); diff != "" {
|
||||
t.Errorf("unexpected diff: %v", diff)
|
||||
t.Fatalf("unexpected diff: %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -15,14 +14,11 @@
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
)
|
||||
|
||||
const defaultRewriteContentsMinAge = 2 * time.Hour
|
||||
|
||||
const parallelContentRewritesCPUMultiplier = 2
|
||||
|
||||
// RewriteContentsOptions provides options for RewriteContents.
|
||||
type RewriteContentsOptions struct {
|
||||
Parallel int
|
||||
MinAge time.Duration
|
||||
ContentIDs []content.ID
|
||||
ContentIDRange content.IDRange
|
||||
PackPrefix blob.ID
|
||||
@@ -40,15 +36,11 @@ type contentInfoOrError struct {
|
||||
|
||||
// RewriteContents rewrites contents according to provided criteria and creates new
|
||||
// blobs and index entries to point at the.
|
||||
func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions) error {
|
||||
func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *RewriteContentsOptions, safety SafetyParameters) error {
|
||||
if opt == nil {
|
||||
return errors.Errorf("missing options")
|
||||
}
|
||||
|
||||
if opt.MinAge == 0 {
|
||||
opt.MinAge = defaultRewriteContentsMinAge
|
||||
}
|
||||
|
||||
if opt.ShortPacks {
|
||||
log(ctx).Infof("Rewriting contents from short packs...")
|
||||
} else {
|
||||
@@ -90,7 +82,7 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
|
||||
}
|
||||
|
||||
age := rep.Time().Sub(c.Timestamp())
|
||||
if age < opt.MinAge {
|
||||
if age < safety.RewriteMinAge {
|
||||
log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.ID, c.Length, c.PackBlobID, optDeleted, age)
|
||||
continue
|
||||
}
|
||||
@@ -194,6 +186,16 @@ func(pi content.PackInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
blobMeta, err := rep.BlobReader().GetMetadata(ctx, pi.PackID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unable to get blob metadata %v", pi.PackID)
|
||||
}
|
||||
|
||||
// pack is short but the content consumes most of it, ignore.
|
||||
if pi.TotalSize > shortPackThresholdPercent*blobMeta.Length/100 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, ci := range pi.ContentInfos {
|
||||
ch <- contentInfoOrError{Info: ci}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,12 @@
|
||||
)
|
||||
|
||||
// DropDeletedContents rewrites indexes while dropping deleted contents above certain age.
|
||||
func DropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time) error {
|
||||
func DropDeletedContents(ctx context.Context, rep repo.DirectRepositoryWriter, dropDeletedBefore time.Time, safety SafetyParameters) error {
|
||||
log(ctx).Infof("Dropping contents deleted before %v", dropDeletedBefore)
|
||||
|
||||
return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{
|
||||
AllIndexes: true,
|
||||
DropDeletedBefore: dropDeletedBefore,
|
||||
AllIndexes: true,
|
||||
DropDeletedBefore: dropDeletedBefore,
|
||||
DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -10,10 +10,11 @@
|
||||
const maxSmallBlobsForIndexCompaction = 8
|
||||
|
||||
// IndexCompaction rewrites index blobs to reduce their count but does not drop any contents.
|
||||
func IndexCompaction(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
func IndexCompaction(ctx context.Context, rep repo.DirectRepositoryWriter, safety SafetyParameters) error {
|
||||
log(ctx).Infof("Compacting indexes...")
|
||||
|
||||
return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{
|
||||
MaxSmallBlobs: maxSmallBlobsForIndexCompaction,
|
||||
MaxSmallBlobs: maxSmallBlobsForIndexCompaction,
|
||||
DisableEventualConsistencySafety: safety.DisableEventualConsistencySafety,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -20,15 +20,6 @@ type Params struct {
|
||||
|
||||
QuickCycle CycleParams `json:"quick"`
|
||||
FullCycle CycleParams `json:"full"`
|
||||
|
||||
SnapshotGC SnapshotGCParams `json:"snapshotGC"`
|
||||
}
|
||||
|
||||
// SnapshotGCParams contains parameters for Snapshot Garbage Collection
|
||||
// NOTE: Due to layering, the implementation of Snapshot GC is outside of repository package
|
||||
// but for simplicity we store it here.
|
||||
type SnapshotGCParams struct {
|
||||
MinContentAge time.Duration `json:"minAge"`
|
||||
}
|
||||
|
||||
// DefaultParams represents default values of maintenance parameters.
|
||||
@@ -42,9 +33,6 @@ func DefaultParams() Params {
|
||||
Enabled: true,
|
||||
Interval: 1 * time.Hour,
|
||||
},
|
||||
SnapshotGC: SnapshotGCParams{
|
||||
MinContentAge: 24 * time.Hour, //nolint:gomnd
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,16 +14,6 @@
|
||||
"github.com/kopia/kopia/repo/logging"
|
||||
)
|
||||
|
||||
// safetyMarginBetweenSnapshotGC is the minimal amount of time that must pass between snapshot
|
||||
// GC cycles to allow all in-flight snapshots during earlier GC to be flushed to be flushed and
|
||||
// visible to a following GC. The uploader will automatically create a checkpoint every 45 minutes,
|
||||
// so ~1 hour should be enough but we're setting this to a higher conservative value for extra safety.
|
||||
const (
|
||||
safetyMarginBetweenSnapshotGC = 4 * time.Hour
|
||||
|
||||
extraSafetyMarginBeforeDroppingContentFromIndex = -1 * time.Hour
|
||||
)
|
||||
|
||||
var log = logging.GetContextLoggerFunc("maintenance")
|
||||
|
||||
// Mode describes the mode of maintenance to perfor.
|
||||
@@ -186,20 +176,20 @@ func RunExclusive(ctx context.Context, rep repo.DirectRepositoryWriter, mode Mod
|
||||
}
|
||||
|
||||
// Run performs maintenance activities for a repository.
|
||||
func Run(ctx context.Context, runParams RunParameters) error {
|
||||
func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
switch runParams.Mode {
|
||||
case ModeQuick:
|
||||
return runQuickMaintenance(ctx, runParams)
|
||||
return runQuickMaintenance(ctx, runParams, safety)
|
||||
|
||||
case ModeFull:
|
||||
return runFullMaintenance(ctx, runParams)
|
||||
return runFullMaintenance(ctx, runParams, safety)
|
||||
|
||||
default:
|
||||
return errors.Errorf("unknown mode %q", runParams.Mode)
|
||||
}
|
||||
}
|
||||
|
||||
func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
// find 'q' packs that are less than 80% full and rewrite contents in them into
|
||||
// new consolidated packs, orphaning old packs in the process.
|
||||
if err := ReportRun(ctx, runParams.rep, "quick-rewrite-contents", func() error {
|
||||
@@ -207,7 +197,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
ContentIDRange: content.AllPrefixedIDs,
|
||||
PackPrefix: content.PackBlobIDPrefixSpecial,
|
||||
ShortPacks: true,
|
||||
})
|
||||
}, safety)
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error rewriting metadata contents")
|
||||
}
|
||||
@@ -216,7 +206,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
if err := ReportRun(ctx, runParams.rep, "quick-delete-blobs", func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
|
||||
Prefix: content.PackBlobIDPrefixSpecial,
|
||||
})
|
||||
}, safety)
|
||||
return err
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced metadata blobs")
|
||||
@@ -224,7 +214,7 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
|
||||
// consolidate many smaller indexes into fewer larger ones.
|
||||
if err := ReportRun(ctx, runParams.rep, "index-compaction", func() error {
|
||||
return IndexCompaction(ctx, runParams.rep)
|
||||
return IndexCompaction(ctx, runParams.rep, safety)
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error performing index compaction")
|
||||
}
|
||||
@@ -232,19 +222,27 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func runFullMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
s, err := GetSchedule(ctx, runParams.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get schedule")
|
||||
func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
var safeDropTime time.Time
|
||||
|
||||
if safety.RequireTwoGCCycles {
|
||||
s, err := GetSchedule(ctx, runParams.rep)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get schedule")
|
||||
}
|
||||
|
||||
safeDropTime = findSafeDropTime(s.Runs["snapshot-gc"], safety)
|
||||
} else {
|
||||
safeDropTime = runParams.rep.Time()
|
||||
}
|
||||
|
||||
if safeDropTime := findSafeDropTime(s.Runs["snapshot-gc"]); !safeDropTime.IsZero() {
|
||||
if !safeDropTime.IsZero() {
|
||||
log(ctx).Infof("Found safe time to drop indexes: %v", safeDropTime)
|
||||
|
||||
// rewrite indexes by dropping content entries that have been marked
|
||||
// as deleted for a long time
|
||||
if err := ReportRun(ctx, runParams.rep, "full-drop-deleted-content", func() error {
|
||||
return DropDeletedContents(ctx, runParams.rep, safeDropTime)
|
||||
return DropDeletedContents(ctx, runParams.rep, safeDropTime, safety)
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error dropping deleted contents")
|
||||
}
|
||||
@@ -258,14 +256,14 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
return RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
|
||||
ContentIDRange: content.AllIDs,
|
||||
ShortPacks: true,
|
||||
})
|
||||
}, safety)
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error rewriting contents in short packs")
|
||||
}
|
||||
|
||||
// delete orphaned packs after some time.
|
||||
if err := ReportRun(ctx, runParams.rep, "full-delete-blobs", func() error {
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{})
|
||||
_, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}, safety)
|
||||
return err
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced blobs")
|
||||
@@ -278,7 +276,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
// deleted before that time, because at least two successful GC cycles have completed
|
||||
// and minimum required time between the GCs has passed.
|
||||
//
|
||||
// The worst possible case we needf to handle is:
|
||||
// The worst possible case we need to handle is:
|
||||
//
|
||||
// Step #1 - race between GC and snapshot creation:
|
||||
//
|
||||
@@ -297,8 +295,8 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters) error {
|
||||
// After Step 2 completes, we know for sure that all contents deleted before Step #1 has started
|
||||
// are safe to drop from the index because Step #2 has fixed them, as long as all snapshots that
|
||||
// were racing with snapshot GC in step #1 have flushed pending writes, hence the
|
||||
// safetyMarginBetweenSnapshotGC.
|
||||
func findSafeDropTime(runs []RunInfo) time.Time {
|
||||
// safety.MarginBetweenSnapshotGC.
|
||||
func findSafeDropTime(runs []RunInfo, safety SafetyParameters) time.Time {
|
||||
var successfulRuns []RunInfo
|
||||
|
||||
for _, r := range runs {
|
||||
@@ -319,8 +317,8 @@ func findSafeDropTime(runs []RunInfo) time.Time {
|
||||
// Look for previous successful run such that the time between GCs exceeds the safety margin.
|
||||
for _, r := range successfulRuns[1:] {
|
||||
diff := -r.End.Sub(successfulRuns[0].Start)
|
||||
if diff > safetyMarginBetweenSnapshotGC {
|
||||
return r.Start.Add(extraSafetyMarginBeforeDroppingContentFromIndex)
|
||||
if diff > safety.MarginBetweenSnapshotGC {
|
||||
return r.Start.Add(-safety.DropContentFromIndexExtraMargin)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ func TestFindSafeDropTime(t *testing.T) {
|
||||
{Start: t0700, End: t0715, Success: true},
|
||||
{Start: t1300, End: t1315, Success: true},
|
||||
},
|
||||
wantTime: t0700.Add(extraSafetyMarginBeforeDroppingContentFromIndex),
|
||||
wantTime: t0700.Add(-SafetyFull.DropContentFromIndexExtraMargin),
|
||||
},
|
||||
// three runs spaced enough
|
||||
{
|
||||
@@ -54,7 +54,7 @@ func TestFindSafeDropTime(t *testing.T) {
|
||||
{Start: t0900, End: t0915, Success: true},
|
||||
{Start: t1300, End: t1315, Success: true},
|
||||
},
|
||||
wantTime: t0700.Add(extraSafetyMarginBeforeDroppingContentFromIndex),
|
||||
wantTime: t0700.Add(-SafetyFull.DropContentFromIndexExtraMargin),
|
||||
},
|
||||
// three runs spaced enough, not successful
|
||||
{
|
||||
@@ -68,7 +68,7 @@ func TestFindSafeDropTime(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
if got, want := findSafeDropTime(tc.runs), tc.wantTime; !got.Equal(want) {
|
||||
if got, want := findSafeDropTime(tc.runs, SafetyFull), tc.wantTime; !got.Equal(want) {
|
||||
t.Errorf("invalid safe drop time for %v: %v, want %v", tc.runs, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
62
repo/maintenance/maintenance_safety.go
Normal file
62
repo/maintenance/maintenance_safety.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package maintenance
|
||||
|
||||
import "time"
|
||||
|
||||
// SafetyParameters specifies timing parameters that affect safety of maintenance.
|
||||
type SafetyParameters struct {
|
||||
// Do not rewrite contents younger than this age.
|
||||
RewriteMinAge time.Duration
|
||||
|
||||
// Snapshot GC: MinContentAgeSubjectToGC is the minimum age of content to be subject to garbage collection.
|
||||
MinContentAgeSubjectToGC time.Duration
|
||||
|
||||
// MarginBetweenSnapshotGC is the minimal amount of time that must pass between snapshot
|
||||
// GC cycles to allow all in-flight snapshots during earlier GC to be flushed and
|
||||
// visible to a following GC. The uploader will automatically create a checkpoint every 45 minutes,
|
||||
// so ~1 hour should be enough but we're setting this to a higher conservative value for extra safety.
|
||||
MarginBetweenSnapshotGC time.Duration
|
||||
|
||||
// RequireTwoGCCycles indicates that two GC cycles are required.
|
||||
RequireTwoGCCycles bool
|
||||
|
||||
// DisableEventualConsistencySafety disables wait time to allow settling of eventually-consistent writes in blob stores.
|
||||
DisableEventualConsistencySafety bool
|
||||
|
||||
// DropContentFromIndexExtraMargin is the amount of margin time before dropping deleted contents from indices.
|
||||
DropContentFromIndexExtraMargin time.Duration
|
||||
|
||||
// Blob GC: Delete unused blobs above this age.
|
||||
BlobDeleteMinAge time.Duration
|
||||
|
||||
// Blob GC: Drop incomplete session blobs above this age.
|
||||
SessionExpirationAge time.Duration
|
||||
}
|
||||
|
||||
// Supported safety levels.
|
||||
var (
|
||||
// SafetyNone has safety parameters which allow full garbage collection without unnecessary
|
||||
// delays, but it is safe only if no other kopia clients are running and storage backend is
|
||||
// strongly consistent.
|
||||
SafetyNone = SafetyParameters{
|
||||
BlobDeleteMinAge: 0,
|
||||
DropContentFromIndexExtraMargin: 0,
|
||||
MarginBetweenSnapshotGC: 0,
|
||||
MinContentAgeSubjectToGC: 0,
|
||||
RewriteMinAge: 0,
|
||||
SessionExpirationAge: 0,
|
||||
RequireTwoGCCycles: false,
|
||||
DisableEventualConsistencySafety: true,
|
||||
}
|
||||
|
||||
// SafetyFull has default safety parameters which allow safe GC concurrent with snapshotting
|
||||
// by other Kopia clients.
|
||||
SafetyFull = SafetyParameters{
|
||||
BlobDeleteMinAge: 2 * time.Hour, //nolint:gomnd
|
||||
DropContentFromIndexExtraMargin: time.Hour,
|
||||
MarginBetweenSnapshotGC: 4 * time.Hour, //nolint:gomnd
|
||||
MinContentAgeSubjectToGC: 24 * time.Hour, //nolint:gomnd
|
||||
RewriteMinAge: 2 * time.Hour, //nolint:gomnd
|
||||
SessionExpirationAge: 96 * time.Hour, //nolint:gomnd
|
||||
RequireTwoGCCycles: true,
|
||||
}
|
||||
)
|
||||
@@ -74,17 +74,17 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma
|
||||
}
|
||||
|
||||
// Run performs garbage collection on all the snapshots in the repository.
|
||||
func Run(ctx context.Context, rep repo.DirectRepositoryWriter, params maintenance.SnapshotGCParams, gcDelete bool) (Stats, error) {
|
||||
func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters) (Stats, error) {
|
||||
var st Stats
|
||||
|
||||
err := maintenance.ReportRun(ctx, rep, "snapshot-gc", func() error {
|
||||
return runInternal(ctx, rep, params, gcDelete, &st)
|
||||
return runInternal(ctx, rep, gcDelete, safety, &st)
|
||||
})
|
||||
|
||||
return st, errors.Wrap(err, "error running snapshot gc")
|
||||
}
|
||||
|
||||
func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, params maintenance.SnapshotGCParams, gcDelete bool, st *Stats) error {
|
||||
func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, st *Stats) error {
|
||||
var (
|
||||
used sync.Map
|
||||
|
||||
@@ -117,7 +117,7 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, params ma
|
||||
return nil
|
||||
}
|
||||
|
||||
if rep.Time().Sub(ci.Timestamp()) < params.MinContentAge {
|
||||
if rep.Time().Sub(ci.Timestamp()) < safety.MinContentAgeSubjectToGC {
|
||||
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp())
|
||||
tooRecent.Add(int64(ci.Length))
|
||||
return nil
|
||||
@@ -158,5 +158,5 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, params ma
|
||||
return errors.Errorf("Not deleting because '--delete' flag was not set")
|
||||
}
|
||||
|
||||
return nil
|
||||
return errors.Wrap(rep.Flush(ctx), "flush error")
|
||||
}
|
||||
|
||||
@@ -12,16 +12,16 @@
|
||||
)
|
||||
|
||||
// Run runs the complete snapshot and repository maintenance.
|
||||
func Run(ctx context.Context, dr repo.DirectRepositoryWriter, mode maintenance.Mode, force bool) error {
|
||||
func Run(ctx context.Context, dr repo.DirectRepositoryWriter, mode maintenance.Mode, force bool, safety maintenance.SafetyParameters) error {
|
||||
return maintenance.RunExclusive(ctx, dr, mode, force,
|
||||
func(runParams maintenance.RunParameters) error {
|
||||
// run snapshot GC before full maintenance
|
||||
if runParams.Mode == maintenance.ModeFull {
|
||||
if _, err := snapshotgc.Run(ctx, dr, runParams.Params.SnapshotGC, true); err != nil {
|
||||
if _, err := snapshotgc.Run(ctx, dr, true, safety); err != nil {
|
||||
return errors.Wrap(err, "snapshot GC failure")
|
||||
}
|
||||
}
|
||||
|
||||
return maintenance.Run(ctx, runParams)
|
||||
return maintenance.Run(ctx, runParams, safety)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -55,10 +55,12 @@ func TestSnapshotGCSimple(t *testing.T) {
|
||||
|
||||
mustFlush(t, th.RepositoryWriter)
|
||||
|
||||
// Advance time to force GC to mark as deleted the contents from the previous snapshot
|
||||
th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour)
|
||||
safety := maintenance.SafetyFull
|
||||
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true)
|
||||
// Advance time to force GC to mark as deleted the contents from the previous snapshot
|
||||
th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour)
|
||||
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull)
|
||||
require.NoError(t, err)
|
||||
|
||||
mustFlush(t, th.RepositoryWriter)
|
||||
@@ -109,8 +111,10 @@ func TestMaintenanceReuseDirManifest(t *testing.T) {
|
||||
|
||||
mustFlush(t, th.RepositoryWriter)
|
||||
|
||||
safety := maintenance.SafetyFull
|
||||
|
||||
// Advance time to force GC to mark as deleted the contents from the previous snapshot
|
||||
th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour)
|
||||
th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour)
|
||||
|
||||
r2 := th.openAnother(t)
|
||||
|
||||
@@ -120,7 +124,7 @@ func TestMaintenanceReuseDirManifest(t *testing.T) {
|
||||
// interleaving snapshot and maintenance and delaying flushing as well to
|
||||
// create dangling references to contents that were in the previously
|
||||
// deleted snapshot and that are reused in this new snapshot.
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true)
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, maintenance.SafetyFull)
|
||||
require.NoError(t, err)
|
||||
|
||||
info, err := r2.(repo.DirectRepository).ContentReader().ContentInfo(ctx, content.ID(s2.RootObjectID()))
|
||||
@@ -145,8 +149,8 @@ func TestMaintenanceReuseDirManifest(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Run maintenance again
|
||||
th.fakeTime.Advance(maintenance.DefaultParams().SnapshotGC.MinContentAge + time.Hour)
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true)
|
||||
th.fakeTime.Advance(safety.MinContentAgeSubjectToGC + time.Hour)
|
||||
err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, safety)
|
||||
require.NoError(t, err)
|
||||
mustFlush(t, th.RepositoryWriter)
|
||||
|
||||
|
||||
56
tests/end_to_end_test/maintenance_test.go
Normal file
56
tests/end_to_end_test/maintenance_test.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package endtoend_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/tests/testenv"
|
||||
)
|
||||
|
||||
func TestFullMaintenance(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
e := testenv.NewCLITest(t)
|
||||
|
||||
e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir)
|
||||
defer e.RunAndExpectSuccess(t, "repo", "disconnect")
|
||||
|
||||
var snap snapshot.Manifest
|
||||
|
||||
// after creation we'll have kopia.repository, 1 index + 1 pack blob
|
||||
if got, want := e.RunAndExpectSuccess(t, "blob", "list"), 3; len(got) != want {
|
||||
t.Fatalf("unexpected number of initial blobs: %v, want %v", got, want)
|
||||
}
|
||||
|
||||
mustParseJSONLines(t, e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1, "--json"), &snap)
|
||||
e.RunAndExpectSuccess(t, "snapshot", "delete", string(snap.ID), "--delete")
|
||||
|
||||
e.RunAndVerifyOutputLineCount(t, 0, "snapshot", "list")
|
||||
|
||||
originalBlobCount := len(e.RunAndExpectSuccess(t, "blob", "list"))
|
||||
|
||||
e.RunAndVerifyOutputLineCount(t, 0, "maintenance", "run", "--full")
|
||||
|
||||
if got := len(e.RunAndExpectSuccess(t, "blob", "list")); got != originalBlobCount {
|
||||
t.Fatalf("full maintenance is not expected to change any blobs due to safety margins (got %v, was %v)", got, originalBlobCount)
|
||||
}
|
||||
|
||||
// now rerun with --safety=none
|
||||
e.RunAndExpectSuccess(t, "maintenance", "run", "--full", "--safety=none")
|
||||
|
||||
if got := len(e.RunAndExpectSuccess(t, "blob", "list")); got >= originalBlobCount {
|
||||
t.Fatalf("maintenance did not remove blobs: %v, had %v", got, originalBlobCount)
|
||||
}
|
||||
|
||||
// we're expecting to have 5 blobs:
|
||||
// - kopia.maintenance
|
||||
// - kopia.repository
|
||||
// - 2 index blobs
|
||||
// - 1 q blob
|
||||
|
||||
const blobCountAfterFullWipeout = 5
|
||||
|
||||
if got, want := e.RunAndExpectSuccess(t, "blob", "list"), blobCountAfterFullWipeout; len(got) != want {
|
||||
t.Fatalf("maintenance left unwanted blobs: %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
@@ -223,7 +223,7 @@ func TestSnapshotDeleteRestore(t *testing.T) {
|
||||
e.RunAndExpectFailure(t, "snapshot", "delete", snapID, "--delete")
|
||||
|
||||
// garbage-collect to clean up the root object.
|
||||
e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--min-age", "0s")
|
||||
e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--safety=none")
|
||||
|
||||
// Run a restore on the deleted snapshot's root ID. The root should be
|
||||
// marked as deleted but still recoverable
|
||||
|
||||
@@ -52,9 +52,14 @@ func TestSnapshotGC(t *testing.T) {
|
||||
// run verification
|
||||
e.RunAndExpectSuccess(t, "snapshot", "verify")
|
||||
|
||||
// garbage-collect in dry run mode
|
||||
// garbage-collect in dry run mode - this will not fail because of default safety level
|
||||
// which only looks at contents above certain age.
|
||||
e.RunAndExpectSuccess(t, "snapshot", "gc")
|
||||
|
||||
// garbage-collect in dry run mode - this will fail because of --safety=none
|
||||
// makes contents subject to GC immediately but we're not specifying --delete flag.
|
||||
e.RunAndExpectFailure(t, "snapshot", "gc", "--safety=none")
|
||||
|
||||
// data block + directory block + manifest block + manifest block from manifest deletion
|
||||
var contentInfo []content.Info
|
||||
|
||||
@@ -74,7 +79,7 @@ func TestSnapshotGC(t *testing.T) {
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// garbage-collect for real, this time without age limit
|
||||
e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--min-age", "0s")
|
||||
e.RunAndExpectSuccess(t, "snapshot", "gc", "--delete", "--safety=none")
|
||||
|
||||
// two contents are deleted
|
||||
expectedContentCount -= 2
|
||||
|
||||
@@ -74,8 +74,12 @@ func NewCLITest(t *testing.T) *CLITest {
|
||||
|
||||
exe := os.Getenv("KOPIA_EXE")
|
||||
if exe == "" {
|
||||
// exe = "kopia"
|
||||
t.Skip()
|
||||
if os.Getenv("VSCODE_PID") != "" {
|
||||
// we're launched from VSCode, use system-installed kopia executable.
|
||||
exe = "kopia"
|
||||
} else {
|
||||
t.Skip()
|
||||
}
|
||||
}
|
||||
|
||||
// unset environment variables that disrupt tests when passed to subprocesses.
|
||||
|
||||
Reference in New Issue
Block a user