From 1e13e06e34c8a37d16b3384cbf1d48daa7637fd8 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 20:38:49 +0100 Subject: [PATCH 1/7] Make event-handling related Client fields private --- crates/matrix-sdk/src/client.rs | 18 +++++++++++++++--- crates/matrix-sdk/src/event_handler.rs | 6 ++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index 3b4605092..3d5fc1a30 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -34,7 +34,7 @@ use matrix_sdk_base::{ }; use matrix_sdk_common::{ instant::{Duration, Instant}, - locks::{Mutex, RwLock}, + locks::{Mutex, RwLock, RwLockReadGuard}, }; use mime::{self, Mime}; use ruma::{ @@ -125,9 +125,9 @@ pub struct Client { pub(crate) members_request_locks: Arc>>>, pub(crate) typing_notice_times: Arc>, /// Event handlers. See `register_event_handler`. - pub(crate) event_handlers: Arc>, + event_handlers: Arc>, /// Custom event handler context. See `register_event_handler_context`. - pub(crate) event_handler_data: Arc>, + event_handler_data: Arc>, /// Notification handlers. See `register_notification_handler`. notification_handlers: Arc>>, /// Whether the client should operate in application service style mode. @@ -637,6 +637,10 @@ impl Client { self } + pub(crate) async fn event_handlers(&self) -> RwLockReadGuard<'_, EventHandlerMap> { + self.event_handlers.read().await + } + /// Add an arbitrary value for use as event handler context. /// /// The value can be obtained in an event handler by adding an argument of @@ -682,6 +686,14 @@ impl Client { self } + pub(crate) fn event_handler_context(&self) -> Option + where + T: Clone + Send + Sync + 'static, + { + let map = self.event_handler_data.read().unwrap(); + map.get::().cloned() + } + /// Register a handler for a notification. /// /// Similar to [`Client::register_event_handler`], but only allows functions diff --git a/crates/matrix-sdk/src/event_handler.rs b/crates/matrix-sdk/src/event_handler.rs index ee48a053a..97993eed7 100644 --- a/crates/matrix-sdk/src/event_handler.rs +++ b/crates/matrix-sdk/src/event_handler.rs @@ -186,8 +186,7 @@ pub struct Ctx(pub T); impl EventHandlerContext for Ctx { fn from_data(data: &EventHandlerData<'_>) -> Option { - let anymap = data.client.event_handler_data.read().unwrap(); - Some(Ctx(anymap.get::()?.clone())) + data.client.event_handler_context::().map(Ctx) } } @@ -330,8 +329,7 @@ impl Client { // Construct event handler futures let futures: Vec<_> = self - .event_handlers - .read() + .event_handlers() .await .get(&event_handler_id) .into_iter() From 544b24b0efaa0847cc196fd4a4c9a41b00c65ccf Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 20:57:48 +0100 Subject: [PATCH 2/7] Move some code out of client.rs It is by far the biggest module of the matrix-sdk crate. --- crates/matrix-sdk/src/client.rs | 138 ++---------------------------- crates/matrix-sdk/src/lib.rs | 1 + crates/matrix-sdk/src/sync.rs | 143 ++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 131 deletions(-) create mode 100644 crates/matrix-sdk/src/sync.rs diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index 3d5fc1a30..8eaad21cb 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -26,9 +26,8 @@ use std::{ use anymap2::any::CloneAnySendSync; use dashmap::DashMap; use futures_core::stream::Stream; -use futures_timer::Delay as sleep; use matrix_sdk_base::{ - deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse}, + deserialized_responses::SyncResponse, media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, BaseClient, Session, Store, }; @@ -711,6 +710,12 @@ impl Client { self } + pub(crate) async fn notification_handlers( + &self, + ) -> RwLockReadGuard<'_, Vec> { + self.notification_handlers.read().await + } + /// Get all the rooms the client knows about. /// /// This will return the list of joined, invited, and left rooms. @@ -1723,135 +1728,6 @@ impl Client { self.send(request, None).await } - pub(crate) async fn process_sync( - &self, - response: sync_events::Response, - ) -> Result { - let response = self.base_client.receive_sync_response(response).await?; - let SyncResponse { - next_batch: _, - rooms, - presence, - account_data, - to_device: _, - device_lists: _, - device_one_time_keys_count: _, - ambiguity_changes: _, - notifications, - } = &response; - - self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?; - self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?; - - for (room_id, room_info) in &rooms.join { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } = - room_info; - - self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?; - self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) - .await?; - self.handle_sync_state_events(&room, &state.events).await?; - self.handle_sync_timeline_events(&room, &timeline.events).await?; - } - - for (room_id, room_info) in &rooms.leave { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - let LeftRoom { timeline, state, account_data } = room_info; - - self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) - .await?; - self.handle_sync_state_events(&room, &state.events).await?; - self.handle_sync_timeline_events(&room, &timeline.events).await?; - } - - for (room_id, room_info) in &rooms.invite { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - // FIXME: Destructure room_info - self.handle_sync_events( - EventKind::StrippedState, - &room, - &room_info.invite_state.events, - ) - .await?; - } - - // Construct notification event handler futures - let mut futures = Vec::new(); - for handler in &*self.notification_handlers.read().await { - for (room_id, room_notifications) in notifications { - let room = match self.get_room(room_id) { - Some(room) => room, - None => { - warn!("Can't call notification handler, room {} not found", room_id); - continue; - } - }; - - futures.extend(room_notifications.iter().map(|notification| { - (handler)(notification.clone(), room.clone(), self.clone()) - })); - } - } - - // Run the notification handler futures with the - // `self.notification_handlers` lock no longer being held, in order. - for fut in futures { - fut.await; - } - - Ok(response) - } - - async fn sync_loop_helper( - &self, - sync_settings: &mut crate::config::SyncSettings<'_>, - ) -> Result { - let response = self.sync_once(sync_settings.clone()).await; - - match response { - Ok(r) => { - sync_settings.token = Some(r.next_batch.clone()); - Ok(r) - } - Err(e) => { - error!("Received an invalid response: {}", e); - sleep::new(Duration::from_secs(1)).await; - Err(e) - } - } - } - - async fn delay_sync(last_sync_time: &mut Option) { - let now = Instant::now(); - - // If the last sync happened less than a second ago, sleep for a - // while to not hammer out requests if the server doesn't respect - // the sync timeout. - if let Some(t) = last_sync_time { - if now - *t <= Duration::from_secs(1) { - sleep::new(Duration::from_secs(1)).await; - } - } - - *last_sync_time = Some(now); - } - /// Synchronize the client's state with the latest state on the server. /// /// ## Syncing Events diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index a9863acd8..5f0e43676 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -55,6 +55,7 @@ mod http_client; pub mod room; /// High-level room API mod room_member; +mod sync; #[cfg(feature = "encryption")] pub mod encryption; diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs new file mode 100644 index 000000000..2b04742db --- /dev/null +++ b/crates/matrix-sdk/src/sync.rs @@ -0,0 +1,143 @@ +use std::time::Duration; + +use futures_timer::Delay as sleep; +use matrix_sdk_base::{ + deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse}, + instant::Instant, +}; +use ruma::api::client::r0::sync::sync_events; +use tracing::{error, warn}; + +use crate::{event_handler::EventKind, Client, Result}; + +/// Internal functionality related to getting events from the server (`sync_events` endpoint) +impl Client { + pub(crate) async fn process_sync( + &self, + response: sync_events::Response, + ) -> Result { + let response = self.base_client.receive_sync_response(response).await?; + let SyncResponse { + next_batch: _, + rooms, + presence, + account_data, + to_device: _, + device_lists: _, + device_one_time_keys_count: _, + ambiguity_changes: _, + notifications, + } = &response; + + self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?; + self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?; + + for (room_id, room_info) in &rooms.join { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } = + room_info; + + self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?; + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; + } + + for (room_id, room_info) in &rooms.leave { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let LeftRoom { timeline, state, account_data } = room_info; + + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; + } + + for (room_id, room_info) in &rooms.invite { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + // FIXME: Destructure room_info + self.handle_sync_events( + EventKind::StrippedState, + &room, + &room_info.invite_state.events, + ) + .await?; + } + + // Construct notification event handler futures + let mut futures = Vec::new(); + for handler in &*self.notification_handlers().await { + for (room_id, room_notifications) in notifications { + let room = match self.get_room(room_id) { + Some(room) => room, + None => { + warn!("Can't call notification handler, room {} not found", room_id); + continue; + } + }; + + futures.extend(room_notifications.iter().map(|notification| { + (handler)(notification.clone(), room.clone(), self.clone()) + })); + } + } + + // Run the notification handler futures with the + // `self.notification_handlers` lock no longer being held, in order. + for fut in futures { + fut.await; + } + + Ok(response) + } + + pub(crate) async fn sync_loop_helper( + &self, + sync_settings: &mut crate::config::SyncSettings<'_>, + ) -> Result { + let response = self.sync_once(sync_settings.clone()).await; + + match response { + Ok(r) => { + sync_settings.token = Some(r.next_batch.clone()); + Ok(r) + } + Err(e) => { + error!("Received an invalid response: {}", e); + sleep::new(Duration::from_secs(1)).await; + Err(e) + } + } + } + + pub(crate) async fn delay_sync(last_sync_time: &mut Option) { + let now = Instant::now(); + + // If the last sync happened less than a second ago, sleep for a + // while to not hammer out requests if the server doesn't respect + // the sync timeout. + if let Some(t) = last_sync_time { + if now - *t <= Duration::from_secs(1) { + sleep::new(Duration::from_secs(1)).await; + } + } + + *last_sync_time = Some(now); + } +} From 8f47e6ffe91429719bb1458f5b1b3a6a59552722 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 20:58:42 +0100 Subject: [PATCH 3/7] Remove copy-pasted module documentation It doesn't seem right that both room and room_member have the same description. Also room_member is private and doesn't usually show up in docs. --- crates/matrix-sdk/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 5f0e43676..2adca293f 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -53,7 +53,6 @@ pub mod event_handler; mod http_client; /// High-level room API pub mod room; -/// High-level room API mod room_member; mod sync; From cc6f97bee97ac63ce3bd11bebb1fc87c77b2138e Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 21:09:44 +0100 Subject: [PATCH 4/7] Fix missing word --- crates/matrix-sdk/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index 8eaad21cb..db1674540 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -225,7 +225,7 @@ impl Client { /// // First let's try to construct an user id, presumably from user input. /// let alice = UserId::try_from("@alice:example.org")?; /// - /// // Now let's try to discover the homeserver and create client object. + /// // Now let's try to discover the homeserver and create a client object. /// let client = Client::new_from_user_id(&alice).await?; /// /// // Finally let's try to login. From 2456beaf88813664e5b3b2a791f471e1f7cbfbb9 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 21:18:23 +0100 Subject: [PATCH 5/7] Make Result alias more flexible --- .../src/webserver/warp.rs | 10 +++++----- crates/matrix-sdk/src/client.rs | 18 +++++++++--------- crates/matrix-sdk/src/config/client.rs | 4 ++-- .../src/encryption/identities/devices.rs | 9 +++------ .../src/encryption/identities/users.rs | 2 +- .../src/encryption/verification/qrcode.rs | 4 ++-- crates/matrix-sdk/src/error.rs | 2 +- crates/matrix-sdk/src/room/joined.rs | 16 ++++++++-------- 8 files changed, 31 insertions(+), 34 deletions(-) diff --git a/crates/matrix-sdk-appservice/src/webserver/warp.rs b/crates/matrix-sdk-appservice/src/webserver/warp.rs index 378615846..a7b57989e 100644 --- a/crates/matrix-sdk-appservice/src/webserver/warp.rs +++ b/crates/matrix-sdk-appservice/src/webserver/warp.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{net::ToSocketAddrs, result::Result as StdResult}; +use std::net::ToSocketAddrs; use matrix_sdk::{ bytes::Bytes, @@ -168,7 +168,7 @@ mod handlers { _user_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { if let Some(user_exists) = appservice.event_handler.users.lock().await.as_mut() { let request = query_user::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -185,7 +185,7 @@ mod handlers { _room_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { if let Some(room_exists) = appservice.event_handler.rooms.lock().await.as_mut() { let request = query_room::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -202,7 +202,7 @@ mod handlers { _txn_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { let incoming_transaction: ruma::api::appservice::event::push_events::v1::IncomingRequest = ruma::api::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -224,7 +224,7 @@ struct ErrorMessage { message: String, } -pub async fn handle_rejection(err: Rejection) -> std::result::Result { +pub async fn handle_rejection(err: Rejection) -> Result { if err.find::().is_some() || err.find::().is_some() { let code = http::StatusCode::UNAUTHORIZED; let message = "UNAUTHORIZED"; diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index db1674540..63ade66bf 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -230,7 +230,7 @@ impl Client { /// /// // Finally let's try to login. /// client.login(alice, "password", None, None).await?; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [spec]: https://spec.matrix.org/unstable/client-server-api/#well-known-uri @@ -866,7 +866,7 @@ impl Client { /// "Logged in as {}, got device_id {} and access_token {}", /// user, response.device_id, response.access_token /// ); - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`restore_login`]: #method.restore_login @@ -1528,7 +1528,7 @@ impl Client { /// for room in response.chunk { /// println!("Found room {:?}", room); /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn public_rooms_filtered( &self, @@ -1628,7 +1628,7 @@ impl Client { /// /// // Check the corresponding Response struct to find out what types are /// // returned - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send( &self, @@ -1663,7 +1663,7 @@ impl Client { /// device.display_name.as_deref().unwrap_or("") /// ); /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn devices(&self) -> HttpResult { let request = get_devices::Request::new(); @@ -1716,7 +1716,7 @@ impl Client { /// .await?; /// } /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); pub async fn delete_devices( &self, devices: &[DeviceIdBox], @@ -1805,7 +1805,7 @@ impl Client { /// // Now keep on syncing forever. `sync()` will use the stored sync token /// // from our `sync_once()` call automatically. /// client.sync(SyncSettings::default()).await; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`sync`]: #method.sync @@ -1896,7 +1896,7 @@ impl Client { /// // Now keep on syncing forever. `sync()` will use the latest sync token /// // automatically. /// client.sync(SyncSettings::default()).await; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [argument docs]: #method.sync_once @@ -2025,7 +2025,7 @@ impl Client { /// } /// } /// - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` #[instrument] pub async fn sync_stream<'a>( diff --git a/crates/matrix-sdk/src/config/client.rs b/crates/matrix-sdk/src/config/client.rs index 2bdc64e40..1c0260686 100644 --- a/crates/matrix-sdk/src/config/client.rs +++ b/crates/matrix-sdk/src/config/client.rs @@ -89,7 +89,7 @@ impl ClientConfig { /// let client_config = ClientConfig::new() /// .proxy("http://localhost:8080")?; /// - /// # matrix_sdk::Result::Ok(()) + /// # Result::<_, matrix_sdk::Error>::Ok(()) /// ``` #[cfg(not(target_arch = "wasm32"))] pub fn proxy(mut self, proxy: &str) -> Result { @@ -104,7 +104,7 @@ impl ClientConfig { } /// Set a custom HTTP user agent for the client. - pub fn user_agent(mut self, user_agent: &str) -> std::result::Result { + pub fn user_agent(mut self, user_agent: &str) -> Result { self.user_agent = Some(HeaderValue::from_str(user_agent)?); Ok(self) } diff --git a/crates/matrix-sdk/src/encryption/identities/devices.rs b/crates/matrix-sdk/src/encryption/identities/devices.rs index 83bf626ee..904b855d4 100644 --- a/crates/matrix-sdk/src/encryption/identities/devices.rs +++ b/crates/matrix-sdk/src/encryption/identities/devices.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ops::Deref, result::Result as StdResult}; +use std::ops::Deref; use matrix_sdk_base::crypto::{ store::CryptoStoreError, Device as BaseDevice, LocalTrust, ReadOnlyDevice, @@ -250,7 +250,7 @@ impl Device { /// } /// # anyhow::Result::<()>::Ok(()) }); /// ``` - pub async fn verify(&self) -> std::result::Result<(), ManualVerifyError> { + pub async fn verify(&self) -> Result<(), ManualVerifyError> { let request = self.inner.verify().await?; self.client.send(request, None).await?; @@ -391,10 +391,7 @@ impl Device { /// # Arguments /// /// * `trust_state` - The new trust state that should be set for the device. - pub async fn set_local_trust( - &self, - trust_state: LocalTrust, - ) -> StdResult<(), CryptoStoreError> { + pub async fn set_local_trust(&self, trust_state: LocalTrust) -> Result<(), CryptoStoreError> { self.inner.set_local_trust(trust_state).await } } diff --git a/crates/matrix-sdk/src/encryption/identities/users.rs b/crates/matrix-sdk/src/encryption/identities/users.rs index 4ae3f047d..3c5fe30cf 100644 --- a/crates/matrix-sdk/src/encryption/identities/users.rs +++ b/crates/matrix-sdk/src/encryption/identities/users.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{result::Result, sync::Arc}; +use std::sync::Arc; use matrix_sdk_base::{ crypto::{ diff --git a/crates/matrix-sdk/src/encryption/verification/qrcode.rs b/crates/matrix-sdk/src/encryption/verification/qrcode.rs index 78b80e807..87322ff3d 100644 --- a/crates/matrix-sdk/src/encryption/verification/qrcode.rs +++ b/crates/matrix-sdk/src/encryption/verification/qrcode.rs @@ -71,7 +71,7 @@ impl QrVerification { /// /// The [`to_bytes()`](#method.to_bytes) method can be used to instead /// output the raw bytes that should be encoded as a QR code. - pub fn to_qr_code(&self) -> std::result::Result { + pub fn to_qr_code(&self) -> Result { self.inner.to_qr_code() } @@ -80,7 +80,7 @@ impl QrVerification { /// /// The [`to_qr_code()`](#method.to_qr_code) method can be used to instead /// output a `QrCode` object that can be rendered. - pub fn to_bytes(&self) -> std::result::Result, EncodingError> { + pub fn to_bytes(&self) -> Result, EncodingError> { self.inner.to_bytes() } diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index 33ab68f0c..cd02986a3 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -40,7 +40,7 @@ use thiserror::Error; use url::ParseError as UrlParseError; /// Result type of the matrix-sdk. -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// Result type of a pure HTTP request. pub type HttpResult = std::result::Result; diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index ce9e64361..524d3022c 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -170,7 +170,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.typing_notice(true).await? /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn typing_notice(&self, typing: bool) -> Result<()> { // Only send a request to the homeserver if the old timeout has elapsed @@ -278,7 +278,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.enable_encryption().await? /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn enable_encryption(&self) -> Result<()> { use ruma::{ @@ -449,7 +449,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send(content, Some(txn_id)).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent @@ -517,7 +517,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send_raw(content, "m.room.message", None).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent @@ -631,7 +631,7 @@ impl Joined { /// None, /// ).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send_attachment( &self, @@ -698,7 +698,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send_state_event(content, "").await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send_state_event( &self, @@ -791,7 +791,7 @@ impl Joined { /// let reason = Some("Indecent material"); /// room.redact(&event_id, reason, None).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn redact( &self, @@ -834,7 +834,7 @@ impl Joined { /// /// room.set_tag("u.work", tag_info ).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn set_tag(&self, tag: &str, tag_info: TagInfo) -> HttpResult { let user_id = self.client.user_id().await.ok_or(HttpError::AuthenticationRequired)?; From 73c5bfed283bce1c0d2fea5646652fbb2c0e2ddf Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 23:59:42 +0100 Subject: [PATCH 6/7] Switch from futures-locks to async-lock --- crates/matrix-sdk-base/Cargo.toml | 2 +- crates/matrix-sdk-common/Cargo.toml | 4 ++-- crates/matrix-sdk-common/src/locks.rs | 6 +----- crates/matrix-sdk-crypto/Cargo.toml | 2 +- crates/matrix-sdk/Cargo.toml | 2 +- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 808ea6c19..648acc76e 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -50,7 +50,7 @@ default-features = false features = ["sync", "fs"] [dev-dependencies] -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } http = "0.2.4" matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" } diff --git a/crates/matrix-sdk-common/Cargo.toml b/crates/matrix-sdk-common/Cargo.toml index f9043bffa..16b2df4a8 100644 --- a/crates/matrix-sdk-common/Cargo.toml +++ b/crates/matrix-sdk-common/Cargo.toml @@ -33,7 +33,7 @@ version = "0.1.9" features = ["now"] [target.'cfg(target_arch = "wasm32")'.dependencies] -futures-util = { version = "0.3.15", default-features = false } -futures-locks = { version = "0.6.0", default-features = false } +async-lock = "2.4.0" +futures-util = { version = "0.3.15", default-features = false, features = ["channel"] } wasm-bindgen-futures = "0.4.24" uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] } diff --git a/crates/matrix-sdk-common/src/locks.rs b/crates/matrix-sdk-common/src/locks.rs index f3162309e..fbb89f52d 100644 --- a/crates/matrix-sdk-common/src/locks.rs +++ b/crates/matrix-sdk-common/src/locks.rs @@ -1,8 +1,4 @@ -// could switch to futures-lock completely at some point, blocker: -// https://github.com/asomers/futures-locks/issues/34 -// https://www.reddit.com/r/rust/comments/f4zldz/i_audited_3_different_implementation_of_async/ - #[cfg(target_arch = "wasm32")] -pub use futures_locks::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub use async_lock::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; #[cfg(not(target_arch = "wasm32"))] pub use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/crates/matrix-sdk-crypto/Cargo.toml b/crates/matrix-sdk-crypto/Cargo.toml index 6d1599a9b..70f6176b7 100644 --- a/crates/matrix-sdk-crypto/Cargo.toml +++ b/crates/matrix-sdk-crypto/Cargo.toml @@ -56,7 +56,7 @@ criterion = { version = "0.3.4", features = [ "async_tokio", "html_reports", ] } -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } http = "0.2.4" indoc = "1.0.3" matches = "0.1.8" diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index ccb083a83..f448e440b 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -108,7 +108,7 @@ features = ["wasm-bindgen"] [dev-dependencies] anyhow = "1.0" dirs = "3.0.2" -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } lazy_static = "1.4.0" matches = "0.1.8" matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" } From b4220bd82464ac0341ce02bb6974125c978ef11d Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 19 Nov 2021 21:42:10 +0100 Subject: [PATCH 7/7] Reduce the size of `Client` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … by moving all of its fields into an inner reference-counted type rather than using reference counting for all fields individually. Some fields are still refcounted individually and thus go through an extra layer of indirection, but the size and clone simplification should still make this change worthwhile. --- crates/matrix-sdk/src/client.rs | 153 ++++++++++++++---------- crates/matrix-sdk/src/encryption/mod.rs | 48 ++++---- crates/matrix-sdk/src/room/common.rs | 11 +- crates/matrix-sdk/src/room/joined.rs | 54 +++++---- crates/matrix-sdk/src/sync.rs | 5 +- 5 files changed, 154 insertions(+), 117 deletions(-) diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index 63ade66bf..07ac96883 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -108,27 +108,31 @@ pub enum LoopCtrl { /// All of the state is held in an `Arc` so the `Client` can be cloned freely. #[derive(Clone)] pub struct Client { + pub(crate) inner: Arc, +} + +pub(crate) struct ClientInner { /// The URL of the homeserver to connect to. homeserver: Arc>, /// The underlying HTTP client. http_client: HttpClient, /// User session data. - pub(crate) base_client: BaseClient, + base_client: BaseClient, /// Locks making sure we only have one group session sharing request in /// flight per room. #[cfg(feature = "encryption")] - pub(crate) group_session_locks: Arc>>>, + pub(crate) group_session_locks: DashMap>>, #[cfg(feature = "encryption")] /// Lock making sure we're only doing one key claim request at a time. - pub(crate) key_claim_lock: Arc>, - pub(crate) members_request_locks: Arc>>>, - pub(crate) typing_notice_times: Arc>, + pub(crate) key_claim_lock: Mutex<()>, + pub(crate) members_request_locks: DashMap>>, + pub(crate) typing_notice_times: DashMap, /// Event handlers. See `register_event_handler`. - event_handlers: Arc>, + event_handlers: RwLock, /// Custom event handler context. See `register_event_handler_context`. - event_handler_data: Arc>, + event_handler_data: StdRwLock, /// Notification handlers. See `register_notification_handler`. - notification_handlers: Arc>>, + notification_handlers: RwLock>, /// Whether the client should operate in application service style mode. /// This is low-level functionality. For an high-level API check the /// `matrix_sdk_appservice` crate. @@ -141,7 +145,7 @@ pub struct Client { /// synchronization, e.g. if we send out a request to create a room, we can /// wait for the sync to get the data to fetch a room object from the state /// store. - pub(crate) sync_beat: Arc, + pub(crate) sync_beat: event_listener::Event, } #[cfg(not(tarpaulin_include))] @@ -185,7 +189,7 @@ impl Client { let http_client = HttpClient::new(client, homeserver.clone(), session, config.request_config); - Ok(Self { + let inner = Arc::new(ClientInner { homeserver, http_client, base_client, @@ -200,8 +204,10 @@ impl Client { notification_handlers: Default::default(), appservice_mode: config.appservice_mode, use_discovery_response: config.use_discovery_response, - sync_beat: event_listener::Event::new().into(), - }) + sync_beat: event_listener::Event::new(), + }); + + Ok(Self { inner }) } /// Create a new [`Client`] using homeserver auto discovery. @@ -267,6 +273,24 @@ impl Client { Ok(client) } + pub(crate) fn base_client(&self) -> &BaseClient { + &self.inner.base_client + } + + #[cfg(feature = "encryption")] + pub(crate) async fn olm_machine(&self) -> Option { + self.base_client().olm_machine().await + } + + #[cfg(feature = "encryption")] + pub(crate) async fn mark_request_as_sent( + &self, + request_id: &matrix_sdk_base::uuid::Uuid, + response: impl Into>, + ) -> Result<(), matrix_sdk_base::Error> { + self.base_client().mark_request_as_sent(request_id, response).await + } + fn homeserver_from_user_id(user_id: &UserId) -> Result { let homeserver = format!("https://{}", user_id.server_name()); #[allow(unused_mut)] @@ -289,7 +313,7 @@ impl Client { /// /// * `homeserver_url` - The new URL to use. pub async fn set_homeserver(&self, homeserver_url: Url) { - let mut homeserver = self.homeserver.write().await; + let mut homeserver = self.inner.homeserver.write().await; *homeserver = homeserver_url; } @@ -323,23 +347,23 @@ impl Client { /// Is the client logged in. pub async fn logged_in(&self) -> bool { - self.base_client.logged_in().await + self.inner.base_client.logged_in().await } /// The Homeserver of the client. pub async fn homeserver(&self) -> Url { - self.homeserver.read().await.clone() + self.inner.homeserver.read().await.clone() } /// Get the user id of the current owner of the client. pub async fn user_id(&self) -> Option { - let session = self.base_client.session().read().await; + let session = self.inner.base_client.session().read().await; session.as_ref().cloned().map(|s| s.user_id) } /// Get the device id that identifies the current session. pub async fn device_id(&self) -> Option { - let session = self.base_client.session().read().await; + let session = self.inner.base_client.session().read().await; session.as_ref().map(|s| s.device_id.clone()) } @@ -350,7 +374,7 @@ impl Client { /// Can be used with [`Client::restore_login`] to restore a previously /// logged in session. pub async fn session(&self) -> Option { - self.base_client.session().read().await.clone() + self.inner.base_client.session().read().await.clone() } /// Fetches the display name of the owner of the client. @@ -468,7 +492,7 @@ impl Client { /// Get a reference to the store. pub fn store(&self) -> &Store { - self.base_client.store() + self.inner.base_client.store() } /// Sets the mxc avatar url of the client's owner. The avatar gets unset if @@ -610,34 +634,39 @@ impl Client { ::Output: EventHandlerResult, { let event_type = H::ID.1; - self.event_handlers.write().await.entry(H::ID).or_default().push(Box::new(move |data| { - let maybe_fut = serde_json::from_str(data.raw.get()) - .map(|ev| handler.clone().handle_event(ev, data)); + self.inner.event_handlers.write().await.entry(H::ID).or_default().push(Box::new( + move |data| { + let maybe_fut = serde_json::from_str(data.raw.get()) + .map(|ev| handler.clone().handle_event(ev, data)); - Box::pin(async move { - match maybe_fut { - Ok(Some(fut)) => { - fut.await.print_error(event_type); - } - Ok(None) => { - error!("Event handler for {} has an invalid context argument", event_type); - } - Err(e) => { - warn!( - "Failed to deserialize `{}` event, skipping event handler.\n\ + Box::pin(async move { + match maybe_fut { + Ok(Some(fut)) => { + fut.await.print_error(event_type); + } + Ok(None) => { + error!( + "Event handler for {} has an invalid context argument", + event_type + ); + } + Err(e) => { + warn!( + "Failed to deserialize `{}` event, skipping event handler.\n\ Deserialization error: {}", - event_type, e, - ); + event_type, e, + ); + } } - } - }) - })); + }) + }, + )); self } pub(crate) async fn event_handlers(&self) -> RwLockReadGuard<'_, EventHandlerMap> { - self.event_handlers.read().await + self.inner.event_handlers.read().await } /// Add an arbitrary value for use as event handler context. @@ -681,7 +710,7 @@ impl Client { where T: Clone + Send + Sync + 'static, { - self.event_handler_data.write().unwrap().insert(ctx); + self.inner.event_handler_data.write().unwrap().insert(ctx); self } @@ -689,7 +718,7 @@ impl Client { where T: Clone + Send + Sync + 'static, { - let map = self.event_handler_data.read().unwrap(); + let map = self.inner.event_handler_data.read().unwrap(); map.get::().cloned() } @@ -703,7 +732,7 @@ impl Client { H: Fn(Notification, room::Room, Client) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { - self.notification_handlers.write().await.push(Box::new( + self.inner.notification_handlers.write().await.push(Box::new( move |notification, room, client| Box::pin((handler)(notification, room, client)), )); @@ -713,7 +742,7 @@ impl Client { pub(crate) async fn notification_handlers( &self, ) -> RwLockReadGuard<'_, Vec> { - self.notification_handlers.read().await + self.inner.notification_handlers.read().await } /// Get all the rooms the client knows about. @@ -1183,7 +1212,7 @@ impl Client { /// /// * `response` - A successful login response. async fn receive_login_response(&self, response: &login::Response) -> Result<()> { - if self.use_discovery_response { + if self.inner.use_discovery_response { if let Some(well_known) = &response.well_known { if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) { self.set_homeserver(homeserver).await; @@ -1191,7 +1220,7 @@ impl Client { } } - self.base_client.receive_login_response(response).await?; + self.inner.base_client.receive_login_response(response).await?; Ok(()) } @@ -1254,7 +1283,7 @@ impl Client { /// /// [`login`]: #method.login pub async fn restore_login(&self, session: Session) -> Result<()> { - Ok(self.base_client.restore_login(session).await?) + Ok(self.inner.base_client.restore_login(session).await?) } /// Register a user to the server. @@ -1301,8 +1330,8 @@ impl Client { let homeserver = self.homeserver().await; info!("Registering to {}", homeserver); - let config = if self.appservice_mode { - Some(self.http_client.request_config.force_auth()) + let config = if self.inner.appservice_mode { + Some(self.inner.http_client.request_config.force_auth()) } else { None }; @@ -1367,14 +1396,14 @@ impl Client { filter_name: &str, definition: FilterDefinition<'_>, ) -> Result { - if let Some(filter) = self.base_client.get_filter(filter_name).await? { + if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? { Ok(filter) } else { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = FilterUploadRequest::new(&user_id, definition); let response = self.send(request, None).await?; - self.base_client.receive_filter_upload(filter_name, &response).await?; + self.inner.base_client.receive_filter_upload(filter_name, &response).await?; Ok(response.filter_id) } @@ -1586,8 +1615,8 @@ impl Client { content_type: Some(content_type.essence_str()), }); - let request_config = self.http_client.request_config.timeout(timeout); - Ok(self.http_client.upload(request, Some(request_config)).await?) + let request_config = self.inner.http_client.request_config.timeout(timeout); + Ok(self.inner.http_client.upload(request, Some(request_config)).await?) } /// Send an arbitrary request to the server, without updating client state. @@ -1639,7 +1668,7 @@ impl Client { Request: OutgoingRequest + Debug, HttpError: From>, { - Ok(self.http_client.send(request, config).await?) + Ok(self.inner.http_client.send(request, config).await?) } /// Get information of all our own devices. @@ -1832,9 +1861,9 @@ impl Client { timeout: sync_settings.timeout, }); - let request_config = self.http_client.request_config.timeout( + let request_config = self.inner.http_client.request_config.timeout( sync_settings.timeout.unwrap_or_else(|| Duration::from_secs(0)) - + self.http_client.request_config.timeout, + + self.inner.http_client.request_config.timeout, ); let response = self.send(request, Some(request_config)).await?; @@ -1845,7 +1874,7 @@ impl Client { error!(error =? e, "Error while sending outgoing E2EE requests"); }; - self.sync_beat.notify(usize::MAX); + self.inner.sync_beat.notify(usize::MAX); Ok(response) } @@ -2050,7 +2079,7 @@ impl Client { /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { - self.base_client.sync_token().await + self.inner.base_client.sync_token().await } /// Get a media file's content. @@ -2069,7 +2098,7 @@ impl Client { use_cache: bool, ) -> Result> { let content = if use_cache { - self.base_client.store().get_media_content(request).await? + self.inner.base_client.store().get_media_content(request).await? } else { None }; @@ -2113,7 +2142,7 @@ impl Client { }; if use_cache { - self.base_client.store().add_media_content(request, content.clone()).await?; + self.inner.base_client.store().add_media_content(request, content.clone()).await?; } Ok(content) @@ -2126,7 +2155,7 @@ impl Client { /// /// * `request` - The `MediaRequest` of the content. pub async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> { - Ok(self.base_client.store().remove_media_content(request).await?) + Ok(self.inner.base_client.store().remove_media_content(request).await?) } /// Delete all the media content corresponding to the given @@ -2136,7 +2165,7 @@ impl Client { /// /// * `uri` - The `MxcUri` of the files. pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - Ok(self.base_client.store().remove_media_content_for_uri(uri).await?) + Ok(self.inner.base_client.store().remove_media_content_for_uri(uri).await?) } /// Get the file of the given media event content. @@ -2655,7 +2684,7 @@ pub(crate) mod test { .add_state_event(EventsJson::PowerLevels) .build_sync_response(); - client.base_client.receive_sync_response(response).await.unwrap(); + client.inner.base_client.receive_sync_response(response).await.unwrap(); let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); assert_eq!(client.homeserver().await, Url::parse(&mockito::server_url()).unwrap()); diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 7a33b7e8c..8f599d6e0 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -295,7 +295,7 @@ impl Client { /// called the fingerprint of the device. #[cfg(feature = "encryption")] pub async fn ed25519_key(&self) -> Option { - self.base_client.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned()) + self.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned()) } /// Get the status of the private cross signing keys. @@ -304,7 +304,7 @@ impl Client { /// stored locally. #[cfg(feature = "encryption")] pub async fn cross_signing_status(&self) -> Option { - if let Some(machine) = self.base_client.olm_machine().await { + if let Some(machine) = self.olm_machine().await { Some(machine.cross_signing_status().await) } else { None @@ -317,13 +317,13 @@ impl Client { /// capable devices up to date. #[cfg(feature = "encryption")] pub async fn tracked_users(&self) -> HashSet { - self.base_client.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default() + self.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default() } /// Get a verification object with the given flow id. #[cfg(feature = "encryption")] pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option { - let olm = self.base_client.olm_machine().await?; + let olm = self.olm_machine().await?; olm.get_verification(user_id, flow_id).map(|v| match v { matrix_sdk_base::crypto::Verification::SasV1(s) => { SasVerification { inner: s, client: self.clone() }.into() @@ -343,7 +343,7 @@ impl Client { user_id: &UserId, flow_id: impl AsRef, ) -> Option { - let olm = self.base_client.olm_machine().await?; + let olm = self.olm_machine().await?; olm.get_verification_request(user_id, flow_id) .map(|r| VerificationRequest { inner: r, client: self.clone() }) @@ -388,7 +388,7 @@ impl Client { user_id: &UserId, device_id: &DeviceId, ) -> StdResult, CryptoStoreError> { - let device = self.base_client.get_device(user_id, device_id).await?; + let device = self.base_client().get_device(user_id, device_id).await?; Ok(device.map(|d| Device { inner: d, client: self.clone() })) } @@ -425,7 +425,7 @@ impl Client { &self, user_id: &UserId, ) -> StdResult { - let devices = self.base_client.get_user_devices(user_id).await?; + let devices = self.base_client().get_user_devices(user_id).await?; Ok(UserDevices { inner: devices, client: self.clone() }) } @@ -468,7 +468,7 @@ impl Client { ) -> StdResult, CryptoStoreError> { use crate::encryption::identities::UserIdentity; - if let Some(olm) = self.base_client.olm_machine().await { + if let Some(olm) = self.olm_machine().await { let identity = olm.get_identity(user_id).await?; Ok(identity.map(|i| match i { @@ -526,7 +526,7 @@ impl Client { /// # anyhow::Result::<()>::Ok(()) }); #[cfg(feature = "encryption")] pub async fn bootstrap_cross_signing(&self, auth_data: Option>) -> Result<()> { - let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?; + let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?; let (request, signature_request) = olm.bootstrap_cross_signing(false).await?; @@ -601,7 +601,7 @@ impl Client { passphrase: &str, predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool, ) -> Result<()> { - let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?; + let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?; let keys = olm.export_keys(predicate).await?; let passphrase = zeroize::Zeroizing::new(passphrase.to_owned()); @@ -661,7 +661,7 @@ impl Client { path: PathBuf, passphrase: &str, ) -> StdResult { - let olm = self.base_client.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?; + let olm = self.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?; let passphrase = zeroize::Zeroizing::new(passphrase.to_owned()); let decrypt = move || { @@ -684,7 +684,7 @@ impl Client { ) -> serde_json::Result { use ruma::serde::JsonObject; - if let Some(machine) = self.base_client.olm_machine().await { + if let Some(machine) = self.olm_machine().await { if let AnyRoomEvent::Message(event) = event { if let AnyMessageEvent::RoomEncrypted(_) = event { let room_id = event.room_id(); @@ -727,7 +727,7 @@ impl Client { let request = assign!(get_keys::Request::new(), { device_keys }); let response = self.send(request, None).await?; - self.base_client.mark_request_as_sent(request_id, &response).await?; + self.mark_request_as_sent(request_id, &response).await?; Ok(response) } @@ -844,7 +844,7 @@ impl Client { if let Some(room) = self.get_joined_room(&response.room_id) { Ok(Some(room)) } else { - self.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); + self.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); Ok(self.get_joined_room(&response.room_id)) } } @@ -860,11 +860,11 @@ impl Client { &self, users: impl Iterator, ) -> Result<()> { - let _lock = self.key_claim_lock.lock().await; + let _lock = self.inner.key_claim_lock.lock().await; - if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? { + if let Some((request_id, request)) = self.base_client().get_missing_sessions(users).await? { let response = self.send(request, None).await?; - self.base_client.mark_request_as_sent(&request_id, &response).await?; + self.mark_request_as_sent(&request_id, &response).await?; } Ok(()) @@ -893,7 +893,7 @@ impl Client { ); let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(request_id, &response).await?; + self.mark_request_as_sent(request_id, &response).await?; Ok(response) } @@ -971,23 +971,23 @@ impl Client { } OutgoingRequests::ToDeviceRequest(request) => { let response = self.send_to_device(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::SignatureUpload(request) => { let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::RoomMessage(request) => { let response = self.room_send_helper(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::KeysClaim(request) => { let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::KeysBackup(request) => { let response = self.send_backup_request(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } } @@ -1015,7 +1015,7 @@ impl Client { warn!("Error while claiming one-time keys {:?}", e); } - let outgoing_requests = stream::iter(self.base_client.outgoing_requests().await?) + let outgoing_requests = stream::iter(self.base_client().outgoing_requests().await?) .map(|r| self.send_outgoing_request(r)); let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS); diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index 6c51cb532..f36221861 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -168,14 +168,17 @@ impl Common { pub(crate) async fn request_members(&self) -> Result> { if let Some(mutex) = - self.client.members_request_locks.get(self.inner.room_id()).map(|m| m.clone()) + self.client.inner.members_request_locks.get(self.inner.room_id()).map(|m| m.clone()) { mutex.lock().await; Ok(None) } else { let mutex = Arc::new(Mutex::new(())); - self.client.members_request_locks.insert(self.inner.room_id().clone(), mutex.clone()); + self.client + .inner + .members_request_locks + .insert(self.inner.room_id().clone(), mutex.clone()); let _guard = mutex.lock().await; @@ -183,9 +186,9 @@ impl Common { let response = self.client.send(request, None).await?; let response = - self.client.base_client.receive_members(self.inner.room_id(), &response).await?; + self.client.base_client().receive_members(self.inner.room_id(), &response).await?; - self.client.members_request_locks.remove(self.inner.room_id()); + self.client.inner.members_request_locks.remove(self.inner.room_id()); Ok(Some(response)) } diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index 524d3022c..8ae09561c 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -176,31 +176,33 @@ impl Joined { // Only send a request to the homeserver if the old timeout has elapsed // or the typing notice changed state within the // TYPING_NOTICE_TIMEOUT - let send = - if let Some(typing_time) = self.client.typing_notice_times.get(self.inner.room_id()) { - if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT { - // We always reactivate the typing notice if typing is true or - // we may need to deactivate it if it's - // currently active if typing is false - typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT - } else { - // Only send a request when we need to deactivate typing - !typing - } + let send = if let Some(typing_time) = + self.client.inner.typing_notice_times.get(self.inner.room_id()) + { + if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT { + // We always reactivate the typing notice if typing is true or + // we may need to deactivate it if it's + // currently active if typing is false + typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT } else { - // Typing notice is currently deactivated, therefore, send a request - // only when it's about to be activated - typing - }; + // Only send a request when we need to deactivate typing + !typing + } + } else { + // Typing notice is currently deactivated, therefore, send a request + // only when it's about to be activated + typing + }; if send { let typing = if typing { self.client + .inner .typing_notice_times .insert(self.inner.room_id().clone(), Instant::now()); Typing::Yes(TYPING_NOTICE_TIMEOUT) } else { - self.client.typing_notice_times.remove(self.inner.room_id()); + self.client.inner.typing_notice_times.remove(self.inner.room_id()); Typing::No }; @@ -294,7 +296,7 @@ impl Joined { // TODO do we want to return an error here if we time out? This // could be quite useful if someone wants to enable encryption and // send a message right after it's enabled. - self.client.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); + self.client.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); } Ok(()) @@ -311,7 +313,7 @@ impl Joined { // TODO expose this publicly so people can pre-share a group session if // e.g. a user starts to type a message for a room. if let Some(mutex) = - self.client.group_session_locks.get(self.inner.room_id()).map(|m| m.clone()) + self.client.inner.group_session_locks.get(self.inner.room_id()).map(|m| m.clone()) { // If a group session share request is already going on, // await the release of the lock. @@ -320,7 +322,10 @@ impl Joined { // Otherwise create a new lock and share the group // session. let mutex = Arc::new(Mutex::new(())); - self.client.group_session_locks.insert(self.inner.room_id().clone(), mutex.clone()); + self.client + .inner + .group_session_locks + .insert(self.inner.room_id().clone(), mutex.clone()); let _guard = mutex.lock().await; @@ -334,13 +339,13 @@ impl Joined { let response = self.share_group_session().await; - self.client.group_session_locks.remove(self.inner.room_id()); + self.client.inner.group_session_locks.remove(self.inner.room_id()); // If one of the responses failed invalidate the group // session as using it would end up in undecryptable // messages. if let Err(r) = response { - self.client.base_client.invalidate_group_session(self.inner.room_id()).await?; + self.client.base_client().invalidate_group_session(self.inner.room_id()).await?; return Err(r); } } @@ -357,12 +362,12 @@ impl Joined { #[cfg(feature = "encryption")] async fn share_group_session(&self) -> Result<()> { let mut requests = - self.client.base_client.share_group_session(self.inner.room_id()).await?; + self.client.base_client().share_group_session(self.inner.room_id()).await?; for request in requests.drain(..) { let response = self.client.send_to_device(&request).await?; - self.client.base_client.mark_request_as_sent(&request.txn_id, &response).await?; + self.client.mark_request_as_sent(&request.txn_id, &response).await?; } Ok(()) @@ -554,8 +559,7 @@ impl Joined { self.preshare_group_session().await?; - let olm = - self.client.base_client.olm_machine().await.expect("Olm machine wasn't started"); + let olm = self.client.olm_machine().await.expect("Olm machine wasn't started"); let encrypted_content = olm.encrypt_raw(self.inner.room_id(), content, event_type).await?; diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs index 2b04742db..398fbd914 100644 --- a/crates/matrix-sdk/src/sync.rs +++ b/crates/matrix-sdk/src/sync.rs @@ -10,13 +10,14 @@ use tracing::{error, warn}; use crate::{event_handler::EventKind, Client, Result}; -/// Internal functionality related to getting events from the server (`sync_events` endpoint) +/// Internal functionality related to getting events from the server +/// (`sync_events` endpoint) impl Client { pub(crate) async fn process_sync( &self, response: sync_events::Response, ) -> Result { - let response = self.base_client.receive_sync_response(response).await?; + let response = self.base_client().receive_sync_response(response).await?; let SyncResponse { next_batch: _, rooms,