diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 4a7dde386..5eae74639 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -1,12 +1,16 @@ +use std::collections::BTreeMap; #[cfg(feature = "e2e-encryption")] use std::ops::Deref; -use ruma::api::client::sync::sync_events::{ - v3::{self, Ephemeral}, - v4, -}; #[cfg(feature = "e2e-encryption")] use ruma::UserId; +use ruma::{ + api::client::sync::sync_events::{ + v3::{self, Ephemeral}, + v4, DeviceLists, + }, + DeviceKeyAlgorithm, UInt, +}; use tracing::{debug, info, instrument}; use super::BaseClient; @@ -26,7 +30,7 @@ impl BaseClient { /// * `response` - The response that we received after a successful sliding /// sync. #[instrument(skip_all, level = "trace")] - pub async fn process_sliding_sync(&self, response: v4::Response) -> Result { + pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { #[allow(unused_variables)] let v4::Response { // FIXME not yet supported by sliding sync. see @@ -39,6 +43,7 @@ impl BaseClient { //presence, .. } = response; + info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty()); if rooms.is_empty() && extensions.is_empty() { @@ -49,21 +54,35 @@ impl BaseClient { let v4::Extensions { to_device, e2ee, account_data, .. } = extensions; - let to_device_events = to_device.map(|v4| v4.events).unwrap_or_default(); + let to_device_events = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default(); // Destructure the single `None` of the E2EE extension into separate objects - // since that's what the OlmMachine API expects. Passing in the default - // empty maps and vecs for this is completely fine, since the OlmMachine + // since that's what the `OlmMachine` API expects. Passing in the default + // empty maps and vecs for this is completely fine, since the `OlmMachine` // assumes empty maps/vecs mean no change in the one-time key counts. + + // We declare default values that can be referenced hereinbelow. When we try to + // extract values from `e2ee`, that would be unfortunate to clone the + // value just to pass them (to remove them `e2ee`) as a reference later. + let device_one_time_keys_count = BTreeMap::::default(); + let device_unused_fallback_key_types = None; + let (device_lists, device_one_time_keys_count, device_unused_fallback_key_types) = e2ee + .as_ref() .map(|e2ee| { ( - e2ee.device_lists, - e2ee.device_one_time_keys_count, - e2ee.device_unused_fallback_key_types, + e2ee.device_lists.clone(), + &e2ee.device_one_time_keys_count, + &e2ee.device_unused_fallback_key_types, ) }) - .unwrap_or_default(); + .unwrap_or_else(|| { + ( + DeviceLists::default(), + &device_one_time_keys_count, + &device_unused_fallback_key_types, + ) + }); info!( to_device_events = to_device_events.len(), @@ -80,7 +99,7 @@ impl BaseClient { self.preprocess_to_device_events( to_device_events, &device_lists, - &device_one_time_keys_count, + device_one_time_keys_count, device_unused_fallback_key_types.as_deref(), ) .await? @@ -98,14 +117,14 @@ impl BaseClient { let mut new_rooms = Rooms::default(); - for (room_id, room_data) in rooms.into_iter() { + for (room_id, room_data) in rooms { if !room_data.invite_state.is_empty() { let invite_states = &room_data.invite_state; - let room = store.get_or_create_stripped_room(&room_id).await; + let room = store.get_or_create_stripped_room(room_id).await; let mut room_info = room.clone_info(); room_info.mark_state_partially_synced(); - if let Some(r) = store.get_room(&room_id) { + if let Some(r) = store.get_room(room_id) { let mut room_info = r.clone_info(); room_info.mark_as_invited(); // FIXME: this might not be accurate room_info.mark_state_partially_synced(); @@ -119,7 +138,7 @@ impl BaseClient { v3::InvitedRoom::from(v3::InviteState::from(invite_states.clone())), ); } else { - let room = store.get_or_create_room(&room_id, RoomType::Joined).await; + let room = store.get_or_create_room(room_id, RoomType::Joined).await; let mut room_info = room.clone_info(); room_info.mark_as_joined(); // FIXME: this might not be accurate room_info.mark_state_partially_synced(); @@ -153,8 +172,8 @@ impl BaseClient { // } let room_account_data = if let Some(inner_account_data) = &account_data { - if let Some(events) = inner_account_data.rooms.get(&room_id) { - self.handle_room_account_data(&room_id, events, &mut changes).await; + if let Some(events) = inner_account_data.rooms.get(room_id) { + self.handle_room_account_data(room_id, events, &mut changes).await; Some(events.to_vec()) } else { None @@ -171,8 +190,8 @@ impl BaseClient { .handle_timeline( &room, room_data.limited, - room_data.timeline, - room_data.prev_batch, + room_data.timeline.clone(), + room_data.prev_batch.clone(), &push_rules, &mut user_ids, &mut room_info, @@ -188,8 +207,8 @@ impl BaseClient { // The room turned on encryption in this sync, we need // to also get all the existing users and mark them for // tracking. - let joined = store.get_joined_user_ids(&room_id).await?; - let invited = store.get_invited_user_ids(&room_id).await?; + let joined = store.get_joined_user_ids(room_id).await?; + let invited = store.get_invited_user_ids(room_id).await?; let user_ids: Vec<&UserId> = joined.iter().chain(&invited).map(Deref::deref).collect(); @@ -246,7 +265,7 @@ impl BaseClient { debug!("applied changes"); let device_one_time_keys_count = - device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect(); + device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect(); Ok(SyncResponse { rooms: new_rooms, @@ -254,7 +273,7 @@ impl BaseClient { notifications: changes.notifications, // FIXME not yet supported by sliding sync. presence: Default::default(), - account_data: account_data.map(|a| a.global).unwrap_or_default(), + account_data: account_data.as_ref().map(|a| a.global.clone()).unwrap_or_default(), to_device_events, device_lists, device_one_time_keys_count, diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 6c5ce103a..68bb3ce63 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -14,11 +14,12 @@ impl Client { #[instrument(skip(self, response))] pub(crate) async fn process_sliding_sync( &self, - response: v4::Response, + response: &v4::Response, ) -> Result { let response = self.base_client().process_sliding_sync(response).await?; debug!("done processing on base_client"); self.handle_sync_response(&response).await?; + Ok(response) } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index a60361367..5f85b72f5 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -975,28 +975,39 @@ impl SlidingSync { self.rooms.read().unwrap().values().cloned().collect() } + /// Handle the HTTP response. #[instrument(skip_all, fields(views = views.len()))] async fn handle_response( &self, - resp: v4::Response, + sliding_sync_response: v4::Response, extensions: Option, views: &mut BTreeMap, ) -> Result { - let mut processed = self.client.process_sliding_sync(resp.clone()).await?; - debug!("main client processed."); - Observable::set(&mut self.pos.write().unwrap(), Some(resp.pos)); - Observable::set(&mut self.delta_token.write().unwrap(), resp.delta_token); + // Handle and transform a Sliding Sync Response to a `SyncResponse`. + // + // We may not need the `sync_response` in the future (once `SyncResponse` will + // move to Sliding Sync, i.e. to `v4::Response`), but processing the + // `sliding_sync_response` is vital, so it must be done somewhere; for now it + // happens here. + let mut sync_response = self.client.process_sliding_sync(&sliding_sync_response).await?; - let update = { + debug!("sliding sync response has been processed"); + + Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos)); + Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token); + + let update_summary = { let mut rooms = Vec::new(); let mut rooms_map = self.rooms.write().unwrap(); - for (id, mut room_data) in resp.rooms.into_iter() { - let timeline = if let Some(joined_room) = processed.rooms.join.remove(&id) { + + for (id, mut room_data) in sliding_sync_response.rooms.into_iter() { + // `sync_response` contains the rooms with decrypted events if any, so look at + // the timeline events here first if the room exists. + // Otherwise, let's look at the timeline inside the `sliding_sync_response`. + let timeline = if let Some(joined_room) = sync_response.rooms.join.remove(&id) { joined_room.timeline.events } else { - let events = room_data.timeline.into_iter().map(Into::into).collect(); - room_data.timeline = vec![]; - events + room_data.timeline.drain(..).map(Into::into).collect() }; if let Some(mut room) = rooms_map.remove(&id) { @@ -1018,7 +1029,7 @@ impl SlidingSync { let mut updated_views = Vec::new(); - for (name, updates) in resp.lists { + for (name, updates) in sliding_sync_response.lists { let Some(generator) = views.get_mut(&name) else { error!("Response for view {name} - unknown to us. skipping"); continue @@ -1031,7 +1042,9 @@ impl SlidingSync { } // Update the `to-device` next-batch if found. - if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) { + if let Some(to_device_since) = + sliding_sync_response.extensions.to_device.map(|t| t.next_batch) + { self.update_to_device_since(to_device_since) } @@ -1046,7 +1059,7 @@ impl SlidingSync { self.cache_to_storage().await?; - Ok(update) + Ok(update_summary) } async fn sync_once( diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs index f32141272..3a1c89147 100644 --- a/crates/matrix-sdk/src/sliding_sync/room.rs +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -41,12 +41,9 @@ impl SlidingSyncRoom { pub(super) fn new( client: Client, room_id: OwnedRoomId, - mut inner: v4::SlidingSyncRoom, + inner: v4::SlidingSyncRoom, timeline: Vec, ) -> Self { - // we overwrite to only keep one copy - inner.timeline = vec![]; - let mut timeline_queue = ObservableVector::new(); timeline_queue.append(timeline.into_iter().collect());