From 8dda3bf2c553529aed69cf960c0809d0f76820e2 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 12 Aug 2017 23:11:01 -0700 Subject: [PATCH] added option to read portions of blob storage blocks --- blob/caching/caching_storage.go | 6 +-- blob/filesystem/filesystem_storage.go | 19 +++++--- blob/gcs/gcs_storage.go | 6 +-- blob/logging/logging_storage.go | 18 ++++---- blob/storage.go | 2 +- internal/storagetesting/asserts.go | 4 +- internal/storagetesting/map.go | 12 ++++- repo/metadata_manager.go | 4 +- repo/object_formatter.go | 30 ++++++++---- repo/object_formatter_test.go | 4 +- repo/object_manager.go | 66 ++++++++++++++------------- 11 files changed, 101 insertions(+), 70 deletions(-) diff --git a/blob/caching/caching_storage.go b/blob/caching/caching_storage.go index a8f473bd9..867e4366f 100644 --- a/blob/caching/caching_storage.go +++ b/blob/caching/caching_storage.go @@ -136,7 +136,7 @@ func (c *cachingStorage) DeleteBlock(id string) error { return nil } -func (c *cachingStorage) GetBlock(id string) ([]byte, error) { +func (c *cachingStorage) GetBlock(id string, offset, length int64) ([]byte, error) { c.Lock(id) defer c.Unlock(id) @@ -145,14 +145,14 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) { return nil, blob.ErrBlockNotFound } - v, err := c.cache.GetBlock(id) + v, err := c.cache.GetBlock(id, offset, length) if err == nil { return v, nil } } // Download from master - b, err := c.master.GetBlock(id) + b, err := c.master.GetBlock(id, 0, -1) if err == nil { l := int64(len(b)) diff --git a/blob/filesystem/filesystem_storage.go b/blob/filesystem/filesystem_storage.go index bf76347b0..09fa730c1 100644 --- a/blob/filesystem/filesystem_storage.go +++ b/blob/filesystem/filesystem_storage.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "io" "io/ioutil" "math/rand" "os" @@ -43,18 +44,24 @@ func (fs *fsStorage) BlockSize(blockID string) (int64, error) { return 0, err } -func (fs *fsStorage) GetBlock(blockID string) ([]byte, error) { +func (fs *fsStorage) GetBlock(blockID string, offset, length int64) ([]byte, error) { _, path := fs.getShardedPathAndFilePath(blockID) - d, err := ioutil.ReadFile(path) - if err == nil { - return d, err - } + f, err := os.Open(path) if os.IsNotExist(err) { return nil, blob.ErrBlockNotFound } - return nil, err + if err != nil { + return nil, err + } + + if length < 0 { + return ioutil.ReadAll(f) + } + + f.Seek(offset, os.SEEK_SET) + return ioutil.ReadAll(io.LimitReader(f, length)) } func getstringFromFileName(name string) (string, bool) { diff --git a/blob/gcs/gcs_storage.go b/blob/gcs/gcs_storage.go index 0a66afae2..0de781da9 100644 --- a/blob/gcs/gcs_storage.go +++ b/blob/gcs/gcs_storage.go @@ -61,9 +61,9 @@ func (gcs *gcsStorage) BlockSize(b string) (int64, error) { return v.(int64), nil } -func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) { +func (gcs *gcsStorage) GetBlock(b string, offset, length int64) ([]byte, error) { attempt := func() (interface{}, error) { - reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx) + reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewRangeReader(gcs.ctx, offset, length) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) { return v, err } - v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q)", b), attempt) + v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q,%v,%v)", b, offset, length), attempt) if err != nil { return nil, translateError(err) } diff --git a/blob/logging/logging_storage.go b/blob/logging/logging_storage.go index 4cea5b9c5..b3c28a144 100644 --- a/blob/logging/logging_storage.go +++ b/blob/logging/logging_storage.go @@ -18,18 +18,18 @@ func (s *loggingStorage) BlockSize(id string) (int64, error) { t0 := time.Now() result, err := s.base.BlockSize(id) dt := time.Since(t0) - s.printf(s.prefix+"BlockSize(%#v)=%#v,%#v took %v", id, result, err, dt) + s.printf(s.prefix+"BlockSize(%q)=%#v,%#v took %v", id, result, err, dt) return result, err } -func (s *loggingStorage) GetBlock(id string) ([]byte, error) { +func (s *loggingStorage) GetBlock(id string, offset, length int64) ([]byte, error) { t0 := time.Now() - result, err := s.base.GetBlock(id) + result, err := s.base.GetBlock(id, offset, length) dt := time.Since(t0) if len(result) < 20 { - s.printf(s.prefix+"GetBlock(%#v)=(%#v, %#v) took %v", id, result, err, dt) + s.printf(s.prefix+"GetBlock(%q,%v,%v)=(%#v, %#v) took %v", id, result, err, dt) } else { - s.printf(s.prefix+"GetBlock(%#v)=({%#v bytes}, %#v) took %v", id, len(result), err, dt) + s.printf(s.prefix+"GetBlock(%q,%v,%v)=({%#v bytes}, %#v) took %v", id, len(result), err, dt) } return result, err } @@ -38,7 +38,7 @@ func (s *loggingStorage) PutBlock(id string, data []byte, options blob.PutOption t0 := time.Now() err := s.base.PutBlock(id, data, options) dt := time.Since(t0) - s.printf(s.prefix+"PutBlock(%#v, options=%v, len=%v)=%#v took %v", id, options, len(data), err, dt) + s.printf(s.prefix+"PutBlock(%q, options=%v, len=%v)=%#v took %v", id, options, len(data), err, dt) return err } @@ -46,16 +46,16 @@ func (s *loggingStorage) DeleteBlock(id string) error { t0 := time.Now() err := s.base.DeleteBlock(id) dt := time.Since(t0) - s.printf(s.prefix+"DeleteBlock(%#v)=%#v took %v", id, err, dt) + s.printf(s.prefix+"DeleteBlock(%q)=%#v took %v", id, err, dt) return err } func (s *loggingStorage) ListBlocks(prefix string) (chan blob.BlockMetadata, blob.CancelFunc) { t0 := time.Now() ch, cf := s.base.ListBlocks(prefix) - s.printf(s.prefix+"ListBlocks(%#v) took %v", prefix, time.Since(t0)) + s.printf(s.prefix+"ListBlocks(%q) took %v", prefix, time.Since(t0)) return ch, func() { - s.printf(s.prefix+"Cancelled ListBlocks(%#v)after %v", prefix, time.Since(t0)) + s.printf(s.prefix+"Cancelled ListBlocks(%q)after %v", prefix, time.Since(t0)) cf() } } diff --git a/blob/storage.go b/blob/storage.go index af94fd6ad..d69923c9e 100644 --- a/blob/storage.go +++ b/blob/storage.go @@ -24,7 +24,7 @@ type Storage interface { BlockSize(id string) (int64, error) PutBlock(id string, data []byte, options PutOptions) error DeleteBlock(id string) error - GetBlock(id string) ([]byte, error) + GetBlock(id string, offset, length int64) ([]byte, error) ListBlocks(prefix string) (chan (BlockMetadata), CancelFunc) } diff --git a/internal/storagetesting/asserts.go b/internal/storagetesting/asserts.go index b93d8dc6e..8cff84a5c 100644 --- a/internal/storagetesting/asserts.go +++ b/internal/storagetesting/asserts.go @@ -13,7 +13,7 @@ // AssertGetBlock asserts that the specified storage block has correct content. func AssertGetBlock(t *testing.T, s blob.Storage, block string, expected []byte) { - b, err := s.GetBlock(block) + b, err := s.GetBlock(block, 0, -1) if err != nil { t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected) return @@ -26,7 +26,7 @@ func AssertGetBlock(t *testing.T, s blob.Storage, block string, expected []byte) // AssertGetBlockNotFound asserts that GetBlock() for specified storage block returns ErrBlockNotFound. func AssertGetBlockNotFound(t *testing.T, s blob.Storage, block string) { - b, err := s.GetBlock(block) + b, err := s.GetBlock(block, 0, -1) if err != blob.ErrBlockNotFound || b != nil { t.Errorf(errorPrefix()+"GetBlock(%v) returned %v, %v but expected ErrBlockNotFound", block, b, err) } diff --git a/internal/storagetesting/map.go b/internal/storagetesting/map.go index 7849a6390..bbd576852 100644 --- a/internal/storagetesting/map.go +++ b/internal/storagetesting/map.go @@ -25,13 +25,21 @@ func (s *mapStorage) BlockSize(id string) (int64, error) { return int64(len(d)), nil } -func (s *mapStorage) GetBlock(id string) ([]byte, error) { +func (s *mapStorage) GetBlock(id string, offset, length int64) ([]byte, error) { s.mutex.RLock() defer s.mutex.RUnlock() data, ok := s.data[string(id)] if ok { - return data, nil + if length < 0 { + return data, nil + } + + data = data[offset:] + if int(length) > len(data) { + return data, nil + } + return data[0:length], nil } return nil, blob.ErrBlockNotFound diff --git a/repo/metadata_manager.go b/repo/metadata_manager.go index 638ee36aa..7f1c126f3 100644 --- a/repo/metadata_manager.go +++ b/repo/metadata_manager.go @@ -98,7 +98,7 @@ func (mm *MetadataManager) writeEncryptedBlock(itemID string, content []byte) er } func (mm *MetadataManager) readEncryptedBlock(itemID string) ([]byte, error) { - content, err := mm.storage.GetBlock(MetadataBlockPrefix + itemID) + content, err := mm.storage.GetBlock(MetadataBlockPrefix+itemID, 0, -1) if err != nil { if err == blob.ErrBlockNotFound { return nil, ErrMetadataNotFound @@ -304,7 +304,7 @@ func newMetadataManager(st blob.Storage, creds auth.Credentials) (*MetadataManag var blocks [4][]byte f := func(index int, name string) { - blocks[index], _ = st.GetBlock(name) + blocks[index], _ = st.GetBlock(name, 0, -1) wg.Done() } diff --git a/repo/object_formatter.go b/repo/object_formatter.go index bedffa694..0c6563937 100644 --- a/repo/object_formatter.go +++ b/repo/object_formatter.go @@ -40,10 +40,10 @@ type objectFormatter interface { ComputeObjectID(data []byte) ObjectID // Encrypt returns encrypted bytes corresponding to the given plaintext. May reuse the input slice. - Encrypt(plainText []byte, oid ObjectID) ([]byte, error) + Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error) // Decrypt returns unencrypted bytes corresponding to the given ciphertext. May reuse the input slice. - Decrypt(cipherText []byte, oid ObjectID) ([]byte, error) + Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error) } // digestFunction computes the digest (hash, optionally HMAC) of a given block of bytes. @@ -60,11 +60,11 @@ func (fi *unencryptedFormat) ComputeObjectID(data []byte) ObjectID { return ObjectID{StorageBlock: hex.EncodeToString(h)} } -func (fi *unencryptedFormat) Encrypt(plainText []byte, oid ObjectID) ([]byte, error) { +func (fi *unencryptedFormat) Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error) { return plainText, nil } -func (fi *unencryptedFormat) Decrypt(cipherText []byte, oid ObjectID) ([]byte, error) { +func (fi *unencryptedFormat) Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error) { return cipherText, nil } @@ -81,31 +81,43 @@ func (fi *syntheticIVEncryptionFormat) ComputeObjectID(data []byte) ObjectID { return ObjectID{StorageBlock: hex.EncodeToString(h)} } -func (fi *syntheticIVEncryptionFormat) Encrypt(plainText []byte, oid ObjectID) ([]byte, error) { +func (fi *syntheticIVEncryptionFormat) Encrypt(plainText []byte, oid ObjectID, skip int) ([]byte, error) { iv, err := decodeHexSuffix(oid.StorageBlock, aes.BlockSize*2) if err != nil { return nil, err } - return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, plainText) + return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, plainText, skip) } -func (fi *syntheticIVEncryptionFormat) Decrypt(cipherText []byte, oid ObjectID) ([]byte, error) { +func (fi *syntheticIVEncryptionFormat) Decrypt(cipherText []byte, oid ObjectID, skip int) ([]byte, error) { iv, err := decodeHexSuffix(oid.StorageBlock, aes.BlockSize*2) if err != nil { return nil, err } - return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, cipherText) + return symmetricEncrypt(fi.createCipher, fi.aesKey, iv, cipherText, skip) } -func symmetricEncrypt(createCipher func(key []byte) (cipher.Block, error), key []byte, iv []byte, b []byte) ([]byte, error) { +func symmetricEncrypt(createCipher func(key []byte) (cipher.Block, error), key []byte, iv []byte, b []byte, skip int) ([]byte, error) { blockCipher, err := createCipher(key) if err != nil { return nil, err } ctr := cipher.NewCTR(blockCipher, iv[0:blockCipher.BlockSize()]) + if skip > 0 { + var skipBuf [32]byte + skipBufSlice := skipBuf[:] + for skip >= len(skipBuf) { + ctr.XORKeyStream(skipBufSlice, skipBufSlice) + skip -= len(skipBufSlice) + } + if skip > 0 { + ctr.XORKeyStream(skipBufSlice[0:skip], skipBufSlice[0:skip]) + } + } + ctr.XORKeyStream(b, b) return b, nil } diff --git a/repo/object_formatter_test.go b/repo/object_formatter_test.go index 0efdb7c25..8f767eb85 100644 --- a/repo/object_formatter_test.go +++ b/repo/object_formatter_test.go @@ -27,12 +27,12 @@ func TestObjectFormatters(t *testing.T) { t.Logf("testing %v", k) oid := of.ComputeObjectID(data) - cipherText, err := of.Encrypt(data, oid) + cipherText, err := of.Encrypt(data, oid, 0) if err != nil || cipherText == nil { t.Errorf("invalid response from Encrypt: %v %v", cipherText, err) } - plainText, err := of.Decrypt(cipherText, oid) + plainText, err := of.Decrypt(cipherText, oid, 0) if err != nil || plainText == nil { t.Errorf("invalid response from Decrypt: %v %v", plainText, err) } diff --git a/repo/object_manager.go b/repo/object_manager.go index a37734aa7..fe658e623 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -72,15 +72,6 @@ func (r *ObjectManager) Open(objectID ObjectID) (ObjectReader, error) { // Flush any pending writes. r.writeBack.flush() - if objectID.PackID != "" { - var err error - - objectID, err = r.packIDToSection(objectID) - if err != nil { - return nil, err - } - } - if objectID.Section != nil { baseReader, err := r.Open(objectID.Section.Base) if err != nil { @@ -111,6 +102,14 @@ func (r *ObjectManager) Open(objectID ObjectID) (ObjectReader, error) { }, nil } + if objectID.BinaryContent != nil { + return newObjectReaderWithData(objectID.BinaryContent), nil + } + + if len(objectID.TextContent) > 0 { + return newObjectReaderWithData([]byte(objectID.TextContent)), nil + } + return r.newRawReader(objectID) } @@ -124,43 +123,41 @@ func (r *ObjectManager) FinishPacking() error { return r.packMgr.finishPacking() } -func (r *ObjectManager) packIDToSection(oid ObjectID) (ObjectID, error) { +func (r *ObjectManager) packIDToSection(oid ObjectID) (ObjectIDSection, error) { if oid.PackID == "" { - return NullObjectID, fmt.Errorf("invalid pack ID: %v", oid) + return ObjectIDSection{}, fmt.Errorf("invalid pack ID: %v", oid) } pi, err := r.packMgr.ensurePackIndexesLoaded() if err != nil { - return NullObjectID, fmt.Errorf("can't load pack index: %v", err) + return ObjectIDSection{}, fmt.Errorf("can't load pack index: %v", err) } p := pi[oid.PackID] if p == nil { - return NullObjectID, fmt.Errorf("no such pack %q referenced by object ID: %v", oid.PackID, oid) + return ObjectIDSection{}, fmt.Errorf("no such pack %q referenced by object ID: %v", oid.PackID, oid) } blk := p.Items[oid.StorageBlock] if blk == "" { - return NullObjectID, fmt.Errorf("block %q not found in pack %q", oid.StorageBlock, oid.PackID) + return ObjectIDSection{}, fmt.Errorf("block %q not found in pack %q", oid.StorageBlock, oid.PackID) } if plus := strings.IndexByte(blk, '+'); plus > 0 { if start, err := strconv.ParseInt(blk[0:plus], 10, 64); err == nil { if length, err := strconv.ParseInt(blk[plus+1:], 10, 64); err == nil { if base, err := ParseObjectID(p.PackObject); err == nil { - return ObjectID{ - Section: &ObjectIDSection{ - Base: base, - Start: start, - Length: length, - }, + return ObjectIDSection{ + Base: base, + Start: start, + Length: length, }, nil } } } } - return NullObjectID, fmt.Errorf("invalid pack index for %q", oid) + return ObjectIDSection{}, fmt.Errorf("invalid pack index for %q", oid) } // newObjectManager creates an ObjectManager with the specified storage, format and options. @@ -265,7 +262,7 @@ func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Bu // Encrypt the block in-place. atomic.AddInt64(&r.stats.EncryptedBytes, int64(len(data))) - data, err = r.formatter.Encrypt(data, objectID) + data, err = r.formatter.Encrypt(data, objectID, 0) if err != nil { return NullObjectID, err } @@ -315,16 +312,23 @@ func removeIndirection(o ObjectID) ObjectID { } func (r *ObjectManager) newRawReader(objectID ObjectID) (ObjectReader, error) { - if objectID.BinaryContent != nil { - return newObjectReaderWithData(objectID.BinaryContent), nil - } + var payload []byte + var err error + underlyingObjectID := objectID + var decryptSkip int + if objectID.PackID != "" { + var err error - if len(objectID.TextContent) > 0 { - return newObjectReaderWithData([]byte(objectID.TextContent)), nil + p, err := r.packIDToSection(objectID) + if err != nil { + return nil, err + } + payload, err = r.storage.GetBlock(p.Base.StorageBlock, p.Start, p.Length) + underlyingObjectID = p.Base + decryptSkip = int(p.Start) + } else { + payload, err = r.storage.GetBlock(objectID.StorageBlock, 0, -1) } - - blockID := objectID.StorageBlock - payload, err := r.storage.GetBlock(blockID) if err != nil { return nil, err } @@ -332,7 +336,7 @@ func (r *ObjectManager) newRawReader(objectID ObjectID) (ObjectReader, error) { atomic.AddInt32(&r.stats.ReadBlocks, 1) atomic.AddInt64(&r.stats.ReadBytes, int64(len(payload))) - payload, err = r.formatter.Decrypt(payload, objectID) + payload, err = r.formatter.Decrypt(payload, underlyingObjectID, decryptSkip) atomic.AddInt64(&r.stats.DecryptedBytes, int64(len(payload))) if err != nil { return nil, err