From 56f4a2bf40fba93a0f9e04bf5b2a2b3b3e9dc5da Mon Sep 17 00:00:00 2001
From: Jarek Kowalski
Date: Sat, 30 Jul 2016 12:13:34 -0700
Subject: [PATCH] Added support for reading new ObjectID format - section, which can be used to create packed objects: S{start},{length},{baseID}

---
 repo/object_reader.go   |  4 ++
 repo/objectid.go        | 78 ++++++++++++++++++++++++++++++++++-
 repo/repository.go      | 22 +++++++++-
 repo/repository_test.go | 63 +++++++++++++++++++---------
 repo/section_reader.go  | 91 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 236 insertions(+), 22 deletions(-)
 create mode 100644 repo/section_reader.go

diff --git a/repo/object_reader.go b/repo/object_reader.go
index 60e995baa..558724526 100644
--- a/repo/object_reader.go
+++ b/repo/object_reader.go
@@ -156,3 +156,7 @@ func (r *objectReader) Seek(offset int64, whence int) (int64, error) {
 func (r *objectReader) Close() error {
 	return nil
 }
+
+func (r *objectReader) Length() int64 {
+	return r.totalLength
+}
diff --git a/repo/objectid.go b/repo/objectid.go
index b58cea89b..017940ab4 100644
--- a/repo/objectid.go
+++ b/repo/objectid.go
@@ -3,7 +3,9 @@
 import (
 	"encoding/base64"
 	"encoding/hex"
+	"errors"
 	"fmt"
+	"strconv"
 	"strings"
 	"unicode/utf8"
 )
@@ -38,7 +40,11 @@
 	// ObjectIDTypeList represents ID of an object whose data is stored in mutliple storage blocks.
 	// The value of the ObjectID is the list chunk, which lists object IDs that need to be concatenated
 	// to form the contents.
-	ObjectIDTypeList ObjectIDType = "L" // list chunk
+	ObjectIDTypeList ObjectIDType = "L"
+
+	// ObjectIDTypeSection represents ID of an object whose data is a section of another object.
+	// The format is S{offset},{length},{base}
+	ObjectIDTypeSection ObjectIDType = "S"
 )
 
 // IsStored determines whether data for the given chunk type is stored in the storage
@@ -81,6 +87,66 @@ func (c ObjectID) InlineData() []byte {
 	return nil
 }
 
+// parseNumberUntilComma parses a string of the form "{x},{remainder}" where x is a 64-bit number and remainder is arbitrary string.
+// Returns the number and remainder.
+func parseNumberUntilComma(s string) (int64, string, error) {
+	comma := strings.IndexByte(s, ',')
+	if comma < 0 {
+		return 0, "", errors.New("missing comma")
+	}
+
+	num, err := strconv.ParseInt(s[0:comma], 10, 64)
+	if err != nil {
+		return 0, "", err
+	}
+
+	return num, s[comma+1:], nil
+}
+
+// parseSectionInfoString parses a section object ID of the form "S{start},{length},{base}"
+// and returns the start offset, the length and the base object ID.
+func parseSectionInfoString(s string) (int64, int64, ObjectID, error) {
+	// HasPrefix also rejects the empty string, which indexing s[0] would panic on.
+	if !strings.HasPrefix(s, string(ObjectIDTypeSection)) {
+		return 0, -1, "", errors.New("not a section object")
+	}
+
+	var start, length int64
+	var err error
+
+	start, s, err = parseNumberUntilComma(s[1:])
+	if err != nil {
+		return 0, -1, "", err
+	}
+
+	length, s, err = parseNumberUntilComma(s)
+	if err != nil {
+		return 0, -1, "", err
+	}
+
+	oid, err := ParseObjectID(s)
+	if err != nil {
+		return 0, -1, "", err
+	}
+
+	return start, length, oid, nil
+}
+
+// SectionInfo returns start, length and the base ID of a section object.
+func (c ObjectID) SectionInfo() (start int64, length int64, baseID ObjectID) {
+	if c.Type() != ObjectIDTypeSection {
+		return 0, 0, ""
+	}
+
+	start, length, oid, err := parseSectionInfoString(string(c))
+	if err != nil {
+		// This should not happen if we came in through ParseObjectID
+		panic("invalid section info: " + string(c))
+	}
+
+	return start, length, oid
+}
+
 // BlockID returns identifier of the storage block. For inline chunk IDs, an empty string is returned.
 func (c ObjectID) BlockID() string {
 	if c.Type().IsStored() {
@@ -125,6 +191,11 @@ func NewInlineObjectID(data []byte) ObjectID {
 	return ObjectID("T" + string(data))
 }
 
+// NewSectionObjectID returns new ObjectID representing a section of an object with a given base ID, start offset and length.
+func NewSectionObjectID(start, length int64, baseID ObjectID) ObjectID {
+	return ObjectID(fmt.Sprintf("S%v,%v,%v", start, length, baseID))
+}
+
 // ParseObjectID converts the specified string into ObjectID.
 func ParseObjectID(objectIDString string) (ObjectID, error) {
 	if len(objectIDString) >= 1 {
@@ -132,6 +203,11 @@ func ParseObjectID(objectIDString string) (ObjectID, error) {
 		content := objectIDString[1:]
 
 		switch chunkType {
+		case ObjectIDTypeSection:
+			if _, _, _, err := parseSectionInfoString(objectIDString); err == nil {
+				return ObjectID(objectIDString), nil
+			}
+
 		case ObjectIDTypeText:
 			return ObjectID(objectIDString), nil
 
diff --git a/repo/repository.go b/repo/repository.go
index 2710e4320..09820f831 100644
--- a/repo/repository.go
+++ b/repo/repository.go
@@ -27,6 +27,7 @@ type ObjectReader interface {
 	io.Reader
 	io.Seeker
 	io.Closer
+	Length() int64
 }
 
 // Since we never share keys, using constant IV is fine.
@@ -114,6 +115,15 @@ func (repo *repository) NewWriter(options ...WriterOption) ObjectWriter {
 }
 
 func (repo *repository) Open(objectID ObjectID) (ObjectReader, error) {
+	if start, length, baseID := objectID.SectionInfo(); baseID != "" {
+		baseReader, err := repo.Open(baseID)
+		if err != nil {
+			return nil, fmt.Errorf("cannot create base reader: %v", err)
+		}
+
+		return newObjectSectionReader(start, length, baseReader)
+	}
+
 	r, err := repo.newRawReader(objectID)
 	if err != nil {
 		return nil, err
@@ -373,12 +383,20 @@ func (repo *repository) verifyChecksum(data []byte, blockID string) error {
 
 type readerWithData struct {
 	io.ReadSeeker
+	length int64
 }
 
 func (rwd *readerWithData) Close() error {
 	return nil
 }
 
-func newObjectReaderWithData(data []byte) ObjectReader {
-	return &readerWithData{bytes.NewReader(data)}
+func (rwd *readerWithData) Length() int64 {
+	return rwd.length
+}
+
+func newObjectReaderWithData(data []byte) ObjectReader {
+	return &readerWithData{
+		ReadSeeker: bytes.NewReader(data),
+		length:     int64(len(data)),
+	}
 }
diff --git a/repo/repository_test.go b/repo/repository_test.go
index bb884714b..a0ba2b684 100644
--- a/repo/repository_test.go
+++ b/repo/repository_test.go
@@ -266,6 +266,12 @@ func TestReader(t *testing.T) {
 		{"T", []byte{}},
 		{"Tfoo\nbar", []byte("foo\nbar")},
 		{"Da76999788386641a3ec798554f1fe7e6", storedPayload},
+		{"S0,2,BAQIDBA", []byte{1, 2}},
+		{"S1,3,BAQIDBA", []byte{2, 3, 4}},
+		{"S1,5,BAQIDBA", []byte{2, 3, 4}},
+		{"S0,0,BAQIDBA", []byte{}},
+		{"S0,2,Da76999788386641a3ec798554f1fe7e6", storedPayload[0:2]},
+		{"S2,4,Da76999788386641a3ec798554f1fe7e6", storedPayload[2:6]},
 	}
 
 	for _, c := range cases {
@@ -290,6 +296,7 @@ func TestReader(t *testing.T) {
 			t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d)
 			continue
 		}
+
 	}
 }
 
@@ -348,29 +355,47 @@ func TestEndToEndReadAndSeek(t *testing.T) {
 				continue
 			}
 
-			reader, err := repo.Open(objectID)
-			if err != nil {
-				t.Errorf("cannot get reader for %v/%v: %v %v", forceStored, size, objectID, err)
-				continue
+			verify(t, repo, objectID, randomData, fmt.Sprintf("%v %v/%v", objectID, forceStored, size))
+
+			if size > 1 {
+				sectionID := NewSectionObjectID(0, int64(size/2), objectID)
+				verify(t, repo, sectionID, randomData[0 : size/2], fmt.Sprintf("%v %v/%v", sectionID, forceStored, size))
 			}
 
-			for i := 0; i < 20; i++ {
-				sampleSize := int(rand.Int31n(300))
-				seekOffset := int(rand.Int31n(int32(len(randomData))))
-				if seekOffset+sampleSize > len(randomData) {
-					sampleSize = len(randomData) - seekOffset
-				}
-				if sampleSize > 0 {
-					got := make([]byte, sampleSize)
-					reader.Seek(int64(seekOffset), 0)
-					reader.Read(got)
+			if size > 1 {
+				sectionID := NewSectionObjectID(int64(1), int64(size-1), objectID)
+				verify(t, repo, sectionID, randomData[1:], fmt.Sprintf("%v %v/%v", sectionID, forceStored, size))
+			}
+		}
+	}
+}
 
-					expected := randomData[seekOffset : seekOffset+sampleSize]
+func verify(t *testing.T, repo Repository, objectID ObjectID, expectedData []byte, testCaseID string) {
+	reader, err := repo.Open(objectID)
+	if err != nil {
+		t.Errorf("cannot get reader for %v: %v", testCaseID, err)
+		return
+	}
 
-					if !bytes.Equal(expected, got) {
-						t.Errorf("incorrect data read for %v/%v: expected: %v, got: %v", forceStored, size, expected, got)
-					}
-				}
+	for i := 0; i < 20; i++ {
+		sampleSize := int(rand.Int31n(300))
+		seekOffset := int(rand.Int31n(int32(len(expectedData))))
+		if seekOffset+sampleSize > len(expectedData) {
+			sampleSize = len(expectedData) - seekOffset
+		}
+		if sampleSize > 0 {
+			got := make([]byte, sampleSize)
+			if offset, err := reader.Seek(int64(seekOffset), 0); err != nil || offset != int64(seekOffset) {
+				t.Errorf("seek error: %v offset=%v expected:%v", err, offset, seekOffset)
+			}
+			if n, err := reader.Read(got); err != nil || n != sampleSize {
+				t.Errorf("invalid data: n=%v, expected=%v, err:%v", n, sampleSize, err)
+			}
+
+			expected := expectedData[seekOffset : seekOffset+sampleSize]
+
+			if !bytes.Equal(expected, got) {
+				t.Errorf("incorrect data read for %v: expected: %x, got: %x", testCaseID, expected, got)
 			}
 		}
 	}
diff --git a/repo/section_reader.go b/repo/section_reader.go
new file mode 100644
index 000000000..d79f6f1e2
--- /dev/null
+++ b/repo/section_reader.go
@@ -0,0 +1,91 @@
+package repo
+
+import (
+	"fmt"
+	"io"
+)
+
+// objectSectionReader is an ObjectReader that exposes the byte range
+// [start, start+length) of baseReader as an object of its own.
+type objectSectionReader struct {
+	baseReader      ObjectReader
+	start, length   int64
+	currentPosition int64 // position within the section, relative to start
+}
+
+// Close closes the base reader.
+func (osr *objectSectionReader) Close() error {
+	return osr.baseReader.Close()
+}
+
+// Length returns the length the section was created with.
+func (osr *objectSectionReader) Length() int64 {
+	return osr.length
+}
+
+// Read reads from the base reader into p without going past the end of the section.
+// It returns io.EOF once the current position has reached the section length.
+func (osr *objectSectionReader) Read(p []byte) (n int, err error) {
+	remaining := osr.length - osr.currentPosition
+	if remaining <= 0 {
+		return 0, io.EOF
+	}
+	if int64(len(p)) > remaining {
+		p = p[:remaining]
+	}
+	n, err = osr.baseReader.Read(p)
+	osr.currentPosition += int64(n)
+	return n, err
+}
+
+// Seek moves the current position within the section and repositions the base reader to match.
+// whence is 0 (offset from the start of the section), 1 (from the current position) or
+// 2 (from the end of the section); any other value is an error.
+// Positions beyond the end of the section are clamped to the section length.
+func (osr *objectSectionReader) Seek(offset int64, whence int) (int64, error) {
+	switch whence {
+	case 0:
+		// offset is already relative to the start of the section
+	case 1:
+		offset += osr.currentPosition
+	case 2:
+		offset += osr.length
+	default:
+		return -1, fmt.Errorf("invalid seek whence: %v", whence)
+	}
+
+	if offset < 0 {
+		return -1, fmt.Errorf("invalid seek offset: %v", offset)
+	}
+
+	if offset > osr.length {
+		offset = osr.length
+	}
+
+	osr.currentPosition = offset
+
+	_, err := osr.baseReader.Seek(osr.start+osr.currentPosition, 0)
+	return osr.currentPosition, err
+}
+
+// newObjectSectionReader returns an ObjectReader for the section of baseReader that begins at
+// start and is length bytes long. The base reader is closed if the section reader cannot be created.
+func newObjectSectionReader(start, length int64, baseReader ObjectReader) (ObjectReader, error) {
+	if start < 0 || length < 0 {
+		baseReader.Close() // best effort, the validation error is what gets reported
+		return nil, fmt.Errorf("invalid section: start=%v length=%v", start, length)
+	}
+
+	r := &objectSectionReader{
+		baseReader: baseReader,
+		start:      start,
+		length:     length,
+	}
+
+	if _, err := r.Seek(0, 0); err != nil {
+		r.Close() // best effort, the seek error is what gets reported
+		return nil, err
+	}
+
+	return r, nil
+}