enhancement(search): implement cluster health checks

This commit is contained in:
fschade
2025-07-31 18:39:47 +02:00
parent 1236cedacc
commit 5abfd1744e
4 changed files with 102 additions and 2 deletions

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"strings"
storageProvider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/reva/v2/pkg/utils"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
@@ -22,6 +24,14 @@ type Engine struct {
}
func NewEngine(index string, client *opensearchgoAPI.Client) (*Engine, error) {
_, healthy, err := clusterHealth(context.Background(), client, []string{index})
switch {
case err != nil:
return nil, fmt.Errorf("failed to get cluster health: %w", err)
case !healthy:
return nil, fmt.Errorf("cluster health is not healthy")
}
return &Engine{index: index, client: client}, nil
}
@@ -41,9 +51,25 @@ func (e *Engine) Search(ctx context.Context, sir *searchService.SearchIndexReque
return nil, fmt.Errorf("failed to compile query: %w", err)
}
body, err := NewRootQuery(builderToBoolQuery(builder).Filter(
boolQuery := builderToBoolQuery(builder).Filter(
NewTermQuery[bool]("Deleted").Value(false),
)).MarshalJSON()
)
if sir.Ref != nil {
boolQuery.Filter(
NewMatchPhraseQuery("RootID").Query(
storagespace.FormatResourceID(
&storageProvider.ResourceId{
StorageId: sir.Ref.GetResourceId().GetStorageId(),
SpaceId: sir.Ref.GetResourceId().GetSpaceId(),
OpaqueId: sir.Ref.GetResourceId().GetOpaqueId(),
},
),
),
)
}
body, err := NewRootQuery(boolQuery).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal query: %w", err)
}

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"testing"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -12,11 +14,27 @@ import (
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch/internal/test"
)
func TestNewEngine(t *testing.T) {
t.Run("fails to create if the cluster is not healthy", func(t *testing.T) {
client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{
Client: opensearchgo.Config{
Addresses: []string{"http://localhost:1025"},
},
})
require.NoError(t, err, "failed to create OpenSearch client")
engine, err := opensearch.NewEngine("test-engine-new-engine", client)
assert.Nil(t, engine)
assert.ErrorIs(t, err, opensearch.ErrUnhealthyCluster)
})
}
func TestEngine_Search(t *testing.T) {
index := "test-engine-search"
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})
@@ -60,6 +78,7 @@ func TestEngine_Upsert(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})
@@ -81,6 +100,7 @@ func TestEngine_Delete(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})
@@ -108,6 +128,7 @@ func TestEngine_Restore(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})
@@ -136,6 +157,7 @@ func TestEngine_Purge(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})
@@ -158,6 +180,7 @@ func TestEngine_DocCount(t *testing.T) {
tc := opensearchtest.NewDefaultTestClient(t)
tc.Require.IndicesReset([]string{index})
tc.Require.IndicesCount([]string{index}, "", 0)
tc.Require.IndicesCreate(index, "")
defer tc.Require.IndicesDelete([]string{index})

View File

@@ -115,6 +115,22 @@ func (tc *TestClient) IndicesDelete(ctx context.Context, indices []string) error
}
}
func (tc *TestClient) IndicesCreate(ctx context.Context, index string, body string) error {
resp, err := tc.c.Indices.Create(ctx, opensearchgoAPI.IndicesCreateReq{
Index: index,
Body: strings.NewReader(body),
})
switch {
case err != nil:
return fmt.Errorf("failed to create index %s: %w", index, err)
case !resp.Acknowledged:
return fmt.Errorf("index creation not acknowledged for %s", index)
default:
return nil
}
}
func (tc *TestClient) IndicesCount(ctx context.Context, indices []string, body string) (int, error) {
if err := tc.IndicesRefresh(ctx, indices, []int{404}); err != nil {
return 0, err
@@ -182,6 +198,10 @@ func (trc *testRequireClient) IndicesRefresh(indices []string, ignore []int) {
require.NoError(trc.t, trc.tc.IndicesRefresh(trc.t.Context(), indices, ignore))
}
func (trc *testRequireClient) IndicesCreate(index string, body string) {
require.NoError(trc.t, trc.tc.IndicesCreate(trc.t.Context(), index, body))
}
func (trc *testRequireClient) IndicesDelete(indices []string) {
require.NoError(trc.t, trc.tc.IndicesDelete(trc.t.Context(), indices))
}

View File

@@ -1,10 +1,18 @@
package opensearch
import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"
"dario.cat/mergo"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
)
var (
ErrUnhealthyCluster = fmt.Errorf("cluster is not healthy")
)
func isEmpty(x any) bool {
@@ -57,3 +65,26 @@ func convert[T any](v any) (T, error) {
return t, nil
}
func clusterHealth(ctx context.Context, client *opensearchgoAPI.Client, indices []string) (*opensearchgoAPI.ClusterHealthResp, bool, error) {
resp, err := client.Cluster.Health(ctx, &opensearchgoAPI.ClusterHealthReq{
Indices: indices,
Params: opensearchgoAPI.ClusterHealthParams{
Local: opensearchgoAPI.ToPointer(true),
Timeout: 5 * time.Second,
},
})
if err != nil {
return nil, false, fmt.Errorf("%w, failed to get cluster health: %w", ErrUnhealthyCluster, err)
}
if resp.TimedOut {
return resp, false, fmt.Errorf("%w, cluster health request timed out", ErrUnhealthyCluster)
}
if resp.Status != "green" && resp.Status != "yellow" {
return resp, false, fmt.Errorf("%w, cluster health is not green or yellow: %s", ErrUnhealthyCluster, resp.Status)
}
return resp, true, nil
}