diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 0009f3ad6..556c0ec36 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -916,12 +916,6 @@ impl BaseClient { presence: response.presence.events, account_data: response.account_data.events, to_device, - device_lists: response.device_lists, - device_one_time_keys_count: response - .device_one_time_keys_count - .into_iter() - .map(|(k, v)| (k, v.into())) - .collect(), ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes }, notifications: changes.notifications, }; diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 0b30763c0..93e8d7490 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -21,7 +21,8 @@ use ruma::{ v3::{self, InvitedRoom, RoomSummary}, v4::{self, AccountData}, }, - events::AnySyncStateEvent, + events::{AnySyncStateEvent, AnyToDeviceEvent}, + serde::Raw, RoomId, }; use tracing::{debug, info, instrument, warn}; @@ -41,6 +42,57 @@ use crate::{ }; impl BaseClient { + #[cfg(feature = "e2e-encryption")] + /// Processes the E2EE-related events from the Sliding Sync response. + /// + /// In addition to writes to the crypto store, this may also write into the + /// state store, in particular it may write latest-events to the state + /// store. + pub async fn process_sliding_sync_e2ee( + &self, + extensions: &v4::Extensions, + ) -> Result>> { + if extensions.is_empty() { + return Ok(Default::default()); + } + + let v4::Extensions { to_device, e2ee, .. } = extensions; + + let to_device = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default(); + + info!( + to_device_events = to_device.len(), + device_one_time_keys_count = e2ee.device_one_time_keys_count.len(), + device_unused_fallback_key_types = + e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len()) + ); + + let mut changes = StateChanges::default(); + + // Process the to-device events and other related e2ee data. This returns a list + // of all the to-device events that were passed in but encrypted ones + // were replaced with their decrypted version. + // Passing in the default empty maps and vecs for this is completely fine, since + // the `OlmMachine` assumes empty maps/vecs mean no change in the one-time key + // counts. + let to_device = self + .preprocess_to_device_events( + to_device, + &e2ee.device_lists, + &e2ee.device_one_time_keys_count, + e2ee.device_unused_fallback_key_types.as_deref(), + &mut changes, + ) + .await?; + + debug!("ready to submit changes to store"); + self.store.save_changes(&changes).await?; + self.apply_changes(&changes).await; + debug!("applied changes"); + + Ok(to_device) + } + /// Process a response from a sliding sync call. /// /// # Arguments @@ -49,7 +101,6 @@ impl BaseClient { /// sync. #[instrument(skip_all, level = "trace")] pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { - #[allow(unused_variables)] let v4::Response { // FIXME not yet supported by sliding sync. see // https://github.com/matrix-org/matrix-rust-sdk/issues/1014 @@ -70,36 +121,10 @@ impl BaseClient { return Ok(SyncResponse::default()); }; - let v4::Extensions { to_device, e2ee, account_data, receipts, .. } = extensions; - - let to_device = to_device.as_ref().map(|v4| v4.events.clone()).unwrap_or_default(); - - info!( - to_device_events = to_device.len(), - device_one_time_keys_count = e2ee.device_one_time_keys_count.len(), - device_unused_fallback_key_types = - e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len()) - ); + let v4::Extensions { account_data, receipts, .. } = extensions; let mut changes = StateChanges::default(); - // Process the to-device events and other related e2ee data. This returns a list - // of all the to-device events that were passed in but encrypted ones - // were replaced with their decrypted version. - // Passing in the default empty maps and vecs for this is completely fine, since - // the `OlmMachine` assumes empty maps/vecs mean no change in the one-time key - // counts. - #[cfg(feature = "e2e-encryption")] - let to_device = self - .preprocess_to_device_events( - to_device, - &e2ee.device_lists, - &e2ee.device_one_time_keys_count, - e2ee.device_unused_fallback_key_types.as_deref(), - &mut changes, - ) - .await?; - let store = self.store.clone(); let mut ambiguity_cache = AmbiguityCache::new(store.inner.clone()); @@ -172,9 +197,6 @@ impl BaseClient { self.apply_changes(&changes).await; debug!("applied changes"); - let device_one_time_keys_count = - e2ee.device_one_time_keys_count.iter().map(|(k, v)| (k.clone(), (*v).into())).collect(); - Ok(SyncResponse { rooms: new_rooms, ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes }, @@ -182,9 +204,7 @@ impl BaseClient { // FIXME not yet supported by sliding sync. presence: Default::default(), account_data: account_data.global.clone(), - to_device, - device_lists: e2ee.device_lists.clone(), - device_one_time_keys_count, + to_device: Default::default(), }) } diff --git a/crates/matrix-sdk-base/src/sync.rs b/crates/matrix-sdk-base/src/sync.rs index 0e275fa3e..994733e75 100644 --- a/crates/matrix-sdk-base/src/sync.rs +++ b/crates/matrix-sdk-base/src/sync.rs @@ -21,7 +21,7 @@ use ruma::{ api::client::{ push::get_notifications::v3::Notification, sync::sync_events::{ - v3::InvitedRoom, DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, + v3::InvitedRoom, UnreadNotificationsCount as RumaUnreadNotificationsCount, }, }, events::{ @@ -29,7 +29,7 @@ use ruma::{ AnySyncEphemeralRoomEvent, AnySyncStateEvent, AnyToDeviceEvent, }, serde::Raw, - DeviceKeyAlgorithm, OwnedRoomId, + OwnedRoomId, }; use serde::{Deserialize, Serialize}; @@ -54,13 +54,6 @@ pub struct SyncResponse { pub account_data: Vec>, /// Messages sent directly between devices. pub to_device: Vec>, - /// Information on E2E device updates. - /// - /// Only present on an incremental sync. - pub device_lists: DeviceLists, - /// For each key algorithm, the number of unclaimed one-time keys - /// currently held on the server for a device. - pub device_one_time_keys_count: BTreeMap, /// Collection of ambiguity changes that room member events trigger. pub ambiguity_changes: AmbiguityChanges, /// New notifications per room. @@ -74,8 +67,6 @@ impl fmt::Debug for SyncResponse { .field("rooms", &self.rooms) .field("account_data", &DebugListOfRawEventsNoId(&self.account_data)) .field("to_device", &DebugListOfRawEventsNoId(&self.to_device)) - .field("device_lists", &self.device_lists) - .field("device_one_time_keys_count", &self.device_one_time_keys_count) .field("ambiguity_changes", &self.ambiguity_changes) .field("notifications", &DebugNotificationMap(&self.notifications)) .finish_non_exhaustive() diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 897cc8fb8..02f462a4a 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -537,7 +537,7 @@ mod test { // And the room is stored in the client so it can be extracted when needed let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.unwrap(); + client.process_sliding_sync(&response, vec![]).await.unwrap(); // When we construct a timeline event from it let timeline_item = EventTimelineItem::from_latest_event(&ss_room, event).await.unwrap(); diff --git a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs index bbfa1c0e0..a0a2f6ede 100644 --- a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs +++ b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs @@ -122,7 +122,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); room.timeline.push(event.event); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response).await.unwrap(); + client.process_sliding_sync(&response, vec![]).await.unwrap(); } fn message_event( diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 29b70a351..9dfb5b895 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -1,5 +1,5 @@ use matrix_sdk_base::sync::SyncResponse; -use ruma::api::client::sync::sync_events::v4; +use ruma::{api::client::sync::sync_events::v4, events::AnyToDeviceEvent, serde::Raw}; use tracing::{debug, instrument}; use super::{SlidingSync, SlidingSyncBuilder}; @@ -14,10 +14,28 @@ impl Client { Ok(SlidingSync::builder(id.into(), self.clone())?) } - /// Handle all the information provided in a sliding sync response + /// Handle all the e2ee information provided in a sliding sync response. + #[cfg(feature = "e2e-encryption")] + pub(crate) async fn process_sliding_sync_e2ee( + &self, + extensions: &v4::Extensions, + ) -> Result>> { + Ok(self.base_client().process_sliding_sync_e2ee(extensions).await?) + } + + /// Handle all the information provided in a sliding sync response, except + /// for the e2ee bits that are handled by `process_sliding_sync_e2ee` + /// (and which results can be passed as the second argument). #[instrument(skip(self, response))] - pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { - let response = self.base_client().process_sliding_sync(response).await?; + pub async fn process_sliding_sync( + &self, + response: &v4::Response, + to_device_events: Vec>, + ) -> Result { + let mut response = self.base_client().process_sliding_sync(response).await?; + + response.to_device.extend(to_device_events); + debug!("done processing on base_client"); self.handle_sync_response(&response).await?; diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index fed457a8c..d0882cb26 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -38,6 +38,7 @@ pub use client::*; pub use error::*; use futures_core::stream::Stream; pub use list::*; +use matrix_sdk_base::sync::SyncResponse; pub use room::*; use ruma::{ api::client::{ @@ -278,8 +279,30 @@ impl SlidingSync { // move to Sliding Sync, i.e. to `v4::Response`), but processing the // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. - let mut sync_response = - self.inner.client.process_sliding_sync(&sliding_sync_response).await?; + + let to_device_events = Vec::new(); + + #[cfg(feature = "e2e-encryption")] + let to_device_events = if self.is_e2ee_enabled() { + self.inner.client.process_sliding_sync_e2ee(&sliding_sync_response.extensions).await? + } else { + to_device_events + }; + + // Only handle the room's subsection of the response, if this sliding sync was configured + // to do so. That's because even when not requesting it, sometimes the current (2023-07-20) + // proxy will forward room events unrelated to the current connection's parameters. + // NOTE: SS proxy workaround. + let handle_room_response = { + !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty() + || !self.inner.lists.read().await.is_empty() + }; + + let mut sync_response = if handle_room_response { + self.inner.client.process_sliding_sync(&sliding_sync_response, to_device_events).await? + } else { + assign!(SyncResponse::default(), { to_device: to_device_events }) + }; debug!(?sync_response, "Sliding Sync response has been handled by the client"); @@ -464,6 +487,12 @@ impl SlidingSync { )) } + #[cfg(feature = "e2e-encryption")] + /// Is the e2ee extension enabled for this sliding sync instance? + fn is_e2ee_enabled(&self) -> bool { + self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) + } + #[instrument(skip_all, fields(pos))] async fn sync_once(&self) -> Result { let (request, request_config, requested_room_unsubscriptions) = @@ -486,7 +515,7 @@ impl SlidingSync { #[cfg(feature = "e2e-encryption")] let response = { - if self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) { + if self.is_e2ee_enabled() { debug!("Sliding Sync is sending the request along with outgoing E2EE requests"); // Here, we need to run 2 things: @@ -570,7 +599,7 @@ impl SlidingSync { // unsubscriptions buffer. However, it would be an error to empty it entirely as // more unsubscriptions could have been inserted during the request/response // dance. So let's cherry-pick which unsubscriptions to remove. - { + if !requested_room_unsubscriptions.is_empty() { let room_unsubscriptions = &mut *this.inner.room_unsubscriptions.write().unwrap(); room_unsubscriptions @@ -828,6 +857,7 @@ impl StickyData for SlidingSyncStickyParameters { /// correctly, so we cheat and "correct" it using heuristics here. /// TODO remove this workaround as soon as support of the `limited` flag is /// properly implemented in the open-source proxy: https://github.com/matrix-org/sliding-sync/issues/197 +// NOTE: SS proxy workaround. fn compute_limited( known_rooms: &BTreeMap, response_rooms: &mut BTreeMap, diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs index 964b183e8..f76342a89 100644 --- a/crates/matrix-sdk/src/sync.rs +++ b/crates/matrix-sdk/src/sync.rs @@ -31,11 +31,11 @@ use matrix_sdk_base::{ use ruma::{ api::client::{ push::get_notifications::v3::Notification, - sync::sync_events::{self, v3::InvitedRoom, DeviceLists}, + sync::sync_events::{self, v3::InvitedRoom}, }, events::{presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyToDeviceEvent}, serde::Raw, - DeviceKeyAlgorithm, OwnedRoomId, RoomId, + OwnedRoomId, RoomId, }; use tracing::{debug, error, warn}; @@ -55,13 +55,6 @@ pub struct SyncResponse { pub account_data: Vec>, /// Messages sent directly between devices. pub to_device: Vec>, - /// Information on E2E device updates. - /// - /// Only present on an incremental sync. - pub device_lists: DeviceLists, - /// For each key algorithm, the number of unclaimed one-time keys - /// currently held on the server for a device. - pub device_one_time_keys_count: BTreeMap, /// Collection of ambiguity changes that room member events trigger. pub ambiguity_changes: AmbiguityChanges, /// New notifications per room. @@ -75,8 +68,6 @@ impl SyncResponse { presence, account_data, to_device, - device_lists, - device_one_time_keys_count, ambiguity_changes, notifications, } = base_response; @@ -87,8 +78,6 @@ impl SyncResponse { presence, account_data, to_device, - device_lists, - device_one_time_keys_count, ambiguity_changes, notifications, } @@ -102,8 +91,6 @@ impl fmt::Debug for SyncResponse { .field("rooms", &self.rooms) .field("account_data", &DebugListOfRawEventsNoId(&self.account_data)) .field("to_device", &DebugListOfRawEventsNoId(&self.to_device)) - .field("device_lists", &self.device_lists) - .field("device_one_time_keys_count", &self.device_one_time_keys_count) .field("ambiguity_changes", &self.ambiguity_changes) .field("notifications", &DebugNotificationMap(&self.notifications)) .finish_non_exhaustive() @@ -173,8 +160,6 @@ impl Client { presence, account_data, to_device, - device_lists: _, - device_one_time_keys_count: _, ambiguity_changes: _, notifications, } = response;