content: added blob-level metadata cache (#421)

Unlike the regular cache, which caches segments of blobs on a per-content
basis, the metadata cache fetches and stores the entire metadata blob (q)
when any of the contents in it is accessed.

Given that there are relatively few metadata blobs compared to data (p)
blobs, this will reduce the traffic to the underlying store and improve
performance of Snapshot GC which only relies on metadata contents.
This commit is contained in:
Jarek Kowalski
2020-04-20 02:18:43 -07:00
committed by GitHub
parent d707f3615a
commit a462798b28
9 changed files with 515 additions and 250 deletions

19
cli/command_cache_sync.go Normal file
View File

@@ -0,0 +1,19 @@
package cli
import (
"context"
"github.com/kopia/kopia/repo"
)
var (
cacheSyncCommand = cacheCommands.Command("sync", "Synchronizes the metadata cache with blobs in storage")
)
// runCacheSyncCommand implements the "cache sync" CLI verb by delegating to
// the content manager's metadata cache synchronization.
func runCacheSyncCommand(ctx context.Context, rep *repo.DirectRepository) error {
return rep.Content.SyncMetadataCache(ctx)
}
// init registers the action handler for the "cache sync" subcommand.
func init() {
cacheSyncCommand.Action(directRepositoryAction(runCacheSyncCommand))
}

View File

@@ -1,28 +1,15 @@
package content
import (
"container/heap"
"context"
"os"
"path/filepath"
"sync"
"time"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/hmac"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
)
// Default tuning for cache maintenance.
const (
defaultSweepFrequency = 1 * time.Minute // how often the size-enforcing sweep runs
defaultTouchThreshold = 10 * time.Minute // minimum age before a cached blob's last-use time is refreshed
)
// cacheKey identifies a single item within cache storage.
type cacheKey string
type contentCache interface {
@@ -30,187 +17,13 @@ type contentCache interface {
getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error)
}
// contentCacheWithStorage is a contentCache backed by a blob.Storage used as
// the cache; cached entries are HMAC-authenticated copies of blob segments.
type contentCacheWithStorage struct {
st blob.Storage // underlying (authoritative) storage
cacheStorage blob.Storage // storage holding cached entries
maxSizeBytes int64 // size limit enforced by sweepDirectory
hmacSecret []byte // secret used to authenticate cached entries
sweepFrequency time.Duration
touchThreshold time.Duration
asyncWG sync.WaitGroup // tracks the background sweep goroutine
closed chan struct{} // closed by close() to stop background work
}
// contentToucher is optionally implemented by cache storage to refresh the
// last-use time of a cached blob.
type contentToucher interface {
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error
}
// adjustCacheKey rotates the single-byte prefix of odd-length content IDs to
// the end of the key so the top-level cache shard is spread 256 ways.
func adjustCacheKey(cacheKey cacheKey) cacheKey {
	if len(cacheKey)%2 == 0 {
		// even-length keys already shard evenly.
		return cacheKey
	}

	return cacheKey[1:] + cacheKey[:1]
}
// getContent returns the requested range of a blob, preferring the cache when
// the context allows it, and populating the cache on a miss.
func (c *contentCacheWithStorage) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
cacheKey = adjustCacheKey(cacheKey)
useCache := shouldUseContentCache(ctx)
if useCache {
// cache hit - HMAC already verified by readAndVerifyCacheContent.
if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil {
stats.Record(ctx,
metricContentCacheHitCount.M(1),
metricContentCacheHitBytes.M(int64(len(b))),
)
return b, nil
}
}
stats.Record(ctx, metricContentCacheMissCount.M(1))
// cache miss - read the range from the underlying storage.
b, err := c.st.GetBlob(ctx, blobID, offset, length)
if err != nil {
stats.Record(ctx, metricContentCacheMissErrors.M(1))
} else {
stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b))))
}
if err == blob.ErrBlobNotFound {
// not found in underlying storage
return nil, err
}
if err == nil && useCache {
// store the fetched segment with an HMAC appended;
// do not report cache writes as uploads.
if puterr := c.cacheStorage.PutBlob(
blob.WithUploadProgressCallback(ctx, nil),
blob.ID(cacheKey),
gather.FromSlice(hmac.Append(b, c.hmacSecret)),
); puterr != nil {
// cache write failures are non-fatal; the data was still read.
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr)
}
}
return b, err
}
// readAndVerifyCacheContent reads a cached entry and verifies its HMAC,
// returning nil when the entry is missing or malformed so the caller falls
// back to the underlying storage.
func (c *contentCacheWithStorage) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte {
b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1)
if err == nil {
b, err = hmac.VerifyAndStrip(b, c.hmacSecret)
if err == nil {
// refresh last-use time (best-effort) so the sweeper keeps this entry.
if t, ok := c.cacheStorage.(contentToucher); ok {
t.TouchBlob(ctx, blob.ID(cacheKey), c.touchThreshold) //nolint:errcheck
}
// retrieved from cache and HMAC valid
return b
}
// ignore malformed contents
log(ctx).Warningf("malformed content %v: %v", cacheKey, err)
return nil
}
// absence is expected; anything else is worth a warning.
if err != blob.ErrBlobNotFound {
log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err)
}
return nil
}
// close stops the background sweep goroutine and waits for it to finish.
func (c *contentCacheWithStorage) close() {
close(c.closed)
c.asyncWG.Wait()
}
// sweepDirectoryPeriodically runs sweepDirectory every sweepFrequency until
// close() signals shutdown via the closed channel.
func (c *contentCacheWithStorage) sweepDirectoryPeriodically(ctx context.Context) {
defer c.asyncWG.Done()
for {
select {
case <-c.closed:
return
case <-time.After(c.sweepFrequency):
err := c.sweepDirectory(ctx)
if err != nil {
// sweep failure is non-fatal; retry on the next tick.
log(ctx).Warningf("contentCacheWithStorage sweep failed: %v", err)
}
}
}
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata.
// Entries are ordered oldest-first by timestamp so heap.Pop yields the
// oldest cached blob (used by sweepDirectory for eviction).
type contentMetadataHeap []blob.Metadata
func (h contentMetadataHeap) Len() int { return len(h) }
// Less orders entries by timestamp, oldest first.
func (h contentMetadataHeap) Less(i, j int) bool {
return h[i].Timestamp.Before(h[j].Timestamp)
}
func (h contentMetadataHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
// Push appends an element; called by the heap package, not directly.
func (h *contentMetadataHeap) Push(x interface{}) {
*h = append(*h, x.(blob.Metadata))
}
// Pop removes and returns the last element; called by the heap package.
func (h *contentMetadataHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
// sweepDirectory enforces maxSizeBytes by evicting the oldest cached blobs
// (by timestamp) until the retained total fits within the budget.
func (c *contentCacheWithStorage) sweepDirectory(ctx context.Context) (err error) {
t0 := time.Now() // allow:no-inject-time
var h contentMetadataHeap
var totalRetainedSize int64
err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
heap.Push(&h, it)
totalRetainedSize += it.Length
if totalRetainedSize > c.maxSizeBytes {
// over budget - evict the oldest entry seen so far.
oldest := heap.Pop(&h).(blob.Metadata)
if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr)
} else {
totalRetainedSize -= oldest.Length
}
}
return nil
})
if err != nil {
return errors.Wrap(err, "error listing cache")
}
// NOTE(review): the percentage divides by c.maxSizeBytes and would panic if
// the cache were ever constructed with maxSizeBytes == 0 - confirm callers.
log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time
return nil
}
func newContentCache(ctx context.Context, st blob.Storage, caching CachingOptions, maxBytes int64, subdir string) (contentCache, error) {
func newCacheStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (blob.Storage, error) {
var cacheStorage blob.Storage
var err error
if maxBytes > 0 && caching.CacheDirectory != "" {
contentCacheDir := filepath.Join(caching.CacheDirectory, subdir)
if maxBytes > 0 && cacheDir != "" {
contentCacheDir := filepath.Join(cacheDir, subdir)
if _, err = os.Stat(contentCacheDir); os.IsNotExist(err) {
if mkdirerr := os.MkdirAll(contentCacheDir, 0700); mkdirerr != nil {
@@ -227,49 +40,5 @@ func newContentCache(ctx context.Context, st blob.Storage, caching CachingOption
}
}
return newContentCacheWithCacheStorage(ctx, st, cacheStorage, maxBytes, caching, defaultTouchThreshold, defaultSweepFrequency)
}
// newContentCacheWithCacheStorage builds a contentCache whose entries live in
// cacheStorage; when cacheStorage is nil a passthrough (non-caching) cache is
// returned. It verifies the cache storage is usable and starts the background
// sweep goroutine (stopped via close()).
func newContentCacheWithCacheStorage(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, caching CachingOptions, touchThreshold, sweepFrequency time.Duration) (contentCache, error) {
if cacheStorage == nil {
return passthroughContentCache{st}, nil
}
c := &contentCacheWithStorage{
st: st,
cacheStorage: cacheStorage,
maxSizeBytes: maxSizeBytes,
// defensive copy so callers cannot mutate the secret later.
hmacSecret: append([]byte(nil), caching.HMACSecret...),
closed: make(chan struct{}),
touchThreshold: touchThreshold,
sweepFrequency: sweepFrequency,
}
// errGood is a marker error to stop blob iteration quickly, does not
// indicate any problem.
var errGood = errors.Errorf("good")
// verify that cache storage is functional by listing from it
if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
return errGood
}); err != nil && err != errGood {
return nil, errors.Wrap(err, "unable to open cache")
}
c.asyncWG.Add(1)
go c.sweepDirectoryPeriodically(ctx)
return c, nil
}
// passthroughContentCache is a contentCache which does no caching.
type passthroughContentCache struct {
st blob.Storage
}
func (c passthroughContentCache) close() {}
func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
return c.st.GetBlob(ctx, blobID, offset, length)
return cacheStorage, nil
}

View File

@@ -0,0 +1,187 @@
package content
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
)
// Default tuning for cache maintenance.
const (
defaultSweepFrequency = 1 * time.Minute // how often the background sweep runs
defaultTouchThreshold = 10 * time.Minute // minimum age before refreshing a cached blob's last-use time
mutexAgeCutoff = 5 * time.Minute // idle time after which per-item mutexes are discarded
)
// mutextLRU pairs a mutex with the time it was last used so stale entries can
// be garbage-collected by sweepMutexes.
// NOTE(review): name looks like a typo for "mutexLRU" - confirm before renaming.
type mutextLRU struct {
mu *sync.Mutex
lastUsedNanoseconds int64 // accessed atomically; see perItemMutex
}
// cacheBase provides common implementation for per-content and per-blob caches
type cacheBase struct {
cacheStorage blob.Storage // storage holding cached entries
maxSizeBytes int64 // size budget enforced by sweepDirectory
sweepFrequency time.Duration
touchThreshold time.Duration
asyncWG sync.WaitGroup // tracks the background sweep goroutine
closed chan struct{} // closed by close() to stop background work
// stores key to *mutexLRU mapping which is periodically garbage-collected
// and used to eliminate/minimize concurrent fetches of the same cached item.
loadingMap sync.Map
}
// contentToucher is optionally implemented by cache storage to refresh the
// last-use time of a cached blob.
type contentToucher interface {
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error
}
// touch refreshes the last-use time of a cached blob when the cache storage
// supports it (best-effort; errors are deliberately ignored).
func (c *cacheBase) touch(ctx context.Context, blobID blob.ID) {
if t, ok := c.cacheStorage.(contentToucher); ok {
t.TouchBlob(ctx, blobID, c.touchThreshold) //nolint:errcheck
}
}
// close stops the background sweep goroutine and waits for it to finish.
func (c *cacheBase) close() {
close(c.closed)
c.asyncWG.Wait()
}
// perItemMutex returns the mutex dedicated to the given key, creating and
// registering one on first use. The entry's last-used timestamp is refreshed
// on every call so that sweepMutexes only discards idle entries.
func (c *cacheBase) perItemMutex(key interface{}) *sync.Mutex {
	now := time.Now().UnixNano() // allow:no-inject-time

	v, found := c.loadingMap.Load(key)
	if !found {
		// first use - a race with other goroutines is resolved by
		// LoadOrStore, which returns whichever entry won.
		v, _ = c.loadingMap.LoadOrStore(key, &mutextLRU{
			mu:                  &sync.Mutex{},
			lastUsedNanoseconds: now,
		})
	}

	entry := v.(*mutextLRU)
	atomic.StoreInt64(&entry.lastUsedNanoseconds, now)

	return entry.mu
}
// sweepDirectoryPeriodically runs the mutex sweep and directory sweep every
// sweepFrequency until close() signals shutdown via the closed channel.
func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) {
defer c.asyncWG.Done()
for {
select {
case <-c.closed:
return
case <-time.After(c.sweepFrequency):
c.sweepMutexes()
if err := c.sweepDirectory(ctx); err != nil {
// sweep failure is non-fatal; retry on the next tick.
log(ctx).Warningf("cacheBase sweep failed: %v", err)
}
}
}
}
// contentMetadataHeap is a min-heap of blob.Metadata ordered oldest-first by
// timestamp; it implements heap.Interface and is used by sweepDirectory to
// locate eviction candidates.
type contentMetadataHeap []blob.Metadata

func (ch contentMetadataHeap) Len() int { return len(ch) }

// Less orders entries oldest-first so heap.Pop yields the oldest blob.
func (ch contentMetadataHeap) Less(a, b int) bool {
	return ch[a].Timestamp.Before(ch[b].Timestamp)
}

func (ch contentMetadataHeap) Swap(a, b int) {
	ch[a], ch[b] = ch[b], ch[a]
}

// Push appends an element; invoked via heap.Push, not directly.
func (ch *contentMetadataHeap) Push(v interface{}) {
	*ch = append(*ch, v.(blob.Metadata))
}

// Pop removes and returns the final element; invoked via heap.Pop.
func (ch *contentMetadataHeap) Pop() interface{} {
	s := *ch
	last := s[len(s)-1]
	*ch = s[:len(s)-1]

	return last
}
// sweepDirectory enforces maxSizeBytes by evicting the oldest cached blobs
// (ordered by timestamp via contentMetadataHeap) until the retained total
// fits within the budget. Returns an error only when listing the cache fails;
// individual delete failures are logged and retried on the next sweep.
func (c *cacheBase) sweepDirectory(ctx context.Context) (err error) {
	t0 := time.Now() // allow:no-inject-time

	var h contentMetadataHeap

	var totalRetainedSize int64

	err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
		heap.Push(&h, it)
		totalRetainedSize += it.Length

		if totalRetainedSize > c.maxSizeBytes {
			// over budget - evict the oldest entry seen so far.
			oldest := heap.Pop(&h).(blob.Metadata)
			if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
				log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr)
			} else {
				totalRetainedSize -= oldest.Length
			}
		}
		return nil
	})
	if err != nil {
		return errors.Wrap(err, "error listing cache")
	}

	// guard the percentage computation: an integer division by a zero
	// maxSizeBytes would panic.
	if c.maxSizeBytes > 0 {
		log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time
	} else {
		log(ctx).Debugf("finished sweeping directory in %v and retained %v bytes", time.Since(t0), totalRetainedSize) // allow:no-inject-time
	}

	return nil
}
// sweepMutexes drops per-item mutexes that have been idle for longer than
// mutexAgeCutoff. The mutexes exist only to avoid duplicate fetches of the
// same item (a performance aid, not a correctness requirement), so removing
// one is always safe.
func (c *cacheBase) sweepMutexes() {
	cutoffTime := time.Now().Add(-mutexAgeCutoff).UnixNano() // allow:no-inject-time

	c.loadingMap.Range(func(key, value interface{}) bool {
		entry := value.(*mutextLRU)
		if entry.lastUsedNanoseconds < cutoffTime {
			c.loadingMap.Delete(key)
		}

		return true
	})
}
// newContentCacheBase constructs the shared cache machinery on top of the
// provided cache storage, verifies the storage is usable by issuing a probe
// listing, and starts the background sweep goroutine (stopped via close()).
func newContentCacheBase(ctx context.Context, cacheStorage blob.Storage, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*cacheBase, error) {
	c := &cacheBase{
		cacheStorage:   cacheStorage,
		maxSizeBytes:   maxSizeBytes,
		closed:         make(chan struct{}),
		touchThreshold: touchThreshold,
		sweepFrequency: sweepFrequency,
	}

	// marker error used to abort the probe listing immediately;
	// it does not indicate any problem.
	errGood := errors.Errorf("good")

	// verify that cache storage is functional by listing from it.
	err := c.cacheStorage.ListBlobs(ctx, "", func(blob.Metadata) error {
		return errGood
	})

	switch err {
	case nil, errGood:
		// cache storage is operational.
	default:
		return nil, errors.Wrap(err, "unable to open cache")
	}

	c.asyncWG.Add(1)

	go c.sweepDirectoryPeriodically(ctx)

	return c, nil
}

View File

@@ -0,0 +1,114 @@
package content
import (
"context"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/hmac"
"github.com/kopia/kopia/repo/blob"
)
// contentCacheForData caches segments of data (p) blobs on a per-content
// basis; entries are HMAC-authenticated with hmacSecret.
type contentCacheForData struct {
*cacheBase
st blob.Storage // underlying (authoritative) storage
hmacSecret []byte // secret used to authenticate cached entries
}
// adjustCacheKey rotates the single-byte prefix of odd-length content IDs to
// the end of the key so the top-level cache shard is spread 256 ways.
func adjustCacheKey(cacheKey cacheKey) cacheKey {
	if len(cacheKey)%2 == 0 {
		// even-length keys already shard evenly.
		return cacheKey
	}

	return cacheKey[1:] + cacheKey[:1]
}
// getContent returns the requested range of a blob, preferring the cache when
// the context allows it, and populating the cache on a miss.
func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
cacheKey = adjustCacheKey(cacheKey)
useCache := shouldUseContentCache(ctx)
if useCache {
// cache hit - HMAC already verified by readAndVerifyCacheContent.
if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil {
stats.Record(ctx,
metricContentCacheHitCount.M(1),
metricContentCacheHitBytes.M(int64(len(b))),
)
return b, nil
}
}
stats.Record(ctx, metricContentCacheMissCount.M(1))
// cache miss - read the range from the underlying storage.
b, err := c.st.GetBlob(ctx, blobID, offset, length)
if err != nil {
stats.Record(ctx, metricContentCacheMissErrors.M(1))
} else {
stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b))))
}
if err == blob.ErrBlobNotFound {
// not found in underlying storage
return nil, err
}
if err == nil && useCache {
// store the fetched segment with an HMAC appended;
// do not report cache writes as uploads.
if puterr := c.cacheStorage.PutBlob(
blob.WithUploadProgressCallback(ctx, nil),
blob.ID(cacheKey),
gather.FromSlice(hmac.Append(b, c.hmacSecret)),
); puterr != nil {
// cache write failures are non-fatal; the data was still read.
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr)
}
}
return b, err
}
// readAndVerifyCacheContent fetches a cache entry and validates its HMAC.
// It returns the verified payload, or nil when the entry is absent or
// malformed (the caller then falls back to the underlying storage).
func (c *contentCacheForData) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte {
	payload, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1)
	if err != nil {
		// absence is expected; anything else is worth a warning.
		if err != blob.ErrBlobNotFound {
			log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err)
		}

		return nil
	}

	payload, err = hmac.VerifyAndStrip(payload, c.hmacSecret)
	if err != nil {
		// ignore malformed contents
		log(ctx).Warningf("malformed content %v: %v", cacheKey, err)
		return nil
	}

	// retrieved from cache and HMAC valid - refresh its last-use time.
	c.touch(ctx, blob.ID(cacheKey))

	return payload
}
// newContentCacheForData builds a per-content data cache on top of st; with a
// nil cacheStorage a passthrough (non-caching) implementation is returned.
func newContentCacheForData(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) {
if cacheStorage == nil {
return passthroughContentCache{st}, nil
}
cb, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency)
if err != nil {
return nil, errors.Wrap(err, "unable to create base cache")
}
// defensive copy of the HMAC secret so callers cannot mutate it later.
return &contentCacheForData{
st: st,
hmacSecret: append([]byte(nil), hmacSecret...),
cacheBase: cb,
}, nil
}

View File

@@ -0,0 +1,128 @@
package content
import (
"context"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
)
// metadataCacheSyncParallelism bounds concurrent blob fetches during sync.
const metadataCacheSyncParallelism = 16
// contentCacheForMetadata caches whole metadata (q) blobs: accessing any
// content in a blob fetches and stores the entire blob.
type contentCacheForMetadata struct {
*cacheBase
st blob.Storage // underlying (authoritative) storage
}
// sync synchronizes metadata cache with all blobs found in the storage.
func (c *contentCacheForMetadata) sync(ctx context.Context) error {
sem := make(chan struct{}, metadataCacheSyncParallelism)
log(ctx).Debugf("synchronizing metadata cache...")
defer log(ctx).Debugf("finished synchronizing metadata cache.")
var eg errgroup.Group
// list all blobs and fetch contents into cache in parallel.
if err := c.st.ListBlobs(ctx, PackBlobIDPrefixSpecial, func(bm blob.Metadata) error {
// acquire semaphore
sem <- struct{}{}
eg.Go(func() error {
defer func() {
<-sem
}()
_, err := c.getContent(ctx, "dummy", bm.BlobID, 0, 1)
return err
})
return nil
}); err != nil {
return errors.Wrap(err, "error listing blobs")
}
return eg.Wait()
}
// getContent satisfies a read of (offset, length) within the given metadata
// blob. On a cache miss the ENTIRE blob is fetched from the underlying
// storage and written to the cache, then the requested range is sliced out.
// A negative length means "from offset to the end of the blob".
func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
	// serialize loads of the same blob so concurrent readers fetch it once.
	m := c.perItemMutex(blobID)
	m.Lock()
	defer m.Unlock()

	useCache := shouldUseContentCache(ctx)
	if useCache {
		if v, err := c.cacheBase.cacheStorage.GetBlob(ctx, blobID, offset, length); err == nil {
			// cache hit
			stats.Record(ctx,
				metricContentCacheHitCount.M(1),
				metricContentCacheHitBytes.M(int64(len(v))),
			)

			return v, nil
		}
	}

	stats.Record(ctx, metricContentCacheMissCount.M(1))

	// read the entire blob
	log(ctx).Debugf("fetching metadata blob %q", blobID)

	blobData, err := c.st.GetBlob(ctx, blobID, 0, -1)

	if err != nil {
		stats.Record(ctx, metricContentCacheMissErrors.M(1))
	} else {
		stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(blobData))))
	}

	if err == blob.ErrBlobNotFound {
		// not found in underlying storage
		return nil, err
	}

	if err != nil {
		return nil, err
	}

	if useCache {
		// store the whole blob in the cache, do not report cache writes as uploads.
		if puterr := c.cacheStorage.PutBlob(
			blob.WithUploadProgressCallback(ctx, nil),
			blobID,
			gather.FromSlice(blobData),
		); puterr != nil {
			stats.Record(ctx, metricContentCacheStoreErrors.M(1))
			log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr)
		}
	}

	// handle all negative lengths as "until end of blob"; previously only
	// (offset==0, length==-1) was special-cased and any other negative length
	// slipped past the bounds check below (offset+length < len) and panicked
	// when slicing blobData[offset : offset+length].
	if length < 0 {
		if offset < 0 || offset > int64(len(blobData)) {
			return nil, errors.Errorf("invalid (offset=%v,length=%v) for blob %q of size %v", offset, length, blobID, len(blobData))
		}

		return blobData[offset:], nil
	}

	if offset < 0 || offset+length > int64(len(blobData)) {
		return nil, errors.Errorf("invalid (offset=%v,length=%v) for blob %q of size %v", offset, length, blobID, len(blobData))
	}

	return blobData[offset : offset+length], nil
}
// newContentCacheForMetadata wraps st with a cache that keeps whole metadata
// blobs in cacheStorage. When cacheStorage is nil, a passthrough
// (non-caching) implementation is returned instead.
func newContentCacheForMetadata(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64) (contentCache, error) {
	if cacheStorage == nil {
		return passthroughContentCache{st}, nil
	}

	base, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create base cache")
	}

	return &contentCacheForMetadata{
		cacheBase: base,
		st:        st,
	}, nil
}

View File

@@ -0,0 +1,18 @@
package content
import (
"context"
"github.com/kopia/kopia/repo/blob"
)
// passthroughContentCache is a contentCache which does no caching.
type passthroughContentCache struct {
st blob.Storage // underlying storage; all reads go straight through
}
// close is a no-op since there is no background state to tear down.
func (c passthroughContentCache) close() {}
// getContent reads the requested range directly from the underlying storage;
// the cacheKey parameter is unused.
func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
return c.st.GetBlob(ctx, blobID, offset, length)
}

View File

@@ -50,9 +50,14 @@ func TestCacheExpiration(t *testing.T) {
underlyingStorage := newUnderlyingStorageForContentCacheTesting(t)
cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, cacheStorage, 10000, CachingOptions{}, 0, 500*time.Millisecond)
cb, err := newContentCacheBase(testlogging.Context(t), cacheStorage, 10000, 0, 500*time.Millisecond)
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("unable to create base cache: %v", err)
}
cache := &contentCacheForData{
st: underlyingStorage,
cacheBase: cb,
}
defer cache.close()
@@ -105,10 +110,14 @@ func TestDiskContentCache(t *testing.T) {
defer os.RemoveAll(tmpDir)
cache, err := newContentCache(ctx, newUnderlyingStorageForContentCacheTesting(t), CachingOptions{
CacheDirectory: tmpDir,
}, 10000, "contents")
const maxBytes = 10000
cacheStorage, err := newCacheStorageOrNil(ctx, tmpDir, maxBytes, "contents")
if err != nil {
t.Fatal(err)
}
cache, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -154,12 +163,12 @@ func verifyContentCache(t *testing.T, cache contentCache) {
}
}
verifyStorageContentList(t, cache.(*contentCacheWithStorage).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5")
verifyStorageContentList(t, cache.(*contentCacheForData).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5")
})
t.Run("DataCorruption", func(t *testing.T) {
var cacheKey blob.ID = "f0f0f1x"
d, err := cache.(*contentCacheWithStorage).cacheStorage.GetBlob(ctx, cacheKey, 0, -1)
d, err := cache.(*contentCacheForData).cacheStorage.GetBlob(ctx, cacheKey, 0, -1)
if err != nil {
t.Fatalf("unable to retrieve data from cache: %v", err)
}
@@ -167,7 +176,7 @@ func verifyContentCache(t *testing.T, cache contentCache) {
// corrupt the data and write back
d[0] ^= 1
if puterr := cache.(*contentCacheWithStorage).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil {
if puterr := cache.(*contentCacheForData).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil {
t.Fatalf("unable to write corrupted content: %v", puterr)
}
@@ -197,13 +206,13 @@ func TestCacheFailureToOpen(t *testing.T) {
}
// Will fail because of ListBlobs failure.
_, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour)
_, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
if err == nil || !strings.Contains(err.Error(), someError.Error()) {
t.Errorf("invalid error %v, wanted: %v", err, someError)
}
// ListBlobs fails only once, next time it succeeds.
cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 100*time.Millisecond)
cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -221,7 +230,7 @@ func TestCacheFailureToWrite(t *testing.T) {
Base: cacheStorage,
}
cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour)
cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -264,7 +273,7 @@ func TestCacheFailureToRead(t *testing.T) {
Base: cacheStorage,
}
cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour)
cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -650,6 +650,17 @@ func (bm *Manager) Refresh(ctx context.Context) (bool, error) {
return updated, err
}
// SyncMetadataCache synchronizes metadata cache with metadata blobs in storage.
func (bm *Manager) SyncMetadataCache(ctx context.Context) error {
if cm, ok := bm.metadataCache.(*contentCacheForMetadata); ok {
return cm.sync(ctx)
}
log(ctx).Debugf("metadata cache not enabled")
return nil
}
// ManagerOptions are the optional parameters for manager creation
type ManagerOptions struct {
RepositoryFormatBytes []byte
@@ -680,7 +691,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp
return nil, err
}
dataCache, err := newContentCache(ctx, st, caching, caching.MaxCacheSizeBytes, "contents")
dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents")
if err != nil {
return nil, errors.Wrap(err, "unable to initialize data cache storage")
}
dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize content cache")
}
@@ -690,7 +706,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp
metadataCacheSize = caching.MaxCacheSizeBytes
}
metadataCache, err := newContentCache(ctx, st, caching, metadataCacheSize, "metadata")
metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata")
if err != nil {
return nil, errors.Wrap(err, "unable to initialize data cache storage")
}
metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize metadata cache")
}

View File

@@ -321,7 +321,7 @@ func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, err
}
func (bm *lockFreeManager) getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) {
payload, err := bm.contentCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1)
payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1)
if err != nil {
return nil, err
}