From 4b4628a21ee2c8f6a550466833e8233c48d2b7be Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 14 Apr 2020 00:11:41 -0700 Subject: [PATCH] Repository maintenance support (#411) Maintenance: support for automatic GC Moved maintenance algorithms from 'cli' to 'repo/maintenance' package Added support for CLI commands: kopia gc - performs quick maintenance kopia gc --full- perform full maintenance Full maintenance performs snapshot gc, but it's not safe to do this automatically possibly in parallel to snapshots being taken. This will be addressed ~0.7 timeframe. --- Makefile | 13 +- cli/app.go | 55 +++- cli/command_blob_gc.go | 84 +----- cli/command_content_rewrite.go | 171 +----------- cli/command_maintenance_info.go | 54 ++++ cli/command_maintenance_run.go | 37 +++ cli/command_maintenance_set.go | 134 ++++++++++ cli/command_repository_create.go | 8 + cli/command_snapshot_gc.go | 5 +- go.mod | 1 + go.sum | 4 + internal/server/api_repo.go | 10 + repo/maintenance/blob_gc.go | 109 ++++++++ repo/maintenance/content_rewrite.go | 208 +++++++++++++++ repo/maintenance/drop_deleted_contents.go | 35 +++ repo/maintenance/maintenance_params.go | 118 +++++++++ repo/maintenance/maintenance_run.go | 247 ++++++++++++++++++ repo/maintenance/maintenance_schedule.go | 49 ++++ repo/repository.go | 15 ++ snapshot/gc/gc.go | 8 +- snapshot/snapshotfs/upload.go | 2 +- tests/end_to_end_test/snapshot_delete_test.go | 27 +- 22 files changed, 1122 insertions(+), 272 deletions(-) create mode 100644 cli/command_maintenance_info.go create mode 100644 cli/command_maintenance_run.go create mode 100644 cli/command_maintenance_set.go create mode 100644 repo/maintenance/blob_gc.go create mode 100644 repo/maintenance/content_rewrite.go create mode 100644 repo/maintenance/drop_deleted_contents.go create mode 100644 repo/maintenance/maintenance_params.go create mode 100644 repo/maintenance/maintenance_run.go create mode 100644 repo/maintenance/maintenance_schedule.go diff --git a/Makefile b/Makefile index 38909da6a..268f1853d 100644 --- a/Makefile +++ b/Makefile @@ -191,18 +191,7 @@ ifneq ($(uname),Windows) # whitelisted internal packages. find repo/ -name '*.go' | xargs grep "^\t\"github.com/kopia/kopia" \ | grep -v -e github.com/kopia/kopia/repo \ - -e github.com/kopia/kopia/internal/retry \ - -e github.com/kopia/kopia/internal/buf \ - -e github.com/kopia/kopia/internal/throttle \ - -e github.com/kopia/kopia/internal/iocopy \ - -e github.com/kopia/kopia/internal/gather \ - -e github.com/kopia/kopia/internal/blobtesting \ - -e github.com/kopia/kopia/internal/repotesting \ - -e github.com/kopia/kopia/internal/testlogging \ - -e github.com/kopia/kopia/internal/hmac \ - -e github.com/kopia/kopia/internal/faketime \ - -e github.com/kopia/kopia/internal/testutil \ - -e github.com/kopia/kopia/internal/ctxutil \ + -e github.com/kopia/kopia/internal \ -e github.com/kopia/kopia/issues && exit 1 || echo repo/ layering ok endif diff --git a/cli/app.go b/cli/app.go index c9ef08b47..030f1cf6f 100644 --- a/cli/app.go +++ b/cli/app.go @@ -15,6 +15,8 @@ "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" + "github.com/kopia/kopia/repo/maintenance" + "github.com/kopia/kopia/snapshot/gc" ) var log = logging.GetContextLoggerFunc("kopia/cli") @@ -28,18 +30,21 @@ var ( app = kingpin.New("kopia", "Kopia - Online Backup").Author("http://kopia.github.io/") + enableAutomaticMaintenance = app.Flag("auto-maintenance", "Automatic maintenance").Default("true").Hidden().Bool() + _ = app.Flag("help-full", "Show help for all commands, including hidden").Action(helpFullAction).Bool() - repositoryCommands = app.Command("repository", "Commands to manipulate repository.").Alias("repo") - cacheCommands = app.Command("cache", "Commands to manipulate local cache").Hidden() - snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap") - policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies") - serverCommands = app.Command("server", "Commands to control HTTP API server.") - manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.").Hidden() - contentCommands = app.Command("content", "Commands to manipulate content in repository.").Alias("contents").Hidden() - blobCommands = app.Command("blob", "Commands to manipulate BLOBs.").Hidden() - indexCommands = app.Command("index", "Commands to manipulate content index.").Hidden() - benchmarkCommands = app.Command("benchmark", "Commands to test performance of algorithms.").Hidden() + repositoryCommands = app.Command("repository", "Commands to manipulate repository.").Alias("repo") + cacheCommands = app.Command("cache", "Commands to manipulate local cache").Hidden() + snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap") + policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies") + serverCommands = app.Command("server", "Commands to control HTTP API server.") + manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.").Hidden() + contentCommands = app.Command("content", "Commands to manipulate content in repository.").Alias("contents").Hidden() + blobCommands = app.Command("blob", "Commands to manipulate BLOBs.").Hidden() + indexCommands = app.Command("index", "Commands to manipulate content index.").Hidden() + benchmarkCommands = app.Command("benchmark", "Commands to test performance of algorithms.").Hidden() + maintenanceCommands = app.Command("maintenance", "Maintenance commands.").Hidden().Alias("gc") ) func helpFullAction(ctx *kingpin.ParseContext) error { @@ -139,16 +144,46 @@ func maybeRepositoryAction(act func(ctx context.Context, rep repo.Repository) er } err = act(ctx, rep) + + if err == nil && rep != nil { + err = maybeRunMaintenance(ctx, rep) + } + if rep != nil && required { if cerr := rep.Close(ctx); cerr != nil { return errors.Wrap(cerr, "unable to close repository") } } + return err }) } } +func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error { + if !*enableAutomaticMaintenance { + return nil + } + + dr, ok := rep.(*repo.DirectRepository) + if !ok { + return nil + } + + // run maintenance if scheduled. + return maintenance.RunExclusive(ctx, dr, maintenance.ModeAuto, + func(runParams maintenance.RunParameters) error { + // run snapshot GC before full maintenance + if runParams.Mode == maintenance.ModeFull { + if _, err := gc.Run(ctx, dr, runParams.Params.SnapshotGC, true); err != nil { + return errors.Wrap(err, "snapshot GC failure") + } + } + + return maintenance.Run(ctx, runParams) + }) +} + // App returns an instance of command-line application object. func App() *kingpin.Application { return app diff --git a/cli/command_blob_gc.go b/cli/command_blob_gc.go index 94766859f..9ea512a79 100644 --- a/cli/command_blob_gc.go +++ b/cli/command_blob_gc.go @@ -2,15 +2,10 @@ import ( "context" - "time" - "github.com/pkg/errors" - "golang.org/x/sync/errgroup" - - "github.com/kopia/kopia/internal/stats" - "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/maintenance" ) var ( @@ -22,81 +17,24 @@ ) func runBlobGarbageCollectCommand(ctx context.Context, rep *repo.DirectRepository) error { - const deleteQueueSize = 100 - - var unreferenced, deleted stats.CountSum - - var eg errgroup.Group - - unused := make(chan blob.Metadata, deleteQueueSize) - - if *blobGarbageCollectCommandDelete == "yes" { - // start goroutines to delete blobs as they come. - for i := 0; i < *blobGarbageCollectParallel; i++ { - eg.Go(func() error { - for bm := range unused { - if err := rep.Blobs.DeleteBlob(ctx, bm.BlobID); err != nil { - return errors.Wrapf(err, "unable to delete blob %q", bm.BlobID) - } - cnt, del := deleted.Add(bm.Length) - if cnt%100 == 0 { - printStderr(" deleted %v unreferenced blobs (%v)\n", cnt, units.BytesStringBase10(del)) - } - } - - return nil - }) - } + opts := maintenance.DeleteUnreferencedBlobsOptions{ + DryRun: *blobGarbageCollectCommandDelete != "yes", + MinAge: *blobGarbageCollectMinAge, + Parallel: *blobGarbageCollectParallel, + Prefix: blob.ID(*blobGarbageCollectPrefix), } - // iterate unreferenced blobs and count them + optionally send to the channel to be deleted - printStderr("Looking for unreferenced blobs...\n") + n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts) - var prefixes []blob.ID - if p := *blobGarbageCollectPrefix; p != "" { - prefixes = append(prefixes, blob.ID(p)) - } - - if err := rep.Content.IterateUnreferencedBlobs(ctx, prefixes, *blobGarbageCollectParallel, func(bm blob.Metadata) error { - if age := time.Since(bm.Timestamp); age < *blobGarbageCollectMinAge { - printStderr(" preserving %v because it's too new (age: %v)\n", bm.BlobID, age) - return nil - } - - unreferenced.Add(bm.Length) - - if *blobGarbageCollectCommandDelete == "yes" { - unused <- bm - } - - return nil - }); err != nil { - return errors.Wrap(err, "error looking for unreferenced blobs") - } - - close(unused) - - unreferencedCount, unreferencedSize := unreferenced.Approximate() - printStderr("Found %v blobs to delete (%v)\n", unreferencedCount, units.BytesStringBase10(unreferencedSize)) - - // wait for all delete workers to finish. - if err := eg.Wait(); err != nil { + if err != nil { return err } - if *blobGarbageCollectCommandDelete != "yes" { - if unreferencedCount > 0 { - printStderr("Pass --delete=yes to delete.\n") - } - - return nil + if opts.DryRun && n > 0 { + printStderr("Pass --delete=yes to delete.\n") } - del, cnt := deleted.Approximate() - - printStderr("Deleted total %v unreferenced blobs (%v)\n", del, units.BytesStringBase10(cnt)) - - return nil + return err } func init() { diff --git a/cli/command_content_rewrite.go b/cli/command_content_rewrite.go index 5fc230e55..e31e19a40 100644 --- a/cli/command_content_rewrite.go +++ b/cli/command_content_rewrite.go @@ -2,16 +2,11 @@ import ( "context" - "strings" - "sync" - "time" - "github.com/pkg/errors" - - "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/maintenance" ) var ( @@ -26,102 +21,17 @@ contentRewriteMinAge = contentRewriteCommand.Flag("min-age", "Only rewrite contents above given age").Default("1h").Duration() ) -const shortPackThresholdPercent = 60 // blocks below 60% of max block size are considered to be 'short - -type contentInfoOrError struct { - content.Info - err error -} - func runContentRewriteCommand(ctx context.Context, rep *repo.DirectRepository) error { - cnt := getContentToRewrite(ctx, rep) - - var ( - mu sync.Mutex - totalBytes int64 - failedCount int - ) - - var wg sync.WaitGroup - - for i := 0; i < *contentRewriteParallelism; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - - for c := range cnt { - if c.err != nil { - log(ctx).Errorf("got error: %v", c.err) - mu.Lock() - failedCount++ - mu.Unlock() - - return - } - - var optDeleted string - if c.Deleted { - optDeleted = " (deleted)" - } - - if age := time.Since(c.Timestamp()); age < *contentRewriteMinAge { - printStderr("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.\n", c.ID, c.Length, c.PackBlobID, optDeleted, formatTimestamp(c.Timestamp())) - continue - } - - printStderr("Rewriting content %v (%v bytes) from pack %v%v %v\n", c.ID, c.Length, c.PackBlobID, optDeleted, formatTimestamp(c.Timestamp())) - mu.Lock() - totalBytes += int64(c.Length) - mu.Unlock() - - if *contentRewriteDryRun { - continue - } - - if err := rep.Content.RewriteContent(ctx, c.ID); err != nil { - log(ctx).Warningf("unable to rewrite content %q: %v", c.ID, err) - mu.Lock() - failedCount++ - mu.Unlock() - } - } - }() - } - - wg.Wait() - - printStderr("Total bytes rewritten %v\n", units.BytesStringBase10(totalBytes)) - - if failedCount == 0 { - return nil - } - - return errors.Errorf("failed to rewrite %v contents", failedCount) -} - -func getContentToRewrite(ctx context.Context, rep *repo.DirectRepository) <-chan contentInfoOrError { - ch := make(chan contentInfoOrError) - - go func() { - defer close(ch) - - // get content IDs listed on command line - findContentInfos(ctx, rep, ch, toContentIDs(*contentRewriteIDs)) - - // add all content IDs from short packs - if *contentRewriteShortPacks { - threshold := int64(rep.Content.Format.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd - findContentInShortPacks(ctx, rep, ch, threshold) - } - - // add all blocks with given format version - if *contentRewriteFormatVersion != -1 { - findContentWithFormatVersion(ctx, rep, ch, *contentRewriteFormatVersion) - } - }() - - return ch + return maintenance.RewriteContents(ctx, rep, &maintenance.RewriteContentsOptions{ + ContentIDRange: contentIDRange(), + ContentIDs: toContentIDs(*contentRewriteIDs), + FormatVersion: *contentRewriteFormatVersion, + MinAge: *contentRewriteMinAge, + PackPrefix: blob.ID(*contentRewritePackPrefix), + Parallel: *contentRewriteParallelism, + ShortPacks: *contentRewriteShortPacks, + DryRun: *contentRewriteDryRun, + }) } func toContentIDs(s []string) []content.ID { @@ -133,65 +43,6 @@ func toContentIDs(s []string) []content.ID { return result } -func findContentInfos(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, contentIDs []content.ID) { - for _, contentID := range contentIDs { - i, err := rep.Content.ContentInfo(ctx, contentID) - if err != nil { - ch <- contentInfoOrError{err: errors.Wrapf(err, "unable to get info for content %q", contentID)} - } else { - ch <- contentInfoOrError{Info: i} - } - } -} - -func findContentWithFormatVersion(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, version int) { - _ = rep.Content.IterateContents( - ctx, - content.IterateOptions{ - Range: contentIDRange(), - IncludeDeleted: true, - }, - func(b content.Info) error { - if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) { - ch <- contentInfoOrError{Info: b} - } - return nil - }) -} - -func findContentInShortPacks(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, threshold int64) { - var prefixes []blob.ID - - if *contentRewritePackPrefix != "" { - prefixes = append(prefixes, blob.ID(*contentRewritePackPrefix)) - } - - err := rep.Content.IteratePacks( - ctx, - content.IteratePackOptions{ - Prefixes: prefixes, - IncludePacksWithOnlyDeletedContent: true, - IncludeContentInfos: true, - }, - func(pi content.PackInfo) error { - if pi.TotalSize >= threshold { - return nil - } - - for _, ci := range pi.ContentInfos { - ch <- contentInfoOrError{Info: ci} - } - - return nil - }, - ) - - if err != nil { - ch <- contentInfoOrError{err: err} - return - } -} - func init() { contentRewriteCommand.Action(directRepositoryAction(runContentRewriteCommand)) setupContentIDRangeFlags(contentRewriteCommand) diff --git a/cli/command_maintenance_info.go b/cli/command_maintenance_info.go new file mode 100644 index 000000000..1b7472990 --- /dev/null +++ b/cli/command_maintenance_info.go @@ -0,0 +1,54 @@ +package cli + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/maintenance" +) + +var ( + maintenanceInfoCommand = maintenanceCommands.Command("info", "Display maintenance information").Alias("status") +) + +func runMaintenanceInfoCommand(ctx context.Context, rep *repo.DirectRepository) error { + p, err := maintenance.GetParams(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get maintenance params") + } + + s, err := maintenance.GetSchedule(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get maintenance schedule") + } + + printStderr("Owner: %v\n", p.Owner) + printStderr("Quick Cycle:\n") + displayCycleInfo(&p.QuickCycle, s.NextQuickMaintenanceTime, rep) + + printStderr("Full Cycle:\n") + displayCycleInfo(&p.FullCycle, s.NextFullMaintenanceTime, rep) + + return nil +} + +func displayCycleInfo(c *maintenance.CycleParams, t time.Time, rep *repo.DirectRepository) { + printStderr(" scheduled: %v\n", c.Enabled) + + if c.Enabled { + printStderr(" interval: %v\n", c.Interval) + + if rep.Time().Before(t) { + printStderr(" next run: %v (in %v)\n", formatTimestamp(t), time.Until(t).Truncate(time.Second)) + } else { + printStderr(" next run: now\n") + } + } +} + +func init() { + maintenanceInfoCommand.Action(directRepositoryAction(runMaintenanceInfoCommand)) +} diff --git a/cli/command_maintenance_run.go b/cli/command_maintenance_run.go new file mode 100644 index 000000000..b35b670aa --- /dev/null +++ b/cli/command_maintenance_run.go @@ -0,0 +1,37 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/maintenance" + "github.com/kopia/kopia/snapshot/gc" +) + +var ( + maintenanceRunCommand = maintenanceCommands.Command("run", "Run repository maintenance").Default() + maintenanceRunFull = maintenanceRunCommand.Flag("full", "Full maintenance").Bool() +) + +func runMaintenanceCommand(ctx context.Context, rep *repo.DirectRepository) error { + mode := maintenance.ModeQuick + if *maintenanceRunFull { + mode = maintenance.ModeFull + } + + return maintenance.RunExclusive(ctx, rep, mode, func(p maintenance.RunParameters) error { + if p.Mode == maintenance.ModeFull { + if _, err := gc.Run(ctx, rep, p.Params.SnapshotGC, true); err != nil { + return errors.Wrap(err, "error running snapshot GC") + } + } + + return maintenance.Run(ctx, p) + }) +} + +func init() { + maintenanceRunCommand.Action(directRepositoryAction(runMaintenanceCommand)) +} diff --git a/cli/command_maintenance_set.go b/cli/command_maintenance_set.go new file mode 100644 index 000000000..cea87ab7d --- /dev/null +++ b/cli/command_maintenance_set.go @@ -0,0 +1,134 @@ +package cli + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/maintenance" +) + +var ( + maintenanceSetCommand = maintenanceCommands.Command("set", "Set maintenance parameters") + + maintenanceSetOwner = maintenanceSetCommand.Flag("owner", "Set maintenance owner user@hostname").String() + + maintenanceSetEnableQuick = maintenanceSetCommand.Flag("enable-quick", "Enable or disable quick maintenance").BoolList() + maintenanceSetEnableFull = maintenanceSetCommand.Flag("enable-full", "Enable or disable full maintenance").BoolList() + + maintenanceSetQuickFrequency = maintenanceSetCommand.Flag("quick-interval", "Set quick maintenance interval").DurationList() + maintenanceSetFullFrequency = maintenanceSetCommand.Flag("full-interval", "Set full maintenance interval").DurationList() + + maintenanceSetPauseQuick = maintenanceSetCommand.Flag("pause-quick", "Pause quick maintenance for a specified duration").DurationList() + maintenanceSetPauseFull = maintenanceSetCommand.Flag("pause-full", "Pause full maintenance for a specified duration").DurationList() + + maintenanceSetDropDeletedAge = maintenanceSetCommand.Flag("drop-deleted-age", "Drop deleted contents older than").DurationList() +) + +func setMaintenanceOwnerFromFlags(p *maintenance.Params, rep *repo.DirectRepository, changed *bool) { + if v := *maintenanceSetOwner; v != "" { + if v == "me" { + p.Owner = rep.Username() + "@" + rep.Hostname() + } else { + p.Owner = v + } + + *changed = true + + printStderr("Setting maintenance owner to %v\n", p.Owner) + } +} + +func setMaintenanceEnabledAndIntervalFromFlags(c *maintenance.CycleParams, cycleName string, enableFlag []bool, intervalFlag []time.Duration, changed *bool) { + // we use lists to distinguish between flag not set + // Zero elements == not set, more than zero - flag set, in which case we pick the last value + if len(enableFlag) > 0 { + lastVal := enableFlag[len(enableFlag)-1] + c.Enabled = lastVal + *changed = true + + if lastVal { + printStderr("Periodic %v maintenance enabled.\n", cycleName) + } else { + printStderr("Periodic %v maintenance disabled.\n", cycleName) + } + } + + if len(intervalFlag) > 0 { + lastVal := intervalFlag[len(intervalFlag)-1] + c.Interval = lastVal + *changed = true + + printStderr("Interval for %v maintenance set to %v.\n", cycleName, lastVal) + } +} + +func setMaintenanceIndexRewriteParamsFromFlags(p *maintenance.Params, changed *bool) { + if v := *maintenanceSetDropDeletedAge; len(v) > 0 { + lastVal := v[len(v)-1] + + p.DropDeletedContent.MinDeletedAge = lastVal + printStderr("Index rewrite will drop deleted contents older than %v.\n", p.DropDeletedContent.MinDeletedAge) + + *changed = true + } +} + +func runMaintenanceSetParams(ctx context.Context, rep *repo.DirectRepository) error { + p, err := maintenance.GetParams(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get current parameters") + } + + s, err := maintenance.GetSchedule(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get current parameters") + } + + var changedParams, changedSchedule bool + + setMaintenanceOwnerFromFlags(p, rep, &changedParams) + setMaintenanceEnabledAndIntervalFromFlags(&p.QuickCycle, "quick", *maintenanceSetEnableQuick, *maintenanceSetQuickFrequency, &changedParams) + setMaintenanceEnabledAndIntervalFromFlags(&p.FullCycle, "full", *maintenanceSetEnableFull, *maintenanceSetFullFrequency, &changedParams) + setMaintenanceIndexRewriteParamsFromFlags(p, &changedParams) + + if v := *maintenanceSetPauseQuick; len(v) > 0 { + pauseDuration := v[len(v)-1] + s.NextQuickMaintenanceTime = rep.Time().Add(pauseDuration) + changedSchedule = true + + printStderr("Quick maintenance paused until %v", formatTimestamp(s.NextQuickMaintenanceTime)) + } + + if v := *maintenanceSetPauseFull; len(v) > 0 { + pauseDuration := v[len(v)-1] + s.NextFullMaintenanceTime = rep.Time().Add(pauseDuration) + changedSchedule = true + + printStderr("Full maintenance paused until %v", formatTimestamp(s.NextFullMaintenanceTime)) + } + + if !changedParams && !changedSchedule { + return errors.Errorf("no changes specified") + } + + if changedSchedule { + if err := maintenance.SetSchedule(ctx, rep, s); err != nil { + return errors.Wrap(err, "unable to set schedule") + } + } + + if changedParams { + if err := maintenance.SetParams(ctx, rep, p); err != nil { + return errors.Wrap(err, "unable to set params") + } + } + + return nil +} + +func init() { + maintenanceSetCommand.Action(directRepositoryAction(runMaintenanceSetParams)) +} diff --git a/cli/command_repository_create.go b/cli/command_repository_create.go index 341784315..905cf32d0 100644 --- a/cli/command_repository_create.go +++ b/cli/command_repository_create.go @@ -10,6 +10,7 @@ "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/encryption" "github.com/kopia/kopia/repo/hashing" + "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/repo/splitter" "github.com/kopia/kopia/snapshot/policy" @@ -104,5 +105,12 @@ func populateRepository(ctx context.Context, password string) error { printStdout("To change the policy use:\n kopia policy set --global \n") printStdout("or\n kopia policy set \n") + p := maintenance.DefaultParams() + p.Owner = rep.Username() + "@" + rep.Hostname() + + if err := maintenance.SetParams(ctx, rep, &p); err != nil { + return errors.Wrap(err, "unable to set maintenance params") + } + return nil } diff --git a/cli/command_snapshot_gc.go b/cli/command_snapshot_gc.go index a94692bbc..10a88895c 100644 --- a/cli/command_snapshot_gc.go +++ b/cli/command_snapshot_gc.go @@ -5,6 +5,7 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/snapshot/gc" ) @@ -15,7 +16,9 @@ ) func runSnapshotGCCommand(ctx context.Context, rep *repo.DirectRepository) error { - st, err := gc.Run(ctx, rep, *snapshotGCMinContentAge, *snapshotGCDelete) + st, err := gc.Run(ctx, rep, maintenance.SnapshotGCParams{ + MinContentAge: *snapshotGCMinContentAge, + }, *snapshotGCDelete) log(ctx).Infof("GC found %v unused contents (%v bytes)", st.UnusedCount, units.BytesStringBase2(st.UnusedBytes)) log(ctx).Infof("GC found %v unused contents that are too recent to delete (%v bytes)", st.TooRecentCount, units.BytesStringBase2(st.TooRecentBytes)) diff --git a/go.mod b/go.mod index a2b61d822..78801741d 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/chmduquesne/rollinghash v4.0.0+incompatible github.com/efarrer/iothrottler v0.0.1 github.com/fatih/color v1.9.0 + github.com/gofrs/flock v0.7.1 github.com/golang/protobuf v1.3.5 github.com/google/fswalker v0.2.0 github.com/google/readahead v0.0.0-20161222183148-eaceba169032 // indirect diff --git a/go.sum b/go.sum index a9b4a3843..a3f899bc6 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/godbus/dbus v4.1.0+incompatible h1:WqqLRTsQic3apZUK9qC5sGNfXthmPXzUZ7nQPrNITa4= github.com/godbus/dbus v4.1.0+incompatible/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= +github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= +github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -290,6 +292,8 @@ github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfE github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b h1:FQ7+9fxhyp82ks9vAuyPzG0/vVbWwMwLJ+P6yJI5FN8= +github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj4XXDkbjdmlbI505rUPBs6WBMYg2pXks= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 4a69ee976..ae721a0ac 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -14,6 +14,7 @@ "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/encryption" "github.com/kopia/kopia/repo/hashing" + "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/repo/splitter" "github.com/kopia/kopia/snapshot/policy" ) @@ -93,6 +94,15 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa return nil, internalServerError(errors.Wrap(err, "set global policy")) } + if dr, ok := s.rep.(*repo.DirectRepository); ok { + p := maintenance.DefaultParams() + p.Owner = s.rep.Username() + "@" + s.rep.Hostname() + + if err := maintenance.SetParams(ctx, dr, &p); err != nil { + return nil, internalServerError(errors.Wrap(err, "unable to set maintenance params")) + } + } + if err := s.rep.Flush(ctx); err != nil { return nil, internalServerError(errors.Wrap(err, "flush")) } diff --git a/repo/maintenance/blob_gc.go b/repo/maintenance/blob_gc.go new file mode 100644 index 000000000..1c2dd460e --- /dev/null +++ b/repo/maintenance/blob_gc.go @@ -0,0 +1,109 @@ +package maintenance + +import ( + "context" + "time" + + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/stats" + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/repo/blob" +) + +// defaultBlobGCMinAge is a default MinAge for blob GC. +// We're setting it to 2 hours to accommodate the fact that every repository +// will periodically flush its indexes more frequently than 1/hour. +const defaultBlobGCMinAge = 2 * time.Hour + +// DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm. +type DeleteUnreferencedBlobsOptions struct { + Parallel int + Prefix blob.ID + MinAge time.Duration + DryRun bool +} + +// DeleteUnreferencedBlobs deletes old blobs that are no longer referenced by index entries. +func DeleteUnreferencedBlobs(ctx context.Context, rep MaintainableRepository, opt DeleteUnreferencedBlobsOptions) (int, error) { + if opt.Parallel == 0 { + opt.Parallel = 16 + } + + if opt.MinAge == 0 { + opt.MinAge = defaultBlobGCMinAge + } + + const deleteQueueSize = 100 + + var unreferenced, deleted stats.CountSum + + var eg errgroup.Group + + unused := make(chan blob.Metadata, deleteQueueSize) + + if !opt.DryRun { + // start goroutines to delete blobs as they come. + for i := 0; i < opt.Parallel; i++ { + eg.Go(func() error { + for bm := range unused { + if err := rep.BlobStorage().DeleteBlob(ctx, bm.BlobID); err != nil { + return errors.Wrapf(err, "unable to delete blob %q", bm.BlobID) + } + cnt, del := deleted.Add(bm.Length) + if cnt%100 == 0 { + log(ctx).Infof(" deleted %v unreferenced blobs (%v)", cnt, units.BytesStringBase10(del)) + } + } + + return nil + }) + } + } + + // iterate unreferenced blobs and count them + optionally send to the channel to be deleted + log(ctx).Infof("Looking for unreferenced blobs...") + + var prefixes []blob.ID + if p := opt.Prefix; p != "" { + prefixes = append(prefixes, p) + } + + if err := rep.ContentManager().IterateUnreferencedBlobs(ctx, prefixes, opt.Parallel, func(bm blob.Metadata) error { + if age := rep.Time().Sub(bm.Timestamp); age < opt.MinAge { + log(ctx).Debugf(" preserving %v because it's too new (age: %v)", bm.BlobID, age) + return nil + } + + unreferenced.Add(bm.Length) + + if !opt.DryRun { + unused <- bm + } + + return nil + }); err != nil { + return 0, errors.Wrap(err, "error looking for unreferenced blobs") + } + + close(unused) + + unreferencedCount, unreferencedSize := unreferenced.Approximate() + log(ctx).Debugf("Found %v blobs to delete (%v)", unreferencedCount, units.BytesStringBase10(unreferencedSize)) + + // wait for all delete workers to finish. + if err := eg.Wait(); err != nil { + return 0, err + } + + if opt.DryRun { + return int(unreferencedCount), nil + } + + del, cnt := deleted.Approximate() + + log(ctx).Infof("Deleted total %v unreferenced blobs (%v)", del, units.BytesStringBase10(cnt)) + + return int(del), nil +} diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go new file mode 100644 index 000000000..20a44409c --- /dev/null +++ b/repo/maintenance/content_rewrite.go @@ -0,0 +1,208 @@ +package maintenance + +import ( + "context" + "runtime" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content" +) + +const defaultRewriteContentsMinAge = 2 * time.Hour + +const parallelContentRewritesCPUMultiplier = 2 + +// RewriteContentsOptions provides options for RewriteContents +type RewriteContentsOptions struct { + Parallel int + MinAge time.Duration + ContentIDs []content.ID + ContentIDRange content.IDRange + PackPrefix blob.ID + ShortPacks bool + FormatVersion int + DryRun bool +} + +const shortPackThresholdPercent = 60 // blocks below 60% of max block size are considered to be 'short + +type contentInfoOrError struct { + content.Info + err error +} + +// RewriteContents rewrites contents according to provided criteria and creates new +// blobs and index entries to point at the +func RewriteContents(ctx context.Context, rep MaintainableRepository, opt *RewriteContentsOptions) error { + if opt == nil { + return errors.Errorf("missing options") + } + + if opt.MinAge == 0 { + opt.MinAge = defaultRewriteContentsMinAge + } + + if opt.ShortPacks { + log(ctx).Infof("Rewriting contents from short packs...") + } else { + log(ctx).Infof("Rewriting contents...") + } + + cnt := getContentToRewrite(ctx, rep, opt) + + var ( + mu sync.Mutex + totalBytes int64 + failedCount int + ) + + if opt.Parallel == 0 { + opt.Parallel = runtime.NumCPU() * parallelContentRewritesCPUMultiplier + } + + var wg sync.WaitGroup + + for i := 0; i < opt.Parallel; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for c := range cnt { + if c.err != nil { + mu.Lock() + failedCount++ + mu.Unlock() + + return + } + + var optDeleted string + if c.Deleted { + optDeleted = " (deleted)" + } + + age := rep.Time().Sub(c.Timestamp()) + if age < opt.MinAge { + log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.ID, c.Length, c.PackBlobID, optDeleted, age) + continue + } + + log(ctx).Debugf("Rewriting content %v (%v bytes) from pack %v%v %v", c.ID, c.Length, c.PackBlobID, optDeleted, age) + mu.Lock() + totalBytes += int64(c.Length) + mu.Unlock() + + if opt.DryRun { + continue + } + + if err := rep.ContentManager().RewriteContent(ctx, c.ID); err != nil { + log(ctx).Infof("unable to rewrite content %q: %v", c.ID, err) + mu.Lock() + failedCount++ + mu.Unlock() + } + } + }() + } + + wg.Wait() + + log(ctx).Debugf("Total bytes rewritten %v", units.BytesStringBase10(totalBytes)) + + if failedCount == 0 { + return nil + } + + return errors.Errorf("failed to rewrite %v contents", failedCount) +} + +func getContentToRewrite(ctx context.Context, rep MaintainableRepository, opt *RewriteContentsOptions) <-chan contentInfoOrError { + ch := make(chan contentInfoOrError) + + go func() { + defer close(ch) + + // get content IDs listed on command line + findContentInfos(ctx, rep, ch, opt.ContentIDs) + + // add all content IDs from short packs + if opt.ShortPacks { + threshold := int64(rep.ContentManager().Format.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd + findContentInShortPacks(ctx, rep, ch, threshold, opt) + } + + // add all blocks with given format version + if opt.FormatVersion != 0 { + findContentWithFormatVersion(ctx, rep, ch, opt) + } + }() + + return ch +} + +func findContentInfos(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, contentIDs []content.ID) { + for _, contentID := range contentIDs { + i, err := rep.ContentManager().ContentInfo(ctx, contentID) + if err != nil { + ch <- contentInfoOrError{err: errors.Wrapf(err, "unable to get info for content %q", contentID)} + } else { + ch <- contentInfoOrError{Info: i} + } + } +} + +func findContentWithFormatVersion(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, opt *RewriteContentsOptions) { + _ = rep.ContentManager().IterateContents( + ctx, + content.IterateOptions{ + Range: opt.ContentIDRange, + IncludeDeleted: true, + }, + func(b content.Info) error { + if int(b.FormatVersion) == opt.FormatVersion && strings.HasPrefix(string(b.PackBlobID), string(opt.PackPrefix)) { + ch <- contentInfoOrError{Info: b} + } + return nil + }) +} + +func findContentInShortPacks(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, threshold int64, opt *RewriteContentsOptions) { + var prefixes []blob.ID + + if opt.PackPrefix != "" { + prefixes = append(prefixes, opt.PackPrefix) + } + + err := rep.ContentManager().IteratePacks( + ctx, + content.IteratePackOptions{ + Prefixes: prefixes, + IncludePacksWithOnlyDeletedContent: true, + IncludeContentInfos: true, + }, + func(pi content.PackInfo) error { + if pi.TotalSize >= threshold { + return nil + } + + for _, ci := range pi.ContentInfos { + ch <- contentInfoOrError{Info: ci} + } + + return nil + }, + ) + + if err != nil { + ch <- contentInfoOrError{err: err} + return + } +} diff --git a/repo/maintenance/drop_deleted_contents.go b/repo/maintenance/drop_deleted_contents.go new file mode 100644 index 000000000..d7d2c7dda --- /dev/null +++ b/repo/maintenance/drop_deleted_contents.go @@ -0,0 +1,35 @@ +package maintenance + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/content" +) + +const defaultDropDeletedContentsAge = 24 * time.Hour + +// DropDeletedContentOptions specifies options for DropDeletedContent maintenance. +type DropDeletedContentOptions struct { + MinDeletedAge time.Duration `json:"minDeletedAge"` +} + +// DropDeletedContents rewrites indexes while dropping deleted contents above certain age. +func DropDeletedContents(ctx context.Context, rep MaintainableRepository, opt *DropDeletedContentOptions) error { + if opt == nil { + return errors.Errorf("options must be set") + } + + if opt.MinDeletedAge <= 0 { + opt.MinDeletedAge = defaultDropDeletedContentsAge + } + + log(ctx).Infof("Dropping deleted contents older than %v", opt.MinDeletedAge) + + return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{ + AllIndexes: true, + SkipDeletedOlderThan: opt.MinDeletedAge, + }) +} diff --git a/repo/maintenance/maintenance_params.go b/repo/maintenance/maintenance_params.go new file mode 100644 index 000000000..5ed89d4a8 --- /dev/null +++ b/repo/maintenance/maintenance_params.go @@ -0,0 +1,118 @@ +package maintenance + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/manifest" +) + +var manifestLabels = map[string]string{ + "type": "maintenance", +} + +// Params is a JSON-serialized maintenance configuration stored in a repository. +type Params struct { + Owner string `json:"owner"` + + QuickCycle CycleParams `json:"quick"` + FullCycle CycleParams `json:"full"` + + DropDeletedContent DropDeletedContentOptions `json:"dropDeletedContent"` + + SnapshotGC SnapshotGCParams `json:"snapshotGC"` +} + +// SnapshotGCParams contains parameters for Snapshot Garbage Collection +// NOTE: Due to layering, the implementation of Snapshot GC is outside of repository package +// but for simplicity we store it here. +type SnapshotGCParams struct { + MinContentAge time.Duration `json:"minAge"` +} + +// DefaultParams represents default values of maintenance parameters. +func DefaultParams() Params { + return Params{ + FullCycle: CycleParams{ + // TODO: enable this when ready for public consumption + // Enabled: true, + Interval: 7 * 24 * time.Hour, + }, + QuickCycle: CycleParams{ + // TODO: enable this when ready for public consumption + // Enabled: true, + Interval: 1 * time.Hour, + }, + DropDeletedContent: DropDeletedContentOptions{ + MinDeletedAge: 1 * time.Hour, + }, + SnapshotGC: SnapshotGCParams{ + MinContentAge: 24 * time.Hour, //nolint:gomnd + }, + } +} + +// CycleParams specifies parameters for a maintenance cycle (quick or full). +type CycleParams struct { + Enabled bool `json:"enabled"` + Interval time.Duration `json:"interval"` +} + +// GetParams returns repository-wide maintenance parameters. +func GetParams(ctx context.Context, rep MaintainableRepository) (*Params, error) { + md, err := manifestIDs(ctx, rep) + if err != nil { + return nil, err + } + + if len(md) == 0 { + // not found, return empty params + p := DefaultParams() + return &p, nil + } + + // arbitrality pick first pick ID to return in case there's more than one + // this is possible when two repository clients independently create manifests at approximately the same time + // so it should not really matter which one we pick. + // see https://github.com/kopia/kopia/issues/391 + manifestID := md[0].ID + + p := &Params{} + if _, err := rep.GetManifest(ctx, manifestID, p); err != nil { + return nil, errors.Wrap(err, "error loading manifest") + } + + return p, nil +} + +// SetParams sets the maintenance parameters. +func SetParams(ctx context.Context, rep MaintainableRepository, par *Params) error { + md, err := manifestIDs(ctx, rep) + if err != nil { + return err + } + + if _, err := rep.PutManifest(ctx, manifestLabels, par); err != nil { + return errors.Wrap(err, "put manifest") + } + + for _, m := range md { + if err := rep.DeleteManifest(ctx, m.ID); err != nil { + return errors.Wrap(err, "delete manifest") + } + } + + return nil +} + +func manifestIDs(ctx context.Context, rep MaintainableRepository) ([]*manifest.EntryMetadata, error) { + md, err := rep.FindManifests(ctx, manifestLabels) + + if err != nil { + return nil, errors.Wrap(err, "error looking for maintenance manifest") + } + + return md, err +} diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go new file mode 100644 index 000000000..402286fea --- /dev/null +++ b/repo/maintenance/maintenance_run.go @@ -0,0 +1,247 @@ +// Package maintenance manages automatic repository maintenance. +package maintenance + +import ( + "context" + "time" + + "github.com/gofrs/flock" + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/logging" + "github.com/kopia/kopia/repo/manifest" +) + +var log = logging.GetContextLoggerFunc("maintenance") + +// Mode describes the mode of maintenance to perfor +type Mode string + +// MaintainableRepository is a subset of Repository required for maintenance tasks. +type MaintainableRepository interface { + Username() string + Hostname() string + Time() time.Time + ConfigFilename() string + + BlobStorage() blob.Storage + ContentManager() *content.Manager + + GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) + PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) + FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) + DeleteManifest(ctx context.Context, id manifest.ID) error +} + +// Supported maintenance modes +const ( + ModeNone Mode = "none" + ModeQuick Mode = "quick" + ModeFull Mode = "full" + ModeAuto Mode = "auto" // run either quick of full if required by schedule +) + +// shouldRun returns Mode if repository is due for periodic maintenance. +func shouldRun(ctx context.Context, rep MaintainableRepository, p *Params) (Mode, error) { + if myUsername := rep.Username() + "@" + rep.Hostname(); p.Owner != myUsername { + log(ctx).Debugf("maintenance owned by another user '%v'", p.Owner) + return ModeNone, nil + } + + s, err := GetSchedule(ctx, rep) + if err != nil { + return ModeNone, errors.Wrap(err, "error getting status") + } + + // check full cycle first, as it does more than the quick cycle + if p.FullCycle.Enabled { + if rep.Time().After(s.NextFullMaintenanceTime) { + log(ctx).Debugf("due for full manintenance cycle") + return ModeFull, nil + } + + log(ctx).Debugf("not due for full manintenance cycle until %v", s.NextFullMaintenanceTime) + } else { + log(ctx).Debugf("full manintenance cycle not enabled") + } + + // no time for full cycle, check quick cycle + if p.QuickCycle.Enabled { + if rep.Time().After(s.NextQuickMaintenanceTime) { + log(ctx).Debugf("due for quick manintenance cycle") + return ModeQuick, nil + } + + log(ctx).Debugf("not due for quick manintenance cycle until %v", s.NextQuickMaintenanceTime) + } else { + log(ctx).Debugf("quick manintenance cycle not enabled") + } + + return ModeNone, nil +} + +func updateSchedule(ctx context.Context, runParams RunParameters) error { + rep := runParams.rep + p := runParams.Params + + s, err := GetSchedule(ctx, rep) + if err != nil { + return errors.Wrap(err, "error getting schedule") + } + + switch runParams.Mode { + case ModeFull: + // on full cycle, also update the quick cycle + s.NextFullMaintenanceTime = rep.Time().Add(p.FullCycle.Interval) + s.NextQuickMaintenanceTime = s.NextFullMaintenanceTime.Add(p.QuickCycle.Interval) + log(ctx).Debugf("scheduling next full cycle at %v", s.NextFullMaintenanceTime) + log(ctx).Debugf("scheduling next quick cycle at %v", s.NextQuickMaintenanceTime) + + return SetSchedule(ctx, rep, s) + + case ModeQuick: + log(ctx).Debugf("scheduling next quick cycle at %v", s.NextQuickMaintenanceTime) + s.NextQuickMaintenanceTime = rep.Time().Add(p.QuickCycle.Interval) + + return SetSchedule(ctx, rep, s) + + default: + return nil + } +} + +// RunParameters passes essential parameters for maintenance. +// It is generated by RunExclusive and can't be create outside of its package and +// is required to ensure all maintenance tasks run under an exclusive lock. +type RunParameters struct { + rep MaintainableRepository + + Mode Mode + + Params *Params +} + +// RunExclusive runs the provided callback if the maintenance is owned by local user and +// lock can be acquired. Lock is passed to the function, which ensures that every call to Run() +// is within the exclusive context. +func RunExclusive(ctx context.Context, rep MaintainableRepository, mode Mode, cb func(runParams RunParameters) error) error { + p, err := GetParams(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get maintenance params") + } + + if myUsername := rep.Username() + "@" + rep.Hostname(); p.Owner != myUsername { + log(ctx).Debugf("maintenance owned by another user '%v'", p.Owner) + return nil + } + + if mode == ModeAuto { + mode, err = shouldRun(ctx, rep, p) + if err != nil { + return errors.Wrap(err, "unable to determine if maintenance is required") + } + } + + if mode == ModeNone { + log(ctx).Debugf("not due for maintenance") + return nil + } + + runParams := RunParameters{rep, mode, p} + + // update schedule so that we don't run the maintenance again immediately if + // this process crashes. + if err = updateSchedule(ctx, runParams); err != nil { + return errors.Wrap(err, "error updating maintenance schedule") + } + + lockFile := rep.ConfigFilename() + ".mlock" + log(ctx).Debugf("Acquiring maintenance lock in file %v", lockFile) + + // acquire local lock on a config file + l := flock.New(lockFile) + + ok, err := l.TryLock() + if err != nil { + return errors.Wrap(err, "error acquiring maintenance lock") + } + + if !ok { + log(ctx).Debugf("maintenance is already in progress locally") + return nil + } + + defer l.Unlock() //nolint:errcheck + + log(ctx).Infof("Running %v maintenance...", runParams.Mode) + defer log(ctx).Infof("Finished %v maintenance.", runParams.Mode) + + return cb(runParams) +} + +// Run performs maintenance activities for a repository. +func Run(ctx context.Context, runParams RunParameters) error { + switch runParams.Mode { + case ModeQuick: + return runQuickMaintenance(ctx, runParams) + + case ModeFull: + return runFullMaintenance(ctx, runParams) + + default: + return errors.Errorf("unknown mode %q", runParams.Mode) + } +} + +func runQuickMaintenance(ctx context.Context, runParams RunParameters) error { + // rewrite indexes by dropping content entries that have been marked + // as deleted for a long time + if err := DropDeletedContents(ctx, runParams.rep, &runParams.Params.DropDeletedContent); err != nil { + return errors.Wrap(err, "error dropping deleted contents") + } + + // find 'q' packs that are less than 80% full and rewrite contents in them into + // new consolidated packs, orphaning old packs in the process. + if err := RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ + ContentIDRange: content.AllPrefixedIDs, + PackPrefix: content.PackBlobIDPrefixSpecial, + ShortPacks: true, + }); err != nil { + return errors.Wrap(err, "error rewriting metadata contents") + } + + // delete orphaned 'q' packs after some time. + if _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{ + Prefix: content.PackBlobIDPrefixSpecial, + }); err != nil { + return errors.Wrap(err, "error deleting unreferenced metadata blobs") + } + + return nil +} + +func runFullMaintenance(ctx context.Context, runParams RunParameters) error { + // rewrite indexes by dropping content entries that have been marked + // as deleted for a long time + if err := DropDeletedContents(ctx, runParams.rep, &runParams.Params.DropDeletedContent); err != nil { + return errors.Wrap(err, "error dropping deleted contents") + } + + // find packs that are less than 80% full and rewrite contents in them into + // new consolidated packs, orphaning old packs in the process. + if err := RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{ + ContentIDRange: content.AllIDs, + ShortPacks: true, + }); err != nil { + return errors.Wrap(err, "error rewriting contents in short packs") + } + + // delete orphaned packs after some time. + if _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}); err != nil { + return errors.Wrap(err, "error deleting unreferenced blobs") + } + + return nil +} diff --git a/repo/maintenance/maintenance_schedule.go b/repo/maintenance/maintenance_schedule.go new file mode 100644 index 000000000..84e58f9a2 --- /dev/null +++ b/repo/maintenance/maintenance_schedule.go @@ -0,0 +1,49 @@ +package maintenance + +import ( + "context" + "encoding/json" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" +) + +const maintenanceScheduleBlobID = "kopia.schedule" + +// Schedule keeps track of scheduled maintenance times. +type Schedule struct { + NextFullMaintenanceTime time.Time `json:"nextFullMaintenance"` + NextQuickMaintenanceTime time.Time `json:"nextQuickMaintenance"` +} + +// GetSchedule gets the scheduled maintenance times. +func GetSchedule(ctx context.Context, rep MaintainableRepository) (*Schedule, error) { + v, err := rep.BlobStorage().GetBlob(ctx, maintenanceScheduleBlobID, 0, -1) + if err == blob.ErrBlobNotFound { + return &Schedule{}, nil + } + + if err != nil { + return nil, errors.Wrap(err, "error reading schedule blob") + } + + s := &Schedule{} + if err := json.Unmarshal(v, s); err != nil { + return nil, errors.Wrap(err, "malformed schedule blob") + } + + return s, nil +} + +// SetSchedule updates scheduled maintenance times. +func SetSchedule(ctx context.Context, rep MaintainableRepository, s *Schedule) error { + v, err := json.Marshal(s) + if err != nil { + return errors.Wrap(err, "unable to serialize JSON") + } + + return rep.BlobStorage().PutBlob(ctx, maintenanceScheduleBlobID, gather.FromSlice(v)) +} diff --git a/repo/repository.go b/repo/repository.go index 573e34da3..5d9694767 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -57,6 +57,21 @@ func (r *DirectRepository) Hostname() string { return r.hostname } // Username returns the username that's connect to the repository. func (r *DirectRepository) Username() string { return r.username } +// BlobStorage returns the blob storage. +func (r *DirectRepository) BlobStorage() blob.Storage { + return r.Blobs +} + +// ContentManager returns the content manager. +func (r *DirectRepository) ContentManager() *content.Manager { + return r.Content +} + +// ConfigFilename returns the name of the configuration file +func (r *DirectRepository) ConfigFilename() string { + return r.ConfigFile +} + // OpenObject opens the reader for a given object, returns object.ErrNotFound func (r *DirectRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) { return r.Objects.Open(ctx, id) diff --git a/snapshot/gc/gc.go b/snapshot/gc/gc.go index 1a68554ef..27881294b 100644 --- a/snapshot/gc/gc.go +++ b/snapshot/gc/gc.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "time" "github.com/pkg/errors" @@ -14,13 +13,14 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" + "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/snapshotfs" ) -var log = logging.GetContextLoggerFunc("kopia/snapshot/gc") +var log = logging.GetContextLoggerFunc("snapshotgc") func oidOf(entry fs.Entry) object.ID { return entry.(object.HasObjectID).ObjectID() @@ -75,7 +75,7 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma // Run performs garbage collection on all the snapshots in the repository. // nolint:gocognit -func Run(ctx context.Context, rep *repo.DirectRepository, minContentAge time.Duration, gcDelete bool) (Stats, error) { +func Run(ctx context.Context, rep *repo.DirectRepository, params maintenance.SnapshotGCParams, gcDelete bool) (Stats, error) { var used sync.Map var st Stats @@ -99,7 +99,7 @@ func Run(ctx context.Context, rep *repo.DirectRepository, minContentAge time.Dur return nil } - if rep.Time().Sub(ci.Timestamp()) < minContentAge { + if rep.Time().Sub(ci.Timestamp()) < params.MinContentAge { log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp()) tooRecent.Add(int64(ci.Length)) return nil diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index bd6f9fdd8..ee648aa05 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -32,7 +32,7 @@ const copyBufferSize = 128 * 1024 -var log = logging.GetContextLoggerFunc("kopia/upload") +var log = logging.GetContextLoggerFunc("snapshotfs") var errCanceled = errors.New("canceled") diff --git a/tests/end_to_end_test/snapshot_delete_test.go b/tests/end_to_end_test/snapshot_delete_test.go index fef7514af..f7ee339b3 100644 --- a/tests/end_to_end_test/snapshot_delete_test.go +++ b/tests/end_to_end_test/snapshot_delete_test.go @@ -252,21 +252,26 @@ func TestSnapshotDeleteTypeCheck(t *testing.T) { e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir) lines := e.RunAndExpectSuccess(t, "manifest", "ls") - if len(lines) != 1 { - t.Fatalf("Expected 1 line global policy output for manifest ls") + if len(lines) != 2 { + t.Fatalf("Expected 2 line global policy + maintenance config output for manifest ls") } - line := lines[0] - fields := strings.Fields(line) - manifestID := fields[0] - typeField := fields[5] + for _, line := range lines { + fields := strings.Fields(line) + manifestID := fields[0] + typeField := fields[5] - typeVal := strings.TrimPrefix(typeField, "type:") - if typeVal != "policy" { - t.Fatalf("Expected global policy manifest on a fresh repo") + typeVal := strings.TrimPrefix(typeField, "type:") + if typeVal == "maintenance" { + continue + } + + if typeVal != "policy" { + t.Fatalf("Expected global policy manifest on a fresh repo") + } + + e.RunAndExpectFailure(t, "snapshot", "delete", manifestID, "--unsafe-ignore-source") } - - e.RunAndExpectFailure(t, "snapshot", "delete", manifestID, "--unsafe-ignore-source") } func TestSnapshotDeleteRestore(t *testing.T) {