futex: check value while queue locked

Lock the futex queue when checking the value to ensure atomicity.
This commit is contained in:
Matthew Leach
2025-12-23 05:16:33 +00:00
parent facee29869
commit 701f1f9d46
7 changed files with 286 additions and 163 deletions

View File

@@ -62,7 +62,7 @@ use crate::{
umask::sys_umask,
wait::sys_wait4,
},
threading::{sys_futex, sys_set_robust_list, sys_set_tid_address},
threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address},
},
sched::current_task,
};

View File

@@ -1,12 +1,13 @@
use super::{
TaskState,
thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState},
threading::futex::{self, key::FutexKey},
};
use crate::memory::uaccess::copy_to_user;
use crate::process::threading::futex_wake_addr;
use crate::sched::current_task;
use alloc::vec::Vec;
use libkernel::error::Result;
use log::warn;
use ringbuf::Arc;
pub fn do_exit_group(exit_code: ChildState) {
@@ -108,8 +109,11 @@ pub async fn sys_exit(exit_code: usize) -> Result<usize> {
if let Some(ptr) = ptr {
copy_to_user(ptr, 0u32).await?;
// Wake any thread waiting on this futex.
futex_wake_addr(ptr, 1);
if let Ok(key) = FutexKey::new_shared(ptr) {
futex::wake_key(1, key);
} else {
warn!("Failed to get futex wake key on sys_exit");
}
}
let process = Arc::clone(&task.process);

View File

@@ -1,159 +0,0 @@
use core::ffi::c_long;
use core::mem::size_of;
use crate::memory::uaccess::copy_from_user;
use crate::sched::current_task;
use crate::sync::{OnceLock, SpinLock};
use alloc::{collections::BTreeMap, sync::Arc};
use libkernel::sync::waker_set::{WakerSet, wait_until};
use libkernel::{
error::{KernelError, Result},
memory::address::{TUA, VA},
};
/// A per-futex wait queue holding wakers for blocked tasks.
struct FutexWaitQueue {
wakers: WakerSet,
/// Number of pending wake-ups for waiters on this futex.
wakeups: usize,
}
impl FutexWaitQueue {
fn new() -> Self {
Self {
wakers: WakerSet::new(),
wakeups: 0,
}
}
}
/// Global futex table mapping a user address to its wait queue.
// TODO: statically allocate an array of SpinLock<Vec<FutexWaitQueue>>.
// Then hash into that table to find it's bucket
// TODO: Should be physical address, not user address
#[allow(clippy::type_complexity)]
static FUTEX_TABLE: OnceLock<SpinLock<BTreeMap<TUA<u32>, Arc<SpinLock<FutexWaitQueue>>>>> =
OnceLock::new();
pub async fn sys_set_tid_address(_tidptr: VA) -> Result<usize> {
let tid = current_task().tid;
// TODO: implement threading and this system call properly. For now, we just
// return the PID as the thread id.
Ok(tid.value() as _)
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct RobustList {
next: TUA<RobustList>,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct RobustListHead {
list: RobustList,
futex_offset: c_long,
list_op_pending: RobustList,
}
pub async fn sys_set_robust_list(head: TUA<RobustListHead>, len: usize) -> Result<usize> {
if core::hint::unlikely(len != size_of::<RobustListHead>()) {
return Err(KernelError::InvalidValue);
}
let task = current_task();
task.robust_list.lock_save_irq().replace(head);
Ok(0)
}
const FUTEX_WAIT: i32 = 0;
const FUTEX_WAKE: i32 = 1;
const FUTEX_WAIT_BITSET: i32 = 9;
const FUTEX_WAKE_BITSET: i32 = 10;
const FUTEX_PRIVATE_FLAG: i32 = 128;
/// Wake up to `nr` waiters sleeping on the futex word located at `uaddr`.
/// Returns the number of tasks actually woken.
pub fn futex_wake_addr(uaddr: TUA<u32>, nr: usize) -> usize {
let mut woke = 0;
if let Some(table) = FUTEX_TABLE.get()
&& let Some(waitq_arc) = table.lock_save_irq().get(&uaddr).cloned()
{
let mut waitq = waitq_arc.lock_save_irq();
for _ in 0..nr {
waitq.wakeups = waitq.wakeups.saturating_add(1);
waitq.wakers.wake_one();
woke += 1;
}
}
woke
}
pub async fn sys_futex(
uaddr: TUA<u32>,
op: i32,
val: u32,
_timeout: VA,
_uaddr2: TUA<u32>,
_val3: u32,
) -> Result<usize> {
// Strip PRIVATE flag if present
let cmd = op & !FUTEX_PRIVATE_FLAG;
// TODO: support bitset variants properly
match cmd {
FUTEX_WAIT | FUTEX_WAIT_BITSET => {
// Ensure the wait-queue exists *before* we begin checking the
// futex word so that a racing FUTEX_WAKE cannot miss us. This
// avoids the classic lost-wake-up race where a waker runs between
// our value check and queue insertion.
//
// After publishing the queue we perform a second sanity check on
// the user word, mirroring Linuxs “double read” strategy.
// Obtain (or create) the wait-queue for this futex word.
let table = FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new()));
let waitq_arc = {
let mut guard = table.lock_save_irq();
guard
.entry(uaddr)
.or_insert_with(|| Arc::new(SpinLock::new(FutexWaitQueue::new())))
.clone()
};
let current: u32 = copy_from_user(uaddr).await?;
if current != val {
return Err(KernelError::TryAgain);
}
// TODO: When we have try_ variants of locking primitives, use them here
wait_until(
waitq_arc.clone(),
|state| &mut state.wakers,
|state| {
if state.wakeups > 0 {
state.wakeups -= 1;
Some(())
} else {
None
}
},
)
.await;
Ok(0)
}
FUTEX_WAKE | FUTEX_WAKE_BITSET => {
let nr_wake = val as usize;
Ok(futex_wake_addr(uaddr, nr_wake))
}
_ => Err(KernelError::NotSupported),
}
}

View File

@@ -0,0 +1,36 @@
use crate::sched::current_task;
use libkernel::UserAddressSpace;
use libkernel::error::{KernelError, Result};
use libkernel::memory::address::{TUA, VA};
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub enum FutexKey {
Private { pid: u32, addr: usize },
Shared { frame: usize, offset: usize },
}
impl FutexKey {
pub fn new_private(uaddr: TUA<u32>) -> Self {
let pid = current_task().process.tgid.value();
Self::Private {
pid,
addr: uaddr.value(),
}
}
pub fn new_shared(uaddr: TUA<u32>) -> Result<Self> {
let pg_info = current_task()
.vm
.lock_save_irq()
.mm_mut()
.address_space_mut()
.translate(VA::from_value(uaddr.value()))
.ok_or(KernelError::Fault)?;
Ok(Self::Shared {
frame: pg_info.pfn.value(),
offset: uaddr.page_offset(),
})
}
}

View File

@@ -0,0 +1,90 @@
use crate::sync::{OnceLock, SpinLock};
use alloc::{collections::btree_map::BTreeMap, sync::Arc};
use key::FutexKey;
use libkernel::{
error::{KernelError, Result},
memory::address::{TUA, VA},
sync::waker_set::WakerSet,
};
use wait::FutexWait;
pub mod key;
mod wait;
const FUTEX_WAIT: i32 = 0;
const FUTEX_WAKE: i32 = 1;
const FUTEX_WAIT_BITSET: i32 = 9;
const FUTEX_WAKE_BITSET: i32 = 10;
const FUTEX_PRIVATE_FLAG: i32 = 128;
type FutexTable = BTreeMap<FutexKey, Arc<SpinLock<WakerSet>>>;
/// Global futex table mapping a futex key to its wait queue.
#[allow(clippy::type_complexity)]
static FUTEX_TABLE: OnceLock<SpinLock<FutexTable>> = OnceLock::new();
fn futex_table() -> &'static SpinLock<FutexTable> {
FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new()))
}
fn get_or_create_queue(key: FutexKey) -> Arc<SpinLock<WakerSet>> {
let table = futex_table();
table
.lock_save_irq()
.entry(key)
.or_insert_with(|| Arc::new(SpinLock::new(WakerSet::new())))
.clone()
}
pub fn wake_key(nr_wake: usize, key: FutexKey) -> usize {
let mut woke = 0;
let table = futex_table();
if let Some(waitq_arc) = table.lock_save_irq().get(&key).cloned() {
let mut waitq = waitq_arc.lock_save_irq();
for _ in 0..nr_wake {
if waitq.wake_one() {
woke += 1;
} else {
break;
}
}
}
woke
}
pub async fn sys_futex(
uaddr: TUA<u32>,
op: i32,
val: u32,
_timeout: VA,
_uaddr2: TUA<u32>,
_val3: u32,
) -> Result<usize> {
// Strip PRIVATE flag if present
let cmd = op & !FUTEX_PRIVATE_FLAG;
let key = if op & FUTEX_PRIVATE_FLAG != 0 {
FutexKey::new_private(uaddr)
} else {
FutexKey::new_shared(uaddr)?
};
// TODO: support bitset variants properly
match cmd {
FUTEX_WAIT | FUTEX_WAIT_BITSET => {
// Obtain (or create) the wait-queue for this futex word.
let slot = get_or_create_queue(key);
// Return 0 on success.
FutexWait::new(uaddr, val, slot).await.map(|_| 0)
}
FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key(val as _, key)),
_ => Err(KernelError::NotSupported),
}
}

View File

@@ -0,0 +1,109 @@
use alloc::{boxed::Box, sync::Arc};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use libkernel::{
error::{KernelError, Result},
memory::address::TUA,
sync::waker_set::WakerSet,
};
use crate::{
memory::uaccess::{copy_from_user, try_copy_from_user},
sync::SpinLock,
};
enum WaitState {
Init,
HandlingFault(Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>),
Waiting { token: u64 },
}
pub struct FutexWait {
uaddr: TUA<u32>,
val: u32,
queue: Arc<SpinLock<WakerSet>>,
state: WaitState,
}
impl FutexWait {
pub fn new(uaddr: TUA<u32>, val: u32, queue: Arc<SpinLock<WakerSet>>) -> Self {
Self {
uaddr,
val,
queue,
state: WaitState::Init,
}
}
}
impl Drop for FutexWait {
fn drop(&mut self) {
if let WaitState::Waiting { token } = &self.state {
self.queue.lock_save_irq().remove(*token);
}
}
}
impl Future for FutexWait {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safe because we are inside Pin and not moving out of Self
let this = unsafe { self.as_mut().get_unchecked_mut() };
loop {
match &mut this.state {
WaitState::Init => {
let mut wait_queue = this.queue.lock_save_irq();
match try_copy_from_user(this.uaddr) {
Ok(val) => {
if val != this.val {
return Poll::Ready(Err(KernelError::TryAgain));
}
let token = wait_queue.register(cx.waker());
this.state = WaitState::Waiting { token };
return Poll::Pending;
}
Err(_) => {
drop(wait_queue);
let uaddr = this.uaddr;
let fault_handler = Box::pin(async move {
copy_from_user(uaddr).await?;
Ok(())
});
this.state = WaitState::HandlingFault(fault_handler);
continue;
}
}
}
WaitState::Waiting { token } => {
let wait_queue = this.queue.lock_save_irq();
if !wait_queue.contains_token(*token) {
return Poll::Ready(Ok(()));
} else {
return Poll::Pending;
}
}
WaitState::HandlingFault(fut) => match fut.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => {
this.state = WaitState::Init;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
},
}
}
}
}

View File

@@ -0,0 +1,43 @@
use core::ffi::c_long;
use core::mem::size_of;
use crate::sched::current_task;
use libkernel::{
error::{KernelError, Result},
memory::address::{TUA, VA},
};
pub mod futex;
pub async fn sys_set_tid_address(_tidptr: VA) -> Result<usize> {
let tid = current_task().tid;
// TODO: implement threading and this system call properly. For now, we just
// return the PID as the thread id.
Ok(tid.value() as _)
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct RobustList {
next: TUA<RobustList>,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct RobustListHead {
list: RobustList,
futex_offset: c_long,
list_op_pending: RobustList,
}
pub async fn sys_set_robust_list(head: TUA<RobustListHead>, len: usize) -> Result<usize> {
if core::hint::unlikely(len != size_of::<RobustListHead>()) {
return Err(KernelError::InvalidValue);
}
let task = current_task();
task.robust_list.lock_save_irq().replace(head);
Ok(0)
}