From 02cfee68c46cecd523959b8dd899bb8bdc5847d6 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 10 May 2023 15:51:53 +0200 Subject: [PATCH] feat(sdk): `SlidingSyncRoom` is cheap to clone. This patch changes `SlidingSyncRoom` to move all its fields inside an inner type `SlidingSyncRoom` behind an `Arc`, so that cloning is cheap, and all clones are sharing the same state. --- .../matrix-sdk/src/sliding_sync/list/mod.rs | 2 +- crates/matrix-sdk/src/sliding_sync/room.rs | 566 +++++++++++------- 2 files changed, 350 insertions(+), 218 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index 0166ce835..38a5b97a3 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -28,7 +28,7 @@ use tracing::{instrument, warn}; use super::{Error, SlidingSyncInternalMessage}; use crate::Result; -/// Holding a specific filtered list within the concept of sliding sync. +/// Holding a specific filtered list within the concept of Sliding Sync. /// /// It is OK to clone this type as much as you need: cloning it is cheap. #[derive(Clone, Debug)] diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index 2eebd7c3d..56c610738 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -1,4 +1,8 @@ -use std::{fmt::Debug, ops::Not}; +use std::{ + fmt::Debug, + ops::Not, + sync::{Arc, RwLock}, +}; use eyeball_im::Vector; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; @@ -6,7 +10,7 @@ use ruma::{ api::client::sync::sync_events::{v4, UnreadNotificationsCount}, events::AnySyncStateEvent, serde::Raw, - OwnedRoomId, + OwnedRoomId, RoomId, }; use serde::{Deserialize, Serialize}; use tracing::{error, instrument}; @@ -16,33 +20,8 @@ use crate::{ Client, }; -/// A Sliding Sync Room. -/// -/// It contains some information about a specific room, along with a queue of -/// events for the timeline. -#[derive(Debug, Clone)] -pub struct SlidingSyncRoom { - /// The client, used to fetch [`Room`][crate::room::Room]. - client: Client, - - /// The room ID. - room_id: OwnedRoomId, - - /// The room representation as a `SlidingSync`'s response from the server. - /// - /// We update this response when an update is needed. - inner: v4::SlidingSyncRoom, - - /// Internal state of `Self`. - state: SlidingSyncRoomState, - - /// A queue of received events, used to build a - /// [`Timeline`][crate::Timeline]. - timeline_queue: Vector, -} - /// The state of a [`SlidingSyncRoom`]. -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Copy, Clone, Debug, Default, PartialEq)] pub enum SlidingSyncRoomState { /// The room is not loaded, i.e. not updates have been received yet. #[default] @@ -56,6 +35,17 @@ pub enum SlidingSyncRoomState { Loaded, } +/// A Sliding Sync Room. +/// +/// It contains some information about a specific room, along with a queue of +/// events for the timeline. +/// +/// It is OK to clone this type as much as you need: cloning it is cheap. +#[derive(Debug, Clone)] +pub struct SlidingSyncRoom { + inner: Arc, +} + impl SlidingSyncRoom { /// Create a new `SlidingSyncRoom`. pub(super) fn new( @@ -65,57 +55,66 @@ impl SlidingSyncRoom { timeline: Vec, ) -> Self { Self { - client, - room_id, - inner, - state: SlidingSyncRoomState::NotLoaded, - timeline_queue: timeline.into(), + inner: Arc::new(SlidingSyncRoomInner { + client, + room_id, + inner: RwLock::new(inner), + state: RwLock::new(SlidingSyncRoomState::NotLoaded), + timeline_queue: RwLock::new(timeline.into()), + }), } } /// Get the room ID of this `SlidingSyncRoom`. - pub fn room_id(&self) -> &ruma::RoomId { - &self.room_id + pub fn room_id(&self) -> &RoomId { + &self.inner.room_id } /// This rooms name as calculated by the server, if any - pub fn name(&self) -> Option<&str> { - self.inner.name.as_deref() + pub fn name(&self) -> Option { + let inner = self.inner.inner.read().unwrap(); + + inner.name.to_owned() } /// Is this a direct message? pub fn is_dm(&self) -> Option { - self.inner.is_dm + let inner = self.inner.inner.read().unwrap(); + + inner.is_dm } /// Was this an initial response? pub fn is_initial_response(&self) -> Option { - self.inner.initial + let inner = self.inner.inner.read().unwrap(); + + inner.initial } /// Is there any unread notifications? pub fn has_unread_notifications(&self) -> bool { - self.inner.unread_notifications.is_empty().not() + let inner = self.inner.inner.read().unwrap(); + + inner.unread_notifications.is_empty().not() } /// Get unread notifications. - pub fn unread_notifications(&self) -> &UnreadNotificationsCount { - &self.inner.unread_notifications + pub fn unread_notifications(&self) -> UnreadNotificationsCount { + let inner = self.inner.inner.read().unwrap(); + + inner.unread_notifications.clone() } /// Get the required state. - pub fn required_state(&self) -> &Vec> { - &self.inner.required_state - } + pub fn required_state(&self) -> Vec> { + let inner = self.inner.inner.read().unwrap(); - /// Get the previous batch. - fn prev_batch(&self) -> Option<&String> { - self.inner.prev_batch.as_ref() + inner.required_state.clone() } /// `Timeline` of this room pub async fn timeline(&self) -> Option { - Some(self.timeline_builder()?.track_read_marker_and_receipts().build().await) + Some(self.inner.timeline_builder()?.track_read_marker_and_receipts().build().await) } /// The latest timeline item of this room. @@ -124,25 +123,7 @@ impl SlidingSyncRoom { /// this `SlidingSyncRoom`. #[instrument(skip_all)] pub async fn latest_event(&self) -> Option { - self.timeline_builder()?.build().await.latest_event().await - } - - fn timeline_builder(&self) -> Option { - if let Some(room) = self.client.get_room(&self.room_id) { - Some( - Timeline::builder(&room) - .events(self.prev_batch().cloned(), self.timeline_queue.clone()), - ) - } else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) { - Some(Timeline::builder(&invited_room).events(None, Vector::new())) - } else { - error!( - room_id = ?self.room_id, - "Room not found in client. Can't provide a timeline for it" - ); - - None - } + self.inner.timeline_builder()?.build().await.latest_event().await } pub(super) fn update( @@ -161,66 +142,147 @@ impl SlidingSyncRoom { .. } = room_data; - self.inner.unread_notifications = unread_notifications; + { + let mut inner = self.inner.inner.write().unwrap(); - // The server might not send some parts of the response, because they were sent - // before and the server wants to save bandwidth. So let's update the values - // only when they exist. + inner.unread_notifications = unread_notifications; - if name.is_some() { - self.inner.name = name; - } + // The server might not send some parts of the response, because they were sent + // before and the server wants to save bandwidth. So let's update the values + // only when they exist. - if initial.is_some() { - self.inner.initial = initial; - } - - if is_dm.is_some() { - self.inner.is_dm = is_dm; - } - - if !required_state.is_empty() { - self.inner.required_state = required_state; - } - - if prev_batch.is_some() { - self.inner.prev_batch = prev_batch; - } - - // There is timeline updates. - if !timeline_updates.is_empty() { - if let SlidingSyncRoomState::Preloaded = self.state { - // If the room has been read from the cache, we overwrite the timeline queue - // with the timeline updates. - - self.timeline_queue.clear(); - self.timeline_queue.extend(timeline_updates); - } else if limited { - // The server alerted us that we missed items in between. - - self.timeline_queue.clear(); - self.timeline_queue.extend(timeline_updates); - } else { - // It's the hot path. We have new updates that must be added to the existing - // timeline queue. - - self.timeline_queue.extend(timeline_updates); + if name.is_some() { + inner.name = name; } - } else if limited { - // The timeline updates are empty. But `limited` is set to true. It's a way to - // alert that we are stale. In this case, we should just clear the - // existing timeline. - self.timeline_queue.clear(); + if initial.is_some() { + inner.initial = initial; + } + + if is_dm.is_some() { + inner.is_dm = is_dm; + } + + if !required_state.is_empty() { + inner.required_state = required_state; + } + + if prev_batch.is_some() { + inner.prev_batch = prev_batch; + } } - self.state = SlidingSyncRoomState::Loaded; + let mut state = self.inner.state.write().unwrap(); + + { + let mut timeline_queue = self.inner.timeline_queue.write().unwrap(); + + // There is timeline updates. + if !timeline_updates.is_empty() { + if let SlidingSyncRoomState::Preloaded = *state { + // If the room has been read from the cache, we overwrite the timeline queue + // with the timeline updates. + + timeline_queue.clear(); + timeline_queue.extend(timeline_updates); + } else if limited { + // The server alerted us that we missed items in between. + + timeline_queue.clear(); + timeline_queue.extend(timeline_updates); + } else { + // It's the hot path. We have new updates that must be added to the existing + // timeline queue. + + timeline_queue.extend(timeline_updates); + } + } else if limited { + // The timeline updates are empty. But `limited` is set to true. It's a way to + // alert that we are stale. In this case, we should just clear the + // existing timeline. + + timeline_queue.clear(); + } + } + + *state = SlidingSyncRoomState::Loaded; } pub(super) fn from_frozen(frozen_room: FrozenSlidingSyncRoom, client: Client) -> Self { let FrozenSlidingSyncRoom { room_id, inner, timeline_queue } = frozen_room; - Self { client, room_id, inner, state: SlidingSyncRoomState::Preloaded, timeline_queue } + Self { + inner: Arc::new(SlidingSyncRoomInner { + client, + room_id, + inner: RwLock::new(inner), + state: RwLock::new(SlidingSyncRoomState::Preloaded), + timeline_queue: RwLock::new(timeline_queue), + }), + } + } +} + +#[cfg(test)] +impl SlidingSyncRoom { + fn state(&self) -> SlidingSyncRoomState { + *self.inner.state.read().unwrap() + } + + fn set_state(&mut self, state: SlidingSyncRoomState) { + *self.inner.state.write().unwrap() = state; + } + + fn timeline_queue(&self) -> std::sync::RwLockReadGuard> { + self.inner.timeline_queue.read().unwrap() + } +} + +#[derive(Debug)] +struct SlidingSyncRoomInner { + /// The client, used to fetch [`Room`][crate::room::Room]. + client: Client, + + /// The room ID. + room_id: OwnedRoomId, + + /// The room representation as a `SlidingSync`'s response from the server. + /// + /// We update this response when an update is needed. + inner: RwLock, + + /// Internal state of `Self`. + state: RwLock, + + /// A queue of received events, used to build a + /// [`Timeline`][crate::Timeline]. + timeline_queue: RwLock>, +} + +impl SlidingSyncRoomInner { + /// Get the previous batch. + fn prev_batch(&self) -> Option { + let inner = self.inner.read().unwrap(); + + inner.prev_batch.clone() + } + + fn timeline_builder(&self) -> Option { + if let Some(room) = self.client.get_room(&self.room_id) { + Some( + Timeline::builder(&room) + .events(self.prev_batch(), self.timeline_queue.read().unwrap().clone()), + ) + } else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) { + Some(Timeline::builder(&invited_room).events(None, Vector::new())) + } else { + error!( + room_id = ?self.room_id, + "Room not found in client. Can't provide a timeline for it" + ); + + None + } } } @@ -240,10 +302,10 @@ const NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE: usize = 10; impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom { fn from(value: &SlidingSyncRoom) -> Self { - let timeline_queue = &value.timeline_queue; + let timeline_queue = &value.inner.timeline_queue.read().unwrap(); let timeline_length = timeline_queue.len(); - let mut inner = value.inner.clone(); + let mut inner = value.inner.inner.read().unwrap().clone(); // To not overflow the cache, we only freeze the newest N items. On doing // so, we must drop the `prev_batch` key however, as we'd otherwise @@ -252,16 +314,17 @@ impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom { let timeline_queue = if timeline_length > NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE { inner.prev_batch = None; - timeline_queue + (*timeline_queue) .iter() .skip(timeline_length - NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE) .cloned() - .collect() + .collect::>() + .into() } else { - timeline_queue.clone() + (*timeline_queue).clone() }; - Self { room_id: value.room_id.clone(), inner, timeline_queue } + Self { room_id: value.inner.room_id.clone(), inner, timeline_queue } } } @@ -303,24 +366,24 @@ mod tests { async fn test_state_from_not_loaded() { let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); // Update with an empty response, but it doesn't matter. room.update(room_response!({}), vec![]); - assert_eq!(room.state, SlidingSyncRoomState::Loaded); + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); } #[tokio::test] async fn test_state_from_preloaded() { let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await; - room.state = SlidingSyncRoomState::Preloaded; + room.set_state(SlidingSyncRoomState::Preloaded); // Update with an empty response, but it doesn't matter. room.update(room_response!({}), vec![]); - assert_eq!(room.state, SlidingSyncRoomState::Loaded); + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); } #[tokio::test] @@ -378,7 +441,7 @@ mod tests { test_room_name { name() = None; receives room_response!({"name": "gordon"}); - _ = Some("gordon"); + _ = Some("gordon".to_string()); } test_room_is_dm { @@ -416,11 +479,35 @@ mod tests { receives room_response!({"highlight_count": 42}); _ = Some(uint!(42)); } + } - test_prev_batch { - prev_batch() = None; - receives room_response!({"prev_batch": "t111_222_333"}); - _ = Some(&"t111_222_333".to_string()); + #[tokio::test] + async fn test_prev_batch() { + // Default value. + { + let room = new_room(room_id!("!foo:bar.org"), room_response!({})).await; + + assert_eq!(room.inner.prev_batch(), None); + } + + // Some value when initializing. + { + let room = + new_room(room_id!("!foo:bar.org"), room_response!({"prev_batch": "t111_222_333"})) + .await; + + assert_eq!(room.inner.prev_batch(), Some("t111_222_333".to_string())); + } + + // Some value when updating. + { + let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await; + + assert_eq!(room.inner.prev_batch(), None); + + room.update(room_response!({"prev_batch": "t111_222_333"}), vec![]); + + assert_eq!(room.inner.prev_batch(), Some("t111_222_333".to_string())); } } @@ -485,7 +572,7 @@ mod tests { async fn test_timeline_initially_empty() { let room = new_room(room_id!("!foo:bar.org"), room_response!({})).await; - assert!(room.timeline_queue.is_empty()); + assert!(room.timeline_queue().is_empty()); } macro_rules! timeline_event { @@ -534,14 +621,18 @@ mod tests { ) .await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } } #[tokio::test] @@ -556,26 +647,34 @@ mod tests { ) .await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } room.update(room_response!({}), vec![]); // The queue is unmodified. - assert_eq!(room.state, SlidingSyncRoomState::Loaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } } #[tokio::test] @@ -590,14 +689,18 @@ mod tests { ) .await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } room.update( room_response!({ @@ -607,8 +710,12 @@ mod tests { ); // The queue has been emptied. - assert_eq!(room.state, SlidingSyncRoomState::Loaded); - assert_eq!(room.timeline_queue.len(), 0); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); + assert_eq!(timeline_queue.len(), 0); + } } #[tokio::test] @@ -623,16 +730,20 @@ mod tests { ) .await; - room.state = SlidingSyncRoomState::Preloaded; + room.set_state(SlidingSyncRoomState::Preloaded); - assert_eq!(room.state, SlidingSyncRoomState::Preloaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Preloaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } room.update( room_response!({}), @@ -643,14 +754,18 @@ mod tests { ); // The queue is emptied, and new events are appended. - assert_eq!(room.state, SlidingSyncRoomState::Loaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x2:baz.org", - 1 => "$x3:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x2:baz.org", + 1 => "$x3:baz.org", + } + ); + } } #[tokio::test] @@ -665,14 +780,18 @@ mod tests { ) .await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + ); + } room.update( room_response!({}), @@ -683,16 +802,20 @@ mod tests { ); // New events are appended to the queue. - assert_eq!(room.state, SlidingSyncRoomState::Loaded); - assert_eq!(room.timeline_queue.len(), 4); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - 2 => "$x2:baz.org", - 3 => "$x3:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); + assert_eq!(timeline_queue.len(), 4); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + 2 => "$x2:baz.org", + 3 => "$x3:baz.org", + } + ); + } } #[tokio::test] @@ -707,14 +830,19 @@ mod tests { ) .await; - assert_eq!(room.state, SlidingSyncRoomState::NotLoaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x0:baz.org", - 1 => "$x1:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x0:baz.org", + 1 => "$x1:baz.org", + } + + ); + } room.update( room_response!({ @@ -727,14 +855,18 @@ mod tests { ); // The queue is emptied, and new events are appended. - assert_eq!(room.state, SlidingSyncRoomState::Loaded); - assert_eq!(room.timeline_queue.len(), 2); - assert_timeline_queue_event_ids!( - with room.timeline_queue { - 0 => "$x2:baz.org", - 1 => "$x3:baz.org", - } - ); + { + let timeline_queue = room.timeline_queue(); + + assert_eq!(room.state(), SlidingSyncRoomState::Loaded); + assert_eq!(timeline_queue.len(), 2); + assert_timeline_queue_event_ids!( + with timeline_queue { + 0 => "$x2:baz.org", + 1 => "$x3:baz.org", + } + ); + } } #[test]