From 2a6240ba3ec8f1d710dabbebe72eaf3fbb337cd1 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 27 Aug 2019 23:05:28 -0700 Subject: [PATCH] content: remove lock while writing content --- repo/content/content_manager.go | 137 +++++++++++++----------- repo/content/content_manager_iterate.go | 26 +++-- repo/content/content_manager_test.go | 5 +- 3 files changed, 95 insertions(+), 73 deletions(-) diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 7b3edace1..16bc66a33 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -68,7 +68,8 @@ type Manager struct { locked bool pendingPacks map[blob.ID]*pendingPackInfo - packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) + writingPacks []*pendingPackInfo // list of packs that are being written + packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) disableIndexFlushCount int flushPackIndexesAfter time.Time // time when those indexes should be flushed @@ -102,14 +103,17 @@ func (bm *Manager) DeleteContent(contentID ID) error { } } + // remove from all packs that are being written, since they will be committed to index soon + for _, pp := range bm.writingPacks { + if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted { + bm.deletePreexistingContent(bi) + return nil + } + } + // if found in committed index, add another entry that's marked for deletion if bi, ok := bm.packIndexBuilder[contentID]; ok { - if !bi.Deleted { - // we have this content in index and it's not deleted. - bm.deletePreexistingContent(*bi) - } - - // we have this content in index and it already deleted - do nothing. + bm.deletePreexistingContent(*bi) return nil } @@ -119,11 +123,6 @@ func (bm *Manager) DeleteContent(contentID ID) error { return err } - if bi.Deleted { - // already deleted - return nil - } - bm.deletePreexistingContent(bi) return nil } @@ -131,18 +130,26 @@ func (bm *Manager) DeleteContent(contentID ID) error { // Intentionally passing bi by value. // nolint:hugeParam func (bm *Manager) deletePreexistingContent(ci Info) { + if ci.Deleted { + return + } pp := bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(ci.ID)) ci.Deleted = true ci.TimestampSeconds = bm.timeNow().Unix() pp.currentPackItems[ci.ID] = ci } -func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error { - bm.assertLocked() - +func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error { prefix := packPrefixForContentID(contentID) - pp := bm.getOrCreatePendingPackInfoLocked(prefix) + bm.lock() + if bm.timeNow().After(bm.flushPackIndexesAfter) { + if err := bm.flushPackIndexesLocked(ctx); err != nil { + return err + } + } + + pp := bm.getOrCreatePendingPackInfoLocked(prefix) data = cloneBytes(data) pp.currentPackDataLength += len(data) pp.currentPackItems[contentID] = Info{ @@ -152,30 +159,20 @@ func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byt Length: uint32(len(data)), TimestampSeconds: bm.timeNow().Unix(), } - - if pp.currentPackDataLength >= bm.maxPackSize { - if err := bm.finishPackLocked(ctx, pp); err != nil { - return errors.Wrap(err, "unable to finish pack") - } - - if err := bm.maybeFlushIndexesLocked(ctx); err != nil { - return err - } + shouldWrite := pp.currentPackDataLength >= bm.maxPackSize + if shouldWrite { + // we're about to write to storage without holding a lock + // remove from pendingPacks so other goroutine tries to mess with this pending pack. + delete(bm.pendingPacks, pp.prefix) + bm.writingPacks = append(bm.writingPacks, pp) } + bm.unlock() - return nil -} - -func (bm *Manager) maybeFlushIndexesLocked(ctx context.Context) error { - bm.assertLocked() - - if bm.timeNow().After(bm.flushPackIndexesAfter) { - if err := bm.finishAllPacksLocked(ctx); err != nil { - return errors.Wrap(err, "finish all packs") - } - - if err := bm.flushPackIndexesLocked(ctx); err != nil { - return err + // at this point we're unlocked so different goroutines can encrypt and + // save to storage in parallel. + if shouldWrite { + if err := bm.writePackAndAddToIndex(ctx, pp, false); err != nil { + return errors.Wrap(err, "unable to write pack") } } @@ -285,13 +282,11 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { } func (bm *Manager) finishAllPacksLocked(ctx context.Context) error { - for _, pp := range bm.pendingPacks { - if len(pp.currentPackItems) == 0 { - log.Debugf("no current pack entries") - continue - } + for prefix, pp := range bm.pendingPacks { + delete(bm.pendingPacks, prefix) + bm.writingPacks = append(bm.writingPacks, pp) - if err := bm.finishPackLocked(ctx, pp); err != nil { + if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil { return errors.Wrap(err, "error writing pack content") } } @@ -299,9 +294,7 @@ func (bm *Manager) finishAllPacksLocked(ctx context.Context) error { return nil } -func (bm *Manager) finishPackLocked(ctx context.Context, pp *pendingPackInfo) error { - bm.assertLocked() - +func (bm *Manager) writePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo, lockHeld bool) error { contentID := make([]byte, 16) if _, err := cryptorand.Read(contentID); err != nil { return errors.Wrap(err, "unable to read crypto bytes") @@ -320,14 +313,32 @@ func (bm *Manager) finishPackLocked(ctx context.Context, pp *pendingPackInfo) er formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData)) } + // after the file has been writte, add pack index builder entries to index. + if !lockHeld { + bm.lock() + } + + bm.writingPacks = removePendingPack(bm.writingPacks, pp) for _, info := range packFileIndex { bm.packIndexBuilder.Add(*info) } - delete(bm.pendingPacks, pp.prefix) + if !lockHeld { + bm.unlock() + } return nil } +func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pendingPackInfo { + result := slice[:0] + for _, p := range slice { + if p != pp { + result = append(result, p) + } + } + return result +} + // Close closes the content manager. func (bm *Manager) Close() { bm.contentCache.close() @@ -347,7 +358,6 @@ func (bm *Manager) Flush(ctx context.Context) error { if err := bm.flushPackIndexesLocked(ctx); err != nil { return errors.Wrap(err, "error flushing indexes") } - return nil } @@ -363,10 +373,7 @@ func (bm *Manager) RewriteContent(ctx context.Context, contentID ID) error { return err } - bm.lock() - defer bm.unlock() - - return bm.addToPackLocked(ctx, contentID, data, bi.Deleted) + return bm.addToPackUnlocked(ctx, contentID, data, bi.Deleted) } func packPrefixForContentID(contentID ID) blob.ID { @@ -402,10 +409,7 @@ func (bm *Manager) WriteContent(ctx context.Context, data []byte, prefix ID) (ID } } - log.Debugf("WriteContent(%q) - new", contentID) - bm.lock() - defer bm.unlock() - err := bm.addToPackLocked(ctx, contentID, data, false) + err := bm.addToPackUnlocked(ctx, contentID, data, false) return contentID, err } @@ -429,15 +433,21 @@ func (bm *Manager) getContentInfo(contentID ID) (Info, error) { // check added contents, not written to any packs yet. for _, pp := range bm.pendingPacks { - bi, ok := pp.currentPackItems[contentID] - if ok { - return bi, nil + if ci, ok := pp.currentPackItems[contentID]; ok { + return ci, nil + } + } + + // check contents being written to packs right now. + for _, pp := range bm.writingPacks { + if ci, ok := pp.currentPackItems[contentID]; ok { + return ci, nil } } // added contents, written to packs but not yet added to indexes - if bi, ok := bm.packIndexBuilder[contentID]; ok { - return *bi, nil + if ci, ok := bm.packIndexBuilder[contentID]; ok { + return *ci, nil } // read from committed content index @@ -477,9 +487,6 @@ func (bm *Manager) assertLocked() { // Refresh reloads the committed content indexes. func (bm *Manager) Refresh(ctx context.Context) (bool, error) { - bm.mu.Lock() - defer bm.mu.Unlock() - log.Debugf("Refresh started") t0 := time.Now() _, updated, err := bm.loadPackIndexesUnlocked(ctx) diff --git a/repo/content/content_manager_iterate.go b/repo/content/content_manager_iterate.go index ec6775362..9746a949b 100644 --- a/repo/content/content_manager_iterate.go +++ b/repo/content/content_manager_iterate.go @@ -73,24 +73,36 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter return callback, cleanup } -// IterateContents invokes the provided callback for each content starting with a specified prefix -// and possibly including deleted items. -func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error { +func (bm *Manager) snapshotUncommittedItems() packIndexBuilder { bm.lock() + defer bm.unlock() + overlay := bm.packIndexBuilder.clone() for _, pp := range bm.pendingPacks { for _, pi := range pp.currentPackItems { overlay.Add(pi) } } - bm.unlock() + for _, pp := range bm.writingPacks { + for _, pi := range pp.currentPackItems { + overlay.Add(pi) + } + } + return overlay +} + +// IterateContents invokes the provided callback for each content starting with a specified prefix +// and possibly including deleted items. +func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error { callback, cleanup := maybeParallelExecutor(opts.Parallel, callback) defer cleanup() //nolint:errcheck + uncommitted := bm.snapshotUncommittedItems() + invokeCallback := func(i Info) error { if !opts.IncludeDeleted { - if ci, ok := overlay[i.ID]; ok { + if ci, ok := uncommitted[i.ID]; ok { if ci.Deleted { return nil } @@ -105,12 +117,12 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback return callback(i) } - if len(overlay) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { + if len(uncommitted) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { // fast path, invoke callback directly invokeCallback = callback } - for _, bi := range overlay { + for _, bi := range uncommitted { _ = invokeCallback(*bi) } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 3e20e991e..a653bc2eb 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -387,7 +387,10 @@ func TestDeleteContent(t *testing.T) { keyTime := map[blob.ID]time.Time{} bm := newTestContentManager(data, keyTime, nil) content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) - bm.Flush(ctx) + if err := bm.Flush(ctx); err != nil { + t.Fatalf("error flushing: %v", err) + } + dumpContents(t, bm, "after first flush") content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100)) log.Infof("xxx deleting.") if err := bm.DeleteContent(content1); err != nil {