Expose nats metrics of the search service

This commit is contained in:
André Duffeck
2025-07-07 15:36:50 +02:00
parent ef30a996df
commit 400b9a5d30
7 changed files with 73 additions and 7 deletions

View File

@@ -15,4 +15,4 @@ packages:
Retriever: {}
github.com/opencloud-eu/opencloud/services/search/pkg/search:
interfaces:
Searcher: {}
Searcher: {}

View File

@@ -1,6 +1,8 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
// Namespace defines the namespace for the defines metrics.
@@ -13,7 +15,10 @@ var (
// Metrics defines the available metrics of this service.
type Metrics struct {
// Counter *prometheus.CounterVec
BuildInfo *prometheus.GaugeVec
BuildInfo *prometheus.GaugeVec
EventsOutstandingAcks prometheus.Gauge
EventsUnprocessed prometheus.Gauge
EventsRedelivered prometheus.Gauge
}
// New initializes the available metrics.
@@ -25,9 +30,30 @@ func New() *Metrics {
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
EventsOutstandingAcks: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_outstanding_acks",
Help: "Number of outstanding acks for events",
}),
EventsUnprocessed: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_unprocessed",
Help: "Number of unprocessed events",
}),
EventsRedelivered: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_redelivered",
Help: "Number of redelivered events",
}),
}
_ = prometheus.Register(m.BuildInfo)
// TODO: implement metrics
_ = prometheus.Register(m.EventsOutstandingAcks)
_ = prometheus.Register(m.EventsUnprocessed)
_ = prometheus.Register(m.EventsRedelivered)
return m
}

View File

@@ -1,11 +1,13 @@
package search
import (
"context"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
@@ -13,7 +15,7 @@ import (
// HandleEvents listens to the needed events,
// it handles the whole resource indexing livecycle.
func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.Logger) error {
func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error {
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemRestored{},
@@ -37,6 +39,10 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.
return err
}
if m != nil {
monitorMetrics(stream, "search-pull", m, logger)
}
if cfg.Events.NumConsumers == 0 {
cfg.Events.NumConsumers = 1
}
@@ -103,3 +109,25 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.
return nil
}
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
ctx := context.Background()
consumer, err := stream.JetStream().Consumer(ctx, name)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
}
ticker := time.NewTicker(5 * time.Second)
go func() {
for range ticker.C {
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
}
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Debug().Msg("updated event metrics")
}
}()
}

View File

@@ -31,7 +31,7 @@ var _ = DescribeTable("events",
Events: config.Events{
AsyncUploads: asyncUploads,
},
}, log.NewLogger())
}, nil, log.NewLogger())
for _, mck := range mcks {
s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {

View File

@@ -36,6 +36,7 @@ func Server(opts ...Option) (grpc.Service, func(), error) {
svc.Logger(options.Logger),
svc.JWTSecret(options.JWTSecret),
svc.TracerProvider(options.TraceProvider),
svc.Metrics(options.Metrics),
)
if err != nil {
options.Logger.Error().

View File

@@ -3,6 +3,7 @@ package service
import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"go.opentelemetry.io/otel/trace"
)
@@ -15,6 +16,7 @@ type Options struct {
Config *config.Config
JWTSecret string
TracerProvider trace.TracerProvider
Metrics *metrics.Metrics
}
func newOptions(opts ...Option) Options {
@@ -54,3 +56,12 @@ func TracerProvider(val trace.TracerProvider) Option {
o.TracerProvider = val
}
}
// Metrics provides a function to set the Metrics option.
func Metrics(val *metrics.Metrics) Option {
return func(o *Options) {
if val != nil {
o.Metrics = val
}
}
}

View File

@@ -98,7 +98,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, err
}
if err := search.HandleEvents(ss, stream, cfg, logger); err != nil {
if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil {
return nil, teardown, err
}