feat(timeline): support threaded read receipts

This commit is contained in:
Benjamin Bouvier
2025-07-08 15:14:57 +02:00
parent 7966dd0544
commit f45c9aa3a7
5 changed files with 146 additions and 68 deletions

View File

@@ -126,6 +126,31 @@ pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
},
}
impl<P: RoomDataProvider> TimelineFocusKind<P> {
/// Returns the [`ReceiptThread`] that should be used for the current
/// timeline focus.
///
/// Live and event timelines will use the unthreaded read receipt type in
/// general, unless they hide in-thread events, in which case they will
/// use the main thread.
pub(super) fn receipt_thread(&self) -> ReceiptThread {
match self {
TimelineFocusKind::Live { hide_threaded_events }
| TimelineFocusKind::Event { hide_threaded_events, .. } => {
if *hide_threaded_events {
ReceiptThread::Main
} else {
ReceiptThread::Unthreaded
}
}
TimelineFocusKind::Thread { root_event_id } => {
ReceiptThread::Thread(root_event_id.clone())
}
TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded,
}
}
}
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
/// Inner mutable state.
@@ -1196,7 +1221,13 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
let receipt_thread = self.focus.receipt_thread();
self.state
.read()
.await
.latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
.await
}
/// Get the ID of the timeline event with the latest read receipt for the
@@ -1460,22 +1491,9 @@ impl TimelineController {
receipt_type: &SendReceiptType,
) -> ReceiptThread {
if matches!(receipt_type, SendReceiptType::FullyRead) {
return ReceiptThread::Unthreaded;
}
match &*self.focus {
TimelineFocusKind::Live { hide_threaded_events }
| TimelineFocusKind::Event { hide_threaded_events, .. } => {
if *hide_threaded_events {
ReceiptThread::Main
} else {
ReceiptThread::Unthreaded
}
}
TimelineFocusKind::Thread { root_event_id, .. } => {
ReceiptThread::Thread(root_event_id.to_owned())
}
TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded,
ReceiptThread::Unthreaded
} else {
self.focus.receipt_thread()
}
}
@@ -1485,14 +1503,9 @@ impl TimelineController {
pub(super) async fn should_send_receipt(
&self,
receipt_type: &SendReceiptType,
thread: &ReceiptThread,
receipt_thread: &ReceiptThread,
event_id: &EventId,
) -> bool {
// We don't support threaded receipts yet.
if *thread != ReceiptThread::Unthreaded {
return true;
}
let own_user_id = self.room().own_user_id();
let state = self.state.read().await;
let room = self.room();
@@ -1504,6 +1517,7 @@ impl TimelineController {
.user_receipt(
own_user_id,
ReceiptType::Read,
receipt_thread.clone(),
room,
state.items.all_remote_events(),
)
@@ -1522,11 +1536,12 @@ impl TimelineController {
}
}
}
// Implicit read receipts are saved as public read receipts, so get the latest. It also
// doesn't make sense to have a private read receipt behind a public one.
SendReceiptType::ReadPrivate => {
if let Some((old_priv_read, _)) =
state.latest_user_read_receipt(own_user_id, room).await
state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
{
trace!(%old_priv_read, "found a previous private receipt");
if let Some(relative_pos) = state.meta.compare_events_positions(
@@ -1541,6 +1556,7 @@ impl TimelineController {
}
}
}
SendReceiptType::FullyRead => {
if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
{
@@ -1553,6 +1569,7 @@ impl TimelineController {
}
}
}
_ => {}
}

View File

@@ -503,15 +503,28 @@ impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
own_user_id: &UserId,
) {
trace!("handling explicit read receipts");
let own_receipt_thread = self.focus.receipt_thread();
for (event_id, receipt_types) in receipt_event_content.0 {
for (receipt_type, receipts) in receipt_types {
// We only care about read receipts here.
// Discard the read marker updates in this function.
if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
continue;
}
for (user_id, receipt) in receipts {
if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
if own_receipt_thread == ReceiptThread::Unthreaded {
// If the own receipt thread is unthreaded, we maintain maximal
// compatibility with clients using either unthreaded or main-thread read
// receipts by allowing both here.
if !matches!(
receipt.thread,
ReceiptThread::Unthreaded | ReceiptThread::Main
) {
continue;
}
} else if own_receipt_thread != receipt.thread {
// Otherwise, we only keep the receipts of the same thread kind.
continue;
}
@@ -542,7 +555,9 @@ impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
room_data_provider: &P,
) {
trace!(%event_id, "loading initial receipts for an event");
let read_receipts = room_data_provider.load_event_receipts(event_id).await;
let receipt_thread = self.focus.receipt_thread();
let read_receipts = room_data_provider.load_event_receipts(event_id, receipt_thread).await;
let own_user_id = room_data_provider.own_user_id();
// Since they are explicit read receipts, we need to check if they are
@@ -582,12 +597,16 @@ impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
return;
};
trace!(%event_id, "adding implicit read receipt");
let receipt = Receipt::new(timestamp);
trace!(%user_id, %event_id, "adding implicit read receipt");
let mut receipt = Receipt::new(timestamp);
receipt.thread = self.focus.receipt_thread();
let full_receipt =
FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
self.meta.read_receipts.maybe_update_read_receipt(
full_receipt,
is_own_event,
@@ -659,12 +678,15 @@ impl<P: RoomDataProvider> TimelineState<P> {
) {
let own_user_id = room_data_provider.own_user_id().to_owned();
let receipt_thread = self.focus.receipt_thread();
let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded;
let mut read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
.load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id)
.await;
// Fallback to the one in the main thread.
if read_receipt.is_none() {
if wants_unthreaded_receipts && read_receipt.is_none() {
// Fallback to the one in the main thread.
read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
.await;
@@ -681,21 +703,36 @@ impl<P: RoomDataProvider> TimelineState<P> {
pub(super) async fn latest_user_read_receipt(
&self,
user_id: &UserId,
receipt_thread: ReceiptThread,
room_data_provider: &P,
) -> Option<(OwnedEventId, Receipt)> {
let all_remote_events = self.items.all_remote_events();
let public_read_receipt = self
.meta
.user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
.user_receipt(
user_id,
ReceiptType::Read,
receipt_thread.clone(),
room_data_provider,
all_remote_events,
)
.await;
let private_read_receipt = self
.meta
.user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
.user_receipt(
user_id,
ReceiptType::ReadPrivate,
receipt_thread,
room_data_provider,
all_remote_events,
)
.await;
// Let's assume that a private read receipt should be more recent than a public
// read receipt, otherwise there's no point in the private read receipt,
// and use it as default.
// read receipt (otherwise there's no point in the private read receipt),
// and use it as the default.
match self.meta.compare_optional_receipts(
public_read_receipt.as_ref(),
private_read_receipt.as_ref(),
@@ -749,10 +786,16 @@ impl TimelineMetadata {
///
/// This will attempt to read the latest user receipt for a user from the
/// cache, or load it from the storage if missing from the cache.
///
/// If the `ReceiptThread` is `Unthreaded`, it will try to find either the
/// unthreaded or the main-thread read receipt, to be maximally
/// compatible with clients using one or the other. Otherwise, it will
/// select only the receipts for that specific thread.
pub(super) async fn user_receipt<P: RoomDataProvider>(
&self,
user_id: &UserId,
receipt_type: ReceiptType,
receipt_thread: ReceiptThread,
room_data_provider: &P,
all_remote_events: &AllRemoteEvents,
) -> Option<(OwnedEventId, Receipt)> {
@@ -761,24 +804,35 @@ impl TimelineMetadata {
return Some(receipt.clone());
}
let unthreaded_read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
.await;
if receipt_thread == ReceiptThread::Unthreaded {
// Maintain compatibility with clients using either the unthreaded and main read
// receipts, and try to find the most recent one.
let unthreaded_read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
.await;
let main_thread_read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
.await;
let main_thread_read_receipt = room_data_provider
.load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
.await;
// Let's use the unthreaded read receipt as default, since it's the one we
// should be using.
match self.compare_optional_receipts(
main_thread_read_receipt.as_ref(),
unthreaded_read_receipt.as_ref(),
all_remote_events,
) {
Ordering::Greater => main_thread_read_receipt,
Ordering::Less => unthreaded_read_receipt,
_ => unreachable!(),
// Let's use the unthreaded read receipt as default, since it's the one we
// should be using.
match self.compare_optional_receipts(
main_thread_read_receipt.as_ref(),
unthreaded_read_receipt.as_ref(),
all_remote_events,
) {
Ordering::Greater => main_thread_read_receipt,
Ordering::Less => unthreaded_read_receipt,
_ => unreachable!(),
}
} else {
// In all the other cases, use the thread's read receipt. A main-thread receipt
// in particular will use this code path, and not be compatible with
// an unthreaded read receipt.
room_data_provider
.load_user_receipt(receipt_type.clone(), receipt_thread, user_id)
.await
}
}

View File

@@ -46,7 +46,7 @@ pub(in crate::timeline) struct TimelineState<P: RoomDataProvider> {
pub meta: TimelineMetadata,
/// The kind of focus of this timeline.
focus: Arc<TimelineFocusKind<P>>,
pub(super) focus: Arc<TimelineFocusKind<P>>,
}
impl<P: RoomDataProvider> TimelineState<P> {

View File

@@ -382,6 +382,7 @@ impl RoomDataProvider for TestRoomDataProvider {
async fn load_event_receipts<'a>(
&'a self,
event_id: &'a EventId,
_receipt_thread: ReceiptThread,
) -> IndexMap<OwnedUserId, Receipt> {
let mut map = IndexMap::new();

View File

@@ -113,6 +113,7 @@ pub(super) trait RoomDataProvider:
fn load_event_receipts<'a>(
&'a self,
event_id: &'a EventId,
receipt_thread: ReceiptThread,
) -> impl Future<Output = IndexMap<OwnedUserId, Receipt>> + SendOutsideWasm + 'a;
/// Load the current fully-read event id, from storage.
@@ -215,31 +216,36 @@ impl RoomDataProvider for Room {
async fn load_event_receipts<'a>(
&'a self,
event_id: &'a EventId,
receipt_thread: ReceiptThread,
) -> IndexMap<OwnedUserId, Receipt> {
let mut unthreaded_receipts = match self
.load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id)
let mut result = match self
.load_event_receipts(ReceiptType::Read, receipt_thread.clone(), event_id)
.await
{
Ok(receipts) => receipts.into_iter().collect(),
Err(e) => {
error!(?event_id, "Failed to get unthreaded read receipts for event: {e}");
error!(?event_id, ?receipt_thread, "Failed to get read receipts for event: {e}");
IndexMap::new()
}
};
let main_thread_receipts = match self
.load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await
{
Ok(receipts) => receipts,
Err(e) => {
error!(?event_id, "Failed to get main thread read receipts for event: {e}");
Vec::new()
}
};
if receipt_thread == ReceiptThread::Unthreaded {
// Include the main thread receipts as well, to be maximally compatible with
// clients using either the unthreaded or main thread receipt type.
let main_thread_receipts = match self
.load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await
{
Ok(receipts) => receipts,
Err(e) => {
error!(?event_id, "Failed to get main thread read receipts for event: {e}");
Vec::new()
}
};
result.extend(main_thread_receipts);
}
unthreaded_receipts.extend(main_thread_receipts);
unthreaded_receipts
result
}
async fn push_context(&self) -> Option<PushContext> {