feat(skd): compute_latest_events calls the new LatestEvent::update method.

This commit is contained in:
Ivan Enderlin
2025-07-01 14:55:34 +02:00
parent 6f84a44a1c
commit 3eeb046e62
2 changed files with 91 additions and 30 deletions

View File

@@ -15,32 +15,53 @@
use eyeball::{AsyncLock, SharedObservable, Subscriber};
use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
use crate::event_cache::RoomEventCache;
/// The latest event of a room or a thread.
///
/// Use [`LatestEvent::subscribe`] to get a stream of updates.
#[derive(Debug)]
pub(super) struct LatestEvent {
/// The room owning this latest event.
_room_id: OwnedRoomId,
room_id: OwnedRoomId,
/// The thread (if any) owning this latest event.
_thread_id: Option<OwnedEventId>,
thread_id: Option<OwnedEventId>,
/// The latest event value.
value: SharedObservable<LatestEventValue, AsyncLock>,
}
impl LatestEvent {
pub(super) fn new(room_id: &RoomId, thread_id: Option<&EventId>) -> Option<Self> {
Some(Self {
_room_id: room_id.to_owned(),
_thread_id: thread_id.map(ToOwned::to_owned),
value: SharedObservable::new_async(LatestEventValue::None),
})
pub(super) async fn new(
room_id: &RoomId,
thread_id: Option<&EventId>,
room_event_cache: &RoomEventCache,
) -> Self {
Self {
room_id: room_id.to_owned(),
thread_id: thread_id.map(ToOwned::to_owned),
value: SharedObservable::new_async(
LatestEventValue::new(room_id, thread_id, room_event_cache).await,
),
}
}
/// Return a [`Subscriber`] to new values.
pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
self.value.subscribe().await
}
/// Update the inner latest event value.
pub async fn update(&mut self, room_event_cache: &RoomEventCache) {
let new_value =
LatestEventValue::new(&self.room_id, self.thread_id.as_deref(), room_event_cache).await;
match new_value {
LatestEventValue::None => {
// The new value is `None`. It means no new value has been
// computed. Let's keep the old value.
}
}
}
}
/// A latest event value!
@@ -49,3 +70,13 @@ pub enum LatestEventValue {
/// No value has been computed yet, or no candidate value was found.
None,
}
impl LatestEventValue {
async fn new(
_room_id: &RoomId,
_thread_id: Option<&EventId>,
_room_event_cache: &RoomEventCache,
) -> Self {
LatestEventValue::None
}
}

View File

@@ -66,7 +66,7 @@ use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::error;
use crate::{
event_cache::{EventCache, EventCacheError, RoomEventCacheGenericUpdate},
event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate},
send_queue::SendQueue,
};
@@ -270,12 +270,12 @@ impl RegisteredRooms {
// In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
// create and insert it.
if room_latest_event.per_thread.contains_key(thread_id).not() {
if let Some(latest_event) =
RoomLatestEvents::create_latest_event_for(room_id, Some(thread_id))
.await
{
room_latest_event.per_thread.insert(thread_id.to_owned(), latest_event);
}
room_latest_event.per_thread.insert(
thread_id.to_owned(),
room_latest_event
.create_latest_event_for(room_id, Some(thread_id))
.await,
);
}
}
@@ -398,6 +398,8 @@ struct RoomLatestEvents {
/// The latest events for each thread.
per_thread: HashMap<OwnedEventId, LatestEvent>,
room_event_cache: RoomEventCache,
}
impl RoomLatestEvents {
@@ -405,7 +407,7 @@ impl RoomLatestEvents {
room_id: &RoomId,
event_cache: &EventCache,
) -> Result<Option<Self>, LatestEventsError> {
let _room_event_cache = match event_cache.for_room(room_id).await {
let room_event_cache = match event_cache.for_room(room_id).await {
// It's fine to drop the `EventCacheDropHandles` here as the caller
// (`LatestEventState`) owns a clone of the `EventCache`.
Ok((room_event_cache, _drop_handles)) => room_event_cache,
@@ -413,20 +415,28 @@ impl RoomLatestEvents {
Err(err) => return Err(LatestEventsError::EventCache(err)),
};
let latest_event = match Self::create_latest_event_for(room_id, None).await {
Some(latest_event) => latest_event,
None => return Ok(None),
};
Ok(Some(Self { for_the_room: latest_event, per_thread: HashMap::new() }))
Ok(Some(Self {
for_the_room: Self::create_latest_event_for_inner(room_id, None, &room_event_cache)
.await,
per_thread: HashMap::new(),
room_event_cache,
}))
}
#[allow(clippy::unused_async)]
async fn create_latest_event_for(
&self,
room_id: &RoomId,
thread_id: Option<&EventId>,
) -> Option<LatestEvent> {
LatestEvent::new(room_id, thread_id)
) -> LatestEvent {
Self::create_latest_event_for_inner(room_id, thread_id, &self.room_event_cache).await
}
async fn create_latest_event_for_inner(
room_id: &RoomId,
thread_id: Option<&EventId>,
room_event_cache: &RoomEventCache,
) -> LatestEvent {
LatestEvent::new(room_id, thread_id, room_event_cache).await
}
/// Get the [`LatestEvent`] for the room.
@@ -438,6 +448,15 @@ impl RoomLatestEvents {
fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
self.per_thread.get(thread_id)
}
/// Update the latest events for the room and its threads.
async fn update(&mut self) {
self.for_the_room.update(&self.room_event_cache).await;
for latest_event in self.per_thread.values_mut() {
latest_event.update(&self.room_event_cache).await;
}
}
}
/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
@@ -543,9 +562,11 @@ async fn compute_latest_events_task(
registered_rooms: Arc<RegisteredRooms>,
mut latest_event_queue_receiver: mpsc::UnboundedReceiver<OwnedRoomId>,
) {
let mut buffer = Vec::with_capacity(16);
const BUFFER_SIZE: usize = 16;
while latest_event_queue_receiver.recv_many(&mut buffer, 16).await > 0 {
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
compute_latest_events(&registered_rooms, &buffer).await;
buffer.clear();
}
@@ -553,9 +574,18 @@ async fn compute_latest_events_task(
error!("`compute_latest_events_task` has stopped");
}
#[allow(clippy::unused_async)]
async fn compute_latest_events(_registered_rooms: &RegisteredRooms, _for_rooms: &[OwnedRoomId]) {
// todo
async fn compute_latest_events(registered_rooms: &RegisteredRooms, for_rooms: &[OwnedRoomId]) {
for room_id in for_rooms {
let mut rooms = registered_rooms.rooms.write().await;
if let Some(room_latest_events) = rooms.get_mut(room_id) {
room_latest_events.update().await;
} else {
error!(?room_id, "Failed to find the room");
continue;
}
}
}
#[cfg(all(test, not(target_family = "wasm")))]