From d3b854ef8e6c8dd377660e5afdc3626a9969de82 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 23 Nov 2017 15:55:56 -0800 Subject: [PATCH] optimized format for pack index, added compacted block with suffix -zTIMESTAMP where TIMESTAMP is base16-encoded unix nanoseconds of all blocks that this block supersedes, this allows much more efficient loading of blocks without having to delete anything --- block/block_manager.go | 191 +++++++++++++++++----------- cli/command_block_index_list.go | 23 +++- cli/command_block_index_optimize.go | 5 +- cli/command_block_repack.go | 5 +- cli/command_ls.go | 1 + cli/command_object_cleanup.go | 2 +- 6 files changed, 142 insertions(+), 85 deletions(-) diff --git a/block/block_manager.go b/block/block_manager.go index e788b5ba7..563c3e1d6 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -8,6 +8,7 @@ "io" "log" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -23,9 +24,13 @@ packBlockPrefix = "P" // prefix for all storage blocks that are pack indexes nonPackedObjectsPackGroup = "raw" // ID of pack group that stores non-packed objects that don't belong to any group packObjectsPackGroup = "packs" // ID of pack group that stores pack blocks themselves + compactedBlockSuffix = "-z" + maxIndexBlockUploadTime = 1 * time.Minute maxNonPackedBlocksPerPackIndex = 200 ) +var zeroTime time.Time + type packInfo struct { currentPackData []byte currentPackIndex *packIndex @@ -243,7 +248,7 @@ func (bm *Manager) flushPackIndexesLocked() error { if false { log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes)) } - if _, err := bm.writePackIndexes(bm.pendingPackIndexes); err != nil { + if _, err := bm.writePackIndexes(bm.pendingPackIndexes, nil); err != nil { return err } } @@ -253,7 +258,7 @@ func (bm *Manager) flushPackIndexesLocked() error { return nil } -func (bm *Manager) writePackIndexes(ndx packIndexes) (string, error) { +func (bm *Manager) writePackIndexes(ndx packIndexes, replacesBlockBeforeTime *time.Time) (string, error) { var buf bytes.Buffer zw := gzip.NewWriter(&buf) @@ -262,7 +267,12 @@ func (bm *Manager) writePackIndexes(ndx packIndexes) (string, error) { } zw.Close() - return bm.writeUnpackedBlockNotLocked(buf.Bytes(), packBlockPrefix, true) + var suffix string + if replacesBlockBeforeTime != nil { + suffix = fmt.Sprintf("%v%x", compactedBlockSuffix, replacesBlockBeforeTime.UnixNano()) + } + + return bm.writeUnpackedBlockNotLocked(buf.Bytes(), packBlockPrefix, suffix, true) } func (bm *Manager) finishAllOpenPacksLocked() error { @@ -294,7 +304,7 @@ func (bm *Manager) finishPackLocked(g *packInfo) error { if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 { if g.currentPackData != nil { dataLength := len(g.currentPackData) - blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", true) + blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", "", true) if err != nil { return fmt.Errorf("can't save pack data block %q: %v", blockID, err) } @@ -311,10 +321,95 @@ func (bm *Manager) finishPackLocked(g *packInfo) error { return nil } -func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, []string, error) { +// IndexBlocks returns the list of all index blocks, including inactive, sorted by time. +func (bm *Manager) ListIndexBlocks() ([]Info, error) { ch, cancel := bm.storage.ListBlocks(packBlockPrefix) defer cancel() + var blocks []Info + for b := range ch { + if b.Error != nil { + return nil, fmt.Errorf("error listing index blocks: %v", b.Error) + } + + blocks = append(blocks, Info{ + BlockID: b.BlockID, + Length: b.Length, + Timestamp: b.TimeStamp, + }) + } + + sortBlocksByTime(blocks) + return blocks, nil +} + +// ActiveIndexBlocks returns the list of active index blocks, sorted by time. +func (bm *Manager) ActiveIndexBlocks() ([]Info, error) { + blocks, err := bm.ListIndexBlocks() + if len(blocks) == 0 { + return nil, nil + } + + cutoffTime, err := findLatestCompactedTimestamp(blocks) + if err != nil { + return nil, err + } + + var activeBlocks []Info + for _, b := range blocks { + if b.Timestamp.After(cutoffTime) { + activeBlocks = append(activeBlocks, b) + } + } + + sortBlocksByTime(activeBlocks) + return activeBlocks, nil +} + +func sortBlocksByTime(b []Info) { + sort.Slice(b, func(i, j int) bool { + return b[i].Timestamp.Before(b[j].Timestamp) + }) +} + +func findLatestCompactedTimestamp(blocks []Info) (time.Time, error) { + // look for blocks that end with -ztimestamp + // find the latest such timestamp. + var latestTime time.Time + + for _, b := range blocks { + blk := b.BlockID + if p := strings.Index(blk, compactedBlockSuffix); p >= 0 { + unixNano, err := strconv.ParseInt(blk[p+len(compactedBlockSuffix):], 16, 64) + if err != nil { + return latestTime, fmt.Errorf("malformed index block name %q", blk) + } + + ts := time.Unix(unixNano/1e9, unixNano%1e9) + if ts.After(latestTime) { + latestTime = ts + } + } + } + + return latestTime, nil +} + +func (bm *Manager) loadMergedPackIndexLocked() (packIndexes, []string, time.Time, error) { + blocks, err := bm.ActiveIndexBlocks() + if err != nil { + return nil, nil, time.Now(), err + } + + // add block IDs to the channel + ch := make(chan string, len(blocks)) + go func() { + for _, b := range blocks { + ch <- b.BlockID + } + close(ch) + }() + t0 := time.Now() var wg sync.WaitGroup @@ -332,12 +427,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, defer wg.Done() for b := range ch { - if b.Error != nil { - errors <- b.Error - return - } - - data, err := bm.getBlockInternalLocked(b.BlockID) + data, err := bm.getBlockInternalLocked(b) if err != nil { errors <- err return @@ -356,12 +446,8 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, return } - if hasPackCreateAfter(pi, cutoffTime) { - continue - } - mu.Lock() - blockIDs = append(blockIDs, b.BlockID) + blockIDs = append(blockIDs, b) indexes = append(indexes, pi) totalSize += len(data) mu.Unlock() @@ -374,7 +460,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, // Propagate async errors, if any. for err := range errors { - return nil, nil, err + return nil, nil, time.Now(), err } if false { @@ -386,17 +472,7 @@ func (bm *Manager) loadMergedPackIndexLocked(cutoffTime time.Time) (packIndexes, merged = append(merged, pi...) } - return merged, blockIDs, nil -} - -func hasPackCreateAfter(pi packIndexes, t time.Time) bool { - for _, ndx := range pi { - if ndx.CreateTime.After(t) { - return true - } - } - - return false + return merged, blockIDs, blocks[len(blocks)-1].Timestamp, nil } func (bm *Manager) ensurePackIndexesLoaded() error { @@ -409,7 +485,7 @@ func (bm *Manager) ensurePackIndexesLoaded() error { t0 := time.Now() - merged, _, err := bm.loadMergedPackIndexLocked(bm.timeNow().Add(24 * time.Hour)) + merged, _, _, err := bm.loadMergedPackIndexLocked() if err != nil { return err } @@ -523,16 +599,16 @@ func (bm *Manager) regroupPacksAndUnpacked(ndx packIndexes) packIndexes { } // CompactIndexes performs compaction of index blocks and optionally removes index blocks not present in the provided set. -func (bm *Manager) CompactIndexes(cutoffTime time.Time, inUseBlocks map[string]bool) error { +func (bm *Manager) CompactIndexes() error { bm.lock() defer bm.unlock() - merged, blockIDs, err := bm.loadMergedPackIndexLocked(cutoffTime) + merged, blockIDs, latestBlockTime, err := bm.loadMergedPackIndexLocked() if err != nil { return err } - if err := bm.compactIndexes(merged, blockIDs, inUseBlocks); err != nil { + if err := bm.compactIndexes(merged, blockIDs, latestBlockTime); err != nil { return err } @@ -631,52 +707,20 @@ func (bm *Manager) ListGroupBlocks(groupID string) []Info { return result } -func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, inUseBlocks map[string]bool) error { +func (bm *Manager) compactIndexes(merged packIndexes, blockIDs []string, latestBlockTime time.Time) error { dedupeBlockIDsAndIndex(merged) - if inUseBlocks != nil { - for _, m := range merged { - for b := range m.Items { - if !inUseBlocks[b] { - //log.Printf("removing block in index but not in use: %q", b) - delete(m.Items, b) - } - } - } - } - merged = removeEmptyIndexes(merged) merged = bm.regroupPacksAndUnpacked(merged) - - if len(blockIDs) <= 1 && inUseBlocks == nil { + if len(blockIDs) <= 1 { log.Printf("skipping index compaction - already compacted") return nil } - compactedBlock, err := bm.writePackIndexes(merged) + _, err := bm.writePackIndexes(merged, &latestBlockTime) if err != nil { return err } - ch := makeStringChannel(blockIDs) - var wg sync.WaitGroup - - for i := 0; i < parallelDeletes; i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - - for blockID := range ch { - if blockID == compactedBlock { - log.Printf("warning: sanity check failed, not deleting freshly-written compacted index: %q", compactedBlock) - continue - } - if err := bm.storage.DeleteBlock(blockID); err != nil { - log.Printf("warning: unable to delete %q: %v", blockID, err) - } - } - }(i) - } - wg.Wait() return nil } @@ -721,7 +765,7 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) { return blockID, err } - blockID, err := bm.writeUnpackedBlockNotLocked(data, "", false) + blockID, err := bm.writeUnpackedBlockNotLocked(data, "", "", false) if err != nil { return "", err } @@ -734,7 +778,7 @@ func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) { } // Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size. -func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Time) error { +func (bm *Manager) Repackage(groupID string, maxLength int64) error { bm.lock() defer bm.unlock() @@ -746,7 +790,7 @@ func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Ti return err } - merged, _, err := bm.loadMergedPackIndexLocked(cutoffTime) + merged, _, _, err := bm.loadMergedPackIndexLocked() if err != nil { return err } @@ -789,8 +833,8 @@ func (bm *Manager) Repackage(groupID string, maxLength int64, cutoffTime time.Ti return nil } -func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, force bool) (string, error) { - blockID := prefix + bm.hashData(data) +func (bm *Manager) writeUnpackedBlockNotLocked(data []byte, prefix string, suffix string, force bool) (string, error) { + blockID := prefix + bm.hashData(data) + suffix if !force { // Before performing encryption, check if the block is already there. @@ -959,6 +1003,9 @@ func (bm *Manager) getBlockInternalLocked(blockID string) ([]byte, error) { func (bm *Manager) verifyChecksum(data []byte, blockID string) error { expected := bm.formatter.ComputeBlockID(data) + if p := strings.Index(blockID, compactedBlockSuffix); p >= 0 { + blockID = blockID[0:p] + } if !strings.HasSuffix(blockID, expected) { atomic.AddInt32(&bm.stats.InvalidBlocks, 1) return fmt.Errorf("invalid checksum for blob: '%v', expected %v", blockID, expected) diff --git a/cli/command_block_index_list.go b/cli/command_block_index_list.go index af9224856..8dbba9ddf 100644 --- a/cli/command_block_index_list.go +++ b/cli/command_block_index_list.go @@ -3,24 +3,39 @@ import ( "fmt" + "github.com/kopia/kopia/block" + kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( blockIndexListCommand = blockIndexCommands.Command("list", "List block indexes").Alias("ls") + blockIndexListAll = blockIndexListCommand.Flag("all", "List all blocks, not just active ones").Short('a').Bool() ) func runListBlockIndexesAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - ch, cancel := rep.Storage.ListBlocks("P") - defer cancel() + var blks []block.Info + var err error - for b := range ch { - fmt.Printf("%-34v %10v %v\n", b.BlockID, b.Length, b.TimeStamp.Local().Format(timeFormat)) + if !*blockIndexListAll { + blks, err = rep.Blocks.ActiveIndexBlocks() + } else { + blks, err = rep.Blocks.ListIndexBlocks() } + if err != nil { + return err + } + + for _, b := range blks { + fmt.Printf("%-54v %10v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormatPrecise)) + } + + fmt.Printf("total %v blocks\n", len(blks)) + return nil } diff --git a/cli/command_block_index_optimize.go b/cli/command_block_index_optimize.go index cc5dd0e75..dfc750be8 100644 --- a/cli/command_block_index_optimize.go +++ b/cli/command_block_index_optimize.go @@ -1,21 +1,18 @@ package cli import ( - "time" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( optimizeCommand = blockIndexCommands.Command("optimize", "Optimize block indexes.") - optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of blocks to re-index").Default("24h").Duration() ) func runOptimizeCommand(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - return rep.Blocks.CompactIndexes(time.Now().Add(-*optimizeMinAge), nil) + return rep.Blocks.CompactIndexes() } func init() { diff --git a/cli/command_block_repack.go b/cli/command_block_repack.go index c9b9eca73..c9c725607 100644 --- a/cli/command_block_repack.go +++ b/cli/command_block_repack.go @@ -1,8 +1,6 @@ package cli import ( - "time" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -10,14 +8,13 @@ blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones") blockRepackGroup = blockRepackCommand.Flag("group", "Group to repack").Default("DIR").String() blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64() - blockRepackMinAge = blockRepackCommand.Flag("min-age", "Minimum age to repack").Default("24h").Duration() ) func runBlockRepackAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold, time.Now().Add(-*blockRepackMinAge)); err != nil { + if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold); err != nil { return err } diff --git a/cli/command_ls.go b/cli/command_ls.go index dcd903cbf..3fecb26b8 100644 --- a/cli/command_ls.go +++ b/cli/command_ls.go @@ -13,6 +13,7 @@ ) const timeFormat = "02 Jan 06 15:04:05" +const timeFormatPrecise = "02 Jan 06 15:04:05.000000000" var ( lsCommand = app.Command("list", "List a directory stored in repository object.").Alias("ls") diff --git a/cli/command_object_cleanup.go b/cli/command_object_cleanup.go index 3f8be2535..608a4e951 100644 --- a/cli/command_object_cleanup.go +++ b/cli/command_object_cleanup.go @@ -235,7 +235,7 @@ func runCleanupCommand(context *kingpin.ParseContext) error { log.Printf("Found %v in-use objects in %v blocks in %v", len(ctx.queue.visited), len(ctx.inuse), dt) - rep.Blocks.CompactIndexes(cutoffTime, ctx.inuse) + rep.Blocks.CompactIndexes() var totalBlocks int var totalBytes int64