mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-04-04 15:43:54 -04:00
588 lines
14 KiB
Go
588 lines
14 KiB
Go
package ttlcache
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/singleflight"
|
|
)
|
|
|
|
// Available eviction reasons.
|
|
const (
|
|
EvictionReasonDeleted EvictionReason = iota + 1
|
|
EvictionReasonCapacityReached
|
|
EvictionReasonExpired
|
|
)
|
|
|
|
// EvictionReason is used to specify why a certain item was
|
|
// evicted/deleted.
|
|
type EvictionReason int
|
|
|
|
// Cache is a synchronised map of items that are automatically removed
|
|
// when they expire or the capacity is reached.
|
|
type Cache[K comparable, V any] struct {
|
|
items struct {
|
|
mu sync.RWMutex
|
|
values map[K]*list.Element
|
|
|
|
// a generic doubly linked list would be more convenient
|
|
// (and more performant?). It's possible that this
|
|
// will be introduced with/in go1.19+
|
|
lru *list.List
|
|
expQueue expirationQueue[K, V]
|
|
|
|
timerCh chan time.Duration
|
|
}
|
|
|
|
metricsMu sync.RWMutex
|
|
metrics Metrics
|
|
|
|
events struct {
|
|
insertion struct {
|
|
mu sync.RWMutex
|
|
nextID uint64
|
|
fns map[uint64]func(*Item[K, V])
|
|
}
|
|
eviction struct {
|
|
mu sync.RWMutex
|
|
nextID uint64
|
|
fns map[uint64]func(EvictionReason, *Item[K, V])
|
|
}
|
|
}
|
|
|
|
stopCh chan struct{}
|
|
options options[K, V]
|
|
}
|
|
|
|
// New creates a new instance of cache.
|
|
func New[K comparable, V any](opts ...Option[K, V]) *Cache[K, V] {
|
|
c := &Cache[K, V]{
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
c.items.values = make(map[K]*list.Element)
|
|
c.items.lru = list.New()
|
|
c.items.expQueue = newExpirationQueue[K, V]()
|
|
c.items.timerCh = make(chan time.Duration, 1) // buffer is important
|
|
c.events.insertion.fns = make(map[uint64]func(*Item[K, V]))
|
|
c.events.eviction.fns = make(map[uint64]func(EvictionReason, *Item[K, V]))
|
|
|
|
applyOptions(&c.options, opts...)
|
|
|
|
return c
|
|
}
|
|
|
|
// updateExpirations updates the expiration queue and notifies
|
|
// the cache auto cleaner if needed.
|
|
// Not concurrently safe.
|
|
func (c *Cache[K, V]) updateExpirations(fresh bool, elem *list.Element) {
|
|
var oldExpiresAt time.Time
|
|
|
|
if !c.items.expQueue.isEmpty() {
|
|
oldExpiresAt = c.items.expQueue[0].Value.(*Item[K, V]).expiresAt
|
|
}
|
|
|
|
if fresh {
|
|
c.items.expQueue.push(elem)
|
|
} else {
|
|
c.items.expQueue.update(elem)
|
|
}
|
|
|
|
newExpiresAt := c.items.expQueue[0].Value.(*Item[K, V]).expiresAt
|
|
|
|
// check if the closest/soonest expiration timestamp changed
|
|
if newExpiresAt.IsZero() || (!oldExpiresAt.IsZero() && !newExpiresAt.Before(oldExpiresAt)) {
|
|
return
|
|
}
|
|
|
|
d := time.Until(newExpiresAt)
|
|
|
|
// It's possible that the auto cleaner isn't active or
|
|
// is busy, so we need to drain the channel before
|
|
// sending a new value.
|
|
// Also, since this method is called after locking the items' mutex,
|
|
// we can be sure that there is no other concurrent call of this
|
|
// method
|
|
if len(c.items.timerCh) > 0 {
|
|
// we need to drain this channel in a select with a default
|
|
// case because it's possible that the auto cleaner
|
|
// read this channel just after we entered this if
|
|
select {
|
|
case d1 := <-c.items.timerCh:
|
|
if d1 < d {
|
|
d = d1
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
|
|
// since the channel has a size 1 buffer, we can be sure
|
|
// that the line below won't block (we can't overfill the buffer
|
|
// because we just drained it)
|
|
c.items.timerCh <- d
|
|
}
|
|
|
|
// set creates a new item, adds it to the cache and then returns it.
|
|
// Not concurrently safe.
|
|
func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] {
|
|
if ttl == DefaultTTL {
|
|
ttl = c.options.ttl
|
|
}
|
|
|
|
elem := c.get(key, false)
|
|
if elem != nil {
|
|
// update/overwrite an existing item
|
|
item := elem.Value.(*Item[K, V])
|
|
item.update(value, ttl)
|
|
c.updateExpirations(false, elem)
|
|
|
|
return item
|
|
}
|
|
|
|
if c.options.capacity != 0 && uint64(len(c.items.values)) >= c.options.capacity {
|
|
// delete the oldest item
|
|
c.evict(EvictionReasonCapacityReached, c.items.lru.Back())
|
|
}
|
|
|
|
// create a new item
|
|
item := newItem(key, value, ttl)
|
|
elem = c.items.lru.PushFront(item)
|
|
c.items.values[key] = elem
|
|
c.updateExpirations(true, elem)
|
|
|
|
c.metricsMu.Lock()
|
|
c.metrics.Insertions++
|
|
c.metricsMu.Unlock()
|
|
|
|
c.events.insertion.mu.RLock()
|
|
for _, fn := range c.events.insertion.fns {
|
|
fn(item)
|
|
}
|
|
c.events.insertion.mu.RUnlock()
|
|
|
|
return item
|
|
}
|
|
|
|
// get retrieves an item from the cache and extends its expiration
|
|
// time if 'touch' is set to true.
|
|
// It returns nil if the item is not found or is expired.
|
|
// Not concurrently safe.
|
|
func (c *Cache[K, V]) get(key K, touch bool) *list.Element {
|
|
elem := c.items.values[key]
|
|
if elem == nil {
|
|
return nil
|
|
}
|
|
|
|
item := elem.Value.(*Item[K, V])
|
|
if item.isExpiredUnsafe() {
|
|
return nil
|
|
}
|
|
|
|
c.items.lru.MoveToFront(elem)
|
|
|
|
if touch && item.ttl > 0 {
|
|
item.touch()
|
|
c.updateExpirations(false, elem)
|
|
}
|
|
|
|
return elem
|
|
}
|
|
|
|
// evict deletes items from the cache.
|
|
// If no items are provided, all currently present cache items
|
|
// are evicted.
|
|
// Not concurrently safe.
|
|
func (c *Cache[K, V]) evict(reason EvictionReason, elems ...*list.Element) {
|
|
if len(elems) > 0 {
|
|
c.metricsMu.Lock()
|
|
c.metrics.Evictions += uint64(len(elems))
|
|
c.metricsMu.Unlock()
|
|
|
|
c.events.eviction.mu.RLock()
|
|
for i := range elems {
|
|
item := elems[i].Value.(*Item[K, V])
|
|
delete(c.items.values, item.key)
|
|
c.items.lru.Remove(elems[i])
|
|
c.items.expQueue.remove(elems[i])
|
|
|
|
for _, fn := range c.events.eviction.fns {
|
|
fn(reason, item)
|
|
}
|
|
}
|
|
c.events.eviction.mu.RUnlock()
|
|
|
|
return
|
|
}
|
|
|
|
c.metricsMu.Lock()
|
|
c.metrics.Evictions += uint64(len(c.items.values))
|
|
c.metricsMu.Unlock()
|
|
|
|
c.events.eviction.mu.RLock()
|
|
for _, elem := range c.items.values {
|
|
item := elem.Value.(*Item[K, V])
|
|
|
|
for _, fn := range c.events.eviction.fns {
|
|
fn(reason, item)
|
|
}
|
|
}
|
|
c.events.eviction.mu.RUnlock()
|
|
|
|
c.items.values = make(map[K]*list.Element)
|
|
c.items.lru.Init()
|
|
c.items.expQueue = newExpirationQueue[K, V]()
|
|
}
|
|
|
|
// Set creates a new item from the provided key and value, adds
|
|
// it to the cache and then returns it. If an item associated with the
|
|
// provided key already exists, the new item overwrites the existing one.
|
|
func (c *Cache[K, V]) Set(key K, value V, ttl time.Duration) *Item[K, V] {
|
|
c.items.mu.Lock()
|
|
defer c.items.mu.Unlock()
|
|
|
|
return c.set(key, value, ttl)
|
|
}
|
|
|
|
// Get retrieves an item from the cache by the provided key.
|
|
// Unless this is disabled, it also extends/touches an item's
|
|
// expiration timestamp on successful retrieval.
|
|
// If the item is not found, a nil value is returned.
|
|
func (c *Cache[K, V]) Get(key K, opts ...Option[K, V]) *Item[K, V] {
|
|
getOpts := options[K, V]{
|
|
loader: c.options.loader,
|
|
disableTouchOnHit: c.options.disableTouchOnHit,
|
|
}
|
|
|
|
applyOptions(&getOpts, opts...)
|
|
|
|
c.items.mu.Lock()
|
|
elem := c.get(key, !getOpts.disableTouchOnHit)
|
|
c.items.mu.Unlock()
|
|
|
|
if elem == nil {
|
|
c.metricsMu.Lock()
|
|
c.metrics.Misses++
|
|
c.metricsMu.Unlock()
|
|
|
|
if getOpts.loader != nil {
|
|
return getOpts.loader.Load(c, key)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
c.metricsMu.Lock()
|
|
c.metrics.Hits++
|
|
c.metricsMu.Unlock()
|
|
|
|
return elem.Value.(*Item[K, V])
|
|
}
|
|
|
|
// Delete deletes an item from the cache. If the item associated with
|
|
// the key is not found, the method is no-op.
|
|
func (c *Cache[K, V]) Delete(key K) {
|
|
c.items.mu.Lock()
|
|
defer c.items.mu.Unlock()
|
|
|
|
elem := c.items.values[key]
|
|
if elem == nil {
|
|
return
|
|
}
|
|
|
|
c.evict(EvictionReasonDeleted, elem)
|
|
}
|
|
|
|
// DeleteAll deletes all items from the cache.
|
|
func (c *Cache[K, V]) DeleteAll() {
|
|
c.items.mu.Lock()
|
|
c.evict(EvictionReasonDeleted)
|
|
c.items.mu.Unlock()
|
|
}
|
|
|
|
// DeleteExpired deletes all expired items from the cache.
|
|
func (c *Cache[K, V]) DeleteExpired() {
|
|
c.items.mu.Lock()
|
|
defer c.items.mu.Unlock()
|
|
|
|
if c.items.expQueue.isEmpty() {
|
|
return
|
|
}
|
|
|
|
e := c.items.expQueue[0]
|
|
for e.Value.(*Item[K, V]).isExpiredUnsafe() {
|
|
c.evict(EvictionReasonExpired, e)
|
|
|
|
if c.items.expQueue.isEmpty() {
|
|
break
|
|
}
|
|
|
|
// expiration queue has a new root
|
|
e = c.items.expQueue[0]
|
|
}
|
|
}
|
|
|
|
// Touch simulates an item's retrieval without actually returning it.
|
|
// Its main purpose is to extend an item's expiration timestamp.
|
|
// If the item is not found, the method is no-op.
|
|
func (c *Cache[K, V]) Touch(key K) {
|
|
c.items.mu.Lock()
|
|
c.get(key, true)
|
|
c.items.mu.Unlock()
|
|
}
|
|
|
|
// Len returns the number of items in the cache.
|
|
func (c *Cache[K, V]) Len() int {
|
|
c.items.mu.RLock()
|
|
defer c.items.mu.RUnlock()
|
|
|
|
return len(c.items.values)
|
|
}
|
|
|
|
// Keys returns all keys currently present in the cache.
|
|
func (c *Cache[K, V]) Keys() []K {
|
|
c.items.mu.RLock()
|
|
defer c.items.mu.RUnlock()
|
|
|
|
res := make([]K, 0, len(c.items.values))
|
|
for k := range c.items.values {
|
|
res = append(res, k)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
// Items returns a copy of all items in the cache.
|
|
// It does not update any expiration timestamps.
|
|
func (c *Cache[K, V]) Items() map[K]*Item[K, V] {
|
|
c.items.mu.RLock()
|
|
defer c.items.mu.RUnlock()
|
|
|
|
items := make(map[K]*Item[K, V], len(c.items.values))
|
|
for k := range c.items.values {
|
|
item := c.get(k, false)
|
|
if item != nil {
|
|
items[k] = item.Value.(*Item[K, V])
|
|
}
|
|
}
|
|
|
|
return items
|
|
}
|
|
|
|
// Metrics returns the metrics of the cache.
|
|
func (c *Cache[K, V]) Metrics() Metrics {
|
|
c.metricsMu.RLock()
|
|
defer c.metricsMu.RUnlock()
|
|
|
|
return c.metrics
|
|
}
|
|
|
|
// Start starts an automatic cleanup process that
|
|
// periodically deletes expired items.
|
|
// It blocks until Stop is called.
|
|
func (c *Cache[K, V]) Start() {
|
|
waitDur := func() time.Duration {
|
|
c.items.mu.RLock()
|
|
defer c.items.mu.RUnlock()
|
|
|
|
if !c.items.expQueue.isEmpty() &&
|
|
!c.items.expQueue[0].Value.(*Item[K, V]).expiresAt.IsZero() {
|
|
d := time.Until(c.items.expQueue[0].Value.(*Item[K, V]).expiresAt)
|
|
if d <= 0 {
|
|
// execute immediately
|
|
return time.Microsecond
|
|
}
|
|
|
|
return d
|
|
}
|
|
|
|
if c.options.ttl > 0 {
|
|
return c.options.ttl
|
|
}
|
|
|
|
return time.Hour
|
|
}
|
|
|
|
timer := time.NewTimer(waitDur())
|
|
stop := func() {
|
|
if !timer.Stop() {
|
|
// drain the timer chan
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
defer stop()
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
return
|
|
case d := <-c.items.timerCh:
|
|
stop()
|
|
timer.Reset(d)
|
|
case <-timer.C:
|
|
c.DeleteExpired()
|
|
stop()
|
|
timer.Reset(waitDur())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop stops the automatic cleanup process.
|
|
// It blocks until the cleanup process exits.
|
|
func (c *Cache[K, V]) Stop() {
|
|
c.stopCh <- struct{}{}
|
|
}
|
|
|
|
// OnInsertion adds the provided function to be executed when
|
|
// a new item is inserted into the cache. The function is executed
|
|
// on a separate goroutine and does not block the flow of the cache
|
|
// manager.
|
|
// The returned function may be called to delete the subscription function
|
|
// from the list of insertion subscribers.
|
|
// When the returned function is called, it blocks until all instances of
|
|
// the same subscription function return. A context is used to notify the
|
|
// subscription function when the returned/deletion function is called.
|
|
func (c *Cache[K, V]) OnInsertion(fn func(context.Context, *Item[K, V])) func() {
|
|
var (
|
|
wg sync.WaitGroup
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
)
|
|
|
|
c.events.insertion.mu.Lock()
|
|
id := c.events.insertion.nextID
|
|
c.events.insertion.fns[id] = func(item *Item[K, V]) {
|
|
wg.Add(1)
|
|
go func() {
|
|
fn(ctx, item)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
c.events.insertion.nextID++
|
|
c.events.insertion.mu.Unlock()
|
|
|
|
return func() {
|
|
cancel()
|
|
|
|
c.events.insertion.mu.Lock()
|
|
delete(c.events.insertion.fns, id)
|
|
c.events.insertion.mu.Unlock()
|
|
|
|
wg.Wait()
|
|
}
|
|
}
|
|
|
|
// OnEviction adds the provided function to be executed when
|
|
// an item is evicted/deleted from the cache. The function is executed
|
|
// on a separate goroutine and does not block the flow of the cache
|
|
// manager.
|
|
// The returned function may be called to delete the subscription function
|
|
// from the list of eviction subscribers.
|
|
// When the returned function is called, it blocks until all instances of
|
|
// the same subscription function return. A context is used to notify the
|
|
// subscription function when the returned/deletion function is called.
|
|
func (c *Cache[K, V]) OnEviction(fn func(context.Context, EvictionReason, *Item[K, V])) func() {
|
|
var (
|
|
wg sync.WaitGroup
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
)
|
|
|
|
c.events.eviction.mu.Lock()
|
|
id := c.events.eviction.nextID
|
|
c.events.eviction.fns[id] = func(r EvictionReason, item *Item[K, V]) {
|
|
wg.Add(1)
|
|
go func() {
|
|
fn(ctx, r, item)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
c.events.eviction.nextID++
|
|
c.events.eviction.mu.Unlock()
|
|
|
|
return func() {
|
|
cancel()
|
|
|
|
c.events.eviction.mu.Lock()
|
|
delete(c.events.eviction.fns, id)
|
|
c.events.eviction.mu.Unlock()
|
|
|
|
wg.Wait()
|
|
}
|
|
}
|
|
|
|
// Loader is an interface that handles missing data loading.
|
|
type Loader[K comparable, V any] interface {
|
|
// Load should execute a custom item retrieval logic and
|
|
// return the item that is associated with the key.
|
|
// It should return nil if the item is not found/valid.
|
|
// The method is allowed to fetch data from the cache instance
|
|
// or update it for future use.
|
|
Load(c *Cache[K, V], key K) *Item[K, V]
|
|
}
|
|
|
|
// LoaderFunc type is an adapter that allows the use of ordinary
|
|
// functions as data loaders.
|
|
type LoaderFunc[K comparable, V any] func(*Cache[K, V], K) *Item[K, V]
|
|
|
|
// Load executes a custom item retrieval logic and returns the item that
|
|
// is associated with the key.
|
|
// It returns nil if the item is not found/valid.
|
|
func (l LoaderFunc[K, V]) Load(c *Cache[K, V], key K) *Item[K, V] {
|
|
return l(c, key)
|
|
}
|
|
|
|
// SuppressedLoader wraps another Loader and suppresses duplicate
|
|
// calls to its Load method.
|
|
type SuppressedLoader[K comparable, V any] struct {
|
|
loader Loader[K, V]
|
|
group *singleflight.Group
|
|
}
|
|
|
|
// NewSuppressedLoader creates a new instance of suppressed loader.
|
|
// If the group parameter is nil, a newly created instance of
|
|
// *singleflight.Group is used.
|
|
func NewSuppressedLoader[K comparable, V any](loader Loader[K, V], group *singleflight.Group) *SuppressedLoader[K, V] {
|
|
if group == nil {
|
|
group = &singleflight.Group{}
|
|
}
|
|
|
|
return &SuppressedLoader[K, V]{
|
|
loader: loader,
|
|
group: group,
|
|
}
|
|
}
|
|
|
|
// Load executes a custom item retrieval logic and returns the item that
|
|
// is associated with the key.
|
|
// It returns nil if the item is not found/valid.
|
|
// It also ensures that only one execution of the wrapped Loader's Load
|
|
// method is in-flight for a given key at a time.
|
|
func (l *SuppressedLoader[K, V]) Load(c *Cache[K, V], key K) *Item[K, V] {
|
|
// there should be a better/generic way to create a
|
|
// singleflight Group's key. It's possible that a generic
|
|
// singleflight.Group will be introduced with/in go1.19+
|
|
strKey := fmt.Sprint(key)
|
|
|
|
// the error can be discarded since the singleflight.Group
|
|
// itself does not return any of its errors, it returns
|
|
// the error that we return ourselves in the func below, which
|
|
// is also nil
|
|
res, _, _ := l.group.Do(strKey, func() (interface{}, error) {
|
|
item := l.loader.Load(c, key)
|
|
if item == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return item, nil
|
|
})
|
|
if res == nil {
|
|
return nil
|
|
}
|
|
|
|
return res.(*Item[K, V])
|
|
}
|