feat(ui): Implement RoomList::stop_sync

feat(ui): Implement `RoomList::stop_sync`
This commit is contained in:
Ivan Enderlin
2023-06-21 16:23:06 +02:00
committed by GitHub
4 changed files with 369 additions and 91 deletions

View File

@@ -87,17 +87,27 @@ pub struct RoomList {
#[uniffi::export]
impl RoomList {
fn sync(&self) -> Arc<TaskHandle> {
fn sync(&self) {
let this = self.inner.clone();
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
RUNTIME.spawn(async move {
let sync_stream = this.sync();
pin_mut!(sync_stream);
while sync_stream.next().await.is_some() {
// keep going!
}
})))
});
}
fn stop_sync(&self) -> Result<(), RoomListError> {
self.inner.stop_sync().map_err(Into::into)
}
fn is_syncing(&self) -> bool {
use matrix_sdk_ui::room_list::State;
matches!(self.inner.state().get(), State::SettingUp | State::Running)
}
fn state(&self, listener: Box<dyn RoomListStateListener>) -> Arc<TaskHandle> {
@@ -195,9 +205,9 @@ pub struct RoomListEntriesLoadingStateResult {
#[derive(uniffi::Enum)]
pub enum RoomListState {
Init,
FirstRooms,
AllRooms,
CarryOn,
SettingUp,
Running,
Error,
Terminated,
}
@@ -207,9 +217,9 @@ impl From<matrix_sdk_ui::room_list::State> for RoomListState {
match value {
Init => Self::Init,
FirstRooms => Self::FirstRooms,
AllRooms => Self::AllRooms,
CarryOn => Self::CarryOn,
SettingUp => Self::SettingUp,
Running => Self::Running,
Error { .. } => Self::Error,
Terminated { .. } => Self::Terminated,
}
}

View File

@@ -147,9 +147,9 @@ impl RoomList {
///
/// The `RoomList`' state machine is run by this method.
///
/// Stopping the [`Stream`] (i.e. stop polling it) and calling
/// [`Self::sync`] again will resume from the previous state of the state
/// machine.
/// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
/// calling [`Self::sync`] again will resume from the previous state of
/// the state machine.
pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
stream! {
let sync = self.sliding_sync.sync();
@@ -174,7 +174,7 @@ impl RoomList {
}
Some(Err(error)) => {
let next_state = State::Terminated { from: Box::new(self.state.get()) };
let next_state = State::Error { from: Box::new(self.state.get()) };
self.state.set(next_state);
yield Err(Error::SlidingSync(error));
@@ -193,6 +193,23 @@ impl RoomList {
}
}
/// Force to stop the sync of the room list started by [`Self::sync`].
///
/// It's better to call this method rather than stop polling the `Stream`
/// returned by [`Self::sync`] because it will force the cancellation and
/// exit the sync-loop, i.e. it will cancel any in-flight HTTP requests,
/// cancel any pending futures etc.
///
/// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
/// until it returns `None`, because of [`Self::stop_sync`], so that it
/// ensures the states are correctly placed.
///
/// Stopping the sync of the room list via this method will put the
/// state-machine into the [`State::Terminated`] state.
pub fn stop_sync(&self) -> Result<(), Error> {
self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
}
/// Get a subscriber to the state.
pub fn state(&self) -> Subscriber<State> {
self.state.subscribe()

View File

@@ -21,18 +21,17 @@ pub enum State {
/// That's the first initial state.
Init,
/// At this state, the first rooms start to be synced.
FirstRooms,
/// At this state, the first rooms are starting to sync.
SettingUp,
/// At this state, all rooms start to be synced.
AllRooms,
/// At this state, all rooms are syncing, and the visible rooms + invites
/// lists exist.
Running,
/// This state is the cruising speed, i.e. the “normal” state, where nothing
/// fancy happens: all rooms are syncing, and life is great.
CarryOn,
/// At this state, the sync has been stopped because an error happened.
Error { from: Box<State> },
/// At this state, the sync has been stopped (because it was requested, or
/// because it has errored too many times previously).
/// At this state, the sync has been stopped because it was requested.
Terminated { from: Box<State> },
}
@@ -43,29 +42,29 @@ impl State {
use State::*;
let (next_state, actions) = match self {
Init => (FirstRooms, Actions::none()),
FirstRooms => (AllRooms, Actions::first_rooms_are_loaded()),
AllRooms => (CarryOn, Actions::none()),
CarryOn => (CarryOn, Actions::none()),
// If the state was `Terminated`, the next state is calculated again, because it means
// the sync has been restarted. In this case, let's jump back on the
// previous state that led to the termination. No action is required in this
// scenario.
Terminated { from: previous_state } => {
Init => (SettingUp, Actions::none()),
SettingUp => (Running, Actions::first_rooms_are_loaded()),
Running => (Running, Actions::none()),
// If the state was `Error` or `Terminated`, the next state is calculated again, because
// it means the sync has been restarted. In this case, let's jump back on
// the previous state that led to the termination. No action is required in
// this scenario.
Error { from: previous_state } | Terminated { from: previous_state } => {
match previous_state.as_ref() {
state @ Init | state @ FirstRooms => {
state @ Init | state @ SettingUp => {
// Do nothing.
(state.to_owned(), Actions::none())
}
state @ AllRooms | state @ CarryOn => {
Running => {
// Refresh the lists.
(state.to_owned(), Actions::refresh_lists())
(Running, Actions::refresh_lists())
}
Terminated { .. } => {
// Having `Terminated { from: Terminated { … } }` is not allowed.
unreachable!("It's impossible to reach `Terminated` from `Terminated`");
Error { .. } | Terminated { .. } => {
// Having `Error { from: Error { .. } }` or `Terminated { from: Terminated {
// … } }` is not allowed.
unreachable!("It's impossible to reach `Error` from `Error`, or `Terminated` from `Terminated`");
}
}
}
@@ -224,6 +223,12 @@ mod tests {
// First state.
let state = State::Init;
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::Init);
}
// Hypothetical termination.
{
let state =
@@ -233,46 +238,53 @@ mod tests {
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::FirstRooms);
assert_eq!(state, State::SettingUp);
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::SettingUp);
}
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::FirstRooms);
assert_eq!(state, State::SettingUp);
}
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::AllRooms);
assert_eq!(state, State::Running);
// Hypothetical error.
{
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::Running);
}
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::AllRooms);
assert_eq!(state, State::Running);
}
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::CarryOn);
assert_eq!(state, State::Running);
// Hypothetical termination.
// Hypothetical error.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::CarryOn);
let state = State::Error { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::Running);
}
// Next state.
let state = state.next(sliding_sync).await?;
assert_eq!(state, State::CarryOn);
// Hypothetical termination.
{
let state =
State::Terminated { from: Box::new(state.clone()) }.next(sliding_sync).await?;
assert_eq!(state, State::CarryOn);
assert_eq!(state, State::Running);
}
Ok(())

View File

@@ -201,7 +201,7 @@ macro_rules! assert_entries_stream {
}
#[async_test]
async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
async fn test_sync_all_states() -> Result<(), Error> {
let (server, room_list) = new_room_list().await?;
let (entries_loading_state, mut entries_loading_state_stream) =
@@ -214,7 +214,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -277,7 +277,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -349,7 +349,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = AllRooms => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -391,7 +391,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = CarryOn => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -433,7 +433,7 @@ async fn test_sync_from_init_to_enjoy() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = CarryOn => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -491,7 +491,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -519,7 +519,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -561,7 +561,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = AllRooms => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -600,7 +600,7 @@ async fn test_sync_resumes_from_previous_state() -> Result<(), Error> {
}
#[async_test]
async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
async fn test_sync_resumes_from_error() -> Result<(), Error> {
let (server, room_list) = new_room_list().await?;
let sync = room_list.sync();
@@ -610,7 +610,7 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
sync matches Some(Err(_)),
states = Init => Terminated { .. },
states = Init => Error { .. },
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -632,10 +632,10 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
// Do a regular sync from the `Error` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => FirstRooms,
states = Error { .. } => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -655,15 +655,15 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
},
};
// Simulate an error from the `FirstRooms` state.
// Simulate an error from the `SettingUp` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
sync matches Some(Err(_)),
states = FirstRooms => Terminated { .. },
states = SettingUp => Error { .. },
assert request >= {
"lists": {
ALL_ROOMS: {
// In `FirstRooms`, the sync-mode has changed to growing, with
// In `SettingUp`, the sync-mode has changed to growing, with
// its initial range.
"ranges": [[0, 49]],
},
@@ -693,14 +693,14 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
// Update the viewport, just to be sure it's not reset later.
room_list.apply_input(Input::Viewport(vec![5..=10])).await?;
// Do a regular sync from the `Terminated` state.
// Do a regular sync from the `Error` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => AllRooms,
states = Error { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// In `AllRooms`, the sync-mode is still growing, but the range
// In `Running`, the sync-mode is still growing, but the range
// hasn't been modified due to previous error.
"ranges": [[0, 49]],
},
@@ -728,15 +728,15 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
},
};
// Simulate an error from the `AllRooms` state.
// Simulate an error from the `Running` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
sync matches Some(Err(_)),
states = AllRooms => Terminated { .. },
states = Running => Error { .. },
assert request >= {
"lists": {
ALL_ROOMS: {
// In `AllRooms`, the sync-mode is still growing, and the range
// In `Running`, the sync-mode is still growing, and the range
// has made progress.
"ranges": [[0, 99]],
},
@@ -763,10 +763,10 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
// Do a regular sync from the `Error` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => CarryOn,
states = Error { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -797,11 +797,11 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
},
};
// Do a regular sync from the `CarryOn` state to update the `ALL_ROOMS` list
// Do a regular sync from the `Running` state to update the `ALL_ROOMS` list
// again.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = CarryOn => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -829,11 +829,11 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
},
};
// Simulate an error from the `CarryOn` state.
// Simulate an error from the `Running` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
sync matches Some(Err(_)),
states = CarryOn => Terminated { .. },
states = Running => Error { .. },
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -864,10 +864,10 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
// Do a regular sync from the `Error` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => CarryOn,
states = Error { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -900,6 +900,245 @@ async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
Ok(())
}
#[async_test]
async fn test_sync_resumes_from_terminated() -> Result<(), Error> {
let (server, room_list) = new_room_list().await?;
// Let's stop the sync before actually syncing (we never know!).
// We get an error, obviously.
assert!(room_list.stop_sync().is_err());
let sync = room_list.sync();
pin_mut!(sync);
// Do a first sync.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
// The default range, in selective sync-mode.
"ranges": [[0, 19]],
},
},
},
respond with = {
"pos": "1",
"lists": {
ALL_ROOMS: {
"count": 110,
},
},
"rooms": {},
},
};
// Stop the sync.
room_list.stop_sync()?;
assert!(sync.next().await.is_none());
// Start a new sync.
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// In `SettingUp`, the sync-mode has changed to growing, with
// its initial range.
"ranges": [[0, 49]],
},
VISIBLE_ROOMS: {
// Hello new list.
"ranges": [[0, 19]],
},
INVITES: {
// Hello new list.
"ranges": [[0, 99]],
},
},
},
respond with = {
"pos": "2",
"lists": {
ALL_ROOMS: {
"count": 110,
},
},
"rooms": {},
},
};
// Stop the sync.
room_list.stop_sync()?;
assert!(sync.next().await.is_none());
// Start a new sync.
let sync = room_list.sync();
pin_mut!(sync);
// Update the viewport, just to be sure it's not reset later.
room_list.apply_input(Input::Viewport(vec![5..=10])).await?;
// Do a regular sync from the `Terminated` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// In `Running`, the sync-mode is still growing, but the range
// hasn't been modified due to previous termination.
"ranges": [[0, 49]],
},
VISIBLE_ROOMS: {
// We have set a viewport, which reflects here.
"ranges": [[5, 10]],
},
INVITES: {
// The range hasn't been modified due to previous termination.
"ranges": [[0, 99]],
},
},
},
respond with = {
"pos": "2",
"lists": {
ALL_ROOMS: {
"count": 110,
},
INVITES: {
"count": 3,
}
},
"rooms": {},
},
};
// Stop the sync.
room_list.stop_sync()?;
assert!(sync.next().await.is_none());
// Start a new sync.
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// In `Running`, the sync-mode is still growing, but the range
// hasn't been modified due to the previous termination.
"ranges": [[0, 49]],
},
VISIBLE_ROOMS: {
// Despites the termination, the range is kept.
"ranges": [[5, 10]],
},
INVITES: {
// Despites the error, the range has made progress.
"ranges": [[0, 2]],
},
},
},
respond with = {
"pos": "3",
"lists": {
ALL_ROOMS: {
"count": 110,
},
INVITES: {
"count": 0,
}
},
"rooms": {},
},
};
// Do a regular sync from the `Running` state to update the `ALL_ROOMS` list
// again.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// No termination. The range is making progress.
"ranges": [[0, 99]],
},
VISIBLE_ROOMS: {
// No termination. The range is still here.
"ranges": [[5, 10]],
},
INVITES: {
// The range is making progress.
"ranges": [[0, 0]],
},
},
},
respond with = {
"pos": "4",
"lists": {
ALL_ROOMS: {
"count": 110,
},
},
"rooms": {},
},
};
// Stop the sync.
room_list.stop_sync()?;
assert!(sync.next().await.is_none());
// Start a new sync.
let sync = room_list.sync();
pin_mut!(sync);
// Do a regular sync from the `Terminated` state.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Terminated { .. } => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
// A termination was received at the previous sync iteration.
// The list is still in growing sync-mode, but its range has
// been reset.
"ranges": [[0, 49]],
},
VISIBLE_ROOMS: {
// The range is still here.
"ranges": [[5, 10]],
},
INVITES: {
// The range is kept as it was.
"ranges": [[0, 0]],
},
},
},
respond with = {
"pos": "5",
"lists": {
ALL_ROOMS: {
"count": 110,
},
},
"rooms": {},
},
};
Ok(())
}
#[async_test]
async fn test_entries_stream() -> Result<(), Error> {
let (server, room_list) = new_room_list().await?;
@@ -912,7 +1151,7 @@ async fn test_entries_stream() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -970,7 +1209,7 @@ async fn test_entries_stream() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1045,7 +1284,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1099,7 +1338,7 @@ async fn test_entries_stream_with_updated_filter() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1186,7 +1425,7 @@ async fn test_invites_stream() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1212,7 +1451,7 @@ async fn test_invites_stream() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1272,7 +1511,7 @@ async fn test_invites_stream() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = AllRooms => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1828,7 +2067,7 @@ async fn test_input_viewport() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = Init => FirstRooms,
states = Init => SettingUp,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1845,7 +2084,7 @@ async fn test_input_viewport() -> Result<(), Error> {
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = FirstRooms => AllRooms,
states = SettingUp => Running,
assert request >= {
"lists": {
ALL_ROOMS: {
@@ -1872,7 +2111,7 @@ async fn test_input_viewport() -> Result<(), Error> {
// The `timeline_limit` is not repeated because it's sticky.
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
states = AllRooms => CarryOn,
states = Running => Running,
assert request >= {
"lists": {
ALL_ROOMS: {