mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-03-31 22:02:29 -04:00
Add tracing to more places in userlog.
This passes the treaceprovider to more places where it belongs and also adds tracing to the event processing.
This commit is contained in:
@@ -19,8 +19,7 @@ import (
|
||||
)
|
||||
|
||||
// Service is the service interface
|
||||
type Service interface {
|
||||
}
|
||||
type Service interface{}
|
||||
|
||||
// Server initializes the http service and server.
|
||||
func Server(opts ...Option) (http.Service, error) {
|
||||
@@ -90,6 +89,7 @@ func Server(opts ...Option) (http.Service, error) {
|
||||
svc.ValueClient(options.ValueClient),
|
||||
svc.RoleClient(options.RoleClient),
|
||||
svc.RegisteredEvents(options.RegisteredEvents),
|
||||
svc.TraceProvider(options.TracerProvider),
|
||||
)
|
||||
if err != nil {
|
||||
return http.Service{}, err
|
||||
|
||||
@@ -24,7 +24,7 @@ func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// HandleGetEvents is the GET handler for events
|
||||
func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := tracer.Start(r.Context(), "HandleGetEvents")
|
||||
ctx, span := ul.tracer.Start(r.Context(), "HandleGetEvents")
|
||||
defer span.End()
|
||||
u, ok := revactx.ContextGetUser(ctx)
|
||||
if !ok {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
|
||||
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Option for the userlog service
|
||||
@@ -27,6 +28,7 @@ type Options struct {
|
||||
ValueClient settingssvc.ValueService
|
||||
RoleClient settingssvc.RoleService
|
||||
RegisteredEvents []events.Unmarshaller
|
||||
TraceProvider trace.TracerProvider
|
||||
}
|
||||
|
||||
// Logger configures a logger for the userlog service
|
||||
@@ -98,3 +100,10 @@ func RoleClient(rs settingssvc.RoleService) Option {
|
||||
o.RoleClient = rs
|
||||
}
|
||||
}
|
||||
|
||||
// TraceProvider adds a tracer provider for the userlog service
|
||||
func TraceProvider(tp trace.TracerProvider) Option {
|
||||
return func(o *Options) {
|
||||
o.TraceProvider = tp
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,17 +29,10 @@ import (
|
||||
"github.com/r3labs/sse/v2"
|
||||
micrometadata "go-micro.dev/v4/metadata"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
var tracer trace.Tracer
|
||||
|
||||
func init() {
|
||||
tracer = otel.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service")
|
||||
}
|
||||
|
||||
// UserlogService is the service responsible for user activities
|
||||
type UserlogService struct {
|
||||
log log.Logger
|
||||
@@ -51,6 +44,8 @@ type UserlogService struct {
|
||||
valueClient settingssvc.ValueService
|
||||
sse *sse.Server
|
||||
registeredEvents map[string]events.Unmarshaller
|
||||
tp trace.TracerProvider
|
||||
tracer trace.Tracer
|
||||
}
|
||||
|
||||
// NewUserlogService returns an EventHistory service
|
||||
@@ -78,6 +73,8 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
|
||||
gatewaySelector: o.GatewaySelector,
|
||||
valueClient: o.ValueClient,
|
||||
registeredEvents: make(map[string]events.Unmarshaller),
|
||||
tp: o.TraceProvider,
|
||||
tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service"),
|
||||
}
|
||||
|
||||
if !ul.cfg.DisableSSE {
|
||||
@@ -114,91 +111,95 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
|
||||
// MemorizeEvents stores eventIDs a user wants to receive
|
||||
func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
|
||||
for event := range ch {
|
||||
// for each event we need to:
|
||||
// I) find users eligible to receive the event
|
||||
var (
|
||||
users []string
|
||||
executant *user.UserId
|
||||
err error
|
||||
)
|
||||
ul.processEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
switch e := event.Event.(type) {
|
||||
func (ul *UserlogService) processEvent(event events.Event) {
|
||||
// for each event we need to:
|
||||
// I) find users eligible to receive the event
|
||||
var (
|
||||
users []string
|
||||
executant *user.UserId
|
||||
err error
|
||||
)
|
||||
|
||||
switch e := event.Event.(type) {
|
||||
default:
|
||||
err = errors.New("unhandled event")
|
||||
// file related
|
||||
case events.PostprocessingStepFinished:
|
||||
switch e.FinishedStep {
|
||||
case events.PPStepAntivirus:
|
||||
result := e.Result.(events.VirusscanResult)
|
||||
if !result.Infected {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: should space mangers also be informed?
|
||||
users = append(users, e.ExecutingUser.GetId().GetOpaqueId())
|
||||
case events.PPStepPolicies:
|
||||
if e.Outcome == events.PPOutcomeContinue {
|
||||
return
|
||||
}
|
||||
users = append(users, e.ExecutingUser.GetId().GetOpaqueId())
|
||||
default:
|
||||
err = errors.New("unhandled event")
|
||||
// file related
|
||||
case events.PostprocessingStepFinished:
|
||||
switch e.FinishedStep {
|
||||
case events.PPStepAntivirus:
|
||||
result := e.Result.(events.VirusscanResult)
|
||||
if !result.Infected {
|
||||
continue
|
||||
}
|
||||
return
|
||||
|
||||
// TODO: should space mangers also be informed?
|
||||
users = append(users, e.ExecutingUser.GetId().GetOpaqueId())
|
||||
case events.PPStepPolicies:
|
||||
if e.Outcome == events.PPOutcomeContinue {
|
||||
continue
|
||||
}
|
||||
users = append(users, e.ExecutingUser.GetId().GetOpaqueId())
|
||||
default:
|
||||
continue
|
||||
|
||||
}
|
||||
// space related // TODO: how to find spaceadmins?
|
||||
case events.SpaceDisabled:
|
||||
executant = e.Executant
|
||||
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer)
|
||||
case events.SpaceDeleted:
|
||||
executant = e.Executant
|
||||
for u := range e.FinalMembers {
|
||||
users = append(users, u)
|
||||
}
|
||||
case events.SpaceShared:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.SpaceUnshared:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.SpaceMembershipExpired:
|
||||
users, err = ul.resolveID(ul.impersonate(e.SpaceOwner), e.GranteeUserID, e.GranteeGroupID)
|
||||
|
||||
// share related
|
||||
case events.ShareCreated:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.ShareRemoved:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.ShareExpired:
|
||||
users, err = ul.resolveID(ul.impersonate(e.ShareOwner), e.GranteeUserID, e.GranteeGroupID)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// TODO: Find out why this errors on ci pipeline
|
||||
ul.log.Debug().Err(err).Interface("event", event).Msg("error gathering members for event")
|
||||
continue
|
||||
// space related // TODO: how to find spaceadmins?
|
||||
case events.SpaceDisabled:
|
||||
executant = e.Executant
|
||||
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer)
|
||||
case events.SpaceDeleted:
|
||||
executant = e.Executant
|
||||
for u := range e.FinalMembers {
|
||||
users = append(users, u)
|
||||
}
|
||||
case events.SpaceShared:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.SpaceUnshared:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.SpaceMembershipExpired:
|
||||
users, err = ul.resolveID(ul.impersonate(e.SpaceOwner), e.GranteeUserID, e.GranteeGroupID)
|
||||
|
||||
// II) filter users who want to receive the event
|
||||
// This step is postponed for later.
|
||||
// For now each user should get all events she is eligible to receive
|
||||
// ...except notifications for their own actions
|
||||
users = removeExecutant(users, executant)
|
||||
// share related
|
||||
case events.ShareCreated:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.ShareRemoved:
|
||||
executant = e.Executant
|
||||
users, err = ul.resolveID(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID)
|
||||
case events.ShareExpired:
|
||||
users, err = ul.resolveID(ul.impersonate(e.ShareOwner), e.GranteeUserID, e.GranteeGroupID)
|
||||
}
|
||||
|
||||
// III) store the eventID for each user
|
||||
for _, id := range users {
|
||||
if err := ul.addEventToUser(id, event); err != nil {
|
||||
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: Find out why this errors on ci pipeline
|
||||
ul.log.Debug().Err(err).Interface("event", event).Msg("error gathering members for event")
|
||||
return
|
||||
}
|
||||
|
||||
// II) filter users who want to receive the event
|
||||
// This step is postponed for later.
|
||||
// For now each user should get all events she is eligible to receive
|
||||
// ...except notifications for their own actions
|
||||
users = removeExecutant(users, executant)
|
||||
|
||||
// III) store the eventID for each user
|
||||
for _, id := range users {
|
||||
if err := ul.addEventToUser(id, event); err != nil {
|
||||
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetEvents allows retrieving events from the eventhistory by userid
|
||||
func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) {
|
||||
ctx, span := tracer.Start(ctx, "GetEvents")
|
||||
ctx, span := ul.tracer.Start(ctx, "GetEvents")
|
||||
defer span.End()
|
||||
rec, err := ul.store.Read(userid)
|
||||
if err != nil && err != store.ErrNotFound {
|
||||
@@ -227,11 +228,9 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms
|
||||
if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil {
|
||||
ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user")
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return resp.Events, nil
|
||||
|
||||
}
|
||||
|
||||
// DeleteEvents will delete the specified events
|
||||
@@ -256,7 +255,7 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
|
||||
|
||||
// StoreGlobalEvent will store a global event that will be returned with each `GetEvents` request
|
||||
func (ul *UserlogService) StoreGlobalEvent(ctx context.Context, typ string, data map[string]string) error {
|
||||
ctx, span := tracer.Start(ctx, "StoreGlobalEvent")
|
||||
ctx, span := ul.tracer.Start(ctx, "StoreGlobalEvent")
|
||||
defer span.End()
|
||||
switch typ {
|
||||
default:
|
||||
@@ -293,12 +292,11 @@ func (ul *UserlogService) StoreGlobalEvent(ctx context.Context, typ string, data
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// GetGlobalEvents will return all global events
|
||||
func (ul *UserlogService) GetGlobalEvents(ctx context.Context) (map[string]json.RawMessage, error) {
|
||||
_, span := tracer.Start(ctx, "GetGlobalEvents")
|
||||
_, span := ul.tracer.Start(ctx, "GetGlobalEvents")
|
||||
defer span.End()
|
||||
out := make(map[string]json.RawMessage)
|
||||
|
||||
@@ -318,7 +316,7 @@ func (ul *UserlogService) GetGlobalEvents(ctx context.Context) (map[string]json.
|
||||
|
||||
// DeleteGlobalEvents will delete the specified event
|
||||
func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []string) error {
|
||||
_, span := tracer.Start(ctx, "DeleteGlobalEvents")
|
||||
_, span := ul.tracer.Start(ctx, "DeleteGlobalEvents")
|
||||
defer span.End()
|
||||
return ul.alterGlobalEvents(ctx, func(evs map[string]json.RawMessage) error {
|
||||
for _, name := range evnames {
|
||||
@@ -406,7 +404,7 @@ func (ul *UserlogService) alterUserEventList(userid string, alter func([]string)
|
||||
}
|
||||
|
||||
func (ul *UserlogService) alterGlobalEvents(ctx context.Context, alter func(map[string]json.RawMessage) error) error {
|
||||
_, span := tracer.Start(ctx, "alterGlobalEvents")
|
||||
_, span := ul.tracer.Start(ctx, "alterGlobalEvents")
|
||||
defer span.End()
|
||||
evs, err := ul.GetGlobalEvents(ctx)
|
||||
if err != nil && err != store.ErrNotFound {
|
||||
|
||||
Reference in New Issue
Block a user