From 705d6f870ec10b2de9eb7bd1faab78f5083bab3c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 14 Aug 2025 13:00:36 +0200 Subject: [PATCH] refactor(event cache): move the listening to linked chunk updates to its own function, and introduce a select! at the top level --- crates/matrix-sdk/src/event_cache/mod.rs | 193 ++++++++++++----------- 1 file changed, 104 insertions(+), 89 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 284a363ab..7f2cf758f 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -53,9 +53,12 @@ use matrix_sdk_common::executor::{spawn, JoinHandle}; use matrix_sdk_search::error::IndexError; use room::RoomEventCacheState; use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId}; -use tokio::sync::{ - broadcast::{channel, error::RecvError, Receiver, Sender}, - mpsc, Mutex, RwLock, +use tokio::{ + select, + sync::{ + broadcast::{channel, error::RecvError, Receiver, Sender}, + mpsc, Mutex, RwLock, + }, }; use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span}; @@ -422,6 +425,92 @@ impl EventCache { self.inner.generic_update_sender.subscribe() } + async fn handle_thread_subscriber_linked_chunk_update( + client: &WeakClient, + thread_subscriber_sender: &Sender<()>, + up: RoomEventCacheLinkedChunkUpdate, + ) -> bool { + let Some(client) = client.get() else { + // Client shutting down. + debug!("Client is shutting down, exiting thread subscriber task"); + return false; + }; + + let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else { + trace!("received an update for a non-thread linked chunk, ignoring"); + return true; + }; + + let Some(room) = client.get_room(room_id) else { + warn!(%room_id, "unknown room"); + return true; + }; + + let thread_root = thread_root.clone(); + + let new_events = up.events(); + if new_events.is_empty() { + // No new events, nothing to do. + return true; + } + + // This `PushContext` is going to be used to compute whether an in-thread event + // would trigger a mention. + // + // Of course, we're not interested in an in-thread event causing a mention, + // because it's part of a thread we've subscribed to. So the + // `PushContext` must not include the check for thread subscriptions (otherwise + // it would be impossible to subscribe to new threads). + + let with_thread_subscriptions = false; + + let Some(push_context) = room + .push_context_internal(with_thread_subscriptions) + .await + .inspect_err(|err| { + warn!("Failed to get push context for threads: {err}"); + }) + .ok() + .flatten() + else { + warn!("Missing push context for thread subscriptions."); + return true; + }; + + let mut subscribe_up_to = None; + + // Find if there's an event that would trigger a mention for the current + // user, iterating from the end of the new events towards the oldest, + for ev in new_events.into_iter().rev() { + if push_context + .for_event(ev.raw()) + .await + .into_iter() + .any(|action| action.should_notify()) + { + let Some(event_id) = ev.event_id() else { + // Shouldn't happen. + continue; + }; + subscribe_up_to = Some(event_id); + break; + } + } + + // And if we've found such a mention, subscribe to the thread up to this + // event. + if let Some(event_id) = subscribe_up_to { + trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to"); + if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await { + warn!(%err, "Failed to subscribe to thread"); + } else { + let _ = thread_subscriber_sender.send(()); + } + } + + return true; + } + #[instrument(skip_all)] async fn thread_subscriber_task( client: WeakClient, @@ -436,97 +525,23 @@ impl EventCache { let mut rx = linked_chunk_update_sender.subscribe(); loop { - match rx.recv().await { - Ok(up) => { - let Some(client) = client.get() else { - // Client shutting down. - debug!("Client is shutting down, exiting thread subscriber task"); - break; - }; - - let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else { - trace!("received an update for a non-thread linked chunk, ignoring"); - continue; - }; - - let Some(room) = client.get_room(&room_id) else { - warn!(%room_id, "unknown room"); - continue; - }; - - let thread_root = thread_root.clone(); - - let new_events = up.events(); - if new_events.is_empty() { - // No new events, nothing to do. - continue; - } - - // This `PushContext` is going to be used to compute whether an in-thread event - // would trigger a mention. - // - // Of course, we're not interested in an in-thread event causing a mention, - // because it's part of a thread we've subscribed to. So the - // `PushContext` must not include the check for thread subscriptions (otherwise - // it would be impossible to subscribe to new threads). - - let with_thread_subscriptions = false; - - let Some(push_context) = room - .push_context_internal(with_thread_subscriptions) - .await - .inspect_err(|err| { - warn!("Failed to get push context for threads: {err}"); - }) - .ok() - .flatten() - else { - warn!("Missing push context for thread subscriptions."); - continue; - }; - - let mut subscribe_up_to = None; - - // Find if there's an event that would trigger a mention for the current - // user, iterating from the end of the new events towards the oldest, - for ev in new_events.into_iter().rev() { - if push_context - .for_event(ev.raw()) - .await - .into_iter() - .any(|action| action.should_notify()) - { - let Some(event_id) = ev.event_id() else { - // Shouldn't happen. - continue; - }; - subscribe_up_to = Some(event_id.to_owned()); + select! { + res = rx.recv() => { + match res { + Ok(up) => { + if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await { + break; + } + } + Err(RecvError::Closed) => { + debug!("Linked chunk update channel has been closed, exiting thread subscriber task"); break; } - } - - // And if we've found such a mention, subscribe to the thread up to this - // event. - if let Some(event_id) = subscribe_up_to { - trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to"); - if let Err(err) = - room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await - { - warn!(%err, "Failed to subscribe to thread"); - } else { - let _ = thread_subscriber_sender.send(()); + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind linked chunk updates"); } } } - - Err(RecvError::Closed) => { - debug!("Linked chunk update channel has been closed, exiting thread subscriber task"); - break; - } - - Err(RecvError::Lagged(num_skipped)) => { - warn!(num_skipped, "Lagged behind linked chunk updates"); - } } } }