From 020d5aa29201760aecac83065e1209c0f4a67a7d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 24 Jul 2023 15:41:28 +0200 Subject: [PATCH] fix: use a little state machine to handle the response and that allows to make sure that all event handlers are correctly called for all events contained in the response. --- crates/matrix-sdk-base/Cargo.toml | 2 +- crates/matrix-sdk-base/src/rooms/normal.rs | 2 +- crates/matrix-sdk-base/src/sliding_sync.rs | 12 ++-- crates/matrix-sdk-crypto/src/machine.rs | 2 +- crates/matrix-sdk-indexeddb/Cargo.toml | 1 + .../src/timeline/event_item/mod.rs | 2 +- .../src/timeline/sliding_sync_ext.rs | 2 +- crates/matrix-sdk/Cargo.toml | 2 +- crates/matrix-sdk/src/encryption/mod.rs | 2 +- crates/matrix-sdk/src/sliding_sync/client.rs | 69 ++++++++++++++----- crates/matrix-sdk/src/sliding_sync/mod.rs | 40 ++++++----- xtask/src/ci.rs | 2 +- 12 files changed, 88 insertions(+), 50 deletions(-) diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 1696553ba..86c4b0f3b 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -25,7 +25,7 @@ message-ids = ["matrix-sdk-crypto?/message-ids"] experimental-sliding-sync = ["ruma/unstable-msc3575"] # helpers for testing features build upon this -testing = ["dep:http", "dep:matrix-sdk-test", "dep:assert_matches"] +testing = ["dep:http", "dep:matrix-sdk-test", "dep:assert_matches", "matrix-sdk-crypto?/testing"] [dependencies] assert_matches = { workspace = true, optional = true } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 54b0cf13d..8e4216013 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -383,7 +383,7 @@ impl Room { } /// Update the last event in the room - #[cfg(feature = "experimental-sliding-sync")] + #[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))] pub(crate) fn set_latest_event(&self, latest_event: Option) { self.inner.write().unwrap().latest_event = latest_event; } diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 93e8d7490..28422144c 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -15,17 +15,21 @@ #[cfg(feature = "e2e-encryption")] use std::ops::Deref; +#[cfg(feature = "e2e-encryption")] use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; use ruma::{ api::client::sync::sync_events::{ v3::{self, InvitedRoom, RoomSummary}, v4::{self, AccountData}, }, - events::{AnySyncStateEvent, AnyToDeviceEvent}, - serde::Raw, + events::AnySyncStateEvent, RoomId, }; -use tracing::{debug, info, instrument, warn}; +#[cfg(feature = "e2e-encryption")] +use ruma::{events::AnyToDeviceEvent, serde::Raw}; +#[cfg(feature = "e2e-encryption")] +use tracing::warn; +use tracing::{debug, info, instrument}; use super::BaseClient; #[cfg(feature = "e2e-encryption")] @@ -432,7 +436,7 @@ fn cache_latest_events(room: &mut Room, room_info: &mut RoomInfo, events: &[Sync } } else { warn!( - "Failed to deserialise event as AnySyncTimelineEvent. ID={}", + "Failed to deserialize event as AnySyncTimelineEvent. ID={}", e.event_id().expect("Event has no ID!") ); } diff --git a/crates/matrix-sdk-crypto/src/machine.rs b/crates/matrix-sdk-crypto/src/machine.rs index 88e29a9a3..d0335d79d 100644 --- a/crates/matrix-sdk-crypto/src/machine.rs +++ b/crates/matrix-sdk-crypto/src/machine.rs @@ -1887,8 +1887,8 @@ impl OlmMachine { Arc::ptr_eq(&self.inner, &other.inner) } - #[cfg(any(feature = "testing", test))] /// Testing purposes only. + #[cfg(any(feature = "testing", test))] pub fn uploaded_key_count(&self) -> u64 { self.inner.account.uploaded_key_count() } diff --git a/crates/matrix-sdk-indexeddb/Cargo.toml b/crates/matrix-sdk-indexeddb/Cargo.toml index 1b2034ab9..2a0d53a69 100644 --- a/crates/matrix-sdk-indexeddb/Cargo.toml +++ b/crates/matrix-sdk-indexeddb/Cargo.toml @@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = ["e2e-encryption"] e2e-encryption = ["matrix-sdk-base/e2e-encryption", "dep:matrix-sdk-crypto"] +testing = ["matrix-sdk-crypto?/testing"] [dependencies] anyhow = { workspace = true } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 02f462a4a..897cc8fb8 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -537,7 +537,7 @@ mod test { // And the room is stored in the client so it can be extracted when needed let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response, vec![]).await.unwrap(); + client.process_sliding_sync(&response).await.unwrap(); // When we construct a timeline event from it let timeline_item = EventTimelineItem::from_latest_event(&ss_room, event).await.unwrap(); diff --git a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs index a0a2f6ede..bbfa1c0e0 100644 --- a/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs +++ b/crates/matrix-sdk-ui/src/timeline/sliding_sync_ext.rs @@ -122,7 +122,7 @@ mod tests { let mut room = v4::SlidingSyncRoom::new(); room.timeline.push(event.event); let response = response_with_room(room_id, room).await; - client.process_sliding_sync(&response, vec![]).await.unwrap(); + client.process_sliding_sync(&response).await.unwrap(); } fn message_event( diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index cefe071e3..bc47f6732 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls"] -testing = ["matrix-sdk-sqlite?/testing"] +testing = ["matrix-sdk-sqlite?/testing", "matrix-sdk-indexeddb?/testing", "matrix-sdk-base/testing"] e2e-encryption = [ "matrix-sdk-base/e2e-encryption", diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 0f8057af6..a05920ce2 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -947,8 +947,8 @@ impl Encryption { } } + /// Testing purposes only. #[cfg(any(test, feature = "testing"))] - /// Testing purposees only. pub async fn uploaded_key_count(&self) -> Result { let olm_machine = self.client.olm_machine().await; let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?; diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 9dfb5b895..8753acd2c 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -14,27 +14,14 @@ impl Client { Ok(SlidingSync::builder(id.into(), self.clone())?) } - /// Handle all the e2ee information provided in a sliding sync response. - #[cfg(feature = "e2e-encryption")] - pub(crate) async fn process_sliding_sync_e2ee( - &self, - extensions: &v4::Extensions, - ) -> Result>> { - Ok(self.base_client().process_sliding_sync_e2ee(extensions).await?) - } - /// Handle all the information provided in a sliding sync response, except - /// for the e2ee bits that are handled by `process_sliding_sync_e2ee` - /// (and which results can be passed as the second argument). + /// for the e2ee bits. + /// + /// If you need to handle encryption too, use the internal + /// `SlidingSyncResponseProcessor` instead. #[instrument(skip(self, response))] - pub async fn process_sliding_sync( - &self, - response: &v4::Response, - to_device_events: Vec>, - ) -> Result { - let mut response = self.base_client().process_sliding_sync(response).await?; - - response.to_device.extend(to_device_events); + pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result { + let response = self.base_client().process_sliding_sync(response).await?; debug!("done processing on base_client"); self.handle_sync_response(&response).await?; @@ -42,3 +29,47 @@ impl Client { Ok(response) } } + +/// Small helper to handle a `SlidingSync` response's sub parts. +/// +/// This will properly handle the encryption and the room response +/// independently, if needs be, making sure that both are properly processed by +/// event handlers. +#[must_use] +pub(crate) struct SlidingSyncResponseProcessor { + client: Client, + to_device_events: Vec>, + response: Option, +} + +impl SlidingSyncResponseProcessor { + pub fn new(client: Client) -> Self { + Self { client, to_device_events: Vec::new(), response: None } + } + + #[cfg(feature = "e2e-encryption")] + pub async fn handle_encryption(&mut self, extensions: &v4::Extensions) -> Result<()> { + // This is an internal API misuse if this is triggered (calling + // handle_room_response before this function), so panic is fine. + assert!(self.response.is_none()); + + self.to_device_events = + self.client.base_client().process_sliding_sync_e2ee(extensions).await?; + Ok(()) + } + + pub async fn handle_room_response(&mut self, response: &v4::Response) -> Result<()> { + self.response = Some(self.client.base_client().process_sliding_sync(response).await?); + Ok(()) + } + + pub async fn process_and_take_response(mut self) -> Result { + let mut response = self.response.take().unwrap_or_default(); + + response.to_device.extend(self.to_device_events); + + self.client.handle_sync_response(&response).await?; + + Ok(response) + } +} diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index cab834c5a..5bc1b35ed 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -34,11 +34,9 @@ use std::{ use async_stream::stream; pub use builder::*; -pub use client::*; pub use error::*; use futures_core::stream::Stream; pub use list::*; -use matrix_sdk_base::sync::SyncResponse; pub use room::*; use ruma::{ api::client::{ @@ -60,7 +58,9 @@ use self::{ cache::restore_sliding_sync_state, sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData}, }; -use crate::{config::RequestConfig, Client, Result}; +use crate::{ + config::RequestConfig, sliding_sync::client::SlidingSyncResponseProcessor, Client, Result, +}; /// The Sliding Sync instance. /// @@ -280,14 +280,12 @@ impl SlidingSync { // `sliding_sync_response` is vital, so it must be done somewhere; for now it // happens here. - let to_device_events = Vec::new(); + let mut response_processor = SlidingSyncResponseProcessor::new(self.inner.client.clone()); #[cfg(feature = "e2e-encryption")] - let to_device_events = if self.is_e2ee_enabled() { - self.inner.client.process_sliding_sync_e2ee(&sliding_sync_response.extensions).await? - } else { - to_device_events - }; + if self.is_e2ee_enabled() { + response_processor.handle_encryption(&sliding_sync_response.extensions).await? + } // Only handle the room's subsection of the response, if this sliding sync was // configured to do so. That's because even when not requesting it, @@ -295,16 +293,11 @@ impl SlidingSync { // unrelated to the current connection's parameters. // // NOTE: SS proxy workaround. - let handle_room_response = { - !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty() - || !self.inner.lists.read().await.is_empty() - }; + if self.must_process_rooms_response().await { + response_processor.handle_room_response(&sliding_sync_response).await? + } - let mut sync_response = if handle_room_response { - self.inner.client.process_sliding_sync(&sliding_sync_response, to_device_events).await? - } else { - assign!(SyncResponse::default(), { to_device: to_device_events }) - }; + let mut sync_response = response_processor.process_and_take_response().await?; debug!(?sync_response, "Sliding Sync response has been handled by the client"); @@ -489,12 +482,20 @@ impl SlidingSync { )) } - #[cfg(feature = "e2e-encryption")] /// Is the e2ee extension enabled for this sliding sync instance? + #[cfg(feature = "e2e-encryption")] fn is_e2ee_enabled(&self) -> bool { self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true) } + /// Should we process the room's subpart of a response? + async fn must_process_rooms_response(&self) -> bool { + // We consider that we must, if there's any room subscription or there's any + // list. + !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty() + || !self.inner.lists.read().await.is_empty() + } + #[instrument(skip_all, fields(pos))] async fn sync_once(&self) -> Result { let (request, request_config, requested_room_unsubscriptions) = @@ -1625,6 +1626,7 @@ mod tests { } #[async_test] + #[cfg(feature = "e2e-encryption")] async fn test_process_only_encryption_events() -> Result<()> { let room = owned_room_id!("!croissant:example.org"); diff --git a/xtask/src/ci.rs b/xtask/src/ci.rs index 526233574..9028b03b6 100644 --- a/xtask/src/ci.rs +++ b/xtask/src/ci.rs @@ -347,7 +347,7 @@ fn run_wasm_pack_tests(cmd: Option) -> Result<()> { WasmFeatureSet::MatrixSdkIndexeddbStores, ( "crates/matrix-sdk", - "--no-default-features --features js,indexeddb,e2e-encryption,rustls-tls --lib", + "--no-default-features --features js,indexeddb,e2e-encryption,rustls-tls,testing --lib", ), ), (