mirror of
https://github.com/kopia/kopia.git
synced 2026-01-25 14:58:00 -05:00
parallelized 'block rewrite'
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/kopia/kopia/block"
|
||||
|
||||
@@ -13,37 +14,69 @@
|
||||
)
|
||||
|
||||
var (
|
||||
blockRewriteCommand = blockCommands.Command("rewrite", "Rewrite blocks using most recent format")
|
||||
blockRewriteIDs = blockRewriteCommand.Arg("blockID", "Identifiers of blocks to rewrite").Strings()
|
||||
blockRewriteCommand = blockCommands.Command("rewrite", "Rewrite blocks using most recent format")
|
||||
blockRewriteIDs = blockRewriteCommand.Arg("blockID", "Identifiers of blocks to rewrite").Strings()
|
||||
blockRewriteParallelism = blockRewriteCommand.Flag("parallelism", "Number of parallel workers").Default("16").Int()
|
||||
|
||||
blockRewriteShortPacks = blockRewriteCommand.Flag("short", "Rewrite blocks from short packs").Bool()
|
||||
blockRewriteFormatVersion = blockRewriteCommand.Flag("format-version", "Rewrite blocks using the provided format version").Default("-1").Int()
|
||||
blockRewritePackPrefix = blockRewriteCommand.Flag("pack-prefix", "Only rewrite pack blocks with a given prefix").String()
|
||||
blockRewriteDryRun = blockRewriteCommand.Flag("dry-run", "Do not actually rewrite, only print what would happen").Short('n').Bool()
|
||||
)
|
||||
|
||||
type blockInfoOrError struct {
|
||||
block.Info
|
||||
err error
|
||||
}
|
||||
|
||||
func runRewriteBlocksAction(ctx context.Context, rep *repo.Repository) error {
|
||||
blocks, err := getBlocksToRewrite(ctx, rep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine blocks to rewrite: %v", err)
|
||||
blocks := getBlocksToRewrite(ctx, rep)
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
totalBytes int64
|
||||
failedCount int
|
||||
)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < *blockRewriteParallelism; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
for b := range blocks {
|
||||
if b.err != nil {
|
||||
log.Error().Msgf("got error: %v", b.err)
|
||||
mu.Lock()
|
||||
failedCount++
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var optDeleted string
|
||||
if b.Deleted {
|
||||
optDeleted = " (deleted)"
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Rewriting block %v (%v bytes) from pack %v%v\n", b.BlockID, b.Length, b.PackFile, optDeleted)
|
||||
mu.Lock()
|
||||
totalBytes += int64(b.Length)
|
||||
mu.Unlock()
|
||||
if *blockRewriteDryRun {
|
||||
continue
|
||||
}
|
||||
if err := rep.Blocks.RewriteBlock(ctx, b.BlockID); err != nil {
|
||||
log.Warn().Msgf("unable to rewrite block %q: %v", b.BlockID, err)
|
||||
mu.Lock()
|
||||
failedCount++
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
failedCount := 0
|
||||
for _, b := range blocks {
|
||||
var optDeleted string
|
||||
if b.Deleted {
|
||||
optDeleted = " (deleted)"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "Rewriting block %v (%v bytes) from pack %v%v\n", b.BlockID, b.Length, b.PackFile, optDeleted)
|
||||
totalBytes += int64(b.Length)
|
||||
if *blockRewriteDryRun {
|
||||
continue
|
||||
}
|
||||
if err := rep.Blocks.RewriteBlock(ctx, b.BlockID); err != nil {
|
||||
log.Warn().Msgf("unable to rewrite block %q: %v", b.BlockID, err)
|
||||
failedCount++
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Total bytes rewritten %v\n", totalBytes)
|
||||
|
||||
@@ -54,77 +87,67 @@ func runRewriteBlocksAction(ctx context.Context, rep *repo.Repository) error {
|
||||
return fmt.Errorf("failed to rewrite %v blocks", failedCount)
|
||||
}
|
||||
|
||||
func getBlocksToRewrite(ctx context.Context, rep *repo.Repository) ([]block.Info, error) {
|
||||
// get blocks listed on command line
|
||||
result, err := getBlockInfos(ctx, rep, *blockRewriteIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func getBlocksToRewrite(ctx context.Context, rep *repo.Repository) <-chan blockInfoOrError {
|
||||
ch := make(chan blockInfoOrError)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
// add all blocks from short packs
|
||||
if *blockRewriteShortPacks {
|
||||
threshold := uint32(rep.Blocks.Format.MaxPackSize * 6 / 10)
|
||||
info, err := getBlocksInShortPacks(ctx, rep, threshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// get blocks listed on command line
|
||||
findBlockInfos(ctx, rep, ch, *blockRewriteIDs)
|
||||
|
||||
// add all blocks from short packs
|
||||
if *blockRewriteShortPacks {
|
||||
threshold := uint32(rep.Blocks.Format.MaxPackSize * 6 / 10)
|
||||
findBlocksInShortPacks(ctx, rep, ch, threshold)
|
||||
}
|
||||
|
||||
result = append(result, info...)
|
||||
}
|
||||
|
||||
// add all blocks with given format version
|
||||
if *blockRewriteFormatVersion != -1 {
|
||||
info, err := getBlocksWithFormatVersion(ctx, rep, *blockRewriteFormatVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// add all blocks with given format version
|
||||
if *blockRewriteFormatVersion != -1 {
|
||||
findBlocksWithFormatVersion(ctx, rep, ch, *blockRewriteFormatVersion)
|
||||
}
|
||||
}()
|
||||
|
||||
result = append(result, info...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return ch
|
||||
}
|
||||
|
||||
func getBlockInfos(ctx context.Context, rep *repo.Repository, blockIDs []string) ([]block.Info, error) {
|
||||
var result []block.Info
|
||||
func findBlockInfos(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, blockIDs []string) {
|
||||
for _, blockID := range blockIDs {
|
||||
i, err := rep.Blocks.BlockInfo(ctx, blockID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get info for block %q: %v", blockID, err)
|
||||
ch <- blockInfoOrError{err: fmt.Errorf("unable to get info for block %q: %v", blockID, err)}
|
||||
} else {
|
||||
ch <- blockInfoOrError{Info: i}
|
||||
}
|
||||
result = append(result, i)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func getBlocksWithFormatVersion(ctx context.Context, rep *repo.Repository, version int) ([]block.Info, error) {
|
||||
var result []block.Info
|
||||
|
||||
func findBlocksWithFormatVersion(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, version int) {
|
||||
infos, err := rep.Blocks.ListBlockInfos("", true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to list index blocks: %v", err)
|
||||
ch <- blockInfoOrError{err: fmt.Errorf("unable to list index blocks: %v", err)}
|
||||
return
|
||||
}
|
||||
|
||||
for _, b := range infos {
|
||||
if int(b.FormatVersion) == *blockRewriteFormatVersion && strings.HasPrefix(b.PackFile, *blockRewritePackPrefix) {
|
||||
result = append(result, b)
|
||||
ch <- blockInfoOrError{Info: b}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func getBlocksInShortPacks(ctx context.Context, rep *repo.Repository, threshold uint32) ([]block.Info, error) {
|
||||
var result []block.Info
|
||||
|
||||
func findBlocksInShortPacks(ctx context.Context, rep *repo.Repository, ch chan blockInfoOrError, threshold uint32) {
|
||||
log.Printf("listing blocks...")
|
||||
infos, err := rep.Blocks.ListBlockInfos("", true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to list index blocks: %v", err)
|
||||
ch <- blockInfoOrError{err: fmt.Errorf("unable to list index blocks: %v", err)}
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("finding blocks in short packs...")
|
||||
shortPackBlocks, err := findShortPackBlocks(infos, threshold)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to find short pack blocks: %v", err)
|
||||
ch <- blockInfoOrError{err: fmt.Errorf("unable to find short pack blocks: %v", err)}
|
||||
return
|
||||
}
|
||||
log.Printf("found %v short pack blocks", len(shortPackBlocks))
|
||||
|
||||
@@ -133,11 +156,10 @@ func getBlocksInShortPacks(ctx context.Context, rep *repo.Repository, threshold
|
||||
} else {
|
||||
for _, b := range infos {
|
||||
if shortPackBlocks[b.PackFile] && strings.HasPrefix(b.PackFile, *blockRewritePackPrefix) {
|
||||
result = append(result, b)
|
||||
ch <- blockInfoOrError{Info: b}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func findShortPackBlocks(infos []block.Info, threshold uint32) (map[string]bool, error) {
|
||||
|
||||
Reference in New Issue
Block a user