Handle events asynchronously

That should help to keep up with the stream of messages and prevent
services from being flagged as slow consumers.
This commit is contained in:
André Duffeck
2026-06-18 15:05:49 +02:00
parent dbc8a21085
commit 06365d9739
3 changed files with 15 additions and 12 deletions

View File

@@ -83,7 +83,8 @@ EventLoop:
if !ok {
break EventLoop
}
cl.processEvent(event)
go cl.processEvent(event)
if cl.stopped.Load() {
break EventLoop

View File

@@ -49,17 +49,19 @@ func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ListenForEvents listens for events
func (s SSE) ListenForEvents() {
for e := range s.evChannel {
switch ev := e.Event.(type) {
default:
s.l.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
for _, uid := range ev.UserIDs {
s.sse.Publish(uid, &sse.Event{
Event: []byte(ev.Type),
Data: ev.Message,
})
go func() {
switch ev := e.Event.(type) {
default:
s.l.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
for _, uid := range ev.UserIDs {
s.sse.Publish(uid, &sse.Event{
Event: []byte(ev.Type),
Data: ev.Message,
})
}
}
}
}()
}
}

View File

@@ -102,7 +102,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
for i := 0; i < ul.cfg.MaxConcurrency; i++ {
go func(ch <-chan events.Event) {
for event := range ch {
ul.processEvent(event)
go ul.processEvent(event)
}
}(ch)
}