From 8b5d8aedfd71770a6091c9c14c620b83a07429db Mon Sep 17 00:00:00 2001
From: Pascal Bleser
Date: Tue, 26 Aug 2025 22:11:02 +0200
Subject: [PATCH] groupware: implement metrics
* implement a framework for metrics, with a few exemplary ones
---
services/groupware/pkg/command/server.go | 5 +-
.../pkg/groupware/groupware_api_messages.go | 12 ++
.../pkg/groupware/groupware_framework.go | 179 ++++++++++++++++--
.../groupware/pkg/metrics/http_metrics.go | 11 ++
services/groupware/pkg/metrics/metrics.go | 51 +++--
.../groupware/pkg/metrics/startup_metrics.go | 28 +++
services/groupware/pkg/server/http/option.go | 4 +-
.../pkg/service/http/v0/instrument.go | 4 +-
.../groupware/pkg/service/http/v0/service.go | 3 +
9 files changed, 263 insertions(+), 34 deletions(-)
create mode 100644 services/groupware/pkg/metrics/http_metrics.go
create mode 100644 services/groupware/pkg/metrics/startup_metrics.go
diff --git a/services/groupware/pkg/command/server.go b/services/groupware/pkg/command/server.go
index 613f0f3aa3..150329029b 100644
--- a/services/groupware/pkg/command/server.go
+++ b/services/groupware/pkg/command/server.go
@@ -7,7 +7,6 @@ import (
"github.com/oklog/run"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/tracing"
- "github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/logging"
@@ -37,13 +36,11 @@ func Server(cfg *config.Config) *cli.Command {
var (
gr = run.Group{}
ctx, cancel = context.WithCancel(c.Context)
- m = metrics.New()
+ m = metrics.NewHttpMetrics()
)
defer cancel()
- m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
-
server, err := debug.Server(
debug.Logger(logger),
debug.Config(cfg),
diff --git a/services/groupware/pkg/groupware/groupware_api_messages.go b/services/groupware/pkg/groupware/groupware_api_messages.go
index d4dd8719cc..47a3a29edf 100644
--- a/services/groupware/pkg/groupware/groupware_api_messages.go
+++ b/services/groupware/pkg/groupware/groupware_api_messages.go
@@ -9,8 +9,11 @@ import (
"github.com/go-chi/chi/v5"
+ "github.com/prometheus/client_golang/prometheus"
+
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
+ "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
// When the request succeeds without a "since" query parameter.
@@ -574,14 +577,23 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
reqId := req.GetRequestId()
accountId := req.GetAccountId()
logger := log.From(req.logger.With().Str(logEmailId, log.SafeString(id)))
+ getEmailsBefore := time.Now()
emails, sessionState, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, []string{id}, true, g.maxBodyValueBytes)
+ getEmailsDuration := time.Since(getEmailsBefore)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
if len(emails.Emails) < 1 {
+ g.metrics.EmailByIdDuration.WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).Observe(getEmailsDuration.Seconds())
logger.Trace().Msg("failed to find any emails matching id") // the id is already in the log field
return notFoundResponse(sessionState)
}
+	g.metrics.EmailByIdDuration.
+		WithLabelValues(req.session.ApiUrl, metrics.ResultFound).(prometheus.ExemplarObserver).
+		ObserveWithExemplar(getEmailsDuration.Seconds(), prometheus.Labels{
+			metrics.Labels.RequestId: reqId,
+		}) // success path: the email was found, so the observation is labelled ResultFound (not ResultNotFound)
+
email := emails.Emails[0]
beacon := email.ReceivedAt // TODO configurable: either relative to when the email was received, or relative to now
diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go
index bb9cf13f3b..149ac6b3af 100644
--- a/services/groupware/pkg/groupware/groupware_framework.go
+++ b/services/groupware/pkg/groupware/groupware_framework.go
@@ -19,11 +19,15 @@ import (
"github.com/r3labs/sse/v2"
"github.com/rs/zerolog"
+ "github.com/prometheus/client_golang/prometheus"
+
"github.com/jellydator/ttlcache/v3"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
+
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
+ "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
const (
@@ -40,6 +44,11 @@ const (
logFolderId = "folder-id"
logQuery = "query"
logEmailId = "email-id"
+ logJobDescription = "job"
+ logJobId = "job-id"
+ logStreamId = "stream-id"
+ logPath = "path"
+ logMethod = "method"
)
type User interface {
@@ -61,6 +70,7 @@ type Job struct {
type Groupware struct {
mux *chi.Mux
+ metrics *metrics.Metrics
sseServer *sse.Server
streams map[string]time.Time
streamsLock sync.Mutex
@@ -94,12 +104,16 @@ func (e GroupwareInitializationError) Unwrap() error {
type GroupwareSessionEventListener struct {
logger *log.Logger
sessionCache *ttlcache.Cache[string, cachedSession]
+ counter prometheus.Counter
}
func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
// it's enough to remove the session from the cache, as it will be fetched on-demand
// the next time an operation is performed on behalf of the user
l.sessionCache.Delete(session.Username)
+ if l.counter != nil {
+ l.counter.Inc()
+ }
l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
}
@@ -112,6 +126,35 @@ type Event struct {
Body any
}
+type ConstMetricCollector struct {
+ metric prometheus.Metric
+}
+
+func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
+ ch <- c.metric.Desc()
+}
+func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
+ ch <- c.metric
+}
+
+type SessionCacheMetricsCollector struct {
+ desc *prometheus.Desc
+ supply func() ttlcache.Metrics
+}
+
+func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
+ ch <- s.desc
+}
+func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
+ m := s.supply()
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.SessionCacheTypeEvictions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.SessionCacheTypeInsertions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.SessionCacheTypeHits)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.SessionCacheTypeMisses)
+}
+
+var _ prometheus.Collector = SessionCacheMetricsCollector{}
+
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
if err != nil {
@@ -130,6 +173,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return nil, GroupwareInitializationError{Message: "Mail.Master.Password is empty"}
}
+ m := metrics.New()
+
defaultEmailLimit := max(config.Mail.DefaultEmailLimit, 0)
maxBodyValueBytes := max(config.Mail.MaxBodyValueBytes, 0)
responseHeaderTimeout := max(config.Mail.ResponseHeaderTimeout, 0)
@@ -137,7 +182,12 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
sessionCacheTtl := max(config.Mail.SessionCache.Ttl, 0)
sessionFailureCacheTtl := max(config.Mail.SessionCache.FailureTtl, 0)
- keepStreamsAlive := true // TODO configuration
+ eventChannelSize := 100 // TODO make channel queue buffering size configurable
+ workerQueueSize := 100 // TODO configuration setting
+ workerPoolSize := 10 // TODO configuration setting
+
+ keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
+ sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
@@ -172,6 +222,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
ttlcache.WithLoader(sessionLoader),
)
go sessionCache.Start()
+
+ prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
}
if logger.Trace().Enabled() {
@@ -196,20 +248,113 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
})
}
- sessionEventListener := GroupwareSessionEventListener{sessionCache: sessionCache, logger: logger}
+	// The counter has to be registered, otherwise it is incremented but never exported.
+	// MustRegister is used to match how the other collectors in this function are registered.
+	outdatedSessions := prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: metrics.Namespace,
+		Subsystem: metrics.Subsystem,
+		Name:      "outdated_sessions",
+		Help:      "Counts outdated session events",
+	})
+	prometheus.MustRegister(outdatedSessions)
+	sessionEventListener := GroupwareSessionEventListener{sessionCache: sessionCache, logger: logger, counter: outdatedSessions}
jmapClient.AddSessionEventListener(&sessionEventListener)
- eventChannel := make(chan Event, 100) // TODO make channel queue buffering size configurable
+ eventChannel := make(chan Event, eventChannelSize)
+ {
+ totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
+ prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "event_buffer_size"),
+ "Size of the buffer channel for server-sent events to process",
+ nil,
+ nil,
+ ), prometheus.GaugeValue, float64(eventChannelSize))
+ if err != nil {
+ logger.Warn().Err(err).Msg("failed to create event_buffer_size metric")
+ } else {
+ prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
+ }
+
+ prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: metrics.Namespace,
+ Subsystem: metrics.Subsystem,
+ Name: "event_buffer_queued",
+ Help: "Number of queued server-sent events",
+ }, func() float64 {
+ return float64(len(eventChannel))
+ }))
+ }
sseServer := sse.New()
- sseServer.EventTTL = time.Duration(5) * time.Minute // TODO configuration setting
+ sseServer.EventTTL = sseEventTtl
+ {
+ var sseSubscribers atomic.Int32
+ sseServer.OnSubscribe = func(streamID string, sub *sse.Subscriber) {
+ sseSubscribers.Add(1)
+ }
+ sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
+ sseSubscribers.Add(-1)
+ }
+ prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: metrics.Namespace,
+ Subsystem: metrics.Subsystem,
+ Name: "sse_subscribers",
+ Help: "Number of subscribers for server-sent event streams",
+ }, func() float64 {
+ return float64(sseSubscribers.Load())
+ }))
+ }
- workerQueueSize := 100 // TODO configuration setting
- workerPoolSize := 10 // TODO configuration setting
jobsChannel := make(chan Job, workerQueueSize)
+ {
+ totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
+ prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_buffer_size"),
+ "Size of the buffer channel for background worker jobs",
+ nil,
+ nil,
+ ), prometheus.GaugeValue, float64(workerQueueSize))
+ if err != nil {
+ logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric")
+ } else {
+ prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
+ }
+
+ prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: metrics.Namespace,
+ Subsystem: metrics.Subsystem,
+ Name: "workers_buffer_queued",
+ Help: "Number of queued background jobs",
+ }, func() float64 {
+ return float64(len(jobsChannel))
+ }))
+ }
+
+ var busyWorkers atomic.Int32
+ {
+ totalWorkersMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
+ prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_total"),
+ "Total amount of background job workers",
+ nil,
+ nil,
+ ), prometheus.GaugeValue, float64(workerPoolSize))
+ if err != nil {
+ logger.Warn().Err(err).Msg("failed to create workers_total metric")
+ } else {
+ prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric})
+ }
+
+ prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: metrics.Namespace,
+ Subsystem: metrics.Subsystem,
+ Name: "workers_busy",
+ Help: "Number of background job workers that are currently busy executing jobs",
+ }, func() float64 {
+ return float64(busyWorkers.Load())
+ }))
+ }
g := &Groupware{
mux: mux,
+ metrics: m,
sseServer: sseServer,
streams: map[string]time.Time{},
streamsLock: sync.Mutex{},
@@ -225,11 +370,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
for w := 1; w <= workerPoolSize; w++ {
- go g.worker(jobsChannel)
+ go g.worker(jobsChannel, &busyWorkers)
}
- if keepStreamsAlive {
- ticker := time.NewTicker(time.Duration(30) * time.Second) // TODO configuration
+ if keepStreamsAliveInterval != 0 {
+ ticker := time.NewTicker(keepStreamsAliveInterval)
//defer ticker.Stop()
go func() {
for range ticker.C {
@@ -243,12 +388,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return g, nil
}
-func (g *Groupware) worker(jobs <-chan Job) {
+func (g *Groupware) worker(jobs <-chan Job, busy *atomic.Int32) {
for job := range jobs {
+ busy.Add(1)
before := time.Now()
- logger := log.From(job.logger.With().Str("job", job.description).Uint64("job-id", job.id))
+ logger := log.From(job.logger.With().Str(logJobDescription, job.description).Uint64(logJobId, job.id))
job.job(job.id, logger)
- logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove
+ if logger.Trace().Enabled() {
+ logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove
+ }
+ busy.Add(-1)
}
}
@@ -269,7 +418,7 @@ func (g *Groupware) listenForEvents() {
Data: data,
})
if !published && g.logger.Debug().Enabled() {
- g.logger.Debug().Str("stream", log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details
+ g.logger.Debug().Str(logStreamId, log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details
}
} else {
g.logger.Error().Err(err).Msgf("failed to serialize %T body to JSON", ev)
@@ -778,7 +927,7 @@ func (g *Groupware) NotFound(w http.ResponseWriter, r *http.Request) {
if level.Enabled() {
path := log.SafeString(r.URL.Path)
method := log.SafeString(r.Method)
- level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path)
+ level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path)
}
w.WriteHeader(http.StatusNotFound)
}
@@ -788,7 +937,7 @@ func (g *Groupware) MethodNotAllowed(w http.ResponseWriter, r *http.Request) {
if level.Enabled() {
path := log.SafeString(r.URL.Path)
method := log.SafeString(r.Method)
- level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method)
+ level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method)
}
w.WriteHeader(http.StatusNotFound)
}
diff --git a/services/groupware/pkg/metrics/http_metrics.go b/services/groupware/pkg/metrics/http_metrics.go
new file mode 100644
index 0000000000..3cd08592e7
--- /dev/null
+++ b/services/groupware/pkg/metrics/http_metrics.go
@@ -0,0 +1,11 @@
+package metrics
+
+type HttpMetrics struct {
+}
+
+// NewHttpMetrics returns a new HttpMetrics, which currently holds no metrics.
+func NewHttpMetrics() *HttpMetrics {
+ m := &HttpMetrics{}
+
+ return m
+}
diff --git a/services/groupware/pkg/metrics/metrics.go b/services/groupware/pkg/metrics/metrics.go
index 7d9fa58ec9..ca2d3b47bf 100644
--- a/services/groupware/pkg/metrics/metrics.go
+++ b/services/groupware/pkg/metrics/metrics.go
@@ -1,8 +1,10 @@
package metrics
-import "github.com/prometheus/client_golang/prometheus"
+import (
+ "github.com/prometheus/client_golang/prometheus"
+)
-var (
+const (
// Namespace defines the namespace for the defines metrics.
Namespace = "opencloud"
@@ -12,23 +14,50 @@ var (
// Metrics defines the available metrics of this service.
type Metrics struct {
- BuildInfo *prometheus.GaugeVec
+ SessionCacheDesc *prometheus.Desc
+ /*SSessionCache *prometheus.GaugeVec*/
+ EmailByIdDuration *prometheus.HistogramVec
+}
+
+const (
+ ResultFound = "found"
+ ResultNotFound = "not-found"
+
+ SessionCacheTypeInsertions = "insertions"
+ SessionCacheTypeHits = "hits"
+ SessionCacheTypeMisses = "misses"
+ SessionCacheTypeEvictions = "evictions"
+)
+
+var Labels = struct {
+ Endpoint string
+ Result string
+ SessionCacheType string
+ RequestId string
+}{
+ Endpoint: "endpoint",
+ Result: "result",
+ SessionCacheType: "type",
+ RequestId: "requestId",
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
- BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ SessionCacheDesc: prometheus.NewDesc(
+ prometheus.BuildFQName(Namespace, Subsystem, "session_cache"),
+ "Session cache statistics",
+ []string{Labels.SessionCacheType},
+ nil,
+ ),
+ EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
- Name: "build_info",
- Help: "Build information",
- }, []string{"version"}),
+ //Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
+ Name: "email_by_id_duration_seconds",
+ Help: "Duration in seconds for retrieving an Email by its id",
+ }, []string{Labels.Endpoint, Labels.Result}),
}
- _ = prometheus.Register(
- m.BuildInfo,
- )
-
return m
}
diff --git a/services/groupware/pkg/metrics/startup_metrics.go b/services/groupware/pkg/metrics/startup_metrics.go
new file mode 100644
index 0000000000..9efcc8bfa8
--- /dev/null
+++ b/services/groupware/pkg/metrics/startup_metrics.go
@@ -0,0 +1,28 @@
+package metrics
+
+import (
+ "sync/atomic"
+
+ "github.com/opencloud-eu/opencloud/pkg/version"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+var registered atomic.Bool
+
+func StartupMetrics() {
+ // use an atomic boolean to make the operation idempotent,
+ // instead of causing a panic in case this function is
+ // called twice
+ if registered.CompareAndSwap(false, true) {
+ // https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56
+ prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "build_info",
+ Help: "Build information",
+ ConstLabels: prometheus.Labels{
+ "version": version.GetString(),
+ },
+ }, func() float64 { return 1 }))
+ }
+}
diff --git a/services/groupware/pkg/server/http/option.go b/services/groupware/pkg/server/http/option.go
index eb54b7288c..ed4d76a76f 100644
--- a/services/groupware/pkg/server/http/option.go
+++ b/services/groupware/pkg/server/http/option.go
@@ -20,7 +20,7 @@ type Options struct {
Logger log.Logger
Context context.Context
Config *config.Config
- Metrics *metrics.Metrics
+ Metrics *metrics.HttpMetrics
Flags []cli.Flag
TraceProvider trace.TracerProvider
}
@@ -58,7 +58,7 @@ func Config(val *config.Config) Option {
}
// Metrics provides a function to set the metrics option.
-func Metrics(val *metrics.Metrics) Option {
+func Metrics(val *metrics.HttpMetrics) Option {
return func(o *Options) {
o.Metrics = val
}
diff --git a/services/groupware/pkg/service/http/v0/instrument.go b/services/groupware/pkg/service/http/v0/instrument.go
index d3b17817fe..bb63e5160e 100644
--- a/services/groupware/pkg/service/http/v0/instrument.go
+++ b/services/groupware/pkg/service/http/v0/instrument.go
@@ -7,7 +7,7 @@ import (
)
// NewInstrument returns a service that instruments metrics.
-func NewInstrument(next Service, metrics *metrics.Metrics) Service {
+func NewInstrument(next Service, metrics *metrics.HttpMetrics) Service {
return instrument{
next: next,
metrics: metrics,
@@ -16,7 +16,7 @@ func NewInstrument(next Service, metrics *metrics.Metrics) Service {
type instrument struct {
next Service
- metrics *metrics.Metrics
+ metrics *metrics.HttpMetrics
}
// ServeHTTP implements the Service interface.
diff --git a/services/groupware/pkg/service/http/v0/service.go b/services/groupware/pkg/service/http/v0/service.go
index 2eea65bf17..6d56c48bd0 100644
--- a/services/groupware/pkg/service/http/v0/service.go
+++ b/services/groupware/pkg/service/http/v0/service.go
@@ -10,6 +10,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/groupware"
+ "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
// Service defines the service handlers.
@@ -52,5 +53,7 @@ func NewService(opts ...Option) (Service, error) {
}
}
+ metrics.StartupMetrics()
+
return gw, nil
}