proper cross-cpu task insertion logic

Chooses the CPU with the lowest load, measured as its run-queue weight plus its number of waiting tasks scaled by SCHED_WEIGHT_BASE.
This commit is contained in:
Ashwin Naren
2026-01-15 22:29:26 -08:00
parent e7a0ec6801
commit 08f42868b2

View File

@@ -9,7 +9,7 @@ use crate::{
process::{TASK_LIST, TaskDescriptor, TaskState},
};
use alloc::{boxed::Box, collections::btree_map::BTreeMap, sync::Arc};
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::{AtomicU64, Ordering};
use core::time::Duration;
use current::{CUR_TASK_PTR, current_task};
use libkernel::{UserAddressSpace, error::Result};
@@ -66,7 +66,7 @@ pub const SCHED_WEIGHT_BASE: i32 = 1024;
fn schedule() {
// Reentrancy Check
if SCHED_STATE.try_borrow_mut().is_none() {
log::warn!(
warn!(
"Scheduler reentrancy detected on CPU {}",
CpuId::this().value()
);
@@ -86,18 +86,23 @@ pub fn spawn_kernel_work(fut: impl Future<Output = ()> + 'static + Send) {
current_task().ctx.put_kernel_work(Box::pin(fut));
}
/// Global atomic storing info about the least-tasked CPU, packed into one
/// 64-bit word so it can be read and replaced with a single atomic operation.
///
/// Layout (least-significant bit first):
/// - bits 0..=15  (16 bits): CPU ID
/// - bits 16..=39 (24 bits): run-queue weight
/// - bits 40..=63 (24 bits): number of waiting tasks
///
/// The initial value `0` decodes as CPU 0 with zero weight and zero waiting
/// tasks.
static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0);
// Bit offset of the 24-bit weight field (directly above the 16-bit CPU ID).
const WEIGHT_SHIFT: u32 = 16;
// Bit offset of the 24-bit waiting-task field (directly above the weight).
const WAITING_SHIFT: u32 = WEIGHT_SHIFT + 24;
#[cfg(feature = "smp")]
fn get_next_cpu() -> CpuId {
static NEXT_CPU: AtomicUsize = AtomicUsize::new(0);
let cpu_count = ArchImpl::cpu_count();
let cpu_id = NEXT_CPU.fetch_add(1, core::sync::atomic::Ordering::Relaxed) % cpu_count;
CpuId::from_value(cpu_id)
/// Returns the CPU currently recorded in `LEAST_TASKED_CPU_INFO`.
///
/// Only the CPU-ID field (low 16 bits) of the packed word is decoded here;
/// the weight and waiting-task fields are ignored. The result is whatever was
/// last published to the atomic, so it is a hint and may lag behind the
/// actual load of each CPU.
fn get_best_cpu() -> CpuId {
// Read the most recently published least-loaded CPU record.
let least_tasked_cpu_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire);
CpuId::from_value((least_tasked_cpu_info & 0xffff) as usize)
}
#[cfg(not(feature = "smp"))]
fn get_next_cpu() -> CpuId {
/// Non-SMP variant: there is no other CPU to choose from, so the best CPU is
/// always the one we are running on.
fn get_best_cpu() -> CpuId {
CpuId::this()
}
@@ -110,7 +115,7 @@ pub fn insert_task(task: Box<OwnedTask>) {
#[cfg(feature = "smp")]
pub fn insert_task_cross_cpu(task: Box<OwnedTask>) {
let cpu = get_next_cpu();
let cpu = get_best_cpu();
if cpu == CpuId::this() {
insert_task(task);
} else {
@@ -148,6 +153,62 @@ impl SchedState {
}
}
/// Update the global least-tasked CPU info atomically.
///
/// This CPU's load is packed into a single word (CPU ID, run-queue weight,
/// number of waiting tasks) and published to `LEAST_TASKED_CPU_INFO` when
/// either:
///
/// * this CPU is the one currently recorded and its figures have changed
///   (so a stale record of ourselves is refreshed), or
/// * this CPU's load metric is strictly lower than the recorded one.
///
/// The load metric is `weight + waiting_tasks * SCHED_WEIGHT_BASE`.
///
/// Calls are throttled per CPU: if the previous update on this CPU happened
/// less than `MIN_COOLDOWN` ago the function returns without touching the
/// global atomic. The compare-exchange is retried at most `MAX_RETRIES`
/// times, after which the update is abandoned until the next call.
#[cfg(feature = "smp")]
fn update_global_least_tasked_cpu_info(&self) {
    fn none<T>() -> Option<T> {
        None
    }
    per_cpu! {
        static LAST_UPDATE: Option<Instant> = none;
    }

    // Try and throttle contention on the atomic variable.
    const MIN_COOLDOWN: Duration = Duration::from_millis(16);
    // Ensure we don't spin forever (possible with a larger number of CPUs)
    const MAX_RETRIES: usize = 8;
    // Largest value representable in the 24-bit weight / waiting fields.
    const FIELD_MAX: u64 = 0xff_ffff;

    if let Some(last) = LAST_UPDATE.borrow().as_ref()
        && let Some(now) = now()
        && now - *last < MIN_COOLDOWN
    {
        return;
    }
    *LAST_UPDATE.borrow_mut() = now();

    // Saturate rather than truncate: masking an over-range value would wrap
    // it around and advertise a heavily loaded CPU as a nearly idle one.
    let weight = self.run_q.weight().min(FIELD_MAX);
    let waiting_tasks = (self.wait_q.len() as u64).min(FIELD_MAX);
    let cpu_id = CpuId::this().value() as u64;

    let new_info =
        (cpu_id & 0xffff) | (weight << WEIGHT_SHIFT) | (waiting_tasks << WAITING_SHIFT);

    // Our own metric does not change between retries, so compute it once.
    // It is derived from the same saturated values that get published, which
    // keeps it directly comparable with metrics decoded from the atomic.
    let metric = weight + (waiting_tasks * SCHED_WEIGHT_BASE as u64);

    let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire);

    for _ in 0..MAX_RETRIES {
        let old_cpu_id = old_info & 0xffff;
        let old_weight = (old_info >> WEIGHT_SHIFT) & FIELD_MAX;
        let old_waiting = (old_info >> WAITING_SHIFT) & FIELD_MAX;
        let old_metric = old_weight + (old_waiting * SCHED_WEIGHT_BASE as u64);

        let refresh_own_record = cpu_id == old_cpu_id && old_info != new_info;
        if !refresh_own_record && metric >= old_metric {
            // Nothing to publish. `old_info` only changes after a failed
            // compare-exchange, so further iterations would just repeat
            // this exact comparison.
            break;
        }

        match LEAST_TASKED_CPU_INFO.compare_exchange(
            old_info,
            new_info,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => break,
            // Another CPU got in first; re-evaluate against its value.
            Err(current) => old_info = current,
        }
    }
}
/// Non-SMP counterpart of the least-tasked-CPU update. It has an empty body,
/// so callers can invoke it unconditionally regardless of the `smp` feature.
#[cfg(not(feature = "smp"))]
fn update_global_least_tasked_cpu_info(&self) {
// No-op on single-core systems.
}
/// Advance the per-CPU virtual clock (`vclock`) by converting the elapsed
/// real time since the last update into 65.63-format fixed-point
/// virtual-time units:
@@ -183,6 +244,8 @@ impl SchedState {
}
self.run_q.enqueue_task(new_task);
self.update_global_least_tasked_cpu_info();
}
pub fn wakeup(&mut self, desc: TaskDescriptor) {
@@ -198,6 +261,7 @@ impl SchedState {
}
pub fn do_schedule(&mut self) {
self.update_global_least_tasked_cpu_info();
// Update Clocks
let now_inst = now().expect("System timer not initialised");
@@ -290,6 +354,8 @@ pub fn sched_init_secondary() {
let idle_task = ArchImpl::create_idle_task();
insert_task(Box::new(idle_task));
// Force update_global_least_tasked_cpu_info
SCHED_STATE.borrow().update_global_least_tasked_cpu_info();
}
pub fn sys_sched_yield() -> Result<usize> {