From 173ec75bb33c8a1bd8ace5c2c59678f69e88de9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 16 Jan 2025 13:29:36 +0100 Subject: [PATCH] refactor(ui): Move common data of the SyncService under a lock Previously we had a lock protecting an empty value, but the logic wants to protect a bunch of data in the SyncService. Let's do the usual thing and create a SyncServiceInner which holds the data and protect that with a lock. --- crates/matrix-sdk-ui/src/sync_service.rs | 143 +++++++++--------- .../tests/integration/sync_service.rs | 10 +- 2 files changed, 74 insertions(+), 79 deletions(-) diff --git a/crates/matrix-sdk-ui/src/sync_service.rs b/crates/matrix-sdk-ui/src/sync_service.rs index 752d3eb6f..8ace8da86 100644 --- a/crates/matrix-sdk-ui/src/sync_service.rs +++ b/crates/matrix-sdk-ui/src/sync_service.rs @@ -23,7 +23,7 @@ //! MUST observe. Whenever an error/termination is observed, the user MUST call //! [`SyncService::start()`] again to restart the room list sync. -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use eyeball::{SharedObservable, Subscriber}; use futures_core::Future; @@ -67,35 +67,11 @@ pub enum State { Error, } -pub struct SyncService { - /// Room list service used to synchronize the rooms state. +struct SyncServiceInner { room_list_service: Arc, - - /// Encryption sync taking care of e2ee events. encryption_sync_service: Arc, - - /// What's the state of this sync service? state: SharedObservable, - - /// Use a mutex everytime to modify the `state` value, otherwise it would be - /// possible to have race conditions when starting or pausing the - /// service multiple times really quickly. - modifying_state: AsyncMutex<()>, - - /// Task running the room list service. - room_list_task: Arc>>>, - - /// Task running the encryption sync. - encryption_sync_task: Arc>>>, - - /// Global lock to allow using at most one `EncryptionSyncService` at all - /// times. - /// - /// This ensures that there's only one ever existing in the application's - /// lifetime (under the assumption that there is at most one - /// `SyncService` per application). encryption_sync_permit: Arc>, - /// Scheduler task ensuring proper termination. /// /// This task is waiting for a `TerminationReport` from any of the other two @@ -103,12 +79,32 @@ pub struct SyncService { /// that the two services are properly shut up and just interrupted. /// /// This is set at the same time as the other two tasks. - scheduler_task: Arc>>>, - + scheduler_task: Option>, /// `TerminationReport` sender for the [`Self::stop()`] function. /// /// This is set at the same time as all the tasks in [`Self::start()`]. - scheduler_sender: Mutex>>, + scheduler_sender: Option>, +} + +pub struct SyncService { + inner: Arc>, + + /// Room list service used to synchronize the rooms state. + room_list_service: Arc, + + /// What's the state of this sync service? This field is replicated from the + /// [`SyncServiceInner`] struct, but it should not be modified in this + /// struct. It's re-exposed here so we can subscribe to the state + /// without taking the lock on the `inner` field. + state: SharedObservable, + + /// Global lock to allow using at most one [`EncryptionSyncService`] at all + /// times. + /// + /// This ensures that there's only one ever existing in the application's + /// lifetime (under the assumption that there is at most one [`SyncService`] + /// per application). + encryption_sync_permit: Arc>, } impl SyncService { @@ -134,13 +130,14 @@ impl SyncService { /// the other one too). fn spawn_scheduler_task( &self, + inner: &SyncServiceInner, + room_list_task: JoinHandle<()>, + encryption_sync_task: JoinHandle<()>, mut receiver: Receiver, ) -> impl Future { - let encryption_sync_task = self.encryption_sync_task.clone(); - let encryption_sync = self.encryption_sync_service.clone(); - let room_list_service = self.room_list_service.clone(); - let room_list_task = self.room_list_task.clone(); - let state = self.state.clone(); + let encryption_sync = inner.encryption_sync_service.clone(); + let room_list_service = inner.room_list_service.clone(); + let state = inner.state.clone(); async move { let Some(report) = receiver.recv().await else { @@ -165,13 +162,8 @@ impl SyncService { } } - { - let task = room_list_task.lock().unwrap().take(); - if let Some(task) = task { - if let Err(err) = task.await { - error!("when awaiting room list service: {err:#}"); - } - } + if let Err(err) = room_list_task.await { + error!("when awaiting room list service: {err:#}"); } if stop_encryption { @@ -180,13 +172,8 @@ impl SyncService { } } - { - let task = encryption_sync_task.lock().unwrap().take(); - if let Some(task) = task { - if let Err(err) = task.await { - error!("when awaiting encryption sync: {err:#}"); - } - } + if let Err(err) = encryption_sync_task.await { + error!("when awaiting encryption sync: {err:#}"); } if report.is_error { @@ -306,7 +293,7 @@ impl SyncService { /// - if the stream has been aborted before, it will be properly cleaned up /// and restarted. pub async fn start(&self) { - let _guard = self.modifying_state.lock().await; + let mut inner = self.inner.lock().await; // Only (re)start the tasks if any was stopped. if matches!(self.state.get(), State::Running) { @@ -319,20 +306,25 @@ impl SyncService { let (sender, receiver) = tokio::sync::mpsc::channel(16); // First, take care of the room list. - *self.room_list_task.lock().unwrap() = - Some(spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone()))); + let room_list_task = + spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone())); // Then, take care of the encryption sync. - let sync_permit_guard = self.encryption_sync_permit.clone().lock_owned().await; - *self.encryption_sync_task.lock().unwrap() = Some(spawn(Self::encryption_sync_task( - self.encryption_sync_service.clone(), + let sync_permit_guard = inner.encryption_sync_permit.clone().lock_owned().await; + let encryption_sync_task = spawn(Self::encryption_sync_task( + inner.encryption_sync_service.clone(), sender.clone(), sync_permit_guard, - ))); + )); // Spawn the scheduler task. - *self.scheduler_sender.lock().unwrap() = Some(sender); - *self.scheduler_task.lock().unwrap() = Some(spawn(self.spawn_scheduler_task(receiver))); + inner.scheduler_sender = Some(sender); + inner.scheduler_task = Some(spawn(self.spawn_scheduler_task( + &inner, + room_list_task, + encryption_sync_task, + receiver, + ))); self.state.set(State::Running); } @@ -344,7 +336,7 @@ impl SyncService { /// necessary. #[instrument(skip_all)] pub async fn stop(&self) -> Result<(), Error> { - let _guard = self.modifying_state.lock().await; + let mut inner = self.inner.lock().await; match self.state.get() { State::Idle | State::Terminated | State::Error => { @@ -360,7 +352,7 @@ impl SyncService { // later, so that we're in a clean state independently of the request to // stop. - let sender = self.scheduler_sender.lock().unwrap().clone(); + let sender = inner.scheduler_sender.clone(); sender .ok_or_else(|| { error!("missing sender"); @@ -377,7 +369,7 @@ impl SyncService { Error::InternalSchedulerError })?; - let scheduler_task = self.scheduler_task.lock().unwrap().take(); + let scheduler_task = inner.scheduler_task.take(); scheduler_task .ok_or_else(|| { error!("missing scheduler task"); @@ -420,11 +412,8 @@ struct TerminationReport { #[doc(hidden)] impl SyncService { /// Return the existential states of internal tasks. - pub fn task_states(&self) -> (bool, bool) { - ( - self.encryption_sync_task.lock().unwrap().is_some(), - self.room_list_task.lock().unwrap().is_some(), - ) + pub async fn task_state(&self) -> bool { + self.inner.lock().await.scheduler_task.is_some() } } @@ -476,16 +465,22 @@ impl SyncServiceBuilder { .await?, ); + let room_list_service = Arc::new(room_list); + let state = SharedObservable::new(State::Idle); + Ok(SyncService { - room_list_service: Arc::new(room_list), - encryption_sync_service: encryption_sync, - encryption_sync_task: Arc::new(Mutex::new(None)), - room_list_task: Arc::new(Mutex::new(None)), - scheduler_task: Arc::new(Mutex::new(None)), - scheduler_sender: Mutex::new(None), - state: SharedObservable::new(State::Idle), - modifying_state: AsyncMutex::new(()), - encryption_sync_permit, + state: state.clone(), + room_list_service: room_list_service.clone(), + encryption_sync_permit: encryption_sync_permit.clone(), + + inner: Arc::new(AsyncMutex::new(SyncServiceInner { + scheduler_task: None, + scheduler_sender: None, + room_list_service, + encryption_sync_service: encryption_sync, + state, + encryption_sync_permit, + })), }) } } diff --git a/crates/matrix-sdk-ui/tests/integration/sync_service.rs b/crates/matrix-sdk-ui/tests/integration/sync_service.rs index dae23a426..80a1d1ef6 100644 --- a/crates/matrix-sdk-ui/tests/integration/sync_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/sync_service.rs @@ -73,19 +73,19 @@ async fn test_sync_service_state() -> anyhow::Result<()> { // At first, the sync service is sleeping. assert_eq!(state_stream.get(), State::Idle); assert!(server.received_requests().await.unwrap().is_empty()); - assert_eq!(sync_service.task_states(), (false, false)); + assert_eq!(sync_service.task_state().await, false); assert!(sync_service.try_get_encryption_sync_permit().is_some()); // After starting, the sync service is, well, running. sync_service.start().await; assert_next_matches!(state_stream, State::Running); - assert_eq!(sync_service.task_states(), (true, true)); + assert_eq!(sync_service.task_state().await, true); assert!(sync_service.try_get_encryption_sync_permit().is_none()); // Restarting while started doesn't change the current state. sync_service.start().await; assert_pending!(state_stream); - assert_eq!(sync_service.task_states(), (true, true)); + assert_eq!(sync_service.task_state().await, true); assert!(sync_service.try_get_encryption_sync_permit().is_none()); // Let the server respond a few times. @@ -94,7 +94,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> { // Pausing will stop both syncs, after a bit of delay. sync_service.stop().await?; assert_next_matches!(state_stream, State::Idle); - assert_eq!(sync_service.task_states(), (false, false)); + assert_eq!(sync_service.task_state().await, false); assert!(sync_service.try_get_encryption_sync_permit().is_some()); let mut num_encryption_sync_requests: i32 = 0; @@ -149,7 +149,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> { // the same position than just before being stopped. sync_service.start().await; assert_next_matches!(state_stream, State::Running); - assert_eq!(sync_service.task_states(), (true, true)); + assert_eq!(sync_service.task_state().await, true); assert!(sync_service.try_get_encryption_sync_permit().is_none()); tokio::time::sleep(Duration::from_millis(100)).await;