refactor(general): add epoch.Manager.MaybeAdvanceEpoch helper (#3709)

Add
- TestMabyeAdvanceEpoch
- TestMabyeAdvanceEpoch_Empty
- TestMaybeAdvanceEpoch_Error
- TestMaybeAdvanceEpoch_GetParametersError

Ref:
- #3638
- #3645
- #3651
This commit is contained in:
Julio López
2024-03-06 17:12:49 -08:00
committed by GitHub
parent 9a689d4318
commit be49fcd42b
2 changed files with 179 additions and 0 deletions

View File

@@ -723,6 +723,25 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
return nil
}
// MaybeAdvanceWriteEpoch writes a new write epoch marker when a new write
// epoch should be started, otherwise it does not do anything.
func (e *Manager) MaybeAdvanceWriteEpoch(ctx context.Context) error {
p, err := e.getParameters(ctx)
if err != nil {
return err
}
e.mu.Lock()
cs := e.lastKnownState
e.mu.Unlock()
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) {
return errors.Wrap(e.advanceEpochMarker(ctx, cs), "error advancing epoch")
}
return nil
}
func (e *Manager) advanceEpochMarker(ctx context.Context, cs CurrentSnapshot) error {
blobID := blob.ID(fmt.Sprintf("%v%v", string(EpochMarkerIndexBlobPrefix), cs.WriteEpoch+1))

View File

@@ -529,6 +529,141 @@ func TestSlowWrite_MovesToNextEpochTwice(t *testing.T) {
require.Contains(t, err.Error(), "slow index write")
}
func TestMaybeAdvanceEpoch_Empty(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
ctx := testlogging.Context(t)
te.verifyCurrentWriteEpoch(t, 0)
// this should be a no-op
err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
// check current epoch again
te.verifyCurrentWriteEpoch(t, 0)
}
func TestMaybeAdvanceEpoch(t *testing.T) {
t.Parallel()
ctx := testlogging.Context(t)
te := newTestEnv(t)
// Disable automatic epoch advancement and compaction to build up state
te.mgr.allowCleanupWritesOnIndexLoad = false
te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
return nil
}
te.verifyCurrentWriteEpoch(t, 0)
p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)
idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for i := 0; i < idxCount; i++ {
te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i))
}
te.verifyCurrentWriteEpoch(t, 0)
// Advance the time so that the difference in times for writes will force
// new epochs.
te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
err = te.mgr.Refresh(ctx)
require.NoError(t, err)
te.verifyCurrentWriteEpoch(t, 0)
// one more to go over the threshold
te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(idxCount))
err = te.mgr.Refresh(ctx)
require.NoError(t, err)
te.verifyCurrentWriteEpoch(t, 0)
err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)
err = te.mgr.Refresh(ctx) // force state refresh
require.NoError(t, err)
te.verifyCurrentWriteEpoch(t, 1)
}
type faultyParamsProvider struct {
err error
}
func (p faultyParamsProvider) GetParameters(ctx context.Context) (*Parameters, error) {
return nil, p.err
}
func TestMaybeAdvanceEpoch_GetParametersError(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
ctx := testlogging.Context(t)
paramsError := errors.New("no parameters error")
te.mgr.paramProvider = faultyParamsProvider{err: paramsError}
err := te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.Error(t, err)
require.ErrorIs(t, err, paramsError)
}
func TestMaybeAdvanceEpoch_Error(t *testing.T) {
t.Parallel()
ctx := testlogging.Context(t)
te := newTestEnv(t)
// Disable automatic epoch advancement and compaction to build up state
te.mgr.allowCleanupWritesOnIndexLoad = false
te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
return nil
}
te.verifyCurrentWriteEpoch(t, 0)
p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)
idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for i := 0; i < idxCount; i++ {
te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i))
}
// Advance the time so that the difference in times for writes will force
// new epochs.
te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
// one more to go over the threshold
te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(idxCount))
err = te.mgr.Refresh(ctx)
require.NoError(t, err)
te.verifyCurrentWriteEpoch(t, 0)
berr := errors.New("advance epoch put blob error")
te.faultyStorage.AddFaults(blobtesting.MethodPutBlob,
fault.New().ErrorInstead(berr))
err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.Error(t, err)
require.ErrorIs(t, err, berr)
}
func TestForceAdvanceEpoch(t *testing.T) {
te := newTestEnv(t)
@@ -825,3 +960,28 @@ func (e *Manager) forceAdvanceEpoch(ctx context.Context) error {
return nil
}
func (te *epochManagerTestEnv) verifyCurrentWriteEpoch(t *testing.T, expectedEpoch int) {
t.Helper()
// load current epoch directly from index blobs in the backend storage
cs := CurrentSnapshot{
WriteEpoch: 0,
EpochStartTime: map[int]time.Time{},
UncompactedEpochSets: map[int][]blob.Metadata{},
SingleEpochCompactionSets: map[int][]blob.Metadata{},
}
ctx := testlogging.Context(t)
err := te.mgr.loadWriteEpoch(ctx, &cs)
require.NoError(t, err)
require.Equal(t, expectedEpoch, cs.WriteEpoch)
// check current epoch via the epoch manager, this may or may not cause
// a refresh from storage.
cs, err = te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, expectedEpoch, cs.WriteEpoch)
}