test(repository): Basic tests for getting epoch manager blob set (#5418)

Create helper functions that can seed state for epoch manager tests.

Test basic cases of getting the active set of blobs. This indirectly
tests the fallback logic that ensures all epochs are covered when
returning the active set.

Authored-by: @ashmrtn
This commit is contained in:
Julio López
2026-06-15 16:38:49 -07:00
committed by GitHub
parent 09c0760a20
commit 72ec08fd8e

View File

@@ -5,6 +5,7 @@
"encoding/json"
"fmt"
"math/rand"
"slices"
"sort"
"sync/atomic"
"testing"
@@ -127,6 +128,250 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv {
return te2
}
func makeSeq(base, start, end int) []int {
s := make([]int, 0, end-start+1)
for i := start; i <= end; i++ {
s = append(s, base+i)
}
return s
}
func makeIndexBlob(
ctx context.Context,
te *epochManagerTestEnv,
prefix blob.ID,
ndx *fakeIndex,
) {
blobID := blob.ID(fmt.Sprintf("%v%016x-s0-c1", prefix, rand.Int63()))
te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes()), blob.PutOptions{})
}
func makeUncompactedBlobsForEpochs(
ctx context.Context,
te *epochManagerTestEnv,
startEpoch int,
endEpoch int,
idxDataOffset int,
) {
for n := startEpoch; n <= endEpoch; n++ {
makeIndexBlob(
ctx,
te,
UncompactedEpochBlobPrefix(n),
newFakeIndexWithEntries(idxDataOffset+n),
)
}
}
func makeSingleEpochCompactedBlobsForEpochs(
ctx context.Context,
te *epochManagerTestEnv,
startEpoch int,
endEpoch int,
idxDataOffset int, //nolint:unparam
) {
for n := startEpoch; n <= endEpoch; n++ {
makeIndexBlob(
ctx,
te,
compactedEpochBlobPrefix(n),
newFakeIndexWithEntries(idxDataOffset+n),
)
}
}
func makeRangeCheckpointBlob(
ctx context.Context,
te *epochManagerTestEnv,
minEpoch int,
maxEpoch int,
idxDataOffset int,
) {
makeIndexBlob(
ctx,
te,
rangeCheckpointBlobPrefix(minEpoch, maxEpoch),
newFakeIndexWithEntries(makeSeq(idxDataOffset, minEpoch, maxEpoch)...),
)
}
// mustMakeWriteEpoch advances the epoch marker so that after the next
// Refresh, WriteEpoch equals epoch.
func mustMakeWriteEpoch(
ctx context.Context,
tb testing.TB,
te *epochManagerTestEnv,
epoch int,
) {
tb.Helper()
cs := CurrentSnapshot{
// advanceEpochMarker creates a blob for epoch N+1, so subtract 1 here to
// get the desired result.
WriteEpoch: epoch - 1,
}
err := te.mgr.advanceEpochMarker(ctx, cs)
require.NoError(tb, err, "creating write epoch marker")
}
func TestGetCompleteIndexSet_CoversAllEpochs(t *testing.T) {
// Each test in the table creates a set of index blobs of different compaction
// levels for different epochs. The data in each index blob is represented as
// a list of integers. The integers are specifically chosen to allow
// associating an integer with the index blob and epoch it was sourced from.
//
// The following base offsets are used for the different types of index blobs:
// * range checkpoint: offset 400
// * single epoch compaction: offset 100
// * settled uncompacted epoch: offset 200
// * unsettled uncompacted epoch: offset 300
//
// For the integers within each index blob, the lower-order numbers represent
// the epoch the entry corresponds to. For example, a range checkpoint for
// epochs 2-4 would have entries 402, 403, and 404. As single epoch
// compactions and uncompacted blobs only represent a single epoch, epoch blob
// of those types will only ever contain a single number, but sets of blobs
// for continuous epochs may span a range of integers.
//
// As there's a strong mapping of integers to type of blob and epoch, test
// failures that report the missing integers can be mapped back to the type of
// blob(s) that's missing and the epochs covered by the missing blob(s).
table := []struct {
name string
initStorage func(
ctx context.Context,
tb testing.TB,
te *epochManagerTestEnv,
)
wantEntries []int
}{
{
name: "OnlyRangeCheckpoint_CoversAllSettled",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeRangeCheckpointBlob(ctx, te, 0, 2, 400)
makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300)
mustMakeWriteEpoch(ctx, t, te, 4)
},
wantEntries: slices.Concat(
makeSeq(400, 0, 2),
makeSeq(300, 3, 4),
),
},
{
name: "OnlySingleEpochCompactions_CoverAllSettled",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 0, 2, 100)
makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300)
mustMakeWriteEpoch(ctx, t, te, 4)
},
wantEntries: slices.Concat(
makeSeq(100, 0, 2),
makeSeq(300, 3, 4),
),
},
{
name: "OnlySingleEpochCompactions_GapsFilledByUncompacted",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 0, 0, 100)
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 2, 2, 100)
makeUncompactedBlobsForEpochs(ctx, te, 1, 1, 200)
makeUncompactedBlobsForEpochs(ctx, te, 3, 3, 200)
makeUncompactedBlobsForEpochs(ctx, te, 4, 5, 300)
mustMakeWriteEpoch(ctx, t, te, 5)
},
wantEntries: []int{
100, 102,
201, 203,
304, 305,
},
},
{
name: "RangeAndSingleEpochCompactions_NoGaps",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeRangeCheckpointBlob(ctx, te, 0, 2, 400)
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 3, 4, 100)
makeUncompactedBlobsForEpochs(ctx, te, 5, 6, 300)
mustMakeWriteEpoch(ctx, t, te, 6)
},
wantEntries: slices.Concat(
makeSeq(400, 0, 2),
makeSeq(100, 3, 4),
makeSeq(300, 5, 6),
),
},
{
name: "RangeAndSingleEpochCompactions_GapsFilledByUncompacted",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeRangeCheckpointBlob(ctx, te, 0, 2, 400)
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 3, 3, 100)
makeSingleEpochCompactedBlobsForEpochs(ctx, te, 5, 5, 100)
makeUncompactedBlobsForEpochs(ctx, te, 4, 4, 200)
makeUncompactedBlobsForEpochs(ctx, te, 6, 7, 300)
mustMakeWriteEpoch(ctx, t, te, 7)
},
wantEntries: []int{
400, 401, 402,
103, 105,
204,
306, 307,
},
},
{
name: "NoCompactions_OnlyUncompacted",
initStorage: func(ctx context.Context, tb testing.TB, te *epochManagerTestEnv) {
tb.Helper()
makeUncompactedBlobsForEpochs(ctx, te, 0, 2, 200)
makeUncompactedBlobsForEpochs(ctx, te, 3, 4, 300)
mustMakeWriteEpoch(ctx, t, te, 4)
},
wantEntries: slices.Concat(
makeSeq(200, 0, 2),
makeSeq(300, 3, 4),
),
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
ctx := testlogging.Context(t)
te := newTestEnv(t)
test.initStorage(ctx, t, te)
require.NoError(t, te.mgr.Refresh(ctx), "refreshing epoch manager")
blobs, _, err := te.mgr.GetCompleteIndexSet(ctx, LatestEpoch)
require.NoError(t, err, "getting index set")
gotIndex, err := te.getMergedIndexContents(
ctx,
blob.IDsFromMetadata(blobs),
)
require.NoError(t, err, "getting range compacted blobs")
assert.ElementsMatch(
t,
test.wantEntries,
gotIndex.Entries,
"range checkpoint entries",
)
})
}
}
func TestIndexEpochManager_Regular(t *testing.T) {
t.Parallel()