ui: Handle read receipts in the main thread

Improves the compatibility with clients using threads

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
Kévin Commaille
2023-10-11 13:02:34 +02:00
committed by Benjamin Bouvier
parent ddb4bf13b1
commit ff83f5abcb
5 changed files with 139 additions and 82 deletions

View File

@@ -19,12 +19,9 @@ use imbl::Vector;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent, executor::spawn, sync::RoomUpdate, Room,
};
use ruma::events::{
receipt::{ReceiptThread, ReceiptType},
AnySyncTimelineEvent,
};
use ruma::events::{receipt::ReceiptType, AnySyncTimelineEvent};
use tokio::sync::{broadcast, mpsc, Notify};
use tracing::{error, info, info_span, trace, warn, Instrument, Span};
use tracing::{info, info_span, trace, warn, Instrument, Span};
#[cfg(feature = "e2e-encryption")]
use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event};
@@ -123,42 +120,8 @@ impl TimelineBuilder {
let mut inner = TimelineInner::new(room).with_settings(settings);
if track_read_marker_and_receipts {
match inner
.room()
.user_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
inner.room().own_user_id(),
)
.await
{
Ok(Some(read_receipt)) => {
inner.set_initial_user_receipt(ReceiptType::Read, read_receipt).await;
}
Err(e) => {
error!("Failed to get public read receipt of own user from the store: {e}");
}
_ => {}
}
match inner
.room()
.user_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
inner.room().own_user_id(),
)
.await
{
Ok(Some(private_read_receipt)) => {
inner
.set_initial_user_receipt(ReceiptType::ReadPrivate, private_read_receipt)
.await;
}
Err(e) => {
error!("Failed to get private read receipt of own user from the store: {e}");
}
_ => {}
}
inner.populate_initial_user_receipt(ReceiptType::Read).await;
inner.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}
if has_events {

View File

@@ -291,12 +291,26 @@ impl<P: RoomDataProvider> TimelineInner<P> {
Ok(result)
}
pub(super) async fn set_initial_user_receipt(
&mut self,
receipt_type: ReceiptType,
receipt: (OwnedEventId, Receipt),
) {
pub(super) async fn populate_initial_user_receipt(&mut self, receipt_type: ReceiptType) {
let own_user_id = self.room_data_provider.own_user_id().to_owned();
let mut read_receipt = self
.room_data_provider
.user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
.await;
// Fallback to the one in the main thread.
if read_receipt.is_none() {
read_receipt = self
.room_data_provider
.user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
.await;
}
let Some(read_receipt) = read_receipt else {
return;
};
self.state
.write()
.await
@@ -304,7 +318,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
.users_read_receipts
.entry(own_user_id)
.or_default()
.insert(receipt_type, receipt);
.insert(receipt_type, read_receipt);
}
pub(super) async fn add_initial_events(

View File

@@ -317,7 +317,7 @@ impl TimelineInnerStateTransaction<'_> {
}
for (user_id, receipt) in receipts {
if receipt.thread != ReceiptThread::Unthreaded {
if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
continue;
}
@@ -464,36 +464,23 @@ impl TimelineInnerState {
let private_read_receipt = self.user_receipt(user_id, ReceiptType::ReadPrivate, room).await;
// If we only have one, return it.
let Some((pub_event_id, pub_receipt)) = &public_read_receipt else {
let Some(pub_receipt) = &public_read_receipt else {
return private_read_receipt;
};
let Some((priv_event_id, priv_receipt)) = &private_read_receipt else {
let Some(priv_receipt) = &private_read_receipt else {
return public_read_receipt;
};
// Compare by position in the timeline.
if let Some(relative_pos) = self.meta.compare_events_positions(pub_event_id, priv_event_id)
{
if relative_pos == RelativePosition::After {
return private_read_receipt;
match self.compare_receipts(pub_receipt, priv_receipt) {
ReceiptCmp::First => public_read_receipt,
ReceiptCmp::Second => private_read_receipt,
ReceiptCmp::Unknown => {
// As a fallback, let's assume that a private read receipt should be more recent
// than a public read receipt, otherwise there's no point in the private read
// receipt.
private_read_receipt
}
return public_read_receipt;
}
// Compare by timestamp.
if let Some((pub_ts, priv_ts)) = pub_receipt.ts.zip(priv_receipt.ts) {
if priv_ts > pub_ts {
return private_read_receipt;
}
return public_read_receipt;
}
// As a fallback, let's assume that a private read receipt should be more recent
// than a public read receipt, otherwise there's no point in the private read
// receipt.
private_read_receipt
}
}
@@ -513,14 +500,85 @@ impl TimelineInnerMetadata {
.and_then(|user_map| user_map.get(&receipt_type))
.cloned()
{
// Since it is in the timeline, it should be the most recent.
return Some(receipt);
}
room.user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
let unthreaded_read_receipt = room
.user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
.await
.unwrap_or_else(|e| {
error!("Could not get user read receipt of type {receipt_type:?}: {e}");
error!("Could not get unthreaded user read receipt of type {receipt_type:?}: {e}");
None
})
});
let main_thread_read_receipt = room
.user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
.await
.unwrap_or_else(|e| {
error!("Could not get main thread user read receipt of type {receipt_type:?}: {e}");
None
});
// If we only have one, return it.
let Some(unthreaded_receipt) = &unthreaded_read_receipt else {
return main_thread_read_receipt;
};
let Some(main_thread_receipt) = &main_thread_read_receipt else {
return unthreaded_read_receipt;
};
match self.compare_receipts(unthreaded_receipt, main_thread_receipt) {
ReceiptCmp::First => unthreaded_read_receipt,
ReceiptCmp::Second => main_thread_read_receipt,
ReceiptCmp::Unknown => {
// As a fallback, let's use the unthreaded read receipt, since it's the one
// we should be using.
unthreaded_read_receipt
}
}
}
/// Compares two receipts to know which one is more recent.
///
/// Returns `None` if it's not possible to know which one is the more
/// recent.
fn compare_receipts(
&self,
first: &(OwnedEventId, Receipt),
second: &(OwnedEventId, Receipt),
) -> ReceiptCmp {
let (first_event_id, first_receipt) = first;
let (second_event_id, second_receipt) = second;
// Compare by position in the timeline.
if let Some(relative_pos) = self.compare_events_positions(first_event_id, second_event_id) {
if relative_pos == RelativePosition::After {
return ReceiptCmp::Second;
}
return ReceiptCmp::First;
}
// Compare by timestamp.
if let Some((first_ts, second_ts)) = first_receipt.ts.zip(second_receipt.ts) {
if second_ts > first_ts {
return ReceiptCmp::Second;
}
return ReceiptCmp::First;
}
ReceiptCmp::Unknown
}
}
/// Result of read receipts comparison.
enum ReceiptCmp {
/// The first one is more recent.
First,
/// The second one is more recent.
Second,
/// We don't know which one is more recent.
Unknown,
}

View File

@@ -141,13 +141,27 @@ impl RoomDataProvider for Room {
}
async fn read_receipts_for_event(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt> {
match self.event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id).await {
Ok(receipts) => receipts.into_iter().collect(),
Err(e) => {
error!(?event_id, "Failed to get read receipts for event: {e}");
IndexMap::new()
}
}
let mut unthreaded_receipts =
match self.event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id).await
{
Ok(receipts) => receipts.into_iter().collect(),
Err(e) => {
error!(?event_id, "Failed to get unthreaded read receipts for event: {e}");
IndexMap::new()
}
};
let main_thread_receipts =
match self.event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id).await {
Ok(receipts) => receipts,
Err(e) => {
error!(?event_id, "Failed to get main thread read receipts for event: {e}");
Vec::new()
}
};
unthreaded_receipts.extend(main_thread_receipts);
unthreaded_receipts
}
async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)> {

View File

@@ -214,14 +214,22 @@ async fn read_receipts_updates() {
let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap();
assert_eq!(alice_receipt_event_id, third_event_id);
// New user with explicit read receipt.
// New user with explicit threaded and unthreaded read receipts.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
second_event_id: {
"m.read": {
bob: {
"ts": 1436451350,
},
},
},
third_event_id: {
"m.read": {
bob: {
"ts": 1436451550,
"thread_id": "main",
},
},
},