From 9a507b73d58da3d407cd14f1eb088090dc3d2a18 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Wed, 9 Oct 2024 18:41:13 -0700 Subject: [PATCH] refactor(repository): Reduce memory allocations during manifest compaction (#4084) * Change struct for tracking committed content Committed content only ever has a value of 'true' for committed so use an empty struct and an existence check instead. * Don't copy committed manifest set for compaction Assuming the number of committed manifests is much larger than the number of manifest updates, it seems reasonable to update the logic to write manifests out to not delete entries from the set being operated on. Doing so allows us to avoid creating a duplicate of the set of all committed manifests during compaction, which could also save some memory as the temporary map wasn't being right-sized based on the number of committed entries. This also works because writing data either fails or succeeds completely. That means there's no possibility of only some entries being written out but not others, which means callers can use the presence of an error to determine how to react (i.e. clear pending set). 
* Benchmarks for compaction --- repo/manifest/committed_manifest_manager.go | 26 ++++----- repo/manifest/manifest_manager.go | 3 ++ repo/manifest/manifest_manager_test.go | 58 +++++++++++++++++++-- 3 files changed, 66 insertions(+), 21 deletions(-) diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index d955ce724..109fb6ea0 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -32,7 +32,7 @@ type committedManifestManager struct { // +checklocks:cmmu committedEntries map[ID]*manifestEntry // +checklocks:cmmu - committedContentIDs map[content.ID]bool + committedContentIDs map[content.ID]struct{} // autoCompactionThreshold controls the threshold after which the manager auto-compacts // manifest contents @@ -79,7 +79,7 @@ func (m *committedManifestManager) findCommittedEntries(ctx context.Context, lab return findEntriesMatchingLabels(m.committedEntries, labels), nil } -func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -98,7 +98,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma // the lock via commitEntries()) and to compact existing committed entries during compaction // where the lock is already being held. 
// +checklocks:m.cmmu -func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -124,12 +124,11 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri for _, e := range entries { m.committedEntries[e.ID] = e - delete(entries, e.ID) } - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} - return map[content.ID]bool{contentID: true}, nil + return map[content.ID]struct{}{contentID: {}}, nil } // +checklocks:m.cmmu @@ -192,10 +191,10 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte // +checklocks:m.cmmu func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) { m.committedEntries = map[ID]*manifestEntry{} - m.committedContentIDs = map[content.ID]bool{} + m.committedContentIDs = map[content.ID]struct{}{} for contentID := range manifests { - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} } for _, man := range manifests { @@ -257,19 +256,14 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error { m.b.DisableIndexFlush(ctx) defer m.b.EnableIndexFlush(ctx) - tmp := map[ID]*manifestEntry{} - for k, v := range m.committedEntries { - tmp[k] = v - } - - written, err := m.writeEntriesLocked(ctx, tmp) + written, err := m.writeEntriesLocked(ctx, m.committedEntries) if err != nil { return err } // add the newly-created content to the list, could be duplicate for b := range m.committedContentIDs { - if written[b] { + if _, ok := written[b]; ok { // do not delete content that was just written. 
continue } @@ -374,7 +368,7 @@ func newCommittedManager(b contentManager, autoCompactionThreshold int) *committ b: b, debugID: debugID, committedEntries: map[ID]*manifestEntry{}, - committedContentIDs: map[content.ID]bool{}, + committedContentIDs: map[content.ID]struct{}{}, autoCompactionThreshold: autoCompactionThreshold, } } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index cb53935f3..a90c1dbb5 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -215,6 +215,9 @@ func (m *Manager) Flush(ctx context.Context) error { defer m.mu.Unlock() _, err := m.committed.commitEntries(ctx, m.pendingEntries) + if err == nil { + m.pendingEntries = map[ID]*manifestEntry{} + } return err } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 9c567d885..c60fa24cc 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "fmt" "reflect" "sort" "strings" @@ -310,8 +311,8 @@ type contentManagerOpts struct { readOnly bool } -func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap, opts contentManagerOpts) contentManager { - t.Helper() +func newContentManagerForTesting(ctx context.Context, tb testing.TB, data blobtesting.DataMap, opts contentManagerOpts) contentManager { + tb.Helper() st := blobtesting.NewMapStorage(data, nil, nil) @@ -328,12 +329,12 @@ func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtes }, }, nil) - require.NoError(t, err) + require.NoError(tb, err) bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) - require.NoError(t, err) + require.NoError(tb, err) - t.Cleanup(func() { bm.CloseShared(ctx) }) + tb.Cleanup(func() { bm.CloseShared(ctx) }) return bm } @@ -488,3 +489,50 @@ func TestManifestAutoCompactionWithReadOnly(t *testing.T) { _, err = mgr.Find(ctx, map[string]string{"color": 
"red"}) require.NoError(t, err, "forcing reload of manifest manager") } + +func BenchmarkLargeCompaction(b *testing.B) { + item1 := map[string]int{"foo": 1, "bar": 2} + labels1 := map[string]string{"type": "item", "color": "red"} + + table := []int{10000, 100000, 1000000} + + for _, numItems := range table { + b.Run(fmt.Sprintf("%dItems", numItems), func(b *testing.B) { + for range b.N { + b.StopTimer() + // Use default context to avoid lots of log output during benchmark. + ctx := context.Background() + data := blobtesting.DataMap{} + + bm := newContentManagerForTesting(ctx, b, data, contentManagerOpts{}) + + mgr, err := NewManager( + ctx, + bm, + ManagerOptions{AutoCompactionThreshold: 2}, + nil, + ) + require.NoError(b, err, "getting initial manifest manager") + + for range numItems - 1 { + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + } + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + b.StartTimer() + + err = mgr.Compact(ctx) + require.NoError(b, err, "forcing reload of manifest manager") + } + }) + } +}