diff --git a/crates/matrix-sdk-appservice/src/webserver/warp.rs b/crates/matrix-sdk-appservice/src/webserver/warp.rs index 378615846..a7b57989e 100644 --- a/crates/matrix-sdk-appservice/src/webserver/warp.rs +++ b/crates/matrix-sdk-appservice/src/webserver/warp.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{net::ToSocketAddrs, result::Result as StdResult}; +use std::net::ToSocketAddrs; use matrix_sdk::{ bytes::Bytes, @@ -168,7 +168,7 @@ mod handlers { _user_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { if let Some(user_exists) = appservice.event_handler.users.lock().await.as_mut() { let request = query_user::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -185,7 +185,7 @@ mod handlers { _room_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { if let Some(room_exists) = appservice.event_handler.rooms.lock().await.as_mut() { let request = query_room::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -202,7 +202,7 @@ mod handlers { _txn_id: String, appservice: AppService, request: http::Request, - ) -> StdResult { + ) -> Result { let incoming_transaction: ruma::api::appservice::event::push_events::v1::IncomingRequest = ruma::api::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; @@ -224,7 +224,7 @@ struct ErrorMessage { message: String, } -pub async fn handle_rejection(err: Rejection) -> std::result::Result { +pub async fn handle_rejection(err: Rejection) -> Result { if err.find::().is_some() || err.find::().is_some() { let code = http::StatusCode::UNAUTHORIZED; let message = "UNAUTHORIZED"; diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 808ea6c19..648acc76e 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -50,7 +50,7 @@ default-features = false features = ["sync", "fs"] [dev-dependencies] -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } http = "0.2.4" matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" } diff --git a/crates/matrix-sdk-common/Cargo.toml b/crates/matrix-sdk-common/Cargo.toml index f9043bffa..16b2df4a8 100644 --- a/crates/matrix-sdk-common/Cargo.toml +++ b/crates/matrix-sdk-common/Cargo.toml @@ -33,7 +33,7 @@ version = "0.1.9" features = ["now"] [target.'cfg(target_arch = "wasm32")'.dependencies] -futures-util = { version = "0.3.15", default-features = false } -futures-locks = { version = "0.6.0", default-features = false } +async-lock = "2.4.0" +futures-util = { version = "0.3.15", default-features = false, features = ["channel"] } wasm-bindgen-futures = "0.4.24" uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] } diff --git a/crates/matrix-sdk-common/src/locks.rs b/crates/matrix-sdk-common/src/locks.rs index f3162309e..fbb89f52d 100644 --- a/crates/matrix-sdk-common/src/locks.rs +++ b/crates/matrix-sdk-common/src/locks.rs @@ -1,8 +1,4 @@ -// could switch to futures-lock completely at some point, blocker: -// https://github.com/asomers/futures-locks/issues/34 -// https://www.reddit.com/r/rust/comments/f4zldz/i_audited_3_different_implementation_of_async/ - #[cfg(target_arch = "wasm32")] -pub use futures_locks::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub use async_lock::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; #[cfg(not(target_arch = "wasm32"))] pub use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/crates/matrix-sdk-crypto/Cargo.toml b/crates/matrix-sdk-crypto/Cargo.toml index 6d1599a9b..70f6176b7 100644 --- a/crates/matrix-sdk-crypto/Cargo.toml +++ b/crates/matrix-sdk-crypto/Cargo.toml @@ -56,7 +56,7 @@ criterion = { version = "0.3.4", features = [ "async_tokio", "html_reports", ] } -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } http = "0.2.4" indoc = "1.0.3" matches = "0.1.8" diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index ccb083a83..f448e440b 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -108,7 +108,7 @@ features = ["wasm-bindgen"] [dev-dependencies] anyhow = "1.0" dirs = "3.0.2" -futures = { version = "0.3.15", default-features = false } +futures = { version = "0.3.15", default-features = false, features = ["executor"] } lazy_static = "1.4.0" matches = "0.1.8" matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" } diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index 6f1dcd47a..e00a8ff8a 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -26,15 +26,14 @@ use std::{ use anymap2::any::CloneAnySendSync; use dashmap::DashMap; use futures_core::stream::Stream; -use futures_timer::Delay as sleep; use matrix_sdk_base::{ - deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse}, + deserialized_responses::SyncResponse, media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, BaseClient, Session, Store, }; use matrix_sdk_common::{ instant::{Duration, Instant}, - locks::{Mutex, RwLock}, + locks::{Mutex, RwLock, RwLockReadGuard}, }; use mime::{self, Mime}; use ruma::{ @@ -109,27 +108,31 @@ pub enum LoopCtrl { /// All of the state is held in an `Arc` so the `Client` can be cloned freely. #[derive(Clone)] pub struct Client { + pub(crate) inner: Arc, +} + +pub(crate) struct ClientInner { /// The URL of the homeserver to connect to. homeserver: Arc>, /// The underlying HTTP client. http_client: HttpClient, /// User session data. - pub(crate) base_client: BaseClient, + base_client: BaseClient, /// Locks making sure we only have one group session sharing request in /// flight per room. #[cfg(feature = "encryption")] - pub(crate) group_session_locks: Arc>>>, + pub(crate) group_session_locks: DashMap>>, #[cfg(feature = "encryption")] /// Lock making sure we're only doing one key claim request at a time. - pub(crate) key_claim_lock: Arc>, - pub(crate) members_request_locks: Arc>>>, - pub(crate) typing_notice_times: Arc>, + pub(crate) key_claim_lock: Mutex<()>, + pub(crate) members_request_locks: DashMap>>, + pub(crate) typing_notice_times: DashMap, /// Event handlers. See `register_event_handler`. - pub(crate) event_handlers: Arc>, + event_handlers: RwLock, /// Custom event handler context. See `register_event_handler_context`. - pub(crate) event_handler_data: Arc>, + event_handler_data: StdRwLock, /// Notification handlers. See `register_notification_handler`. - notification_handlers: Arc>>, + notification_handlers: RwLock>, /// Whether the client should operate in application service style mode. /// This is low-level functionality. For an high-level API check the /// `matrix_sdk_appservice` crate. @@ -142,7 +145,7 @@ pub struct Client { /// synchronization, e.g. if we send out a request to create a room, we can /// wait for the sync to get the data to fetch a room object from the state /// store. - pub(crate) sync_beat: Arc, + pub(crate) sync_beat: event_listener::Event, } #[cfg(not(tarpaulin_include))] @@ -186,7 +189,7 @@ impl Client { let http_client = HttpClient::new(client, homeserver.clone(), session, config.request_config); - Ok(Self { + let inner = Arc::new(ClientInner { homeserver, http_client, base_client, @@ -201,8 +204,10 @@ impl Client { notification_handlers: Default::default(), appservice_mode: config.appservice_mode, use_discovery_response: config.use_discovery_response, - sync_beat: event_listener::Event::new().into(), - }) + sync_beat: event_listener::Event::new(), + }); + + Ok(Self { inner }) } /// Create a new [`Client`] using homeserver auto discovery. @@ -226,12 +231,12 @@ impl Client { /// // First let's try to construct an user id, presumably from user input. /// let alice = UserId::try_from("@alice:example.org")?; /// - /// // Now let's try to discover the homeserver and create client object. + /// // Now let's try to discover the homeserver and create a client object. /// let client = Client::new_from_user_id(&alice).await?; /// /// // Finally let's try to login. /// client.login(alice, "password", None, None).await?; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [spec]: https://spec.matrix.org/unstable/client-server-api/#well-known-uri @@ -268,6 +273,24 @@ impl Client { Ok(client) } + pub(crate) fn base_client(&self) -> &BaseClient { + &self.inner.base_client + } + + #[cfg(feature = "encryption")] + pub(crate) async fn olm_machine(&self) -> Option { + self.base_client().olm_machine().await + } + + #[cfg(feature = "encryption")] + pub(crate) async fn mark_request_as_sent( + &self, + request_id: &matrix_sdk_base::uuid::Uuid, + response: impl Into>, + ) -> Result<(), matrix_sdk_base::Error> { + self.base_client().mark_request_as_sent(request_id, response).await + } + fn homeserver_from_user_id(user_id: &UserId) -> Result { let homeserver = format!("https://{}", user_id.server_name()); #[allow(unused_mut)] @@ -290,7 +313,7 @@ impl Client { /// /// * `homeserver_url` - The new URL to use. pub async fn set_homeserver(&self, homeserver_url: Url) { - let mut homeserver = self.homeserver.write().await; + let mut homeserver = self.inner.homeserver.write().await; *homeserver = homeserver_url; } @@ -324,23 +347,23 @@ impl Client { /// Is the client logged in. pub async fn logged_in(&self) -> bool { - self.base_client.logged_in().await + self.inner.base_client.logged_in().await } /// The Homeserver of the client. pub async fn homeserver(&self) -> Url { - self.homeserver.read().await.clone() + self.inner.homeserver.read().await.clone() } /// Get the user id of the current owner of the client. pub async fn user_id(&self) -> Option { - let session = self.base_client.session().read().await; + let session = self.inner.base_client.session().read().await; session.as_ref().cloned().map(|s| s.user_id) } /// Get the device id that identifies the current session. pub async fn device_id(&self) -> Option { - let session = self.base_client.session().read().await; + let session = self.inner.base_client.session().read().await; session.as_ref().map(|s| s.device_id.clone()) } @@ -351,7 +374,7 @@ impl Client { /// Can be used with [`Client::restore_login`] to restore a previously /// logged in session. pub async fn session(&self) -> Option { - self.base_client.session().read().await.clone() + self.inner.base_client.session().read().await.clone() } /// Fetches the display name of the owner of the client. @@ -469,7 +492,7 @@ impl Client { /// Get a reference to the store. pub fn store(&self) -> &Store { - self.base_client.store() + self.inner.base_client.store() } /// Sets the mxc avatar url of the client's owner. The avatar gets unset if @@ -611,32 +634,41 @@ impl Client { ::Output: EventHandlerResult, { let event_type = H::ID.1; - self.event_handlers.write().await.entry(H::ID).or_default().push(Box::new(move |data| { - let maybe_fut = serde_json::from_str(data.raw.get()) - .map(|ev| handler.clone().handle_event(ev, data)); + self.inner.event_handlers.write().await.entry(H::ID).or_default().push(Box::new( + move |data| { + let maybe_fut = serde_json::from_str(data.raw.get()) + .map(|ev| handler.clone().handle_event(ev, data)); - Box::pin(async move { - match maybe_fut { - Ok(Some(fut)) => { - fut.await.print_error(event_type); - } - Ok(None) => { - error!("Event handler for {} has an invalid context argument", event_type); - } - Err(e) => { - warn!( - "Failed to deserialize `{}` event, skipping event handler.\n\ + Box::pin(async move { + match maybe_fut { + Ok(Some(fut)) => { + fut.await.print_error(event_type); + } + Ok(None) => { + error!( + "Event handler for {} has an invalid context argument", + event_type + ); + } + Err(e) => { + warn!( + "Failed to deserialize `{}` event, skipping event handler.\n\ Deserialization error: {}", - event_type, e, - ); + event_type, e, + ); + } } - } - }) - })); + }) + }, + )); self } + pub(crate) async fn event_handlers(&self) -> RwLockReadGuard<'_, EventHandlerMap> { + self.inner.event_handlers.read().await + } + /// Add an arbitrary value for use as event handler context. /// /// The value can be obtained in an event handler by adding an argument of @@ -678,10 +710,18 @@ impl Client { where T: Clone + Send + Sync + 'static, { - self.event_handler_data.write().unwrap().insert(ctx); + self.inner.event_handler_data.write().unwrap().insert(ctx); self } + pub(crate) fn event_handler_context(&self) -> Option + where + T: Clone + Send + Sync + 'static, + { + let map = self.inner.event_handler_data.read().unwrap(); + map.get::().cloned() + } + /// Register a handler for a notification. /// /// Similar to [`Client::register_event_handler`], but only allows functions @@ -692,13 +732,19 @@ impl Client { H: Fn(Notification, room::Room, Client) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { - self.notification_handlers.write().await.push(Box::new( + self.inner.notification_handlers.write().await.push(Box::new( move |notification, room, client| Box::pin((handler)(notification, room, client)), )); self } + pub(crate) async fn notification_handlers( + &self, + ) -> RwLockReadGuard<'_, Vec> { + self.inner.notification_handlers.read().await + } + /// Get all the rooms the client knows about. /// /// This will return the list of joined, invited, and left rooms. @@ -849,7 +895,7 @@ impl Client { /// "Logged in as {}, got device_id {} and access_token {}", /// user, response.device_id, response.access_token /// ); - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`restore_login`]: #method.restore_login @@ -1166,7 +1212,7 @@ impl Client { /// /// * `response` - A successful login response. async fn receive_login_response(&self, response: &login::Response) -> Result<()> { - if self.use_discovery_response { + if self.inner.use_discovery_response { if let Some(well_known) = &response.well_known { if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) { self.set_homeserver(homeserver).await; @@ -1174,7 +1220,7 @@ impl Client { } } - self.base_client.receive_login_response(response).await?; + self.inner.base_client.receive_login_response(response).await?; Ok(()) } @@ -1237,7 +1283,7 @@ impl Client { /// /// [`login`]: #method.login pub async fn restore_login(&self, session: Session) -> Result<()> { - Ok(self.base_client.restore_login(session).await?) + Ok(self.inner.base_client.restore_login(session).await?) } /// Register a user to the server. @@ -1284,8 +1330,8 @@ impl Client { let homeserver = self.homeserver().await; info!("Registering to {}", homeserver); - let config = if self.appservice_mode { - Some(self.http_client.request_config.force_auth()) + let config = if self.inner.appservice_mode { + Some(self.inner.http_client.request_config.force_auth()) } else { None }; @@ -1350,14 +1396,14 @@ impl Client { filter_name: &str, definition: FilterDefinition<'_>, ) -> Result { - if let Some(filter) = self.base_client.get_filter(filter_name).await? { + if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? { Ok(filter) } else { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = FilterUploadRequest::new(&user_id, definition); let response = self.send(request, None).await?; - self.base_client.receive_filter_upload(filter_name, &response).await?; + self.inner.base_client.receive_filter_upload(filter_name, &response).await?; Ok(response.filter_id) } @@ -1511,7 +1557,7 @@ impl Client { /// for room in response.chunk { /// println!("Found room {:?}", room); /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn public_rooms_filtered( &self, @@ -1569,8 +1615,8 @@ impl Client { content_type: Some(content_type.essence_str()), }); - let request_config = self.http_client.request_config.timeout(timeout); - Ok(self.http_client.upload(request, Some(request_config)).await?) + let request_config = self.inner.http_client.request_config.timeout(timeout); + Ok(self.inner.http_client.upload(request, Some(request_config)).await?) } /// Send an arbitrary request to the server, without updating client state. @@ -1611,7 +1657,7 @@ impl Client { /// /// // Check the corresponding Response struct to find out what types are /// // returned - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send( &self, @@ -1622,7 +1668,7 @@ impl Client { Request: OutgoingRequest + Debug, HttpError: From>, { - Ok(self.http_client.send(request, config).await?) + Ok(self.inner.http_client.send(request, config).await?) } /// Get information of all our own devices. @@ -1646,7 +1692,7 @@ impl Client { /// device.display_name.as_deref().unwrap_or("") /// ); /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn devices(&self) -> HttpResult { let request = get_devices::Request::new(); @@ -1699,7 +1745,7 @@ impl Client { /// .await?; /// } /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); pub async fn delete_devices( &self, devices: &[DeviceIdBox], @@ -1711,135 +1757,6 @@ impl Client { self.send(request, None).await } - pub(crate) async fn process_sync( - &self, - response: sync_events::Response, - ) -> Result { - let response = self.base_client.receive_sync_response(response).await?; - let SyncResponse { - next_batch: _, - rooms, - presence, - account_data, - to_device: _, - device_lists: _, - device_one_time_keys_count: _, - ambiguity_changes: _, - notifications, - } = &response; - - self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?; - self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?; - - for (room_id, room_info) in &rooms.join { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } = - room_info; - - self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?; - self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) - .await?; - self.handle_sync_state_events(&room, &state.events).await?; - self.handle_sync_timeline_events(&room, &timeline.events).await?; - } - - for (room_id, room_info) in &rooms.leave { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - let LeftRoom { timeline, state, account_data } = room_info; - - self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) - .await?; - self.handle_sync_state_events(&room, &state.events).await?; - self.handle_sync_timeline_events(&room, &timeline.events).await?; - } - - for (room_id, room_info) in &rooms.invite { - let room = self.get_room(room_id); - if room.is_none() { - error!("Can't call event handler, room {} not found", room_id); - continue; - } - - // FIXME: Destructure room_info - self.handle_sync_events( - EventKind::StrippedState, - &room, - &room_info.invite_state.events, - ) - .await?; - } - - // Construct notification event handler futures - let mut futures = Vec::new(); - for handler in &*self.notification_handlers.read().await { - for (room_id, room_notifications) in notifications { - let room = match self.get_room(room_id) { - Some(room) => room, - None => { - warn!("Can't call notification handler, room {} not found", room_id); - continue; - } - }; - - futures.extend(room_notifications.iter().map(|notification| { - (handler)(notification.clone(), room.clone(), self.clone()) - })); - } - } - - // Run the notification handler futures with the - // `self.notification_handlers` lock no longer being held, in order. - for fut in futures { - fut.await; - } - - Ok(response) - } - - async fn sync_loop_helper( - &self, - sync_settings: &mut crate::config::SyncSettings<'_>, - ) -> Result { - let response = self.sync_once(sync_settings.clone()).await; - - match response { - Ok(r) => { - sync_settings.token = Some(r.next_batch.clone()); - Ok(r) - } - Err(e) => { - error!("Received an invalid response: {}", e); - sleep::new(Duration::from_secs(1)).await; - Err(e) - } - } - } - - async fn delay_sync(last_sync_time: &mut Option) { - let now = Instant::now(); - - // If the last sync happened less than a second ago, sleep for a - // while to not hammer out requests if the server doesn't respect - // the sync timeout. - if let Some(t) = last_sync_time { - if now - *t <= Duration::from_secs(1) { - sleep::new(Duration::from_secs(1)).await; - } - } - - *last_sync_time = Some(now); - } - /// Synchronize the client's state with the latest state on the server. /// /// ## Syncing Events @@ -1917,7 +1834,7 @@ impl Client { /// // Now keep on syncing forever. `sync()` will use the stored sync token /// // from our `sync_once()` call automatically. /// client.sync(SyncSettings::default()).await; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`sync`]: #method.sync @@ -1944,9 +1861,9 @@ impl Client { timeout: sync_settings.timeout, }); - let request_config = self.http_client.request_config.timeout( + let request_config = self.inner.http_client.request_config.timeout( sync_settings.timeout.unwrap_or_else(|| Duration::from_secs(0)) - + self.http_client.request_config.timeout, + + self.inner.http_client.request_config.timeout, ); let response = self.send(request, Some(request_config)).await?; @@ -1957,7 +1874,7 @@ impl Client { error!(error =? e, "Error while sending outgoing E2EE requests"); }; - self.sync_beat.notify(usize::MAX); + self.inner.sync_beat.notify(usize::MAX); Ok(response) } @@ -2008,7 +1925,7 @@ impl Client { /// // Now keep on syncing forever. `sync()` will use the latest sync token /// // automatically. /// client.sync(SyncSettings::default()).await; - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [argument docs]: #method.sync_once @@ -2137,7 +2054,7 @@ impl Client { /// } /// } /// - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` #[instrument] pub async fn sync_stream<'a>( @@ -2162,7 +2079,7 @@ impl Client { /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { - self.base_client.sync_token().await + self.inner.base_client.sync_token().await } /// Get a media file's content. @@ -2181,7 +2098,7 @@ impl Client { use_cache: bool, ) -> Result> { let content = if use_cache { - self.base_client.store().get_media_content(request).await? + self.inner.base_client.store().get_media_content(request).await? } else { None }; @@ -2225,7 +2142,7 @@ impl Client { }; if use_cache { - self.base_client.store().add_media_content(request, content.clone()).await?; + self.inner.base_client.store().add_media_content(request, content.clone()).await?; } Ok(content) @@ -2238,7 +2155,7 @@ impl Client { /// /// * `request` - The `MediaRequest` of the content. pub async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> { - Ok(self.base_client.store().remove_media_content(request).await?) + Ok(self.inner.base_client.store().remove_media_content(request).await?) } /// Delete all the media content corresponding to the given @@ -2248,7 +2165,7 @@ impl Client { /// /// * `uri` - The `MxcUri` of the files. pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - Ok(self.base_client.store().remove_media_content_for_uri(uri).await?) + Ok(self.inner.base_client.store().remove_media_content_for_uri(uri).await?) } /// Get the file of the given media event content. @@ -2767,7 +2684,7 @@ pub(crate) mod test { .add_state_event(EventsJson::PowerLevels) .build_sync_response(); - client.base_client.receive_sync_response(response).await.unwrap(); + client.inner.base_client.receive_sync_response(response).await.unwrap(); let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); assert_eq!(client.homeserver().await, Url::parse(&mockito::server_url()).unwrap()); diff --git a/crates/matrix-sdk/src/config/client.rs b/crates/matrix-sdk/src/config/client.rs index 2bdc64e40..1c0260686 100644 --- a/crates/matrix-sdk/src/config/client.rs +++ b/crates/matrix-sdk/src/config/client.rs @@ -89,7 +89,7 @@ impl ClientConfig { /// let client_config = ClientConfig::new() /// .proxy("http://localhost:8080")?; /// - /// # matrix_sdk::Result::Ok(()) + /// # Result::<_, matrix_sdk::Error>::Ok(()) /// ``` #[cfg(not(target_arch = "wasm32"))] pub fn proxy(mut self, proxy: &str) -> Result { @@ -104,7 +104,7 @@ impl ClientConfig { } /// Set a custom HTTP user agent for the client. - pub fn user_agent(mut self, user_agent: &str) -> std::result::Result { + pub fn user_agent(mut self, user_agent: &str) -> Result { self.user_agent = Some(HeaderValue::from_str(user_agent)?); Ok(self) } diff --git a/crates/matrix-sdk/src/encryption/identities/devices.rs b/crates/matrix-sdk/src/encryption/identities/devices.rs index 83bf626ee..904b855d4 100644 --- a/crates/matrix-sdk/src/encryption/identities/devices.rs +++ b/crates/matrix-sdk/src/encryption/identities/devices.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ops::Deref, result::Result as StdResult}; +use std::ops::Deref; use matrix_sdk_base::crypto::{ store::CryptoStoreError, Device as BaseDevice, LocalTrust, ReadOnlyDevice, @@ -250,7 +250,7 @@ impl Device { /// } /// # anyhow::Result::<()>::Ok(()) }); /// ``` - pub async fn verify(&self) -> std::result::Result<(), ManualVerifyError> { + pub async fn verify(&self) -> Result<(), ManualVerifyError> { let request = self.inner.verify().await?; self.client.send(request, None).await?; @@ -391,10 +391,7 @@ impl Device { /// # Arguments /// /// * `trust_state` - The new trust state that should be set for the device. - pub async fn set_local_trust( - &self, - trust_state: LocalTrust, - ) -> StdResult<(), CryptoStoreError> { + pub async fn set_local_trust(&self, trust_state: LocalTrust) -> Result<(), CryptoStoreError> { self.inner.set_local_trust(trust_state).await } } diff --git a/crates/matrix-sdk/src/encryption/identities/users.rs b/crates/matrix-sdk/src/encryption/identities/users.rs index 4ae3f047d..3c5fe30cf 100644 --- a/crates/matrix-sdk/src/encryption/identities/users.rs +++ b/crates/matrix-sdk/src/encryption/identities/users.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{result::Result, sync::Arc}; +use std::sync::Arc; use matrix_sdk_base::{ crypto::{ diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 7a33b7e8c..8f599d6e0 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -295,7 +295,7 @@ impl Client { /// called the fingerprint of the device. #[cfg(feature = "encryption")] pub async fn ed25519_key(&self) -> Option { - self.base_client.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned()) + self.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned()) } /// Get the status of the private cross signing keys. @@ -304,7 +304,7 @@ impl Client { /// stored locally. #[cfg(feature = "encryption")] pub async fn cross_signing_status(&self) -> Option { - if let Some(machine) = self.base_client.olm_machine().await { + if let Some(machine) = self.olm_machine().await { Some(machine.cross_signing_status().await) } else { None @@ -317,13 +317,13 @@ impl Client { /// capable devices up to date. #[cfg(feature = "encryption")] pub async fn tracked_users(&self) -> HashSet { - self.base_client.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default() + self.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default() } /// Get a verification object with the given flow id. #[cfg(feature = "encryption")] pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option { - let olm = self.base_client.olm_machine().await?; + let olm = self.olm_machine().await?; olm.get_verification(user_id, flow_id).map(|v| match v { matrix_sdk_base::crypto::Verification::SasV1(s) => { SasVerification { inner: s, client: self.clone() }.into() @@ -343,7 +343,7 @@ impl Client { user_id: &UserId, flow_id: impl AsRef, ) -> Option { - let olm = self.base_client.olm_machine().await?; + let olm = self.olm_machine().await?; olm.get_verification_request(user_id, flow_id) .map(|r| VerificationRequest { inner: r, client: self.clone() }) @@ -388,7 +388,7 @@ impl Client { user_id: &UserId, device_id: &DeviceId, ) -> StdResult, CryptoStoreError> { - let device = self.base_client.get_device(user_id, device_id).await?; + let device = self.base_client().get_device(user_id, device_id).await?; Ok(device.map(|d| Device { inner: d, client: self.clone() })) } @@ -425,7 +425,7 @@ impl Client { &self, user_id: &UserId, ) -> StdResult { - let devices = self.base_client.get_user_devices(user_id).await?; + let devices = self.base_client().get_user_devices(user_id).await?; Ok(UserDevices { inner: devices, client: self.clone() }) } @@ -468,7 +468,7 @@ impl Client { ) -> StdResult, CryptoStoreError> { use crate::encryption::identities::UserIdentity; - if let Some(olm) = self.base_client.olm_machine().await { + if let Some(olm) = self.olm_machine().await { let identity = olm.get_identity(user_id).await?; Ok(identity.map(|i| match i { @@ -526,7 +526,7 @@ impl Client { /// # anyhow::Result::<()>::Ok(()) }); #[cfg(feature = "encryption")] pub async fn bootstrap_cross_signing(&self, auth_data: Option>) -> Result<()> { - let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?; + let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?; let (request, signature_request) = olm.bootstrap_cross_signing(false).await?; @@ -601,7 +601,7 @@ impl Client { passphrase: &str, predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool, ) -> Result<()> { - let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?; + let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?; let keys = olm.export_keys(predicate).await?; let passphrase = zeroize::Zeroizing::new(passphrase.to_owned()); @@ -661,7 +661,7 @@ impl Client { path: PathBuf, passphrase: &str, ) -> StdResult { - let olm = self.base_client.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?; + let olm = self.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?; let passphrase = zeroize::Zeroizing::new(passphrase.to_owned()); let decrypt = move || { @@ -684,7 +684,7 @@ impl Client { ) -> serde_json::Result { use ruma::serde::JsonObject; - if let Some(machine) = self.base_client.olm_machine().await { + if let Some(machine) = self.olm_machine().await { if let AnyRoomEvent::Message(event) = event { if let AnyMessageEvent::RoomEncrypted(_) = event { let room_id = event.room_id(); @@ -727,7 +727,7 @@ impl Client { let request = assign!(get_keys::Request::new(), { device_keys }); let response = self.send(request, None).await?; - self.base_client.mark_request_as_sent(request_id, &response).await?; + self.mark_request_as_sent(request_id, &response).await?; Ok(response) } @@ -844,7 +844,7 @@ impl Client { if let Some(room) = self.get_joined_room(&response.room_id) { Ok(Some(room)) } else { - self.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); + self.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); Ok(self.get_joined_room(&response.room_id)) } } @@ -860,11 +860,11 @@ impl Client { &self, users: impl Iterator, ) -> Result<()> { - let _lock = self.key_claim_lock.lock().await; + let _lock = self.inner.key_claim_lock.lock().await; - if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? { + if let Some((request_id, request)) = self.base_client().get_missing_sessions(users).await? { let response = self.send(request, None).await?; - self.base_client.mark_request_as_sent(&request_id, &response).await?; + self.mark_request_as_sent(&request_id, &response).await?; } Ok(()) @@ -893,7 +893,7 @@ impl Client { ); let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(request_id, &response).await?; + self.mark_request_as_sent(request_id, &response).await?; Ok(response) } @@ -971,23 +971,23 @@ impl Client { } OutgoingRequests::ToDeviceRequest(request) => { let response = self.send_to_device(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::SignatureUpload(request) => { let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::RoomMessage(request) => { let response = self.room_send_helper(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::KeysClaim(request) => { let response = self.send(request.clone(), None).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } OutgoingRequests::KeysBackup(request) => { let response = self.send_backup_request(request).await?; - self.base_client.mark_request_as_sent(r.request_id(), &response).await?; + self.mark_request_as_sent(r.request_id(), &response).await?; } } @@ -1015,7 +1015,7 @@ impl Client { warn!("Error while claiming one-time keys {:?}", e); } - let outgoing_requests = stream::iter(self.base_client.outgoing_requests().await?) + let outgoing_requests = stream::iter(self.base_client().outgoing_requests().await?) .map(|r| self.send_outgoing_request(r)); let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS); diff --git a/crates/matrix-sdk/src/encryption/verification/qrcode.rs b/crates/matrix-sdk/src/encryption/verification/qrcode.rs index 78b80e807..87322ff3d 100644 --- a/crates/matrix-sdk/src/encryption/verification/qrcode.rs +++ b/crates/matrix-sdk/src/encryption/verification/qrcode.rs @@ -71,7 +71,7 @@ impl QrVerification { /// /// The [`to_bytes()`](#method.to_bytes) method can be used to instead /// output the raw bytes that should be encoded as a QR code. - pub fn to_qr_code(&self) -> std::result::Result { + pub fn to_qr_code(&self) -> Result { self.inner.to_qr_code() } @@ -80,7 +80,7 @@ impl QrVerification { /// /// The [`to_qr_code()`](#method.to_qr_code) method can be used to instead /// output a `QrCode` object that can be rendered. - pub fn to_bytes(&self) -> std::result::Result, EncodingError> { + pub fn to_bytes(&self) -> Result, EncodingError> { self.inner.to_bytes() } diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index 33ab68f0c..cd02986a3 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -40,7 +40,7 @@ use thiserror::Error; use url::ParseError as UrlParseError; /// Result type of the matrix-sdk. -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// Result type of a pure HTTP request. pub type HttpResult = std::result::Result; diff --git a/crates/matrix-sdk/src/event_handler.rs b/crates/matrix-sdk/src/event_handler.rs index ee48a053a..97993eed7 100644 --- a/crates/matrix-sdk/src/event_handler.rs +++ b/crates/matrix-sdk/src/event_handler.rs @@ -186,8 +186,7 @@ pub struct Ctx(pub T); impl EventHandlerContext for Ctx { fn from_data(data: &EventHandlerData<'_>) -> Option { - let anymap = data.client.event_handler_data.read().unwrap(); - Some(Ctx(anymap.get::()?.clone())) + data.client.event_handler_context::().map(Ctx) } } @@ -330,8 +329,7 @@ impl Client { // Construct event handler futures let futures: Vec<_> = self - .event_handlers - .read() + .event_handlers() .await .get(&event_handler_id) .into_iter() diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index a9863acd8..2adca293f 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -53,8 +53,8 @@ pub mod event_handler; mod http_client; /// High-level room API pub mod room; -/// High-level room API mod room_member; +mod sync; #[cfg(feature = "encryption")] pub mod encryption; diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index 6c51cb532..f36221861 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -168,14 +168,17 @@ impl Common { pub(crate) async fn request_members(&self) -> Result> { if let Some(mutex) = - self.client.members_request_locks.get(self.inner.room_id()).map(|m| m.clone()) + self.client.inner.members_request_locks.get(self.inner.room_id()).map(|m| m.clone()) { mutex.lock().await; Ok(None) } else { let mutex = Arc::new(Mutex::new(())); - self.client.members_request_locks.insert(self.inner.room_id().clone(), mutex.clone()); + self.client + .inner + .members_request_locks + .insert(self.inner.room_id().clone(), mutex.clone()); let _guard = mutex.lock().await; @@ -183,9 +186,9 @@ impl Common { let response = self.client.send(request, None).await?; let response = - self.client.base_client.receive_members(self.inner.room_id(), &response).await?; + self.client.base_client().receive_members(self.inner.room_id(), &response).await?; - self.client.members_request_locks.remove(self.inner.room_id()); + self.client.inner.members_request_locks.remove(self.inner.room_id()); Ok(Some(response)) } diff --git a/crates/matrix-sdk/src/room/joined.rs b/crates/matrix-sdk/src/room/joined.rs index ce9e64361..8ae09561c 100644 --- a/crates/matrix-sdk/src/room/joined.rs +++ b/crates/matrix-sdk/src/room/joined.rs @@ -170,37 +170,39 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.typing_notice(true).await? /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn typing_notice(&self, typing: bool) -> Result<()> { // Only send a request to the homeserver if the old timeout has elapsed // or the typing notice changed state within the // TYPING_NOTICE_TIMEOUT - let send = - if let Some(typing_time) = self.client.typing_notice_times.get(self.inner.room_id()) { - if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT { - // We always reactivate the typing notice if typing is true or - // we may need to deactivate it if it's - // currently active if typing is false - typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT - } else { - // Only send a request when we need to deactivate typing - !typing - } + let send = if let Some(typing_time) = + self.client.inner.typing_notice_times.get(self.inner.room_id()) + { + if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT { + // We always reactivate the typing notice if typing is true or + // we may need to deactivate it if it's + // currently active if typing is false + typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT } else { - // Typing notice is currently deactivated, therefore, send a request - // only when it's about to be activated - typing - }; + // Only send a request when we need to deactivate typing + !typing + } + } else { + // Typing notice is currently deactivated, therefore, send a request + // only when it's about to be activated + typing + }; if send { let typing = if typing { self.client + .inner .typing_notice_times .insert(self.inner.room_id().clone(), Instant::now()); Typing::Yes(TYPING_NOTICE_TIMEOUT) } else { - self.client.typing_notice_times.remove(self.inner.room_id()); + self.client.inner.typing_notice_times.remove(self.inner.room_id()); Typing::No }; @@ -278,7 +280,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.enable_encryption().await? /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn enable_encryption(&self) -> Result<()> { use ruma::{ @@ -294,7 +296,7 @@ impl Joined { // TODO do we want to return an error here if we time out? This // could be quite useful if someone wants to enable encryption and // send a message right after it's enabled. - self.client.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); + self.client.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME); } Ok(()) @@ -311,7 +313,7 @@ impl Joined { // TODO expose this publicly so people can pre-share a group session if // e.g. a user starts to type a message for a room. if let Some(mutex) = - self.client.group_session_locks.get(self.inner.room_id()).map(|m| m.clone()) + self.client.inner.group_session_locks.get(self.inner.room_id()).map(|m| m.clone()) { // If a group session share request is already going on, // await the release of the lock. @@ -320,7 +322,10 @@ impl Joined { // Otherwise create a new lock and share the group // session. let mutex = Arc::new(Mutex::new(())); - self.client.group_session_locks.insert(self.inner.room_id().clone(), mutex.clone()); + self.client + .inner + .group_session_locks + .insert(self.inner.room_id().clone(), mutex.clone()); let _guard = mutex.lock().await; @@ -334,13 +339,13 @@ impl Joined { let response = self.share_group_session().await; - self.client.group_session_locks.remove(self.inner.room_id()); + self.client.inner.group_session_locks.remove(self.inner.room_id()); // If one of the responses failed invalidate the group // session as using it would end up in undecryptable // messages. if let Err(r) = response { - self.client.base_client.invalidate_group_session(self.inner.room_id()).await?; + self.client.base_client().invalidate_group_session(self.inner.room_id()).await?; return Err(r); } } @@ -357,12 +362,12 @@ impl Joined { #[cfg(feature = "encryption")] async fn share_group_session(&self) -> Result<()> { let mut requests = - self.client.base_client.share_group_session(self.inner.room_id()).await?; + self.client.base_client().share_group_session(self.inner.room_id()).await?; for request in requests.drain(..) { let response = self.client.send_to_device(&request).await?; - self.client.base_client.mark_request_as_sent(&request.txn_id, &response).await?; + self.client.mark_request_as_sent(&request.txn_id, &response).await?; } Ok(()) @@ -449,7 +454,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send(content, Some(txn_id)).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent @@ -517,7 +522,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send_raw(content, "m.room.message", None).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` /// /// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent @@ -554,8 +559,7 @@ impl Joined { self.preshare_group_session().await?; - let olm = - self.client.base_client.olm_machine().await.expect("Olm machine wasn't started"); + let olm = self.client.olm_machine().await.expect("Olm machine wasn't started"); let encrypted_content = olm.encrypt_raw(self.inner.room_id(), content, event_type).await?; @@ -631,7 +635,7 @@ impl Joined { /// None, /// ).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send_attachment( &self, @@ -698,7 +702,7 @@ impl Joined { /// if let Some(room) = client.get_joined_room(&room_id) { /// room.send_state_event(content, "").await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn send_state_event( &self, @@ -791,7 +795,7 @@ impl Joined { /// let reason = Some("Indecent material"); /// room.redact(&event_id, reason, None).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn redact( &self, @@ -834,7 +838,7 @@ impl Joined { /// /// room.set_tag("u.work", tag_info ).await?; /// } - /// # matrix_sdk::Result::Ok(()) }); + /// # Result::<_, matrix_sdk::Error>::Ok(()) }); /// ``` pub async fn set_tag(&self, tag: &str, tag_info: TagInfo) -> HttpResult { let user_id = self.client.user_id().await.ok_or(HttpError::AuthenticationRequired)?; diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs new file mode 100644 index 000000000..398fbd914 --- /dev/null +++ b/crates/matrix-sdk/src/sync.rs @@ -0,0 +1,144 @@ +use std::time::Duration; + +use futures_timer::Delay as sleep; +use matrix_sdk_base::{ + deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse}, + instant::Instant, +}; +use ruma::api::client::r0::sync::sync_events; +use tracing::{error, warn}; + +use crate::{event_handler::EventKind, Client, Result}; + +/// Internal functionality related to getting events from the server +/// (`sync_events` endpoint) +impl Client { + pub(crate) async fn process_sync( + &self, + response: sync_events::Response, + ) -> Result { + let response = self.base_client().receive_sync_response(response).await?; + let SyncResponse { + next_batch: _, + rooms, + presence, + account_data, + to_device: _, + device_lists: _, + device_one_time_keys_count: _, + ambiguity_changes: _, + notifications, + } = &response; + + self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?; + self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?; + + for (room_id, room_info) in &rooms.join { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } = + room_info; + + self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?; + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; + } + + for (room_id, room_info) in &rooms.leave { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let LeftRoom { timeline, state, account_data } = room_info; + + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; + } + + for (room_id, room_info) in &rooms.invite { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + // FIXME: Destructure room_info + self.handle_sync_events( + EventKind::StrippedState, + &room, + &room_info.invite_state.events, + ) + .await?; + } + + // Construct notification event handler futures + let mut futures = Vec::new(); + for handler in &*self.notification_handlers().await { + for (room_id, room_notifications) in notifications { + let room = match self.get_room(room_id) { + Some(room) => room, + None => { + warn!("Can't call notification handler, room {} not found", room_id); + continue; + } + }; + + futures.extend(room_notifications.iter().map(|notification| { + (handler)(notification.clone(), room.clone(), self.clone()) + })); + } + } + + // Run the notification handler futures with the + // `self.notification_handlers` lock no longer being held, in order. + for fut in futures { + fut.await; + } + + Ok(response) + } + + pub(crate) async fn sync_loop_helper( + &self, + sync_settings: &mut crate::config::SyncSettings<'_>, + ) -> Result { + let response = self.sync_once(sync_settings.clone()).await; + + match response { + Ok(r) => { + sync_settings.token = Some(r.next_batch.clone()); + Ok(r) + } + Err(e) => { + error!("Received an invalid response: {}", e); + sleep::new(Duration::from_secs(1)).await; + Err(e) + } + } + } + + pub(crate) async fn delay_sync(last_sync_time: &mut Option) { + let now = Instant::now(); + + // If the last sync happened less than a second ago, sleep for a + // while to not hammer out requests if the server doesn't respect + // the sync timeout. + if let Some(t) = last_sync_time { + if now - *t <= Duration::from_secs(1) { + sleep::new(Duration::from_secs(1)).await; + } + } + + *last_sync_time = Some(now); + } +}