mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-18 13:40:55 -04:00
feat(sdk): Improve RoomEventCacheUpdate (#3471)
* chore(sdk): Rename `RoomEventCacheUpdate::UpdateReadMarker`.
This patch renames `RoomEventCacheUpdate::UpdateReadMarker` to
`ReadMarker`. The `Update` prefix is already part of the enum name.
* feat(sdk): Rename `RoomEventCacheUpdate::ReadMarker::event_id` to `move_to`.
This patch renames `RoomEventCacheUpdate::ReadMarker::event_id` to
`ReadMarker::move_to` as I feel like it conveys a better semantics.
* feat(sdk): Extract `RoomEventCacheUpdate::Append::ambiguity_changes`.
This patch extracts `RoomEventCacheUpdate::Append::ambiguity_changes`
into a new variant `RoomEventCacheUpdate::Members { ambiguity_changes }
`.
This patch also creates a new private
`RoomEventCacheInner::send_grouped_updates_for_events` method to ensure
the updates are sent in a particular order.
* feat(sdk): Rename `RoomEventCacheUpdate::Append`.
This patch renames `RoomEventCacheUpdate::Append` to `SyncEvents`. The
field `events` is renamed `timeline`.
This patch also renames some variables to clarify the code and to match
the renamings in `RoomEventCacheUpdate`.
* feat(sdk): Rename `RoomEventCacheUpdate::ReadMarker` and `Members`.
This patch renames `ReadMarker { move_to: … }` to `MoveReaderMarkerTo
{ event_id: … }`.
This patch also renames `Members` to `UpdateMembers`.
Finally, this patch renames some other variables to avoid clashes with
terminology in `matrix_sdk_ui`.
* feat(sdk): Split `RoomEventCacheUpdate::SyncEvents`.
This patch splits `RoomEventCacheUpdate::SyncEvents` into 2 new variants:
`AddTimelineEvents` and `AddEphemeralEvents`.
This patch takes this opportunity to update `matrix_sdk_ui::timeline`
a little bit too. `handle_sync_events` is renamed
`handle_ephemeral_events`, and the `SyncTimelineEvent` argument is
removed: it's possible to use `add_events_at` directly to handle the
`SyncTimelineEvent`s.
* fix(sdk): Do not send `RoomEventCacheUpdate` if values are empty.
This patch prevents sending useless `RoomEventCacheUdpate` if their
respective values are empty.
* chore(ui): Update a log message.
* Apply suggestions from code review
Signed-off-by: Benjamin Bouvier <public@benj.me>
---------
Signed-off-by: Benjamin Bouvier <public@benj.me>
Co-authored-by: Benjamin Bouvier <public@benj.me>
This commit is contained in:
@@ -15,7 +15,11 @@
|
||||
use std::{collections::BTreeSet, sync::Arc};
|
||||
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use matrix_sdk::{event_cache::RoomEventCacheUpdate, executor::spawn, Room};
|
||||
use matrix_sdk::{
|
||||
event_cache::{EventsOrigin, RoomEventCacheUpdate},
|
||||
executor::spawn,
|
||||
Room,
|
||||
};
|
||||
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{info, info_span, trace, warn, Instrument, Span};
|
||||
@@ -27,7 +31,10 @@ use super::{
|
||||
queue::send_queued_messages,
|
||||
Error, Timeline, TimelineDropHandle, TimelineFocus,
|
||||
};
|
||||
use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
|
||||
use crate::{
|
||||
timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd},
|
||||
unable_to_decrypt_hook::UtdHookManager,
|
||||
};
|
||||
|
||||
/// Builder that allows creating and configuring various parts of a
|
||||
/// [`Timeline`].
|
||||
@@ -203,7 +210,7 @@ impl TimelineBuilder {
|
||||
};
|
||||
|
||||
match update {
|
||||
RoomEventCacheUpdate::UpdateReadMarker { event_id } => {
|
||||
RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
|
||||
trace!(target = %event_id, "Handling fully read marker.");
|
||||
inner.handle_fully_read_marker(event_id).await;
|
||||
}
|
||||
@@ -220,13 +227,26 @@ impl TimelineBuilder {
|
||||
inner.clear().await;
|
||||
}
|
||||
|
||||
RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes } => {
|
||||
trace!("Received new events from sync.");
|
||||
RoomEventCacheUpdate::AddTimelineEvents { events, origin } => {
|
||||
trace!("Received new timeline events.");
|
||||
|
||||
// TODO: (bnjbvr) ephemeral should be handled by the event cache, and
|
||||
// we should replace this with a simple `add_events_at`.
|
||||
inner.handle_sync_events(events, ephemeral).await;
|
||||
inner.add_events_at(
|
||||
events,
|
||||
TimelineEnd::Back,
|
||||
match origin {
|
||||
EventsOrigin::Sync => RemoteEventOrigin::Sync,
|
||||
}
|
||||
).await;
|
||||
}
|
||||
|
||||
RoomEventCacheUpdate::AddEphemeralEvents { events } => {
|
||||
trace!("Received new ephemeral events from sync.");
|
||||
|
||||
// TODO: (bnjbvr) ephemeral should be handled by the event cache.
|
||||
inner.handle_ephemeral_events(events).await;
|
||||
}
|
||||
|
||||
RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
|
||||
if !ambiguity_changes.is_empty() {
|
||||
let member_ambiguity_changes = ambiguity_changes
|
||||
.values()
|
||||
|
||||
@@ -606,13 +606,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
|
||||
}
|
||||
|
||||
pub(super) async fn handle_sync_events(
|
||||
pub(super) async fn handle_ephemeral_events(
|
||||
&self,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
) {
|
||||
let mut state = self.state.write().await;
|
||||
state.handle_sync_events(events, ephemeral, &self.room_data_provider, &self.settings).await;
|
||||
state.handle_ephemeral_events(events, &self.room_data_provider).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -127,37 +127,28 @@ impl TimelineInnerState {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) async fn handle_sync_events<P: RoomDataProvider>(
|
||||
pub(super) async fn handle_ephemeral_events<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
room_data_provider: &P,
|
||||
settings: &TimelineInnerSettings,
|
||||
) {
|
||||
if events.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut txn = self.transaction();
|
||||
|
||||
txn.add_events_at(
|
||||
events,
|
||||
TimelineEnd::Back,
|
||||
RemoteEventOrigin::Sync,
|
||||
room_data_provider,
|
||||
settings,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !ephemeral.is_empty() {
|
||||
trace!("Handling ephemeral room events");
|
||||
let own_user_id = room_data_provider.own_user_id();
|
||||
for raw_event in ephemeral {
|
||||
match raw_event.deserialize() {
|
||||
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => {
|
||||
txn.handle_explicit_read_receipts(ev.content, own_user_id);
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
let event_type = raw_event.get_field::<String>("type").ok().flatten();
|
||||
warn!(event_type, "Failed to deserialize ephemeral event: {e}");
|
||||
}
|
||||
trace!("Handling ephemeral room events");
|
||||
let own_user_id = room_data_provider.own_user_id();
|
||||
for raw_event in events {
|
||||
match raw_event.deserialize() {
|
||||
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => {
|
||||
txn.handle_explicit_read_receipts(ev.content, own_user_id);
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
let event_type = raw_event.get_field::<String>("type").ok().flatten();
|
||||
warn!(event_type, "Failed to deserialize ephemeral event: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -515,7 +515,7 @@ impl RoomEventCacheInner {
|
||||
handled_read_marker = true;
|
||||
|
||||
// Propagate to observers. (We ignore the error if there aren't any.)
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::UpdateReadMarker {
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
|
||||
event_id: ev.content.event_id,
|
||||
});
|
||||
}
|
||||
@@ -549,7 +549,7 @@ impl RoomEventCacheInner {
|
||||
async fn handle_timeline(
|
||||
&self,
|
||||
timeline: Timeline,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
|
||||
) -> Result<()> {
|
||||
if timeline.limited {
|
||||
@@ -561,7 +561,7 @@ impl RoomEventCacheInner {
|
||||
self.replace_all_events_by(
|
||||
timeline.events,
|
||||
timeline.prev_batch,
|
||||
ephemeral,
|
||||
ephemeral_events,
|
||||
ambiguity_changes,
|
||||
)
|
||||
.await?;
|
||||
@@ -572,7 +572,7 @@ impl RoomEventCacheInner {
|
||||
self.append_new_events(
|
||||
timeline.events,
|
||||
timeline.prev_batch,
|
||||
ephemeral,
|
||||
ephemeral_events,
|
||||
ambiguity_changes,
|
||||
)
|
||||
.await?;
|
||||
@@ -590,9 +590,9 @@ impl RoomEventCacheInner {
|
||||
/// storage, notifying observers.
|
||||
async fn replace_all_events_by(
|
||||
&self,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
sync_timeline_events: Vec<SyncTimelineEvent>,
|
||||
prev_batch: Option<String>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
|
||||
) -> Result<()> {
|
||||
// Acquire the lock.
|
||||
@@ -607,9 +607,9 @@ impl RoomEventCacheInner {
|
||||
// Push the new events.
|
||||
self.append_events_locked_impl(
|
||||
room_events,
|
||||
events,
|
||||
sync_timeline_events,
|
||||
prev_batch,
|
||||
ephemeral,
|
||||
ephemeral_events,
|
||||
ambiguity_changes,
|
||||
)
|
||||
.await
|
||||
@@ -619,16 +619,16 @@ impl RoomEventCacheInner {
|
||||
/// observers.
|
||||
async fn append_new_events(
|
||||
&self,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
sync_timeline_events: Vec<SyncTimelineEvent>,
|
||||
prev_batch: Option<String>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
|
||||
) -> Result<()> {
|
||||
self.append_events_locked_impl(
|
||||
self.events.write().await,
|
||||
events,
|
||||
sync_timeline_events,
|
||||
prev_batch,
|
||||
ephemeral,
|
||||
ephemeral_events,
|
||||
ambiguity_changes,
|
||||
)
|
||||
.await
|
||||
@@ -642,14 +642,14 @@ impl RoomEventCacheInner {
|
||||
async fn append_events_locked_impl(
|
||||
&self,
|
||||
mut room_events: RwLockWriteGuard<'_, RoomEvents>,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
sync_timeline_events: Vec<SyncTimelineEvent>,
|
||||
prev_batch: Option<String>,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
|
||||
) -> Result<()> {
|
||||
if events.is_empty()
|
||||
if sync_timeline_events.is_empty()
|
||||
&& prev_batch.is_none()
|
||||
&& ephemeral.is_empty()
|
||||
&& ephemeral_events.is_empty()
|
||||
&& ambiguity_changes.is_empty()
|
||||
{
|
||||
return Ok(());
|
||||
@@ -662,7 +662,7 @@ impl RoomEventCacheInner {
|
||||
room_events.push_gap(Gap { prev_token: prev_token.clone() });
|
||||
}
|
||||
|
||||
room_events.push_events(events.clone().into_iter());
|
||||
room_events.push_events(sync_timeline_events.clone().into_iter());
|
||||
}
|
||||
|
||||
// Now that all events have been added, we can trigger the
|
||||
@@ -671,8 +671,25 @@ impl RoomEventCacheInner {
|
||||
self.pagination.token_notifier.notify_one();
|
||||
}
|
||||
|
||||
let _ =
|
||||
self.sender.send(RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes });
|
||||
// The order of `RoomEventCacheUpdate`s is **really** important here.
|
||||
{
|
||||
if !sync_timeline_events.is_empty() {
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents {
|
||||
events: sync_timeline_events,
|
||||
origin: EventsOrigin::Sync,
|
||||
});
|
||||
}
|
||||
|
||||
if !ephemeral_events.is_empty() {
|
||||
let _ = self
|
||||
.sender
|
||||
.send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
|
||||
}
|
||||
|
||||
if !ambiguity_changes.is_empty() {
|
||||
let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -702,24 +719,42 @@ pub enum RoomEventCacheUpdate {
|
||||
Clear,
|
||||
|
||||
/// The fully read marker has moved to a different event.
|
||||
UpdateReadMarker {
|
||||
MoveReadMarkerTo {
|
||||
/// Event at which the read marker is now pointing.
|
||||
event_id: OwnedEventId,
|
||||
},
|
||||
|
||||
/// The room has new events.
|
||||
Append {
|
||||
/// All the new events that have been added to the room's timeline.
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
/// XXX: this is temporary, until read receipts are handled in the event
|
||||
/// cache
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
/// The members have changed.
|
||||
UpdateMembers {
|
||||
/// Collection of ambiguity changes that room member events trigger.
|
||||
///
|
||||
/// This is a map of event ID of the `m.room.member` event to the
|
||||
/// details of the ambiguity change.
|
||||
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
|
||||
},
|
||||
|
||||
/// The room has received new timeline events.
|
||||
AddTimelineEvents {
|
||||
/// All the new events that have been added to the room's timeline.
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
|
||||
/// Where the events are coming from.
|
||||
origin: EventsOrigin,
|
||||
},
|
||||
|
||||
/// The room has received new ephemeral events.
|
||||
AddEphemeralEvents {
|
||||
/// XXX: this is temporary, until read receipts are handled in the event
|
||||
/// cache
|
||||
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Indicate where events are coming from.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum EventsOrigin {
|
||||
/// Events are coming from a sync.
|
||||
Sync,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -790,7 +825,7 @@ mod tests {
|
||||
// … there's only one read marker update.
|
||||
assert_matches!(
|
||||
stream.recv().await.unwrap(),
|
||||
RoomEventCacheUpdate::UpdateReadMarker { .. }
|
||||
RoomEventCacheUpdate::MoveReadMarkerTo { .. }
|
||||
);
|
||||
|
||||
assert!(stream.recv().now_or_never().is_none());
|
||||
|
||||
@@ -104,7 +104,7 @@ async fn test_add_initial_events() {
|
||||
.expect("should've received a room event cache update");
|
||||
|
||||
// Which contains the event that was sent beforehand.
|
||||
assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
|
||||
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "bonjour monde");
|
||||
|
||||
@@ -130,7 +130,7 @@ async fn test_add_initial_events() {
|
||||
.await
|
||||
.expect("timeout after receiving a sync update")
|
||||
.expect("should've received a room event cache update");
|
||||
assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
|
||||
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "new choice!");
|
||||
|
||||
@@ -233,7 +233,7 @@ async fn test_ignored_unignored() {
|
||||
.expect("timeout after receiving a sync update")
|
||||
.expect("should've received a room event cache update");
|
||||
|
||||
assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
|
||||
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_event_matches_msg(&events[0], "i don't like this dexter");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user