feat(event cache): compute read receipts in the event cache (part 2)

This is a basic implementation, but it already unlocks improvements (for
example, the unread count is now updated whenever a UTD has been
resolved), and it paves the way for future improvements (notably with
respect to performance).
This commit is contained in:
Benjamin Bouvier
2026-03-05 17:28:23 +01:00
parent 627e118ef3
commit a0ce0cfaf2
7 changed files with 175 additions and 28 deletions

View File

@@ -1103,6 +1103,16 @@ impl RoomInfo {
.is_some_and(|pinned| pinned.contains(&event_id.to_owned()))
}
/// Returns the computed read receipts for this room.
///
/// This is a cheap borrowing accessor; callers needing an owned value
/// should clone the result.
pub fn read_receipts(&self) -> &RoomReadReceipts {
    &self.read_receipts
}
/// Set the computed read receipts for this room.
///
/// The previous value is dropped unconditionally; no change detection is
/// performed here.
pub fn set_read_receipts(&mut self, receipts: RoomReadReceipts) {
    // Swap the new value in; the returned previous value is discarded,
    // which is observably identical to a plain assignment.
    let _ = std::mem::replace(&mut self.read_receipts, receipts);
}
/// Apply migrations to this `RoomInfo` if needed.
///
/// This should be used to populate new fields with data from the state

View File

@@ -1971,6 +1971,16 @@ async fn test_room_sorting() -> Result<(), Error> {
end;
};
// All rooms get new messages, so their entries will get updates because of read
// receipt updates.
//
// Starting with r0.
assert_entries_batch! {
[stream]
set [ 0 ] [ "!r0:bar.org" ];
end;
};
// Now we have:
//
// | index | room ID | recency | name |
@@ -1988,6 +1998,13 @@ async fn test_room_sorting() -> Result<(), Error> {
end;
};
// Read receipt update for r1.
assert_entries_batch! {
[stream]
set [ 1 ] [ "!r1:bar.org" ];
end;
};
// Now we have:
//
// | index | room ID | recency | name |
@@ -2005,6 +2022,13 @@ async fn test_room_sorting() -> Result<(), Error> {
end;
};
// Read receipt update for r2.
assert_entries_batch! {
[stream]
set [ 0 ] [ "!r2:bar.org" ];
end;
};
// Now we have:
//
// | index | room ID | recency | name |
@@ -2015,23 +2039,6 @@ async fn test_room_sorting() -> Result<(), Error> {
// | 3 | !r4 | 5 | |
// | 4 | !r3 | 4 | |
// Rooms are individually updated.
assert_entries_batch! {
[stream]
set [ 1 ] [ "!r0:bar.org" ];
end;
};
assert_entries_batch! {
[stream]
set [ 2 ] [ "!r1:bar.org" ];
end;
};
assert_entries_batch! {
[stream]
set [ 0 ] [ "!r2:bar.org" ];
end;
};
assert_pending!(stream);
sync_then_assert_request_and_fake_response! {
@@ -2134,12 +2141,12 @@ async fn test_room_sorting() -> Result<(), Error> {
// Rooms are individually updated.
assert_entries_batch! {
[stream]
set [ 2 ] [ "!r6:bar.org" ];
set [ 0 ] [ "!r3:bar.org" ];
end;
};
assert_entries_batch! {
[stream]
set [ 0 ] [ "!r3:bar.org" ];
set [ 2 ] [ "!r6:bar.org" ];
end;
};
assert_entries_batch! {

View File

@@ -24,7 +24,7 @@ use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{broadcast::Sender, mpsc};
use super::{EventCacheError, Result};
use crate::{client::WeakClient, event_cache::EventsOrigin};
use crate::{client::WeakClient, event_cache::EventsOrigin, room::WeakRoom};
pub mod event_focused;
pub mod event_linked_chunk;
@@ -71,9 +71,12 @@ impl Caches {
let own_user_id =
client.user_id().expect("the user must be logged in, at this point").to_owned();
let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
let room_state = room::RoomEventCacheStateLock::new(
own_user_id,
room_id.to_owned(),
weak_room,
room_version_rules,
enabled_thread_support,
update_sender.clone(),

View File

@@ -528,7 +528,7 @@ impl RoomEventCacheInner {
trace!("adding new events");
let (stored_prev_batch_token, timeline_event_diffs) =
self.state.write().await?.handle_sync(timeline).await?;
self.state.write().await?.handle_sync(timeline, &ephemeral_events).await?;
// Now that all events have been added, we can trigger the
// `pagination_token_notifier`.

View File

@@ -238,9 +238,22 @@ impl PaginatedCache for Arc<RoomEventCacheInner> {
&topo_ordered_events,
);
// A back-pagination can't include new read receipt events, as those are
// ephemeral events not included in /messages responses, so we can
// safely set the receipt event to None here.
//
// Note: read receipts may be updated anyhow in the post-processing step, as the
// back-pagination may have revealed the event pointed to by the latest read
// receipt.
let receipt_event = None;
// Note: this flushes updates to the store.
state
.post_process_new_events(topo_ordered_events, PostProcessingOrigin::Backpagination)
.post_process_new_events(
topo_ordered_events,
PostProcessingOrigin::Backpagination,
receipt_event,
)
.await?;
let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

View File

@@ -23,7 +23,7 @@ use std::{
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
apply_redaction,
RoomInfoNotableUpdateReasons, StateChanges, ThreadingSupport, apply_redaction,
deserialized_responses::{ThreadSummary, ThreadSummaryStatus},
event_cache::{
Event, Gap,
@@ -40,10 +40,14 @@ use matrix_sdk_common::executor::spawn;
use ruma::{
EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId,
events::{
AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
relation::RelationType, room::redaction::SyncRoomRedactionEvent,
AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
MessageLikeEventType,
receipt::{ReceiptEventContent, SyncReceiptEvent},
relation::RelationType,
room::redaction::SyncRoomRedactionEvent,
},
room_version_rules::RoomVersionRules,
serde::Raw,
};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::{debug, error, instrument, trace, warn};
@@ -68,7 +72,11 @@ use super::{
};
use crate::{
Room,
event_cache::{EventFocusThreadMode, caches::event_focused::EventFocusedCache},
event_cache::{
EventFocusThreadMode,
caches::{event_focused::EventFocusedCache, read_receipts::compute_unread_counts},
},
room::WeakRoom,
};
/// Key for the event-focused caches.
@@ -87,6 +95,9 @@ pub struct RoomEventCacheState {
/// The room this state relates to.
pub room_id: OwnedRoomId,
/// A weak reference to the actual room.
pub room: WeakRoom,
/// The user's own user id.
pub own_user_id: OwnedUserId,
@@ -165,6 +176,7 @@ impl RoomEventCacheStateLock {
pub async fn new(
own_user_id: OwnedUserId,
room_id: OwnedRoomId,
room: WeakRoom,
room_version_rules: RoomVersionRules,
enabled_thread_support: bool,
update_sender: RoomEventCacheUpdateSender,
@@ -232,6 +244,7 @@ impl RoomEventCacheStateLock {
own_user_id,
enabled_thread_support,
room_id,
room,
store,
room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
linked_chunk,
@@ -799,6 +812,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
pub async fn handle_sync(
&mut self,
mut timeline: Timeline,
ephemeral_events: &[Raw<AnySyncEphemeralRoomEvent>],
) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
let mut prev_batch = timeline.prev_batch.take();
@@ -892,7 +906,24 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
.room_linked_chunk
.push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events);
self.post_process_new_events(events, PostProcessingOrigin::Sync).await?;
// Extract a new read receipt, if available.
let mut receipt_event = None;
for raw_ephemeral in ephemeral_events {
match raw_ephemeral.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(SyncReceiptEvent { content, .. })) => {
receipt_event = Some(content);
break;
}
Ok(_) => {}
Err(err) => {
error!("error when deserializing an ephemeral event from sync: {err}");
}
}
}
self.post_process_new_events(events, PostProcessingOrigin::Sync, receipt_event).await?;
if timeline.limited && has_new_gap {
// If there was a previous batch token for a limited timeline, unload the chunks
@@ -930,6 +961,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
&mut self,
events: Vec<Event>,
post_processing_origin: PostProcessingOrigin,
receipt_event: Option<ReceiptEventContent>,
) -> Result<(), EventCacheError> {
// Update the store before doing the post-processing.
self.propagate_changes().await?;
@@ -999,6 +1031,78 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
self.update_threads(new_events_by_thread, post_processing_origin).await?;
}
self.update_read_receipts(receipt_event.as_ref()).await?;
Ok(())
}
/// Update read receipts for all events in the room, based on the current
/// state of the in-memory linked chunk.
///
/// The recomputed receipts are compared against the ones currently stored
/// in the room's `RoomInfo`; only when they differ is the `RoomInfo`
/// persisted to the state store and pushed back onto the room (notifying
/// observers with `RoomInfoNotableUpdateReasons::READ_RECEIPT`).
///
/// # Arguments
///
/// * `receipt_event` - the content of a freshly received `m.receipt`
///   ephemeral event, if any; `None` when no new receipt event is
///   available (e.g. back-pagination or re-decryption).
///
/// # Errors
///
/// Returns `Ok(())` even when saving to the state store fails: that
/// failure is only logged, not propagated.
pub async fn update_read_receipts(
    &mut self,
    receipt_event: Option<&ReceiptEventContent>,
) -> Result<(), EventCacheError> {
    // Upgrade the weak room reference; if the client is shutting down,
    // there is nothing sensible to update, so bail out silently.
    let Some(room) = self.state.room.get() else {
        debug!("can't update read receipts: client's closing");
        return Ok(());
    };

    // Snapshot all events from the in-memory linked chunk; the unread
    // computation needs the full, topologically ordered timeline.
    // TODO(bnjbvr): avoid cloning all events, eventually? :)
    let all_events = self
        .state
        .room_linked_chunk
        .events()
        .map(|(_, event)| event.clone())
        .collect::<Vec<_>>();

    let user_id = &self.state.own_user_id;
    let room_id = &self.state.room_id;

    // Map the cache's boolean flag onto the `ThreadingSupport` enum that
    // `compute_unread_counts` expects; subscriptions are not used here.
    // TODO(bnjbvr): change the signature of `compute_unread_counts` to take a bool
    // instead (future commit in same PR).
    let threading_support = if self.state.enabled_thread_support {
        ThreadingSupport::Enabled { with_subscriptions: false }
    } else {
        ThreadingSupport::Disabled
    };

    // Clone the current `RoomInfo` and keep the previous receipts around,
    // so we can detect whether the recomputation changed anything.
    //
    // NOTE(review): `room_info` is cloned *before* the state store lock is
    // taken below — presumably concurrent `RoomInfo` writers are excluded
    // by a higher-level invariant; confirm.
    let mut room_info = room.clone_info();
    let prev_read_receipts = room_info.read_receipts().clone();

    // Start from the previous receipts; `compute_unread_counts` mutates
    // them in place.
    let mut read_receipts = prev_read_receipts.clone();

    compute_unread_counts(
        user_id,
        room_id,
        receipt_event,
        all_events,
        &mut read_receipts,
        threading_support,
    );

    if prev_read_receipts != read_receipts {
        // The read receipt has changed! Do a little dance to update the `RoomInfo` in
        // the state store, and then in the room itself, so that observers
        // can be notified of the change.
        let client = room.client();

        // Take the state store lock, so the save below doesn't race with
        // other state-store writers.
        let _state_store_lock = client.base_client().state_store_lock().lock().await;

        // Reuse and update the room info from above.
        room_info.set_read_receipts(read_receipts);

        let mut state_changes = StateChanges::default();
        state_changes.add_room(room_info.clone());

        // Update the `RoomInfo` in the state store. A failure here is
        // logged but deliberately not propagated: the in-memory update
        // below still proceeds.
        if let Err(error) = client.state_store().save_changes(&state_changes).await {
            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
        }

        // Update the `RoomInfo` of the room, notifying observers of the
        // read-receipt change.
        room.set_room_info(room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT);
    }

    Ok(())
}

View File

@@ -403,7 +403,17 @@ impl EventCache {
}
}
state.post_process_new_events(new_events, PostProcessingOrigin::Redecryption).await?;
// Read receipt events aren't encrypted, so we can't have decrypted a new one
// here. As a result, we don't have any new receipt events to
// post-process, so we can just pass `None` here.
//
// Note: read receipts may be updated anyhow in the post-processing step, as the
// redecryption may have decrypted some events that don't count as unreads.
let receipt_event = None;
state
.post_process_new_events(new_events, PostProcessingOrigin::Redecryption, receipt_event)
.await?;
// We replaced a bunch of events, reactive updates for those replacements have
// been queued up. We need to send them out to our subscribers now.