diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index d955ce724..109fb6ea0 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -32,7 +32,7 @@ type committedManifestManager struct { // +checklocks:cmmu committedEntries map[ID]*manifestEntry // +checklocks:cmmu - committedContentIDs map[content.ID]bool + committedContentIDs map[content.ID]struct{} // autoCompactionThreshold controls the threshold after which the manager auto-compacts // manifest contents @@ -79,7 +79,7 @@ func (m *committedManifestManager) findCommittedEntries(ctx context.Context, lab return findEntriesMatchingLabels(m.committedEntries, labels), nil } -func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -98,7 +98,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma // the lock via commitEntries()) and to compact existing committed entries during compaction // where the lock is already being held. // +checklocks:m.cmmu -func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -124,12 +124,11 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri for _, e := range entries { m.committedEntries[e.ID] = e - delete(entries, e.ID) } - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} - return map[content.ID]bool{contentID: true}, nil + return map[content.ID]struct{}{contentID: {}}, nil } // +checklocks:m.cmmu @@ -192,10 +191,10 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte // +checklocks:m.cmmu func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) { m.committedEntries = map[ID]*manifestEntry{} - m.committedContentIDs = map[content.ID]bool{} + m.committedContentIDs = map[content.ID]struct{}{} for contentID := range manifests { - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} } for _, man := range manifests { @@ -257,19 +256,14 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error { m.b.DisableIndexFlush(ctx) defer m.b.EnableIndexFlush(ctx) - tmp := map[ID]*manifestEntry{} - for k, v := range m.committedEntries { - tmp[k] = v - } - - written, err := m.writeEntriesLocked(ctx, tmp) + written, err := m.writeEntriesLocked(ctx, m.committedEntries) if err != nil { return err } // add the newly-created content to the list, could be duplicate for b := range m.committedContentIDs { - if written[b] { + if _, ok := written[b]; ok { // do not delete content that was just written. continue } @@ -374,7 +368,7 @@ func newCommittedManager(b contentManager, autoCompactionThreshold int) *committ b: b, debugID: debugID, committedEntries: map[ID]*manifestEntry{}, - committedContentIDs: map[content.ID]bool{}, + committedContentIDs: map[content.ID]struct{}{}, autoCompactionThreshold: autoCompactionThreshold, } } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index cb53935f3..a90c1dbb5 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -215,6 +215,9 @@ func (m *Manager) Flush(ctx context.Context) error { defer m.mu.Unlock() _, err := m.committed.commitEntries(ctx, m.pendingEntries) + if err == nil { + m.pendingEntries = map[ID]*manifestEntry{} + } return err } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 9c567d885..c60fa24cc 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "fmt" "reflect" "sort" "strings" @@ -310,8 +311,8 @@ type contentManagerOpts struct { readOnly bool } -func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap, opts contentManagerOpts) contentManager { - t.Helper() +func newContentManagerForTesting(ctx context.Context, tb testing.TB, data blobtesting.DataMap, opts contentManagerOpts) contentManager { + tb.Helper() st := blobtesting.NewMapStorage(data, nil, nil) @@ -328,12 +329,12 @@ func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtes }, }, nil) - require.NoError(t, err) + require.NoError(tb, err) bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) - require.NoError(t, err) + require.NoError(tb, err) - t.Cleanup(func() { bm.CloseShared(ctx) }) + tb.Cleanup(func() { bm.CloseShared(ctx) }) return bm } @@ -488,3 +489,50 @@ func TestManifestAutoCompactionWithReadOnly(t *testing.T) { _, err = mgr.Find(ctx, map[string]string{"color": "red"}) require.NoError(t, err, "forcing reload of manifest manager") } + +func BenchmarkLargeCompaction(b *testing.B) { + item1 := map[string]int{"foo": 1, "bar": 2} + labels1 := map[string]string{"type": "item", "color": "red"} + + table := []int{10000, 100000, 1000000} + + for _, numItems := range table { + b.Run(fmt.Sprintf("%dItems", numItems), func(b *testing.B) { + for range b.N { + b.StopTimer() + // Use default context to avoid lots of log output during benchmark. + ctx := context.Background() + data := blobtesting.DataMap{} + + bm := newContentManagerForTesting(ctx, b, data, contentManagerOpts{}) + + mgr, err := NewManager( + ctx, + bm, + ManagerOptions{AutoCompactionThreshold: 2}, + nil, + ) + require.NoError(b, err, "getting initial manifest manager") + + for range numItems - 1 { + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + } + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + b.StartTimer() + + err = mgr.Compact(ctx) + require.NoError(b, err, "forcing reload of manifest manager") + } + }) + } +}