optimized format for pack index, added compacted block with suffix -zTIMESTAMP where TIMESTAMP is base16-encoded unix nanoseconds of all blocks that this block supersedes, this allows much more efficient loading of blocks without having to delete anything

This commit is contained in:
Jarek Kowalski
2017-11-23 15:55:56 -08:00
parent 29ed48f734
commit d3b854ef8e
6 changed files with 142 additions and 85 deletions

View File

@@ -8,6 +8,7 @@
"io"
"log"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -23,9 +24,13 @@
packBlockPrefix = "P" // prefix for all storage blocks that are pack indexes
nonPackedObjectsPackGroup = "raw" // ID of pack group that stores non-packed objects that don't belong to any group
packObjectsPackGroup = "packs" // ID of pack group that stores pack blocks themselves
compactedBlockSuffix = "-z"
maxIndexBlockUploadTime = 1 * time.Minute
maxNonPackedBlocksPerPackIndex = 200
)
var zeroTime time.Time
type packInfo struct {
currentPackData []byte
currentPackIndex *packIndex
@@ -243,7 +248,7 @@ func (bm *Manager) flushPackIndexesLocked() error {
if false {
log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes))
}
if _, err := bm.writePackIndexes(bm.pendingPackIndexes); err != nil {
if _, err := bm.writePackIndexes(bm.pendingPackIndexes, nil); err != nil {
return err
}
}
@@ -253,7 +258,7 @@ func (bm *Manager) flushPackIndexesLocked() error {
return nil
}
func (bm *Manager) writePackIndexes(ndx packIndexes) (string, error) {
func (bm *Manager) writePackIndexes(ndx packIndexes, replacesBlockBeforeTime *time.Time) (string, error) {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
@@ -262,7 +267,12 @@ func (bm *Manager) writePackIndexes(ndx packIndexes) (string, error) {
}
zw.Close()
return bm.writeUnpackedBlockNotLocked(buf.Bytes(), packBlockPrefix, true)
var suffix string
if replacesBlockBeforeTime != nil {
suffix = fmt.Sprintf("%v%x", compactedBlockSuffix, replacesBlockBeforeTime.UnixNano())
}
return bm.writeUnpackedBlockNotLocked(buf.Bytes(), packBlockPrefix, suffix, true)
}
func (bm *Manager) finishAllOpenPacksLocked() error {
@@ -294,7 +304,7 @@ func (bm *Manager) finishPackLocked(g *packInfo) error {
if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 {
if g.currentPackData != nil {
dataLength := len(g.currentPackData)
blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", true)
blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", "", true)
if err != nil {
return fmt.Errorf("can't save pack data block %q: %v", blockID, err)
}
@@ -311,10 +321,95 @@ func (bm *Manager) finishPackLocked(g *packInfo) error {
return nil
}
func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, []string, error) {
// IndexBlocks returns the list of all index blocks, including inactive, sorted by time.
func (bm *Manager) ListIndexBlocks() ([]Info, error) {
ch, cancel := bm.storage.ListBlocks(packBlockPrefix)
defer cancel()
var blocks []Info
for b := range ch {
if b.Error != nil {
return nil, fmt.Errorf("error listing index blocks: %v", b.Error)
}
blocks = append(blocks, Info{
BlockID: b.BlockID,
Length: b.Length,
Timestamp: b.TimeStamp,
})
}
sortBlocksByTime(blocks)
return blocks, nil
}
// ActiveIndexBlocks returns the list of active index blocks, sorted by time.
func (bm *Manager) ActiveIndexBlocks() ([]Info, error) {
blocks, err := bm.ListIndexBlocks()
if len(blocks) == 0 {
return nil, nil
}
cutoffTime, err := findLatestCompactedTimestamp(blocks)
if err != nil {
return nil, err
}
var activeBlocks []Info
for _, b := range blocks {
if b.Timestamp.After(cutoffTime) {
activeBlocks = append(activeBlocks, b)
}
}
sortBlocksByTime(activeBlocks)
return activeBlocks, nil
}
func sortBlocksByTime(b []Info) {
sort.Slice(b, func(i, j int) bool {
return b[i].Timestamp.Before(b[j].Timestamp)
})
}
func findLatestCompactedTimestamp(blocks []Info) (time.Time, error) {
// look for blocks that end with -ztimestamp
// find the latest such timestamp.
var latestTime time.Time
for _, b := range blocks {
blk := b.BlockID
if p := strings.Index(blk, compactedBlockSuffix); p >= 0 {
unixNano, err := strconv.ParseInt(blk[p+len(compactedBlockSuffix):], 16, 64)
if err != nil {
return latestTime, fmt.Errorf("malformed index block name %q", blk)
}
ts := time.Unix(unixNano/1e9, unixNano%1e9)
if ts.After(latestTime) {
latestTime = ts
}
}
}
return latestTime, nil
}
func (bm *Manager) loadMergedPackIndexLocked() (packIndexes, []string, time.Time, error) {
blocks, err := bm.ActiveIndexBlocks()
if err != nil {
return nil, nil, time.Now(), err
}
// add block IDs to the channel
ch := make(chan string, len(blocks))
go func() {
for _, b := range blocks {
ch <- b.BlockID
}
close(ch)
}()
t0 := time.Now()
var wg sync.WaitGroup
@@ -332,12 +427,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes,
defer wg.Done()
for b := range ch {
if b.Error != nil {
errors <- b.Error
return
}
data, err := bm.getBlockInternalLocked(b.BlockID)
data, err := bm.getBlockInternalLocked(b)
if err != nil {
errors <- err
return
@@ -356,12 +446,8 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes,
return
}
if hasPackCreateAfter(pi, cutoffTime) {
continue
}
mu.Lock()
blockIDs = append(blockIDs, b.BlockID)
blockIDs = append(blockIDs, b)
indexes = append(indexes, pi)
totalSize += len(data)
mu.Unlock()
@@ -374,7 +460,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes,
// Propagate async errors, if any.
for err := range errors {
return nil, nil, err
return nil, nil, time.Now(), err
}
if false {
@@ -386,17 +472,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes,
merged = append(merged, pi...)
}
return merged, blockIDs, nil
}
func hasPackCreateAfter(pi packIndexes, t time.Time) bool {
for _, ndx := range pi {
if ndx.CreateTime.After(t) {
return true
}
}
return false
return merged, blockIDs, blocks[len(blocks)-1].Timestamp, nil
}
func (bm *Manager) ensurePackIndexesLoaded() error {
@@ -409,7 +485,7 @@ func (bm *Manager) ensurePackIndexesLoaded() error {
t0 := time.Now()
merged, _, err := bm.loadMergedPackIndexLocked(bm.timeNow().Add(24 * time.Hour))
merged, _, _, err := bm.loadMergedPackIndexLocked()
if err != nil {
return err
}
@@ -523,16 +599,16 @@ func (bm *Manager) regroupPacksAndUnpacked(ndx packIndexes) packIndexes {
}
// CompactIndexes performs compaction of index blocks and optionally removes index blocks not present in the provided set.
func (bm *Manager) CompactIndexes(cutoffTime time.Time, inUseBlocks map[string]bool) error {
func (bm *Manager) CompactIndexes() error {
bm.lock()
defer bm.unlock()
merged, blockIDs, err := bm.loadMergedPackIndexLocked(cutoffTime)
merged, blockIDs, latestBlockTime, err := bm.loadMergedPackIndexLocked()
if err != nil {
return err
}
if err := bm.compactIndexes(merged, blockIDs, inUseBlocks); err != nil {
if err := bm.compactIndexes(merged, blockIDs, latestBlockTime); err != nil {
return err
}
@@ -631,52 +707,20 @@ func (bm *Manager) ListGroupBlocks(groupID string) []Info {
return result
}
func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, inUseBlocks map[string]bool) error {
func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, latestBlockTime time.Time) error {
dedupeBlockIDsAndIndex(merged)
if inUseBlocks != nil {
for _, m := range merged {
for b := range m.Items {
if !inUseBlocks[b] {
//log.Printf("removing block in index but not in use: %q", b)
delete(m.Items, b)
}
}
}
}
merged = removeEmptyIndexes(merged)
merged = bm.regroupPacksAndUnpacked(merged)
if len(blockIDs) <= 1 && inUseBlocks == nil {
if len(blockIDs) <= 1 {
log.Printf("skipping index compaction - already compacted")
return nil
}
compactedBlock, err := bm.writePackIndexes(merged)
_, err := bm.writePackIndexes(merged, &latestBlockTime)
if err != nil {
return err
}
ch := makeStringChannel(blockIDs)
var wg sync.WaitGroup
for i := 0; i < parallelDeletes; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for blockID := range ch {
if blockID == compactedBlock {
log.Printf("warning: sanity check failed, not deleting freshly-written compacted index: %q", compactedBlock)
continue
}
if err := bm.storage.DeleteBlock(blockID); err != nil {
log.Printf("warning: unable to delete %q: %v", blockID, err)
}
}
}(i)
}
wg.Wait()
return nil
}
@@ -721,7 +765,7 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) {
return blockID, err
}
blockID, err := bm.writeUnpackedBlockNotLocked(data, "", false)
blockID, err := bm.writeUnpackedBlockNotLocked(data, "", "", false)
if err != nil {
return "", err
}
@@ -734,7 +778,7 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) {
}
// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size.
func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Time) error {
func (bm *Manager) Repackage(groupID string, maxLength int64) error {
bm.lock()
defer bm.unlock()
@@ -746,7 +790,7 @@ func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Ti
return err
}
merged, _, err := bm.loadMergedPackIndexLocked(cutoffTime)
merged, _, _, err := bm.loadMergedPackIndexLocked()
if err != nil {
return err
}
@@ -789,8 +833,8 @@ func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Ti
return nil
}
func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, force bool) (string, error) {
blockID := prefix + bm.hashData(data)
func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, suffix string, force bool) (string, error) {
blockID := prefix + bm.hashData(data) + suffix
if !force {
// Before performing encryption, check if the block is already there.
@@ -959,6 +1003,9 @@ func (bm *Manager) getBlockInternalLocked(blockID string) ([]byte, error) {
func (bm *Manager) verifyChecksum(data []byte, blockID string) error {
expected := bm.formatter.ComputeBlockID(data)
if p := strings.Index(blockID, compactedBlockSuffix); p >= 0 {
blockID = blockID[0:p]
}
if !strings.HasSuffix(blockID, expected) {
atomic.AddInt32(&bm.stats.InvalidBlocks, 1)
return fmt.Errorf("invalid checksum for blob: '%v', expected %v", blockID, expected)

View File

@@ -3,24 +3,39 @@
import (
"fmt"
"github.com/kopia/kopia/block"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
var (
blockIndexListCommand = blockIndexCommands.Command("list", "List block indexes").Alias("ls")
blockIndexListAll = blockIndexListCommand.Flag("all", "List all blocks, not just active ones").Short('a').Bool()
)
func runListBlockIndexesAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
ch, cancel := rep.Storage.ListBlocks("P")
defer cancel()
var blks []block.Info
var err error
for b := range ch {
fmt.Printf("%-34v %10v %v\n", b.BlockID, b.Length, b.TimeStamp.Local().Format(timeFormat))
if !*blockIndexListAll {
blks, err = rep.Blocks.ActiveIndexBlocks()
} else {
blks, err = rep.Blocks.ListIndexBlocks()
}
if err != nil {
return err
}
for _, b := range blks {
fmt.Printf("%-54v %10v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormatPrecise))
}
fmt.Printf("total %v blocks\n", len(blks))
return nil
}

View File

@@ -1,21 +1,18 @@
package cli
import (
"time"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
var (
optimizeCommand = blockIndexCommands.Command("optimize", "Optimize block indexes.")
optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of blocks to re-index").Default("24h").Duration()
)
func runOptimizeCommand(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
return rep.Blocks.CompactIndexes(time.Now().Add(-*optimizeMinAge), nil)
return rep.Blocks.CompactIndexes()
}
func init() {

View File

@@ -1,8 +1,6 @@
package cli
import (
"time"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
@@ -10,14 +8,13 @@
blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones")
blockRepackGroup = blockRepackCommand.Flag("group", "Group to repack").Default("DIR").String()
blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64()
blockRepackMinAge = blockRepackCommand.Flag("min-age", "Minimum age to repack").Default("24h").Duration()
)
func runBlockRepackAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold, time.Now().Add(-*blockRepackMinAge)); err != nil {
if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold); err != nil {
return err
}

View File

@@ -13,6 +13,7 @@
)
const timeFormat = "02 Jan 06 15:04:05"
const timeFormatPrecise = "02 Jan 06 15:04:05.000000000"
var (
lsCommand = app.Command("list", "List a directory stored in repository object.").Alias("ls")

View File

@@ -235,7 +235,7 @@ func runCleanupCommand(context *kingpin.ParseContext) error {
log.Printf("Found %v in-use objects in %v blocks in %v", len(ctx.queue.visited), len(ctx.inuse), dt)
rep.Blocks.CompactIndexes(cutoffTime, ctx.inuse)
rep.Blocks.CompactIndexes()
var totalBlocks int
var totalBytes int64