feat(repository): added hard size limit to the on-disk cache (#3238)

* test(providers): added capacity limits to blobtesting.mapStorage

* refactor(general): added mutex map which dynamically allocates and releases named mutexes

* refactor(repository): refactored cache cleanup and limit enforcement

* refactor(repository): plumb through cache size limits in the repository

* feat(cli): added CLI options to set cache size limits

* unified flag setting and field naming

* Update cli/command_cache_set.go

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>

* pr feedback

---------

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
Jarek Kowalski authored on 2023-08-24 09:38:56 -07:00, committed by GitHub
commit fe55dcb6a2 (parent 1fd1ff0151)
20 changed files with 654 additions and 295 deletions

View File

@@ -46,10 +46,16 @@ func (c *commandCacheInfo) run(ctx context.Context, _ repo.Repository) error {
return errors.Wrap(err, "unable to scan cache directory")
}
path2Limit := map[string]int64{
"contents": opts.MaxCacheSizeBytes,
"metadata": opts.MaxMetadataCacheSizeBytes,
"server-contents": opts.MaxCacheSizeBytes,
path2SoftLimit := map[string]int64{
"contents": opts.ContentCacheSizeBytes,
"metadata": opts.MetadataCacheSizeBytes,
"server-contents": opts.ContentCacheSizeBytes,
}
path2HardLimit := map[string]int64{
"contents": opts.ContentCacheSizeLimitBytes,
"metadata": opts.MetadataCacheSizeLimitBytes,
"server-contents": opts.ContentCacheSizeLimitBytes,
}
path2SweepAgeSeconds := map[string]time.Duration{
@@ -72,12 +78,24 @@ func (c *commandCacheInfo) run(ctx context.Context, _ repo.Repository) error {
}
maybeLimit := ""
if l, ok := path2Limit[ent.Name()]; ok {
maybeLimit = fmt.Sprintf(" (limit %v, min sweep age %v)", units.BytesString(l), path2SweepAgeSeconds[ent.Name()])
if l, ok := path2SoftLimit[ent.Name()]; ok {
var hardLimit string
if hl := path2HardLimit[ent.Name()]; hl > 0 {
hardLimit = units.BytesString(hl)
} else {
hardLimit = "none"
}
maybeLimit = fmt.Sprintf(" (soft limit: %v, hard limit: %v, min sweep age: %v)",
units.BytesString(l),
hardLimit,
path2SweepAgeSeconds[ent.Name()])
}
if ent.Name() == "blob-list" {
maybeLimit = fmt.Sprintf(" (duration %v)", opts.MaxListCacheDuration.DurationOrDefault(0))
maybeLimit = fmt.Sprintf(" (duration: %v)", opts.MaxListCacheDuration.DurationOrDefault(0))
}
c.out.printStdout("%v: %v files %v%v\n", subdir, fileCount, units.BytesString(totalFileSize), maybeLimit)
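
For a content cache configured with a 33 MB soft limit and no hard limit, the suffix built above would look roughly like the following. This is an illustrative sketch using assumed values; the "33 MB" and "10m0s" strings mirror what the CLI test later in this change asserts.

// Illustrative only; values are assumptions, not taken from a real run.
hardLimit := "none" // no hard limit configured
maybeLimit := fmt.Sprintf(" (soft limit: %v, hard limit: %v, min sweep age: %v)",
	units.BytesString(int64(33e6)), // "33 MB"
	hardLimit,
	10*time.Minute) // "10m0s"
// maybeLimit == " (soft limit: 33 MB, hard limit: none, min sweep age: 10m0s)"
_ = maybeLimit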

View File

@@ -4,6 +4,7 @@
"context"
"time"
"github.com/alecthomas/kingpin/v2"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/units"
@@ -11,14 +12,36 @@
"github.com/kopia/kopia/repo/content"
)
type cacheSizeFlags struct {
contentCacheSizeMB int64
contentCacheSizeLimitMB int64
contentMinSweepAge time.Duration
metadataCacheSizeMB int64
metadataCacheSizeLimitMB int64
metadataMinSweepAge time.Duration
maxListCacheDuration time.Duration
indexMinSweepAge time.Duration
}
func (c *cacheSizeFlags) setup(cmd *kingpin.CmdClause) {
// do not use Defaults here, since this structure is shared between connect/create/set commands
// each command sets its own default values in code.
cmd.Flag("content-cache-size-mb", "Desired size of local content cache (soft limit)").PlaceHolder("MB").Int64Var(&c.contentCacheSizeMB)
cmd.Flag("content-cache-size-limit-mb", "Maximum size of local content cache (hard limit)").PlaceHolder("MB").Int64Var(&c.contentCacheSizeLimitMB)
cmd.Flag("content-min-sweep-age", "Minimal age of content cache item to be subject to sweeping").DurationVar(&c.contentMinSweepAge)
cmd.Flag("metadata-cache-size-mb", "Desired size of local metadata cache (soft limit)").PlaceHolder("MB").Int64Var(&c.metadataCacheSizeMB)
cmd.Flag("metadata-cache-size-limit-mb", "Maximum size of local metadata cache (hard limit)").PlaceHolder("MB").Int64Var(&c.metadataCacheSizeLimitMB)
cmd.Flag("metadata-min-sweep-age", "Minimal age of metadata cache item to be subject to sweeping").DurationVar(&c.metadataMinSweepAge)
cmd.Flag("index-min-sweep-age", "Minimal age of index cache item to be subject to sweeping").DurationVar(&c.indexMinSweepAge)
cmd.Flag("max-list-cache-duration", "Duration of index cache").DurationVar(&c.maxListCacheDuration)
}
type commandCacheSetParams struct {
directory string
contentCacheSizeMB int64
maxMetadataCacheSizeMB int64
maxListCacheDuration time.Duration
contentMinSweepAge time.Duration
metadataMinSweepAge time.Duration
indexMinSweepAge time.Duration
directory string
cacheSizeFlags
svc appServices
}
@@ -30,14 +53,14 @@ func (c *commandCacheSetParams) setup(svc appServices, parent commandParent) {
c.metadataMinSweepAge = -1
c.indexMinSweepAge = -1
c.maxListCacheDuration = -1
c.contentCacheSizeLimitMB = -1
c.contentCacheSizeMB = -1
c.metadataCacheSizeLimitMB = -1
c.metadataCacheSizeMB = -1
c.cacheSizeFlags.setup(cmd)
cmd.Flag("cache-directory", "Directory where to store cache files").StringVar(&c.directory)
cmd.Flag("content-cache-size-mb", "Size of local content cache").PlaceHolder("MB").Default("-1").Int64Var(&c.contentCacheSizeMB)
cmd.Flag("content-min-sweep-age", "Minimal age of content cache item to be subject to sweeping").DurationVar(&c.contentMinSweepAge)
cmd.Flag("metadata-cache-size-mb", "Size of local metadata cache").PlaceHolder("MB").Default("-1").Int64Var(&c.maxMetadataCacheSizeMB)
cmd.Flag("metadata-min-sweep-age", "Minimal age of metadata cache item to be subject to sweeping").DurationVar(&c.metadataMinSweepAge)
cmd.Flag("index-min-sweep-age", "Minimal age of index cache item to be subject to sweeping").DurationVar(&c.indexMinSweepAge)
cmd.Flag("max-list-cache-duration", "Duration of index cache").DurationVar(&c.maxListCacheDuration)
cmd.Action(svc.repositoryWriterAction(c.run))
c.svc = svc
}
@@ -59,14 +82,28 @@ func (c *commandCacheSetParams) run(ctx context.Context, _ repo.RepositoryWriter
if v := c.contentCacheSizeMB; v != -1 {
v *= 1e6 // convert MB to bytes
log(ctx).Infof("changing content cache size to %v", units.BytesString(v))
opts.MaxCacheSizeBytes = v
opts.ContentCacheSizeBytes = v
changed++
}
if v := c.maxMetadataCacheSizeMB; v != -1 {
if v := c.contentCacheSizeLimitMB; v != -1 {
v *= 1e6 // convert MB to bytes
log(ctx).Infof("changing content cache size limit to %v", units.BytesString(v))
opts.ContentCacheSizeLimitBytes = v
changed++
}
if v := c.metadataCacheSizeMB; v != -1 {
v *= 1e6 // convert MB to bytes
log(ctx).Infof("changing metadata cache size to %v", units.BytesString(v))
opts.MaxMetadataCacheSizeBytes = v
opts.MetadataCacheSizeBytes = v
changed++
}
if v := c.metadataCacheSizeLimitMB; v != -1 {
v *= 1e6 // convert MB to bytes
log(ctx).Infof("changing metadata cache size limit to %v", units.BytesString(v))
opts.MetadataCacheSizeLimitBytes = v
changed++
}
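
Each flag defaults to -1 as a "not specified" sentinel, so only options the user actually passed are persisted. A minimal sketch of that pattern, using a hypothetical applyMB helper that is not part of this change:

// applyMB is a hypothetical helper illustrating the sentinel convention above:
// -1 means the flag was not specified; any other value is converted from MB to bytes.
func applyMB(flagMB int64, target *int64, changed *int) {
	if flagMB == -1 {
		return
	}
	*target = flagMB * 1e6 // convert MB to bytes, as the command does
	*changed++
}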

View File

@@ -30,13 +30,21 @@ func TestCacheSet(t *testing.T) {
"cache", "set",
"--cache-directory", ncd,
"--content-cache-size-mb=33",
"--content-cache-size-limit-mb=331",
"--metadata-cache-size-mb=44",
"--metadata-cache-size-limit-mb=441",
)
out := env.RunAndExpectSuccess(t, "cache", "info")
require.Contains(t, mustGetLineContaining(t, out, "33 MB"), ncd)
require.Contains(t, mustGetLineContaining(t, out, "33 MB"), "contents")
require.Contains(t, mustGetLineContaining(t, out, "44 MB"), "metadata")
require.Contains(t, mustGetLineContaining(t, out, "soft limit: 33 MB"), "contents")
require.Contains(t, mustGetLineContaining(t, out, "hard limit: 331 MB"), "contents")
require.Contains(t, mustGetLineContaining(t, out, "min sweep age: 10m0s"), "contents")
require.Contains(t, mustGetLineContaining(t, out, "soft limit: 44 MB"), "metadata")
require.Contains(t, mustGetLineContaining(t, out, "hard limit: 441 MB"), "metadata")
require.Contains(t, mustGetLineContaining(t, out, "min sweep age: 24h0m0s"), "metadata")
require.Contains(t, mustGetLineContaining(t, out, "55s"), "blob-list")
}

View File

@@ -46,10 +46,10 @@ func (c *commandRepositoryConnect) setup(svc advancedAppServices, parent command
}
type connectOptions struct {
connectCacheDirectory string
connectMaxCacheSizeMB int64
connectMaxMetadataCacheSizeMB int64
connectMaxListCacheDuration time.Duration
connectCacheDirectory string
cacheSizeFlags
connectHostname string
connectUsername string
connectCheckForUpdates bool
@@ -66,9 +66,12 @@ func (c *connectOptions) setup(svc appServices, cmd *kingpin.CmdClause) {
// Set up flags shared between 'create' and 'connect'. Note that because those flags are used by both commands
// we must use *Var() methods, otherwise one of the commands would always get default flag values.
cmd.Flag("cache-directory", "Cache directory").PlaceHolder("PATH").Envar(svc.EnvName("KOPIA_CACHE_DIRECTORY")).StringVar(&c.connectCacheDirectory)
cmd.Flag("content-cache-size-mb", "Size of local content cache").PlaceHolder("MB").Default("5000").Int64Var(&c.connectMaxCacheSizeMB)
cmd.Flag("metadata-cache-size-mb", "Size of local metadata cache").PlaceHolder("MB").Default("5000").Int64Var(&c.connectMaxMetadataCacheSizeMB)
cmd.Flag("max-list-cache-duration", "Duration of index cache").Default("30s").Hidden().DurationVar(&c.connectMaxListCacheDuration)
c.maxListCacheDuration = 30 * time.Second //nolint:gomnd
c.contentCacheSizeMB = 5000
c.metadataCacheSizeMB = 5000
c.cacheSizeFlags.setup(cmd)
cmd.Flag("override-hostname", "Override hostname used by this repository connection").Hidden().StringVar(&c.connectHostname)
cmd.Flag("override-username", "Override username used by this repository connection").Hidden().StringVar(&c.connectUsername)
cmd.Flag("check-for-updates", "Periodically check for Kopia updates on GitHub").Default("true").Envar(svc.EnvName(checkForUpdatesEnvar)).BoolVar(&c.connectCheckForUpdates)
@@ -91,10 +94,15 @@ func (c *connectOptions) getFormatBlobCacheDuration() time.Duration {
func (c *connectOptions) toRepoConnectOptions() *repo.ConnectOptions {
return &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: c.connectCacheDirectory,
MaxCacheSizeBytes: c.connectMaxCacheSizeMB << 20, //nolint:gomnd
MaxMetadataCacheSizeBytes: c.connectMaxMetadataCacheSizeMB << 20, //nolint:gomnd
MaxListCacheDuration: content.DurationSeconds(c.connectMaxListCacheDuration.Seconds()),
CacheDirectory: c.connectCacheDirectory,
ContentCacheSizeBytes: c.contentCacheSizeMB << 20, //nolint:gomnd
ContentCacheSizeLimitBytes: c.contentCacheSizeLimitMB << 20, //nolint:gomnd
MetadataCacheSizeBytes: c.metadataCacheSizeMB << 20, //nolint:gomnd
MetadataCacheSizeLimitBytes: c.metadataCacheSizeLimitMB << 20, //nolint:gomnd
MaxListCacheDuration: content.DurationSeconds(c.maxListCacheDuration.Seconds()),
MinContentSweepAge: content.DurationSeconds(c.contentMinSweepAge.Seconds()),
MinMetadataSweepAge: content.DurationSeconds(c.metadataMinSweepAge.Seconds()),
MinIndexSweepAge: content.DurationSeconds(c.indexMinSweepAge.Seconds()),
},
ClientOptions: repo.ClientOptions{
Hostname: c.connectHostname,
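
The same fields can also be populated programmatically when connecting without the CLI. A minimal sketch; ctx, configFile, st, password, and cacheDir are assumed placeholders, and the sizes are chosen for illustration only:

if err := repo.Connect(ctx, configFile, st, password, &repo.ConnectOptions{
	CachingOptions: content.CachingOptions{
		CacheDirectory:              cacheDir,
		ContentCacheSizeBytes:       5000 << 20, // soft limit
		ContentCacheSizeLimitBytes:  6000 << 20, // hard limit
		MetadataCacheSizeBytes:      5000 << 20,
		MetadataCacheSizeLimitBytes: 6000 << 20,
	},
}); err != nil {
	// handle connection error
}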

View File

@@ -25,7 +25,25 @@ type mapStorage struct {
keyTime map[blob.ID]time.Time
// +checklocks:mutex
timeNow func() time.Time
mutex sync.RWMutex
// +checklocks:mutex
totalBytes int64
// +checklocksignore
limit int64
mutex sync.RWMutex
}
func (s *mapStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) {
if s.limit < 0 {
return blob.Capacity{}, blob.ErrNotAVolume
}
s.mutex.RLock()
defer s.mutex.RUnlock()
return blob.Capacity{
SizeB: uint64(s.limit),
FreeB: uint64(s.limit - s.totalBytes),
}, nil
}
func (s *mapStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error {
@@ -90,17 +108,23 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, o
s.mutex.Lock()
defer s.mutex.Unlock()
var b bytes.Buffer
data.WriteTo(&b)
if s.limit >= 0 && s.totalBytes+int64(b.Len()) > s.limit {
return errors.Errorf("exceeded limit, unable to add %v bytes, currently using %v/%v", b.Len(), s.totalBytes, s.limit)
}
if !opts.SetModTime.IsZero() {
s.keyTime[id] = opts.SetModTime
} else {
s.keyTime[id] = s.timeNow()
}
var b bytes.Buffer
data.WriteTo(&b)
s.totalBytes -= int64(len(s.data[id]))
s.data[id] = b.Bytes()
s.totalBytes += int64(len(s.data[id]))
if opts.GetModTime != nil {
*opts.GetModTime = s.keyTime[id]
@@ -113,6 +137,7 @@ func (s *mapStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.totalBytes -= int64(len(s.data[id]))
delete(s.data, id)
delete(s.keyTime, id)
@@ -184,6 +209,12 @@ func (s *mapStorage) DisplayName() string {
// NewMapStorage returns an implementation of Storage backed by the contents of given map.
// Used primarily for testing.
func NewMapStorage(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() time.Time) blob.Storage {
return NewMapStorageWithLimit(data, keyTime, timeNow, -1)
}
// NewMapStorageWithLimit returns an implementation of Storage backed by the contents of given map.
// Used primarily for testing.
func NewMapStorageWithLimit(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() time.Time, limit int64) blob.Storage {
if keyTime == nil {
keyTime = make(map[blob.ID]time.Time)
}
@@ -192,5 +223,11 @@ func NewMapStorage(data DataMap, keyTime map[blob.ID]time.Time, timeNow func() t
timeNow = clock.Now
}
return &mapStorage{data: data, keyTime: keyTime, timeNow: timeNow}
totalBytes := int64(0)
for _, v := range data {
totalBytes += int64(len(v))
}
return &mapStorage{data: data, keyTime: keyTime, timeNow: timeNow, limit: limit, totalBytes: totalBytes}
}

View File

@@ -3,6 +3,9 @@
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
)
@@ -17,3 +20,46 @@ func TestMapStorage(t *testing.T) {
VerifyStorage(testlogging.Context(t), t, r, blob.PutOptions{})
}
func TestMapStorageWithLimit(t *testing.T) {
ctx := testlogging.Context(t)
data := DataMap{}
r := NewMapStorageWithLimit(data, nil, nil, 10)
verifyCapacityAndFreeSpace(t, r, 10, 10)
require.NoError(t, r.PutBlob(ctx, "foo", gather.FromSlice([]byte("foo")), blob.PutOptions{}))
verifyCapacityAndFreeSpace(t, r, 10, 7)
require.NoError(t, r.PutBlob(ctx, "bar", gather.FromSlice([]byte("bar")), blob.PutOptions{}))
verifyCapacityAndFreeSpace(t, r, 10, 4)
require.NoError(t, r.PutBlob(ctx, "baz", gather.FromSlice([]byte("baz")), blob.PutOptions{}))
verifyCapacityAndFreeSpace(t, r, 10, 1)
// we're at 9/10 bytes, can't add 3 more
require.ErrorContains(t, r.PutBlob(ctx, "qux", gather.FromSlice([]byte("qux")), blob.PutOptions{}), "exceeded limit")
// remove 3 bytes
require.NoError(t, r.DeleteBlob(ctx, "baz"))
verifyCapacityAndFreeSpace(t, r, 10, 4)
// can add 4 bytes again
require.NoError(t, r.PutBlob(ctx, "qux", gather.FromSlice([]byte("qux1")), blob.PutOptions{}))
verifyCapacityAndFreeSpace(t, r, 10, 0)
// can't add any more bytes since we're at 10/10 bytes
require.ErrorContains(t, r.PutBlob(ctx, "aaa", gather.FromSlice([]byte("1")), blob.PutOptions{}), "exceeded limit")
// adding zero bytes won't fail in this situation.
require.NoError(t, r.PutBlob(ctx, "bbb", gather.FromSlice([]byte{}), blob.PutOptions{}), "exceeded limit")
verifyCapacityAndFreeSpace(t, r, 10, 0)
r = NewMapStorageWithLimit(DataMap{
"foo": []byte("foo"),
}, nil, nil, 20)
verifyCapacityAndFreeSpace(t, r, 20, 17)
}
func verifyCapacityAndFreeSpace(t *testing.T, r blob.Storage, wantSize, wantFree int64) {
t.Helper()
c, err := r.GetCapacity(testlogging.Context(t))
require.NoError(t, err)
require.Equal(t, uint64(wantSize), c.SizeB)
require.Equal(t, uint64(wantFree), c.FreeB)
}

View File

@@ -62,10 +62,8 @@ func (c *contentCacheImpl) GetContent(ctx context.Context, contentID string, blo
}
func (c *contentCacheImpl) getContentFromFullBlob(ctx context.Context, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error {
// acquire exclusive lock
mut := c.pc.GetFetchingMutex(blobID)
mut.Lock()
defer mut.Unlock()
c.pc.exclusiveLock(string(blobID))
defer c.pc.exclusiveUnlock(string(blobID))
// check again to see if we perhaps lost the race and the data is now in cache.
if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), offset, length, output) {
@@ -114,9 +112,8 @@ func (c *contentCacheImpl) fetchBlobInternal(ctx context.Context, blobID blob.ID
func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context, contentID string, blobID blob.ID, offset, length int64, output *gather.WriteBuffer) error {
// acquire shared lock on a blob, PrefetchBlob will acquire exclusive lock here.
mut := c.pc.GetFetchingMutex(blobID)
mut.RLock()
defer mut.RUnlock()
c.pc.sharedLock(string(blobID))
defer c.pc.sharedUnlock(string(blobID))
// see if we have the full blob cached by extracting a partial range.
if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), offset, length, output) {
@@ -124,9 +121,8 @@ func (c *contentCacheImpl) getContentFromFullOrPartialBlob(ctx context.Context,
}
// acquire exclusive lock on the content
mut2 := c.pc.GetFetchingMutex(blob.ID(contentID))
mut2.Lock()
defer mut2.Unlock()
c.pc.exclusiveLock(contentID)
defer c.pc.exclusiveUnlock(contentID)
output.Reset()
@@ -161,9 +157,8 @@ func (c *contentCacheImpl) PrefetchBlob(ctx context.Context, blobID blob.ID) err
}
// acquire exclusive lock for the blob.
mut := c.pc.GetFetchingMutex(blobID)
mut.Lock()
defer mut.Unlock()
c.pc.exclusiveLock(string(blobID))
defer c.pc.exclusiveUnlock(string(blobID))
if c.pc.GetPartial(ctx, BlobIDCacheKey(blobID), 0, 1, &blobData) {
return nil

View File

@@ -11,6 +11,7 @@
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/cache"
@@ -33,7 +34,64 @@ func newUnderlyingStorageForContentCacheTesting(t *testing.T) blob.Storage {
return st
}
func TestCacheExpiration(t *testing.T) {
func TestCacheExpiration_SoftLimitNoMinAge(t *testing.T) {
// cache is 10k, each blob is 4k, so we can store 2 blobs before they are evicted.
wantEvicted := []blob.ID{"a", "b"}
verifyCacheExpiration(t, cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
}, wantEvicted)
}
func TestCacheExpiration_SoftLimitWithMinAge(t *testing.T) {
// cache is 10k, each blob is 4k, cache will grow beyond the limit but will not evict anything.
verifyCacheExpiration(t, cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
MinSweepAge: time.Hour,
}, nil)
}
func TestCacheExpiration_HardLimitWithMinAge(t *testing.T) {
// cache is 10k with a 10k hard limit, each blob is 4k; despite MinSweepAge, the two oldest blobs are evicted to stay under the hard limit.
wantEvicted := []blob.ID{"a", "b"}
verifyCacheExpiration(t, cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
MinSweepAge: time.Hour,
LimitBytes: 10000,
}, wantEvicted)
}
func TestCacheExpiration_HardLimitAboveSoftLimit(t *testing.T) {
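// cache is 10k soft / 13k hard, each blob is 4k; evicting only the oldest blob brings usage under the hard limit.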
wantExpired := []blob.ID{"a"}
verifyCacheExpiration(t, cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
MinSweepAge: time.Hour,
LimitBytes: 13000,
}, wantExpired)
}
func TestCacheExpiration_HardLimitBelowSoftLimit(t *testing.T) {
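// cache is 10k soft / 5k hard, each blob is 4k; the three oldest blobs must be evicted to get under the hard limit.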
wantExpired := []blob.ID{"a", "b", "c"}
verifyCacheExpiration(t, cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
MinSweepAge: time.Hour,
LimitBytes: 5000,
}, wantExpired)
}
// verifyCacheExpiration fetches 4 items into the cache, named "a", "b", "c", "d", each 4000 bytes in size,
// and verifies that the cache evicts the correct items based on the sweep settings.
//
//nolint:thelper
func verifyCacheExpiration(t *testing.T, sweepSettings cache.SweepSettings, wantEvicted []blob.ID) {
cacheData := blobtesting.DataMap{}
// on Windows, the time does not always move forward (sometimes clock.Now() returns exactly the same value for consecutive invocations)
@@ -57,10 +115,7 @@ func TestCacheExpiration(t *testing.T) {
ctx := testlogging.Context(t)
cc, err := cache.NewContentCache(ctx, underlyingStorage, cache.Options{
Storage: cacheStorage.(cache.Storage),
Sweep: cache.SweepSettings{
MaxSizeBytes: 10000,
TouchThreshold: -1,
},
Sweep: sweepSettings,
TimeNow: movingTimeFunc,
}, nil)
@@ -71,34 +126,26 @@ func TestCacheExpiration(t *testing.T) {
var tmp gather.WriteBuffer
defer tmp.Close()
err = cc.GetContent(ctx, "00000a", "content-4k", 0, -1, &tmp) // 4k
const underlyingBlobID = "content-4k"
err = cc.GetContent(ctx, "a", underlyingBlobID, 0, -1, &tmp) // 4k
require.NoError(t, err)
err = cc.GetContent(ctx, "00000b", "content-4k", 0, -1, &tmp) // 4k
err = cc.GetContent(ctx, "b", underlyingBlobID, 0, -1, &tmp) // 4k
require.NoError(t, err)
err = cc.GetContent(ctx, "00000c", "content-4k", 0, -1, &tmp) // 4k
err = cc.GetContent(ctx, "c", underlyingBlobID, 0, -1, &tmp) // 4k
require.NoError(t, err)
err = cc.GetContent(ctx, "00000d", "content-4k", 0, -1, &tmp) // 4k
err = cc.GetContent(ctx, "d", underlyingBlobID, 0, -1, &tmp) // 4k
require.NoError(t, err)
// 00000a and 00000b will be removed from cache because it's the oldest.
// to verify, let's remove content-4k from the underlying storage and make sure we can still read
// 00000c and 00000d from the cache but not 00000a nor 00000b
require.NoError(t, underlyingStorage.DeleteBlob(ctx, "content-4k"))
// delete underlying storage blob to identify cache items that have been evicted
// all other items will be fetched from the cache.
require.NoError(t, underlyingStorage.DeleteBlob(ctx, underlyingBlobID))
cases := []struct {
contentID string
expectedError error
}{
{"00000a", blob.ErrBlobNotFound},
{"00000b", blob.ErrBlobNotFound},
{"00000c", nil},
{"00000d", nil},
}
for _, tc := range cases {
got := cc.GetContent(ctx, tc.contentID, "content-4k", 0, -1, &tmp)
if assert.ErrorIs(t, got, tc.expectedError, "tc.contentID: %v", tc.contentID) {
t.Logf("got correct error %v when reading content %v", tc.expectedError, tc.contentID)
for _, blobID := range []blob.ID{"a", "b", "c", "d"} {
if slices.Contains(wantEvicted, blobID) {
require.ErrorIs(t, cc.GetContent(ctx, string(blobID), underlyingBlobID, 0, -1, &tmp), blob.ErrBlobNotFound, "expected item not found %v", blobID)
} else {
require.NoError(t, cc.GetContent(ctx, string(blobID), underlyingBlobID, 0, -1, &tmp), "expected item to be found %v", blobID)
}
}
}

internal/cache/mutex_map.go (new file, 95 lines)
View File

@@ -0,0 +1,95 @@
package cache
import (
"sync"
)
// manages a map of RWMutexes indexed by string keys
// mutexes are allocated on demand and released when no longer needed.
type mutexMap struct {
mu sync.Mutex
// +checklocks:mu
entries map[string]*mutexMapEntry
}
type mutexMapEntry struct {
mut *sync.RWMutex
refCount int
}
// +checklocksignore.
func (m *mutexMap) exclusiveLock(key string) {
m.getMutexAndAddRef(key).Lock()
}
func (m *mutexMap) tryExclusiveLock(key string) bool {
if !m.getMutexAndAddRef(key).TryLock() { // +checklocksignore
m.getMutexAndReleaseRef(key)
return false
}
return true
}
func (m *mutexMap) exclusiveUnlock(key string) {
m.getMutexAndReleaseRef(key).Unlock() // +checklocksignore
}
// +checklocksignore.
func (m *mutexMap) sharedLock(key string) {
m.getMutexAndAddRef(key).RLock()
}
func (m *mutexMap) trySharedLock(key string) bool {
if !m.getMutexAndAddRef(key).TryRLock() { // +checklocksignore
m.getMutexAndReleaseRef(key)
return false
}
return true
}
func (m *mutexMap) sharedUnlock(key string) {
m.getMutexAndReleaseRef(key).RUnlock() // +checklocksignore
}
func (m *mutexMap) getMutexAndAddRef(key string) *sync.RWMutex {
m.mu.Lock()
defer m.mu.Unlock()
ent := m.entries[key]
if ent == nil {
if m.entries == nil {
m.entries = make(map[string]*mutexMapEntry)
}
ent = &mutexMapEntry{
mut: &sync.RWMutex{},
}
m.entries[key] = ent
}
ent.refCount++
return ent.mut
}
func (m *mutexMap) getMutexAndReleaseRef(key string) *sync.RWMutex {
m.mu.Lock()
defer m.mu.Unlock()
if m.entries == nil {
panic("attempted to call unlock without a lock")
}
ent := m.entries[key]
ent.refCount--
if ent.refCount == 0 {
delete(m.entries, key)
}
return ent.mut
}
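
A mutex is allocated the first time a key is locked and freed once its reference count drops to zero, so idle keys consume no memory. A minimal usage sketch (inside the cache package, since the type is unexported):

var m mutexMap

m.exclusiveLock("blob1")   // allocates the entry for "blob1"
// ... fetch the blob and populate the cache ...
m.exclusiveUnlock("blob1") // last reference released, entry removed from the map

m.sharedLock("blob2")                 // multiple readers may hold this concurrently
locked := m.tryExclusiveLock("blob2") // false while a shared lock is held
_ = locked
m.sharedUnlock("blob2")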

internal/cache/mutex_map_test.go (new file, 50 lines)
View File

@@ -0,0 +1,50 @@
package cache
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMutexMap_ExclusiveLock(t *testing.T) {
var m mutexMap
require.Len(t, m.entries, 0)
m.exclusiveLock("foo")
require.Len(t, m.entries, 1)
require.False(t, m.tryExclusiveLock("foo"))
require.True(t, m.tryExclusiveLock("bar"))
require.False(t, m.trySharedLock("bar"))
require.Len(t, m.entries, 2)
m.exclusiveUnlock("foo")
require.Len(t, m.entries, 1)
require.True(t, m.tryExclusiveLock("foo"))
require.Len(t, m.entries, 2)
m.exclusiveUnlock("foo")
require.Len(t, m.entries, 1)
m.exclusiveUnlock("bar")
require.Len(t, m.entries, 0)
}
func TestMutexMap_SharedLock(t *testing.T) {
var m mutexMap
require.Len(t, m.entries, 0)
m.sharedLock("foo")
require.Len(t, m.entries, 1)
m.sharedLock("foo")
require.Len(t, m.entries, 1)
require.True(t, m.trySharedLock("foo"))
require.Len(t, m.entries, 1)
// exclusive lock can't be acquired while shared lock is held
require.False(t, m.tryExclusiveLock("foo"))
m.sharedUnlock("foo")
require.False(t, m.tryExclusiveLock("foo"))
m.sharedUnlock("foo")
require.False(t, m.tryExclusiveLock("foo"))
m.sharedUnlock("foo")
// now exclusive lock can be acquired
require.True(t, m.tryExclusiveLock("foo"))
}

View File

@@ -29,9 +29,13 @@
// PersistentCache provides persistent on-disk cache.
type PersistentCache struct {
fetchMutexes mutexMap
listCacheMutex sync.Mutex
// +checklocks:listCacheMutex
listCache contentMetadataHeap
// +checklocks:listCacheMutex
pendingWriteBytes int64
cacheStorage Storage
storageProtection cacheprot.StorageProtection
@@ -51,27 +55,6 @@ func (c *PersistentCache) CacheStorage() Storage {
return c.cacheStorage
}
// GetFetchingMutex returns a RWMutex used to lock a blob or content during loading.
func (c *PersistentCache) GetFetchingMutex(id blob.ID) *sync.RWMutex {
if c == nil {
// special case - also works on non-initialized cache pointer.
return &sync.RWMutex{}
}
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
if _, entry := c.listCache.LookupByID(id); entry != nil {
return &entry.contentDownloadMutex
}
heap.Push(&c.listCache, blob.Metadata{BlobID: id})
_, entry := c.listCache.LookupByID(id)
return &entry.contentDownloadMutex
}
// GetOrLoad is a utility function that gets the provided item from the cache or invokes the provided fetch function.
// The function also appends and verifies HMAC checksums using provided secret on all cached items to ensure data integrity.
func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func(output *gather.WriteBuffer) error, output *gather.WriteBuffer) error {
@@ -86,9 +69,8 @@ func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func(
output.Reset()
mut := c.GetFetchingMutex(blob.ID(key))
mut.Lock()
defer mut.Unlock()
c.exclusiveLock(key)
defer c.exclusiveUnlock(key)
// check again while holding the mutex
if c.GetFull(ctx, key, output) {
@@ -117,36 +99,12 @@ func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, le
// cache hit
c.reportHitBytes(int64(output.Length()))
// cache hit
mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold)
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
// Touching the blobs when cache is full can lead to cache never
// getting cleaned up if all the blobs fall under MinSweepAge.
//
// This can happen when the user is restoring large files (at
// comparable sizes to the cache size limitation) and MinSweepAge is
// sufficiently large. For large files which span over multiple
// blobs, every blob becomes least-recently-used.
//
// So, we'll avoid this until our cache usage drops to acceptable
// limits.
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
if c.isCacheFullLocked() {
return
}
}
// unlock for the expensive operation
c.listCacheMutex.Unlock()
mtime, err := c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.sweep.TouchThreshold)
c.listCacheMutex.Lock()
if err == nil {
// insert or update the metadata
heap.Push(&c.listCache, blob.Metadata{
c.listCache.AddOrUpdate(blob.Metadata{
BlobID: blob.ID(key),
Length: length,
Timestamp: mtime,
@@ -154,18 +112,17 @@ func (c *PersistentCache) getPartialCacheHit(ctx context.Context, key string, le
}
}
func (c *PersistentCache) getPartialDeleteInvalidBlob(ctx context.Context, key string) {
// delete invalid blob
c.reportMalformedData()
func (c *PersistentCache) deleteInvalidBlob(ctx context.Context, key string) {
if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Errorf("unable to delete %v entry %v: %v", c.description, key, err)
} else {
c.listCacheMutex.Lock()
if i, entry := c.listCache.LookupByID(blob.ID(key)); entry != nil {
heap.Remove(&c.listCache, i)
}
c.listCacheMutex.Unlock()
return
}
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
if i, ok := c.listCache.index[blob.ID(key)]; ok {
heap.Remove(&c.listCache, i)
}
}
@@ -180,19 +137,21 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le
defer tmp.Close()
if err := c.cacheStorage.GetBlob(ctx, blob.ID(key), offset, length, &tmp); err == nil {
prot := c.storageProtection
sp := c.storageProtection
if length >= 0 {
// only full items have protection.
prot = cacheprot.NoProtection()
// do not perform integrity check on partial reads
sp = cacheprot.NoProtection()
}
if err := prot.Verify(key, tmp.Bytes(), output); err == nil {
if err := sp.Verify(key, tmp.Bytes(), output); err == nil {
c.getPartialCacheHit(ctx, key, length, output)
return true
}
c.getPartialDeleteInvalidBlob(ctx, key)
c.reportMalformedData()
c.deleteInvalidBlob(ctx, key)
}
// cache miss
@@ -206,49 +165,34 @@ func (c *PersistentCache) GetPartial(ctx context.Context, key string, offset, le
return false
}
// +checklocks:c.listCacheMutex
func (c *PersistentCache) isCacheFullLocked() bool {
return c.listCache.DataSize() > c.sweep.MaxSizeBytes
}
// Put adds the provided key-value pair to the cache.
func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes) {
if c == nil {
return
}
var (
protected gather.WriteBuffer
mtime time.Time
)
defer protected.Close()
c.listCacheMutex.Lock()
defer c.listCacheMutex.Unlock()
// opportunistically cleanup cache before the PUT if we can
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
// Do not add more things to cache if it remains full after cleanup. We
// MUST NOT go over the specified limit for the cache space to avoid
// snapshots/restores from getting affected by the cache's storage use.
if c.isCacheFullLocked() {
// Limit warnings to one per minute max.
if clock.Now().Sub(c.lastCacheWarning) > 10*time.Minute {
c.lastCacheWarning = clock.Now()
log(ctx).Warnf("Cache is full, unable to add item into '%s' cache.", c.description)
}
return
}
}
// make sure the cache has enough room for the new item including any protection overhead.
l := data.Length() + c.storageProtection.OverheadBytes()
c.pendingWriteBytes += int64(l)
c.sweepLocked(ctx)
// LOCK RELEASED for expensive operations
c.listCacheMutex.Unlock()
var protected gather.WriteBuffer
defer protected.Close()
c.storageProtection.Protect(key, data, &protected)
if protected.Length() != l {
log(ctx).Panicf("protection overhead mismatch, assumed %v got %v", l, protected.Length())
}
var mtime time.Time
if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), protected.Bytes(), blob.PutOptions{GetModTime: &mtime}); err != nil {
c.reportStoreError()
@@ -258,13 +202,12 @@ func (c *PersistentCache) Put(ctx context.Context, key string, data gather.Bytes
c.listCacheMutex.Lock()
// LOCK RE-ACQUIRED
c.listCache.Push(blob.Metadata{
c.pendingWriteBytes -= int64(protected.Length())
c.listCache.AddOrUpdate(blob.Metadata{
BlobID: blob.ID(key),
Length: int64(protected.Bytes().Length()),
Timestamp: mtime,
})
c.listCacheCleanupLocked(ctx)
}
// Close closes the instance of persistent cache possibly waiting for at least one sweep to complete.
@@ -276,16 +219,11 @@ func (c *PersistentCache) Close(ctx context.Context) {
releasable.Released("persistent-cache", c)
}
type blobCacheEntry struct {
metadata blob.Metadata
contentDownloadMutex sync.RWMutex
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata.
type contentMetadataHeap struct {
data []*blobCacheEntry
index map[blob.ID]int
dataSize int64
data []blob.Metadata
index map[blob.ID]int
totalDataBytes int64
}
func newContentMetadataHeap() contentMetadataHeap {
@@ -295,86 +233,92 @@ func newContentMetadataHeap() contentMetadataHeap {
func (h contentMetadataHeap) Len() int { return len(h.data) }
func (h contentMetadataHeap) Less(i, j int) bool {
return h.data[i].metadata.Timestamp.Before(h.data[j].metadata.Timestamp)
return h.data[i].Timestamp.Before(h.data[j].Timestamp)
}
func (h contentMetadataHeap) Swap(i, j int) {
h.index[h.data[i].metadata.BlobID], h.index[h.data[j].metadata.BlobID] = h.index[h.data[j].metadata.BlobID], h.index[h.data[i].metadata.BlobID]
iBlobID := h.data[i].BlobID
jBlobID := h.data[j].BlobID
h.index[iBlobID], h.index[jBlobID] = h.index[jBlobID], h.index[iBlobID]
h.data[i], h.data[j] = h.data[j], h.data[i]
}
func (h *contentMetadataHeap) Push(x interface{}) {
func (h *contentMetadataHeap) Push(x any) {
bm := x.(blob.Metadata) //nolint:forcetypeassert
h.index[bm.BlobID] = len(h.data)
h.data = append(h.data, bm)
h.totalDataBytes += bm.Length
}
func (h *contentMetadataHeap) AddOrUpdate(bm blob.Metadata) {
if i, exists := h.index[bm.BlobID]; exists {
// only accept newer timestamps
if h.data[i].metadata.Timestamp.IsZero() || bm.Timestamp.After(h.data[i].metadata.Timestamp) {
h.dataSize += bm.Length - h.data[i].metadata.Length
h.data[i] = &blobCacheEntry{metadata: bm}
if bm.Timestamp.After(h.data[i].Timestamp) {
h.totalDataBytes += bm.Length - h.data[i].Length
h.data[i] = bm
heap.Fix(h, i)
}
} else {
h.index[bm.BlobID] = len(h.data)
h.data = append(h.data, &blobCacheEntry{metadata: bm})
h.dataSize += bm.Length
heap.Push(h, bm)
}
}
func (h *contentMetadataHeap) Pop() interface{} {
func (h *contentMetadataHeap) Pop() any {
old := h.data
n := len(old)
item := old[n-1]
h.data = old[0 : n-1]
h.dataSize -= item.metadata.Length
delete(h.index, item.metadata.BlobID)
h.totalDataBytes -= item.Length
delete(h.index, item.BlobID)
return item.metadata
return item
}
func (h *contentMetadataHeap) LookupByID(id blob.ID) (int, *blobCacheEntry) {
i, ok := h.index[id]
if !ok {
return -1, nil
}
return i, h.data[i]
}
func (h contentMetadataHeap) DataSize() int64 { return h.dataSize }
// +checklocks:c.listCacheMutex
func (c *PersistentCache) listCacheCleanupLocked(ctx context.Context) {
func (c *PersistentCache) aboveSoftLimit(extraBytes int64) bool {
return c.listCache.totalDataBytes+extraBytes+c.pendingWriteBytes > c.sweep.MaxSizeBytes
}
// +checklocks:c.listCacheMutex
func (c *PersistentCache) aboveHardLimit(extraBytes int64) bool {
if c.sweep.LimitBytes <= 0 {
return false
}
return c.listCache.totalDataBytes+extraBytes+c.pendingWriteBytes > c.sweep.LimitBytes
}
// +checklocks:c.listCacheMutex
func (c *PersistentCache) sweepLocked(ctx context.Context) {
var (
unsuccessfulDeletes []blob.Metadata
unsuccessfulDeletesSize int64
unsuccessfulDeleteBytes int64
now = c.timeNow()
)
// if there are blobs pending to be deleted ...
for c.listCache.DataSize() > 0 &&
// ... and everything including what we couldn't delete is still bigger than the threshold
(c.listCache.DataSize()+unsuccessfulDeletesSize) > c.sweep.MaxSizeBytes {
oldest := heap.Pop(&c.listCache).(blob.Metadata) //nolint:forcetypeassert
for len(c.listCache.data) > 0 && (c.aboveSoftLimit(unsuccessfulDeleteBytes) || c.aboveHardLimit(unsuccessfulDeleteBytes)) {
// examine the oldest cache item without removing it from the heap.
oldest := c.listCache.data[0]
// stop here if the oldest item is below the specified minimal age
if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge {
heap.Push(&c.listCache, oldest)
if age := now.Sub(oldest.Timestamp); age < c.sweep.MinSweepAge && !c.aboveHardLimit(unsuccessfulDeleteBytes) {
// the oldest item is below the specified minimal sweep age and we're below the hard limit, stop here
break
}
// unlock before the expensive operation
c.listCacheMutex.Unlock()
delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID)
c.listCacheMutex.Lock()
heap.Pop(&c.listCache)
if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
log(ctx).Warnw("unable to remove cache item", "cache", c.description, "item", oldest.BlobID, "err", delerr)
if delerr != nil {
log(ctx).Errorf("unable to remove %v: %v", oldest.BlobID, delerr)
// accumulate unsuccessful deletes to be pushed back into the heap
// later so we do not attempt deleting the same blob multiple times
//
// after this we keep draining from the heap until we bring down
// c.listCache.DataSize() to zero
unsuccessfulDeletes = append(unsuccessfulDeletes, oldest)
unsuccessfulDeletesSize += oldest.Length
unsuccessfulDeleteBytes += oldest.Length
}
}
@@ -411,9 +355,7 @@ func (c *PersistentCache) initialScan(ctx context.Context) error {
return errors.Wrapf(err, "error listing %v", c.description)
}
if c.isCacheFullLocked() {
c.listCacheCleanupLocked(ctx)
}
c.sweepLocked(ctx)
dur := timer.Elapsed()
@@ -422,27 +364,61 @@ func (c *PersistentCache) initialScan(ctx context.Context) error {
inUsePercent := int64(hundredPercent)
if c.sweep.MaxSizeBytes != 0 {
inUsePercent = hundredPercent * c.listCache.DataSize() / c.sweep.MaxSizeBytes
inUsePercent = hundredPercent * c.listCache.totalDataBytes / c.sweep.MaxSizeBytes
}
log(ctx).Debugw(
"finished initial cache scan",
"cache", c.description,
"duration", dur,
"totalRetainedSize", c.listCache.DataSize(),
"totalRetainedSize", c.listCache.totalDataBytes,
"tooRecentBytes", tooRecentBytes,
"tooRecentCount", tooRecentCount,
"maxSizeBytes", c.sweep.MaxSizeBytes,
"limitBytes", c.sweep.LimitBytes,
"inUsePercent", inUsePercent,
)
return nil
}
func (c *PersistentCache) exclusiveLock(key string) {
if c != nil {
c.fetchMutexes.exclusiveLock(key)
}
}
func (c *PersistentCache) exclusiveUnlock(key string) {
if c != nil {
c.fetchMutexes.exclusiveUnlock(key)
}
}
func (c *PersistentCache) sharedLock(key string) {
if c != nil {
c.fetchMutexes.sharedLock(key)
}
}
func (c *PersistentCache) sharedUnlock(key string) {
if c != nil {
c.fetchMutexes.sharedUnlock(key)
}
}
// SweepSettings encapsulates settings that impact cache item sweep/expiration.
type SweepSettings struct {
MaxSizeBytes int64
MinSweepAge time.Duration
// soft limit, the cache will be limited to this size, except for items newer than MinSweepAge.
MaxSizeBytes int64
// hard limit, if non-zero the cache will be limited to this size, regardless of MinSweepAge.
LimitBytes int64
// items newer than this will never be removed from the cache, except when the cache is above
// LimitBytes.
MinSweepAge time.Duration
// on each use, items will be touched if they have not been touched in this long.
TouchThreshold time.Duration
}
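
Taken together: MaxSizeBytes is only enforced against items older than MinSweepAge, while LimitBytes is enforced unconditionally, evicting oldest-first until the cache is back under it. A hedged example configuration; the values are assumptions, not defaults from this change:

settings := cache.SweepSettings{
	MaxSizeBytes:   10 << 30,         // soft limit: recently used items may push the cache past this
	LimitBytes:     12 << 30,         // hard limit: never exceeded, regardless of item age
	MinSweepAge:    10 * time.Minute, // items newer than this are only evicted to satisfy the hard limit
	TouchThreshold: 10 * time.Minute,
}
_ = settings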

View File

@@ -25,7 +25,7 @@ func TestPersistentLRUCache(t *testing.T) {
const maxSizeBytes = 1000
cs := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil).(cache.Storage)
cs := blobtesting.NewMapStorageWithLimit(blobtesting.DataMap{}, nil, nil, maxSizeBytes).(cache.Storage)
pc, err := cache.NewPersistentCache(ctx, "testing", cs, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: maxSizeBytes,
@@ -148,11 +148,13 @@ func TestPersistentLRUCache_GetDeletesInvalidBlob(t *testing.T) {
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
const maxSizeBytes = 1000
st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: 100}, nil, clock.Now)
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{MaxSizeBytes: maxSizeBytes}, nil, clock.Now)
require.NoError(t, err)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
@@ -204,17 +206,19 @@ func TestPersistentLRUCache_SweepMinSweepAge(t *testing.T) {
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
const maxSizeBytes = 1000
st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: 1000,
MaxSizeBytes: maxSizeBytes,
MinSweepAge: 10 * time.Second,
}, nil, clock.Now)
require.NoError(t, err)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 10)))
time.Sleep(1 * time.Second)
// simulate error during final sweep
@@ -232,12 +236,14 @@ func TestPersistentLRUCache_SweepIgnoresErrors(t *testing.T) {
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
const maxSizeBytes = 1000
st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: 1000,
MaxSizeBytes: maxSizeBytes,
}, nil, clock.Now)
require.NoError(t, err)
@@ -245,7 +251,7 @@ func TestPersistentLRUCache_SweepIgnoresErrors(t *testing.T) {
fs.AddFault(blobtesting.MethodDeleteBlob).ErrorInstead(errors.Errorf("some delete error")).Repeat(1e6)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 1e6)))
pc.Put(ctx, "key2", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3}, 10)))
time.Sleep(500 * time.Millisecond)
// simulate error during sweep
@@ -264,12 +270,14 @@ func TestPersistentLRUCache_Sweep1(t *testing.T) {
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
const maxSizeBytes = 1
st := blobtesting.NewMapStorageWithLimit(data, nil, nil, maxSizeBytes)
fs := blobtesting.NewFaultyStorage(st)
fc := faultyCache{fs}
pc, err := cache.NewPersistentCache(ctx, "test", fc, cacheprot.ChecksumProtection([]byte{1, 2, 3}), cache.SweepSettings{
MaxSizeBytes: 1,
MaxSizeBytes: maxSizeBytes,
MinSweepAge: 0 * time.Second,
}, nil, clock.Now)
require.NoError(t, err)
@@ -291,13 +299,6 @@ func TestPersistentLRUCacheNil(t *testing.T) {
pc.Close(ctx)
pc.Put(ctx, "key", gather.FromSlice([]byte{1, 2, 3}))
m1 := pc.GetFetchingMutex("dummy")
m2 := pc.GetFetchingMutex("dummy")
require.NotNil(t, m1)
require.NotNil(t, m2)
require.NotSame(t, m1, m2)
var tmp gather.WriteBuffer
require.False(t, pc.GetFull(ctx, "key", &tmp))

View File

@@ -19,6 +19,7 @@
type StorageProtection interface {
Protect(id string, input gather.Bytes, output *gather.WriteBuffer)
Verify(id string, input gather.Bytes, output *gather.WriteBuffer) error
OverheadBytes() int
}
type nullStorageProtection struct{}
@@ -35,6 +36,10 @@ func (nullStorageProtection) Verify(_ string, input gather.Bytes, output *gather
return nil
}
func (nullStorageProtection) OverheadBytes() int {
return 0
}
// NoProtection returns implementation of StorageProtection that offers no protection.
func NoProtection() StorageProtection {
return nullStorageProtection{}
@@ -55,6 +60,10 @@ func (p checksumProtection) Verify(_ string, input gather.Bytes, output *gather.
return hmac.VerifyAndStrip(input, p.Secret, output)
}
func (p checksumProtection) OverheadBytes() int {
return sha256.Size
}
// ChecksumProtection returns StorageProtection that protects cached data using HMAC checksums without encryption.
func ChecksumProtection(key []byte) StorageProtection {
return checksumProtection{key}
@@ -85,6 +94,10 @@ func (p authenticatedEncryptionProtection) Verify(id string, input gather.Bytes,
return nil
}
func (p authenticatedEncryptionProtection) OverheadBytes() int {
return p.e.Overhead()
}
type authenticatedEncryptionProtectionKey []byte
func (k authenticatedEncryptionProtectionKey) GetEncryptionAlgorithm() string {

View File

@@ -48,8 +48,8 @@ func testServer(t *testing.T, disableGRPC bool) {
Username: servertesting.TestUsername,
Hostname: servertesting.TestHostname,
}, content.CachingOptions{
CacheDirectory: testutil.TempDirectory(t),
MaxCacheSizeBytes: maxCacheSizeBytes,
CacheDirectory: testutil.TempDirectory(t),
ContentCacheSizeBytes: maxCacheSizeBytes,
}, servertesting.TestPassword, &repo.Options{})
// cancel immediately to ensure we did not spawn goroutines that depend on ctx inside

View File

@@ -39,7 +39,7 @@ func SetCachingOptions(ctx context.Context, configFile string, opt *content.Cach
func setupCachingOptionsWithDefaults(ctx context.Context, configPath string, lc *LocalConfig, opt *content.CachingOptions, uniqueID []byte) error {
opt = opt.CloneOrDefault()
if opt.MaxCacheSizeBytes == 0 {
if opt.ContentCacheSizeBytes == 0 {
lc.Caching = &content.CachingOptions{}
return nil
}
@@ -67,14 +67,16 @@ func setupCachingOptionsWithDefaults(ctx context.Context, configPath string, lc
lc.Caching.CacheDirectory = d
}
lc.Caching.MaxCacheSizeBytes = opt.MaxCacheSizeBytes
lc.Caching.MaxMetadataCacheSizeBytes = opt.MaxMetadataCacheSizeBytes
lc.Caching.ContentCacheSizeBytes = opt.ContentCacheSizeBytes
lc.Caching.ContentCacheSizeLimitBytes = opt.ContentCacheSizeLimitBytes
lc.Caching.MetadataCacheSizeBytes = opt.MetadataCacheSizeBytes
lc.Caching.MetadataCacheSizeLimitBytes = opt.MetadataCacheSizeLimitBytes
lc.Caching.MaxListCacheDuration = opt.MaxListCacheDuration
lc.Caching.MinContentSweepAge = opt.MinContentSweepAge
lc.Caching.MinMetadataSweepAge = opt.MinMetadataSweepAge
lc.Caching.MinIndexSweepAge = opt.MinIndexSweepAge
log(ctx).Debugf("Creating cache directory '%v' with max size %v", lc.Caching.CacheDirectory, lc.Caching.MaxCacheSizeBytes)
log(ctx).Debugf("Creating cache directory '%v' with max size %v", lc.Caching.CacheDirectory, lc.Caching.ContentCacheSizeBytes)
return nil
}

View File

@@ -19,14 +19,26 @@ func (s DurationSeconds) DurationOrDefault(def time.Duration) time.Duration {
// CachingOptions specifies configuration of local cache.
type CachingOptions struct {
CacheDirectory string `json:"cacheDirectory,omitempty"`
MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"`
MaxListCacheDuration DurationSeconds `json:"maxListCacheDuration,omitempty"`
MinMetadataSweepAge DurationSeconds `json:"minMetadataSweepAge,omitempty"`
MinContentSweepAge DurationSeconds `json:"minContentSweepAge,omitempty"`
MinIndexSweepAge DurationSeconds `json:"minIndexSweepAge,omitempty"`
HMACSecret []byte `json:"-"`
CacheDirectory string `json:"cacheDirectory,omitempty"`
ContentCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
ContentCacheSizeLimitBytes int64 `json:"contentCacheSizeLimitBytes,omitempty"`
MetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"`
MetadataCacheSizeLimitBytes int64 `json:"metadataCacheSizeLimitBytes,omitempty"`
MaxListCacheDuration DurationSeconds `json:"maxListCacheDuration,omitempty"`
MinMetadataSweepAge DurationSeconds `json:"minMetadataSweepAge,omitempty"`
MinContentSweepAge DurationSeconds `json:"minContentSweepAge,omitempty"`
MinIndexSweepAge DurationSeconds `json:"minIndexSweepAge,omitempty"`
HMACSecret []byte `json:"-"`
}
// EffectiveMetadataCacheSizeBytes returns the effective metadata cache size.
func (c *CachingOptions) EffectiveMetadataCacheSizeBytes() int64 {
if c.MetadataCacheSizeBytes == 0 {
// legacy path, use the same size for both caches.
return c.ContentCacheSizeBytes
}
return c.MetadataCacheSizeBytes
}
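
Note that the renamed size fields keep their legacy JSON keys (maxCacheSize, maxMetadataCacheSize), so existing configuration files stay readable, while the new hard-limit fields get new keys. A short sketch of the legacy fallback in EffectiveMetadataCacheSizeBytes, with assumed values:

// Legacy config: only the content cache size was ever set.
legacy := &content.CachingOptions{ContentCacheSizeBytes: 5000 << 20}
_ = legacy.EffectiveMetadataCacheSizeBytes() // falls back to 5000 << 20

// New-style config: metadata cache sizes are independent of the content cache.
opt := &content.CachingOptions{
	ContentCacheSizeBytes:       5000 << 20,
	MetadataCacheSizeBytes:      2000 << 20,
	MetadataCacheSizeLimitBytes: 3000 << 20,
}
_ = opt.EffectiveMetadataCacheSizeBytes() // returns 2000 << 20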
// CloneOrDefault returns a clone of the caching options or empty options for nil.

View File

@@ -420,48 +420,61 @@ func (sm *SharedManager) namedLogger(n string) logging.Logger {
return sm.contextLogger
}
func contentCacheSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.ContentCacheSizeBytes,
LimitBytes: caching.ContentCacheSizeLimitBytes,
MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge),
}
}
func metadataCacheSizeSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(),
LimitBytes: caching.MetadataCacheSizeLimitBytes,
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}
}
func indexBlobCacheSweepSettings(caching *CachingOptions) cache.SweepSettings {
return cache.SweepSettings{
MaxSizeBytes: caching.EffectiveMetadataCacheSizeBytes(),
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}
}
func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *CachingOptions, mr *metrics.Registry) error {
dataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{
BaseCacheDirectory: caching.CacheDirectory,
CacheSubDir: "contents",
HMACSecret: caching.HMACSecret,
Sweep: cache.SweepSettings{
MaxSizeBytes: caching.MaxCacheSizeBytes,
MinSweepAge: caching.MinContentSweepAge.DurationOrDefault(DefaultDataCacheSweepAge),
},
Sweep: contentCacheSweepSettings(caching),
}, mr)
if err != nil {
return errors.Wrap(err, "unable to initialize content cache")
}
metadataCacheSize := caching.MaxMetadataCacheSizeBytes
if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 {
metadataCacheSize = caching.MaxCacheSizeBytes
}
metadataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{
BaseCacheDirectory: caching.CacheDirectory,
CacheSubDir: "metadata",
HMACSecret: caching.HMACSecret,
FetchFullBlobs: true,
Sweep: cache.SweepSettings{
MaxSizeBytes: metadataCacheSize,
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
},
Sweep: metadataCacheSizeSweepSettings(caching),
}, mr)
if err != nil {
return errors.Wrap(err, "unable to initialize metadata cache")
}
indexBlobStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "index-blobs")
indexBlobStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, caching.EffectiveMetadataCacheSizeBytes(), "index-blobs")
if err != nil {
return errors.Wrap(err, "unable to initialize index blob cache storage")
}
indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs", indexBlobStorage, cacheprot.ChecksumProtection(caching.HMACSecret), cache.SweepSettings{
MaxSizeBytes: metadataCacheSize,
MinSweepAge: caching.MinMetadataSweepAge.DurationOrDefault(DefaultMetadataCacheSweepAge),
}, mr, sm.timeNow)
indexBlobCache, err := cache.NewPersistentCache(ctx, "index-blobs",
indexBlobStorage,
cacheprot.ChecksumProtection(caching.HMACSecret),
indexBlobCacheSweepSettings(caching),
mr, sm.timeNow)
if err != nil {
return errors.Wrap(err, "unable to create index blob cache")
}
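
A hedged sketch of how these helpers map one CachingOptions onto per-cache sweep settings; the field values are assumptions for illustration:

caching := &CachingOptions{
	ContentCacheSizeBytes:      5000 << 20,
	ContentCacheSizeLimitBytes: 6000 << 20,
	// MetadataCacheSizeBytes left at zero: legacy configs fall back to the content cache size.
}
_ = contentCacheSweepSettings(caching)      // MaxSizeBytes: 5000<<20, LimitBytes: 6000<<20
_ = metadataCacheSizeSweepSettings(caching) // MaxSizeBytes: 5000<<20 (effective), LimitBytes: 0
_ = indexBlobCacheSweepSettings(caching)    // same effective size; no hard limit is plumbed here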

View File

@@ -2114,9 +2114,9 @@ func (s *contentManagerSuite) TestContentCachingByFormat(t *testing.T) {
// create two managers sharing cache directory
co := CachingOptions{
CacheDirectory: cd,
MaxCacheSizeBytes: 100e6,
MaxMetadataCacheSizeBytes: 100e6,
CacheDirectory: cd,
ContentCacheSizeBytes: 100e6,
MetadataCacheSizeBytes: 100e6,
}
compressibleData := gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 4}, 10000))
@@ -2164,9 +2164,9 @@ func (s *contentManagerSuite) TestPrefetchContent(t *testing.T) {
cd := testutil.TempDirectory(t)
bm := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{
CachingOptions: CachingOptions{
CacheDirectory: cd,
MaxCacheSizeBytes: 100e6,
MaxMetadataCacheSizeBytes: 100e6,
CacheDirectory: cd,
ContentCacheSizeBytes: 100e6,
MetadataCacheSizeBytes: 100e6,
},
maxPackSize: 20e6,
})

View File

@@ -133,7 +133,7 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string, mr *metrics.Registry, timeNow func() time.Time) (*cache.PersistentCache, error) {
opt = opt.CloneOrDefault()
cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents")
cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.ContentCacheSizeBytes, "server-contents")
if cs == nil {
// this may be (nil, nil) or (nil, err)
return nil, errors.Wrap(err, "error opening storage")
@@ -154,7 +154,8 @@ func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, pass
}
pc, err := cache.NewPersistentCache(ctx, "cache-storage", cs, prot, cache.SweepSettings{
MaxSizeBytes: opt.MaxCacheSizeBytes,
MaxSizeBytes: opt.ContentCacheSizeBytes,
LimitBytes: opt.ContentCacheSizeLimitBytes,
MinSweepAge: opt.MinContentSweepAge.DurationOrDefault(content.DefaultDataCacheSweepAge),
}, mr, timeNow)
if err != nil {

View File

@@ -232,8 +232,8 @@ func runStress(t *testing.T, opt *StressOptions) {
if err = repo.Connect(ctx, configFile, st, masterPassword, &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)),
MaxCacheSizeBytes: 2000000000,
CacheDirectory: filepath.Join(tmpPath, fmt.Sprintf("cache-%v", i)),
ContentCacheSizeBytes: 2000000000,
},
}); err != nil {
t.Fatalf("unable to connect %v: %v", configFile, err)