groupware: improve metrics

* implement more metrics, in a more streamlined fashion

 * use concurrent-map to store SSE streams instead of a regular map
   guarded by one big lock, which will not scale as the map grows
   because every access contends on that single lock

 * while testing error metrics, noticed a few bugs in the error handling
   when Stalwart is down, and fixed them
This commit is contained in:
Pascal Bleser
2025-08-27 17:23:51 +02:00
parent b86e4f388d
commit 7adb66dd16
7 changed files with 314 additions and 150 deletions

View File

@@ -18,12 +18,7 @@ func (g *Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
blobId := chi.URLParam(req.r, UriParamBlobId)
if blobId == "" {
errorId := req.errorId()
msg := fmt.Sprintf("Invalid value for path parameter '%v': empty", UriParamBlobId)
return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: UriParamBlobId}),
))
return req.parameterErrorResponse(UriParamBlobId, fmt.Sprintf("Invalid value for path parameter '%v': empty", UriParamBlobId))
}
res, _, err := g.jmap.GetBlob(req.GetAccountId(), req.session, req.ctx, req.logger, blobId)
@@ -102,7 +97,7 @@ func (g *Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) {
_, err := io.Copy(w, blob.Body)
if err != nil {
return apiError(req.errorId(), ErrorStreamingResponse)
return req.observedParameterError(ErrorStreamingResponse)
}
return nil

View File

@@ -2,7 +2,6 @@ package groupware
import (
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
@@ -33,11 +32,6 @@ type SwaggerGetMailboxById200 struct {
// 500: ErrorResponse500
func (g *Groupware) GetMailbox(w http.ResponseWriter, r *http.Request) {
mailboxId := chi.URLParam(r, UriParamMailboxId)
if mailboxId == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
g.respond(w, r, func(req Request) Response {
res, sessionState, err := g.jmap.GetMailbox(req.GetAccountId(), req.session, req.ctx, req.logger, []string{mailboxId})
if err != nil {
@@ -102,19 +96,17 @@ func (g *Groupware) GetMailboxes(w http.ResponseWriter, r *http.Request) {
filter.Role = role
hasCriteria = true
}
subscribed := q.Get(QueryParamMailboxSearchSubscribed)
if subscribed != "" {
b, err := strconv.ParseBool(subscribed)
if err != nil {
// TODO proper response object
w.WriteHeader(http.StatusBadRequest)
return
}
filter.IsSubscribed = &b
hasCriteria = true
}
g.respond(w, r, func(req Request) Response {
subscribed, set, err := req.parseBoolParam(QueryParamMailboxSearchSubscribed, false)
if err != nil {
return errorResponse(err)
}
if set {
filter.IsSubscribed = &subscribed
hasCriteria = true
}
if hasCriteria {
mailboxes, sessionState, err := g.jmap.SearchMailboxes(req.GetAccountId(), req.session, req.ctx, req.logger, filter)
if err != nil {

View File

@@ -9,8 +9,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
@@ -64,12 +62,7 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
maxChanges := uint(0)
g.respond(w, r, func(req Request) Response {
if mailboxId == "" {
errorId := req.errorId()
msg := fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId)
return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: UriParamMailboxId}),
))
return req.parameterErrorResponse(UriParamMailboxId, fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId))
}
logger := log.From(req.logger.With().Str(HeaderSince, since))
@@ -84,14 +77,9 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
g.respond(w, r, func(req Request) Response {
l := req.logger.With()
if mailboxId == "" {
errorId := req.errorId()
msg := fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId)
return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: UriParamMailboxId}),
))
return req.parameterErrorResponse(UriParamMailboxId, fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId))
}
offset, ok, err := req.parseUNumericParam(QueryParamOffset, 0)
offset, ok, err := req.parseUIntParam(QueryParamOffset, 0)
if err != nil {
return errorResponse(err)
}
@@ -99,7 +87,7 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
l = l.Uint(QueryParamOffset, offset)
}
limit, ok, err := req.parseUNumericParam(QueryParamLimit, g.defaultEmailLimit)
limit, ok, err := req.parseUIntParam(QueryParamLimit, g.defaultEmailLimit)
if err != nil {
return errorResponse(err)
}
@@ -124,12 +112,7 @@ func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
ids := strings.Split(id, ",")
if len(ids) < 1 {
errorId := req.errorId()
msg := fmt.Sprintf("Invalid value for path parameter '%v': '%s': %s", UriParamMessageId, log.SafeString(id), "empty list of mail ids")
return errorResponse(apiError(errorId, ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: UriParamMessageId}),
))
return req.parameterErrorResponse(UriParamMessageId, fmt.Sprintf("Invalid value for path parameter '%v': '%s': %s", UriParamMessageId, log.SafeString(id), "empty list of mail ids"))
}
logger := log.From(req.logger.With().Str("id", log.SafeString(id)))
@@ -145,7 +128,7 @@ func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) {
func (g *Groupware) getMessagesSince(w http.ResponseWriter, r *http.Request, since string) {
g.respond(w, r, func(req Request) Response {
l := req.logger.With().Str(QueryParamSince, since)
maxChanges, ok, err := req.parseUNumericParam(QueryParamMaxChanges, 0)
maxChanges, ok, err := req.parseUIntParam(QueryParamMaxChanges, 0)
if err != nil {
return errorResponse(err)
}
@@ -187,7 +170,7 @@ type MessageSearchResults struct {
QueryState jmap.State `json:"queryState,omitempty"`
}
func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uint, uint, *log.Logger, Response) {
func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uint, uint, *log.Logger, *Error) {
q := req.r.URL.Query()
mailboxId := q.Get(QueryParamMailboxId)
notInMailboxIds := q[QueryParamNotInMailboxId]
@@ -202,17 +185,17 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
l := req.logger.With()
offset, ok, err := req.parseUNumericParam(QueryParamOffset, 0)
offset, ok, err := req.parseUIntParam(QueryParamOffset, 0)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Uint(QueryParamOffset, offset)
}
limit, ok, err := req.parseUNumericParam(QueryParamLimit, g.defaultEmailLimit)
limit, ok, err := req.parseUIntParam(QueryParamLimit, g.defaultEmailLimit)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Uint(QueryParamLimit, limit)
@@ -220,7 +203,7 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
before, ok, err := req.parseDateParam(QueryParamSearchBefore)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Time(QueryParamSearchBefore, before)
@@ -228,7 +211,7 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
after, ok, err := req.parseDateParam(QueryParamSearchAfter)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Time(QueryParamSearchAfter, after)
@@ -262,17 +245,17 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
l = l.Str(QueryParamSearchBody, log.SafeString(body))
}
minSize, ok, err := req.parseNumericParam(QueryParamSearchMinSize, 0)
minSize, ok, err := req.parseIntParam(QueryParamSearchMinSize, 0)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Int(QueryParamSearchMinSize, minSize)
}
maxSize, ok, err := req.parseNumericParam(QueryParamSearchMaxSize, 0)
maxSize, ok, err := req.parseIntParam(QueryParamSearchMaxSize, 0)
if err != nil {
return false, nil, 0, 0, nil, errorResponse(err)
return false, nil, 0, 0, nil, err
}
if ok {
l = l.Int(QueryParamSearchMaxSize, maxSize)
@@ -314,14 +297,14 @@ func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, uin
}
}
return true, filter, offset, limit, logger, Response{}
return true, filter, offset, limit, logger, nil
}
func (g *Groupware) searchMessages(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
ok, filter, offset, limit, logger, errResp := g.buildFilter(req)
ok, filter, offset, limit, logger, err := g.buildFilter(req)
if !ok {
return errResp
return errorResponse(err)
}
if !filter.IsNotEmpty() {
@@ -498,7 +481,6 @@ func (g *Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) {
l := req.logger.With()
l.Str(UriParamMessageId, messageId)
logger := log.From(l)
_, sessionState, jerr := g.jmap.DeleteEmails(req.GetAccountId(), []string{messageId}, req.session, req.ctx, logger)
@@ -564,12 +546,12 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, UriParamMessageId)
g.respond(w, r, func(req Request) Response {
limit, _, err := req.parseUNumericParam(QueryParamLimit, 10) // TODO configurable default limit
limit, _, err := req.parseUIntParam(QueryParamLimit, 10) // TODO configurable default limit
if err != nil {
return errorResponse(err)
}
days, _, err := req.parseUNumericParam(QueryParamDays, 5) // TODO configurable default days
days, _, err := req.parseUIntParam(QueryParamDays, 5) // TODO configurable default days
if err != nil {
return errorResponse(err)
}
@@ -583,16 +565,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
if len(emails.Emails) < 1 {
g.metrics.EmailByIdDuration.WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).Observe(getEmailsDuration.Seconds())
req.observe(g.metrics.EmailByIdDuration.WithLabelValues(req.session.JmapEndpoint, metrics.Values.Result.NotFound), getEmailsDuration.Seconds())
logger.Trace().Msg("failed to find any emails matching id") // the id is already in the log field
return notFoundResponse(sessionState)
} else {
req.observe(g.metrics.EmailByIdDuration.WithLabelValues(req.session.JmapEndpoint, metrics.Values.Result.Found), getEmailsDuration.Seconds())
}
g.metrics.EmailByIdDuration.
WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).(prometheus.ExemplarObserver).
ObserveWithExemplar(getEmailsDuration.Seconds(), prometheus.Labels{
metrics.Labels.RequestId: reqId,
})
email := emails.Emails[0]
@@ -604,10 +584,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
bgctx := context.Background()
g.job(logger, RelationTypeSameSender, func(jobId uint64, l *log.Logger) {
before := time.Now()
results, _, jerr := g.jmap.QueryEmails(accountId, filter, req.session, bgctx, l, 0, limit, false, g.maxBodyValueBytes)
duration := time.Since(before)
if jerr != nil {
req.observeJmapError(jerr)
l.Error().Err(jerr).Msgf("failed to query %v emails", RelationTypeSameSender)
} else {
req.observe(g.metrics.EmailSameSenderDuration.WithLabelValues(req.session.JmapEndpoint), duration.Seconds())
related := filterEmails(results.Emails, email)
l.Trace().Msgf("'%v' found %v other emails", RelationTypeSameSender, len(related))
if len(related) > 0 {
@@ -617,10 +601,14 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
})
g.job(logger, RelationTypeSameThread, func(jobId uint64, l *log.Logger) {
before := time.Now()
emails, _, jerr := g.jmap.EmailsInThread(accountId, email.ThreadId, req.session, bgctx, l, false, g.maxBodyValueBytes)
duration := time.Since(before)
if jerr != nil {
req.observeJmapError(jerr)
l.Error().Err(jerr).Msgf("failed to list %v emails", RelationTypeSameThread)
} else {
req.observe(g.metrics.EmailSameThreadDuration.WithLabelValues(req.session.JmapEndpoint), duration.Seconds())
related := filterEmails(emails, email)
l.Trace().Msgf("'%v' found %v other emails", RelationTypeSameThread, len(related))
if len(related) > 0 {

View File

@@ -160,6 +160,7 @@ const (
ErrorCodeInvalidRequestPayload = "INVRQP"
ErrorCodeInvalidResponsePayload = "INVRSP"
ErrorCodeInvalidRequestParameter = "INVPAR"
ErrorCodeInvalidRequestBody = "INVBDY"
ErrorCodeNonExistingAccount = "INVACC"
ErrorCodeApiInconsistency = "APIINC"
ErrorCodeInvalidUserRequest = "INVURQ"
@@ -262,6 +263,12 @@ var (
Title: "Invalid Request Parameter",
Detail: "At least one of the parameters in the request is invalid.",
}
ErrorInvalidRequestBody = GroupwareError{
Status: http.StatusBadRequest,
Code: ErrorCodeInvalidRequestBody,
Title: "Invalid Request Body",
Detail: "The body of the request is invalid.",
}
ErrorInvalidUserRequest = GroupwareError{
Status: http.StatusBadRequest,
Code: ErrorCodeInvalidUserRequest,
@@ -432,6 +439,10 @@ func apiError(id string, gwerr GroupwareError, options ...ErrorOpt) *Error {
return err
}
// observedParameterError builds an API error for gwerr (with a fresh error id
// and the given options) and passes it through observeParameterError before
// returning it.
func (r Request) observedParameterError(gwerr GroupwareError, options ...ErrorOpt) *Error {
	err := apiError(r.errorId(), gwerr, options...)
	return r.observeParameterError(err)
}
func (r Request) apiErrorFromJmap(err jmap.Error) *Error {
if err == nil {
return nil
@@ -450,5 +461,5 @@ func errorResponses(errors ...Error) ErrorResponse {
}
func (r Request) errorResponseFromJmap(err jmap.Error) Response {
return errorResponse(r.apiErrorFromJmap(err))
return errorResponse(r.apiErrorFromJmap(r.observeJmapError(err)))
}

View File

@@ -9,7 +9,6 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -23,11 +22,14 @@ import (
"github.com/jellydator/ttlcache/v3"
cmap "github.com/orcaman/concurrent-map"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
groupwaremiddleware "github.com/opencloud-eu/opencloud/services/groupware/pkg/middleware"
)
const (
@@ -72,8 +74,7 @@ type Groupware struct {
mux *chi.Mux
metrics *metrics.Metrics
sseServer *sse.Server
streams map[string]time.Time
streamsLock sync.Mutex
streams cmap.ConcurrentMap
logger *log.Logger
defaultEmailLimit uint
maxBodyValueBytes uint
@@ -147,10 +148,10 @@ func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
}
func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m := s.supply()
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.SessionCacheTypeEvictions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.SessionCacheTypeInsertions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.SessionCacheTypeHits)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.SessionCacheTypeMisses)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.Values.SessionCache.Evictions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.Values.SessionCache.Insertions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.Values.SessionCache.Hits)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.Values.SessionCache.Misses)
}
var _ prometheus.Collector = SessionCacheMetricsCollector{}
@@ -173,8 +174,6 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return nil, GroupwareInitializationError{Message: "Mail.Master.Password is empty"}
}
m := metrics.New()
defaultEmailLimit := max(config.Mail.DefaultEmailLimit, 0)
maxBodyValueBytes := max(config.Mail.MaxBodyValueBytes, 0)
responseHeaderTimeout := max(config.Mail.ResponseHeaderTimeout, 0)
@@ -189,10 +188,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
insecure := true // TODO make configurable
m := metrics.New(logger)
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
tr.TLSClientConfig = tlsConfig
if insecure {
tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable
tr.TLSClientConfig = tlsConfig
}
c := *http.DefaultClient
c.Transport = tr
@@ -203,6 +208,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
&c,
masterUsername,
masterPassword,
jmap.HttpJmapApiClientMetrics{
SuccessfulRequestPerEndpointCounter: m.SuccessfulRequestPerEndpointCounter,
FailedRequestPerEndpointCounter: m.FailedRequestPerEndpointCounter,
FailedRequestStatusPerEndpointCounter: m.FailedRequestStatusPerEndpointCounter,
},
)
jmapClient := jmap.NewClient(api, api, api)
@@ -243,8 +253,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
reason = fmt.Sprintf("unknown (%v)", r)
}
spentInCache := time.Since(item.Value().Since())
logger.Trace().Msgf("session cache eviction of user '%v' after %v: %v", item.Key(), spentInCache, reason)
typ := "successful"
if !item.Value().Success() {
typ = "failed"
}
logger.Trace().Msgf("%s session cache eviction of user '%v' after %v: %v", typ, item.Key(), spentInCache, reason)
})
}
@@ -254,7 +267,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
counter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "outdated_sessions",
Name: "outdated_sessions_count",
Help: "Counts outdated session events",
}),
}
@@ -356,8 +369,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
mux: mux,
metrics: m,
sseServer: sseServer,
streams: map[string]time.Time{},
streamsLock: sync.Mutex{},
streams: cmap.New(),
logger: logger,
sessionCache: sessionCache,
userProvider: userProvider,
@@ -427,6 +439,7 @@ func (g *Groupware) listenForEvents() {
}
// push increments the SSE events counter for the event type typ and then
// sends an Event carrying body on the event channel, using the user's
// username as the stream name.
func (g *Groupware) push(user User, typ string, body any) {
	g.metrics.SSEEventsCounter.WithLabelValues(typ).Inc()
	event := Event{Type: typ, Stream: user.GetUsername(), Body: body}
	g.eventChannel <- event
}
@@ -435,23 +448,14 @@ func (g *Groupware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (g *Groupware) addStream(stream string) bool {
g.streamsLock.Lock()
defer g.streamsLock.Unlock()
_, ok := g.streams[stream]
if ok {
return false
}
g.streams[stream] = time.Now()
return true
return g.streams.SetIfAbsent(stream, time.Now())
}
func (g *Groupware) keepStreamsAlive() {
event := &sse.Event{Comment: []byte("keepalive")}
g.streamsLock.Lock()
defer g.streamsLock.Unlock()
for stream := range g.streams {
g.streams.IterCb(func(stream string, created any) {
g.sseServer.Publish(stream, event)
}
})
}
func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
@@ -475,7 +479,7 @@ func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) {
}
// Provide a JMAP Session for the given user, looked up in the session cache by username.
func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, error) {
func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, *GroupwareError) {
item := g.sessionCache.Get(user.GetUsername())
if item != nil {
value := item.Value()
@@ -598,6 +602,10 @@ func (r Request) GetRequestId() string {
return chimiddleware.GetReqID(r.ctx)
}
// GetTraceId returns the trace id that the groupware middleware reports for
// this request's context.
func (r Request) GetTraceId() string {
	traceId := groupwaremiddleware.GetTraceID(r.ctx)
	return traceId
}
func (r Request) GetAccountId() string {
accountId := chi.URLParam(r.r, UriParamAccount)
return r.session.MailAccountId(accountId)
@@ -608,9 +616,9 @@ func (r Request) GetAccount() (jmap.SessionAccount, *Error) {
account, ok := r.session.Accounts[accountId]
if !ok {
errorId := r.errorId()
r.logger.Debug().Msgf("failed to find account '%v'", accountId)
return jmap.SessionAccount{}, apiError(errorId, ErrorNonExistingAccount,
// TODO metric for inexistent accounts
return jmap.SessionAccount{}, apiError(r.errorId(), ErrorNonExistingAccount,
withDetail(fmt.Sprintf("The account '%v' does not exist", log.SafeString(accountId))),
withSource(&ErrorSource{Parameter: UriParamAccount}),
)
@@ -618,7 +626,17 @@ func (r Request) GetAccount() (jmap.SessionAccount, *Error) {
return account, nil
}
func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *Error) {
// parameterError creates an ErrorInvalidRequestParameter API error for the
// request parameter param, with detail as its detail text and the parameter
// name as its error source. The error is created via observedParameterError.
func (r Request) parameterError(param string, detail string) *Error {
	source := &ErrorSource{Parameter: param}
	return r.observedParameterError(ErrorInvalidRequestParameter, withDetail(detail), withSource(source))
}
// parameterErrorResponse is the Response-returning variant of parameterError:
// it builds the parameter error and wraps it with errorResponse.
func (r Request) parameterErrorResponse(param string, detail string) Response {
	err := r.parameterError(param, detail)
	return errorResponse(err)
}
func (r Request) parseIntParam(param string, defaultValue int) (int, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
@@ -631,11 +649,10 @@ func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *
value, err := strconv.ParseInt(str, 10, 0)
if err != nil {
errorId := r.errorId()
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -643,7 +660,7 @@ func (r Request) parseNumericParam(param string, defaultValue int) (int, bool, *
return int(value), true, nil
}
func (r Request) parseUNumericParam(param string, defaultValue uint) (uint, bool, *Error) {
func (r Request) parseUIntParam(param string, defaultValue uint) (uint, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
@@ -656,11 +673,10 @@ func (r Request) parseUNumericParam(param string, defaultValue uint) (uint, bool
value, err := strconv.ParseUint(str, 10, 0)
if err != nil {
errorId := r.errorId()
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -681,9 +697,8 @@ func (r Request) parseDateParam(param string) (time.Time, bool, *Error) {
t, err := time.Parse(time.RFC3339, str)
if err != nil {
errorId := r.errorId()
msg := fmt.Sprintf("Invalid RFC3339 value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return time.Time{}, true, apiError(errorId, ErrorInvalidRequestParameter,
return time.Time{}, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -704,9 +719,8 @@ func (r Request) parseBoolParam(param string, defaultValue bool) (bool, bool, *E
b, err := strconv.ParseBool(str)
if err != nil {
errorId := r.errorId()
msg := fmt.Sprintf("Invalid boolean value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return defaultValue, true, apiError(errorId, ErrorInvalidRequestParameter,
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
@@ -725,11 +739,29 @@ func (r Request) body(target any) *Error {
err := json.NewDecoder(body).Decode(target)
if err != nil {
// TODO(pbleser-oc) error handling when failing to decode body
return r.observedParameterError(ErrorInvalidRequestBody, withSource(&ErrorSource{Pointer: "/"})) // we don't get any details here
}
return nil
}
// observe records value on obs through metrics.WithExemplar, passing along
// this request's request id and trace id.
func (r Request) observe(obs prometheus.Observer, value float64) {
	requestId, traceId := r.GetRequestId(), r.GetTraceId()
	metrics.WithExemplar(obs, value, requestId, traceId)
}
// observeParameterError increments the ParameterErrorCounter metric, labelled
// with the error's code, and returns the very same error so that the call can
// be wrapped around an error expression. A nil error is passed through
// without touching the metric.
func (r Request) observeParameterError(err *Error) *Error {
	if err == nil {
		return err
	}
	counter := r.g.metrics.ParameterErrorCounter.WithLabelValues(err.Code)
	counter.Inc()
	return err
}
// observeJmapError increments the JmapErrorCounter metric, labelled with the
// session's JMAP endpoint and the decimal form of the error's code, and
// returns the very same error. A nil error is passed through without touching
// the metric.
func (r Request) observeJmapError(jerr jmap.Error) jmap.Error {
	if jerr == nil {
		return jerr
	}
	counter := r.g.metrics.JmapErrorCounter.WithLabelValues(r.session.JmapEndpoint, strconv.Itoa(jerr.Code()))
	counter.Inc()
	return jerr
}
func (g *Groupware) log(error *Error) {
var level *zerolog.Event
if error.NumStatus < 300 {
@@ -781,26 +813,33 @@ func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler
user, err := g.userProvider.GetUser(r, ctx, logger)
if err != nil {
g.metrics.AuthenticationFailureCounter.Inc()
g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication))
return Response{}, false
}
if user == nil {
g.metrics.AuthenticationFailureCounter.Inc()
g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication))
return Response{}, false
}
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
session, ok, err := g.session(user, r, ctx, logger)
if err != nil {
logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
render.Status(r, http.StatusInternalServerError)
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {
g.metrics.SessionFailureCounter.Inc()
errorId := errorId(r, ctx)
logger.Error().Str("code", gwerr.Code).Str("error", gwerr.Title).Str("detail", gwerr.Detail).Str(logErrorId, errorId).Msg("failed to determine JMAP session")
g.serveError(w, r, apiError(errorId, *gwerr))
return Response{}, false
}
if !ok {
// no session = authentication failed
logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
render.Status(r, http.StatusForbidden)
g.metrics.SessionFailureCounter.Inc()
errorId := errorId(r, ctx)
logger.Error().Str(logErrorId, errorId).Msg("could not authenticate, failed to find Session")
gwerr = &ErrorInvalidAuthentication
g.serveError(w, r, apiError(errorId, *gwerr))
return Response{}, false
}
decoratedLogger := session.DecorateLogger(*logger)
@@ -891,18 +930,22 @@ func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(
logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId())))
session, ok, err := g.session(user, r, ctx, logger)
if err != nil {
logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session")
render.Status(r, http.StatusInternalServerError)
session, ok, gwerr := g.session(user, r, ctx, logger)
if gwerr != nil {
errorId := errorId(r, ctx)
logger.Error().Str("code", gwerr.Code).Str("error", gwerr.Title).Str("detail", gwerr.Detail).Str(logErrorId, errorId).Msg("failed to determine JMAP session")
g.serveError(w, r, apiError(errorId, *gwerr))
return
}
if !ok {
// no session = authentication failed
logger.Warn().Err(err).Interface(logQuery, r.URL.Query()).Msg("could not authenticate")
render.Status(r, http.StatusForbidden)
errorId := errorId(r, ctx)
logger.Error().Str(logErrorId, errorId).Msg("could not authenticate, failed to find Session")
gwerr = &ErrorInvalidAuthentication
g.serveError(w, r, apiError(errorId, *gwerr))
return
}
decoratedLogger := session.DecorateLogger(*logger)
req := Request{

View File

@@ -11,7 +11,7 @@ import (
type cachedSession interface {
Success() bool
Get() jmap.Session
Error() error
Error() *GroupwareError
Since() time.Time
}
@@ -26,7 +26,7 @@ func (s succeededSession) Success() bool {
func (s succeededSession) Get() jmap.Session {
return s.session
}
func (s succeededSession) Error() error {
func (s succeededSession) Error() *GroupwareError {
return nil
}
func (s succeededSession) Since() time.Time {
@@ -37,7 +37,7 @@ var _ cachedSession = succeededSession{}
type failedSession struct {
since time.Time
err error
err *GroupwareError
}
func (s failedSession) Success() bool {
@@ -46,7 +46,7 @@ func (s failedSession) Success() bool {
func (s failedSession) Get() jmap.Session {
panic("this should never be called")
}
func (s failedSession) Error() error {
func (s failedSession) Error() *GroupwareError {
return s.err
}
func (s failedSession) Since() time.Time {
@@ -65,10 +65,10 @@ func (l *sessionCacheLoader) Load(c *ttlcache.Cache[string, cachedSession], user
session, err := l.jmapClient.FetchSession(username, l.logger)
if err != nil {
l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", username)
return c.Set(username, failedSession{err: err}, l.errorTtl)
return c.Set(username, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
} else {
l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", username)
return c.Set(username, succeededSession{session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
return c.Set(username, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
}
}

View File

@@ -1,6 +1,9 @@
package metrics
import (
"reflect"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/prometheus/client_golang/prometheus"
)
@@ -14,35 +17,82 @@ const (
// Metrics defines the available metrics of this service.
type Metrics struct {
SessionCacheDesc *prometheus.Desc
/*SSessionCache *prometheus.GaugeVec*/
EmailByIdDuration *prometheus.HistogramVec
SessionCacheDesc *prometheus.Desc
JmapErrorCounter *prometheus.CounterVec
ParameterErrorCounter *prometheus.CounterVec
AuthenticationFailureCounter prometheus.Counter
SessionFailureCounter prometheus.Counter
SSEEventsCounter *prometheus.CounterVec
SuccessfulRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestStatusPerEndpointCounter *prometheus.CounterVec
EmailByIdDuration *prometheus.HistogramVec
EmailSameSenderDuration *prometheus.HistogramVec
EmailSameThreadDuration *prometheus.HistogramVec
}
const (
ResultFound = "found"
ResultNotFound = "not-found"
SessionCacheTypeInsertions = "insertions"
SessionCacheTypeHits = "hits"
SessionCacheTypeMisses = "misses"
SessionCacheTypeEvictions = "evictions"
)
var Labels = struct {
Endpoint string
Result string
SessionCacheType string
RequestId string
TraceId string
SSEType string
ErrorCode string
HttpStatusCode string
}{
Endpoint: "endpoint",
Result: "result",
SessionCacheType: "type",
RequestId: "requestId",
RequestId: "requestID",
TraceId: "traceID",
SSEType: "type",
ErrorCode: "code",
HttpStatusCode: "statusCode",
}
// Values groups the fixed label values that are used together with the
// metric labels, so that call sites reference them by name
// (e.g. Values.Result.NotFound) instead of repeating string literals.
var Values = struct {
// Result holds the values for the "result" label.
Result struct {
Found string
NotFound string
Success string
Failure string
}
// SessionCache holds the values for the session cache "type" label.
SessionCache struct {
Insertions string
Hits string
Misses string
Evictions string
}
}{
// The anonymous struct types below have to match the declarations above
// field by field, since the type is spelled out again for each literal.
Result: struct {
Found string
NotFound string
Success string
Failure string
}{
Found: "found",
NotFound: "not-found",
Success: "success",
Failure: "failure",
},
SessionCache: struct {
Insertions string
Hits string
Misses string
Evictions string
}{
Insertions: "insertions",
Hits: "hits",
Misses: "misses",
Evictions: "evictions",
},
}
// New initializes the available metrics.
func New() *Metrics {
func New(logger *log.Logger) *Metrics {
m := &Metrics{
SessionCacheDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "session_cache"),
@@ -50,14 +100,99 @@ func New() *Metrics {
[]string{Labels.SessionCacheType},
nil,
),
EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
AuthenticationFailureCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
//Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
Name: "email_by_id_duration_seconds",
Help: "Duration in seconds for retrieving an Email by its id",
Name: "auth_failures_count",
Help: "Number of failed authentications",
}),
SessionFailureCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "session_failures_count",
Help: "Number of session retrieval failures",
}),
ParameterErrorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "param_errors_count",
Help: "Number of invalid request parameter errors that occured",
}, []string{Labels.ErrorCode}),
JmapErrorCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_errors_count",
Help: "Number of JMAP errors that occured",
}, []string{Labels.Endpoint, Labels.ErrorCode}),
SuccessfulRequestPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_requests_count",
Help: "Number of JMAP requests",
ConstLabels: prometheus.Labels{
Labels.Result: Values.Result.Success,
},
}, []string{Labels.Endpoint}),
FailedRequestPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_requests_count",
Help: "Number of JMAP requests",
ConstLabels: prometheus.Labels{
Labels.Result: Values.Result.Failure,
},
}, []string{Labels.Endpoint}),
FailedRequestStatusPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_requests_failures_status_count",
Help: "Number of JMAP requests",
}, []string{Labels.Endpoint, Labels.HttpStatusCode}),
SSEEventsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "sse_events_count",
Help: "Number of Server-Side Events that have been sent",
}, []string{Labels.SSEType}),
EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
NativeHistogramBucketFactor: 1.1,
Name: "email_by_id_duration_seconds",
Help: "Duration in seconds for retrieving an Email by its id",
}, []string{Labels.Endpoint, Labels.Result}),
EmailSameSenderDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
NativeHistogramBucketFactor: 1.1,
Name: "email_same_sender_duration_seconds",
Help: "Duration in seconds for searching for related same-sender Emails",
}, []string{Labels.Endpoint}),
EmailSameThreadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
NativeHistogramBucketFactor: 1.1,
Name: "email_same_thread_duration_seconds",
Help: "Duration in seconds for searching for related same-thread Emails",
}, []string{Labels.Endpoint}),
}
r := reflect.ValueOf(*m)
for i := 0; i < r.NumField(); i++ {
n := r.Type().Field(i).Name
f := r.Field(i)
v := f.Interface()
c, ok := v.(prometheus.Collector)
if ok {
err := prometheus.Register(c)
if err != nil {
logger.Warn().Err(err).Msgf("failed to register metric '%s' (%T)", n, c)
}
}
}
return m
}
// WithExemplar records value on obs and, when the observer supports
// exemplars, attaches the request id and trace id as exemplar labels.
//
// Not every prometheus.Observer implements prometheus.ExemplarObserver. For
// observers that do not, the value is recorded without an exemplar instead of
// panicking on a failed type assertion.
//
// NOTE(review): per the client_golang documentation, ObserveWithExemplar
// panics when the exemplar labels are invalid or too long; confirm that the
// request and trace ids are bounded in length.
func WithExemplar(obs prometheus.Observer, value float64, requestId string, traceId string) {
	exemplarObs, ok := obs.(prometheus.ExemplarObserver)
	if !ok {
		// Plain observer: still record the measurement, just without exemplar.
		obs.Observe(value)
		return
	}
	exemplarObs.ObserveWithExemplar(value, prometheus.Labels{
		Labels.RequestId: requestId,
		Labels.TraceId:   traceId,
	})
}