added support for block manager delete

This commit is contained in:
Jarek Kowalski
2017-11-11 10:06:27 -08:00
parent 5b287815ad
commit 50398692fe
4 changed files with 75 additions and 50 deletions

View File

@@ -82,6 +82,35 @@ func (bm *Manager) BlockSize(blockID string) (int64, error) {
return int64(ndx.Items[blockID].size), nil
}
// DeleteBlock marks the given blockID as deleted.
//
// NOTE: To avoid race conditions only blocks that cannot be possibly re-created
// should ever be deleted. That means that contents of such blocks should include some element
// of randomness or a contemporaneous timestamp that will never reappear.
func (bm *Manager) DeleteBlock(blockID string) error {
	bm.lock()
	defer bm.unlock()

	// Make sure all pack indexes are loaded into memory before mutating them.
	if err := bm.ensurePackIndexesLoaded(); err != nil {
		return err
	}

	// delete from all indexes
	for _, m := range bm.groupToBlockToIndex {
		delete(m, blockID)
	}

	// Also drop the block from any pack index that is still open (pending),
	// so it is not written out on the next flush.
	for _, m := range bm.openPackGroups {
		if ndx := m.currentPackIndex; ndx != nil {
			delete(ndx.Items, blockID)
		}
	}

	// Record the deletion in the unpacked ("") group's current index so the
	// tombstone is persisted on flush and applied when indexes are merged
	// during dedupeBlockIDsAndIndex on load.
	g := bm.ensurePackGroupLocked("", true)
	g.currentPackIndex.DeletedItems = append(g.currentPackIndex.DeletedItems, blockID)

	return nil
}
func (bm *Manager) registerUnpackedBlock(packGroupID string, blockID string, dataLength int64) error {
bm.lock()
defer bm.unlock()
@@ -210,7 +239,9 @@ func (bm *Manager) ensurePackGroupLocked(packGroup string, unpacked bool) *packI
func (bm *Manager) flushPackIndexesLocked() error {
if len(bm.pendingPackIndexes) > 0 {
log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes))
if false {
log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes))
}
if _, err := bm.writePackIndexes(bm.pendingPackIndexes); err != nil {
return err
}
@@ -270,7 +301,7 @@ func (bm *Manager) finishPackLocked(g *packInfo) error {
g.currentPackIndex.PackBlockID = blockID
}
if len(g.currentPackIndex.Items) > 0 {
if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 {
bm.pendingPackIndexes = append(bm.pendingPackIndexes, g.currentPackIndex)
}
g.currentPackData = g.currentPackData[:0]
@@ -384,7 +415,9 @@ func (bm *Manager) ensurePackIndexesLoaded() error {
bm.groupToBlockToIndex = dedupeBlockIDsAndIndex(merged)
log.Printf("loaded %v indexes of %v blocks in %v", len(merged), len(bm.groupToBlockToIndex), time.Since(t0))
if false {
log.Printf("loaded %v indexes of %v blocks in %v", len(merged), len(bm.groupToBlockToIndex), time.Since(t0))
}
return nil
}
@@ -413,6 +446,15 @@ func dedupeBlockIDsAndIndex(ndx packIndexes) map[string]map[string]*packIndex {
}
}
// Remove deleted items from all groups.
for _, pck := range ndx {
for _, di := range pck.DeletedItems {
for _, m := range pi {
delete(m, di)
}
}
}
return pi
}
@@ -857,6 +899,10 @@ func (bm *Manager) findIndexForBlockLocked(blockID string) *packIndex {
}
func (bm *Manager) blockInfoLocked(blockID string) (Info, error) {
if strings.HasPrefix(blockID, packBlockPrefix) {
return Info{}, nil
}
bm.assertLocked()
ndx := bm.findIndexForBlockLocked(blockID)
@@ -879,9 +925,7 @@ func (bm *Manager) getBlockInternalLocked(blockID string) ([]byte, error) {
s, err := bm.blockInfoLocked(blockID)
if err != nil {
if err != storage.ErrBlockNotFound {
return nil, err
}
return nil, err
}
var payload []byte

View File

@@ -522,6 +522,26 @@ func TestBlockManagerConcurrency(t *testing.T) {
}
}
// TestDeleteBlock verifies that DeleteBlock works both for a block that was
// already flushed to storage (block1) and for one still pending in an open
// pack (block2), and that the deletions survive a Flush and a re-open of the
// manager over the same underlying storage.
func TestDeleteBlock(t *testing.T) {
	data := map[string][]byte{}
	bm := newTestBlockManager(data)

	// block1 is flushed before deletion; block2 remains pending.
	block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100))
	bm.Flush()
	block2 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(11, 100))

	if err := bm.DeleteBlock(block1); err != nil {
		t.Errorf("unable to delete block %v: %v", block1, err)
	}
	if err := bm.DeleteBlock(block2); err != nil {
		// Fixed copy-paste bug: this message previously reported block1,
		// and neither message included the underlying error.
		t.Errorf("unable to delete block %v: %v", block2, err)
	}

	verifyBlockNotFound(t, bm, block1)
	verifyBlockNotFound(t, bm, block2)
	bm.Flush()

	// Re-open over the same storage map to confirm deletions were persisted.
	bm = newTestBlockManager(data)
	verifyBlockNotFound(t, bm, block1)
	verifyBlockNotFound(t, bm, block2)
}
func newTestBlockManager(data map[string][]byte) *Manager {
st := storagetesting.NewMapStorage(data)

View File

@@ -48,10 +48,11 @@ func (o *offsetAndSize) UnmarshalJSON(b []byte) error {
}
type packIndex struct {
PackBlockID string `json:"packBlock,omitempty"`
PackGroup string `json:"packGroup,omitempty"`
CreateTime time.Time `json:"createTime"`
Items map[string]offsetAndSize `json:"items"`
PackBlockID string `json:"packBlock,omitempty"`
PackGroup string `json:"packGroup,omitempty"`
CreateTime time.Time `json:"createTime"`
Items map[string]offsetAndSize `json:"items"`
DeletedItems []string `json:"deletedItems"`
}
func loadPackIndexes(r io.Reader) (packIndexes, error) {

View File

@@ -233,46 +233,6 @@ func TestHMAC(t *testing.T) {
t.Errorf("unexpected result: %v err: %v", result.String(), err)
}
}
// TestReader verifies that an object whose payload was pre-seeded directly
// into the backing storage map can be opened by ID and read back intact.
func TestReader(t *testing.T) {
	data, repo := setupTest(t)

	storedPayload := []byte("foo\nbar")
	// Seed the fake storage with a block keyed by the raw hash portion of the
	// object ID used below.
	data["a76999788386641a3ec798554f1fe7e6"] = storedPayload

	cases := []struct {
		text    string
		payload []byte
	}{
		// "D" prefix presumably selects a direct/stored object type — the
		// remainder matches the storage key seeded above. TODO(review): confirm
		// against object.ParseID.
		{"Da76999788386641a3ec798554f1fe7e6", storedPayload},
	}

	for _, c := range cases {
		objectID, err := object.ParseID(c.text)
		if err != nil {
			t.Errorf("cannot parse object ID: %v", err)
			continue
		}

		reader, err := repo.Objects.Open(objectID)
		if err != nil {
			t.Errorf("cannot create reader for %v: %v", objectID, err)
			continue
		}

		d, err := ioutil.ReadAll(reader)
		if err != nil {
			t.Errorf("cannot read all data for %v: %v", objectID, err)
			continue
		}
		// The payload read through the object layer must match what was stored.
		if !bytes.Equal(d, c.payload) {
			t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d)
			continue
		}
	}
}
func TestMalformedStoredData(t *testing.T) {
data, repo := setupTest(t)