From f4424bd79c8f9da16d39dd54b42eefef4f27bfc8 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 23 Jun 2018 12:30:59 -0700 Subject: [PATCH] parallelized 'block rewrite' --- cli/command_block_rewrite.go | 152 ++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 65 deletions(-) diff --git a/cli/command_block_rewrite.go b/cli/command_block_rewrite.go index 29bdd17d0..403ffeb1e 100644 --- a/cli/command_block_rewrite.go +++ b/cli/command_block_rewrite.go @@ -5,6 +5,7 @@ "fmt" "os" "strings" + "sync" "github.com/kopia/kopia/block" @@ -13,37 +14,69 @@ ) var ( - blockRewriteCommand = blockCommands.Command("rewrite", "Rewrite blocks using most recent format") - blockRewriteIDs = blockRewriteCommand.Arg("blockID", "Identifiers of blocks to rewrite").Strings() + blockRewriteCommand = blockCommands.Command("rewrite", "Rewrite blocks using most recent format") + blockRewriteIDs = blockRewriteCommand.Arg("blockID", "Identifiers of blocks to rewrite").Strings() + blockRewriteParallelism = blockRewriteCommand.Flag("parallelism", "Number of parallel workers").Default("16").Int() + blockRewriteShortPacks = blockRewriteCommand.Flag("short", "Rewrite blocks from short packs").Bool() blockRewriteFormatVersion = blockRewriteCommand.Flag("format-version", "Rewrite blocks using the provided format version").Default("-1").Int() blockRewritePackPrefix = blockRewriteCommand.Flag("pack-prefix", "Only rewrite pack blocks with a given prefix").String() blockRewriteDryRun = blockRewriteCommand.Flag("dry-run", "Do not actually rewrite, only print what would happen").Short('n').Bool() ) +type blockInfoOrError struct { + block.Info + err error +} + func runRewriteBlocksAction(ctx context.Context, rep *repo.Repository) error { - blocks, err := getBlocksToRewrite(ctx, rep) - if err != nil { - return fmt.Errorf("unable to determine blocks to rewrite: %v", err) + blocks := getBlocksToRewrite(ctx, rep) + + var ( + mu sync.Mutex + totalBytes int64 + failedCount int + ) + + var wg sync.WaitGroup + + for i := 0; i < *blockRewriteParallelism; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for b := range blocks { + if b.err != nil { + log.Error().Msgf("got error: %v", b.err) + mu.Lock() + failedCount++ + mu.Unlock() + return + } + + var optDeleted string + if b.Deleted { + optDeleted = " (deleted)" + } + + fmt.Fprintf(os.Stderr, "Rewriting block %v (%v bytes) from pack %v%v\n", b.BlockID, b.Length, b.PackFile, optDeleted) + mu.Lock() + totalBytes += int64(b.Length) + mu.Unlock() + if *blockRewriteDryRun { + continue + } + if err := rep.Blocks.RewriteBlock(ctx, b.BlockID); err != nil { + log.Warn().Msgf("unable to rewrite block %q: %v", b.BlockID, err) + mu.Lock() + failedCount++ + mu.Unlock() + } + } + }() } - var totalBytes int64 - failedCount := 0 - for _, b := range blocks { - var optDeleted string - if b.Deleted { - optDeleted = " (deleted)" - } - fmt.Fprintf(os.Stderr, "Rewriting block %v (%v bytes) from pack %v%v\n", b.BlockID, b.Length, b.PackFile, optDeleted) - totalBytes += int64(b.Length) - if *blockRewriteDryRun { - continue - } - if err := rep.Blocks.RewriteBlock(ctx, b.BlockID); err != nil { - log.Warn().Msgf("unable to rewrite block %q: %v", b.BlockID, err) - failedCount++ - } - } + wg.Wait() fmt.Fprintf(os.Stderr, "Total bytes rewritten %v\n", totalBytes) @@ -54,77 +87,67 @@ func runRewriteBlocksAction(ctx context.Context, rep *repo.Repository) error { return fmt.Errorf("failed to rewrite %v blocks", failedCount) } -func getBlocksToRewrite(ctx context.Context, rep *repo.Repository) ([]block.Info, error) { - // get blocks listed on command line - result, err := getBlockInfos(ctx, rep, *blockRewriteIDs) - if err != nil { - return nil, err - } +func getBlocksToRewrite(ctx context.Context, rep *repo.Repository) <-chan blockInfoOrError { + ch := make(chan blockInfoOrError) + go func() { + defer close(ch) - // add all blocks from short packs - if *blockRewriteShortPacks { - threshold := uint32(rep.Blocks.Format.MaxPackSize * 6 / 10) - info, err := getBlocksInShortPacks(ctx, rep, threshold) - if err != nil { - return nil, err + // get blocks listed on command line + findBlockInfos(ctx, rep, ch, *blockRewriteIDs) + + // add all blocks from short packs + if *blockRewriteShortPacks { + threshold := uint32(rep.Blocks.Format.MaxPackSize * 6 / 10) + findBlocksInShortPacks(ctx, rep, ch, threshold) } - result = append(result, info...) - } - - // add all blocks with given format version - if *blockRewriteFormatVersion != -1 { - info, err := getBlocksWithFormatVersion(ctx, rep, *blockRewriteFormatVersion) - if err != nil { - return nil, err + // add all blocks with given format version + if *blockRewriteFormatVersion != -1 { + findBlocksWithFormatVersion(ctx, rep, ch, *blockRewriteFormatVersion) } + }() - result = append(result, info...) - } - - return result, nil + return ch } -func getBlockInfos(ctx context.Context, rep *repo.Repository, blockIDs []string) ([]block.Info, error) { - var result []block.Info +func findBlockInfos(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, blockIDs []string) { for _, blockID := range blockIDs { i, err := rep.Blocks.BlockInfo(ctx, blockID) if err != nil { - return nil, fmt.Errorf("unable to get info for block %q: %v", blockID, err) + ch <- blockInfoOrError{err: fmt.Errorf("unable to get info for block %q: %v", blockID, err)} + } else { + ch <- blockInfoOrError{Info: i} } - result = append(result, i) } - return result, nil } -func getBlocksWithFormatVersion(ctx context.Context, rep *repo.Repository, version int) ([]block.Info, error) { - var result []block.Info - +func findBlocksWithFormatVersion(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, version int) { infos, err := rep.Blocks.ListBlockInfos("", true) if err != nil { - return nil, fmt.Errorf("unable to list index blocks: %v", err) + ch <- blockInfoOrError{err: fmt.Errorf("unable to list index blocks: %v", err)} + return } for _, b := range infos { if int(b.FormatVersion) == *blockRewriteFormatVersion && strings.HasPrefix(b.PackFile, *blockRewritePackPrefix) { - result = append(result, b) + ch <- blockInfoOrError{Info: b} } } - - return result, nil } -func getBlocksInShortPacks(ctx context.Context, rep *repo.Repository, threshold uint32) ([]block.Info, error) { - var result []block.Info - +func findBlocksInShortPacks(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, threshold uint32) { + log.Printf("listing blocks...") infos, err := rep.Blocks.ListBlockInfos("", true) if err != nil { - return nil, fmt.Errorf("unable to list index blocks: %v", err) + ch <- blockInfoOrError{err: fmt.Errorf("unable to list index blocks: %v", err)} + return } + log.Printf("finding blocks in short packs...") shortPackBlocks, err := findShortPackBlocks(infos, threshold) if err != nil { - return nil, fmt.Errorf("unable to find short pack blocks: %v", err) + ch <- blockInfoOrError{err: fmt.Errorf("unable to find short pack blocks: %v", err)} + return } log.Printf("found %v short pack blocks", len(shortPackBlocks)) @@ -133,11 +156,10 @@ func getBlocksInShortPacks(ctx context.Context, rep *repo.Repository, threshold } else { for _, b := range infos { if shortPackBlocks[b.PackFile] && strings.HasPrefix(b.PackFile, *blockRewritePackPrefix) { - result = append(result, b) + ch <- blockInfoOrError{Info: b} } } } - return result, nil } func findShortPackBlocks(infos []block.Info, threshold uint32) (map[string]bool, error) {