diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 5e5da27c7..17ad50e47 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -204,7 +204,7 @@ impl BaseClient { /// This method panics if it is called twice. pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> { debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login"); - self.store.set_session_meta(session_meta.clone(), self).await?; + self.store.set_session_meta(session_meta.clone(), &self.roominfo_update_sender).await?; #[cfg(feature = "e2e-encryption")] self.regenerate_olm().await?; @@ -1426,6 +1426,8 @@ impl BaseClient { .collect() } + /// Returns a receiver that gets events for each room info update. To watch + /// for new events, use `receiver.resubscribe()`. pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { &self.roominfo_update_receiver } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 0a579c97e..724bfcfae 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -654,7 +654,7 @@ impl Room { pub fn set_room_info(&self, room_info: RoomInfo) { self.inner.set(room_info); - // Ignore error if receiver is down + // Ignore error if no receiver exists let _ = self.roominfo_update_sender.send(self.room_id.clone()); } diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 1fab4f08f..149538e79 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -58,7 +58,7 @@ pub type BoxStream = Pin + Send>>; use crate::{ rooms::{RoomInfo, RoomState}, - BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, + MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, }; pub(crate) mod ambiguity_map; @@ -174,14 +174,14 @@ impl Store { pub async fn set_session_meta( &self, session_meta: SessionMeta, - client: &BaseClient, + roominfo_update_sender: &broadcast::Sender, ) -> Result<()> { for info in self.inner.get_room_infos().await? { let room = Room::restore( &session_meta.user_id, self.inner.clone(), info, - client.roominfo_update_sender.clone(), + roominfo_update_sender.clone(), ); self.rooms.write().unwrap().insert(room.room_id().to_owned(), room); } diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 67301f0e0..49ef310aa 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -444,6 +444,8 @@ impl Client { self.base_client().session_meta() } + /// Returns a receiver that gets events for each room info update. To watch + /// for new events, use `receiver.resubscribe()`. pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { self.base_client().roominfo_update_receiver() } diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 6042d699a..9967aadc1 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -623,6 +623,7 @@ impl wiremock::Respond for CustomResponder { #[tokio::test] async fn test_delayed_decryption_latest_event() -> Result<()> { let server = MockServer::start().await; + // Setup mockserver that drops to-device messages if DROP_TODEVICE is true server .register(Mock::given(AnyMatcher).respond_with(CustomResponder::new(drop_todevice))) .await; @@ -696,6 +697,7 @@ async fn test_delayed_decryption_latest_event() -> Result<()> { let alice_room = alice.get_room(alice_room.room_id()).unwrap(); let bob_room = bob.get_room(alice_room.room_id()).unwrap(); bob_room.join().await.unwrap(); + // Send a message, but the keys won't arrive bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; sleep(Duration::from_secs(1)).await; @@ -711,13 +713,22 @@ async fn test_delayed_decryption_latest_event() -> Result<()> { .entries_with_dynamic_adapters(10, alice.roominfo_update_receiver()); entries.set_filter(Box::new(new_filter_all(vec![]))); pin_mut!(stream); + + // Stream only has the initial Reset entry timeout(Duration::from_millis(100), stream.next()).await.unwrap(); assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + + // Latest event is not set yet assert!(matches!(alice_room.latest_event(), None)); + // Now we allow the key to come through *DROP_TODEVICE.lock().unwrap() = false; sleep(Duration::from_secs(1)).await; + + // Latest event is set now alice_room.latest_event().unwrap(); + + // The stream has a single update timeout(Duration::from_millis(100), stream.next()).await.unwrap(); assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());