From fe55dcb6a2c8bb93563201a5838386741c445095 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 24 Aug 2023 09:38:56 -0700 Subject: [PATCH] feat(repository): added hard size limit to the on-disk cache (#3238) * test(providers): added capacity limits to blobtesting.mapStorage * refactor(general): added mutex map which dynamically allocates and releases named mutexes * refactor(repository): refactored cache cleanup and limit enforcement * refactor(repository): plumb through cache size limits in the repository * feat(cli): added CLI options to set cache size limits * unified flag setting and field naming * Update cli/command_cache_set.go Co-authored-by: Shikhar Mall * pr feedback --------- Co-authored-by: Shikhar Mall --- cli/command_cache_info.go | 32 +- cli/command_cache_set.go | 69 ++++- cli/command_cache_set_test.go | 12 +- cli/command_repository_connect.go | 30 +- internal/blobtesting/map.go | 49 ++- internal/blobtesting/map_test.go | 46 +++ internal/cache/content_cache.go | 21 +- internal/cache/content_cache_test.go | 101 +++++-- internal/cache/mutex_map.go | 95 ++++++ internal/cache/mutex_map_test.go | 50 ++++ internal/cache/persistent_lru_cache.go | 282 ++++++++---------- internal/cache/persistent_lru_cache_test.go | 37 +-- internal/cacheprot/storage_protection.go | 13 + internal/server/server_test.go | 4 +- repo/caching.go | 10 +- repo/content/caching_options.go | 28 +- repo/content/committed_read_manager.go | 49 +-- repo/content/content_manager_test.go | 12 +- repo/open.go | 5 +- .../repository_stress_test.go | 4 +- 20 files changed, 654 insertions(+), 295 deletions(-) create mode 100644 internal/cache/mutex_map.go create mode 100644 internal/cache/mutex_map_test.go diff --git a/cli/command_cache_info.go b/cli/command_cache_info.go index cac6083f3..d2062eebe 100644 --- a/cli/command_cache_info.go +++ b/cli/command_cache_info.go @@ -46,10 +46,16 @@ func (c *commandCacheInfo) run(ctx context.Context, _ repo.Repository) error { return errors.Wrap(err, "unable to scan cache directory") } - path2Limit := map[string]int64{ - "contents": opts.MaxCacheSizeBytes, - "metadata": opts.MaxMetadataCacheSizeBytes, - "server-contents": opts.MaxCacheSizeBytes, + path2SoftLimit := map[string]int64{ + "contents": opts.ContentCacheSizeBytes, + "metadata": opts.MetadataCacheSizeBytes, + "server-contents": opts.ContentCacheSizeBytes, + } + + path2HardLimit := map[string]int64{ + "contents": opts.ContentCacheSizeLimitBytes, + "metadata": opts.MetadataCacheSizeLimitBytes, + "server-contents": opts.ContentCacheSizeLimitBytes, } path2SweepAgeSeconds := map[string]time.Duration{ @@ -72,12 +78,24 @@ func (c *commandCacheInfo) run(ctx context.Context, _ repo.Repository) error { } maybeLimit := "" - if l, ok := path2Limit[ent.Name()]; ok { - maybeLimit = fmt.Sprintf(" (limit %v, min sweep age %v)", units.BytesString(l), path2SweepAgeSeconds[ent.Name()]) + + if l, ok := path2SoftLimit[ent.Name()]; ok { + var hardLimit string + + if hl := path2HardLimit[ent.Name()]; hl > 0 { + hardLimit = units.BytesString(hl) + } else { + hardLimit = "none" + } + + maybeLimit = fmt.Sprintf(" (soft limit: %v, hard limit: %v, min sweep age: %v)", + units.BytesString(l), + hardLimit, + path2SweepAgeSeconds[ent.Name()]) } if ent.Name() == "blob-list" { - maybeLimit = fmt.Sprintf(" (duration %v)", opts.MaxListCacheDuration.DurationOrDefault(0)) + maybeLimit = fmt.Sprintf(" (duration: %v)", opts.MaxListCacheDuration.DurationOrDefault(0)) } c.out.printStdout("%v: %v files %v%v\n", subdir, fileCount, 
units.BytesString(totalFileSize), maybeLimit)
diff --git a/cli/command_cache_set.go b/cli/command_cache_set.go
index 0bdce1980..fb08001dd 100644
--- a/cli/command_cache_set.go
+++ b/cli/command_cache_set.go
@@ -4,6 +4,7 @@
 "context"
 "time"

+ "github.com/alecthomas/kingpin/v2"
 "github.com/pkg/errors"

 "github.com/kopia/kopia/internal/units"
@@ -11,14 +12,36 @@
 "github.com/kopia/kopia/repo/content"
 )

+type cacheSizeFlags struct {
+ contentCacheSizeMB int64
+ contentCacheSizeLimitMB int64
+ contentMinSweepAge time.Duration
+
+ metadataCacheSizeMB int64
+ metadataCacheSizeLimitMB int64
+ metadataMinSweepAge time.Duration
+
+ maxListCacheDuration time.Duration
+ indexMinSweepAge time.Duration
+}
+
+func (c *cacheSizeFlags) setup(cmd *kingpin.CmdClause) {
+ // do not set flag defaults here, since this structure is shared between the connect/create/set commands;
+ // each command sets its own default values in code.
+ cmd.Flag("content-cache-size-mb", "Desired size of local content cache (soft limit)").PlaceHolder("MB").Int64Var(&c.contentCacheSizeMB)
+ cmd.Flag("content-cache-size-limit-mb", "Maximum size of local content cache (hard limit)").PlaceHolder("MB").Int64Var(&c.contentCacheSizeLimitMB)
+ cmd.Flag("content-min-sweep-age", "Minimal age of content cache item to be subject to sweeping").DurationVar(&c.contentMinSweepAge)
+ cmd.Flag("metadata-cache-size-mb", "Desired size of local metadata cache (soft limit)").PlaceHolder("MB").Int64Var(&c.metadataCacheSizeMB)
+ cmd.Flag("metadata-cache-size-limit-mb", "Maximum size of local metadata cache (hard limit)").PlaceHolder("MB").Int64Var(&c.metadataCacheSizeLimitMB)
+ cmd.Flag("metadata-min-sweep-age", "Minimal age of metadata cache item to be subject to sweeping").DurationVar(&c.metadataMinSweepAge)
+ cmd.Flag("index-min-sweep-age", "Minimal age of index cache item to be subject to sweeping").DurationVar(&c.indexMinSweepAge)
+ cmd.Flag("max-list-cache-duration", "Duration of index cache").DurationVar(&c.maxListCacheDuration)
+}
+
 type commandCacheSetParams struct {
- directory string
- contentCacheSizeMB int64
- maxMetadataCacheSizeMB int64
- maxListCacheDuration time.Duration
- contentMinSweepAge time.Duration
- metadataMinSweepAge time.Duration
- indexMinSweepAge time.Duration
+ directory string
+
+ cacheSizeFlags

 svc appServices
 }
@@ -30,14 +53,14 @@ func (c *commandCacheSetParams) setup(svc appServices, parent commandParent) {
 c.metadataMinSweepAge = -1
 c.indexMinSweepAge = -1
 c.maxListCacheDuration = -1
+ c.contentCacheSizeLimitMB = -1
+ c.contentCacheSizeMB = -1
+ c.metadataCacheSizeLimitMB = -1
+ c.metadataCacheSizeMB = -1
+ c.cacheSizeFlags.setup(cmd)

 cmd.Flag("cache-directory", "Directory where to store cache files").StringVar(&c.directory)
- cmd.Flag("content-cache-size-mb", "Size of local content cache").PlaceHolder("MB").Default("-1").Int64Var(&c.contentCacheSizeMB)
- cmd.Flag("content-min-sweep-age", "Minimal age of content cache item to be subject to sweeping").DurationVar(&c.contentMinSweepAge)
- cmd.Flag("metadata-cache-size-mb", "Size of local metadata cache").PlaceHolder("MB").Default("-1").Int64Var(&c.maxMetadataCacheSizeMB)
- cmd.Flag("metadata-min-sweep-age", "Minimal age of metadata cache item to be subject to sweeping").DurationVar(&c.metadataMinSweepAge)
- cmd.Flag("index-min-sweep-age", "Minimal age of index cache item to be subject to sweeping").DurationVar(&c.indexMinSweepAge)
- cmd.Flag("max-list-cache-duration", "Duration of index cache").DurationVar(&c.maxListCacheDuration)
+
 cmd.Action(svc.repositoryWriterAction(c.run))
c.svc = svc } @@ -59,14 +82,28 @@ func (c *commandCacheSetParams) run(ctx context.Context, _ repo.RepositoryWriter if v := c.contentCacheSizeMB; v != -1 { v *= 1e6 // convert MB to bytes log(ctx).Infof("changing content cache size to %v", units.BytesString(v)) - opts.MaxCacheSizeBytes = v + opts.ContentCacheSizeBytes = v changed++ } - if v := c.maxMetadataCacheSizeMB; v != -1 { + if v := c.contentCacheSizeLimitMB; v != -1 { + v *= 1e6 // convert MB to bytes + log(ctx).Infof("changing content cache size limit to %v", units.BytesString(v)) + opts.ContentCacheSizeLimitBytes = v + changed++ + } + + if v := c.metadataCacheSizeMB; v != -1 { v *= 1e6 // convert MB to bytes log(ctx).Infof("changing metadata cache size to %v", units.BytesString(v)) - opts.MaxMetadataCacheSizeBytes = v + opts.MetadataCacheSizeBytes = v + changed++ + } + + if v := c.metadataCacheSizeLimitMB; v != -1 { + v *= 1e6 // convert MB to bytes + log(ctx).Infof("changing metadata cache size limit to %v", units.BytesString(v)) + opts.MetadataCacheSizeLimitBytes = v changed++ } diff --git a/cli/command_cache_set_test.go b/cli/command_cache_set_test.go index 17fadc411..8a0dc7a78 100644 --- a/cli/command_cache_set_test.go +++ b/cli/command_cache_set_test.go @@ -30,13 +30,21 @@ func TestCacheSet(t *testing.T) { "cache", "set", "--cache-directory", ncd, "--content-cache-size-mb=33", + "--content-cache-size-limit-mb=331", "--metadata-cache-size-mb=44", + "--metadata-cache-size-limit-mb=441", ) out := env.RunAndExpectSuccess(t, "cache", "info") require.Contains(t, mustGetLineContaining(t, out, "33 MB"), ncd) - require.Contains(t, mustGetLineContaining(t, out, "33 MB"), "contents") - require.Contains(t, mustGetLineContaining(t, out, "44 MB"), "metadata") + require.Contains(t, mustGetLineContaining(t, out, "soft limit: 33 MB"), "contents") + require.Contains(t, mustGetLineContaining(t, out, "hard limit: 331 MB"), "contents") + require.Contains(t, mustGetLineContaining(t, out, "min sweep age: 10m0s"), "contents") + + require.Contains(t, mustGetLineContaining(t, out, "soft limit: 44 MB"), "metadata") + require.Contains(t, mustGetLineContaining(t, out, "hard limit: 441 MB"), "metadata") + require.Contains(t, mustGetLineContaining(t, out, "min sweep age: 24h0m0s"), "metadata") + require.Contains(t, mustGetLineContaining(t, out, "55s"), "blob-list") } diff --git a/cli/command_repository_connect.go b/cli/command_repository_connect.go index d4a599ad9..d68591875 100644 --- a/cli/command_repository_connect.go +++ b/cli/command_repository_connect.go @@ -46,10 +46,10 @@ func (c *commandRepositoryConnect) setup(svc advancedAppServices, parent command } type connectOptions struct { - connectCacheDirectory string - connectMaxCacheSizeMB int64 - connectMaxMetadataCacheSizeMB int64 - connectMaxListCacheDuration time.Duration + connectCacheDirectory string + + cacheSizeFlags + connectHostname string connectUsername string connectCheckForUpdates bool @@ -66,9 +66,12 @@ func (c *connectOptions) setup(svc appServices, cmd *kingpin.CmdClause) { // Set up flags shared between 'create' and 'connect'. Note that because those flags are used by both command // we must use *Var() methods, otherwise one of the commands would always get default flag values. 
cmd.Flag("cache-directory", "Cache directory").PlaceHolder("PATH").Envar(svc.EnvName("KOPIA_CACHE_DIRECTORY")).StringVar(&c.connectCacheDirectory) - cmd.Flag("content-cache-size-mb", "Size of local content cache").PlaceHolder("MB").Default("5000").Int64Var(&c.connectMaxCacheSizeMB) - cmd.Flag("metadata-cache-size-mb", "Size of local metadata cache").PlaceHolder("MB").Default("5000").Int64Var(&c.connectMaxMetadataCacheSizeMB) - cmd.Flag("max-list-cache-duration", "Duration of index cache").Default("30s").Hidden().DurationVar(&c.connectMaxListCacheDuration) + + c.maxListCacheDuration = 30 * time.Second //nolint:gomnd + c.contentCacheSizeMB = 5000 + c.metadataCacheSizeMB = 5000 + c.cacheSizeFlags.setup(cmd) + cmd.Flag("override-hostname", "Override hostname used by this repository connection").Hidden().StringVar(&c.connectHostname) cmd.Flag("override-username", "Override username used by this repository connection").Hidden().StringVar(&c.connectUsername) cmd.Flag("check-for-updates", "Periodically check for Kopia updates on GitHub").Default("true").Envar(svc.EnvName(checkForUpdatesEnvar)).BoolVar(&c.connectCheckForUpdates) @@ -91,10 +94,15 @@ func (c *connectOptions) getFormatBlobCacheDuration() time.Duration { func (c *connectOptions) toRepoConnectOptions() *repo.ConnectOptions { return &repo.ConnectOptions{ CachingOptions: content.CachingOptions{ - CacheDirectory: c.connectCacheDirectory, - MaxCacheSizeBytes: c.connectMaxCacheSizeMB << 20, //nolint:gomnd - MaxMetadataCacheSizeBytes: c.connectMaxMetadataCacheSizeMB << 20, //nolint:gomnd - MaxListCacheDuration: content.DurationSeconds(c.connectMaxListCacheDuration.Seconds()), + CacheDirectory: c.connectCacheDirectory, + ContentCacheSizeBytes: c.contentCacheSizeMB << 20, //nolint:gomnd + ContentCacheSizeLimitBytes: c.contentCacheSizeLimitMB << 20, //nolint:gomnd + MetadataCacheSizeBytes: c.metadataCacheSizeMB << 20, //nolint:gomnd + MetadataCacheSizeLimitBytes: c.metadataCacheSizeLimitMB << 20, //nolint:gomnd + MaxListCacheDuration: content.DurationSeconds(c.maxListCacheDuration.Seconds()), + MinContentSweepAge: content.DurationSeconds(c.contentMinSweepAge.Seconds()), + MinMetadataSweepAge: content.DurationSeconds(c.metadataMinSweepAge.Seconds()), + MinIndexSweepAge: content.DurationSeconds(c.indexMinSweepAge.Seconds()), }, ClientOptions: repo.ClientOptions{ Hostname: c.connectHostname, diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 39aed3275..b5f4ccba8 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -25,7 +25,25 @@ type mapStorage struct { keyTime map[blob.ID]time.Time // +checklocks:mutex timeNow func() time.Time - mutex sync.RWMutex + // +checklocks:mutex + totalBytes int64 + // +checklocksignore + limit int64 + mutex sync.RWMutex +} + +func (s *mapStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { + if s.limit < 0 { + return blob.Capacity{}, blob.ErrNotAVolume + } + + s.mutex.RLock() + defer s.mutex.RUnlock() + + return blob.Capacity{ + SizeB: uint64(s.limit), + FreeB: uint64(s.limit - s.totalBytes), + }, nil } func (s *mapStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { @@ -90,17 +108,23 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, o s.mutex.Lock() defer s.mutex.Unlock() + var b bytes.Buffer + + data.WriteTo(&b) + + if s.limit >= 0 && s.totalBytes+int64(b.Len()) > s.limit { + return errors.Errorf("exceeded limit, unable to add %v bytes, currently using %v/%v", b.Len(), 
s.totalBytes, s.limit) + } + if !opts.SetModTime.IsZero() { s.keyTime[id] = opts.SetModTime } else { s.keyTime[id] = s.timeNow() } - var b bytes.Buffer - - data.WriteTo(&b) - + s.totalBytes -= int64(len(s.data[id])) s.data[id] = b.Bytes() + s.totalBytes += int64(len(s.data[id])) if opts.GetModTime != nil { *opts.GetModTime = s.keyTime[id] @@ -113,6 +137,7 @@ func (s *mapStorage) DeleteBlob(ctx context.Context, id blob.ID) error { s.mutex.Lock() defer s.mutex.Unlock() + s.totalBytes -= int64(len(s.data[id])) delete(s.data, id) delete(s.keyTime, id) @@ -184,6 +209,12 @@ func (s *mapStorage) DisplayName() string { // NewMapStorage returns an implementation of Storage backed by the contents of given map. // Used primarily for testing. func NewMapStorage(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() time.Time) blob.Storage { + return NewMapStorageWithLimit(data, keyTime, timeNow, -1) +} + +// NewMapStorageWithLimit returns an implementation of Storage backed by the contents of given map. +// Used primarily for testing. +func NewMapStorageWithLimit(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() time.Time, limit int64) blob.Storage { if keyTime == nil { keyTime = make(map[blob.ID]time.Time) } @@ -192,5 +223,11 @@ func NewMapStorage(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() t timeNow = clock.Now } - return &mapStorage{data: data, keyTime: keyTime, timeNow: timeNow} + totalBytes := int64(0) + + for _, v := range data { + totalBytes += int64(len(v)) + } + + return &mapStorage{data: data, keyTime: keyTime, timeNow: timeNow, limit: limit, totalBytes: totalBytes} } diff --git a/internal/blobtesting/map_test.go b/internal/blobtesting/map_test.go index 456fa57b2..6719780f3 100644 --- a/internal/blobtesting/map_test.go +++ b/internal/blobtesting/map_test.go @@ -3,6 +3,9 @@ import ( "testing" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" ) @@ -17,3 +20,46 @@ func TestMapStorage(t *testing.T) { VerifyStorage(testlogging.Context(t), t, r, blob.PutOptions{}) } + +func TestMapStorageWithLimit(t *testing.T) { + ctx := testlogging.Context(t) + data := DataMap{} + + r := NewMapStorageWithLimit(data, nil, nil, 10) + verifyCapacityAndFreeSpace(t, r, 10, 10) + require.NoError(t, r.PutBlob(ctx, "foo", gather.FromSlice([]byte("foo")), blob.PutOptions{})) + verifyCapacityAndFreeSpace(t, r, 10, 7) + require.NoError(t, r.PutBlob(ctx, "bar", gather.FromSlice([]byte("bar")), blob.PutOptions{})) + verifyCapacityAndFreeSpace(t, r, 10, 4) + require.NoError(t, r.PutBlob(ctx, "baz", gather.FromSlice([]byte("baz")), blob.PutOptions{})) + verifyCapacityAndFreeSpace(t, r, 10, 1) + + // we're at 9/10 bytes, can't add 3 more + require.ErrorContains(t, r.PutBlob(ctx, "qux", gather.FromSlice([]byte("qux")), blob.PutOptions{}), "exceeded limit") + // remove 3 bytes + require.NoError(t, r.DeleteBlob(ctx, "baz")) + verifyCapacityAndFreeSpace(t, r, 10, 4) + // can add 4 bytes again + require.NoError(t, r.PutBlob(ctx, "qux", gather.FromSlice([]byte("qux1")), blob.PutOptions{})) + verifyCapacityAndFreeSpace(t, r, 10, 0) + // can't add any more bytes since we're at 10/10 bytes + require.ErrorContains(t, r.PutBlob(ctx, "aaa", gather.FromSlice([]byte("1")), blob.PutOptions{}), "exceeded limit") + // adding zero bytes won't fail in this situation. 
+ require.NoError(t, r.PutBlob(ctx, "bbb", gather.FromSlice([]byte{}), blob.PutOptions{}), "exceeded limit") + verifyCapacityAndFreeSpace(t, r, 10, 0) + + r = NewMapStorageWithLimit(DataMap{ + "foo": []byte("foo"), + }, nil, nil, 20) + verifyCapacityAndFreeSpace(t, r, 20, 17) +} + +func verifyCapacityAndFreeSpace(t *testing.T, r blob.Storage, wantSize, wantFree int64) { + t.Helper() + + c, err := r.GetCapacity(testlogging.Context(t)) + require.NoError(t, err) + + require.Equal(t, uint64(wantSize), c.SizeB) + require.Equal(t, uint64(wantFree), c.FreeB) +} diff --git a/internal/cache/content_cache.go b/internal/cache/content_cache.go index b474230ae..38c07b45e 100644 --- a/internal/cache/content_cache.go +++ b/internal/cache/content_cache.go @@ -62,10 +62,8 @@ func (c *contentCacheImpl) GetContent(ctx context.Context, contentID string, blo } func (c *contentCacheImpl) getContentFromFullBlob(ctx context.Context, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error { - // acquire exclusive lock - mut := c.pc.GetFetchingMutex(blobID) - mut.Lock() - defer mut.Unlock() + c.pc.exclusiveLock(string(blobID)) + defer c.pc.exclusiveUnlock(string(blobID)) // check again to see if we perhaps lost the race and the data is now in cache. if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), offset, length, output) { @@ -114,9 +112,8 @@ func (c *contentCacheImpl) fetchBlobInternal(ctx context.Context, blobID blob.ID func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, contentID string, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error { // acquire shared lock on a blob, PrefetchBlob will acquire exclusive lock here. - mut := c.pc.GetFetchingMutex(blobID) - mut.RLock() - defer mut.RUnlock() + c.pc.sharedLock(string(blobID)) + defer c.pc.sharedUnlock(string(blobID)) // see if we have the full blob cached by extracting a partial range. if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), offset, length, output) { @@ -124,9 +121,8 @@ func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, } // acquire exclusive lock on the content - mut2 := c.pc.GetFetchingMutex(blob.ID(contentID)) - mut2.Lock() - defer mut2.Unlock() + c.pc.exclusiveLock(contentID) + defer c.pc.exclusiveUnlock(contentID) output.Reset() @@ -161,9 +157,8 @@ func (c *contentCacheImpl) PrefetchBlob(ctx context.Context, blobID blob.ID) err } // acquire exclusive lock for the blob. - mut := c.pc.GetFetchingMutex(blobID) - mut.Lock() - defer mut.Unlock() + c.pc.exclusiveLock(string(blobID)) + defer c.pc.exclusiveUnlock(string(blobID)) if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), 0, 1, &blobData) { return nil diff --git a/internal/cache/content_cache_test.go b/internal/cache/content_cache_test.go index 18b684295..c7657221f 100644 --- a/internal/cache/content_cache_test.go +++ b/internal/cache/content_cache_test.go @@ -11,6 +11,7 @@ "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "github.com/kopia/kopia/internal/blobtesting" "github.com/kopia/kopia/internal/cache" @@ -33,7 +34,64 @@ func newUnderlyingStorageForContentCacheTesting(t *testing.T) blob.Storage { return st } -func TestCacheExpiration(t *testing.T) { +func TestCacheExpiration_SoftLimitNoMinAge(t *testing.T) { + // cache is 10k, each blob is 4k, so we can store 2 blobs before they are evicted. 
+ wantEvicted := []blob.ID{"a", "b"}
+
+ verifyCacheExpiration(t, cache.SweepSettings{
+ MaxSizeBytes: 10000,
+ TouchThreshold: -1,
+ }, wantEvicted)
+}
+
+func TestCacheExpiration_SoftLimitWithMinAge(t *testing.T) {
+ // cache is 10k, each blob is 4k, cache will grow beyond the limit but will not evict anything.
+ verifyCacheExpiration(t, cache.SweepSettings{
+ MaxSizeBytes: 10000,
+ TouchThreshold: -1,
+ MinSweepAge: time.Hour,
+ }, nil)
+}
+
+func TestCacheExpiration_HardLimitWithMinAge(t *testing.T) {
+ // cache is 10k with a matching 10k hard limit, each blob is 4k; the hard limit forces eviction
+ // of the two oldest blobs even though they are younger than MinSweepAge.
+ wantEvicted := []blob.ID{"a", "b"}
+
+ verifyCacheExpiration(t, cache.SweepSettings{
+ MaxSizeBytes: 10000,
+ TouchThreshold: -1,
+ MinSweepAge: time.Hour,
+ LimitBytes: 10000,
+ }, wantEvicted)
+}
+
+func TestCacheExpiration_HardLimitAboveSoftLimit(t *testing.T) {
+ // hard limit (13k) above the soft limit (10k): evicting only the oldest blob satisfies the
+ // hard limit, and MinSweepAge shields the remaining blobs from the soft-limit sweep.
+ wantEvicted := []blob.ID{"a"}
+
+ verifyCacheExpiration(t, cache.SweepSettings{
+ MaxSizeBytes: 10000,
+ TouchThreshold: -1,
+ MinSweepAge: time.Hour,
+ LimitBytes: 13000,
+ }, wantEvicted)
+}
+
+func TestCacheExpiration_HardLimitBelowSoftLimit(t *testing.T) {
+ // hard limit (5k) below the soft limit (10k): the hard limit wins, so all but the newest blob
+ // are evicted regardless of MinSweepAge.
+ wantEvicted := []blob.ID{"a", "b", "c"}
+
+ verifyCacheExpiration(t, cache.SweepSettings{
+ MaxSizeBytes: 10000,
+ TouchThreshold: -1,
+ MinSweepAge: time.Hour,
+ LimitBytes: 5000,
+ }, wantEvicted)
+}
+
+// verifyCacheExpiration fetches 4 items named "a", "b", "c" and "d", each 4000 bytes in size,
+// into the cache and verifies that the correct items are evicted based on the sweep settings.
+//
+//nolint:thelper
+func verifyCacheExpiration(t *testing.T, sweepSettings cache.SweepSettings, wantEvicted []blob.ID) {
 cacheData := blobtesting.DataMap{}

 // on Windows, the time does not always move forward (sometimes clock.Now() returns exactly the same value for consecutive invocations)
@@ -57,10 +115,7 @@ func TestCacheExpiration(t *testing.T) {
 ctx := testlogging.Context(t)
 cc, err := cache.NewContentCache(ctx, underlyingStorage, cache.Options{
 Storage: cacheStorage.(cache.Storage),
- Sweep: cache.SweepSettings{
- MaxSizeBytes: 10000,
- TouchThreshold: -1,
- },
+ Sweep: sweepSettings,
 TimeNow: movingTimeFunc,
 }, nil)

@@ -71,34 +126,26 @@ func TestCacheExpiration(t *testing.T) {
 var tmp gather.WriteBuffer
 defer tmp.Close()

- err = cc.GetContent(ctx, "00000a", "content-4k", 0, -1, &tmp) // 4k
+ const underlyingBlobID = "content-4k"
+
+ err = cc.GetContent(ctx, "a", underlyingBlobID, 0, -1, &tmp) // 4k
 require.NoError(t, err)

- err = cc.GetContent(ctx, "00000b", "content-4k", 0, -1, &tmp) // 4k
+ err = cc.GetContent(ctx, "b", underlyingBlobID, 0, -1, &tmp) // 4k
 require.NoError(t, err)

- err = cc.GetContent(ctx, "00000c", "content-4k", 0, -1, &tmp) // 4k
+ err = cc.GetContent(ctx, "c", underlyingBlobID, 0, -1, &tmp) // 4k
 require.NoError(t, err)

- err = cc.GetContent(ctx, "00000d", "content-4k", 0, -1, &tmp) // 4k
+ err = cc.GetContent(ctx, "d", underlyingBlobID, 0, -1, &tmp) // 4k
 require.NoError(t, err)

- // 00000a and 00000b will be removed from cache because it's the oldest.
- // to verify, let's remove content-4k from the underlying storage and make sure we can still read
- // 00000c and 00000d from the cache but not 00000a nor 00000b
- require.NoError(t, underlyingStorage.DeleteBlob(ctx, "content-4k"))
+ // delete the underlying storage blob to identify cache items that have been evicted;
+ // all other items will still be served from the cache.
+ require.NoError(t, underlyingStorage.DeleteBlob(ctx, underlyingBlobID)) - cases := []struct { - contentID string - expectedError error - }{ - {"00000a", blob.ErrBlobNotFound}, - {"00000b", blob.ErrBlobNotFound}, - {"00000c", nil}, - {"00000d", nil}, - } - - for _, tc := range cases { - got := cc.GetContent(ctx, tc.contentID, "content-4k", 0, -1, &tmp) - if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID: %v", tc.contentID) { - t.Logf("got correct error %v when reading content %v", tc.expectedError, tc.contentID) + for _, blobID := range []blob.ID{"a", "b", "c", "d"} { + if slices.Contains(wantEvicted, blobID) { + require.ErrorIs(t, cc.GetContent(ctx, string(blobID), underlyingBlobID, 0, -1, &tmp), blob.ErrBlobNotFound, "expected item not found %v", blobID) + } else { + require.NoError(t, cc.GetContent(ctx, string(blobID), underlyingBlobID, 0, -1, &tmp), "expected item to be found %v", blobID) } } } diff --git a/internal/cache/mutex_map.go b/internal/cache/mutex_map.go new file mode 100644 index 000000000..67de4b6c5 --- /dev/null +++ b/internal/cache/mutex_map.go @@ -0,0 +1,95 @@ +package cache + +import ( + "sync" +) + +// manages a map of RWMutexes indexed by string keys +// mutexes are allocated on demand and released when no longer needed. +type mutexMap struct { + mu sync.Mutex + + // +checklocks:mu + entries map[string]*mutexMapEntry +} + +type mutexMapEntry struct { + mut *sync.RWMutex + refCount int +} + +// +checklocksignore. +func (m *mutexMap) exclusiveLock(key string) { + m.getMutexAndAddRef(key).Lock() +} + +func (m *mutexMap) tryExclusiveLock(key string) bool { + if !m.getMutexAndAddRef(key).TryLock() { // +checklocksignore + m.getMutexAndReleaseRef(key) + return false + } + + return true +} + +func (m *mutexMap) exclusiveUnlock(key string) { + m.getMutexAndReleaseRef(key).Unlock() // +checklocksignore +} + +// +checklocksignore. 
+func (m *mutexMap) sharedLock(key string) { + m.getMutexAndAddRef(key).RLock() +} + +func (m *mutexMap) trySharedLock(key string) bool { + if !m.getMutexAndAddRef(key).TryRLock() { // +checklocksignore + m.getMutexAndReleaseRef(key) + return false + } + + return true +} + +func (m *mutexMap) sharedUnlock(key string) { + m.getMutexAndReleaseRef(key).RUnlock() // +checklocksignore +} + +func (m *mutexMap) getMutexAndAddRef(key string) *sync.RWMutex { + m.mu.Lock() + defer m.mu.Unlock() + + ent := m.entries[key] + if ent == nil { + if m.entries == nil { + m.entries = make(map[string]*mutexMapEntry) + } + + ent = &mutexMapEntry{ + mut: &sync.RWMutex{}, + } + + m.entries[key] = ent + } + + ent.refCount++ + + return ent.mut +} + +func (m *mutexMap) getMutexAndReleaseRef(key string) *sync.RWMutex { + m.mu.Lock() + defer m.mu.Unlock() + + if m.entries == nil { + panic("attempted to call unlock without a lock") + } + + ent := m.entries[key] + ent.refCount-- + + if ent.refCount == 0 { + delete(m.entries, key) + } + + return ent.mut +} diff --git a/internal/cache/mutex_map_test.go b/internal/cache/mutex_map_test.go new file mode 100644 index 000000000..dfda2fc3c --- /dev/null +++ b/internal/cache/mutex_map_test.go @@ -0,0 +1,50 @@ +package cache + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMutexMap_ExclusiveLock(t *testing.T) { + var m mutexMap + + require.Len(t, m.entries, 0) + m.exclusiveLock("foo") + require.Len(t, m.entries, 1) + require.False(t, m.tryExclusiveLock("foo")) + require.True(t, m.tryExclusiveLock("bar")) + require.False(t, m.trySharedLock("bar")) + require.Len(t, m.entries, 2) + m.exclusiveUnlock("foo") + require.Len(t, m.entries, 1) + require.True(t, m.tryExclusiveLock("foo")) + require.Len(t, m.entries, 2) + m.exclusiveUnlock("foo") + require.Len(t, m.entries, 1) + m.exclusiveUnlock("bar") + require.Len(t, m.entries, 0) +} + +func TestMutexMap_SharedLock(t *testing.T) { + var m mutexMap + + require.Len(t, m.entries, 0) + m.sharedLock("foo") + require.Len(t, m.entries, 1) + m.sharedLock("foo") + require.Len(t, m.entries, 1) + require.True(t, m.trySharedLock("foo")) + require.Len(t, m.entries, 1) + + // exclusive lock can't be acquired while shared lock is held + require.False(t, m.tryExclusiveLock("foo")) + m.sharedUnlock("foo") + require.False(t, m.tryExclusiveLock("foo")) + m.sharedUnlock("foo") + require.False(t, m.tryExclusiveLock("foo")) + m.sharedUnlock("foo") + + // now exclusive lock can be acquired + require.True(t, m.tryExclusiveLock("foo")) +} diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go index 4148264ff..8ce083bce 100644 --- a/internal/cache/persistent_lru_cache.go +++ b/internal/cache/persistent_lru_cache.go @@ -29,9 +29,13 @@ // PersistentCache provides persistent on-disk cache. type PersistentCache struct { + fetchMutexes mutexMap + listCacheMutex sync.Mutex // +checklocks:listCacheMutex listCache contentMetadataHeap + // +checklocks:listCacheMutex + pendingWriteBytes int64 cacheStorage Storage storageProtection cacheprot.StorageProtection @@ -51,27 +55,6 @@ func (c *PersistentCache) CacheStorage() Storage { return c.cacheStorage } -// GetFetchingMutex returns a RWMutex used to lock a blob or content during loading. -func (c *PersistentCache) GetFetchingMutex(id blob.ID) *sync.RWMutex { - if c == nil { - // special case - also works on non-initialized cache pointer. 
- return &sync.RWMutex{} - } - - c.listCacheMutex.Lock() - defer c.listCacheMutex.Unlock() - - if _, entry := c.listCache.LookupByID(id); entry != nil { - return &entry.contentDownloadMutex - } - - heap.Push(&c.listCache, blob.Metadata{BlobID: id}) - - _, entry := c.listCache.LookupByID(id) - - return &entry.contentDownloadMutex -} - // GetOrLoad is utility function gets the provided item from the cache or invokes the provided fetch function. // The function also appends and verifies HMAC checksums using provided secret on all cached items to ensure data integrity. func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func(output *gather.WriteBuffer) error, output *gather.WriteBuffer) error { @@ -86,9 +69,8 @@ func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func( output.Reset() - mut := c.GetFetchingMutex(blob.ID(key)) - mut.Lock() - defer mut.Unlock() + c.exclusiveLock(key) + defer c.exclusiveUnlock(key) // check again while holding the mutex if c.GetFull(ctx, key, output) { @@ -117,36 +99,12 @@ func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, le // cache hit c.reportHitBytes(int64(output.Length())) - // cache hit + mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) c.listCacheMutex.Lock() defer c.listCacheMutex.Unlock() - // Touching the blobs when cache is full can lead to cache never - // getting cleaned up if all the blobs fall under MinSweepAge. - // - // This can happen when the user is restoring large files (at - // comparable sizes to the cache size limitation) and MinSweepAge is - // sufficiently large. For large files which span over multiple - // blobs, every blob becomes least-recently-used. - // - // So, we'll avoid this until our cache usage drops to acceptable - // limits. - if c.isCacheFullLocked() { - c.listCacheCleanupLocked(ctx) - - if c.isCacheFullLocked() { - return - } - } - - // unlock for the expensive operation - c.listCacheMutex.Unlock() - mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold) - c.listCacheMutex.Lock() - if err == nil { - // insert or update the metadata - heap.Push(&c.listCache, blob.Metadata{ + c.listCache.AddOrUpdate(blob.Metadata{ BlobID: blob.ID(key), Length: length, Timestamp: mtime, @@ -154,18 +112,17 @@ func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, le } } -func (c *PersistentCache) getPartialDeleteInvalidBlob(ctx context.Context, key string) { - // delete invalid blob - c.reportMalformedData() - +func (c *PersistentCache) deleteInvalidBlob(ctx context.Context, key string) { if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err) - } else { - c.listCacheMutex.Lock() - if i, entry := c.listCache.LookupByID(blob.ID(key)); entry != nil { - heap.Remove(&c.listCache, i) - } - c.listCacheMutex.Unlock() + return + } + + c.listCacheMutex.Lock() + defer c.listCacheMutex.Unlock() + + if i, ok := c.listCache.index[blob.ID(key)]; ok { + heap.Remove(&c.listCache, i) } } @@ -180,19 +137,21 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le defer tmp.Close() if err := c.cacheStorage.GetBlob(ctx, blob.ID(key), offset, length, &tmp); err == nil { - prot := c.storageProtection + sp := c.storageProtection + if length >= 0 { - // only full items have protection. 
- prot = cacheprot.NoProtection() + // do not perform integrity check on partial reads + sp = cacheprot.NoProtection() } - if err := prot.Verify(key, tmp.Bytes(), output); err == nil { + if err := sp.Verify(key, tmp.Bytes(), output); err == nil { c.getPartialCacheHit(ctx, key, length, output) return true } - c.getPartialDeleteInvalidBlob(ctx, key) + c.reportMalformedData() + c.deleteInvalidBlob(ctx, key) } // cache miss @@ -206,49 +165,34 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le return false } -// +checklocks:c.listCacheMutex -func (c *PersistentCache) isCacheFullLocked() bool { - return c.listCache.DataSize() > c.sweep.MaxSizeBytes -} - // Put adds the provided key-value pair to the cache. func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes) { if c == nil { return } - var ( - protected gather.WriteBuffer - mtime time.Time - ) - - defer protected.Close() - c.listCacheMutex.Lock() defer c.listCacheMutex.Unlock() - // opportunistically cleanup cache before the PUT if we can - if c.isCacheFullLocked() { - c.listCacheCleanupLocked(ctx) - // Do not add more things to cache if it remains full after cleanup. We - // MUST NOT go over the specified limit for the cache space to avoid - // snapshots/restores from getting affected by the cache's storage use. - if c.isCacheFullLocked() { - // Limit warnings to one per minute max. - if clock.Now().Sub(c.lastCacheWarning) > 10*time.Minute { - c.lastCacheWarning = clock.Now() - - log(ctx).Warnf("Cache is full, unable to add item into '%s' cache.", c.description) - } - - return - } - } + // make sure the cache has enough room for the new item including any protection overhead. + l := data.Length() + c.storageProtection.OverheadBytes() + c.pendingWriteBytes += int64(l) + c.sweepLocked(ctx) // LOCK RELEASED for expensive operations c.listCacheMutex.Unlock() + + var protected gather.WriteBuffer + defer protected.Close() + c.storageProtection.Protect(key, data, &protected) + if protected.Length() != l { + log(ctx).Panicf("protection overhead mismatch, assumed %v got %v", l, protected.Length()) + } + + var mtime time.Time + if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{GetModTime: &mtime}); err != nil { c.reportStoreError() @@ -258,13 +202,12 @@ func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes c.listCacheMutex.Lock() // LOCK RE-ACQUIRED - c.listCache.Push(blob.Metadata{ + c.pendingWriteBytes -= int64(protected.Length()) + c.listCache.AddOrUpdate(blob.Metadata{ BlobID: blob.ID(key), Length: int64(protected.Bytes().Length()), Timestamp: mtime, }) - - c.listCacheCleanupLocked(ctx) } // Close closes the instance of persistent cache possibly waiting for at least one sweep to complete. @@ -276,16 +219,11 @@ func (c *PersistentCache) Close(ctx context.Context) { releasable.Released("persistent-cache", c) } -type blobCacheEntry struct { - metadata blob.Metadata - contentDownloadMutex sync.RWMutex -} - // A contentMetadataHeap implements heap.Interface and holds blob.Metadata. 
type contentMetadataHeap struct { - data []*blobCacheEntry - index map[blob.ID]int - dataSize int64 + data []blob.Metadata + index map[blob.ID]int + totalDataBytes int64 } func newContentMetadataHeap() contentMetadataHeap { @@ -295,86 +233,92 @@ func newContentMetadataHeap() contentMetadataHeap { func (h contentMetadataHeap) Len() int { return len(h.data) } func (h contentMetadataHeap) Less(i, j int) bool { - return h.data[i].metadata.Timestamp.Before(h.data[j].metadata.Timestamp) + return h.data[i].Timestamp.Before(h.data[j].Timestamp) } func (h contentMetadataHeap) Swap(i, j int) { - h.index[h.data[i].metadata.BlobID], h.index[h.data[j].metadata.BlobID] = h.index[h.data[j].metadata.BlobID], h.index[h.data[i].metadata.BlobID] + iBlobID := h.data[i].BlobID + jBlobID := h.data[j].BlobID + + h.index[iBlobID], h.index[jBlobID] = h.index[jBlobID], h.index[iBlobID] h.data[i], h.data[j] = h.data[j], h.data[i] } -func (h *contentMetadataHeap) Push(x interface{}) { +func (h *contentMetadataHeap) Push(x any) { bm := x.(blob.Metadata) //nolint:forcetypeassert + + h.index[bm.BlobID] = len(h.data) + h.data = append(h.data, bm) + h.totalDataBytes += bm.Length +} + +func (h *contentMetadataHeap) AddOrUpdate(bm blob.Metadata) { if i, exists := h.index[bm.BlobID]; exists { // only accept newer timestamps - if h.data[i].metadata.Timestamp.IsZero() || bm.Timestamp.After(h.data[i].metadata.Timestamp) { - h.dataSize += bm.Length - h.data[i].metadata.Length - h.data[i] = &blobCacheEntry{metadata: bm} + if bm.Timestamp.After(h.data[i].Timestamp) { + h.totalDataBytes += bm.Length - h.data[i].Length + h.data[i] = bm heap.Fix(h, i) } } else { - h.index[bm.BlobID] = len(h.data) - h.data = append(h.data, &blobCacheEntry{metadata: bm}) - h.dataSize += bm.Length + heap.Push(h, bm) } } -func (h *contentMetadataHeap) Pop() interface{} { +func (h *contentMetadataHeap) Pop() any { old := h.data n := len(old) item := old[n-1] h.data = old[0 : n-1] - h.dataSize -= item.metadata.Length - delete(h.index, item.metadata.BlobID) + h.totalDataBytes -= item.Length + delete(h.index, item.BlobID) - return item.metadata + return item } -func (h *contentMetadataHeap) LookupByID(id blob.ID) (int, *blobCacheEntry) { - i, ok := h.index[id] - if !ok { - return -1, nil - } - - return i, h.data[i] -} - -func (h contentMetadataHeap) DataSize() int64 { return h.dataSize } - // +checklocks:c.listCacheMutex -func (c *PersistentCache) listCacheCleanupLocked(ctx context.Context) { +func (c *PersistentCache) aboveSoftLimit(extraBytes int64) bool { + return c.listCache.totalDataBytes+extraBytes+c.pendingWriteBytes > c.sweep.MaxSizeBytes +} + +// +checklocks:c.listCacheMutex +func (c *PersistentCache) aboveHardLimit(extraBytes int64) bool { + if c.sweep.LimitBytes <= 0 { + return false + } + + return c.listCache.totalDataBytes+extraBytes+c.pendingWriteBytes > c.sweep.LimitBytes +} + +// +checklocks:c.listCacheMutex +func (c *PersistentCache) sweepLocked(ctx context.Context) { var ( unsuccessfulDeletes []blob.Metadata - unsuccessfulDeletesSize int64 + unsuccessfulDeleteBytes int64 now = c.timeNow() ) - // if there are blobs pending to be deleted ... - for c.listCache.DataSize() > 0 && - // ... 
and everything including what we couldn't delete is still bigger than the threshold - (c.listCache.DataSize()+unsuccessfulDeletesSize) > c.sweep.MaxSizeBytes { - oldest := heap.Pop(&c.listCache).(blob.Metadata) //nolint:forcetypeassert + for len(c.listCache.data) > 0 && (c.aboveSoftLimit(unsuccessfulDeleteBytes) || c.aboveHardLimit(unsuccessfulDeleteBytes)) { + // examine the oldest cache item without removing it from the heap. + oldest := c.listCache.data[0] - // stop here if the oldest item is below the specified minimal age - if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge { - heap.Push(&c.listCache, oldest) + if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge && !c.aboveHardLimit(unsuccessfulDeleteBytes) { + // the oldest item is below the specified minimal sweep age and we're below the hard limit, stop here break } - // unlock before the expensive operation - c.listCacheMutex.Unlock() - delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID) - c.listCacheMutex.Lock() + heap.Pop(&c.listCache) + + if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { + log(ctx).Warnw("unable to remove cache item", "cache", c.description, "item", oldest.BlobID, "err", delerr) - if delerr != nil { - log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr) // accumulate unsuccessful deletes to be pushed back into the heap // later so we do not attempt deleting the same blob multiple times // // after this we keep draining from the heap until we bring down // c.listCache.DataSize() to zero unsuccessfulDeletes = append(unsuccessfulDeletes, oldest) - unsuccessfulDeletesSize += oldest.Length + unsuccessfulDeleteBytes += oldest.Length } } @@ -411,9 +355,7 @@ func (c *PersistentCache) initialScan(ctx context.Context) error { return errors.Wrapf(err, "error listing %v", c.description) } - if c.isCacheFullLocked() { - c.listCacheCleanupLocked(ctx) - } + c.sweepLocked(ctx) dur := timer.Elapsed() @@ -422,27 +364,61 @@ func (c *PersistentCache) initialScan(ctx context.Context) error { inUsePercent := int64(hundredPercent) if c.sweep.MaxSizeBytes != 0 { - inUsePercent = hundredPercent * c.listCache.DataSize() / c.sweep.MaxSizeBytes + inUsePercent = hundredPercent * c.listCache.totalDataBytes / c.sweep.MaxSizeBytes } log(ctx).Debugw( "finished initial cache scan", "cache", c.description, "duration", dur, - "totalRetainedSize", c.listCache.DataSize(), + "totalRetainedSize", c.listCache.totalDataBytes, "tooRecentBytes", tooRecentBytes, "tooRecentCount", tooRecentCount, "maxSizeBytes", c.sweep.MaxSizeBytes, + "limitBytes", c.sweep.LimitBytes, "inUsePercent", inUsePercent, ) return nil } +func (c *PersistentCache) exclusiveLock(key string) { + if c != nil { + c.fetchMutexes.exclusiveLock(key) + } +} + +func (c *PersistentCache) exclusiveUnlock(key string) { + if c != nil { + c.fetchMutexes.exclusiveUnlock(key) + } +} + +func (c *PersistentCache) sharedLock(key string) { + if c != nil { + c.fetchMutexes.sharedLock(key) + } +} + +func (c *PersistentCache) sharedUnlock(key string) { + if c != nil { + c.fetchMutexes.sharedUnlock(key) + } +} + // SweepSettings encapsulates settings that impact cache item sweep/expiration. type SweepSettings struct { - MaxSizeBytes int64 - MinSweepAge time.Duration + // soft limit, the cache will be limited to this size, except for items newer than MinSweepAge. + MaxSizeBytes int64 + + // hard limit, if non-zero the cache will be limited to this size, regardless of MinSweepAge. 
+ LimitBytes int64
+
+ // items newer than this will not be removed from the cache, except when the cache is above
+ // LimitBytes (the hard limit).
+ MinSweepAge time.Duration
+
+ // on each use, an item's timestamp is refreshed if it has not been touched in this long.
 TouchThreshold time.Duration
 }
diff --git a/internal/cache/persistent_lru_cache_test.go b/internal/cache/persistent_lru_cache_test.go
index 2f4ef66bf..6012c9d9e 100644
--- a/internal/cache/persistent_lru_cache_test.go
+++ b/internal/cache/persistent_lru_cache_test.go
@@ -25,7 +25,7 @@ func TestPersistentLRUCache(t *testing.T) {

 const maxSizeBytes = 1000

- cs := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil).(cache.Storage)
+ cs := blobtesting.NewMapStorageWithLimit(blobtesting.DataMap{}, nil, nil, maxSizeBytes).(cache.Storage)

 pc, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
 MaxSizeBytes: maxSizeBytes,
@@ -148,11 +148,13 @@ func TestPersistentLRUCache_GetDeletesInvalidBlob(t *testing.T) {

 data := blobtesting.DataMap{}

- st := blobtesting.NewMapStorage(data, nil, nil)
+ const maxSizeBytes = 1000
+
+ st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
 fs := blobtesting.NewFaultyStorage(st)
 fc := faultyCache{fs}

- pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: 100}, nil, clock.Now)
+ pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: maxSizeBytes}, nil, clock.Now)
 require.NoError(t, err)

 pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
@@ -204,17 +206,19 @@ func TestPersistentLRUCache_SweepMinSweepAge(t *testing.T) {

 data := blobtesting.DataMap{}

- st := blobtesting.NewMapStorage(data, nil, nil)
+ const maxSizeBytes = 1000
+
+ st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
 fs := blobtesting.NewFaultyStorage(st)
 fc := faultyCache{fs}

 pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
- MaxSizeBytes: 1000,
+ MaxSizeBytes: maxSizeBytes,
 MinSweepAge: 10 * time.Second,
 }, nil, clock.Now)
 require.NoError(t, err)

 pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
- pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
+ pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 10)))

 time.Sleep(1 * time.Second)

 // simulate error during final sweep
@@ -232,12 +236,14 @@ func TestPersistentLRUCache_SweepIgnoresErrors(t *testing.T) {

 data := blobtesting.DataMap{}

- st := blobtesting.NewMapStorage(data, nil, nil)
+ const maxSizeBytes = 1000
+
+ st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
 fs := blobtesting.NewFaultyStorage(st)
 fc := faultyCache{fs}

 pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
- MaxSizeBytes: 1000,
+ MaxSizeBytes: maxSizeBytes,
 }, nil, clock.Now)
 require.NoError(t, err)

 fs.AddFault(blobtesting.MethodDeleteBlob).ErrorInstead(errors.Errorf("some delete error")).Repeat(1e6)

 pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
- pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
+ pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 10)))

 time.Sleep(500 * time.Millisecond)

 // simulate error during sweep
@@ -264,12 +270,14 @@
func TestPersistentLRUCache_Sweep1(t *testing.T) { data := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(data, nil, nil) + const maxSizeBytes = 1 + + st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes) fs := blobtesting.NewFaultyStorage(st) fc := faultyCache{fs} pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{ - MaxSizeBytes: 1, + MaxSizeBytes: maxSizeBytes, MinSweepAge: 0 * time.Second, }, nil, clock.Now) require.NoError(t, err) @@ -291,13 +299,6 @@ func TestPersistentLRUCacheNil(t *testing.T) { pc.Close(ctx) pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3})) - m1 := pc.GetFetchingMutex("dummy") - m2 := pc.GetFetchingMutex("dummy") - - require.NotNil(t, m1) - require.NotNil(t, m2) - require.NotSame(t, m1, m2) - var tmp gather.WriteBuffer require.False(t, pc.GetFull(ctx, "key", &tmp)) diff --git a/internal/cacheprot/storage_protection.go b/internal/cacheprot/storage_protection.go index 7de3ce2e2..a9616605a 100644 --- a/internal/cacheprot/storage_protection.go +++ b/internal/cacheprot/storage_protection.go @@ -19,6 +19,7 @@ type StorageProtection interface { Protect(id string, input gather.Bytes, output *gather.WriteBuffer) Verify(id string, input gather.Bytes, output *gather.WriteBuffer) error + OverheadBytes() int } type nullStorageProtection struct{} @@ -35,6 +36,10 @@ func (nullStorageProtection) Verify(_ string, input gather.Bytes, output *gather return nil } +func (nullStorageProtection) OverheadBytes() int { + return 0 +} + // NoProtection returns implementation of StorageProtection that offers no protection. func NoProtection() StorageProtection { return nullStorageProtection{} @@ -55,6 +60,10 @@ func (p checksumProtection) Verify(_ string, input gather.Bytes, output *gather. return hmac.VerifyAndStrip(input, p.Secret, output) } +func (p checksumProtection) OverheadBytes() int { + return sha256.Size +} + // ChecksumProtection returns StorageProtection that protects cached data using HMAC checksums without encryption. 
func ChecksumProtection(key []byte) StorageProtection { return checksumProtection{key} @@ -85,6 +94,10 @@ func (p authenticatedEncryptionProtection) Verify(id string, input gather.Bytes, return nil } +func (p authenticatedEncryptionProtection) OverheadBytes() int { + return p.e.Overhead() +} + type authenticatedEncryptionProtectionKey []byte func (k authenticatedEncryptionProtectionKey) GetEncryptionAlgorithm() string { diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 60803cca9..53171d6d2 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -48,8 +48,8 @@ func testServer(t *testing.T, disableGRPC bool) { Username: servertesting.TestUsername, Hostname: servertesting.TestHostname, }, content.CachingOptions{ - CacheDirectory: testutil.TempDirectory(t), - MaxCacheSizeBytes: maxCacheSizeBytes, + CacheDirectory: testutil.TempDirectory(t), + ContentCacheSizeBytes: maxCacheSizeBytes, }, servertesting.TestPassword, &repo.Options{}) // cancel immediately to ensure we did not spawn goroutines that depend on ctx inside diff --git a/repo/caching.go b/repo/caching.go index 484d57ede..8aeb5ce31 100644 --- a/repo/caching.go +++ b/repo/caching.go @@ -39,7 +39,7 @@ func SetCachingOptions(ctx context.Context, configFile string, opt *content.Cach func setupCachingOptionsWithDefaults(ctx context.Context, configPath string, lc *LocalConfig, opt *content.CachingOptions, uniqueID []byte) error { opt = opt.CloneOrDefault() - if opt.MaxCacheSizeBytes == 0 { + if opt.ContentCacheSizeBytes == 0 { lc.Caching = &content.CachingOptions{} return nil } @@ -67,14 +67,16 @@ func setupCachingOptionsWithDefaults(ctx context.Context, configPath string, lc lc.Caching.CacheDirectory = d } - lc.Caching.MaxCacheSizeBytes = opt.MaxCacheSizeBytes - lc.Caching.MaxMetadataCacheSizeBytes = opt.MaxMetadataCacheSizeBytes + lc.Caching.ContentCacheSizeBytes = opt.ContentCacheSizeBytes + lc.Caching.ContentCacheSizeLimitBytes = opt.ContentCacheSizeLimitBytes + lc.Caching.MetadataCacheSizeBytes = opt.MetadataCacheSizeBytes + lc.Caching.MetadataCacheSizeLimitBytes = opt.MetadataCacheSizeLimitBytes lc.Caching.MaxListCacheDuration = opt.MaxListCacheDuration lc.Caching.MinContentSweepAge = opt.MinContentSweepAge lc.Caching.MinMetadataSweepAge = opt.MinMetadataSweepAge lc.Caching.MinIndexSweepAge = opt.MinIndexSweepAge - log(ctx).Debugf("Creating cache directory '%v' with max size %v", lc.Caching.CacheDirectory, lc.Caching.MaxCacheSizeBytes) + log(ctx).Debugf("Creating cache directory '%v' with max size %v", lc.Caching.CacheDirectory, lc.Caching.ContentCacheSizeBytes) return nil } diff --git a/repo/content/caching_options.go b/repo/content/caching_options.go index f456d9755..cab93af17 100644 --- a/repo/content/caching_options.go +++ b/repo/content/caching_options.go @@ -19,14 +19,26 @@ func (s DurationSeconds) DurationOrDefault(def time.Duration) time.Duration { // CachingOptions specifies configuration of local cache. 
type CachingOptions struct { - CacheDirectory string `json:"cacheDirectory,omitempty"` - MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` - MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` - MaxListCacheDuration DurationSeconds `json:"maxListCacheDuration,omitempty"` - MinMetadataSweepAge DurationSeconds `json:"minMetadataSweepAge,omitempty"` - MinContentSweepAge DurationSeconds `json:"minContentSweepAge,omitempty"` - MinIndexSweepAge DurationSeconds `json:"minIndexSweepAge,omitempty"` - HMACSecret []byte `json:"-"` + CacheDirectory string `json:"cacheDirectory,omitempty"` + ContentCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` + ContentCacheSizeLimitBytes int64 `json:"contentCacheSizeLimitBytes,omitempty"` + MetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` + MetadataCacheSizeLimitBytes int64 `json:"metadataCacheSizeLimitBytes,omitempty"` + MaxListCacheDuration DurationSeconds `json:"maxListCacheDuration,omitempty"` + MinMetadataSweepAge DurationSeconds `json:"minMetadataSweepAge,omitempty"` + MinContentSweepAge DurationSeconds `json:"minContentSweepAge,omitempty"` + MinIndexSweepAge DurationSeconds `json:"minIndexSweepAge,omitempty"` + HMACSecret []byte `json:"-"` +} + +// EffectiveMetadataCacheSizeBytes returns the effective metadata cache size. +func (c *CachingOptions) EffectiveMetadataCacheSizeBytes() int64 { + if c.MetadataCacheSizeBytes == 0 { + // legacy path, use the same size for both caches. + return c.ContentCacheSizeBytes + } + + return c.MetadataCacheSizeBytes } // CloneOrDefault returns a clone of the caching options or empty options for nil. diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 02be512bc..97d6aaa7a 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -420,48 +420,61 @@ func (sm *SharedManager) namedLogger(n string) logging.Logger { return sm.contextLogger } +func contentCacheSweepSettings(caching *CachingOptions) cache.SweepSettings { + return cache.SweepSettings{ + MaxSizeBytes: caching.ContentCacheSizeBytes, + LimitBytes: caching.ContentCacheSizeLimitBytes, + MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge), + } +} + +func metadataCacheSizeSweepSettings(caching *CachingOptions) cache.SweepSettings { + return cache.SweepSettings{ + MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(), + LimitBytes: caching.MetadataCacheSizeLimitBytes, + MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), + } +} + +func indexBlobCacheSweepSettings(caching *CachingOptions) cache.SweepSettings { + return cache.SweepSettings{ + MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(), + MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), + } +} + func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *CachingOptions, mr *metrics.Registry) error { dataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{ BaseCacheDirectory: caching.CacheDirectory, CacheSubDir: "contents", HMACSecret: caching.HMACSecret, - Sweep: cache.SweepSettings{ - MaxSizeBytes: caching.MaxCacheSizeBytes, - MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge), - }, + Sweep: contentCacheSweepSettings(caching), }, mr) if err != nil { return errors.Wrap(err, "unable to initialize content cache") } - metadataCacheSize := caching.MaxMetadataCacheSizeBytes - if metadataCacheSize == 0 && 
caching.MaxCacheSizeBytes > 0 { - metadataCacheSize = caching.MaxCacheSizeBytes - } - metadataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{ BaseCacheDirectory: caching.CacheDirectory, CacheSubDir: "metadata", HMACSecret: caching.HMACSecret, FetchFullBlobs: true, - Sweep: cache.SweepSettings{ - MaxSizeBytes: metadataCacheSize, - MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), - }, + Sweep: metadataCacheSizeSweepSettings(caching), }, mr) if err != nil { return errors.Wrap(err, "unable to initialize metadata cache") } - indexBlobStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "index-blobs") + indexBlobStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, caching.EffectiveMetadataCacheSizeBytes(), "index-blobs") if err != nil { return errors.Wrap(err, "unable to initialize index blob cache storage") } - indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs", indexBlobStorage, cacheprot.ChecksumProtection(caching.HMACSecret), cache.SweepSettings{ - MaxSizeBytes: metadataCacheSize, - MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge), - }, mr, sm.timeNow) + indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs", + indexBlobStorage, + cacheprot.ChecksumProtection(caching.HMACSecret), + indexBlobCacheSweepSettings(caching), + mr, sm.timeNow) if err != nil { return errors.Wrap(err, "unable to create index blob cache") } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 153ff8302..51d7c4f44 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -2114,9 +2114,9 @@ func (s *contentManagerSuite) TestContentCachingByFormat(t *testing.T) { // create two managers sharing cache directory co := CachingOptions{ - CacheDirectory: cd, - MaxCacheSizeBytes: 100e6, - MaxMetadataCacheSizeBytes: 100e6, + CacheDirectory: cd, + ContentCacheSizeBytes: 100e6, + MetadataCacheSizeBytes: 100e6, } compressibleData := gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 4}, 10000)) @@ -2164,9 +2164,9 @@ func (s *contentManagerSuite) TestPrefetchContent(t *testing.T) { cd := testutil.TempDirectory(t) bm := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{ CachingOptions: CachingOptions{ - CacheDirectory: cd, - MaxCacheSizeBytes: 100e6, - MaxMetadataCacheSizeBytes: 100e6, + CacheDirectory: cd, + ContentCacheSizeBytes: 100e6, + MetadataCacheSizeBytes: 100e6, }, maxPackSize: 20e6, }) diff --git a/repo/open.go b/repo/open.go index 9aa44c1e2..06b39ad1b 100644 --- a/repo/open.go +++ b/repo/open.go @@ -133,7 +133,7 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry, timeNow func() time.Time) (*cache.PersistentCache, error) { opt = opt.CloneOrDefault() - cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents") + cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.ContentCacheSizeBytes, "server-contents") if cs == nil { // this may be (nil, nil) or (nil, err) return nil, errors.Wrap(err, "error opening storage") @@ -154,7 +154,8 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass } pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, cache.SweepSettings{ - MaxSizeBytes: opt.MaxCacheSizeBytes, + MaxSizeBytes: 
opt.ContentCacheSizeBytes, + LimitBytes: opt.ContentCacheSizeLimitBytes, MinSweepAge: opt.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge), }, mr, timeNow) if err != nil { diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index 02dff7926..2b02c1136 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -232,8 +232,8 @@ func runStress(t *testing.T, opt *StressOptions) { if err = repo.Connect(ctx, configFile, st, masterPassword, &repo.ConnectOptions{ CachingOptions: content.CachingOptions{ - CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)), - MaxCacheSizeBytes: 2000000000, + CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)), + ContentCacheSizeBytes: 2000000000, }, }); err != nil { t.Fatalf("unable to connect %v: %v", configFile, err)
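
For context, a minimal sketch of how a client could opt into the new soft/hard cache limits once this patch is applied. The struct, field, and type names (repo.ConnectOptions, content.CachingOptions, ContentCacheSizeBytes, ContentCacheSizeLimitBytes, MetadataCacheSizeBytes, MetadataCacheSizeLimitBytes, MinContentSweepAge, MinMetadataSweepAge, content.DurationSeconds) are all taken from the diff above; the concrete sizes, sweep ages, and the exampleConnectOptions helper are invented for illustration only.

package main

import (
	"fmt"
	"time"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/content"
)

// exampleConnectOptions builds connect options using the cache limit fields
// introduced by this patch (values here are arbitrary examples).
func exampleConnectOptions() *repo.ConnectOptions {
	return &repo.ConnectOptions{
		CachingOptions: content.CachingOptions{
			// soft limits: the sweep tries to keep each cache near this size,
			// but items newer than the Min*SweepAge values are spared.
			ContentCacheSizeBytes:  5000 << 20, // 5000 MiB
			MetadataCacheSizeBytes: 5000 << 20,

			// hard limits: never exceeded, even by recently-used items.
			ContentCacheSizeLimitBytes:  6000 << 20, // 6000 MiB
			MetadataCacheSizeLimitBytes: 6000 << 20,

			MinContentSweepAge:  content.DurationSeconds((10 * time.Minute).Seconds()),
			MinMetadataSweepAge: content.DurationSeconds((24 * time.Hour).Seconds()),
		},
	}
}

func main() {
	fmt.Printf("%+v\n", exampleConnectOptions().CachingOptions)
}

The same knobs are exposed on the CLI by cacheSizeFlags.setup above, e.g. --content-cache-size-mb=5000 --content-cache-size-limit-mb=6000 on kopia repository connect/create, or via kopia cache set for an existing connection.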