From aaeab050da4d99abe512efaed291d2b4da6de691 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Fri, 27 Feb 2026 15:19:52 +0100 Subject: [PATCH] chore(sdk): Extract `EventCache::auto_shrink_linked_chunk_task` to `tasks.rs`. --- crates/matrix-sdk/src/event_cache/mod.rs | 81 +------------------ crates/matrix-sdk/src/event_cache/tasks.rs | 94 +++++++++++++++++++++- 2 files changed, 94 insertions(+), 81 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 6c9d40772..1001613fd 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -31,7 +31,7 @@ use std::{ collections::HashMap, fmt, ops::{Deref, DerefMut}, - sync::{Arc, OnceLock, Weak}, + sync::{Arc, OnceLock}, }; use eyeball::SharedObservable; @@ -51,7 +51,7 @@ use tokio::sync::{ broadcast::{Receiver, Sender, channel}, mpsc, }; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{error, instrument, trace}; use crate::{ Client, @@ -293,7 +293,7 @@ impl EventCache { // Force-initialize the sender in the [`RoomEventCacheInner`]. self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender); - let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", Self::auto_shrink_linked_chunk_task( + let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task( Arc::downgrade(&self.inner), auto_shrink_receiver, )); @@ -330,81 +330,6 @@ impl EventCache { self.inner.handle_room_updates(updates).await } - /// Spawns the task that will listen to auto-shrink notifications. - /// - /// The auto-shrink mechanism works this way: - /// - /// - Each time there's a new subscriber to a [`RoomEventCache`], it will - /// increment the active number of subscribers to that room, aka - /// [`RoomEventCacheState::subscriber_count`]. - /// - When that subscriber is dropped, it will decrement that count; and - /// notify the task below if it reached 0. - /// - The task spawned here, owned by the [`EventCacheInner`], will listen - /// to such notifications that a room may be shrunk. It will attempt an - /// auto-shrink, by letting the inner state decide whether this is a good - /// time to do so (new subscribers might have spawned in the meanwhile). - #[instrument(skip_all)] - async fn auto_shrink_linked_chunk_task( - inner: Weak, - mut rx: mpsc::Receiver, - ) { - while let Some(room_id) = rx.recv().await { - trace!(for_room = %room_id, "received notification to shrink"); - - let Some(inner) = inner.upgrade() else { - return; - }; - - let room = match inner.for_room(&room_id).await { - Ok(room) => room, - Err(err) => { - warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); - continue; - } - }; - - trace!("waiting for state lock…"); - let mut state = match room.state().write().await { - Ok(state) => state, - Err(err) => { - warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}"); - continue; - } - }; - - match state.auto_shrink_if_no_subscribers().await { - Ok(diffs) => { - if let Some(diffs) = diffs { - // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any - // subscribers, right? RIGHT? Especially because the state is guarded behind - // a lock. - // - // However, better safe than sorry, and it's cheap to send an update here, - // so let's do it! - if !diffs.is_empty() { - room.update_sender().send( - RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { - diffs, - origin: EventsOrigin::Cache, - }), - None, - ); - } - } else { - debug!("auto-shrinking didn't happen"); - } - } - - Err(err) => { - // There's not much we can do here, unfortunately. - warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}"); - } - } - } - - info!("Auto-shrink linked chunk task has been closed, exiting"); - } - /// Check whether [`EventCache::subscribe`] has been called. pub fn has_subscribed(&self) -> bool { self.inner.drop_handles.get().is_some() diff --git a/crates/matrix-sdk/src/event_cache/tasks.rs b/crates/matrix-sdk/src/event_cache/tasks.rs index ae12e2c06..93e487623 100644 --- a/crates/matrix-sdk/src/event_cache/tasks.rs +++ b/crates/matrix-sdk/src/event_cache/tasks.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Weak}, +}; use eyeball::Subscriber; use matrix_sdk_base::{ @@ -22,11 +25,17 @@ use matrix_sdk_base::{ use ruma::{OwnedEventId, OwnedTransactionId}; use tokio::{ select, - sync::broadcast::{Receiver, Sender, error::RecvError}, + sync::{ + broadcast::{Receiver, Sender, error::RecvError}, + mpsc, + }, }; use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn}; -use super::{EventCacheError, EventCacheInner, RoomEventCacheLinkedChunkUpdate}; +use super::{ + AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin, + RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs, +}; use crate::{ client::WeakClient, send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate}, @@ -104,6 +113,85 @@ pub(super) async fn ignore_user_list_update_task( .await; } +/// Spawns the task that will listen to auto-shrink notifications. +/// +/// The auto-shrink mechanism works this way: +/// +/// - Each time there's a new subscriber to a [`RoomEventCache`], it will +/// increment the active number of subscribers to that room, aka +/// `RoomEventCacheState::subscriber_count`. +/// - When that subscriber is dropped, it will decrement that count; and notify +/// the task below if it reached 0. +/// - The task spawned here, owned by the [`EventCacheInner`], will listen to +/// such notifications that a room may be shrunk. It will attempt an +/// auto-shrink, by letting the inner state decide whether this is a good time +/// to do so (new subscribers might have spawned in the meanwhile). +/// +/// [`RoomEventCache`]: super::RoomEventCache +/// [`EventCacheInner`]: super::EventCacheInner +#[instrument(skip_all)] +pub(super) async fn auto_shrink_linked_chunk_task( + inner: Weak, + mut rx: mpsc::Receiver, +) { + while let Some(room_id) = rx.recv().await { + trace!(for_room = %room_id, "received notification to shrink"); + + let Some(inner) = inner.upgrade() else { + return; + }; + + let room = match inner.for_room(&room_id).await { + Ok(room) => room, + Err(err) => { + warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); + continue; + } + }; + + trace!("waiting for state lock…"); + + let mut state = match room.state().write().await { + Ok(state) => state, + Err(err) => { + warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}"); + continue; + } + }; + + match state.auto_shrink_if_no_subscribers().await { + Ok(diffs) => { + if let Some(diffs) = diffs { + // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any + // subscribers, right? RIGHT? Especially because the state is guarded behind + // a lock. + // + // However, better safe than sorry, and it's cheap to send an update here, + // so let's do it! + if !diffs.is_empty() { + room.update_sender().send( + RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { + diffs, + origin: EventsOrigin::Cache, + }), + None, + ); + } + } else { + debug!("auto-shrinking didn't happen"); + } + } + + Err(err) => { + // There's not much we can do here, unfortunately. + warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}"); + } + } + } + + info!("Auto-shrink linked chunk task has been closed, exiting"); +} + /// Handle [`SendQueueUpdate`] and [`RoomEventCacheLinkedChunkUpdate`] to update /// the threads, for a thread the user was not subscribed to. #[instrument(skip_all)]