object: added support for compressed objects

Jarek Kowalski
2019-12-07 14:33:00 -08:00
parent cfa2eea1e1
commit aec3cdcb2f
4 changed files with 105 additions and 14 deletions
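
At a glance: WriterOptions gains a Compressor field, the object writer compresses each chunk only when that actually makes it smaller, and such chunks are referenced through object IDs carrying a new 'Z' prefix so the read path knows to decompress them. Below is a minimal usage sketch, assuming a compressor registered under the name "gzip"; the helper name and surrounding setup are illustrative, not part of this commit.

// Usage sketch only (not from this commit). Assumes it compiles alongside the
// object package, that "context" is imported, and that a compressor named
// "gzip" exists in CompressorsByName; an empty Compressor disables compression.
func writeCompressed(ctx context.Context, om *Manager, data []byte) (ID, error) {
	w := om.NewWriter(ctx, WriterOptions{
		Description: "example blob",
		Compressor:  "gzip", // assumed compressor name
	})
	defer w.Close() //nolint:errcheck

	if _, err := w.Write(data); err != nil {
		return "", err
	}

	// Chunks that benefited from compression come back referenced via
	// 'Z'-prefixed object IDs; incompressible chunks are stored as-is.
	return w.Result()
}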

View File

@@ -4,6 +4,7 @@
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"io"
@@ -52,6 +53,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
splitter: om.newSplitter(),
description: opt.Description,
prefix: opt.Prefix,
compressor: CompressorsByName[opt.Compressor],
}
}
@@ -129,7 +131,7 @@ func (om *Manager) verifyObjectInternal(ctx context.Context, oid ID, tracker *co
return om.verifyIndirectObjectInternal(ctx, indexObjectID, tracker)
}
if contentID, ok := oid.ContentID(); ok {
if contentID, _, ok := oid.ContentID(); ok {
if _, err := om.contentMgr.ContentInfo(ctx, contentID); err != nil {
return err
}
@@ -207,7 +209,7 @@ func (om *Manager) flattenListChunk(rawReader io.Reader) ([]indirectObjectEntry,
}
func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength int64) (Reader, error) {
if contentID, ok := objectID.ContentID(); ok {
if contentID, compressed, ok := objectID.ContentID(); ok {
payload, err := om.contentMgr.GetContent(ctx, contentID)
if err == content.ErrContentNotFound {
return nil, ErrObjectNotFound
@@ -217,6 +219,13 @@ func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength i
return nil, errors.Wrap(err, "unexpected content error")
}
if compressed {
payload, err = om.decompress(payload)
if err != nil {
return nil, errors.Wrap(err, "decompression error")
}
}
if assertLength != -1 && int64(len(payload)) != assertLength {
return nil, errors.Errorf("unexpected chunk length %v, expected %v", len(payload), assertLength)
}
@@ -227,6 +236,21 @@ func (om *Manager) newRawReader(ctx context.Context, objectID ID, assertLength i
return nil, errors.Errorf("unsupported object ID: %v", objectID)
}
func (om *Manager) decompress(b []byte) ([]byte, error) {
if len(b) < 4 {
return nil, errors.Errorf("invalid compression header")
}
compressorID := binary.BigEndian.Uint32(b[0:4])
compressor := Compressors[compressorID]
if compressor == nil {
return nil, errors.Errorf("unsupported compressor %x", compressorID)
}
return compressor.Decompress(b)
}
type readerWithData struct {
io.ReadSeeker
length int64
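
The new decompress helper pins down the stored layout for compressed chunks: the first four bytes are a big-endian compressor ID used to look up an implementation in Compressors, and the whole payload (header included) is then passed to that compressor's Decompress. A hedged sketch of a Compressor that follows this layout, built on the standard library gzip; the package name, the ID constant and the "gzip" name are assumptions, not taken from this diff.

// Illustrative sketch only - not part of this diff. A gzip-backed Compressor
// matching the payload layout that Manager.decompress expects: a 4-byte
// big-endian compressor ID followed by the compressed bytes.
package object // assumed package name

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"io"
)

const exampleGzipCompressorID uint32 = 0x1001 // assumed, must be globally unique

type gzipCompressor struct{}

func (gzipCompressor) Compress(b []byte) ([]byte, error) {
	var buf bytes.Buffer

	// Write the 4-byte header that decompress() reads to pick a Compressor.
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], exampleGzipCompressorID)
	buf.Write(hdr[:]) //nolint:errcheck

	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(b); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}

func (gzipCompressor) Decompress(b []byte) ([]byte, error) {
	// decompress() hands over the whole payload, so skip the 4-byte header.
	zr, err := gzip.NewReader(bytes.NewReader(b[4:]))
	if err != nil {
		return nil, err
	}
	defer zr.Close() //nolint:errcheck

	return io.ReadAll(zr)
}

A real implementation would also have to be registered under its numeric ID in Compressors and under its name in CompressorsByName so that both NewWriter and decompress can find it.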

View File

@@ -110,7 +110,7 @@ func TestWriters(t *testing.T) {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String())
}
if _, ok := c.objectID.ContentID(); !ok {
if _, _, ok := c.objectID.ContentID(); !ok {
if len(data) != 0 {
t.Errorf("unexpected data written to the storage: %v", data)
}
@@ -325,6 +325,34 @@ func TestEndToEndReadAndSeek(t *testing.T) {
}
}
func TestEndToEndReadAndSeekWithCompression(t *testing.T) {
ctx := context.Background()
_, om := setupTest(t)
for compressorName := range CompressorsByName {
for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
// Create a zero-filled data sample of the specified size (compresses well).
randomData := make([]byte, size)
writer := om.NewWriter(ctx, WriterOptions{Compressor: compressorName})
if _, err := writer.Write(randomData); err != nil {
t.Errorf("write error: %v", err)
}
objectID, err := writer.Result()
t.Logf("oid: %v", objectID)
writer.Close()
if err != nil {
t.Errorf("cannot get writer result for %v: %v", size, err)
continue
}
verify(ctx, t, om, objectID, randomData, fmt.Sprintf("%v %v", objectID, size))
}
}
}
func verify(ctx context.Context, t *testing.T, om *Manager, objectID ID, expectedData []byte, testCaseID string) {
t.Helper()
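
The sample buffers in the new test are zero-filled, so every compressor should shrink them and the compressed ('Z') path is always exercised. A hedged companion sketch that feeds incompressible random data instead, where maybeCompressedContentBytes is expected to fall back to storing the raw bytes; it reuses setupTest and verify from this file, while the test name, size and use of crypto/rand are illustrative.

// Not part of this commit: a companion sketch that feeds incompressible
// (random) data so the size check in maybeCompressedContentBytes falls back
// to the uncompressed path. Needs "crypto/rand" imported as rand.
func TestEndToEndReadAndSeekWithIncompressibleData(t *testing.T) {
	ctx := context.Background()
	_, om := setupTest(t)

	for compressorName := range CompressorsByName {
		data := make([]byte, 65536)
		if _, err := rand.Read(data); err != nil {
			t.Fatalf("unable to generate random data: %v", err)
		}

		w := om.NewWriter(ctx, WriterOptions{Compressor: compressorName})
		if _, err := w.Write(data); err != nil {
			t.Errorf("write error: %v", err)
		}

		oid, err := w.Result()
		w.Close()

		if err != nil {
			t.Errorf("cannot get writer result for %v: %v", compressorName, err)
			continue
		}

		verify(ctx, t, om, oid, data, fmt.Sprintf("incompressible %v", compressorName))
	}
}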

View File

@@ -52,6 +52,8 @@ type objectWriter struct {
ctx context.Context
repo *Manager
compressor Compressor
prefix content.ID
buffer bytes.Buffer
totalLength int64
@@ -93,23 +95,49 @@ func (w *objectWriter) flushBuffer() error {
w.indirectIndex[chunkID].Length = int64(length)
w.currentPosition += int64(length)
var b2 bytes.Buffer
contentBytes, isCompressed, err := w.maybeCompressedContentBytes()
if err != nil {
return errors.Wrap(err, "unable to prepare content bytes")
}
w.buffer.WriteTo(&b2) //nolint:errcheck
w.buffer.Reset()
contentID, err := w.repo.contentMgr.WriteContent(w.ctx, b2.Bytes(), w.prefix)
contentID, err := w.repo.contentMgr.WriteContent(w.ctx, contentBytes, w.prefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, contentID, length)
if err != nil {
return errors.Wrapf(err, "error when flushing chunk %d of %s", chunkID, w.description)
}
w.indirectIndex[chunkID].Object = DirectObjectID(contentID)
oid := DirectObjectID(contentID)
if isCompressed {
oid = Compressed(oid)
}
w.indirectIndex[chunkID].Object = oid
return nil
}
func (w *objectWriter) maybeCompressedContentBytes() (data []byte, isCompressed bool, err error) {
if w.compressor != nil {
compressedBytes, err := w.compressor.Compress(w.buffer.Bytes())
if err != nil {
return nil, false, errors.Wrap(err, "compression error")
}
if len(compressedBytes) < w.buffer.Len() {
// Compression helped - use the compressed form and drop the raw buffer.
w.buffer.Reset()
return compressedBytes, true, nil
}
}
var b2 bytes.Buffer
w.buffer.WriteTo(&b2) //nolint:errcheck
w.buffer.Reset()
return b2.Bytes(), false, nil
}
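
The size check in maybeCompressedContentBytes (keep the compressed bytes only when they are strictly smaller) is what protects tiny or already-compressed chunks from growing: the compressed form carries the 4-byte compressor-ID header plus the compression format's own framing. A standalone illustration of that overhead, using the standard library gzip directly rather than the repository's Compressor:

// Standalone illustration (assumption: plain stdlib gzip, not the
// repository's Compressor) of why the size check matters: compressing a
// 1-byte chunk produces far more bytes than it saves.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	in := []byte{0x42} // a 1-byte "chunk"

	var out bytes.Buffer
	zw := gzip.NewWriter(&out)
	zw.Write(in) //nolint:errcheck
	zw.Close()   //nolint:errcheck

	// Typically prints something like: raw=1 compressed=25
	// (the exact number depends on the gzip level).
	fmt.Printf("raw=%d compressed=%d\n", len(in), out.Len())
}
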
func (w *objectWriter) Result() (ID, error) {
if w.buffer.Len() > 0 || len(w.indirectIndex) == 0 {
if err := w.flushBuffer(); err != nil {
@@ -124,6 +152,7 @@ func (w *objectWriter) Result() (ID, error) {
iw := &objectWriter{
ctx: w.ctx,
repo: w.repo,
compressor: nil,
description: "LIST(" + w.description + ")",
splitter: w.repo.newSplitter(),
prefix: w.prefix,
@@ -150,4 +179,5 @@ func (w *objectWriter) Result() (ID, error) {
type WriterOptions struct {
Description string
Prefix content.ID // empty string or a single-character ('g'..'z')
Compressor CompressorName
}

View File

@@ -36,16 +36,20 @@ func (i ID) IndexObjectID() (ID, bool) {
}
// ContentID returns the ID of the underlying content.
func (i ID) ContentID() (content.ID, bool) {
func (i ID) ContentID() (id content.ID, compressed, ok bool) {
if strings.HasPrefix(string(i), "D") {
return content.ID(i[1:]), true
return content.ID(i[1:]), false, true
}
if strings.HasPrefix(string(i), "I") {
return "", false
return "", false, false
}
return content.ID(i), true
if strings.HasPrefix(string(i), "Z") {
return content.ID(i[1:]), true, true
}
return content.ID(i), false, true
}
// Validate checks the ID format for validity and reports any errors.
@@ -58,7 +62,7 @@ func (i ID) Validate() error {
return nil
}
if contentID, ok := i.ContentID(); ok {
if contentID, _, ok := i.ContentID(); ok {
if len(contentID) < 2 {
return errors.Errorf("missing content ID")
}
@@ -87,6 +91,11 @@ func DirectObjectID(contentID content.ID) ID {
return ID(contentID)
}
// Compressed returns object ID with 'Z' prefix indicating it's compressed.
func Compressed(objectID ID) ID {
return "Z" + objectID
}
// IndirectObjectID returns indirect object ID based on the underlying index object ID.
func IndirectObjectID(indexObjectID ID) ID {
return "I" + indexObjectID