From a8a772c2932baa2ff62c36b717561e9f7f7296fe Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 5 Jan 2021 08:10:24 -0800 Subject: [PATCH] performance: improve performance of fragmented index lookups (#765) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * performance: improve performance of fragmented index lookups In a typical repository there will be many small indexes and few large ones. If the number of indexes gets above ~100 or so, things get very slow. It helps to pre-merge very small indexes in memory to reduce the number of binary searches and mmapped I/O operations to perform. In some extreme cases where we have many uncompacted index segments (each with just 1-2 entries), the savings are quite dramatic. In one case with >100 index segments the time to run `kopia snapshot verify` went from 10m to 0.5m after this change. In another well-maintained repository with 1.2M contents and about 25 segments, the time to run `kopia snapshot verify` went from 48s to 35s. Co-authored-by: Julio López --- repo/content/committed_content_index.go | 56 ++++++++++++++++++++++++- repo/content/index.go | 6 +++ repo/content/merged.go | 10 +++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 6b54e0b24..32ef8f454 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -1,6 +1,7 @@ package content import ( + "bytes" "context" "path/filepath" "sync" @@ -10,6 +11,12 @@ "github.com/kopia/kopia/repo/blob" ) +// smallIndexEntryCountThreshold is the threshold to determine whether an +// index is small. Any index with fewer entries than this threshold +// will be combined in-memory to reduce the number of segments and speed up +// large index operations (such as verification of all contents). 
+const smallIndexEntryCountThreshold = 100 + type committedContentIndex struct { cache committedContentIndexCache @@ -119,7 +126,14 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b newInUse[e] = ndx } - b.merged = newMerged + mergedAndCombined, err := combineSmallIndexes(newMerged) + if err != nil { + return false, errors.Wrap(err, "unable to combine small indexes") + } + + log(ctx).Debugf("combined %v into %v index segments", len(newMerged), len(mergedAndCombined)) + + b.merged = mergedAndCombined b.inUse = newInUse if err := b.cache.expireUnused(ctx, packFiles); err != nil { @@ -131,6 +145,46 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b return true, nil } +func combineSmallIndexes(m mergedIndex) (mergedIndex, error) { + var toKeep, toMerge mergedIndex + + for _, ndx := range m { + if ndx.ApproximateCount() < smallIndexEntryCountThreshold { + toMerge = append(toMerge, ndx) + } else { + toKeep = append(toKeep, ndx) + } + } + + if len(toMerge) <= 1 { + return m, nil + } + + b := packIndexBuilder{} + + for _, ndx := range toMerge { + if err := ndx.Iterate(AllIDs, func(i Info) error { + b.Add(i) + return nil + }); err != nil { + return nil, errors.Wrap(err, "unable to iterate index entries") + } + } + + var buf bytes.Buffer + + if err := b.Build(&buf); err != nil { + return nil, errors.Wrap(err, "error building combined in-memory index") + } + + combined, err := openPackIndex(bytes.NewReader(buf.Bytes())) + if err != nil { + return nil, errors.Wrap(err, "error opening combined in-memory index") + } + + return append(toKeep, combined), nil +} + func (b *committedContentIndex) close() error { b.mu.Lock() defer b.mu.Unlock() diff --git a/repo/content/index.go b/repo/content/index.go index a69ec45e1..ab3331ede 100644 --- a/repo/content/index.go +++ b/repo/content/index.go @@ -21,6 +21,8 @@ type packIndex interface { io.Closer + ApproximateCount() int + GetInfo(contentID ID) (*Info, error) // invoked 
the provided callback for all entries such that entry.ID >= startID and entry.ID < endID @@ -89,6 +91,10 @@ func readHeader(readerAt io.ReaderAt) (headerInfo, error) { return hi, nil } +func (b *index) ApproximateCount() int { + return b.hdr.entryCount +} + // Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically. // The iteration ends when the callback returns an error, which is propagated to the caller or when // all contents have been visited. diff --git a/repo/content/merged.go b/repo/content/merged.go index d455be999..2ac7230f8 100644 --- a/repo/content/merged.go +++ b/repo/content/merged.go @@ -11,6 +11,16 @@ // mergedIndex is an implementation of Index that transparently merges returns from underlying Indexes. type mergedIndex []packIndex +func (m mergedIndex) ApproximateCount() int { + c := 0 + + for _, ndx := range m { + c += ndx.ApproximateCount() + } + + return c +} + // Close closes all underlying indexes. func (m mergedIndex) Close() error { for _, ndx := range m {