mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 14:44:47 -04:00
Inject time in Kopia components (#314)
Motivation: Allow time injection for (unit) tests, to more easily test and verify time-dependent invariants. Add time injection support for: * repo.Manager * manifest.Manager * snapshot.Uploader Then, wire up to these components. The content.Manager already had support for time injection, but was not wired up from the time function passed to repo creation. Add an internal/faketime package for testing. Mainly code movement from testing code in the repo/content package. Motivation: make it available to other packages outside content Also, add simple tests for faketime functions.
This commit is contained in:
8
Makefile
8
Makefile
@@ -44,7 +44,13 @@ lint: $(linter)
|
||||
lint-and-log: $(linter)
|
||||
$(linter) --deadline 180s run $(linter_flags) | tee .linterr.txt
|
||||
|
||||
vet:
|
||||
|
||||
vet-time-inject:
|
||||
! find repo snapshot -name '*.go' -not -path 'repo/blob/logging/*' -not -name '*_test.go' \
|
||||
-exec grep -n -e time.Now -e time.Since -e time.Until {} + \
|
||||
| grep -v -e allow:no-inject-time
|
||||
|
||||
vet: vet-time-inject
|
||||
go vet -all .
|
||||
|
||||
travis-setup: travis-install-gpg-key travis-install-test-credentials all-tools
|
||||
|
||||
62
internal/faketime/faketime.go
Normal file
62
internal/faketime/faketime.go
Normal file
@@ -0,0 +1,62 @@
|
||||
// Package faketime fakes time for tests
|
||||
package faketime
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Frozen returns a function that always returns t
|
||||
func Frozen(t time.Time) func() time.Time {
|
||||
return func() time.Time {
|
||||
return t
|
||||
}
|
||||
}
|
||||
|
||||
// AutoAdvance returns a time source function that returns a time equal to
|
||||
// 't + ((n - 1) * dt)' wheren n is the number of serialized invocations of
|
||||
// the returned function. The returned function will generate a time series of
|
||||
// the form [t, t+dt, t+2dt, t+3dt, ...]
|
||||
func AutoAdvance(t time.Time, dt time.Duration) func() time.Time {
|
||||
var mu sync.Mutex
|
||||
|
||||
return func() time.Time {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
ret := t
|
||||
t = t.Add(dt)
|
||||
|
||||
return ret
|
||||
}
|
||||
}
|
||||
|
||||
// TimeAdvance allows controlling the passage of time. Intended to be used in
|
||||
// tests.
|
||||
type TimeAdvance struct {
|
||||
base time.Time
|
||||
delta int64
|
||||
}
|
||||
|
||||
// NewTimeAdvance creates a TimeAdvance with the given start time
|
||||
func NewTimeAdvance(start time.Time) *TimeAdvance {
|
||||
return &TimeAdvance{base: start}
|
||||
}
|
||||
|
||||
// NowFunc returns a time provider function for t
|
||||
func (t *TimeAdvance) NowFunc() func() time.Time {
|
||||
return func() time.Time {
|
||||
dt := atomic.LoadInt64(&t.delta)
|
||||
|
||||
return t.base.Add(time.Duration(dt))
|
||||
}
|
||||
}
|
||||
|
||||
// Advance advances t by dt, such that the next call to t.NowFunc()() returns
|
||||
// current t + dt
|
||||
func (t *TimeAdvance) Advance(dt time.Duration) time.Time {
|
||||
advance := atomic.AddInt64(&t.delta, int64(dt))
|
||||
|
||||
return t.base.Add(time.Duration(advance))
|
||||
}
|
||||
91
internal/faketime/faketime_test.go
Normal file
91
internal/faketime/faketime_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package faketime
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFrozen(t *testing.T) {
|
||||
times := []time.Time{
|
||||
time.Date(2015, 1, 3, 0, 0, 0, 0, time.UTC),
|
||||
time.Now(),
|
||||
}
|
||||
|
||||
for _, tm := range times {
|
||||
timeNow := Frozen(tm)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
if want, got := tm, timeNow(); got != want {
|
||||
t.Fatalf("Invalid frozen time, got: %v, want: %v", got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutoAdvance(t *testing.T) {
|
||||
const (
|
||||
goRoutinesCount = 3
|
||||
iterations = 20
|
||||
)
|
||||
|
||||
startTime := time.Date(2018, 1, 6, 0, 0, 0, 0, time.UTC)
|
||||
timeNow := AutoAdvance(startTime, 10*time.Second)
|
||||
tchan := make(chan time.Time, 2*goRoutinesCount)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(goRoutinesCount)
|
||||
|
||||
for i := 0; i < goRoutinesCount; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
times := make([]time.Time, iterations)
|
||||
|
||||
for j := 0; j < iterations; j++ {
|
||||
times[j] = timeNow()
|
||||
}
|
||||
|
||||
for _, ts := range times {
|
||||
tchan <- ts
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(tchan)
|
||||
}()
|
||||
|
||||
tMap := make(map[time.Time]struct{}, iterations*goRoutinesCount)
|
||||
|
||||
for ts := range tchan {
|
||||
if _, ok := tMap[ts]; ok {
|
||||
t.Error("Found repeated time value: ", ts)
|
||||
}
|
||||
|
||||
tMap[ts] = struct{}{}
|
||||
}
|
||||
|
||||
if got, want := len(tMap), goRoutinesCount*iterations; got != want {
|
||||
t.Fatalf("number of generated times does not match, got: %v, want: %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimeAdvance(t *testing.T) {
|
||||
startTime := time.Date(2019, 1, 6, 0, 0, 0, 0, time.UTC)
|
||||
ta := NewTimeAdvance(startTime)
|
||||
now := ta.NowFunc()
|
||||
|
||||
if got, want := now(), startTime; got != want {
|
||||
t.Errorf("expected time does not match, got: %v, want: %v", got, want)
|
||||
}
|
||||
|
||||
dt := 5 * time.Minute
|
||||
ta.Advance(dt)
|
||||
|
||||
if got, want := now(), startTime.Add(dt); got != want {
|
||||
t.Errorf("expected time does not match, got: %v, want: %v", got, want)
|
||||
}
|
||||
}
|
||||
97
internal/repotesting/repotesting_test.go
Normal file
97
internal/repotesting/repotesting_test.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package repotesting
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/internal/faketime"
|
||||
"github.com/kopia/kopia/internal/mockfs"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
)
|
||||
|
||||
func TestTimeFuncWiring(t *testing.T) {
|
||||
var env Environment
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
defer env.Setup(t).Close(ctx, t)
|
||||
|
||||
env.Repository.Close(ctx)
|
||||
|
||||
ft := faketime.NewTimeAdvance(time.Date(2018, time.February, 6, 0, 0, 0, 0, time.UTC))
|
||||
|
||||
// Re open with injected time
|
||||
r, err := repo.Open(ctx, env.Repository.ConfigFile, masterPassword, &repo.Options{TimeNowFunc: ft.NowFunc()})
|
||||
if err != nil {
|
||||
t.Fatal("Failed to open repo:", err)
|
||||
}
|
||||
|
||||
env.Repository = r
|
||||
|
||||
// verify wiring for the repo layer
|
||||
if got, want := r.Time(), ft.NowFunc()(); !got.Equal(want) {
|
||||
t.Errorf("times don't match, got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if want, got := ft.Advance(10*time.Minute), r.Time(); !got.Equal(want) {
|
||||
t.Errorf("times don't match, got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
// verify wiring for the content layer
|
||||
nt := ft.Advance(20 * time.Second)
|
||||
|
||||
cid, err := r.Content.WriteContent(ctx, []byte("foo"), "")
|
||||
if err != nil {
|
||||
t.Fatal("failed to write content:", err)
|
||||
}
|
||||
|
||||
info, err := r.Content.ContentInfo(ctx, cid)
|
||||
if err != nil {
|
||||
t.Fatal("failed to get content info for", cid, err)
|
||||
}
|
||||
|
||||
if got, want := info.Timestamp(), nt; !got.Equal(want) {
|
||||
t.Errorf("content time does not match, got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
// verify wiring for the manifest layer
|
||||
nt = ft.Advance(3 * time.Minute)
|
||||
labels := map[string]string{"l1": "v1", "l2": "v2", "type": "my-manifest"}
|
||||
mid, err := r.Manifests.Put(ctx, labels, "manifest content")
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("failed to put manifest:", err)
|
||||
}
|
||||
|
||||
meta, err := r.Manifests.GetMetadata(ctx, mid)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("failed to get manifest metadata:", err)
|
||||
}
|
||||
|
||||
if got, want := meta.ModTime, nt; !got.Equal(want) {
|
||||
t.Errorf("manifest time does not match, got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
const defaultPermissions = 0777
|
||||
|
||||
// verify wiring for the snapshot layer
|
||||
sourceDir := mockfs.NewDirectory()
|
||||
sourceDir.AddFile("f1", []byte{1, 2, 3}, defaultPermissions)
|
||||
|
||||
nt = ft.Advance(1 * time.Hour)
|
||||
u := snapshotfs.NewUploader(r)
|
||||
policyTree := policy.BuildTree(nil, policy.DefaultPolicy)
|
||||
s1, err := u.Upload(ctx, sourceDir, policyTree, snapshot.SourceInfo{})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("failed to create snapshot:", err)
|
||||
}
|
||||
|
||||
if got, want := nt, s1.StartTime; !got.Equal(want) {
|
||||
t.Fatalf("snapshot time does not match, got: %v, want: %v", got, want)
|
||||
}
|
||||
}
|
||||
@@ -238,7 +238,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) {
|
||||
|
||||
// verify Azure connection is functional by listing blobs in a bucket, which will fail if the container
|
||||
// does not exist. We list with a prefix that will not exist, to avoid iterating through any objects.
|
||||
nonExistentPrefix := fmt.Sprintf("kopia-azure-storage-initializing-%v", time.Now().UnixNano())
|
||||
nonExistentPrefix := fmt.Sprintf("kopia-azure-storage-initializing-%v", time.Now().UnixNano()) // allow:no-inject-time
|
||||
err = az.ListBlobs(ctx, blob.ID(nonExistentPrefix), func(md blob.Metadata) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -193,7 +193,7 @@ func (fs *fsStorage) TouchBlob(ctx context.Context, blobID blob.ID, threshold ti
|
||||
return err
|
||||
}
|
||||
|
||||
n := time.Now()
|
||||
n := time.Now() // allow:no-inject-time
|
||||
|
||||
age := n.Sub(st.ModTime())
|
||||
if age < threshold {
|
||||
|
||||
@@ -277,7 +277,7 @@ func New(ctx context.Context, opt *Options) (blob.Storage, error) {
|
||||
|
||||
// verify GCS connection is functional by listing blobs in a bucket, which will fail if the bucket
|
||||
// does not exist. We list with a prefix that will not exist, to avoid iterating through any objects.
|
||||
nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", time.Now().UnixNano())
|
||||
nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", time.Now().UnixNano()) // allow:no-inject-time
|
||||
err = gcs.ListBlobs(ctx, blob.ID(nonExistentPrefix), func(md blob.Metadata) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -89,7 +89,7 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs []
|
||||
|
||||
formatLog(ctx).Debugf("compacting %v contents", len(indexBlobs))
|
||||
|
||||
t0 := time.Now()
|
||||
t0 := time.Now() // allow:no-inject-time
|
||||
bld := make(packIndexBuilder)
|
||||
|
||||
for _, indexBlob := range indexBlobs {
|
||||
@@ -108,7 +108,7 @@ func (bm *Manager) compactAndDeleteIndexBlobs(ctx context.Context, indexBlobs []
|
||||
return errors.Wrap(err, "unable to write compacted indexes")
|
||||
}
|
||||
|
||||
formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0))
|
||||
formatLog(ctx).Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlob, time.Since(t0)) // allow:no-inject-time
|
||||
|
||||
for _, indexBlob := range indexBlobs {
|
||||
if indexBlob.BlobID == compactedIndexBlob {
|
||||
@@ -137,7 +137,7 @@ func (bm *Manager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuil
|
||||
}
|
||||
|
||||
_ = index.Iterate("", func(i Info) error {
|
||||
if i.Deleted && opt.SkipDeletedOlderThan > 0 && time.Since(i.Timestamp()) > opt.SkipDeletedOlderThan {
|
||||
if i.Deleted && opt.SkipDeletedOlderThan > 0 && bm.timeNow().Sub(i.Timestamp()) > opt.SkipDeletedOlderThan {
|
||||
log(ctx).Debugf("skipping content %v deleted at %v", i.ID, i.Timestamp())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used
|
||||
}
|
||||
|
||||
for _, rem := range remaining {
|
||||
if time.Since(rem.ModTime()) > unusedCommittedContentIndexCleanupTime {
|
||||
if time.Since(rem.ModTime()) > unusedCommittedContentIndexCleanupTime { // allow:no-inject-time
|
||||
log(ctx).Debugf("removing unused %v %v", rem.Name(), rem.ModTime())
|
||||
|
||||
if err := os.Remove(filepath.Join(c.dirname, rem.Name())); err != nil {
|
||||
|
||||
@@ -175,7 +175,7 @@ func (c *contentCache) sweepDirectory(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
t0 := time.Now()
|
||||
t0 := time.Now() // allow:no-inject-time
|
||||
|
||||
var h contentMetadataHeap
|
||||
|
||||
@@ -199,7 +199,7 @@ func (c *contentCache) sweepDirectory(ctx context.Context) (err error) {
|
||||
return errors.Wrap(err, "error listing cache")
|
||||
}
|
||||
|
||||
log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes)
|
||||
log(ctx).Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes) // allow:no-inject-time
|
||||
c.lastTotalSizeBytes = totalRetainedSize
|
||||
|
||||
return nil
|
||||
|
||||
@@ -568,17 +568,28 @@ func (bm *Manager) Refresh(ctx context.Context) (bool, error) {
|
||||
|
||||
log(ctx).Debugf("Refresh started")
|
||||
|
||||
t0 := time.Now()
|
||||
t0 := time.Now() // allow:no-inject-time
|
||||
|
||||
_, updated, err := bm.loadPackIndexesUnlocked(ctx)
|
||||
log(ctx).Debugf("Refresh completed in %v and updated=%v", time.Since(t0), updated)
|
||||
log(ctx).Debugf("Refresh completed in %v and updated=%v", time.Since(t0), updated) // allow:no-inject-time
|
||||
|
||||
return updated, err
|
||||
}
|
||||
|
||||
// ManagerOptions are the optional parameters for manager creation
|
||||
type ManagerOptions struct {
|
||||
RepositoryFormatBytes []byte
|
||||
TimeNow func() time.Time // Time provider
|
||||
}
|
||||
|
||||
// NewManager creates new content manager with given packing options and a formatter.
|
||||
func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, repositoryFormatBytes []byte) (*Manager, error) {
|
||||
return newManagerWithOptions(ctx, st, f, caching, time.Now, repositoryFormatBytes)
|
||||
func NewManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, options ManagerOptions) (*Manager, error) {
|
||||
nowFn := options.TimeNow
|
||||
if nowFn == nil {
|
||||
nowFn = time.Now // allow:no-inject-time
|
||||
}
|
||||
|
||||
return newManagerWithOptions(ctx, st, f, caching, nowFn, options.RepositoryFormatBytes)
|
||||
}
|
||||
|
||||
func newManagerWithOptions(ctx context.Context, st blob.Storage, f *FormattingOptions, caching CachingOptions, timeNow func() time.Time, repositoryFormatBytes []byte) (*Manager, error) {
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/faketime"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
@@ -228,7 +229,7 @@ func TestContentManagerWriteMultiple(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
timeFunc := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
|
||||
timeFunc := faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
bm := newTestContentManager(t, data, keyTime, timeFunc)
|
||||
|
||||
var contentIDs []ID
|
||||
@@ -286,7 +287,7 @@ func TestContentManagerFailedToWritePack(t *testing.T) {
|
||||
MaxPackSize: maxPackSize,
|
||||
HMACSecret: []byte("foo"),
|
||||
MasterKey: []byte("0123456789abcdef0123456789abcdef"),
|
||||
}, CachingOptions{}, fakeTimeNowFrozen(fakeTime), nil)
|
||||
}, CachingOptions{}, faketime.Frozen(fakeTime), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("can't create bm: %v", err)
|
||||
}
|
||||
@@ -320,7 +321,7 @@ func TestContentManagerConcurrency(t *testing.T) {
|
||||
dumpContentManagerData(ctx, t, data)
|
||||
bm1 := newTestContentManager(t, data, keyTime, nil)
|
||||
bm2 := newTestContentManager(t, data, keyTime, nil)
|
||||
bm3 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1*time.Second))
|
||||
bm3 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(1), 1*time.Second))
|
||||
|
||||
// all bm* can see pre-existing content
|
||||
verifyContent(ctx, t, bm1, preexistingContent, seededRandomData(10, 100))
|
||||
@@ -708,7 +709,7 @@ func TestRewriteNonDeleted(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
|
||||
fakeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
bm := newTestContentManager(t, data, keyTime, fakeNow)
|
||||
|
||||
applyStep := func(action int) {
|
||||
@@ -774,7 +775,7 @@ func TestRewriteDeleted(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
|
||||
fakeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
bm := newTestContentManager(t, data, keyTime, fakeNow)
|
||||
|
||||
applyStep := func(action int) {
|
||||
@@ -826,22 +827,22 @@ func TestDeleteAndRecreate(t *testing.T) {
|
||||
// write a content
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime))
|
||||
bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime))
|
||||
content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100))
|
||||
bm.Flush(ctx)
|
||||
|
||||
// delete but at given timestamp but don't commit yet.
|
||||
bm0 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1*time.Second))
|
||||
bm0 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(tc.deletionTime, 1*time.Second))
|
||||
assertNoError(t, bm0.DeleteContent(ctx, content1))
|
||||
|
||||
// delete it at t0+10
|
||||
bm1 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second))
|
||||
bm1 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second))
|
||||
verifyContent(ctx, t, bm1, content1, seededRandomData(10, 100))
|
||||
assertNoError(t, bm1.DeleteContent(ctx, content1))
|
||||
bm1.Flush(ctx)
|
||||
|
||||
// recreate at t0+20
|
||||
bm2 := newTestContentManager(t, data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second))
|
||||
bm2 := newTestContentManager(t, data, keyTime, faketime.AutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second))
|
||||
content2 := writeContentAndVerify(ctx, t, bm2, seededRandomData(10, 100))
|
||||
bm2.Flush(ctx)
|
||||
|
||||
@@ -1121,7 +1122,7 @@ func TestContentWriteAliasing(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime))
|
||||
bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime))
|
||||
|
||||
contentData := []byte{100, 0, 0}
|
||||
id1 := writeContentAndVerify(ctx, t, bm, contentData)
|
||||
@@ -1147,7 +1148,7 @@ func TestContentReadAliasing(t *testing.T) {
|
||||
ctx := testlogging.Context(t)
|
||||
data := blobtesting.DataMap{}
|
||||
keyTime := map[blob.ID]time.Time{}
|
||||
bm := newTestContentManager(t, data, keyTime, fakeTimeNowFrozen(fakeTime))
|
||||
bm := newTestContentManager(t, data, keyTime, faketime.Frozen(fakeTime))
|
||||
|
||||
contentData := []byte{100, 0, 0}
|
||||
id1 := writeContentAndVerify(ctx, t, bm, contentData)
|
||||
@@ -1257,7 +1258,7 @@ func newTestContentManager(t *testing.T, data blobtesting.DataMap, keyTime map[b
|
||||
|
||||
func newTestContentManagerWithStorage(t *testing.T, st blob.Storage, timeFunc func() time.Time) *Manager {
|
||||
if timeFunc == nil {
|
||||
timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
|
||||
timeFunc = faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
}
|
||||
|
||||
bm, err := newManagerWithOptions(testlogging.Context(t), st, &FormattingOptions{
|
||||
@@ -1288,24 +1289,6 @@ func getIndexCount(d blobtesting.DataMap) int {
|
||||
return cnt
|
||||
}
|
||||
|
||||
func fakeTimeNowFrozen(t time.Time) func() time.Time {
|
||||
return fakeTimeNowWithAutoAdvance(t, 0)
|
||||
}
|
||||
|
||||
func fakeTimeNowWithAutoAdvance(t time.Time, dt time.Duration) func() time.Time {
|
||||
var mu sync.Mutex
|
||||
|
||||
return func() time.Time {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
ret := t
|
||||
t = t.Add(dt)
|
||||
|
||||
return ret
|
||||
}
|
||||
}
|
||||
|
||||
func verifyContentNotFound(ctx context.Context, t *testing.T, bm *Manager, contentID ID) {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error)
|
||||
ci, err := c.readContentsFromCache(ctx)
|
||||
if err == nil {
|
||||
expirationTime := ci.Timestamp.Add(c.listCacheDuration)
|
||||
if time.Now().Before(expirationTime) {
|
||||
if time.Now().Before(expirationTime) { // allow:no-inject-time
|
||||
log(ctx).Debugf("retrieved list of index blobs from cache")
|
||||
return ci.Contents, nil
|
||||
}
|
||||
@@ -41,7 +41,7 @@ func (c *listCache) listIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error)
|
||||
if err == nil {
|
||||
c.saveListToCache(ctx, &cachedList{
|
||||
Contents: contents,
|
||||
Timestamp: time.Now(),
|
||||
Timestamp: time.Now(), // allow:no-inject-time
|
||||
})
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) {
|
||||
log(ctx).Debugf("saving index blobs to cache: %v", len(ci.Contents))
|
||||
|
||||
if data, err := json.Marshal(ci); err == nil {
|
||||
mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano())
|
||||
mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano()) // allow:no-inject-time
|
||||
if err := ioutil.WriteFile(c.cacheFile+mySuffix, hmac.Append(data, c.hmacSecret), 0600); err != nil {
|
||||
log(ctx).Warningf("unable to write list cache: %v", err)
|
||||
}
|
||||
|
||||
@@ -55,6 +55,8 @@ type Manager struct {
|
||||
|
||||
committedEntries map[ID]*manifestEntry
|
||||
committedContentIDs map[content.ID]bool
|
||||
|
||||
timeNow func() time.Time // Time provider
|
||||
}
|
||||
|
||||
// Put serializes the provided payload to JSON and persists it. Returns unique identifier that represents the manifest.
|
||||
@@ -82,7 +84,7 @@ func (m *Manager) Put(ctx context.Context, labels map[string]string, payload int
|
||||
|
||||
e := &manifestEntry{
|
||||
ID: ID(hex.EncodeToString(random)),
|
||||
ModTime: time.Now().UTC(),
|
||||
ModTime: m.timeNow().UTC(),
|
||||
Labels: copyLabels(labels),
|
||||
Content: b,
|
||||
}
|
||||
@@ -276,7 +278,7 @@ func (m *Manager) Delete(ctx context.Context, id ID) error {
|
||||
|
||||
m.pendingEntries[id] = &manifestEntry{
|
||||
ID: id,
|
||||
ModTime: time.Now().UTC(),
|
||||
ModTime: m.timeNow().UTC(),
|
||||
Deleted: true,
|
||||
}
|
||||
|
||||
@@ -483,13 +485,24 @@ func copyLabels(m map[string]string) map[string]string {
|
||||
return r
|
||||
}
|
||||
|
||||
// ManagerOptions are optional parameters for Manager creation
|
||||
type ManagerOptions struct {
|
||||
TimeNow func() time.Time // Time provider
|
||||
}
|
||||
|
||||
// NewManager returns new manifest manager for the provided content manager.
|
||||
func NewManager(ctx context.Context, b contentManager) (*Manager, error) {
|
||||
func NewManager(ctx context.Context, b contentManager, options ManagerOptions) (*Manager, error) {
|
||||
timeNow := options.TimeNow
|
||||
if timeNow == nil {
|
||||
timeNow = time.Now // allow:no-inject-time
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
b: b,
|
||||
pendingEntries: map[ID]*manifestEntry{},
|
||||
committedEntries: map[ID]*manifestEntry{},
|
||||
committedContentIDs: map[content.ID]bool{},
|
||||
timeNow: timeNow,
|
||||
}
|
||||
|
||||
return m, nil
|
||||
|
||||
@@ -146,12 +146,12 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
// write some data to storage
|
||||
bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, nil)
|
||||
bm, err := content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
mgr, err := NewManager(ctx, bm)
|
||||
mgr, err := NewManager(ctx, bm, ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -172,12 +172,12 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
// make a new content manager based on corrupted data.
|
||||
bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, nil)
|
||||
bm, err = content.NewManager(ctx, st, f, content.CachingOptions{}, content.ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
mgr, err = NewManager(ctx, bm)
|
||||
mgr, err = NewManager(ctx, bm, ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -296,12 +296,12 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da
|
||||
Encryption: encryption.NoneAlgorithm,
|
||||
MaxPackSize: 100000,
|
||||
Version: 1,
|
||||
}, content.CachingOptions{}, nil)
|
||||
}, content.CachingOptions{}, content.ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("can't create content manager: %v", err)
|
||||
}
|
||||
|
||||
mm, err := NewManager(ctx, bm)
|
||||
mm, err := NewManager(ctx, bm, ManagerOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("can't create manifest manager: %v", err)
|
||||
}
|
||||
|
||||
12
repo/open.go
12
repo/open.go
@@ -5,6 +5,7 @@
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -22,6 +23,7 @@
|
||||
type Options struct {
|
||||
TraceStorage func(f string, args ...interface{}) // Logs all storage access using provided Printf-style function
|
||||
ObjectManagerOptions object.ManagerOptions
|
||||
TimeNowFunc func() time.Time // Time provider
|
||||
}
|
||||
|
||||
// ErrInvalidPassword is returned when repository password is invalid.
|
||||
@@ -117,7 +119,12 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
fo.MaxPackSize = 20 << 20 // nolint:gomnd
|
||||
}
|
||||
|
||||
cm, err := content.NewManager(ctx, st, fo, caching, fb)
|
||||
cmOpts := content.ManagerOptions{
|
||||
RepositoryFormatBytes: fb,
|
||||
TimeNow: defaultTime(options.TimeNowFunc),
|
||||
}
|
||||
|
||||
cm, err := content.NewManager(ctx, st, fo, caching, cmOpts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to open content manager")
|
||||
}
|
||||
@@ -127,7 +134,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
return nil, errors.Wrap(err, "unable to open object manager")
|
||||
}
|
||||
|
||||
manifests, err := manifest.NewManager(ctx, cm)
|
||||
manifests, err := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to open manifests")
|
||||
}
|
||||
@@ -141,6 +148,7 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
|
||||
formatBlob: f,
|
||||
masterKey: masterKey,
|
||||
timeNow: cmOpts.TimeNow,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ type Repository struct {
|
||||
Hostname string // connected (localhost) hostname
|
||||
Username string // connected username
|
||||
|
||||
timeNow func() time.Time
|
||||
formatBlob *formatBlob
|
||||
masterKey []byte
|
||||
}
|
||||
@@ -91,3 +92,16 @@ func (r *Repository) RefreshPeriodically(ctx context.Context, interval time.Dura
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Time returns the current local time for the repo
|
||||
func (r *Repository) Time() time.Time {
|
||||
return defaultTime(r.timeNow)()
|
||||
}
|
||||
|
||||
func defaultTime(f func() time.Time) func() time.Time {
|
||||
if f != nil {
|
||||
return f
|
||||
}
|
||||
|
||||
return time.Now // allow:no-inject-time
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func Run(ctx context.Context, rep *repo.Repository, minContentAge time.Duration,
|
||||
}
|
||||
|
||||
if _, ok := used.Load(ci.ID); !ok {
|
||||
if time.Since(ci.Timestamp()) < minContentAge {
|
||||
if rep.Time().Sub(ci.Timestamp()) < minContentAge {
|
||||
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ID, ci.Length, ci.Timestamp())
|
||||
atomic.AddInt32(&tooRecentCount, 1)
|
||||
atomic.AddInt64(&totalTooRecentBytes, int64(ci.Length))
|
||||
|
||||
@@ -20,7 +20,7 @@ type RetentionPolicy struct {
|
||||
// ComputeRetentionReasons computes the reasons why each snapshot is retained, based on
|
||||
// the settings in retention policy and stores them in RetentionReason field.
|
||||
func (r *RetentionPolicy) ComputeRetentionReasons(manifests []*snapshot.Manifest) {
|
||||
now := time.Now()
|
||||
now := time.Now() // allow:no-inject-time
|
||||
maxTime := now.Add(365 * 24 * time.Hour)
|
||||
|
||||
cutoffTime := func(setting *int, add func(time.Time, int) time.Time) time.Time {
|
||||
|
||||
@@ -28,7 +28,7 @@ func (s *repositoryAllSources) Name() string {
|
||||
}
|
||||
|
||||
func (s *repositoryAllSources) ModTime() time.Time {
|
||||
return time.Now()
|
||||
return s.rep.Time()
|
||||
}
|
||||
|
||||
func (s *repositoryAllSources) Mode() os.FileMode {
|
||||
|
||||
@@ -28,7 +28,7 @@ func (s *sourceDirectories) Mode() os.FileMode {
|
||||
}
|
||||
|
||||
func (s *sourceDirectories) ModTime() time.Time {
|
||||
return time.Now()
|
||||
return s.rep.Time()
|
||||
}
|
||||
|
||||
func (s *sourceDirectories) Sys() interface{} {
|
||||
|
||||
@@ -40,7 +40,7 @@ func (s *sourceSnapshots) Sys() interface{} {
|
||||
}
|
||||
|
||||
func (s *sourceSnapshots) ModTime() time.Time {
|
||||
return time.Now()
|
||||
return s.rep.Time()
|
||||
}
|
||||
|
||||
func (s *sourceSnapshots) Owner() fs.OwnerInfo {
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -749,7 +748,7 @@ func (u *Uploader) Upload(
|
||||
|
||||
var err error
|
||||
|
||||
s.StartTime = time.Now()
|
||||
s.StartTime = u.repo.Time()
|
||||
|
||||
switch entry := source.(type) {
|
||||
case fs.Directory:
|
||||
@@ -778,7 +777,7 @@ func (u *Uploader) Upload(
|
||||
}
|
||||
|
||||
s.IncompleteReason = u.cancelReason()
|
||||
s.EndTime = time.Now()
|
||||
s.EndTime = u.repo.Time()
|
||||
s.Stats = u.stats
|
||||
|
||||
return s, nil
|
||||
|
||||
@@ -44,7 +44,7 @@ func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration
|
||||
Encryption: "AES-256-CTR",
|
||||
MaxPackSize: 20000000,
|
||||
MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
|
||||
}, content.CachingOptions{}, nil)
|
||||
}, content.CachingOptions{}, content.ManagerOptions{})
|
||||
}
|
||||
|
||||
seed0 := time.Now().Nanosecond()
|
||||
|
||||
Reference in New Issue
Block a user