Improvements for dealing with eventually-consistent stores (S3) (#437)

* content: added support for cache of own writes

Thi keeps track of which blobs (n and m) have been written by the
local repository client, so that even if the storage listing
is eventually consistent (as in S3), we get somewhat sane behavior.

Note that this is still assumming read-after-create semantics, which
S3 also guarantees, otherwise it's very hard to do anything useful.

* compaction: support for compaction logs

Instead of compaction immediately deleting source index blobs, we now
write log entries (with `m` prefix) which are merged on reads
and applied only if the blob list includes all inputs and outputs, in
which case the inputs are discarded since they are known to have been
superseded by the outputs.

This addresses eventual consistency issues in stores such as S3,
which don't guarantee list-after-put or list-after-delete. With such
stores the repository is ultimately eventually consistent and there's
not much that can be done about it, unless we use second strongly
consistent storage (such as GCS) for the index only.

* content: updated list cache to cache both `n` and `m`

* repo: fixed cache clear on windows

Clearing cache requires closing repository first, as Windows is holding
the files locked.

This requires ability to close the repository twice.

* content: refactored index blob management into indexBlobManager

* testing: fixed blobtesting.Map storage to allow overwrites

* blob: added debug output String() to blob.Metadata

* testing: added indexBlobManager stress test

This works by using N parallel "actors", each repeatedly performing
operations on indexBlobManagers all sharing single eventually consistent
storage.

Each actor runs in a loop and randomly selects between:

- *reading* all contents in indexes and verifying that it includes
  all contents written by the actor so far and that contents are
  correctly marked as deleted
- *creating* new contents
- *deleting* one of previously-created contents (by the same actor)
- *compacting* all index files into one

The test runs on accelerated time (every read of time moves it by 0.1
seconds) and simulates several hours of running.

In case of a failure, the log should provide enough debugging
information to trace the exact sequence of events leading up to the
failure - each log line is prefixed with actorID and all storage
access is logged.

* makefile: increase test timeout

* content: fixed index blob manager race

The race is where if we delete compaction log too early, it may lead to
previously deleted contents becoming temporarily live again to an
outside observer.

Added test case that reproduces the issue, verified that it fails
without the fix and passed with one.

* testing: improvements to TestIndexBlobManagerStress test

- better logging to be able to trace the root cause in case of a failure
- prevented concurrent compaction which is unsafe:

The sequence:

1. A creates contentA1 in INDEX-1
2. B creates contentB1 in INDEX-2
3. A deletes contentA1 in INDEX-3
4. B does compaction, but is not seeing INDEX-3 (due to EC or simply
   because B started read before #3 completed), so it writes
   INDEX-4==merge(INDEX-1,INDEX-2)
   * INDEX-4 has contentA1 as active
5. A does compaction but it's not seeing INDEX-4 yet (due to EC
   or because read started before #4), so it drops contentA1, writes
   INDEX-5=merge(INDEX-1,INDEX-2,INDEX-3)
   * INDEX-5 does not have contentA1
7. C sees INDEX-5 and INDEX-5 and merge(INDEX-4,INDEX-5)
   contains contentA1 which is wrong, because A has been deleted
   (and there's no record of it anywhere in the system)

* content: when building pack index ensure index bytes are different each time by adding 32 random bytes
This commit is contained in:
Jarek Kowalski
2020-05-31 17:11:20 -07:00
committed by GitHub
parent 9123e3ebff
commit d68273a576
32 changed files with 1911 additions and 390 deletions

View File

@@ -165,10 +165,10 @@ test-with-coverage-pkgonly:
$(GO_TEST) -count=1 -coverprofile=tmp.cov -timeout 90s github.com/kopia/kopia/...
test:
$(GO_TEST) -count=1 -timeout 90s ./...
$(GO_TEST) -count=1 -timeout 180s ./...
vtest:
$(GO_TEST) -count=1 -short -v -timeout 90s ./...
$(GO_TEST) -count=1 -short -v -timeout 180s ./...
dist-binary:
go build -o $(KOPIA_INTEGRATION_EXE) github.com/kopia/kopia

View File

@@ -6,6 +6,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/repo"
)
@@ -17,7 +18,14 @@ func runCacheClearCommand(ctx context.Context, rep *repo.DirectRepository) error
if d := rep.Content.CachingOptions.CacheDirectory; d != "" {
printStderr("Clearing cache directory: %v.\n", d)
err := os.RemoveAll(d)
// close repository before removing cache
if err := rep.Close(ctx); err != nil {
return errors.Wrap(err, "unable to close repository")
}
err := retry.WithExponentialBackoffNoValue(ctx, "delete cache", func() error {
return os.RemoveAll(d)
}, retry.Always)
if err != nil {
return err
}

View File

@@ -19,7 +19,7 @@
)
func runCacheSetCommand(ctx context.Context, rep *repo.DirectRepository) error {
opts := rep.Content.CachingOptions
opts := rep.Content.CachingOptions.CloneOrDefault()
changed := 0

View File

@@ -9,13 +9,14 @@
)
var (
blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default()
blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob summary").Bool()
blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name")
blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default()
blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob summary").Bool()
blockIndexListIncludeSuperseded = blockIndexListCommand.Flag("superseded", "Include inactive index files superseded by compaction").Bool()
blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name")
)
func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository) error {
blks, err := rep.Content.IndexBlobs(ctx)
blks, err := rep.Content.IndexBlobs(ctx, *blockIndexListIncludeSuperseded)
if err != nil {
return err
}
@@ -36,11 +37,11 @@ func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository)
}
for _, b := range blks {
fmt.Printf("%-70v %10v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp))
fmt.Printf("%-40v %10v %v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp), b.Superseded)
}
if *blockIndexListSummary {
fmt.Printf("total %v blocks\n", len(blks))
fmt.Printf("total %v indexes\n", len(blks))
}
return nil

View File

@@ -3,7 +3,9 @@
import (
"context"
"io/ioutil"
"math"
"math/rand"
"strings"
"sync"
"time"
@@ -77,10 +79,12 @@ func (e *ecCacheEntry) isValid() bool {
type eventuallyConsistentStorage struct {
mu sync.Mutex
listDropProbability float64
recentlyDeleted sync.Map
listSettleTime time.Duration
caches []*ecFrontendCache
realStorage blob.Storage
timeNow func() time.Time
}
func (s *eventuallyConsistentStorage) randomFrontendCache() *ecFrontendCache {
@@ -163,27 +167,89 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d
func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
s.randomFrontendCache().put(id, nil)
// capture metadata before deleting
md, err := s.realStorage.GetMetadata(ctx, id)
if errors.Is(err, blob.ErrBlobNotFound) {
return blob.ErrBlobNotFound
}
if err != nil {
return err
}
if err := s.realStorage.DeleteBlob(ctx, id); err != nil {
return err
}
md.Timestamp = s.timeNow()
s.recentlyDeleted.Store(id, md)
return nil
}
func (s *eventuallyConsistentStorage) shouldApplyInconsistency(ctx context.Context, age time.Duration, desc string) bool {
if age < 0 {
age = -age
}
if age >= s.listSettleTime {
return false
}
x := age.Seconds() / s.listSettleTime.Seconds() // [0..1)
// y=1-(x^0.3) is:
// about 50% probability of inconsistency after 10% of listSettleTime
// about 25% probability of inconsistency after 40% of listSettleTime
// about 10% probability of inconsistency after 67% of listSettleTime
// about 1% probability of inconsistency after 95% of listSettleTime
const power = 0.3
prob := 1 - math.Pow(x, power)
if rand.Float64() < prob {
log(ctx).Debugf("applying inconsistency %v (probability %v)", desc, prob)
return true
}
return false
}
func (s *eventuallyConsistentStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error {
return s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
e := s.randomFrontendCache().get(bm.BlobID)
if e != nil {
// item recently manipulated by the cache, skip from the results with some
// probability
if rand.Float64() < s.listDropProbability {
// skip callback if locally deleted
return nil
}
now := s.timeNow()
if err := s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "hide recently created "+string(bm.BlobID)) {
return nil
}
return callback(bm)
}); err != nil {
return err
}
var resultErr error
// process recently deleted items and resurrect them with some probability
s.recentlyDeleted.Range(func(key, value interface{}) bool {
blobID := key.(blob.ID)
if !strings.HasPrefix(string(blobID), string(prefix)) {
return true
}
bm := value.(blob.Metadata)
if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "resurrect recently deleted "+string(bm.BlobID)) {
if resultErr = callback(bm); resultErr != nil {
return false
}
}
return true
})
return resultErr
}
func (s *eventuallyConsistentStorage) Close(ctx context.Context) error {
@@ -196,10 +262,11 @@ func (s *eventuallyConsistentStorage) ConnectionInfo() blob.ConnectionInfo {
// NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top
// of provided storage.
func NewEventuallyConsistentStorage(st blob.Storage, listDropProbability float64) blob.Storage {
func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage {
return &eventuallyConsistentStorage{
realStorage: st,
caches: make([]*ecFrontendCache, 4),
listDropProbability: listDropProbability,
realStorage: st,
caches: make([]*ecFrontendCache, 4),
listSettleTime: listSettleTime,
timeNow: timeNow,
}
}

View File

@@ -69,10 +69,6 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) e
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.data[id]; ok {
return nil
}
s.keyTime[id] = s.timeNow()
var b bytes.Buffer

View File

@@ -86,3 +86,17 @@ func ContextWithLevel(t testingT, level Level) context.Context {
return &testLogger{t, "[" + module + "] ", level}
})
}
// ContextWithLevelAndPrefix returns a context with attached logger that emits all log entries with given log level or above.
func ContextWithLevelAndPrefix(t testingT, level Level, prefix string) context.Context {
return logging.WithLogger(context.Background(), func(module string) logging.Logger {
return &testLogger{t, "[" + module + "] " + prefix, level}
})
}
// ContextWithLevelAndPrefixFunc returns a context with attached logger that emits all log entries with given log level or above.
func ContextWithLevelAndPrefixFunc(t testingT, level Level, prefixFunc func() string) context.Context {
return logging.WithLogger(context.Background(), func(module string) logging.Logger {
return &testLogger{t, "[" + module + "] " + prefixFunc(), level}
})
}

View File

@@ -22,9 +22,9 @@ func (s *loggingStorage) GetBlob(ctx context.Context, id blob.ID, offset, length
dt := time.Since(t0)
if len(result) < maxLoggedBlobLength {
s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%#v, %#v) took %v", id, offset, length, result, err, dt)
s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%v, %#v) took %v", id, offset, length, result, err, dt)
} else {
s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%#v bytes}, %#v) took %v", id, offset, length, len(result), err, dt)
s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%v bytes}, %#v) took %v", id, offset, length, len(result), err, dt)
}
return result, err
@@ -35,7 +35,7 @@ func (s *loggingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta
result, err := s.base.GetMetadata(ctx, id)
dt := time.Since(t0)
s.printf(s.prefix+"GetMetadata(%q)=(%#v, %#v) took %v", id, result, err, dt)
s.printf(s.prefix+"GetMetadata(%q)=(%v, %#v) took %v", id, result, err, dt)
return result, err
}

View File

@@ -2,6 +2,7 @@
import (
"context"
"encoding/json"
"io"
"sync"
"time"
@@ -62,9 +63,14 @@ type Storage interface {
// Metadata represents metadata about a single BLOB in a storage.
type Metadata struct {
BlobID ID
Length int64
Timestamp time.Time
BlobID ID `json:"id"`
Length int64 `json:"length"`
Timestamp time.Time `json:"timestamp"`
}
func (m *Metadata) String() string {
b, _ := json.Marshal(m)
return string(b)
}
// ErrBlobNotFound is returned when a BLOB cannot be found in storage.
@@ -123,55 +129,3 @@ func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Stora
// return first error or nil
return <-errch
}
// ListAllBlobsConsistent lists all blobs with given name prefix in the provided storage until the results are
// consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while
// the first scan was in progress, no new blob was added or removed.
// maxAttempts specifies maximum number of list attempts (must be >= 2)
func ListAllBlobsConsistent(ctx context.Context, st Storage, prefix ID, maxAttempts int) ([]Metadata, error) {
var previous []Metadata
for i := 0; i < maxAttempts; i++ {
result, err := ListAllBlobs(ctx, st, prefix)
if err != nil {
return nil, err
}
if i > 0 && sameBlobs(result, previous) {
return result, nil
}
previous = result
}
return nil, errors.Errorf("unable to achieve consistent snapshot despite %v attempts", maxAttempts)
}
// sameBlobs returns true if b1 & b2 contain the same blobs (ignoring order).
func sameBlobs(b1, b2 []Metadata) bool {
if len(b1) != len(b2) {
return false
}
m := map[ID]Metadata{}
for _, b := range b1 {
m[b.BlobID] = normalizeMetadata(b)
}
for _, b := range b2 {
if r := m[b.BlobID]; r != normalizeMetadata(b) {
return false
}
}
return true
}
func normalizeMetadata(m Metadata) Metadata {
return Metadata{m.BlobID, m.Length, normalizeTimestamp(m.Timestamp)}
}
func normalizeTimestamp(t time.Time) time.Time {
return time.Unix(0, t.UnixNano())
}

View File

@@ -1,58 +0,0 @@
package blob_test
import (
"testing"
"time"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
)
func TestListAllBlobsConsistent(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, time.Now)
st.PutBlob(ctx, "foo1", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck
st.PutBlob(ctx, "foo2", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck
st.PutBlob(ctx, "foo3", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck
// set up faulty storage that will add a blob while a scan is in progress.
f := &blobtesting.FaultyStorage{
Base: st,
Faults: map[string][]*blobtesting.Fault{
"ListBlobsItem": {
{ErrCallback: func() error {
st.PutBlob(ctx, "foo0", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck
return nil
}},
},
},
}
r, err := blob.ListAllBlobsConsistent(ctx, f, "foo", 3)
if err != nil {
t.Fatalf("error: %v", err)
}
// make sure we get the list with 4 items, not 3.
if got, want := len(r), 4; got != want {
t.Errorf("unexpected list result count: %v, want %v", got, want)
}
}
func TestListAllBlobsConsistentEmpty(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, time.Now)
r, err := blob.ListAllBlobsConsistent(ctx, st, "foo", 3)
if err != nil {
t.Fatalf("error: %v", err)
}
if got, want := len(r), 0; got != want {
t.Errorf("unexpected list result count: %v, want %v", got, want)
}
}

View File

@@ -63,7 +63,7 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s
lc.Username = getDefaultUserName(ctx)
}
if err = setupCaching(ctx, configFile, &lc, opt.CachingOptions, f.UniqueID); err != nil {
if err = setupCaching(ctx, configFile, &lc, &opt.CachingOptions, f.UniqueID); err != nil {
return errors.Wrap(err, "unable to set up caching")
}
@@ -107,7 +107,9 @@ func verifyConnect(ctx context.Context, configFile, password string, persist boo
return r.Close(ctx)
}
func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt content.CachingOptions, uniqueID []byte) error {
func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt *content.CachingOptions, uniqueID []byte) error {
opt = opt.CloneOrDefault()
if opt.MaxCacheSizeBytes == 0 {
lc.Caching = &content.CachingOptions{}
return nil

View File

@@ -2,6 +2,7 @@
import (
"bufio"
"crypto/rand"
"encoding/binary"
"io"
"sort"
@@ -16,6 +17,7 @@
deletedMarker = 0x80000000
entryFixedHeaderLength = 20
randomSuffixSize = 32
)
// packIndexBuilder prepares and writes content index.
@@ -107,6 +109,15 @@ func (b packIndexBuilder) Build(output io.Writer) error {
return errors.Wrap(err, "error writing extra data")
}
randomSuffix := make([]byte, randomSuffixSize)
if _, err := rand.Read(randomSuffix); err != nil {
return errors.Wrap(err, "error getting random bytes for suffix")
}
if _, err := w.Write(randomSuffix); err != nil {
return errors.Wrap(err, "error writing extra random suffix to ensure indexes are always globally unique")
}
return w.Flush()
}

View File

@@ -6,6 +6,18 @@ type CachingOptions struct {
MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"`
MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"`
IgnoreListCache bool `json:"-"`
HMACSecret []byte `json:"-"`
ownWritesCache ownWritesCache
}
// CloneOrDefault returns a clone of the caching options or empty options for nil.
func (c *CachingOptions) CloneOrDefault() *CachingOptions {
if c == nil {
return &CachingOptions{}
}
c2 := *c
return &c2
}

View File

@@ -133,7 +133,20 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
return true, nil
}
func newCommittedContentIndex(caching CachingOptions) *committedContentIndex {
func (b *committedContentIndex) close() error {
b.mu.Lock()
defer b.mu.Unlock()
for _, pi := range b.inUse {
if err := pi.Close(); err != nil {
return errors.Wrap(err, "unable to close index")
}
}
return nil
}
func newCommittedContentIndex(caching *CachingOptions) *committedContentIndex {
var cache committedContentIndexCache
if caching.CacheDirectory != "" {

View File

@@ -82,7 +82,7 @@ func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) {
c.sweepMutexes()
if err := c.sweepDirectory(ctx); err != nil {
log(ctx).Warningf("cacheBase sweep failed: %v", err)
log(ctx).Warningf("cache sweep failed: %v", err)
}
}
}

View File

@@ -104,7 +104,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
MaxPackSize: maxPackSize,
MasterKey: make([]byte, 32), // zero key, does not matter
Version: 1,
}, CachingOptions{}, time.Now, nil)
}, nil, time.Now, nil)
if err != nil {
t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error())
return

View File

@@ -24,7 +24,7 @@ func TestContentIndexRecovery(t *testing.T) {
}
// delete all index blobs
assertNoError(t, bm.st.ListBlobs(ctx, newIndexBlobPrefix, func(bi blob.Metadata) error {
assertNoError(t, bm.st.ListBlobs(ctx, indexBlobPrefix, func(bi blob.Metadata) error {
log(ctx).Debugf("deleting %v", bi.BlobID)
return bm.st.DeleteBlob(ctx, bi.BlobID)
}))

View File

@@ -44,7 +44,7 @@
const (
parallelFetches = 5 // number of parallel reads goroutines
flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes
newIndexBlobPrefix = "n"
indexBlobPrefix = "n"
defaultMinPreambleLength = 32
defaultMaxPreambleLength = 32
defaultPaddingUnit = 4096
@@ -65,9 +65,8 @@
// IndexBlobInfo is an information about a single index blob managed by Manager.
type IndexBlobInfo struct {
BlobID blob.ID
Length int64
Timestamp time.Time
blob.Metadata
Superseded []blob.Metadata
}
// Manager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store.
@@ -307,12 +306,12 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error {
data := b.Bytes()
dataCopy := append([]byte(nil), data...)
indexBlobID, err := bm.writePackIndexesNew(ctx, data)
indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data)
if err != nil {
return err
}
if err := bm.committedContents.addContent(ctx, indexBlobID, dataCopy, true); err != nil {
if err := bm.committedContents.addContent(ctx, indexBlobMD.BlobID, dataCopy, true); err != nil {
return errors.Wrap(err, "unable to add committed content")
}
@@ -405,6 +404,10 @@ func (bm *Manager) Close(ctx context.Context) error {
return errors.Wrap(err, "error flushing")
}
if err := bm.committedContents.close(); err != nil {
return errors.Wrap(err, "error closed committed content index")
}
bm.contentCache.close()
bm.metadataCache.close()
bm.encryptionBufferPool.Close()
@@ -666,7 +669,7 @@ type ManagerOptions struct {
}
// NewManager creates new content manager with given packing options and a formatter.
func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, options ManagerOptions) (*Manager, error) {
func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options ManagerOptions) (*Manager, error) {
nowFn := options.TimeNow
if nowFn == nil {
nowFn = time.Now // allow:no-inject-time
@@ -675,7 +678,7 @@ func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, cach
return newManagerWithOptions(ctx, st, f, caching, nowFn, options.RepositoryFormatBytes)
}
func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) {
func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) {
if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion {
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion)
}
@@ -689,43 +692,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp
return nil, err
}
dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents")
if err != nil {
return nil, errors.Wrap(err, "unable to initialize data cache storage")
}
dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize content cache")
}
metadataCacheSize := caching.MaxMetadataCacheSizeBytes
if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 {
metadataCacheSize = caching.MaxCacheSizeBytes
}
metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata")
if err != nil {
return nil, errors.Wrap(err, "unable to initialize data cache storage")
}
metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize metadata cache")
}
listCache, err := newListCache(st, caching)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize list cache")
}
contentIndex := newCommittedContentIndex(caching)
mu := &sync.RWMutex{}
m := &Manager{
lockFreeManager: lockFreeManager{
Format: *f,
CachingOptions: caching,
timeNow: timeNow,
maxPackSize: f.MaxPackSize,
encryptor: encryptor,
@@ -733,14 +703,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
contentCache: dataCache,
metadataCache: metadataCache,
listCache: listCache,
st: st,
repositoryFormatBytes: repositoryFormatBytes,
checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
writeFormatVersion: int32(f.Version),
committedContents: contentIndex,
encryptionBufferPool: buf.NewPool(ctx, defaultEncryptionBufferPoolSegmentSize+encryptor.MaxOverhead(), "content-manager-encryption"),
},
@@ -752,9 +718,76 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp
packIndexBuilder: make(packIndexBuilder),
}
if err := setupCaches(ctx, m, caching); err != nil {
return nil, errors.Wrap(err, "unable to set up caches")
}
if err := m.CompactIndexes(ctx, autoCompactionOptions); err != nil {
return nil, errors.Wrap(err, "error initializing content manager")
}
return m, nil
}
func setupCaches(ctx context.Context, m *Manager, caching *CachingOptions) error {
caching = caching.CloneOrDefault()
dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents")
if err != nil {
return errors.Wrap(err, "unable to initialize data cache storage")
}
dataCache, err := newContentCacheForData(ctx, m.st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret)
if err != nil {
return errors.Wrap(err, "unable to initialize content cache")
}
metadataCacheSize := caching.MaxMetadataCacheSizeBytes
if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 {
metadataCacheSize = caching.MaxCacheSizeBytes
}
metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata")
if err != nil {
return errors.Wrap(err, "unable to initialize data cache storage")
}
metadataCache, err := newContentCacheForMetadata(ctx, m.st, metadataCacheStorage, metadataCacheSize)
if err != nil {
return errors.Wrap(err, "unable to initialize metadata cache")
}
listCache, err := newListCache(m.st, caching)
if err != nil {
return errors.Wrap(err, "unable to initialize list cache")
}
if caching.ownWritesCache == nil {
// this is test hook to allow test to specify custom cache
caching.ownWritesCache, err = newOwnWritesCache(ctx, caching, m.timeNow)
if err != nil {
return errors.Wrap(err, "unable to initialize own writes cache")
}
}
contentIndex := newCommittedContentIndex(caching)
// once everything is ready, set it up
m.CachingOptions = *caching
m.contentCache = dataCache
m.metadataCache = metadataCache
m.committedContents = contentIndex
m.indexBlobManager = &indexBlobManagerImpl{
st: m.st,
encryptor: m.encryptor,
hasher: m.hasher,
timeNow: m.timeNow,
ownWritesCache: caching.ownWritesCache,
listCache: listCache,
indexBlobCache: metadataCache,
maxEventualConsistencySettleTime: defaultEventualConsistencySettleTime,
}
return nil
}

View File

@@ -6,6 +6,8 @@
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
)
const verySmallContentFraction = 20 // blobs less than 1/verySmallContentFraction of maxPackSize are considered 'very small'
@@ -87,15 +89,18 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs []
return nil
}
formatLog(ctx).Debugf("compacting %v contents", len(indexBlobs))
formatLog(ctx).Debugf("compacting %v index blobs", len(indexBlobs))
t0 := time.Now() // allow:no-inject-time
bld := make(packIndexBuilder)
var inputs, outputs []blob.Metadata
for _, indexBlob := range indexBlobs {
if err := bm.addIndexBlobsToBuilder(ctx, bld, indexBlob, opt); err != nil {
return err
return errors.Wrap(err, "error adding index to builder")
}
inputs = append(inputs, indexBlob.Metadata)
}
var buf bytes.Buffer
@@ -103,32 +108,33 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs []
return errors.Wrap(err, "unable to build an index")
}
compactedIndexBlob, err := bm.writePackIndexesNew(ctx, buf.Bytes())
compactedIndexBlob, err := bm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes())
if err != nil {
return errors.Wrap(err, "unable to write compacted indexes")
}
formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) // allow:no-inject-time
// compaction wrote index blob that's the same as one of the sources
// it must be a no-op.
for _, indexBlob := range indexBlobs {
if indexBlob.BlobID == compactedIndexBlob {
continue
if indexBlob.BlobID == compactedIndexBlob.BlobID {
formatLog(ctx).Debugf("compaction was a no-op")
return nil
}
}
bm.listCache.deleteListCache()
outputs = append(outputs, compactedIndexBlob)
if err := bm.st.DeleteBlob(ctx, indexBlob.BlobID); err != nil {
log(ctx).Warningf("unable to delete compacted blob %q: %v", indexBlob.BlobID, err)
}
if err := bm.indexBlobManager.registerCompaction(ctx, inputs, outputs); err != nil {
return errors.Wrap(err, "unable to register compaction")
}
return nil
}
func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo, opt CompactOptions) error {
data, err := bm.getIndexBlobInternal(ctx, indexBlob.BlobID)
data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlob.BlobID)
if err != nil {
return err
return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID)
}
index, err := openPackIndex(bytes.NewReader(data))
@@ -147,3 +153,17 @@ func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuil
return nil
}
func addBlobsToIndex(ndx map[blob.ID]*IndexBlobInfo, blobs []blob.Metadata) {
for _, it := range blobs {
if ndx[it.BlobID] == nil {
ndx[it.BlobID] = &IndexBlobInfo{
Metadata: blob.Metadata{
BlobID: it.BlobID,
Length: it.Length,
Timestamp: it.Timestamp,
},
}
}
}
}

View File

@@ -7,7 +7,6 @@
cryptorand "crypto/rand"
"encoding/hex"
"io"
"strings"
"sync"
"time"
@@ -25,11 +24,11 @@ type lockFreeManager struct {
// this one is not lock-free
Stats Stats
listCache *listCache
st blob.Storage
Format FormattingOptions
CachingOptions CachingOptions
indexBlobManager indexBlobManager
contentCache contentCache
metadataCache contentCache
committedContents *committedContentIndex
@@ -67,6 +66,8 @@ func (bm *lockFreeManager) maybeEncryptContentDataForPacking(output *gather.Writ
return errors.Wrap(err, "unable to encrypt")
}
bm.Stats.encrypted(len(data))
output.Append(cipherText)
return nil
@@ -93,32 +94,32 @@ func (bm *lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index
}
if i > 0 {
bm.listCache.deleteListCache()
bm.indexBlobManager.flushCache()
log(ctx).Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i)
time.Sleep(nextSleepTime)
nextSleepTime *= 2
}
contents, err := bm.listCache.listIndexBlobs(ctx)
indexBlobs, err := bm.indexBlobManager.listIndexBlobs(ctx, false)
if err != nil {
return nil, false, err
}
err = bm.tryLoadPackIndexBlobsUnlocked(ctx, contents)
err = bm.tryLoadPackIndexBlobsUnlocked(ctx, indexBlobs)
if err == nil {
var contentIDs []blob.ID
for _, b := range contents {
contentIDs = append(contentIDs, b.BlobID)
var indexBlobIDs []blob.ID
for _, b := range indexBlobs {
indexBlobIDs = append(indexBlobIDs, b.BlobID)
}
var updated bool
updated, err = bm.committedContents.use(ctx, contentIDs)
updated, err = bm.committedContents.use(ctx, indexBlobIDs)
if err != nil {
return nil, false, err
}
return contents, updated, nil
return indexBlobs, updated, nil
}
if err != blob.ErrBlobNotFound {
@@ -129,8 +130,8 @@ func (bm *lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index
return nil, false, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
}
func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, contents []IndexBlobInfo) error {
ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, contents)
func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, indexBlobs []IndexBlobInfo) error {
ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, indexBlobs)
if err != nil {
return err
}
@@ -152,7 +153,7 @@ func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, co
defer wg.Done()
for indexBlobID := range ch {
data, err := bm.getIndexBlobInternal(ctx, indexBlobID)
data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlobID)
if err != nil {
errch <- err
return
@@ -317,37 +318,8 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi
}
// IndexBlobs returns the list of active index blobs.
func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) {
return bm.listCache.listIndexBlobs(ctx)
}
func (bm *lockFreeManager) getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) {
payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1)
if err != nil {
return nil, err
}
iv, err := getIndexBlobIV(blobID)
if err != nil {
return nil, err
}
bm.Stats.readContent(len(payload))
payload, err = bm.encryptor.Decrypt(nil, payload, iv)
bm.Stats.decrypted(len(payload))
if err != nil {
return nil, errors.Wrap(err, "decrypt error")
}
// Since the encryption key is a function of data, we must be able to generate exactly the same key
// after decrypting the content. This serves as a checksum.
if err := bm.verifyChecksum(payload, iv); err != nil {
return nil, err
}
return payload, nil
func (bm *lockFreeManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) {
return bm.indexBlobManager.listIndexBlobs(ctx, includeInactive)
}
func getPackedContentIV(output []byte, contentID ID) ([]byte, error) {
@@ -359,49 +331,12 @@ func getPackedContentIV(output []byte, contentID ID) ([]byte, error) {
return output[0:n], nil
}
func getIndexBlobIV(s blob.ID) ([]byte, error) {
if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic
s = s[0:p]
}
return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):]))
}
func (bm *lockFreeManager) writePackFileNotLocked(ctx context.Context, packFile blob.ID, data gather.Bytes) error {
bm.Stats.wroteContent(data.Length())
bm.listCache.deleteListCache()
return bm.st.PutBlob(ctx, packFile, data)
}
func (bm *lockFreeManager) encryptAndWriteBlobNotLocked(ctx context.Context, data []byte, prefix blob.ID) (blob.ID, error) {
var hashOutput [maxHashSize]byte
hash := bm.hashData(hashOutput[:0], data)
blobID := prefix + blob.ID(hex.EncodeToString(hash))
iv, err := getIndexBlobIV(blobID)
if err != nil {
return "", err
}
bm.Stats.encrypted(len(data))
data2, err := bm.encryptor.Encrypt(nil, data, iv)
if err != nil {
return "", err
}
bm.Stats.wroteContent(len(data2))
bm.listCache.deleteListCache()
if err := bm.st.PutBlob(ctx, blobID, gather.FromSlice(data2)); err != nil {
return "", err
}
return blobID, nil
}
func (bm *lockFreeManager) hashData(output, data []byte) []byte {
// Hash the content and compute encryption key.
contentID := bm.hasher(output, data)
@@ -410,10 +345,6 @@ func (bm *lockFreeManager) hashData(output, data []byte) []byte {
return contentID
}
func (bm *lockFreeManager) writePackIndexesNew(ctx context.Context, data []byte) (blob.ID, error) {
return bm.encryptAndWriteBlobNotLocked(ctx, data, newIndexBlobPrefix)
}
func (bm *lockFreeManager) verifyChecksum(data, contentID []byte) error {
var hashOutput [maxHashSize]byte

View File

@@ -0,0 +1,194 @@
package content
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/filesystem"
)
const ownWritesCacheRetention = 15 * time.Minute
type ownWritesCache interface {
add(ctx context.Context, mb blob.Metadata) error
merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error)
delete(ctx context.Context, md blob.ID) error
}
// nullOwnWritesCache is an implementation of ownWritesCache that ignores all changes.
type nullOwnWritesCache struct {
}
func (n *nullOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error {
return nil
}
func (n *nullOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error {
return nil
}
func (n *nullOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) {
return source, nil
}
// memoryOwnWritesCache is an implementation of ownWritesCache that caches in memory.
type memoryOwnWritesCache struct {
entries sync.Map
timeNow func() time.Time
}
func (n *memoryOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error {
log(ctx).Debugf("adding %v to own-writes cache", mb.BlobID)
n.entries.Store(mb.BlobID, mb)
return nil
}
func (n *memoryOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error {
return n.add(ctx, blob.Metadata{
BlobID: blobID,
Length: -1,
Timestamp: n.timeNow(),
})
}
func (n *memoryOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) {
var result []blob.Metadata
n.entries.Range(func(key, value interface{}) bool {
md := value.(blob.Metadata)
if !strings.HasPrefix(string(md.BlobID), string(prefix)) {
return true
}
if age := n.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention {
result = append(result, md)
} else {
log(ctx).Debugf("deleting stale own writes cache entry: %v (%v)", key, age)
n.entries.Delete(key)
}
return true
})
return mergeOwnWrites(ctx, source, result), nil
}
// persistentOwnWritesCache is an implementation of ownWritesCache that caches entries to strongly consistent blob storage.
type persistentOwnWritesCache struct {
st blob.Storage
timeNow func() time.Time
}
func (d *persistentOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error {
j, err := json.Marshal(mb)
if err != nil {
return errors.Wrap(err, "unable to marshal JSON")
}
return d.st.PutBlob(ctx, mb.BlobID, gather.FromSlice(j))
}
func (d *persistentOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) {
var myWrites []blob.Metadata
err := d.st.ListBlobs(ctx, prefix, func(md blob.Metadata) error {
b, err := d.st.GetBlob(ctx, md.BlobID, 0, -1)
if err == blob.ErrBlobNotFound {
return nil
}
if err != nil {
return errors.Wrapf(err, "error reading own write cache entry %v", md.BlobID)
}
var originalMD blob.Metadata
if err := json.Unmarshal(b, &originalMD); err != nil {
return errors.Wrapf(err, "error unmarshaling own write cache entry %v", md.BlobID)
}
// note that we're assuming that time scale used by timeNow() is the same as used by
// cache storage, which is fine, since the cache is local and not on remote FS.
if age := d.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention {
myWrites = append(myWrites, originalMD)
} else {
log(ctx).Debugf("deleting blob %v from own-write cache because it's too old: %v (%v)", md.BlobID, age, originalMD.Timestamp)
if err := d.st.DeleteBlob(ctx, md.BlobID); err != nil && err != blob.ErrBlobNotFound {
return errors.Wrap(err, "error deleting stale blob")
}
}
return nil
})
return mergeOwnWrites(ctx, source, myWrites), err
}
func (d *persistentOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error {
return d.add(ctx, blob.Metadata{
BlobID: blobID,
Length: -1,
Timestamp: d.timeNow(),
})
}
func mergeOwnWrites(ctx context.Context, source, own []blob.Metadata) []blob.Metadata {
m := map[blob.ID]blob.Metadata{}
for _, v := range source {
m[v.BlobID] = v
}
for _, v := range own {
if v.Length < 0 {
delete(m, v.BlobID)
} else {
m[v.BlobID] = v
}
}
var s []blob.Metadata
for _, v := range m {
s = append(s, v)
}
log(ctx).Debugf("merged %v backend blobs and %v local blobs into %v", source, own, s)
return s
}
func newOwnWritesCache(ctx context.Context, caching *CachingOptions, timeNow func() time.Time) (ownWritesCache, error) {
if caching.CacheDirectory == "" {
return &memoryOwnWritesCache{timeNow: timeNow}, nil
}
dirname := filepath.Join(caching.CacheDirectory, "own-writes")
if err := os.MkdirAll(dirname, 0700); err != nil {
return nil, errors.Wrap(err, "unable to create own writes cache directory")
}
st, err := filesystem.New(ctx, &filesystem.Options{
Path: dirname,
DirectoryShards: []int{},
})
if err != nil {
return nil, errors.Wrap(err, "unable to create own writes cache storage")
}
return &persistentOwnWritesCache{st, timeNow}, nil
}

View File

@@ -22,6 +22,7 @@
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/logging"
)
const (
@@ -202,7 +203,7 @@ func TestContentManagerEmpty(t *testing.T) {
func verifyActiveIndexBlobCount(ctx context.Context, t *testing.T, bm *Manager, expected int) {
t.Helper()
blks, err := bm.IndexBlobs(ctx)
blks, err := bm.IndexBlobs(ctx, false)
if err != nil {
t.Errorf("error listing active index blobs: %v", err)
return
@@ -319,7 +320,7 @@ func TestContentManagerFailedToWritePack(t *testing.T) {
MaxPackSize: maxPackSize,
HMACSecret: []byte("foo"),
MasterKey: []byte("0123456789abcdef0123456789abcdef"),
}, CachingOptions{}, faketime.Frozen(fakeTime), nil)
}, nil, faketime.Frozen(fakeTime), nil)
if err != nil {
t.Fatalf("can't create bm: %v", err)
}
@@ -410,17 +411,13 @@ func TestContentManagerConcurrency(t *testing.T) {
verifyContent(ctx, t, bm4, bm2content, seededRandomData(32, 100))
verifyContent(ctx, t, bm4, bm3content, seededRandomData(33, 100))
if got, want := getIndexCount(data), 4; got != want {
t.Errorf("unexpected index count before compaction: %v, wanted %v", got, want)
}
validateIndexCount(t, data, 4, 0)
if err := bm4.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil {
t.Errorf("compaction error: %v", err)
}
if got, want := getIndexCount(data), 1; got != want {
t.Errorf("unexpected index count after compaction: %v, wanted %v", got, want)
}
validateIndexCount(t, data, 5, 1)
// new content manager at this point can see all data.
bm5 := newTestContentManager(t, data, keyTime, nil)
@@ -437,6 +434,30 @@ func TestContentManagerConcurrency(t *testing.T) {
}
}
func validateIndexCount(t *testing.T, data map[blob.ID][]byte, wantIndexCount, wantCompactionLogCount int) {
t.Helper()
var indexCnt, compactionLogCnt int
for blobID := range data {
if strings.HasPrefix(string(blobID), indexBlobPrefix) {
indexCnt++
}
if strings.HasPrefix(string(blobID), compactionLogBlobPrefix) {
compactionLogCnt++
}
}
if got, want := indexCnt, wantIndexCount; got != want {
t.Fatalf("unexpected index blob count %v, want %v", got, want)
}
if got, want := compactionLogCnt, wantCompactionLogCount; got != want {
t.Fatalf("unexpected compaction log blob count %v, want %v", got, want)
}
}
func TestDeleteContent(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}
@@ -1694,6 +1715,82 @@ func verifyVersionCompat(t *testing.T, writeVersion int) {
verifyContentManagerDataSet(ctx, t, mgr, dataSet)
}
func TestReadsOwnWritesWithEventualConsistencyPersistentOwnWritesCache(t *testing.T) {
data := blobtesting.DataMap{}
timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
st := blobtesting.NewMapStorage(data, nil, timeNow)
cacheData := blobtesting.DataMap{}
cacheKeyTime := map[blob.ID]time.Time{}
cacheSt := blobtesting.NewMapStorage(cacheData, cacheKeyTime, timeNow)
ecst := blobtesting.NewEventuallyConsistentStorage(
logging.NewWrapper(st, t.Logf, "[STORAGE] "),
3*time.Second,
timeNow)
// disable own writes cache, will still be ok if store is strongly consistent
verifyReadsOwnWrites(t, ecst, timeNow, &persistentOwnWritesCache{
st: cacheSt,
timeNow: timeNow,
})
}
func TestReadsOwnWritesWithStrongConsistencyAndNoCaching(t *testing.T) {
data := blobtesting.DataMap{}
timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
st := blobtesting.NewMapStorage(data, nil, timeNow)
// if we used nullOwnWritesCache and eventual consistency, the test would fail
// st = blobtesting.NewEventuallyConsistentStorage(logging.NewWrapper(st, t.Logf, "[STORAGE] "), 0.1)
// disable own writes cache, will still be ok if store is strongly consistent
verifyReadsOwnWrites(t, st, timeNow, &nullOwnWritesCache{})
}
func TestReadsOwnWritesWithEventualConsistencyInMemoryOwnWritesCache(t *testing.T) {
data := blobtesting.DataMap{}
timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
st := blobtesting.NewMapStorage(data, nil, timeNow)
ecst := blobtesting.NewEventuallyConsistentStorage(
logging.NewWrapper(st, t.Logf, "[STORAGE] "),
3*time.Second,
timeNow)
verifyReadsOwnWrites(t, ecst, timeNow, &memoryOwnWritesCache{timeNow: timeNow})
}
func verifyReadsOwnWrites(t *testing.T, st blob.Storage, timeNow func() time.Time, sharedOwnWritesCache ownWritesCache) {
ctx := testlogging.Context(t)
cachingOptions := &CachingOptions{
ownWritesCache: sharedOwnWritesCache,
}
bm := newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow)
ids := make([]ID, 100)
for i := 0; i < len(ids); i++ {
ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2))
for j := 0; j < i; j++ {
// verify all contents written so far
verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2))
}
// every 10 contents, create new content manager
if i%10 == 0 {
t.Logf("------- reopening -----")
must(t, bm.Close(ctx))
bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow)
}
}
must(t, bm.Close(ctx))
bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow)
for i := 0; i < len(ids); i++ {
verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2))
}
}
func verifyContentManagerDataSet(ctx context.Context, t *testing.T, mgr *Manager, dataSet map[ID][]byte) {
for contentID, originalPayload := range dataSet {
v, err := mgr.GetContent(ctx, contentID)
@@ -1714,6 +1811,10 @@ func newTestContentManager(t *testing.T, data blobtesting.DataMap, keyTime map[b
}
func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc func() time.Time) *Manager {
return newTestContentManagerWithStorageAndCaching(t, st, nil, timeFunc)
}
func newTestContentManagerWithStorageAndCaching(t *testing.T, st blob.Storage, co *CachingOptions, timeFunc func() time.Time) *Manager {
if timeFunc == nil {
timeFunc = faketime.AutoAdvance(fakeTime, 1*time.Second)
}
@@ -1724,7 +1825,7 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu
HMACSecret: hmacSecret,
MaxPackSize: maxPackSize,
Version: 1,
}, CachingOptions{}, timeFunc, nil)
}, co, timeFunc, nil)
if err != nil {
panic("can't create content manager: " + err.Error())
}
@@ -1734,18 +1835,6 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu
return bm
}
func getIndexCount(d blobtesting.DataMap) int {
var cnt int
for blobID := range d {
if strings.HasPrefix(string(blobID), newIndexBlobPrefix) {
cnt++
}
}
return cnt
}
func verifyContentNotFound(ctx context.Context, t *testing.T, bm *Manager, contentID ID) {
t.Helper()
@@ -1760,7 +1849,7 @@ func verifyContent(ctx context.Context, t *testing.T, bm *Manager, contentID ID,
b2, err := bm.GetContent(ctx, contentID)
if err != nil {
t.Errorf("unable to read content %q: %v", contentID, err)
t.Fatalf("unable to read content %q: %v", contentID, err)
return
}
@@ -1893,3 +1982,11 @@ func getContentInfo(t *testing.T, bm *Manager, c ID) Info {
return i
}
func must(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -0,0 +1,462 @@
package content
import (
"bytes"
"context"
"crypto/aes"
"encoding/hex"
"encoding/json"
"strings"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
)
// indexBlobManager is the API of index blob manager as used by content manager.
type indexBlobManager interface {
writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error)
listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error)
getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error)
registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error
flushCache()
}
const (
compactionLogBlobPrefix = "m"
cleanupBlobPrefix = "l"
defaultEventualConsistencySettleTime = 1 * time.Hour
)
// compactionLogEntry represents contents of compaction log entry stored in `m` blob.
type compactionLogEntry struct {
// list of input blob names that were compacted together.
InputMetadata []blob.Metadata `json:"inputMetadata"`
// list of blobs that are results of compaction.
OutputMetadata []blob.Metadata `json:"outputMetadata"`
// Metadata of the compaction blob itself, not serialized.
metadata blob.Metadata
}
// cleanupEntry represents contents of cleanup entry stored in `l` blob.
type cleanupEntry struct {
BlobID blob.ID `json:"blobID"`
age time.Duration // not serialized, computed on load
}
type indexBlobManagerImpl struct {
st blob.Storage
hasher hashing.HashFunc
encryptor encryption.Encryptor
listCache *listCache
ownWritesCache ownWritesCache
timeNow func() time.Time
indexBlobCache contentCache
maxEventualConsistencySettleTime time.Duration
}
func (m *indexBlobManagerImpl) listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) {
compactionLogMetadata, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix)
if err != nil {
return nil, errors.Wrap(err, "error listing compaction log entries")
}
compactionLogMetadata, err = m.ownWritesCache.merge(ctx, compactionLogBlobPrefix, compactionLogMetadata)
if err != nil {
return nil, errors.Wrap(err, "error merging local writes for compaction log entries")
}
storageIndexBlobs, err := m.listCache.listBlobs(ctx, indexBlobPrefix)
if err != nil {
return nil, errors.Wrap(err, "error listing index blobs")
}
storageIndexBlobs, err = m.ownWritesCache.merge(ctx, indexBlobPrefix, storageIndexBlobs)
if err != nil {
return nil, errors.Wrap(err, "error merging local writes for index blobs")
}
indexMap := map[blob.ID]*IndexBlobInfo{}
addBlobsToIndex(indexMap, storageIndexBlobs)
compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata)
if err != nil {
return nil, errors.Wrap(err, "error reading compaction log")
}
// remove entries from indexMap that have been compacted and replaced by other indexes.
removeCompactedIndexes(ctx, indexMap, compactionLogs, includeInactive)
var results []IndexBlobInfo
for _, v := range indexMap {
results = append(results, *v)
}
return results, nil
}
func (m *indexBlobManagerImpl) flushCache() {
m.listCache.deleteListCache(indexBlobPrefix)
m.listCache.deleteListCache(compactionLogBlobPrefix)
}
func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error {
logEntryBytes, err := json.Marshal(&compactionLogEntry{
InputMetadata: inputs,
OutputMetadata: outputs,
})
if err != nil {
return errors.Wrap(err, "unable to marshal log entry bytes")
}
compactionLogBlobMetadata, err := m.encryptAndWriteBlob(ctx, logEntryBytes, compactionLogBlobPrefix)
if err != nil {
return errors.Wrap(err, "unable to write compaction log")
}
formatLog(ctx).Debugf("compacted indexes %v into %v and wrote log %v", inputs, outputs, compactionLogBlobMetadata)
if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil {
return errors.Wrap(err, "error deleting old index blobs")
}
return nil
}
func (m *indexBlobManagerImpl) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) {
return m.getEncryptedBlob(ctx, blobID)
}
func (m *indexBlobManagerImpl) getEncryptedBlob(ctx context.Context, blobID blob.ID) ([]byte, error) {
payload, err := m.indexBlobCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1)
if err != nil {
return nil, err
}
iv, err := getIndexBlobIV(blobID)
if err != nil {
return nil, err
}
payload, err = m.encryptor.Decrypt(nil, payload, iv)
if err != nil {
return nil, errors.Wrap(err, "decrypt error")
}
// Since the encryption key is a function of data, we must be able to generate exactly the same key
// after decrypting the content. This serves as a checksum.
if err := m.verifyChecksum(payload, iv); err != nil {
return nil, err
}
return payload, nil
}
func (m *indexBlobManagerImpl) verifyChecksum(data, contentID []byte) error {
var hashOutput [maxHashSize]byte
expected := m.hasher(hashOutput[:0], data)
expected = expected[len(expected)-aes.BlockSize:]
if !bytes.HasSuffix(contentID, expected) {
return errors.Errorf("invalid checksum for blob %x, expected %x", contentID, expected)
}
return nil
}
func (m *indexBlobManagerImpl) writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) {
return m.encryptAndWriteBlob(ctx, data, indexBlobPrefix)
}
func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []byte, prefix blob.ID) (blob.Metadata, error) {
var hashOutput [maxHashSize]byte
hash := m.hasher(hashOutput[:0], data)
blobID := prefix + blob.ID(hex.EncodeToString(hash))
iv, err := getIndexBlobIV(blobID)
if err != nil {
return blob.Metadata{}, err
}
data2, err := m.encryptor.Encrypt(nil, data, iv)
if err != nil {
return blob.Metadata{}, err
}
m.listCache.deleteListCache(prefix)
err = m.st.PutBlob(ctx, blobID, gather.FromSlice(data2))
if err != nil {
return blob.Metadata{}, err
}
bm, err := m.st.GetMetadata(ctx, blobID)
if err != nil {
return blob.Metadata{}, errors.Wrap(err, "unable to get blob metadata")
}
if err := m.ownWritesCache.add(ctx, bm); err != nil {
log(ctx).Warningf("unable to cache own write: %v", err)
}
return bm, nil
}
func (m *indexBlobManagerImpl) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) {
results := map[blob.ID]*compactionLogEntry{}
for _, cb := range blobs {
data, err := m.getEncryptedBlob(ctx, cb.BlobID)
if errors.Is(err, blob.ErrBlobNotFound) {
continue
}
if err != nil {
return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID)
}
le := &compactionLogEntry{}
if err := json.Unmarshal(data, le); err != nil {
return nil, errors.Wrap(err, "unable to read compaction log entry %q")
}
le.metadata = cb
results[cb.BlobID] = le
}
return results, nil
}
func (m *indexBlobManagerImpl) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) {
results := map[blob.ID]*cleanupEntry{}
for _, cb := range blobs {
data, err := m.getEncryptedBlob(ctx, cb.BlobID)
if errors.Is(err, blob.ErrBlobNotFound) {
continue
}
if err != nil {
return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID)
}
le := &cleanupEntry{}
if err := json.Unmarshal(data, le); err != nil {
return nil, errors.Wrap(err, "unable to read compaction log entry %q")
}
le.age = latestServerBlobTime.Sub(cb.Timestamp)
results[cb.BlobID] = le
}
return results, nil
}
func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata) error {
allCompactionLogBlobs, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix)
if err != nil {
return errors.Wrap(err, "error listing compaction log blobs")
}
// look for server-assigned timestamp of the compaction log entry we just wrote as a reference.
// we're assuming server-generated timestamps are somewhat reasonable and time is moving
compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-m.maxEventualConsistencySettleTime)
compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff)
log(ctx).Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff)
compactionBlobEntries, err := m.getCompactionLogEntries(ctx, compactionBlobs)
if err != nil {
return errors.Wrap(err, "unable to get compaction log entries")
}
allCleanupBlobs, err := m.listCache.listBlobs(ctx, cleanupBlobPrefix)
if err != nil {
return errors.Wrap(err, "error listing cleanup blobs")
}
cleanupEntries, err := m.getCleanupEntries(ctx, latestBlob.Timestamp, allCleanupBlobs)
if err != nil {
return errors.Wrap(err, "error loading cleanup blobs")
}
indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries)
compactionLogBlobsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries)
// note that we must always delete index blobs first before compaction logs
// otherwise we may inadvertedly resurrect an index blob that should have been removed.
if err := m.deleteBlobsFromStorageAndCache(ctx, indexBlobsToDelete); err != nil {
return errors.Wrap(err, "unable to delete compaction logs")
}
compactionLogBlobsToDelayCleanup := m.findCompactionLogBlobsToDelayCleanup(ctx, compactionBlobs)
if err := m.delayCleanupBlobs(ctx, compactionLogBlobsToDelayCleanup); err != nil {
return errors.Wrap(err, "unable to schedule delayed cleanup of blobs")
}
if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogBlobsToDelete); err != nil {
return errors.Wrap(err, "unable to delete compaction logs")
}
if err := m.deleteBlobsFromStorageAndCache(ctx, cleanupBlobsToDelete); err != nil {
return errors.Wrap(err, "unable to delete cleanup blobs")
}
m.flushCache()
return nil
}
func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry) []blob.ID {
tmp := map[blob.ID]bool{}
for _, cl := range entries {
// are the input index blobs in this compaction eligble for deletion?
if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < m.maxEventualConsistencySettleTime {
log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, m.maxEventualConsistencySettleTime)
continue
}
for _, b := range cl.InputMetadata {
log(ctx).Debugf("will delete old index %v compacted to %v", b, cl.OutputMetadata)
tmp[b.BlobID] = true
}
}
var result []blob.ID
for k := range tmp {
result = append(result, k)
}
return result
}
func (m *indexBlobManagerImpl) findCompactionLogBlobsToDelayCleanup(ctx context.Context, compactionBlobs []blob.Metadata) []blob.ID {
var result []blob.ID
for _, cb := range compactionBlobs {
log(ctx).Debugf("will delete compaction log blob %v", cb)
result = append(result, cb.BlobID)
}
return result
}
func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry) (compactionLogs, cleanupBlobs []blob.ID) {
for _, e := range entries {
if e.age > m.maxEventualConsistencySettleTime {
compactionLogs = append(compactionLogs, e.BlobID)
cleanupBlobs = append(cleanupBlobs, e.BlobID)
}
}
return
}
func (m *indexBlobManagerImpl) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID) error {
for _, b := range blobIDs {
payload, err := json.Marshal(&cleanupEntry{
BlobID: b,
})
if err != nil {
return errors.Wrap(err, "unable to marshal cleanup log bytes")
}
if _, err := m.encryptAndWriteBlob(ctx, payload, cleanupBlobPrefix); err != nil {
return errors.Wrap(err, "unable to cleanup log")
}
}
return nil
}
func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
for _, blobID := range blobIDs {
if err := m.st.DeleteBlob(ctx, blobID); err != nil && err != blob.ErrBlobNotFound {
return errors.Wrapf(err, "unable to delete blob %v", blobID)
}
if err := m.ownWritesCache.delete(ctx, blobID); err != nil {
return errors.Wrapf(err, "unable to delete blob %v from own-writes cache", blobID)
}
}
return nil
}
func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
var res []blob.Metadata
for _, m := range m {
if !m.Timestamp.After(cutoffTime) {
res = append(res, m)
}
}
return res
}
func getIndexBlobIV(s blob.ID) ([]byte, error) {
if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic
s = s[0:p]
}
return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):]))
}
func removeCompactedIndexes(ctx context.Context, m map[blob.ID]*IndexBlobInfo, compactionLogs map[blob.ID]*compactionLogEntry, markAsSuperseded bool) {
var validCompactionLogs []*compactionLogEntry
for _, cl := range compactionLogs {
// only process compaction logs for which we have found all the outputs.
haveAllOutputs := true
for _, o := range cl.OutputMetadata {
if m[o.BlobID] == nil {
haveAllOutputs = false
log(ctx).Debugf("blob %v referenced by compaction log is not found", o.BlobID)
break
}
}
if haveAllOutputs {
validCompactionLogs = append(validCompactionLogs, cl)
}
}
// now remove all inputs from the set if there's a valid compaction log entry with all the outputs.
for _, cl := range validCompactionLogs {
for _, ib := range cl.InputMetadata {
if md := m[ib.BlobID]; md != nil && md.Superseded == nil {
log(ctx).Debugf("ignoring index blob %v (%v) because it's been compacted to %v", ib, md.Timestamp, cl.OutputMetadata)
if markAsSuperseded {
md.Superseded = cl.OutputMetadata
} else {
delete(m, ib.BlobID)
}
}
}
}
}

View File

@@ -0,0 +1,770 @@
package content
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/logging"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
)
// we use two fake time sources - one for local client and one for the remote store
// to simulate clock drift
var (
fakeLocalStartTime = time.Date(2020, 1, 1, 14, 0, 0, 0, time.UTC)
fakeStoreStartTime = time.Date(2020, 1, 1, 10, 0, 0, 0, time.UTC)
)
const (
testIndexBlobDeleteAge = 1 * time.Minute
testEventualConsistencySettleTime = 45 * time.Second
)
func TestIndexBlobManager(t *testing.T) {
cases := []struct {
storageTimeAdvanceBetweenCompactions time.Duration
wantIndexCount int
wantCompactionLogCount int
wantCleanupCount int
}{
{
// we write 6 index blobs and 2 compaction logs
// but not enough time has passed to delete anything
storageTimeAdvanceBetweenCompactions: 0,
wantIndexCount: 6,
wantCompactionLogCount: 2,
},
{
// we write 6 index blobs and 2 compaction logs
// enough time has passed to delete 3 indexes and create cleanup log
storageTimeAdvanceBetweenCompactions: testIndexBlobDeleteAge + 1*time.Second,
wantIndexCount: 3,
wantCompactionLogCount: 2,
wantCleanupCount: 1,
},
}
for _, tc := range cases {
tc := tc
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
// fake underlying blob store with fake time
storageData := blobtesting.DataMap{}
fakeLocalTime := faketime.NewTimeAdvance(fakeLocalStartTime)
fakeStorageTime := faketime.NewTimeAdvance(fakeStoreStartTime)
st := blobtesting.NewMapStorage(storageData, nil, fakeStorageTime.NowFunc())
st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeStorageTime.NowFunc())
m := newIndexBlobManagerForTesting(t, st, fakeLocalTime.NowFunc())
assertIndexBlobList(t, m)
b1 := mustWriteIndexBlob(t, m, "index-1")
assertIndexBlobList(t, m, b1)
fakeStorageTime.Advance(1 * time.Second)
b2 := mustWriteIndexBlob(t, m, "index-2")
assertIndexBlobList(t, m, b1, b2)
fakeStorageTime.Advance(1 * time.Second)
b3 := mustWriteIndexBlob(t, m, "index-3")
assertIndexBlobList(t, m, b1, b2, b3)
fakeStorageTime.Advance(1 * time.Second)
b4 := mustWriteIndexBlob(t, m, "index-4")
assertIndexBlobList(t, m, b1, b2, b3, b4)
fakeStorageTime.Advance(1 * time.Second)
assertBlobCounts(t, storageData, 4, 0, 0)
// first compaction b1+b2+b3=>b4
mustRegisterCompaction(t, m, []blob.Metadata{b1, b2, b3}, []blob.Metadata{b4})
assertIndexBlobList(t, m, b4)
fakeStorageTime.Advance(tc.storageTimeAdvanceBetweenCompactions)
// second compaction b4+b5=>b6
b5 := mustWriteIndexBlob(t, m, "index-5")
b6 := mustWriteIndexBlob(t, m, "index-6")
mustRegisterCompaction(t, m, []blob.Metadata{b4, b5}, []blob.Metadata{b6})
assertIndexBlobList(t, m, b6)
assertBlobCounts(t, storageData, tc.wantIndexCount, tc.wantCompactionLogCount, tc.wantCleanupCount)
})
}
}
type action int
const (
actionWrite = 1
actionRead = 2
actionCompact = 3
actionDelete = 4
actionUndelete = 5
actionCompactAndDropDeleted = 6
)
// actionsTestIndexBlobManagerStress is a set of actionsTestIndexBlobManagerStress by each actor performed in TestIndexBlobManagerStress with weights
var actionsTestIndexBlobManagerStress = []struct {
a action
weight int
}{
{actionWrite, 10},
{actionRead, 10},
{actionCompact, 10},
{actionDelete, 10},
{actionUndelete, 10},
{actionCompactAndDropDeleted, 10},
}
func pickRandomActionTestIndexBlobManagerStress() action {
sum := 0
for _, a := range actionsTestIndexBlobManagerStress {
sum += a.weight
}
n := rand.Intn(sum)
for _, a := range actionsTestIndexBlobManagerStress {
if n < a.weight {
return a.a
}
n -= a.weight
}
panic("impossible")
}
// TestIndexBlobManagerStress launches N actors, each randomly writing new index blobs,
// verifying that all blobs previously written by it are correct and randomly compacting blobs.
// nolint:gocyclo
func TestIndexBlobManagerStress(t *testing.T) {
t.Parallel()
rand.Seed(time.Now().UnixNano())
for i := range actionsTestIndexBlobManagerStress {
actionsTestIndexBlobManagerStress[i].weight = rand.Intn(100)
t.Logf("weight[%v] = %v", i, actionsTestIndexBlobManagerStress[i].weight)
}
var (
fakeTimeFunc = faketime.AutoAdvance(fakeLocalStartTime, 100*time.Millisecond)
deadline time.Time // when (according to fakeTimeFunc should the test finish)
localTimeDeadline time.Time // when (according to time.Now, the test should finish)
)
localTimeDeadline = time.Now().Add(30 * time.Second)
if os.Getenv("CI") != "" {
// when running on CI, simulate 4 hours, this takes about ~15-20 seconds.
deadline = fakeTimeFunc().Add(4 * time.Hour)
} else {
// otherwise test only 1 hour, which still provides decent coverage, takes about 3-5 seconds.
deadline = fakeTimeFunc().Add(1 * time.Hour)
}
// shared storage
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, fakeTimeFunc)
var eg errgroup.Group
numActors := 2
for actorID := 0; actorID < numActors; actorID++ {
actorID := actorID
loggedSt := logging.NewWrapper(st, func(m string, args ...interface{}) {
t.Logf(fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)+m, args...)
}, "")
contentPrefix := fmt.Sprintf("a%v", actorID)
eg.Go(func() error {
numWritten := 0
deletedContents := map[string]bool{}
ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string {
return fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)
})
m := newIndexBlobManagerForTesting(t, loggedSt, fakeTimeFunc)
// run stress test until the deadline, aborting early on any failure
for fakeTimeFunc().Before(deadline) && time.Now().Before(localTimeDeadline) {
switch pickRandomActionTestIndexBlobManagerStress() {
case actionRead:
if err := verifyFakeContentsWritten(ctx, m, numWritten, contentPrefix, deletedContents); err != nil {
return errors.Wrapf(err, "actor[%v] error verifying contents", actorID)
}
case actionWrite:
if err := writeFakeContents(ctx, m, contentPrefix, rand.Intn(10)+5, &numWritten, fakeTimeFunc); err != nil {
return errors.Wrapf(err, "actor[%v] write error", actorID)
}
case actionDelete:
if err := deleteFakeContents(ctx, m, contentPrefix, numWritten, deletedContents, fakeTimeFunc); err != nil {
return errors.Wrapf(err, "actor[%v] delete error", actorID)
}
case actionUndelete:
if err := undeleteFakeContents(ctx, m, deletedContents, fakeTimeFunc); err != nil {
return errors.Wrapf(err, "actor[%v] undelete error", actorID)
}
case actionCompact:
// compaction by more than one actor is unsafe, do it only if actorID == 0
if actorID != 0 {
continue
}
if err := fakeCompaction(ctx, m, false); err != nil {
return errors.Wrapf(err, "actor[%v] compaction error", actorID)
}
case actionCompactAndDropDeleted:
// compaction by more than one actor is unsafe, do it only if actorID == 0
if actorID != 0 {
continue
}
if err := fakeCompaction(ctx, m, true); err != nil {
return errors.Wrapf(err, "actor[%v] compaction error", actorID)
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
t.Errorf("err: %+v", err)
}
}
func TestIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T) {
rand.Seed(time.Now().UnixNano())
// the test is randomized and runs very quickly, run it lots of times
failed := false
for i := 0; i < 100 && !failed; i++ {
t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) {
verifyIndexBlobManagerPreventsResurrectOfDeletedContents(
t, 1*time.Second, 1*time.Second, testIndexBlobDeleteAge, 1*time.Second, 2*time.Second,
)
})
}
}
func TestCompactionCreatesPreviousIndex(t *testing.T) {
rand.Seed(time.Now().UnixNano())
storageData := blobtesting.DataMap{}
fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime)
fakeTimeFunc := fakeTime.NowFunc()
st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc)
st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc)
st = logging.NewWrapper(st, func(msg string, args ...interface{}) {
t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...)
}, "store: ")
m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc)
numWritten := 0
deleted := map[string]bool{}
prefix := "prefix"
ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string {
return fakeTimeFunc().Format("150405.000") + " "
})
// index#1 - add content1
must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc))
fakeTime.Advance(1 * time.Second)
// index#2 - add content2
must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc))
fakeTime.Advance(1 * time.Second)
// index#3 - {content1, content2}, index#1, index#2 marked for deletion
must(t, fakeCompaction(ctx, m, false))
fakeTime.Advance(1 * time.Second)
// index#4 - delete content1
must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc))
fakeTime.Advance(1 * time.Second)
// this will create index identical to index#2,
// we will embed random ID in the index to ensure that they get different blob ID each time.
// otherwise (since indexes are based on hash of content) they would create the same blob ID.
// if this was the case, first compaction marks index#1 as deleted and second compaction
// revives it.
must(t, fakeCompaction(ctx, m, true))
fakeTime.Advance(testEventualConsistencySettleTime)
// if we were not to add randomness to index blobs, this would fail.
must(t, verifyFakeContentsWritten(ctx, m, 2, prefix, deleted))
}
func TestIndexBlobManagerPreventsResurrectOfDeletedContents_RandomizedTimings(t *testing.T) {
rand.Seed(time.Now().UnixNano())
// the test is randomized and runs very quickly, run it lots of times
for i := 0; i < 1000; i++ {
t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) {
verifyIndexBlobManagerPreventsResurrectOfDeletedContents(
t,
randomDuration(10*time.Second),
randomDuration(10*time.Second),
testIndexBlobDeleteAge+randomDuration(testIndexBlobDeleteAge),
randomDuration(10*time.Second),
randomDuration(2*testEventualConsistencySettleTime),
)
})
}
}
func randomDuration(max time.Duration) time.Duration {
return time.Duration(float64(max) * rand.Float64())
}
func verifyIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T, delay1, delay2, delay3, delay4, delay5 time.Duration) {
t.Logf("delays: %v %v %v %v %v", delay1, delay2, delay3, delay4, delay5)
storageData := blobtesting.DataMap{}
fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime)
fakeTimeFunc := fakeTime.NowFunc()
st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc)
st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc)
st = logging.NewWrapper(st, func(msg string, args ...interface{}) {
t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...)
}, "store: ")
m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc)
numWritten := 0
deleted := map[string]bool{}
prefix := "prefix"
ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string {
return fakeTimeFunc().Format("150405.000") + " "
})
// index#1 - write 2 contents
must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc))
fakeTime.Advance(delay1)
// index#2 - delete first of the two contents.
must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc))
fakeTime.Advance(delay2)
// index#3, log#3 - replaces index#1 and #2
must(t, fakeCompaction(ctx, m, true))
fakeTime.Advance(delay3)
numWritten2 := numWritten
// index#4 - create one more content
must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc))
fakeTime.Advance(delay4)
// index#5, log#4 replaces index#3 and index#4, this will delete index#1 and index#2 and log#3
must(t, fakeCompaction(ctx, m, true))
t.Logf("************************************************ VERIFY")
// advance the time just enough for eventual consistency to be visible
fakeTime.Advance(delay5)
// using another reader, make sure that all writes up to numWritten2 are correct regardless of whether
// compaction is visible
another := newIndexBlobManagerForTesting(t, st, fakeTimeFunc)
must(t, verifyFakeContentsWritten(ctx, another, numWritten2, prefix, deleted))
// verify that this reader can see all its own writes regardless of eventual consistency
must(t, verifyFakeContentsWritten(ctx, m, numWritten, prefix, deleted))
// after eventual consistency is settled, another reader can see all our writes
fakeTime.Advance(testEventualConsistencySettleTime)
must(t, verifyFakeContentsWritten(ctx, another, numWritten, prefix, deleted))
}
type fakeContentIndexEntry struct {
ModTime time.Time
Deleted bool
}
func verifyFakeContentsWritten(ctx context.Context, m indexBlobManager, numWritten int, contentPrefix string, deletedContents map[string]bool) error {
if numWritten == 0 {
return nil
}
log(ctx).Debugf("verifyFakeContentsWritten()")
defer log(ctx).Debugf("finished verifyFakeContentsWritten()")
all, _, err := getAllFakeContents(ctx, m)
if err != nil {
return errors.Wrap(err, "error getting all contents")
}
// verify that all contents previously written can be read.
for i := 0; i < numWritten; i++ {
id := fakeContentID(contentPrefix, i)
if _, ok := all[id]; !ok {
if deletedContents[id] {
continue
}
return errors.Errorf("could not find content previously written by itself: %v (got %v)", id, all)
}
if got, want := all[id].Deleted, deletedContents[id]; got != want {
return errors.Errorf("deleted flag does not match for %v: %v want %v", id, got, want)
}
}
return nil
}
func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) error {
log(ctx).Debugf("fakeCompaction(dropDeleted=%v)", dropDeleted)
defer log(ctx).Debugf("finished fakeCompaction(dropDeleted=%v)", dropDeleted)
allContents, allBlobs, err := getAllFakeContents(ctx, m)
if err != nil {
return errors.Wrap(err, "error getting contents")
}
dropped := map[string]fakeContentIndexEntry{}
if dropDeleted {
for cid, e := range allContents {
if e.Deleted {
dropped[cid] = e
delete(allContents, cid)
}
}
}
if len(allBlobs) <= 1 {
return nil
}
outputBM, err := writeFakeIndex(ctx, m, allContents)
if err != nil {
return errors.Wrap(err, "unable to write index")
}
for cid, e := range dropped {
log(ctx).Debugf("dropped deleted %v %v from %v", cid, e, outputBM)
}
var (
inputs []blob.Metadata
outputs = []blob.Metadata{outputBM}
)
for _, bi := range allBlobs {
if bi.BlobID == outputBM.BlobID {
// no compaction, output is the same as one of the inputs
return nil
}
inputs = append(inputs, bi.Metadata)
}
if err := m.registerCompaction(ctx, inputs, outputs); err != nil {
return errors.Wrap(err, "compaction error")
}
return nil
}
func fakeContentID(prefix string, n int) string {
return fmt.Sprintf("%v-%06v", prefix, n)
}
func deleteFakeContents(ctx context.Context, m indexBlobManager, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error {
if numWritten == 0 {
return nil
}
log(ctx).Debugf("deleteFakeContents()")
defer log(ctx).Debugf("finished deleteFakeContents()")
count := rand.Intn(10) + 5
ndx := map[string]fakeContentIndexEntry{}
for i := 0; i < count; i++ {
n := fakeContentID(prefix, rand.Intn(numWritten))
if deleted[n] {
continue
}
ndx[n] = fakeContentIndexEntry{
ModTime: timeFunc(),
Deleted: true,
}
deleted[n] = true
}
if len(ndx) == 0 {
return nil
}
_, err := writeFakeIndex(ctx, m, ndx)
return err
}
func undeleteFakeContents(ctx context.Context, m indexBlobManager, deleted map[string]bool, timeFunc func() time.Time) error {
if len(deleted) == 0 {
return nil
}
log(ctx).Debugf("undeleteFakeContents()")
defer log(ctx).Debugf("finished undeleteFakeContents()")
count := rand.Intn(5)
ndx := map[string]fakeContentIndexEntry{}
for n := range deleted {
if count == 0 {
break
}
// undelete
ndx[n] = fakeContentIndexEntry{
ModTime: timeFunc(),
Deleted: false,
}
delete(deleted, n)
count--
}
if len(ndx) == 0 {
return nil
}
_, err := writeFakeIndex(ctx, m, ndx)
return err
}
func writeFakeContents(ctx context.Context, m indexBlobManager, prefix string, count int, numWritten *int, timeFunc func() time.Time) error {
log(ctx).Debugf("writeFakeContents()")
defer log(ctx).Debugf("finished writeFakeContents()")
ndx := map[string]fakeContentIndexEntry{}
for i := 0; i < count; i++ {
n := fakeContentID(prefix, *numWritten)
ndx[n] = fakeContentIndexEntry{
ModTime: timeFunc(),
}
(*numWritten)++
}
_, err := writeFakeIndex(ctx, m, ndx)
return err
}
type fakeIndexData struct {
RandomID int64
Entries map[string]fakeContentIndexEntry
}
func writeFakeIndex(ctx context.Context, m indexBlobManager, ndx map[string]fakeContentIndexEntry) (blob.Metadata, error) {
j, err := json.Marshal(fakeIndexData{
RandomID: rand.Int63(),
Entries: ndx,
})
if err != nil {
return blob.Metadata{}, errors.Wrap(err, "json error")
}
bm, err := m.writeIndexBlob(ctx, j)
if err != nil {
return blob.Metadata{}, errors.Wrap(err, "error writing blob")
}
for k, v := range ndx {
log(ctx).Debugf("wrote content %v %v in blob %v", k, v, bm)
}
return bm, nil
}
var errGetAllFakeContentsRetry = errors.New("retry")
func getAllFakeContents(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) {
allContents, allBlobs, err := getAllFakeContentsInternal(ctx, m)
for err == errGetAllFakeContentsRetry {
allContents, allBlobs, err = getAllFakeContentsInternal(ctx, m)
}
return allContents, allBlobs, err
}
func getAllFakeContentsInternal(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) {
blobs, err := m.listIndexBlobs(ctx, false)
if err != nil {
return nil, nil, errors.Wrap(err, "error listing index blobs")
}
log(ctx).Debugf("got blobs: %v", blobs)
allContents := map[string]fakeContentIndexEntry{}
for _, bi := range blobs {
bb, err := m.getIndexBlob(ctx, bi.BlobID)
if err == blob.ErrBlobNotFound {
return nil, nil, errGetAllFakeContentsRetry
}
if err != nil {
return nil, nil, errors.Wrap(err, "error reading blob")
}
var indexData fakeIndexData
if err := json.Unmarshal(bb, &indexData); err != nil {
log(ctx).Debugf("invalid JSON %v: %v", string(bb), err)
return nil, nil, errors.Wrap(err, "error unmarshaling")
}
// merge contents based based on time
for k, v := range indexData.Entries {
old, ok := allContents[k]
if !ok {
allContents[k] = v
} else if v.ModTime.After(old.ModTime) {
allContents[k] = v
}
}
}
return allContents, blobs, nil
}
func assertBlobCounts(t *testing.T, data blobtesting.DataMap, wantN, wantM, wantL int) {
t.Helper()
require.Len(t, keysWithPrefix(data, compactionLogBlobPrefix), wantM)
require.Len(t, keysWithPrefix(data, indexBlobPrefix), wantN)
require.Len(t, keysWithPrefix(data, "l"), wantL)
}
func keysWithPrefix(data blobtesting.DataMap, prefix blob.ID) []blob.ID {
var res []blob.ID
for k := range data {
if strings.HasPrefix(string(k), string(prefix)) {
res = append(res, k)
}
}
return res
}
func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs []blob.Metadata) {
t.Logf("compacting %v to %v", inputs, outputs)
err := m.registerCompaction(testlogging.Context(t), inputs, outputs)
if err != nil {
t.Fatalf("failed to write index blob: %v", err)
}
}
func mustWriteIndexBlob(t *testing.T, m indexBlobManager, data string) blob.Metadata {
t.Logf("writing index blob %q", data)
blobMD, err := m.writeIndexBlob(testlogging.Context(t), []byte(data))
if err != nil {
t.Fatalf("failed to write index blob: %v", err)
}
return blobMD
}
func assertIndexBlobList(t *testing.T, m indexBlobManager, wantMD ...blob.Metadata) {
t.Helper()
var want []blob.ID
for _, it := range wantMD {
want = append(want, it.BlobID)
}
l, err := m.listIndexBlobs(testlogging.Context(t), false)
if err != nil {
t.Fatalf("failed to list index blobs: %v", err)
}
t.Logf("asserting blob list %v vs %v", want, l)
var got []blob.ID
for _, it := range l {
got = append(got, it.BlobID)
}
require.ElementsMatch(t, got, want)
}
func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow func() time.Time) indexBlobManager {
p := &FormattingOptions{
Encryption: encryption.DeprecatedNoneAlgorithm,
Hash: hashing.DefaultAlgorithm,
}
enc, err := encryption.CreateEncryptor(p)
if err != nil {
t.Fatalf("unable to create encryptor: %v", err)
}
hf, err := hashing.CreateHashFunc(p)
if err != nil {
t.Fatalf("unable to create hash: %v", err)
}
lc, err := newListCache(st, &CachingOptions{})
if err != nil {
t.Fatalf("unable to create list cache: %v", err)
}
m := &indexBlobManagerImpl{
st: st,
ownWritesCache: &persistentOwnWritesCache{
blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, localTimeNow),
localTimeNow},
indexBlobCache: passthroughContentCache{st},
encryptor: enc,
hasher: hf,
listCache: lc,
timeNow: localTimeNow,
maxEventualConsistencySettleTime: testIndexBlobDeleteAge,
}
return m
}

View File

@@ -1,15 +1,15 @@
package content
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"time"
"github.com/natefinch/atomic"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/hmac"
@@ -18,70 +18,69 @@
type listCache struct {
st blob.Storage
cacheFile string
cacheFilePrefix string
listCacheDuration time.Duration
hmacSecret []byte
}
func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) {
if c.cacheFile != "" {
ci, err := c.readContentsFromCache(ctx)
func (c *listCache) listBlobs(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) {
if c.cacheFilePrefix != "" {
ci, err := c.readBlobsFromCache(ctx, prefix)
if err == nil {
expirationTime := ci.Timestamp.Add(c.listCacheDuration)
if time.Now().Before(expirationTime) { // allow:no-inject-time
log(ctx).Debugf("retrieved list of index blobs from cache")
return ci.Contents, nil
log(ctx).Debugf("retrieved list of %v '%v' index blobs from cache", len(ci.Blobs), prefix)
return ci.Blobs, nil
}
} else if err != blob.ErrBlobNotFound {
log(ctx).Warningf("unable to open cache file: %v", err)
}
}
contents, err := listIndexBlobsFromStorage(ctx, c.st)
blobs, err := blob.ListAllBlobs(ctx, c.st, prefix)
if err == nil {
c.saveListToCache(ctx, &cachedList{
Contents: contents,
c.saveListToCache(ctx, prefix, &cachedList{
Blobs: blobs,
Timestamp: time.Now(), // allow:no-inject-time
})
}
log(ctx).Debugf("found %v index blobs from source", len(contents))
log(ctx).Debugf("listed %v index blobs with prefix %v from source", len(blobs), prefix)
return contents, err
return blobs, err
}
func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) {
if c.cacheFile == "" {
func (c *listCache) saveListToCache(ctx context.Context, prefix blob.ID, ci *cachedList) {
if c.cacheFilePrefix == "" {
return
}
log(ctx).Debugf("saving index blobs to cache: %v", len(ci.Contents))
log(ctx).Debugf("saving %v blobs with prefix %v to cache", len(ci.Blobs), prefix)
if data, err := json.Marshal(ci); err == nil {
mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) // allow:no-inject-time
if err := ioutil.WriteFile(c.cacheFile+mySuffix, hmac.Append(data, c.hmacSecret), 0600); err != nil {
b := hmac.Append(data, c.hmacSecret)
if err := atomic.WriteFile(c.cacheFilePrefix+string(prefix), bytes.NewReader(b)); err != nil {
log(ctx).Warningf("unable to write list cache: %v", err)
}
os.Rename(c.cacheFile+mySuffix, c.cacheFile) //nolint:errcheck
os.Remove(c.cacheFile + mySuffix) //nolint:errcheck
}
}
func (c *listCache) deleteListCache() {
if c.cacheFile != "" {
os.Remove(c.cacheFile) //nolint:errcheck
func (c *listCache) deleteListCache(prefix blob.ID) {
if c.cacheFilePrefix != "" {
os.Remove(c.cacheFilePrefix + string(prefix)) //nolint:errcheck
}
}
func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, error) {
func (c *listCache) readBlobsFromCache(ctx context.Context, prefix blob.ID) (*cachedList, error) {
if !shouldUseListCache(ctx) {
return nil, blob.ErrBlobNotFound
}
ci := &cachedList{}
data, err := ioutil.ReadFile(c.cacheFile)
fname := c.cacheFilePrefix + string(prefix)
data, err := ioutil.ReadFile(fname) //nolint:gosec
if err != nil {
if os.IsNotExist(err) {
return nil, blob.ErrBlobNotFound
@@ -92,7 +91,7 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err
data, err = hmac.VerifyAndStrip(data, c.hmacSecret)
if err != nil {
return nil, errors.Wrapf(err, "invalid file %v", c.cacheFile)
return nil, errors.Wrapf(err, "invalid file %v", fname)
}
if err := json.Unmarshal(data, &ci); err != nil {
@@ -104,36 +103,14 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err
type cachedList struct {
Timestamp time.Time `json:"timestamp"`
Contents []IndexBlobInfo `json:"contents"`
Blobs []blob.Metadata `json:"blobs"`
}
// listIndexBlobsFromStorage returns the list of index blobs in the given storage.
// The list of contents is not guaranteed to be sorted.
func listIndexBlobsFromStorage(ctx context.Context, st blob.Storage) ([]IndexBlobInfo, error) {
snapshot, err := blob.ListAllBlobsConsistent(ctx, st, newIndexBlobPrefix, math.MaxInt32)
if err != nil {
return nil, err
}
var results []IndexBlobInfo
for _, it := range snapshot {
ii := IndexBlobInfo{
BlobID: it.BlobID,
Timestamp: it.Timestamp,
Length: it.Length,
}
results = append(results, ii)
}
return results, err
}
func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) {
var listCacheFile string
func newListCache(st blob.Storage, caching *CachingOptions) (*listCache, error) {
var listCacheFilePrefix string
if caching.CacheDirectory != "" {
listCacheFile = filepath.Join(caching.CacheDirectory, "list")
listCacheFilePrefix = filepath.Join(caching.CacheDirectory, "blob-list-")
if _, err := os.Stat(caching.CacheDirectory); os.IsNotExist(err) {
if err := os.MkdirAll(caching.CacheDirectory, 0700); err != nil {
@@ -144,14 +121,10 @@ func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) {
c := &listCache{
st: st,
cacheFile: listCacheFile,
cacheFilePrefix: listCacheFilePrefix,
hmacSecret: caching.HMACSecret,
listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second,
}
if caching.IgnoreListCache {
c.deleteListCache()
}
return c, nil
}

View File

@@ -117,12 +117,21 @@ func TestPackIndex(t *testing.T) {
data2 := buf2.Bytes()
data3 := buf3.Bytes()
if !bytes.Equal(data1, data2) {
t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1), hex.Dump(data2))
// each build produces exactly idendical prefix except for the trailing random bytes.
data1Prefix := data1[0 : len(data1)-randomSuffixSize]
data2Prefix := data2[0 : len(data2)-randomSuffixSize]
data3Prefix := data3[0 : len(data3)-randomSuffixSize]
if !bytes.Equal(data1Prefix, data2Prefix) {
t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1Prefix), hex.Dump(data2Prefix))
}
if !bytes.Equal(data2, data3) {
t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2), hex.Dump(data3))
if !bytes.Equal(data2Prefix, data3Prefix) {
t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2Prefix), hex.Dump(data3Prefix))
}
if bytes.Equal(data1, data2) {
t.Errorf("builder output expected to be different, but was the same")
}
t.Run("FuzzTest", func(t *testing.T) {

View File

@@ -148,7 +148,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
}
// write some data to storage
bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{})
bm, err := content.NewManager(ctx, st, f, nil, content.ManagerOptions{})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -174,7 +174,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
}
// make a new content manager based on corrupted data.
bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{})
bm, err = content.NewManager(ctx, st, f, nil, content.ManagerOptions{})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -305,7 +305,7 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da
Encryption: encryption.DefaultAlgorithm,
MaxPackSize: 100000,
Version: 1,
}, content.CachingOptions{}, content.ManagerOptions{})
}, nil, content.ManagerOptions{})
if err != nil {
t.Fatalf("can't create content manager: %v", err)
}

View File

@@ -80,7 +80,7 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor
st = loggingwrapper.NewWrapper(st, options.TraceStorage, "[STORAGE] ")
}
r, err := OpenWithConfig(ctx, st, lc, password, options, *lc.Caching)
r, err := OpenWithConfig(ctx, st, lc, password, options, lc.Caching)
if err != nil {
st.Close(ctx) //nolint:errcheck
return nil, err
@@ -103,7 +103,9 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor
}
// OpenWithConfig opens the repository with a given configuration, avoiding the need for a config file.
func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching content.CachingOptions) (*DirectRepository, error) {
func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching *content.CachingOptions) (*DirectRepository, error) {
caching = caching.CloneOrDefault()
// Read format blob, potentially from cache.
fb, err := readAndCacheFormatBlobBytes(ctx, st, caching.CacheDirectory)
if err != nil {
@@ -173,7 +175,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
}
// SetCachingConfig changes caching configuration for a given repository.
func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt content.CachingOptions) error {
func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt *content.CachingOptions) error {
lc, err := loadConfigFromFile(r.ConfigFile)
if err != nil {
return err

View File

@@ -49,6 +49,8 @@ type DirectRepository struct {
timeNow func() time.Time
formatBlob *formatBlob
masterKey []byte
closed bool
}
// DeriveKey derives encryption key of the provided length from the master key.
@@ -114,6 +116,10 @@ func (r *DirectRepository) DeleteManifest(ctx context.Context, id manifest.ID) e
// Close closes the repository and releases all resources.
func (r *DirectRepository) Close(ctx context.Context) error {
if r.closed {
return nil
}
if err := r.Flush(ctx); err != nil {
return errors.Wrap(err, "error flushing")
}
@@ -130,6 +136,8 @@ func (r *DirectRepository) Close(ctx context.Context) error {
return errors.Wrap(err, "error closing blob storage")
}
r.closed = true
return nil
}

View File

@@ -39,9 +39,13 @@ func TestIndexRecover(t *testing.T) {
e.RunAndExpectSuccess(t, "blob", "delete", indexFile)
}
// clear the cache to get rid of cache of own writes.
e.RunAndVerifyOutputLineCount(t, 0, "cache", "clear")
// there should be no index files at this point
e.RunAndVerifyOutputLineCount(t, 0, "index", "ls", "--no-list-caching")
// there should be no blocks, since there are no indexesto find them
// there should be no contents, since there are no indexes to find them
e.RunAndVerifyOutputLineCount(t, 0, "content", "ls")
// now recover index from all blocks

View File

@@ -35,7 +35,7 @@ func TestStressRepository(t *testing.T) {
t.Skip("skipping stress test during short tests")
}
ctx := content.UsingListCache(testlogging.Context(t), false)
ctx := testlogging.Context(t)
tmpPath, err := ioutil.TempDir("", "kopia")
if err != nil {

View File

@@ -31,8 +31,6 @@ func TestStressBlockManager(t *testing.T) {
duration = 30 * time.Second
}
// TODO: use blobtesting.NewEventuallyConsistentStorage(memst, 0.1) instead of memst here
stressTestWithStorage(t, memst, duration)
}
@@ -46,7 +44,7 @@ func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration
Encryption: "AES-256-CTR",
MaxPackSize: 20000000,
MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
}, content.CachingOptions{}, content.ManagerOptions{})
}, nil, content.ManagerOptions{})
}
seed0 := time.Now().Nanosecond()