added auto-compaction of block indexes

This commit is contained in:
Jarek Kowalski
2018-03-04 18:46:51 -08:00
parent 772ef9a286
commit 14bda2aefc

View File

@@ -30,6 +30,9 @@
defaultMaxPreambleLength = 32
defaultPaddingUnit = 4096
maxInlineContentLength = 100000 // amount of block data to store in the index block itself
autoCompactionBlockCount = 16
autoCompactionSafetyMargin = 5 * time.Minute // do not auto-compact if time since block was written is less than this
)
var zeroTime time.Time
@@ -58,7 +61,6 @@ type Manager struct {
mu sync.Mutex
locked bool
indexLoaded bool
blockIDToIndex map[string]*blockmgrpb.Index
packBlockIDToIndex map[string]*blockmgrpb.Index
pendingPackIndexes []*blockmgrpb.Index
@@ -86,10 +88,6 @@ func (bm *Manager) DeleteBlock(blockID string) error {
bm.lock()
defer bm.unlock()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return fmt.Errorf("can't load pack index: %v", err)
}
delete(bm.blockIDToIndex, blockID)
delete(bm.currentPackIndex.Items, blockID)
delete(bm.currentPackIndex.InlineItems, blockID)
@@ -118,10 +116,6 @@ func (bm *Manager) addToIndexLocked(blockID string, ndx *blockmgrpb.Index, packe
func (bm *Manager) addToPackLocked(blockID string, data []byte, force bool) error {
bm.assertLocked()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return fmt.Errorf("can't load pack index: %v", err)
}
if !force {
// See if we already have this block ID in the pack.
if _, ok := bm.blockIDToIndex[blockID]; ok {
@@ -414,22 +408,26 @@ func (bm *Manager) loadMergedPackIndexLocked() ([]*blockmgrpb.Index, []string, t
return merged, blockIDs, blocks[len(blocks)-1].Timestamp, nil
}
func (bm *Manager) ensurePackIndexesLoaded() error {
bm.assertLocked()
func (bm *Manager) initializeIndexes() error {
bm.lock()
defer bm.unlock()
if bm.indexLoaded {
return nil
}
merged, _, _, err := bm.loadMergedPackIndexLocked()
merged, blockIDs, latestBlockTime, err := bm.loadMergedPackIndexLocked()
if err != nil {
return err
}
log.Debug().Msgf("loaded %v index blocks with latest time %v", len(blockIDs), latestBlockTime.Local())
bm.indexLoaded = true
bm.blockIDToIndex, bm.packBlockIDToIndex = dedupeBlockIDsAndIndex(merged)
totalBlocks := len(bm.blockIDToIndex)
if len(blockIDs) >= autoCompactionBlockCount && latestBlockTime.Before(time.Now().Add(-autoCompactionSafetyMargin)) {
log.Debug().Msgf("auto compacting block indexes (block count %v exceeds threshold of %v)", len(blockIDs), autoCompactionBlockCount)
merged = removeEmptyIndexes(merged)
if _, err := bm.writePackIndexes(merged, &latestBlockTime); err != nil {
return err
}
}
totalBlocks := len(bm.blockIDToIndex)
log.Debug().Int("blocks", totalBlocks).Msgf("loaded indexes")
return nil
@@ -505,10 +503,6 @@ func (bm *Manager) ListBlocks(prefix string) ([]Info, error) {
var result []Info
if err := bm.ensurePackIndexesLoaded(); err != nil {
return nil, fmt.Errorf("can't load pack index: %v", err)
}
for b, ndx := range bm.blockIDToIndex {
if !strings.HasPrefix(b, prefix) {
continue
@@ -592,7 +586,6 @@ func (bm *Manager) IsStorageBlockInUse(storageBlockID string) bool {
bm.lock()
defer bm.unlock()
bm.ensurePackIndexesLoaded()
return bm.packBlockIDToIndex[storageBlockID] != nil
}
@@ -601,10 +594,6 @@ func (bm *Manager) Repackage(maxLength uint64) error {
bm.lock()
defer bm.unlock()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return err
}
merged, _, _, err := bm.loadMergedPackIndexLocked()
if err != nil {
return err
@@ -731,10 +720,6 @@ func (bm *Manager) GetBlock(blockID string) ([]byte, error) {
bm.lock()
defer bm.unlock()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return nil, fmt.Errorf("can't load pack index: %v", err)
}
if b, err := bm.getPendingBlockLocked(blockID); err == nil {
return b, nil
}
@@ -747,10 +732,6 @@ func (bm *Manager) BlockInfo(blockID string) (Info, error) {
bm.lock()
defer bm.unlock()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return Info{}, fmt.Errorf("can't load pack index: %v", err)
}
return bm.blockInfoLocked(blockID)
}
@@ -971,6 +952,10 @@ func newManagerWithTime(st storage.Storage, f FormattingOptions, caching Caching
m.startPackIndexLocked()
if err := m.initializeIndexes(); err != nil {
return nil, fmt.Errorf("unable initialize indexes: %v", err)
}
return m, nil
}
func getCompactedTimestamp(blk string) (time.Time, bool) {