groupware: WS push improvements, add getting email changes to WS integration test

This commit is contained in:
Pascal Bleser
2025-11-28 14:34:31 +01:00
parent d8b73cd614
commit db277cf9b0
9 changed files with 235 additions and 72 deletions

View File

@@ -14,7 +14,7 @@ type ApiClient interface {
}
type WsPushListener interface {
OnNotification(stateChange StateChange)
OnNotification(username string, stateChange StateChange)
}
type WsClient interface {
@@ -23,7 +23,7 @@ type WsClient interface {
}
type WsClientFactory interface {
EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error)
EnableNotifications(pushState State, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error)
io.Closer
}

View File

@@ -188,10 +188,39 @@ func (j *Client) GetAllEmailsInMailbox(accountId string, session *Session, ctx c
})
}
func (j *Client) GetEmailChanges(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState State, maxChanges uint) (EmailChangesResponse, SessionState, State, Language, Error) {
logger = j.loggerParams("GetEmailChanges", session, logger, func(z zerolog.Context) zerolog.Context {
return z.Str(logSinceState, string(sinceState))
})
changes := EmailChangesCommand{
AccountId: accountId,
SinceState: sinceState,
}
if maxChanges > 0 {
changes.MaxChanges = maxChanges
}
cmd, err := j.request(session, logger, invocation(CommandEmailChanges, changes, "0"))
if err != nil {
return EmailChangesResponse{}, "", "", "", simpleError(err, JmapErrorInvalidJmapRequestPayload)
}
return command(j.api, logger, ctx, session, j.onSessionOutdated, cmd, acceptLanguage, func(body *Response) (EmailChangesResponse, State, Error) {
var changesResponse EmailChangesResponse
err = retrieveResponseMatchParameters(logger, body, CommandEmailChanges, "0", &changesResponse)
if err != nil {
return EmailChangesResponse{}, "", err
}
return changesResponse, changesResponse.NewState, nil
})
}
// Get all the Emails that have been created, updated or deleted since a given state.
func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState string, fetchBodies bool, maxBodyValueBytes uint, maxChanges uint) (MailboxChanges, SessionState, State, Language, Error) {
func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context.Context, logger *log.Logger, acceptLanguage string, sinceState State, fetchBodies bool, maxBodyValueBytes uint, maxChanges uint) (MailboxChanges, SessionState, State, Language, Error) {
logger = j.loggerParams("GetEmailsSince", session, logger, func(z zerolog.Context) zerolog.Context {
return z.Bool(logFetchBodies, fetchBodies).Str(logSinceState, sinceState)
return z.Bool(logFetchBodies, fetchBodies).Str(logSinceState, string(sinceState))
})
changes := EmailChangesCommand{
@@ -254,7 +283,7 @@ func (j *Client) GetEmailsSince(accountId string, session *Session, ctx context.
HasMoreChanges: changesResponse.HasMoreChanges,
NewState: changesResponse.NewState,
Created: createdResponse.List,
Updated: createdResponse.List,
Updated: updatedResponse.List,
}, updatedResponse.State, nil
})
}

View File

@@ -1,13 +1,9 @@
package jmap
import (
"github.com/opencloud-eu/opencloud/pkg/log"
)
func (j *Client) EnablePush(pushState string, session *Session, _ *log.Logger) Error {
panic("not implemented") // TODO implement push
func (j *Client) EnablePushNotifications(pushState State, sessionProvider func() (*Session, error)) (WsClient, error) {
return j.ws.EnableNotifications(pushState, sessionProvider, j)
}
func (j *Client) DisablePush(_ *Session, _ *log.Logger) Error {
panic("not implemented") // TODO implement push
func (j *Client) AddWsPushListener(listener WsPushListener) {
j.wsPushListeners.add(listener)
}

View File

@@ -38,10 +38,6 @@ func NewClient(session SessionClient, api ApiClient, blob BlobClient, ws WsClien
}
}
func (j *Client) EnableNotifications(pushState string, sessionProvider func() (*Session, error)) (WsClient, error) {
return j.ws.EnableNotifications(pushState, sessionProvider, j)
}
func (j *Client) AddSessionEventListener(listener SessionEventListener) {
j.sessionEventListeners.add(listener)
}
@@ -52,13 +48,9 @@ func (j *Client) onSessionOutdated(session *Session, newSessionState SessionStat
})
}
func (j *Client) AddWsPushListener(listener WsPushListener) {
j.wsPushListeners.add(listener)
}
func (j *Client) OnNotification(stateChange StateChange) {
func (j *Client) OnNotification(username string, stateChange StateChange) {
j.wsPushListeners.signal(func(listener WsPushListener) {
listener.OnNotification(stateChange)
listener.OnNotification(username, stateChange)
})
}

View File

@@ -426,7 +426,7 @@ type WebSocketPushEnable struct {
// The last "pushState" token that the client received from the server.
// Upon receipt of a "pushState" token, the server SHOULD immediately send all changes since that state token.
PushState string `json:"pushState,omitempty"`
PushState State `json:"pushState,omitempty"`
}
type WebSocketPushDisable struct {
@@ -541,46 +541,48 @@ func (w *HttpWsClient) readPump() {
//c.conn.SetReadDeadline(time.Now().Add(pongWait))
//c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
logger := log.From(w.logger.With().Str("username", w.username))
for {
if _, message, err := w.c.ReadMessage(); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
w.logger.Error().Err(err).Msg("unexpected close")
logger.Error().Err(err).Msg("unexpected close")
}
break
} else {
if w.logger.Trace().Enabled() {
w.logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message))
if logger.Trace().Enabled() {
logger.Trace().Str(logEndpoint, w.endpoint).Str(logProto, logProtoJmapWs).Str(logType, logTypePush).Msg(string(message))
}
var peek struct {
Type string `json:"@type"`
}
if err := json.Unmarshal(message, &peek); err != nil {
w.logger.Error().Err(err).Msg("failed to deserialized pushed WS message")
logger.Error().Err(err).Msg("failed to deserialized pushed WS message")
continue
}
switch peek.Type {
case string(TypeOfStateChange):
var stateChange StateChange
if err := json.Unmarshal(message, &stateChange); err != nil {
w.logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange)
logger.Error().Err(err).Msgf("failed to deserialized pushed WS message into a %T", stateChange)
continue
} else {
if w.listener != nil {
w.listener.OnNotification(stateChange)
w.listener.OnNotification(w.username, stateChange)
} else {
w.logger.Warn().Msgf("no listener to be notified of %v", stateChange)
logger.Warn().Msgf("no listener to be notified of %v", stateChange)
}
}
default:
w.logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type)
logger.Warn().Msgf("unsupported pushed WS message JMAP @type: '%s'", peek.Type)
continue
}
}
}
}
func (w *HttpWsClientFactory) EnableNotifications(pushState string, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) {
func (w *HttpWsClientFactory) EnableNotifications(pushState State, sessionProvider func() (*Session, error), listener WsPushListener) (WsClient, Error) {
c, username, endpoint, jerr := w.connect(sessionProvider)
if jerr != nil {
return nil, jerr
@@ -628,18 +630,19 @@ func (c *HttpWsClient) DisableNotifications() Error {
return nil
}
err := c.c.WriteJSON(WebSocketPushDisable{
Type: WebSocketPushTypeDisable,
})
if err != nil {
return SimpleError{code: JmapErrorWssFailedToSendWebSocketPushDisable, err: err}
}
werr := c.c.WriteJSON(WebSocketPushDisable{Type: WebSocketPushTypeDisable})
merr := c.c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
cerr := c.c.Close()
err = c.c.Close()
if err != nil {
return SimpleError{code: JmapErrorWssFailedToClose, err: err}
if werr != nil {
return SimpleError{code: JmapErrorWssFailedToClose, err: werr}
}
if merr != nil {
return SimpleError{code: JmapErrorWssFailedToClose, err: merr}
}
if cerr != nil {
return SimpleError{code: JmapErrorWssFailedToClose, err: cerr}
}
return nil
}

View File

@@ -73,7 +73,7 @@ func TestEmails(t *testing.T) {
var threads int = 0
var mails []filledMail = nil
{
mails, threads, err = s.fillEmailsWithImap(inboxFolder, count)
mails, threads, err = s.fillEmailsWithImap(inboxFolder, count, false)
require.NoError(err)
}
mailsByMessageId := structs.Index(mails, func(mail filledMail) string { return mail.messageId })
@@ -271,7 +271,7 @@ var allKeywords = map[string]imap.Flag{
JmapKeywordSeen: imap.FlagSeen,
}
func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMail, int, error) {
func (s *StalwartTest) fillEmailsWithImap(folder string, count int, empty bool) ([]filledMail, int, error) {
to := fmt.Sprintf("%s <%s>", s.userPersonName, s.userEmail)
ccEvery := 2
bccEvery := 3
@@ -302,24 +302,26 @@ func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMai
return nil, 0, err
}
if ids, err := c.Search(&imap.SearchCriteria{}, nil).Wait(); err != nil {
return nil, 0, err
} else {
if len(ids.AllSeqNums()) > 0 {
storeFlags := imap.StoreFlags{
Op: imap.StoreFlagsAdd,
Flags: []imap.Flag{imap.FlagDeleted},
Silent: true,
}
if err = c.Store(ids.All, &storeFlags, nil).Close(); err != nil {
return nil, 0, err
}
if err = c.Expunge().Close(); err != nil {
return nil, 0, err
}
log.Printf("🗑️ deleted %d messages in %s", len(ids.AllSeqNums()), folder)
if empty {
if ids, err := c.Search(&imap.SearchCriteria{}, nil).Wait(); err != nil {
return nil, 0, err
} else {
log.Printf(" did not delete any messages, %s is empty", folder)
if len(ids.AllSeqNums()) > 0 {
storeFlags := imap.StoreFlags{
Op: imap.StoreFlagsAdd,
Flags: []imap.Flag{imap.FlagDeleted},
Silent: true,
}
if err = c.Store(ids.All, &storeFlags, nil).Close(); err != nil {
return nil, 0, err
}
if err = c.Expunge().Close(); err != nil {
return nil, 0, err
}
log.Printf("🗑️ deleted %d messages in %s", len(ids.AllSeqNums()), folder)
} else {
log.Printf(" did not delete any messages, %s is empty", folder)
}
}
}
@@ -525,7 +527,7 @@ func (s *StalwartTest) fillEmailsWithImap(folder string, count int) ([]filledMai
if inboxCount == -1 {
return nil, 0, fmt.Errorf("failed to find folder '%v' via IMAP", folder)
}
if count != inboxCount {
if empty && count != inboxCount {
return nil, 0, fmt.Errorf("wrong number of emails in the inbox after filling, expecting %v, has %v", count, inboxCount)
}

View File

@@ -7,12 +7,15 @@ import (
"time"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testWsPushListener struct {
t *testing.T
logger *log.Logger
username string
mailAccountId string
calls atomic.Uint32
m sync.Mutex
@@ -21,9 +24,10 @@ type testWsPushListener struct {
mailboxStates []string
}
func (l *testWsPushListener) OnNotification(pushState StateChange) {
func (l *testWsPushListener) OnNotification(username string, pushState StateChange) {
assert.Equal(l.t, l.username, username)
l.calls.Add(1)
// pushState is currently not supported by Stalwart, let's use the Email state instead
// pushState is currently not supported by Stalwart, let's use the object states instead
l.logger.Debug().Msgf("received %T: %v", pushState, pushState)
if changed, ok := pushState.Changed[l.mailAccountId]; ok {
l.m.Lock()
@@ -37,7 +41,12 @@ func (l *testWsPushListener) OnNotification(pushState StateChange) {
l.mailboxStates = append(l.mailboxStates, st)
}
l.m.Unlock()
unsupportedKeys := structs.Filter(structs.Keys(changed), func(o ObjectType) bool { return o != EmailType && o != ThreadType && o != MailboxType })
assert.Empty(l.t, unsupportedKeys)
}
unsupportedAccounts := structs.Filter(structs.Keys(pushState.Changed), func(s string) bool { return s != l.mailAccountId })
assert.Empty(l.t, unsupportedAccounts)
}
var _ WsPushListener = &testWsPushListener{}
@@ -47,6 +56,8 @@ func TestWs(t *testing.T) {
return
}
assert.NoError(t, nil)
require := require.New(t)
s, err := newStalwartTest(t)
@@ -59,11 +70,45 @@ func TestWs(t *testing.T) {
_, inboxFolder = s.findInbox(t, mailAccountId)
}
l := &testWsPushListener{logger: s.logger, mailAccountId: mailAccountId}
l := &testWsPushListener{t: t, username: s.username, logger: s.logger, mailAccountId: mailAccountId}
s.client.AddWsPushListener(l)
require.Equal(uint32(0), l.calls.Load())
wsc, err := s.client.EnableNotifications("", func() (*Session, error) { return s.session, nil })
require.Equal(uint32(0), l.calls.Load())
{
l.m.Lock()
require.Len(l.emailStates, 0)
require.Len(l.mailboxStates, 0)
require.Len(l.threadStates, 0)
l.m.Unlock()
}
var initialState State
{
changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", "", 0)
require.NoError(err)
require.Equal(s.session.State, sessionState)
require.NotEmpty(state)
//fmt.Printf("\x1b[45;1;4mChanges [%s]:\x1b[0m\n", state)
//for _, c := range changes.Created { fmt.Printf("%s %s\n", c.Id, c.Subject) }
initialState = state
require.Empty(changes.Created)
require.Empty(changes.Destroyed)
require.Empty(changes.Updated)
}
require.NotEmpty(initialState)
{
changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", initialState, 0)
require.NoError(err)
require.Equal(s.session.State, sessionState)
require.Equal(initialState, state)
require.Equal(initialState, changes.NewState)
require.Empty(changes.Created)
require.Empty(changes.Destroyed)
require.Empty(changes.Updated)
}
wsc, err := s.client.EnablePushNotifications(initialState, func() (*Session, error) { return s.session, nil })
require.NoError(err)
defer wsc.Close()
@@ -76,8 +121,10 @@ func TestWs(t *testing.T) {
l.m.Unlock()
}
emailIds := []string{}
{
_, n, err := s.fillEmailsWithImap(inboxFolder, 1)
_, n, err := s.fillEmailsWithImap(inboxFolder, 1, false)
require.NoError(err)
require.Equal(1, n)
}
@@ -92,9 +139,24 @@ func TestWs(t *testing.T) {
require.Len(l.threadStates, 1)
l.m.Unlock()
}
var lastState State
{
changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", initialState, 0)
require.NoError(err)
require.Equal(s.session.State, sessionState)
require.NotEqual(initialState, state)
require.NotEqual(initialState, changes.NewState)
require.Equal(state, changes.NewState)
require.Len(changes.Created, 1)
require.Empty(changes.Destroyed)
require.Empty(changes.Updated)
lastState = state
emailIds = append(emailIds, changes.Created...)
}
{
_, n, err := s.fillEmailsWithImap(inboxFolder, 1)
_, n, err := s.fillEmailsWithImap(inboxFolder, 1, false)
require.NoError(err)
require.Equal(1, n)
}
@@ -112,6 +174,53 @@ func TestWs(t *testing.T) {
assert.NotEqual(t, l.threadStates[0], l.threadStates[1])
l.m.Unlock()
}
{
changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", lastState, 0)
require.NoError(err)
require.Equal(s.session.State, sessionState)
require.NotEqual(lastState, state)
require.NotEqual(lastState, changes.NewState)
require.Equal(state, changes.NewState)
require.Len(changes.Created, 1)
require.Empty(changes.Destroyed)
require.Empty(changes.Updated)
lastState = state
emailIds = append(emailIds, changes.Created...)
}
{
_, n, err := s.fillEmailsWithImap(inboxFolder, 0, true)
require.NoError(err)
require.Equal(0, n)
}
require.Eventually(func() bool {
return l.calls.Load() == uint32(3)
}, 3*time.Second, 200*time.Millisecond, "WS push listener was not called after third email state change")
{
l.m.Lock()
require.Len(l.emailStates, 3)
require.Len(l.mailboxStates, 3)
require.Len(l.threadStates, 3)
assert.NotEqual(t, l.emailStates[1], l.emailStates[2])
assert.NotEqual(t, l.mailboxStates[1], l.mailboxStates[2])
assert.NotEqual(t, l.threadStates[1], l.threadStates[2])
l.m.Unlock()
}
{
changes, sessionState, state, _, err := s.client.GetEmailChanges(mailAccountId, s.session, s.ctx, s.logger, "", lastState, 0)
require.NoError(err)
require.Equal(s.session.State, sessionState)
require.NotEqual(lastState, state)
require.NotEqual(lastState, changes.NewState)
require.Equal(state, changes.NewState)
require.Empty(changes.Created)
require.Len(changes.Destroyed, 2)
require.EqualValues(emailIds, changes.Destroyed)
require.Empty(changes.Updated)
lastState = state
}
err = wsc.DisableNotifications()
require.NoError(err)

View File

@@ -1884,7 +1884,7 @@ type EmailChangesCommand struct {
//
// This is the string that was returned as the state argument in the Email/get response.
// The server will return the changes that have occurred since this state.
SinceState string `json:"sinceState,omitzero,omitempty"`
SinceState State `json:"sinceState,omitzero,omitempty"`
// The maximum number of ids to return in the response.
//
@@ -2705,6 +2705,10 @@ func invocation(command Command, parameters any, tag string) Invocation {
}
}
type TypeOfRequest string
const RequestType = TypeOfRequest("Request")
type Request struct {
// The set of capabilities the client wishes to use.
//
@@ -2722,8 +2726,19 @@ type Request struct {
// A map of a (client-specified) creation id to the id the server assigned when a record was successfully created (optional).
CreatedIds map[string]string `json:"createdIds,omitempty"`
// This MUST be the string "Request".
// The specification extends the Response object with two additional arguments when used over a WebSocket.
Type TypeOfRequest `json:"@type,omitempty"`
// A client-specified identifier for the request to be echoed back in the response to this request (optional).
Id string `json:"id,omitempty"`
}
type TypeOfResponse string
const ResponseType = TypeOfResponse("Response")
type Response struct {
// An array of responses, in the same format as the methodCalls on the Request object.
// The output of the methods MUST be added to the methodResponses array in the same order that the methods are processed.
@@ -2742,6 +2757,13 @@ type Response struct {
//
// [Section 2]: https://jmap.io/spec-core.html#the-jmap-session-resource
SessionState SessionState `json:"sessionState"`
// This MUST be the string "Response".
// The specification extends the Response object with two additional arguments when used over a WebSocket.
Type TypeOfResponse `json:"@type,omitempty"`
// MUST be returned if an identifier is included in the request (optional).
RequestId string `json:"requestId,omitempty"`
}
type EmailQueryResponse struct {
@@ -3662,7 +3684,7 @@ type StateChange struct {
// that occurred while it was disconnected. If the server does not support "pushState" tokens,
// the client will have to issue a series of "/changes" requests upon reconnection to update
// its state to match that of the server.
PushState string `json:"pushState"`
PushState State `json:"pushState"`
}
type AddressBookRights struct {

View File

@@ -240,3 +240,13 @@ func Concat[E any](arys ...[]E) []E {
}
return r
}
func Filter[E any](s []E, predicate func(E) bool) []E {
r := []E{}
for _, e := range s {
if predicate(e) {
r = append(r, e)
}
}
return r
}