fix(repository): fix deletion immediately after creation (#1937)

In a very rare case, when content is created then deleted or forgotten
and immediately recreated in the same second, the newly recreated
content may be ignored due to how indices are merged.

This change ensures that on each {write,delete,forget} we always move
the time forward relative to latest index entry, even if the local clock
did not advance at all.
This commit is contained in:
Jarek Kowalski
2022-05-09 22:40:57 -07:00
committed by GitHub
parent 81c0580a01
commit 78e8f5035d
2 changed files with 57 additions and 6 deletions

View File

@@ -199,11 +199,22 @@ func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) e
return errors.Wrap(err, "unable to create pack")
}
pp.currentPackItems[ci.GetContentID()] = &deletedInfo{ci, bm.timeNow().Unix()}
pp.currentPackItems[ci.GetContentID()] = &deletedInfo{ci, bm.contentWriteTime(ci.GetTimestampSeconds())}
return nil
}
// contentWriteTime returns content write time for new content
// by computing max(timeNow().Unix(), previousUnixTimeSeconds + 1).
func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 {
t := bm.timeNow().Unix()
if t > previousUnixTimeSeconds {
return t
}
return previousUnixTimeSeconds + 1
}
type deletedInfo struct {
Info
deletedTime int64
@@ -256,7 +267,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context
return nil
}
func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, isRewrite bool) error {
func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64) error {
// see if the current index is old enough to cause automatic flush.
if err := bm.maybeFlushBasedOnTimeUnlocked(ctx); err != nil {
return errors.Wrap(err, "unable to flush old pending writes")
@@ -275,7 +286,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
bm.lock()
if !isRewrite {
if previousWriteTime < 0 {
if _, _, err = bm.getContentInfoReadLocked(ctx, contentID); err == nil {
// we lost the race while compressing the content, the content now exists.
bm.unlock()
@@ -317,7 +328,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
ContentID: contentID,
PackBlobID: pp.packBlobID,
PackOffset: uint32(pp.currentPackData.Length()),
TimestampSeconds: bm.timeNow().Unix(),
TimestampSeconds: bm.contentWriteTime(previousWriteTime),
FormatVersion: byte(bm.writeFormatVersion),
OriginalLength: uint32(data.Length()),
}
@@ -678,7 +689,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe
isDeleted = false
}
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), true)
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds())
}
func packPrefixForContentID(contentID ID) blob.ID {
@@ -748,6 +759,8 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre
contentID := prefix + ID(hex.EncodeToString(bm.hashData(hashOutput[:0], data)))
previousWriteTime := int64(-1)
bm.mu.RLock()
_, bi, err := bm.getContentInfoReadLocked(ctx, contentID)
bm.mu.RUnlock()
@@ -758,12 +771,14 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre
return contentID, nil
}
previousWriteTime = bi.GetTimestampSeconds()
bm.log.Debugf("write-content %v previously-deleted", contentID)
} else {
bm.log.Debugf("write-content %v new", contentID)
}
return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, false)
return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime)
}
// GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound.

View File

@@ -615,6 +615,42 @@ func (s *contentManagerSuite) TestDeleteContent(t *testing.T) {
verifyContentNotFound(ctx, t, bm, content2)
}
func (s *contentManagerSuite) TestDeletionAfterCreationWithFrozenTime(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
// first - write new content
bm := s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime))
content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(40, 16))
require.NoError(t, bm.Flush(ctx))
// second - delete content previously deleted
bm = s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime))
ci, err := bm.ContentInfo(ctx, content1)
require.NoError(t, err)
require.Equal(t, fakeTime, ci.Timestamp().UTC())
require.NoError(t, bm.DeleteContent(ctx, content1))
require.NoError(t, bm.Flush(ctx))
ci, err = bm.ContentInfo(ctx, content1)
require.NoError(t, err)
// time did not move, but we ensured that the time is greater than in the previous index.
require.Equal(t, fakeTime.Add(1*time.Second), ci.Timestamp().UTC())
// third - recreate content previously deleted
bm = s.newTestContentManagerWithCustomTime(t, st, faketime.Frozen(fakeTime))
require.Equal(t, content1, writeContentAndVerify(ctx, t, bm, seededRandomData(40, 16)))
require.NoError(t, bm.Flush(ctx))
ci, err = bm.ContentInfo(ctx, content1)
require.NoError(t, err)
// rewrite moves the time by another second
require.Equal(t, fakeTime.Add(2*time.Second), ci.Timestamp().UTC())
}
// nolint:gocyclo
func (s *contentManagerSuite) TestUndeleteContentSimple(t *testing.T) {
ctx := testlogging.Context(t)