diff --git a/crates/matrix-sdk/src/latest_events/latest_event.rs b/crates/matrix-sdk/src/latest_events/latest_event.rs index cc9301335..3466a2403 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event.rs @@ -15,32 +15,53 @@ use eyeball::{AsyncLock, SharedObservable, Subscriber}; use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; +use crate::event_cache::RoomEventCache; + /// The latest event of a room or a thread. /// /// Use [`LatestEvent::subscribe`] to get a stream of updates. #[derive(Debug)] pub(super) struct LatestEvent { /// The room owning this latest event. - _room_id: OwnedRoomId, + room_id: OwnedRoomId, /// The thread (if any) owning this latest event. - _thread_id: Option, + thread_id: Option, /// The latest event value. value: SharedObservable, } impl LatestEvent { - pub(super) fn new(room_id: &RoomId, thread_id: Option<&EventId>) -> Option { - Some(Self { - _room_id: room_id.to_owned(), - _thread_id: thread_id.map(ToOwned::to_owned), - value: SharedObservable::new_async(LatestEventValue::None), - }) + pub(super) async fn new( + room_id: &RoomId, + thread_id: Option<&EventId>, + room_event_cache: &RoomEventCache, + ) -> Self { + Self { + room_id: room_id.to_owned(), + thread_id: thread_id.map(ToOwned::to_owned), + value: SharedObservable::new_async( + LatestEventValue::new(room_id, thread_id, room_event_cache).await, + ), + } } /// Return a [`Subscriber`] to new values. pub async fn subscribe(&self) -> Subscriber { self.value.subscribe().await } + + /// Update the inner latest event value. + pub async fn update(&mut self, room_event_cache: &RoomEventCache) { + let new_value = + LatestEventValue::new(&self.room_id, self.thread_id.as_deref(), room_event_cache).await; + + match new_value { + LatestEventValue::None => { + // The new value is `None`. It means no new value has been + // computed. Let's keep the old value. + } + } + } } /// A latest event value! @@ -49,3 +70,13 @@ pub enum LatestEventValue { /// No value has been computed yet, or no candidate value was found. None, } + +impl LatestEventValue { + async fn new( + _room_id: &RoomId, + _thread_id: Option<&EventId>, + _room_event_cache: &RoomEventCache, + ) -> Self { + LatestEventValue::None + } +} diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index cba6dd59d..00f052c11 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -66,7 +66,7 @@ use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::error; use crate::{ - event_cache::{EventCache, EventCacheError, RoomEventCacheGenericUpdate}, + event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate}, send_queue::SendQueue, }; @@ -270,12 +270,12 @@ impl RegisteredRooms { // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's // create and insert it. if room_latest_event.per_thread.contains_key(thread_id).not() { - if let Some(latest_event) = - RoomLatestEvents::create_latest_event_for(room_id, Some(thread_id)) - .await - { - room_latest_event.per_thread.insert(thread_id.to_owned(), latest_event); - } + room_latest_event.per_thread.insert( + thread_id.to_owned(), + room_latest_event + .create_latest_event_for(room_id, Some(thread_id)) + .await, + ); } } @@ -398,6 +398,8 @@ struct RoomLatestEvents { /// The latest events for each thread. per_thread: HashMap, + + room_event_cache: RoomEventCache, } impl RoomLatestEvents { @@ -405,7 +407,7 @@ impl RoomLatestEvents { room_id: &RoomId, event_cache: &EventCache, ) -> Result, LatestEventsError> { - let _room_event_cache = match event_cache.for_room(room_id).await { + let room_event_cache = match event_cache.for_room(room_id).await { // It's fine to drop the `EventCacheDropHandles` here as the caller // (`LatestEventState`) owns a clone of the `EventCache`. Ok((room_event_cache, _drop_handles)) => room_event_cache, @@ -413,20 +415,28 @@ impl RoomLatestEvents { Err(err) => return Err(LatestEventsError::EventCache(err)), }; - let latest_event = match Self::create_latest_event_for(room_id, None).await { - Some(latest_event) => latest_event, - None => return Ok(None), - }; - - Ok(Some(Self { for_the_room: latest_event, per_thread: HashMap::new() })) + Ok(Some(Self { + for_the_room: Self::create_latest_event_for_inner(room_id, None, &room_event_cache) + .await, + per_thread: HashMap::new(), + room_event_cache, + })) } - #[allow(clippy::unused_async)] async fn create_latest_event_for( + &self, room_id: &RoomId, thread_id: Option<&EventId>, - ) -> Option { - LatestEvent::new(room_id, thread_id) + ) -> LatestEvent { + Self::create_latest_event_for_inner(room_id, thread_id, &self.room_event_cache).await + } + + async fn create_latest_event_for_inner( + room_id: &RoomId, + thread_id: Option<&EventId>, + room_event_cache: &RoomEventCache, + ) -> LatestEvent { + LatestEvent::new(room_id, thread_id, room_event_cache).await } /// Get the [`LatestEvent`] for the room. @@ -438,6 +448,15 @@ impl RoomLatestEvents { fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> { self.per_thread.get(thread_id) } + + /// Update the latest events for the room and its threads. + async fn update(&mut self) { + self.for_the_room.update(&self.room_event_cache).await; + + for latest_event in self.per_thread.values_mut() { + latest_event.update(&self.room_event_cache).await; + } + } } /// The task responsible to listen to the [`EventCache`] and the [`SendQueue`]. @@ -543,9 +562,11 @@ async fn compute_latest_events_task( registered_rooms: Arc, mut latest_event_queue_receiver: mpsc::UnboundedReceiver, ) { - let mut buffer = Vec::with_capacity(16); + const BUFFER_SIZE: usize = 16; - while latest_event_queue_receiver.recv_many(&mut buffer, 16).await > 0 { + let mut buffer = Vec::with_capacity(BUFFER_SIZE); + + while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 { compute_latest_events(®istered_rooms, &buffer).await; buffer.clear(); } @@ -553,9 +574,18 @@ async fn compute_latest_events_task( error!("`compute_latest_events_task` has stopped"); } -#[allow(clippy::unused_async)] -async fn compute_latest_events(_registered_rooms: &RegisteredRooms, _for_rooms: &[OwnedRoomId]) { - // todo +async fn compute_latest_events(registered_rooms: &RegisteredRooms, for_rooms: &[OwnedRoomId]) { + for room_id in for_rooms { + let mut rooms = registered_rooms.rooms.write().await; + + if let Some(room_latest_events) = rooms.get_mut(room_id) { + room_latest_events.update().await; + } else { + error!(?room_id, "Failed to find the room"); + + continue; + } + } } #[cfg(all(test, not(target_family = "wasm")))]