diff --git a/go.mod b/go.mod index 67c7fb375..6371e9078 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,7 @@ require ( github.com/prometheus/statsd_exporter v0.22.2 // indirect github.com/rs/xid v1.3.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.7.0 // indirect + go.uber.org/multierr v1.7.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20211021150943-2b146023228c // indirect diff --git a/repo/content/merged.go b/repo/content/merged.go index 701e91223..9992abe64 100644 --- a/repo/content/merged.go +++ b/repo/content/merged.go @@ -4,10 +4,9 @@ "container/heap" "github.com/pkg/errors" + "go.uber.org/multierr" ) -const iterateParallelism = 16 - // mergedIndex is an implementation of Index that transparently merges returns from underlying Indexes. type mergedIndex []packIndex @@ -23,13 +22,13 @@ func (m mergedIndex) ApproximateCount() int { // Close closes all underlying indexes. func (m mergedIndex) Close() error { + var err error + for _, ndx := range m { - if err := ndx.Close(); err != nil { - return errors.Wrap(err, "error closing index shard") - } + err = multierr.Append(err, ndx.Close()) } - return nil + return errors.Wrap(err, "closing index shards") } func contentInfoGreaterThan(a, b Info) bool { @@ -107,7 +106,7 @@ func (h *nextInfoHeap) Pop() interface{} { } func iterateChan(r IDRange, ndx packIndex, done chan bool) <-chan Info { - ch := make(chan Info, iterateParallelism) + ch := make(chan Info, 1) go func() { defer close(ch) diff --git a/repo/content/merged_test.go b/repo/content/merged_test.go index a2f1ab99f..23c85cebb 100644 --- a/repo/content/merged_test.go +++ b/repo/content/merged_test.go @@ -2,6 +2,7 @@ import ( "bytes" + "fmt" "reflect" "testing" @@ -37,14 +38,13 @@ func TestMerged(t *testing.T) { m := mergedIndex{i1, i2, i3} - i, err := m.GetInfo("aabbcc") - if err != nil || i == nil { - t.Fatalf("unable to get info: %v", err) - } + require.Equal(t, m.ApproximateCount(), 11) - if got, want := i.GetPackOffset(), uint32(33); got != want { - t.Errorf("invalid pack offset %v, wanted %v", got, want) - } + i, err := m.GetInfo("aabbcc") + require.NoError(t, err) + require.NotNil(t, i) + + require.Equal(t, uint32(33), i.GetPackOffset()) require.NoError(t, m.Iterate(AllIDs, func(i Info) error { if i.GetContentID() == "de1e1e" { @@ -55,11 +55,28 @@ func TestMerged(t *testing.T) { return nil })) - if i, err := m.GetInfo("de1e1e"); err != nil { - t.Errorf("error getting deleted content info: %v", err) - } else if i.GetDeleted() { - t.Errorf("GetInfo preferred deleted content over non-deleted") - } + fmt.Println("=========== START") + + // error is propagated. + someErr := errors.Errorf("some error") + require.ErrorIs(t, m.Iterate(AllIDs, func(i Info) error { + if i.GetContentID() == "aabbcc" { + return someErr + } + + return nil + }), someErr) + + fmt.Println("=========== END") + + // empty merged index does not invoke callback during iteration. + require.NoError(t, mergedIndex{}.Iterate(AllIDs, func(i Info) error { + return someErr + })) + + i, err = m.GetInfo("de1e1e") + require.NoError(t, err) + require.False(t, i.GetDeleted()) cases := []struct { r IDRange @@ -130,6 +147,25 @@ func TestMerged(t *testing.T) { } } +type failingIndex struct { + packIndex + err error +} + +func (i failingIndex) GetInfo(contentID ID) (Info, error) { + return nil, i.err +} + +func TestMergedGetInfoError(t *testing.T) { + someError := errors.Errorf("some error") + + m := mergedIndex{failingIndex{nil, someError}} + + info, err := m.GetInfo("some-id") + require.ErrorIs(t, err, someError) + require.Nil(t, info) +} + func TestMergedIndexIsConsistent(t *testing.T) { i1, err := indexWithItems( &InfoStruct{ContentID: "aabbcc", TimestampSeconds: 1, PackBlobID: "xx", PackOffset: 11},