feat(repository): live cache eviction of expired BLOBs in persistent LRU content cache (#2879)

* feat(repository): live cache eviction for persistent lru content cache

* Update internal/cache/persistent_lru_cache.go

Co-authored-by: Ali Dowair <adowair@umich.edu>

* merge the mutex cache into list cache

---------

Co-authored-by: Shikhar Mall <small@kopia.io>
Co-authored-by: Ali Dowair <adowair@umich.edu>
This commit is contained in:
Shikhar Mall
2023-04-07 21:30:45 -07:00
committed by GitHub
parent 5a95fffb5f
commit 9f9309ad2a
15 changed files with 301 additions and 176 deletions

View File

@@ -165,7 +165,7 @@ func (s *mapStorage) Close(ctx context.Context) error {
return nil
}
func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -176,7 +176,7 @@ func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti
}
}
return nil
return s.keyTime[blobID], nil
}
func (s *mapStorage) ConnectionInfo() blob.ConnectionInfo {

View File

@@ -233,7 +233,7 @@ func (s *objectLockingMap) Close(ctx context.Context) error {
// delete-marker or if it does not exist then this becomes a no-op. If the
// latest version has retention parameters set then they are respected.
// Mutations are no allowed unless retention period expires.
func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold time.Duration) error {
func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold time.Duration) (time.Time, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -242,10 +242,10 @@ func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold
// no error if delete-marker or not-exists, prevent changing mtime
// of delete-markers
if errors.Is(err, blob.ErrBlobNotFound) {
return nil
return time.Time{}, nil
}
return err
return time.Time{}, err
}
n := s.timeNow()
@@ -253,7 +253,7 @@ func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold
e.mtime = n
}
return nil
return e.mtime, nil
}
// ConnectionInfo is a no-op.

View File

@@ -24,7 +24,7 @@
// Storage is the storage interface required by the cache and is implemented by the filesystem Storage.
type Storage interface {
blob.Storage
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) (time.Time, error)
}
// NewStorageOrNil returns cache.Storage backed by the provided directory.

View File

@@ -2,6 +2,7 @@
import (
"context"
"time"
"github.com/pkg/errors"
@@ -28,6 +29,7 @@ type Options struct {
HMACSecret []byte
FetchFullBlobs bool
Sweep SweepSettings
TimeNow func() time.Time
}
type contentCacheImpl struct {
@@ -61,7 +63,7 @@ func (c *contentCacheImpl) GetContent(ctx context.Context, contentID string, blo
func (c *contentCacheImpl) getContentFromFullBlob(ctx context.Context, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error {
// acquire exclusive lock
mut := c.pc.GetFetchingMutex(string(blobID))
mut := c.pc.GetFetchingMutex(blobID)
mut.Lock()
defer mut.Unlock()
@@ -112,7 +114,7 @@ func (c *contentCacheImpl) fetchBlobInternal(ctx context.Context, blobID blob.ID
func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, contentID string, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error {
// acquire shared lock on a blob, PrefetchBlob will acquire exclusive lock here.
mut := c.pc.GetFetchingMutex(string(blobID))
mut := c.pc.GetFetchingMutex(blobID)
mut.RLock()
defer mut.RUnlock()
@@ -122,7 +124,7 @@ func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context,
}
// acquire exclusive lock on the content
mut2 := c.pc.GetFetchingMutex(contentID)
mut2 := c.pc.GetFetchingMutex(blob.ID(contentID))
mut2.Lock()
defer mut2.Unlock()
@@ -159,7 +161,7 @@ func (c *contentCacheImpl) PrefetchBlob(ctx context.Context, blobID blob.ID) err
}
// acquire exclusive lock for the blob.
mut := c.pc.GetFetchingMutex(string(blobID))
mut := c.pc.GetFetchingMutex(blobID)
mut.Lock()
defer mut.Unlock()
@@ -190,7 +192,7 @@ func NewContentCache(ctx context.Context, st blob.Storage, opt Options, mr *metr
}
}
pc, err := NewPersistentCache(ctx, opt.CacheSubDir, cacheStorage, cacheprot.ChecksumProtection(opt.HMACSecret), opt.Sweep, mr)
pc, err := NewPersistentCache(ctx, opt.CacheSubDir, cacheStorage, cacheprot.ChecksumProtection(opt.HMACSecret), opt.Sweep, mr, opt.TimeNow)
if err != nil {
return nil, errors.Wrap(err, "unable to create base cache")
}

View File

@@ -25,7 +25,7 @@ func TestContentCacheForData(t *testing.T) {
Storage: cacheStorage,
HMACSecret: []byte{1, 2, 3, 4},
Sweep: cache.SweepSettings{
MaxSizeBytes: 100,
MaxSizeBytes: 150,
},
}, nil)
require.NoError(t, err)

View File

@@ -59,9 +59,9 @@ func TestCacheExpiration(t *testing.T) {
Storage: cacheStorage.(cache.Storage),
Sweep: cache.SweepSettings{
MaxSizeBytes: 10000,
SweepFrequency: 500 * time.Millisecond,
TouchThreshold: -1,
},
TimeNow: movingTimeFunc,
}, nil)
require.NoError(t, err)
@@ -80,9 +80,6 @@ func TestCacheExpiration(t *testing.T) {
err = cc.GetContent(ctx, "00000d", "content-4k", 0, -1, &tmp) // 4k
require.NoError(t, err)
// wait for a sweep
time.Sleep(2 * time.Second)
// 00000a and 00000b will be removed from cache because it's the oldest.
// to verify, let's remove content-4k from the underlying storage and make sure we can still read
// 00000c and 00000d from the cache but not 00000a nor 00000b
@@ -100,7 +97,7 @@ func TestCacheExpiration(t *testing.T) {
for _, tc := range cases {
got := cc.GetContent(ctx, tc.contentID, "content-4k", 0, -1, &tmp)
if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID:", tc.contentID) {
if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID: %v", tc.contentID) {
t.Logf("got correct error %v when reading content %v", tc.expectedError, tc.contentID)
}
}
@@ -311,6 +308,6 @@ type withoutTouchBlob struct {
blob.Storage
}
func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
return errors.Errorf("TouchBlob not implemented")
func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) {
return time.Time{}, errors.Errorf("TouchBlob not implemented")
}

View File

@@ -5,15 +5,12 @@
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/cacheprot"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/releasable"
@@ -25,34 +22,24 @@
var log = logging.Module("cache")
const (
// DefaultSweepFrequency is how frequently the contents of cache are sweeped to remove excess data.
DefaultSweepFrequency = 1 * time.Minute
// DefaultTouchThreshold specifies the resolution of timestamps used to determine which cache items
// to expire. This helps cache storage writes on frequently accessed items.
DefaultTouchThreshold = 10 * time.Minute
// Size of the mutex cache LRU.
// In case a mutex is evicted of the cache, the impact will be some redundant read,
// which given the size should be extremely rare.
mutexCacheSize = 10000
)
// PersistentCache provides persistent on-disk cache.
type PersistentCache struct {
anyChange atomic.Bool
listCacheMutex sync.Mutex
// +checklocks:listCacheMutex
listCache contentMetadataHeap
cacheStorage Storage
storageProtection cacheprot.StorageProtection
sweep SweepSettings
timeNow func() time.Time
description string
periodicSweepRunning sync.WaitGroup
periodicSweepClosed chan struct{}
mutexCache *lru.Cache
metricsStruct
}
@@ -62,25 +49,24 @@ func (c *PersistentCache) CacheStorage() Storage {
}
// GetFetchingMutex returns a RWMutex used to lock a blob or content during loading.
func (c *PersistentCache) GetFetchingMutex(key string) *sync.RWMutex {
func (c *PersistentCache) GetFetchingMutex(id blob.ID) *sync.RWMutex {
if c == nil {
// special case - also works on non-initialized cache pointer.
return &sync.RWMutex{}
}
if v, ok := c.mutexCache.Get(key); ok {
//nolint:forcetypeassert
return v.(*sync.RWMutex)
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
if _, entry := c.listCache.LookupByID(id); entry != nil {
return &entry.contentDownloadMutex
}
newVal := &sync.RWMutex{}
heap.Push(&c.listCache, blob.Metadata{BlobID: id})
if prevVal, ok, _ := c.mutexCache.PeekOrAdd(key, newVal); ok {
//nolint:forcetypeassert
return prevVal.(*sync.RWMutex)
}
_, entry := c.listCache.LookupByID(id)
return newVal
return &entry.contentDownloadMutex
}
// GetOrLoad is utility function gets the provided item from the cache or invokes the provided fetch function.
@@ -97,7 +83,7 @@ func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func(
output.Reset()
mut := c.GetFetchingMutex(key)
mut := c.GetFetchingMutex(blob.ID(key))
mut.Lock()
defer mut.Unlock()
@@ -124,6 +110,62 @@ func (c *PersistentCache) GetFull(ctx context.Context, key string, output *gathe
return c.GetPartial(ctx, key, 0, -1, output)
}
func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, length int64, output *gather.WriteBuffer) {
// cache hit
c.reportHitBytes(int64(output.Length()))
// cache hit
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
// Touching the blobs when cache is full can lead to cache never
// getting cleaned up if all the blobs fall under MinSweepAge.
//
// This can happen when the user is restoring large files (at
// comparable sizes to the cache size limitation) and MinSweepAge is
// sufficiently large. For large files which span over multiple
// blobs, every blob becomes least-recently-used.
//
// So, we'll avoid this until our cache usage drops to acceptable
// limits.
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
if c.isCacheFullLocked() {
return
}
}
// unlock for the expensive operation
c.listCacheMutex.Unlock()
mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold)
c.listCacheMutex.Lock()
if err == nil {
// insert or update the metadata
heap.Push(&c.listCache, blob.Metadata{
BlobID: blob.ID(key),
Length: length,
Timestamp: mtime,
})
}
}
func (c *PersistentCache) getPartialDeleteInvalidBlob(ctx context.Context, key string) {
// delete invalid blob
c.reportMalformedData()
if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err)
} else {
c.listCacheMutex.Lock()
if i, entry := c.listCache.LookupByID(blob.ID(key)); entry != nil {
heap.Remove(&c.listCache, i)
}
c.listCacheMutex.Unlock()
}
}
// GetPartial fetches the contents of a cached blob when (length < 0) or a subset of it (when length >= 0).
// returns false if not found.
func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, length int64, output *gather.WriteBuffer) bool {
@@ -142,21 +184,12 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le
}
if err := prot.Verify(key, tmp.Bytes(), output); err == nil {
// cache hit
c.reportHitBytes(int64(output.Length()))
// cache hit
c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) //nolint:errcheck
c.getPartialCacheHit(ctx, key, length, output)
return true
}
// delete invalid blob
c.reportMalformedData()
if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err)
}
c.getPartialDeleteInvalidBlob(ctx, key)
}
// cache miss
@@ -170,24 +203,57 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le
return false
}
func (c *PersistentCache) isCacheFullLocked() bool {
return c.listCache.DataSize() > c.sweep.MaxSizeBytes
}
// Put adds the provided key-value pair to the cache.
func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes) {
if c == nil {
return
}
c.anyChange.Store(true)
var (
protected gather.WriteBuffer
mtime time.Time
)
var protected gather.WriteBuffer
defer protected.Close()
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
// opportunistically cleanup cache before the PUT if we can
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
// Do not add more things to cache if it remains full after cleanup. We
// MUST NOT go over the specified limit for the cache space to avoid
// snapshots/restores from getting affected by the cache's storage use.
if c.isCacheFullLocked() {
return
}
}
// LOCK RELEASED for expensive operations
c.listCacheMutex.Unlock()
c.storageProtection.Protect(key, data, &protected)
if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{}); err != nil {
if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{GetModTime: &mtime}); err != nil {
c.reportStoreError()
log(ctx).Errorf("unable to add %v to %v: %v", key, c.description, err)
}
c.listCacheMutex.Lock()
// LOCK RE-ACQUIRED
c.listCache.Push(blob.Metadata{
BlobID: blob.ID(key),
Length: int64(protected.Bytes().Length()),
Timestamp: mtime,
})
c.listCacheCleanupLocked(ctx)
}
// Close closes the instance of persistent cache possibly waiting for at least one sweep to complete.
@@ -196,92 +262,136 @@ func (c *PersistentCache) Close(ctx context.Context) {
return
}
close(c.periodicSweepClosed)
c.periodicSweepRunning.Wait()
// if we added anything to the cache in this session, run one last sweep before shutting down.
if c.anyChange.Load() {
if err := c.sweepDirectory(ctx); err != nil {
log(ctx).Errorf("error during final sweep of the %v: %v", c.description, err)
}
}
releasable.Released("persistent-cache", c)
}
func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) {
defer c.periodicSweepRunning.Done()
for {
select {
case <-c.periodicSweepClosed:
return
case <-time.After(c.sweep.SweepFrequency):
if err := c.sweepDirectory(ctx); err != nil {
log(ctx).Errorf("error during periodic sweep of %v: %v", c.description, err)
}
}
}
type blobCacheEntry struct {
metadata blob.Metadata
contentDownloadMutex sync.RWMutex
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata.
type contentMetadataHeap []blob.Metadata
type contentMetadataHeap struct {
data []*blobCacheEntry
index map[blob.ID]int
dataSize int64
}
func (h contentMetadataHeap) Len() int { return len(h) }
func newContentMetadataHeap() contentMetadataHeap {
return contentMetadataHeap{index: make(map[blob.ID]int)}
}
func (h contentMetadataHeap) Len() int { return len(h.data) }
func (h contentMetadataHeap) Less(i, j int) bool {
return h[i].Timestamp.Before(h[j].Timestamp)
return h.data[i].metadata.Timestamp.Before(h.data[j].metadata.Timestamp)
}
func (h contentMetadataHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h.index[h.data[i].metadata.BlobID], h.index[h.data[j].metadata.BlobID] = h.index[h.data[j].metadata.BlobID], h.index[h.data[i].metadata.BlobID]
h.data[i], h.data[j] = h.data[j], h.data[i]
}
func (h *contentMetadataHeap) Push(x interface{}) {
*h = append(*h, x.(blob.Metadata)) //nolint:forcetypeassert
bm := x.(blob.Metadata) //nolint:forcetypeassert
if i, exists := h.index[bm.BlobID]; exists {
// only accept newer timestamps
if h.data[i].metadata.Timestamp.IsZero() || bm.Timestamp.After(h.data[i].metadata.Timestamp) {
h.dataSize += bm.Length - h.data[i].metadata.Length
h.data[i] = &blobCacheEntry{metadata: bm}
heap.Fix(h, i)
}
} else {
h.index[bm.BlobID] = len(h.data)
h.data = append(h.data, &blobCacheEntry{metadata: bm})
h.dataSize += bm.Length
}
}
func (h *contentMetadataHeap) Pop() interface{} {
old := *h
old := h.data
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
h.data = old[0 : n-1]
h.dataSize -= item.metadata.Length
delete(h.index, item.metadata.BlobID)
return item
return item.metadata
}
func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
timer := timetrack.StartTimer()
func (h *contentMetadataHeap) LookupByID(id blob.ID) (int, *blobCacheEntry) {
i, ok := h.index[id]
if !ok {
return -1, nil
}
var h contentMetadataHeap
return i, h.data[i]
}
func (h contentMetadataHeap) DataSize() int64 { return h.dataSize }
func (c *PersistentCache) listCacheCleanupLocked(ctx context.Context) {
var (
totalRetainedSize int64
tooRecentBytes int64
tooRecentCount int
unsuccessfulDeletes []blob.Metadata
unsuccessfulDeletesSize int64
now = c.timeNow()
)
err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
// ignore items below minimal age.
if age := clock.Now().Sub(it.Timestamp); age < c.sweep.MinSweepAge {
// if there are blobs pending to be deleted ...
for c.listCache.DataSize() > 0 &&
// ... and everything including what we couldn't delete is still bigger than the threshold
(c.listCache.DataSize()+unsuccessfulDeletesSize) > c.sweep.MaxSizeBytes {
oldest := heap.Pop(&c.listCache).(blob.Metadata) //nolint:forcetypeassert
// stop here if the oldest item is below the specified minimal age
if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge {
heap.Push(&c.listCache, oldest)
break
}
// unlock before the expensive operation
c.listCacheMutex.Unlock()
delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID)
c.listCacheMutex.Lock()
if delerr != nil {
log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr)
// accumulate unsuccessful deletes to be pushed back into the heap
// later so we do not attempt deleting the same blob multiple times
//
// after this we keep draining from the heap until we bring down
// c.listCache.DataSize() to zero
unsuccessfulDeletes = append(unsuccessfulDeletes, oldest)
unsuccessfulDeletesSize += oldest.Length
}
}
// put all unsuccessful deletes back into the heap
for _, m := range unsuccessfulDeletes {
heap.Push(&c.listCache, m)
}
}
func (c *PersistentCache) initialScan(ctx context.Context) error {
timer := timetrack.StartTimer()
var (
tooRecentBytes int64
tooRecentCount int
now = c.timeNow()
)
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
// count items below minimal age.
if age := now.Sub(it.Timestamp); age < c.sweep.MinSweepAge {
tooRecentCount++
tooRecentBytes += it.Length
return nil
}
heap.Push(&h, it)
totalRetainedSize += it.Length
if totalRetainedSize > c.sweep.MaxSizeBytes {
oldest := heap.Pop(&h).(blob.Metadata) //nolint:forcetypeassert
if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr)
} else {
totalRetainedSize -= oldest.Length
}
}
heap.Push(&c.listCache, it)
return nil
})
@@ -289,6 +399,10 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
return errors.Wrapf(err, "error listing %v", c.description)
}
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
}
dur := timer.Elapsed()
const hundredPercent = 100
@@ -296,14 +410,14 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
inUsePercent := int64(hundredPercent)
if c.sweep.MaxSizeBytes != 0 {
inUsePercent = hundredPercent * totalRetainedSize / c.sweep.MaxSizeBytes
inUsePercent = hundredPercent * c.listCache.DataSize() / c.sweep.MaxSizeBytes
}
log(ctx).Debugw(
"finished sweeping",
"finished initial cache scan",
"cache", c.description,
"duration", dur,
"totalRetainedSize", totalRetainedSize,
"totalRetainedSize", c.listCache.DataSize(),
"tooRecentBytes", tooRecentBytes,
"tooRecentCount", tooRecentCount,
"maxSizeBytes", c.sweep.MaxSizeBytes,
@@ -316,7 +430,6 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
// SweepSettings encapsulates settings that impact cache item sweep/expiration.
type SweepSettings struct {
MaxSizeBytes int64
SweepFrequency time.Duration
MinSweepAge time.Duration
TouchThreshold time.Duration
}
@@ -326,15 +439,11 @@ func (s SweepSettings) applyDefaults() SweepSettings {
s.TouchThreshold = DefaultTouchThreshold
}
if s.SweepFrequency == 0 {
s.SweepFrequency = DefaultSweepFrequency
}
return s
}
// NewPersistentCache creates the persistent cache in the provided storage.
func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection cacheprot.StorageProtection, sweep SweepSettings, mr *metrics.Registry) (*PersistentCache, error) {
func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection cacheprot.StorageProtection, sweep SweepSettings, mr *metrics.Registry, timeNow func() time.Time) (*PersistentCache, error) {
if cacheStorage == nil {
return nil, nil
}
@@ -346,15 +455,18 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St
}
c := &PersistentCache{
cacheStorage: cacheStorage,
sweep: sweep,
periodicSweepClosed: make(chan struct{}),
description: description,
storageProtection: storageProtection,
metricsStruct: initMetricsStruct(mr, description),
cacheStorage: cacheStorage,
sweep: sweep,
description: description,
storageProtection: storageProtection,
metricsStruct: initMetricsStruct(mr, description),
listCache: newContentMetadataHeap(),
timeNow: timeNow,
}
c.mutexCache, _ = lru.New(mutexCacheSize)
if c.timeNow == nil {
c.timeNow = clock.Now
}
// verify that cache storage is functional by listing from it
if _, err := c.cacheStorage.GetMetadata(ctx, "test-blob"); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
@@ -363,9 +475,9 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St
releasable.Created("persistent-cache", c)
c.periodicSweepRunning.Add(1)
go c.sweepDirectoryPeriodically(ctxutil.Detach(ctx))
if err := c.initialScan(ctx); err != nil {
return nil, errors.Wrapf(err, "error during initial scan of %s", c.description)
}
return c, nil
}

View File

@@ -12,6 +12,7 @@
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/cacheprot"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/fault"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
@@ -29,8 +30,7 @@ func TestPersistentLRUCache(t *testing.T) {
pc, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: maxSizeBytes,
TouchThreshold: cache.DefaultTouchThreshold,
SweepFrequency: cache.DefaultSweepFrequency,
}, nil)
}, nil, clock.Now)
require.NoError(t, err)
var tmp gather.WriteBuffer
@@ -71,8 +71,7 @@ func TestPersistentLRUCache(t *testing.T) {
pc, err = cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: maxSizeBytes,
TouchThreshold: cache.DefaultTouchThreshold,
SweepFrequency: cache.DefaultSweepFrequency,
}, nil)
}, nil, clock.Now)
require.NoError(t, err)
verifyCached(ctx, t, pc, "key1", nil)
@@ -85,8 +84,7 @@ func TestPersistentLRUCache(t *testing.T) {
pc2, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{3, 2, 1}), cache.SweepSettings{
MaxSizeBytes: maxSizeBytes,
TouchThreshold: cache.DefaultTouchThreshold,
SweepFrequency: cache.DefaultSweepFrequency,
}, nil)
}, nil, clock.Now)
require.NoError(t, err)
someError := errors.Errorf("some error")
@@ -119,8 +117,8 @@ type faultyCache struct {
*blobtesting.FaultyStorage
}
func (faultyCache) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
return nil
func (faultyCache) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) {
return time.Time{}, nil
}
func TestPersistentLRUCache_Invalid(t *testing.T) {
@@ -136,7 +134,7 @@ func TestPersistentLRUCache_Invalid(t *testing.T) {
fs.AddFault(blobtesting.MethodGetMetadata).ErrorInstead(someError)
pc, err := cache.NewPersistentCache(ctx, "test", fc, nil, cache.SweepSettings{}, nil)
pc, err := cache.NewPersistentCache(ctx, "test", fc, nil, cache.SweepSettings{}, nil, clock.Now)
require.ErrorIs(t, err, someError)
require.Nil(t, pc)
}
@@ -154,7 +152,7 @@ func TestPersistentLRUCache_GetDeletesInvalidBlob(t *testing.T) {
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil)
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: 100}, nil, clock.Now)
require.NoError(t, err)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
@@ -184,7 +182,7 @@ func TestPersistentLRUCache_PutIgnoresStorageFailure(t *testing.T) {
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil)
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil, clock.Now)
require.NoError(t, err)
fs.AddFault(blobtesting.MethodPutBlob).ErrorInstead(someError)
@@ -211,10 +209,9 @@ func TestPersistentLRUCache_SweepMinSweepAge(t *testing.T) {
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
SweepFrequency: 100 * time.Millisecond,
MaxSizeBytes: 1000,
MinSweepAge: 10 * time.Second,
}, nil)
MaxSizeBytes: 1000,
MinSweepAge: 10 * time.Second,
}, nil, clock.Now)
require.NoError(t, err)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
@@ -240,9 +237,8 @@ func TestPersistentLRUCache_SweepIgnoresErrors(t *testing.T) {
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
SweepFrequency: 100 * time.Millisecond,
MaxSizeBytes: 1000,
}, nil)
MaxSizeBytes: 1000,
}, nil, clock.Now)
require.NoError(t, err)
// ignore delete errors forever
@@ -273,10 +269,9 @@ func TestPersistentLRUCache_Sweep1(t *testing.T) {
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
SweepFrequency: 10 * time.Millisecond,
MaxSizeBytes: 1,
MinSweepAge: 0 * time.Second,
}, nil)
MaxSizeBytes: 1,
MinSweepAge: 0 * time.Second,
}, nil, clock.Now)
require.NoError(t, err)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
pc.Put(ctx, "key", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
@@ -330,7 +325,7 @@ func TestPersistentLRUCache_Defaults(t *testing.T) {
pc, err := cache.NewPersistentCache(ctx, "testing", cs, nil, cache.SweepSettings{
MaxSizeBytes: maxSizeBytes,
}, nil)
}, nil, clock.Now)
require.NoError(t, err)
defer pc.Close(ctx)

View File

@@ -279,9 +279,11 @@ func (fs *fsImpl) ReadDir(ctx context.Context, dirname string) ([]os.FileInfo, e
}
// TouchBlob updates file modification time to current time if it's sufficiently old.
func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
//nolint:wrapcheck
return retry.WithExponentialBackoffNoValue(ctx, "TouchBlob", func() error {
func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) {
var mtime time.Time
//nolint:wrapcheck,forcetypeassert
err := retry.WithExponentialBackoffNoValue(ctx, "TouchBlob", func() error {
_, path, err := fs.Storage.GetShardedPathAndFilePath(ctx, blobID)
if err != nil {
return errors.Wrap(err, "error getting sharded path")
@@ -296,17 +298,22 @@ func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti
}
n := clock.Now()
mtime = st.ModTime()
age := n.Sub(st.ModTime())
age := n.Sub(mtime)
if age < threshold {
return nil
}
mtime = n
log(ctx).Debugf("updating timestamp on %v to %v", path, n)
//nolint:wrapcheck
return osi.Chtimes(path, n, n)
}, fs.Impl.(*fsImpl).isRetriable) //nolint:forcetypeassert
}, fs.Impl.(*fsImpl).isRetriable)
return mtime, err
}
func (fs *fsStorage) ConnectionInfo() blob.ConnectionInfo {

View File

@@ -112,19 +112,23 @@ func TestFileStorageTouch(t *testing.T) {
verifyBlobTimestampOrder(t, fs, t1, t2, t3)
assertNoError(t, fs.TouchBlob(ctx, t2, 1*time.Hour)) // has no effect, all timestamps are very new
_, err = fs.TouchBlob(ctx, t2, 1*time.Hour)
assertNoError(t, err) // has no effect, all timestamps are very new
verifyBlobTimestampOrder(t, fs, t1, t2, t3)
time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution
assertNoError(t, fs.TouchBlob(ctx, t1, 0)) // moves t1 to the top of the pile
_, err = fs.TouchBlob(ctx, t1, 0)
assertNoError(t, err) // moves t1 to the top of the pile
verifyBlobTimestampOrder(t, fs, t2, t3, t1)
time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution
assertNoError(t, fs.TouchBlob(ctx, t2, 0)) // moves t2 to the top of the pile
_, err = fs.TouchBlob(ctx, t2, 0)
assertNoError(t, err) // moves t2 to the top of the pile
verifyBlobTimestampOrder(t, fs, t3, t1, t2)
time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution
assertNoError(t, fs.TouchBlob(ctx, t1, 0)) // moves t1 to the top of the pile
_, err = fs.TouchBlob(ctx, t1, 0)
assertNoError(t, err) // moves t1 to the top of the pile
verifyBlobTimestampOrder(t, fs, t3, t2, t1)
}
@@ -423,7 +427,8 @@ func TestFileStorage_TouchBlob_ErrorHandling(t *testing.T) {
osi.statRemainingErrors.Store(1)
require.NoError(t, st.(*fsStorage).TouchBlob(ctx, "someblob1234567812345678", 0))
_, err = st.(*fsStorage).TouchBlob(ctx, "someblob1234567812345678", 0)
require.NoError(t, err)
}
func TestFileStorage_Misc(t *testing.T) {

View File

@@ -22,7 +22,7 @@
var log = logging.Module("sharded") // +checklocksignore
// Impl must be implemented by underlying provided.
// Impl must be implemented by underlying provider.
type Impl interface {
GetBlobFromPath(ctx context.Context, dirPath, filePath string, offset, length int64, output blob.OutputBuffer) error
GetMetadataFromPath(ctx context.Context, dirPath, filePath string) (blob.Metadata, error)

View File

@@ -212,10 +212,15 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st
}
func (d *davStorageImpl) DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error {
return d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error {
err := d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error {
//nolint:wrapcheck
return d.cli.Remove(filePath)
}, isRetriable))
if errors.Is(err, blob.ErrBlobNotFound) {
return nil
}
return err
}
func (d *davStorage) ConnectionInfo() blob.ConnectionInfo {

View File

@@ -455,7 +455,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs", indexBlobStorage, cacheprot.ChecksumProtection(caching.HMACSecret), cache.SweepSettings{
MaxSizeBytes: metadataCacheSize,
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}, mr)
}, mr, sm.timeNow)
if err != nil {
return errors.Wrap(err, "unable to create index blob cache")
}

View File

@@ -132,7 +132,7 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
return openDirect(ctx, configFile, lc, password, options)
}
func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry) (*cache.PersistentCache, error) {
func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry, timeNow func() time.Time) (*cache.PersistentCache, error) {
opt = opt.CloneOrDefault()
cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents")
@@ -158,7 +158,7 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass
pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, cache.SweepSettings{
MaxSizeBytes: opt.MaxCacheSizeBytes,
MinSweepAge: opt.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge),
}, mr)
}, mr, timeNow)
if err != nil {
return nil, errors.Wrap(err, "unable to open persistent cache")
}
@@ -172,7 +172,7 @@ func openAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions
mr := metrics.NewRegistry()
contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr)
contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr, options.TimeNowFunc)
if err != nil {
return nil, errors.Wrap(err, "error opening content cache")
}

View File

@@ -579,11 +579,13 @@ func TestObjectWritesWithRetention(t *testing.T) {
require.NoError(t, versionedMap.ListBlobs(ctx, "", func(it blob.Metadata) error {
for _, prefix := range prefixesWithRetention {
if strings.HasPrefix(string(it.BlobID), prefix) {
require.Error(t, versionedMap.TouchBlob(ctx, it.BlobID, 0), "expected error while touching blob %s", it.BlobID)
_, err = versionedMap.TouchBlob(ctx, it.BlobID, 0)
require.Error(t, err, "expected error while touching blob %s", it.BlobID)
return nil
}
}
require.NoError(t, versionedMap.TouchBlob(ctx, it.BlobID, 0), "unexpected error while touching blob %s", it.BlobID)
_, err = versionedMap.TouchBlob(ctx, it.BlobID, 0)
require.NoError(t, err, "unexpected error while touching blob %s", it.BlobID)
return nil
}))
}