refactor(groupware): logging and metrics improvements

* some minor code refactorings to improve logging and metrics
* more code documentation
Author: Pascal Bleser
Date:   2025-08-28 17:00:41 +02:00
Parent: a31d1991e2
Commit: 3d85aa69b7
5 changed files with 181 additions and 99 deletions
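The central change across the five files is that Prometheus registration is now performed through an injected prometheus.Registerer, wrapped in a logging registerer, instead of calling the package-level prometheus.MustRegister directly. A minimal sketch of that wiring, excerpted from the service setup in the last file of this diff (the surrounding NewService code is abbreviated):

// Wrap the default registerer so failed registrations are logged as
// warnings instead of panicking via prometheus.MustRegister.
logger := &options.Logger
registerer := metrics.NewLoggingPrometheusRegisterer(prometheus.DefaultRegisterer, logger)

// The registerer is injected into the Groupware constructor ...
gw, err := groupware.NewGroupware(options.Config, logger, m, registerer)
if err != nil {
	return nil, err
}

// ... and into the startup metrics, which previously registered against
// the global registry themselves.
metrics.StartupMetrics(registerer)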

View File

@@ -53,39 +53,56 @@ const (
logMethod = "method"
)
// Minimalistic representation of a User, containing only the attributes that are
// necessary for the Groupware implementation.
type User interface {
GetUsername() string
GetId() string
}
// Provides a User that is associated with a request.
type UserProvider interface {
// Provide the user for JMAP operations.
GetUser(req *http.Request, ctx context.Context, logger *log.Logger) (User, error)
}
// Background job that needs to be executed asynchronously by the Groupware.
type Job struct {
id uint64
// An identifier for the job, to use in logs for correlation.
id uint64
// A human readable description of the job, to use in logs.
description string
logger *log.Logger
job func(uint64, *log.Logger)
// The logger to use for the job.
logger *log.Logger
// The function that performs the job.
job func(uint64, *log.Logger)
}
type Groupware struct {
mux *chi.Mux
metrics *metrics.Metrics
sseServer *sse.Server
mux *chi.Mux
metrics *metrics.Metrics
sseServer *sse.Server
// A map of all the SSE streams that have been created, in order to be able to iterate over them as,
// unfortunately, the sse implementation does not provide such a function.
// Key: the stream ID, which is the username
// Value: the timestamp of the creation of the stream
streams cmap.ConcurrentMap
logger *log.Logger
defaultEmailLimit uint
maxBodyValueBytes uint
sessionCache *ttlcache.Cache[string, cachedSession]
jmap *jmap.Client
userProvider UserProvider
eventChannel chan Event
jobsChannel chan Job
jobCounter atomic.Uint64
// Caches successful and failed Sessions by the username.
sessionCache *ttlcache.Cache[sessionKey, cachedSession]
jmap *jmap.Client
userProvider UserProvider
// SSE events that need to be pushed to clients.
eventChannel chan Event
// Background jobs that need to be executed.
jobsChannel chan Job
// A threadsafe counter to generate the job IDs.
jobCounter atomic.Uint64
}
// An error during the Groupware initialization.
type GroupwareInitializationError struct {
Message string
Err error
@@ -102,61 +119,17 @@ func (e GroupwareInitializationError) Unwrap() error {
return e.Err
}
type GroupwareSessionEventListener struct {
logger *log.Logger
sessionCache *ttlcache.Cache[string, cachedSession]
counter prometheus.Counter
}
func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
// it's enough to remove the session from the cache, as it will be fetched on-demand
// the next time an operation is performed on behalf of the user
l.sessionCache.Delete(session.Username)
if l.counter != nil {
l.counter.Inc()
}
l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
}
var _ jmap.SessionEventListener = GroupwareSessionEventListener{}
// SSE Event.
type Event struct {
Type string
// The type of event, will be sent as the "type" attribute.
Type string
// The ID of the stream to push the event to, typically the username.
Stream string
Body any
// The payload of the event, will be serialized as JSON.
Body any
}
type ConstMetricCollector struct {
metric prometheus.Metric
}
func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.metric.Desc()
}
func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- c.metric
}
type SessionCacheMetricsCollector struct {
desc *prometheus.Desc
supply func() ttlcache.Metrics
}
func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- s.desc
}
func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m := s.supply()
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
}
var _ prometheus.Collector = SessionCacheMetricsCollector{}
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) {
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
if err != nil {
logger.Error().Err(err).Msgf("failed to parse configured Mail.Baseurl '%v'", config.Mail.BaseUrl)
@@ -188,13 +161,14 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
insecure := true // TODO make configurable
insecureTls := true // TODO make configurable
m := metrics.New(logger)
// TODO add timeouts and other meaningful configuration settings for the HTTP client
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
if insecure {
if insecureTls {
tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
tr.TLSClientConfig = tlsConfig
}
@@ -217,7 +191,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
jmapClient := jmap.NewClient(api, api, api)
var sessionCache *ttlcache.Cache[string, cachedSession]
var sessionCache *ttlcache.Cache[sessionKey, cachedSession]
{
sessionLoader := &sessionCacheLoader{
logger: logger,
@@ -226,18 +200,18 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
sessionCache = ttlcache.New(
ttlcache.WithCapacity[string, cachedSession](sessionCacheMaxCapacity),
ttlcache.WithTTL[string, cachedSession](sessionCacheTtl),
ttlcache.WithDisableTouchOnHit[string, cachedSession](),
ttlcache.WithCapacity[sessionKey, cachedSession](sessionCacheMaxCapacity),
ttlcache.WithTTL[sessionKey, cachedSession](sessionCacheTtl),
ttlcache.WithDisableTouchOnHit[sessionKey, cachedSession](),
ttlcache.WithLoader(sessionLoader),
)
go sessionCache.Start()
prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
prometheusRegistry.Register(sessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
}
if logger.Trace().Enabled() {
sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[string, cachedSession]) {
sessionCache.OnEviction(func(c context.Context, r ttlcache.EvictionReason, item *ttlcache.Item[sessionKey, cachedSession]) {
if logger.Trace().Enabled() {
reason := ""
switch r {
case ttlcache.EvictionReasonDeleted:
@@ -258,10 +232,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
typ = "failed"
}
logger.Trace().Msgf("%s session cache eviction of user '%v' after %v: %v", typ, item.Key(), spentInCache, reason)
})
}
}
})
sessionEventListener := GroupwareSessionEventListener{
sessionEventListener := sessionEventListener{
sessionCache: sessionCache,
logger: logger,
counter: prometheus.NewCounter(prometheus.CounterOpts{
@@ -273,6 +247,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
jmapClient.AddSessionEventListener(&sessionEventListener)
// A channel to process SSE Events with a single worker.
eventChannel := make(chan Event, eventChannelSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
@@ -284,10 +259,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create event_buffer_size metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "event_buffer_queued",
@@ -307,7 +282,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
sseSubscribers.Add(-1)
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "sse_subscribers",
@@ -328,10 +303,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_buffer_queued",
@@ -352,10 +327,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_total metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric})
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkersMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_busy",
@@ -480,7 +455,7 @@ func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
// Provide a JMAP Session for the given user.
func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, *GroupwareError) {
item := g.sessionCache.Get(user.GetUsername())
item := g.sessionCache.Get(toSessionKey(user.GetUsername()))
if item != nil {
value := item.Value()
if value != nil {
@@ -823,7 +798,7 @@ func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler
return Response{}, false
}
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId())))
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {
@@ -928,7 +903,7 @@ func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(
return
}
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
logger = log.From(logger.With().Str(logUserId, log.SafeString(user.GetId())))
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {

View File

@@ -4,10 +4,23 @@ import (
"time"
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
type sessionKey string
func toSessionKey(username string) sessionKey {
return sessionKey(username)
}
func usernameFromSessionKey(key sessionKey) string {
return string(key)
}
type cachedSession interface {
Success() bool
Get() jmap.Session
@@ -61,15 +74,56 @@ type sessionCacheLoader struct {
errorTtl time.Duration
}
func (l *sessionCacheLoader) Load(c *ttlcache.Cache[string, cachedSession], username string) *ttlcache.Item[string, cachedSession] {
func (l *sessionCacheLoader) Load(c *ttlcache.Cache[sessionKey, cachedSession], key sessionKey) *ttlcache.Item[sessionKey, cachedSession] {
username := usernameFromSessionKey(key)
session, err := l.jmapClient.FetchSession(username, l.logger)
if err != nil {
l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", username)
return c.Set(username, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", key)
return c.Set(key, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
} else {
l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", username)
return c.Set(username, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", key)
return c.Set(key, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
}
}
var _ ttlcache.Loader[string, cachedSession] = &sessionCacheLoader{}
var _ ttlcache.Loader[sessionKey, cachedSession] = &sessionCacheLoader{}
// Listens to JMAP Session outdated events, in order to remove outdated Sessions
// from the Groupware Session cache.
type sessionEventListener struct {
logger *log.Logger
sessionCache *ttlcache.Cache[sessionKey, cachedSession]
counter prometheus.Counter
}
func (l sessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
// it's enough to remove the session from the cache, as it will be fetched on-demand
// the next time an operation is performed on behalf of the user
l.sessionCache.Delete(toSessionKey(session.Username))
if l.counter != nil {
l.counter.Inc()
}
l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
}
var _ jmap.SessionEventListener = sessionEventListener{}
// A Prometheus Collector for the Session cache metrics.
type sessionCacheMetricsCollector struct {
desc *prometheus.Desc
supply func() ttlcache.Metrics
}
func (s sessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- s.desc
}
func (s sessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m := s.supply()
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
}
var _ prometheus.Collector = sessionCacheMetricsCollector{}

View File

@@ -177,7 +177,20 @@ func New(logger *log.Logger) *Metrics {
}, []string{Labels.Endpoint}),
}
r := reflect.ValueOf(*m)
registerAll(m, logger)
return m
}
func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId})
}
func registerAll(m any, logger *log.Logger) {
r := reflect.ValueOf(m)
if r.Kind() == reflect.Pointer {
r = r.Elem()
}
for i := 0; i < r.NumField(); i++ {
n := r.Type().Field(i).Name
f := r.Field(i)
@@ -190,9 +203,44 @@ func New(logger *log.Logger) *Metrics {
}
}
}
return m
}
func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId})
type ConstMetricCollector struct {
Metric prometheus.Metric
}
func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.Metric.Desc()
}
func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- c.Metric
}
type LoggingPrometheusRegisterer struct {
delegate prometheus.Registerer
logger *log.Logger
}
func NewLoggingPrometheusRegisterer(delegate prometheus.Registerer, logger *log.Logger) LoggingPrometheusRegisterer {
return LoggingPrometheusRegisterer{
delegate: delegate,
logger: logger,
}
}
func (r LoggingPrometheusRegisterer) Register(c prometheus.Collector) error {
err := r.delegate.Register(c)
if err != nil {
r.logger.Warn().Err(err).Msgf("failed to register metric")
}
return err
}
func (r LoggingPrometheusRegisterer) MustRegister(...prometheus.Collector) {
panic("don't use MustRegister")
}
func (r LoggingPrometheusRegisterer) Unregister(c prometheus.Collector) bool {
return r.delegate.Unregister(c)
}
var _ prometheus.Registerer = LoggingPrometheusRegisterer{}
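For context, a brief usage sketch of the wrapper above, seen from a caller outside the metrics package: Register delegates to the wrapped registerer and logs any failure (for example, an attempt to register the same collector twice) as a warning instead of panicking. The counter name below is hypothetical, for illustration only.

reg := metrics.NewLoggingPrometheusRegisterer(prometheus.DefaultRegisterer, logger)

// Hypothetical counter, for illustration only.
requests := prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: metrics.Namespace,
	Subsystem: metrics.Subsystem,
	Name:      "example_requests_total",
})

_ = reg.Register(requests) // registers the counter
_ = reg.Register(requests) // duplicate: the error is logged as a warning, no panic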

View File

@@ -9,13 +9,13 @@ import (
var registered atomic.Bool
func StartupMetrics() {
func StartupMetrics(registerer prometheus.Registerer) {
// use an atomic boolean to make the operation idempotent,
// instead of causing a panic in case this function is
// called twice
if registered.CompareAndSwap(false, true) {
// https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
registerer.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/riandyrn/otelchi"
"github.com/opencloud-eu/opencloud/pkg/log"
@@ -34,7 +35,11 @@ func NewService(opts ...Option) (Service, error) {
),
)
gw, err := groupware.NewGroupware(options.Config, &options.Logger, m)
logger := &options.Logger
registerer := metrics.NewLoggingPrometheusRegisterer(prometheus.DefaultRegisterer, logger)
gw, err := groupware.NewGroupware(options.Config, logger, m, registerer)
if err != nil {
return nil, err
}
@@ -53,7 +58,7 @@ func NewService(opts ...Option) (Service, error) {
}
}
metrics.StartupMetrics()
metrics.StartupMetrics(registerer)
return gw, nil
}