performance: improve performance of fragmented index lookups (#765)

* performance: improve performance of fragmented index lookups

In a typical repository there will be many small indexes and few large
ones. If the number gets above ~100 or so, things get very slow.

It helps to pre-merge very small indexes in memory to reduce the number
of binary searches and mmaped IO to perform. In some extreme cases
where we have many uncompacted index segments (each with just 1-2
entries), the savings are quite dramatic.

In one case with >100 index segments the time to run
`kopia snapshot verify` went from 10m to 0.5m after this change.

In another well-maintained repository with 1.2M contents and
about 25 segments, the time to run `kopia snapshot verify` went from
48s to 35s.

Co-authored-by: Julio López <julio+gh@kasten.io>
This commit is contained in:
Jarek Kowalski
2021-01-05 08:10:24 -08:00
committed by GitHub
parent f3737fef6e
commit a8a772c293
3 changed files with 71 additions and 1 deletions

View File

@@ -1,6 +1,7 @@
package content
import (
"bytes"
"context"
"path/filepath"
"sync"
@@ -10,6 +11,12 @@
"github.com/kopia/kopia/repo/blob"
)
// smallIndexEntryCountThreshold is the threshold to determine whether an
// index is small. Any index with fewer entries than this threshold
// will be combined in-memory to reduce the number of segments and speed up
// large index operations (such as verification of all contents).
const smallIndexEntryCountThreshold = 100
type committedContentIndex struct {
cache committedContentIndexCache
@@ -119,7 +126,14 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
newInUse[e] = ndx
}
b.merged = newMerged
mergedAndCombined, err := combineSmallIndexes(newMerged)
if err != nil {
return false, errors.Wrap(err, "unable to combine small indexes")
}
log(ctx).Debugf("combined %v into %v index segments", len(newMerged), len(mergedAndCombined))
b.merged = mergedAndCombined
b.inUse = newInUse
if err := b.cache.expireUnused(ctx, packFiles); err != nil {
@@ -131,6 +145,46 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
return true, nil
}
// combineSmallIndexes merges all indexes whose entry count falls below
// smallIndexEntryCountThreshold into a single in-memory index, returning the
// resulting (possibly smaller) set of index segments. Indexes at or above the
// threshold are passed through untouched. If zero or one small indexes are
// found, the input is returned as-is since merging would not help.
func combineSmallIndexes(m mergedIndex) (mergedIndex, error) {
	var large, small mergedIndex

	// Partition the segments by size.
	for _, seg := range m {
		if seg.ApproximateCount() < smallIndexEntryCountThreshold {
			small = append(small, seg)
		} else {
			large = append(large, seg)
		}
	}

	// Nothing to gain unless at least two small segments can be merged.
	if len(small) <= 1 {
		return m, nil
	}

	// Accumulate all entries from the small segments into one builder.
	builder := packIndexBuilder{}

	for _, seg := range small {
		err := seg.Iterate(AllIDs, func(info Info) error {
			builder.Add(info)
			return nil
		})
		if err != nil {
			return nil, errors.Wrap(err, "unable to iterate index entries")
		}
	}

	// Serialize the combined entries and reopen them as a single index
	// backed entirely by memory.
	var combinedBytes bytes.Buffer
	if err := builder.Build(&combinedBytes); err != nil {
		return nil, errors.Wrap(err, "error building combined in-memory index")
	}

	merged, err := openPackIndex(bytes.NewReader(combinedBytes.Bytes()))
	if err != nil {
		return nil, errors.Wrap(err, "error opening combined in-memory index")
	}

	return append(large, merged), nil
}
func (b *committedContentIndex) close() error {
b.mu.Lock()
defer b.mu.Unlock()

View File

@@ -21,6 +21,8 @@
type packIndex interface {
io.Closer
ApproximateCount() int
GetInfo(contentID ID) (*Info, error)
// invokes the provided callback for all entries such that entry.ID >= startID and entry.ID < endID
@@ -89,6 +91,10 @@ func readHeader(readerAt io.ReaderAt) (headerInfo, error) {
return hi, nil
}
// ApproximateCount returns the number of entries recorded in the index
// header. For a single on-disk index segment this comes straight from the
// header's entry count.
func (b *index) ApproximateCount() int {
	n := b.hdr.entryCount
	return n
}
// Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically.
// The iteration ends when the callback returns an error, which is propagated to the caller or when
// all contents have been visited.

View File

@@ -11,6 +11,16 @@
// mergedIndex is an implementation of Index that transparently merges results from the underlying Indexes.
type mergedIndex []packIndex
// ApproximateCount returns the sum of the approximate entry counts of all
// underlying index segments.
func (m mergedIndex) ApproximateCount() int {
	total := 0

	for _, seg := range m {
		total += seg.ApproximateCount()
	}

	return total
}
// Close closes all underlying indexes.
func (m mergedIndex) Close() error {
for _, ndx := range m {