diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index b2e7e4994..127caa85d 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file. ### Features +- Add `ignore_timeout_on_first_sync` to the `SyncSettings`, which should allow to have a quicker + first response when using one of the `sync`, `sync_with_callback`, `sync_with_result_callback` + or `sync_stream` methods on `Client`, if the response is empty. + ([#5481](https://github.com/matrix-org/matrix-rust-sdk/pull/5481)) - The methods to use the `/v3/sync` endpoint set the `use_state_after` field, which means that, if the server supports it, the response will contain the state changes between the last sync and the end of the timeline. diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 56faced5d..73d41274d 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -2533,30 +2533,21 @@ impl Client { #[instrument(skip(self, callback))] pub async fn sync_with_result_callback( &self, - mut sync_settings: crate::config::SyncSettings, + sync_settings: crate::config::SyncSettings, callback: impl Fn(Result) -> C, ) -> Result<(), Error> where C: Future>, { - let mut last_sync_time: Option = None; - - if sync_settings.token.is_none() { - sync_settings.token = self.sync_token().await; - } - - loop { - trace!("Syncing"); - let result = self.sync_loop_helper(&mut sync_settings).await; + let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await); + while let Some(result) = sync_stream.next().await { trace!("Running callback"); if callback(result).await? == LoopCtrl::Break { trace!("Callback told us to stop"); break; } trace!("Done running callback"); - - Client::delay_sync(&mut last_sync_time).await } Ok(()) @@ -2609,6 +2600,8 @@ impl Client { &self, mut sync_settings: crate::config::SyncSettings, ) -> impl Stream> + '_ { + let mut is_first_sync = true; + let mut timeout = None; let mut last_sync_time: Option = None; if sync_settings.token.is_none() { @@ -2617,13 +2610,28 @@ impl Client { let parent_span = Span::current(); - async_stream::stream! { + async_stream::stream!({ loop { - yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await; + trace!("Syncing"); + + if sync_settings.ignore_timeout_on_first_sync { + if is_first_sync { + timeout = sync_settings.timeout.take(); + } else if sync_settings.timeout.is_none() && timeout.is_some() { + sync_settings.timeout = timeout.take(); + } + + is_first_sync = false; + } + + yield self + .sync_loop_helper(&mut sync_settings) + .instrument(parent_span.clone()) + .await; Client::delay_sync(&mut last_sync_time).await } - } + }) } /// Get the current, if any, sync token of the client. @@ -2932,7 +2940,7 @@ pub(crate) mod tests { use assert_matches::assert_matches; use assert_matches2::assert_let; use eyeball::SharedObservable; - use futures_util::{pin_mut, FutureExt}; + use futures_util::{pin_mut, FutureExt, StreamExt}; use js_int::{uint, UInt}; use matrix_sdk_base::{ store::{MemoryStore, StoreConfig}, @@ -2970,8 +2978,9 @@ pub(crate) mod tests { use super::Client; use crate::{ + assert_let_timeout, client::{futures::SendMediaUploadRequest, WeakClient}, - config::RequestConfig, + config::{RequestConfig, SyncSettings}, futures::SendRequest, media::MediaError, test_utils::{client::MockClientBuilder, mocks::MatrixMockServer}, @@ -3831,4 +3840,53 @@ pub(crate) mod tests { assert_eq!(max, uint!(1)); assert_eq!(current, UInt::new_wrapping(data.len() as u64)); } + + #[async_test] + async fn test_dont_ignore_timeout_on_first_sync() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server + .mock_sync() + .timeout(Some(Duration::from_secs(30))) + .ok(|_| {}) + .mock_once() + .named("sync_with_timeout") + .mount() + .await; + + // Call the endpoint once to check the timeout. + let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await); + assert_let_timeout!(Some(Ok(_)) = stream.next()); + } + + #[async_test] + async fn test_ignore_timeout_on_first_sync() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server + .mock_sync() + .timeout(None) + .ok(|_| {}) + .mock_once() + .named("sync_no_timeout") + .mount() + .await; + server + .mock_sync() + .timeout(Some(Duration::from_secs(30))) + .ok(|_| {}) + .mock_once() + .named("sync_with_timeout") + .mount() + .await; + + // Call each version of the endpoint once to check the timeouts. + let mut stream = Box::pin( + client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await, + ); + assert_let_timeout!(Some(Ok(_)) = stream.next()); + assert_let_timeout!(Some(Ok(_)) = stream.next()); + } } diff --git a/crates/matrix-sdk/src/config/sync.rs b/crates/matrix-sdk/src/config/sync.rs index fd3627db6..04dd27c02 100644 --- a/crates/matrix-sdk/src/config/sync.rs +++ b/crates/matrix-sdk/src/config/sync.rs @@ -25,6 +25,7 @@ pub struct SyncSettings { // Filter is pretty big at 1000 bytes, box it to reduce stack size pub(crate) filter: Option>, pub(crate) timeout: Option, + pub(crate) ignore_timeout_on_first_sync: bool, pub(crate) token: Option, pub(crate) full_state: bool, pub(crate) set_presence: PresenceState, @@ -39,10 +40,18 @@ impl Default for SyncSettings { #[cfg(not(tarpaulin_include))] impl fmt::Debug for SyncSettings { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let Self { filter, timeout, token: _, full_state, set_presence } = self; + let Self { + filter, + timeout, + ignore_timeout_on_first_sync, + token: _, + full_state, + set_presence, + } = self; f.debug_struct("SyncSettings") .maybe_field("filter", filter) .maybe_field("timeout", timeout) + .field("ignore_timeout_on_first_sync", ignore_timeout_on_first_sync) .field("full_state", full_state) .field("set_presence", set_presence) .finish() @@ -56,6 +65,7 @@ impl SyncSettings { Self { filter: None, timeout: Some(DEFAULT_SYNC_TIMEOUT), + ignore_timeout_on_first_sync: false, token: None, full_state: false, set_presence: PresenceState::Online, @@ -85,6 +95,33 @@ impl SyncSettings { self } + /// Whether to ignore the `timeout` the first time that the `/sync` endpoint + /// is called. + /// + /// If there is no new data to show, the server will wait until the end of + /// `timeout` before returning a response. It can be an undesirable + /// behavior when starting a client and informing the user that we are + /// "catching up" while waiting for the first response. + /// + /// By not setting a `timeout` on the first request to `/sync`, the + /// homeserver should reply immediately, whether the response is empty or + /// not. + /// + /// Note that this setting is ignored when calling [`Client::sync_once()`], + /// because there is no loop happening. + /// + /// # Arguments + /// + /// * `ignore` - Whether to ignore the `timeout` the first time that the + /// `/sync` endpoint is called. + /// + /// [`Client::sync_once()`]: crate::Client::sync_once + #[must_use] + pub fn ignore_timeout_on_first_sync(mut self, ignore: bool) -> Self { + self.ignore_timeout_on_first_sync = ignore; + self + } + /// Set the sync filter. /// It can be either the filter ID, or the definition for the filter. /// diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index ef5dcf2ed..5d5d851ef 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -50,7 +50,10 @@ use serde::Deserialize; use serde_json::{from_value, json, Value}; use tokio::sync::oneshot::{self, Receiver}; use wiremock::{ - matchers::{body_json, body_partial_json, header, method, path, path_regex, query_param}, + matchers::{ + body_json, body_partial_json, header, method, path, path_regex, query_param, + query_param_is_missing, + }, Mock, MockBuilder, MockGuard, MockServer, Request, Respond, ResponseTemplate, Times, }; @@ -342,7 +345,6 @@ impl MatrixMockServer { mock, SyncEndpoint { sync_response_builder: self.sync_response_builder.clone() }, ) - .expect_default_access_token() } /// Creates a prebuilt mock for joining a room. @@ -2314,7 +2316,30 @@ pub struct SyncEndpoint { sync_response_builder: Arc>, } -impl MockEndpoint<'_, SyncEndpoint> { +impl<'a> MockEndpoint<'a, SyncEndpoint> { + /// Expect the given timeout, or lack thereof, in the request. + pub fn timeout(mut self, timeout: Option) -> Self { + if let Some(timeout) = timeout { + self.mock = self.mock.and(query_param("timeout", timeout.as_millis().to_string())); + } else { + self.mock = self.mock.and(query_param_is_missing("timeout")); + } + + self + } + + /// Mocks the sync endpoint, using the given function to generate the + /// response. + pub fn ok(self, func: F) -> MatrixMock<'a> { + let json_response = { + let mut builder = self.endpoint.sync_response_builder.lock().unwrap(); + func(&mut builder); + builder.build_json_sync_response() + }; + + self.respond_with(ResponseTemplate::new(200).set_body_json(json_response)) + } + /// Temporarily mocks the sync with the given endpoint and runs a client /// sync with it. /// @@ -2346,17 +2371,7 @@ impl MockEndpoint<'_, SyncEndpoint> { /// # anyhow::Ok(()) }); /// ``` pub async fn ok_and_run(self, client: &Client, func: F) { - let json_response = { - let mut builder = self.endpoint.sync_response_builder.lock().unwrap(); - func(&mut builder); - builder.build_json_sync_response() - }; - - let _scope = self - .mock - .respond_with(ResponseTemplate::new(200).set_body_json(json_response)) - .mount_as_scoped(self.server) - .await; + let _scope = self.ok(func).mount_as_scoped().await; let _response = client.sync_once(Default::default()).await.unwrap(); }