diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 3e97ecb02..4a03b138e 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -49,6 +49,7 @@ thiserror = "1.0.25" tracing = "0.1.26" url = "2.2.2" zeroize = "1.3.0" +async-stream = "0.3.2" [dependencies.matrix-sdk-base] version = "0.4.0" diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index a63df87e9..40c9f109b 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -1664,6 +1664,40 @@ impl Client { Ok(response) } + async fn sync_loop_helper( + &self, + sync_settings: &mut crate::config::SyncSettings<'_>, + ) -> Result { + let response = self.sync_once(sync_settings.clone()).await; + + match response { + Ok(r) => { + sync_settings.token = Some(r.next_batch.clone()); + Ok(r) + } + Err(e) => { + error!("Received an invalid response: {}", e); + sleep::new(Duration::from_secs(1)).await; + Err(e) + } + } + } + + async fn delay_sync(last_sync_time: &mut Option) { + let now = Instant::now(); + + // If the last sync happened less than a second ago, sleep for a + // while to not hammer out requests if the server doesn't respect + // the sync timeout. + if let Some(t) = last_sync_time { + if now - *t <= Duration::from_secs(1) { + sleep::new(Duration::from_secs(1)).await; + } + } + + *last_sync_time = Some(now); + } + /// Synchronize the client's state with the latest state on the server. /// /// ## Syncing Events @@ -1799,9 +1833,9 @@ impl Client { /// /// This method can be used with the [`Client::register_event_handler`] /// method to react to individual events. If you instead wish to handle - /// events in a bulk manner the [`Client::sync_with_callback`] - /// methods can be used instead. This method repeadetly returns the whole - /// sync response. + /// events in a bulk manner the [`Client::sync_with_callback`] and + /// [`Client::sync_stream`] methods can be used instead. Those two methods + /// repeadetly return the whole sync response. /// /// # Arguments /// @@ -1918,38 +1952,85 @@ impl Client { } loop { - let response = self.sync_once(sync_settings.clone()).await; - - let response = match response { - Ok(r) => r, - Err(e) => { - error!("Received an invalid response: {}", e); - sleep::new(Duration::from_secs(1)).await; - continue; + // TODO we should abort the sync loop if the error is a storage error or + // the access token got invalid. + if let Ok(r) = self.sync_loop_helper(&mut sync_settings).await { + if callback(r).await == LoopCtrl::Break { + return; } - }; - - if callback(response).await == LoopCtrl::Break { - return; + } else { + continue; } - let now = Instant::now(); + Client::delay_sync(&mut last_sync_time).await + } + } - // If the last sync happened less than a second ago, sleep for a - // while to not hammer out requests if the server doesn't respect - // the sync timeout. - if let Some(t) = last_sync_time { - if now - t <= Duration::from_secs(1) { - sleep::new(Duration::from_secs(1)).await; - } + //// Repeatedly synchronize the client state with the server. + /// + /// This method will internally call [`Client::sync_once`] in a loop and is + /// equivalent to the [`Client::sync`] method but the resopnses are provided + /// as an async stream. + /// + /// # Arguments + /// + /// * `sync_settings` - Settings for the sync call. *Note* that those + /// settings will be only used for the first sync call. See the argument + /// docs for [`Client::sync_once`] for more info. + /// + /// # Examples + /// + /// ```no_run + /// # use url::Url; + /// # use futures::executor::block_on; + /// # block_on(async { + /// # let homeserver = Url::parse("http://localhost:8080")?; + /// # let username = ""; + /// # let password = ""; + /// use futures::stream::TryStreamExt; + /// use matrix_sdk::{ + /// Client, config::SyncSettings, + /// ruma::events::{SyncMessageEvent, room::message::MessageEventContent}, + /// }; + /// + /// let client = Client::new(homeserver)?; + /// client.login(&username, &password, None, None).await?; + /// + /// let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await); + /// + /// for response in sync_stream.try_next().await? { + /// for room in response.rooms.join.values() { + /// for e in &room.timeline.events { + /// if let Ok(event) = e.event.deserialize() { + /// println!("Received event {:?}", event); + /// } + /// } + /// } + /// } + /// + /// # matrix_sdk::Result::Ok(()) }); + /// ``` + #[instrument] + pub async fn sync_stream<'a>( + &'a self, + mut sync_settings: crate::config::SyncSettings<'a>, + ) -> impl futures::stream::Stream> + 'a { + let mut last_sync_time: Option = None; + + if sync_settings.token.is_none() { + sync_settings.token = self.sync_token().await; + } + + // TODO we should only abort the sync loop if the error is a storage error or + // the access token got invalid. + async_stream::try_stream! { + loop { + let response = self.sync_loop_helper(&mut sync_settings).await?; + + yield response; + + Client::delay_sync(&mut last_sync_time).await } - - last_sync_time = Some(now); - - sync_settings.token = - Some(self.sync_token().await.expect("No sync token found after initial sync")); - - self.sync_beat.notify(usize::MAX); } }