adjust userlog service

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-08-18 11:20:32 +02:00
parent 1bfdc43054
commit 91176db30d
8 changed files with 31 additions and 59 deletions

View File

@@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
@@ -121,7 +121,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Config(cfg),
http.Metrics(mtrcs),
http.Store(st),
http.Consumer(consumer),
http.Stream(stream),
http.GatewaySelector(gatewaySelector),
http.History(hClient),
http.Value(vClient),

View File

@@ -28,7 +28,7 @@ type Config struct {
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."`
GlobalNotificationsSecret string `yaml:"global_notifications_secret" env:"USERLOG_GLOBAL_NOTIFICATIONS_SECRET" desc:"The secret to secure the global notifications endpoint. Only system admins and users knowing that secret can call the global notifications POST/DELETE endpoints."`

View File

@@ -28,7 +28,7 @@ type Options struct {
Flags []cli.Flag
Namespace string
Store store.Store
Consumer events.Consumer
Stream events.Stream
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
@@ -97,10 +97,10 @@ func Store(store store.Store) Option {
}
}
// Consumer provides a function to configure the consumer
func Consumer(consumer events.Consumer) Option {
// Stream provides a function to configure the stream
func Stream(stream events.Stream) Option {
return func(o *Options) {
o.Consumer = consumer
o.Stream = stream
}
}

View File

@@ -80,7 +80,7 @@ func Server(opts ...Option) (http.Service, error) {
handle, err := svc.NewUserlogService(
svc.Logger(options.Logger),
svc.Consumer(options.Consumer),
svc.Stream(options.Stream),
svc.Mux(mux),
svc.Store(options.Store),
svc.Config(options.Config),

View File

@@ -91,33 +91,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}
// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}
uid := u.GetId().GetOpaqueId()
if uid == "" {
ul.log.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}
stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false
// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()
ul.sse.ServeHTTP(w, r)
}
// HandlePostGlobaelEvent is the POST handler for global events
func (ul *UserlogService) HandlePostGlobalEvent(w http.ResponseWriter, r *http.Request) {
var req PostEventsRequest

View File

@@ -19,7 +19,7 @@ type Option func(*Options)
// Options for the userlog service
type Options struct {
Logger log.Logger
Consumer events.Consumer
Stream events.Stream
Mux *chi.Mux
Store store.Store
Config *config.Config
@@ -38,10 +38,10 @@ func Logger(log log.Logger) Option {
}
}
// Consumer configures an event consumer for the userlog service
func Consumer(c events.Consumer) Option {
// Stream configures an event stream for the userlog service
func Stream(s events.Stream) Option {
return func(o *Options) {
o.Consumer = c
o.Stream = s
}
}

View File

@@ -26,7 +26,6 @@ import (
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/r3labs/sse/v2"
micrometadata "go-micro.dev/v4/metadata"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
@@ -42,10 +41,10 @@ type UserlogService struct {
historyClient ehsvc.EventHistoryService
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
valueClient settingssvc.ValueService
sse *sse.Server
registeredEvents map[string]events.Unmarshaller
tp trace.TracerProvider
tracer trace.Tracer
publisher events.Publisher
}
// NewUserlogService returns an EventHistory service
@@ -55,11 +54,11 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
opt(o)
}
if o.Consumer == nil || o.Store == nil {
return nil, fmt.Errorf("need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store)
if o.Stream == nil || o.Store == nil {
return nil, fmt.Errorf("need non nil stream (%v) and store (%v) to work properly", o.Stream, o.Store)
}
ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...)
ch, err := events.Consume(o.Stream, "userlog", o.RegisteredEvents...)
if err != nil {
return nil, err
}
@@ -75,10 +74,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
registeredEvents: make(map[string]events.Unmarshaller),
tp: o.TraceProvider,
tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service"),
}
if !ul.cfg.DisableSSE {
ul.sse = sse.New()
publisher: o.Stream,
}
for _, e := range o.RegisteredEvents {
@@ -97,10 +93,6 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
r.Delete("/", ul.HandleDeleteEvents)
r.Post("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandlePostGlobalEvent))
r.Delete("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandleDeleteGlobalEvent))
if !ul.cfg.DisableSSE {
r.Get("/sse", ul.HandleSSE)
}
})
go ul.MemorizeEvents(ch)
@@ -348,8 +340,11 @@ func (ul *UserlogService) sendSSE(userid string, event events.Event) error {
return err
}
ul.sse.Publish(userid, &sse.Event{Data: b})
return nil
return events.Publish(context.Background(), ul.publisher, events.SendSSE{
UserID: userid,
Type: "userlog-notification",
Message: b,
})
}
func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {

View File

@@ -79,7 +79,7 @@ var _ = Describe("UserlogService", func() {
ul, err = service.NewUserlogService(
service.Config(cfg),
service.Consumer(bus),
service.Stream(bus),
service.Store(sto),
service.Logger(log.NewLogger()),
service.Mux(chi.NewMux()),
@@ -96,9 +96,9 @@ var _ = Describe("UserlogService", func() {
It("it stores, returns and deletes a couple of events", func() {
ids := make(map[string]struct{})
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
// ids[bus.Publish(events.SpaceMembershipExpired{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
// ids[bus.Publish(events.ShareCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
@@ -156,7 +156,11 @@ func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan micr
return ch, nil
}
func (tb testBus) Publish(e interface{}) string {
func (tb testBus) Publish(_ string, _ interface{}, _ ...microevents.PublishOption) error {
return nil
}
func (tb testBus) publish(e interface{}) string {
ev := events.Event{
ID: uuid.New().String(),
Type: reflect.TypeOf(e).String(),