fix(repository): selection of next epoch to compact (#5373)

- additional test cases for epoch selection
- helper to derive oldest not compacted epoch
- fix selection of next epoch to compact
- cleanup: remove unused code and tests
- fixes #5371
This commit is contained in:
Julio López
2026-05-13 22:35:30 -07:00
committed by GitHub
parent 6af71d2d57
commit 0346afccee
2 changed files with 217 additions and 110 deletions

View File

@@ -1,6 +1,9 @@
package epoch
import (
"cmp"
"iter"
"maps"
"slices"
"strconv"
"strings"
@@ -124,42 +127,6 @@ func (r closedIntRange) isEmpty() bool {
minInt = -1 << (intSize - 1)
)
func getFirstContiguousKeyRange[E any](m map[int]E) closedIntRange {
if len(m) == 0 {
return closedIntRange{lo: 0, hi: -1}
}
keys := make([]int, 0, len(m))
for k := range m {
keys = append(keys, k)
}
slices.Sort(keys)
lo := keys[0]
if hi := keys[len(keys)-1]; hi-lo+1 == len(m) {
// the difference between the largest and smallest key is the same as
// the length of the key set, then the range is contiguous
return closedIntRange{lo: lo, hi: hi}
}
hi := lo
for _, v := range keys[1:] {
if v != hi+1 {
break
}
hi = v
}
return closedIntRange{lo: lo, hi: hi}
}
func getCompactedEpochRange(cs CurrentSnapshot) closedIntRange {
return getFirstContiguousKeyRange(cs.SingleEpochCompactionSets)
}
var errInvalidCompactedRange = errors.New("invalid compacted epoch range")
func getRangeCompactedRange(cs CurrentSnapshot) closedIntRange {
@@ -189,16 +156,56 @@ func oldestUncompactedEpoch(cs CurrentSnapshot) (int, error) {
oldestUncompacted = rangeCompacted.hi + 1
}
singleCompacted := getCompactedEpochRange(cs)
oldestUncompacted = getOldestUncompactedAfterEpoch(maps.Keys(cs.SingleEpochCompactionSets), oldestUncompacted)
if singleCompacted.isEmpty() || oldestUncompacted < singleCompacted.lo {
return oldestUncompacted, nil
}
// singleCompacted is not empty
if oldestUncompacted > singleCompacted.hi {
return oldestUncompacted, nil
}
return singleCompacted.hi + 1, nil
return oldestUncompacted, nil
}
// filterLowerThan returns a sequence with the elements from s that are greater
// or equal than threshold, that is it omits the elements that are strictly less
// than threshold.
// For example, if s = {0, 3, 5} and threshold is 3, then the resulting sequence
// yields {3, 5}.
func filterLowerThan[V cmp.Ordered](threshold V, s iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
s(func(v V) bool { // this is the filtering function
if v >= threshold {
return yield(v) // only yield values >= threshold
}
return true
})
}
}
// getOldestUncompactedAfterEpoch finds the oldest uncompacted epoch given a
// sequence of known (single-epoch) compacted epochs. The returned epoch is
// greater or equal than the uncompactedCandidateEpoch. For example, suppose
// that compacted epochs has { 3, 5, 6, 8 } then the following are the returned
// values for uncompactedCandidateEpoch
// uncompactedCandidateEpoch < 3 => uncompactedCandidateEpoch
// uncompactedCandidateEpoch == 3 => 4
// uncompactedCandidateEpoch == 4 => 4
// uncompactedCandidateEpoch == 5 or 6 => 7
// uncompactedCandidateEpoch == 7 => 7
// uncompactedCandidateEpoch == 8 => 9
// uncompactedCandidateEpoch > 8 => uncompactedCandidateEpoch.
//
//nolint:dupword
func getOldestUncompactedAfterEpoch(compactedEpochs iter.Seq[int], uncompactedCandidateEpoch int) int {
s := slices.Sorted(filterLowerThan(uncompactedCandidateEpoch, compactedEpochs))
if len(s) == 0 || uncompactedCandidateEpoch < s[0] {
return uncompactedCandidateEpoch
}
prev := s[0]
for _, v := range s[1:] {
if v != prev+1 {
break
}
prev = v
}
return prev + 1
}

View File

@@ -3,6 +3,7 @@
import (
"fmt"
"math"
"slices"
"strconv"
"testing"
@@ -247,6 +248,36 @@ func TestOldestUncompactedEpoch(t *testing.T) {
},
expectedEpoch: 8,
},
{
input: CurrentSnapshot{
LongestRangeCheckpointSets: makeLongestRange(0, 7),
// non-contiguous single epoch compaction set, the first contiguous sequence fully overlaps with the compacted range
SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4, 6, 7, 9}),
},
expectedEpoch: 8,
},
{
input: CurrentSnapshot{
LongestRangeCheckpointSets: makeLongestRange(0, 7),
// non-contiguous single epoch compaction set, but most of the
// set overlaps with the compacted range except for the last
// epoch in the range (7), and the next epoch (8) is in the
// single compaction set already
SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4, 6, 8, 9}),
},
expectedEpoch: 10,
},
{
input: CurrentSnapshot{
LongestRangeCheckpointSets: makeLongestRange(0, 7),
// non-contiguous single epoch compaction set, but most of the
// set overlaps with the compacted range except for the last
// epoch in the range (7), and the next epoch (8) is in the
// single compaction set already
SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4, 6, 8, 10}),
},
expectedEpoch: 9,
},
{
input: CurrentSnapshot{
LongestRangeCheckpointSets: makeLongestRange(0, 7),
@@ -308,111 +339,180 @@ func makeLongestRange(minEpoch, maxEpoch int) []*RangeMetadata {
}
}
func TestGetFirstContiguousKeyRange(t *testing.T) {
func TestGetOldestUncompactedAfterEpoch(t *testing.T) {
cases := []struct {
input map[int]bool
want closedIntRange
length uint
isEmpty bool
in []int
threshold int
expected int
}{
{},
{
isEmpty: true,
want: closedIntRange{0, -1},
threshold: 5,
expected: 5,
},
{
input: map[int]bool{0: true},
want: closedIntRange{lo: 0, hi: 0},
length: 1,
in: []int{},
threshold: 0,
expected: 0,
},
{
input: map[int]bool{-5: true},
want: closedIntRange{lo: -5, hi: -5},
length: 1,
in: []int{0},
threshold: 0,
expected: 1,
},
{
in: []int{0},
threshold: 1,
expected: 1,
},
{
input: map[int]bool{-5: true, -4: true},
want: closedIntRange{lo: -5, hi: -4},
length: 2,
in: []int{0, 2, 5, 3},
threshold: 0,
expected: 1,
},
{
input: map[int]bool{0: true},
want: closedIntRange{lo: 0, hi: 0},
length: 1,
in: []int{0, 2, 5, 3},
threshold: 1,
expected: 1,
},
{
input: map[int]bool{5: true},
want: closedIntRange{lo: 5, hi: 5},
length: 1,
in: []int{0, 2, 5, 3},
threshold: 2,
expected: 4,
},
{
input: map[int]bool{0: true, 1: true},
want: closedIntRange{lo: 0, hi: 1},
length: 2,
in: []int{0, 2, 5, 3},
threshold: 3,
expected: 4,
},
{
input: map[int]bool{8: true, 9: true},
want: closedIntRange{lo: 8, hi: 9},
length: 2,
in: []int{0, 2, 5, 3},
threshold: 4,
expected: 4,
},
{
input: map[int]bool{1: true, 2: true, 3: true, 4: true, 5: true},
want: closedIntRange{lo: 1, hi: 5},
length: 5,
in: []int{0, 2, 5, 3},
threshold: 5,
expected: 6,
},
{
input: map[int]bool{8: true, 10: true},
want: closedIntRange{lo: 8, hi: 8},
length: 1,
in: []int{0, 2, 5, 3},
threshold: 6,
expected: 6,
},
{
input: map[int]bool{1: true, 2: true, 3: true, 5: true},
want: closedIntRange{lo: 1, hi: 3},
length: 3,
in: []int{0, 2, 5, 3},
threshold: 8,
expected: 8,
},
{
input: map[int]bool{-5: true, -7: true},
want: closedIntRange{lo: -7, hi: -7},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 0,
expected: 2,
},
{
input: map[int]bool{0: true, minInt: true},
want: closedIntRange{lo: minInt, hi: minInt},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 1,
expected: 2,
},
{
input: map[int]bool{0: true, maxInt: true},
want: closedIntRange{lo: 0, hi: 0},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 2,
expected: 2,
},
{
input: map[int]bool{maxInt: true, minInt: true},
want: closedIntRange{lo: minInt, hi: minInt},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 3,
expected: 3,
},
{
input: map[int]bool{minInt: true},
want: closedIntRange{lo: minInt, hi: minInt},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 4,
expected: 6,
},
{
input: map[int]bool{maxInt - 1: true},
want: closedIntRange{lo: maxInt - 1, hi: maxInt - 1},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 5,
expected: 6,
},
{
input: map[int]bool{maxInt: true},
want: closedIntRange{lo: maxInt, hi: maxInt},
length: 1,
in: []int{1, 0, 5, 4},
threshold: 6,
expected: 6,
},
{
in: []int{1, 0, 5, 4},
threshold: 7,
expected: 7,
},
}
for i, tc := range cases {
t.Run(fmt.Sprint("case:", i), func(t *testing.T) {
got := getFirstContiguousKeyRange(tc.input)
t.Run("case:"+strconv.Itoa(i), func(t *testing.T) {
vseq := slices.Values(tc.in)
got := getOldestUncompactedAfterEpoch(vseq, tc.threshold)
require.Equal(t, tc.want, got, "input: %#v", tc.input)
require.Equal(t, tc.length, got.length())
require.Equal(t, tc.isEmpty, got.isEmpty())
require.Equal(t, tc.expected, got)
})
}
}
func TestFilterLowerThan(t *testing.T) {
cases := []struct {
in []int
threshold int
expected []int
}{
{},
{
threshold: 5,
},
{
in: []int{},
threshold: 0,
expected: []int{},
},
{
in: []int{0},
threshold: 0,
expected: []int{0},
},
{
in: []int{0},
threshold: 1,
expected: []int{},
},
{
in: []int{0, 2, 5, 3},
threshold: 6,
expected: []int{},
},
{
in: []int{1, 0, 5, 4},
threshold: 0,
expected: []int{1, 0, 5, 4},
},
{
in: []int{1, 0, -1, 5, 4},
threshold: 3,
expected: []int{4, 5},
},
{
in: []int{1, 0, -1, 5, 4},
threshold: 4,
expected: []int{4, 5},
},
}
for i, tc := range cases {
t.Run("case:"+strconv.Itoa(i), func(t *testing.T) {
vseq := slices.Values(tc.in)
got := filterLowerThan(tc.threshold, vseq)
gotSlice := slices.Collect(got)
require.Subset(t, tc.in, gotSlice)
require.ElementsMatch(t, gotSlice, tc.expected)
})
}
}