initial message boundary implementation

This commit is contained in:
Ashwin Naren
2026-03-07 17:53:11 -08:00
parent f49a3f9b65
commit 53cd2d2bf3

View File

@@ -1,12 +1,14 @@
use crate::fs::open_file::FileCtx;
use crate::kernel::kpipe::KPipe;
use crate::memory::uaccess::copy_to_user_slice;
use crate::net::sops::{RecvFlags, SendFlags};
use crate::net::{SockAddr, SocketOps};
use crate::sync::OnceLock;
use crate::net::{SockAddr, SockAddrUn, SocketOps};
use crate::sync::SpinLock;
use crate::sync::{Mutex, OnceLock};
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::collections::{BTreeMap, VecDeque};
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use async_trait::async_trait;
use core::future::poll_fn;
@@ -15,9 +17,64 @@ use core::task::Waker;
use libkernel::error::{FsError, KernelError, Result};
use libkernel::memory::address::UA;
struct Message {
#[allow(unused)]
sender: SockAddrUn,
data: Vec<u8>,
}
#[derive(Clone)]
enum Inbox {
Pipe(Arc<KPipe>),
Datagram(Arc<Mutex<VecDeque<Message>>>),
}
impl Inbox {
fn new(socket_type: SocketType) -> Self {
match socket_type {
SocketType::Stream | SocketType::SeqPacket => {
Inbox::Pipe(Arc::new(KPipe::new().expect("KPipe creation failed")))
}
SocketType::Datagram => Inbox::Datagram(Arc::new(Mutex::new(VecDeque::new()))),
}
}
async fn send(&self, origin: SockAddrUn, buf: UA, count: usize) -> Result<usize> {
match self {
Inbox::Pipe(pipe) => pipe.copy_from_user(buf, count).await,
Inbox::Datagram(queue) => {
let mut data = vec![0u8; count];
copy_to_user_slice(&mut data, buf).await?;
let msg = Message {
sender: origin,
data,
};
queue.lock().await.push_back(msg);
Ok(count)
}
}
}
async fn recv(&self, buf: UA, count: usize) -> Result<usize> {
match self {
Inbox::Pipe(pipe) => pipe.copy_to_user(buf, count).await,
Inbox::Datagram(queue) => {
let mut q = queue.lock().await;
if let Some(msg) = q.pop_front() {
let n = msg.data.len().min(count);
copy_to_user_slice(&msg.data[..n], buf).await?;
Ok(n)
} else {
Ok(0)
}
}
}
}
}
/// Registry mapping Unix socket path bytes to endpoint inbox and listening state
struct Endpoint {
inbox: Arc<KPipe>,
inbox: Inbox,
listening: bool,
backlog_max: usize,
pending: Vec<UnixSocket>,
@@ -32,6 +89,7 @@ fn endpoints() -> &'static SpinLock<BTreeMap<Vec<u8>, Endpoint>> {
UNIX_ENDPOINTS.get_or_init(|| SpinLock::new(BTreeMap::new()))
}
#[derive(Copy, Clone)]
enum SocketType {
Stream,
Datagram,
@@ -41,10 +99,10 @@ enum SocketType {
pub struct UnixSocket {
socket_type: SocketType,
/// Recv inbox
inbox: Arc<KPipe>,
inbox: Inbox,
/// The peer endpoint's inbox
peer_inbox: SpinLock<Option<Arc<KPipe>>>,
local_addr: SpinLock<Option<crate::net::SockAddrUn>>,
peer_inbox: SpinLock<Option<Inbox>>,
local_addr: SpinLock<Option<SockAddrUn>>,
connected: SpinLock<bool>,
listening: SpinLock<bool>,
backlog: SpinLock<usize>,
@@ -57,7 +115,7 @@ impl UnixSocket {
fn new(socket_type: SocketType) -> Self {
UnixSocket {
socket_type,
inbox: Arc::new(KPipe::new().expect("KPipe::new for UnixSocket")),
inbox: Inbox::new(socket_type),
peer_inbox: SpinLock::new(None),
local_addr: SpinLock::new(None),
connected: SpinLock::new(false),
@@ -250,7 +308,7 @@ impl SocketOps for UnixSocket {
if *self.rd_shutdown.lock_save_irq() {
return Ok(0);
}
self.inbox.copy_to_user(buf, count).await
self.inbox.recv(buf, count).await
}
async fn recvfrom(
@@ -289,7 +347,8 @@ impl SocketOps for UnixSocket {
let Some(peer) = self.peer_inbox.lock_save_irq().clone() else {
return Err(KernelError::InvalidValue);
};
peer.copy_from_user(buf, count).await
let local_addr = { self.local_addr.lock_save_irq().unwrap() };
peer.send(local_addr, buf, count).await
}
async fn sendto(