Implemented caching for server connections (#845)

* cache: refactored reusable portion of cache into separate package

* repo: plumbed through caching for remote repository clients

* repo: plumb through cache in the unit tests

* cache: ensure we only allow absolute cache paths, fixed cache path resolution for remote repositories
This commit is contained in:
Jarek Kowalski
2021-03-01 06:15:39 -08:00
committed by GitHub
parent d734c20918
commit 4e705726fe
17 changed files with 760 additions and 418 deletions

View File

@@ -1,57 +1,72 @@
package content
package cache
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)
// content cache metrics.
// cache metrics.
var (
metricContentCacheHitCount = stats.Int64(
MetricHitCount = stats.Int64(
"kopia/content/cache/hit_count",
"Number of time content was retrieved from the cache",
stats.UnitDimensionless,
)
metricContentCacheHitBytes = stats.Int64(
MetricHitBytes = stats.Int64(
"kopia/content/cache/hit_bytes",
"Number of bytes retrieved from the cache",
stats.UnitBytes,
)
metricContentCacheMissCount = stats.Int64(
MetricMissCount = stats.Int64(
"kopia/content/cache/miss_count",
"Number of time content was not found in the cache and fetched from the storage",
stats.UnitDimensionless,
)
metricContentCacheMissBytes = stats.Int64(
MetricMalformedCacheDataCount = stats.Int64(
"kopia/content/cache/malformed",
"Number of times malformed content was read from the cache",
stats.UnitDimensionless,
)
MetricMissBytes = stats.Int64(
"kopia/content/cache/missed_bytes",
"Number of bytes retrieved from the underlying storage",
stats.UnitBytes,
)
metricContentCacheMissErrors = stats.Int64(
MetricMissErrors = stats.Int64(
"kopia/content/cache/miss_error_count",
"Number of time content could not be found in the underlying storage",
stats.UnitDimensionless,
)
metricContentCacheStoreErrors = stats.Int64(
MetricStoreErrors = stats.Int64(
"kopia/content/cache/store_error_count",
"Number of time content could not be saved in the cache",
stats.UnitDimensionless,
)
)
func simpleAggregation(m stats.Measure, agg *view.Aggregation) *view.View {
return &view.View{
Name: m.Name(),
Aggregation: agg,
Description: m.Description(),
Measure: m,
}
}
func init() {
if err := view.Register(
simpleAggregation(metricContentCacheHitCount, view.Count()),
simpleAggregation(metricContentCacheHitBytes, view.Sum()),
simpleAggregation(metricContentCacheMissCount, view.Count()),
simpleAggregation(metricContentCacheMissBytes, view.Sum()),
simpleAggregation(metricContentCacheMissErrors, view.Count()),
simpleAggregation(metricContentCacheStoreErrors, view.Count()),
simpleAggregation(MetricHitCount, view.Count()),
simpleAggregation(MetricHitBytes, view.Sum()),
simpleAggregation(MetricMissCount, view.Count()),
simpleAggregation(MetricMissBytes, view.Sum()),
simpleAggregation(MetricMissErrors, view.Count()),
simpleAggregation(MetricStoreErrors, view.Count()),
); err != nil {
panic("unable to register opencensus views: " + err.Error())
}

49
internal/cache/cache_storage.go vendored Normal file
View File

@@ -0,0 +1,49 @@
package cache
import (
"context"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
)
// Storage is the storage interface required by the cache and is implemented by the filesystem Storage.
// It extends blob.Storage with the ability to touch a blob.
type Storage interface {
blob.Storage
// TouchBlob is invoked by PersistentCache after a successful, verified cache read,
// passing the cache's configured touch threshold.
// NOTE(review): presumably it refreshes the blob's timestamp only when the current one is
// older than threshold - confirm against the filesystem implementation.
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error
}
// NewStorageOrNil returns cache.Storage backed by the given subdirectory of the provided cache directory.
//
// It returns (nil, nil) when caching is disabled, that is when maxBytes is not positive
// or cacheDir is empty. A non-empty cacheDir must be an absolute path. The target directory
// is created (mode 0700) when it does not exist yet.
//
// An error is returned when the path is relative, the directory cannot be created,
// the filesystem storage cannot be initialized, or the resulting storage does not
// implement the Storage interface.
func NewStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (Storage, error) {
	if maxBytes <= 0 || cacheDir == "" {
		// caching disabled - callers treat nil storage as "no cache".
		return nil, nil
	}

	if !filepath.IsAbs(cacheDir) {
		return nil, errors.Errorf("cache dir %q was not absolute", cacheDir)
	}

	contentCacheDir := filepath.Join(cacheDir, subdir)

	if _, err := os.Stat(contentCacheDir); os.IsNotExist(err) {
		if mkdirerr := os.MkdirAll(contentCacheDir, 0o700); mkdirerr != nil {
			return nil, errors.Wrap(mkdirerr, "error creating cache directory")
		}
	}

	// the storage is created with a detached context (see ctxutil.Detach).
	fs, err := filesystem.New(ctxutil.Detach(ctx), &filesystem.Options{
		Path:            contentCacheDir,
		DirectoryShards: []int{2},
	})
	if err != nil {
		return nil, errors.Wrap(err, "error initializing filesystem cache")
	}

	// use a checked assertion so that an incompatible storage results in an error and not a panic.
	st, ok := fs.(Storage)
	if !ok {
		return nil, errors.Errorf("filesystem storage %T does not implement cache storage", fs)
	}

	return st, nil
}

251
internal/cache/persistent_lru_cache.go vendored Normal file
View File

@@ -0,0 +1,251 @@
// Package cache implements durable on-disk cache with LRU expiration.
package cache
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.GetContextLoggerFunc("cache")
const (
// DefaultSweepFrequency is how frequently the contents of cache are swept to remove excess data.
DefaultSweepFrequency = 1 * time.Minute
// DefaultTouchThreshold specifies the resolution of timestamps used to determine which cache items
// to expire. This helps reduce cache storage writes on frequently accessed items.
DefaultTouchThreshold = 10 * time.Minute
)
// PersistentCache provides persistent on-disk cache.
// All exported methods accept a nil receiver, in which case the cache behaves as if it was always empty.
type PersistentCache struct {
// anyChange is accessed atomically: Put sets it to 1 and Close reads it to decide
// whether a final sweep is needed.
anyChange int32
// cacheStorage holds the cached items.
cacheStorage Storage
// storageProtection is applied to items before they are stored (Protect) and after they are read (Verify).
storageProtection StorageProtection
// maxSizeBytes is the size limit enforced by sweeps.
maxSizeBytes int64
// sweepFrequency is the delay between periodic sweeps.
sweepFrequency time.Duration
// touchThreshold is passed to Storage.TouchBlob on each cache hit.
touchThreshold time.Duration
// description names this cache in log messages.
description string
// periodicSweepRunning is waited on by Close until the background sweeper exits.
periodicSweepRunning sync.WaitGroup
// periodicSweepClosed is closed by Close to stop the background sweeper.
periodicSweepClosed chan struct{}
}
// GetOrLoad returns the item stored under the provided key, invoking the provided fetch function
// and caching its result when the item is not in the cache.
//
// Cached items go through the configured storage protection when written and read.
// Errors returned by fetch are passed to the caller unchanged and nothing is cached.
// Calling GetOrLoad on a nil cache simply invokes fetch.
func (c *PersistentCache) GetOrLoad(ctx context.Context, key string, fetch func() ([]byte, error)) ([]byte, error) {
	// a nil cache is valid and passes every request through.
	if c == nil {
		return fetch()
	}

	cached := c.Get(ctx, key, 0, -1)
	if cached != nil {
		return cached, nil
	}

	fetched, fetchErr := fetch()
	if fetchErr == nil {
		stats.Record(ctx, MetricMissBytes.M(int64(len(fetched))))
		c.Put(ctx, key, fetched)

		return fetched, nil
	}

	stats.Record(ctx, MetricMissErrors.M(1))

	return nil, fetchErr
}
// Get returns the whole cached item (length < 0) or a section of it (length >= 0).
//
// It returns nil when the cache is nil, when a section is requested but the storage
// protection does not support partial reads, when the item cannot be read from the cache
// storage, or when it fails verification. Items that fail verification are deleted.
func (c *PersistentCache) Get(ctx context.Context, key string, offset, length int64) []byte {
	if c == nil {
		return nil
	}

	wantsPartial := length >= 0
	if wantsPartial && !c.storageProtection.SupportsPartial() {
		return nil
	}

	id := blob.ID(key)

	raw, getErr := c.cacheStorage.GetBlob(ctx, id, offset, length)
	if getErr != nil {
		// cache miss
		stats.Record(ctx, MetricMissCount.M(1))
		return nil
	}

	payload, verifyErr := c.storageProtection.Verify(key, raw)
	if verifyErr != nil {
		// the stored item is malformed: remove it and report a miss.
		stats.Record(ctx, MetricMalformedCacheDataCount.M(1))

		if err := c.cacheStorage.DeleteBlob(ctx, id); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
			log(ctx).Warningf("unable to delete %v entry %v: %v", c.description, key, err)
		}

		stats.Record(ctx, MetricMissCount.M(1))

		return nil
	}

	// cache hit
	stats.Record(ctx,
		MetricHitCount.M(1),
		MetricHitBytes.M(int64(len(payload))),
	)

	// best-effort: a failure to touch the item does not invalidate the hit.
	c.cacheStorage.TouchBlob(ctx, id, c.touchThreshold) //nolint:errcheck

	return payload
}
// Put stores data under the provided key after applying the configured storage protection.
// Failures are logged and counted but not reported to the caller. Put on a nil cache is a no-op.
func (c *PersistentCache) Put(ctx context.Context, key string, data []byte) {
	if c == nil {
		return
	}

	// remember that the cache was modified so that Close performs a final sweep.
	atomic.StoreInt32(&c.anyChange, 1)

	protected := c.storageProtection.Protect(key, data)

	err := c.cacheStorage.PutBlob(ctx, blob.ID(key), gather.FromSlice(protected))
	if err == nil {
		return
	}

	stats.Record(ctx, MetricStoreErrors.M(1))
	log(ctx).Warningf("unable to add %v to %v: %v", key, c.description, err)
}
// Close stops the periodic sweeper, waits for it to exit and, if anything was put
// into the cache through this instance, runs one more sweep. Close on a nil cache is a no-op.
func (c *PersistentCache) Close(ctx context.Context) {
	if c == nil {
		return
	}

	// signal the background goroutine and wait until it is gone.
	close(c.periodicSweepClosed)
	c.periodicSweepRunning.Wait()

	// nothing was added to the cache in this session, no need for a final sweep.
	if atomic.LoadInt32(&c.anyChange) != 1 {
		return
	}

	err := c.sweepDirectory(ctx)
	if err != nil {
		log(ctx).Warningf("error during final sweep of the %v: %v", c.description, err)
	}
}
// sweepDirectoryPeriodically sweeps the cache every sweepFrequency until
// periodicSweepClosed is closed. Sweep errors are logged and do not stop the loop.
func (c *PersistentCache) sweepDirectoryPeriodically(ctx context.Context) {
	defer c.periodicSweepRunning.Done()

	for {
		// a fresh timer per iteration: the delay is counted from the end of the previous sweep.
		nextSweep := time.After(c.sweepFrequency)

		select {
		case <-c.periodicSweepClosed:
			return
		case <-nextSweep:
		}

		if err := c.sweepDirectory(ctx); err != nil {
			log(ctx).Warningf("error during periodic sweep of %v: %v", c.description, err)
		}
	}
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata
// ordered by timestamp, so that the oldest item is at the top of the heap.
type contentMetadataHeap []blob.Metadata

// Len returns the number of items in the heap.
func (h contentMetadataHeap) Len() int {
	return len(h)
}

// Less reports whether item i is strictly older than item j.
func (h contentMetadataHeap) Less(i, j int) bool {
	older, newer := h[i].Timestamp, h[j].Timestamp
	return newer.After(older)
}

// Swap exchanges items i and j.
func (h contentMetadataHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be a blob.Metadata, to the underlying slice.
func (h *contentMetadataHeap) Push(x interface{}) {
	md := x.(blob.Metadata)
	*h = append(*h, md)
}

// Pop removes and returns the last item of the underlying slice.
func (h *contentMetadataHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	md := items[last]
	*h = items[:last]

	return md
}
// sweepDirectory lists all items in the cache storage and deletes old ones once the
// running total of listed sizes exceeds maxSizeBytes.
//
// Every listed item is pushed onto a heap ordered by timestamp. Whenever the running total
// exceeds the limit, the oldest item seen so far is popped and deleted. Deletion failures are
// logged and the item's size stays counted. A listing failure is returned to the caller.
func (c *PersistentCache) sweepDirectory(ctx context.Context) (err error) {
	t0 := clock.Now()

	var (
		h                 contentMetadataHeap
		totalRetainedSize int64
	)

	err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
		heap.Push(&h, it)
		totalRetainedSize += it.Length

		if totalRetainedSize > c.maxSizeBytes {
			oldest := heap.Pop(&h).(blob.Metadata)
			if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
				log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr)
			} else {
				totalRetainedSize -= oldest.Length
			}
		}

		return nil
	})
	if err != nil {
		return errors.Wrapf(err, "error listing %v", c.description)
	}

	// a zero limit must not turn the log statement into an integer division by zero.
	var retainedPercent int64
	if c.maxSizeBytes != 0 {
		retainedPercent = 100 * totalRetainedSize / c.maxSizeBytes
	}

	log(ctx).Debugf("finished sweeping %v in %v and retained %v/%v bytes (%v %%)", c.description, clock.Since(t0), totalRetainedSize, c.maxSizeBytes, retainedPercent)

	return nil
}
// NewPersistentCache creates the persistent cache in the provided storage and starts
// a background goroutine which sweeps it every sweepFrequency until Close is called.
//
// When cacheStorage is nil (which is what NewStorageOrNil returns when caching is disabled),
// a nil cache and no error are returned. All PersistentCache methods accept a nil receiver.
// When storageProtection is nil, items are stored without any protection.
//
// An error is returned when the cache storage cannot be listed.
func NewPersistentCache(ctx context.Context, description string, cacheStorage Storage, storageProtection StorageProtection, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*PersistentCache, error) {
	if cacheStorage == nil {
		// caching disabled: a nil *PersistentCache is a valid pass-through cache.
		return nil, nil
	}

	if storageProtection == nil {
		storageProtection = nullStorageProtection{}
	}

	c := &PersistentCache{
		cacheStorage:        cacheStorage,
		maxSizeBytes:        maxSizeBytes,
		periodicSweepClosed: make(chan struct{}),
		touchThreshold:      touchThreshold,
		sweepFrequency:      sweepFrequency,
		description:         description,
		storageProtection:   storageProtection,
	}

	// errGood is a marker error to stop blob iteration quickly, does not
	// indicate any problem.
	errGood := errors.Errorf("good")

	// verify that cache storage is functional by listing from it
	if err := c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
		// nolint:wrapcheck
		return errGood
	}); err != nil && !errors.Is(err, errGood) {
		return nil, errors.Wrapf(err, "unable to open %v", c.description)
	}

	// the sweeper is stopped and waited for in Close.
	c.periodicSweepRunning.Add(1)

	go c.sweepDirectoryPeriodically(ctx)

	return c, nil
}

View File

@@ -0,0 +1,99 @@
package cache_test
import (
"bytes"
"context"
"testing"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob"
)
// TestPersistentLRUCache verifies that items put into the cache are stored, that the
// final sweep performed on Close evicts the oldest item once the size limit is exceeded
// and that the remaining items are served by a cache re-opened on the same storage.
func TestPersistentLRUCache(t *testing.T) {
	cacheDir := testutil.TempDirectory(t)
	ctx := testlogging.Context(t)

	const maxSizeBytes = 1000

	cs, err := cache.NewStorageOrNil(ctx, cacheDir, maxSizeBytes, "subdir")
	if err != nil {
		t.Fatal(err)
	}

	pc, err := cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency)
	if err != nil {
		t.Fatal(err)
	}

	if got := pc.Get(ctx, "key", 0, -1); got != nil {
		t.Fatalf("unexpected cache hit on empty cache: %x", got)
	}

	someData := bytes.Repeat([]byte{1}, 300)

	pc.Put(ctx, "key1", someData)
	verifyBlobExists(ctx, t, cs, "key1")

	// sleep between adding key1 and the rest to make it easily the oldest
	// even if the filesystem is not very precise keeping time.
	time.Sleep(2 * time.Second)

	pc.Put(ctx, "key2", someData)
	verifyBlobExists(ctx, t, cs, "key2")
	pc.Put(ctx, "key3", someData)
	verifyBlobExists(ctx, t, cs, "key3")
	pc.Put(ctx, "key4", someData)
	verifyBlobExists(ctx, t, cs, "key4")

	if got, want := pc.Get(ctx, "key2", 0, -1), someData; !bytes.Equal(got, want) {
		t.Fatalf("invalid data retrieved from cache: %x", got)
	}

	// final sweep is performed on close at which time key1 becomes candidate
	// for expulsion from cache because it's the oldest and we have 1200 bytes in the cache
	// but the limit is only 1000.
	pc.Close(ctx)

	verifyBlobDoesNotExist(ctx, t, cs, "key1")
	verifyBlobExists(ctx, t, cs, "key2")
	verifyBlobExists(ctx, t, cs, "key3")
	verifyBlobExists(ctx, t, cs, "key4")

	pc, err = cache.NewPersistentCache(ctx, "testing", cs, cache.ChecksumProtection([]byte{1, 2, 3}), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency)
	if err != nil {
		t.Fatal(err)
	}

	// stop the background sweeper of the re-opened cache so that it does not outlive the test.
	defer pc.Close(ctx)

	verifyCached(ctx, t, pc, "key1", nil)
	verifyCached(ctx, t, pc, "key2", someData)
	verifyCached(ctx, t, pc, "key3", someData)
	verifyCached(ctx, t, pc, "key4", someData)
}
// verifyCached fails the test unless the cache returns exactly want for the given key
// (nil want means the key is expected to be missing).
func verifyCached(ctx context.Context, t *testing.T, pc *cache.PersistentCache, key string, want []byte) {
	t.Helper()

	got := pc.Get(ctx, key, 0, -1)
	if bytes.Equal(got, want) {
		return
	}

	t.Fatalf("invalid cached result for %v: %x, want %x", key, got, want)
}
// verifyBlobExists fails the test when metadata of the given blob cannot be retrieved from st.
func verifyBlobExists(ctx context.Context, t *testing.T, st blob.Storage, blobID blob.ID) {
	t.Helper()

	_, err := st.GetMetadata(ctx, blobID)
	if err == nil {
		return
	}

	t.Fatalf("blob %v error: %v", blobID, err)
}
// verifyBlobDoesNotExist fails the test unless retrieving metadata of the given blob
// from st results in blob.ErrBlobNotFound.
func verifyBlobDoesNotExist(ctx context.Context, t *testing.T, st blob.Storage, blobID blob.ID) {
	t.Helper()

	_, err := st.GetMetadata(ctx, blobID)
	if errors.Is(err, blob.ErrBlobNotFound) {
		return
	}

	t.Fatalf("unexpected blob %v error: %v", blobID, err)
}

110
internal/cache/storage_protection.go vendored Normal file
View File

@@ -0,0 +1,110 @@
package cache
import (
"crypto/sha256"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/hmac"
"github.com/kopia/kopia/repo/encryption"
)
// encryptionProtectionAlgorithm is the authenticated encryption algorithm used by authenticatedEncryptionProtection.
var encryptionProtectionAlgorithm = "AES256-GCM-HMAC-SHA256"
// StorageProtection encapsulates protection (HMAC and/or encryption) applied to local cache items.
type StorageProtection interface {
// SupportsPartial reports whether sections of a protected item can be verified on their own.
// The cache does not serve partial reads when this returns false.
SupportsPartial() bool
// Protect returns the protected form of b, which is stored in the cache under the given id.
Protect(id string, b []byte) []byte
// Verify checks the protected data b read for the given id and returns the original payload,
// or an error when the data is not valid.
Verify(id string, b []byte) ([]byte, error)
}
// nullStorageProtection is a StorageProtection that stores data as-is.
type nullStorageProtection struct{}
// Protect returns b unchanged.
func (nullStorageProtection) Protect(id string, b []byte) []byte {
return b
}
// Verify returns b unchanged and never fails.
func (nullStorageProtection) Verify(id string, b []byte) ([]byte, error) {
return b, nil
}
// SupportsPartial returns true since unprotected data needs no verification.
func (nullStorageProtection) SupportsPartial() bool {
return true
}
// NoProtection returns implementation of StorageProtection that offers no protection:
// data is stored and returned unchanged and partial reads are supported.
func NoProtection() StorageProtection {
return nullStorageProtection{}
}
// checksumProtection is a StorageProtection based on the internal hmac package.
type checksumProtection struct {
// Secret is passed to hmac.Append and hmac.VerifyAndStrip.
Secret []byte
}
// Protect returns the result of hmac.Append for b and the secret; id is not used.
func (p checksumProtection) Protect(id string, b []byte) []byte {
return hmac.Append(b, p.Secret)
}
// Verify returns the result of hmac.VerifyAndStrip for b and the secret; id is not used.
func (p checksumProtection) Verify(id string, b []byte) ([]byte, error) {
return hmac.VerifyAndStrip(b, p.Secret)
}
// SupportsPartial returns false, so the cache never serves partial reads of checksummed items.
func (checksumProtection) SupportsPartial() bool {
return false
}
// ChecksumProtection returns StorageProtection that protects cached data using HMAC checksums without encryption.
// The key is copied, so later modifications of the provided slice do not affect the returned protection.
func ChecksumProtection(key []byte) StorageProtection {
	return checksumProtection{append([]byte(nil), key...)}
}
// authenticatedEncryptionProtection is a StorageProtection which encrypts cached items
// using the wrapped encryptor.
type authenticatedEncryptionProtection struct {
	e encryption.Encryptor
}

// deriveIV returns the SHA-256 digest of the item id, which is passed
// to the encryptor as the third argument of Encrypt and Decrypt.
func (p authenticatedEncryptionProtection) deriveIV(id string) []byte {
	digest := sha256.Sum256([]byte(id))

	return digest[:]
}

// Protect encrypts b for the given id. It panics when encryption fails
// since the StorageProtection interface provides no way to report an error.
func (p authenticatedEncryptionProtection) Protect(id string, b []byte) []byte {
	iv := p.deriveIV(id)

	ciphertext, err := p.e.Encrypt(nil, b, iv)
	if err == nil {
		return ciphertext
	}

	panic("encryption unexpectedly failed: " + err.Error())
}

// SupportsPartial returns false, so the cache never serves partial reads of encrypted items.
func (authenticatedEncryptionProtection) SupportsPartial() bool {
	return false
}

// Verify decrypts b for the given id and returns the plaintext or the decryption error.
func (p authenticatedEncryptionProtection) Verify(id string, b []byte) ([]byte, error) {
	iv := p.deriveIV(id)

	return p.e.Decrypt(nil, b, iv)
}
// authenticatedEncryptionProtectionKey wraps a raw key so that it can be passed
// to encryption.CreateEncryptor.
type authenticatedEncryptionProtectionKey []byte
// GetEncryptionAlgorithm returns the algorithm named by encryptionProtectionAlgorithm.
func (k authenticatedEncryptionProtectionKey) GetEncryptionAlgorithm() string {
return encryptionProtectionAlgorithm
}
// GetMasterKey returns the wrapped key bytes.
func (k authenticatedEncryptionProtectionKey) GetMasterKey() []byte {
return k
}
// AuthenticatedEncryptionProtection returns StorageProtection that protects cached data using authenticated encryption.
//
// An error is returned when the encryptor cannot be created from the provided key
// or when the created encryptor does not report itself as authenticated.
func AuthenticatedEncryptionProtection(key []byte) (StorageProtection, error) {
	e, err := encryption.CreateEncryptor(authenticatedEncryptionProtectionKey(key))
	if err != nil {
		return nil, errors.Wrap(err, "unable to create encryptor")
	}

	if !e.IsAuthenticated() {
		// err is nil at this point and wrapping a nil error yields nil,
		// so a new error must be created here.
		return nil, errors.New("encryption is not authenticated!")
	}

	return authenticatedEncryptionProtection{e}, nil
}

View File

@@ -0,0 +1,40 @@
package cache_test
import (
"bytes"
"testing"
"github.com/kopia/kopia/internal/cache"
)
// TestHMACStorageProtection runs the common storage protection checks against ChecksumProtection.
func TestHMACStorageProtection(t *testing.T) {
testStorageProtection(t, cache.ChecksumProtection([]byte{1, 2, 3, 4}))
}
// TestEncryptionStorageProtection runs the common storage protection checks
// against AuthenticatedEncryptionProtection.
func TestEncryptionStorageProtection(t *testing.T) {
	sp, createErr := cache.AuthenticatedEncryptionProtection([]byte{1})
	if createErr != nil {
		t.Fatal(createErr)
	}

	testStorageProtection(t, sp)
}
// testStorageProtection verifies that the payload protected by sp can be recovered
// using Verify and that corrupted protected data is rejected.
// nolint:thelper
func testStorageProtection(t *testing.T, sp cache.StorageProtection) {
	payload := []byte{0, 1, 2, 3, 4}

	protected := sp.Protect("x", payload)

	unprotected, err := sp.Verify("x", protected)
	if err != nil {
		t.Fatal(err)
	}

	if got, want := unprotected, payload; !bytes.Equal(got, want) {
		t.Fatalf("invalid unprotected payload %x, wanted %x", got, want)
	}

	// flip one bit and make sure the corruption is detected
	protected[0] ^= 1

	if _, err := sp.Verify("x", protected); err == nil {
		t.Fatal("unexpected success when verifying corrupted payload")
	}
}

View File

@@ -16,6 +16,7 @@
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/internal/server"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
@@ -28,6 +29,8 @@
testHostname = "bar"
testPassword = "123"
testPathname = "/tmp/path"
maxCacheSizeBytes = 1e6
)
// nolint:thelper
@@ -85,6 +88,9 @@ func testServer(t *testing.T, disableGRPC bool) {
rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{
Username: testUsername,
Hostname: testHostname,
}, &content.CachingOptions{
CacheDirectory: testutil.TempDirectory(t),
MaxCacheSizeBytes: maxCacheSizeBytes,
}, testPassword)
if err != nil {
t.Fatal(err)
@@ -102,7 +108,7 @@ func TestGPRServer_AuthenticationError(t *testing.T) {
if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{
Username: "bad-username",
Hostname: "bad-hostname",
}, "bad-password"); err == nil {
}, nil, "bad-password"); err == nil {
t.Fatal("unexpected success when connecting with invalid username")
}
}

View File

@@ -14,6 +14,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/apiclient"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/remoterepoapi"
"github.com/kopia/kopia/repo/content"
@@ -38,6 +39,8 @@ type apiServerRepository struct {
cliOpts ClientOptions
omgr *object.Manager
wso WriteSessionOptions
contentCache *cache.PersistentCache
}
func (r *apiServerRepository) APIServerURL() string {
@@ -160,13 +163,15 @@ func (r *apiServerRepository) ContentInfo(ctx context.Context, contentID content
}
func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
var result []byte
return r.contentCache.GetOrLoad(ctx, string(contentID), func() ([]byte, error) {
var result []byte
if err := r.cli.Get(ctx, "contents/"+string(contentID), content.ErrContentNotFound, &result); err != nil {
return nil, errors.Wrap(err, "GetContent")
}
if err := r.cli.Get(ctx, "contents/"+string(contentID), content.ErrContentNotFound, &result); err != nil {
return nil, errors.Wrap(err, "GetContent")
}
return result, nil
return result, nil
})
}
func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) {
@@ -209,7 +214,7 @@ func (r *apiServerRepository) Close(ctx context.Context) error {
var _ Repository = (*apiServerRepository)(nil)
// openRestAPIRepository connects remote repository over Kopia API.
func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) {
func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string) (Repository, error) {
cli, err := apiclient.NewKopiaAPIClient(apiclient.Options{
BaseURL: si.BaseURL,
TrustedServerCertificateFingerprint: si.TrustedServerCertificateFingerprint,
@@ -222,8 +227,9 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
}
rr := &apiServerRepository{
cli: cli,
cliOpts: cliOpts,
cli: cli,
cliOpts: cliOpts,
contentCache: contentCache,
wso: WriteSessionOptions{
OnUpload: func(i int64) {},
},
@@ -259,6 +265,7 @@ func ConnectAPIServer(ctx context.Context, configFile string, si *APIServerInfo,
lc := LocalConfig{
APIServer: si,
ClientOptions: opt.ClientOptions.ApplyDefaults(ctx, "API Server: "+si.BaseURL),
Caching: opt.CachingOptions.CloneOrDefault(),
}
d, err := json.MarshalIndent(&lc, "", " ")

View File

@@ -12,6 +12,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/buf"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
@@ -266,7 +267,7 @@ func (sm *SharedManager) verifyChecksum(data, contentID []byte) error {
}
func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *CachingOptions) error {
dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents")
dataCacheStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents")
if err != nil {
return errors.Wrap(err, "unable to initialize data cache storage")
}
@@ -281,7 +282,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
metadataCacheSize = caching.MaxCacheSizeBytes
}
metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata")
metadataCacheStorage, err := cache.NewStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata")
if err != nil {
return errors.Wrap(err, "unable to initialize data cache storage")
}

View File

@@ -2,14 +2,8 @@
import (
"context"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/ctxutil"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
)
type cacheKey string
@@ -18,29 +12,3 @@ type contentCache interface {
close(ctx context.Context)
getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error)
}
// newCacheStorageOrNil returns filesystem storage rooted at the given subdirectory of cacheDir,
// creating the directory if it does not exist. It returns nil storage and no error
// when maxBytes is not positive or cacheDir is empty.
func newCacheStorageOrNil(ctx context.Context, cacheDir string, maxBytes int64, subdir string) (blob.Storage, error) {
	cachingDisabled := maxBytes <= 0 || cacheDir == ""
	if cachingDisabled {
		return nil, nil
	}

	dir := filepath.Join(cacheDir, subdir)

	if _, statErr := os.Stat(dir); os.IsNotExist(statErr) {
		if mkdirerr := os.MkdirAll(dir, 0o700); mkdirerr != nil {
			return nil, errors.Wrap(mkdirerr, "error creating cache directory")
		}
	}

	st, err := filesystem.New(ctxutil.Detach(ctx), &filesystem.Options{
		Path:            dir,
		DirectoryShards: []int{2},
	})
	if err != nil {
		return nil, errors.Wrap(err, "error initializing filesystem cache")
	}

	return st, nil
}

View File

@@ -1,202 +0,0 @@
package content
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/repo/blob"
)
const (
// defaultSweepFrequency is the default delay between periodic cache sweeps.
defaultSweepFrequency = 1 * time.Minute
// defaultTouchThreshold is the default threshold passed to TouchBlob when a cached item is used.
defaultTouchThreshold = 10 * time.Minute
// mutexAgeCutoff is how long a per-item mutex may stay unused before sweepMutexes removes it.
mutexAgeCutoff = 5 * time.Minute
)
// mutexLRU pairs a per-item mutex with the time it was last handed out.
type mutexLRU struct {
// values aligned to 8-bytes due to atomic access
// lastUsedNanoseconds is the Unix time in nanoseconds of the last perItemMutex call for this item.
lastUsedNanoseconds int64
// mu is the mutex returned to callers of perItemMutex.
mu *sync.Mutex
}
// cacheBase provides common implementation for per-content and per-blob caches.
type cacheBase struct {
// anyChange is read atomically by close to decide whether a final sweep is needed.
anyChange int32
// cacheStorage holds the cached items.
cacheStorage blob.Storage
// maxSizeBytes is the size limit enforced by sweeps.
maxSizeBytes int64
// sweepFrequency is the delay between periodic sweeps.
sweepFrequency time.Duration
// touchThreshold is passed to TouchBlob when the cache storage supports it.
touchThreshold time.Duration
// description names this cache in log messages.
description string
// periodicSweepRunning is waited on by close until the background sweeper exits.
periodicSweepRunning sync.WaitGroup
// periodicSweepClosed is closed by close to stop the background sweeper.
periodicSweepClosed chan struct{}
// stores key to *mutexLRU mapping which is periodically garbage-collected
// and used to eliminate/minimize concurrent fetches of the same cached item.
loadingMap sync.Map
}
// blobToucher is an optional interface of cache storage; touch uses it when
// the storage implements it and does nothing otherwise.
type blobToucher interface {
TouchBlob(ctx context.Context, contentID blob.ID, threshold time.Duration) error
}
// touch invokes TouchBlob with the configured threshold when the cache storage
// supports it. Errors are intentionally ignored.
func (c *cacheBase) touch(ctx context.Context, blobID blob.ID) {
	toucher, supported := c.cacheStorage.(blobToucher)
	if !supported {
		return
	}

	toucher.TouchBlob(ctx, blobID, c.touchThreshold) //nolint:errcheck
}
// close stops the periodic sweeper, waits for it to exit and runs one more sweep
// when the cache was modified.
func (c *cacheBase) close(ctx context.Context) {
	close(c.periodicSweepClosed)
	c.periodicSweepRunning.Wait()

	// nothing was added to the cache in this session, no need for a final sweep.
	if atomic.LoadInt32(&c.anyChange) != 1 {
		return
	}

	err := c.sweepDirectory(ctx)
	if err != nil {
		log(ctx).Warningf("error during final sweep of the %v: %v", c.description, err)
	}
}
// perItemMutex returns the mutex associated with the given key, creating it
// on first use, and records the current time as its last use.
func (c *cacheBase) perItemMutex(key interface{}) *sync.Mutex {
	nowNanos := clock.Now().UnixNano()

	entry, found := c.loadingMap.Load(key)
	if !found {
		// LoadOrStore returns the existing entry if another goroutine stored one in the meantime.
		entry, _ = c.loadingMap.LoadOrStore(key, &mutexLRU{
			mu:                  &sync.Mutex{},
			lastUsedNanoseconds: nowNanos,
		})
	}

	lru := entry.(*mutexLRU)
	atomic.StoreInt64(&lru.lastUsedNanoseconds, nowNanos)

	return lru.mu
}
// sweepDirectoryPeriodically removes stale per-item mutexes and sweeps the cache every
// sweepFrequency until periodicSweepClosed is closed. Sweep errors are logged and do not stop the loop.
func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) {
	defer c.periodicSweepRunning.Done()

	for {
		// a fresh timer per iteration: the delay is counted from the end of the previous sweep.
		nextSweep := time.After(c.sweepFrequency)

		select {
		case <-c.periodicSweepClosed:
			return
		case <-nextSweep:
		}

		c.sweepMutexes()

		if err := c.sweepDirectory(ctx); err != nil {
			log(ctx).Warningf("error during periodic sweep of %v: %v", c.description, err)
		}
	}
}
// A contentMetadataHeap implements heap.Interface and holds blob.Metadata
// ordered by timestamp, oldest first.
type contentMetadataHeap []blob.Metadata

// Len returns the number of items in the heap.
func (h contentMetadataHeap) Len() int {
	return len(h)
}

// Less reports whether item i is strictly older than item j.
func (h contentMetadataHeap) Less(i, j int) bool {
	first, second := h[i], h[j]
	return first.Timestamp.Before(second.Timestamp)
}

// Swap exchanges items i and j.
func (h contentMetadataHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be a blob.Metadata, to the underlying slice.
func (h *contentMetadataHeap) Push(x interface{}) {
	md := x.(blob.Metadata)
	*h = append(*h, md)
}

// Pop removes and returns the last item of the underlying slice.
func (h *contentMetadataHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	md := items[last]
	*h = items[:last]

	return md
}
// sweepDirectory lists all items in the cache storage and deletes old ones once the
// running total of listed sizes exceeds maxSizeBytes.
//
// Every listed item is pushed onto a heap ordered by timestamp. Whenever the running total
// exceeds the limit, the oldest item seen so far is popped and deleted. Deletion failures are
// logged and the item's size stays counted. A listing failure is returned to the caller.
func (c *cacheBase) sweepDirectory(ctx context.Context) (err error) {
	t0 := clock.Now()

	var (
		h                 contentMetadataHeap
		totalRetainedSize int64
	)

	err = c.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
		heap.Push(&h, it)
		totalRetainedSize += it.Length

		if totalRetainedSize > c.maxSizeBytes {
			oldest := heap.Pop(&h).(blob.Metadata)
			if delerr := c.cacheStorage.DeleteBlob(ctx, oldest.BlobID); delerr != nil {
				log(ctx).Warningf("unable to remove %v: %v", oldest.BlobID, delerr)
			} else {
				totalRetainedSize -= oldest.Length
			}
		}

		return nil
	})
	if err != nil {
		return errors.Wrapf(err, "error listing %v", c.description)
	}

	// a zero limit must not turn the log statement into an integer division by zero.
	var retainedPercent int64
	if c.maxSizeBytes != 0 {
		retainedPercent = 100 * totalRetainedSize / c.maxSizeBytes
	}

	log(ctx).Debugf("finished sweeping %v in %v and retained %v/%v bytes (%v %%)", c.description, clock.Since(t0), totalRetainedSize, c.maxSizeBytes, retainedPercent)

	return nil
}
// sweepMutexes removes from loadingMap all per-item mutexes which have not been
// used for longer than mutexAgeCutoff.
func (c *cacheBase) sweepMutexes() {
	oldestAllowed := clock.Now().Add(-mutexAgeCutoff).UnixNano()

	// the mutexes are only for performance (to avoid loading duplicates)
	// and not for correctness, so it's always safe to remove them.
	c.loadingMap.Range(func(k, v interface{}) bool {
		lastUsed := atomic.LoadInt64(&v.(*mutexLRU).lastUsedNanoseconds)
		if lastUsed < oldestAllowed {
			c.loadingMap.Delete(k)
		}

		// keep iterating over all entries.
		return true
	})
}
// newContentCacheBase creates the cache base on top of the provided storage, verifies that
// the storage can be listed and starts the background goroutine which sweeps the cache
// every sweepFrequency until close is called.
func newContentCacheBase(ctx context.Context, description string, cacheStorage blob.Storage, maxSizeBytes int64, touchThreshold, sweepFrequency time.Duration) (*cacheBase, error) {
	base := &cacheBase{
		description:         description,
		cacheStorage:        cacheStorage,
		maxSizeBytes:        maxSizeBytes,
		touchThreshold:      touchThreshold,
		sweepFrequency:      sweepFrequency,
		periodicSweepClosed: make(chan struct{}),
	}

	// errGood is a marker error to stop blob iteration quickly, does not
	// indicate any problem.
	errGood := errors.Errorf("good")

	// verify that cache storage is functional by listing from it
	listErr := base.cacheStorage.ListBlobs(ctx, "", func(it blob.Metadata) error {
		// nolint:wrapcheck
		return errGood
	})
	if listErr != nil && !errors.Is(listErr, errGood) {
		return nil, errors.Wrapf(listErr, "unable to open %v", base.description)
	}

	// the sweeper is stopped and waited for in close.
	base.periodicSweepRunning.Add(1)

	go base.sweepDirectoryPeriodically(ctx)

	return base, nil
}

View File

@@ -2,21 +2,16 @@
import (
"context"
"sync/atomic"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/hmac"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/repo/blob"
)
type contentCacheForData struct {
*cacheBase
st blob.Storage
hmacSecret []byte
pc *cache.PersistentCache
st blob.Storage
}
func adjustCacheKey(cacheKey cacheKey) cacheKey {
@@ -32,81 +27,27 @@ func adjustCacheKey(cacheKey cacheKey) cacheKey {
func (c *contentCacheForData) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
cacheKey = adjustCacheKey(cacheKey)
if b := c.readAndVerifyCacheContent(ctx, cacheKey); b != nil {
stats.Record(ctx,
metricContentCacheHitCount.M(1),
metricContentCacheHitBytes.M(int64(len(b))),
)
return b, nil
}
stats.Record(ctx, metricContentCacheMissCount.M(1))
b, err := c.st.GetBlob(ctx, blobID, offset, length)
if err != nil {
stats.Record(ctx, metricContentCacheMissErrors.M(1))
} else {
stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(b))))
}
if errors.Is(err, blob.ErrBlobNotFound) {
// not found in underlying storage
// nolint:wrapcheck
return nil, err
}
if err != nil {
return nil, errors.Wrap(err, "error getting content from cache")
}
atomic.StoreInt32(&c.anyChange, 1)
if puterr := c.cacheStorage.PutBlob(ctx, blob.ID(cacheKey), gather.FromSlice(hmac.Append(b, c.hmacSecret))); puterr != nil {
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
log(ctx).Warningf("unable to write cache item %v: %v", cacheKey, puterr)
}
return b, nil
return c.pc.GetOrLoad(ctx, string(cacheKey), func() ([]byte, error) {
return c.st.GetBlob(ctx, blobID, offset, length)
})
}
func (c *contentCacheForData) readAndVerifyCacheContent(ctx context.Context, cacheKey cacheKey) []byte {
b, err := c.cacheStorage.GetBlob(ctx, blob.ID(cacheKey), 0, -1)
if err == nil {
b, err = hmac.VerifyAndStrip(b, c.hmacSecret)
if err == nil {
c.touch(ctx, blob.ID(cacheKey))
// retrieved from cache and HMAC valid
return b
}
// ignore malformed contents
log(ctx).Warningf("malformed content %v: %v", cacheKey, err)
return nil
}
if !errors.Is(err, blob.ErrBlobNotFound) {
log(ctx).Warningf("unable to read cache %v: %v", cacheKey, err)
}
return nil
func (c *contentCacheForData) close(ctx context.Context) {
c.pc.Close(ctx)
}
func newContentCacheForData(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) {
func newContentCacheForData(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64, hmacSecret []byte) (contentCache, error) {
if cacheStorage == nil {
return passthroughContentCache{st}, nil
}
cb, err := newContentCacheBase(ctx, "content cache", cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency)
pc, err := cache.NewPersistentCache(ctx, "content cache", cacheStorage, cache.ChecksumProtection(hmacSecret), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency)
if err != nil {
return nil, errors.Wrap(err, "unable to create base cache")
}
return &contentCacheForData{
st: st,
hmacSecret: append([]byte(nil), hmacSecret...),
cacheBase: cb,
st: st,
pc: pc,
}, nil
}

View File

@@ -2,22 +2,28 @@
import (
"context"
"sync/atomic"
"hash/fnv"
"io"
"sync"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/repo/blob"
)
const metadataCacheSyncParallelism = 16
const (
// metadataCacheSyncParallelism is presumably the number of concurrent
// workers used while syncing the metadata cache — TODO confirm against sync().
metadataCacheSyncParallelism = 16
// metadataCacheMutexShards is the number of mutexes in
// contentCacheForMetadata.shardedMutexes; each blob ID is hashed onto one of them.
metadataCacheMutexShards = 16
)
type contentCacheForMetadata struct {
*cacheBase
pc *cache.PersistentCache
st blob.Storage
st blob.Storage
shardedMutexes [metadataCacheMutexShards]sync.Mutex
}
// sync synchronizes metadata cache with all blobs found in the storage.
@@ -50,31 +56,31 @@ func (c *contentCacheForMetadata) sync(ctx context.Context) error {
return eg.Wait()
}
// mutexForBlob returns the mutex that guards cache operations for the given
// blob. The blob ID is hashed with 32-bit FNV and reduced modulo
// metadataCacheMutexShards, so the same blob ID always maps to the same
// mutex, while different blob IDs may share one.
func (c *contentCacheForMetadata) mutexForBlob(blobID blob.ID) *sync.Mutex {
	hasher := fnv.New32()

	// Writes to a hash.Hash never return an error, so the result is deliberately dropped.
	io.WriteString(hasher, string(blobID)) //nolint:errcheck

	return &c.shardedMutexes[hasher.Sum32()%metadataCacheMutexShards]
}
func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cacheKey, blobID blob.ID, offset, length int64) ([]byte, error) {
m := c.perItemMutex(blobID)
m := c.mutexForBlob(blobID)
m.Lock()
defer m.Unlock()
if v, err := c.cacheBase.cacheStorage.GetBlob(ctx, blobID, offset, length); err == nil {
// cache hit
stats.Record(ctx,
metricContentCacheHitCount.M(1),
metricContentCacheHitBytes.M(int64(len(v))),
)
if v := c.pc.Get(ctx, string(blobID), offset, length); v != nil {
return v, nil
}
stats.Record(ctx, metricContentCacheMissCount.M(1))
// read the entire blob
log(ctx).Debugf("fetching metadata blob %q", blobID)
blobData, err := c.st.GetBlob(ctx, blobID, 0, -1)
if err != nil {
stats.Record(ctx, metricContentCacheMissErrors.M(1))
stats.Record(ctx, cache.MetricMissErrors.M(1))
} else {
stats.Record(ctx, metricContentCacheMissBytes.M(int64(len(blobData))))
stats.Record(ctx, cache.MetricMissBytes.M(int64(len(blobData))))
}
if errors.Is(err, blob.ErrBlobNotFound) {
@@ -88,13 +94,8 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache
return nil, err
}
atomic.StoreInt32(&c.anyChange, 1)
// store the whole blob in the cache.
if puterr := c.cacheStorage.PutBlob(ctx, blobID, gather.FromSlice(blobData)); puterr != nil {
stats.Record(ctx, metricContentCacheStoreErrors.M(1))
log(ctx).Warningf("unable to write cache item %v: %v", blobID, puterr)
}
c.pc.Put(ctx, string(blobID), blobData)
if offset == 0 && length == -1 {
return blobData, nil
@@ -107,18 +108,22 @@ func (c *contentCacheForMetadata) getContent(ctx context.Context, cacheKey cache
return blobData[offset : offset+length], nil
}
func newContentCacheForMetadata(ctx context.Context, st, cacheStorage blob.Storage, maxSizeBytes int64) (contentCache, error) {
// close shuts down the metadata content cache by closing the underlying
// persistent cache held in c.pc.
func (c *contentCacheForMetadata) close(ctx context.Context) {
c.pc.Close(ctx)
}
func newContentCacheForMetadata(ctx context.Context, st blob.Storage, cacheStorage cache.Storage, maxSizeBytes int64) (contentCache, error) {
if cacheStorage == nil {
return passthroughContentCache{st}, nil
}
cb, err := newContentCacheBase(ctx, "metadata cache", cacheStorage, maxSizeBytes, defaultTouchThreshold, defaultSweepFrequency)
pc, err := cache.NewPersistentCache(ctx, "metadata cache", cacheStorage, cache.NoProtection(), maxSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency)
if err != nil {
return nil, errors.Wrap(err, "unable to create base cache")
}
return &contentCacheForMetadata{
st: st,
cacheBase: cb,
st: st,
pc: pc,
}, nil
}

View File

@@ -2,6 +2,7 @@
import (
"bytes"
"context"
"reflect"
"sort"
"strings"
@@ -12,6 +13,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
@@ -52,27 +54,27 @@ func TestCacheExpiration(t *testing.T) {
underlyingStorage := newUnderlyingStorageForContentCacheTesting(t)
cb, err := newContentCacheBase(testlogging.Context(t), "test cache", cacheStorage, 10000, 0, 500*time.Millisecond)
pc, err := cache.NewPersistentCache(testlogging.Context(t), "test cache", cacheStorage.(cache.Storage), cache.NoProtection(), 10000, 0, 500*time.Millisecond)
if err != nil {
t.Fatalf("unable to create base cache: %v", err)
}
cache := &contentCacheForData{
st: underlyingStorage,
cacheBase: cb,
cc := &contentCacheForData{
st: underlyingStorage,
pc: pc,
}
ctx := testlogging.Context(t)
defer cache.close(ctx)
defer cc.close(ctx)
_, err = cache.getContent(ctx, "00000a", "content-4k", 0, -1) // 4k
_, err = cc.getContent(ctx, "00000a", "content-4k", 0, -1) // 4k
assertNoError(t, err)
_, err = cache.getContent(ctx, "00000b", "content-4k", 0, -1) // 4k
_, err = cc.getContent(ctx, "00000b", "content-4k", 0, -1) // 4k
assertNoError(t, err)
_, err = cache.getContent(ctx, "00000c", "content-4k", 0, -1) // 4k
_, err = cc.getContent(ctx, "00000c", "content-4k", 0, -1) // 4k
assertNoError(t, err)
_, err = cache.getContent(ctx, "00000d", "content-4k", 0, -1) // 4k
_, err = cc.getContent(ctx, "00000d", "content-4k", 0, -1) // 4k
assertNoError(t, err)
// wait for a sweep
@@ -94,7 +96,7 @@ func TestCacheExpiration(t *testing.T) {
}
for _, tc := range cases {
_, got := cache.getContent(ctx, cacheKey(tc.blobID), "content-4k", 0, -1)
_, got := cc.getContent(ctx, cacheKey(tc.blobID), "content-4k", 0, -1)
if want := tc.expectedError; !errors.Is(got, want) {
t.Errorf("unexpected error when getting content %v: %v wanted %v", tc.blobID, got, want)
} else {
@@ -110,22 +112,22 @@ func TestDiskContentCache(t *testing.T) {
const maxBytes = 10000
cacheStorage, err := newCacheStorageOrNil(ctx, tmpDir, maxBytes, "contents")
cacheStorage, err := cache.NewStorageOrNil(ctx, tmpDir, maxBytes, "contents")
if err != nil {
t.Fatal(err)
}
cache, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil)
cc, err := newContentCacheForData(ctx, newUnderlyingStorageForContentCacheTesting(t), cacheStorage, maxBytes, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer cache.close(ctx)
defer cc.close(ctx)
verifyContentCache(t, cache)
verifyContentCache(t, cc, cacheStorage)
}
func verifyContentCache(t *testing.T, cache contentCache) {
func verifyContentCache(t *testing.T, cc contentCache, cacheStorage blob.Storage) {
t.Helper()
ctx := testlogging.Context(t)
@@ -147,12 +149,12 @@ func verifyContentCache(t *testing.T, cache contentCache) {
{"xf0f0f3", "no-such-content", 0, -1, nil, blob.ErrBlobNotFound},
{"xf0f0f4", "no-such-content", 10, 5, nil, blob.ErrBlobNotFound},
{"f0f0f5", "content-1", 7, 3, []byte{8, 9, 10}, nil},
{"xf0f0f6", "content-1", 11, 10, nil, errors.Errorf("error getting content from cache: invalid offset: 11: invalid blob offset or length")},
{"xf0f0f6", "content-1", -1, 5, nil, errors.Errorf("error getting content from cache: invalid offset: -1: invalid blob offset or length")},
{"xf0f0f6", "content-1", 11, 10, nil, errors.Errorf("invalid offset: 11: invalid blob offset or length")},
{"xf0f0f6", "content-1", -1, 5, nil, errors.Errorf("invalid offset: -1: invalid blob offset or length")},
}
for _, tc := range cases {
v, err := cache.getContent(ctx, tc.cacheKey, tc.blobID, tc.offset, tc.length)
v, err := cc.getContent(ctx, tc.cacheKey, tc.blobID, tc.offset, tc.length)
if (err != nil) != (tc.err != nil) {
t.Errorf("unexpected error for %v: %+v, wanted %+v", tc.cacheKey, err, tc.err)
} else if err != nil && err.Error() != tc.err.Error() {
@@ -163,27 +165,25 @@ func verifyContentCache(t *testing.T, cache contentCache) {
}
}
verifyStorageContentList(t, cache.(*contentCacheForData).cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5")
verifyStorageContentList(t, cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5")
})
t.Run("DataCorruption", func(t *testing.T) {
var cacheKey blob.ID = "f0f0f1x"
d, err := cache.(*contentCacheForData).cacheStorage.GetBlob(ctx, cacheKey, 0, -1)
if err != nil {
t.Fatalf("unable to retrieve data from cache: %v", err)
}
const cacheKey = "f0f0f1x"
d, err := cacheStorage.GetBlob(ctx, cacheKey, 0, -1)
must(t, err)
// corrupt the data and write back
d[0] ^= 1
if puterr := cache.(*contentCacheForData).cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)); puterr != nil {
t.Fatalf("unable to write corrupted content: %v", puterr)
}
must(t, cacheStorage.PutBlob(ctx, cacheKey, gather.FromSlice(d)))
v, err := cache.getContent(ctx, "xf0f0f1", "content-1", 1, 5)
v, err := cc.getContent(ctx, "xf0f0f1", "content-1", 1, 5)
if err != nil {
t.Fatalf("error in getContent: %v", err)
}
if got, want := v, []byte{2, 3, 4, 5, 6}; !reflect.DeepEqual(v, want) {
t.Errorf("invalid result when reading corrupted data: %v, wanted %v", got, want)
}
@@ -206,7 +206,7 @@ func TestCacheFailureToOpen(t *testing.T) {
}
// Will fail because of ListBlobs failure.
_, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
_, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil)
if err == nil || !strings.Contains(err.Error(), someError.Error()) {
t.Errorf("invalid error %v, wanted: %v", err, someError)
}
@@ -214,12 +214,12 @@ func TestCacheFailureToOpen(t *testing.T) {
// ListBlobs fails only once, next time it succeeds.
ctx := testlogging.Context(t)
cache, err := newContentCacheForData(ctx, underlyingStorage, faultyCache, 10000, nil)
cc, err := newContentCacheForData(ctx, underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer cache.close(ctx)
cc.close(ctx)
}
func TestCacheFailureToWrite(t *testing.T) {
@@ -232,14 +232,14 @@ func TestCacheFailureToWrite(t *testing.T) {
Base: cacheStorage,
}
cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
ctx := testlogging.Context(t)
defer cache.close(ctx)
defer cc.close(ctx)
faultyCache.Faults = map[string][]*blobtesting.Fault{
"PutBlob": {
@@ -247,7 +247,7 @@ func TestCacheFailureToWrite(t *testing.T) {
},
}
v, err := cache.getContent(ctx, "aa", "content-1", 0, 3)
v, err := cc.getContent(ctx, "aa", "content-1", 0, 3)
if err != nil {
t.Errorf("write failure wasn't ignored: %v", err)
}
@@ -276,14 +276,14 @@ func TestCacheFailureToRead(t *testing.T) {
Base: cacheStorage,
}
cache, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, faultyCache, 10000, nil)
cc, err := newContentCacheForData(testlogging.Context(t), underlyingStorage, withoutTouchBlob{faultyCache}, 10000, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
ctx := testlogging.Context(t)
defer cache.close(ctx)
defer cc.close(ctx)
faultyCache.Faults = map[string][]*blobtesting.Fault{
"GetBlob": {
@@ -292,7 +292,7 @@ func TestCacheFailureToRead(t *testing.T) {
}
for i := 0; i < 2; i++ {
v, err := cache.getContent(ctx, "aa", "content-1", 0, 3)
v, err := cc.getContent(ctx, "aa", "content-1", 0, 3)
if err != nil {
t.Errorf("read failure wasn't ignored: %v", err)
}
@@ -329,3 +329,11 @@ func assertNoError(t *testing.T, err error) {
t.Errorf("err: %v", err)
}
}
// withoutTouchBlob is a test helper that embeds a blob.Storage and pairs it
// with a TouchBlob method that always fails. The tests use it to pass a plain
// blob.Storage (such as a faulty storage wrapper) to newContentCacheForData,
// which takes a cache.Storage — presumably an interface that additionally
// requires TouchBlob; verify against the cache package.
type withoutTouchBlob struct {
blob.Storage
}
// TouchBlob always returns a "not implemented" error; the blobID and
// threshold arguments are ignored.
func (c withoutTouchBlob) TouchBlob(ctx context.Context, blobID blob.ID, threshold time.Duration) error {
return errors.Errorf("TouchBlob not implemented")
}

View File

@@ -15,6 +15,7 @@
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/ctxutil"
apipb "github.com/kopia/kopia/internal/grpcapi"
@@ -58,6 +59,8 @@ type grpcRepositoryClient struct {
objectFormat object.Format
cliOpts ClientOptions
omgr *object.Manager
contentCache *cache.PersistentCache
}
type grpcInnerSession struct {
@@ -383,7 +386,7 @@ func (r *grpcInnerSession) Flush(ctx context.Context) error {
}
func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOptions) (RepositoryWriter, error) {
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, false)
w, err := newGRPCAPIRepositoryForConnection(ctx, r.conn, r.connRefCount, r.cliOpts, opt, r.contentCache, false)
if err != nil {
return nil, err
}
@@ -493,14 +496,16 @@ func unhandledSessionResponse(resp *apipb.SessionResponse) error {
}
func (r *grpcRepositoryClient) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
v, err := r.maybeRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) {
return sess.GetContent(ctx, contentID)
})
if err != nil {
return nil, err
}
return r.contentCache.GetOrLoad(ctx, string(contentID), func() ([]byte, error) {
v, err := r.maybeRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) {
return sess.GetContent(ctx, contentID)
})
if err != nil {
return nil, err
}
return v.([]byte), nil
return v.([]byte), nil
})
}
func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
@@ -587,6 +592,9 @@ func (r *grpcRepositoryClient) Close(ctx context.Context) error {
if atomic.AddInt32(r.connRefCount, -1) == 0 {
log(ctx).Debugf("closing GPRC connection to %v", r.conn.Target())
defer r.contentCache.Close(ctx)
return errors.Wrap(r.conn.Close(), "error closing GRPC connection")
}
@@ -620,7 +628,7 @@ func (c grpcCreds) RequireTransportSecurity() bool {
// OpenGRPCAPIRepository opens the Repository based on remote GRPC server.
// The APIServerInfo must have the address of the repository as 'kopia://host:port'
func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) {
func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, contentCache *cache.PersistentCache, password string) (Repository, error) {
var transportCreds credentials.TransportCredentials
if si.TrustedServerCertificateFingerprint != "" {
@@ -651,7 +659,7 @@ func OpenGRPCAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
return nil, errors.Wrap(err, "dial error")
}
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, true)
rep, err := newGRPCAPIRepositoryForConnection(ctx, conn, new(int32), cliOpts, WriteSessionOptions{}, contentCache, true)
if err != nil {
return nil, err
}
@@ -719,7 +727,7 @@ func (r *grpcRepositoryClient) killInnerSession() {
}
// newGRPCAPIRepositoryForConnection opens GRPC-based repository connection.
func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, transparentRetries bool) (*grpcRepositoryClient, error) {
func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientConn, connRefCount *int32, cliOpts ClientOptions, opt WriteSessionOptions, contentCache *cache.PersistentCache, transparentRetries bool) (*grpcRepositoryClient, error) {
if opt.OnUpload == nil {
opt.OnUpload = func(i int64) {}
}
@@ -731,6 +739,7 @@ func newGRPCAPIRepositoryForConnection(ctx context.Context, conn *grpc.ClientCon
transparentRetries: transparentRetries,
opt: opt,
isReadOnly: cliOpts.ReadOnly,
contentCache: contentCache,
}
v, err := rr.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) {

View File

@@ -10,8 +10,10 @@
"time"
"github.com/pkg/errors"
"golang.org/x/crypto/scrypt"
"github.com/kopia/kopia/internal/atomicfile"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/repo/blob"
loggingwrapper "github.com/kopia/kopia/repo/blob/logging"
"github.com/kopia/kopia/repo/blob/readonly"
@@ -75,28 +77,61 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
return nil, err
}
// cache directory is stored as relative to config file name, resolve it to absolute.
if lc.Caching != nil {
if lc.Caching.CacheDirectory != "" && !filepath.IsAbs(lc.Caching.CacheDirectory) {
lc.Caching.CacheDirectory = filepath.Join(filepath.Dir(configFile), lc.Caching.CacheDirectory)
}
}
if lc.APIServer != nil {
return OpenAPIServer(ctx, lc.APIServer, lc.ClientOptions, password)
return OpenAPIServer(ctx, lc.APIServer, lc.ClientOptions, lc.Caching, password)
}
return openDirect(ctx, configFile, lc, password, options)
}
// OpenAPIServer connects remote repository over Kopia API.
func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, password string) (Repository, error) {
if si.DisableGRPC {
return openRestAPIRepository(ctx, si, cliOpts, password)
func getContentCacheOrNil(ctx context.Context, opt *content.CachingOptions, password string) (*cache.PersistentCache, error) {
opt = opt.CloneOrDefault()
cs, err := cache.NewStorageOrNil(ctx, opt.CacheDirectory, opt.MaxCacheSizeBytes, "server-contents")
if cs == nil {
// this may be (nil, nil) or (nil, err)
return nil, errors.Wrap(err, "error opening storage")
}
return OpenGRPCAPIRepository(ctx, si, cliOpts, password)
// derive content cache key from the password & HMAC secret using scrypt.
salt := append([]byte("content-cache-protection"), opt.HMACSecret...)
cacheEncryptionKey, err := scrypt.Key([]byte(password), salt, 65536, 8, 1, 32)
if err != nil {
return nil, errors.Wrap(err, "unable to derive cache encryption key from password")
}
prot, err := cache.AuthenticatedEncryptionProtection(cacheEncryptionKey)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize protection")
}
return cache.NewPersistentCache(ctx, "cache-storage", cs, prot, opt.MaxCacheSizeBytes, cache.DefaultTouchThreshold, cache.DefaultSweepFrequency)
}
// OpenAPIServer connects to a remote repository over the Kopia API.
//
// The content cache described by cachingOptions is opened first (it may be
// nil) and handed to whichever client is created: the REST client when
// si.DisableGRPC is set, the gRPC client otherwise. A failure to open the
// cache is returned wrapped and no connection is attempted.
func OpenAPIServer(ctx context.Context, si *APIServerInfo, cliOpts ClientOptions, cachingOptions *content.CachingOptions, password string) (Repository, error) {
	pc, cacheErr := getContentCacheOrNil(ctx, cachingOptions, password)
	if cacheErr != nil {
		return nil, errors.Wrap(cacheErr, "error opening content cache")
	}

	// gRPC is the default transport; REST is used only when gRPC is explicitly disabled.
	if !si.DisableGRPC {
		return OpenGRPCAPIRepository(ctx, si, cliOpts, pc, password)
	}

	return openRestAPIRepository(ctx, si, cliOpts, pc, password)
}
// openDirect opens the repository that directly manipulates blob storage.
func openDirect(ctx context.Context, configFile string, lc *LocalConfig, password string, options *Options) (rep Repository, err error) {
if lc.Caching.CacheDirectory != "" && !filepath.IsAbs(lc.Caching.CacheDirectory) {
lc.Caching.CacheDirectory = filepath.Join(filepath.Dir(configFile), lc.Caching.CacheDirectory)
}
if lc.Storage == nil {
return nil, errors.Errorf("storage not set in the configuration file")
}

View File

@@ -109,7 +109,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
}, repo.ClientOptions{
Username: "foo",
Hostname: "bar",
}, "baz")
}, nil, "baz")
if err != nil {
t.Fatal(err)
}
@@ -242,7 +242,7 @@ func testAPIServerRepository(t *testing.T, serverStartArgs []string, useGRPC, al
}, repo.ClientOptions{
Username: "foo",
Hostname: "bar",
}, "baz")
}, nil, "baz")
if dur := clock.Since(t0); dur > 15*time.Second {
t.Fatalf("failed connection took %v", dur)