From f45c9aa3a7d7d356307e085802c8f855f310fa7d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 8 Jul 2025 15:14:57 +0200 Subject: [PATCH] feat(timeline): support threaded read receipts --- .../src/timeline/controller/mod.rs | 65 +++++++---- .../src/timeline/controller/read_receipts.rs | 110 +++++++++++++----- .../src/timeline/controller/state.rs | 2 +- .../matrix-sdk-ui/src/timeline/tests/mod.rs | 1 + crates/matrix-sdk-ui/src/timeline/traits.rs | 36 +++--- 5 files changed, 146 insertions(+), 68 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index cc55c861a..e36228b92 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -126,6 +126,31 @@ pub(in crate::timeline) enum TimelineFocusKind { }, } +impl TimelineFocusKind

{ + /// Returns the [`ReceiptThread`] that should be used for the current + /// timeline focus. + /// + /// Live and event timelines will use the unthreaded read receipt type in + /// general, unless they hide in-thread events, in which case they will + /// use the main thread. + pub(super) fn receipt_thread(&self) -> ReceiptThread { + match self { + TimelineFocusKind::Live { hide_threaded_events } + | TimelineFocusKind::Event { hide_threaded_events, .. } => { + if *hide_threaded_events { + ReceiptThread::Main + } else { + ReceiptThread::Unthreaded + } + } + TimelineFocusKind::Thread { root_event_id } => { + ReceiptThread::Thread(root_event_id.clone()) + } + TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded, + } + } +} + #[derive(Clone, Debug)] pub(super) struct TimelineController { /// Inner mutable state. @@ -1196,7 +1221,13 @@ impl TimelineController { &self, user_id: &UserId, ) -> Option<(OwnedEventId, Receipt)> { - self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await + let receipt_thread = self.focus.receipt_thread(); + + self.state + .read() + .await + .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider) + .await } /// Get the ID of the timeline event with the latest read receipt for the @@ -1460,22 +1491,9 @@ impl TimelineController { receipt_type: &SendReceiptType, ) -> ReceiptThread { if matches!(receipt_type, SendReceiptType::FullyRead) { - return ReceiptThread::Unthreaded; - } - - match &*self.focus { - TimelineFocusKind::Live { hide_threaded_events } - | TimelineFocusKind::Event { hide_threaded_events, .. } => { - if *hide_threaded_events { - ReceiptThread::Main - } else { - ReceiptThread::Unthreaded - } - } - TimelineFocusKind::Thread { root_event_id, .. } => { - ReceiptThread::Thread(root_event_id.to_owned()) - } - TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded, + ReceiptThread::Unthreaded + } else { + self.focus.receipt_thread() } } @@ -1485,14 +1503,9 @@ impl TimelineController { pub(super) async fn should_send_receipt( &self, receipt_type: &SendReceiptType, - thread: &ReceiptThread, + receipt_thread: &ReceiptThread, event_id: &EventId, ) -> bool { - // We don't support threaded receipts yet. - if *thread != ReceiptThread::Unthreaded { - return true; - } - let own_user_id = self.room().own_user_id(); let state = self.state.read().await; let room = self.room(); @@ -1504,6 +1517,7 @@ impl TimelineController { .user_receipt( own_user_id, ReceiptType::Read, + receipt_thread.clone(), room, state.items.all_remote_events(), ) @@ -1522,11 +1536,12 @@ impl TimelineController { } } } + // Implicit read receipts are saved as public read receipts, so get the latest. It also // doesn't make sense to have a private read receipt behind a public one. SendReceiptType::ReadPrivate => { if let Some((old_priv_read, _)) = - state.latest_user_read_receipt(own_user_id, room).await + state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await { trace!(%old_priv_read, "found a previous private receipt"); if let Some(relative_pos) = state.meta.compare_events_positions( @@ -1541,6 +1556,7 @@ impl TimelineController { } } } + SendReceiptType::FullyRead => { if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await { @@ -1553,6 +1569,7 @@ impl TimelineController { } } } + _ => {} } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/read_receipts.rs b/crates/matrix-sdk-ui/src/timeline/controller/read_receipts.rs index 32cd749bd..d618cc87a 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/read_receipts.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/read_receipts.rs @@ -503,15 +503,28 @@ impl TimelineStateTransaction<'_, P> { own_user_id: &UserId, ) { trace!("handling explicit read receipts"); + let own_receipt_thread = self.focus.receipt_thread(); + for (event_id, receipt_types) in receipt_event_content.0 { for (receipt_type, receipts) in receipt_types { - // We only care about read receipts here. + // Discard the read marker updates in this function. if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) { continue; } for (user_id, receipt) in receipts { - if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) { + if own_receipt_thread == ReceiptThread::Unthreaded { + // If the own receipt thread is unthreaded, we maintain maximal + // compatibility with clients using either unthreaded or main-thread read + // receipts by allowing both here. + if !matches!( + receipt.thread, + ReceiptThread::Unthreaded | ReceiptThread::Main + ) { + continue; + } + } else if own_receipt_thread != receipt.thread { + // Otherwise, we only keep the receipts of the same thread kind. continue; } @@ -542,7 +555,9 @@ impl TimelineStateTransaction<'_, P> { room_data_provider: &P, ) { trace!(%event_id, "loading initial receipts for an event"); - let read_receipts = room_data_provider.load_event_receipts(event_id).await; + + let receipt_thread = self.focus.receipt_thread(); + let read_receipts = room_data_provider.load_event_receipts(event_id, receipt_thread).await; let own_user_id = room_data_provider.own_user_id(); // Since they are explicit read receipts, we need to check if they are @@ -582,12 +597,16 @@ impl TimelineStateTransaction<'_, P> { return; }; - trace!(%event_id, "adding implicit read receipt"); - let receipt = Receipt::new(timestamp); + trace!(%user_id, %event_id, "adding implicit read receipt"); + + let mut receipt = Receipt::new(timestamp); + receipt.thread = self.focus.receipt_thread(); + let full_receipt = FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt }; let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id); + self.meta.read_receipts.maybe_update_read_receipt( full_receipt, is_own_event, @@ -659,12 +678,15 @@ impl TimelineState

{ ) { let own_user_id = room_data_provider.own_user_id().to_owned(); + let receipt_thread = self.focus.receipt_thread(); + let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded; + let mut read_receipt = room_data_provider - .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id) + .load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id) .await; - // Fallback to the one in the main thread. - if read_receipt.is_none() { + if wants_unthreaded_receipts && read_receipt.is_none() { + // Fallback to the one in the main thread. read_receipt = room_data_provider .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id) .await; @@ -681,21 +703,36 @@ impl TimelineState

{ pub(super) async fn latest_user_read_receipt( &self, user_id: &UserId, + receipt_thread: ReceiptThread, room_data_provider: &P, ) -> Option<(OwnedEventId, Receipt)> { let all_remote_events = self.items.all_remote_events(); + let public_read_receipt = self .meta - .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events) + .user_receipt( + user_id, + ReceiptType::Read, + receipt_thread.clone(), + room_data_provider, + all_remote_events, + ) .await; + let private_read_receipt = self .meta - .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events) + .user_receipt( + user_id, + ReceiptType::ReadPrivate, + receipt_thread, + room_data_provider, + all_remote_events, + ) .await; // Let's assume that a private read receipt should be more recent than a public - // read receipt, otherwise there's no point in the private read receipt, - // and use it as default. + // read receipt (otherwise there's no point in the private read receipt), + // and use it as the default. match self.meta.compare_optional_receipts( public_read_receipt.as_ref(), private_read_receipt.as_ref(), @@ -749,10 +786,16 @@ impl TimelineMetadata { /// /// This will attempt to read the latest user receipt for a user from the /// cache, or load it from the storage if missing from the cache. + /// + /// If the `ReceiptThread` is `Unthreaded`, it will try to find either the + /// unthreaded or the main-thread read receipt, to be maximally + /// compatible with clients using one or the other. Otherwise, it will + /// select only the receipts for that specific thread. pub(super) async fn user_receipt( &self, user_id: &UserId, receipt_type: ReceiptType, + receipt_thread: ReceiptThread, room_data_provider: &P, all_remote_events: &AllRemoteEvents, ) -> Option<(OwnedEventId, Receipt)> { @@ -761,24 +804,35 @@ impl TimelineMetadata { return Some(receipt.clone()); } - let unthreaded_read_receipt = room_data_provider - .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id) - .await; + if receipt_thread == ReceiptThread::Unthreaded { + // Maintain compatibility with clients using either the unthreaded and main read + // receipts, and try to find the most recent one. + let unthreaded_read_receipt = room_data_provider + .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id) + .await; - let main_thread_read_receipt = room_data_provider - .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id) - .await; + let main_thread_read_receipt = room_data_provider + .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id) + .await; - // Let's use the unthreaded read receipt as default, since it's the one we - // should be using. - match self.compare_optional_receipts( - main_thread_read_receipt.as_ref(), - unthreaded_read_receipt.as_ref(), - all_remote_events, - ) { - Ordering::Greater => main_thread_read_receipt, - Ordering::Less => unthreaded_read_receipt, - _ => unreachable!(), + // Let's use the unthreaded read receipt as default, since it's the one we + // should be using. + match self.compare_optional_receipts( + main_thread_read_receipt.as_ref(), + unthreaded_read_receipt.as_ref(), + all_remote_events, + ) { + Ordering::Greater => main_thread_read_receipt, + Ordering::Less => unthreaded_read_receipt, + _ => unreachable!(), + } + } else { + // In all the other cases, use the thread's read receipt. A main-thread receipt + // in particular will use this code path, and not be compatible with + // an unthreaded read receipt. + room_data_provider + .load_user_receipt(receipt_type.clone(), receipt_thread, user_id) + .await } } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index e753b30d1..0d8d12bbf 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -46,7 +46,7 @@ pub(in crate::timeline) struct TimelineState { pub meta: TimelineMetadata, /// The kind of focus of this timeline. - focus: Arc>, + pub(super) focus: Arc>, } impl TimelineState

{ diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index 1903e8224..b6785934f 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -382,6 +382,7 @@ impl RoomDataProvider for TestRoomDataProvider { async fn load_event_receipts<'a>( &'a self, event_id: &'a EventId, + _receipt_thread: ReceiptThread, ) -> IndexMap { let mut map = IndexMap::new(); diff --git a/crates/matrix-sdk-ui/src/timeline/traits.rs b/crates/matrix-sdk-ui/src/timeline/traits.rs index 458979ab0..7dc259c58 100644 --- a/crates/matrix-sdk-ui/src/timeline/traits.rs +++ b/crates/matrix-sdk-ui/src/timeline/traits.rs @@ -113,6 +113,7 @@ pub(super) trait RoomDataProvider: fn load_event_receipts<'a>( &'a self, event_id: &'a EventId, + receipt_thread: ReceiptThread, ) -> impl Future> + SendOutsideWasm + 'a; /// Load the current fully-read event id, from storage. @@ -215,31 +216,36 @@ impl RoomDataProvider for Room { async fn load_event_receipts<'a>( &'a self, event_id: &'a EventId, + receipt_thread: ReceiptThread, ) -> IndexMap { - let mut unthreaded_receipts = match self - .load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id) + let mut result = match self + .load_event_receipts(ReceiptType::Read, receipt_thread.clone(), event_id) .await { Ok(receipts) => receipts.into_iter().collect(), Err(e) => { - error!(?event_id, "Failed to get unthreaded read receipts for event: {e}"); + error!(?event_id, ?receipt_thread, "Failed to get read receipts for event: {e}"); IndexMap::new() } }; - let main_thread_receipts = match self - .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id) - .await - { - Ok(receipts) => receipts, - Err(e) => { - error!(?event_id, "Failed to get main thread read receipts for event: {e}"); - Vec::new() - } - }; + if receipt_thread == ReceiptThread::Unthreaded { + // Include the main thread receipts as well, to be maximally compatible with + // clients using either the unthreaded or main thread receipt type. + let main_thread_receipts = match self + .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id) + .await + { + Ok(receipts) => receipts, + Err(e) => { + error!(?event_id, "Failed to get main thread read receipts for event: {e}"); + Vec::new() + } + }; + result.extend(main_thread_receipts); + } - unthreaded_receipts.extend(main_thread_receipts); - unthreaded_receipts + result } async fn push_context(&self) -> Option {