From 50398692fec6ae5fccea37500da2bb6dd0f9669a Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 11 Nov 2017 10:06:27 -0800 Subject: [PATCH] added support for block manager delete --- block/block_manager.go | 56 +++++++++++++++++++++++++++++++++---- block/block_manager_test.go | 20 +++++++++++++ block/pack_index.go | 9 +++--- repo/repository_test.go | 40 -------------------------- 4 files changed, 75 insertions(+), 50 deletions(-) diff --git a/block/block_manager.go b/block/block_manager.go index fcc97195a..83da31fdf 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -82,6 +82,35 @@ func (bm *Manager) BlockSize(blockID string) (int64, error) { return int64(ndx.Items[blockID].size), nil } +// DeleteBlock marks the given blockID as deleted. +// +// NOTE: To avoid race conditions only blocks that cannot be possibly re-created +// should ever be deleted. That means that contents of such blocks should include some element +// of randomness or a contemporaneous timestamp that will never reappear. +func (bm *Manager) DeleteBlock(blockID string) error { + bm.lock() + defer bm.unlock() + + if err := bm.ensurePackIndexesLoaded(); err != nil { + return err + } + + // delete from all indexes + for _, m := range bm.groupToBlockToIndex { + delete(m, blockID) + } + + for _, m := range bm.openPackGroups { + if ndx := m.currentPackIndex; ndx != nil { + delete(ndx.Items, blockID) + } + } + + g := bm.ensurePackGroupLocked("", true) + g.currentPackIndex.DeletedItems = append(g.currentPackIndex.DeletedItems, blockID) + return nil +} + func (bm *Manager) registerUnpackedBlock(packGroupID string, blockID string, dataLength int64) error { bm.lock() defer bm.unlock() @@ -210,7 +239,9 @@ func (bm *Manager) ensurePackGroupLocked(packGroup string, unpacked bool) *packI func (bm *Manager) flushPackIndexesLocked() error { if len(bm.pendingPackIndexes) > 0 { - log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes)) + if false { + log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes)) + } if _, err := bm.writePackIndexes(bm.pendingPackIndexes); err != nil { return err } @@ -270,7 +301,7 @@ func (bm *Manager) finishPackLocked(g *packInfo) error { g.currentPackIndex.PackBlockID = blockID } - if len(g.currentPackIndex.Items) > 0 { + if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 { bm.pendingPackIndexes = append(bm.pendingPackIndexes, g.currentPackIndex) } g.currentPackData = g.currentPackData[:0] @@ -384,7 +415,9 @@ func (bm *Manager) ensurePackIndexesLoaded() error { bm.groupToBlockToIndex = dedupeBlockIDsAndIndex(merged) - log.Printf("loaded %v indexes of %v blocks in %v", len(merged), len(bm.groupToBlockToIndex), time.Since(t0)) + if false { + log.Printf("loaded %v indexes of %v blocks in %v", len(merged), len(bm.groupToBlockToIndex), time.Since(t0)) + } return nil } @@ -413,6 +446,15 @@ func dedupeBlockIDsAndIndex(ndx packIndexes) map[string]map[string]*packIndex { } } + // Remove deleted items from all groups. + for _, pck := range ndx { + for _, di := range pck.DeletedItems { + for _, m := range pi { + delete(m, di) + } + } + } + return pi } @@ -857,6 +899,10 @@ func (bm *Manager) findIndexForBlockLocked(blockID string) *packIndex { } func (bm *Manager) blockInfoLocked(blockID string) (Info, error) { + if strings.HasPrefix(blockID, packBlockPrefix) { + return Info{}, nil + } + bm.assertLocked() ndx := bm.findIndexForBlockLocked(blockID) @@ -879,9 +925,7 @@ func (bm *Manager) getBlockInternalLocked(blockID string) ([]byte, error) { s, err := bm.blockInfoLocked(blockID) if err != nil { - if err != storage.ErrBlockNotFound { - return nil, err - } + return nil, err } var payload []byte diff --git a/block/block_manager_test.go b/block/block_manager_test.go index 2ebbbd934..97e4def96 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -522,6 +522,26 @@ func TestBlockManagerConcurrency(t *testing.T) { } } +func TestDeleteBlock(t *testing.T) { + data := map[string][]byte{} + bm := newTestBlockManager(data) + block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100)) + bm.Flush() + block2 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(11, 100)) + if err := bm.DeleteBlock(block1); err != nil { + t.Errorf("unable to delete block: %v", block1) + } + if err := bm.DeleteBlock(block2); err != nil { + t.Errorf("unable to delete block: %v", block1) + } + verifyBlockNotFound(t, bm, block1) + verifyBlockNotFound(t, bm, block2) + bm.Flush() + bm = newTestBlockManager(data) + verifyBlockNotFound(t, bm, block1) + verifyBlockNotFound(t, bm, block2) +} + func newTestBlockManager(data map[string][]byte) *Manager { st := storagetesting.NewMapStorage(data) diff --git a/block/pack_index.go b/block/pack_index.go index 4da49805c..1f98d9c4e 100644 --- a/block/pack_index.go +++ b/block/pack_index.go @@ -48,10 +48,11 @@ func (o *offsetAndSize) UnmarshalJSON(b []byte) error { } type packIndex struct { - PackBlockID string `json:"packBlock,omitempty"` - PackGroup string `json:"packGroup,omitempty"` - CreateTime time.Time `json:"createTime"` - Items map[string]offsetAndSize `json:"items"` + PackBlockID string `json:"packBlock,omitempty"` + PackGroup string `json:"packGroup,omitempty"` + CreateTime time.Time `json:"createTime"` + Items map[string]offsetAndSize `json:"items"` + DeletedItems []string `json:"deletedItems"` } func loadPackIndexes(r io.Reader) (packIndexes, error) { diff --git a/repo/repository_test.go b/repo/repository_test.go index 52eea1753..daa8447b1 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -233,46 +233,6 @@ func TestHMAC(t *testing.T) { t.Errorf("unexpected result: %v err: %v", result.String(), err) } } - -func TestReader(t *testing.T) { - data, repo := setupTest(t) - - storedPayload := []byte("foo\nbar") - data["a76999788386641a3ec798554f1fe7e6"] = storedPayload - - cases := []struct { - text string - payload []byte - }{ - {"Da76999788386641a3ec798554f1fe7e6", storedPayload}, - } - - for _, c := range cases { - objectID, err := object.ParseID(c.text) - if err != nil { - t.Errorf("cannot parse object ID: %v", err) - continue - } - - reader, err := repo.Objects.Open(objectID) - if err != nil { - t.Errorf("cannot create reader for %v: %v", objectID, err) - continue - } - - d, err := ioutil.ReadAll(reader) - if err != nil { - t.Errorf("cannot read all data for %v: %v", objectID, err) - continue - } - if !bytes.Equal(d, c.payload) { - t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d) - continue - } - - } -} - func TestMalformedStoredData(t *testing.T) { data, repo := setupTest(t)