mirror of
https://github.com/kopia/kopia.git
synced 2026-01-22 13:27:57 -05:00
Move general functionality from the `content verify` CLI command implementation to helpers in the content package. The primary motivation is to allow reusing the content verification functionality during maintenance. A separate followup change also extends content verification to include additional stats useful for debugging repository corruptions. Overview of the changes: - Relocation of the content verification functionality to the content package. The entry point is content.WriteManager.VerifyContents. This is primarily code movement with no functional changes. - Addition of unit tests for the content verification functionality by exercising content.WriteManager.VerifyContents. - Minor functional change: changing the logging level from Error to Warn for the "inner loop" error messages. This allows filtering out these messages if needed, while still observing the error message that is logged for the overall operation.
127 lines
3.4 KiB
Go
127 lines
3.4 KiB
Go
package cli
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/kopia/kopia/internal/timetrack"
|
|
"github.com/kopia/kopia/repo"
|
|
"github.com/kopia/kopia/repo/content"
|
|
)
|
|
|
|
type commandContentVerify struct {
|
|
contentVerifyParallel int
|
|
contentVerifyFull bool
|
|
contentVerifyIncludeDeleted bool
|
|
contentVerifyPercent float64
|
|
progressInterval time.Duration
|
|
|
|
contentRange contentRangeFlags
|
|
}
|
|
|
|
func (c *commandContentVerify) setup(svc appServices, parent commandParent) {
|
|
cmd := parent.Command("verify", "Verify that each content is backed by a valid blob")
|
|
|
|
cmd.Flag("parallel", "Parallelism").Default("16").IntVar(&c.contentVerifyParallel)
|
|
cmd.Flag("full", "Full verification (including download)").BoolVar(&c.contentVerifyFull)
|
|
cmd.Flag("include-deleted", "Include deleted contents").BoolVar(&c.contentVerifyIncludeDeleted)
|
|
cmd.Flag("download-percent", "Download a percentage of files [0.0 .. 100.0]").Float64Var(&c.contentVerifyPercent)
|
|
cmd.Flag("progress-interval", "Progress output interval").Default("3s").DurationVar(&c.progressInterval)
|
|
c.contentRange.setup(cmd)
|
|
cmd.Action(svc.directRepositoryReadAction(c.run))
|
|
}
|
|
|
|
func (c *commandContentVerify) run(ctx context.Context, rep repo.DirectRepository) error {
|
|
var (
|
|
totalCount atomic.Int32
|
|
|
|
wg sync.WaitGroup
|
|
)
|
|
|
|
subctx, cancel := context.WithCancel(ctx)
|
|
|
|
// ensure we cancel estimation goroutine and wait for it before returning
|
|
defer func() {
|
|
cancel()
|
|
wg.Wait()
|
|
}()
|
|
|
|
// start a goroutine that will populate totalCount
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
c.getTotalContentCount(subctx, rep, &totalCount)
|
|
}()
|
|
|
|
rep.DisableIndexRefresh()
|
|
|
|
var throttle timetrack.Throttle
|
|
|
|
est := timetrack.Start()
|
|
|
|
if c.contentVerifyFull {
|
|
c.contentVerifyPercent = 100.0
|
|
}
|
|
|
|
opts := content.VerifyOptions{
|
|
ContentIDRange: c.contentRange.contentIDRange(),
|
|
ContentReadPercentage: c.contentVerifyPercent,
|
|
IncludeDeletedContents: c.contentVerifyIncludeDeleted,
|
|
ContentIterateParallelism: c.contentVerifyParallel,
|
|
ProgressCallbackInterval: 1,
|
|
|
|
ProgressCallback: func(vps content.VerifyProgressStats) {
|
|
if !throttle.ShouldOutput(c.progressInterval) {
|
|
return
|
|
}
|
|
|
|
verifiedCount := vps.SuccessCount + vps.ErrorCount
|
|
timings, ok := est.Estimate(float64(verifiedCount), float64(totalCount.Load()))
|
|
if ok {
|
|
log(ctx).Infof(" Verified %v of %v contents (%.1f%%), %v errors, remaining %v, ETA %v",
|
|
verifiedCount,
|
|
totalCount.Load(),
|
|
timings.PercentComplete,
|
|
vps.ErrorCount,
|
|
timings.Remaining,
|
|
formatTimestamp(timings.EstimatedEndTime),
|
|
)
|
|
} else {
|
|
log(ctx).Infof(" Verified %v contents, %v errors, estimating...", verifiedCount, vps.ErrorCount)
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := rep.ContentReader().VerifyContents(ctx, opts); err != nil {
|
|
return errors.Wrap(err, "verify contents")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep repo.DirectRepository, totalCount *atomic.Int32) {
|
|
var tc int32
|
|
|
|
if err := rep.ContentReader().IterateContents(ctx, content.IterateOptions{
|
|
Range: c.contentRange.contentIDRange(),
|
|
IncludeDeleted: c.contentVerifyIncludeDeleted,
|
|
}, func(_ content.Info) error {
|
|
if err := ctx.Err(); err != nil {
|
|
return errors.Wrap(err, "context error")
|
|
}
|
|
|
|
tc++
|
|
return nil
|
|
}); err != nil {
|
|
log(ctx).Debugf("error estimating content count: %v", err)
|
|
return
|
|
}
|
|
|
|
totalCount.Store(tc)
|
|
}
|