kopia/cli/command_index_recover.go

package cli

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	"github.com/kopia/kopia/internal/timetrack"
	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/content"
	"github.com/kopia/kopia/repo/content/indexblob"
)
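
// commandIndexRecover holds the flag values and app services for the 'recover'
// subcommand defined in setup below.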
type commandIndexRecover struct {
	blobIDs       []string
	blobPrefixes  []string
	commit        bool
	ignoreErrors  bool
	parallel      int
	deleteIndexes bool

	svc appServices
}
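
// setup registers the 'recover' subcommand and its flags on the parent command.
// The command is a dry run by default; recovered index entries are only persisted
// when --commit is passed. (A typical invocation, assuming the parent is kopia's
// 'index' command group, would be `kopia index recover --commit`.)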
func (c *commandIndexRecover) setup(svc appServices, parent commandParent) {
	cmd := parent.Command("recover", "Recover indexes from pack blobs")
	cmd.Flag("blob-prefixes", "Prefixes of pack blobs to recover from (default=all packs)").StringsVar(&c.blobPrefixes)
	cmd.Flag("blobs", "Names of pack blobs to recover from (default=all packs)").StringsVar(&c.blobIDs)
	cmd.Flag("parallel", "Recover parallelism").Default("1").IntVar(&c.parallel)
	cmd.Flag("ignore-errors", "Ignore errors when recovering").BoolVar(&c.ignoreErrors)
	cmd.Flag("delete-indexes", "Delete all indexes before recovering").BoolVar(&c.deleteIndexes)
	cmd.Flag("commit", "Commit recovered content").BoolVar(&c.commit)
	cmd.Action(svc.directRepositoryWriteAction(c.run))

	c.svc = svc
}
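
// run recovers index entries either from the pack blobs named via --blobs or, when
// none are given, from all pack blobs matching the configured prefixes, and prints
// a summary when it finishes.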
func (c *commandIndexRecover) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
	c.svc.dangerousCommand()

	var (
		processedBlobCount    atomic.Int32
		recoveredContentCount atomic.Int32
	)
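
	// On exit, summarize what was (or would be) recovered; without --commit nothing has been written.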
	defer func() {
		if recoveredContentCount.Load() == 0 {
			log(ctx).Info("No contents recovered.")
			return
		}

		if !c.commit {
			log(ctx).Infof("Found %v contents to recover from %v blobs, but not committed. Re-run with --commit", recoveredContentCount.Load(), processedBlobCount.Load())
		} else {
			log(ctx).Infof("Recovered %v contents from %v blobs.", recoveredContentCount.Load(), processedBlobCount.Load())
		}
	}()
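
	// With --delete-indexes, remove (or, without --commit, just list) existing index blobs before recovering.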
	if c.deleteIndexes {
		if err := rep.BlobReader().ListBlobs(ctx, indexblob.V0IndexBlobPrefix, func(bm blob.Metadata) error {
			if c.commit {
				log(ctx).Infof("deleting old index blob: %v", bm.BlobID)
				return errors.Wrap(rep.BlobStorage().DeleteBlob(ctx, bm.BlobID), "error deleting index blob")
			}

			log(ctx).Infof("would delete old index: %v (pass --commit to approve)", bm.BlobID)

			return nil
		}); err != nil {
			return errors.Wrap(err, "error deleting old indexes")
		}
	}
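
	// Without an explicit --blobs list, scan all pack blobs matching the requested
	// prefixes (or every known pack prefix when none are given).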
	if len(c.blobIDs) == 0 {
		var prefixes []blob.ID

		if len(c.blobPrefixes) > 0 {
			for _, p := range c.blobPrefixes {
				prefixes = append(prefixes, blob.ID(p))
			}
		} else {
			prefixes = content.PackBlobIDPrefixes
		}

		return c.recoverIndexesFromAllPacks(ctx, rep, prefixes, &processedBlobCount, &recoveredContentCount)
	}

	for _, packFile := range c.blobIDs {
		if err := c.recoverIndexFromSinglePackFile(ctx, rep, blob.ID(packFile), 0, &processedBlobCount, &recoveredContentCount); err != nil && !c.ignoreErrors {
			return errors.Wrapf(err, "error recovering index from %v", packFile)
		}
	}

	return nil
}
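
// recoverIndexesFromAllPacks lists pack blobs matching the given prefixes and
// recovers index entries from each of them using c.parallel worker goroutines.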
func (c *commandIndexRecover) recoverIndexesFromAllPacks(ctx context.Context, rep repo.DirectRepositoryWriter, prefixes []blob.ID, processedBlobCount, recoveredContentCount *atomic.Int32) error {
	var (
		discoveredBlobCount  atomic.Int32
		discoveringBlobCount atomic.Int32

		tt timetrack.Throttle
	)

	// recover indexes from all pack blobs in parallel.
	// this is actually quite fast since we typically need to read only 8KB from each blob.
	eg, ctx := errgroup.WithContext(ctx)
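
	// A separate goroutine (deliberately outside the errgroup) counts all matching blobs
	// up front so that progress and ETA can be reported; listing errors are ignored here
	// because the count is informational only.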
	go func() {
		for _, prefix := range prefixes {
			//nolint:errcheck
			rep.BlobStorage().ListBlobs(ctx, prefix, func(_ blob.Metadata) error {
				discoveringBlobCount.Add(1)
				return nil
			})
		}

		discoveredBlobCount.Store(discoveringBlobCount.Load())
	}()
	est := timetrack.Start()

	blobCh := make(chan blob.Metadata)

	// goroutine to populate blob metadata into a channel.
	eg.Go(func() error {
		defer close(blobCh)

		for _, prefix := range prefixes {
			if err := rep.BlobStorage().ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
				blobCh <- bm
				return nil
			}); err != nil {
				return errors.Wrapf(err, "error listing blobs with prefix %q", prefix)
			}
		}

		return nil
	})
	// N goroutines to recover from incoming blobs.
	for worker := range c.parallel {
		eg.Go(func() error {
			cnt := 0

			for bm := range blobCh {
				finishedBlobs := processedBlobCount.Load()

				log(ctx).Debugf("worker %v got %v", worker, cnt)
				cnt++
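
				// Throttle progress output to roughly once per second; once blob discovery
				// has finished, completion percentage and ETA can be estimated as well.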
				if tt.ShouldOutput(time.Second) {
					if disc := discoveredBlobCount.Load(); disc > 0 {
						e, ok := est.Estimate(float64(finishedBlobs), float64(disc))
						if ok {
							log(ctx).Infof("Recovered %v index entries from %v/%v blobs (%.1f %%) %v remaining %v ETA",
								recoveredContentCount.Load(),
								finishedBlobs,
								disc,
								e.PercentComplete,
								e.Remaining,
								formatTimestamp(e.EstimatedEndTime))
						}
					} else {
						log(ctx).Infof("Recovered %v index entries from %v blobs, estimating time remaining... (found %v blobs)",
							recoveredContentCount.Load(),
							finishedBlobs,
							discoveringBlobCount.Load())
					}
				}

				if err := c.recoverIndexFromSinglePackFile(ctx, rep, bm.BlobID, bm.Length, processedBlobCount, recoveredContentCount); err != nil {
					return errors.Wrapf(err, "error recovering from %v", bm.BlobID)
				}
			}

			return nil
		})
	}

	return errors.Wrap(eg.Wait(), "recovering indexes")
}
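
// recoverIndexFromSinglePackFile recovers index entries from a single pack blob and
// updates the shared counters; with --ignore-errors, failures are swallowed and
// recovery continues with the next blob.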
func (c *commandIndexRecover) recoverIndexFromSinglePackFile(ctx context.Context, rep repo.DirectRepositoryWriter, blobID blob.ID, length int64, processedBlobCount, recoveredContentCount *atomic.Int32) error {
	log(ctx).Debugf("recovering from %v", blobID)

	recovered, err := rep.ContentManager().RecoverIndexFromPackBlob(ctx, blobID, length, c.commit)
	if err != nil {
		if c.ignoreErrors {
			return nil
		}

		return errors.Wrapf(err, "unable to recover index from %v", blobID)
	}

	recoveredContentCount.Add(int32(len(recovered))) //nolint:gosec
	processedBlobCount.Add(1)

	log(ctx).Debugf("Recovered %v entries from %v (commit=%v)", len(recovered), blobID, c.commit)

	return nil
}