refactor(repository): refactored internal index read API to reduce memory allocations (#3754)

* refactor(repository): refactored internal index read API to reduce memory allocations

* fixed stress test flake, improved debuggability

* fixed spurious checklocks failures

* post-merge fixes

* pr feedback
Author: Jarek Kowalski
Date: 2024-04-12 22:59:11 -07:00
Committed by: GitHub
Commit: b55d5b474c (parent 43d3982b21)
41 changed files with 693 additions and 851 deletions
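At a glance, the refactor replaces the per-entry `InfoReader` interface with the plain `Info` struct throughout the index read path, so lookups and iteration no longer allocate an interface value (plus its backing object) for every entry touched. A before/after sketch of the two central `Index` signatures, reconstructed from the hunks below (`Close` and `ApproximateCount` omitted):

```go
// Before: each lookup allocated an InfoReader whose getters decoded
// fields lazily from the index's underlying buffer.
type Index interface {
	GetInfo(contentID ID) (InfoReader, error)
	Iterate(r IDRange, cb func(InfoReader) error) error
}

// After: the caller supplies the destination struct; found/not-found is
// reported via the bool, and Iterate passes a fully decoded Info by value.
type Index interface {
	GetInfo(contentID ID, result *Info) (bool, error)
	Iterate(r IDRange, cb func(Info) error) error
}
```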


@@ -54,11 +54,11 @@ func (c *commandContentList) run(ctx context.Context, rep repo.DirectRepository)
IncludeDeleted: c.includeDeleted || c.deletedOnly,
},
func(b content.Info) error {
if c.deletedOnly && !b.GetDeleted() {
if c.deletedOnly && !b.Deleted {
return nil
}
totalSize.Add(int64(b.GetPackedLength()))
totalSize.Add(int64(b.PackedLength))
switch {
case c.jo.jsonOutput:
@@ -68,7 +68,7 @@ func(b content.Info) error {
case c.long:
c.outputLong(b)
default:
c.out.printStdout("%v\n", b.GetContentID())
c.out.printStdout("%v\n", b.ContentID)
}
return nil
@@ -89,12 +89,12 @@ func(b content.Info) error {
func (c *commandContentList) outputLong(b content.Info) {
c.out.printStdout("%v %v %v %v %v+%v%v %v\n",
b.GetContentID(),
b.GetOriginalLength(),
b.ContentID,
b.OriginalLength,
formatTimestamp(b.Timestamp()),
b.GetPackBlobID(),
b.GetPackOffset(),
maybeHumanReadableBytes(c.human, int64(b.GetPackedLength())),
b.PackBlobID,
b.PackOffset,
maybeHumanReadableBytes(c.human, int64(b.PackedLength)),
c.deletedInfoString(b),
c.compressionInfoStringString(b),
)
@@ -102,16 +102,16 @@ func (c *commandContentList) outputLong(b content.Info) {
func (c *commandContentList) outputCompressed(b content.Info) {
c.out.printStdout("%v length %v packed %v %v %v\n",
b.GetContentID(),
maybeHumanReadableBytes(c.human, int64(b.GetOriginalLength())),
maybeHumanReadableBytes(c.human, int64(b.GetPackedLength())),
b.ContentID,
maybeHumanReadableBytes(c.human, int64(b.OriginalLength)),
maybeHumanReadableBytes(c.human, int64(b.PackedLength)),
c.compressionInfoStringString(b),
c.deletedInfoString(b),
)
}
func (*commandContentList) deletedInfoString(b content.Info) string {
if b.GetDeleted() {
if b.Deleted {
return " (deleted)"
}
@@ -119,7 +119,7 @@ func (*commandContentList) deletedInfoString(b content.Info) string {
}
func (*commandContentList) compressionInfoStringString(b content.Info) string {
h := b.GetCompressionHeaderID()
h := b.CompressionHeaderID
if h == content.NoCompression {
return "-"
}
@@ -129,8 +129,8 @@ func (*commandContentList) compressionInfoStringString(b content.Info) string {
s = fmt.Sprintf("compression-%x", h)
}
if b.GetOriginalLength() > 0 {
s += " " + formatCompressionPercentage(int64(b.GetOriginalLength()), int64(b.GetPackedLength()))
if b.OriginalLength > 0 {
s += " " + formatCompressionPercentage(int64(b.OriginalLength), int64(b.PackedLength))
}
return s


@@ -130,24 +130,24 @@ func (c *commandContentStats) calculateStats(ctx context.Context, rep repo.Direc
Range: c.contentRange.contentIDRange(),
},
func(b content.Info) error {
grandTotal.packedSize += int64(b.GetPackedLength())
grandTotal.originalSize += int64(b.GetOriginalLength())
grandTotal.packedSize += int64(b.PackedLength)
grandTotal.originalSize += int64(b.OriginalLength)
grandTotal.count++
bct := byCompressionTotal[b.GetCompressionHeaderID()]
bct := byCompressionTotal[b.CompressionHeaderID]
if bct == nil {
bct = &contentStatsTotals{}
byCompressionTotal[b.GetCompressionHeaderID()] = bct
byCompressionTotal[b.CompressionHeaderID] = bct
}
bct.packedSize += int64(b.GetPackedLength())
bct.originalSize += int64(b.GetOriginalLength())
bct.packedSize += int64(b.PackedLength)
bct.originalSize += int64(b.OriginalLength)
bct.count++
for s := range countMap {
if b.GetPackedLength() < s {
if b.PackedLength < s {
countMap[s]++
totalSizeOfContentsUnder[s] += int64(b.GetPackedLength())
totalSizeOfContentsUnder[s] += int64(b.PackedLength)
}
}


@@ -149,19 +149,19 @@ func (c *commandContentVerify) getTotalContentCount(ctx context.Context, rep rep
}
func (c *commandContentVerify) contentVerify(ctx context.Context, r content.Reader, ci content.Info, blobMap map[blob.ID]blob.Metadata, downloadPercent float64) error {
bi, ok := blobMap[ci.GetPackBlobID()]
bi, ok := blobMap[ci.PackBlobID]
if !ok {
return errors.Errorf("content %v depends on missing blob %v", ci.GetContentID(), ci.GetPackBlobID())
return errors.Errorf("content %v depends on missing blob %v", ci.ContentID, ci.PackBlobID)
}
if int64(ci.GetPackOffset()+ci.GetPackedLength()) > bi.Length {
return errors.Errorf("content %v out of bounds of its pack blob %v", ci.GetContentID(), ci.GetPackBlobID())
if int64(ci.PackOffset+ci.PackedLength) > bi.Length {
return errors.Errorf("content %v out of bounds of its pack blob %v", ci.ContentID, ci.PackBlobID)
}
//nolint:gosec
if 100*rand.Float64() < downloadPercent {
if _, err := r.GetContent(ctx, ci.GetContentID()); err != nil {
return errors.Wrapf(err, "content %v is invalid", ci.GetContentID())
if _, err := r.GetContent(ctx, ci.ContentID); err != nil {
return errors.Wrapf(err, "content %v is invalid", ci.ContentID)
}
return nil


@@ -113,7 +113,7 @@ func (c *commandIndexInspect) dumpIndexBlobEntries(entries chan indexBlobPlusCon
bm := ent.indexBlob
state := "created"
if ci.GetDeleted() {
if ci.Deleted {
state = "deleted"
}
@@ -123,7 +123,7 @@ func (c *commandIndexInspect) dumpIndexBlobEntries(entries chan indexBlobPlusCon
c.out.printStdout("%v %v %v %v %v %v %v %v\n",
formatTimestampPrecise(bm.Timestamp), bm.BlobID,
ci.GetContentID(), state, formatTimestampPrecise(ci.Timestamp()), ci.GetPackBlobID(), ci.GetPackOffset(), ci.GetPackedLength())
ci.ContentID, state, formatTimestampPrecise(ci.Timestamp()), ci.PackBlobID, ci.PackOffset, ci.PackedLength)
}
}
@@ -132,7 +132,7 @@ func (c *commandIndexInspect) shouldInclude(ci content.Info) bool {
return true
}
contentID := ci.GetContentID().String()
contentID := ci.ContentID.String()
for _, cid := range c.contentIDs {
if cid == contentID {


@@ -93,9 +93,9 @@ func (c *commandRepositoryUpgrade) setup(svc advancedAppServices, parent command
// assign stores the info struct in a map that can be used to compare indexes.
func assign(iif content.Info, i int, m map[content.ID][2]content.Info) {
v := m[iif.GetContentID()]
v := m[iif.ContentID]
v[i] = iif
m[iif.GetContentID()] = v
m[iif.ContentID] = v
}
// loadIndexBlobs loads index blobs into the indexEntries map, which allows comparison between two indexes (index at which == 0 and index at which == 1).
@@ -172,11 +172,11 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
// one of iep0 or iep1 is nil .. find out which one and add an appropriate message.
if iep0 != zeroInfo {
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep0.GetPackBlobID()))
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep0.PackBlobID))
continue
}
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep1.GetPackBlobID()))
msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep1.PackBlobID))
}
// no msgs means the check passed without finding anything wrong
@@ -201,24 +201,24 @@ func CheckIndexInfo(i0, i1 content.Info) []string {
var q []string
switch {
case i0.GetFormatVersion() != i1.GetFormatVersion():
q = append(q, fmt.Sprintf("mismatched FormatVersions: %v %v", i0.GetFormatVersion(), i1.GetFormatVersion()))
case i0.GetOriginalLength() != i1.GetOriginalLength():
q = append(q, fmt.Sprintf("mismatched OriginalLengths: %v %v", i0.GetOriginalLength(), i1.GetOriginalLength()))
case i0.GetPackBlobID() != i1.GetPackBlobID():
q = append(q, fmt.Sprintf("mismatched PackBlobIDs: %v %v", i0.GetPackBlobID(), i1.GetPackBlobID()))
case i0.GetPackedLength() != i1.GetPackedLength():
q = append(q, fmt.Sprintf("mismatched PackedLengths: %v %v", i0.GetPackedLength(), i1.GetPackedLength()))
case i0.GetPackOffset() != i1.GetPackOffset():
q = append(q, fmt.Sprintf("mismatched PackOffsets: %v %v", i0.GetPackOffset(), i1.GetPackOffset()))
case i0.GetEncryptionKeyID() != i1.GetEncryptionKeyID():
q = append(q, fmt.Sprintf("mismatched EncryptionKeyIDs: %v %v", i0.GetEncryptionKeyID(), i1.GetEncryptionKeyID()))
case i0.GetCompressionHeaderID() != i1.GetCompressionHeaderID():
q = append(q, fmt.Sprintf("mismatched GetCompressionHeaderID: %v %v", i0.GetCompressionHeaderID(), i1.GetCompressionHeaderID()))
case i0.GetDeleted() != i1.GetDeleted():
q = append(q, fmt.Sprintf("mismatched Deleted flags: %v %v", i0.GetDeleted(), i1.GetDeleted()))
case i0.GetTimestampSeconds() != i1.GetTimestampSeconds():
q = append(q, fmt.Sprintf("mismatched TimestampSeconds: %v %v", i0.GetTimestampSeconds(), i1.GetTimestampSeconds()))
case i0.FormatVersion != i1.FormatVersion:
q = append(q, fmt.Sprintf("mismatched FormatVersions: %v %v", i0.FormatVersion, i1.FormatVersion))
case i0.OriginalLength != i1.OriginalLength:
q = append(q, fmt.Sprintf("mismatched OriginalLengths: %v %v", i0.OriginalLength, i1.OriginalLength))
case i0.PackBlobID != i1.PackBlobID:
q = append(q, fmt.Sprintf("mismatched PackBlobIDs: %v %v", i0.PackBlobID, i1.PackBlobID))
case i0.PackedLength != i1.PackedLength:
q = append(q, fmt.Sprintf("mismatched PackedLengths: %v %v", i0.PackedLength, i1.PackedLength))
case i0.PackOffset != i1.PackOffset:
q = append(q, fmt.Sprintf("mismatched PackOffsets: %v %v", i0.PackOffset, i1.PackOffset))
case i0.EncryptionKeyID != i1.EncryptionKeyID:
q = append(q, fmt.Sprintf("mismatched EncryptionKeyIDs: %v %v", i0.EncryptionKeyID, i1.EncryptionKeyID))
case i0.CompressionHeaderID != i1.CompressionHeaderID:
q = append(q, fmt.Sprintf("mismatched GetCompressionHeaderID: %v %v", i0.CompressionHeaderID, i1.CompressionHeaderID))
case i0.Deleted != i1.Deleted:
q = append(q, fmt.Sprintf("mismatched Deleted flags: %v %v", i0.Deleted, i1.Deleted))
case i0.TimestampSeconds != i1.TimestampSeconds:
q = append(q, fmt.Sprintf("mismatched TimestampSeconds: %v %v", i0.TimestampSeconds, i1.TimestampSeconds))
}
if len(q) == 0 {
@@ -226,7 +226,7 @@ func CheckIndexInfo(i0, i1 content.Info) []string {
}
for i := range q {
q[i] = fmt.Sprintf("index blobs do not match: %q, %q: %s", i0.GetPackBlobID(), i1.GetPackBlobID(), q[i])
q[i] = fmt.Sprintf("index blobs do not match: %q, %q: %s", i0.PackBlobID, i1.PackBlobID, q[i])
}
return q
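Note that the switch above reports at most one mismatch per entry pair (the first differing field wins), and each message is then prefixed with both pack blob IDs. A hypothetical call-site sketch, with `indexEntries` being the `map[content.ID][2]content.Info` filled by `assign` and `warn` standing in for whatever logger is available:

```go
for contentID, pair := range indexEntries {
	if msgs := CheckIndexInfo(pair[0], pair[1]); len(msgs) > 0 {
		warn("index mismatch for %v: %v", contentID, msgs)
	}
}
```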


@@ -15,35 +15,35 @@
func InfoDiff(i1, i2 index.Info, ignore ...string) []string {
var diffs []string
if l, r := i1.GetContentID(), i2.GetContentID(); l != r {
if l, r := i1.ContentID, i2.ContentID; l != r {
diffs = append(diffs, fmt.Sprintf("GetContentID %v != %v", l, r))
}
if l, r := i1.GetPackBlobID(), i2.GetPackBlobID(); l != r {
if l, r := i1.PackBlobID, i2.PackBlobID; l != r {
diffs = append(diffs, fmt.Sprintf("GetPackBlobID %v != %v", l, r))
}
if l, r := i1.GetDeleted(), i2.GetDeleted(); l != r {
if l, r := i1.Deleted, i2.Deleted; l != r {
diffs = append(diffs, fmt.Sprintf("GetDeleted %v != %v", l, r))
}
if l, r := i1.GetFormatVersion(), i2.GetFormatVersion(); l != r {
if l, r := i1.FormatVersion, i2.FormatVersion; l != r {
diffs = append(diffs, fmt.Sprintf("GetFormatVersion %v != %v", l, r))
}
if l, r := i1.GetOriginalLength(), i2.GetOriginalLength(); l != r {
if l, r := i1.OriginalLength, i2.OriginalLength; l != r {
diffs = append(diffs, fmt.Sprintf("GetOriginalLength %v != %v", l, r))
}
if l, r := i1.GetPackOffset(), i2.GetPackOffset(); l != r {
if l, r := i1.PackOffset, i2.PackOffset; l != r {
diffs = append(diffs, fmt.Sprintf("GetPackOffset %v != %v", l, r))
}
if l, r := i1.GetPackedLength(), i2.GetPackedLength(); l != r {
if l, r := i1.PackedLength, i2.PackedLength; l != r {
diffs = append(diffs, fmt.Sprintf("GetPackedLength %v != %v", l, r))
}
if l, r := i1.GetTimestampSeconds(), i2.GetTimestampSeconds(); l != r {
if l, r := i1.TimestampSeconds, i2.TimestampSeconds; l != r {
diffs = append(diffs, fmt.Sprintf("GetTimestampSeconds %v != %v", l, r))
}
@@ -51,18 +51,17 @@ func InfoDiff(i1, i2 index.Info, ignore ...string) []string {
diffs = append(diffs, fmt.Sprintf("Timestamp %v != %v", l, r))
}
if l, r := i1.GetCompressionHeaderID(), i2.GetCompressionHeaderID(); l != r {
if l, r := i1.CompressionHeaderID, i2.CompressionHeaderID; l != r {
diffs = append(diffs, fmt.Sprintf("GetCompressionHeaderID %v != %v", l, r))
}
if l, r := i1.GetEncryptionKeyID(), i2.GetEncryptionKeyID(); l != r {
if l, r := i1.EncryptionKeyID, i2.EncryptionKeyID; l != r {
diffs = append(diffs, fmt.Sprintf("GetEncryptionKeyID %v != %v", l, r))
}
// dear future reader, if this fails because the number of methods has changed,
// you need to add additional verification above.
//nolint:gomnd
if cnt := reflect.TypeOf((*index.InfoReader)(nil)).Elem().NumMethod(); cnt != 11 {
if cnt := reflect.TypeOf(index.Info{}).NumMethod(); cnt != 1 {
diffs = append(diffs, fmt.Sprintf("unexpected number of methods on content.Info: %v, must update the test", cnt))
}


@@ -215,14 +215,14 @@ func handleGetContentInfoRequest(ctx context.Context, dw repo.DirectRepositoryWr
Response: &grpcapi.SessionResponse_GetContentInfo{
GetContentInfo: &grpcapi.GetContentInfoResponse{
Info: &grpcapi.ContentInfo{
Id: ci.GetContentID().String(),
PackedLength: ci.GetPackedLength(),
TimestampSeconds: ci.GetTimestampSeconds(),
PackBlobId: string(ci.GetPackBlobID()),
PackOffset: ci.GetPackOffset(),
Deleted: ci.GetDeleted(),
FormatVersion: uint32(ci.GetFormatVersion()),
OriginalLength: ci.GetOriginalLength(),
Id: ci.ContentID.String(),
PackedLength: ci.PackedLength,
TimestampSeconds: ci.TimestampSeconds,
PackBlobId: string(ci.PackBlobID),
PackOffset: ci.PackOffset,
Deleted: ci.Deleted,
FormatVersion: uint32(ci.FormatVersion),
OriginalLength: ci.OriginalLength,
},
},
},


@@ -63,13 +63,15 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
c.mu.RLock()
defer c.mu.RUnlock()
info, err := c.merged.GetInfo(contentID)
if info != nil {
var info Info
ok, err := c.merged.GetInfo(contentID, &info)
if ok {
if shouldIgnore(info, c.deletionWatermark) {
return index.Info{}, ErrContentNotFound
}
return index.ToInfoStruct(info), nil
return info, nil
}
if err == nil {
@@ -79,8 +81,8 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
return index.Info{}, errors.Wrap(err, "error getting content info from index")
}
func shouldIgnore(id index.InfoReader, deletionWatermark time.Time) bool {
if !id.GetDeleted() {
func shouldIgnore(id index.Info, deletionWatermark time.Time) bool {
if !id.Deleted {
return false
}
@@ -131,12 +133,12 @@ func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) e
c.mu.RUnlock()
//nolint:wrapcheck
return m.Iterate(r, func(i index.InfoReader) error {
return m.Iterate(r, func(i index.Info) error {
if shouldIgnore(i, deletionWatermark) {
return nil
}
return cb(index.ToInfoStruct(i))
return cb(i)
})
}
@@ -257,8 +259,8 @@ func (c *committedContentIndex) combineSmallIndexes(ctx context.Context, m index
b := index.Builder{}
for _, ndx := range toMerge {
if err := ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
b.Add(index.ToInfoStruct(i))
if err := ndx.Iterate(index.AllIDs, func(i index.Info) error {
b.Add(i)
return nil
}); err != nil {
return nil, errors.Wrap(err, "unable to iterate index entries")
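Because `Info` is now a plain value with no references into the index's transient buffers (the old `ToInfoStruct` conversion existed precisely to make entries durable), a callback may retain an entry simply by letting Go's pass-by-value semantics copy it, as `combineSmallIndexes` above does. The same retention pattern in isolation, a sketch assuming an open `ndx`:

```go
var kept []index.Info
if err := ndx.Iterate(index.AllIDs, func(i index.Info) error {
	kept = append(kept, i) // Info is a value type; append stores a copy
	return nil
}); err != nil {
	return errors.Wrap(err, "unable to iterate index entries")
}
```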


@@ -75,19 +75,23 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
ndx2, err := cache.openIndex(ctx, "ndx2")
require.NoError(t, err)
i, err := ndx1.GetInfo(mustParseID(t, "c1"))
var i Info
ok, err := ndx1.GetInfo(mustParseID(t, "c1"), &i)
require.True(t, ok)
require.NoError(t, err)
if got, want := i.GetPackBlobID(), blob.ID("p1234"); got != want {
if got, want := i.PackBlobID, blob.ID("p1234"); got != want {
t.Fatalf("unexpected pack blob ID: %v, want %v", got, want)
}
require.NoError(t, ndx1.Close())
i, err = ndx2.GetInfo(mustParseID(t, "c3"))
ok, err = ndx2.GetInfo(mustParseID(t, "c3"), &i)
require.True(t, ok)
require.NoError(t, err)
if got, want := i.GetPackBlobID(), blob.ID("p2345"); got != want {
if got, want := i.PackBlobID, blob.ID("p2345"); got != want {
t.Fatalf("unexpected pack blob ID: %v, want %v", got, want)
}


@@ -287,25 +287,25 @@ func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info,
var hashBuf [hashing.MaxHashSize]byte
iv := getPackedContentIV(hashBuf[:0], bi.GetContentID())
iv := getPackedContentIV(hashBuf[:0], bi.ContentID)
// reserved for future use
if k := bi.GetEncryptionKeyID(); k != 0 {
if k := bi.EncryptionKeyID; k != 0 {
return errors.Errorf("unsupported encryption key ID: %v", k)
}
h := bi.GetCompressionHeaderID()
h := bi.CompressionHeaderID
if h == 0 {
return errors.Wrapf(
sm.decryptAndVerify(payload, iv, output),
"invalid checksum at %v offset %v length %v/%v", bi.GetPackBlobID(), bi.GetPackOffset(), bi.GetPackedLength(), payload.Length())
"invalid checksum at %v offset %v length %v/%v", bi.PackBlobID, bi.PackOffset, bi.PackedLength, payload.Length())
}
var tmp gather.WriteBuffer
defer tmp.Close()
if err := sm.decryptAndVerify(payload, iv, &tmp); err != nil {
return errors.Wrapf(err, "invalid checksum at %v offset %v length %v/%v", bi.GetPackBlobID(), bi.GetPackOffset(), bi.GetPackedLength(), payload.Length())
return errors.Wrapf(err, "invalid checksum at %v offset %v length %v/%v", bi.PackBlobID, bi.PackOffset, bi.PackedLength, payload.Length())
}
c := compression.ByHeaderID[h]


@@ -30,12 +30,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b
var recovered []Info
err = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
// 'i' is ephemeral and will depend on temporary buffers which
// won't be available when this function returns, we need to
// convert it to durable struct.
is := index.ToInfoStruct(i)
err = ndx.Iterate(index.AllIDs, func(is index.Info) error {
recovered = append(recovered, is)
return nil


@@ -129,7 +129,7 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error {
// remove from all pending packs
for _, pp := range bm.pendingPacks {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.GetDeleted() {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted {
delete(pp.currentPackItems, contentID)
return nil
}
@@ -137,7 +137,7 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error {
// remove from all packs that are being written, since they will be committed to index soon
for _, pp := range bm.writingPacks {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.GetDeleted() {
if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted {
return bm.deletePreexistingContent(ctx, bi)
}
}
@@ -177,16 +177,16 @@ func (bm *WriteManager) maybeRefreshIndexes(ctx context.Context) error {
// Intentionally passing ci by value.
// +checklocks:bm.mu
func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) error {
if ci.GetDeleted() {
if ci.Deleted {
return nil
}
pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, packPrefixForContentID(ci.GetContentID()))
pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, packPrefixForContentID(ci.ContentID))
if err != nil {
return errors.Wrap(err, "unable to create pack")
}
pp.currentPackItems[ci.GetContentID()] = deletedInfo(ci, bm.contentWriteTime(ci.GetTimestampSeconds()))
pp.currentPackItems[ci.ContentID] = deletedInfo(ci, bm.contentWriteTime(ci.TimestampSeconds))
return nil
}
@@ -377,13 +377,13 @@ func (bm *WriteManager) verifyInvariantsLocked(mp format.MutableParameters) {
func (bm *WriteManager) verifyCurrentPackItemsLocked() {
for _, pp := range bm.pendingPacks {
for k, cpi := range pp.currentPackItems {
bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k)
bm.assertInvariant(cpi.ContentID == k, "content ID entry has invalid key: %v %v", cpi.ContentID, k)
if !cpi.GetDeleted() {
bm.assertInvariant(cpi.GetPackBlobID() == pp.packBlobID, "non-deleted pending pack item %q must be from the pending pack %q, was %q", cpi.GetContentID(), pp.packBlobID, cpi.GetPackBlobID())
if !cpi.Deleted {
bm.assertInvariant(cpi.PackBlobID == pp.packBlobID, "non-deleted pending pack item %q must be from the pending pack %q, was %q", cpi.ContentID, pp.packBlobID, cpi.PackBlobID)
}
bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID())
bm.assertInvariant(cpi.TimestampSeconds != 0, "content has no timestamp: %v", cpi.ContentID)
}
}
}
@@ -391,16 +391,16 @@ func (bm *WriteManager) verifyCurrentPackItemsLocked() {
// +checklocks:bm.mu
func (bm *WriteManager) verifyPackIndexBuilderLocked(mp format.MutableParameters) {
for k, cpi := range bm.packIndexBuilder {
bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k)
bm.assertInvariant(cpi.ContentID == k, "content ID entry has invalid key: %v %v", cpi.ContentID, k)
if cpi.GetDeleted() {
bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID())
if cpi.Deleted {
bm.assertInvariant(cpi.PackBlobID == "", "content can't be both deleted and have a pack content: %v", cpi.ContentID)
} else {
bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi)
bm.assertInvariant(cpi.GetFormatVersion() == byte(mp.Version), "content that's not deleted must have a valid format version: %+v", cpi)
bm.assertInvariant(cpi.PackBlobID != "", "content that's not deleted must have a pack content: %+v", cpi)
bm.assertInvariant(cpi.FormatVersion == byte(mp.Version), "content that's not deleted must have a valid format version: %+v", cpi)
}
bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID())
bm.assertInvariant(cpi.TimestampSeconds != 0, "content has no timestamp: %v", cpi.ContentID)
}
}
@@ -708,7 +708,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe
return errors.Wrap(err, "unable to get content data and info")
}
isDeleted := bi.GetDeleted()
isDeleted := bi.Deleted
if onlyRewriteDeleted {
if !isDeleted {
@@ -718,7 +718,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe
isDeleted = false
}
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds(), mp)
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.CompressionHeaderID, bi.TimestampSeconds, mp)
}
func packPrefixForContentID(contentID ID) blob.ID {
@@ -820,14 +820,14 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre
// content already tracked
if err == nil {
if !bi.GetDeleted() {
if !bi.Deleted {
bm.deduplicatedContents.Add(1)
bm.deduplicatedBytes.Add(int64(data.Length()))
return contentID, nil
}
previousWriteTime = bi.GetTimestampSeconds()
previousWriteTime = bi.TimestampSeconds
logbuf.AppendString(" previously-deleted:")
logbuf.AppendInt64(previousWriteTime)


@@ -77,8 +77,8 @@ func ParseIndexBlob(blobID blob.ID, encrypted gather.Bytes, crypter blobcrypto.C
var results []Info
err = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
results = append(results, index.ToInfoStruct(i))
err = ndx.Iterate(index.AllIDs, func(i index.Info) error {
results = append(results, i)
return nil
})


@@ -120,16 +120,16 @@ func (bm *WriteManager) IterateContents(ctx context.Context, opts IterateOptions
invokeCallback := func(i Info) error {
if !opts.IncludeDeleted {
if ci, ok := uncommitted[i.GetContentID()]; ok {
if ci.GetDeleted() {
if ci, ok := uncommitted[i.ContentID]; ok {
if ci.Deleted {
return nil
}
} else if i.GetDeleted() {
} else if i.Deleted {
return nil
}
}
if !opts.Range.Contains(i.GetContentID()) {
if !opts.Range.Contains(i.ContentID) {
return nil
}
@@ -198,18 +198,18 @@ func (bm *WriteManager) IteratePacks(ctx context.Context, options IteratePackOpt
IncludeDeleted: options.IncludePacksWithOnlyDeletedContent,
},
func(ci Info) error {
if !options.matchesBlob(ci.GetPackBlobID()) {
if !options.matchesBlob(ci.PackBlobID) {
return nil
}
pi := packUsage[ci.GetPackBlobID()]
pi := packUsage[ci.PackBlobID]
if pi == nil {
pi = &PackInfo{}
packUsage[ci.GetPackBlobID()] = pi
packUsage[ci.PackBlobID] = pi
}
pi.PackID = ci.GetPackBlobID()
pi.PackID = ci.PackBlobID
pi.ContentCount++
pi.TotalSize += int64(ci.GetPackedLength())
pi.TotalSize += int64(ci.PackedLength)
if options.IncludeContentInfos {
pi.ContentInfos = append(pi.ContentInfos, ci)
}


@@ -101,21 +101,21 @@ func writeRandomBytesToBuffer(b *gather.WriteBuffer, count int) error {
func contentCacheKeyForInfo(bi Info) string {
// append format-specific information
// see https://github.com/kopia/kopia/issues/1843 for an explanation
return fmt.Sprintf("%v.%x.%x.%x", bi.GetContentID(), bi.GetCompressionHeaderID(), bi.GetFormatVersion(), bi.GetEncryptionKeyID())
return fmt.Sprintf("%v.%x.%x.%x", bi.ContentID, bi.CompressionHeaderID, bi.FormatVersion, bi.EncryptionKeyID)
}
func (sm *SharedManager) getContentDataReadLocked(ctx context.Context, pp *pendingPackInfo, bi Info, output *gather.WriteBuffer) error {
var payload gather.WriteBuffer
defer payload.Close()
if pp != nil && pp.packBlobID == bi.GetPackBlobID() {
if pp != nil && pp.packBlobID == bi.PackBlobID {
// we need to use a lock here in case somebody else writes to the pack at the same time.
if err := pp.currentPackData.AppendSectionTo(&payload, int(bi.GetPackOffset()), int(bi.GetPackedLength())); err != nil {
if err := pp.currentPackData.AppendSectionTo(&payload, int(bi.PackOffset), int(bi.PackedLength)); err != nil {
// should never happen
return errors.Wrap(err, "error appending pending content data to buffer")
}
} else if err := sm.getCacheForContentID(bi.GetContentID()).GetContent(ctx, contentCacheKeyForInfo(bi), bi.GetPackBlobID(), int64(bi.GetPackOffset()), int64(bi.GetPackedLength()), &payload); err != nil {
return errors.Wrap(err, "error getting cached content")
} else if err := sm.getCacheForContentID(bi.ContentID).GetContent(ctx, contentCacheKeyForInfo(bi), bi.PackBlobID, int64(bi.PackOffset), int64(bi.PackedLength), &payload); err != nil {
return errors.Wrapf(err, "error getting cached content from blob %q", bi.PackBlobID)
}
return sm.decryptContentAndVerify(payload.Bytes(), bi, output)
@@ -129,7 +129,7 @@ func (sm *SharedManager) preparePackDataContent(mp format.MutableParameters, pp
defer sb.Release()
for _, info := range pp.currentPackItems {
if info.GetPackBlobID() == pp.packBlobID {
if info.PackBlobID == pp.packBlobID {
haveContent = true
}
@@ -137,13 +137,13 @@ func (sm *SharedManager) preparePackDataContent(mp format.MutableParameters, pp
sb.AppendString("add-to-pack ")
sb.AppendString(string(pp.packBlobID))
sb.AppendString(" ")
info.GetContentID().AppendToLogBuffer(sb)
info.ContentID.AppendToLogBuffer(sb)
sb.AppendString(" p:")
sb.AppendString(string(info.GetPackBlobID()))
sb.AppendString(string(info.PackBlobID))
sb.AppendString(" ")
sb.AppendUint32(info.GetPackedLength())
sb.AppendUint32(info.PackedLength)
sb.AppendString(" d:")
sb.AppendBoolean(info.GetDeleted())
sb.AppendBoolean(info.Deleted)
sm.log.Debugf(sb.String())
packFileIndex.Add(info)


@@ -740,15 +740,15 @@ func (s *contentManagerSuite) TestUndeleteContentSimple(t *testing.T) {
got, want := getContentInfo(t, bm, tc.cid), tc.info
if got.GetDeleted() {
if got.Deleted {
t.Error("Content marked as deleted:", got)
}
if got.GetPackBlobID() == "" {
if got.PackBlobID == "" {
t.Error("Empty pack id for undeleted content:", tc.cid)
}
if got.GetPackOffset() == 0 {
if got.PackOffset == 0 {
t.Error("0 offset for undeleted content:", tc.cid)
}
@@ -788,15 +788,15 @@ func (s *contentManagerSuite) TestUndeleteContentSimple(t *testing.T) {
t.Log("case name:", tc.name)
got := getContentInfo(t, bm, tc.cid)
if got.GetDeleted() {
if got.Deleted {
t.Error("Content marked as deleted:", got)
}
if got.GetPackBlobID() == "" {
if got.PackBlobID == "" {
t.Error("Empty pack id for undeleted content:", tc.cid)
}
if got.GetPackOffset() == 0 {
if got.PackOffset == 0 {
t.Error("0 offset for undeleted content:", tc.cid)
}
@@ -890,7 +890,7 @@ func (s *contentManagerSuite) TestUndeleteContent(t *testing.T) {
t.Fatalf("unable to get content info for %v: %v", id, err)
}
if got, want := ci.GetDeleted(), false; got != want {
if got, want := ci.Deleted, false; got != want {
t.Fatalf("content %v was not undeleted: %v", id, ci)
}
}
@@ -906,7 +906,7 @@ func (s *contentManagerSuite) TestUndeleteContent(t *testing.T) {
t.Fatalf("unable to get content info for %v: %v", id, err)
}
if got, want := ci.GetDeleted(), false; got != want {
if got, want := ci.Deleted, false; got != want {
t.Fatalf("content %v was not undeleted: %v", id, ci)
}
}
@@ -921,7 +921,7 @@ func (s *contentManagerSuite) TestUndeleteContent(t *testing.T) {
t.Fatalf("unable to get content info for %v: %v", id, err)
}
if got, want := ci.GetDeleted(), false; got != want {
if got, want := ci.Deleted, false; got != want {
t.Fatalf("content %v was not undeleted: %v", id, ci)
}
}
@@ -980,7 +980,7 @@ func deleteContentAfterUndeleteAndCheck(ctx context.Context, t *testing.T, bm *W
deleteContent(ctx, t, bm, id)
got := getContentInfo(t, bm, id)
if !got.GetDeleted() {
if !got.Deleted {
t.Fatalf("Expected content %q to be deleted, got: %#v", id, got)
}
@@ -994,7 +994,7 @@ func deleteContentAfterUndeleteAndCheck(ctx context.Context, t *testing.T, bm *W
// check c1 again
got = getContentInfo(t, bm, id)
if !got.GetDeleted() {
if !got.Deleted {
t.Fatal("Expected content to be deleted, got: ", got)
}
@@ -1228,7 +1228,7 @@ func (s *contentManagerSuite) verifyAllDataPresent(ctx context.Context, t *testi
defer bm.CloseShared(ctx)
_ = bm.IterateContents(ctx, IterateOptions{}, func(ci Info) error {
delete(contentIDs, ci.GetContentID())
delete(contentIDs, ci.ContentID)
return nil
})
@@ -1638,7 +1638,7 @@ func (s *contentManagerSuite) TestIterateContents(t *testing.T) {
}
mu.Lock()
got[ci.GetContentID()] = true
got[ci.ContentID] = true
mu.Unlock()
return nil
})
@@ -1821,9 +1821,9 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) {
require.NoError(t, err)
if bm.SupportsContentCompression() {
require.Equal(t, compression.HeaderZstdFastest, info.GetCompressionHeaderID())
require.Equal(t, compression.HeaderZstdFastest, info.CompressionHeaderID)
} else {
require.Equal(t, NoCompression, info.GetCompressionHeaderID())
require.Equal(t, NoCompression, info.CompressionHeaderID)
}
}
@@ -2046,9 +2046,9 @@ func (s *contentManagerSuite) TestCompression_CompressibleData(t *testing.T) {
require.NoError(t, err)
// gzip-compressed length
require.Equal(t, uint32(79), ci.GetPackedLength())
require.Equal(t, uint32(len(compressibleData)), ci.GetOriginalLength())
require.Equal(t, headerID, ci.GetCompressionHeaderID())
require.Equal(t, uint32(79), ci.PackedLength)
require.Equal(t, uint32(len(compressibleData)), ci.OriginalLength)
require.Equal(t, headerID, ci.CompressionHeaderID)
verifyContent(ctx, t, bm, cid, compressibleData)
@@ -2083,9 +2083,9 @@ func (s *contentManagerSuite) TestCompression_NonCompressibleData(t *testing.T)
require.NoError(t, err)
// verify compression did not occur
require.Greater(t, ci.GetPackedLength(), ci.GetOriginalLength())
require.Equal(t, uint32(len(nonCompressibleData)), ci.GetOriginalLength())
require.Equal(t, NoCompression, ci.GetCompressionHeaderID())
require.Greater(t, ci.PackedLength, ci.OriginalLength)
require.Equal(t, uint32(len(nonCompressibleData)), ci.OriginalLength)
require.Equal(t, NoCompression, ci.CompressionHeaderID)
require.NoError(t, bm.Flush(ctx))
verifyContent(ctx, t, bm, cid, nonCompressibleData)
@@ -2173,12 +2173,12 @@ func (s *contentManagerSuite) TestPrefetchContent(t *testing.T) {
id6 := writeContentAndVerify(ctx, t, bm, bytes.Repeat([]byte{6, 7, 8, 9, 10, 11}, 1e6))
require.NoError(t, bm.Flush(ctx))
blob1 := getContentInfo(t, bm, id1).GetPackBlobID()
require.Equal(t, blob1, getContentInfo(t, bm, id2).GetPackBlobID())
require.Equal(t, blob1, getContentInfo(t, bm, id3).GetPackBlobID())
blob2 := getContentInfo(t, bm, id4).GetPackBlobID()
require.Equal(t, blob2, getContentInfo(t, bm, id5).GetPackBlobID())
require.Equal(t, blob2, getContentInfo(t, bm, id6).GetPackBlobID())
blob1 := getContentInfo(t, bm, id1).PackBlobID
require.Equal(t, blob1, getContentInfo(t, bm, id2).PackBlobID)
require.Equal(t, blob1, getContentInfo(t, bm, id3).PackBlobID)
blob2 := getContentInfo(t, bm, id4).PackBlobID
require.Equal(t, blob2, getContentInfo(t, bm, id5).PackBlobID)
require.Equal(t, blob2, getContentInfo(t, bm, id6).PackBlobID)
ccd := bm.contentCache
ccm := bm.metadataCache
@@ -2489,7 +2489,7 @@ func verifyDeletedContentRead(ctx context.Context, t *testing.T, bm *WriteManage
return
}
if !ci.GetDeleted() {
if !ci.Deleted {
t.Errorf("Expected content to be deleted, but it is not: %#v", ci)
}
}


@@ -39,7 +39,7 @@ func (o *prefetchOptions) shouldPrefetchEntireBlob(infos []Info) bool {
var total int64
for _, i := range infos {
total += int64(i.GetPackedLength())
total += int64(i.PackedLength)
}
return total >= o.fullBlobPrefetchBytesThreshold
@@ -72,7 +72,7 @@ func (bm *WriteManager) PrefetchContents(ctx context.Context, contentIDs []ID, h
continue
}
contentsByBlob[bi.GetPackBlobID()] = append(contentsByBlob[bi.GetPackBlobID()], bi)
contentsByBlob[bi.PackBlobID] = append(contentsByBlob[bi.PackBlobID], bi)
prefetched = append(prefetched, ci)
}
@@ -97,7 +97,7 @@ type work struct {
workCh <- work{blobID: b}
} else {
for _, bi := range infos {
workCh <- work{contentID: bi.GetContentID()}
workCh <- work{contentID: bi.ContentID}
}
}
}


@@ -18,10 +18,10 @@
type Index interface {
io.Closer
ApproximateCount() int
GetInfo(contentID ID) (InfoReader, error)
GetInfo(contentID ID, result *Info) (bool, error)
// invokes the provided callback for all entries such that entry.ID >= startID and entry.ID < endID
Iterate(r IDRange, cb func(InfoReader) error) error
Iterate(r IDRange, cb func(Info) error) error
}
// Open reads an Index from a given reader. The caller must call Close() when the index is no longer used.
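Both on-disk index formats below implement this contract the same way: not-found is `(false, nil)` rather than an error, and a hit fills the caller-provided struct. A minimal hypothetical implementation of just the `GetInfo` shape, for illustration only:

```go
// mapIndex is a toy in-memory index illustrating the new contract;
// it is not part of the package.
type mapIndex map[ID]Info

func (m mapIndex) GetInfo(contentID ID, result *Info) (bool, error) {
	i, ok := m[contentID]
	if !ok {
		return false, nil // not found is reported via the bool, not an error
	}
	*result = i
	return true, nil
}
```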


@@ -35,7 +35,7 @@ func (b Builder) Clone() Builder {
// Add adds a new entry to the builder or conditionally replaces it if the timestamp is greater.
func (b Builder) Add(i Info) {
cid := i.GetContentID()
cid := i.ContentID
old, found := b[cid]
if !found || contentInfoGreaterThanStruct(i, old) {
@@ -94,7 +94,7 @@ func (b Builder) sortedContents() []Info {
buck := buckets[i]
sort.Slice(buck, func(i, j int) bool {
return buck[i].GetContentID().less(buck[j].GetContentID())
return buck[i].ContentID.less(buck[j].ContentID)
})
}
}


@@ -6,12 +6,11 @@
"encoding/binary"
"io"
"sort"
"time"
"sync"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/compression"
)
const (
@@ -35,79 +34,12 @@ type FormatV1 struct {
Entries []struct {
Key []byte // key bytes (KeySize)
Entry indexEntryInfoV1
Entry []byte // entry bytes (EntrySize)
}
ExtraData []byte // extra data
}
type indexEntryInfoV1 struct {
data []byte
contentID ID
b *indexV1
}
func (e indexEntryInfoV1) GetContentID() ID {
return e.contentID
}
// entry bytes 0..5: 48-bit big-endian timestamp in seconds since 1970/01/01 UTC.
func (e indexEntryInfoV1) GetTimestampSeconds() int64 {
return decodeBigEndianUint48(e.data)
}
// entry byte 6: format version (currently always == 1).
func (e indexEntryInfoV1) GetFormatVersion() byte {
return e.data[6]
}
// entry byte 7: length of pack content ID
// entry bytes 8..11: 4 bytes, big endian, offset within index file where pack (blob) ID begins.
func (e indexEntryInfoV1) GetPackBlobID() blob.ID {
nameLength := int(e.data[7])
nameOffset := decodeBigEndianUint32(e.data[8:])
nameBuf, err := safeSlice(e.b.data, int64(nameOffset), nameLength)
if err != nil {
return invalidBlobID
}
return blob.ID(nameBuf[0:nameLength])
}
// entry bytes 12..15 - deleted flag (MSBit), 31 lower bits encode pack offset.
func (e indexEntryInfoV1) GetDeleted() bool {
return e.data[12]&0x80 != 0
}
func (e indexEntryInfoV1) GetPackOffset() uint32 {
const packOffsetMask = 1<<31 - 1
return decodeBigEndianUint32(e.data[12:]) & packOffsetMask
}
// bytes 16..19: 4 bytes, big endian, content length.
func (e indexEntryInfoV1) GetPackedLength() uint32 {
return decodeBigEndianUint32(e.data[16:])
}
func (e indexEntryInfoV1) GetOriginalLength() uint32 {
return e.GetPackedLength() - e.b.v1PerContentOverhead
}
func (e indexEntryInfoV1) Timestamp() time.Time {
return time.Unix(e.GetTimestampSeconds(), 0)
}
func (e indexEntryInfoV1) GetCompressionHeaderID() compression.HeaderID {
return 0
}
func (e indexEntryInfoV1) GetEncryptionKeyID() byte {
return 0
}
var _ InfoReader = indexEntryInfoV1{}
type indexV1 struct {
hdr v1HeaderInfo
data []byte
@@ -116,6 +48,57 @@ type indexV1 struct {
// v1 index does not explicitly store per-content length so we compute it from packed length and fixed overhead
// provided by the encryptor.
v1PerContentOverhead uint32
nameOffsetToBlobIDMutex sync.Mutex
// +checklocks:nameOffsetToBlobIDMutex
nameOffsetToBlobID map[uint32]blob.ID
}
func (b *indexV1) packBlobIDForOffset(nameOffset uint32, nameLength int) blob.ID {
b.nameOffsetToBlobIDMutex.Lock()
defer b.nameOffsetToBlobIDMutex.Unlock()
packBlobID, ok := b.nameOffsetToBlobID[nameOffset]
if !ok {
nameBuf, err := safeSlice(b.data, int64(nameOffset), nameLength)
if err != nil {
return invalidBlobID
}
packBlobID = blob.ID(nameBuf[0:nameLength])
b.nameOffsetToBlobID[nameOffset] = packBlobID
}
return packBlobID
}
func (b *indexV1) entryToInfoStruct(contentID ID, data []byte, result *Info) error {
if len(data) != v1EntryLength {
return errors.Errorf("invalid entry length: %v", len(data))
}
result.ContentID = contentID
result.TimestampSeconds = decodeBigEndianUint48(data)
result.FormatVersion = data[6]
// entry byte 7: length of pack content ID
// entry bytes 8..11: 4 bytes, big endian, offset within index file where pack (blob) ID begins.
nameLength := int(data[7])
nameOffset := decodeBigEndianUint32(data[8:])
result.PackBlobID = b.packBlobIDForOffset(nameOffset, nameLength)
// entry bytes 12..15 - deleted flag (MSBit), 31 lower bits encode pack offset.
result.Deleted = data[12]&0x80 != 0 //nolint:gomnd
const packOffsetMask = 1<<31 - 1
result.PackOffset = decodeBigEndianUint32(data[12:]) & packOffsetMask
result.PackedLength = decodeBigEndianUint32(data[16:])
result.OriginalLength = result.PackedLength - b.v1PerContentOverhead
result.CompressionHeaderID = 0
result.EncryptionKeyID = 0
return nil
}
func (b *indexV1) ApproximateCount() int {
@@ -125,7 +108,7 @@ func (b *indexV1) ApproximateCount() int {
// Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically.
// The iteration ends when the callback returns an error, which is propagated to the caller or when
// all contents have been visited.
func (b *indexV1) Iterate(r IDRange, cb func(InfoReader) error) error {
func (b *indexV1) Iterate(r IDRange, cb func(Info) error) error {
startPos, err := b.findEntryPosition(r.StartID)
if err != nil {
return errors.Wrap(err, "could not find starting position")
@@ -146,12 +129,13 @@ func (b *indexV1) Iterate(r IDRange, cb func(InfoReader) error) error {
break
}
i, err := b.entryToInfo(contentID, entry[b.hdr.keySize:])
if err != nil {
var tmp Info
if err := b.entryToInfoStruct(contentID, entry[b.hdr.keySize:], &tmp); err != nil {
return errors.Wrap(err, "invalid index data")
}
if err := cb(i); err != nil {
if err := cb(tmp); err != nil {
return err
}
}
@@ -241,27 +225,27 @@ func (b *indexV1) findEntry(output []byte, contentID ID) ([]byte, error) {
}
// GetInfo returns information about a given content. If the content is not found, it returns (false, nil).
func (b *indexV1) GetInfo(contentID ID) (InfoReader, error) {
func (b *indexV1) GetInfo(contentID ID, result *Info) (bool, error) {
var entryBuf [v1MaxEntrySize]byte
e, err := b.findEntry(entryBuf[:0], contentID)
if err != nil {
return nil, err
return false, err
}
if e == nil {
return nil, nil
return false, nil
}
return b.entryToInfo(contentID, e)
}
func (b *indexV1) entryToInfo(contentID ID, entryData []byte) (InfoReader, error) {
if len(entryData) != v1EntryLength {
return nil, errors.Errorf("invalid entry length: %v", len(entryData))
if len(e) != v1EntryLength {
return false, errors.Errorf("invalid entry length: %v", len(e))
}
return indexEntryInfoV1{entryData, contentID, b}, nil
if err := b.entryToInfoStruct(contentID, e, result); err != nil {
return false, errors.Wrap(err, "unable to convert entry to info")
}
return true, nil
}
// Close closes the index.
@@ -330,13 +314,13 @@ func (b *indexBuilderV1) prepareExtraData(allContents []Info) []byte {
for i, it := range allContents {
if i == 0 {
b.keyLength = len(contentIDToBytes(hashBuf[:0], it.GetContentID()))
b.keyLength = len(contentIDToBytes(hashBuf[:0], it.ContentID))
}
if it.GetPackBlobID() != "" {
if _, ok := b.packBlobIDOffsets[it.GetPackBlobID()]; !ok {
b.packBlobIDOffsets[it.GetPackBlobID()] = uint32(len(extraData))
extraData = append(extraData, []byte(it.GetPackBlobID())...)
if it.PackBlobID != "" {
if _, ok := b.packBlobIDOffsets[it.PackBlobID]; !ok {
b.packBlobIDOffsets[it.PackBlobID] = uint32(len(extraData))
extraData = append(extraData, []byte(it.PackBlobID)...)
}
}
}
@@ -349,17 +333,17 @@ func (b *indexBuilderV1) prepareExtraData(allContents []Info) []byte {
func (b *indexBuilderV1) writeEntry(w io.Writer, it Info, entry []byte) error {
var hashBuf [maxContentIDSize]byte
k := contentIDToBytes(hashBuf[:0], it.GetContentID())
k := contentIDToBytes(hashBuf[:0], it.ContentID)
if len(k) != b.keyLength {
return errors.Errorf("inconsistent key length: %v vs %v", len(k), b.keyLength)
}
if it.GetCompressionHeaderID() != 0 {
if it.CompressionHeaderID != 0 {
return errors.Errorf("compression not supported in index v1")
}
if it.GetEncryptionKeyID() != 0 {
if it.EncryptionKeyID != 0 {
return errors.Errorf("encryption key ID not supported in index v1")
}
@@ -383,23 +367,23 @@ func (b *indexBuilderV1) formatEntry(entry []byte, it Info) error {
entryPackFileOffset := entry[8:12]
entryPackedOffset := entry[12:16]
entryPackedLength := entry[16:20]
timestampAndFlags := uint64(it.GetTimestampSeconds()) << 16 //nolint:gomnd
timestampAndFlags := uint64(it.TimestampSeconds) << 16 //nolint:gomnd
packBlobID := it.GetPackBlobID()
packBlobID := it.PackBlobID
if len(packBlobID) == 0 {
return errors.Errorf("empty pack content ID for %v", it.GetContentID())
return errors.Errorf("empty pack content ID for %v", it.ContentID)
}
binary.BigEndian.PutUint32(entryPackFileOffset, b.extraDataOffset+b.packBlobIDOffsets[packBlobID])
if it.GetDeleted() {
binary.BigEndian.PutUint32(entryPackedOffset, it.GetPackOffset()|v1DeletedMarker)
if it.Deleted {
binary.BigEndian.PutUint32(entryPackedOffset, it.PackOffset|v1DeletedMarker)
} else {
binary.BigEndian.PutUint32(entryPackedOffset, it.GetPackOffset())
binary.BigEndian.PutUint32(entryPackedOffset, it.PackOffset)
}
binary.BigEndian.PutUint32(entryPackedLength, it.GetPackedLength())
timestampAndFlags |= uint64(it.GetFormatVersion()) << 8 //nolint:gomnd
binary.BigEndian.PutUint32(entryPackedLength, it.PackedLength)
timestampAndFlags |= uint64(it.FormatVersion) << 8 //nolint:gomnd
timestampAndFlags |= uint64(len(packBlobID))
binary.BigEndian.PutUint64(entryTimestampAndFlags, timestampAndFlags)
@@ -434,5 +418,5 @@ func v1ReadHeader(data []byte) (v1HeaderInfo, error) {
}
func openV1PackIndex(hdr v1HeaderInfo, data []byte, closer func() error, overhead uint32) (Index, error) {
return &indexV1{hdr, data, closer, overhead}, nil
return &indexV1{hdr, data, closer, overhead, sync.Mutex{}, map[uint32]blob.ID{}}, nil
}
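The `nameOffsetToBlobID` memo map above exists because `blob.ID(nameBuf[0:nameLength])` is a byte-slice-to-string conversion that allocates and copies on every call; keying by name offset means each distinct pack blob name is materialized at most once per open index, no matter how many entries reference it. The same pattern in isolation (hypothetical type, for illustration):

```go
// idMemo caches []byte-to-blob.ID conversions keyed by buffer offset.
type idMemo struct {
	mu sync.Mutex
	m  map[uint32]blob.ID
}

func (c *idMemo) get(offset uint32, raw []byte) blob.ID {
	c.mu.Lock()
	defer c.mu.Unlock()
	id, ok := c.m[offset]
	if !ok {
		id = blob.ID(raw) // the only allocation, once per distinct offset
		c.m[offset] = id
	}
	return id
}
```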


@@ -7,7 +7,6 @@
"fmt"
"io"
"sort"
"time"
"github.com/pkg/errors"
@@ -114,7 +113,7 @@ type FormatV2 struct {
Entries []struct {
Key []byte // key bytes (KeySize)
Entry indexV2EntryInfo
Entry []byte // entry bytes (EntrySize)
}
// each entry contains offset+length of the name of the pack blob, so that each entry can refer to the index
@@ -136,96 +135,63 @@ type indexV2FormatInfo struct {
encryptionKeyID byte
}
type indexV2EntryInfo struct {
data []byte
contentID ID
b *indexV2
type indexV2 struct {
hdr v2HeaderInfo
data []byte
closer func() error
formats []indexV2FormatInfo
packBlobIDs []blob.ID
}
func (e indexV2EntryInfo) GetContentID() ID {
return e.contentID
}
func (e indexV2EntryInfo) GetTimestampSeconds() int64 {
return int64(decodeBigEndianUint32(e.data[v2EntryOffsetTimestampSeconds:])) + int64(e.b.hdr.baseTimestamp)
}
func (e indexV2EntryInfo) GetDeleted() bool {
return e.data[v2EntryOffsetPackOffsetAndFlags]&v2EntryDeletedFlag != 0
}
func (e indexV2EntryInfo) GetPackOffset() uint32 {
return decodeBigEndianUint32(e.data[v2EntryOffsetPackOffsetAndFlags:]) & v2EntryPackOffsetMask
}
func (e indexV2EntryInfo) GetOriginalLength() uint32 {
v := decodeBigEndianUint24(e.data[v2EntryOffsetOriginalLength:])
if len(e.data) > v2EntryOffsetHighLengthBits {
v |= uint32(e.data[v2EntryOffsetHighLengthBits]>>v2EntryHighLengthBitsOriginalLengthShift) << v2EntryHighLengthShift
func (b *indexV2) entryToInfoStruct(contentID ID, data []byte, result *Info) error {
if len(data) < v2EntryMinLength {
return errors.Errorf("invalid entry length: %v", len(data))
}
return v
}
result.ContentID = contentID
result.TimestampSeconds = int64(decodeBigEndianUint32(data[v2EntryOffsetTimestampSeconds:])) + int64(b.hdr.baseTimestamp)
result.Deleted = data[v2EntryOffsetPackOffsetAndFlags]&v2EntryDeletedFlag != 0
result.PackOffset = decodeBigEndianUint32(data[v2EntryOffsetPackOffsetAndFlags:]) & v2EntryPackOffsetMask
result.OriginalLength = decodeBigEndianUint24(data[v2EntryOffsetOriginalLength:])
func (e indexV2EntryInfo) GetPackedLength() uint32 {
v := decodeBigEndianUint24(e.data[v2EntryOffsetPackedLength:])
if len(e.data) > v2EntryOffsetHighLengthBits {
v |= uint32(e.data[v2EntryOffsetHighLengthBits]&v2EntryHghLengthBitsPackedLengthMask) << v2EntryHighLengthShift
if len(data) > v2EntryOffsetHighLengthBits {
result.OriginalLength |= uint32(data[v2EntryOffsetHighLengthBits]>>v2EntryHighLengthBitsOriginalLengthShift) << v2EntryHighLengthShift
}
return v
result.PackedLength = decodeBigEndianUint24(data[v2EntryOffsetPackedLength:])
if len(data) > v2EntryOffsetHighLengthBits {
result.PackedLength |= uint32(data[v2EntryOffsetHighLengthBits]&v2EntryHghLengthBitsPackedLengthMask) << v2EntryHighLengthShift
}
fid := formatIDIndex(data)
if fid >= len(b.formats) {
result.FormatVersion = invalidFormatVersion
result.CompressionHeaderID = invalidCompressionHeaderID
result.EncryptionKeyID = invalidEncryptionKeyID
} else {
result.FormatVersion = b.formats[fid].formatVersion
result.CompressionHeaderID = b.formats[fid].compressionHeaderID
result.EncryptionKeyID = b.formats[fid].encryptionKeyID
}
packIDIndex := uint32(decodeBigEndianUint16(data[v2EntryOffsetPackBlobID:]))
if len(data) > v2EntryOffsetExtendedPackBlobID {
packIDIndex |= uint32(data[v2EntryOffsetExtendedPackBlobID]) << v2EntryExtendedPackBlobIDShift
}
result.PackBlobID = b.getPackBlobIDByIndex(packIDIndex)
return nil
}
func (e indexV2EntryInfo) formatIDIndex() int {
if len(e.data) > v2EntryOffsetFormatID {
return int(e.data[v2EntryOffsetFormatID])
func formatIDIndex(data []byte) int {
if len(data) > v2EntryOffsetFormatID {
return int(data[v2EntryOffsetFormatID])
}
return 0
}
func (e indexV2EntryInfo) GetFormatVersion() byte {
fid := e.formatIDIndex()
if fid > len(e.b.formats) {
return invalidFormatVersion
}
return e.b.formats[fid].formatVersion
}
func (e indexV2EntryInfo) GetCompressionHeaderID() compression.HeaderID {
fid := e.formatIDIndex()
if fid > len(e.b.formats) {
return invalidCompressionHeaderID
}
return e.b.formats[fid].compressionHeaderID
}
func (e indexV2EntryInfo) GetEncryptionKeyID() byte {
fid := e.formatIDIndex()
if fid > len(e.b.formats) {
return invalidEncryptionKeyID
}
return e.b.formats[fid].encryptionKeyID
}
func (e indexV2EntryInfo) GetPackBlobID() blob.ID {
packIDIndex := uint32(decodeBigEndianUint16(e.data[v2EntryOffsetPackBlobID:]))
if len(e.data) > v2EntryOffsetExtendedPackBlobID {
packIDIndex |= uint32(e.data[v2EntryOffsetExtendedPackBlobID]) << v2EntryExtendedPackBlobIDShift
}
return e.b.getPackBlobIDByIndex(packIDIndex)
}
func (e indexV2EntryInfo) Timestamp() time.Time {
return time.Unix(e.GetTimestampSeconds(), 0)
}
var _ InfoReader = indexV2EntryInfo{}
type v2HeaderInfo struct {
version int
keySize int
@@ -242,32 +208,12 @@ type v2HeaderInfo struct {
entryStride int64 // guaranteed to be < v2MaxEntrySize
}
type indexV2 struct {
hdr v2HeaderInfo
data []byte
closer func() error
formats []indexV2FormatInfo
}
func (b *indexV2) getPackBlobIDByIndex(ndx uint32) blob.ID {
if ndx >= uint32(b.hdr.packCount) {
return invalidBlobID
}
buf, err := safeSlice(b.data, b.hdr.packsOffset+int64(v2PackInfoSize*ndx), v2PackInfoSize)
if err != nil {
return invalidBlobID
}
nameLength := int(buf[0])
nameOffset := binary.BigEndian.Uint32(buf[1:])
nameBuf, err := safeSliceString(b.data, int64(nameOffset), nameLength)
if err != nil {
return invalidBlobID
}
return blob.ID(nameBuf)
return b.packBlobIDs[ndx]
}
func (b *indexV2) ApproximateCount() int {
@@ -277,12 +223,14 @@ func (b *indexV2) ApproximateCount() int {
// Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically.
// The iteration ends when the callback returns an error, which is propagated to the caller or when
// all contents have been visited.
func (b *indexV2) Iterate(r IDRange, cb func(InfoReader) error) error {
func (b *indexV2) Iterate(r IDRange, cb func(Info) error) error {
startPos, err := b.findEntryPosition(r.StartID)
if err != nil {
return errors.Wrap(err, "could not find starting position")
}
var tmp Info
for i := startPos; i < b.hdr.entryCount; i++ {
entry, err := safeSlice(b.data, b.entryOffset(i), int(b.hdr.entryStride))
if err != nil {
@@ -296,12 +244,11 @@ func (b *indexV2) Iterate(r IDRange, cb func(InfoReader) error) error {
break
}
i, err := b.entryToInfo(contentID, entry[b.hdr.keySize:])
if err != nil {
if err := b.entryToInfoStruct(contentID, entry[b.hdr.keySize:], &tmp); err != nil {
return errors.Wrap(err, "invalid index data")
}
if err := cb(i); err != nil {
if err := cb(tmp); err != nil {
return err
}
}
@@ -389,25 +336,21 @@ func (b *indexV2) findEntry(contentID ID) ([]byte, error) {
}
// GetInfo returns information about a given content. If the content is not found, it returns (false, nil).
func (b *indexV2) GetInfo(contentID ID) (InfoReader, error) {
func (b *indexV2) GetInfo(contentID ID, result *Info) (bool, error) {
e, err := b.findEntry(contentID)
if err != nil {
return nil, err
return false, err
}
if e == nil {
return nil, nil
return false, nil
}
return b.entryToInfo(contentID, e)
}
func (b *indexV2) entryToInfo(contentID ID, entryData []byte) (InfoReader, error) {
if len(entryData) < v2EntryMinLength {
return nil, errors.Errorf("invalid entry length: %v", len(entryData))
if err := b.entryToInfoStruct(contentID, e, result); err != nil {
return false, err
}
return indexV2EntryInfo{entryData, contentID, b}, nil
return true, nil
}
// Close closes the index.
@@ -432,9 +375,9 @@ type indexBuilderV2 struct {
func indexV2FormatInfoFromInfo(v Info) indexV2FormatInfo {
return indexV2FormatInfo{
formatVersion: v.GetFormatVersion(),
compressionHeaderID: v.GetCompressionHeaderID(),
encryptionKeyID: v.GetEncryptionKeyID(),
formatVersion: v.FormatVersion,
compressionHeaderID: v.CompressionHeaderID,
encryptionKeyID: v.EncryptionKeyID,
}
}
@@ -457,7 +400,7 @@ func buildPackIDToIndexMap(sortedInfos []Info) map[blob.ID]int {
result := map[blob.ID]int{}
for _, v := range sortedInfos {
blobID := v.GetPackBlobID()
blobID := v.PackBlobID
if _, ok := result[blobID]; !ok {
result[blobID] = len(result)
}
@@ -469,15 +412,15 @@ func buildPackIDToIndexMap(sortedInfos []Info) map[blob.ID]int {
// maxContentLengths computes max content lengths in the builder.
func maxContentLengths(sortedInfos []Info) (maxPackedLength, maxOriginalLength, maxPackOffset uint32) {
for _, v := range sortedInfos {
if l := v.GetPackedLength(); l > maxPackedLength {
if l := v.PackedLength; l > maxPackedLength {
maxPackedLength = l
}
if l := v.GetOriginalLength(); l > maxOriginalLength {
if l := v.OriginalLength; l > maxOriginalLength {
maxOriginalLength = l
}
if l := v.GetPackOffset(); l > maxPackOffset {
if l := v.PackOffset; l > maxPackOffset {
maxPackOffset = l
}
}
@@ -538,7 +481,7 @@ func newIndexBuilderV2(sortedInfos []Info) (*indexBuilderV2, error) {
if len(sortedInfos) > 0 {
var hashBuf [maxContentIDSize]byte
keyLength = len(contentIDToBytes(hashBuf[:0], sortedInfos[0].GetContentID()))
keyLength = len(contentIDToBytes(hashBuf[:0], sortedInfos[0].ContentID))
}
return &indexBuilderV2{
@@ -627,10 +570,10 @@ func (b *indexBuilderV2) prepareExtraData(sortedInfos []Info) []byte {
var extraData []byte
for _, it := range sortedInfos {
if it.GetPackBlobID() != "" {
if _, ok := b.packBlobIDOffsets[it.GetPackBlobID()]; !ok {
b.packBlobIDOffsets[it.GetPackBlobID()] = uint32(len(extraData))
extraData = append(extraData, []byte(it.GetPackBlobID())...)
if it.PackBlobID != "" {
if _, ok := b.packBlobIDOffsets[it.PackBlobID]; !ok {
b.packBlobIDOffsets[it.PackBlobID] = uint32(len(extraData))
extraData = append(extraData, []byte(it.PackBlobID)...)
}
}
}
@@ -646,7 +589,7 @@ func (b *indexBuilderV2) prepareExtraData(sortedInfos []Info) []byte {
func (b *indexBuilderV2) writeIndexEntry(w io.Writer, it Info) error {
var hashBuf [maxContentIDSize]byte
k := contentIDToBytes(hashBuf[:0], it.GetContentID())
k := contentIDToBytes(hashBuf[:0], it.ContentID)
if len(k) != b.keyLength {
return errors.Errorf("inconsistent key length: %v vs %v", len(k), b.keyLength)
@@ -693,14 +636,14 @@ func (b *indexBuilderV2) writeIndexValueEntry(w io.Writer, it Info) error {
binary.BigEndian.PutUint32(
buf[v2EntryOffsetTimestampSeconds:],
uint32(it.GetTimestampSeconds()-b.baseTimestamp))
uint32(it.TimestampSeconds-b.baseTimestamp))
// 4-7: pack offset bits 0..29
// flags:
// isDeleted (1 bit)
packOffsetAndFlags := it.GetPackOffset()
if it.GetDeleted() {
packOffsetAndFlags := it.PackOffset
if it.Deleted {
packOffsetAndFlags |= v2DeletedMarker
}
@@ -708,15 +651,15 @@ func (b *indexBuilderV2) writeIndexValueEntry(w io.Writer, it Info) error {
// 8-10: original length bits 0..23
encodeBigEndianUint24(buf[v2EntryOffsetOriginalLength:], it.GetOriginalLength())
encodeBigEndianUint24(buf[v2EntryOffsetOriginalLength:], it.OriginalLength)
// 11-13: packed length bits 0..23
encodeBigEndianUint24(buf[v2EntryOffsetPackedLength:], it.GetPackedLength())
encodeBigEndianUint24(buf[v2EntryOffsetPackedLength:], it.PackedLength)
// 14-15: pack ID (lower 16 bits)- index into Packs[]
packBlobIndex := b.packID2Index[it.GetPackBlobID()]
packBlobIndex := b.packID2Index[it.PackBlobID]
binary.BigEndian.PutUint16(buf[v2EntryOffsetPackBlobID:], uint16(packBlobIndex))
// 16: format ID - index into Formats[] - 0 - present if not all formats are identical
@@ -729,7 +672,7 @@ func (b *indexBuilderV2) writeIndexValueEntry(w io.Writer, it Info) error {
// 18: high-order bits - present if any content length is greater than 2^24 == 16MiB
// original length bits 24..27 (4 hi bits)
// packed length bits 24..27 (4 lo bits)
buf[v2EntryOffsetHighLengthBits] = byte(it.GetPackedLength()>>v2EntryHighLengthShift) | byte((it.GetOriginalLength()>>v2EntryHighLengthShift)<<v2EntryHighLengthBitsOriginalLengthShift)
buf[v2EntryOffsetHighLengthBits] = byte(it.PackedLength>>v2EntryHighLengthShift) | byte((it.OriginalLength>>v2EntryHighLengthShift)<<v2EntryHighLengthBitsOriginalLengthShift)
for i := b.entrySize; i < v2EntryMaxLength; i++ {
if buf[i] != 0 {
@@ -777,11 +720,31 @@ func openV2PackIndex(data []byte, closer func() error) (Index, error) {
return nil, errors.Errorf("unable to read formats section")
}
packIDs := make([]blob.ID, hi.packCount)
for i := range int(hi.packCount) {
buf, err := safeSlice(data, hi.packsOffset+int64(v2PackInfoSize*i), v2PackInfoSize)
if err != nil {
return nil, errors.Errorf("unable to read pack blob IDs section - 1")
}
nameLength := int(buf[0])
nameOffset := binary.BigEndian.Uint32(buf[1:])
nameBuf, err := safeSliceString(data, int64(nameOffset), nameLength)
if err != nil {
return nil, errors.Errorf("unable to read pack blob IDs section - 2")
}
packIDs[i] = blob.ID(nameBuf)
}
return &indexV2{
hdr: hi,
data: data,
closer: closer,
formats: parseFormatsBuffer(formatsBuf, int(hi.formatCount)),
hdr: hi,
data: data,
closer: closer,
formats: parseFormatsBuffer(formatsBuf, int(hi.formatCount)),
packBlobIDs: packIDs,
}, nil
}
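With the pack-ID table decoded once at open time, `getPackBlobIDByIndex` becomes a plain slice read, so the O(packCount) upfront work and memory buy allocation-free pack-ID resolution on every subsequent lookup. A hypothetical benchmark sketch for measuring per-lookup allocations (`openTestIndex` and `mustParseID` are assumed helpers):

```go
func BenchmarkGetInfo(b *testing.B) {
	idx := openTestIndex(b) // hypothetical: opens a fixture index
	id := mustParseID(b, "c1")

	var info Info
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := idx.GetInfo(id, &info); err != nil {
			b.Fatal(err)
		}
	}
}
```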


@@ -7,23 +7,6 @@
"github.com/kopia/kopia/repo/compression"
)
// InfoReader provides information about a single piece of content managed by Manager.
//
//nolint:interfacebloat
type InfoReader interface {
GetContentID() ID
GetPackBlobID() blob.ID
GetTimestampSeconds() int64
Timestamp() time.Time
GetOriginalLength() uint32
GetPackedLength() uint32
GetPackOffset() uint32
GetDeleted() bool
GetFormatVersion() byte
GetCompressionHeaderID() compression.HeaderID
GetEncryptionKeyID() byte
}
// Info is information about a single piece of content managed by Manager, represented as a plain struct.
type Info struct {
PackBlobID blob.ID `json:"packFile,omitempty"`
@@ -38,53 +21,7 @@ type Info struct {
EncryptionKeyID byte `json:"encryptionKeyID,omitempty"`
}
// GetContentID implements the Info interface.
func (i Info) GetContentID() ID { return i.ContentID }
// GetPackBlobID implements the Info interface.
func (i Info) GetPackBlobID() blob.ID { return i.PackBlobID }
// GetTimestampSeconds implements the Info interface.
func (i Info) GetTimestampSeconds() int64 { return i.TimestampSeconds }
// GetOriginalLength implements the Info interface.
func (i Info) GetOriginalLength() uint32 { return i.OriginalLength }
// GetPackedLength implements the Info interface.
func (i Info) GetPackedLength() uint32 { return i.PackedLength }
// GetPackOffset implements the Info interface.
func (i Info) GetPackOffset() uint32 { return i.PackOffset }
// GetDeleted implements the Info interface.
func (i Info) GetDeleted() bool { return i.Deleted }
// GetFormatVersion implements the Info interface.
func (i Info) GetFormatVersion() byte { return i.FormatVersion }
// GetCompressionHeaderID implements the Info interface.
func (i Info) GetCompressionHeaderID() compression.HeaderID { return i.CompressionHeaderID }
// GetEncryptionKeyID implements the Info interface.
func (i Info) GetEncryptionKeyID() byte { return i.EncryptionKeyID }
// Timestamp implements the Info interface.
func (i Info) Timestamp() time.Time {
return time.Unix(i.GetTimestampSeconds(), 0)
}
// ToInfoStruct converts the provided Info to InfoStruct.
func ToInfoStruct(i InfoReader) Info {
return Info{
ContentID: i.GetContentID(),
PackBlobID: i.GetPackBlobID(),
TimestampSeconds: i.GetTimestampSeconds(),
OriginalLength: i.GetOriginalLength(),
PackedLength: i.GetPackedLength(),
PackOffset: i.GetPackOffset(),
Deleted: i.GetDeleted(),
FormatVersion: i.GetFormatVersion(),
CompressionHeaderID: i.GetCompressionHeaderID(),
EncryptionKeyID: i.GetEncryptionKeyID(),
}
return time.Unix(i.TimestampSeconds, 0)
}
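With the getter interface removed, Info is a plain struct whose json tags drive serialization directly. A minimal in-package round-trip sketch, assuming encoding/json is imported and that the ID types marshal cleanly:

// Sketch: Info round-trips through JSON without any interface conversion.
func roundTripInfo(i Info) (Info, error) {
	b, err := json.Marshal(i)
	if err != nil {
		return Info{}, err
	}

	var out Info

	return out, json.Unmarshal(b, &out)
}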

View File

@@ -33,70 +33,62 @@ func (m Merged) Close() error {
return errors.Wrap(err, "closing index shards")
}
func contentInfoGreaterThan(a, b InfoReader) bool {
if l, r := a.GetTimestampSeconds(), b.GetTimestampSeconds(); l != r {
// different timestamps, higher one wins
return l > r
}
if l, r := a.GetDeleted(), b.GetDeleted(); l != r {
// non-deleted is greater than deleted.
return !a.GetDeleted()
}
// both same time, both deleted, we must ensure we always resolve to the same pack blob.
// since pack blobs are random and unique, simple lexicographic ordering will suffice.
return a.GetPackBlobID() > b.GetPackBlobID()
}
func contentInfoGreaterThanStruct(a, b Info) bool {
if l, r := a.GetTimestampSeconds(), b.GetTimestampSeconds(); l != r {
if l, r := a.TimestampSeconds, b.TimestampSeconds; l != r {
// different timestamps, higher one wins
return l > r
}
if l, r := a.GetDeleted(), b.GetDeleted(); l != r {
if l, r := a.Deleted, b.Deleted; l != r {
// non-deleted is greater than deleted.
return !a.GetDeleted()
return !a.Deleted
}
// both same time, both deleted, we must ensure we always resolve to the same pack blob.
// since pack blobs are random and unique, simple lexicographic ordering will suffice.
return a.GetPackBlobID() > b.GetPackBlobID()
return a.PackBlobID > b.PackBlobID
}
// GetInfo returns information about a single content. If a content is not found, returns (nil,nil).
func (m Merged) GetInfo(id ID) (InfoReader, error) {
var best InfoReader
// GetInfo returns information about a single content. If a content is not found, returns (false,nil).
func (m Merged) GetInfo(id ID, result *Info) (bool, error) {
var (
found bool
tmp Info
)
for _, ndx := range m {
i, err := ndx.GetInfo(id)
ok, err := ndx.GetInfo(id, &tmp)
if err != nil {
return nil, errors.Wrapf(err, "error getting id %v from index shard", id)
return false, errors.Wrapf(err, "error getting id %v from index shard", id)
}
if i != nil && (best == nil || contentInfoGreaterThan(i, best)) {
best = i
if !ok {
continue
}
if !found || contentInfoGreaterThanStruct(tmp, *result) {
*result = tmp
found = true
}
}
return best, nil
return found, nil
}
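The out-parameter signature exists so callers can reuse one stack-allocated Info across many lookups instead of allocating a fresh record per call. A hypothetical caller (the helper name is made up):

// Sketch: one reusable Info value for a whole batch of lookups.
func countFound(m Merged, ids []ID) (int, error) {
	var (
		info  Info
		found int
	)

	for _, id := range ids {
		ok, err := m.GetInfo(id, &info)
		if err != nil {
			return found, err
		}

		if ok {
			found++
		}
	}

	return found, nil
}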
type nextInfo struct {
it InfoReader
ch <-chan InfoReader
it Info
ch <-chan Info
}
type nextInfoHeap []*nextInfo
func (h nextInfoHeap) Len() int { return len(h) }
func (h nextInfoHeap) Less(i, j int) bool {
if a, b := h[i].it.GetContentID(), h[j].it.GetContentID(); a != b {
if a, b := h[i].it.ContentID, h[j].it.ContentID; a != b {
return a.less(b)
}
return !contentInfoGreaterThan(h[i].it, h[j].it)
return !contentInfoGreaterThanStruct(h[i].it, h[j].it)
}
func (h nextInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
@@ -113,14 +105,14 @@ func (h *nextInfoHeap) Pop() interface{} {
return x
}
func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-chan InfoReader {
ch := make(chan InfoReader, 1)
func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-chan Info {
ch := make(chan Info, 1)
go func() {
defer wg.Done()
defer close(ch)
_ = ndx.Iterate(r, func(i InfoReader) error {
_ = ndx.Iterate(r, func(i Info) error {
select {
case <-done:
return errors.New("end of iteration")
@@ -135,7 +127,7 @@ func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-cha
// Iterate invokes the provided callback for all unique content IDs in the underlying sources until either
// all contents have been visited or until an error is returned by the callback.
func (m Merged) Iterate(r IDRange, cb func(i InfoReader) error) error {
func (m Merged) Iterate(r IDRange, cb func(i Info) error) error {
var minHeap nextInfoHeap
done := make(chan bool)
@@ -158,20 +150,24 @@ func (m Merged) Iterate(r IDRange, cb func(i InfoReader) error) error {
defer wg.Wait()
defer close(done)
var pendingItem InfoReader
var (
havePendingItem bool
pendingItem Info
)
for len(minHeap) > 0 {
//nolint:forcetypeassert
min := heap.Pop(&minHeap).(*nextInfo)
if pendingItem == nil || pendingItem.GetContentID() != min.it.GetContentID() {
if pendingItem != nil {
if !havePendingItem || pendingItem.ContentID != min.it.ContentID {
if havePendingItem {
if err := cb(pendingItem); err != nil {
return err
}
}
pendingItem = min.it
} else if min.it != nil && contentInfoGreaterThan(min.it, pendingItem) {
havePendingItem = true
} else if contentInfoGreaterThanStruct(min.it, pendingItem) {
pendingItem = min.it
}
@@ -181,7 +177,7 @@ func (m Merged) Iterate(r IDRange, cb func(i InfoReader) error) error {
}
}
if pendingItem != nil {
if havePendingItem {
return cb(pendingItem)
}
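Because Info is now passed by value, the merged iterator tracks an explicit havePendingItem flag instead of comparing an interface against nil. Callback usage is otherwise unchanged; a small hypothetical example:

// Sketch: counting unique non-deleted contents across all shards.
func countLive(m Merged) (int, error) {
	n := 0

	err := m.Iterate(AllIDs, func(i Info) error {
		if !i.Deleted {
			n++
		}

		return nil
	})

	return n, err
}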

View File

@@ -40,15 +40,17 @@ func TestMerged(t *testing.T) {
require.Equal(t, 11, m.ApproximateCount())
i, err := m.GetInfo(mustParseID(t, "aabbcc"))
var i Info
ok, err := m.GetInfo(mustParseID(t, "aabbcc"), &i)
require.True(t, ok)
require.NoError(t, err)
require.NotNil(t, i)
require.Equal(t, uint32(33), i.GetPackOffset())
require.Equal(t, uint32(33), i.PackOffset)
require.NoError(t, m.Iterate(AllIDs, func(i InfoReader) error {
if i.GetContentID() == mustParseID(t, "de1e1e") {
if i.GetDeleted() {
require.NoError(t, m.Iterate(AllIDs, func(i Info) error {
if i.ContentID == mustParseID(t, "de1e1e") {
if i.Deleted {
t.Errorf("iteration preferred deleted content over non-deleted")
}
}
@@ -59,8 +61,8 @@ func TestMerged(t *testing.T) {
// error is propagated.
someErr := errors.Errorf("some error")
require.ErrorIs(t, m.Iterate(AllIDs, func(i InfoReader) error {
if i.GetContentID() == mustParseID(t, "aabbcc") {
require.ErrorIs(t, m.Iterate(AllIDs, func(i Info) error {
if i.ContentID == mustParseID(t, "aabbcc") {
return someErr
}
@@ -70,13 +72,14 @@ func TestMerged(t *testing.T) {
fmt.Println("=========== END")
// empty merged index does not invoke callback during iteration.
require.NoError(t, Merged{}.Iterate(AllIDs, func(i InfoReader) error {
require.NoError(t, Merged{}.Iterate(AllIDs, func(i Info) error {
return someErr
}))
i, err = m.GetInfo(mustParseID(t, "de1e1e"))
ok, err = m.GetInfo(mustParseID(t, "de1e1e"), &i)
require.True(t, ok)
require.NoError(t, err)
require.False(t, i.GetDeleted())
require.False(t, i.Deleted)
cases := []struct {
r IDRange
@@ -152,8 +155,8 @@ type failingIndex struct {
err error
}
func (i failingIndex) GetInfo(contentID ID) (InfoReader, error) {
return nil, i.err
func (i failingIndex) GetInfo(contentID ID, result *Info) (bool, error) {
return false, i.err
}
func TestMergedGetInfoError(t *testing.T) {
@@ -161,9 +164,10 @@ func TestMergedGetInfoError(t *testing.T) {
m := Merged{failingIndex{nil, someError}}
info, err := m.GetInfo(mustParseID(t, "xabcdef"))
var info Info
ok, err := m.GetInfo(mustParseID(t, "xabcdef"), &info)
require.ErrorIs(t, err, someError)
require.Nil(t, info)
require.False(t, ok)
}
func TestMergedIndexIsConsistent(t *testing.T) {
@@ -198,29 +202,31 @@ func TestMergedIndexIsConsistent(t *testing.T) {
}
for _, m := range cases {
i, err := m.GetInfo(mustParseID(t, "aabbcc"))
if err != nil || i == nil {
var i Info
ok, err := m.GetInfo(mustParseID(t, "aabbcc"), &i)
if err != nil || !ok {
t.Fatalf("unable to get info: %v", err)
}
// all things being equal, highest pack blob ID wins
require.Equal(t, blob.ID("zz"), i.GetPackBlobID())
require.Equal(t, blob.ID("zz"), i.PackBlobID)
i, err = m.GetInfo(mustParseID(t, "bbccdd"))
if err != nil || i == nil {
ok, err = m.GetInfo(mustParseID(t, "bbccdd"), &i)
if err != nil || !ok {
t.Fatalf("unable to get info: %v", err)
}
// given identical timestamps, non-deleted wins.
require.Equal(t, blob.ID("xx"), i.GetPackBlobID())
require.Equal(t, blob.ID("xx"), i.PackBlobID)
i, err = m.GetInfo(mustParseID(t, "ccddee"))
if err != nil || i == nil {
ok, err = m.GetInfo(mustParseID(t, "ccddee"), &i)
if err != nil || !ok {
t.Fatalf("unable to get info: %v", err)
}
// given identical timestamps and all deleted, highest pack blob ID wins.
require.Equal(t, blob.ID("hh"), i.GetPackBlobID())
require.Equal(t, blob.ID("hh"), i.PackBlobID)
}
}
@@ -229,8 +235,8 @@ func iterateIDRange(t *testing.T, m Index, r IDRange) []ID {
var inOrder []ID
require.NoError(t, m.Iterate(r, func(i InfoReader) error {
inOrder = append(inOrder, i.GetContentID())
require.NoError(t, m.Iterate(r, func(i Info) error {
inOrder = append(inOrder, i.ContentID)
return nil
}))

View File

@@ -7,7 +7,6 @@
"fmt"
"io"
"math/rand"
"reflect"
"strings"
"testing"
@@ -141,19 +140,13 @@ func testPackIndex(t *testing.T, version int) {
})
}
// dear future reader, if this fails because the number of methods has changed,
// you need to add additional test cases above.
if cnt := reflect.TypeOf((*InfoReader)(nil)).Elem().NumMethod(); cnt != 11 {
t.Fatalf("unexpected number of methods on content.Info: %v, must update the test", cnt)
}
infoMap := map[ID]Info{}
b1 := make(Builder)
b2 := make(Builder)
b3 := make(Builder)
for _, info := range infos {
infoMap[info.GetContentID()] = info
infoMap[info.ContentID] = info
b1.Add(info)
b2.Add(info)
b3.Add(info)
@@ -195,30 +188,32 @@ func testPackIndex(t *testing.T, version int) {
}
for _, want := range infos {
info2, err := ndx.GetInfo(want.GetContentID())
if err != nil {
t.Errorf("unable to find %v", want.GetContentID())
var info2 Info
ok, err := ndx.GetInfo(want.ContentID, &info2)
if err != nil || !ok {
t.Errorf("unable to find %v", want.ContentID)
continue
}
if version == 1 {
// v1 does not preserve original length.
want = withOriginalLength(want, want.GetPackedLength()-fakeEncryptionOverhead)
want = withOriginalLength(want, want.PackedLength-fakeEncryptionOverhead)
}
require.Equal(t, want, ToInfoStruct(info2))
require.Equal(t, want, info2)
}
cnt := 0
require.NoError(t, ndx.Iterate(AllIDs, func(info2 InfoReader) error {
want := infoMap[info2.GetContentID()]
require.NoError(t, ndx.Iterate(AllIDs, func(info2 Info) error {
want := infoMap[info2.ContentID]
if version == 1 {
// v1 does not preserve original length.
want = withOriginalLength(want, want.GetPackedLength()-fakeEncryptionOverhead)
want = withOriginalLength(want, want.PackedLength-fakeEncryptionOverhead)
}
require.Equal(t, want, ToInfoStruct(info2))
require.Equal(t, want, info2)
cnt++
return nil
}))
@@ -232,22 +227,24 @@ func testPackIndex(t *testing.T, version int) {
for i := range 100 {
contentID := deterministicContentID(t, "no-such-content", i)
v, err := ndx.GetInfo(contentID)
var v Info
ok, err := ndx.GetInfo(contentID, &v)
if err != nil {
t.Errorf("unable to get content %v: %v", contentID, err)
}
if v != nil {
if ok {
t.Errorf("unexpected result when getting content %v: %v", contentID, v)
}
}
for _, prefix := range prefixes {
cnt2 := 0
require.NoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 InfoReader) error {
require.NoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 Info) error {
cnt2++
if !strings.HasPrefix(info2.GetContentID().String(), string(prefix)) {
t.Errorf("unexpected item %v when iterating prefix %v", info2.GetContentID(), prefix)
if !strings.HasPrefix(info2.ContentID.String(), string(prefix)) {
t.Errorf("unexpected item %v when iterating prefix %v", info2.ContentID, prefix)
}
return nil
}))
@@ -284,10 +281,13 @@ func TestPackIndexPerContentLimits(t *testing.T) {
pi, err := Open(result.Bytes(), nil, func() int { return fakeEncryptionOverhead })
require.NoError(t, err)
got, err := pi.GetInfo(cid)
require.NoError(t, err)
var got Info
require.Equal(t, ToInfoStruct(got), tc.info)
ok, err := pi.GetInfo(cid, &got)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, got, tc.info)
} else {
err := b.buildV2(&result)
require.Error(t, err)
@@ -311,11 +311,11 @@ func TestSortedContents(t *testing.T) {
var last ID
for _, info := range got {
if info.GetContentID().less(last) {
t.Fatalf("not sorted %v (was %v)!", info.GetContentID(), last)
if info.ContentID.less(last) {
t.Fatalf("not sorted %v (was %v)!", info.ContentID, last)
}
last = info.GetContentID()
last = info.ContentID
}
}
@@ -358,11 +358,11 @@ func TestSortedContents2(t *testing.T) {
var last ID
for _, info := range got {
if info.GetContentID().less(last) {
t.Fatalf("not sorted %v (was %v)!", info.GetContentID(), last)
if info.ContentID.less(last) {
t.Fatalf("not sorted %v (was %v)!", info.ContentID, last)
}
last = info.GetContentID()
last = info.ContentID
}
}
@@ -404,9 +404,11 @@ func fuzzTestIndexOpen(originalData []byte) {
return
}
cnt := 0
_ = ndx.Iterate(AllIDs, func(cb InfoReader) error {
_ = ndx.Iterate(AllIDs, func(cb Info) error {
if cnt < 10 {
_, _ = ndx.GetInfo(cb.GetContentID())
var tmp Info
_, _ = ndx.GetInfo(cb.ContentID, &tmp)
}
cnt++
return nil
@@ -506,7 +508,7 @@ func verifyAllShardedIDs(t *testing.T, sharded []Builder, numTotal, numShards in
lens = append(lens, len(s))
for _, v := range s {
delete(m, v.GetContentID())
delete(m, v.ContentID)
}
}

View File

@@ -540,9 +540,9 @@ func (m *ManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOption
m.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
for _, i := range bld {
if i.GetDeleted() && i.Timestamp().Before(opt.DropDeletedBefore) {
m.log.Debugf("drop-from-index-old-deleted %v %v", i.GetContentID(), i.Timestamp())
delete(bld, i.GetContentID())
if i.Deleted && i.Timestamp().Before(opt.DropDeletedBefore) {
m.log.Debugf("drop-from-index-old-deleted %v %v", i.ContentID, i.Timestamp())
delete(bld, i.ContentID)
}
}
@@ -564,8 +564,8 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld ind
return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
}
_ = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
bld.Add(index.ToInfoStruct(i))
_ = ndx.Iterate(index.AllIDs, func(i index.Info) error {
bld.Add(i)
return nil
})

View File

@@ -78,32 +78,32 @@ func RewriteContents(ctx context.Context, rep repo.DirectRepositoryWriter, opt *
}
var optDeleted string
if c.GetDeleted() {
if c.Deleted {
optDeleted = " (deleted)"
}
age := rep.Time().Sub(c.Timestamp())
if age < safety.RewriteMinAge {
log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.GetContentID(), c.GetPackedLength(), c.GetPackBlobID(), optDeleted, age)
log(ctx).Debugf("Not rewriting content %v (%v bytes) from pack %v%v %v, because it's too new.", c.ContentID, c.PackedLength, c.PackBlobID, optDeleted, age)
continue
}
log(ctx).Debugf("Rewriting content %v (%v bytes) from pack %v%v %v", c.GetContentID(), c.GetPackedLength(), c.GetPackBlobID(), optDeleted, age)
log(ctx).Debugf("Rewriting content %v (%v bytes) from pack %v%v %v", c.ContentID, c.PackedLength, c.PackBlobID, optDeleted, age)
mu.Lock()
totalBytes += int64(c.GetPackedLength())
totalBytes += int64(c.PackedLength)
mu.Unlock()
if opt.DryRun {
continue
}
if err := rep.ContentManager().RewriteContent(ctx, c.GetContentID()); err != nil {
if err := rep.ContentManager().RewriteContent(ctx, c.ContentID); err != nil {
// provide option to ignore failures when rewriting deleted contents during maintenance
// this is for advanced use only
if os.Getenv("KOPIA_IGNORE_MAINTENANCE_REWRITE_ERROR") != "" && c.GetDeleted() {
log(ctx).Infof("IGNORED: unable to rewrite deleted content %q: %v", c.GetContentID(), err)
if os.Getenv("KOPIA_IGNORE_MAINTENANCE_REWRITE_ERROR") != "" && c.Deleted {
log(ctx).Infof("IGNORED: unable to rewrite deleted content %q: %v", c.ContentID, err)
} else {
log(ctx).Infof("unable to rewrite content %q: %v", c.GetContentID(), err)
log(ctx).Infof("unable to rewrite content %q: %v", c.ContentID, err)
mu.Lock()
failedCount++
mu.Unlock()
@@ -171,7 +171,7 @@ func findContentWithFormatVersion(ctx context.Context, rep repo.DirectRepository
IncludeDeleted: true,
},
func(b content.Info) error {
if int(b.GetFormatVersion()) == opt.FormatVersion && strings.HasPrefix(string(b.GetPackBlobID()), string(opt.PackPrefix)) {
if int(b.FormatVersion) == opt.FormatVersion && strings.HasPrefix(string(b.PackBlobID), string(opt.PackPrefix)) {
ch <- contentInfoOrError{Info: b}
}

View File

@@ -105,7 +105,7 @@ func verifyContentDeletedState(ctx context.Context, t *testing.T, rep repo.Repos
info, err := rep.ContentInfo(ctx, cid)
require.NoError(t, err)
require.Equal(t, want, info.GetDeleted())
require.Equal(t, want, info.Deleted)
}
func verifyObjectReadable(ctx context.Context, t *testing.T, rep repo.Repository, objectID object.ID) {

View File

@@ -148,12 +148,12 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
Range: index.PrefixRange(ContentPrefix),
Parallel: manifestLoadParallelism,
}, func(ci content.Info) error {
man, err := loadManifestContent(ctx, m.b, ci.GetContentID())
man, err := loadManifestContent(ctx, m.b, ci.ContentID)
if err != nil {
// this can be used to allow corrupted repositories to still open and see the
// (incomplete) list of manifests.
if os.Getenv("KOPIA_IGNORE_MALFORMED_MANIFEST_CONTENTS") != "" {
log(ctx).Warnf("ignoring malformed manifest content %v: %v", ci.GetContentID(), err)
log(ctx).Warnf("ignoring malformed manifest content %v: %v", ci.ContentID, err)
return nil
}
@@ -162,7 +162,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
}
mu.Lock()
manifests[ci.GetContentID()] = man
manifests[ci.ContentID] = man
mu.Unlock()
return nil

View File

@@ -64,12 +64,12 @@ func CalculateStorageStats(ctx context.Context, rep repo.Repository, manifests [
return errors.Wrapf(err, "error getting content info for %v", cid)
}
l := int64(info.GetOriginalLength())
l := int64(info.OriginalLength)
atomic.AddInt64(&unique.OriginalContentBytes, l)
atomic.AddInt64(&runningTotal.OriginalContentBytes, l)
l2 := int64(info.GetPackedLength())
l2 := int64(info.PackedLength)
atomic.AddInt64(&unique.PackedContentBytes, l2)
atomic.AddInt64(&runningTotal.PackedContentBytes, l2)

View File

@@ -76,8 +76,8 @@ func (v *Verifier) VerifyFile(ctx context.Context, oid object.ID, entryPath stri
return errors.Wrapf(err, "error verifying content %v", cid)
}
if _, ok := v.blobMap[ci.GetPackBlobID()]; !ok {
return errors.Errorf("object %v is backed by missing blob %v", oid, ci.GetPackBlobID())
if _, ok := v.blobMap[ci.PackBlobID]; !ok {
return errors.Errorf("object %v is backed by missing blob %v", oid, ci.PackBlobID)
}
}
}

View File

@@ -116,39 +116,39 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete
// Ensure that the iteration includes deleted contents, so those can be
// undeleted (recovered).
err := rep.ContentReader().IterateContents(ctx, content.IterateOptions{IncludeDeleted: true}, func(ci content.Info) error {
if manifest.ContentPrefix == ci.GetContentID().Prefix() {
system.Add(int64(ci.GetPackedLength()))
if manifest.ContentPrefix == ci.ContentID.Prefix() {
system.Add(int64(ci.PackedLength))
return nil
}
var cidbuf [128]byte
if used.Contains(ci.GetContentID().Append(cidbuf[:0])) {
if ci.GetDeleted() {
if err := rep.ContentManager().UndeleteContent(ctx, ci.GetContentID()); err != nil {
if used.Contains(ci.ContentID.Append(cidbuf[:0])) {
if ci.Deleted {
if err := rep.ContentManager().UndeleteContent(ctx, ci.ContentID); err != nil {
return errors.Wrapf(err, "Could not undelete referenced content: %v", ci)
}
undeleted.Add(int64(ci.GetPackedLength()))
undeleted.Add(int64(ci.PackedLength))
}
inUse.Add(int64(ci.GetPackedLength()))
inUse.Add(int64(ci.PackedLength))
return nil
}
if maintenanceStartTime.Sub(ci.Timestamp()) < safety.MinContentAgeSubjectToGC {
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.GetContentID(), ci.GetPackedLength(), ci.Timestamp())
tooRecent.Add(int64(ci.GetPackedLength()))
log(ctx).Debugf("recent unreferenced content %v (%v bytes, modified %v)", ci.ContentID, ci.PackedLength, ci.Timestamp())
tooRecent.Add(int64(ci.PackedLength))
return nil
}
log(ctx).Debugf("unreferenced %v (%v bytes, modified %v)", ci.GetContentID(), ci.GetPackedLength(), ci.Timestamp())
cnt, totalSize := unused.Add(int64(ci.GetPackedLength()))
log(ctx).Debugf("unreferenced %v (%v bytes, modified %v)", ci.ContentID, ci.PackedLength, ci.Timestamp())
cnt, totalSize := unused.Add(int64(ci.PackedLength))
if gcDelete {
if err := rep.ContentManager().DeleteContent(ctx, ci.GetContentID()); err != nil {
if err := rep.ContentManager().DeleteContent(ctx, ci.ContentID); err != nil {
return errors.Wrap(err, "error deleting content")
}
}

View File

@@ -134,7 +134,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceReuseDirManifest(t *testing.T)
info, err := r2.(repo.DirectRepository).ContentInfo(ctx, mustGetContentID(t, s2.RootObjectID()))
require.NoError(t, err)
require.False(t, info.GetDeleted(), "content must not be deleted")
require.False(t, info.Deleted, "content must not be deleted")
_, err = r2.VerifyObject(ctx, s2.RootObjectID())
require.NoError(t, err)
@@ -148,7 +148,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceReuseDirManifest(t *testing.T)
info, err = th.RepositoryWriter.ContentInfo(ctx, mustGetContentID(t, s2.RootObjectID()))
require.NoError(t, err)
require.True(t, info.GetDeleted(), "content must be deleted")
require.True(t, info.Deleted, "content must be deleted")
_, err = th.RepositoryWriter.VerifyObject(ctx, s2.RootObjectID())
require.NoError(t, err)
@@ -162,7 +162,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceReuseDirManifest(t *testing.T)
// Was the previous root undeleted
info, err = th.RepositoryWriter.ContentInfo(ctx, mustGetContentID(t, s2.RootObjectID()))
require.NoError(t, err)
require.False(t, info.GetDeleted(), "content must not be deleted")
require.False(t, info.Deleted, "content must not be deleted")
_, err = th.RepositoryWriter.VerifyObject(ctx, s2.RootObjectID())
require.NoError(t, err)
@@ -426,6 +426,6 @@ func checkContentDeletion(t *testing.T, r repo.Repository, cids []content.ID, de
ci, err := r.ContentInfo(ctx, cid)
require.NoErrorf(t, err, "i:%d cid:%s", i, cid)
require.Equalf(t, deleted, ci.GetDeleted(), "i:%d cid:%s", i, cid)
require.Equalf(t, deleted, ci.Deleted, "i:%d cid:%s", i, cid)
}
}

View File

@@ -31,7 +31,7 @@ func TestDefaultGlobalPolicy(t *testing.T) {
t.Fatalf("unexpected number of contents %v, want %v", got, want)
}
globalPolicyContentID := contents[0].GetContentID()
globalPolicyContentID := contents[0].ContentID
e.RunAndExpectSuccess(t, "content", "show", "-jz", globalPolicyContentID.String())
// make sure the policy is visible in the manifest list

View File

@@ -1,93 +0,0 @@
package repomodel
import (
"math/rand"
"sync"
"github.com/kopia/kopia/repo/content"
)
// ContentSet represents a set of contents.
type ContentSet struct {
mu sync.Mutex
ids []content.ID
}
// PickRandom picks one random content from the set or empty string.
func (s *ContentSet) PickRandom() content.ID {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.ids) == 0 {
return content.EmptyID
}
//nolint:gosec
return s.ids[rand.Intn(len(s.ids))]
}
// Snapshot returns the snapshot of all IDs.
func (s *ContentSet) Snapshot() ContentSet {
s.mu.Lock()
defer s.mu.Unlock()
return ContentSet{
ids: append([]content.ID(nil), s.ids...),
}
}
// Replace replaces all elements in the set.
func (s *ContentSet) Replace(ids []content.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append([]content.ID(nil), s.ids...)
}
// Add adds the provided items to the set.
func (s *ContentSet) Add(d ...content.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append(s.ids, d...)
}
// RemoveAll removes the provided items from the set.
func (s *ContentSet) RemoveAll(d ...content.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = removeAllContentIDs(s.ids, d)
}
func removeAllContentIDs(a, b []content.ID) []content.ID {
var result []content.ID
for _, v := range a {
found := false
for _, v2 := range b {
if v2 == v {
found = true
break
}
}
if !found {
result = append(result, v)
}
}
return result
}
// Clear removes all elements from the set.
func (s *ContentSet) Clear() ContentSet {
s.mu.Lock()
defer s.mu.Unlock()
old := s.ids
s.ids = nil
return ContentSet{ids: old}
}

View File

@@ -1,93 +0,0 @@
package repomodel
import (
"math/rand"
"sync"
"github.com/kopia/kopia/repo/manifest"
)
// ManifestSet represents a set of manifests.
type ManifestSet struct {
mu sync.Mutex
ids []manifest.ID
}
// PickRandom picks one random manifest from the set or empty string.
func (s *ManifestSet) PickRandom() manifest.ID {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.ids) == 0 {
return ""
}
//nolint:gosec
return s.ids[rand.Intn(len(s.ids))]
}
// Snapshot returns the snapshot of all IDs.
func (s *ManifestSet) Snapshot() ManifestSet {
s.mu.Lock()
defer s.mu.Unlock()
return ManifestSet{
ids: append([]manifest.ID(nil), s.ids...),
}
}
// Replace replaces all elements in the set.
func (s *ManifestSet) Replace(ids []manifest.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append([]manifest.ID(nil), s.ids...)
}
// Add adds the provided items to the set.
func (s *ManifestSet) Add(d ...manifest.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = append(s.ids, d...)
}
// RemoveAll removes the provided items from the set.
func (s *ManifestSet) RemoveAll(d ...manifest.ID) {
s.mu.Lock()
defer s.mu.Unlock()
s.ids = removeAllManifestIDs(s.ids, d)
}
func removeAllManifestIDs(a, b []manifest.ID) []manifest.ID {
var result []manifest.ID
for _, v := range a {
found := false
for _, v2 := range b {
if v2 == v {
found = true
break
}
}
if !found {
result = append(result, v)
}
}
return result
}
// Clear removes all elements from the set.
func (s *ManifestSet) Clear() ManifestSet {
s.mu.Lock()
defer s.mu.Unlock()
old := s.ids
s.ids = nil
return ManifestSet{ids: old}
}

View File

@@ -1,28 +1,40 @@
package repomodel
import "sync"
import (
"context"
"sync"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/manifest"
)
var log = logging.Module("repomodel") // +checklocksignore
// OpenRepository models the behavior of an open repository.
type OpenRepository struct {
RepoData *RepositoryData
mu sync.Mutex
Contents ContentSet
Manifests ManifestSet
RepoData *RepositoryData // +checklocksignore
ReadableContents *TrackingSet[content.ID] // +checklocksignore
ReadableManifests *TrackingSet[manifest.ID] // +checklocksignore
EnableMaintenance bool
mu sync.Mutex
openID string
}
// Refresh refreshes the set of committed contents and manifests from the repository.
func (o *OpenRepository) Refresh() {
o.Contents.Replace(o.RepoData.Contents.Snapshot().ids)
o.Manifests.Replace(o.RepoData.Manifests.Snapshot().ids)
func (o *OpenRepository) Refresh(ctx context.Context, cids *TrackingSet[content.ID], mids *TrackingSet[manifest.ID]) {
o.ReadableContents.Replace(ctx, cids.ids)
o.ReadableManifests.Replace(ctx, mids.ids)
}
// NewSession creates new model for a session to access a repository.
func (o *OpenRepository) NewSession() *RepositorySession {
func (o *OpenRepository) NewSession(sessionID string) *RepositorySession {
return &RepositorySession{
OpenRepo: o,
OpenRepo: o,
WrittenContents: NewChangeSet[content.ID](o.openID + "-written-" + sessionID),
WrittenManifests: NewChangeSet[manifest.ID](o.openID + "-written-" + sessionID),
}
}
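A brief sketch of the intended lifecycle under the new IDs (the open and session names here are arbitrary placeholders):

// Sketch: one modeled open repository serving two concurrent sessions.
func exampleSessions() {
	d := NewRepositoryData()

	or := d.OpenRepository("open-0") // first open gets EnableMaintenance == true

	s0 := or.NewSession("s0")
	s1 := or.NewSession("s1")
	_, _ = s0, s1
}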

View File

@@ -1,30 +1,39 @@
// Package repomodel provides a simplified model of repository operation.
package repomodel
import "sync/atomic"
import (
"sync/atomic"
// RepositoryData models the d stored in the repository.
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
)
// RepositoryData models the data stored in the repository.
type RepositoryData struct {
Contents ContentSet
Manifests ManifestSet
CommittedContents *TrackingSet[content.ID]
CommittedManifests *TrackingSet[manifest.ID]
openCounter *int32
}
// OpenRepository returns an OpenRepository model based on current snapshot of RepositoryData.
func (d *RepositoryData) OpenRepository() *OpenRepository {
func (d *RepositoryData) OpenRepository(openID string) *OpenRepository {
return &OpenRepository{
RepoData: d,
Contents: d.Contents.Snapshot(),
Manifests: d.Manifests.Snapshot(),
ReadableContents: d.CommittedContents.Snapshot(openID + "-contents"),
ReadableManifests: d.CommittedManifests.Snapshot(openID + "-manifests"),
EnableMaintenance: atomic.AddInt32(d.openCounter, 1) == 1,
openID: openID,
}
}
// NewRepositoryData creates a new RepositoryData model.
func NewRepositoryData() *RepositoryData {
return &RepositoryData{
openCounter: new(int32),
openCounter: new(int32),
CommittedContents: NewChangeSet[content.ID]("committed-contents"),
CommittedManifests: NewChangeSet[manifest.ID]("committed-manifests"),
}
}

View File

@@ -1,6 +1,8 @@
package repomodel
import (
"context"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
)
@@ -9,39 +11,39 @@
type RepositorySession struct {
OpenRepo *OpenRepository
WrittenContents ContentSet
WrittenManifests ManifestSet
WrittenContents *TrackingSet[content.ID]
WrittenManifests *TrackingSet[manifest.ID]
}
// WriteContent adds the provided content ID to the model.
func (s *RepositorySession) WriteContent(cid content.ID) {
s.WrittenContents.Add(cid)
func (s *RepositorySession) WriteContent(ctx context.Context, cid content.ID) {
s.WrittenContents.Add(ctx, cid)
}
// WriteManifest adds the provided manifest ID to the model.
func (s *RepositorySession) WriteManifest(mid manifest.ID) {
s.WrittenManifests.Add(mid)
func (s *RepositorySession) WriteManifest(ctx context.Context, mid manifest.ID) {
s.WrittenManifests.Add(ctx, mid)
}
// Refresh refreshes the set of committed contents and manifests from the repository.
func (s *RepositorySession) Refresh() {
s.OpenRepo.Refresh()
func (s *RepositorySession) Refresh(ctx context.Context, cids *TrackingSet[content.ID], mids *TrackingSet[manifest.ID]) {
s.OpenRepo.Refresh(ctx, cids, mids)
}
// Flush flushes the changes written in this RepositorySession and makes them available
// to other sessions via the shared RepositoryData model.
func (s *RepositorySession) Flush(wc *ContentSet, wm *ManifestSet) {
func (s *RepositorySession) Flush(ctx context.Context, wc *TrackingSet[content.ID], wm *TrackingSet[manifest.ID]) {
s.OpenRepo.mu.Lock()
defer s.OpenRepo.mu.Unlock()
// data flushed is visible to other sessions in the same open repository.
s.OpenRepo.Contents.Add(wc.ids...)
s.OpenRepo.Manifests.Add(wm.ids...)
s.OpenRepo.ReadableContents.Add(ctx, wc.ids...)
s.OpenRepo.ReadableManifests.Add(ctx, wm.ids...)
// data flushed is visible to other sessions in other open repositories.
s.OpenRepo.RepoData.Contents.Add(wc.ids...)
s.OpenRepo.RepoData.Manifests.Add(wm.ids...)
s.OpenRepo.RepoData.CommittedContents.Add(ctx, wc.ids...)
s.OpenRepo.RepoData.CommittedManifests.Add(ctx, wm.ids...)
s.WrittenContents.RemoveAll(wc.ids...)
s.WrittenManifests.RemoveAll(wm.ids...)
s.WrittenContents.RemoveAll(ctx, wc.ids...)
s.WrittenManifests.RemoveAll(ctx, wm.ids...)
}

View File

@@ -0,0 +1,112 @@
package repomodel
import (
"context"
"math/rand"
"slices"
"sync"
)
// TrackingSet represents a set of items with built-in locking and debug logging.
type TrackingSet[T comparable] struct {
mu sync.Mutex
ids []T // +checklocksignore
setID string // +checklocksignore
}
// PickRandom picks one random item from the set, or the zero value if the set is empty.
func (s *TrackingSet[T]) PickRandom(ctx context.Context) T {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.ids) == 0 {
var defT T
return defT
}
//nolint:gosec
picked := s.ids[rand.Intn(len(s.ids))]
log(ctx).Debugw("picked random", "setID", s.setID, "picked", picked)
return picked
}
// Snapshot returns the snapshot of all IDs.
func (s *TrackingSet[T]) Snapshot(name string) *TrackingSet[T] {
s.mu.Lock()
defer s.mu.Unlock()
return &TrackingSet[T]{
ids: append([]T(nil), s.ids...),
setID: name,
}
}
// Replace replaces all elements in the set.
func (s *TrackingSet[T]) Replace(ctx context.Context, ids []T) {
s.mu.Lock()
defer s.mu.Unlock()
log(ctx).Debugw("replacing set", "setID", s.setID, "ids", ids)
s.ids = append([]T(nil), ids...)
}
// Add adds the provided items to the set.
func (s *TrackingSet[T]) Add(ctx context.Context, d ...T) {
if len(d) == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
log(ctx).Debugw("adding to set", "setID", s.setID, "ids", d)
s.ids = append(s.ids, d...)
}
// RemoveAll removes the provided items from the set.
func (s *TrackingSet[T]) RemoveAll(ctx context.Context, d ...T) {
if len(d) == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
log(ctx).Debugw("removing from set", "setID", s.setID, "ids", d)
s.ids = removeAll(s.ids, d)
}
func removeAll[T comparable](original, toRemove []T) []T {
var result []T
for _, v := range original {
if !slices.Contains(toRemove, v) {
result = append(result, v)
}
}
return result
}
// Clear removes all elements from the set.
func (s *TrackingSet[T]) Clear(ctx context.Context) TrackingSet[T] {
s.mu.Lock()
defer s.mu.Unlock()
old := s.ids
s.ids = nil
log(ctx).Debugw("clearing set", "setID", s.setID, "was", old)
return TrackingSet[T]{ids: old}
}
// NewChangeSet creates new tracking set.
func NewChangeSet[T comparable](setID string) *TrackingSet[T] {
return &TrackingSet[T]{setID: setID}
}
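A usage sketch of the generic set with string IDs (set names and items are placeholders; a context.Context is assumed to be available):

// Sketch: snapshot-then-remove, mirroring how Flush consumes written sets.
func exampleTrackingSet(ctx context.Context) {
	written := NewChangeSet[string]("example-written")
	written.Add(ctx, "c1", "c2")

	// Snapshot freezes membership so concurrent Adds cannot leak into
	// the batch being flushed.
	snap := written.Snapshot("example-flush")
	written.RemoveAll(ctx, snap.ids...)
}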

View File

@@ -257,16 +257,18 @@ func runStress(t *testing.T, opt *StressOptions) {
for _, configFile := range configFiles {
for i := range opt.OpenRepositoriesPerConfig {
openID := fmt.Sprintf("open-%v", i)
eg.Go(func() error {
log := testlogging.Printf(func(msg string, args ...interface{}) {
fmt.Fprintf(logFile, clock.Now().Format("2006-01-02T15:04:05.000000Z07:00")+" "+msg+"\n", args...)
}, "").With("worker", fmt.Sprintf("%v::o%v", filepath.Base(configFile), i))
}, "").With("cfg", fmt.Sprintf("%v::o%v", filepath.Base(configFile), i))
ctx2 := logging.WithLogger(ctx, func(module string) logging.Logger {
return log
})
return longLivedRepositoryTest(ctx2, t, configFile, rm, log, opt, &stop)
return longLivedRepositoryTest(ctx2, t, openID, configFile, rm, log, opt, &stop)
})
}
}
@@ -282,12 +284,12 @@ func runStress(t *testing.T, opt *StressOptions) {
require.NoError(t, eg.Wait())
}
func longLivedRepositoryTest(ctx context.Context, t *testing.T, configFile string, rm *repomodel.RepositoryData, log logging.Logger, opt *StressOptions, stop *atomic.Bool) error {
func longLivedRepositoryTest(ctx context.Context, t *testing.T, openID, configFile string, rm *repomodel.RepositoryData, log logging.Logger, opt *StressOptions, stop *atomic.Bool) error {
t.Helper()
// important to call OpenRepository() before repo.Open() to ensure we're not seeing state
// added between repo.Open() and OpenRepository()
or := rm.OpenRepository()
or := rm.OpenRepository(openID)
rep, err := repo.Open(ctx, configFile, masterPassword, &repo.Options{})
if err != nil {
@@ -299,7 +301,7 @@ func longLivedRepositoryTest(ctx context.Context, t *testing.T, configFile strin
eg, ctx := errgroup.WithContext(ctx)
for i := range opt.SessionsPerOpenRepository {
ors := or.NewSession()
ors := or.NewSession(fmt.Sprintf("session-%v", i))
_, w, err := rep.(repo.DirectRepository).NewDirectWriter(ctx, repo.WriteSessionOptions{
Purpose: fmt.Sprintf("longLivedRepositoryTest-w%v", i),
@@ -365,13 +367,13 @@ func writeRandomContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *
log.Debugf("writeRandomContent(%v,%x)", contentID, data[0:16])
rs.WriteContent(contentID)
rs.WriteContent(ctx, contentID)
return errors.Wrapf(err, "writeRandomContent(%v)", contentID)
}
func readPendingContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
contentID := rs.WrittenContents.PickRandom()
contentID := rs.WrittenContents.PickRandom(ctx)
if contentID == content.EmptyID {
return errSkipped
}
@@ -387,7 +389,7 @@ func readPendingContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *
}
func readFlushedContent(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
contentID := rs.OpenRepo.Contents.PickRandom()
contentID := rs.OpenRepo.ReadableContents.PickRandom(ctx)
if contentID == content.EmptyID {
return errSkipped
}
@@ -419,7 +421,7 @@ func listAndReadAllContents(ctx context.Context, r repo.DirectRepositoryWriter,
ctx,
content.IterateOptions{},
func(ci content.Info) error {
cid := ci.GetContentID()
cid := ci.ContentID
_, err := r.ContentReader().GetContent(ctx, cid)
if err != nil {
return errors.Wrapf(err, "error reading content %v", cid)
@@ -448,8 +450,8 @@ func flush(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.Rep
// this is necessary since operations can proceed in parallel to Flush() which might add more data
// to the model. It would be incorrect to flush the latest state of the model
// because we don't know for sure if the corresponding repository data has indeed been flushed.
wc := rs.WrittenContents.Snapshot()
wm := rs.WrittenManifests.Snapshot()
wc := rs.WrittenContents.Snapshot("")
wm := rs.WrittenManifests.Snapshot("")
if err := r.Flush(ctx); err != nil {
return errors.Wrap(err, "error flushing")
@@ -457,7 +459,7 @@ func flush(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.Rep
// flush model after flushing the repository to communicate to other sessions that they can expect
// to see flushed items now.
rs.Flush(&wc, &wm)
rs.Flush(ctx, wc, wm)
return nil
}
@@ -467,17 +469,20 @@ func refresh(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.R
// refresh model before refreshing repository to guarantee that repository has at least all the items in
// the model (possibly more).
rs.Refresh()
cids := rs.OpenRepo.RepoData.CommittedContents.Snapshot("")
mids := rs.OpenRepo.RepoData.CommittedManifests.Snapshot("")
if err := r.Refresh(ctx); err != nil {
return errors.Wrap(err, "refresh error")
}
rs.Refresh(ctx, cids, mids)
return nil
}
func readPendingManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
manifestID := rs.WrittenManifests.PickRandom()
manifestID := rs.WrittenManifests.PickRandom(ctx)
if manifestID == "" {
return errSkipped
}
@@ -493,7 +498,7 @@ func readPendingManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs
}
func readFlushedManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs *repomodel.RepositorySession, log logging.Logger) error {
manifestID := rs.OpenRepo.Manifests.PickRandom()
manifestID := rs.OpenRepo.ReadableManifests.PickRandom(ctx)
if manifestID == "" {
return errSkipped
}
@@ -531,7 +536,7 @@ func writeRandomManifest(ctx context.Context, r repo.DirectRepositoryWriter, rs
}
log.Debugf("writeRandomManifest(%v)", mid)
rs.WriteManifest(mid)
rs.WriteManifest(ctx, mid)
return err
}