mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-12 18:21:21 -04:00
refactor(ui): Rename the scheduler task to supervisor task
From cambridge a scheduler is defined as:
> someone whose job is to create or work with schedules
While supervisor is defined as:
> a person whose job is to supervise someone or something
Well ok, that doesn't tell us much, supervise is defined as:
> to watch a person or activity to make certain that everything is done correctly, safely, etc.:
In conclusion, supervising a task is the more common and better
understood terminology here I would say.
This commit is contained in:
@@ -72,18 +72,18 @@ struct SyncServiceInner {
|
||||
encryption_sync_service: Arc<EncryptionSyncService>,
|
||||
state: SharedObservable<State>,
|
||||
encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
|
||||
/// Scheduler task ensuring proper termination.
|
||||
/// Supervisor task ensuring proper termination.
|
||||
///
|
||||
/// This task is waiting for a `TerminationReport` from any of the other two
|
||||
/// tasks, or from a user request via [`Self::stop()`]. It makes sure
|
||||
/// that the two services are properly shut up and just interrupted.
|
||||
///
|
||||
/// This is set at the same time as the other two tasks.
|
||||
scheduler_task: Option<JoinHandle<()>>,
|
||||
supervisor_task: Option<JoinHandle<()>>,
|
||||
/// `TerminationReport` sender for the [`Self::stop()`] function.
|
||||
///
|
||||
/// This is set at the same time as all the tasks in [`Self::start()`].
|
||||
scheduler_sender: Option<Sender<TerminationReport>>,
|
||||
supervisor_sender: Option<Sender<TerminationReport>>,
|
||||
}
|
||||
|
||||
pub struct SyncService {
|
||||
@@ -124,11 +124,11 @@ impl SyncService {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
/// The role of the scheduler task is to wait for a termination message
|
||||
/// The role of the supervisor task is to wait for a termination message
|
||||
/// (`TerminationReport`), sent either because we wanted to stop both
|
||||
/// syncs, or because one of the syncs failed (in which case we'll stop
|
||||
/// the other one too).
|
||||
fn spawn_scheduler_task(
|
||||
fn spawn_supervisor_task(
|
||||
&self,
|
||||
inner: &SyncServiceInner,
|
||||
room_list_task: JoinHandle<()>,
|
||||
@@ -149,7 +149,7 @@ impl SyncService {
|
||||
let (stop_room_list, stop_encryption) = match &report.origin {
|
||||
TerminationOrigin::EncryptionSync => (true, false),
|
||||
TerminationOrigin::RoomList => (false, true),
|
||||
TerminationOrigin::Scheduler => (true, true),
|
||||
TerminationOrigin::Supervisor => (true, true),
|
||||
};
|
||||
|
||||
// Stop both services, and wait for the streams to properly finish: at some
|
||||
@@ -187,13 +187,13 @@ impl SyncService {
|
||||
}
|
||||
|
||||
state.set(State::Error);
|
||||
} else if matches!(report.origin, TerminationOrigin::Scheduler) {
|
||||
} else if matches!(report.origin, TerminationOrigin::Supervisor) {
|
||||
state.set(State::Idle);
|
||||
} else {
|
||||
state.set(State::Terminated);
|
||||
}
|
||||
}
|
||||
.instrument(tracing::span!(Level::WARN, "scheduler task"))
|
||||
.instrument(tracing::span!(Level::WARN, "supervisor task"))
|
||||
}
|
||||
|
||||
async fn encryption_sync_task(
|
||||
@@ -317,9 +317,9 @@ impl SyncService {
|
||||
sync_permit_guard,
|
||||
));
|
||||
|
||||
// Spawn the scheduler task.
|
||||
inner.scheduler_sender = Some(sender);
|
||||
inner.scheduler_task = Some(spawn(self.spawn_scheduler_task(
|
||||
// Spawn the supervisor task.
|
||||
inner.supervisor_sender = Some(sender);
|
||||
inner.supervisor_task = Some(spawn(self.spawn_supervisor_task(
|
||||
&inner,
|
||||
room_list_task,
|
||||
encryption_sync_task,
|
||||
@@ -352,33 +352,33 @@ impl SyncService {
|
||||
// later, so that we're in a clean state independently of the request to
|
||||
// stop.
|
||||
|
||||
let sender = inner.scheduler_sender.clone();
|
||||
let sender = inner.supervisor_sender.clone();
|
||||
sender
|
||||
.ok_or_else(|| {
|
||||
error!("missing sender");
|
||||
Error::InternalSchedulerError
|
||||
Error::InternalSupervisorError
|
||||
})?
|
||||
.send(TerminationReport {
|
||||
is_error: false,
|
||||
has_expired: false,
|
||||
origin: TerminationOrigin::Scheduler,
|
||||
origin: TerminationOrigin::Supervisor,
|
||||
})
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!("when sending termination report: {err}");
|
||||
Error::InternalSchedulerError
|
||||
Error::InternalSupervisorError
|
||||
})?;
|
||||
|
||||
let scheduler_task = inner.scheduler_task.take();
|
||||
scheduler_task
|
||||
let supervisor_task = inner.supervisor_task.take();
|
||||
supervisor_task
|
||||
.ok_or_else(|| {
|
||||
error!("missing scheduler task");
|
||||
Error::InternalSchedulerError
|
||||
error!("missing supervisor task");
|
||||
Error::InternalSupervisorError
|
||||
})?
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!("couldn't finish scheduler task: {err}");
|
||||
Error::InternalSchedulerError
|
||||
error!("couldn't finish supervisor task: {err}");
|
||||
Error::InternalSupervisorError
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
@@ -398,7 +398,7 @@ impl SyncService {
|
||||
enum TerminationOrigin {
|
||||
EncryptionSync,
|
||||
RoomList,
|
||||
Scheduler,
|
||||
Supervisor,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -412,8 +412,8 @@ struct TerminationReport {
|
||||
#[doc(hidden)]
|
||||
impl SyncService {
|
||||
/// Return the existential states of internal tasks.
|
||||
pub async fn task_state(&self) -> bool {
|
||||
self.inner.lock().await.scheduler_task.is_some()
|
||||
pub async fn is_supervisor_running(&self) -> bool {
|
||||
self.inner.lock().await.supervisor_task.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,8 +474,8 @@ impl SyncServiceBuilder {
|
||||
encryption_sync_permit: encryption_sync_permit.clone(),
|
||||
|
||||
inner: Arc::new(AsyncMutex::new(SyncServiceInner {
|
||||
scheduler_task: None,
|
||||
scheduler_sender: None,
|
||||
supervisor_task: None,
|
||||
supervisor_sender: None,
|
||||
room_list_service,
|
||||
encryption_sync_service: encryption_sync,
|
||||
state,
|
||||
@@ -496,6 +496,6 @@ pub enum Error {
|
||||
#[error(transparent)]
|
||||
EncryptionSync(#[from] encryption_sync_service::Error),
|
||||
|
||||
#[error("the scheduler channel has run into an unexpected error")]
|
||||
InternalSchedulerError,
|
||||
#[error("the supervisor channel has run into an unexpected error")]
|
||||
InternalSupervisorError,
|
||||
}
|
||||
|
||||
@@ -73,19 +73,19 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
|
||||
// At first, the sync service is sleeping.
|
||||
assert_eq!(state_stream.get(), State::Idle);
|
||||
assert!(server.received_requests().await.unwrap().is_empty());
|
||||
assert_eq!(sync_service.task_state().await, false);
|
||||
assert!(!sync_service.is_supervisor_running().await);
|
||||
assert!(sync_service.try_get_encryption_sync_permit().is_some());
|
||||
|
||||
// After starting, the sync service is, well, running.
|
||||
sync_service.start().await;
|
||||
assert_next_matches!(state_stream, State::Running);
|
||||
assert_eq!(sync_service.task_state().await, true);
|
||||
assert!(sync_service.is_supervisor_running().await);
|
||||
assert!(sync_service.try_get_encryption_sync_permit().is_none());
|
||||
|
||||
// Restarting while started doesn't change the current state.
|
||||
sync_service.start().await;
|
||||
assert_pending!(state_stream);
|
||||
assert_eq!(sync_service.task_state().await, true);
|
||||
assert!(sync_service.is_supervisor_running().await);
|
||||
assert!(sync_service.try_get_encryption_sync_permit().is_none());
|
||||
|
||||
// Let the server respond a few times.
|
||||
@@ -94,7 +94,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
|
||||
// Pausing will stop both syncs, after a bit of delay.
|
||||
sync_service.stop().await?;
|
||||
assert_next_matches!(state_stream, State::Idle);
|
||||
assert_eq!(sync_service.task_state().await, false);
|
||||
assert!(!sync_service.is_supervisor_running().await);
|
||||
assert!(sync_service.try_get_encryption_sync_permit().is_some());
|
||||
|
||||
let mut num_encryption_sync_requests: i32 = 0;
|
||||
@@ -149,7 +149,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
|
||||
// the same position than just before being stopped.
|
||||
sync_service.start().await;
|
||||
assert_next_matches!(state_stream, State::Running);
|
||||
assert_eq!(sync_service.task_state().await, true);
|
||||
assert!(sync_service.is_supervisor_running().await);
|
||||
assert!(sync_service.try_get_encryption_sync_permit().is_none());
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
Reference in New Issue
Block a user