diff --git a/changelog/unreleased/enhancement-allow-multiple-event-user-ids.md b/changelog/unreleased/enhancement-allow-multiple-event-user-ids.md new file mode 100644 index 0000000000..0fd033cea7 --- /dev/null +++ b/changelog/unreleased/enhancement-allow-multiple-event-user-ids.md @@ -0,0 +1,6 @@ +Enhancement: allow sending multiple user ids in one sse event + +Sending multiple user ids in one sse event is now possible which reduces the number of sent events. + +https://github.com/owncloud/ocis/pull/8379 +https://github.com/cs3org/reva/pull/4501 diff --git a/go.mod b/go.mod index 2537f1cffc..93ac2cc98f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/blevesearch/bleve/v2 v2.3.10 github.com/coreos/go-oidc/v3 v3.9.0 github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 - github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1 + github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568 github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e diff --git a/go.sum b/go.sum index be0d8686f8..244e7dbd7a 100644 --- a/go.sum +++ b/go.sum @@ -1019,8 +1019,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1 h1:74SlyiFIUYcTvSFaEyeyABg3nuiDko91ve0fE++t87s= -github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4= +github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568 h1:RDytVxpVjcJNLQM34yFvHkxnysVQJ0Sa4k4vuI1SH5o= +github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 22b61cc0cf..f905c8984c 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -8,13 +8,15 @@ import ( "reflect" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "go.opentelemetry.io/otel/trace" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/clientlog/pkg/config" - "go.opentelemetry.io/otel/trace" ) // ClientlogService is the service responsible for user activities @@ -116,22 +118,20 @@ func (cl *ClientlogService) processEvent(event events.Event) { } // II) instruct sse service to send the information - for _, id := range users { - if err := cl.sendSSE(id, evType, data); err != nil { - cl.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") - return - } + if err := cl.sendSSE(users, evType, data); err != nil { + cl.log.Error().Err(err).Interface("userIDs", users).Str("eventid", event.ID).Msg("failed to store event for user") + return } } -func (cl *ClientlogService) sendSSE(userid string, evType string, data interface{}) error { +func (cl *ClientlogService) sendSSE(userIDs []string, evType string, data interface{}) error { b, err := json.Marshal(data) if err != nil { return err } return events.Publish(context.Background(), cl.publisher, events.SendSSE{ - UserID: userid, + UserIDs: userIDs, Type: evType, Message: b, }) diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go index 23979d8664..aac0cdd426 100644 --- a/services/sse/pkg/service/service.go +++ b/services/sse/pkg/service/service.go @@ -3,11 +3,12 @@ package service import ( "net/http" - revactx "github.com/cs3org/reva/v2/pkg/ctx" - "github.com/cs3org/reva/v2/pkg/events" "github.com/go-chi/chi/v5" "github.com/r3labs/sse/v2" + revactx "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/sse/pkg/config" ) @@ -51,10 +52,12 @@ func (s SSE) ListenForEvents() { default: s.l.Error().Interface("event", ev).Msg("unhandled event") case events.SendSSE: - s.sse.Publish(ev.UserID, &sse.Event{ - Event: []byte(ev.Type), - Data: ev.Message, - }) + for _, uid := range ev.UserIDs { + s.sse.Publish(uid, &sse.Event{ + Event: []byte(ev.Type), + Data: ev.Message, + }) + } } } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 684b9c8f3d..696055de2d 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -10,10 +10,14 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/go-chi/chi/v5" + micrometadata "go-micro.dev/v4/metadata" + "go-micro.dev/v4/store" + "go.opentelemetry.io/otel/trace" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/go-chi/chi/v5" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/middleware" "github.com/owncloud/ocis/v2/ocis-pkg/roles" @@ -22,9 +26,6 @@ import ( settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" - micrometadata "go-micro.dev/v4/metadata" - "go-micro.dev/v4/store" - "go.opentelemetry.io/otel/trace" ) // UserlogService is the service responsible for user activities @@ -189,16 +190,18 @@ func (ul *UserlogService) processEvent(event events.Event) { // III) store the eventID for each user for _, id := range users { - if !ul.cfg.DisableSSE { - if err := ul.sendSSE(ctx, id, event, gwc); err != nil { - ul.log.Error().Err(err).Str("userid", id).Str("eventid", event.ID).Msg("cannot create sse event") - } - } - if err := ul.addEventToUser(ctx, id, event); err != nil { + if err := ul.addEventToUser(id, event); err != nil { ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") return } } + + // IV) send sses + if !ul.cfg.DisableSSE { + if err := ul.sendSSE(ctx, users, event, gwc); err != nil { + ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event") + } + } } // GetEvents allows retrieving events from the eventhistory by userid @@ -229,12 +232,12 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms // remove expired events from list asynchronously go func() { - if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil { + if err := ul.removeExpiredEvents(userid, eventIDs, resp.GetEvents()); err != nil { ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user") } }() - return resp.Events, nil + return resp.GetEvents(), nil } // DeleteEvents will delete the specified events @@ -247,7 +250,7 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { return ul.alterUserEventList(userid, func(ids []string) []string { var newids []string for _, id := range ids { - if _, delete := toDelete[id]; delete { + if _, del := toDelete[id]; del { continue } @@ -330,28 +333,52 @@ func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []stri }) } -func (ul *UserlogService) addEventToUser(ctx context.Context, userid string, event events.Event) error { +func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { return ul.alterUserEventList(userid, func(ids []string) []string { return append(ids, event.ID) }) } -func (ul *UserlogService) sendSSE(ctx context.Context, userid string, event events.Event, gwc gateway.GatewayAPIClient) error { - ev, err := NewConverter(ctx, ul.getUserLocale(userid), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) - if err != nil { - return err +func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error { + m := make(map[string]events.SendSSE) + + for _, userid := range userIDs { + loc := ul.getUserLocale(userid) + if ev, ok := m[loc]; ok { + ev.UserIDs = append(m[loc].UserIDs, userid) + m[loc] = ev + continue + } + + ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event) + if err != nil { + if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) { + // the resource was not found, we assume it is deleted + continue + } + return err + } + + b, err := json.Marshal(ev) + if err != nil { + return err + } + + m[loc] = events.SendSSE{ + UserIDs: []string{userid}, + Type: "userlog-notification", + Message: b, + } + } - b, err := json.Marshal(ev) - if err != nil { - return err + for _, ev := range m { + if err := events.Publish(ctx, ul.publisher, ev); err != nil { + return err + } } - return events.Publish(context.Background(), ul.publisher, events.SendSSE{ - UserID: userid, - Type: "userlog-notification", - Message: b, - }) + return nil } func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/sse.go b/vendor/github.com/cs3org/reva/v2/pkg/events/sse.go index 34841c81f3..fa3459d9f5 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/sse.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/sse.go @@ -4,9 +4,9 @@ import ( "encoding/json" ) -// SendSEE instructs the sse service to send a notification to a user +// SendSSE instructs the sse service to send one or multiple notifications type SendSSE struct { - UserID string + UserIDs []string Type string Message []byte } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex/spaceidindex.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex/spaceidindex.go index 4d9add3895..d9dc9c3597 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex/spaceidindex.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex/spaceidindex.go @@ -50,10 +50,16 @@ func (i *Index) Load(index string) (map[string]string, error) { } // Add adds an entry to an index +// Consider calling AddAll() when trying to add multiple entries as every Add call has to lock the index func (i *Index) Add(index, key string, value string) error { return i.updateIndex(index, map[string]string{key: value}, []string{}) } +// AddAll adds multiple entries to the index +func (i *Index) AddAll(index string, m map[string]string) error { + return i.updateIndex(index, m, []string{}) +} + // Remove removes an entry from the index func (i *Index) Remove(index, key string) error { return i.updateIndex(index, map[string]string{}, []string{key}) diff --git a/vendor/modules.txt b/vendor/modules.txt index a616ffaad3..4aef0ec19f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -362,7 +362,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.18.1-0.20240206122233-bf89f7aaedd1 +# github.com/cs3org/reva/v2 v2.18.1-0.20240206135152-2343fe56d568 ## explicit; go 1.21 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime