Files
kopia/internal/cache/persistent_lru_cache.go
Jarek Kowalski daa62de3e4 chore(ci): added checklocks static analyzer (#1838)
From https://github.com/google/gvisor/tree/master/tools/checklocks

This will perform static verification that we're using
`sync.Mutex`, `sync.RWMutex` and `atomic` correctly to guard access
to certain fields.

This was mostly just a matter of adding annotations to indicate which
fields are guarded by which mutex.

In a handful of places the code had to be refactored to allow static
analyzer to do its job better or to not be confused by some
constructs.

In one place this actually uncovered a bug where a function was not
releasing a lock properly in an error case.

The check is part of `make lint` but can also be invoked by
`make check-locks`.
2022-03-19 22:42:59 -07:00

358 lines
8.9 KiB
Go

// Package cache implements durable on-disk cache with LRU expiration.
package cache
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.Module("cache")
const (
// DefaultSweepFrequency is how frequently the contents of cache are sweeped to remove excess data.
DefaultSweepFrequency = 1 * time.Minute
// DefaultTouchThreshold specifies the resolution of timestamps used to determine which cache items
// to expire. This helps cache storage writes on frequently accessed items.
DefaultTouchThreshold = 10 * time.Minute
// Size of the mutex cache LRU.
// In case a mutex is evicted of the cache, the impact will be some redundant read,
// which given the size should be extremely rare.
mutexCacheSize = 10000
)
// PersistentCache provides persistent on-disk cache.
type PersistentCache struct {
// +checkatomic
anyChange int32
cacheStorage Storage
storageProtection StorageProtection
sweep SweepSettings
description string
periodicSweepRunning sync.WaitGroup
periodicSweepClosed chan struct{}
mutexCache *lru.Cache
}
// CacheStorage returns cache storage.
func (c *PersistentCache) CacheStorage() Storage {
return c.cacheStorage
}
// GetFetchingMutex returns a RWMutex used to lock a blob or content during loading.
func (c *PersistentCache) GetFetchingMutex(key string) *sync.RWMutex {
if v, ok := c.mutexCache.Get(key); ok {
// nolint:forcetypeassert
return v.(*sync.RWMutex)
}
newVal := &sync.RWMutex{}
if prevVal, ok, _ := c.mutexCache.PeekOrAdd(key, newVal); ok {
// nolint:forcetypeassert
return prevVal.(*sync.RWMutex)
}
return newVal
}
// GetOrLoad is utility function gets the provided item from the cache or invokes the provided fetch function.
// The function also appends and verifies HMAC checksums using provided secret on all cached items to ensure data integrity.
func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func(output *gather.WriteBuffer) error, output *gather.WriteBuffer) error {
if c == nil {
// special case - also works on non-initialized cache pointer.
return fetch(output)
}
if c.GetFull(ctx, key, output) {
return nil
}
output.Reset()
mut := c.GetFetchingMutex(key)
mut.Lock()
defer mut.Unlock()
// check again while holding the mutex
if c.GetFull(ctx, key, output) {
return nil
}
if err := fetch(output); err != nil {
reportMissError()
return err
}
reportMissBytes(int64(output.Length()))
c.Put(ctx, key, output.Bytes())
return nil
}
// GetFull fetches the contents of a full blob. Returns false if not found.
func (c *PersistentCache) GetFull(ctx context.Context, key string, output *gather.WriteBuffer) bool {
return c.GetPartial(ctx, key, 0, -1, output)
}
// GetPartial fetches the contents of a cached blob when (length < 0) or a subset of it (when length >= 0).
// returns false if not found.
func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, length int64, output *gather.WriteBuffer) bool {
if c == nil {
return false
}
var tmp gather.WriteBuffer
defer tmp.Close()
if err := c.cacheStorage.GetBlob(ctx, blob.ID(key), offset, length, &tmp); err == nil {
prot := c.storageProtection
if length >= 0 {
// only full items have protection.
prot = nullStorageProtection{}
}
if err := prot.Verify(key, tmp.Bytes(), output); err == nil {
// cache hit
reportHitBytes(int64(output.Length()))
// cache hit
c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) //nolint:errcheck
return true
}
// delete invalid blob
reportMalformedData()
if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err)
}
}
// cache miss
l := length
if l < 0 {
l = 0
}
reportMissBytes(l)
return false
}
// Put adds the provided key-value pair to the cache.
func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes) {
if c == nil {
return
}
atomic.StoreInt32(&c.anyChange, 1)
var protected gather.WriteBuffer
defer protected.Close()
c.storageProtection.Protect(key, data, &protected)
if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{}); err != nil {
reportStoreError()
log(ctx).Errorf("unable to add %v to %v: %v", key, c.description, err)
}
}
// Close closes the instance of persistent cache possibly waiting for at least one sweep to complete.
func (c *PersistentCache) Close(ctx context.Context) {
if c == nil {
return
}
close(c.periodicSweepClosed)
c.periodicSweepRunning.Wait()
// if we added anything to the cache in this sesion, run one last sweep before shutting down.
if atomic.LoadInt32(&c.anyChange) == 1 {
if err := c.sweepDirectory(ctx); err != nil {
log(ctx).Errorf("error during final sweep of the %v: %v", c.description, err)
}
}
}
func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) {
defer c.periodicSweepRunning.Done()
for {
select {
case <-c.periodicSweepClosed:
return
case <-time.After(c.sweep.SweepFrequency):
if err := c.sweepDirectory(ctx); err != nil {
log(ctx).Errorf("error during periodic sweep of %v: %v", c.description, err)
}
}
}
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata.
type contentMetadataHeap []blob.Metadata
func (h contentMetadataHeap) Len() int { return len(h) }
func (h contentMetadataHeap) Less(i, j int) bool {
return h[i].Timestamp.Before(h[j].Timestamp)
}
func (h contentMetadataHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *contentMetadataHeap) Push(x interface{}) {
*h = append(*h, x.(blob.Metadata)) // nolint:forcetypeassert
}
func (h *contentMetadataHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
timer := timetrack.StartTimer()
var h contentMetadataHeap
var (
totalRetainedSize int64
tooRecentBytes int64
tooRecentCount int
)
err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
// ignore items below minimal age.
if age := clock.Now().Sub(it.Timestamp); age < c.sweep.MinSweepAge {
tooRecentCount++
tooRecentBytes += it.Length
return nil
}
heap.Push(&h, it)
totalRetainedSize += it.Length
if totalRetainedSize > c.sweep.MaxSizeBytes {
oldest := heap.Pop(&h).(blob.Metadata) //nolint:forcetypeassert
if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr)
} else {
totalRetainedSize -= oldest.Length
}
}
return nil
})
if err != nil {
return errors.Wrapf(err, "error listing %v", c.description)
}
dur := timer.Elapsed()
const hundredPercent = 100
inUsePercent := int64(hundredPercent)
if c.sweep.MaxSizeBytes != 0 {
inUsePercent = hundredPercent * totalRetainedSize / c.sweep.MaxSizeBytes
}
log(ctx).Debugw(
"finished sweeping",
"cache", c.description,
"duration", dur,
"totalRetainedSize", totalRetainedSize,
"tooRecentBytes", tooRecentBytes,
"tooRecentCount", tooRecentCount,
"maxSizeBytes", c.sweep.MaxSizeBytes,
"inUsePercent", inUsePercent,
)
return nil
}
// SweepSettings encapsulates settings that impact cache item sweep/expiration.
type SweepSettings struct {
MaxSizeBytes int64
SweepFrequency time.Duration
MinSweepAge time.Duration
TouchThreshold time.Duration
}
func (s SweepSettings) applyDefaults() SweepSettings {
if s.TouchThreshold == 0 {
s.TouchThreshold = DefaultTouchThreshold
}
if s.SweepFrequency == 0 {
s.SweepFrequency = DefaultSweepFrequency
}
return s
}
// NewPersistentCache creates the persistent cache in the provided storage.
func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection StorageProtection, sweep SweepSettings) (*PersistentCache, error) {
if cacheStorage == nil {
return nil, nil
}
sweep = sweep.applyDefaults()
if storageProtection == nil {
storageProtection = NoProtection()
}
c := &PersistentCache{
cacheStorage: cacheStorage,
sweep: sweep,
periodicSweepClosed: make(chan struct{}),
description: description,
storageProtection: storageProtection,
}
c.mutexCache, _ = lru.New(mutexCacheSize)
// verify that cache storage is functional by listing from it
if _, err := c.cacheStorage.GetMetadata(ctx, "test-blob"); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
return nil, errors.Wrapf(err, "unable to open %v", c.description)
}
c.periodicSweepRunning.Add(1)
go c.sweepDirectoryPeriodically(ctxutil.Detach(ctx))
return c, nil
}