Files
kopia/cli/command_snapshot_fix.go
Prasad Ghangal 3bf947d746 feat(repository): Metadata compression config support for directory and indirect content (#4080)
* Configure compressor for k and x prefixed content

Adds metadata compression setting to policy
Add support to configure compressor for k and x prefixed content
Set zstd-fastest as the default compressor for metadata in the policy
Adds support to set and show metadata compression to kopia policy commands
Adds metadata compression config to dir writer

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Pass concatenate options with ConcatenateOptions struct

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Move content compression handling to caller

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Move handling manifests to manifest pkg

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Correct const in server_test

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Remove unnecessary whitespace

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

* Disable metadata compression for < V2 format

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>

---------

Signed-off-by: Prasad Ghangal <prasad.ganghal@veeam.com>
2024-10-23 23:28:23 -07:00

206 lines
5.7 KiB
Go

package cli
import (
"context"
"github.com/alecthomas/kingpin/v2"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
type commandSnapshotFix struct {
invalidFiles commandSnapshotFixInvalidFiles
removeFiles commandSnapshotFixRemoveFiles
}
func (c *commandSnapshotFix) setup(svc appServices, parent commandParent) {
cmd := parent.Command("fix", "Commands to fix snapshot consistency issues")
c.invalidFiles.setup(svc, cmd)
c.removeFiles.setup(svc, cmd)
}
type commonRewriteSnapshots struct {
manifestIDs []string
sources []string
commit bool
parallel int
invalidDirHandling string
}
const (
invalidEntryKeep = "keep" // keep unreadable file/directory
invalidEntryStub = "stub" // replaces unreadable file/directory with a stub file
invalidEntryFail = "fail" // fail the command
invalidEntryRemove = "remove" // removes unreadable file/directory
)
func (c *commonRewriteSnapshots) setup(svc appServices, cmd *kingpin.CmdClause) {
_ = svc
cmd.Flag("manifest-id", "Manifest IDs").StringsVar(&c.manifestIDs)
cmd.Flag("source", "Source to target (username@hostname:/path)").StringsVar(&c.sources)
cmd.Flag("commit", "Update snapshot manifests").BoolVar(&c.commit)
cmd.Flag("parallel", "Parallelism").IntVar(&c.parallel)
cmd.Flag("invalid-directory-handling", "Handling of invalid directories").Default(invalidEntryStub).EnumVar(&c.invalidDirHandling, invalidEntryFail, invalidEntryStub, invalidEntryKeep)
}
func failedEntryCallback(rep repo.RepositoryWriter, enumVal string) snapshotfs.RewriteFailedEntryCallback {
switch enumVal {
default:
return snapshotfs.RewriteFail
case invalidEntryStub:
return snapshotfs.RewriteAsStub(rep)
case invalidEntryRemove:
return snapshotfs.RewriteRemove
case invalidEntryKeep:
return snapshotfs.RewriteKeep
}
}
func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, rep repo.RepositoryWriter, rewrite snapshotfs.RewriteDirEntryCallback) error {
rw, err := snapshotfs.NewDirRewriter(ctx, rep, snapshotfs.DirRewriterOptions{
Parallel: c.parallel,
RewriteEntry: rewrite,
OnDirectoryReadFailure: failedEntryCallback(rep, c.invalidDirHandling),
})
if err != nil {
return errors.Wrap(err, "unable to create dir rewriter")
}
defer rw.Close(ctx)
var updatedSnapshots int
manifestIDs, err := c.listManifestIDs(ctx, rep)
if err != nil {
return err
}
manifests, err := snapshot.LoadSnapshots(ctx, rep, manifestIDs)
if err != nil {
return errors.Wrap(err, "error loading snapshots")
}
for _, mg := range snapshot.GroupBySource(manifests) {
log(ctx).Infof("Processing snapshot %v", mg[0].Source)
policyTree, err := policy.TreeForSource(ctx, rep, mg[0].Source)
if err != nil {
return errors.Wrap(err, "unable to get policy tree")
}
metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()
for _, man := range snapshot.SortByTime(mg, false) {
log(ctx).Debugf(" %v (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID)
old := man.Clone()
changed, err := rw.RewriteSnapshotManifest(ctx, man, metadataComp)
if err != nil {
return errors.Wrap(err, "error rewriting manifest")
}
if !changed {
log(ctx).Infof(" %v unchanged (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID)
continue
}
if c.commit {
if err := snapshot.UpdateSnapshot(ctx, rep, man); err != nil {
return errors.Wrap(err, "error updating snapshot")
}
}
log(ctx).Infof(" %v replaced manifest from %v to %v", formatTimestamp(man.StartTime.ToTime()), old.ID, man.ID)
log(ctx).Infof(" diff %v %v", old.RootEntry.ObjectID, man.RootEntry.ObjectID)
if d := snapshotSizeDelta(old, man); d != "" {
log(ctx).Infof(" delta:%v", d)
}
updatedSnapshots++
}
}
if updatedSnapshots > 0 {
if !c.commit {
log(ctx).Infof("Fixed %v snapshots, but snapshot manifests were not updated. Pass --commit to update snapshots.", updatedSnapshots)
} else {
log(ctx).Infof("Fixed and committed %v snapshots.", updatedSnapshots)
}
}
if updatedSnapshots == 0 {
log(ctx).Info("No changes.")
}
return nil
}
func snapshotSizeDelta(m1, m2 *snapshot.Manifest) string {
if m1.RootEntry == nil || m2.RootEntry == nil {
return ""
}
if m1.RootEntry.DirSummary == nil || m2.RootEntry.DirSummary == nil {
return ""
}
deltaBytes := m2.RootEntry.DirSummary.TotalFileSize - m1.RootEntry.DirSummary.TotalFileSize
if deltaBytes < 0 {
return "-" + units.BytesString(-deltaBytes)
}
if deltaBytes > 0 {
return "+" + units.BytesString(deltaBytes)
}
return ""
}
func (c *commonRewriteSnapshots) listManifestIDs(ctx context.Context, rep repo.Repository) ([]manifest.ID, error) {
manifests := toManifestIDs(c.manifestIDs)
for _, src := range c.sources {
log(ctx).Infof("Listing snapshots for source %q...", src)
si, err := snapshot.ParseSourceInfo(src, rep.ClientOptions().Hostname, rep.ClientOptions().Username)
if err != nil {
return nil, errors.Wrap(err, "unable to parse source")
}
m, err := snapshot.ListSnapshotManifests(ctx, rep, &si, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to list manifests")
}
if len(m) == 0 {
return nil, errors.Errorf("no snapshots for %v", src)
}
manifests = append(manifests, m...)
}
if len(manifests) == 0 {
log(ctx).Info("Listing all snapshots...")
m, err := snapshot.ListSnapshotManifests(ctx, rep, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to list snapshot manifests")
}
manifests = append(manifests, m...)
}
return manifests, nil
}