diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 6b54e0b24..32ef8f454 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -1,6 +1,7 @@ package content import ( + "bytes" "context" "path/filepath" "sync" @@ -10,6 +11,12 @@ "github.com/kopia/kopia/repo/blob" ) +// smallIndexEntryCountThreshold is the threshold to determine whether an +// index is small. Any index with fewer entries than this threshold +// will be combined in-memory to reduce the number of segments and speed up +// large index operations (such as verification of all contents). +const smallIndexEntryCountThreshold = 100 + type committedContentIndex struct { cache committedContentIndexCache @@ -119,7 +126,14 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b newInUse[e] = ndx } - b.merged = newMerged + mergedAndCombined, err := combineSmallIndexes(newMerged) + if err != nil { + return false, errors.Wrap(err, "unable to combine small indexes") + } + + log(ctx).Debugf("combined %v into %v index segments", len(newMerged), len(mergedAndCombined)) + + b.merged = mergedAndCombined b.inUse = newInUse if err := b.cache.expireUnused(ctx, packFiles); err != nil { @@ -131,6 +145,46 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b return true, nil } +func combineSmallIndexes(m mergedIndex) (mergedIndex, error) { + var toKeep, toMerge mergedIndex + + for _, ndx := range m { + if ndx.ApproximateCount() < smallIndexEntryCountThreshold { + toMerge = append(toMerge, ndx) + } else { + toKeep = append(toKeep, ndx) + } + } + + if len(toMerge) <= 1 { + return m, nil + } + + b := packIndexBuilder{} + + for _, ndx := range toMerge { + if err := ndx.Iterate(AllIDs, func(i Info) error { + b.Add(i) + return nil + }); err != nil { + return nil, errors.Wrap(err, "unable to iterate index entries") + } + } + + var buf bytes.Buffer + + if err := b.Build(&buf); err != nil { + return nil, errors.Wrap(err, "error building combined in-memory index") + } + + combined, err := openPackIndex(bytes.NewReader(buf.Bytes())) + if err != nil { + return nil, errors.Wrap(err, "error opening combined in-memory index") + } + + return append(toKeep, combined), nil +} + func (b *committedContentIndex) close() error { b.mu.Lock() defer b.mu.Unlock() diff --git a/repo/content/index.go b/repo/content/index.go index a69ec45e1..ab3331ede 100644 --- a/repo/content/index.go +++ b/repo/content/index.go @@ -21,6 +21,8 @@ type packIndex interface { io.Closer + ApproximateCount() int + GetInfo(contentID ID) (*Info, error) // invoked the provided callback for all entries such that entry.ID >= startID and entry.ID < endID @@ -89,6 +91,10 @@ func readHeader(readerAt io.ReaderAt) (headerInfo, error) { return hi, nil } +func (b *index) ApproximateCount() int { + return b.hdr.entryCount +} + // Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically. // The iteration ends when the callback returns an error, which is propagated to the caller or when // all contents have been visited. diff --git a/repo/content/merged.go b/repo/content/merged.go index d455be999..2ac7230f8 100644 --- a/repo/content/merged.go +++ b/repo/content/merged.go @@ -11,6 +11,16 @@ // mergedIndex is an implementation of Index that transparently merges returns from underlying Indexes. type mergedIndex []packIndex +func (m mergedIndex) ApproximateCount() int { + c := 0 + + for _, ndx := range m { + c += ndx.ApproximateCount() + } + + return c +} + // Close closes all underlying indexes. func (m mergedIndex) Close() error { for _, ndx := range m {