From db277cf9b076f536b80637de903ecd2cd8f835c8 Mon Sep 17 00:00:00 2001 From: Pascal Bleser Date: Fri, 28 Nov 2025 14:34:31 +0100 Subject: [PATCH] groupware: WS push improvements, add getting email changes to WS integration test --- pkg/jmap/jmap_api.go | 4 +- pkg/jmap/jmap_api_email.go | 35 ++++++- pkg/jmap/jmap_api_ws.go | 12 +-- pkg/jmap/jmap_client.go | 12 +-- pkg/jmap/jmap_http.go | 43 +++++---- pkg/jmap/jmap_integration_email_test.go | 42 ++++---- pkg/jmap/jmap_integration_ws_test.go | 123 ++++++++++++++++++++++-- pkg/jmap/jmap_model.go | 26 ++++- pkg/structs/structs.go | 10 ++ 9 files changed, 235 insertions(+), 72 deletions(-) diff --git a/pkg/jmap/jmap_api.go b/pkg/jmap/jmap_api.go index d864d3087f..cede3f1904 100644 --- a/pkg/jmap/jmap_api.go +++ b/pkg/jmap/jmap_api.go @@ -14,7 +14,7 @@ type ApiClient interface { } type WsPushListener interface { - OnNotification(stateChange StateChange) + OnNotification(username string, stateChange StateChange) } type WsClient interface { @@ -23,7 +23,7 @@ type WsClient interface { } type WsClientFactory interface { - EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) + EnableNotifications(pushState State, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) io.Closer } diff --git a/pkg/jmap/jmap_api_email.go b/pkg/jmap/jmap_api_email.go index bdcab1a855..b145f7d616 100644 --- a/pkg/jmap/jmap_api_email.go +++ b/pkg/jmap/jmap_api_email.go @@ -188,10 +188,39 @@ func (j *Client) GetAllEmailsInMailbox(accountId string, session *Session, ctx c }) } +func (j *Client) GetEmailChanges(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState State, maxChanges uint) (EmailChangesResponse, SessionState, State, Language, Error) { + logger = j.loggerParams("GetEmailChanges", session, logger, func(z zerolog.Context) zerolog.Context { + return z.Str(logSinceState, string(sinceState)) + }) + + changes := EmailChangesCommand{ + AccountId: accountId, + SinceState: sinceState, + } + if maxChanges > 0 { + changes.MaxChanges = maxChanges + } + + cmd, err := j.request(session, logger, invocation(CommandEmailChanges, changes, "0")) + if err != nil { + return EmailChangesResponse{}, "", "", "", simpleError(err, JmapErrorInvalidJmapRequestPayload) + } + + return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, acceptLanguage, func(body *Response) (EmailChangesResponse, State, Error) { + var changesResponse EmailChangesResponse + err = retrieveResponseMatchParameters(logger, body, CommandEmailChanges, "0", &changesResponse) + if err != nil { + return EmailChangesResponse{}, "", err + } + + return changesResponse, changesResponse.NewState, nil + }) +} + // Get all the Emails that have been created, updated or deleted since a given state. -func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState string, fetchBodies bool, maxBodyValueBytes uint, maxChanges uint) (MailboxChanges, SessionState, State, Language, Error) { +func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState State, fetchBodies bool, maxBodyValueBytes uint, maxChanges uint) (MailboxChanges, SessionState, State, Language, Error) { logger = j.loggerParams("GetEmailsSince", session, logger, func(z zerolog.Context) zerolog.Context { - return z.Bool(logFetchBodies, fetchBodies).Str(logSinceState, sinceState) + return z.Bool(logFetchBodies, fetchBodies).Str(logSinceState, string(sinceState)) }) changes := EmailChangesCommand{ @@ -254,7 +283,7 @@ func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context. HasMoreChanges: changesResponse.HasMoreChanges, NewState: changesResponse.NewState, Created: createdResponse.List, - Updated: createdResponse.List, + Updated: updatedResponse.List, }, updatedResponse.State, nil }) } diff --git a/pkg/jmap/jmap_api_ws.go b/pkg/jmap/jmap_api_ws.go index 71b8ce8c8f..3af91c454f 100644 --- a/pkg/jmap/jmap_api_ws.go +++ b/pkg/jmap/jmap_api_ws.go @@ -1,13 +1,9 @@ package jmap -import ( - "github.com/opencloud-eu/opencloud/pkg/log" -) - -func (j *Client) EnablePush(pushState string, session *Session, _ *log.Logger) Error { - panic("not implemented") // TODO implement push +func (j *Client) EnablePushNotifications(pushState State, sessionProvider func() (*Session, error)) (WsClient, error) { + return j.ws.EnableNotifications(pushState, sessionProvider, j) } -func (j *Client) DisablePush(_ *Session, _ *log.Logger) Error { - panic("not implemented") // TODO implement push +func (j *Client) AddWsPushListener(listener WsPushListener) { + j.wsPushListeners.add(listener) } diff --git a/pkg/jmap/jmap_client.go b/pkg/jmap/jmap_client.go index f1133eaa2f..b16d4a012d 100644 --- a/pkg/jmap/jmap_client.go +++ b/pkg/jmap/jmap_client.go @@ -38,10 +38,6 @@ func NewClient(session SessionClient, api ApiClient, blob BlobClient, ws WsClien } } -func (j *Client) EnableNotifications(pushState string, sessionProvider func() (*Session, error)) (WsClient, error) { - return j.ws.EnableNotifications(pushState, sessionProvider, j) -} - func (j *Client) AddSessionEventListener(listener SessionEventListener) { j.sessionEventListeners.add(listener) } @@ -52,13 +48,9 @@ func (j *Client) onSessionOutdated(session *Session, newSessionState SessionStat }) } -func (j *Client) AddWsPushListener(listener WsPushListener) { - j.wsPushListeners.add(listener) -} - -func (j *Client) OnNotification(stateChange StateChange) { +func (j *Client) OnNotification(username string, stateChange StateChange) { j.wsPushListeners.signal(func(listener WsPushListener) { - listener.OnNotification(stateChange) + listener.OnNotification(username, stateChange) }) } diff --git a/pkg/jmap/jmap_http.go b/pkg/jmap/jmap_http.go index 50dc459c6d..a7851f8c12 100644 --- a/pkg/jmap/jmap_http.go +++ b/pkg/jmap/jmap_http.go @@ -426,7 +426,7 @@ type WebSocketPushEnable struct { // The last "pushState" token that the client received from the server. // Upon receipt of a "pushState" token, the server SHOULD immediately send all changes since that state token. - PushState string `json:"pushState,omitempty"` + PushState State `json:"pushState,omitempty"` } type WebSocketPushDisable struct { @@ -541,46 +541,48 @@ func (w *HttpWsClient) readPump() { //c.conn.SetReadDeadline(time.Now().Add(pongWait)) //c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + logger := log.From(w.logger.With().Str("username", w.username)) + for { if _, message, err := w.c.ReadMessage(); err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - w.logger.Error().Err(err).Msg("unexpected close") + logger.Error().Err(err).Msg("unexpected close") } break } else { - if w.logger.Trace().Enabled() { - w.logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message)) + if logger.Trace().Enabled() { + logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message)) } var peek struct { Type string `json:"@type"` } if err := json.Unmarshal(message, &peek); err != nil { - w.logger.Error().Err(err).Msg("failed to deserialized pushed WS message") + logger.Error().Err(err).Msg("failed to deserialized pushed WS message") continue } switch peek.Type { case string(TypeOfStateChange): var stateChange StateChange if err := json.Unmarshal(message, &stateChange); err != nil { - w.logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange) + logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange) continue } else { if w.listener != nil { - w.listener.OnNotification(stateChange) + w.listener.OnNotification(w.username, stateChange) } else { - w.logger.Warn().Msgf("no listener to be notified of %v", stateChange) + logger.Warn().Msgf("no listener to be notified of %v", stateChange) } } default: - w.logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type) + logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type) continue } } } } -func (w *HttpWsClientFactory) EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) { +func (w *HttpWsClientFactory) EnableNotifications(pushState State, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) { c, username, endpoint, jerr := w.connect(sessionProvider) if jerr != nil { return nil, jerr @@ -628,18 +630,19 @@ func (c *HttpWsClient) DisableNotifications() Error { return nil } - err := c.c.WriteJSON(WebSocketPushDisable{ - Type: WebSocketPushTypeDisable, - }) - if err != nil { - return SimpleError{code: JmapErrorWssFailedToSendWebSocketPushDisable, err: err} - } + werr := c.c.WriteJSON(WebSocketPushDisable{Type: WebSocketPushTypeDisable}) + merr := c.c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + cerr := c.c.Close() - err = c.c.Close() - if err != nil { - return SimpleError{code: JmapErrorWssFailedToClose, err: err} + if werr != nil { + return SimpleError{code: JmapErrorWssFailedToClose, err: werr} + } + if merr != nil { + return SimpleError{code: JmapErrorWssFailedToClose, err: merr} + } + if cerr != nil { + return SimpleError{code: JmapErrorWssFailedToClose, err: cerr} } - return nil } diff --git a/pkg/jmap/jmap_integration_email_test.go b/pkg/jmap/jmap_integration_email_test.go index 65db7206e9..bb39c75023 100644 --- a/pkg/jmap/jmap_integration_email_test.go +++ b/pkg/jmap/jmap_integration_email_test.go @@ -73,7 +73,7 @@ func TestEmails(t *testing.T) { var threads int = 0 var mails []filledMail = nil { - mails, threads, err = s.fillEmailsWithImap(inboxFolder, count) + mails, threads, err = s.fillEmailsWithImap(inboxFolder, count, false) require.NoError(err) } mailsByMessageId := structs.Index(mails, func(mail filledMail) string { return mail.messageId }) @@ -271,7 +271,7 @@ var allKeywords = map[string]imap.Flag{ JmapKeywordSeen: imap.FlagSeen, } -func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMail, int, error) { +func (s *StalwartTest) fillEmailsWithImap(folder string, count int, empty bool) ([]filledMail, int, error) { to := fmt.Sprintf("%s <%s>", s.userPersonName, s.userEmail) ccEvery := 2 bccEvery := 3 @@ -302,24 +302,26 @@ func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMai return nil, 0, err } - if ids, err := c.Search(&imap.SearchCriteria{}, nil).Wait(); err != nil { - return nil, 0, err - } else { - if len(ids.AllSeqNums()) > 0 { - storeFlags := imap.StoreFlags{ - Op: imap.StoreFlagsAdd, - Flags: []imap.Flag{imap.FlagDeleted}, - Silent: true, - } - if err = c.Store(ids.All, &storeFlags, nil).Close(); err != nil { - return nil, 0, err - } - if err = c.Expunge().Close(); err != nil { - return nil, 0, err - } - log.Printf("đŸ—‘ī¸ deleted %d messages in %s", len(ids.AllSeqNums()), folder) + if empty { + if ids, err := c.Search(&imap.SearchCriteria{}, nil).Wait(); err != nil { + return nil, 0, err } else { - log.Printf("â„šī¸ did not delete any messages, %s is empty", folder) + if len(ids.AllSeqNums()) > 0 { + storeFlags := imap.StoreFlags{ + Op: imap.StoreFlagsAdd, + Flags: []imap.Flag{imap.FlagDeleted}, + Silent: true, + } + if err = c.Store(ids.All, &storeFlags, nil).Close(); err != nil { + return nil, 0, err + } + if err = c.Expunge().Close(); err != nil { + return nil, 0, err + } + log.Printf("đŸ—‘ī¸ deleted %d messages in %s", len(ids.AllSeqNums()), folder) + } else { + log.Printf("â„šī¸ did not delete any messages, %s is empty", folder) + } } } @@ -525,7 +527,7 @@ func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMai if inboxCount == -1 { return nil, 0, fmt.Errorf("failed to find folder '%v' via IMAP", folder) } - if count != inboxCount { + if empty && count != inboxCount { return nil, 0, fmt.Errorf("wrong number of emails in the inbox after filling, expecting %v, has %v", count, inboxCount) } diff --git a/pkg/jmap/jmap_integration_ws_test.go b/pkg/jmap/jmap_integration_ws_test.go index a6b8880b0a..e01f63e0c6 100644 --- a/pkg/jmap/jmap_integration_ws_test.go +++ b/pkg/jmap/jmap_integration_ws_test.go @@ -7,12 +7,15 @@ import ( "time" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/structs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) type testWsPushListener struct { + t *testing.T logger *log.Logger + username string mailAccountId string calls atomic.Uint32 m sync.Mutex @@ -21,9 +24,10 @@ type testWsPushListener struct { mailboxStates []string } -func (l *testWsPushListener) OnNotification(pushState StateChange) { +func (l *testWsPushListener) OnNotification(username string, pushState StateChange) { + assert.Equal(l.t, l.username, username) l.calls.Add(1) - // pushState is currently not supported by Stalwart, let's use the Email state instead + // pushState is currently not supported by Stalwart, let's use the object states instead l.logger.Debug().Msgf("received %T: %v", pushState, pushState) if changed, ok := pushState.Changed[l.mailAccountId]; ok { l.m.Lock() @@ -37,7 +41,12 @@ func (l *testWsPushListener) OnNotification(pushState StateChange) { l.mailboxStates = append(l.mailboxStates, st) } l.m.Unlock() + + unsupportedKeys := structs.Filter(structs.Keys(changed), func(o ObjectType) bool { return o != EmailType && o != ThreadType && o != MailboxType }) + assert.Empty(l.t, unsupportedKeys) } + unsupportedAccounts := structs.Filter(structs.Keys(pushState.Changed), func(s string) bool { return s != l.mailAccountId }) + assert.Empty(l.t, unsupportedAccounts) } var _ WsPushListener = &testWsPushListener{} @@ -47,6 +56,8 @@ func TestWs(t *testing.T) { return } + assert.NoError(t, nil) + require := require.New(t) s, err := newStalwartTest(t) @@ -59,11 +70,45 @@ func TestWs(t *testing.T) { _, inboxFolder = s.findInbox(t, mailAccountId) } - l := &testWsPushListener{logger: s.logger, mailAccountId: mailAccountId} + l := &testWsPushListener{t: t, username: s.username, logger: s.logger, mailAccountId: mailAccountId} s.client.AddWsPushListener(l) - require.Equal(uint32(0), l.calls.Load()) - wsc, err := s.client.EnableNotifications("", func() (*Session, error) { return s.session, nil }) + require.Equal(uint32(0), l.calls.Load()) + { + l.m.Lock() + require.Len(l.emailStates, 0) + require.Len(l.mailboxStates, 0) + require.Len(l.threadStates, 0) + l.m.Unlock() + } + + var initialState State + { + changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", "", 0) + require.NoError(err) + require.Equal(s.session.State, sessionState) + require.NotEmpty(state) + //fmt.Printf("\x1b[45;1;4mChanges [%s]:\x1b[0m\n", state) + //for _, c := range changes.Created { fmt.Printf("%s %s\n", c.Id, c.Subject) } + initialState = state + require.Empty(changes.Created) + require.Empty(changes.Destroyed) + require.Empty(changes.Updated) + } + require.NotEmpty(initialState) + + { + changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", initialState, 0) + require.NoError(err) + require.Equal(s.session.State, sessionState) + require.Equal(initialState, state) + require.Equal(initialState, changes.NewState) + require.Empty(changes.Created) + require.Empty(changes.Destroyed) + require.Empty(changes.Updated) + } + + wsc, err := s.client.EnablePushNotifications(initialState, func() (*Session, error) { return s.session, nil }) require.NoError(err) defer wsc.Close() @@ -76,8 +121,10 @@ func TestWs(t *testing.T) { l.m.Unlock() } + emailIds := []string{} + { - _, n, err := s.fillEmailsWithImap(inboxFolder, 1) + _, n, err := s.fillEmailsWithImap(inboxFolder, 1, false) require.NoError(err) require.Equal(1, n) } @@ -92,9 +139,24 @@ func TestWs(t *testing.T) { require.Len(l.threadStates, 1) l.m.Unlock() } + var lastState State + { + changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", initialState, 0) + require.NoError(err) + require.Equal(s.session.State, sessionState) + require.NotEqual(initialState, state) + require.NotEqual(initialState, changes.NewState) + require.Equal(state, changes.NewState) + require.Len(changes.Created, 1) + require.Empty(changes.Destroyed) + require.Empty(changes.Updated) + lastState = state + + emailIds = append(emailIds, changes.Created...) + } { - _, n, err := s.fillEmailsWithImap(inboxFolder, 1) + _, n, err := s.fillEmailsWithImap(inboxFolder, 1, false) require.NoError(err) require.Equal(1, n) } @@ -112,6 +174,53 @@ func TestWs(t *testing.T) { assert.NotEqual(t, l.threadStates[0], l.threadStates[1]) l.m.Unlock() } + { + changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", lastState, 0) + require.NoError(err) + require.Equal(s.session.State, sessionState) + require.NotEqual(lastState, state) + require.NotEqual(lastState, changes.NewState) + require.Equal(state, changes.NewState) + require.Len(changes.Created, 1) + require.Empty(changes.Destroyed) + require.Empty(changes.Updated) + lastState = state + + emailIds = append(emailIds, changes.Created...) + } + + { + _, n, err := s.fillEmailsWithImap(inboxFolder, 0, true) + require.NoError(err) + require.Equal(0, n) + } + + require.Eventually(func() bool { + return l.calls.Load() == uint32(3) + }, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after third email state change") + { + l.m.Lock() + require.Len(l.emailStates, 3) + require.Len(l.mailboxStates, 3) + require.Len(l.threadStates, 3) + assert.NotEqual(t, l.emailStates[1], l.emailStates[2]) + assert.NotEqual(t, l.mailboxStates[1], l.mailboxStates[2]) + assert.NotEqual(t, l.threadStates[1], l.threadStates[2]) + l.m.Unlock() + } + { + changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", lastState, 0) + require.NoError(err) + require.Equal(s.session.State, sessionState) + require.NotEqual(lastState, state) + require.NotEqual(lastState, changes.NewState) + require.Equal(state, changes.NewState) + require.Empty(changes.Created) + require.Len(changes.Destroyed, 2) + require.EqualValues(emailIds, changes.Destroyed) + require.Empty(changes.Updated) + lastState = state + } err = wsc.DisableNotifications() require.NoError(err) diff --git a/pkg/jmap/jmap_model.go b/pkg/jmap/jmap_model.go index 889e08efda..bf01424cd8 100644 --- a/pkg/jmap/jmap_model.go +++ b/pkg/jmap/jmap_model.go @@ -1884,7 +1884,7 @@ type EmailChangesCommand struct { // // This is the string that was returned as the state argument in the Email/get response. // The server will return the changes that have occurred since this state. - SinceState string `json:"sinceState,omitzero,omitempty"` + SinceState State `json:"sinceState,omitzero,omitempty"` // The maximum number of ids to return in the response. // @@ -2705,6 +2705,10 @@ func invocation(command Command, parameters any, tag string) Invocation { } } +type TypeOfRequest string + +const RequestType = TypeOfRequest("Request") + type Request struct { // The set of capabilities the client wishes to use. // @@ -2722,8 +2726,19 @@ type Request struct { // A map of a (client-specified) creation id to the id the server assigned when a record was successfully created (optional). CreatedIds map[string]string `json:"createdIds,omitempty"` + + // This MUST be the string "Request". + // The specification extends the Response object with two additional arguments when used over a WebSocket. + Type TypeOfRequest `json:"@type,omitempty"` + + // A client-specified identifier for the request to be echoed back in the response to this request (optional). + Id string `json:"id,omitempty"` } +type TypeOfResponse string + +const ResponseType = TypeOfResponse("Response") + type Response struct { // An array of responses, in the same format as the methodCalls on the Request object. // The output of the methods MUST be added to the methodResponses array in the same order that the methods are processed. @@ -2742,6 +2757,13 @@ type Response struct { // // [Section 2]: https://jmap.io/spec-core.html#the-jmap-session-resource SessionState SessionState `json:"sessionState"` + + // This MUST be the string "Response". + // The specification extends the Response object with two additional arguments when used over a WebSocket. + Type TypeOfResponse `json:"@type,omitempty"` + + // MUST be returned if an identifier is included in the request (optional). + RequestId string `json:"requestId,omitempty"` } type EmailQueryResponse struct { @@ -3662,7 +3684,7 @@ type StateChange struct { // that occurred while it was disconnected. If the server does not support "pushState" tokens, // the client will have to issue a series of "/changes" requests upon reconnection to update // its state to match that of the server. - PushState string `json:"pushState"` + PushState State `json:"pushState"` } type AddressBookRights struct { diff --git a/pkg/structs/structs.go b/pkg/structs/structs.go index ea7126f186..4eb63e9a2e 100644 --- a/pkg/structs/structs.go +++ b/pkg/structs/structs.go @@ -240,3 +240,13 @@ func Concat[E any](arys ...[]E) []E { } return r } + +func Filter[E any](s []E, predicate func(E) bool) []E { + r := []E{} + for _, e := range s { + if predicate(e) { + r = append(r, e) + } + } + return r +}