From 53cd2d2bf300d54812a5a1215426c26cf98978c2 Mon Sep 17 00:00:00 2001 From: Ashwin Naren Date: Sat, 7 Mar 2026 17:53:11 -0800 Subject: [PATCH] initial message boundary implementation --- src/net/unix.rs | 79 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 69 insertions(+), 10 deletions(-) diff --git a/src/net/unix.rs b/src/net/unix.rs index 40b4fd8..22813d0 100644 --- a/src/net/unix.rs +++ b/src/net/unix.rs @@ -1,12 +1,14 @@ use crate::fs::open_file::FileCtx; use crate::kernel::kpipe::KPipe; +use crate::memory::uaccess::copy_to_user_slice; use crate::net::sops::{RecvFlags, SendFlags}; -use crate::net::{SockAddr, SocketOps}; -use crate::sync::OnceLock; +use crate::net::{SockAddr, SockAddrUn, SocketOps}; use crate::sync::SpinLock; +use crate::sync::{Mutex, OnceLock}; use alloc::boxed::Box; -use alloc::collections::BTreeMap; +use alloc::collections::{BTreeMap, VecDeque}; use alloc::sync::Arc; +use alloc::vec; use alloc::vec::Vec; use async_trait::async_trait; use core::future::poll_fn; @@ -15,9 +17,64 @@ use core::task::Waker; use libkernel::error::{FsError, KernelError, Result}; use libkernel::memory::address::UA; +struct Message { + #[allow(unused)] + sender: SockAddrUn, + data: Vec, +} + +#[derive(Clone)] +enum Inbox { + Pipe(Arc), + Datagram(Arc>>), +} + +impl Inbox { + fn new(socket_type: SocketType) -> Self { + match socket_type { + SocketType::Stream | SocketType::SeqPacket => { + Inbox::Pipe(Arc::new(KPipe::new().expect("KPipe creation failed"))) + } + SocketType::Datagram => Inbox::Datagram(Arc::new(Mutex::new(VecDeque::new()))), + } + } + + async fn send(&self, origin: SockAddrUn, buf: UA, count: usize) -> Result { + match self { + Inbox::Pipe(pipe) => pipe.copy_from_user(buf, count).await, + Inbox::Datagram(queue) => { + let mut data = vec![0u8; count]; + copy_to_user_slice(&mut data, buf).await?; + let msg = Message { + sender: origin, + data, + }; + queue.lock().await.push_back(msg); + Ok(count) + } + } + } + + async fn recv(&self, buf: UA, count: usize) -> Result { + match self { + Inbox::Pipe(pipe) => pipe.copy_to_user(buf, count).await, + Inbox::Datagram(queue) => { + let mut q = queue.lock().await; + if let Some(msg) = q.pop_front() { + let n = msg.data.len().min(count); + copy_to_user_slice(&msg.data[..n], buf).await?; + Ok(n) + } else { + Ok(0) + } + } + } + } +} + /// Registry mapping Unix socket path bytes to endpoint inbox and listening state struct Endpoint { - inbox: Arc, + inbox: Inbox, listening: bool, backlog_max: usize, pending: Vec, @@ -32,6 +89,7 @@ fn endpoints() -> &'static SpinLock, Endpoint>> { UNIX_ENDPOINTS.get_or_init(|| SpinLock::new(BTreeMap::new())) } +#[derive(Copy, Clone)] enum SocketType { Stream, Datagram, @@ -41,10 +99,10 @@ enum SocketType { pub struct UnixSocket { socket_type: SocketType, /// Recv inbox - inbox: Arc, + inbox: Inbox, /// The peer endpoint's inbox - peer_inbox: SpinLock>>, - local_addr: SpinLock>, + peer_inbox: SpinLock>, + local_addr: SpinLock>, connected: SpinLock, listening: SpinLock, backlog: SpinLock, @@ -57,7 +115,7 @@ impl UnixSocket { fn new(socket_type: SocketType) -> Self { UnixSocket { socket_type, - inbox: Arc::new(KPipe::new().expect("KPipe::new for UnixSocket")), + inbox: Inbox::new(socket_type), peer_inbox: SpinLock::new(None), local_addr: SpinLock::new(None), connected: SpinLock::new(false), @@ -250,7 +308,7 @@ impl SocketOps for UnixSocket { if *self.rd_shutdown.lock_save_irq() { return Ok(0); } - self.inbox.copy_to_user(buf, count).await + self.inbox.recv(buf, count).await } async fn recvfrom( @@ -289,7 +347,8 @@ impl SocketOps for UnixSocket { let Some(peer) = self.peer_inbox.lock_save_irq().clone() else { return Err(KernelError::InvalidValue); }; - peer.copy_from_user(buf, count).await + let local_addr = { self.local_addr.lock_save_irq().unwrap() }; + peer.send(local_addr, buf, count).await } async fn sendto(