cli: 'snapshot migrate' improvements to help with data migration

- cleaned up migration progress output
- fixed migration idempotency
- added migration of policies
- renamed --parallelism to --parallel
- improved e2e test
- do not prompt for password to source repository if persisted
This commit is contained in:
Jarek Kowalski
2020-03-06 19:21:43 -08:00
parent d526843124
commit cd35e3bab5
3 changed files with 138 additions and 23 deletions

View File

@@ -35,6 +35,9 @@ type cliProgress struct {
previousFileCount int
previousTotalSize int64
// indicates shared instance that does not reset counters at the beginning of upload.
shared bool
}
func (p *cliProgress) FinishedHashingFile(fname string, totalSize int64) {
@@ -135,7 +138,25 @@ func (p *cliProgress) spinnerCharacter() string {
return s
}
func (p *cliProgress) StartShared() {
*p = cliProgress{
uploading: 1,
uploadStartTime: time.Now(),
shared: true,
}
}
func (p *cliProgress) FinishShared() {
atomic.StoreInt32(&p.uploadFinished, 1)
p.output()
}
func (p *cliProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {
if p.shared {
// do nothing
return
}
*p = cliProgress{
uploading: 1,
uploadStartTime: time.Now(),
@@ -150,6 +171,10 @@ func (p *cliProgress) UploadFinished() {
}
func (p *cliProgress) Finish() {
if p.shared {
return
}
atomic.StoreInt32(&p.uploadFinished, 1)
p.output()
}

View File

@@ -15,22 +15,19 @@
)
var (
migrateCommand = snapshotCommands.Command("migrate", "Migrate snapshots from another repository")
migrateSourceConfig = migrateCommand.Flag("source-config", "Configuration file for the source repository").Required().ExistingFile()
migrateSources = migrateCommand.Flag("sources", "List of sources to migrate").Strings()
migrateAll = migrateCommand.Flag("all", "Migrate all sources").Bool()
migrateLatestOnly = migrateCommand.Flag("latest-only", "Only migrate the latest snapshot").Bool()
migrateIgnoreErrors = migrateCommand.Flag("ignore-errors", "Ignore errors when reading source backup").Bool()
migrateParallelism = migrateCommand.Flag("parallelism", "Number of sources to migrate in parallel").Default("1").Int()
migrateCommand = snapshotCommands.Command("migrate", "Migrate snapshots from another repository")
migrateSourceConfig = migrateCommand.Flag("source-config", "Configuration file for the source repository").Required().ExistingFile()
migrateSources = migrateCommand.Flag("sources", "List of sources to migrate").Strings()
migrateAll = migrateCommand.Flag("all", "Migrate all sources").Bool()
migratePolicies = migrateCommand.Flag("policies", "Migrate policies too").Default("true").Bool()
migrateOverwritePolicies = migrateCommand.Flag("overwrite-policies", "Overwrite policies").Bool()
migrateLatestOnly = migrateCommand.Flag("latest-only", "Only migrate the latest snapshot").Bool()
migrateIgnoreErrors = migrateCommand.Flag("ignore-errors", "Ignore errors when reading source snapshot").Bool()
migrateParallel = migrateCommand.Flag("parallel", "Number of sources to migrate in parallel").Default("1").Int()
)
func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
pass, err := getPasswordFromFlags(ctx, false, false)
if err != nil {
return errors.Wrap(err, "source repository password")
}
sourceRepo, err := repo.Open(ctx, *migrateSourceConfig, pass, applyOptionsFromFlags(ctx, nil))
sourceRepo, err := openSourceRepo(ctx)
if err != nil {
return errors.Wrap(err, "can't open source repository")
}
@@ -40,7 +37,7 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
return errors.Wrap(err, "can't retrieve sources")
}
semaphore := make(chan struct{}, *migrateParallelism)
semaphore := make(chan struct{}, *migrateParallel)
var (
wg sync.WaitGroup
@@ -49,6 +46,8 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
activeUploaders = map[snapshot.SourceInfo]*snapshotfs.Uploader{}
)
progress.StartShared()
onCtrlC(func() {
mu.Lock()
defer mu.Unlock()
@@ -62,6 +61,18 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
}
})
if *migratePolicies {
if *migrateAll {
err = migrateAllPolicies(ctx, sourceRepo, destRepo)
} else {
err = migratePoliciesForSources(ctx, sourceRepo, destRepo, sources)
}
if err != nil {
return errors.Wrap(err, "unable to migrate policies")
}
}
for _, s := range sources {
// start a new uploader unless already canceled
mu.Lock()
@@ -96,18 +107,89 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error {
}
wg.Wait()
progress.FinishShared()
printStderr("\r\nMigration finished.\n")
return nil
}
func openSourceRepo(ctx context.Context) (*repo.Repository, error) {
pass, ok := repo.GetPersistedPassword(ctx, *migrateSourceConfig)
if !ok {
var err error
if pass, err = getPasswordFromFlags(ctx, false, false); err != nil {
return nil, errors.Wrap(err, "source repository password")
}
}
sourceRepo, err := repo.Open(ctx, *migrateSourceConfig, pass, applyOptionsFromFlags(ctx, nil))
if err != nil {
return nil, errors.Wrap(err, "can't open source repository")
}
return sourceRepo, nil
}
func migratePoliciesForSources(ctx context.Context, sourceRepo, destRepo *repo.Repository, sources []snapshot.SourceInfo) error {
for _, si := range sources {
if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, si); err != nil {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
}
return nil
}
func migrateAllPolicies(ctx context.Context, sourceRepo, destRepo *repo.Repository) error {
policies, err := policy.ListPolicies(ctx, sourceRepo)
if err != nil {
return errors.Wrap(err, "unable to list source policies")
}
for _, pol := range policies {
if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, pol.Target()); err != nil {
log(ctx).Warningf("unable to migrate policy for %v: %v", pol.Target(), err)
}
}
return nil
}
func migrateSinglePolicy(ctx context.Context, sourceRepo, destRepo *repo.Repository, si snapshot.SourceInfo) error {
pol, err := policy.GetDefinedPolicy(ctx, sourceRepo, si)
if err == policy.ErrPolicyNotFound {
return nil
}
if err != nil {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
_, err = policy.GetDefinedPolicy(ctx, destRepo, si)
if err == nil {
if !*migrateOverwritePolicies {
printStderr("\rpolicy already set for %v\n", si)
// already have destination policy
return nil
}
} else if err != policy.ErrPolicyNotFound {
return errors.Wrapf(err, "unable to migrate policy for %v", si)
}
printStderr("\rmigrating policy for %v\n", si)
return policy.SetPolicy(ctx, destRepo, si, pol)
}
func findPreviousSnapshotManifestWithStartTime(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo, startTime time.Time) (*snapshot.Manifest, error) {
previous, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, errors.Wrap(err, "error listing previous backups")
return nil, errors.Wrap(err, "error listing previous snapshots")
}
for _, p := range previous {
if p.StartTime == startTime {
if p.StartTime.Equal(startTime) {
return p, nil
}
}
@@ -145,7 +227,7 @@ func migrateSingleSource(ctx context.Context, uploader *snapshotfs.Uploader, sou
func migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo, destRepo *repo.Repository, s snapshot.SourceInfo, m *snapshot.Manifest) error {
if m.IncompleteReason != "" {
log(ctx).Infof("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime))
log(ctx).Debugf("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime))
return nil
}
@@ -157,11 +239,11 @@ func migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploa
}
if existing != nil {
log(ctx).Infof("already migrated %v at %v", s, formatTimestamp(m.StartTime))
printStderr("\ralready migrated %v at %v\n", s, formatTimestamp(m.StartTime))
return nil
}
log(ctx).Infof("migrating snapshot of %v at %v", s, formatTimestamp(m.StartTime))
printStderr("\rmigrating snapshot of %v at %v\n", s, formatTimestamp(m.StartTime))
previous, err := findPreviousSnapshotManifest(ctx, destRepo, m.Source, &m.StartTime)
if err != nil {

View File

@@ -19,20 +19,28 @@ func TestSnapshotMigrate(t *testing.T) {
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1)
e.RunAndExpectSuccess(t, "policy", "set", sharedTestDataDir1, "--keep-daily=77")
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir2)
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir2)
e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir3)
e.RunAndExpectSuccess(t, "policy", "set", sharedTestDataDir3, "--keep-daily=88")
sourceSnapshotCount := len(e.RunAndExpectSuccess(t, "snapshot", "list", ".", "-a"))
sourcePolicyCount := len(e.RunAndExpectSuccess(t, "policy", "list"))
dstenv := testenv.NewCLITest(t)
defer dstenv.Cleanup(t)
dstenv.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", dstenv.RepoDir)
dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all")
// migrate again, which should be a no-op.
dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all")
sourceSnapshotCount := len(e.RunAndExpectSuccess(t, "snapshot", "list", ".", "-a"))
dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all", "--parallel=5")
dstenv.RunAndVerifyOutputLineCount(t, sourceSnapshotCount, "snapshot", "list", ".", "-a")
dstenv.RunAndVerifyOutputLineCount(t, sourcePolicyCount, "policy", "list")
// migrate again, which should be a no-op, and should not create any more policies/snapshots
dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all")
dstenv.RunAndVerifyOutputLineCount(t, sourceSnapshotCount, "snapshot", "list", ".", "-a")
dstenv.RunAndVerifyOutputLineCount(t, sourcePolicyCount, "policy", "list")
}