From d2b02ec2e8b523be8310a571b2aa2d05bdd1e451 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 15 Jan 2024 12:29:36 +0100 Subject: [PATCH] sliding_sync: Trigger room list update when room info changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes https://github.com/element-hq/element-x-ios/issues/1847 Signed-off-by: Timo Kösters --- bindings/matrix-sdk-ffi/src/room_list.rs | 5 ++- crates/matrix-sdk-base/src/client.rs | 24 +++++++---- crates/matrix-sdk-base/src/rooms/normal.rs | 17 ++++++-- crates/matrix-sdk-base/src/sliding_sync.rs | 5 ++- crates/matrix-sdk-base/src/store/mod.rs | 32 ++++++++++++--- .../src/room_list_service/room_list.rs | 41 +++++++++++++++++-- .../tests/integration/room_list_service.rs | 3 +- crates/matrix-sdk/src/client/mod.rs | 15 +++++++ crates/matrix-sdk/src/sliding_sync/mod.rs | 4 ++ 9 files changed, 123 insertions(+), 23 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room_list.rs b/bindings/matrix-sdk-ffi/src/room_list.rs index 1911a924a..80379233c 100644 --- a/bindings/matrix-sdk-ffi/src/room_list.rs +++ b/bindings/matrix-sdk-ffi/src/room_list.rs @@ -213,7 +213,10 @@ impl RoomList { listener: Box, ) -> RoomListEntriesWithDynamicAdaptersResult { let (entries_stream, dynamic_entries_controller) = - self.inner.entries_with_dynamic_adapters(page_size.try_into().unwrap()); + self.inner.entries_with_dynamic_adapters( + page_size.try_into().unwrap(), + self.room_list_service.inner.client().clone(), + ); RoomListEntriesWithDynamicAdaptersResult { controller: Arc::new(RoomListDynamicEntriesController::new( diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index c833a475a..66869426b 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -16,6 +16,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, fmt, iter, + sync::RwLock as StdRwLock, }; #[cfg(feature = "e2e-encryption")] use std::{ops::Deref, sync::Arc}; @@ -50,6 +51,7 @@ use ruma::{ serde::Raw, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId, }; +use tokio::sync::broadcast; use tokio::sync::Mutex; #[cfg(feature = "e2e-encryption")] use tokio::sync::{RwLock, RwLockReadGuard}; @@ -94,6 +96,7 @@ pub struct BaseClient { olm_machine: Arc>>, /// Observable of when a user is ignored/unignored. pub(crate) ignore_user_list_changes: SharedObservable<()>, + pub(crate) roominfo_update_sender: Arc>>>, } #[cfg(not(tarpaulin_include))] @@ -126,9 +129,16 @@ impl BaseClient { #[cfg(feature = "e2e-encryption")] olm_machine: Default::default(), ignore_user_list_changes: Default::default(), + roominfo_update_sender: Arc::new(StdRwLock::new(None)), } } + /// Replaces the sender that triggers updates when room info changes. This + /// should only be used once at initialization. + pub fn set_roominfo_update_sender(&self, sender: broadcast::Sender) { + *self.roominfo_update_sender.write().unwrap() = Some(sender); + } + /// Clones the current base client to use the same crypto store but a /// different, in-memory store config, and resets transient state. pub fn clone_with_in_memory_state_store(&self) -> Self { @@ -162,7 +172,7 @@ impl BaseClient { /// Lookup the Room for the given RoomId, or create one, if it didn't exist /// yet in the store pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room { - self.store.get_or_create_room(room_id, room_state) + self.store.get_or_create_room(room_id, room_state, &self) } /// Get all the rooms this client knows about. @@ -195,7 +205,7 @@ impl BaseClient { /// This method panics if it is called twice. pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> { debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login"); - self.store.set_session_meta(session_meta.clone()).await?; + self.store.set_session_meta(session_meta.clone(), &self).await?; #[cfg(feature = "e2e-encryption")] self.regenerate_olm().await?; @@ -726,7 +736,7 @@ impl BaseClient { /// /// Update the internal and cached state accordingly. Return the final Room. pub async fn room_joined(&self, room_id: &RoomId) -> Result { - let room = self.store.get_or_create_room(room_id, RoomState::Joined); + let room = self.store.get_or_create_room(room_id, RoomState::Joined, &self); if room.state() != RoomState::Joined { let _sync_lock = self.sync_lock().lock().await; @@ -747,7 +757,7 @@ impl BaseClient { /// /// Update the internal and cached state accordingly. pub async fn room_left(&self, room_id: &RoomId) -> Result<()> { - let room = self.store.get_or_create_room(room_id, RoomState::Left); + let room = self.store.get_or_create_room(room_id, RoomState::Left, &self); if room.state() != RoomState::Left { let _sync_lock = self.sync_lock().lock().await; @@ -817,7 +827,7 @@ impl BaseClient { let mut notifications = Default::default(); for (room_id, new_info) in response.rooms.join { - let room = self.store.get_or_create_room(&room_id, RoomState::Joined); + let room = self.store.get_or_create_room(&room_id, RoomState::Joined, &self); let mut room_info = room.clone_info(); room_info.mark_as_joined(); @@ -925,7 +935,7 @@ impl BaseClient { } for (room_id, new_info) in response.rooms.leave { - let room = self.store.get_or_create_room(&room_id, RoomState::Left); + let room = self.store.get_or_create_room(&room_id, RoomState::Left, &self); let mut room_info = room.clone_info(); room_info.mark_as_left(); room_info.mark_state_partially_synced(); @@ -979,7 +989,7 @@ impl BaseClient { } for (room_id, new_info) in response.rooms.invite { - let room = self.store.get_or_create_room(&room_id, RoomState::Invited); + let room = self.store.get_or_create_room(&room_id, RoomState::Invited, &self); let mut room_info = room.clone_info(); room_info.mark_as_invited(); room_info.mark_state_fully_synced(); diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 8f5839ee2..d96146dee 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -54,6 +54,7 @@ use ruma::{ RoomId, RoomVersionId, UserId, }; use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; use tracing::{debug, field::debug, info, instrument, trace, warn}; use super::{ @@ -77,6 +78,7 @@ pub struct Room { room_id: OwnedRoomId, own_user_id: OwnedUserId, inner: SharedObservable, + roominfo_update_sender: Option>, store: Arc, /// The most recent few encrypted events. When the keys come through to @@ -144,15 +146,17 @@ impl Room { store: Arc, room_id: &RoomId, room_state: RoomState, + roominfo_update_sender: Option>, ) -> Self { let room_info = RoomInfo::new(room_id, room_state); - Self::restore(own_user_id, store, room_info) + Self::restore(own_user_id, store, room_info, roominfo_update_sender) } pub(crate) fn restore( own_user_id: &UserId, store: Arc, room_info: RoomInfo, + roominfo_update_sender: Option>, ) -> Self { Self { own_user_id: own_user_id.into(), @@ -163,6 +167,7 @@ impl Room { latest_encrypted_events: Arc::new(SyncRwLock::new(RingBuffer::new( Self::MAX_ENCRYPTED_EVENTS, ))), + roominfo_update_sender, } } @@ -644,10 +649,14 @@ impl Room { self.inner.get() } - /// Update the inner summary with the given RoomInfo, and notify - /// subscribers. + /// Update the summary with given RoomInfo. This also triggers an update for + /// the roominfo_update_recv. pub fn set_room_info(&self, room_info: RoomInfo) { self.inner.set(room_info); + if let Some(sender) = &self.roominfo_update_sender { + // Ignore error if receiver is down + let _ = sender.send(self.room_id.clone()); + } } /// Get the `RoomMember` with the given `user_id`. @@ -1615,7 +1624,7 @@ mod tests { let user_id = user_id!("@me:example.org"); let room_id = room_id!("!test:localhost"); - (store.clone(), Room::new(user_id, store, room_id, room_type)) + (store.clone(), Room::new(user_id, store, room_id, room_type, None)) } fn make_stripped_member_event(user_id: &UserId, name: &str) -> Raw { diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 8675d4cd2..5adb1e7db 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -460,7 +460,7 @@ impl BaseClient { room_id: &RoomId, ) -> (Room, RoomInfo, Option) { if let Some(invite_state) = &room_data.invite_state { - let room = store.get_or_create_room(room_id, RoomState::Invited); + let room = store.get_or_create_room(room_id, RoomState::Invited, &self); let mut room_info = room.clone_info(); // We don't actually know what events are inside invite_state. In theory, they @@ -482,7 +482,7 @@ impl BaseClient { Some(v3::InvitedRoom::from(v3::InviteState::from(invite_state.clone()))), ) } else { - let room = store.get_or_create_room(room_id, RoomState::Joined); + let room = store.get_or_create_room(room_id, RoomState::Joined, &self); let mut room_info = room.clone_info(); // We default to considering this room joined if it's not an invite. If it's @@ -1596,6 +1596,7 @@ mod tests { Arc::new(MemoryStore::new()), room_id!("!r:e.co"), RoomState::Joined, + None, ) } diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index b480d4881..ab2843abd 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -58,7 +58,7 @@ pub type BoxStream = Pin + Send>>; use crate::{ rooms::{RoomInfo, RoomState}, - MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, + BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, }; pub(crate) mod ambiguity_map; @@ -171,9 +171,18 @@ impl Store { /// inner `StateStore`. /// /// This method panics if it is called twice. - pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> { + pub async fn set_session_meta( + &self, + session_meta: SessionMeta, + client: &BaseClient, + ) -> Result<()> { for info in self.inner.get_room_infos().await? { - let room = Room::restore(&session_meta.user_id, self.inner.clone(), info); + let room = Room::restore( + &session_meta.user_id, + self.inner.clone(), + info, + client.roominfo_update_sender.read().unwrap().clone(), + ); self.rooms.write().unwrap().insert(room.room_id().to_owned(), room); } @@ -214,7 +223,12 @@ impl Store { /// Lookup the Room for the given RoomId, or create one, if it didn't exist /// yet in the store - pub fn get_or_create_room(&self, room_id: &RoomId, room_type: RoomState) -> Room { + pub fn get_or_create_room( + &self, + room_id: &RoomId, + room_type: RoomState, + client: &BaseClient, + ) -> Room { let user_id = &self.session_meta.get().expect("Creating room while not being logged in").user_id; @@ -222,7 +236,15 @@ impl Store { .write() .unwrap() .entry(room_id.to_owned()) - .or_insert_with(|| Room::new(user_id, self.inner.clone(), room_id, room_type)) + .or_insert_with(|| { + Room::new( + user_id, + self.inner.clone(), + room_id, + room_type, + client.roominfo_update_sender.read().unwrap().clone(), + ) + }) .clone() } } diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index 93d1e6763..febab7ac6 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -23,8 +23,9 @@ use eyeball_im_util::vector::VectorObserverExt; use futures_util::{pin_mut, stream, Stream, StreamExt as _}; use matrix_sdk::{ executor::{spawn, JoinHandle}, - RoomListEntry, SlidingSync, SlidingSyncList, + Client, RoomListEntry, SlidingSync, SlidingSyncList, }; +use tokio::select; use super::{filters::Filter, Error, State}; @@ -123,6 +124,7 @@ impl RoomList { pub fn entries_with_dynamic_adapters( &self, page_size: usize, + client: Client, ) -> (impl Stream>>, RoomListDynamicEntriesController) { let list = self.sliding_sync_list.clone(); @@ -142,8 +144,41 @@ impl RoomList { let stream = stream! { loop { let filter_fn = filter_fn_cell.take().await; - let (values, stream) = list - .room_list_stream() + let (raw_values, mut raw_stream) = list.room_list_stream(); + let mut raw_current_values = raw_values.clone(); + let client = client.clone(); + let raw_stream_with_recv = stream! { + loop { + let mut roominfo_update_recv = client.roominfo_update_recv(); + select! { + v = raw_stream.next() => { + if let Some(v) = v { + for change in &v { + change.clone().apply(&mut raw_current_values); + } + yield v; + } else { + break; + } + } + room_id = roominfo_update_recv.recv() => { + if let Ok(room_id) = room_id { + for (index, room) in raw_current_values.iter().enumerate() { + if let RoomListEntry::Filled(r) = room { + if r == &room_id { + let update = VectorDiff::Set { index, value: raw_current_values[index].clone() }; + yield vec![update]; + break; + } + } + } + } + } + } + } + }; + + let (values, stream) = (raw_values, raw_stream_with_recv) .filter(filter_fn) .dynamic_limit_with_initial_value(page_size, limit_stream.clone()); diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 276e54280..cf0bac604 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -1599,7 +1599,8 @@ async fn test_dynamic_entries_stream() -> Result<(), Error> { let all_rooms = room_list.all_rooms().await?; - let (dynamic_entries_stream, dynamic_entries) = all_rooms.entries_with_dynamic_adapters(5); + let (dynamic_entries_stream, dynamic_entries) = + all_rooms.entries_with_dynamic_adapters(5, client.clone()); pin_mut!(dynamic_entries_stream); sync_then_assert_request_and_fake_response! { diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index c6067abac..92ecabb9d 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -212,6 +212,10 @@ pub(crate) struct ClientLocks { /// outside the `OlmMachine`. #[cfg(feature = "e2e-encryption")] pub(crate) crypto_store_generation: Arc>>, + + /// Ensure that only one service is syncing at a time. Otherwise, they will + /// override each other's updates because they batch changes. + pub(crate) sync_service_lock: Mutex<()>, } pub(crate) struct ClientInner { @@ -241,6 +245,9 @@ pub(crate) struct ClientInner { /// Notification handlers. See `register_notification_handler`. notification_handlers: RwLock>, pub(crate) room_update_channels: StdMutex>>, + /// An update is sent every time the info of a room changes. This is used to + /// trigger updates for the ui. + roominfo_update_recv: broadcast::Receiver, /// Whether the client should update its homeserver URL with the discovery /// information present in the login response. respect_login_well_known: bool, @@ -276,6 +283,9 @@ impl ClientInner { respect_login_well_known: bool, #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings, ) -> Arc { + let (roominfo_update_sender, roominfo_update_recv) = broadcast::channel(100); + base_client.set_roominfo_update_sender(roominfo_update_sender.clone()); + let client = Self { homeserver: StdRwLock::new(homeserver), auth_ctx, @@ -291,6 +301,7 @@ impl ClientInner { event_handlers: Default::default(), notification_handlers: Default::default(), room_update_channels: Default::default(), + roominfo_update_recv, respect_login_well_known, sync_beat: event_listener::Event::new(), #[cfg(feature = "e2e-encryption")] @@ -844,6 +855,10 @@ impl Client { } } + pub fn roominfo_update_recv(&self) -> broadcast::Receiver { + self.inner.roominfo_update_recv.resubscribe() + } + pub(crate) async fn notification_handlers( &self, ) -> RwLockReadGuard<'_, Vec> { diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 9ffe486e3..67924cb78 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -693,7 +693,11 @@ impl SlidingSync { } // Handle the response. + // This will batch changes to rooms, so no other service should run at the same + // time + let lock = this.inner.client.locks().sync_service_lock.lock().await; let updates = this.handle_response(response, &mut position_guard).await?; + drop(lock); this.cache_to_storage(&position_guard).await?;