package cli

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	"github.com/kopia/kopia/internal/timetrack"
	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/blob"
	"github.com/kopia/kopia/repo/content"
	"github.com/kopia/kopia/repo/content/indexblob"
)

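// commandIndexRecover holds the flags and state for the "recover" subcommand,
// which rebuilds index entries by reading them back from pack blobs.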
type commandIndexRecover struct {
	blobIDs       []string
	blobPrefixes  []string
	commit        bool
	ignoreErrors  bool
	parallel      int
	deleteIndexes bool

	svc appServices
}

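// setup registers the "recover" subcommand and its flags under the given parent command.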
func (c *commandIndexRecover) setup(svc appServices, parent commandParent) {
	cmd := parent.Command("recover", "Recover indexes from pack blobs")
	cmd.Flag("blob-prefixes", "Prefixes of pack blobs to recover from (default=all packs)").StringsVar(&c.blobPrefixes)
	cmd.Flag("blobs", "Names of pack blobs to recover from (default=all packs)").StringsVar(&c.blobIDs)
	cmd.Flag("parallel", "Recover parallelism").Default("1").IntVar(&c.parallel)
	cmd.Flag("ignore-errors", "Ignore errors when recovering").BoolVar(&c.ignoreErrors)
	cmd.Flag("delete-indexes", "Delete all indexes before recovering").BoolVar(&c.deleteIndexes)
	cmd.Flag("commit", "Commit recovered content").BoolVar(&c.commit)
	cmd.Action(svc.directRepositoryWriteAction(c.run))

	c.svc = svc
}

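// run performs the actual recovery. Without --commit it is a dry run that only
// reports what would be recovered (and, with --delete-indexes, what would be
// deleted); with --commit it writes the recovered index entries.
//
// Illustrative invocations (assuming the parent is the top-level "index"
// command, which is registered outside this file):
//
//	kopia index recover                              # dry run
//	kopia index recover --delete-indexes --commit    # rebuild indexes from all packs
//	kopia index recover --parallel=8 --ignore-errors --commit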
func (c *commandIndexRecover) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
	c.svc.dangerousCommand()

	var (
		processedBlobCount    atomic.Int32
		recoveredContentCount atomic.Int32
	)

	defer func() {
		if recoveredContentCount.Load() == 0 {
			log(ctx).Info("No contents recovered.")
			return
		}

		if !c.commit {
			log(ctx).Infof("Found %v contents to recover from %v blobs, but not committed. Re-run with --commit", recoveredContentCount.Load(), processedBlobCount.Load())
		} else {
			log(ctx).Infof("Recovered %v contents from %v blobs.", recoveredContentCount.Load(), processedBlobCount.Load())
		}
	}()

	// when --delete-indexes is set, delete existing index blobs before recovering;
	// without --commit this only reports what would be deleted.
	if c.deleteIndexes {
		if err := rep.BlobReader().ListBlobs(ctx, indexblob.V0IndexBlobPrefix, func(bm blob.Metadata) error {
			if c.commit {
				log(ctx).Infof("deleting old index blob: %v", bm.BlobID)
				return errors.Wrap(rep.BlobStorage().DeleteBlob(ctx, bm.BlobID), "error deleting index blob")
			}

			log(ctx).Infof("would delete old index: %v (pass --commit to approve)", bm.BlobID)
			return nil
		}); err != nil {
			return errors.Wrap(err, "error deleting old indexes")
		}
	}

	// no explicit blob IDs provided - recover from all pack blobs matching the
	// requested prefixes (or all known pack prefixes by default).
	if len(c.blobIDs) == 0 {
		var prefixes []blob.ID

		if len(c.blobPrefixes) > 0 {
			for _, p := range c.blobPrefixes {
				prefixes = append(prefixes, blob.ID(p))
			}
		} else {
			prefixes = content.PackBlobIDPrefixes
		}

		return c.recoverIndexesFromAllPacks(ctx, rep, prefixes, &processedBlobCount, &recoveredContentCount)
	}

	for _, packFile := range c.blobIDs {
		if err := c.recoverIndexFromSinglePackFile(ctx, rep, blob.ID(packFile), 0, &processedBlobCount, &recoveredContentCount); err != nil && !c.ignoreErrors {
			return errors.Wrapf(err, "error recovering index from %v", packFile)
		}
	}

	return nil
}

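// recoverIndexesFromAllPacks lists pack blobs matching the given prefixes and recovers
// index entries from them using c.parallel workers, logging progress (and an ETA once
// the total blob count is known) roughly once per second.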
func (c *commandIndexRecover) recoverIndexesFromAllPacks(ctx context.Context, rep repo.DirectRepositoryWriter, prefixes []blob.ID, processedBlobCount, recoveredContentCount *atomic.Int32) error {
	var (
		discoveredBlobCount  atomic.Int32
		discoveringBlobCount atomic.Int32
		tt                   timetrack.Throttle
	)

	// recover indexes from all pack blobs in parallel.
	// this is actually quite fast since we typically need to read only 8KB from each blob.
	eg, ctx := errgroup.WithContext(ctx)

	// count all matching blobs in the background so that progress reporting can compute
	// an ETA; until this finishes, only the running count in discoveringBlobCount is available.
	go func() {
		for _, prefix := range prefixes {
			//nolint:errcheck
			rep.BlobStorage().ListBlobs(ctx, prefix, func(_ blob.Metadata) error {
				discoveringBlobCount.Add(1)
				return nil
			})
		}

		discoveredBlobCount.Store(discoveringBlobCount.Load())
	}()

	est := timetrack.Start()

	blobCh := make(chan blob.Metadata)

	// goroutine to populate blob metadata into a channel.
	eg.Go(func() error {
		defer close(blobCh)

		for _, prefix := range prefixes {
			if err := rep.BlobStorage().ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
				blobCh <- bm
				return nil
			}); err != nil {
				return errors.Wrapf(err, "error listing blobs with prefix %q", prefix)
			}
		}

		return nil
	})

	// N goroutines to recover from incoming blobs.
	for worker := range c.parallel {
		eg.Go(func() error {
			cnt := 0

			for bm := range blobCh {
				finishedBlobs := processedBlobCount.Load()

				log(ctx).Debugf("worker %v got %v", worker, cnt)

				cnt++

				if tt.ShouldOutput(time.Second) {
					if disc := discoveredBlobCount.Load(); disc > 0 {
						e, ok := est.Estimate(float64(finishedBlobs), float64(disc))
						if ok {
							log(ctx).Infof("Recovered %v index entries from %v/%v blobs (%.1f %%) %v remaining %v ETA",
								recoveredContentCount.Load(),
								finishedBlobs,
								disc,
								e.PercentComplete,
								e.Remaining,
								formatTimestamp(e.EstimatedEndTime))
						}
					} else {
						log(ctx).Infof("Recovered %v index entries from %v blobs, estimating time remaining... (found %v blobs)",
							recoveredContentCount.Load(),
							finishedBlobs,
							discoveringBlobCount.Load())
					}
				}

				if err := c.recoverIndexFromSinglePackFile(ctx, rep, bm.BlobID, bm.Length, processedBlobCount, recoveredContentCount); err != nil {
					return errors.Wrapf(err, "error recovering from %v", bm.BlobID)
				}
			}

			return nil
		})
	}

	return errors.Wrap(eg.Wait(), "recovering indexes")
}

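// recoverIndexFromSinglePackFile recovers index entries from a single pack blob and
// updates the shared counters; with --ignore-errors, a failed blob is skipped.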
func (c *commandIndexRecover) recoverIndexFromSinglePackFile(ctx context.Context, rep repo.DirectRepositoryWriter, blobID blob.ID, length int64, processedBlobCount, recoveredContentCount *atomic.Int32) error {
	log(ctx).Debugf("recovering from %v", blobID)

	recovered, err := rep.ContentManager().RecoverIndexFromPackBlob(ctx, blobID, length, c.commit)
	if err != nil {
		if c.ignoreErrors {
			return nil
		}

		return errors.Wrapf(err, "unable to recover index from %v", blobID)
	}

	recoveredContentCount.Add(int32(len(recovered))) //nolint:gosec
	processedBlobCount.Add(1)
	log(ctx).Debugf("Recovered %v entries from %v (commit=%v)", len(recovered), blobID, c.commit)

	return nil
}