diff --git a/src/arch/arm64/exceptions/syscall.rs b/src/arch/arm64/exceptions/syscall.rs index 5dcd7d6..99b24fc 100644 --- a/src/arch/arm64/exceptions/syscall.rs +++ b/src/arch/arm64/exceptions/syscall.rs @@ -57,7 +57,7 @@ use crate::{ umask::sys_umask, wait::sys_wait4, }, - threading::{sys_futex, sys_set_robust_list, sys_set_tid_address}, + threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address}, }, sched::current_task, }; diff --git a/src/process/exit.rs b/src/process/exit.rs index 952ba0c..e28adf6 100644 --- a/src/process/exit.rs +++ b/src/process/exit.rs @@ -1,12 +1,13 @@ use super::{ TaskState, thread_group::{ProcessState, Tgid, ThreadGroup, signal::SigId, wait::ChildState}, + threading::futex::{self, key::FutexKey}, }; use crate::memory::uaccess::copy_to_user; -use crate::process::threading::futex_wake_addr; use crate::sched::current_task; use alloc::vec::Vec; use libkernel::error::Result; +use log::warn; use ringbuf::Arc; pub fn do_exit_group(exit_code: ChildState) { @@ -108,8 +109,11 @@ pub async fn sys_exit(exit_code: usize) -> Result { if let Some(ptr) = ptr { copy_to_user(ptr, 0u32).await?; - // Wake any thread waiting on this futex. - futex_wake_addr(ptr, 1); + if let Ok(key) = FutexKey::new_shared(ptr) { + futex::wake_key(1, key); + } else { + warn!("Failed to get futex wake key on sys_exit"); + } } let process = Arc::clone(&task.process); diff --git a/src/process/threading.rs b/src/process/threading.rs deleted file mode 100644 index 3dea397..0000000 --- a/src/process/threading.rs +++ /dev/null @@ -1,159 +0,0 @@ -use core::ffi::c_long; -use core::mem::size_of; - -use crate::memory::uaccess::copy_from_user; -use crate::sched::current_task; -use crate::sync::{OnceLock, SpinLock}; -use alloc::{collections::BTreeMap, sync::Arc}; -use libkernel::sync::waker_set::{WakerSet, wait_until}; -use libkernel::{ - error::{KernelError, Result}, - memory::address::{TUA, VA}, -}; - -/// A per-futex wait queue holding wakers for blocked tasks. -struct FutexWaitQueue { - wakers: WakerSet, - /// Number of pending wake-ups for waiters on this futex. - wakeups: usize, -} - -impl FutexWaitQueue { - fn new() -> Self { - Self { - wakers: WakerSet::new(), - wakeups: 0, - } - } -} - -/// Global futex table mapping a user address to its wait queue. -// TODO: statically allocate an array of SpinLock>. -// Then hash into that table to find it's bucket -// TODO: Should be physical address, not user address -#[allow(clippy::type_complexity)] -static FUTEX_TABLE: OnceLock, Arc>>>> = - OnceLock::new(); - -pub async fn sys_set_tid_address(_tidptr: VA) -> Result { - let tid = current_task().tid; - - // TODO: implement threading and this system call properly. For now, we just - // return the PID as the thread id. - Ok(tid.value() as _) -} - -#[repr(C)] -#[derive(Clone, Copy, Debug)] -pub struct RobustList { - next: TUA, -} - -#[repr(C)] -#[derive(Clone, Copy, Debug)] -pub struct RobustListHead { - list: RobustList, - futex_offset: c_long, - list_op_pending: RobustList, -} - -pub async fn sys_set_robust_list(head: TUA, len: usize) -> Result { - if core::hint::unlikely(len != size_of::()) { - return Err(KernelError::InvalidValue); - } - - let task = current_task(); - task.robust_list.lock_save_irq().replace(head); - - Ok(0) -} - -const FUTEX_WAIT: i32 = 0; -const FUTEX_WAKE: i32 = 1; -const FUTEX_WAIT_BITSET: i32 = 9; -const FUTEX_WAKE_BITSET: i32 = 10; -const FUTEX_PRIVATE_FLAG: i32 = 128; - -/// Wake up to `nr` waiters sleeping on the futex word located at `uaddr`. -/// Returns the number of tasks actually woken. -pub fn futex_wake_addr(uaddr: TUA, nr: usize) -> usize { - let mut woke = 0; - - if let Some(table) = FUTEX_TABLE.get() - && let Some(waitq_arc) = table.lock_save_irq().get(&uaddr).cloned() - { - let mut waitq = waitq_arc.lock_save_irq(); - for _ in 0..nr { - waitq.wakeups = waitq.wakeups.saturating_add(1); - waitq.wakers.wake_one(); - woke += 1; - } - } - - woke -} - -pub async fn sys_futex( - uaddr: TUA, - op: i32, - val: u32, - _timeout: VA, - _uaddr2: TUA, - _val3: u32, -) -> Result { - // Strip PRIVATE flag if present - let cmd = op & !FUTEX_PRIVATE_FLAG; - - // TODO: support bitset variants properly - - match cmd { - FUTEX_WAIT | FUTEX_WAIT_BITSET => { - // Ensure the wait-queue exists *before* we begin checking the - // futex word so that a racing FUTEX_WAKE cannot miss us. This - // avoids the classic lost-wake-up race where a waker runs between - // our value check and queue insertion. - // - // After publishing the queue we perform a second sanity check on - // the user word, mirroring Linux’s “double read” strategy. - - // Obtain (or create) the wait-queue for this futex word. - let table = FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new())); - let waitq_arc = { - let mut guard = table.lock_save_irq(); - guard - .entry(uaddr) - .or_insert_with(|| Arc::new(SpinLock::new(FutexWaitQueue::new()))) - .clone() - }; - - let current: u32 = copy_from_user(uaddr).await?; - if current != val { - return Err(KernelError::TryAgain); - } - - // TODO: When we have try_ variants of locking primitives, use them here - wait_until( - waitq_arc.clone(), - |state| &mut state.wakers, - |state| { - if state.wakeups > 0 { - state.wakeups -= 1; - Some(()) - } else { - None - } - }, - ) - .await; - - Ok(0) - } - - FUTEX_WAKE | FUTEX_WAKE_BITSET => { - let nr_wake = val as usize; - Ok(futex_wake_addr(uaddr, nr_wake)) - } - - _ => Err(KernelError::NotSupported), - } -} diff --git a/src/process/threading/futex/key.rs b/src/process/threading/futex/key.rs new file mode 100644 index 0000000..cdcc08c --- /dev/null +++ b/src/process/threading/futex/key.rs @@ -0,0 +1,36 @@ +use crate::sched::current_task; +use libkernel::UserAddressSpace; +use libkernel::error::{KernelError, Result}; +use libkernel::memory::address::{TUA, VA}; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum FutexKey { + Private { pid: u32, addr: usize }, + Shared { frame: usize, offset: usize }, +} + +impl FutexKey { + pub fn new_private(uaddr: TUA) -> Self { + let pid = current_task().process.tgid.value(); + + Self::Private { + pid, + addr: uaddr.value(), + } + } + + pub fn new_shared(uaddr: TUA) -> Result { + let pg_info = current_task() + .vm + .lock_save_irq() + .mm_mut() + .address_space_mut() + .translate(VA::from_value(uaddr.value())) + .ok_or(KernelError::Fault)?; + + Ok(Self::Shared { + frame: pg_info.pfn.value(), + offset: uaddr.page_offset(), + }) + } +} diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs new file mode 100644 index 0000000..4febfa6 --- /dev/null +++ b/src/process/threading/futex/mod.rs @@ -0,0 +1,90 @@ +use crate::sync::{OnceLock, SpinLock}; +use alloc::{collections::btree_map::BTreeMap, sync::Arc}; +use key::FutexKey; +use libkernel::{ + error::{KernelError, Result}, + memory::address::{TUA, VA}, + sync::waker_set::WakerSet, +}; +use wait::FutexWait; + +pub mod key; +mod wait; + +const FUTEX_WAIT: i32 = 0; +const FUTEX_WAKE: i32 = 1; +const FUTEX_WAIT_BITSET: i32 = 9; +const FUTEX_WAKE_BITSET: i32 = 10; +const FUTEX_PRIVATE_FLAG: i32 = 128; + +type FutexTable = BTreeMap>>; + +/// Global futex table mapping a futex key to its wait queue. +#[allow(clippy::type_complexity)] +static FUTEX_TABLE: OnceLock> = OnceLock::new(); + +fn futex_table() -> &'static SpinLock { + FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new())) +} + +fn get_or_create_queue(key: FutexKey) -> Arc> { + let table = futex_table(); + + table + .lock_save_irq() + .entry(key) + .or_insert_with(|| Arc::new(SpinLock::new(WakerSet::new()))) + .clone() +} + +pub fn wake_key(nr_wake: usize, key: FutexKey) -> usize { + let mut woke = 0; + + let table = futex_table(); + + if let Some(waitq_arc) = table.lock_save_irq().get(&key).cloned() { + let mut waitq = waitq_arc.lock_save_irq(); + for _ in 0..nr_wake { + if waitq.wake_one() { + woke += 1; + } else { + break; + } + } + } + + woke +} + +pub async fn sys_futex( + uaddr: TUA, + op: i32, + val: u32, + _timeout: VA, + _uaddr2: TUA, + _val3: u32, +) -> Result { + // Strip PRIVATE flag if present + let cmd = op & !FUTEX_PRIVATE_FLAG; + + let key = if op & FUTEX_PRIVATE_FLAG != 0 { + FutexKey::new_private(uaddr) + } else { + FutexKey::new_shared(uaddr)? + }; + + // TODO: support bitset variants properly + match cmd { + FUTEX_WAIT | FUTEX_WAIT_BITSET => { + // Obtain (or create) the wait-queue for this futex word. + let slot = get_or_create_queue(key); + + // Return 0 on success. + FutexWait::new(uaddr, val, slot).await.map(|_| 0) + } + + FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key(val as _, key)), + + _ => Err(KernelError::NotSupported), + } +} diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs new file mode 100644 index 0000000..c97115e --- /dev/null +++ b/src/process/threading/futex/wait.rs @@ -0,0 +1,109 @@ +use alloc::{boxed::Box, sync::Arc}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use libkernel::{ + error::{KernelError, Result}, + memory::address::TUA, + sync::waker_set::WakerSet, +}; + +use crate::{ + memory::uaccess::{copy_from_user, try_copy_from_user}, + sync::SpinLock, +}; + +enum WaitState { + Init, + HandlingFault(Pin> + Send + 'static>>), + Waiting { token: u64 }, +} + +pub struct FutexWait { + uaddr: TUA, + val: u32, + queue: Arc>, + state: WaitState, +} + +impl FutexWait { + pub fn new(uaddr: TUA, val: u32, queue: Arc>) -> Self { + Self { + uaddr, + val, + queue, + state: WaitState::Init, + } + } +} + +impl Drop for FutexWait { + fn drop(&mut self) { + if let WaitState::Waiting { token } = &self.state { + self.queue.lock_save_irq().remove(*token); + } + } +} + +impl Future for FutexWait { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safe because we are inside Pin and not moving out of Self + let this = unsafe { self.as_mut().get_unchecked_mut() }; + + loop { + match &mut this.state { + WaitState::Init => { + let mut wait_queue = this.queue.lock_save_irq(); + + match try_copy_from_user(this.uaddr) { + Ok(val) => { + if val != this.val { + return Poll::Ready(Err(KernelError::TryAgain)); + } + + let token = wait_queue.register(cx.waker()); + + this.state = WaitState::Waiting { token }; + + return Poll::Pending; + } + Err(_) => { + drop(wait_queue); + + let uaddr = this.uaddr; + let fault_handler = Box::pin(async move { + copy_from_user(uaddr).await?; + Ok(()) + }); + + this.state = WaitState::HandlingFault(fault_handler); + continue; + } + } + } + + WaitState::Waiting { token } => { + let wait_queue = this.queue.lock_save_irq(); + + if !wait_queue.contains_token(*token) { + return Poll::Ready(Ok(())); + } else { + return Poll::Pending; + } + } + + WaitState::HandlingFault(fut) => match fut.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => { + this.state = WaitState::Init; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + }, + } + } + } +} diff --git a/src/process/threading/mod.rs b/src/process/threading/mod.rs new file mode 100644 index 0000000..481a3f6 --- /dev/null +++ b/src/process/threading/mod.rs @@ -0,0 +1,43 @@ +use core::ffi::c_long; +use core::mem::size_of; + +use crate::sched::current_task; +use libkernel::{ + error::{KernelError, Result}, + memory::address::{TUA, VA}, +}; + +pub mod futex; + +pub async fn sys_set_tid_address(_tidptr: VA) -> Result { + let tid = current_task().tid; + + // TODO: implement threading and this system call properly. For now, we just + // return the PID as the thread id. + Ok(tid.value() as _) +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct RobustList { + next: TUA, +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct RobustListHead { + list: RobustList, + futex_offset: c_long, + list_op_pending: RobustList, +} + +pub async fn sys_set_robust_list(head: TUA, len: usize) -> Result { + if core::hint::unlikely(len != size_of::()) { + return Err(KernelError::InvalidValue); + } + + let task = current_task(); + task.robust_list.lock_save_irq().replace(head); + + Ok(0) +}