mirror of
https://github.com/kopia/kopia.git
synced 2026-05-09 23:33:22 -04:00
refactor(general): consistent use "pack" where possible (#4952)
In kopia, "blob" is a generic term to refer to either an object in an object storage provider, or a file in a file system storage provider. There are various types of blobs in a kopia repository. In kopia, the term "pack" is used to refer to specific types of blobs, namely 'p' & 'q' pack blobs, that store "content" data, as opposed to say, "index" blobs. This change attempts to use the term "pack" consistently in the functions and types used for pack deletion. Note that the corresponding task names, shown below, remain unchanged since these names are used in the persistent maintenance run metadata, and that is used to make decisions about the safety of the execution of those tasks. ``` TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs" TaskDeleteOrphanedBlobsFull = "full-delete-blobs" ```
This commit is contained in:
@@ -33,13 +33,13 @@ func (c *commandBlobGC) setup(svc appServices, parent commandParent) {
|
||||
func (c *commandBlobGC) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
c.svc.dangerousCommand()
|
||||
|
||||
opts := maintenance.DeleteUnreferencedBlobsOptions{
|
||||
opts := maintenance.DeleteUnreferencedPacksOptions{
|
||||
DryRun: c.delete != "yes",
|
||||
Parallel: c.parallel,
|
||||
Prefix: blob.ID(c.prefix),
|
||||
}
|
||||
|
||||
stats, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts, c.safety)
|
||||
stats, err := maintenance.DeleteUnreferencedPacks(ctx, rep, opts, c.safety)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced blobs")
|
||||
}
|
||||
|
||||
@@ -294,17 +294,17 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
|
||||
// and we'd never delete blobs orphaned by full rewrite.
|
||||
if hadRecentFullRewrite(s) {
|
||||
userLog(ctx).Debug("Had recent full rewrite - performing full blob deletion.")
|
||||
err = runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety)
|
||||
err = runTaskDeleteOrphanedPacksFull(ctx, runParams, s, safety)
|
||||
} else {
|
||||
userLog(ctx).Debug("Performing quick blob deletion.")
|
||||
err = runTaskDeleteOrphanedBlobsQuick(ctx, runParams, s, safety)
|
||||
err = runTaskDeleteOrphanedPacksQuick(ctx, runParams, s, safety)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced metadata blobs")
|
||||
}
|
||||
} else {
|
||||
notDeletingOrphanedBlobs(ctx, log, s, safety)
|
||||
notDeletingOrphanedPacks(ctx, log, s, safety)
|
||||
}
|
||||
|
||||
// consolidate many smaller indexes into fewer larger ones.
|
||||
@@ -324,8 +324,8 @@ func notRewritingContents(ctx context.Context, log *contentlog.Logger) {
|
||||
contentlog.Log(ctx, log, "Previous content rewrite has not been finalized yet, waiting until the next blob deletion.")
|
||||
}
|
||||
|
||||
func notDeletingOrphanedBlobs(ctx context.Context, log *contentlog.Logger, s *Schedule, safety SafetyParameters) {
|
||||
left := nextBlobDeleteTime(s, safety).Sub(clock.Now()).Truncate(time.Second)
|
||||
func notDeletingOrphanedPacks(ctx context.Context, log *contentlog.Logger, s *Schedule, safety SafetyParameters) {
|
||||
left := nextPackDeleteTime(s, safety).Sub(clock.Now()).Truncate(time.Second)
|
||||
|
||||
contentlog.Log1(ctx, log, "Skipping blob deletion because not enough time has passed yet", logparam.Duration("left", left))
|
||||
}
|
||||
@@ -467,18 +467,18 @@ func runTaskRewriteContentsFull(ctx context.Context, runParams RunParameters, s
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskDeleteOrphanedBlobsFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
func runTaskDeleteOrphanedPacksFull(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsFull, s, func() (maintenancestats.Kind, error) {
|
||||
return DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
|
||||
return DeleteUnreferencedPacks(ctx, runParams.rep, DeleteUnreferencedPacksOptions{
|
||||
NotAfterTime: runParams.MaintenanceStartTime,
|
||||
Parallel: runParams.Params.ListParallelism,
|
||||
}, safety)
|
||||
})
|
||||
}
|
||||
|
||||
func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
func runTaskDeleteOrphanedPacksQuick(ctx context.Context, runParams RunParameters, s *Schedule, safety SafetyParameters) error {
|
||||
return reportRunAndMaybeCheckContentIndex(ctx, runParams.rep, TaskDeleteOrphanedBlobsQuick, s, func() (maintenancestats.Kind, error) {
|
||||
return DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
|
||||
return DeleteUnreferencedPacks(ctx, runParams.rep, DeleteUnreferencedPacksOptions{
|
||||
NotAfterTime: runParams.MaintenanceStartTime,
|
||||
Prefix: content.PackBlobIDPrefixSpecial,
|
||||
Parallel: runParams.Params.ListParallelism,
|
||||
@@ -518,11 +518,11 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
|
||||
|
||||
if shouldDeleteOrphanedPacks(runParams.rep.Time(), s, safety) {
|
||||
// delete orphaned packs after some time.
|
||||
if err := runTaskDeleteOrphanedBlobsFull(ctx, runParams, s, safety); err != nil {
|
||||
if err := runTaskDeleteOrphanedPacksFull(ctx, runParams, s, safety); err != nil {
|
||||
return errors.Wrap(err, "error deleting unreferenced blobs")
|
||||
}
|
||||
} else {
|
||||
notDeletingOrphanedBlobs(ctx, log, s, safety)
|
||||
notDeletingOrphanedPacks(ctx, log, s, safety)
|
||||
}
|
||||
|
||||
// extend retention-time on supported storage.
|
||||
@@ -584,10 +584,10 @@ func shouldFullRewriteContents(s *Schedule, safety SafetyParameters) bool {
|
||||
// rewritten packs become orphaned immediately but if we don't wait before their deletion
|
||||
// clients who have old indexes cached may be trying to read pre-rewrite blobs.
|
||||
func shouldDeleteOrphanedPacks(now time.Time, s *Schedule, safety SafetyParameters) bool {
|
||||
return !now.Before(nextBlobDeleteTime(s, safety))
|
||||
return !now.Before(nextPackDeleteTime(s, safety))
|
||||
}
|
||||
|
||||
func nextBlobDeleteTime(s *Schedule, safety SafetyParameters) time.Time {
|
||||
func nextPackDeleteTime(s *Schedule, safety SafetyParameters) time.Time {
|
||||
latestContentRewriteEndTime := maxEndTime(s.Runs[TaskRewriteContentsFull], s.Runs[TaskRewriteContentsQuick])
|
||||
if latestContentRewriteEndTime.IsZero() {
|
||||
return time.Time{}
|
||||
|
||||
@@ -26,7 +26,7 @@ type SafetyParameters struct {
|
||||
DropContentFromIndexExtraMargin time.Duration
|
||||
|
||||
// Blob GC: Delete unused blobs above this age.
|
||||
BlobDeleteMinAge time.Duration
|
||||
PackDeleteMinAge time.Duration
|
||||
|
||||
// Blob GC: Drop incomplete session blobs above this age.
|
||||
SessionExpirationAge time.Duration
|
||||
@@ -43,7 +43,7 @@ type SafetyParameters struct {
|
||||
// delays, but it is safe only if no other kopia clients are running and storage backend is
|
||||
// strongly consistent.
|
||||
SafetyNone = SafetyParameters{
|
||||
BlobDeleteMinAge: 0,
|
||||
PackDeleteMinAge: 0,
|
||||
DropContentFromIndexExtraMargin: 0,
|
||||
MarginBetweenSnapshotGC: 0,
|
||||
MinContentAgeSubjectToGC: 0,
|
||||
@@ -56,7 +56,7 @@ type SafetyParameters struct {
|
||||
// SafetyFull has default safety parameters which allow safe GC concurrent with snapshotting
|
||||
// by other Kopia clients.
|
||||
SafetyFull = SafetyParameters{
|
||||
BlobDeleteMinAge: 24 * time.Hour, //nolint:mnd
|
||||
PackDeleteMinAge: 24 * time.Hour, //nolint:mnd
|
||||
DropContentFromIndexExtraMargin: time.Hour,
|
||||
MarginBetweenSnapshotGC: 4 * time.Hour, //nolint:mnd
|
||||
MinContentAgeSubjectToGC: 24 * time.Hour, //nolint:mnd
|
||||
|
||||
@@ -17,22 +17,22 @@
|
||||
"github.com/kopia/kopia/repo/maintenancestats"
|
||||
)
|
||||
|
||||
// DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm.
|
||||
type DeleteUnreferencedBlobsOptions struct {
|
||||
// DeleteUnreferencedPacksOptions provides option for pack garbage collection algorithm.
|
||||
type DeleteUnreferencedPacksOptions struct {
|
||||
Parallel int
|
||||
Prefix blob.ID
|
||||
DryRun bool
|
||||
NotAfterTime time.Time
|
||||
}
|
||||
|
||||
// DeleteUnreferencedBlobs deletes blobs that are unreferenced by index entries.
|
||||
// DeleteUnreferencedPacks deletes pack blobs that are unreferenced by index entries.
|
||||
//
|
||||
//nolint:gocyclo,funlen
|
||||
func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedBlobsOptions, safety SafetyParameters) (*maintenancestats.DeleteUnreferencedPacksStats, error) {
|
||||
func DeleteUnreferencedPacks(ctx context.Context, rep repo.DirectRepositoryWriter, opt DeleteUnreferencedPacksOptions, safety SafetyParameters) (*maintenancestats.DeleteUnreferencedPacksStats, error) {
|
||||
ctx = contentlog.WithParams(ctx,
|
||||
logparam.String("span:blob-gc", contentlog.RandomSpanID()))
|
||||
logparam.String("span:pack-gc", contentlog.RandomSpanID()))
|
||||
|
||||
log := rep.LogManager().NewLogger("maintenance-blob-gc")
|
||||
log := rep.LogManager().NewLogger("maintenance-pack-gc")
|
||||
|
||||
if opt.Parallel == 0 {
|
||||
opt.Parallel = 16
|
||||
@@ -47,17 +47,17 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
unused := make(chan blob.Metadata, deleteQueueSize)
|
||||
|
||||
if !opt.DryRun {
|
||||
// start goroutines to delete blobs as they come.
|
||||
// start goroutines to delete packs as they come.
|
||||
for range opt.Parallel {
|
||||
eg.Go(func() error {
|
||||
for bm := range unused {
|
||||
if err := rep.BlobStorage().DeleteBlob(ctx, bm.BlobID); err != nil {
|
||||
return errors.Wrapf(err, "unable to delete blob %q", bm.BlobID)
|
||||
return errors.Wrapf(err, "unable to delete pack blob %q", bm.BlobID)
|
||||
}
|
||||
|
||||
cnt, del := deleted.Add(bm.Length)
|
||||
if cnt%100 == 0 {
|
||||
contentlog.Log2(ctx, log, "deleted unreferenced blobs", logparam.UInt32("count", cnt), logparam.Int64("bytes", del))
|
||||
contentlog.Log2(ctx, log, "deleted unreferenced pack blobs", logparam.UInt32("count", cnt), logparam.Int64("bytes", del))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,8 +66,8 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
}
|
||||
}
|
||||
|
||||
// iterate unreferenced blobs and count them + optionally send to the channel to be deleted
|
||||
contentlog.Log(ctx, log, "Looking for unreferenced blobs...")
|
||||
// iterate unreferenced packs and count them + optionally send to the channel to be deleted
|
||||
contentlog.Log(ctx, log, "Looking for unreferenced pack blobs...")
|
||||
|
||||
var prefixes []blob.ID
|
||||
if p := opt.Prefix; p != "" {
|
||||
@@ -100,18 +100,18 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
retained.Add(bm.Length)
|
||||
|
||||
contentlog.Log3(ctx, log,
|
||||
"preserving blob - after cutoff time",
|
||||
"preserving pack - after cutoff time",
|
||||
blobparam.BlobID("blobID", bm.BlobID),
|
||||
logparam.Time("cutoffTime", cutoffTime),
|
||||
logparam.Time("timestamp", bm.Timestamp))
|
||||
return nil
|
||||
}
|
||||
|
||||
if age := cutoffTime.Sub(bm.Timestamp); age < safety.BlobDeleteMinAge {
|
||||
if age := cutoffTime.Sub(bm.Timestamp); age < safety.PackDeleteMinAge {
|
||||
retained.Add(bm.Length)
|
||||
|
||||
contentlog.Log2(ctx, log,
|
||||
"preserving blob - below min age",
|
||||
"preserving pack - below min age",
|
||||
blobparam.BlobID("blobID", bm.BlobID),
|
||||
logparam.Duration("age", age))
|
||||
return nil
|
||||
@@ -123,7 +123,7 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
retained.Add(bm.Length)
|
||||
|
||||
contentlog.Log2(ctx, log,
|
||||
"preserving blob - part of active session",
|
||||
"preserving pack - part of active session",
|
||||
blobparam.BlobID("blobID", bm.BlobID),
|
||||
logparam.String("sessionID", string(sid)))
|
||||
return nil
|
||||
@@ -138,7 +138,7 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "error looking for unreferenced blobs")
|
||||
return nil, errors.Wrap(err, "error looking for unreferenced pack blobs")
|
||||
}
|
||||
|
||||
close(unused)
|
||||
@@ -155,7 +155,7 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
DeletedTotalSize: 0,
|
||||
}
|
||||
|
||||
contentlog.Log1(ctx, log, "Found unreferenced blobs to delete", result)
|
||||
contentlog.Log1(ctx, log, "Found unreferenced pack blobs to delete", result)
|
||||
|
||||
// wait for all delete workers to finish.
|
||||
if err := eg.Wait(); err != nil {
|
||||
@@ -170,7 +170,7 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
result.DeletedPackCount = deletedCount
|
||||
result.DeletedTotalSize = deletedSize
|
||||
|
||||
contentlog.Log1(ctx, log, "Completed deleting unreferenced blobs", result)
|
||||
contentlog.Log1(ctx, log, "Completed deleting unreferenced pack blobs", result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -30,7 +30,7 @@
|
||||
|
||||
var testMasterKey = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
|
||||
|
||||
func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
func (s *formatSpecificTestSuite) TestDeleteUnreferencedPacks(t *testing.T) {
|
||||
// set up fake clock which is initially synchronized to wall clock time
|
||||
// and moved at the same speed but which can be moved forward.
|
||||
ta := faketime.NewClockTimeWithOffset(0)
|
||||
@@ -70,7 +70,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID2)
|
||||
|
||||
// new blobs not will be deleted because of minimum age requirement
|
||||
_, err = maintenance.DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedBlobsOptions{}, maintenance.SafetyFull)
|
||||
_, err = maintenance.DeleteUnreferencedPacks(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedPacksOptions{}, maintenance.SafetyFull)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobID1)
|
||||
@@ -78,12 +78,12 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
|
||||
// mixed safety parameters
|
||||
safetyFastDeleteLongSessionExpiration := maintenance.SafetyParameters{
|
||||
BlobDeleteMinAge: 1,
|
||||
PackDeleteMinAge: 1,
|
||||
SessionExpirationAge: 4 * 24 * time.Hour,
|
||||
}
|
||||
|
||||
// new blobs will be deleted
|
||||
_, err = maintenance.DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedBlobsOptions{}, maintenance.SafetyNone)
|
||||
_, err = maintenance.DeleteUnreferencedPacks(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedPacksOptions{}, maintenance.SafetyNone)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyBlobNotFound(t, env.RepositoryWriter.BlobStorage(), extraBlobID1)
|
||||
@@ -107,7 +107,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
CheckpointTime: ta.NowFunc()(),
|
||||
})
|
||||
|
||||
_, err = maintenance.DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration)
|
||||
_, err = maintenance.DeleteUnreferencedPacks(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedPacksOptions{}, safetyFastDeleteLongSessionExpiration)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobIDWithSession1)
|
||||
@@ -119,7 +119,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
// now finish session 2
|
||||
env.RepositoryWriter.BlobStorage().DeleteBlob(ctx, session2Marker)
|
||||
|
||||
_, err = maintenance.DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedBlobsOptions{}, safetyFastDeleteLongSessionExpiration)
|
||||
_, err = maintenance.DeleteUnreferencedPacks(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedPacksOptions{}, safetyFastDeleteLongSessionExpiration)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyBlobExists(t, env.RepositoryWriter.BlobStorage(), extraBlobIDWithSession1)
|
||||
@@ -131,7 +131,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
|
||||
// now move time into the future making session 1 timed out
|
||||
ta.Advance(7 * 24 * time.Hour)
|
||||
|
||||
_, err = maintenance.DeleteUnreferencedBlobs(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedBlobsOptions{}, maintenance.SafetyFull)
|
||||
_, err = maintenance.DeleteUnreferencedPacks(ctx, env.RepositoryWriter, maintenance.DeleteUnreferencedPacksOptions{}, maintenance.SafetyFull)
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyBlobNotFound(t, env.RepositoryWriter.BlobStorage(), extraBlobIDWithSession1)
|
||||
Reference in New Issue
Block a user