stop metrics tickers on context cancel

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2026-04-28 13:00:01 +02:00
committed by Ralf Haferkamp
parent 7a149787d0
commit 2d1cc3fb3a
2 changed files with 36 additions and 26 deletions

View File

@@ -88,7 +88,7 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
m := metrics.New()
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
monitorMetrics(raw, "postprocessing-pull", m, logger)
monitorMetrics(ctx, raw, "postprocessing-pull", m, logger)
return &PostprocessingService{
ctx: ctx,
@@ -423,25 +423,30 @@ func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingst
return ids
}
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
ctx := context.Background()
func monitorMetrics(ctx context.Context, stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
consumer, err := stream.JetStream().Consumer(ctx, name)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
}
ticker := time.NewTicker(5 * time.Second)
go func() {
for range ticker.C {
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
continue
}
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
continue
}
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated postprocessing event metrics")
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated postprocessing event metrics")
}
}
}()
}

View File

@@ -90,7 +90,7 @@ func (s Service) Run() error {
}
if s.m != nil {
monitorMetrics(s.stream, "search-pull", s.m, s.log)
monitorMetrics(s.ctx, s.stream, "search-pull", s.m, s.log)
}
var wg sync.WaitGroup
@@ -208,25 +208,30 @@ func (s Service) processEvent(e raw.Event) error {
return nil
}
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
ctx := context.Background()
func monitorMetrics(ctx context.Context, stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
consumer, err := stream.JetStream().Consumer(ctx, name)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
}
ticker := time.NewTicker(5 * time.Second)
go func() {
for range ticker.C {
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
continue
}
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
continue
}
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated search event metrics")
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated search event metrics")
}
}
}()
}