diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 3dba5cacac..696055de2d 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -190,16 +190,18 @@ func (ul *UserlogService) processEvent(event events.Event) { // III) store the eventID for each user for _, id := range users { - if !ul.cfg.DisableSSE { - if err := ul.sendSSE(ctx, id, event, gwc); err != nil { - ul.log.Error().Err(err).Str("userid", id).Str("eventid", event.ID).Msg("cannot create sse event") - } - } - if err := ul.addEventToUser(ctx, id, event); err != nil { + if err := ul.addEventToUser(id, event); err != nil { ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") return } } + + // IV) send sses + if !ul.cfg.DisableSSE { + if err := ul.sendSSE(ctx, users, event, gwc); err != nil { + ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event") + } + } } // GetEvents allows retrieving events from the eventhistory by userid @@ -230,12 +232,12 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms // remove expired events from list asynchronously go func() { - if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil { + if err := ul.removeExpiredEvents(userid, eventIDs, resp.GetEvents()); err != nil { ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user") } }() - return resp.Events, nil + return resp.GetEvents(), nil } // DeleteEvents will delete the specified events @@ -248,7 +250,7 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { return ul.alterUserEventList(userid, func(ids []string) []string { var newids []string for _, id := range ids { - if _, delete := toDelete[id]; delete { + if _, del := toDelete[id]; del { continue } @@ -331,28 +333,52 @@ func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []stri }) } -func (ul *UserlogService) addEventToUser(ctx context.Context, userid string, event events.Event) error { +func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { return ul.alterUserEventList(userid, func(ids []string) []string { return append(ids, event.ID) }) } -func (ul *UserlogService) sendSSE(ctx context.Context, userid string, event events.Event, gwc gateway.GatewayAPIClient) error { - ev, err := NewConverter(ctx, ul.getUserLocale(userid), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) - if err != nil { - return err +func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error { + m := make(map[string]events.SendSSE) + + for _, userid := range userIDs { + loc := ul.getUserLocale(userid) + if ev, ok := m[loc]; ok { + ev.UserIDs = append(m[loc].UserIDs, userid) + m[loc] = ev + continue + } + + ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) + if err != nil { + if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) { + // the resource was not found, we assume it is deleted + continue + } + return err + } + + b, err := json.Marshal(ev) + if err != nil { + return err + } + + m[loc] = events.SendSSE{ + UserIDs: []string{userid}, + Type: "userlog-notification", + Message: b, + } + } - b, err := json.Marshal(ev) - if err != nil { - return err + for _, ev := range m { + if err := events.Publish(ctx, ul.publisher, ev); err != nil { + return err + } } - return events.Publish(context.Background(), ul.publisher, events.SendSSE{ - UserIDs: []string{userid}, - Type: "userlog-notification", - Message: b, - }) + return nil } func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {