diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 25475a3fb..a123ad268 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -245,21 +245,6 @@ impl TimelineBuilder { .instrument(span) }); - let mut ignore_user_list_stream = client.subscribe_to_ignore_user_list_changes(); - let ignore_user_list_update_join_handle = spawn({ - let inner = inner.clone(); - - let span = info_span!(parent: Span::none(), "ignore_user_list_update_handler", room_id = ?room.room_id()); - span.follows_from(Span::current()); - - async move { - while ignore_user_list_stream.next().await.is_some() { - inner.clear().await; - } - } - .instrument(span) - }); - // Not using room.add_event_handler here because RoomKey events are // to-device events that are not received in the context of a room. @@ -321,7 +306,6 @@ impl TimelineBuilder { client, event_handler_handles: handles, room_update_join_handle, - ignore_user_list_update_join_handle, room_key_from_backups_join_handle, _event_cache_drop_handle: event_cache_drop, }), diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index c1599388c..2f0eb016b 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -820,7 +820,6 @@ struct TimelineDropHandle { client: Client, event_handler_handles: Vec, room_update_join_handle: JoinHandle<()>, - ignore_user_list_update_join_handle: JoinHandle<()>, room_key_from_backups_join_handle: JoinHandle<()>, _event_cache_drop_handle: Arc, } @@ -831,7 +830,6 @@ impl Drop for TimelineDropHandle { self.client.remove_event_handler(handle); } self.room_update_join_handle.abort(); - self.ignore_user_list_update_join_handle.abort(); self.room_key_from_backups_join_handle.abort(); } } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 0dbce1d0e..33c806a88 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -48,6 +48,7 @@ use std::{ time::Duration, }; +use eyeball::Subscriber; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, SyncTimelineEvent, TimelineEvent}, sync::{JoinedRoomUpdate, LeftRoomUpdate, RoomUpdates, Timeline}, @@ -66,7 +67,7 @@ use tokio::{ }, time::timeout, }; -use tracing::{error, instrument, trace, warn}; +use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span}; use self::{ linked_chunk::ChunkContent, @@ -118,6 +119,9 @@ pub type Result = std::result::Result; pub struct EventCacheDropHandles { /// Task that listens to room updates. listen_updates_task: JoinHandle<()>, + + /// Task that listens to updates to the user's ignored list. + ignore_user_list_update_task: JoinHandle<()>, } impl Debug for EventCacheDropHandles { @@ -129,6 +133,7 @@ impl Debug for EventCacheDropHandles { impl Drop for EventCacheDropHandles { fn drop(&mut self) { self.listen_updates_task.abort(); + self.ignore_user_list_update_task.abort(); } } @@ -172,16 +177,39 @@ impl EventCache { let _ = self.inner.drop_handles.get_or_init(|| { // Spawn the task that will listen to all the room updates at once. - let room_updates_feed = client.subscribe_to_all_room_updates(); - let listen_updates_task = - spawn(Self::listen_task(self.inner.clone(), room_updates_feed)); + let listen_updates_task = spawn(Self::listen_task( + self.inner.clone(), + client.subscribe_to_all_room_updates(), + )); - Arc::new(EventCacheDropHandles { listen_updates_task }) + let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task( + self.inner.clone(), + client.subscribe_to_ignore_user_list_changes(), + )); + + Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task }) }); Ok(()) } + #[instrument(skip_all)] + async fn ignore_user_list_update_task( + inner: Arc, + mut ignore_user_list_stream: Subscriber>, + ) { + let span = info_span!(parent: Span::none(), "ignore_user_list_update_task"); + span.follows_from(Span::current()); + + async move { + while ignore_user_list_stream.next().await.is_some() { + inner.clear_all_rooms().await; + } + } + .instrument(span) + .await; + } + #[instrument(skip_all)] async fn listen_task( inner: Arc, @@ -209,20 +237,7 @@ impl EventCache { // no way to reconcile at the moment! // TODO: implement Smart Matching™, warn!(num_skipped, "Lagged behind room updates, clearing all rooms"); - - // Note: one must NOT clear the `by_room` map, because if something subscribed - // to a room update, they would never get any new update for that room, since - // re-creating the `RoomEventCache` would create a new unrelated sender. - - let rooms = inner.by_room.write().await; - - for room in rooms.values() { - // Notify all the observers that we've lost track of state. (We ignore the - // error if there aren't any.) - let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear); - // Clear all the events in memory. - room.inner.events.write().await.reset(); - } + inner.clear_all_rooms().await; } Err(RecvError::Closed) => { @@ -301,6 +316,22 @@ impl EventCacheInner { Ok(Client { inner: self.client.upgrade().ok_or(EventCacheError::ClientDropped)? }) } + /// Clears all the room's data. + async fn clear_all_rooms(&self) { + // Note: one must NOT clear the `by_room` map, because if something subscribed + // to a room update, they would never get any new update for that room, since + // re-creating the `RoomEventCache` would create a new unrelated sender. + let rooms = self.by_room.write().await; + + for room in rooms.values() { + // Notify all the observers that we've lost track of state. (We ignore the + // error if there aren't any.) + let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear); + // Clear all the events in memory. + room.inner.events.write().await.reset(); + } + } + /// Handles a single set of room updates at once. #[instrument(skip(self, updates))] async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {