Files
kopia/cli/command_snapshot_migrate.go
Jarek Kowalski 1f1682b2cc Snapshot checkpointing (#410)
* snapshot: support for periodic checkpointing of snapshots in progress

For each snapshot that takes longer than 45 minutes, we trigger
internal cancellation, save the manifest and restart the snapshot
at which point all files will be cached.

This helps ensure the property that no file or directory objects
in the repository remain unreachable from a snapshot root for more than
one hour, which is important from GC perspective.

* nit: unified spelling 'cancelled' => 'canceled'
2020-04-07 17:54:21 -07:00

307 lines
8.1 KiB
Go

package cli
import (
"context"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)
var (
migrateCommand = snapshotCommands.Command("migrate", "Migrate snapshots from another repository")
migrateSourceConfig = migrateCommand.Flag("source-config", "Configuration file for the source repository").Required().ExistingFile()
migrateSources = migrateCommand.Flag("sources", "List of sources to migrate").Strings()
migrateAll = migrateCommand.Flag("all", "Migrate all sources").Bool()
migratePolicies = migrateCommand.Flag("policies", "Migrate policies too").Default("true").Bool()
migrateOverwritePolicies = migrateCommand.Flag("overwrite-policies", "Overwrite policies").Bool()
migrateLatestOnly = migrateCommand.Flag("latest-only", "Only migrate the latest snapshot").Bool()
migrateIgnoreErrors = migrateCommand.Flag("ignore-errors", "Ignore errors when reading source snapshot").Bool()
migrateParallel = migrateCommand.Flag("parallel", "Number of sources to migrate in parallel").Default("1").Int()
)
func runMigrateCommand(ctx context.Context, destRepo repo.Repository) error {
sourceRepo, err := openSourceRepo(ctx)
if err != nil {
return errors.Wrap(err, "can't open source repository")
}
sources, err := getSourcesToMigrate(ctx, sourceRepo)
if err != nil {
return errors.Wrap(err, "can't retrieve sources")
}
semaphore := make(chan struct{}, *migrateParallel)
var (
wg sync.WaitGroup
mu sync.Mutex
canceled bool
activeUploaders = map[snapshot.SourceInfo]*snapshotfs.Uploader{}
)
progress.StartShared()
onCtrlC(func() {
mu.Lock()
defer mu.Unlock()
if !canceled {
canceled = true
for s, u := range activeUploaders {
log(ctx).Warningf("canceling active uploader for %v", s)
u.Cancel()
}
}
})
if *migratePolicies {
if *migrateAll {
err = migrateAllPolicies(ctx, sourceRepo, destRepo)
} else {
err = migratePoliciesForSources(ctx, sourceRepo, destRepo, sources)
}
if err != nil {
return errors.Wrap(err, "unable to migrate policies")
}
}
for _, s := range sources {
// start a new uploader unless already canceled
mu.Lock()
if canceled {
mu.Unlock()
break
}
uploader := snapshotfs.NewUploader(destRepo)
uploader.Progress = progress
uploader.IgnoreReadErrors = *migrateIgnoreErrors
activeUploaders[s] = uploader
mu.Unlock()
wg.Add(1)
semaphore <- struct{}{}
go func(s snapshot.SourceInfo) {
defer func() {
mu.Lock()
delete(activeUploaders, s)
mu.Unlock()
<-semaphore
wg.Done()
}()
if err := migrateSingleSource(ctx, uploader, sourceRepo, destRepo, s); err != nil {
log(ctx).Warningf("unable to migrate source: %v", err)
}
}(s)
}
wg.Wait()
progress.FinishShared()
printStderr("\r\nMigration finished.\n")
return nil
}
func openSourceRepo(ctx context.Context) (repo.Repository, error) {
pass, ok := repo.GetPersistedPassword(ctx, *migrateSourceConfig)
if !ok {
var err error
if pass, err = getPasswordFromFlags(ctx, false, false); err != nil {
return nil, errors.Wrap(err, "source repository password")
}
}
sourceRepo, err := repo.Open(ctx, *migrateSourceConfig, pass, applyOptionsFromFlags(ctx, nil))
if err != nil {
return nil, errors.Wrap(err, "can't open source repository")
}
return sourceRepo, nil
}
func migratePoliciesForSources(ctx context.Context, sourceRepo, destRepo repo.Repository, sources []snapshot.SourceInfo) error {
for _, si := range sources {
if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, si); err != nil {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
}
return nil
}
func migrateAllPolicies(ctx context.Context, sourceRepo, destRepo repo.Repository) error {
policies, err := policy.ListPolicies(ctx, sourceRepo)
if err != nil {
return errors.Wrap(err, "unable to list source policies")
}
for _, pol := range policies {
if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, pol.Target()); err != nil {
log(ctx).Warningf("unable to migrate policy for %v: %v", pol.Target(), err)
}
}
return nil
}
func migrateSinglePolicy(ctx context.Context, sourceRepo, destRepo repo.Repository, si snapshot.SourceInfo) error {
pol, err := policy.GetDefinedPolicy(ctx, sourceRepo, si)
if err == policy.ErrPolicyNotFound {
return nil
}
if err != nil {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
_, err = policy.GetDefinedPolicy(ctx, destRepo, si)
if err == nil {
if !*migrateOverwritePolicies {
printStderr("\rpolicy already set for %v\n", si)
// already have destination policy
return nil
}
} else if err != policy.ErrPolicyNotFound {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
printStderr("\rmigrating policy for %v\n", si)
return policy.SetPolicy(ctx, destRepo, si, pol)
}
func findPreviousSnapshotManifestWithStartTime(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, startTime time.Time) (*snapshot.Manifest, error) {
previous, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, errors.Wrap(err, "error listing previous snapshots")
}
for _, p := range previous {
if p.StartTime.Equal(startTime) {
return p, nil
}
}
return nil, nil
}
func migrateSingleSource(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo, destRepo repo.Repository, s snapshot.SourceInfo) error {
manifests, err := snapshot.ListSnapshotManifests(ctx, sourceRepo, &s)
if err != nil {
return err
}
snapshots, err := snapshot.LoadSnapshots(ctx, sourceRepo, manifests)
if err != nil {
return errors.Wrapf(err, "unable to load snapshot manifests for %v", s)
}
sort.Slice(snapshots, func(i, j int) bool {
return snapshots[i].StartTime.Before(snapshots[j].StartTime)
})
for _, m := range filterSnapshotsToMigrate(snapshots) {
if uploader.IsCanceled() {
break
}
if err := migrateSingleSourceSnapshot(ctx, uploader, sourceRepo, destRepo, s, m); err != nil {
return err
}
}
return nil
}
func migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo, destRepo repo.Repository, s snapshot.SourceInfo, m *snapshot.Manifest) error {
if m.IncompleteReason != "" {
log(ctx).Debugf("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime))
return nil
}
sourceEntry := snapshotfs.DirectoryEntry(sourceRepo, m.RootObjectID(), nil)
existing, err := findPreviousSnapshotManifestWithStartTime(ctx, destRepo, m.Source, m.StartTime)
if err != nil {
return err
}
if existing != nil {
printStderr("\ralready migrated %v at %v\n", s, formatTimestamp(m.StartTime))
return nil
}
printStderr("\rmigrating snapshot of %v at %v\n", s, formatTimestamp(m.StartTime))
previous, err := findPreviousSnapshotManifest(ctx, destRepo, m.Source, &m.StartTime)
if err != nil {
return err
}
var policyTree *policy.Tree
newm, err := uploader.Upload(ctx, sourceEntry, policyTree, m.Source, previous...)
if err != nil {
return errors.Wrapf(err, "error migrating shapshot %v @ %v", m.Source, m.StartTime)
}
newm.StartTime = m.StartTime
newm.EndTime = m.EndTime
newm.Description = m.Description
if newm.IncompleteReason == "" {
if _, err := snapshot.SaveSnapshot(ctx, destRepo, newm); err != nil {
return errors.Wrap(err, "cannot save manifest")
}
}
return nil
}
func filterSnapshotsToMigrate(s []*snapshot.Manifest) []*snapshot.Manifest {
if *migrateLatestOnly && len(s) > 0 {
s = s[0:1]
}
return s
}
func getSourcesToMigrate(ctx context.Context, rep repo.Repository) ([]snapshot.SourceInfo, error) {
if len(*migrateSources) > 0 {
var result []snapshot.SourceInfo
for _, s := range *migrateSources {
si, err := snapshot.ParseSourceInfo(s, rep.Hostname(), rep.Username())
if err != nil {
return nil, err
}
result = append(result, si)
}
return result, nil
}
if *migrateAll {
return snapshot.ListSources(ctx, rep)
}
return nil, errors.New("must specify either --all or --sources")
}
func init() {
migrateCommand.Action(repositoryAction(runMigrateCommand))
}