feat(sdk): Add a method to sync as an async stream

This commit is contained in:
Damir Jelić
2021-09-17 10:37:23 +02:00
parent 051d935b28
commit 39edd32072
2 changed files with 112 additions and 30 deletions

View File

@@ -49,6 +49,7 @@ thiserror = "1.0.25"
tracing = "0.1.26"
url = "2.2.2"
zeroize = "1.3.0"
async-stream = "0.3.2"
[dependencies.matrix-sdk-base]
version = "0.4.0"

View File

@@ -1664,6 +1664,40 @@ impl Client {
Ok(response)
}
async fn sync_loop_helper(
&self,
sync_settings: &mut crate::config::SyncSettings<'_>,
) -> Result<SyncResponse> {
let response = self.sync_once(sync_settings.clone()).await;
match response {
Ok(r) => {
sync_settings.token = Some(r.next_batch.clone());
Ok(r)
}
Err(e) => {
error!("Received an invalid response: {}", e);
sleep::new(Duration::from_secs(1)).await;
Err(e)
}
}
}
async fn delay_sync(last_sync_time: &mut Option<Instant>) {
let now = Instant::now();
// If the last sync happened less than a second ago, sleep for a
// while to not hammer out requests if the server doesn't respect
// the sync timeout.
if let Some(t) = last_sync_time {
if now - *t <= Duration::from_secs(1) {
sleep::new(Duration::from_secs(1)).await;
}
}
*last_sync_time = Some(now);
}
/// Synchronize the client's state with the latest state on the server.
///
/// ## Syncing Events
@@ -1799,9 +1833,9 @@ impl Client {
///
/// This method can be used with the [`Client::register_event_handler`]
/// method to react to individual events. If you instead wish to handle
/// events in a bulk manner the [`Client::sync_with_callback`]
/// methods can be used instead. This method repeadetly returns the whole
/// sync response.
/// events in a bulk manner the [`Client::sync_with_callback`] and
/// [`Client::sync_stream`] methods can be used instead. Those two methods
/// repeadetly return the whole sync response.
///
/// # Arguments
///
@@ -1918,38 +1952,85 @@ impl Client {
}
loop {
let response = self.sync_once(sync_settings.clone()).await;
let response = match response {
Ok(r) => r,
Err(e) => {
error!("Received an invalid response: {}", e);
sleep::new(Duration::from_secs(1)).await;
continue;
// TODO we should abort the sync loop if the error is a storage error or
// the access token got invalid.
if let Ok(r) = self.sync_loop_helper(&mut sync_settings).await {
if callback(r).await == LoopCtrl::Break {
return;
}
};
if callback(response).await == LoopCtrl::Break {
return;
} else {
continue;
}
let now = Instant::now();
Client::delay_sync(&mut last_sync_time).await
}
}
// If the last sync happened less than a second ago, sleep for a
// while to not hammer out requests if the server doesn't respect
// the sync timeout.
if let Some(t) = last_sync_time {
if now - t <= Duration::from_secs(1) {
sleep::new(Duration::from_secs(1)).await;
}
//// Repeatedly synchronize the client state with the server.
///
/// This method will internally call [`Client::sync_once`] in a loop and is
/// equivalent to the [`Client::sync`] method but the resopnses are provided
/// as an async stream.
///
/// # Arguments
///
/// * `sync_settings` - Settings for the sync call. *Note* that those
/// settings will be only used for the first sync call. See the argument
/// docs for [`Client::sync_once`] for more info.
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # use futures::executor::block_on;
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let username = "";
/// # let password = "";
/// use futures::stream::TryStreamExt;
/// use matrix_sdk::{
/// Client, config::SyncSettings,
/// ruma::events::{SyncMessageEvent, room::message::MessageEventContent},
/// };
///
/// let client = Client::new(homeserver)?;
/// client.login(&username, &password, None, None).await?;
///
/// let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await);
///
/// for response in sync_stream.try_next().await? {
/// for room in response.rooms.join.values() {
/// for e in &room.timeline.events {
/// if let Ok(event) = e.event.deserialize() {
/// println!("Received event {:?}", event);
/// }
/// }
/// }
/// }
///
/// # matrix_sdk::Result::Ok(()) });
/// ```
#[instrument]
pub async fn sync_stream<'a>(
&'a self,
mut sync_settings: crate::config::SyncSettings<'a>,
) -> impl futures::stream::Stream<Item = Result<SyncResponse>> + 'a {
let mut last_sync_time: Option<Instant> = None;
if sync_settings.token.is_none() {
sync_settings.token = self.sync_token().await;
}
// TODO we should only abort the sync loop if the error is a storage error or
// the access token got invalid.
async_stream::try_stream! {
loop {
let response = self.sync_loop_helper(&mut sync_settings).await?;
yield response;
Client::delay_sync(&mut last_sync_time).await
}
last_sync_time = Some(now);
sync_settings.token =
Some(self.sync_token().await.expect("No sync token found after initial sync"));
self.sync_beat.notify(usize::MAX);
}
}