feat(sdk): SlidingSyncRoom is cheap to clone.

This patch changes `SlidingSyncRoom` to move all its fields inside an
inner type `SlidingSyncRoom` behind an `Arc`, so that cloning is cheap,
and all clones are sharing the same state.
This commit is contained in:
Ivan Enderlin
2023-05-10 15:51:53 +02:00
parent 09bb0fcdd3
commit 02cfee68c4
2 changed files with 350 additions and 218 deletions

View File

@@ -28,7 +28,7 @@ use tracing::{instrument, warn};
use super::{Error, SlidingSyncInternalMessage};
use crate::Result;
/// Holding a specific filtered list within the concept of sliding sync.
/// Holding a specific filtered list within the concept of Sliding Sync.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]

View File

@@ -1,4 +1,8 @@
use std::{fmt::Debug, ops::Not};
use std::{
fmt::Debug,
ops::Not,
sync::{Arc, RwLock},
};
use eyeball_im::Vector;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
@@ -6,7 +10,7 @@ use ruma::{
api::client::sync::sync_events::{v4, UnreadNotificationsCount},
events::AnySyncStateEvent,
serde::Raw,
OwnedRoomId,
OwnedRoomId, RoomId,
};
use serde::{Deserialize, Serialize};
use tracing::{error, instrument};
@@ -16,33 +20,8 @@ use crate::{
Client,
};
/// A Sliding Sync Room.
///
/// It contains some information about a specific room, along with a queue of
/// events for the timeline.
#[derive(Debug, Clone)]
pub struct SlidingSyncRoom {
/// The client, used to fetch [`Room`][crate::room::Room].
client: Client,
/// The room ID.
room_id: OwnedRoomId,
/// The room representation as a `SlidingSync`'s response from the server.
///
/// We update this response when an update is needed.
inner: v4::SlidingSyncRoom,
/// Internal state of `Self`.
state: SlidingSyncRoomState,
/// A queue of received events, used to build a
/// [`Timeline`][crate::Timeline].
timeline_queue: Vector<SyncTimelineEvent>,
}
/// The state of a [`SlidingSyncRoom`].
#[derive(Debug, Default, Clone, PartialEq)]
#[derive(Copy, Clone, Debug, Default, PartialEq)]
pub enum SlidingSyncRoomState {
/// The room is not loaded, i.e. not updates have been received yet.
#[default]
@@ -56,6 +35,17 @@ pub enum SlidingSyncRoomState {
Loaded,
}
/// A Sliding Sync Room.
///
/// It contains some information about a specific room, along with a queue of
/// events for the timeline.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Debug, Clone)]
pub struct SlidingSyncRoom {
inner: Arc<SlidingSyncRoomInner>,
}
impl SlidingSyncRoom {
/// Create a new `SlidingSyncRoom`.
pub(super) fn new(
@@ -65,57 +55,66 @@ impl SlidingSyncRoom {
timeline: Vec<SyncTimelineEvent>,
) -> Self {
Self {
client,
room_id,
inner,
state: SlidingSyncRoomState::NotLoaded,
timeline_queue: timeline.into(),
inner: Arc::new(SlidingSyncRoomInner {
client,
room_id,
inner: RwLock::new(inner),
state: RwLock::new(SlidingSyncRoomState::NotLoaded),
timeline_queue: RwLock::new(timeline.into()),
}),
}
}
/// Get the room ID of this `SlidingSyncRoom`.
pub fn room_id(&self) -> &ruma::RoomId {
&self.room_id
pub fn room_id(&self) -> &RoomId {
&self.inner.room_id
}
/// This rooms name as calculated by the server, if any
pub fn name(&self) -> Option<&str> {
self.inner.name.as_deref()
pub fn name(&self) -> Option<String> {
let inner = self.inner.inner.read().unwrap();
inner.name.to_owned()
}
/// Is this a direct message?
pub fn is_dm(&self) -> Option<bool> {
self.inner.is_dm
let inner = self.inner.inner.read().unwrap();
inner.is_dm
}
/// Was this an initial response?
pub fn is_initial_response(&self) -> Option<bool> {
self.inner.initial
let inner = self.inner.inner.read().unwrap();
inner.initial
}
/// Is there any unread notifications?
pub fn has_unread_notifications(&self) -> bool {
self.inner.unread_notifications.is_empty().not()
let inner = self.inner.inner.read().unwrap();
inner.unread_notifications.is_empty().not()
}
/// Get unread notifications.
pub fn unread_notifications(&self) -> &UnreadNotificationsCount {
&self.inner.unread_notifications
pub fn unread_notifications(&self) -> UnreadNotificationsCount {
let inner = self.inner.inner.read().unwrap();
inner.unread_notifications.clone()
}
/// Get the required state.
pub fn required_state(&self) -> &Vec<Raw<AnySyncStateEvent>> {
&self.inner.required_state
}
pub fn required_state(&self) -> Vec<Raw<AnySyncStateEvent>> {
let inner = self.inner.inner.read().unwrap();
/// Get the previous batch.
fn prev_batch(&self) -> Option<&String> {
self.inner.prev_batch.as_ref()
inner.required_state.clone()
}
/// `Timeline` of this room
pub async fn timeline(&self) -> Option<Timeline> {
Some(self.timeline_builder()?.track_read_marker_and_receipts().build().await)
Some(self.inner.timeline_builder()?.track_read_marker_and_receipts().build().await)
}
/// The latest timeline item of this room.
@@ -124,25 +123,7 @@ impl SlidingSyncRoom {
/// this `SlidingSyncRoom`.
#[instrument(skip_all)]
pub async fn latest_event(&self) -> Option<EventTimelineItem> {
self.timeline_builder()?.build().await.latest_event().await
}
fn timeline_builder(&self) -> Option<TimelineBuilder> {
if let Some(room) = self.client.get_room(&self.room_id) {
Some(
Timeline::builder(&room)
.events(self.prev_batch().cloned(), self.timeline_queue.clone()),
)
} else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) {
Some(Timeline::builder(&invited_room).events(None, Vector::new()))
} else {
error!(
room_id = ?self.room_id,
"Room not found in client. Can't provide a timeline for it"
);
None
}
self.inner.timeline_builder()?.build().await.latest_event().await
}
pub(super) fn update(
@@ -161,66 +142,147 @@ impl SlidingSyncRoom {
..
} = room_data;
self.inner.unread_notifications = unread_notifications;
{
let mut inner = self.inner.inner.write().unwrap();
// The server might not send some parts of the response, because they were sent
// before and the server wants to save bandwidth. So let's update the values
// only when they exist.
inner.unread_notifications = unread_notifications;
if name.is_some() {
self.inner.name = name;
}
// The server might not send some parts of the response, because they were sent
// before and the server wants to save bandwidth. So let's update the values
// only when they exist.
if initial.is_some() {
self.inner.initial = initial;
}
if is_dm.is_some() {
self.inner.is_dm = is_dm;
}
if !required_state.is_empty() {
self.inner.required_state = required_state;
}
if prev_batch.is_some() {
self.inner.prev_batch = prev_batch;
}
// There is timeline updates.
if !timeline_updates.is_empty() {
if let SlidingSyncRoomState::Preloaded = self.state {
// If the room has been read from the cache, we overwrite the timeline queue
// with the timeline updates.
self.timeline_queue.clear();
self.timeline_queue.extend(timeline_updates);
} else if limited {
// The server alerted us that we missed items in between.
self.timeline_queue.clear();
self.timeline_queue.extend(timeline_updates);
} else {
// It's the hot path. We have new updates that must be added to the existing
// timeline queue.
self.timeline_queue.extend(timeline_updates);
if name.is_some() {
inner.name = name;
}
} else if limited {
// The timeline updates are empty. But `limited` is set to true. It's a way to
// alert that we are stale. In this case, we should just clear the
// existing timeline.
self.timeline_queue.clear();
if initial.is_some() {
inner.initial = initial;
}
if is_dm.is_some() {
inner.is_dm = is_dm;
}
if !required_state.is_empty() {
inner.required_state = required_state;
}
if prev_batch.is_some() {
inner.prev_batch = prev_batch;
}
}
self.state = SlidingSyncRoomState::Loaded;
let mut state = self.inner.state.write().unwrap();
{
let mut timeline_queue = self.inner.timeline_queue.write().unwrap();
// There is timeline updates.
if !timeline_updates.is_empty() {
if let SlidingSyncRoomState::Preloaded = *state {
// If the room has been read from the cache, we overwrite the timeline queue
// with the timeline updates.
timeline_queue.clear();
timeline_queue.extend(timeline_updates);
} else if limited {
// The server alerted us that we missed items in between.
timeline_queue.clear();
timeline_queue.extend(timeline_updates);
} else {
// It's the hot path. We have new updates that must be added to the existing
// timeline queue.
timeline_queue.extend(timeline_updates);
}
} else if limited {
// The timeline updates are empty. But `limited` is set to true. It's a way to
// alert that we are stale. In this case, we should just clear the
// existing timeline.
timeline_queue.clear();
}
}
*state = SlidingSyncRoomState::Loaded;
}
pub(super) fn from_frozen(frozen_room: FrozenSlidingSyncRoom, client: Client) -> Self {
let FrozenSlidingSyncRoom { room_id, inner, timeline_queue } = frozen_room;
Self { client, room_id, inner, state: SlidingSyncRoomState::Preloaded, timeline_queue }
Self {
inner: Arc::new(SlidingSyncRoomInner {
client,
room_id,
inner: RwLock::new(inner),
state: RwLock::new(SlidingSyncRoomState::Preloaded),
timeline_queue: RwLock::new(timeline_queue),
}),
}
}
}
#[cfg(test)]
impl SlidingSyncRoom {
fn state(&self) -> SlidingSyncRoomState {
*self.inner.state.read().unwrap()
}
fn set_state(&mut self, state: SlidingSyncRoomState) {
*self.inner.state.write().unwrap() = state;
}
fn timeline_queue(&self) -> std::sync::RwLockReadGuard<Vector<SyncTimelineEvent>> {
self.inner.timeline_queue.read().unwrap()
}
}
#[derive(Debug)]
struct SlidingSyncRoomInner {
/// The client, used to fetch [`Room`][crate::room::Room].
client: Client,
/// The room ID.
room_id: OwnedRoomId,
/// The room representation as a `SlidingSync`'s response from the server.
///
/// We update this response when an update is needed.
inner: RwLock<v4::SlidingSyncRoom>,
/// Internal state of `Self`.
state: RwLock<SlidingSyncRoomState>,
/// A queue of received events, used to build a
/// [`Timeline`][crate::Timeline].
timeline_queue: RwLock<Vector<SyncTimelineEvent>>,
}
impl SlidingSyncRoomInner {
/// Get the previous batch.
fn prev_batch(&self) -> Option<String> {
let inner = self.inner.read().unwrap();
inner.prev_batch.clone()
}
fn timeline_builder(&self) -> Option<TimelineBuilder> {
if let Some(room) = self.client.get_room(&self.room_id) {
Some(
Timeline::builder(&room)
.events(self.prev_batch(), self.timeline_queue.read().unwrap().clone()),
)
} else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) {
Some(Timeline::builder(&invited_room).events(None, Vector::new()))
} else {
error!(
room_id = ?self.room_id,
"Room not found in client. Can't provide a timeline for it"
);
None
}
}
}
@@ -240,10 +302,10 @@ const NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE: usize = 10;
impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom {
fn from(value: &SlidingSyncRoom) -> Self {
let timeline_queue = &value.timeline_queue;
let timeline_queue = &value.inner.timeline_queue.read().unwrap();
let timeline_length = timeline_queue.len();
let mut inner = value.inner.clone();
let mut inner = value.inner.inner.read().unwrap().clone();
// To not overflow the cache, we only freeze the newest N items. On doing
// so, we must drop the `prev_batch` key however, as we'd otherwise
@@ -252,16 +314,17 @@ impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom {
let timeline_queue = if timeline_length > NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE {
inner.prev_batch = None;
timeline_queue
(*timeline_queue)
.iter()
.skip(timeline_length - NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE)
.cloned()
.collect()
.collect::<Vec<_>>()
.into()
} else {
timeline_queue.clone()
(*timeline_queue).clone()
};
Self { room_id: value.room_id.clone(), inner, timeline_queue }
Self { room_id: value.inner.room_id.clone(), inner, timeline_queue }
}
}
@@ -303,24 +366,24 @@ mod tests {
async fn test_state_from_not_loaded() {
let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
// Update with an empty response, but it doesn't matter.
room.update(room_response!({}), vec![]);
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
}
#[tokio::test]
async fn test_state_from_preloaded() {
let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await;
room.state = SlidingSyncRoomState::Preloaded;
room.set_state(SlidingSyncRoomState::Preloaded);
// Update with an empty response, but it doesn't matter.
room.update(room_response!({}), vec![]);
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
}
#[tokio::test]
@@ -378,7 +441,7 @@ mod tests {
test_room_name {
name() = None;
receives room_response!({"name": "gordon"});
_ = Some("gordon");
_ = Some("gordon".to_string());
}
test_room_is_dm {
@@ -416,11 +479,35 @@ mod tests {
receives room_response!({"highlight_count": 42});
_ = Some(uint!(42));
}
}
test_prev_batch {
prev_batch() = None;
receives room_response!({"prev_batch": "t111_222_333"});
_ = Some(&"t111_222_333".to_string());
#[tokio::test]
async fn test_prev_batch() {
// Default value.
{
let room = new_room(room_id!("!foo:bar.org"), room_response!({})).await;
assert_eq!(room.inner.prev_batch(), None);
}
// Some value when initializing.
{
let room =
new_room(room_id!("!foo:bar.org"), room_response!({"prev_batch": "t111_222_333"}))
.await;
assert_eq!(room.inner.prev_batch(), Some("t111_222_333".to_string()));
}
// Some value when updating.
{
let mut room = new_room(room_id!("!foo:bar.org"), room_response!({})).await;
assert_eq!(room.inner.prev_batch(), None);
room.update(room_response!({"prev_batch": "t111_222_333"}), vec![]);
assert_eq!(room.inner.prev_batch(), Some("t111_222_333".to_string()));
}
}
@@ -485,7 +572,7 @@ mod tests {
async fn test_timeline_initially_empty() {
let room = new_room(room_id!("!foo:bar.org"), room_response!({})).await;
assert!(room.timeline_queue.is_empty());
assert!(room.timeline_queue().is_empty());
}
macro_rules! timeline_event {
@@ -534,14 +621,18 @@ mod tests {
)
.await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
}
#[tokio::test]
@@ -556,26 +647,34 @@ mod tests {
)
.await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
room.update(room_response!({}), vec![]);
// The queue is unmodified.
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
}
#[tokio::test]
@@ -590,14 +689,18 @@ mod tests {
)
.await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
room.update(
room_response!({
@@ -607,8 +710,12 @@ mod tests {
);
// The queue has been emptied.
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.timeline_queue.len(), 0);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
assert_eq!(timeline_queue.len(), 0);
}
}
#[tokio::test]
@@ -623,16 +730,20 @@ mod tests {
)
.await;
room.state = SlidingSyncRoomState::Preloaded;
room.set_state(SlidingSyncRoomState::Preloaded);
assert_eq!(room.state, SlidingSyncRoomState::Preloaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Preloaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
room.update(
room_response!({}),
@@ -643,14 +754,18 @@ mod tests {
);
// The queue is emptied, and new events are appended.
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x2:baz.org",
1 => "$x3:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x2:baz.org",
1 => "$x3:baz.org",
}
);
}
}
#[tokio::test]
@@ -665,14 +780,18 @@ mod tests {
)
.await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
room.update(
room_response!({}),
@@ -683,16 +802,20 @@ mod tests {
);
// New events are appended to the queue.
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.timeline_queue.len(), 4);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
2 => "$x2:baz.org",
3 => "$x3:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
assert_eq!(timeline_queue.len(), 4);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
2 => "$x2:baz.org",
3 => "$x3:baz.org",
}
);
}
}
#[tokio::test]
@@ -707,14 +830,19 @@ mod tests {
)
.await;
assert_eq!(room.state, SlidingSyncRoomState::NotLoaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x0:baz.org",
1 => "$x1:baz.org",
}
);
}
room.update(
room_response!({
@@ -727,14 +855,18 @@ mod tests {
);
// The queue is emptied, and new events are appended.
assert_eq!(room.state, SlidingSyncRoomState::Loaded);
assert_eq!(room.timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with room.timeline_queue {
0 => "$x2:baz.org",
1 => "$x3:baz.org",
}
);
{
let timeline_queue = room.timeline_queue();
assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
assert_eq!(timeline_queue.len(), 2);
assert_timeline_queue_event_ids!(
with timeline_queue {
0 => "$x2:baz.org",
1 => "$x3:baz.org",
}
);
}
}
#[test]