Add setting to ignore the timeout sync setting on first sync (#5481)

The `timeout` setting on the `/sync` endpoint is the maximum allowed
time for the server to send its response, because this is a poll-based
API. It means that if there is no new data to show, the server will wait
until the end of `timeout` before returning a response.

It can be an undesirable behavior when starting a client and informing
the user that we are "catching up" while waiting for the first response.

By not setting a `timeout` on the first request to `/sync`, the
homeserver should reply immediately, whether the response is empty or
not.

---------

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
Signed-off-by: Ivan Enderlin <ivan@mnt.io>
Co-authored-by: Ivan Enderlin <ivan@mnt.io>
This commit is contained in:
Kévin Commaille
2025-08-06 16:48:44 +02:00
committed by GitHub
parent feb22d4370
commit 872713c4bc
4 changed files with 146 additions and 32 deletions

View File

@@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file.
### Features
- Add `ignore_timeout_on_first_sync` to the `SyncSettings`, which should allow to have a quicker
first response when using one of the `sync`, `sync_with_callback`, `sync_with_result_callback`
or `sync_stream` methods on `Client`, if the response is empty.
([#5481](https://github.com/matrix-org/matrix-rust-sdk/pull/5481))
- The methods to use the `/v3/sync` endpoint set the `use_state_after` field,
which means that, if the server supports it, the response will contain the
state changes between the last sync and the end of the timeline.

View File

@@ -2533,30 +2533,21 @@ impl Client {
#[instrument(skip(self, callback))]
pub async fn sync_with_result_callback<C>(
&self,
mut sync_settings: crate::config::SyncSettings,
sync_settings: crate::config::SyncSettings,
callback: impl Fn(Result<SyncResponse, Error>) -> C,
) -> Result<(), Error>
where
C: Future<Output = Result<LoopCtrl, Error>>,
{
let mut last_sync_time: Option<Instant> = None;
if sync_settings.token.is_none() {
sync_settings.token = self.sync_token().await;
}
loop {
trace!("Syncing");
let result = self.sync_loop_helper(&mut sync_settings).await;
let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
while let Some(result) = sync_stream.next().await {
trace!("Running callback");
if callback(result).await? == LoopCtrl::Break {
trace!("Callback told us to stop");
break;
}
trace!("Done running callback");
Client::delay_sync(&mut last_sync_time).await
}
Ok(())
@@ -2609,6 +2600,8 @@ impl Client {
&self,
mut sync_settings: crate::config::SyncSettings,
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
let mut is_first_sync = true;
let mut timeout = None;
let mut last_sync_time: Option<Instant> = None;
if sync_settings.token.is_none() {
@@ -2617,13 +2610,28 @@ impl Client {
let parent_span = Span::current();
async_stream::stream! {
async_stream::stream!({
loop {
yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
trace!("Syncing");
if sync_settings.ignore_timeout_on_first_sync {
if is_first_sync {
timeout = sync_settings.timeout.take();
} else if sync_settings.timeout.is_none() && timeout.is_some() {
sync_settings.timeout = timeout.take();
}
is_first_sync = false;
}
yield self
.sync_loop_helper(&mut sync_settings)
.instrument(parent_span.clone())
.await;
Client::delay_sync(&mut last_sync_time).await
}
}
})
}
/// Get the current, if any, sync token of the client.
@@ -2932,7 +2940,7 @@ pub(crate) mod tests {
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball::SharedObservable;
use futures_util::{pin_mut, FutureExt};
use futures_util::{pin_mut, FutureExt, StreamExt};
use js_int::{uint, UInt};
use matrix_sdk_base::{
store::{MemoryStore, StoreConfig},
@@ -2970,8 +2978,9 @@ pub(crate) mod tests {
use super::Client;
use crate::{
assert_let_timeout,
client::{futures::SendMediaUploadRequest, WeakClient},
config::RequestConfig,
config::{RequestConfig, SyncSettings},
futures::SendRequest,
media::MediaError,
test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
@@ -3831,4 +3840,53 @@ pub(crate) mod tests {
assert_eq!(max, uint!(1));
assert_eq!(current, UInt::new_wrapping(data.len() as u64));
}
#[async_test]
async fn test_dont_ignore_timeout_on_first_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
server
.mock_sync()
.timeout(Some(Duration::from_secs(30)))
.ok(|_| {})
.mock_once()
.named("sync_with_timeout")
.mount()
.await;
// Call the endpoint once to check the timeout.
let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
assert_let_timeout!(Some(Ok(_)) = stream.next());
}
#[async_test]
async fn test_ignore_timeout_on_first_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
server
.mock_sync()
.timeout(None)
.ok(|_| {})
.mock_once()
.named("sync_no_timeout")
.mount()
.await;
server
.mock_sync()
.timeout(Some(Duration::from_secs(30)))
.ok(|_| {})
.mock_once()
.named("sync_with_timeout")
.mount()
.await;
// Call each version of the endpoint once to check the timeouts.
let mut stream = Box::pin(
client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
);
assert_let_timeout!(Some(Ok(_)) = stream.next());
assert_let_timeout!(Some(Ok(_)) = stream.next());
}
}

View File

@@ -25,6 +25,7 @@ pub struct SyncSettings {
// Filter is pretty big at 1000 bytes, box it to reduce stack size
pub(crate) filter: Option<Box<sync_events::v3::Filter>>,
pub(crate) timeout: Option<Duration>,
pub(crate) ignore_timeout_on_first_sync: bool,
pub(crate) token: Option<String>,
pub(crate) full_state: bool,
pub(crate) set_presence: PresenceState,
@@ -39,10 +40,18 @@ impl Default for SyncSettings {
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SyncSettings {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { filter, timeout, token: _, full_state, set_presence } = self;
let Self {
filter,
timeout,
ignore_timeout_on_first_sync,
token: _,
full_state,
set_presence,
} = self;
f.debug_struct("SyncSettings")
.maybe_field("filter", filter)
.maybe_field("timeout", timeout)
.field("ignore_timeout_on_first_sync", ignore_timeout_on_first_sync)
.field("full_state", full_state)
.field("set_presence", set_presence)
.finish()
@@ -56,6 +65,7 @@ impl SyncSettings {
Self {
filter: None,
timeout: Some(DEFAULT_SYNC_TIMEOUT),
ignore_timeout_on_first_sync: false,
token: None,
full_state: false,
set_presence: PresenceState::Online,
@@ -85,6 +95,33 @@ impl SyncSettings {
self
}
/// Whether to ignore the `timeout` the first time that the `/sync` endpoint
/// is called.
///
/// If there is no new data to show, the server will wait until the end of
/// `timeout` before returning a response. It can be an undesirable
/// behavior when starting a client and informing the user that we are
/// "catching up" while waiting for the first response.
///
/// By not setting a `timeout` on the first request to `/sync`, the
/// homeserver should reply immediately, whether the response is empty or
/// not.
///
/// Note that this setting is ignored when calling [`Client::sync_once()`],
/// because there is no loop happening.
///
/// # Arguments
///
/// * `ignore` - Whether to ignore the `timeout` the first time that the
/// `/sync` endpoint is called.
///
/// [`Client::sync_once()`]: crate::Client::sync_once
#[must_use]
pub fn ignore_timeout_on_first_sync(mut self, ignore: bool) -> Self {
self.ignore_timeout_on_first_sync = ignore;
self
}
/// Set the sync filter.
/// It can be either the filter ID, or the definition for the filter.
///

View File

@@ -50,7 +50,10 @@ use serde::Deserialize;
use serde_json::{from_value, json, Value};
use tokio::sync::oneshot::{self, Receiver};
use wiremock::{
matchers::{body_json, body_partial_json, header, method, path, path_regex, query_param},
matchers::{
body_json, body_partial_json, header, method, path, path_regex, query_param,
query_param_is_missing,
},
Mock, MockBuilder, MockGuard, MockServer, Request, Respond, ResponseTemplate, Times,
};
@@ -342,7 +345,6 @@ impl MatrixMockServer {
mock,
SyncEndpoint { sync_response_builder: self.sync_response_builder.clone() },
)
.expect_default_access_token()
}
/// Creates a prebuilt mock for joining a room.
@@ -2314,7 +2316,30 @@ pub struct SyncEndpoint {
sync_response_builder: Arc<Mutex<SyncResponseBuilder>>,
}
impl MockEndpoint<'_, SyncEndpoint> {
impl<'a> MockEndpoint<'a, SyncEndpoint> {
/// Expect the given timeout, or lack thereof, in the request.
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
if let Some(timeout) = timeout {
self.mock = self.mock.and(query_param("timeout", timeout.as_millis().to_string()));
} else {
self.mock = self.mock.and(query_param_is_missing("timeout"));
}
self
}
/// Mocks the sync endpoint, using the given function to generate the
/// response.
pub fn ok<F: FnOnce(&mut SyncResponseBuilder)>(self, func: F) -> MatrixMock<'a> {
let json_response = {
let mut builder = self.endpoint.sync_response_builder.lock().unwrap();
func(&mut builder);
builder.build_json_sync_response()
};
self.respond_with(ResponseTemplate::new(200).set_body_json(json_response))
}
/// Temporarily mocks the sync with the given endpoint and runs a client
/// sync with it.
///
@@ -2346,17 +2371,7 @@ impl MockEndpoint<'_, SyncEndpoint> {
/// # anyhow::Ok(()) });
/// ```
pub async fn ok_and_run<F: FnOnce(&mut SyncResponseBuilder)>(self, client: &Client, func: F) {
let json_response = {
let mut builder = self.endpoint.sync_response_builder.lock().unwrap();
func(&mut builder);
builder.build_json_sync_response()
};
let _scope = self
.mock
.respond_with(ResponseTemplate::new(200).set_body_json(json_response))
.mount_as_scoped(self.server)
.await;
let _scope = self.ok(func).mount_as_scoped().await;
let _response = client.sync_once(Default::default()).await.unwrap();
}