diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go index c85eac6189..dd70fb297e 100644 --- a/services/groupware/pkg/groupware/groupware_framework.go +++ b/services/groupware/pkg/groupware/groupware_framework.go @@ -53,39 +53,56 @@ const ( logMethod = "method" ) +// Minimalistic representation of a User, containing only the attributes that are +// necessary for the Groupware implementation. type User interface { GetUsername() string GetId() string } +// Provides a User that is associated with a request. type UserProvider interface { // Provide the user for JMAP operations. GetUser(req *http.Request, ctx context.Context, logger *log.Logger) (User, error) } +// Background job that needs to be executed asynchronously by the Groupware. type Job struct { - id uint64 + // An identifier for the job, to use in logs for correlation. + id uint64 + // A human readable description of the job, to use in logs. description string - logger *log.Logger - job func(uint64, *log.Logger) + // The logger to use for the job. + logger *log.Logger + // The function that performs the job. + job func(uint64, *log.Logger) } type Groupware struct { - mux *chi.Mux - metrics *metrics.Metrics - sseServer *sse.Server + mux *chi.Mux + metrics *metrics.Metrics + sseServer *sse.Server + // A map of all the SSE streams that have been created, in order to be able to iterate over them as, + // unfortunately, the sse implementation does not provide such a function. + // Key: the stream ID, which is the username + // Value: the timestamp of the creation of the stream streams cmap.ConcurrentMap logger *log.Logger defaultEmailLimit uint maxBodyValueBytes uint - sessionCache *ttlcache.Cache[string, cachedSession] - jmap *jmap.Client - userProvider UserProvider - eventChannel chan Event - jobsChannel chan Job - jobCounter atomic.Uint64 + // Caches successful and failed Sessions by the username. + sessionCache *ttlcache.Cache[sessionKey, cachedSession] + jmap *jmap.Client + userProvider UserProvider + // SSE events that need to be pushed to clients. + eventChannel chan Event + // Background jobs that need to be executed. + jobsChannel chan Job + // A threadsafe counter to generate the job IDs. + jobCounter atomic.Uint64 } +// An error during the Groupware initialization. type GroupwareInitializationError struct { Message string Err error @@ -102,61 +119,17 @@ func (e GroupwareInitializationError) Unwrap() error { return e.Err } -type GroupwareSessionEventListener struct { - logger *log.Logger - sessionCache *ttlcache.Cache[string, cachedSession] - counter prometheus.Counter -} - -func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) { - // it's enough to remove the session from the cache, as it will be fetched on-demand - // the next time an operation is performed on behalf of the user - l.sessionCache.Delete(session.Username) - if l.counter != nil { - l.counter.Inc() - } - - l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState) -} - -var _ jmap.SessionEventListener = GroupwareSessionEventListener{} - +// SSE Event. type Event struct { - Type string + // The type of event, will be sent as the "type" attribute. + Type string + // The ID of the stream to push the event to, typically the username. Stream string - Body any + // The payload of the event, will be serialized as JSON. + Body any } -type ConstMetricCollector struct { - metric prometheus.Metric -} - -func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- c.metric.Desc() -} -func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) { - ch <- c.metric -} - -type SessionCacheMetricsCollector struct { - desc *prometheus.Desc - supply func() ttlcache.Metrics -} - -func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- s.desc -} -func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) { - m := s.supply() - ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions) - ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions) - ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits) - ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses) -} - -var _ prometheus.Collector = SessionCacheMetricsCollector{} - -func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) { +func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) { baseUrl, err := url.Parse(config.Mail.BaseUrl) if err != nil { logger.Error().Err(err).Msgf("failed to parse configured Mail.Baseurl '%v'", config.Mail.BaseUrl) @@ -188,13 +161,14 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting - insecure := true // TODO make configurable + insecureTls := true // TODO make configurable m := metrics.New(logger) + // TODO add timeouts and other meaningful configuration settings for the HTTP client tr := http.DefaultTransport.(*http.Transport).Clone() tr.ResponseHeaderTimeout = responseHeaderTimeout - if insecure { + if insecureTls { tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable tr.TLSClientConfig = tlsConfig } @@ -217,7 +191,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro jmapClient := jmap.NewClient(api, api, api) - var sessionCache *ttlcache.Cache[string, cachedSession] + var sessionCache *ttlcache.Cache[sessionKey, cachedSession] { sessionLoader := &sessionCacheLoader{ logger: logger, @@ -226,18 +200,18 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro } sessionCache = ttlcache.New( - ttlcache.WithCapacity[string, cachedSession](sessionCacheMaxCapacity), - ttlcache.WithTTL[string, cachedSession](sessionCacheTtl), - ttlcache.WithDisableTouchOnHit[string, cachedSession](), + ttlcache.WithCapacity[sessionKey, cachedSession](sessionCacheMaxCapacity), + ttlcache.WithTTL[sessionKey, cachedSession](sessionCacheTtl), + ttlcache.WithDisableTouchOnHit[sessionKey, cachedSession](), ttlcache.WithLoader(sessionLoader), ) go sessionCache.Start() - prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics}) + prometheusRegistry.Register(sessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics}) } - if logger.Trace().Enabled() { - sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[string, cachedSession]) { + sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[sessionKey, cachedSession]) { + if logger.Trace().Enabled() { reason := "" switch r { case ttlcache.EvictionReasonDeleted: @@ -258,10 +232,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro typ = "failed" } logger.Trace().Msgf("%s session cache eviction of user '%v' after %v: %v", typ, item.Key(), spentInCache, reason) - }) - } + } + }) - sessionEventListener := GroupwareSessionEventListener{ + sessionEventListener := sessionEventListener{ sessionCache: sessionCache, logger: logger, counter: prometheus.NewCounter(prometheus.CounterOpts{ @@ -273,6 +247,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro } jmapClient.AddSessionEventListener(&sessionEventListener) + // A channel to process SSE Events with a single worker. eventChannel := make(chan Event, eventChannelSize) { totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc( @@ -284,10 +259,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro if err != nil { logger.Warn().Err(err).Msg("failed to create event_buffer_size metric") } else { - prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric}) + prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric}) } - prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: metrics.Namespace, Subsystem: metrics.Subsystem, Name: "event_buffer_queued", @@ -307,7 +282,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) { sseSubscribers.Add(-1) } - prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: metrics.Namespace, Subsystem: metrics.Subsystem, Name: "sse_subscribers", @@ -328,10 +303,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro if err != nil { logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric") } else { - prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric}) + prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric}) } - prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: metrics.Namespace, Subsystem: metrics.Subsystem, Name: "workers_buffer_queued", @@ -352,10 +327,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro if err != nil { logger.Warn().Err(err).Msg("failed to create workers_total metric") } else { - prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric}) + prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkersMetric}) } - prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: metrics.Namespace, Subsystem: metrics.Subsystem, Name: "workers_busy", @@ -480,7 +455,7 @@ func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) { // Provide a JMAP Session for the func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, *GroupwareError) { - item := g.sessionCache.Get(user.GetUsername()) + item := g.sessionCache.Get(toSessionKey(user.GetUsername())) if item != nil { value := item.Value() if value != nil { @@ -823,7 +798,7 @@ func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler return Response{}, false } - logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId()))) + logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId()))) session, ok, gwerr := g.session(user, r, ctx, logger) if gwerr != nil { @@ -928,7 +903,7 @@ func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func( return } - logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId()))) + logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId()))) session, ok, gwerr := g.session(user, r, ctx, logger) if gwerr != nil { diff --git a/services/groupware/pkg/groupware/groupware_session.go b/services/groupware/pkg/groupware/groupware_session.go index 569494881a..f1d7aeba1e 100644 --- a/services/groupware/pkg/groupware/groupware_session.go +++ b/services/groupware/pkg/groupware/groupware_session.go @@ -4,10 +4,23 @@ import ( "time" "github.com/jellydator/ttlcache/v3" + "github.com/prometheus/client_golang/prometheus" + "github.com/opencloud-eu/opencloud/pkg/jmap" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics" ) +type sessionKey string + +func toSessionKey(username string) sessionKey { + return sessionKey(username) +} + +func usernameFromSessionKey(key sessionKey) string { + return string(key) +} + type cachedSession interface { Success() bool Get() jmap.Session @@ -61,15 +74,56 @@ type sessionCacheLoader struct { errorTtl time.Duration } -func (l *sessionCacheLoader) Load(c *ttlcache.Cache[string, cachedSession], username string) *ttlcache.Item[string, cachedSession] { +func (l *sessionCacheLoader) Load(c *ttlcache.Cache[sessionKey, cachedSession], key sessionKey) *ttlcache.Item[sessionKey, cachedSession] { + username := usernameFromSessionKey(key) session, err := l.jmapClient.FetchSession(username, l.logger) if err != nil { - l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", username) - return c.Set(username, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl) + l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", key) + return c.Set(key, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl) } else { - l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", username) - return c.Set(username, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache + l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", key) + return c.Set(key, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache } } -var _ ttlcache.Loader[string, cachedSession] = &sessionCacheLoader{} +var _ ttlcache.Loader[sessionKey, cachedSession] = &sessionCacheLoader{} + +// Listens to JMAP Session outdated events, in order to remove outdated Sessions +// from the Groupware Session cache. +type sessionEventListener struct { + logger *log.Logger + sessionCache *ttlcache.Cache[sessionKey, cachedSession] + counter prometheus.Counter +} + +func (l sessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) { + // it's enough to remove the session from the cache, as it will be fetched on-demand + // the next time an operation is performed on behalf of the user + l.sessionCache.Delete(toSessionKey(session.Username)) + if l.counter != nil { + l.counter.Inc() + } + + l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState) +} + +var _ jmap.SessionEventListener = sessionEventListener{} + +// A Prometheus Collector for the Session cache metrics. +type sessionCacheMetricsCollector struct { + desc *prometheus.Desc + supply func() ttlcache.Metrics +} + +func (s sessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- s.desc +} +func (s sessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) { + m := s.supply() + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses) +} + +var _ prometheus.Collector = sessionCacheMetricsCollector{} diff --git a/services/groupware/pkg/metrics/metrics.go b/services/groupware/pkg/metrics/metrics.go index d608608e91..5cb0514337 100644 --- a/services/groupware/pkg/metrics/metrics.go +++ b/services/groupware/pkg/metrics/metrics.go @@ -177,7 +177,20 @@ func New(logger *log.Logger) *Metrics { }, []string{Labels.Endpoint}), } - r := reflect.ValueOf(*m) + registerAll(m, logger) + + return m +} + +func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) { + obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId}) +} + +func registerAll(m any, logger *log.Logger) { + r := reflect.ValueOf(m) + if r.Kind() == reflect.Pointer { + r = r.Elem() + } for i := 0; i < r.NumField(); i++ { n := r.Type().Field(i).Name f := r.Field(i) @@ -190,9 +203,44 @@ func New(logger *log.Logger) *Metrics { } } } - return m } -func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) { - obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId}) +type ConstMetricCollector struct { + Metric prometheus.Metric } + +func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.Metric.Desc() +} +func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) { + ch <- c.Metric +} + +type LoggingPrometheusRegisterer struct { + delegate prometheus.Registerer + logger *log.Logger +} + +func NewLoggingPrometheusRegisterer(delegate prometheus.Registerer, logger *log.Logger) LoggingPrometheusRegisterer { + return LoggingPrometheusRegisterer{ + delegate: delegate, + logger: logger, + } +} + +func (r LoggingPrometheusRegisterer) Register(c prometheus.Collector) error { + err := r.delegate.Register(c) + if err != nil { + r.logger.Warn().Err(err).Msgf("failed to register metric") + } + return err +} +func (r LoggingPrometheusRegisterer) MustRegister(...prometheus.Collector) { + panic("don't use MustRegister") +} + +func (r LoggingPrometheusRegisterer) Unregister(c prometheus.Collector) bool { + return r.delegate.Unregister(c) +} + +var _ prometheus.Registerer = LoggingPrometheusRegisterer{} diff --git a/services/groupware/pkg/metrics/startup_metrics.go b/services/groupware/pkg/metrics/startup_metrics.go index 9efcc8bfa8..f762286243 100644 --- a/services/groupware/pkg/metrics/startup_metrics.go +++ b/services/groupware/pkg/metrics/startup_metrics.go @@ -9,13 +9,13 @@ import ( var registered atomic.Bool -func StartupMetrics() { +func StartupMetrics(registerer prometheus.Registerer) { // use an atomic boolean to make the operation idempotent, // instead of causing a panic in case this function is // called twice if registered.CompareAndSwap(false, true) { // https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56 - prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + registerer.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "build_info", diff --git a/services/groupware/pkg/service/http/v0/service.go b/services/groupware/pkg/service/http/v0/service.go index 6d56c48bd0..432c7cece7 100644 --- a/services/groupware/pkg/service/http/v0/service.go +++ b/services/groupware/pkg/service/http/v0/service.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/prometheus/client_golang/prometheus" "github.com/riandyrn/otelchi" "github.com/opencloud-eu/opencloud/pkg/log" @@ -34,7 +35,11 @@ func NewService(opts ...Option) (Service, error) { ), ) - gw, err := groupware.NewGroupware(options.Config, &options.Logger, m) + logger := &options.Logger + + registerer := metrics.NewLoggingPrometheusRegisterer(prometheus.DefaultRegisterer, logger) + + gw, err := groupware.NewGroupware(options.Config, logger, m, registerer) if err != nil { return nil, err } @@ -53,7 +58,7 @@ func NewService(opts ...Option) (Service, error) { } } - metrics.StartupMetrics() + metrics.StartupMetrics(registerer) return gw, nil }