diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 7724077b3..3ee0bb325 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -6,8 +6,6 @@ import ( "context" "fmt" - "os" - "strings" "sync" "sync/atomic" "time" @@ -16,7 +14,6 @@ "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/completeset" - "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" @@ -188,8 +185,6 @@ type Manager struct { log logging.Logger timeFunc func() time.Time - allowCleanupWritesOnIndexLoad bool - // wait group that waits for all compaction and cleanup goroutines. backgroundWork sync.WaitGroup @@ -474,39 +469,9 @@ func (e *Manager) refreshLocked(ctx context.Context) error { } } - return e.maybeCompactAndCleanupLocked(ctx, p) -} - -func (e *Manager) maybeCompactAndCleanupLocked(ctx context.Context, p *Parameters) error { - if !e.allowWritesOnLoad() { - e.log.Debug("not performing epoch index cleanup") - - return nil - } - - cs := e.lastKnownState - - if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) { - if err := e.advanceEpochMarker(ctx, cs); err != nil { - return errors.Wrap(err, "error advancing epoch") - } - } - - e.maybeGenerateNextRangeCheckpointAsync(ctx, cs, p) - e.maybeStartCleanupAsync(ctx, cs, p) - e.maybeOptimizeRangeCheckpointsAsync(ctx, cs) - return nil } -// allowWritesOnLoad returns whether writes for index cleanup operations, -// such as index compaction, can be done during index reads. -// These index cleanup operations are disabled when using read-only storage -// since they will fail when they try to mutate the underlying storage. -func (e *Manager) allowWritesOnLoad() bool { - return e.allowCleanupWritesOnIndexLoad && !e.st.IsReadOnly() -} - func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error { blobs, err := blob.ListAllBlobs(ctx, e.st, EpochMarkerIndexBlobPrefix) if err != nil { @@ -621,28 +586,6 @@ func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) error { return nil } -func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) { - latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p) - if !compact { - e.log.Debug("not generating range checkpoint") - - return - } - - e.log.Debugf("generating range checkpoint") - - e.backgroundWork.Add(1) - - // we're starting background work, ignore parent cancellation signal. - ctxutil.GoDetached(ctx, func(ctx context.Context) { - defer e.backgroundWork.Done() - - if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil { - e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err) - } - }) -} - func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compactRange bool) { latestSettled := cs.WriteEpoch - numUnsettledEpochs if latestSettled < 0 { @@ -661,24 +604,6 @@ func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compact return latestSettled, firstNonRangeCompacted, true } -func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) { - // TODO: implement me - _ = cs -} - -func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) { - e.backgroundWork.Add(1) - - // we're starting background work, ignore parent cancellation signal. - ctxutil.GoDetached(ctx, func(ctx context.Context) { - defer e.backgroundWork.Done() - - if err := e.cleanupInternal(ctx, cs, p); err != nil { - e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err) - } - }) -} - func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) { var mu sync.Mutex @@ -1077,21 +1002,6 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna uncompactedBlobs = ue } - if epochSettled && e.allowWritesOnLoad() { - e.backgroundWork.Add(1) - - // we're starting background work, ignore parent cancellation signal. - ctxutil.GoDetached(ctx, func(ctx context.Context) { - defer e.backgroundWork.Done() - - e.log.Debugf("starting single-epoch compaction of %v", epoch) - - if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil { - e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err) - } - }) - } - // return uncompacted blobs to the caller while we're compacting them in background return uncompactedBlobs, nil } @@ -1128,23 +1038,16 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID { return blob.ID(fmt.Sprintf("%v%v_%v_", RangeCheckpointIndexBlobPrefix, epoch1, epoch2)) } -func allowWritesOnIndexLoad() bool { - v := strings.ToLower(os.Getenv("KOPIA_ALLOW_WRITE_ON_INDEX_LOAD")) - - return v == "true" || v == "1" -} - // NewManager creates new epoch manager. func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager { return &Manager{ - st: st, - log: log, - compact: compactor, - timeFunc: timeNow, - paramProvider: paramProvider, - allowCleanupWritesOnIndexLoad: allowWritesOnIndexLoad(), - getCompleteIndexSetTooSlow: new(int32), - committedStateRefreshTooSlow: new(int32), - writeIndexTooSlow: new(int32), + st: st, + log: log, + compact: compactor, + timeFunc: timeNow, + paramProvider: paramProvider, + getCompleteIndexSetTooSlow: new(int32), + committedStateRefreshTooSlow: new(int32), + writeIndexTooSlow: new(int32), } } diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 9b4c37ca6..fb1605601 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -635,7 +635,6 @@ func TestMaybeAdvanceEpoch(t *testing.T) { te := newTestEnv(t) // Disable automatic epoch advancement and compaction to build up state - te.mgr.allowCleanupWritesOnIndexLoad = false te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error { return nil } @@ -709,7 +708,6 @@ func TestMaybeAdvanceEpoch_Error(t *testing.T) { te := newTestEnv(t) // Disable automatic epoch advancement and compaction to build up state - te.mgr.allowCleanupWritesOnIndexLoad = false te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error { return nil } @@ -952,7 +950,6 @@ func TestMaybeCompactSingleEpoch(t *testing.T) { te := newTestEnv(t) ctx := testlogging.Context(t) - te.mgr.allowCleanupWritesOnIndexLoad = false p, err := te.mgr.getParameters(ctx) require.NoError(t, err) @@ -1056,7 +1053,6 @@ func TestMaybeGenerateRangeCheckpoint_FailToReadState(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) ctx, cancel := context.WithCancel(ctx) @@ -1072,7 +1068,6 @@ func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) p, err := te.mgr.getParameters(ctx) @@ -1123,7 +1118,6 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) p, err := te.mgr.getParameters(ctx) @@ -1175,7 +1169,6 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) p, err := te.mgr.getParameters(ctx) @@ -1321,7 +1314,6 @@ func TestCleanupMarkers_Empty(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) // this should be a no-op @@ -1335,7 +1327,6 @@ func TestCleanupMarkers_GetParametersError(t *testing.T) { te := newTestEnv(t) ctx := testlogging.Context(t) - te.mgr.allowCleanupWritesOnIndexLoad = false paramsError := errors.New("no parameters error") te.mgr.paramProvider = faultyParamsProvider{err: paramsError} @@ -1350,7 +1341,6 @@ func TestCleanupMarkers_FailToReadState(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx, cancel := context.WithCancel(testlogging.Context(t)) te.ft.Advance(1 * time.Hour) // force state refresh in CleanupMarkers @@ -1365,7 +1355,6 @@ func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) te.mgr.forceAdvanceEpoch(ctx) @@ -1394,7 +1383,6 @@ func TestCleanupMarkers_CleanUpManyMarkers(t *testing.T) { t.Parallel() te := newTestEnv(t) - te.mgr.allowCleanupWritesOnIndexLoad = false ctx := testlogging.Context(t) p, err := te.mgr.getParameters(ctx)