content: remove lock while writing content

This commit is contained in:
Jarek Kowalski
2019-08-27 23:05:28 -07:00
parent ca51fdec0d
commit 2a6240ba3e
3 changed files with 95 additions and 73 deletions

View File

@@ -68,7 +68,8 @@ type Manager struct {
locked bool
pendingPacks map[blob.ID]*pendingPackInfo
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)
writingPacks []*pendingPackInfo // list of packs that are being written
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)
disableIndexFlushCount int
flushPackIndexesAfter time.Time // time when those indexes should be flushed
@@ -102,14 +103,17 @@ func (bm *Manager) DeleteContent(contentID ID) error {
}
}
// remove from all packs that are being written, since they will be committed to index soon
for _, pp := range bm.writingPacks {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted {
bm.deletePreexistingContent(bi)
return nil
}
}
// if found in committed index, add another entry that's marked for deletion
if bi, ok := bm.packIndexBuilder[contentID]; ok {
if !bi.Deleted {
// we have this content in index and it's not deleted.
bm.deletePreexistingContent(*bi)
}
// we have this content in index and it already deleted - do nothing.
bm.deletePreexistingContent(*bi)
return nil
}
@@ -119,11 +123,6 @@ func (bm *Manager) DeleteContent(contentID ID) error {
return err
}
if bi.Deleted {
// already deleted
return nil
}
bm.deletePreexistingContent(bi)
return nil
}
@@ -131,18 +130,26 @@ func (bm *Manager) DeleteContent(contentID ID) error {
// Intentionally passing bi by value.
// nolint:hugeParam
func (bm *Manager) deletePreexistingContent(ci Info) {
if ci.Deleted {
return
}
pp := bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(ci.ID))
ci.Deleted = true
ci.TimestampSeconds = bm.timeNow().Unix()
pp.currentPackItems[ci.ID] = ci
}
func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error {
bm.assertLocked()
func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error {
prefix := packPrefixForContentID(contentID)
pp := bm.getOrCreatePendingPackInfoLocked(prefix)
bm.lock()
if bm.timeNow().After(bm.flushPackIndexesAfter) {
if err := bm.flushPackIndexesLocked(ctx); err != nil {
return err
}
}
pp := bm.getOrCreatePendingPackInfoLocked(prefix)
data = cloneBytes(data)
pp.currentPackDataLength += len(data)
pp.currentPackItems[contentID] = Info{
@@ -152,30 +159,20 @@ func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byt
Length: uint32(len(data)),
TimestampSeconds: bm.timeNow().Unix(),
}
if pp.currentPackDataLength >= bm.maxPackSize {
if err := bm.finishPackLocked(ctx, pp); err != nil {
return errors.Wrap(err, "unable to finish pack")
}
if err := bm.maybeFlushIndexesLocked(ctx); err != nil {
return err
}
shouldWrite := pp.currentPackDataLength >= bm.maxPackSize
if shouldWrite {
// we're about to write to storage without holding a lock
// remove from pendingPacks so other goroutine tries to mess with this pending pack.
delete(bm.pendingPacks, pp.prefix)
bm.writingPacks = append(bm.writingPacks, pp)
}
bm.unlock()
return nil
}
func (bm *Manager) maybeFlushIndexesLocked(ctx context.Context) error {
bm.assertLocked()
if bm.timeNow().After(bm.flushPackIndexesAfter) {
if err := bm.finishAllPacksLocked(ctx); err != nil {
return errors.Wrap(err, "finish all packs")
}
if err := bm.flushPackIndexesLocked(ctx); err != nil {
return err
// at this point we're unlocked so different goroutines can encrypt and
// save to storage in parallel.
if shouldWrite {
if err := bm.writePackAndAddToIndex(ctx, pp, false); err != nil {
return errors.Wrap(err, "unable to write pack")
}
}
@@ -285,13 +282,11 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error {
}
func (bm *Manager) finishAllPacksLocked(ctx context.Context) error {
for _, pp := range bm.pendingPacks {
if len(pp.currentPackItems) == 0 {
log.Debugf("no current pack entries")
continue
}
for prefix, pp := range bm.pendingPacks {
delete(bm.pendingPacks, prefix)
bm.writingPacks = append(bm.writingPacks, pp)
if err := bm.finishPackLocked(ctx, pp); err != nil {
if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil {
return errors.Wrap(err, "error writing pack content")
}
}
@@ -299,9 +294,7 @@ func (bm *Manager) finishAllPacksLocked(ctx context.Context) error {
return nil
}
func (bm *Manager) finishPackLocked(ctx context.Context, pp *pendingPackInfo) error {
bm.assertLocked()
func (bm *Manager) writePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo, lockHeld bool) error {
contentID := make([]byte, 16)
if _, err := cryptorand.Read(contentID); err != nil {
return errors.Wrap(err, "unable to read crypto bytes")
@@ -320,14 +313,32 @@ func (bm *Manager) finishPackLocked(ctx context.Context, pp *pendingPackInfo) er
formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData))
}
// after the file has been writte, add pack index builder entries to index.
if !lockHeld {
bm.lock()
}
bm.writingPacks = removePendingPack(bm.writingPacks, pp)
for _, info := range packFileIndex {
bm.packIndexBuilder.Add(*info)
}
delete(bm.pendingPacks, pp.prefix)
if !lockHeld {
bm.unlock()
}
return nil
}
func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pendingPackInfo {
result := slice[:0]
for _, p := range slice {
if p != pp {
result = append(result, p)
}
}
return result
}
// Close closes the content manager.
func (bm *Manager) Close() {
bm.contentCache.close()
@@ -347,7 +358,6 @@ func (bm *Manager) Flush(ctx context.Context) error {
if err := bm.flushPackIndexesLocked(ctx); err != nil {
return errors.Wrap(err, "error flushing indexes")
}
return nil
}
@@ -363,10 +373,7 @@ func (bm *Manager) RewriteContent(ctx context.Context, contentID ID) error {
return err
}
bm.lock()
defer bm.unlock()
return bm.addToPackLocked(ctx, contentID, data, bi.Deleted)
return bm.addToPackUnlocked(ctx, contentID, data, bi.Deleted)
}
func packPrefixForContentID(contentID ID) blob.ID {
@@ -402,10 +409,7 @@ func (bm *Manager) WriteContent(ctx context.Context, data []byte, prefix ID) (ID
}
}
log.Debugf("WriteContent(%q) - new", contentID)
bm.lock()
defer bm.unlock()
err := bm.addToPackLocked(ctx, contentID, data, false)
err := bm.addToPackUnlocked(ctx, contentID, data, false)
return contentID, err
}
@@ -429,15 +433,21 @@ func (bm *Manager) getContentInfo(contentID ID) (Info, error) {
// check added contents, not written to any packs yet.
for _, pp := range bm.pendingPacks {
bi, ok := pp.currentPackItems[contentID]
if ok {
return bi, nil
if ci, ok := pp.currentPackItems[contentID]; ok {
return ci, nil
}
}
// check contents being written to packs right now.
for _, pp := range bm.writingPacks {
if ci, ok := pp.currentPackItems[contentID]; ok {
return ci, nil
}
}
// added contents, written to packs but not yet added to indexes
if bi, ok := bm.packIndexBuilder[contentID]; ok {
return *bi, nil
if ci, ok := bm.packIndexBuilder[contentID]; ok {
return *ci, nil
}
// read from committed content index
@@ -477,9 +487,6 @@ func (bm *Manager) assertLocked() {
// Refresh reloads the committed content indexes.
func (bm *Manager) Refresh(ctx context.Context) (bool, error) {
bm.mu.Lock()
defer bm.mu.Unlock()
log.Debugf("Refresh started")
t0 := time.Now()
_, updated, err := bm.loadPackIndexesUnlocked(ctx)

View File

@@ -73,24 +73,36 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter
return callback, cleanup
}
// IterateContents invokes the provided callback for each content starting with a specified prefix
// and possibly including deleted items.
func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error {
func (bm *Manager) snapshotUncommittedItems() packIndexBuilder {
bm.lock()
defer bm.unlock()
overlay := bm.packIndexBuilder.clone()
for _, pp := range bm.pendingPacks {
for _, pi := range pp.currentPackItems {
overlay.Add(pi)
}
}
bm.unlock()
for _, pp := range bm.writingPacks {
for _, pi := range pp.currentPackItems {
overlay.Add(pi)
}
}
return overlay
}
// IterateContents invokes the provided callback for each content starting with a specified prefix
// and possibly including deleted items.
func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback) error {
callback, cleanup := maybeParallelExecutor(opts.Parallel, callback)
defer cleanup() //nolint:errcheck
uncommitted := bm.snapshotUncommittedItems()
invokeCallback := func(i Info) error {
if !opts.IncludeDeleted {
if ci, ok := overlay[i.ID]; ok {
if ci, ok := uncommitted[i.ID]; ok {
if ci.Deleted {
return nil
}
@@ -105,12 +117,12 @@ func (bm *Manager) IterateContents(opts IterateOptions, callback IterateCallback
return callback(i)
}
if len(overlay) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 {
if len(uncommitted) == 0 && opts.IncludeDeleted && opts.Prefix == "" && opts.Parallel <= 1 {
// fast path, invoke callback directly
invokeCallback = callback
}
for _, bi := range overlay {
for _, bi := range uncommitted {
_ = invokeCallback(*bi)
}

View File

@@ -387,7 +387,10 @@ func TestDeleteContent(t *testing.T) {
keyTime := map[blob.ID]time.Time{}
bm := newTestContentManager(data, keyTime, nil)
content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100))
bm.Flush(ctx)
if err := bm.Flush(ctx); err != nil {
t.Fatalf("error flushing: %v", err)
}
dumpContents(t, bm, "after first flush")
content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100))
log.Infof("xxx deleting.")
if err := bm.DeleteContent(content1); err != nil {