read receipts: introduce ReceiptSelector helper and add many unit tests

The `ReceiptSelector` splits the tasks of computing a better new receipt into small parts, making it trivial to test in isolation.
This commit is contained in:
Benjamin Bouvier
2024-01-11 17:10:18 +01:00
parent ddb44d35cd
commit 92df885474

View File

@@ -177,15 +177,110 @@ impl PreviousEventsProvider for () {
}
}
/// Create a mapping of `event_id` -> sync order for all events that have an `event_id`.
fn create_sync_index<'a>(
events: impl Iterator<Item = &'a SyncTimelineEvent> + 'a,
) -> BTreeMap<OwnedEventId, usize> {
BTreeMap::from_iter(
events
.enumerate()
.filter_map(|(pos, event)| event.event_id().map(|event_id| (event_id, pos))),
)
/// Small helper to select the "best" receipt (that with the biggest sync order).
struct ReceiptSelector {
/// Mapping of known event IDs to their sync order.
event_id_to_pos: BTreeMap<OwnedEventId, usize>,
/// The event with the biggest sync order, for which we had a user receipt, so far.
best_receipt: Option<OwnedEventId>,
/// The biggest sync order attached to the `best_receipt`.
best_pos: Option<usize>,
}
impl ReceiptSelector {
fn new(
all_events: &Vector<SyncTimelineEvent>,
latest_active_receipt_event: Option<&EventId>,
) -> Self {
let event_id_to_pos = Self::create_sync_index(all_events.iter());
let best_pos =
latest_active_receipt_event.and_then(|event_id| event_id_to_pos.get(event_id)).copied();
Self { best_pos, best_receipt: None, event_id_to_pos }
}
/// Create a mapping of `event_id` -> sync order for all events that have an `event_id`.
fn create_sync_index<'a>(
events: impl Iterator<Item = &'a SyncTimelineEvent> + 'a,
) -> BTreeMap<OwnedEventId, usize> {
// TODO: this should be cached and incrementally updated.
BTreeMap::from_iter(
events
.enumerate()
.filter_map(|(pos, event)| event.event_id().map(|event_id| (event_id, pos))),
)
}
/// Consider the current event and its position as a better read receipt.
fn try_select_better(&mut self, event_id: &EventId, event_pos: usize) {
// We now have a position for an event that had a read receipt, but wasn't found
// before. Consider if it is the most recent now.
if let Some(best_pos) = self.best_pos.as_mut() {
// Note: by using a strict comparison here, we protect against the
// server sending a receipt on the same event multiple times.
if event_pos > *best_pos {
*best_pos = event_pos;
self.best_receipt = Some(event_id.to_owned());
}
} else {
// We didn't have a previous receipt, this is the first one we
// store: remember it.
self.best_pos = Some(event_pos);
self.best_receipt = Some(event_id.to_owned());
}
}
/// Try to match pending receipts against new events.
fn handle_pending_receipts(&mut self, pending: &mut BTreeSet<OwnedEventId>) {
// Try to match stashes receipts against the new events.
pending.retain(|event_id| {
if let Some(event_pos) = self.event_id_to_pos.get(event_id) {
// Maybe select this read receipt as it might be better than the ones we had.
self.try_select_better(&*event_id, *event_pos);
// Remove this stashed read receipt from the pending list, as it's been
// reconciled with its event.
false
} else {
// Keep it for further iterations.
true
}
});
}
/// Try to match new receipts against all (new and old) events.
///
/// Returns all the new pending receipts (those for which we didn't have a known matching
/// event).
fn handle_new_receipt(
&mut self,
user_id: &UserId,
receipt_event: &ReceiptEventContent,
) -> Vec<OwnedEventId> {
let mut pending = Vec::new();
// Now consider new receipts.
for (event_id, receipts) in &receipt_event.0 {
for ty in [ReceiptType::Read, ReceiptType::ReadPrivate] {
if let Some(receipt) = receipts.get(&ty).and_then(|receipts| receipts.get(user_id))
{
if matches!(receipt.thread, ReceiptThread::Main | ReceiptThread::Unthreaded) {
if let Some(event_pos) = self.event_id_to_pos.get(event_id) {
self.try_select_better(event_id, *event_pos);
} else {
// It's a new pending receipt.
pending.push(event_id.clone());
}
}
}
}
}
pending
}
fn finish(self) -> Option<LatestReadReceipt> {
self.best_receipt.map(|event_id| LatestReadReceipt { event_id })
}
}
/// Given a set of events coming from sync, for a room, update the
@@ -212,80 +307,18 @@ pub(crate) fn compute_notifications<PEP: PreviousEventsProvider>(
let mut all_events = previous_events_provider.for_room(room_id);
all_events.extend(new_events.iter().cloned());
let event_id_to_pos = create_sync_index(all_events.iter());
// We're looking for a receipt that has a position that is at least further (>)
// than the one we knew about (if any).
let mut best_receipt = None;
let mut best_pos = read_receipts
.latest_active
.as_ref()
.and_then(|r| event_id_to_pos.get(&r.event_id))
.copied();
// Try to match stashes receipts against the new events.
read_receipts.pending.retain(|event_id| {
if let Some(event_pos) = event_id_to_pos.get(event_id) {
// We now have a position for an event that had a read receipt, but wasn't found
// before. Consider if it is the most recent now.
if let Some(best_pos) = best_pos.as_mut() {
// Note: by using a strict comparison here, we protect against the
// server sending a receipt on the same event multiple times.
if *event_pos > *best_pos {
*best_pos = *event_pos;
best_receipt = Some(event_id.clone());
}
} else {
// We didn't have a previous receipt, this is the first one we
// store: remember it.
best_pos = Some(*event_pos);
best_receipt = Some(event_id.clone());
}
// Remove this stashed read receipt from the pending list, as it's been
// reconciled with its event.
false
} else {
// Keep it for further iterations.
true
let new_receipt = {
let mut selector = ReceiptSelector::new(
&all_events,
read_receipts.latest_active.as_ref().map(|receipt| &*receipt.event_id),
);
selector.handle_pending_receipts(&mut read_receipts.pending);
if let Some(receipt_event) = receipt_event {
trace!("Got a new receipt event!");
read_receipts.pending.extend(selector.handle_new_receipt(user_id, receipt_event));
}
});
if let Some(receipt_event) = receipt_event {
trace!("Got a new receipt event!");
// Now consider new receipts.
for (event_id, receipts) in &receipt_event.0 {
for ty in [ReceiptType::Read, ReceiptType::ReadPrivate] {
if let Some(receipt) = receipts.get(&ty).and_then(|receipts| receipts.get(user_id))
{
if matches!(receipt.thread, ReceiptThread::Main | ReceiptThread::Unthreaded) {
if let Some(event_pos) = event_id_to_pos.get(event_id) {
if let Some(best_pos) = best_pos.as_mut() {
// Note: by using a strict comparison here, we protect against the
// server sending a receipt on the same event multiple times.
if *event_pos > *best_pos {
*best_pos = *event_pos;
best_receipt = Some(event_id.clone());
}
} else {
// We didn't have a previous receipt, this is the first one we
// store: remember it.
best_pos = Some(*event_pos);
best_receipt = Some(event_id.clone());
}
} else {
// It's a new pending receipt.
read_receipts.pending.insert(event_id.clone());
}
}
}
}
}
}
// I swear the `LatestReadReceipt` might get handy at some point.
let new_receipt = best_receipt.map(|r| LatestReadReceipt { event_id: r.clone() });
selector.finish()
};
if let Some(new_receipt) = new_receipt {
// We've found the id of an event to which the receipt attaches. The associated
@@ -422,7 +455,7 @@ fn marks_as_unread(event: &Raw<AnySyncTimelineEvent>, user_id: &UserId) -> bool
#[cfg(test)]
mod tests {
use std::ops::Not as _;
use std::{collections::BTreeSet, ops::Not as _};
use eyeball_im::Vector;
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
@@ -430,14 +463,14 @@ mod tests {
use ruma::{
event_id,
events::receipt::{ReceiptThread, ReceiptType},
owned_event_id,
owned_event_id, owned_user_id,
push::Action,
room_id, user_id, EventId, UserId,
};
use super::{compute_notifications, create_sync_index};
use super::compute_notifications;
use crate::{
read_receipts::{marks_as_unread, RoomReadReceipts},
read_receipts::{marks_as_unread, ReceiptSelector, RoomReadReceipts},
PreviousEventsProvider,
};
@@ -812,25 +845,25 @@ mod tests {
assert_eq!(read_receipts.num_unread, 2);
}
fn make_test_events(user_id: &UserId) -> Vector<SyncTimelineEvent> {
let ev1 = sync_timeline_message(user_id, "$1", "With the lights out, it's less dangerous");
let ev2 = sync_timeline_message(user_id, "$2", "Here we are now, entertain us");
let ev3 = sync_timeline_message(user_id, "$3", "I feel stupid and contagious");
let ev4 = sync_timeline_message(user_id, "$4", "Here we are now, entertain us");
let ev5 = sync_timeline_message(user_id, "$5", "Hello, hello, hello, how low?");
vec![ev1, ev2, ev3, ev4, ev5].into()
}
/// Test that when multiple receipts come in a single event, we can still find the latest one
/// according to the sync order.
#[test]
fn test_compute_notifications_multiple_receipts_in_one_event() {
let user_id = user_id!("@alice:example.org");
let other_user_id = user_id!("@bob:example.org");
let room_id = room_id!("!room:example.org");
let ev1 = sync_timeline_message(other_user_id, "$1", "boom");
let ev2 = sync_timeline_message(other_user_id, "$2", "boom");
let ev3 = sync_timeline_message(other_user_id, "$3", "boom");
let ev4 = sync_timeline_message(other_user_id, "$4", "boom");
let ev5 = sync_timeline_message(other_user_id, "$5", "i want you in my room");
let all_events: Vector<_> =
vec![ev1.clone(), ev2.clone(), ev3.clone(), ev4.clone(), ev5.clone()].into();
let head_events: Vector<_> = vec![ev1, ev2].into();
let tail_events = &[ev3, ev4, ev5];
let all_events = make_test_events(user_id!("@bob:example.org"));
let head_events: Vector<_> = all_events.iter().take(2).cloned().collect();
let tail_events: Vec<_> = all_events.iter().skip(2).cloned().collect();
for receipt_type_1 in &[ReceiptType::Read, ReceiptType::ReadPrivate] {
for receipt_thread_1 in &[ReceiptThread::Unthreaded, ReceiptThread::Main] {
@@ -881,7 +914,7 @@ mod tests {
room_id,
Some(&receipt_event),
&head_events,
tail_events,
&tail_events,
&mut read_receipts,
)
.unwrap());
@@ -897,14 +930,10 @@ mod tests {
}
#[test]
fn test_create_sync_index() {
fn test_receipt_selector_create_sync_index() {
let uid = user_id!("@bob:example.org");
let ev1 = sync_timeline_message(uid, "$1", "boom");
let ev2 = sync_timeline_message(uid, "$2", "boom");
let ev3 = sync_timeline_message(uid, "$3", "boom");
let ev4 = sync_timeline_message(uid, "$4", "boom");
let ev5 = sync_timeline_message(uid, "$5", "i want you in my room");
let events = make_test_events(uid);
// An event with no id.
let ev6 = SyncTimelineEvent::new(sync_timeline_event!({
@@ -914,7 +943,7 @@ mod tests {
"content": { "body": "yolo", "msgtype": "m.text" },
}));
let index = create_sync_index([ev1, ev2, ev3, ev4, ev5, ev6].iter());
let index = ReceiptSelector::create_sync_index(events.iter().chain(&[ev6]));
assert_eq!(*index.get(event_id!("$1")).unwrap(), 0);
assert_eq!(*index.get(event_id!("$2")).unwrap(), 1);
@@ -924,5 +953,298 @@ mod tests {
assert_eq!(index.get(event_id!("$6")), None);
assert_eq!(index.len(), 5);
// Sync order are set according to the position in the vector.
let index = ReceiptSelector::create_sync_index(
[events[1].clone(), events[2].clone(), events[4].clone()].iter(),
);
assert_eq!(*index.get(event_id!("$2")).unwrap(), 0);
assert_eq!(*index.get(event_id!("$3")).unwrap(), 1);
assert_eq!(*index.get(event_id!("$5")).unwrap(), 2);
assert_eq!(index.len(), 3);
}
#[test]
fn test_receipt_selector_try_select_better() {
let events = make_test_events(user_id!("@bob:example.org"));
{
// No initial active receipt, so the first receipt we get *will* win.
let mut selector = ReceiptSelector::new(&vec![].into(), None);
selector.try_select_better(event_id!("$1"), 0);
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$1"));
}
{
// $3 is at pos 2, $1 at position 0, so $3 wins => no new change.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$3")));
selector.try_select_better(event_id!("$1"), 0);
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// $3 is at pos 2, $4 at position 3, so $4 wins.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$3")));
selector.try_select_better(event_id!("$4"), 3);
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$4"));
}
}
#[test]
fn test_receipt_selector_handle_pending_receipts() {
let sender = user_id!("@bob:example.org");
let ev1 = sync_timeline_message(sender, event_id!("$1"), "yo");
let ev2 = sync_timeline_message(sender, event_id!("$2"), "well?");
let events: Vector<_> = vec![ev1, ev2].into();
// Each test must be duplicated here:
// - one time it must run with no active receipt,
// - one time it must run with an active receipt (consider that the active receipt may be
// better *or* less good).
{
// No pending receipt => no better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let mut pending = BTreeSet::new();
selector.handle_pending_receipts(&mut pending);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// No pending receipt, and there was an active last receipt => no better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1")));
let mut pending = BTreeSet::new();
selector.handle_pending_receipts(&mut pending);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// A pending receipt for an event that is still missing => no better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let mut pending = BTreeSet::from_iter([owned_event_id!("$3")]);
selector.handle_pending_receipts(&mut pending);
assert_eq!(pending.len(), 1);
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// Ditto but there was an active receipt => no better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1")));
let mut pending = BTreeSet::from_iter([owned_event_id!("$3")]);
selector.handle_pending_receipts(&mut pending);
assert_eq!(pending.len(), 1);
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// A pending receipt for an event that is present => better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let mut pending = BTreeSet::from_iter([owned_event_id!("$2")]);
selector.handle_pending_receipts(&mut pending);
// The receipt for $2 has been found.
assert!(pending.is_empty());
// The new receipt has been returned.
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$2"));
}
{
// Same, and there was an initial receipt that was less good than the one we selected => better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1")));
let mut pending = BTreeSet::from_iter([owned_event_id!("$2")]);
selector.handle_pending_receipts(&mut pending);
// The receipt for $2 has been found.
assert!(pending.is_empty());
// The new receipt has been returned.
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$2"));
}
{
// Same, but the previous receipt was better => no better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2")));
let mut pending = BTreeSet::from_iter([owned_event_id!("$1")]);
selector.handle_pending_receipts(&mut pending);
// The receipt for $1 has been found.
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// Mixed found and not found receipt => better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let mut pending = BTreeSet::from_iter([owned_event_id!("$1"), owned_event_id!("$3")]);
selector.handle_pending_receipts(&mut pending);
// The receipt for $1 has been found, but not that for $3.
assert_eq!(pending.len(), 1);
assert!(pending.contains(event_id!("$3")));
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$1"));
}
}
#[test]
fn test_receipt_selector_handle_new_receipt() {
let myself = owned_user_id!("@alice:example.org");
let events = make_test_events(user_id!("@bob:example.org"));
{
// Thread receipts are ignored.
let mut selector = ReceiptSelector::new(&events, None);
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$5"),
ReceiptType::Read,
myself.clone(),
ReceiptThread::Thread(owned_event_id!("$2")),
)]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
for receipt_type in [ReceiptType::Read, ReceiptType::ReadPrivate] {
for receipt_thread in [ReceiptThread::Main, ReceiptThread::Unthreaded] {
{
// Receipt for an event we don't know about => it's pending, and no better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$6"),
receipt_type.clone(),
myself.clone(),
receipt_thread.clone(),
)]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert_eq!(pending[0], event_id!("$6"));
assert_eq!(pending.len(), 1);
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// Receipt for an event we knew about, no initial active receipt => better receipt.
let mut selector = ReceiptSelector::new(&events, None);
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$3"),
receipt_type.clone(),
myself.clone(),
receipt_thread.clone(),
)]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$3"));
}
{
// Receipt for an event we knew about, initial active receipt was better => no better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$4")));
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$3"),
receipt_type.clone(),
myself.clone(),
receipt_thread.clone(),
)]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert!(best_receipt.is_none());
}
{
// Receipt for an event we knew about, initial active receipt was less good => new better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2")));
let receipt_event = EventBuilder::new().make_receipt_event_content([(
owned_event_id!("$3"),
receipt_type.clone(),
myself.clone(),
receipt_thread.clone(),
)]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert!(pending.is_empty());
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$3"));
}
}
} // end for
{
// Final boss: multiple receipts in the receipt event, the best one is used => new better receipt.
let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2")));
let receipt_event = EventBuilder::new().make_receipt_event_content([
(
owned_event_id!("$4"),
ReceiptType::ReadPrivate,
myself.clone(),
ReceiptThread::Unthreaded,
),
(
owned_event_id!("$6"),
ReceiptType::ReadPrivate,
myself.clone(),
ReceiptThread::Main,
),
(owned_event_id!("$3"), ReceiptType::Read, myself.clone(), ReceiptThread::Main),
]);
let pending = selector.handle_new_receipt(&myself, &receipt_event);
assert_eq!(pending.len(), 1);
assert_eq!(pending[0], event_id!("$6"));
let best_receipt = selector.finish();
assert_eq!(best_receipt.unwrap().event_id, event_id!("$4"));
}
}
}