More epoch manager work (#1147)

* content: added packIndexBuilder sharding

* epoch manager improvements
This commit is contained in:
Jarek Kowalski
2021-06-19 16:48:45 -07:00
committed by GitHub
parent 9e059a1277
commit 0756dee6d5
8 changed files with 687 additions and 266 deletions

View File

@@ -18,9 +18,9 @@ func TestShouldAdvanceEpoch(t *testing.T) {
Timestamp: t0, Length: 1,
})
for i := 0; i < defaultParams.EpochAdvanceOnCountThreshold; i++ {
for i := 0; i < DefaultParameters.EpochAdvanceOnCountThreshold; i++ {
lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{
Timestamp: t0.Add(defaultParams.MinEpochDuration),
Timestamp: t0.Add(DefaultParameters.MinEpochDuration),
Length: 1,
})
}
@@ -45,7 +45,7 @@ func TestShouldAdvanceEpoch(t *testing.T) {
{
desc: "two blobs, not enough time passed, size enough to advance",
bms: []blob.Metadata{
{Timestamp: t0.Add(defaultParams.MinEpochDuration - 1), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
{Timestamp: t0.Add(DefaultParameters.MinEpochDuration - 1), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold},
{Timestamp: t0, Length: 1},
},
want: false,
@@ -54,7 +54,7 @@ func TestShouldAdvanceEpoch(t *testing.T) {
desc: "two blobs, enough time passed, total size enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
{Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold},
},
want: true,
},
@@ -62,7 +62,7 @@ func TestShouldAdvanceEpoch(t *testing.T) {
desc: "two blobs, enough time passed, total size not enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold - 2},
{Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold - 2},
},
want: false,
},
@@ -70,7 +70,7 @@ func TestShouldAdvanceEpoch(t *testing.T) {
desc: "enough time passed, count not enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: 1},
{Timestamp: t0.Add(DefaultParameters.MinEpochDuration), Length: 1},
},
want: false,
},
@@ -83,7 +83,7 @@ func TestShouldAdvanceEpoch(t *testing.T) {
for _, tc := range cases {
require.Equal(t, tc.want,
shouldAdvance(tc.bms, defaultParams.MinEpochDuration, defaultParams.EpochAdvanceOnCountThreshold, defaultParams.EpochAdvanceOnTotalSizeBytesThreshold),
shouldAdvance(tc.bms, DefaultParameters.MinEpochDuration, DefaultParameters.EpochAdvanceOnCountThreshold, DefaultParameters.EpochAdvanceOnTotalSizeBytesThreshold),
tc.desc)
}
}

View File

@@ -15,7 +15,6 @@
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
@@ -23,6 +22,12 @@
// LatestEpoch represents the current epoch number in GetCompleteIndexSet.
const LatestEpoch = -1
const (
initiaRefreshAttemptSleep = 100 * time.Millisecond
maxRefreshAttemptSleep = 15 * time.Second
maxRefreshAttemptSleepExponent = 1.5
)
// Parameters encapsulates all parameters that influence the behavior of epoch manager.
type Parameters struct {
// how frequently each client will list blobs to determine the current epoch.
@@ -47,58 +52,61 @@ type Parameters struct {
DeleteParallelism int
}
// DefaultParameters contains default epoch manager parameters.
// nolint:gomnd
var defaultParams = Parameters{
var DefaultParameters = Parameters{
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
CleanupSafetyMargin: 1 * time.Hour,
MinEpochDuration: 6 * time.Hour,
EpochAdvanceOnCountThreshold: 100,
EpochAdvanceOnCountThreshold: 20,
EpochAdvanceOnTotalSizeBytesThreshold: 10 << 20,
DeleteParallelism: 4,
}
// snapshot captures a point-in time snapshot of a repository indexes, including current epoch
// information and existing checkpoints.
type snapshot struct {
WriteEpoch int `json:"writeEpoch"`
LatestFullCheckpointEpoch int `json:"latestCheckpointEpoch"`
FullCheckpointSets map[int][]blob.Metadata `json:"fullCheckpointSets"`
SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"`
EpochStartTime map[int]time.Time `json:"epochStartTimes"`
ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid
// CurrentSnapshot captures a point-in time snapshot of a repository indexes, including current epoch
// information and compaction set.
type CurrentSnapshot struct {
WriteEpoch int `json:"writeEpoch"`
UncompactedEpochSets map[int][]blob.Metadata `json:"unsettled"`
LongestRangeCheckpointSets []*RangeMetadata `json:"longestRangeCheckpointSets"`
SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"`
EpochStartTime map[int]time.Time `json:"epochStartTimes"`
ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid
}
func (cs *snapshot) isSettledEpochNumber(epoch int) bool {
func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool {
return epoch <= cs.WriteEpoch-numUnsettledEpochs
}
// Manager manages repository epochs.
type Manager struct {
Params Parameters
st blob.Storage
compact CompactionFunc
log logging.Logger
timeFunc func() time.Time
params Parameters
// wait group that waits for all compaction and cleanup goroutines.
backgroundWork sync.WaitGroup
// mutable under lock, data invalid until refresh succeeds at least once.
mu sync.Mutex
lastKnownState snapshot
lastKnownState CurrentSnapshot
// counters keeping track of the number of times operations were too slow and had to
// be retried, for testability.
committedStateRefreshTooSlow *int32
getCompleteIndexSetTooSlow *int32
writeIndexTooSlow *int32
}
const (
epochMarkerIndexBlobPrefix blob.ID = "xe"
uncompactedIndexBlobPrefix blob.ID = "xn"
singleEpochCompactionBlobPrefix blob.ID = "xs"
fullCheckpointIndexBlobPrefix blob.ID = "xf"
rangeCheckpointIndexBlobPrefix blob.ID = "xr"
numUnsettledEpochs = 2
)
@@ -113,6 +121,27 @@ func (e *Manager) Flush() {
e.backgroundWork.Wait()
}
// Current retrieves current snapshot.
func (e *Manager) Current(ctx context.Context) (CurrentSnapshot, error) {
return e.committedState(ctx)
}
// ForceAdvanceEpoch advances current epoch unconditionally.
func (e *Manager) ForceAdvanceEpoch(ctx context.Context) error {
cs, err := e.committedState(ctx)
if err != nil {
return err
}
e.Invalidate()
if err := e.advanceEpoch(ctx, cs); err != nil {
return errors.Wrap(err, "error advancing epoch")
}
return nil
}
// Refresh refreshes information about current epoch.
func (e *Manager) Refresh(ctx context.Context) error {
e.mu.Lock()
@@ -131,9 +160,31 @@ func (e *Manager) Cleanup(ctx context.Context) error {
return e.cleanupInternal(ctx, cs)
}
func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error {
func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error {
eg, ctx := errgroup.WithContext(ctx)
// find max timestamp recently written to the repository to establish storage clock.
// we will be deleting blobs whose timestamps are sufficiently old enough relative
// to this max time. This assumes that storage clock moves forward somewhat reasonably.
var maxTime time.Time
for _, v := range cs.UncompactedEpochSets {
for _, bm := range v {
if bm.Timestamp.After(maxTime) {
maxTime = bm.Timestamp
}
}
}
if maxTime.IsZero() {
return nil
}
// only delete blobs if a suitable replacement exists and has been written sufficiently
// long ago. we don't want to delete blobs that are created too recently, because other clients
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin)
// delete epoch markers for epoch < current-1
eg.Go(func() error {
var toDelete []blob.ID
@@ -150,7 +201,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error {
return errors.Wrap(err, "error listing epoch markers")
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting index blob marker")
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting index blob marker")
})
// delete uncompacted indexes for epochs that already have single-epoch compaction
@@ -164,76 +215,51 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error {
var toDelete []blob.ID
for _, bm := range blobs {
if cs.safeToDeleteUncompactedBlob(bm, e.params.CleanupSafetyMargin) {
toDelete = append(toDelete, bm.BlobID)
if epoch, ok := epochNumberFromBlobID(bm.BlobID); ok {
if blobSetWrittenEarlyEnough(cs.SingleEpochCompactionSets[epoch], maxReplacementTime) {
toDelete = append(toDelete, bm.BlobID)
}
}
}
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism); err != nil {
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism); err != nil {
return errors.Wrap(err, "unable to delete uncompacted blobs")
}
return nil
})
// delete single-epoch compacted indexes epoch numbers for which full-world state compacted exist
if cs.LatestFullCheckpointEpoch > 0 {
eg.Go(func() error {
blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix)
if err != nil {
return errors.Wrap(err, "error refreshing epochs")
}
var toDelete []blob.ID
for _, bm := range blobs {
epoch, ok := epochNumberFromBlobID(bm.BlobID)
if !ok {
continue
}
if epoch < cs.LatestFullCheckpointEpoch {
toDelete = append(toDelete, bm.BlobID)
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting single-epoch compacted blobs")
})
}
return errors.Wrap(eg.Wait(), "error cleaning up index blobs")
}
func (cs *snapshot) safeToDeleteUncompactedBlob(bm blob.Metadata, safetyMargin time.Duration) bool {
epoch, ok := epochNumberFromBlobID(bm.BlobID)
if !ok {
return false
}
if epoch < cs.LatestFullCheckpointEpoch {
return true
}
cset := cs.SingleEpochCompactionSets[epoch]
if cset == nil {
// single-epoch compaction set does not exist for this epoch, don't delete.
func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTime time.Time) bool {
max := blob.MaxTimestamp(replacementSet)
if max.IsZero() {
return false
}
// compaction set was written sufficiently long ago to be reliably discovered by all
// other clients - we can delete uncompacted blobs for this epoch.
compactionSetWriteTime := blob.MaxTimestamp(cset)
return compactionSetWriteTime.Add(safetyMargin).Before(cs.EpochStartTime[cs.WriteEpoch])
return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime)
}
func (e *Manager) refreshLocked(ctx context.Context) error {
return errors.Wrap(retry.WithExponentialBackoffNoValue(ctx, "epoch manager refresh", func() error {
return e.refreshAttemptLocked(ctx)
}, retry.Always), "error refreshing")
nextDelayTime := initiaRefreshAttemptSleep
for err := e.refreshAttemptLocked(ctx); err != nil; err = e.refreshAttemptLocked(ctx) {
e.log.Debugf("refresh attempt failed: %v, sleeping %v before next retry", err, nextDelayTime)
nextDelayTime = time.Duration(float64(nextDelayTime) * maxRefreshAttemptSleepExponent)
if nextDelayTime > maxRefreshAttemptSleep {
nextDelayTime = maxRefreshAttemptSleep
}
}
return nil
}
func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error {
func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error {
blobs, err := blob.ListAllBlobs(ctx, e.st, epochMarkerIndexBlobPrefix)
if err != nil {
return errors.Wrap(err, "error loading write epoch")
@@ -250,26 +276,34 @@ func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error {
return nil
}
func (e *Manager) loadFullCheckpoints(ctx context.Context, cs *snapshot) error {
blobs, err := blob.ListAllBlobs(ctx, e.st, fullCheckpointIndexBlobPrefix)
func (e *Manager) loadRangeCheckpoints(ctx context.Context, cs *CurrentSnapshot) error {
blobs, err := blob.ListAllBlobs(ctx, e.st, rangeCheckpointIndexBlobPrefix)
if err != nil {
return errors.Wrap(err, "error loading full checkpoints")
}
for epoch, bms := range groupByEpochNumber(blobs) {
if comp := findCompleteSetOfBlobs(bms); comp != nil {
cs.FullCheckpointSets[epoch] = comp
var rangeCheckpointSets []*RangeMetadata
if epoch > cs.LatestFullCheckpointEpoch {
cs.LatestFullCheckpointEpoch = epoch
for epoch1, m := range groupByEpochRanges(blobs) {
for epoch2, bms := range m {
if comp := findCompleteSetOfBlobs(bms); comp != nil {
erm := &RangeMetadata{
MinEpoch: epoch1,
MaxEpoch: epoch2,
Blobs: comp,
}
rangeCheckpointSets = append(rangeCheckpointSets, erm)
}
}
}
cs.LongestRangeCheckpointSets = findLongestRangeCheckpoint(rangeCheckpointSets)
return nil
}
func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot) error {
func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSnapshot) error {
blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix)
if err != nil {
return errors.Wrap(err, "error loading single-epoch compactions")
@@ -284,8 +318,18 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot)
return nil
}
func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot) {
if cs.WriteEpoch-cs.LatestFullCheckpointEpoch < e.params.FullCheckpointFrequency {
func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
return
}
firstNonRangeCompacted := 0
if len(cs.LongestRangeCheckpointSets) > 0 {
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1
}
if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency {
return
}
@@ -294,13 +338,16 @@ func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot
go func() {
defer e.backgroundWork.Done()
if err := e.generateFullCheckpointFromCommittedState(ctx, cs, cs.WriteEpoch-numUnsettledEpochs); err != nil {
if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil {
e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err)
}
}()
}
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) {
func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
}
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) {
e.backgroundWork.Add(1)
go func() {
@@ -312,16 +359,49 @@ func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) {
}()
}
func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[int][]blob.Metadata, error) {
var mu sync.Mutex
result := map[int][]blob.Metadata{}
eg, ctx := errgroup.WithContext(ctx)
for n := min; n <= max; n++ {
n := n
if n < 0 {
continue
}
eg.Go(func() error {
bm, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(n))
if err != nil {
return errors.Wrapf(err, "error listing uncompacted epoch %v", n)
}
mu.Lock()
defer mu.Unlock()
result[n] = bm
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, errors.Wrap(err, "error listing uncompacted epochs")
}
return result, nil
}
// refreshAttemptLocked attempts to load the committedState of
// the index and updates `lastKnownState` state atomically when complete.
func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
cs := snapshot{
cs := CurrentSnapshot{
WriteEpoch: 0,
EpochStartTime: map[int]time.Time{},
UncompactedEpochSets: map[int][]blob.Metadata{},
SingleEpochCompactionSets: map[int][]blob.Metadata{},
LatestFullCheckpointEpoch: 0,
FullCheckpointSets: map[int][]blob.Metadata{},
ValidUntil: e.timeFunc().Add(e.params.EpochRefreshFrequency),
ValidUntil: e.timeFunc().Add(e.Params.EpochRefreshFrequency),
}
eg, ctx := errgroup.WithContext(ctx)
@@ -332,13 +412,36 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
return e.loadSingleEpochCompactions(ctx, &cs)
})
eg.Go(func() error {
return e.loadFullCheckpoints(ctx, &cs)
return e.loadRangeCheckpoints(ctx, &cs)
})
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "error refreshing")
}
ues, err := e.loadUncompactedEpochs(ctx, cs.WriteEpoch-1, cs.WriteEpoch+1)
if err != nil {
return errors.Wrap(err, "error loading uncompacted epochs")
}
for epoch := range ues {
ues[epoch] = blobsWrittenBefore(ues[epoch], cs.EpochStartTime[epoch+numUnsettledEpochs])
}
cs.UncompactedEpochSets = ues
e.log.Debugf("current epoch %v, uncompacted epoch sets %v %v %v",
cs.WriteEpoch,
len(ues[cs.WriteEpoch-1]),
len(ues[cs.WriteEpoch]),
len(ues[cs.WriteEpoch+1]))
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Params.MinEpochDuration, e.Params.EpochAdvanceOnCountThreshold, e.Params.EpochAdvanceOnTotalSizeBytesThreshold) {
if err := e.advanceEpoch(ctx, cs); err != nil {
return errors.Wrap(err, "error advancing epoch")
}
}
if e.timeFunc().After(cs.ValidUntil) {
atomic.AddInt32(e.committedStateRefreshTooSlow, 1)
@@ -347,37 +450,36 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
e.lastKnownState = cs
e.maybeStartFullCheckpointAsync(ctx, cs)
e.maybeGenerateNextRangeCheckpointAsync(ctx, cs)
e.maybeStartCleanupAsync(ctx, cs)
e.log.Debugf("current epoch %v started at %v", cs.WriteEpoch, cs.EpochStartTime[cs.WriteEpoch])
e.maybeOptimizeRangeCheckpointsAsync(ctx, cs)
return nil
}
func (e *Manager) committedState(ctx context.Context) (snapshot, error) {
func (e *Manager) advanceEpoch(ctx context.Context, cs CurrentSnapshot) error {
blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), cs.WriteEpoch+1))
if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil {
return errors.Wrap(err, "error writing epoch marker")
}
return nil
}
func (e *Manager) committedState(ctx context.Context) (CurrentSnapshot, error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.timeFunc().After(e.lastKnownState.ValidUntil) {
if err := e.refreshLocked(ctx); err != nil {
return snapshot{}, err
return CurrentSnapshot{}, err
}
}
return e.lastKnownState, nil
}
// Current returns the current epoch number.
func (e *Manager) Current(ctx context.Context) (int, error) {
cs, err := e.committedState(ctx)
if err != nil {
return 0, err
}
return cs.WriteEpoch, nil
}
// GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number.
func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob.Metadata, error) {
for {
@@ -390,8 +492,9 @@ func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob
maxEpoch = cs.WriteEpoch + 1
}
result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, maxEpoch)
result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, 0, maxEpoch)
if e.timeFunc().Before(cs.ValidUntil) {
e.log.Debugf("Complete Index Set for [%v..%v]: %v", 0, maxEpoch, blob.IDsFromMetadata(result))
return result, err
}
@@ -408,72 +511,89 @@ func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob
}
}
func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs snapshot, maxEpoch int) ([]blob.Metadata, error) {
var (
startEpoch int
// WriteIndex writes new index blob by picking the appropriate prefix based on current epoch.
func (e *Manager) WriteIndex(ctx context.Context, unprefixedBlobID blob.ID, data blob.Bytes) (blob.Metadata, error) {
for {
cs, err := e.committedState(ctx)
if err != nil {
return blob.Metadata{}, errors.Wrap(err, "error getting committed state")
}
resultMutex sync.Mutex
result []blob.Metadata
)
blobID := uncompactedEpochBlobPrefix(cs.WriteEpoch) + unprefixedBlobID
for i := maxEpoch; i >= 0; i-- {
if blobs := cs.FullCheckpointSets[i]; blobs != nil {
result = append(result, blobs...)
startEpoch = i + 1
if err := e.st.PutBlob(ctx, blobID, data); err != nil {
return blob.Metadata{}, errors.Wrap(err, "error writing index blob")
}
e.log.Debugf("using full checkpoint at epoch %v", i)
if !e.timeFunc().Before(cs.ValidUntil) {
e.log.Debugf("write was too slow, retrying")
atomic.AddInt32(e.writeIndexTooSlow, 1)
break
continue
}
e.Invalidate()
// nolint:wrapcheck
return e.st.GetMetadata(ctx, blobID)
}
}
// Invalidate ensures that all cached index information is discarded.
func (e *Manager) Invalidate() {
e.mu.Lock()
defer e.mu.Unlock()
e.lastKnownState = CurrentSnapshot{}
}
func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs CurrentSnapshot, minEpoch, maxEpoch int) ([]blob.Metadata, error) {
var result []blob.Metadata
startEpoch := minEpoch
for _, c := range cs.LongestRangeCheckpointSets {
if c.MaxEpoch > maxEpoch {
result = append(result, c.Blobs...)
startEpoch = c.MaxEpoch + 1
}
}
eg, ctx := errgroup.WithContext(ctx)
e.log.Debugf("adding incremental state for epochs %v..%v", startEpoch, maxEpoch)
e.log.Debugf("adding incremental state for epochs %v..%v on top of %v", startEpoch, maxEpoch, result)
cnt := maxEpoch - startEpoch + 1
for i := startEpoch; i <= maxEpoch; i++ {
tmp := make([][]blob.Metadata, cnt)
for i := 0; i < cnt; i++ {
i := i
ep := i + startEpoch
eg.Go(func() error {
s, err := e.getIndexesFromEpochInternal(ctx, cs, i)
s, err := e.getIndexesFromEpochInternal(ctx, cs, ep)
if err != nil {
return errors.Wrapf(err, "error getting indexes for epoch %v", i)
return errors.Wrapf(err, "error getting indexes for epoch %v", ep)
}
resultMutex.Lock()
result = append(result, s...)
resultMutex.Unlock()
tmp[i] = s
return nil
})
}
return result, errors.Wrap(eg.Wait(), "error getting indexes")
if err := eg.Wait(); err != nil {
return nil, errors.Wrap(err, "error getting indexes")
}
for _, v := range tmp {
result = append(result, v...)
}
return result, nil
}
// WroteIndex is invoked after writing an index blob. It will validate whether the index was written
// in the correct epoch.
func (e *Manager) WroteIndex(ctx context.Context, bm blob.Metadata) error {
cs, err := e.committedState(ctx)
if err != nil {
return err
}
epoch, ok := epochNumberFromBlobID(bm.BlobID)
if !ok {
return errors.Errorf("invalid blob ID written")
}
if cs.isSettledEpochNumber(epoch) {
return errors.Errorf("index write took to long")
}
e.invalidate()
return nil
}
func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot, epoch int) ([]blob.Metadata, error) {
func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSnapshot, epoch int) ([]blob.Metadata, error) {
// check if the epoch is old enough to possibly have compacted blobs
epochSettled := cs.isSettledEpochNumber(epoch)
if epochSettled && cs.SingleEpochCompactionSets[epoch] != nil {
@@ -515,39 +635,14 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot,
}()
}
advance := shouldAdvance(uncompactedBlobs, e.params.MinEpochDuration, e.params.EpochAdvanceOnCountThreshold, e.params.EpochAdvanceOnTotalSizeBytesThreshold)
if advance && epoch == cs.WriteEpoch {
if err := e.advanceEpoch(ctx, cs.WriteEpoch+1); err != nil {
e.log.Errorf("unable to advance epoch: %v, performance will be affected", err)
}
}
// return uncompacted blobs to the caller while we're compacting them in background
return uncompactedBlobs, nil
}
func (e *Manager) advanceEpoch(ctx context.Context, newEpoch int) error {
blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), newEpoch))
func (e *Manager) generateRangeCheckpointFromCommittedState(ctx context.Context, cs CurrentSnapshot, minEpoch, maxEpoch int) error {
e.log.Debugf("generating range checkpoint for %v..%v", minEpoch, maxEpoch)
if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil {
return errors.Wrap(err, "error writing epoch marker")
}
e.invalidate()
return nil
}
func (e *Manager) invalidate() {
e.mu.Lock()
defer e.mu.Unlock()
e.lastKnownState = snapshot{}
}
func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context, cs snapshot, epoch int) error {
e.log.Debugf("generating full checkpoint until epoch %v", epoch)
completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, epoch)
completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, minEpoch, maxEpoch)
if err != nil {
return errors.Wrap(err, "unable to get full checkpoint")
}
@@ -556,7 +651,7 @@ func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context,
return errors.Errorf("not generating full checkpoint - the committed state is no longer valid")
}
if err := e.compact(ctx, blob.IDsFromMetadata(completeSet), fullCheckpointBlobPrefix(epoch)); err != nil {
if err := e.compact(ctx, blob.IDsFromMetadata(completeSet), rangeCheckpointBlobPrefix(minEpoch, maxEpoch)); err != nil {
return errors.Wrap(err, "unable to compact blobs")
}
@@ -571,19 +666,22 @@ func compactedEpochBlobPrefix(epoch int) blob.ID {
return blob.ID(fmt.Sprintf("%v%v_", singleEpochCompactionBlobPrefix, epoch))
}
func fullCheckpointBlobPrefix(epoch int) blob.ID {
return blob.ID(fmt.Sprintf("%v%v_", fullCheckpointIndexBlobPrefix, epoch))
func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID {
return blob.ID(fmt.Sprintf("%v%v_%v_", rangeCheckpointIndexBlobPrefix, epoch1, epoch2))
}
// NewManager creates new epoch manager.
func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, sharedBaseLogger logging.Logger) *Manager {
log := logging.WithPrefix("[epoch-manager] ", sharedBaseLogger)
return &Manager{
st: st,
log: logging.WithPrefix("[epoch-manager] ", sharedBaseLogger),
log: log,
compact: compactor,
timeFunc: clock.Now,
params: params,
Params: params,
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),
}
}

View File

@@ -2,6 +2,7 @@
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math"
@@ -13,8 +14,10 @@
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
@@ -92,11 +95,12 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
st = logging.NewWrapper(st, t.Logf, "[STORAGE] ")
te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft}
m := NewManager(te.st, Parameters{
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
CleanupSafetyMargin: 1 * time.Hour,
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
// increased safety margin because we're moving fake clock very fast
CleanupSafetyMargin: 48 * time.Hour,
MinEpochDuration: 12 * time.Hour,
EpochAdvanceOnCountThreshold: 25,
EpochAdvanceOnCountThreshold: 15,
EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20,
DeleteParallelism: 1,
}, te.compact, testlogging.NewTestLogger(t))
@@ -108,6 +112,20 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
return te
}
func (te *epochManagerTestEnv) another() *epochManagerTestEnv {
te2 := &epochManagerTestEnv{
data: te.data,
unloggedst: te.unloggedst,
st: te.st,
ft: te.ft,
faultyStorage: te.faultyStorage,
}
te2.mgr = NewManager(te2.st, te.mgr.Params, te2.compact, te.mgr.log)
return te2
}
func TestIndexEpochManager_Regular(t *testing.T) {
t.Parallel()
@@ -116,6 +134,119 @@ func TestIndexEpochManager_Regular(t *testing.T) {
verifySequentialWrites(t, te)
}
func TestIndexEpochManager_Parallel(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
ctx := testlogging.Context(t)
eg, ctx := errgroup.WithContext(ctx)
// run for 30 seconds of real time or 60 days of fake time which advances much faster
endFakeTime := te.ft.NowFunc()().Add(60 * 24 * time.Hour)
endTimeReal := clock.Now().Add(30 * time.Second)
for worker := 1; worker <= 5; worker++ {
worker := worker
te2 := te.another()
indexNum := 1e6 * worker
eg.Go(func() error {
_ = te2
var (
previousEntries []int
writtenEntries []int
blobNotFoundCount int
successfulMergeCount int
)
for te2.ft.NowFunc()().Before(endFakeTime) && clock.Now().Before(endTimeReal) {
if err := ctx.Err(); err != nil {
return err
}
indexNum++
var rnd [8]byte
rand.Read(rnd[:])
ndx := newFakeIndexWithEntries(indexNum)
if _, err := te2.mgr.WriteIndex(ctx, blob.ID(fmt.Sprintf("w%vr%x", worker, rnd)), gather.FromSlice(ndx.Bytes())); err != nil {
return errors.Wrap(err, "error writing")
}
writtenEntries = append(writtenEntries, indexNum)
blobs, err := te2.mgr.GetCompleteIndexSet(ctx, LatestEpoch)
if err != nil {
return errors.Wrap(err, "GetCompleteIndexSet")
}
merged, err := te2.getMergedIndexContents(ctx, blob.IDsFromMetadata(blobs))
if err != nil {
if errors.Is(err, blob.ErrBlobNotFound) {
// ErrBlobNotFound is unavoidable because another thread may decide
// to delete some blobs after we compute the index set.
blobNotFoundCount++
continue
}
return errors.Wrap(err, "getMergedIndexContents")
}
successfulMergeCount++
if err := verifySuperset(previousEntries, merged.Entries); err != nil {
return errors.Wrap(err, "verifySuperset")
}
if err := verifySuperset(writtenEntries, merged.Entries); err != nil {
return errors.Wrap(err, "verifySuperset")
}
previousEntries = merged.Entries
dt := randomTime(1*time.Minute, 3*time.Hour)
te2.ft.Advance(dt)
time.Sleep(100 * time.Millisecond)
}
// allow for 5% of NOT_FOUND races
if float64(blobNotFoundCount)/float64(successfulMergeCount) > 0.05 {
t.Fatalf("too many not found cases")
}
t.Logf("worker %v wrote %v", worker, indexNum)
return nil
})
}
require.NoError(t, eg.Wait())
}
// verifySuperset verifies that every element in 'a' is also found in 'b'.
// Both sets are sorted and unique.
func verifySuperset(a, b []int) error {
nextB := 0
for _, it := range a {
for nextB < len(b) && b[nextB] < it {
nextB++
}
if nextB >= len(b) || b[nextB] != it {
return errors.Errorf("%v not found", it)
}
}
return nil
}
func TestIndexEpochManager_RogueBlobs(t *testing.T) {
t.Parallel()
@@ -123,7 +254,7 @@ func TestIndexEpochManager_RogueBlobs(t *testing.T) {
te.data[epochMarkerIndexBlobPrefix+"zzzz"] = []byte{1}
te.data[singleEpochCompactionBlobPrefix+"zzzz"] = []byte{1}
te.data[fullCheckpointIndexBlobPrefix+"zzzz"] = []byte{1}
te.data[rangeCheckpointIndexBlobPrefix+"zzzz"] = []byte{1}
verifySequentialWrites(t, te)
te.mgr.Cleanup(testlogging.Context(t))
@@ -201,7 +332,7 @@ func TestRefreshRetriesIfTakingTooLong(t *testing.T) {
te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
"ListBlobs": {
&blobtesting.Fault{
Repeat: 4, // refresh does 3 lists, so this will cause 2 unsuccessful retries
Repeat: 8, // refresh does 7 lists, so this will cause 2 unsuccessful retries
ErrCallback: func() error {
te.ft.Advance(24 * time.Hour)
@@ -253,88 +384,50 @@ func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) {
require.EqualValues(t, 1, *te.mgr.getCompleteIndexSetTooSlow)
}
func TestLateWriteIsIgnored(t *testing.T) {
func TestSlowWrite(t *testing.T) {
te := newTestEnv(t)
defer te.mgr.Flush()
ctx := testlogging.Context(t)
// get current epoch number
epoch, err := te.mgr.Current(ctx)
require.NoError(t, err)
var rnd [8]byte
rand.Read(rnd[:])
blobID1 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
rand.Read(rnd[:])
blobID2 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
// at this point it's possible that the process hangs for a very long time, during which the
// current epoch moves by 2. This would be dangerous, since we'd potentially modify an already
// settled epoch.
// To verify this, we call WroteIndex() after the write which will fail if the write finished
// late. During read we will ignore index files with dates that are too late.
// simulate process process hanging for a very long time, during which time the epoch moves.
for i := 0; i < 30; i++ {
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
te.ft.Advance(time.Hour)
te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
"PutBlob": {
{
Repeat: 10,
ErrCallback: func() error {
te.ft.Advance(24 * time.Hour)
return nil
},
},
},
}
// epoch advance is triggered during reads.
_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+1)
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(1))
require.EqualValues(t, 11, *te.mgr.writeIndexTooSlow)
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(2))
te.verifyCompleteIndexSet(ctx, t, LatestEpoch, newFakeIndexWithEntries(1, 2))
}
func TestForceAdvanceEpoch(t *testing.T) {
te := newTestEnv(t)
defer te.mgr.Flush()
ctx := testlogging.Context(t)
cs, err := te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, 0, cs.WriteEpoch)
// make sure the epoch has moved
epoch2, err := te.mgr.Current(ctx)
require.NoError(t, te.mgr.ForceAdvanceEpoch(ctx))
cs, err = te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, epoch+1, epoch2)
require.Equal(t, 1, cs.WriteEpoch)
require.NoError(t, te.st.PutBlob(ctx, blobID1, gather.FromSlice([]byte("dummy"))))
bm, err := te.unloggedst.GetMetadata(ctx, blobID1)
require.NoError(t, te.mgr.ForceAdvanceEpoch(ctx))
cs, err = te.mgr.Current(ctx)
require.NoError(t, err)
// it's not an error to finish the write in the next epoch.
require.NoError(t, te.mgr.WroteIndex(ctx, bm))
// move the epoch one more.
for i := 0; i < 30; i++ {
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
te.ft.Advance(time.Hour)
}
// epoch advance is triggered during reads.
_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+2)
require.NoError(t, err)
// make sure the epoch has moved
epoch3, err := te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, epoch+2, epoch3)
// on Windows the time does not always move forward, give it a nudge.
te.ft.Advance(2 * time.Second)
require.NoError(t, te.st.PutBlob(ctx, blobID2, gather.FromSlice([]byte("dummy"))))
bm, err = te.unloggedst.GetMetadata(ctx, blobID2)
require.NoError(t, err)
// at this point WroteIndex() will fail because epoch #0 is already settled.
require.Error(t, te.mgr.WroteIndex(ctx, bm))
iset, err := te.mgr.GetCompleteIndexSet(ctx, epoch3)
require.NoError(t, err)
// blobID1 will be included in the index.
require.Contains(t, blob.IDsFromMetadata(iset), blobID1)
// blobID2 will be excluded from the index.
require.NotContains(t, blob.IDsFromMetadata(iset), blobID2)
require.Equal(t, 2, cs.WriteEpoch)
}
// nolint:thelper
@@ -419,18 +512,10 @@ func (te *epochManagerTestEnv) getMergedIndexContents(ctx context.Context, blobI
func (te *epochManagerTestEnv) mustWriteIndexFile(ctx context.Context, t *testing.T, ndx *fakeIndex) {
t.Helper()
epoch, err := te.mgr.Current(ctx)
require.NoError(t, err)
var rnd [8]byte
rand.Read(rnd[:])
blobID := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
require.NoError(t, te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes())))
bm, err := te.unloggedst.GetMetadata(ctx, blobID)
_, err := te.mgr.WriteIndex(ctx, blob.ID(hex.EncodeToString(rnd[:])), gather.FromSlice(ndx.Bytes()))
require.NoError(t, err)
require.NoError(t, te.mgr.WroteIndex(ctx, bm))
}

View File

@@ -0,0 +1,47 @@
package epoch
import (
"github.com/kopia/kopia/repo/blob"
)
// RangeMetadata represents a range of indexes for [min,max] epoch range. Both min and max are inclusive.
type RangeMetadata struct {
MinEpoch int `json:"min"`
MaxEpoch int `json:"max"`
Blobs []blob.Metadata `json:"blobs"`
}
func findLongestRangeCheckpoint(ranges []*RangeMetadata) []*RangeMetadata {
byMin := map[int][]*RangeMetadata{}
for _, r := range ranges {
byMin[r.MinEpoch] = append(byMin[r.MinEpoch], r)
}
return findLongestRangeCheckpointStartingAt(0, byMin, make(map[int][]*RangeMetadata))
}
func findLongestRangeCheckpointStartingAt(startEpoch int, byMin, memo map[int][]*RangeMetadata) []*RangeMetadata {
l, ok := memo[startEpoch]
if ok {
return l
}
var (
longest = 0
longestMetadata []*RangeMetadata
)
for _, cp := range byMin[startEpoch] {
combined := append([]*RangeMetadata{cp}, findLongestRangeCheckpointStartingAt(cp.MaxEpoch+1, byMin, memo)...)
if max := combined[len(combined)-1].MaxEpoch; max > longest {
longest = max
longestMetadata = combined
}
}
memo[startEpoch] = longestMetadata
return longestMetadata
}

View File

@@ -0,0 +1,63 @@
package epoch
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestLongestRangeCheckpoint(t *testing.T) {
m0_9 := newEpochRangeMetadataForTesting(0, 9)
m0_29 := newEpochRangeMetadataForTesting(0, 29)
m10_19 := newEpochRangeMetadataForTesting(10, 19)
m20_29 := newEpochRangeMetadataForTesting(20, 29)
m30_39 := newEpochRangeMetadataForTesting(30, 39)
m50_59 := newEpochRangeMetadataForTesting(50, 59)
m10_59 := newEpochRangeMetadataForTesting(10, 59)
cases := []struct {
input []*RangeMetadata
want []*RangeMetadata
}{
{
input: nil,
want: nil,
},
{
input: []*RangeMetadata{m0_9, m10_19, m20_29},
want: []*RangeMetadata{m0_9, m10_19, m20_29},
},
{
input: []*RangeMetadata{m0_9, m10_19, m20_29, m50_59},
want: []*RangeMetadata{m0_9, m10_19, m20_29},
},
{
input: []*RangeMetadata{m0_9, m20_29, m50_59},
want: []*RangeMetadata{m0_9},
},
{
input: []*RangeMetadata{m0_29, m20_29, m30_39},
want: []*RangeMetadata{m0_29, m30_39},
},
{
input: []*RangeMetadata{m0_9, m0_29, m10_19, m30_39},
want: []*RangeMetadata{m0_29, m30_39},
},
{
input: []*RangeMetadata{m0_9, m0_29, m10_59, m30_39},
want: []*RangeMetadata{m0_9, m10_59},
},
{
input: []*RangeMetadata{m0_9, m0_9, m0_29, m10_59, m30_39},
want: []*RangeMetadata{m0_9, m10_59},
},
}
for _, tc := range cases {
require.Equal(t, tc.want, findLongestRangeCheckpoint(tc.input))
}
}
func newEpochRangeMetadataForTesting(min, max int) *RangeMetadata {
return &RangeMetadata{MinEpoch: min, MaxEpoch: max}
}

View File

@@ -30,6 +30,29 @@ func epochNumberFromBlobID(blobID blob.ID) (int, bool) {
return n, true
}
// epochNumberFromBlobID extracts the range epoch numbers from a string formatted as
// <prefix><epochNumber>_<epochNumber2>_<remainder>.
func epochRangeFromBlobID(blobID blob.ID) (min, max int, ok bool) {
parts := strings.Split(string(blobID), "_")
// nolint:gomnd
if len(parts) < 3 {
return 0, 0, false
}
first := parts[0]
second := parts[1]
for len(first) > 0 && !unicode.IsDigit(rune(first[0])) {
first = first[1:]
}
n1, err1 := strconv.Atoi(first)
n2, err2 := strconv.Atoi(second)
return n1, n2, err1 == nil && err2 == nil
}
func blobsWrittenBefore(bms []blob.Metadata, maxTime time.Time) []blob.Metadata {
var result []blob.Metadata
@@ -55,3 +78,19 @@ func groupByEpochNumber(bms []blob.Metadata) map[int][]blob.Metadata {
return result
}
func groupByEpochRanges(bms []blob.Metadata) map[int]map[int][]blob.Metadata {
result := map[int]map[int][]blob.Metadata{}
for _, bm := range bms {
if n1, n2, ok := epochRangeFromBlobID(bm.BlobID); ok {
if result[n1] == nil {
result[n1] = make(map[int][]blob.Metadata)
}
result[n1][n2] = append(result[n1][n2], bm)
}
}
return result
}

View File

@@ -2,6 +2,7 @@
import (
"crypto/rand"
"hash/fnv"
"io"
"runtime"
"sort"
@@ -140,3 +141,26 @@ func (b packIndexBuilder) BuildStable(output io.Writer, version int) error {
return errors.Errorf("unsupported index version: %v", version)
}
}
func (b packIndexBuilder) shard(maxShardSize int) []packIndexBuilder {
numShards := (len(b) + maxShardSize - 1) / maxShardSize
if numShards <= 1 {
return []packIndexBuilder{b}
}
result := make([]packIndexBuilder, numShards)
for i := range result {
result[i] = make(packIndexBuilder)
}
for k, v := range b {
h := fnv.New32a()
io.WriteString(h, string(k)) // nolint:errcheck
shard := h.Sum32() % uint32(numShards)
result[shard][k] = v
}
return result
}

View File

@@ -495,3 +495,68 @@ func infoDiff(i1, i2 Info, ignore ...string) []string {
return result
}
func TestShard(t *testing.T) {
b := packIndexBuilder{}
// generate 10000 IDs in random order
ids := make([]int, 10000)
for i := range ids {
ids[i] = i
}
rand.Shuffle(len(ids), func(i, j int) {
ids[i], ids[j] = ids[j], ids[i]
})
// add ID to the builder
for _, id := range ids {
b.Add(&InfoStruct{
ContentID: deterministicContentID("", id),
})
}
// verify number of shards
verifyAllShardedIDs(t, b.shard(100000), len(b), 1)
verifyAllShardedIDs(t, b.shard(100), len(b), 100)
// sharding will always produce stable results, verify sorted shard lengths here
require.ElementsMatch(t,
[]int{460, 472, 473, 477, 479, 483, 486, 492, 498, 499, 501, 503, 504, 505, 511, 519, 524, 528, 542, 544},
verifyAllShardedIDs(t, b.shard(500), len(b), 20))
require.ElementsMatch(t,
[]int{945, 964, 988, 988, 993, 1002, 1014, 1017, 1021, 1068},
verifyAllShardedIDs(t, b.shard(1000), len(b), 10))
require.ElementsMatch(t,
[]int{1952, 1995, 2005, 2013, 2035},
verifyAllShardedIDs(t, b.shard(2000), len(b), 5))
}
func verifyAllShardedIDs(t *testing.T, sharded []packIndexBuilder, numTotal, numShards int) []int {
t.Helper()
require.Len(t, sharded, numShards)
m := map[ID]bool{}
for i := 0; i < numTotal; i++ {
m[deterministicContentID("", i)] = true
}
cnt := 0
var lens []int
for _, s := range sharded {
cnt += len(s)
lens = append(lens, len(s))
for _, v := range s {
delete(m, v.GetContentID())
}
}
require.Equal(t, numTotal, cnt, "invalid total number of sharded elements")
require.Empty(t, m)
return lens
}