mirror of
https://github.com/kopia/kopia.git
synced 2026-03-26 10:01:32 -04:00
refactor(repository): Reduce memory allocations during manifest compaction (#4084)
* Change struct for tracking committed content Committed content only ever has a value of 'true' for committed so use an empty struct and an existance check instead. * Don't copy committed manifest set for compaction Assuming the number of committed manifests is much larger than the number of manifest updates, it seems reasonable to update the logic to write manifests out to not delete entries from the set being operated on. Doing so allows us to avoid creating a duplicate of the set of all committed manifests during compaction, which could also save some memory as the temporary map wasn't being right-sized based on the the number of committed entries. This also works because writing data either fails or succeeds completely. That means there's no possibility of only some entries being written out but not others, which means callers can use the presence of an error to determine how to react (i.e. clear pending set). * Benchmarks for compaction
This commit is contained in:
@@ -32,7 +32,7 @@ type committedManifestManager struct {
|
||||
// +checklocks:cmmu
|
||||
committedEntries map[ID]*manifestEntry
|
||||
// +checklocks:cmmu
|
||||
committedContentIDs map[content.ID]bool
|
||||
committedContentIDs map[content.ID]struct{}
|
||||
|
||||
// autoCompactionThreshold controls the threshold after which the manager auto-compacts
|
||||
// manifest contents
|
||||
@@ -79,7 +79,7 @@ func (m *committedManifestManager) findCommittedEntries(ctx context.Context, lab
|
||||
return findEntriesMatchingLabels(m.committedEntries, labels), nil
|
||||
}
|
||||
|
||||
func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
|
||||
func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -98,7 +98,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
|
||||
// the lock via commitEntries()) and to compact existing committed entries during compaction
|
||||
// where the lock is already being held.
|
||||
// +checklocks:m.cmmu
|
||||
func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
|
||||
func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) {
|
||||
if len(entries) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -124,12 +124,11 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
|
||||
|
||||
for _, e := range entries {
|
||||
m.committedEntries[e.ID] = e
|
||||
delete(entries, e.ID)
|
||||
}
|
||||
|
||||
m.committedContentIDs[contentID] = true
|
||||
m.committedContentIDs[contentID] = struct{}{}
|
||||
|
||||
return map[content.ID]bool{contentID: true}, nil
|
||||
return map[content.ID]struct{}{contentID: {}}, nil
|
||||
}
|
||||
|
||||
// +checklocks:m.cmmu
|
||||
@@ -192,10 +191,10 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
|
||||
// +checklocks:m.cmmu
|
||||
func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) {
|
||||
m.committedEntries = map[ID]*manifestEntry{}
|
||||
m.committedContentIDs = map[content.ID]bool{}
|
||||
m.committedContentIDs = map[content.ID]struct{}{}
|
||||
|
||||
for contentID := range manifests {
|
||||
m.committedContentIDs[contentID] = true
|
||||
m.committedContentIDs[contentID] = struct{}{}
|
||||
}
|
||||
|
||||
for _, man := range manifests {
|
||||
@@ -257,19 +256,14 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error {
|
||||
m.b.DisableIndexFlush(ctx)
|
||||
defer m.b.EnableIndexFlush(ctx)
|
||||
|
||||
tmp := map[ID]*manifestEntry{}
|
||||
for k, v := range m.committedEntries {
|
||||
tmp[k] = v
|
||||
}
|
||||
|
||||
written, err := m.writeEntriesLocked(ctx, tmp)
|
||||
written, err := m.writeEntriesLocked(ctx, m.committedEntries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add the newly-created content to the list, could be duplicate
|
||||
for b := range m.committedContentIDs {
|
||||
if written[b] {
|
||||
if _, ok := written[b]; ok {
|
||||
// do not delete content that was just written.
|
||||
continue
|
||||
}
|
||||
@@ -374,7 +368,7 @@ func newCommittedManager(b contentManager, autoCompactionThreshold int) *committ
|
||||
b: b,
|
||||
debugID: debugID,
|
||||
committedEntries: map[ID]*manifestEntry{},
|
||||
committedContentIDs: map[content.ID]bool{},
|
||||
committedContentIDs: map[content.ID]struct{}{},
|
||||
autoCompactionThreshold: autoCompactionThreshold,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,6 +215,9 @@ func (m *Manager) Flush(ctx context.Context) error {
|
||||
defer m.mu.Unlock()
|
||||
|
||||
_, err := m.committed.commitEntries(ctx, m.pendingEntries)
|
||||
if err == nil {
|
||||
m.pendingEntries = map[ID]*manifestEntry{}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -310,8 +311,8 @@ type contentManagerOpts struct {
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap, opts contentManagerOpts) contentManager {
|
||||
t.Helper()
|
||||
func newContentManagerForTesting(ctx context.Context, tb testing.TB, data blobtesting.DataMap, opts contentManagerOpts) contentManager {
|
||||
tb.Helper()
|
||||
|
||||
st := blobtesting.NewMapStorage(data, nil, nil)
|
||||
|
||||
@@ -328,12 +329,12 @@ func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtes
|
||||
},
|
||||
}, nil)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
|
||||
bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
|
||||
t.Cleanup(func() { bm.CloseShared(ctx) })
|
||||
tb.Cleanup(func() { bm.CloseShared(ctx) })
|
||||
|
||||
return bm
|
||||
}
|
||||
@@ -488,3 +489,50 @@ func TestManifestAutoCompactionWithReadOnly(t *testing.T) {
|
||||
_, err = mgr.Find(ctx, map[string]string{"color": "red"})
|
||||
require.NoError(t, err, "forcing reload of manifest manager")
|
||||
}
|
||||
|
||||
func BenchmarkLargeCompaction(b *testing.B) {
|
||||
item1 := map[string]int{"foo": 1, "bar": 2}
|
||||
labels1 := map[string]string{"type": "item", "color": "red"}
|
||||
|
||||
table := []int{10000, 100000, 1000000}
|
||||
|
||||
for _, numItems := range table {
|
||||
b.Run(fmt.Sprintf("%dItems", numItems), func(b *testing.B) {
|
||||
for range b.N {
|
||||
b.StopTimer()
|
||||
// Use default context to avoid lots of log output during benchmark.
|
||||
ctx := context.Background()
|
||||
data := blobtesting.DataMap{}
|
||||
|
||||
bm := newContentManagerForTesting(ctx, b, data, contentManagerOpts{})
|
||||
|
||||
mgr, err := NewManager(
|
||||
ctx,
|
||||
bm,
|
||||
ManagerOptions{AutoCompactionThreshold: 2},
|
||||
nil,
|
||||
)
|
||||
require.NoError(b, err, "getting initial manifest manager")
|
||||
|
||||
for range numItems - 1 {
|
||||
_, err = mgr.Put(ctx, labels1, item1)
|
||||
require.NoError(b, err, "adding item to manifest manager")
|
||||
}
|
||||
|
||||
require.NoError(b, mgr.Flush(ctx))
|
||||
require.NoError(b, mgr.b.Flush(ctx))
|
||||
|
||||
_, err = mgr.Put(ctx, labels1, item1)
|
||||
require.NoError(b, err, "adding item to manifest manager")
|
||||
|
||||
require.NoError(b, mgr.Flush(ctx))
|
||||
require.NoError(b, mgr.b.Flush(ctx))
|
||||
|
||||
b.StartTimer()
|
||||
|
||||
err = mgr.Compact(ctx)
|
||||
require.NoError(b, err, "forcing reload of manifest manager")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user