From 91784ded7249e2cad1b19e745f9e66c0ab6cfe6e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 31 Aug 2023 14:32:32 +0200 Subject: [PATCH] feat(ui): `TimelineInnerStateLock::lock` is replaced by `read` and `write`. The `Timeline` batches its updates to its subscribers (e.g. a client app, like Element X). A batch is built every time the inner state lock of the `Timeline` is released. On the paper, it's nice; in practise, even a read operation on the `Timeline` leads to building a new batch. This is inefficient and consumes resources (like CPU cycles, FFI boundary crossings, memory allocations etc.) even if there is no update to put in the batch: this will just batch empty updates. To avoid that, one needs to make the difference between read or write operations onto the inner state. Only the write operations will fire a batch. This patch splits the `TimelineInnerStateLock::lock` method into `::read` and `::write`. The idea is that a read-only lock doesn't hold a clone of the lock release observer (`lock_release_ob: SharedObservable<()>`), it will not notify the observer. Then it's only the write lock that holds a clone of the lock release observer, and will notify it. This patch updates the code accordingly as best as possible. --- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 57 ++++++++++--------- .../matrix-sdk-ui/src/timeline/inner/state.rs | 42 +++++++------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 80b875a1f..0ab63b0fd 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -66,7 +66,7 @@ use super::{ mod state; pub(super) use self::state::TimelineInnerState; -use self::state::{TimelineInnerStateLock, TimelineInnerStateLockGuard}; +use self::state::{TimelineInnerStateLock, TimelineInnerStateWriteGuard}; #[derive(Clone, Debug)] pub(super) struct TimelineInner { @@ -141,14 +141,14 @@ impl TimelineInner

{ /// /// Cheap because `im::Vector` is cheap to clone. pub(super) async fn items(&self) -> Vector> { - self.state.lock().await.items.clone() + self.state.read().await.items.clone() } pub(super) async fn subscribe( &self, ) -> (Vector>, VectorSubscriber>) { trace!("Creating timeline items signal"); - let state = self.state.lock().await; + let state = self.state.read().await; // auto-deref to the inner vector's clone method let items = state.items.clone(); let stream = state.items.subscribe(); @@ -172,7 +172,7 @@ impl TimelineInner

{ F: Fn(Arc) -> Option, { trace!("Creating timeline items signal"); - let state = self.state.lock().await; + let state = self.state.read().await; state.items.subscribe_filter_map(f) } @@ -180,7 +180,7 @@ impl TimelineInner

{ &self, annotation: &Annotation, ) -> Result { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let user_id = self.room_data_provider.own_user_id(); @@ -301,7 +301,7 @@ impl TimelineInner

{ ) { let own_user_id = self.room_data_provider.own_user_id().to_owned(); self.state - .lock() + .write() .await .users_read_receipts .entry(own_user_id) @@ -317,7 +317,7 @@ impl TimelineInner

{ debug!("Adding {} initial events", events.len()); - let mut state = self.state.lock().await; + let mut state = self.state.write().await; for event in events { state .handle_remote_event( @@ -332,12 +332,12 @@ impl TimelineInner

{ pub(super) async fn clear(&self) { trace!("Clearing timeline"); - self.state.lock().await.clear(); + self.state.write().await.clear(); } #[instrument(skip_all)] pub(super) async fn handle_joined_room_update(&self, update: JoinedRoom) { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_sync_timeline(update.timeline, &self.room_data_provider, &self.settings).await; trace!("Handling account data"); @@ -372,7 +372,7 @@ impl TimelineInner

{ pub(super) async fn handle_sync_timeline(&self, timeline: Timeline) { self.state - .lock() + .write() .await .handle_sync_timeline(timeline, &self.room_data_provider, &self.settings) .await; @@ -381,7 +381,7 @@ impl TimelineInner

{ #[cfg(test)] pub(super) async fn handle_live_event(&self, event: SyncTimelineEvent) { self.state - .lock() + .write() .await .handle_live_event(event, &self.room_data_provider, &self.settings) .await; @@ -397,7 +397,7 @@ impl TimelineInner

{ let sender = self.room_data_provider.own_user_id().to_owned(); let profile = self.room_data_provider.profile(&sender).await; - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_local_event(sender, profile, txn_id, content, &self.settings); } @@ -412,7 +412,7 @@ impl TimelineInner

{ let sender = self.room_data_provider.own_user_id().to_owned(); let profile = self.room_data_provider.profile(&sender).await; - let mut state = self.state.lock().await; + let mut state = self.state.write().await; state.handle_local_redaction(sender, profile, txn_id, to_redact, content, &self.settings); } @@ -425,7 +425,7 @@ impl TimelineInner

{ txn_id: &TransactionId, send_state: EventSendState, ) { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let new_event_id: Option<&EventId> = match &send_state { EventSendState::Sent { event_id } => Some(event_id), @@ -526,7 +526,7 @@ impl TimelineInner

{ annotation: &Annotation, result: &ReactionToggleResult, ) -> Result { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let user_id = self.room_data_provider.own_user_id(); let annotation_key: AnnotationKey = annotation.into(); @@ -568,7 +568,7 @@ impl TimelineInner

{ &self, txn_id: &TransactionId, ) -> Option { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let (idx, item) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))?; let local_item = item.as_local()?; @@ -594,7 +594,8 @@ impl TimelineInner

{ } pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; + if let Some((idx, _)) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id)) { @@ -615,7 +616,7 @@ impl TimelineInner

{ &self, events: Vec, ) -> Option { - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let mut total = HandleManyEventsResult::default(); for event in events { @@ -636,7 +637,7 @@ impl TimelineInner

{ } pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) { - self.state.lock().await.set_fully_read_event(fully_read_event_id) + self.state.write().await.set_fully_read_event(fully_read_event_id) } #[cfg(feature = "e2e-encryption")] @@ -667,7 +668,7 @@ impl TimelineInner

{ ) { use super::EncryptedMessage; - let mut state = self.state.clone().lock_owned().await; + let mut state = self.state.clone().write_owned().await; let should_retry = move |session_id: &str| { if let Some(session_ids) = &session_ids { @@ -797,7 +798,7 @@ impl TimelineInner

{ } async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails) { - self.state.lock().await.items.for_each(|mut entry| { + self.state.write().await.items.for_each(|mut entry| { let Some(event_item) = entry.as_event() else { return }; if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) { let new_item = entry.with_kind(TimelineItemKind::Event( @@ -811,7 +812,7 @@ impl TimelineInner

{ pub(super) async fn update_sender_profiles(&self) { trace!("Updating sender profiles"); - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let mut entries = state.items.entries(); while let Some(mut entry) = entries.next() { let Some(event_item) = entry.as_event() else { continue }; @@ -851,7 +852,7 @@ impl TimelineInner

{ #[cfg(test)] pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) { let own_user_id = self.room_data_provider.own_user_id(); - self.state.lock().await.handle_explicit_read_receipts(receipt_event_content, own_user_id); + self.state.write().await.handle_explicit_read_receipts(receipt_event_content, own_user_id); } } @@ -890,7 +891,7 @@ impl TimelineInner { &self, event_id: &EventId, ) -> Result<(), super::Error> { - let state = self.state.lock().await; + let state = self.state.write().await; let (index, item) = rfind_event_by_id(&state.items, event_id) .ok_or(super::Error::RemoteEventNotInTimeline)?; let remote_item = item.as_remote().ok_or(super::Error::RemoteEventNotInTimeline)?.clone(); @@ -925,7 +926,7 @@ impl TimelineInner { // We need to be sure to have the latest position of the event as it might have // changed while waiting for the request. - let mut state = self.state.lock().await; + let mut state = self.state.write().await; let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id) .ok_or(super::Error::RemoteEventNotInTimeline)?; @@ -961,7 +962,7 @@ impl TimelineInner { &self, user_id: &UserId, ) -> Option<(OwnedEventId, Receipt)> { - let state = self.state.lock().await; + let state = self.state.read().await; let room = self.room(); state.latest_user_read_receipt(user_id, room).await @@ -982,7 +983,7 @@ impl TimelineInner { } let own_user_id = self.room().own_user_id(); - let state = self.state.lock().await; + let state = self.state.read().await; let room = self.room(); match receipt_type { @@ -1036,7 +1037,7 @@ pub(super) struct HandleManyEventsResult { } async fn fetch_replied_to_event( - mut state: TimelineInnerStateLockGuard<'_>, + mut state: TimelineInnerStateWriteGuard<'_>, index: usize, item: &EventTimelineItem, message: &Message, diff --git a/crates/matrix-sdk-ui/src/timeline/inner/state.rs b/crates/matrix-sdk-ui/src/timeline/inner/state.rs index 41a930ee8..75a198cf5 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/state.rs @@ -34,7 +34,7 @@ use ruma::{ MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId, }; -use tokio::sync::{Mutex, MutexGuard, OwnedMutexGuard}; +use tokio::sync::{OwnedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, error, instrument, trace, warn}; use super::{ReactionState, TimelineInnerSettings}; @@ -58,29 +58,33 @@ use crate::{ #[derive(Clone)] pub(in crate::timeline) struct TimelineInnerStateLock { - inner: Arc>, + inner: Arc>, lock_release_ob: SharedObservable<()>, } impl TimelineInnerStateLock { pub(super) fn new(state: TimelineInnerState) -> Self { - Self { inner: Arc::new(Mutex::new(state)), lock_release_ob: Default::default() } + Self { inner: Arc::new(RwLock::new(state)), lock_release_ob: Default::default() } } pub(super) fn subscribe_lock_release(&self) -> Subscriber<()> { self.lock_release_ob.subscribe() } - pub async fn lock(&self) -> TimelineInnerStateLockGuard<'_> { - TimelineInnerStateLockGuard { - inner: self.inner.lock().await, + pub async fn read(&self) -> RwLockReadGuard<'_, TimelineInnerState> { + self.inner.read().await + } + + pub async fn write(&self) -> TimelineInnerStateWriteGuard<'_> { + TimelineInnerStateWriteGuard { + inner: self.inner.write().await, lock_release_ob: self.lock_release_ob.clone(), } } - pub async fn lock_owned(&self) -> TimelineInnerStateOwnedLockGuard { - TimelineInnerStateOwnedLockGuard { - inner: self.inner.clone().lock_owned().await, + pub async fn write_owned(&self) -> TimelineInnerStateOwnedWriteGuard { + TimelineInnerStateOwnedWriteGuard { + inner: self.inner.clone().write_owned().await, lock_release_ob: self.lock_release_ob.clone(), } } @@ -477,12 +481,12 @@ impl TimelineInnerState { } } -pub(in crate::timeline) struct TimelineInnerStateLockGuard<'a> { - inner: MutexGuard<'a, TimelineInnerState>, +pub(in crate::timeline) struct TimelineInnerStateWriteGuard<'a> { + inner: RwLockWriteGuard<'a, TimelineInnerState>, lock_release_ob: SharedObservable<()>, } -impl Deref for TimelineInnerStateLockGuard<'_> { +impl Deref for TimelineInnerStateWriteGuard<'_> { type Target = TimelineInnerState; fn deref(&self) -> &Self::Target { @@ -490,24 +494,24 @@ impl Deref for TimelineInnerStateLockGuard<'_> { } } -impl DerefMut for TimelineInnerStateLockGuard<'_> { +impl DerefMut for TimelineInnerStateWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } -impl Drop for TimelineInnerStateLockGuard<'_> { +impl Drop for TimelineInnerStateWriteGuard<'_> { fn drop(&mut self) { self.lock_release_ob.set(()); } } -pub(in crate::timeline) struct TimelineInnerStateOwnedLockGuard { - inner: OwnedMutexGuard, +pub(in crate::timeline) struct TimelineInnerStateOwnedWriteGuard { + inner: OwnedRwLockWriteGuard, lock_release_ob: SharedObservable<()>, } -impl Deref for TimelineInnerStateOwnedLockGuard { +impl Deref for TimelineInnerStateOwnedWriteGuard { type Target = TimelineInnerState; fn deref(&self) -> &Self::Target { @@ -515,13 +519,13 @@ impl Deref for TimelineInnerStateOwnedLockGuard { } } -impl DerefMut for TimelineInnerStateOwnedLockGuard { +impl DerefMut for TimelineInnerStateOwnedWriteGuard { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } -impl Drop for TimelineInnerStateOwnedLockGuard { +impl Drop for TimelineInnerStateOwnedWriteGuard { fn drop(&mut self) { self.lock_release_ob.set(()); }