feat(sdk): LatestEvents listens to the SendQueue.

This patch updates `LatestEvents` to listen to the updates from the
`SendQueue`. The `listen_to_event_cache_and_send_queue_updates` function
contains the important change. A new `LatestEventQueueUpdate` enum is
added to represent either an update from the event cache, or from the
send queue.

So far, `compute_latest_events` does nothing in particular, apart from
panicking with a `todo!()` when a send queue update is met.
This commit is contained in:
Ivan Enderlin
2025-08-04 15:50:25 +02:00
parent 5e43177d3a
commit 2d657fe908

View File

@@ -57,19 +57,22 @@ use std::{
pub use error::LatestEventsError;
use eyeball::{AsyncLock, Subscriber};
use futures_util::{select, FutureExt};
use futures_util::FutureExt;
use latest_event::LatestEvent;
pub use latest_event::LatestEventValue;
use matrix_sdk_common::executor::{spawn, AbortOnDrop, JoinHandleExt as _};
use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::{
select,
sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
use tracing::error;
use crate::{
client::WeakClient,
event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate},
room::WeakRoom,
send_queue::SendQueue,
send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
};
/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
@@ -408,6 +411,25 @@ enum RoomRegistration {
Remove(OwnedRoomId),
}
/// Represents the kind of updates the [`compute_latest_events_task`] will have
/// to deal with.
enum LatestEventQueueUpdate {
/// An update from the [`EventCache`] happened.
EventCache {
/// The ID of the room that has triggered the update.
room_id: OwnedRoomId,
},
/// An update from the [`SendQueue`] happened.
SendQueue {
/// The ID of the room that has triggered the update.
room_id: OwnedRoomId,
/// The update itself.
update: RoomSendQueueUpdate,
},
}
/// Type holding the [`LatestEvent`] for a room and for all its threads.
#[derive(Debug)]
struct RoomLatestEvents {
@@ -529,11 +551,12 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
registered_rooms: Arc<RegisteredRooms>,
mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
event_cache: EventCache,
_send_queue: SendQueue,
latest_event_queue_sender: mpsc::UnboundedSender<OwnedRoomId>,
send_queue: SendQueue,
latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
) {
let mut event_cache_generic_updates_subscriber =
event_cache.subscribe_to_room_generic_updates();
let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
// Initialise the list of rooms that are listened.
//
@@ -547,6 +570,7 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
if listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut event_cache_generic_updates_subscriber,
&mut send_queue_generic_updates_subscriber,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -567,10 +591,15 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
async fn listen_to_event_cache_and_send_queue_updates(
room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
listened_rooms: &mut HashSet<OwnedRoomId>,
latest_event_queue_sender: &mpsc::UnboundedSender<OwnedRoomId>,
latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
) -> ControlFlow<()> {
// We need a biased select here: `room_registration_receiver` must have the
// priority over other futures.
select! {
biased;
update = room_registration_receiver.recv().fuse() => {
match update {
Some(RoomRegistration::Add(room_id)) => {
@@ -592,7 +621,9 @@ async fn listen_to_event_cache_and_send_queue_updates(
let room_id = room_event_cache_generic_update.room_id;
if listened_rooms.contains(&room_id) {
let _ = latest_event_queue_sender.send(room_id);
let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
room_id
});
}
} else {
error!("`event_cache_generic_updates` channel has been closed");
@@ -600,6 +631,21 @@ async fn listen_to_event_cache_and_send_queue_updates(
return ControlFlow::Break(());
}
}
send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
if listened_rooms.contains(&room_id) {
let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
room_id,
update
});
}
} else {
error!("`send_queue_generic_updates` channel has been closed");
return ControlFlow::Break(());
}
}
}
ControlFlow::Continue(())
@@ -612,7 +658,7 @@ async fn listen_to_event_cache_and_send_queue_updates(
/// [`listen_to_event_cache_and_send_queue_updates_task`].
async fn compute_latest_events_task(
registered_rooms: Arc<RegisteredRooms>,
mut latest_event_queue_receiver: mpsc::UnboundedReceiver<OwnedRoomId>,
mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
) {
const BUFFER_SIZE: usize = 16;
@@ -626,16 +672,29 @@ async fn compute_latest_events_task(
error!("`compute_latest_events_task` has stopped");
}
async fn compute_latest_events(registered_rooms: &RegisteredRooms, for_rooms: &[OwnedRoomId]) {
for room_id in for_rooms {
let mut rooms = registered_rooms.rooms.write().await;
async fn compute_latest_events(
registered_rooms: &RegisteredRooms,
latest_event_queue_updates: &[LatestEventQueueUpdate],
) {
for latest_event_queue_update in latest_event_queue_updates {
match latest_event_queue_update {
LatestEventQueueUpdate::EventCache { room_id } => {
let mut rooms = registered_rooms.rooms.write().await;
if let Some(room_latest_events) = rooms.get_mut(room_id) {
room_latest_events.update().await;
} else {
error!(?room_id, "Failed to find the room");
if let Some(room_latest_events) = rooms.get_mut(room_id) {
room_latest_events.update().await;
} else {
error!(?room_id, "Failed to find the room");
continue;
continue;
}
}
LatestEventQueueUpdate::SendQueue { room_id, update } => {
// let mut rooms = registered_rooms.rooms.write().await;
todo!()
}
}
}
}
@@ -650,12 +709,12 @@ mod tests {
RoomState,
};
use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder};
use ruma::{event_id, owned_room_id, room_id, user_id};
use ruma::{event_id, owned_room_id, room_id, user_id, OwnedTransactionId};
use stream_assert::assert_pending;
use super::{
broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventValue,
RoomEventCacheGenericUpdate, RoomRegistration,
RoomEventCacheGenericUpdate, RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate,
};
use crate::test_utils::mocks::MatrixMockServer;
@@ -819,6 +878,8 @@ mod tests {
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
@@ -831,6 +892,7 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -850,6 +912,7 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -873,6 +936,7 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -892,6 +956,8 @@ mod tests {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
@@ -902,6 +968,7 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -920,6 +987,8 @@ mod tests {
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
@@ -933,6 +1002,7 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -958,6 +1028,82 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue());
}
assert_eq!(listened_rooms.len(), 1);
assert!(listened_rooms.contains(&room_id));
// A latest event computation has been triggered!
assert!(latest_event_queue_receiver.is_empty().not());
}
}
#[async_test]
async fn test_inputs_task_can_listen_to_send_queue() {
let room_id = owned_room_id!("!r0");
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// New send queue update, but the `LatestEvents` isn't listening to it.
{
send_queue_generic_update_sender
.send(SendQueueUpdate {
room_id: room_id.clone(),
update: RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid0"),
event_id: event_id!("$ev0").to_owned(),
},
})
.unwrap();
// Run the task.
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue());
assert!(listened_rooms.is_empty());
// No latest event computation has been triggered.
assert!(latest_event_queue_receiver.is_empty());
}
// New send queue update, but this time, the `LatestEvents` is listening to it.
{
room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
send_queue_generic_update_sender
.send(SendQueueUpdate {
room_id: room_id.clone(),
update: RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid1"),
event_id: event_id!("$ev1").to_owned(),
},
})
.unwrap();
// Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`.
for _ in 0..2 {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
@@ -978,6 +1124,8 @@ mod tests {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
@@ -988,6 +1136,36 @@ mod tests {
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
// It breaks!
.is_break());
assert_eq!(listened_rooms.len(), 0);
assert!(latest_event_queue_receiver.is_empty());
}
#[async_test]
async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// Drop the sender to close the channel.
drop(send_queue_generic_update_sender);
// Run the task.
assert!(listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)