diff --git a/internal/epoch/epoch_utils.go b/internal/epoch/epoch_utils.go index 005e60aad..42a569586 100644 --- a/internal/epoch/epoch_utils.go +++ b/internal/epoch/epoch_utils.go @@ -1,6 +1,7 @@ package epoch import ( + "slices" "strconv" "strings" "time" @@ -161,8 +162,40 @@ func getContiguousKeyRange[E any](m map[int]E) (closedIntRange, error) { return r, nil } -func getCompactedEpochRange(cs CurrentSnapshot) (closedIntRange, error) { - return getContiguousKeyRange(cs.SingleEpochCompactionSets) +func getFirstContiguousKeyRange[E any](m map[int]E) closedIntRange { + if len(m) == 0 { + return closedIntRange{lo: 0, hi: -1} + } + + keys := make([]int, 0, len(m)) + + for k := range m { + keys = append(keys, k) + } + + slices.Sort(keys) + + lo := keys[0] + if hi := keys[len(keys)-1]; hi-lo+1 == len(m) { + // the difference between the largest and smallest key is the same as + // the length of the key set, then the range is contiguous + return closedIntRange{lo: lo, hi: hi} + } + + hi := lo + for _, v := range keys[1:] { + if v != hi+1 { + break + } + + hi = v + } + + return closedIntRange{lo: lo, hi: hi} +} + +func getCompactedEpochRange(cs CurrentSnapshot) closedIntRange { + return getFirstContiguousKeyRange(cs.SingleEpochCompactionSets) } var errInvalidCompactedRange = errors.New("invalid compacted epoch range") @@ -194,10 +227,7 @@ func oldestUncompactedEpoch(cs CurrentSnapshot) (int, error) { oldestUncompacted = rangeCompacted.hi + 1 } - singleCompacted, err := getCompactedEpochRange(cs) - if err != nil { - return -1, errors.Wrap(err, "could not get latest single-compacted epoch") - } + singleCompacted := getCompactedEpochRange(cs) if singleCompacted.isEmpty() || oldestUncompacted < singleCompacted.lo { return oldestUncompacted, nil diff --git a/internal/epoch/epoch_utils_test.go b/internal/epoch/epoch_utils_test.go index 5dcf9171c..fd7890a6e 100644 --- a/internal/epoch/epoch_utils_test.go +++ b/internal/epoch/epoch_utils_test.go @@ -3,6 +3,7 @@ import ( "fmt" "math" + "strconv" "testing" "github.com/stretchr/testify/require" @@ -313,117 +314,94 @@ func TestGetKeyRange(t *testing.T) { } } +//nolint:maintidx func TestOldestUncompactedEpoch(t *testing.T) { cases := []struct { input CurrentSnapshot expectedEpoch int wantErr error }{ + // cases with non-contiguous single epoch compaction sets are needed for + // compatibility with older clients. { input: CurrentSnapshot{ SingleEpochCompactionSets: map[int][]blob.Metadata{}, }, - }, - { - input: CurrentSnapshot{ - WriteEpoch: 0, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 0: {blob.Metadata{BlobID: compactedEpochBlobPrefix(0) + "foo0"}}, - }, - }, - expectedEpoch: 1, - }, - { - input: CurrentSnapshot{ - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 0: {blob.Metadata{BlobID: compactedEpochBlobPrefix(0) + "foo0"}}, - 1: {blob.Metadata{BlobID: compactedEpochBlobPrefix(1) + "foo1"}}, - }, - }, - expectedEpoch: 2, - }, - { - input: CurrentSnapshot{ - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 1: {blob.Metadata{BlobID: compactedEpochBlobPrefix(1) + "foo1"}}, - }, - }, expectedEpoch: 0, }, { input: CurrentSnapshot{ - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 0: {blob.Metadata{BlobID: compactedEpochBlobPrefix(0) + "foo0"}}, - 2: {blob.Metadata{BlobID: compactedEpochBlobPrefix(2) + "foo2"}}, - }, + WriteEpoch: 0, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0}), }, - expectedEpoch: -1, - wantErr: errNonContiguousRange, + expectedEpoch: 1, }, { input: CurrentSnapshot{ - LongestRangeCheckpointSets: []*RangeMetadata{ - { - MinEpoch: 0, - MaxEpoch: 2, - Blobs: []blob.Metadata{ - {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, - }, - }, - }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1}), + }, + expectedEpoch: 2, + }, + { + input: CurrentSnapshot{ + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{1}), + }, + expectedEpoch: 0, + }, + { + input: CurrentSnapshot{ + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{23}), + }, + expectedEpoch: 0, + }, + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 2}), + }, + expectedEpoch: 1, + }, + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 4}), + }, + expectedEpoch: 1, + }, + + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 3}), + }, + expectedEpoch: 2, + }, + + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 4}), + }, + expectedEpoch: 2, + }, + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 4, 5}), + }, + expectedEpoch: 2, + }, + { + input: CurrentSnapshot{ + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4}), }, expectedEpoch: 3, }, { input: CurrentSnapshot{ - LongestRangeCheckpointSets: []*RangeMetadata{ - { - MinEpoch: 0, - MaxEpoch: 2, - Blobs: []blob.Metadata{ - {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, - }, - }, - }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 0: {blob.Metadata{BlobID: compactedEpochBlobPrefix(0) + "foo0"}}, - 1: {blob.Metadata{BlobID: compactedEpochBlobPrefix(1) + "foo1"}}, - }, - }, - expectedEpoch: 3, - }, - { - input: CurrentSnapshot{ - LongestRangeCheckpointSets: []*RangeMetadata{ - { - MinEpoch: 0, - MaxEpoch: 2, - Blobs: []blob.Metadata{ - {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, - }, - }, - }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 1: {blob.Metadata{BlobID: compactedEpochBlobPrefix(1) + "foo1"}}, - 2: {blob.Metadata{BlobID: compactedEpochBlobPrefix(2) + "foo2"}}, - }, - }, - expectedEpoch: 3, - }, - { - input: CurrentSnapshot{ - LongestRangeCheckpointSets: []*RangeMetadata{ - { - MinEpoch: 0, - MaxEpoch: 2, - Blobs: []blob.Metadata{ - {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, - }, - }, - }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 1: {blob.Metadata{BlobID: compactedEpochBlobPrefix(1) + "foo1"}}, - }, + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4, 6, 9}), }, expectedEpoch: 3, }, @@ -439,10 +417,6 @@ func TestOldestUncompactedEpoch(t *testing.T) { }, }, }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 4: {blob.Metadata{BlobID: compactedEpochBlobPrefix(4) + "foo4"}}, - 5: {blob.Metadata{BlobID: compactedEpochBlobPrefix(5) + "foo5"}}, - }, }, expectedEpoch: 3, }, @@ -457,10 +431,68 @@ func TestOldestUncompactedEpoch(t *testing.T) { }, }, }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 2: {blob.Metadata{BlobID: compactedEpochBlobPrefix(2) + "foo2"}}, - 3: {blob.Metadata{BlobID: compactedEpochBlobPrefix(3) + "foo3"}}, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1}), + }, + expectedEpoch: 3, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 2, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, + }, + }, }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{1, 2}), + }, + expectedEpoch: 3, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 2, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, + }, + }, + }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{1}), + }, + expectedEpoch: 3, + }, + + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 2, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, + }, + }, + }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{4, 5}), + }, + expectedEpoch: 3, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 2, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 2) + "foo-0-2"}, + }, + }, + }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{2, 3}), }, expectedEpoch: 4, }, @@ -475,10 +507,7 @@ func TestOldestUncompactedEpoch(t *testing.T) { }, }, }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 3: {blob.Metadata{BlobID: compactedEpochBlobPrefix(3) + "foo3"}}, - 4: {blob.Metadata{BlobID: compactedEpochBlobPrefix(4) + "foo4"}}, - }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{3, 4}), }, expectedEpoch: 5, }, @@ -493,10 +522,7 @@ func TestOldestUncompactedEpoch(t *testing.T) { }, }, }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 3: {blob.Metadata{BlobID: compactedEpochBlobPrefix(3) + "foo3"}}, - 4: {blob.Metadata{BlobID: compactedEpochBlobPrefix(4) + "foo4"}}, - }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{3, 4}), }, expectedEpoch: -1, wantErr: errInvalidCompactedRange, @@ -512,21 +538,83 @@ func TestOldestUncompactedEpoch(t *testing.T) { }, }, }, - SingleEpochCompactionSets: map[int][]blob.Metadata{ - 3: {blob.Metadata{BlobID: compactedEpochBlobPrefix(3) + "foo3"}}, - 5: {blob.Metadata{BlobID: compactedEpochBlobPrefix(5) + "foo5"}}, - }, + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{3, 5}), }, - expectedEpoch: -1, - wantErr: errNonContiguousRange, + expectedEpoch: 4, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 7, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 7) + "foo-0-7"}, + }, + }, + }, + // non-contiguous single epoch compaction set, but most of the set overlaps with the compacted range + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{0, 1, 2, 4, 6, 9}), + }, + expectedEpoch: 8, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 7, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 7) + "foo-0-7"}, + }, + }, + }, + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{9, 10}), + }, + expectedEpoch: 8, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 7, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 7) + "foo-0-7"}, + }, + }, + }, + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{8, 10}), + }, + expectedEpoch: 9, + }, + { + input: CurrentSnapshot{ + LongestRangeCheckpointSets: []*RangeMetadata{ + { + MinEpoch: 0, + MaxEpoch: 7, + Blobs: []blob.Metadata{ + {BlobID: rangeCheckpointBlobPrefix(0, 7) + "foo-0-7"}, + }, + }, + }, + // non-contiguous single epoch compaction set + SingleEpochCompactionSets: makeSingleCompactionEpochSets([]int{8, 9, 12}), + }, + expectedEpoch: 10, }, } for i, tc := range cases { - t.Run(fmt.Sprint("case: ", i), func(t *testing.T) { + t.Run(fmt.Sprint("case:", i), func(t *testing.T) { got, err := oldestUncompactedEpoch(tc.input) - if tc.wantErr != nil { + if tc.wantErr == nil { + require.NoError(t, err) + } else { require.Error(t, err) } @@ -534,3 +622,121 @@ func TestOldestUncompactedEpoch(t *testing.T) { }) } } + +func makeSingleCompactionEpochSets(epochs []int) map[int][]blob.Metadata { + es := make(map[int][]blob.Metadata, len(epochs)) + for _, e := range epochs { + es[e] = []blob.Metadata{{BlobID: compactedEpochBlobPrefix(e) + "foo_" + blob.ID(strconv.Itoa(e))}} + } + + return es +} + +func TestGetFirstContiguosKeyRange(t *testing.T) { + cases := []struct { + input map[int]bool + want closedIntRange + length uint + isEmpty bool + }{ + { + isEmpty: true, + want: closedIntRange{0, -1}, + }, + { + input: map[int]bool{0: true}, + want: closedIntRange{lo: 0, hi: 0}, + length: 1, + }, + { + input: map[int]bool{-5: true}, + want: closedIntRange{lo: -5, hi: -5}, + length: 1, + }, + { + input: map[int]bool{-5: true, -4: true}, + want: closedIntRange{lo: -5, hi: -4}, + length: 2, + }, + { + input: map[int]bool{0: true}, + want: closedIntRange{lo: 0, hi: 0}, + length: 1, + }, + { + input: map[int]bool{5: true}, + want: closedIntRange{lo: 5, hi: 5}, + length: 1, + }, + { + input: map[int]bool{0: true, 1: true}, + want: closedIntRange{lo: 0, hi: 1}, + length: 2, + }, + { + input: map[int]bool{8: true, 9: true}, + want: closedIntRange{lo: 8, hi: 9}, + length: 2, + }, + { + input: map[int]bool{1: true, 2: true, 3: true, 4: true, 5: true}, + want: closedIntRange{lo: 1, hi: 5}, + length: 5, + }, + { + input: map[int]bool{8: true, 10: true}, + want: closedIntRange{lo: 8, hi: 8}, + length: 1, + }, + { + input: map[int]bool{1: true, 2: true, 3: true, 5: true}, + want: closedIntRange{lo: 1, hi: 3}, + length: 3, + }, + { + input: map[int]bool{-5: true, -7: true}, + want: closedIntRange{lo: -7, hi: -7}, + length: 1, + }, + { + input: map[int]bool{0: true, minInt: true}, + want: closedIntRange{lo: minInt, hi: minInt}, + length: 1, + }, + { + input: map[int]bool{0: true, maxInt: true}, + want: closedIntRange{lo: 0, hi: 0}, + length: 1, + }, + { + input: map[int]bool{maxInt: true, minInt: true}, + want: closedIntRange{lo: minInt, hi: minInt}, + length: 1, + }, + { + input: map[int]bool{minInt: true}, + want: closedIntRange{lo: minInt, hi: minInt}, + length: 1, + }, + { + input: map[int]bool{maxInt - 1: true}, + want: closedIntRange{lo: maxInt - 1, hi: maxInt - 1}, + length: 1, + }, + { + input: map[int]bool{maxInt: true}, + want: closedIntRange{lo: maxInt, hi: maxInt}, + length: 1, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprint("case:", i), func(t *testing.T) { + got := getFirstContiguousKeyRange(tc.input) + + require.Equal(t, tc.want, got, "input: %#v", tc.input) + require.Equal(t, tc.length, got.length()) + require.Equal(t, tc.isEmpty, got.isEmpty()) + }) + } +}