From 92df88547458ebfac0c13ef8ac1e09a4d7f700ca Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 11 Jan 2024 17:10:18 +0100 Subject: [PATCH] read receipts: introduce `ReceiptSelector` helper and add many unit tests The `ReceiptSelector` splits the tasks of computing a better new receipt into small parts, making it trivial to test in isolation. --- crates/matrix-sdk-base/src/read_receipts.rs | 534 ++++++++++++++++---- 1 file changed, 428 insertions(+), 106 deletions(-) diff --git a/crates/matrix-sdk-base/src/read_receipts.rs b/crates/matrix-sdk-base/src/read_receipts.rs index 96ee3f0b0..80517c60b 100644 --- a/crates/matrix-sdk-base/src/read_receipts.rs +++ b/crates/matrix-sdk-base/src/read_receipts.rs @@ -177,15 +177,110 @@ impl PreviousEventsProvider for () { } } -/// Create a mapping of `event_id` -> sync order for all events that have an `event_id`. -fn create_sync_index<'a>( - events: impl Iterator + 'a, -) -> BTreeMap { - BTreeMap::from_iter( - events - .enumerate() - .filter_map(|(pos, event)| event.event_id().map(|event_id| (event_id, pos))), - ) +/// Small helper to select the "best" receipt (that with the biggest sync order). +struct ReceiptSelector { + /// Mapping of known event IDs to their sync order. + event_id_to_pos: BTreeMap, + /// The event with the biggest sync order, for which we had a user receipt, so far. + best_receipt: Option, + /// The biggest sync order attached to the `best_receipt`. + best_pos: Option, +} + +impl ReceiptSelector { + fn new( + all_events: &Vector, + latest_active_receipt_event: Option<&EventId>, + ) -> Self { + let event_id_to_pos = Self::create_sync_index(all_events.iter()); + + let best_pos = + latest_active_receipt_event.and_then(|event_id| event_id_to_pos.get(event_id)).copied(); + + Self { best_pos, best_receipt: None, event_id_to_pos } + } + + /// Create a mapping of `event_id` -> sync order for all events that have an `event_id`. + fn create_sync_index<'a>( + events: impl Iterator + 'a, + ) -> BTreeMap { + // TODO: this should be cached and incrementally updated. + BTreeMap::from_iter( + events + .enumerate() + .filter_map(|(pos, event)| event.event_id().map(|event_id| (event_id, pos))), + ) + } + + /// Consider the current event and its position as a better read receipt. + fn try_select_better(&mut self, event_id: &EventId, event_pos: usize) { + // We now have a position for an event that had a read receipt, but wasn't found + // before. Consider if it is the most recent now. + if let Some(best_pos) = self.best_pos.as_mut() { + // Note: by using a strict comparison here, we protect against the + // server sending a receipt on the same event multiple times. + if event_pos > *best_pos { + *best_pos = event_pos; + self.best_receipt = Some(event_id.to_owned()); + } + } else { + // We didn't have a previous receipt, this is the first one we + // store: remember it. + self.best_pos = Some(event_pos); + self.best_receipt = Some(event_id.to_owned()); + } + } + + /// Try to match pending receipts against new events. + fn handle_pending_receipts(&mut self, pending: &mut BTreeSet) { + // Try to match stashes receipts against the new events. + pending.retain(|event_id| { + if let Some(event_pos) = self.event_id_to_pos.get(event_id) { + // Maybe select this read receipt as it might be better than the ones we had. + self.try_select_better(&*event_id, *event_pos); + + // Remove this stashed read receipt from the pending list, as it's been + // reconciled with its event. + false + } else { + // Keep it for further iterations. + true + } + }); + } + + /// Try to match new receipts against all (new and old) events. + /// + /// Returns all the new pending receipts (those for which we didn't have a known matching + /// event). + fn handle_new_receipt( + &mut self, + user_id: &UserId, + receipt_event: &ReceiptEventContent, + ) -> Vec { + let mut pending = Vec::new(); + // Now consider new receipts. + for (event_id, receipts) in &receipt_event.0 { + for ty in [ReceiptType::Read, ReceiptType::ReadPrivate] { + if let Some(receipt) = receipts.get(&ty).and_then(|receipts| receipts.get(user_id)) + { + if matches!(receipt.thread, ReceiptThread::Main | ReceiptThread::Unthreaded) { + if let Some(event_pos) = self.event_id_to_pos.get(event_id) { + self.try_select_better(event_id, *event_pos); + } else { + // It's a new pending receipt. + pending.push(event_id.clone()); + } + } + } + } + } + pending + } + + fn finish(self) -> Option { + self.best_receipt.map(|event_id| LatestReadReceipt { event_id }) + } } /// Given a set of events coming from sync, for a room, update the @@ -212,80 +307,18 @@ pub(crate) fn compute_notifications( let mut all_events = previous_events_provider.for_room(room_id); all_events.extend(new_events.iter().cloned()); - let event_id_to_pos = create_sync_index(all_events.iter()); - - // We're looking for a receipt that has a position that is at least further (>) - // than the one we knew about (if any). - let mut best_receipt = None; - let mut best_pos = read_receipts - .latest_active - .as_ref() - .and_then(|r| event_id_to_pos.get(&r.event_id)) - .copied(); - - // Try to match stashes receipts against the new events. - read_receipts.pending.retain(|event_id| { - if let Some(event_pos) = event_id_to_pos.get(event_id) { - // We now have a position for an event that had a read receipt, but wasn't found - // before. Consider if it is the most recent now. - if let Some(best_pos) = best_pos.as_mut() { - // Note: by using a strict comparison here, we protect against the - // server sending a receipt on the same event multiple times. - if *event_pos > *best_pos { - *best_pos = *event_pos; - best_receipt = Some(event_id.clone()); - } - } else { - // We didn't have a previous receipt, this is the first one we - // store: remember it. - best_pos = Some(*event_pos); - best_receipt = Some(event_id.clone()); - } - - // Remove this stashed read receipt from the pending list, as it's been - // reconciled with its event. - false - } else { - // Keep it for further iterations. - true + let new_receipt = { + let mut selector = ReceiptSelector::new( + &all_events, + read_receipts.latest_active.as_ref().map(|receipt| &*receipt.event_id), + ); + selector.handle_pending_receipts(&mut read_receipts.pending); + if let Some(receipt_event) = receipt_event { + trace!("Got a new receipt event!"); + read_receipts.pending.extend(selector.handle_new_receipt(user_id, receipt_event)); } - }); - - if let Some(receipt_event) = receipt_event { - trace!("Got a new receipt event!"); - - // Now consider new receipts. - for (event_id, receipts) in &receipt_event.0 { - for ty in [ReceiptType::Read, ReceiptType::ReadPrivate] { - if let Some(receipt) = receipts.get(&ty).and_then(|receipts| receipts.get(user_id)) - { - if matches!(receipt.thread, ReceiptThread::Main | ReceiptThread::Unthreaded) { - if let Some(event_pos) = event_id_to_pos.get(event_id) { - if let Some(best_pos) = best_pos.as_mut() { - // Note: by using a strict comparison here, we protect against the - // server sending a receipt on the same event multiple times. - if *event_pos > *best_pos { - *best_pos = *event_pos; - best_receipt = Some(event_id.clone()); - } - } else { - // We didn't have a previous receipt, this is the first one we - // store: remember it. - best_pos = Some(*event_pos); - best_receipt = Some(event_id.clone()); - } - } else { - // It's a new pending receipt. - read_receipts.pending.insert(event_id.clone()); - } - } - } - } - } - } - - // I swear the `LatestReadReceipt` might get handy at some point. - let new_receipt = best_receipt.map(|r| LatestReadReceipt { event_id: r.clone() }); + selector.finish() + }; if let Some(new_receipt) = new_receipt { // We've found the id of an event to which the receipt attaches. The associated @@ -422,7 +455,7 @@ fn marks_as_unread(event: &Raw, user_id: &UserId) -> bool #[cfg(test)] mod tests { - use std::ops::Not as _; + use std::{collections::BTreeSet, ops::Not as _}; use eyeball_im::Vector; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; @@ -430,14 +463,14 @@ mod tests { use ruma::{ event_id, events::receipt::{ReceiptThread, ReceiptType}, - owned_event_id, + owned_event_id, owned_user_id, push::Action, room_id, user_id, EventId, UserId, }; - use super::{compute_notifications, create_sync_index}; + use super::compute_notifications; use crate::{ - read_receipts::{marks_as_unread, RoomReadReceipts}, + read_receipts::{marks_as_unread, ReceiptSelector, RoomReadReceipts}, PreviousEventsProvider, }; @@ -812,25 +845,25 @@ mod tests { assert_eq!(read_receipts.num_unread, 2); } + fn make_test_events(user_id: &UserId) -> Vector { + let ev1 = sync_timeline_message(user_id, "$1", "With the lights out, it's less dangerous"); + let ev2 = sync_timeline_message(user_id, "$2", "Here we are now, entertain us"); + let ev3 = sync_timeline_message(user_id, "$3", "I feel stupid and contagious"); + let ev4 = sync_timeline_message(user_id, "$4", "Here we are now, entertain us"); + let ev5 = sync_timeline_message(user_id, "$5", "Hello, hello, hello, how low?"); + vec![ev1, ev2, ev3, ev4, ev5].into() + } + /// Test that when multiple receipts come in a single event, we can still find the latest one /// according to the sync order. #[test] fn test_compute_notifications_multiple_receipts_in_one_event() { let user_id = user_id!("@alice:example.org"); - let other_user_id = user_id!("@bob:example.org"); let room_id = room_id!("!room:example.org"); - let ev1 = sync_timeline_message(other_user_id, "$1", "boom"); - let ev2 = sync_timeline_message(other_user_id, "$2", "boom"); - let ev3 = sync_timeline_message(other_user_id, "$3", "boom"); - let ev4 = sync_timeline_message(other_user_id, "$4", "boom"); - let ev5 = sync_timeline_message(other_user_id, "$5", "i want you in my room"); - - let all_events: Vector<_> = - vec![ev1.clone(), ev2.clone(), ev3.clone(), ev4.clone(), ev5.clone()].into(); - - let head_events: Vector<_> = vec![ev1, ev2].into(); - let tail_events = &[ev3, ev4, ev5]; + let all_events = make_test_events(user_id!("@bob:example.org")); + let head_events: Vector<_> = all_events.iter().take(2).cloned().collect(); + let tail_events: Vec<_> = all_events.iter().skip(2).cloned().collect(); for receipt_type_1 in &[ReceiptType::Read, ReceiptType::ReadPrivate] { for receipt_thread_1 in &[ReceiptThread::Unthreaded, ReceiptThread::Main] { @@ -881,7 +914,7 @@ mod tests { room_id, Some(&receipt_event), &head_events, - tail_events, + &tail_events, &mut read_receipts, ) .unwrap()); @@ -897,14 +930,10 @@ mod tests { } #[test] - fn test_create_sync_index() { + fn test_receipt_selector_create_sync_index() { let uid = user_id!("@bob:example.org"); - let ev1 = sync_timeline_message(uid, "$1", "boom"); - let ev2 = sync_timeline_message(uid, "$2", "boom"); - let ev3 = sync_timeline_message(uid, "$3", "boom"); - let ev4 = sync_timeline_message(uid, "$4", "boom"); - let ev5 = sync_timeline_message(uid, "$5", "i want you in my room"); + let events = make_test_events(uid); // An event with no id. let ev6 = SyncTimelineEvent::new(sync_timeline_event!({ @@ -914,7 +943,7 @@ mod tests { "content": { "body": "yolo", "msgtype": "m.text" }, })); - let index = create_sync_index([ev1, ev2, ev3, ev4, ev5, ev6].iter()); + let index = ReceiptSelector::create_sync_index(events.iter().chain(&[ev6])); assert_eq!(*index.get(event_id!("$1")).unwrap(), 0); assert_eq!(*index.get(event_id!("$2")).unwrap(), 1); @@ -924,5 +953,298 @@ mod tests { assert_eq!(index.get(event_id!("$6")), None); assert_eq!(index.len(), 5); + + // Sync order are set according to the position in the vector. + let index = ReceiptSelector::create_sync_index( + [events[1].clone(), events[2].clone(), events[4].clone()].iter(), + ); + + assert_eq!(*index.get(event_id!("$2")).unwrap(), 0); + assert_eq!(*index.get(event_id!("$3")).unwrap(), 1); + assert_eq!(*index.get(event_id!("$5")).unwrap(), 2); + + assert_eq!(index.len(), 3); + } + + #[test] + fn test_receipt_selector_try_select_better() { + let events = make_test_events(user_id!("@bob:example.org")); + + { + // No initial active receipt, so the first receipt we get *will* win. + let mut selector = ReceiptSelector::new(&vec![].into(), None); + selector.try_select_better(event_id!("$1"), 0); + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$1")); + } + + { + // $3 is at pos 2, $1 at position 0, so $3 wins => no new change. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$3"))); + selector.try_select_better(event_id!("$1"), 0); + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // $3 is at pos 2, $4 at position 3, so $4 wins. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$3"))); + selector.try_select_better(event_id!("$4"), 3); + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$4")); + } + } + + #[test] + fn test_receipt_selector_handle_pending_receipts() { + let sender = user_id!("@bob:example.org"); + let ev1 = sync_timeline_message(sender, event_id!("$1"), "yo"); + let ev2 = sync_timeline_message(sender, event_id!("$2"), "well?"); + let events: Vector<_> = vec![ev1, ev2].into(); + + // Each test must be duplicated here: + // - one time it must run with no active receipt, + // - one time it must run with an active receipt (consider that the active receipt may be + // better *or* less good). + + { + // No pending receipt => no better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let mut pending = BTreeSet::new(); + selector.handle_pending_receipts(&mut pending); + + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // No pending receipt, and there was an active last receipt => no better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + + let mut pending = BTreeSet::new(); + selector.handle_pending_receipts(&mut pending); + + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // A pending receipt for an event that is still missing => no better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$3")]); + selector.handle_pending_receipts(&mut pending); + + assert_eq!(pending.len(), 1); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // Ditto but there was an active receipt => no better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$3")]); + selector.handle_pending_receipts(&mut pending); + + assert_eq!(pending.len(), 1); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // A pending receipt for an event that is present => better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$2")]); + selector.handle_pending_receipts(&mut pending); + + // The receipt for $2 has been found. + assert!(pending.is_empty()); + + // The new receipt has been returned. + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$2")); + } + + { + // Same, and there was an initial receipt that was less good than the one we selected => better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$1"))); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$2")]); + selector.handle_pending_receipts(&mut pending); + + // The receipt for $2 has been found. + assert!(pending.is_empty()); + + // The new receipt has been returned. + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$2")); + } + + { + // Same, but the previous receipt was better => no better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2"))); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$1")]); + selector.handle_pending_receipts(&mut pending); + + // The receipt for $1 has been found. + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // Mixed found and not found receipt => better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let mut pending = BTreeSet::from_iter([owned_event_id!("$1"), owned_event_id!("$3")]); + selector.handle_pending_receipts(&mut pending); + + // The receipt for $1 has been found, but not that for $3. + assert_eq!(pending.len(), 1); + assert!(pending.contains(event_id!("$3"))); + + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$1")); + } + } + + #[test] + fn test_receipt_selector_handle_new_receipt() { + let myself = owned_user_id!("@alice:example.org"); + let events = make_test_events(user_id!("@bob:example.org")); + + { + // Thread receipts are ignored. + let mut selector = ReceiptSelector::new(&events, None); + + let receipt_event = EventBuilder::new().make_receipt_event_content([( + owned_event_id!("$5"), + ReceiptType::Read, + myself.clone(), + ReceiptThread::Thread(owned_event_id!("$2")), + )]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + for receipt_type in [ReceiptType::Read, ReceiptType::ReadPrivate] { + for receipt_thread in [ReceiptThread::Main, ReceiptThread::Unthreaded] { + { + // Receipt for an event we don't know about => it's pending, and no better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let receipt_event = EventBuilder::new().make_receipt_event_content([( + owned_event_id!("$6"), + receipt_type.clone(), + myself.clone(), + receipt_thread.clone(), + )]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert_eq!(pending[0], event_id!("$6")); + assert_eq!(pending.len(), 1); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // Receipt for an event we knew about, no initial active receipt => better receipt. + let mut selector = ReceiptSelector::new(&events, None); + + let receipt_event = EventBuilder::new().make_receipt_event_content([( + owned_event_id!("$3"), + receipt_type.clone(), + myself.clone(), + receipt_thread.clone(), + )]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$3")); + } + + { + // Receipt for an event we knew about, initial active receipt was better => no better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$4"))); + + let receipt_event = EventBuilder::new().make_receipt_event_content([( + owned_event_id!("$3"), + receipt_type.clone(), + myself.clone(), + receipt_thread.clone(), + )]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert!(best_receipt.is_none()); + } + + { + // Receipt for an event we knew about, initial active receipt was less good => new better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2"))); + + let receipt_event = EventBuilder::new().make_receipt_event_content([( + owned_event_id!("$3"), + receipt_type.clone(), + myself.clone(), + receipt_thread.clone(), + )]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert!(pending.is_empty()); + + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$3")); + } + } + } // end for + + { + // Final boss: multiple receipts in the receipt event, the best one is used => new better receipt. + let mut selector = ReceiptSelector::new(&events, Some(event_id!("$2"))); + + let receipt_event = EventBuilder::new().make_receipt_event_content([ + ( + owned_event_id!("$4"), + ReceiptType::ReadPrivate, + myself.clone(), + ReceiptThread::Unthreaded, + ), + ( + owned_event_id!("$6"), + ReceiptType::ReadPrivate, + myself.clone(), + ReceiptThread::Main, + ), + (owned_event_id!("$3"), ReceiptType::Read, myself.clone(), ReceiptThread::Main), + ]); + + let pending = selector.handle_new_receipt(&myself, &receipt_event); + assert_eq!(pending.len(), 1); + assert_eq!(pending[0], event_id!("$6")); + + let best_receipt = selector.finish(); + assert_eq!(best_receipt.unwrap().event_id, event_id!("$4")); + } } }