mirror of
https://github.com/hexagonal-sun/moss-kernel.git
synced 2025-12-23 22:47:55 -05:00
futex: check value while queue locked
Lock the futex queue when checking the value to ensure atomicity.
This commit is contained in:
committed by
Ashwin Naren
parent
f7a02c6ccb
commit
6bada491b6
@@ -57,7 +57,7 @@ use crate::{
|
|||||||
umask::sys_umask,
|
umask::sys_umask,
|
||||||
wait::sys_wait4,
|
wait::sys_wait4,
|
||||||
},
|
},
|
||||||
threading::{sys_futex, sys_set_robust_list, sys_set_tid_address},
|
threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address},
|
||||||
},
|
},
|
||||||
sched::current_task,
|
sched::current_task,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,12 +1,13 @@
|
|||||||
use super::{
|
use super::{
|
||||||
TaskState,
|
TaskState,
|
||||||
thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState},
|
thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState},
|
||||||
|
threading::futex::{self, key::FutexKey},
|
||||||
};
|
};
|
||||||
use crate::memory::uaccess::copy_to_user;
|
use crate::memory::uaccess::copy_to_user;
|
||||||
use crate::process::threading::futex_wake_addr;
|
|
||||||
use crate::sched::current_task;
|
use crate::sched::current_task;
|
||||||
use alloc::vec::Vec;
|
use alloc::vec::Vec;
|
||||||
use libkernel::error::Result;
|
use libkernel::error::Result;
|
||||||
|
use log::warn;
|
||||||
use ringbuf::Arc;
|
use ringbuf::Arc;
|
||||||
|
|
||||||
pub fn do_exit_group(exit_code: ChildState) {
|
pub fn do_exit_group(exit_code: ChildState) {
|
||||||
@@ -108,8 +109,11 @@ pub async fn sys_exit(exit_code: usize) -> Result<usize> {
|
|||||||
if let Some(ptr) = ptr {
|
if let Some(ptr) = ptr {
|
||||||
copy_to_user(ptr, 0u32).await?;
|
copy_to_user(ptr, 0u32).await?;
|
||||||
|
|
||||||
// Wake any thread waiting on this futex.
|
if let Ok(key) = FutexKey::new_shared(ptr) {
|
||||||
futex_wake_addr(ptr, 1);
|
futex::wake_key(1, key);
|
||||||
|
} else {
|
||||||
|
warn!("Failed to get futex wake key on sys_exit");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let process = Arc::clone(&task.process);
|
let process = Arc::clone(&task.process);
|
||||||
|
|||||||
@@ -1,159 +0,0 @@
|
|||||||
use core::ffi::c_long;
|
|
||||||
use core::mem::size_of;
|
|
||||||
|
|
||||||
use crate::memory::uaccess::copy_from_user;
|
|
||||||
use crate::sched::current_task;
|
|
||||||
use crate::sync::{OnceLock, SpinLock};
|
|
||||||
use alloc::{collections::BTreeMap, sync::Arc};
|
|
||||||
use libkernel::sync::waker_set::{WakerSet, wait_until};
|
|
||||||
use libkernel::{
|
|
||||||
error::{KernelError, Result},
|
|
||||||
memory::address::{TUA, VA},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// A per-futex wait queue holding wakers for blocked tasks.
|
|
||||||
struct FutexWaitQueue {
|
|
||||||
wakers: WakerSet,
|
|
||||||
/// Number of pending wake-ups for waiters on this futex.
|
|
||||||
wakeups: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FutexWaitQueue {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
wakers: WakerSet::new(),
|
|
||||||
wakeups: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Global futex table mapping a user address to its wait queue.
|
|
||||||
// TODO: statically allocate an array of SpinLock<Vec<FutexWaitQueue>>.
|
|
||||||
// Then hash into that table to find it's bucket
|
|
||||||
// TODO: Should be physical address, not user address
|
|
||||||
#[allow(clippy::type_complexity)]
|
|
||||||
static FUTEX_TABLE: OnceLock<SpinLock<BTreeMap<TUA<u32>, Arc<SpinLock<FutexWaitQueue>>>>> =
|
|
||||||
OnceLock::new();
|
|
||||||
|
|
||||||
pub async fn sys_set_tid_address(_tidptr: VA) -> Result<usize> {
|
|
||||||
let tid = current_task().tid;
|
|
||||||
|
|
||||||
// TODO: implement threading and this system call properly. For now, we just
|
|
||||||
// return the PID as the thread id.
|
|
||||||
Ok(tid.value() as _)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[repr(C)]
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
pub struct RobustList {
|
|
||||||
next: TUA<RobustList>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[repr(C)]
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
pub struct RobustListHead {
|
|
||||||
list: RobustList,
|
|
||||||
futex_offset: c_long,
|
|
||||||
list_op_pending: RobustList,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn sys_set_robust_list(head: TUA<RobustListHead>, len: usize) -> Result<usize> {
|
|
||||||
if core::hint::unlikely(len != size_of::<RobustListHead>()) {
|
|
||||||
return Err(KernelError::InvalidValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
let task = current_task();
|
|
||||||
task.robust_list.lock_save_irq().replace(head);
|
|
||||||
|
|
||||||
Ok(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
const FUTEX_WAIT: i32 = 0;
|
|
||||||
const FUTEX_WAKE: i32 = 1;
|
|
||||||
const FUTEX_WAIT_BITSET: i32 = 9;
|
|
||||||
const FUTEX_WAKE_BITSET: i32 = 10;
|
|
||||||
const FUTEX_PRIVATE_FLAG: i32 = 128;
|
|
||||||
|
|
||||||
/// Wake up to `nr` waiters sleeping on the futex word located at `uaddr`.
|
|
||||||
/// Returns the number of tasks actually woken.
|
|
||||||
pub fn futex_wake_addr(uaddr: TUA<u32>, nr: usize) -> usize {
|
|
||||||
let mut woke = 0;
|
|
||||||
|
|
||||||
if let Some(table) = FUTEX_TABLE.get()
|
|
||||||
&& let Some(waitq_arc) = table.lock_save_irq().get(&uaddr).cloned()
|
|
||||||
{
|
|
||||||
let mut waitq = waitq_arc.lock_save_irq();
|
|
||||||
for _ in 0..nr {
|
|
||||||
waitq.wakeups = waitq.wakeups.saturating_add(1);
|
|
||||||
waitq.wakers.wake_one();
|
|
||||||
woke += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
woke
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn sys_futex(
|
|
||||||
uaddr: TUA<u32>,
|
|
||||||
op: i32,
|
|
||||||
val: u32,
|
|
||||||
_timeout: VA,
|
|
||||||
_uaddr2: TUA<u32>,
|
|
||||||
_val3: u32,
|
|
||||||
) -> Result<usize> {
|
|
||||||
// Strip PRIVATE flag if present
|
|
||||||
let cmd = op & !FUTEX_PRIVATE_FLAG;
|
|
||||||
|
|
||||||
// TODO: support bitset variants properly
|
|
||||||
|
|
||||||
match cmd {
|
|
||||||
FUTEX_WAIT | FUTEX_WAIT_BITSET => {
|
|
||||||
// Ensure the wait-queue exists *before* we begin checking the
|
|
||||||
// futex word so that a racing FUTEX_WAKE cannot miss us. This
|
|
||||||
// avoids the classic lost-wake-up race where a waker runs between
|
|
||||||
// our value check and queue insertion.
|
|
||||||
//
|
|
||||||
// After publishing the queue we perform a second sanity check on
|
|
||||||
// the user word, mirroring Linux’s “double read” strategy.
|
|
||||||
|
|
||||||
// Obtain (or create) the wait-queue for this futex word.
|
|
||||||
let table = FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new()));
|
|
||||||
let waitq_arc = {
|
|
||||||
let mut guard = table.lock_save_irq();
|
|
||||||
guard
|
|
||||||
.entry(uaddr)
|
|
||||||
.or_insert_with(|| Arc::new(SpinLock::new(FutexWaitQueue::new())))
|
|
||||||
.clone()
|
|
||||||
};
|
|
||||||
|
|
||||||
let current: u32 = copy_from_user(uaddr).await?;
|
|
||||||
if current != val {
|
|
||||||
return Err(KernelError::TryAgain);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: When we have try_ variants of locking primitives, use them here
|
|
||||||
wait_until(
|
|
||||||
waitq_arc.clone(),
|
|
||||||
|state| &mut state.wakers,
|
|
||||||
|state| {
|
|
||||||
if state.wakeups > 0 {
|
|
||||||
state.wakeups -= 1;
|
|
||||||
Some(())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
FUTEX_WAKE | FUTEX_WAKE_BITSET => {
|
|
||||||
let nr_wake = val as usize;
|
|
||||||
Ok(futex_wake_addr(uaddr, nr_wake))
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => Err(KernelError::NotSupported),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
36
src/process/threading/futex/key.rs
Normal file
36
src/process/threading/futex/key.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use crate::sched::current_task;
|
||||||
|
use libkernel::UserAddressSpace;
|
||||||
|
use libkernel::error::{KernelError, Result};
|
||||||
|
use libkernel::memory::address::{TUA, VA};
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
|
||||||
|
pub enum FutexKey {
|
||||||
|
Private { pid: u32, addr: usize },
|
||||||
|
Shared { frame: usize, offset: usize },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutexKey {
|
||||||
|
pub fn new_private(uaddr: TUA<u32>) -> Self {
|
||||||
|
let pid = current_task().process.tgid.value();
|
||||||
|
|
||||||
|
Self::Private {
|
||||||
|
pid,
|
||||||
|
addr: uaddr.value(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_shared(uaddr: TUA<u32>) -> Result<Self> {
|
||||||
|
let pg_info = current_task()
|
||||||
|
.vm
|
||||||
|
.lock_save_irq()
|
||||||
|
.mm_mut()
|
||||||
|
.address_space_mut()
|
||||||
|
.translate(VA::from_value(uaddr.value()))
|
||||||
|
.ok_or(KernelError::Fault)?;
|
||||||
|
|
||||||
|
Ok(Self::Shared {
|
||||||
|
frame: pg_info.pfn.value(),
|
||||||
|
offset: uaddr.page_offset(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
90
src/process/threading/futex/mod.rs
Normal file
90
src/process/threading/futex/mod.rs
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
use crate::sync::{OnceLock, SpinLock};
|
||||||
|
use alloc::{collections::btree_map::BTreeMap, sync::Arc};
|
||||||
|
use key::FutexKey;
|
||||||
|
use libkernel::{
|
||||||
|
error::{KernelError, Result},
|
||||||
|
memory::address::{TUA, VA},
|
||||||
|
sync::waker_set::WakerSet,
|
||||||
|
};
|
||||||
|
use wait::FutexWait;
|
||||||
|
|
||||||
|
pub mod key;
|
||||||
|
mod wait;
|
||||||
|
|
||||||
|
const FUTEX_WAIT: i32 = 0;
|
||||||
|
const FUTEX_WAKE: i32 = 1;
|
||||||
|
const FUTEX_WAIT_BITSET: i32 = 9;
|
||||||
|
const FUTEX_WAKE_BITSET: i32 = 10;
|
||||||
|
const FUTEX_PRIVATE_FLAG: i32 = 128;
|
||||||
|
|
||||||
|
type FutexTable = BTreeMap<FutexKey, Arc<SpinLock<WakerSet>>>;
|
||||||
|
|
||||||
|
/// Global futex table mapping a futex key to its wait queue.
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
static FUTEX_TABLE: OnceLock<SpinLock<FutexTable>> = OnceLock::new();
|
||||||
|
|
||||||
|
fn futex_table() -> &'static SpinLock<FutexTable> {
|
||||||
|
FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_or_create_queue(key: FutexKey) -> Arc<SpinLock<WakerSet>> {
|
||||||
|
let table = futex_table();
|
||||||
|
|
||||||
|
table
|
||||||
|
.lock_save_irq()
|
||||||
|
.entry(key)
|
||||||
|
.or_insert_with(|| Arc::new(SpinLock::new(WakerSet::new())))
|
||||||
|
.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wake_key(nr_wake: usize, key: FutexKey) -> usize {
|
||||||
|
let mut woke = 0;
|
||||||
|
|
||||||
|
let table = futex_table();
|
||||||
|
|
||||||
|
if let Some(waitq_arc) = table.lock_save_irq().get(&key).cloned() {
|
||||||
|
let mut waitq = waitq_arc.lock_save_irq();
|
||||||
|
for _ in 0..nr_wake {
|
||||||
|
if waitq.wake_one() {
|
||||||
|
woke += 1;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
woke
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn sys_futex(
|
||||||
|
uaddr: TUA<u32>,
|
||||||
|
op: i32,
|
||||||
|
val: u32,
|
||||||
|
_timeout: VA,
|
||||||
|
_uaddr2: TUA<u32>,
|
||||||
|
_val3: u32,
|
||||||
|
) -> Result<usize> {
|
||||||
|
// Strip PRIVATE flag if present
|
||||||
|
let cmd = op & !FUTEX_PRIVATE_FLAG;
|
||||||
|
|
||||||
|
let key = if op & FUTEX_PRIVATE_FLAG != 0 {
|
||||||
|
FutexKey::new_private(uaddr)
|
||||||
|
} else {
|
||||||
|
FutexKey::new_shared(uaddr)?
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: support bitset variants properly
|
||||||
|
match cmd {
|
||||||
|
FUTEX_WAIT | FUTEX_WAIT_BITSET => {
|
||||||
|
// Obtain (or create) the wait-queue for this futex word.
|
||||||
|
let slot = get_or_create_queue(key);
|
||||||
|
|
||||||
|
// Return 0 on success.
|
||||||
|
FutexWait::new(uaddr, val, slot).await.map(|_| 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key(val as _, key)),
|
||||||
|
|
||||||
|
_ => Err(KernelError::NotSupported),
|
||||||
|
}
|
||||||
|
}
|
||||||
109
src/process/threading/futex/wait.rs
Normal file
109
src/process/threading/futex/wait.rs
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
use alloc::{boxed::Box, sync::Arc};
|
||||||
|
use core::{
|
||||||
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
use libkernel::{
|
||||||
|
error::{KernelError, Result},
|
||||||
|
memory::address::TUA,
|
||||||
|
sync::waker_set::WakerSet,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
memory::uaccess::{copy_from_user, try_copy_from_user},
|
||||||
|
sync::SpinLock,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum WaitState {
|
||||||
|
Init,
|
||||||
|
HandlingFault(Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>),
|
||||||
|
Waiting { token: u64 },
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FutexWait {
|
||||||
|
uaddr: TUA<u32>,
|
||||||
|
val: u32,
|
||||||
|
queue: Arc<SpinLock<WakerSet>>,
|
||||||
|
state: WaitState,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutexWait {
|
||||||
|
pub fn new(uaddr: TUA<u32>, val: u32, queue: Arc<SpinLock<WakerSet>>) -> Self {
|
||||||
|
Self {
|
||||||
|
uaddr,
|
||||||
|
val,
|
||||||
|
queue,
|
||||||
|
state: WaitState::Init,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for FutexWait {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let WaitState::Waiting { token } = &self.state {
|
||||||
|
self.queue.lock_save_irq().remove(*token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for FutexWait {
|
||||||
|
type Output = Result<()>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
// Safe because we are inside Pin and not moving out of Self
|
||||||
|
let this = unsafe { self.as_mut().get_unchecked_mut() };
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match &mut this.state {
|
||||||
|
WaitState::Init => {
|
||||||
|
let mut wait_queue = this.queue.lock_save_irq();
|
||||||
|
|
||||||
|
match try_copy_from_user(this.uaddr) {
|
||||||
|
Ok(val) => {
|
||||||
|
if val != this.val {
|
||||||
|
return Poll::Ready(Err(KernelError::TryAgain));
|
||||||
|
}
|
||||||
|
|
||||||
|
let token = wait_queue.register(cx.waker());
|
||||||
|
|
||||||
|
this.state = WaitState::Waiting { token };
|
||||||
|
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
drop(wait_queue);
|
||||||
|
|
||||||
|
let uaddr = this.uaddr;
|
||||||
|
let fault_handler = Box::pin(async move {
|
||||||
|
copy_from_user(uaddr).await?;
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
this.state = WaitState::HandlingFault(fault_handler);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WaitState::Waiting { token } => {
|
||||||
|
let wait_queue = this.queue.lock_save_irq();
|
||||||
|
|
||||||
|
if !wait_queue.contains_token(*token) {
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
} else {
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WaitState::HandlingFault(fut) => match fut.as_mut().poll(cx) {
|
||||||
|
Poll::Ready(Ok(_)) => {
|
||||||
|
this.state = WaitState::Init;
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
||||||
|
Poll::Pending => return Poll::Pending,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
43
src/process/threading/mod.rs
Normal file
43
src/process/threading/mod.rs
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
use core::ffi::c_long;
|
||||||
|
use core::mem::size_of;
|
||||||
|
|
||||||
|
use crate::sched::current_task;
|
||||||
|
use libkernel::{
|
||||||
|
error::{KernelError, Result},
|
||||||
|
memory::address::{TUA, VA},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub mod futex;
|
||||||
|
|
||||||
|
pub async fn sys_set_tid_address(_tidptr: VA) -> Result<usize> {
|
||||||
|
let tid = current_task().tid;
|
||||||
|
|
||||||
|
// TODO: implement threading and this system call properly. For now, we just
|
||||||
|
// return the PID as the thread id.
|
||||||
|
Ok(tid.value() as _)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
pub struct RobustList {
|
||||||
|
next: TUA<RobustList>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
pub struct RobustListHead {
|
||||||
|
list: RobustList,
|
||||||
|
futex_offset: c_long,
|
||||||
|
list_op_pending: RobustList,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn sys_set_robust_list(head: TUA<RobustListHead>, len: usize) -> Result<usize> {
|
||||||
|
if core::hint::unlikely(len != size_of::<RobustListHead>()) {
|
||||||
|
return Err(KernelError::InvalidValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
let task = current_task();
|
||||||
|
task.robust_list.lock_save_irq().replace(head);
|
||||||
|
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user