From c69d28ef7904bfe93afecdce7497ed5550957b42 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 15:06:09 +0100 Subject: [PATCH 1/6] feat(sdk): Remove clones of large responses in Sliding Sync. I admit this patch is quite tricky. Please try to follow me. So, first off, in `SlidingSyncRoom::new`, we were clearing the timeline, because somehow it exists twice in memory at this step. Which led me to understand how `SlidingSync::handle_response` was working. I've clarified how this part of the code works. We are dealing with 2 kind of responses for a specific reason: `SyncResponse` and `v4::Response`, now it's documented and I hope it's clearer. Then, I notice that we were passing a clone of the entire sliding sync response (`v4::Response`) to `Client::process_sliding_sync`. I thought it was suboptimal, so I've updated the code to take a reference. It led me to update `BaseClient::process_sliding_sync`. It was a little bit tricky, but I reckon we have less clones now than before. And now, back to `SlidingSync::handle_response`, I was able to compute the `timeline` correctly by draining it from the `v4::Response`, or by moving it from `SyncResponse`. So it's no longer necessary to have this clearing code inside `SlidingSyncRoom::new`. Honestly it has nothing to do at this place before. To conclude: We have cleaner code, and less clones. What thing I reckon could be optimized, is that the entire `timeline` (`Vec`) is cloned to be passed to `Client::handle_timeline`. So this timeline exists in 2 places: in Sliding Sync, and somewhere else. I don't believe it's a problem now, that's how it works, but we must be aware of that. --- crates/matrix-sdk-base/src/sliding_sync.rs | 37 +++++++++------ crates/matrix-sdk/src/sliding_sync/client.rs | 3 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 47 ++++++++++++++------ crates/matrix-sdk/src/sliding_sync/room.rs | 3 -- 4 files changed, 59 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 4a7dde386..23b36dd06 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -26,7 +26,7 @@ impl BaseClient { /// * `response` - The response that we received after a successful sliding /// sync. #[instrument(skip_all, level = "trace")] - pub async fn process_sliding_sync(&self, response: v4::Response) -> Result { + pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { #[allow(unused_variables)] let v4::Response { // FIXME not yet supported by sliding sync. see @@ -39,6 +39,7 @@ impl BaseClient { //presence, .. } = response; + info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty()); if rooms.is_empty() && extensions.is_empty() { @@ -49,21 +50,31 @@ impl BaseClient { let v4::Extensions { to_device, e2ee, account_data, .. } = extensions; - let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default(); + let to_device_events = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default(); // Destructure the single `None` of the E2EE extension into separate objects - // since that's what the OlmMachine API expects. Passing in the default - // empty maps and vecs for this is completely fine, since the OlmMachine + // since that's what the `OlmMachine` API expects. Passing in the default + // empty maps and vecs for this is completely fine, since the `OlmMachine` // assumes empty maps/vecs mean no change in the one-time key counts. + + // We declare default values that can be referenced hereinbelow. When we try to + // extract values from `e2ee`, that would be unfortunate to clone the + // value just to pass them (to remove them `e2ee`) as a reference later. + let device_one_time_keys_count = Default::default(); + let device_unused_fallback_key_types = Default::default(); + let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee + .as_ref() .map(|e2ee| { ( - e2ee.device_lists, - e2ee.device_one_time_keys_count, - e2ee.device_unused_fallback_key_types, + e2ee.device_lists.clone(), + &e2ee.device_one_time_keys_count, + &e2ee.device_unused_fallback_key_types, ) }) - .unwrap_or_default(); + .unwrap_or_else(|| { + (Default::default(), &device_one_time_keys_count, &device_unused_fallback_key_types) + }); info!( to_device_events = to_device_events.len(), @@ -153,7 +164,7 @@ impl BaseClient { // } let room_account_data = if let Some(inner_account_data) = &account_data { - if let Some(events) = inner_account_data.rooms.get(&room_id) { + if let Some(events) = inner_account_data.rooms.get(&*room_id) { self.handle_room_account_data(&room_id, events, &mut changes).await; Some(events.to_vec()) } else { @@ -171,8 +182,8 @@ impl BaseClient { .handle_timeline( &room, room_data.limited, - room_data.timeline, - room_data.prev_batch, + room_data.timeline.clone(), + room_data.prev_batch.clone(), &push_rules, &mut user_ids, &mut room_info, @@ -246,7 +257,7 @@ impl BaseClient { debug!("applied changes"); let device_one_time_keys_count = - device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect(); + device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect(); Ok(SyncResponse { rooms: new_rooms, @@ -254,7 +265,7 @@ impl BaseClient { notifications: changes.notifications, // FIXME not yet supported by sliding sync. presence: Default::default(), - account_data: account_data.map(|a| a.global).unwrap_or_default(), + account_data: account_data.as_ref().map(|a| a.global.clone()).unwrap_or_default(), to_device_events, device_lists, device_one_time_keys_count, diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 6c5ce103a..68bb3ce63 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -14,11 +14,12 @@ impl Client { #[instrument(skip(self, response))] pub(crate) async fn process_sliding_sync( &self, - response: v4::Response, + response: &v4::Response, ) -> Result { let response = self.base_client().process_sliding_sync(response).await?; debug!("done processing on base_client"); self.handle_sync_response(&response).await?; + Ok(response) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index a60361367..9a242955d 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -975,28 +975,45 @@ impl SlidingSync { self.rooms.read().unwrap().values().cloned().collect() } + /// Handle the HTTP response. + /// + /// But which response? `v4::Response`, aka the Sliding Sync response, or + /// `SyncResponse` which still relies on `v3`? + /// Well that's tricky. We have both here, because this Sliding Sync + /// implementation is still experimental, and we didn't want to be too + /// invasive. Thus, `SyncResponse` doesn't support Sliding Sync yet. Hence + /// the fact this method handles both at the same time. It's not super + /// annoying but it was important to clarify that. #[instrument(skip_all, fields(views = views.len()))] async fn handle_response( &self, - resp: v4::Response, + sliding_sync_response: v4::Response, extensions: Option, views: &mut BTreeMap, ) -> Result { - let mut processed = self.client.process_sliding_sync(resp.clone()).await?; - debug!("main client processed."); - Observable::set(&mut self.pos.write().unwrap(), Some(resp.pos)); - Observable::set(&mut self.delta_token.write().unwrap(), resp.delta_token); + // We may not need the `sync_response` in the future (once `SyncResponse` will + // move to Sliding Sync, i.e. to `v4::Response`), but processing the + // `sliding_sync_response` is vital, so it must be done somewhere; for now it + // happens here. + let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?; - let update = { + debug!("sliding sync response has been processed"); + + Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); + Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); + + let update_summary = { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.write().unwrap(); - for (id, mut room_data) in resp.rooms.into_iter() { - let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) { + + for (id, mut room_data) in sliding_sync_response.rooms.into_iter() { + // `sync_response` contains the rooms with decrypted events if any, so look at + // the timeline events here first if the room exists. + // Otherwise, let's look at the timeline inside the `sliding_sync_response`. + let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) { joined_room.timeline.events } else { - let events = room_data.timeline.into_iter().map(Into::into).collect(); - room_data.timeline = vec![]; - events + room_data.timeline.drain(..).map(Into::into).collect() }; if let Some(mut room) = rooms_map.remove(&id) { @@ -1018,7 +1035,7 @@ impl SlidingSync { let mut updated_views = Vec::new(); - for (name, updates) in resp.lists { + for (name, updates) in sliding_sync_response.lists { let Some(generator) = views.get_mut(&name) else { error!("Response for view {name} - unknown to us. skipping"); continue @@ -1031,7 +1048,9 @@ impl SlidingSync { } // Update the `to-device` next-batch if found. - if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) { + if let Some(to_device_since) = + sliding_sync_response.extensions.to_device.map(|t| t.next_batch) + { self.update_to_device_since(to_device_since) } @@ -1046,7 +1065,7 @@ impl SlidingSync { self.cache_to_storage().await?; - Ok(update) + Ok(update_summary) } async fn sync_once( diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index f32141272..402d48cec 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -44,9 +44,6 @@ impl SlidingSyncRoom { mut inner: v4::SlidingSyncRoom, timeline: Vec, ) -> Self { - // we overwrite to only keep one copy - inner.timeline = vec![]; - let mut timeline_queue = ObservableVector::new(); timeline_queue.append(timeline.into_iter().collect()); From 54654aa0c77b949ce7c3ada04c55c52a318ca126 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:03:11 +0100 Subject: [PATCH 2/6] chore(sdk): Simplify references, thanks Clippy. --- crates/matrix-sdk-base/src/sliding_sync.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 23b36dd06..99cdae04c 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -91,7 +91,7 @@ impl BaseClient { self.preprocess_to_device_events( to_device_events, &device_lists, - &device_one_time_keys_count, + device_one_time_keys_count, device_unused_fallback_key_types.as_deref(), ) .await? @@ -109,14 +109,14 @@ impl BaseClient { let mut new_rooms = Rooms::default(); - for (room_id, room_data) in rooms.into_iter() { + for (room_id, room_data) in rooms { if !room_data.invite_state.is_empty() { let invite_states = &room_data.invite_state; - let room = store.get_or_create_stripped_room(&room_id).await; + let room = store.get_or_create_stripped_room(room_id).await; let mut room_info = room.clone_info(); room_info.mark_state_partially_synced(); - if let Some(r) = store.get_room(&room_id) { + if let Some(r) = store.get_room(room_id) { let mut room_info = r.clone_info(); room_info.mark_as_invited(); // FIXME: this might not be accurate room_info.mark_state_partially_synced(); @@ -130,7 +130,7 @@ impl BaseClient { v3::InvitedRoom::from(v3::InviteState::from(invite_states.clone())), ); } else { - let room = store.get_or_create_room(&room_id, RoomType::Joined).await; + let room = store.get_or_create_room(room_id, RoomType::Joined).await; let mut room_info = room.clone_info(); room_info.mark_as_joined(); // FIXME: this might not be accurate room_info.mark_state_partially_synced(); @@ -164,8 +164,8 @@ impl BaseClient { // } let room_account_data = if let Some(inner_account_data) = &account_data { - if let Some(events) = inner_account_data.rooms.get(&*room_id) { - self.handle_room_account_data(&room_id, events, &mut changes).await; + if let Some(events) = inner_account_data.rooms.get(room_id) { + self.handle_room_account_data(room_id, events, &mut changes).await; Some(events.to_vec()) } else { None @@ -199,8 +199,8 @@ impl BaseClient { // The room turned on encryption in this sync, we need // to also get all the existing users and mark them for // tracking. - let joined = store.get_joined_user_ids(&room_id).await?; - let invited = store.get_invited_user_ids(&room_id).await?; + let joined = store.get_joined_user_ids(room_id).await?; + let invited = store.get_invited_user_ids(room_id).await?; let user_ids: Vec<&UserId> = joined.iter().chain(&invited).map(Deref::deref).collect(); From af6cd85cf43aba75dc618016fc09a16d79dfff3d Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:27:19 +0100 Subject: [PATCH 3/6] chore(sdk): `SlidingSyncRoom::new` no longer needs a mutable `inner`. --- crates/matrix-sdk/src/sliding_sync/room.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index 402d48cec..3a1c89147 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -41,7 +41,7 @@ impl SlidingSyncRoom { pub(super) fn new( client: Client, room_id: OwnedRoomId, - mut inner: v4::SlidingSyncRoom, + inner: v4::SlidingSyncRoom, timeline: Vec, ) -> Self { let mut timeline_queue = ObservableVector::new(); From 3c44f87bee622ad892ee65553eb7b18782e2af36 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:39:39 +0100 Subject: [PATCH 4/6] doc(sdk): Simplify one documentation. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 9a242955d..3292ce7ea 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -978,12 +978,8 @@ impl SlidingSync { /// Handle the HTTP response. /// /// But which response? `v4::Response`, aka the Sliding Sync response, or - /// `SyncResponse` which still relies on `v3`? - /// Well that's tricky. We have both here, because this Sliding Sync - /// implementation is still experimental, and we didn't want to be too - /// invasive. Thus, `SyncResponse` doesn't support Sliding Sync yet. Hence - /// the fact this method handles both at the same time. It's not super - /// annoying but it was important to clarify that. + /// `SyncResponse`? We have both because `SyncResponse` doesn't support + /// Sliding Sync yet. #[instrument(skip_all, fields(views = views.len()))] async fn handle_response( &self, From 3b01e4f9a6ab8cd9d54a7acec3ca53c65c6a2729 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 16:39:56 +0100 Subject: [PATCH 5/6] chore(sdk): Use `T::default` instead of `Default::default`. --- crates/matrix-sdk-base/src/sliding_sync.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 99cdae04c..5eae74639 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -1,12 +1,16 @@ +use std::collections::BTreeMap; #[cfg(feature = "e2e-encryption")] use std::ops::Deref; -use ruma::api::client::sync::sync_events::{ - v3::{self, Ephemeral}, - v4, -}; #[cfg(feature = "e2e-encryption")] use ruma::UserId; +use ruma::{ + api::client::sync::sync_events::{ + v3::{self, Ephemeral}, + v4, DeviceLists, + }, + DeviceKeyAlgorithm, UInt, +}; use tracing::{debug, info, instrument}; use super::BaseClient; @@ -60,8 +64,8 @@ impl BaseClient { // We declare default values that can be referenced hereinbelow. When we try to // extract values from `e2ee`, that would be unfortunate to clone the // value just to pass them (to remove them `e2ee`) as a reference later. - let device_one_time_keys_count = Default::default(); - let device_unused_fallback_key_types = Default::default(); + let device_one_time_keys_count = BTreeMap::::default(); + let device_unused_fallback_key_types = None; let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee .as_ref() @@ -73,7 +77,11 @@ impl BaseClient { ) }) .unwrap_or_else(|| { - (Default::default(), &device_one_time_keys_count, &device_unused_fallback_key_types) + ( + DeviceLists::default(), + &device_one_time_keys_count, + &device_unused_fallback_key_types, + ) }); info!( From 70380a6ee4622408734c78681182de4f21bc1158 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 27 Feb 2023 09:30:50 +0100 Subject: [PATCH 6/6] doc(sdk): Improve documentation of `SlidingSync::handle_response`. --- crates/matrix-sdk/src/sliding_sync/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 3292ce7ea..5f85b72f5 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -976,10 +976,6 @@ impl SlidingSync { } /// Handle the HTTP response. - /// - /// But which response? `v4::Response`, aka the Sliding Sync response, or - /// `SyncResponse`? We have both because `SyncResponse` doesn't support - /// Sliding Sync yet. #[instrument(skip_all, fields(views = views.len()))] async fn handle_response( &self, @@ -987,6 +983,8 @@ impl SlidingSync { extensions: Option, views: &mut BTreeMap, ) -> Result { + // Handle and transform a Sliding Sync Response to a `SyncResponse`. + // // We may not need the `sync_response` in the future (once `SyncResponse` will // move to Sliding Sync, i.e. to `v4::Response`), but processing the // `sliding_sync_response` is vital, so it must be done somewhere; for now it