diff --git a/services/search/pkg/opensearch/engine.go b/services/search/pkg/opensearch/engine.go index 7dc9ea7be4..12f537f3a2 100644 --- a/services/search/pkg/opensearch/engine.go +++ b/services/search/pkg/opensearch/engine.go @@ -7,6 +7,8 @@ import ( "fmt" "strings" + storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" @@ -22,6 +24,14 @@ type Engine struct { } func NewEngine(index string, client *opensearchgoAPI.Client) (*Engine, error) { + _, healthy, err := clusterHealth(context.Background(), client, []string{index}) + switch { + case err != nil: + return nil, fmt.Errorf("failed to get cluster health: %w", err) + case !healthy: + return nil, fmt.Errorf("cluster health is not healthy") + } + return &Engine{index: index, client: client}, nil } @@ -41,9 +51,25 @@ func (e *Engine) Search(ctx context.Context, sir *searchService.SearchIndexReque return nil, fmt.Errorf("failed to compile query: %w", err) } - body, err := NewRootQuery(builderToBoolQuery(builder).Filter( + boolQuery := builderToBoolQuery(builder).Filter( NewTermQuery[bool]("Deleted").Value(false), - )).MarshalJSON() + ) + + if sir.Ref != nil { + boolQuery.Filter( + NewMatchPhraseQuery("RootID").Query( + storagespace.FormatResourceID( + &storageProvider.ResourceId{ + StorageId: sir.Ref.GetResourceId().GetStorageId(), + SpaceId: sir.Ref.GetResourceId().GetSpaceId(), + OpaqueId: sir.Ref.GetResourceId().GetOpaqueId(), + }, + ), + ), + ) + } + + body, err := NewRootQuery(boolQuery).MarshalJSON() if err != nil { return nil, fmt.Errorf("failed to marshal query: %w", err) } diff --git a/services/search/pkg/opensearch/engine_test.go b/services/search/pkg/opensearch/engine_test.go index 0e8a300287..04d3717ad6 100644 --- a/services/search/pkg/opensearch/engine_test.go +++ b/services/search/pkg/opensearch/engine_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + opensearchgo "github.com/opensearch-project/opensearch-go/v4" + opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,11 +14,27 @@ import ( "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test" ) +func TestNewEngine(t *testing.T) { + t.Run("fails to create if the cluster is not healthy", func(t *testing.T) { + client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{ + Client: opensearchgo.Config{ + Addresses: []string{"http://localhost:1025"}, + }, + }) + require.NoError(t, err, "failed to create OpenSearch client") + + engine, err := opensearch.NewEngine("test-engine-new-engine", client) + assert.Nil(t, engine) + assert.ErrorIs(t, err, opensearch.ErrUnhealthyCluster) + }) +} + func TestEngine_Search(t *testing.T) { index := "test-engine-search" tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) @@ -60,6 +78,7 @@ func TestEngine_Upsert(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) @@ -81,6 +100,7 @@ func TestEngine_Delete(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) @@ -108,6 +128,7 @@ func TestEngine_Restore(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) @@ -136,6 +157,7 @@ func TestEngine_Purge(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) @@ -158,6 +180,7 @@ func TestEngine_DocCount(t *testing.T) { tc := opensearchtest.NewDefaultTestClient(t) tc.Require.IndicesReset([]string{index}) tc.Require.IndicesCount([]string{index}, "", 0) + tc.Require.IndicesCreate(index, "") defer tc.Require.IndicesDelete([]string{index}) diff --git a/services/search/pkg/opensearch/internal/test/os.go b/services/search/pkg/opensearch/internal/test/os.go index 24492f50c1..dfea2c54de 100644 --- a/services/search/pkg/opensearch/internal/test/os.go +++ b/services/search/pkg/opensearch/internal/test/os.go @@ -115,6 +115,22 @@ func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error } } +func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body string) error { + resp, err := tc.c.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{ + Index: index, + Body: strings.NewReader(body), + }) + + switch { + case err != nil: + return fmt.Errorf("failed to create index %s: %w", index, err) + case !resp.Acknowledged: + return fmt.Errorf("index creation not acknowledged for %s", index) + default: + return nil + } +} + func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body string) (int, error) { if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil { return 0, err @@ -182,6 +198,10 @@ func (trc *testRequireClient) IndicesRefresh(indices []string, ignore []int) { require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices, ignore)) } +func (trc *testRequireClient) IndicesCreate(index string, body string) { + require.NoError(trc.t, trc.tc.IndicesCreate(trc.t.Context(), index, body)) +} + func (trc *testRequireClient) IndicesDelete(indices []string) { require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices)) } diff --git a/services/search/pkg/opensearch/opensearch.go b/services/search/pkg/opensearch/opensearch.go index 7111b82d42..2ca738d24a 100644 --- a/services/search/pkg/opensearch/opensearch.go +++ b/services/search/pkg/opensearch/opensearch.go @@ -1,10 +1,18 @@ package opensearch import ( + "context" "encoding/json" + "fmt" "reflect" + "time" "dario.cat/mergo" + opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" +) + +var ( + ErrUnhealthyCluster = fmt.Errorf("cluster is not healthy") ) func isEmpty(x any) bool { @@ -57,3 +65,26 @@ func convert[T any](v any) (T, error) { return t, nil } + +func clusterHealth(ctx context.Context, client *opensearchgoAPI.Client, indices []string) (*opensearchgoAPI.ClusterHealthResp, bool, error) { + resp, err := client.Cluster.Health(ctx, &opensearchgoAPI.ClusterHealthReq{ + Indices: indices, + Params: opensearchgoAPI.ClusterHealthParams{ + Local: opensearchgoAPI.ToPointer(true), + Timeout: 5 * time.Second, + }, + }) + if err != nil { + return nil, false, fmt.Errorf("%w, failed to get cluster health: %w", ErrUnhealthyCluster, err) + } + + if resp.TimedOut { + return resp, false, fmt.Errorf("%w, cluster health request timed out", ErrUnhealthyCluster) + } + + if resp.Status != "green" && resp.Status != "yellow" { + return resp, false, fmt.Errorf("%w, cluster health is not green or yellow: %s", ErrUnhealthyCluster, resp.Status) + } + + return resp, true, nil +}