sched: introduce Work as the unified scheduleable unit

Refactor the scheduler so all scheduleable work is wrapped in Arc<Work>,
replacing the previous per-CPU wait_q design where sleeping tasks were
bound to a specific CPU. Wakers now hold direct Arc<Work> references and
can re-enqueue tasks on any CPU upon wakeup.

Key changes:

- Add Work struct wrapping OwnedTask with an AtomicTaskState and
  scheduler metadata (SchedulerData), replacing the old SchedulableTask.
  Remove Task::state (Arc<SpinLock<TaskState>>). Work::state is now the
  single source of truth for task state.

- Rewrite the run queue using BinaryHeap-based eligible/ineligible split
  (EEVDF) with a dedicated VClock, replacing the BTreeMap linear scan.
  Extract vclock into its own module.

- Rewrite wakers to hold Arc<Work> directly instead of looking up tasks
  by TaskDescriptor from TASK_LIST.

- Replace lock-based sleep transitions in uspc_ret with atomic CAS
  (try_sleep_current) that correctly detects concurrent Woken state.

- Simplify least-tasked-CPU metric to use only run-queue weight, since
  sleeping tasks are no longer bound to any CPU.

- Add current_work() accessor.
This commit is contained in:
Matthew Leach
2026-03-10 22:00:56 +00:00
parent e12af349ad
commit 5ebfc29cd2
22 changed files with 825 additions and 623 deletions

12
Cargo.lock generated
View File

@@ -46,6 +46,17 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic_enum"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -547,6 +558,7 @@ dependencies = [
"arm-pl011-uart",
"arm_pl031",
"async-trait",
"atomic_enum",
"bitflags 2.11.0",
"blake2",
"chacha20",

View File

@@ -45,6 +45,7 @@ rustc-hash = { version = "2.1", default-features = false }
smoltcp = { version = "0.12.0", default-features = false, features = ["alloc", "medium-ethernet", "medium-ip", "proto-ipv4", "proto-ipv6", "socket-tcp", "socket-udp"] }
tock-registers = "0.10.1"
virtio-drivers = "0.13.0"
atomic_enum = "0.3.0"
[build-dependencies]
time = { version = "0.3.47", features = ["formatting", "macros"] } # For build timestamping via build.rs

View File

@@ -66,7 +66,6 @@ use crate::{
socket::sys_socket,
},
process::{
TaskState,
caps::{sys_capget, sys_capset},
clone::sys_clone,
creds::{
@@ -98,7 +97,7 @@ use crate::{
},
threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address},
},
sched::{current::current_task, sys_sched_yield},
sched::{self, current::current_task, sched_task::state::TaskState, sys_sched_yield},
};
use alloc::boxed::Box;
use libkernel::{
@@ -389,10 +388,12 @@ pub async fn handle_syscall() {
0x5d => {
let _ = sys_exit(arg1 as _).await;
debug_assert!(matches!(
*current_task().state.lock_save_irq(),
TaskState::Finished
));
debug_assert!(
sched::current_work()
.state
.load(core::sync::atomic::Ordering::Acquire)
== TaskState::Finished
);
// Don't process result on exit.
return;
@@ -400,10 +401,12 @@ pub async fn handle_syscall() {
0x5e => {
let _ = sys_exit_group(arg1 as _).await;
debug_assert!(matches!(
*current_task().state.lock_save_irq(),
TaskState::Finished
));
debug_assert!(
sched::current_work()
.state
.load(core::sync::atomic::Ordering::Acquire)
== TaskState::Finished
);
// Don't process result on exit.
return;

View File

@@ -93,7 +93,7 @@ impl SimpleFile for ProcTaskFileInode {
};
let status_string = if let Some(task) = task_details {
let state = *task.state.lock_save_irq();
let state = task.state.load(core::sync::atomic::Ordering::Relaxed);
let name = task.comm.lock_save_irq();
match self.file_type {
TaskFileType::Status => format!(

View File

@@ -6,7 +6,7 @@ use super::{
ClaimedInterrupt, InterruptConfig, InterruptDescriptor, InterruptHandler, get_interrupt_root,
};
use crate::kernel::cpu_id::CpuId;
use crate::process::owned::OwnedTask;
use crate::sched::sched_task::Work;
use crate::{
arch::ArchImpl,
drivers::Driver,
@@ -14,7 +14,6 @@ use crate::{
sched,
sync::{OnceLock, SpinLock},
};
use alloc::boxed::Box;
use alloc::{sync::Arc, vec::Vec};
use libkernel::{
CpuOps,
@@ -23,7 +22,8 @@ use libkernel::{
use log::warn;
pub enum Message {
PutTask(Box<OwnedTask>),
EnqueueWork(Arc<Work>),
#[expect(unused)]
WakeupTask(Waker),
}
@@ -50,7 +50,7 @@ impl InterruptHandler for CpuMessenger {
.try_pop()
{
match message {
Message::PutTask(task) => sched::insert_task(task),
Message::EnqueueWork(work) => sched::insert_work(work),
Message::WakeupTask(waker) => waker.wake(),
}
}

View File

@@ -60,6 +60,7 @@ pub async fn sys_capget(hdrp: TUA<CapUserHeader>, datap: TUA<CapUserData>) -> Re
.iter()
.find(|task| task.0.tgid.value() == header.pid as u32)
.and_then(|task| task.1.upgrade())
.map(|x| x.t_shared.clone())
.ok_or(KernelError::NoProcess)?
};
match header.version {
@@ -95,6 +96,7 @@ pub async fn sys_capset(hdrp: TUA<CapUserHeader>, datap: TUA<CapUserData>) -> Re
.iter()
.find(|task| task.0.tgid.value() == header.pid as u32)
.and_then(|task| task.1.upgrade())
.map(|x| x.t_shared.clone())
.ok_or(KernelError::NoProcess)?
};

View File

@@ -1,10 +1,10 @@
use super::owned::OwnedTask;
use super::ptrace::{PTrace, TracePoint, ptrace_stop};
use super::{ctx::Context, thread_group::signal::SigSet};
use crate::kernel::cpu_id::CpuId;
use crate::memory::uaccess::copy_to_user;
use crate::sched::sched_task::Work;
use crate::{
process::{TASK_LIST, Task, TaskState},
process::{TASK_LIST, Task},
sched::{self, current::current_task},
sync::SpinLock,
};
@@ -170,8 +170,6 @@ pub async fn sys_clone(
cwd,
root,
creds: SpinLock::new(creds),
state: Arc::new(SpinLock::new(TaskState::Runnable)),
last_cpu: SpinLock::new(CpuId::this()),
ptrace: SpinLock::new(ptrace),
utime: AtomicUsize::new(0),
stime: AtomicUsize::new(0),
@@ -181,28 +179,29 @@ pub async fn sys_clone(
}
};
let tid = new_task.tid;
let desc = new_task.descriptor();
let work = Work::new(Box::new(new_task));
TASK_LIST
.lock_save_irq()
.insert(new_task.descriptor(), Arc::downgrade(&new_task.t_shared));
.insert(desc, Arc::downgrade(&work));
new_task
.process
work.process
.tasks
.lock_save_irq()
.insert(tid, Arc::downgrade(&new_task.t_shared));
.insert(desc.tid, Arc::downgrade(&work));
sched::insert_task_cross_cpu(work);
sched::insert_task_cross_cpu(Box::new(new_task));
NUM_FORKS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
// Honour CLONE_*SETTID semantics for the parent and (shared-VM) child.
if flags.contains(CloneFlags::CLONE_PARENT_SETTID) && !parent_tidptr.is_null() {
copy_to_user(parent_tidptr, tid.value()).await?;
copy_to_user(parent_tidptr, desc.tid.value()).await?;
}
if flags.contains(CloneFlags::CLONE_CHILD_SETTID) && !child_tidptr.is_null() {
copy_to_user(child_tidptr, tid.value()).await?;
copy_to_user(child_tidptr, desc.tid.value()).await?;
}
Ok(tid.value() as _)
Ok(desc.tid.value() as _)
}

View File

@@ -1,10 +1,10 @@
use super::{
TASK_LIST, TaskState,
TASK_LIST,
ptrace::{TracePoint, ptrace_stop},
thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState},
threading::futex::{self, key::FutexKey},
};
use crate::sched::current::current_task;
use crate::sched::{self, current::current_task};
use crate::{memory::uaccess::copy_to_user, sched::current::current_task_shared};
use alloc::vec::Vec;
use libkernel::error::Result;
@@ -34,7 +34,8 @@ pub fn do_exit_group(exit_code: ChildState) {
if *process_state != ProcessState::Running {
// We're already on our way out. Just kill this thread.
drop(process_state);
*task.state.lock_save_irq() = TaskState::Finished;
drop(task);
sched::current_work().state.finish();
return;
}
@@ -51,7 +52,7 @@ pub fn do_exit_group(exit_code: ChildState) {
// TODO: Send an IPI/Signal to halt execution now. For now, just
// wait for the scheduler to never schedule any of its tasks
// again.
*other_thread.state.lock_save_irq() = TaskState::Finished;
other_thread.state.finish();
}
}
}
@@ -87,7 +88,8 @@ pub fn do_exit_group(exit_code: ChildState) {
.set_signal(SigId::SIGCHLD);
// 5. This thread is now finished.
*task.state.lock_save_irq() = TaskState::Finished;
drop(task);
sched::current_work().state.finish();
// NOTE: the scheduler will never execute the task again since its
// state is set to Finished.
@@ -151,7 +153,7 @@ pub async fn sys_exit(exit_code: usize) -> Result<usize> {
Ok(0)
} else {
// Mark our own state as finished.
*task.state.lock_save_irq() = TaskState::Finished;
sched::current_work().state.finish();
// Remove ourself from the process's thread list.
tasks_lock.remove(&task.tid);

View File

@@ -1,5 +1,6 @@
use crate::drivers::timer::Instant;
use crate::sched::CPU_STAT;
use crate::sched::sched_task::Work;
use crate::{
arch::ArchImpl,
kernel::cpu_id::CpuId,
@@ -14,7 +15,6 @@ use alloc::{
collections::btree_map::BTreeMap,
sync::{Arc, Weak},
};
use core::fmt::Display;
use core::sync::atomic::{AtomicUsize, Ordering};
use creds::Credentials;
use fd_table::FileDescriptorTable;
@@ -124,35 +124,6 @@ impl TaskDescriptor {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
Running,
Runnable,
Woken,
Stopped,
Sleeping,
Finished,
}
impl Display for TaskState {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let state_str = match self {
TaskState::Running => "R",
TaskState::Runnable => "R",
TaskState::Woken => "W",
TaskState::Stopped => "T",
TaskState::Sleeping => "S",
TaskState::Finished => "Z",
};
write!(f, "{state_str}")
}
}
impl TaskState {
pub fn is_finished(self) -> bool {
matches!(self, Self::Finished)
}
}
pub type ProcVM = ProcessVM<<ArchImpl as VirtualMemory>::ProcessAddressSpace>;
#[derive(Copy, Clone)]
@@ -184,8 +155,6 @@ pub struct Task {
pub root: Arc<SpinLock<(Arc<dyn Inode>, PathBuf)>>,
pub creds: SpinLock<Credentials>,
pub fd_table: Arc<SpinLock<FileDescriptorTable>>,
pub state: Arc<SpinLock<TaskState>>,
pub last_cpu: SpinLock<CpuId>,
pub ptrace: SpinLock<PTrace>,
pub utime: AtomicUsize,
pub stime: AtomicUsize,
@@ -308,7 +277,7 @@ impl Task {
}
}
pub fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option<Arc<Task>> {
pub fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option<Arc<Work>> {
TASK_LIST
.lock_save_irq()
.get(descriptor)
@@ -316,11 +285,11 @@ pub fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option<Arc<Task>>
}
/// Finds the root task for the given thread group
pub fn find_process_by_tgid(tgid: Tgid) -> Option<Arc<Task>> {
pub fn find_process_by_tgid(tgid: Tgid) -> Option<Arc<Work>> {
find_task_by_descriptor(&TaskDescriptor::from_tgid_tid(tgid, Tid::from_tgid(tgid)))
}
pub static TASK_LIST: SpinLock<BTreeMap<TaskDescriptor, Weak<Task>>> =
pub static TASK_LIST: SpinLock<BTreeMap<TaskDescriptor, Weak<Work>>> =
SpinLock::new(BTreeMap::new());
unsafe impl Send for Task {}

View File

@@ -1,7 +1,5 @@
use core::ops::Deref;
use super::{
Comm, Task, TaskState, Tid,
Comm, Task, Tid,
creds::Credentials,
ctx::{Context, UserCtx},
fd_table::FileDescriptorTable,
@@ -13,14 +11,13 @@ use super::{
},
threading::RobustListHead,
};
use crate::drivers::timer::{Instant, now};
use crate::{arch::Arch, fs::DummyInode, sync::SpinLock};
use crate::{
arch::{Arch, ArchImpl},
fs::DummyInode,
kernel::cpu_id::CpuId,
sync::SpinLock,
arch::ArchImpl,
drivers::timer::{Instant, now},
};
use alloc::sync::Arc;
use core::ops::Deref;
use core::sync::atomic::AtomicUsize;
use libkernel::{
VirtualMemory,
@@ -72,13 +69,11 @@ impl OwnedTask {
tid: Tid::idle_for_cpu(),
comm: Arc::new(SpinLock::new(Comm::new("idle"))),
process: thread_group_builder.build(),
state: Arc::new(SpinLock::new(TaskState::Runnable)),
cwd: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))),
root: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))),
creds: SpinLock::new(Credentials::new_root()),
vm: Arc::new(SpinLock::new(vm)),
fd_table: Arc::new(SpinLock::new(FileDescriptorTable::new())),
last_cpu: SpinLock::new(CpuId::this()),
ptrace: SpinLock::new(PTrace::new()),
utime: AtomicUsize::new(0),
stime: AtomicUsize::new(0),
@@ -102,7 +97,6 @@ impl OwnedTask {
tid: Tid(1),
comm: Arc::new(SpinLock::new(Comm::new("init"))),
process: ThreadGroupBuilder::new(Tgid::init()).build(),
state: Arc::new(SpinLock::new(TaskState::Runnable)),
cwd: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))),
root: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))),
creds: SpinLock::new(Credentials::new_root()),
@@ -110,7 +104,6 @@ impl OwnedTask {
ProcessVM::empty().expect("Could not create init process's VM"),
)),
fd_table: Arc::new(SpinLock::new(FileDescriptorTable::new())),
last_cpu: SpinLock::new(CpuId::this()),
ptrace: SpinLock::new(PTrace::new()),
last_account: AtomicUsize::new(0),
utime: AtomicUsize::new(0),

View File

@@ -1,24 +1,25 @@
use core::future::poll_fn;
use core::task::{Poll, Waker};
use crate::arch::{Arch, ArchImpl};
use crate::fs::syscalls::iov::IoVec;
use crate::memory::uaccess::{copy_from_user, copy_to_user};
use crate::process::TASK_LIST;
use crate::process::thread_group::signal::SigId;
use crate::sched::current::{current_task, current_task_shared};
use super::thread_group::{ThreadGroup, wait::ChildState};
use crate::{
arch::{Arch, ArchImpl},
fs::syscalls::iov::IoVec,
memory::uaccess::{copy_from_user, copy_to_user},
process::{TASK_LIST, thread_group::signal::SigId},
sched::current::{current_task, current_task_shared},
};
use alloc::sync::Arc;
use bitflags::Flags;
use libkernel::error::{KernelError, Result};
use libkernel::memory::address::UA;
use core::{
future::poll_fn,
task::{Poll, Waker},
};
use libkernel::{
error::{KernelError, Result},
memory::address::UA,
};
use log::warn;
type GpRegs = <ArchImpl as Arch>::PTraceGpRegs;
use super::TaskState;
use super::thread_group::ThreadGroup;
use super::thread_group::wait::ChildState;
const PTRACE_EVENT_FORK: usize = 1;
const PTRACE_EVENT_VFORK: usize = 2;
const PTRACE_EVENT_CLONE: usize = 3;
@@ -43,7 +44,7 @@ bitflags::bitflags! {
const PTRACE_O_SUSPEND_SECCOMP = 1 << 21;
}
#[derive(Clone, Copy, PartialEq)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct TracePoint: u32 {
const SyscallEntry = 0x01;
const SyscallExit = 0x02;
@@ -177,9 +178,6 @@ impl PTrace {
}
pub fn set_waker(&mut self, waker: Waker) {
// Ensure we never override an already existing waker.
debug_assert!(self.waker.is_none());
self.waker = Some(waker);
}
@@ -259,22 +257,29 @@ impl TryFrom<i32> for PtraceOperation {
pub async fn ptrace_stop(point: TracePoint) -> bool {
let task_sh = current_task_shared();
{
let mut ptrace = task_sh.ptrace.lock_save_irq();
if ptrace.hit_trace_point(point, current_task().ctx.user()) {
ptrace.notify_tracer_of_trap(&task_sh.process);
} else {
return false;
}
}
let mut notified = false;
poll_fn(|cx| {
let mut ptrace = task_sh.ptrace.lock_save_irq();
if matches!(ptrace.state, Some(PTraceState::Running)) {
if !notified {
// First poll: hit the trace point, set waker, then notify.
// The waker must be set *before* notification so the tracer
// can always find it when it does PTRACE_SYSCALL/CONT.
if !ptrace.hit_trace_point(point, current_task().ctx.user()) {
return Poll::Ready(false);
}
notified = true;
ptrace.set_waker(cx.waker().clone());
ptrace.notify_tracer_of_trap(&task_sh.process);
Poll::Pending
} else if matches!(ptrace.state, Some(PTraceState::Running)) {
// Tracer resumed us.
Poll::Ready(true)
} else {
// Re-polled (e.g. spurious wakeup from signal) but tracer
// hasn't resumed yet. Refresh the waker and go back to sleep.
ptrace.set_waker(cx.waker().clone());
Poll::Pending
}
@@ -381,7 +386,9 @@ pub async fn sys_ptrace(op: i32, pid: u64, addr: UA, data: UA) -> Result<usize>
.break_points
.remove(TracePoint::SyscallEntry | TracePoint::SyscallExit);
*target_task.state.lock_save_irq() = TaskState::Runnable;
if let Some(waker) = ptrace.waker.take() {
waker.wake();
}
Ok(0)
}

View File

@@ -1,5 +1,12 @@
use super::{Task, TaskState, Tid};
use crate::{memory::uaccess::UserCopyable, sched::waker::create_waker, sync::SpinLock};
use super::Tid;
use crate::{
memory::uaccess::UserCopyable,
sched::{
sched_task::{Work, state::TaskState},
waker::create_waker,
},
sync::SpinLock,
};
use alloc::{
collections::btree_map::BTreeMap,
sync::{Arc, Weak},
@@ -95,7 +102,7 @@ pub struct ThreadGroup {
pub umask: SpinLock<u32>,
pub parent: SpinLock<Option<Weak<ThreadGroup>>>,
pub children: SpinLock<BTreeMap<Tgid, Arc<ThreadGroup>>>,
pub tasks: SpinLock<BTreeMap<Tid, Weak<Task>>>,
pub tasks: SpinLock<BTreeMap<Tid, Weak<Work>>>,
pub signals: Arc<SpinLock<SignalActionState>>,
pub rsrc_lim: Arc<SpinLock<ResourceLimits>>,
pub pending_signals: SpinLock<SigSet>,
@@ -165,13 +172,10 @@ impl ThreadGroup {
*self.pending_signals.lock_save_irq() = SigSet::SIGKILL;
for task in self.tasks.lock_save_irq().values() {
if let Some(task) = task.upgrade()
&& matches!(
*task.state.lock_save_irq(),
TaskState::Stopped | TaskState::Sleeping
)
{
create_waker(task.descriptor()).wake();
if let Some(task) = task.upgrade() {
// Wake will handle Sleeping/Stopped → Enqueue,
// and Running/Pending* → PreventedSleep (sets Woken).
create_waker(task).wake();
}
}
}
@@ -182,8 +186,12 @@ impl ThreadGroup {
for task in self.tasks.lock_save_irq().values() {
if let Some(task) = task.upgrade()
&& matches!(
*task.state.lock_save_irq(),
TaskState::Runnable | TaskState::Running
task.state.load(Ordering::Acquire),
TaskState::Runnable
| TaskState::Running
| TaskState::Woken
| TaskState::PendingSleep
| TaskState::PendingStop
)
{
// Signal delivered. This task will eventually be
@@ -196,7 +204,7 @@ impl ThreadGroup {
// No task will pick up the signal. Wake one up.
for task in self.tasks.lock_save_irq().values() {
if let Some(task) = task.upgrade() {
create_waker(task.descriptor()).wake();
create_waker(task).wake();
return;
}
}

View File

@@ -2,7 +2,7 @@ use crate::{
per_cpu_private,
process::{Task, owned::OwnedTask},
};
use alloc::{boxed::Box, sync::Arc};
use alloc::sync::Arc;
use core::{
cell::Cell,
marker::PhantomData,
@@ -78,8 +78,8 @@ impl CurrentTaskPtr {
}
}
pub(super) fn set_current(&self, task: &mut Box<OwnedTask>) {
self.ptr.set(Box::as_mut_ptr(task));
pub(super) fn set_current(&self, task: *mut OwnedTask) {
self.ptr.set(task);
}
}

View File

@@ -1,23 +1,20 @@
use crate::arch::ArchImpl;
use crate::drivers::timer::{Instant, now};
#[cfg(feature = "smp")]
use crate::interrupts::cpu_messenger::{Message, message_cpu};
use crate::kernel::cpu_id::CpuId;
use crate::process::owned::OwnedTask;
use crate::{
arch::Arch,
per_cpu_private, per_cpu_shared,
process::{TASK_LIST, TaskDescriptor, TaskState},
};
use alloc::{boxed::Box, collections::btree_map::BTreeMap, sync::Arc};
use crate::{per_cpu_private, per_cpu_shared, process::TASK_LIST};
use alloc::{boxed::Box, sync::Arc};
use core::fmt::Debug;
use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use core::task::Waker;
use core::time::Duration;
use current::{CUR_TASK_PTR, current_task};
use libkernel::{UserAddressSpace, error::Result};
use libkernel::error::Result;
use log::warn;
use runqueue::{RunQueue, SwitchResult};
use sched_task::SchedulableTask;
use runqueue::RunQueue;
use sched_task::Work;
use waker::create_waker;
pub mod current;
mod runqueue;
@@ -128,12 +125,10 @@ pub fn spawn_kernel_work(fut: impl Future<Output = ()> + 'static + Send) {
/// Global atomic storing info about the least-tasked CPU.
/// First 16 bits: CPU ID
/// Next 24 bits: Weight
/// Next 24 bits: Number of waiting tasks
/// Remaining 48 bits: Run-queue weight
#[cfg(feature = "smp")]
static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0);
const WEIGHT_SHIFT: u32 = 16;
const WAITING_SHIFT: u32 = WEIGHT_SHIFT + 24;
#[cfg(feature = "smp")]
fn get_best_cpu() -> CpuId {
@@ -143,49 +138,35 @@ fn get_best_cpu() -> CpuId {
}
/// Insert the given task onto a CPU's run queue.
pub fn insert_task(task: Box<OwnedTask>) {
SCHED_STATE
.borrow_mut()
.insert_into_runq(SchedulableTask::new(task));
pub fn insert_work(work: Arc<Work>) {
SCHED_STATE.borrow_mut().run_q.add_work(work);
}
#[cfg(feature = "smp")]
pub fn insert_task_cross_cpu(task: Box<OwnedTask>) {
pub fn insert_task_cross_cpu(task: Arc<Work>) {
let cpu = get_best_cpu();
if cpu == CpuId::this() {
insert_task(task);
SCHED_STATE.borrow_mut().run_q.add_work(task);
} else {
message_cpu(cpu, Message::PutTask(task)).expect("Failed to send task to CPU");
message_cpu(cpu, Message::EnqueueWork(task)).expect("Failed to send task to CPU");
}
}
#[cfg(not(feature = "smp"))]
pub fn insert_task_cross_cpu(task: Box<OwnedTask>) {
insert_task(task);
pub fn insert_task_cross_cpu(task: Arc<Work>) {
insert_work(task);
}
pub struct SchedState {
run_q: RunQueue,
wait_q: BTreeMap<TaskDescriptor, Box<SchedulableTask>>,
/// Per-CPU virtual clock (fixed-point 65.63 stored in a u128).
/// Expressed in virtual-time units as defined by the EEVDF paper.
vclock: u128,
/// Real-time moment when `vclock` was last updated.
last_update: Option<Instant>,
/// Force a reschedule.
force_resched: bool,
}
unsafe impl Send for SchedState {}
impl SchedState {
pub const fn new() -> Self {
pub fn new() -> Self {
Self {
run_q: RunQueue::new(),
wait_q: BTreeMap::new(),
vclock: 0,
last_update: None,
force_resched: false,
}
}
@@ -211,22 +192,16 @@ impl SchedState {
*LAST_UPDATE.borrow_mut() = now();
let weight = self.run_q.weight();
let waiting_tasks = self.wait_q.len() as u64;
let cpu_id = CpuId::this().value() as u64;
let new_info = (cpu_id & 0xffff)
| ((weight & 0xffffff) << WEIGHT_SHIFT)
| ((waiting_tasks & 0xffffff) << WAITING_SHIFT);
let new_info = (cpu_id & 0xffff) | ((weight & 0xffffffffffff) << WEIGHT_SHIFT);
let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire);
// Ensure we don't spin forever (possible with a larger number of CPUs)
const MAX_RETRIES: usize = 8;
// Ensure consistency
for _ in 0..MAX_RETRIES {
let old_cpu_id = old_info & 0xffff;
let old_weight = (old_info >> WEIGHT_SHIFT) & 0xffffff;
let old_waiting = (old_info >> WAITING_SHIFT) & 0xffffff;
let metric = weight + (waiting_tasks * SCHED_WEIGHT_BASE as u64);
let old_metric = old_weight + (old_waiting * SCHED_WEIGHT_BASE as u64);
if (cpu_id == old_cpu_id && old_info != new_info) || (metric < old_metric) {
let old_weight = old_info >> WEIGHT_SHIFT;
if (cpu_id == old_cpu_id && old_info != new_info) || (weight < old_weight) {
match LEAST_TASKED_CPU_INFO.compare_exchange(
old_info,
new_info,
@@ -245,160 +220,57 @@ impl SchedState {
// No-op on single-core systems.
}
/// Advance the per-CPU virtual clock (`vclock`) by converting the elapsed
/// real time since the last update into 65.63-format fixed-point
/// virtual-time units:
/// v += (delta t << VT_FIXED_SHIFT) / sum w
/// The caller must pass the current real time (`now_inst`).
fn advance_vclock(&mut self, now_inst: Instant) {
if let Some(prev) = self.last_update {
let delta_real = now_inst - prev;
if self.run_q.weight() > 0 {
let delta_vt =
((delta_real.as_nanos()) << VT_FIXED_SHIFT) / self.run_q.weight() as u128;
self.vclock = self.vclock.saturating_add(delta_vt);
}
}
self.last_update = Some(now_inst);
}
fn insert_into_runq(&mut self, mut new_task: Box<SchedulableTask>) {
let now = now().expect("systimer not running");
self.advance_vclock(now);
new_task.inserting_into_runqueue(self.vclock);
if let Some(current) = self.run_q.current() {
// We force a reschedule if:
//
// We are currently idling, OR The new task has an earlier deadline
// than the current task.
if current.is_idle_task() || new_task.v_deadline < current.v_deadline {
self.force_resched = true;
}
}
self.run_q.enqueue_task(new_task);
self.update_global_least_tasked_cpu_info();
}
pub fn wakeup(&mut self, desc: TaskDescriptor) {
if let Some(task) = self.wait_q.remove(&desc) {
self.insert_into_runq(task);
} else {
warn!(
"Spurious wakeup for task {:?} on CPU {:?}",
desc,
CpuId::this().value()
);
}
}
pub fn do_schedule(&mut self) {
self.update_global_least_tasked_cpu_info();
// Update Clocks
let now_inst = now().expect("System timer not initialised");
self.advance_vclock(now_inst);
{
let current = self.run_q.current_mut();
let mut needs_resched = self.force_resched;
current.task.update_accounting(Some(now_inst));
if let Some(current) = self.run_q.current_mut() {
current.update_accounting(Some(now_inst));
// Reset accounting baseline after updating stats to avoid double-counting
// the same time interval on the next scheduler tick.
current.reset_last_account(now_inst);
// If the current task is IDLE, we always want to proceed to the
// scheduler core to see if a real task has arrived.
if current.is_idle_task() {
needs_resched = true;
} else if current.tick(now_inst) {
// Otherwise, check if the real task expired
needs_resched = true;
}
} else {
needs_resched = true;
current.task.reset_last_account(now_inst);
}
if !needs_resched
&& let Some(current) = self.run_q.current()
&& matches!(*current.state.lock_save_irq(), TaskState::Running)
{
// Fast Path: Only return if we have a valid task (Running state),
// it has budget, AND it's not the idle task.
return;
}
// Reset the force flag for next time.
self.force_resched = false;
// Select Next Task.
let next_task_desc = self.run_q.find_next_runnable_desc(self.vclock);
match self.run_q.switch_tasks(next_task_desc, now_inst) {
SwitchResult::AlreadyRunning => {
// Nothing to do.
return;
}
SwitchResult::Blocked { old_task } => {
// If the blocked task has finished, allow it to drop here so it's
// resources are released.
if !old_task.state.lock_save_irq().is_finished() {
self.wait_q.insert(old_task.descriptor(), old_task);
}
}
// fall-thru.
SwitchResult::Preempted => {}
}
// Update all context since the task has switched.
if let Some(new_current) = self.run_q.current_mut() {
NUM_CONTEXT_SWITCHES.fetch_add(1, Ordering::Relaxed);
ArchImpl::context_switch(new_current.t_shared.clone());
let now = now().unwrap();
new_current.reset_last_account(now);
CUR_TASK_PTR.borrow_mut().set_current(&mut new_current.task);
}
self.run_q.schedule(now_inst);
}
}
pub fn sched_init() {
let idle_task = ArchImpl::create_idle_task();
let init_task = OwnedTask::create_init_task();
init_task
.vm
.lock_save_irq()
.mm_mut()
.address_space_mut()
.activate();
*init_task.state.lock_save_irq() = TaskState::Runnable;
let init_work = Work::new(Box::new(init_task));
{
let mut task_list = TASK_LIST.lock_save_irq();
task_list.insert(idle_task.descriptor(), Arc::downgrade(&idle_task.t_shared));
task_list.insert(init_task.descriptor(), Arc::downgrade(&init_task.t_shared));
task_list.insert(init_work.task.descriptor(), Arc::downgrade(&init_work));
}
insert_task(Box::new(idle_task));
insert_task(Box::new(init_task));
insert_work(init_work);
schedule();
}
pub fn sched_init_secondary() {
let idle_task = ArchImpl::create_idle_task();
insert_task(Box::new(idle_task));
// Force update_global_least_tasked_cpu_info
SCHED_STATE.borrow().update_global_least_tasked_cpu_info();
schedule();
}
pub fn sys_sched_yield() -> Result<usize> {
schedule();
Ok(0)
}
pub fn current_work() -> Arc<Work> {
SCHED_STATE.borrow().run_q.current().task.clone()
}
pub fn current_work_waker() -> Waker {
create_waker(current_work())
}

View File

@@ -1,169 +0,0 @@
use core::cmp::Ordering;
use crate::{
drivers::timer::Instant,
process::{TaskDescriptor, TaskState},
};
use alloc::{boxed::Box, collections::btree_map::BTreeMap};
use log::warn;
use super::{VCLOCK_EPSILON, sched_task::SchedulableTask};
/// The result of a requested task switch.
pub enum SwitchResult {
/// The requested task is already running. No changes made.
AlreadyRunning,
/// A switch occurred. The previous task was Runnable and has been
/// re-queued.
Preempted,
/// A switch occurred. The previous task is Blocked (or Finished) and
/// ownership is returned to the caller (to handle sleep/wait queues).
Blocked { old_task: Box<SchedulableTask> },
}
/// A simple weight-tracking runqueue.
///
/// Invariants:
/// 1. `total_weight` = Sum(queue tasks) + Weight(running_task) (excluding the idle task).
/// 2. `running_task` is NOT in `queue`.
pub struct RunQueue {
total_weight: u64,
pub(super) queue: BTreeMap<TaskDescriptor, Box<SchedulableTask>>,
pub(super) running_task: Option<Box<SchedulableTask>>,
}
impl RunQueue {
pub const fn new() -> Self {
Self {
total_weight: 0,
queue: BTreeMap::new(),
running_task: None,
}
}
pub fn switch_tasks(&mut self, next_task: TaskDescriptor, now_inst: Instant) -> SwitchResult {
if let Some(current) = self.current()
&& current.descriptor() == next_task
{
return SwitchResult::AlreadyRunning;
}
let mut new_task = match self.queue.remove(&next_task) {
Some(t) => t,
None => {
warn!("Task {next_task:?} not found for switch.");
return SwitchResult::AlreadyRunning;
}
};
new_task.about_to_execute(now_inst);
// Perform the swap.
if let Some(old_task) = self.running_task.replace(new_task) {
let state = *old_task.state.lock_save_irq();
match state {
TaskState::Running | TaskState::Runnable => {
// Update state to strictly Runnable
*old_task.state.lock_save_irq() = TaskState::Runnable;
self.queue.insert(old_task.descriptor(), old_task);
return SwitchResult::Preempted;
}
_ => {
self.total_weight = self.total_weight.saturating_sub(old_task.weight() as u64);
return SwitchResult::Blocked { old_task };
}
}
}
// If there was no previous task (e.g., boot up), it counts as a
// Preemption.
SwitchResult::Preempted
}
pub fn weight(&self) -> u64 {
self.total_weight
}
#[allow(clippy::borrowed_box)]
pub fn current(&self) -> Option<&Box<SchedulableTask>> {
self.running_task.as_ref()
}
pub fn current_mut(&mut self) -> Option<&mut Box<SchedulableTask>> {
self.running_task.as_mut()
}
fn fallback_current_or_idle(&self) -> TaskDescriptor {
if let Some(ref current) = self.running_task {
let s = *current.state.lock_save_irq();
if !current.is_idle_task() && (s == TaskState::Runnable || s == TaskState::Running) {
return current.descriptor();
}
}
TaskDescriptor::this_cpus_idle()
}
/// Returns the Descriptor of the best task to run next. This compares the
/// best task in the run_queue against the currently running task.
pub fn find_next_runnable_desc(&self, vclock: u128) -> TaskDescriptor {
// Find the best candidate from the Run Queue
let best_queued_entry = self
.queue
.iter()
.filter(|(_, task)| {
!task.is_idle_task() && task.v_eligible.saturating_sub(vclock) <= VCLOCK_EPSILON
})
.min_by(|(_, t1), (_, t2)| t1.compare_with(t2));
let (best_queued_desc, best_queued_task) = match best_queued_entry {
Some((d, t)) => (*d, t),
// If runqueue is empty (or no eligible tasks), we might just run
// current or idle.
None => return self.fallback_current_or_idle(),
};
// Compare against the current task
if let Some(current) = self.current() {
// If current is not Runnable (e.g. it blocked, yielded, or
// finished), it cannot win.
let current_state = *current.state.lock_save_irq();
if current_state != TaskState::Runnable && current_state != TaskState::Running {
return best_queued_desc;
}
// compare current vs challenger
match current.compare_with(best_queued_task) {
Ordering::Less | Ordering::Equal => {
// Current is better (has earlier deadline) or equal. Keep
// running current.
return current.descriptor();
}
Ordering::Greater => {
// Queued task is better. Switch.
return best_queued_desc;
}
}
}
best_queued_desc
}
/// Inserts `task` into this CPU's run-queue.
pub fn enqueue_task(&mut self, new_task: Box<SchedulableTask>) {
if !new_task.is_idle_task() {
self.total_weight = self.total_weight.saturating_add(new_task.weight() as u64);
}
if let Some(old_task) = self.queue.insert(new_task.descriptor(), new_task) {
// Handle the edge case where we overwrite a task. If we replaced
// someone, we must subtract their weight to avoid accounting drift.
warn!("Overwrote active task {:?}", old_task.descriptor());
self.total_weight = self.total_weight.saturating_sub(old_task.weight() as u64);
}
}
}

220
src/sched/runqueue/mod.rs Normal file
View File

@@ -0,0 +1,220 @@
use super::{
CUR_TASK_PTR, NUM_CONTEXT_SWITCHES,
sched_task::{RunnableTask, Work, state::TaskState},
};
use crate::{
arch::{Arch, ArchImpl},
drivers::timer::Instant,
};
use alloc::{boxed::Box, collections::binary_heap::BinaryHeap, sync::Arc};
use core::{cmp, ptr, sync::atomic::Ordering};
use vclock::VClock;
mod vclock;
// Min-heap adaptor for the ineligible heap: orders `RunnableTask`s by
// ascending `v_eligible` inside the max-oriented `BinaryHeap` by reversing
// the natural comparison.
struct ByEligible(RunnableTask);

impl Ord for ByEligible {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        // Reversed so the heap's "max" element is the smallest `v_eligible`.
        self.0.v_eligible.cmp(&other.0.v_eligible).reverse()
    }
}

impl PartialOrd for ByEligible {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for ByEligible {
    fn eq(&self, other: &Self) -> bool {
        // Consistent with `Ord`: equal iff the eligibility times match.
        self.cmp(other) == cmp::Ordering::Equal
    }
}

impl Eq for ByEligible {}
// Min-heap adaptor for the eligible heap: the heap's top is the task with the
// best (earliest) virtual deadline according to `RunnableTask::compare_with`.
struct ByDeadline(RunnableTask);

impl Ord for ByDeadline {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        // Arguments deliberately swapped (rather than `.reverse()`d) so the
        // max-heap yields the minimum; `compare_with` is kept as the single
        // source of ordering truth.
        other.0.compare_with(&self.0)
    }
}

impl PartialOrd for ByDeadline {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for ByDeadline {
    fn eq(&self, other: &Self) -> bool {
        matches!(self.cmp(other), cmp::Ordering::Equal)
    }
}

impl Eq for ByDeadline {}
/// A simple weight-tracking runqueue.
///
/// Invariants:
/// 1. `total_weight` = Sum(queued tasks) + Weight(running_task) (excluding the idle task).
/// 2. `running_task` is NOT in either heap.
pub struct RunQueue {
    // Sum of the weights of all tasks owned by this runqueue (heaps plus
    // `running_task`); the idle task is never counted.
    total_weight: u64,
    // Tasks whose `v_eligible` is still ahead of `v_clock`; min-heap keyed on
    // `v_eligible` (see `ByEligible`).
    ineligible: BinaryHeap<ByEligible>,
    // Tasks eligible to run now; min-heap keyed on virtual deadline
    // (see `ByDeadline`).
    eligible: BinaryHeap<ByDeadline>,
    // The task currently executing on this CPU, if any.
    pub(super) running_task: Option<RunnableTask>,
    // This runqueue's EEVDF virtual clock.
    v_clock: VClock,
    // Per-CPU idle task; lives outside the heaps and carries no weight.
    idle: RunnableTask,
}
impl RunQueue {
    /// Creates an empty runqueue with a freshly constructed idle task.
    pub fn new() -> Self {
        let idle = Work::new(Box::new(ArchImpl::create_idle_task())).into_runnable();
        Self {
            total_weight: 0,
            ineligible: BinaryHeap::new(),
            eligible: BinaryHeap::new(),
            running_task: None,
            v_clock: VClock::new(),
            idle,
        }
    }

    /// Picks the next task to execute and performs the context switch.
    pub fn schedule(&mut self, now: Instant) {
        self.v_clock.advance(now, self.weight());
        // Pointer identity of the previously-running Work. Used below to skip
        // the context switch when the same task is chosen again.
        let mut prev_task = ptr::null();
        if let Some(cur_task) = self.running_task.take() {
            let state = cur_task.task.state.load(Ordering::Acquire);
            match state {
                TaskState::Running | TaskState::Woken => {
                    // requeue for the next time slice.
                    prev_task = Arc::as_ptr(&cur_task.task);
                    self.enqueue(cur_task);
                }
                TaskState::PendingSleep | TaskState::PendingStop => {
                    // Task wants to deactivate. Drop the RunnableTask
                    // (restoring sched_data), then finalize the transition.
                    // The weight is removed first; `add_work` re-adds it on
                    // the lost-race path below, keeping invariant 1 intact.
                    let work = cur_task.task.clone();
                    self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64);
                    drop(cur_task);
                    if !work.state.finalize_deactivation() {
                        // Woken concurrently — re-enqueue.
                        self.add_work(work);
                    }
                }
                _ => {
                    // Finished — remove weight.
                    self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64);
                }
            }
        }
        if let Some(mut next_task) = self.find_next_task() {
            next_task.about_to_execute(now);
            if Arc::as_ptr(&next_task.task) != prev_task {
                // If we scheduled a different task than before, context switch.
                NUM_CONTEXT_SWITCHES.fetch_add(1, Ordering::Relaxed);
                next_task.switch_context();
                next_task.task.reset_last_account(now);
            }
            CUR_TASK_PTR
                .borrow_mut()
                .set_current(Box::as_ptr(&next_task.task.task) as *mut _);
            self.running_task = Some(next_task);
        } else {
            // No next task. Go idle. `running_task` stays `None`, so
            // `current()` reports the idle task.
            self.idle.switch_context();
            CUR_TASK_PTR
                .borrow_mut()
                .set_current(Box::as_ptr(&self.idle.task.task) as *mut _);
        }
    }

    /// Pops any tasks that were ineligible which have become eligible from the
    /// ineligible queue.
    ///
    /// Returns at most one task per call; `find_next_task` loops until the
    /// ineligible heap's minimum is still in the future.
    fn pop_now_eligible_task(&mut self) -> Option<RunnableTask> {
        let ByEligible(tsk) = self.ineligible.peek()?;
        if self.v_clock.is_task_eligible(tsk) {
            let ByEligible(tsk) = self.ineligible.pop()?;
            Some(tsk)
        } else {
            None
        }
    }

    /// Returns the Descriptor of the best task to run next.
    ///
    /// # Returns
    /// - `None` when no new task can be found (the runqueue is empty).
    /// - `Some(tsk)` when the current task should be replaced with `tsk`.
    fn find_next_task(&mut self) -> Option<RunnableTask> {
        // Migrate everything that became eligible since the last pass.
        while let Some(tsk) = self.pop_now_eligible_task() {
            self.eligible.push(ByDeadline(tsk));
        }
        if let Some(ByDeadline(best)) = self.eligible.pop() {
            return Some(best);
        }
        // Fast forward logic, if we have non-eligible, don't go idle.
        // Fast-forward vclk to the next earliest `v_eligible`.
        // The recursion terminates: after fast-forwarding, at least one task
        // becomes eligible, so the next call returns from `eligible.pop()`.
        if let Some(ByEligible(tsk)) = self.ineligible.peek() {
            self.v_clock.fast_forward(tsk.v_eligible);
            return self.find_next_task();
        }
        // The runqueues are completely empty. Go idle.
        None
    }

    /// Places `task` on the appropriate heap and marks it Runnable.
    fn enqueue(&mut self, task: RunnableTask) {
        task.task.state.mark_runnable();
        if self.v_clock.is_task_eligible(&task) {
            self.eligible.push(ByDeadline(task));
        } else {
            self.ineligible.push(ByEligible(task));
        }
    }

    /// Inserts `new_task` into this CPU's run-queue.
    ///
    /// Panics (via `into_runnable`) if `new_task` is already on a run queue.
    pub fn add_work(&mut self, new_task: Arc<Work>) {
        let new_task = new_task.into_runnable();
        self.total_weight = self.total_weight.saturating_add(new_task.weight() as u64);
        self.enqueue(new_task);
    }

    /// Total scheduling weight owned by this runqueue (excludes idle).
    pub fn weight(&self) -> u64 {
        self.total_weight
    }

    // NOTE(review): this allow looks stale — the function returns
    // `&RunnableTask`, not a borrowed `Box`; confirm and drop the attribute.
    #[allow(clippy::borrowed_box)]
    pub fn current(&self) -> &RunnableTask {
        self.running_task.as_ref().unwrap_or(&self.idle)
    }

    /// Mutable access to the running task, falling back to idle.
    pub fn current_mut(&mut self) -> &mut RunnableTask {
        self.running_task.as_mut().unwrap_or(&mut self.idle)
    }
}

View File

@@ -0,0 +1,43 @@
use crate::{
drivers::timer::Instant,
sched::{VCLOCK_EPSILON, VT_FIXED_SHIFT, sched_task::RunnableTask},
};
/// Per-runqueue EEVDF virtual clock.
pub struct VClock {
    /// Real-time instant of the last `advance()`; `None` until first use so
    /// the first advance only records a baseline.
    last_update: Option<Instant>,
    /// Current virtual time in fixed-point units (shifted by
    /// `VT_FIXED_SHIFT`).
    clk: u128,
}

impl VClock {
    pub fn new() -> Self {
        Self {
            last_update: None,
            clk: 0,
        }
    }

    /// A task is eligible once its `v_eligible` is no more than
    /// `VCLOCK_EPSILON` ahead of the clock.
    pub fn is_task_eligible(&self, tsk: &RunnableTask) -> bool {
        tsk.v_eligible.saturating_sub(self.clk) <= VCLOCK_EPSILON
    }

    /// Fast forward the clk to the specified clock, `new_clk`.
    ///
    /// The virtual clock is monotonic: a `new_clk` that lies in the past is
    /// ignored rather than rewinding virtual time. Current callers only pass
    /// the next pending `v_eligible` (always ahead of `clk`), so the clamp is
    /// purely defensive.
    pub fn fast_forward(&mut self, new_clk: u128) {
        self.clk = self.clk.max(new_clk);
    }

    /// Advance the virtual clock (`v_clock`) by converting the elapsed real time
    /// since the last update into 65.63-format fixed-point virtual-time units:
    /// v += (delta t << VT_FIXED_SHIFT) / sum w The caller must pass the
    /// current real time (`now_inst`).
    ///
    /// With `weight == 0` no virtual time elapses, but the baseline is still
    /// refreshed so idle periods are not billed to the next task.
    pub fn advance(&mut self, now_inst: Instant, weight: u64) {
        if let Some(prev) = self.last_update {
            let delta_real = now_inst - prev;
            if weight > 0 {
                let delta_vt = ((delta_real.as_nanos()) << VT_FIXED_SHIFT) / weight as u128;
                self.clk = self.clk.saturating_add(delta_vt);
            }
        }
        self.last_update = Some(now_inst);
    }
}

View File

@@ -3,18 +3,27 @@ use core::{
ops::{Deref, DerefMut},
};
use alloc::boxed::Box;
use super::{DEFAULT_TIME_SLICE, SCHED_WEIGHT_BASE, VT_FIXED_SHIFT};
use crate::{
arch::{Arch, ArchImpl},
drivers::timer::{Instant, schedule_preempt},
kernel::cpu_id::CpuId,
process::{TaskState, owned::OwnedTask},
process::owned::OwnedTask,
sync::SpinLock,
};
use super::{DEFAULT_TIME_SLICE, SCHED_WEIGHT_BASE, VT_FIXED_SHIFT};
use alloc::{boxed::Box, sync::Arc};
use state::TaskStateMachine;
pub struct SchedulableTask {
pub mod state;
pub struct Work {
pub task: Box<OwnedTask>,
pub state: TaskStateMachine,
pub sched_data: SpinLock<Option<SchedulerData>>,
}
#[derive(Clone)]
pub struct SchedulerData {
pub v_runtime: u128,
/// Virtual time at which the task becomes eligible (v_ei).
pub v_eligible: u128,
@@ -25,7 +34,32 @@ pub struct SchedulableTask {
pub last_run: Option<Instant>,
}
impl Deref for SchedulableTask {
impl SchedulerData {
    /// Fresh accounting state: zero virtual times, no deadlines, never run.
    fn new() -> Self {
        Self {
            v_runtime: 0,
            v_eligible: 0,
            v_deadline: 0,
            exec_start: None,
            deadline: None,
            last_run: None,
        }
    }
}
/// A `Work` item currently owned by a run queue (or running on a CPU).
///
/// Holds the scheduler accounting data "checked out" of the `Work` so the
/// hot path can access it without taking the `sched_data` lock.
pub struct RunnableTask {
    pub(super) task: Arc<Work>,
    pub(super) sched_data: SchedulerData,
}

impl Drop for RunnableTask {
    // Replace the hot sched info back into the struct, so a later
    // `into_runnable()` can check it out again.
    fn drop(&mut self) {
        *self.task.sched_data.lock_save_irq() = Some(self.sched_data.clone());
    }
}
impl Deref for Work {
type Target = OwnedTask;
fn deref(&self) -> &Self::Target {
@@ -33,27 +67,52 @@ impl Deref for SchedulableTask {
}
}
impl DerefMut for SchedulableTask {
impl DerefMut for Work {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.task
}
}
impl SchedulableTask {
pub fn new(task: Box<OwnedTask>) -> Box<Self> {
Box::new(Self {
impl Work {
pub fn new(task: Box<OwnedTask>) -> Arc<Self> {
Arc::new(Self {
task,
v_runtime: 0,
v_eligible: 0,
v_deadline: 0,
exec_start: None,
deadline: None,
last_run: None,
state: TaskStateMachine::new(),
sched_data: SpinLock::new(Some(SchedulerData::new())),
})
}
    /// Check the scheduler data out of this `Work`, producing a
    /// `RunnableTask` suitable for run-queue insertion.
    ///
    /// Panics if the data is already checked out — i.e. the work is already
    /// on a run queue — which would violate the single-owner invariant.
    pub fn into_runnable(self: Arc<Self>) -> RunnableTask {
        let sd = self
            .sched_data
            .lock_save_irq()
            .take()
            .expect("Should have sched data");
        RunnableTask {
            task: self,
            sched_data: sd,
        }
    }
}
// `RunnableTask` derefs to its checked-out `SchedulerData` so scheduler code
// can read/write accounting fields (`v_eligible`, `v_deadline`, ...) directly.
impl Deref for RunnableTask {
    type Target = SchedulerData;

    fn deref(&self) -> &Self::Target {
        &self.sched_data
    }
}

impl DerefMut for RunnableTask {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.sched_data
    }
}
impl RunnableTask {
/// Re-issue a virtual deadline
pub fn replenish_deadline(&mut self) {
fn replenish_deadline(&mut self) {
let q_ns: u128 = DEFAULT_TIME_SLICE.as_nanos();
let v_delta = (q_ns << VT_FIXED_SHIFT) / self.weight() as u128;
self.v_deadline = self.v_eligible + v_delta;
@@ -95,19 +154,11 @@ impl SchedulableTask {
/// weight = priority + SCHED_WEIGHT_BASE
/// The sum is clamped to a minimum of 1
pub fn weight(&self) -> u32 {
let w = self.priority() as i32 + SCHED_WEIGHT_BASE;
let w = self.task.priority() as i32 + SCHED_WEIGHT_BASE;
if w <= 0 { 1 } else { w as u32 }
}
pub fn compare_with(&self, other: &Self) -> core::cmp::Ordering {
if self.is_idle_task() {
return Ordering::Greater;
}
if other.is_idle_task() {
return Ordering::Less;
}
self.v_deadline
.cmp(&other.v_deadline)
.then_with(|| self.v_runtime.cmp(&other.v_runtime))
@@ -141,8 +192,7 @@ impl SchedulableTask {
/// Setup task accounting info such that it is about to be executed.
pub fn about_to_execute(&mut self, now: Instant) {
self.exec_start = Some(now);
*self.last_cpu.lock_save_irq() = CpuId::this();
*self.state.lock_save_irq() = TaskState::Running;
self.task.state.activate();
// Deadline logic
if self.deadline.is_none_or(|d| d <= now + DEFAULT_TIME_SLICE) {
@@ -153,4 +203,8 @@ impl SchedulableTask {
schedule_preempt(d);
}
}
pub fn switch_context(&self) {
ArchImpl::context_switch(self.task.t_shared.clone());
}
}

View File

@@ -0,0 +1,240 @@
use atomic_enum::atomic_enum;
use core::{fmt::Display, sync::atomic::Ordering};
#[atomic_enum]
#[derive(PartialEq, Eq)]
pub enum TaskState {
    /// Currently executing on a CPU.
    Running,
    /// On a run queue, waiting to be picked.
    Runnable,
    /// Woken while Running/Pending*; blocks a pending sleep/stop from
    /// completing (see `TaskStateMachine::wake`).
    Woken,
    /// Deactivated by a stop (e.g. ptrace/signal); not on any run queue.
    Stopped,
    /// Deactivated, waiting for a wakeup; not on any run queue.
    Sleeping,
    /// Terminal state; the task will never run again.
    Finished,
    /// Wants to sleep but RunnableTask still on the runqueue.
    PendingSleep,
    /// Wants to stop but RunnableTask still on the runqueue.
    PendingStop,
}
impl Display for TaskState {
    /// Renders the single-letter, `ps`-style state code.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let code = match self {
            // Both "on CPU" and "queued" report as runnable.
            TaskState::Running | TaskState::Runnable => "R",
            TaskState::Woken => "W",
            // Pending states report the state they are transitioning into.
            TaskState::Stopped | TaskState::PendingStop => "T",
            TaskState::Sleeping | TaskState::PendingSleep => "S",
            TaskState::Finished => "Z",
        };
        f.write_str(code)
    }
}
impl TaskState {
    /// Returns `true` only for the terminal `Finished` state.
    pub fn is_finished(self) -> bool {
        self == Self::Finished
    }
}
/// What the waker should do after calling `wake()`.
///
/// Returned by `TaskStateMachine::wake` so the caller knows whether it must
/// re-enqueue the task on a run queue.
pub enum WakerAction {
    /// Task was Sleeping/Stopped -> Runnable. Safe to call add_work.
    Enqueue,
    /// Task was Running/Pending* -> Woken. Sleep prevented, no enqueue needed
    /// (the task is still owned by a run queue).
    PreventedSleep,
    /// No action needed (already Woken/Finished, or CAS lost to another CPU).
    None,
}
/// State machine wrapper around `AtomicTaskState`.
///
/// Only exposes named transition methods to enforce state transition logic.
///
/// ```text
/// new()
/// |
/// v
/// +------- wake() ------> RUNNABLE <------- wake() -------+-------+
/// | (Enqueue) | (Enqueue) | |
/// | activate() | |
/// | | | |
/// | v | |
/// | RUNNING ----> finish() ---> FINISHED |
/// | / \ |
/// | try_pending / \ try_pending |
/// | _sleep() / \ |
/// | v v |
/// | PENDING_SLEEP PENDING_STOP |
/// | | | |
/// | finalize | | finalize |
/// | _deact() | | _deact() |
/// | v v |
/// +--------------- SLEEPING STOPPED --------------------------+
///
/// Race: wake() on {Running, Runnable, PendingSleep, PendingStop} --> WOKEN.
/// try_pending_sleep() clears WOKEN back to Running (prevents spurious
/// sleep).
/// ```
pub struct TaskStateMachine(AtomicTaskState);

impl TaskStateMachine {
    /// New tasks starts as Runnable.
    pub fn new() -> Self {
        Self(AtomicTaskState::new(TaskState::Runnable))
    }

    /// Read the current state.
    pub fn load(&self, ordering: Ordering) -> TaskState {
        self.0.load(ordering)
    }

    /// Scheduler is about to execute this task (-> Running).
    ///
    /// Plain store: a concurrent Woken mark is intentionally absorbed, since
    /// a task that is about to run needs no wakeup.
    pub fn activate(&self) {
        self.0.store(TaskState::Running, Ordering::Relaxed);
    }

    /// Task placed on a run queue (-> Runnable).
    pub fn mark_runnable(&self) {
        self.0.store(TaskState::Runnable, Ordering::Relaxed);
    }

    /// Scheduler finalizes deactivation after dropping RunnableTask.
    ///
    /// CAS PendingSleep -> Sleeping or PendingStop -> Stopped. Returns `true`
    /// if deactivated, `false` if woken (caller should re-enqueue).
    pub fn finalize_deactivation(&self) -> bool {
        let state = self.0.load(Ordering::Acquire);
        let target = match state {
            TaskState::PendingSleep => TaskState::Sleeping,
            TaskState::PendingStop => TaskState::Stopped,
            // Already woken concurrently.
            _ => return false,
        };
        // A CAS failure means a waker raced us to Woken between the load and
        // here; that also counts as "do not deactivate".
        self.0
            .compare_exchange(state, target, Ordering::Release, Ordering::Acquire)
            .is_ok()
    }

    /// Future returned `Poll::Pending`, try to initiate sleep.
    ///
    /// CAS Running -> PendingSleep. Returns `true` if sleep initiated and a new
    /// task should be scheduled, `false` if woken/finished and we should
    /// re-poll.
    pub fn try_pending_sleep(&self) -> bool {
        let state = self.0.load(Ordering::Acquire);
        match state {
            TaskState::Running | TaskState::Runnable => {
                match self.0.compare_exchange(
                    state,
                    TaskState::PendingSleep,
                    Ordering::Release,
                    Ordering::Acquire,
                ) {
                    Ok(_) => true,
                    Err(TaskState::Woken) => {
                        // Woken between load and CAS — clear woken, don't sleep.
                        self.0.store(TaskState::Running, Ordering::Release);
                        false
                    }
                    // Finished while polling: report "sleep" so the caller
                    // picks a new task; teardown handles the rest.
                    Err(TaskState::Finished) => true,
                    Err(s) => {
                        unreachable!("Unexpected task state {s:?} during pending_sleep transition")
                    }
                }
            }
            TaskState::Woken => {
                // Wakeup already arrived: clear it and keep running.
                self.0.store(TaskState::Running, Ordering::Release);
                false
            }
            TaskState::Finished => true,
            s => unreachable!("Unexpected task state {s:?} during pending_sleep transition"),
        }
    }

    /// Ptrace/signal stop: try CAS Running -> PendingStop.
    ///
    /// Returns `true` if the stop was initiated, `false` if the task was woken
    /// concurrently (caller should re-process kernel work instead of sleeping).
    pub fn try_pending_stop(&self) -> bool {
        let state = self.0.load(Ordering::Acquire);
        match state {
            TaskState::Running | TaskState::Runnable => self
                .0
                .compare_exchange(
                    state,
                    TaskState::PendingStop,
                    Ordering::Release,
                    Ordering::Acquire,
                )
                .is_ok(),
            TaskState::Woken => {
                self.0.store(TaskState::Running, Ordering::Release);
                false
            }
            // Already stopped/sleeping/finished — nothing to do.
            _ => true,
        }
    }

    /// Wake the task. Returns what the caller should do.
    ///
    /// Lock-free retry loop: each CAS failure means another CPU moved the
    /// state first, so we re-read and re-dispatch.
    pub fn wake(&self) -> WakerAction {
        loop {
            let state = self.0.load(Ordering::Acquire);
            match state {
                TaskState::Sleeping | TaskState::Stopped => {
                    if self
                        .0
                        .compare_exchange(
                            state,
                            TaskState::Runnable,
                            Ordering::Release,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We own the wakeup: the caller must enqueue the work.
                        return WakerAction::Enqueue;
                    }
                    // Another CPU handled it.
                    return WakerAction::None;
                }
                TaskState::Running
                | TaskState::Runnable
                | TaskState::PendingSleep
                | TaskState::PendingStop => {
                    // Task hasn't fully deactivated yet. Set Woken to prevent
                    // the sleep/stop transition from completing.
                    if self
                        .0
                        .compare_exchange(
                            state,
                            TaskState::Woken,
                            Ordering::Release,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        return WakerAction::PreventedSleep;
                    }
                    // State changed under us (e.g. PendingSleep -> Sleeping).
                    // Retry.
                    continue;
                }
                // Already Woken or Finished: noop.
                _ => return WakerAction::None,
            }
        }
    }

    /// Mark task as finished (-> Finished).
    pub fn finish(&self) {
        self.0.store(TaskState::Finished, Ordering::Release);
    }

    /// Check if finished.
    pub fn is_finished(&self) -> bool {
        self.0.load(Ordering::Acquire).is_finished()
    }
}

View File

@@ -1,8 +1,7 @@
use super::{current::current_task, schedule, waker::create_waker};
use super::{current::current_task, current_work, current_work_waker, schedule};
use crate::{
arch::{Arch, ArchImpl},
process::{
TaskState,
ctx::UserCtx,
exit::kernel_exit_with_signal,
thread_group::{
@@ -12,7 +11,7 @@ use crate::{
},
};
use alloc::boxed::Box;
use core::{ptr, task::Poll};
use core::{ptr, sync::atomic::Ordering, task::Poll};
enum State {
PickNewTask,
@@ -20,6 +19,15 @@ enum State {
ReturnToUserspace,
}
/// Try to transition the current task from Running to PendingSleep atomically.
///
/// Returns `true` if the task should go to sleep (state set to PendingSleep or
/// task is Finished). Returns `false` if the task was woken concurrently
/// and should re-process its kernel work instead.
fn try_sleep_current() -> bool {
current_work().state.try_pending_sleep()
}
/// Prepares the kernel for a safe return to userspace, guaranteeing a valid
/// context frame is prepared.
///
@@ -84,13 +92,9 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
State::ProcessKernelWork => {
// First, let's handle signals. If there is any scheduled signal
// work (this has to be async to handle faults, etc).
let (signal_work, desc, is_idle) = {
let (signal_work, is_idle) = {
let mut task = current_task();
(
task.ctx.take_signal_work(),
task.descriptor(),
task.is_idle_task(),
)
(task.ctx.take_signal_work(), task.is_idle_task())
};
if let Some(mut signal_work) = signal_work {
@@ -100,7 +104,7 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
match signal_work
.as_mut()
.poll(&mut core::task::Context::from_waker(&create_waker(desc)))
.poll(&mut core::task::Context::from_waker(&current_work_waker()))
{
Poll::Ready(Ok(state)) => {
// Signal actioning is complete. Return to userspace.
@@ -118,39 +122,13 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
continue;
}
Poll::Pending => {
let mut task = current_task();
current_task().ctx.put_signal_work(signal_work);
task.ctx.put_signal_work(signal_work);
let mut task_state = task.state.lock_save_irq();
match *task_state {
// The main path we expect to take to sleep the
// task.
// Task is currently running or is runnable and will now sleep.
TaskState::Running | TaskState::Runnable => {
*task_state = TaskState::Sleeping;
}
// If we were woken between the future returning
// `Poll::Pending` and acquiring the lock above,
// the waker will have put us into this state.
// Transition back to `Running` since we're
// ready to progress with more work.
TaskState::Woken => {
*task_state = TaskState::Running;
}
// If the task finished concurrently while we were
// polling its signal work, let the scheduler
// pick another task; no further work to do here.
TaskState::Finished => {}
// We should never get here for any other state.
s => {
unreachable!(
"Unexpected task state {s:?} during signal task sleep"
);
}
if try_sleep_current() {
state = State::PickNewTask;
} else {
state = State::ProcessKernelWork;
}
state = State::PickNewTask;
continue;
}
}
@@ -165,16 +143,14 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
match kern_work
.as_mut()
.poll(&mut core::task::Context::from_waker(&create_waker(desc)))
.poll(&mut core::task::Context::from_waker(&current_work_waker()))
{
Poll::Ready(()) => {
let task = current_task();
// If the task just exited (entered the finished
// state), don't return to it's userspace, instead,
// find another task to execute, removing this task
// from the runqueue, reaping it's resouces.
if task.state.lock_save_irq().is_finished() {
if current_work().state.load(Ordering::Acquire).is_finished() {
state = State::PickNewTask;
continue;
}
@@ -188,41 +164,13 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
continue;
}
Poll::Pending => {
let mut task = current_task();
current_task().ctx.put_kernel_work(kern_work);
// Kernel work hasn't finished. A wake up should
// have been scheduled by the future. Replace the
// kernel work context back into the task, set it's
// state to sleeping so it's not scheduled again and
// search for another task to execute.
task.ctx.put_kernel_work(kern_work);
let mut task_state = task.state.lock_save_irq();
match *task_state {
// Task is runnable or running, put it to sleep.
TaskState::Running | TaskState::Runnable => {
*task_state = TaskState::Sleeping;
}
// If we were woken between the future returning
// `Poll::Pending` and acquiring the lock above,
// the waker will have put us into this state.
// Transition back to `Running` since we're
// ready to progress with more work.
TaskState::Woken => {
*task_state = TaskState::Running;
}
// Task finished concurrently while we were trying
// to put it to sleep; just reschedule and let
// teardown handle it.
TaskState::Finished => {}
// We should never get here for any other state.
s => {
unreachable!(
"Unexpected task state {s:?} during kernel task sleep"
);
}
if try_sleep_current() {
state = State::PickNewTask;
} else {
state = State::ProcessKernelWork;
}
state = State::PickNewTask;
continue;
}
}
@@ -242,11 +190,17 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
while let Some(signal) = task.take_signal() {
let mut ptrace = task.ptrace.lock_save_irq();
if ptrace.trace_signal(signal, task.ctx.user()) {
ptrace.set_waker(current_work_waker());
ptrace.notify_tracer_of_trap(&task.process);
ptrace.set_waker(create_waker(task.descriptor()));
drop(ptrace);
drop(task);
*task.state.lock_save_irq() = TaskState::Stopped;
state = State::PickNewTask;
if current_work().state.try_pending_stop() {
state = State::PickNewTask;
} else {
// Woken concurrently (tracer already resumed us).
state = State::ProcessKernelWork;
}
continue 'dispatch;
}
drop(ptrace);
@@ -284,7 +238,7 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
for thr_weak in process.tasks.lock_save_irq().values() {
if let Some(thr) = thr_weak.upgrade() {
*thr.state.lock_save_irq() = TaskState::Stopped;
thr.state.try_pending_stop();
}
}
@@ -294,13 +248,10 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) {
Some(KSignalAction::Continue) => {
let process = &task.process;
// Wake up all sleeping threads in the process.
// Wake up all stopped/sleeping threads in the process.
for thr_weak in process.tasks.lock_save_irq().values() {
if let Some(thr) = thr_weak.upgrade() {
let mut st = thr.state.lock_save_irq();
if *st == TaskState::Sleeping {
*st = TaskState::Runnable;
}
crate::sched::waker::create_waker(thr).wake();
}
}

View File

@@ -1,62 +1,58 @@
use crate::{
interrupts::cpu_messenger::{Message, message_cpu},
kernel::cpu_id::CpuId,
process::{TASK_LIST, TaskDescriptor, TaskState},
};
use alloc::sync::Arc;
use core::task::{RawWaker, RawWakerVTable, Waker};
use super::SCHED_STATE;
use super::{
SCHED_STATE,
sched_task::{Work, state::WakerAction},
};
unsafe fn clone_waker(data: *const ()) -> RawWaker {
RawWaker::new(data, &VTABLE)
let data: *const Work = data.cast();
unsafe { Arc::increment_strong_count(data) };
RawWaker::new(data.cast(), &VTABLE)
}
/// Wakes the task. This consumes the waker.
unsafe fn wake_waker(data: *const ()) {
let desc = TaskDescriptor::from_ptr(data);
/// Wakes the task. This does not consume the waker.
unsafe fn wake_waker_no_consume(data: *const ()) {
let data: *const Work = data.cast();
let task = TASK_LIST
.lock_save_irq()
.get(&desc)
.and_then(|x| x.upgrade());
// Increment the strong count first so that Arc::from_raw does not
// consume the waker's own reference.
unsafe { Arc::increment_strong_count(data) };
let work = unsafe { Arc::from_raw(data) };
if let Some(task) = task {
let mut state = task.state.lock_save_irq();
let locus = *task.last_cpu.lock_save_irq();
match *state {
// If the task has been put to sleep, then wake it up.
TaskState::Sleeping | TaskState::Stopped => {
if locus == CpuId::this() {
*state = TaskState::Runnable;
SCHED_STATE.borrow_mut().wakeup(desc);
} else {
message_cpu(locus, Message::WakeupTask(create_waker(desc)))
.expect("Could not wakeup task on other CPU");
}
}
// If the task is running, mark it so it doesn't actually go to
// sleep when poll returns. This covers the small race-window
// between a future returning `Poll::Pending` and the sched setting
// the state to sleeping.
TaskState::Running => {
*state = TaskState::Woken;
}
_ => {}
match work.state.wake() {
WakerAction::Enqueue => {
SCHED_STATE.borrow_mut().run_q.add_work(work);
}
WakerAction::PreventedSleep | WakerAction::None => {}
}
}
unsafe fn drop_waker(_data: *const ()) {
// There is nothing to do.
unsafe fn wake_waker_consume(data: *const ()) {
unsafe {
wake_waker_no_consume(data);
drop_waker(data);
}
}
static VTABLE: RawWakerVTable =
RawWakerVTable::new(clone_waker, wake_waker, wake_waker, drop_waker);
unsafe fn drop_waker(data: *const ()) {
let data: *const Work = data.cast();
unsafe { Arc::decrement_strong_count(data) };
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
clone_waker,
wake_waker_consume,
wake_waker_no_consume,
drop_waker,
);
/// Creates a `Waker` for a given `Pid`.
pub fn create_waker(desc: TaskDescriptor) -> Waker {
let raw_waker = RawWaker::new(desc.to_ptr(), &VTABLE);
pub fn create_waker(work: Arc<Work>) -> Waker {
let raw_waker = RawWaker::new(Arc::into_raw(work).cast(), &VTABLE);
// SAFETY: We have correctly implemented the VTable functions.
unsafe { Waker::from_raw(raw_waker) }

View File

@@ -105,8 +105,7 @@ macro_rules! ktest_impl {
$name,
fn [<__sync_ $name>]() {
let mut fut = alloc::boxed::Box::pin($name());
let desc = crate::process::TaskDescriptor::from_tgid_tid(crate::process::thread_group::Tgid(0), crate::process::Tid(0));
let waker = crate::sched::waker::create_waker(desc);
let waker = crate::sched::current_work_waker();
let mut ctx = core::task::Context::from_waker(&waker);
loop {
match fut.as_mut().poll(&mut ctx) {