From 2d657fe908dd1f40070b7ddece8d5095db9a0301 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 4 Aug 2025 15:50:25 +0200 Subject: [PATCH] feat(sdk): `LatestEvents` listens to the `SendQueue`. This patch updates `LatestEvents` to listen to the updates from the `SendQueue`. The `listen_to_event_cache_and_send_queue_updates` function contains the important change. A new `LatestEventQueueUpdate` enum is added to represent either an update from the event cache, or from the send queue. So far, `compute_latest_events` does nothing in particular, apart from panicking with a `todo!()` when a send queue update is met. --- crates/matrix-sdk/src/latest_events/mod.rs | 214 +++++++++++++++++++-- 1 file changed, 196 insertions(+), 18 deletions(-) diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index 2418f6d87..d43ac9d51 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -57,19 +57,22 @@ use std::{ pub use error::LatestEventsError; use eyeball::{AsyncLock, Subscriber}; -use futures_util::{select, FutureExt}; +use futures_util::FutureExt; use latest_event::LatestEvent; pub use latest_event::LatestEventValue; use matrix_sdk_common::executor::{spawn, AbortOnDrop, JoinHandleExt as _}; use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; -use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::{ + select, + sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; use tracing::error; use crate::{ client::WeakClient, event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate}, room::WeakRoom, - send_queue::SendQueue, + send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate}, }; /// The entry point to fetch the [`LatestEventValue`] for rooms or threads. @@ -408,6 +411,25 @@ enum RoomRegistration { Remove(OwnedRoomId), } +/// Represents the kind of updates the [`compute_latest_events_task`] will have +/// to deal with. +enum LatestEventQueueUpdate { + /// An update from the [`EventCache`] happened. + EventCache { + /// The ID of the room that has triggered the update. + room_id: OwnedRoomId, + }, + + /// An update from the [`SendQueue`] happened. + SendQueue { + /// The ID of the room that has triggered the update. + room_id: OwnedRoomId, + + /// The update itself. + update: RoomSendQueueUpdate, + }, +} + /// Type holding the [`LatestEvent`] for a room and for all its threads. #[derive(Debug)] struct RoomLatestEvents { @@ -529,11 +551,12 @@ async fn listen_to_event_cache_and_send_queue_updates_task( registered_rooms: Arc, mut room_registration_receiver: mpsc::Receiver, event_cache: EventCache, - _send_queue: SendQueue, - latest_event_queue_sender: mpsc::UnboundedSender, + send_queue: SendQueue, + latest_event_queue_sender: mpsc::UnboundedSender, ) { let mut event_cache_generic_updates_subscriber = event_cache.subscribe_to_room_generic_updates(); + let mut send_queue_generic_updates_subscriber = send_queue.subscribe(); // Initialise the list of rooms that are listened. // @@ -547,6 +570,7 @@ async fn listen_to_event_cache_and_send_queue_updates_task( if listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut event_cache_generic_updates_subscriber, + &mut send_queue_generic_updates_subscriber, &mut listened_rooms, &latest_event_queue_sender, ) @@ -567,10 +591,15 @@ async fn listen_to_event_cache_and_send_queue_updates_task( async fn listen_to_event_cache_and_send_queue_updates( room_registration_receiver: &mut mpsc::Receiver, event_cache_generic_updates_subscriber: &mut broadcast::Receiver, + send_queue_generic_updates_subscriber: &mut broadcast::Receiver, listened_rooms: &mut HashSet, - latest_event_queue_sender: &mpsc::UnboundedSender, + latest_event_queue_sender: &mpsc::UnboundedSender, ) -> ControlFlow<()> { + // We need a biased select here: `room_registration_receiver` must have the + // priority over other futures. select! { + biased; + update = room_registration_receiver.recv().fuse() => { match update { Some(RoomRegistration::Add(room_id)) => { @@ -592,7 +621,9 @@ async fn listen_to_event_cache_and_send_queue_updates( let room_id = room_event_cache_generic_update.room_id; if listened_rooms.contains(&room_id) { - let _ = latest_event_queue_sender.send(room_id); + let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache { + room_id + }); } } else { error!("`event_cache_generic_updates` channel has been closed"); @@ -600,6 +631,21 @@ async fn listen_to_event_cache_and_send_queue_updates( return ControlFlow::Break(()); } } + + send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => { + if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update { + if listened_rooms.contains(&room_id) { + let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue { + room_id, + update + }); + } + } else { + error!("`send_queue_generic_updates` channel has been closed"); + + return ControlFlow::Break(()); + } + } } ControlFlow::Continue(()) @@ -612,7 +658,7 @@ async fn listen_to_event_cache_and_send_queue_updates( /// [`listen_to_event_cache_and_send_queue_updates_task`]. async fn compute_latest_events_task( registered_rooms: Arc, - mut latest_event_queue_receiver: mpsc::UnboundedReceiver, + mut latest_event_queue_receiver: mpsc::UnboundedReceiver, ) { const BUFFER_SIZE: usize = 16; @@ -626,16 +672,29 @@ async fn compute_latest_events_task( error!("`compute_latest_events_task` has stopped"); } -async fn compute_latest_events(registered_rooms: &RegisteredRooms, for_rooms: &[OwnedRoomId]) { - for room_id in for_rooms { - let mut rooms = registered_rooms.rooms.write().await; +async fn compute_latest_events( + registered_rooms: &RegisteredRooms, + latest_event_queue_updates: &[LatestEventQueueUpdate], +) { + for latest_event_queue_update in latest_event_queue_updates { + match latest_event_queue_update { + LatestEventQueueUpdate::EventCache { room_id } => { + let mut rooms = registered_rooms.rooms.write().await; - if let Some(room_latest_events) = rooms.get_mut(room_id) { - room_latest_events.update().await; - } else { - error!(?room_id, "Failed to find the room"); + if let Some(room_latest_events) = rooms.get_mut(room_id) { + room_latest_events.update().await; + } else { + error!(?room_id, "Failed to find the room"); - continue; + continue; + } + } + + LatestEventQueueUpdate::SendQueue { room_id, update } => { + // let mut rooms = registered_rooms.rooms.write().await; + + todo!() + } } } } @@ -650,12 +709,12 @@ mod tests { RoomState, }; use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder}; - use ruma::{event_id, owned_room_id, room_id, user_id}; + use ruma::{event_id, owned_room_id, room_id, user_id, OwnedTransactionId}; use stream_assert::assert_pending; use super::{ broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventValue, - RoomEventCacheGenericUpdate, RoomRegistration, + RoomEventCacheGenericUpdate, RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate, }; use crate::test_utils::mocks::MatrixMockServer; @@ -819,6 +878,8 @@ mod tests { let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -831,6 +892,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -850,6 +912,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -873,6 +936,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -892,6 +956,8 @@ mod tests { let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -902,6 +968,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -920,6 +987,8 @@ mod tests { let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -933,6 +1002,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -958,6 +1028,82 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + .is_continue()); + } + + assert_eq!(listened_rooms.len(), 1); + assert!(listened_rooms.contains(&room_id)); + + // A latest event computation has been triggered! + assert!(latest_event_queue_receiver.is_empty().not()); + } + } + + #[async_test] + async fn test_inputs_task_can_listen_to_send_queue() { + let room_id = owned_room_id!("!r0"); + + let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); + let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = + broadcast::channel(1); + let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); + let mut listened_rooms = HashSet::new(); + let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); + + // New send queue update, but the `LatestEvents` isn't listening to it. + { + send_queue_generic_update_sender + .send(SendQueueUpdate { + room_id: room_id.clone(), + update: RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid0"), + event_id: event_id!("$ev0").to_owned(), + }, + }) + .unwrap(); + + // Run the task. + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + .is_continue()); + + assert!(listened_rooms.is_empty()); + + // No latest event computation has been triggered. + assert!(latest_event_queue_receiver.is_empty()); + } + + // New send queue update, but this time, the `LatestEvents` is listening to it. + { + room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap(); + send_queue_generic_update_sender + .send(SendQueueUpdate { + room_id: room_id.clone(), + update: RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid1"), + event_id: event_id!("$ev1").to_owned(), + }, + }) + .unwrap(); + + // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`. + for _ in 0..2 { + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -978,6 +1124,8 @@ mod tests { let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -988,6 +1136,36 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + // It breaks! + .is_break()); + + assert_eq!(listened_rooms.len(), 0); + assert!(latest_event_queue_receiver.is_empty()); + } + + #[async_test] + async fn test_inputs_task_stops_when_send_queue_channel_is_closed() { + let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); + let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = + broadcast::channel(1); + let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); + let mut listened_rooms = HashSet::new(); + let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); + + // Drop the sender to close the channel. + drop(send_queue_generic_update_sender); + + // Run the task. + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, )