From c7bb8665957c145b47f40c8cb94a5da3218d672e Mon Sep 17 00:00:00 2001 From: Pascal Bleser Date: Fri, 22 Aug 2025 15:51:52 +0200 Subject: [PATCH] groupware: initial related emails implementation with SSE --- pkg/jmap/jmap_api_email.go | 139 ++++++++- pkg/jmap/jmap_error.go | 6 +- pkg/jmap/jmap_model.go | 5 + .../pkg/groupware/groupware_api_account.go | 6 +- .../pkg/groupware/groupware_api_blob.go | 6 +- .../pkg/groupware/groupware_api_identity.go | 2 +- .../pkg/groupware/groupware_api_mailbox.go | 4 +- .../pkg/groupware/groupware_api_messages.go | 134 ++++++++- .../pkg/groupware/groupware_api_vacation.go | 4 +- .../pkg/groupware/groupware_framework.go | 265 ++++++++++++++++-- .../groupware/pkg/groupware/groupware_reva.go | 42 ++- .../pkg/groupware/groupware_route.go | 10 +- 12 files changed, 543 insertions(+), 80 deletions(-) diff --git a/pkg/jmap/jmap_api_email.go b/pkg/jmap/jmap_api_email.go index 6eb3476bdc..9b2abdcacb 100644 --- a/pkg/jmap/jmap_api_email.go +++ b/pkg/jmap/jmap_api_email.go @@ -354,12 +354,87 @@ func (j *Client) QueryEmailSnippets(accountId string, filter EmailFilterElement, } +type EmailQueryResult struct { + Results []Email `json:"results"` + Total int `json:"total"` + Limit int `json:"limit,omitzero"` + Position int `json:"position,omitzero"` + QueryState string `json:"queryState"` + SessionState string `json:"sessionState,omitempty"` +} + +func (j *Client) QueryEmails(accountId string, filter EmailFilterElement, session *Session, ctx context.Context, logger *log.Logger, offset int, limit int, fetchBodies bool, maxBodyValueBytes int) (EmailQueryResult, Error) { + aid := session.MailAccountId(accountId) + logger = j.loggerParams(aid, "QueryEmails", session, logger, func(z zerolog.Context) zerolog.Context { + return z.Bool(logFetchBodies, fetchBodies) + }) + + query := EmailQueryCommand{ + AccountId: aid, + Filter: filter, + Sort: []EmailComparator{{Property: emailSortByReceivedAt, IsAscending: false}}, + CollapseThreads: true, + CalculateTotal: true, + } + if offset >= 0 { + query.Position = offset + } + if limit >= 0 { + query.Limit = limit + } + + mails := EmailGetRefCommand{ + AccountId: aid, + IdRef: &ResultReference{ + ResultOf: "0", + Name: CommandEmailQuery, + Path: "/ids/*", + }, + FetchAllBodyValues: fetchBodies, + MaxBodyValueBytes: maxBodyValueBytes, + } + + cmd, err := request( + invocation(CommandEmailQuery, query, "0"), + invocation(CommandEmailGet, mails, "1"), + ) + + if err != nil { + logger.Error().Err(err) + return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapRequestPayload) + } + + return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, func(body *Response) (EmailQueryResult, Error) { + var queryResponse EmailQueryResponse + err = retrieveResponseMatchParameters(body, CommandEmailQuery, "0", &queryResponse) + if err != nil { + return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + } + + var emailsResponse EmailGetResponse + err = retrieveResponseMatchParameters(body, CommandEmailGet, "1", &emailsResponse) + if err != nil { + return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + } + + return EmailQueryResult{ + Results: emailsResponse.List, + Total: queryResponse.Total, + Limit: queryResponse.Limit, + Position: queryResponse.Position, + QueryState: queryResponse.QueryState, + SessionState: body.SessionState, + }, nil + }) + +} + type EmailWithSnippets struct { Email Email `json:"email"` Snippets []SearchSnippet `json:"snippets,omitempty"` } -type EmailQueryResult struct { +type EmailQueryWithSnippetsResult struct { Results []EmailWithSnippets `json:"results"` Total int `json:"total"` Limit int `json:"limit,omitzero"` @@ -368,9 +443,9 @@ type EmailQueryResult struct { SessionState string `json:"sessionState,omitempty"` } -func (j *Client) QueryEmails(accountId string, filter EmailFilterElement, session *Session, ctx context.Context, logger *log.Logger, offset int, limit int, fetchBodies bool, maxBodyValueBytes int) (EmailQueryResult, Error) { +func (j *Client) QueryEmailsWithSnippets(accountId string, filter EmailFilterElement, session *Session, ctx context.Context, logger *log.Logger, offset int, limit int, fetchBodies bool, maxBodyValueBytes int) (EmailQueryWithSnippetsResult, Error) { aid := session.MailAccountId(accountId) - logger = j.loggerParams(aid, "QueryEmails", session, logger, func(z zerolog.Context) zerolog.Context { + logger = j.loggerParams(aid, "QueryEmailsWithSnippets", session, logger, func(z zerolog.Context) zerolog.Context { return z.Bool(logFetchBodies, fetchBodies) }) @@ -417,26 +492,26 @@ func (j *Client) QueryEmails(accountId string, filter EmailFilterElement, sessio if err != nil { logger.Error().Err(err) - return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapRequestPayload) + return EmailQueryWithSnippetsResult{}, simpleError(err, JmapErrorInvalidJmapRequestPayload) } - return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, func(body *Response) (EmailQueryResult, Error) { + return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, func(body *Response) (EmailQueryWithSnippetsResult, Error) { var queryResponse EmailQueryResponse err = retrieveResponseMatchParameters(body, CommandEmailQuery, "0", &queryResponse) if err != nil { - return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + return EmailQueryWithSnippetsResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) } var snippetResponse SearchSnippetGetResponse err = retrieveResponseMatchParameters(body, CommandSearchSnippetGet, "1", &snippetResponse) if err != nil { - return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + return EmailQueryWithSnippetsResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) } var emailsResponse EmailGetResponse err = retrieveResponseMatchParameters(body, CommandEmailGet, "2", &emailsResponse) if err != nil { - return EmailQueryResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + return EmailQueryWithSnippetsResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) } snippetsById := map[string][]SearchSnippet{} @@ -460,7 +535,7 @@ func (j *Client) QueryEmails(accountId string, filter EmailFilterElement, sessio }) } - return EmailQueryResult{ + return EmailQueryWithSnippetsResult{ Results: results, Total: queryResponse.Total, Limit: queryResponse.Limit, @@ -808,5 +883,51 @@ func (j *Client) SubmitEmail(accountId string, identityId string, emailId string SessionState: body.SessionState, }, nil }) +} + +type EmailsInThreadResult struct { + Emails []Email `json:"emails"` + SessionState string `json:"sessionState"` +} + +func (j *Client) EmailsInThread(accountId string, threadId string, session *Session, ctx context.Context, logger *log.Logger, fetchBodies bool, maxBodyValueBytes int) (EmailsInThreadResult, Error) { + aid := session.MailAccountId(accountId) + logger = j.loggerParams(aid, "EmailsInThread", session, logger, func(z zerolog.Context) zerolog.Context { + return z.Bool(logFetchBodies, fetchBodies).Str("threadId", log.SafeString(threadId)) + }) + + cmd, err := request( + invocation(CommandThreadGet, ThreadGetCommand{ + AccountId: aid, + Ids: []string{threadId}, + }, "0"), + invocation(CommandEmailGet, EmailGetRefCommand{ + AccountId: aid, + IdRef: &ResultReference{ + ResultOf: "0", + Name: CommandThreadGet, + Path: "/list/*/emailIds", + }, + FetchAllBodyValues: fetchBodies, + MaxBodyValueBytes: maxBodyValueBytes, + }, "1"), + ) + + if err != nil { + logger.Error().Err(err) + return EmailsInThreadResult{}, simpleError(err, JmapErrorInvalidJmapRequestPayload) + } + + return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, func(body *Response) (EmailsInThreadResult, Error) { + var emailsResponse EmailGetResponse + err = retrieveResponseMatchParameters(body, CommandEmailGet, "1", &emailsResponse) + if err != nil { + return EmailsInThreadResult{}, simpleError(err, JmapErrorInvalidJmapResponsePayload) + } + return EmailsInThreadResult{ + Emails: emailsResponse.List, + SessionState: body.SessionState, + }, nil + }) } diff --git a/pkg/jmap/jmap_error.go b/pkg/jmap/jmap_error.go index 425260d535..a21c09fe3b 100644 --- a/pkg/jmap/jmap_error.go +++ b/pkg/jmap/jmap_error.go @@ -40,7 +40,11 @@ func (e SimpleError) Unwrap() error { return e.err } func (e SimpleError) Error() string { - return e.err.Error() + if e.err != nil { + return e.err.Error() + } else { + return "" + } } func simpleError(err error, code int) Error { diff --git a/pkg/jmap/jmap_model.go b/pkg/jmap/jmap_model.go index dbadc44356..daaeecb115 100644 --- a/pkg/jmap/jmap_model.go +++ b/pkg/jmap/jmap_model.go @@ -2344,6 +2344,11 @@ type Thread struct { EmailIds []string } +type ThreadGetCommand struct { + AccountId string `json:"accountId"` + Ids []string `json:"ids,omitempty"` +} + type ThreadGetResponse struct { AccountId string State string diff --git a/services/groupware/pkg/groupware/groupware_api_account.go b/services/groupware/pkg/groupware/groupware_api_account.go index 1d40f97cef..946718a763 100644 --- a/services/groupware/pkg/groupware/groupware_api_account.go +++ b/services/groupware/pkg/groupware/groupware_api_account.go @@ -7,7 +7,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/structs" ) -func (g Groupware) GetAccount(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetAccount(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { account, err := req.GetAccount() if err != nil { @@ -17,7 +17,7 @@ func (g Groupware) GetAccount(w http.ResponseWriter, r *http.Request) { }) } -func (g Groupware) GetAccounts(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetAccounts(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { return response(req.session.Accounts, req.session.State) }) @@ -54,7 +54,7 @@ type SwaggerAccountBootstrapResponse struct { } } -func (g Groupware) GetAccountBootstrap(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetAccountBootstrap(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { mailAccountId := req.GetAccountId() accountIds := structs.Keys(req.session.Accounts) diff --git a/services/groupware/pkg/groupware/groupware_api_blob.go b/services/groupware/pkg/groupware/groupware_api_blob.go index 92265c1f61..113e0ba5a1 100644 --- a/services/groupware/pkg/groupware/groupware_api_blob.go +++ b/services/groupware/pkg/groupware/groupware_api_blob.go @@ -13,7 +13,7 @@ const ( DefaultBlobDownloadType = "application/octet-stream" ) -func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetBlob(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { blobId := chi.URLParam(req.r, UriParamBlobId) if blobId == "" { @@ -37,7 +37,7 @@ func (g Groupware) GetBlob(w http.ResponseWriter, r *http.Request) { }) } -func (g Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { contentType := r.Header.Get("Content-Type") body := r.Body @@ -59,7 +59,7 @@ func (g Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) { }) } -func (g Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) { g.stream(w, r, func(req Request, w http.ResponseWriter) *Error { blobId := chi.URLParam(req.r, UriParamBlobId) name := chi.URLParam(req.r, UriParamBlobName) diff --git a/services/groupware/pkg/groupware/groupware_api_identity.go b/services/groupware/pkg/groupware/groupware_api_identity.go index e5cdbd7b77..6a7a993534 100644 --- a/services/groupware/pkg/groupware/groupware_api_identity.go +++ b/services/groupware/pkg/groupware/groupware_api_identity.go @@ -24,7 +24,7 @@ type SwaggerGetIdentitiesResponse struct { // 400: ErrorResponse400 // 404: ErrorResponse404 // 500: ErrorResponse500 -func (g Groupware) GetIdentities(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetIdentities(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { res, err := g.jmap.GetIdentity(req.GetAccountId(), req.session, req.ctx, req.logger) if err != nil { diff --git a/services/groupware/pkg/groupware/groupware_api_mailbox.go b/services/groupware/pkg/groupware/groupware_api_mailbox.go index dac8667362..2cf219e20b 100644 --- a/services/groupware/pkg/groupware/groupware_api_mailbox.go +++ b/services/groupware/pkg/groupware/groupware_api_mailbox.go @@ -31,7 +31,7 @@ type SwaggerGetMailboxById200 struct { // 400: ErrorResponse400 // 404: ErrorResponse404 // 500: ErrorResponse500 -func (g Groupware) GetMailbox(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetMailbox(w http.ResponseWriter, r *http.Request) { mailboxId := chi.URLParam(r, UriParamMailboxId) if mailboxId == "" { w.WriteHeader(http.StatusBadRequest) @@ -87,7 +87,7 @@ type SwaggerMailboxesResponse200 struct { // 200: MailboxesResponse200 // 400: ErrorResponse400 // 500: ErrorResponse500 -func (g Groupware) GetMailboxes(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetMailboxes(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() var filter jmap.MailboxFilterCondition diff --git a/services/groupware/pkg/groupware/groupware_api_messages.go b/services/groupware/pkg/groupware/groupware_api_messages.go index f3343a3d9a..83bbc2a6a6 100644 --- a/services/groupware/pkg/groupware/groupware_api_messages.go +++ b/services/groupware/pkg/groupware/groupware_api_messages.go @@ -1,6 +1,7 @@ package groupware import ( + "context" "fmt" "net/http" "strings" @@ -51,7 +52,7 @@ type SwaggerGetAllMessagesInMailboxSince200 struct { // 400: ErrorResponse400 // 404: ErrorResponse404 // 500: ErrorResponse500 -func (g Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Request) { mailboxId := chi.URLParam(r, UriParamMailboxId) since := r.Header.Get(HeaderSince) @@ -115,7 +116,7 @@ func (g Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reques } } -func (g Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, UriParamMessageId) g.respond(w, r, func(req Request) Response { ids := strings.Split(id, ",") @@ -138,7 +139,7 @@ func (g Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) { }) } -func (g Groupware) getMessagesSince(w http.ResponseWriter, r *http.Request, since string) { +func (g *Groupware) getMessagesSince(w http.ResponseWriter, r *http.Request, since string) { g.respond(w, r, func(req Request) Response { l := req.logger.With().Str(QueryParamSince, since) maxChanges, ok, err := req.parseNumericParam(QueryParamMaxChanges, -1) @@ -183,7 +184,7 @@ type MessageSearchResults struct { QueryState string `json:"queryState,omitempty"` } -func (g Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, int, int, *log.Logger, Response) { +func (g *Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, int, int, *log.Logger, Response) { q := req.r.URL.Query() mailboxId := q.Get(QueryParamMailboxId) notInMailboxIds := q[QueryParamNotInMailboxId] @@ -313,7 +314,7 @@ func (g Groupware) buildFilter(req Request) (bool, jmap.EmailFilterElement, int, return true, filter, offset, limit, logger, Response{} } -func (g Groupware) searchMessages(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) searchMessages(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { ok, filter, offset, limit, logger, errResp := g.buildFilter(req) if !ok { @@ -341,7 +342,7 @@ func (g Groupware) searchMessages(w http.ResponseWriter, r *http.Request) { logger = log.From(logger.With().Bool(QueryParamSearchFetchBodies, fetchBodies)) } - results, jerr := g.jmap.QueryEmails(req.GetAccountId(), filter, req.session, req.ctx, logger, offset, limit, fetchBodies, g.maxBodyValueBytes) + results, jerr := g.jmap.QueryEmailsWithSnippets(req.GetAccountId(), filter, req.session, req.ctx, logger, offset, limit, fetchBodies, g.maxBodyValueBytes) if jerr != nil { return req.errorResponseFromJmap(jerr) } @@ -383,7 +384,7 @@ func (g Groupware) searchMessages(w http.ResponseWriter, r *http.Request) { }) } -func (g Groupware) GetMessages(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetMessages(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() since := q.Get(QueryParamSince) if since == "" { @@ -409,7 +410,7 @@ type MessageCreation struct { BodyValues map[string]jmap.EmailBodyValue `json:"bodyValues,omitempty"` } -func (g Groupware) CreateMessage(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) CreateMessage(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { logger := req.logger @@ -445,11 +446,11 @@ func (g Groupware) CreateMessage(w http.ResponseWriter, r *http.Request) { return req.errorResponseFromJmap(jerr) } - return response(created.Email, created.State) + return response(created.Email, created.SessionState) }) } -func (g Groupware) UpdateMessage(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) UpdateMessage(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { messageId := chi.URLParam(r, UriParamMessageId) @@ -483,12 +484,12 @@ func (g Groupware) UpdateMessage(w http.ResponseWriter, r *http.Request) { "An internal API behaved unexpectedly: wrong Email update ID response from JMAP endpoint"))) } - return response(updatedEmail, result.State) + return response(updatedEmail, result.SessionState) }) } -func (g Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { messageId := chi.URLParam(r, UriParamMessageId) @@ -502,6 +503,113 @@ func (g Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) { return req.errorResponseFromJmap(jerr) } - return noContentResponse(deleted.State) + return noContentResponse(deleted.SessionState) + }) +} + +type AboutMessageEmailsEvent struct { + Id string `json:"id"` + Source string `json:"source"` + Emails []jmap.Email `json:"emails"` +} + +type AboutMessageResponse struct { + Email jmap.Email `json:"email"` + RequestId string `json:"requestId"` + // IV + // Key (AES-256) +} + +func relatedEmails(email jmap.Email, beacon time.Time, days int) jmap.EmailFilterElement { + filters := []jmap.EmailFilterElement{} + for _, from := range email.From { + if from.Email != "" { + filters = append(filters, jmap.EmailFilterCondition{From: from.Email}) + } + } + for _, sender := range email.Sender { + if sender.Email != "" { + filters = append(filters, jmap.EmailFilterCondition{From: sender.Email}) + } + } + + timeFilter := jmap.EmailFilterCondition{ + Before: beacon.Add(time.Duration(days) * time.Hour * 24), + After: beacon.Add(time.Duration(-days) * time.Hour * 24), + } + + var filter jmap.EmailFilterElement + if len(filters) > 0 { + filter = jmap.EmailFilterOperator{ + Operator: jmap.And, + Conditions: []jmap.EmailFilterElement{ + timeFilter, + jmap.EmailFilterOperator{ + Operator: jmap.Or, + Conditions: filters, + }, + }, + } + } else { + filter = timeFilter + } + + return filter +} + +func (g *Groupware) AboutMessage(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, UriParamMessageId) + + limit := 10 // TODO configurable + days := 3 // TODO configurable + + g.respond(w, r, func(req Request) Response { + reqId := req.GetRequestId() + accountId := req.GetAccountId() + logger := log.From(req.logger.With().Str("id", log.SafeString(id))) + emails, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, []string{id}, true, g.maxBodyValueBytes) + if jerr != nil { + return req.errorResponseFromJmap(jerr) + } + if len(emails.Emails) < 1 { + logger.Trace().Msg("failed to find any emails matching id") + return notFoundResponse(emails.SessionState) + } + email := emails.Emails[0] + + beacon := email.ReceivedAt // TODO configurable: either relative to when the email was received, or relative to now + //beacon := time.Now() + filter := relatedEmails(email, beacon, days) + + // bgctx, _ := context.WithTimeout(context.Background(), time.Duration(30)*time.Second) // TODO configurable + bgctx := context.Background() + + g.job(logger, "query related emails", func(jobId uint64, l *log.Logger) { + results, jerr := g.jmap.QueryEmails(accountId, filter, req.session, bgctx, l, 0, limit, false, g.maxBodyValueBytes) + if jerr != nil { + l.Error().Err(jerr) + } else { + l.Trace().Msgf("about query found %v emails", len(results.Results)) + // TODO filter out the original email + req.push("email", AboutMessageEmailsEvent{Id: reqId, Emails: results.Results, Source: "same-sender"}) + } + }) + + g.job(logger, "emails in thread", func(jobId uint64, l *log.Logger) { + results, jerr := g.jmap.EmailsInThread(accountId, email.ThreadId, req.session, bgctx, l, false, g.maxBodyValueBytes) + l.Info().Interface("results", results).Msg("emails in thread?") + if jerr != nil { + l.Error().Err(jerr) + } else { + l.Trace().Msgf("about thread query found %v emails", len(results.Emails)) + // TODO filter out the original email + req.push("email", AboutMessageEmailsEvent{Id: reqId, Emails: results.Emails, Source: "same-thread"}) + } + }) + + return response(AboutMessageResponse{ + Email: email, + RequestId: reqId, + }, emails.State) }) } diff --git a/services/groupware/pkg/groupware/groupware_api_vacation.go b/services/groupware/pkg/groupware/groupware_api_vacation.go index 82e1bd6934..0d32307efa 100644 --- a/services/groupware/pkg/groupware/groupware_api_vacation.go +++ b/services/groupware/pkg/groupware/groupware_api_vacation.go @@ -28,7 +28,7 @@ type SwaggerGetVacationResponse200 struct { // 200: GetVacationResponse200 // 400: ErrorResponse400 // 500: ErrorResponse500 -func (g Groupware) GetVacation(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) GetVacation(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { res, err := g.jmap.GetVacationResponse(req.GetAccountId(), req.session, req.ctx, req.logger) if err != nil { @@ -58,7 +58,7 @@ type SwaggerSetVacationResponse200 struct { // 200: SetVacationResponse200 // 400: ErrorResponse400 // 500: ErrorResponse500 -func (g Groupware) SetVacation(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) SetVacation(w http.ResponseWriter, r *http.Request) { g.respond(w, r, func(req Request) Response { var body jmap.VacationResponsePayload err := req.body(&body) diff --git a/services/groupware/pkg/groupware/groupware_framework.go b/services/groupware/pkg/groupware/groupware_framework.go index 6f34028eff..a6f6567654 100644 --- a/services/groupware/pkg/groupware/groupware_framework.go +++ b/services/groupware/pkg/groupware/groupware_framework.go @@ -9,10 +9,14 @@ import ( "net/http" "net/url" "strconv" + "sync" + "sync/atomic" "time" "github.com/go-chi/chi/v5" + chimiddleware "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/render" + "github.com/r3labs/sse/v2" "github.com/rs/zerolog" "github.com/jellydator/ttlcache/v3" @@ -24,6 +28,7 @@ import ( const ( logUsername = "username" // this should match jmap.logUsername to avoid having the field twice in the logs under different keys + logUserId = "user-id" logErrorId = "error-id" logErrorCode = "code" logErrorStatus = "status" @@ -36,14 +41,37 @@ const ( logQuery = "query" ) +type User interface { + GetUsername() string + GetId() string +} + +type UserProvider interface { + // Provide the user for JMAP operations. + GetUser(req *http.Request, ctx context.Context, logger *log.Logger) (User, error) +} + +type Job struct { + id uint64 + description string + logger *log.Logger + job func(uint64, *log.Logger) +} + type Groupware struct { mux *chi.Mux + sseServer *sse.Server + streams map[string]time.Time + streamsLock sync.Mutex logger *log.Logger defaultEmailLimit int maxBodyValueBytes int sessionCache *ttlcache.Cache[string, cachedSession] jmap *jmap.Client - usernameProvider UsernameProvider + userProvider UserProvider + eventChannel chan Event + jobsChannel chan Job + jobCounter atomic.Uint64 } type GroupwareInitializationError struct { @@ -77,6 +105,12 @@ func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, var _ jmap.SessionEventListener = GroupwareSessionEventListener{} +type Event struct { + Type string + Stream string + Body any +} + func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) { baseUrl, err := url.Parse(config.Mail.BaseUrl) if err != nil { @@ -102,14 +136,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro sessionCacheTtl := max(config.Mail.SessionCache.Ttl, 0) sessionFailureCacheTtl := max(config.Mail.SessionCache.FailureTtl, 0) + keepStreamsAlive := true // TODO configuration + tr := http.DefaultTransport.(*http.Transport).Clone() tr.ResponseHeaderTimeout = responseHeaderTimeout - tlsConfig := &tls.Config{InsecureSkipVerify: true} + tlsConfig := &tls.Config{InsecureSkipVerify: true} // TODO make configurable tr.TLSClientConfig = tlsConfig c := *http.DefaultClient c.Transport = tr - usernameProvider := NewRevaContextUsernameProvider() + userProvider := NewRevaContextUsernameProvider() api := jmap.NewHttpJmapApiClient( *baseUrl, @@ -161,24 +197,144 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro sessionEventListener := GroupwareSessionEventListener{sessionCache: sessionCache, logger: logger} jmapClient.AddSessionEventListener(&sessionEventListener) - return &Groupware{ + eventChannel := make(chan Event, 100) // TODO make channel queue buffering size configurable + + sseServer := sse.New() + sseServer.EventTTL = time.Duration(5) * time.Minute // TODO configuration setting + + workerQueueSize := 100 // TODO configuration setting + workerPoolSize := 10 // TODO configuration setting + jobsChannel := make(chan Job, workerQueueSize) + + g := &Groupware{ mux: mux, + sseServer: sseServer, + streams: map[string]time.Time{}, + streamsLock: sync.Mutex{}, logger: logger, sessionCache: sessionCache, - usernameProvider: usernameProvider, + userProvider: userProvider, jmap: &jmapClient, defaultEmailLimit: defaultEmailLimit, maxBodyValueBytes: maxBodyValueBytes, - }, nil + eventChannel: eventChannel, + jobsChannel: jobsChannel, + jobCounter: atomic.Uint64{}, + } + + /* + sessionCache.OnInsertion(func(c context.Context, item *ttlcache.Item[string, cachedSession]) { + str := sseServer.CreateStream(item.Key()) + if logger.Trace().Enabled() { + logger.Trace().Msgf("created stream %v for '%v'", log.SafeString(str.ID), log.SafeString(item.Key())) + } + }) + */ + + for w := 1; w <= workerPoolSize; w++ { + go g.worker(jobsChannel) + } + + if keepStreamsAlive { + ticker := time.NewTicker(time.Duration(30) * time.Second) // TODO configuration + //defer ticker.Stop() + go func() { + for range ticker.C { + g.keepStreamsAlive() + } + }() + } + + go g.listenForEvents() + + return g, nil } -func (g Groupware) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) worker(jobs <-chan Job) { + for job := range jobs { + before := time.Now() + logger := log.From(job.logger.With().Str("job", job.description).Uint64("job-id", job.id)) + job.job(job.id, logger) + logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove + } +} + +func (g *Groupware) job(logger *log.Logger, description string, f func(uint64, *log.Logger)) uint64 { + id := g.jobCounter.Add(1) + before := time.Now() + g.jobsChannel <- Job{id: id, description: description, logger: logger, job: f} + g.logger.Trace().Msgf("pushed job %d [%s] in %v", id, description, time.Since(before)) // TODO remove + return id +} + +func (g *Groupware) listenForEvents() { + for ev := range g.eventChannel { + data, err := json.Marshal(ev.Body) + if err == nil { + published := g.sseServer.TryPublish(ev.Stream, &sse.Event{ + Event: []byte(ev.Type), + Data: data, + }) + if !published && g.logger.Debug().Enabled() { + g.logger.Debug().Str("stream", log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details + } + } else { + g.logger.Error().Err(err).Msgf("failed to serialize %T body to JSON", ev) + } + } +} + +func (g *Groupware) push(user User, typ string, body any) { + g.eventChannel <- Event{Type: typ, Stream: user.GetUsername(), Body: body} +} + +func (g *Groupware) ServeHTTP(w http.ResponseWriter, r *http.Request) { g.mux.ServeHTTP(w, r) } +func (g *Groupware) addStream(stream string) bool { + g.streamsLock.Lock() + defer g.streamsLock.Unlock() + _, ok := g.streams[stream] + if ok { + return false + } + g.streams[stream] = time.Now() + return true +} + +func (g *Groupware) keepStreamsAlive() { + event := &sse.Event{Comment: []byte("keepalive")} + g.streamsLock.Lock() + defer g.streamsLock.Unlock() + for stream := range g.streams { + g.sseServer.Publish(stream, event) + } +} + +func (g *Groupware) ServeSSE(w http.ResponseWriter, r *http.Request) { + g.withSession(w, r, func(req Request) Response { + stream := req.GetUser().GetUsername() + + if g.addStream(stream) { + str := g.sseServer.CreateStream(stream) + if g.logger.Trace().Enabled() { + g.logger.Trace().Msgf("created stream '%v'", log.SafeString(str.ID)) + } + } + + q := r.URL.Query() + q.Set("stream", stream) + r.URL.RawQuery = q.Encode() + + g.sseServer.ServeHTTP(w, r) + return Response{} + }) +} + // Provide a JMAP Session for the -func (g Groupware) session(username string, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, error) { - item := g.sessionCache.Get(username) +func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *log.Logger) (jmap.Session, bool, error) { + item := g.sessionCache.Get(user.GetUsername()) if item != nil { value := item.Value() if value != nil { @@ -196,6 +352,8 @@ func (g Groupware) session(username string, _ *http.Request, _ context.Context, // API of handlers but also to make it easier to expand it in the future without having to modify // the parameter list of every single handler function type Request struct { + g *Groupware + user User r *http.Request ctx context.Context logger *log.Logger @@ -204,6 +362,7 @@ type Request struct { type Response struct { body any + status int err *Error etag string sessionState string @@ -247,22 +406,56 @@ func etagOnlyResponse(body any, etag string) Response { func noContentResponse(sessionStatus string) Response { return Response{ - body: "", + body: nil, + status: http.StatusNoContent, err: nil, etag: sessionStatus, sessionState: sessionStatus, } } +func acceptedResponse(sessionStatus string) Response { + return Response{ + body: nil, + status: http.StatusAccepted, + err: nil, + etag: sessionStatus, + sessionState: sessionStatus, + } +} + +func timeoutResponse(sessionStatus string) Response { + return Response{ + body: nil, + status: http.StatusRequestTimeout, + err: nil, + etag: "", + sessionState: sessionStatus, + } +} + func notFoundResponse(sessionStatus string) Response { return Response{ body: nil, + status: http.StatusNotFound, err: nil, etag: sessionStatus, sessionState: sessionStatus, } } +func (r Request) push(typ string, event any) { + r.g.push(r.user, typ, event) +} + +func (r Request) GetUser() User { + return r.user +} + +func (r Request) GetRequestId() string { + return chimiddleware.GetReqID(r.ctx) +} + func (r Request) GetAccountId() string { accountId := chi.URLParam(r.r, UriParamAccount) return r.session.MailAccountId(accountId) @@ -368,7 +561,7 @@ func (r Request) body(target any) *Error { return nil } -func (g Groupware) log(error *Error) { +func (g *Groupware) log(error *Error) { var level *zerolog.Event if error.NumStatus < 300 { // shouldn't land here, but just in case: 1xx and 2xx are "OK" and should be logged as debug @@ -401,7 +594,7 @@ func (g Groupware) log(error *Error) { l.Msg(error.Title) } -func (g Groupware) serveError(w http.ResponseWriter, r *http.Request, error *Error) { +func (g *Groupware) serveError(w http.ResponseWriter, r *http.Request, error *Error) { if error == nil { return } @@ -412,24 +605,24 @@ func (g Groupware) serveError(w http.ResponseWriter, r *http.Request, error *Err render.Render(w, r, errorResponses(*error)) } -func (g Groupware) withSession(w http.ResponseWriter, r *http.Request, handler func(r Request) Response) (Response, bool) { +func (g *Groupware) withSession(w http.ResponseWriter, r *http.Request, handler func(r Request) Response) (Response, bool) { ctx := r.Context() sl := g.logger.SubloggerWithRequestID(ctx) logger := &sl - username, ok, err := g.usernameProvider.GetUsername(r, ctx, logger) + user, err := g.userProvider.GetUser(r, ctx, logger) if err != nil { g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication)) return Response{}, false } - if !ok { + if user == nil { g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication)) return Response{}, false } - logger = log.From(logger.With().Str(logUsername, log.SafeString(username))) + logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId()))) - session, ok, err := g.session(username, r, ctx, logger) + session, ok, err := g.session(user, r, ctx, logger) if err != nil { logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session") render.Status(r, http.StatusInternalServerError) @@ -444,6 +637,8 @@ func (g Groupware) withSession(w http.ResponseWriter, r *http.Request, handler f decoratedLogger := session.DecorateLogger(*logger) req := Request{ + g: g, + user: user, r: r, ctx: ctx, logger: decoratedLogger, @@ -454,7 +649,7 @@ func (g Groupware) withSession(w http.ResponseWriter, r *http.Request, handler f return response, true } -func (g Groupware) sendResponse(w http.ResponseWriter, r *http.Request, response Response) { +func (g *Groupware) sendResponse(w http.ResponseWriter, r *http.Request, response Response) { if response.err != nil { g.log(response.err) w.Header().Add("Content-Type", ContentTypeJsonApi) @@ -474,17 +669,15 @@ func (g Groupware) sendResponse(w http.ResponseWriter, r *http.Request, response } switch response.body { - case nil: - w.WriteHeader(http.StatusNotFound) - case "": - w.WriteHeader(http.StatusNoContent) + case nil, "": + w.WriteHeader(response.status) default: render.Status(r, http.StatusOK) render.JSON(w, r, response.body) } } -func (g Groupware) respond(w http.ResponseWriter, r *http.Request, handler func(r Request) Response) { +func (g *Groupware) respond(w http.ResponseWriter, r *http.Request, handler func(r Request) Response) { response, ok := g.withSession(w, r, handler) if !ok { return @@ -492,24 +685,24 @@ func (g Groupware) respond(w http.ResponseWriter, r *http.Request, handler func( g.sendResponse(w, r, response) } -func (g Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(r Request, w http.ResponseWriter) *Error) { +func (g *Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(r Request, w http.ResponseWriter) *Error) { ctx := r.Context() sl := g.logger.SubloggerWithRequestID(ctx) logger := &sl - username, ok, err := g.usernameProvider.GetUsername(r, ctx, logger) + user, err := g.userProvider.GetUser(r, ctx, logger) if err != nil { g.serveError(w, r, apiError(errorId(r, ctx), ErrorInvalidAuthentication)) return } - if !ok { + if user == nil { g.serveError(w, r, apiError(errorId(r, ctx), ErrorMissingAuthentication)) return } - logger = log.From(logger.With().Str(logUsername, log.SafeString(username))) + logger = log.From(logger.With().Str(logUsername, log.SafeString(user.GetUsername())).Str(logUserId, log.SafeString(user.GetId()))) - session, ok, err := g.session(username, r, ctx, logger) + session, ok, err := g.session(user, r, ctx, logger) if err != nil { logger.Error().Err(err).Interface(logQuery, r.URL.Query()).Msg("failed to determine JMAP session") render.Status(r, http.StatusInternalServerError) @@ -540,12 +733,22 @@ func (g Groupware) stream(w http.ResponseWriter, r *http.Request, handler func(r } } -func (g Groupware) NotFound(w http.ResponseWriter, r *http.Request) { +func (g *Groupware) NotFound(w http.ResponseWriter, r *http.Request) { level := g.logger.Debug() if level.Enabled() { path := log.SafeString(r.URL.Path) - level.Str("path", path).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path) + method := log.SafeString(r.Method) + level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path) + } + w.WriteHeader(http.StatusNotFound) +} + +func (g *Groupware) MethodNotAllowed(w http.ResponseWriter, r *http.Request) { + level := g.logger.Debug() + if level.Enabled() { + path := log.SafeString(r.URL.Path) + method := log.SafeString(r.Method) + level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method) } - render.Status(r, http.StatusNotFound) w.WriteHeader(http.StatusNotFound) } diff --git a/services/groupware/pkg/groupware/groupware_reva.go b/services/groupware/pkg/groupware/groupware_reva.go index 4572a603be..d9f0a50a47 100644 --- a/services/groupware/pkg/groupware/groupware_reva.go +++ b/services/groupware/pkg/groupware/groupware_reva.go @@ -2,35 +2,51 @@ package groupware import ( "context" + "errors" "net/http" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/opencloud-eu/opencloud/pkg/log" revactx "github.com/opencloud-eu/reva/v2/pkg/ctx" ) // UsernameProvider implementation that uses Reva's enrichment of the Context // to retrieve the current username. -type revaContextUsernameProvider struct { +type revaContextUserProvider struct { } -type UsernameProvider interface { - // Provide the username for JMAP operations. - GetUsername(req *http.Request, ctx context.Context, logger *log.Logger) (string, bool, error) -} +var _ UserProvider = revaContextUserProvider{} -var _ UsernameProvider = revaContextUsernameProvider{} - -func NewRevaContextUsernameProvider() UsernameProvider { - return revaContextUsernameProvider{} +func NewRevaContextUsernameProvider() UserProvider { + return revaContextUserProvider{} } // var errUserNotInContext = fmt.Errorf("user not in context") -func (r revaContextUsernameProvider) GetUsername(req *http.Request, ctx context.Context, logger *log.Logger) (string, bool, error) { +var ( + errUserNotInRevaContext = errors.New("failed to find user in reva context") +) + +func (r revaContextUserProvider) GetUser(req *http.Request, ctx context.Context, logger *log.Logger) (User, error) { u, ok := revactx.ContextGetUser(ctx) if !ok { - logger.Error().Ctx(ctx).Msgf("could not get user: user not in reva context: %v", ctx) - return "", false, nil + err := errUserNotInRevaContext + logger.Error().Err(err).Ctx(ctx).Msgf("could not get user: user not in reva context: %v", ctx) + return nil, err } - return u.GetUsername(), true, nil + return RevaUser{user: u}, nil } + +type RevaUser struct { + user *userv1beta1.User +} + +func (r RevaUser) GetUsername() string { + return r.user.GetUsername() +} + +func (r RevaUser) GetId() string { + return r.user.GetId().GetOpaqueId() +} + +var _ User = RevaUser{} diff --git a/services/groupware/pkg/groupware/groupware_route.go b/services/groupware/pkg/groupware/groupware_route.go index 87a69821ce..bd98511d5e 100644 --- a/services/groupware/pkg/groupware/groupware_route.go +++ b/services/groupware/pkg/groupware/groupware_route.go @@ -10,6 +10,7 @@ const ( UriParamMessageId = "messageid" UriParamBlobId = "blobid" UriParamBlobName = "blobname" + UriParamStreamId = "stream" QueryParamMailboxSearchName = "name" QueryParamMailboxSearchRole = "role" QueryParamMailboxSearchSubscribed = "subscribed" @@ -37,7 +38,9 @@ const ( HeaderSince = "if-none-match" ) -func (g Groupware) Route(r chi.Router) { +func (g *Groupware) Route(r chi.Router) { + r.HandleFunc("/events/{stream}", g.ServeSSE) + r.Get("/", g.Index) r.Get("/accounts", g.GetAccounts) r.Route("/accounts/{accountid}", func(r chi.Router) { @@ -57,12 +60,15 @@ func (g Groupware) Route(r chi.Router) { r.Get("/{messageid}", g.GetMessagesById) // r.Put("/{messageid}", g.ReplaceMessage) // TODO r.Patch("/{messageid}", g.UpdateMessage) - r.Delete("/{messageId}", g.DeleteMessage) + r.Delete("/{messageid}", g.DeleteMessage) + r.MethodFunc("REPORT", "/{messageid}", g.AboutMessage) }) r.Route("/blobs", func(r chi.Router) { r.Get("/{blobid}", g.GetBlob) r.Get("/{blobid}/{blobname}", g.DownloadBlob) // ?type= }) }) + r.NotFound(g.NotFound) + r.MethodNotAllowed(g.MethodNotAllowed) }