From 400b9a5d30b0ad60f492aff66ce9f9be0aec8bec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 7 Jul 2025 15:36:50 +0200 Subject: [PATCH] Expose nats metrics of the search service --- services/search/.mockery.yaml | 2 +- services/search/pkg/metrics/metrics.go | 32 +++++++++++++++++-- services/search/pkg/search/events.go | 30 ++++++++++++++++- services/search/pkg/search/events_test.go | 2 +- services/search/pkg/server/grpc/server.go | 1 + services/search/pkg/service/grpc/v0/option.go | 11 +++++++ .../search/pkg/service/grpc/v0/service.go | 2 +- 7 files changed, 73 insertions(+), 7 deletions(-) diff --git a/services/search/.mockery.yaml b/services/search/.mockery.yaml index 12e0d3e066..1c29054193 100644 --- a/services/search/.mockery.yaml +++ b/services/search/.mockery.yaml @@ -15,4 +15,4 @@ packages: Retriever: {} github.com/opencloud-eu/opencloud/services/search/pkg/search: interfaces: - Searcher: {} + Searcher: {} \ No newline at end of file diff --git a/services/search/pkg/metrics/metrics.go b/services/search/pkg/metrics/metrics.go index c8459d7822..437dc544ce 100644 --- a/services/search/pkg/metrics/metrics.go +++ b/services/search/pkg/metrics/metrics.go @@ -1,6 +1,8 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) var ( // Namespace defines the namespace for the defines metrics. @@ -13,7 +15,10 @@ var ( // Metrics defines the available metrics of this service. type Metrics struct { // Counter *prometheus.CounterVec - BuildInfo *prometheus.GaugeVec + BuildInfo *prometheus.GaugeVec + EventsOutstandingAcks prometheus.Gauge + EventsUnprocessed prometheus.Gauge + EventsRedelivered prometheus.Gauge } // New initializes the available metrics. @@ -25,9 +30,30 @@ func New() *Metrics { Name: "build_info", Help: "Build information", }, []string{"version"}), + EventsOutstandingAcks: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_outstanding_acks", + Help: "Number of outstanding acks for events", + }), + EventsUnprocessed: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_unprocessed", + Help: "Number of unprocessed events", + }), + EventsRedelivered: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_redelivered", + Help: "Number of redelivered events", + }), } _ = prometheus.Register(m.BuildInfo) - // TODO: implement metrics + _ = prometheus.Register(m.EventsOutstandingAcks) + _ = prometheus.Register(m.EventsUnprocessed) + _ = prometheus.Register(m.EventsRedelivered) + return m } diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 32c4c8c539..eca39f94f4 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -1,11 +1,13 @@ package search import ( + "context" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/raw" "github.com/opencloud-eu/reva/v2/pkg/storagespace" @@ -13,7 +15,7 @@ import ( // HandleEvents listens to the needed events, // it handles the whole resource indexing livecycle. -func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.Logger) error { +func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error { evts := []events.Unmarshaller{ events.ItemTrashed{}, events.ItemRestored{}, @@ -37,6 +39,10 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log. return err } + if m != nil { + monitorMetrics(stream, "search-pull", m, logger) + } + if cfg.Events.NumConsumers == 0 { cfg.Events.NumConsumers = 1 } @@ -103,3 +109,25 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log. return nil } + +func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { + ctx := context.Background() + consumer, err := stream.JetStream().Consumer(ctx, name) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + ticker := time.NewTicker(5 * time.Second) + go func() { + for range ticker.C { + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Debug().Msg("updated event metrics") + } + }() +} diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 15d37e541c..6640785818 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -31,7 +31,7 @@ var _ = DescribeTable("events", Events: config.Events{ AsyncUploads: asyncUploads, }, - }, log.NewLogger()) + }, nil, log.NewLogger()) for _, mck := range mcks { s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { diff --git a/services/search/pkg/server/grpc/server.go b/services/search/pkg/server/grpc/server.go index 61e59a4f51..d3a012ec89 100644 --- a/services/search/pkg/server/grpc/server.go +++ b/services/search/pkg/server/grpc/server.go @@ -36,6 +36,7 @@ func Server(opts ...Option) (grpc.Service, func(), error) { svc.Logger(options.Logger), svc.JWTSecret(options.JWTSecret), svc.TracerProvider(options.TraceProvider), + svc.Metrics(options.Metrics), ) if err != nil { options.Logger.Error(). diff --git a/services/search/pkg/service/grpc/v0/option.go b/services/search/pkg/service/grpc/v0/option.go index 2bb8d0bf3d..b7ab82250b 100644 --- a/services/search/pkg/service/grpc/v0/option.go +++ b/services/search/pkg/service/grpc/v0/option.go @@ -3,6 +3,7 @@ package service import ( "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" "go.opentelemetry.io/otel/trace" ) @@ -15,6 +16,7 @@ type Options struct { Config *config.Config JWTSecret string TracerProvider trace.TracerProvider + Metrics *metrics.Metrics } func newOptions(opts ...Option) Options { @@ -54,3 +56,12 @@ func TracerProvider(val trace.TracerProvider) Option { o.TracerProvider = val } } + +// Metrics provides a function to set the Metrics option. +func Metrics(val *metrics.Metrics) Option { + return func(o *Options) { + if val != nil { + o.Metrics = val + } + } +} diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 59dc238e91..bb410e5645 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -98,7 +98,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, err } - if err := search.HandleEvents(ss, stream, cfg, logger); err != nil { + if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil { return nil, teardown, err }