From faf4f87bf9e7e788cfe8672de8ac85ed645f2bd6 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Mon, 15 May 2023 16:05:04 +0200 Subject: [PATCH] fix(GODT-2626): Do not merge events API has advised that doing this can lead to invalid state. We now return an array of events so that those are processed in order. --- event.go | 46 ++++++++++++++++++++++--------------------- event_test.go | 7 +++++-- event_types.go | 25 ----------------------- server/server_test.go | 13 ++++++------ 4 files changed, 36 insertions(+), 55 deletions(-) diff --git a/event.go b/event.go index 25b8038..6a2ba84 100644 --- a/event.go +++ b/event.go @@ -22,37 +22,37 @@ func (c *Client) GetLatestEventID(ctx context.Context) (string, error) { return res.EventID, nil } -// maxMergedEvents limits the number of events which are merged per one GetEvent +// maxCollectedEvents limits the number of events which are collected per one GetEvent // call. -const maxMergedEvents = 50 +const maxCollectedEvents = 50 + +func (c *Client) GetEvent(ctx context.Context, eventID string) ([]Event, bool, error) { + var events []Event -func (c *Client) GetEvent(ctx context.Context, eventID string) (Event, bool, error) { event, more, err := c.getEvent(ctx, eventID) if err != nil { - return Event{}, more, err + return nil, more, err } - nMerged := 0 + events = append(events, event) + + nCollected := 0 for more { - nMerged++ - if nMerged >= maxMergedEvents { + nCollected++ + if nCollected >= maxCollectedEvents { break } - var next Event - - next, more, err = c.getEvent(ctx, event.EventID) + event, more, err = c.getEvent(ctx, event.EventID) if err != nil { - return Event{}, false, err + return nil, false, err } - if err := event.merge(next); err != nil { - return Event{}, false, err - } + events = append(events, event) } - return event, more, nil + return events, more, nil } // NewEventStreamer returns a new event stream. @@ -77,21 +77,23 @@ func (c *Client) NewEventStream(ctx context.Context, period, jitter time.Duratio // ... } - event, _, err := c.GetEvent(ctx, lastEventID) + events, _, err := c.GetEvent(ctx, lastEventID) if err != nil { continue } - if event.EventID == lastEventID { + if events[len(events)-1].EventID == lastEventID { continue } - select { - case <-ctx.Done(): - return + for _, evt := range events { + select { + case <-ctx.Done(): + return - case eventCh <- event: - lastEventID = event.EventID + case eventCh <- evt: + lastEventID = evt.EventID + } } } }() diff --git a/event_test.go b/event_test.go index ff83930..cc74526 100644 --- a/event_test.go +++ b/event_test.go @@ -105,11 +105,14 @@ func TestMaxEventMerge(t *testing.T) { require.NoError(t, err) } - event, more, err := c.GetEvent(ctx, latestID) + events, more, err := c.GetEvent(ctx, latestID) require.NoError(t, err) require.True(t, more) + require.Equal(t, 50, len(events)) - event, more, err = c.GetEvent(ctx, event.EventID) + events2, more, err := c.GetEvent(ctx, events[len(events)-1].EventID) + require.NotEqual(t, events, events2) require.NoError(t, err) require.False(t, more) + require.Equal(t, 26, len(events2)) } diff --git a/event_types.go b/event_types.go index fa8d4e3..9ac62d5 100644 --- a/event_types.go +++ b/event_types.go @@ -70,31 +70,6 @@ func (event Event) String() string { return fmt.Sprintf("Event %s: %s", event.EventID, strings.Join(parts, ", ")) } -// merge combines this event with the other event (assumed to be newer!). -// TODO: Intelligent merging: if there are multiple EventUpdate(Flags) events, can we just take the latest one? -func (event *Event) merge(other Event) error { - event.EventID = other.EventID - - if other.User != nil { - event.User = other.User - } - - if other.MailSettings != nil { - event.MailSettings = other.MailSettings - } - - // For now, label events are simply appended. - event.Labels = append(event.Labels, other.Labels...) - - // For now, message events are simply appended. - event.Messages = append(event.Messages, other.Messages...) - - // For now, address events are simply appended. - event.Addresses = append(event.Addresses, other.Addresses...) - - return nil -} - type RefreshFlag uint8 const ( diff --git a/server/server_test.go b/server/server_test.go index 052e9e5..de7578a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -296,12 +296,13 @@ func TestServer_MessagesDeleteAfterUpdate(t *testing.T) { event, more, err := c.GetEvent(ctx, eventID) require.NoError(t, err) require.False(t, more) + require.Equal(t, 1, len(event)) // The event should have the correct number of message events. - require.Len(t, event.Messages, 500) + require.Len(t, event[0].Messages, 500) // All the events should be delete events. - for _, message := range event.Messages { + for _, message := range event[0].Messages { require.Equal(t, proton.EventDelete, message.Action) } }) @@ -402,18 +403,18 @@ func TestServer_Events_Multi(t *testing.T) { require.NoError(t, err) require.Equal(t, latest, latestAgain) - event, more, err := c.GetEvent(ctx, latest) + events, more, err := c.GetEvent(ctx, latest) require.NoError(t, err) require.False(t, more) // The event should be empty. - require.Equal(t, proton.Event{EventID: event.EventID}, event) + require.Equal(t, []proton.Event{{EventID: events[0].EventID}}, events) // After fetching an empty event, its ID should still be the latest. - eventAgain, more, err := c.GetEvent(ctx, event.EventID) + eventAgain, more, err := c.GetEvent(ctx, events[0].EventID) require.NoError(t, err) require.False(t, more) - require.Equal(t, eventAgain.EventID, event.EventID) + require.Equal(t, eventAgain[0].EventID, events[0].EventID) }) } })