diff --git a/Makefile b/Makefile index 268f1853d..c24843fef 100644 --- a/Makefile +++ b/Makefile @@ -165,10 +165,10 @@ test-with-coverage-pkgonly: $(GO_TEST) -count=1 -coverprofile=tmp.cov -timeout 90s github.com/kopia/kopia/... test: - $(GO_TEST) -count=1 -timeout 90s ./... + $(GO_TEST) -count=1 -timeout 180s ./... vtest: - $(GO_TEST) -count=1 -short -v -timeout 90s ./... + $(GO_TEST) -count=1 -short -v -timeout 180s ./... dist-binary: go build -o $(KOPIA_INTEGRATION_EXE) github.com/kopia/kopia diff --git a/cli/command_cache_clear.go b/cli/command_cache_clear.go index 45453498c..857c1f24c 100644 --- a/cli/command_cache_clear.go +++ b/cli/command_cache_clear.go @@ -6,6 +6,7 @@ "github.com/pkg/errors" + "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/repo" ) @@ -17,7 +18,14 @@ func runCacheClearCommand(ctx context.Context, rep *repo.DirectRepository) error if d := rep.Content.CachingOptions.CacheDirectory; d != "" { printStderr("Clearing cache directory: %v.\n", d) - err := os.RemoveAll(d) + // close repository before removing cache + if err := rep.Close(ctx); err != nil { + return errors.Wrap(err, "unable to close repository") + } + + err := retry.WithExponentialBackoffNoValue(ctx, "delete cache", func() error { + return os.RemoveAll(d) + }, retry.Always) if err != nil { return err } diff --git a/cli/command_cache_set.go b/cli/command_cache_set.go index 2ebfc2dfe..290261bb2 100644 --- a/cli/command_cache_set.go +++ b/cli/command_cache_set.go @@ -19,7 +19,7 @@ ) func runCacheSetCommand(ctx context.Context, rep *repo.DirectRepository) error { - opts := rep.Content.CachingOptions + opts := rep.Content.CachingOptions.CloneOrDefault() changed := 0 diff --git a/cli/command_index_list.go b/cli/command_index_list.go index ca645a364..1e280493c 100644 --- a/cli/command_index_list.go +++ b/cli/command_index_list.go @@ -9,13 +9,14 @@ ) var ( - blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default() - blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob summary").Bool() - blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name") + blockIndexListCommand = indexCommands.Command("list", "List content indexes").Alias("ls").Default() + blockIndexListSummary = blockIndexListCommand.Flag("summary", "Display index blob summary").Bool() + blockIndexListIncludeSuperseded = blockIndexListCommand.Flag("superseded", "Include inactive index files superseded by compaction").Bool() + blockIndexListSort = blockIndexListCommand.Flag("sort", "Index blob sort order").Default("time").Enum("time", "size", "name") ) func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository) error { - blks, err := rep.Content.IndexBlobs(ctx) + blks, err := rep.Content.IndexBlobs(ctx, *blockIndexListIncludeSuperseded) if err != nil { return err } @@ -36,11 +37,11 @@ func runListBlockIndexesAction(ctx context.Context, rep *repo.DirectRepository) } for _, b := range blks { - fmt.Printf("%-70v %10v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp)) + fmt.Printf("%-40v %10v %v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp), b.Superseded) } if *blockIndexListSummary { - fmt.Printf("total %v blocks\n", len(blks)) + fmt.Printf("total %v indexes\n", len(blks)) } return nil diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index a0651b273..c3ce774f6 100644 --- 
a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -3,7 +3,9 @@ import ( "context" "io/ioutil" + "math" "math/rand" + "strings" "sync" "time" @@ -77,10 +79,12 @@ func (e *ecCacheEntry) isValid() bool { type eventuallyConsistentStorage struct { mu sync.Mutex - listDropProbability float64 + recentlyDeleted sync.Map + listSettleTime time.Duration caches []*ecFrontendCache realStorage blob.Storage + timeNow func() time.Time } func (s *eventuallyConsistentStorage) randomFrontendCache() *ecFrontendCache { @@ -163,27 +167,89 @@ func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, d func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error { s.randomFrontendCache().put(id, nil) + // capture metadata before deleting + md, err := s.realStorage.GetMetadata(ctx, id) + + if errors.Is(err, blob.ErrBlobNotFound) { + return blob.ErrBlobNotFound + } + + if err != nil { + return err + } + if err := s.realStorage.DeleteBlob(ctx, id); err != nil { return err } + md.Timestamp = s.timeNow() + s.recentlyDeleted.Store(id, md) + return nil } +func (s *eventuallyConsistentStorage) shouldApplyInconsistency(ctx context.Context, age time.Duration, desc string) bool { + if age < 0 { + age = -age + } + + if age >= s.listSettleTime { + return false + } + + x := age.Seconds() / s.listSettleTime.Seconds() // [0..1) + + // y=1-(x^0.3) is: + // about 50% probability of inconsistency after 10% of listSettleTime + // about 25% probability of inconsistency after 40% of listSettleTime + // about 10% probability of inconsistency after 67% of listSettleTime + // about 1% probability of inconsistency after 95% of listSettleTime + + const power = 0.3 + + prob := 1 - math.Pow(x, power) + + if rand.Float64() < prob { + log(ctx).Debugf("applying inconsistency %v (probability %v)", desc, prob) + return true + } + + return false +} + func (s *eventuallyConsistentStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error { - return s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { - e := s.randomFrontendCache().get(bm.BlobID) - if e != nil { - // item recently manipulated by the cache, skip from the results with some - // probability - if rand.Float64() < s.listDropProbability { - // skip callback if locally deleted - return nil - } + now := s.timeNow() + + if err := s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error { + if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "hide recently created "+string(bm.BlobID)) { + return nil } return callback(bm) + }); err != nil { + return err + } + + var resultErr error + + // process recently deleted items and resurrect them with some probability + s.recentlyDeleted.Range(func(key, value interface{}) bool { + blobID := key.(blob.ID) + if !strings.HasPrefix(string(blobID), string(prefix)) { + return true + } + + bm := value.(blob.Metadata) + if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "resurrect recently deleted "+string(bm.BlobID)) { + if resultErr = callback(bm); resultErr != nil { + return false + } + } + + return true }) + + return resultErr } func (s *eventuallyConsistentStorage) Close(ctx context.Context) error { @@ -196,10 +262,11 @@ func (s *eventuallyConsistentStorage) ConnectionInfo() blob.ConnectionInfo { // NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top // of provided storage. 
-func NewEventuallyConsistentStorage(st blob.Storage, listDropProbability float64) blob.Storage { +func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage { return &eventuallyConsistentStorage{ - realStorage: st, - caches: make([]*ecFrontendCache, 4), - listDropProbability: listDropProbability, + realStorage: st, + caches: make([]*ecFrontendCache, 4), + listSettleTime: listSettleTime, + timeNow: timeNow, } } diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 5cd964b03..3395468ee 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -69,10 +69,6 @@ func (s *mapStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes) e s.mutex.Lock() defer s.mutex.Unlock() - if _, ok := s.data[id]; ok { - return nil - } - s.keyTime[id] = s.timeNow() var b bytes.Buffer diff --git a/internal/testlogging/ctx.go b/internal/testlogging/ctx.go index 25a05a5d1..ae6567228 100644 --- a/internal/testlogging/ctx.go +++ b/internal/testlogging/ctx.go @@ -86,3 +86,17 @@ func ContextWithLevel(t testingT, level Level) context.Context { return &testLogger{t, "[" + module + "] ", level} }) } + +// ContextWithLevelAndPrefix returns a context with attached logger that emits all log entries with given log level or above. +func ContextWithLevelAndPrefix(t testingT, level Level, prefix string) context.Context { + return logging.WithLogger(context.Background(), func(module string) logging.Logger { + return &testLogger{t, "[" + module + "] " + prefix, level} + }) +} + +// ContextWithLevelAndPrefixFunc returns a context with attached logger that emits all log entries with given log level or above. +func ContextWithLevelAndPrefixFunc(t testingT, level Level, prefixFunc func() string) context.Context { + return logging.WithLogger(context.Background(), func(module string) logging.Logger { + return &testLogger{t, "[" + module + "] " + prefixFunc(), level} + }) +} diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index ae1647c59..f41e1e032 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -22,9 +22,9 @@ func (s *loggingStorage) GetBlob(ctx context.Context, id blob.ID, offset, length dt := time.Since(t0) if len(result) < maxLoggedBlobLength { - s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%#v, %#v) took %v", id, offset, length, result, err, dt) + s.printf(s.prefix+"GetBlob(%q,%v,%v)=(%v, %#v) took %v", id, offset, length, result, err, dt) } else { - s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%#v bytes}, %#v) took %v", id, offset, length, len(result), err, dt) + s.printf(s.prefix+"GetBlob(%q,%v,%v)=({%v bytes}, %#v) took %v", id, offset, length, len(result), err, dt) } return result, err @@ -35,7 +35,7 @@ func (s *loggingStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Meta result, err := s.base.GetMetadata(ctx, id) dt := time.Since(t0) - s.printf(s.prefix+"GetMetadata(%q)=(%#v, %#v) took %v", id, result, err, dt) + s.printf(s.prefix+"GetMetadata(%q)=(%v, %#v) took %v", id, result, err, dt) return result, err } diff --git a/repo/blob/storage.go b/repo/blob/storage.go index 07ff2244d..43a51e193 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -2,6 +2,7 @@ import ( "context" + "encoding/json" "io" "sync" "time" @@ -62,9 +63,14 @@ type Storage interface { // Metadata represents metadata about a single BLOB in a storage. 
type Metadata struct { - BlobID ID - Length int64 - Timestamp time.Time + BlobID ID `json:"id"` + Length int64 `json:"length"` + Timestamp time.Time `json:"timestamp"` +} + +func (m *Metadata) String() string { + b, _ := json.Marshal(m) + return string(b) } // ErrBlobNotFound is returned when a BLOB cannot be found in storage. @@ -123,55 +129,3 @@ func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Stora // return first error or nil return <-errch } - -// ListAllBlobsConsistent lists all blobs with given name prefix in the provided storage until the results are -// consistent. The results are consistent if the list result fetched twice is identical. This guarantees that while -// the first scan was in progress, no new blob was added or removed. -// maxAttempts specifies maximum number of list attempts (must be >= 2) -func ListAllBlobsConsistent(ctx context.Context, st Storage, prefix ID, maxAttempts int) ([]Metadata, error) { - var previous []Metadata - - for i := 0; i < maxAttempts; i++ { - result, err := ListAllBlobs(ctx, st, prefix) - if err != nil { - return nil, err - } - - if i > 0 && sameBlobs(result, previous) { - return result, nil - } - - previous = result - } - - return nil, errors.Errorf("unable to achieve consistent snapshot despite %v attempts", maxAttempts) -} - -// sameBlobs returns true if b1 & b2 contain the same blobs (ignoring order). -func sameBlobs(b1, b2 []Metadata) bool { - if len(b1) != len(b2) { - return false - } - - m := map[ID]Metadata{} - - for _, b := range b1 { - m[b.BlobID] = normalizeMetadata(b) - } - - for _, b := range b2 { - if r := m[b.BlobID]; r != normalizeMetadata(b) { - return false - } - } - - return true -} - -func normalizeMetadata(m Metadata) Metadata { - return Metadata{m.BlobID, m.Length, normalizeTimestamp(m.Timestamp)} -} - -func normalizeTimestamp(t time.Time) time.Time { - return time.Unix(0, t.UnixNano()) -} diff --git a/repo/blob/storage_test.go b/repo/blob/storage_test.go deleted file mode 100644 index bf001d4c9..000000000 --- a/repo/blob/storage_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package blob_test - -import ( - "testing" - "time" - - "github.com/kopia/kopia/internal/blobtesting" - "github.com/kopia/kopia/internal/gather" - "github.com/kopia/kopia/internal/testlogging" - "github.com/kopia/kopia/repo/blob" -) - -func TestListAllBlobsConsistent(t *testing.T) { - ctx := testlogging.Context(t) - data := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(data, nil, time.Now) - st.PutBlob(ctx, "foo1", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - st.PutBlob(ctx, "foo2", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - st.PutBlob(ctx, "foo3", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - - // set up faulty storage that will add a blob while a scan is in progress. - f := &blobtesting.FaultyStorage{ - Base: st, - Faults: map[string][]*blobtesting.Fault{ - "ListBlobsItem": { - {ErrCallback: func() error { - st.PutBlob(ctx, "foo0", gather.FromSlice([]byte{1, 2, 3})) //nolint:errcheck - return nil - }}, - }, - }, - } - - r, err := blob.ListAllBlobsConsistent(ctx, f, "foo", 3) - if err != nil { - t.Fatalf("error: %v", err) - } - - // make sure we get the list with 4 items, not 3. 
- if got, want := len(r), 4; got != want { - t.Errorf("unexpected list result count: %v, want %v", got, want) - } -} - -func TestListAllBlobsConsistentEmpty(t *testing.T) { - ctx := testlogging.Context(t) - data := blobtesting.DataMap{} - st := blobtesting.NewMapStorage(data, nil, time.Now) - - r, err := blob.ListAllBlobsConsistent(ctx, st, "foo", 3) - if err != nil { - t.Fatalf("error: %v", err) - } - - if got, want := len(r), 0; got != want { - t.Errorf("unexpected list result count: %v, want %v", got, want) - } -} diff --git a/repo/connect.go b/repo/connect.go index 77823e083..073cb2745 100644 --- a/repo/connect.go +++ b/repo/connect.go @@ -63,7 +63,7 @@ func Connect(ctx context.Context, configFile string, st blob.Storage, password s lc.Username = getDefaultUserName(ctx) } - if err = setupCaching(ctx, configFile, &lc, opt.CachingOptions, f.UniqueID); err != nil { + if err = setupCaching(ctx, configFile, &lc, &opt.CachingOptions, f.UniqueID); err != nil { return errors.Wrap(err, "unable to set up caching") } @@ -107,7 +107,9 @@ func verifyConnect(ctx context.Context, configFile, password string, persist boo return r.Close(ctx) } -func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt content.CachingOptions, uniqueID []byte) error { +func setupCaching(ctx context.Context, configPath string, lc *LocalConfig, opt *content.CachingOptions, uniqueID []byte) error { + opt = opt.CloneOrDefault() + if opt.MaxCacheSizeBytes == 0 { lc.Caching = &content.CachingOptions{} return nil diff --git a/repo/content/builder.go b/repo/content/builder.go index a942158e8..27b517b7b 100644 --- a/repo/content/builder.go +++ b/repo/content/builder.go @@ -2,6 +2,7 @@ import ( "bufio" + "crypto/rand" "encoding/binary" "io" "sort" @@ -16,6 +17,7 @@ deletedMarker = 0x80000000 entryFixedHeaderLength = 20 + randomSuffixSize = 32 ) // packIndexBuilder prepares and writes content index. @@ -107,6 +109,15 @@ func (b packIndexBuilder) Build(output io.Writer) error { return errors.Wrap(err, "error writing extra data") } + randomSuffix := make([]byte, randomSuffixSize) + if _, err := rand.Read(randomSuffix); err != nil { + return errors.Wrap(err, "error getting random bytes for suffix") + } + + if _, err := w.Write(randomSuffix); err != nil { + return errors.Wrap(err, "error writing extra random suffix to ensure indexes are always globally unique") + } + return w.Flush() } diff --git a/repo/content/caching_options.go b/repo/content/caching_options.go index 7ffb5a853..b633b2ac6 100644 --- a/repo/content/caching_options.go +++ b/repo/content/caching_options.go @@ -6,6 +6,18 @@ type CachingOptions struct { MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"` MaxMetadataCacheSizeBytes int64 `json:"maxMetadataCacheSize,omitempty"` MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"` - IgnoreListCache bool `json:"-"` HMACSecret []byte `json:"-"` + + ownWritesCache ownWritesCache +} + +// CloneOrDefault returns a clone of the caching options or empty options for nil. 
+func (c *CachingOptions) CloneOrDefault() *CachingOptions { + if c == nil { + return &CachingOptions{} + } + + c2 := *c + + return &c2 } diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 4e4d5f235..a772f700a 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -133,7 +133,20 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b return true, nil } -func newCommittedContentIndex(caching CachingOptions) *committedContentIndex { +func (b *committedContentIndex) close() error { + b.mu.Lock() + defer b.mu.Unlock() + + for _, pi := range b.inUse { + if err := pi.Close(); err != nil { + return errors.Wrap(err, "unable to close index") + } + } + + return nil +} + +func newCommittedContentIndex(caching *CachingOptions) *committedContentIndex { var cache committedContentIndexCache if caching.CacheDirectory != "" { diff --git a/repo/content/content_cache_base.go b/repo/content/content_cache_base.go index 0e390e848..731476702 100644 --- a/repo/content/content_cache_base.go +++ b/repo/content/content_cache_base.go @@ -82,7 +82,7 @@ func (c *cacheBase) sweepDirectoryPeriodically(ctx context.Context) { c.sweepMutexes() if err := c.sweepDirectory(ctx); err != nil { - log(ctx).Warningf("cacheBase sweep failed: %v", err) + log(ctx).Warningf("cache sweep failed: %v", err) } } } diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index f67722515..8a0d13e2d 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -104,7 +104,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp MaxPackSize: maxPackSize, MasterKey: make([]byte, 32), // zero key, does not matter Version: 1, - }, CachingOptions{}, time.Now, nil) + }, nil, time.Now, nil) if err != nil { t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error()) return diff --git a/repo/content/content_index_recovery_test.go b/repo/content/content_index_recovery_test.go index 767bfb0a4..67eac963b 100644 --- a/repo/content/content_index_recovery_test.go +++ b/repo/content/content_index_recovery_test.go @@ -24,7 +24,7 @@ func TestContentIndexRecovery(t *testing.T) { } // delete all index blobs - assertNoError(t, bm.st.ListBlobs(ctx, newIndexBlobPrefix, func(bi blob.Metadata) error { + assertNoError(t, bm.st.ListBlobs(ctx, indexBlobPrefix, func(bi blob.Metadata) error { log(ctx).Debugf("deleting %v", bi.BlobID) return bm.st.DeleteBlob(ctx, bi.BlobID) })) diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index c2e2f2aef..353916f3e 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -44,7 +44,7 @@ const ( parallelFetches = 5 // number of parallel reads goroutines flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes - newIndexBlobPrefix = "n" + indexBlobPrefix = "n" defaultMinPreambleLength = 32 defaultMaxPreambleLength = 32 defaultPaddingUnit = 4096 @@ -65,9 +65,8 @@ // IndexBlobInfo is an information about a single index blob managed by Manager. type IndexBlobInfo struct { - BlobID blob.ID - Length int64 - Timestamp time.Time + blob.Metadata + Superseded []blob.Metadata } // Manager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store. 
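Editorial note, not part of the patch: a minimal self-contained sketch (simplified stand-in types and hypothetical values) of why callers such as the CLI index-listing hunk above can keep writing b.BlobID, b.Length and b.Timestamp after this change. Embedding blob.Metadata in IndexBlobInfo promotes those fields; Superseded is the only new field.

package main

import (
	"fmt"
	"time"
)

// Metadata is a simplified stand-in for blob.Metadata.
type Metadata struct {
	BlobID    string
	Length    int64
	Timestamp time.Time
}

// IndexBlobInfo is a simplified stand-in for content.IndexBlobInfo after this change.
type IndexBlobInfo struct {
	Metadata              // embedded: BlobID, Length and Timestamp are promoted
	Superseded []Metadata // non-empty when the index blob was replaced by compaction
}

func main() {
	b := IndexBlobInfo{Metadata: Metadata{BlobID: "n0123", Length: 42, Timestamp: time.Now()}}
	// Promoted fields read exactly as they did when they were declared directly on IndexBlobInfo.
	fmt.Printf("%-40v %10v %v %v\n", b.BlobID, b.Length, b.Timestamp, b.Superseded)
}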
@@ -307,12 +306,12 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { data := b.Bytes() dataCopy := append([]byte(nil), data...) - indexBlobID, err := bm.writePackIndexesNew(ctx, data) + indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data) if err != nil { return err } - if err := bm.committedContents.addContent(ctx, indexBlobID, dataCopy, true); err != nil { + if err := bm.committedContents.addContent(ctx, indexBlobMD.BlobID, dataCopy, true); err != nil { return errors.Wrap(err, "unable to add committed content") } @@ -405,6 +404,10 @@ func (bm *Manager) Close(ctx context.Context) error { return errors.Wrap(err, "error flushing") } + if err := bm.committedContents.close(); err != nil { + return errors.Wrap(err, "error closing committed content index") + } + bm.contentCache.close() bm.metadataCache.close() bm.encryptionBufferPool.Close() @@ -666,7 +669,7 @@ type ManagerOptions struct { } // NewManager creates new content manager with given packing options and a formatter. -func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, options ManagerOptions) (*Manager, error) { +func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options ManagerOptions) (*Manager, error) { nowFn := options.TimeNow if nowFn == nil { nowFn = time.Now // allow:no-inject-time @@ -675,7 +678,7 @@ func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, cach return newManagerWithOptions(ctx, st, f, caching, nowFn, options.RepositoryFormatBytes) } -func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) { +func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) { if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion { return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion) } @@ -689,43 +692,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp return nil, err } - dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") - if err != nil { - return nil, errors.Wrap(err, "unable to initialize data cache storage") - } - - dataCache, err := newContentCacheForData(ctx, st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize content cache") - } - - metadataCacheSize := caching.MaxMetadataCacheSizeBytes - if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 { - metadataCacheSize = caching.MaxCacheSizeBytes - } - - metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") - if err != nil { - return nil, errors.Wrap(err, "unable to initialize data cache storage") - } - - metadataCache, err := newContentCacheForMetadata(ctx, st, metadataCacheStorage, metadataCacheSize) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize metadata cache") - } - - listCache, err := newListCache(st, caching) - if err != nil { - return nil, errors.Wrap(err, "unable to initialize list cache") - } - - contentIndex := newCommittedContentIndex(caching) - mu := 
&sync.RWMutex{} m := &Manager{ lockFreeManager: lockFreeManager{ Format: *f, - CachingOptions: caching, timeNow: timeNow, maxPackSize: f.MaxPackSize, encryptor: encryptor, @@ -733,14 +703,10 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, - contentCache: dataCache, - metadataCache: metadataCache, - listCache: listCache, st: st, repositoryFormatBytes: repositoryFormatBytes, checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", writeFormatVersion: int32(f.Version), - committedContents: contentIndex, encryptionBufferPool: buf.NewPool(ctx, defaultEncryptionBufferPoolSegmentSize+encryptor.MaxOverhead(), "content-manager-encryption"), }, @@ -752,9 +718,76 @@ func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOp packIndexBuilder: make(packIndexBuilder), } + if err := setupCaches(ctx, m, caching); err != nil { + return nil, errors.Wrap(err, "unable to set up caches") + } + if err := m.CompactIndexes(ctx, autoCompactionOptions); err != nil { + return nil, errors.Wrap(err, "error initializing content manager") + } return m, nil } + +func setupCaches(ctx context.Context, m *Manager, caching *CachingOptions) error { + caching = caching.CloneOrDefault() + + dataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, caching.MaxCacheSizeBytes, "contents") + if err != nil { + return errors.Wrap(err, "unable to initialize data cache storage") + } + + dataCache, err := newContentCacheForData(ctx, m.st, dataCacheStorage, caching.MaxCacheSizeBytes, caching.HMACSecret) + if err != nil { + return errors.Wrap(err, "unable to initialize content cache") + } + + metadataCacheSize := caching.MaxMetadataCacheSizeBytes + if metadataCacheSize == 0 && caching.MaxCacheSizeBytes > 0 { + metadataCacheSize = caching.MaxCacheSizeBytes + } + + metadataCacheStorage, err := newCacheStorageOrNil(ctx, caching.CacheDirectory, metadataCacheSize, "metadata") + if err != nil { + return errors.Wrap(err, "unable to initialize metadata cache storage") + } + + metadataCache, err := newContentCacheForMetadata(ctx, m.st, metadataCacheStorage, metadataCacheSize) + if err != nil { + return errors.Wrap(err, "unable to initialize metadata cache") + } + + listCache, err := newListCache(m.st, caching) + if err != nil { + return errors.Wrap(err, "unable to initialize list cache") + } + + if caching.ownWritesCache == nil { + // this is a test hook that allows tests to specify a custom cache + caching.ownWritesCache, err = newOwnWritesCache(ctx, caching, m.timeNow) + if err != nil { + return errors.Wrap(err, "unable to initialize own writes cache") + } + } + + contentIndex := newCommittedContentIndex(caching) + + // once everything is ready, set it up + m.CachingOptions = *caching + m.contentCache = dataCache + m.metadataCache = metadataCache + m.committedContents = contentIndex + + m.indexBlobManager = &indexBlobManagerImpl{ + st: m.st, + encryptor: m.encryptor, + hasher: m.hasher, + timeNow: m.timeNow, + ownWritesCache: caching.ownWritesCache, + listCache: listCache, + indexBlobCache: metadataCache, + maxEventualConsistencySettleTime: defaultEventualConsistencySettleTime, + } + + return nil +} diff --git a/repo/content/block_manager_compaction.go b/repo/content/content_manager_indexes.go similarity index 75% rename from repo/content/block_manager_compaction.go rename to repo/content/content_manager_indexes.go index 8a6912bab..043458b18 100644 ---
a/repo/content/block_manager_compaction.go +++ b/repo/content/content_manager_indexes.go @@ -6,6 +6,8 @@ "time" "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" ) const verySmallContentFraction = 20 // blobs less than 1/verySmallContentFraction of maxPackSize are considered 'very small' @@ -87,15 +89,18 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] return nil } - formatLog(ctx).Debugf("compacting %v contents", len(indexBlobs)) + formatLog(ctx).Debugf("compacting %v index blobs", len(indexBlobs)) - t0 := time.Now() // allow:no-inject-time bld := make(packIndexBuilder) + var inputs, outputs []blob.Metadata + for _, indexBlob := range indexBlobs { if err := bm.addIndexBlobsToBuilder(ctx, bld, indexBlob, opt); err != nil { - return err + return errors.Wrap(err, "error adding index to builder") } + + inputs = append(inputs, indexBlob.Metadata) } var buf bytes.Buffer @@ -103,32 +108,33 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] return errors.Wrap(err, "unable to build an index") } - compactedIndexBlob, err := bm.writePackIndexesNew(ctx, buf.Bytes()) + compactedIndexBlob, err := bm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes()) if err != nil { return errors.Wrap(err, "unable to write compacted indexes") } - formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) // allow:no-inject-time - + // compaction wrote index blob that's the same as one of the sources + // it must be a no-op. for _, indexBlob := range indexBlobs { - if indexBlob.BlobID == compactedIndexBlob { - continue + if indexBlob.BlobID == compactedIndexBlob.BlobID { + formatLog(ctx).Debugf("compaction was a no-op") + return nil } + } - bm.listCache.deleteListCache() + outputs = append(outputs, compactedIndexBlob) - if err := bm.st.DeleteBlob(ctx, indexBlob.BlobID); err != nil { - log(ctx).Warningf("unable to delete compacted blob %q: %v", indexBlob.BlobID, err) - } + if err := bm.indexBlobManager.registerCompaction(ctx, inputs, outputs); err != nil { + return errors.Wrap(err, "unable to register compaction") } return nil } func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo, opt CompactOptions) error { - data, err := bm.getIndexBlobInternal(ctx, indexBlob.BlobID) + data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlob.BlobID) if err != nil { - return err + return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID) } index, err := openPackIndex(bytes.NewReader(data)) @@ -147,3 +153,17 @@ func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuil return nil } + +func addBlobsToIndex(ndx map[blob.ID]*IndexBlobInfo, blobs []blob.Metadata) { + for _, it := range blobs { + if ndx[it.BlobID] == nil { + ndx[it.BlobID] = &IndexBlobInfo{ + Metadata: blob.Metadata{ + BlobID: it.BlobID, + Length: it.Length, + Timestamp: it.Timestamp, + }, + } + } + } +} diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index c23a55359..847bf28b9 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -7,7 +7,6 @@ cryptorand "crypto/rand" "encoding/hex" "io" - "strings" "sync" "time" @@ -25,11 +24,11 @@ type lockFreeManager struct { // this one is not lock-free Stats Stats - listCache *listCache st blob.Storage Format FormattingOptions CachingOptions CachingOptions + indexBlobManager indexBlobManager contentCache contentCache 
metadataCache contentCache committedContents *committedContentIndex @@ -67,6 +66,8 @@ func (bm *lockFreeManager) maybeEncryptContentDataForPacking(output *gather.Writ return errors.Wrap(err, "unable to encrypt") } + bm.Stats.encrypted(len(data)) + output.Append(cipherText) return nil @@ -93,32 +94,32 @@ func (bm *lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index } if i > 0 { - bm.listCache.deleteListCache() + bm.indexBlobManager.flushCache() log(ctx).Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i) time.Sleep(nextSleepTime) nextSleepTime *= 2 } - contents, err := bm.listCache.listIndexBlobs(ctx) + indexBlobs, err := bm.indexBlobManager.listIndexBlobs(ctx, false) if err != nil { return nil, false, err } - err = bm.tryLoadPackIndexBlobsUnlocked(ctx, contents) + err = bm.tryLoadPackIndexBlobsUnlocked(ctx, indexBlobs) if err == nil { - var contentIDs []blob.ID - for _, b := range contents { - contentIDs = append(contentIDs, b.BlobID) + var indexBlobIDs []blob.ID + for _, b := range indexBlobs { + indexBlobIDs = append(indexBlobIDs, b.BlobID) } var updated bool - updated, err = bm.committedContents.use(ctx, contentIDs) + updated, err = bm.committedContents.use(ctx, indexBlobIDs) if err != nil { return nil, false, err } - return contents, updated, nil + return indexBlobs, updated, nil } if err != blob.ErrBlobNotFound { @@ -129,8 +130,8 @@ func (bm *lockFreeManager) loadPackIndexesUnlocked(ctx context.Context) ([]Index return nil, false, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) } -func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, contents []IndexBlobInfo) error { - ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, contents) +func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, indexBlobs []IndexBlobInfo) error { + ch, unprocessedIndexesSize, err := bm.unprocessedIndexBlobsUnlocked(ctx, indexBlobs) if err != nil { return err } @@ -152,7 +153,7 @@ func (bm *lockFreeManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, co defer wg.Done() for indexBlobID := range ch { - data, err := bm.getIndexBlobInternal(ctx, indexBlobID) + data, err := bm.indexBlobManager.getIndexBlob(ctx, indexBlobID) if err != nil { errch <- err return @@ -317,37 +318,8 @@ func (bm *lockFreeManager) preparePackDataContent(ctx context.Context, pp *pendi } // IndexBlobs returns the list of active index blobs. -func (bm *lockFreeManager) IndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) { - return bm.listCache.listIndexBlobs(ctx) -} - -func (bm *lockFreeManager) getIndexBlobInternal(ctx context.Context, blobID blob.ID) ([]byte, error) { - payload, err := bm.metadataCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) - if err != nil { - return nil, err - } - - iv, err := getIndexBlobIV(blobID) - if err != nil { - return nil, err - } - - bm.Stats.readContent(len(payload)) - - payload, err = bm.encryptor.Decrypt(nil, payload, iv) - bm.Stats.decrypted(len(payload)) - - if err != nil { - return nil, errors.Wrap(err, "decrypt error") - } - - // Since the encryption key is a function of data, we must be able to generate exactly the same key - // after decrypting the content. This serves as a checksum. 
- if err := bm.verifyChecksum(payload, iv); err != nil { - return nil, err - } - - return payload, nil +func (bm *lockFreeManager) IndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) { + return bm.indexBlobManager.listIndexBlobs(ctx, includeInactive) } func getPackedContentIV(output []byte, contentID ID) ([]byte, error) { @@ -359,49 +331,12 @@ func getPackedContentIV(output []byte, contentID ID) ([]byte, error) { return output[0:n], nil } -func getIndexBlobIV(s blob.ID) ([]byte, error) { - if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic - s = s[0:p] - } - - return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):])) -} - func (bm *lockFreeManager) writePackFileNotLocked(ctx context.Context, packFile blob.ID, data gather.Bytes) error { bm.Stats.wroteContent(data.Length()) - bm.listCache.deleteListCache() return bm.st.PutBlob(ctx, packFile, data) } -func (bm *lockFreeManager) encryptAndWriteBlobNotLocked(ctx context.Context, data []byte, prefix blob.ID) (blob.ID, error) { - var hashOutput [maxHashSize]byte - - hash := bm.hashData(hashOutput[:0], data) - blobID := prefix + blob.ID(hex.EncodeToString(hash)) - - iv, err := getIndexBlobIV(blobID) - if err != nil { - return "", err - } - - bm.Stats.encrypted(len(data)) - - data2, err := bm.encryptor.Encrypt(nil, data, iv) - if err != nil { - return "", err - } - - bm.Stats.wroteContent(len(data2)) - bm.listCache.deleteListCache() - - if err := bm.st.PutBlob(ctx, blobID, gather.FromSlice(data2)); err != nil { - return "", err - } - - return blobID, nil -} - func (bm *lockFreeManager) hashData(output, data []byte) []byte { // Hash the content and compute encryption key. contentID := bm.hasher(output, data) @@ -410,10 +345,6 @@ func (bm *lockFreeManager) hashData(output, data []byte) []byte { return contentID } -func (bm *lockFreeManager) writePackIndexesNew(ctx context.Context, data []byte) (blob.ID, error) { - return bm.encryptAndWriteBlobNotLocked(ctx, data, newIndexBlobPrefix) -} - func (bm *lockFreeManager) verifyChecksum(data, contentID []byte) error { var hashOutput [maxHashSize]byte diff --git a/repo/content/content_manager_own_writes.go b/repo/content/content_manager_own_writes.go new file mode 100644 index 000000000..ca5a2cc82 --- /dev/null +++ b/repo/content/content_manager_own_writes.go @@ -0,0 +1,194 @@ +package content + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/filesystem" +) + +const ownWritesCacheRetention = 15 * time.Minute + +type ownWritesCache interface { + add(ctx context.Context, mb blob.Metadata) error + merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) + delete(ctx context.Context, md blob.ID) error +} + +// nullOwnWritesCache is an implementation of ownWritesCache that ignores all changes. +type nullOwnWritesCache struct { +} + +func (n *nullOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + return nil +} + +func (n *nullOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return nil +} + +func (n *nullOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + return source, nil +} + +// memoryOwnWritesCache is an implementation of ownWritesCache that caches in memory. 
+type memoryOwnWritesCache struct { + entries sync.Map + timeNow func() time.Time +} + +func (n *memoryOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + log(ctx).Debugf("adding %v to own-writes cache", mb.BlobID) + n.entries.Store(mb.BlobID, mb) + + return nil +} + +func (n *memoryOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return n.add(ctx, blob.Metadata{ + BlobID: blobID, + Length: -1, + Timestamp: n.timeNow(), + }) +} + +func (n *memoryOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + var result []blob.Metadata + + n.entries.Range(func(key, value interface{}) bool { + md := value.(blob.Metadata) + if !strings.HasPrefix(string(md.BlobID), string(prefix)) { + return true + } + + if age := n.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention { + result = append(result, md) + } else { + log(ctx).Debugf("deleting stale own writes cache entry: %v (%v)", key, age) + + n.entries.Delete(key) + } + + return true + }) + + return mergeOwnWrites(ctx, source, result), nil +} + +// persistentOwnWritesCache is an implementation of ownWritesCache that caches entries to strongly consistent blob storage. +type persistentOwnWritesCache struct { + st blob.Storage + timeNow func() time.Time +} + +func (d *persistentOwnWritesCache) add(ctx context.Context, mb blob.Metadata) error { + j, err := json.Marshal(mb) + if err != nil { + return errors.Wrap(err, "unable to marshal JSON") + } + + return d.st.PutBlob(ctx, mb.BlobID, gather.FromSlice(j)) +} + +func (d *persistentOwnWritesCache) merge(ctx context.Context, prefix blob.ID, source []blob.Metadata) ([]blob.Metadata, error) { + var myWrites []blob.Metadata + + err := d.st.ListBlobs(ctx, prefix, func(md blob.Metadata) error { + b, err := d.st.GetBlob(ctx, md.BlobID, 0, -1) + if err == blob.ErrBlobNotFound { + return nil + } + + if err != nil { + return errors.Wrapf(err, "error reading own write cache entry %v", md.BlobID) + } + + var originalMD blob.Metadata + + if err := json.Unmarshal(b, &originalMD); err != nil { + return errors.Wrapf(err, "error unmarshaling own write cache entry %v", md.BlobID) + } + + // note that we're assuming that time scale used by timeNow() is the same as used by + // cache storage, which is fine, since the cache is local and not on remote FS. 
+ if age := d.timeNow().Sub(md.Timestamp); age < ownWritesCacheRetention { + myWrites = append(myWrites, originalMD) + } else { + log(ctx).Debugf("deleting blob %v from own-write cache because it's too old: %v (%v)", md.BlobID, age, originalMD.Timestamp) + + if err := d.st.DeleteBlob(ctx, md.BlobID); err != nil && err != blob.ErrBlobNotFound { + return errors.Wrap(err, "error deleting stale blob") + } + } + + return nil + }) + + return mergeOwnWrites(ctx, source, myWrites), err +} + +func (d *persistentOwnWritesCache) delete(ctx context.Context, blobID blob.ID) error { + return d.add(ctx, blob.Metadata{ + BlobID: blobID, + Length: -1, + Timestamp: d.timeNow(), + }) +} + +func mergeOwnWrites(ctx context.Context, source, own []blob.Metadata) []blob.Metadata { + m := map[blob.ID]blob.Metadata{} + + for _, v := range source { + m[v.BlobID] = v + } + + for _, v := range own { + if v.Length < 0 { + delete(m, v.BlobID) + } else { + m[v.BlobID] = v + } + } + + var s []blob.Metadata + + for _, v := range m { + s = append(s, v) + } + + log(ctx).Debugf("merged %v backend blobs and %v local blobs into %v", source, own, s) + + return s +} + +func newOwnWritesCache(ctx context.Context, caching *CachingOptions, timeNow func() time.Time) (ownWritesCache, error) { + if caching.CacheDirectory == "" { + return &memoryOwnWritesCache{timeNow: timeNow}, nil + } + + dirname := filepath.Join(caching.CacheDirectory, "own-writes") + + if err := os.MkdirAll(dirname, 0700); err != nil { + return nil, errors.Wrap(err, "unable to create own writes cache directory") + } + + st, err := filesystem.New(ctx, &filesystem.Options{ + Path: dirname, + DirectoryShards: []int{}, + }) + + if err != nil { + return nil, errors.Wrap(err, "unable to create own writes cache storage") + } + + return &persistentOwnWritesCache{st, timeNow}, nil +} diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 02ad91a8d..d24fe23f5 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -22,6 +22,7 @@ "github.com/kopia/kopia/internal/faketime" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/logging" ) const ( @@ -202,7 +203,7 @@ func TestContentManagerEmpty(t *testing.T) { func verifyActiveIndexBlobCount(ctx context.Context, t *testing.T, bm *Manager, expected int) { t.Helper() - blks, err := bm.IndexBlobs(ctx) + blks, err := bm.IndexBlobs(ctx, false) if err != nil { t.Errorf("error listing active index blobs: %v", err) return @@ -319,7 +320,7 @@ func TestContentManagerFailedToWritePack(t *testing.T) { MaxPackSize: maxPackSize, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, CachingOptions{}, faketime.Frozen(fakeTime), nil) + }, nil, faketime.Frozen(fakeTime), nil) if err != nil { t.Fatalf("can't create bm: %v", err) } @@ -410,17 +411,13 @@ func TestContentManagerConcurrency(t *testing.T) { verifyContent(ctx, t, bm4, bm2content, seededRandomData(32, 100)) verifyContent(ctx, t, bm4, bm3content, seededRandomData(33, 100)) - if got, want := getIndexCount(data), 4; got != want { - t.Errorf("unexpected index count before compaction: %v, wanted %v", got, want) - } + validateIndexCount(t, data, 4, 0) if err := bm4.CompactIndexes(ctx, CompactOptions{MaxSmallBlobs: 1}); err != nil { t.Errorf("compaction error: %v", err) } - if got, want := getIndexCount(data), 1; got != want { - t.Errorf("unexpected index count after compaction: %v, wanted %v", got, want) - } + 
validateIndexCount(t, data, 5, 1) // new content manager at this point can see all data. bm5 := newTestContentManager(t, data, keyTime, nil) @@ -437,6 +434,30 @@ func TestContentManagerConcurrency(t *testing.T) { } } +func validateIndexCount(t *testing.T, data map[blob.ID][]byte, wantIndexCount, wantCompactionLogCount int) { + t.Helper() + + var indexCnt, compactionLogCnt int + + for blobID := range data { + if strings.HasPrefix(string(blobID), indexBlobPrefix) { + indexCnt++ + } + + if strings.HasPrefix(string(blobID), compactionLogBlobPrefix) { + compactionLogCnt++ + } + } + + if got, want := indexCnt, wantIndexCount; got != want { + t.Fatalf("unexpected index blob count %v, want %v", got, want) + } + + if got, want := compactionLogCnt, wantCompactionLogCount; got != want { + t.Fatalf("unexpected compaction log blob count %v, want %v", got, want) + } +} + func TestDeleteContent(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} @@ -1694,6 +1715,82 @@ func verifyVersionCompat(t *testing.T, writeVersion int) { verifyContentManagerDataSet(ctx, t, mgr, dataSet) } +func TestReadsOwnWritesWithEventualConsistencyPersistentOwnWritesCache(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + cacheData := blobtesting.DataMap{} + cacheKeyTime := map[blob.ID]time.Time{} + cacheSt := blobtesting.NewMapStorage(cacheData, cacheKeyTime, timeNow) + ecst := blobtesting.NewEventuallyConsistentStorage( + logging.NewWrapper(st, t.Logf, "[STORAGE] "), + 3*time.Second, + timeNow) + + // disable own writes cache, will still be ok if store is strongly consistent + verifyReadsOwnWrites(t, ecst, timeNow, &persistentOwnWritesCache{ + st: cacheSt, + timeNow: timeNow, + }) +} + +func TestReadsOwnWritesWithStrongConsistencyAndNoCaching(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + + // if we used nullOwnWritesCache and eventual consistency, the test would fail + // st = blobtesting.NewEventuallyConsistentStorage(logging.NewWrapper(st, t.Logf, "[STORAGE] "), 0.1) + + // disable own writes cache, will still be ok if store is strongly consistent + verifyReadsOwnWrites(t, st, timeNow, &nullOwnWritesCache{}) +} + +func TestReadsOwnWritesWithEventualConsistencyInMemoryOwnWritesCache(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + ecst := blobtesting.NewEventuallyConsistentStorage( + logging.NewWrapper(st, t.Logf, "[STORAGE] "), + 3*time.Second, + timeNow) + + verifyReadsOwnWrites(t, ecst, timeNow, &memoryOwnWritesCache{timeNow: timeNow}) +} + +func verifyReadsOwnWrites(t *testing.T, st blob.Storage, timeNow func() time.Time, sharedOwnWritesCache ownWritesCache) { + ctx := testlogging.Context(t) + cachingOptions := &CachingOptions{ + ownWritesCache: sharedOwnWritesCache, + } + + bm := newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + + ids := make([]ID, 100) + for i := 0; i < len(ids); i++ { + ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2)) + + for j := 0; j < i; j++ { + // verify all contents written so far + verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2)) + } + + // every 10 contents, create new content manager + if i%10 == 0 { + t.Logf("------- reopening -----") + must(t, 
bm.Close(ctx)) + bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + } + } + + must(t, bm.Close(ctx)) + bm = newTestContentManagerWithStorageAndCaching(t, st, cachingOptions, timeNow) + + for i := 0; i < len(ids); i++ { + verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2)) + } +} + func verifyContentManagerDataSet(ctx context.Context, t *testing.T, mgr *Manager, dataSet map[ID][]byte) { for contentID, originalPayload := range dataSet { v, err := mgr.GetContent(ctx, contentID) @@ -1714,6 +1811,10 @@ func newTestContentManager(t *testing.T, data blobtesting.DataMap, keyTime map[b } func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc func() time.Time) *Manager { + return newTestContentManagerWithStorageAndCaching(t, st, nil, timeFunc) +} + +func newTestContentManagerWithStorageAndCaching(t *testing.T, st blob.Storage, co *CachingOptions, timeFunc func() time.Time) *Manager { if timeFunc == nil { timeFunc = faketime.AutoAdvance(fakeTime, 1*time.Second) } @@ -1724,7 +1825,7 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu HMACSecret: hmacSecret, MaxPackSize: maxPackSize, Version: 1, - }, CachingOptions{}, timeFunc, nil) + }, co, timeFunc, nil) if err != nil { panic("can't create content manager: " + err.Error()) } @@ -1734,18 +1835,6 @@ func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc fu return bm } -func getIndexCount(d blobtesting.DataMap) int { - var cnt int - - for blobID := range d { - if strings.HasPrefix(string(blobID), newIndexBlobPrefix) { - cnt++ - } - } - - return cnt -} - func verifyContentNotFound(ctx context.Context, t *testing.T, bm *Manager, contentID ID) { t.Helper() @@ -1760,7 +1849,7 @@ func verifyContent(ctx context.Context, t *testing.T, bm *Manager, contentID ID, b2, err := bm.GetContent(ctx, contentID) if err != nil { - t.Errorf("unable to read content %q: %v", contentID, err) + t.Fatalf("unable to read content %q: %v", contentID, err) return } @@ -1893,3 +1982,11 @@ func getContentInfo(t *testing.T, bm *Manager, c ID) Info { return i } + +func must(t *testing.T, err error) { + t.Helper() + + if err != nil { + t.Fatal(err) + } +} diff --git a/repo/content/index_blob_manager.go b/repo/content/index_blob_manager.go new file mode 100644 index 000000000..4d3e30dcc --- /dev/null +++ b/repo/content/index_blob_manager.go @@ -0,0 +1,462 @@ +package content + +import ( + "bytes" + "context" + "crypto/aes" + "encoding/hex" + "encoding/json" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +// indexBlobManager is the API of index blob manager as used by content manager. +type indexBlobManager interface { + writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) + listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) + getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) + registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error + flushCache() +} + +const ( + compactionLogBlobPrefix = "m" + cleanupBlobPrefix = "l" + defaultEventualConsistencySettleTime = 1 * time.Hour +) + +// compactionLogEntry represents contents of compaction log entry stored in `m` blob. +type compactionLogEntry struct { + // list of input blob names that were compacted together. 
+ InputMetadata []blob.Metadata `json:"inputMetadata"` + + // list of blobs that are results of compaction. + OutputMetadata []blob.Metadata `json:"outputMetadata"` + + // Metadata of the compaction blob itself, not serialized. + metadata blob.Metadata +} + +// cleanupEntry represents contents of cleanup entry stored in `l` blob. +type cleanupEntry struct { + BlobID blob.ID `json:"blobID"` + + age time.Duration // not serialized, computed on load +} + +type indexBlobManagerImpl struct { + st blob.Storage + hasher hashing.HashFunc + encryptor encryption.Encryptor + listCache *listCache + ownWritesCache ownWritesCache + timeNow func() time.Time + indexBlobCache contentCache + maxEventualConsistencySettleTime time.Duration +} + +func (m *indexBlobManagerImpl) listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) { + compactionLogMetadata, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix) + if err != nil { + return nil, errors.Wrap(err, "error listing compaction log entries") + } + + compactionLogMetadata, err = m.ownWritesCache.merge(ctx, compactionLogBlobPrefix, compactionLogMetadata) + if err != nil { + return nil, errors.Wrap(err, "error merging local writes for compaction log entries") + } + + storageIndexBlobs, err := m.listCache.listBlobs(ctx, indexBlobPrefix) + if err != nil { + return nil, errors.Wrap(err, "error listing index blobs") + } + + storageIndexBlobs, err = m.ownWritesCache.merge(ctx, indexBlobPrefix, storageIndexBlobs) + if err != nil { + return nil, errors.Wrap(err, "error merging local writes for index blobs") + } + + indexMap := map[blob.ID]*IndexBlobInfo{} + addBlobsToIndex(indexMap, storageIndexBlobs) + + compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata) + if err != nil { + return nil, errors.Wrap(err, "error reading compaction log") + } + + // remove entries from indexMap that have been compacted and replaced by other indexes. 
+ removeCompactedIndexes(ctx, indexMap, compactionLogs, includeInactive) + + var results []IndexBlobInfo + for _, v := range indexMap { + results = append(results, *v) + } + + return results, nil +} + +func (m *indexBlobManagerImpl) flushCache() { + m.listCache.deleteListCache(indexBlobPrefix) + m.listCache.deleteListCache(compactionLogBlobPrefix) +} + +func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error { + logEntryBytes, err := json.Marshal(&compactionLogEntry{ + InputMetadata: inputs, + OutputMetadata: outputs, + }) + if err != nil { + return errors.Wrap(err, "unable to marshal log entry bytes") + } + + compactionLogBlobMetadata, err := m.encryptAndWriteBlob(ctx, logEntryBytes, compactionLogBlobPrefix) + if err != nil { + return errors.Wrap(err, "unable to write compaction log") + } + + formatLog(ctx).Debugf("compacted indexes %v into %v and wrote log %v", inputs, outputs, compactionLogBlobMetadata) + + if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata); err != nil { + return errors.Wrap(err, "error deleting old index blobs") + } + + return nil +} + +func (m *indexBlobManagerImpl) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) { + return m.getEncryptedBlob(ctx, blobID) +} + +func (m *indexBlobManagerImpl) getEncryptedBlob(ctx context.Context, blobID blob.ID) ([]byte, error) { + payload, err := m.indexBlobCache.getContent(ctx, cacheKey(blobID), blobID, 0, -1) + if err != nil { + return nil, err + } + + iv, err := getIndexBlobIV(blobID) + if err != nil { + return nil, err + } + + payload, err = m.encryptor.Decrypt(nil, payload, iv) + + if err != nil { + return nil, errors.Wrap(err, "decrypt error") + } + + // Since the encryption key is a function of data, we must be able to generate exactly the same key + // after decrypting the content. This serves as a checksum. 
+ if err := m.verifyChecksum(payload, iv); err != nil { + return nil, err + } + + return payload, nil +} + +func (m *indexBlobManagerImpl) verifyChecksum(data, contentID []byte) error { + var hashOutput [maxHashSize]byte + + expected := m.hasher(hashOutput[:0], data) + expected = expected[len(expected)-aes.BlockSize:] + + if !bytes.HasSuffix(contentID, expected) { + return errors.Errorf("invalid checksum for blob %x, expected %x", contentID, expected) + } + + return nil +} + +func (m *indexBlobManagerImpl) writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) { + return m.encryptAndWriteBlob(ctx, data, indexBlobPrefix) +} + +func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []byte, prefix blob.ID) (blob.Metadata, error) { + var hashOutput [maxHashSize]byte + + hash := m.hasher(hashOutput[:0], data) + blobID := prefix + blob.ID(hex.EncodeToString(hash)) + + iv, err := getIndexBlobIV(blobID) + if err != nil { + return blob.Metadata{}, err + } + + data2, err := m.encryptor.Encrypt(nil, data, iv) + if err != nil { + return blob.Metadata{}, err + } + + m.listCache.deleteListCache(prefix) + + err = m.st.PutBlob(ctx, blobID, gather.FromSlice(data2)) + if err != nil { + return blob.Metadata{}, err + } + + bm, err := m.st.GetMetadata(ctx, blobID) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "unable to get blob metadata") + } + + if err := m.ownWritesCache.add(ctx, bm); err != nil { + log(ctx).Warningf("unable to cache own write: %v", err) + } + + return bm, nil +} + +func (m *indexBlobManagerImpl) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) { + results := map[blob.ID]*compactionLogEntry{} + + for _, cb := range blobs { + data, err := m.getEncryptedBlob(ctx, cb.BlobID) + + if errors.Is(err, blob.ErrBlobNotFound) { + continue + } + + if err != nil { + return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID) + } + + le := &compactionLogEntry{} + + if err := json.Unmarshal(data, le); err != nil { + return nil, errors.Wrapf(err, "unable to read compaction log entry %q", cb.BlobID) + } + + le.metadata = cb + + results[cb.BlobID] = le + } + + return results, nil +} + +func (m *indexBlobManagerImpl) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) { + results := map[blob.ID]*cleanupEntry{} + + for _, cb := range blobs { + data, err := m.getEncryptedBlob(ctx, cb.BlobID) + + if errors.Is(err, blob.ErrBlobNotFound) { + continue + } + + if err != nil { + return nil, errors.Wrapf(err, "unable to read cleanup blob %q", cb.BlobID) + } + + le := &cleanupEntry{} + + if err := json.Unmarshal(data, le); err != nil { + return nil, errors.Wrapf(err, "unable to read cleanup log entry %q", cb.BlobID) + } + + le.age = latestServerBlobTime.Sub(cb.Timestamp) + + results[cb.BlobID] = le + } + + return results, nil +} + +func (m *indexBlobManagerImpl) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata) error { + allCompactionLogBlobs, err := m.listCache.listBlobs(ctx, compactionLogBlobPrefix) + if err != nil { + return errors.Wrap(err, "error listing compaction log blobs") + } + + // look for server-assigned timestamp of the compaction log entry we just wrote as a reference.
+	// we're assuming server-generated timestamps are somewhat reasonable and time is moving forward
+	compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-m.maxEventualConsistencySettleTime)
+	compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff)
+
+	log(ctx).Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff)
+
+	compactionBlobEntries, err := m.getCompactionLogEntries(ctx, compactionBlobs)
+	if err != nil {
+		return errors.Wrap(err, "unable to get compaction log entries")
+	}
+
+	allCleanupBlobs, err := m.listCache.listBlobs(ctx, cleanupBlobPrefix)
+	if err != nil {
+		return errors.Wrap(err, "error listing cleanup blobs")
+	}
+
+	cleanupEntries, err := m.getCleanupEntries(ctx, latestBlob.Timestamp, allCleanupBlobs)
+	if err != nil {
+		return errors.Wrap(err, "error loading cleanup blobs")
+	}
+
+	indexBlobsToDelete := m.findIndexBlobsToDelete(ctx, latestBlob.Timestamp, compactionBlobEntries)
+	compactionLogBlobsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries)
+
+	// note that we must always delete index blobs first, before compaction logs,
+	// otherwise we may inadvertently resurrect an index blob that should have been removed.
+	if err := m.deleteBlobsFromStorageAndCache(ctx, indexBlobsToDelete); err != nil {
+		return errors.Wrap(err, "unable to delete old index blobs")
+	}
+
+	compactionLogBlobsToDelayCleanup := m.findCompactionLogBlobsToDelayCleanup(ctx, compactionBlobs)
+
+	if err := m.delayCleanupBlobs(ctx, compactionLogBlobsToDelayCleanup); err != nil {
+		return errors.Wrap(err, "unable to schedule delayed cleanup of blobs")
+	}
+
+	if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogBlobsToDelete); err != nil {
+		return errors.Wrap(err, "unable to delete compaction logs")
+	}
+
+	if err := m.deleteBlobsFromStorageAndCache(ctx, cleanupBlobsToDelete); err != nil {
+		return errors.Wrap(err, "unable to delete cleanup blobs")
+	}
+
+	m.flushCache()
+
+	return nil
+}
+
+func (m *indexBlobManagerImpl) findIndexBlobsToDelete(ctx context.Context, latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry) []blob.ID {
+	tmp := map[blob.ID]bool{}
+
+	for _, cl := range entries {
+		// are the input index blobs in this compaction eligible for deletion?
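+		// inputs become eligible only once the compaction log entry itself is older than
+		// maxEventualConsistencySettleTime, so readers working from a slightly stale blob
+		// listing can still resolve the pre-compaction indexes.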
+		if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < m.maxEventualConsistencySettleTime {
+			log(ctx).Debugf("not deleting index blobs used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, m.maxEventualConsistencySettleTime)
+			continue
+		}
+
+		for _, b := range cl.InputMetadata {
+			log(ctx).Debugf("will delete old index %v compacted to %v", b, cl.OutputMetadata)
+
+			tmp[b.BlobID] = true
+		}
+	}
+
+	var result []blob.ID
+
+	for k := range tmp {
+		result = append(result, k)
+	}
+
+	return result
+}
+
+func (m *indexBlobManagerImpl) findCompactionLogBlobsToDelayCleanup(ctx context.Context, compactionBlobs []blob.Metadata) []blob.ID {
+	var result []blob.ID
+
+	for _, cb := range compactionBlobs {
+		log(ctx).Debugf("will delete compaction log blob %v", cb)
+		result = append(result, cb.BlobID)
+	}
+
+	return result
+}
+
+func (m *indexBlobManagerImpl) findBlobsToDelete(entries map[blob.ID]*cleanupEntry) (compactionLogs, cleanupBlobs []blob.ID) {
+	for _, e := range entries {
+		if e.age > m.maxEventualConsistencySettleTime {
+			compactionLogs = append(compactionLogs, e.BlobID)
+			cleanupBlobs = append(cleanupBlobs, e.BlobID)
+		}
+	}
+
+	return
+}
+
+func (m *indexBlobManagerImpl) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID) error {
+	for _, b := range blobIDs {
+		payload, err := json.Marshal(&cleanupEntry{
+			BlobID: b,
+		})
+		if err != nil {
+			return errors.Wrap(err, "unable to marshal cleanup log bytes")
+		}
+
+		if _, err := m.encryptAndWriteBlob(ctx, payload, cleanupBlobPrefix); err != nil {
+			return errors.Wrap(err, "unable to write cleanup log")
+		}
+	}
+
+	return nil
+}
+
+func (m *indexBlobManagerImpl) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
+	for _, blobID := range blobIDs {
+		if err := m.st.DeleteBlob(ctx, blobID); err != nil && err != blob.ErrBlobNotFound {
+			return errors.Wrapf(err, "unable to delete blob %v", blobID)
+		}
+
+		if err := m.ownWritesCache.delete(ctx, blobID); err != nil {
+			return errors.Wrapf(err, "unable to delete blob %v from own-writes cache", blobID)
+		}
+	}
+
+	return nil
+}
+
+func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
+	var res []blob.Metadata
+
+	for _, m := range m {
+		if !m.Timestamp.After(cutoffTime) {
+			res = append(res, m)
+		}
+	}
+
+	return res
+}
+
+func getIndexBlobIV(s blob.ID) ([]byte, error) {
+	if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic
+		s = s[0:p]
+	}
+
+	return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):]))
+}
+
+func removeCompactedIndexes(ctx context.Context, m map[blob.ID]*IndexBlobInfo, compactionLogs map[blob.ID]*compactionLogEntry, markAsSuperseded bool) {
+	var validCompactionLogs []*compactionLogEntry
+
+	for _, cl := range compactionLogs {
+		// only process compaction logs for which we have found all the outputs.
+		haveAllOutputs := true
+
+		for _, o := range cl.OutputMetadata {
+			if m[o.BlobID] == nil {
+				haveAllOutputs = false
+
+				log(ctx).Debugf("blob %v referenced by compaction log is not found", o.BlobID)
+
+				break
+			}
+		}
+
+		if haveAllOutputs {
+			validCompactionLogs = append(validCompactionLogs, cl)
+		}
+	}
+
+	// now remove all inputs from the set if there's a valid compaction log entry with all the outputs.
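+	// when markAsSuperseded is set (listIndexBlobs with includeInactive), the inputs are kept in the
+	// map but tagged with the outputs that replaced them; otherwise they are removed entirely.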
+ for _, cl := range validCompactionLogs { + for _, ib := range cl.InputMetadata { + if md := m[ib.BlobID]; md != nil && md.Superseded == nil { + log(ctx).Debugf("ignoring index blob %v (%v) because it's been compacted to %v", ib, md.Timestamp, cl.OutputMetadata) + + if markAsSuperseded { + md.Superseded = cl.OutputMetadata + } else { + delete(m, ib.BlobID) + } + } + } + } +} diff --git a/repo/content/index_blob_manager_test.go b/repo/content/index_blob_manager_test.go new file mode 100644 index 000000000..8d338ad79 --- /dev/null +++ b/repo/content/index_blob_manager_test.go @@ -0,0 +1,770 @@ +package content + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "os" + "strings" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/blob/logging" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +// we use two fake time sources - one for local client and one for the remote store +// to simulate clock drift +var ( + fakeLocalStartTime = time.Date(2020, 1, 1, 14, 0, 0, 0, time.UTC) + fakeStoreStartTime = time.Date(2020, 1, 1, 10, 0, 0, 0, time.UTC) +) + +const ( + testIndexBlobDeleteAge = 1 * time.Minute + testEventualConsistencySettleTime = 45 * time.Second +) + +func TestIndexBlobManager(t *testing.T) { + cases := []struct { + storageTimeAdvanceBetweenCompactions time.Duration + wantIndexCount int + wantCompactionLogCount int + wantCleanupCount int + }{ + { + // we write 6 index blobs and 2 compaction logs + // but not enough time has passed to delete anything + storageTimeAdvanceBetweenCompactions: 0, + wantIndexCount: 6, + wantCompactionLogCount: 2, + }, + { + // we write 6 index blobs and 2 compaction logs + // enough time has passed to delete 3 indexes and create cleanup log + storageTimeAdvanceBetweenCompactions: testIndexBlobDeleteAge + 1*time.Second, + wantIndexCount: 3, + wantCompactionLogCount: 2, + wantCleanupCount: 1, + }, + } + + for _, tc := range cases { + tc := tc + + t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) { + // fake underlying blob store with fake time + storageData := blobtesting.DataMap{} + + fakeLocalTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeStorageTime := faketime.NewTimeAdvance(fakeStoreStartTime) + + st := blobtesting.NewMapStorage(storageData, nil, fakeStorageTime.NowFunc()) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeStorageTime.NowFunc()) + m := newIndexBlobManagerForTesting(t, st, fakeLocalTime.NowFunc()) + + assertIndexBlobList(t, m) + + b1 := mustWriteIndexBlob(t, m, "index-1") + assertIndexBlobList(t, m, b1) + fakeStorageTime.Advance(1 * time.Second) + + b2 := mustWriteIndexBlob(t, m, "index-2") + assertIndexBlobList(t, m, b1, b2) + fakeStorageTime.Advance(1 * time.Second) + + b3 := mustWriteIndexBlob(t, m, "index-3") + assertIndexBlobList(t, m, b1, b2, b3) + fakeStorageTime.Advance(1 * time.Second) + + b4 := mustWriteIndexBlob(t, m, "index-4") + assertIndexBlobList(t, m, b1, b2, b3, b4) + fakeStorageTime.Advance(1 * time.Second) + assertBlobCounts(t, storageData, 4, 0, 0) + + // first compaction b1+b2+b3=>b4 + mustRegisterCompaction(t, m, []blob.Metadata{b1, b2, b3}, []blob.Metadata{b4}) + + assertIndexBlobList(t, m, b4) + 
fakeStorageTime.Advance(tc.storageTimeAdvanceBetweenCompactions) + + // second compaction b4+b5=>b6 + b5 := mustWriteIndexBlob(t, m, "index-5") + b6 := mustWriteIndexBlob(t, m, "index-6") + mustRegisterCompaction(t, m, []blob.Metadata{b4, b5}, []blob.Metadata{b6}) + assertIndexBlobList(t, m, b6) + assertBlobCounts(t, storageData, tc.wantIndexCount, tc.wantCompactionLogCount, tc.wantCleanupCount) + }) + } +} + +type action int + +const ( + actionWrite = 1 + actionRead = 2 + actionCompact = 3 + actionDelete = 4 + actionUndelete = 5 + actionCompactAndDropDeleted = 6 +) + +// actionsTestIndexBlobManagerStress is a set of actionsTestIndexBlobManagerStress by each actor performed in TestIndexBlobManagerStress with weights +var actionsTestIndexBlobManagerStress = []struct { + a action + weight int +}{ + {actionWrite, 10}, + {actionRead, 10}, + {actionCompact, 10}, + {actionDelete, 10}, + {actionUndelete, 10}, + {actionCompactAndDropDeleted, 10}, +} + +func pickRandomActionTestIndexBlobManagerStress() action { + sum := 0 + for _, a := range actionsTestIndexBlobManagerStress { + sum += a.weight + } + + n := rand.Intn(sum) + for _, a := range actionsTestIndexBlobManagerStress { + if n < a.weight { + return a.a + } + + n -= a.weight + } + + panic("impossible") +} + +// TestIndexBlobManagerStress launches N actors, each randomly writing new index blobs, +// verifying that all blobs previously written by it are correct and randomly compacting blobs. +// nolint:gocyclo +func TestIndexBlobManagerStress(t *testing.T) { + t.Parallel() + + rand.Seed(time.Now().UnixNano()) + + for i := range actionsTestIndexBlobManagerStress { + actionsTestIndexBlobManagerStress[i].weight = rand.Intn(100) + t.Logf("weight[%v] = %v", i, actionsTestIndexBlobManagerStress[i].weight) + } + + var ( + fakeTimeFunc = faketime.AutoAdvance(fakeLocalStartTime, 100*time.Millisecond) + deadline time.Time // when (according to fakeTimeFunc should the test finish) + localTimeDeadline time.Time // when (according to time.Now, the test should finish) + ) + + localTimeDeadline = time.Now().Add(30 * time.Second) + + if os.Getenv("CI") != "" { + // when running on CI, simulate 4 hours, this takes about ~15-20 seconds. + deadline = fakeTimeFunc().Add(4 * time.Hour) + } else { + // otherwise test only 1 hour, which still provides decent coverage, takes about 3-5 seconds. + deadline = fakeTimeFunc().Add(1 * time.Hour) + } + + // shared storage + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, fakeTimeFunc) + + var eg errgroup.Group + + numActors := 2 + + for actorID := 0; actorID < numActors; actorID++ { + actorID := actorID + loggedSt := logging.NewWrapper(st, func(m string, args ...interface{}) { + t.Logf(fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)+m, args...) 
+ }, "") + contentPrefix := fmt.Sprintf("a%v", actorID) + + eg.Go(func() error { + numWritten := 0 + deletedContents := map[string]bool{} + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID) + }) + + m := newIndexBlobManagerForTesting(t, loggedSt, fakeTimeFunc) + + // run stress test until the deadline, aborting early on any failure + for fakeTimeFunc().Before(deadline) && time.Now().Before(localTimeDeadline) { + switch pickRandomActionTestIndexBlobManagerStress() { + case actionRead: + if err := verifyFakeContentsWritten(ctx, m, numWritten, contentPrefix, deletedContents); err != nil { + return errors.Wrapf(err, "actor[%v] error verifying contents", actorID) + } + + case actionWrite: + if err := writeFakeContents(ctx, m, contentPrefix, rand.Intn(10)+5, &numWritten, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] write error", actorID) + } + + case actionDelete: + if err := deleteFakeContents(ctx, m, contentPrefix, numWritten, deletedContents, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] delete error", actorID) + } + + case actionUndelete: + if err := undeleteFakeContents(ctx, m, deletedContents, fakeTimeFunc); err != nil { + return errors.Wrapf(err, "actor[%v] undelete error", actorID) + } + + case actionCompact: + // compaction by more than one actor is unsafe, do it only if actorID == 0 + if actorID != 0 { + continue + } + + if err := fakeCompaction(ctx, m, false); err != nil { + return errors.Wrapf(err, "actor[%v] compaction error", actorID) + } + + case actionCompactAndDropDeleted: + // compaction by more than one actor is unsafe, do it only if actorID == 0 + if actorID != 0 { + continue + } + + if err := fakeCompaction(ctx, m, true); err != nil { + return errors.Wrapf(err, "actor[%v] compaction error", actorID) + } + } + } + + return nil + }) + } + + if err := eg.Wait(); err != nil { + t.Errorf("err: %+v", err) + } +} + +func TestIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // the test is randomized and runs very quickly, run it lots of times + failed := false + for i := 0; i < 100 && !failed; i++ { + t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) { + verifyIndexBlobManagerPreventsResurrectOfDeletedContents( + t, 1*time.Second, 1*time.Second, testIndexBlobDeleteAge, 1*time.Second, 2*time.Second, + ) + }) + } +} + +func TestCompactionCreatesPreviousIndex(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + storageData := blobtesting.DataMap{} + + fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeTimeFunc := fakeTime.NowFunc() + + st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc) + st = logging.NewWrapper(st, func(msg string, args ...interface{}) { + t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...) 
+ }, "store: ") + m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + + numWritten := 0 + deleted := map[string]bool{} + + prefix := "prefix" + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fakeTimeFunc().Format("150405.000") + " " + }) + + // index#1 - add content1 + must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // index#2 - add content2 + must(t, writeFakeContents(ctx, m, prefix, 1, &numWritten, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // index#3 - {content1, content2}, index#1, index#2 marked for deletion + must(t, fakeCompaction(ctx, m, false)) + fakeTime.Advance(1 * time.Second) + + // index#4 - delete content1 + must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc)) + fakeTime.Advance(1 * time.Second) + + // this will create index identical to index#2, + // we will embed random ID in the index to ensure that they get different blob ID each time. + // otherwise (since indexes are based on hash of content) they would create the same blob ID. + // if this was the case, first compaction marks index#1 as deleted and second compaction + // revives it. + must(t, fakeCompaction(ctx, m, true)) + fakeTime.Advance(testEventualConsistencySettleTime) + + // if we were not to add randomness to index blobs, this would fail. + must(t, verifyFakeContentsWritten(ctx, m, 2, prefix, deleted)) +} + +func TestIndexBlobManagerPreventsResurrectOfDeletedContents_RandomizedTimings(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // the test is randomized and runs very quickly, run it lots of times + for i := 0; i < 1000; i++ { + t.Run(fmt.Sprintf("attempt-%v", i), func(t *testing.T) { + verifyIndexBlobManagerPreventsResurrectOfDeletedContents( + t, + randomDuration(10*time.Second), + randomDuration(10*time.Second), + testIndexBlobDeleteAge+randomDuration(testIndexBlobDeleteAge), + randomDuration(10*time.Second), + randomDuration(2*testEventualConsistencySettleTime), + ) + }) + } +} + +func randomDuration(max time.Duration) time.Duration { + return time.Duration(float64(max) * rand.Float64()) +} + +func verifyIndexBlobManagerPreventsResurrectOfDeletedContents(t *testing.T, delay1, delay2, delay3, delay4, delay5 time.Duration) { + t.Logf("delays: %v %v %v %v %v", delay1, delay2, delay3, delay4, delay5) + + storageData := blobtesting.DataMap{} + + fakeTime := faketime.NewTimeAdvance(fakeLocalStartTime) + fakeTimeFunc := fakeTime.NowFunc() + + st := blobtesting.NewMapStorage(storageData, nil, fakeTimeFunc) + st = blobtesting.NewEventuallyConsistentStorage(st, testEventualConsistencySettleTime, fakeTimeFunc) + st = logging.NewWrapper(st, func(msg string, args ...interface{}) { + t.Logf("[store] "+fakeTimeFunc().Format("150405.000")+" "+msg, args...) + }, "store: ") + m := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + + numWritten := 0 + deleted := map[string]bool{} + + prefix := "prefix" + ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string { + return fakeTimeFunc().Format("150405.000") + " " + }) + + // index#1 - write 2 contents + must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc)) + fakeTime.Advance(delay1) + // index#2 - delete first of the two contents. 
+ must(t, deleteFakeContents(ctx, m, prefix, 1, deleted, fakeTimeFunc)) + fakeTime.Advance(delay2) + // index#3, log#3 - replaces index#1 and #2 + must(t, fakeCompaction(ctx, m, true)) + fakeTime.Advance(delay3) + + numWritten2 := numWritten + + // index#4 - create one more content + must(t, writeFakeContents(ctx, m, prefix, 2, &numWritten, fakeTimeFunc)) + fakeTime.Advance(delay4) + + // index#5, log#4 replaces index#3 and index#4, this will delete index#1 and index#2 and log#3 + must(t, fakeCompaction(ctx, m, true)) + + t.Logf("************************************************ VERIFY") + + // advance the time just enough for eventual consistency to be visible + fakeTime.Advance(delay5) + + // using another reader, make sure that all writes up to numWritten2 are correct regardless of whether + // compaction is visible + another := newIndexBlobManagerForTesting(t, st, fakeTimeFunc) + must(t, verifyFakeContentsWritten(ctx, another, numWritten2, prefix, deleted)) + + // verify that this reader can see all its own writes regardless of eventual consistency + must(t, verifyFakeContentsWritten(ctx, m, numWritten, prefix, deleted)) + + // after eventual consistency is settled, another reader can see all our writes + fakeTime.Advance(testEventualConsistencySettleTime) + must(t, verifyFakeContentsWritten(ctx, another, numWritten, prefix, deleted)) +} + +type fakeContentIndexEntry struct { + ModTime time.Time + Deleted bool +} + +func verifyFakeContentsWritten(ctx context.Context, m indexBlobManager, numWritten int, contentPrefix string, deletedContents map[string]bool) error { + if numWritten == 0 { + return nil + } + + log(ctx).Debugf("verifyFakeContentsWritten()") + defer log(ctx).Debugf("finished verifyFakeContentsWritten()") + + all, _, err := getAllFakeContents(ctx, m) + if err != nil { + return errors.Wrap(err, "error getting all contents") + } + + // verify that all contents previously written can be read. 
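+ // contents that the test itself deleted are allowed to be absent; for everything else the Deleted flag must match.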
+ for i := 0; i < numWritten; i++ { + id := fakeContentID(contentPrefix, i) + if _, ok := all[id]; !ok { + if deletedContents[id] { + continue + } + + return errors.Errorf("could not find content previously written by itself: %v (got %v)", id, all) + } + + if got, want := all[id].Deleted, deletedContents[id]; got != want { + return errors.Errorf("deleted flag does not match for %v: %v want %v", id, got, want) + } + } + + return nil +} + +func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) error { + log(ctx).Debugf("fakeCompaction(dropDeleted=%v)", dropDeleted) + defer log(ctx).Debugf("finished fakeCompaction(dropDeleted=%v)", dropDeleted) + + allContents, allBlobs, err := getAllFakeContents(ctx, m) + if err != nil { + return errors.Wrap(err, "error getting contents") + } + + dropped := map[string]fakeContentIndexEntry{} + + if dropDeleted { + for cid, e := range allContents { + if e.Deleted { + dropped[cid] = e + + delete(allContents, cid) + } + } + } + + if len(allBlobs) <= 1 { + return nil + } + + outputBM, err := writeFakeIndex(ctx, m, allContents) + if err != nil { + return errors.Wrap(err, "unable to write index") + } + + for cid, e := range dropped { + log(ctx).Debugf("dropped deleted %v %v from %v", cid, e, outputBM) + } + + var ( + inputs []blob.Metadata + outputs = []blob.Metadata{outputBM} + ) + + for _, bi := range allBlobs { + if bi.BlobID == outputBM.BlobID { + // no compaction, output is the same as one of the inputs + return nil + } + + inputs = append(inputs, bi.Metadata) + } + + if err := m.registerCompaction(ctx, inputs, outputs); err != nil { + return errors.Wrap(err, "compaction error") + } + + return nil +} + +func fakeContentID(prefix string, n int) string { + return fmt.Sprintf("%v-%06v", prefix, n) +} + +func deleteFakeContents(ctx context.Context, m indexBlobManager, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error { + if numWritten == 0 { + return nil + } + + log(ctx).Debugf("deleteFakeContents()") + defer log(ctx).Debugf("finished deleteFakeContents()") + + count := rand.Intn(10) + 5 + + ndx := map[string]fakeContentIndexEntry{} + + for i := 0; i < count; i++ { + n := fakeContentID(prefix, rand.Intn(numWritten)) + if deleted[n] { + continue + } + + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + Deleted: true, + } + + deleted[n] = true + } + + if len(ndx) == 0 { + return nil + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +func undeleteFakeContents(ctx context.Context, m indexBlobManager, deleted map[string]bool, timeFunc func() time.Time) error { + if len(deleted) == 0 { + return nil + } + + log(ctx).Debugf("undeleteFakeContents()") + defer log(ctx).Debugf("finished undeleteFakeContents()") + + count := rand.Intn(5) + + ndx := map[string]fakeContentIndexEntry{} + + for n := range deleted { + if count == 0 { + break + } + + // undelete + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + Deleted: false, + } + + delete(deleted, n) + count-- + } + + if len(ndx) == 0 { + return nil + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +func writeFakeContents(ctx context.Context, m indexBlobManager, prefix string, count int, numWritten *int, timeFunc func() time.Time) error { + log(ctx).Debugf("writeFakeContents()") + defer log(ctx).Debugf("finished writeFakeContents()") + + ndx := map[string]fakeContentIndexEntry{} + + for i := 0; i < count; i++ { + n := fakeContentID(prefix, *numWritten) + ndx[n] = fakeContentIndexEntry{ + ModTime: timeFunc(), + } + + 
(*numWritten)++ + } + + _, err := writeFakeIndex(ctx, m, ndx) + + return err +} + +type fakeIndexData struct { + RandomID int64 + Entries map[string]fakeContentIndexEntry +} + +func writeFakeIndex(ctx context.Context, m indexBlobManager, ndx map[string]fakeContentIndexEntry) (blob.Metadata, error) { + j, err := json.Marshal(fakeIndexData{ + RandomID: rand.Int63(), + Entries: ndx, + }) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "json error") + } + + bm, err := m.writeIndexBlob(ctx, j) + if err != nil { + return blob.Metadata{}, errors.Wrap(err, "error writing blob") + } + + for k, v := range ndx { + log(ctx).Debugf("wrote content %v %v in blob %v", k, v, bm) + } + + return bm, nil +} + +var errGetAllFakeContentsRetry = errors.New("retry") + +func getAllFakeContents(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) { + allContents, allBlobs, err := getAllFakeContentsInternal(ctx, m) + + for err == errGetAllFakeContentsRetry { + allContents, allBlobs, err = getAllFakeContentsInternal(ctx, m) + } + + return allContents, allBlobs, err +} + +func getAllFakeContentsInternal(ctx context.Context, m indexBlobManager) (map[string]fakeContentIndexEntry, []IndexBlobInfo, error) { + blobs, err := m.listIndexBlobs(ctx, false) + if err != nil { + return nil, nil, errors.Wrap(err, "error listing index blobs") + } + + log(ctx).Debugf("got blobs: %v", blobs) + + allContents := map[string]fakeContentIndexEntry{} + + for _, bi := range blobs { + bb, err := m.getIndexBlob(ctx, bi.BlobID) + if err == blob.ErrBlobNotFound { + return nil, nil, errGetAllFakeContentsRetry + } + + if err != nil { + return nil, nil, errors.Wrap(err, "error reading blob") + } + + var indexData fakeIndexData + + if err := json.Unmarshal(bb, &indexData); err != nil { + log(ctx).Debugf("invalid JSON %v: %v", string(bb), err) + return nil, nil, errors.Wrap(err, "error unmarshaling") + } + + // merge contents based based on time + for k, v := range indexData.Entries { + old, ok := allContents[k] + + if !ok { + allContents[k] = v + } else if v.ModTime.After(old.ModTime) { + allContents[k] = v + } + } + } + + return allContents, blobs, nil +} + +func assertBlobCounts(t *testing.T, data blobtesting.DataMap, wantN, wantM, wantL int) { + t.Helper() + require.Len(t, keysWithPrefix(data, compactionLogBlobPrefix), wantM) + require.Len(t, keysWithPrefix(data, indexBlobPrefix), wantN) + require.Len(t, keysWithPrefix(data, "l"), wantL) +} + +func keysWithPrefix(data blobtesting.DataMap, prefix blob.ID) []blob.ID { + var res []blob.ID + + for k := range data { + if strings.HasPrefix(string(k), string(prefix)) { + res = append(res, k) + } + } + + return res +} + +func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs []blob.Metadata) { + t.Logf("compacting %v to %v", inputs, outputs) + + err := m.registerCompaction(testlogging.Context(t), inputs, outputs) + if err != nil { + t.Fatalf("failed to write index blob: %v", err) + } +} + +func mustWriteIndexBlob(t *testing.T, m indexBlobManager, data string) blob.Metadata { + t.Logf("writing index blob %q", data) + + blobMD, err := m.writeIndexBlob(testlogging.Context(t), []byte(data)) + if err != nil { + t.Fatalf("failed to write index blob: %v", err) + } + + return blobMD +} + +func assertIndexBlobList(t *testing.T, m indexBlobManager, wantMD ...blob.Metadata) { + t.Helper() + + var want []blob.ID + for _, it := range wantMD { + want = append(want, it.BlobID) + } + + l, err := m.listIndexBlobs(testlogging.Context(t), 
false) + if err != nil { + t.Fatalf("failed to list index blobs: %v", err) + } + + t.Logf("asserting blob list %v vs %v", want, l) + + var got []blob.ID + for _, it := range l { + got = append(got, it.BlobID) + } + + require.ElementsMatch(t, got, want) +} + +func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow func() time.Time) indexBlobManager { + p := &FormattingOptions{ + Encryption: encryption.DeprecatedNoneAlgorithm, + Hash: hashing.DefaultAlgorithm, + } + + enc, err := encryption.CreateEncryptor(p) + if err != nil { + t.Fatalf("unable to create encryptor: %v", err) + } + + hf, err := hashing.CreateHashFunc(p) + if err != nil { + t.Fatalf("unable to create hash: %v", err) + } + + lc, err := newListCache(st, &CachingOptions{}) + if err != nil { + t.Fatalf("unable to create list cache: %v", err) + } + + m := &indexBlobManagerImpl{ + st: st, + ownWritesCache: &persistentOwnWritesCache{ + blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, localTimeNow), + localTimeNow}, + indexBlobCache: passthroughContentCache{st}, + encryptor: enc, + hasher: hf, + listCache: lc, + timeNow: localTimeNow, + maxEventualConsistencySettleTime: testIndexBlobDeleteAge, + } + + return m +} diff --git a/repo/content/list_cache.go b/repo/content/list_cache.go index 3fc931ad9..7c2c5cee0 100644 --- a/repo/content/list_cache.go +++ b/repo/content/list_cache.go @@ -1,15 +1,15 @@ package content import ( + "bytes" "context" "encoding/json" - "fmt" "io/ioutil" - "math" "os" "path/filepath" "time" + "github.com/natefinch/atomic" "github.com/pkg/errors" "github.com/kopia/kopia/internal/hmac" @@ -18,70 +18,69 @@ type listCache struct { st blob.Storage - cacheFile string + cacheFilePrefix string listCacheDuration time.Duration hmacSecret []byte } -func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) { - if c.cacheFile != "" { - ci, err := c.readContentsFromCache(ctx) +func (c *listCache) listBlobs(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) { + if c.cacheFilePrefix != "" { + ci, err := c.readBlobsFromCache(ctx, prefix) if err == nil { expirationTime := ci.Timestamp.Add(c.listCacheDuration) if time.Now().Before(expirationTime) { // allow:no-inject-time - log(ctx).Debugf("retrieved list of index blobs from cache") - return ci.Contents, nil + log(ctx).Debugf("retrieved list of %v '%v' index blobs from cache", len(ci.Blobs), prefix) + return ci.Blobs, nil } } else if err != blob.ErrBlobNotFound { log(ctx).Warningf("unable to open cache file: %v", err) } } - contents, err := listIndexBlobsFromStorage(ctx, c.st) + blobs, err := blob.ListAllBlobs(ctx, c.st, prefix) if err == nil { - c.saveListToCache(ctx, &cachedList{ - Contents: contents, + c.saveListToCache(ctx, prefix, &cachedList{ + Blobs: blobs, Timestamp: time.Now(), // allow:no-inject-time }) } - log(ctx).Debugf("found %v index blobs from source", len(contents)) + log(ctx).Debugf("listed %v index blobs with prefix %v from source", len(blobs), prefix) - return contents, err + return blobs, err } -func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) { - if c.cacheFile == "" { +func (c *listCache) saveListToCache(ctx context.Context, prefix blob.ID, ci *cachedList) { + if c.cacheFilePrefix == "" { return } - log(ctx).Debugf("saving index blobs to cache: %v", len(ci.Contents)) + log(ctx).Debugf("saving %v blobs with prefix %v to cache", len(ci.Blobs), prefix) if data, err := json.Marshal(ci); err == nil { - mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) // 
allow:no-inject-time - if err := ioutil.WriteFile(c.cacheFile+mySuffix, hmac.Append(data, c.hmacSecret), 0600); err != nil { + b := hmac.Append(data, c.hmacSecret) + if err := atomic.WriteFile(c.cacheFilePrefix+string(prefix), bytes.NewReader(b)); err != nil { log(ctx).Warningf("unable to write list cache: %v", err) } - - os.Rename(c.cacheFile+mySuffix, c.cacheFile) //nolint:errcheck - os.Remove(c.cacheFile + mySuffix) //nolint:errcheck } } -func (c *listCache) deleteListCache() { - if c.cacheFile != "" { - os.Remove(c.cacheFile) //nolint:errcheck +func (c *listCache) deleteListCache(prefix blob.ID) { + if c.cacheFilePrefix != "" { + os.Remove(c.cacheFilePrefix + string(prefix)) //nolint:errcheck } } -func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, error) { +func (c *listCache) readBlobsFromCache(ctx context.Context, prefix blob.ID) (*cachedList, error) { if !shouldUseListCache(ctx) { return nil, blob.ErrBlobNotFound } ci := &cachedList{} - data, err := ioutil.ReadFile(c.cacheFile) + fname := c.cacheFilePrefix + string(prefix) + + data, err := ioutil.ReadFile(fname) //nolint:gosec if err != nil { if os.IsNotExist(err) { return nil, blob.ErrBlobNotFound @@ -92,7 +91,7 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err data, err = hmac.VerifyAndStrip(data, c.hmacSecret) if err != nil { - return nil, errors.Wrapf(err, "invalid file %v", c.cacheFile) + return nil, errors.Wrapf(err, "invalid file %v", fname) } if err := json.Unmarshal(data, &ci); err != nil { @@ -104,36 +103,14 @@ func (c *listCache) readContentsFromCache(ctx context.Context) (*cachedList, err type cachedList struct { Timestamp time.Time `json:"timestamp"` - Contents []IndexBlobInfo `json:"contents"` + Blobs []blob.Metadata `json:"blobs"` } -// listIndexBlobsFromStorage returns the list of index blobs in the given storage. -// The list of contents is not guaranteed to be sorted. 
-func listIndexBlobsFromStorage(ctx context.Context, st blob.Storage) ([]IndexBlobInfo, error) { - snapshot, err := blob.ListAllBlobsConsistent(ctx, st, newIndexBlobPrefix, math.MaxInt32) - if err != nil { - return nil, err - } - - var results []IndexBlobInfo - - for _, it := range snapshot { - ii := IndexBlobInfo{ - BlobID: it.BlobID, - Timestamp: it.Timestamp, - Length: it.Length, - } - results = append(results, ii) - } - - return results, err -} - -func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) { - var listCacheFile string +func newListCache(st blob.Storage, caching *CachingOptions) (*listCache, error) { + var listCacheFilePrefix string if caching.CacheDirectory != "" { - listCacheFile = filepath.Join(caching.CacheDirectory, "list") + listCacheFilePrefix = filepath.Join(caching.CacheDirectory, "blob-list-") if _, err := os.Stat(caching.CacheDirectory); os.IsNotExist(err) { if err := os.MkdirAll(caching.CacheDirectory, 0700); err != nil { @@ -144,14 +121,10 @@ func newListCache(st blob.Storage, caching CachingOptions) (*listCache, error) { c := &listCache{ st: st, - cacheFile: listCacheFile, + cacheFilePrefix: listCacheFilePrefix, hmacSecret: caching.HMACSecret, listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second, } - if caching.IgnoreListCache { - c.deleteListCache() - } - return c, nil } diff --git a/repo/content/packindex_test.go b/repo/content/packindex_test.go index 79cce8bf4..01113e692 100644 --- a/repo/content/packindex_test.go +++ b/repo/content/packindex_test.go @@ -117,12 +117,21 @@ func TestPackIndex(t *testing.T) { data2 := buf2.Bytes() data3 := buf3.Bytes() - if !bytes.Equal(data1, data2) { - t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1), hex.Dump(data2)) + // each build produces exactly idendical prefix except for the trailing random bytes. + data1Prefix := data1[0 : len(data1)-randomSuffixSize] + data2Prefix := data2[0 : len(data2)-randomSuffixSize] + data3Prefix := data3[0 : len(data3)-randomSuffixSize] + + if !bytes.Equal(data1Prefix, data2Prefix) { + t.Errorf("builder output not stable: %x vs %x", hex.Dump(data1Prefix), hex.Dump(data2Prefix)) } - if !bytes.Equal(data2, data3) { - t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2), hex.Dump(data3)) + if !bytes.Equal(data2Prefix, data3Prefix) { + t.Errorf("builder output not stable: %x vs %x", hex.Dump(data2Prefix), hex.Dump(data3Prefix)) + } + + if bytes.Equal(data1, data2) { + t.Errorf("builder output expected to be different, but was the same") } t.Run("FuzzTest", func(t *testing.T) { diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index eefe33215..c7916d77f 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -148,7 +148,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // write some data to storage - bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) + bm, err := content.NewManager(ctx, st, f, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -174,7 +174,7 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // make a new content manager based on corrupted data. 
- bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) + bm, err = content.NewManager(ctx, st, f, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -305,7 +305,7 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da Encryption: encryption.DefaultAlgorithm, MaxPackSize: 100000, Version: 1, - }, content.CachingOptions{}, content.ManagerOptions{}) + }, nil, content.ManagerOptions{}) if err != nil { t.Fatalf("can't create content manager: %v", err) } diff --git a/repo/open.go b/repo/open.go index fcc1a1d5a..a607d3c0a 100644 --- a/repo/open.go +++ b/repo/open.go @@ -80,7 +80,7 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor st = loggingwrapper.NewWrapper(st, options.TraceStorage, "[STORAGE] ") } - r, err := OpenWithConfig(ctx, st, lc, password, options, *lc.Caching) + r, err := OpenWithConfig(ctx, st, lc, password, options, lc.Caching) if err != nil { st.Close(ctx) //nolint:errcheck return nil, err @@ -103,7 +103,9 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor } // OpenWithConfig opens the repository with a given configuration, avoiding the need for a config file. -func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching content.CachingOptions) (*DirectRepository, error) { +func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching *content.CachingOptions) (*DirectRepository, error) { + caching = caching.CloneOrDefault() + // Read format blob, potentially from cache. fb, err := readAndCacheFormatBlobBytes(ctx, st, caching.CacheDirectory) if err != nil { @@ -173,7 +175,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw } // SetCachingConfig changes caching configuration for a given repository. -func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt content.CachingOptions) error { +func (r *DirectRepository) SetCachingConfig(ctx context.Context, opt *content.CachingOptions) error { lc, err := loadConfigFromFile(r.ConfigFile) if err != nil { return err diff --git a/repo/repository.go b/repo/repository.go index 94faef6ba..0fc291228 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -49,6 +49,8 @@ type DirectRepository struct { timeNow func() time.Time formatBlob *formatBlob masterKey []byte + + closed bool } // DeriveKey derives encryption key of the provided length from the master key. @@ -114,6 +116,10 @@ func (r *DirectRepository) DeleteManifest(ctx context.Context, id manifest.ID) e // Close closes the repository and releases all resources. func (r *DirectRepository) Close(ctx context.Context) error { + if r.closed { + return nil + } + if err := r.Flush(ctx); err != nil { return errors.Wrap(err, "error flushing") } @@ -130,6 +136,8 @@ func (r *DirectRepository) Close(ctx context.Context) error { return errors.Wrap(err, "error closing blob storage") } + r.closed = true + return nil } diff --git a/tests/end_to_end_test/index_recover_test.go b/tests/end_to_end_test/index_recover_test.go index e3574ec33..e9e0fe7ef 100644 --- a/tests/end_to_end_test/index_recover_test.go +++ b/tests/end_to_end_test/index_recover_test.go @@ -39,9 +39,13 @@ func TestIndexRecover(t *testing.T) { e.RunAndExpectSuccess(t, "blob", "delete", indexFile) } + // clear the cache to get rid of cache of own writes. 
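+ // without this, the own-writes cache would still report the index blobs deleted above via "blob delete".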
+ e.RunAndVerifyOutputLineCount(t, 0, "cache", "clear") + // there should be no index files at this point e.RunAndVerifyOutputLineCount(t, 0, "index", "ls", "--no-list-caching") - // there should be no blocks, since there are no indexesto find them + + // there should be no contents, since there are no indexes to find them e.RunAndVerifyOutputLineCount(t, 0, "content", "ls") // now recover index from all blocks diff --git a/tests/repository_stress_test/repository_stress_test.go b/tests/repository_stress_test/repository_stress_test.go index 022bc2408..4c4b05a20 100644 --- a/tests/repository_stress_test/repository_stress_test.go +++ b/tests/repository_stress_test/repository_stress_test.go @@ -35,7 +35,7 @@ func TestStressRepository(t *testing.T) { t.Skip("skipping stress test during short tests") } - ctx := content.UsingListCache(testlogging.Context(t), false) + ctx := testlogging.Context(t) tmpPath, err := ioutil.TempDir("", "kopia") if err != nil { diff --git a/tests/stress_test/stress_test.go b/tests/stress_test/stress_test.go index f48b48a1e..289019096 100644 --- a/tests/stress_test/stress_test.go +++ b/tests/stress_test/stress_test.go @@ -31,8 +31,6 @@ func TestStressBlockManager(t *testing.T) { duration = 30 * time.Second } - // TODO: use blobtesting.NewEventuallyConsistentStorage(memst, 0.1) instead of memst here - stressTestWithStorage(t, memst, duration) } @@ -46,7 +44,7 @@ func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration Encryption: "AES-256-CTR", MaxPackSize: 20000000, MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, - }, content.CachingOptions{}, content.ManagerOptions{}) + }, nil, content.ManagerOptions{}) } seed0 := time.Now().Nanosecond()