From a462798b28e414ea6a9da69da338973f8f1bf1ef Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 20 Apr 2020 02:18:43 -0700 Subject: [PATCH] content: added blob-level metadata cache (#421) Unlike regular cache, which caches segments of blobs on a per-content basis, metadata cache will fetch and store the entire metadata blob (q) when any of the contents in it is accessed. Given that there are relatively few metadata blobs compared to data (p) blobs, this will reduce the traffic to the underlying store and improve performance of Snapshot GC which only relies on metadata contents. --- cli/command_cache_sync.go | 19 ++ repo/content/content_cache.go | 239 +--------------------- repo/content/content_cache_base.go | 187 +++++++++++++++++ repo/content/content_cache_data.go | 114 +++++++++++ repo/content/content_cache_metadata.go | 128 ++++++++++++ repo/content/content_cache_passthrough.go | 18 ++ repo/content/content_cache_test.go | 33 +-- repo/content/content_manager.go | 25 ++- repo/content/content_manager_lock_free.go | 2 +- 9 files changed, 515 insertions(+), 250 deletions(-) create mode 100644 cli/command_cache_sync.go create mode 100644 repo/content/content_cache_base.go create mode 100644 repo/content/content_cache_data.go create mode 100644 repo/content/content_cache_metadata.go create mode 100644 repo/content/content_cache_passthrough.go diff --git a/cli/command_cache_sync.go b/cli/command_cache_sync.go new file mode 100644 index 000000000..bad4017f7 --- /dev/null +++ b/cli/command_cache_sync.go @@ -0,0 +1,19 @@ +package cli + +import ( + "context" + + "github.com/kopia/kopia/repo" +) + +var ( + cacheSyncCommand = cacheCommands.Command("sync", "Synchronizes the metadata cache with blobs in storage") +) + +func runCacheSyncCommand(ctx context.Context, rep *repo.DirectRepository) error { + return rep.Content.SyncMetadataCache(ctx) +} + +func init() { + cacheSyncCommand.Action(directRepositoryAction(runCacheSyncCommand)) +} diff --git a/repo/content/content_cache.go b/repo/content/content_cache.go index de0f91267..d258fda8d 100644 --- a/repo/content/content_cache.go +++ b/repo/content/content_cache.go @@ -1,28 +1,15 @@ package content import ( - "container/heap" "context" "os" "path/filepath" - "sync" - "time" - - "github.com/pkg/errors" - "go.opencensus.io/stats" "github.com/kopia/kopia/internal/ctxutil" - "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/hmac" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/filesystem" ) -const ( - defaultSweepFrequency = 1 * time.Minute - defaultTouchThreshold = 10 * time.Minute -) - type cacheKey string type contentCache interface { @@ -30,187 +17,13 @@ type contentCache interface { getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) } -type contentCacheWithStorage struct { - st blob.Storage - cacheStorage blob.Storage - maxSizeBytes int64 - hmacSecret []byte - sweepFrequency time.Duration - touchThreshold time.Duration - - asyncWG sync.WaitGroup - closed chan struct{} -} - -type contentToucher interface { - TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error -} - -func adjustCacheKey(cacheKey cacheKey) cacheKey { - // content IDs with odd length have a single-byte prefix. - // move the prefix to the end of cache key to make sure the top level shard is spread 256 ways. - if len(cacheKey)%2 == 1 { - return cacheKey[1:] + cacheKey[0:1] - } - - return cacheKey -} - -func (c *contentCacheWithStorage) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { - cacheKey = adjustCacheKey(cacheKey) - - useCache := shouldUseContentCache(ctx) - if useCache { - if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil { - stats.Record(ctx, - metricContentCacheHitCount.M(1), - metricContentCacheHitBytes.M(int64(len(b))), - ) - - return b, nil - } - } - - stats.Record(ctx, metricContentCacheMissCount.M(1)) - - b, err := c.st.GetBlob(ctx, blobID, offset, length) - if err != nil { - stats.Record(ctx, metricContentCacheMissErrors.M(1)) - } else { - stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b)))) - } - - if err == blob.ErrBlobNotFound { - // not found in underlying storage - return nil, err - } - - if err == nil && useCache { - // do not report cache writes as uploads. - if puterr := c.cacheStorage.PutBlob( - blob.WithUploadProgressCallback(ctx, nil), - blob.ID(cacheKey), - gather.FromSlice(hmac.Append(b, c.hmacSecret)), - ); puterr != nil { - stats.Record(ctx, metricContentCacheStoreErrors.M(1)) - log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) - } - } - - return b, err -} - -func (c *contentCacheWithStorage) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte { - b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1) - if err == nil { - b, err = hmac.VerifyAndStrip(b, c.hmacSecret) - if err == nil { - if t, ok := c.cacheStorage.(contentToucher); ok { - t.TouchBlob(ctx, blob.ID(cacheKey), c.touchThreshold) //nolint:errcheck - } - - // retrieved from cache and HMAC valid - return b - } - - // ignore malformed contents - log(ctx).Warningf("malformed content %v: %v", cacheKey, err) - - return nil - } - - if err != blob.ErrBlobNotFound { - log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err) - } - - return nil -} - -func (c *contentCacheWithStorage) close() { - close(c.closed) - c.asyncWG.Wait() -} - -func (c *contentCacheWithStorage) sweepDirectoryPeriodically(ctx context.Context) { - defer c.asyncWG.Done() - - for { - select { - case <-c.closed: - return - - case <-time.After(c.sweepFrequency): - err := c.sweepDirectory(ctx) - if err != nil { - log(ctx).Warningf("contentCacheWithStorage sweep failed: %v", err) - } - } - } -} - -// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. -type contentMetadataHeap []blob.Metadata - -func (h contentMetadataHeap) Len() int { return len(h) } - -func (h contentMetadataHeap) Less(i, j int) bool { - return h[i].Timestamp.Before(h[j].Timestamp) -} - -func (h contentMetadataHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *contentMetadataHeap) Push(x interface{}) { - *h = append(*h, x.(blob.Metadata)) -} - -func (h *contentMetadataHeap) Pop() interface{} { - old := *h - n := len(old) - item := old[n-1] - *h = old[0 : n-1] - - return item -} - -func (c *contentCacheWithStorage) sweepDirectory(ctx context.Context) (err error) { - t0 := time.Now() // allow:no-inject-time - - var h contentMetadataHeap - - var totalRetainedSize int64 - - err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - heap.Push(&h, it) - totalRetainedSize += it.Length - - if totalRetainedSize > c.maxSizeBytes { - oldest := heap.Pop(&h).(blob.Metadata) - if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { - log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) - } else { - totalRetainedSize -= oldest.Length - } - } - return nil - }) - if err != nil { - return errors.Wrap(err, "error listing cache") - } - - log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time - - return nil -} - -func newContentCache(ctx context.Context, st blob.Storage, caching CachingOptions, maxBytes int64, subdir string) (contentCache, error) { +func newCacheStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (blob.Storage, error) { var cacheStorage blob.Storage var err error - if maxBytes > 0 && caching.CacheDirectory != "" { - contentCacheDir := filepath.Join(caching.CacheDirectory, subdir) + if maxBytes > 0 && cacheDir != "" { + contentCacheDir := filepath.Join(cacheDir, subdir) if _, err = os.Stat(contentCacheDir); os.IsNotExist(err) { if mkdirerr := os.MkdirAll(contentCacheDir, 0700); mkdirerr != nil { @@ -227,49 +40,5 @@ func newContentCache(ctx context.Context, st blob.Storage, caching CachingOption } } - return newContentCacheWithCacheStorage(ctx, st, cacheStorage, maxBytes, caching, defaultTouchThreshold, defaultSweepFrequency) -} - -func newContentCacheWithCacheStorage(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, caching CachingOptions, touchThreshold, sweepFrequency time.Duration) (contentCache, error) { - if cacheStorage == nil { - return passthroughContentCache{st}, nil - } - - c := &contentCacheWithStorage{ - st: st, - cacheStorage: cacheStorage, - maxSizeBytes: maxSizeBytes, - hmacSecret: append([]byte(nil), caching.HMACSecret...), - closed: make(chan struct{}), - touchThreshold: touchThreshold, - sweepFrequency: sweepFrequency, - } - - // errGood is a marker error to stop blob iteration quickly, does not - // indicate any problem. - var errGood = errors.Errorf("good") - - // verify that cache storage is functional by listing from it - if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - return errGood - }); err != nil && err != errGood { - return nil, errors.Wrap(err, "unable to open cache") - } - - c.asyncWG.Add(1) - - go c.sweepDirectoryPeriodically(ctx) - - return c, nil -} - -// passthroughContentCache is a contentCache which does no caching. -type passthroughContentCache struct { - st blob.Storage -} - -func (c passthroughContentCache) close() {} - -func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { - return c.st.GetBlob(ctx, blobID, offset, length) + return cacheStorage, nil } diff --git a/repo/content/content_cache_base.go b/repo/content/content_cache_base.go new file mode 100644 index 000000000..0e390e848 --- /dev/null +++ b/repo/content/content_cache_base.go @@ -0,0 +1,187 @@ +package content + +import ( + "container/heap" + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" +) + +const ( + defaultSweepFrequency = 1 * time.Minute + defaultTouchThreshold = 10 * time.Minute + mutexAgeCutoff = 5 * time.Minute +) + +type mutextLRU struct { + mu *sync.Mutex + lastUsedNanoseconds int64 +} + +// cacheBase provides common implementation for per-content and per-blob caches +type cacheBase struct { + cacheStorage blob.Storage + maxSizeBytes int64 + sweepFrequency time.Duration + touchThreshold time.Duration + + asyncWG sync.WaitGroup + closed chan struct{} + + // stores key to *mutexLRU mapping which is periodically garbage-collected + // and used to eliminate/minimize concurrent fetches of the same cached item. + loadingMap sync.Map +} + +type contentToucher interface { + TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error +} + +func (c *cacheBase) touch(ctx context.Context, blobID blob.ID) { + if t, ok := c.cacheStorage.(contentToucher); ok { + t.TouchBlob(ctx, blobID, c.touchThreshold) //nolint:errcheck + } +} + +func (c *cacheBase) close() { + close(c.closed) + c.asyncWG.Wait() +} + +func (c *cacheBase) perItemMutex(key interface{}) *sync.Mutex { + now := time.Now().UnixNano() // allow:no-inject-time + + v, ok := c.loadingMap.Load(key) + if !ok { + v, _ = c.loadingMap.LoadOrStore(key, &mutextLRU{ + mu: &sync.Mutex{}, + lastUsedNanoseconds: now, + }) + } + + m := v.(*mutextLRU) + atomic.StoreInt64(&m.lastUsedNanoseconds, now) + + return m.mu +} + +func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) { + defer c.asyncWG.Done() + + for { + select { + case <-c.closed: + return + + case <-time.After(c.sweepFrequency): + c.sweepMutexes() + + if err := c.sweepDirectory(ctx); err != nil { + log(ctx).Warningf("cacheBase sweep failed: %v", err) + } + } + } +} + +// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. +type contentMetadataHeap []blob.Metadata + +func (h contentMetadataHeap) Len() int { return len(h) } + +func (h contentMetadataHeap) Less(i, j int) bool { + return h[i].Timestamp.Before(h[j].Timestamp) +} + +func (h contentMetadataHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *contentMetadataHeap) Push(x interface{}) { + *h = append(*h, x.(blob.Metadata)) +} + +func (h *contentMetadataHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + + return item +} + +func (c *cacheBase) sweepDirectory(ctx context.Context) (err error) { + t0 := time.Now() // allow:no-inject-time + + var h contentMetadataHeap + + var totalRetainedSize int64 + + err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + heap.Push(&h, it) + totalRetainedSize += it.Length + + if totalRetainedSize > c.maxSizeBytes { + oldest := heap.Pop(&h).(blob.Metadata) + if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { + log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) + } else { + totalRetainedSize -= oldest.Length + } + } + return nil + }) + if err != nil { + return errors.Wrap(err, "error listing cache") + } + + log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time + + return nil +} + +func (c *cacheBase) sweepMutexes() { + cutoffTime := time.Now().Add(-mutexAgeCutoff).UnixNano() // allow:no-inject-time + + // remove from loadingMap all items that have not been touched recently. + // since the mutexes are only for performance (to avoid loading duplicates) + // and not for correctness, it's always safe to remove them. + c.loadingMap.Range(func(key, value interface{}) bool { + if m := value.(*mutextLRU); m.lastUsedNanoseconds < cutoffTime { + c.loadingMap.Delete(key) + } + + return true + }) +} + +func newContentCacheBase(ctx context.Context, cacheStorage blob.Storage, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*cacheBase, error) { + c := &cacheBase{ + cacheStorage: cacheStorage, + maxSizeBytes: maxSizeBytes, + closed: make(chan struct{}), + touchThreshold: touchThreshold, + sweepFrequency: sweepFrequency, + } + + // errGood is a marker error to stop blob iteration quickly, does not + // indicate any problem. + var errGood = errors.Errorf("good") + + // verify that cache storage is functional by listing from it + if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + return errGood + }); err != nil && err != errGood { + return nil, errors.Wrap(err, "unable to open cache") + } + + c.asyncWG.Add(1) + + go c.sweepDirectoryPeriodically(ctx) + + return c, nil +} diff --git a/repo/content/content_cache_data.go b/repo/content/content_cache_data.go new file mode 100644 index 000000000..928c77e28 --- /dev/null +++ b/repo/content/content_cache_data.go @@ -0,0 +1,114 @@ +package content + +import ( + "context" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/hmac" + "github.com/kopia/kopia/repo/blob" +) + +type contentCacheForData struct { + *cacheBase + + st blob.Storage + hmacSecret []byte +} + +func adjustCacheKey(cacheKey cacheKey) cacheKey { + // content IDs with odd length have a single-byte prefix. + // move the prefix to the end of cache key to make sure the top level shard is spread 256 ways. + if len(cacheKey)%2 == 1 { + return cacheKey[1:] + cacheKey[0:1] + } + + return cacheKey +} + +func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + cacheKey = adjustCacheKey(cacheKey) + + useCache := shouldUseContentCache(ctx) + if useCache { + if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil { + stats.Record(ctx, + metricContentCacheHitCount.M(1), + metricContentCacheHitBytes.M(int64(len(b))), + ) + + return b, nil + } + } + + stats.Record(ctx, metricContentCacheMissCount.M(1)) + + b, err := c.st.GetBlob(ctx, blobID, offset, length) + if err != nil { + stats.Record(ctx, metricContentCacheMissErrors.M(1)) + } else { + stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b)))) + } + + if err == blob.ErrBlobNotFound { + // not found in underlying storage + return nil, err + } + + if err == nil && useCache { + // do not report cache writes as uploads. + if puterr := c.cacheStorage.PutBlob( + blob.WithUploadProgressCallback(ctx, nil), + blob.ID(cacheKey), + gather.FromSlice(hmac.Append(b, c.hmacSecret)), + ); puterr != nil { + stats.Record(ctx, metricContentCacheStoreErrors.M(1)) + log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) + } + } + + return b, err +} + +func (c *contentCacheForData) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte { + b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1) + if err == nil { + b, err = hmac.VerifyAndStrip(b, c.hmacSecret) + if err == nil { + c.touch(ctx, blob.ID(cacheKey)) + + // retrieved from cache and HMAC valid + return b + } + + // ignore malformed contents + log(ctx).Warningf("malformed content %v: %v", cacheKey, err) + + return nil + } + + if err != blob.ErrBlobNotFound { + log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err) + } + + return nil +} + +func newContentCacheForData(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) { + if cacheStorage == nil { + return passthroughContentCache{st}, nil + } + + cb, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + if err != nil { + return nil, errors.Wrap(err, "unable to create base cache") + } + + return &contentCacheForData{ + st: st, + hmacSecret: append([]byte(nil), hmacSecret...), + cacheBase: cb, + }, nil +} diff --git a/repo/content/content_cache_metadata.go b/repo/content/content_cache_metadata.go new file mode 100644 index 000000000..b08c40ae6 --- /dev/null +++ b/repo/content/content_cache_metadata.go @@ -0,0 +1,128 @@ +package content + +import ( + "context" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" +) + +const metadataCacheSyncParallelism = 16 + +type contentCacheForMetadata struct { + *cacheBase + + st blob.Storage +} + +// sync synchronizes metadata cache with all blobs found in the storage. +func (c *contentCacheForMetadata) sync(ctx context.Context) error { + sem := make(chan struct{}, metadataCacheSyncParallelism) + + log(ctx).Debugf("synchronizing metadata cache...") + defer log(ctx).Debugf("finished synchronizing metadata cache.") + + var eg errgroup.Group + + // list all blobs and fetch contents into cache in parallel. + if err := c.st.ListBlobs(ctx, PackBlobIDPrefixSpecial, func(bm blob.Metadata) error { + // acquire semaphore + sem <- struct{}{} + eg.Go(func() error { + defer func() { + <-sem + }() + + _, err := c.getContent(ctx, "dummy", bm.BlobID, 0, 1) + return err + }) + + return nil + }); err != nil { + return errors.Wrap(err, "error listing blobs") + } + + return eg.Wait() +} + +func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + m := c.perItemMutex(blobID) + m.Lock() + defer m.Unlock() + + useCache := shouldUseContentCache(ctx) + if useCache { + if v, err := c.cacheBase.cacheStorage.GetBlob(ctx, blobID, offset, length); err == nil { + // cache hit + stats.Record(ctx, + metricContentCacheHitCount.M(1), + metricContentCacheHitBytes.M(int64(len(v))), + ) + + return v, nil + } + } + + stats.Record(ctx, metricContentCacheMissCount.M(1)) + + // read the entire blob + log(ctx).Debugf("fetching metadata blob %q", blobID) + blobData, err := c.st.GetBlob(ctx, blobID, 0, -1) + + if err != nil { + stats.Record(ctx, metricContentCacheMissErrors.M(1)) + } else { + stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(blobData)))) + } + + if err == blob.ErrBlobNotFound { + // not found in underlying storage + return nil, err + } + + if err != nil { + return nil, err + } + + if useCache { + // store the whole blob in the cache, do not report cache writes as uploads. + if puterr := c.cacheStorage.PutBlob( + blob.WithUploadProgressCallback(ctx, nil), + blobID, + gather.FromSlice(blobData), + ); puterr != nil { + stats.Record(ctx, metricContentCacheStoreErrors.M(1)) + log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr) + } + } + + if offset == 0 && length == -1 { + return blobData, err + } + + if offset < 0 || offset+length > int64(len(blobData)) { + return nil, errors.Errorf("invalid (offset=%v,length=%v) for blob %q of size %v", offset, length, blobID, len(blobData)) + } + + return blobData[offset : offset+length], nil +} + +func newContentCacheForMetadata(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64) (contentCache, error) { + if cacheStorage == nil { + return passthroughContentCache{st}, nil + } + + cb, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + if err != nil { + return nil, errors.Wrap(err, "unable to create base cache") + } + + return &contentCacheForMetadata{ + st: st, + cacheBase: cb, + }, nil +} diff --git a/repo/content/content_cache_passthrough.go b/repo/content/content_cache_passthrough.go new file mode 100644 index 000000000..4217f3201 --- /dev/null +++ b/repo/content/content_cache_passthrough.go @@ -0,0 +1,18 @@ +package content + +import ( + "context" + + "github.com/kopia/kopia/repo/blob" +) + +// passthroughContentCache is a contentCache which does no caching. +type passthroughContentCache struct { + st blob.Storage +} + +func (c passthroughContentCache) close() {} + +func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + return c.st.GetBlob(ctx, blobID, offset, length) +} diff --git a/repo/content/content_cache_test.go b/repo/content/content_cache_test.go index a3ca00749..4fa3e4811 100644 --- a/repo/content/content_cache_test.go +++ b/repo/content/content_cache_test.go @@ -50,9 +50,14 @@ func TestCacheExpiration(t *testing.T) { underlyingStorage := newUnderlyingStorageForContentCacheTesting(t) - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, cacheStorage, 10000, CachingOptions{}, 0, 500*time.Millisecond) + cb, err := newContentCacheBase(testlogging.Context(t), cacheStorage, 10000, 0, 500*time.Millisecond) if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("unable to create base cache: %v", err) + } + + cache := &contentCacheForData{ + st: underlyingStorage, + cacheBase: cb, } defer cache.close() @@ -105,10 +110,14 @@ func TestDiskContentCache(t *testing.T) { defer os.RemoveAll(tmpDir) - cache, err := newContentCache(ctx, newUnderlyingStorageForContentCacheTesting(t), CachingOptions{ - CacheDirectory: tmpDir, - }, 10000, "contents") + const maxBytes = 10000 + cacheStorage, err := newCacheStorageOrNil(ctx, tmpDir, maxBytes, "contents") + if err != nil { + t.Fatal(err) + } + + cache, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -154,12 +163,12 @@ func verifyContentCache(t *testing.T, cache contentCache) { } } - verifyStorageContentList(t, cache.(*contentCacheWithStorage).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") + verifyStorageContentList(t, cache.(*contentCacheForData).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") }) t.Run("DataCorruption", func(t *testing.T) { var cacheKey blob.ID = "f0f0f1x" - d, err := cache.(*contentCacheWithStorage).cacheStorage.GetBlob(ctx, cacheKey, 0, -1) + d, err := cache.(*contentCacheForData).cacheStorage.GetBlob(ctx, cacheKey, 0, -1) if err != nil { t.Fatalf("unable to retrieve data from cache: %v", err) } @@ -167,7 +176,7 @@ func verifyContentCache(t *testing.T, cache contentCache) { // corrupt the data and write back d[0] ^= 1 - if puterr := cache.(*contentCacheWithStorage).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil { + if puterr := cache.(*contentCacheForData).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil { t.Fatalf("unable to write corrupted content: %v", puterr) } @@ -197,13 +206,13 @@ func TestCacheFailureToOpen(t *testing.T) { } // Will fail because of ListBlobs failure. - _, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err == nil || !strings.Contains(err.Error(), someError.Error()) { t.Errorf("invalid error %v, wanted: %v", err, someError) } // ListBlobs fails only once, next time it succeeds. - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 100*time.Millisecond) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -221,7 +230,7 @@ func TestCacheFailureToWrite(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -264,7 +273,7 @@ func TestCacheFailureToRead(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index bd2a3b884..efe10f3eb 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -650,6 +650,17 @@ func (bm *Manager) Refresh(ctx context.Context) (bool, error) { return updated, err } +// SyncMetadataCache synchronizes metadata cache with metadata blobs in storage. +func (bm *Manager) SyncMetadataCache(ctx context.Context) error { + if cm, ok := bm.metadataCache.(*contentCacheForMetadata); ok { + return cm.sync(ctx) + } + + log(ctx).Debugf("metadata cache not enabled") + + return nil +} + // ManagerOptions are the optional parameters for manager creation type ManagerOptions struct { RepositoryFormatBytes []byte @@ -680,7 +691,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp return nil, err } - dataCache, err := newContentCache(ctx, st, caching, caching.MaxCacheSizeBytes, "contents") + dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") + if err != nil { + return nil, errors.Wrap(err, "unable to initialize data cache storage") + } + + dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) if err != nil { return nil, errors.Wrap(err, "unable to initialize content cache") } @@ -690,7 +706,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp metadataCacheSize = caching.MaxCacheSizeBytes } - metadataCache, err := newContentCache(ctx, st, caching, metadataCacheSize, "metadata") + metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") + if err != nil { + return nil, errors.Wrap(err, "unable to initialize data cache storage") + } + + metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize) if err != nil { return nil, errors.Wrap(err, "unable to initialize metadata cache") } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 5119e6744..22473ca9c 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -321,7 +321,7 @@ func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, err } func (bm *lockFreeManager) getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) { - payload, err := bm.contentCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) + payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) if err != nil { return nil, err }