From 78e8f5035d61bbf4590e2d001c5945b440be715e Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 9 May 2022 22:40:57 -0700 Subject: [PATCH] fix(repository): fix deletion immediately after creation (#1937) In a very rare case, when content is created then deleted or forgotten and immediately recreated in the same second, the newly recreated content may be ignored due to how indices are merged. This change ensures that on each {write,delete,forget} we always move the time forward relative to latest index entry, even if the local clock did not advance at all. --- repo/content/content_manager.go | 27 ++++++++++++++++----- repo/content/content_manager_test.go | 36 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index bc4f1afdf..1dbcb56ae 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -199,11 +199,22 @@ func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) e return errors.Wrap(err, "unable to create pack") } - pp.currentPackItems[ci.GetContentID()] = &deletedInfo{ci, bm.timeNow().Unix()} + pp.currentPackItems[ci.GetContentID()] = &deletedInfo{ci, bm.contentWriteTime(ci.GetTimestampSeconds())} return nil } +// contentWriteTime returns content write time for new content +// by computing max(timeNow().Unix(), previousUnixTimeSeconds + 1). +func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 { + t := bm.timeNow().Unix() + if t > previousUnixTimeSeconds { + return t + } + + return previousUnixTimeSeconds + 1 +} + type deletedInfo struct { Info deletedTime int64 @@ -256,7 +267,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context return nil } -func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, isRewrite bool) error { +func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64) error { // see if the current index is old enough to cause automatic flush. if err := bm.maybeFlushBasedOnTimeUnlocked(ctx); err != nil { return errors.Wrap(err, "unable to flush old pending writes") @@ -275,7 +286,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat bm.lock() - if !isRewrite { + if previousWriteTime < 0 { if _, _, err = bm.getContentInfoReadLocked(ctx, contentID); err == nil { // we lost the race while compressing the content, the content now exists. bm.unlock() @@ -317,7 +328,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat ContentID: contentID, PackBlobID: pp.packBlobID, PackOffset: uint32(pp.currentPackData.Length()), - TimestampSeconds: bm.timeNow().Unix(), + TimestampSeconds: bm.contentWriteTime(previousWriteTime), FormatVersion: byte(bm.writeFormatVersion), OriginalLength: uint32(data.Length()), } @@ -678,7 +689,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe isDeleted = false } - return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), true) + return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds()) } func packPrefixForContentID(contentID ID) blob.ID { @@ -748,6 +759,8 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], data))) + previousWriteTime := int64(-1) + bm.mu.RLock() _, bi, err := bm.getContentInfoReadLocked(ctx, contentID) bm.mu.RUnlock() @@ -758,12 +771,14 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre return contentID, nil } + previousWriteTime = bi.GetTimestampSeconds() + bm.log.Debugf("write-content %v previously-deleted", contentID) } else { bm.log.Debugf("write-content %v new", contentID) } - return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, false) + return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime) } // GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound. diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 0c094122f..9e00f4813 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -615,6 +615,42 @@ func (s *contentManagerSuite) TestDeleteContent(t *testing.T) { verifyContentNotFound(ctx, t, bm, content2) } +func (s *contentManagerSuite) TestDeletionAfterCreationWithFrozenTime(t *testing.T) { + ctx := testlogging.Context(t) + data := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(data, nil, nil) + + // first - write new content + bm := s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime)) + content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(40, 16)) + require.NoError(t, bm.Flush(ctx)) + + // second - delete content previously deleted + bm = s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime)) + ci, err := bm.ContentInfo(ctx, content1) + require.NoError(t, err) + require.Equal(t, fakeTime, ci.Timestamp().UTC()) + + require.NoError(t, bm.DeleteContent(ctx, content1)) + require.NoError(t, bm.Flush(ctx)) + ci, err = bm.ContentInfo(ctx, content1) + require.NoError(t, err) + + // time did not move, but we ensured that the time is greater than in the previous index. + require.Equal(t, fakeTime.Add(1*time.Second), ci.Timestamp().UTC()) + + // third - recreate content previously deleted + bm = s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime)) + require.Equal(t, content1, writeContentAndVerify(ctx, t, bm, seededRandomData(40, 16))) + require.NoError(t, bm.Flush(ctx)) + + ci, err = bm.ContentInfo(ctx, content1) + require.NoError(t, err) + + // rewrite moves the time by another second + require.Equal(t, fakeTime.Add(2*time.Second), ci.Timestamp().UTC()) +} + // nolint:gocyclo func (s *contentManagerSuite) TestUndeleteContentSimple(t *testing.T) { ctx := testlogging.Context(t)