diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 3056262eb..507c349a2 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -8,7 +8,6 @@ "encoding/hex" "fmt" "os" - "reflect" "sync" "time" @@ -69,7 +68,7 @@ type Manager struct { locked bool pendingPacks map[blob.ID]*pendingPackInfo - packIndexBuilder packIndexBuilder // contents that are in index currently being built (current pack and all packs saved but not committed) + packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed) disableIndexFlushCount int flushPackIndexesAfter time.Time // time when those indexes should be flushed @@ -94,28 +93,29 @@ func (bm *Manager) DeleteContent(contentID ID) error { log.Debugf("DeleteContent(%q)", contentID) - // We have this content in current pack index and it's already deleted there. + // remove from all pending packs + for _, pp := range bm.pendingPacks { + if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted { + delete(pp.currentPackItems, contentID) + return nil + } + } + + // if found in committed index, add another entry that's marked for deletion if bi, ok := bm.packIndexBuilder[contentID]; ok { if !bi.Deleted { - if bi.PackBlobID == "" { - // added and never committed, just forget about it. - delete(bm.packIndexBuilder, contentID) - for _, pp := range bm.pendingPacks { - delete(pp.currentPackItems, contentID) - } - return nil - } - - // added and committed. + // we have this content in index and it's not deleted. bi2 := *bi bi2.Deleted = true bi2.TimestampSeconds = bm.timeNow().Unix() bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) } + + // we have this content in index and it already deleted - do nothing. return nil } - // We have this content in current pack index and it's already deleted there. + // see if the block existed before bi, err := bm.committedContents.getContent(contentID) if err != nil { return err @@ -130,6 +130,7 @@ func (bm *Manager) DeleteContent(contentID ID) error { bi2 := bi bi2.Deleted = true bi2.TimestampSeconds = bm.timeNow().Unix() + bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) return nil } @@ -137,7 +138,6 @@ func (bm *Manager) DeleteContent(contentID ID) error { //nolint:gocritic // We're intentionally passing "i" by value func (bm *Manager) setPendingContent(pp *pendingPackInfo, i Info) { - bm.packIndexBuilder.Add(i) pp.currentPackItems[i.ID] = i } @@ -226,9 +226,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() { bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k) bm.assertInvariant(cpi.Deleted || cpi.PackBlobID == "", "content ID entry has unexpected pack content ID %v: %v", cpi.ID, cpi.PackBlobID) bm.assertInvariant(cpi.TimestampSeconds != 0, "content has no timestamp: %v", cpi.ID) - bi, ok := bm.packIndexBuilder[k] - bm.assertInvariant(ok, "content ID entry not present in pack index builder: %v", cpi.ID) - bm.assertInvariant(reflect.DeepEqual(*bi, cpi), "current pack index does not match pack index builder: %v", cpi, *bi) } } } @@ -236,10 +233,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() { func (bm *Manager) verifyPackIndexBuilderLocked() { for k, cpi := range bm.packIndexBuilder { bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k) - if _, ok := bm.findContentInPendingPacks(cpi.ID); ok { - // ignore contents also in current packs - continue - } if cpi.Deleted { bm.assertInvariant(cpi.PackBlobID == "", "content can't be both deleted and have a pack content: %v", cpi.ID) } else { @@ -328,15 +321,14 @@ func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pen if err := bm.writePackFileNotLocked(ctx, packFile, contentData); err != nil { return errors.Wrap(err, "can't save pack data content") } + formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData)) } - formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData)) for _, info := range packFileIndex { bm.packIndexBuilder.Add(*info) } delete(bm.pendingPacks, prefix) - return nil } diff --git a/repo/content/content_manager_iterate.go b/repo/content/content_manager_iterate.go index 6bed608f4..ec6775362 100644 --- a/repo/content/content_manager_iterate.go +++ b/repo/content/content_manager_iterate.go @@ -77,7 +77,12 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter // and possibly including deleted items. func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error { bm.lock() - pibClone := bm.packIndexBuilder.clone() + overlay := bm.packIndexBuilder.clone() + for _, pp := range bm.pendingPacks { + for _, pi := range pp.currentPackItems { + overlay.Add(pi) + } + } bm.unlock() callback, cleanup := maybeParallelExecutor(opts.Parallel, callback) @@ -85,7 +90,7 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback invokeCallback := func(i Info) error { if !opts.IncludeDeleted { - if ci, ok := pibClone[i.ID]; ok { + if ci, ok := overlay[i.ID]; ok { if ci.Deleted { return nil } @@ -100,12 +105,12 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback return callback(i) } - if len(pibClone) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { + if len(overlay) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 { // fast path, invoke callback directly invokeCallback = callback } - for _, bi := range pibClone { + for _, bi := range overlay { _ = invokeCallback(*bi) } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index ad3679b66..72e5d43fd 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -249,11 +249,16 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi } packFileIndex := packIndexBuilder{} + haveContent := false for contentID, info := range pp.currentPackItems { if info.Payload == nil { + // no payload, it's a deletion of a previously-committed content. + packFileIndex.Add(info) continue } + haveContent = true + var encrypted []byte encrypted, err = bm.maybeEncryptContentDataForPacking(info.Payload, info.ID) if err != nil { @@ -283,6 +288,10 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi return nil, nil, nil } + if !haveContent { + return nil, packFileIndex, nil + } + if bm.paddingUnit > 0 { if missing := bm.paddingUnit - (len(contentData) % bm.paddingUnit); missing > 0 { contentData, err = appendRandomBytes(contentData, missing) diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index b09b1c783..3e20e991e 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -389,15 +389,19 @@ func TestDeleteContent(t *testing.T) { content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) bm.Flush(ctx) content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100)) + log.Infof("xxx deleting.") if err := bm.DeleteContent(content1); err != nil { - t.Errorf("unable to delete content: %v", content1) + t.Fatalf("unable to delete content %v: %v", content1, err) } + log.Infof("yyy deleting.") if err := bm.DeleteContent(content2); err != nil { - t.Errorf("unable to delete content: %v", content1) + t.Fatalf("unable to delete content %v: %v", content2, err) } verifyContentNotFound(ctx, t, bm, content1) verifyContentNotFound(ctx, t, bm, content2) + log.Infof("flushing") bm.Flush(ctx) + log.Infof("flushed") log.Debugf("-----------") bm = newTestContentManager(data, keyTime, nil) verifyContentNotFound(ctx, t, bm, content1) @@ -620,7 +624,7 @@ func TestIterateContents(t *testing.T) { // pending, deleted - is completely discarded contentID4 := writeContentAndVerify(ctx, t, bm, seededRandomData(13, 100)) if err := bm.DeleteContent(contentID4); err != nil { - t.Errorf("error deleting content 4 %v", err) + t.Fatalf("error deleting content 4 %v", err) } t.Logf("contentID1: %v", contentID1) t.Logf("contentID2: %v", contentID2) @@ -738,9 +742,12 @@ func TestFindUnreferencedBlobs(t *testing.T) { bm := newTestContentManager(data, keyTime, nil) verifyUnreferencedBlobsCount(ctx, t, bm, 0) contentID := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) + log.Infof("flushing") if err := bm.Flush(ctx); err != nil { t.Errorf("flush error: %v", err) } + dumpContents(t, bm, "after flush #1") + dumpContentManagerData(t, data) verifyUnreferencedBlobsCount(ctx, t, bm, 0) if err := bm.DeleteContent(contentID); err != nil { t.Errorf("error deleting content: %v", contentID) @@ -749,6 +756,8 @@ func TestFindUnreferencedBlobs(t *testing.T) { t.Errorf("flush error: %v", err) } + dumpContents(t, bm, "after flush #2") + dumpContentManagerData(t, data) // content still present in first pack verifyUnreferencedBlobsCount(ctx, t, bm, 0) @@ -793,7 +802,7 @@ func TestFindUnreferencedBlobs2(t *testing.T) { func dumpContents(t *testing.T, bm *Manager, caption string) { t.Helper() count := 0 - log.Infof("finished dumping %v contents", caption) + log.Infof("dumping %v contents", caption) if err := bm.IterateContents(IterateOptions{IncludeDeleted: true}, func(ci Info) error { log.Debugf(" ci[%v]=%#v", count, ci) @@ -1056,19 +1065,20 @@ func hashValue(b []byte) string { func dumpContentManagerData(t *testing.T, data blobtesting.DataMap) { t.Helper() + log.Infof("***data - %v items", len(data)) for k, v := range data { if k[0] == 'n' { ndx, err := openPackIndex(bytes.NewReader(v)) if err == nil { - t.Logf("index %v (%v bytes)", k, len(v)) + log.Infof("index %v (%v bytes)", k, len(v)) assertNoError(t, ndx.Iterate("", func(i Info) error { - t.Logf(" %+v\n", i) + log.Infof(" %+v\n", i) return nil })) - } } else { - t.Logf("data %v (%v bytes)\n", k, len(v)) + log.Infof("non-index %v (%v bytes)\n", k, len(v)) } } + log.Infof("*** end of data") }