From 9f9309ad2a0a69ae63c259bffdcd822984bdb076 Mon Sep 17 00:00:00 2001 From: Shikhar Mall Date: Fri, 7 Apr 2023 21:30:45 -0700 Subject: [PATCH] feat(repository): live cache eviction of expired BLOBs in persistent LRU content cache (#2879) * feat(repository): live cache eviction for persistent lru content cache * Update internal/cache/persistent_lru_cache.go Co-authored-by: Ali Dowair * merge the mutex cache into list cache --------- Co-authored-by: Shikhar Mall Co-authored-by: Ali Dowair --- internal/blobtesting/map.go | 4 +- internal/blobtesting/object_locking_map.go | 8 +- internal/cache/cache_storage.go | 2 +- internal/cache/content_cache.go | 12 +- internal/cache/content_cache_data_test.go | 2 +- internal/cache/content_cache_test.go | 11 +- internal/cache/persistent_lru_cache.go | 342 ++++++++++++------ internal/cache/persistent_lru_cache_test.go | 41 +-- repo/blob/filesystem/filesystem_storage.go | 17 +- .../filesystem/filesystem_storage_test.go | 15 +- repo/blob/sharded/sharded.go | 2 +- repo/blob/webdav/webdav_storage.go | 7 +- repo/content/committed_read_manager.go | 2 +- repo/open.go | 6 +- repo/repository_test.go | 6 +- 15 files changed, 301 insertions(+), 176 deletions(-) diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index a59939d29..78a4820e2 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -165,7 +165,7 @@ func (s *mapStorage) Close(ctx context.Context) error { return nil } -func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { +func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -176,7 +176,7 @@ func (s *mapStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti } } - return nil + return s.keyTime[blobID], nil } func (s *mapStorage) ConnectionInfo() blob.ConnectionInfo { diff --git a/internal/blobtesting/object_locking_map.go b/internal/blobtesting/object_locking_map.go index 5708cb59d..5954f6c70 100644 --- a/internal/blobtesting/object_locking_map.go +++ b/internal/blobtesting/object_locking_map.go @@ -233,7 +233,7 @@ func (s *objectLockingMap) Close(ctx context.Context) error { // delete-marker or if it does not exist then this becomes a no-op. If the // latest version has retention parameters set then they are respected. // Mutations are no allowed unless retention period expires. -func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold time.Duration) error { +func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold time.Duration) (time.Time, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -242,10 +242,10 @@ func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold // no error if delete-marker or not-exists, prevent changing mtime // of delete-markers if errors.Is(err, blob.ErrBlobNotFound) { - return nil + return time.Time{}, nil } - return err + return time.Time{}, err } n := s.timeNow() @@ -253,7 +253,7 @@ func (s *objectLockingMap) TouchBlob(ctx context.Context, id blob.ID, threshold e.mtime = n } - return nil + return e.mtime, nil } // ConnectionInfo is a no-op. diff --git a/internal/cache/cache_storage.go b/internal/cache/cache_storage.go index b20c8a5cc..7affe3b25 100644 --- a/internal/cache/cache_storage.go +++ b/internal/cache/cache_storage.go @@ -24,7 +24,7 @@ // Storage is the storage interface required by the cache and is implemented by the filesystem Storage. type Storage interface { blob.Storage - TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error + TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) (time.Time, error) } // NewStorageOrNil returns cache.Storage backed by the provided directory. diff --git a/internal/cache/content_cache.go b/internal/cache/content_cache.go index c3d936eb0..b474230ae 100644 --- a/internal/cache/content_cache.go +++ b/internal/cache/content_cache.go @@ -2,6 +2,7 @@ import ( "context" + "time" "github.com/pkg/errors" @@ -28,6 +29,7 @@ type Options struct { HMACSecret []byte FetchFullBlobs bool Sweep SweepSettings + TimeNow func() time.Time } type contentCacheImpl struct { @@ -61,7 +63,7 @@ func (c *contentCacheImpl) GetContent(ctx context.Context, contentID string, blo func (c *contentCacheImpl) getContentFromFullBlob(ctx context.Context, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error { // acquire exclusive lock - mut := c.pc.GetFetchingMutex(string(blobID)) + mut := c.pc.GetFetchingMutex(blobID) mut.Lock() defer mut.Unlock() @@ -112,7 +114,7 @@ func (c *contentCacheImpl) fetchBlobInternal(ctx context.Context, blobID blob.ID func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, contentID string, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error { // acquire shared lock on a blob, PrefetchBlob will acquire exclusive lock here. - mut := c.pc.GetFetchingMutex(string(blobID)) + mut := c.pc.GetFetchingMutex(blobID) mut.RLock() defer mut.RUnlock() @@ -122,7 +124,7 @@ func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, } // acquire exclusive lock on the content - mut2 := c.pc.GetFetchingMutex(contentID) + mut2 := c.pc.GetFetchingMutex(blob.ID(contentID)) mut2.Lock() defer mut2.Unlock() @@ -159,7 +161,7 @@ func (c *contentCacheImpl) PrefetchBlob(ctx context.Context, blobID blob.ID) err } // acquire exclusive lock for the blob. - mut := c.pc.GetFetchingMutex(string(blobID)) + mut := c.pc.GetFetchingMutex(blobID) mut.Lock() defer mut.Unlock() @@ -190,7 +192,7 @@ func NewContentCache(ctx context.Context, st blob.Storage, opt Options, mr *metr } } - pc, err := NewPersistentCache(ctx, opt.CacheSubDir, cacheStorage, cacheprot.ChecksumProtection(opt.HMACSecret), opt.Sweep, mr) + pc, err := NewPersistentCache(ctx, opt.CacheSubDir, cacheStorage, cacheprot.ChecksumProtection(opt.HMACSecret), opt.Sweep, mr, opt.TimeNow) if err != nil { return nil, errors.Wrap(err, "unable to create base cache") } diff --git a/internal/cache/content_cache_data_test.go b/internal/cache/content_cache_data_test.go index 7e29660dc..53aa9bff0 100644 --- a/internal/cache/content_cache_data_test.go +++ b/internal/cache/content_cache_data_test.go @@ -25,7 +25,7 @@ func TestContentCacheForData(t *testing.T) { Storage: cacheStorage, HMACSecret: []byte{1, 2, 3, 4}, Sweep: cache.SweepSettings{ - MaxSizeBytes: 100, + MaxSizeBytes: 150, }, }, nil) require.NoError(t, err) diff --git a/internal/cache/content_cache_test.go b/internal/cache/content_cache_test.go index 2a546b561..18b684295 100644 --- a/internal/cache/content_cache_test.go +++ b/internal/cache/content_cache_test.go @@ -59,9 +59,9 @@ func TestCacheExpiration(t *testing.T) { Storage: cacheStorage.(cache.Storage), Sweep: cache.SweepSettings{ MaxSizeBytes: 10000, - SweepFrequency: 500 * time.Millisecond, TouchThreshold: -1, }, + TimeNow: movingTimeFunc, }, nil) require.NoError(t, err) @@ -80,9 +80,6 @@ func TestCacheExpiration(t *testing.T) { err = cc.GetContent(ctx, "00000d", "content-4k", 0, -1, &tmp) // 4k require.NoError(t, err) - // wait for a sweep - time.Sleep(2 * time.Second) - // 00000a and 00000b will be removed from cache because it's the oldest. // to verify, let's remove content-4k from the underlying storage and make sure we can still read // 00000c and 00000d from the cache but not 00000a nor 00000b @@ -100,7 +97,7 @@ func TestCacheExpiration(t *testing.T) { for _, tc := range cases { got := cc.GetContent(ctx, tc.contentID, "content-4k", 0, -1, &tmp) - if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID:", tc.contentID) { + if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID: %v", tc.contentID) { t.Logf("got correct error %v when reading content %v", tc.expectedError, tc.contentID) } } @@ -311,6 +308,6 @@ type withoutTouchBlob struct { blob.Storage } -func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { - return errors.Errorf("TouchBlob not implemented") +func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) { + return time.Time{}, errors.Errorf("TouchBlob not implemented") } diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go index 5902dc876..5bc4c13e0 100644 --- a/internal/cache/persistent_lru_cache.go +++ b/internal/cache/persistent_lru_cache.go @@ -5,15 +5,12 @@ "container/heap" "context" "sync" - "sync/atomic" "time" - lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" "github.com/kopia/kopia/internal/cacheprot" "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/internal/releasable" @@ -25,34 +22,24 @@ var log = logging.Module("cache") const ( - // DefaultSweepFrequency is how frequently the contents of cache are sweeped to remove excess data. - DefaultSweepFrequency = 1 * time.Minute - // DefaultTouchThreshold specifies the resolution of timestamps used to determine which cache items // to expire. This helps cache storage writes on frequently accessed items. DefaultTouchThreshold = 10 * time.Minute - - // Size of the mutex cache LRU. - // In case a mutex is evicted of the cache, the impact will be some redundant read, - // which given the size should be extremely rare. - mutexCacheSize = 10000 ) // PersistentCache provides persistent on-disk cache. type PersistentCache struct { - anyChange atomic.Bool + listCacheMutex sync.Mutex + // +checklocks:listCacheMutex + listCache contentMetadataHeap cacheStorage Storage storageProtection cacheprot.StorageProtection sweep SweepSettings + timeNow func() time.Time description string - periodicSweepRunning sync.WaitGroup - periodicSweepClosed chan struct{} - - mutexCache *lru.Cache - metricsStruct } @@ -62,25 +49,24 @@ func (c *PersistentCache) CacheStorage() Storage { } // GetFetchingMutex returns a RWMutex used to lock a blob or content during loading. -func (c *PersistentCache) GetFetchingMutex(key string) *sync.RWMutex { +func (c *PersistentCache) GetFetchingMutex(id blob.ID) *sync.RWMutex { if c == nil { // special case - also works on non-initialized cache pointer. return &sync.RWMutex{} } - if v, ok := c.mutexCache.Get(key); ok { - //nolint:forcetypeassert - return v.(*sync.RWMutex) + c.listCacheMutex.Lock() + defer c.listCacheMutex.Unlock() + + if _, entry := c.listCache.LookupByID(id); entry != nil { + return &entry.contentDownloadMutex } - newVal := &sync.RWMutex{} + heap.Push(&c.listCache, blob.Metadata{BlobID: id}) - if prevVal, ok, _ := c.mutexCache.PeekOrAdd(key, newVal); ok { - //nolint:forcetypeassert - return prevVal.(*sync.RWMutex) - } + _, entry := c.listCache.LookupByID(id) - return newVal + return &entry.contentDownloadMutex } // GetOrLoad is utility function gets the provided item from the cache or invokes the provided fetch function. @@ -97,7 +83,7 @@ func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func( output.Reset() - mut := c.GetFetchingMutex(key) + mut := c.GetFetchingMutex(blob.ID(key)) mut.Lock() defer mut.Unlock() @@ -124,6 +110,62 @@ func (c *PersistentCache) GetFull(ctx context.Context, key string, output *gathe return c.GetPartial(ctx, key, 0, -1, output) } +func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, length int64, output *gather.WriteBuffer) { + // cache hit + c.reportHitBytes(int64(output.Length())) + + // cache hit + c.listCacheMutex.Lock() + defer c.listCacheMutex.Unlock() + + // Touching the blobs when cache is full can lead to cache never + // getting cleaned up if all the blobs fall under MinSweepAge. + // + // This can happen when the user is restoring large files (at + // comparable sizes to the cache size limitation) and MinSweepAge is + // sufficiently large. For large files which span over multiple + // blobs, every blob becomes least-recently-used. + // + // So, we'll avoid this until our cache usage drops to acceptable + // limits. + if c.isCacheFullLocked() { + c.listCacheCleanupLocked(ctx) + + if c.isCacheFullLocked() { + return + } + } + + // unlock for the expensive operation + c.listCacheMutex.Unlock() + mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) + c.listCacheMutex.Lock() + + if err == nil { + // insert or update the metadata + heap.Push(&c.listCache, blob.Metadata{ + BlobID: blob.ID(key), + Length: length, + Timestamp: mtime, + }) + } +} + +func (c *PersistentCache) getPartialDeleteInvalidBlob(ctx context.Context, key string) { + // delete invalid blob + c.reportMalformedData() + + if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { + log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err) + } else { + c.listCacheMutex.Lock() + if i, entry := c.listCache.LookupByID(blob.ID(key)); entry != nil { + heap.Remove(&c.listCache, i) + } + c.listCacheMutex.Unlock() + } +} + // GetPartial fetches the contents of a cached blob when (length < 0) or a subset of it (when length >= 0). // returns false if not found. func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, length int64, output *gather.WriteBuffer) bool { @@ -142,21 +184,12 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le } if err := prot.Verify(key, tmp.Bytes(), output); err == nil { - // cache hit - c.reportHitBytes(int64(output.Length())) - - // cache hit - c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) //nolint:errcheck + c.getPartialCacheHit(ctx, key, length, output) return true } - // delete invalid blob - c.reportMalformedData() - - if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { - log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err) - } + c.getPartialDeleteInvalidBlob(ctx, key) } // cache miss @@ -170,24 +203,57 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le return false } +func (c *PersistentCache) isCacheFullLocked() bool { + return c.listCache.DataSize() > c.sweep.MaxSizeBytes +} + // Put adds the provided key-value pair to the cache. func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes) { if c == nil { return } - c.anyChange.Store(true) + var ( + protected gather.WriteBuffer + mtime time.Time + ) - var protected gather.WriteBuffer defer protected.Close() + c.listCacheMutex.Lock() + defer c.listCacheMutex.Unlock() + + // opportunistically cleanup cache before the PUT if we can + if c.isCacheFullLocked() { + c.listCacheCleanupLocked(ctx) + // Do not add more things to cache if it remains full after cleanup. We + // MUST NOT go over the specified limit for the cache space to avoid + // snapshots/restores from getting affected by the cache's storage use. + if c.isCacheFullLocked() { + return + } + } + + // LOCK RELEASED for expensive operations + c.listCacheMutex.Unlock() c.storageProtection.Protect(key, data, &protected) - if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{}); err != nil { + if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{GetModTime: &mtime}); err != nil { c.reportStoreError() log(ctx).Errorf("unable to add %v to %v: %v", key, c.description, err) } + + c.listCacheMutex.Lock() + // LOCK RE-ACQUIRED + + c.listCache.Push(blob.Metadata{ + BlobID: blob.ID(key), + Length: int64(protected.Bytes().Length()), + Timestamp: mtime, + }) + + c.listCacheCleanupLocked(ctx) } // Close closes the instance of persistent cache possibly waiting for at least one sweep to complete. @@ -196,92 +262,136 @@ func (c *PersistentCache) Close(ctx context.Context) { return } - close(c.periodicSweepClosed) - c.periodicSweepRunning.Wait() - - // if we added anything to the cache in this session, run one last sweep before shutting down. - if c.anyChange.Load() { - if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Errorf("error during final sweep of the %v: %v", c.description, err) - } - } - releasable.Released("persistent-cache", c) } -func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) { - defer c.periodicSweepRunning.Done() - - for { - select { - case <-c.periodicSweepClosed: - return - - case <-time.After(c.sweep.SweepFrequency): - if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Errorf("error during periodic sweep of %v: %v", c.description, err) - } - } - } +type blobCacheEntry struct { + metadata blob.Metadata + contentDownloadMutex sync.RWMutex } // A contentMetadataHeap implements heap.Interface and holds blob.Metadata. -type contentMetadataHeap []blob.Metadata +type contentMetadataHeap struct { + data []*blobCacheEntry + index map[blob.ID]int + dataSize int64 +} -func (h contentMetadataHeap) Len() int { return len(h) } +func newContentMetadataHeap() contentMetadataHeap { + return contentMetadataHeap{index: make(map[blob.ID]int)} +} + +func (h contentMetadataHeap) Len() int { return len(h.data) } func (h contentMetadataHeap) Less(i, j int) bool { - return h[i].Timestamp.Before(h[j].Timestamp) + return h.data[i].metadata.Timestamp.Before(h.data[j].metadata.Timestamp) } func (h contentMetadataHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] + h.index[h.data[i].metadata.BlobID], h.index[h.data[j].metadata.BlobID] = h.index[h.data[j].metadata.BlobID], h.index[h.data[i].metadata.BlobID] + h.data[i], h.data[j] = h.data[j], h.data[i] } func (h *contentMetadataHeap) Push(x interface{}) { - *h = append(*h, x.(blob.Metadata)) //nolint:forcetypeassert + bm := x.(blob.Metadata) //nolint:forcetypeassert + if i, exists := h.index[bm.BlobID]; exists { + // only accept newer timestamps + if h.data[i].metadata.Timestamp.IsZero() || bm.Timestamp.After(h.data[i].metadata.Timestamp) { + h.dataSize += bm.Length - h.data[i].metadata.Length + h.data[i] = &blobCacheEntry{metadata: bm} + heap.Fix(h, i) + } + } else { + h.index[bm.BlobID] = len(h.data) + h.data = append(h.data, &blobCacheEntry{metadata: bm}) + h.dataSize += bm.Length + } } func (h *contentMetadataHeap) Pop() interface{} { - old := *h + old := h.data n := len(old) item := old[n-1] - *h = old[0 : n-1] + h.data = old[0 : n-1] + h.dataSize -= item.metadata.Length + delete(h.index, item.metadata.BlobID) - return item + return item.metadata } -func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { - timer := timetrack.StartTimer() +func (h *contentMetadataHeap) LookupByID(id blob.ID) (int, *blobCacheEntry) { + i, ok := h.index[id] + if !ok { + return -1, nil + } - var h contentMetadataHeap + return i, h.data[i] +} +func (h contentMetadataHeap) DataSize() int64 { return h.dataSize } + +func (c *PersistentCache) listCacheCleanupLocked(ctx context.Context) { var ( - totalRetainedSize int64 - tooRecentBytes int64 - tooRecentCount int + unsuccessfulDeletes []blob.Metadata + unsuccessfulDeletesSize int64 + now = c.timeNow() ) - err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - // ignore items below minimal age. - if age := clock.Now().Sub(it.Timestamp); age < c.sweep.MinSweepAge { + // if there are blobs pending to be deleted ... + for c.listCache.DataSize() > 0 && + // ... and everything including what we couldn't delete is still bigger than the threshold + (c.listCache.DataSize()+unsuccessfulDeletesSize) > c.sweep.MaxSizeBytes { + oldest := heap.Pop(&c.listCache).(blob.Metadata) //nolint:forcetypeassert + + // stop here if the oldest item is below the specified minimal age + if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge { + heap.Push(&c.listCache, oldest) + break + } + + // unlock before the expensive operation + c.listCacheMutex.Unlock() + delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID) + c.listCacheMutex.Lock() + + if delerr != nil { + log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr) + // accumulate unsuccessful deletes to be pushed back into the heap + // later so we do not attempt deleting the same blob multiple times + // + // after this we keep draining from the heap until we bring down + // c.listCache.DataSize() to zero + unsuccessfulDeletes = append(unsuccessfulDeletes, oldest) + unsuccessfulDeletesSize += oldest.Length + } + } + + // put all unsuccessful deletes back into the heap + for _, m := range unsuccessfulDeletes { + heap.Push(&c.listCache, m) + } +} + +func (c *PersistentCache) initialScan(ctx context.Context) error { + timer := timetrack.StartTimer() + + var ( + tooRecentBytes int64 + tooRecentCount int + now = c.timeNow() + ) + + c.listCacheMutex.Lock() + defer c.listCacheMutex.Unlock() + + err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + // count items below minimal age. + if age := now.Sub(it.Timestamp); age < c.sweep.MinSweepAge { tooRecentCount++ tooRecentBytes += it.Length - - return nil } - heap.Push(&h, it) - totalRetainedSize += it.Length - - if totalRetainedSize > c.sweep.MaxSizeBytes { - oldest := heap.Pop(&h).(blob.Metadata) //nolint:forcetypeassert - if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { - log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr) - } else { - totalRetainedSize -= oldest.Length - } - } + heap.Push(&c.listCache, it) return nil }) @@ -289,6 +399,10 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { return errors.Wrapf(err, "error listing %v", c.description) } + if c.isCacheFullLocked() { + c.listCacheCleanupLocked(ctx) + } + dur := timer.Elapsed() const hundredPercent = 100 @@ -296,14 +410,14 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { inUsePercent := int64(hundredPercent) if c.sweep.MaxSizeBytes != 0 { - inUsePercent = hundredPercent * totalRetainedSize / c.sweep.MaxSizeBytes + inUsePercent = hundredPercent * c.listCache.DataSize() / c.sweep.MaxSizeBytes } log(ctx).Debugw( - "finished sweeping", + "finished initial cache scan", "cache", c.description, "duration", dur, - "totalRetainedSize", totalRetainedSize, + "totalRetainedSize", c.listCache.DataSize(), "tooRecentBytes", tooRecentBytes, "tooRecentCount", tooRecentCount, "maxSizeBytes", c.sweep.MaxSizeBytes, @@ -316,7 +430,6 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { // SweepSettings encapsulates settings that impact cache item sweep/expiration. type SweepSettings struct { MaxSizeBytes int64 - SweepFrequency time.Duration MinSweepAge time.Duration TouchThreshold time.Duration } @@ -326,15 +439,11 @@ func (s SweepSettings) applyDefaults() SweepSettings { s.TouchThreshold = DefaultTouchThreshold } - if s.SweepFrequency == 0 { - s.SweepFrequency = DefaultSweepFrequency - } - return s } // NewPersistentCache creates the persistent cache in the provided storage. -func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection cacheprot.StorageProtection, sweep SweepSettings, mr *metrics.Registry) (*PersistentCache, error) { +func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection cacheprot.StorageProtection, sweep SweepSettings, mr *metrics.Registry, timeNow func() time.Time) (*PersistentCache, error) { if cacheStorage == nil { return nil, nil } @@ -346,15 +455,18 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St } c := &PersistentCache{ - cacheStorage: cacheStorage, - sweep: sweep, - periodicSweepClosed: make(chan struct{}), - description: description, - storageProtection: storageProtection, - metricsStruct: initMetricsStruct(mr, description), + cacheStorage: cacheStorage, + sweep: sweep, + description: description, + storageProtection: storageProtection, + metricsStruct: initMetricsStruct(mr, description), + listCache: newContentMetadataHeap(), + timeNow: timeNow, } - c.mutexCache, _ = lru.New(mutexCacheSize) + if c.timeNow == nil { + c.timeNow = clock.Now + } // verify that cache storage is functional by listing from it if _, err := c.cacheStorage.GetMetadata(ctx, "test-blob"); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { @@ -363,9 +475,9 @@ func NewPersistentCache(ctx context.Context, description string, cacheStorage St releasable.Created("persistent-cache", c) - c.periodicSweepRunning.Add(1) - - go c.sweepDirectoryPeriodically(ctxutil.Detach(ctx)) + if err := c.initialScan(ctx); err != nil { + return nil, errors.Wrapf(err, "error during initial scan of %s", c.description) + } return c, nil } diff --git a/internal/cache/persistent_lru_cache_test.go b/internal/cache/persistent_lru_cache_test.go index 76ee7ee56..2f4ef66bf 100644 --- a/internal/cache/persistent_lru_cache_test.go +++ b/internal/cache/persistent_lru_cache_test.go @@ -12,6 +12,7 @@ "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/cacheprot" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/fault" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" @@ -29,8 +30,7 @@ func TestPersistentLRUCache(t *testing.T) { pc, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ MaxSizeBytes: maxSizeBytes, TouchThreshold: cache.DefaultTouchThreshold, - SweepFrequency: cache.DefaultSweepFrequency, - }, nil) + }, nil, clock.Now) require.NoError(t, err) var tmp gather.WriteBuffer @@ -71,8 +71,7 @@ func TestPersistentLRUCache(t *testing.T) { pc, err = cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ MaxSizeBytes: maxSizeBytes, TouchThreshold: cache.DefaultTouchThreshold, - SweepFrequency: cache.DefaultSweepFrequency, - }, nil) + }, nil, clock.Now) require.NoError(t, err) verifyCached(ctx, t, pc, "key1", nil) @@ -85,8 +84,7 @@ func TestPersistentLRUCache(t *testing.T) { pc2, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{3, 2, 1}), cache.SweepSettings{ MaxSizeBytes: maxSizeBytes, TouchThreshold: cache.DefaultTouchThreshold, - SweepFrequency: cache.DefaultSweepFrequency, - }, nil) + }, nil, clock.Now) require.NoError(t, err) someError := errors.Errorf("some error") @@ -119,8 +117,8 @@ type faultyCache struct { *blobtesting.FaultyStorage } -func (faultyCache) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { - return nil +func (faultyCache) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) { + return time.Time{}, nil } func TestPersistentLRUCache_Invalid(t *testing.T) { @@ -136,7 +134,7 @@ func TestPersistentLRUCache_Invalid(t *testing.T) { fs.AddFault(blobtesting.MethodGetMetadata).ErrorInstead(someError) - pc, err := cache.NewPersistentCache(ctx, "test", fc, nil, cache.SweepSettings{}, nil) + pc, err := cache.NewPersistentCache(ctx, "test", fc, nil, cache.SweepSettings{}, nil, clock.Now) require.ErrorIs(t, err, someError) require.Nil(t, pc) } @@ -154,7 +152,7 @@ func TestPersistentLRUCache_GetDeletesInvalidBlob(t *testing.T) { fs := blobtesting.NewFaultyStorage(st) fc := faultyCache{fs} - pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil) + pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: 100}, nil, clock.Now) require.NoError(t, err) pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3})) @@ -184,7 +182,7 @@ func TestPersistentLRUCache_PutIgnoresStorageFailure(t *testing.T) { fs := blobtesting.NewFaultyStorage(st) fc := faultyCache{fs} - pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil) + pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{}, nil, clock.Now) require.NoError(t, err) fs.AddFault(blobtesting.MethodPutBlob).ErrorInstead(someError) @@ -211,10 +209,9 @@ func TestPersistentLRUCache_SweepMinSweepAge(t *testing.T) { fc := faultyCache{fs} pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ - SweepFrequency: 100 * time.Millisecond, - MaxSizeBytes: 1000, - MinSweepAge: 10 * time.Second, - }, nil) + MaxSizeBytes: 1000, + MinSweepAge: 10 * time.Second, + }, nil, clock.Now) require.NoError(t, err) pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3})) pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6))) @@ -240,9 +237,8 @@ func TestPersistentLRUCache_SweepIgnoresErrors(t *testing.T) { fc := faultyCache{fs} pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ - SweepFrequency: 100 * time.Millisecond, - MaxSizeBytes: 1000, - }, nil) + MaxSizeBytes: 1000, + }, nil, clock.Now) require.NoError(t, err) // ignore delete errors forever @@ -273,10 +269,9 @@ func TestPersistentLRUCache_Sweep1(t *testing.T) { fc := faultyCache{fs} pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ - SweepFrequency: 10 * time.Millisecond, - MaxSizeBytes: 1, - MinSweepAge: 0 * time.Second, - }, nil) + MaxSizeBytes: 1, + MinSweepAge: 0 * time.Second, + }, nil, clock.Now) require.NoError(t, err) pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3})) pc.Put(ctx, "key", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6))) @@ -330,7 +325,7 @@ func TestPersistentLRUCache_Defaults(t *testing.T) { pc, err := cache.NewPersistentCache(ctx, "testing", cs, nil, cache.SweepSettings{ MaxSizeBytes: maxSizeBytes, - }, nil) + }, nil, clock.Now) require.NoError(t, err) defer pc.Close(ctx) diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 626bf4c5f..8129a30ad 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -279,9 +279,11 @@ func (fs *fsImpl) ReadDir(ctx context.Context, dirname string) ([]os.FileInfo, e } // TouchBlob updates file modification time to current time if it's sufficiently old. -func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { - //nolint:wrapcheck - return retry.WithExponentialBackoffNoValue(ctx, "TouchBlob", func() error { +func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) (time.Time, error) { + var mtime time.Time + + //nolint:wrapcheck,forcetypeassert + err := retry.WithExponentialBackoffNoValue(ctx, "TouchBlob", func() error { _, path, err := fs.Storage.GetShardedPathAndFilePath(ctx, blobID) if err != nil { return errors.Wrap(err, "error getting sharded path") @@ -296,17 +298,22 @@ func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti } n := clock.Now() + mtime = st.ModTime() - age := n.Sub(st.ModTime()) + age := n.Sub(mtime) if age < threshold { return nil } + mtime = n + log(ctx).Debugf("updating timestamp on %v to %v", path, n) //nolint:wrapcheck return osi.Chtimes(path, n, n) - }, fs.Impl.(*fsImpl).isRetriable) //nolint:forcetypeassert + }, fs.Impl.(*fsImpl).isRetriable) + + return mtime, err } func (fs *fsStorage) ConnectionInfo() blob.ConnectionInfo { diff --git a/repo/blob/filesystem/filesystem_storage_test.go b/repo/blob/filesystem/filesystem_storage_test.go index 4601f4f11..226f108ec 100644 --- a/repo/blob/filesystem/filesystem_storage_test.go +++ b/repo/blob/filesystem/filesystem_storage_test.go @@ -112,19 +112,23 @@ func TestFileStorageTouch(t *testing.T) { verifyBlobTimestampOrder(t, fs, t1, t2, t3) - assertNoError(t, fs.TouchBlob(ctx, t2, 1*time.Hour)) // has no effect, all timestamps are very new + _, err = fs.TouchBlob(ctx, t2, 1*time.Hour) + assertNoError(t, err) // has no effect, all timestamps are very new verifyBlobTimestampOrder(t, fs, t1, t2, t3) time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution - assertNoError(t, fs.TouchBlob(ctx, t1, 0)) // moves t1 to the top of the pile + _, err = fs.TouchBlob(ctx, t1, 0) + assertNoError(t, err) // moves t1 to the top of the pile verifyBlobTimestampOrder(t, fs, t2, t3, t1) time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution - assertNoError(t, fs.TouchBlob(ctx, t2, 0)) // moves t2 to the top of the pile + _, err = fs.TouchBlob(ctx, t2, 0) + assertNoError(t, err) // moves t2 to the top of the pile verifyBlobTimestampOrder(t, fs, t3, t1, t2) time.Sleep(2 * time.Second) // sleep a bit to accommodate Apple filesystems with low timestamp resolution - assertNoError(t, fs.TouchBlob(ctx, t1, 0)) // moves t1 to the top of the pile + _, err = fs.TouchBlob(ctx, t1, 0) + assertNoError(t, err) // moves t1 to the top of the pile verifyBlobTimestampOrder(t, fs, t3, t2, t1) } @@ -423,7 +427,8 @@ func TestFileStorage_TouchBlob_ErrorHandling(t *testing.T) { osi.statRemainingErrors.Store(1) - require.NoError(t, st.(*fsStorage).TouchBlob(ctx, "someblob1234567812345678", 0)) + _, err = st.(*fsStorage).TouchBlob(ctx, "someblob1234567812345678", 0) + require.NoError(t, err) } func TestFileStorage_Misc(t *testing.T) { diff --git a/repo/blob/sharded/sharded.go b/repo/blob/sharded/sharded.go index f84c7ff85..ab46f6043 100644 --- a/repo/blob/sharded/sharded.go +++ b/repo/blob/sharded/sharded.go @@ -22,7 +22,7 @@ var log = logging.Module("sharded") // +checklocksignore -// Impl must be implemented by underlying provided. +// Impl must be implemented by underlying provider. type Impl interface { GetBlobFromPath(ctx context.Context, dirPath, filePath string, offset, length int64, output blob.OutputBuffer) error GetMetadataFromPath(ctx context.Context, dirPath, filePath string) (blob.Metadata, error) diff --git a/repo/blob/webdav/webdav_storage.go b/repo/blob/webdav/webdav_storage.go index 9e17d73a8..a274da7bf 100644 --- a/repo/blob/webdav/webdav_storage.go +++ b/repo/blob/webdav/webdav_storage.go @@ -212,10 +212,15 @@ func (d *davStorageImpl) PutBlobInPath(ctx context.Context, dirPath, filePath st } func (d *davStorageImpl) DeleteBlobInPath(ctx context.Context, dirPath, filePath string) error { - return d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error { + err := d.translateError(retry.WithExponentialBackoffNoValue(ctx, "DeleteBlobInPath", func() error { //nolint:wrapcheck return d.cli.Remove(filePath) }, isRetriable)) + if errors.Is(err, blob.ErrBlobNotFound) { + return nil + } + + return err } func (d *davStorage) ConnectionInfo() blob.ConnectionInfo { diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index bca87cc08..5027d365e 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -455,7 +455,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs", indexBlobStorage, cacheprot.ChecksumProtection(caching.HMACSecret), cache.SweepSettings{ MaxSizeBytes: metadataCacheSize, MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), - }, mr) + }, mr, sm.timeNow) if err != nil { return errors.Wrap(err, "unable to create index blob cache") } diff --git a/repo/open.go b/repo/open.go index 472912bdb..dd7c5a206 100644 --- a/repo/open.go +++ b/repo/open.go @@ -132,7 +132,7 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r return openDirect(ctx, configFile, lc, password, options) } -func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry) (*cache.PersistentCache, error) { +func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry, timeNow func() time.Time) (*cache.PersistentCache, error) { opt = opt.CloneOrDefault() cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents") @@ -158,7 +158,7 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, cache.SweepSettings{ MaxSizeBytes: opt.MaxCacheSizeBytes, MinSweepAge: opt.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge), - }, mr) + }, mr, timeNow) if err != nil { return nil, errors.Wrap(err, "unable to open persistent cache") } @@ -172,7 +172,7 @@ func openAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions mr := metrics.NewRegistry() - contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr) + contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password, mr, options.TimeNowFunc) if err != nil { return nil, errors.Wrap(err, "error opening content cache") } diff --git a/repo/repository_test.go b/repo/repository_test.go index 650203537..baa9802d5 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -579,11 +579,13 @@ func TestObjectWritesWithRetention(t *testing.T) { require.NoError(t, versionedMap.ListBlobs(ctx, "", func(it blob.Metadata) error { for _, prefix := range prefixesWithRetention { if strings.HasPrefix(string(it.BlobID), prefix) { - require.Error(t, versionedMap.TouchBlob(ctx, it.BlobID, 0), "expected error while touching blob %s", it.BlobID) + _, err = versionedMap.TouchBlob(ctx, it.BlobID, 0) + require.Error(t, err, "expected error while touching blob %s", it.BlobID) return nil } } - require.NoError(t, versionedMap.TouchBlob(ctx, it.BlobID, 0), "unexpected error while touching blob %s", it.BlobID) + _, err = versionedMap.TouchBlob(ctx, it.BlobID, 0) + require.NoError(t, err, "unexpected error while touching blob %s", it.BlobID) return nil })) }