diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 507c349a2..7b3edace1 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -78,6 +78,7 @@ type Manager struct { } type pendingPackInfo struct { + prefix blob.ID currentPackItems map[ID]Info // contents that are in the pack content currently being built (all inline) currentPackDataLength int // total length of all items in the current pack content } @@ -105,10 +106,7 @@ func (bm *Manager) DeleteContent(contentID ID) error { if bi, ok := bm.packIndexBuilder[contentID]; ok { if !bi.Deleted { // we have this content in index and it's not deleted. - bi2 := *bi - bi2.Deleted = true - bi2.TimestampSeconds = bm.timeNow().Unix() - bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) + bm.deletePreexistingContent(*bi) } // we have this content in index and it already deleted - do nothing. @@ -126,19 +124,17 @@ func (bm *Manager) DeleteContent(contentID ID) error { return nil } - // object present but not deleted, mark for deletion and add to pending - bi2 := bi - bi2.Deleted = true - bi2.TimestampSeconds = bm.timeNow().Unix() - - bm.setPendingContent(bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(contentID)), bi2) + bm.deletePreexistingContent(bi) return nil } -//nolint:gocritic -// We're intentionally passing "i" by value -func (bm *Manager) setPendingContent(pp *pendingPackInfo, i Info) { - pp.currentPackItems[i.ID] = i +// Intentionally passing bi by value. +// nolint:hugeParam +func (bm *Manager) deletePreexistingContent(ci Info) { + pp := bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(ci.ID)) + ci.Deleted = true + ci.TimestampSeconds = bm.timeNow().Unix() + pp.currentPackItems[ci.ID] = ci } func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error { @@ -149,16 +145,20 @@ func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byt data = cloneBytes(data) pp.currentPackDataLength += len(data) - bm.setPendingContent(pp, Info{ + pp.currentPackItems[contentID] = Info{ Deleted: isDeleted, ID: contentID, Payload: data, Length: uint32(len(data)), TimestampSeconds: bm.timeNow().Unix(), - }) + } if pp.currentPackDataLength >= bm.maxPackSize { - if err := bm.finishPackAndMaybeFlushIndexesLocked(ctx, prefix, pp); err != nil { + if err := bm.finishPackLocked(ctx, pp); err != nil { + return errors.Wrap(err, "unable to finish pack") + } + + if err := bm.maybeFlushIndexesLocked(ctx); err != nil { return err } } @@ -166,13 +166,9 @@ func (bm *Manager) addToPackLocked(ctx context.Context, contentID ID, data []byt return nil } -func (bm *Manager) finishPackAndMaybeFlushIndexesLocked(ctx context.Context, prefix blob.ID, pp *pendingPackInfo) error { +func (bm *Manager) maybeFlushIndexesLocked(ctx context.Context) error { bm.assertLocked() - if err := bm.finishPackLocked(ctx, prefix, pp); err != nil { - return errors.Wrap(err, "unable to finish pack") - } - if bm.timeNow().After(bm.flushPackIndexesAfter) { if err := bm.finishAllPacksLocked(ctx); err != nil { return errors.Wrap(err, "finish all packs") @@ -289,13 +285,13 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { } func (bm *Manager) finishAllPacksLocked(ctx context.Context) error { - for prefix, pp := range bm.pendingPacks { + for _, pp := range bm.pendingPacks { if len(pp.currentPackItems) == 0 { log.Debugf("no current pack entries") continue } - if err := bm.finishPackLocked(ctx, prefix, pp); err != nil { + if err := bm.finishPackLocked(ctx, pp); err != nil { return errors.Wrap(err, "error writing pack content") } } @@ -303,7 +299,7 @@ func (bm *Manager) finishAllPacksLocked(ctx context.Context) error { return nil } -func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pendingPackInfo) error { +func (bm *Manager) finishPackLocked(ctx context.Context, pp *pendingPackInfo) error { bm.assertLocked() contentID := make([]byte, 16) @@ -311,7 +307,7 @@ func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pen return errors.Wrap(err, "unable to read crypto bytes") } - packFile := blob.ID(fmt.Sprintf("%v%x", prefix, contentID)) + packFile := blob.ID(fmt.Sprintf("%v%x", pp.prefix, contentID)) contentData, packFileIndex, err := bm.preparePackDataContent(ctx, pp, packFile) if err != nil { return errors.Wrap(err, "error preparing data content") @@ -328,7 +324,7 @@ func (bm *Manager) finishPackLocked(ctx context.Context, prefix blob.ID, pp *pen bm.packIndexBuilder.Add(*info) } - delete(bm.pendingPacks, prefix) + delete(bm.pendingPacks, pp.prefix) return nil } @@ -383,6 +379,7 @@ func packPrefixForContentID(contentID ID) blob.ID { func (bm *Manager) getOrCreatePendingPackInfoLocked(prefix blob.ID) *pendingPackInfo { if bm.pendingPacks[prefix] == nil { bm.pendingPacks[prefix] = &pendingPackInfo{ + prefix: prefix, currentPackItems: map[ID]Info{}, } } @@ -430,9 +427,12 @@ func (bm *Manager) getContentInfo(contentID ID) (Info, error) { bm.lock() defer bm.unlock() - // check added contents, not written to any packs. - if bi, ok := bm.findContentInPendingPacks(contentID); ok { - return bi, nil + // check added contents, not written to any packs yet. + for _, pp := range bm.pendingPacks { + bi, ok := pp.currentPackItems[contentID] + if ok { + return bi, nil + } } // added contents, written to packs but not yet added to indexes @@ -444,17 +444,6 @@ func (bm *Manager) getContentInfo(contentID ID) (Info, error) { return bm.committedContents.getContent(contentID) } -func (bm *Manager) findContentInPendingPacks(contentID ID) (Info, bool) { - for _, pp := range bm.pendingPacks { - bi, ok := pp.currentPackItems[contentID] - if ok { - return bi, true - } - } - - return Info{}, false -} - // ContentInfo returns information about a single content. func (bm *Manager) ContentInfo(ctx context.Context, contentID ID) (Info, error) { bi, err := bm.getContentInfo(contentID)