Files
kopia/cli/command_object_verify.go
2018-11-03 11:44:19 -07:00

267 lines
7.3 KiB
Go

package cli
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
"time"
"github.com/kopia/kopia/internal/parallelwork"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/object"
)
// Command-line definition of the "verify" command and its flags.
var (
// verifyCommand is the "verify" sub-command registered on app.
verifyCommand = app.Command("verify", "Verify the contents of stored object")
// verifyCommandErrorThreshold is the --max-errors flag; the default of 0 makes
// tooManyErrors always return false, i.e. no limit is enforced.
verifyCommandErrorThreshold = verifyCommand.Flag("max-errors", "Maximum number of errors before stopping").Default("0").Int()
// verifyCommandDirObjectIDs lists object IDs that are enqueued as directories.
verifyCommandDirObjectIDs = verifyCommand.Flag("directory-id", "Directory object IDs to verify").Strings()
// verifyCommandFileObjectIDs lists object IDs that are enqueued as plain objects.
verifyCommandFileObjectIDs = verifyCommand.Flag("file-id", "File object IDs to verify").Strings()
// verifyCommandAllSources selects all snapshot manifests instead of those of specific sources.
verifyCommandAllSources = verifyCommand.Flag("all-sources", "Verify all snapshots").Bool()
// verifyCommandSources lists source strings whose snapshot manifests are verified.
verifyCommandSources = verifyCommand.Flag("sources", "Verify the provided sources").Strings()
// verifyCommandParallel is the value passed to the work queue's Process call.
verifyCommandParallel = verifyCommand.Flag("parallel", "Parallelization").Default("16").Int()
// verifyCommandFilesPercent is compared against rand.Intn(100) for each object to
// decide whether the object is additionally read in full.
verifyCommandFilesPercent = verifyCommand.Flag("verify-files-percent", "Randomly verify a percentage of files").Default("0").Int()
)
// verifier holds the state shared by all work items of a single verify run.
type verifier struct {
// rep is the repository handed to snapshotfs.DirectoryEntry when listing directories.
rep *repo.Repository
// om is the object manager used for VerifyObject and Open.
om *object.Manager
// workQueue receives the directory and object verification callbacks.
workQueue *parallelwork.Queue
// startTime is the reference point for the elapsed time and ETA in progress output.
startTime time.Time
// mu is held by the methods that read or modify seen and errors.
mu sync.Mutex
// seen records object IDs that have already been enqueued, so each is processed once.
seen map[object.ID]bool
// errors accumulates every error passed to reportError.
errors []error
}
// progressCallback writes a one-line progress summary to stderr.
//
// Once more than a second has elapsed and both counters are positive, the
// total run time is extrapolated from the completed/enqueued ratio and, if the
// predicted end lies in the future, the remaining time and ETA are appended.
func (v *verifier) progressCallback(enqueued, active, completed int64) {
	sinceStart := time.Since(v.startTime)
	etaSuffix := ""

	canEstimate := sinceStart > time.Second && enqueued > 0 && completed > 0
	if canEstimate {
		fractionDone := float64(completed) / float64(enqueued)
		totalSeconds := sinceStart.Seconds() / fractionDone
		finishAt := v.startTime.Add(time.Duration(totalSeconds) * time.Second)

		if remaining := time.Until(finishAt); remaining > 0 {
			etaSuffix = fmt.Sprintf(
				" remaining %v (ETA %v)",
				remaining.Truncate(time.Second),
				formatTimestamp(finishAt.Truncate(time.Second)),
			)
		}
	}

	printStderr("Found %v objects, verifying %v, completed %v objects%v.\n", enqueued, active, completed, etaSuffix)
}
// tooManyErrors reports whether the number of recorded errors has reached the
// --max-errors threshold. A threshold of 0 disables the limit.
func (v *verifier) tooManyErrors() bool {
	v.mu.Lock()
	defer v.mu.Unlock()

	limit := *verifyCommandErrorThreshold

	return limit != 0 && len(v.errors) >= limit
}
// reportError logs err as a warning for the given path and records it in
// v.errors.
//
// It returns true when the number of recorded errors has reached the
// --max-errors threshold. A threshold of 0 means "no limit" (matching
// tooManyErrors), so in that case the result is always false rather than
// trivially true.
func (v *verifier) reportError(path string, err error) bool {
	v.mu.Lock()
	defer v.mu.Unlock()

	log.Warningf("failed on %v: %v", path, err)
	v.errors = append(v.errors, err)

	limit := *verifyCommandErrorThreshold
	if limit == 0 {
		// 0 disables the limit; without this guard len(v.errors) >= 0 is always true.
		return false
	}

	return len(v.errors) >= limit
}
// shouldEnqueue marks oid as seen and reports whether this was the first time
// it was encountered, so that each object is enqueued at most once.
func (v *verifier) shouldEnqueue(oid object.ID) bool {
	v.mu.Lock()
	defer v.mu.Unlock()

	alreadySeen := v.seen[oid]
	if !alreadySeen {
		v.seen[oid] = true
	}

	return !alreadySeen
}
// enqueueVerifyDirectory schedules verification of the directory object oid,
// unless that object ID has been enqueued before.
func (v *verifier) enqueueVerifyDirectory(ctx context.Context, oid object.ID, path string) {
	if v.shouldEnqueue(oid) {
		// Directories go to the front of the queue so that all of them are
		// discovered quickly, which makes the ETA reliable.
		verifyDir := func() {
			v.doVerifyDirectory(ctx, oid, path)
		}
		v.workQueue.EnqueueFront(verifyDir)
	}
}
// enqueueVerifyObject schedules verification of the non-directory object oid,
// unless that object ID has been enqueued before. expectedLength is forwarded
// to doVerifyObject, where a negative value skips the length comparison.
func (v *verifier) enqueueVerifyObject(ctx context.Context, oid object.ID, path string, expectedLength int64) {
	if v.shouldEnqueue(oid) {
		// Non-directories go to the back of the queue so they are processed last.
		verifyObj := func() {
			v.doVerifyObject(ctx, oid, path, expectedLength)
		}
		v.workQueue.EnqueueBack(verifyObj)
	}
}
// doVerifyDirectory lists the directory object oid and enqueues each child:
// sub-directories via enqueueVerifyDirectory, everything else via
// enqueueVerifyObject with the entry's recorded size as expected length.
//
// A failure to read the directory is reported and ends processing of this
// directory. Enqueueing stops early once tooManyErrors reports true.
func (v *verifier) doVerifyDirectory(ctx context.Context, oid object.ID, path string) {
	log.Debugf("verifying directory %q (%v)", path, oid)

	d := snapshotfs.DirectoryEntry(v.rep, oid, nil)

	entries, err := d.Readdir(ctx)
	if err != nil {
		v.reportError(path, fmt.Errorf("error reading %v: %v", oid, err))
		return
	}

	for _, e := range entries {
		if v.tooManyErrors() {
			break
		}

		childPath := path + "/" + e.Name()

		// Use the comma-ok form: an entry that does not expose an object ID is
		// reported as a verification error instead of panicking the whole run.
		withID, ok := e.(object.HasObjectID)
		if !ok {
			v.reportError(childPath, fmt.Errorf("entry in directory %v does not have an object ID", oid))
			continue
		}

		objectID := withID.ObjectID()

		if e.IsDir() {
			v.enqueueVerifyDirectory(ctx, objectID, childPath)
		} else {
			v.enqueueVerifyObject(ctx, objectID, childPath, e.Size())
		}
	}
}
// doVerifyObject verifies a single object through the object manager.
//
// If expectedLength is non-negative, the length returned by VerifyObject must
// match it. In addition, with a probability of --verify-files-percent out of
// 100, the object is read in full.
//
// When VerifyObject itself fails, the error is reported once and the function
// returns: the returned length is not meaningful in that case, so comparing it
// (or attempting a full read) would only record extra errors for the same
// object and distort the --max-errors count.
func (v *verifier) doVerifyObject(ctx context.Context, oid object.ID, path string, expectedLength int64) {
	if expectedLength < 0 {
		log.Debugf("verifying object %v", oid)
	} else {
		log.Debugf("verifying object %v (%v) with length %v", path, oid, expectedLength)
	}

	length, _, err := v.om.VerifyObject(ctx, oid)
	if err != nil {
		v.reportError(path, fmt.Errorf("error verifying %v: %v", oid, err))
		return
	}

	if expectedLength >= 0 && length != expectedLength {
		v.reportError(path, fmt.Errorf("invalid object length %q, %v, expected %v", oid, length, expectedLength))
	}

	// Randomly sample objects for a full read.
	if rand.Intn(100) < *verifyCommandFilesPercent {
		if readErr := v.readEntireObject(ctx, oid, path); readErr != nil {
			v.reportError(path, fmt.Errorf("error reading object %v: %v", oid, readErr))
		}
	}
}
// readEntireObject opens the object oid and reads it to the end, discarding
// the data. It returns the error from opening or from copying, if any.
func (v *verifier) readEntireObject(ctx context.Context, oid object.ID, path string) error {
	log.Debugf("reading object %v %v", oid, path)

	// Derive a context via block.UsingBlockCache with the flag set to false.
	ctx = block.UsingBlockCache(ctx, false)

	reader, openErr := v.om.Open(ctx, oid)
	if openErr != nil {
		return openErr
	}
	defer reader.Close() //nolint:errcheck

	// Stream the whole object into the discard sink.
	if _, copyErr := io.Copy(ioutil.Discard, reader); copyErr != nil {
		return copyErr
	}

	return nil
}
// runVerifyCommand is the action of the "verify" command: it builds a
// verifier, enqueues the requested roots, processes the work queue with the
// --parallel value and returns an error summarizing how many problems were
// recorded, or nil if there were none.
func runVerifyCommand(ctx context.Context, rep *repo.Repository) error {
	v := &verifier{
		rep:       rep,
		om:        rep.Objects,
		startTime: time.Now(),
		workQueue: parallelwork.NewQueue(),
		seen:      make(map[object.ID]bool),
	}

	err := enqueueRootsToVerify(ctx, v, rep)
	if err != nil {
		return err
	}

	v.workQueue.ProgressCallback = v.progressCallback
	v.workQueue.Process(*verifyCommandParallel)

	if errorCount := len(v.errors); errorCount != 0 {
		return fmt.Errorf("encountered %v errors", errorCount)
	}

	return nil
}
// enqueueRootsToVerify seeds the verifier's queue with every root selected on
// the command line: the root entries of the selected snapshot manifests,
// followed by the explicit --directory-id and --file-id object IDs.
//
// It returns the first error from loading manifests or parsing an object ID.
func enqueueRootsToVerify(ctx context.Context, v *verifier, rep *repo.Repository) error {
	snapshots, err := loadSourceManifests(ctx, rep, *verifyCommandAllSources, *verifyCommandSources)
	if err != nil {
		return err
	}

	for _, m := range snapshots {
		label := fmt.Sprintf("%v@%v", m.Source, formatTimestamp(m.StartTime))

		switch {
		case m.RootEntry == nil:
			// Manifest has no root entry; nothing to enqueue.
		case m.RootEntry.Type == snapshot.EntryTypeDirectory:
			v.enqueueVerifyDirectory(ctx, m.RootObjectID(), label)
		default:
			// Expected length is unknown for a root object, hence -1.
			v.enqueueVerifyObject(ctx, m.RootObjectID(), label, -1)
		}
	}

	for _, dirIDStr := range *verifyCommandDirObjectIDs {
		dirID, parseErr := parseObjectID(ctx, rep, dirIDStr)
		if parseErr != nil {
			return parseErr
		}

		v.enqueueVerifyDirectory(ctx, dirID, dirIDStr)
	}

	for _, fileIDStr := range *verifyCommandFileObjectIDs {
		fileID, parseErr := parseObjectID(ctx, rep, fileIDStr)
		if parseErr != nil {
			return parseErr
		}

		v.enqueueVerifyObject(ctx, fileID, fileIDStr, -1)
	}

	return nil
}
// loadSourceManifests loads the snapshot manifests selected by the arguments.
//
// When all is true, the manifests of every source are listed
// (ListSnapshotManifests is called with a nil source). Otherwise each string in
// sources is parsed with snapshot.ParseSourceInfo, using the current host and
// user name, and the manifests of that source are listed. The collected
// manifest IDs are then loaded with snapshot.LoadSnapshots.
//
// The selection is driven by the all and sources parameters rather than by the
// package-level flags, so the function honors whatever its caller passes in.
//
// It returns an error if a source string cannot be parsed or if listing or
// loading manifests fails.
func loadSourceManifests(ctx context.Context, rep *repo.Repository, all bool, sources []string) ([]*snapshot.Manifest, error) {
	var manifestIDs []string

	if all {
		man, err := snapshot.ListSnapshotManifests(ctx, rep, nil)
		if err != nil {
			return nil, err
		}

		manifestIDs = append(manifestIDs, man...)
	} else {
		for _, srcStr := range sources {
			src, err := snapshot.ParseSourceInfo(srcStr, getHostName(), getUserName())
			if err != nil {
				return nil, fmt.Errorf("error parsing %q: %v", srcStr, err)
			}

			man, err := snapshot.ListSnapshotManifests(ctx, rep, &src)
			if err != nil {
				return nil, err
			}

			manifestIDs = append(manifestIDs, man...)
		}
	}

	return snapshot.LoadSnapshots(ctx, rep, manifestIDs)
}
func init() {
verifyCommand.Action(repositoryAction(runVerifyCommand))
}