mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 06:34:46 -04:00
Added support for reading new ObjectID format - section, which can be used to create packed objects:
S{start},{length},{baseID}
This commit is contained in:
@@ -156,3 +156,7 @@ func (r *objectReader) Seek(offset int64, whence int) (int64, error) {
|
||||
func (r *objectReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *objectReader) Length() int64 {
|
||||
return r.totalLength
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
@@ -38,7 +40,11 @@
|
||||
// ObjectIDTypeList represents ID of an object whose data is stored in mutliple storage blocks.
|
||||
// The value of the ObjectID is the list chunk, which lists object IDs that need to be concatenated
|
||||
// to form the contents.
|
||||
ObjectIDTypeList ObjectIDType = "L" // list chunk
|
||||
ObjectIDTypeList ObjectIDType = "L"
|
||||
|
||||
// ObjectIDTypeSection represents ID of an object whose data is a section of another object.
|
||||
// The format is S{offset},{length},{base}
|
||||
ObjectIDTypeSection ObjectIDType = "S"
|
||||
)
|
||||
|
||||
// IsStored determines whether data for the given chunk type is stored in the storage
|
||||
@@ -81,6 +87,63 @@ func (c ObjectID) InlineData() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseNumberUntilComma parses a string of the form "{x},{remainder}" where x is a 64-bit number and remainder is arbitrary string.
|
||||
// Returns the number and remainder.
|
||||
func parseNumberUntilComma(s string) (int64, string, error) {
|
||||
comma := strings.IndexByte(s, ',')
|
||||
if comma < 0 {
|
||||
return 0, "", errors.New("missing comma")
|
||||
}
|
||||
|
||||
num, err := strconv.ParseInt(s[0:comma], 10, 64)
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
|
||||
return num, s[comma+1:], nil
|
||||
}
|
||||
|
||||
func parseSectionInfoString(s string) (int64, int64, ObjectID, error) {
|
||||
if ObjectIDType(s[0]) != ObjectIDTypeSection {
|
||||
return 0, -1, "", errors.New("not a section object")
|
||||
}
|
||||
|
||||
var start, length int64
|
||||
var err error
|
||||
|
||||
start, s, err = parseNumberUntilComma(s[1:])
|
||||
if err != nil {
|
||||
return 0, -1, "", err
|
||||
}
|
||||
|
||||
length, s, err = parseNumberUntilComma(s)
|
||||
if err != nil {
|
||||
return 0, -1, "", err
|
||||
}
|
||||
|
||||
oid, err := ParseObjectID(s)
|
||||
if err != nil {
|
||||
return 0, -1, "", err
|
||||
}
|
||||
|
||||
return start, length, oid, nil
|
||||
}
|
||||
|
||||
// SectionInfo returns start, length and the base ID of a section object.
|
||||
func (c ObjectID) SectionInfo() (start int64, length int64, baseID ObjectID) {
|
||||
if c.Type() != ObjectIDTypeSection {
|
||||
return 0, 0, ""
|
||||
}
|
||||
|
||||
start, length, oid, err := parseSectionInfoString(string(c))
|
||||
if err != nil {
|
||||
// This should not happen if we came in through ParseObjectID
|
||||
panic("invalid section info: " + string(c))
|
||||
}
|
||||
|
||||
return start, length, oid
|
||||
}
|
||||
|
||||
// BlockID returns identifier of the storage block. For inline chunk IDs, an empty string is returned.
|
||||
func (c ObjectID) BlockID() string {
|
||||
if c.Type().IsStored() {
|
||||
@@ -125,6 +188,11 @@ func NewInlineObjectID(data []byte) ObjectID {
|
||||
return ObjectID("T" + string(data))
|
||||
}
|
||||
|
||||
// NewSectionObjectID returns new ObjectID representing a section of an object with a given base ID, start offset and length.
|
||||
func NewSectionObjectID(start, length int64, baseID ObjectID) ObjectID {
|
||||
return ObjectID(fmt.Sprintf("S%v,%v,%v", start, length, baseID))
|
||||
}
|
||||
|
||||
// ParseObjectID converts the specified string into ObjectID.
|
||||
func ParseObjectID(objectIDString string) (ObjectID, error) {
|
||||
if len(objectIDString) >= 1 {
|
||||
@@ -132,6 +200,11 @@ func ParseObjectID(objectIDString string) (ObjectID, error) {
|
||||
content := objectIDString[1:]
|
||||
|
||||
switch chunkType {
|
||||
case ObjectIDTypeSection:
|
||||
if _, _, _, err := parseSectionInfoString(objectIDString); err == nil {
|
||||
return ObjectID(objectIDString), nil
|
||||
}
|
||||
|
||||
case ObjectIDTypeText:
|
||||
return ObjectID(objectIDString), nil
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ type ObjectReader interface {
|
||||
io.Reader
|
||||
io.Seeker
|
||||
io.Closer
|
||||
Length() int64
|
||||
}
|
||||
|
||||
// Since we never share keys, using constant IV is fine.
|
||||
@@ -114,6 +115,15 @@ func (repo *repository) NewWriter(options ...WriterOption) ObjectWriter {
|
||||
}
|
||||
|
||||
func (repo *repository) Open(objectID ObjectID) (ObjectReader, error) {
|
||||
if start, length, baseID := objectID.SectionInfo(); baseID != "" {
|
||||
baseReader, err := repo.Open(baseID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create base reader: %v", err)
|
||||
}
|
||||
|
||||
return newObjectSectionReader(start, length, baseReader)
|
||||
}
|
||||
|
||||
r, err := repo.newRawReader(objectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -373,12 +383,20 @@ func (repo *repository) verifyChecksum(data []byte, blockID string) error {
|
||||
|
||||
type readerWithData struct {
|
||||
io.ReadSeeker
|
||||
length int64
|
||||
}
|
||||
|
||||
func (rwd *readerWithData) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newObjectReaderWithData(data []byte) ObjectReader {
|
||||
return &readerWithData{bytes.NewReader(data)}
|
||||
func (rwd *readerWithData) Length() int64 {
|
||||
return rwd.length
|
||||
}
|
||||
|
||||
func newObjectReaderWithData(data []byte) ObjectReader {
|
||||
return &readerWithData{
|
||||
ReadSeeker: bytes.NewReader(data),
|
||||
length: int64(len(data)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,6 +266,12 @@ func TestReader(t *testing.T) {
|
||||
{"T", []byte{}},
|
||||
{"Tfoo\nbar", []byte("foo\nbar")},
|
||||
{"Da76999788386641a3ec798554f1fe7e6", storedPayload},
|
||||
{"S0,2,BAQIDBA", []byte{1, 2}},
|
||||
{"S1,3,BAQIDBA", []byte{2, 3, 4}},
|
||||
{"S1,5,BAQIDBA", []byte{2, 3, 4}},
|
||||
{"S0,0,BAQIDBA", []byte{}},
|
||||
{"S0,2,Da76999788386641a3ec798554f1fe7e6", storedPayload[0:2]},
|
||||
{"S2,4,Da76999788386641a3ec798554f1fe7e6", storedPayload[2:6]},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
@@ -290,6 +296,7 @@ func TestReader(t *testing.T) {
|
||||
t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,29 +355,47 @@ func TestEndToEndReadAndSeek(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
reader, err := repo.Open(objectID)
|
||||
if err != nil {
|
||||
t.Errorf("cannot get reader for %v/%v: %v %v", forceStored, size, objectID, err)
|
||||
continue
|
||||
verify(t, repo, objectID, randomData, fmt.Sprintf("%v %v/%v", objectID, forceStored, size))
|
||||
|
||||
if size > 1 {
|
||||
sectionID := NewSectionObjectID(0, int64(size/2), objectID)
|
||||
verify(t, repo, sectionID, randomData[0:10], fmt.Sprintf("%v %v/%v", sectionID, forceStored, size))
|
||||
}
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
sampleSize := int(rand.Int31n(300))
|
||||
seekOffset := int(rand.Int31n(int32(len(randomData))))
|
||||
if seekOffset+sampleSize > len(randomData) {
|
||||
sampleSize = len(randomData) - seekOffset
|
||||
}
|
||||
if sampleSize > 0 {
|
||||
got := make([]byte, sampleSize)
|
||||
reader.Seek(int64(seekOffset), 0)
|
||||
reader.Read(got)
|
||||
if size > 1 {
|
||||
sectionID := NewSectionObjectID(int64(1), int64(size-1), objectID)
|
||||
verify(t, repo, sectionID, randomData[1:], fmt.Sprintf("%v %v/%v", sectionID, forceStored, size))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expected := randomData[seekOffset : seekOffset+sampleSize]
|
||||
func verify(t *testing.T, repo Repository, objectID ObjectID, expectedData []byte, testCaseID string) {
|
||||
reader, err := repo.Open(objectID)
|
||||
if err != nil {
|
||||
t.Errorf("cannot get reader for %v: %v", testCaseID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !bytes.Equal(expected, got) {
|
||||
t.Errorf("incorrect data read for %v/%v: expected: %v, got: %v", forceStored, size, expected, got)
|
||||
}
|
||||
}
|
||||
for i := 0; i < 20; i++ {
|
||||
sampleSize := int(rand.Int31n(300))
|
||||
seekOffset := int(rand.Int31n(int32(len(expectedData))))
|
||||
if seekOffset+sampleSize > len(expectedData) {
|
||||
sampleSize = len(expectedData) - seekOffset
|
||||
}
|
||||
if sampleSize > 0 {
|
||||
got := make([]byte, sampleSize)
|
||||
if offset, err := reader.Seek(int64(seekOffset), 0); err != nil || offset != int64(seekOffset) {
|
||||
t.Errorf("seek error: %v offset=%v expected:%v", err, offset, seekOffset)
|
||||
}
|
||||
if n, err := reader.Read(got); err != nil || n != sampleSize {
|
||||
t.Errorf("invalid data: n=%v, expected=%v, err:%v", n, sampleSize, err)
|
||||
}
|
||||
|
||||
expected := expectedData[seekOffset : seekOffset+sampleSize]
|
||||
|
||||
if !bytes.Equal(expected, got) {
|
||||
t.Errorf("incorrect data read for %v: expected: %x, got: %x", testCaseID, expected, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
70
repo/section_reader.go
Normal file
70
repo/section_reader.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package repo
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
type objectSectionReader struct {
|
||||
baseReader ObjectReader
|
||||
start, length int64
|
||||
currentPosition int64
|
||||
}
|
||||
|
||||
func (osr *objectSectionReader) Close() error {
|
||||
return osr.baseReader.Close()
|
||||
}
|
||||
|
||||
func (osr *objectSectionReader) Length() int64 {
|
||||
return osr.length
|
||||
}
|
||||
|
||||
func (osr *objectSectionReader) Read(p []byte) (n int, err error) {
|
||||
if osr.currentPosition >= osr.length {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if max := osr.length - osr.currentPosition; int64(len(p)) > max {
|
||||
p = p[0:max]
|
||||
}
|
||||
n, err = osr.baseReader.Read(p)
|
||||
osr.currentPosition += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (osr *objectSectionReader) Seek(offset int64, whence int) (int64, error) {
|
||||
if whence == 1 {
|
||||
return osr.Seek(osr.currentPosition+offset, 0)
|
||||
}
|
||||
|
||||
if whence == 2 {
|
||||
return osr.Seek(osr.length+offset, 0)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return -1, fmt.Errorf("Invalid seek.")
|
||||
}
|
||||
|
||||
if offset > osr.length {
|
||||
offset = osr.length
|
||||
}
|
||||
|
||||
osr.currentPosition = offset
|
||||
|
||||
_, err := osr.baseReader.Seek(osr.start+osr.currentPosition, 0)
|
||||
return osr.currentPosition, err
|
||||
}
|
||||
|
||||
func newObjectSectionReader(start, length int64, baseReader ObjectReader) (ObjectReader, error) {
|
||||
r := &objectSectionReader{
|
||||
baseReader: baseReader,
|
||||
start: start,
|
||||
length: length,
|
||||
}
|
||||
|
||||
if _, err := r.Seek(0, 0); err != nil {
|
||||
r.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
Reference in New Issue
Block a user