refactor(event cache): move the listening to linked chunk updates to its own function, and introduce a select! at the top level

This commit is contained in:
Benjamin Bouvier
2025-08-14 13:00:36 +02:00
parent 4fc28c4701
commit 705d6f870e

View File

@@ -53,9 +53,12 @@ use matrix_sdk_common::executor::{spawn, JoinHandle};
use matrix_sdk_search::error::IndexError;
use room::RoomEventCacheState;
use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId};
use tokio::sync::{
broadcast::{channel, error::RecvError, Receiver, Sender},
mpsc, Mutex, RwLock,
use tokio::{
select,
sync::{
broadcast::{channel, error::RecvError, Receiver, Sender},
mpsc, Mutex, RwLock,
},
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
@@ -422,6 +425,92 @@ impl EventCache {
self.inner.generic_update_sender.subscribe()
}
async fn handle_thread_subscriber_linked_chunk_update(
client: &WeakClient,
thread_subscriber_sender: &Sender<()>,
up: RoomEventCacheLinkedChunkUpdate,
) -> bool {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
return false;
};
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
trace!("received an update for a non-thread linked chunk, ignoring");
return true;
};
let Some(room) = client.get_room(room_id) else {
warn!(%room_id, "unknown room");
return true;
};
let thread_root = thread_root.clone();
let new_events = up.events();
if new_events.is_empty() {
// No new events, nothing to do.
return true;
}
// This `PushContext` is going to be used to compute whether an in-thread event
// would trigger a mention.
//
// Of course, we're not interested in an in-thread event causing a mention,
// because it's part of a thread we've subscribed to. So the
// `PushContext` must not include the check for thread subscriptions (otherwise
// it would be impossible to subscribe to new threads).
let with_thread_subscriptions = false;
let Some(push_context) = room
.push_context_internal(with_thread_subscriptions)
.await
.inspect_err(|err| {
warn!("Failed to get push context for threads: {err}");
})
.ok()
.flatten()
else {
warn!("Missing push context for thread subscriptions.");
return true;
};
let mut subscribe_up_to = None;
// Find if there's an event that would trigger a mention for the current
// user, iterating from the end of the new events towards the oldest,
for ev in new_events.into_iter().rev() {
if push_context
.for_event(ev.raw())
.await
.into_iter()
.any(|action| action.should_notify())
{
let Some(event_id) = ev.event_id() else {
// Shouldn't happen.
continue;
};
subscribe_up_to = Some(event_id);
break;
}
}
// And if we've found such a mention, subscribe to the thread up to this
// event.
if let Some(event_id) = subscribe_up_to {
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
}
return true;
}
#[instrument(skip_all)]
async fn thread_subscriber_task(
client: WeakClient,
@@ -436,97 +525,23 @@ impl EventCache {
let mut rx = linked_chunk_update_sender.subscribe();
loop {
match rx.recv().await {
Ok(up) => {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
break;
};
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
trace!("received an update for a non-thread linked chunk, ignoring");
continue;
};
let Some(room) = client.get_room(&room_id) else {
warn!(%room_id, "unknown room");
continue;
};
let thread_root = thread_root.clone();
let new_events = up.events();
if new_events.is_empty() {
// No new events, nothing to do.
continue;
}
// This `PushContext` is going to be used to compute whether an in-thread event
// would trigger a mention.
//
// Of course, we're not interested in an in-thread event causing a mention,
// because it's part of a thread we've subscribed to. So the
// `PushContext` must not include the check for thread subscriptions (otherwise
// it would be impossible to subscribe to new threads).
let with_thread_subscriptions = false;
let Some(push_context) = room
.push_context_internal(with_thread_subscriptions)
.await
.inspect_err(|err| {
warn!("Failed to get push context for threads: {err}");
})
.ok()
.flatten()
else {
warn!("Missing push context for thread subscriptions.");
continue;
};
let mut subscribe_up_to = None;
// Find if there's an event that would trigger a mention for the current
// user, iterating from the end of the new events towards the oldest,
for ev in new_events.into_iter().rev() {
if push_context
.for_event(ev.raw())
.await
.into_iter()
.any(|action| action.should_notify())
{
let Some(event_id) = ev.event_id() else {
// Shouldn't happen.
continue;
};
subscribe_up_to = Some(event_id.to_owned());
select! {
res = rx.recv() => {
match res {
Ok(up) => {
if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
break;
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
}
// And if we've found such a mention, subscribe to the thread up to this
// event.
if let Some(event_id) = subscribe_up_to {
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
if let Err(err) =
room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
{
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
}