From be49fcd42b7fea6befddb66815e7f60299e202bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Wed, 6 Mar 2024 17:12:49 -0800 Subject: [PATCH] refactor(general): add epoch.Manager.MaybeAdvanceEpoch helper (#3709) Add - TestMaybeAdvanceEpoch - TestMaybeAdvanceEpoch_Empty - TestMaybeAdvanceEpoch_Error - TestMaybeAdvanceEpoch_GetParametersError Ref: - #3638 - #3645 - #3651 --- internal/epoch/epoch_manager.go | 19 ++++ internal/epoch/epoch_manager_test.go | 160 +++++++++++++++++++++++++++ 2 files changed, 179 insertions(+) diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 4fc617523..1546a71f1 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -723,6 +723,25 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { return nil } +// MaybeAdvanceWriteEpoch writes a new write epoch marker when a new write +// epoch should be started, otherwise it does not do anything. 
+func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) error { + p, err := e.getParameters(ctx) + if err != nil { + return err + } + + e.mu.Lock() + cs := e.lastKnownState + e.mu.Unlock() + + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) { + return errors.Wrap(e.advanceEpochMarker(ctx, cs), "error advancing epoch") + } + + return nil +} + func (e *Manager) advanceEpochMarker(ctx context.Context, cs CurrentSnapshot) error { blobID := blob.ID(fmt.Sprintf("%v%v", string(EpochMarkerIndexBlobPrefix), cs.WriteEpoch+1)) diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 30ad25d96..2c15c851a 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -529,6 +529,141 @@ func TestSlowWrite_MovesToNextEpochTwice(t *testing.T) { require.Contains(t, err.Error(), "slow index write") } +func TestMaybeAdvanceEpoch_Empty(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + ctx := testlogging.Context(t) + + te.verifyCurrentWriteEpoch(t, 0) + + // this should be a no-op + err := te.mgr.MaybeAdvanceWriteEpoch(ctx) + + require.NoError(t, err) + + // check current epoch again + te.verifyCurrentWriteEpoch(t, 0) +} + +func TestMaybeAdvanceEpoch(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + te := newTestEnv(t) + + // Disable automatic epoch advancement and compaction to build up state + te.mgr.allowCleanupWritesOnIndexLoad = false + te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error { + return nil + } + + te.verifyCurrentWriteEpoch(t, 0) + + p, err := te.mgr.getParameters(ctx) + require.NoError(t, err) + + idxCount := p.GetEpochAdvanceOnCountThreshold() + // Create sufficient indexes blobs and move clock forward to advance epoch. 
+ for i := 0; i < idxCount; i++ { + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i)) + } + + te.verifyCurrentWriteEpoch(t, 0) + + // Advance the time so that the difference in times for writes will force + // new epochs. + te.ft.Advance(p.MinEpochDuration + 1*time.Hour) + + err = te.mgr.Refresh(ctx) + require.NoError(t, err) + + te.verifyCurrentWriteEpoch(t, 0) + + // one more to go over the threshold + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(idxCount)) + err = te.mgr.Refresh(ctx) + + require.NoError(t, err) + te.verifyCurrentWriteEpoch(t, 0) + + err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + + require.NoError(t, err) + + err = te.mgr.Refresh(ctx) // force state refresh + + require.NoError(t, err) + te.verifyCurrentWriteEpoch(t, 1) +} + +type faultyParamsProvider struct { + err error +} + +func (p faultyParamsProvider) GetParameters(ctx context.Context) (*Parameters, error) { + return nil, p.err +} + +func TestMaybeAdvanceEpoch_GetParametersError(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + ctx := testlogging.Context(t) + + paramsError := errors.New("no parameters error") + te.mgr.paramProvider = faultyParamsProvider{err: paramsError} + + err := te.mgr.MaybeAdvanceWriteEpoch(ctx) + + require.Error(t, err) + require.ErrorIs(t, err, paramsError) +} + +func TestMaybeAdvanceEpoch_Error(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + te := newTestEnv(t) + + // Disable automatic epoch advancement and compaction to build up state + te.mgr.allowCleanupWritesOnIndexLoad = false + te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error { + return nil + } + + te.verifyCurrentWriteEpoch(t, 0) + + p, err := te.mgr.getParameters(ctx) + require.NoError(t, err) + + idxCount := p.GetEpochAdvanceOnCountThreshold() + // Create sufficient indexes blobs and move clock forward to advance epoch. 
+ for i := 0; i < idxCount; i++ { + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i)) + } + + // Advance the time so that the difference in times for writes will force + // new epochs. + te.ft.Advance(p.MinEpochDuration + 1*time.Hour) + + // one more to go over the threshold + te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(idxCount)) + err = te.mgr.Refresh(ctx) + + require.NoError(t, err) + te.verifyCurrentWriteEpoch(t, 0) + + berr := errors.New("advance epoch put blob error") + te.faultyStorage.AddFaults(blobtesting.MethodPutBlob, + fault.New().ErrorInstead(berr)) + + err = te.mgr.MaybeAdvanceWriteEpoch(ctx) + + require.Error(t, err) + require.ErrorIs(t, err, berr) +} + func TestForceAdvanceEpoch(t *testing.T) { te := newTestEnv(t) @@ -825,3 +960,28 @@ func (e *Manager) forceAdvanceEpoch(ctx context.Context) error { return nil } + +func (te *epochManagerTestEnv) verifyCurrentWriteEpoch(t *testing.T, expectedEpoch int) { + t.Helper() + + // load current epoch directly from index blobs in the backend storage + cs := CurrentSnapshot{ + WriteEpoch: 0, + EpochStartTime: map[int]time.Time{}, + UncompactedEpochSets: map[int][]blob.Metadata{}, + SingleEpochCompactionSets: map[int][]blob.Metadata{}, + } + + ctx := testlogging.Context(t) + err := te.mgr.loadWriteEpoch(ctx, &cs) + + require.NoError(t, err) + require.Equal(t, expectedEpoch, cs.WriteEpoch) + + // check current epoch via the epoch manager, this may or may not cause + // a refresh from storage. + cs, err = te.mgr.Current(ctx) + + require.NoError(t, err) + require.Equal(t, expectedEpoch, cs.WriteEpoch) +}