Repository maintenance support (#411)

Maintenance: support for automatic GC

Moved maintenance algorithms from 'cli' to 'repo/maintenance' package

Added support for CLI commands:

kopia gc - performs quick maintenance
kopia gc --full- perform full maintenance

Full maintenance performs snapshot gc, but it's not safe to do this automatically possibly in parallel to snapshots being taken. This will be addressed ~0.7 timeframe.
This commit is contained in:
Jarek Kowalski
2020-04-14 00:11:41 -07:00
committed by GitHub
parent 573d10422a
commit 4b4628a21e
22 changed files with 1122 additions and 272 deletions

View File

@@ -191,18 +191,7 @@ ifneq ($(uname),Windows)
# whitelisted internal packages.
find repo/ -name '*.go' | xargs grep "^\t\"github.com/kopia/kopia" \
| grep -v -e github.com/kopia/kopia/repo \
-e github.com/kopia/kopia/internal/retry \
-e github.com/kopia/kopia/internal/buf \
-e github.com/kopia/kopia/internal/throttle \
-e github.com/kopia/kopia/internal/iocopy \
-e github.com/kopia/kopia/internal/gather \
-e github.com/kopia/kopia/internal/blobtesting \
-e github.com/kopia/kopia/internal/repotesting \
-e github.com/kopia/kopia/internal/testlogging \
-e github.com/kopia/kopia/internal/hmac \
-e github.com/kopia/kopia/internal/faketime \
-e github.com/kopia/kopia/internal/testutil \
-e github.com/kopia/kopia/internal/ctxutil \
-e github.com/kopia/kopia/internal \
-e github.com/kopia/kopia/issues && exit 1 || echo repo/ layering ok
endif

View File

@@ -15,6 +15,8 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/gc"
)
var log = logging.GetContextLoggerFunc("kopia/cli")
@@ -28,18 +30,21 @@
var (
app = kingpin.New("kopia", "Kopia - Online Backup").Author("http://kopia.github.io/")
enableAutomaticMaintenance = app.Flag("auto-maintenance", "Automatic maintenance").Default("true").Hidden().Bool()
_ = app.Flag("help-full", "Show help for all commands, including hidden").Action(helpFullAction).Bool()
repositoryCommands = app.Command("repository", "Commands to manipulate repository.").Alias("repo")
cacheCommands = app.Command("cache", "Commands to manipulate local cache").Hidden()
snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap")
policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies")
serverCommands = app.Command("server", "Commands to control HTTP API server.")
manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.").Hidden()
contentCommands = app.Command("content", "Commands to manipulate content in repository.").Alias("contents").Hidden()
blobCommands = app.Command("blob", "Commands to manipulate BLOBs.").Hidden()
indexCommands = app.Command("index", "Commands to manipulate content index.").Hidden()
benchmarkCommands = app.Command("benchmark", "Commands to test performance of algorithms.").Hidden()
repositoryCommands = app.Command("repository", "Commands to manipulate repository.").Alias("repo")
cacheCommands = app.Command("cache", "Commands to manipulate local cache").Hidden()
snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap")
policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies")
serverCommands = app.Command("server", "Commands to control HTTP API server.")
manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.").Hidden()
contentCommands = app.Command("content", "Commands to manipulate content in repository.").Alias("contents").Hidden()
blobCommands = app.Command("blob", "Commands to manipulate BLOBs.").Hidden()
indexCommands = app.Command("index", "Commands to manipulate content index.").Hidden()
benchmarkCommands = app.Command("benchmark", "Commands to test performance of algorithms.").Hidden()
maintenanceCommands = app.Command("maintenance", "Maintenance commands.").Hidden().Alias("gc")
)
func helpFullAction(ctx *kingpin.ParseContext) error {
@@ -139,16 +144,46 @@ func maybeRepositoryAction(act func(ctx context.Context, rep repo.Repository) er
}
err = act(ctx, rep)
if err == nil && rep != nil {
err = maybeRunMaintenance(ctx, rep)
}
if rep != nil && required {
if cerr := rep.Close(ctx); cerr != nil {
return errors.Wrap(cerr, "unable to close repository")
}
}
return err
})
}
}
func maybeRunMaintenance(ctx context.Context, rep repo.Repository) error {
if !*enableAutomaticMaintenance {
return nil
}
dr, ok := rep.(*repo.DirectRepository)
if !ok {
return nil
}
// run maintenance if scheduled.
return maintenance.RunExclusive(ctx, dr, maintenance.ModeAuto,
func(runParams maintenance.RunParameters) error {
// run snapshot GC before full maintenance
if runParams.Mode == maintenance.ModeFull {
if _, err := gc.Run(ctx, dr, runParams.Params.SnapshotGC, true); err != nil {
return errors.Wrap(err, "snapshot GC failure")
}
}
return maintenance.Run(ctx, runParams)
})
}
// App returns an instance of command-line application object.
func App() *kingpin.Application {
return app

View File

@@ -2,15 +2,10 @@
import (
"context"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/stats"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/maintenance"
)
var (
@@ -22,81 +17,24 @@
)
func runBlobGarbageCollectCommand(ctx context.Context, rep *repo.DirectRepository) error {
const deleteQueueSize = 100
var unreferenced, deleted stats.CountSum
var eg errgroup.Group
unused := make(chan blob.Metadata, deleteQueueSize)
if *blobGarbageCollectCommandDelete == "yes" {
// start goroutines to delete blobs as they come.
for i := 0; i < *blobGarbageCollectParallel; i++ {
eg.Go(func() error {
for bm := range unused {
if err := rep.Blobs.DeleteBlob(ctx, bm.BlobID); err != nil {
return errors.Wrapf(err, "unable to delete blob %q", bm.BlobID)
}
cnt, del := deleted.Add(bm.Length)
if cnt%100 == 0 {
printStderr(" deleted %v unreferenced blobs (%v)\n", cnt, units.BytesStringBase10(del))
}
}
return nil
})
}
opts := maintenance.DeleteUnreferencedBlobsOptions{
DryRun: *blobGarbageCollectCommandDelete != "yes",
MinAge: *blobGarbageCollectMinAge,
Parallel: *blobGarbageCollectParallel,
Prefix: blob.ID(*blobGarbageCollectPrefix),
}
// iterate unreferenced blobs and count them + optionally send to the channel to be deleted
printStderr("Looking for unreferenced blobs...\n")
n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts)
var prefixes []blob.ID
if p := *blobGarbageCollectPrefix; p != "" {
prefixes = append(prefixes, blob.ID(p))
}
if err := rep.Content.IterateUnreferencedBlobs(ctx, prefixes, *blobGarbageCollectParallel, func(bm blob.Metadata) error {
if age := time.Since(bm.Timestamp); age < *blobGarbageCollectMinAge {
printStderr(" preserving %v because it's too new (age: %v)\n", bm.BlobID, age)
return nil
}
unreferenced.Add(bm.Length)
if *blobGarbageCollectCommandDelete == "yes" {
unused <- bm
}
return nil
}); err != nil {
return errors.Wrap(err, "error looking for unreferenced blobs")
}
close(unused)
unreferencedCount, unreferencedSize := unreferenced.Approximate()
printStderr("Found %v blobs to delete (%v)\n", unreferencedCount, units.BytesStringBase10(unreferencedSize))
// wait for all delete workers to finish.
if err := eg.Wait(); err != nil {
if err != nil {
return err
}
if *blobGarbageCollectCommandDelete != "yes" {
if unreferencedCount > 0 {
printStderr("Pass --delete=yes to delete.\n")
}
return nil
if opts.DryRun && n > 0 {
printStderr("Pass --delete=yes to delete.\n")
}
del, cnt := deleted.Approximate()
printStderr("Deleted total %v unreferenced blobs (%v)\n", del, units.BytesStringBase10(cnt))
return nil
return err
}
func init() {

View File

@@ -2,16 +2,11 @@
import (
"context"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/maintenance"
)
var (
@@ -26,102 +21,17 @@
contentRewriteMinAge = contentRewriteCommand.Flag("min-age", "Only rewrite contents above given age").Default("1h").Duration()
)
const shortPackThresholdPercent = 60 // blocks below 60% of max block size are considered to be 'short
type contentInfoOrError struct {
content.Info
err error
}
func runContentRewriteCommand(ctx context.Context, rep *repo.DirectRepository) error {
cnt := getContentToRewrite(ctx, rep)
var (
mu sync.Mutex
totalBytes int64
failedCount int
)
var wg sync.WaitGroup
for i := 0; i < *contentRewriteParallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for c := range cnt {
if c.err != nil {
log(ctx).Errorf("got error: %v", c.err)
mu.Lock()
failedCount++
mu.Unlock()
return
}
var optDeleted string
if c.Deleted {
optDeleted = " (deleted)"
}
if age := time.Since(c.Timestamp()); age < *contentRewriteMinAge {
printStderr("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.\n", c.ID, c.Length, c.PackBlobID, optDeleted, formatTimestamp(c.Timestamp()))
continue
}
printStderr("Rewriting content %v (%v bytes) from pack %v%v %v\n", c.ID, c.Length, c.PackBlobID, optDeleted, formatTimestamp(c.Timestamp()))
mu.Lock()
totalBytes += int64(c.Length)
mu.Unlock()
if *contentRewriteDryRun {
continue
}
if err := rep.Content.RewriteContent(ctx, c.ID); err != nil {
log(ctx).Warningf("unable to rewrite content %q: %v", c.ID, err)
mu.Lock()
failedCount++
mu.Unlock()
}
}
}()
}
wg.Wait()
printStderr("Total bytes rewritten %v\n", units.BytesStringBase10(totalBytes))
if failedCount == 0 {
return nil
}
return errors.Errorf("failed to rewrite %v contents", failedCount)
}
func getContentToRewrite(ctx context.Context, rep *repo.DirectRepository) <-chan contentInfoOrError {
ch := make(chan contentInfoOrError)
go func() {
defer close(ch)
// get content IDs listed on command line
findContentInfos(ctx, rep, ch, toContentIDs(*contentRewriteIDs))
// add all content IDs from short packs
if *contentRewriteShortPacks {
threshold := int64(rep.Content.Format.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd
findContentInShortPacks(ctx, rep, ch, threshold)
}
// add all blocks with given format version
if *contentRewriteFormatVersion != -1 {
findContentWithFormatVersion(ctx, rep, ch, *contentRewriteFormatVersion)
}
}()
return ch
return maintenance.RewriteContents(ctx, rep, &maintenance.RewriteContentsOptions{
ContentIDRange: contentIDRange(),
ContentIDs: toContentIDs(*contentRewriteIDs),
FormatVersion: *contentRewriteFormatVersion,
MinAge: *contentRewriteMinAge,
PackPrefix: blob.ID(*contentRewritePackPrefix),
Parallel: *contentRewriteParallelism,
ShortPacks: *contentRewriteShortPacks,
DryRun: *contentRewriteDryRun,
})
}
func toContentIDs(s []string) []content.ID {
@@ -133,65 +43,6 @@ func toContentIDs(s []string) []content.ID {
return result
}
func findContentInfos(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, contentIDs []content.ID) {
for _, contentID := range contentIDs {
i, err := rep.Content.ContentInfo(ctx, contentID)
if err != nil {
ch <- contentInfoOrError{err: errors.Wrapf(err, "unable to get info for content %q", contentID)}
} else {
ch <- contentInfoOrError{Info: i}
}
}
}
func findContentWithFormatVersion(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, version int) {
_ = rep.Content.IterateContents(
ctx,
content.IterateOptions{
Range: contentIDRange(),
IncludeDeleted: true,
},
func(b content.Info) error {
if int(b.FormatVersion) == version && strings.HasPrefix(string(b.PackBlobID), *contentRewritePackPrefix) {
ch <- contentInfoOrError{Info: b}
}
return nil
})
}
func findContentInShortPacks(ctx context.Context, rep *repo.DirectRepository, ch chan contentInfoOrError, threshold int64) {
var prefixes []blob.ID
if *contentRewritePackPrefix != "" {
prefixes = append(prefixes, blob.ID(*contentRewritePackPrefix))
}
err := rep.Content.IteratePacks(
ctx,
content.IteratePackOptions{
Prefixes: prefixes,
IncludePacksWithOnlyDeletedContent: true,
IncludeContentInfos: true,
},
func(pi content.PackInfo) error {
if pi.TotalSize >= threshold {
return nil
}
for _, ci := range pi.ContentInfos {
ch <- contentInfoOrError{Info: ci}
}
return nil
},
)
if err != nil {
ch <- contentInfoOrError{err: err}
return
}
}
func init() {
contentRewriteCommand.Action(directRepositoryAction(runContentRewriteCommand))
setupContentIDRangeFlags(contentRewriteCommand)

View File

@@ -0,0 +1,54 @@
package cli
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/maintenance"
)
var (
maintenanceInfoCommand = maintenanceCommands.Command("info", "Display maintenance information").Alias("status")
)
func runMaintenanceInfoCommand(ctx context.Context, rep *repo.DirectRepository) error {
p, err := maintenance.GetParams(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get maintenance params")
}
s, err := maintenance.GetSchedule(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get maintenance schedule")
}
printStderr("Owner: %v\n", p.Owner)
printStderr("Quick Cycle:\n")
displayCycleInfo(&p.QuickCycle, s.NextQuickMaintenanceTime, rep)
printStderr("Full Cycle:\n")
displayCycleInfo(&p.FullCycle, s.NextFullMaintenanceTime, rep)
return nil
}
func displayCycleInfo(c *maintenance.CycleParams, t time.Time, rep *repo.DirectRepository) {
printStderr(" scheduled: %v\n", c.Enabled)
if c.Enabled {
printStderr(" interval: %v\n", c.Interval)
if rep.Time().Before(t) {
printStderr(" next run: %v (in %v)\n", formatTimestamp(t), time.Until(t).Truncate(time.Second))
} else {
printStderr(" next run: now\n")
}
}
}
func init() {
maintenanceInfoCommand.Action(directRepositoryAction(runMaintenanceInfoCommand))
}

View File

@@ -0,0 +1,37 @@
package cli
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/gc"
)
var (
maintenanceRunCommand = maintenanceCommands.Command("run", "Run repository maintenance").Default()
maintenanceRunFull = maintenanceRunCommand.Flag("full", "Full maintenance").Bool()
)
func runMaintenanceCommand(ctx context.Context, rep *repo.DirectRepository) error {
mode := maintenance.ModeQuick
if *maintenanceRunFull {
mode = maintenance.ModeFull
}
return maintenance.RunExclusive(ctx, rep, mode, func(p maintenance.RunParameters) error {
if p.Mode == maintenance.ModeFull {
if _, err := gc.Run(ctx, rep, p.Params.SnapshotGC, true); err != nil {
return errors.Wrap(err, "error running snapshot GC")
}
}
return maintenance.Run(ctx, p)
})
}
func init() {
maintenanceRunCommand.Action(directRepositoryAction(runMaintenanceCommand))
}

View File

@@ -0,0 +1,134 @@
package cli
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/maintenance"
)
var (
maintenanceSetCommand = maintenanceCommands.Command("set", "Set maintenance parameters")
maintenanceSetOwner = maintenanceSetCommand.Flag("owner", "Set maintenance owner user@hostname").String()
maintenanceSetEnableQuick = maintenanceSetCommand.Flag("enable-quick", "Enable or disable quick maintenance").BoolList()
maintenanceSetEnableFull = maintenanceSetCommand.Flag("enable-full", "Enable or disable full maintenance").BoolList()
maintenanceSetQuickFrequency = maintenanceSetCommand.Flag("quick-interval", "Set quick maintenance interval").DurationList()
maintenanceSetFullFrequency = maintenanceSetCommand.Flag("full-interval", "Set full maintenance interval").DurationList()
maintenanceSetPauseQuick = maintenanceSetCommand.Flag("pause-quick", "Pause quick maintenance for a specified duration").DurationList()
maintenanceSetPauseFull = maintenanceSetCommand.Flag("pause-full", "Pause full maintenance for a specified duration").DurationList()
maintenanceSetDropDeletedAge = maintenanceSetCommand.Flag("drop-deleted-age", "Drop deleted contents older than").DurationList()
)
func setMaintenanceOwnerFromFlags(p *maintenance.Params, rep *repo.DirectRepository, changed *bool) {
if v := *maintenanceSetOwner; v != "" {
if v == "me" {
p.Owner = rep.Username() + "@" + rep.Hostname()
} else {
p.Owner = v
}
*changed = true
printStderr("Setting maintenance owner to %v\n", p.Owner)
}
}
func setMaintenanceEnabledAndIntervalFromFlags(c *maintenance.CycleParams, cycleName string, enableFlag []bool, intervalFlag []time.Duration, changed *bool) {
// we use lists to distinguish between flag not set
// Zero elements == not set, more than zero - flag set, in which case we pick the last value
if len(enableFlag) > 0 {
lastVal := enableFlag[len(enableFlag)-1]
c.Enabled = lastVal
*changed = true
if lastVal {
printStderr("Periodic %v maintenance enabled.\n", cycleName)
} else {
printStderr("Periodic %v maintenance disabled.\n", cycleName)
}
}
if len(intervalFlag) > 0 {
lastVal := intervalFlag[len(intervalFlag)-1]
c.Interval = lastVal
*changed = true
printStderr("Interval for %v maintenance set to %v.\n", cycleName, lastVal)
}
}
func setMaintenanceIndexRewriteParamsFromFlags(p *maintenance.Params, changed *bool) {
if v := *maintenanceSetDropDeletedAge; len(v) > 0 {
lastVal := v[len(v)-1]
p.DropDeletedContent.MinDeletedAge = lastVal
printStderr("Index rewrite will drop deleted contents older than %v.\n", p.DropDeletedContent.MinDeletedAge)
*changed = true
}
}
func runMaintenanceSetParams(ctx context.Context, rep *repo.DirectRepository) error {
p, err := maintenance.GetParams(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get current parameters")
}
s, err := maintenance.GetSchedule(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get current parameters")
}
var changedParams, changedSchedule bool
setMaintenanceOwnerFromFlags(p, rep, &changedParams)
setMaintenanceEnabledAndIntervalFromFlags(&p.QuickCycle, "quick", *maintenanceSetEnableQuick, *maintenanceSetQuickFrequency, &changedParams)
setMaintenanceEnabledAndIntervalFromFlags(&p.FullCycle, "full", *maintenanceSetEnableFull, *maintenanceSetFullFrequency, &changedParams)
setMaintenanceIndexRewriteParamsFromFlags(p, &changedParams)
if v := *maintenanceSetPauseQuick; len(v) > 0 {
pauseDuration := v[len(v)-1]
s.NextQuickMaintenanceTime = rep.Time().Add(pauseDuration)
changedSchedule = true
printStderr("Quick maintenance paused until %v", formatTimestamp(s.NextQuickMaintenanceTime))
}
if v := *maintenanceSetPauseFull; len(v) > 0 {
pauseDuration := v[len(v)-1]
s.NextFullMaintenanceTime = rep.Time().Add(pauseDuration)
changedSchedule = true
printStderr("Full maintenance paused until %v", formatTimestamp(s.NextFullMaintenanceTime))
}
if !changedParams && !changedSchedule {
return errors.Errorf("no changes specified")
}
if changedSchedule {
if err := maintenance.SetSchedule(ctx, rep, s); err != nil {
return errors.Wrap(err, "unable to set schedule")
}
}
if changedParams {
if err := maintenance.SetParams(ctx, rep, p); err != nil {
return errors.Wrap(err, "unable to set params")
}
}
return nil
}
func init() {
maintenanceSetCommand.Action(directRepositoryAction(runMaintenanceSetParams))
}

View File

@@ -10,6 +10,7 @@
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/repo/splitter"
"github.com/kopia/kopia/snapshot/policy"
@@ -104,5 +105,12 @@ func populateRepository(ctx context.Context, password string) error {
printStdout("To change the policy use:\n kopia policy set --global <options>\n")
printStdout("or\n kopia policy set <dir> <options>\n")
p := maintenance.DefaultParams()
p.Owner = rep.Username() + "@" + rep.Hostname()
if err := maintenance.SetParams(ctx, rep, &p); err != nil {
return errors.Wrap(err, "unable to set maintenance params")
}
return nil
}

View File

@@ -5,6 +5,7 @@
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/gc"
)
@@ -15,7 +16,9 @@
)
func runSnapshotGCCommand(ctx context.Context, rep *repo.DirectRepository) error {
st, err := gc.Run(ctx, rep, *snapshotGCMinContentAge, *snapshotGCDelete)
st, err := gc.Run(ctx, rep, maintenance.SnapshotGCParams{
MinContentAge: *snapshotGCMinContentAge,
}, *snapshotGCDelete)
log(ctx).Infof("GC found %v unused contents (%v bytes)", st.UnusedCount, units.BytesStringBase2(st.UnusedBytes))
log(ctx).Infof("GC found %v unused contents that are too recent to delete (%v bytes)", st.TooRecentCount, units.BytesStringBase2(st.TooRecentBytes))

1
go.mod
View File

@@ -15,6 +15,7 @@ require (
github.com/chmduquesne/rollinghash v4.0.0+incompatible
github.com/efarrer/iothrottler v0.0.1
github.com/fatih/color v1.9.0
github.com/gofrs/flock v0.7.1
github.com/golang/protobuf v1.3.5
github.com/google/fswalker v0.2.0
github.com/google/readahead v0.0.0-20161222183148-eaceba169032 // indirect

4
go.sum
View File

@@ -169,6 +169,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/godbus/dbus v4.1.0+incompatible h1:WqqLRTsQic3apZUK9qC5sGNfXthmPXzUZ7nQPrNITa4=
github.com/godbus/dbus v4.1.0+incompatible/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@@ -290,6 +292,8 @@ github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfE
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b h1:FQ7+9fxhyp82ks9vAuyPzG0/vVbWwMwLJ+P6yJI5FN8=
github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj4XXDkbjdmlbI505rUPBs6WBMYg2pXks=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@@ -14,6 +14,7 @@
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/splitter"
"github.com/kopia/kopia/snapshot/policy"
)
@@ -93,6 +94,15 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa
return nil, internalServerError(errors.Wrap(err, "set global policy"))
}
if dr, ok := s.rep.(*repo.DirectRepository); ok {
p := maintenance.DefaultParams()
p.Owner = s.rep.Username() + "@" + s.rep.Hostname()
if err := maintenance.SetParams(ctx, dr, &p); err != nil {
return nil, internalServerError(errors.Wrap(err, "unable to set maintenance params"))
}
}
if err := s.rep.Flush(ctx); err != nil {
return nil, internalServerError(errors.Wrap(err, "flush"))
}

109
repo/maintenance/blob_gc.go Normal file
View File

@@ -0,0 +1,109 @@
package maintenance
import (
"context"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/stats"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo/blob"
)
// defaultBlobGCMinAge is a default MinAge for blob GC.
// We're setting it to 2 hours to accommodate the fact that every repository
// will periodically flush its indexes more frequently than 1/hour.
const defaultBlobGCMinAge = 2 * time.Hour
// DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm.
type DeleteUnreferencedBlobsOptions struct {
Parallel int
Prefix blob.ID
MinAge time.Duration
DryRun bool
}
// DeleteUnreferencedBlobs deletes old blobs that are no longer referenced by index entries.
func DeleteUnreferencedBlobs(ctx context.Context, rep MaintainableRepository, opt DeleteUnreferencedBlobsOptions) (int, error) {
if opt.Parallel == 0 {
opt.Parallel = 16
}
if opt.MinAge == 0 {
opt.MinAge = defaultBlobGCMinAge
}
const deleteQueueSize = 100
var unreferenced, deleted stats.CountSum
var eg errgroup.Group
unused := make(chan blob.Metadata, deleteQueueSize)
if !opt.DryRun {
// start goroutines to delete blobs as they come.
for i := 0; i < opt.Parallel; i++ {
eg.Go(func() error {
for bm := range unused {
if err := rep.BlobStorage().DeleteBlob(ctx, bm.BlobID); err != nil {
return errors.Wrapf(err, "unable to delete blob %q", bm.BlobID)
}
cnt, del := deleted.Add(bm.Length)
if cnt%100 == 0 {
log(ctx).Infof(" deleted %v unreferenced blobs (%v)", cnt, units.BytesStringBase10(del))
}
}
return nil
})
}
}
// iterate unreferenced blobs and count them + optionally send to the channel to be deleted
log(ctx).Infof("Looking for unreferenced blobs...")
var prefixes []blob.ID
if p := opt.Prefix; p != "" {
prefixes = append(prefixes, p)
}
if err := rep.ContentManager().IterateUnreferencedBlobs(ctx, prefixes, opt.Parallel, func(bm blob.Metadata) error {
if age := rep.Time().Sub(bm.Timestamp); age < opt.MinAge {
log(ctx).Debugf(" preserving %v because it's too new (age: %v)", bm.BlobID, age)
return nil
}
unreferenced.Add(bm.Length)
if !opt.DryRun {
unused <- bm
}
return nil
}); err != nil {
return 0, errors.Wrap(err, "error looking for unreferenced blobs")
}
close(unused)
unreferencedCount, unreferencedSize := unreferenced.Approximate()
log(ctx).Debugf("Found %v blobs to delete (%v)", unreferencedCount, units.BytesStringBase10(unreferencedSize))
// wait for all delete workers to finish.
if err := eg.Wait(); err != nil {
return 0, err
}
if opt.DryRun {
return int(unreferencedCount), nil
}
del, cnt := deleted.Approximate()
log(ctx).Infof("Deleted total %v unreferenced blobs (%v)", del, units.BytesStringBase10(cnt))
return int(del), nil
}

View File

@@ -0,0 +1,208 @@
package maintenance
import (
"context"
"runtime"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
)
const defaultRewriteContentsMinAge = 2 * time.Hour
const parallelContentRewritesCPUMultiplier = 2
// RewriteContentsOptions provides options for RewriteContents
type RewriteContentsOptions struct {
Parallel int
MinAge time.Duration
ContentIDs []content.ID
ContentIDRange content.IDRange
PackPrefix blob.ID
ShortPacks bool
FormatVersion int
DryRun bool
}
const shortPackThresholdPercent = 60 // blocks below 60% of max block size are considered to be 'short
type contentInfoOrError struct {
content.Info
err error
}
// RewriteContents rewrites contents according to provided criteria and creates new
// blobs and index entries to point at the
func RewriteContents(ctx context.Context, rep MaintainableRepository, opt *RewriteContentsOptions) error {
if opt == nil {
return errors.Errorf("missing options")
}
if opt.MinAge == 0 {
opt.MinAge = defaultRewriteContentsMinAge
}
if opt.ShortPacks {
log(ctx).Infof("Rewriting contents from short packs...")
} else {
log(ctx).Infof("Rewriting contents...")
}
cnt := getContentToRewrite(ctx, rep, opt)
var (
mu sync.Mutex
totalBytes int64
failedCount int
)
if opt.Parallel == 0 {
opt.Parallel = runtime.NumCPU() * parallelContentRewritesCPUMultiplier
}
var wg sync.WaitGroup
for i := 0; i < opt.Parallel; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for c := range cnt {
if c.err != nil {
mu.Lock()
failedCount++
mu.Unlock()
return
}
var optDeleted string
if c.Deleted {
optDeleted = " (deleted)"
}
age := rep.Time().Sub(c.Timestamp())
if age < opt.MinAge {
log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.ID, c.Length, c.PackBlobID, optDeleted, age)
continue
}
log(ctx).Debugf("Rewriting content %v (%v bytes) from pack %v%v %v", c.ID, c.Length, c.PackBlobID, optDeleted, age)
mu.Lock()
totalBytes += int64(c.Length)
mu.Unlock()
if opt.DryRun {
continue
}
if err := rep.ContentManager().RewriteContent(ctx, c.ID); err != nil {
log(ctx).Infof("unable to rewrite content %q: %v", c.ID, err)
mu.Lock()
failedCount++
mu.Unlock()
}
}
}()
}
wg.Wait()
log(ctx).Debugf("Total bytes rewritten %v", units.BytesStringBase10(totalBytes))
if failedCount == 0 {
return nil
}
return errors.Errorf("failed to rewrite %v contents", failedCount)
}
func getContentToRewrite(ctx context.Context, rep MaintainableRepository, opt *RewriteContentsOptions) <-chan contentInfoOrError {
ch := make(chan contentInfoOrError)
go func() {
defer close(ch)
// get content IDs listed on command line
findContentInfos(ctx, rep, ch, opt.ContentIDs)
// add all content IDs from short packs
if opt.ShortPacks {
threshold := int64(rep.ContentManager().Format.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd
findContentInShortPacks(ctx, rep, ch, threshold, opt)
}
// add all blocks with given format version
if opt.FormatVersion != 0 {
findContentWithFormatVersion(ctx, rep, ch, opt)
}
}()
return ch
}
func findContentInfos(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, contentIDs []content.ID) {
for _, contentID := range contentIDs {
i, err := rep.ContentManager().ContentInfo(ctx, contentID)
if err != nil {
ch <- contentInfoOrError{err: errors.Wrapf(err, "unable to get info for content %q", contentID)}
} else {
ch <- contentInfoOrError{Info: i}
}
}
}
func findContentWithFormatVersion(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, opt *RewriteContentsOptions) {
_ = rep.ContentManager().IterateContents(
ctx,
content.IterateOptions{
Range: opt.ContentIDRange,
IncludeDeleted: true,
},
func(b content.Info) error {
if int(b.FormatVersion) == opt.FormatVersion && strings.HasPrefix(string(b.PackBlobID), string(opt.PackPrefix)) {
ch <- contentInfoOrError{Info: b}
}
return nil
})
}
func findContentInShortPacks(ctx context.Context, rep MaintainableRepository, ch chan contentInfoOrError, threshold int64, opt *RewriteContentsOptions) {
var prefixes []blob.ID
if opt.PackPrefix != "" {
prefixes = append(prefixes, opt.PackPrefix)
}
err := rep.ContentManager().IteratePacks(
ctx,
content.IteratePackOptions{
Prefixes: prefixes,
IncludePacksWithOnlyDeletedContent: true,
IncludeContentInfos: true,
},
func(pi content.PackInfo) error {
if pi.TotalSize >= threshold {
return nil
}
for _, ci := range pi.ContentInfos {
ch <- contentInfoOrError{Info: ci}
}
return nil
},
)
if err != nil {
ch <- contentInfoOrError{err: err}
return
}
}

View File

@@ -0,0 +1,35 @@
package maintenance
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/content"
)
const defaultDropDeletedContentsAge = 24 * time.Hour
// DropDeletedContentOptions specifies options for DropDeletedContent maintenance.
type DropDeletedContentOptions struct {
MinDeletedAge time.Duration `json:"minDeletedAge"`
}
// DropDeletedContents rewrites indexes while dropping deleted contents above certain age.
func DropDeletedContents(ctx context.Context, rep MaintainableRepository, opt *DropDeletedContentOptions) error {
if opt == nil {
return errors.Errorf("options must be set")
}
if opt.MinDeletedAge <= 0 {
opt.MinDeletedAge = defaultDropDeletedContentsAge
}
log(ctx).Infof("Dropping deleted contents older than %v", opt.MinDeletedAge)
return rep.ContentManager().CompactIndexes(ctx, content.CompactOptions{
AllIndexes: true,
SkipDeletedOlderThan: opt.MinDeletedAge,
})
}

View File

@@ -0,0 +1,118 @@
package maintenance
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/manifest"
)
var manifestLabels = map[string]string{
"type": "maintenance",
}
// Params is a JSON-serialized maintenance configuration stored in a repository.
type Params struct {
Owner string `json:"owner"`
QuickCycle CycleParams `json:"quick"`
FullCycle CycleParams `json:"full"`
DropDeletedContent DropDeletedContentOptions `json:"dropDeletedContent"`
SnapshotGC SnapshotGCParams `json:"snapshotGC"`
}
// SnapshotGCParams contains parameters for Snapshot Garbage Collection
// NOTE: Due to layering, the implementation of Snapshot GC is outside of repository package
// but for simplicity we store it here.
type SnapshotGCParams struct {
MinContentAge time.Duration `json:"minAge"`
}
// DefaultParams represents default values of maintenance parameters.
func DefaultParams() Params {
return Params{
FullCycle: CycleParams{
// TODO: enable this when ready for public consumption
// Enabled: true,
Interval: 7 * 24 * time.Hour,
},
QuickCycle: CycleParams{
// TODO: enable this when ready for public consumption
// Enabled: true,
Interval: 1 * time.Hour,
},
DropDeletedContent: DropDeletedContentOptions{
MinDeletedAge: 1 * time.Hour,
},
SnapshotGC: SnapshotGCParams{
MinContentAge: 24 * time.Hour, //nolint:gomnd
},
}
}
// CycleParams specifies parameters for a maintenance cycle (quick or full).
type CycleParams struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval"`
}
// GetParams returns repository-wide maintenance parameters.
func GetParams(ctx context.Context, rep MaintainableRepository) (*Params, error) {
md, err := manifestIDs(ctx, rep)
if err != nil {
return nil, err
}
if len(md) == 0 {
// not found, return empty params
p := DefaultParams()
return &p, nil
}
// arbitrality pick first pick ID to return in case there's more than one
// this is possible when two repository clients independently create manifests at approximately the same time
// so it should not really matter which one we pick.
// see https://github.com/kopia/kopia/issues/391
manifestID := md[0].ID
p := &Params{}
if _, err := rep.GetManifest(ctx, manifestID, p); err != nil {
return nil, errors.Wrap(err, "error loading manifest")
}
return p, nil
}
// SetParams sets the maintenance parameters.
func SetParams(ctx context.Context, rep MaintainableRepository, par *Params) error {
md, err := manifestIDs(ctx, rep)
if err != nil {
return err
}
if _, err := rep.PutManifest(ctx, manifestLabels, par); err != nil {
return errors.Wrap(err, "put manifest")
}
for _, m := range md {
if err := rep.DeleteManifest(ctx, m.ID); err != nil {
return errors.Wrap(err, "delete manifest")
}
}
return nil
}
func manifestIDs(ctx context.Context, rep MaintainableRepository) ([]*manifest.EntryMetadata, error) {
md, err := rep.FindManifests(ctx, manifestLabels)
if err != nil {
return nil, errors.Wrap(err, "error looking for maintenance manifest")
}
return md, err
}

View File

@@ -0,0 +1,247 @@
// Package maintenance manages automatic repository maintenance.
package maintenance
import (
"context"
"time"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/manifest"
)
var log = logging.GetContextLoggerFunc("maintenance")
// Mode describes the mode of maintenance to perfor
type Mode string
// MaintainableRepository is a subset of Repository required for maintenance tasks.
type MaintainableRepository interface {
Username() string
Hostname() string
Time() time.Time
ConfigFilename() string
BlobStorage() blob.Storage
ContentManager() *content.Manager
GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error)
PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error)
DeleteManifest(ctx context.Context, id manifest.ID) error
}
// Supported maintenance modes
const (
ModeNone Mode = "none"
ModeQuick Mode = "quick"
ModeFull Mode = "full"
ModeAuto Mode = "auto" // run either quick of full if required by schedule
)
// shouldRun returns Mode if repository is due for periodic maintenance.
func shouldRun(ctx context.Context, rep MaintainableRepository, p *Params) (Mode, error) {
if myUsername := rep.Username() + "@" + rep.Hostname(); p.Owner != myUsername {
log(ctx).Debugf("maintenance owned by another user '%v'", p.Owner)
return ModeNone, nil
}
s, err := GetSchedule(ctx, rep)
if err != nil {
return ModeNone, errors.Wrap(err, "error getting status")
}
// check full cycle first, as it does more than the quick cycle
if p.FullCycle.Enabled {
if rep.Time().After(s.NextFullMaintenanceTime) {
log(ctx).Debugf("due for full manintenance cycle")
return ModeFull, nil
}
log(ctx).Debugf("not due for full manintenance cycle until %v", s.NextFullMaintenanceTime)
} else {
log(ctx).Debugf("full manintenance cycle not enabled")
}
// no time for full cycle, check quick cycle
if p.QuickCycle.Enabled {
if rep.Time().After(s.NextQuickMaintenanceTime) {
log(ctx).Debugf("due for quick manintenance cycle")
return ModeQuick, nil
}
log(ctx).Debugf("not due for quick manintenance cycle until %v", s.NextQuickMaintenanceTime)
} else {
log(ctx).Debugf("quick manintenance cycle not enabled")
}
return ModeNone, nil
}
func updateSchedule(ctx context.Context, runParams RunParameters) error {
rep := runParams.rep
p := runParams.Params
s, err := GetSchedule(ctx, rep)
if err != nil {
return errors.Wrap(err, "error getting schedule")
}
switch runParams.Mode {
case ModeFull:
// on full cycle, also update the quick cycle
s.NextFullMaintenanceTime = rep.Time().Add(p.FullCycle.Interval)
s.NextQuickMaintenanceTime = s.NextFullMaintenanceTime.Add(p.QuickCycle.Interval)
log(ctx).Debugf("scheduling next full cycle at %v", s.NextFullMaintenanceTime)
log(ctx).Debugf("scheduling next quick cycle at %v", s.NextQuickMaintenanceTime)
return SetSchedule(ctx, rep, s)
case ModeQuick:
log(ctx).Debugf("scheduling next quick cycle at %v", s.NextQuickMaintenanceTime)
s.NextQuickMaintenanceTime = rep.Time().Add(p.QuickCycle.Interval)
return SetSchedule(ctx, rep, s)
default:
return nil
}
}
// RunParameters passes essential parameters for maintenance.
// It is generated by RunExclusive and can't be create outside of its package and
// is required to ensure all maintenance tasks run under an exclusive lock.
type RunParameters struct {
rep MaintainableRepository
Mode Mode
Params *Params
}
// RunExclusive runs the provided callback if the maintenance is owned by local user and
// lock can be acquired. Lock is passed to the function, which ensures that every call to Run()
// is within the exclusive context.
func RunExclusive(ctx context.Context, rep MaintainableRepository, mode Mode, cb func(runParams RunParameters) error) error {
p, err := GetParams(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get maintenance params")
}
if myUsername := rep.Username() + "@" + rep.Hostname(); p.Owner != myUsername {
log(ctx).Debugf("maintenance owned by another user '%v'", p.Owner)
return nil
}
if mode == ModeAuto {
mode, err = shouldRun(ctx, rep, p)
if err != nil {
return errors.Wrap(err, "unable to determine if maintenance is required")
}
}
if mode == ModeNone {
log(ctx).Debugf("not due for maintenance")
return nil
}
runParams := RunParameters{rep, mode, p}
// update schedule so that we don't run the maintenance again immediately if
// this process crashes.
if err = updateSchedule(ctx, runParams); err != nil {
return errors.Wrap(err, "error updating maintenance schedule")
}
lockFile := rep.ConfigFilename() + ".mlock"
log(ctx).Debugf("Acquiring maintenance lock in file %v", lockFile)
// acquire local lock on a config file
l := flock.New(lockFile)
ok, err := l.TryLock()
if err != nil {
return errors.Wrap(err, "error acquiring maintenance lock")
}
if !ok {
log(ctx).Debugf("maintenance is already in progress locally")
return nil
}
defer l.Unlock() //nolint:errcheck
log(ctx).Infof("Running %v maintenance...", runParams.Mode)
defer log(ctx).Infof("Finished %v maintenance.", runParams.Mode)
return cb(runParams)
}
// Run performs maintenance activities for a repository.
func Run(ctx context.Context, runParams RunParameters) error {
switch runParams.Mode {
case ModeQuick:
return runQuickMaintenance(ctx, runParams)
case ModeFull:
return runFullMaintenance(ctx, runParams)
default:
return errors.Errorf("unknown mode %q", runParams.Mode)
}
}
func runQuickMaintenance(ctx context.Context, runParams RunParameters) error {
// rewrite indexes by dropping content entries that have been marked
// as deleted for a long time
if err := DropDeletedContents(ctx, runParams.rep, &runParams.Params.DropDeletedContent); err != nil {
return errors.Wrap(err, "error dropping deleted contents")
}
// find 'q' packs that are less than 80% full and rewrite contents in them into
// new consolidated packs, orphaning old packs in the process.
if err := RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: content.AllPrefixedIDs,
PackPrefix: content.PackBlobIDPrefixSpecial,
ShortPacks: true,
}); err != nil {
return errors.Wrap(err, "error rewriting metadata contents")
}
// delete orphaned 'q' packs after some time.
if _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{
Prefix: content.PackBlobIDPrefixSpecial,
}); err != nil {
return errors.Wrap(err, "error deleting unreferenced metadata blobs")
}
return nil
}
func runFullMaintenance(ctx context.Context, runParams RunParameters) error {
// rewrite indexes by dropping content entries that have been marked
// as deleted for a long time
if err := DropDeletedContents(ctx, runParams.rep, &runParams.Params.DropDeletedContent); err != nil {
return errors.Wrap(err, "error dropping deleted contents")
}
// find packs that are less than 80% full and rewrite contents in them into
// new consolidated packs, orphaning old packs in the process.
if err := RewriteContents(ctx, runParams.rep, &RewriteContentsOptions{
ContentIDRange: content.AllIDs,
ShortPacks: true,
}); err != nil {
return errors.Wrap(err, "error rewriting contents in short packs")
}
// delete orphaned packs after some time.
if _, err := DeleteUnreferencedBlobs(ctx, runParams.rep, DeleteUnreferencedBlobsOptions{}); err != nil {
return errors.Wrap(err, "error deleting unreferenced blobs")
}
return nil
}

View File

@@ -0,0 +1,49 @@
package maintenance
import (
"context"
"encoding/json"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
)
const maintenanceScheduleBlobID = "kopia.schedule"
// Schedule keeps track of scheduled maintenance times.
type Schedule struct {
NextFullMaintenanceTime time.Time `json:"nextFullMaintenance"`
NextQuickMaintenanceTime time.Time `json:"nextQuickMaintenance"`
}
// GetSchedule gets the scheduled maintenance times.
func GetSchedule(ctx context.Context, rep MaintainableRepository) (*Schedule, error) {
v, err := rep.BlobStorage().GetBlob(ctx, maintenanceScheduleBlobID, 0, -1)
if err == blob.ErrBlobNotFound {
return &Schedule{}, nil
}
if err != nil {
return nil, errors.Wrap(err, "error reading schedule blob")
}
s := &Schedule{}
if err := json.Unmarshal(v, s); err != nil {
return nil, errors.Wrap(err, "malformed schedule blob")
}
return s, nil
}
// SetSchedule updates scheduled maintenance times.
func SetSchedule(ctx context.Context, rep MaintainableRepository, s *Schedule) error {
v, err := json.Marshal(s)
if err != nil {
return errors.Wrap(err, "unable to serialize JSON")
}
return rep.BlobStorage().PutBlob(ctx, maintenanceScheduleBlobID, gather.FromSlice(v))
}

View File

@@ -57,6 +57,21 @@ func (r *DirectRepository) Hostname() string { return r.hostname }
// Username returns the username that's connect to the repository.
func (r *DirectRepository) Username() string { return r.username }
// BlobStorage returns the blob storage.
func (r *DirectRepository) BlobStorage() blob.Storage {
return r.Blobs
}
// ContentManager returns the content manager.
func (r *DirectRepository) ContentManager() *content.Manager {
return r.Content
}
// ConfigFilename returns the name of the configuration file
func (r *DirectRepository) ConfigFilename() string {
return r.ConfigFile
}
// OpenObject opens the reader for a given object, returns object.ErrNotFound
func (r *DirectRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
return r.Objects.Open(ctx, id)

View File

@@ -4,7 +4,6 @@
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
@@ -14,13 +13,14 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
var log = logging.GetContextLoggerFunc("kopia/snapshot/gc")
var log = logging.GetContextLoggerFunc("snapshotgc")
func oidOf(entry fs.Entry) object.ID {
return entry.(object.HasObjectID).ObjectID()
@@ -75,7 +75,7 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma
// Run performs garbage collection on all the snapshots in the repository.
// nolint:gocognit
func Run(ctx context.Context, rep *repo.DirectRepository, minContentAge time.Duration, gcDelete bool) (Stats, error) {
func Run(ctx context.Context, rep *repo.DirectRepository, params maintenance.SnapshotGCParams, gcDelete bool) (Stats, error) {
var used sync.Map
var st Stats
@@ -99,7 +99,7 @@ func Run(ctx context.Context, rep *repo.DirectRepository, minContentAge time.Dur
return nil
}
if rep.Time().Sub(ci.Timestamp()) < minContentAge {
if rep.Time().Sub(ci.Timestamp()) < params.MinContentAge {
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp())
tooRecent.Add(int64(ci.Length))
return nil

View File

@@ -32,7 +32,7 @@
const copyBufferSize = 128 * 1024
var log = logging.GetContextLoggerFunc("kopia/upload")
var log = logging.GetContextLoggerFunc("snapshotfs")
var errCanceled = errors.New("canceled")

View File

@@ -252,21 +252,26 @@ func TestSnapshotDeleteTypeCheck(t *testing.T) {
e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir)
lines := e.RunAndExpectSuccess(t, "manifest", "ls")
if len(lines) != 1 {
t.Fatalf("Expected 1 line global policy output for manifest ls")
if len(lines) != 2 {
t.Fatalf("Expected 2 line global policy + maintenance config output for manifest ls")
}
line := lines[0]
fields := strings.Fields(line)
manifestID := fields[0]
typeField := fields[5]
for _, line := range lines {
fields := strings.Fields(line)
manifestID := fields[0]
typeField := fields[5]
typeVal := strings.TrimPrefix(typeField, "type:")
if typeVal != "policy" {
t.Fatalf("Expected global policy manifest on a fresh repo")
typeVal := strings.TrimPrefix(typeField, "type:")
if typeVal == "maintenance" {
continue
}
if typeVal != "policy" {
t.Fatalf("Expected global policy manifest on a fresh repo")
}
e.RunAndExpectFailure(t, "snapshot", "delete", manifestID, "--unsafe-ignore-source")
}
e.RunAndExpectFailure(t, "snapshot", "delete", manifestID, "--unsafe-ignore-source")
}
func TestSnapshotDeleteRestore(t *testing.T) {