From cd35e3bab5ded86b986c6be23146fd99fa9fd9ac Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 6 Mar 2020 19:21:43 -0800 Subject: [PATCH] cli: 'snapshot migrate' improvements to help with data migration - cleaned up migration progress output - fixed migration idempotency - added migration of policies - renamed --parallelism to --parallel - improved e2e test - do not prompt for password to source repository if persisted --- cli/cli_progress.go | 25 ++++ cli/command_snapshot_migrate.go | 120 +++++++++++++++--- .../end_to_end_test/snapshot_migrate_test.go | 16 ++- 3 files changed, 138 insertions(+), 23 deletions(-) diff --git a/cli/cli_progress.go b/cli/cli_progress.go index aa51c2073..1d2bfc449 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -35,6 +35,9 @@ type cliProgress struct { previousFileCount int previousTotalSize int64 + + // indicates shared instance that does not reset counters at the beginning of upload. + shared bool } func (p *cliProgress) FinishedHashingFile(fname string, totalSize int64) { @@ -135,7 +138,25 @@ func (p *cliProgress) spinnerCharacter() string { return s } +func (p *cliProgress) StartShared() { + *p = cliProgress{ + uploading: 1, + uploadStartTime: time.Now(), + shared: true, + } +} + +func (p *cliProgress) FinishShared() { + atomic.StoreInt32(&p.uploadFinished, 1) + p.output() +} + func (p *cliProgress) UploadStarted(previousFileCount int, previousTotalSize int64) { + if p.shared { + // do nothing + return + } + *p = cliProgress{ uploading: 1, uploadStartTime: time.Now(), @@ -150,6 +171,10 @@ func (p *cliProgress) UploadFinished() { } func (p *cliProgress) Finish() { + if p.shared { + return + } + atomic.StoreInt32(&p.uploadFinished, 1) p.output() } diff --git a/cli/command_snapshot_migrate.go b/cli/command_snapshot_migrate.go index a38d6bb77..fa8ad8c25 100644 --- a/cli/command_snapshot_migrate.go +++ b/cli/command_snapshot_migrate.go @@ -15,22 +15,19 @@ ) var ( - migrateCommand = snapshotCommands.Command("migrate", "Migrate snapshots from another repository") - migrateSourceConfig = migrateCommand.Flag("source-config", "Configuration file for the source repository").Required().ExistingFile() - migrateSources = migrateCommand.Flag("sources", "List of sources to migrate").Strings() - migrateAll = migrateCommand.Flag("all", "Migrate all sources").Bool() - migrateLatestOnly = migrateCommand.Flag("latest-only", "Only migrate the latest snapshot").Bool() - migrateIgnoreErrors = migrateCommand.Flag("ignore-errors", "Ignore errors when reading source backup").Bool() - migrateParallelism = migrateCommand.Flag("parallelism", "Number of sources to migrate in parallel").Default("1").Int() + migrateCommand = snapshotCommands.Command("migrate", "Migrate snapshots from another repository") + migrateSourceConfig = migrateCommand.Flag("source-config", "Configuration file for the source repository").Required().ExistingFile() + migrateSources = migrateCommand.Flag("sources", "List of sources to migrate").Strings() + migrateAll = migrateCommand.Flag("all", "Migrate all sources").Bool() + migratePolicies = migrateCommand.Flag("policies", "Migrate policies too").Default("true").Bool() + migrateOverwritePolicies = migrateCommand.Flag("overwrite-policies", "Overwrite policies").Bool() + migrateLatestOnly = migrateCommand.Flag("latest-only", "Only migrate the latest snapshot").Bool() + migrateIgnoreErrors = migrateCommand.Flag("ignore-errors", "Ignore errors when reading source snapshot").Bool() + migrateParallel = migrateCommand.Flag("parallel", "Number of sources to migrate in parallel").Default("1").Int() ) func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { - pass, err := getPasswordFromFlags(ctx, false, false) - if err != nil { - return errors.Wrap(err, "source repository password") - } - - sourceRepo, err := repo.Open(ctx, *migrateSourceConfig, pass, applyOptionsFromFlags(ctx, nil)) + sourceRepo, err := openSourceRepo(ctx) if err != nil { return errors.Wrap(err, "can't open source repository") } @@ -40,7 +37,7 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { return errors.Wrap(err, "can't retrieve sources") } - semaphore := make(chan struct{}, *migrateParallelism) + semaphore := make(chan struct{}, *migrateParallel) var ( wg sync.WaitGroup @@ -49,6 +46,8 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { activeUploaders = map[snapshot.SourceInfo]*snapshotfs.Uploader{} ) + progress.StartShared() + onCtrlC(func() { mu.Lock() defer mu.Unlock() @@ -62,6 +61,18 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { } }) + if *migratePolicies { + if *migrateAll { + err = migrateAllPolicies(ctx, sourceRepo, destRepo) + } else { + err = migratePoliciesForSources(ctx, sourceRepo, destRepo, sources) + } + + if err != nil { + return errors.Wrap(err, "unable to migrate policies") + } + } + for _, s := range sources { // start a new uploader unless already canceled mu.Lock() @@ -96,18 +107,89 @@ func runMigrateCommand(ctx context.Context, destRepo *repo.Repository) error { } wg.Wait() + progress.FinishShared() + printStderr("\r\nMigration finished.\n") return nil } +func openSourceRepo(ctx context.Context) (*repo.Repository, error) { + pass, ok := repo.GetPersistedPassword(ctx, *migrateSourceConfig) + if !ok { + var err error + + if pass, err = getPasswordFromFlags(ctx, false, false); err != nil { + return nil, errors.Wrap(err, "source repository password") + } + } + + sourceRepo, err := repo.Open(ctx, *migrateSourceConfig, pass, applyOptionsFromFlags(ctx, nil)) + if err != nil { + return nil, errors.Wrap(err, "can't open source repository") + } + + return sourceRepo, nil +} + +func migratePoliciesForSources(ctx context.Context, sourceRepo, destRepo *repo.Repository, sources []snapshot.SourceInfo) error { + for _, si := range sources { + if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, si); err != nil { + return errors.Wrapf(err, "unable to migrate policy for %v", si) + } + } + + return nil +} + +func migrateAllPolicies(ctx context.Context, sourceRepo, destRepo *repo.Repository) error { + policies, err := policy.ListPolicies(ctx, sourceRepo) + if err != nil { + return errors.Wrap(err, "unable to list source policies") + } + + for _, pol := range policies { + if err := migrateSinglePolicy(ctx, sourceRepo, destRepo, pol.Target()); err != nil { + log(ctx).Warningf("unable to migrate policy for %v: %v", pol.Target(), err) + } + } + + return nil +} + +func migrateSinglePolicy(ctx context.Context, sourceRepo, destRepo *repo.Repository, si snapshot.SourceInfo) error { + pol, err := policy.GetDefinedPolicy(ctx, sourceRepo, si) + if err == policy.ErrPolicyNotFound { + return nil + } + + if err != nil { + return errors.Wrapf(err, "unable to migrate policy for %v", si) + } + + _, err = policy.GetDefinedPolicy(ctx, destRepo, si) + if err == nil { + if !*migrateOverwritePolicies { + printStderr("\rpolicy already set for %v\n", si) + // already have destination policy + return nil + } + } else if err != policy.ErrPolicyNotFound { + return errors.Wrapf(err, "unable to migrate policy for %v", si) + } + + printStderr("\rmigrating policy for %v\n", si) + + return policy.SetPolicy(ctx, destRepo, si, pol) +} + func findPreviousSnapshotManifestWithStartTime(ctx context.Context, rep *repo.Repository, sourceInfo snapshot.SourceInfo, startTime time.Time) (*snapshot.Manifest, error) { previous, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) if err != nil { - return nil, errors.Wrap(err, "error listing previous backups") + return nil, errors.Wrap(err, "error listing previous snapshots") } for _, p := range previous { - if p.StartTime == startTime { + if p.StartTime.Equal(startTime) { return p, nil } } @@ -145,7 +227,7 @@ func migrateSingleSource(ctx context.Context, uploader *snapshotfs.Uploader, sou func migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploader, sourceRepo, destRepo *repo.Repository, s snapshot.SourceInfo, m *snapshot.Manifest) error { if m.IncompleteReason != "" { - log(ctx).Infof("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime)) + log(ctx).Debugf("ignoring incomplete %v at %v", s, formatTimestamp(m.StartTime)) return nil } @@ -157,11 +239,11 @@ func migrateSingleSourceSnapshot(ctx context.Context, uploader *snapshotfs.Uploa } if existing != nil { - log(ctx).Infof("already migrated %v at %v", s, formatTimestamp(m.StartTime)) + printStderr("\ralready migrated %v at %v\n", s, formatTimestamp(m.StartTime)) return nil } - log(ctx).Infof("migrating snapshot of %v at %v", s, formatTimestamp(m.StartTime)) + printStderr("\rmigrating snapshot of %v at %v\n", s, formatTimestamp(m.StartTime)) previous, err := findPreviousSnapshotManifest(ctx, destRepo, m.Source, &m.StartTime) if err != nil { diff --git a/tests/end_to_end_test/snapshot_migrate_test.go b/tests/end_to_end_test/snapshot_migrate_test.go index 7829a130a..b429dcafd 100644 --- a/tests/end_to_end_test/snapshot_migrate_test.go +++ b/tests/end_to_end_test/snapshot_migrate_test.go @@ -19,20 +19,28 @@ func TestSnapshotMigrate(t *testing.T) { e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1) e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir1) + e.RunAndExpectSuccess(t, "policy", "set", sharedTestDataDir1, "--keep-daily=77") e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir2) e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir2) e.RunAndExpectSuccess(t, "snapshot", "create", sharedTestDataDir3) + e.RunAndExpectSuccess(t, "policy", "set", sharedTestDataDir3, "--keep-daily=88") + + sourceSnapshotCount := len(e.RunAndExpectSuccess(t, "snapshot", "list", ".", "-a")) + sourcePolicyCount := len(e.RunAndExpectSuccess(t, "policy", "list")) dstenv := testenv.NewCLITest(t) defer dstenv.Cleanup(t) dstenv.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", dstenv.RepoDir) - dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all") - // migrate again, which should be a no-op. - dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all") - sourceSnapshotCount := len(e.RunAndExpectSuccess(t, "snapshot", "list", ".", "-a")) + dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all", "--parallel=5") dstenv.RunAndVerifyOutputLineCount(t, sourceSnapshotCount, "snapshot", "list", ".", "-a") + dstenv.RunAndVerifyOutputLineCount(t, sourcePolicyCount, "policy", "list") + + // migrate again, which should be a no-op, and should not create any more policies/snapshots + dstenv.RunAndExpectSuccess(t, "snapshot", "migrate", "--source-config", filepath.Join(e.ConfigDir, ".kopia.config"), "--all") + dstenv.RunAndVerifyOutputLineCount(t, sourceSnapshotCount, "snapshot", "list", ".", "-a") + dstenv.RunAndVerifyOutputLineCount(t, sourcePolicyCount, "policy", "list") }