refactor(ui): Move common data of the SyncService under a lock

Previously we had a lock protecting an empty value, while the data the
lock was actually meant to guard lived in separate fields of the
SyncService.

Let's do the usual thing and create a SyncServiceInner struct which
holds that data, and protect it with the lock.
Author: Damir Jelić
Date:   2025-01-16 13:29:36 +01:00
Parent: 1d3f8bf898
Commit: 173ec75bb3

2 changed files with 74 additions and 79 deletions
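
The shape of the change, as a minimal self-contained sketch (hypothetical, simplified types; the real SyncServiceInner also carries the two sync services, the state observable, the encryption sync permit and the scheduler's sender):

use std::sync::Arc;

use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;

#[derive(Default)]
struct SyncServiceInner {
    // Everything that start()/stop() mutate lives here, so taking the one
    // lock both serializes those calls and grants access to the data.
    scheduler_task: Option<JoinHandle<()>>,
}

struct SyncService {
    inner: Arc<AsyncMutex<SyncServiceInner>>,
}

impl SyncService {
    async fn start(&self) {
        // One guard replaces the old `modifying_state: AsyncMutex<()>` plus
        // the separate `Arc<Mutex<Option<JoinHandle<()>>>>` fields.
        let mut inner = self.inner.lock().await;
        if inner.scheduler_task.is_none() {
            inner.scheduler_task = Some(tokio::spawn(async { /* sync loop */ }));
        }
    }

    async fn stop(&self) {
        // Same lock: a concurrent start() cannot interleave with us.
        if let Some(task) = self.inner.lock().await.scheduler_task.take() {
            task.abort();
        }
    }
}

With this layout it is impossible to touch the protected fields without holding the lock, so the old pattern of a lock guarding an empty value next to the data it was supposed to guard can't silently regress.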


@@ -23,7 +23,7 @@
 //! MUST observe. Whenever an error/termination is observed, the user MUST call
 //! [`SyncService::start()`] again to restart the room list sync.
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use eyeball::{SharedObservable, Subscriber};
 use futures_core::Future;
@@ -67,35 +67,11 @@ pub enum State {
     Error,
 }
 
-pub struct SyncService {
-    /// Room list service used to synchronize the rooms state.
+struct SyncServiceInner {
     room_list_service: Arc<RoomListService>,
-
-    /// Encryption sync taking care of e2ee events.
     encryption_sync_service: Arc<EncryptionSyncService>,
-
-    /// What's the state of this sync service?
     state: SharedObservable<State>,
-
-    /// Use a mutex everytime to modify the `state` value, otherwise it would be
-    /// possible to have race conditions when starting or pausing the
-    /// service multiple times really quickly.
-    modifying_state: AsyncMutex<()>,
-
-    /// Task running the room list service.
-    room_list_task: Arc<Mutex<Option<JoinHandle<()>>>>,
-
-    /// Task running the encryption sync.
-    encryption_sync_task: Arc<Mutex<Option<JoinHandle<()>>>>,
-
-    /// Global lock to allow using at most one `EncryptionSyncService` at all
-    /// times.
-    ///
-    /// This ensures that there's only one ever existing in the application's
-    /// lifetime (under the assumption that there is at most one
-    /// `SyncService` per application).
     encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
-
     /// Scheduler task ensuring proper termination.
     ///
     /// This task is waiting for a `TerminationReport` from any of the other two
@@ -103,12 +79,32 @@ pub struct SyncService {
     /// that the two services are properly shut up and just interrupted.
     ///
     /// This is set at the same time as the other two tasks.
-    scheduler_task: Arc<Mutex<Option<JoinHandle<()>>>>,
+    scheduler_task: Option<JoinHandle<()>>,
 
     /// `TerminationReport` sender for the [`Self::stop()`] function.
     ///
     /// This is set at the same time as all the tasks in [`Self::start()`].
-    scheduler_sender: Mutex<Option<Sender<TerminationReport>>>,
+    scheduler_sender: Option<Sender<TerminationReport>>,
 }
+
+pub struct SyncService {
+    inner: Arc<AsyncMutex<SyncServiceInner>>,
+    /// Room list service used to synchronize the rooms state.
+    room_list_service: Arc<RoomListService>,
+
+    /// What's the state of this sync service? This field is replicated from the
+    /// [`SyncServiceInner`] struct, but it should not be modified in this
+    /// struct. It's re-exposed here so we can subscribe to the state
+    /// without taking the lock on the `inner` field.
+    state: SharedObservable<State>,
+
+    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
+    /// times.
+    ///
+    /// This ensures that there's only one ever existing in the application's
+    /// lifetime (under the assumption that there is at most one [`SyncService`]
+    /// per application).
+    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
+}
 
 impl SyncService {
@@ -134,13 +130,14 @@ impl SyncService {
     /// the other one too).
     fn spawn_scheduler_task(
         &self,
+        inner: &SyncServiceInner,
+        room_list_task: JoinHandle<()>,
+        encryption_sync_task: JoinHandle<()>,
         mut receiver: Receiver<TerminationReport>,
     ) -> impl Future<Output = ()> {
-        let encryption_sync_task = self.encryption_sync_task.clone();
-        let encryption_sync = self.encryption_sync_service.clone();
-        let room_list_service = self.room_list_service.clone();
-        let room_list_task = self.room_list_task.clone();
-        let state = self.state.clone();
+        let encryption_sync = inner.encryption_sync_service.clone();
+        let room_list_service = inner.room_list_service.clone();
+        let state = inner.state.clone();
 
         async move {
             let Some(report) = receiver.recv().await else {
@@ -165,13 +162,8 @@ impl SyncService {
                 }
             }
 
-            {
-                let task = room_list_task.lock().unwrap().take();
-                if let Some(task) = task {
-                    if let Err(err) = task.await {
-                        error!("when awaiting room list service: {err:#}");
-                    }
-                }
+            if let Err(err) = room_list_task.await {
+                error!("when awaiting room list service: {err:#}");
             }
 
             if stop_encryption {
@@ -180,13 +172,8 @@ impl SyncService {
                 }
             }
 
-            {
-                let task = encryption_sync_task.lock().unwrap().take();
-                if let Some(task) = task {
-                    if let Err(err) = task.await {
-                        error!("when awaiting encryption sync: {err:#}");
-                    }
-                }
+            if let Err(err) = encryption_sync_task.await {
+                error!("when awaiting encryption sync: {err:#}");
             }
 
             if report.is_error {
@@ -306,7 +293,7 @@ impl SyncService {
     /// - if the stream has been aborted before, it will be properly cleaned up
     ///   and restarted.
    pub async fn start(&self) {
-        let _guard = self.modifying_state.lock().await;
+        let mut inner = self.inner.lock().await;
 
         // Only (re)start the tasks if any was stopped.
         if matches!(self.state.get(), State::Running) {
@@ -319,20 +306,25 @@
         let (sender, receiver) = tokio::sync::mpsc::channel(16);
 
         // First, take care of the room list.
-        *self.room_list_task.lock().unwrap() =
-            Some(spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone())));
+        let room_list_task =
+            spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone()));
 
         // Then, take care of the encryption sync.
-        let sync_permit_guard = self.encryption_sync_permit.clone().lock_owned().await;
-        *self.encryption_sync_task.lock().unwrap() = Some(spawn(Self::encryption_sync_task(
-            self.encryption_sync_service.clone(),
+        let sync_permit_guard = inner.encryption_sync_permit.clone().lock_owned().await;
+        let encryption_sync_task = spawn(Self::encryption_sync_task(
+            inner.encryption_sync_service.clone(),
             sender.clone(),
             sync_permit_guard,
-        )));
+        ));
 
         // Spawn the scheduler task.
-        *self.scheduler_sender.lock().unwrap() = Some(sender);
-        *self.scheduler_task.lock().unwrap() = Some(spawn(self.spawn_scheduler_task(receiver)));
+        inner.scheduler_sender = Some(sender);
+        inner.scheduler_task = Some(spawn(self.spawn_scheduler_task(
+            &inner,
+            room_list_task,
+            encryption_sync_task,
+            receiver,
+        )));
 
         self.state.set(State::Running);
     }
@@ -344,7 +336,7 @@
     /// necessary.
     #[instrument(skip_all)]
     pub async fn stop(&self) -> Result<(), Error> {
-        let _guard = self.modifying_state.lock().await;
+        let mut inner = self.inner.lock().await;
 
         match self.state.get() {
            State::Idle | State::Terminated | State::Error => {
@@ -360,7 +352,7 @@
        // later, so that we're in a clean state independently of the request to
        // stop.
 
-        let sender = self.scheduler_sender.lock().unwrap().clone();
+        let sender = inner.scheduler_sender.clone();
 
        sender
            .ok_or_else(|| {
@@ -377,7 +369,7 @@
                Error::InternalSchedulerError
            })?;
 
-        let scheduler_task = self.scheduler_task.lock().unwrap().take();
+        let scheduler_task = inner.scheduler_task.take();
 
        scheduler_task
            .ok_or_else(|| {
@@ -420,11 +412,8 @@ struct TerminationReport {
 #[doc(hidden)]
 impl SyncService {
     /// Return the existential states of internal tasks.
-    pub fn task_states(&self) -> (bool, bool) {
-        (
-            self.encryption_sync_task.lock().unwrap().is_some(),
-            self.room_list_task.lock().unwrap().is_some(),
-        )
+    pub async fn task_state(&self) -> bool {
+        self.inner.lock().await.scheduler_task.is_some()
     }
 }
@@ -476,16 +465,22 @@ impl SyncServiceBuilder {
             .await?,
         );
 
+        let room_list_service = Arc::new(room_list);
+        let state = SharedObservable::new(State::Idle);
+
         Ok(SyncService {
-            room_list_service: Arc::new(room_list),
-            encryption_sync_service: encryption_sync,
-            encryption_sync_task: Arc::new(Mutex::new(None)),
-            room_list_task: Arc::new(Mutex::new(None)),
-            scheduler_task: Arc::new(Mutex::new(None)),
-            scheduler_sender: Mutex::new(None),
-            state: SharedObservable::new(State::Idle),
-            modifying_state: AsyncMutex::new(()),
-            encryption_sync_permit,
+            state: state.clone(),
+            room_list_service: room_list_service.clone(),
+            encryption_sync_permit: encryption_sync_permit.clone(),
+            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
+                scheduler_task: None,
+                scheduler_sender: None,
+                room_list_service,
+                encryption_sync_service: encryption_sync,
+                state,
+                encryption_sync_permit,
+            })),
         })
     }
 }
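
One detail worth calling out in the new struct layout: `state` is deliberately replicated outside the lock, so handing out subscribers stays a cheap synchronous operation while mutations keep going through the locked copy. A sketch of the idea (hypothetical `Service` type, assuming the `eyeball` crate's `SharedObservable`/`Subscriber` API that the real code also imports):

use std::sync::Arc;

use eyeball::{SharedObservable, Subscriber};
use tokio::sync::Mutex as AsyncMutex;

#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Idle,
    Running,
}

struct Inner {
    // The writable copy lives under the lock, next to the rest of the
    // mutable service data.
    state: SharedObservable<State>,
}

struct Service {
    inner: Arc<AsyncMutex<Inner>>,
    // A clone of the same observable: subscribing never takes `inner`'s lock.
    state: SharedObservable<State>,
}

impl Service {
    fn new() -> Self {
        let state = SharedObservable::new(State::Idle);
        Self { inner: Arc::new(AsyncMutex::new(Inner { state: state.clone() })), state }
    }

    fn state(&self) -> Subscriber<State> {
        // Synchronous; no `.lock().await` needed.
        self.state.subscribe()
    }

    async fn start(&self) {
        // Mutations go through the locked copy, serialized with everything
        // else that start()/stop() touch.
        self.inner.lock().await.state.set(State::Running);
    }
}

#[tokio::main]
async fn main() {
    let service = Service::new();
    let mut state_stream = service.state();
    service.start().await;
    assert_eq!(state_stream.next().await, Some(State::Running));
}

The second changed file, below, is the test that pokes at this surface; its asserts move from the old pair of per-task flags to the single `task_state()` helper.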


@@ -73,19 +73,19 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
     // At first, the sync service is sleeping.
     assert_eq!(state_stream.get(), State::Idle);
     assert!(server.received_requests().await.unwrap().is_empty());
-    assert_eq!(sync_service.task_states(), (false, false));
+    assert_eq!(sync_service.task_state().await, false);
     assert!(sync_service.try_get_encryption_sync_permit().is_some());
 
     // After starting, the sync service is, well, running.
     sync_service.start().await;
     assert_next_matches!(state_stream, State::Running);
-    assert_eq!(sync_service.task_states(), (true, true));
+    assert_eq!(sync_service.task_state().await, true);
     assert!(sync_service.try_get_encryption_sync_permit().is_none());
 
     // Restarting while started doesn't change the current state.
     sync_service.start().await;
     assert_pending!(state_stream);
-    assert_eq!(sync_service.task_states(), (true, true));
+    assert_eq!(sync_service.task_state().await, true);
     assert!(sync_service.try_get_encryption_sync_permit().is_none());
 
     // Let the server respond a few times.
@@ -94,7 +94,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
     // Pausing will stop both syncs, after a bit of delay.
     sync_service.stop().await?;
     assert_next_matches!(state_stream, State::Idle);
-    assert_eq!(sync_service.task_states(), (false, false));
+    assert_eq!(sync_service.task_state().await, false);
     assert!(sync_service.try_get_encryption_sync_permit().is_some());
 
     let mut num_encryption_sync_requests: i32 = 0;
@@ -149,7 +149,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
     // the same position than just before being stopped.
     sync_service.start().await;
     assert_next_matches!(state_stream, State::Running);
-    assert_eq!(sync_service.task_states(), (true, true));
+    assert_eq!(sync_service.task_state().await, true);
     assert!(sync_service.try_get_encryption_sync_permit().is_none());
 
     tokio::time::sleep(Duration::from_millis(100)).await;