mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-19 14:19:06 -04:00
feat(timeline): expose the user's threaded receipt on each thread summary
This is half of the work: this will load the threaded receipt for each thread, every time we add/update a timeline item for an event that had a thread summary. Since we don't know which of the private or the public receipt is the most advanced, we simply pass both, to start with; it's expected that this code dies later, when we fold it in into the event cache. The second half will consist in updating the thread summaries when a new read receipt event happens.
This commit is contained in:
@@ -255,6 +255,11 @@ pub struct PollAnswer {
|
||||
pub struct ThreadSummary {
|
||||
pub latest_event: EmbeddedEventDetails,
|
||||
pub num_replies: u32,
|
||||
/// The user's own public read receipt event id, for this particular thread.
|
||||
pub public_read_receipt_event_id: Option<String>,
|
||||
/// The user's own private read receipt event id, for this particular
|
||||
/// thread.
|
||||
pub private_read_receipt_event_id: Option<String>,
|
||||
}
|
||||
|
||||
#[matrix_sdk_ffi_macros::export]
|
||||
@@ -273,6 +278,10 @@ impl From<matrix_sdk_ui::timeline::ThreadSummary> for ThreadSummary {
|
||||
Self {
|
||||
latest_event: EmbeddedEventDetails::from(value.latest_event),
|
||||
num_replies: value.num_replies,
|
||||
public_read_receipt_event_id: value.public_read_receipt_event_id.map(|v| v.to_string()),
|
||||
private_read_receipt_event_id: value
|
||||
.private_read_receipt_event_id
|
||||
.map(|v| v.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ use matrix_sdk::deserialized_responses::{
|
||||
};
|
||||
use ruma::{
|
||||
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
|
||||
events::AnySyncTimelineEvent, push::Action, serde::Raw,
|
||||
events::{AnySyncTimelineEvent, receipt::{ReceiptThread, ReceiptType}},
|
||||
push::Action,
|
||||
serde::Raw,
|
||||
};
|
||||
use tracing::{debug, instrument, trace, warn};
|
||||
|
||||
@@ -692,9 +694,47 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Load the public and private read receipts for the user, in the thread. In the
|
||||
// future, we might move this code in the event cache, so that read receipt
|
||||
// handling happens there instead.
|
||||
|
||||
// As an exception to handle the "implicit" read receipt (which is the latest event
|
||||
// sent by the user): if the latest event has been sent by the current user, then we
|
||||
// consider that as a read receipt.
|
||||
let (own_thread_public_receipt, own_thread_private_receipt) = if settings.track_read_receipts.is_enabled() {
|
||||
if let Some(ref latest_reply) = summary.latest_reply &&
|
||||
let Some(event) = RoomDataProvider::load_event(room_data_provider, latest_reply)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
warn!("Failed to load thread latest event: {err}");
|
||||
})
|
||||
.ok() &&
|
||||
// Parse the sender.
|
||||
let Ok(Some(sender)) = event.raw().get_field::<OwnedUserId>("sender") &&
|
||||
sender == self.meta.own_user_id {
|
||||
let latest = Some(latest_reply.clone());
|
||||
(latest.clone(), latest)
|
||||
} else {
|
||||
let own_thread_public_receipt =
|
||||
if let Some(event_id) = event.event_id() {
|
||||
room_data_provider.load_user_receipt(ReceiptType::Read, ReceiptThread::Thread(event_id), &self.meta.own_user_id).await
|
||||
.map(|(event_id, _receipt)| event_id) } else { None };
|
||||
|
||||
let own_thread_private_receipt =
|
||||
if let Some(event_id) = event.event_id() {
|
||||
room_data_provider.load_user_receipt(ReceiptType::ReadPrivate, ReceiptThread::Thread(event_id), &self.meta.own_user_id).await
|
||||
.map(|(event_id, _receipt)| event_id) } else { None };
|
||||
|
||||
(own_thread_public_receipt, own_thread_private_receipt)
|
||||
}
|
||||
} else { Default::default() };
|
||||
|
||||
Some(ThreadSummary {
|
||||
latest_event: TimelineDetails::from_initial_value(latest_reply_item),
|
||||
num_replies: summary.num_replies,
|
||||
public_read_receipt_event_id: own_thread_public_receipt,
|
||||
private_read_receipt_event_id: own_thread_private_receipt,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -53,6 +53,13 @@ pub struct ThreadSummary {
|
||||
/// thread-focused timeline with the same timeline filter may result in
|
||||
/// *fewer* events than this number.
|
||||
pub num_replies: u32,
|
||||
|
||||
/// The user's own public read receipt event id, for this particular thread.
|
||||
pub public_read_receipt_event_id: Option<OwnedEventId>,
|
||||
|
||||
/// The user's own private read receipt event id, for this particular
|
||||
/// thread.
|
||||
pub private_read_receipt_event_id: Option<OwnedEventId>,
|
||||
}
|
||||
|
||||
/// A special kind of [`super::TimelineItemContent`] that groups together
|
||||
|
||||
@@ -300,6 +300,11 @@ async fn test_extract_bundled_thread_summary() {
|
||||
// We get the count from the bundled thread summary.
|
||||
assert_eq!(summary.num_replies, 42);
|
||||
|
||||
// The read receipts haven't been filled, because we didn't have such
|
||||
// information for the current user.
|
||||
assert_eq!(summary.public_read_receipt_event_id, None);
|
||||
assert_eq!(summary.private_read_receipt_event_id, None);
|
||||
|
||||
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
|
||||
assert!(value.is_date_divider());
|
||||
}
|
||||
@@ -397,6 +402,11 @@ async fn test_new_thread_reply_causes_thread_summary_update() {
|
||||
// The thread summary contains the number of replies.
|
||||
assert_eq!(summary.num_replies, 1);
|
||||
|
||||
// The read receipts haven't been filled, because we didn't have such
|
||||
// information for the current user.
|
||||
assert_eq!(summary.public_read_receipt_event_id, None);
|
||||
assert_eq!(summary.private_read_receipt_event_id, None);
|
||||
|
||||
assert_pending!(stream);
|
||||
|
||||
// A new thread reply updates the number of replies in the thread.
|
||||
@@ -454,6 +464,11 @@ async fn test_new_thread_reply_causes_thread_summary_update() {
|
||||
|
||||
// The number of replies has been updated.
|
||||
assert_eq!(summary.num_replies, 2);
|
||||
|
||||
// The read receipts haven't been filled, because we didn't have such
|
||||
// information for the current user.
|
||||
assert_eq!(summary.public_read_receipt_event_id, None);
|
||||
assert_eq!(summary.private_read_receipt_event_id, None);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
@@ -2079,3 +2094,86 @@ async fn test_redaction_affects_thread_summary() {
|
||||
assert_eq!(event_item.event_id(), Some(thread_root));
|
||||
assert!(event_item.content().as_msglike().unwrap().thread_summary.is_none());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_main_timeline_has_receipts_in_thread_summaries() {
|
||||
// The initial read receipts for thread events are filled, as part of the
|
||||
// `ThreadSummary`, for main timeline:
|
||||
// - at start
|
||||
// - upon update of the read receipts events
|
||||
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = client_with_threading_support(&server).await;
|
||||
client.event_cache().subscribe().unwrap();
|
||||
|
||||
let own_user = client.user_id().unwrap();
|
||||
|
||||
// Sync some initial read receipts, one for the main timeline (that won't be
|
||||
// used) and another one for a threaded timeline, that will be used later.
|
||||
let room_id = room_id!("!a:b.c");
|
||||
let f = EventFactory::new().room(room_id).sender(&ALICE);
|
||||
|
||||
let thread_event_id = event_id!("$thread_root");
|
||||
let latest_event_id = event_id!("$latest_event");
|
||||
|
||||
let room = server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(room_id)
|
||||
.add_receipt(
|
||||
// Add a receipt for the latest event of the thread.
|
||||
f.read_receipts()
|
||||
.add(
|
||||
latest_event_id,
|
||||
own_user,
|
||||
ReceiptType::Read,
|
||||
ReceiptThread::Thread(thread_event_id.to_owned()),
|
||||
)
|
||||
.into_event(),
|
||||
)
|
||||
.add_timeline_event(
|
||||
// And the thread root itself.
|
||||
f.text_msg("thready thread mcthreadface")
|
||||
.with_bundled_thread_summary(
|
||||
f.text_msg("the last one!").event_id(latest_event_id).into(),
|
||||
42,
|
||||
false,
|
||||
)
|
||||
.event_id(thread_event_id),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let timeline = room.timeline().await.unwrap();
|
||||
let (mut initial_items, mut stream) = timeline.subscribe().await;
|
||||
|
||||
// Wait for the initial items.
|
||||
if initial_items.is_empty() {
|
||||
assert_let_timeout!(Some(timeline_updates) = stream.next());
|
||||
for up in timeline_updates {
|
||||
up.apply(&mut initial_items);
|
||||
}
|
||||
}
|
||||
|
||||
// Let's take a look at the items, now that they've been loaded: there will be
|
||||
// the day divider and the thread root itself.
|
||||
let items = timeline.items().await;
|
||||
assert_eq!(items.len(), 2);
|
||||
|
||||
// First, the day divider.
|
||||
let value = &items[0];
|
||||
assert!(value.is_date_divider());
|
||||
|
||||
// Second, the event with the thread summary that has some read receipts.
|
||||
let value = &items[1];
|
||||
let event_item = value.as_event().unwrap();
|
||||
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
|
||||
assert_let!(Some(summary) = event_item.content().thread_summary());
|
||||
|
||||
// We get the latest event from the bundled thread summary, and it's loaded.
|
||||
assert!(summary.latest_event.is_ready());
|
||||
|
||||
// The public read receipt event id is filled (but the private isn't).
|
||||
assert_eq!(summary.public_read_receipt_event_id.as_deref(), Some(latest_event_id));
|
||||
assert_eq!(summary.private_read_receipt_event_id, None);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user