diff --git a/cli/command_cache_sync.go b/cli/command_cache_sync.go new file mode 100644 index 000000000..bad4017f7 --- /dev/null +++ b/cli/command_cache_sync.go @@ -0,0 +1,19 @@ +package cli + +import ( + "context" + + "github.com/kopia/kopia/repo" +) + +var ( + cacheSyncCommand = cacheCommands.Command("sync", "Synchronizes the metadata cache with blobs in storage") +) + +func runCacheSyncCommand(ctx context.Context, rep *repo.DirectRepository) error { + return rep.Content.SyncMetadataCache(ctx) +} + +func init() { + cacheSyncCommand.Action(directRepositoryAction(runCacheSyncCommand)) +} diff --git a/repo/content/content_cache.go b/repo/content/content_cache.go index de0f91267..d258fda8d 100644 --- a/repo/content/content_cache.go +++ b/repo/content/content_cache.go @@ -1,28 +1,15 @@ package content import ( - "container/heap" "context" "os" "path/filepath" - "sync" - "time" - - "github.com/pkg/errors" - "go.opencensus.io/stats" "github.com/kopia/kopia/internal/ctxutil" - "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/hmac" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/filesystem" ) -const ( - defaultSweepFrequency = 1 * time.Minute - defaultTouchThreshold = 10 * time.Minute -) - type cacheKey string type contentCache interface { @@ -30,187 +17,13 @@ type contentCache interface { getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) } -type contentCacheWithStorage struct { - st blob.Storage - cacheStorage blob.Storage - maxSizeBytes int64 - hmacSecret []byte - sweepFrequency time.Duration - touchThreshold time.Duration - - asyncWG sync.WaitGroup - closed chan struct{} -} - -type contentToucher interface { - TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error -} - -func adjustCacheKey(cacheKey cacheKey) cacheKey { - // content IDs with odd length have a single-byte prefix. 
- // move the prefix to the end of cache key to make sure the top level shard is spread 256 ways. - if len(cacheKey)%2 == 1 { - return cacheKey[1:] + cacheKey[0:1] - } - - return cacheKey -} - -func (c *contentCacheWithStorage) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { - cacheKey = adjustCacheKey(cacheKey) - - useCache := shouldUseContentCache(ctx) - if useCache { - if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil { - stats.Record(ctx, - metricContentCacheHitCount.M(1), - metricContentCacheHitBytes.M(int64(len(b))), - ) - - return b, nil - } - } - - stats.Record(ctx, metricContentCacheMissCount.M(1)) - - b, err := c.st.GetBlob(ctx, blobID, offset, length) - if err != nil { - stats.Record(ctx, metricContentCacheMissErrors.M(1)) - } else { - stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b)))) - } - - if err == blob.ErrBlobNotFound { - // not found in underlying storage - return nil, err - } - - if err == nil && useCache { - // do not report cache writes as uploads. 
- if puterr := c.cacheStorage.PutBlob( - blob.WithUploadProgressCallback(ctx, nil), - blob.ID(cacheKey), - gather.FromSlice(hmac.Append(b, c.hmacSecret)), - ); puterr != nil { - stats.Record(ctx, metricContentCacheStoreErrors.M(1)) - log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) - } - } - - return b, err -} - -func (c *contentCacheWithStorage) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte { - b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1) - if err == nil { - b, err = hmac.VerifyAndStrip(b, c.hmacSecret) - if err == nil { - if t, ok := c.cacheStorage.(contentToucher); ok { - t.TouchBlob(ctx, blob.ID(cacheKey), c.touchThreshold) //nolint:errcheck - } - - // retrieved from cache and HMAC valid - return b - } - - // ignore malformed contents - log(ctx).Warningf("malformed content %v: %v", cacheKey, err) - - return nil - } - - if err != blob.ErrBlobNotFound { - log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err) - } - - return nil -} - -func (c *contentCacheWithStorage) close() { - close(c.closed) - c.asyncWG.Wait() -} - -func (c *contentCacheWithStorage) sweepDirectoryPeriodically(ctx context.Context) { - defer c.asyncWG.Done() - - for { - select { - case <-c.closed: - return - - case <-time.After(c.sweepFrequency): - err := c.sweepDirectory(ctx) - if err != nil { - log(ctx).Warningf("contentCacheWithStorage sweep failed: %v", err) - } - } - } -} - -// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. 
-type contentMetadataHeap []blob.Metadata - -func (h contentMetadataHeap) Len() int { return len(h) } - -func (h contentMetadataHeap) Less(i, j int) bool { - return h[i].Timestamp.Before(h[j].Timestamp) -} - -func (h contentMetadataHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *contentMetadataHeap) Push(x interface{}) { - *h = append(*h, x.(blob.Metadata)) -} - -func (h *contentMetadataHeap) Pop() interface{} { - old := *h - n := len(old) - item := old[n-1] - *h = old[0 : n-1] - - return item -} - -func (c *contentCacheWithStorage) sweepDirectory(ctx context.Context) (err error) { - t0 := time.Now() // allow:no-inject-time - - var h contentMetadataHeap - - var totalRetainedSize int64 - - err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - heap.Push(&h, it) - totalRetainedSize += it.Length - - if totalRetainedSize > c.maxSizeBytes { - oldest := heap.Pop(&h).(blob.Metadata) - if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { - log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) - } else { - totalRetainedSize -= oldest.Length - } - } - return nil - }) - if err != nil { - return errors.Wrap(err, "error listing cache") - } - - log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time - - return nil -} - -func newContentCache(ctx context.Context, st blob.Storage, caching CachingOptions, maxBytes int64, subdir string) (contentCache, error) { +func newCacheStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (blob.Storage, error) { var cacheStorage blob.Storage var err error - if maxBytes > 0 && caching.CacheDirectory != "" { - contentCacheDir := filepath.Join(caching.CacheDirectory, subdir) + if maxBytes > 0 && cacheDir != "" { + contentCacheDir := filepath.Join(cacheDir, subdir) if _, err = os.Stat(contentCacheDir); 
os.IsNotExist(err) { if mkdirerr := os.MkdirAll(contentCacheDir, 0700); mkdirerr != nil { @@ -227,49 +40,5 @@ func newContentCache(ctx context.Context, st blob.Storage, caching CachingOption } } - return newContentCacheWithCacheStorage(ctx, st, cacheStorage, maxBytes, caching, defaultTouchThreshold, defaultSweepFrequency) -} - -func newContentCacheWithCacheStorage(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, caching CachingOptions, touchThreshold, sweepFrequency time.Duration) (contentCache, error) { - if cacheStorage == nil { - return passthroughContentCache{st}, nil - } - - c := &contentCacheWithStorage{ - st: st, - cacheStorage: cacheStorage, - maxSizeBytes: maxSizeBytes, - hmacSecret: append([]byte(nil), caching.HMACSecret...), - closed: make(chan struct{}), - touchThreshold: touchThreshold, - sweepFrequency: sweepFrequency, - } - - // errGood is a marker error to stop blob iteration quickly, does not - // indicate any problem. - var errGood = errors.Errorf("good") - - // verify that cache storage is functional by listing from it - if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - return errGood - }); err != nil && err != errGood { - return nil, errors.Wrap(err, "unable to open cache") - } - - c.asyncWG.Add(1) - - go c.sweepDirectoryPeriodically(ctx) - - return c, nil -} - -// passthroughContentCache is a contentCache which does no caching. 
-type passthroughContentCache struct { - st blob.Storage -} - -func (c passthroughContentCache) close() {} - -func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { - return c.st.GetBlob(ctx, blobID, offset, length) + return cacheStorage, nil } diff --git a/repo/content/content_cache_base.go b/repo/content/content_cache_base.go new file mode 100644 index 000000000..0e390e848 --- /dev/null +++ b/repo/content/content_cache_base.go @@ -0,0 +1,187 @@ +package content + +import ( + "container/heap" + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" +) + +const ( + defaultSweepFrequency = 1 * time.Minute + defaultTouchThreshold = 10 * time.Minute + mutexAgeCutoff = 5 * time.Minute +) + +type mutextLRU struct { + mu *sync.Mutex + lastUsedNanoseconds int64 +} + +// cacheBase provides common implementation for per-content and per-blob caches. +type cacheBase struct { + cacheStorage blob.Storage + maxSizeBytes int64 + sweepFrequency time.Duration + touchThreshold time.Duration + + asyncWG sync.WaitGroup + closed chan struct{} + + // stores key to *mutextLRU mapping which is periodically garbage-collected + // and used to eliminate/minimize concurrent fetches of the same cached item. 
+ loadingMap sync.Map +} + +type contentToucher interface { + TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error +} + +func (c *cacheBase) touch(ctx context.Context, blobID blob.ID) { + if t, ok := c.cacheStorage.(contentToucher); ok { + t.TouchBlob(ctx, blobID, c.touchThreshold) //nolint:errcheck + } +} + +func (c *cacheBase) close() { + close(c.closed) + c.asyncWG.Wait() +} + +func (c *cacheBase) perItemMutex(key interface{}) *sync.Mutex { + now := time.Now().UnixNano() // allow:no-inject-time + + v, ok := c.loadingMap.Load(key) + if !ok { + v, _ = c.loadingMap.LoadOrStore(key, &mutextLRU{ + mu: &sync.Mutex{}, + lastUsedNanoseconds: now, + }) + } + + m := v.(*mutextLRU) + atomic.StoreInt64(&m.lastUsedNanoseconds, now) + + return m.mu +} + +func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) { + defer c.asyncWG.Done() + + for { + select { + case <-c.closed: + return + + case <-time.After(c.sweepFrequency): + c.sweepMutexes() + + if err := c.sweepDirectory(ctx); err != nil { + log(ctx).Warningf("cacheBase sweep failed: %v", err) + } + } + } +} + +// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. 
+type contentMetadataHeap []blob.Metadata + +func (h contentMetadataHeap) Len() int { return len(h) } + +func (h contentMetadataHeap) Less(i, j int) bool { + return h[i].Timestamp.Before(h[j].Timestamp) +} + +func (h contentMetadataHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *contentMetadataHeap) Push(x interface{}) { + *h = append(*h, x.(blob.Metadata)) +} + +func (h *contentMetadataHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + + return item +} + +func (c *cacheBase) sweepDirectory(ctx context.Context) (err error) { + t0 := time.Now() // allow:no-inject-time + + var h contentMetadataHeap + + var totalRetainedSize int64 + + err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + heap.Push(&h, it) + totalRetainedSize += it.Length + + if totalRetainedSize > c.maxSizeBytes { + oldest := heap.Pop(&h).(blob.Metadata) + if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { + log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) + } else { + totalRetainedSize -= oldest.Length + } + } + return nil + }) + if err != nil { + return errors.Wrap(err, "error listing cache") + } + + log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time + + return nil +} + +func (c *cacheBase) sweepMutexes() { + cutoffTime := time.Now().Add(-mutexAgeCutoff).UnixNano() // allow:no-inject-time + + // remove from loadingMap all items that have not been touched recently. + // since the mutexes are only for performance (to avoid loading duplicates) + // and not for correctness, it's always safe to remove them. 
+ c.loadingMap.Range(func(key, value interface{}) bool { + if m := value.(*mutextLRU); m.lastUsedNanoseconds < cutoffTime { + c.loadingMap.Delete(key) + } + + return true + }) +} + +func newContentCacheBase(ctx context.Context, cacheStorage blob.Storage, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*cacheBase, error) { + c := &cacheBase{ + cacheStorage: cacheStorage, + maxSizeBytes: maxSizeBytes, + closed: make(chan struct{}), + touchThreshold: touchThreshold, + sweepFrequency: sweepFrequency, + } + + // errGood is a marker error to stop blob iteration quickly, does not + // indicate any problem. + var errGood = errors.Errorf("good") + + // verify that cache storage is functional by listing from it + if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + return errGood + }); err != nil && err != errGood { + return nil, errors.Wrap(err, "unable to open cache") + } + + c.asyncWG.Add(1) + + go c.sweepDirectoryPeriodically(ctx) + + return c, nil +} diff --git a/repo/content/content_cache_data.go b/repo/content/content_cache_data.go new file mode 100644 index 000000000..928c77e28 --- /dev/null +++ b/repo/content/content_cache_data.go @@ -0,0 +1,114 @@ +package content + +import ( + "context" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/hmac" + "github.com/kopia/kopia/repo/blob" +) + +type contentCacheForData struct { + *cacheBase + + st blob.Storage + hmacSecret []byte +} + +func adjustCacheKey(cacheKey cacheKey) cacheKey { + // content IDs with odd length have a single-byte prefix. + // move the prefix to the end of cache key to make sure the top level shard is spread 256 ways. 
+ if len(cacheKey)%2 == 1 { + return cacheKey[1:] + cacheKey[0:1] + } + + return cacheKey +} + +func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + cacheKey = adjustCacheKey(cacheKey) + + useCache := shouldUseContentCache(ctx) + if useCache { + if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil { + stats.Record(ctx, + metricContentCacheHitCount.M(1), + metricContentCacheHitBytes.M(int64(len(b))), + ) + + return b, nil + } + } + + stats.Record(ctx, metricContentCacheMissCount.M(1)) + + b, err := c.st.GetBlob(ctx, blobID, offset, length) + if err != nil { + stats.Record(ctx, metricContentCacheMissErrors.M(1)) + } else { + stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b)))) + } + + if err == blob.ErrBlobNotFound { + // not found in underlying storage + return nil, err + } + + if err == nil && useCache { + // do not report cache writes as uploads. + if puterr := c.cacheStorage.PutBlob( + blob.WithUploadProgressCallback(ctx, nil), + blob.ID(cacheKey), + gather.FromSlice(hmac.Append(b, c.hmacSecret)), + ); puterr != nil { + stats.Record(ctx, metricContentCacheStoreErrors.M(1)) + log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) + } + } + + return b, err +} + +func (c *contentCacheForData) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte { + b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1) + if err == nil { + b, err = hmac.VerifyAndStrip(b, c.hmacSecret) + if err == nil { + c.touch(ctx, blob.ID(cacheKey)) + + // retrieved from cache and HMAC valid + return b + } + + // ignore malformed contents + log(ctx).Warningf("malformed content %v: %v", cacheKey, err) + + return nil + } + + if err != blob.ErrBlobNotFound { + log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err) + } + + return nil +} + +func newContentCacheForData(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, 
hmacSecret []byte) (contentCache, error) { + if cacheStorage == nil { + return passthroughContentCache{st}, nil + } + + cb, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + if err != nil { + return nil, errors.Wrap(err, "unable to create base cache") + } + + return &contentCacheForData{ + st: st, + hmacSecret: append([]byte(nil), hmacSecret...), + cacheBase: cb, + }, nil +} diff --git a/repo/content/content_cache_metadata.go b/repo/content/content_cache_metadata.go new file mode 100644 index 000000000..b08c40ae6 --- /dev/null +++ b/repo/content/content_cache_metadata.go @@ -0,0 +1,128 @@ +package content + +import ( + "context" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" +) + +const metadataCacheSyncParallelism = 16 + +type contentCacheForMetadata struct { + *cacheBase + + st blob.Storage +} + +// sync synchronizes metadata cache with all blobs found in the storage. +func (c *contentCacheForMetadata) sync(ctx context.Context) error { + sem := make(chan struct{}, metadataCacheSyncParallelism) + + log(ctx).Debugf("synchronizing metadata cache...") + defer log(ctx).Debugf("finished synchronizing metadata cache.") + + var eg errgroup.Group + + // list all blobs and fetch contents into cache in parallel. 
+ if err := c.st.ListBlobs(ctx, PackBlobIDPrefixSpecial, func(bm blob.Metadata) error { + // acquire semaphore + sem <- struct{}{} + eg.Go(func() error { + defer func() { + <-sem + }() + + _, err := c.getContent(ctx, "dummy", bm.BlobID, 0, 1) + return err + }) + + return nil + }); err != nil { + return errors.Wrap(err, "error listing blobs") + } + + return eg.Wait() +} + +func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + m := c.perItemMutex(blobID) + m.Lock() + defer m.Unlock() + + useCache := shouldUseContentCache(ctx) + if useCache { + if v, err := c.cacheBase.cacheStorage.GetBlob(ctx, blobID, offset, length); err == nil { + // cache hit + stats.Record(ctx, + metricContentCacheHitCount.M(1), + metricContentCacheHitBytes.M(int64(len(v))), + ) + + return v, nil + } + } + + stats.Record(ctx, metricContentCacheMissCount.M(1)) + + // read the entire blob + log(ctx).Debugf("fetching metadata blob %q", blobID) + blobData, err := c.st.GetBlob(ctx, blobID, 0, -1) + + if err != nil { + stats.Record(ctx, metricContentCacheMissErrors.M(1)) + } else { + stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(blobData)))) + } + + if err == blob.ErrBlobNotFound { + // not found in underlying storage + return nil, err + } + + if err != nil { + return nil, err + } + + if useCache { + // store the whole blob in the cache, do not report cache writes as uploads. 
+ if puterr := c.cacheStorage.PutBlob( + blob.WithUploadProgressCallback(ctx, nil), + blobID, + gather.FromSlice(blobData), + ); puterr != nil { + stats.Record(ctx, metricContentCacheStoreErrors.M(1)) + log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr) + } + } + + if offset == 0 && length == -1 { + return blobData, err + } + + if offset < 0 || offset+length > int64(len(blobData)) { + return nil, errors.Errorf("invalid (offset=%v,length=%v) for blob %q of size %v", offset, length, blobID, len(blobData)) + } + + return blobData[offset : offset+length], nil +} + +func newContentCacheForMetadata(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64) (contentCache, error) { + if cacheStorage == nil { + return passthroughContentCache{st}, nil + } + + cb, err := newContentCacheBase(ctx, cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + if err != nil { + return nil, errors.Wrap(err, "unable to create base cache") + } + + return &contentCacheForMetadata{ + st: st, + cacheBase: cb, + }, nil +} diff --git a/repo/content/content_cache_passthrough.go b/repo/content/content_cache_passthrough.go new file mode 100644 index 000000000..4217f3201 --- /dev/null +++ b/repo/content/content_cache_passthrough.go @@ -0,0 +1,18 @@ +package content + +import ( + "context" + + "github.com/kopia/kopia/repo/blob" +) + +// passthroughContentCache is a contentCache which does no caching. 
+type passthroughContentCache struct { + st blob.Storage +} + +func (c passthroughContentCache) close() {} + +func (c passthroughContentCache) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { + return c.st.GetBlob(ctx, blobID, offset, length) +} diff --git a/repo/content/content_cache_test.go b/repo/content/content_cache_test.go index a3ca00749..4fa3e4811 100644 --- a/repo/content/content_cache_test.go +++ b/repo/content/content_cache_test.go @@ -50,9 +50,14 @@ func TestCacheExpiration(t *testing.T) { underlyingStorage := newUnderlyingStorageForContentCacheTesting(t) - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, cacheStorage, 10000, CachingOptions{}, 0, 500*time.Millisecond) + cb, err := newContentCacheBase(testlogging.Context(t), cacheStorage, 10000, 0, 500*time.Millisecond) if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("unable to create base cache: %v", err) + } + + cache := &contentCacheForData{ + st: underlyingStorage, + cacheBase: cb, } defer cache.close() @@ -105,10 +110,14 @@ func TestDiskContentCache(t *testing.T) { defer os.RemoveAll(tmpDir) - cache, err := newContentCache(ctx, newUnderlyingStorageForContentCacheTesting(t), CachingOptions{ - CacheDirectory: tmpDir, - }, 10000, "contents") + const maxBytes = 10000 + cacheStorage, err := newCacheStorageOrNil(ctx, tmpDir, maxBytes, "contents") + if err != nil { + t.Fatal(err) + } + + cache, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -154,12 +163,12 @@ func verifyContentCache(t *testing.T, cache contentCache) { } } - verifyStorageContentList(t, cache.(*contentCacheWithStorage).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") + verifyStorageContentList(t, cache.(*contentCacheForData).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") }) t.Run("DataCorruption", func(t *testing.T) { var cacheKey 
blob.ID = "f0f0f1x" - d, err := cache.(*contentCacheWithStorage).cacheStorage.GetBlob(ctx, cacheKey, 0, -1) + d, err := cache.(*contentCacheForData).cacheStorage.GetBlob(ctx, cacheKey, 0, -1) if err != nil { t.Fatalf("unable to retrieve data from cache: %v", err) } @@ -167,7 +176,7 @@ func verifyContentCache(t *testing.T, cache contentCache) { // corrupt the data and write back d[0] ^= 1 - if puterr := cache.(*contentCacheWithStorage).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil { + if puterr := cache.(*contentCacheForData).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil { t.Fatalf("unable to write corrupted content: %v", puterr) } @@ -197,13 +206,13 @@ func TestCacheFailureToOpen(t *testing.T) { } // Will fail because of ListBlobs failure. - _, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err == nil || !strings.Contains(err.Error(), someError.Error()) { t.Errorf("invalid error %v, wanted: %v", err, someError) } // ListBlobs fails only once, next time it succeeds. 
- cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 100*time.Millisecond) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -221,7 +230,7 @@ func TestCacheFailureToWrite(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -264,7 +273,7 @@ func TestCacheFailureToRead(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheWithCacheStorage(testlogging.Context(t), underlyingStorage, faultyCache, 10000, CachingOptions{}, 0, 5*time.Hour) + cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index bd2a3b884..efe10f3eb 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -650,6 +650,17 @@ func (bm *Manager) Refresh(ctx context.Context) (bool, error) { return updated, err } +// SyncMetadataCache synchronizes metadata cache with metadata blobs in storage. 
+func (bm *Manager) SyncMetadataCache(ctx context.Context) error { + if cm, ok := bm.metadataCache.(*contentCacheForMetadata); ok { + return cm.sync(ctx) + } + + log(ctx).Debugf("metadata cache not enabled") + + return nil +} + // ManagerOptions are the optional parameters for manager creation type ManagerOptions struct { RepositoryFormatBytes []byte @@ -680,7 +691,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp return nil, err } - dataCache, err := newContentCache(ctx, st, caching, caching.MaxCacheSizeBytes, "contents") + dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") + if err != nil { + return nil, errors.Wrap(err, "unable to initialize data cache storage") + } + + dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) if err != nil { return nil, errors.Wrap(err, "unable to initialize content cache") } @@ -690,7 +706,12 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp metadataCacheSize = caching.MaxCacheSizeBytes } - metadataCache, err := newContentCache(ctx, st, caching, metadataCacheSize, "metadata") + metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") + if err != nil { + return nil, errors.Wrap(err, "unable to initialize metadata cache storage") + } + + metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize) if err != nil { return nil, errors.Wrap(err, "unable to initialize metadata cache") } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 5119e6744..22473ca9c 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -321,7 +321,7 @@ func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, err } func (bm *lockFreeManager) 
getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) { - payload, err := bm.contentCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) + payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) if err != nil { return nil, err }