From 920341cb683d3f2e696bd9c16be610bd840f9927 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Fri, 3 Dec 2021 15:35:01 -0800 Subject: [PATCH] cache: prevent metadata cache thrashing if working set exceeds max defined size (#1557) This is done by protecting newly added cache items from being swept for X amount of time where X defaults to: * `metadata` - 24 hours (new) * `data` - 10 min (new) * `indexes` - 1 hour (same as today) Fixes #1540 --- cli/command_cache_info.go | 13 +++- cli/command_cache_set.go | 31 ++++++++- cli/command_repository_connect.go | 2 +- internal/cache/persistent_lru_cache.go | 67 +++++++++++++++---- internal/cache/persistent_lru_cache_test.go | 18 ++++- repo/caching.go | 5 +- repo/content/caching_options.go | 27 ++++++-- repo/content/committed_content_index.go | 3 +- .../committed_content_index_cache_test.go | 2 +- .../committed_content_index_disk_cache.go | 19 ++++-- repo/content/committed_read_manager.go | 21 ++++-- repo/content/content_cache_data.go | 4 +- repo/content/content_cache_metadata.go | 4 +- repo/content/content_cache_test.go | 18 +++-- repo/open.go | 5 +- 15 files changed, 190 insertions(+), 49 deletions(-) diff --git a/cli/command_cache_info.go b/cli/command_cache_info.go index ee19d378f..5395cc344 100644 --- a/cli/command_cache_info.go +++ b/cli/command_cache_info.go @@ -5,11 +5,13 @@ "fmt" "os" "path/filepath" + "time" "github.com/pkg/errors" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" ) type commandCacheInfo struct { @@ -50,6 +52,13 @@ func (c *commandCacheInfo) run(ctx context.Context, rep repo.Repository) error { "server-contents": opts.MaxCacheSizeBytes, } + path2SweepAgeSeconds := map[string]time.Duration{ + "contents": opts.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge), + "metadata": opts.MinMetadataSweepAge.DurationOrDefault(content.DefaultMetadataCacheSweepAge), + "indexes": 
opts.MinIndexSweepAge.DurationOrDefault(content.DefaultIndexCacheSweepAge), + "server-contents": opts.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge), + } + for _, ent := range entries { if !ent.IsDir() { continue @@ -64,11 +73,11 @@ func (c *commandCacheInfo) run(ctx context.Context, rep repo.Repository) error { maybeLimit := "" if l, ok := path2Limit[ent.Name()]; ok { - maybeLimit = fmt.Sprintf(" (limit %v)", units.BytesStringBase10(l)) + maybeLimit = fmt.Sprintf(" (limit %v, min sweep age %v)", units.BytesStringBase10(l), path2SweepAgeSeconds[ent.Name()]) } if ent.Name() == "blob-list" { - maybeLimit = fmt.Sprintf(" (duration %vs)", opts.MaxListCacheDurationSec) + maybeLimit = fmt.Sprintf(" (duration %v)", opts.MaxListCacheDuration.DurationOrDefault(0)) } c.out.printStdout("%v: %v files %v%v\n", subdir, fileCount, units.BytesStringBase10(totalFileSize), maybeLimit) diff --git a/cli/command_cache_set.go b/cli/command_cache_set.go index ea1bb939e..9a86b9f8a 100644 --- a/cli/command_cache_set.go +++ b/cli/command_cache_set.go @@ -8,6 +8,7 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" ) type commandCacheSetParams struct { @@ -15,6 +16,9 @@ type commandCacheSetParams struct { contentCacheSizeMB int64 maxMetadataCacheSizeMB int64 maxListCacheDuration time.Duration + contentMinSweepAge time.Duration + metadataMinSweepAge time.Duration + indexMinSweepAge time.Duration svc appServices } @@ -22,9 +26,16 @@ type commandCacheSetParams struct { func (c *commandCacheSetParams) setup(svc appServices, parent commandParent) { cmd := parent.Command("set", "Sets parameters local caching of repository data") + c.contentMinSweepAge = -1 + c.metadataMinSweepAge = -1 + c.indexMinSweepAge = -1 + cmd.Flag("cache-directory", "Directory where to store cache files").StringVar(&c.directory) cmd.Flag("content-cache-size-mb", "Size of local content 
cache").PlaceHolder("MB").Default("-1").Int64Var(&c.contentCacheSizeMB) + cmd.Flag("content-min-sweep-age", "Minimal age of content cache item to be subject to sweeping").DurationVar(&c.contentMinSweepAge) cmd.Flag("metadata-cache-size-mb", "Size of local metadata cache").PlaceHolder("MB").Default("-1").Int64Var(&c.maxMetadataCacheSizeMB) + cmd.Flag("metadata-min-sweep-age", "Minimal age of metadata cache item to be subject to sweeping").DurationVar(&c.metadataMinSweepAge) + cmd.Flag("index-min-sweep-age", "Minimal age of index cache item to be subject to sweeping").DurationVar(&c.indexMinSweepAge) cmd.Flag("max-list-cache-duration", "Duration of index cache").Default("-1ns").DurationVar(&c.maxListCacheDuration) cmd.Action(svc.repositoryWriterAction(c.run)) c.svc = svc @@ -60,7 +71,25 @@ func (c *commandCacheSetParams) run(ctx context.Context, rep repo.RepositoryWrit if v := c.maxListCacheDuration; v != -1 { log(ctx).Infof("changing list cache duration to %v", v) - opts.MaxListCacheDurationSec = int(v.Seconds()) + opts.MaxListCacheDuration = content.DurationSeconds(v.Seconds()) + changed++ + } + + if v := c.metadataMinSweepAge; v != -1 { + log(ctx).Infof("changing minimum metadata sweep age to %v", v) + opts.MinMetadataSweepAge = content.DurationSeconds(v.Seconds()) + changed++ + } + + if v := c.contentMinSweepAge; v != -1 { + log(ctx).Infof("changing minimum content sweep age to %v", v) + opts.MinContentSweepAge = content.DurationSeconds(v.Seconds()) + changed++ + } + + if v := c.indexMinSweepAge; v != -1 { + log(ctx).Infof("changing minimum index sweep age to %v", v) + opts.MinIndexSweepAge = content.DurationSeconds(v.Seconds()) changed++ } diff --git a/cli/command_repository_connect.go b/cli/command_repository_connect.go index 7c486003b..85f678919 100644 --- a/cli/command_repository_connect.go +++ b/cli/command_repository_connect.go @@ -90,7 +90,7 @@ func (c *connectOptions) toRepoConnectOptions() *repo.ConnectOptions { CacheDirectory: c.connectCacheDirectory, 
MaxCacheSizeBytes: c.connectMaxCacheSizeMB << 20, //nolint:gomnd MaxMetadataCacheSizeBytes: c.connectMaxMetadataCacheSizeMB << 20, //nolint:gomnd - MaxListCacheDurationSec: int(c.connectMaxListCacheDuration.Seconds()), + MaxListCacheDuration: content.DurationSeconds(c.connectMaxListCacheDuration.Seconds()), }, ClientOptions: repo.ClientOptions{ Hostname: c.connectHostname, diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go index f634d6ce7..d76778acb 100644 --- a/internal/cache/persistent_lru_cache.go +++ b/internal/cache/persistent_lru_cache.go @@ -11,6 +11,7 @@ "github.com/pkg/errors" "go.opencensus.io/stats" + "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/repo/blob" @@ -34,11 +35,9 @@ type PersistentCache struct { cacheStorage Storage storageProtection StorageProtection + sweep SweepSettings - maxSizeBytes int64 - sweepFrequency time.Duration - touchThreshold time.Duration - description string + description string periodicSweepRunning sync.WaitGroup periodicSweepClosed chan struct{} @@ -94,7 +93,7 @@ func (c *PersistentCache) Get(ctx context.Context, key string, offset, length in ) // cache hit - c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.touchThreshold) //nolint:errcheck + c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) //nolint:errcheck return true } @@ -158,7 +157,7 @@ func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) { case <-c.periodicSweepClosed: return - case <-time.After(c.sweepFrequency): + case <-time.After(c.sweep.SweepFrequency): if err := c.sweepDirectory(ctx); err != nil { log(ctx).Errorf("error during periodic sweep of %v: %v", c.description, err) } @@ -197,13 +196,25 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { var h contentMetadataHeap - var totalRetainedSize int64 + var ( + totalRetainedSize int64 + tooRecentBytes int64 + 
tooRecentCount int + ) err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + // ignore items below minimal age. + if age := clock.Now().Sub(it.Timestamp); age < c.sweep.MinSweepAge { + tooRecentCount++ + tooRecentBytes += it.Length + + return nil + } + heap.Push(&h, it) totalRetainedSize += it.Length - if totalRetainedSize > c.maxSizeBytes { + if totalRetainedSize > c.sweep.MaxSizeBytes { oldest := heap.Pop(&h).(blob.Metadata) //nolint:forcetypeassert if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr) @@ -211,6 +222,7 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { totalRetainedSize -= oldest.Length } } + return nil }) if err != nil { @@ -219,23 +231,52 @@ func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { dur := timer.Elapsed() - log(ctx).Debugf("finished sweeping %v in %v and retained %v/%v bytes (%v %%)", c.description, dur, totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) + log(ctx).Debugw( + "finished sweeping", + "cache", c.description, + "duration", dur, + "totalRetainedSize", totalRetainedSize, + "tooRecentBytes", tooRecentBytes, + "tooRecentCount", tooRecentCount, + "maxSizeBytes", c.sweep.MaxSizeBytes, + "inUsePercent", 100*totalRetainedSize/c.sweep.MaxSizeBytes, + ) return nil } +// SweepSettings encapsulates settings that impact cache item sweep/expiration. +type SweepSettings struct { + MaxSizeBytes int64 + SweepFrequency time.Duration + MinSweepAge time.Duration + TouchThreshold time.Duration +} + +func (s SweepSettings) applyDefaults() SweepSettings { + if s.TouchThreshold == 0 { + s.TouchThreshold = DefaultTouchThreshold + } + + if s.SweepFrequency == 0 { + s.SweepFrequency = DefaultSweepFrequency + } + + return s +} + // NewPersistentCache creates the persistent cache in the provided storage. 
-func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection StorageProtection, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*PersistentCache, error) { +func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection StorageProtection, sweep SweepSettings) (*PersistentCache, error) { + sweep = sweep.applyDefaults() + if storageProtection == nil { storageProtection = nullStorageProtection{} } c := &PersistentCache{ cacheStorage: cacheStorage, - maxSizeBytes: maxSizeBytes, + sweep: sweep, periodicSweepClosed: make(chan struct{}), - touchThreshold: touchThreshold, - sweepFrequency: sweepFrequency, description: description, storageProtection: storageProtection, } diff --git a/internal/cache/persistent_lru_cache_test.go b/internal/cache/persistent_lru_cache_test.go index d641dde4f..480ec57ed 100644 --- a/internal/cache/persistent_lru_cache_test.go +++ b/internal/cache/persistent_lru_cache_test.go @@ -27,7 +27,11 @@ func TestPersistentLRUCache(t *testing.T) { t.Fatal(err) } - pc, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ + MaxSizeBytes: maxSizeBytes, + TouchThreshold: cache.DefaultTouchThreshold, + SweepFrequency: cache.DefaultSweepFrequency, + }) if err != nil { t.Fatal(err) } @@ -70,7 +74,11 @@ func TestPersistentLRUCache(t *testing.T) { verifyBlobExists(ctx, t, cs, "key3") verifyBlobExists(ctx, t, cs, "key4") - pc, err = cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc, err = cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ + MaxSizeBytes: 
maxSizeBytes, + TouchThreshold: cache.DefaultTouchThreshold, + SweepFrequency: cache.DefaultSweepFrequency, + }) if err != nil { t.Fatal(err) } @@ -82,7 +90,11 @@ func TestPersistentLRUCache(t *testing.T) { // create another persistent cache based on the same storage but wrong protection key. // all reads from cache will be invalid, which means GetOrLoad will fetch them from the source. - pc2, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{3, 2, 1}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc2, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{3, 2, 1}), cache.SweepSettings{ + MaxSizeBytes: maxSizeBytes, + TouchThreshold: cache.DefaultTouchThreshold, + SweepFrequency: cache.DefaultSweepFrequency, + }) if err != nil { t.Fatal(err) } diff --git a/repo/caching.go b/repo/caching.go index 99e5369c6..484d57ede 100644 --- a/repo/caching.go +++ b/repo/caching.go @@ -69,7 +69,10 @@ func setupCachingOptionsWithDefaults(ctx context.Context, configPath string, lc lc.Caching.MaxCacheSizeBytes = opt.MaxCacheSizeBytes lc.Caching.MaxMetadataCacheSizeBytes = opt.MaxMetadataCacheSizeBytes - lc.Caching.MaxListCacheDurationSec = opt.MaxListCacheDurationSec + lc.Caching.MaxListCacheDuration = opt.MaxListCacheDuration + lc.Caching.MinContentSweepAge = opt.MinContentSweepAge + lc.Caching.MinMetadataSweepAge = opt.MinMetadataSweepAge + lc.Caching.MinIndexSweepAge = opt.MinIndexSweepAge log(ctx).Debugf("Creating cache directory '%v' with max size %v", lc.Caching.CacheDirectory, lc.Caching.MaxCacheSizeBytes) diff --git a/repo/content/caching_options.go b/repo/content/caching_options.go index 19a55f6a7..36657077f 100644 --- a/repo/content/caching_options.go +++ b/repo/content/caching_options.go @@ -1,12 +1,29 @@ package content +import "time" + +// DurationSeconds represents the duration in seconds. 
+type DurationSeconds float64 + +// DurationOrDefault returns the duration or the provided default if not set or zero. +func (s DurationSeconds) DurationOrDefault(def time.Duration) time.Duration { + if s == 0 { + return def + } + + return time.Duration(float64(s) * float64(time.Second)) +} + // CachingOptions specifies configuration of local cache. type CachingOptions struct { - CacheDirectory string `json:"cacheDirectory,omitempty"` - MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` - MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` - MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"` - HMACSecret []byte `json:"-"` + CacheDirectory string `json:"cacheDirectory,omitempty"` + MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` + MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` + MaxListCacheDuration DurationSeconds `json:"maxListCacheDuration,omitempty"` + MinMetadataSweepAge DurationSeconds `json:"minMetadataSweepAge,omitempty"` + MinContentSweepAge DurationSeconds `json:"minContentSweepAge,omitempty"` + MinIndexSweepAge DurationSeconds `json:"minIndexSweepAge,omitempty"` + HMACSecret []byte `json:"-"` } // CloneOrDefault returns a clone of the caching options or empty options for nil. 
diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 4e689c220..a86a3bc19 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -325,12 +325,13 @@ func newCommittedContentIndex(caching *CachingOptions, indexVersion int, fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error, log logging.Logger, + minSweepAge time.Duration, ) *committedContentIndex { var cache committedContentIndexCache if caching.CacheDirectory != "" { dirname := filepath.Join(caching.CacheDirectory, "indexes") - cache = &diskCommittedContentIndexCache{dirname, clock.Now, v1PerContentOverhead, log} + cache = &diskCommittedContentIndexCache{dirname, clock.Now, v1PerContentOverhead, log, minSweepAge} } else { cache = &memoryCommittedContentIndexCache{ contents: map[blob.ID]packIndex{}, diff --git a/repo/content/committed_content_index_cache_test.go b/repo/content/committed_content_index_cache_test.go index 82ce4502c..a5764f3b2 100644 --- a/repo/content/committed_content_index_cache_test.go +++ b/repo/content/committed_content_index_cache_test.go @@ -20,7 +20,7 @@ func TestCommittedContentIndexCache_Disk(t *testing.T) { ta := faketime.NewClockTimeWithOffset(0) - testCache(t, &diskCommittedContentIndexCache{testutil.TempDirectory(t), ta.NowFunc(), 3, logging.Printf(t.Logf, "test")}, ta) + testCache(t, &diskCommittedContentIndexCache{testutil.TempDirectory(t), ta.NowFunc(), 3, logging.Printf(t.Logf, "test"), DefaultIndexCacheSweepAge}, ta) } func TestCommittedContentIndexCache_Memory(t *testing.T) { diff --git a/repo/content/committed_content_index_disk_cache.go b/repo/content/committed_content_index_disk_cache.go index 17ae5ad0c..87d12be2a 100644 --- a/repo/content/committed_content_index_disk_cache.go +++ b/repo/content/committed_content_index_disk_cache.go @@ -17,8 +17,7 @@ ) const ( - simpleIndexSuffix = ".sndx" - unusedCommittedContentIndexCleanupTime = 1 * time.Hour // 
delete unused committed index blobs after 1 hour + simpleIndexSuffix = ".sndx" ) type diskCommittedContentIndexCache struct { @@ -26,6 +25,7 @@ type diskCommittedContentIndexCache struct { timeNow func() time.Time v1PerContentOverhead uint32 log logging.Logger + minSweepAge time.Duration } func (c *diskCommittedContentIndexCache) indexBlobPath(indexBlobID blob.ID) string { @@ -137,7 +137,9 @@ func writeTempFileAtomic(dirname string, data []byte) (string, error) { } func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used []blob.ID) error { - c.log.Debugf("expireUnused (except %v)", used) + c.log.Debugw("expireUnused", + "except", used, + "minSweepAge", c.minSweepAge) entries, err := os.ReadDir(c.dirname) if err != nil { @@ -168,14 +170,19 @@ func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used } for _, rem := range remaining { - if c.timeNow().Sub(rem.ModTime()) > unusedCommittedContentIndexCleanupTime { - c.log.Debugf("removing unused %v %v", rem.Name(), rem.ModTime()) + if c.timeNow().Sub(rem.ModTime()) > c.minSweepAge { + c.log.Debugw("removing unused", + "name", rem.Name(), + "mtime", rem.ModTime()) if err := os.Remove(filepath.Join(c.dirname, rem.Name())); err != nil { c.log.Errorf("unable to remove unused index file: %v", err) } } else { - c.log.Debugf("keeping unused %v because it's too new %v", rem.Name(), rem.ModTime()) + c.log.Debugw("keeping unused index because it's too new", + "name", rem.Name(), + "mtime", rem.ModTime(), + "threshold", c.minSweepAge) } } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 57a5eeb67..2db89ea7f 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -33,6 +33,13 @@ const ownWritesCacheDuration = 15 * time.Minute +// constants below specify how long to prevent cache entries from expiring. 
+const ( + DefaultMetadataCacheSweepAge = 24 * time.Hour + DefaultDataCacheSweepAge = 10 * time.Minute + DefaultIndexCacheSweepAge = 1 * time.Hour +) + var cachedIndexBlobPrefixes = []blob.ID{ IndexBlobPrefix, compactionLogBlobPrefix, @@ -325,7 +332,7 @@ func newListCache(ctx context.Context, st blob.Storage, caching *CachingOptions) return nil, errors.Wrap(err, "unable to get list cache backing storage") } - return listcache.NewWrapper(st, cacheSt, cachedIndexBlobPrefixes, caching.HMACSecret, time.Duration(caching.MaxListCacheDurationSec)*time.Second), nil + return listcache.NewWrapper(st, cacheSt, cachedIndexBlobPrefixes, caching.HMACSecret, caching.MaxListCacheDuration.DurationOrDefault(0)), nil } func newCacheBackingStorage(ctx context.Context, caching *CachingOptions, subdir string) (blob.Storage, error) { @@ -364,7 +371,10 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca return errors.Wrap(err, "unable to initialize data cache storage") } - dataCache, err := newContentCacheForData(ctx, sm.st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) + dataCache, err := newContentCacheForData(ctx, sm.st, dataCacheStorage, cache.SweepSettings{ + MaxSizeBytes: caching.MaxCacheSizeBytes, + MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge), + }, caching.HMACSecret) if err != nil { return errors.Wrap(err, "unable to initialize content cache") } @@ -379,7 +389,10 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca return errors.Wrap(err, "unable to initialize data cache storage") } - metadataCache, err := newContentCacheForMetadata(ctx, sm.st, metadataCacheStorage, metadataCacheSize) + metadataCache, err := newContentCacheForMetadata(ctx, sm.st, metadataCacheStorage, cache.SweepSettings{ + MaxSizeBytes: metadataCacheSize, + MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), + }) if err != nil { return errors.Wrap(err, 
"unable to initialize metadata cache") } @@ -434,7 +447,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca // once everything is ready, set it up sm.contentCache = dataCache sm.metadataCache = metadataCache - sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index")) + sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) return nil } diff --git a/repo/content/content_cache_data.go b/repo/content/content_cache_data.go index d52df8456..9a5c72401 100644 --- a/repo/content/content_cache_data.go +++ b/repo/content/content_cache_data.go @@ -39,12 +39,12 @@ func (c *contentCacheForData) close(ctx context.Context) { c.pc.Close(ctx) } -func newContentCacheForData(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) { +func newContentCacheForData(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, sweep cache.SweepSettings, hmacSecret []byte) (contentCache, error) { if cacheStorage == nil { return passthroughContentCache{st}, nil } - pc, err := cache.NewPersistentCache(ctx, "content cache", cacheStorage, cache.ChecksumProtection(hmacSecret), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "content cache", cacheStorage, cache.ChecksumProtection(hmacSecret), sweep) if err != nil { return nil, errors.Wrap(err, "unable to create base cache") } diff --git a/repo/content/content_cache_metadata.go b/repo/content/content_cache_metadata.go index 2fc149e3a..69544760d 100644 --- a/repo/content/content_cache_metadata.go +++ b/repo/content/content_cache_metadata.go @@ -125,12 
+125,12 @@ func (c *contentCacheForMetadata) close(ctx context.Context) { c.pc.Close(ctx) } -func newContentCacheForMetadata(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64) (contentCache, error) { +func newContentCacheForMetadata(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, sweep cache.SweepSettings) (contentCache, error) { if cacheStorage == nil { return passthroughContentCache{st}, nil } - pc, err := cache.NewPersistentCache(ctx, "metadata cache", cacheStorage, cache.NoProtection(), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "metadata cache", cacheStorage, cache.NoProtection(), sweep) if err != nil { return nil, errors.Wrap(err, "unable to create base cache") } diff --git a/repo/content/content_cache_test.go b/repo/content/content_cache_test.go index 04cfb2d07..ed56c3a74 100644 --- a/repo/content/content_cache_test.go +++ b/repo/content/content_cache_test.go @@ -55,7 +55,11 @@ func TestCacheExpiration(t *testing.T) { underlyingStorage := newUnderlyingStorageForContentCacheTesting(t) - pc, err := cache.NewPersistentCache(testlogging.Context(t), "test cache", cacheStorage.(cache.Storage), cache.NoProtection(), 10000, 0, 500*time.Millisecond) + pc, err := cache.NewPersistentCache(testlogging.Context(t), "test cache", cacheStorage.(cache.Storage), cache.NoProtection(), cache.SweepSettings{ + MaxSizeBytes: 10000, + SweepFrequency: 500 * time.Millisecond, + TouchThreshold: -1, + }) if err != nil { t.Fatalf("unable to create base cache: %v", err) } @@ -121,7 +125,9 @@ func TestDiskContentCache(t *testing.T) { t.Fatal(err) } - cc, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil) + cc, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, cache.SweepSettings{ + MaxSizeBytes: maxBytes, + }, nil) if err != nil { t.Fatalf("err: %v", err) } @@ 
-216,7 +222,7 @@ func TestCacheFailureToOpen(t *testing.T) { } // Will fail because of ListBlobs failure. - _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) + _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, cache.SweepSettings{MaxSizeBytes: 10000}, nil) if err == nil || !strings.Contains(err.Error(), someError.Error()) { t.Errorf("invalid error %v, wanted: %v", err, someError) } @@ -224,7 +230,7 @@ func TestCacheFailureToOpen(t *testing.T) { // ListBlobs fails only once, next time it succeeds. ctx := testlogging.Context(t) - cc, err := newContentCacheForData(ctx, underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) + cc, err := newContentCacheForData(ctx, underlyingStorage, withoutTouchBlob{faultyCache}, cache.SweepSettings{MaxSizeBytes: 10000}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -242,7 +248,7 @@ func TestCacheFailureToWrite(t *testing.T) { Base: cacheStorage, } - cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) + cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, cache.SweepSettings{MaxSizeBytes: 10000}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -288,7 +294,7 @@ func TestCacheFailureToRead(t *testing.T) { Base: cacheStorage, } - cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) + cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, cache.SweepSettings{MaxSizeBytes: 10000}, nil) if err != nil { t.Fatalf("err: %v", err) } diff --git a/repo/open.go b/repo/open.go index 855ccc377..f0e52662a 100644 --- a/repo/open.go +++ b/repo/open.go @@ -122,7 +122,10 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass return nil, 
errors.Wrap(err, "unable to initialize protection") } - pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, opt.MaxCacheSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, cache.SweepSettings{ + MaxSizeBytes: opt.MaxCacheSizeBytes, + MinSweepAge: opt.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge), + }) if err != nil { return nil, errors.Wrap(err, "unable to open persistent cache") }