fix(general): use a fixed time for protecting newly created content (#1994)

* test MinContentAgeSubjectToGC

* lint: move check for whether content is deleted to the caller to reduce gocycle complexity

* nit: add new lines before return

* log GC stats

* fix(maintenance): use a fixed time for protecting newly created content

Previously, the reference time used to determine whether a content had been
recently created would change throughout a snapshot GC execution. For
long-running GC tasks, this non-deterministically shrank the safety window
specified in `MinContentAgeSubjectToGC`.

Now, the snapshot GC starting time is used as a fixed reference for the
safety check.

* remove test entry point to avoid double execution of the test
This commit is contained in:
Julio Lopez
2022-05-31 20:29:38 -07:00
committed by GitHub
parent 511f4aa65d
commit 65fc11ed58
3 changed files with 148 additions and 9 deletions

View File

@@ -4,6 +4,7 @@
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
@@ -66,17 +67,32 @@ func findInUseContentIDs(ctx context.Context, rep repo.Repository, used *sync.Ma
}
// Run performs garbage collection on all the snapshots in the repository.
//
// maintenanceStartTime is forwarded to runInternal, which uses it as the fixed
// reference time for the safety.MinContentAgeSubjectToGC check. The run is
// reported through maintenance.ReportRun as a TaskSnapshotGarbageCollection
// task. Run returns the Stats populated by runInternal together with the
// result of wrapping the reported error with "error running snapshot gc".
func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters) (Stats, error) {
func Run(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, maintenanceStartTime time.Time) (Stats, error) {
var st Stats
err := maintenance.ReportRun(ctx, rep, maintenance.TaskSnapshotGarbageCollection, nil, func() error {
return runInternal(ctx, rep, gcDelete, safety, &st)
if err := runInternal(ctx, rep, gcDelete, safety, maintenanceStartTime, &st); err != nil {
return err
}
// Log a summary of the counters gathered in st.
l := log(ctx)
l.Infof("GC found %v unused contents (%v bytes)", st.UnusedCount, units.BytesStringBase2(st.UnusedBytes))
l.Infof("GC found %v unused contents that are too recent to delete (%v bytes)", st.TooRecentCount, units.BytesStringBase2(st.TooRecentBytes))
l.Infof("GC found %v in-use contents (%v bytes)", st.InUseCount, units.BytesStringBase2(st.InUseBytes))
l.Infof("GC found %v in-use system-contents (%v bytes)", st.SystemCount, units.BytesStringBase2(st.SystemBytes))
// Unused contents were found but gcDelete was not set: surface this as an error.
if st.UnusedCount > 0 && !gcDelete {
return errors.Errorf("Not deleting because 'gcDelete' was not set")
}
return nil
})
return st, errors.Wrap(err, "error running snapshot gc")
}
func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, st *Stats) error {
func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete bool, safety maintenance.SafetyParameters, maintenanceStartTime time.Time, st *Stats) error {
var (
used sync.Map
@@ -106,12 +122,14 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete
}
inUse.Add(int64(ci.GetPackedLength()))
return nil
}
if rep.Time().Sub(ci.Timestamp()) < safety.MinContentAgeSubjectToGC {
if maintenanceStartTime.Sub(ci.Timestamp()) < safety.MinContentAgeSubjectToGC {
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.GetContentID(), ci.GetPackedLength(), ci.Timestamp())
tooRecent.Add(int64(ci.GetPackedLength()))
return nil
}
@@ -146,9 +164,5 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete
return errors.Wrap(err, "error iterating contents")
}
if st.UnusedCount > 0 && !gcDelete {
return errors.Errorf("Not deleting because '--delete' flag was not set")
}
return errors.Wrap(rep.Flush(ctx), "flush error")
}

View File

@@ -18,7 +18,7 @@ func Run(ctx context.Context, dr repo.DirectRepositoryWriter, mode maintenance.M
func(ctx context.Context, runParams maintenance.RunParameters) error {
// run snapshot GC before full maintenance
if runParams.Mode == maintenance.ModeFull {
if _, err := snapshotgc.Run(ctx, dr, true, safety); err != nil {
if _, err := snapshotgc.Run(ctx, dr, true, safety, runParams.MaintenanceStartTime); err != nil {
return errors.Wrap(err, "snapshot GC failure")
}
}

View File

@@ -2,6 +2,7 @@
import (
"context"
"encoding/binary"
"testing"
"time"
@@ -168,6 +169,67 @@ func (s *formatSpecificTestSuite) TestMaintenanceReuseDirManifest(t *testing.T)
t.Log("root info:", pretty.Sprint(info))
}
// TestSnapshotGCMinContentAgeSafety checks that a full snapshot maintenance
// run does not mark unreferenced contents as deleted when the oldest of them
// is still just inside the MinContentAgeSubjectToGC safety window at the time
// maintenance starts.
func (s *formatSpecificTestSuite) TestSnapshotGCMinContentAgeSafety(t *testing.T) {
	ctx := testlogging.Context(t)
	th := newTestHarness(t, s.formatVersion)

	require.NotNil(t, th)
	require.NotNil(t, th.sourceDir)

	th.sourceDir.AddFile("f2", []byte{1, 2, 3, 4}, defaultPermissions)

	// Snapshot th.sourceDir, which includes the 'f2' file added above.
	si := snapshot.SourceInfo{
		Host:     "host",
		UserName: "user",
		Path:     "/foo",
	}

	mustSnapshot(t, th.RepositoryWriter, th.sourceDir, si)
	mustFlush(t, th.RepositoryWriter)

	const contentCount = 10000

	require.NoError(t, th.Repository.Refresh(ctx))

	// create 10000 unreferenced contents
	oids := create4ByteObjects(t, th.Repository, 0, contentCount)

	require.NoError(t, th.Repository.Refresh(ctx))

	cids := objectIDsToContentIDs(t, oids)

	// check contents are not marked as deleted
	checkContentDeletion(t, th.Repository, cids, false)
	mustFlush(t, th.RepositoryWriter)

	safety := maintenance.SafetyFull

	// Advance time so that the first content created above is close to
	// falling out of the safety.MinContentAgeSubjectToGC window. Leave a small
	// buffer of 10 seconds for the "passage of time" between now and when
	// snapshot GC actually starts.
	// Note: the duration of this buffer is a "magic number" that depends on
	// the number of times time is advanced between "now" and when snapshot
	// GC starts.
	const gcStartBuffer = 10 * time.Second

	ci, err := th.Repository.ContentInfo(ctx, cids[0])

	require.NoError(t, err)
	require.NotEmpty(t, ci)

	timeAdvance := safety.MinContentAgeSubjectToGC - th.fakeTime.NowFunc()().Sub(ci.Timestamp()) - gcStartBuffer

	require.Positive(t, timeAdvance)
	th.fakeTime.Advance(timeAdvance)

	// Run maintenance with the same safety parameters that were used to
	// compute timeAdvance, so the two cannot drift apart.
	err = snapshotmaintenance.Run(ctx, th.RepositoryWriter, maintenance.ModeFull, true, safety)
	require.NoError(t, err)
	mustFlush(t, th.RepositoryWriter)

	// check contents have not been marked as deleted.
	checkContentDeletion(t, th.Repository, cids, false)
}
func newTestHarness(t *testing.T, formatVersion content.FormatVersion) *testHarness {
t.Helper()
@@ -305,3 +367,66 @@ func mustGetContentID(t *testing.T, oid object.ID) content.ID {
return c
}
// create4ByteObjects writes count objects through a new writer session on r
// and returns their object IDs in creation order. Each object's payload is
// the 4-byte big-endian encoding of a distinct integer in [base, base+count).
// The session is flushed before returning; any failure aborts the test.
func create4ByteObjects(t *testing.T, r repo.Repository, base, count int) []object.ID {
	t.Helper()

	created := make([]object.ID, 0, count)

	ctx := testlogging.Context(t)
	ctx, session, err := r.NewWriter(ctx, repo.WriteSessionOptions{})
	require.NoError(t, err)

	defer session.Close(ctx)

	var payload [4]byte

	for n := 0; n < count; n++ {
		// Encode the next value so every object has unique content.
		binary.BigEndian.PutUint32(payload[:], uint32(base+n))

		ow := session.NewObjectWriter(ctx, object.WriterOptions{Description: "create-test-contents"})

		_, err := ow.Write(payload[:])
		require.NoError(t, err)

		id, err := ow.Result()
		require.NoError(t, err)
		require.NoError(t, ow.Close())

		created = append(created, id)
	}

	require.NoError(t, session.Flush(ctx))

	return created
}
// objectIDsToContentIDs maps every object ID in oids to the content ID
// reported by its ContentID method, preserving order. The test fails
// immediately if ContentID reports !ok for any entry.
func objectIDsToContentIDs(t *testing.T, oids []object.ID) []content.ID {
	t.Helper()

	result := make([]content.ID, len(oids))

	for idx := range oids {
		id, _, found := oids[idx].ContentID()
		require.True(t, found)

		result[idx] = id
	}

	return result
}
// checkContentDeletion looks up each content ID in cids on r and requires
// that its deleted flag equals the expected value. Failure messages carry the
// index and ID of the offending content.
func checkContentDeletion(t *testing.T, r repo.Repository, cids []content.ID, deleted bool) {
	t.Helper()

	ctx := testlogging.Context(t)

	for idx := range cids {
		id := cids[idx]

		info, lookupErr := r.ContentInfo(ctx, id)
		require.NoErrorf(t, lookupErr, "i:%d cid:%s", idx, id)

		isDeleted := info.GetDeleted()
		require.Equalf(t, deleted, isDeleted, "i:%d cid:%s", idx, id)
	}
}