added option to read portions of blob storage blocks

This commit is contained in:
Jarek Kowalski
2017-08-12 23:11:01 -07:00
parent 097cf2d0b3
commit 8dda3bf2c5
11 changed files with 101 additions and 70 deletions

View File

@@ -136,7 +136,7 @@ func (c *cachingStorage) DeleteBlock(id string) error {
return nil
}
func (c *cachingStorage) GetBlock(id string) ([]byte, error) {
func (c *cachingStorage) GetBlock(id string, offset, length int64) ([]byte, error) {
c.Lock(id)
defer c.Unlock(id)
@@ -145,14 +145,14 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) {
return nil, blob.ErrBlockNotFound
}
v, err := c.cache.GetBlock(id)
v, err := c.cache.GetBlock(id, offset, length)
if err == nil {
return v, nil
}
}
// Download from master
b, err := c.master.GetBlock(id)
b, err := c.master.GetBlock(id, 0, -1)
if err == nil {
l := int64(len(b))

View File

@@ -4,6 +4,7 @@
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
@@ -43,18 +44,24 @@ func (fs *fsStorage) BlockSize(blockID string) (int64, error) {
return 0, err
}
func (fs *fsStorage) GetBlock(blockID string) ([]byte, error) {
func (fs *fsStorage) GetBlock(blockID string, offset, length int64) ([]byte, error) {
_, path := fs.getShardedPathAndFilePath(blockID)
d, err := ioutil.ReadFile(path)
if err == nil {
return d, err
}
f, err := os.Open(path)
if os.IsNotExist(err) {
return nil, blob.ErrBlockNotFound
}
return nil, err
if err != nil {
return nil, err
}
if length < 0 {
return ioutil.ReadAll(f)
}
f.Seek(offset, os.SEEK_SET)
return ioutil.ReadAll(io.LimitReader(f, length))
}
func getstringFromFileName(name string) (string, bool) {

View File

@@ -61,9 +61,9 @@ func (gcs *gcsStorage) BlockSize(b string) (int64, error) {
return v.(int64), nil
}
func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) {
func (gcs *gcsStorage) GetBlock(b string, offset, length int64) ([]byte, error) {
attempt := func() (interface{}, error) {
reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx)
reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewRangeReader(gcs.ctx, offset, length)
if err != nil {
return nil, err
}
@@ -73,7 +73,7 @@ func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) {
return v, err
}
v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q)", b), attempt)
v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q,%v,%v)", b, offset, length), attempt)
if err != nil {
return nil, translateError(err)
}

View File

@@ -18,18 +18,18 @@ func (s *loggingStorage) BlockSize(id string) (int64, error) {
t0 := time.Now()
result, err := s.base.BlockSize(id)
dt := time.Since(t0)
s.printf(s.prefix+"BlockSize(%#v)=%#v,%#v took %v", id, result, err, dt)
s.printf(s.prefix+"BlockSize(%q)=%#v,%#v took %v", id, result, err, dt)
return result, err
}
func (s *loggingStorage) GetBlock(id string) ([]byte, error) {
func (s *loggingStorage) GetBlock(id string, offset, length int64) ([]byte, error) {
t0 := time.Now()
result, err := s.base.GetBlock(id)
result, err := s.base.GetBlock(id, offset, length)
dt := time.Since(t0)
if len(result) < 20 {
s.printf(s.prefix+"GetBlock(%#v)=(%#v, %#v) took %v", id, result, err, dt)
s.printf(s.prefix+"GetBlock(%q,%v,%v)=(%#v, %#v) took %v", id, result, err, dt)
} else {
s.printf(s.prefix+"GetBlock(%#v)=({%#v bytes}, %#v) took %v", id, len(result), err, dt)
s.printf(s.prefix+"GetBlock(%q,%v,%v)=({%#v bytes}, %#v) took %v", id, len(result), err, dt)
}
return result, err
}
@@ -38,7 +38,7 @@ func (s *loggingStorage) PutBlock(id string, data []byte, options blob.PutOption
t0 := time.Now()
err := s.base.PutBlock(id, data, options)
dt := time.Since(t0)
s.printf(s.prefix+"PutBlock(%#v, options=%v, len=%v)=%#v took %v", id, options, len(data), err, dt)
s.printf(s.prefix+"PutBlock(%q, options=%v, len=%v)=%#v took %v", id, options, len(data), err, dt)
return err
}
@@ -46,16 +46,16 @@ func (s *loggingStorage) DeleteBlock(id string) error {
t0 := time.Now()
err := s.base.DeleteBlock(id)
dt := time.Since(t0)
s.printf(s.prefix+"DeleteBlock(%#v)=%#v took %v", id, err, dt)
s.printf(s.prefix+"DeleteBlock(%q)=%#v took %v", id, err, dt)
return err
}
// ListBlocks forwards the call to the wrapped storage, logging how long the
// call itself took. The returned cancel function logs the time elapsed since
// the listing was started and then invokes the wrapped storage's cancel
// function.
func (s *loggingStorage) ListBlocks(prefix string) (chan blob.BlockMetadata, blob.CancelFunc) {
	t0 := time.Now()
	ch, cf := s.base.ListBlocks(prefix)
	s.printf(s.prefix+"ListBlocks(%q) took %v", prefix, time.Since(t0))

	return ch, func() {
		s.printf(s.prefix+"Cancelled ListBlocks(%q) after %v", prefix, time.Since(t0))
		cf()
	}
}

View File

@@ -24,7 +24,7 @@ type Storage interface {
BlockSize(id string) (int64, error)
PutBlock(id string, data []byte, options PutOptions) error
DeleteBlock(id string) error
GetBlock(id string) ([]byte, error)
GetBlock(id string, offset, length int64) ([]byte, error)
ListBlocks(prefix string) (chan (BlockMetadata), CancelFunc)
}

View File

@@ -13,7 +13,7 @@
// AssertGetBlock asserts that the specified storage block has correct content.
func AssertGetBlock(t *testing.T, s blob.Storage, block string, expected []byte) {
b, err := s.GetBlock(block)
b, err := s.GetBlock(block, 0, -1)
if err != nil {
t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected)
return
@@ -26,7 +26,7 @@ func AssertGetBlock(t *testing.T, s blob.Storage, block string, expected []byte)
// AssertGetBlockNotFound asserts that GetBlock() for specified storage block returns ErrBlockNotFound.
func AssertGetBlockNotFound(t *testing.T, s blob.Storage, block string) {
b, err := s.GetBlock(block)
b, err := s.GetBlock(block, 0, -1)
if err != blob.ErrBlockNotFound || b != nil {
t.Errorf(errorPrefix()+"GetBlock(%v) returned %v, %v but expected ErrBlockNotFound", block, b, err)
}

View File

@@ -25,13 +25,21 @@ func (s *mapStorage) BlockSize(id string) (int64, error) {
return int64(len(d)), nil
}
func (s *mapStorage) GetBlock(id string) ([]byte, error) {
func (s *mapStorage) GetBlock(id string, offset, length int64) ([]byte, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
data, ok := s.data[string(id)]
if ok {
return data, nil
if length < 0 {
return data, nil
}
data = data[offset:]
if int(length) > len(data) {
return data, nil
}
return data[0:length], nil
}
return nil, blob.ErrBlockNotFound

View File

@@ -98,7 +98,7 @@ func (mm *MetadataManager) writeEncryptedBlock(itemID string, content []byte) er
}
func (mm *MetadataManager) readEncryptedBlock(itemID string) ([]byte, error) {
content, err := mm.storage.GetBlock(MetadataBlockPrefix + itemID)
content, err := mm.storage.GetBlock(MetadataBlockPrefix+itemID, 0, -1)
if err != nil {
if err == blob.ErrBlockNotFound {
return nil, ErrMetadataNotFound
@@ -304,7 +304,7 @@ func newMetadataManager(st blob.Storage, creds auth.Credentials) (*MetadataManag
var blocks [4][]byte
f := func(index int, name string) {
blocks[index], _ = st.GetBlock(name)
blocks[index], _ = st.GetBlock(name, 0, -1)
wg.Done()
}

View File

@@ -40,10 +40,10 @@ type objectFormatter interface {
ComputeObjectID(data []byte) ObjectID
// Encrypt returns encrypted bytes corresponding to the given plaintext. May reuse the input slice.
Encrypt(plainText []byte, oid ObjectID) ([]byte, error)
Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error)
// Decrypt returns unencrypted bytes corresponding to the given ciphertext. May reuse the input slice.
Decrypt(cipherText []byte, oid ObjectID) ([]byte, error)
Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error)
}
// digestFunction computes the digest (hash, optionally HMAC) of a given block of bytes.
@@ -60,11 +60,11 @@ func (fi *unencryptedFormat) ComputeObjectID(data []byte) ObjectID {
return ObjectID{StorageBlock: hex.EncodeToString(h)}
}
func (fi *unencryptedFormat) Encrypt(plainText []byte, oid ObjectID) ([]byte, error) {
func (fi *unencryptedFormat) Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error) {
return plainText, nil
}
func (fi *unencryptedFormat) Decrypt(cipherText []byte, oid ObjectID) ([]byte, error) {
func (fi *unencryptedFormat) Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error) {
return cipherText, nil
}
@@ -81,31 +81,43 @@ func (fi *syntheticIVEncryptionFormat) ComputeObjectID(data []byte) ObjectID {
return ObjectID{StorageBlock: hex.EncodeToString(h)}
}
func (fi *syntheticIVEncryptionFormat) Encrypt(plainText []byte, oid ObjectID) ([]byte, error) {
func (fi *syntheticIVEncryptionFormat) Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error) {
iv, err := decodeHexSuffix(oid.StorageBlock, aes.BlockSize*2)
if err != nil {
return nil, err
}
return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, plainText)
return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, plainText, skip)
}
func (fi *syntheticIVEncryptionFormat) Decrypt(cipherText []byte, oid ObjectID) ([]byte, error) {
func (fi *syntheticIVEncryptionFormat) Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error) {
iv, err := decodeHexSuffix(oid.StorageBlock, aes.BlockSize*2)
if err != nil {
return nil, err
}
return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, cipherText)
return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, cipherText, skip)
}
// symmetricEncrypt XORs b in place with a CTR-mode keystream and returns b.
// Because the operation is a plain XOR, the same call both encrypts and
// decrypts.
//
// createCipher builds the block cipher from key. Only the first
// blockCipher.BlockSize() bytes of iv are used, so iv must be at least that
// long. When skip is positive, that many keystream bytes are generated and
// discarded before b is processed, so b is treated as if it started skip
// bytes into the stream; a zero or negative skip discards nothing.
//
// Returns an error only if createCipher fails.
func symmetricEncrypt(createCipher func(key []byte) (cipher.Block, error), key []byte, iv []byte, b []byte, skip int) ([]byte, error) {
	blockCipher, err := createCipher(key)
	if err != nil {
		return nil, err
	}

	ctr := cipher.NewCTR(blockCipher, iv[0:blockCipher.BlockSize()])

	// Advance the keystream by skip bytes by encrypting a small scratch
	// buffer, at most len(scratch) bytes at a time. The scratch contents are
	// irrelevant; only the stream position matters.
	var scratch [32]byte
	for remaining := skip; remaining > 0; {
		chunk := scratch[:]
		if remaining < len(chunk) {
			chunk = chunk[:remaining]
		}
		ctr.XORKeyStream(chunk, chunk)
		remaining -= len(chunk)
	}

	ctr.XORKeyStream(b, b)
	return b, nil
}

View File

@@ -27,12 +27,12 @@ func TestObjectFormatters(t *testing.T) {
t.Logf("testing %v", k)
oid := of.ComputeObjectID(data)
cipherText, err := of.Encrypt(data, oid)
cipherText, err := of.Encrypt(data, oid, 0)
if err != nil || cipherText == nil {
t.Errorf("invalid response from Encrypt: %v %v", cipherText, err)
}
plainText, err := of.Decrypt(cipherText, oid)
plainText, err := of.Decrypt(cipherText, oid, 0)
if err != nil || plainText == nil {
t.Errorf("invalid response from Decrypt: %v %v", plainText, err)
}

View File

@@ -72,15 +72,6 @@ func (r *ObjectManager) Open(objectID ObjectID) (ObjectReader, error) {
// Flush any pending writes.
r.writeBack.flush()
if objectID.PackID != "" {
var err error
objectID, err = r.packIDToSection(objectID)
if err != nil {
return nil, err
}
}
if objectID.Section != nil {
baseReader, err := r.Open(objectID.Section.Base)
if err != nil {
@@ -111,6 +102,14 @@ func (r *ObjectManager) Open(objectID ObjectID) (ObjectReader, error) {
}, nil
}
if objectID.BinaryContent != nil {
return newObjectReaderWithData(objectID.BinaryContent), nil
}
if len(objectID.TextContent) > 0 {
return newObjectReaderWithData([]byte(objectID.TextContent)), nil
}
return r.newRawReader(objectID)
}
@@ -124,43 +123,41 @@ func (r *ObjectManager) FinishPacking() error {
return r.packMgr.finishPacking()
}
func (r *ObjectManager) packIDToSection(oid ObjectID) (ObjectID, error) {
func (r *ObjectManager) packIDToSection(oid ObjectID) (ObjectIDSection, error) {
if oid.PackID == "" {
return NullObjectID, fmt.Errorf("invalid pack ID: %v", oid)
return ObjectIDSection{}, fmt.Errorf("invalid pack ID: %v", oid)
}
pi, err := r.packMgr.ensurePackIndexesLoaded()
if err != nil {
return NullObjectID, fmt.Errorf("can't load pack index: %v", err)
return ObjectIDSection{}, fmt.Errorf("can't load pack index: %v", err)
}
p := pi[oid.PackID]
if p == nil {
return NullObjectID, fmt.Errorf("no such pack %q referenced by object ID: %v", oid.PackID, oid)
return ObjectIDSection{}, fmt.Errorf("no such pack %q referenced by object ID: %v", oid.PackID, oid)
}
blk := p.Items[oid.StorageBlock]
if blk == "" {
return NullObjectID, fmt.Errorf("block %q not found in pack %q", oid.StorageBlock, oid.PackID)
return ObjectIDSection{}, fmt.Errorf("block %q not found in pack %q", oid.StorageBlock, oid.PackID)
}
if plus := strings.IndexByte(blk, '+'); plus > 0 {
if start, err := strconv.ParseInt(blk[0:plus], 10, 64); err == nil {
if length, err := strconv.ParseInt(blk[plus+1:], 10, 64); err == nil {
if base, err := ParseObjectID(p.PackObject); err == nil {
return ObjectID{
Section: &ObjectIDSection{
Base: base,
Start: start,
Length: length,
},
return ObjectIDSection{
Base: base,
Start: start,
Length: length,
}, nil
}
}
}
}
return NullObjectID, fmt.Errorf("invalid pack index for %q", oid)
return ObjectIDSection{}, fmt.Errorf("invalid pack index for %q", oid)
}
// newObjectManager creates an ObjectManager with the specified storage, format and options.
@@ -265,7 +262,7 @@ func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Bu
// Encrypt the block in-place.
atomic.AddInt64(&r.stats.EncryptedBytes, int64(len(data)))
data, err = r.formatter.Encrypt(data, objectID)
data, err = r.formatter.Encrypt(data, objectID, 0)
if err != nil {
return NullObjectID, err
}
@@ -315,16 +312,23 @@ func removeIndirection(o ObjectID) ObjectID {
}
func (r *ObjectManager) newRawReader(objectID ObjectID) (ObjectReader, error) {
if objectID.BinaryContent != nil {
return newObjectReaderWithData(objectID.BinaryContent), nil
}
var payload []byte
var err error
underlyingObjectID := objectID
var decryptSkip int
if objectID.PackID != "" {
var err error
if len(objectID.TextContent) > 0 {
return newObjectReaderWithData([]byte(objectID.TextContent)), nil
p, err := r.packIDToSection(objectID)
if err != nil {
return nil, err
}
payload, err = r.storage.GetBlock(p.Base.StorageBlock, p.Start, p.Length)
underlyingObjectID = p.Base
decryptSkip = int(p.Start)
} else {
payload, err = r.storage.GetBlock(objectID.StorageBlock, 0, -1)
}
blockID := objectID.StorageBlock
payload, err := r.storage.GetBlock(blockID)
if err != nil {
return nil, err
}
@@ -332,7 +336,7 @@ func (r *ObjectManager) newRawReader(objectID ObjectID) (ObjectReader, error) {
atomic.AddInt32(&r.stats.ReadBlocks, 1)
atomic.AddInt64(&r.stats.ReadBytes, int64(len(payload)))
payload, err = r.formatter.Decrypt(payload, objectID)
payload, err = r.formatter.Decrypt(payload, underlyingObjectID, decryptSkip)
atomic.AddInt64(&r.stats.DecryptedBytes, int64(len(payload)))
if err != nil {
return nil, err