additional test for race involving blockmanager.delete

This commit is contained in:
Jarek Kowalski
2017-11-12 20:43:33 -08:00
parent c0d71714e1
commit 585b6ddcc1

View File

@@ -544,6 +544,65 @@ func TestDeleteBlock(t *testing.T) {
verifyBlockNotFound(t, bm, block2)
}
func TestDeleteAndRecreate(t *testing.T) {
// simulate race between delete/recreate and delete
// delete happens at t0+10, recreate at t0+20 and second delete time is parameterized.
// depending on it, the second delete results will be visible.
cases := []struct {
desc string
deletionTime time.Time
isVisible bool
}{
{"deleted before delete and-recreate", fakeTime.Add(5), true},
{"deleted after delete and recreate", fakeTime.Add(25), false},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
data := map[string][]byte{}
// write a block
bm := newTestBlockManager(data)
setFakeTimeWithAutoAdvance(bm, fakeTime, 1)
block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100))
bm.Flush()
// delete but at given timestamp but don't commit yet.
bm0 := newTestBlockManager(data)
setFakeTimeWithAutoAdvance(bm0, tc.deletionTime, 1)
bm0.DeleteBlock(block1)
// delete it at t0+10
bm1 := newTestBlockManager(data)
setFakeTimeWithAutoAdvance(bm1, fakeTime.Add(10), 1)
verifyBlock(t, bm1, block1, seededRandomData(10, 100))
bm1.DeleteBlock(block1)
bm1.Flush()
// recreate at t0+20
bm2 := newTestBlockManager(data)
setFakeTimeWithAutoAdvance(bm2, fakeTime.Add(20), 1)
block2 := writeBlockAndVerify(t, bm2, "some-group", seededRandomData(10, 100))
bm2.Flush()
// commit deletion from bm0 (t0+5)
bm0.Flush()
if block1 != block2 {
t.Errorf("got invalid block %v, expected %v", block2, block1)
}
bm3 := newTestBlockManager(data)
if tc.isVisible {
verifyBlock(t, bm3, block1, seededRandomData(10, 100))
} else {
verifyBlockNotFound(t, bm3, block1)
}
})
}
}
func newTestBlockManager(data map[string][]byte) *Manager {
st := storagetesting.NewMapStorage(data)
@@ -594,6 +653,7 @@ func verifyBlock(t *testing.T, bm *Manager, blockID string, b []byte) {
b2, err := bm.GetBlock(blockID)
if err != nil {
t.Errorf("unable to read block %q: %v", blockID, err)
return
}
if got, want := b2, b; !reflect.DeepEqual(got, want) {