From 3d85aa69b70222204901ac50beb2bba0c7e43efb Mon Sep 17 00:00:00 2001
From: Pascal Bleser
Date: Thu, 28 Aug 2025 17:00:41 +0200
Subject: [PATCH] refactor(groupware): logging and metrics improvements
* some minor code refactorings to improve logging and metrics
* more code documentation
---
.../pkg/groupware/groupware_framework.go | 145 ++++++++----------
.../pkg/groupware/groupware_session.go | 66 +++++++-
services/groupware/pkg/metrics/metrics.go | 56 ++++++-
.../groupware/pkg/metrics/startup_metrics.go | 4 +-
.../groupware/pkg/service/http/v0/service.go | 9 +-
5 files changed, 181 insertions(+), 99 deletions(-)
diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go
index c85eac6189..dd70fb297e 100644
--- a/services/groupware/pkg/groupware/groupware_framework.go
+++ b/services/groupware/pkg/groupware/groupware_framework.go
@@ -53,39 +53,56 @@ const (
logMethod = "method"
)
+// Minimalistic representation of a User, containing only the attributes that are
+// necessary for the Groupware implementation.
type User interface {
GetUsername() string
GetId() string
}
+// Provides a User that is associated with a request.
type UserProvider interface {
// Provide the user for JMAP operations.
GetUser(req *http.Request, ctx context.Context, logger *log.Logger) (User, error)
}
+// Background job that needs to be executed asynchronously by the Groupware.
type Job struct {
- id uint64
+ // An identifier for the job, to use in logs for correlation.
+ id uint64
+ // A human readable description of the job, to use in logs.
description string
- logger *log.Logger
- job func(uint64, *log.Logger)
+ // The logger to use for the job.
+ logger *log.Logger
+ // The function that performs the job.
+ job func(uint64, *log.Logger)
}
type Groupware struct {
- mux *chi.Mux
- metrics *metrics.Metrics
- sseServer *sse.Server
+ mux *chi.Mux
+ metrics *metrics.Metrics
+ sseServer *sse.Server
+ // A map of all the SSE streams that have been created, in order to be able to iterate over them as,
+ // unfortunately, the sse implementation does not provide such a function.
+ // Key: the stream ID, which is the username
+ // Value: the timestamp of the creation of the stream
streams cmap.ConcurrentMap
logger *log.Logger
defaultEmailLimit uint
maxBodyValueBytes uint
- sessionCache *ttlcache.Cache[string, cachedSession]
- jmap *jmap.Client
- userProvider UserProvider
- eventChannel chan Event
- jobsChannel chan Job
- jobCounter atomic.Uint64
+ // Caches successful and failed Sessions by the username.
+ sessionCache *ttlcache.Cache[sessionKey, cachedSession]
+ jmap *jmap.Client
+ userProvider UserProvider
+ // SSE events that need to be pushed to clients.
+ eventChannel chan Event
+ // Background jobs that need to be executed.
+ jobsChannel chan Job
+ // A threadsafe counter to generate the job IDs.
+ jobCounter atomic.Uint64
}
+// An error during the Groupware initialization.
type GroupwareInitializationError struct {
Message string
Err error
@@ -102,61 +119,17 @@ func (e GroupwareInitializationError) Unwrap() error {
return e.Err
}
-type GroupwareSessionEventListener struct {
- logger *log.Logger
- sessionCache *ttlcache.Cache[string, cachedSession]
- counter prometheus.Counter
-}
-
-func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
- // it's enough to remove the session from the cache, as it will be fetched on-demand
- // the next time an operation is performed on behalf of the user
- l.sessionCache.Delete(session.Username)
- if l.counter != nil {
- l.counter.Inc()
- }
-
- l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
-}
-
-var _ jmap.SessionEventListener = GroupwareSessionEventListener{}
-
+// SSE Event.
type Event struct {
- Type string
+ // The type of event, will be sent as the "type" attribute.
+ Type string
+ // The ID of the stream to push the event to, typically the username.
Stream string
- Body any
+ // The payload of the event, will be serialized as JSON.
+ Body any
}
-type ConstMetricCollector struct {
- metric prometheus.Metric
-}
-
-func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
- ch <- c.metric.Desc()
-}
-func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
- ch <- c.metric
-}
-
-type SessionCacheMetricsCollector struct {
- desc *prometheus.Desc
- supply func() ttlcache.Metrics
-}
-
-func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
- ch <- s.desc
-}
-func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
- m := s.supply()
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
-}
-
-var _ prometheus.Collector = SessionCacheMetricsCollector{}
-
-func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) {
+func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
if err != nil {
logger.Error().Err(err).Msgf("failed to parse configured Mail.Baseurl '%v'", config.Mail.BaseUrl)
@@ -188,13 +161,14 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
- insecure := true // TODO make configurable
+ insecureTls := true // TODO make configurable
m := metrics.New(logger)
+ // TODO add timeouts and other meaningful configuration settings for the HTTP client
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
- if insecure {
+ if insecureTls {
tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
tr.TLSClientConfig = tlsConfig
}
@@ -217,7 +191,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
jmapClient := jmap.NewClient(api, api, api)
- var sessionCache *ttlcache.Cache[string, cachedSession]
+ var sessionCache *ttlcache.Cache[sessionKey, cachedSession]
{
sessionLoader := &sessionCacheLoader{
logger: logger,
@@ -226,18 +200,18 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
sessionCache = ttlcache.New(
- ttlcache.WithCapacity[string, cachedSession](sessionCacheMaxCapacity),
- ttlcache.WithTTL[string, cachedSession](sessionCacheTtl),
- ttlcache.WithDisableTouchOnHit[string, cachedSession](),
+ ttlcache.WithCapacity[sessionKey, cachedSession](sessionCacheMaxCapacity),
+ ttlcache.WithTTL[sessionKey, cachedSession](sessionCacheTtl),
+ ttlcache.WithDisableTouchOnHit[sessionKey, cachedSession](),
ttlcache.WithLoader(sessionLoader),
)
go sessionCache.Start()
- prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
+ prometheusRegistry.Register(sessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
}
- if logger.Trace().Enabled() {
- sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[string, cachedSession]) {
+ sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[sessionKey, cachedSession]) {
+ if logger.Trace().Enabled() {
reason := ""
switch r {
case ttlcache.EvictionReasonDeleted:
@@ -258,10 +232,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
typ = "failed"
}
logger.Trace().Msgf("%s session cache eviction of user '%v' after %v: %v", typ, item.Key(), spentInCache, reason)
- })
- }
+ }
+ })
- sessionEventListener := GroupwareSessionEventListener{
+ sessionEventListener := sessionEventListener{
sessionCache: sessionCache,
logger: logger,
counter: prometheus.NewCounter(prometheus.CounterOpts{
@@ -273,6 +247,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
jmapClient.AddSessionEventListener(&sessionEventListener)
+ // A channel to process SSE Events with a single worker.
eventChannel := make(chan Event, eventChannelSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
@@ -284,10 +259,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create event_buffer_size metric")
} else {
- prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
+ prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
}
- prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "event_buffer_queued",
@@ -307,7 +282,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
sseSubscribers.Add(-1)
}
- prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "sse_subscribers",
@@ -328,10 +303,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric")
} else {
- prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
+ prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
}
- prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_buffer_queued",
@@ -352,10 +327,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_total metric")
} else {
- prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric})
+ prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkersMetric})
}
- prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_busy",
@@ -480,7 +455,7 @@ func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
// Provide a JMAP Session for the
func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, *GroupwareError) {
- item := g.sessionCache.Get(user.GetUsername())
+ item := g.sessionCache.Get(toSessionKey(user.GetUsername()))
if item != nil {
value := item.Value()
if value != nil {
@@ -823,7 +798,7 @@ func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler
return Response{}, false
}
- logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
+ logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId())))
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {
@@ -928,7 +903,7 @@ func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(
return
}
- logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
+ logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId())))
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {
diff --git a/services/groupware/pkg/groupware/groupware_session.go b/services/groupware/pkg/groupware/groupware_session.go
index 569494881a..f1d7aeba1e 100644
--- a/services/groupware/pkg/groupware/groupware_session.go
+++ b/services/groupware/pkg/groupware/groupware_session.go
@@ -4,10 +4,23 @@ import (
"time"
"github.com/jellydator/ttlcache/v3"
+ "github.com/prometheus/client_golang/prometheus"
+
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
+ "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
+type sessionKey string
+
+func toSessionKey(username string) sessionKey {
+ return sessionKey(username)
+}
+
+func usernameFromSessionKey(key sessionKey) string {
+ return string(key)
+}
+
type cachedSession interface {
Success() bool
Get() jmap.Session
@@ -61,15 +74,56 @@ type sessionCacheLoader struct {
errorTtl time.Duration
}
-func (l *sessionCacheLoader) Load(c *ttlcache.Cache[string, cachedSession], username string) *ttlcache.Item[string, cachedSession] {
+func (l *sessionCacheLoader) Load(c *ttlcache.Cache[sessionKey, cachedSession], key sessionKey) *ttlcache.Item[sessionKey, cachedSession] {
+ username := usernameFromSessionKey(key)
session, err := l.jmapClient.FetchSession(username, l.logger)
if err != nil {
- l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", username)
- return c.Set(username, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
+ l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", key)
+ return c.Set(key, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
} else {
- l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", username)
- return c.Set(username, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
+ l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", key)
+ return c.Set(key, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
}
}
-var _ ttlcache.Loader[string, cachedSession] = &sessionCacheLoader{}
+var _ ttlcache.Loader[sessionKey, cachedSession] = &sessionCacheLoader{}
+
+// Listens to JMAP Session outdated events, in order to remove outdated Sessions
+// from the Groupware Session cache.
+type sessionEventListener struct {
+ logger *log.Logger
+ sessionCache *ttlcache.Cache[sessionKey, cachedSession]
+ counter prometheus.Counter
+}
+
+func (l sessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
+ // it's enough to remove the session from the cache, as it will be fetched on-demand
+ // the next time an operation is performed on behalf of the user
+ l.sessionCache.Delete(toSessionKey(session.Username))
+ if l.counter != nil {
+ l.counter.Inc()
+ }
+
+ l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
+}
+
+var _ jmap.SessionEventListener = sessionEventListener{}
+
+// A Prometheus Collector for the Session cache metrics.
+type sessionCacheMetricsCollector struct {
+ desc *prometheus.Desc
+ supply func() ttlcache.Metrics
+}
+
+func (s sessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
+ ch <- s.desc
+}
+func (s sessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
+ m := s.supply()
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
+}
+
+var _ prometheus.Collector = sessionCacheMetricsCollector{}
diff --git a/services/groupware/pkg/metrics/metrics.go b/services/groupware/pkg/metrics/metrics.go
index d608608e91..5cb0514337 100644
--- a/services/groupware/pkg/metrics/metrics.go
+++ b/services/groupware/pkg/metrics/metrics.go
@@ -177,7 +177,20 @@ func New(logger *log.Logger) *Metrics {
}, []string{Labels.Endpoint}),
}
- r := reflect.ValueOf(*m)
+ registerAll(m, logger)
+
+ return m
+}
+
+func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
+ obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId})
+}
+
+func registerAll(m any, logger *log.Logger) {
+ r := reflect.ValueOf(m)
+ if r.Kind() == reflect.Pointer {
+ r = r.Elem()
+ }
for i := 0; i < r.NumField(); i++ {
n := r.Type().Field(i).Name
f := r.Field(i)
@@ -190,9 +203,44 @@ func New(logger *log.Logger) *Metrics {
}
}
}
- return m
}
-func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
- obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId})
+type ConstMetricCollector struct {
+ Metric prometheus.Metric
}
+
+func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
+ ch <- c.Metric.Desc()
+}
+func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
+ ch <- c.Metric
+}
+
+type LoggingPrometheusRegisterer struct {
+ delegate prometheus.Registerer
+ logger *log.Logger
+}
+
+func NewLoggingPrometheusRegisterer(delegate prometheus.Registerer, logger *log.Logger) LoggingPrometheusRegisterer {
+ return LoggingPrometheusRegisterer{
+ delegate: delegate,
+ logger: logger,
+ }
+}
+
+func (r LoggingPrometheusRegisterer) Register(c prometheus.Collector) error {
+ err := r.delegate.Register(c)
+ if err != nil {
+ r.logger.Warn().Err(err).Msgf("failed to register metric")
+ }
+ return err
+}
+func (r LoggingPrometheusRegisterer) MustRegister(...prometheus.Collector) {
+ panic("don't use MustRegister")
+}
+
+func (r LoggingPrometheusRegisterer) Unregister(c prometheus.Collector) bool {
+ return r.delegate.Unregister(c)
+}
+
+var _ prometheus.Registerer = LoggingPrometheusRegisterer{}
diff --git a/services/groupware/pkg/metrics/startup_metrics.go b/services/groupware/pkg/metrics/startup_metrics.go
index 9efcc8bfa8..f762286243 100644
--- a/services/groupware/pkg/metrics/startup_metrics.go
+++ b/services/groupware/pkg/metrics/startup_metrics.go
@@ -9,13 +9,13 @@ import (
var registered atomic.Bool
-func StartupMetrics() {
+func StartupMetrics(registerer prometheus.Registerer) {
// use an atomic boolean to make the operation idempotent,
// instead of causing a panic in case this function is
// called twice
if registered.CompareAndSwap(false, true) {
// https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56
- prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ registerer.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
diff --git a/services/groupware/pkg/service/http/v0/service.go b/services/groupware/pkg/service/http/v0/service.go
index 6d56c48bd0..432c7cece7 100644
--- a/services/groupware/pkg/service/http/v0/service.go
+++ b/services/groupware/pkg/service/http/v0/service.go
@@ -5,6 +5,7 @@ import (
"net/http"
"github.com/go-chi/chi/v5"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/riandyrn/otelchi"
"github.com/opencloud-eu/opencloud/pkg/log"
@@ -34,7 +35,11 @@ func NewService(opts ...Option) (Service, error) {
),
)
- gw, err := groupware.NewGroupware(options.Config, &options.Logger, m)
+ logger := &options.Logger
+
+ registerer := metrics.NewLoggingPrometheusRegisterer(prometheus.DefaultRegisterer, logger)
+
+ gw, err := groupware.NewGroupware(options.Config, logger, m, registerer)
if err != nil {
return nil, err
}
@@ -53,7 +58,7 @@ func NewService(opts ...Option) (Service, error) {
}
}
- metrics.StartupMetrics()
+ metrics.StartupMetrics(registerer)
return gw, nil
}