fix(GODT-2626): Do not merge events

API has advised that doing this can lead to invalid state. We now return
an array of events so that those are processed in order.
This commit is contained in:
Leander Beernaert
2023-05-15 16:05:04 +02:00
committed by LBeernaertProton
parent 6665f11b88
commit faf4f87bf9
4 changed files with 36 additions and 55 deletions

View File

@@ -22,37 +22,37 @@ func (c *Client) GetLatestEventID(ctx context.Context) (string, error) {
return res.EventID, nil
}
// maxMergedEvents limits the number of events which are merged per one GetEvent
// maxCollectedEvents limits the number of events which are collected per one GetEvent
// call.
const maxMergedEvents = 50
const maxCollectedEvents = 50
func (c *Client) GetEvent(ctx context.Context, eventID string) ([]Event, bool, error) {
var events []Event
func (c *Client) GetEvent(ctx context.Context, eventID string) (Event, bool, error) {
event, more, err := c.getEvent(ctx, eventID)
if err != nil {
return Event{}, more, err
return nil, more, err
}
nMerged := 0
events = append(events, event)
nCollected := 0
for more {
nMerged++
if nMerged >= maxMergedEvents {
nCollected++
if nCollected >= maxCollectedEvents {
break
}
var next Event
next, more, err = c.getEvent(ctx, event.EventID)
event, more, err = c.getEvent(ctx, event.EventID)
if err != nil {
return Event{}, false, err
return nil, false, err
}
if err := event.merge(next); err != nil {
return Event{}, false, err
}
events = append(events, event)
}
return event, more, nil
return events, more, nil
}
// NewEventStreamer returns a new event stream.
@@ -77,21 +77,23 @@ func (c *Client) NewEventStream(ctx context.Context, period, jitter time.Duratio
// ...
}
event, _, err := c.GetEvent(ctx, lastEventID)
events, _, err := c.GetEvent(ctx, lastEventID)
if err != nil {
continue
}
if event.EventID == lastEventID {
if events[len(events)-1].EventID == lastEventID {
continue
}
select {
case <-ctx.Done():
return
for _, evt := range events {
select {
case <-ctx.Done():
return
case eventCh <- event:
lastEventID = event.EventID
case eventCh <- evt:
lastEventID = evt.EventID
}
}
}
}()

View File

@@ -105,11 +105,14 @@ func TestMaxEventMerge(t *testing.T) {
require.NoError(t, err)
}
event, more, err := c.GetEvent(ctx, latestID)
events, more, err := c.GetEvent(ctx, latestID)
require.NoError(t, err)
require.True(t, more)
require.Equal(t, 50, len(events))
event, more, err = c.GetEvent(ctx, event.EventID)
events2, more, err := c.GetEvent(ctx, events[len(events)-1].EventID)
require.NotEqual(t, events, events2)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 26, len(events2))
}

View File

@@ -70,31 +70,6 @@ func (event Event) String() string {
return fmt.Sprintf("Event %s: %s", event.EventID, strings.Join(parts, ", "))
}
// merge combines this event with the other event (assumed to be newer!).
// TODO: Intelligent merging: if there are multiple EventUpdate(Flags) events, can we just take the latest one?
func (event *Event) merge(other Event) error {
event.EventID = other.EventID
if other.User != nil {
event.User = other.User
}
if other.MailSettings != nil {
event.MailSettings = other.MailSettings
}
// For now, label events are simply appended.
event.Labels = append(event.Labels, other.Labels...)
// For now, message events are simply appended.
event.Messages = append(event.Messages, other.Messages...)
// For now, address events are simply appended.
event.Addresses = append(event.Addresses, other.Addresses...)
return nil
}
type RefreshFlag uint8
const (

View File

@@ -296,12 +296,13 @@ func TestServer_MessagesDeleteAfterUpdate(t *testing.T) {
event, more, err := c.GetEvent(ctx, eventID)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 1, len(event))
// The event should have the correct number of message events.
require.Len(t, event.Messages, 500)
require.Len(t, event[0].Messages, 500)
// All the events should be delete events.
for _, message := range event.Messages {
for _, message := range event[0].Messages {
require.Equal(t, proton.EventDelete, message.Action)
}
})
@@ -402,18 +403,18 @@ func TestServer_Events_Multi(t *testing.T) {
require.NoError(t, err)
require.Equal(t, latest, latestAgain)
event, more, err := c.GetEvent(ctx, latest)
events, more, err := c.GetEvent(ctx, latest)
require.NoError(t, err)
require.False(t, more)
// The event should be empty.
require.Equal(t, proton.Event{EventID: event.EventID}, event)
require.Equal(t, []proton.Event{{EventID: events[0].EventID}}, events)
// After fetching an empty event, its ID should still be the latest.
eventAgain, more, err := c.GetEvent(ctx, event.EventID)
eventAgain, more, err := c.GetEvent(ctx, events[0].EventID)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, eventAgain.EventID, event.EventID)
require.Equal(t, eventAgain[0].EventID, events[0].EventID)
})
}
})