refactor(general): add epoch.Manager.MaybeGenerateRangeCheckpoint (#3727)

* epoch manager: factor out getRangeToCompact
* epoch manager: add epoch.Manager.MaybeGenerateRangeCheckpoint
* test epoch.Manager.MaybeGenerateRangeCheckpoint
This commit is contained in:
Julio López
2024-03-22 15:29:11 -07:00
committed by GitHub
parent 9c99b8aa83
commit fdb6d3c097
2 changed files with 267 additions and 12 deletions

View File

@@ -593,19 +593,38 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSna
return nil
}
// MaybeGenerateRangeCheckpoint writes a new range index covering the
// individual epochs in the range selected by getRangeToCompact. When
// getRangeToCompact reports that there is nothing to compact, no range index
// is created and nil is returned.
//
// Errors from loading the parameters or the committed state are returned
// as-is; an error from generating the checkpoint is returned wrapped.
func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) error {
	params, err := e.getParameters(ctx)
	if err != nil {
		return err
	}

	state, err := e.committedState(ctx, 0)
	if err != nil {
		return err
	}

	// getRangeToCompact yields the upper end of the range first, then the lower end.
	lastSettled, firstUncompacted, shouldCompact := getRangeToCompact(state, *params)
	if !shouldCompact {
		e.log.Debug("not generating range checkpoint")

		return nil
	}

	err = e.generateRangeCheckpointFromCommittedState(ctx, state, firstUncompacted, lastSettled)
	if err != nil {
		return errors.Wrap(err, "unable to generate full checkpoint, performance will be affected")
	}

	return nil
}
func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
return
}
firstNonRangeCompacted := 0
if len(cs.LongestRangeCheckpointSets) > 0 {
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1
}
if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency {
e.log.Debugf("not generating range checkpoint")
latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p)
if !compact {
e.log.Debug("not generating range checkpoint")
return
}
@@ -624,6 +643,24 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
})
}
// getRangeToCompact decides whether a new range checkpoint should be
// generated for the given snapshot and parameters.
//
// The results are, in order:
//   - latestSettled: the upper end of the range, computed as
//     cs.WriteEpoch - numUnsettledEpochs;
//   - firstNonRangeCompacted: the lower end of the range, which is one past
//     the MaxEpoch of the last element of cs.LongestRangeCheckpointSets, or 0
//     when that slice is empty;
//   - compactRange: true only when latestSettled is non-negative and the
//     distance between the two ends is at least p.FullCheckpointFrequency.
//
// When compactRange is false, both epoch values are -1.
func getRangeToCompact(cs CurrentSnapshot, p Parameters) (latestSettled, firstNonRangeCompacted int, compactRange bool) {
	latestSettled = cs.WriteEpoch - numUnsettledEpochs
	if latestSettled < 0 {
		return -1, -1, false
	}

	// firstNonRangeCompacted keeps its zero value when no range checkpoint exists yet.
	if rangeSetsLen := len(cs.LongestRangeCheckpointSets); rangeSetsLen > 0 {
		firstNonRangeCompacted = cs.LongestRangeCheckpointSets[rangeSetsLen-1].MaxEpoch + 1
	}

	if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency {
		return -1, -1, false
	}

	return latestSettled, firstNonRangeCompacted, true
}
func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
// TODO: implement me
_ = cs

View File

@@ -941,6 +941,224 @@ func TestMaybeCompactSingleEpoch(t *testing.T) {
require.Len(t, cs.SingleEpochCompactionSets, newestEpochToCompact)
}
// TestMaybeGenerateRangeCheckpoint_Empty checks that calling
// MaybeGenerateRangeCheckpoint on a freshly created test environment, before
// any index has been written, succeeds.
func TestMaybeGenerateRangeCheckpoint_Empty(t *testing.T) {
	t.Parallel()

	env := newTestEnv(t)
	ctx := testlogging.Context(t)

	// this should be a no-op
	require.NoError(t, env.mgr.MaybeGenerateRangeCheckpoint(ctx))
}
// TestMaybeGenerateRangeCheckpoint_GetParametersError replaces the manager's
// parameter provider with one that fails and checks that
// MaybeGenerateRangeCheckpoint surfaces that same error.
func TestMaybeGenerateRangeCheckpoint_GetParametersError(t *testing.T) {
	t.Parallel()

	env := newTestEnv(t)
	ctx := testlogging.Context(t)

	wantErr := errors.New("no parameters error")
	env.mgr.paramProvider = faultyParamsProvider{err: wantErr}

	gotErr := env.mgr.MaybeGenerateRangeCheckpoint(ctx)

	require.Error(t, gotErr)
	require.ErrorIs(t, gotErr, wantErr)
}
// TestMaybeGenerateRangeCheckpoint_FailToReadState checks that
// MaybeGenerateRangeCheckpoint returns an error when it is invoked with a
// context that has already been canceled.
func TestMaybeGenerateRangeCheckpoint_FailToReadState(t *testing.T) {
	t.Parallel()

	env := newTestEnv(t)
	env.mgr.allowCleanupWritesOnIndexLoad = false

	// Cancel up front so the call runs with a dead context; presumably this
	// makes loading the committed state fail, as the test name suggests.
	ctx, cancel := context.WithCancel(testlogging.Context(t))
	cancel()

	require.Error(t, env.mgr.MaybeGenerateRangeCheckpoint(ctx))
}
// TestMaybeGenerateRangeCheckpoint_CompactionError writes enough index blobs
// to advance the write epoch FullCheckpointFrequency+3 times, installs a
// compaction function that always fails, and checks that
// MaybeGenerateRangeCheckpoint reports that failure.
func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) {
	t.Parallel()

	env := newTestEnv(t)
	env.mgr.allowCleanupWritesOnIndexLoad = false

	ctx := testlogging.Context(t)

	params, err := env.mgr.getParameters(ctx)
	require.NoError(t, err)

	wantEpochs := params.FullCheckpointFrequency + 3
	perEpoch := params.GetEpochAdvanceOnCountThreshold()
	lastInEpoch := perEpoch - 1

	// nextEntry gives every written index a distinct entry value.
	nextEntry := 0

	// Create sufficient indexes blobs and move clock forward to advance epoch.
	for epoch := 0; epoch < wantEpochs; epoch++ {
		for n := 0; n < perEpoch; n++ {
			if n == lastInEpoch {
				// Advance the time so that the difference in times for writes will force
				// new epochs.
				env.ft.Advance(params.MinEpochDuration + time.Hour)
			}

			env.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(nextEntry))
			nextEntry++
		}

		require.NoError(t, env.mgr.MaybeAdvanceWriteEpoch(ctx))
		require.NoError(t, env.mgr.Refresh(ctx))
	}

	snapshot, err := env.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, wantEpochs, snapshot.WriteEpoch)

	// Make every compaction attempt fail with a recognizable error.
	wantErr := errors.New("test compaction error")
	env.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
		return wantErr
	}

	gotErr := env.mgr.MaybeGenerateRangeCheckpoint(ctx)

	require.Error(t, gotErr)
	require.ErrorIs(t, gotErr, wantErr)
}
// TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs writes enough index
// blobs to advance the write epoch FullCheckpointFrequency+3 times, without
// running any single-epoch compaction, and checks that
// MaybeGenerateRangeCheckpoint produces exactly one range checkpoint set
// while leaving the write epoch unchanged.
func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) {
	t.Parallel()

	te := newTestEnv(t)
	te.mgr.allowCleanupWritesOnIndexLoad = false

	ctx := testlogging.Context(t)

	p, err := te.mgr.getParameters(ctx)
	require.NoError(t, err)

	// k is the entry value for the next index written; it is incremented
	// after every write so that each index gets a distinct entry.
	var k int

	epochsToWrite := p.FullCheckpointFrequency + 3
	idxCount := p.GetEpochAdvanceOnCountThreshold()

	// Create sufficient indexes blobs and move clock forward to advance epoch.
	for j := 0; j < epochsToWrite; j++ {
		for i := 0; i < idxCount; i++ {
			if i == idxCount-1 {
				// Advance the time so that the difference in times for writes will force
				// new epochs.
				te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
			}

			te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k))
			k++
		}

		err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
		require.NoError(t, err)

		err = te.mgr.Refresh(ctx)
		require.NoError(t, err)
	}

	// Precondition: all epochs were written and no range checkpoint exists yet.
	cs, err := te.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, epochsToWrite, cs.WriteEpoch)
	require.Empty(t, cs.LongestRangeCheckpointSets)

	err = te.mgr.MaybeGenerateRangeCheckpoint(ctx)
	require.NoError(t, err)

	// Refresh so that the state read below reflects the newly written checkpoint.
	err = te.mgr.Refresh(ctx)
	require.NoError(t, err)

	cs, err = te.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, epochsToWrite, cs.WriteEpoch)
	require.Len(t, cs.LongestRangeCheckpointSets, 1)
}
// TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs writes enough index
// blobs to advance the write epoch FullCheckpointFrequency+3 times, runs
// single-epoch compaction repeatedly, and then checks that
// MaybeGenerateRangeCheckpoint produces exactly one range checkpoint set
// while leaving the write epoch unchanged.
func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
	t.Parallel()

	te := newTestEnv(t)
	te.mgr.allowCleanupWritesOnIndexLoad = false

	ctx := testlogging.Context(t)

	p, err := te.mgr.getParameters(ctx)
	require.NoError(t, err)

	// k is the entry value for the next index written; it is incremented
	// after every write so that each index gets a distinct entry.
	var k int

	epochsToWrite := p.FullCheckpointFrequency + 3
	idxCount := p.GetEpochAdvanceOnCountThreshold()

	// Create sufficient indexes blobs and move clock forward to advance epoch.
	for j := 0; j < epochsToWrite; j++ {
		for i := 0; i < idxCount; i++ {
			if i == idxCount-1 {
				// Advance the time so that the difference in times for writes will force
				// new epochs.
				te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
			}

			te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k))
			k++
		}

		err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
		require.NoError(t, err)

		err = te.mgr.Refresh(ctx)
		require.NoError(t, err)
	}

	cs, err := te.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, epochsToWrite, cs.WriteEpoch)

	// perform single-epoch compaction for settled epochs
	newestEpochToCompact := cs.WriteEpoch - numUnsettledEpochs + 1
	for j := 0; j < newestEpochToCompact; j++ {
		err = te.mgr.MaybeCompactSingleEpoch(ctx)
		require.NoError(t, err)

		err = te.mgr.Refresh(ctx) // force state refresh
		require.NoError(t, err)

		// Each call is expected to add exactly one single-epoch compaction set.
		cs, err = te.mgr.Current(ctx)
		require.NoError(t, err)
		require.Len(t, cs.SingleEpochCompactionSets, j+1)
	}

	// Precondition: all epochs were written and no range checkpoint exists yet.
	cs, err = te.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, epochsToWrite, cs.WriteEpoch)
	require.Empty(t, cs.LongestRangeCheckpointSets)

	err = te.mgr.MaybeGenerateRangeCheckpoint(ctx)
	require.NoError(t, err)

	// Refresh so that the state read below reflects the newly written checkpoint.
	err = te.mgr.Refresh(ctx)
	require.NoError(t, err)

	cs, err = te.mgr.Current(ctx)

	require.NoError(t, err)
	require.Equal(t, epochsToWrite, cs.WriteEpoch)
	require.Len(t, cs.LongestRangeCheckpointSets, 1)
}
func TestValidateParameters(t *testing.T) {
cases := []struct {
p Parameters