diff --git a/src/sched/mod.rs b/src/sched/mod.rs index fd78ec9..a88580d 100644 --- a/src/sched/mod.rs +++ b/src/sched/mod.rs @@ -9,7 +9,7 @@ use crate::{ process::{TASK_LIST, TaskDescriptor, TaskState}, }; use alloc::{boxed::Box, collections::btree_map::BTreeMap, sync::Arc}; -use core::sync::atomic::AtomicUsize; +use core::sync::atomic::{AtomicU64, Ordering}; use core::time::Duration; use current::{CUR_TASK_PTR, current_task}; use libkernel::{UserAddressSpace, error::Result}; @@ -66,7 +66,7 @@ pub const SCHED_WEIGHT_BASE: i32 = 1024; fn schedule() { // Reentrancy Check if SCHED_STATE.try_borrow_mut().is_none() { - log::warn!( + warn!( "Scheduler reentrancy detected on CPU {}", CpuId::this().value() ); @@ -86,18 +86,23 @@ pub fn spawn_kernel_work(fut: impl Future + 'static + Send) { current_task().ctx.put_kernel_work(Box::pin(fut)); } +/// Global atomic storing info about the least-tasked CPU. +/// First 16 bits: CPU ID +/// Next 24 bits: Weight +/// Next 24 bits: Number of waiting tasks +static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0); +const WEIGHT_SHIFT: u32 = 16; +const WAITING_SHIFT: u32 = WEIGHT_SHIFT + 24; + #[cfg(feature = "smp")] -fn get_next_cpu() -> CpuId { - static NEXT_CPU: AtomicUsize = AtomicUsize::new(0); - - let cpu_count = ArchImpl::cpu_count(); - let cpu_id = NEXT_CPU.fetch_add(1, core::sync::atomic::Ordering::Relaxed) % cpu_count; - - CpuId::from_value(cpu_id) +fn get_best_cpu() -> CpuId { + // Get the CPU with the least number of tasks. + let least_tasked_cpu_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire); + CpuId::from_value((least_tasked_cpu_info & 0xffff) as usize) } #[cfg(not(feature = "smp"))] -fn get_next_cpu() -> CpuId { +fn get_best_cpu() -> CpuId { CpuId::this() } @@ -110,7 +115,7 @@ pub fn insert_task(task: Box) { #[cfg(feature = "smp")] pub fn insert_task_cross_cpu(task: Box) { - let cpu = get_next_cpu(); + let cpu = get_best_cpu(); if cpu == CpuId::this() { insert_task(task); } else { @@ -148,6 +153,62 @@ impl SchedState { } } + /// Update the global least-tasked CPU info atomically. + #[cfg(feature = "smp")] + fn update_global_least_tasked_cpu_info(&self) { + fn none() -> Option { + None + } + + per_cpu! { + static LAST_UPDATE: Option = none; + } + + // Try and throttle contention on the atomic variable. + const MIN_COOLDOWN: Duration = Duration::from_millis(16); + if let Some(last) = LAST_UPDATE.borrow().as_ref() + && let Some(now) = now() + && now - *last < MIN_COOLDOWN + { + return; + } + *LAST_UPDATE.borrow_mut() = now(); + + let weight = self.run_q.weight(); + let waiting_tasks = self.wait_q.len() as u64; + let cpu_id = CpuId::this().value() as u64; + let new_info = (cpu_id & 0xffff) + | ((weight & 0xffffff) << WEIGHT_SHIFT) + | ((waiting_tasks & 0xffffff) << WAITING_SHIFT); + let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire); + // Ensure we don't spin forever (possible with a larger number of CPUs) + const MAX_RETRIES: usize = 8; + // Ensure consistency + for _ in 0..MAX_RETRIES { + let old_cpu_id = old_info & 0xffff; + let old_weight = (old_info >> WEIGHT_SHIFT) & 0xffffff; + let old_waiting = (old_info >> WAITING_SHIFT) & 0xffffff; + let metric = weight + (waiting_tasks * SCHED_WEIGHT_BASE as u64); + let old_metric = old_weight + (old_waiting * SCHED_WEIGHT_BASE as u64); + if (cpu_id == old_cpu_id && old_info != new_info) || (metric < old_metric) { + match LEAST_TASKED_CPU_INFO.compare_exchange( + old_info, + new_info, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(x) => old_info = x, + } + } + } + } + + #[cfg(not(feature = "smp"))] + fn update_global_least_tasked_cpu_info(&self) { + // No-op on single-core systems. + } + /// Advance the per-CPU virtual clock (`vclock`) by converting the elapsed /// real time since the last update into 65.63-format fixed-point /// virtual-time units: @@ -183,6 +244,8 @@ impl SchedState { } self.run_q.enqueue_task(new_task); + + self.update_global_least_tasked_cpu_info(); } pub fn wakeup(&mut self, desc: TaskDescriptor) { @@ -198,6 +261,7 @@ impl SchedState { } pub fn do_schedule(&mut self) { + self.update_global_least_tasked_cpu_info(); // Update Clocks let now_inst = now().expect("System timer not initialised"); @@ -290,6 +354,8 @@ pub fn sched_init_secondary() { let idle_task = ArchImpl::create_idle_task(); insert_task(Box::new(idle_task)); + // Force update_global_least_tasked_cpu_info + SCHED_STATE.borrow().update_global_least_tasked_cpu_info(); } pub fn sys_sched_yield() -> Result {