From 5ebfc29cd2bdb762dfa6c0cc0845a31fac3e962a Mon Sep 17 00:00:00 2001
From: Matthew Leach
Date: Tue, 10 Mar 2026 22:00:56 +0000
Subject: [PATCH] sched: introduce Work as the unified schedulable unit

Refactor the scheduler so all schedulable work is wrapped in Arc<Work>,
replacing the previous per-CPU wait_q design where sleeping tasks were
bound to a specific CPU. Wakers now hold direct Arc<Work> references and
can re-enqueue tasks on any CPU upon wakeup.

Key changes:

- Add Work struct wrapping OwnedTask with an AtomicTaskState and
  scheduler metadata (SchedulerData), replacing the old SchedulableTask.
  Remove Task::state (Arc<SpinLock<TaskState>>). Work::state is now the
  single source of truth for task state.

- Rewrite the run queue using BinaryHeap-based eligible/ineligible split
  (EEVDF) with a dedicated VClock, replacing the BTreeMap linear scan.
  Extract vclock into its own module.

- Rewrite wakers to hold Arc<Work> directly instead of looking up tasks
  by TaskDescriptor from TASK_LIST.

- Replace lock-based sleep transitions in uspc_ret with atomic CAS
  (try_sleep_current) that correctly detects concurrent Woken state.

- Simplify least-tasked-CPU metric to use only run-queue weight, since
  sleeping tasks are no longer bound to any CPU.

- Add current_work() accessor.
--- Cargo.lock | 12 + Cargo.toml | 1 + src/arch/arm64/exceptions/syscall.rs | 23 +- src/drivers/fs/proc/task/task_file.rs | 2 +- src/interrupts/cpu_messenger.rs | 8 +- src/process/caps.rs | 2 + src/process/clone.rs | 25 +- src/process/exit.rs | 14 +- src/process/mod.rs | 39 +-- src/process/owned.rs | 17 +- src/process/ptrace.rs | 67 ++--- src/process/thread_group.rs | 34 ++- src/sched/current.rs | 6 +- src/sched/mod.rs | 204 +++------------ src/sched/runqueue.rs | 169 ------------ src/sched/runqueue/mod.rs | 220 ++++++++++++++++ src/sched/runqueue/vclock.rs | 43 ++++ .../{sched_task.rs => sched_task/mod.rs} | 112 +++++--- src/sched/sched_task/state.rs | 240 ++++++++++++++++++ src/sched/uspc_ret.rs | 125 +++------ src/sched/waker.rs | 82 +++--- src/testing/mod.rs | 3 +- 22 files changed, 825 insertions(+), 623 deletions(-) delete mode 100644 src/sched/runqueue.rs create mode 100644 src/sched/runqueue/mod.rs create mode 100644 src/sched/runqueue/vclock.rs rename src/sched/{sched_task.rs => sched_task/mod.rs} (71%) create mode 100644 src/sched/sched_task/state.rs diff --git a/Cargo.lock b/Cargo.lock index fd134bd..9ccb4af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic_enum" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -547,6 +558,7 @@ dependencies = [ "arm-pl011-uart", "arm_pl031", "async-trait", + "atomic_enum", "bitflags 2.11.0", "blake2", "chacha20", diff --git a/Cargo.toml b/Cargo.toml index e3d1780..e9c2c48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ rustc-hash = { version = "2.1", default-features = false } smoltcp = { version = "0.12.0", default-features = false, features = ["alloc", "medium-ethernet", "medium-ip", "proto-ipv4", "proto-ipv6", 
"socket-tcp", "socket-udp"] } tock-registers = "0.10.1" virtio-drivers = "0.13.0" +atomic_enum = "0.3.0" [build-dependencies] time = { version = "0.3.47", features = ["formatting", "macros"] } # For build timestamping via build.rs diff --git a/src/arch/arm64/exceptions/syscall.rs b/src/arch/arm64/exceptions/syscall.rs index 07a0199..3425587 100644 --- a/src/arch/arm64/exceptions/syscall.rs +++ b/src/arch/arm64/exceptions/syscall.rs @@ -66,7 +66,6 @@ use crate::{ socket::sys_socket, }, process::{ - TaskState, caps::{sys_capget, sys_capset}, clone::sys_clone, creds::{ @@ -98,7 +97,7 @@ use crate::{ }, threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address}, }, - sched::{current::current_task, sys_sched_yield}, + sched::{self, current::current_task, sched_task::state::TaskState, sys_sched_yield}, }; use alloc::boxed::Box; use libkernel::{ @@ -389,10 +388,12 @@ pub async fn handle_syscall() { 0x5d => { let _ = sys_exit(arg1 as _).await; - debug_assert!(matches!( - *current_task().state.lock_save_irq(), - TaskState::Finished - )); + debug_assert!( + sched::current_work() + .state + .load(core::sync::atomic::Ordering::Acquire) + == TaskState::Finished + ); // Don't process result on exit. return; @@ -400,10 +401,12 @@ pub async fn handle_syscall() { 0x5e => { let _ = sys_exit_group(arg1 as _).await; - debug_assert!(matches!( - *current_task().state.lock_save_irq(), - TaskState::Finished - )); + debug_assert!( + sched::current_work() + .state + .load(core::sync::atomic::Ordering::Acquire) + == TaskState::Finished + ); // Don't process result on exit. 
return; diff --git a/src/drivers/fs/proc/task/task_file.rs b/src/drivers/fs/proc/task/task_file.rs index f7d76bf..ee267e2 100644 --- a/src/drivers/fs/proc/task/task_file.rs +++ b/src/drivers/fs/proc/task/task_file.rs @@ -93,7 +93,7 @@ impl SimpleFile for ProcTaskFileInode { }; let status_string = if let Some(task) = task_details { - let state = *task.state.lock_save_irq(); + let state = task.state.load(core::sync::atomic::Ordering::Relaxed); let name = task.comm.lock_save_irq(); match self.file_type { TaskFileType::Status => format!( diff --git a/src/interrupts/cpu_messenger.rs b/src/interrupts/cpu_messenger.rs index a507ec3..7774277 100644 --- a/src/interrupts/cpu_messenger.rs +++ b/src/interrupts/cpu_messenger.rs @@ -6,7 +6,7 @@ use super::{ ClaimedInterrupt, InterruptConfig, InterruptDescriptor, InterruptHandler, get_interrupt_root, }; use crate::kernel::cpu_id::CpuId; -use crate::process::owned::OwnedTask; +use crate::sched::sched_task::Work; use crate::{ arch::ArchImpl, drivers::Driver, @@ -14,7 +14,6 @@ use crate::{ sched, sync::{OnceLock, SpinLock}, }; -use alloc::boxed::Box; use alloc::{sync::Arc, vec::Vec}; use libkernel::{ CpuOps, @@ -23,7 +22,8 @@ use libkernel::{ use log::warn; pub enum Message { - PutTask(Box), + EnqueueWork(Arc), + #[expect(unused)] WakeupTask(Waker), } @@ -50,7 +50,7 @@ impl InterruptHandler for CpuMessenger { .try_pop() { match message { - Message::PutTask(task) => sched::insert_task(task), + Message::EnqueueWork(work) => sched::insert_work(work), Message::WakeupTask(waker) => waker.wake(), } } diff --git a/src/process/caps.rs b/src/process/caps.rs index f912df3..0329ed2 100644 --- a/src/process/caps.rs +++ b/src/process/caps.rs @@ -60,6 +60,7 @@ pub async fn sys_capget(hdrp: TUA, datap: TUA) -> Re .iter() .find(|task| task.0.tgid.value() == header.pid as u32) .and_then(|task| task.1.upgrade()) + .map(|x| x.t_shared.clone()) .ok_or(KernelError::NoProcess)? 
}; match header.version { @@ -95,6 +96,7 @@ pub async fn sys_capset(hdrp: TUA, datap: TUA) -> Re .iter() .find(|task| task.0.tgid.value() == header.pid as u32) .and_then(|task| task.1.upgrade()) + .map(|x| x.t_shared.clone()) .ok_or(KernelError::NoProcess)? }; diff --git a/src/process/clone.rs b/src/process/clone.rs index 7bfafcb..e997c32 100644 --- a/src/process/clone.rs +++ b/src/process/clone.rs @@ -1,10 +1,10 @@ use super::owned::OwnedTask; use super::ptrace::{PTrace, TracePoint, ptrace_stop}; use super::{ctx::Context, thread_group::signal::SigSet}; -use crate::kernel::cpu_id::CpuId; use crate::memory::uaccess::copy_to_user; +use crate::sched::sched_task::Work; use crate::{ - process::{TASK_LIST, Task, TaskState}, + process::{TASK_LIST, Task}, sched::{self, current::current_task}, sync::SpinLock, }; @@ -170,8 +170,6 @@ pub async fn sys_clone( cwd, root, creds: SpinLock::new(creds), - state: Arc::new(SpinLock::new(TaskState::Runnable)), - last_cpu: SpinLock::new(CpuId::this()), ptrace: SpinLock::new(ptrace), utime: AtomicUsize::new(0), stime: AtomicUsize::new(0), @@ -181,28 +179,29 @@ pub async fn sys_clone( } }; - let tid = new_task.tid; + let desc = new_task.descriptor(); + let work = Work::new(Box::new(new_task)); TASK_LIST .lock_save_irq() - .insert(new_task.descriptor(), Arc::downgrade(&new_task.t_shared)); + .insert(desc, Arc::downgrade(&work)); - new_task - .process + work.process .tasks .lock_save_irq() - .insert(tid, Arc::downgrade(&new_task.t_shared)); + .insert(desc.tid, Arc::downgrade(&work)); + + sched::insert_task_cross_cpu(work); - sched::insert_task_cross_cpu(Box::new(new_task)); NUM_FORKS.fetch_add(1, core::sync::atomic::Ordering::Relaxed); // Honour CLONE_*SETTID semantics for the parent and (shared-VM) child. 
if flags.contains(CloneFlags::CLONE_PARENT_SETTID) && !parent_tidptr.is_null() { - copy_to_user(parent_tidptr, tid.value()).await?; + copy_to_user(parent_tidptr, desc.tid.value()).await?; } if flags.contains(CloneFlags::CLONE_CHILD_SETTID) && !child_tidptr.is_null() { - copy_to_user(child_tidptr, tid.value()).await?; + copy_to_user(child_tidptr, desc.tid.value()).await?; } - Ok(tid.value() as _) + Ok(desc.tid.value() as _) } diff --git a/src/process/exit.rs b/src/process/exit.rs index e767f92..8abdc9f 100644 --- a/src/process/exit.rs +++ b/src/process/exit.rs @@ -1,10 +1,10 @@ use super::{ - TASK_LIST, TaskState, + TASK_LIST, ptrace::{TracePoint, ptrace_stop}, thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState}, threading::futex::{self, key::FutexKey}, }; -use crate::sched::current::current_task; +use crate::sched::{self, current::current_task}; use crate::{memory::uaccess::copy_to_user, sched::current::current_task_shared}; use alloc::vec::Vec; use libkernel::error::Result; @@ -34,7 +34,8 @@ pub fn do_exit_group(exit_code: ChildState) { if *process_state != ProcessState::Running { // We're already on our way out. Just kill this thread. drop(process_state); - *task.state.lock_save_irq() = TaskState::Finished; + drop(task); + sched::current_work().state.finish(); return; } @@ -51,7 +52,7 @@ pub fn do_exit_group(exit_code: ChildState) { // TODO: Send an IPI/Signal to halt execution now. For now, just // wait for the scheduler to never schedule any of it's tasks // again. - *other_thread.state.lock_save_irq() = TaskState::Finished; + other_thread.state.finish(); } } } @@ -87,7 +88,8 @@ pub fn do_exit_group(exit_code: ChildState) { .set_signal(SigId::SIGCHLD); // 5. This thread is now finished. - *task.state.lock_save_irq() = TaskState::Finished; + drop(task); + sched::current_work().state.finish(); // NOTE: that the scheduler will never execute the task again since it's // state is set to Finished. 
@@ -151,7 +153,7 @@ pub async fn sys_exit(exit_code: usize) -> Result { Ok(0) } else { // Mark our own state as finished. - *task.state.lock_save_irq() = TaskState::Finished; + sched::current_work().state.finish(); // Remove ourself from the process's thread list. tasks_lock.remove(&task.tid); diff --git a/src/process/mod.rs b/src/process/mod.rs index ecb31a8..d95009e 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -1,5 +1,6 @@ use crate::drivers::timer::Instant; use crate::sched::CPU_STAT; +use crate::sched::sched_task::Work; use crate::{ arch::ArchImpl, kernel::cpu_id::CpuId, @@ -14,7 +15,6 @@ use alloc::{ collections::btree_map::BTreeMap, sync::{Arc, Weak}, }; -use core::fmt::Display; use core::sync::atomic::{AtomicUsize, Ordering}; use creds::Credentials; use fd_table::FileDescriptorTable; @@ -124,35 +124,6 @@ impl TaskDescriptor { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TaskState { - Running, - Runnable, - Woken, - Stopped, - Sleeping, - Finished, -} - -impl Display for TaskState { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let state_str = match self { - TaskState::Running => "R", - TaskState::Runnable => "R", - TaskState::Woken => "W", - TaskState::Stopped => "T", - TaskState::Sleeping => "S", - TaskState::Finished => "Z", - }; - write!(f, "{state_str}") - } -} - -impl TaskState { - pub fn is_finished(self) -> bool { - matches!(self, Self::Finished) - } -} pub type ProcVM = ProcessVM<::ProcessAddressSpace>; #[derive(Copy, Clone)] @@ -184,8 +155,6 @@ pub struct Task { pub root: Arc, PathBuf)>>, pub creds: SpinLock, pub fd_table: Arc>, - pub state: Arc>, - pub last_cpu: SpinLock, pub ptrace: SpinLock, pub utime: AtomicUsize, pub stime: AtomicUsize, @@ -308,7 +277,7 @@ impl Task { } } -pub fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option> { +pub fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option> { TASK_LIST .lock_save_irq() .get(descriptor) @@ -316,11 +285,11 @@ pub 
fn find_task_by_descriptor(descriptor: &TaskDescriptor) -> Option> } /// Finds the root task for the given thread group -pub fn find_process_by_tgid(tgid: Tgid) -> Option> { +pub fn find_process_by_tgid(tgid: Tgid) -> Option> { find_task_by_descriptor(&TaskDescriptor::from_tgid_tid(tgid, Tid::from_tgid(tgid))) } -pub static TASK_LIST: SpinLock>> = +pub static TASK_LIST: SpinLock>> = SpinLock::new(BTreeMap::new()); unsafe impl Send for Task {} diff --git a/src/process/owned.rs b/src/process/owned.rs index fe2408d..11f8683 100644 --- a/src/process/owned.rs +++ b/src/process/owned.rs @@ -1,7 +1,5 @@ -use core::ops::Deref; - use super::{ - Comm, Task, TaskState, Tid, + Comm, Task, Tid, creds::Credentials, ctx::{Context, UserCtx}, fd_table::FileDescriptorTable, @@ -13,14 +11,13 @@ use super::{ }, threading::RobustListHead, }; -use crate::drivers::timer::{Instant, now}; +use crate::{arch::Arch, fs::DummyInode, sync::SpinLock}; use crate::{ - arch::{Arch, ArchImpl}, - fs::DummyInode, - kernel::cpu_id::CpuId, - sync::SpinLock, + arch::ArchImpl, + drivers::timer::{Instant, now}, }; use alloc::sync::Arc; +use core::ops::Deref; use core::sync::atomic::AtomicUsize; use libkernel::{ VirtualMemory, @@ -72,13 +69,11 @@ impl OwnedTask { tid: Tid::idle_for_cpu(), comm: Arc::new(SpinLock::new(Comm::new("idle"))), process: thread_group_builder.build(), - state: Arc::new(SpinLock::new(TaskState::Runnable)), cwd: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))), root: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))), creds: SpinLock::new(Credentials::new_root()), vm: Arc::new(SpinLock::new(vm)), fd_table: Arc::new(SpinLock::new(FileDescriptorTable::new())), - last_cpu: SpinLock::new(CpuId::this()), ptrace: SpinLock::new(PTrace::new()), utime: AtomicUsize::new(0), stime: AtomicUsize::new(0), @@ -102,7 +97,6 @@ impl OwnedTask { tid: Tid(1), comm: Arc::new(SpinLock::new(Comm::new("init"))), process: ThreadGroupBuilder::new(Tgid::init()).build(), - 
state: Arc::new(SpinLock::new(TaskState::Runnable)), cwd: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))), root: Arc::new(SpinLock::new((Arc::new(DummyInode {}), PathBuf::new()))), creds: SpinLock::new(Credentials::new_root()), @@ -110,7 +104,6 @@ impl OwnedTask { ProcessVM::empty().expect("Could not create init process's VM"), )), fd_table: Arc::new(SpinLock::new(FileDescriptorTable::new())), - last_cpu: SpinLock::new(CpuId::this()), ptrace: SpinLock::new(PTrace::new()), last_account: AtomicUsize::new(0), utime: AtomicUsize::new(0), diff --git a/src/process/ptrace.rs b/src/process/ptrace.rs index 243fd08..e64caca 100644 --- a/src/process/ptrace.rs +++ b/src/process/ptrace.rs @@ -1,24 +1,25 @@ -use core::future::poll_fn; -use core::task::{Poll, Waker}; - -use crate::arch::{Arch, ArchImpl}; -use crate::fs::syscalls::iov::IoVec; -use crate::memory::uaccess::{copy_from_user, copy_to_user}; -use crate::process::TASK_LIST; -use crate::process::thread_group::signal::SigId; -use crate::sched::current::{current_task, current_task_shared}; +use super::thread_group::{ThreadGroup, wait::ChildState}; +use crate::{ + arch::{Arch, ArchImpl}, + fs::syscalls::iov::IoVec, + memory::uaccess::{copy_from_user, copy_to_user}, + process::{TASK_LIST, thread_group::signal::SigId}, + sched::current::{current_task, current_task_shared}, +}; use alloc::sync::Arc; use bitflags::Flags; -use libkernel::error::{KernelError, Result}; -use libkernel::memory::address::UA; +use core::{ + future::poll_fn, + task::{Poll, Waker}, +}; +use libkernel::{ + error::{KernelError, Result}, + memory::address::UA, +}; use log::warn; type GpRegs = ::PTraceGpRegs; -use super::TaskState; -use super::thread_group::ThreadGroup; -use super::thread_group::wait::ChildState; - const PTRACE_EVENT_FORK: usize = 1; const PTRACE_EVENT_VFORK: usize = 2; const PTRACE_EVENT_CLONE: usize = 3; @@ -43,7 +44,7 @@ bitflags::bitflags! 
{ const PTRACE_O_SUSPEND_SECCOMP = 1 << 21; } - #[derive(Clone, Copy, PartialEq)] + #[derive(Clone, Copy, PartialEq, Debug)] pub struct TracePoint: u32 { const SyscallEntry = 0x01; const SyscallExit = 0x02; @@ -177,9 +178,6 @@ impl PTrace { } pub fn set_waker(&mut self, waker: Waker) { - // Ensure we never override an already existing waker. - debug_assert!(self.waker.is_none()); - self.waker = Some(waker); } @@ -259,22 +257,29 @@ impl TryFrom for PtraceOperation { pub async fn ptrace_stop(point: TracePoint) -> bool { let task_sh = current_task_shared(); - { - let mut ptrace = task_sh.ptrace.lock_save_irq(); - - if ptrace.hit_trace_point(point, current_task().ctx.user()) { - ptrace.notify_tracer_of_trap(&task_sh.process); - } else { - return false; - } - } + let mut notified = false; poll_fn(|cx| { let mut ptrace = task_sh.ptrace.lock_save_irq(); - if matches!(ptrace.state, Some(PTraceState::Running)) { + if !notified { + // First poll: hit the trace point, set waker, then notify. + // The waker must be set *before* notification so the tracer + // can always find it when it does PTRACE_SYSCALL/CONT. + if !ptrace.hit_trace_point(point, current_task().ctx.user()) { + return Poll::Ready(false); + } + + notified = true; + ptrace.set_waker(cx.waker().clone()); + ptrace.notify_tracer_of_trap(&task_sh.process); + Poll::Pending + } else if matches!(ptrace.state, Some(PTraceState::Running)) { + // Tracer resumed us. Poll::Ready(true) } else { + // Re-polled (e.g. spurious wakeup from signal) but tracer + // hasn't resumed yet. Refresh the waker and go back to sleep. 
ptrace.set_waker(cx.waker().clone()); Poll::Pending } @@ -381,7 +386,9 @@ pub async fn sys_ptrace(op: i32, pid: u64, addr: UA, data: UA) -> Result .break_points .remove(TracePoint::SyscallEntry | TracePoint::SyscallExit); - *target_task.state.lock_save_irq() = TaskState::Runnable; + if let Some(waker) = ptrace.waker.take() { + waker.wake(); + } Ok(0) } diff --git a/src/process/thread_group.rs b/src/process/thread_group.rs index 56edf9c..02c6eff 100644 --- a/src/process/thread_group.rs +++ b/src/process/thread_group.rs @@ -1,5 +1,12 @@ -use super::{Task, TaskState, Tid}; -use crate::{memory::uaccess::UserCopyable, sched::waker::create_waker, sync::SpinLock}; +use super::Tid; +use crate::{ + memory::uaccess::UserCopyable, + sched::{ + sched_task::{Work, state::TaskState}, + waker::create_waker, + }, + sync::SpinLock, +}; use alloc::{ collections::btree_map::BTreeMap, sync::{Arc, Weak}, @@ -95,7 +102,7 @@ pub struct ThreadGroup { pub umask: SpinLock, pub parent: SpinLock>>, pub children: SpinLock>>, - pub tasks: SpinLock>>, + pub tasks: SpinLock>>, pub signals: Arc>, pub rsrc_lim: Arc>, pub pending_signals: SpinLock, @@ -165,13 +172,10 @@ impl ThreadGroup { *self.pending_signals.lock_save_irq() = SigSet::SIGKILL; for task in self.tasks.lock_save_irq().values() { - if let Some(task) = task.upgrade() - && matches!( - *task.state.lock_save_irq(), - TaskState::Stopped | TaskState::Sleeping - ) - { - create_waker(task.descriptor()).wake(); + if let Some(task) = task.upgrade() { + // Wake will handle Sleeping/Stopped → Enqueue, + // and Running/Pending* → PreventedSleep (sets Woken). 
+ create_waker(task).wake(); } } } @@ -182,8 +186,12 @@ impl ThreadGroup { for task in self.tasks.lock_save_irq().values() { if let Some(task) = task.upgrade() && matches!( - *task.state.lock_save_irq(), - TaskState::Runnable | TaskState::Running + task.state.load(Ordering::Acquire), + TaskState::Runnable + | TaskState::Running + | TaskState::Woken + | TaskState::PendingSleep + | TaskState::PendingStop ) { // Signal delivered. This task will eventually be @@ -196,7 +204,7 @@ impl ThreadGroup { // No task will pick up the signal. Wake one up. for task in self.tasks.lock_save_irq().values() { if let Some(task) = task.upgrade() { - create_waker(task.descriptor()).wake(); + create_waker(task).wake(); return; } } diff --git a/src/sched/current.rs b/src/sched/current.rs index 3682938..af01027 100644 --- a/src/sched/current.rs +++ b/src/sched/current.rs @@ -2,7 +2,7 @@ use crate::{ per_cpu_private, process::{Task, owned::OwnedTask}, }; -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use core::{ cell::Cell, marker::PhantomData, @@ -78,8 +78,8 @@ impl CurrentTaskPtr { } } - pub(super) fn set_current(&self, task: &mut Box) { - self.ptr.set(Box::as_mut_ptr(task)); + pub(super) fn set_current(&self, task: *mut OwnedTask) { + self.ptr.set(task); } } diff --git a/src/sched/mod.rs b/src/sched/mod.rs index 1109444..cb17f37 100644 --- a/src/sched/mod.rs +++ b/src/sched/mod.rs @@ -1,23 +1,20 @@ -use crate::arch::ArchImpl; use crate::drivers::timer::{Instant, now}; #[cfg(feature = "smp")] use crate::interrupts::cpu_messenger::{Message, message_cpu}; use crate::kernel::cpu_id::CpuId; use crate::process::owned::OwnedTask; -use crate::{ - arch::Arch, - per_cpu_private, per_cpu_shared, - process::{TASK_LIST, TaskDescriptor, TaskState}, -}; -use alloc::{boxed::Box, collections::btree_map::BTreeMap, sync::Arc}; +use crate::{per_cpu_private, per_cpu_shared, process::TASK_LIST}; +use alloc::{boxed::Box, sync::Arc}; use core::fmt::Debug; use core::sync::atomic::{AtomicU64, 
AtomicUsize, Ordering}; +use core::task::Waker; use core::time::Duration; use current::{CUR_TASK_PTR, current_task}; -use libkernel::{UserAddressSpace, error::Result}; +use libkernel::error::Result; use log::warn; -use runqueue::{RunQueue, SwitchResult}; -use sched_task::SchedulableTask; +use runqueue::RunQueue; +use sched_task::Work; +use waker::create_waker; pub mod current; mod runqueue; @@ -128,12 +125,10 @@ pub fn spawn_kernel_work(fut: impl Future + 'static + Send) { /// Global atomic storing info about the least-tasked CPU. /// First 16 bits: CPU ID -/// Next 24 bits: Weight -/// Next 24 bits: Number of waiting tasks +/// Remaining 48 bits: Run-queue weight #[cfg(feature = "smp")] static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0); const WEIGHT_SHIFT: u32 = 16; -const WAITING_SHIFT: u32 = WEIGHT_SHIFT + 24; #[cfg(feature = "smp")] fn get_best_cpu() -> CpuId { @@ -143,49 +138,35 @@ fn get_best_cpu() -> CpuId { } /// Insert the given task onto a CPU's run queue. -pub fn insert_task(task: Box) { - SCHED_STATE - .borrow_mut() - .insert_into_runq(SchedulableTask::new(task)); +pub fn insert_work(work: Arc) { + SCHED_STATE.borrow_mut().run_q.add_work(work); } #[cfg(feature = "smp")] -pub fn insert_task_cross_cpu(task: Box) { +pub fn insert_task_cross_cpu(task: Arc) { let cpu = get_best_cpu(); if cpu == CpuId::this() { - insert_task(task); + SCHED_STATE.borrow_mut().run_q.add_work(task); } else { - message_cpu(cpu, Message::PutTask(task)).expect("Failed to send task to CPU"); + message_cpu(cpu, Message::EnqueueWork(task)).expect("Failed to send task to CPU"); } } #[cfg(not(feature = "smp"))] -pub fn insert_task_cross_cpu(task: Box) { - insert_task(task); +pub fn insert_task_cross_cpu(task: Arc) { + insert_work(task); } pub struct SchedState { run_q: RunQueue, - wait_q: BTreeMap>, - /// Per-CPU virtual clock (fixed-point 65.63 stored in a u128). - /// Expressed in virtual-time units as defined by the EEVDF paper. 
- vclock: u128, - /// Real-time moment when `vclock` was last updated. - last_update: Option, - /// Force a reschedule. - force_resched: bool, } unsafe impl Send for SchedState {} impl SchedState { - pub const fn new() -> Self { + pub fn new() -> Self { Self { run_q: RunQueue::new(), - wait_q: BTreeMap::new(), - vclock: 0, - last_update: None, - force_resched: false, } } @@ -211,22 +192,16 @@ impl SchedState { *LAST_UPDATE.borrow_mut() = now(); let weight = self.run_q.weight(); - let waiting_tasks = self.wait_q.len() as u64; let cpu_id = CpuId::this().value() as u64; - let new_info = (cpu_id & 0xffff) - | ((weight & 0xffffff) << WEIGHT_SHIFT) - | ((waiting_tasks & 0xffffff) << WAITING_SHIFT); + let new_info = (cpu_id & 0xffff) | ((weight & 0xffffffffffff) << WEIGHT_SHIFT); let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire); // Ensure we don't spin forever (possible with a larger number of CPUs) const MAX_RETRIES: usize = 8; // Ensure consistency for _ in 0..MAX_RETRIES { let old_cpu_id = old_info & 0xffff; - let old_weight = (old_info >> WEIGHT_SHIFT) & 0xffffff; - let old_waiting = (old_info >> WAITING_SHIFT) & 0xffffff; - let metric = weight + (waiting_tasks * SCHED_WEIGHT_BASE as u64); - let old_metric = old_weight + (old_waiting * SCHED_WEIGHT_BASE as u64); - if (cpu_id == old_cpu_id && old_info != new_info) || (metric < old_metric) { + let old_weight = old_info >> WEIGHT_SHIFT; + if (cpu_id == old_cpu_id && old_info != new_info) || (weight < old_weight) { match LEAST_TASKED_CPU_INFO.compare_exchange( old_info, new_info, @@ -245,160 +220,57 @@ impl SchedState { // No-op on single-core systems. } - /// Advance the per-CPU virtual clock (`vclock`) by converting the elapsed - /// real time since the last update into 65.63-format fixed-point - /// virtual-time units: - /// v += (delta t << VT_FIXED_SHIFT) / sum w - /// The caller must pass the current real time (`now_inst`). 
- fn advance_vclock(&mut self, now_inst: Instant) { - if let Some(prev) = self.last_update { - let delta_real = now_inst - prev; - if self.run_q.weight() > 0 { - let delta_vt = - ((delta_real.as_nanos()) << VT_FIXED_SHIFT) / self.run_q.weight() as u128; - self.vclock = self.vclock.saturating_add(delta_vt); - } - } - self.last_update = Some(now_inst); - } - - fn insert_into_runq(&mut self, mut new_task: Box) { - let now = now().expect("systimer not running"); - - self.advance_vclock(now); - - new_task.inserting_into_runqueue(self.vclock); - - if let Some(current) = self.run_q.current() { - // We force a reschedule if: - // - // We are currently idling, OR The new task has an earlier deadline - // than the current task. - if current.is_idle_task() || new_task.v_deadline < current.v_deadline { - self.force_resched = true; - } - } - - self.run_q.enqueue_task(new_task); - - self.update_global_least_tasked_cpu_info(); - } - - pub fn wakeup(&mut self, desc: TaskDescriptor) { - if let Some(task) = self.wait_q.remove(&desc) { - self.insert_into_runq(task); - } else { - warn!( - "Spurious wakeup for task {:?} on CPU {:?}", - desc, - CpuId::this().value() - ); - } - } - pub fn do_schedule(&mut self) { self.update_global_least_tasked_cpu_info(); - // Update Clocks + let now_inst = now().expect("System timer not initialised"); - self.advance_vclock(now_inst); + { + let current = self.run_q.current_mut(); - let mut needs_resched = self.force_resched; + current.task.update_accounting(Some(now_inst)); - if let Some(current) = self.run_q.current_mut() { - current.update_accounting(Some(now_inst)); // Reset accounting baseline after updating stats to avoid double-counting // the same time interval on the next scheduler tick. - current.reset_last_account(now_inst); - // If the current task is IDLE, we always want to proceed to the - // scheduler core to see if a real task has arrived. 
- if current.is_idle_task() { - needs_resched = true; - } else if current.tick(now_inst) { - // Otherwise, check if the real task expired - needs_resched = true; - } - } else { - needs_resched = true; + current.task.reset_last_account(now_inst); } - if !needs_resched - && let Some(current) = self.run_q.current() - && matches!(*current.state.lock_save_irq(), TaskState::Running) - { - // Fast Path: Only return if we have a valid task (Running state), - // it has budget, AND it's not the idle task. - return; - } - - // Reset the force flag for next time. - self.force_resched = false; - - // Select Next Task. - let next_task_desc = self.run_q.find_next_runnable_desc(self.vclock); - - match self.run_q.switch_tasks(next_task_desc, now_inst) { - SwitchResult::AlreadyRunning => { - // Nothing to do. - return; - } - SwitchResult::Blocked { old_task } => { - // If the blocked task has finished, allow it to drop here so it's - // resources are released. - if !old_task.state.lock_save_irq().is_finished() { - self.wait_q.insert(old_task.descriptor(), old_task); - } - } - // fall-thru. - SwitchResult::Preempted => {} - } - - // Update all context since the task has switched. 
- if let Some(new_current) = self.run_q.current_mut() { - NUM_CONTEXT_SWITCHES.fetch_add(1, Ordering::Relaxed); - ArchImpl::context_switch(new_current.t_shared.clone()); - let now = now().unwrap(); - new_current.reset_last_account(now); - CUR_TASK_PTR.borrow_mut().set_current(&mut new_current.task); - } + self.run_q.schedule(now_inst); } } pub fn sched_init() { - let idle_task = ArchImpl::create_idle_task(); let init_task = OwnedTask::create_init_task(); - init_task - .vm - .lock_save_irq() - .mm_mut() - .address_space_mut() - .activate(); - - *init_task.state.lock_save_irq() = TaskState::Runnable; + let init_work = Work::new(Box::new(init_task)); { let mut task_list = TASK_LIST.lock_save_irq(); - task_list.insert(idle_task.descriptor(), Arc::downgrade(&idle_task.t_shared)); - task_list.insert(init_task.descriptor(), Arc::downgrade(&init_task.t_shared)); + task_list.insert(init_work.task.descriptor(), Arc::downgrade(&init_work)); } - insert_task(Box::new(idle_task)); - insert_task(Box::new(init_task)); + insert_work(init_work); schedule(); } pub fn sched_init_secondary() { - let idle_task = ArchImpl::create_idle_task(); - - insert_task(Box::new(idle_task)); // Force update_global_least_tasked_cpu_info SCHED_STATE.borrow().update_global_least_tasked_cpu_info(); + + schedule(); } pub fn sys_sched_yield() -> Result { schedule(); Ok(0) } + +pub fn current_work() -> Arc { + SCHED_STATE.borrow().run_q.current().task.clone() +} + +pub fn current_work_waker() -> Waker { + create_waker(current_work()) +} diff --git a/src/sched/runqueue.rs b/src/sched/runqueue.rs deleted file mode 100644 index c3431c7..0000000 --- a/src/sched/runqueue.rs +++ /dev/null @@ -1,169 +0,0 @@ -use core::cmp::Ordering; - -use crate::{ - drivers::timer::Instant, - process::{TaskDescriptor, TaskState}, -}; -use alloc::{boxed::Box, collections::btree_map::BTreeMap}; -use log::warn; - -use super::{VCLOCK_EPSILON, sched_task::SchedulableTask}; - -/// The result of a requested task switch. 
-pub enum SwitchResult { - /// The requested task is already running. No changes made. - AlreadyRunning, - /// A switch occurred. The previous task was Runnable and has been - /// re-queued. - Preempted, - /// A switch occurred. The previous task is Blocked (or Finished) and - /// ownership is returned to the caller (to handle sleep/wait queues). - Blocked { old_task: Box }, -} - -/// A simple weight-tracking runqueue. -/// -/// Invariants: -/// 1. `total_weight` = Sum(queue tasks) + Weight(running_task) (excluding the idle task). -/// 2. `running_task` is NOT in `queue`. -pub struct RunQueue { - total_weight: u64, - pub(super) queue: BTreeMap>, - pub(super) running_task: Option>, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - total_weight: 0, - queue: BTreeMap::new(), - running_task: None, - } - } - - pub fn switch_tasks(&mut self, next_task: TaskDescriptor, now_inst: Instant) -> SwitchResult { - if let Some(current) = self.current() - && current.descriptor() == next_task - { - return SwitchResult::AlreadyRunning; - } - - let mut new_task = match self.queue.remove(&next_task) { - Some(t) => t, - None => { - warn!("Task {next_task:?} not found for switch."); - return SwitchResult::AlreadyRunning; - } - }; - - new_task.about_to_execute(now_inst); - - // Perform the swap. - if let Some(old_task) = self.running_task.replace(new_task) { - let state = *old_task.state.lock_save_irq(); - - match state { - TaskState::Running | TaskState::Runnable => { - // Update state to strictly Runnable - *old_task.state.lock_save_irq() = TaskState::Runnable; - - self.queue.insert(old_task.descriptor(), old_task); - - return SwitchResult::Preempted; - } - _ => { - self.total_weight = self.total_weight.saturating_sub(old_task.weight() as u64); - - return SwitchResult::Blocked { old_task }; - } - } - } - - // If there was no previous task (e.g., boot up), it counts as a - // Preemption. 
- SwitchResult::Preempted - } - - pub fn weight(&self) -> u64 { - self.total_weight - } - - #[allow(clippy::borrowed_box)] - pub fn current(&self) -> Option<&Box> { - self.running_task.as_ref() - } - - pub fn current_mut(&mut self) -> Option<&mut Box> { - self.running_task.as_mut() - } - - fn fallback_current_or_idle(&self) -> TaskDescriptor { - if let Some(ref current) = self.running_task { - let s = *current.state.lock_save_irq(); - if !current.is_idle_task() && (s == TaskState::Runnable || s == TaskState::Running) { - return current.descriptor(); - } - } - - TaskDescriptor::this_cpus_idle() - } - - /// Returns the Descriptor of the best task to run next. This compares the - /// best task in the run_queue against the currently running task. - pub fn find_next_runnable_desc(&self, vclock: u128) -> TaskDescriptor { - // Find the best candidate from the Run Queue - let best_queued_entry = self - .queue - .iter() - .filter(|(_, task)| { - !task.is_idle_task() && task.v_eligible.saturating_sub(vclock) <= VCLOCK_EPSILON - }) - .min_by(|(_, t1), (_, t2)| t1.compare_with(t2)); - - let (best_queued_desc, best_queued_task) = match best_queued_entry { - Some((d, t)) => (*d, t), - // If runqueue is empty (or no eligible tasks), we might just run - // current or idle. - None => return self.fallback_current_or_idle(), - }; - - // Compare against the current task - if let Some(current) = self.current() { - // If current is not Runnable (e.g. it blocked, yielded, or - // finished), it cannot win. - let current_state = *current.state.lock_save_irq(); - if current_state != TaskState::Runnable && current_state != TaskState::Running { - return best_queued_desc; - } - - // compare current vs challenger - match current.compare_with(best_queued_task) { - Ordering::Less | Ordering::Equal => { - // Current is better (has earlier deadline) or equal. Keep - // running current. - return current.descriptor(); - } - Ordering::Greater => { - // Queued task is better. Switch. 
- return best_queued_desc; - } - } - } - - best_queued_desc - } - - /// Inserts `task` into this CPU's run-queue. - pub fn enqueue_task(&mut self, new_task: Box) { - if !new_task.is_idle_task() { - self.total_weight = self.total_weight.saturating_add(new_task.weight() as u64); - } - - if let Some(old_task) = self.queue.insert(new_task.descriptor(), new_task) { - // Handle the edge case where we overwrite a task. If we replaced - // someone, we must subtract their weight to avoid accounting drift. - warn!("Overwrote active task {:?}", old_task.descriptor()); - self.total_weight = self.total_weight.saturating_sub(old_task.weight() as u64); - } - } -} diff --git a/src/sched/runqueue/mod.rs b/src/sched/runqueue/mod.rs new file mode 100644 index 0000000..6dd07d4 --- /dev/null +++ b/src/sched/runqueue/mod.rs @@ -0,0 +1,220 @@ +use super::{ + CUR_TASK_PTR, NUM_CONTEXT_SWITCHES, + sched_task::{RunnableTask, Work, state::TaskState}, +}; +use crate::{ + arch::{Arch, ArchImpl}, + drivers::timer::Instant, +}; +use alloc::{boxed::Box, collections::binary_heap::BinaryHeap, sync::Arc}; +use core::{cmp, ptr, sync::atomic::Ordering}; +use vclock::VClock; + +mod vclock; + +// Wrapper for the Ineligible Heap (Min-Heap ordered by v_eligible) +struct ByEligible(RunnableTask); + +impl PartialEq for ByEligible { + fn eq(&self, other: &Self) -> bool { + self.0.v_eligible == other.0.v_eligible + } +} + +impl Eq for ByEligible {} + +impl Ord for ByEligible { + fn cmp(&self, other: &Self) -> cmp::Ordering { + other.0.v_eligible.cmp(&self.0.v_eligible) + } +} + +impl PartialOrd for ByEligible { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Wrapper for the Eligible Heap (Min-Heap ordered by deadline) +struct ByDeadline(RunnableTask); + +impl PartialEq for ByDeadline { + fn eq(&self, other: &Self) -> bool { + other.0.compare_with(&self.0) == cmp::Ordering::Equal + } +} + +impl Eq for ByDeadline {} + +impl Ord for ByDeadline { + fn cmp(&self, other: 
&Self) -> cmp::Ordering { + // Reversed so BinaryHeap acts as a MIN-heap + other.0.compare_with(&self.0) + } +} + +impl PartialOrd for ByDeadline { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// A simple weight-tracking runqueue. +/// +/// Invariants: +/// 1. `total_weight` = Sum(queue tasks) + Weight(running_task) (excluding the idle task). +/// 2. `running_task` is NOT in `queue`. +pub struct RunQueue { + total_weight: u64, + ineligible: BinaryHeap, + eligible: BinaryHeap, + pub(super) running_task: Option, + v_clock: VClock, + idle: RunnableTask, +} + +impl RunQueue { + pub fn new() -> Self { + let idle = Work::new(Box::new(ArchImpl::create_idle_task())).into_runnable(); + + Self { + total_weight: 0, + ineligible: BinaryHeap::new(), + eligible: BinaryHeap::new(), + running_task: None, + v_clock: VClock::new(), + idle, + } + } + + /// Picks the next task to execute and performs the context switch. + pub fn schedule(&mut self, now: Instant) { + self.v_clock.advance(now, self.weight()); + + let mut prev_task = ptr::null(); + + if let Some(cur_task) = self.running_task.take() { + let state = cur_task.task.state.load(Ordering::Acquire); + match state { + TaskState::Running | TaskState::Woken => { + // requeue for the next time slice. + prev_task = Arc::as_ptr(&cur_task.task); + self.enqueue(cur_task); + } + TaskState::PendingSleep | TaskState::PendingStop => { + // Task wants to deactivate. Drop the RunnableTask + // (restoring sched_data), then finalize the transition. + let work = cur_task.task.clone(); + self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64); + drop(cur_task); + + if !work.state.finalize_deactivation() { + // Woken concurrently — re-enqueue. + self.add_work(work); + } + } + _ => { + // Finished — remove weight. 
+ self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64); + } + } + } + + if let Some(mut next_task) = self.find_next_task() { + next_task.about_to_execute(now); + + if Arc::as_ptr(&next_task.task) != prev_task { + // If we scheduled a different task than before, context switch. + NUM_CONTEXT_SWITCHES.fetch_add(1, Ordering::Relaxed); + + next_task.switch_context(); + + next_task.task.reset_last_account(now); + } + + CUR_TASK_PTR + .borrow_mut() + .set_current(Box::as_ptr(&next_task.task.task) as *mut _); + + self.running_task = Some(next_task); + } else { + // No next task. Go idle. + self.idle.switch_context(); + + CUR_TASK_PTR + .borrow_mut() + .set_current(Box::as_ptr(&self.idle.task.task) as *mut _); + } + } + + /// Pops any tasks that were ineligible which have become eligible from the + /// ineligible queue. + fn pop_now_eligible_task(&mut self) -> Option { + let ByEligible(tsk) = self.ineligible.peek()?; + + if self.v_clock.is_task_eligible(tsk) { + let ByEligible(tsk) = self.ineligible.pop()?; + + Some(tsk) + } else { + None + } + } + + /// Returns the Descriptor of the best task to run next. + /// + /// # Returns + /// - `None` when no new task can be found (the runqueue is empty). + /// - `Some(tsk)` when the current task should be replaced with `tsk`. + fn find_next_task(&mut self) -> Option { + while let Some(tsk) = self.pop_now_eligible_task() { + self.eligible.push(ByDeadline(tsk)); + } + + if let Some(ByDeadline(best)) = self.eligible.pop() { + return Some(best); + } + + // Fast forward logic, if we have non-eligible, don't go idle. + // Fast-forward vclk to the next earliest `v_eligible`. + if let Some(ByEligible(tsk)) = self.ineligible.peek() { + self.v_clock.fast_forward(tsk.v_eligible); + return self.find_next_task(); + } + + // The runqueues are completely empty. Go idle. 
+ None + } + + fn enqueue(&mut self, task: RunnableTask) { + task.task.state.mark_runnable(); + + if self.v_clock.is_task_eligible(&task) { + self.eligible.push(ByDeadline(task)); + } else { + self.ineligible.push(ByEligible(task)); + } + } + + /// Inserts `new_task` into this CPU's run-queue. + pub fn add_work(&mut self, new_task: Arc) { + let new_task = new_task.into_runnable(); + + self.total_weight = self.total_weight.saturating_add(new_task.weight() as u64); + + self.enqueue(new_task); + } + + pub fn weight(&self) -> u64 { + self.total_weight + } + + #[allow(clippy::borrowed_box)] + pub fn current(&self) -> &RunnableTask { + self.running_task.as_ref().unwrap_or(&self.idle) + } + + pub fn current_mut(&mut self) -> &mut RunnableTask { + self.running_task.as_mut().unwrap_or(&mut self.idle) + } +} diff --git a/src/sched/runqueue/vclock.rs b/src/sched/runqueue/vclock.rs new file mode 100644 index 0000000..f6a2cd7 --- /dev/null +++ b/src/sched/runqueue/vclock.rs @@ -0,0 +1,43 @@ +use crate::{ + drivers::timer::Instant, + sched::{VCLOCK_EPSILON, VT_FIXED_SHIFT, sched_task::RunnableTask}, +}; + +pub struct VClock { + last_update: Option, + clk: u128, +} + +impl VClock { + pub fn new() -> Self { + Self { + last_update: None, + clk: 0, + } + } + + pub fn is_task_eligible(&self, tsk: &RunnableTask) -> bool { + tsk.v_eligible.saturating_sub(self.clk) <= VCLOCK_EPSILON + } + + /// Fast forward the clk to the specified clock, `new_clk`. + pub fn fast_forward(&mut self, new_clk: u128) { + self.clk = new_clk; + } + + /// Advance the virtual clock (`v_clock`) by converting the elapsed real time + /// since the last update into 65.63-format fixed-point virtual-time units: + /// v += (delta t << VT_FIXED_SHIFT) / sum w The caller must pass the + /// current real time (`now_inst`). 
+ pub fn advance(&mut self, now_inst: Instant, weight: u64) { + if let Some(prev) = self.last_update { + let delta_real = now_inst - prev; + + if weight > 0 { + let delta_vt = ((delta_real.as_nanos()) << VT_FIXED_SHIFT) / weight as u128; + self.clk = self.clk.saturating_add(delta_vt); + } + } + self.last_update = Some(now_inst); + } +} diff --git a/src/sched/sched_task.rs b/src/sched/sched_task/mod.rs similarity index 71% rename from src/sched/sched_task.rs rename to src/sched/sched_task/mod.rs index 1cb3afb..82d97a5 100644 --- a/src/sched/sched_task.rs +++ b/src/sched/sched_task/mod.rs @@ -3,18 +3,27 @@ use core::{ ops::{Deref, DerefMut}, }; -use alloc::boxed::Box; - +use super::{DEFAULT_TIME_SLICE, SCHED_WEIGHT_BASE, VT_FIXED_SHIFT}; use crate::{ + arch::{Arch, ArchImpl}, drivers::timer::{Instant, schedule_preempt}, - kernel::cpu_id::CpuId, - process::{TaskState, owned::OwnedTask}, + process::owned::OwnedTask, + sync::SpinLock, }; -use super::{DEFAULT_TIME_SLICE, SCHED_WEIGHT_BASE, VT_FIXED_SHIFT}; +use alloc::{boxed::Box, sync::Arc}; +use state::TaskStateMachine; -pub struct SchedulableTask { +pub mod state; + +pub struct Work { pub task: Box, + pub state: TaskStateMachine, + pub sched_data: SpinLock>, +} + +#[derive(Clone)] +pub struct SchedulerData { pub v_runtime: u128, /// Virtual time at which the task becomes eligible (v_ei). pub v_eligible: u128, @@ -25,7 +34,32 @@ pub struct SchedulableTask { pub last_run: Option, } -impl Deref for SchedulableTask { +impl SchedulerData { + fn new() -> Self { + Self { + v_runtime: 0, + v_eligible: 0, + v_deadline: 0, + exec_start: None, + deadline: None, + last_run: None, + } + } +} + +pub struct RunnableTask { + pub(super) task: Arc, + pub(super) sched_data: SchedulerData, +} + +impl Drop for RunnableTask { + // Replace the hot sched info back into the struct. 
+ fn drop(&mut self) { + *self.task.sched_data.lock_save_irq() = Some(self.sched_data.clone()); + } +} + +impl Deref for Work { type Target = OwnedTask; fn deref(&self) -> &Self::Target { @@ -33,27 +67,52 @@ impl Deref for SchedulableTask { } } -impl DerefMut for SchedulableTask { +impl DerefMut for Work { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.task } } -impl SchedulableTask { - pub fn new(task: Box) -> Box { - Box::new(Self { +impl Work { + pub fn new(task: Box) -> Arc { + Arc::new(Self { task, - v_runtime: 0, - v_eligible: 0, - v_deadline: 0, - exec_start: None, - deadline: None, - last_run: None, + state: TaskStateMachine::new(), + sched_data: SpinLock::new(Some(SchedulerData::new())), }) } + pub fn into_runnable(self: Arc) -> RunnableTask { + let sd = self + .sched_data + .lock_save_irq() + .take() + .expect("Should have sched data"); + + RunnableTask { + task: self, + sched_data: sd, + } + } +} + +impl Deref for RunnableTask { + type Target = SchedulerData; + + fn deref(&self) -> &Self::Target { + &self.sched_data + } +} + +impl DerefMut for RunnableTask { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sched_data + } +} + +impl RunnableTask { /// Re-issue a virtual deadline - pub fn replenish_deadline(&mut self) { + fn replenish_deadline(&mut self) { let q_ns: u128 = DEFAULT_TIME_SLICE.as_nanos(); let v_delta = (q_ns << VT_FIXED_SHIFT) / self.weight() as u128; self.v_deadline = self.v_eligible + v_delta; @@ -95,19 +154,11 @@ impl SchedulableTask { /// weight = priority + SCHED_WEIGHT_BASE /// The sum is clamped to a minimum of 1 pub fn weight(&self) -> u32 { - let w = self.priority() as i32 + SCHED_WEIGHT_BASE; + let w = self.task.priority() as i32 + SCHED_WEIGHT_BASE; if w <= 0 { 1 } else { w as u32 } } pub fn compare_with(&self, other: &Self) -> core::cmp::Ordering { - if self.is_idle_task() { - return Ordering::Greater; - } - - if other.is_idle_task() { - return Ordering::Less; - } - self.v_deadline .cmp(&other.v_deadline) 
.then_with(|| self.v_runtime.cmp(&other.v_runtime)) @@ -141,8 +192,7 @@ impl SchedulableTask { /// Setup task accounting info such that it is about to be executed. pub fn about_to_execute(&mut self, now: Instant) { self.exec_start = Some(now); - *self.last_cpu.lock_save_irq() = CpuId::this(); - *self.state.lock_save_irq() = TaskState::Running; + self.task.state.activate(); // Deadline logic if self.deadline.is_none_or(|d| d <= now + DEFAULT_TIME_SLICE) { @@ -153,4 +203,8 @@ impl SchedulableTask { schedule_preempt(d); } } + + pub fn switch_context(&self) { + ArchImpl::context_switch(self.task.t_shared.clone()); + } } diff --git a/src/sched/sched_task/state.rs b/src/sched/sched_task/state.rs new file mode 100644 index 0000000..3cec9ff --- /dev/null +++ b/src/sched/sched_task/state.rs @@ -0,0 +1,240 @@ +use atomic_enum::atomic_enum; +use core::{fmt::Display, sync::atomic::Ordering}; + +#[atomic_enum] +#[derive(PartialEq, Eq)] +pub enum TaskState { + Running, + Runnable, + Woken, + Stopped, + Sleeping, + Finished, + /// Wants to sleep but RunnableTask still on the runqueue. + PendingSleep, + /// Wants to stop but RunnableTask still on the runqueue; + PendingStop, +} + +impl Display for TaskState { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let state_str = match self { + TaskState::Running => "R", + TaskState::Runnable => "R", + TaskState::Woken => "W", + TaskState::Stopped => "T", + TaskState::Sleeping => "S", + TaskState::Finished => "Z", + TaskState::PendingSleep => "S", + TaskState::PendingStop => "T", + }; + write!(f, "{state_str}") + } +} + +impl TaskState { + pub fn is_finished(self) -> bool { + matches!(self, Self::Finished) + } +} + +/// What the waker should do after calling `wake()`. +pub enum WakerAction { + /// Task was Sleeping/Stopped -> Runnable. Safe to call add_work. + Enqueue, + /// Task was Running/Pending* -> Woken. Sleep prevented, no enqueue needed. 
+ PreventedSleep, + /// No action needed (already Woken/Finished, or CAS lost to another CPU). + None, +} + +/// State machine wrapper around `AtomicTaskState`. +/// +/// Only exposes named transition methods to enforce state transition logic. +/// +/// ```text +/// new() +/// | +/// v +/// +------- wake() ------> RUNNABLE <------- wake() -------+-------+ +/// | (Enqueue) | (Enqueue) | | +/// | activate() | | +/// | | | | +/// | v | | +/// | RUNNING ----> finish() ---> FINISHED | +/// | / \ | +/// | try_pending / \ try_pending | +/// | _sleep() / \ | +/// | v v | +/// | PENDING_SLEEP PENDING_STOP | +/// | | | | +/// | finalize | | finalize | +/// | _deact() | | _deact() | +/// | v v | +/// +--------------- SLEEPING STOPPED --------------------------+ +/// +/// Race: wake() on {Running, Runnable, PendingSleep, PendingStop} --> WOKEN. +/// try_pending_sleep() clears WOKEN back to Running (prevents spurious +/// sleep). +/// ``` +pub struct TaskStateMachine(AtomicTaskState); + +impl TaskStateMachine { + /// New tasks starts as Runnable. + pub fn new() -> Self { + Self(AtomicTaskState::new(TaskState::Runnable)) + } + + /// Read the current state. + pub fn load(&self, ordering: Ordering) -> TaskState { + self.0.load(ordering) + } + + /// Scheduler is about to execute this task (-> Running). + pub fn activate(&self) { + self.0.store(TaskState::Running, Ordering::Relaxed); + } + + /// Task placed on a run queue (-> Runnable). + pub fn mark_runnable(&self) { + self.0.store(TaskState::Runnable, Ordering::Relaxed); + } + + /// Scheduler finalizes deactivation after dropping RunnableTask. + /// + /// CAS PendingSleep -> Sleeping or PendingStop -> Stopped. Returns `true` + /// if deactivated, `false` if woken (caller should re-enqueue). 
+ pub fn finalize_deactivation(&self) -> bool { + let state = self.0.load(Ordering::Acquire); + let target = match state { + TaskState::PendingSleep => TaskState::Sleeping, + TaskState::PendingStop => TaskState::Stopped, + // Already woken concurrently. + _ => return false, + }; + + self.0 + .compare_exchange(state, target, Ordering::Release, Ordering::Acquire) + .is_ok() + } + + /// Future returned `Poll::Pending`, try to initiate sleep. + /// + /// CAS Running -> PendingSleep. Returns `true` if sleep initiated and a new + /// task should be scheduled, `false` if woken/finished and we should + /// re-poll. + pub fn try_pending_sleep(&self) -> bool { + let state = self.0.load(Ordering::Acquire); + match state { + TaskState::Running | TaskState::Runnable => { + match self.0.compare_exchange( + state, + TaskState::PendingSleep, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(_) => true, + Err(TaskState::Woken) => { + // Woken between load and CAS — clear woken, don't sleep. + self.0.store(TaskState::Running, Ordering::Release); + false + } + Err(TaskState::Finished) => true, + Err(s) => { + unreachable!("Unexpected task state {s:?} during pending_sleep transition") + } + } + } + TaskState::Woken => { + self.0.store(TaskState::Running, Ordering::Release); + false + } + TaskState::Finished => true, + s => unreachable!("Unexpected task state {s:?} during pending_sleep transition"), + } + } + + /// Ptrace/signal stop: try CAS Running -> PendingStop. + /// + /// Returns `true` if the stop was initiated, `false` if the task was woken + /// concurrently (caller should re-process kernel work instead of sleeping). 
+ pub fn try_pending_stop(&self) -> bool { + let state = self.0.load(Ordering::Acquire); + match state { + TaskState::Running | TaskState::Runnable => self + .0 + .compare_exchange( + state, + TaskState::PendingStop, + Ordering::Release, + Ordering::Acquire, + ) + .is_ok(), + TaskState::Woken => { + self.0.store(TaskState::Running, Ordering::Release); + false + } + // Already stopped/sleeping/finished — nothing to do. + _ => true, + } + } + + /// Wake the task. Returns what the caller should do. + pub fn wake(&self) -> WakerAction { + loop { + let state = self.0.load(Ordering::Acquire); + match state { + TaskState::Sleeping | TaskState::Stopped => { + if self + .0 + .compare_exchange( + state, + TaskState::Runnable, + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { + return WakerAction::Enqueue; + } + // Another CPU handled it. + return WakerAction::None; + } + TaskState::Running + | TaskState::Runnable + | TaskState::PendingSleep + | TaskState::PendingStop => { + // Task hasn't fully deactivated yet. Set Woken to prevent + // the sleep/stop transition from completing. + if self + .0 + .compare_exchange( + state, + TaskState::Woken, + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { + return WakerAction::PreventedSleep; + } + // State changed under us (e.g. PendingSleep -> Sleeping). + // Retry. + continue; + } + // Already Woken or Finished: noop. + _ => return WakerAction::None, + } + } + } + + /// Mark task as finished (-> Finished). + pub fn finish(&self) { + self.0.store(TaskState::Finished, Ordering::Release); + } + + /// Check if finished. 
+ pub fn is_finished(&self) -> bool { + self.0.load(Ordering::Acquire).is_finished() + } +} diff --git a/src/sched/uspc_ret.rs b/src/sched/uspc_ret.rs index 47503ec..53d2633 100644 --- a/src/sched/uspc_ret.rs +++ b/src/sched/uspc_ret.rs @@ -1,8 +1,7 @@ -use super::{current::current_task, schedule, waker::create_waker}; +use super::{current::current_task, current_work, current_work_waker, schedule}; use crate::{ arch::{Arch, ArchImpl}, process::{ - TaskState, ctx::UserCtx, exit::kernel_exit_with_signal, thread_group::{ @@ -12,7 +11,7 @@ use crate::{ }, }; use alloc::boxed::Box; -use core::{ptr, task::Poll}; +use core::{ptr, sync::atomic::Ordering, task::Poll}; enum State { PickNewTask, @@ -20,6 +19,15 @@ enum State { ReturnToUserspace, } +/// Try to transition the current task from Running to PendingSleep atomically. +/// +/// Returns `true` if the task should go to sleep (state set to PendingSleep or +/// task is Finished). Returns `false` if the task was woken concurrently +/// and should re-process its kernel work instead. +fn try_sleep_current() -> bool { + current_work().state.try_pending_sleep() +} + /// Prepares the kernel for a safe return to userspace, guaranteeing a valid /// context frame is prepared. /// @@ -84,13 +92,9 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { State::ProcessKernelWork => { // First, let's handle signals. If there is any scheduled signal // work (this has to be async to handle faults, etc). 
- let (signal_work, desc, is_idle) = { + let (signal_work, is_idle) = { let mut task = current_task(); - ( - task.ctx.take_signal_work(), - task.descriptor(), - task.is_idle_task(), - ) + (task.ctx.take_signal_work(), task.is_idle_task()) }; if let Some(mut signal_work) = signal_work { @@ -100,7 +104,7 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { match signal_work .as_mut() - .poll(&mut core::task::Context::from_waker(&create_waker(desc))) + .poll(&mut core::task::Context::from_waker(¤t_work_waker())) { Poll::Ready(Ok(state)) => { // Signal actioning is complete. Return to userspace. @@ -118,39 +122,13 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { continue; } Poll::Pending => { - let mut task = current_task(); + current_task().ctx.put_signal_work(signal_work); - task.ctx.put_signal_work(signal_work); - let mut task_state = task.state.lock_save_irq(); - - match *task_state { - // The main path we expect to take to sleep the - // task. - // Task is currently running or is runnable and will now sleep. - TaskState::Running | TaskState::Runnable => { - *task_state = TaskState::Sleeping; - } - // If we were woken between the future returning - // `Poll::Pending` and acquiring the lock above, - // the waker will have put us into this state. - // Transition back to `Running` since we're - // ready to progress with more work. - TaskState::Woken => { - *task_state = TaskState::Running; - } - // If the task finished concurrently while we were - // polling its signal work, let the scheduler - // pick another task; no further work to do here. - TaskState::Finished => {} - // We should never get here for any other state. 
- s => { - unreachable!( - "Unexpected task state {s:?} during signal task sleep" - ); - } + if try_sleep_current() { + state = State::PickNewTask; + } else { + state = State::ProcessKernelWork; } - - state = State::PickNewTask; continue; } } @@ -165,16 +143,14 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { match kern_work .as_mut() - .poll(&mut core::task::Context::from_waker(&create_waker(desc))) + .poll(&mut core::task::Context::from_waker(¤t_work_waker())) { Poll::Ready(()) => { - let task = current_task(); - // If the task just exited (entered the finished // state), don't return to it's userspace, instead, // find another task to execute, removing this task // from the runqueue, reaping it's resouces. - if task.state.lock_save_irq().is_finished() { + if current_work().state.load(Ordering::Acquire).is_finished() { state = State::PickNewTask; continue; } @@ -188,41 +164,13 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { continue; } Poll::Pending => { - let mut task = current_task(); + current_task().ctx.put_kernel_work(kern_work); - // Kernel work hasn't finished. A wake up should - // have been scheduled by the future. Replace the - // kernel work context back into the task, set it's - // state to sleeping so it's not scheduled again and - // search for another task to execute. - task.ctx.put_kernel_work(kern_work); - let mut task_state = task.state.lock_save_irq(); - - match *task_state { - // Task is runnable or running, put it to sleep. - TaskState::Running | TaskState::Runnable => { - *task_state = TaskState::Sleeping; - } - // If we were woken between the future returning - // `Poll::Pending` and acquiring the lock above, - // the waker will have put us into this state. - // Transition back to `Running` since we're - // ready to progress with more work. - TaskState::Woken => { - *task_state = TaskState::Running; - } - // Task finished concurrently while we were trying - // to put it to sleep; just reschedule and let - // teardown handle it. 
- TaskState::Finished => {} - // We should never get here for any other state. - s => { - unreachable!( - "Unexpected task state {s:?} during kernel task sleep" - ); - } + if try_sleep_current() { + state = State::PickNewTask; + } else { + state = State::ProcessKernelWork; } - state = State::PickNewTask; continue; } } @@ -242,11 +190,17 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { while let Some(signal) = task.take_signal() { let mut ptrace = task.ptrace.lock_save_irq(); if ptrace.trace_signal(signal, task.ctx.user()) { + ptrace.set_waker(current_work_waker()); ptrace.notify_tracer_of_trap(&task.process); - ptrace.set_waker(create_waker(task.descriptor())); + drop(ptrace); + drop(task); - *task.state.lock_save_irq() = TaskState::Stopped; - state = State::PickNewTask; + if current_work().state.try_pending_stop() { + state = State::PickNewTask; + } else { + // Woken concurrently (tracer already resumed us). + state = State::ProcessKernelWork; + } continue 'dispatch; } drop(ptrace); @@ -284,7 +238,7 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { for thr_weak in process.tasks.lock_save_irq().values() { if let Some(thr) = thr_weak.upgrade() { - *thr.state.lock_save_irq() = TaskState::Stopped; + thr.state.try_pending_stop(); } } @@ -294,13 +248,10 @@ pub fn dispatch_userspace_task(ctx: *mut UserCtx) { Some(KSignalAction::Continue) => { let process = &task.process; - // Wake up all sleeping threads in the process. + // Wake up all stopped/sleeping threads in the process. 
for thr_weak in process.tasks.lock_save_irq().values() { if let Some(thr) = thr_weak.upgrade() { - let mut st = thr.state.lock_save_irq(); - if *st == TaskState::Sleeping { - *st = TaskState::Runnable; - } + crate::sched::waker::create_waker(thr).wake(); } } diff --git a/src/sched/waker.rs b/src/sched/waker.rs index 4bac0ab..1c6629b 100644 --- a/src/sched/waker.rs +++ b/src/sched/waker.rs @@ -1,62 +1,58 @@ -use crate::{ - interrupts::cpu_messenger::{Message, message_cpu}, - kernel::cpu_id::CpuId, - process::{TASK_LIST, TaskDescriptor, TaskState}, -}; +use alloc::sync::Arc; use core::task::{RawWaker, RawWakerVTable, Waker}; -use super::SCHED_STATE; +use super::{ + SCHED_STATE, + sched_task::{Work, state::WakerAction}, +}; unsafe fn clone_waker(data: *const ()) -> RawWaker { - RawWaker::new(data, &VTABLE) + let data: *const Work = data.cast(); + + unsafe { Arc::increment_strong_count(data) }; + + RawWaker::new(data.cast(), &VTABLE) } -/// Wakes the task. This consumes the waker. -unsafe fn wake_waker(data: *const ()) { - let desc = TaskDescriptor::from_ptr(data); +/// Wakes the task. This does not consume the waker. +unsafe fn wake_waker_no_consume(data: *const ()) { + let data: *const Work = data.cast(); - let task = TASK_LIST - .lock_save_irq() - .get(&desc) - .and_then(|x| x.upgrade()); + // Increment the strong count first so that Arc::from_raw does not + // consume the waker's own reference. + unsafe { Arc::increment_strong_count(data) }; + let work = unsafe { Arc::from_raw(data) }; - if let Some(task) = task { - let mut state = task.state.lock_save_irq(); - let locus = *task.last_cpu.lock_save_irq(); - - match *state { - // If the task has been put to sleep, then wake it up. 
- TaskState::Sleeping | TaskState::Stopped => { - if locus == CpuId::this() { - *state = TaskState::Runnable; - SCHED_STATE.borrow_mut().wakeup(desc); - } else { - message_cpu(locus, Message::WakeupTask(create_waker(desc))) - .expect("Could not wakeup task on other CPU"); - } - } - // If the task is running, mark it so it doesn't actually go to - // sleep when poll returns. This covers the small race-window - // between a future returning `Poll::Pending` and the sched setting - // the state to sleeping. - TaskState::Running => { - *state = TaskState::Woken; - } - _ => {} + match work.state.wake() { + WakerAction::Enqueue => { + SCHED_STATE.borrow_mut().run_q.add_work(work); } + WakerAction::PreventedSleep | WakerAction::None => {} } } -unsafe fn drop_waker(_data: *const ()) { - // There is nothing to do. +unsafe fn wake_waker_consume(data: *const ()) { + unsafe { + wake_waker_no_consume(data); + drop_waker(data); + } } -static VTABLE: RawWakerVTable = - RawWakerVTable::new(clone_waker, wake_waker, wake_waker, drop_waker); +unsafe fn drop_waker(data: *const ()) { + let data: *const Work = data.cast(); + unsafe { Arc::decrement_strong_count(data) }; +} + +static VTABLE: RawWakerVTable = RawWakerVTable::new( + clone_waker, + wake_waker_consume, + wake_waker_no_consume, + drop_waker, +); /// Creates a `Waker` for a given `Pid`. -pub fn create_waker(desc: TaskDescriptor) -> Waker { - let raw_waker = RawWaker::new(desc.to_ptr(), &VTABLE); +pub fn create_waker(work: Arc) -> Waker { + let raw_waker = RawWaker::new(Arc::into_raw(work).cast(), &VTABLE); // SAFETY: We have correctly implemented the VTable functions. unsafe { Waker::from_raw(raw_waker) } diff --git a/src/testing/mod.rs b/src/testing/mod.rs index ccbce40..229f325 100644 --- a/src/testing/mod.rs +++ b/src/testing/mod.rs @@ -105,8 +105,7 @@ macro_rules! 
ktest_impl { $name, fn [<__sync_ $name>]() { let mut fut = alloc::boxed::Box::pin($name()); - let desc = crate::process::TaskDescriptor::from_tgid_tid(crate::process::thread_group::Tgid(0), crate::process::Tid(0)); - let waker = crate::sched::waker::create_waker(desc); + let waker = crate::sched::current_work_waker(); let mut ctx = core::task::Context::from_waker(&waker); loop { match fut.as_mut().poll(&mut ctx) {