object: fixed compression bug where we were not clearing the buffer

This effectively defeated the purpose of compression and caused high
memory usage and other kinds of bad behavior.

Refactored the code to prevent this issue by resetting the buffer
at the caller, not the callee.

Fixed the previous e2e test to catch the issue mentioned in #166;
verified that it fails against master and passes with this change.
This commit is contained in:
Jarek Kowalski
2020-01-08 21:00:45 -08:00
parent de71f0f057
commit 0b8c4d0ef9
2 changed files with 34 additions and 20 deletions

View File

@@ -302,7 +302,7 @@ func TestEndToEndReadAndSeek(t *testing.T) {
ctx := context.Background()
_, om := setupTest(t)
for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} {
// Create some random data sample of the specified size.
randomData := make([]byte, size)
cryptorand.Read(randomData) //nolint:errcheck
@@ -328,20 +328,23 @@ func TestEndToEndReadAndSeek(t *testing.T) {
func TestEndToEndReadAndSeekWithCompression(t *testing.T) {
ctx := context.Background()
_, om := setupTest(t)
for compressorName := range compression.ByName {
for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
// Create some random data sample of the specified size.
randomData := make([]byte, size)
totalBytesWritten := 0
data, om := setupTest(t)
for _, size := range []int{1, 199, 200, 201, 9999, 512434, 5012434} {
// Create some compressible data sample of the specified size.
randomData := makeCompressibleData(size)
writer := om.NewWriter(ctx, WriterOptions{Compressor: compressorName})
if _, err := writer.Write(randomData); err != nil {
t.Errorf("write error: %v", err)
}
totalBytesWritten += size
objectID, err := writer.Result()
t.Logf("oid: %v", objectID)
writer.Close()
@@ -352,8 +355,25 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) {
verify(ctx, t, om, objectID, randomData, fmt.Sprintf("%v %v", objectID, size))
}
compressedBytes := 0
for _, d := range data {
compressedBytes += len(d)
}
// data is highly compressible, should easily compress to 1% of original size or less
ratio := float64(compressedBytes) / float64(totalBytesWritten)
if ratio > 0.01 {
t.Errorf("compression not effective for %v wrote %v, compressed %v, ratio %v", compressorName, totalBytesWritten, compressedBytes, ratio)
}
}
}
// makeCompressibleData returns exactly size bytes of highly repetitive data:
// a leading partial copy of the phrase "quick brown fox" followed by as many
// whole copies of the phrase as fit in the remaining length.
func makeCompressibleData(size int) []byte {
	phrase := []byte("quick brown fox")
	wholeCopies := size / len(phrase)
	leftover := size % len(phrase)

	// The partial phrase goes first so that the remainder of the output is
	// made up of whole phrase repetitions only.
	var out []byte
	out = append(out, phrase[:leftover]...)
	out = append(out, bytes.Repeat(phrase, wholeCopies)...)

	return out
}
func verify(ctx context.Context, t *testing.T, om *Manager, objectID ID, expectedData []byte, testCaseID string) {
t.Helper()

View File

@@ -98,11 +98,13 @@ func (w *objectWriter) flushBuffer() error {
w.indirectIndex[chunkID].Length = int64(length)
w.currentPosition += int64(length)
contentBytes, isCompressed, err := w.maybeCompressedContentBytes()
contentBytes, isCompressed, err := maybeCompressedContentBytes(w.compressor, w.buffer.Bytes())
if err != nil {
return errors.Wrap(err, "unable to prepare content bytes")
}
w.buffer.Reset()
contentID, err := w.repo.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, contentID, length)
@@ -121,27 +123,19 @@ func (w *objectWriter) flushBuffer() error {
return nil
}
func (w *objectWriter) maybeCompressedContentBytes() (data []byte, isCompressed bool, err error) {
if w.compressor != nil {
compressedBytes, err := w.compressor.Compress(w.buffer.Bytes())
func maybeCompressedContentBytes(comp compression.Compressor, b []byte) (data []byte, isCompressed bool, err error) {
if comp != nil {
compressedBytes, err := comp.Compress(b)
if err != nil {
return nil, false, errors.Wrap(err, "compression error")
}
if len(compressedBytes) < w.buffer.Len() {
if len(compressedBytes) < len(b) {
return compressedBytes, true, nil
}
}
var b2 bytes.Buffer
if _, err := w.buffer.WriteTo(&b2); err != nil {
return nil, false, err
}
w.buffer.Reset()
return b2.Bytes(), false, nil
return append([]byte{}, b...), false, nil
}
func (w *objectWriter) Result() (ID, error) {