From fe7a418a5b2371a9e4de724f38fdb187bdd60984 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 4 Apr 2024 18:47:11 -0700 Subject: [PATCH] feat(cli): added decompression benchmark (#3773) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(cli): added decompression benchmark * Update cli/command_benchmark_compression.go Co-authored-by: Julio López <1953782+julio-lopez@users.noreply.github.com> * fixed log output * deduped code --------- Co-authored-by: Julio López <1953782+julio-lopez@users.noreply.github.com> --- cli/command_benchmark.go | 41 ++++++-- cli/command_benchmark_compression.go | 151 ++++++++++++++++++++++++--- cli/command_benchmark_crypto.go | 2 +- cli/command_benchmark_ecc.go | 4 +- cli/command_benchmark_encryption.go | 2 +- cli/command_benchmark_hashing.go | 2 +- cli/command_benchmark_splitters.go | 2 +- 7 files changed, 180 insertions(+), 24 deletions(-) diff --git a/cli/command_benchmark.go b/cli/command_benchmark.go index 301906417..eb991d717 100644 --- a/cli/command_benchmark.go +++ b/cli/command_benchmark.go @@ -1,6 +1,7 @@ package cli import ( + "bytes" "sync" ) @@ -30,30 +31,58 @@ type cryptoBenchResult struct { throughput float64 } -func runInParallelNoResult(parallel int, run func()) { - runInParallel(parallel, func() any { +func runInParallelNoInputNoResult(n int, run func()) { + dummyArgs := make([]int, n) + + runInParallelNoResult(dummyArgs, func(_ int) { run() + }) +} + +func runInParallelNoInput[T any](n int, run func() T) T { + dummyArgs := make([]int, n) + + return runInParallel(dummyArgs, func(_ int) T { + return run() + }) +} + +func runInParallelNoResult[A any](args []A, run func(arg A)) { + runInParallel(args, func(arg A) any { + run(arg) return nil }) } -func runInParallel[T any](parallel int, run func() T) T { +func runInParallel[A any, T any](args []A, run func(arg A) T) T { var wg sync.WaitGroup - for i := 0; i < parallel-1; i++ { + for i := 0; i < len(args)-1; i++ { wg.Add(1) + arg := args[i] + go func() { defer wg.Done() - run() + run(arg) }() } // run one on the main goroutine and N-1 in parallel. - v := run() + v := run(args[len(args)-1]) wg.Wait() return v } + +func makeOutputBuffers(n, capacity int) []*bytes.Buffer { + var res []*bytes.Buffer + + for i := 0; i < n; i++ { + res = append(res, bytes.NewBuffer(make([]byte, 0, capacity))) + } + + return res +} diff --git a/cli/command_benchmark_compression.go b/cli/command_benchmark_compression.go index 5d2894d93..b6e12ac43 100644 --- a/cli/command_benchmark_compression.go +++ b/cli/command_benchmark_compression.go @@ -8,9 +8,11 @@ "os" "runtime" "sort" + "strings" "github.com/pkg/errors" + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo/compression" @@ -27,6 +29,8 @@ type commandBenchmarkCompression struct { optionPrint bool parallel int deprecated bool + operations string + algorithms string out textOutput } @@ -38,9 +42,11 @@ func (c *commandBenchmarkCompression) setup(svc appServices, parent commandParen cmd.Flag("by-size", "Sort results by size").BoolVar(&c.bySize) cmd.Flag("by-alloc", "Sort results by allocated bytes").BoolVar(&c.byAllocated) cmd.Flag("parallel", "Number of parallel goroutines").Default("1").IntVar(&c.parallel) + cmd.Flag("operations", "Operations").Default("both").EnumVar(&c.operations, "compress", "decompress", "both") cmd.Flag("verify-stable", "Verify that compression is stable").BoolVar(&c.verifyStable) cmd.Flag("print-options", "Print out options usable for repository creation").BoolVar(&c.optionPrint) cmd.Flag("deprecated", "Included deprecated compression algorithms").BoolVar(&c.deprecated) + cmd.Flag("algorithms", "Comma-separated list of algorithms to benchmark").StringVar(&c.algorithms) cmd.Action(svc.noRepositoryAction(c.run)) c.out.setup(svc) } @@ -62,7 +68,7 @@ func (c *commandBenchmarkCompression) readInputFile(ctx context.Context) ([]byte if dataLength > defaultCompressedDataByMethod { dataLength = defaultCompressedDataByMethod - log(ctx).Infof("NOTICE: The provided input file is too big, using first %v.", units.BytesString(dataLength)) + log(ctx).Infof("NOTICE: The provided input file is too big, using first %v.", units.BytesStringBase2(dataLength)) } data := make([]byte, dataLength) @@ -82,8 +88,26 @@ type compressionBechmarkResult struct { allocBytes int64 } +func (c *commandBenchmarkCompression) shouldIncludeAlgorithm(name compression.Name) bool { + if c.algorithms == "" { + if compression.IsDeprecated[name] && !c.deprecated { + return false + } + + return true + } + + for _, a := range strings.Split(c.algorithms, ",") { + if strings.HasPrefix(string(name), a) { + return true + } + } + + return false +} + func (c *commandBenchmarkCompression) run(ctx context.Context) error { - var results []compressionBechmarkResult + var benchmarkCompression, benchmarkDecompression bool data, err := c.readInputFile(ctx) if err != nil { @@ -94,8 +118,6 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error { return errors.Errorf("empty data file") } - log(ctx).Infof("Compressing input file %q (%v) using all compression methods.", c.dataFile, units.BytesString(int64(len(data)))) - repeatCount := c.repeat if repeatCount == 0 { @@ -106,13 +128,49 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error { } } - log(ctx).Infof("Repeating %v times per compression method (total %v). Override with --repeat=N.", repeatCount, units.BytesString(int64(repeatCount*len(data)))) + algorithms := map[compression.Name]compression.Compressor{} for name, comp := range compression.ByName { - if compression.IsDeprecated[name] && !c.deprecated { - continue + if c.shouldIncludeAlgorithm(name) { + algorithms[name] = comp } + } + log(ctx).Infof("Will repeat each benchmark %v times per compression method (total %v). Override with --repeat=N.", repeatCount, units.BytesString(int64(repeatCount*len(data)))) + + switch c.operations { + case "compress": + benchmarkCompression = true + benchmarkDecompression = false + case "decompress": + benchmarkCompression = false + benchmarkDecompression = true + default: + benchmarkCompression = true + benchmarkDecompression = true + } + + if benchmarkCompression { + if err := c.runCompression(ctx, data, repeatCount, algorithms); err != nil { + return err + } + } + + if benchmarkDecompression { + if err := c.runDecompression(ctx, data, repeatCount, algorithms); err != nil { + return err + } + } + + return nil +} + +func (c *commandBenchmarkCompression) runCompression(ctx context.Context, data []byte, repeatCount int, algorithms map[compression.Name]compression.Compressor) error { + var results []compressionBechmarkResult + + log(ctx).Infof("Compressing input file %q (%v) using %v compression methods.", c.dataFile, units.BytesString(int64(len(data))), len(algorithms)) + + for name, comp := range algorithms { log(ctx).Infof("Benchmarking compressor '%v'...", name) cnt := repeatCount @@ -121,11 +179,10 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error { var startMS, endMS runtime.MemStats - run := func() int64 { + run := func(compressed *bytes.Buffer) int64 { var ( compressedSize int64 lastHash uint64 - compressed bytes.Buffer input = bytes.NewReader(nil) ) @@ -133,7 +190,7 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error { compressed.Reset() input.Reset(data) - if err := comp.Compress(&compressed, input); err != nil { + if err := comp.Compress(compressed, input); err != nil { log(ctx).Errorf("compression %q failed: %v", name, err) continue } @@ -155,11 +212,81 @@ func (c *commandBenchmarkCompression) run(ctx context.Context) error { return compressedSize } + outputBuffers := makeOutputBuffers(c.parallel, defaultCompressedDataByMethod) + tt := timetrack.Start() runtime.ReadMemStats(&startMS) - compressedSize := runInParallel(c.parallel, run) + compressedSize := runInParallel(outputBuffers, run) + + runtime.ReadMemStats(&endMS) + + _, perSecond := tt.Completed(float64(c.parallel) * float64(len(data)) * float64(cnt)) + + results = append(results, + compressionBechmarkResult{ + compression: name, + throughput: perSecond, + compressedSize: compressedSize, + allocations: int64(endMS.Mallocs - startMS.Mallocs), + allocBytes: int64(endMS.TotalAlloc - startMS.TotalAlloc), + }) + } + + c.sortResults(results) + c.printResults(results) + + return nil +} + +func (c *commandBenchmarkCompression) runDecompression(ctx context.Context, data []byte, repeatCount int, algorithms map[compression.Name]compression.Compressor) error { + var results []compressionBechmarkResult + + log(ctx).Infof("Decompressing input file %q (%v) using %v compression methods.", c.dataFile, units.BytesString(int64(len(data))), len(algorithms)) + + var compressedInput gather.WriteBuffer + defer compressedInput.Close() + + for name, comp := range algorithms { + compressedInput.Reset() + + if err := comp.Compress(&compressedInput, bytes.NewReader(data)); err != nil { + return errors.Wrapf(err, "unable to compress data using %v", name) + } + + compressedInputBytes := compressedInput.ToByteSlice() + + log(ctx).Infof("Benchmarking decompressor '%v'...", name) + + cnt := repeatCount + + runtime.GC() + + var startMS, endMS runtime.MemStats + + run := func(decompressed *bytes.Buffer) int64 { + input := bytes.NewReader(nil) + + for i := 0; i < cnt; i++ { + decompressed.Reset() + input.Reset(compressedInputBytes) + + if err := comp.Decompress(decompressed, input, true); err != nil { + log(ctx).Errorf("decompression %q failed: %v", name, err) + } + } + + return int64(compressedInput.Length()) + } + + outputBuffers := makeOutputBuffers(c.parallel, defaultCompressedDataByMethod) + + tt := timetrack.Start() + + runtime.ReadMemStats(&startMS) + + compressedSize := runInParallel(outputBuffers, run) runtime.ReadMemStats(&endMS) @@ -199,7 +326,7 @@ func (c *commandBenchmarkCompression) sortResults(results []compressionBechmarkR } func (c *commandBenchmarkCompression) printResults(results []compressionBechmarkResult) { - c.out.printStdout(" %-26v %-12v %-12v %v\n", "Compression", "Compressed", "Throughput", "Allocs Usage") + c.out.printStdout(" %-26v %-12v %-12v %v\n", "Compression", "Compressed", "Throughput", "Allocs Memory Usage") c.out.printStdout("------------------------------------------------------------------------------------------------\n") for ndx, r := range results { diff --git a/cli/command_benchmark_crypto.go b/cli/command_benchmark_crypto.go index 6ad1861a2..4efd169da 100644 --- a/cli/command_benchmark_crypto.go +++ b/cli/command_benchmark_crypto.go @@ -91,7 +91,7 @@ func (c *commandBenchmarkCrypto) runBenchmark(ctx context.Context) []cryptoBench hashCount := c.repeat - runInParallelNoResult(c.parallel, func() { + runInParallelNoInputNoResult(c.parallel, func() { var hashOutput [hashing.MaxHashSize]byte var encryptOutput gather.WriteBuffer diff --git a/cli/command_benchmark_ecc.go b/cli/command_benchmark_ecc.go index b9ef7b7ff..0daf4a853 100644 --- a/cli/command_benchmark_ecc.go +++ b/cli/command_benchmark_ecc.go @@ -95,7 +95,7 @@ func (c *commandBenchmarkEcc) runBenchmark(ctx context.Context) []eccBenchResult repeat := c.repeat - runInParallelNoResult(c.parallel, func() { + runInParallelNoInputNoResult(c.parallel, func() { var tmp gather.WriteBuffer defer tmp.Close() @@ -121,7 +121,7 @@ func (c *commandBenchmarkEcc) runBenchmark(ctx context.Context) []eccBenchResult input = encodedBuffer.Bytes() tt = timetrack.Start() - runInParallelNoResult(c.parallel, func() { + runInParallelNoInputNoResult(c.parallel, func() { var tmp gather.WriteBuffer defer tmp.Close() diff --git a/cli/command_benchmark_encryption.go b/cli/command_benchmark_encryption.go index 4b870d45a..faac73fb2 100644 --- a/cli/command_benchmark_encryption.go +++ b/cli/command_benchmark_encryption.go @@ -83,7 +83,7 @@ func (c *commandBenchmarkEncryption) runBenchmark(ctx context.Context) []cryptoB hashCount := c.repeat - runInParallelNoResult(c.parallel, func() { + runInParallelNoInputNoResult(c.parallel, func() { var hashOutput [hashing.MaxHashSize]byte var encryptOutput gather.WriteBuffer diff --git a/cli/command_benchmark_hashing.go b/cli/command_benchmark_hashing.go index 1c071f668..ff0acf8ab 100644 --- a/cli/command_benchmark_hashing.go +++ b/cli/command_benchmark_hashing.go @@ -78,7 +78,7 @@ func (c *commandBenchmarkHashing) runBenchmark(ctx context.Context) []cryptoBenc hashCount := c.repeat - runInParallelNoResult(c.parallel, func() { + runInParallelNoInputNoResult(c.parallel, func() { var hashOutput [hashing.MaxHashSize]byte for i := 0; i < hashCount; i++ { diff --git a/cli/command_benchmark_splitters.go b/cli/command_benchmark_splitters.go index 99b505a83..b5b9abcf7 100644 --- a/cli/command_benchmark_splitters.go +++ b/cli/command_benchmark_splitters.go @@ -80,7 +80,7 @@ type benchResult struct { for _, sp := range splitter.SupportedAlgorithms() { tt := timetrack.Start() - segmentLengths := runInParallel(c.parallel, func() []int { + segmentLengths := runInParallelNoInput(c.parallel, func() []int { fact := splitter.GetFactory(sp) var segmentLengths []int