refactor(general): remove ability to enable compaction on index load (#3834)

Cleanup.

- Fixes: #3638
- #3639
This commit is contained in:
Julio López
2024-05-01 14:33:46 -07:00
committed by GitHub
parent f4b2034898
commit ad06bb20b1
2 changed files with 8 additions and 117 deletions

View File

@@ -6,8 +6,6 @@
import (
"context"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
@@ -16,7 +14,6 @@
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/completeset"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
@@ -188,8 +185,6 @@ type Manager struct {
log logging.Logger
timeFunc func() time.Time
allowCleanupWritesOnIndexLoad bool
// wait group that waits for all compaction and cleanup goroutines.
backgroundWork sync.WaitGroup
@@ -474,39 +469,9 @@ func (e *Manager) refreshLocked(ctx context.Context) error {
}
}
return e.maybeCompactAndCleanupLocked(ctx, p)
}
// maybeCompactAndCleanupLocked kicks off background epoch maintenance
// (epoch advancement, range checkpointing, cleanup) after an index refresh.
// It is a no-op when writes-on-load are disabled (see allowWritesOnLoad).
// Callers must hold the manager's lock; cs is read from e.lastKnownState.
func (e *Manager) maybeCompactAndCleanupLocked(ctx context.Context, p *Parameters) error {
if !e.allowWritesOnLoad() {
e.log.Debug("not performing epoch index cleanup")
return nil
}
cs := e.lastKnownState
// Advance the write epoch first if the current uncompacted set has grown
// past the configured duration/count/size thresholds.
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) {
if err := e.advanceEpochMarker(ctx, cs); err != nil {
return errors.Wrap(err, "error advancing epoch")
}
}
// The remaining maintenance steps run asynchronously in detached goroutines
// tracked by e.backgroundWork; failures there are logged, not returned.
e.maybeGenerateNextRangeCheckpointAsync(ctx, cs, p)
e.maybeStartCleanupAsync(ctx, cs, p)
e.maybeOptimizeRangeCheckpointsAsync(ctx, cs)
return nil
}
// allowWritesOnLoad returns whether writes for index cleanup operations,
// such as index compaction, can be done during index reads.
// These index cleanup operations are disabled when using read-only storage
// since they will fail when they try to mutate the underlying storage.
// The allowCleanupWritesOnIndexLoad flag is set at construction time
// (from the KOPIA_ALLOW_WRITE_ON_INDEX_LOAD environment variable).
func (e *Manager) allowWritesOnLoad() bool {
return e.allowCleanupWritesOnIndexLoad && !e.st.IsReadOnly()
}
func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error {
blobs, err := blob.ListAllBlobs(ctx, e.st, EpochMarkerIndexBlobPrefix)
if err != nil {
@@ -621,28 +586,6 @@ func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) error {
return nil
}
// maybeGenerateNextRangeCheckpointAsync starts a background goroutine that
// produces a range checkpoint covering [firstNonRangeCompacted, latestSettled]
// when getRangeToCompact decides one is due; otherwise it returns immediately.
// The goroutine is registered with e.backgroundWork and detached from the
// parent context's cancellation; errors are logged rather than propagated.
func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p)
if !compact {
e.log.Debug("not generating range checkpoint")
return
}
e.log.Debugf("generating range checkpoint")
e.backgroundWork.Add(1)
// we're starting background work, ignore parent cancellation signal.
ctxutil.GoDetached(ctx, func(ctx context.Context) {
defer e.backgroundWork.Done()
if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil {
// Best-effort: a failed checkpoint only degrades performance,
// it does not lose data, so log and continue.
e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err)
}
})
}
func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compactRange bool) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
@@ -661,24 +604,6 @@ func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compact
return latestSettled, firstNonRangeCompacted, true
}
// maybeOptimizeRangeCheckpointsAsync is a placeholder for future optimization
// of existing range checkpoints; it currently does nothing.
func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
// TODO: implement me
_ = cs
}
// maybeStartCleanupAsync launches index-blob cleanup (cleanupInternal) in a
// detached background goroutine tracked by e.backgroundWork. Cleanup failures
// are logged only — they affect performance, not correctness.
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
e.backgroundWork.Add(1)
// we're starting background work, ignore parent cancellation signal.
ctxutil.GoDetached(ctx, func(ctx context.Context) {
defer e.backgroundWork.Done()
if err := e.cleanupInternal(ctx, cs, p); err != nil {
e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err)
}
})
}
func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) {
var mu sync.Mutex
@@ -1077,21 +1002,6 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna
uncompactedBlobs = ue
}
if epochSettled && e.allowWritesOnLoad() {
e.backgroundWork.Add(1)
// we're starting background work, ignore parent cancellation signal.
ctxutil.GoDetached(ctx, func(ctx context.Context) {
defer e.backgroundWork.Done()
e.log.Debugf("starting single-epoch compaction of %v", epoch)
if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil {
e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err)
}
})
}
// return uncompacted blobs to the caller while we're compacting them in background
return uncompactedBlobs, nil
}
@@ -1128,23 +1038,16 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID {
return blob.ID(fmt.Sprintf("%v%v_%v_", RangeCheckpointIndexBlobPrefix, epoch1, epoch2))
}
// allowWritesOnIndexLoad reports whether cleanup writes during index load are
// enabled via the KOPIA_ALLOW_WRITE_ON_INDEX_LOAD environment variable
// (case-insensitive "true" or "1"). Any other value, including unset,
// disables them.
func allowWritesOnIndexLoad() bool {
v := strings.ToLower(os.Getenv("KOPIA_ALLOW_WRITE_ON_INDEX_LOAD"))
return v == "true" || v == "1"
}
// NewManager creates new epoch manager.
// NOTE(review): the struct-literal fields below appear twice because this text
// is a rendered diff — the first group (containing allowCleanupWritesOnIndexLoad,
// initialized from allowWritesOnIndexLoad()) is the version removed by this
// commit, and the second group is its replacement without that field. The two
// groups were never part of one compilable function body.
func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager {
return &Manager{
st: st,
log: log,
compact: compactor,
timeFunc: timeNow,
paramProvider: paramProvider,
allowCleanupWritesOnIndexLoad: allowWritesOnIndexLoad(),
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),
st: st,
log: log,
compact: compactor,
timeFunc: timeNow,
paramProvider: paramProvider,
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),
}
}

View File

@@ -635,7 +635,6 @@ func TestMaybeAdvanceEpoch(t *testing.T) {
te := newTestEnv(t)
// Disable automatic epoch advancement and compaction to build up state
te.mgr.allowCleanupWritesOnIndexLoad = false
te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
return nil
}
@@ -709,7 +708,6 @@ func TestMaybeAdvanceEpoch_Error(t *testing.T) {
te := newTestEnv(t)
// Disable automatic epoch advancement and compaction to build up state
te.mgr.allowCleanupWritesOnIndexLoad = false
te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
return nil
}
@@ -952,7 +950,6 @@ func TestMaybeCompactSingleEpoch(t *testing.T) {
te := newTestEnv(t)
ctx := testlogging.Context(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)
@@ -1056,7 +1053,6 @@ func TestMaybeGenerateRangeCheckpoint_FailToReadState(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
ctx, cancel := context.WithCancel(ctx)
@@ -1072,7 +1068,6 @@ func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
p, err := te.mgr.getParameters(ctx)
@@ -1123,7 +1118,6 @@ func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
p, err := te.mgr.getParameters(ctx)
@@ -1175,7 +1169,6 @@ func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
p, err := te.mgr.getParameters(ctx)
@@ -1321,7 +1314,6 @@ func TestCleanupMarkers_Empty(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
// this should be a no-op
@@ -1335,7 +1327,6 @@ func TestCleanupMarkers_GetParametersError(t *testing.T) {
te := newTestEnv(t)
ctx := testlogging.Context(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
paramsError := errors.New("no parameters error")
te.mgr.paramProvider = faultyParamsProvider{err: paramsError}
@@ -1350,7 +1341,6 @@ func TestCleanupMarkers_FailToReadState(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx, cancel := context.WithCancel(testlogging.Context(t))
te.ft.Advance(1 * time.Hour) // force state refresh in CleanupMarkers
@@ -1365,7 +1355,6 @@ func TestCleanupMarkers_AvoidCleaningUpSingleEpochMarker(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
te.mgr.forceAdvanceEpoch(ctx)
@@ -1394,7 +1383,6 @@ func TestCleanupMarkers_CleanUpManyMarkers(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)
p, err := te.mgr.getParameters(ctx)