diff --git a/block/block_manager.go b/block/block_manager.go index 167aa5188..66c5537d2 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -30,6 +30,9 @@ defaultMaxPreambleLength = 32 defaultPaddingUnit = 4096 maxInlineContentLength = 100000 // amount of block data to store in the index block itself + + autoCompactionBlockCount = 16 + autoCompactionSafetyMargin = 5 * time.Minute // do not auto-compact if time since block was written is less than this ) var zeroTime time.Time @@ -58,7 +61,6 @@ type Manager struct { mu sync.Mutex locked bool - indexLoaded bool blockIDToIndex map[string]*blockmgrpb.Index packBlockIDToIndex map[string]*blockmgrpb.Index pendingPackIndexes []*blockmgrpb.Index @@ -86,10 +88,6 @@ func (bm *Manager) DeleteBlock(blockID string) error { bm.lock() defer bm.unlock() - if err := bm.ensurePackIndexesLoaded(); err != nil { - return fmt.Errorf("can't load pack index: %v", err) - } - delete(bm.blockIDToIndex, blockID) delete(bm.currentPackIndex.Items, blockID) delete(bm.currentPackIndex.InlineItems, blockID) @@ -118,10 +116,6 @@ func (bm *Manager) addToIndexLocked(blockID string, ndx *blockmgrpb.Index, packe func (bm *Manager) addToPackLocked(blockID string, data []byte, force bool) error { bm.assertLocked() - if err := bm.ensurePackIndexesLoaded(); err != nil { - return fmt.Errorf("can't load pack index: %v", err) - } - if !force { // See if we already have this block ID in the pack. if _, ok := bm.blockIDToIndex[blockID]; ok { @@ -414,22 +408,26 @@ func (bm *Manager) loadMergedPackIndexLocked() ([]*blockmgrpb.Index, []string, t return merged, blockIDs, blocks[len(blocks)-1].Timestamp, nil } -func (bm *Manager) ensurePackIndexesLoaded() error { - bm.assertLocked() +func (bm *Manager) initializeIndexes() error { + bm.lock() + defer bm.unlock() - if bm.indexLoaded { - return nil - } - - merged, _, _, err := bm.loadMergedPackIndexLocked() + merged, blockIDs, latestBlockTime, err := bm.loadMergedPackIndexLocked() if err != nil { return err } + log.Debug().Msgf("loaded %v index blocks with latest time %v", len(blockIDs), latestBlockTime.Local()) - bm.indexLoaded = true bm.blockIDToIndex, bm.packBlockIDToIndex = dedupeBlockIDsAndIndex(merged) - totalBlocks := len(bm.blockIDToIndex) + if len(blockIDs) >= autoCompactionBlockCount && latestBlockTime.Before(time.Now().Add(-autoCompactionSafetyMargin)) { + log.Debug().Msgf("auto compacting block indexes (block count %v exceeds threshold of %v)", len(blockIDs), autoCompactionBlockCount) + merged = removeEmptyIndexes(merged) + if _, err := bm.writePackIndexes(merged, &latestBlockTime); err != nil { + return err + } + } + totalBlocks := len(bm.blockIDToIndex) log.Debug().Int("blocks", totalBlocks).Msgf("loaded indexes") return nil @@ -505,10 +503,6 @@ func (bm *Manager) ListBlocks(prefix string) ([]Info, error) { var result []Info - if err := bm.ensurePackIndexesLoaded(); err != nil { - return nil, fmt.Errorf("can't load pack index: %v", err) - } - for b, ndx := range bm.blockIDToIndex { if !strings.HasPrefix(b, prefix) { continue @@ -592,7 +586,6 @@ func (bm *Manager) IsStorageBlockInUse(storageBlockID string) bool { bm.lock() defer bm.unlock() - bm.ensurePackIndexesLoaded() return bm.packBlockIDToIndex[storageBlockID] != nil } @@ -601,10 +594,6 @@ func (bm *Manager) Repackage(maxLength uint64) error { bm.lock() defer bm.unlock() - if err := bm.ensurePackIndexesLoaded(); err != nil { - return err - } - merged, _, _, err := bm.loadMergedPackIndexLocked() if err != nil { return err @@ -731,10 +720,6 @@ func (bm *Manager) GetBlock(blockID string) ([]byte, error) { bm.lock() defer bm.unlock() - if err := bm.ensurePackIndexesLoaded(); err != nil { - return nil, fmt.Errorf("can't load pack index: %v", err) - } - if b, err := bm.getPendingBlockLocked(blockID); err == nil { return b, nil } @@ -747,10 +732,6 @@ func (bm *Manager) BlockInfo(blockID string) (Info, error) { bm.lock() defer bm.unlock() - if err := bm.ensurePackIndexesLoaded(); err != nil { - return Info{}, fmt.Errorf("can't load pack index: %v", err) - } - return bm.blockInfoLocked(blockID) } @@ -971,6 +952,10 @@ func newManagerWithTime(st storage.Storage, f FormattingOptions, caching Caching m.startPackIndexLocked() + if err := m.initializeIndexes(); err != nil { + return nil, fmt.Errorf("unable initialize indexes: %v", err) + } + return m, nil } func getCompactedTimestamp(blk string) (time.Time, bool) {