renamed 'repo optimize' to 'block reindex', added repacking method and unit tests

This commit is contained in:
Jarek Kowalski
2017-10-24 21:05:59 -07:00
parent 2aa44eb454
commit b4e6a70e09
5 changed files with 201 additions and 13 deletions

View File

@@ -125,17 +125,18 @@ func (bm *Manager) addToIndexLocked(groupID, blockID string, ndx *packIndex, os
m[blockID] = ndx
}
func (bm *Manager) addToPack(packGroup string, blockID string, data []byte) error {
bm.lock()
defer bm.unlock()
func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte, force bool) error {
bm.assertLocked()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return err
}
// See if we already have this block ID in the pack.
if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok {
return nil
if !force {
// See if we already have this block ID in the pack.
if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok {
return nil
}
}
g := bm.ensurePackGroupLocked(packGroup, false)
@@ -426,6 +427,64 @@ func removeEmptyIndexes(ndx packIndexes) packIndexes {
return res
}
// regroupPacksAndUnpacked consolidates all indexes belonging to the internal
// pack-objects and non-packed-objects groups into one merged index each,
// passes every other index through unchanged, and drops pack blocks that no
// remaining group index references.
func (bm *Manager) regroupPacksAndUnpacked(ndx packIndexes) packIndexes {
	mergedPacks := &packIndex{
		Items:      map[string]offsetAndSize{},
		PackGroup:  packObjectsPackGroup,
		CreateTime: bm.timeNow(),
	}
	mergedNonPacked := &packIndex{
		Items:      map[string]offsetAndSize{},
		PackGroup:  nonPackedObjectsPackGroup,
		CreateTime: bm.timeNow(),
	}

	referencedPackBlocks := map[string]bool{}
	var result packIndexes

	// Classify each incoming index: fold the two internal groups into the
	// merged indexes; keep all other indexes and record which pack blocks
	// they still point at.
	for _, in := range ndx {
		switch in.PackGroup {
		case packObjectsPackGroup:
			for id, loc := range in.Items {
				mergedPacks.Items[id] = loc
			}
		case nonPackedObjectsPackGroup:
			for id, loc := range in.Items {
				mergedNonPacked.Items[id] = loc
			}
		default:
			if in.PackBlockID != "" {
				referencedPackBlocks[in.PackBlockID] = true
			}
			result = append(result, in)
		}
	}

	// Discard pack blocks that no group index references anymore.
	// (Deleting map entries while ranging over the same map is safe in Go.)
	for id := range mergedPacks.Items {
		if !referencedPackBlocks[id] {
			delete(mergedPacks.Items, id)
		}
	}

	if len(mergedPacks.Items) > 0 {
		result = append(result, mergedPacks)
	}
	if len(mergedNonPacked.Items) > 0 {
		result = append(result, mergedNonPacked)
	}
	return result
}
// CompactIndexes performs compaction of index blocks and optionally removes index blocks not present in the provided set.
func (bm *Manager) CompactIndexes(cutoffTime time.Time, inUseBlocks map[string]bool) error {
bm.lock()
@@ -549,6 +608,7 @@ func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, inUseBl
}
merged = removeEmptyIndexes(merged)
merged = bm.regroupPacksAndUnpacked(merged)
if len(blockIDs) <= 1 && inUseBlocks == nil {
log.Printf("skipping index compaction - already compacted")
@@ -617,7 +677,10 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) {
if bm.maxPackedContentLength > 0 && len(data) <= bm.maxPackedContentLength {
blockID := bm.hashData(data)
err := bm.addToPack(groupID, blockID, data)
bm.lock()
defer bm.unlock()
err := bm.addToPackLocked(groupID, blockID, data, false)
return blockID, err
}
@@ -633,6 +696,62 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) {
return blockID, nil
}
// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size.
// Blocks newer than cutoffTime are not considered (only indexes up to the cutoff
// are merged). Re-added items are written with force=true so that existing index
// entries are overwritten rather than skipped.
func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Time) error {
bm.lock()
defer bm.unlock()
// The internal pack/non-pack groups are managed by compaction, not by callers.
if groupID == "" || groupID == nonPackedObjectsPackGroup || groupID == packObjectsPackGroup {
return fmt.Errorf("invalid group ID: %q", groupID)
}
if err := bm.ensurePackIndexesLoaded(); err != nil {
return err
}
merged, _, err := bm.loadMergedPackIndexLocked(cutoffTime)
if err != nil {
return err
}
// Select pack indexes from the requested group whose physical pack block
// is no bigger than maxLength; these are small enough to be worth merging.
var toRepackage []*packIndex
var totalBytes int64
for _, m := range merged {
if m.PackGroup == groupID && m.PackBlockID != "" {
bi, err := bm.blockInfoLocked(m.PackBlockID)
if err != nil {
return fmt.Errorf("unable to get info on block %q: %v", m.PackBlockID, err)
}
if bi.Length <= maxLength {
toRepackage = append(toRepackage, m)
totalBytes += bi.Length
}
}
}
log.Printf("%v blocks to re-package (%v total bytes)", len(toRepackage), totalBytes)
for _, m := range toRepackage {
// Fetch the whole pack block once, then slice out each contained item.
data, err := bm.getBlockInternalLocked(m.PackBlockID)
if err != nil {
return fmt.Errorf("can't fetch block %q for repackaging: %v", m.PackBlockID, err)
}
for blockID, os := range m.Items {
log.Printf("re-packaging: %v %v", blockID, os)
// NOTE(review): assumes every (offset,size) entry lies within the fetched
// pack data — out-of-range entries would panic here; verify index integrity upstream.
blockData := data[os.offset : os.offset+os.size]
// force=true re-adds the item to the current pack even though an index entry exists.
if err := bm.addToPackLocked(groupID, blockID, blockData, true); err != nil {
return fmt.Errorf("unable to re-package %q: %v", blockID, err)
}
}
}
return nil
}
func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, force bool) (string, error) {
blockID := prefix + bm.hashData(data)

View File

@@ -229,6 +229,64 @@ func TestBlockManagerPackIdentialToRawObject(t *testing.T) {
}
}
// TestBlockManagerRepack verifies that Repackage merges small pack blocks:
// below-threshold runs are no-ops, an above-threshold run writes a new
// combined pack, and a subsequent index compaction shrinks the block count.
func TestBlockManagerRepack(t *testing.T) {
data := map[string][]byte{}
bm := newTestBlockManager(data)
d1 := seededRandomData(1, 10)
d2 := seededRandomData(2, 20)
d3 := seededRandomData(3, 30)
writeBlockAndVerify(t, bm, "g1", d1)
bm.Flush()
writeBlockAndVerify(t, bm, "g1", d2)
bm.Flush()
writeBlockAndVerify(t, bm, "g1", d3)
bm.Flush()
// 3 data blocks, 3 index blocks.
if got, want := len(data), 6; got != want {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
}
// Threshold 5 is below the size of every pack, so nothing qualifies.
if err := bm.Repackage("g1", 5, fakeTime); err != nil {
t.Errorf("repackage failure: %v", err)
}
bm.Flush()
// nothing happened, still 3 data blocks, 3 index blocks.
if got, want := len(data), 6; got != want {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
}
// Advance the clock so the rewritten index is newer than the originals.
setFakeTime(bm, fakeTime.Add(1*time.Second))
if err := bm.Repackage("g1", 30, fakeTime); err != nil {
t.Errorf("repackage failure: %v", err)
}
bm.Flush()
log.Printf("after repackage")
dumpBlockManagerData(data)
// added one more data block + one more index block.
if got, want := len(data), 8; got != want {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
}
if err := bm.CompactIndexes(bm.timeNow(), nil); err != nil {
t.Errorf("compaction failure: %v", err)
}
log.Printf("after compaction")
dumpBlockManagerData(data)
// old 3 data blocks still there + 1 new one + 1 compacted index
if got, want := len(data), 5; got != want {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
dumpBlockManagerData(data)
}
// t.Error()
// dumpBlockManagerData(data)
}
func TestBlockManagerInternalFlush(t *testing.T) {
data := map[string][]byte{}
bm := newTestBlockManager(data)

View File

@@ -7,8 +7,8 @@
)
var (
optimizeCommand = repositoryCommands.Command("optimize", "Optimize repository performance.")
optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of objects to optimize").Default("24h").Duration()
optimizeCommand = blockCommands.Command("reindex", "Optimize block indexes.")
optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of blocks to re-index").Default("24h").Duration()
)
func runOptimizeCommand(context *kingpin.ParseContext) error {

View File

@@ -1,20 +1,25 @@
package cli
import (
"time"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
var (
blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones")
blockRepackSizeThreshold = blockRepackCommand.Flag("threshold", "Min size of block to re-pack").Default("100000").Int64()
blockRepackGroup = blockRepackCommand.Flag("group", "Group to repack").Default("DIR").String()
blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64()
blockRepackMinAge = blockRepackCommand.Flag("min-age", "Minimum age to repack").Default("24h").Duration()
)
// runBlockRepackAction repackages pack blocks in the configured group that
// are no bigger than the --max-size threshold, considering only blocks older
// than --min-age. It opens the repository, delegates to Blocks.Repackage,
// and propagates any error to kingpin.
func runBlockRepackAction(context *kingpin.ParseContext) error {
	rep := mustOpenRepository(nil)
	defer rep.Close()

	// Only blocks written before (now - minAge) are eligible for repacking.
	cutoff := time.Now().Add(-*blockRepackMinAge)
	// Fix: removed dead code that listed blocks and discarded the result.
	if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold, cutoff); err != nil {
		return err
	}
	return nil
}

View File

@@ -14,13 +14,19 @@
blockStatsCommand = blockCommands.Command("stats", "Block statistics")
blockStatsKind = blockStatsCommand.Flag("kind", "Kinds of blocks").Default("logical").Enum("all", "logical", "physical", "packed", "nonpacked", "packs")
blockStatsRaw = blockStatsCommand.Flag("raw", "Raw numbers").Short('r').Bool()
blockStatsGroup = blockStatsCommand.Flag("group", "Display stats about blocks belonging to a given group").String()
)
func runBlockStatsAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
blocks := rep.Blocks.ListBlocks("", *blockStatsKind)
var blocks []block.Info
if *blockStatsGroup != "" {
blocks = rep.Blocks.ListGroupBlocks(*blockStatsGroup)
} else {
blocks = rep.Blocks.ListBlocks("", *blockStatsKind)
}
sort.Slice(blocks, func(i, j int) bool { return blocks[i].Length < blocks[j].Length })
var sizeThreshold int64 = 10