From 45caaffb2628fbd9915afed0d8b8327978697bcd Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 18 Jun 2025 12:55:44 +0200 Subject: [PATCH] refactor(sdk): Rename `RoomEventCacheListener` to `RoomEventCacheSubscriber`. This patch removes a name ambiguity around _listener_ vs. _subscriber_. Both terms are used to talk about `RoomEventCacheListener`. We usually use the term _subscriber_ for the type being returned by a `subscribe` method. The code refers to this sometimes as listener, sometimes as subscriber, sometimes both in the same sentence, which can be very confusing! This patch solves this by using the _subscriber_ term only. --- crates/matrix-sdk-ui/src/timeline/builder.rs | 6 +- crates/matrix-sdk/src/event_cache/mod.rs | 12 ++-- crates/matrix-sdk/src/event_cache/room/mod.rs | 72 ++++++++++--------- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 83b7483f6..e1d286f17 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -22,7 +22,7 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ crypto::store::types::RoomKeyInfo, encryption::backups::BackupState, - event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate}, + event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate}, executor::spawn, send_queue::RoomSendQueueUpdate, Room, @@ -356,7 +356,7 @@ where async fn room_event_cache_updates_task( room_event_cache: RoomEventCache, timeline_controller: TimelineController, - mut event_subscriber: RoomEventCacheListener, + mut room_event_cache_subscriber: RoomEventCacheSubscriber, timeline_focus: TimelineFocus, ) { trace!("Spawned the event subscriber task."); @@ -364,7 +364,7 @@ async fn room_event_cache_updates_task( loop { trace!("Waiting for an event."); - let update = match event_subscriber.recv().await { + let update = match room_event_cache_subscriber.recv().await { Ok(up) => up, Err(RecvError::Closed) => break, Err(RecvError::Lagged(num_skipped)) => { diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index f99f1f506..8f7a45ef9 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -63,7 +63,7 @@ mod room; pub mod paginator; pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus}; -pub use room::{RoomEventCache, RoomEventCacheListener}; +pub use room::{RoomEventCache, RoomEventCacheSubscriber}; /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] @@ -276,14 +276,14 @@ impl EventCache { /// The auto-shrink mechanism works this way: /// /// - Each time there's a new subscriber to a [`RoomEventCache`], it will - /// increment the active number of listeners to that room, aka - /// [`RoomEventCacheState::listener_count`]. + /// increment the active number of subscribers to that room, aka + /// [`RoomEventCacheState::subscriber_count`]. /// - When that subscriber is dropped, it will decrement that count; and /// notify the task below if it reached 0. /// - The task spawned here, owned by the [`EventCacheInner`], will listen /// to such notifications that a room may be shrunk. It will attempt an /// auto-shrink, by letting the inner state decide whether this is a good - /// time to do so (new listeners might have spawned in the meanwhile). + /// time to do so (new subscribers might have spawned in the meanwhile). #[instrument(skip_all)] async fn auto_shrink_linked_chunk_task( inner: Arc, @@ -303,11 +303,11 @@ impl EventCache { trace!("waiting for state lock…"); let mut state = room.inner.state.write().await; - match state.auto_shrink_if_no_listeners().await { + match state.auto_shrink_if_no_subscribers().await { Ok(diffs) => { if let Some(diffs) = diffs { // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any - // listeners, right? RIGHT? Especially because the state is guarded behind + // subscribers, right? RIGHT? Especially because the state is guarded behind // a lock. // // However, better safe than sorry, and it's cheap to send an update here, diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 97fc148b1..717076fb7 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -66,15 +66,15 @@ impl fmt::Debug for RoomEventCache { } } -/// Thin wrapper for a room event cache listener, so as to trigger side-effects -/// when all listeners are gone. +/// Thin wrapper for a room event cache subscriber, so as to trigger +/// side-effects when all subscribers are gone. /// /// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no -/// more listeners are active. This is an optimisation to reduce the number of -/// data held in memory by a [`RoomEventCache`]: when no more listeners are +/// more subscribers are active. This is an optimisation to reduce the number of +/// data held in memory by a [`RoomEventCache`]: when no more subscribers are /// active, all data are reduced to the minimum. #[allow(missing_debug_implementations)] -pub struct RoomEventCacheListener { +pub struct RoomEventCacheSubscriber { /// Underlying receiver of the room event cache's updates. recv: Receiver, @@ -85,17 +85,19 @@ pub struct RoomEventCacheListener { auto_shrink_sender: mpsc::Sender, /// Shared instance of the auto-shrinker. - listener_count: Arc, + subscriber_count: Arc, } -impl Drop for RoomEventCacheListener { +impl Drop for RoomEventCacheSubscriber { fn drop(&mut self) { - let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst); + let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst); - trace!("dropping a room event cache listener; previous count: {previous_listener_count}"); + trace!( + "dropping a room event cache subscriber; previous count: {previous_subscriber_count}" + ); - if previous_listener_count == 1 { - // We were the last instance of the listener; let the auto-shrinker know by + if previous_subscriber_count == 1 { + // We were the last instance of the subscriber; let the auto-shrinker know by // notifying it of our room id. let mut room_id = self.room_id.clone(); @@ -125,12 +127,12 @@ impl Drop for RoomEventCacheListener { } } - trace!("sent notification to the parent channel that we were the last listener"); + trace!("sent notification to the parent channel that we were the last subscriber"); } } } -impl Deref for RoomEventCacheListener { +impl Deref for RoomEventCacheSubscriber { type Target = Receiver; fn deref(&self) -> &Self::Target { @@ -138,7 +140,7 @@ impl Deref for RoomEventCacheListener { } } -impl DerefMut for RoomEventCacheListener { +impl DerefMut for RoomEventCacheSubscriber { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.recv } @@ -167,7 +169,7 @@ impl RoomEventCache { /// Read all current events. /// /// Use [`RoomEventCache::subscribe`] to get all current events, plus a - /// listener/subscriber. + /// subscriber. pub async fn events(&self) -> Vec { let state = self.inner.state.read().await; @@ -178,24 +180,24 @@ impl RoomEventCache { /// events. /// /// Use [`RoomEventCache::events`] to get all current events without the - /// listener/subscriber. Creating, and especially dropping, a - /// [`RoomEventCacheListener`] isn't free, as it triggers side-effects. - pub async fn subscribe(&self) -> (Vec, RoomEventCacheListener) { + /// subscriber. Creating, and especially dropping, a + /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects. + pub async fn subscribe(&self) -> (Vec, RoomEventCacheSubscriber) { let state = self.inner.state.read().await; let events = state.events().events().map(|(_position, item)| item.clone()).collect(); - let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst); - trace!("added a room event cache listener; new count: {}", previous_listener_count + 1); + let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst); + trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1); let recv = self.inner.sender.subscribe(); - let listener = RoomEventCacheListener { + let subscriber = RoomEventCacheSubscriber { recv, room_id: self.inner.room_id.clone(), auto_shrink_sender: self.inner.auto_shrink_sender.clone(), - listener_count: state.listener_count.clone(), + subscriber_count: state.subscriber_count.clone(), }; - (events, listener) + (events, subscriber) } /// Return a [`RoomPagination`] API object useful for running @@ -511,9 +513,9 @@ mod private { pagination_status: SharedObservable, - /// An atomic count of the current number of listeners of the + /// An atomic count of the current number of subscriber of the /// [`super::RoomEventCache`]. - pub(super) listener_count: Arc, + pub(super) subscriber_count: Arc, } impl RoomEventCacheState { @@ -566,7 +568,7 @@ mod private { store, events, waited_for_initial_prev_token: false, - listener_count: Default::default(), + subscriber_count: Default::default(), pagination_status, }) } @@ -777,17 +779,17 @@ mod private { Ok(()) } - /// Automatically shrink the room if there are no listeners, as - /// indicated by the atomic number of active listeners. + /// Automatically shrink the room if there are no more subscribers, as + /// indicated by the atomic number of active subscribers. #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub(crate) async fn auto_shrink_if_no_listeners( + pub(crate) async fn auto_shrink_if_no_subscribers( &mut self, ) -> Result>>, EventCacheError> { - let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst); + let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst); - trace!(listener_count, "received request to auto-shrink"); + trace!(subscriber_count, "received request to auto-shrink"); - if listener_count == 0 { + if subscriber_count == 0 { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. self.shrink_to_last_chunk().await?; @@ -2579,9 +2581,9 @@ mod timed_tests { assert!(stream1.is_empty()); - // Have another listener subscribe to the event cache. + // Have another subscriber. // Since it's not the first one, and the previous one loaded some more events, - // the second listener seems them all. + // the second subscribers sees them all. let (events2, stream2) = room_event_cache.subscribe().await; assert_eq!(events2.len(), 2); assert_eq!(events2[0].event_id().as_deref(), Some(evid1)); @@ -2604,7 +2606,7 @@ mod timed_tests { { // Check the inner state: there's no more shared auto-shrinker. let state = room_event_cache.inner.state.read().await; - assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0); + assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0); } // Getting the events will only give us the latest chunk.