From 7adb66dd166b30079d00ed7eaea3d615211a35b8 Mon Sep 17 00:00:00 2001
From: Pascal Bleser
Date: Wed, 27 Aug 2025 17:23:51 +0200
Subject: [PATCH] groupware: improve metrics
* implement more metrics, in a more streamlined fashion
* use concurrent-map to store SSE streams instead of a regular map
guarded by one big lock, which does not scale as the number of streams
grows because every access contends on that single lock
* fix a few bugs in error handling when Stalwart is down, which were
noticed while testing the error metrics
---
.../pkg/groupware/groupware_api_blob.go | 9 +-
.../pkg/groupware/groupware_api_mailbox.go | 26 +--
.../pkg/groupware/groupware_api_messages.go | 80 ++++----
.../pkg/groupware/groupware_error.go | 13 +-
.../pkg/groupware/groupware_framework.go | 151 +++++++++------
.../pkg/groupware/groupware_session.go | 12 +-
services/groupware/pkg/metrics/metrics.go | 173 ++++++++++++++++--
7 files changed, 314 insertions(+), 150 deletions(-)
diff --git a/services/groupware/pkg/groupware/groupware_api_blob.go b/services/groupware/pkg/groupware/groupware_api_blob.go
index d0647459f8..2ce7dd35ef 100644
--- a/services/groupware/pkg/groupware/groupware_api_blob.go
+++ b/services/groupware/pkg/groupware/groupware_api_blob.go
@@ -18,12 +18,7 @@ func (g *Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
blobId := chi.URLParam(req.r, UriParamBlobId)
if blobId == "" {
- errorId := req.errorId()
- msg := fmt.Sprintf("Invalid value for path parameter '%v': empty", UriParamBlobId)
- return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
- withDetail(msg),
- withSource(&ErrorSource{Parameter: UriParamBlobId}),
- ))
+ return req.parameterErrorResponse(UriParamBlobId, fmt.Sprintf("Invalid value for path parameter '%v': empty", UriParamBlobId))
}
res, _, err := g.jmap.GetBlob(req.GetAccountId(), req.session, req.ctx, req.logger, blobId)
@@ -102,7 +97,7 @@ func (g *Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) {
_, err := io.Copy(w, blob.Body)
if err != nil {
- return apiError(req.errorId(), ErrorStreamingResponse)
+ return req.observedParameterError(ErrorStreamingResponse)
}
return nil
diff --git a/services/groupware/pkg/groupware/groupware_api_mailbox.go b/services/groupware/pkg/groupware/groupware_api_mailbox.go
index 5d6bdf6019..2953d1b2ab 100644
--- a/services/groupware/pkg/groupware/groupware_api_mailbox.go
+++ b/services/groupware/pkg/groupware/groupware_api_mailbox.go
@@ -2,7 +2,6 @@ package groupware
import (
"net/http"
- "strconv"
"github.com/go-chi/chi/v5"
@@ -33,11 +32,6 @@ type SwaggerGetMailboxById200 struct {
// 500: ErrorResponse500
func (g *Groupware) GetMailbox(w http.ResponseWriter, r *http.Request) {
mailboxId := chi.URLParam(r, UriParamMailboxId)
- if mailboxId == "" {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
g.respond(w, r, func(req Request) Response {
res, sessionState, err := g.jmap.GetMailbox(req.GetAccountId(), req.session, req.ctx, req.logger, []string{mailboxId})
if err != nil {
@@ -102,19 +96,17 @@ func (g *Groupware) GetMailboxes(w http.ResponseWriter, r *http.Request) {
filter.Role = role
hasCriteria = true
}
- subscribed := q.Get(QueryParamMailboxSearchSubscribed)
- if subscribed != "" {
- b, err := strconv.ParseBool(subscribed)
- if err != nil {
- // TODO proper response object
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- filter.IsSubscribed = &b
- hasCriteria = true
- }
g.respond(w, r, func(req Request) Response {
+ subscribed, set, err := req.parseBoolParam(QueryParamMailboxSearchSubscribed, false)
+ if err != nil {
+ return errorResponse(err)
+ }
+ if set {
+ filter.IsSubscribed = &subscribed
+ hasCriteria = true
+ }
+
if hasCriteria {
mailboxes, sessionState, err := g.jmap.SearchMailboxes(req.GetAccountId(), req.session, req.ctx, req.logger, filter)
if err != nil {
diff --git a/services/groupware/pkg/groupware/groupware_api_messages.go b/services/groupware/pkg/groupware/groupware_api_messages.go
index 47a3a29edf..54014ef7b7 100644
--- a/services/groupware/pkg/groupware/groupware_api_messages.go
+++ b/services/groupware/pkg/groupware/groupware_api_messages.go
@@ -9,8 +9,6 @@ import (
"github.com/go-chi/chi/v5"
- "github.com/prometheus/client_golang/prometheus"
-
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
@@ -64,12 +62,7 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
maxChanges := uint(0)
g.respond(w, r, func(req Request) Response {
if mailboxId == "" {
- errorId := req.errorId()
- msg := fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId)
- return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
- withDetail(msg),
- withSource(&ErrorSource{Parameter: UriParamMailboxId}),
- ))
+ return req.parameterErrorResponse(UriParamMailboxId, fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId))
}
logger := log.From(req.logger.With().Str(HeaderSince, since))
@@ -84,14 +77,9 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
g.respond(w, r, func(req Request) Response {
l := req.logger.With()
if mailboxId == "" {
- errorId := req.errorId()
- msg := fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId)
- return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
- withDetail(msg),
- withSource(&ErrorSource{Parameter: UriParamMailboxId}),
- ))
+ return req.parameterErrorResponse(UriParamMailboxId, fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId))
}
- offset, ok, err := req.parseUNumericParam(QueryParamOffset, 0)
+ offset, ok, err := req.parseUIntParam(QueryParamOffset, 0)
if err != nil {
return errorResponse(err)
}
@@ -99,7 +87,7 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
l = l.Uint(QueryParamOffset, offset)
}
- limit, ok, err := req.parseUNumericParam(QueryParamLimit, g.defaultEmailLimit)
+ limit, ok, err := req.parseUIntParam(QueryParamLimit, g.defaultEmailLimit)
if err != nil {
return errorResponse(err)
}
@@ -124,12 +112,7 @@ func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
ids := strings.Split(id, ",")
if len(ids) < 1 {
- errorId := req.errorId()
- msg := fmt.Sprintf("Invalid value for path parameter '%v': '%s': %s", UriParamMessageId, log.SafeString(id), "empty list of mail ids")
- return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
- withDetail(msg),
- withSource(&ErrorSource{Parameter: UriParamMessageId}),
- ))
+ return req.parameterErrorResponse(UriParamMessageId, fmt.Sprintf("Invalid value for path parameter '%v': '%s': %s", UriParamMessageId, log.SafeString(id), "empty list of mail ids"))
}
logger := log.From(req.logger.With().Str("id", log.SafeString(id)))
@@ -145,7 +128,7 @@ func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) {
func (g *Groupware) getMessagesSince(w http.ResponseWriter, r *http.Request, since string) {
g.respond(w, r, func(req Request) Response {
l := req.logger.With().Str(QueryParamSince, since)
- maxChanges, ok, err := req.parseUNumericParam(QueryParamMaxChanges, 0)
+ maxChanges, ok, err := req.parseUIntParam(QueryParamMaxChanges, 0)
if err != nil {
return errorResponse(err)
}
@@ -187,7 +170,7 @@ type MessageSearchResults struct {
QueryState jmap.State `json:"queryState,omitempty"`
}
-func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uint, uint, *log.Logger, Response) {
+func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uint, uint, *log.Logger, *Error) {
q := req.r.URL.Query()
mailboxId := q.Get(QueryParamMailboxId)
notInMailboxIds := q[QueryParamNotInMailboxId]
@@ -202,17 +185,17 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
l := req.logger.With()
- offset, ok, err := req.parseUNumericParam(QueryParamOffset, 0)
+ offset, ok, err := req.parseUIntParam(QueryParamOffset, 0)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Uint(QueryParamOffset, offset)
}
- limit, ok, err := req.parseUNumericParam(QueryParamLimit, g.defaultEmailLimit)
+ limit, ok, err := req.parseUIntParam(QueryParamLimit, g.defaultEmailLimit)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Uint(QueryParamLimit, limit)
@@ -220,7 +203,7 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
before, ok, err := req.parseDateParam(QueryParamSearchBefore)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Time(QueryParamSearchBefore, before)
@@ -228,7 +211,7 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
after, ok, err := req.parseDateParam(QueryParamSearchAfter)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Time(QueryParamSearchAfter, after)
@@ -262,17 +245,17 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
l = l.Str(QueryParamSearchBody, log.SafeString(body))
}
- minSize, ok, err := req.parseNumericParam(QueryParamSearchMinSize, 0)
+ minSize, ok, err := req.parseIntParam(QueryParamSearchMinSize, 0)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Int(QueryParamSearchMinSize, minSize)
}
- maxSize, ok, err := req.parseNumericParam(QueryParamSearchMaxSize, 0)
+ maxSize, ok, err := req.parseIntParam(QueryParamSearchMaxSize, 0)
if err != nil {
- return false, nil, 0, 0, nil, errorResponse(err)
+ return false, nil, 0, 0, nil, err
}
if ok {
l = l.Int(QueryParamSearchMaxSize, maxSize)
@@ -314,14 +297,14 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
}
}
- return true, filter, offset, limit, logger, Response{}
+ return true, filter, offset, limit, logger, nil
}
func (g *Groupware) searchMessages(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
- ok, filter, offset, limit, logger, errResp := g.buildFilter(req)
+ ok, filter, offset, limit, logger, err := g.buildFilter(req)
if !ok {
- return errResp
+ return errorResponse(err)
}
if !filter.IsNotEmpty() {
@@ -498,7 +481,6 @@ func (g *Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) {
l := req.logger.With()
l.Str(UriParamMessageId, messageId)
-
logger := log.From(l)
_, sessionState, jerr := g.jmap.DeleteEmails(req.GetAccountId(), []string{messageId}, req.session, req.ctx, logger)
@@ -564,12 +546,12 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, UriParamMessageId)
g.respond(w, r, func(req Request) Response {
- limit, _, err := req.parseUNumericParam(QueryParamLimit, 10) // TODO configurable default limit
+ limit, _, err := req.parseUIntParam(QueryParamLimit, 10) // TODO configurable default limit
if err != nil {
return errorResponse(err)
}
- days, _, err := req.parseUNumericParam(QueryParamDays, 5) // TODO configurable default days
+ days, _, err := req.parseUIntParam(QueryParamDays, 5) // TODO configurable default days
if err != nil {
return errorResponse(err)
}
@@ -583,16 +565,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
+
if len(emails.Emails) < 1 {
- g.metrics.EmailByIdDuration.WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).Observe(getEmailsDuration.Seconds())
+ req.observe(g.metrics.EmailByIdDuration.WithLabelValues(req.session.JmapEndpoint, metrics.Values.Result.NotFound), getEmailsDuration.Seconds())
logger.Trace().Msg("failed to find any emails matching id") // the id is already in the log field
return notFoundResponse(sessionState)
+ } else {
+ req.observe(g.metrics.EmailByIdDuration.WithLabelValues(req.session.JmapEndpoint, metrics.Values.Result.Found), getEmailsDuration.Seconds())
}
- g.metrics.EmailByIdDuration.
- WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).(prometheus.ExemplarObserver).
- ObserveWithExemplar(getEmailsDuration.Seconds(), prometheus.Labels{
- metrics.Labels.RequestId: reqId,
- })
email := emails.Emails[0]
@@ -604,10 +584,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
bgctx := context.Background()
g.job(logger, RelationTypeSameSender, func(jobId uint64, l *log.Logger) {
+ before := time.Now()
results, _, jerr := g.jmap.QueryEmails(accountId, filter, req.session, bgctx, l, 0, limit, false, g.maxBodyValueBytes)
+ duration := time.Since(before)
if jerr != nil {
+ req.observeJmapError(jerr)
l.Error().Err(jerr).Msgf("failed to query %v emails", RelationTypeSameSender)
} else {
+ req.observe(g.metrics.EmailSameSenderDuration.WithLabelValues(req.session.JmapEndpoint), duration.Seconds())
related := filterEmails(results.Emails, email)
l.Trace().Msgf("'%v' found %v other emails", RelationTypeSameSender, len(related))
if len(related) > 0 {
@@ -617,10 +601,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
})
g.job(logger, RelationTypeSameThread, func(jobId uint64, l *log.Logger) {
+ before := time.Now()
emails, _, jerr := g.jmap.EmailsInThread(accountId, email.ThreadId, req.session, bgctx, l, false, g.maxBodyValueBytes)
+ duration := time.Since(before)
if jerr != nil {
+ req.observeJmapError(jerr)
l.Error().Err(jerr).Msgf("failed to list %v emails", RelationTypeSameThread)
} else {
+ req.observe(g.metrics.EmailSameThreadDuration.WithLabelValues(req.session.JmapEndpoint), duration.Seconds())
related := filterEmails(emails, email)
l.Trace().Msgf("'%v' found %v other emails", RelationTypeSameThread, len(related))
if len(related) > 0 {
diff --git a/services/groupware/pkg/groupware/groupware_error.go b/services/groupware/pkg/groupware/groupware_error.go
index eeb01ba16e..c6e1241185 100644
--- a/services/groupware/pkg/groupware/groupware_error.go
+++ b/services/groupware/pkg/groupware/groupware_error.go
@@ -160,6 +160,7 @@ const (
ErrorCodeInvalidRequestPayload = "INVRQP"
ErrorCodeInvalidResponsePayload = "INVRSP"
ErrorCodeInvalidRequestParameter = "INVPAR"
+ ErrorCodeInvalidRequestBody = "INVBDY"
ErrorCodeNonExistingAccount = "INVACC"
ErrorCodeApiInconsistency = "APIINC"
ErrorCodeInvalidUserRequest = "INVURQ"
@@ -262,6 +263,12 @@ var (
Title: "Invalid Request Parameter",
Detail: "At least one of the parameters in the request is invalid.",
}
+ ErrorInvalidRequestBody = GroupwareError{
+ Status: http.StatusBadRequest,
+ Code: ErrorCodeInvalidRequestBody,
+ Title: "Invalid Request Body",
+ Detail: "The body of the request is invalid.",
+ }
ErrorInvalidUserRequest = GroupwareError{
Status: http.StatusBadRequest,
Code: ErrorCodeInvalidUserRequest,
@@ -432,6 +439,10 @@ func apiError(id string, gwerr GroupwareError, options ...ErrorOpt) *Error {
return err
}
+func (r Request) observedParameterError(gwerr GroupwareError, options ...ErrorOpt) *Error {
+ return r.observeParameterError(apiError(r.errorId(), gwerr, options...))
+}
+
func (r Request) apiErrorFromJmap(err jmap.Error) *Error {
if err == nil {
return nil
@@ -450,5 +461,5 @@ func errorResponses(errors ...Error) ErrorResponse {
}
func (r Request) errorResponseFromJmap(err jmap.Error) Response {
- return errorResponse(r.apiErrorFromJmap(err))
+ return errorResponse(r.apiErrorFromJmap(r.observeJmapError(err)))
}
diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go
index 149ac6b3af..c85eac6189 100644
--- a/services/groupware/pkg/groupware/groupware_framework.go
+++ b/services/groupware/pkg/groupware/groupware_framework.go
@@ -9,7 +9,6 @@ import (
"net/http"
"net/url"
"strconv"
- "sync"
"sync/atomic"
"time"
@@ -23,11 +22,14 @@ import (
"github.com/jellydator/ttlcache/v3"
+ cmap "github.com/orcaman/concurrent-map"
+
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
+ groupwaremiddleware "github.com/opencloud-eu/opencloud/services/groupware/pkg/middleware"
)
const (
@@ -72,8 +74,7 @@ type Groupware struct {
mux *chi.Mux
metrics *metrics.Metrics
sseServer *sse.Server
- streams map[string]time.Time
- streamsLock sync.Mutex
+ streams cmap.ConcurrentMap
logger *log.Logger
defaultEmailLimit uint
maxBodyValueBytes uint
@@ -147,10 +148,10 @@ func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
}
func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m := s.supply()
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.SessionCacheTypeEvictions)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.SessionCacheTypeInsertions)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.SessionCacheTypeHits)
- ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.SessionCacheTypeMisses)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
+ ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
}
var _ prometheus.Collector = SessionCacheMetricsCollector{}
@@ -173,8 +174,6 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return nil, GroupwareInitializationError{Message: "Mail.Master.Password is empty"}
}
- m := metrics.New()
-
defaultEmailLimit := max(config.Mail.DefaultEmailLimit, 0)
maxBodyValueBytes := max(config.Mail.MaxBodyValueBytes, 0)
responseHeaderTimeout := max(config.Mail.ResponseHeaderTimeout, 0)
@@ -189,10 +188,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
+ insecure := true // TODO make configurable
+
+ m := metrics.New(logger)
+
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
- tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
- tr.TLSClientConfig = tlsConfig
+ if insecure {
+ tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
+ tr.TLSClientConfig = tlsConfig
+ }
c := *http.DefaultClient
c.Transport = tr
@@ -203,6 +208,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
&c,
masterUsername,
masterPassword,
+ jmap.HttpJmapApiClientMetrics{
+ SuccessfulRequestPerEndpointCounter: m.SuccessfulRequestPerEndpointCounter,
+ FailedRequestPerEndpointCounter: m.FailedRequestPerEndpointCounter,
+ FailedRequestStatusPerEndpointCounter: m.FailedRequestStatusPerEndpointCounter,
+ },
)
jmapClient := jmap.NewClient(api, api, api)
@@ -243,8 +253,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
reason = fmt.Sprintf("unknown (%v)", r)
}
spentInCache := time.Since(item.Value().Since())
-
- logger.Trace().Msgf("session cache eviction of user '%v' after %v: %v", item.Key(), spentInCache, reason)
+ typ := "successful"
+ if !item.Value().Success() {
+ typ = "failed"
+ }
+ logger.Trace().Msgf("%s session cache eviction of user '%v' after %v: %v", typ, item.Key(), spentInCache, reason)
})
}
@@ -254,7 +267,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
counter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
- Name: "outdated_sessions",
+ Name: "outdated_sessions_count",
Help: "Counts outdated session events",
}),
}
@@ -356,8 +369,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
mux: mux,
metrics: m,
sseServer: sseServer,
- streams: map[string]time.Time{},
- streamsLock: sync.Mutex{},
+ streams: cmap.New(),
logger: logger,
sessionCache: sessionCache,
userProvider: userProvider,
@@ -427,6 +439,7 @@ func (g *Groupware) listenForEvents() {
}
func (g *Groupware) push(user User, typ string, body any) {
+ g.metrics.SSEEventsCounter.WithLabelValues(typ).Inc()
g.eventChannel <- Event{Type: typ, Stream: user.GetUsername(), Body: body}
}
@@ -435,23 +448,14 @@ func (g *Groupware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (g *Groupware) addStream(stream string) bool {
- g.streamsLock.Lock()
- defer g.streamsLock.Unlock()
- _, ok := g.streams[stream]
- if ok {
- return false
- }
- g.streams[stream] = time.Now()
- return true
+ return g.streams.SetIfAbsent(stream, time.Now())
}
func (g *Groupware) keepStreamsAlive() {
event := &sse.Event{Comment: []byte("keepalive")}
- g.streamsLock.Lock()
- defer g.streamsLock.Unlock()
- for stream := range g.streams {
+ g.streams.IterCb(func(stream string, created any) {
g.sseServer.Publish(stream, event)
- }
+ })
}
func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
@@ -475,7 +479,7 @@ func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
}
// Provide a JMAP Session for the
-func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, error) {
+func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, *GroupwareError) {
item := g.sessionCache.Get(user.GetUsername())
if item != nil {
value := item.Value()
@@ -598,6 +602,10 @@ func (r Request) GetRequestId() string {
return chimiddleware.GetReqID(r.ctx)
}
+func (r Request) GetTraceId() string {
+ return groupwaremiddleware.GetTraceID(r.ctx)
+}
+
func (r Request) GetAccountId() string {
accountId := chi.URLParam(r.r, UriParamAccount)
return r.session.MailAccountId(accountId)
@@ -608,9 +616,9 @@ func (r Request) GetAccount() (jmap.SessionAccount, *Error) {
account, ok := r.session.Accounts[accountId]
if !ok {
- errorId := r.errorId()
r.logger.Debug().Msgf("failed to find account '%v'", accountId)
- return jmap.SessionAccount{}, apiError(errorId, ErrorNonExistingAccount,
+ // TODO metric for inexistent accounts
+ return jmap.SessionAccount{}, apiError(r.errorId(), ErrorNonExistingAccount,
withDetail(fmt.Sprintf("The account '%v' does not exist", log.SafeString(accountId))),
withSource(&ErrorSource{Parameter: UriParamAccount}),
)
@@ -618,7 +626,17 @@ func (r Request) GetAccount() (jmap.SessionAccount, *Error) {
return account, nil
}
-func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *Error) {
+func (r Request) parameterError(param string, detail string) *Error {
+ return r.observedParameterError(ErrorInvalidRequestParameter,
+ withDetail(detail),
+ withSource(&ErrorSource{Parameter: param}))
+}
+
+func (r Request) parameterErrorResponse(param string, detail string) Response {
+ return errorResponse(r.parameterError(param, detail))
+}
+
+func (r Request) parseIntParam(param string, defaultValue int) (int, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
@@ -631,11 +649,10 @@ func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *
value, err := strconv.ParseInt(str, 10, 0)
if err != nil {
- errorId := r.errorId()
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
- return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
+ return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -643,7 +660,7 @@ func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *
return int(value), true, nil
}
-func (r Request) parseUNumericParam(param string, defaultValue uint) (uint, bool, *Error) {
+func (r Request) parseUIntParam(param string, defaultValue uint) (uint, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
@@ -656,11 +673,10 @@ func (r Request) parseUNumericParam(param string, defaultValue uint) (uint, bool
value, err := strconv.ParseUint(str, 10, 0)
if err != nil {
- errorId := r.errorId()
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
- return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
+ return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -681,9 +697,8 @@ func (r Request) parseDateParam(param string) (time.Time, bool, *Error) {
t, err := time.Parse(time.RFC3339, str)
if err != nil {
- errorId := r.errorId()
msg := fmt.Sprintf("Invalid RFC3339 value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
- return time.Time{}, true, apiError(errorId, ErrorInvalidRequestParameter,
+ return time.Time{}, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -704,9 +719,8 @@ func (r Request) parseBoolParam(param string, defaultValue bool) (bool, bool, *E
b, err := strconv.ParseBool(str)
if err != nil {
- errorId := r.errorId()
msg := fmt.Sprintf("Invalid boolean value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
- return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
+ return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -725,11 +739,29 @@ func (r Request) body(target any) *Error {
err := json.NewDecoder(body).Decode(target)
if err != nil {
- // TODO(pbleser-oc) error handling when failing to decode body
+ return r.observedParameterError(ErrorInvalidRequestBody, withSource(&ErrorSource{Pointer: "/"})) // we don't get any details here
}
return nil
}
+func (r Request) observe(obs prometheus.Observer, value float64) {
+ metrics.WithExemplar(obs, value, r.GetRequestId(), r.GetTraceId())
+}
+
+func (r Request) observeParameterError(err *Error) *Error {
+ if err != nil {
+ r.g.metrics.ParameterErrorCounter.WithLabelValues(err.Code).Inc()
+ }
+ return err
+}
+
+func (r Request) observeJmapError(jerr jmap.Error) jmap.Error {
+ if jerr != nil {
+ r.g.metrics.JmapErrorCounter.WithLabelValues(r.session.JmapEndpoint, strconv.Itoa(jerr.Code())).Inc()
+ }
+ return jerr
+}
+
func (g *Groupware) log(error *Error) {
var level *zerolog.Event
if error.NumStatus < 300 {
@@ -781,26 +813,33 @@ func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler
user, err := g.userProvider.GetUser(r, ctx, logger)
if err != nil {
+ g.metrics.AuthenticationFailureCounter.Inc()
g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication))
return Response{}, false
}
if user == nil {
+ g.metrics.AuthenticationFailureCounter.Inc()
g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication))
return Response{}, false
}
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
- session, ok, err := g.session(user, r, ctx, logger)
- if err != nil {
- logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
- render.Status(r, http.StatusInternalServerError)
+ session, ok, gwerr := g.session(user, r, ctx, logger)
+ if gwerr != nil {
+ g.metrics.SessionFailureCounter.Inc()
+ errorId := errorId(r, ctx)
+ logger.Error().Str("code", gwerr.Code).Str("error", gwerr.Title).Str("detail", gwerr.Detail).Str(logErrorId, errorId).Msg("failed to determine JMAP session")
+ g.serveError(w, r, apiError(errorId, *gwerr))
return Response{}, false
}
if !ok {
// no session = authentication failed
- logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
- render.Status(r, http.StatusForbidden)
+ g.metrics.SessionFailureCounter.Inc()
+ errorId := errorId(r, ctx)
+ logger.Error().Str(logErrorId, errorId).Msg("could not authenticate, failed to find Session")
+ gwerr = &ErrorInvalidAuthentication
+ g.serveError(w, r, apiError(errorId, *gwerr))
return Response{}, false
}
decoratedLogger := session.DecorateLogger(*logger)
@@ -891,18 +930,22 @@ func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
- session, ok, err := g.session(user, r, ctx, logger)
- if err != nil {
- logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
- render.Status(r, http.StatusInternalServerError)
+ session, ok, gwerr := g.session(user, r, ctx, logger)
+ if gwerr != nil {
+ errorId := errorId(r, ctx)
+ logger.Error().Str("code", gwerr.Code).Str("error", gwerr.Title).Str("detail", gwerr.Detail).Str(logErrorId, errorId).Msg("failed to determine JMAP session")
+ g.serveError(w, r, apiError(errorId, *gwerr))
return
}
if !ok {
// no session = authentication failed
- logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
- render.Status(r, http.StatusForbidden)
+ errorId := errorId(r, ctx)
+ logger.Error().Str(logErrorId, errorId).Msg("could not authenticate, failed to find Session")
+ gwerr = &ErrorInvalidAuthentication
+ g.serveError(w, r, apiError(errorId, *gwerr))
return
}
+
decoratedLogger := session.DecorateLogger(*logger)
req := Request{
diff --git a/services/groupware/pkg/groupware/groupware_session.go b/services/groupware/pkg/groupware/groupware_session.go
index 19a6a1d0da..569494881a 100644
--- a/services/groupware/pkg/groupware/groupware_session.go
+++ b/services/groupware/pkg/groupware/groupware_session.go
@@ -11,7 +11,7 @@ import (
type cachedSession interface {
Success() bool
Get() jmap.Session
- Error() error
+ Error() *GroupwareError
Since() time.Time
}
@@ -26,7 +26,7 @@ func (s succeededSession) Success() bool {
func (s succeededSession) Get() jmap.Session {
return s.session
}
-func (s succeededSession) Error() error {
+func (s succeededSession) Error() *GroupwareError {
return nil
}
func (s succeededSession) Since() time.Time {
@@ -37,7 +37,7 @@ var _ cachedSession = succeededSession{}
type failedSession struct {
since time.Time
- err error
+ err *GroupwareError
}
func (s failedSession) Success() bool {
@@ -46,7 +46,7 @@ func (s failedSession) Success() bool {
func (s failedSession) Get() jmap.Session {
panic("this should never be called")
}
-func (s failedSession) Error() error {
+func (s failedSession) Error() *GroupwareError {
return s.err
}
func (s failedSession) Since() time.Time {
@@ -65,10 +65,10 @@ func (l *sessionCacheLoader) Load(c *ttlcache.Cache[string, cachedSession], user
session, err := l.jmapClient.FetchSession(username, l.logger)
if err != nil {
l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", username)
- return c.Set(username, failedSession{err: err}, l.errorTtl)
+ return c.Set(username, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
} else {
l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", username)
- return c.Set(username, succeededSession{session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
+ return c.Set(username, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
}
}
diff --git a/services/groupware/pkg/metrics/metrics.go b/services/groupware/pkg/metrics/metrics.go
index ca2d3b47bf..d608608e91 100644
--- a/services/groupware/pkg/metrics/metrics.go
+++ b/services/groupware/pkg/metrics/metrics.go
@@ -1,6 +1,9 @@
package metrics
import (
+ "reflect"
+
+ "github.com/opencloud-eu/opencloud/pkg/log"
"github.com/prometheus/client_golang/prometheus"
)
@@ -14,35 +17,82 @@ const (
// Metrics defines the available metrics of this service.
type Metrics struct {
- SessionCacheDesc *prometheus.Desc
- /*SSessionCache *prometheus.GaugeVec*/
- EmailByIdDuration *prometheus.HistogramVec
+ SessionCacheDesc *prometheus.Desc
+ JmapErrorCounter *prometheus.CounterVec
+ ParameterErrorCounter *prometheus.CounterVec
+ AuthenticationFailureCounter prometheus.Counter
+ SessionFailureCounter prometheus.Counter
+ SSEEventsCounter *prometheus.CounterVec
+
+ SuccessfulRequestPerEndpointCounter *prometheus.CounterVec
+ FailedRequestPerEndpointCounter *prometheus.CounterVec
+ FailedRequestStatusPerEndpointCounter *prometheus.CounterVec
+
+ EmailByIdDuration *prometheus.HistogramVec
+ EmailSameSenderDuration *prometheus.HistogramVec
+ EmailSameThreadDuration *prometheus.HistogramVec
}
-const (
- ResultFound = "found"
- ResultNotFound = "not-found"
-
- SessionCacheTypeInsertions = "insertions"
- SessionCacheTypeHits = "hits"
- SessionCacheTypeMisses = "misses"
- SessionCacheTypeEvictions = "evictions"
-)
-
var Labels = struct {
Endpoint string
Result string
SessionCacheType string
RequestId string
+ TraceId string
+ SSEType string
+ ErrorCode string
+ HttpStatusCode string
}{
Endpoint: "endpoint",
Result: "result",
SessionCacheType: "type",
- RequestId: "requestId",
+ RequestId: "requestID",
+ TraceId: "traceID",
+ SSEType: "type",
+ ErrorCode: "code",
+ HttpStatusCode: "statusCode",
+}
+
+// Values groups the label values used by the metrics of this service, one
+// nested struct per label.
+var Values = struct {
+	// Result lists the values of the Labels.Result label; Success and
+	// Failure are used as constant labels of the per-endpoint request
+	// counters created in New.
+	Result struct {
+		Found, NotFound, Success, Failure string
+	}
+	// SessionCache lists session cache event kinds.
+	// NOTE(review): presumably the values of Labels.SessionCacheType — confirm
+	// against the session cache collector.
+	SessionCache struct {
+		Insertions, Hits, Misses, Evictions string
+	}
+}{
+	Result: struct {
+		Found, NotFound, Success, Failure string
+	}{
+		Found:    "found",
+		NotFound: "not-found",
+		Success:  "success",
+		Failure:  "failure",
+	},
+	SessionCache: struct {
+		Insertions, Hits, Misses, Evictions string
+	}{
+		Insertions: "insertions",
+		Hits:       "hits",
+		Misses:     "misses",
+		Evictions:  "evictions",
+	},
}
// New initializes the available metrics.
-func New() *Metrics {
+func New(logger *log.Logger) *Metrics {
m := &Metrics{
SessionCacheDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "session_cache"),
@@ -50,14 +100,99 @@ func New() *Metrics {
[]string{Labels.SessionCacheType},
nil,
),
- EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ AuthenticationFailureCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
- //Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
- Name: "email_by_id_duration_seconds",
- Help: "Duration in seconds for retrieving an Email by its id",
+ Name: "auth_failures_count",
+ Help: "Number of failed authentications",
+ }),
+ SessionFailureCounter: prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "session_failures_count",
+ Help: "Number of session retrieval failures",
+ }),
+ ParameterErrorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "param_errors_count",
+ Help: "Number of invalid request parameter errors that occurred",
+ }, []string{Labels.ErrorCode}),
+ JmapErrorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "jmap_errors_count",
+ Help: "Number of JMAP errors that occurred",
+ }, []string{Labels.Endpoint, Labels.ErrorCode}),
+ SuccessfulRequestPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "jmap_requests_count",
+ Help: "Number of JMAP requests",
+ ConstLabels: prometheus.Labels{
+ Labels.Result: Values.Result.Success,
+ },
+ }, []string{Labels.Endpoint}),
+ FailedRequestPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "jmap_requests_count",
+ Help: "Number of JMAP requests",
+ ConstLabels: prometheus.Labels{
+ Labels.Result: Values.Result.Failure,
+ },
+ }, []string{Labels.Endpoint}),
+ FailedRequestStatusPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "jmap_requests_failures_status_count",
+ Help: "Number of failed JMAP requests per endpoint and HTTP status code",
+ }, []string{Labels.Endpoint, Labels.HttpStatusCode}),
+ SSEEventsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "sse_events_count",
+ Help: "Number of Server-Sent Events that have been sent",
+ }, []string{Labels.SSEType}),
+ EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ NativeHistogramBucketFactor: 1.1,
+ Name: "email_by_id_duration_seconds",
+ Help: "Duration in seconds for retrieving an Email by its id",
}, []string{Labels.Endpoint, Labels.Result}),
+ EmailSameSenderDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ NativeHistogramBucketFactor: 1.1,
+ Name: "email_same_sender_duration_seconds",
+ Help: "Duration in seconds for searching for related same-sender Emails",
+ }, []string{Labels.Endpoint}),
+ EmailSameThreadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ NativeHistogramBucketFactor: 1.1,
+ Name: "email_same_thread_duration_seconds",
+ Help: "Duration in seconds for searching for related same-thread Emails",
+ }, []string{Labels.Endpoint}),
}
+ r := reflect.ValueOf(*m)
+ for i := 0; i < r.NumField(); i++ {
+ n := r.Type().Field(i).Name
+ f := r.Field(i)
+ v := f.Interface()
+ c, ok := v.(prometheus.Collector)
+ if ok {
+ err := prometheus.Register(c)
+ if err != nil {
+ logger.Warn().Err(err).Msgf("failed to register metric '%s' (%T)", n, c)
+ }
+ }
+ }
return m
}
+
+// WithExemplar records value on obs and, when obs supports exemplars,
+// attaches requestId and traceId to the observation as exemplar labels
+// (Labels.RequestId and Labels.TraceId).
+//
+// An Observer that does not implement prometheus.ExemplarObserver receives a
+// plain Observe call instead: the measurement is still recorded, and the
+// unchecked type assertion that used to panic in that case is gone.
+//
+// NOTE(review): client_golang documents that ObserveWithExemplar panics when
+// the exemplar labels are invalid or too long in total — confirm that
+// requestId and traceId are bounded by the callers.
+func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
+	exemplarObs, ok := obs.(prometheus.ExemplarObserver)
+	if !ok {
+		// No exemplar support: keep the observation, drop the exemplar.
+		obs.Observe(value)
+		return
+	}
+	exemplarObs.ObserveWithExemplar(value, prometheus.Labels{
+		Labels.RequestId: requestId,
+		Labels.TraceId:   traceId,
+	})
+}