sliding_sync: More documentation for roominfo sender/receiver

Signed-off-by: Timo Kösters <timo@koesters.xyz>
This commit is contained in:
Timo Kösters
2024-01-23 16:04:56 +01:00
committed by Benjamin Bouvier
parent 79f97504f8
commit 91331bea51
5 changed files with 20 additions and 5 deletions

View File

@@ -204,7 +204,7 @@ impl BaseClient {
/// This method panics if it is called twice.
pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> {
debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
self.store.set_session_meta(session_meta.clone(), self).await?;
self.store.set_session_meta(session_meta.clone(), &self.roominfo_update_sender).await?;
#[cfg(feature = "e2e-encryption")]
self.regenerate_olm().await?;
@@ -1426,6 +1426,8 @@ impl BaseClient {
.collect()
}
/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<OwnedRoomId> {
&self.roominfo_update_receiver
}

View File

@@ -654,7 +654,7 @@ impl Room {
pub fn set_room_info(&self, room_info: RoomInfo) {
self.inner.set(room_info);
// Ignore error if receiver is down
// Ignore error if no receiver exists
let _ = self.roominfo_update_sender.send(self.room_id.clone());
}

View File

@@ -58,7 +58,7 @@ pub type BoxStream<T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send>>;
use crate::{
rooms::{RoomInfo, RoomState},
BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
pub(crate) mod ambiguity_map;
@@ -174,14 +174,14 @@ impl Store {
pub async fn set_session_meta(
&self,
session_meta: SessionMeta,
client: &BaseClient,
roominfo_update_sender: &broadcast::Sender<OwnedRoomId>,
) -> Result<()> {
for info in self.inner.get_room_infos().await? {
let room = Room::restore(
&session_meta.user_id,
self.inner.clone(),
info,
client.roominfo_update_sender.clone(),
roominfo_update_sender.clone(),
);
self.rooms.write().unwrap().insert(room.room_id().to_owned(), room);
}

View File

@@ -444,6 +444,8 @@ impl Client {
self.base_client().session_meta()
}
/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<OwnedRoomId> {
self.base_client().roominfo_update_receiver()
}

View File

@@ -623,6 +623,7 @@ impl wiremock::Respond for CustomResponder {
#[tokio::test]
async fn test_delayed_decryption_latest_event() -> Result<()> {
let server = MockServer::start().await;
// Setup mockserver that drops to-device messages if DROP_TODEVICE is true
server
.register(Mock::given(AnyMatcher).respond_with(CustomResponder::new(drop_todevice)))
.await;
@@ -696,6 +697,7 @@ async fn test_delayed_decryption_latest_event() -> Result<()> {
let alice_room = alice.get_room(alice_room.room_id()).unwrap();
let bob_room = bob.get_room(alice_room.room_id()).unwrap();
bob_room.join().await.unwrap();
// Send a message, but the keys won't arrive
bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?;
sleep(Duration::from_secs(1)).await;
@@ -711,13 +713,22 @@ async fn test_delayed_decryption_latest_event() -> Result<()> {
.entries_with_dynamic_adapters(10, alice.roominfo_update_receiver());
entries.set_filter(Box::new(new_filter_all(vec![])));
pin_mut!(stream);
// Stream only has the initial Reset entry
timeout(Duration::from_millis(100), stream.next()).await.unwrap();
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());
// Latest event is not set yet
assert!(matches!(alice_room.latest_event(), None));
// Now we allow the key to come through
*DROP_TODEVICE.lock().unwrap() = false;
sleep(Duration::from_secs(1)).await;
// Latest event is set now
alice_room.latest_event().unwrap();
// The stream has a single update
timeout(Duration::from_millis(100), stream.next()).await.unwrap();
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());