content: refactored internal state

Previously 'packIndexBuilder' contained both contents that have been
written to packs and the ones that have not.

This change makes it so that 'packIndexBuilder' only contains contents
from flushed packs, but non pending ones. It will help parallelize
writes later.
This commit is contained in:
Jarek Kowalski
2019-08-26 22:19:36 -07:00
parent e097e8be84
commit 439e19d5c8
4 changed files with 52 additions and 36 deletions

View File

@@ -8,7 +8,6 @@
"encoding/hex"
"fmt"
"os"
"reflect"
"sync"
"time"
@@ -69,7 +68,7 @@ type Manager struct {
locked bool
pendingPacks map[blob.ID]*pendingPackInfo
packIndexBuilder packIndexBuilder // contents that are in index currently being built (current pack and all packs saved but not committed)
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)
disableIndexFlushCount int
flushPackIndexesAfter time.Time // time when those indexes should be flushed
@@ -94,28 +93,29 @@ func (bm *Manager) DeleteContent(contentID ID) error {
log.Debugf("DeleteContent(%q)", contentID)
// We have this content in current pack index and it's already deleted there.
// remove from all pending packs
for _, pp := range bm.pendingPacks {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted {
delete(pp.currentPackItems, contentID)
return nil
}
}
// if found in committed index, add another entry that's marked for deletion
if bi, ok := bm.packIndexBuilder[contentID]; ok {
if !bi.Deleted {
if bi.PackBlobID == "" {
// added and never committed, just forget about it.
delete(bm.packIndexBuilder, contentID)
for _, pp := range bm.pendingPacks {
delete(pp.currentPackItems, contentID)
}
return nil
}
// added and committed.
// we have this content in index and it's not deleted.
bi2 := *bi
bi2.Deleted = true
bi2.TimestampSeconds = bm.timeNow().Unix()
bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2)
}
// we have this content in index and it already deleted - do nothing.
return nil
}
// We have this content in current pack index and it's already deleted there.
// see if the block existed before
bi, err := bm.committedContents.getContent(contentID)
if err != nil {
return err
@@ -130,6 +130,7 @@ func (bm *Manager) DeleteContent(contentID ID) error {
bi2 := bi
bi2.Deleted = true
bi2.TimestampSeconds = bm.timeNow().Unix()
bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2)
return nil
}
@@ -137,7 +138,6 @@ func (bm *Manager) DeleteContent(contentID ID) error {
//nolint:gocritic
// We're intentionally passing "i" by value
func (bm *Manager) setPendingContent(pp *pendingPackInfo, i Info) {
bm.packIndexBuilder.Add(i)
pp.currentPackItems[i.ID] = i
}
@@ -226,9 +226,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() {
bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k)
bm.assertInvariant(cpi.Deleted || cpi.PackBlobID == "", "content ID entry has unexpected pack content ID %v: %v", cpi.ID, cpi.PackBlobID)
bm.assertInvariant(cpi.TimestampSeconds != 0, "content has no timestamp: %v", cpi.ID)
bi, ok := bm.packIndexBuilder[k]
bm.assertInvariant(ok, "content ID entry not present in pack index builder: %v", cpi.ID)
bm.assertInvariant(reflect.DeepEqual(*bi, cpi), "current pack index does not match pack index builder: %v", cpi, *bi)
}
}
}
@@ -236,10 +233,6 @@ func (bm *Manager) verifyCurrentPackItemsLocked() {
func (bm *Manager) verifyPackIndexBuilderLocked() {
for k, cpi := range bm.packIndexBuilder {
bm.assertInvariant(cpi.ID == k, "content ID entry has invalid key: %v %v", cpi.ID, k)
if _, ok := bm.findContentInPendingPacks(cpi.ID); ok {
// ignore contents also in current packs
continue
}
if cpi.Deleted {
bm.assertInvariant(cpi.PackBlobID == "", "content can't be both deleted and have a pack content: %v", cpi.ID)
} else {
@@ -328,15 +321,14 @@ func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pen
if err := bm.writePackFileNotLocked(ctx, packFile, contentData); err != nil {
return errors.Wrap(err, "can't save pack data content")
}
formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData))
}
formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData))
for _, info := range packFileIndex {
bm.packIndexBuilder.Add(*info)
}
delete(bm.pendingPacks, prefix)
return nil
}

View File

@@ -77,7 +77,12 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter
// and possibly including deleted items.
func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error {
bm.lock()
pibClone := bm.packIndexBuilder.clone()
overlay := bm.packIndexBuilder.clone()
for _, pp := range bm.pendingPacks {
for _, pi := range pp.currentPackItems {
overlay.Add(pi)
}
}
bm.unlock()
callback, cleanup := maybeParallelExecutor(opts.Parallel, callback)
@@ -85,7 +90,7 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback
invokeCallback := func(i Info) error {
if !opts.IncludeDeleted {
if ci, ok := pibClone[i.ID]; ok {
if ci, ok := overlay[i.ID]; ok {
if ci.Deleted {
return nil
}
@@ -100,12 +105,12 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback
return callback(i)
}
if len(pibClone) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 {
if len(overlay) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 {
// fast path, invoke callback directly
invokeCallback = callback
}
for _, bi := range pibClone {
for _, bi := range overlay {
_ = invokeCallback(*bi)
}

View File

@@ -249,11 +249,16 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi
}
packFileIndex := packIndexBuilder{}
haveContent := false
for contentID, info := range pp.currentPackItems {
if info.Payload == nil {
// no payload, it's a deletion of a previously-committed content.
packFileIndex.Add(info)
continue
}
haveContent = true
var encrypted []byte
encrypted, err = bm.maybeEncryptContentDataForPacking(info.Payload, info.ID)
if err != nil {
@@ -283,6 +288,10 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi
return nil, nil, nil
}
if !haveContent {
return nil, packFileIndex, nil
}
if bm.paddingUnit > 0 {
if missing := bm.paddingUnit - (len(contentData) % bm.paddingUnit); missing > 0 {
contentData, err = appendRandomBytes(contentData, missing)

View File

@@ -389,15 +389,19 @@ func TestDeleteContent(t *testing.T) {
content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100))
bm.Flush(ctx)
content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100))
log.Infof("xxx deleting.")
if err := bm.DeleteContent(content1); err != nil {
t.Errorf("unable to delete content: %v", content1)
t.Fatalf("unable to delete content %v: %v", content1, err)
}
log.Infof("yyy deleting.")
if err := bm.DeleteContent(content2); err != nil {
t.Errorf("unable to delete content: %v", content1)
t.Fatalf("unable to delete content %v: %v", content2, err)
}
verifyContentNotFound(ctx, t, bm, content1)
verifyContentNotFound(ctx, t, bm, content2)
log.Infof("flushing")
bm.Flush(ctx)
log.Infof("flushed")
log.Debugf("-----------")
bm = newTestContentManager(data, keyTime, nil)
verifyContentNotFound(ctx, t, bm, content1)
@@ -620,7 +624,7 @@ func TestIterateContents(t *testing.T) {
// pending, deleted - is completely discarded
contentID4 := writeContentAndVerify(ctx, t, bm, seededRandomData(13, 100))
if err := bm.DeleteContent(contentID4); err != nil {
t.Errorf("error deleting content 4 %v", err)
t.Fatalf("error deleting content 4 %v", err)
}
t.Logf("contentID1: %v", contentID1)
t.Logf("contentID2: %v", contentID2)
@@ -738,9 +742,12 @@ func TestFindUnreferencedBlobs(t *testing.T) {
bm := newTestContentManager(data, keyTime, nil)
verifyUnreferencedBlobsCount(ctx, t, bm, 0)
contentID := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100))
log.Infof("flushing")
if err := bm.Flush(ctx); err != nil {
t.Errorf("flush error: %v", err)
}
dumpContents(t, bm, "after flush #1")
dumpContentManagerData(t, data)
verifyUnreferencedBlobsCount(ctx, t, bm, 0)
if err := bm.DeleteContent(contentID); err != nil {
t.Errorf("error deleting content: %v", contentID)
@@ -749,6 +756,8 @@ func TestFindUnreferencedBlobs(t *testing.T) {
t.Errorf("flush error: %v", err)
}
dumpContents(t, bm, "after flush #2")
dumpContentManagerData(t, data)
// content still present in first pack
verifyUnreferencedBlobsCount(ctx, t, bm, 0)
@@ -793,7 +802,7 @@ func TestFindUnreferencedBlobs2(t *testing.T) {
func dumpContents(t *testing.T, bm *Manager, caption string) {
t.Helper()
count := 0
log.Infof("finished dumping %v contents", caption)
log.Infof("dumping %v contents", caption)
if err := bm.IterateContents(IterateOptions{IncludeDeleted: true},
func(ci Info) error {
log.Debugf(" ci[%v]=%#v", count, ci)
@@ -1056,19 +1065,20 @@ func hashValue(b []byte) string {
func dumpContentManagerData(t *testing.T, data blobtesting.DataMap) {
t.Helper()
log.Infof("***data - %v items", len(data))
for k, v := range data {
if k[0] == 'n' {
ndx, err := openPackIndex(bytes.NewReader(v))
if err == nil {
t.Logf("index %v (%v bytes)", k, len(v))
log.Infof("index %v (%v bytes)", k, len(v))
assertNoError(t, ndx.Iterate("", func(i Info) error {
t.Logf(" %+v\n", i)
log.Infof(" %+v\n", i)
return nil
}))
}
} else {
t.Logf("data %v (%v bytes)\n", k, len(v))
log.Infof("non-index %v (%v bytes)\n", k, len(v))
}
}
log.Infof("*** end of data")
}