Make pending local echoes sticky, take 2 (#2189)

* ui: Move EventSendState into timeline::event_item::local module

* [WIP] ui: Make pending local echoes stick to the bottom of the timeline

* test: update more test expectations

* chore: tweak comment to slightly better reflect reality

* nit: remove else after return

* fix: the item's insert position is insert_idx, not `items.len()` anymore

* fix: look for remote echo before local echo when processing send state

Previous code assumed that the latest timeline items would be the most recent, and that
if there was a remote echo, it would always be after the local echo, because of that.
That's not the case anymore, so we must look for possibly a remote echo first, and then
if we find it, apply the late update process.

Also, there might remain a day divider added by the local echo, if it were inserted last.
I'm not sure it covers all the cases, but I've now made it so that the day divider is removed
if it was the last element.

* feat: switch strategy; keep on pushing if there's nothing in the timeline yet

* Revert "test: update more test expectations"

This reverts commit 400cc93ba7c98042a28b5e8d5042899e854f6cff.

* test: reset test expectations

* Address review comments

* fix: don't mix up latest event with any status, with latest non-failed event index

---------

Co-authored-by: Jonas Platte <jplatte@matrix.org>
This commit is contained in:
Benjamin Bouvier
2023-06-30 11:37:49 +02:00
committed by GitHub
parent 5348e7cc12
commit d72fd34325
8 changed files with 185 additions and 103 deletions

View File

@@ -682,12 +682,12 @@ impl<'a> TimelineEventHandler<'a> {
if let Some(day_divider_item) =
maybe_create_day_divider_from_timestamps(old_ts, timestamp)
{
trace!("Adding day divider");
trace!("Adding day divider (local)");
self.items.push_back(Arc::new(day_divider_item));
}
} else {
// If there is no event item, there is no day divider yet.
trace!("Adding first day divider");
trace!("Adding first day divider (local)");
self.items.push_back(Arc::new(TimelineItem::day_divider(timestamp)));
}
@@ -799,36 +799,35 @@ impl<'a> TimelineEventHandler<'a> {
trace!(idx, "Replacing existing event");
self.items.set(idx, Arc::new(item.into()));
return;
} else {
// In more complex cases, remove the item and day
// divider (if necessary) before re-adding the item.
trace!("Removing local echo or duplicate timeline item");
self.items.remove(idx);
assert_ne!(
idx, 0,
"there is never an event item at index 0 because \
the first event item is preceded by a day divider"
);
// Pre-requisites for removing the day divider:
// 1. there is one preceding the old item at all
if self.items[idx - 1].is_day_divider()
// 2. the item after the old one that was removed
// is virtual (it should be impossible for this
// to be a read marker)
&& self
.items
.get(idx)
.map_or(true, |item| item.is_virtual())
{
trace!("Removing day divider");
self.items.remove(idx - 1);
}
// no return here, below code for adding a new event
// will run to re-add the removed item
}
// In more complex cases, remove the item and day
// divider (if necessary) before re-adding the item.
trace!("Removing local echo or duplicate timeline item");
self.items.remove(idx);
assert_ne!(
idx, 0,
"there is never an event item at index 0 because \
the first event item is preceded by a day divider"
);
// Pre-requisites for removing the day divider:
// 1. there is one preceding the old item at all
if self.items[idx - 1].is_day_divider()
// 2. the item after the old one that was removed is virtual (it should be
// impossible for this to be a read marker)
&& self
.items
.get(idx)
.map_or(true, |item| item.is_virtual())
{
trace!("Removing day divider");
self.items.remove(idx - 1);
}
// no return here, below code for adding a new event
// will run to re-add the removed item
} else if txn_id.is_some() {
warn!(
"Received event with transaction ID, but didn't \
@@ -836,26 +835,66 @@ impl<'a> TimelineEventHandler<'a> {
);
}
// Check if the latest event has the same date as this event.
if let Some(latest_event) = self.items.iter().rev().find_map(|item| item.as_event())
{
// Local echoes that are pending should stick to the bottom,
// find the latest event that isn't that.
let mut latest_event_stream = self
.items
.iter()
.enumerate()
.rev()
.filter_map(|(idx, item)| Some((idx, item.as_event()?)));
// Find the latest event, independently of success or failure status.
let latest_event = latest_event_stream.clone().next().unzip().1;
// Find the index of the latest non-failure event.
let latest_nonfailed_event_idx = latest_event_stream
.find(|(_, evt)| {
!matches!(
evt.send_state(),
Some(EventSendState::NotSentYet | EventSendState::Sent { .. })
)
})
.unzip()
.0;
// Insert the next item after the latest non-failed event item,
// or at the start if there is no such item.
let mut insert_idx = latest_nonfailed_event_idx.map_or(0, |idx| idx + 1);
// Keep push semantics, if we're inserting at the end.
let should_push = insert_idx == self.items.len();
if let Some(latest_event) = latest_event {
// Check if that event has the same date as the new one.
let old_ts = latest_event.timestamp();
if let Some(day_divider_item) =
maybe_create_day_divider_from_timestamps(old_ts, timestamp)
{
trace!("Adding day divider");
self.items.push_back(Arc::new(day_divider_item));
trace!("Adding day divider (remote)");
if should_push {
self.items.push_back(Arc::new(day_divider_item));
} else {
self.items.insert(insert_idx, Arc::new(day_divider_item));
insert_idx += 1;
}
}
} else {
// If there is no event item, there is no day divider yet.
trace!("Adding first day divider");
self.items.push_back(Arc::new(TimelineItem::day_divider(timestamp)));
trace!("Adding first day divider (remote)");
if should_push {
self.items.push_back(Arc::new(TimelineItem::day_divider(timestamp)));
} else {
self.items
.insert(insert_idx, Arc::new(TimelineItem::day_divider(timestamp)));
insert_idx += 1;
}
}
if self.track_read_receipts {
maybe_add_implicit_read_receipt(
self.items.len(),
insert_idx,
&mut item,
self.meta.is_own_event,
self.items,
@@ -863,8 +902,12 @@ impl<'a> TimelineEventHandler<'a> {
);
}
trace!("Adding new remote timeline item at the end");
self.items.push_back(Arc::new(item.into()));
trace!("Adding new remote timeline item after all non-pending events");
if should_push {
self.items.push_back(Arc::new(item.into()));
} else {
self.items.insert(insert_idx, Arc::new(item.into()));
}
}
#[cfg(feature = "e2e-encryption")]

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ruma::{EventId, OwnedTransactionId};
use std::sync::Arc;
use super::EventSendState;
use matrix_sdk::Error;
use ruma::{EventId, OwnedEventId, OwnedTransactionId};
/// An item for an event that was created locally and not yet echoed back by
/// the homeserver.
@@ -43,3 +44,24 @@ impl LocalEventTimelineItem {
Self { send_state, ..self.clone() }
}
}
/// This type represents the "send state" of a local event timeline item.
#[derive(Clone, Debug)]
pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,
/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
/// Details about how sending the event failed.
error: Arc<Error>,
},
/// Sending has been cancelled because an earlier event in the
/// message-sending queue failed.
Cancelled,
/// The local event has been sent successfully to the server.
Sent {
/// The event ID assigned by the server.
event_id: OwnedEventId,
},
}

View File

@@ -20,18 +20,20 @@ use once_cell::sync::Lazy;
use ruma::{
events::{receipt::Receipt, room::message::MessageType, AnySyncTimelineEvent},
serde::Raw,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedUserId, TransactionId,
UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedMxcUri, OwnedUserId, TransactionId, UserId,
};
mod content;
mod local;
mod remote;
pub use self::content::{
AnyOtherFullStateEventContent, BundledReactions, EncryptedMessage, InReplyToDetails,
MemberProfileChange, MembershipChange, Message, OtherState, ReactionGroup, RepliedToEvent,
RoomMembershipChange, Sticker, TimelineItemContent,
pub use self::{
content::{
AnyOtherFullStateEventContent, BundledReactions, EncryptedMessage, InReplyToDetails,
MemberProfileChange, MembershipChange, Message, OtherState, ReactionGroup, RepliedToEvent,
RoomMembershipChange, Sticker, TimelineItemContent,
},
local::EventSendState,
};
pub(super) use self::{
local::LocalEventTimelineItem,
@@ -290,27 +292,6 @@ impl EventTimelineItem {
}
}
/// This type represents the "send state" of a local event timeline item.
#[derive(Clone, Debug)]
pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,
/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
/// Details about how sending the event failed.
error: Arc<Error>,
},
/// Sending has been cancelled because an earlier event in the
/// message-sending queue failed.
Cancelled,
/// The local event has been sent successfully to the server.
Sent {
/// The event ID assigned by the server.
event_id: OwnedEventId,
},
}
impl From<LocalEventTimelineItem> for EventTimelineItemKind {
fn from(value: LocalEventTimelineItem) -> Self {
EventTimelineItemKind::Local(value)

View File

@@ -496,25 +496,19 @@ impl<P: RoomDataProvider> TimelineInner<P> {
_ => None,
};
// Look for the local event by the transaction ID or event ID.
let result = rfind_event_item(&state.items, |it| {
it.transaction_id() == Some(txn_id)
|| new_event_id.is_some() && it.event_id() == new_event_id
});
let Some((idx, item)) = result else {
// Event isn't found at all.
warn!("Timeline item not found, can't add event ID");
return;
};
let Some(local_item) = item.as_local() else {
// The local echoes are always at the end of the timeline, we must first make
// sure the remote echo hasn't showed up yet.
if rfind_event_item(&state.items, |it| {
new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
})
.is_some()
{
// Remote echo already received. This is very unlikely.
trace!("Remote echo received before send-event response");
let local_echo = rfind_event_item(&state.items, |it| {
it.transaction_id() == Some(txn_id)
});
let local_echo =
rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id));
// If there's both the remote echo and a local echo, that means the
// remote echo was received before the response *and* contained no
// transaction ID (and thus duplicated the local echo).
@@ -526,17 +520,33 @@ impl<P: RoomDataProvider> TimelineInner<P> {
error!("Inconsistent state: Local echo was not preceded by day divider");
return;
}
if idx == state.items.len() {
error!("Inconsistent state: Echo was duplicated but local echo was last");
return;
}
if state.items[idx - 1].is_day_divider() && state.items[idx].is_day_divider() {
// If local echo was the only event from that day, remove day divider.
if idx == state.items.len() && state.items[idx - 1].is_day_divider() {
// The day divider may have been added for this local echo, remove it and let
// the next message decide whether it's required or not.
state.items.remove(idx - 1);
}
}
return;
}
// Look for the local event by the transaction ID or event ID.
let result = rfind_event_item(&state.items, |it| {
it.transaction_id() == Some(txn_id)
|| new_event_id.is_some()
&& it.event_id() == new_event_id
&& it.as_local().is_some()
});
let Some((idx, item)) = result else {
// Event isn't found at all.
warn!("Timeline item not found, can't add event ID");
return;
};
let Some(local_item) = item.as_local() else {
warn!("We looked for a local item, but it transitioned to remote??");
return;
};

View File

@@ -125,8 +125,11 @@ async fn remote_echo_new_position() {
// … and another event that comes back before the remote echo
timeline.handle_live_message_event(&BOB, RoomMessageEventContent::text_plain("test")).await;
let _day_divider = assert_next_matches!(stream, VectorDiff::PushBack { value } => value);
let _bob_message = assert_next_matches!(stream, VectorDiff::PushBack { value } => value);
// … and is inserted before the local echo item
let _day_divider =
assert_next_matches!(stream, VectorDiff::Insert { index: 0, value } => value);
let _bob_message =
assert_next_matches!(stream, VectorDiff::Insert { index: 1, value } => value);
// When the remote echo comes in…
timeline
@@ -146,9 +149,9 @@ async fn remote_echo_new_position() {
.await;
// … the local echo should be removed
assert_next_matches!(stream, VectorDiff::Remove { index: 1 });
assert_next_matches!(stream, VectorDiff::Remove { index: 3 });
// … along with its day divider
assert_next_matches!(stream, VectorDiff::Remove { index: 0 });
assert_next_matches!(stream, VectorDiff::Remove { index: 2 });
// … and the remote echo added (no new day divider because both bob's and
// alice's message are from the same day according to server timestamps)

View File

@@ -246,15 +246,15 @@ async fn dedup_by_event_id_late() {
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
assert_next_matches!(timeline_stream, VectorDiff::PushBack { .. }); // day divider
assert_next_matches!(timeline_stream, VectorDiff::Insert { index: 0, .. }); // day divider
let remote_echo =
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value);
assert_next_matches!(timeline_stream, VectorDiff::Insert { index: 1, value } => value);
let item = remote_echo.as_event().unwrap();
assert_eq!(item.event_id(), Some(event_id));
// Local echo and its day divider are removed.
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 1 }));
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 }));
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 3 }));
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 2 }));
}
#[async_test]

View File

@@ -117,6 +117,31 @@ macro_rules! assert_timeline_stream {
)
};
// `insert [$nth] "$event_id"`
( @_ [ $stream:ident ] [ insert [$index:literal] $event_id:literal ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_timeline_stream!(
@_
[ $stream ]
[ $( $rest )* ]
[
$( $accumulator )*
{
assert_matches!(
$stream.next().now_or_never(),
Some(Some(VectorDiff::Insert { index: $index, value })) => {
assert_matches!(
value.as_ref(),
TimelineItem::Event(event_timeline_item) => {
assert_eq!(event_timeline_item.event_id().unwrap().as_str(), $event_id);
}
);
}
);
}
]
)
};
// `update [$nth] "$event_id"`
( @_ [ $stream:ident ] [ update [$index:literal] $event_id:literal ; $( $rest:tt )* ] [ $( $accumulator:tt )* ] ) => {
assert_timeline_stream!(

View File

@@ -76,13 +76,11 @@ async fn test_toggling_reaction() -> Result<()> {
let _join_rules = stream.next().await;
let _history_visibility = stream.next().await;
let _guest_access = stream.next().await;
let _remove_local_event = stream.next().await;
}
// Check we have the remote echo
{
let event =
assert_matches!(stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let event = assert_matches!(stream.next().await, Some(VectorDiff::Set { index: 7, value }) => value);
let event = event.as_event().unwrap();
assert_eq!(event.event_id().unwrap(), &event_id);
assert!(!event.is_local_echo());