mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 14:44:47 -04:00
feat(cli): added decompression benchmark (#3773)
* feat(cli): added decompression benchmark
* Update cli/command_benchmark_compression.go
  Co-authored-by: Julio López <1953782+julio-lopez@users.noreply.github.com>
* fixed log output
* deduped code
---------
Co-authored-by: Julio López <1953782+julio-lopez@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@@ -30,30 +31,58 @@ type cryptoBenchResult struct {
|
||||
throughput float64
|
||||
}
|
||||
|
||||
func runInParallelNoResult(parallel int, run func()) {
|
||||
runInParallel(parallel, func() any {
|
||||
func runInParallelNoInputNoResult(n int, run func()) {
|
||||
dummyArgs := make([]int, n)
|
||||
|
||||
runInParallelNoResult(dummyArgs, func(_ int) {
|
||||
run()
|
||||
})
|
||||
}
|
||||
|
||||
func runInParallelNoInput[T any](n int, run func() T) T {
|
||||
dummyArgs := make([]int, n)
|
||||
|
||||
return runInParallel(dummyArgs, func(_ int) T {
|
||||
return run()
|
||||
})
|
||||
}
|
||||
|
||||
func runInParallelNoResult[A any](args []A, run func(arg A)) {
|
||||
runInParallel(args, func(arg A) any {
|
||||
run(arg)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// runInParallel invokes run once for every element of args and returns the
// value produced for the last element.
//
// The calls for the first len(args)-1 elements are started on their own
// goroutines and their results are discarded; the call for the last element
// is made on the calling goroutine. The function returns only after every
// call has finished.
//
// When args is empty there is nothing to run: run is never invoked and the
// zero value of T is returned (indexing the last element would otherwise
// panic).
func runInParallel[A any, T any](args []A, run func(arg A) T) T {
	if len(args) == 0 {
		var zero T

		return zero
	}

	last := len(args) - 1

	var wg sync.WaitGroup

	for _, arg := range args[:last] {
		wg.Add(1)

		// pass arg explicitly so each goroutine owns its own copy regardless
		// of the loop-variable semantics of the Go version in use.
		go func(arg A) {
			defer wg.Done()

			run(arg)
		}(arg)
	}

	// run one on the main goroutine and N-1 in parallel.
	v := run(args[last])

	wg.Wait()

	return v
}
|
||||
|
||||
// makeOutputBuffers returns n separate, empty buffers, each created over its
// own freshly allocated byte slice with the given capacity.
//
// A nil slice is returned when n is zero or negative. The result slice is
// sized up front because the number of buffers is known.
func makeOutputBuffers(n, capacity int) []*bytes.Buffer {
	if n <= 0 {
		return nil
	}

	res := make([]*bytes.Buffer, 0, n)

	for i := 0; i < n; i++ {
		res = append(res, bytes.NewBuffer(make([]byte, 0, capacity)))
	}

	return res
}
|
||||
|
||||
@@ -8,9 +8,11 @@
|
||||
"os"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/timetrack"
|
||||
"github.com/kopia/kopia/internal/units"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
@@ -27,6 +29,8 @@ type commandBenchmarkCompression struct {
|
||||
optionPrint bool
|
||||
parallel int
|
||||
deprecated bool
|
||||
operations string
|
||||
algorithms string
|
||||
|
||||
out textOutput
|
||||
}
|
||||
@@ -38,9 +42,11 @@ func (c *commandBenchmarkCompression) setup(svc appServices, parent commandParen
|
||||
cmd.Flag("by-size", "Sort results by size").BoolVar(&c.bySize)
|
||||
cmd.Flag("by-alloc", "Sort results by allocated bytes").BoolVar(&c.byAllocated)
|
||||
cmd.Flag("parallel", "Number of parallel goroutines").Default("1").IntVar(&c.parallel)
|
||||
cmd.Flag("operations", "Operations").Default("both").EnumVar(&c.operations, "compress", "decompress", "both")
|
||||
cmd.Flag("verify-stable", "Verify that compression is stable").BoolVar(&c.verifyStable)
|
||||
cmd.Flag("print-options", "Print out options usable for repository creation").BoolVar(&c.optionPrint)
|
||||
cmd.Flag("deprecated", "Included deprecated compression algorithms").BoolVar(&c.deprecated)
|
||||
cmd.Flag("algorithms", "Comma-separated list of algorithms to benchmark").StringVar(&c.algorithms)
|
||||
cmd.Action(svc.noRepositoryAction(c.run))
|
||||
c.out.setup(svc)
|
||||
}
|
||||
@@ -62,7 +68,7 @@ func (c *commandBenchmarkCompression) readInputFile(ctx context.Context) ([]byte
|
||||
if dataLength > defaultCompressedDataByMethod {
|
||||
dataLength = defaultCompressedDataByMethod
|
||||
|
||||
log(ctx).Infof("NOTICE: The provided input file is too big, using first %v.", units.BytesString(dataLength))
|
||||
log(ctx).Infof("NOTICE: The provided input file is too big, using first %v.", units.BytesStringBase2(dataLength))
|
||||
}
|
||||
|
||||
data := make([]byte, dataLength)
|
||||
@@ -82,8 +88,26 @@ type compressionBechmarkResult struct {
|
||||
allocBytes int64
|
||||
}
|
||||
|
||||
func (c *commandBenchmarkCompression) shouldIncludeAlgorithm(name compression.Name) bool {
|
||||
if c.algorithms == "" {
|
||||
if compression.IsDeprecated[name] && !c.deprecated {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
for _, a := range strings.Split(c.algorithms, ",") {
|
||||
if strings.HasPrefix(string(name), a) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
var results []compressionBechmarkResult
|
||||
var benchmarkCompression, benchmarkDecompression bool
|
||||
|
||||
data, err := c.readInputFile(ctx)
|
||||
if err != nil {
|
||||
@@ -94,8 +118,6 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
return errors.Errorf("empty data file")
|
||||
}
|
||||
|
||||
log(ctx).Infof("Compressing input file %q (%v) using all compression methods.", c.dataFile, units.BytesString(int64(len(data))))
|
||||
|
||||
repeatCount := c.repeat
|
||||
|
||||
if repeatCount == 0 {
|
||||
@@ -106,13 +128,49 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
log(ctx).Infof("Repeating %v times per compression method (total %v). Override with --repeat=N.", repeatCount, units.BytesString(int64(repeatCount*len(data))))
|
||||
algorithms := map[compression.Name]compression.Compressor{}
|
||||
|
||||
for name, comp := range compression.ByName {
|
||||
if compression.IsDeprecated[name] && !c.deprecated {
|
||||
continue
|
||||
if c.shouldIncludeAlgorithm(name) {
|
||||
algorithms[name] = comp
|
||||
}
|
||||
}
|
||||
|
||||
log(ctx).Infof("Will repeat each benchmark %v times per compression method (total %v). Override with --repeat=N.", repeatCount, units.BytesString(int64(repeatCount*len(data))))
|
||||
|
||||
switch c.operations {
|
||||
case "compress":
|
||||
benchmarkCompression = true
|
||||
benchmarkDecompression = false
|
||||
case "decompress":
|
||||
benchmarkCompression = false
|
||||
benchmarkDecompression = true
|
||||
default:
|
||||
benchmarkCompression = true
|
||||
benchmarkDecompression = true
|
||||
}
|
||||
|
||||
if benchmarkCompression {
|
||||
if err := c.runCompression(ctx, data, repeatCount, algorithms); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if benchmarkDecompression {
|
||||
if err := c.runDecompression(ctx, data, repeatCount, algorithms); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandBenchmarkCompression) runCompression(ctx context.Context, data []byte, repeatCount int, algorithms map[compression.Name]compression.Compressor) error {
|
||||
var results []compressionBechmarkResult
|
||||
|
||||
log(ctx).Infof("Compressing input file %q (%v) using %v compression methods.", c.dataFile, units.BytesString(int64(len(data))), len(algorithms))
|
||||
|
||||
for name, comp := range algorithms {
|
||||
log(ctx).Infof("Benchmarking compressor '%v'...", name)
|
||||
|
||||
cnt := repeatCount
|
||||
@@ -121,11 +179,10 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
|
||||
var startMS, endMS runtime.MemStats
|
||||
|
||||
run := func() int64 {
|
||||
run := func(compressed *bytes.Buffer) int64 {
|
||||
var (
|
||||
compressedSize int64
|
||||
lastHash uint64
|
||||
compressed bytes.Buffer
|
||||
input = bytes.NewReader(nil)
|
||||
)
|
||||
|
||||
@@ -133,7 +190,7 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
compressed.Reset()
|
||||
input.Reset(data)
|
||||
|
||||
if err := comp.Compress(&compressed, input); err != nil {
|
||||
if err := comp.Compress(compressed, input); err != nil {
|
||||
log(ctx).Errorf("compression %q failed: %v", name, err)
|
||||
continue
|
||||
}
|
||||
@@ -155,11 +212,81 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error {
|
||||
return compressedSize
|
||||
}
|
||||
|
||||
outputBuffers := makeOutputBuffers(c.parallel, defaultCompressedDataByMethod)
|
||||
|
||||
tt := timetrack.Start()
|
||||
|
||||
runtime.ReadMemStats(&startMS)
|
||||
|
||||
compressedSize := runInParallel(c.parallel, run)
|
||||
compressedSize := runInParallel(outputBuffers, run)
|
||||
|
||||
runtime.ReadMemStats(&endMS)
|
||||
|
||||
_, perSecond := tt.Completed(float64(c.parallel) * float64(len(data)) * float64(cnt))
|
||||
|
||||
results = append(results,
|
||||
compressionBechmarkResult{
|
||||
compression: name,
|
||||
throughput: perSecond,
|
||||
compressedSize: compressedSize,
|
||||
allocations: int64(endMS.Mallocs - startMS.Mallocs),
|
||||
allocBytes: int64(endMS.TotalAlloc - startMS.TotalAlloc),
|
||||
})
|
||||
}
|
||||
|
||||
c.sortResults(results)
|
||||
c.printResults(results)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandBenchmarkCompression) runDecompression(ctx context.Context, data []byte, repeatCount int, algorithms map[compression.Name]compression.Compressor) error {
|
||||
var results []compressionBechmarkResult
|
||||
|
||||
log(ctx).Infof("Decompressing input file %q (%v) using %v compression methods.", c.dataFile, units.BytesString(int64(len(data))), len(algorithms))
|
||||
|
||||
var compressedInput gather.WriteBuffer
|
||||
defer compressedInput.Close()
|
||||
|
||||
for name, comp := range algorithms {
|
||||
compressedInput.Reset()
|
||||
|
||||
if err := comp.Compress(&compressedInput, bytes.NewReader(data)); err != nil {
|
||||
return errors.Wrapf(err, "unable to compress data using %v", name)
|
||||
}
|
||||
|
||||
compressedInputBytes := compressedInput.ToByteSlice()
|
||||
|
||||
log(ctx).Infof("Benchmarking decompressor '%v'...", name)
|
||||
|
||||
cnt := repeatCount
|
||||
|
||||
runtime.GC()
|
||||
|
||||
var startMS, endMS runtime.MemStats
|
||||
|
||||
run := func(decompressed *bytes.Buffer) int64 {
|
||||
input := bytes.NewReader(nil)
|
||||
|
||||
for i := 0; i < cnt; i++ {
|
||||
decompressed.Reset()
|
||||
input.Reset(compressedInputBytes)
|
||||
|
||||
if err := comp.Decompress(decompressed, input, true); err != nil {
|
||||
log(ctx).Errorf("decompression %q failed: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return int64(compressedInput.Length())
|
||||
}
|
||||
|
||||
outputBuffers := makeOutputBuffers(c.parallel, defaultCompressedDataByMethod)
|
||||
|
||||
tt := timetrack.Start()
|
||||
|
||||
runtime.ReadMemStats(&startMS)
|
||||
|
||||
compressedSize := runInParallel(outputBuffers, run)
|
||||
|
||||
runtime.ReadMemStats(&endMS)
|
||||
|
||||
@@ -199,7 +326,7 @@ func (c *commandBenchmarkCompression) sortResults(results []compressionBechmarkR
|
||||
}
|
||||
|
||||
func (c *commandBenchmarkCompression) printResults(results []compressionBechmarkResult) {
|
||||
c.out.printStdout(" %-26v %-12v %-12v %v\n", "Compression", "Compressed", "Throughput", "Allocs Usage")
|
||||
c.out.printStdout(" %-26v %-12v %-12v %v\n", "Compression", "Compressed", "Throughput", "Allocs Memory Usage")
|
||||
c.out.printStdout("------------------------------------------------------------------------------------------------\n")
|
||||
|
||||
for ndx, r := range results {
|
||||
|
||||
@@ -91,7 +91,7 @@ func (c *commandBenchmarkCrypto) runBenchmark(ctx context.Context) []cryptoBench
|
||||
|
||||
hashCount := c.repeat
|
||||
|
||||
runInParallelNoResult(c.parallel, func() {
|
||||
runInParallelNoInputNoResult(c.parallel, func() {
|
||||
var hashOutput [hashing.MaxHashSize]byte
|
||||
|
||||
var encryptOutput gather.WriteBuffer
|
||||
|
||||
@@ -95,7 +95,7 @@ func (c *commandBenchmarkEcc) runBenchmark(ctx context.Context) []eccBenchResult
|
||||
|
||||
repeat := c.repeat
|
||||
|
||||
runInParallelNoResult(c.parallel, func() {
|
||||
runInParallelNoInputNoResult(c.parallel, func() {
|
||||
var tmp gather.WriteBuffer
|
||||
defer tmp.Close()
|
||||
|
||||
@@ -121,7 +121,7 @@ func (c *commandBenchmarkEcc) runBenchmark(ctx context.Context) []eccBenchResult
|
||||
input = encodedBuffer.Bytes()
|
||||
tt = timetrack.Start()
|
||||
|
||||
runInParallelNoResult(c.parallel, func() {
|
||||
runInParallelNoInputNoResult(c.parallel, func() {
|
||||
var tmp gather.WriteBuffer
|
||||
defer tmp.Close()
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ func (c *commandBenchmarkEncryption) runBenchmark(ctx context.Context) []cryptoB
|
||||
|
||||
hashCount := c.repeat
|
||||
|
||||
runInParallelNoResult(c.parallel, func() {
|
||||
runInParallelNoInputNoResult(c.parallel, func() {
|
||||
var hashOutput [hashing.MaxHashSize]byte
|
||||
|
||||
var encryptOutput gather.WriteBuffer
|
||||
|
||||
@@ -78,7 +78,7 @@ func (c *commandBenchmarkHashing) runBenchmark(ctx context.Context) []cryptoBenc
|
||||
|
||||
hashCount := c.repeat
|
||||
|
||||
runInParallelNoResult(c.parallel, func() {
|
||||
runInParallelNoInputNoResult(c.parallel, func() {
|
||||
var hashOutput [hashing.MaxHashSize]byte
|
||||
|
||||
for i := 0; i < hashCount; i++ {
|
||||
|
||||
@@ -80,7 +80,7 @@ type benchResult struct {
|
||||
for _, sp := range splitter.SupportedAlgorithms() {
|
||||
tt := timetrack.Start()
|
||||
|
||||
segmentLengths := runInParallel(c.parallel, func() []int {
|
||||
segmentLengths := runInParallelNoInput(c.parallel, func() []int {
|
||||
fact := splitter.GetFactory(sp)
|
||||
|
||||
var segmentLengths []int
|
||||
|
||||
Reference in New Issue
Block a user