From c69d28ef7904bfe93afecdce7497ed5550957b42 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 23 Feb 2023 15:06:09 +0100 Subject: [PATCH] feat(sdk): Remove clones of large responses in Sliding Sync. I admit this patch is quite tricky. Please try to follow me. So, first off, in `SlidingSyncRoom::new`, we were clearing the timeline, because somehow it exists twice in memory at this step. Which led me to understand how `SlidingSync::handle_response` was working. I've clarified how this part of the code works. We are dealing with 2 kind of responses for a specific reason: `SyncResponse` and `v4::Response`, now it's documented and I hope it's clearer. Then, I notice that we were passing a clone of the entire sliding sync response (`v4::Response`) to `Client::process_sliding_sync`. I thought it was suboptimal, so I've updated the code to take a reference. It led me to update `BaseClient::process_sliding_sync`. It was a little bit tricky, but I reckon we have less clones now than before. And now, back to `SlidingSync::handle_response`, I was able to compute the `timeline` correctly by draining it from the `v4::Response`, or by moving it from `SyncResponse`. So it's no longer necessary to have this clearing code inside `SlidingSyncRoom::new`. Honestly it has nothing to do at this place before. To conclude: We have cleaner code, and less clones. What thing I reckon could be optimized, is that the entire `timeline` (`Vec`) is cloned to be passed to `Client::handle_timeline`. So this timeline exists in 2 places: in Sliding Sync, and somewhere else. I don't believe it's a problem now, that's how it works, but we must be aware of that. --- crates/matrix-sdk-base/src/sliding_sync.rs | 37 +++++++++------ crates/matrix-sdk/src/sliding_sync/client.rs | 3 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 47 ++++++++++++++------ crates/matrix-sdk/src/sliding_sync/room.rs | 3 -- 4 files changed, 59 insertions(+), 31 deletions(-) diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 4a7dde386..23b36dd06 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -26,7 +26,7 @@ impl BaseClient { /// * `response` - The response that we received after a successful sliding /// sync. #[instrument(skip_all, level = "trace")] - pub async fn process_sliding_sync(&self, response: v4::Response) -> Result { + pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { #[allow(unused_variables)] let v4::Response { // FIXME not yet supported by sliding sync. see @@ -39,6 +39,7 @@ impl BaseClient { //presence, .. } = response; + info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty()); if rooms.is_empty() && extensions.is_empty() { @@ -49,21 +50,31 @@ impl BaseClient { let v4::Extensions { to_device, e2ee, account_data, .. } = extensions; - let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default(); + let to_device_events = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default(); // Destructure the single `None` of the E2EE extension into separate objects - // since that's what the OlmMachine API expects. Passing in the default - // empty maps and vecs for this is completely fine, since the OlmMachine + // since that's what the `OlmMachine` API expects. Passing in the default + // empty maps and vecs for this is completely fine, since the `OlmMachine` // assumes empty maps/vecs mean no change in the one-time key counts. + + // We declare default values that can be referenced hereinbelow. When we try to + // extract values from `e2ee`, that would be unfortunate to clone the + // value just to pass them (to remove them `e2ee`) as a reference later. + let device_one_time_keys_count = Default::default(); + let device_unused_fallback_key_types = Default::default(); + let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee + .as_ref() .map(|e2ee| { ( - e2ee.device_lists, - e2ee.device_one_time_keys_count, - e2ee.device_unused_fallback_key_types, + e2ee.device_lists.clone(), + &e2ee.device_one_time_keys_count, + &e2ee.device_unused_fallback_key_types, ) }) - .unwrap_or_default(); + .unwrap_or_else(|| { + (Default::default(), &device_one_time_keys_count, &device_unused_fallback_key_types) + }); info!( to_device_events = to_device_events.len(), @@ -153,7 +164,7 @@ impl BaseClient { // } let room_account_data = if let Some(inner_account_data) = &account_data { - if let Some(events) = inner_account_data.rooms.get(&room_id) { + if let Some(events) = inner_account_data.rooms.get(&*room_id) { self.handle_room_account_data(&room_id, events, &mut changes).await; Some(events.to_vec()) } else { @@ -171,8 +182,8 @@ impl BaseClient { .handle_timeline( &room, room_data.limited, - room_data.timeline, - room_data.prev_batch, + room_data.timeline.clone(), + room_data.prev_batch.clone(), &push_rules, &mut user_ids, &mut room_info, @@ -246,7 +257,7 @@ impl BaseClient { debug!("applied changes"); let device_one_time_keys_count = - device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect(); + device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect(); Ok(SyncResponse { rooms: new_rooms, @@ -254,7 +265,7 @@ impl BaseClient { notifications: changes.notifications, // FIXME not yet supported by sliding sync. presence: Default::default(), - account_data: account_data.map(|a| a.global).unwrap_or_default(), + account_data: account_data.as_ref().map(|a| a.global.clone()).unwrap_or_default(), to_device_events, device_lists, device_one_time_keys_count, diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 6c5ce103a..68bb3ce63 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -14,11 +14,12 @@ impl Client { #[instrument(skip(self, response))] pub(crate) async fn process_sliding_sync( &self, - response: v4::Response, + response: &v4::Response, ) -> Result { let response = self.base_client().process_sliding_sync(response).await?; debug!("done processing on base_client"); self.handle_sync_response(&response).await?; + Ok(response) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index a60361367..9a242955d 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -975,28 +975,45 @@ impl SlidingSync { self.rooms.read().unwrap().values().cloned().collect() } + /// Handle the HTTP response. + /// + /// But which response? `v4::Response`, aka the Sliding Sync response, or + /// `SyncResponse` which still relies on `v3`? + /// Well that's tricky. We have both here, because this Sliding Sync + /// implementation is still experimental, and we didn't want to be too + /// invasive. Thus, `SyncResponse` doesn't support Sliding Sync yet. Hence + /// the fact this method handles both at the same time. It's not super + /// annoying but it was important to clarify that. #[instrument(skip_all, fields(views = views.len()))] async fn handle_response( &self, - resp: v4::Response, + sliding_sync_response: v4::Response, extensions: Option, views: &mut BTreeMap, ) -> Result { - let mut processed = self.client.process_sliding_sync(resp.clone()).await?; - debug!("main client processed."); - Observable::set(&mut self.pos.write().unwrap(), Some(resp.pos)); - Observable::set(&mut self.delta_token.write().unwrap(), resp.delta_token); + // We may not need the `sync_response` in the future (once `SyncResponse` will + // move to Sliding Sync, i.e. to `v4::Response`), but processing the + // `sliding_sync_response` is vital, so it must be done somewhere; for now it + // happens here. + let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?; - let update = { + debug!("sliding sync response has been processed"); + + Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); + Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); + + let update_summary = { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.write().unwrap(); - for (id, mut room_data) in resp.rooms.into_iter() { - let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) { + + for (id, mut room_data) in sliding_sync_response.rooms.into_iter() { + // `sync_response` contains the rooms with decrypted events if any, so look at + // the timeline events here first if the room exists. + // Otherwise, let's look at the timeline inside the `sliding_sync_response`. + let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) { joined_room.timeline.events } else { - let events = room_data.timeline.into_iter().map(Into::into).collect(); - room_data.timeline = vec![]; - events + room_data.timeline.drain(..).map(Into::into).collect() }; if let Some(mut room) = rooms_map.remove(&id) { @@ -1018,7 +1035,7 @@ impl SlidingSync { let mut updated_views = Vec::new(); - for (name, updates) in resp.lists { + for (name, updates) in sliding_sync_response.lists { let Some(generator) = views.get_mut(&name) else { error!("Response for view {name} - unknown to us. skipping"); continue @@ -1031,7 +1048,9 @@ impl SlidingSync { } // Update the `to-device` next-batch if found. - if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) { + if let Some(to_device_since) = + sliding_sync_response.extensions.to_device.map(|t| t.next_batch) + { self.update_to_device_since(to_device_since) } @@ -1046,7 +1065,7 @@ impl SlidingSync { self.cache_to_storage().await?; - Ok(update) + Ok(update_summary) } async fn sync_once( diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index f32141272..402d48cec 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -44,9 +44,6 @@ impl SlidingSyncRoom { mut inner: v4::SlidingSyncRoom, timeline: Vec, ) -> Self { - // we overwrite to only keep one copy - inner.timeline = vec![]; - let mut timeline_queue = ObservableVector::new(); timeline_queue.append(timeline.into_iter().collect());