From aec3cdcb2f612c42ebd1aded552ee197596c18ee Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 7 Dec 2019 14:33:00 -0800 Subject: [PATCH] object: added support for compressed objects --- repo/object/object_manager.go | 28 ++++++++++++++++++-- repo/object/object_manager_test.go | 30 ++++++++++++++++++++- repo/object/object_writer.go | 42 +++++++++++++++++++++++++----- repo/object/objectid.go | 19 ++++++++++---- 4 files changed, 105 insertions(+), 14 deletions(-) diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index dc0104c77..8c9812608 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -4,6 +4,7 @@ import ( "bytes" "context" + "encoding/binary" "encoding/json" "io" @@ -52,6 +53,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { splitter: om.newSplitter(), description: opt.Description, prefix: opt.Prefix, + compressor: CompressorsByName[opt.Compressor], } } @@ -129,7 +131,7 @@ func (om *Manager) verifyObjectInternal(ctx context.Context, oid ID, tracker *co return om.verifyIndirectObjectInternal(ctx, indexObjectID, tracker) } - if contentID, ok := oid.ContentID(); ok { + if contentID, _, ok := oid.ContentID(); ok { if _, err := om.contentMgr.ContentInfo(ctx, contentID); err != nil { return err } @@ -207,7 +209,7 @@ func (om *Manager) flattenListChunk(rawReader io.Reader) ([]indirectObjectEntry, } func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength int64) (Reader, error) { - if contentID, ok := objectID.ContentID(); ok { + if contentID, compressed, ok := objectID.ContentID(); ok { payload, err := om.contentMgr.GetContent(ctx, contentID) if err == content.ErrContentNotFound { return nil, ErrObjectNotFound @@ -217,6 +219,13 @@ func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength i return nil, errors.Wrap(err, "unexpected content error") } + if compressed { + payload, err = om.decompress(payload) + if err != nil { + return nil, errors.Wrap(err, "decompression error") + } + } + if assertLength != -1 && int64(len(payload)) != assertLength { return nil, errors.Wrapf(err, "unexpected chunk length %v, expected %v", len(payload), assertLength) } @@ -227,6 +236,21 @@ func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength i return nil, errors.Errorf("unsupported object ID: %v", objectID) } +func (om *Manager) decompress(b []byte) ([]byte, error) { + if len(b) < 4 { + return nil, errors.Errorf("invalid compression header") + } + + compressorID := binary.BigEndian.Uint32(b[0:4]) + + compressor := Compressors[compressorID] + if compressor == nil { + return nil, errors.Errorf("unsupported compressor %x", compressorID) + } + + return compressor.Decompress(b) +} + type readerWithData struct { io.ReadSeeker length int64 diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index a2e85def9..a12b26972 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -110,7 +110,7 @@ func TestWriters(t *testing.T) { t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String()) } - if _, ok := c.objectID.ContentID(); !ok { + if _, _, ok := c.objectID.ContentID(); !ok { if len(data) != 0 { t.Errorf("unexpected data written to the storage: %v", data) } @@ -325,6 +325,34 @@ func TestEndToEndReadAndSeek(t *testing.T) { } } +func TestEndToEndReadAndSeekWithCompression(t *testing.T) { + ctx := context.Background() + _, om := setupTest(t) + + for compressorName := range CompressorsByName { + for _, size := range []int{1, 199, 200, 201, 9999, 512434} { + // Create some random data sample of the specified size. + randomData := make([]byte, size) + + writer := om.NewWriter(ctx, WriterOptions{Compressor: compressorName}) + if _, err := writer.Write(randomData); err != nil { + t.Errorf("write error: %v", err) + } + + objectID, err := writer.Result() + t.Logf("oid: %v", objectID) + + writer.Close() + + if err != nil { + t.Errorf("cannot get writer result for %v: %v", size, err) + continue + } + + verify(ctx, t, om, objectID, randomData, fmt.Sprintf("%v %v", objectID, size)) + } + } +} func verify(ctx context.Context, t *testing.T, om *Manager, objectID ID, expectedData []byte, testCaseID string) { t.Helper() diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index dbeb83f69..68c1c207b 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -52,6 +52,8 @@ type objectWriter struct { ctx context.Context repo *Manager + compressor Compressor + prefix content.ID buffer bytes.Buffer totalLength int64 @@ -93,23 +95,49 @@ func (w *objectWriter) flushBuffer() error { w.indirectIndex[chunkID].Length = int64(length) w.currentPosition += int64(length) - var b2 bytes.Buffer + contentBytes, isCompressed, err := w.maybeCompressedContentBytes() + if err != nil { + return errors.Wrap(err, "unable to prepare content bytes") + } - w.buffer.WriteTo(&b2) //nolint:errcheck - w.buffer.Reset() - - contentID, err := w.repo.contentMgr.WriteContent(w.ctx, b2.Bytes(), w.prefix) + contentID, err := w.repo.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix) w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, contentID, length) if err != nil { return errors.Wrapf(err, "error when flushing chunk %d of %s", chunkID, w.description) } - w.indirectIndex[chunkID].Object = DirectObjectID(contentID) + oid := DirectObjectID(contentID) + + if isCompressed { + oid = Compressed(oid) + } + + w.indirectIndex[chunkID].Object = oid return nil } +func (w *objectWriter) maybeCompressedContentBytes() (data []byte, isCompressed bool, err error) { + if w.compressor != nil { + compressedBytes, err := w.compressor.Compress(w.buffer.Bytes()) + if err != nil { + return nil, false, errors.Wrap(err, "compression error") + } + + if len(compressedBytes) < w.buffer.Len() { + return compressedBytes, true, nil + } + } + + var b2 bytes.Buffer + + w.buffer.WriteTo(&b2) //nolint:errcheck + w.buffer.Reset() + + return b2.Bytes(), false, nil +} + func (w *objectWriter) Result() (ID, error) { if w.buffer.Len() > 0 || len(w.indirectIndex) == 0 { if err := w.flushBuffer(); err != nil { @@ -124,6 +152,7 @@ func (w *objectWriter) Result() (ID, error) { iw := &objectWriter{ ctx: w.ctx, repo: w.repo, + compressor: nil, description: "LIST(" + w.description + ")", splitter: w.repo.newSplitter(), prefix: w.prefix, @@ -150,4 +179,5 @@ func (w *objectWriter) Result() (ID, error) { type WriterOptions struct { Description string Prefix content.ID // empty string or a single-character ('g'..'z') + Compressor CompressorName } diff --git a/repo/object/objectid.go b/repo/object/objectid.go index 4d487c7d2..b9d7bf897 100644 --- a/repo/object/objectid.go +++ b/repo/object/objectid.go @@ -36,16 +36,20 @@ func (i ID) IndexObjectID() (ID, bool) { } // ContentID returns the ID of the underlying content. -func (i ID) ContentID() (content.ID, bool) { +func (i ID) ContentID() (id content.ID, compressed, ok bool) { if strings.HasPrefix(string(i), "D") { - return content.ID(i[1:]), true + return content.ID(i[1:]), false, true } if strings.HasPrefix(string(i), "I") { - return "", false + return "", false, false } - return content.ID(i), true + if strings.HasPrefix(string(i), "Z") { + return content.ID(i[1:]), true, true + } + + return content.ID(i), false, true } // Validate checks the ID format for validity and reports any errors. @@ -58,7 +62,7 @@ func (i ID) Validate() error { return nil } - if contentID, ok := i.ContentID(); ok { + if contentID, _, ok := i.ContentID(); ok { if len(contentID) < 2 { return errors.Errorf("missing content ID") } @@ -87,6 +91,11 @@ func DirectObjectID(contentID content.ID) ID { return ID(contentID) } +// Compressed returns object ID with 'Z' prefix indicating it's compressed. +func Compressed(objectID ID) ID { + return "Z" + objectID +} + // IndirectObjectID returns indirect object ID based on the underlying index object ID. func IndirectObjectID(indexObjectID ID) ID { return "I" + indexObjectID