From 0b8c4d0ef9c884bd9668e7aa2dc2b95b5024ff00 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Wed, 8 Jan 2020 21:00:45 -0800 Subject: [PATCH] object: fixed compression bug where we were not clearing the buffer this effectively defeated the purpose of compression, caused high memory usage and other kinds of bad behavior. refactored the code to prevent this issue by resetting the buffer at the caller not callee. fixed previous e2e test to catch the issue mentioned in #166, verified it fails against master and passes with this change. --- repo/object/object_manager_test.go | 32 ++++++++++++++++++++++++------ repo/object/object_writer.go | 22 ++++++++------------ 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index f2b0c74c1..08c4a913e 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -302,7 +302,7 @@ func TestEndToEndReadAndSeek(t *testing.T) { ctx := context.Background() _, om := setupTest(t) - for _, size := range []int{1, 199, 200, 201, 9999, 512434} { + for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} { // Create some random data sample of the specified size. randomData := make([]byte, size) cryptorand.Read(randomData) //nolint:errcheck @@ -328,20 +328,23 @@ func TestEndToEndReadAndSeek(t *testing.T) { func TestEndToEndReadAndSeekWithCompression(t *testing.T) { ctx := context.Background() - _, om := setupTest(t) for compressorName := range compression.ByName { - for _, size := range []int{1, 199, 200, 201, 9999, 512434} { - // Create some random data sample of the specified size. - randomData := make([]byte, size) + totalBytesWritten := 0 + data, om := setupTest(t) + + for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} { + // Create some compressible data sample of the specified size. + randomData := makeCompressibleData(size) writer := om.NewWriter(ctx, WriterOptions{Compressor: compressorName}) if _, err := writer.Write(randomData); err != nil { t.Errorf("write error: %v", err) } + totalBytesWritten += size + objectID, err := writer.Result() - t.Logf("oid: %v", objectID) writer.Close() @@ -352,8 +355,25 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) { verify(ctx, t, om, objectID, randomData, fmt.Sprintf("%v %v", objectID, size)) } + + compressedBytes := 0 + for _, d := range data { + compressedBytes += len(d) + } + + // data is highly compressible, should easily compress to 1% of original size or less + ratio := float64(compressedBytes) / float64(totalBytesWritten) + if ratio > 0.01 { + t.Errorf("compression not effective for %v wrote %v, compressed %v, ratio %v", compressorName, totalBytesWritten, compressedBytes, ratio) + } } } + +func makeCompressibleData(size int) []byte { + phrase := []byte("quick brown fox") + return append(append([]byte(nil), phrase[0:size%len(phrase)]...), bytes.Repeat(phrase, size/len(phrase))...) +} + func verify(ctx context.Context, t *testing.T, om *Manager, objectID ID, expectedData []byte, testCaseID string) { t.Helper() diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 62fed65ba..e3595feb9 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -98,11 +98,13 @@ func (w *objectWriter) flushBuffer() error { w.indirectIndex[chunkID].Length = int64(length) w.currentPosition += int64(length) - contentBytes, isCompressed, err := w.maybeCompressedContentBytes() + contentBytes, isCompressed, err := maybeCompressedContentBytes(w.compressor, w.buffer.Bytes()) if err != nil { return errors.Wrap(err, "unable to prepare content bytes") } + w.buffer.Reset() + contentID, err := w.repo.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix) w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, contentID, length) @@ -121,27 +123,19 @@ func (w *objectWriter) flushBuffer() error { return nil } -func (w *objectWriter) maybeCompressedContentBytes() (data []byte, isCompressed bool, err error) { - if w.compressor != nil { - compressedBytes, err := w.compressor.Compress(w.buffer.Bytes()) +func maybeCompressedContentBytes(comp compression.Compressor, b []byte) (data []byte, isCompressed bool, err error) { + if comp != nil { + compressedBytes, err := comp.Compress(b) if err != nil { return nil, false, errors.Wrap(err, "compression error") } - if len(compressedBytes) < w.buffer.Len() { + if len(compressedBytes) < len(b) { return compressedBytes, true, nil } } - var b2 bytes.Buffer - - if _, err := w.buffer.WriteTo(&b2); err != nil { - return nil, false, err - } - - w.buffer.Reset() - - return b2.Bytes(), false, nil + return append([]byte{}, b...), false, nil } func (w *objectWriter) Result() (ID, error) {