added per-block locking to caching storage

This commit is contained in:
Jarek Kowalski
2016-08-09 20:22:34 -07:00
parent 714d3deb2d
commit 7455a99f9d
4 changed files with 117 additions and 1 deletions

View File

@@ -4,6 +4,8 @@
"bytes"
"encoding/binary"
"errors"
"fmt"
"time"
)
const (
@@ -25,6 +27,18 @@ func (e *blockCacheEntry) isKnownSize() bool {
return e.size != sizeUnknown
}
func (e blockCacheEntry) GoString() string {
ts := time.Unix(e.accessTime/1000000000, e.accessTime%1000000000)
switch e.size {
case sizeDoesNotExists:
return fmt.Sprintf("entry[not-found;acc:%v]", ts)
case sizeUnknown:
return fmt.Sprintf("entry[exists-size-unknown;acc:%v]", ts)
default:
return fmt.Sprintf("entry[size:%v;acc:%v]", e.size, ts)
}
}
func (e *blockCacheEntry) serialize() []byte {
var buf bytes.Buffer

View File

@@ -28,6 +28,8 @@ type cachingStorage struct {
cache storage.Storage
db *bolt.DB
sizeBytes int64
lockMap
}
func defaultGetCurrentTime() int64 {
@@ -100,6 +102,9 @@ func (c *cachingStorage) BlockExists(id string) (bool, error) {
return entry.exists(), nil
}
c.Lock(id)
defer c.Unlock(id)
exists, err := c.master.BlockExists(id)
if err != nil {
return false, err
@@ -110,6 +115,9 @@ func (c *cachingStorage) BlockExists(id string) (bool, error) {
}
func (c *cachingStorage) DeleteBlock(id string) error {
c.Lock(id)
defer c.Unlock(id)
// Remove from cache first.
c.cache.DeleteBlock(id)
@@ -122,6 +130,9 @@ func (c *cachingStorage) DeleteBlock(id string) error {
}
func (c *cachingStorage) GetBlock(id string) ([]byte, error) {
c.Lock(id)
defer c.Unlock(id)
if blockCacheEntry, ok := c.getCacheEntry(id); ok {
if !blockCacheEntry.exists() {
return nil, storage.ErrBlockNotFound
@@ -136,9 +147,10 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) {
b, err := c.master.GetBlock(id)
if err == nil {
l := int64(len(b))
data := storage.NewReader(bytes.NewBuffer(b))
c.cache.PutBlock(id, data, storage.PutOptionsOverwrite)
c.setCacheEntrySize(id, int64(data.Len()))
c.setCacheEntrySize(id, l)
} else if err == storage.ErrBlockNotFound {
c.setCacheEntrySize(id, sizeDoesNotExists)
}
@@ -147,6 +159,9 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) {
}
func (c *cachingStorage) PutBlock(id string, data storage.ReaderWithLength, options storage.PutOptions) error {
c.Lock(id)
defer c.Unlock(id)
// Remove from cache first.
c.cache.DeleteBlock(id)
c.removeCacheEntry(id)
@@ -166,6 +181,7 @@ func (c *cachingStorage) Flush() error {
func (c *cachingStorage) Close() error {
if c.db != nil {
c.evict()
c.db.Close()
c.db = nil
}
@@ -222,6 +238,7 @@ func NewWrapper(master storage.Storage, options Options) (storage.Storage, error
cache: cs,
db: db,
sizeBytes: sizeBytes,
lockMap: newLockMap(),
}
return s, nil

View File

@@ -0,0 +1,40 @@
// Package caching implements a caching wrapper around another Storage.
package caching
import "sync"
type lockMap struct {
cond *sync.Cond
locks map[string]bool
}
func (l *lockMap) getSync(id string) (*sync.Cond, map[string]bool) {
return l.cond, l.locks
}
func (l *lockMap) Lock(id string) {
cv, locks := l.getSync(id)
cv.L.Lock()
for locks[id] {
cv.Wait()
}
locks[id] = true
cv.L.Unlock()
}
func (l *lockMap) Unlock(id string) {
cv, locks := l.getSync(id)
cv.L.Lock()
delete(locks, id)
cv.Signal()
cv.L.Unlock()
}
func newLockMap() lockMap {
return lockMap{
cond: &sync.Cond{L: &sync.Mutex{}},
locks: map[string]bool{},
}
}

View File

@@ -0,0 +1,45 @@
// Package caching implements a caching wrapper around another Storage.
package caching
import (
"fmt"
"sync"
"testing"
"time"
)
func TestLockMap(t *testing.T) {
lockMap := newLockMap()
var wg sync.WaitGroup
// 100 goroutines competing for 10 buckets
workers := 100
lockedBlocks := 10
workTime := 20 * time.Millisecond
counters := make([]int, lockedBlocks)
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(i int) {
b := i % lockedBlocks
blockID := fmt.Sprintf("block-%v", b)
for i := 0; i < 3; i++ {
lockMap.Lock(blockID)
if counters[b] != 0 {
t.Errorf("*** multiple goroutines entered block %v", blockID)
}
counters[b]++
time.Sleep(workTime)
counters[b]--
if counters[b] != 0 {
t.Errorf("*** multiple goroutines entered block %v", blockID)
}
lockMap.Unlock(blockID)
}
defer wg.Done()
}(i)
}
wg.Wait()
}