diff --git a/crates/matrix-sdk-base/src/room/room_info.rs b/crates/matrix-sdk-base/src/room/room_info.rs index 9cd8a394e..04a1c7f4a 100644 --- a/crates/matrix-sdk-base/src/room/room_info.rs +++ b/crates/matrix-sdk-base/src/room/room_info.rs @@ -1103,6 +1103,16 @@ impl RoomInfo { .is_some_and(|pinned| pinned.contains(&event_id.to_owned())) } + /// Returns the computed read receipts for this room. + pub fn read_receipts(&self) -> &RoomReadReceipts { + &self.read_receipts + } + + /// Set the computed read receipts for this room. + pub fn set_read_receipts(&mut self, read_receipts: RoomReadReceipts) { + self.read_receipts = read_receipts; + } + /// Apply migrations to this `RoomInfo` if needed. /// /// This should be used to populate new fields with data from the state diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 458879f21..cae35af4d 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -1971,6 +1971,16 @@ async fn test_room_sorting() -> Result<(), Error> { end; }; + // All rooms get new messages, so their entries will get updates because of read + // receipt updates. + // + // Starting with r0. + assert_entries_batch! { + [stream] + set [ 0 ] [ "!r0:bar.org" ]; + end; + }; + // Now we have: // // | index | room ID | recency | name | @@ -1988,6 +1998,13 @@ async fn test_room_sorting() -> Result<(), Error> { end; }; + // Read receipt update for r1. + assert_entries_batch! { + [stream] + set [ 1 ] [ "!r1:bar.org" ]; + end; + }; + // Now we have: // // | index | room ID | recency | name | @@ -2005,6 +2022,13 @@ async fn test_room_sorting() -> Result<(), Error> { end; }; + // Read receipt update for r2. + assert_entries_batch! { + [stream] + set [ 0 ] [ "!r2:bar.org" ]; + end; + }; + // Now we have: // // | index | room ID | recency | name | @@ -2015,23 +2039,6 @@ async fn test_room_sorting() -> Result<(), Error> { // | 3 | !r4 | 5 | | // | 4 | !r3 | 4 | | - // Rooms are individually updated. - assert_entries_batch! { - [stream] - set [ 1 ] [ "!r0:bar.org" ]; - end; - }; - assert_entries_batch! { - [stream] - set [ 2 ] [ "!r1:bar.org" ]; - end; - }; - assert_entries_batch! { - [stream] - set [ 0 ] [ "!r2:bar.org" ]; - end; - }; - assert_pending!(stream); sync_then_assert_request_and_fake_response! { @@ -2134,12 +2141,12 @@ async fn test_room_sorting() -> Result<(), Error> { // Rooms are individually updated. assert_entries_batch! { [stream] - set [ 2 ] [ "!r6:bar.org" ]; + set [ 0 ] [ "!r3:bar.org" ]; end; }; assert_entries_batch! { [stream] - set [ 0 ] [ "!r3:bar.org" ]; + set [ 2 ] [ "!r6:bar.org" ]; end; }; assert_entries_batch! { diff --git a/crates/matrix-sdk/src/event_cache/caches/mod.rs b/crates/matrix-sdk/src/event_cache/caches/mod.rs index 9c4a78012..42adbb4d0 100644 --- a/crates/matrix-sdk/src/event_cache/caches/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/mod.rs @@ -24,7 +24,7 @@ use ruma::{OwnedRoomId, RoomId}; use tokio::sync::{broadcast::Sender, mpsc}; use super::{EventCacheError, Result}; -use crate::{client::WeakClient, event_cache::EventsOrigin}; +use crate::{client::WeakClient, event_cache::EventsOrigin, room::WeakRoom}; pub mod event_focused; pub mod event_linked_chunk; @@ -71,9 +71,12 @@ impl Caches { let own_user_id = client.user_id().expect("the user must be logged in, at this point").to_owned(); + let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned()); + let room_state = room::RoomEventCacheStateLock::new( own_user_id, room_id.to_owned(), + weak_room, room_version_rules, enabled_thread_support, update_sender.clone(), diff --git a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs index 2dae8a88e..3d196d47d 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/mod.rs @@ -528,7 +528,7 @@ impl RoomEventCacheInner { trace!("adding new events"); let (stored_prev_batch_token, timeline_event_diffs) = - self.state.write().await?.handle_sync(timeline).await?; + self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?; // Now that all events have been added, we can trigger the // `pagination_token_notifier`. diff --git a/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs b/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs index ea1f97b63..66226068a 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/pagination.rs @@ -238,9 +238,22 @@ impl PaginatedCache for Arc { &topo_ordered_events, ); + // A back-pagination can't include new read receipt events, as those are + // ephemeral events not included in /messages responses, so we can + // safely set the receipt event to None here. + // + // Note: read receipts may be updated anyhow in the post-processing step, as the + // back-pagination may have revealed the event pointed to by the latest read + // receipt. + let receipt_event = None; + // Note: this flushes updates to the store. state - .post_process_new_events(topo_ordered_events, PostProcessingOrigin::Backpagination) + .post_process_new_events( + topo_ordered_events, + PostProcessingOrigin::Backpagination, + receipt_event, + ) .await?; let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index 735142756..e542f5e32 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -23,7 +23,7 @@ use std::{ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ - apply_redaction, + RoomInfoNotableUpdateReasons, StateChanges, ThreadingSupport, apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus}, event_cache::{ Event, Gap, @@ -40,10 +40,14 @@ use matrix_sdk_common::executor::spawn; use ruma::{ EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, events::{ - AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, - relation::RelationType, room::redaction::SyncRoomRedactionEvent, + AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, + MessageLikeEventType, + receipt::{ReceiptEventContent, SyncReceiptEvent}, + relation::RelationType, + room::redaction::SyncRoomRedactionEvent, }, room_version_rules::RoomVersionRules, + serde::Raw, }; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, error, instrument, trace, warn}; @@ -68,7 +72,11 @@ use super::{ }; use crate::{ Room, - event_cache::{EventFocusThreadMode, caches::event_focused::EventFocusedCache}, + event_cache::{ + EventFocusThreadMode, + caches::{event_focused::EventFocusedCache, read_receipts::compute_unread_counts}, + }, + room::WeakRoom, }; /// Key for the event-focused caches. @@ -87,6 +95,9 @@ pub struct RoomEventCacheState { /// The room this state relates to. pub room_id: OwnedRoomId, + /// A weak reference to the actual room. + pub room: WeakRoom, + /// The user's own user id. pub own_user_id: OwnedUserId, @@ -165,6 +176,7 @@ impl RoomEventCacheStateLock { pub async fn new( own_user_id: OwnedUserId, room_id: OwnedRoomId, + room: WeakRoom, room_version_rules: RoomVersionRules, enabled_thread_support: bool, update_sender: RoomEventCacheUpdateSender, @@ -232,6 +244,7 @@ impl RoomEventCacheStateLock { own_user_id, enabled_thread_support, room_id, + room, store, room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk( linked_chunk, @@ -799,6 +812,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { pub async fn handle_sync( &mut self, mut timeline: Timeline, + ephemeral_events: &[Raw], ) -> Result<(bool, Vec>), EventCacheError> { let mut prev_batch = timeline.prev_batch.take(); @@ -892,7 +906,24 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { .room_linked_chunk .push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events); - self.post_process_new_events(events, PostProcessingOrigin::Sync).await?; + // Extract a new read receipt, if available. + let mut receipt_event = None; + for raw_ephemeral in ephemeral_events { + match raw_ephemeral.deserialize() { + Ok(AnySyncEphemeralRoomEvent::Receipt(SyncReceiptEvent { content, .. })) => { + receipt_event = Some(content); + break; + } + + Ok(_) => {} + + Err(err) => { + error!("error when deserializing an ephemeral event from sync: {err}"); + } + } + } + + self.post_process_new_events(events, PostProcessingOrigin::Sync, receipt_event).await?; if timeline.limited && has_new_gap { // If there was a previous batch token for a limited timeline, unload the chunks @@ -930,6 +961,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { &mut self, events: Vec, post_processing_origin: PostProcessingOrigin, + receipt_event: Option, ) -> Result<(), EventCacheError> { // Update the store before doing the post-processing. self.propagate_changes().await?; @@ -999,6 +1031,78 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { self.update_threads(new_events_by_thread, post_processing_origin).await?; } + self.update_read_receipts(receipt_event.as_ref()).await?; + + Ok(()) + } + + /// Update read receipts for all events in the room, based on the current + /// state of the in-memory linked chunk. + pub async fn update_read_receipts( + &mut self, + receipt_event: Option<&ReceiptEventContent>, + ) -> Result<(), EventCacheError> { + let Some(room) = self.state.room.get() else { + debug!("can't update read receipts: client's closing"); + return Ok(()); + }; + + // TODO(bnjbvr): avoid cloning all events, eventually? :) + let all_events = self + .state + .room_linked_chunk + .events() + .map(|(_, event)| event.clone()) + .collect::>(); + + let user_id = &self.state.own_user_id; + let room_id = &self.state.room_id; + + // TODO(bnjbvr): change the signature of `compute_unread_counts` to take a bool + // instead (future commit in same PR). + let threading_support = if self.state.enabled_thread_support { + ThreadingSupport::Enabled { with_subscriptions: false } + } else { + ThreadingSupport::Disabled + }; + + let mut room_info = room.clone_info(); + let prev_read_receipts = room_info.read_receipts().clone(); + let mut read_receipts = prev_read_receipts.clone(); + + compute_unread_counts( + user_id, + room_id, + receipt_event, + all_events, + &mut read_receipts, + threading_support, + ); + + if prev_read_receipts != read_receipts { + // The read receipt has changed! Do a little dance to update the `RoomInfo` in + // the state store, and then in the room itself, so that observers + // can be notified of the change. + let client = room.client(); + + // Take the state store lock. + let _state_store_lock = client.base_client().state_store_lock().lock().await; + + // Reuse and update the room info from above. + room_info.set_read_receipts(read_receipts); + + let mut state_changes = StateChanges::default(); + state_changes.add_room(room_info.clone()); + + // Update the `RoomInfo` in the state store. + if let Err(error) = client.state_store().save_changes(&state_changes).await { + error!(room_id = ?room.room_id(), ?error, "Failed to save the changes"); + } + + // Update the `RoomInfo` of the room. + room.set_room_info(room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT); + } + Ok(()) } diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 915723766..34b444510 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -403,7 +403,17 @@ impl EventCache { } } - state.post_process_new_events(new_events, PostProcessingOrigin::Redecryption).await?; + // Read receipt events aren't encrypted, so we can't have decrypted a new one + // here. As a result, we don't have any new receipt events to + // post-process, so we can just pass `None` here. + // + // Note: read receipts may be updated anyhow in the post-processing step, as the + // redecryption may have decrypted some events that don't count as unreads. + let receipt_event = None; + + state + .post_process_new_events(new_events, PostProcessingOrigin::Redecryption, receipt_event) + .await?; // We replaced a bunch of events, reactive updates for those replacements have // been queued up. We need to send them out to our subscribers now.