From d68273a576c29ca6136a190bc1fd922d6ca7fca0 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 31 May 2020 17:11:20 -0700 Subject: [PATCH] Improvements for dealing with eventually-consistent stores (S3) (#437) * content: added support for cache of own writes This keeps track of which blobs (n and m) have been written by the local repository client, so that even if the storage listing is eventually consistent (as in S3), we get somewhat sane behavior. Note that this is still assuming read-after-create semantics, which S3 also guarantees, otherwise it's very hard to do anything useful. * compaction: support for compaction logs Instead of compaction immediately deleting source index blobs, we now write log entries (with `m` prefix) which are merged on reads and applied only if the blob list includes all inputs and outputs, in which case the inputs are discarded since they are known to have been superseded by the outputs. This addresses eventual consistency issues in stores such as S3, which don't guarantee list-after-put or list-after-delete. With such stores the repository is ultimately eventually consistent and there's not much that can be done about it, unless we use a second, strongly consistent storage (such as GCS) for the index only. * content: updated list cache to cache both `n` and `m` * repo: fixed cache clear on Windows Clearing cache requires closing repository first, as Windows is holding the files locked. This requires ability to close the repository twice. * content: refactored index blob management into indexBlobManager * testing: fixed blobtesting.Map storage to allow overwrites * blob: added debug output String() to blob.Metadata * testing: added indexBlobManager stress test This works by using N parallel "actors", each repeatedly performing operations on indexBlobManagers all sharing a single eventually consistent storage. 
Each actor runs in a loop and randomly selects between: - *reading* all contents in indexes and verifying that they include all contents written by the actor so far and that contents are correctly marked as deleted - *creating* new contents - *deleting* one of previously-created contents (by the same actor) - *compacting* all index files into one The test runs on accelerated time (every read of time moves it by 0.1 seconds) and simulates several hours of running. In case of a failure, the log should provide enough debugging information to trace the exact sequence of events leading up to the failure - each log line is prefixed with actorID and all storage access is logged. * makefile: increase test timeout * content: fixed index blob manager race The race occurs when the compaction log is deleted too early, which may lead to previously deleted contents becoming temporarily live again to an outside observer. Added test case that reproduces the issue, verified that it fails without the fix and passes with it. * testing: improvements to TestIndexBlobManagerStress test - better logging to be able to trace the root cause in case of a failure - prevented concurrent compaction which is unsafe: The sequence: 1. A creates contentA1 in INDEX-1 2. B creates contentB1 in INDEX-2 3. A deletes contentA1 in INDEX-3 4. B does compaction, but is not seeing INDEX-3 (due to EC or simply because B started read before #3 completed), so it writes INDEX-4==merge(INDEX-1,INDEX-2) * INDEX-4 has contentA1 as active 5. A does compaction but it's not seeing INDEX-4 yet (due to EC or because read started before #4), so it drops contentA1, writes INDEX-5=merge(INDEX-1,INDEX-2,INDEX-3) * INDEX-5 does not have contentA1 6. 
C sees INDEX-4 and INDEX-5 and merge(INDEX-4,INDEX-5) contains contentA1 which is wrong, because contentA1 has been deleted (and there's no record of it anywhere in the system) * content: when building pack index ensure index bytes are different each time by adding 32 random bytes --- Makefile | 4 +- cli/command_cache_clear.go | 10 +- cli/command_cache_set.go | 2 +- cli/command_index_list.go | 13 +- internal/blobtesting/eventually_consistent.go | 95 ++- internal/blobtesting/map.go | 4 - internal/testlogging/ctx.go | 14 + repo/blob/logging/logging_storage.go | 6 +- repo/blob/storage.go | 64 +- repo/blob/storage_test.go | 58 -- repo/connect.go | 6 +- repo/content/builder.go | 11 + repo/content/caching_options.go | 14 +- repo/content/committed_content_index.go | 15 +- repo/content/content_cache_base.go | 2 +- repo/content/content_formatter_test.go | 2 +- repo/content/content_index_recovery_test.go | 2 +- repo/content/content_manager.go | 123 ++- ...mpaction.go => content_manager_indexes.go} | 48 +- repo/content/content_manager_lock_free.go | 101 +-- repo/content/content_manager_own_writes.go | 194 +++++ repo/content/content_manager_test.go | 141 +++- repo/content/index_blob_manager.go | 462 +++++++++++ repo/content/index_blob_manager_test.go | 770 ++++++++++++++++++ repo/content/list_cache.go | 89 +- repo/content/packindex_test.go | 17 +- repo/manifest/manifest_manager_test.go | 6 +- repo/open.go | 8 +- repo/repository.go | 8 + tests/end_to_end_test/index_recover_test.go | 6 +- .../repository_stress_test.go | 2 +- tests/stress_test/stress_test.go | 4 +- 32 files changed, 1911 insertions(+), 390 deletions(-) delete mode 100644 repo/blob/storage_test.go rename repo/content/{block_manager_compaction.go => content_manager_indexes.go} (75%) create mode 100644 repo/content/content_manager_own_writes.go create mode 100644 repo/content/index_blob_manager.go create mode 100644 repo/content/index_blob_manager_test.go diff --git a/Makefile b/Makefile index 268f1853d..c24843fef 100644 
--- a/Makefile +++ b/Makefile @@ -165,10 +165,10 @@ test-with-coverage-pkgonly: $(GO_TEST) -count=1 -coverprofile=tmp.cov -timeout 90s github.com/kopia/kopia/... test: - $(GO_TEST) -count=1 -timeout 90s ./... + $(GO_TEST) -count=1 -timeout 180s ./... vtest: - $(GO_TEST) -count=1 -short -v -timeout 90s ./... + $(GO_TEST) -count=1 -short -v -timeout 180s ./... dist-binary: go build -o $(KOPIA_INTEGRATION_EXE) github.com/kopia/kopia diff --git a/cli/command_cache_clear.go b/cli/command_cache_clear.go index 45453498c..857c1f24c 100644 --- a/cli/command_cache_clear.go +++ b/cli/command_cache_clear.go @@ -6,6 +6,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/repo" ) @@ -17,7 +18,14 @@ func runCacheClearCommand(ctx context.Context, rep *repo.DirectRepository) error if d := rep.Content.CachingOptions.CacheDirectory; d != "" { printStderr("Clearing cache directory: %v.\n", d) - err := os.RemoveAll(d) + // close repository before removing cache + if err := rep.Close(ctx); err != nil { + return errors.Wrap(err, "unable to close repository") + } + + err := retry.WithExponentialBackoffNoValue(ctx, "delete cache", func() error { + return os.RemoveAll(d) + }, retry.Always) if err != nil { return err } diff --git a/cli/command_cache_set.go b/cli/command_cache_set.go index 2ebfc2dfe..290261bb2 100644 --- a/cli/command_cache_set.go +++ b/cli/command_cache_set.go @@ -19,7 +19,7 @@ ) func runCacheSetCommand(ctx context.Context, rep *repo.DirectRepository) error { - opts := rep.Content.CachingOptions + opts := rep.Content.CachingOptions.CloneOrDefault() changed := 0 diff --git a/cli/command_index_list.go b/cli/command_index_list.go index ca645a364..1e280493c 100644 --- a/cli/command_index_list.go +++ b/cli/command_index_list.go @@ -9,13 +9,14 @@ ) var ( - blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default() - blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob 
summary").Bool() - blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name") + blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default() + blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob summary").Bool() + blockIndexListIncludeSuperseded = blockIndexListCommand.Flag("superseded", "Include inactive index files superseded by compaction").Bool() + blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name") ) func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository) error { - blks, err := rep.Content.IndexBlobs(ctx) + blks, err := rep.Content.IndexBlobs(ctx, *blockIndexListIncludeSuperseded) if err != nil { return err } @@ -36,11 +37,11 @@ func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository) } for _, b := range blks { - fmt.Printf("%-70v %10v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp)) + fmt.Printf("%-40v %10v %v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp), b.Superseded) } if *blockIndexListSummary { - fmt.Printf("total %v blocks\n", len(blks)) + fmt.Printf("total %v indexes\n", len(blks)) } return nil diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index a0651b273..c3ce774f6 100644 --- a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -3,7 +3,9 @@ import ( "context" "io/ioutil" + "math" "math/rand" + "strings" "sync" "time" @@ -77,10 +79,12 @@ func (e *ecCacheEntry) isValid() bool { type eventuallyConsistentStorage struct { mu sync.Mutex - listDropProbability float64 + recentlyDeleted sync.Map + listSettleTime time.Duration caches []*ecFrontendCache realStorage blob.Storage + timeNow func() time.Time } func (s *eventuallyConsistentStorage) randomFrontendCache() 
*ecFrontendCache { @@ -163,27 +167,89 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error { s.randomFrontendCache().put(id, nil) + // capture metadata before deleting + md, err := s.realStorage.GetMetadata(ctx, id) + + if errors.Is(err, blob.ErrBlobNotFound) { + return blob.ErrBlobNotFound + } + + if err != nil { + return err + } + if err := s.realStorage.DeleteBlob(ctx, id); err != nil { return err } + md.Timestamp = s.timeNow() + s.recentlyDeleted.Store(id, md) + return nil } +func (s *eventuallyConsistentStorage) shouldApplyInconsistency(ctx context.Context, age time.Duration, desc string) bool { + if age < 0 { + age = -age + } + + if age >= s.listSettleTime { + return false + } + + x := age.Seconds() / s.listSettleTime.Seconds() // [0..1) + + // y=1-(x^0.3) is: + // about 50% probability of inconsistency after 10% of listSettleTime + // about 25% probability of inconsistency after 40% of listSettleTime + // about 10% probability of inconsistency after 67% of listSettleTime + // about 1% probability of inconsistency after 95% of listSettleTime + + const power = 0.3 + + prob := 1 - math.Pow(x, power) + + if rand.Float64() < prob { + log(ctx).Debugf("applying inconsistency %v (probability %v)", desc, prob) + return true + } + + return false +} + func (s *eventuallyConsistentStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error { - return s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { - e := s.randomFrontendCache().get(bm.BlobID) - if e != nil { - // item recently manipulated by the cache, skip from the results with some - // probability - if rand.Float64() < s.listDropProbability { - // skip callback if locally deleted - return nil - } + now := s.timeNow() + + if err := s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { + if age := now.Sub(bm.Timestamp); 
s.shouldApplyInconsistency(ctx, age, "hide recently created "+string(bm.BlobID)) { + return nil } return callback(bm) + }); err != nil { + return err + } + + var resultErr error + + // process recently deleted items and resurrect them with some probability + s.recentlyDeleted.Range(func(key, value interface{}) bool { + blobID := key.(blob.ID) + if !strings.HasPrefix(string(blobID), string(prefix)) { + return true + } + + bm := value.(blob.Metadata) + if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "resurrect recently deleted "+string(bm.BlobID)) { + if resultErr = callback(bm); resultErr != nil { + return false + } + } + + return true }) + + return resultErr } func (s *eventuallyConsistentStorage) Close(ctx context.Context) error { @@ -196,10 +262,11 @@ func (s *eventuallyConsistentStorage) ConnectionInfo() blob.ConnectionInfo { // NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top // of provided storage. -func NewEventuallyConsistentStorage(st blob.Storage, listDropProbability float64) blob.Storage { +func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage { return &eventuallyConsistentStorage{ - realStorage: st, - caches: make([]*ecFrontendCache, 4), - listDropProbability: listDropProbability, + realStorage: st, + caches: make([]*ecFrontendCache, 4), + listSettleTime: listSettleTime, + timeNow: timeNow, } } diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 5cd964b03..3395468ee 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -69,10 +69,6 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) e s.mutex.Lock() defer s.mutex.Unlock() - if _, ok := s.data[id]; ok { - return nil - } - s.keyTime[id] = s.timeNow() var b bytes.Buffer diff --git a/internal/testlogging/ctx.go b/internal/testlogging/ctx.go index 25a05a5d1..ae6567228 100644 --- 
a/internal/testlogging/ctx.go +++ b/internal/testlogging/ctx.go @@ -86,3 +86,17 @@ func ContextWithLevel(t testingT, level Level) context.Context { return &testLogger{t, "[" + module + "] ", level} }) } + +// ContextWithLevelAndPrefix returns a context with attached logger that emits all log entries with given log level or above. +func ContextWithLevelAndPrefix(t testingT, level Level, prefix string) context.Context { + return logging.WithLogger(context.Background(), func(module string) logging.Logger { + return &testLogger{t, "[" + module + "] " + prefix, level} + }) +} + +// ContextWithLevelAndPrefixFunc returns a context with attached logger that emits all log entries with given log level or above. +func ContextWithLevelAndPrefixFunc(t testingT, level Level, prefixFunc func() string) context.Context { + return logging.WithLogger(context.Background(), func(module string) logging.Logger { + return &testLogger{t, "[" + module + "] " + prefixFunc(), level} + }) +} diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index ae1647c59..f41e1e032 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -22,9 +22,9 @@ func (s *loggingStorage) GetBlob(ctx context.Context, id blob.ID, offset, length dt := time.Since(t0) if len(result) < maxLoggedBlobLength { - s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%#v, %#v) took %v", id, offset, length, result, err, dt) + s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%v, %#v) took %v", id, offset, length, result, err, dt) } else { - s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%#v bytes}, %#v) took %v", id, offset, length, len(result), err, dt) + s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%v bytes}, %#v) took %v", id, offset, length, len(result), err, dt) } return result, err @@ -35,7 +35,7 @@ func (s *loggingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta result, err := s.base.GetMetadata(ctx, id) dt := time.Since(t0) - s.printf(s.prefix+"GetMetadata(%q)=(%#v, %#v) 
took %v", id, result, err, dt) + s.printf(s.prefix+"GetMetadata(%q)=(%v, %#v) took %v", id, result, err, dt) return result, err } diff --git a/repo/blob/storage.go b/repo/blob/storage.go index 07ff2244d..43a51e193 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -2,6 +2,7 @@ import ( "context" + "encoding/json" "io" "sync" "time" @@ -62,9 +63,14 @@ type Storage interface { // Metadata represents metadata about a single BLOB in a storage. type Metadata struct { - BlobID ID - Length int64 - Timestamp time.Time + BlobID ID `json:"id"` + Length int64 `json:"length"` + Timestamp time.Time `json:"timestamp"` +} + +func (m *Metadata) String() string { + b, _ := json.Marshal(m) + return string(b) } // ErrBlobNotFound is returned when a BLOB cannot be found in storage. @@ -123,55 +129,3 @@ func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Stora // return first error or nil return <-errch } - -// ListAllBlobsConsistent lists all blobs with given name prefix in the provided storage until the results are -// consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while -// the first scan was in progress, no new blob was added or removed. -// maxAttempts specifies maximum number of list attempts (must be >= 2) -func ListAllBlobsConsistent(ctx context.Context, st Storage, prefix ID, maxAttempts int) ([]Metadata, error) { - var previous []Metadata - - for i := 0; i < maxAttempts; i++ { - result, err := ListAllBlobs(ctx, st, prefix) - if err != nil { - return nil, err - } - - if i > 0 && sameBlobs(result, previous) { - return result, nil - } - - previous = result - } - - return nil, errors.Errorf("unable to achieve consistent snapshot despite %v attempts", maxAttempts) -} - -// sameBlobs returns true if b1 & b2 contain the same blobs (ignoring order). 
-func sameBlobs(b1, b2 []Metadata) bool { - if len(b1) != len(b2) { - return false - } - - m := map[ID]Metadata{} - - for _, b := range b1 { - m[b.BlobID] = normalizeMetadata(b) - } - - for _, b := range b2 { - if r := m[b.BlobID]; r != normalizeMetadata(b) { - return false - } - } - - return true -} - -func normalizeMetadata(m Metadata) Metadata { - return Metadata{m.BlobID, m.Length, normalizeTimestamp(m.Timestamp)} -} - -func normalizeTimestamp(t time.Time) time.Time { - return time.Unix(0, t.UnixNano()) -} diff --git a/repo/blob/storage_test.go b/repo/blob/storage_test.go deleted file mode 100644 index bf001d4c9..000000000 --- a/repo/blob/storage_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package blob_test - -import ( - "testing" - "time" - - "github.com/kopia/kopia/internal/blobtesting" - "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/testlogging" - "github.com/kopia/kopia/repo/blob" -) - -func TestListAllBlobsConsistent(t *testing.T) { - ctx := testlogging.Context(t) - data := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(data, nil, time.Now) - st.PutBlob(ctx, "foo1", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - st.PutBlob(ctx, "foo2", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - st.PutBlob(ctx, "foo3", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - - // set up faulty storage that will add a blob while a scan is in progress. - f := &blobtesting.FaultyStorage{ - Base: st, - Faults: map[string][]*blobtesting.Fault{ - "ListBlobsItem": { - {ErrCallback: func() error { - st.PutBlob(ctx, "foo0", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - return nil - }}, - }, - }, - } - - r, err := blob.ListAllBlobsConsistent(ctx, f, "foo", 3) - if err != nil { - t.Fatalf("error: %v", err) - } - - // make sure we get the list with 4 items, not 3. 
- if got, want := len(r), 4; got != want { - t.Errorf("unexpected list result count: %v, want %v", got, want) - } -} - -func TestListAllBlobsConsistentEmpty(t *testing.T) { - ctx := testlogging.Context(t) - data := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(data, nil, time.Now) - - r, err := blob.ListAllBlobsConsistent(ctx, st, "foo", 3) - if err != nil { - t.Fatalf("error: %v", err) - } - - if got, want := len(r), 0; got != want { - t.Errorf("unexpected list result count: %v, want %v", got, want) - } -} diff --git a/repo/connect.go b/repo/connect.go index 77823e083..073cb2745 100644 --- a/repo/connect.go +++ b/repo/connect.go @@ -63,7 +63,7 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s lc.Username = getDefaultUserName(ctx) } - if err = setupCaching(ctx, configFile, &lc, opt.CachingOptions, f.UniqueID); err != nil { + if err = setupCaching(ctx, configFile, &lc, &opt.CachingOptions, f.UniqueID); err != nil { return errors.Wrap(err, "unable to set up caching") } @@ -107,7 +107,9 @@ func verifyConnect(ctx context.Context, configFile, password string, persist boo return r.Close(ctx) } -func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt content.CachingOptions, uniqueID []byte) error { +func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt *content.CachingOptions, uniqueID []byte) error { + opt = opt.CloneOrDefault() + if opt.MaxCacheSizeBytes == 0 { lc.Caching = &content.CachingOptions{} return nil diff --git a/repo/content/builder.go b/repo/content/builder.go index a942158e8..27b517b7b 100644 --- a/repo/content/builder.go +++ b/repo/content/builder.go @@ -2,6 +2,7 @@ import ( "bufio" + "crypto/rand" "encoding/binary" "io" "sort" @@ -16,6 +17,7 @@ deletedMarker = 0x80000000 entryFixedHeaderLength = 20 + randomSuffixSize = 32 ) // packIndexBuilder prepares and writes content index. 
@@ -107,6 +109,15 @@ func (b packIndexBuilder) Build(output io.Writer) error { return errors.Wrap(err, "error writing extra data") } + randomSuffix := make([]byte, randomSuffixSize) + if _, err := rand.Read(randomSuffix); err != nil { + return errors.Wrap(err, "error getting random bytes for suffix") + } + + if _, err := w.Write(randomSuffix); err != nil { + return errors.Wrap(err, "error writing extra random suffix to ensure indexes are always globally unique") + } + return w.Flush() } diff --git a/repo/content/caching_options.go b/repo/content/caching_options.go index 7ffb5a853..b633b2ac6 100644 --- a/repo/content/caching_options.go +++ b/repo/content/caching_options.go @@ -6,6 +6,18 @@ type CachingOptions struct { MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"` - IgnoreListCache bool `json:"-"` HMACSecret []byte `json:"-"` + + ownWritesCache ownWritesCache +} + +// CloneOrDefault returns a clone of the caching options or empty options for nil. 
+func (c *CachingOptions) CloneOrDefault() *CachingOptions { + if c == nil { + return &CachingOptions{} + } + + c2 := *c + + return &c2 } diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 4e4d5f235..a772f700a 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -133,7 +133,20 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b return true, nil } -func newCommittedContentIndex(caching CachingOptions) *committedContentIndex { +func (b *committedContentIndex) close() error { + b.mu.Lock() + defer b.mu.Unlock() + + for _, pi := range b.inUse { + if err := pi.Close(); err != nil { + return errors.Wrap(err, "unable to close index") + } + } + + return nil +} + +func newCommittedContentIndex(caching *CachingOptions) *committedContentIndex { var cache committedContentIndexCache if caching.CacheDirectory != "" { diff --git a/repo/content/content_cache_base.go b/repo/content/content_cache_base.go index 0e390e848..731476702 100644 --- a/repo/content/content_cache_base.go +++ b/repo/content/content_cache_base.go @@ -82,7 +82,7 @@ func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) { c.sweepMutexes() if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Warningf("cacheBase sweep failed: %v", err) + log(ctx).Warningf("cache sweep failed: %v", err) } } } diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index f67722515..8a0d13e2d 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -104,7 +104,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp MaxPackSize: maxPackSize, MasterKey: make([]byte, 32), // zero key, does not matter Version: 1, - }, CachingOptions{}, time.Now, nil) + }, nil, time.Now, nil) if err != nil { t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, 
encryptionAlgo, err.Error()) return diff --git a/repo/content/content_index_recovery_test.go b/repo/content/content_index_recovery_test.go index 767bfb0a4..67eac963b 100644 --- a/repo/content/content_index_recovery_test.go +++ b/repo/content/content_index_recovery_test.go @@ -24,7 +24,7 @@ func TestContentIndexRecovery(t *testing.T) { } // delete all index blobs - assertNoError(t, bm.st.ListBlobs(ctx, newIndexBlobPrefix, func(bi blob.Metadata) error { + assertNoError(t, bm.st.ListBlobs(ctx, indexBlobPrefix, func(bi blob.Metadata) error { log(ctx).Debugf("deleting %v", bi.BlobID) return bm.st.DeleteBlob(ctx, bi.BlobID) })) diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index c2e2f2aef..353916f3e 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -44,7 +44,7 @@ const ( parallelFetches = 5 // number of parallel reads goroutines flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes - newIndexBlobPrefix = "n" + indexBlobPrefix = "n" defaultMinPreambleLength = 32 defaultMaxPreambleLength = 32 defaultPaddingUnit = 4096 @@ -65,9 +65,8 @@ // IndexBlobInfo is an information about a single index blob managed by Manager. type IndexBlobInfo struct { - BlobID blob.ID - Length int64 - Timestamp time.Time + blob.Metadata + Superseded []blob.Metadata } // Manager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store. @@ -307,12 +306,12 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { data := b.Bytes() dataCopy := append([]byte(nil), data...) 
- indexBlobID, err := bm.writePackIndexesNew(ctx, data) + indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data) if err != nil { return err } - if err := bm.committedContents.addContent(ctx, indexBlobID, dataCopy, true); err != nil { + if err := bm.committedContents.addContent(ctx, indexBlobMD.BlobID, dataCopy, true); err != nil { return errors.Wrap(err, "unable to add committed content") } @@ -405,6 +404,10 @@ func (bm *Manager) Close(ctx context.Context) error { return errors.Wrap(err, "error flushing") } + if err := bm.committedContents.close(); err != nil { + return errors.Wrap(err, "error closed committed content index") + } + bm.contentCache.close() bm.metadataCache.close() bm.encryptionBufferPool.Close() @@ -666,7 +669,7 @@ type ManagerOptions struct { } // NewManager creates new content manager with given packing options and a formatter. -func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, options ManagerOptions) (*Manager, error) { +func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options ManagerOptions) (*Manager, error) { nowFn := options.TimeNow if nowFn == nil { nowFn = time.Now // allow:no-inject-time @@ -675,7 +678,7 @@ func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, cach return newManagerWithOptions(ctx, st, f, caching, nowFn, options.RepositoryFormatBytes) } -func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) { +func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) { if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion { return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported 
%v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion) } @@ -689,43 +692,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp return nil, err } - dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") - if err != nil { - return nil, errors.Wrap(err, "unable to initialize data cache storage") - } - - dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize content cache") - } - - metadataCacheSize := caching.MaxMetadataCacheSizeBytes - if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 { - metadataCacheSize = caching.MaxCacheSizeBytes - } - - metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") - if err != nil { - return nil, errors.Wrap(err, "unable to initialize data cache storage") - } - - metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize metadata cache") - } - - listCache, err := newListCache(st, caching) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize list cache") - } - - contentIndex := newCommittedContentIndex(caching) - mu := &sync.RWMutex{} m := &Manager{ lockFreeManager: lockFreeManager{ Format: *f, - CachingOptions: caching, timeNow: timeNow, maxPackSize: f.MaxPackSize, encryptor: encryptor, @@ -733,14 +703,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, - contentCache: dataCache, - metadataCache: metadataCache, - listCache: listCache, st: st, repositoryFormatBytes: repositoryFormatBytes, checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", 
writeFormatVersion: int32(f.Version), - committedContents: contentIndex, encryptionBufferPool: buf.NewPool(ctx, defaultEncryptionBufferPoolSegmentSize+encryptor.MaxOverhead(), "content-manager-encryption"), }, @@ -752,9 +718,76 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp packIndexBuilder: make(packIndexBuilder), } + if err := setupCaches(ctx, m, caching); err != nil { + return nil, errors.Wrap(err, "unable to set up caches") + } + if err := m.CompactIndexes(ctx, autoCompactionOptions); err != nil { return nil, errors.Wrap(err, "error initializing content manager") } return m, nil } + +func setupCaches(ctx context.Context, m *Manager, caching *CachingOptions) error { + caching = caching.CloneOrDefault() + + dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") + if err != nil { + return errors.Wrap(err, "unable to initialize data cache storage") + } + + dataCache, err := newContentCacheForData(ctx, m.st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) + if err != nil { + return errors.Wrap(err, "unable to initialize content cache") + } + + metadataCacheSize := caching.MaxMetadataCacheSizeBytes + if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 { + metadataCacheSize = caching.MaxCacheSizeBytes + } + + metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") + if err != nil { + return errors.Wrap(err, "unable to initialize data cache storage") + } + + metadataCache, err := newContentCacheForMetadata(ctx, m.st, metadataCacheStorage, metadataCacheSize) + if err != nil { + return errors.Wrap(err, "unable to initialize metadata cache") + } + + listCache, err := newListCache(m.st, caching) + if err != nil { + return errors.Wrap(err, "unable to initialize list cache") + } + + if caching.ownWritesCache == nil { + // this is test hook to allow test to specify custom cache + caching.ownWritesCache, 
err = newOwnWritesCache(ctx, caching, m.timeNow) + if err != nil { + return errors.Wrap(err, "unable to initialize own writes cache") + } + } + + contentIndex := newCommittedContentIndex(caching) + + // once everything is ready, set it up + m.CachingOptions = *caching + m.contentCache = dataCache + m.metadataCache = metadataCache + m.committedContents = contentIndex + + m.indexBlobManager = &indexBlobManagerImpl{ + st: m.st, + encryptor: m.encryptor, + hasher: m.hasher, + timeNow: m.timeNow, + ownWritesCache: caching.ownWritesCache, + listCache: listCache, + indexBlobCache: metadataCache, + maxEventualConsistencySettleTime: defaultEventualConsistencySettleTime, + } + + return nil +} diff --git a/repo/content/block_manager_compaction.go b/repo/content/content_manager_indexes.go similarity index 75% rename from repo/content/block_manager_compaction.go rename to repo/content/content_manager_indexes.go index 8a6912bab..043458b18 100644 --- a/repo/content/block_manager_compaction.go +++ b/repo/content/content_manager_indexes.go @@ -6,6 +6,8 @@ "time" "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" ) const verySmallContentFraction = 20 // blobs less than 1/verySmallContentFraction of maxPackSize are considered 'very small' @@ -87,15 +89,18 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] return nil } - formatLog(ctx).Debugf("compacting %v contents", len(indexBlobs)) + formatLog(ctx).Debugf("compacting %v index blobs", len(indexBlobs)) - t0 := time.Now() // allow:no-inject-time bld := make(packIndexBuilder) + var inputs, outputs []blob.Metadata + for _, indexBlob := range indexBlobs { if err := bm.addIndexBlobsToBuilder(ctx, bld, indexBlob, opt); err != nil { - return err + return errors.Wrap(err, "error adding index to builder") } + + inputs = append(inputs, indexBlob.Metadata) } var buf bytes.Buffer @@ -103,32 +108,33 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] return 
errors.Wrap(err, "unable to build an index") } - compactedIndexBlob, err := bm.writePackIndexesNew(ctx, buf.Bytes()) + compactedIndexBlob, err := bm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes()) if err != nil { return errors.Wrap(err, "unable to write compacted indexes") } - formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) // allow:no-inject-time - + // compaction wrote index blob that's the same as one of the sources + // it must be a no-op. for _, indexBlob := range indexBlobs { - if indexBlob.BlobID == compactedIndexBlob { - continue + if indexBlob.BlobID == compactedIndexBlob.BlobID { + formatLog(ctx).Debugf("compaction was a no-op") + return nil } + } - bm.listCache.deleteListCache() + outputs = append(outputs, compactedIndexBlob) - if err := bm.st.DeleteBlob(ctx, indexBlob.BlobID); err != nil { - log(ctx).Warningf("unable to delete compacted blob %q: %v", indexBlob.BlobID, err) - } + if err := bm.indexBlobManager.registerCompaction(ctx, inputs, outputs); err != nil { + return errors.Wrap(err, "unable to register compaction") } return nil } func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo, opt CompactOptions) error { - data, err := bm.getIndexBlobInternal(ctx, indexBlob.BlobID) + data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlob.BlobID) if err != nil { - return err + return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID) } index, err := openPackIndex(bytes.NewReader(data)) @@ -147,3 +153,17 @@ func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuil return nil } + +func addBlobsToIndex(ndx map[blob.ID]*IndexBlobInfo, blobs []blob.Metadata) { + for _, it := range blobs { + if ndx[it.BlobID] == nil { + ndx[it.BlobID] = &IndexBlobInfo{ + Metadata: blob.Metadata{ + BlobID: it.BlobID, + Length: it.Length, + Timestamp: it.Timestamp, + }, + } + } + } +} diff --git 
a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index c23a55359..847bf28b9 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -7,7 +7,6 @@ cryptorand "crypto/rand" "encoding/hex" "io" - "strings" "sync" "time" @@ -25,11 +24,11 @@ type lockFreeManager struct { // this one is not lock-free Stats Stats - listCache *listCache st blob.Storage Format FormattingOptions CachingOptions CachingOptions + indexBlobManager indexBlobManager contentCache contentCache metadataCache contentCache committedContents *committedContentIndex @@ -67,6 +66,8 @@ func (bm *lockFreeManager) maybeEncryptContentDataForPacking(output *gather.Writ return errors.Wrap(err, "unable to encrypt") } + bm.Stats.encrypted(len(data)) + output.Append(cipherText) return nil @@ -93,32 +94,32 @@ func (bm *lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index } if i > 0 { - bm.listCache.deleteListCache() + bm.indexBlobManager.flushCache() log(ctx).Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i) time.Sleep(nextSleepTime) nextSleepTime *= 2 } - contents, err := bm.listCache.listIndexBlobs(ctx) + indexBlobs, err := bm.indexBlobManager.listIndexBlobs(ctx, false) if err != nil { return nil, false, err } - err = bm.tryLoadPackIndexBlobsUnlocked(ctx, contents) + err = bm.tryLoadPackIndexBlobsUnlocked(ctx, indexBlobs) if err == nil { - var contentIDs []blob.ID - for _, b := range contents { - contentIDs = append(contentIDs, b.BlobID) + var indexBlobIDs []blob.ID + for _, b := range indexBlobs { + indexBlobIDs = append(indexBlobIDs, b.BlobID) } var updated bool - updated, err = bm.committedContents.use(ctx, contentIDs) + updated, err = bm.committedContents.use(ctx, indexBlobIDs) if err != nil { return nil, false, err } - return contents, updated, nil + return indexBlobs, updated, nil } if err != blob.ErrBlobNotFound { @@ -129,8 +130,8 @@ func (bm 
*lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index return nil, false, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) } -func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, contents []IndexBlobInfo) error { - ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, contents) +func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, indexBlobs []IndexBlobInfo) error { + ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, indexBlobs) if err != nil { return err } @@ -152,7 +153,7 @@ func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, co defer wg.Done() for indexBlobID := range ch { - data, err := bm.getIndexBlobInternal(ctx, indexBlobID) + data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlobID) if err != nil { errch <- err return @@ -317,37 +318,8 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi } // IndexBlobs returns the list of active index blobs. -func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) { - return bm.listCache.listIndexBlobs(ctx) -} - -func (bm *lockFreeManager) getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) { - payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) - if err != nil { - return nil, err - } - - iv, err := getIndexBlobIV(blobID) - if err != nil { - return nil, err - } - - bm.Stats.readContent(len(payload)) - - payload, err = bm.encryptor.Decrypt(nil, payload, iv) - bm.Stats.decrypted(len(payload)) - - if err != nil { - return nil, errors.Wrap(err, "decrypt error") - } - - // Since the encryption key is a function of data, we must be able to generate exactly the same key - // after decrypting the content. This serves as a checksum. 
- if err := bm.verifyChecksum(payload, iv); err != nil { - return nil, err - } - - return payload, nil +func (bm *lockFreeManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) { + return bm.indexBlobManager.listIndexBlobs(ctx, includeInactive) } func getPackedContentIV(output []byte, contentID ID) ([]byte, error) { @@ -359,49 +331,12 @@ func getPackedContentIV(output []byte, contentID ID) ([]byte, error) { return output[0:n], nil } -func getIndexBlobIV(s blob.ID) ([]byte, error) { - if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic - s = s[0:p] - } - - return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):])) -} - func (bm *lockFreeManager) writePackFileNotLocked(ctx context.Context, packFile blob.ID, data gather.Bytes) error { bm.Stats.wroteContent(data.Length()) - bm.listCache.deleteListCache() return bm.st.PutBlob(ctx, packFile, data) } -func (bm *lockFreeManager) encryptAndWriteBlobNotLocked(ctx context.Context, data []byte, prefix blob.ID) (blob.ID, error) { - var hashOutput [maxHashSize]byte - - hash := bm.hashData(hashOutput[:0], data) - blobID := prefix + blob.ID(hex.EncodeToString(hash)) - - iv, err := getIndexBlobIV(blobID) - if err != nil { - return "", err - } - - bm.Stats.encrypted(len(data)) - - data2, err := bm.encryptor.Encrypt(nil, data, iv) - if err != nil { - return "", err - } - - bm.Stats.wroteContent(len(data2)) - bm.listCache.deleteListCache() - - if err := bm.st.PutBlob(ctx, blobID, gather.FromSlice(data2)); err != nil { - return "", err - } - - return blobID, nil -} - func (bm *lockFreeManager) hashData(output, data []byte) []byte { // Hash the content and compute encryption key. 
contentID := bm.hasher(output, data) @@ -410,10 +345,6 @@ func (bm *lockFreeManager) hashData(output, data []byte) []byte { return contentID } -func (bm *lockFreeManager) writePackIndexesNew(ctx context.Context, data []byte) (blob.ID, error) { - return bm.encryptAndWriteBlobNotLocked(ctx, data, newIndexBlobPrefix) -} - func (bm *lockFreeManager) verifyChecksum(data, contentID []byte) error { var hashOutput [maxHashSize]byte diff --git a/repo/content/content_manager_own_writes.go b/repo/content/content_manager_own_writes.go new file mode 100644 index 000000000..ca5a2cc82 --- /dev/null +++ b/repo/content/content_manager_own_writes.go @@ -0,0 +1,194 @@ +package content + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/filesystem" +) + +const ownWritesCacheRetention = 15 * time.Minute + +type ownWritesCache interface { + add(ctx context.Context, mb blob.Metadata) error + merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) + delete(ctx context.Context, md blob.ID) error +} + +// nullOwnWritesCache is an implementation of ownWritesCache that ignores all changes. +type nullOwnWritesCache struct { +} + +func (n *nullOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + return nil +} + +func (n *nullOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return nil +} + +func (n *nullOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + return source, nil +} + +// memoryOwnWritesCache is an implementation of ownWritesCache that caches in memory. 
+type memoryOwnWritesCache struct { + entries sync.Map + timeNow func() time.Time +} + +func (n *memoryOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + log(ctx).Debugf("adding %v to own-writes cache", mb.BlobID) + n.entries.Store(mb.BlobID, mb) + + return nil +} + +func (n *memoryOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return n.add(ctx, blob.Metadata{ + BlobID: blobID, + Length: -1, + Timestamp: n.timeNow(), + }) +} + +func (n *memoryOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + var result []blob.Metadata + + n.entries.Range(func(key, value interface{}) bool { + md := value.(blob.Metadata) + if !strings.HasPrefix(string(md.BlobID), string(prefix)) { + return true + } + + if age := n.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention { + result = append(result, md) + } else { + log(ctx).Debugf("deleting stale own writes cache entry: %v (%v)", key, age) + + n.entries.Delete(key) + } + + return true + }) + + return mergeOwnWrites(ctx, source, result), nil +} + +// persistentOwnWritesCache is an implementation of ownWritesCache that caches entries to strongly consistent blob storage. 
+type persistentOwnWritesCache struct { + st blob.Storage + timeNow func() time.Time +} + +func (d *persistentOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + j, err := json.Marshal(mb) + if err != nil { + return errors.Wrap(err, "unable to marshal JSON") + } + + return d.st.PutBlob(ctx, mb.BlobID, gather.FromSlice(j)) +} + +func (d *persistentOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + var myWrites []blob.Metadata + + err := d.st.ListBlobs(ctx, prefix, func(md blob.Metadata) error { + b, err := d.st.GetBlob(ctx, md.BlobID, 0, -1) + if err == blob.ErrBlobNotFound { + return nil + } + + if err != nil { + return errors.Wrapf(err, "error reading own write cache entry %v", md.BlobID) + } + + var originalMD blob.Metadata + + if err := json.Unmarshal(b, &originalMD); err != nil { + return errors.Wrapf(err, "error unmarshaling own write cache entry %v", md.BlobID) + } + + // note that we're assuming that time scale used by timeNow() is the same as used by + // cache storage, which is fine, since the cache is local and not on remote FS. 
+ if age := d.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention { + myWrites = append(myWrites, originalMD) + } else { + log(ctx).Debugf("deleting blob %v from own-write cache because it's too old: %v (%v)", md.BlobID, age, originalMD.Timestamp) + + if err := d.st.DeleteBlob(ctx, md.BlobID); err != nil && err != blob.ErrBlobNotFound { + return errors.Wrap(err, "error deleting stale blob") + } + } + + return nil + }) + + return mergeOwnWrites(ctx, source, myWrites), err +} + +func (d *persistentOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return d.add(ctx, blob.Metadata{ + BlobID: blobID, + Length: -1, + Timestamp: d.timeNow(), + }) +} + +func mergeOwnWrites(ctx context.Context, source, own []blob.Metadata) []blob.Metadata { + m := map[blob.ID]blob.Metadata{} + + for _, v := range source { + m[v.BlobID] = v + } + + for _, v := range own { + if v.Length < 0 { + delete(m, v.BlobID) + } else { + m[v.BlobID] = v + } + } + + var s []blob.Metadata + + for _, v := range m { + s = append(s, v) + } + + log(ctx).Debugf("merged %v backend blobs and %v local blobs into %v", source, own, s) + + return s +} + +func newOwnWritesCache(ctx context.Context, caching *CachingOptions, timeNow func() time.Time) (ownWritesCache, error) { + if caching.CacheDirectory == "" { + return &memoryOwnWritesCache{timeNow: timeNow}, nil + } + + dirname := filepath.Join(caching.CacheDirectory, "own-writes") + + if err := os.MkdirAll(dirname, 0700); err != nil { + return nil, errors.Wrap(err, "unable to create own writes cache directory") + } + + st, err := filesystem.New(ctx, &filesystem.Options{ + Path: dirname, + DirectoryShards: []int{}, + }) + + if err != nil { + return nil, errors.Wrap(err, "unable to create own writes cache storage") + } + + return &persistentOwnWritesCache{st, timeNow}, nil +} diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 02ad91a8d..d24fe23f5 100644 --- a/repo/content/content_manager_test.go +++ 
b/repo/content/content_manager_test.go @@ -22,6 +22,7 @@ "github.com/kopia/kopia/internal/faketime" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/logging" ) const ( @@ -202,7 +203,7 @@ func TestContentManagerEmpty(t *testing.T) { func verifyActiveIndexBlobCount(ctx context.Context, t *testing.T, bm *Manager, expected int) { t.Helper() - blks, err := bm.IndexBlobs(ctx) + blks, err := bm.IndexBlobs(ctx, false) if err != nil { t.Errorf("error listing active index blobs: %v", err) return @@ -319,7 +320,7 @@ func TestContentManagerFailedToWritePack(t *testing.T) { MaxPackSize: maxPackSize, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, CachingOptions{}, faketime.Frozen(fakeTime), nil) + }, nil, faketime.Frozen(fakeTime), nil) if err != nil { t.Fatalf("can't create bm: %v", err) } @@ -410,17 +411,13 @@ func TestContentManagerConcurrency(t *testing.T) { verifyContent(ctx, t, bm4, bm2content, seededRandomData(32, 100)) verifyContent(ctx, t, bm4, bm3content, seededRandomData(33, 100)) - if got, want := getIndexCount(data), 4; got != want { - t.Errorf("unexpected index count before compaction: %v, wanted %v", got, want) - } + validateIndexCount(t, data, 4, 0) if err := bm4.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil { t.Errorf("compaction error: %v", err) } - if got, want := getIndexCount(data), 1; got != want { - t.Errorf("unexpected index count after compaction: %v, wanted %v", got, want) - } + validateIndexCount(t, data, 5, 1) // new content manager at this point can see all data. 
bm5 := newTestContentManager(t, data, keyTime, nil) @@ -437,6 +434,30 @@ func TestContentManagerConcurrency(t *testing.T) { } } +func validateIndexCount(t *testing.T, data map[blob.ID][]byte, wantIndexCount, wantCompactionLogCount int) { + t.Helper() + + var indexCnt, compactionLogCnt int + + for blobID := range data { + if strings.HasPrefix(string(blobID), indexBlobPrefix) { + indexCnt++ + } + + if strings.HasPrefix(string(blobID), compactionLogBlobPrefix) { + compactionLogCnt++ + } + } + + if got, want := indexCnt, wantIndexCount; got != want { + t.Fatalf("unexpected index blob count %v, want %v", got, want) + } + + if got, want := compactionLogCnt, wantCompactionLogCount; got != want { + t.Fatalf("unexpected compaction log blob count %v, want %v", got, want) + } +} + func TestDeleteContent(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} @@ -1694,6 +1715,82 @@ func verifyVersionCompat(t *testing.T, writeVersion int) { verifyContentManagerDataSet(ctx, t, mgr, dataSet) } +func TestReadsOwnWritesWithEventualConsistencyPersistentOwnWritesCache(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + cacheData := blobtesting.DataMap{} + cacheKeyTime := map[blob.ID]time.Time{} + cacheSt := blobtesting.NewMapStorage(cacheData, cacheKeyTime, timeNow) + ecst := blobtesting.NewEventuallyConsistentStorage( + logging.NewWrapper(st, t.Logf, "[STORAGE] "), + 3*time.Second, + timeNow) + + // disable own writes cache, will still be ok if store is strongly consistent + verifyReadsOwnWrites(t, ecst, timeNow, &persistentOwnWritesCache{ + st: cacheSt, + timeNow: timeNow, + }) +} + +func TestReadsOwnWritesWithStrongConsistencyAndNoCaching(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + + // if we used nullOwnWritesCache and eventual 
consistency, the test would fail + // st = blobtesting.NewEventuallyConsistentStorage(logging.NewWrapper(st, t.Logf, "[STORAGE] "), 0.1) + + // disable own writes cache, will still be ok if store is strongly consistent + verifyReadsOwnWrites(t, st, timeNow, &nullOwnWritesCache{}) +} + +func TestReadsOwnWritesWithEventualConsistencyInMemoryOwnWritesCache(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + ecst := blobtesting.NewEventuallyConsistentStorage( + logging.NewWrapper(st, t.Logf, "[STORAGE] "), + 3*time.Second, + timeNow) + + verifyReadsOwnWrites(t, ecst, timeNow, &memoryOwnWritesCache{timeNow: timeNow}) +} + +func verifyReadsOwnWrites(t *testing.T, st blob.Storage, timeNow func() time.Time, sharedOwnWritesCache ownWritesCache) { + ctx := testlogging.Context(t) + cachingOptions := &CachingOptions{ + ownWritesCache: sharedOwnWritesCache, + } + + bm := newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + + ids := make([]ID, 100) + for i := 0; i < len(ids); i++ { + ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2)) + + for j := 0; j < i; j++ { + // verify all contents written so far + verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2)) + } + + // every 10 contents, create new content manager + if i%10 == 0 { + t.Logf("------- reopening -----") + must(t, bm.Close(ctx)) + bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + } + } + + must(t, bm.Close(ctx)) + bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + + for i := 0; i < len(ids); i++ { + verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2)) + } +} + func verifyContentManagerDataSet(ctx context.Context, t *testing.T, mgr *Manager, dataSet map[ID][]byte) { for contentID, originalPayload := range dataSet { v, err := mgr.GetContent(ctx, 
contentID) @@ -1714,6 +1811,10 @@ func newTestContentManager(t *testing.T, data blobtesting.DataMap, keyTime map[b } func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc func() time.Time) *Manager { + return newTestContentManagerWithStorageAndCaching(t, st, nil, timeFunc) +} + +func newTestContentManagerWithStorageAndCaching(t *testing.T, st blob.Storage, co *CachingOptions, timeFunc func() time.Time) *Manager { if timeFunc == nil { timeFunc = faketime.AutoAdvance(fakeTime, 1*time.Second) } @@ -1724,7 +1825,7 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu HMACSecret: hmacSecret, MaxPackSize: maxPackSize, Version: 1, - }, CachingOptions{}, timeFunc, nil) + }, co, timeFunc, nil) if err != nil { panic("can't create content manager: " + err.Error()) } @@ -1734,18 +1835,6 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu return bm } -func getIndexCount(d blobtesting.DataMap) int { - var cnt int - - for blobID := range d { - if strings.HasPrefix(string(blobID), newIndexBlobPrefix) { - cnt++ - } - } - - return cnt -} - func verifyContentNotFound(ctx context.Context, t *testing.T, bm *Manager, contentID ID) { t.Helper() @@ -1760,7 +1849,7 @@ func verifyContent(ctx context.Context, t *testing.T, bm *Manager, contentID ID, b2, err := bm.GetContent(ctx, contentID) if err != nil { - t.Errorf("unable to read content %q: %v", contentID, err) + t.Fatalf("unable to read content %q: %v", contentID, err) return } @@ -1893,3 +1982,11 @@ func getContentInfo(t *testing.T, bm *Manager, c ID) Info { return i } + +func must(t *testing.T, err error) { + t.Helper() + + if err != nil { + t.Fatal(err) + } +} diff --git a/repo/content/index_blob_manager.go b/repo/content/index_blob_manager.go new file mode 100644 index 000000000..4d3e30dcc --- /dev/null +++ b/repo/content/index_blob_manager.go @@ -0,0 +1,462 @@ +package content + +import ( + "bytes" + "context" + "crypto/aes" + "encoding/hex" + 
"encoding/json" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +// indexBlobManager is the API of index blob manager as used by content manager. +type indexBlobManager interface { + writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) + listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) + getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) + registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error + flushCache() +} + +const ( + compactionLogBlobPrefix = "m" + cleanupBlobPrefix = "l" + defaultEventualConsistencySettleTime = 1 * time.Hour +) + +// compactionLogEntry represents contents of compaction log entry stored in `m` blob. +type compactionLogEntry struct { + // list of input blob names that were compacted together. + InputMetadata []blob.Metadata `json:"inputMetadata"` + + // list of blobs that are results of compaction. + OutputMetadata []blob.Metadata `json:"outputMetadata"` + + // Metadata of the compaction blob itself, not serialized. + metadata blob.Metadata +} + +// cleanupEntry represents contents of cleanup entry stored in `l` blob. 
+type cleanupEntry struct { + BlobID blob.ID `json:"blobID"` + + age time.Duration // not serialized, computed on load +} + +type indexBlobManagerImpl struct { + st blob.Storage + hasher hashing.HashFunc + encryptor encryption.Encryptor + listCache *listCache + ownWritesCache ownWritesCache + timeNow func() time.Time + indexBlobCache contentCache + maxEventualConsistencySettleTime time.Duration +} + +func (m *indexBlobManagerImpl) listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) { + compactionLogMetadata, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix) + if err != nil { + return nil, errors.Wrap(err, "error listing compaction log entries") + } + + compactionLogMetadata, err = m.ownWritesCache.merge(ctx, compactionLogBlobPrefix, compactionLogMetadata) + if err != nil { + return nil, errors.Wrap(err, "error merging local writes for compaction log entries") + } + + storageIndexBlobs, err := m.listCache.listBlobs(ctx, indexBlobPrefix) + if err != nil { + return nil, errors.Wrap(err, "error listing index blobs") + } + + storageIndexBlobs, err = m.ownWritesCache.merge(ctx, indexBlobPrefix, storageIndexBlobs) + if err != nil { + return nil, errors.Wrap(err, "error merging local writes for index blobs") + } + + indexMap := map[blob.ID]*IndexBlobInfo{} + addBlobsToIndex(indexMap, storageIndexBlobs) + + compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata) + if err != nil { + return nil, errors.Wrap(err, "error reading compaction log") + } + + // remove entries from indexMap that have been compacted and replaced by other indexes. 
+ removeCompactedIndexes(ctx, indexMap, compactionLogs, includeInactive) + + var results []IndexBlobInfo + for _, v := range indexMap { + results = append(results, *v) + } + + return results, nil +} + +func (m *indexBlobManagerImpl) flushCache() { + m.listCache.deleteListCache(indexBlobPrefix) + m.listCache.deleteListCache(compactionLogBlobPrefix) +} + +func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error { + logEntryBytes, err := json.Marshal(&compactionLogEntry{ + InputMetadata: inputs, + OutputMetadata: outputs, + }) + if err != nil { + return errors.Wrap(err, "unable to marshal log entry bytes") + } + + compactionLogBlobMetadata, err := m.encryptAndWriteBlob(ctx, logEntryBytes, compactionLogBlobPrefix) + if err != nil { + return errors.Wrap(err, "unable to write compaction log") + } + + formatLog(ctx).Debugf("compacted indexes %v into %v and wrote log %v", inputs, outputs, compactionLogBlobMetadata) + + if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil { + return errors.Wrap(err, "error deleting old index blobs") + } + + return nil +} + +func (m *indexBlobManagerImpl) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) { + return m.getEncryptedBlob(ctx, blobID) +} + +func (m *indexBlobManagerImpl) getEncryptedBlob(ctx context.Context, blobID blob.ID) ([]byte, error) { + payload, err := m.indexBlobCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) + if err != nil { + return nil, err + } + + iv, err := getIndexBlobIV(blobID) + if err != nil { + return nil, err + } + + payload, err = m.encryptor.Decrypt(nil, payload, iv) + + if err != nil { + return nil, errors.Wrap(err, "decrypt error") + } + + // Since the encryption key is a function of data, we must be able to generate exactly the same key + // after decrypting the content. This serves as a checksum. 
+ if err := m.verifyChecksum(payload, iv); err != nil { + return nil, err + } + + return payload, nil +} + +func (m *indexBlobManagerImpl) verifyChecksum(data, contentID []byte) error { + var hashOutput [maxHashSize]byte + + expected := m.hasher(hashOutput[:0], data) + expected = expected[len(expected)-aes.BlockSize:] + + if !bytes.HasSuffix(contentID, expected) { + return errors.Errorf("invalid checksum for blob %x, expected %x", contentID, expected) + } + + return nil +} + +func (m *indexBlobManagerImpl) writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) { + return m.encryptAndWriteBlob(ctx, data, indexBlobPrefix) +} + +func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []byte, prefix blob.ID) (blob.Metadata, error) { + var hashOutput [maxHashSize]byte + + hash := m.hasher(hashOutput[:0], data) + blobID := prefix + blob.ID(hex.EncodeToString(hash)) + + iv, err := getIndexBlobIV(blobID) + if err != nil { + return blob.Metadata{}, err + } + + data2, err := m.encryptor.Encrypt(nil, data, iv) + if err != nil { + return blob.Metadata{}, err + } + + m.listCache.deleteListCache(prefix) + + err = m.st.PutBlob(ctx, blobID, gather.FromSlice(data2)) + if err != nil { + return blob.Metadata{}, err + } + + bm, err := m.st.GetMetadata(ctx, blobID) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "unable to get blob metadata") + } + + if err := m.ownWritesCache.add(ctx, bm); err != nil { + log(ctx).Warningf("unable to cache own write: %v", err) + } + + return bm, nil +} + +func (m *indexBlobManagerImpl) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) { + results := map[blob.ID]*compactionLogEntry{} + + for _, cb := range blobs { + data, err := m.getEncryptedBlob(ctx, cb.BlobID) + + if errors.Is(err, blob.ErrBlobNotFound) { + continue + } + + if err != nil { + return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID) + } + + le := 
&compactionLogEntry{} + + if err := json.Unmarshal(data, le); err != nil { + return nil, errors.Wrap(err, "unable to read compaction log entry %q") + } + + le.metadata = cb + + results[cb.BlobID] = le + } + + return results, nil +} + +func (m *indexBlobManagerImpl) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) { + results := map[blob.ID]*cleanupEntry{} + + for _, cb := range blobs { + data, err := m.getEncryptedBlob(ctx, cb.BlobID) + + if errors.Is(err, blob.ErrBlobNotFound) { + continue + } + + if err != nil { + return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID) + } + + le := &cleanupEntry{} + + if err := json.Unmarshal(data, le); err != nil { + return nil, errors.Wrap(err, "unable to read compaction log entry %q") + } + + le.age = latestServerBlobTime.Sub(cb.Timestamp) + + results[cb.BlobID] = le + } + + return results, nil +} + +func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata) error { + allCompactionLogBlobs, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix) + if err != nil { + return errors.Wrap(err, "error listing compaction log blobs") + } + + // look for server-assigned timestamp of the compaction log entry we just wrote as a reference. 
+ // we're assuming server-generated timestamps are somewhat reasonable and time is moving + compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-m.maxEventualConsistencySettleTime) + compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff) + + log(ctx).Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff) + + compactionBlobEntries, err := m.getCompactionLogEntries(ctx, compactionBlobs) + if err != nil { + return errors.Wrap(err, "unable to get compaction log entries") + } + + allCleanupBlobs, err := m.listCache.listBlobs(ctx, cleanupBlobPrefix) + if err != nil { + return errors.Wrap(err, "error listing cleanup blobs") + } + + cleanupEntries, err := m.getCleanupEntries(ctx, latestBlob.Timestamp, allCleanupBlobs) + if err != nil { + return errors.Wrap(err, "error loading cleanup blobs") + } + + indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries) + compactionLogBlobsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries) + + // note that we must always delete index blobs first before compaction logs + // otherwise we may inadvertently resurrect an index blob that should have been removed. 
+ if err := m.deleteBlobsFromStorageAndCache(ctx, indexBlobsToDelete); err != nil { + return errors.Wrap(err, "unable to delete compaction logs") + } + + compactionLogBlobsToDelayCleanup := m.findCompactionLogBlobsToDelayCleanup(ctx, compactionBlobs) + + if err := m.delayCleanupBlobs(ctx, compactionLogBlobsToDelayCleanup); err != nil { + return errors.Wrap(err, "unable to schedule delayed cleanup of blobs") + } + + if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogBlobsToDelete); err != nil { + return errors.Wrap(err, "unable to delete compaction logs") + } + + if err := m.deleteBlobsFromStorageAndCache(ctx, cleanupBlobsToDelete); err != nil { + return errors.Wrap(err, "unable to delete cleanup blobs") + } + + m.flushCache() + + return nil +} + +func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry) []blob.ID { + tmp := map[blob.ID]bool{} + + for _, cl := range entries { + // are the input index blobs in this compaction eligible for deletion? 
+ if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < m.maxEventualConsistencySettleTime { + log(ctx).Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, m.maxEventualConsistencySettleTime) + continue + } + + for _, b := range cl.InputMetadata { + log(ctx).Debugf("will delete old index %v compacted to %v", b, cl.OutputMetadata) + + tmp[b.BlobID] = true + } + } + + var result []blob.ID + + for k := range tmp { + result = append(result, k) + } + + return result +} + +func (m *indexBlobManagerImpl) findCompactionLogBlobsToDelayCleanup(ctx context.Context, compactionBlobs []blob.Metadata) []blob.ID { + var result []blob.ID + + for _, cb := range compactionBlobs { + log(ctx).Debugf("will delete compaction log blob %v", cb) + result = append(result, cb.BlobID) + } + + return result +} + +func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry) (compactionLogs, cleanupBlobs []blob.ID) { + for _, e := range entries { + if e.age > m.maxEventualConsistencySettleTime { + compactionLogs = append(compactionLogs, e.BlobID) + cleanupBlobs = append(cleanupBlobs, e.BlobID) + } + } + + return +} + +func (m *indexBlobManagerImpl) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID) error { + for _, b := range blobIDs { + payload, err := json.Marshal(&cleanupEntry{ + BlobID: b, + }) + if err != nil { + return errors.Wrap(err, "unable to marshal cleanup log bytes") + } + + if _, err := m.encryptAndWriteBlob(ctx, payload, cleanupBlobPrefix); err != nil { + return errors.Wrap(err, "unable to cleanup log") + } + } + + return nil +} + +func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error { + for _, blobID := range blobIDs { + if err := m.st.DeleteBlob(ctx, blobID); err != nil && err != blob.ErrBlobNotFound { + return errors.Wrapf(err, "unable to delete blob %v", blobID) + } + + if err := 
m.ownWritesCache.delete(ctx, blobID); err != nil { + return errors.Wrapf(err, "unable to delete blob %v from own-writes cache", blobID) + } + } + + return nil +} + +// blobsOlderThan returns the elements of m whose Timestamp is not after cutoffTime. +func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata { + var res []blob.Metadata + + for _, m := range m { + if !m.Timestamp.After(cutoffTime) { + res = append(res, m) + } + } + + return res +} + +// getIndexBlobIV hex-decodes the trailing 2*aes.BlockSize characters of the blob ID, +// after dropping everything from the first '-' onwards (if present). +// It returns an error, instead of panicking, when the remaining ID is too short. +func getIndexBlobIV(s blob.ID) ([]byte, error) { + if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic + s = s[0:p] + } + + if len(s) < aes.BlockSize*2 { + return nil, errors.Errorf("blob id too short: %v", s) + } + + return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):])) +} + +// removeCompactedIndexes drops from m (or, when markAsSuperseded is set, marks as superseded) all index blobs +// that were inputs of compaction logs for which every output blob is present in m. +func removeCompactedIndexes(ctx context.Context, m map[blob.ID]*IndexBlobInfo, compactionLogs map[blob.ID]*compactionLogEntry, markAsSuperseded bool) { + var validCompactionLogs []*compactionLogEntry + + for _, cl := range compactionLogs { + // only process compaction logs for which we have found all the outputs. + haveAllOutputs := true + + for _, o := range cl.OutputMetadata { + if m[o.BlobID] == nil { + haveAllOutputs = false + + log(ctx).Debugf("blob %v referenced by compaction log is not found", o.BlobID) + + break + } + } + + if haveAllOutputs { + validCompactionLogs = append(validCompactionLogs, cl) + } + } + + // now remove all inputs from the set if there's a valid compaction log entry with all the outputs. 
+ for _, cl := range validCompactionLogs { + for _, ib := range cl.InputMetadata { + if md := m[ib.BlobID]; md != nil && md.Superseded == nil { + log(ctx).Debugf("ignoring index blob %v (%v) because it's been compacted to %v", ib, md.Timestamp, cl.OutputMetadata) + + if markAsSuperseded { + md.Superseded = cl.OutputMetadata + } else { + delete(m, ib.BlobID) + } + } + } + } +} diff --git a/repo/content/index_blob_manager_test.go b/repo/content/index_blob_manager_test.go new file mode 100644 index 000000000..8d338ad79 --- /dev/null +++ b/repo/content/index_blob_manager_test.go @@ -0,0 +1,770 @@ +package content + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "os" + "strings" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/logging" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +// we use two fake time sources - one for local client and one for the remote store +// to simulate clock drift +var ( + fakeLocalStartTime = time.Date(2020, 1, 1, 14, 0, 0, 0, time.UTC) + fakeStoreStartTime = time.Date(2020, 1, 1, 10, 0, 0, 0, time.UTC) +) + +const ( + testIndexBlobDeleteAge = 1 * time.Minute + testEventualConsistencySettleTime = 45 * time.Second +) + +func TestIndexBlobManager(t *testing.T) { + cases := []struct { + storageTimeAdvanceBetweenCompactions time.Duration + wantIndexCount int + wantCompactionLogCount int + wantCleanupCount int + }{ + { + // we write 6 index blobs and 2 compaction logs + // but not enough time has passed to delete anything + storageTimeAdvanceBetweenCompactions: 0, + wantIndexCount: 6, + wantCompactionLogCount: 2, + }, + { + // we write 6 index blobs and 2 compaction logs + // enough time has passed to 
delete 3 indexes and create cleanup log + storageTimeAdvanceBetweenCompactions: testIndexBlobDeleteAge + 1*time.Second, + wantIndexCount: 3, + wantCompactionLogCount: 2, + wantCleanupCount: 1, + }, + } + + for _, tc := range cases { + tc := tc + + t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) { + // fake underlying blob store with fake time + storageData := blobtesting.DataMap{} + + fakeLocalTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeStorageTime := faketime.NewTimeAdvance(fakeStoreStartTime) + + st := blobtesting.NewMapStorage(storageData, nil, fakeStorageTime.NowFunc()) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeStorageTime.NowFunc()) + m := newIndexBlobManagerForTesting(t, st, fakeLocalTime.NowFunc()) + + assertIndexBlobList(t, m) + + b1 := mustWriteIndexBlob(t, m, "index-1") + assertIndexBlobList(t, m, b1) + fakeStorageTime.Advance(1 * time.Second) + + b2 := mustWriteIndexBlob(t, m, "index-2") + assertIndexBlobList(t, m, b1, b2) + fakeStorageTime.Advance(1 * time.Second) + + b3 := mustWriteIndexBlob(t, m, "index-3") + assertIndexBlobList(t, m, b1, b2, b3) + fakeStorageTime.Advance(1 * time.Second) + + b4 := mustWriteIndexBlob(t, m, "index-4") + assertIndexBlobList(t, m, b1, b2, b3, b4) + fakeStorageTime.Advance(1 * time.Second) + assertBlobCounts(t, storageData, 4, 0, 0) + + // first compaction b1+b2+b3=>b4 + mustRegisterCompaction(t, m, []blob.Metadata{b1, b2, b3}, []blob.Metadata{b4}) + + assertIndexBlobList(t, m, b4) + fakeStorageTime.Advance(tc.storageTimeAdvanceBetweenCompactions) + + // second compaction b4+b5=>b6 + b5 := mustWriteIndexBlob(t, m, "index-5") + b6 := mustWriteIndexBlob(t, m, "index-6") + mustRegisterCompaction(t, m, []blob.Metadata{b4, b5}, []blob.Metadata{b6}) + assertIndexBlobList(t, m, b6) + assertBlobCounts(t, storageData, tc.wantIndexCount, tc.wantCompactionLogCount, tc.wantCleanupCount) + }) + } +} + +type action int + +const ( + actionWrite = 1 + actionRead = 2 + 
actionCompact = 3 + actionDelete = 4 + actionUndelete = 5 + actionCompactAndDropDeleted = 6 +) + +// actionsTestIndexBlobManagerStress is the set of actions performed by each actor in TestIndexBlobManagerStress, together with their weights. +var actionsTestIndexBlobManagerStress = []struct { + a action + weight int +}{ + {actionWrite, 10}, + {actionRead, 10}, + {actionCompact, 10}, + {actionDelete, 10}, + {actionUndelete, 10}, + {actionCompactAndDropDeleted, 10}, +} + +func pickRandomActionTestIndexBlobManagerStress() action { + sum := 0 + for _, a := range actionsTestIndexBlobManagerStress { + sum += a.weight + } + + n := rand.Intn(sum) + for _, a := range actionsTestIndexBlobManagerStress { + if n < a.weight { + return a.a + } + + n -= a.weight + } + + panic("impossible") +} + +// TestIndexBlobManagerStress launches N actors, each randomly writing new index blobs, +// verifying that all blobs previously written by it are correct and randomly compacting blobs. +// nolint:gocyclo +func TestIndexBlobManagerStress(t *testing.T) { + t.Parallel() + + rand.Seed(time.Now().UnixNano()) + + for i := range actionsTestIndexBlobManagerStress { + actionsTestIndexBlobManagerStress[i].weight = rand.Intn(100) + t.Logf("weight[%v] = %v", i, actionsTestIndexBlobManagerStress[i].weight) + } + + var ( + fakeTimeFunc = faketime.AutoAdvance(fakeLocalStartTime, 100*time.Millisecond) + deadline time.Time // when (according to fakeTimeFunc) the test should finish + localTimeDeadline time.Time // when (according to time.Now) the test should finish + ) + + localTimeDeadline = time.Now().Add(30 * time.Second) + + if os.Getenv("CI") != "" { + // when running on CI, simulate 4 hours, this takes about ~15-20 seconds. + deadline = fakeTimeFunc().Add(4 * time.Hour) + } else { + // otherwise test only 1 hour, which still provides decent coverage, takes about 3-5 seconds. 
+ deadline = fakeTimeFunc().Add(1 * time.Hour) + } + + // shared storage + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, fakeTimeFunc) + + var eg errgroup.Group + + numActors := 2 + + for actorID := 0; actorID < numActors; actorID++ { + actorID := actorID + loggedSt := logging.NewWrapper(st, func(m string, args ...interface{}) { + t.Logf(fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)+m, args...) + }, "") + contentPrefix := fmt.Sprintf("a%v", actorID) + + eg.Go(func() error { + numWritten := 0 + deletedContents := map[string]bool{} + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID) + }) + + m := newIndexBlobManagerForTesting(t, loggedSt, fakeTimeFunc) + + // run stress test until the deadline, aborting early on any failure + for fakeTimeFunc().Before(deadline) && time.Now().Before(localTimeDeadline) { + switch pickRandomActionTestIndexBlobManagerStress() { + case actionRead: + if err := verifyFakeContentsWritten(ctx, m, numWritten, contentPrefix, deletedContents); err != nil { + return errors.Wrapf(err, "actor[%v] error verifying contents", actorID) + } + + case actionWrite: + if err := writeFakeContents(ctx, m, contentPrefix, rand.Intn(10)+5, &numWritten, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] write error", actorID) + } + + case actionDelete: + if err := deleteFakeContents(ctx, m, contentPrefix, numWritten, deletedContents, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] delete error", actorID) + } + + case actionUndelete: + if err := undeleteFakeContents(ctx, m, deletedContents, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] undelete error", actorID) + } + + case actionCompact: + // compaction by more than one actor is unsafe, do it only if actorID == 0 + if actorID != 0 { + continue + } + + if err := fakeCompaction(ctx, m, false); 
err != nil { + return errors.Wrapf(err, "actor[%v] compaction error", actorID) + } + + case actionCompactAndDropDeleted: + // compaction by more than one actor is unsafe, do it only if actorID == 0 + if actorID != 0 { + continue + } + + if err := fakeCompaction(ctx, m, true); err != nil { + return errors.Wrapf(err, "actor[%v] compaction error", actorID) + } + } + } + + return nil + }) + } + + if err := eg.Wait(); err != nil { + t.Errorf("err: %+v", err) + } +} + +func TestIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // the test is randomized and runs very quickly, run it lots of times, stopping at the first failed attempt + failed := false + for i := 0; i < 100 && !failed; i++ { + failed = !t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) { + verifyIndexBlobManagerPreventsResurrectOfDeletedContents( + t, 1*time.Second, 1*time.Second, testIndexBlobDeleteAge, 1*time.Second, 2*time.Second, + ) + }) + } +} + +func TestCompactionCreatesPreviousIndex(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + storageData := blobtesting.DataMap{} + + fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeTimeFunc := fakeTime.NowFunc() + + st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc) + st = logging.NewWrapper(st, func(msg string, args ...interface{}) { + t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...) 
+ }, "store: ") + m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + + numWritten := 0 + deleted := map[string]bool{} + + prefix := "prefix" + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fakeTimeFunc().Format("150405.000") + " " + }) + + // index#1 - add content1 + must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // index#2 - add content2 + must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // index#3 - {content1, content2}, index#1, index#2 marked for deletion + must(t, fakeCompaction(ctx, m, false)) + fakeTime.Advance(1 * time.Second) + + // index#4 - delete content1 + must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // this will create index identical to index#2, + // we will embed random ID in the index to ensure that they get different blob ID each time. + // otherwise (since indexes are based on hash of content) they would create the same blob ID. + // if this was the case, first compaction marks index#1 as deleted and second compaction + // revives it. + must(t, fakeCompaction(ctx, m, true)) + fakeTime.Advance(testEventualConsistencySettleTime) + + // if we were not to add randomness to index blobs, this would fail. 
+ must(t, verifyFakeContentsWritten(ctx, m, 2, prefix, deleted)) +} + +func TestIndexBlobManagerPreventsResurrectOfDeletedContents_RandomizedTimings(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // the test is randomized and runs very quickly, run it lots of times + for i := 0; i < 1000; i++ { + t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) { + verifyIndexBlobManagerPreventsResurrectOfDeletedContents( + t, + randomDuration(10*time.Second), + randomDuration(10*time.Second), + testIndexBlobDeleteAge+randomDuration(testIndexBlobDeleteAge), + randomDuration(10*time.Second), + randomDuration(2*testEventualConsistencySettleTime), + ) + }) + } +} + +func randomDuration(max time.Duration) time.Duration { + return time.Duration(float64(max) * rand.Float64()) +} + +func verifyIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T, delay1, delay2, delay3, delay4, delay5 time.Duration) { + t.Logf("delays: %v %v %v %v %v", delay1, delay2, delay3, delay4, delay5) + + storageData := blobtesting.DataMap{} + + fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeTimeFunc := fakeTime.NowFunc() + + st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc) + st = logging.NewWrapper(st, func(msg string, args ...interface{}) { + t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...) + }, "store: ") + m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + + numWritten := 0 + deleted := map[string]bool{} + + prefix := "prefix" + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fakeTimeFunc().Format("150405.000") + " " + }) + + // index#1 - write 2 contents + must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc)) + fakeTime.Advance(delay1) + // index#2 - delete first of the two contents. 
+ must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc)) + fakeTime.Advance(delay2) + // index#3, log#3 - replaces index#1 and #2 + must(t, fakeCompaction(ctx, m, true)) + fakeTime.Advance(delay3) + + numWritten2 := numWritten + + // index#4 - create one more content + must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc)) + fakeTime.Advance(delay4) + + // index#5, log#4 replaces index#3 and index#4, this will delete index#1 and index#2 and log#3 + must(t, fakeCompaction(ctx, m, true)) + + t.Logf("************************************************ VERIFY") + + // advance the time just enough for eventual consistency to be visible + fakeTime.Advance(delay5) + + // using another reader, make sure that all writes up to numWritten2 are correct regardless of whether + // compaction is visible + another := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + must(t, verifyFakeContentsWritten(ctx, another, numWritten2, prefix, deleted)) + + // verify that this reader can see all its own writes regardless of eventual consistency + must(t, verifyFakeContentsWritten(ctx, m, numWritten, prefix, deleted)) + + // after eventual consistency is settled, another reader can see all our writes + fakeTime.Advance(testEventualConsistencySettleTime) + must(t, verifyFakeContentsWritten(ctx, another, numWritten, prefix, deleted)) +} + +type fakeContentIndexEntry struct { + ModTime time.Time + Deleted bool +} + +func verifyFakeContentsWritten(ctx context.Context, m indexBlobManager, numWritten int, contentPrefix string, deletedContents map[string]bool) error { + if numWritten == 0 { + return nil + } + + log(ctx).Debugf("verifyFakeContentsWritten()") + defer log(ctx).Debugf("finished verifyFakeContentsWritten()") + + all, _, err := getAllFakeContents(ctx, m) + if err != nil { + return errors.Wrap(err, "error getting all contents") + } + + // verify that all contents previously written can be read. 
+ for i := 0; i < numWritten; i++ { + id := fakeContentID(contentPrefix, i) + if _, ok := all[id]; !ok { + if deletedContents[id] { + continue + } + + return errors.Errorf("could not find content previously written by itself: %v (got %v)", id, all) + } + + if got, want := all[id].Deleted, deletedContents[id]; got != want { + return errors.Errorf("deleted flag does not match for %v: %v want %v", id, got, want) + } + } + + return nil +} + +func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) error { + log(ctx).Debugf("fakeCompaction(dropDeleted=%v)", dropDeleted) + defer log(ctx).Debugf("finished fakeCompaction(dropDeleted=%v)", dropDeleted) + + allContents, allBlobs, err := getAllFakeContents(ctx, m) + if err != nil { + return errors.Wrap(err, "error getting contents") + } + + dropped := map[string]fakeContentIndexEntry{} + + if dropDeleted { + for cid, e := range allContents { + if e.Deleted { + dropped[cid] = e + + delete(allContents, cid) + } + } + } + + if len(allBlobs) <= 1 { + return nil + } + + outputBM, err := writeFakeIndex(ctx, m, allContents) + if err != nil { + return errors.Wrap(err, "unable to write index") + } + + for cid, e := range dropped { + log(ctx).Debugf("dropped deleted %v %v from %v", cid, e, outputBM) + } + + var ( + inputs []blob.Metadata + outputs = []blob.Metadata{outputBM} + ) + + for _, bi := range allBlobs { + if bi.BlobID == outputBM.BlobID { + // no compaction, output is the same as one of the inputs + return nil + } + + inputs = append(inputs, bi.Metadata) + } + + if err := m.registerCompaction(ctx, inputs, outputs); err != nil { + return errors.Wrap(err, "compaction error") + } + + return nil +} + +func fakeContentID(prefix string, n int) string { + return fmt.Sprintf("%v-%06v", prefix, n) +} + +func deleteFakeContents(ctx context.Context, m indexBlobManager, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error { + if numWritten == 0 { + return nil + } + + 
log(ctx).Debugf("deleteFakeContents()") + defer log(ctx).Debugf("finished deleteFakeContents()") + + count := rand.Intn(10) + 5 + + ndx := map[string]fakeContentIndexEntry{} + + for i := 0; i < count; i++ { + n := fakeContentID(prefix, rand.Intn(numWritten)) + if deleted[n] { + continue + } + + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + Deleted: true, + } + + deleted[n] = true + } + + if len(ndx) == 0 { + return nil + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +func undeleteFakeContents(ctx context.Context, m indexBlobManager, deleted map[string]bool, timeFunc func() time.Time) error { + if len(deleted) == 0 { + return nil + } + + log(ctx).Debugf("undeleteFakeContents()") + defer log(ctx).Debugf("finished undeleteFakeContents()") + + count := rand.Intn(5) + + ndx := map[string]fakeContentIndexEntry{} + + for n := range deleted { + if count == 0 { + break + } + + // undelete + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + Deleted: false, + } + + delete(deleted, n) + count-- + } + + if len(ndx) == 0 { + return nil + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +func writeFakeContents(ctx context.Context, m indexBlobManager, prefix string, count int, numWritten *int, timeFunc func() time.Time) error { + log(ctx).Debugf("writeFakeContents()") + defer log(ctx).Debugf("finished writeFakeContents()") + + ndx := map[string]fakeContentIndexEntry{} + + for i := 0; i < count; i++ { + n := fakeContentID(prefix, *numWritten) + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + } + + (*numWritten)++ + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +type fakeIndexData struct { + RandomID int64 + Entries map[string]fakeContentIndexEntry +} + +func writeFakeIndex(ctx context.Context, m indexBlobManager, ndx map[string]fakeContentIndexEntry) (blob.Metadata, error) { + j, err := json.Marshal(fakeIndexData{ + RandomID: rand.Int63(), + Entries: ndx, + }) + if err != nil { + return blob.Metadata{}, 
errors.Wrap(err, "json error") + } + + bm, err := m.writeIndexBlob(ctx, j) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "error writing blob") + } + + for k, v := range ndx { + log(ctx).Debugf("wrote content %v %v in blob %v", k, v, bm) + } + + return bm, nil +} + +var errGetAllFakeContentsRetry = errors.New("retry") + +func getAllFakeContents(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) { + allContents, allBlobs, err := getAllFakeContentsInternal(ctx, m) + + for err == errGetAllFakeContentsRetry { + allContents, allBlobs, err = getAllFakeContentsInternal(ctx, m) + } + + return allContents, allBlobs, err +} + +func getAllFakeContentsInternal(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) { + blobs, err := m.listIndexBlobs(ctx, false) + if err != nil { + return nil, nil, errors.Wrap(err, "error listing index blobs") + } + + log(ctx).Debugf("got blobs: %v", blobs) + + allContents := map[string]fakeContentIndexEntry{} + + for _, bi := range blobs { + bb, err := m.getIndexBlob(ctx, bi.BlobID) + if err == blob.ErrBlobNotFound { + return nil, nil, errGetAllFakeContentsRetry + } + + if err != nil { + return nil, nil, errors.Wrap(err, "error reading blob") + } + + var indexData fakeIndexData + + if err := json.Unmarshal(bb, &indexData); err != nil { + log(ctx).Debugf("invalid JSON %v: %v", string(bb), err) + return nil, nil, errors.Wrap(err, "error unmarshaling") + } + + // merge contents based on time - the entry with the latest ModTime wins + for k, v := range indexData.Entries { + old, ok := allContents[k] + + if !ok { + allContents[k] = v + } else if v.ModTime.After(old.ModTime) { + allContents[k] = v + } + } + } + + return allContents, blobs, nil +} + +func assertBlobCounts(t *testing.T, data blobtesting.DataMap, wantN, wantM, wantL int) { + t.Helper() + require.Len(t, keysWithPrefix(data, compactionLogBlobPrefix), wantM) + require.Len(t, keysWithPrefix(data, indexBlobPrefix), 
wantN) + require.Len(t, keysWithPrefix(data, "l"), wantL) +} + +// keysWithPrefix returns the keys of data that start with the given prefix, in no particular order. +func keysWithPrefix(data blobtesting.DataMap, prefix blob.ID) []blob.ID { + var res []blob.ID + + for k := range data { + if strings.HasPrefix(string(k), string(prefix)) { + res = append(res, k) + } + } + + return res +} + +// mustRegisterCompaction registers compaction of inputs into outputs and fails the test on error. +func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs []blob.Metadata) { + t.Logf("compacting %v to %v", inputs, outputs) + + err := m.registerCompaction(testlogging.Context(t), inputs, outputs) + if err != nil { + t.Fatalf("failed to register compaction: %v", err) + } +} + +// mustWriteIndexBlob writes an index blob with the provided data and returns its metadata, failing the test on error. +func mustWriteIndexBlob(t *testing.T, m indexBlobManager, data string) blob.Metadata { + t.Logf("writing index blob %q", data) + + blobMD, err := m.writeIndexBlob(testlogging.Context(t), []byte(data)) + if err != nil { + t.Fatalf("failed to write index blob: %v", err) + } + + return blobMD +} + +// assertIndexBlobList asserts that the IDs of index blobs listed by the manager match the IDs of wantMD, ignoring order. +func assertIndexBlobList(t *testing.T, m indexBlobManager, wantMD ...blob.Metadata) { + t.Helper() + + var want []blob.ID + for _, it := range wantMD { + want = append(want, it.BlobID) + } + + l, err := m.listIndexBlobs(testlogging.Context(t), false) + if err != nil { + t.Fatalf("failed to list index blobs: %v", err) + } + + t.Logf("asserting blob list %v vs %v", want, l) + + var got []blob.ID + for _, it := range l { + got = append(got, it.BlobID) + } + + require.ElementsMatch(t, got, want) +} + +func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow func() time.Time) indexBlobManager { + p := &FormattingOptions{ + Encryption: encryption.DeprecatedNoneAlgorithm, + Hash: hashing.DefaultAlgorithm, + } + + enc, err := encryption.CreateEncryptor(p) + if err != nil { + t.Fatalf("unable to create encryptor: %v", err) + } + + hf, err := hashing.CreateHashFunc(p) + if err != nil { + t.Fatalf("unable to create hash: %v", err) + } + + lc, err := newListCache(st, &CachingOptions{}) + if err != nil { + t.Fatalf("unable to create list cache: %v", err) + } + + m := &indexBlobManagerImpl{ + 
st: st, + ownWritesCache: &persistentOwnWritesCache{ + blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, localTimeNow), + localTimeNow}, + indexBlobCache: passthroughContentCache{st}, + encryptor: enc, + hasher: hf, + listCache: lc, + timeNow: localTimeNow, + maxEventualConsistencySettleTime: testIndexBlobDeleteAge, + } + + return m +} diff --git a/repo/content/list_cache.go b/repo/content/list_cache.go index 3fc931ad9..7c2c5cee0 100644 --- a/repo/content/list_cache.go +++ b/repo/content/list_cache.go @@ -1,15 +1,15 @@ package content import ( + "bytes" "context" "encoding/json" - "fmt" "io/ioutil" - "math" "os" "path/filepath" "time" + "github.com/natefinch/atomic" "github.com/pkg/errors" "github.com/kopia/kopia/internal/hmac" @@ -18,70 +18,69 @@ type listCache struct { st blob.Storage - cacheFile string + cacheFilePrefix string listCacheDuration time.Duration hmacSecret []byte } -func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) { - if c.cacheFile != "" { - ci, err := c.readContentsFromCache(ctx) +func (c *listCache) listBlobs(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) { + if c.cacheFilePrefix != "" { + ci, err := c.readBlobsFromCache(ctx, prefix) if err == nil { expirationTime := ci.Timestamp.Add(c.listCacheDuration) if time.Now().Before(expirationTime) { // allow:no-inject-time - log(ctx).Debugf("retrieved list of index blobs from cache") - return ci.Contents, nil + log(ctx).Debugf("retrieved list of %v '%v' index blobs from cache", len(ci.Blobs), prefix) + return ci.Blobs, nil } } else if err != blob.ErrBlobNotFound { log(ctx).Warningf("unable to open cache file: %v", err) } } - contents, err := listIndexBlobsFromStorage(ctx, c.st) + blobs, err := blob.ListAllBlobs(ctx, c.st, prefix) if err == nil { - c.saveListToCache(ctx, &cachedList{ - Contents: contents, + c.saveListToCache(ctx, prefix, &cachedList{ + Blobs: blobs, Timestamp: time.Now(), // allow:no-inject-time }) } - log(ctx).Debugf("found %v index 
blobs from source", len(contents)) + log(ctx).Debugf("listed %v index blobs with prefix %v from source", len(blobs), prefix) - return contents, err + return blobs, err } -func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) { - if c.cacheFile == "" { +func (c *listCache) saveListToCache(ctx context.Context, prefix blob.ID, ci *cachedList) { + if c.cacheFilePrefix == "" { return } - log(ctx).Debugf("saving index blobs to cache: %v", len(ci.Contents)) + log(ctx).Debugf("saving %v blobs with prefix %v to cache", len(ci.Blobs), prefix) if data, err := json.Marshal(ci); err == nil { - mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) // allow:no-inject-time - if err := ioutil.WriteFile(c.cacheFile+mySuffix, hmac.Append(data, c.hmacSecret), 0600); err != nil { + b := hmac.Append(data, c.hmacSecret) + if err := atomic.WriteFile(c.cacheFilePrefix+string(prefix), bytes.NewReader(b)); err != nil { log(ctx).Warningf("unable to write list cache: %v", err) } - - os.Rename(c.cacheFile+mySuffix, c.cacheFile) //nolint:errcheck - os.Remove(c.cacheFile + mySuffix) //nolint:errcheck } } -func (c *listCache) deleteListCache() { - if c.cacheFile != "" { - os.Remove(c.cacheFile) //nolint:errcheck +func (c *listCache) deleteListCache(prefix blob.ID) { + if c.cacheFilePrefix != "" { + os.Remove(c.cacheFilePrefix + string(prefix)) //nolint:errcheck } } -func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, error) { +func (c *listCache) readBlobsFromCache(ctx context.Context, prefix blob.ID) (*cachedList, error) { if !shouldUseListCache(ctx) { return nil, blob.ErrBlobNotFound } ci := &cachedList{} - data, err := ioutil.ReadFile(c.cacheFile) + fname := c.cacheFilePrefix + string(prefix) + + data, err := ioutil.ReadFile(fname) //nolint:gosec if err != nil { if os.IsNotExist(err) { return nil, blob.ErrBlobNotFound @@ -92,7 +91,7 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err data, err = 
hmac.VerifyAndStrip(data, c.hmacSecret) if err != nil { - return nil, errors.Wrapf(err, "invalid file %v", c.cacheFile) + return nil, errors.Wrapf(err, "invalid file %v", fname) } if err := json.Unmarshal(data, &ci); err != nil { @@ -104,36 +103,14 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err type cachedList struct { Timestamp time.Time `json:"timestamp"` - Contents []IndexBlobInfo `json:"contents"` + Blobs []blob.Metadata `json:"blobs"` } -// listIndexBlobsFromStorage returns the list of index blobs in the given storage. -// The list of contents is not guaranteed to be sorted. -func listIndexBlobsFromStorage(ctx context.Context, st blob.Storage) ([]IndexBlobInfo, error) { - snapshot, err := blob.ListAllBlobsConsistent(ctx, st, newIndexBlobPrefix, math.MaxInt32) - if err != nil { - return nil, err - } - - var results []IndexBlobInfo - - for _, it := range snapshot { - ii := IndexBlobInfo{ - BlobID: it.BlobID, - Timestamp: it.Timestamp, - Length: it.Length, - } - results = append(results, ii) - } - - return results, err -} - -func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) { - var listCacheFile string +func newListCache(st blob.Storage, caching *CachingOptions) (*listCache, error) { + var listCacheFilePrefix string if caching.CacheDirectory != "" { - listCacheFile = filepath.Join(caching.CacheDirectory, "list") + listCacheFilePrefix = filepath.Join(caching.CacheDirectory, "blob-list-") if _, err := os.Stat(caching.CacheDirectory); os.IsNotExist(err) { if err := os.MkdirAll(caching.CacheDirectory, 0700); err != nil { @@ -144,14 +121,10 @@ func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) { c := &listCache{ st: st, - cacheFile: listCacheFile, + cacheFilePrefix: listCacheFilePrefix, hmacSecret: caching.HMACSecret, listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second, } - if caching.IgnoreListCache { - c.deleteListCache() - } - return c, nil } 
diff --git a/repo/content/packindex_test.go b/repo/content/packindex_test.go index 79cce8bf4..01113e692 100644 --- a/repo/content/packindex_test.go +++ b/repo/content/packindex_test.go @@ -117,12 +117,21 @@ func TestPackIndex(t *testing.T) { data2 := buf2.Bytes() data3 := buf3.Bytes() - if !bytes.Equal(data1, data2) { - t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1), hex.Dump(data2)) + // each build produces exactly idendical prefix except for the trailing random bytes. + data1Prefix := data1[0 : len(data1)-randomSuffixSize] + data2Prefix := data2[0 : len(data2)-randomSuffixSize] + data3Prefix := data3[0 : len(data3)-randomSuffixSize] + + if !bytes.Equal(data1Prefix, data2Prefix) { + t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1Prefix), hex.Dump(data2Prefix)) } - if !bytes.Equal(data2, data3) { - t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2), hex.Dump(data3)) + if !bytes.Equal(data2Prefix, data3Prefix) { + t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2Prefix), hex.Dump(data3Prefix)) + } + + if bytes.Equal(data1, data2) { + t.Errorf("builder output expected to be different, but was the same") } t.Run("FuzzTest", func(t *testing.T) { diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index eefe33215..c7916d77f 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -148,7 +148,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // write some data to storage - bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) + bm, err := content.NewManager(ctx, st, f, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -174,7 +174,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // make a new content manager based on corrupted data. 
- bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) + bm, err = content.NewManager(ctx, st, f, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -305,7 +305,7 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da Encryption: encryption.DefaultAlgorithm, MaxPackSize: 100000, Version: 1, - }, content.CachingOptions{}, content.ManagerOptions{}) + }, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("can't create content manager: %v", err) } diff --git a/repo/open.go b/repo/open.go index fcc1a1d5a..a607d3c0a 100644 --- a/repo/open.go +++ b/repo/open.go @@ -80,7 +80,7 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor st = loggingwrapper.NewWrapper(st, options.TraceStorage, "[STORAGE] ") } - r, err := OpenWithConfig(ctx, st, lc, password, options, *lc.Caching) + r, err := OpenWithConfig(ctx, st, lc, password, options, lc.Caching) if err != nil { st.Close(ctx) //nolint:errcheck return nil, err @@ -103,7 +103,9 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor } // OpenWithConfig opens the repository with a given configuration, avoiding the need for a config file. -func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching content.CachingOptions) (*DirectRepository, error) { +func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching *content.CachingOptions) (*DirectRepository, error) { + caching = caching.CloneOrDefault() + // Read format blob, potentially from cache. fb, err := readAndCacheFormatBlobBytes(ctx, st, caching.CacheDirectory) if err != nil { @@ -173,7 +175,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw } // SetCachingConfig changes caching configuration for a given repository. 
-func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt content.CachingOptions) error { +func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt *content.CachingOptions) error { lc, err := loadConfigFromFile(r.ConfigFile) if err != nil { return err diff --git a/repo/repository.go b/repo/repository.go index 94faef6ba..0fc291228 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -49,6 +49,8 @@ type DirectRepository struct { timeNow func() time.Time formatBlob *formatBlob masterKey []byte + + closed bool } // DeriveKey derives encryption key of the provided length from the master key. @@ -114,6 +116,10 @@ func (r *DirectRepository) DeleteManifest(ctx context.Context, id manifest.ID) e // Close closes the repository and releases all resources. func (r *DirectRepository) Close(ctx context.Context) error { + if r.closed { + return nil + } + if err := r.Flush(ctx); err != nil { return errors.Wrap(err, "error flushing") } @@ -130,6 +136,8 @@ func (r *DirectRepository) Close(ctx context.Context) error { return errors.Wrap(err, "error closing blob storage") } + r.closed = true + return nil } diff --git a/tests/end_to_end_test/index_recover_test.go b/tests/end_to_end_test/index_recover_test.go index e3574ec33..e9e0fe7ef 100644 --- a/tests/end_to_end_test/index_recover_test.go +++ b/tests/end_to_end_test/index_recover_test.go @@ -39,9 +39,13 @@ func TestIndexRecover(t *testing.T) { e.RunAndExpectSuccess(t, "blob", "delete", indexFile) } + // clear the cache to get rid of cache of own writes. 
+ e.RunAndVerifyOutputLineCount(t, 0, "cache", "clear") + // there should be no index files at this point e.RunAndVerifyOutputLineCount(t, 0, "index", "ls", "--no-list-caching") - // there should be no blocks, since there are no indexesto find them + + // there should be no contents, since there are no indexes to find them e.RunAndVerifyOutputLineCount(t, 0, "content", "ls") // now recover index from all blocks diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index 022bc2408..4c4b05a20 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -35,7 +35,7 @@ func TestStressRepository(t *testing.T) { t.Skip("skipping stress test during short tests") } - ctx := content.UsingListCache(testlogging.Context(t), false) + ctx := testlogging.Context(t) tmpPath, err := ioutil.TempDir("", "kopia") if err != nil { diff --git a/tests/stress_test/stress_test.go b/tests/stress_test/stress_test.go index f48b48a1e..289019096 100644 --- a/tests/stress_test/stress_test.go +++ b/tests/stress_test/stress_test.go @@ -31,8 +31,6 @@ func TestStressBlockManager(t *testing.T) { duration = 30 * time.Second } - // TODO: use blobtesting.NewEventuallyConsistentStorage(memst, 0.1) instead of memst here - stressTestWithStorage(t, memst, duration) } @@ -46,7 +44,7 @@ func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration Encryption: "AES-256-CTR", MaxPackSize: 20000000, MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, - }, content.CachingOptions{}, content.ManagerOptions{}) + }, nil, content.ManagerOptions{}) } seed0 := time.Now().Nanosecond()