Files
opencloud/services/search/pkg/command/server.go
Christian Richter e7a5788634 migrate search from urfave/cli to spf13/cobra
Signed-off-by: Christian Richter <c.richter@opencloud.eu>
2025-12-15 16:40:26 +01:00

241 lines
8.3 KiB
Go

package command
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"
"os/signal"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/runner"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/logging"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/debug"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/grpc"
svcEvent "github.com/opencloud-eu/opencloud/services/search/pkg/service/event"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/spf13/cobra"
)
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cobra.Command {
return &cobra.Command{
Use: "server",
Short: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name),
PreRunE: func(cmd *cobra.Command, args []string) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
RunE: func(cmd *cobra.Command, args []string) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
traceProvider, err := tracing.GetTraceProvider(cmd.Context(), cfg.Commons.TracesExporter, cfg.Service.Name)
if err != nil {
return err
}
cfg.GrpcClient, err = ogrpc.NewClient(
append(ogrpc.GetClientOptions(cfg.GRPCClientTLS), ogrpc.WithTraceProvider(traceProvider))...,
)
if err != nil {
return err
}
var cancel context.CancelFunc
if cfg.Context == nil {
cfg.Context, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}
ctx := cfg.Context
mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)
// initialize search engine
var eng search.Engine
switch cfg.Engine.Type {
case "bleve":
idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath)
if err != nil {
return err
}
defer func() {
if err = idx.Close(); err != nil {
logger.Error().Err(err).Msg("could not close bleve index")
}
}()
eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger)
case "open-search":
clientConfig := opensearchgo.Config{
Addresses: cfg.Engine.OpenSearch.Client.Addresses,
Username: cfg.Engine.OpenSearch.Client.Username,
Password: cfg.Engine.OpenSearch.Client.Password,
Header: cfg.Engine.OpenSearch.Client.Header,
RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus,
DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry,
EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout,
MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries,
CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody,
DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart,
DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval,
EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics,
EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: cfg.Engine.OpenSearch.Client.Insecure,
},
},
}
if cfg.Engine.OpenSearch.Client.CACert != "" {
certBytes, err := os.ReadFile(cfg.Engine.OpenSearch.Client.CACert)
if err != nil {
return fmt.Errorf("failed to read CA cert: %w", err)
}
clientConfig.CACert = certBytes
}
client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{Client: clientConfig})
if err != nil {
return fmt.Errorf("failed to create OpenSearch client: %w", err)
}
openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client)
if err != nil {
return fmt.Errorf("failed to create OpenSearch backend: %w", err)
}
eng = openSearchBackend
default:
return fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
}
// initialize gateway selector
selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(traceProvider))
if err != nil {
logger.Fatal().Err(err).Msg("could not get reva gateway selector")
return err
}
// initialize search content extractor
var extractor content.Extractor
switch cfg.Extractor.Type {
case "basic":
if extractor, err = content.NewBasicExtractor(logger); err != nil {
return err
}
case "tika":
if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil {
return err
}
default:
return fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
ss := search.NewService(selector, eng, extractor, mtrcs, logger, cfg)
// setup the servers
gr := runner.NewGroup()
if !cfg.GRPC.Disabled {
grpcServer, err := grpc.Server(
grpc.Config(cfg),
grpc.Logger(logger),
grpc.Name(cfg.Service.Name),
grpc.Context(ctx),
grpc.Metrics(mtrcs),
grpc.JWTSecret(cfg.TokenManager.JWTSecret),
grpc.TraceProvider(traceProvider),
grpc.GatewaySelector(selector),
grpc.Searcher(ss),
)
if err != nil {
logger.Error().Err(err).Str("transport", "grpc").Msg("Failed to initialize server")
return err
}
gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer))
} else {
logger.Info().Msg("gRPC server disabled, not starting gRPC service")
}
if !cfg.Events.Disabled {
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
bus, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
MaxAckPending: cfg.Events.MaxAckPending,
AckWait: cfg.Events.AckWait,
})
if err != nil {
logger.Error().Err(err).Msg("Failed to create event bus client")
return err
}
eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, mtrcs, ss, cfg.Events.DebounceDuration, cfg.Events.NumConsumers, cfg.Events.AsyncUploads)
if err != nil {
logger.Error().Err(err).Str("transport", "event").Msg("Failed to initialize server")
return err
}
gr.Add(runner.New(cfg.Service.Name+".svc", func() error {
return eventSvc.Run()
}, func() {
eventSvc.Close()
}))
} else {
logger.Info().Msg("event listening disabled, not starting event service")
}
// always start a debug server
{
debugServer, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Error().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
}
gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer))
}
grResults := gr.Run(ctx)
// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}