From c2dba9762c2aa31081738fedf75f56513b917564 Mon Sep 17 00:00:00 2001 From: Ashwin Naren Date: Sat, 14 Mar 2026 16:41:08 -0700 Subject: [PATCH] retain CPU affinity when rescheduling --- src/process/clone.rs | 2 +- src/sched/mod.rs | 108 +++++++++++++++++------------------- src/sched/runqueue/mod.rs | 2 + src/sched/sched_task/mod.rs | 7 +++ src/sched/waker.rs | 4 +- 5 files changed, 64 insertions(+), 59 deletions(-) diff --git a/src/process/clone.rs b/src/process/clone.rs index e997c32..90f1081 100644 --- a/src/process/clone.rs +++ b/src/process/clone.rs @@ -191,7 +191,7 @@ pub async fn sys_clone( .lock_save_irq() .insert(desc.tid, Arc::downgrade(&work)); - sched::insert_task_cross_cpu(work); + sched::insert_work_cross_cpu(work); NUM_FORKS.fetch_add(1, core::sync::atomic::Ordering::Relaxed); diff --git a/src/sched/mod.rs b/src/sched/mod.rs index 06008ef..331841b 100644 --- a/src/sched/mod.rs +++ b/src/sched/mod.rs @@ -1,4 +1,5 @@ -use crate::drivers::timer::{Instant, now}; +use crate::arch::{Arch, ArchImpl}; +use crate::drivers::timer::now; #[cfg(feature = "smp")] use crate::interrupts::cpu_messenger::{Message, message_cpu}; use crate::kernel::cpu_id::CpuId; @@ -70,6 +71,10 @@ per_cpu_private! { static SCHED_STATE: SchedState = SchedState::new; } +per_cpu_shared! { + pub static SHARED_SCHED_STATE: SharedSchedState = SharedSchedState::new; +} + /// Default time-slice assigned to runnable tasks. const DEFAULT_TIME_SLICE: Duration = Duration::from_millis(4); @@ -129,18 +134,22 @@ pub fn spawn_kernel_work(fut: impl Future + 'static + Send) { current_task().ctx.put_kernel_work(Box::pin(fut)); } -/// Global atomic storing info about the least-tasked CPU. -/// First 16 bits: CPU ID -/// Remaining 48 bits: Run-queue weight -#[cfg(feature = "smp")] -static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0); -const WEIGHT_SHIFT: u32 = 16; - #[cfg(feature = "smp")] fn get_best_cpu() -> CpuId { - // Get the CPU with the least number of tasks. 
- let least_tasked_cpu_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire); - CpuId::from_value((least_tasked_cpu_info & 0xffff) as usize) + let r = 0..ArchImpl::cpu_count(); + r.min_by(|&x, &y| { + // TODO: Find a way to calculate already assigned affinities and account for that + let info_x = SHARED_SCHED_STATE.get_by_cpu(x); + let info_y = SHARED_SCHED_STATE.get_by_cpu(y); + let weight_x = info_x.total_runq_weight.load(Ordering::Relaxed); + let weight_y = info_y.total_runq_weight.load(Ordering::Relaxed); + weight_x.cmp(&weight_y) + }) + .map(CpuId::from_value) + .unwrap_or_else(|| { + warn!("No CPUs found when trying to get best CPU! Defaulting to CPU 0"); + CpuId::from_value(0) + }) } /// Insert the given task onto a CPU's run queue. @@ -149,17 +158,27 @@ pub fn insert_work(work: Arc) { } #[cfg(feature = "smp")] -pub fn insert_task_cross_cpu(task: Arc) { - let cpu = get_best_cpu(); - if cpu == CpuId::this() { - SCHED_STATE.borrow_mut().run_q.add_work(task); +pub fn insert_work_cross_cpu(work: Arc) { + let last_cpu = work + .sched_data + .lock_save_irq() + .as_ref() + .map(|s| s.last_cpu) + .unwrap_or(usize::MAX); + let cpu = if last_cpu == usize::MAX { + get_best_cpu() } else { - message_cpu(cpu, Message::EnqueueWork(task)).expect("Failed to send task to CPU"); + CpuId::from_value(last_cpu) + }; + if cpu == CpuId::this() { + SCHED_STATE.borrow_mut().run_q.add_work(work); + } else { + message_cpu(cpu, Message::EnqueueWork(work)).expect("Failed to send task to CPU"); } } #[cfg(not(feature = "smp"))] -pub fn insert_task_cross_cpu(task: Arc) { +pub fn insert_work_cross_cpu(task: Arc) { insert_work(task); } @@ -179,46 +198,11 @@ impl SchedState { /// Update the global least-tasked CPU info atomically. #[cfg(feature = "smp")] fn update_global_least_tasked_cpu_info(&self) { - fn none() -> Option { - None - } - - per_cpu_private! { - static LAST_UPDATE: Option = none; - } - - // Try and throttle contention on the atomic variable. 
- const MIN_COOLDOWN: Duration = Duration::from_millis(16); - if let Some(last) = LAST_UPDATE.borrow().as_ref() && let Some(now) = now() && now - *last < MIN_COOLDOWN - { - return; - } - *LAST_UPDATE.borrow_mut() = now(); - let weight = self.run_q.weight(); - let cpu_id = CpuId::this().value() as u64; - let new_info = (cpu_id & 0xffff) | ((weight & 0xffffffffffff) << WEIGHT_SHIFT); - let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire); - // Ensure we don't spin forever (possible with a larger number of CPUs) - const MAX_RETRIES: usize = 8; - // Ensure consistency - for _ in 0..MAX_RETRIES { - let old_cpu_id = old_info & 0xffff; - let old_weight = old_info >> WEIGHT_SHIFT; - if (cpu_id == old_cpu_id && old_info != new_info) || (weight < old_weight) { - match LEAST_TASKED_CPU_INFO.compare_exchange( - old_info, - new_info, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(x) => old_info = x, - } - } - } + SHARED_SCHED_STATE + .get() + .total_runq_weight + .store(self.run_q.weight(), Ordering::Relaxed); } #[cfg(not(feature = "smp"))] @@ -245,6 +229,18 @@ impl SchedState { } } +pub struct SharedSchedState { + pub total_runq_weight: AtomicU64, +} + +impl SharedSchedState { + pub fn new() -> Self { + Self { + total_runq_weight: AtomicU64::new(0), + } + } +} + pub fn sched_init() { let init_task = OwnedTask::create_init_task(); diff --git a/src/sched/runqueue/mod.rs b/src/sched/runqueue/mod.rs index 362cf39..f66f78f 100644 --- a/src/sched/runqueue/mod.rs +++ b/src/sched/runqueue/mod.rs @@ -8,6 +8,7 @@ use crate::{ }; use alloc::{boxed::Box, collections::binary_heap::BinaryHeap, sync::Arc, vec::Vec}; use core::{cmp, ptr, sync::atomic::Ordering}; +use libkernel::CpuOps; use vclock::VClock; mod vclock; @@ -116,6 +117,7 @@ impl RunQueue { // Task wants to deactivate. Drop the RunnableTask now to // restore sched_data. 
let work = cur_task.task.clone(); + cur_task.sched_data.last_cpu = ArchImpl::id(); self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64); drop(cur_task); diff --git a/src/sched/sched_task/mod.rs b/src/sched/sched_task/mod.rs index 82d97a5..15124c5 100644 --- a/src/sched/sched_task/mod.rs +++ b/src/sched/sched_task/mod.rs @@ -22,6 +22,9 @@ pub struct Work { pub sched_data: SpinLock>, } +pub const NR_CPUS: usize = 256; +pub const CPU_MASK_SIZE: usize = NR_CPUS / 64; + #[derive(Clone)] pub struct SchedulerData { pub v_runtime: u128, @@ -32,6 +35,8 @@ pub struct SchedulerData { pub exec_start: Option, pub deadline: Option, pub last_run: Option, + pub last_cpu: usize, + pub cpu_mask: [u64; CPU_MASK_SIZE], } impl SchedulerData { @@ -43,6 +48,8 @@ impl SchedulerData { exec_start: None, deadline: None, last_run: None, + last_cpu: usize::MAX, + cpu_mask: [u64::MAX; CPU_MASK_SIZE], } } } diff --git a/src/sched/waker.rs b/src/sched/waker.rs index 1c6629b..1f5cf7c 100644 --- a/src/sched/waker.rs +++ b/src/sched/waker.rs @@ -2,7 +2,7 @@ use alloc::sync::Arc; use core::task::{RawWaker, RawWakerVTable, Waker}; use super::{ - SCHED_STATE, + insert_work_cross_cpu, sched_task::{Work, state::WakerAction}, }; @@ -25,7 +25,7 @@ unsafe fn wake_waker_no_consume(data: *const ()) { match work.state.wake() { WakerAction::Enqueue => { - SCHED_STATE.borrow_mut().run_q.add_work(work); + insert_work_cross_cpu(work); } WakerAction::PreventedSleep | WakerAction::None => {} }