diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 65a6790108..5888a31834 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -88,7 +88,7 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store. m := metrics.New() m.BuildInfo.WithLabelValues(version.GetString()).Set(1) - monitorMetrics(raw, "postprocessing-pull", m, logger) + monitorMetrics(ctx, raw, "postprocessing-pull", m, logger) return &PostprocessingService{ ctx: ctx, @@ -423,25 +423,30 @@ func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingst return ids } -func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { - ctx := context.Background() +func monitorMetrics(ctx context.Context, stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { consumer, err := stream.JetStream().Consumer(ctx, name) if err != nil { logger.Error().Err(err).Msg("failed to get consumer") } ticker := time.NewTicker(5 * time.Second) go func() { - for range ticker.C { - info, err := consumer.Info(ctx) - if err != nil { - logger.Error().Err(err).Msg("failed to get consumer") - continue - } + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + continue + } - m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) - m.EventsUnprocessed.Set(float64(info.NumPending)) - m.EventsRedelivered.Set(float64(info.NumRedelivered)) - logger.Trace().Msg("updated postprocessing event metrics") + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Trace().Msg("updated postprocessing event metrics") + } } }() } diff --git a/services/search/pkg/service/event/service.go b/services/search/pkg/service/event/service.go index adbbd70aa5..04d8f2076a 100644 --- a/services/search/pkg/service/event/service.go +++ b/services/search/pkg/service/event/service.go @@ -90,7 +90,7 @@ func (s Service) Run() error { } if s.m != nil { - monitorMetrics(s.stream, "search-pull", s.m, s.log) + monitorMetrics(s.ctx, s.stream, "search-pull", s.m, s.log) } var wg sync.WaitGroup @@ -208,25 +208,30 @@ func (s Service) processEvent(e raw.Event) error { return nil } -func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { - ctx := context.Background() +func monitorMetrics(ctx context.Context, stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { consumer, err := stream.JetStream().Consumer(ctx, name) if err != nil { logger.Error().Err(err).Msg("failed to get consumer") } ticker := time.NewTicker(5 * time.Second) go func() { - for range ticker.C { - info, err := consumer.Info(ctx) - if err != nil { - logger.Error().Err(err).Msg("failed to get consumer") - continue - } + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + continue + } - m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) - m.EventsUnprocessed.Set(float64(info.NumPending)) - m.EventsRedelivered.Set(float64(info.NumRedelivered)) - logger.Trace().Msg("updated search event metrics") + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Trace().Msg("updated search event metrics") + } } }() }