mirror of https://github.com/kopia/kopia.git (synced 2026-05-11 00:04:46 -04:00)
fix(repository): fixed handling of content.Info (#3356)
* fix(repository): fixed handling of content.Info

  Previously content.Info was an interface which was implemented by:

  * index.InfoStruct
  * index.indexEntryInfoV1
  * index.indexEntryInfoV2

  The last two implementations relied on memory-mapped files which in rare cases could be closed while Kopia was still processing them, leading to #2599. This change fixes the bug and strictly separates content.Info (which is now always a struct) from the other two (which were renamed to index.InfoReader and are only used inside repo/content/...).

  In addition to being safer, this _should_ reduce memory allocations.

* reduce the size of content.Info with proper alignment.
* pr feedback
* renamed index.InfoStruct to index.Info
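At the core of the change is turning content.Info from an interface into a plain value type. A minimal standalone sketch of the before/after shape (simplified toy types, not the real kopia declarations):

package main

import "fmt"

type ID string
type BlobID string

// Before: Info was an interface. The index implementations satisfied it by
// decoding fields lazily out of a memory-mapped index blob, so a retained
// Info could outlive the mapping backing it (#2599).
type InfoReader interface {
	GetContentID() ID
	GetPackBlobID() BlobID
	GetDeleted() bool
}

// After: Info is a struct. Converting an InfoReader to Info copies every
// field, so the value stays valid after the index file is closed.
type Info struct {
	ContentID  ID
	PackBlobID BlobID
	Deleted    bool
}

func ToInfoStruct(r InfoReader) Info {
	return Info{
		ContentID:  r.GetContentID(),
		PackBlobID: r.GetPackBlobID(),
		Deleted:    r.GetDeleted(),
	}
}

type sampleReader struct{}

func (sampleReader) GetContentID() ID      { return "aabbcc" }
func (sampleReader) GetPackBlobID() BlobID { return "p1234" }
func (sampleReader) GetDeleted() bool      { return false }

func main() {
	fmt.Println(ToInfoStruct(sampleReader{})) // {aabbcc p1234 false}
}

The diff below applies this split across the CLI, the repo/content packages, and the tests.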
@@ -169,7 +169,7 @@ func (c *commandIndexInspect) inspectSingleIndexBlob(ctx context.Context, rep re
 	}

 	for _, ent := range entries {
-		output <- indexBlobPlusContentInfo{bm, content.ToInfoStruct(ent)}
+		output <- indexBlobPlusContentInfo{bm, ent}
 	}

 	return nil

@@ -13,7 +13,6 @@
 	"github.com/kopia/kopia/internal/gather"
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/content"
-	"github.com/kopia/kopia/repo/content/index"
 	"github.com/kopia/kopia/repo/content/indexblob"
 	"github.com/kopia/kopia/repo/format"
 )
@@ -92,14 +91,14 @@ func (c *commandRepositoryUpgrade) setup(svc advancedAppServices, parent command
 }

 // assign store the info struct in a map that can be used to compare indexes.
-func assign(iif content.Info, i int, m map[content.ID][2]index.Info) {
+func assign(iif content.Info, i int, m map[content.ID][2]content.Info) {
 	v := m[iif.GetContentID()]
 	v[i] = iif
 	m[iif.GetContentID()] = v
 }

 // loadIndexBlobs load index blobs into indexEntries map. indexEntries map will allow comparison betweel two indexes (index at which == 0 and index at which == 1).
-func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]index.Info, sm *content.SharedManager, which int, indexBlobInfos []indexblob.Metadata) error {
+func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]content.Info, sm *content.SharedManager, which int, indexBlobInfos []indexblob.Metadata) error {
 	d := gather.WriteBuffer{}

 	for _, indexBlobInfo := range indexBlobInfos {

@@ -121,7 +120,7 @@ func loadIndexBlobs(ctx context.Context, indexEntries map[content.ID][2]index.In
 // validateAction returns an error if the new V1 index blob content does not match the source V0 index blob content.
 // This is used to check that the upgraded index (V1 index) reflects the content of the old V0 index.
 func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.DirectRepositoryWriter) error {
-	indexEntries := map[content.ID][2]index.Info{}
+	indexEntries := map[content.ID][2]content.Info{}

 	sm := rep.ContentManager().SharedManager
@@ -155,20 +154,23 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
 	var msgs []string // a place to keep messages from the index comparison process

+	var zeroInfo content.Info
+
 	// both indexes will have matching contentiDs with matching indexInfo structures.
 	//nolint:gocritic
 	for contentID, indexEntryPairs := range indexEntries {
 		iep0 := indexEntryPairs[0] // first entry of index entry pair
 		iep1 := indexEntryPairs[1] // second entry of index entry pair

 		// check that both the new and old indexes have entries for the same content
-		if iep0 != nil && iep1 != nil {
+		if iep0 != zeroInfo && iep1 != zeroInfo {
 			// this is the happy-path, check the entries. any problems found will be added to msgs
 			msgs = append(msgs, CheckIndexInfo(iep0, iep1)...)
 			continue
 		}

 		// one of iep0 or iep1 are nil .. find out which one and add an appropriate message.
-		if iep0 != nil {
+		if iep0 != zeroInfo {
 			msgs = append(msgs, fmt.Sprintf("lop-sided index entries for contentID %q at blob %q", contentID, iep0.GetPackBlobID()))
 			continue
 		}

@@ -194,7 +196,7 @@ func (c *commandRepositoryUpgrade) validateAction(ctx context.Context, rep repo.
 }

 // CheckIndexInfo compare two index infos. If a mismatch exists, return an error with diagnostic information.
-func CheckIndexInfo(i0, i1 index.Info) []string {
+func CheckIndexInfo(i0, i1 content.Info) []string {
 	var q []string

 	switch {
@@ -8,7 +8,7 @@
 	"github.com/stretchr/testify/require"

 	"github.com/kopia/kopia/cli"
-	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/content"
 	"github.com/kopia/kopia/repo/format"
 	"github.com/kopia/kopia/tests/testenv"
 )
@@ -298,120 +298,120 @@ func (s *formatSpecificTestSuite) TestRepositoryUpgradeStatusWhileLocked(t *test

 func TestRepositoryUpgrade_checkIndexInfo(t *testing.T) {
 	tcs := []struct {
-		indexInfo0   index.Info
-		indexInfo1   index.Info
+		indexInfo0   content.Info
+		indexInfo1   content.Info
 		expectRegexs []string
 	}{
 		{
-			indexInfo0:   &index.InfoStruct{PackBlobID: "a"},
-			indexInfo1:   &index.InfoStruct{PackBlobID: "a"},
+			indexInfo0:   content.Info{PackBlobID: "a"},
+			indexInfo1:   content.Info{PackBlobID: "a"},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{PackBlobID: "a"},
-			indexInfo1: &index.InfoStruct{PackBlobID: "b"},
+			indexInfo0: content.Info{PackBlobID: "a"},
+			indexInfo1: content.Info{PackBlobID: "b"},
 			expectRegexs: []string{
 				`do not match: "a", "b".*PackBlobID`,
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{TimestampSeconds: 1},
-			indexInfo1:   &index.InfoStruct{TimestampSeconds: 1},
+			indexInfo0:   content.Info{TimestampSeconds: 1},
+			indexInfo1:   content.Info{TimestampSeconds: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{TimestampSeconds: 1},
-			indexInfo1: &index.InfoStruct{TimestampSeconds: 2},
+			indexInfo0: content.Info{TimestampSeconds: 1},
+			indexInfo1: content.Info{TimestampSeconds: 2},
 			expectRegexs: []string{
 				"do not match.*TimestampSeconds",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{OriginalLength: 1},
-			indexInfo1:   &index.InfoStruct{OriginalLength: 1},
+			indexInfo0:   content.Info{OriginalLength: 1},
+			indexInfo1:   content.Info{OriginalLength: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{OriginalLength: 1},
-			indexInfo1: &index.InfoStruct{OriginalLength: 2},
+			indexInfo0: content.Info{OriginalLength: 1},
+			indexInfo1: content.Info{OriginalLength: 2},
 			expectRegexs: []string{
 				"do not match.*OriginalLength",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{PackedLength: 1},
-			indexInfo1:   &index.InfoStruct{PackedLength: 1},
+			indexInfo0:   content.Info{PackedLength: 1},
+			indexInfo1:   content.Info{PackedLength: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{PackedLength: 1},
-			indexInfo1: &index.InfoStruct{PackedLength: 2},
+			indexInfo0: content.Info{PackedLength: 1},
+			indexInfo1: content.Info{PackedLength: 2},
 			expectRegexs: []string{
 				"do not match.*PackedLength",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{PackOffset: 1},
-			indexInfo1:   &index.InfoStruct{PackOffset: 1},
+			indexInfo0:   content.Info{PackOffset: 1},
+			indexInfo1:   content.Info{PackOffset: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{PackOffset: 1},
-			indexInfo1: &index.InfoStruct{PackOffset: 2},
+			indexInfo0: content.Info{PackOffset: 1},
+			indexInfo1: content.Info{PackOffset: 2},
 			expectRegexs: []string{
 				"do not match.*PackOffset",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{Deleted: true},
-			indexInfo1:   &index.InfoStruct{Deleted: true},
+			indexInfo0:   content.Info{Deleted: true},
+			indexInfo1:   content.Info{Deleted: true},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{Deleted: false},
-			indexInfo1: &index.InfoStruct{Deleted: true},
+			indexInfo0: content.Info{Deleted: false},
+			indexInfo1: content.Info{Deleted: true},
 			expectRegexs: []string{
 				"do not match.*Deleted",
 			},
 		},
 		// simple logic error can make result of this false... so check
 		{
-			indexInfo0:   &index.InfoStruct{Deleted: false},
-			indexInfo1:   &index.InfoStruct{Deleted: false},
+			indexInfo0:   content.Info{Deleted: false},
+			indexInfo1:   content.Info{Deleted: false},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{FormatVersion: 1},
-			indexInfo1:   &index.InfoStruct{FormatVersion: 1},
+			indexInfo0:   content.Info{FormatVersion: 1},
+			indexInfo1:   content.Info{FormatVersion: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{FormatVersion: 1},
-			indexInfo1: &index.InfoStruct{FormatVersion: 2},
+			indexInfo0: content.Info{FormatVersion: 1},
+			indexInfo1: content.Info{FormatVersion: 2},
 			expectRegexs: []string{
 				"do not match.*FormatVersion",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{CompressionHeaderID: 1},
-			indexInfo1:   &index.InfoStruct{CompressionHeaderID: 1},
+			indexInfo0:   content.Info{CompressionHeaderID: 1},
+			indexInfo1:   content.Info{CompressionHeaderID: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{CompressionHeaderID: 1},
-			indexInfo1: &index.InfoStruct{CompressionHeaderID: 2},
+			indexInfo0: content.Info{CompressionHeaderID: 1},
+			indexInfo1: content.Info{CompressionHeaderID: 2},
 			expectRegexs: []string{
 				"do not match.*CompressionHeaderID",
 			},
 		},
 		{
-			indexInfo0:   &index.InfoStruct{EncryptionKeyID: 1},
-			indexInfo1:   &index.InfoStruct{EncryptionKeyID: 1},
+			indexInfo0:   content.Info{EncryptionKeyID: 1},
+			indexInfo1:   content.Info{EncryptionKeyID: 1},
 			expectRegexs: []string{},
 		},
 		{
-			indexInfo0: &index.InfoStruct{EncryptionKeyID: 1},
-			indexInfo1: &index.InfoStruct{EncryptionKeyID: 2},
+			indexInfo0: content.Info{EncryptionKeyID: 1},
+			indexInfo1: content.Info{EncryptionKeyID: 2},
 			expectRegexs: []string{
 				"do not match.*EncryptionKeyID",
 			},
@@ -386,14 +386,14 @@ func forgetContents(t *testing.T, env *testenv.CLITest, contentIDs ...string) {
 	env.RunAndExpectSuccess(t, append([]string{"blob", "rm"}, blobIDs...)...)
 }

-func mustGetContentMap(t *testing.T, env *testenv.CLITest) map[content.ID]content.InfoStruct {
+func mustGetContentMap(t *testing.T, env *testenv.CLITest) map[content.ID]content.Info {
 	t.Helper()

-	var contents1 []content.InfoStruct
+	var contents1 []content.Info

 	testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "content", "ls", "--json"), &contents1)

-	contentMap := map[content.ID]content.InfoStruct{}
+	contentMap := map[content.ID]content.Info{}
 	for _, v := range contents1 {
 		contentMap[v.ContentID] = v
 	}
@@ -7,7 +7,6 @@

 	"github.com/alecthomas/kingpin/v2"

-	"github.com/kopia/kopia/repo/content"
 	"github.com/kopia/kopia/snapshot"
 )

@@ -54,8 +53,6 @@ func (c *jsonOutput) cleanupSnapshotManifestListForJSON(manifests []*snapshot.Ma

 func (c *jsonOutput) cleanupForJSON(v interface{}) interface{} {
 	switch v := v.(type) {
-	case content.Info:
-		return content.ToInfoStruct(v)
 	case *snapshot.Manifest:
 		return c.cleanupSnapshotManifestForJSON(v)
 	case []*snapshot.Manifest:
@@ -62,7 +62,7 @@ func InfoDiff(i1, i2 index.Info, ignore ...string) []string {
 	// dear future reader, if this fails because the number of methods has changed,
 	// you need to add additional verification above.
 	//nolint:gomnd
-	if cnt := reflect.TypeOf((*index.Info)(nil)).Elem().NumMethod(); cnt != 11 {
+	if cnt := reflect.TypeOf((*index.InfoReader)(nil)).Elem().NumMethod(); cnt != 11 {
 		diffs = append(diffs, fmt.Sprintf("unexpected number of methods on content.Info: %v, must update the test", cnt))
 	}
@@ -186,13 +186,13 @@ func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOpt
 }

 func (r *apiServerRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {
-	var bi content.InfoStruct
+	var bi content.Info

 	if err := r.cli.Get(ctx, "contents/"+contentID.String()+"?info=1", content.ErrContentNotFound, &bi); err != nil {
-		return nil, errors.Wrap(err, "ContentInfo")
+		return content.Info{}, errors.Wrap(err, "ContentInfo")
 	}

-	return &bi, nil
+	return bi, nil
 }

 func (r *apiServerRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
@@ -66,20 +66,20 @@ func (c *committedContentIndex) getContent(contentID ID) (Info, error) {
 	info, err := c.merged.GetInfo(contentID)
 	if info != nil {
 		if shouldIgnore(info, c.deletionWatermark) {
-			return nil, ErrContentNotFound
+			return index.Info{}, ErrContentNotFound
 		}

-		return info, nil
+		return index.ToInfoStruct(info), nil
 	}

 	if err == nil {
-		return nil, ErrContentNotFound
+		return index.Info{}, ErrContentNotFound
 	}

-	return nil, errors.Wrap(err, "error getting content info from index")
+	return index.Info{}, errors.Wrap(err, "error getting content info from index")
 }

-func shouldIgnore(id Info, deletionWatermark time.Time) bool {
+func shouldIgnore(id index.InfoReader, deletionWatermark time.Time) bool {
 	if !id.GetDeleted() {
 		return false
 	}

@@ -131,12 +131,12 @@ func (c *committedContentIndex) listContents(r IDRange, cb func(i Info) error) e
 	c.mu.RUnlock()

 	//nolint:wrapcheck
-	return m.Iterate(r, func(i Info) error {
+	return m.Iterate(r, func(i index.InfoReader) error {
 		if shouldIgnore(i, deletionWatermark) {
 			return nil
 		}

-		return cb(i)
+		return cb(index.ToInfoStruct(i))
 	})
 }
@@ -257,8 +257,8 @@ func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merge
 	b := index.Builder{}

 	for _, ndx := range toMerge {
-		if err := ndx.Iterate(index.AllIDs, func(i Info) error {
-			b.Add(i)
+		if err := ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
+			b.Add(index.ToInfoStruct(i))
 			return nil
 		}); err != nil {
 			return nil, errors.Wrap(err, "unable to iterate index entries")
@@ -48,8 +48,8 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
 	}

 	require.NoError(t, cache.addContentToCache(ctx, "ndx1", mustBuildIndex(t, index.Builder{
-		mustParseID(t, "c1"): &InfoStruct{PackBlobID: "p1234", ContentID: mustParseID(t, "c1")},
-		mustParseID(t, "c2"): &InfoStruct{PackBlobID: "p1234", ContentID: mustParseID(t, "c2")},
+		mustParseID(t, "c1"): Info{PackBlobID: "p1234", ContentID: mustParseID(t, "c1")},
+		mustParseID(t, "c2"): Info{PackBlobID: "p1234", ContentID: mustParseID(t, "c2")},
 	})))

 	has, err = cache.hasIndexBlobID(ctx, "ndx1")

@@ -60,13 +60,13 @@ func testCache(t *testing.T, cache committedContentIndexCache, fakeTime *faketim
 	}

 	require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildIndex(t, index.Builder{
-		mustParseID(t, "c3"): &InfoStruct{PackBlobID: "p2345", ContentID: mustParseID(t, "c3")},
-		mustParseID(t, "c4"): &InfoStruct{PackBlobID: "p2345", ContentID: mustParseID(t, "c4")},
+		mustParseID(t, "c3"): Info{PackBlobID: "p2345", ContentID: mustParseID(t, "c3")},
+		mustParseID(t, "c4"): Info{PackBlobID: "p2345", ContentID: mustParseID(t, "c4")},
 	})))

 	require.NoError(t, cache.addContentToCache(ctx, "ndx2", mustBuildIndex(t, index.Builder{
-		mustParseID(t, "c3"): &InfoStruct{PackBlobID: "p2345", ContentID: mustParseID(t, "c3")},
-		mustParseID(t, "c4"): &InfoStruct{PackBlobID: "p2345", ContentID: mustParseID(t, "c4")},
+		mustParseID(t, "c3"): Info{PackBlobID: "p2345", ContentID: mustParseID(t, "c3")},
+		mustParseID(t, "c4"): Info{PackBlobID: "p2345", ContentID: mustParseID(t, "c4")},
 	})))

 	ndx1, err := cache.openIndex(ctx, "ndx1")
@@ -29,7 +29,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b

 	var recovered []Info

-	err = ndx.Iterate(index.AllIDs, func(i Info) error {
+	err = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
 		// 'i' is ephemeral and will depend on temporary buffers which
 		// won't be available when this function returns, we need to
 		// convert it to durable struct.
@@ -186,11 +186,19 @@ func (bm *WriteManager) deletePreexistingContent(ctx context.Context, ci Info) e
 		return errors.Wrap(err, "unable to create pack")
 	}

-	pp.currentPackItems[ci.GetContentID()] = &deletedInfo{ci, bm.contentWriteTime(ci.GetTimestampSeconds())}
+	pp.currentPackItems[ci.GetContentID()] = deletedInfo(ci, bm.contentWriteTime(ci.GetTimestampSeconds()))

 	return nil
 }

+func deletedInfo(is Info, deletedTime int64) Info {
+	// clone and set deleted time
+	is.Deleted = true
+	is.TimestampSeconds = deletedTime
+
+	return is
+}
+
 // contentWriteTime returns content write time for new content
 // by computing max(timeNow().Unix(), previousUnixTimeSeconds + 1).
 func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 {
@@ -202,19 +210,6 @@ func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 {
 	return previousUnixTimeSeconds + 1
 }

-type deletedInfo struct {
-	Info
-	deletedTime int64
-}
-
-func (d *deletedInfo) GetDeleted() bool {
-	return true
-}
-
-func (d *deletedInfo) GetTimestampSeconds() int64 {
-	return d.deletedTime
-}
-
 func (bm *WriteManager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error {
 	bm.lock()
 	shouldFlush := bm.timeNow().After(bm.flushPackIndexesAfter)
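The two hunks above replace an interface wrapper (a deletedInfo struct overriding GetDeleted and GetTimestampSeconds) with a plain clone-and-set helper, which only works because Info is now passed by value. A minimal standalone sketch of the pattern (simplified Info type, not the real kopia one):

package main

import "fmt"

type Info struct {
	Deleted          bool
	TimestampSeconds int64
}

// deletedInfo mirrors the helper added in the diff: the Info argument is
// a copy, so mutating it leaves the caller's value untouched.
func deletedInfo(is Info, deletedTime int64) Info {
	is.Deleted = true
	is.TimestampSeconds = deletedTime

	return is
}

func main() {
	orig := Info{TimestampSeconds: 100}
	del := deletedInfo(orig, 101)
	fmt.Println(orig.Deleted, del.Deleted) // false true
}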
@@ -310,7 +305,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
 		return errors.Wrap(err, "unable to create pending pack")
 	}

-	info := &InfoStruct{
+	info := Info{
 		Deleted:    isDeleted,
 		ContentID:  contentID,
 		PackBlobID: pp.packBlobID,

@@ -665,11 +660,11 @@ func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID,

 	pp, bi, err := bm.getContentInfoReadLocked(ctx, contentID)
 	if err != nil {
-		return nil, err
+		return Info{}, err
 	}

 	if err := bm.getContentDataReadLocked(ctx, pp, bi, output); err != nil {
-		return nil, err
+		return Info{}, err
 	}

 	return bi, nil

@@ -885,7 +880,7 @@ func (bm *WriteManager) getOverlayContentInfoReadLocked(contentID ID) (*pendingP
 		return nil, ci, true
 	}

-	return nil, nil, false
+	return nil, Info{}, false
 }

 // +checklocksread:bm.mu

@@ -896,7 +891,7 @@ func (bm *WriteManager) getContentInfoReadLocked(ctx context.Context, contentID

 	// see if the content existed before
 	if err := bm.maybeRefreshIndexes(ctx); err != nil {
-		return nil, nil, err
+		return nil, Info{}, err
 	}

 	info, err := bm.committedContents.getContent(contentID)

@@ -912,7 +907,7 @@ func (bm *WriteManager) ContentInfo(ctx context.Context, contentID ID) (Info, er
 	_, bi, err := bm.getContentInfoReadLocked(ctx, contentID)
 	if err != nil {
 		bm.log.Debugf("ContentInfo(%q) - error %v", contentID, err)
-		return nil, err
+		return Info{}, err
 	}

 	return bi, err
@@ -77,7 +77,7 @@ func ParseIndexBlob(blobID blob.ID, encrypted gather.Bytes, crypter blobcrypto.C

 	var results []Info

-	err = ndx.Iterate(index.AllIDs, func(i Info) error {
+	err = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
 		results = append(results, index.ToInfoStruct(i))
 		return nil
 	})
@@ -970,7 +970,7 @@ func (s *contentManagerSuite) TestDeleteAfterUndelete(t *testing.T) {
 		t.Fatal("error while flushing:", err)
 	}

-	c2Want = withDeleted{c2Want, true}
+	c2Want = withDeleted(c2Want)
 	deleteContentAfterUndeleteAndCheck(ctx, t, bm, content2, c2Want)
 }

@@ -983,7 +983,7 @@ func deleteContentAfterUndeleteAndCheck(ctx context.Context, t *testing.T, bm *W
 		t.Fatalf("Expected content %q to be deleted, got: %#v", id, got)
 	}

-	if diff := indextest.InfoDiff(want, got, "GetTimestampSeconds"); len(diff) != 0 {
+	if diff := indextest.InfoDiff(want, got, "GetTimestampSeconds", "Timestamp"); len(diff) != 0 {
 		t.Fatalf("Content %q info does not match\ndiff: %v", id, diff)
 	}
@@ -2673,13 +2673,10 @@ func verifyBlobCount(t *testing.T, data blobtesting.DataMap, want map[blob.ID]in
 	}
 }

-type withDeleted struct {
-	index.Info
-	deleted bool
-}
-
-func (o withDeleted) GetDeleted() bool {
-	return o.deleted
+func withDeleted(i Info) Info {
+	i.Deleted = true
+
+	return i
 }

 var (
@@ -68,7 +68,7 @@ func (bm *WriteManager) PrefetchContents(ctx context.Context, contentIDs []ID, h

 	for _, ci := range contentIDs {
 		_, bi, _ := bm.getContentInfoReadLocked(ctx, ci)
-		if bi == nil {
+		if bi == (Info{}) {
 			continue
 		}
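With Info a struct rather than an interface, "not found" can no longer be signaled by nil; the hunk above and several others switch to comparing against the zero value instead. That works because every field of the struct is comparable. A rough illustration with a simplified type:

package main

import "fmt"

type Info struct {
	PackBlobID string
	Deleted    bool
}

func lookup(m map[string]Info, key string) Info {
	return m[key] // a missing key yields the zero value, not nil
}

func main() {
	m := map[string]Info{"a": {PackBlobID: "p1"}}
	bi := lookup(m, "b")
	if bi == (Info{}) { // parentheses keep the composite literal unambiguous
		fmt.Println("not found")
	}
}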
@@ -18,10 +18,10 @@
 type Index interface {
 	io.Closer
 	ApproximateCount() int
-	GetInfo(contentID ID) (Info, error)
+	GetInfo(contentID ID) (InfoReader, error)

 	// invoked the provided callback for all entries such that entry.ID >= startID and entry.ID < endID
-	Iterate(r IDRange, cb func(Info) error) error
+	Iterate(r IDRange, cb func(InfoReader) error) error
 }

 // Open reads an Index from a given reader. The caller must call Close() when the index is no longer used.

@@ -37,7 +37,8 @@ func (b Builder) Clone() Builder {
 func (b Builder) Add(i Info) {
 	cid := i.GetContentID()

-	if contentInfoGreaterThan(i, b[cid]) {
+	old, found := b[cid]
+	if !found || contentInfoGreaterThanStruct(i, old) {
 		b[cid] = i
 	}
 }
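Builder.Add now needs the explicit comma-ok form: with interface values, a missing map entry came back as nil and contentInfoGreaterThan handled the nil case; with struct values, a missing entry is indistinguishable from a legitimately zero-valued one, so presence must be reported separately. A small sketch of the distinction (hypothetical newerThan helper, simplified Info):

package main

import "fmt"

type Info struct{ TimestampSeconds int64 }

func newerThan(a, b Info) bool { return a.TimestampSeconds > b.TimestampSeconds }

func main() {
	b := map[string]Info{}
	i := Info{TimestampSeconds: 0} // legitimately zero timestamp

	// Wrong with structs: zero-valued entries would never be inserted,
	// because i is not strictly newer than the implicit zero value.
	if newerThan(i, b["cid"]) {
		b["cid"] = i
	}

	// Correct: check presence explicitly, as the diff above does.
	if old, found := b["cid"]; !found || newerThan(i, old) {
		b["cid"] = i
	}

	fmt.Println(b) // map[cid:{0}]
}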
@@ -106,7 +106,7 @@ func (e indexEntryInfoV1) GetEncryptionKeyID() byte {
 	return 0
 }

-var _ Info = indexEntryInfoV1{}
+var _ InfoReader = indexEntryInfoV1{}

 type indexV1 struct {
 	hdr v1HeaderInfo

@@ -125,7 +125,7 @@ func (b *indexV1) ApproximateCount() int {
 // Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically.
 // The iteration ends when the callback returns an error, which is propagated to the caller or when
 // all contents have been visited.
-func (b *indexV1) Iterate(r IDRange, cb func(Info) error) error {
+func (b *indexV1) Iterate(r IDRange, cb func(InfoReader) error) error {
 	startPos, err := b.findEntryPosition(r.StartID)
 	if err != nil {
 		return errors.Wrap(err, "could not find starting position")

@@ -241,7 +241,7 @@ func (b *indexV1) findEntry(output []byte, contentID ID) ([]byte, error) {
 }

 // GetInfo returns information about a given content. If a content is not found, nil is returned.
-func (b *indexV1) GetInfo(contentID ID) (Info, error) {
+func (b *indexV1) GetInfo(contentID ID) (InfoReader, error) {
 	var entryBuf [v1MaxEntrySize]byte

 	e, err := b.findEntry(entryBuf[:0], contentID)

@@ -256,7 +256,7 @@ func (b *indexV1) GetInfo(contentID ID) (Info, error) {
 	return b.entryToInfo(contentID, e)
 }

-func (b *indexV1) entryToInfo(contentID ID, entryData []byte) (Info, error) {
+func (b *indexV1) entryToInfo(contentID ID, entryData []byte) (InfoReader, error) {
 	if len(entryData) != v1EntryLength {
 		return nil, errors.Errorf("invalid entry length: %v", len(entryData))
 	}

@@ -224,7 +224,7 @@ func (e indexV2EntryInfo) Timestamp() time.Time {
 	return time.Unix(e.GetTimestampSeconds(), 0)
 }

-var _ Info = indexV2EntryInfo{}
+var _ InfoReader = indexV2EntryInfo{}

 type v2HeaderInfo struct {
 	version int

@@ -277,7 +277,7 @@ func (b *indexV2) ApproximateCount() int {
 // Iterate invokes the provided callback function for a range of contents in the index, sorted alphabetically.
 // The iteration ends when the callback returns an error, which is propagated to the caller or when
 // all contents have been visited.
-func (b *indexV2) Iterate(r IDRange, cb func(Info) error) error {
+func (b *indexV2) Iterate(r IDRange, cb func(InfoReader) error) error {
 	startPos, err := b.findEntryPosition(r.StartID)
 	if err != nil {
 		return errors.Wrap(err, "could not find starting position")

@@ -389,7 +389,7 @@ func (b *indexV2) findEntry(contentID ID) ([]byte, error) {
 }

 // GetInfo returns information about a given content. If a content is not found, nil is returned.
-func (b *indexV2) GetInfo(contentID ID) (Info, error) {
+func (b *indexV2) GetInfo(contentID ID) (InfoReader, error) {
 	e, err := b.findEntry(contentID)
 	if err != nil {
 		return nil, err

@@ -402,7 +402,7 @@ func (b *indexV2) GetInfo(contentID ID) (Info, error) {
 	return b.entryToInfo(contentID, e)
 }

-func (b *indexV2) entryToInfo(contentID ID, entryData []byte) (Info, error) {
+func (b *indexV2) entryToInfo(contentID ID, entryData []byte) (InfoReader, error) {
 	if len(entryData) < v2EntryMinLength {
 		return nil, errors.Errorf("invalid entry length: %v", len(entryData))
 	}
@@ -7,10 +7,10 @@

 	"github.com/kopia/kopia/repo/compression"
 )

-// Info is an information about a single piece of content managed by Manager.
+// InfoReader is an information about a single piece of content managed by Manager.
 //
 //nolint:interfacebloat
-type Info interface {
+type InfoReader interface {
 	GetContentID() ID
 	GetPackBlobID() blob.ID
 	GetTimestampSeconds() int64

@@ -24,62 +24,58 @@ type Info interface {
 	GetEncryptionKeyID() byte
 }

-// InfoStruct is an implementation of Info based on a structure.
-type InfoStruct struct {
-	ContentID           ID                   `json:"contentID"`
+// Info is an implementation of Info based on a structure.
+type Info struct {
 	PackBlobID          blob.ID              `json:"packFile,omitempty"`
 	TimestampSeconds    int64                `json:"time"`
 	OriginalLength      uint32               `json:"originalLength"`
 	PackedLength        uint32               `json:"length"`
 	PackOffset          uint32               `json:"packOffset,omitempty"`
-	CompressionHeaderID compression.HeaderID `json:"compression,omitempty"`
+	ContentID           ID                   `json:"contentID"`
 	Deleted             bool                 `json:"deleted"`
 	FormatVersion       byte                 `json:"formatVersion"`
+	CompressionHeaderID compression.HeaderID `json:"compression,omitempty"`
 	EncryptionKeyID     byte                 `json:"encryptionKeyID,omitempty"`
 }

 // GetContentID implements the Info interface.
-func (i *InfoStruct) GetContentID() ID { return i.ContentID }
+func (i Info) GetContentID() ID { return i.ContentID }

 // GetPackBlobID implements the Info interface.
-func (i *InfoStruct) GetPackBlobID() blob.ID { return i.PackBlobID }
+func (i Info) GetPackBlobID() blob.ID { return i.PackBlobID }

 // GetTimestampSeconds implements the Info interface.
-func (i *InfoStruct) GetTimestampSeconds() int64 { return i.TimestampSeconds }
+func (i Info) GetTimestampSeconds() int64 { return i.TimestampSeconds }

 // GetOriginalLength implements the Info interface.
-func (i *InfoStruct) GetOriginalLength() uint32 { return i.OriginalLength }
+func (i Info) GetOriginalLength() uint32 { return i.OriginalLength }

 // GetPackedLength implements the Info interface.
-func (i *InfoStruct) GetPackedLength() uint32 { return i.PackedLength }
+func (i Info) GetPackedLength() uint32 { return i.PackedLength }

 // GetPackOffset implements the Info interface.
-func (i *InfoStruct) GetPackOffset() uint32 { return i.PackOffset }
+func (i Info) GetPackOffset() uint32 { return i.PackOffset }

 // GetDeleted implements the Info interface.
-func (i *InfoStruct) GetDeleted() bool { return i.Deleted }
+func (i Info) GetDeleted() bool { return i.Deleted }

 // GetFormatVersion implements the Info interface.
-func (i *InfoStruct) GetFormatVersion() byte { return i.FormatVersion }
+func (i Info) GetFormatVersion() byte { return i.FormatVersion }

 // GetCompressionHeaderID implements the Info interface.
-func (i *InfoStruct) GetCompressionHeaderID() compression.HeaderID { return i.CompressionHeaderID }
+func (i Info) GetCompressionHeaderID() compression.HeaderID { return i.CompressionHeaderID }

 // GetEncryptionKeyID implements the Info interface.
-func (i *InfoStruct) GetEncryptionKeyID() byte { return i.EncryptionKeyID }
+func (i Info) GetEncryptionKeyID() byte { return i.EncryptionKeyID }

 // Timestamp implements the Info interface.
-func (i *InfoStruct) Timestamp() time.Time {
+func (i Info) Timestamp() time.Time {
 	return time.Unix(i.GetTimestampSeconds(), 0)
 }

-// ToInfoStruct converts the provided Info to *InfoStruct.
-func ToInfoStruct(i Info) *InfoStruct {
-	if is, ok := i.(*InfoStruct); ok {
-		return is
-	}
-
-	return &InfoStruct{
+// ToInfoStruct converts the provided Info to InfoStruct.
+func ToInfoStruct(i InfoReader) Info {
+	return Info{
 		ContentID:        i.GetContentID(),
 		PackBlobID:       i.GetPackBlobID(),
 		TimestampSeconds: i.GetTimestampSeconds(),
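The struct field reordering in the hunk above is the "proper alignment" item from the commit message: grouping the 8-byte and 4-byte fields ahead of the byte-sized ones removes compiler padding. A standalone illustration (toy types, not the real ones; sizes shown are for 64-bit platforms):

package main

import (
	"fmt"
	"unsafe"
)

// bools interleaved with an int64 force 7 bytes of padding twice
type interleaved struct {
	a bool
	b int64
	c bool
}

// same fields, largest first: only 6 bytes of trailing padding
type grouped struct {
	b int64
	a bool
	c bool
}

func main() {
	fmt.Println(unsafe.Sizeof(interleaved{}), unsafe.Sizeof(grouped{})) // 24 16
}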
@@ -33,17 +33,23 @@ func (m Merged) Close() error {
 	return errors.Wrap(err, "closing index shards")
 }

-func contentInfoGreaterThan(a, b Info) bool {
-	if b == nil {
-		// everyrhing is greater than nil
-		return true
+func contentInfoGreaterThan(a, b InfoReader) bool {
+	if l, r := a.GetTimestampSeconds(), b.GetTimestampSeconds(); l != r {
+		// different timestamps, higher one wins
+		return l > r
 	}

-	if a == nil {
-		// nil is less than everything
-		return false
+	if l, r := a.GetDeleted(), b.GetDeleted(); l != r {
+		// non-deleted is greater than deleted.
+		return !a.GetDeleted()
 	}

 	// both same time, both deleted, we must ensure we always resolve to the same pack blob.
 	// since pack blobs are random and unique, simple lexicographic ordering will suffice.
 	return a.GetPackBlobID() > b.GetPackBlobID()
 }

+func contentInfoGreaterThanStruct(a, b Info) bool {
+	if l, r := a.GetTimestampSeconds(), b.GetTimestampSeconds(); l != r {
+		// different timestamps, higher one wins
+		return l > r
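The precedence when two index entries describe the same content is thus: newest timestamp wins; on a tie, a live entry beats a deleted one; on a full tie, the lexicographically larger pack blob ID wins so that every shard resolves to the same entry. A small self-contained example of that ordering (simplified Info, mirroring the hunk above):

package main

import "fmt"

type Info struct {
	TimestampSeconds int64
	Deleted          bool
	PackBlobID       string
}

// greaterThan mirrors contentInfoGreaterThanStruct from the hunk above.
func greaterThan(a, b Info) bool {
	if a.TimestampSeconds != b.TimestampSeconds {
		return a.TimestampSeconds > b.TimestampSeconds
	}
	if a.Deleted != b.Deleted {
		return !a.Deleted // live entry beats deleted entry
	}
	return a.PackBlobID > b.PackBlobID // deterministic tiebreak
}

func main() {
	a := Info{TimestampSeconds: 5, PackBlobID: "p1"}
	b := Info{TimestampSeconds: 5, Deleted: true, PackBlobID: "p9"}
	fmt.Println(greaterThan(a, b)) // true: the deleted-flag rule fires before the blob-ID tiebreak
}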
@@ -60,8 +66,8 @@ func contentInfoGreaterThan(a, b Info) bool {
 }

 // GetInfo returns information about a single content. If a content is not found, returns (nil,nil).
-func (m Merged) GetInfo(id ID) (Info, error) {
-	var best Info
+func (m Merged) GetInfo(id ID) (InfoReader, error) {
+	var best InfoReader

 	for _, ndx := range m {
 		i, err := ndx.GetInfo(id)

@@ -69,7 +75,7 @@ func (m Merged) GetInfo(id ID) (Info, error) {
 			return nil, errors.Wrapf(err, "error getting id %v from index shard", id)
 		}

-		if contentInfoGreaterThan(i, best) {
+		if i != nil && (best == nil || contentInfoGreaterThan(i, best)) {
 			best = i
 		}
 	}

@@ -78,8 +84,8 @@ func (m Merged) GetInfo(id ID) (Info, error) {
 }

 type nextInfo struct {
-	it Info
-	ch <-chan Info
+	it InfoReader
+	ch <-chan InfoReader
 }

 type nextInfoHeap []*nextInfo
@@ -107,14 +113,14 @@ func (h *nextInfoHeap) Pop() interface{} {
 	return x
 }

-func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-chan Info {
-	ch := make(chan Info, 1)
+func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-chan InfoReader {
+	ch := make(chan InfoReader, 1)

 	go func() {
 		defer wg.Done()
 		defer close(ch)

-		_ = ndx.Iterate(r, func(i Info) error {
+		_ = ndx.Iterate(r, func(i InfoReader) error {
 			select {
 			case <-done:
 				return errors.New("end of iteration")

@@ -129,7 +135,7 @@ func iterateChan(r IDRange, ndx Index, done chan bool, wg *sync.WaitGroup) <-cha

 // Iterate invokes the provided callback for all unique content IDs in the underlying sources until either
 // all contents have been visited or until an error is returned by the callback.
-func (m Merged) Iterate(r IDRange, cb func(i Info) error) error {
+func (m Merged) Iterate(r IDRange, cb func(i InfoReader) error) error {
 	var minHeap nextInfoHeap

 	done := make(chan bool)

@@ -152,7 +158,7 @@ func (m Merged) Iterate(r IDRange, cb func(i Info) error) error {
 	defer wg.Wait()
 	defer close(done)

-	var pendingItem Info
+	var pendingItem InfoReader

 	for len(minHeap) > 0 {
 		//nolint:forcetypeassert

@@ -165,7 +171,7 @@ func (m Merged) Iterate(r IDRange, cb func(i Info) error) error {
 		}

 			pendingItem = min.it
-		} else if contentInfoGreaterThan(min.it, pendingItem) {
+		} else if min.it != nil && contentInfoGreaterThan(min.it, pendingItem) {
 			pendingItem = min.it
 		}
@@ -14,25 +14,25 @@

 func TestMerged(t *testing.T) {
 	i1, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
-		&InfoStruct{ContentID: mustParseID(t, "ddeeff"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
-		&InfoStruct{ContentID: mustParseID(t, "z010203"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
-		&InfoStruct{ContentID: mustParseID(t, "de1e1e"), TimestampSeconds: 4, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
+		Info{ContentID: mustParseID(t, "ddeeff"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "z010203"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "de1e1e"), TimestampSeconds: 4, PackBlobID: "xx", PackOffset: 111},
 	)
 	require.NoError(t, err)

 	i2, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 3, PackBlobID: "yy", PackOffset: 33},
-		&InfoStruct{ContentID: mustParseID(t, "xaabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
-		&InfoStruct{ContentID: mustParseID(t, "de1e1e"), TimestampSeconds: 4, PackBlobID: "xx", PackOffset: 222, Deleted: true},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 3, PackBlobID: "yy", PackOffset: 33},
+		Info{ContentID: mustParseID(t, "xaabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "de1e1e"), TimestampSeconds: 4, PackBlobID: "xx", PackOffset: 222, Deleted: true},
 	)
 	require.NoError(t, err)

 	i3, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 2, PackBlobID: "zz", PackOffset: 22},
-		&InfoStruct{ContentID: mustParseID(t, "ddeeff"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 222},
-		&InfoStruct{ContentID: mustParseID(t, "k010203"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
-		&InfoStruct{ContentID: mustParseID(t, "k020304"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 2, PackBlobID: "zz", PackOffset: 22},
+		Info{ContentID: mustParseID(t, "ddeeff"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 222},
+		Info{ContentID: mustParseID(t, "k010203"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
+		Info{ContentID: mustParseID(t, "k020304"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 111},
 	)
 	require.NoError(t, err)

@@ -46,7 +46,7 @@ func TestMerged(t *testing.T) {

 	require.Equal(t, uint32(33), i.GetPackOffset())

-	require.NoError(t, m.Iterate(AllIDs, func(i Info) error {
+	require.NoError(t, m.Iterate(AllIDs, func(i InfoReader) error {
 		if i.GetContentID() == mustParseID(t, "de1e1e") {
 			if i.GetDeleted() {
 				t.Errorf("iteration preferred deleted content over non-deleted")

@@ -59,7 +59,7 @@ func TestMerged(t *testing.T) {

 	// error is propagated.
 	someErr := errors.Errorf("some error")
-	require.ErrorIs(t, m.Iterate(AllIDs, func(i Info) error {
+	require.ErrorIs(t, m.Iterate(AllIDs, func(i InfoReader) error {
 		if i.GetContentID() == mustParseID(t, "aabbcc") {
 			return someErr
 		}

@@ -70,7 +70,7 @@ func TestMerged(t *testing.T) {
 	fmt.Println("=========== END")

 	// empty merged index does not invoke callback during iteration.
-	require.NoError(t, Merged{}.Iterate(AllIDs, func(i Info) error {
+	require.NoError(t, Merged{}.Iterate(AllIDs, func(i InfoReader) error {
 		return someErr
 	}))
@@ -152,7 +152,7 @@ type failingIndex struct {
 	err error
 }

-func (i failingIndex) GetInfo(contentID ID) (Info, error) {
+func (i failingIndex) GetInfo(contentID ID) (InfoReader, error) {
 	return nil, i.err
 }
@@ -168,23 +168,23 @@ func TestMergedGetInfoError(t *testing.T) {

 func TestMergedIndexIsConsistent(t *testing.T) {
 	i1, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
-		&InfoStruct{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
-		&InfoStruct{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "ff", PackOffset: 11, Deleted: true},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
+		Info{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},
+		Info{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "ff", PackOffset: 11, Deleted: true},
 	)
 	require.NoError(t, err)

 	i2, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "yy", PackOffset: 33},
-		&InfoStruct{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "yy", PackOffset: 11, Deleted: true},
-		&InfoStruct{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "gg", PackOffset: 11, Deleted: true},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "yy", PackOffset: 33},
+		Info{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "yy", PackOffset: 11, Deleted: true},
+		Info{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "gg", PackOffset: 11, Deleted: true},
 	)
 	require.NoError(t, err)

 	i3, err := indexWithItems(
-		&InfoStruct{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 22},
-		&InfoStruct{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 11, Deleted: true},
-		&InfoStruct{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "hh", PackOffset: 11, Deleted: true},
+		Info{ContentID: mustParseID(t, "aabbcc"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 22},
+		Info{ContentID: mustParseID(t, "bbccdd"), TimestampSeconds: 1, PackBlobID: "zz", PackOffset: 11, Deleted: true},
+		Info{ContentID: mustParseID(t, "ccddee"), TimestampSeconds: 1, PackBlobID: "hh", PackOffset: 11, Deleted: true},
 	)
 	require.NoError(t, err)

@@ -229,7 +229,7 @@ func iterateIDRange(t *testing.T, m Index, r IDRange) []ID {

 	var inOrder []ID

-	require.NoError(t, m.Iterate(r, func(i Info) error {
+	require.NoError(t, m.Iterate(r, func(i InfoReader) error {
 		inOrder = append(inOrder, i.GetContentID())
 		return nil
 	}))
@@ -112,7 +112,7 @@ func testPackIndex(t *testing.T, version int) {
 	var infos []Info
 	// deleted contents with all information
 	for i := 0; i < 100; i++ {
-		infos = append(infos, &InfoStruct{
+		infos = append(infos, Info{
 			TimestampSeconds: randomUnixTime(),
 			Deleted:          true,
 			ContentID:        deterministicContentID(t, "deleted-packed", i),

@@ -127,7 +127,7 @@ func testPackIndex(t *testing.T, version int) {
 	}
 	// non-deleted content
 	for i := 0; i < 100; i++ {
-		infos = append(infos, &InfoStruct{
+		infos = append(infos, Info{
 			TimestampSeconds: randomUnixTime(),
 			ContentID:        deterministicContentID(t, "packed", i),
 			PackBlobID:       deterministicPackBlobID(i),

@@ -142,7 +142,7 @@ func testPackIndex(t *testing.T, version int) {

 	// dear future reader, if this fails because the number of methods has changed,
 	// you need to add additional test cases above.
-	if cnt := reflect.TypeOf((*Info)(nil)).Elem().NumMethod(); cnt != 11 {
+	if cnt := reflect.TypeOf((*InfoReader)(nil)).Elem().NumMethod(); cnt != 11 {
 		t.Fatalf("unexpected number of methods on content.Info: %v, must update the test", cnt)
 	}
@@ -202,22 +202,22 @@ func testPackIndex(t *testing.T, version int) {

 		if version == 1 {
 			// v1 does not preserve original length.
-			want = withOriginalLength{want, want.GetPackedLength() - fakeEncryptionOverhead}
+			want = withOriginalLength(want, want.GetPackedLength()-fakeEncryptionOverhead)
 		}

-		require.Equal(t, ToInfoStruct(want), ToInfoStruct(info2))
+		require.Equal(t, want, ToInfoStruct(info2))
 	}

 	cnt := 0

-	require.NoError(t, ndx.Iterate(AllIDs, func(info2 Info) error {
+	require.NoError(t, ndx.Iterate(AllIDs, func(info2 InfoReader) error {
 		want := infoMap[info2.GetContentID()]
 		if version == 1 {
 			// v1 does not preserve original length.
-			want = withOriginalLength{want, want.GetPackedLength() - fakeEncryptionOverhead}
+			want = withOriginalLength(want, want.GetPackedLength()-fakeEncryptionOverhead)
 		}

-		require.Equal(t, ToInfoStruct(want), ToInfoStruct(info2))
+		require.Equal(t, want, ToInfoStruct(info2))
 		cnt++
 		return nil
 	}))
@@ -244,7 +244,7 @@ func testPackIndex(t *testing.T, version int) {
 	for _, prefix := range prefixes {
 		cnt2 := 0
 		prefix := prefix
-		require.NoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 Info) error {
+		require.NoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 InfoReader) error {
 			cnt2++
 			if !strings.HasPrefix(info2.GetContentID().String(), string(prefix)) {
 				t.Errorf("unexpected item %v when iterating prefix %v", info2.GetContentID(), prefix)

@@ -257,15 +257,15 @@ func testPackIndex(t *testing.T, version int) {

 func TestPackIndexPerContentLimits(t *testing.T) {
 	cases := []struct {
-		info   *InfoStruct
+		info   Info
 		errMsg string
 	}{
-		{&InfoStruct{PackedLength: v2MaxContentLength}, "maximum content length is too high"},
-		{&InfoStruct{PackedLength: v2MaxContentLength - 1}, ""},
-		{&InfoStruct{OriginalLength: v2MaxContentLength}, "maximum content length is too high"},
-		{&InfoStruct{OriginalLength: v2MaxContentLength - 1}, ""},
-		{&InfoStruct{PackOffset: v2MaxPackOffset}, "pack offset 1073741824 is too high"},
-		{&InfoStruct{PackOffset: v2MaxPackOffset - 1}, ""},
+		{Info{PackedLength: v2MaxContentLength}, "maximum content length is too high"},
+		{Info{PackedLength: v2MaxContentLength - 1}, ""},
+		{Info{OriginalLength: v2MaxContentLength}, "maximum content length is too high"},
+		{Info{OriginalLength: v2MaxContentLength - 1}, ""},
+		{Info{PackOffset: v2MaxPackOffset}, "pack offset 1073741824 is too high"},
+		{Info{PackOffset: v2MaxPackOffset - 1}, ""},
 	}

 	for _, tc := range cases {

@@ -287,7 +287,7 @@ func TestPackIndexPerContentLimits(t *testing.T) {
 			got, err := pi.GetInfo(cid)
 			require.NoError(t, err)

-			require.Equal(t, ToInfoStruct(got), ToInfoStruct(tc.info))
+			require.Equal(t, ToInfoStruct(got), tc.info)
 		} else {
 			err := b.buildV2(&result)
 			require.Error(t, err)
@@ -302,7 +302,7 @@ func TestSortedContents(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		v := deterministicContentID(t, "", i)

-		b.Add(&InfoStruct{
+		b.Add(Info{
 			ContentID: v,
 		})
 	}

@@ -322,34 +322,34 @@ func TestSortedContents(t *testing.T) {
 func TestSortedContents2(t *testing.T) {
 	b := Builder{}

-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "0123"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "1023"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "0f23"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "f023"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "g0123"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "g1023"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "i0123"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "i1023"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "h0123"),
 	})
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID: mustParseID(t, "h1023"),
 	})
@@ -372,7 +372,7 @@ func TestPackIndexV2TooManyUniqueFormats(t *testing.T) {
 	for i := 0; i < v2MaxFormatCount; i++ {
 		v := deterministicContentID(t, "", i)

-		b.Add(&InfoStruct{
+		b.Add(Info{
 			ContentID:     v,
 			PackBlobID:    blob.ID(v.String()),
 			FormatVersion: 1,

@@ -383,7 +383,7 @@ func TestPackIndexV2TooManyUniqueFormats(t *testing.T) {
 	require.NoError(t, b.buildV2(io.Discard))

 	// add one more to push it over the edge
-	b.Add(&InfoStruct{
+	b.Add(Info{
 		ContentID:           deterministicContentID(t, "", v2MaxFormatCount),
 		FormatVersion:       1,
 		CompressionHeaderID: compression.HeaderID(5000),

@@ -404,7 +404,7 @@ func fuzzTestIndexOpen(originalData []byte) {
 		return
 	}
 	cnt := 0
-	_ = ndx.Iterate(AllIDs, func(cb Info) error {
+	_ = ndx.Iterate(AllIDs, func(cb InfoReader) error {
 		if cnt < 10 {
 			_, _ = ndx.GetInfo(cb.GetContentID())
 		}
@@ -466,7 +466,7 @@ func TestShard(t *testing.T) {

 	// add ID to the builder
 	for _, id := range ids {
-		b.Add(&InfoStruct{
+		b.Add(Info{
 			ContentID: deterministicContentID(t, "", id),
 		})
 	}

@@ -516,13 +516,11 @@ func verifyAllShardedIDs(t *testing.T, sharded []Builder, numTotal, numShards in
 	return lens
 }

-type withOriginalLength struct {
-	Info
-	originalLength uint32
-}
-
-func (o withOriginalLength) GetOriginalLength() uint32 {
-	return o.originalLength
+func withOriginalLength(is Info, originalLength uint32) Info {
+	// clone and override original length
+	is.OriginalLength = originalLength
+
+	return is
 }

 func mustParseID(t *testing.T, s string) ID {
@@ -564,8 +564,8 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld ind
 		return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
 	}

-	_ = ndx.Iterate(index.AllIDs, func(i index.Info) error {
-		bld.Add(i)
+	_ = ndx.Iterate(index.AllIDs, func(i index.InfoReader) error {
+		bld.Add(index.ToInfoStruct(i))
 		return nil
 	})
@@ -13,12 +13,9 @@
 	// IDPrefix represents a content ID prefix (empty string or single character between 'g' and 'z').
 	IDPrefix = index.IDPrefix

-	// Info is an information about a single piece of content managed by Manager.
+	// Info describes a single piece of content.
 	Info = index.Info

-	// InfoStruct is an implementation of Info based on a structure.
-	InfoStruct = index.InfoStruct
-
 	// IDRange represents a range of IDs.
 	IDRange = index.IDRange
 )

@@ -28,11 +25,6 @@
 //nolint:gochecknoglobals
 var EmptyID = index.EmptyID

-// ToInfoStruct converts the provided Info to *InfoStruct.
-func ToInfoStruct(i Info) *InfoStruct {
-	return index.ToInfoStruct(i)
-}
-
 // IDFromHash creates and validates content ID from a prefix and hash.
 func IDFromHash(prefix IDPrefix, hash []byte) (ID, error) {
 	//nolint:wrapcheck
@@ -592,10 +592,10 @@ func (r *grpcInnerSession) contentInfo(ctx context.Context, contentID content.ID
 	case *apipb.SessionResponse_GetContentInfo:
 		contentID, err := content.ParseID(rr.GetContentInfo.GetInfo().GetId())
 		if err != nil {
-			return nil, errors.Wrap(err, "invalid content ID")
+			return content.Info{}, errors.Wrap(err, "invalid content ID")
 		}

-		return &content.InfoStruct{
+		return content.Info{
 			ContentID:        contentID,
 			PackedLength:     rr.GetContentInfo.GetInfo().GetPackedLength(),
 			TimestampSeconds: rr.GetContentInfo.GetInfo().GetTimestampSeconds(),

@@ -607,11 +607,11 @@ func (r *grpcInnerSession) contentInfo(ctx context.Context, contentID content.ID
 		}, nil

 		default:
-			return nil, unhandledSessionResponse(resp)
+			return content.Info{}, unhandledSessionResponse(resp)
 		}
 	}

-	return nil, errNoSessionResponse()
+	return content.Info{}, errNoSessionResponse()
 }

 func errorFromSessionResponse(rr *apipb.ErrorResponse) error {
@@ -88,10 +88,10 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content.
 	defer f.mu.Unlock()

 	if d, ok := f.data[contentID]; ok {
-		return &content.InfoStruct{ContentID: contentID, PackedLength: uint32(len(d))}, nil
+		return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil
 	}

-	return nil, blob.ErrBlobNotFound
+	return content.Info{}, blob.ErrBlobNotFound
 }

 func (f *fakeContentManager) Flush(ctx context.Context) error {

@@ -23,7 +23,7 @@ func TestDefaultGlobalPolicy(t *testing.T) {

 	// verify we created global policy entry

-	var contents []content.InfoStruct
+	var contents []content.Info

 	testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "content", "ls", "--json"), &contents)
@@ -59,7 +59,7 @@ func (s *formatSpecificTestSuite) TestSnapshotGC(t *testing.T) {
 	e.RunAndExpectSuccess(t, "maintenance", "run", "--full", "--safety=full")

 	// data block + directory block + manifest block + manifest block from manifest deletion
-	var contentInfo []content.InfoStruct
+	var contentInfo []content.Info

 	testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "content", "list", "--json"), &contentInfo)