From d9ce3d0ad63f64d8845a60af6b5bd58aed5d2ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= Date: Tue, 10 Mar 2020 00:42:10 -0700 Subject: [PATCH] Inject time in Kopia components (#314) Motivation: Allow time injection for (unit) tests, to more easily test and verify time-dependent invariants. Add time injection support for: * repo.Manager * manifest.Manager * snapshot.Uploader Then, wire up to these components. The content.Manager already had support for time injection, but was not wired up from the time function passed to repo creation. Add an internal/faketime package for testing. Mainly code movement from testing code in the repo/content package. Motivation: make it available to other packages outside content Also, add simple tests for faketime functions. --- Makefile | 8 +- internal/faketime/faketime.go | 62 ++++++++++++ internal/faketime/faketime_test.go | 91 +++++++++++++++++ internal/repotesting/repotesting_test.go | 97 +++++++++++++++++++ repo/blob/azure/azure_storage.go | 2 +- repo/blob/filesystem/filesystem_storage.go | 2 +- repo/blob/gcs/gcs_storage.go | 2 +- repo/content/block_manager_compaction.go | 6 +- .../committed_content_index_disk_cache.go | 2 +- repo/content/content_cache.go | 4 +- repo/content/content_manager.go | 19 +++- repo/content/content_manager_test.go | 43 +++----- repo/content/list_cache.go | 6 +- repo/manifest/manifest_manager.go | 19 +++- repo/manifest/manifest_manager_test.go | 12 +-- repo/open.go | 12 ++- repo/repository.go | 14 +++ snapshot/gc/gc.go | 2 +- snapshot/policy/retention_policy.go | 2 +- snapshot/snapshotfs/all_sources.go | 2 +- snapshot/snapshotfs/source_directories.go | 2 +- snapshot/snapshotfs/source_snapshots.go | 2 +- snapshot/snapshotfs/upload.go | 5 +- tests/stress_test/stress_test.go | 2 +- 24 files changed, 351 insertions(+), 67 deletions(-) create mode 100644 internal/faketime/faketime.go create mode 100644 internal/faketime/faketime_test.go create mode 100644 internal/repotesting/repotesting_test.go diff --git a/Makefile b/Makefile index c69255c77..3911229d9 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,13 @@ lint: $(linter) lint-and-log: $(linter) $(linter) --deadline 180s run $(linter_flags) | tee .linterr.txt -vet: + +vet-time-inject: + ! find repo snapshot -name '*.go' -not -path 'repo/blob/logging/*' -not -name '*_test.go' \ + -exec grep -n -e time.Now -e time.Since -e time.Until {} + \ + | grep -v -e allow:no-inject-time + +vet: vet-time-inject go vet -all . travis-setup: travis-install-gpg-key travis-install-test-credentials all-tools diff --git a/internal/faketime/faketime.go b/internal/faketime/faketime.go new file mode 100644 index 000000000..e97d7d254 --- /dev/null +++ b/internal/faketime/faketime.go @@ -0,0 +1,62 @@ +// Package faketime fakes time for tests +package faketime + +import ( + "sync" + "sync/atomic" + "time" +) + +// Frozen returns a function that always returns t +func Frozen(t time.Time) func() time.Time { + return func() time.Time { + return t + } +} + +// AutoAdvance returns a time source function that returns a time equal to +// 't + ((n - 1) * dt)' wheren n is the number of serialized invocations of +// the returned function. The returned function will generate a time series of +// the form [t, t+dt, t+2dt, t+3dt, ...] +func AutoAdvance(t time.Time, dt time.Duration) func() time.Time { + var mu sync.Mutex + + return func() time.Time { + mu.Lock() + defer mu.Unlock() + + ret := t + t = t.Add(dt) + + return ret + } +} + +// TimeAdvance allows controlling the passage of time. Intended to be used in +// tests. +type TimeAdvance struct { + base time.Time + delta int64 +} + +// NewTimeAdvance creates a TimeAdvance with the given start time +func NewTimeAdvance(start time.Time) *TimeAdvance { + return &TimeAdvance{base: start} +} + +// NowFunc returns a time provider function for t +func (t *TimeAdvance) NowFunc() func() time.Time { + return func() time.Time { + dt := atomic.LoadInt64(&t.delta) + + return t.base.Add(time.Duration(dt)) + } +} + +// Advance advances t by dt, such that the next call to t.NowFunc()() returns +// current t + dt +func (t *TimeAdvance) Advance(dt time.Duration) time.Time { + advance := atomic.AddInt64(&t.delta, int64(dt)) + + return t.base.Add(time.Duration(advance)) +} diff --git a/internal/faketime/faketime_test.go b/internal/faketime/faketime_test.go new file mode 100644 index 000000000..d7ed47918 --- /dev/null +++ b/internal/faketime/faketime_test.go @@ -0,0 +1,91 @@ +package faketime + +import ( + "sync" + "testing" + "time" +) + +func TestFrozen(t *testing.T) { + times := []time.Time{ + time.Date(2015, 1, 3, 0, 0, 0, 0, time.UTC), + time.Now(), + } + + for _, tm := range times { + timeNow := Frozen(tm) + + for i := 0; i < 5; i++ { + if want, got := tm, timeNow(); got != want { + t.Fatalf("Invalid frozen time, got: %v, want: %v", got, want) + } + } + } +} + +func TestAutoAdvance(t *testing.T) { + const ( + goRoutinesCount = 3 + iterations = 20 + ) + + startTime := time.Date(2018, 1, 6, 0, 0, 0, 0, time.UTC) + timeNow := AutoAdvance(startTime, 10*time.Second) + tchan := make(chan time.Time, 2*goRoutinesCount) + + var wg sync.WaitGroup + + wg.Add(goRoutinesCount) + + for i := 0; i < goRoutinesCount; i++ { + go func() { + defer wg.Done() + + times := make([]time.Time, iterations) + + for j := 0; j < iterations; j++ { + times[j] = timeNow() + } + + for _, ts := range times { + tchan <- ts + } + }() + } + + go func() { + wg.Wait() + close(tchan) + }() + + tMap := make(map[time.Time]struct{}, iterations*goRoutinesCount) + + for ts := range tchan { + if _, ok := tMap[ts]; ok { + t.Error("Found repeated time value: ", ts) + } + + tMap[ts] = struct{}{} + } + + if got, want := len(tMap), goRoutinesCount*iterations; got != want { + t.Fatalf("number of generated times does not match, got: %v, want: %v", got, want) + } +} + +func TestTimeAdvance(t *testing.T) { + startTime := time.Date(2019, 1, 6, 0, 0, 0, 0, time.UTC) + ta := NewTimeAdvance(startTime) + now := ta.NowFunc() + + if got, want := now(), startTime; got != want { + t.Errorf("expected time does not match, got: %v, want: %v", got, want) + } + + dt := 5 * time.Minute + ta.Advance(dt) + + if got, want := now(), startTime.Add(dt); got != want { + t.Errorf("expected time does not match, got: %v, want: %v", got, want) + } +} diff --git a/internal/repotesting/repotesting_test.go b/internal/repotesting/repotesting_test.go new file mode 100644 index 000000000..7220dd439 --- /dev/null +++ b/internal/repotesting/repotesting_test.go @@ -0,0 +1,97 @@ +package repotesting + +import ( + "testing" + "time" + + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/mockfs" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/snapshotfs" +) + +func TestTimeFuncWiring(t *testing.T) { + var env Environment + + ctx := testlogging.Context(t) + defer env.Setup(t).Close(ctx, t) + + env.Repository.Close(ctx) + + ft := faketime.NewTimeAdvance(time.Date(2018, time.February, 6, 0, 0, 0, 0, time.UTC)) + + // Re open with injected time + r, err := repo.Open(ctx, env.Repository.ConfigFile, masterPassword, &repo.Options{TimeNowFunc: ft.NowFunc()}) + if err != nil { + t.Fatal("Failed to open repo:", err) + } + + env.Repository = r + + // verify wiring for the repo layer + if got, want := r.Time(), ft.NowFunc()(); !got.Equal(want) { + t.Errorf("times don't match, got %v, want %v", got, want) + } + + if want, got := ft.Advance(10*time.Minute), r.Time(); !got.Equal(want) { + t.Errorf("times don't match, got %v, want %v", got, want) + } + + // verify wiring for the content layer + nt := ft.Advance(20 * time.Second) + + cid, err := r.Content.WriteContent(ctx, []byte("foo"), "") + if err != nil { + t.Fatal("failed to write content:", err) + } + + info, err := r.Content.ContentInfo(ctx, cid) + if err != nil { + t.Fatal("failed to get content info for", cid, err) + } + + if got, want := info.Timestamp(), nt; !got.Equal(want) { + t.Errorf("content time does not match, got %v, want %v", got, want) + } + + // verify wiring for the manifest layer + nt = ft.Advance(3 * time.Minute) + labels := map[string]string{"l1": "v1", "l2": "v2", "type": "my-manifest"} + mid, err := r.Manifests.Put(ctx, labels, "manifest content") + + if err != nil { + t.Fatal("failed to put manifest:", err) + } + + meta, err := r.Manifests.GetMetadata(ctx, mid) + + if err != nil { + t.Fatal("failed to get manifest metadata:", err) + } + + if got, want := meta.ModTime, nt; !got.Equal(want) { + t.Errorf("manifest time does not match, got %v, want %v", got, want) + } + + const defaultPermissions = 0777 + + // verify wiring for the snapshot layer + sourceDir := mockfs.NewDirectory() + sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions) + + nt = ft.Advance(1 * time.Hour) + u := snapshotfs.NewUploader(r) + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + s1, err := u.Upload(ctx, sourceDir, policyTree, snapshot.SourceInfo{}) + + if err != nil { + t.Fatal("failed to create snapshot:", err) + } + + if got, want := nt, s1.StartTime; !got.Equal(want) { + t.Fatalf("snapshot time does not match, got: %v, want: %v", got, want) + } +} diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go index 5489d05d8..32b0f0a9b 100644 --- a/repo/blob/azure/azure_storage.go +++ b/repo/blob/azure/azure_storage.go @@ -238,7 +238,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { // verify Azure connection is functional by listing blobs in a bucket, which will fail if the container // does not exist. We list with a prefix that will not exist, to avoid iterating through any objects. - nonExistentPrefix := fmt.Sprintf("kopia-azure-storage-initializing-%v", time.Now().UnixNano()) + nonExistentPrefix := fmt.Sprintf("kopia-azure-storage-initializing-%v", time.Now().UnixNano()) // allow:no-inject-time err = az.ListBlobs(ctx, blob.ID(nonExistentPrefix), func(md blob.Metadata) error { return nil }) diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 86b550c30..af14599b7 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -193,7 +193,7 @@ func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti return err } - n := time.Now() + n := time.Now() // allow:no-inject-time age := n.Sub(st.ModTime()) if age < threshold { diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index d825b86bf..7995292eb 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -277,7 +277,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) { // verify GCS connection is functional by listing blobs in a bucket, which will fail if the bucket // does not exist. We list with a prefix that will not exist, to avoid iterating through any objects. - nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", time.Now().UnixNano()) + nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", time.Now().UnixNano()) // allow:no-inject-time err = gcs.ListBlobs(ctx, blob.ID(nonExistentPrefix), func(md blob.Metadata) error { return nil }) diff --git a/repo/content/block_manager_compaction.go b/repo/content/block_manager_compaction.go index dd05098f9..90d9cfa6b 100644 --- a/repo/content/block_manager_compaction.go +++ b/repo/content/block_manager_compaction.go @@ -89,7 +89,7 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] formatLog(ctx).Debugf("compacting %v contents", len(indexBlobs)) - t0 := time.Now() + t0 := time.Now() // allow:no-inject-time bld := make(packIndexBuilder) for _, indexBlob := range indexBlobs { @@ -108,7 +108,7 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs [] return errors.Wrap(err, "unable to write compacted indexes") } - formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) + formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) // allow:no-inject-time for _, indexBlob := range indexBlobs { if indexBlob.BlobID == compactedIndexBlob { @@ -137,7 +137,7 @@ func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuil } _ = index.Iterate("", func(i Info) error { - if i.Deleted && opt.SkipDeletedOlderThan > 0 && time.Since(i.Timestamp()) > opt.SkipDeletedOlderThan { + if i.Deleted && opt.SkipDeletedOlderThan > 0 && bm.timeNow().Sub(i.Timestamp()) > opt.SkipDeletedOlderThan { log(ctx).Debugf("skipping content %v deleted at %v", i.ID, i.Timestamp()) return nil } diff --git a/repo/content/committed_content_index_disk_cache.go b/repo/content/committed_content_index_disk_cache.go index 6db93b558..34b1edded 100644 --- a/repo/content/committed_content_index_disk_cache.go +++ b/repo/content/committed_content_index_disk_cache.go @@ -151,7 +151,7 @@ func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used } for _, rem := range remaining { - if time.Since(rem.ModTime()) > unusedCommittedContentIndexCleanupTime { + if time.Since(rem.ModTime()) > unusedCommittedContentIndexCleanupTime { // allow:no-inject-time log(ctx).Debugf("removing unused %v %v", rem.Name(), rem.ModTime()) if err := os.Remove(filepath.Join(c.dirname, rem.Name())); err != nil { diff --git a/repo/content/content_cache.go b/repo/content/content_cache.go index c0b37fe9f..b17dd7acd 100644 --- a/repo/content/content_cache.go +++ b/repo/content/content_cache.go @@ -175,7 +175,7 @@ func (c *contentCache) sweepDirectory(ctx context.Context) (err error) { return nil } - t0 := time.Now() + t0 := time.Now() // allow:no-inject-time var h contentMetadataHeap @@ -199,7 +199,7 @@ func (c *contentCache) sweepDirectory(ctx context.Context) (err error) { return errors.Wrap(err, "error listing cache") } - log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) + log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time c.lastTotalSizeBytes = totalRetainedSize return nil diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 9045b22a0..2e5745347 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -568,17 +568,28 @@ func (bm *Manager) Refresh(ctx context.Context) (bool, error) { log(ctx).Debugf("Refresh started") - t0 := time.Now() + t0 := time.Now() // allow:no-inject-time _, updated, err := bm.loadPackIndexesUnlocked(ctx) - log(ctx).Debugf("Refresh completed in %v and updated=%v", time.Since(t0), updated) + log(ctx).Debugf("Refresh completed in %v and updated=%v", time.Since(t0), updated) // allow:no-inject-time return updated, err } +// ManagerOptions are the optional parameters for manager creation +type ManagerOptions struct { + RepositoryFormatBytes []byte + TimeNow func() time.Time // Time provider +} + // NewManager creates new content manager with given packing options and a formatter. -func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, repositoryFormatBytes []byte) (*Manager, error) { - return newManagerWithOptions(ctx, st, f, caching, time.Now, repositoryFormatBytes) +func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, options ManagerOptions) (*Manager, error) { + nowFn := options.TimeNow + if nowFn == nil { + nowFn = time.Now // allow:no-inject-time + } + + return newManagerWithOptions(ctx, st, f, caching, nowFn, options.RepositoryFormatBytes) } func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) { diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 6c4bc67bf..c4e5317a6 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -19,6 +19,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/faketime" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" ) @@ -228,7 +229,7 @@ func TestContentManagerWriteMultiple(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - timeFunc := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second) + timeFunc := faketime.AutoAdvance(fakeTime, 1*time.Second) bm := newTestContentManager(t, data, keyTime, timeFunc) var contentIDs []ID @@ -286,7 +287,7 @@ func TestContentManagerFailedToWritePack(t *testing.T) { MaxPackSize: maxPackSize, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, CachingOptions{}, fakeTimeNowFrozen(fakeTime), nil) + }, CachingOptions{}, faketime.Frozen(fakeTime), nil) if err != nil { t.Fatalf("can't create bm: %v", err) } @@ -320,7 +321,7 @@ func TestContentManagerConcurrency(t *testing.T) { dumpContentManagerData(ctx, t, data) bm1 := newTestContentManager(t, data, keyTime, nil) bm2 := newTestContentManager(t, data, keyTime, nil) - bm3 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1*time.Second)) + bm3 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(1), 1*time.Second)) // all bm* can see pre-existing content verifyContent(ctx, t, bm1, preexistingContent, seededRandomData(10, 100)) @@ -708,7 +709,7 @@ func TestRewriteNonDeleted(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second) + fakeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) bm := newTestContentManager(t, data, keyTime, fakeNow) applyStep := func(action int) { @@ -774,7 +775,7 @@ func TestRewriteDeleted(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second) + fakeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) bm := newTestContentManager(t, data, keyTime, fakeNow) applyStep := func(action int) { @@ -826,22 +827,22 @@ func TestDeleteAndRecreate(t *testing.T) { // write a content data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime)) + bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime)) content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100)) bm.Flush(ctx) // delete but at given timestamp but don't commit yet. - bm0 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1*time.Second)) + bm0 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(tc.deletionTime, 1*time.Second)) assertNoError(t, bm0.DeleteContent(ctx, content1)) // delete it at t0+10 - bm1 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second)) + bm1 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second)) verifyContent(ctx, t, bm1, content1, seededRandomData(10, 100)) assertNoError(t, bm1.DeleteContent(ctx, content1)) bm1.Flush(ctx) // recreate at t0+20 - bm2 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second)) + bm2 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second)) content2 := writeContentAndVerify(ctx, t, bm2, seededRandomData(10, 100)) bm2.Flush(ctx) @@ -1121,7 +1122,7 @@ func TestContentWriteAliasing(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime)) + bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime)) contentData := []byte{100, 0, 0} id1 := writeContentAndVerify(ctx, t, bm, contentData) @@ -1147,7 +1148,7 @@ func TestContentReadAliasing(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} - bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime)) + bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime)) contentData := []byte{100, 0, 0} id1 := writeContentAndVerify(ctx, t, bm, contentData) @@ -1257,7 +1258,7 @@ func newTestContentManager(t *testing.T, data blobtesting.DataMap, keyTime map[b func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc func() time.Time) *Manager { if timeFunc == nil { - timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second) + timeFunc = faketime.AutoAdvance(fakeTime, 1*time.Second) } bm, err := newManagerWithOptions(testlogging.Context(t), st, &FormattingOptions{ @@ -1288,24 +1289,6 @@ func getIndexCount(d blobtesting.DataMap) int { return cnt } -func fakeTimeNowFrozen(t time.Time) func() time.Time { - return fakeTimeNowWithAutoAdvance(t, 0) -} - -func fakeTimeNowWithAutoAdvance(t time.Time, dt time.Duration) func() time.Time { - var mu sync.Mutex - - return func() time.Time { - mu.Lock() - defer mu.Unlock() - - ret := t - t = t.Add(dt) - - return ret - } -} - func verifyContentNotFound(ctx context.Context, t *testing.T, bm *Manager, contentID ID) { t.Helper() diff --git a/repo/content/list_cache.go b/repo/content/list_cache.go index 2839ddf5b..3fc931ad9 100644 --- a/repo/content/list_cache.go +++ b/repo/content/list_cache.go @@ -28,7 +28,7 @@ func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) ci, err := c.readContentsFromCache(ctx) if err == nil { expirationTime := ci.Timestamp.Add(c.listCacheDuration) - if time.Now().Before(expirationTime) { + if time.Now().Before(expirationTime) { // allow:no-inject-time log(ctx).Debugf("retrieved list of index blobs from cache") return ci.Contents, nil } @@ -41,7 +41,7 @@ func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) if err == nil { c.saveListToCache(ctx, &cachedList{ Contents: contents, - Timestamp: time.Now(), + Timestamp: time.Now(), // allow:no-inject-time }) } @@ -58,7 +58,7 @@ func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) { log(ctx).Debugf("saving index blobs to cache: %v", len(ci.Contents)) if data, err := json.Marshal(ci); err == nil { - mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) + mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) // allow:no-inject-time if err := ioutil.WriteFile(c.cacheFile+mySuffix, hmac.Append(data, c.hmacSecret), 0600); err != nil { log(ctx).Warningf("unable to write list cache: %v", err) } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index ab613720a..ddb619d76 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -55,6 +55,8 @@ type Manager struct { committedEntries map[ID]*manifestEntry committedContentIDs map[content.ID]bool + + timeNow func() time.Time // Time provider } // Put serializes the provided payload to JSON and persists it. Returns unique identifier that represents the manifest. @@ -82,7 +84,7 @@ func (m *Manager) Put(ctx context.Context, labels map[string]string, payload int e := &manifestEntry{ ID: ID(hex.EncodeToString(random)), - ModTime: time.Now().UTC(), + ModTime: m.timeNow().UTC(), Labels: copyLabels(labels), Content: b, } @@ -276,7 +278,7 @@ func (m *Manager) Delete(ctx context.Context, id ID) error { m.pendingEntries[id] = &manifestEntry{ ID: id, - ModTime: time.Now().UTC(), + ModTime: m.timeNow().UTC(), Deleted: true, } @@ -483,13 +485,24 @@ func copyLabels(m map[string]string) map[string]string { return r } +// ManagerOptions are optional parameters for Manager creation +type ManagerOptions struct { + TimeNow func() time.Time // Time provider +} + // NewManager returns new manifest manager for the provided content manager. -func NewManager(ctx context.Context, b contentManager) (*Manager, error) { +func NewManager(ctx context.Context, b contentManager, options ManagerOptions) (*Manager, error) { + timeNow := options.TimeNow + if timeNow == nil { + timeNow = time.Now // allow:no-inject-time + } + m := &Manager{ b: b, pendingEntries: map[ID]*manifestEntry{}, committedEntries: map[ID]*manifestEntry{}, committedContentIDs: map[content.ID]bool{}, + timeNow: timeNow, } return m, nil diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 0329b86a3..f251478d8 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -146,12 +146,12 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // write some data to storage - bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, nil) + bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } - mgr, err := NewManager(ctx, bm) + mgr, err := NewManager(ctx, bm, ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -172,12 +172,12 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // make a new content manager based on corrupted data. - bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, nil) + bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } - mgr, err = NewManager(ctx, bm) + mgr, err = NewManager(ctx, bm, ManagerOptions{}) if err != nil { t.Fatalf("err: %v", err) } @@ -296,12 +296,12 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da Encryption: encryption.NoneAlgorithm, MaxPackSize: 100000, Version: 1, - }, content.CachingOptions{}, nil) + }, content.CachingOptions{}, content.ManagerOptions{}) if err != nil { t.Fatalf("can't create content manager: %v", err) } - mm, err := NewManager(ctx, bm) + mm, err := NewManager(ctx, bm, ManagerOptions{}) if err != nil { t.Fatalf("can't create manifest manager: %v", err) } diff --git a/repo/open.go b/repo/open.go index ae5428aac..74012c055 100644 --- a/repo/open.go +++ b/repo/open.go @@ -5,6 +5,7 @@ "encoding/json" "io/ioutil" "path/filepath" + "time" "github.com/pkg/errors" @@ -22,6 +23,7 @@ type Options struct { TraceStorage func(f string, args ...interface{}) // Logs all storage access using provided Printf-style function ObjectManagerOptions object.ManagerOptions + TimeNowFunc func() time.Time // Time provider } // ErrInvalidPassword is returned when repository password is invalid. @@ -117,7 +119,12 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw fo.MaxPackSize = 20 << 20 // nolint:gomnd } - cm, err := content.NewManager(ctx, st, fo, caching, fb) + cmOpts := content.ManagerOptions{ + RepositoryFormatBytes: fb, + TimeNow: defaultTime(options.TimeNowFunc), + } + + cm, err := content.NewManager(ctx, st, fo, caching, cmOpts) if err != nil { return nil, errors.Wrap(err, "unable to open content manager") } @@ -127,7 +134,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw return nil, errors.Wrap(err, "unable to open object manager") } - manifests, err := manifest.NewManager(ctx, cm) + manifests, err := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}) if err != nil { return nil, errors.Wrap(err, "unable to open manifests") } @@ -141,6 +148,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw formatBlob: f, masterKey: masterKey, + timeNow: cmOpts.TimeNow, }, nil } diff --git a/repo/repository.go b/repo/repository.go index ce3e8ac25..46b8bffaa 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -25,6 +25,7 @@ type Repository struct { Hostname string // connected (localhost) hostname Username string // connected username + timeNow func() time.Time formatBlob *formatBlob masterKey []byte } @@ -91,3 +92,16 @@ func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Dura } } } + +// Time returns the current local time for the repo +func (r *Repository) Time() time.Time { + return defaultTime(r.timeNow)() +} + +func defaultTime(f func() time.Time) func() time.Time { + if f != nil { + return f + } + + return time.Now // allow:no-inject-time +} diff --git a/snapshot/gc/gc.go b/snapshot/gc/gc.go index 4261afcc9..88d2ead98 100644 --- a/snapshot/gc/gc.go +++ b/snapshot/gc/gc.go @@ -95,7 +95,7 @@ func Run(ctx context.Context, rep *repo.Repository, minContentAge time.Duration, } if _, ok := used.Load(ci.ID); !ok { - if time.Since(ci.Timestamp()) < minContentAge { + if rep.Time().Sub(ci.Timestamp()) < minContentAge { log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp()) atomic.AddInt32(&tooRecentCount, 1) atomic.AddInt64(&totalTooRecentBytes, int64(ci.Length)) diff --git a/snapshot/policy/retention_policy.go b/snapshot/policy/retention_policy.go index 602b9934d..bd6420614 100644 --- a/snapshot/policy/retention_policy.go +++ b/snapshot/policy/retention_policy.go @@ -20,7 +20,7 @@ type RetentionPolicy struct { // ComputeRetentionReasons computes the reasons why each snapshot is retained, based on // the settings in retention policy and stores them in RetentionReason field. func (r *RetentionPolicy) ComputeRetentionReasons(manifests []*snapshot.Manifest) { - now := time.Now() + now := time.Now() // allow:no-inject-time maxTime := now.Add(365 * 24 * time.Hour) cutoffTime := func(setting *int, add func(time.Time, int) time.Time) time.Time { diff --git a/snapshot/snapshotfs/all_sources.go b/snapshot/snapshotfs/all_sources.go index 1ccfc40ce..abe5ca6ec 100644 --- a/snapshot/snapshotfs/all_sources.go +++ b/snapshot/snapshotfs/all_sources.go @@ -28,7 +28,7 @@ func (s *repositoryAllSources) Name() string { } func (s *repositoryAllSources) ModTime() time.Time { - return time.Now() + return s.rep.Time() } func (s *repositoryAllSources) Mode() os.FileMode { diff --git a/snapshot/snapshotfs/source_directories.go b/snapshot/snapshotfs/source_directories.go index 8512a1555..8e9cdf10f 100644 --- a/snapshot/snapshotfs/source_directories.go +++ b/snapshot/snapshotfs/source_directories.go @@ -28,7 +28,7 @@ func (s *sourceDirectories) Mode() os.FileMode { } func (s *sourceDirectories) ModTime() time.Time { - return time.Now() + return s.rep.Time() } func (s *sourceDirectories) Sys() interface{} { diff --git a/snapshot/snapshotfs/source_snapshots.go b/snapshot/snapshotfs/source_snapshots.go index 2cf1da09d..a7bad246c 100644 --- a/snapshot/snapshotfs/source_snapshots.go +++ b/snapshot/snapshotfs/source_snapshots.go @@ -40,7 +40,7 @@ func (s *sourceSnapshots) Sys() interface{} { } func (s *sourceSnapshots) ModTime() time.Time { - return time.Now() + return s.rep.Time() } func (s *sourceSnapshots) Owner() fs.OwnerInfo { diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index e26e2583b..cab5b1fd8 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -11,7 +11,6 @@ "runtime" "sync" "sync/atomic" - "time" "github.com/pkg/errors" @@ -749,7 +748,7 @@ func (u *Uploader) Upload( var err error - s.StartTime = time.Now() + s.StartTime = u.repo.Time() switch entry := source.(type) { case fs.Directory: @@ -778,7 +777,7 @@ func (u *Uploader) Upload( } s.IncompleteReason = u.cancelReason() - s.EndTime = time.Now() + s.EndTime = u.repo.Time() s.Stats = u.stats return s, nil diff --git a/tests/stress_test/stress_test.go b/tests/stress_test/stress_test.go index dfdc9a4aa..c51784214 100644 --- a/tests/stress_test/stress_test.go +++ b/tests/stress_test/stress_test.go @@ -44,7 +44,7 @@ func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration Encryption: "AES-256-CTR", MaxPackSize: 20000000, MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, - }, content.CachingOptions{}, nil) + }, content.CachingOptions{}, content.ManagerOptions{}) } seed0 := time.Now().Nanosecond()