From 664a64b3f35fc6c50ded44a7e86b22a14edf6568 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 May 2024 11:26:38 +0200 Subject: [PATCH] feat(sdk): Improve `RoomEventCacheUpdate` (#3471) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore(sdk): Rename `RoomEventCacheUpdate::UpdateReadMarker`. This patch renames `RoomEventCacheUpdate::UpdateReadMarker` to `ReadMarker`. The `Update` prefix is already part of the enum name. * feat(sdk): Rename `RoomEventCacheUpdate::ReadMarker::event_id` to `move_to`. This patch renames `RoomEventCacheUpdate::ReadMarker::event_id` to `ReadMarker::move_to` as I feel like it conveys a better semantics. * feat(sdk): Extract `RoomEventCacheUpdate::Append::ambiguity_changes`. This patch extracts `RoomEventCacheUpdate::Append::ambiguity_changes` into a new variant `RoomEventCacheUpdate::Members { ambiguity_changes } `. This patch also creates a new private `RoomEventCacheInner::send_grouped_updates_for_events` method to ensure the updates are sent in a particular order. * feat(sdk): Rename `RoomEventCacheUpdate::Append`. This patch renames `RoomEventCacheUpdate::Append` to `SyncEvents`. The field `events` is renamed `timeline`. This patch also renames some variables to clarify the code and to match the renamings in `RoomEventCacheUpdate`. * feat(sdk): Rename `RoomEventCacheUpdate::ReadMarker` and `Members`. This patch renames `ReadMarker { move_to: … }` to `MoveReaderMarkerTo { event_id: … }`. This patch also renames `Members` to `UpdateMembers`. Finally, this patch renames some other variables to avoid clashes with terminology in `matrix_sdk_ui`. * feat(sdk): Split `RoomEventCacheUpdate::SyncEvents`. This patch splits `RoomEventCacheUpdate::SyncEvents` into 2 new variants: `AddTimelineEvents` and `AddEphemeralEvents`. This patch takes this opportunity to update `matrix_sdk_ui::timeline` a little bit too. `handle_sync_events` is renamed `handle_ephemeral_events`, and the `SyncTimelineEvent` argument is removed: it's possible to use `add_events_at` directly to handle the `SyncTimelineEvent`s. * fix(sdk): Do not send `RoomEventCacheUpdate` if values are empty. This patch prevents sending useless `RoomEventCacheUdpate` if their respective values are empty. * chore(ui): Update a log message. * Apply suggestions from code review Signed-off-by: Benjamin Bouvier --------- Signed-off-by: Benjamin Bouvier Co-authored-by: Benjamin Bouvier --- crates/matrix-sdk-ui/src/timeline/builder.rs | 36 ++++++-- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 7 +- .../matrix-sdk-ui/src/timeline/inner/state.rs | 43 ++++----- crates/matrix-sdk/src/event_cache/mod.rs | 91 +++++++++++++------ .../tests/integration/event_cache.rs | 6 +- 5 files changed, 114 insertions(+), 69 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index f5dc0caa2..96d9fec8a 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -15,7 +15,11 @@ use std::{collections::BTreeSet, sync::Arc}; use futures_util::{pin_mut, StreamExt}; -use matrix_sdk::{event_cache::RoomEventCacheUpdate, executor::spawn, Room}; +use matrix_sdk::{ + event_cache::{EventsOrigin, RoomEventCacheUpdate}, + executor::spawn, + Room, +}; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; use tokio::sync::{broadcast, mpsc}; use tracing::{info, info_span, trace, warn, Instrument, Span}; @@ -27,7 +31,10 @@ use super::{ queue::send_queued_messages, Error, Timeline, TimelineDropHandle, TimelineFocus, }; -use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager}; +use crate::{ + timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd}, + unable_to_decrypt_hook::UtdHookManager, +}; /// Builder that allows creating and configuring various parts of a /// [`Timeline`]. @@ -203,7 +210,7 @@ impl TimelineBuilder { }; match update { - RoomEventCacheUpdate::UpdateReadMarker { event_id } => { + RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { trace!(target = %event_id, "Handling fully read marker."); inner.handle_fully_read_marker(event_id).await; } @@ -220,13 +227,26 @@ impl TimelineBuilder { inner.clear().await; } - RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes } => { - trace!("Received new events from sync."); + RoomEventCacheUpdate::AddTimelineEvents { events, origin } => { + trace!("Received new timeline events."); - // TODO: (bnjbvr) ephemeral should be handled by the event cache, and - // we should replace this with a simple `add_events_at`. - inner.handle_sync_events(events, ephemeral).await; + inner.add_events_at( + events, + TimelineEnd::Back, + match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + } + ).await; + } + RoomEventCacheUpdate::AddEphemeralEvents { events } => { + trace!("Received new ephemeral events from sync."); + + // TODO: (bnjbvr) ephemeral should be handled by the event cache. + inner.handle_ephemeral_events(events).await; + } + + RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => { if !ambiguity_changes.is_empty() { let member_ambiguity_changes = ambiguity_changes .values() diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 4323ad159..108e12779 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -606,13 +606,12 @@ impl TimelineInner

{ self.state.write().await.handle_fully_read_marker(fully_read_event_id); } - pub(super) async fn handle_sync_events( + pub(super) async fn handle_ephemeral_events( &self, - events: Vec, - ephemeral: Vec>, + events: Vec>, ) { let mut state = self.state.write().await; - state.handle_sync_events(events, ephemeral, &self.room_data_provider, &self.settings).await; + state.handle_ephemeral_events(events, &self.room_data_provider).await; } #[cfg(test)] diff --git a/crates/matrix-sdk-ui/src/timeline/inner/state.rs b/crates/matrix-sdk-ui/src/timeline/inner/state.rs index 1cdb91d49..e54bf4d74 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/state.rs @@ -127,37 +127,28 @@ impl TimelineInnerState { } #[instrument(skip_all)] - pub(super) async fn handle_sync_events( + pub(super) async fn handle_ephemeral_events( &mut self, - events: Vec, - ephemeral: Vec>, + events: Vec>, room_data_provider: &P, - settings: &TimelineInnerSettings, ) { + if events.is_empty() { + return; + } + let mut txn = self.transaction(); - txn.add_events_at( - events, - TimelineEnd::Back, - RemoteEventOrigin::Sync, - room_data_provider, - settings, - ) - .await; - - if !ephemeral.is_empty() { - trace!("Handling ephemeral room events"); - let own_user_id = room_data_provider.own_user_id(); - for raw_event in ephemeral { - match raw_event.deserialize() { - Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => { - txn.handle_explicit_read_receipts(ev.content, own_user_id); - } - Ok(_) => {} - Err(e) => { - let event_type = raw_event.get_field::("type").ok().flatten(); - warn!(event_type, "Failed to deserialize ephemeral event: {e}"); - } + trace!("Handling ephemeral room events"); + let own_user_id = room_data_provider.own_user_id(); + for raw_event in events { + match raw_event.deserialize() { + Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => { + txn.handle_explicit_read_receipts(ev.content, own_user_id); + } + Ok(_) => {} + Err(e) => { + let event_type = raw_event.get_field::("type").ok().flatten(); + warn!(event_type, "Failed to deserialize ephemeral event: {e}"); } } } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index c4226a2bc..b4349737e 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -515,7 +515,7 @@ impl RoomEventCacheInner { handled_read_marker = true; // Propagate to observers. (We ignore the error if there aren't any.) - let _ = self.sender.send(RoomEventCacheUpdate::UpdateReadMarker { + let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id, }); } @@ -549,7 +549,7 @@ impl RoomEventCacheInner { async fn handle_timeline( &self, timeline: Timeline, - ephemeral: Vec>, + ephemeral_events: Vec>, ambiguity_changes: BTreeMap, ) -> Result<()> { if timeline.limited { @@ -561,7 +561,7 @@ impl RoomEventCacheInner { self.replace_all_events_by( timeline.events, timeline.prev_batch, - ephemeral, + ephemeral_events, ambiguity_changes, ) .await?; @@ -572,7 +572,7 @@ impl RoomEventCacheInner { self.append_new_events( timeline.events, timeline.prev_batch, - ephemeral, + ephemeral_events, ambiguity_changes, ) .await?; @@ -590,9 +590,9 @@ impl RoomEventCacheInner { /// storage, notifying observers. async fn replace_all_events_by( &self, - events: Vec, + sync_timeline_events: Vec, prev_batch: Option, - ephemeral: Vec>, + ephemeral_events: Vec>, ambiguity_changes: BTreeMap, ) -> Result<()> { // Acquire the lock. @@ -607,9 +607,9 @@ impl RoomEventCacheInner { // Push the new events. self.append_events_locked_impl( room_events, - events, + sync_timeline_events, prev_batch, - ephemeral, + ephemeral_events, ambiguity_changes, ) .await @@ -619,16 +619,16 @@ impl RoomEventCacheInner { /// observers. async fn append_new_events( &self, - events: Vec, + sync_timeline_events: Vec, prev_batch: Option, - ephemeral: Vec>, + ephemeral_events: Vec>, ambiguity_changes: BTreeMap, ) -> Result<()> { self.append_events_locked_impl( self.events.write().await, - events, + sync_timeline_events, prev_batch, - ephemeral, + ephemeral_events, ambiguity_changes, ) .await @@ -642,14 +642,14 @@ impl RoomEventCacheInner { async fn append_events_locked_impl( &self, mut room_events: RwLockWriteGuard<'_, RoomEvents>, - events: Vec, + sync_timeline_events: Vec, prev_batch: Option, - ephemeral: Vec>, + ephemeral_events: Vec>, ambiguity_changes: BTreeMap, ) -> Result<()> { - if events.is_empty() + if sync_timeline_events.is_empty() && prev_batch.is_none() - && ephemeral.is_empty() + && ephemeral_events.is_empty() && ambiguity_changes.is_empty() { return Ok(()); @@ -662,7 +662,7 @@ impl RoomEventCacheInner { room_events.push_gap(Gap { prev_token: prev_token.clone() }); } - room_events.push_events(events.clone().into_iter()); + room_events.push_events(sync_timeline_events.clone().into_iter()); } // Now that all events have been added, we can trigger the @@ -671,8 +671,25 @@ impl RoomEventCacheInner { self.pagination.token_notifier.notify_one(); } - let _ = - self.sender.send(RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes }); + // The order of `RoomEventCacheUpdate`s is **really** important here. + { + if !sync_timeline_events.is_empty() { + let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents { + events: sync_timeline_events, + origin: EventsOrigin::Sync, + }); + } + + if !ephemeral_events.is_empty() { + let _ = self + .sender + .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events }); + } + + if !ambiguity_changes.is_empty() { + let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes }); + } + } Ok(()) } @@ -702,24 +719,42 @@ pub enum RoomEventCacheUpdate { Clear, /// The fully read marker has moved to a different event. - UpdateReadMarker { + MoveReadMarkerTo { /// Event at which the read marker is now pointing. event_id: OwnedEventId, }, - /// The room has new events. - Append { - /// All the new events that have been added to the room's timeline. - events: Vec, - /// XXX: this is temporary, until read receipts are handled in the event - /// cache - ephemeral: Vec>, + /// The members have changed. + UpdateMembers { /// Collection of ambiguity changes that room member events trigger. /// /// This is a map of event ID of the `m.room.member` event to the /// details of the ambiguity change. ambiguity_changes: BTreeMap, }, + + /// The room has received new timeline events. + AddTimelineEvents { + /// All the new events that have been added to the room's timeline. + events: Vec, + + /// Where the events are coming from. + origin: EventsOrigin, + }, + + /// The room has received new ephemeral events. + AddEphemeralEvents { + /// XXX: this is temporary, until read receipts are handled in the event + /// cache + events: Vec>, + }, +} + +/// Indicate where events are coming from. +#[derive(Debug, Clone)] +pub enum EventsOrigin { + /// Events are coming from a sync. + Sync, } #[cfg(test)] @@ -790,7 +825,7 @@ mod tests { // … there's only one read marker update. assert_matches!( stream.recv().await.unwrap(), - RoomEventCacheUpdate::UpdateReadMarker { .. } + RoomEventCacheUpdate::MoveReadMarkerTo { .. } ); assert!(stream.recv().now_or_never().is_none()); diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 72029127f..5d6124f47 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -104,7 +104,7 @@ async fn test_add_initial_events() { .expect("should've received a room event cache update"); // Which contains the event that was sent beforehand. - assert_let!(RoomEventCacheUpdate::Append { events, .. } = update); + assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "bonjour monde"); @@ -130,7 +130,7 @@ async fn test_add_initial_events() { .await .expect("timeout after receiving a sync update") .expect("should've received a room event cache update"); - assert_let!(RoomEventCacheUpdate::Append { events, .. } = update); + assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "new choice!"); @@ -233,7 +233,7 @@ async fn test_ignored_unignored() { .expect("timeout after receiving a sync update") .expect("should've received a room event cache update"); - assert_let!(RoomEventCacheUpdate::Append { events, .. } = update); + assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "i don't like this dexter");