refactored block cache and list cache

This commit is contained in:
Jarek Kowalski
2018-06-10 12:55:42 -07:00
parent 846a46b879
commit e9c11ea106
7 changed files with 334 additions and 309 deletions

View File

@@ -1,62 +1,171 @@
package block
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/kopia/kopia/storage"
"github.com/kopia/kopia/storage/filesystem"
"github.com/rs/zerolog/log"
)
type blockCache interface {
getContentBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error)
listIndexBlocks(ctx context.Context) ([]IndexInfo, error)
deleteListCache(ctx context.Context)
close() error
const (
sweepCacheFrequency = 1 * time.Minute
)
type blockCache struct {
st storage.Storage
cacheStorage storage.Storage
maxSizeBytes int64
hmacSecret []byte
mu sync.Mutex
lastTotalSizeBytes int64
closed chan struct{}
}
// CachingOptions specifies configuration of local cache.
type CachingOptions struct {
CacheDirectory string `json:"cacheDirectory,omitempty"`
MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"`
IgnoreListCache bool `json:"-"`
HMACSecret []byte `json:"-"`
}
func newBlockCache(ctx context.Context, st storage.Storage, caching CachingOptions) (blockCache, error) {
if caching.MaxCacheSizeBytes == 0 || caching.CacheDirectory == "" {
return nullBlockCache{st}, nil
func adjustCacheKey(cacheKey string) string {
// block IDs with odd length have a single-byte prefix.
// move the prefix to the end of cache key to make sure the top level shard is spread 256 ways.
if len(cacheKey)%2 == 1 {
return cacheKey[1:] + cacheKey[0:1]
}
blockCacheDir := filepath.Join(caching.CacheDirectory, "blocks")
return cacheKey
}
if _, err := os.Stat(blockCacheDir); os.IsNotExist(err) {
if err := os.MkdirAll(blockCacheDir, 0700); err != nil {
return nil, err
func (c *blockCache) getContentBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error) {
cacheKey = adjustCacheKey(cacheKey)
useCache := shouldUseBlockCache(ctx) && c.cacheStorage != nil
if useCache {
b, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
if err == nil {
b, err = verifyAndStripHMAC(b, c.hmacSecret)
if err == nil {
// retrieved from cache and HMAC valid
return b, nil
}
// ignore malformed blocks
log.Warn().Msgf("malformed block %v: %v", cacheKey, err)
} else if err != storage.ErrBlockNotFound {
log.Warn().Msgf("unable to read cache %v: %v", cacheKey, err)
}
}
cacheStorage, err := filesystem.New(context.Background(), &filesystem.Options{
Path: blockCacheDir,
DirectoryShards: []int{2},
})
if err != nil {
b, err := c.st.GetBlock(ctx, physicalBlockID, offset, length)
if err == storage.ErrBlockNotFound {
// not found in underlying storage
return nil, err
}
c := &localStorageCache{
st: st,
cacheStorage: cacheStorage,
maxSizeBytes: caching.MaxCacheSizeBytes,
hmacSecret: append([]byte(nil), caching.HMACSecret...),
listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second,
closed: make(chan struct{}),
if err == nil && useCache {
if puterr := c.cacheStorage.PutBlock(ctx, cacheKey, bytes.NewReader(appendHMAC(b, c.hmacSecret))); puterr != nil {
log.Warn().Msgf("unable to write cache item %v: %v", cacheKey, puterr)
}
}
if caching.IgnoreListCache {
c.deleteListCache(ctx)
return b, err
}
func (c *blockCache) close() {
close(c.closed)
}
func (c *blockCache) sweepDirectoryPeriodically(ctx context.Context) {
for {
select {
case <-c.closed:
return
case <-time.After(sweepCacheFrequency):
err := c.sweepDirectory(ctx)
if err != nil {
log.Printf("warning: blockCache sweep failed: %v", err)
}
}
}
}
func (c *blockCache) sweepDirectory(ctx context.Context) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cacheStorage == nil {
return nil
}
t0 := time.Now()
log.Debug().Msg("sweeping cache")
ctx, cancel := context.WithCancel(ctx)
ch := c.cacheStorage.ListBlocks(ctx, "")
defer cancel()
var items []storage.BlockMetadata
for it := range ch {
if it.Error != nil {
return fmt.Errorf("error listing cache: %v", it.Error)
}
items = append(items, it)
}
sort.Slice(items, func(i, j int) bool {
return items[i].TimeStamp.After(items[j].TimeStamp)
})
var totalRetainedSize int64
for _, it := range items {
if totalRetainedSize > c.maxSizeBytes {
if err := c.cacheStorage.DeleteBlock(ctx, it.BlockID); err != nil {
log.Warn().Msgf("unable to remove %v: %v", it.BlockID, err)
}
} else {
totalRetainedSize += it.Length
}
}
log.Debug().Msgf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes)
c.lastTotalSizeBytes = totalRetainedSize
return nil
}
func newBlockCache(ctx context.Context, st storage.Storage, caching CachingOptions) (*blockCache, error) {
var cacheStorage storage.Storage
var err error
if caching.MaxCacheSizeBytes > 0 && caching.CacheDirectory != "" {
blockCacheDir := filepath.Join(caching.CacheDirectory, "blocks")
if _, err = os.Stat(blockCacheDir); os.IsNotExist(err) {
if err = os.MkdirAll(blockCacheDir, 0700); err != nil {
return nil, err
}
}
cacheStorage, err = filesystem.New(context.Background(), &filesystem.Options{
Path: blockCacheDir,
DirectoryShards: []int{2},
})
if err != nil {
return nil, err
}
}
c := &blockCache{
st: st,
cacheStorage: cacheStorage,
maxSizeBytes: caching.MaxCacheSizeBytes,
hmacSecret: append([]byte(nil), caching.HMACSecret...),
closed: make(chan struct{}),
}
if err := c.sweepDirectory(ctx); err != nil {

View File

@@ -59,9 +59,10 @@ type IndexInfo struct {
type Manager struct {
Format FormattingOptions
stats Stats
cache blockCache
st storage.Storage
stats Stats
blockCache *blockCache
listCache *listCache
st storage.Storage
mu sync.Mutex
locked bool
@@ -387,7 +388,7 @@ func appendRandomBytes(b []byte, count int) ([]byte, error) {
// IndexBlocks returns the list of active index blocks, sorted by time.
func (bm *Manager) IndexBlocks(ctx context.Context) ([]IndexInfo, error) {
blocks, err := bm.cache.listIndexBlocks(ctx)
blocks, err := bm.listCache.listIndexBlocks(ctx)
if err != nil {
return nil, err
}
@@ -411,13 +412,13 @@ func (bm *Manager) loadPackIndexesLocked(ctx context.Context) ([]IndexInfo, erro
}
if i > 0 {
bm.cache.deleteListCache(ctx)
bm.listCache.deleteListCache(ctx)
log.Printf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i)
time.Sleep(nextSleepTime)
nextSleepTime *= 2
}
blocks, err := bm.cache.listIndexBlocks(ctx)
blocks, err := bm.listCache.listIndexBlocks(ctx)
if err != nil {
return nil, err
}
@@ -503,6 +504,11 @@ func (bm *Manager) unprocessedIndexBlocks(blocks []IndexInfo) (<-chan string, er
return ch, nil
}
// Close closes the block manager.
func (bm *Manager) Close() {
bm.blockCache.close()
}
// CompactIndexes performs compaction of index blocks ensuring that # of small blocks is between minSmallBlockCount and maxSmallBlockCount
func (bm *Manager) CompactIndexes(ctx context.Context, minSmallBlockCount int, maxSmallBlockCount int) error {
bm.lock()
@@ -662,7 +668,7 @@ func (bm *Manager) compactAndDeleteIndexBlocks(ctx context.Context, indexBlocks
continue
}
bm.cache.deleteListCache(ctx)
bm.listCache.deleteListCache(ctx)
if err := bm.st.DeleteBlock(ctx, indexBlock.FileName); err != nil {
log.Warn().Msgf("unable to delete compacted block %q: %v", indexBlock.FileName, err)
}
@@ -756,7 +762,7 @@ func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (str
atomic.AddInt32(&bm.stats.WrittenBlocks, 1)
atomic.AddInt64(&bm.stats.WrittenBytes, int64(len(data)))
bm.cache.deleteListCache(ctx)
bm.listCache.deleteListCache(ctx)
if err := bm.st.PutBlock(ctx, physicalBlockID, bytes.NewReader(data)); err != nil {
return "", err
}
@@ -777,7 +783,7 @@ func (bm *Manager) encryptAndWriteBlockNotLocked(ctx context.Context, data []byt
atomic.AddInt32(&bm.stats.WrittenBlocks, 1)
atomic.AddInt64(&bm.stats.WrittenBytes, int64(len(data)))
bm.cache.deleteListCache(ctx)
bm.listCache.deleteListCache(ctx)
if err := bm.st.PutBlock(ctx, physicalBlockID, bytes.NewReader(data2)); err != nil {
return "", err
}
@@ -866,7 +872,7 @@ func (bm *Manager) getPackedBlockInternalLocked(ctx context.Context, blockID str
packFile := bi.PackFile
bm.mu.Unlock()
payload, err := bm.cache.getContentBlock(ctx, blockID, packFile, int64(bi.PackOffset), int64(bi.Length))
payload, err := bm.blockCache.getContentBlock(ctx, blockID, packFile, int64(bi.PackOffset), int64(bi.Length))
bm.mu.Lock()
if err != nil {
return nil, false, fmt.Errorf("unable to read storage block %v", err)
@@ -906,7 +912,7 @@ func (bm *Manager) decryptAndVerifyPayload(formatVersion byte, payload []byte, o
}
func (bm *Manager) getPhysicalBlockInternal(ctx context.Context, blockID string) ([]byte, error) {
payload, err := bm.cache.getContentBlock(ctx, blockID, blockID, 0, -1)
payload, err := bm.blockCache.getContentBlock(ctx, blockID, blockID, 0, -1)
if err != nil {
return nil, err
}
@@ -1032,9 +1038,14 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting
return nil, err
}
cache, err := newBlockCache(ctx, st, caching)
blockCache, err := newBlockCache(ctx, st, caching)
if err != nil {
return nil, fmt.Errorf("unable to initialize cache: %v", err)
return nil, fmt.Errorf("unable to initialize block cache: %v", err)
}
listCache, err := newListCache(ctx, st, caching)
if err != nil {
return nil, fmt.Errorf("unable to initialize list cache: %v", err)
}
var cbi committedBlockIndex
@@ -1059,7 +1070,8 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
cache: cache,
blockCache: blockCache,
listCache: listCache,
st: st,
activeBlocksExtraTime: activeBlocksExtraTime,
writeFormatVersion: int32(f.Version),

33
block/cache_hmac.go Normal file
View File

@@ -0,0 +1,33 @@
package block
import "crypto/hmac"
import "crypto/sha256"
import "errors"
func appendHMAC(data []byte, secret []byte) []byte {
h := hmac.New(sha256.New, secret)
h.Write(data) // nolint:errcheck
return h.Sum(data)
}
func verifyAndStripHMAC(b []byte, secret []byte) ([]byte, error) {
if len(b) < sha256.Size {
return nil, errors.New("invalid data - too short")
}
p := len(b) - sha256.Size
data := b[0:p]
signature := b[p:]
h := hmac.New(sha256.New, secret)
h.Write(data) // nolint:errcheck
validSignature := h.Sum(nil)
if len(signature) != len(validSignature) {
return nil, errors.New("invalid signature length")
}
if hmac.Equal(validSignature, signature) {
return data, nil
}
return nil, errors.New("invalid data - corrupted")
}

10
block/caching_options.go Normal file
View File

@@ -0,0 +1,10 @@
package block
// CachingOptions specifies configuration of local cache.
type CachingOptions struct {
CacheDirectory string `json:"cacheDirectory,omitempty"`
MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"`
IgnoreListCache bool `json:"-"`
HMACSecret []byte `json:"-"`
}

121
block/list_cache.go Normal file
View File

@@ -0,0 +1,121 @@
package block
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/kopia/kopia/storage"
"github.com/rs/zerolog/log"
)
type listCache struct {
st storage.Storage
cacheFile string
listCacheDuration time.Duration
hmacSecret []byte
}
func (c *listCache) listIndexBlocks(ctx context.Context) ([]IndexInfo, error) {
if c.cacheFile != "" {
ci, err := c.readBlocksFromCache(ctx)
if err == nil {
expirationTime := ci.Timestamp.Add(c.listCacheDuration)
if time.Now().Before(expirationTime) {
log.Debug().Msg("retrieved list of index blocks from cache")
return ci.Blocks, nil
}
} else if err != storage.ErrBlockNotFound {
log.Warn().Err(err).Msgf("unable to open cache file")
}
}
log.Debug().Msg("listing index blocks from source")
blocks, err := listIndexBlocksFromStorage(ctx, c.st)
if err == nil {
c.saveListToCache(ctx, &cachedList{
Blocks: blocks,
Timestamp: time.Now(),
})
}
return blocks, err
}
func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) {
if c.cacheFile == "" {
return
}
log.Debug().Int("count", len(ci.Blocks)).Msg("saving index blocks to cache")
if data, err := json.Marshal(ci); err == nil {
if err := ioutil.WriteFile(c.cacheFile, appendHMAC(data, c.hmacSecret), 0600); err != nil {
log.Warn().Msgf("unable to write list cache: %v", err)
}
}
}
func (c *listCache) deleteListCache(ctx context.Context) {
if c.cacheFile != "" {
os.Remove(c.cacheFile) //nolint:errcheck
}
}
func (c *listCache) readBlocksFromCache(ctx context.Context) (*cachedList, error) {
if !shouldUseListCache(ctx) {
return nil, storage.ErrBlockNotFound
}
ci := &cachedList{}
data, err := ioutil.ReadFile(c.cacheFile)
if err != nil {
if os.IsNotExist(err) {
return nil, storage.ErrBlockNotFound
}
return nil, err
}
data, err = verifyAndStripHMAC(data, c.hmacSecret)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &ci); err != nil {
return nil, fmt.Errorf("can't unmarshal cached list results: %v", err)
}
return ci, nil
}
func newListCache(ctx context.Context, st storage.Storage, caching CachingOptions) (*listCache, error) {
var listCacheFile string
if caching.CacheDirectory != "" {
listCacheFile = filepath.Join(caching.CacheDirectory, "list")
if _, err := os.Stat(caching.CacheDirectory); os.IsNotExist(err) {
if err := os.MkdirAll(caching.CacheDirectory, 0700); err != nil {
return nil, err
}
}
}
c := &listCache{
st: st,
cacheFile: listCacheFile,
hmacSecret: caching.HMACSecret,
listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second,
}
if caching.IgnoreListCache {
c.deleteListCache(ctx)
}
return c, nil
}

View File

@@ -1,234 +0,0 @@
package block
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"sort"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/kopia/kopia/storage"
)
const (
sweepCacheFrequency = 1 * time.Minute
fullListCacheItem = "list"
)
type localStorageCache struct {
st storage.Storage
cacheStorage storage.Storage
maxSizeBytes int64
listCacheDuration time.Duration
hmacSecret []byte
mu sync.Mutex
lastTotalSizeBytes int64
closed chan struct{}
}
func (c *localStorageCache) getContentBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error) {
// block IDs with odd length have a single-byte prefix.
// move the prefix to the end of cache key to make sure the top level shard is spread 256 ways.
if len(cacheKey)%2 == 1 {
cacheKey = cacheKey[1:] + cacheKey[0:1]
}
useCache := shouldUseBlockCache(ctx)
if useCache {
b, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
if err == nil {
b, err = c.verifyHMAC(b)
if err == nil {
// retrieved from cache and HMAC valid
return b, nil
}
// ignore malformed blocks
log.Warn().Msgf("malformed block %v: %v", cacheKey, err)
} else if err != storage.ErrBlockNotFound {
log.Warn().Msgf("unable to read cache %v: %v", cacheKey, err)
}
}
b, err := c.st.GetBlock(ctx, physicalBlockID, offset, length)
if err == storage.ErrBlockNotFound {
// not found in underlying storage
return nil, err
}
if err == nil && useCache {
c.writeToCacheBestEffort(ctx, cacheKey, b)
}
return b, err
}
func (c *localStorageCache) writeToCacheBestEffort(ctx context.Context, cacheKey string, data []byte) {
rdr := io.MultiReader(
bytes.NewReader(data),
bytes.NewReader(c.computeHMAC(data)),
)
if err := c.cacheStorage.PutBlock(ctx, cacheKey, rdr); err != nil {
log.Warn().Msgf("unable to write cache item %v: %v", cacheKey, err)
}
}
func (c *localStorageCache) listIndexBlocks(ctx context.Context) ([]IndexInfo, error) {
ci, err := c.readBlocksFromCacheBlock(ctx, fullListCacheItem)
if err == nil {
expirationTime := ci.Timestamp.Add(c.listCacheDuration)
if time.Now().Before(expirationTime) {
log.Debug().Str("blockID", fullListCacheItem).Msg("retrieved index blocks from cache")
return ci.Blocks, nil
}
} else if err != storage.ErrBlockNotFound {
log.Warn().Err(err).Msgf("unable to open cache file")
}
log.Debug().Msg("listing index blocks from source")
blocks, err := listIndexBlocksFromStorage(ctx, c.st)
if err == nil {
c.saveListToCache(ctx, fullListCacheItem, &cachedList{
Blocks: blocks,
Timestamp: time.Now(),
})
}
return blocks, err
}
func (c *localStorageCache) saveListToCache(ctx context.Context, cachedListBlockID string, ci *cachedList) {
log.Debug().Str("blockID", cachedListBlockID).Int("count", len(ci.Blocks)).Msg("saving index blocks to cache")
if data, err := json.Marshal(ci); err == nil {
c.writeToCacheBestEffort(ctx, cachedListBlockID, data)
}
}
func (c *localStorageCache) readBlocksFromCacheBlock(ctx context.Context, blockID string) (*cachedList, error) {
if !shouldUseListCache(ctx) {
return nil, storage.ErrBlockNotFound
}
ci := &cachedList{}
data, err := c.cacheStorage.GetBlock(ctx, blockID, 0, -1)
if err != nil {
return nil, err
}
data, err = c.verifyHMAC(data)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &ci); err != nil {
return nil, fmt.Errorf("can't unmarshal cached list results: %v", err)
}
return ci, nil
}
func (c *localStorageCache) computeHMAC(data []byte) []byte {
h := hmac.New(sha256.New, c.hmacSecret)
h.Write(data) // nolint:errcheck
return h.Sum(nil)
}
func (c *localStorageCache) verifyHMAC(b []byte) ([]byte, error) {
if len(b) < sha256.Size {
return nil, errors.New("invalid data - too short")
}
p := len(b) - sha256.Size
data := b[0:p]
signature := b[p:]
validSignature := c.computeHMAC(data)
if len(signature) != len(validSignature) {
return nil, errors.New("invalid signature length")
}
if hmac.Equal(validSignature, signature) {
return data, nil
}
return nil, errors.New("invalid data - corrupted")
}
func (c *localStorageCache) close() error {
close(c.closed)
return nil
}
func (c *localStorageCache) sweepDirectoryPeriodically(ctx context.Context) {
for {
select {
case <-c.closed:
return
case <-time.After(sweepCacheFrequency):
err := c.sweepDirectory(ctx)
if err != nil {
log.Printf("warning: blockCache sweep failed: %v", err)
}
}
}
}
func (c *localStorageCache) sweepDirectory(ctx context.Context) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.maxSizeBytes == 0 {
return nil
}
t0 := time.Now()
log.Debug().Msg("sweeping cache")
ctx, cancel := context.WithCancel(ctx)
ch := c.cacheStorage.ListBlocks(ctx, "")
defer cancel()
var items []storage.BlockMetadata
for it := range ch {
if it.Error != nil {
return fmt.Errorf("error listing cache: %v", it.Error)
}
items = append(items, it)
}
sort.Slice(items, func(i, j int) bool {
return items[i].TimeStamp.After(items[j].TimeStamp)
})
var totalRetainedSize int64
for _, it := range items {
if totalRetainedSize > c.maxSizeBytes {
if err := c.cacheStorage.DeleteBlock(ctx, it.BlockID); err != nil {
log.Warn().Msgf("unable to remove %v: %v", it.BlockID, err)
}
} else {
totalRetainedSize += it.Length
}
}
log.Debug().Msgf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes)
c.lastTotalSizeBytes = totalRetainedSize
return nil
}
func (c *localStorageCache) deleteListCache(ctx context.Context) {
if err := c.cacheStorage.DeleteBlock(ctx, fullListCacheItem); err != nil && err != storage.ErrBlockNotFound {
log.Warn().Err(err).Msg("unable to delete cache item")
}
}

View File

@@ -1,26 +0,0 @@
package block
import (
"context"
"github.com/kopia/kopia/storage"
)
type nullBlockCache struct {
st storage.Storage
}
func (c nullBlockCache) getContentBlock(ctx context.Context, cacheKey string, blockID string, offset, length int64) ([]byte, error) {
return c.st.GetBlock(ctx, blockID, offset, length)
}
func (c nullBlockCache) listIndexBlocks(ctx context.Context) ([]IndexInfo, error) {
return listIndexBlocksFromStorage(ctx, c.st)
}
func (c nullBlockCache) deleteListCache(ctx context.Context) {
}
func (c nullBlockCache) close() error {
return nil
}