fixup tests, refactor for index verification

This commit is contained in:
Aaron H. Alpar
2022-12-05 09:57:28 -08:00
parent 3a4e664e60
commit 1b653b071b
3 changed files with 72 additions and 34 deletions

View File

@@ -73,7 +73,7 @@ func (c *commandRepositoryUpgrade) setup(svc advancedAppServices, parent command
// If the lock is fully established then perform the upgrade.
beginCmd.Action(svc.directRepositoryWriteAction(c.runPhase(c.upgrade)))
// Validate index upgrade success
beginCmd.Action(svc.directRepositoryWriteAction(c.runPhase(c.validateAction)))
beginCmd.Action(svc.directRepositoryWriteAction(c.runPhase(c.ignoreErrorOnAlwaysCommit(c.validateAction))))
// Commit the upgrade and revoke the lock, this will also cleanup any
// backups used for rollback.
beginCmd.Action(svc.directRepositoryWriteAction(c.runPhase(c.commitUpgrade)))
@@ -97,7 +97,7 @@ func assign(iif content.Info, i int, m map[content.ID][2]index.Info) {
m[iif.GetContentID()] = v
}
// loadIndexBlobs load index blobs into indexEntries map.
// loadIndexBlobs load index blobs into indexEntries map. indexEntries map will allow comparison betweel two indexes (index at which == 0 and index at which == 1).
func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]index.Info, sm *content.SharedManager, which int, indexBlobInfos []content.IndexBlobInfo) error {
d := gather.WriteBuffer{}
@@ -131,7 +131,8 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
indexBlobInfos1, _, err := sm.IndexReaderV1().ListIndexBlobInfos(ctx)
if err != nil {
return errors.Wrapf(err, "failed to list index blobs for new index")
log(ctx).Errorf("failed to list index blobs for new index. upgrade may have failed.: %v", err)
return nil
}
if len(indexBlobInfos0) == 0 && len(indexBlobInfos1) > 0 {
@@ -139,6 +140,8 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
return nil
}
// load index blobs into their appropriate positions inside the indexEntries map
err = loadIndexBlobs(ctx, indexEntries, sm, 0, indexBlobInfos0)
if err != nil {
return errors.Wrapf(err, "failed to load index entries for v0 index entry")
@@ -149,62 +152,73 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
return errors.Wrapf(err, "failed to load index entries for new index")
}
var msgs []string
for contentID, indexEntryPairs := range indexEntries {
if indexEntryPairs[0] == nil || indexEntryPairs[1] == nil {
err = errors.Errorf("lop-sided index entries for contentID %q", contentID)
} else {
err = checkIndexInfo(indexEntryPairs[0], indexEntryPairs[1])
iep0 := indexEntryPairs[0]
iep1 := indexEntryPairs[1]
// compare two index blobs for the same content ID
if iep0 != nil && iep1 != nil {
msgs = append(msgs, checkIndexInfo(iep0, iep1)...)
continue
}
if err != nil {
break
if iep0 != nil {
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep0.GetPackBlobID()))
continue
}
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep1.GetPackBlobID()))
}
if err == nil {
if len(msgs) == 0 {
log(ctx).Infof("index validation succeeded")
return nil
}
err = errors.Wrap(err, "repository will remain locked until index differences are resolved")
if c.commitMode == commitModeAlwaysCommit {
log(ctx).Errorf("%v", err)
return nil
log(ctx).Error("inconsistencies found in migrated index:")
for _, m := range msgs {
log(ctx).Error(m)
}
return err
return errors.Wrap(err, "repository will remain locked until index differences are resolved")
}
// checkIndexInfo compare two index infos. If a mismatch exists, return an error with diagnostic information.
func checkIndexInfo(i0, i1 index.Info) error {
var err error
func checkIndexInfo(i0, i1 index.Info) []string {
var q []string
switch {
case i0.GetFormatVersion() != i1.GetFormatVersion():
err = errors.Errorf("mismatched FormatVersions: %v %v", i0.GetFormatVersion(), i1.GetFormatVersion())
q = append(q, fmt.Sprintf("mismatched FormatVersions: %v %v", i0.GetFormatVersion(), i1.GetFormatVersion()))
case i0.GetOriginalLength() != i1.GetOriginalLength():
err = errors.Errorf("mismatched OriginalLengths: %v %v", i0.GetOriginalLength(), i1.GetOriginalLength())
q = append(q, fmt.Sprintf("mismatched OriginalLengths: %v %v", i0.GetOriginalLength(), i1.GetOriginalLength()))
case i0.GetPackBlobID() != i1.GetPackBlobID():
err = errors.Errorf("mismatched PackBlobIDs: %v %v", i0.GetPackBlobID(), i1.GetPackBlobID())
q = append(q, fmt.Sprintf("mismatched PackBlobIDs: %v %v", i0.GetPackBlobID(), i1.GetPackBlobID()))
case i0.GetPackedLength() != i1.GetPackedLength():
err = errors.Errorf("mismatched PackedLengths: %v %v", i0.GetPackedLength(), i1.GetPackedLength())
q = append(q, fmt.Sprintf("mismatched PackedLengths: %v %v", i0.GetPackedLength(), i1.GetPackedLength()))
case i0.GetPackOffset() != i1.GetPackOffset():
err = errors.Errorf("mismatched PackOffsets: %v %v", i0.GetPackOffset(), i1.GetPackOffset())
q = append(q, fmt.Sprintf("mismatched PackOffsets: %v %v", i0.GetPackOffset(), i1.GetPackOffset()))
case i0.GetEncryptionKeyID() != i1.GetEncryptionKeyID():
err = errors.Errorf("mismatched EncryptionKeyIDs: %v %v", i0.GetEncryptionKeyID(), i1.GetEncryptionKeyID())
q = append(q, fmt.Sprintf("mismatched EncryptionKeyIDs: %v %v", i0.GetEncryptionKeyID(), i1.GetEncryptionKeyID()))
case i0.GetDeleted() != i1.GetDeleted():
err = errors.Errorf("mismatched Deleted flags: %v %v", i0.GetDeleted(), i1.GetDeleted())
q = append(q, fmt.Sprintf("mismatched Deleted flags: %v %v", i0.GetDeleted(), i1.GetDeleted()))
case i0.GetTimestampSeconds() != i1.GetTimestampSeconds():
err = errors.Errorf("mismatched TimestampSeconds: %v %v", i0.GetTimestampSeconds(), i1.GetTimestampSeconds())
q = append(q, fmt.Sprintf("mismatched TimestampSeconds: %v %v", i0.GetTimestampSeconds(), i1.GetTimestampSeconds()))
}
if err != nil {
return errors.Wrapf(err, "index blobs do not match: %v, %v",
string(i0.GetPackBlobID()),
string(i1.GetPackBlobID()))
if len(q) == 0 {
return nil
}
return nil
for i := range q {
q[i] = fmt.Sprintf("index blobs do not match: %q, %q: %s", i0.GetPackBlobID(), i1.GetPackBlobID(), q[i])
}
return q
}
func (c *commandRepositoryUpgrade) forceRollbackAction(ctx context.Context, rep repo.DirectRepositoryWriter) error {
@@ -241,6 +255,29 @@ func (c *commandRepositoryUpgrade) runPhase(act func(context.Context, repo.Direc
}
}
func (c *commandRepositoryUpgrade) ignoreErrorOnAlwaysCommit(act func(context.Context, repo.DirectRepositoryWriter) error) func(context.Context, repo.DirectRepositoryWriter) error {
return func(ctx context.Context, rep repo.DirectRepositoryWriter) error {
err := act(ctx, rep)
if err == nil {
return nil
}
if c.commitMode == commitModeAlwaysCommit {
log(ctx).Errorf("%v", err)
return nil
}
// Explicitly skip all stages on error because tests do not
// skip/exit on error. Tests override os.Exit() that prevents
// running rest of the phases until we set the skip flag here.
// This flag is designed for testability and also to support
// rollback.
c.skip = true
return err
}
}
// setLockIntent is an upgrade phase which sets the upgrade lock intent with
// desired parameters.
func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.DirectRepositoryWriter) error {

View File

@@ -63,9 +63,11 @@ func (s *formatSpecificTestSuite) TestRepositoryCorruptedUpgrade(t *testing.T) {
switch s.formatVersion {
case format.FormatVersion1:
require.Contains(t, out, "Format version: 1")
// run upgrade first with commit-mode set to never. this leaves the lock and new index intact so that
// the file can be corrupted with "TweakFile".
_, stderr := env.RunAndExpectSuccessWithErrOut(t, "repository", "upgrade",
"--commit-mode", "never",
"--upgrade-owner-id", "owner",
"--commit-mode", "never",
"--io-drain-timeout", "1s", "--allow-unsafe-upgrade",
"--status-poll-interval", "1s",
"--max-permitted-clock-drift", "1s")
@@ -73,13 +75,13 @@ func (s *formatSpecificTestSuite) TestRepositoryCorruptedUpgrade(t *testing.T) {
require.Contains(t, stderr, "Commit mode is set to 'never'. Skipping commit.")
require.Contains(t, stderr, "index validation succeeded")
env.TweakFile(t, env.RepoDir, "x*/*/*.f")
// then re-run the upgrade with the corrupted index. This should fail on index validation.
_, stderr = env.RunAndExpectFailure(t, "repository", "upgrade",
"--upgrade-owner-id", "owner")
require.Regexp(t, "failed to load index entries for new index: failed to load index blob with BlobID", stderr)
case format.FormatVersion2:
require.Contains(t, out, "Format version: 2")
_, stderr := env.RunAndExpectSuccessWithErrOut(t, "repository", "upgrade",
"--commit-mode", "never",
"--upgrade-owner-id", "owner",
"--io-drain-timeout", "1s", "--allow-unsafe-upgrade",
"--status-poll-interval", "1s",
@@ -89,7 +91,6 @@ func (s *formatSpecificTestSuite) TestRepositoryCorruptedUpgrade(t *testing.T) {
default:
require.Contains(t, out, "Format version: 3")
env.RunAndExpectFailure(t, "repository", "upgrade",
"--commit-mode", "never",
"--upgrade-owner-id", "owner",
"--io-drain-timeout", "1s", "--allow-unsafe-upgrade",
"--status-poll-interval", "1s",

View File

@@ -29,7 +29,7 @@ func BackupBlobID(l UpgradeLockIntent) blob.ID {
// it updates the existing lock using the output of the UpgradeLock.Update().
//
// This method also backs up the original format version on the upgrade lock
// intent and sets the latest format-version o nthe repository blob. This
// intent and sets the latest format-version to the repository blob. This
// should cause the unsupporting clients (non-upgrade capable) to fail
// connecting to the repository.
func (m *Manager) SetUpgradeLockIntent(ctx context.Context, l UpgradeLockIntent) (*UpgradeLockIntent, error) {