diff --git a/crates/matrix-sdk-base/src/read_receipts.rs b/crates/matrix-sdk-base/src/read_receipts.rs index 31e775d56..b0fcbcc8b 100644 --- a/crates/matrix-sdk-base/src/read_receipts.rs +++ b/crates/matrix-sdk-base/src/read_receipts.rs @@ -122,7 +122,6 @@ use std::{ num::NonZeroUsize, }; -use eyeball_im::Vector; use matrix_sdk_common::{deserialized_responses::TimelineEvent, ring_buffer::RingBuffer}; use ruma::{ events::{ @@ -294,10 +293,7 @@ struct ReceiptSelector { } impl ReceiptSelector { - fn new( - all_events: &Vector, - latest_active_receipt_event: Option<&EventId>, - ) -> Self { + fn new(all_events: &[TimelineEvent], latest_active_receipt_event: Option<&EventId>) -> Self { let event_id_to_pos = Self::create_sync_index(all_events.iter()); let best_pos = @@ -457,23 +453,22 @@ pub(crate) fn compute_unread_counts( user_id: &UserId, room_id: &RoomId, receipt_event: Option<&ReceiptEventContent>, - previous_events: Vector, + mut previous_events: Vec, new_events: &[TimelineEvent], read_receipts: &mut RoomReadReceipts, ) { - debug!(?read_receipts, "Starting."); + debug!(?read_receipts, "Starting"); let all_events = if events_intersects(previous_events.iter(), new_events) { // The previous and new events sets can intersect, for instance if we restored // previous events from the disk cache, or a timeline was limited. This // means the old events will be cleared, because we don't reconcile - // timelines in sliding sync (yet). As a result, forget + // timelines in the event cache (yet). As a result, forget // about the previous events. - Vector::from_iter(new_events.iter().cloned()) + new_events.to_owned() } else { - let mut all_events = previous_events; - all_events.extend(new_events.iter().cloned()); - all_events + previous_events.extend(new_events.iter().cloned()); + previous_events }; let new_receipt = { @@ -481,6 +476,7 @@ pub(crate) fn compute_unread_counts( &all_events, read_receipts.latest_active.as_ref().map(|receipt| &*receipt.event_id), ); + selector.try_match_implicit(user_id, new_events); selector.handle_pending_receipts(&mut read_receipts.pending); if let Some(receipt_event) = receipt_event { @@ -622,7 +618,6 @@ fn marks_as_unread(event: &Raw, user_id: &UserId) -> bool mod tests { use std::{num::NonZeroUsize, ops::Not as _}; - use eyeball_im::Vector; use matrix_sdk_common::{deserialized_responses::TimelineEvent, ring_buffer::RingBuffer}; use matrix_sdk_test::event_factory::EventFactory; use ruma::{ @@ -915,7 +910,7 @@ mod tests { let room_id = room_id!("!room:example.org"); let receipt_event_id = event_id!("$1"); - let mut previous_events = Vector::new(); + let mut previous_events = Vec::new(); let f = EventFactory::new(); let ev1 = f.text_msg("A").sender(other_user_id).event_id(receipt_event_id).into_event(); @@ -940,8 +935,8 @@ mod tests { assert_eq!(read_receipts.num_unread, 1); // Receive the same receipt event, with a new sync event. - previous_events.push_back(ev1); - previous_events.push_back(ev2); + previous_events.push(ev1); + previous_events.push(ev2); let new_event = f.text_msg("A").sender(other_user_id).event_id(event_id!("$3")).into_event(); @@ -958,7 +953,7 @@ mod tests { assert_eq!(read_receipts.num_unread, 2); } - fn make_test_events(user_id: &UserId) -> Vector { + fn make_test_events(user_id: &UserId) -> Vec { let f = EventFactory::new().sender(user_id); let ev1 = f.text_msg("With the lights out, it's less dangerous").event_id(event_id!("$1")); let ev2 = f.text_msg("Here we are now, entertain us").event_id(event_id!("$2")); @@ -976,7 +971,7 @@ mod tests { let room_id = room_id!("!room:example.org"); let all_events = make_test_events(user_id!("@bob:example.org")); - let head_events: Vector<_> = all_events.iter().take(2).cloned().collect(); + let head_events: Vec<_> = all_events.iter().take(2).cloned().collect(); let tail_events: Vec<_> = all_events.iter().skip(2).cloned().collect(); // Given a receipt event marking events 1-3 as read using a combination of @@ -1165,7 +1160,7 @@ mod tests { { // No initial active receipt, so the first receipt we get *will* win. - let mut selector = ReceiptSelector::new(&vec![].into(), None); + let mut selector = ReceiptSelector::new(&[], None); selector.try_select_later(event_id!("$1"), 0); let best_receipt = selector.select(); assert_eq!(best_receipt.unwrap().event_id, event_id!("$1")); @@ -1203,11 +1198,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // No pending receipt => no better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); selector.handle_pending_receipts(&mut pending); @@ -1221,7 +1216,7 @@ mod tests { { // No pending receipt, and there was an active last receipt => no better // receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); selector.handle_pending_receipts(&mut pending); @@ -1239,11 +1234,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // A pending receipt for an event that is still missing => no better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$3")); @@ -1257,7 +1252,7 @@ mod tests { { // Ditto but there was an active receipt => no better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$3")); @@ -1276,11 +1271,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // A pending receipt for an event that is present => better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$2")); @@ -1296,7 +1291,7 @@ mod tests { { // Mixed found and not found receipt => better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$1")); @@ -1318,12 +1313,12 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // Same, and there was an initial receipt that was less good than the one we // selected => better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$2")); @@ -1339,7 +1334,7 @@ mod tests { { // Same, but the previous receipt was better => no better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$2"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$1")); @@ -1484,26 +1479,26 @@ mod tests { // When the selector sees only other users' events, let mut selector = ReceiptSelector::new(&events, None); // And I search for my implicit read receipt, - selector.try_match_implicit(&myself, &events.iter().cloned().collect::>()); + selector.try_match_implicit(&myself, &events); // Then I don't find any. let best_receipt = selector.select(); assert!(best_receipt.is_none()); // Now, if there are events I've written too... let f = EventFactory::new(); - events.push_back( + events.push( f.text_msg("A mulatto, an albino") .sender(&myself) .event_id(event_id!("$6")) .into_event(), ); - events.push_back( + events.push( f.text_msg("A mosquito, my libido").sender(bob).event_id(event_id!("$7")).into_event(), ); let mut selector = ReceiptSelector::new(&events, None); // And I search for my implicit read receipt, - selector.try_match_implicit(&myself, &events.iter().cloned().collect::>()); + selector.try_match_implicit(&myself, &events); // Then my last sent event counts as a read receipt. let best_receipt = selector.select(); assert_eq!(best_receipt.unwrap().event_id, event_id!("$6")); @@ -1520,7 +1515,7 @@ mod tests { // One by me, let f = EventFactory::new(); - events.push_back( + events.push( f.text_msg("A mulatto, an albino") .sender(user_id) .event_id(event_id!("$6")) @@ -1528,10 +1523,10 @@ mod tests { ); // And others by Bob, - events.push_back( + events.push( f.text_msg("A mosquito, my libido").sender(bob).event_id(event_id!("$7")).into_event(), ); - events.push_back( + events.push( f.text_msg("A denial, a denial").sender(bob).event_id(event_id!("$8")).into_event(), ); @@ -1551,7 +1546,7 @@ mod tests { user_id, room_id, Some(&receipt_event), - Vector::new(), + Vec::new(), &events, &mut read_receipts, ); diff --git a/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs index 8d13f959a..fbc182e6f 100644 --- a/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs +++ b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs @@ -14,7 +14,12 @@ use std::collections::BTreeMap; -use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId}; +use ruma::{ + api::client::sync::sync_events::v5 as http, + events::{receipt::ReceiptEventContent, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent}, + serde::Raw, + OwnedRoomId, RoomId, +}; use super::super::super::{ account_data::for_room as account_data_for_room, ephemeral_events::dispatch_receipt, Context, @@ -25,22 +30,13 @@ use crate::{ RoomState, }; -pub fn dispatch_ephemeral_events( - context: &mut Context, - receipts: &http::response::Receipts, +/// Dispatch the ephemeral events in the `extensions.typing` part of the +/// response. +pub fn dispatch_typing_ephemeral_events( + _context: &mut Context, typing: &http::response::Typing, joined_room_updates: &mut BTreeMap, ) { - for (room_id, raw) in &receipts.rooms { - dispatch_receipt(context, raw.cast_ref(), room_id); - - joined_room_updates - .entry(room_id.to_owned()) - .or_default() - .ephemeral - .push(raw.clone().cast()); - } - for (room_id, raw) in &typing.rooms { joined_room_updates .entry(room_id.to_owned()) @@ -50,6 +46,20 @@ pub fn dispatch_ephemeral_events( } } +/// Dispatch the ephemeral event in the `extensions.receipts` part of the +/// response for a particular room. +pub fn dispatch_receipt_ephemeral_event_for_room( + context: &mut Context, + room_id: &RoomId, + receipt: &Raw>, + joined_room_update: &mut JoinedRoomUpdate, +) { + let receipt: Raw = receipt.cast_ref().clone(); + + dispatch_receipt(context, &receipt, room_id); + joined_room_update.ephemeral.push(receipt); +} + pub async fn room_account_data( context: &mut Context, account_data: &http::response::AccountData, diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 95a3e5e88..0d86c7549 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -14,7 +14,8 @@ //! Extend `BaseClient` with capabilities to handle MSC4186. -use ruma::api::client::sync::sync_events::v5 as http; +use matrix_sdk_common::deserialized_responses::TimelineEvent; +use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId}; #[cfg(feature = "e2e-encryption")] use ruma::{events::AnyToDeviceEvent, serde::Raw}; use tracing::{instrument, trace}; @@ -22,6 +23,7 @@ use tracing::{instrument, trace}; use super::BaseClient; use crate::{ error::Result, + read_receipts::compute_unread_counts, response_processors as processors, rooms::normal::RoomInfoNotableUpdateReasons, store::ambiguity_map::AmbiguityCache, @@ -194,11 +196,9 @@ impl BaseClient { // Handle read receipts and typing notifications independently of the rooms: // these both live in a different subsection of the server's response, // so they may exist without any update for the associated room. - processors::room::msc4186::extensions::dispatch_ephemeral_events( + processors::room::msc4186::extensions::dispatch_typing_ephemeral_events( &mut context, - &extensions.receipts, &extensions.typing, - // We assume this can only happen in joined rooms, or something's very wrong. &mut room_updates.joined, ); @@ -211,43 +211,6 @@ impl BaseClient { ) .await; - // Rooms in `room_updates.joined` either have a timeline update, or a new read - // receipt. Update the read receipt accordingly. - // let user_id = &self.session_meta().expect("logged in user").user_id; - - for (room_id, _joined_room_update) in &mut room_updates.joined { - if let Some(room_info) = context - .state_changes - .room_infos - .get(room_id) - .cloned() - .or_else(|| self.get_room(room_id).map(|r| r.clone_info())) - { - let prev_read_receipts = room_info.read_receipts.clone(); - - /* - compute_unread_counts( - user_id, - room_id, - context.state_changes.receipts.get(room_id), - previous_events_provider.for_room(room_id), - &joined_room_update.timeline.events, - &mut room_info.read_receipts, - ); - */ - - if prev_read_receipts != room_info.read_receipts { - context - .room_info_notable_updates - .entry(room_id.clone()) - .or_default() - .insert(RoomInfoNotableUpdateReasons::READ_RECEIPT); - - context.state_changes.add_room(room_info); - } - } - } - global_account_data_processor.apply(&mut context, &state_store).await; context.state_changes.ambiguity_maps = ambiguity_cache.cache; @@ -283,6 +246,71 @@ impl BaseClient { to_device: Default::default(), }) } + + /// Process the `receipts` extension, and compute (and save) the unread + /// counts based on read receipts, for a particular room. + #[doc(hidden)] + pub async fn process_sliding_sync_receipts_extension_for_room( + &self, + room_id: &OwnedRoomId, + response: &http::Response, + sync_response: &mut SyncResponse, + room_previous_events: Vec, + ) -> Result<()> { + let mut context = processors::Context::default(); + + let mut save_context = false; + + // Get or create the `JoinedRoomUpdate`, so that we can push the receipt + // ephemeral event, and compute the unread counts. + let joined_room_update = sync_response.rooms.joined.entry(room_id.to_owned()).or_default(); + + // Handle the receipt ephemeral event. + if let Some(receipt_ephemeral_event) = response.extensions.receipts.rooms.get(room_id) { + processors::room::msc4186::extensions::dispatch_receipt_ephemeral_event_for_room( + &mut context, + room_id, + receipt_ephemeral_event, + joined_room_update, + ); + save_context = true; + } + + let user_id = &self.session_meta().expect("logged in user").user_id; + + // Rooms in `room_updates.joined` either have a timeline update, or a new read + // receipt. Update the read receipt accordingly. + if let Some(mut room_info) = self.get_room(room_id).map(|room| room.clone_info()) { + let prev_read_receipts = room_info.read_receipts.clone(); + + compute_unread_counts( + user_id, + room_id, + context.state_changes.receipts.get(room_id), + room_previous_events, + &joined_room_update.timeline.events, + &mut room_info.read_receipts, + ); + + if prev_read_receipts != room_info.read_receipts { + context + .room_info_notable_updates + .entry(room_id.clone()) + .or_default() + .insert(RoomInfoNotableUpdateReasons::READ_RECEIPT); + + context.state_changes.add_room(room_info); + save_context = true; + } + } + + // Save the new `RoomInfo` if updated. + if save_context { + processors::changes::save_only(context, &self.state_store).await?; + } + + Ok(()) + } } #[cfg(all(test, not(target_family = "wasm")))] @@ -1848,68 +1876,6 @@ mod tests { assert!(room_info_notable_update_stream.is_empty()); } - #[async_test] - async fn test_read_receipt_can_trigger_a_notable_update_reason() { - // Given a logged-in client - let client = logged_in_base_client(None).await; - let mut room_info_notable_update_stream = client.room_info_notable_update_receiver(); - - // When I send sliding sync response containing a new room. - let room_id = room_id!("!r:e.uk"); - let room = http::response::Room::new(); - let response = response_with_room(room_id, room); - client - .process_sliding_sync(&response, &RequestedRequiredStates::default()) - .await - .expect("Failed to process sync"); - - // Then a room info notable update is received, but not the one we are - // interested by. - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT)); - } - ); - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME)); - } - ); - assert!(room_info_notable_update_stream.is_empty()); - - // When I send sliding sync response containing a couple of events with no read - // receipt. - let room_id = room_id!("!r:e.uk"); - let events = vec![ - make_raw_event("m.room.message", "$3"), - make_raw_event("m.room.message", "$4"), - make_raw_event("m.read", "$5"), - ]; - let room = assign!(http::response::Room::new(), { - timeline: events, - }); - let response = response_with_room(room_id, room); - client - .process_sliding_sync(&response, &RequestedRequiredStates::default()) - .await - .expect("Failed to process sync"); - - // Then a room info notable update is received. - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - // assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT)); - assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE)); - } - ); - assert!(room_info_notable_update_stream.is_empty()); - } - #[async_test] async fn test_leaving_room_can_trigger_a_notable_update_reason() { // Given a logged-in client diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index c95ef2663..d8e0a1746 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates}; use ruma::{ api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http}, @@ -192,12 +194,14 @@ impl SlidingSyncResponseProcessor { response: &http::Response, requested_required_states: &RequestedRequiredStates, ) -> Result<()> { - self.response = Some( - self.client - .base_client() - .process_sliding_sync(response, requested_required_states) - .await?, - ); + let mut sync_response = self + .client + .base_client() + .process_sliding_sync(response, requested_required_states) + .await?; + handle_receipts_extension(&self.client, response, &mut sync_response).await?; + + self.response = Some(sync_response); self.post_process().await } @@ -238,14 +242,60 @@ async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Re Ok(()) } +/// Update the receipts extension and compute the read receipt accordingly. +async fn handle_receipts_extension( + client: &Client, + response: &http::Response, + sync_response: &mut SyncResponse, +) -> Result<()> { + // We need to compute read receipts for each joined room that has received an + // update, or from each room that has received a receipt ephemeral event. + let room_ids = BTreeSet::from_iter( + sync_response + .rooms + .joined + .keys() + .cloned() + .chain(response.extensions.receipts.rooms.keys().cloned()), + ); + + for room_id in room_ids { + let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await + else { + tracing::info!( + ?room_id, + "Failed to fetch the `RoomEventCache` when computing unread counts" + ); + + continue; + }; + + let previous_events = room_event_cache.events().await; + + client + .base_client() + .process_sliding_sync_receipts_extension_for_room( + &room_id, + response, + sync_response, + previous_events, + ) + .await?; + } + Ok(()) +} + #[cfg(all(test, not(target_family = "wasm")))] mod tests { use std::collections::BTreeMap; use assert_matches::assert_matches; - use matrix_sdk_base::{notification_settings::RoomNotificationMode, RequestedRequiredStates}; + use matrix_sdk_base::{ + notification_settings::RoomNotificationMode, RequestedRequiredStates, + RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, + }; use matrix_sdk_test::async_test; - use ruma::{assign, room_id, serde::Raw}; + use ruma::{assign, events::AnySyncTimelineEvent, room_id, serde::Raw}; use serde_json::json; use wiremock::{ matchers::{method, path}, @@ -255,8 +305,8 @@ mod tests { use super::{get_supported_versions, Version, VersionBuilder}; use crate::{ error::Result, - sliding_sync::{http, VersionBuilderError}, - test_utils::logged_in_client_with_server, + sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError}, + test_utils::{logged_in_client, logged_in_client_with_server}, SlidingSyncList, SlidingSyncMode, }; @@ -452,4 +502,100 @@ mod tests { Ok(()) } + + #[async_test] + async fn test_read_receipt_can_trigger_a_notable_update_reason() { + use ruma::api::client::sync::sync_events::v5 as http; + + // Given a logged-in client + let client = logged_in_client(None).await; + client.event_cache().subscribe().unwrap(); + + let mut room_info_notable_update_stream = client.room_info_notable_update_receiver(); + + // When I send sliding sync response containing a new room. + let room_id = room_id!("!r:e.uk"); + let room = http::response::Room::new(); + let mut response = http::Response::new("5".to_owned()); + response.rooms.insert(room_id.to_owned(), room); + + let mut processor = SlidingSyncResponseProcessor::new(client.clone()); + processor + .handle_room_response(&response, &RequestedRequiredStates::default()) + .await + .expect("Failed to process sync"); + processor.process_and_take_response().await.expect("Failed to finish processing sync"); + + // Then room info notable updates are received. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}"); + } + ); + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}"); + } + ); + assert!(room_info_notable_update_stream.is_empty()); + + // When I send sliding sync response containing a couple of events with no read + // receipt. + let room_id = room_id!("!r:e.uk"); + let events = vec![ + make_raw_event("m.room.message", "$3"), + make_raw_event("m.room.message", "$4"), + make_raw_event("m.read", "$5"), + ]; + let room = assign!(http::response::Room::new(), { + timeline: events, + }); + let mut response = http::Response::new("5".to_owned()); + response.rooms.insert(room_id.to_owned(), room); + + let mut processor = SlidingSyncResponseProcessor::new(client.clone()); + processor + .handle_room_response(&response, &RequestedRequiredStates::default()) + .await + .expect("Failed to process sync"); + processor.process_and_take_response().await.expect("Failed to finish processing sync"); + + // Then room info notable updates are received. + // + // `NONE` because the regular sync process ends up to updating nothing. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}"); + } + ); + // `READ_RECEIPT` because this is what we expect. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}"); + } + ); + assert!(room_info_notable_update_stream.is_empty()); + } + + fn make_raw_event(event_type: &str, id: &str) -> Raw { + Raw::from_json_string( + json!({ + "type": event_type, + "event_id": id, + "content": { "msgtype": "m.text", "body": "my msg" }, + "sender": "@u:h.uk", + "origin_server_ts": 12344445, + }) + .to_string(), + ) + .unwrap() + } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 7d5fe7189..093332449 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -344,7 +344,7 @@ impl SlidingSync { // the `sync_response.join`. Mark them as updated too. // // Since we've removed rooms that were in the room subsection from - // `sync_response.rooms.join`, the remaining ones aren't already present in + // `sync_response.rooms.joined`, the remaining ones aren't already present in // `updated_rooms` and wouldn't cause any duplicates. updated_rooms.extend(sync_response.rooms.joined.keys().cloned()); @@ -2072,6 +2072,7 @@ mod tests { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; + client.event_cache().subscribe().unwrap(); let sliding_sync = client .sliding_sync("test")? diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 6aa19cb13..28f20b784 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -393,7 +393,6 @@ impl UpdateObserver { } } -/* #[tokio::test] async fn test_room_notification_count() -> Result<()> { use tokio::time::timeout; @@ -401,6 +400,8 @@ async fn test_room_notification_count() -> Result<()> { let bob = TestClientBuilder::new("bob").use_sqlite().build().await?; let alice = TestClientBuilder::new("alice").use_sqlite().build().await?; + alice.event_cache().subscribe().unwrap(); + // Spawn sync for Bob. spawn({ let bob = bob.clone(); @@ -643,7 +644,6 @@ async fn test_room_notification_count() -> Result<()> { Ok(()) } -*/ /// Response preprocessor that drops to_device events fn drop_todevice_events(response: &mut Bytes) {