diff --git a/services/groupware/pkg/command/server.go b/services/groupware/pkg/command/server.go index 613f0f3aa3..150329029b 100644 --- a/services/groupware/pkg/command/server.go +++ b/services/groupware/pkg/command/server.go @@ -7,7 +7,6 @@ import ( "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/tracing" - "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/groupware/pkg/config" "github.com/opencloud-eu/opencloud/services/groupware/pkg/config/parser" "github.com/opencloud-eu/opencloud/services/groupware/pkg/logging" @@ -37,13 +36,11 @@ func Server(cfg *config.Config) *cli.Command { var ( gr = run.Group{} ctx, cancel = context.WithCancel(c.Context) - m = metrics.New() + m = metrics.NewHttpMetrics() ) defer cancel() - m.BuildInfo.WithLabelValues(version.GetString()).Set(1) - server, err := debug.Server( debug.Logger(logger), debug.Config(cfg), diff --git a/services/groupware/pkg/groupware/groupware_api_messages.go b/services/groupware/pkg/groupware/groupware_api_messages.go index d4dd8719cc..47a3a29edf 100644 --- a/services/groupware/pkg/groupware/groupware_api_messages.go +++ b/services/groupware/pkg/groupware/groupware_api_messages.go @@ -9,8 +9,11 @@ import ( "github.com/go-chi/chi/v5" + "github.com/prometheus/client_golang/prometheus" + "github.com/opencloud-eu/opencloud/pkg/jmap" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics" ) // When the request succeeds without a "since" query parameter. @@ -574,14 +577,23 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) { reqId := req.GetRequestId() accountId := req.GetAccountId() logger := log.From(req.logger.With().Str(logEmailId, log.SafeString(id))) + getEmailsBefore := time.Now() emails, sessionState, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, []string{id}, true, g.maxBodyValueBytes) + getEmailsDuration := time.Since(getEmailsBefore) if jerr != nil { return req.errorResponseFromJmap(jerr) } if len(emails.Emails) < 1 { + g.metrics.EmailByIdDuration.WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).Observe(getEmailsDuration.Seconds()) logger.Trace().Msg("failed to find any emails matching id") // the id is already in the log field return notFoundResponse(sessionState) } + g.metrics.EmailByIdDuration. + WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).(prometheus.ExemplarObserver). + ObserveWithExemplar(getEmailsDuration.Seconds(), prometheus.Labels{ + metrics.Labels.RequestId: reqId, + }) + email := emails.Emails[0] beacon := email.ReceivedAt // TODO configurable: either relative to when the email was received, or relative to now diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go index bb9cf13f3b..149ac6b3af 100644 --- a/services/groupware/pkg/groupware/groupware_framework.go +++ b/services/groupware/pkg/groupware/groupware_framework.go @@ -19,11 +19,15 @@ import ( "github.com/r3labs/sse/v2" "github.com/rs/zerolog" + "github.com/prometheus/client_golang/prometheus" + "github.com/jellydator/ttlcache/v3" "github.com/opencloud-eu/opencloud/pkg/jmap" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/services/groupware/pkg/config" + "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics" ) const ( @@ -40,6 +44,11 @@ const ( logFolderId = "folder-id" logQuery = "query" logEmailId = "email-id" + logJobDescription = "job" + logJobId = "job-id" + logStreamId = "stream-id" + logPath = "path" + logMethod = "method" ) type User interface { @@ -61,6 +70,7 @@ type Job struct { type Groupware struct { mux *chi.Mux + metrics *metrics.Metrics sseServer *sse.Server streams map[string]time.Time streamsLock sync.Mutex @@ -94,12 +104,16 @@ func (e GroupwareInitializationError) Unwrap() error { type GroupwareSessionEventListener struct { logger *log.Logger sessionCache *ttlcache.Cache[string, cachedSession] + counter prometheus.Counter } func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) { // it's enough to remove the session from the cache, as it will be fetched on-demand // the next time an operation is performed on behalf of the user l.sessionCache.Delete(session.Username) + if l.counter != nil { + l.counter.Inc() + } l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState) } @@ -112,6 +126,35 @@ type Event struct { Body any } +type ConstMetricCollector struct { + metric prometheus.Metric +} + +func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.metric.Desc() +} +func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) { + ch <- c.metric +} + +type SessionCacheMetricsCollector struct { + desc *prometheus.Desc + supply func() ttlcache.Metrics +} + +func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- s.desc +} +func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) { + m := s.supply() + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.SessionCacheTypeEvictions) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.SessionCacheTypeInsertions) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.SessionCacheTypeHits) + ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.SessionCacheTypeMisses) +} + +var _ prometheus.Collector = SessionCacheMetricsCollector{} + func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) { baseUrl, err := url.Parse(config.Mail.BaseUrl) if err != nil { @@ -130,6 +173,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro return nil, GroupwareInitializationError{Message: "Mail.Master.Password is empty"} } + m := metrics.New() + defaultEmailLimit := max(config.Mail.DefaultEmailLimit, 0) maxBodyValueBytes := max(config.Mail.MaxBodyValueBytes, 0) responseHeaderTimeout := max(config.Mail.ResponseHeaderTimeout, 0) @@ -137,7 +182,12 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro sessionCacheTtl := max(config.Mail.SessionCache.Ttl, 0) sessionFailureCacheTtl := max(config.Mail.SessionCache.FailureTtl, 0) - keepStreamsAlive := true // TODO configuration + eventChannelSize := 100 // TODO make channel queue buffering size configurable + workerQueueSize := 100 // TODO configuration setting + workerPoolSize := 10 // TODO configuration setting + + keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive + sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting tr := http.DefaultTransport.(*http.Transport).Clone() tr.ResponseHeaderTimeout = responseHeaderTimeout @@ -172,6 +222,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro ttlcache.WithLoader(sessionLoader), ) go sessionCache.Start() + + prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics}) } if logger.Trace().Enabled() { @@ -196,20 +248,113 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro }) } - sessionEventListener := GroupwareSessionEventListener{sessionCache: sessionCache, logger: logger} + sessionEventListener := GroupwareSessionEventListener{ + sessionCache: sessionCache, + logger: logger, + counter: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.Subsystem, + Name: "outdated_sessions", + Help: "Counts outdated session events", + }), + } jmapClient.AddSessionEventListener(&sessionEventListener) - eventChannel := make(chan Event, 100) // TODO make channel queue buffering size configurable + eventChannel := make(chan Event, eventChannelSize) + { + totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc( + prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "event_buffer_size"), + "Size of the buffer channel for server-sent events to process", + nil, + nil, + ), prometheus.GaugeValue, float64(eventChannelSize)) + if err != nil { + logger.Warn().Err(err).Msg("failed to create event_buffer_size metric") + } else { + prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric}) + } + + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.Subsystem, + Name: "event_buffer_queued", + Help: "Number of queued server-sent events", + }, func() float64 { + return float64(len(eventChannel)) + })) + } sseServer := sse.New() - sseServer.EventTTL = time.Duration(5) * time.Minute // TODO configuration setting + sseServer.EventTTL = sseEventTtl + { + var sseSubscribers atomic.Int32 + sseServer.OnSubscribe = func(streamID string, sub *sse.Subscriber) { + sseSubscribers.Add(1) + } + sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) { + sseSubscribers.Add(-1) + } + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.Subsystem, + Name: "sse_subscribers", + Help: "Number of subscribers for server-sent event streams", + }, func() float64 { + return float64(sseSubscribers.Load()) + })) + } - workerQueueSize := 100 // TODO configuration setting - workerPoolSize := 10 // TODO configuration setting jobsChannel := make(chan Job, workerQueueSize) + { + totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc( + prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_buffer_size"), + "Size of the buffer channel for background worker jobs", + nil, + nil, + ), prometheus.GaugeValue, float64(workerQueueSize)) + if err != nil { + logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric") + } else { + prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric}) + } + + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.Subsystem, + Name: "workers_buffer_queued", + Help: "Number of queued background jobs", + }, func() float64 { + return float64(len(jobsChannel)) + })) + } + + var busyWorkers atomic.Int32 + { + totalWorkersMetric, err := prometheus.NewConstMetric(prometheus.NewDesc( + prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_total"), + "Total amount of background job workers", + nil, + nil, + ), prometheus.GaugeValue, float64(workerPoolSize)) + if err != nil { + logger.Warn().Err(err).Msg("failed to create workers_total metric") + } else { + prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric}) + } + + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.Subsystem, + Name: "workers_busy", + Help: "Number of background job workers that are currently busy executing jobs", + }, func() float64 { + return float64(busyWorkers.Load()) + })) + } g := &Groupware{ mux: mux, + metrics: m, sseServer: sseServer, streams: map[string]time.Time{}, streamsLock: sync.Mutex{}, @@ -225,11 +370,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro } for w := 1; w <= workerPoolSize; w++ { - go g.worker(jobsChannel) + go g.worker(jobsChannel, &busyWorkers) } - if keepStreamsAlive { - ticker := time.NewTicker(time.Duration(30) * time.Second) // TODO configuration + if keepStreamsAliveInterval != 0 { + ticker := time.NewTicker(keepStreamsAliveInterval) //defer ticker.Stop() go func() { for range ticker.C { @@ -243,12 +388,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro return g, nil } -func (g *Groupware) worker(jobs <-chan Job) { +func (g *Groupware) worker(jobs <-chan Job, busy *atomic.Int32) { for job := range jobs { + busy.Add(1) before := time.Now() - logger := log.From(job.logger.With().Str("job", job.description).Uint64("job-id", job.id)) + logger := log.From(job.logger.With().Str(logJobDescription, job.description).Uint64(logJobId, job.id)) job.job(job.id, logger) - logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove + if logger.Trace().Enabled() { + logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove + } + busy.Add(-1) } } @@ -269,7 +418,7 @@ func (g *Groupware) listenForEvents() { Data: data, }) if !published && g.logger.Debug().Enabled() { - g.logger.Debug().Str("stream", log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details + g.logger.Debug().Str(logStreamId, log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details } } else { g.logger.Error().Err(err).Msgf("failed to serialize %T body to JSON", ev) @@ -778,7 +927,7 @@ func (g *Groupware) NotFound(w http.ResponseWriter, r *http.Request) { if level.Enabled() { path := log.SafeString(r.URL.Path) method := log.SafeString(r.Method) - level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path) + level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path) } w.WriteHeader(http.StatusNotFound) } @@ -788,7 +937,7 @@ func (g *Groupware) MethodNotAllowed(w http.ResponseWriter, r *http.Request) { if level.Enabled() { path := log.SafeString(r.URL.Path) method := log.SafeString(r.Method) - level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method) + level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method) } w.WriteHeader(http.StatusNotFound) } diff --git a/services/groupware/pkg/metrics/http_metrics.go b/services/groupware/pkg/metrics/http_metrics.go new file mode 100644 index 0000000000..3cd08592e7 --- /dev/null +++ b/services/groupware/pkg/metrics/http_metrics.go @@ -0,0 +1,11 @@ +package metrics + +type HttpMetrics struct { +} + +// New initializes the available metrics. +func NewHttpMetrics() *HttpMetrics { + m := &HttpMetrics{} + + return m +} diff --git a/services/groupware/pkg/metrics/metrics.go b/services/groupware/pkg/metrics/metrics.go index 7d9fa58ec9..ca2d3b47bf 100644 --- a/services/groupware/pkg/metrics/metrics.go +++ b/services/groupware/pkg/metrics/metrics.go @@ -1,8 +1,10 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) -var ( +const ( // Namespace defines the namespace for the defines metrics. Namespace = "opencloud" @@ -12,23 +14,50 @@ var ( // Metrics defines the available metrics of this service. type Metrics struct { - BuildInfo *prometheus.GaugeVec + SessionCacheDesc *prometheus.Desc + /*SSessionCache *prometheus.GaugeVec*/ + EmailByIdDuration *prometheus.HistogramVec +} + +const ( + ResultFound = "found" + ResultNotFound = "not-found" + + SessionCacheTypeInsertions = "insertions" + SessionCacheTypeHits = "hits" + SessionCacheTypeMisses = "misses" + SessionCacheTypeEvictions = "evictions" +) + +var Labels = struct { + Endpoint string + Result string + SessionCacheType string + RequestId string +}{ + Endpoint: "endpoint", + Result: "result", + SessionCacheType: "type", + RequestId: "requestId", } // New initializes the available metrics. func New() *Metrics { m := &Metrics{ - BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + SessionCacheDesc: prometheus.NewDesc( + prometheus.BuildFQName(Namespace, Subsystem, "session_cache"), + "Session cache statistics", + []string{Labels.SessionCacheType}, + nil, + ), + EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: Namespace, Subsystem: Subsystem, - Name: "build_info", - Help: "Build information", - }, []string{"version"}), + //Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200}, + Name: "email_by_id_duration_seconds", + Help: "Duration in seconds for retrieving an Email by its id", + }, []string{Labels.Endpoint, Labels.Result}), } - _ = prometheus.Register( - m.BuildInfo, - ) - return m } diff --git a/services/groupware/pkg/metrics/startup_metrics.go b/services/groupware/pkg/metrics/startup_metrics.go new file mode 100644 index 0000000000..9efcc8bfa8 --- /dev/null +++ b/services/groupware/pkg/metrics/startup_metrics.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "sync/atomic" + + "github.com/opencloud-eu/opencloud/pkg/version" + "github.com/prometheus/client_golang/prometheus" +) + +var registered atomic.Bool + +func StartupMetrics() { + // use an atomic boolean to make the operation idempotent, + // instead of causing a panic in case this function is + // called twice + if registered.CompareAndSwap(false, true) { + // https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56 + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "build_info", + Help: "Build information", + ConstLabels: prometheus.Labels{ + "version": version.GetString(), + }, + }, func() float64 { return 1 })) + } +} diff --git a/services/groupware/pkg/server/http/option.go b/services/groupware/pkg/server/http/option.go index eb54b7288c..ed4d76a76f 100644 --- a/services/groupware/pkg/server/http/option.go +++ b/services/groupware/pkg/server/http/option.go @@ -20,7 +20,7 @@ type Options struct { Logger log.Logger Context context.Context Config *config.Config - Metrics *metrics.Metrics + Metrics *metrics.HttpMetrics Flags []cli.Flag TraceProvider trace.TracerProvider } @@ -58,7 +58,7 @@ func Config(val *config.Config) Option { } // Metrics provides a function to set the metrics option. -func Metrics(val *metrics.Metrics) Option { +func Metrics(val *metrics.HttpMetrics) Option { return func(o *Options) { o.Metrics = val } diff --git a/services/groupware/pkg/service/http/v0/instrument.go b/services/groupware/pkg/service/http/v0/instrument.go index d3b17817fe..bb63e5160e 100644 --- a/services/groupware/pkg/service/http/v0/instrument.go +++ b/services/groupware/pkg/service/http/v0/instrument.go @@ -7,7 +7,7 @@ import ( ) // NewInstrument returns a service that instruments metrics. -func NewInstrument(next Service, metrics *metrics.Metrics) Service { +func NewInstrument(next Service, metrics *metrics.HttpMetrics) Service { return instrument{ next: next, metrics: metrics, @@ -16,7 +16,7 @@ func NewInstrument(next Service, metrics *metrics.Metrics) Service { type instrument struct { next Service - metrics *metrics.Metrics + metrics *metrics.HttpMetrics } // ServeHTTP implements the Service interface. diff --git a/services/groupware/pkg/service/http/v0/service.go b/services/groupware/pkg/service/http/v0/service.go index 2eea65bf17..6d56c48bd0 100644 --- a/services/groupware/pkg/service/http/v0/service.go +++ b/services/groupware/pkg/service/http/v0/service.go @@ -10,6 +10,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/groupware/pkg/groupware" + "github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics" ) // Service defines the service handlers. @@ -52,5 +53,7 @@ func NewService(opts ...Option) (Service, error) { } } + metrics.StartupMetrics() + return gw, nil }