From b4e6a70e09cc5f9700cc85b0a4dff3e8fbf85087 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 24 Oct 2017 21:05:59 -0700 Subject: [PATCH] renamed 'repo optimize' to 'block reindex', added repacking method and unit tests --- block/block_manager.go | 133 +++++++++++++++++- block/block_manager_test.go | 58 ++++++++ ...y_optimize.go => command_block_reindex.go} | 4 +- cli/command_block_repack.go | 11 +- cli/command_block_stats.go | 8 +- 5 files changed, 201 insertions(+), 13 deletions(-) rename cli/{command_repository_optimize.go => command_block_reindex.go} (73%) diff --git a/block/block_manager.go b/block/block_manager.go index a633301f8..fcc97195a 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -125,17 +125,18 @@ func (bm *Manager) addToIndexLocked(groupID, blockID string, ndx *packIndex, os m[blockID] = ndx } -func (bm *Manager) addToPack(packGroup string, blockID string, data []byte) error { - bm.lock() - defer bm.unlock() +func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte, force bool) error { + bm.assertLocked() if err := bm.ensurePackIndexesLoaded(); err != nil { return err } - // See if we already have this block ID in the pack. - if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok { - return nil + if !force { + // See if we already have this block ID in the pack. 
+ if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok { + return nil + } } g := bm.ensurePackGroupLocked(packGroup, false) @@ -426,6 +427,64 @@ func removeEmptyIndexes(ndx packIndexes) packIndexes { return res } +func (bm *Manager) regroupPacksAndUnpacked(ndx packIndexes) packIndexes { + var res packIndexes + + allPacks := &packIndex{ + Items: map[string]offsetAndSize{}, + PackGroup: packObjectsPackGroup, + CreateTime: bm.timeNow(), + } + + allNonPacked := &packIndex{ + Items: map[string]offsetAndSize{}, + PackGroup: nonPackedObjectsPackGroup, + CreateTime: bm.timeNow(), + } + + inUsePackBlocks := map[string]bool{} + + // Iterate through all indexes, build merged index of all packs and all non-packed items. + for _, n := range ndx { + if n.PackGroup == packObjectsPackGroup { + for i, o := range n.Items { + allPacks.Items[i] = o + } + continue + } + + if n.PackGroup == nonPackedObjectsPackGroup { + for i, o := range n.Items { + allNonPacked.Items[i] = o + } + continue + } + + if n.PackBlockID != "" { + inUsePackBlocks[n.PackBlockID] = true + } + + res = append(res, n) + } + + // Now delete all pack blocks that are not in use. + for k := range allPacks.Items { + if !inUsePackBlocks[k] { + delete(allPacks.Items, k) + } + } + + if len(allPacks.Items) > 0 { + res = append(res, allPacks) + } + + if len(allNonPacked.Items) > 0 { + res = append(res, allNonPacked) + } + + return res +} + // CompactIndexes performs compaction of index blocks and optionally removes index blocks not present in the provided set. 
func (bm *Manager) CompactIndexes(cutoffTime time.Time, inUseBlocks map[string]bool) error { bm.lock() @@ -549,6 +608,7 @@ func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, inUseBl } merged = removeEmptyIndexes(merged) + merged = bm.regroupPacksAndUnpacked(merged) if len(blockIDs) <= 1 && inUseBlocks == nil { log.Printf("skipping index compaction - already compacted") @@ -617,7 +677,10 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) { if bm.maxPackedContentLength > 0 && len(data) <= bm.maxPackedContentLength { blockID := bm.hashData(data) - err := bm.addToPack(groupID, blockID, data) + bm.lock() + defer bm.unlock() + + err := bm.addToPackLocked(groupID, blockID, data, false) return blockID, err } @@ -633,6 +696,62 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) { return blockID, nil } +// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size. +func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Time) error { + bm.lock() + defer bm.unlock() + + if groupID == "" || groupID == nonPackedObjectsPackGroup || groupID == packObjectsPackGroup { + return fmt.Errorf("invalid group ID: %q", groupID) + } + + if err := bm.ensurePackIndexesLoaded(); err != nil { + return err + } + + merged, _, err := bm.loadMergedPackIndexLocked(cutoffTime) + if err != nil { + return err + } + + var toRepackage []*packIndex + var totalBytes int64 + + for _, m := range merged { + if m.PackGroup == groupID && m.PackBlockID != "" { + bi, err := bm.blockInfoLocked(m.PackBlockID) + if err != nil { + return fmt.Errorf("unable to get info on block %q: %v", m.PackBlockID, err) + } + + if bi.Length <= maxLength { + toRepackage = append(toRepackage, m) + totalBytes += bi.Length + } + } + } + + log.Printf("%v blocks to re-package (%v total bytes)", len(toRepackage), totalBytes) + + for _, m := range toRepackage { + data, err := 
bm.getBlockInternalLocked(m.PackBlockID) + if err != nil { + return fmt.Errorf("can't fetch block %q for repackaging: %v", m.PackBlockID, err) + } + + for blockID, os := range m.Items { + log.Printf("re-packaging: %v %v", blockID, os) + + blockData := data[os.offset : os.offset+os.size] + if err := bm.addToPackLocked(groupID, blockID, blockData, true); err != nil { + return fmt.Errorf("unable to re-package %q: %v", blockID, err) + } + } + } + + return nil +} + func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, force bool) (string, error) { blockID := prefix + bm.hashData(data) diff --git a/block/block_manager_test.go b/block/block_manager_test.go index ae04b011e..a96698a7d 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -229,6 +229,64 @@ func TestBlockManagerPackIdentialToRawObject(t *testing.T) { } } +func TestBlockManagerRepack(t *testing.T) { + data := map[string][]byte{} + bm := newTestBlockManager(data) + + d1 := seededRandomData(1, 10) + d2 := seededRandomData(2, 20) + d3 := seededRandomData(3, 30) + + writeBlockAndVerify(t, bm, "g1", d1) + bm.Flush() + writeBlockAndVerify(t, bm, "g1", d2) + bm.Flush() + writeBlockAndVerify(t, bm, "g1", d3) + bm.Flush() + + // 3 data blocks, 3 index blocks. + if got, want := len(data), 6; got != want { + t.Errorf("unexpected block count: %v, wanted %v", got, want) + } + + if err := bm.Repackage("g1", 5, fakeTime); err != nil { + t.Errorf("repackage failure: %v", err) + } + bm.Flush() + + // nothing happened, still 3 data blocks, 3 index blocks. + if got, want := len(data), 6; got != want { + t.Errorf("unexpected block count: %v, wanted %v", got, want) + } + + setFakeTime(bm, fakeTime.Add(1*time.Second)) + + if err := bm.Repackage("g1", 30, fakeTime); err != nil { + t.Errorf("repackage failure: %v", err) + } + bm.Flush() + log.Printf("after repackage") + dumpBlockManagerData(data) + + // added one more data block + one more index block. 
+ if got, want := len(data), 8; got != want { + t.Errorf("unexpected block count: %v, wanted %v", got, want) + } + if err := bm.CompactIndexes(bm.timeNow(), nil); err != nil { + t.Errorf("compaction failure: %v", err) + } + log.Printf("after compaction") + dumpBlockManagerData(data) + + // old 3 data blocks still there + 1 new one + 1 compacted index + if got, want := len(data), 5; got != want { + t.Errorf("unexpected block count: %v, wanted %v", got, want) + dumpBlockManagerData(data) + } + // t.Error() + // dumpBlockManagerData(data) +} + func TestBlockManagerInternalFlush(t *testing.T) { data := map[string][]byte{} bm := newTestBlockManager(data) diff --git a/cli/command_repository_optimize.go b/cli/command_block_reindex.go similarity index 73% rename from cli/command_repository_optimize.go rename to cli/command_block_reindex.go index 884a0087f..32e23b680 100644 --- a/cli/command_repository_optimize.go +++ b/cli/command_block_reindex.go @@ -7,8 +7,8 @@ ) var ( - optimizeCommand = repositoryCommands.Command("optimize", "Optimize repository performance.") - optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of objects to optimize").Default("24h").Duration() + optimizeCommand = blockCommands.Command("reindex", "Optimize block indexes.") + optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of blocks to re-index").Default("24h").Duration() ) func runOptimizeCommand(context *kingpin.ParseContext) error { diff --git a/cli/command_block_repack.go b/cli/command_block_repack.go index 90114b5a5..c9b9eca73 100644 --- a/cli/command_block_repack.go +++ b/cli/command_block_repack.go @@ -1,20 +1,25 @@ package cli import ( + "time" + kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones") - blockRepackSizeThreshold = blockRepackCommand.Flag("threshold", "Min size of block to re-pack").Default("100000").Int64() + blockRepackGroup = blockRepackCommand.Flag("group", "Group 
to repack").Default("DIR").String() + blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64() + blockRepackMinAge = blockRepackCommand.Flag("min-age", "Minimum age to repack").Default("24h").Duration() ) func runBlockRepackAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - blocks := rep.Blocks.ListBlocks("", "packs") - _ = blocks + if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold, time.Now().Add(-*blockRepackMinAge)); err != nil { + return err + } return nil } diff --git a/cli/command_block_stats.go b/cli/command_block_stats.go index 7c8409052..fa3c5c56c 100644 --- a/cli/command_block_stats.go +++ b/cli/command_block_stats.go @@ -14,13 +14,19 @@ blockStatsCommand = blockCommands.Command("stats", "Block statistics") blockStatsKind = blockStatsCommand.Flag("kind", "Kinds of blocks").Default("logical").Enum("all", "logical", "physical", "packed", "nonpacked", "packs") blockStatsRaw = blockStatsCommand.Flag("raw", "Raw numbers").Short('r').Bool() + blockStatsGroup = blockStatsCommand.Flag("group", "Display stats about blocks belonging to a given group").String() ) func runBlockStatsAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - blocks := rep.Blocks.ListBlocks("", *blockStatsKind) + var blocks []block.Info + if *blockStatsGroup != "" { + blocks = rep.Blocks.ListGroupBlocks(*blockStatsGroup) + } else { + blocks = rep.Blocks.ListBlocks("", *blockStatsKind) + } sort.Slice(blocks, func(i, j int) bool { return blocks[i].Length < blocks[j].Length }) var sizeThreshold int64 = 10