feat(sdk): Add methods to only send receipts newer than the current ones in the timeline

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
Kévin Commaille
2023-02-21 10:43:10 +01:00
committed by Jonas Platte
parent 1c9aa7c907
commit 0f54877b31
6 changed files with 1068 additions and 162 deletions

View File

@@ -1072,9 +1072,9 @@ impl Joined {
/// Receipts to send all at once.
#[derive(Debug, Clone, Default)]
pub struct Receipts {
fully_read: Option<OwnedEventId>,
read_receipt: Option<OwnedEventId>,
private_read_receipt: Option<OwnedEventId>,
pub(super) fully_read: Option<OwnedEventId>,
pub(super) read_receipt: Option<OwnedEventId>,
pub(super) private_read_receipt: Option<OwnedEventId>,
}
impl Receipts {

View File

@@ -29,6 +29,7 @@ use matrix_sdk_base::{
#[cfg(feature = "e2e-encryption")]
use ruma::RoomId;
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
fully_read::FullyReadEvent,
receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
@@ -45,13 +46,18 @@ use tracing::{debug, error, field::debug, instrument, trace, warn};
use tracing::{field, info, info_span, Instrument as _};
use super::{
compare_events_positions,
event_handler::{
update_read_marker, Flow, HandleEventResult, TimelineEventHandler, TimelineEventKind,
TimelineEventMetadata, TimelineItemPosition,
},
read_receipts::{handle_explicit_read_receipts, load_read_receipts_for_event},
read_receipts::{
handle_explicit_read_receipts, latest_user_read_receipt, load_read_receipts_for_event,
user_receipt,
},
rfind_event_by_id, rfind_event_item, EventSendState, EventTimelineItem, InReplyToDetails,
Message, Profile, RepliedToEvent, TimelineDetails, TimelineItem, TimelineItemContent,
Message, Profile, RelativePosition, RepliedToEvent, TimelineDetails, TimelineItem,
TimelineItemContent,
};
use crate::{
events::SyncTimelineEventWithoutContent,
@@ -578,6 +584,80 @@ impl TimelineInner {
Ok(item)
}
/// Get the latest read receipt for the given user.
///
/// Useful to get the latest read receipt, whether it's private or public.
pub(super) async fn latest_user_read_receipt(
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
let state = self.state.lock().await;
let room = self.room();
latest_user_read_receipt(user_id, &state, room).await
}
/// Check whether the given receipt should be sent.
///
/// Returns `false` if the given receipt is older than the current one.
pub(super) async fn should_send_receipt(
&self,
receipt_type: &SendReceiptType,
thread: &ReceiptThread,
event_id: &EventId,
) -> bool {
// We don't support threaded receipts yet.
if *thread != ReceiptThread::Unthreaded {
return true;
}
let own_user_id = self.room().own_user_id();
let state = self.state.lock().await;
let room = self.room();
match receipt_type {
SendReceiptType::Read => {
if let Some((old_pub_read, _)) =
user_receipt(own_user_id, ReceiptType::Read, &state, room).await
{
if let Some(relative_pos) =
compare_events_positions(&old_pub_read, event_id, &state.items)
{
return relative_pos == RelativePosition::After;
}
}
}
// Implicit read receipts are saved as public read receipts, so get the latest. It also
// doesn't make sense to have a private read receipt behind a public one.
SendReceiptType::ReadPrivate => {
if let Some((old_priv_read, _)) =
latest_user_read_receipt(own_user_id, &state, room).await
{
if let Some(relative_pos) =
compare_events_positions(&old_priv_read, event_id, &state.items)
{
return relative_pos == RelativePosition::After;
}
}
}
SendReceiptType::FullyRead => {
if let Some(old_fully_read) = self.fully_read_event().await {
if let Some(relative_pos) = compare_events_positions(
&old_fully_read.content.event_id,
event_id,
&state.items,
) {
return relative_pos == RelativePosition::After;
}
}
}
_ => {}
}
// Let the server handle unknown receipts.
true
}
}
async fn fetch_replied_to_event(

View File

@@ -24,12 +24,18 @@ use im::Vector;
use matrix_sdk_base::locks::Mutex;
use pin_project_lite::pin_project;
use ruma::{
assign, events::AnyMessageLikeEventContent, EventId, MilliSecondsSinceUnixEpoch, TransactionId,
api::client::receipt::create_receipt::v3::ReceiptType,
assign,
events::{
receipt::{Receipt, ReceiptThread},
AnyMessageLikeEventContent,
},
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId,
};
use thiserror::Error;
use tracing::{error, instrument, warn};
use super::Joined;
use super::{Joined, Receipts};
use crate::{
event_handler::EventHandlerHandle,
room::{self, MessagesOptions},
@@ -325,6 +331,95 @@ impl Timeline {
}
}
}
/// Get the latest read receipt for the given user.
///
/// Contrary to [`Common::user_receipt()`](super::Common::user_receipt) that
/// only keeps track of read receipts received from the homeserver, this
/// keeps also track of implicit read receipts in this timeline, i.e.
/// when a room member sends an event.
#[instrument(skip(self), parent = &self.room().client.inner.root_span)]
pub async fn latest_user_read_receipt(
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
self.inner.latest_user_read_receipt(user_id).await
}
/// Send the given receipt.
///
/// This uses [`Joined::send_single_receipt`] internally, but checks
/// first if the receipt points to an event in this timeline that is more
/// recent than the current ones, to avoid unnecessary requests.
#[instrument(skip(self), parent = &self.room().client.inner.root_span)]
pub async fn send_single_receipt(
&self,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: OwnedEventId,
) -> Result<()> {
if !self.inner.should_send_receipt(&receipt_type, &thread, &event_id).await {
return Ok(());
}
// If this room isn't actually in joined state, we'll get a server error.
// Not ideal, but works for now.
let room = Joined { inner: self.room().clone() };
room.send_single_receipt(receipt_type, thread, event_id).await
}
/// Send the given receipts.
///
/// This uses [`Joined::send_multiple_receipts`] internally, but checks
/// first if the receipts point to events in this timeline that are more
/// recent than the current ones, to avoid unnecessary requests.
#[instrument(skip(self), parent = &self.room().client.inner.root_span)]
pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
if let Some(fully_read) = &receipts.fully_read {
if !self
.inner
.should_send_receipt(
&ReceiptType::FullyRead,
&ReceiptThread::Unthreaded,
fully_read,
)
.await
{
receipts.fully_read = None;
}
}
if let Some(read_receipt) = &receipts.read_receipt {
if !self
.inner
.should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
.await
{
receipts.read_receipt = None;
}
}
if let Some(private_read_receipt) = &receipts.private_read_receipt {
if !self
.inner
.should_send_receipt(
&ReceiptType::ReadPrivate,
&ReceiptThread::Unthreaded,
private_read_receipt,
)
.await
{
receipts.private_read_receipt = None;
}
}
// If this room isn't actually in joined state, we'll get a server error.
// Not ideal, but works for now.
let room = Joined { inner: self.room().clone() };
room.send_multiple_receipts(receipts).await
}
}
#[derive(Debug)]
@@ -484,3 +579,33 @@ pub enum Error {
#[error("Unsupported event")]
UnsupportedEvent,
}
/// Result of comparing events position in the timeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RelativePosition {
/// Event B is after (more recent than) event A.
After,
/// They are the same event.
Same,
/// Event B is before (older than) event A.
Before,
}
fn compare_events_positions(
event_a: &EventId,
event_b: &EventId,
timeline_items: &Vector<Arc<TimelineItem>>,
) -> Option<RelativePosition> {
if event_a == event_b {
return Some(RelativePosition::Same);
}
let (pos_event_a, _) = rfind_event_by_id(timeline_items, event_a)?;
let (pos_event_b, _) = rfind_event_by_id(timeline_items, event_b)?;
if pos_event_a > pos_event_b {
Some(RelativePosition::Before)
} else {
Some(RelativePosition::After)
}
}

View File

@@ -23,9 +23,11 @@ use ruma::{
use tracing::error;
use super::{
compare_events_positions,
inner::{RoomDataProvider, TimelineInnerState},
rfind_event_by_id, EventTimelineItem, TimelineItem,
rfind_event_by_id, EventTimelineItem, RelativePosition, TimelineItem,
};
use crate::room;
struct FullReceipt<'a> {
event_id: &'a EventId,
@@ -229,3 +231,74 @@ pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
read_receipts
}
/// Get the unthreaded receipt of the given type for the given user in the
/// timeline.
pub(super) async fn user_receipt(
user_id: &UserId,
receipt_type: ReceiptType,
timeline_state: &TimelineInnerState,
room: &room::Common,
) -> Option<(OwnedEventId, Receipt)> {
if let Some(receipt) = timeline_state
.users_read_receipts
.get(user_id)
.and_then(|user_map| user_map.get(&receipt_type))
.cloned()
{
return Some(receipt);
}
room.user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
.await
.unwrap_or_else(|e| {
error!("Could not get user read receipt of type {receipt_type:?}: {e}");
None
})
}
/// Get the latest read receipt for the given user.
///
/// Useful to get the latest read receipt, whether it's private or public.
pub(super) async fn latest_user_read_receipt(
user_id: &UserId,
timeline_state: &TimelineInnerState,
room: &room::Common,
) -> Option<(OwnedEventId, Receipt)> {
let public_read_receipt = user_receipt(user_id, ReceiptType::Read, timeline_state, room).await;
let private_read_receipt =
user_receipt(user_id, ReceiptType::ReadPrivate, timeline_state, room).await;
// If we only have one, return it.
let Some((pub_event_id, pub_receipt)) = &public_read_receipt else {
return private_read_receipt;
};
let Some((priv_event_id, priv_receipt)) = &private_read_receipt else {
return public_read_receipt;
};
// Compare by position in the timeline.
if let Some(relative_pos) =
compare_events_positions(pub_event_id, priv_event_id, &timeline_state.items)
{
if relative_pos == RelativePosition::After {
return private_read_receipt;
}
return public_read_receipt;
}
// Compare by timestamp.
if let Some((pub_ts, priv_ts)) = pub_receipt.ts.zip(priv_receipt.ts) {
if priv_ts > pub_ts {
return private_read_receipt;
}
return public_read_receipt;
}
// As a fallback, let's assume that a private read receipt should be more recent
// than a public read receipt, otherwise there's no point in the private read
// receipt.
private_read_receipt
}

View File

@@ -16,8 +16,8 @@ use matrix_sdk::{
};
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::{
async_test, test_json, EphemeralTestEvent, EventBuilder, JoinedRoomBuilder,
RoomAccountDataTestEvent, TimelineTestEvent,
async_test, test_json, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent,
TimelineTestEvent,
};
use ruma::{
event_id,
@@ -33,6 +33,8 @@ use wiremock::{
Mock, ResponseTemplate,
};
mod read_receipts;
use crate::{logged_in_client, mock_encryption_state, mock_sync};
#[async_test]
@@ -732,158 +734,6 @@ async fn in_reply_to_details() {
assert_matches!(message.in_reply_to().unwrap().details, TimelineDetails::Ready(_));
}
#[async_test]
async fn read_receipts_updates() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let alice = user_id!("@alice:localhost");
let bob = user_id!("@bob:localhost");
let second_event_id = event_id!("$e32037280er453l:localhost");
let third_event_id = event_id!("$Sg2037280074GZr34:localhost");
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline().await;
let (items, mut timeline_stream) = timeline.subscribe().await;
assert!(items.is_empty());
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::MessageText)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm dancing too",
"msgtype": "m.text"
},
"event_id": second_event_id,
"origin_server_ts": 152039280,
"sender": alice,
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "Viva la macarena!",
"msgtype": "m.text"
},
"event_id": third_event_id,
"origin_server_ts": 152045280,
"sender": alice,
"type": "m.room.message",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let _day_divider = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
// We don't list the read receipt of our own user on events.
let first_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let first_event = first_item.as_event().unwrap().as_remote().unwrap();
assert!(first_event.read_receipts().is_empty());
// Implicit read receipt of @alice:localhost.
let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let second_event = second_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(second_event.read_receipts().len(), 1);
// Read receipt of @alice:localhost is moved to third event.
let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 2, value }) => value);
let second_event = second_item.as_event().unwrap().as_remote().unwrap();
assert!(second_event.read_receipts().is_empty());
let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let third_event = third_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(third_event.read_receipts().len(), 1);
// Read receipt on unknown event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
"$unknowneventid": {
"m.read": {
alice: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
// Read receipt on older event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
second_event_id: {
"m.read": {
alice: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
// Read receipt on same event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
third_event_id: {
"m.read": {
alice: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
// New user with explicit read receipt.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
third_event_id: {
"m.read": {
bob: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 3, value }) => value);
let third_event = third_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(third_event.read_receipts().len(), 2);
}
#[async_test]
async fn sync_highlighted() {
let room_id = room_id!("!a98sd12bjh:example.org");

View File

@@ -0,0 +1,778 @@
use std::time::Duration;
use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::{config::SyncSettings, room::Receipts};
use matrix_sdk_test::{
async_test, EphemeralTestEvent, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent,
TimelineTestEvent,
};
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType, event_id,
events::receipt::ReceiptThread, room_id, user_id,
};
use serde_json::json;
use wiremock::{
matchers::{body_json, header, method, path_regex},
Mock, ResponseTemplate,
};
use crate::{logged_in_client, mock_sync};
#[async_test]
async fn read_receipts_updates() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let own_user_id = client.user_id().unwrap();
let alice = user_id!("@alice:localhost");
let bob = user_id!("@bob:localhost");
let second_event_id = event_id!("$e32037280er453l:localhost");
let third_event_id = event_id!("$Sg2037280074GZr34:localhost");
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline().await;
let (items, mut timeline_stream) = timeline.subscribe().await;
assert!(items.is_empty());
let own_receipt = timeline.latest_user_read_receipt(own_user_id).await;
assert_matches!(own_receipt, None);
let alice_receipt = timeline.latest_user_read_receipt(alice).await;
assert_matches!(alice_receipt, None);
let bob_receipt = timeline.latest_user_read_receipt(bob).await;
assert_matches!(bob_receipt, None);
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::MessageText)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm dancing too",
"msgtype": "m.text"
},
"event_id": second_event_id,
"origin_server_ts": 152039280,
"sender": alice,
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "Viva la macarena!",
"msgtype": "m.text"
},
"event_id": third_event_id,
"origin_server_ts": 152045280,
"sender": alice,
"type": "m.room.message",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let _day_divider = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
// We don't list the read receipt of our own user on events.
let first_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let first_event = first_item.as_event().unwrap().as_remote().unwrap();
assert!(first_event.read_receipts().is_empty());
let (own_receipt_event_id, _) = timeline.latest_user_read_receipt(own_user_id).await.unwrap();
assert_eq!(own_receipt_event_id, first_event.event_id());
// Implicit read receipt of @alice:localhost.
let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let second_event = second_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(second_event.read_receipts().len(), 1);
// Read receipt of @alice:localhost is moved to third event.
let second_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 2, value }) => value);
let second_event = second_item.as_event().unwrap().as_remote().unwrap();
assert!(second_event.read_receipts().is_empty());
let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
let third_event = third_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(third_event.read_receipts().len(), 1);
let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap();
assert_eq!(alice_receipt_event_id, third_event_id);
// Read receipt on unknown event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
"$unknowneventid": {
"m.read": {
alice: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap();
assert_eq!(alice_receipt_event_id, third_event.event_id());
// Read receipt on older event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
second_event_id: {
"m.read": {
alice: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap();
assert_eq!(alice_receipt_event_id, third_event_id);
// Read receipt on same event is ignored.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
third_event_id: {
"m.read": {
alice: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
let (alice_receipt_event_id, _) = timeline.latest_user_read_receipt(alice).await.unwrap();
assert_eq!(alice_receipt_event_id, third_event_id);
// New user with explicit read receipt.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
third_event_id: {
"m.read": {
bob: {
"ts": 1436451550,
},
},
},
},
"type": "m.receipt",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let third_item = assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 3, value }) => value);
let third_event = third_item.as_event().unwrap().as_remote().unwrap();
assert_eq!(third_event.read_receipts().len(), 2);
let (bob_receipt_event_id, _) = timeline.latest_user_read_receipt(bob).await.unwrap();
assert_eq!(bob_receipt_event_id, third_event_id);
// Private read receipt is updated.
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_ephemeral_event(
EphemeralTestEvent::Custom(json!({
"content": {
second_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let (own_user_receipt_event_id, _) =
timeline.latest_user_read_receipt(own_user_id).await.unwrap();
assert_eq!(own_user_receipt_event_id, second_event_id);
}
#[async_test]
async fn send_single_receipt() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let own_user_id = client.user_id().unwrap();
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline().await;
// Unknown receipts are sent.
let first_receipts_event_id = event_id!("$first_receipts_event_id");
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Public read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Private read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Fully-read marker")
.mount(&server)
.await;
timeline
.send_single_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::FullyRead,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
server.reset().await;
// Unchanged receipts are not sent.
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
first_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": first_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
timeline
.send_single_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::FullyRead,
ReceiptThread::Unthreaded,
first_receipts_event_id.to_owned(),
)
.await
.unwrap();
server.reset().await;
// Receipts with unknown previous receipts are always sent.
let second_receipts_event_id = event_id!("$second_receipts_event_id");
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Public read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Private read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Fully-read marker")
.mount(&server)
.await;
timeline
.send_single_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::FullyRead,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
server.reset().await;
// Newer receipts in the timeline are sent.
let third_receipts_event_id = event_id!("$third_receipts_event_id");
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm User A",
"msgtype": "m.text",
},
"event_id": second_receipts_event_id,
"origin_server_ts": 152046694,
"sender": "@user_a:example.org",
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm User B",
"msgtype": "m.text",
},
"event_id": third_receipts_event_id,
"origin_server_ts": 152049794,
"sender": "@user_b:example.org",
"type": "m.room.message",
})))
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
second_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": second_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Public read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.read\.private/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Private read receipt")
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/receipt/m\.fully_read/"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.named("Fully-read marker")
.mount(&server)
.await;
timeline
.send_single_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
third_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
third_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::FullyRead,
ReceiptThread::Unthreaded,
third_receipts_event_id.to_owned(),
)
.await
.unwrap();
server.reset().await;
// Older receipts in the timeline are not sent.
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
third_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": third_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
timeline
.send_single_receipt(
ReceiptType::Read,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::ReadPrivate,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
timeline
.send_single_receipt(
ReceiptType::FullyRead,
ReceiptThread::Unthreaded,
second_receipts_event_id.to_owned(),
)
.await
.unwrap();
}
#[async_test]
async fn send_multiple_receipts() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let own_user_id = client.user_id().unwrap();
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline().await;
// Unknown receipts are sent.
let first_receipts_event_id = event_id!("$first_receipts_event_id");
let first_receipts = Receipts::new()
.fully_read_marker(Some(first_receipts_event_id.to_owned()))
.public_read_receipt(Some(first_receipts_event_id.to_owned()))
.private_read_receipt(Some(first_receipts_event_id.to_owned()));
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$"))
.and(header("authorization", "Bearer 1234"))
.and(body_json(json!({
"m.fully_read": first_receipts_event_id,
"m.read": first_receipts_event_id,
"m.read.private": first_receipts_event_id,
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.mount(&server)
.await;
timeline.send_multiple_receipts(first_receipts.clone()).await.unwrap();
server.reset().await;
// Unchanged receipts are not sent.
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
first_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": first_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
timeline.send_multiple_receipts(first_receipts).await.unwrap();
server.reset().await;
// Receipts with unknown previous receipts are always sent.
let second_receipts_event_id = event_id!("$second_receipts_event_id");
let second_receipts = Receipts::new()
.fully_read_marker(Some(second_receipts_event_id.to_owned()))
.public_read_receipt(Some(second_receipts_event_id.to_owned()))
.private_read_receipt(Some(second_receipts_event_id.to_owned()));
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$"))
.and(header("authorization", "Bearer 1234"))
.and(body_json(json!({
"m.fully_read": second_receipts_event_id,
"m.read": second_receipts_event_id,
"m.read.private": second_receipts_event_id,
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.mount(&server)
.await;
timeline.send_multiple_receipts(second_receipts.clone()).await.unwrap();
server.reset().await;
// Newer receipts in the timeline are sent.
let third_receipts_event_id = event_id!("$third_receipts_event_id");
let third_receipts = Receipts::new()
.fully_read_marker(Some(third_receipts_event_id.to_owned()))
.public_read_receipt(Some(third_receipts_event_id.to_owned()))
.private_read_receipt(Some(third_receipts_event_id.to_owned()));
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm User A",
"msgtype": "m.text",
},
"event_id": second_receipts_event_id,
"origin_server_ts": 152046694,
"sender": "@user_a:example.org",
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "I'm User B",
"msgtype": "m.text",
},
"event_id": third_receipts_event_id,
"origin_server_ts": 152049794,
"sender": "@user_b:example.org",
"type": "m.room.message",
})))
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
second_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": second_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
Mock::given(method("POST"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/read_markers$"))
.and(header("authorization", "Bearer 1234"))
.and(body_json(json!({
"m.fully_read": third_receipts_event_id,
"m.read": third_receipts_event_id,
"m.read.private": third_receipts_event_id,
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.expect(1)
.mount(&server)
.await;
timeline.send_multiple_receipts(third_receipts.clone()).await.unwrap();
server.reset().await;
// Older receipts in the timeline are not sent.
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_ephemeral_event(EphemeralTestEvent::Custom(json!({
"content": {
third_receipts_event_id: {
"m.read.private": {
own_user_id: {
"ts": 1436453550,
},
},
"m.read": {
own_user_id: {
"ts": 1436453550,
},
},
},
},
"type": "m.receipt",
})))
.add_account_data(RoomAccountDataTestEvent::Custom(json!({
"content": {
"event_id": third_receipts_event_id,
},
"type": "m.fully_read",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
timeline.send_multiple_receipts(second_receipts.clone()).await.unwrap();
}