Merge pull request #261 from arihant2math/retain-cpu-affin

This commit is contained in:
Matthew Leach
2026-03-18 06:42:21 +00:00
committed by GitHub
5 changed files with 64 additions and 59 deletions

View File

@@ -191,7 +191,7 @@ pub async fn sys_clone(
.lock_save_irq()
.insert(desc.tid, Arc::downgrade(&work));
sched::insert_task_cross_cpu(work);
sched::insert_work_cross_cpu(work);
NUM_FORKS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);

View File

@@ -1,4 +1,5 @@
use crate::drivers::timer::{Instant, now};
use crate::arch::{Arch, ArchImpl};
use crate::drivers::timer::now;
#[cfg(feature = "smp")]
use crate::interrupts::cpu_messenger::{Message, message_cpu};
use crate::kernel::cpu_id::CpuId;
@@ -70,6 +71,10 @@ per_cpu_private! {
static SCHED_STATE: SchedState = SchedState::new;
}
per_cpu_shared! {
pub static SHARED_SCHED_STATE: SharedSchedState = SharedSchedState::new;
}
/// Default time-slice assigned to runnable tasks.
const DEFAULT_TIME_SLICE: Duration = Duration::from_millis(4);
@@ -129,18 +134,22 @@ pub fn spawn_kernel_work(fut: impl Future<Output = ()> + 'static + Send) {
current_task().ctx.put_kernel_work(Box::pin(fut));
}
/// Global atomic storing info about the least-tasked CPU.
/// First 16 bits: CPU ID
/// Remaining 48 bits: Run-queue weight
#[cfg(feature = "smp")]
static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0);
const WEIGHT_SHIFT: u32 = 16;
#[cfg(feature = "smp")]
fn get_best_cpu() -> CpuId {
    // Pick the CPU whose run queue currently carries the least weight.
    // The weights are read with Relaxed ordering, so this is only an
    // advisory snapshot — acceptable for a load-balancing heuristic.
    (0..ArchImpl::cpu_count())
        // TODO: Find a way to calculate already assigned affinities and account for that
        .min_by_key(|&cpu| {
            SHARED_SCHED_STATE
                .get_by_cpu(cpu)
                .total_runq_weight
                .load(Ordering::Relaxed)
        })
        .map(CpuId::from_value)
        .unwrap_or_else(|| {
            // An empty CPU range should be impossible on a running system;
            // fall back to the boot CPU rather than panic.
            warn!("No CPUs found when trying to get best CPU! Defaulting to CPU 0");
            CpuId::from_value(0)
        })
}
/// Insert the given task onto a CPU's run queue.
@@ -149,17 +158,27 @@ pub fn insert_work(work: Arc<Work>) {
}
/// Enqueue `work` onto a CPU's run queue, preferring the CPU it last ran on.
///
/// Retaining the previous CPU keeps cache affinity; `usize::MAX` is the
/// "never ran" sentinel (see `SchedulerData::new`), in which case the
/// least-loaded CPU is chosen instead.
#[cfg(feature = "smp")]
pub fn insert_work_cross_cpu(work: Arc<Work>) {
    let last_cpu = work
        .sched_data
        .lock_save_irq()
        .as_ref()
        .map(|s| s.last_cpu)
        .unwrap_or(usize::MAX);
    let cpu = if last_cpu == usize::MAX {
        // Fresh work with no affinity yet: balance onto the lightest CPU.
        get_best_cpu()
    } else {
        CpuId::from_value(last_cpu)
    };
    if cpu == CpuId::this() {
        // Local target: enqueue directly, no cross-CPU message needed.
        SCHED_STATE.borrow_mut().run_q.add_work(work);
    } else {
        message_cpu(cpu, Message::EnqueueWork(work)).expect("Failed to send task to CPU");
    }
}
#[cfg(not(feature = "smp"))]
pub fn insert_task_cross_cpu(task: Arc<Work>) {
pub fn insert_work_cross_cpu(task: Arc<Work>) {
insert_work(task);
}
@@ -179,46 +198,11 @@ impl SchedState {
/// Publish this CPU's run-queue weight to the shared per-CPU state so
/// that `get_best_cpu()` can compare load across CPUs.
#[cfg(feature = "smp")]
fn update_global_least_tasked_cpu_info(&self) {
    let weight = self.run_q.weight();
    // Relaxed is sufficient: the value is an advisory load-balancing
    // hint read by other CPUs, not a synchronization point.
    SHARED_SCHED_STATE
        .get()
        .total_runq_weight
        .store(weight, Ordering::Relaxed);
}
#[cfg(not(feature = "smp"))]
@@ -245,6 +229,18 @@ impl SchedState {
}
}
/// Per-CPU scheduler state that other CPUs are permitted to read.
pub struct SharedSchedState {
    /// Total weight of this CPU's run queue. Written by the owning CPU
    /// (see `update_global_least_tasked_cpu_info`) and read by
    /// `get_best_cpu()` when placing new work.
    pub total_runq_weight: AtomicU64,
}

impl SharedSchedState {
    /// Create a state representing an empty (zero-weight) run queue.
    pub const fn new() -> Self {
        Self {
            total_runq_weight: AtomicU64::new(0),
        }
    }
}

impl Default for SharedSchedState {
    fn default() -> Self {
        Self::new()
    }
}
pub fn sched_init() {
let init_task = OwnedTask::create_init_task();

View File

@@ -8,6 +8,7 @@ use crate::{
};
use alloc::{boxed::Box, collections::binary_heap::BinaryHeap, sync::Arc, vec::Vec};
use core::{cmp, ptr, sync::atomic::Ordering};
use libkernel::CpuOps;
use vclock::VClock;
mod vclock;
@@ -116,6 +117,7 @@ impl RunQueue {
// Task wants to deactivate. Drop the RunnableTask now to
// restore sched_data.
let work = cur_task.task.clone();
cur_task.sched_data.last_cpu = ArchImpl::id();
self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64);
drop(cur_task);

View File

@@ -22,6 +22,9 @@ pub struct Work {
pub sched_data: SpinLock<Option<SchedulerData>>,
}
/// Maximum number of CPUs the scheduler's affinity mask can represent.
pub const NR_CPUS: usize = 256;
/// Number of `u64` words in a CPU affinity bitmask (one bit per CPU).
pub const CPU_MASK_SIZE: usize = NR_CPUS / 64;
#[derive(Clone)]
pub struct SchedulerData {
pub v_runtime: u128,
@@ -32,6 +35,8 @@ pub struct SchedulerData {
pub exec_start: Option<Instant>,
pub deadline: Option<Instant>,
pub last_run: Option<Instant>,
pub last_cpu: usize,
pub cpu_mask: [u64; CPU_MASK_SIZE],
}
impl SchedulerData {
@@ -43,6 +48,8 @@ impl SchedulerData {
exec_start: None,
deadline: None,
last_run: None,
last_cpu: usize::MAX,
cpu_mask: [u64::MAX; CPU_MASK_SIZE],
}
}
}

View File

@@ -2,7 +2,7 @@ use alloc::sync::Arc;
use core::task::{RawWaker, RawWakerVTable, Waker};
use super::{
SCHED_STATE,
insert_work_cross_cpu,
sched_task::{Work, state::WakerAction},
};
@@ -25,7 +25,7 @@ unsafe fn wake_waker_no_consume(data: *const ()) {
match work.state.wake() {
WakerAction::Enqueue => {
SCHED_STATE.borrow_mut().run_q.add_work(work);
insert_work_cross_cpu(work);
}
WakerAction::PreventedSleep | WakerAction::None => {}
}