mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-04-12 11:28:20 -04:00
Merge pull request #8379 from fschade/multi-user-sse-event
[full-ci] enhancement: allow sending multiple userIDs in one SSE event
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
Enhancement: allow sending multiple user ids in one sse event
|
||||
|
||||
Sending multiple user ids in one sse event is now possible which reduces the number of sent events.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/8379
|
||||
https://github.com/cs3org/reva/pull/4501
|
||||
2
go.mod
2
go.mod
@@ -12,7 +12,7 @@ require (
|
||||
github.com/blevesearch/bleve/v2 v2.3.10
|
||||
github.com/coreos/go-oidc/v3 v3.9.0
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568
|
||||
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
|
||||
github.com/disintegration/imaging v1.6.2
|
||||
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1019,8 +1019,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
|
||||
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1 h1:74SlyiFIUYcTvSFaEyeyABg3nuiDko91ve0fE++t87s=
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4=
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568 h1:RDytVxpVjcJNLQM34yFvHkxnysVQJ0Sa4k4vuI1SH5o=
|
||||
github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4=
|
||||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
|
||||
|
||||
@@ -8,13 +8,15 @@ import (
|
||||
"reflect"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// ClientlogService is the service responsible for user activities
|
||||
@@ -116,22 +118,20 @@ func (cl *ClientlogService) processEvent(event events.Event) {
|
||||
}
|
||||
|
||||
// II) instruct sse service to send the information
|
||||
for _, id := range users {
|
||||
if err := cl.sendSSE(id, evType, data); err != nil {
|
||||
cl.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
return
|
||||
}
|
||||
if err := cl.sendSSE(users, evType, data); err != nil {
|
||||
cl.log.Error().Err(err).Interface("userIDs", users).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *ClientlogService) sendSSE(userid string, evType string, data interface{}) error {
|
||||
func (cl *ClientlogService) sendSSE(userIDs []string, evType string, data interface{}) error {
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return events.Publish(context.Background(), cl.publisher, events.SendSSE{
|
||||
UserID: userid,
|
||||
UserIDs: userIDs,
|
||||
Type: evType,
|
||||
Message: b,
|
||||
})
|
||||
|
||||
@@ -3,11 +3,12 @@ package service
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
revactx "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/r3labs/sse/v2"
|
||||
|
||||
revactx "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
|
||||
)
|
||||
@@ -51,10 +52,12 @@ func (s SSE) ListenForEvents() {
|
||||
default:
|
||||
s.l.Error().Interface("event", ev).Msg("unhandled event")
|
||||
case events.SendSSE:
|
||||
s.sse.Publish(ev.UserID, &sse.Event{
|
||||
Event: []byte(ev.Type),
|
||||
Data: ev.Message,
|
||||
})
|
||||
for _, uid := range ev.UserIDs {
|
||||
s.sse.Publish(uid, &sse.Event{
|
||||
Event: []byte(ev.Type),
|
||||
Data: ev.Message,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,10 +10,14 @@ import (
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
"github.com/go-chi/chi/v5"
|
||||
micrometadata "go-micro.dev/v4/metadata"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
|
||||
@@ -22,9 +26,6 @@ import (
|
||||
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
|
||||
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
|
||||
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
|
||||
micrometadata "go-micro.dev/v4/metadata"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// UserlogService is the service responsible for user activities
|
||||
@@ -189,16 +190,18 @@ func (ul *UserlogService) processEvent(event events.Event) {
|
||||
|
||||
// III) store the eventID for each user
|
||||
for _, id := range users {
|
||||
if !ul.cfg.DisableSSE {
|
||||
if err := ul.sendSSE(ctx, id, event, gwc); err != nil {
|
||||
ul.log.Error().Err(err).Str("userid", id).Str("eventid", event.ID).Msg("cannot create sse event")
|
||||
}
|
||||
}
|
||||
if err := ul.addEventToUser(ctx, id, event); err != nil {
|
||||
if err := ul.addEventToUser(id, event); err != nil {
|
||||
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// IV) send sses
|
||||
if !ul.cfg.DisableSSE {
|
||||
if err := ul.sendSSE(ctx, users, event, gwc); err != nil {
|
||||
ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetEvents allows retrieving events from the eventhistory by userid
|
||||
@@ -229,12 +232,12 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms
|
||||
|
||||
// remove expired events from list asynchronously
|
||||
go func() {
|
||||
if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil {
|
||||
if err := ul.removeExpiredEvents(userid, eventIDs, resp.GetEvents()); err != nil {
|
||||
ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user")
|
||||
}
|
||||
}()
|
||||
|
||||
return resp.Events, nil
|
||||
return resp.GetEvents(), nil
|
||||
}
|
||||
|
||||
// DeleteEvents will delete the specified events
|
||||
@@ -247,7 +250,7 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
|
||||
return ul.alterUserEventList(userid, func(ids []string) []string {
|
||||
var newids []string
|
||||
for _, id := range ids {
|
||||
if _, delete := toDelete[id]; delete {
|
||||
if _, del := toDelete[id]; del {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -330,28 +333,52 @@ func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []stri
|
||||
})
|
||||
}
|
||||
|
||||
func (ul *UserlogService) addEventToUser(ctx context.Context, userid string, event events.Event) error {
|
||||
func (ul *UserlogService) addEventToUser(userid string, event events.Event) error {
|
||||
return ul.alterUserEventList(userid, func(ids []string) []string {
|
||||
return append(ids, event.ID)
|
||||
})
|
||||
}
|
||||
|
||||
func (ul *UserlogService) sendSSE(ctx context.Context, userid string, event events.Event, gwc gateway.GatewayAPIClient) error {
|
||||
ev, err := NewConverter(ctx, ul.getUserLocale(userid), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
|
||||
if err != nil {
|
||||
return err
|
||||
func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error {
|
||||
m := make(map[string]events.SendSSE)
|
||||
|
||||
for _, userid := range userIDs {
|
||||
loc := ul.getUserLocale(userid)
|
||||
if ev, ok := m[loc]; ok {
|
||||
ev.UserIDs = append(m[loc].UserIDs, userid)
|
||||
m[loc] = ev
|
||||
continue
|
||||
}
|
||||
|
||||
ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
|
||||
if err != nil {
|
||||
if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) {
|
||||
// the resource was not found, we assume it is deleted
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m[loc] = events.SendSSE{
|
||||
UserIDs: []string{userid},
|
||||
Type: "userlog-notification",
|
||||
Message: b,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
b, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
for _, ev := range m {
|
||||
if err := events.Publish(ctx, ul.publisher, ev); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return events.Publish(context.Background(), ul.publisher, events.SendSSE{
|
||||
UserID: userid,
|
||||
Type: "userlog-notification",
|
||||
Message: b,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/events/sse.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/events/sse.go
generated
vendored
@@ -4,9 +4,9 @@ import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// SendSEE instructs the sse service to send a notification to a user
|
||||
// SendSSE instructs the sse service to send one or multiple notifications
|
||||
type SendSSE struct {
|
||||
UserID string
|
||||
UserIDs []string
|
||||
Type string
|
||||
Message []byte
|
||||
}
|
||||
|
||||
@@ -50,10 +50,16 @@ func (i *Index) Load(index string) (map[string]string, error) {
|
||||
}
|
||||
|
||||
// Add adds an entry to an index
|
||||
// Consider calling AddAll() when trying to add multiple entries as every Add call has to lock the index
|
||||
func (i *Index) Add(index, key string, value string) error {
|
||||
return i.updateIndex(index, map[string]string{key: value}, []string{})
|
||||
}
|
||||
|
||||
// AddAll adds multiple entries to the index
|
||||
func (i *Index) AddAll(index string, m map[string]string) error {
|
||||
return i.updateIndex(index, m, []string{})
|
||||
}
|
||||
|
||||
// Remove removes an entry from the index
|
||||
func (i *Index) Remove(index, key string) error {
|
||||
return i.updateIndex(index, map[string]string{}, []string{key})
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -362,7 +362,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
|
||||
# github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1
|
||||
# github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568
|
||||
## explicit; go 1.21
|
||||
github.com/cs3org/reva/v2/cmd/revad/internal/grace
|
||||
github.com/cs3org/reva/v2/cmd/revad/runtime
|
||||
|
||||
Reference in New Issue
Block a user