diff --git a/storage/caching/cache_entry.go b/storage/caching/cache_entry.go index cbabba864..ddc153af0 100644 --- a/storage/caching/cache_entry.go +++ b/storage/caching/cache_entry.go @@ -4,6 +4,8 @@ "bytes" "encoding/binary" "errors" + "fmt" + "time" ) const ( @@ -25,6 +27,18 @@ func (e *blockCacheEntry) isKnownSize() bool { return e.size != sizeUnknown } +func (e blockCacheEntry) GoString() string { + ts := time.Unix(e.accessTime/1000000000, e.accessTime%1000000000) + switch e.size { + case sizeDoesNotExists: + return fmt.Sprintf("entry[not-found;acc:%v]", ts) + case sizeUnknown: + return fmt.Sprintf("entry[exists-size-unknown;acc:%v]", ts) + default: + return fmt.Sprintf("entry[size:%v;acc:%v]", e.size, ts) + } +} + func (e *blockCacheEntry) serialize() []byte { var buf bytes.Buffer diff --git a/storage/caching/caching_storage.go b/storage/caching/caching_storage.go index 9452a7cdc..6f515df6d 100644 --- a/storage/caching/caching_storage.go +++ b/storage/caching/caching_storage.go @@ -28,6 +28,8 @@ type cachingStorage struct { cache storage.Storage db *bolt.DB sizeBytes int64 + + lockMap } func defaultGetCurrentTime() int64 { @@ -100,6 +102,9 @@ func (c *cachingStorage) BlockExists(id string) (bool, error) { return entry.exists(), nil } + c.Lock(id) + defer c.Unlock(id) + exists, err := c.master.BlockExists(id) if err != nil { return false, err @@ -110,6 +115,9 @@ func (c *cachingStorage) BlockExists(id string) (bool, error) { } func (c *cachingStorage) DeleteBlock(id string) error { + c.Lock(id) + defer c.Unlock(id) + // Remove from cache first. c.cache.DeleteBlock(id) @@ -122,6 +130,9 @@ func (c *cachingStorage) DeleteBlock(id string) error { } func (c *cachingStorage) GetBlock(id string) ([]byte, error) { + c.Lock(id) + defer c.Unlock(id) + if blockCacheEntry, ok := c.getCacheEntry(id); ok { if !blockCacheEntry.exists() { return nil, storage.ErrBlockNotFound @@ -136,9 +147,10 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) { b, err := c.master.GetBlock(id) if err == nil { + l := int64(len(b)) data := storage.NewReader(bytes.NewBuffer(b)) c.cache.PutBlock(id, data, storage.PutOptionsOverwrite) - c.setCacheEntrySize(id, int64(data.Len())) + c.setCacheEntrySize(id, l) } else if err == storage.ErrBlockNotFound { c.setCacheEntrySize(id, sizeDoesNotExists) } @@ -147,6 +159,9 @@ func (c *cachingStorage) GetBlock(id string) ([]byte, error) { } func (c *cachingStorage) PutBlock(id string, data storage.ReaderWithLength, options storage.PutOptions) error { + c.Lock(id) + defer c.Unlock(id) + // Remove from cache first. c.cache.DeleteBlock(id) c.removeCacheEntry(id) @@ -166,6 +181,7 @@ func (c *cachingStorage) Flush() error { func (c *cachingStorage) Close() error { if c.db != nil { + c.evict() c.db.Close() c.db = nil } @@ -222,6 +238,7 @@ func NewWrapper(master storage.Storage, options Options) (storage.Storage, error cache: cs, db: db, sizeBytes: sizeBytes, + lockMap: newLockMap(), } return s, nil diff --git a/storage/caching/lock_map.go b/storage/caching/lock_map.go new file mode 100644 index 000000000..078e165eb --- /dev/null +++ b/storage/caching/lock_map.go @@ -0,0 +1,40 @@ +// Package caching implements a caching wrapper around another Storage. +package caching + +import "sync" + +type lockMap struct { + cond *sync.Cond + locks map[string]bool +} + +func (l *lockMap) getSync(id string) (*sync.Cond, map[string]bool) { + return l.cond, l.locks +} + +func (l *lockMap) Lock(id string) { + cv, locks := l.getSync(id) + + cv.L.Lock() + for locks[id] { + cv.Wait() + } + locks[id] = true + cv.L.Unlock() +} + +func (l *lockMap) Unlock(id string) { + cv, locks := l.getSync(id) + + cv.L.Lock() + delete(locks, id) + cv.Signal() + cv.L.Unlock() +} + +func newLockMap() lockMap { + return lockMap{ + cond: &sync.Cond{L: &sync.Mutex{}}, + locks: map[string]bool{}, + } +} diff --git a/storage/caching/lock_map_test.go b/storage/caching/lock_map_test.go new file mode 100644 index 000000000..6be937544 --- /dev/null +++ b/storage/caching/lock_map_test.go @@ -0,0 +1,45 @@ +// Package caching implements a caching wrapper around another Storage. +package caching + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestLockMap(t *testing.T) { + lockMap := newLockMap() + var wg sync.WaitGroup + + // 100 goroutines competing for 10 buckets + workers := 100 + lockedBlocks := 10 + workTime := 20 * time.Millisecond + + counters := make([]int, lockedBlocks) + + wg.Add(workers) + for i := 0; i < workers; i++ { + go func(i int) { + b := i % lockedBlocks + blockID := fmt.Sprintf("block-%v", b) + for i := 0; i < 3; i++ { + lockMap.Lock(blockID) + if counters[b] != 0 { + t.Errorf("*** multiple goroutines entered block %v", blockID) + } + counters[b]++ + time.Sleep(workTime) + counters[b]-- + if counters[b] != 0 { + t.Errorf("*** multiple goroutines entered block %v", blockID) + } + lockMap.Unlock(blockID) + } + defer wg.Done() + }(i) + } + + wg.Wait() +}