From 4e705726fe05c780ee87f20450da7aabf3270962 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 1 Mar 2021 06:15:39 -0800 Subject: [PATCH] Implemented caching for server connections (#845) * cache: refactored reusable portion of cache into separate package * repo: plumbed through caching for remote repository clients * repo: plumb through cache in the unit tests * cache: ensure we only allow absolute cache paths, fixed cache path resolution for remote repositories --- .../cache/cache_metrics.go | 43 ++- internal/cache/cache_storage.go | 49 ++++ internal/cache/persistent_lru_cache.go | 251 ++++++++++++++++++ internal/cache/persistent_lru_cache_test.go | 99 +++++++ internal/cache/storage_protection.go | 110 ++++++++ internal/cache/storage_protection_test.go | 40 +++ internal/server/server_test.go | 8 +- repo/api_server_repository.go | 23 +- repo/content/committed_read_manager.go | 5 +- repo/content/content_cache.go | 32 --- repo/content/content_cache_base.go | 202 -------------- repo/content/content_cache_data.go | 83 +----- repo/content/content_cache_metadata.go | 61 +++-- repo/content/content_cache_test.go | 82 +++--- repo/grpc_repository_client.go | 31 ++- repo/open.go | 55 +++- .../api_server_repository_test.go | 4 +- 17 files changed, 760 insertions(+), 418 deletions(-) rename repo/content/content_cache_metrics.go => internal/cache/cache_metrics.go (52%) create mode 100644 internal/cache/cache_storage.go create mode 100644 internal/cache/persistent_lru_cache.go create mode 100644 internal/cache/persistent_lru_cache_test.go create mode 100644 internal/cache/storage_protection.go create mode 100644 internal/cache/storage_protection_test.go delete mode 100644 repo/content/content_cache_base.go diff --git a/repo/content/content_cache_metrics.go b/internal/cache/cache_metrics.go similarity index 52% rename from repo/content/content_cache_metrics.go rename to internal/cache/cache_metrics.go index 1da6c37bb..27dda8de1 100644 --- a/repo/content/content_cache_metrics.go +++ b/internal/cache/cache_metrics.go @@ -1,57 +1,72 @@ -package content +package cache import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) -// content cache metrics. +// cache metrics. var ( - metricContentCacheHitCount = stats.Int64( + MetricHitCount = stats.Int64( "kopia/content/cache/hit_count", "Number of time content was retrieved from the cache", stats.UnitDimensionless, ) - metricContentCacheHitBytes = stats.Int64( + MetricHitBytes = stats.Int64( "kopia/content/cache/hit_bytes", "Number of bytes retrieved from the cache", stats.UnitBytes, ) - metricContentCacheMissCount = stats.Int64( + MetricMissCount = stats.Int64( "kopia/content/cache/miss_count", "Number of time content was not found in the cache and fetched from the storage", stats.UnitDimensionless, ) - metricContentCacheMissBytes = stats.Int64( + MetricMalformedCacheDataCount = stats.Int64( + "kopia/content/cache/malformed", + "Number of times malformed content was read from the cache", + stats.UnitDimensionless, + ) + + MetricMissBytes = stats.Int64( "kopia/content/cache/missed_bytes", "Number of bytes retrieved from the underlying storage", stats.UnitBytes, ) - metricContentCacheMissErrors = stats.Int64( + MetricMissErrors = stats.Int64( "kopia/content/cache/miss_error_count", "Number of time content could not be found in the underlying storage", stats.UnitDimensionless, ) - metricContentCacheStoreErrors = stats.Int64( + MetricStoreErrors = stats.Int64( "kopia/content/cache/store_error_count", "Number of time content could not be saved in the cache", stats.UnitDimensionless, ) ) +func simpleAggregation(m stats.Measure, agg *view.Aggregation) *view.View { + return &view.View{ + Name: m.Name(), + Aggregation: agg, + Description: m.Description(), + Measure: m, + } +} + func init() { if err := view.Register( - simpleAggregation(metricContentCacheHitCount, view.Count()), - simpleAggregation(metricContentCacheHitBytes, view.Sum()), - simpleAggregation(metricContentCacheMissCount, view.Count()), - simpleAggregation(metricContentCacheMissBytes, view.Sum()), - simpleAggregation(metricContentCacheMissErrors, view.Count()), - simpleAggregation(metricContentCacheStoreErrors, view.Count()), + simpleAggregation(MetricHitCount, view.Count()), + simpleAggregation(MetricHitBytes, view.Sum()), + simpleAggregation(MetricMissCount, view.Count()), + simpleAggregation(MetricMissBytes, view.Sum()), + simpleAggregation(MetricMissErrors, view.Count()), + simpleAggregation(MetricStoreErrors, view.Count()), ); err != nil { panic("unable to register opencensus views: " + err.Error()) } diff --git a/internal/cache/cache_storage.go b/internal/cache/cache_storage.go new file mode 100644 index 000000000..0b0b59df3 --- /dev/null +++ b/internal/cache/cache_storage.go @@ -0,0 +1,49 @@ +package cache + +import ( + "context" + "os" + "path/filepath" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/ctxutil" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/filesystem" +) + +// Storage is the storage interface required by the cache and is implemented by the filesystem Storage. +type Storage interface { + blob.Storage + TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error +} + +// NewStorageOrNil returns cache.Storage backed by the provided directory. +func NewStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (Storage, error) { + if maxBytes <= 0 || cacheDir == "" { + return nil, nil + } + + if !filepath.IsAbs(cacheDir) { + return nil, errors.Errorf("cache dir %q was not absolute", cacheDir) + } + + contentCacheDir := filepath.Join(cacheDir, subdir) + + if _, err := os.Stat(contentCacheDir); os.IsNotExist(err) { + if mkdirerr := os.MkdirAll(contentCacheDir, 0o700); mkdirerr != nil { + return nil, errors.Wrap(mkdirerr, "error creating cache directory") + } + } + + fs, err := filesystem.New(ctxutil.Detach(ctx), &filesystem.Options{ + Path: contentCacheDir, + DirectoryShards: []int{2}, + }) + if err != nil { + return nil, errors.Wrap(err, "error initializing filesystem cache") + } + + return fs.(Storage), nil +} diff --git a/internal/cache/persistent_lru_cache.go b/internal/cache/persistent_lru_cache.go new file mode 100644 index 000000000..a745fe38c --- /dev/null +++ b/internal/cache/persistent_lru_cache.go @@ -0,0 +1,251 @@ +// Package cache implements durable on-disk cache with LRU expiration. +package cache + +import ( + "container/heap" + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/logging" +) + +var log = logging.GetContextLoggerFunc("cache") + +const ( + // DefaultSweepFrequency is how frequently the contents of cache are sweeped to remove excess data. + DefaultSweepFrequency = 1 * time.Minute + + // DefaultTouchThreshold specifies the resolution of timestamps used to determine which cache items + // to expire. This helps cache storage writes on frequently accessed items. + DefaultTouchThreshold = 10 * time.Minute +) + +// PersistentCache provides persistent on-disk cache. +type PersistentCache struct { + anyChange int32 + + cacheStorage Storage + storageProtection StorageProtection + + maxSizeBytes int64 + sweepFrequency time.Duration + touchThreshold time.Duration + description string + + periodicSweepRunning sync.WaitGroup + periodicSweepClosed chan struct{} +} + +// GetOrLoad is utility function gets the provided item from the cache or invokes the provided fetch function. +// The function also appends and verifies HMAC checksums using provided secret on all cached items to ensure data integrity. +func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func() ([]byte, error)) ([]byte, error) { + if c == nil { + // special case - also works on non-initialized cache pointer. + return fetch() + } + + if b := c.Get(ctx, key, 0, -1); b != nil { + return b, nil + } + + b, err := fetch() + if err != nil { + stats.Record(ctx, MetricMissErrors.M(1)) + + return nil, err + } + + stats.Record(ctx, MetricMissBytes.M(int64(len(b)))) + + c.Put(ctx, key, b) + + return b, nil +} + +// Get fetches the contents of a cached blob when (length < 0) or a subset of it (when length >= 0). +// returns nil if not found. +func (c *PersistentCache) Get(ctx context.Context, key string, offset, length int64) []byte { + if c == nil { + return nil + } + + if length >= 0 && !c.storageProtection.SupportsPartial() { + return nil + } + + v, err := c.cacheStorage.GetBlob(ctx, blob.ID(key), offset, length) + if err == nil { + vb, err := c.storageProtection.Verify(key, v) + if err == nil { + // cache hit + stats.Record(ctx, + MetricHitCount.M(1), + MetricHitBytes.M(int64(len(vb))), + ) + + // cache hit + c.cacheStorage.TouchBlob(ctx, blob.ID(key), c.touchThreshold) //nolint:errcheck + + return vb + } + + // delete invalid blob + stats.Record(ctx, MetricMalformedCacheDataCount.M(1)) + + if err := c.cacheStorage.DeleteBlob(ctx, blob.ID(key)); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { + log(ctx).Warningf("unable to delete %v entry %v: %v", c.description, key, err) + } + } + + // cache miss + stats.Record(ctx, MetricMissCount.M(1)) + + return nil +} + +// Put adds the provided key-value pair to the cache. +func (c *PersistentCache) Put(ctx context.Context, key string, data []byte) { + if c == nil { + return + } + + atomic.StoreInt32(&c.anyChange, 1) + + if err := c.cacheStorage.PutBlob(ctx, blob.ID(key), gather.FromSlice(c.storageProtection.Protect(key, data))); err != nil { + stats.Record(ctx, MetricStoreErrors.M(1)) + + log(ctx).Warningf("unable to add %v to %v: %v", key, c.description, err) + } +} + +// Close closes the instance of persistent cache possibly waiting for at least one sweep to complete. +func (c *PersistentCache) Close(ctx context.Context) { + if c == nil { + return + } + + close(c.periodicSweepClosed) + c.periodicSweepRunning.Wait() + + // if we added anything to the cache in this sesion, run one last sweep before shutting down. + if atomic.LoadInt32(&c.anyChange) == 1 { + if err := c.sweepDirectory(ctx); err != nil { + log(ctx).Warningf("error during final sweep of the %v: %v", c.description, err) + } + } +} + +func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) { + defer c.periodicSweepRunning.Done() + + for { + select { + case <-c.periodicSweepClosed: + return + + case <-time.After(c.sweepFrequency): + if err := c.sweepDirectory(ctx); err != nil { + log(ctx).Warningf("error during periodic sweep of %v: %v", c.description, err) + } + } + } +} + +// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. +type contentMetadataHeap []blob.Metadata + +func (h contentMetadataHeap) Len() int { return len(h) } + +func (h contentMetadataHeap) Less(i, j int) bool { + return h[i].Timestamp.Before(h[j].Timestamp) +} + +func (h contentMetadataHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *contentMetadataHeap) Push(x interface{}) { + *h = append(*h, x.(blob.Metadata)) +} + +func (h *contentMetadataHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + + return item +} + +func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) { + t0 := clock.Now() + + var h contentMetadataHeap + + var totalRetainedSize int64 + + err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + heap.Push(&h, it) + totalRetainedSize += it.Length + + if totalRetainedSize > c.maxSizeBytes { + oldest := heap.Pop(&h).(blob.Metadata) + if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { + log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) + } else { + totalRetainedSize -= oldest.Length + } + } + return nil + }) + if err != nil { + return errors.Wrapf(err, "error listing %v", c.description) + } + + log(ctx).Debugf("finished sweeping %v in %v and retained %v/%v bytes (%v %%)", c.description, clock.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) + + return nil +} + +// NewPersistentCache creates the persistent cache in the provided storage. +func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection StorageProtection, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*PersistentCache, error) { + if storageProtection == nil { + storageProtection = nullStorageProtection{} + } + + c := &PersistentCache{ + cacheStorage: cacheStorage, + maxSizeBytes: maxSizeBytes, + periodicSweepClosed: make(chan struct{}), + touchThreshold: touchThreshold, + sweepFrequency: sweepFrequency, + description: description, + storageProtection: storageProtection, + } + + // errGood is a marker error to stop blob iteration quickly, does not + // indicate any problem. + errGood := errors.Errorf("good") + + // verify that cache storage is functional by listing from it + if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { + // nolint:wrapcheck + return errGood + }); err != nil && !errors.Is(err, errGood) { + return nil, errors.Wrapf(err, "unable to open %v", c.description) + } + + c.periodicSweepRunning.Add(1) + + go c.sweepDirectoryPeriodically(ctx) + + return c, nil +} diff --git a/internal/cache/persistent_lru_cache_test.go b/internal/cache/persistent_lru_cache_test.go new file mode 100644 index 000000000..5cd09ddf1 --- /dev/null +++ b/internal/cache/persistent_lru_cache_test.go @@ -0,0 +1,99 @@ +package cache_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/cache" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" +) + +func TestPersistentLRUCache(t *testing.T) { + cacheDir := testutil.TempDirectory(t) + ctx := testlogging.Context(t) + + const maxSizeBytes = 1000 + + cs, err := cache.NewStorageOrNil(ctx, cacheDir, maxSizeBytes, "subdir") + if err != nil { + t.Fatal(err) + } + + pc, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + if err != nil { + t.Fatal(err) + } + + if got := pc.Get(ctx, "key", 0, -1); got != nil { + t.Fatalf("unexpected cache hit on empty cache: %x", got) + } + + someData := bytes.Repeat([]byte{1}, 300) + + pc.Put(ctx, "key1", someData) + verifyBlobExists(ctx, t, cs, "key1") + + // sleep between adding key1 and the rest to make it easily the oldest + // even if the filesystem is not very precise keeping time. + time.Sleep(2 * time.Second) + pc.Put(ctx, "key2", someData) + verifyBlobExists(ctx, t, cs, "key2") + pc.Put(ctx, "key3", someData) + verifyBlobExists(ctx, t, cs, "key3") + pc.Put(ctx, "key4", someData) + verifyBlobExists(ctx, t, cs, "key4") + + if got, want := pc.Get(ctx, "key2", 0, -1), someData; !bytes.Equal(got, want) { + t.Fatalf("invalid data retrieved from cache: %x", got) + } + + // final sweep is performed on close at which time key1 becomes candidate + // for expulsion from cache because it's the oldest and we have 1200 bytes in the cache + // but the limit is only 1000. + pc.Close(ctx) + + verifyBlobDoesNotExist(ctx, t, cs, "key1") + verifyBlobExists(ctx, t, cs, "key2") + verifyBlobExists(ctx, t, cs, "key3") + verifyBlobExists(ctx, t, cs, "key4") + + pc, err = cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) + if err != nil { + t.Fatal(err) + } + + verifyCached(ctx, t, pc, "key1", nil) + verifyCached(ctx, t, pc, "key2", someData) + verifyCached(ctx, t, pc, "key3", someData) + verifyCached(ctx, t, pc, "key4", someData) +} + +func verifyCached(ctx context.Context, t *testing.T, pc *cache.PersistentCache, key string, want []byte) { + t.Helper() + + if got := pc.Get(ctx, key, 0, -1); !bytes.Equal(got, want) { + t.Fatalf("invalid cached result for %v: %x, want %x", key, got, want) + } +} + +func verifyBlobExists(ctx context.Context, t *testing.T, st blob.Storage, blobID blob.ID) { + t.Helper() + + if _, err := st.GetMetadata(ctx, blobID); err != nil { + t.Fatalf("blob %v error: %v", blobID, err) + } +} + +func verifyBlobDoesNotExist(ctx context.Context, t *testing.T, st blob.Storage, blobID blob.ID) { + t.Helper() + + if _, err := st.GetMetadata(ctx, blobID); !errors.Is(err, blob.ErrBlobNotFound) { + t.Fatalf("unexpected blob %v error: %v", blobID, err) + } +} diff --git a/internal/cache/storage_protection.go b/internal/cache/storage_protection.go new file mode 100644 index 000000000..caf589dfd --- /dev/null +++ b/internal/cache/storage_protection.go @@ -0,0 +1,110 @@ +package cache + +import ( + "crypto/sha256" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/hmac" + "github.com/kopia/kopia/repo/encryption" +) + +// encryptionProtectionAlgorithm is the authenticated encryption algorithm used by authenticatedEncryptionProtection. +var encryptionProtectionAlgorithm = "AES256-GCM-HMAC-SHA256" + +// StorageProtection encapsulates protection (HMAC and/or encryption) applied to local cache items. +type StorageProtection interface { + SupportsPartial() bool + Protect(id string, b []byte) []byte + Verify(id string, b []byte) ([]byte, error) +} + +type nullStorageProtection struct{} + +func (nullStorageProtection) Protect(id string, b []byte) []byte { + return b +} + +func (nullStorageProtection) Verify(id string, b []byte) ([]byte, error) { + return b, nil +} + +func (nullStorageProtection) SupportsPartial() bool { + return true +} + +// NoProtection returns implementation of StorageProtection that offers no protection. +func NoProtection() StorageProtection { + return nullStorageProtection{} +} + +type checksumProtection struct { + Secret []byte +} + +func (p checksumProtection) Protect(id string, b []byte) []byte { + return hmac.Append(b, p.Secret) +} + +func (p checksumProtection) Verify(id string, b []byte) ([]byte, error) { + return hmac.VerifyAndStrip(b, p.Secret) +} + +func (checksumProtection) SupportsPartial() bool { + return false +} + +// ChecksumProtection returns StorageProtection that protects cached data using HMAC checksums without encryption. +func ChecksumProtection(key []byte) StorageProtection { + return checksumProtection{key} +} + +type authenticatedEncryptionProtection struct { + e encryption.Encryptor +} + +func (p authenticatedEncryptionProtection) deriveIV(id string) []byte { + contentID := sha256.Sum256([]byte(id)) + return contentID[:] +} + +func (p authenticatedEncryptionProtection) Protect(id string, b []byte) []byte { + c, err := p.e.Encrypt(nil, b, p.deriveIV(id)) + if err != nil { + panic("encryption unexpectedly failed: " + err.Error()) + } + + return c +} + +func (authenticatedEncryptionProtection) SupportsPartial() bool { + return false +} + +func (p authenticatedEncryptionProtection) Verify(id string, b []byte) ([]byte, error) { + return p.e.Decrypt(nil, b, p.deriveIV(id)) +} + +type authenticatedEncryptionProtectionKey []byte + +func (k authenticatedEncryptionProtectionKey) GetEncryptionAlgorithm() string { + return encryptionProtectionAlgorithm +} + +func (k authenticatedEncryptionProtectionKey) GetMasterKey() []byte { + return k +} + +// AuthenticatedEncryptionProtection returns StorageProtection that protects cached data using authenticated encryption. +func AuthenticatedEncryptionProtection(key []byte) (StorageProtection, error) { + e, err := encryption.CreateEncryptor(authenticatedEncryptionProtectionKey(key)) + if err != nil { + return nil, errors.Wrap(err, "unable to create encryptor") + } + + if !e.IsAuthenticated() { + return nil, errors.Wrap(err, "encryption is not authenticated!") + } + + return authenticatedEncryptionProtection{e}, nil +} diff --git a/internal/cache/storage_protection_test.go b/internal/cache/storage_protection_test.go new file mode 100644 index 000000000..9195253e4 --- /dev/null +++ b/internal/cache/storage_protection_test.go @@ -0,0 +1,40 @@ +package cache_test + +import ( + "bytes" + "testing" + + "github.com/kopia/kopia/internal/cache" +) + +func TestHMACStorageProtection(t *testing.T) { + testStorageProtection(t, cache.ChecksumProtection([]byte{1, 2, 3, 4})) +} + +func TestEncryptionStorageProtection(t *testing.T) { + e, err := cache.AuthenticatedEncryptionProtection([]byte{1}) + if err != nil { + t.Fatal(err) + } + + testStorageProtection(t, e) +} + +// nolint:thelper +func testStorageProtection(t *testing.T, sp cache.StorageProtection) { + payload := []byte{0, 1, 2, 3, 4} + + protected := sp.Protect("x", payload) + + unprotected, err := sp.Verify("x", protected) + if err != nil { + t.Fatal(err) + } + + if got, want := unprotected, payload; !bytes.Equal(got, want) { + t.Fatalf("invalid unprotected payload %x, wanted %x", got, want) + } + + // flip one bit + protected[0] ^= 1 +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go index d59432287..d437bf609 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -16,6 +16,7 @@ "github.com/kopia/kopia/internal/repotesting" "github.com/kopia/kopia/internal/server" "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/manifest" @@ -28,6 +29,8 @@ testHostname = "bar" testPassword = "123" testPathname = "/tmp/path" + + maxCacheSizeBytes = 1e6 ) // nolint:thelper @@ -85,6 +88,9 @@ func testServer(t *testing.T, disableGRPC bool) { rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{ Username: testUsername, Hostname: testHostname, + }, &content.CachingOptions{ + CacheDirectory: testutil.TempDirectory(t), + MaxCacheSizeBytes: maxCacheSizeBytes, }, testPassword) if err != nil { t.Fatal(err) @@ -102,7 +108,7 @@ func TestGPRServer_AuthenticationError(t *testing.T) { if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{ Username: "bad-username", Hostname: "bad-hostname", - }, "bad-password"); err == nil { + }, nil, "bad-password"); err == nil { t.Fatal("unexpected success when connecting with invalid username") } } diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index c1205d91d..a885078ee 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -14,6 +14,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/apiclient" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/remoterepoapi" "github.com/kopia/kopia/repo/content" @@ -38,6 +39,8 @@ type apiServerRepository struct { cliOpts ClientOptions omgr *object.Manager wso WriteSessionOptions + + contentCache *cache.PersistentCache } func (r *apiServerRepository) APIServerURL() string { @@ -160,13 +163,15 @@ func (r *apiServerRepository) ContentInfo(ctx context.Context, contentID content } func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { - var result []byte + return r.contentCache.GetOrLoad(ctx, string(contentID), func() ([]byte, error) { + var result []byte - if err := r.cli.Get(ctx, "contents/"+string(contentID), content.ErrContentNotFound, &result); err != nil { - return nil, errors.Wrap(err, "GetContent") - } + if err := r.cli.Get(ctx, "contents/"+string(contentID), content.ErrContentNotFound, &result); err != nil { + return nil, errors.Wrap(err, "GetContent") + } - return result, nil + return result, nil + }) } func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) { @@ -209,7 +214,7 @@ func (r *apiServerRepository) Close(ctx context.Context) error { var _ Repository = (*apiServerRepository)(nil) // openRestAPIRepository connects remote repository over Kopia API. -func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) { +func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string) (Repository, error) { cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{ BaseURL: si.BaseURL, TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint, @@ -222,8 +227,9 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien } rr := &apiServerRepository{ - cli: cli, - cliOpts: cliOpts, + cli: cli, + cliOpts: cliOpts, + contentCache: contentCache, wso: WriteSessionOptions{ OnUpload: func(i int64) {}, }, @@ -259,6 +265,7 @@ func ConnectAPIServer(ctx context.Context, configFile string, si *APIServerInfo, lc := LocalConfig{ APIServer: si, ClientOptions: opt.ClientOptions.ApplyDefaults(ctx, "API Server: "+si.BaseURL), + Caching: opt.CachingOptions.CloneOrDefault(), } d, err := json.MarshalIndent(&lc, "", " ") diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 46713336e..a7b4f969a 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -12,6 +12,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/buf" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/encryption" @@ -266,7 +267,7 @@ func (sm *SharedManager) verifyChecksum(data, contentID []byte) error { } func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *CachingOptions) error { - dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") + dataCacheStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") if err != nil { return errors.Wrap(err, "unable to initialize data cache storage") } @@ -281,7 +282,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca metadataCacheSize = caching.MaxCacheSizeBytes } - metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") + metadataCacheStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") if err != nil { return errors.Wrap(err, "unable to initialize data cache storage") } diff --git a/repo/content/content_cache.go b/repo/content/content_cache.go index 88a049bb1..6df89d12f 100644 --- a/repo/content/content_cache.go +++ b/repo/content/content_cache.go @@ -2,14 +2,8 @@ import ( "context" - "os" - "path/filepath" - "github.com/pkg/errors" - - "github.com/kopia/kopia/internal/ctxutil" "github.com/kopia/kopia/repo/blob" - "github.com/kopia/kopia/repo/blob/filesystem" ) type cacheKey string @@ -18,29 +12,3 @@ type contentCache interface { close(ctx context.Context) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) } - -func newCacheStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (blob.Storage, error) { - var cacheStorage blob.Storage - - var err error - - if maxBytes > 0 && cacheDir != "" { - contentCacheDir := filepath.Join(cacheDir, subdir) - - if _, err = os.Stat(contentCacheDir); os.IsNotExist(err) { - if mkdirerr := os.MkdirAll(contentCacheDir, 0o700); mkdirerr != nil { - return nil, errors.Wrap(mkdirerr, "error creating cache directory") - } - } - - cacheStorage, err = filesystem.New(ctxutil.Detach(ctx), &filesystem.Options{ - Path: contentCacheDir, - DirectoryShards: []int{2}, - }) - if err != nil { - return nil, errors.Wrap(err, "error initializing filesystem cache") - } - } - - return cacheStorage, nil -} diff --git a/repo/content/content_cache_base.go b/repo/content/content_cache_base.go deleted file mode 100644 index f9a7a56e2..000000000 --- a/repo/content/content_cache_base.go +++ /dev/null @@ -1,202 +0,0 @@ -package content - -import ( - "container/heap" - "context" - "sync" - "sync/atomic" - "time" - - "github.com/pkg/errors" - - "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/repo/blob" -) - -const ( - defaultSweepFrequency = 1 * time.Minute - defaultTouchThreshold = 10 * time.Minute - mutexAgeCutoff = 5 * time.Minute -) - -type mutexLRU struct { - // values aligned to 8-bytes due to atomic access - lastUsedNanoseconds int64 - - mu *sync.Mutex -} - -// cacheBase provides common implementation for per-content and per-blob caches. -type cacheBase struct { - anyChange int32 - - cacheStorage blob.Storage - maxSizeBytes int64 - sweepFrequency time.Duration - touchThreshold time.Duration - description string - - periodicSweepRunning sync.WaitGroup - periodicSweepClosed chan struct{} - - // stores key to *mutexLRU mapping which is periodically garbage-collected - // and used to eliminate/minimize concurrent fetches of the same cached item. - loadingMap sync.Map -} - -type blobToucher interface { - TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error -} - -func (c *cacheBase) touch(ctx context.Context, blobID blob.ID) { - if t, ok := c.cacheStorage.(blobToucher); ok { - t.TouchBlob(ctx, blobID, c.touchThreshold) //nolint:errcheck - } -} - -func (c *cacheBase) close(ctx context.Context) { - close(c.periodicSweepClosed) - c.periodicSweepRunning.Wait() - - // if we added anything to the cache in this sesion, run one last sweep before shutting down. - if atomic.LoadInt32(&c.anyChange) == 1 { - if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Warningf("error during final sweep of the %v: %v", c.description, err) - } - } -} - -func (c *cacheBase) perItemMutex(key interface{}) *sync.Mutex { - now := clock.Now().UnixNano() - - v, ok := c.loadingMap.Load(key) - if !ok { - v, _ = c.loadingMap.LoadOrStore(key, &mutexLRU{ - mu: &sync.Mutex{}, - lastUsedNanoseconds: now, - }) - } - - m := v.(*mutexLRU) - atomic.StoreInt64(&m.lastUsedNanoseconds, now) - - return m.mu -} - -func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) { - defer c.periodicSweepRunning.Done() - - for { - select { - case <-c.periodicSweepClosed: - return - - case <-time.After(c.sweepFrequency): - c.sweepMutexes() - - if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Warningf("error during periodic sweep of %v: %v", c.description, err) - } - } - } -} - -// A contentMetadataHeap implements heap.Interface and holds blob.Metadata. -type contentMetadataHeap []blob.Metadata - -func (h contentMetadataHeap) Len() int { return len(h) } - -func (h contentMetadataHeap) Less(i, j int) bool { - return h[i].Timestamp.Before(h[j].Timestamp) -} - -func (h contentMetadataHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *contentMetadataHeap) Push(x interface{}) { - *h = append(*h, x.(blob.Metadata)) -} - -func (h *contentMetadataHeap) Pop() interface{} { - old := *h - n := len(old) - item := old[n-1] - *h = old[0 : n-1] - - return item -} - -func (c *cacheBase) sweepDirectory(ctx context.Context) (err error) { - t0 := clock.Now() - - var h contentMetadataHeap - - var totalRetainedSize int64 - - err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - heap.Push(&h, it) - totalRetainedSize += it.Length - - if totalRetainedSize > c.maxSizeBytes { - oldest := heap.Pop(&h).(blob.Metadata) - if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil { - log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr) - } else { - totalRetainedSize -= oldest.Length - } - } - return nil - }) - if err != nil { - return errors.Wrapf(err, "error listing %v", c.description) - } - - log(ctx).Debugf("finished sweeping %v in %v and retained %v/%v bytes (%v %%)", c.description, clock.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) - - return nil -} - -func (c *cacheBase) sweepMutexes() { - cutoffTime := clock.Now().Add(-mutexAgeCutoff).UnixNano() - - // remove from loadingMap all items that have not been touched recently. - // since the mutexes are only for performance (to avoid loading duplicates) - // and not for correctness, it's always safe to remove them. - c.loadingMap.Range(func(key, value interface{}) bool { - if m := value.(*mutexLRU); atomic.LoadInt64(&m.lastUsedNanoseconds) < cutoffTime { - c.loadingMap.Delete(key) - } - - return true - }) -} - -func newContentCacheBase(ctx context.Context, description string, cacheStorage blob.Storage, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*cacheBase, error) { - c := &cacheBase{ - cacheStorage: cacheStorage, - maxSizeBytes: maxSizeBytes, - periodicSweepClosed: make(chan struct{}), - touchThreshold: touchThreshold, - sweepFrequency: sweepFrequency, - description: description, - } - - // errGood is a marker error to stop blob iteration quickly, does not - // indicate any problem. - errGood := errors.Errorf("good") - - // verify that cache storage is functional by listing from it - if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error { - // nolint:wrapcheck - return errGood - }); err != nil && !errors.Is(err, errGood) { - return nil, errors.Wrapf(err, "unable to open %v", c.description) - } - - c.periodicSweepRunning.Add(1) - - go c.sweepDirectoryPeriodically(ctx) - - return c, nil -} diff --git a/repo/content/content_cache_data.go b/repo/content/content_cache_data.go index 581f25569..3533beadf 100644 --- a/repo/content/content_cache_data.go +++ b/repo/content/content_cache_data.go @@ -2,21 +2,16 @@ import ( "context" - "sync/atomic" "github.com/pkg/errors" - "go.opencensus.io/stats" - "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/hmac" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/repo/blob" ) type contentCacheForData struct { - *cacheBase - - st blob.Storage - hmacSecret []byte + pc *cache.PersistentCache + st blob.Storage } func adjustCacheKey(cacheKey cacheKey) cacheKey { @@ -32,81 +27,27 @@ func adjustCacheKey(cacheKey cacheKey) cacheKey { func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { cacheKey = adjustCacheKey(cacheKey) - if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil { - stats.Record(ctx, - metricContentCacheHitCount.M(1), - metricContentCacheHitBytes.M(int64(len(b))), - ) - - return b, nil - } - - stats.Record(ctx, metricContentCacheMissCount.M(1)) - - b, err := c.st.GetBlob(ctx, blobID, offset, length) - if err != nil { - stats.Record(ctx, metricContentCacheMissErrors.M(1)) - } else { - stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b)))) - } - - if errors.Is(err, blob.ErrBlobNotFound) { - // not found in underlying storage - // nolint:wrapcheck - return nil, err - } - - if err != nil { - return nil, errors.Wrap(err, "error getting content from cache") - } - - atomic.StoreInt32(&c.anyChange, 1) - - if puterr := c.cacheStorage.PutBlob(ctx, blob.ID(cacheKey), gather.FromSlice(hmac.Append(b, c.hmacSecret))); puterr != nil { - stats.Record(ctx, metricContentCacheStoreErrors.M(1)) - log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr) - } - - return b, nil + return c.pc.GetOrLoad(ctx, string(cacheKey), func() ([]byte, error) { + return c.st.GetBlob(ctx, blobID, offset, length) + }) } -func (c *contentCacheForData) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte { - b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1) - if err == nil { - b, err = hmac.VerifyAndStrip(b, c.hmacSecret) - if err == nil { - c.touch(ctx, blob.ID(cacheKey)) - - // retrieved from cache and HMAC valid - return b - } - - // ignore malformed contents - log(ctx).Warningf("malformed content %v: %v", cacheKey, err) - - return nil - } - - if !errors.Is(err, blob.ErrBlobNotFound) { - log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err) - } - - return nil +func (c *contentCacheForData) close(ctx context.Context) { + c.pc.Close(ctx) } -func newContentCacheForData(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) { +func newContentCacheForData(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) { if cacheStorage == nil { return passthroughContentCache{st}, nil } - cb, err := newContentCacheBase(ctx, "content cache", cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "content cache", cacheStorage, cache.ChecksumProtection(hmacSecret), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) if err != nil { return nil, errors.Wrap(err, "unable to create base cache") } return &contentCacheForData{ - st: st, - hmacSecret: append([]byte(nil), hmacSecret...), - cacheBase: cb, + st: st, + pc: pc, }, nil } diff --git a/repo/content/content_cache_metadata.go b/repo/content/content_cache_metadata.go index 48af5a462..c41fdbffc 100644 --- a/repo/content/content_cache_metadata.go +++ b/repo/content/content_cache_metadata.go @@ -2,22 +2,28 @@ import ( "context" - "sync/atomic" + "hash/fnv" + "io" + "sync" "github.com/pkg/errors" "go.opencensus.io/stats" "golang.org/x/sync/errgroup" - "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/repo/blob" ) -const metadataCacheSyncParallelism = 16 +const ( + metadataCacheSyncParallelism = 16 + metadataCacheMutexShards = 16 +) type contentCacheForMetadata struct { - *cacheBase + pc *cache.PersistentCache - st blob.Storage + st blob.Storage + shardedMutexes [metadataCacheMutexShards]sync.Mutex } // sync synchronizes metadata cache with all blobs found in the storage. @@ -50,31 +56,31 @@ func (c *contentCacheForMetadata) sync(ctx context.Context) error { return eg.Wait() } +func (c *contentCacheForMetadata) mutexForBlob(blobID blob.ID) *sync.Mutex { + // hash the blob ID to pick one of the sharded mutexes. + h := fnv.New32() + io.WriteString(h, string(blobID)) //nolint:errcheck + mutexID := h.Sum32() % metadataCacheMutexShards + + return &c.shardedMutexes[mutexID] +} + func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) { - m := c.perItemMutex(blobID) + m := c.mutexForBlob(blobID) m.Lock() defer m.Unlock() - if v, err := c.cacheBase.cacheStorage.GetBlob(ctx, blobID, offset, length); err == nil { - // cache hit - stats.Record(ctx, - metricContentCacheHitCount.M(1), - metricContentCacheHitBytes.M(int64(len(v))), - ) - + if v := c.pc.Get(ctx, string(blobID), offset, length); v != nil { return v, nil } - stats.Record(ctx, metricContentCacheMissCount.M(1)) - // read the entire blob - log(ctx).Debugf("fetching metadata blob %q", blobID) blobData, err := c.st.GetBlob(ctx, blobID, 0, -1) if err != nil { - stats.Record(ctx, metricContentCacheMissErrors.M(1)) + stats.Record(ctx, cache.MetricMissErrors.M(1)) } else { - stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(blobData)))) + stats.Record(ctx, cache.MetricMissBytes.M(int64(len(blobData)))) } if errors.Is(err, blob.ErrBlobNotFound) { @@ -88,13 +94,8 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache return nil, err } - atomic.StoreInt32(&c.anyChange, 1) - // store the whole blob in the cache. - if puterr := c.cacheStorage.PutBlob(ctx, blobID, gather.FromSlice(blobData)); puterr != nil { - stats.Record(ctx, metricContentCacheStoreErrors.M(1)) - log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr) - } + c.pc.Put(ctx, string(blobID), blobData) if offset == 0 && length == -1 { return blobData, nil @@ -107,18 +108,22 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache return blobData[offset : offset+length], nil } -func newContentCacheForMetadata(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64) (contentCache, error) { +func (c *contentCacheForMetadata) close(ctx context.Context) { + c.pc.Close(ctx) +} + +func newContentCacheForMetadata(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64) (contentCache, error) { if cacheStorage == nil { return passthroughContentCache{st}, nil } - cb, err := newContentCacheBase(ctx, "metadata cache", cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency) + pc, err := cache.NewPersistentCache(ctx, "metadata cache", cacheStorage, cache.NoProtection(), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) if err != nil { return nil, errors.Wrap(err, "unable to create base cache") } return &contentCacheForMetadata{ - st: st, - cacheBase: cb, + st: st, + pc: pc, }, nil } diff --git a/repo/content/content_cache_test.go b/repo/content/content_cache_test.go index e7d55c784..118ae38b8 100644 --- a/repo/content/content_cache_test.go +++ b/repo/content/content_cache_test.go @@ -2,6 +2,7 @@ import ( "bytes" + "context" "reflect" "sort" "strings" @@ -12,6 +13,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/internal/testlogging" @@ -52,27 +54,27 @@ func TestCacheExpiration(t *testing.T) { underlyingStorage := newUnderlyingStorageForContentCacheTesting(t) - cb, err := newContentCacheBase(testlogging.Context(t), "test cache", cacheStorage, 10000, 0, 500*time.Millisecond) + pc, err := cache.NewPersistentCache(testlogging.Context(t), "test cache", cacheStorage.(cache.Storage), cache.NoProtection(), 10000, 0, 500*time.Millisecond) if err != nil { t.Fatalf("unable to create base cache: %v", err) } - cache := &contentCacheForData{ - st: underlyingStorage, - cacheBase: cb, + cc := &contentCacheForData{ + st: underlyingStorage, + pc: pc, } ctx := testlogging.Context(t) - defer cache.close(ctx) + defer cc.close(ctx) - _, err = cache.getContent(ctx, "00000a", "content-4k", 0, -1) // 4k + _, err = cc.getContent(ctx, "00000a", "content-4k", 0, -1) // 4k assertNoError(t, err) - _, err = cache.getContent(ctx, "00000b", "content-4k", 0, -1) // 4k + _, err = cc.getContent(ctx, "00000b", "content-4k", 0, -1) // 4k assertNoError(t, err) - _, err = cache.getContent(ctx, "00000c", "content-4k", 0, -1) // 4k + _, err = cc.getContent(ctx, "00000c", "content-4k", 0, -1) // 4k assertNoError(t, err) - _, err = cache.getContent(ctx, "00000d", "content-4k", 0, -1) // 4k + _, err = cc.getContent(ctx, "00000d", "content-4k", 0, -1) // 4k assertNoError(t, err) // wait for a sweep @@ -94,7 +96,7 @@ func TestCacheExpiration(t *testing.T) { } for _, tc := range cases { - _, got := cache.getContent(ctx, cacheKey(tc.blobID), "content-4k", 0, -1) + _, got := cc.getContent(ctx, cacheKey(tc.blobID), "content-4k", 0, -1) if want := tc.expectedError; !errors.Is(got, want) { t.Errorf("unexpected error when getting content %v: %v wanted %v", tc.blobID, got, want) } else { @@ -110,22 +112,22 @@ func TestDiskContentCache(t *testing.T) { const maxBytes = 10000 - cacheStorage, err := newCacheStorageOrNil(ctx, tmpDir, maxBytes, "contents") + cacheStorage, err := cache.NewStorageOrNil(ctx, tmpDir, maxBytes, "contents") if err != nil { t.Fatal(err) } - cache, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil) + cc, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil) if err != nil { t.Fatalf("err: %v", err) } - defer cache.close(ctx) + defer cc.close(ctx) - verifyContentCache(t, cache) + verifyContentCache(t, cc, cacheStorage) } -func verifyContentCache(t *testing.T, cache contentCache) { +func verifyContentCache(t *testing.T, cc contentCache, cacheStorage blob.Storage) { t.Helper() ctx := testlogging.Context(t) @@ -147,12 +149,12 @@ func verifyContentCache(t *testing.T, cache contentCache) { {"xf0f0f3", "no-such-content", 0, -1, nil, blob.ErrBlobNotFound}, {"xf0f0f4", "no-such-content", 10, 5, nil, blob.ErrBlobNotFound}, {"f0f0f5", "content-1", 7, 3, []byte{8, 9, 10}, nil}, - {"xf0f0f6", "content-1", 11, 10, nil, errors.Errorf("error getting content from cache: invalid offset: 11: invalid blob offset or length")}, - {"xf0f0f6", "content-1", -1, 5, nil, errors.Errorf("error getting content from cache: invalid offset: -1: invalid blob offset or length")}, + {"xf0f0f6", "content-1", 11, 10, nil, errors.Errorf("invalid offset: 11: invalid blob offset or length")}, + {"xf0f0f6", "content-1", -1, 5, nil, errors.Errorf("invalid offset: -1: invalid blob offset or length")}, } for _, tc := range cases { - v, err := cache.getContent(ctx, tc.cacheKey, tc.blobID, tc.offset, tc.length) + v, err := cc.getContent(ctx, tc.cacheKey, tc.blobID, tc.offset, tc.length) if (err != nil) != (tc.err != nil) { t.Errorf("unexpected error for %v: %+v, wanted %+v", tc.cacheKey, err, tc.err) } else if err != nil && err.Error() != tc.err.Error() { @@ -163,27 +165,25 @@ func verifyContentCache(t *testing.T, cache contentCache) { } } - verifyStorageContentList(t, cache.(*contentCacheForData).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") + verifyStorageContentList(t, cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5") }) t.Run("DataCorruption", func(t *testing.T) { - var cacheKey blob.ID = "f0f0f1x" - d, err := cache.(*contentCacheForData).cacheStorage.GetBlob(ctx, cacheKey, 0, -1) - if err != nil { - t.Fatalf("unable to retrieve data from cache: %v", err) - } + const cacheKey = "f0f0f1x" + + d, err := cacheStorage.GetBlob(ctx, cacheKey, 0, -1) + must(t, err) // corrupt the data and write back d[0] ^= 1 - if puterr := cache.(*contentCacheForData).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil { - t.Fatalf("unable to write corrupted content: %v", puterr) - } + must(t, cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d))) - v, err := cache.getContent(ctx, "xf0f0f1", "content-1", 1, 5) + v, err := cc.getContent(ctx, "xf0f0f1", "content-1", 1, 5) if err != nil { t.Fatalf("error in getContent: %v", err) } + if got, want := v, []byte{2, 3, 4, 5, 6}; !reflect.DeepEqual(v, want) { t.Errorf("invalid result when reading corrupted data: %v, wanted %v", got, want) } @@ -206,7 +206,7 @@ func TestCacheFailureToOpen(t *testing.T) { } // Will fail because of ListBlobs failure. - _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) + _, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) if err == nil || !strings.Contains(err.Error(), someError.Error()) { t.Errorf("invalid error %v, wanted: %v", err, someError) } @@ -214,12 +214,12 @@ func TestCacheFailureToOpen(t *testing.T) { // ListBlobs fails only once, next time it succeeds. ctx := testlogging.Context(t) - cache, err := newContentCacheForData(ctx, underlyingStorage, faultyCache, 10000, nil) + cc, err := newContentCacheForData(ctx, underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } - defer cache.close(ctx) + cc.close(ctx) } func TestCacheFailureToWrite(t *testing.T) { @@ -232,14 +232,14 @@ func TestCacheFailureToWrite(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) + cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } ctx := testlogging.Context(t) - defer cache.close(ctx) + defer cc.close(ctx) faultyCache.Faults = map[string][]*blobtesting.Fault{ "PutBlob": { @@ -247,7 +247,7 @@ func TestCacheFailureToWrite(t *testing.T) { }, } - v, err := cache.getContent(ctx, "aa", "content-1", 0, 3) + v, err := cc.getContent(ctx, "aa", "content-1", 0, 3) if err != nil { t.Errorf("write failure wasn't ignored: %v", err) } @@ -276,14 +276,14 @@ func TestCacheFailureToRead(t *testing.T) { Base: cacheStorage, } - cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil) + cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil) if err != nil { t.Fatalf("err: %v", err) } ctx := testlogging.Context(t) - defer cache.close(ctx) + defer cc.close(ctx) faultyCache.Faults = map[string][]*blobtesting.Fault{ "GetBlob": { @@ -292,7 +292,7 @@ func TestCacheFailureToRead(t *testing.T) { } for i := 0; i < 2; i++ { - v, err := cache.getContent(ctx, "aa", "content-1", 0, 3) + v, err := cc.getContent(ctx, "aa", "content-1", 0, 3) if err != nil { t.Errorf("read failure wasn't ignored: %v", err) } @@ -329,3 +329,11 @@ func assertNoError(t *testing.T, err error) { t.Errorf("err: %v", err) } } + +type withoutTouchBlob struct { + blob.Storage +} + +func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error { + return errors.Errorf("TouchBlob not implemented") +} diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index e422836da..90c19f406 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -15,6 +15,7 @@ "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/ctxutil" apipb "github.com/kopia/kopia/internal/grpcapi" @@ -58,6 +59,8 @@ type grpcRepositoryClient struct { objectFormat object.Format cliOpts ClientOptions omgr *object.Manager + + contentCache *cache.PersistentCache } type grpcInnerSession struct { @@ -383,7 +386,7 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error { } func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) { - w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, false) + w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, r.contentCache, false) if err != nil { return nil, err } @@ -493,14 +496,16 @@ func unhandledSessionResponse(resp *apipb.SessionResponse) error { } func (r *grpcRepositoryClient) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { - v, err := r.maybeRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) { - return sess.GetContent(ctx, contentID) - }) - if err != nil { - return nil, err - } + return r.contentCache.GetOrLoad(ctx, string(contentID), func() ([]byte, error) { + v, err := r.maybeRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) { + return sess.GetContent(ctx, contentID) + }) + if err != nil { + return nil, err + } - return v.([]byte), nil + return v.([]byte), nil + }) } func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { @@ -587,6 +592,9 @@ func (r *grpcRepositoryClient) Close(ctx context.Context) error { if atomic.AddInt32(r.connRefCount, -1) == 0 { log(ctx).Debugf("closing GPRC connection to %v", r.conn.Target()) + + defer r.contentCache.Close(ctx) + return errors.Wrap(r.conn.Close(), "error closing GRPC connection") } @@ -620,7 +628,7 @@ func (c grpcCreds) RequireTransportSecurity() bool { // OpenGRPCAPIRepository opens the Repository based on remote GRPC server. // The APIServerInfo must have the address of the repository as 'kopia://host:port' -func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) { +func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string) (Repository, error) { var transportCreds credentials.TransportCredentials if si.TrustedServerCertificateFingerprint != "" { @@ -651,7 +659,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien return nil, errors.Wrap(err, "dial error") } - rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, true) + rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, contentCache, true) if err != nil { return nil, err } @@ -719,7 +727,7 @@ func (r *grpcRepositoryClient) killInnerSession() { } // newGRPCAPIRepositoryForConnection opens GRPC-based repository connection. -func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, transparentRetries bool) (*grpcRepositoryClient, error) { +func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, contentCache *cache.PersistentCache, transparentRetries bool) (*grpcRepositoryClient, error) { if opt.OnUpload == nil { opt.OnUpload = func(i int64) {} } @@ -731,6 +739,7 @@ func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientCon transparentRetries: transparentRetries, opt: opt, isReadOnly: cliOpts.ReadOnly, + contentCache: contentCache, } v, err := rr.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) { diff --git a/repo/open.go b/repo/open.go index db57340e2..74ecf7778 100644 --- a/repo/open.go +++ b/repo/open.go @@ -10,8 +10,10 @@ "time" "github.com/pkg/errors" + "golang.org/x/crypto/scrypt" "github.com/kopia/kopia/internal/atomicfile" + "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/repo/blob" loggingwrapper "github.com/kopia/kopia/repo/blob/logging" "github.com/kopia/kopia/repo/blob/readonly" @@ -75,28 +77,61 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r return nil, err } + // cache directory is stored as relative to config file name, resolve it to absolute. + if lc.Caching != nil { + if lc.Caching.CacheDirectory != "" && !filepath.IsAbs(lc.Caching.CacheDirectory) { + lc.Caching.CacheDirectory = filepath.Join(filepath.Dir(configFile), lc.Caching.CacheDirectory) + } + } + if lc.APIServer != nil { - return OpenAPIServer(ctx, lc.APIServer, lc.ClientOptions, password) + return OpenAPIServer(ctx, lc.APIServer, lc.ClientOptions, lc.Caching, password) } return openDirect(ctx, configFile, lc, password, options) } -// OpenAPIServer connects remote repository over Kopia API. -func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) { - if si.DisableGRPC { - return openRestAPIRepository(ctx, si, cliOpts, password) +func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string) (*cache.PersistentCache, error) { + opt = opt.CloneOrDefault() + + cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents") + if cs == nil { + // this may be (nil, nil) or (nil, err) + return nil, errors.Wrap(err, "error opening storage") } - return OpenGRPCAPIRepository(ctx, si, cliOpts, password) + // derive content cache key from the password & HMAC secret using scrypt. + salt := append([]byte("content-cache-protection"), opt.HMACSecret...) + + cacheEncryptionKey, err := scrypt.Key([]byte(password), salt, 65536, 8, 1, 32) + if err != nil { + return nil, errors.Wrap(err, "unable to derive cache encryption key from password") + } + + prot, err := cache.AuthenticatedEncryptionProtection(cacheEncryptionKey) + if err != nil { + return nil, errors.Wrap(err, "unable to initialize protection") + } + + return cache.NewPersistentCache(ctx, "cache-storage", cs, prot, opt.MaxCacheSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency) +} + +// OpenAPIServer connects remote repository over Kopia API. +func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, cachingOptions *content.CachingOptions, password string) (Repository, error) { + contentCache, err := getContentCacheOrNil(ctx, cachingOptions, password) + if err != nil { + return nil, errors.Wrap(err, "error opening content cache") + } + + if si.DisableGRPC { + return openRestAPIRepository(ctx, si, cliOpts, contentCache, password) + } + + return OpenGRPCAPIRepository(ctx, si, cliOpts, contentCache, password) } // openDirect opens the repository that directly manipulates blob storage.. func openDirect(ctx context.Context, configFile string, lc *LocalConfig, password string, options *Options) (rep Repository, err error) { - if lc.Caching.CacheDirectory != "" && !filepath.IsAbs(lc.Caching.CacheDirectory) { - lc.Caching.CacheDirectory = filepath.Join(filepath.Dir(configFile), lc.Caching.CacheDirectory) - } - if lc.Storage == nil { return nil, errors.Errorf("storage not set in the configuration file") } diff --git a/tests/end_to_end_test/api_server_repository_test.go b/tests/end_to_end_test/api_server_repository_test.go index b61156343..0c721ec30 100644 --- a/tests/end_to_end_test/api_server_repository_test.go +++ b/tests/end_to_end_test/api_server_repository_test.go @@ -109,7 +109,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al }, repo.ClientOptions{ Username: "foo", Hostname: "bar", - }, "baz") + }, nil, "baz") if err != nil { t.Fatal(err) } @@ -242,7 +242,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al }, repo.ClientOptions{ Username: "foo", Hostname: "bar", - }, "baz") + }, nil, "baz") if dur := clock.Since(t0); dur > 15*time.Second { t.Fatalf("failed connection took %v", dur)