From fdb6d3c09737bb53efaea08f28b630938d05b398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:29:11 -0700 Subject: [PATCH] refactor(general): add epoch.Manager.MaybeGenerateRangeCheckpoint (#3727) * epoch manager: factor out getRangeToCompact * epoch manager: add epoch.Manager.MaybeGenerateRangeCheckpoint * test epoch.Manager.MaybeGenerateRangeCheckpoint --- internal/epoch/epoch_manager.go | 61 ++++++-- internal/epoch/epoch_manager_test.go | 218 +++++++++++++++++++++++++++ 2 files changed, 267 insertions(+), 12 deletions(-) diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 98810601d..ddd4bf8d5 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -593,19 +593,38 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSna return nil } +// MaybeGenerateRangeCheckpoint may create a new range index for all the +// individual epochs covered by the new range. If there are not enough epochs +// to create a new range, then a range index is not created. +func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) error { + p, err := e.getParameters(ctx) + if err != nil { + return err + } + + cs, err := e.committedState(ctx, 0) + if err != nil { + return err + } + + latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p) + if !compact { + e.log.Debug("not generating range checkpoint") + + return nil + } + + if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil { + return errors.Wrap(err, "unable to generate full checkpoint, performance will be affected") + } + + return nil +} + func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) { - latestSettled := cs.WriteEpoch - numUnsettledEpochs - if latestSettled < 0 { - return - } - - firstNonRangeCompacted := 0 - if len(cs.LongestRangeCheckpointSets) > 0 { - firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1 - } - - if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency { - e.log.Debugf("not generating range checkpoint") + latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p) + if !compact { + e.log.Debug("not generating range checkpoint") return } @@ -624,6 +643,24 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs }) } +func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compactRange bool) { + latestSettled := cs.WriteEpoch - numUnsettledEpochs + if latestSettled < 0 { + return -1, -1, false + } + + firstNonRangeCompacted := 0 + if rangeSetsLen := len(cs.LongestRangeCheckpointSets); rangeSetsLen > 0 { + firstNonRangeCompacted = cs.LongestRangeCheckpointSets[rangeSetsLen-1].MaxEpoch + 1 + } + + if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency { + return -1, -1, false + } + + return latestSettled, firstNonRangeCompacted, true +} + func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) { // TODO: implement me _ = cs diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index f669c042f..811082d91 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -941,6 +941,224 @@ func TestMaybeCompactSingleEpoch(t *testing.T) { require.Len(t, cs.SingleEpochCompactionSets, newestEpochToCompact) } +func TestMaybeGenerateRangeCheckpoint_Empty(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + ctx := testlogging.Context(t) + + // this should be a no-op + err := te.mgr.MaybeGenerateRangeCheckpoint(ctx) + + require.NoError(t, err) +} + +func TestMaybeGenerateRangeCheckpoint_GetParametersError(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + ctx := testlogging.Context(t) + + paramsError := errors.New("no parameters error") + te.mgr.paramProvider = faultyParamsProvider{err: paramsError} + + err := te.mgr.MaybeGenerateRangeCheckpoint(ctx) + + require.Error(t, err) + require.ErrorIs(t, err, paramsError) +} + +func TestMaybeGenerateRangeCheckpoint_FailToReadState(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + te.mgr.allowCleanupWritesOnIndexLoad = false + ctx := testlogging.Context(t) + + ctx, cancel := context.WithCancel(ctx) + + cancel() + + err := te.mgr.MaybeGenerateRangeCheckpoint(ctx) + + require.Error(t, err) +} + +func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + te.mgr.allowCleanupWritesOnIndexLoad = false + ctx := testlogging.Context(t) + + p, err := te.mgr.getParameters(ctx) + require.NoError(t, err) + + epochsToWrite := p.FullCheckpointFrequency + 3 + idxCount := p.GetEpochAdvanceOnCountThreshold() + + var k int + + // Create sufficient indexes blobs and move clock forward to advance epoch. + for j := 0; j < epochsToWrite; j++ { + for i := 0; i < idxCount; i++ { + if i == idxCount-1 { + // Advance the time so that the difference in times for writes will force + // new epochs. + te.ft.Advance(p.MinEpochDuration + 1*time.Hour) + } + + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k)) + k++ + } + + err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + } + + cs, err := te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, epochsToWrite, cs.WriteEpoch) + + compactionError := errors.New("test compaction error") + te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error { + return compactionError + } + + err = te.mgr.MaybeGenerateRangeCheckpoint(ctx) + + require.Error(t, err) + require.ErrorIs(t, err, compactionError) +} + +func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + te.mgr.allowCleanupWritesOnIndexLoad = false + ctx := testlogging.Context(t) + + p, err := te.mgr.getParameters(ctx) + require.NoError(t, err) + + var k int + + epochsToWrite := p.FullCheckpointFrequency + 3 + idxCount := p.GetEpochAdvanceOnCountThreshold() + // Create sufficient indexes blobs and move clock forward to advance epoch. + for j := 0; j < epochsToWrite; j++ { + for i := 0; i < idxCount; i++ { + if i == idxCount-1 { + // Advance the time so that the difference in times for writes will force + // new epochs. + te.ft.Advance(p.MinEpochDuration + 1*time.Hour) + } + + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k)) + } + + err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + } + + cs, err := te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, epochsToWrite, cs.WriteEpoch) + require.Empty(t, cs.LongestRangeCheckpointSets) + + err = te.mgr.MaybeGenerateRangeCheckpoint(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + + cs, err = te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, epochsToWrite, cs.WriteEpoch) + require.Len(t, cs.LongestRangeCheckpointSets, 1) +} + +func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + te.mgr.allowCleanupWritesOnIndexLoad = false + ctx := testlogging.Context(t) + + p, err := te.mgr.getParameters(ctx) + require.NoError(t, err) + + var k int + + epochsToWrite := p.FullCheckpointFrequency + 3 + idxCount := p.GetEpochAdvanceOnCountThreshold() + // Create sufficient indexes blobs and move clock forward to advance epoch. + for j := 0; j < epochsToWrite; j++ { + for i := 0; i < idxCount; i++ { + if i == idxCount-1 { + // Advance the time so that the difference in times for writes will force + // new epochs. + te.ft.Advance(p.MinEpochDuration + 1*time.Hour) + } + + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k)) + } + + err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + } + + cs, err := te.mgr.Current(ctx) + require.NoError(t, err) + + require.Equal(t, epochsToWrite, cs.WriteEpoch) + + // perform single-epoch compaction for settled epochs + newestEpochToCompact := cs.WriteEpoch - numUnsettledEpochs + 1 + for j := 0; j < newestEpochToCompact; j++ { + err = te.mgr.MaybeCompactSingleEpoch(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) // force state refresh + require.NoError(t, err) + + cs, err = te.mgr.Current(ctx) + require.NoError(t, err) + + require.Len(t, cs.SingleEpochCompactionSets, j+1) + } + + cs, err = te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, epochsToWrite, cs.WriteEpoch) + require.Empty(t, cs.LongestRangeCheckpointSets) + + err = te.mgr.MaybeGenerateRangeCheckpoint(ctx) + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + + cs, err = te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, epochsToWrite, cs.WriteEpoch) + require.Len(t, cs.LongestRangeCheckpointSets, 1) +} + func TestValidateParameters(t *testing.T) { cases := []struct { p Parameters