From 2649587d2ff4f5aaa44f0375a093f458c171f04b Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 13 May 2025 17:23:10 +0200 Subject: [PATCH] refactor(sdk): Use the Event Cache for `read_receipts::compute_unread_counts`. The `read_receipts::compute_unread_counts` function needs the _previous events_ to compute the read receipt correctly. These previous events were store in `SlidingSyncRoom::timeline_queue`. Since the removal of `timeline_queue` in the previous patches, this patch uses the Event Cache to fetch them. It only uses events that are loaded in memory. This is as correct as the prior behaviour, even this is still incorrect since it doesn't back-paginate to get a better view. This is for later. The goal of this patch is to restore the same behaviour, without `timeline_queue`. The main problem is that read receipts are computed in `matrix-sdk-base`, and that the Event Cache lives in `matrix-sdk`. Thus, we change the `SlidingSyncResponseProcessor` to handle read receipt in particular. The `matrix_sdk_base::response_processors::rooms::msc4186::extensions::dispa tch_ephemeral_events` function has been split in two methods `dispatch_typing_ephemeral_events`, and `dispatch_receipt_ephemeral_event_for_room`. The workflow has been a little bit redesigned to fit in the new `SlidingSyncResponseProcessor` constraints. This patch moves one test from `matrix-sdk-base` into `matrix-sdk`, because to compute the read receipt, the Event Cache must be enabled/listening to sync updates. --- crates/matrix-sdk-base/src/read_receipts.rs | 73 ++++---- .../room/msc4186/extensions.rs | 38 ++-- crates/matrix-sdk-base/src/sliding_sync.rs | 172 +++++++----------- crates/matrix-sdk/src/sliding_sync/client.rs | 166 ++++++++++++++++- crates/matrix-sdk/src/sliding_sync/mod.rs | 3 +- .../src/tests/sliding_sync/room.rs | 4 +- 6 files changed, 287 insertions(+), 169 deletions(-) diff --git a/crates/matrix-sdk-base/src/read_receipts.rs b/crates/matrix-sdk-base/src/read_receipts.rs index 31e775d56..b0fcbcc8b 100644 --- a/crates/matrix-sdk-base/src/read_receipts.rs +++ b/crates/matrix-sdk-base/src/read_receipts.rs @@ -122,7 +122,6 @@ use std::{ num::NonZeroUsize, }; -use eyeball_im::Vector; use matrix_sdk_common::{deserialized_responses::TimelineEvent, ring_buffer::RingBuffer}; use ruma::{ events::{ @@ -294,10 +293,7 @@ struct ReceiptSelector { } impl ReceiptSelector { - fn new( - all_events: &Vector, - latest_active_receipt_event: Option<&EventId>, - ) -> Self { + fn new(all_events: &[TimelineEvent], latest_active_receipt_event: Option<&EventId>) -> Self { let event_id_to_pos = Self::create_sync_index(all_events.iter()); let best_pos = @@ -457,23 +453,22 @@ pub(crate) fn compute_unread_counts( user_id: &UserId, room_id: &RoomId, receipt_event: Option<&ReceiptEventContent>, - previous_events: Vector, + mut previous_events: Vec, new_events: &[TimelineEvent], read_receipts: &mut RoomReadReceipts, ) { - debug!(?read_receipts, "Starting."); + debug!(?read_receipts, "Starting"); let all_events = if events_intersects(previous_events.iter(), new_events) { // The previous and new events sets can intersect, for instance if we restored // previous events from the disk cache, or a timeline was limited. This // means the old events will be cleared, because we don't reconcile - // timelines in sliding sync (yet). As a result, forget + // timelines in the event cache (yet). As a result, forget // about the previous events. - Vector::from_iter(new_events.iter().cloned()) + new_events.to_owned() } else { - let mut all_events = previous_events; - all_events.extend(new_events.iter().cloned()); - all_events + previous_events.extend(new_events.iter().cloned()); + previous_events }; let new_receipt = { @@ -481,6 +476,7 @@ pub(crate) fn compute_unread_counts( &all_events, read_receipts.latest_active.as_ref().map(|receipt| &*receipt.event_id), ); + selector.try_match_implicit(user_id, new_events); selector.handle_pending_receipts(&mut read_receipts.pending); if let Some(receipt_event) = receipt_event { @@ -622,7 +618,6 @@ fn marks_as_unread(event: &Raw, user_id: &UserId) -> bool mod tests { use std::{num::NonZeroUsize, ops::Not as _}; - use eyeball_im::Vector; use matrix_sdk_common::{deserialized_responses::TimelineEvent, ring_buffer::RingBuffer}; use matrix_sdk_test::event_factory::EventFactory; use ruma::{ @@ -915,7 +910,7 @@ mod tests { let room_id = room_id!("!room:example.org"); let receipt_event_id = event_id!("$1"); - let mut previous_events = Vector::new(); + let mut previous_events = Vec::new(); let f = EventFactory::new(); let ev1 = f.text_msg("A").sender(other_user_id).event_id(receipt_event_id).into_event(); @@ -940,8 +935,8 @@ mod tests { assert_eq!(read_receipts.num_unread, 1); // Receive the same receipt event, with a new sync event. - previous_events.push_back(ev1); - previous_events.push_back(ev2); + previous_events.push(ev1); + previous_events.push(ev2); let new_event = f.text_msg("A").sender(other_user_id).event_id(event_id!("$3")).into_event(); @@ -958,7 +953,7 @@ mod tests { assert_eq!(read_receipts.num_unread, 2); } - fn make_test_events(user_id: &UserId) -> Vector { + fn make_test_events(user_id: &UserId) -> Vec { let f = EventFactory::new().sender(user_id); let ev1 = f.text_msg("With the lights out, it's less dangerous").event_id(event_id!("$1")); let ev2 = f.text_msg("Here we are now, entertain us").event_id(event_id!("$2")); @@ -976,7 +971,7 @@ mod tests { let room_id = room_id!("!room:example.org"); let all_events = make_test_events(user_id!("@bob:example.org")); - let head_events: Vector<_> = all_events.iter().take(2).cloned().collect(); + let head_events: Vec<_> = all_events.iter().take(2).cloned().collect(); let tail_events: Vec<_> = all_events.iter().skip(2).cloned().collect(); // Given a receipt event marking events 1-3 as read using a combination of @@ -1165,7 +1160,7 @@ mod tests { { // No initial active receipt, so the first receipt we get *will* win. - let mut selector = ReceiptSelector::new(&vec![].into(), None); + let mut selector = ReceiptSelector::new(&[], None); selector.try_select_later(event_id!("$1"), 0); let best_receipt = selector.select(); assert_eq!(best_receipt.unwrap().event_id, event_id!("$1")); @@ -1203,11 +1198,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // No pending receipt => no better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); selector.handle_pending_receipts(&mut pending); @@ -1221,7 +1216,7 @@ mod tests { { // No pending receipt, and there was an active last receipt => no better // receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); selector.handle_pending_receipts(&mut pending); @@ -1239,11 +1234,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // A pending receipt for an event that is still missing => no better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$3")); @@ -1257,7 +1252,7 @@ mod tests { { // Ditto but there was an active receipt => no better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$3")); @@ -1276,11 +1271,11 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // A pending receipt for an event that is present => better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$2")); @@ -1296,7 +1291,7 @@ mod tests { { // Mixed found and not found receipt => better receipt. - let mut selector = ReceiptSelector::new(&events, None); + let mut selector = ReceiptSelector::new(events, None); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$1")); @@ -1318,12 +1313,12 @@ mod tests { let f = EventFactory::new().sender(sender); let ev1 = f.text_msg("yo").event_id(event_id!("$1")).into_event(); let ev2 = f.text_msg("well?").event_id(event_id!("$2")).into_event(); - let events: Vector<_> = vec![ev1, ev2].into(); + let events = &[ev1, ev2][..]; { // Same, and there was an initial receipt that was less good than the one we // selected => better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$1"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$2")); @@ -1339,7 +1334,7 @@ mod tests { { // Same, but the previous receipt was better => no better receipt. - let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2"))); + let mut selector = ReceiptSelector::new(events, Some(event_id!("$2"))); let mut pending = RingBuffer::new(NonZeroUsize::new(16).unwrap()); pending.push(owned_event_id!("$1")); @@ -1484,26 +1479,26 @@ mod tests { // When the selector sees only other users' events, let mut selector = ReceiptSelector::new(&events, None); // And I search for my implicit read receipt, - selector.try_match_implicit(&myself, &events.iter().cloned().collect::>()); + selector.try_match_implicit(&myself, &events); // Then I don't find any. let best_receipt = selector.select(); assert!(best_receipt.is_none()); // Now, if there are events I've written too... let f = EventFactory::new(); - events.push_back( + events.push( f.text_msg("A mulatto, an albino") .sender(&myself) .event_id(event_id!("$6")) .into_event(), ); - events.push_back( + events.push( f.text_msg("A mosquito, my libido").sender(bob).event_id(event_id!("$7")).into_event(), ); let mut selector = ReceiptSelector::new(&events, None); // And I search for my implicit read receipt, - selector.try_match_implicit(&myself, &events.iter().cloned().collect::>()); + selector.try_match_implicit(&myself, &events); // Then my last sent event counts as a read receipt. let best_receipt = selector.select(); assert_eq!(best_receipt.unwrap().event_id, event_id!("$6")); @@ -1520,7 +1515,7 @@ mod tests { // One by me, let f = EventFactory::new(); - events.push_back( + events.push( f.text_msg("A mulatto, an albino") .sender(user_id) .event_id(event_id!("$6")) @@ -1528,10 +1523,10 @@ mod tests { ); // And others by Bob, - events.push_back( + events.push( f.text_msg("A mosquito, my libido").sender(bob).event_id(event_id!("$7")).into_event(), ); - events.push_back( + events.push( f.text_msg("A denial, a denial").sender(bob).event_id(event_id!("$8")).into_event(), ); @@ -1551,7 +1546,7 @@ mod tests { user_id, room_id, Some(&receipt_event), - Vector::new(), + Vec::new(), &events, &mut read_receipts, ); diff --git a/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs index 8d13f959a..fbc182e6f 100644 --- a/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs +++ b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs @@ -14,7 +14,12 @@ use std::collections::BTreeMap; -use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId}; +use ruma::{ + api::client::sync::sync_events::v5 as http, + events::{receipt::ReceiptEventContent, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent}, + serde::Raw, + OwnedRoomId, RoomId, +}; use super::super::super::{ account_data::for_room as account_data_for_room, ephemeral_events::dispatch_receipt, Context, @@ -25,22 +30,13 @@ use crate::{ RoomState, }; -pub fn dispatch_ephemeral_events( - context: &mut Context, - receipts: &http::response::Receipts, +/// Dispatch the ephemeral events in the `extensions.typing` part of the +/// response. +pub fn dispatch_typing_ephemeral_events( + _context: &mut Context, typing: &http::response::Typing, joined_room_updates: &mut BTreeMap, ) { - for (room_id, raw) in &receipts.rooms { - dispatch_receipt(context, raw.cast_ref(), room_id); - - joined_room_updates - .entry(room_id.to_owned()) - .or_default() - .ephemeral - .push(raw.clone().cast()); - } - for (room_id, raw) in &typing.rooms { joined_room_updates .entry(room_id.to_owned()) @@ -50,6 +46,20 @@ pub fn dispatch_ephemeral_events( } } +/// Dispatch the ephemeral event in the `extensions.receipts` part of the +/// response for a particular room. +pub fn dispatch_receipt_ephemeral_event_for_room( + context: &mut Context, + room_id: &RoomId, + receipt: &Raw>, + joined_room_update: &mut JoinedRoomUpdate, +) { + let receipt: Raw = receipt.cast_ref().clone(); + + dispatch_receipt(context, &receipt, room_id); + joined_room_update.ephemeral.push(receipt); +} + pub async fn room_account_data( context: &mut Context, account_data: &http::response::AccountData, diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 95a3e5e88..0d86c7549 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -14,7 +14,8 @@ //! Extend `BaseClient` with capabilities to handle MSC4186. -use ruma::api::client::sync::sync_events::v5 as http; +use matrix_sdk_common::deserialized_responses::TimelineEvent; +use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId}; #[cfg(feature = "e2e-encryption")] use ruma::{events::AnyToDeviceEvent, serde::Raw}; use tracing::{instrument, trace}; @@ -22,6 +23,7 @@ use tracing::{instrument, trace}; use super::BaseClient; use crate::{ error::Result, + read_receipts::compute_unread_counts, response_processors as processors, rooms::normal::RoomInfoNotableUpdateReasons, store::ambiguity_map::AmbiguityCache, @@ -194,11 +196,9 @@ impl BaseClient { // Handle read receipts and typing notifications independently of the rooms: // these both live in a different subsection of the server's response, // so they may exist without any update for the associated room. - processors::room::msc4186::extensions::dispatch_ephemeral_events( + processors::room::msc4186::extensions::dispatch_typing_ephemeral_events( &mut context, - &extensions.receipts, &extensions.typing, - // We assume this can only happen in joined rooms, or something's very wrong. &mut room_updates.joined, ); @@ -211,43 +211,6 @@ impl BaseClient { ) .await; - // Rooms in `room_updates.joined` either have a timeline update, or a new read - // receipt. Update the read receipt accordingly. - // let user_id = &self.session_meta().expect("logged in user").user_id; - - for (room_id, _joined_room_update) in &mut room_updates.joined { - if let Some(room_info) = context - .state_changes - .room_infos - .get(room_id) - .cloned() - .or_else(|| self.get_room(room_id).map(|r| r.clone_info())) - { - let prev_read_receipts = room_info.read_receipts.clone(); - - /* - compute_unread_counts( - user_id, - room_id, - context.state_changes.receipts.get(room_id), - previous_events_provider.for_room(room_id), - &joined_room_update.timeline.events, - &mut room_info.read_receipts, - ); - */ - - if prev_read_receipts != room_info.read_receipts { - context - .room_info_notable_updates - .entry(room_id.clone()) - .or_default() - .insert(RoomInfoNotableUpdateReasons::READ_RECEIPT); - - context.state_changes.add_room(room_info); - } - } - } - global_account_data_processor.apply(&mut context, &state_store).await; context.state_changes.ambiguity_maps = ambiguity_cache.cache; @@ -283,6 +246,71 @@ impl BaseClient { to_device: Default::default(), }) } + + /// Process the `receipts` extension, and compute (and save) the unread + /// counts based on read receipts, for a particular room. + #[doc(hidden)] + pub async fn process_sliding_sync_receipts_extension_for_room( + &self, + room_id: &OwnedRoomId, + response: &http::Response, + sync_response: &mut SyncResponse, + room_previous_events: Vec, + ) -> Result<()> { + let mut context = processors::Context::default(); + + let mut save_context = false; + + // Get or create the `JoinedRoomUpdate`, so that we can push the receipt + // ephemeral event, and compute the unread counts. + let joined_room_update = sync_response.rooms.joined.entry(room_id.to_owned()).or_default(); + + // Handle the receipt ephemeral event. + if let Some(receipt_ephemeral_event) = response.extensions.receipts.rooms.get(room_id) { + processors::room::msc4186::extensions::dispatch_receipt_ephemeral_event_for_room( + &mut context, + room_id, + receipt_ephemeral_event, + joined_room_update, + ); + save_context = true; + } + + let user_id = &self.session_meta().expect("logged in user").user_id; + + // Rooms in `room_updates.joined` either have a timeline update, or a new read + // receipt. Update the read receipt accordingly. + if let Some(mut room_info) = self.get_room(room_id).map(|room| room.clone_info()) { + let prev_read_receipts = room_info.read_receipts.clone(); + + compute_unread_counts( + user_id, + room_id, + context.state_changes.receipts.get(room_id), + room_previous_events, + &joined_room_update.timeline.events, + &mut room_info.read_receipts, + ); + + if prev_read_receipts != room_info.read_receipts { + context + .room_info_notable_updates + .entry(room_id.clone()) + .or_default() + .insert(RoomInfoNotableUpdateReasons::READ_RECEIPT); + + context.state_changes.add_room(room_info); + save_context = true; + } + } + + // Save the new `RoomInfo` if updated. + if save_context { + processors::changes::save_only(context, &self.state_store).await?; + } + + Ok(()) + } } #[cfg(all(test, not(target_family = "wasm")))] @@ -1848,68 +1876,6 @@ mod tests { assert!(room_info_notable_update_stream.is_empty()); } - #[async_test] - async fn test_read_receipt_can_trigger_a_notable_update_reason() { - // Given a logged-in client - let client = logged_in_base_client(None).await; - let mut room_info_notable_update_stream = client.room_info_notable_update_receiver(); - - // When I send sliding sync response containing a new room. - let room_id = room_id!("!r:e.uk"); - let room = http::response::Room::new(); - let response = response_with_room(room_id, room); - client - .process_sliding_sync(&response, &RequestedRequiredStates::default()) - .await - .expect("Failed to process sync"); - - // Then a room info notable update is received, but not the one we are - // interested by. - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT)); - } - ); - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME)); - } - ); - assert!(room_info_notable_update_stream.is_empty()); - - // When I send sliding sync response containing a couple of events with no read - // receipt. - let room_id = room_id!("!r:e.uk"); - let events = vec![ - make_raw_event("m.room.message", "$3"), - make_raw_event("m.room.message", "$4"), - make_raw_event("m.read", "$5"), - ]; - let room = assign!(http::response::Room::new(), { - timeline: events, - }); - let response = response_with_room(room_id, room); - client - .process_sliding_sync(&response, &RequestedRequiredStates::default()) - .await - .expect("Failed to process sync"); - - // Then a room info notable update is received. - assert_matches!( - room_info_notable_update_stream.recv().await, - Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { - assert_eq!(received_room_id, room_id); - // assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT)); - assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE)); - } - ); - assert!(room_info_notable_update_stream.is_empty()); - } - #[async_test] async fn test_leaving_room_can_trigger_a_notable_update_reason() { // Given a logged-in client diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index c95ef2663..d8e0a1746 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates}; use ruma::{ api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http}, @@ -192,12 +194,14 @@ impl SlidingSyncResponseProcessor { response: &http::Response, requested_required_states: &RequestedRequiredStates, ) -> Result<()> { - self.response = Some( - self.client - .base_client() - .process_sliding_sync(response, requested_required_states) - .await?, - ); + let mut sync_response = self + .client + .base_client() + .process_sliding_sync(response, requested_required_states) + .await?; + handle_receipts_extension(&self.client, response, &mut sync_response).await?; + + self.response = Some(sync_response); self.post_process().await } @@ -238,14 +242,60 @@ async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Re Ok(()) } +/// Update the receipts extension and compute the read receipt accordingly. +async fn handle_receipts_extension( + client: &Client, + response: &http::Response, + sync_response: &mut SyncResponse, +) -> Result<()> { + // We need to compute read receipts for each joined room that has received an + // update, or from each room that has received a receipt ephemeral event. + let room_ids = BTreeSet::from_iter( + sync_response + .rooms + .joined + .keys() + .cloned() + .chain(response.extensions.receipts.rooms.keys().cloned()), + ); + + for room_id in room_ids { + let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await + else { + tracing::info!( + ?room_id, + "Failed to fetch the `RoomEventCache` when computing unread counts" + ); + + continue; + }; + + let previous_events = room_event_cache.events().await; + + client + .base_client() + .process_sliding_sync_receipts_extension_for_room( + &room_id, + response, + sync_response, + previous_events, + ) + .await?; + } + Ok(()) +} + #[cfg(all(test, not(target_family = "wasm")))] mod tests { use std::collections::BTreeMap; use assert_matches::assert_matches; - use matrix_sdk_base::{notification_settings::RoomNotificationMode, RequestedRequiredStates}; + use matrix_sdk_base::{ + notification_settings::RoomNotificationMode, RequestedRequiredStates, + RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, + }; use matrix_sdk_test::async_test; - use ruma::{assign, room_id, serde::Raw}; + use ruma::{assign, events::AnySyncTimelineEvent, room_id, serde::Raw}; use serde_json::json; use wiremock::{ matchers::{method, path}, @@ -255,8 +305,8 @@ mod tests { use super::{get_supported_versions, Version, VersionBuilder}; use crate::{ error::Result, - sliding_sync::{http, VersionBuilderError}, - test_utils::logged_in_client_with_server, + sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError}, + test_utils::{logged_in_client, logged_in_client_with_server}, SlidingSyncList, SlidingSyncMode, }; @@ -452,4 +502,100 @@ mod tests { Ok(()) } + + #[async_test] + async fn test_read_receipt_can_trigger_a_notable_update_reason() { + use ruma::api::client::sync::sync_events::v5 as http; + + // Given a logged-in client + let client = logged_in_client(None).await; + client.event_cache().subscribe().unwrap(); + + let mut room_info_notable_update_stream = client.room_info_notable_update_receiver(); + + // When I send sliding sync response containing a new room. + let room_id = room_id!("!r:e.uk"); + let room = http::response::Room::new(); + let mut response = http::Response::new("5".to_owned()); + response.rooms.insert(room_id.to_owned(), room); + + let mut processor = SlidingSyncResponseProcessor::new(client.clone()); + processor + .handle_room_response(&response, &RequestedRequiredStates::default()) + .await + .expect("Failed to process sync"); + processor.process_and_take_response().await.expect("Failed to finish processing sync"); + + // Then room info notable updates are received. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}"); + } + ); + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}"); + } + ); + assert!(room_info_notable_update_stream.is_empty()); + + // When I send sliding sync response containing a couple of events with no read + // receipt. + let room_id = room_id!("!r:e.uk"); + let events = vec![ + make_raw_event("m.room.message", "$3"), + make_raw_event("m.room.message", "$4"), + make_raw_event("m.read", "$5"), + ]; + let room = assign!(http::response::Room::new(), { + timeline: events, + }); + let mut response = http::Response::new("5".to_owned()); + response.rooms.insert(room_id.to_owned(), room); + + let mut processor = SlidingSyncResponseProcessor::new(client.clone()); + processor + .handle_room_response(&response, &RequestedRequiredStates::default()) + .await + .expect("Failed to process sync"); + processor.process_and_take_response().await.expect("Failed to finish processing sync"); + + // Then room info notable updates are received. + // + // `NONE` because the regular sync process ends up to updating nothing. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}"); + } + ); + // `READ_RECEIPT` because this is what we expect. + assert_matches!( + room_info_notable_update_stream.recv().await, + Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => { + assert_eq!(received_room_id, room_id); + assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}"); + } + ); + assert!(room_info_notable_update_stream.is_empty()); + } + + fn make_raw_event(event_type: &str, id: &str) -> Raw { + Raw::from_json_string( + json!({ + "type": event_type, + "event_id": id, + "content": { "msgtype": "m.text", "body": "my msg" }, + "sender": "@u:h.uk", + "origin_server_ts": 12344445, + }) + .to_string(), + ) + .unwrap() + } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 7d5fe7189..093332449 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -344,7 +344,7 @@ impl SlidingSync { // the `sync_response.join`. Mark them as updated too. // // Since we've removed rooms that were in the room subsection from - // `sync_response.rooms.join`, the remaining ones aren't already present in + // `sync_response.rooms.joined`, the remaining ones aren't already present in // `updated_rooms` and wouldn't cause any duplicates. updated_rooms.extend(sync_response.rooms.joined.keys().cloned()); @@ -2072,6 +2072,7 @@ mod tests { let server = MockServer::start().await; let client = logged_in_client(Some(server.uri())).await; + client.event_cache().subscribe().unwrap(); let sliding_sync = client .sliding_sync("test")? diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 6aa19cb13..28f20b784 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -393,7 +393,6 @@ impl UpdateObserver { } } -/* #[tokio::test] async fn test_room_notification_count() -> Result<()> { use tokio::time::timeout; @@ -401,6 +400,8 @@ async fn test_room_notification_count() -> Result<()> { let bob = TestClientBuilder::new("bob").use_sqlite().build().await?; let alice = TestClientBuilder::new("alice").use_sqlite().build().await?; + alice.event_cache().subscribe().unwrap(); + // Spawn sync for Bob. spawn({ let bob = bob.clone(); @@ -643,7 +644,6 @@ async fn test_room_notification_count() -> Result<()> { Ok(()) } -*/ /// Response preprocessor that drops to_device events fn drop_todevice_events(response: &mut Bytes) {