From 439e19d5c8a44048ba47f743345944600eb832d2 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 26 Aug 2019 22:19:36 -0700 Subject: [PATCH] content: refactored internal state Previously 'packIndexBuilder' contained both contents that have been written to packs and the ones that have not. This change makes it so that 'packIndexBuilder' only contains contents from flushed packs, but not pending ones. It will help parallelize writes later. --- repo/content/content_manager.go | 40 +++++++++-------------- repo/content/content_manager_iterate.go | 13 +++++--- repo/content/content_manager_lock_free.go | 9 +++++ repo/content/content_manager_test.go | 26 ++++++++++----- 4 files changed, 52 insertions(+), 36 deletions(-) diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 3056262eb..507c349a2 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -8,7 +8,6 @@ "encoding/hex" "fmt" "os" - "reflect" "sync" "time" @@ -69,7 +68,7 @@ type Manager struct { locked bool pendingPacks map[blob.ID]*pendingPackInfo - packIndexBuilder packIndexBuilder // contents that are in index currently being built (current pack and all packs saved but not committed) + packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) disableIndexFlushCount int flushPackIndexesAfter time.Time // time when those indexes should be flushed @@ -94,28 +93,29 @@ func (bm *Manager) DeleteContent(contentID ID) error { log.Debugf("DeleteContent(%q)", contentID) - // We have this content in current pack index and it's already deleted there. 
+ // remove from all pending packs + for _, pp := range bm.pendingPacks { + if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted { + delete(pp.currentPackItems, contentID) + return nil + } + } + + // if found in committed index, add another entry that's marked for deletion if bi, ok := bm.packIndexBuilder[contentID]; ok { if !bi.Deleted { - if bi.PackBlobID == "" { - // added and never committed, just forget about it. - delete(bm.packIndexBuilder, contentID) - for _, pp := range bm.pendingPacks { - delete(pp.currentPackItems, contentID) - } - return nil - } - - // added and committed. + // we have this content in index and it's not deleted. bi2 := *bi bi2.Deleted = true bi2.TimestampSeconds = bm.timeNow().Unix() bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) } + + // we have this content in index and it already deleted - do nothing. return nil } - // We have this content in current pack index and it's already deleted there. + // see if the block existed before bi, err := bm.committedContents.getContent(contentID) if err != nil { return err @@ -130,6 +130,7 @@ func (bm *Manager) DeleteContent(contentID ID) error { bi2 := bi bi2.Deleted = true bi2.TimestampSeconds = bm.timeNow().Unix() + bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) return nil } @@ -137,7 +138,6 @@ func (bm *Manager) DeleteContent(contentID ID) error { //nolint:gocritic // We're intentionally passing "i" by value func (bm *Manager) setPendingContent(pp *pendingPackInfo, i Info) { - bm.packIndexBuilder.Add(i) pp.currentPackItems[i.ID] = i } @@ -226,9 +226,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() { bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k) bm.assertInvariant(cpi.Deleted || cpi.PackBlobID == "", "content ID entry has unexpected pack content ID %v: %v", cpi.ID, cpi.PackBlobID) bm.assertInvariant(cpi.TimestampSeconds != 0, "content has 
no timestamp: %v", cpi.ID) - bi, ok := bm.packIndexBuilder[k] - bm.assertInvariant(ok, "content ID entry not present in pack index builder: %v", cpi.ID) - bm.assertInvariant(reflect.DeepEqual(*bi, cpi), "current pack index does not match pack index builder: %v", cpi, *bi) } } } @@ -236,10 +233,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() { func (bm *Manager) verifyPackIndexBuilderLocked() { for k, cpi := range bm.packIndexBuilder { bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k) - if _, ok := bm.findContentInPendingPacks(cpi.ID); ok { - // ignore contents also in current packs - continue - } if cpi.Deleted { bm.assertInvariant(cpi.PackBlobID == "", "content can't be both deleted and have a pack content: %v", cpi.ID) } else { @@ -328,15 +321,14 @@ func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pen if err := bm.writePackFileNotLocked(ctx, packFile, contentData); err != nil { return errors.Wrap(err, "can't save pack data content") } + formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData)) } - formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData)) for _, info := range packFileIndex { bm.packIndexBuilder.Add(*info) } delete(bm.pendingPacks, prefix) - return nil } diff --git a/repo/content/content_manager_iterate.go b/repo/content/content_manager_iterate.go index 6bed608f4..ec6775362 100644 --- a/repo/content/content_manager_iterate.go +++ b/repo/content/content_manager_iterate.go @@ -77,7 +77,12 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter // and possibly including deleted items. 
func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error { bm.lock() - pibClone := bm.packIndexBuilder.clone() + overlay := bm.packIndexBuilder.clone() + for _, pp := range bm.pendingPacks { + for _, pi := range pp.currentPackItems { + overlay.Add(pi) + } + } bm.unlock() callback, cleanup := maybeParallelExecutor(opts.Parallel, callback) @@ -85,7 +90,7 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback invokeCallback := func(i Info) error { if !opts.IncludeDeleted { - if ci, ok := pibClone[i.ID]; ok { + if ci, ok := overlay[i.ID]; ok { if ci.Deleted { return nil } @@ -100,12 +105,12 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback return callback(i) } - if len(pibClone) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { + if len(overlay) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { // fast path, invoke callback directly invokeCallback = callback } - for _, bi := range pibClone { + for _, bi := range overlay { _ = invokeCallback(*bi) } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index ad3679b66..72e5d43fd 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -249,11 +249,16 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi } packFileIndex := packIndexBuilder{} + haveContent := false for contentID, info := range pp.currentPackItems { if info.Payload == nil { + // no payload, it's a deletion of a previously-committed content. 
+ packFileIndex.Add(info) continue } + haveContent = true + var encrypted []byte encrypted, err = bm.maybeEncryptContentDataForPacking(info.Payload, info.ID) if err != nil { @@ -283,6 +288,10 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi return nil, nil, nil } + if !haveContent { + return nil, packFileIndex, nil + } + if bm.paddingUnit > 0 { if missing := bm.paddingUnit - (len(contentData) % bm.paddingUnit); missing > 0 { contentData, err = appendRandomBytes(contentData, missing) diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index b09b1c783..3e20e991e 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -389,15 +389,19 @@ func TestDeleteContent(t *testing.T) { content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) bm.Flush(ctx) content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100)) + log.Infof("xxx deleting.") if err := bm.DeleteContent(content1); err != nil { - t.Errorf("unable to delete content: %v", content1) + t.Fatalf("unable to delete content %v: %v", content1, err) } + log.Infof("yyy deleting.") if err := bm.DeleteContent(content2); err != nil { - t.Errorf("unable to delete content: %v", content1) + t.Fatalf("unable to delete content %v: %v", content2, err) } verifyContentNotFound(ctx, t, bm, content1) verifyContentNotFound(ctx, t, bm, content2) + log.Infof("flushing") bm.Flush(ctx) + log.Infof("flushed") log.Debugf("-----------") bm = newTestContentManager(data, keyTime, nil) verifyContentNotFound(ctx, t, bm, content1) @@ -620,7 +624,7 @@ func TestIterateContents(t *testing.T) { // pending, deleted - is completely discarded contentID4 := writeContentAndVerify(ctx, t, bm, seededRandomData(13, 100)) if err := bm.DeleteContent(contentID4); err != nil { - t.Errorf("error deleting content 4 %v", err) + t.Fatalf("error deleting content 4 %v", err) } t.Logf("contentID1: %v", contentID1) t.Logf("contentID2: 
%v", contentID2) @@ -738,9 +742,12 @@ func TestFindUnreferencedBlobs(t *testing.T) { bm := newTestContentManager(data, keyTime, nil) verifyUnreferencedBlobsCount(ctx, t, bm, 0) contentID := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) + log.Infof("flushing") if err := bm.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } + dumpContents(t, bm, "after flush #1") + dumpContentManagerData(t, data) verifyUnreferencedBlobsCount(ctx, t, bm, 0) if err := bm.DeleteContent(contentID); err != nil { t.Errorf("error deleting content: %v", contentID) @@ -749,6 +756,8 @@ func TestFindUnreferencedBlobs(t *testing.T) { t.Errorf("flush error: %v", err) } + dumpContents(t, bm, "after flush #2") + dumpContentManagerData(t, data) // content still present in first pack verifyUnreferencedBlobsCount(ctx, t, bm, 0) @@ -793,7 +802,7 @@ func TestFindUnreferencedBlobs2(t *testing.T) { func dumpContents(t *testing.T, bm *Manager, caption string) { t.Helper() count := 0 - log.Infof("finished dumping %v contents", caption) + log.Infof("dumping %v contents", caption) if err := bm.IterateContents(IterateOptions{IncludeDeleted: true}, func(ci Info) error { log.Debugf(" ci[%v]=%#v", count, ci) @@ -1056,19 +1065,20 @@ func hashValue(b []byte) string { func dumpContentManagerData(t *testing.T, data blobtesting.DataMap) { t.Helper() + log.Infof("***data - %v items", len(data)) for k, v := range data { if k[0] == 'n' { ndx, err := openPackIndex(bytes.NewReader(v)) if err == nil { - t.Logf("index %v (%v bytes)", k, len(v)) + log.Infof("index %v (%v bytes)", k, len(v)) assertNoError(t, ndx.Iterate("", func(i Info) error { - t.Logf(" %+v\n", i) + log.Infof(" %+v\n", i) return nil })) - } } else { - t.Logf("data %v (%v bytes)\n", k, len(v)) + log.Infof("non-index %v (%v bytes)\n", k, len(v)) } } + log.Infof("*** end of data") }