added ObjectWriter.StorageBlocks() which returns the list of unique storage blocks that the object is based upon

This commit is contained in:
Jarek Kowalski
2016-08-10 21:25:52 -07:00
parent fb051eca60
commit 9c828db309
3 changed files with 66 additions and 24 deletions

View File

@@ -14,6 +14,26 @@ type ObjectWriter interface {
io.WriteCloser
Result(forceStored bool) (ObjectID, error)
StorageBlocks() []string
}
type blockTracker struct {
blocks map[string]bool
}
func (t *blockTracker) addBlock(blockID string) {
if t.blocks == nil {
t.blocks = make(map[string]bool)
}
t.blocks[blockID] = true
}
func (t *blockTracker) blockIDs() []string {
result := make([]string, 0, len(t.blocks))
for k := range t.blocks {
result = append(result, k)
}
return result
}
type objectWriter struct {
@@ -30,6 +50,8 @@ type objectWriter struct {
description string
objectType ObjectIDType
blockTracker *blockTracker
atomicWrites bool
}
@@ -110,13 +132,19 @@ func (w *objectWriter) flushBuffer(force bool) error {
err)
}
w.blockTracker.addBlock(objectID.BlockID())
w.flushedObjectCount++
w.lastFlushedObject = objectID
if w.listWriter == nil {
w.listWriter = newObjectWriter(w.repo, ObjectIDTypeList)
w.listWriter.prefix = w.prefix
w.listWriter.description = "LIST(" + w.description + ")"
w.listWriter.atomicWrites = true
w.listWriter = &objectWriter{
repo: w.repo,
objectType: ObjectIDTypeList,
prefix: w.prefix,
description: "LIST(" + w.description + ")",
atomicWrites: true,
blockTracker: w.blockTracker,
}
}
fmt.Fprintf(w.listWriter, "%v,%v\n", length, objectID)
@@ -124,13 +152,6 @@ func (w *objectWriter) flushBuffer(force bool) error {
return nil
}
func newObjectWriter(repo *repository, objectType ObjectIDType) *objectWriter {
return &objectWriter{
repo: repo,
objectType: objectType,
}
}
func (w *objectWriter) Result(forceStored bool) (ObjectID, error) {
if !forceStored && w.flushedObjectCount == 0 {
if w.buffer == nil {
@@ -158,6 +179,10 @@ func (w *objectWriter) Result(forceStored bool) (ObjectID, error) {
}
}
func (w *objectWriter) StorageBlocks() []string {
return w.blockTracker.blockIDs()
}
// WriterOption is an option that can be passed to Repository.NewWriter()
type WriterOption func(*objectWriter)

View File

@@ -105,7 +105,11 @@ func (repo *repository) Storage() storage.Storage {
}
func (repo *repository) NewWriter(options ...WriterOption) ObjectWriter {
result := newObjectWriter(repo, ObjectIDTypeStored)
result := &objectWriter{
repo: repo,
objectType: ObjectIDTypeStored,
blockTracker: &blockTracker{},
}
for _, option := range options {
option(result)

View File

@@ -9,6 +9,7 @@
"io/ioutil"
"math/rand"
"reflect"
"sort"
"strings"
"testing"
@@ -135,13 +136,11 @@ func TestWriterListChunk(t *testing.T) {
}
// We have 3 chunks - 200 zero bytes, 50 zero bytes and the list.
if !reflect.DeepEqual(data, map[string][]byte{
verifyStorageBlocks(t, writer.StorageBlocks(), data, map[string][]byte{
contentMd5Sum200: contentBytes[0:200],
contentMd5Sum50: contentBytes[0:50],
getMd5Digest(listChunkContent): listChunkContent,
}) {
t.Errorf("invalid storage contents: %v", data)
}
})
}
func TestWriterListOfListsChunk(t *testing.T) {
@@ -167,14 +166,12 @@ func TestWriterListOfListsChunk(t *testing.T) {
}
// We have 4 chunks - 200 zero bytes, 2 lists, and 1 list-of-lists.
if !reflect.DeepEqual(data, map[string][]byte{
verifyStorageBlocks(t, writer.StorageBlocks(), data, map[string][]byte{
getMd5Digest(contentBytes[0:200]): contentBytes[0:200],
getMd5Digest(list1ChunkContent): list1ChunkContent,
getMd5Digest(list2ChunkContent): list2ChunkContent,
getMd5Digest(listOfListsChunkContent): listOfListsChunkContent,
}) {
t.Errorf("invalid storage contents: %v", data)
}
})
}
func TestWriterListOfListsOfListsChunk(t *testing.T) {
@@ -221,14 +218,12 @@ func TestWriterListOfListsOfListsChunk(t *testing.T) {
}
// We have 4 data blocks representing 10000 bytes of zero. Not bad!
if !reflect.DeepEqual(data, map[string][]byte{
verifyStorageBlocks(t, writer.StorageBlocks(), data, map[string][]byte{
getMd5Digest(writtenData[0:200]): writtenData[0:200],
getMd5Digest(list1ChunkContent): list1ChunkContent,
getMd5Digest(list2ChunkContent): list2ChunkContent,
getMd5Digest(list3ChunkContent): list3ChunkContent,
}) {
t.Errorf("invalid storage contents: %v", data)
}
})
}
func TestHMAC(t *testing.T) {
@@ -602,3 +597,21 @@ func TestInvalidEncryptionKey(t *testing.T) {
t.Errorf("expected error when opening object with corrupt data")
}
}
func verifyStorageBlocks(t *testing.T, actual []string, data, expectedData map[string][]byte) {
actualCopy := append([]string(nil), actual...)
sort.Strings(actualCopy)
var expected []string
for k := range expectedData {
expected = append(expected, k)
}
sort.Strings(expected)
if !reflect.DeepEqual(expected, actualCopy) {
t.Errorf("updated blocks don't match. Expected: %#v, got %#v", expected, actualCopy)
}
if !reflect.DeepEqual(expectedData, data) {
t.Errorf("storage data does not match, expected: %v, got %v", expectedData, data)
}
}