chore(sdk): Extract EventCache::auto_shrink_linked_chunk_task to tasks.rs.

This commit is contained in:
Ivan Enderlin
2026-02-27 15:19:52 +01:00
parent 6b120505b8
commit aaeab050da
2 changed files with 94 additions and 81 deletions

View File

@@ -31,7 +31,7 @@ use std::{
collections::HashMap,
fmt,
ops::{Deref, DerefMut},
sync::{Arc, OnceLock, Weak},
sync::{Arc, OnceLock},
};
use eyeball::SharedObservable;
@@ -51,7 +51,7 @@ use tokio::sync::{
broadcast::{Receiver, Sender, channel},
mpsc,
};
use tracing::{debug, error, info, instrument, trace, warn};
use tracing::{error, instrument, trace};
use crate::{
Client,
@@ -293,7 +293,7 @@ impl EventCache {
// Force-initialize the sender in the [`RoomEventCacheInner`].
self.inner.auto_shrink_sender.get_or_init(|| auto_shrink_sender);
let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", Self::auto_shrink_linked_chunk_task(
let auto_shrink_linked_chunk_task = task_monitor.spawn_background_task("event_cache::auto_shrink_linked_chunk_task", tasks::auto_shrink_linked_chunk_task(
Arc::downgrade(&self.inner),
auto_shrink_receiver,
));
@@ -330,81 +330,6 @@ impl EventCache {
self.inner.handle_room_updates(updates).await
}
/// Spawns the task that will listen to auto-shrink notifications.
///
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
/// increment the active number of subscribers to that room, aka
/// [`RoomEventCacheState::subscriber_count`].
/// - When that subscriber is dropped, it will decrement that count; and
/// notify the task below if it reached 0.
/// - The task spawned here, owned by the [`EventCacheInner`], will listen
/// to such notifications that a room may be shrunk. It will attempt an
/// auto-shrink, by letting the inner state decide whether this is a good
/// time to do so (new subscribers might have spawned in the meanwhile).
#[instrument(skip_all)]
async fn auto_shrink_linked_chunk_task(
inner: Weak<EventCacheInner>,
mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
) {
while let Some(room_id) = rx.recv().await {
trace!(for_room = %room_id, "received notification to shrink");
let Some(inner) = inner.upgrade() else {
return;
};
let room = match inner.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
continue;
}
};
trace!("waiting for state lock…");
let mut state = match room.state().write().await {
Ok(state) => state,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
continue;
}
};
match state.auto_shrink_if_no_subscribers().await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
// subscribers, right? RIGHT? Especially because the state is guarded behind
// a lock.
//
// However, better safe than sorry, and it's cheap to send an update here,
// so let's do it!
if !diffs.is_empty() {
room.update_sender().send(
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
diffs,
origin: EventsOrigin::Cache,
}),
None,
);
}
} else {
debug!("auto-shrinking didn't happen");
}
}
Err(err) => {
// There's not much we can do here, unfortunately.
warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
}
}
}
info!("Auto-shrink linked chunk task has been closed, exiting");
}
/// Check whether [`EventCache::subscribe`] has been called.
pub fn has_subscribed(&self) -> bool {
self.inner.drop_handles.get().is_some()

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use eyeball::Subscriber;
use matrix_sdk_base::{
@@ -22,11 +25,17 @@ use matrix_sdk_base::{
use ruma::{OwnedEventId, OwnedTransactionId};
use tokio::{
select,
sync::broadcast::{Receiver, Sender, error::RecvError},
sync::{
broadcast::{Receiver, Sender, error::RecvError},
mpsc,
},
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
use super::{EventCacheError, EventCacheInner, RoomEventCacheLinkedChunkUpdate};
use super::{
AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin,
RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs,
};
use crate::{
client::WeakClient,
send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
@@ -104,6 +113,85 @@ pub(super) async fn ignore_user_list_update_task(
.await;
}
/// Spawns the task that will listen to auto-shrink notifications.
///
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
/// increment the active number of subscribers to that room, aka
/// `RoomEventCacheState::subscriber_count`.
/// - When that subscriber is dropped, it will decrement that count; and notify
/// the task below if it reached 0.
/// - The task spawned here, owned by the [`EventCacheInner`], will listen to
/// such notifications that a room may be shrunk. It will attempt an
/// auto-shrink, by letting the inner state decide whether this is a good time
/// to do so (new subscribers might have spawned in the meanwhile).
///
/// [`RoomEventCache`]: super::RoomEventCache
/// [`EventCacheInner`]: super::EventCacheInner
#[instrument(skip_all)]
pub(super) async fn auto_shrink_linked_chunk_task(
inner: Weak<EventCacheInner>,
mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
) {
while let Some(room_id) = rx.recv().await {
trace!(for_room = %room_id, "received notification to shrink");
let Some(inner) = inner.upgrade() else {
return;
};
let room = match inner.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
continue;
}
};
trace!("waiting for state lock…");
let mut state = match room.state().write().await {
Ok(state) => state,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
continue;
}
};
match state.auto_shrink_if_no_subscribers().await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
// subscribers, right? RIGHT? Especially because the state is guarded behind
// a lock.
//
// However, better safe than sorry, and it's cheap to send an update here,
// so let's do it!
if !diffs.is_empty() {
room.update_sender().send(
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
diffs,
origin: EventsOrigin::Cache,
}),
None,
);
}
} else {
debug!("auto-shrinking didn't happen");
}
}
Err(err) => {
// There's not much we can do here, unfortunately.
warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
}
}
}
info!("Auto-shrink linked chunk task has been closed, exiting");
}
/// Handle [`SendQueueUpdate`] and [`RoomEventCacheLinkedChunkUpdate`] to update
/// the threads, for a thread the user was not subscribed to.
#[instrument(skip_all)]