Get back to Recovering syncing when we haven't sync for a while

This commit is contained in:
Mathieu Velten
2024-09-14 00:03:03 +02:00
committed by Ivan Enderlin
parent 736aa0351c
commit 752706c51d
2 changed files with 192 additions and 53 deletions

View File

@@ -60,7 +60,7 @@ mod state;
use std::{sync::Arc, time::Duration};
use async_stream::stream;
use eyeball::{SharedObservable, Subscriber};
use eyeball::Subscriber;
use futures_util::{pin_mut, Stream, StreamExt};
use matrix_sdk::{
event_cache::EventCacheError, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList,
@@ -89,7 +89,7 @@ pub struct RoomListService {
/// The current state of the `RoomListService`.
///
/// `RoomListService` is a simple state-machine.
state: SharedObservable<State>,
state_machine: StateMachine,
}
impl RoomListService {
@@ -172,7 +172,7 @@ impl RoomListService {
// Eagerly subscribe the event cache to sync responses.
client.event_cache().subscribe()?;
Ok(Self { client, sliding_sync, state: SharedObservable::new(State::Init) })
Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
}
/// Start to sync the room list.
@@ -208,7 +208,7 @@ impl RoomListService {
debug!("Run a sync iteration");
// Calculate the next state, and run the associated actions.
let next_state = self.state.get().next(&self.sliding_sync).await?;
let next_state = self.state_machine.next(&self.sliding_sync).await?;
// Do the sync.
match sync.next().await {
@@ -217,7 +217,7 @@ impl RoomListService {
debug!(state = ?next_state, "New state");
// Update the state.
self.state.set(next_state);
self.state_machine.set(next_state);
yield Ok(());
}
@@ -227,7 +227,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is an error");
let next_state = State::Error { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);
yield Err(Error::SlidingSync(error));
@@ -239,7 +239,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is a termination");
let next_state = State::Terminated { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);
break;
}
@@ -286,8 +286,8 @@ impl RoomListService {
// when the session is forced to expire, the state remains `Terminated`, thus
// the actions aren't executed as expected. Consequently, let's update the
// state.
if let State::Terminated { from } = self.state.get() {
self.state.set(State::Error { from });
if let State::Terminated { from } = self.state_machine.get() {
self.state_machine.set(State::Error { from });
}
}
@@ -341,7 +341,7 @@ impl RoomListService {
// Update the `current_state`.
current_state = next_state;
} else {
// Something is broken with `self.state`. Let's stop this stream too.
// Something is broken with the state. Let's stop this stream too.
break;
}
}
@@ -355,7 +355,7 @@ impl RoomListService {
/// Get a subscriber to the state.
pub fn state(&self) -> Subscriber<State> {
self.state.subscribe()
self.state_machine.subscribe()
}
async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
@@ -396,7 +396,7 @@ impl RoomListService {
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}
let cancel_in_flight_request = match self.state.get() {
let cancel_in_flight_request = match self.state_machine.get() {
State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
false
}
@@ -617,13 +617,16 @@ mod tests {
let _ = sync.next().await;
// State is `Terminated`, as expected!
assert_eq!(room_list.state.get(), State::Terminated { from: Box::new(State::Running) });
assert_eq!(
room_list.state_machine.get(),
State::Terminated { from: Box::new(State::Running) }
);
// Now, let's make the sliding sync session to expire.
room_list.expire_sync_session().await;
// State is `Error`, as a regular session expiration would generate!
assert_eq!(room_list.state.get(), State::Error { from: Box::new(State::Running) });
assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
Ok(())
}

View File

@@ -14,15 +14,20 @@
//! States and actions for the `RoomList` state machine.
use std::future::ready;
use std::{
future::ready,
sync::Mutex,
time::{Duration, Instant},
};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
use super::Error;
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
/// The state of the [`super::RoomList`]' state machine.
/// The state of the [`super::RoomList`].
#[derive(Clone, Debug, PartialEq)]
pub enum State {
/// That's the first initial state.
@@ -46,13 +51,71 @@ pub enum State {
Terminated { from: Box<State> },
}
impl State {
/// Default value for `StateMachine::state_lifespan`.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
/// The state machine used to transition between the [`State`]s.
#[derive(Debug)]
pub struct StateMachine {
/// The current state of the `RoomListService`.
state: SharedObservable<State>,
/// Last time the state has been updated.
///
/// When the state has not been updated since a long time, we want to enter
/// the [`State::Recovering`] state. Why do we need to do that? Because in
/// some cases, the user might have received many updates between two
/// distant syncs. If the sliding sync list range was too large, like
/// 0..=499, the next sync is likely to be heavy and potentially slow.
/// In this case, it's preferable to jump back onto `Recovering`, which will
/// reset the range, so that the next sync will be fast for the client.
///
/// To be used in coordination with `Self::state_lifespan`.
///
/// This mutex is only taken for short periods of time, so it's sync.
last_state_update_time: Mutex<Instant>,
/// The maximum time before considering the state as “too old”.
///
/// To be used in coordination with `Self::last_state_update_time`.
state_lifespan: Duration,
}
impl StateMachine {
pub(super) fn new() -> Self {
StateMachine {
state: SharedObservable::new(State::Init),
last_state_update_time: Mutex::new(Instant::now()),
state_lifespan: DEFAULT_STATE_LIFESPAN,
}
}
/// Get the current state.
pub(super) fn get(&self) -> State {
self.state.get()
}
/// Set the new state.
///
/// Setting a new state will update `Self::last_state_update`.
pub(super) fn set(&self, state: State) {
let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
*last_state_update_time = Instant::now();
self.state.set(state);
}
/// Subscribe to state updates.
pub fn subscribe(&self) -> Subscriber<State> {
self.state.subscribe()
}
/// Transition to the next state, and execute the associated transition's
/// [`Actions`].
pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<Self, Error> {
pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
use State::*;
let next_state = match self {
let next_state = match self.get() {
Init => SettingUp,
SettingUp | Recovering => {
@@ -60,7 +123,18 @@ impl State {
Running
}
Running => Running,
Running => {
// We haven't changed the state for a while, we go back to `Recovering` to avoid
// requesting potentially large data. See `Self::last_state_update` to learn
// the details.
if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
Recovering
} else {
Running
}
}
Error { from: previous_state } | Terminated { from: previous_state } => {
match previous_state.as_ref() {
@@ -122,6 +196,7 @@ pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
#[cfg(test)]
mod tests {
use matrix_sdk_test::async_test;
use tokio::time::sleep;
use super::{super::tests::new_room_list, *};
@@ -130,94 +205,155 @@ mod tests {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();
// First state.
let state = State::Init;
let state_machine = StateMachine::new();
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
// Back to the previous state.
assert_eq!(state, State::Init);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Init);
}
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
// Back to the previous state.
assert_eq!(state, State::Init);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Init);
}
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::SettingUp);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
// Back to the previous state.
assert_eq!(state, State::SettingUp);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
}
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
// Back to the previous state.
assert_eq!(state, State::SettingUp);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
}
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Error { from: Box::new(state_machine.get()) });
// Jump to the **recovering** state!
assert_eq!(state, State::Recovering);
let state = state.next(sliding_sync).await?;
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
// Now, back to the previous state.
assert_eq!(state, State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
// Jump to the **recovering** state!
assert_eq!(state, State::Recovering);
let state = state.next(sliding_sync).await?;
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
// Now, back to the previous state.
assert_eq!(state, State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
// Hypothetical error when recovering.
{
let state =
State::Error { from: Box::new(State::Recovering) }.next(sliding_sync).await?;
state_machine.set(State::Error { from: Box::new(State::Recovering) });
// Back to the previous state.
assert_eq!(state, State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
}
// Hypothetical termination when recovering.
{
let state =
State::Terminated { from: Box::new(State::Recovering) }.next(sliding_sync).await?;
state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
// Back to the previous state.
assert_eq!(state, State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
}
Ok(())
}
#[async_test]
async fn test_recover_state_after_delay() -> Result<(), Error> {
let room_list = new_room_list().await?;
let sliding_sync = room_list.sliding_sync();
let mut state_machine = StateMachine::new();
state_machine.state_lifespan = Duration::from_millis(50);
{
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::SettingUp);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
// Time passes.
sleep(Duration::from_millis(100)).await;
{
// Time has elapsed, time to recover.
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
// Time passes, again. Just to test everything is going well.
sleep(Duration::from_millis(100)).await;
{
// Time has elapsed, time to recover.
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Recovering);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
state_machine.set(state_machine.next(sliding_sync).await?);
assert_eq!(state_machine.get(), State::Running);
}
Ok(())