refactor(sdk): Rename RoomEventCacheListener to RoomEventCacheSubscriber.

This patch removes a naming ambiguity between _listener_ and _subscriber_.
Both terms are used to talk about `RoomEventCacheListener`, even though we
usually reserve the term _subscriber_ for the type returned by a `subscribe`
method. The code refers to this type sometimes as a listener, sometimes as a
subscriber, and sometimes as both in the same sentence, which can be very
confusing! This patch resolves the ambiguity by using the _subscriber_ term only.
Author: Ivan Enderlin
Date:   2025-06-18 12:55:44 +02:00
Parent: 50c3217353
Commit: 45caaffb26

3 changed files with 46 additions and 44 deletions
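
For a sense of how the rename reads at a call site, here is a minimal sketch against the renamed API: `subscribe` returns the current events plus a `RoomEventCacheSubscriber`, whose `recv` yields updates (the error variants match the ones handled in the diff below). The function name and the `RecvError` import path are illustrative assumptions, not part of this patch.

```rust
use matrix_sdk::event_cache::{RoomEventCache, RoomEventCacheSubscriber};
use tokio::sync::broadcast::error::RecvError;

// Hypothetical call site: the value returned by `subscribe` is now named a
// subscriber, matching the method that produced it. Dropping it decrements a
// shared count and may trigger an auto-shrink (see the `Drop` impl below).
async fn follow_updates(room_event_cache: &RoomEventCache) {
    let (_initial_events, mut subscriber): (Vec<_>, RoomEventCacheSubscriber) =
        room_event_cache.subscribe().await;

    loop {
        match subscriber.recv().await {
            // React to the `RoomEventCacheUpdate` here.
            Ok(_update) => {}
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(_skipped)) => continue,
        }
    }
}
```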

View File

@@ -22,7 +22,7 @@ use futures_util::{pin_mut, StreamExt};
 use matrix_sdk::{
     crypto::store::types::RoomKeyInfo,
     encryption::backups::BackupState,
-    event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
+    event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate},
     executor::spawn,
     send_queue::RoomSendQueueUpdate,
     Room,
@@ -356,7 +356,7 @@ where
 async fn room_event_cache_updates_task(
     room_event_cache: RoomEventCache,
     timeline_controller: TimelineController,
-    mut event_subscriber: RoomEventCacheListener,
+    mut room_event_cache_subscriber: RoomEventCacheSubscriber,
     timeline_focus: TimelineFocus,
 ) {
     trace!("Spawned the event subscriber task.");
@@ -364,7 +364,7 @@ async fn room_event_cache_updates_task(
     loop {
         trace!("Waiting for an event.");
 
-        let update = match event_subscriber.recv().await {
+        let update = match room_event_cache_subscriber.recv().await {
             Ok(up) => up,
             Err(RecvError::Closed) => break,
             Err(RecvError::Lagged(num_skipped)) => {

View File

@@ -63,7 +63,7 @@ mod room;
 pub mod paginator;
 
 pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
-pub use room::{RoomEventCache, RoomEventCacheListener};
+pub use room::{RoomEventCache, RoomEventCacheSubscriber};
 
 /// An error observed in the [`EventCache`].
 #[derive(thiserror::Error, Debug)]
@@ -276,14 +276,14 @@ impl EventCache {
     /// The auto-shrink mechanism works this way:
     ///
     /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
-    ///   increment the active number of listeners to that room, aka
-    ///   [`RoomEventCacheState::listener_count`].
+    ///   increment the active number of subscribers to that room, aka
+    ///   [`RoomEventCacheState::subscriber_count`].
     /// - When that subscriber is dropped, it will decrement that count; and
     ///   notify the task below if it reached 0.
     /// - The task spawned here, owned by the [`EventCacheInner`], will listen
     ///   to such notifications that a room may be shrunk. It will attempt an
     ///   auto-shrink, by letting the inner state decide whether this is a good
-    ///   time to do so (new listeners might have spawned in the meanwhile).
+    ///   time to do so (new subscribers might have spawned in the meanwhile).
     #[instrument(skip_all)]
     async fn auto_shrink_linked_chunk_task(
         inner: Arc<EventCacheInner>,
@@ -303,11 +303,11 @@ impl EventCache {
             trace!("waiting for state lock…");
             let mut state = room.inner.state.write().await;
 
-            match state.auto_shrink_if_no_listeners().await {
+            match state.auto_shrink_if_no_subscribers().await {
                 Ok(diffs) => {
                     if let Some(diffs) = diffs {
                         // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
-                        // listeners, right? RIGHT? Especially because the state is guarded behind
+                        // subscribers, right? RIGHT? Especially because the state is guarded behind
                         // a lock.
                         //
                         // However, better safe than sorry, and it's cheap to send an update here,
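
The doc comment in this file describes the bookkeeping behind the auto-shrink: `subscribe` bumps an atomic count, every dropped subscriber decrements it, and the last one notifies a background task, which re-checks the count before shrinking. Here is a self-contained sketch of that pattern; every name in it is illustrative and none of it is the SDK's actual code.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use tokio::sync::mpsc;

/// Illustrative stand-in for a per-room subscriber handle.
struct Subscriber {
    count: Arc<AtomicUsize>,
    notify_last_dropped: mpsc::Sender<()>,
}

impl Drop for Subscriber {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value: 1 means we were the last one.
        if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
            // Best-effort notification; the receiving task re-checks the count,
            // since a new subscriber may have appeared in the meanwhile.
            let _ = self.notify_last_dropped.try_send(());
        }
    }
}

fn subscribe(count: &Arc<AtomicUsize>, tx: &mpsc::Sender<()>) -> Subscriber {
    count.fetch_add(1, Ordering::SeqCst);
    Subscriber { count: count.clone(), notify_last_dropped: tx.clone() }
}

#[tokio::main]
async fn main() {
    let count = Arc::new(AtomicUsize::new(0));
    let (tx, mut rx) = mpsc::channel::<()>(8);

    // Stand-in for the auto-shrink task: only shrink if the count is still
    // zero by the time the notification is handled.
    let task_count = count.clone();
    let task = tokio::spawn(async move {
        while rx.recv().await.is_some() {
            if task_count.load(Ordering::SeqCst) == 0 {
                println!("no subscribers left: shrink to the last chunk");
            }
        }
    });

    let first = subscribe(&count, &tx);
    let second = subscribe(&count, &tx);
    drop(first); // count: 2 -> 1, nothing to do.
    drop(second); // count: 1 -> 0, notify the task.

    drop(tx); // Close the channel so the task terminates.
    task.await.unwrap();
}
```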

View File

@@ -66,15 +66,15 @@ impl fmt::Debug for RoomEventCache {
     }
 }
 
-/// Thin wrapper for a room event cache listener, so as to trigger side-effects
-/// when all listeners are gone.
+/// Thin wrapper for a room event cache subscriber, so as to trigger
+/// side-effects when all subscribers are gone.
 ///
 /// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
-/// more listeners are active. This is an optimisation to reduce the number of
-/// data held in memory by a [`RoomEventCache`]: when no more listeners are
+/// more subscribers are active. This is an optimisation to reduce the number of
+/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
 /// active, all data are reduced to the minimum.
 #[allow(missing_debug_implementations)]
-pub struct RoomEventCacheListener {
+pub struct RoomEventCacheSubscriber {
     /// Underlying receiver of the room event cache's updates.
     recv: Receiver<RoomEventCacheUpdate>,
@@ -85,17 +85,19 @@ pub struct RoomEventCacheListener {
     auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
 
     /// Shared instance of the auto-shrinker.
-    listener_count: Arc<AtomicUsize>,
+    subscriber_count: Arc<AtomicUsize>,
 }
 
-impl Drop for RoomEventCacheListener {
+impl Drop for RoomEventCacheSubscriber {
     fn drop(&mut self) {
-        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);
+        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
 
-        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");
+        trace!(
+            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
+        );
 
-        if previous_listener_count == 1 {
-            // We were the last instance of the listener; let the auto-shrinker know by
+        if previous_subscriber_count == 1 {
+            // We were the last instance of the subscriber; let the auto-shrinker know by
             // notifying it of our room id.
 
             let mut room_id = self.room_id.clone();
@@ -125,12 +127,12 @@ impl Drop for RoomEventCacheListener {
                 }
             }
 
-            trace!("sent notification to the parent channel that we were the last listener");
+            trace!("sent notification to the parent channel that we were the last subscriber");
         }
     }
 }
 
-impl Deref for RoomEventCacheListener {
+impl Deref for RoomEventCacheSubscriber {
     type Target = Receiver<RoomEventCacheUpdate>;
 
     fn deref(&self) -> &Self::Target {
@@ -138,7 +140,7 @@ impl Deref for RoomEventCacheListener {
     }
 }
 
-impl DerefMut for RoomEventCacheListener {
+impl DerefMut for RoomEventCacheSubscriber {
     fn deref_mut(&mut self) -> &mut Self::Target {
         &mut self.recv
     }
@@ -167,7 +169,7 @@ impl RoomEventCache {
     /// Read all current events.
     ///
     /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
-    /// listener/subscriber.
+    /// subscriber.
     pub async fn events(&self) -> Vec<Event> {
         let state = self.inner.state.read().await;
@@ -178,24 +180,24 @@
     /// events.
     ///
     /// Use [`RoomEventCache::events`] to get all current events without the
-    /// listener/subscriber. Creating, and especially dropping, a
-    /// [`RoomEventCacheListener`] isn't free, as it triggers side-effects.
-    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheListener) {
+    /// subscriber. Creating, and especially dropping, a
+    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
+    pub async fn subscribe(&self) -> (Vec<Event>, RoomEventCacheSubscriber) {
         let state = self.inner.state.read().await;
         let events = state.events().events().map(|(_position, item)| item.clone()).collect();
 
-        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
-        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
+        let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst);
+        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
 
         let recv = self.inner.sender.subscribe();
 
-        let listener = RoomEventCacheListener {
+        let subscriber = RoomEventCacheSubscriber {
             recv,
             room_id: self.inner.room_id.clone(),
             auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
-            listener_count: state.listener_count.clone(),
+            subscriber_count: state.subscriber_count.clone(),
         };
 
-        (events, listener)
+        (events, subscriber)
     }
 
     /// Return a [`RoomPagination`] API object useful for running
@@ -511,9 +513,9 @@ mod private {
         pagination_status: SharedObservable<RoomPaginationStatus>,
 
-        /// An atomic count of the current number of listeners of the
+        /// An atomic count of the current number of subscriber of the
         /// [`super::RoomEventCache`].
-        pub(super) listener_count: Arc<AtomicUsize>,
+        pub(super) subscriber_count: Arc<AtomicUsize>,
     }
 
     impl RoomEventCacheState {
@@ -566,7 +568,7 @@ mod private {
                 store,
                 events,
                 waited_for_initial_prev_token: false,
-                listener_count: Default::default(),
+                subscriber_count: Default::default(),
                 pagination_status,
             })
         }
@@ -777,17 +779,17 @@ mod private {
             Ok(())
         }
 
-        /// Automatically shrink the room if there are no listeners, as
-        /// indicated by the atomic number of active listeners.
+        /// Automatically shrink the room if there are no more subscribers, as
+        /// indicated by the atomic number of active subscribers.
         #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
-        pub(crate) async fn auto_shrink_if_no_listeners(
+        pub(crate) async fn auto_shrink_if_no_subscribers(
             &mut self,
         ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
-            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
+            let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst);
 
-            trace!(listener_count, "received request to auto-shrink");
+            trace!(subscriber_count, "received request to auto-shrink");
 
-            if listener_count == 0 {
+            if subscriber_count == 0 {
                 // If we are the last strong reference to the auto-shrinker, we can shrink the
                 // events data structure to its last chunk.
                 self.shrink_to_last_chunk().await?;
@@ -2579,9 +2581,9 @@ mod timed_tests {
         assert!(stream1.is_empty());
 
-        // Have another listener subscribe to the event cache.
+        // Have another subscriber.
         // Since it's not the first one, and the previous one loaded some more events,
-        // the second listener seems them all.
+        // the second subscribers sees them all.
         let (events2, stream2) = room_event_cache.subscribe().await;
         assert_eq!(events2.len(), 2);
         assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
@@ -2604,7 +2606,7 @@ mod timed_tests {
         {
             // Check the inner state: there's no more shared auto-shrinker.
             let state = room_event_cache.inner.state.read().await;
-            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
+            assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0);
         }
 
         // Getting the events will only give us the latest chunk.
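
Finally, since the `subscribe` documentation stresses that creating, and especially dropping, a `RoomEventCacheSubscriber` triggers side-effects, here is a hedged usage sketch of that behaviour. The helper name is made up; it only assumes a `RoomEventCache` like the one exercised in the test above.

```rust
use matrix_sdk::event_cache::RoomEventCache;

// Illustrative only: keep the subscriber in a narrow scope so its `Drop` runs
// (and may trigger an auto-shrink) once live updates are no longer needed.
async fn peek_then_release(room_event_cache: &RoomEventCache) {
    {
        let (_events, _subscriber) = room_event_cache.subscribe().await;
        // ... receive updates via `_subscriber.recv().await` while needed ...
    } // `_subscriber` is dropped here; the shared count goes back down.

    // Afterwards the cache may have shrunk to its last chunk, so this snapshot
    // can be smaller than the one taken while subscribed (as the test checks).
    let _latest = room_event_cache.events().await;
}
```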