diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index 8bffca74c..9d9e03771 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -67,7 +67,7 @@ impl P2PManager { > { let (tx, rx) = bounded(25); let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx); - let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?; + let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone()).map_err(|e| e.to_string())?; let libraries_hook_id = libraries_hook(p2p.clone(), libraries); let this = Arc::new(Self { p2p: p2p.clone(), diff --git a/crates/p2p/src/quic/transport.rs b/crates/p2p/src/quic/transport.rs index 0d493f66b..bbdaf2891 100644 --- a/crates/p2p/src/quic/transport.rs +++ b/crates/p2p/src/quic/transport.rs @@ -16,6 +16,7 @@ use libp2p::{ yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, }; use serde::{Deserialize, Serialize}; +use thiserror::Error; use tokio::{ net::TcpListener, sync::{mpsc, oneshot}, @@ -78,6 +79,20 @@ struct MyBehaviour { dcutr: dcutr::Behaviour, } +#[derive(Debug, Error)] +pub enum QuicTransportError { + #[error("Failed to modify the SwarmBuilder: {0}")] + SwarmBuilderCreation(String), + #[error("Internal response channel closed: {0}")] + SendChannelClosed(String), + #[error("Internal response channel closed: {0}")] + ReceiveChannelClosed(#[from] oneshot::error::RecvError), + #[error("Failed internal event: {0}")] + InternalEvent(String), + #[error("Failed to create the Listener: {0}")] + ListenerSetup(std::io::Error), +} + /// Transport using Quic to establish a connection between peers. /// This uses `libp2p` internally. #[derive(Debug)] @@ -91,8 +106,7 @@ pub struct QuicTransport { impl QuicTransport { /// Spawn the `QuicTransport` and register it with the P2P system. /// Be aware spawning this does nothing unless you call `Self::set_ipv4_enabled`/`Self::set_ipv6_enabled` to enable the listeners. - // TODO: Error type here - pub fn spawn(p2p: Arc) -> Result<(Self, Libp2pPeerId), String> { + pub fn spawn(p2p: Arc) -> Result<(Self, Libp2pPeerId), QuicTransportError> { let keypair = identity_to_libp2p_keypair(p2p.identity()); let libp2p_peer_id = Libp2pPeerId(keypair.public().to_peer_id()); @@ -108,14 +122,14 @@ impl QuicTransport { .with_tokio() .with_quic() .with_relay_client(noise::Config::new, yamux::Config::default) - .map_err(|err| err.to_string())? + .map_err(|err| QuicTransportError::SwarmBuilderCreation(err.to_string()))? .with_behaviour(|keypair, relay_behaviour| MyBehaviour { stream: libp2p_stream::Behaviour::new(), relay: relay_behaviour, autonat: autonat::Behaviour::new(keypair.public().to_peer_id(), Default::default()), dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()), }) - .map_err(|err| err.to_string())? + .map_err(|err| QuicTransportError::SwarmBuilderCreation(err.to_string()))? .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); @@ -163,7 +177,7 @@ impl QuicTransport { } // `None` on the port means disabled. Use `0` for random port. - pub async fn set_ipv4_enabled(&self, port: Option) -> Result<(), String> { + pub async fn set_ipv4_enabled(&self, port: Option) -> Result<(), QuicTransportError> { self.setup_listener( port.map(|p| SocketAddr::from((Ipv4Addr::UNSPECIFIED, p))), true, @@ -171,7 +185,7 @@ impl QuicTransport { .await } - pub async fn set_ipv6_enabled(&self, port: Option) -> Result<(), String> { + pub async fn set_ipv6_enabled(&self, port: Option) -> Result<(), QuicTransportError> { self.setup_listener( port.map(|p| SocketAddr::from((Ipv6Addr::UNSPECIFIED, p))), false, @@ -179,18 +193,20 @@ impl QuicTransport { .await } - // TODO: Proper error type - async fn setup_listener(&self, addr: Option, ipv4: bool) -> Result<(), String> { + async fn setup_listener( + &self, + addr: Option, + ipv4: bool, + ) -> Result<(), QuicTransportError> { let (tx, rx) = oneshot::channel(); let event = if let Some(mut addr) = addr { if addr.port() == 0 { - #[allow(clippy::unwrap_used)] // TODO: Error handling addr.set_port( TcpListener::bind(addr) .await - .unwrap() + .map_err(|e| QuicTransportError::ListenerSetup(e))? .local_addr() - .unwrap() + .map_err(|e| QuicTransportError::ListenerSetup(e))? .port(), ); } @@ -209,12 +225,13 @@ impl QuicTransport { } }; - let Ok(_) = self.internal_tx.send(event) else { - return Err("internal channel closed".to_string()); - }; + self.internal_tx + .send(event) + .map_err(|e| QuicTransportError::SendChannelClosed(e.to_string()))?; + rx.await - .map_err(|_| "internal response channel closed".to_string()) - .and_then(|r| r) + .map_err(|e| QuicTransportError::ReceiveChannelClosed(e)) + .and_then(|r| r.map_err(|e| QuicTransportError::InternalEvent(e))) } pub async fn shutdown(self) {