changed how caching is controlled in block manager by using context instead of options

This commit is contained in:
Jarek Kowalski
2018-06-10 10:20:52 -07:00
parent 9a17179615
commit 846a46b879
6 changed files with 88 additions and 48 deletions

34
block/context.go Normal file
View File

@@ -0,0 +1,34 @@
package block
import "context"
type contextKey string
var useBlockCacheContextKey contextKey = "use-block-cache"
var useListCacheContextKey contextKey = "use-list-cache"
// UsingBlockCache returns a derived context that causes block manager to use cache.
func UsingBlockCache(ctx context.Context, enabled bool) context.Context {
return context.WithValue(ctx, useBlockCacheContextKey, enabled)
}
// UsingListCache returns a derived context that causes block manager to use cache.
func UsingListCache(ctx context.Context, enabled bool) context.Context {
return context.WithValue(ctx, useListCacheContextKey, enabled)
}
func shouldUseBlockCache(ctx context.Context) bool {
if enabled, ok := ctx.Value(useBlockCacheContextKey).(bool); ok {
return enabled
}
return true
}
func shouldUseListCache(ctx context.Context) bool {
if enabled, ok := ctx.Value(useBlockCacheContextKey).(bool); ok {
return enabled
}
return true
}

View File

@@ -42,27 +42,31 @@ func (c *localStorageCache) getContentBlock(ctx context.Context, cacheKey string
if len(cacheKey)%2 == 1 {
cacheKey = cacheKey[1:] + cacheKey[0:1]
}
b, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
if err == nil {
b, err = c.verifyHMAC(b)
if err == nil {
// retrieved from cache and HMAC valid
return b, nil
}
// ignore malformed blocks
log.Warn().Msgf("malformed block %v: %v", cacheKey, err)
} else if err != storage.ErrBlockNotFound {
log.Warn().Msgf("unable to read cache %v: %v", cacheKey, err)
useCache := shouldUseBlockCache(ctx)
if useCache {
b, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
if err == nil {
b, err = c.verifyHMAC(b)
if err == nil {
// retrieved from cache and HMAC valid
return b, nil
}
// ignore malformed blocks
log.Warn().Msgf("malformed block %v: %v", cacheKey, err)
} else if err != storage.ErrBlockNotFound {
log.Warn().Msgf("unable to read cache %v: %v", cacheKey, err)
}
}
b, err = c.st.GetBlock(ctx, physicalBlockID, offset, length)
b, err := c.st.GetBlock(ctx, physicalBlockID, offset, length)
if err == storage.ErrBlockNotFound {
// not found in underlying storage
return nil, err
}
if err == nil {
if err == nil && useCache {
c.writeToCacheBestEffort(ctx, cacheKey, b)
}
@@ -111,6 +115,10 @@ func (c *localStorageCache) saveListToCache(ctx context.Context, cachedListBlock
}
func (c *localStorageCache) readBlocksFromCacheBlock(ctx context.Context, blockID string) (*cachedList, error) {
if !shouldUseListCache(ctx) {
return nil, storage.ErrBlockNotFound
}
ci := &cachedList{}
data, err := c.cacheStorage.GetBlock(ctx, blockID, 0, -1)
if err != nil {

View File

@@ -6,6 +6,8 @@
"os"
"time"
"github.com/kopia/kopia/block"
"github.com/kopia/kopia/repo"
kingpin "gopkg.in/alecthomas/kingpin.v2"
@@ -41,6 +43,8 @@ func noRepositoryAction(act func(ctx context.Context) error) func(ctx *kingpin.P
func repositoryAction(act func(ctx context.Context, rep *repo.Repository) error) func(ctx *kingpin.ParseContext) error {
return func(kpc *kingpin.ParseContext) error {
ctx := context.Background()
ctx = block.UsingBlockCache(ctx, *enableCaching)
ctx = block.UsingListCache(ctx, *enableListCaching)
t0 := time.Now()
rep := mustOpenRepository(ctx, nil)

View File

@@ -9,6 +9,7 @@
"sync"
"time"
"github.com/kopia/kopia/block"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/parallelwork"
"github.com/kopia/kopia/object"
@@ -136,30 +137,16 @@ func (v *verifier) doVerifyDirectory(ctx context.Context, oid object.ID, path st
}
func (v *verifier) doVerifyObject(ctx context.Context, oid object.ID, path string, expectedLength int64) {
readFile := rand.Intn(100) < *verifyCommandFilesPercent
if expectedLength < 0 {
log.Printf("verifying object %v (readFile=%v)", oid, readFile)
log.Printf("verifying object %v", oid)
} else {
log.Printf("verifying object %v (%v) with length %v (readFile=%v)", path, oid, expectedLength, readFile)
log.Printf("verifying object %v (%v) with length %v", path, oid, expectedLength)
}
var length int64
var err error
if readFile {
var r object.Reader
r, err = v.om.Open(ctx, oid)
if err != nil {
v.reportError(path, fmt.Errorf("error verifying %v: %v", oid, err))
return
}
defer r.Close() //nolint:errcheck
length, err = io.Copy(ioutil.Discard, r)
} else {
length, _, err = v.om.VerifyObject(ctx, oid)
}
length, _, err = v.om.VerifyObject(ctx, oid)
if err != nil {
v.reportError(path, fmt.Errorf("error verifying %v: %v", oid, err))
}
@@ -167,6 +154,27 @@ func (v *verifier) doVerifyObject(ctx context.Context, oid object.ID, path strin
if expectedLength >= 0 && length != expectedLength {
v.reportError(path, fmt.Errorf("invalid object length %q, %v, expected %v", oid, length, expectedLength))
}
if rand.Intn(100) < *verifyCommandFilesPercent {
if err := v.readEntireObject(ctx, oid, path); err != nil {
v.reportError(path, fmt.Errorf("error reading object %v: %v", oid, err))
}
}
}
func (v *verifier) readEntireObject(ctx context.Context, oid object.ID, path string) error {
log.Printf("reading object %v %v", oid, path)
ctx = block.UsingBlockCache(ctx, false)
// also read the entire file
r, err := v.om.Open(ctx, oid)
if err != nil {
return err
}
defer r.Close() //nolint:errcheck
_, err = io.Copy(ioutil.Discard, r)
return err
}
func runVerifyCommand(ctx context.Context, rep *repo.Repository) error {

View File

@@ -86,9 +86,6 @@ func applyOptionsFromFlags(opts *repo.Options) *repo.Options {
opts.ObjectManagerOptions.Trace = log.Printf
}
opts.DisableCache = !*enableCaching
opts.DisableListCache = !*enableListCaching
return opts
}

View File

@@ -22,12 +22,9 @@
// Options provides configuration parameters for connection to a repository.
type Options struct {
Credentials auth.Credentials // Provides credentials required to open the repository if not persisted.
CredentialsCallback func() (auth.Credentials, error) // Callback that provides credentials required to open the repository if not persisted.
TraceStorage func(f string, args ...interface{}) // Logs all storage access using provided Printf-style function
DisableCache bool // disable caching
DisableListCache bool // disable list caching
Credentials auth.Credentials // Provides credentials required to open the repository if not persisted.
CredentialsCallback func() (auth.Credentials, error) // Callback that provides credentials required to open the repository if not persisted.
TraceStorage func(f string, args ...interface{}) // Logs all storage access using provided Printf-style function
ObjectManagerOptions object.ManagerOptions
}
@@ -71,15 +68,7 @@ func Open(ctx context.Context, configFile string, options *Options) (rep *Reposi
return nil, fmt.Errorf("cannot open storage: %v", err)
}
caching := lc.Caching
if options.DisableCache {
caching = block.CachingOptions{}
}
if options.DisableListCache {
caching.IgnoreListCache = true
}
r, err := connect(ctx, st, creds, options, caching)
r, err := connect(ctx, st, creds, options, lc.Caching)
if err != nil {
st.Close(ctx) //nolint:errcheck
return nil, err