diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 11cae1563..829c2f471 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -5,6 +5,7 @@ "encoding/json" "fmt" "math/rand" + "slices" "sort" "sync/atomic" "testing" @@ -127,6 +128,250 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv { return te2 } +func makeSeq(base, start, end int) []int { + s := make([]int, 0, end-start+1) + for i := start; i <= end; i++ { + s = append(s, base+i) + } + + return s +} + +func makeIndexBlob( + ctx context.Context, + te *epochManagerTestEnv, + prefix blob.ID, + ndx *fakeIndex, +) { + blobID := blob.ID(fmt.Sprintf("%v%016x-s0-c1", prefix, rand.Int63())) + te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes()), blob.PutOptions{}) +} + +func makeUncompactedBlobsForEpochs( + ctx context.Context, + te *epochManagerTestEnv, + startEpoch int, + endEpoch int, + idxDataOffset int, +) { + for n := startEpoch; n <= endEpoch; n++ { + makeIndexBlob( + ctx, + te, + UncompactedEpochBlobPrefix(n), + newFakeIndexWithEntries(idxDataOffset+n), + ) + } +} + +func makeSingleEpochCompactedBlobsForEpochs( + ctx context.Context, + te *epochManagerTestEnv, + startEpoch int, + endEpoch int, + idxDataOffset int, //nolint:unparam +) { + for n := startEpoch; n <= endEpoch; n++ { + makeIndexBlob( + ctx, + te, + compactedEpochBlobPrefix(n), + newFakeIndexWithEntries(idxDataOffset+n), + ) + } +} + +func makeRangeCheckpointBlob( + ctx context.Context, + te *epochManagerTestEnv, + minEpoch int, + maxEpoch int, + idxDataOffset int, +) { + makeIndexBlob( + ctx, + te, + rangeCheckpointBlobPrefix(minEpoch, maxEpoch), + newFakeIndexWithEntries(makeSeq(idxDataOffset, minEpoch, maxEpoch)...), + ) +} + +// mustMakeWriteEpoch advances the epoch marker so that after the next +// Refresh, WriteEpoch equals epoch. +func mustMakeWriteEpoch( + ctx context.Context, + tb testing.TB, + te *epochManagerTestEnv, + epoch int, +) { + tb.Helper() + + cs := CurrentSnapshot{ + // advanceEpochMarker creates a blob for epoch N+1, so subtract 1 here to + // get the desired result. + WriteEpoch: epoch - 1, + } + + err := te.mgr.advanceEpochMarker(ctx, cs) + require.NoError(tb, err, "creating write epoch marker") +} + +func TestGetCompleteIndexSet_CoversAllEpochs(t *testing.T) { + // Each test in the table creates a set of index blobs of different compaction + // levels for different epochs. The data in each index blob is represented as + // a list of integers. The integers are specifically chosen to allow + // associating an integer with the index blob and epoch it was sourced from. + // + // The following base offsets are used for the different types of index blobs: + // * range checkpoint: offset 400 + // * single epoch compaction: offset 100 + // * settled uncompacted epoch: offset 200 + // * unsettled uncompacted epoch: offset 300 + // + // For the integers within each index blob, the lower-order numbers represent + // the epoch the entry corresponds to. For example, a range checkpoint for + // epochs 2-4 would have entries 402, 403, and 404. As single epoch + // compactions and uncompacted blobs only represent a single epoch, epoch blob + // of those types will only ever contain a single number, but sets of blobs + // for continuous epochs may span a range of integers. + // + // As there's a strong mapping of integers to type of blob and epoch, test + // failures that report the missing integers can be mapped back to the type of + // blob(s) that's missing and the epochs covered by the missing blob(s). + table := []struct { + name string + initStorage func( + ctx context.Context, + tb testing.TB, + te *epochManagerTestEnv, + ) + wantEntries []int + }{ + { + name: "OnlyRangeCheckpoint_CoversAllSettled", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeRangeCheckpointBlob(ctx, te, 0, 2, 400) + makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300) + mustMakeWriteEpoch(ctx, t, te, 4) + }, + wantEntries: slices.Concat( + makeSeq(400, 0, 2), + makeSeq(300, 3, 4), + ), + }, + { + name: "OnlySingleEpochCompactions_CoverAllSettled", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 0, 2, 100) + makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300) + mustMakeWriteEpoch(ctx, t, te, 4) + }, + wantEntries: slices.Concat( + makeSeq(100, 0, 2), + makeSeq(300, 3, 4), + ), + }, + { + name: "OnlySingleEpochCompactions_GapsFilledByUncompacted", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 0, 0, 100) + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 2, 2, 100) + makeUncompactedBlobsForEpochs(ctx, te, 1, 1, 200) + makeUncompactedBlobsForEpochs(ctx, te, 3, 3, 200) + makeUncompactedBlobsForEpochs(ctx, te, 4, 5, 300) + mustMakeWriteEpoch(ctx, t, te, 5) + }, + wantEntries: []int{ + 100, 102, + 201, 203, + 304, 305, + }, + }, + { + name: "RangeAndSingleEpochCompactions_NoGaps", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeRangeCheckpointBlob(ctx, te, 0, 2, 400) + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 3, 4, 100) + makeUncompactedBlobsForEpochs(ctx, te, 5, 6, 300) + mustMakeWriteEpoch(ctx, t, te, 6) + }, + wantEntries: slices.Concat( + makeSeq(400, 0, 2), + makeSeq(100, 3, 4), + makeSeq(300, 5, 6), + ), + }, + { + name: "RangeAndSingleEpochCompactions_GapsFilledByUncompacted", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeRangeCheckpointBlob(ctx, te, 0, 2, 400) + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 3, 3, 100) + makeSingleEpochCompactedBlobsForEpochs(ctx, te, 5, 5, 100) + makeUncompactedBlobsForEpochs(ctx, te, 4, 4, 200) + makeUncompactedBlobsForEpochs(ctx, te, 6, 7, 300) + mustMakeWriteEpoch(ctx, t, te, 7) + }, + wantEntries: []int{ + 400, 401, 402, + 103, 105, + 204, + 306, 307, + }, + }, + { + name: "NoCompactions_OnlyUncompacted", + initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) { + tb.Helper() + + makeUncompactedBlobsForEpochs(ctx, te, 0, 2, 200) + makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300) + mustMakeWriteEpoch(ctx, t, te, 4) + }, + wantEntries: slices.Concat( + makeSeq(200, 0, 2), + makeSeq(300, 3, 4), + ), + }, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + ctx := testlogging.Context(t) + te := newTestEnv(t) + + test.initStorage(ctx, t, te) + + require.NoError(t, te.mgr.Refresh(ctx), "refreshing epoch manager") + + blobs, _, err := te.mgr.GetCompleteIndexSet(ctx, LatestEpoch) + require.NoError(t, err, "getting index set") + + gotIndex, err := te.getMergedIndexContents( + ctx, + blob.IDsFromMetadata(blobs), + ) + require.NoError(t, err, "getting range compacted blobs") + + assert.ElementsMatch( + t, + test.wantEntries, + gotIndex.Entries, + "range checkpoint entries", + ) + }) + } +} + func TestIndexEpochManager_Regular(t *testing.T) { t.Parallel()