From 0756dee6d5128eb7204253839699691eef19ab4f Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 19 Jun 2021 16:48:45 -0700 Subject: [PATCH] More epoch manager work (#1147) * content: added packIndexBuilder sharding * epoch manager improvements --- internal/epoch/epoch_advance_test.go | 14 +- internal/epoch/epoch_manager.go | 448 ++++++++++++++++----------- internal/epoch/epoch_manager_test.go | 253 ++++++++++----- internal/epoch/epoch_range.go | 47 +++ internal/epoch/epoch_range_test.go | 63 ++++ internal/epoch/epoch_utils.go | 39 +++ repo/content/builder.go | 24 ++ repo/content/packindex_test.go | 65 ++++ 8 files changed, 687 insertions(+), 266 deletions(-) create mode 100644 internal/epoch/epoch_range.go create mode 100644 internal/epoch/epoch_range_test.go diff --git a/internal/epoch/epoch_advance_test.go b/internal/epoch/epoch_advance_test.go index 0cabdb346..efa98c93e 100644 --- a/internal/epoch/epoch_advance_test.go +++ b/internal/epoch/epoch_advance_test.go @@ -18,9 +18,9 @@ func TestShouldAdvanceEpoch(t *testing.T) { Timestamp: t0, Length: 1, }) - for i := 0; i < defaultParams.EpochAdvanceOnCountThreshold; i++ { + for i := 0; i < DefaultParameters.EpochAdvanceOnCountThreshold; i++ { lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{ - Timestamp: t0.Add(defaultParams.MinEpochDuration), + Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: 1, }) } @@ -45,7 +45,7 @@ func TestShouldAdvanceEpoch(t *testing.T) { { desc: "two blobs, not enough time passed, size enough to advance", bms: []blob.Metadata{ - {Timestamp: t0.Add(defaultParams.MinEpochDuration - 1), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold}, + {Timestamp: t0.Add(DefaultParameters.MinEpochDuration - 1), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold}, {Timestamp: t0, Length: 1}, }, want: false, @@ -54,7 +54,7 @@ func TestShouldAdvanceEpoch(t *testing.T) { desc: "two blobs, enough time passed, total size enough to advance", bms: []blob.Metadata{ {Timestamp: t0, Length: 1}, - {Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold}, + {Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold}, }, want: true, }, @@ -62,7 +62,7 @@ func TestShouldAdvanceEpoch(t *testing.T) { desc: "two blobs, enough time passed, total size not enough to advance", bms: []blob.Metadata{ {Timestamp: t0, Length: 1}, - {Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold - 2}, + {Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold - 2}, }, want: false, }, @@ -70,7 +70,7 @@ func TestShouldAdvanceEpoch(t *testing.T) { desc: "enough time passed, count not enough to advance", bms: []blob.Metadata{ {Timestamp: t0, Length: 1}, - {Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: 1}, + {Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: 1}, }, want: false, }, @@ -83,7 +83,7 @@ func TestShouldAdvanceEpoch(t *testing.T) { for _, tc := range cases { require.Equal(t, tc.want, - shouldAdvance(tc.bms, defaultParams.MinEpochDuration, defaultParams.EpochAdvanceOnCountThreshold, defaultParams.EpochAdvanceOnTotalSizeBytesThreshold), + shouldAdvance(tc.bms, DefaultParameters.MinEpochDuration, DefaultParameters.EpochAdvanceOnCountThreshold, DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold), tc.desc) } } diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 36a253119..b0b73c90f 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -15,7 +15,6 @@ "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" ) @@ -23,6 +22,12 @@ // LatestEpoch represents the current epoch number in GetCompleteIndexSet. const LatestEpoch = -1 +const ( + initiaRefreshAttemptSleep = 100 * time.Millisecond + maxRefreshAttemptSleep = 15 * time.Second + maxRefreshAttemptSleepExponent = 1.5 +) + // Parameters encapsulates all parameters that influence the behavior of epoch manager. type Parameters struct { // how frequently each client will list blobs to determine the current epoch. @@ -47,58 +52,61 @@ type Parameters struct { DeleteParallelism int } +// DefaultParameters contains default epoch manager parameters. // nolint:gomnd -var defaultParams = Parameters{ +var DefaultParameters = Parameters{ EpochRefreshFrequency: 20 * time.Minute, FullCheckpointFrequency: 7, CleanupSafetyMargin: 1 * time.Hour, MinEpochDuration: 6 * time.Hour, - EpochAdvanceOnCountThreshold: 100, + EpochAdvanceOnCountThreshold: 20, EpochAdvanceOnTotalSizeBytesThreshold: 10 << 20, DeleteParallelism: 4, } -// snapshot captures a point-in time snapshot of a repository indexes, including current epoch -// information and existing checkpoints. -type snapshot struct { - WriteEpoch int `json:"writeEpoch"` - LatestFullCheckpointEpoch int `json:"latestCheckpointEpoch"` - FullCheckpointSets map[int][]blob.Metadata `json:"fullCheckpointSets"` - SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"` - EpochStartTime map[int]time.Time `json:"epochStartTimes"` - ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid +// CurrentSnapshot captures a point-in time snapshot of a repository indexes, including current epoch +// information and compaction set. +type CurrentSnapshot struct { + WriteEpoch int `json:"writeEpoch"` + UncompactedEpochSets map[int][]blob.Metadata `json:"unsettled"` + LongestRangeCheckpointSets []*RangeMetadata `json:"longestRangeCheckpointSets"` + SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"` + EpochStartTime map[int]time.Time `json:"epochStartTimes"` + ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid } -func (cs *snapshot) isSettledEpochNumber(epoch int) bool { +func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool { return epoch <= cs.WriteEpoch-numUnsettledEpochs } // Manager manages repository epochs. type Manager struct { + Params Parameters + st blob.Storage compact CompactionFunc log logging.Logger timeFunc func() time.Time - params Parameters // wait group that waits for all compaction and cleanup goroutines. backgroundWork sync.WaitGroup // mutable under lock, data invalid until refresh succeeds at least once. mu sync.Mutex - lastKnownState snapshot + lastKnownState CurrentSnapshot // counters keeping track of the number of times operations were too slow and had to // be retried, for testability. committedStateRefreshTooSlow *int32 getCompleteIndexSetTooSlow *int32 + writeIndexTooSlow *int32 } const ( epochMarkerIndexBlobPrefix blob.ID = "xe" uncompactedIndexBlobPrefix blob.ID = "xn" singleEpochCompactionBlobPrefix blob.ID = "xs" - fullCheckpointIndexBlobPrefix blob.ID = "xf" + rangeCheckpointIndexBlobPrefix blob.ID = "xr" numUnsettledEpochs = 2 ) @@ -113,6 +121,27 @@ func (e *Manager) Flush() { e.backgroundWork.Wait() } +// Current retrieves current snapshot. +func (e *Manager) Current(ctx context.Context) (CurrentSnapshot, error) { + return e.committedState(ctx) +} + +// ForceAdvanceEpoch advances current epoch unconditionally. +func (e *Manager) ForceAdvanceEpoch(ctx context.Context) error { + cs, err := e.committedState(ctx) + if err != nil { + return err + } + + e.Invalidate() + + if err := e.advanceEpoch(ctx, cs); err != nil { + return errors.Wrap(err, "error advancing epoch") + } + + return nil +} + // Refresh refreshes information about current epoch. func (e *Manager) Refresh(ctx context.Context) error { e.mu.Lock() @@ -131,9 +160,31 @@ func (e *Manager) Cleanup(ctx context.Context) error { return e.cleanupInternal(ctx, cs) } -func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error { +func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error { eg, ctx := errgroup.WithContext(ctx) + // find max timestamp recently written to the repository to establish storage clock. + // we will be deleting blobs whose timestamps are sufficiently old enough relative + // to this max time. This assumes that storage clock moves forward somewhat reasonably. + var maxTime time.Time + + for _, v := range cs.UncompactedEpochSets { + for _, bm := range v { + if bm.Timestamp.After(maxTime) { + maxTime = bm.Timestamp + } + } + } + + if maxTime.IsZero() { + return nil + } + + // only delete blobs if a suitable replacement exists and has been written sufficiently + // long ago. we don't want to delete blobs that are created too recently, because other clients + // may have not observed them yet. + maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin) + // delete epoch markers for epoch < current-1 eg.Go(func() error { var toDelete []blob.ID @@ -150,7 +201,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error { return errors.Wrap(err, "error listing epoch markers") } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting index blob marker") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting index blob marker") }) // delete uncompacted indexes for epochs that already have single-epoch compaction @@ -164,76 +215,51 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error { var toDelete []blob.ID for _, bm := range blobs { - if cs.safeToDeleteUncompactedBlob(bm, e.params.CleanupSafetyMargin) { - toDelete = append(toDelete, bm.BlobID) + if epoch, ok := epochNumberFromBlobID(bm.BlobID); ok { + if blobSetWrittenEarlyEnough(cs.SingleEpochCompactionSets[epoch], maxReplacementTime) { + toDelete = append(toDelete, bm.BlobID) + } } } - if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism); err != nil { + if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism); err != nil { return errors.Wrap(err, "unable to delete uncompacted blobs") } return nil }) - // delete single-epoch compacted indexes epoch numbers for which full-world state compacted exist - if cs.LatestFullCheckpointEpoch > 0 { - eg.Go(func() error { - blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix) - if err != nil { - return errors.Wrap(err, "error refreshing epochs") - } - - var toDelete []blob.ID - - for _, bm := range blobs { - epoch, ok := epochNumberFromBlobID(bm.BlobID) - if !ok { - continue - } - - if epoch < cs.LatestFullCheckpointEpoch { - toDelete = append(toDelete, bm.BlobID) - } - } - - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting single-epoch compacted blobs") - }) - } - return errors.Wrap(eg.Wait(), "error cleaning up index blobs") } -func (cs *snapshot) safeToDeleteUncompactedBlob(bm blob.Metadata, safetyMargin time.Duration) bool { - epoch, ok := epochNumberFromBlobID(bm.BlobID) - if !ok { - return false - } - - if epoch < cs.LatestFullCheckpointEpoch { - return true - } - - cset := cs.SingleEpochCompactionSets[epoch] - if cset == nil { - // single-epoch compaction set does not exist for this epoch, don't delete. +func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTime time.Time) bool { + max := blob.MaxTimestamp(replacementSet) + if max.IsZero() { return false } // compaction set was written sufficiently long ago to be reliably discovered by all // other clients - we can delete uncompacted blobs for this epoch. - compactionSetWriteTime := blob.MaxTimestamp(cset) - - return compactionSetWriteTime.Add(safetyMargin).Before(cs.EpochStartTime[cs.WriteEpoch]) + return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime) } func (e *Manager) refreshLocked(ctx context.Context) error { - return errors.Wrap(retry.WithExponentialBackoffNoValue(ctx, "epoch manager refresh", func() error { - return e.refreshAttemptLocked(ctx) - }, retry.Always), "error refreshing") + nextDelayTime := initiaRefreshAttemptSleep + + for err := e.refreshAttemptLocked(ctx); err != nil; err = e.refreshAttemptLocked(ctx) { + e.log.Debugf("refresh attempt failed: %v, sleeping %v before next retry", err, nextDelayTime) + + nextDelayTime = time.Duration(float64(nextDelayTime) * maxRefreshAttemptSleepExponent) + + if nextDelayTime > maxRefreshAttemptSleep { + nextDelayTime = maxRefreshAttemptSleep + } + } + + return nil } -func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error { +func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error { blobs, err := blob.ListAllBlobs(ctx, e.st, epochMarkerIndexBlobPrefix) if err != nil { return errors.Wrap(err, "error loading write epoch") @@ -250,26 +276,34 @@ func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error { return nil } -func (e *Manager) loadFullCheckpoints(ctx context.Context, cs *snapshot) error { - blobs, err := blob.ListAllBlobs(ctx, e.st, fullCheckpointIndexBlobPrefix) +func (e *Manager) loadRangeCheckpoints(ctx context.Context, cs *CurrentSnapshot) error { + blobs, err := blob.ListAllBlobs(ctx, e.st, rangeCheckpointIndexBlobPrefix) if err != nil { return errors.Wrap(err, "error loading full checkpoints") } - for epoch, bms := range groupByEpochNumber(blobs) { - if comp := findCompleteSetOfBlobs(bms); comp != nil { - cs.FullCheckpointSets[epoch] = comp + var rangeCheckpointSets []*RangeMetadata - if epoch > cs.LatestFullCheckpointEpoch { - cs.LatestFullCheckpointEpoch = epoch + for epoch1, m := range groupByEpochRanges(blobs) { + for epoch2, bms := range m { + if comp := findCompleteSetOfBlobs(bms); comp != nil { + erm := &RangeMetadata{ + MinEpoch: epoch1, + MaxEpoch: epoch2, + Blobs: comp, + } + + rangeCheckpointSets = append(rangeCheckpointSets, erm) } } } + cs.LongestRangeCheckpointSets = findLongestRangeCheckpoint(rangeCheckpointSets) + return nil } -func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot) error { +func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSnapshot) error { blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix) if err != nil { return errors.Wrap(err, "error loading single-epoch compactions") @@ -284,8 +318,18 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot) return nil } -func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot) { - if cs.WriteEpoch-cs.LatestFullCheckpointEpoch < e.params.FullCheckpointFrequency { +func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot) { + latestSettled := cs.WriteEpoch - numUnsettledEpochs + if latestSettled < 0 { + return + } + + firstNonRangeCompacted := 0 + if len(cs.LongestRangeCheckpointSets) > 0 { + firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1 + } + + if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency { return } @@ -294,13 +338,16 @@ func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot go func() { defer e.backgroundWork.Done() - if err := e.generateFullCheckpointFromCommittedState(ctx, cs, cs.WriteEpoch-numUnsettledEpochs); err != nil { + if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil { e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err) } }() } -func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) { +func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) { +} + +func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) { e.backgroundWork.Add(1) go func() { @@ -312,16 +359,49 @@ func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) { }() } +func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) { + var mu sync.Mutex + + result := map[int][]blob.Metadata{} + + eg, ctx := errgroup.WithContext(ctx) + + for n := min; n <= max; n++ { + n := n + if n < 0 { + continue + } + + eg.Go(func() error { + bm, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(n)) + if err != nil { + return errors.Wrapf(err, "error listing uncompacted epoch %v", n) + } + + mu.Lock() + defer mu.Unlock() + + result[n] = bm + return nil + }) + } + + if err := eg.Wait(); err != nil { + return nil, errors.Wrap(err, "error listing uncompacted epochs") + } + + return result, nil +} + // refreshAttemptLocked attempts to load the committedState of // the index and updates `lastKnownState` state atomically when complete. func (e *Manager) refreshAttemptLocked(ctx context.Context) error { - cs := snapshot{ + cs := CurrentSnapshot{ WriteEpoch: 0, EpochStartTime: map[int]time.Time{}, + UncompactedEpochSets: map[int][]blob.Metadata{}, SingleEpochCompactionSets: map[int][]blob.Metadata{}, - LatestFullCheckpointEpoch: 0, - FullCheckpointSets: map[int][]blob.Metadata{}, - ValidUntil: e.timeFunc().Add(e.params.EpochRefreshFrequency), + ValidUntil: e.timeFunc().Add(e.Params.EpochRefreshFrequency), } eg, ctx := errgroup.WithContext(ctx) @@ -332,13 +412,36 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { return e.loadSingleEpochCompactions(ctx, &cs) }) eg.Go(func() error { - return e.loadFullCheckpoints(ctx, &cs) + return e.loadRangeCheckpoints(ctx, &cs) }) if err := eg.Wait(); err != nil { return errors.Wrap(err, "error refreshing") } + ues, err := e.loadUncompactedEpochs(ctx, cs.WriteEpoch-1, cs.WriteEpoch+1) + if err != nil { + return errors.Wrap(err, "error loading uncompacted epochs") + } + + for epoch := range ues { + ues[epoch] = blobsWrittenBefore(ues[epoch], cs.EpochStartTime[epoch+numUnsettledEpochs]) + } + + cs.UncompactedEpochSets = ues + + e.log.Debugf("current epoch %v, uncompacted epoch sets %v %v %v", + cs.WriteEpoch, + len(ues[cs.WriteEpoch-1]), + len(ues[cs.WriteEpoch]), + len(ues[cs.WriteEpoch+1])) + + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Params.MinEpochDuration, e.Params.EpochAdvanceOnCountThreshold, e.Params.EpochAdvanceOnTotalSizeBytesThreshold) { + if err := e.advanceEpoch(ctx, cs); err != nil { + return errors.Wrap(err, "error advancing epoch") + } + } + if e.timeFunc().After(cs.ValidUntil) { atomic.AddInt32(e.committedStateRefreshTooSlow, 1) @@ -347,37 +450,36 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { e.lastKnownState = cs - e.maybeStartFullCheckpointAsync(ctx, cs) + e.maybeGenerateNextRangeCheckpointAsync(ctx, cs) e.maybeStartCleanupAsync(ctx, cs) - - e.log.Debugf("current epoch %v started at %v", cs.WriteEpoch, cs.EpochStartTime[cs.WriteEpoch]) + e.maybeOptimizeRangeCheckpointsAsync(ctx, cs) return nil } -func (e *Manager) committedState(ctx context.Context) (snapshot, error) { +func (e *Manager) advanceEpoch(ctx context.Context, cs CurrentSnapshot) error { + blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), cs.WriteEpoch+1)) + + if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil { + return errors.Wrap(err, "error writing epoch marker") + } + + return nil +} + +func (e *Manager) committedState(ctx context.Context) (CurrentSnapshot, error) { e.mu.Lock() defer e.mu.Unlock() if e.timeFunc().After(e.lastKnownState.ValidUntil) { if err := e.refreshLocked(ctx); err != nil { - return snapshot{}, err + return CurrentSnapshot{}, err } } return e.lastKnownState, nil } -// Current returns the current epoch number. -func (e *Manager) Current(ctx context.Context) (int, error) { - cs, err := e.committedState(ctx) - if err != nil { - return 0, err - } - - return cs.WriteEpoch, nil -} - // GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number. func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob.Metadata, error) { for { @@ -390,8 +492,9 @@ func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob maxEpoch = cs.WriteEpoch + 1 } - result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, maxEpoch) + result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, 0, maxEpoch) if e.timeFunc().Before(cs.ValidUntil) { + e.log.Debugf("Complete Index Set for [%v..%v]: %v", 0, maxEpoch, blob.IDsFromMetadata(result)) return result, err } @@ -408,72 +511,89 @@ func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob } } -func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs snapshot, maxEpoch int) ([]blob.Metadata, error) { - var ( - startEpoch int +// WriteIndex writes new index blob by picking the appropriate prefix based on current epoch. +func (e *Manager) WriteIndex(ctx context.Context, unprefixedBlobID blob.ID, data blob.Bytes) (blob.Metadata, error) { + for { + cs, err := e.committedState(ctx) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "error getting committed state") + } - resultMutex sync.Mutex - result []blob.Metadata - ) + blobID := uncompactedEpochBlobPrefix(cs.WriteEpoch) + unprefixedBlobID - for i := maxEpoch; i >= 0; i-- { - if blobs := cs.FullCheckpointSets[i]; blobs != nil { - result = append(result, blobs...) - startEpoch = i + 1 + if err := e.st.PutBlob(ctx, blobID, data); err != nil { + return blob.Metadata{}, errors.Wrap(err, "error writing index blob") + } - e.log.Debugf("using full checkpoint at epoch %v", i) + if !e.timeFunc().Before(cs.ValidUntil) { + e.log.Debugf("write was too slow, retrying") + atomic.AddInt32(e.writeIndexTooSlow, 1) - break + continue + } + + e.Invalidate() + + // nolint:wrapcheck + return e.st.GetMetadata(ctx, blobID) + } +} + +// Invalidate ensures that all cached index information is discarded. +func (e *Manager) Invalidate() { + e.mu.Lock() + defer e.mu.Unlock() + + e.lastKnownState = CurrentSnapshot{} +} + +func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs CurrentSnapshot, minEpoch, maxEpoch int) ([]blob.Metadata, error) { + var result []blob.Metadata + + startEpoch := minEpoch + + for _, c := range cs.LongestRangeCheckpointSets { + if c.MaxEpoch > maxEpoch { + result = append(result, c.Blobs...) + startEpoch = c.MaxEpoch + 1 } } eg, ctx := errgroup.WithContext(ctx) - e.log.Debugf("adding incremental state for epochs %v..%v", startEpoch, maxEpoch) + e.log.Debugf("adding incremental state for epochs %v..%v on top of %v", startEpoch, maxEpoch, result) + cnt := maxEpoch - startEpoch + 1 - for i := startEpoch; i <= maxEpoch; i++ { + tmp := make([][]blob.Metadata, cnt) + + for i := 0; i < cnt; i++ { i := i + ep := i + startEpoch eg.Go(func() error { - s, err := e.getIndexesFromEpochInternal(ctx, cs, i) + s, err := e.getIndexesFromEpochInternal(ctx, cs, ep) if err != nil { - return errors.Wrapf(err, "error getting indexes for epoch %v", i) + return errors.Wrapf(err, "error getting indexes for epoch %v", ep) } - resultMutex.Lock() - result = append(result, s...) - resultMutex.Unlock() + tmp[i] = s return nil }) } - return result, errors.Wrap(eg.Wait(), "error getting indexes") + if err := eg.Wait(); err != nil { + return nil, errors.Wrap(err, "error getting indexes") + } + + for _, v := range tmp { + result = append(result, v...) + } + + return result, nil } -// WroteIndex is invoked after writing an index blob. It will validate whether the index was written -// in the correct epoch. -func (e *Manager) WroteIndex(ctx context.Context, bm blob.Metadata) error { - cs, err := e.committedState(ctx) - if err != nil { - return err - } - - epoch, ok := epochNumberFromBlobID(bm.BlobID) - if !ok { - return errors.Errorf("invalid blob ID written") - } - - if cs.isSettledEpochNumber(epoch) { - return errors.Errorf("index write took to long") - } - - e.invalidate() - - return nil -} - -func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot, epoch int) ([]blob.Metadata, error) { +func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSnapshot, epoch int) ([]blob.Metadata, error) { // check if the epoch is old enough to possibly have compacted blobs epochSettled := cs.isSettledEpochNumber(epoch) if epochSettled && cs.SingleEpochCompactionSets[epoch] != nil { @@ -515,39 +635,14 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot, }() } - advance := shouldAdvance(uncompactedBlobs, e.params.MinEpochDuration, e.params.EpochAdvanceOnCountThreshold, e.params.EpochAdvanceOnTotalSizeBytesThreshold) - if advance && epoch == cs.WriteEpoch { - if err := e.advanceEpoch(ctx, cs.WriteEpoch+1); err != nil { - e.log.Errorf("unable to advance epoch: %v, performance will be affected", err) - } - } - // return uncompacted blobs to the caller while we're compacting them in background return uncompactedBlobs, nil } -func (e *Manager) advanceEpoch(ctx context.Context, newEpoch int) error { - blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), newEpoch)) +func (e *Manager) generateRangeCheckpointFromCommittedState(ctx context.Context, cs CurrentSnapshot, minEpoch, maxEpoch int) error { + e.log.Debugf("generating range checkpoint for %v..%v", minEpoch, maxEpoch) - if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil { - return errors.Wrap(err, "error writing epoch marker") - } - - e.invalidate() - - return nil -} - -func (e *Manager) invalidate() { - e.mu.Lock() - defer e.mu.Unlock() - e.lastKnownState = snapshot{} -} - -func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context, cs snapshot, epoch int) error { - e.log.Debugf("generating full checkpoint until epoch %v", epoch) - - completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, epoch) + completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, minEpoch, maxEpoch) if err != nil { return errors.Wrap(err, "unable to get full checkpoint") } @@ -556,7 +651,7 @@ func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context, return errors.Errorf("not generating full checkpoint - the committed state is no longer valid") } - if err := e.compact(ctx, blob.IDsFromMetadata(completeSet), fullCheckpointBlobPrefix(epoch)); err != nil { + if err := e.compact(ctx, blob.IDsFromMetadata(completeSet), rangeCheckpointBlobPrefix(minEpoch, maxEpoch)); err != nil { return errors.Wrap(err, "unable to compact blobs") } @@ -571,19 +666,22 @@ func compactedEpochBlobPrefix(epoch int) blob.ID { return blob.ID(fmt.Sprintf("%v%v_", singleEpochCompactionBlobPrefix, epoch)) } -func fullCheckpointBlobPrefix(epoch int) blob.ID { - return blob.ID(fmt.Sprintf("%v%v_", fullCheckpointIndexBlobPrefix, epoch)) +func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID { + return blob.ID(fmt.Sprintf("%v%v_%v_", rangeCheckpointIndexBlobPrefix, epoch1, epoch2)) } // NewManager creates new epoch manager. func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, sharedBaseLogger logging.Logger) *Manager { + log := logging.WithPrefix("[epoch-manager] ", sharedBaseLogger) + return &Manager{ st: st, - log: logging.WithPrefix("[epoch-manager] ", sharedBaseLogger), + log: log, compact: compactor, timeFunc: clock.Now, - params: params, + Params: params, getCompleteIndexSetTooSlow: new(int32), committedStateRefreshTooSlow: new(int32), + writeIndexTooSlow: new(int32), } } diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index ce8365777..e2b0cbc35 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -2,6 +2,7 @@ import ( "context" + "encoding/hex" "encoding/json" "fmt" "math" @@ -13,8 +14,10 @@ "github.com/pkg/errors" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/faketime" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" @@ -92,11 +95,12 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { st = logging.NewWrapper(st, t.Logf, "[STORAGE] ") te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft} m := NewManager(te.st, Parameters{ - EpochRefreshFrequency: 20 * time.Minute, - FullCheckpointFrequency: 7, - CleanupSafetyMargin: 1 * time.Hour, + EpochRefreshFrequency: 20 * time.Minute, + FullCheckpointFrequency: 7, + // increased safety margin because we're moving fake clock very fast + CleanupSafetyMargin: 48 * time.Hour, MinEpochDuration: 12 * time.Hour, - EpochAdvanceOnCountThreshold: 25, + EpochAdvanceOnCountThreshold: 15, EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20, DeleteParallelism: 1, }, te.compact, testlogging.NewTestLogger(t)) @@ -108,6 +112,20 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { return te } +func (te *epochManagerTestEnv) another() *epochManagerTestEnv { + te2 := &epochManagerTestEnv{ + data: te.data, + unloggedst: te.unloggedst, + st: te.st, + ft: te.ft, + faultyStorage: te.faultyStorage, + } + + te2.mgr = NewManager(te2.st, te.mgr.Params, te2.compact, te.mgr.log) + + return te2 +} + func TestIndexEpochManager_Regular(t *testing.T) { t.Parallel() @@ -116,6 +134,119 @@ func TestIndexEpochManager_Regular(t *testing.T) { verifySequentialWrites(t, te) } +func TestIndexEpochManager_Parallel(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + ctx := testlogging.Context(t) + + eg, ctx := errgroup.WithContext(ctx) + + // run for 30 seconds of real time or 60 days of fake time which advances much faster + endFakeTime := te.ft.NowFunc()().Add(60 * 24 * time.Hour) + endTimeReal := clock.Now().Add(30 * time.Second) + + for worker := 1; worker <= 5; worker++ { + worker := worker + te2 := te.another() + indexNum := 1e6 * worker + + eg.Go(func() error { + _ = te2 + + var ( + previousEntries []int + writtenEntries []int + blobNotFoundCount int + successfulMergeCount int + ) + + for te2.ft.NowFunc()().Before(endFakeTime) && clock.Now().Before(endTimeReal) { + if err := ctx.Err(); err != nil { + return err + } + + indexNum++ + + var rnd [8]byte + + rand.Read(rnd[:]) + + ndx := newFakeIndexWithEntries(indexNum) + + if _, err := te2.mgr.WriteIndex(ctx, blob.ID(fmt.Sprintf("w%vr%x", worker, rnd)), gather.FromSlice(ndx.Bytes())); err != nil { + return errors.Wrap(err, "error writing") + } + + writtenEntries = append(writtenEntries, indexNum) + + blobs, err := te2.mgr.GetCompleteIndexSet(ctx, LatestEpoch) + if err != nil { + return errors.Wrap(err, "GetCompleteIndexSet") + } + + merged, err := te2.getMergedIndexContents(ctx, blob.IDsFromMetadata(blobs)) + if err != nil { + if errors.Is(err, blob.ErrBlobNotFound) { + // ErrBlobNotFound is unavoidable because another thread may decide + // to delete some blobs after we compute the index set. + blobNotFoundCount++ + continue + } + + return errors.Wrap(err, "getMergedIndexContents") + } + + successfulMergeCount++ + + if err := verifySuperset(previousEntries, merged.Entries); err != nil { + return errors.Wrap(err, "verifySuperset") + } + + if err := verifySuperset(writtenEntries, merged.Entries); err != nil { + return errors.Wrap(err, "verifySuperset") + } + + previousEntries = merged.Entries + + dt := randomTime(1*time.Minute, 3*time.Hour) + te2.ft.Advance(dt) + + time.Sleep(100 * time.Millisecond) + } + + // allow for 5% of NOT_FOUND races + if float64(blobNotFoundCount)/float64(successfulMergeCount) > 0.05 { + t.Fatalf("too many not found cases") + } + + t.Logf("worker %v wrote %v", worker, indexNum) + + return nil + }) + } + + require.NoError(t, eg.Wait()) +} + +// verifySuperset verifies that every element in 'a' is also found in 'b'. +// Both sets are sorted and unique. +func verifySuperset(a, b []int) error { + nextB := 0 + + for _, it := range a { + for nextB < len(b) && b[nextB] < it { + nextB++ + } + + if nextB >= len(b) || b[nextB] != it { + return errors.Errorf("%v not found", it) + } + } + + return nil +} + func TestIndexEpochManager_RogueBlobs(t *testing.T) { t.Parallel() @@ -123,7 +254,7 @@ func TestIndexEpochManager_RogueBlobs(t *testing.T) { te.data[epochMarkerIndexBlobPrefix+"zzzz"] = []byte{1} te.data[singleEpochCompactionBlobPrefix+"zzzz"] = []byte{1} - te.data[fullCheckpointIndexBlobPrefix+"zzzz"] = []byte{1} + te.data[rangeCheckpointIndexBlobPrefix+"zzzz"] = []byte{1} verifySequentialWrites(t, te) te.mgr.Cleanup(testlogging.Context(t)) @@ -201,7 +332,7 @@ func TestRefreshRetriesIfTakingTooLong(t *testing.T) { te.faultyStorage.Faults = map[string][]*blobtesting.Fault{ "ListBlobs": { &blobtesting.Fault{ - Repeat: 4, // refresh does 3 lists, so this will cause 2 unsuccessful retries + Repeat: 8, // refresh does 7 lists, so this will cause 2 unsuccessful retries ErrCallback: func() error { te.ft.Advance(24 * time.Hour) @@ -253,88 +384,50 @@ func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) { require.EqualValues(t, 1, *te.mgr.getCompleteIndexSetTooSlow) } -func TestLateWriteIsIgnored(t *testing.T) { +func TestSlowWrite(t *testing.T) { te := newTestEnv(t) defer te.mgr.Flush() ctx := testlogging.Context(t) - // get current epoch number - epoch, err := te.mgr.Current(ctx) - require.NoError(t, err) - - var rnd [8]byte - - rand.Read(rnd[:]) - - blobID1 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:])) - - rand.Read(rnd[:]) - blobID2 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:])) - - // at this point it's possible that the process hangs for a very long time, during which the - // current epoch moves by 2. This would be dangerous, since we'd potentially modify an already - // settled epoch. - // To verify this, we call WroteIndex() after the write which will fail if the write finished - // late. During read we will ignore index files with dates that are too late. - - // simulate process process hanging for a very long time, during which time the epoch moves. - for i := 0; i < 30; i++ { - te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i)) - te.ft.Advance(time.Hour) + te.faultyStorage.Faults = map[string][]*blobtesting.Fault{ + "PutBlob": { + { + Repeat: 10, + ErrCallback: func() error { + te.ft.Advance(24 * time.Hour) + return nil + }, + }, + }, } - // epoch advance is triggered during reads. - _, err = te.mgr.GetCompleteIndexSet(ctx, epoch+1) + te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(1)) + require.EqualValues(t, 11, *te.mgr.writeIndexTooSlow) + te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(2)) + te.verifyCompleteIndexSet(ctx, t, LatestEpoch, newFakeIndexWithEntries(1, 2)) +} + +func TestForceAdvanceEpoch(t *testing.T) { + te := newTestEnv(t) + defer te.mgr.Flush() + + ctx := testlogging.Context(t) + cs, err := te.mgr.Current(ctx) require.NoError(t, err) + require.Equal(t, 0, cs.WriteEpoch) - // make sure the epoch has moved - epoch2, err := te.mgr.Current(ctx) + require.NoError(t, te.mgr.ForceAdvanceEpoch(ctx)) + + cs, err = te.mgr.Current(ctx) require.NoError(t, err) - require.Equal(t, epoch+1, epoch2) + require.Equal(t, 1, cs.WriteEpoch) - require.NoError(t, te.st.PutBlob(ctx, blobID1, gather.FromSlice([]byte("dummy")))) - bm, err := te.unloggedst.GetMetadata(ctx, blobID1) + require.NoError(t, te.mgr.ForceAdvanceEpoch(ctx)) + cs, err = te.mgr.Current(ctx) require.NoError(t, err) - - // it's not an error to finish the write in the next epoch. - require.NoError(t, te.mgr.WroteIndex(ctx, bm)) - - // move the epoch one more. - for i := 0; i < 30; i++ { - te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i)) - te.ft.Advance(time.Hour) - } - - // epoch advance is triggered during reads. - _, err = te.mgr.GetCompleteIndexSet(ctx, epoch+2) - require.NoError(t, err) - - // make sure the epoch has moved - epoch3, err := te.mgr.Current(ctx) - require.NoError(t, err) - require.Equal(t, epoch+2, epoch3) - - // on Windows the time does not always move forward, give it a nudge. - te.ft.Advance(2 * time.Second) - - require.NoError(t, te.st.PutBlob(ctx, blobID2, gather.FromSlice([]byte("dummy")))) - bm, err = te.unloggedst.GetMetadata(ctx, blobID2) - - require.NoError(t, err) - - // at this point WroteIndex() will fail because epoch #0 is already settled. - require.Error(t, te.mgr.WroteIndex(ctx, bm)) - - iset, err := te.mgr.GetCompleteIndexSet(ctx, epoch3) - require.NoError(t, err) - - // blobID1 will be included in the index. - require.Contains(t, blob.IDsFromMetadata(iset), blobID1) - - // blobID2 will be excluded from the index. - require.NotContains(t, blob.IDsFromMetadata(iset), blobID2) + require.Equal(t, 2, cs.WriteEpoch) } // nolint:thelper @@ -419,18 +512,10 @@ func (te *epochManagerTestEnv) getMergedIndexContents(ctx context.Context, blobI func (te *epochManagerTestEnv) mustWriteIndexFile(ctx context.Context, t *testing.T, ndx *fakeIndex) { t.Helper() - epoch, err := te.mgr.Current(ctx) - require.NoError(t, err) - var rnd [8]byte rand.Read(rnd[:]) - blobID := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:])) - - require.NoError(t, te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes()))) - bm, err := te.unloggedst.GetMetadata(ctx, blobID) - + _, err := te.mgr.WriteIndex(ctx, blob.ID(hex.EncodeToString(rnd[:])), gather.FromSlice(ndx.Bytes())) require.NoError(t, err) - require.NoError(t, te.mgr.WroteIndex(ctx, bm)) } diff --git a/internal/epoch/epoch_range.go b/internal/epoch/epoch_range.go new file mode 100644 index 000000000..95bb03ece --- /dev/null +++ b/internal/epoch/epoch_range.go @@ -0,0 +1,47 @@ +package epoch + +import ( + "github.com/kopia/kopia/repo/blob" +) + +// RangeMetadata represents a range of indexes for [min,max] epoch range. Both min and max are inclusive. +type RangeMetadata struct { + MinEpoch int `json:"min"` + MaxEpoch int `json:"max"` + Blobs []blob.Metadata `json:"blobs"` +} + +func findLongestRangeCheckpoint(ranges []*RangeMetadata) []*RangeMetadata { + byMin := map[int][]*RangeMetadata{} + + for _, r := range ranges { + byMin[r.MinEpoch] = append(byMin[r.MinEpoch], r) + } + + return findLongestRangeCheckpointStartingAt(0, byMin, make(map[int][]*RangeMetadata)) +} + +func findLongestRangeCheckpointStartingAt(startEpoch int, byMin, memo map[int][]*RangeMetadata) []*RangeMetadata { + l, ok := memo[startEpoch] + if ok { + return l + } + + var ( + longest = 0 + longestMetadata []*RangeMetadata + ) + + for _, cp := range byMin[startEpoch] { + combined := append([]*RangeMetadata{cp}, findLongestRangeCheckpointStartingAt(cp.MaxEpoch+1, byMin, memo)...) + + if max := combined[len(combined)-1].MaxEpoch; max > longest { + longest = max + longestMetadata = combined + } + } + + memo[startEpoch] = longestMetadata + + return longestMetadata +} diff --git a/internal/epoch/epoch_range_test.go b/internal/epoch/epoch_range_test.go new file mode 100644 index 000000000..d2f571883 --- /dev/null +++ b/internal/epoch/epoch_range_test.go @@ -0,0 +1,63 @@ +package epoch + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLongestRangeCheckpoint(t *testing.T) { + m0_9 := newEpochRangeMetadataForTesting(0, 9) + m0_29 := newEpochRangeMetadataForTesting(0, 29) + m10_19 := newEpochRangeMetadataForTesting(10, 19) + m20_29 := newEpochRangeMetadataForTesting(20, 29) + m30_39 := newEpochRangeMetadataForTesting(30, 39) + m50_59 := newEpochRangeMetadataForTesting(50, 59) + m10_59 := newEpochRangeMetadataForTesting(10, 59) + + cases := []struct { + input []*RangeMetadata + want []*RangeMetadata + }{ + { + input: nil, + want: nil, + }, + { + input: []*RangeMetadata{m0_9, m10_19, m20_29}, + want: []*RangeMetadata{m0_9, m10_19, m20_29}, + }, + { + input: []*RangeMetadata{m0_9, m10_19, m20_29, m50_59}, + want: []*RangeMetadata{m0_9, m10_19, m20_29}, + }, + { + input: []*RangeMetadata{m0_9, m20_29, m50_59}, + want: []*RangeMetadata{m0_9}, + }, + { + input: []*RangeMetadata{m0_29, m20_29, m30_39}, + want: []*RangeMetadata{m0_29, m30_39}, + }, + { + input: []*RangeMetadata{m0_9, m0_29, m10_19, m30_39}, + want: []*RangeMetadata{m0_29, m30_39}, + }, + { + input: []*RangeMetadata{m0_9, m0_29, m10_59, m30_39}, + want: []*RangeMetadata{m0_9, m10_59}, + }, + { + input: []*RangeMetadata{m0_9, m0_9, m0_29, m10_59, m30_39}, + want: []*RangeMetadata{m0_9, m10_59}, + }, + } + + for _, tc := range cases { + require.Equal(t, tc.want, findLongestRangeCheckpoint(tc.input)) + } +} + +func newEpochRangeMetadataForTesting(min, max int) *RangeMetadata { + return &RangeMetadata{MinEpoch: min, MaxEpoch: max} +} diff --git a/internal/epoch/epoch_utils.go b/internal/epoch/epoch_utils.go index aaebca3c0..b41bb1a36 100644 --- a/internal/epoch/epoch_utils.go +++ b/internal/epoch/epoch_utils.go @@ -30,6 +30,29 @@ func epochNumberFromBlobID(blobID blob.ID) (int, bool) { return n, true } +// epochNumberFromBlobID extracts the range epoch numbers from a string formatted as +// __. +func epochRangeFromBlobID(blobID blob.ID) (min, max int, ok bool) { + parts := strings.Split(string(blobID), "_") + + // nolint:gomnd + if len(parts) < 3 { + return 0, 0, false + } + + first := parts[0] + second := parts[1] + + for len(first) > 0 && !unicode.IsDigit(rune(first[0])) { + first = first[1:] + } + + n1, err1 := strconv.Atoi(first) + n2, err2 := strconv.Atoi(second) + + return n1, n2, err1 == nil && err2 == nil +} + func blobsWrittenBefore(bms []blob.Metadata, maxTime time.Time) []blob.Metadata { var result []blob.Metadata @@ -55,3 +78,19 @@ func groupByEpochNumber(bms []blob.Metadata) map[int][]blob.Metadata { return result } + +func groupByEpochRanges(bms []blob.Metadata) map[int]map[int][]blob.Metadata { + result := map[int]map[int][]blob.Metadata{} + + for _, bm := range bms { + if n1, n2, ok := epochRangeFromBlobID(bm.BlobID); ok { + if result[n1] == nil { + result[n1] = make(map[int][]blob.Metadata) + } + + result[n1][n2] = append(result[n1][n2], bm) + } + } + + return result +} diff --git a/repo/content/builder.go b/repo/content/builder.go index b4c483d81..213d0f708 100644 --- a/repo/content/builder.go +++ b/repo/content/builder.go @@ -2,6 +2,7 @@ import ( "crypto/rand" + "hash/fnv" "io" "runtime" "sort" @@ -140,3 +141,26 @@ func (b packIndexBuilder) BuildStable(output io.Writer, version int) error { return errors.Errorf("unsupported index version: %v", version) } } + +func (b packIndexBuilder) shard(maxShardSize int) []packIndexBuilder { + numShards := (len(b) + maxShardSize - 1) / maxShardSize + if numShards <= 1 { + return []packIndexBuilder{b} + } + + result := make([]packIndexBuilder, numShards) + for i := range result { + result[i] = make(packIndexBuilder) + } + + for k, v := range b { + h := fnv.New32a() + io.WriteString(h, string(k)) // nolint:errcheck + + shard := h.Sum32() % uint32(numShards) + + result[shard][k] = v + } + + return result +} diff --git a/repo/content/packindex_test.go b/repo/content/packindex_test.go index 425c60f50..049c54e6a 100644 --- a/repo/content/packindex_test.go +++ b/repo/content/packindex_test.go @@ -495,3 +495,68 @@ func infoDiff(i1, i2 Info, ignore ...string) []string { return result } + +func TestShard(t *testing.T) { + b := packIndexBuilder{} + + // generate 10000 IDs in random order + ids := make([]int, 10000) + for i := range ids { + ids[i] = i + } + + rand.Shuffle(len(ids), func(i, j int) { + ids[i], ids[j] = ids[j], ids[i] + }) + + // add ID to the builder + for _, id := range ids { + b.Add(&InfoStruct{ + ContentID: deterministicContentID("", id), + }) + } + + // verify number of shards + verifyAllShardedIDs(t, b.shard(100000), len(b), 1) + verifyAllShardedIDs(t, b.shard(100), len(b), 100) + + // sharding will always produce stable results, verify sorted shard lengths here + require.ElementsMatch(t, + []int{460, 472, 473, 477, 479, 483, 486, 492, 498, 499, 501, 503, 504, 505, 511, 519, 524, 528, 542, 544}, + verifyAllShardedIDs(t, b.shard(500), len(b), 20)) + require.ElementsMatch(t, + []int{945, 964, 988, 988, 993, 1002, 1014, 1017, 1021, 1068}, + verifyAllShardedIDs(t, b.shard(1000), len(b), 10)) + require.ElementsMatch(t, + []int{1952, 1995, 2005, 2013, 2035}, + verifyAllShardedIDs(t, b.shard(2000), len(b), 5)) +} + +func verifyAllShardedIDs(t *testing.T, sharded []packIndexBuilder, numTotal, numShards int) []int { + t.Helper() + + require.Len(t, sharded, numShards) + + m := map[ID]bool{} + for i := 0; i < numTotal; i++ { + m[deterministicContentID("", i)] = true + } + + cnt := 0 + + var lens []int + + for _, s := range sharded { + cnt += len(s) + lens = append(lens, len(s)) + + for _, v := range s { + delete(m, v.GetContentID()) + } + } + + require.Equal(t, numTotal, cnt, "invalid total number of sharded elements") + require.Empty(t, m) + + return lens +}