From 0f54877b31ef8fa23a4cff187a1ca4507377e82d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Commaille?= Date: Tue, 21 Feb 2023 10:43:10 +0100 Subject: [PATCH] feat(sdk): Add methods to only send receipts newer than the current ones in the timeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kévin Commaille --- crates/matrix-sdk/src/room/joined.rs | 6 +- crates/matrix-sdk/src/room/timeline/inner.rs | 84 +- crates/matrix-sdk/src/room/timeline/mod.rs | 129 ++- .../src/room/timeline/read_receipts.rs | 75 +- .../room/{timeline.rs => timeline/mod.rs} | 158 +--- .../room/timeline/read_receipts.rs | 778 ++++++++++++++++++ 6 files changed, 1068 insertions(+), 162 deletions(-) rename crates/matrix-sdk/tests/integration/room/{timeline.rs => timeline/mod.rs} (84%) create mode 100644 crates/matrix-sdk/tests/integration/room/timeline/read_receipts.rs diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index 7b8a27977..b31885dfa 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -1072,9 +1072,9 @@ impl Joined { /// Receipts to send all at once. #[derive(Debug, Clone, Default)] pub struct Receipts { - fully_read: Option, - read_receipt: Option, - private_read_receipt: Option, + pub(super) fully_read: Option, + pub(super) read_receipt: Option, + pub(super) private_read_receipt: Option, } impl Receipts { diff --git a/crates/matrix-sdk/src/room/timeline/inner.rs b/crates/matrix-sdk/src/room/timeline/inner.rs index 68503a45f..efb5e4fd8 100644 --- a/crates/matrix-sdk/src/room/timeline/inner.rs +++ b/crates/matrix-sdk/src/room/timeline/inner.rs @@ -29,6 +29,7 @@ use matrix_sdk_base::{ #[cfg(feature = "e2e-encryption")] use ruma::RoomId; use ruma::{ + api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType, events::{ fully_read::FullyReadEvent, receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType}, @@ -45,13 +46,18 @@ use tracing::{debug, error, field::debug, instrument, trace, warn}; use tracing::{field, info, info_span, Instrument as _}; use super::{ + compare_events_positions, event_handler::{ update_read_marker, Flow, HandleEventResult, TimelineEventHandler, TimelineEventKind, TimelineEventMetadata, TimelineItemPosition, }, - read_receipts::{handle_explicit_read_receipts, load_read_receipts_for_event}, + read_receipts::{ + handle_explicit_read_receipts, latest_user_read_receipt, load_read_receipts_for_event, + user_receipt, + }, rfind_event_by_id, rfind_event_item, EventSendState, EventTimelineItem, InReplyToDetails, - Message, Profile, RepliedToEvent, TimelineDetails, TimelineItem, TimelineItemContent, + Message, Profile, RelativePosition, RepliedToEvent, TimelineDetails, TimelineItem, + TimelineItemContent, }; use crate::{ events::SyncTimelineEventWithoutContent, @@ -578,6 +584,80 @@ impl TimelineInner { Ok(item) } + + /// Get the latest read receipt for the given user. + /// + /// Useful to get the latest read receipt, whether it's private or public. + pub(super) async fn latest_user_read_receipt( + &self, + user_id: &UserId, + ) -> Option<(OwnedEventId, Receipt)> { + let state = self.state.lock().await; + let room = self.room(); + + latest_user_read_receipt(user_id, &state, room).await + } + + /// Check whether the given receipt should be sent. + /// + /// Returns `false` if the given receipt is older than the current one. + pub(super) async fn should_send_receipt( + &self, + receipt_type: &SendReceiptType, + thread: &ReceiptThread, + event_id: &EventId, + ) -> bool { + // We don't support threaded receipts yet. + if *thread != ReceiptThread::Unthreaded { + return true; + } + + let own_user_id = self.room().own_user_id(); + let state = self.state.lock().await; + let room = self.room(); + + match receipt_type { + SendReceiptType::Read => { + if let Some((old_pub_read, _)) = + user_receipt(own_user_id, ReceiptType::Read, &state, room).await + { + if let Some(relative_pos) = + compare_events_positions(&old_pub_read, event_id, &state.items) + { + return relative_pos == RelativePosition::After; + } + } + } + // Implicit read receipts are saved as public read receipts, so get the latest. It also + // doesn't make sense to have a private read receipt behind a public one. + SendReceiptType::ReadPrivate => { + if let Some((old_priv_read, _)) = + latest_user_read_receipt(own_user_id, &state, room).await + { + if let Some(relative_pos) = + compare_events_positions(&old_priv_read, event_id, &state.items) + { + return relative_pos == RelativePosition::After; + } + } + } + SendReceiptType::FullyRead => { + if let Some(old_fully_read) = self.fully_read_event().await { + if let Some(relative_pos) = compare_events_positions( + &old_fully_read.content.event_id, + event_id, + &state.items, + ) { + return relative_pos == RelativePosition::After; + } + } + } + _ => {} + } + + // Let the server handle unknown receipts. + true + } } async fn fetch_replied_to_event( diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs index 6108270fc..a0cdb9817 100644 --- a/crates/matrix-sdk/src/room/timeline/mod.rs +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -24,12 +24,18 @@ use im::Vector; use matrix_sdk_base::locks::Mutex; use pin_project_lite::pin_project; use ruma::{ - assign, events::AnyMessageLikeEventContent, EventId, MilliSecondsSinceUnixEpoch, TransactionId, + api::client::receipt::create_receipt::v3::ReceiptType, + assign, + events::{ + receipt::{Receipt, ReceiptThread}, + AnyMessageLikeEventContent, + }, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId, }; use thiserror::Error; use tracing::{error, instrument, warn}; -use super::Joined; +use super::{Joined, Receipts}; use crate::{ event_handler::EventHandlerHandle, room::{self, MessagesOptions}, @@ -325,6 +331,95 @@ impl Timeline { } } } + + /// Get the latest read receipt for the given user. + /// + /// Contrary to [`Common::user_receipt()`](super::Common::user_receipt) that + /// only keeps track of read receipts received from the homeserver, this + /// keeps also track of implicit read receipts in this timeline, i.e. + /// when a room member sends an event. + #[instrument(skip(self), parent = &self.room().client.inner.root_span)] + pub async fn latest_user_read_receipt( + &self, + user_id: &UserId, + ) -> Option<(OwnedEventId, Receipt)> { + self.inner.latest_user_read_receipt(user_id).await + } + + /// Send the given receipt. + /// + /// This uses [`Joined::send_single_receipt`] internally, but checks + /// first if the receipt points to an event in this timeline that is more + /// recent than the current ones, to avoid unnecessary requests. + #[instrument(skip(self), parent = &self.room().client.inner.root_span)] + pub async fn send_single_receipt( + &self, + receipt_type: ReceiptType, + thread: ReceiptThread, + event_id: OwnedEventId, + ) -> Result<()> { + if !self.inner.should_send_receipt(&receipt_type, &thread, &event_id).await { + return Ok(()); + } + + // If this room isn't actually in joined state, we'll get a server error. + // Not ideal, but works for now. + let room = Joined { inner: self.room().clone() }; + + room.send_single_receipt(receipt_type, thread, event_id).await + } + + /// Send the given receipts. + /// + /// This uses [`Joined::send_multiple_receipts`] internally, but checks + /// first if the receipts point to events in this timeline that are more + /// recent than the current ones, to avoid unnecessary requests. + #[instrument(skip(self), parent = &self.room().client.inner.root_span)] + pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> { + if let Some(fully_read) = &receipts.fully_read { + if !self + .inner + .should_send_receipt( + &ReceiptType::FullyRead, + &ReceiptThread::Unthreaded, + fully_read, + ) + .await + { + receipts.fully_read = None; + } + } + + if let Some(read_receipt) = &receipts.read_receipt { + if !self + .inner + .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt) + .await + { + receipts.read_receipt = None; + } + } + + if let Some(private_read_receipt) = &receipts.private_read_receipt { + if !self + .inner + .should_send_receipt( + &ReceiptType::ReadPrivate, + &ReceiptThread::Unthreaded, + private_read_receipt, + ) + .await + { + receipts.private_read_receipt = None; + } + } + + // If this room isn't actually in joined state, we'll get a server error. + // Not ideal, but works for now. + let room = Joined { inner: self.room().clone() }; + + room.send_multiple_receipts(receipts).await + } } #[derive(Debug)] @@ -484,3 +579,33 @@ pub enum Error { #[error("Unsupported event")] UnsupportedEvent, } + +/// Result of comparing events position in the timeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RelativePosition { + /// Event B is after (more recent than) event A. + After, + /// They are the same event. + Same, + /// Event B is before (older than) event A. + Before, +} + +fn compare_events_positions( + event_a: &EventId, + event_b: &EventId, + timeline_items: &Vector>, +) -> Option { + if event_a == event_b { + return Some(RelativePosition::Same); + } + + let (pos_event_a, _) = rfind_event_by_id(timeline_items, event_a)?; + let (pos_event_b, _) = rfind_event_by_id(timeline_items, event_b)?; + + if pos_event_a > pos_event_b { + Some(RelativePosition::Before) + } else { + Some(RelativePosition::After) + } +} diff --git a/crates/matrix-sdk/src/room/timeline/read_receipts.rs b/crates/matrix-sdk/src/room/timeline/read_receipts.rs index 59d609be7..72ef2c0c8 100644 --- a/crates/matrix-sdk/src/room/timeline/read_receipts.rs +++ b/crates/matrix-sdk/src/room/timeline/read_receipts.rs @@ -23,9 +23,11 @@ use ruma::{ use tracing::error; use super::{ + compare_events_positions, inner::{RoomDataProvider, TimelineInnerState}, - rfind_event_by_id, EventTimelineItem, TimelineItem, + rfind_event_by_id, EventTimelineItem, RelativePosition, TimelineItem, }; +use crate::room; struct FullReceipt<'a> { event_id: &'a EventId, @@ -229,3 +231,74 @@ pub(super) async fn load_read_receipts_for_event( read_receipts } + +/// Get the unthreaded receipt of the given type for the given user in the +/// timeline. +pub(super) async fn user_receipt( + user_id: &UserId, + receipt_type: ReceiptType, + timeline_state: &TimelineInnerState, + room: &room::Common, +) -> Option<(OwnedEventId, Receipt)> { + if let Some(receipt) = timeline_state + .users_read_receipts + .get(user_id) + .and_then(|user_map| user_map.get(&receipt_type)) + .cloned() + { + return Some(receipt); + } + + room.user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id) + .await + .unwrap_or_else(|e| { + error!("Could not get user read receipt of type {receipt_type:?}: {e}"); + None + }) +} + +/// Get the latest read receipt for the given user. +/// +/// Useful to get the latest read receipt, whether it's private or public. +pub(super) async fn latest_user_read_receipt( + user_id: &UserId, + timeline_state: &TimelineInnerState, + room: &room::Common, +) -> Option<(OwnedEventId, Receipt)> { + let public_read_receipt = user_receipt(user_id, ReceiptType::Read, timeline_state, room).await; + let private_read_receipt = + user_receipt(user_id, ReceiptType::ReadPrivate, timeline_state, room).await; + + // If we only have one, return it. + let Some((pub_event_id, pub_receipt)) = &public_read_receipt else { + return private_read_receipt; + }; + let Some((priv_event_id, priv_receipt)) = &private_read_receipt else { + return public_read_receipt; + }; + + // Compare by position in the timeline. + if let Some(relative_pos) = + compare_events_positions(pub_event_id, priv_event_id, &timeline_state.items) + { + if relative_pos == RelativePosition::After { + return private_read_receipt; + } + + return public_read_receipt; + } + + // Compare by timestamp. + if let Some((pub_ts, priv_ts)) = pub_receipt.ts.zip(priv_receipt.ts) { + if priv_ts > pub_ts { + return private_read_receipt; + } + + return public_read_receipt; + } + + // As a fallback, let's assume that a private read receipt should be more recent + // than a public read receipt, otherwise there's no point in the private read + // receipt. + private_read_receipt +} diff --git a/crates/matrix-sdk/tests/integration/room/timeline.rs b/crates/matrix-sdk/tests/integration/room/timeline/mod.rs similarity index 84% rename from crates/matrix-sdk/tests/integration/room/timeline.rs rename to crates/matrix-sdk/tests/integration/room/timeline/mod.rs index 860ac9cc5..9eb4e76db 100644 --- a/crates/matrix-sdk/tests/integration/room/timeline.rs +++ b/crates/matrix-sdk/tests/integration/room/timeline/mod.rs @@ -16,8 +16,8 @@ use matrix_sdk::{ }; use matrix_sdk_common::executor::spawn; use matrix_sdk_test::{ - async_test, test_json, EphemeralTestEvent, EventBuilder, JoinedRoomBuilder, - RoomAccountDataTestEvent, TimelineTestEvent, + async_test, test_json, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent, + TimelineTestEvent, }; use ruma::{ event_id, @@ -33,6 +33,8 @@ use wiremock::{ Mock, ResponseTemplate, }; +mod read_receipts; + use crate::{logged_in_client, mock_encryption_state, mock_sync}; #[async_test] @@ -732,158 +734,6 @@ async fn in_reply_to_details() { assert_matches!(message.in_reply_to().unwrap().details, TimelineDetails::Ready(_)); } -#[async_test] -async fn read_receipts_updates() { - let room_id = room_id!("!a98sd12bjh:example.org"); - let (client, server) = logged_in_client().await; - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - - let alice = user_id!("@alice:localhost"); - let bob = user_id!("@bob:localhost"); - - let second_event_id = event_id!("$e32037280er453l:localhost"); - let third_event_id = event_id!("$Sg2037280074GZr34:localhost"); - - let mut ev_builder = EventBuilder::new(); - ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); - - mock_sync(&server, ev_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; - - let room = client.get_room(room_id).unwrap(); - let timeline = room.timeline().await; - let (items, mut timeline_stream) = timeline.subscribe().await; - - assert!(items.is_empty()); - - ev_builder.add_joined_room( - JoinedRoomBuilder::new(room_id) - .add_timeline_event(TimelineTestEvent::MessageText) - .add_timeline_event(TimelineTestEvent::Custom(json!({ - "content": { - "body": "I'm dancing too", - "msgtype": "m.text" - }, - "event_id": second_event_id, - "origin_server_ts": 152039280, - "sender": alice, - "type": "m.room.message", - }))) - .add_timeline_event(TimelineTestEvent::Custom(json!({ - "content": { - "body": "Viva la macarena!", - "msgtype": "m.text" - }, - "event_id": third_event_id, - "origin_server_ts": 152045280, - "sender": alice, - "type": "m.room.message", - }))), - ); - - mock_sync(&server, ev_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; - - let _day_divider = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); - - // We don't list the read receipt of our own user on events. - let first_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); - let first_event = first_item.as_event().unwrap().as_remote().unwrap(); - assert!(first_event.read_receipts().is_empty()); - - // Implicit read receipt of @alice:localhost. - let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); - let second_event = second_item.as_event().unwrap().as_remote().unwrap(); - assert_eq!(second_event.read_receipts().len(), 1); - - // Read receipt of @alice:localhost is moved to third event. - let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 2, value }) => value); - let second_event = second_item.as_event().unwrap().as_remote().unwrap(); - assert!(second_event.read_receipts().is_empty()); - - let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); - let third_event = third_item.as_event().unwrap().as_remote().unwrap(); - assert_eq!(third_event.read_receipts().len(), 1); - - // Read receipt on unknown event is ignored. - ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( - EphemeralTestEvent::Custom(json!({ - "content": { - "$unknowneventid": { - "m.read": { - alice: { - "ts": 1436453550, - }, - }, - }, - }, - "type": "m.receipt", - })), - )); - - mock_sync(&server, ev_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; - - // Read receipt on older event is ignored. - ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( - EphemeralTestEvent::Custom(json!({ - "content": { - second_event_id: { - "m.read": { - alice: { - "ts": 1436451550, - }, - }, - }, - }, - "type": "m.receipt", - })), - )); - - // Read receipt on same event is ignored. - ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( - EphemeralTestEvent::Custom(json!({ - "content": { - third_event_id: { - "m.read": { - alice: { - "ts": 1436451550, - }, - }, - }, - }, - "type": "m.receipt", - })), - )); - - // New user with explicit read receipt. - ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( - EphemeralTestEvent::Custom(json!({ - "content": { - third_event_id: { - "m.read": { - bob: { - "ts": 1436451550, - }, - }, - }, - }, - "type": "m.receipt", - })), - )); - - mock_sync(&server, ev_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; - - let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 3, value }) => value); - let third_event = third_item.as_event().unwrap().as_remote().unwrap(); - assert_eq!(third_event.read_receipts().len(), 2); -} - #[async_test] async fn sync_highlighted() { let room_id = room_id!("!a98sd12bjh:example.org"); diff --git a/crates/matrix-sdk/tests/integration/room/timeline/read_receipts.rs b/crates/matrix-sdk/tests/integration/room/timeline/read_receipts.rs new file mode 100644 index 000000000..9edc03a8e --- /dev/null +++ b/crates/matrix-sdk/tests/integration/room/timeline/read_receipts.rs @@ -0,0 +1,778 @@ +use std::time::Duration; + +use assert_matches::assert_matches; +use eyeball_im::VectorDiff; +use futures_util::StreamExt; +use matrix_sdk::{config::SyncSettings, room::Receipts}; +use matrix_sdk_test::{ + async_test, EphemeralTestEvent, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent, + TimelineTestEvent, +}; +use ruma::{ + api::client::receipt::create_receipt::v3::ReceiptType, event_id, + events::receipt::ReceiptThread, room_id, user_id, +}; +use serde_json::json; +use wiremock::{ + matchers::{body_json, header, method, path_regex}, + Mock, ResponseTemplate, +}; + +use crate::{logged_in_client, mock_sync}; + +#[async_test] +async fn read_receipts_updates() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let own_user_id = client.user_id().unwrap(); + let alice = user_id!("@alice:localhost"); + let bob = user_id!("@bob:localhost"); + + let second_event_id = event_id!("$e32037280er453l:localhost"); + let third_event_id = event_id!("$Sg2037280074GZr34:localhost"); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await; + let (items, mut timeline_stream) = timeline.subscribe().await; + + assert!(items.is_empty()); + + let own_receipt = timeline.latest_user_read_receipt(own_user_id).await; + assert_matches!(own_receipt, None); + let alice_receipt = timeline.latest_user_read_receipt(alice).await; + assert_matches!(alice_receipt, None); + let bob_receipt = timeline.latest_user_read_receipt(bob).await; + assert_matches!(bob_receipt, None); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::MessageText) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "I'm dancing too", + "msgtype": "m.text" + }, + "event_id": second_event_id, + "origin_server_ts": 152039280, + "sender": alice, + "type": "m.room.message", + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "Viva la macarena!", + "msgtype": "m.text" + }, + "event_id": third_event_id, + "origin_server_ts": 152045280, + "sender": alice, + "type": "m.room.message", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let _day_divider = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); + + // We don't list the read receipt of our own user on events. + let first_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); + let first_event = first_item.as_event().unwrap().as_remote().unwrap(); + assert!(first_event.read_receipts().is_empty()); + + let (own_receipt_event_id, _) = timeline.latest_user_read_receipt(own_user_id).await.unwrap(); + assert_eq!(own_receipt_event_id, first_event.event_id()); + + // Implicit read receipt of @alice:localhost. + let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); + let second_event = second_item.as_event().unwrap().as_remote().unwrap(); + assert_eq!(second_event.read_receipts().len(), 1); + + // Read receipt of @alice:localhost is moved to third event. + let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 2, value }) => value); + let second_event = second_item.as_event().unwrap().as_remote().unwrap(); + assert!(second_event.read_receipts().is_empty()); + + let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); + let third_event = third_item.as_event().unwrap().as_remote().unwrap(); + assert_eq!(third_event.read_receipts().len(), 1); + + let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap(); + assert_eq!(alice_receipt_event_id, third_event_id); + + // Read receipt on unknown event is ignored. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + "$unknowneventid": { + "m.read": { + alice: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap(); + assert_eq!(alice_receipt_event_id, third_event.event_id()); + + // Read receipt on older event is ignored. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + second_event_id: { + "m.read": { + alice: { + "ts": 1436451550, + }, + }, + }, + }, + "type": "m.receipt", + })), + )); + + let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap(); + assert_eq!(alice_receipt_event_id, third_event_id); + + // Read receipt on same event is ignored. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + third_event_id: { + "m.read": { + alice: { + "ts": 1436451550, + }, + }, + }, + }, + "type": "m.receipt", + })), + )); + + let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap(); + assert_eq!(alice_receipt_event_id, third_event_id); + + // New user with explicit read receipt. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + third_event_id: { + "m.read": { + bob: { + "ts": 1436451550, + }, + }, + }, + }, + "type": "m.receipt", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 3, value }) => value); + let third_event = third_item.as_event().unwrap().as_remote().unwrap(); + assert_eq!(third_event.read_receipts().len(), 2); + + let (bob_receipt_event_id, _) = timeline.latest_user_read_receipt(bob).await.unwrap(); + assert_eq!(bob_receipt_event_id, third_event_id); + + // Private read receipt is updated. + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event( + EphemeralTestEvent::Custom(json!({ + "content": { + second_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let (own_user_receipt_event_id, _) = + timeline.latest_user_read_receipt(own_user_id).await.unwrap(); + assert_eq!(own_user_receipt_event_id, second_event_id); +} + +#[async_test] +async fn send_single_receipt() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let own_user_id = client.user_id().unwrap(); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await; + + // Unknown receipts are sent. + let first_receipts_event_id = event_id!("$first_receipts_event_id"); + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Public read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Private read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Fully-read marker") + .mount(&server) + .await; + + timeline + .send_single_receipt( + ReceiptType::Read, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::ReadPrivate, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::FullyRead, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + server.reset().await; + + // Unchanged receipts are not sent. + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + first_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": first_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + timeline + .send_single_receipt( + ReceiptType::Read, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::ReadPrivate, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::FullyRead, + ReceiptThread::Unthreaded, + first_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + server.reset().await; + + // Receipts with unknown previous receipts are always sent. + let second_receipts_event_id = event_id!("$second_receipts_event_id"); + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Public read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Private read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Fully-read marker") + .mount(&server) + .await; + + timeline + .send_single_receipt( + ReceiptType::Read, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::ReadPrivate, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::FullyRead, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + server.reset().await; + + // Newer receipts in the timeline are sent. + let third_receipts_event_id = event_id!("$third_receipts_event_id"); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "I'm User A", + "msgtype": "m.text", + }, + "event_id": second_receipts_event_id, + "origin_server_ts": 152046694, + "sender": "@user_a:example.org", + "type": "m.room.message", + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "I'm User B", + "msgtype": "m.text", + }, + "event_id": third_receipts_event_id, + "origin_server_ts": 152049794, + "sender": "@user_b:example.org", + "type": "m.room.message", + }))) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + second_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": second_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Public read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Private read receipt") + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .named("Fully-read marker") + .mount(&server) + .await; + + timeline + .send_single_receipt( + ReceiptType::Read, + ReceiptThread::Unthreaded, + third_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::ReadPrivate, + ReceiptThread::Unthreaded, + third_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::FullyRead, + ReceiptThread::Unthreaded, + third_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + server.reset().await; + + // Older receipts in the timeline are not sent. + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + third_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": third_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + timeline + .send_single_receipt( + ReceiptType::Read, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::ReadPrivate, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); + timeline + .send_single_receipt( + ReceiptType::FullyRead, + ReceiptThread::Unthreaded, + second_receipts_event_id.to_owned(), + ) + .await + .unwrap(); +} + +#[async_test] +async fn send_multiple_receipts() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let own_user_id = client.user_id().unwrap(); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await; + + // Unknown receipts are sent. + let first_receipts_event_id = event_id!("$first_receipts_event_id"); + let first_receipts = Receipts::new() + .fully_read_marker(Some(first_receipts_event_id.to_owned())) + .public_read_receipt(Some(first_receipts_event_id.to_owned())) + .private_read_receipt(Some(first_receipts_event_id.to_owned())); + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$")) + .and(header("authorization", "Bearer 1234")) + .and(body_json(json!({ + "m.fully_read": first_receipts_event_id, + "m.read": first_receipts_event_id, + "m.read.private": first_receipts_event_id, + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .mount(&server) + .await; + + timeline.send_multiple_receipts(first_receipts.clone()).await.unwrap(); + server.reset().await; + + // Unchanged receipts are not sent. + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + first_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": first_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + timeline.send_multiple_receipts(first_receipts).await.unwrap(); + server.reset().await; + + // Receipts with unknown previous receipts are always sent. + let second_receipts_event_id = event_id!("$second_receipts_event_id"); + let second_receipts = Receipts::new() + .fully_read_marker(Some(second_receipts_event_id.to_owned())) + .public_read_receipt(Some(second_receipts_event_id.to_owned())) + .private_read_receipt(Some(second_receipts_event_id.to_owned())); + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$")) + .and(header("authorization", "Bearer 1234")) + .and(body_json(json!({ + "m.fully_read": second_receipts_event_id, + "m.read": second_receipts_event_id, + "m.read.private": second_receipts_event_id, + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .mount(&server) + .await; + + timeline.send_multiple_receipts(second_receipts.clone()).await.unwrap(); + server.reset().await; + + // Newer receipts in the timeline are sent. + let third_receipts_event_id = event_id!("$third_receipts_event_id"); + let third_receipts = Receipts::new() + .fully_read_marker(Some(third_receipts_event_id.to_owned())) + .public_read_receipt(Some(third_receipts_event_id.to_owned())) + .private_read_receipt(Some(third_receipts_event_id.to_owned())); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "I'm User A", + "msgtype": "m.text", + }, + "event_id": second_receipts_event_id, + "origin_server_ts": 152046694, + "sender": "@user_a:example.org", + "type": "m.room.message", + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "I'm User B", + "msgtype": "m.text", + }, + "event_id": third_receipts_event_id, + "origin_server_ts": 152049794, + "sender": "@user_b:example.org", + "type": "m.room.message", + }))) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + second_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": second_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + Mock::given(method("POST")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$")) + .and(header("authorization", "Bearer 1234")) + .and(body_json(json!({ + "m.fully_read": third_receipts_event_id, + "m.read": third_receipts_event_id, + "m.read.private": third_receipts_event_id, + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .mount(&server) + .await; + + timeline.send_multiple_receipts(third_receipts.clone()).await.unwrap(); + server.reset().await; + + // Older receipts in the timeline are not sent. + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_ephemeral_event(EphemeralTestEvent::Custom(json!({ + "content": { + third_receipts_event_id: { + "m.read.private": { + own_user_id: { + "ts": 1436453550, + }, + }, + "m.read": { + own_user_id: { + "ts": 1436453550, + }, + }, + }, + }, + "type": "m.receipt", + }))) + .add_account_data(RoomAccountDataTestEvent::Custom(json!({ + "content": { + "event_id": third_receipts_event_id, + }, + "type": "m.fully_read", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + timeline.send_multiple_receipts(second_receipts.clone()).await.unwrap(); +}