From 0c25239c535b9e109d77ee9caaffb4b3af8712e4 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 13 Mar 2023 16:58:35 +0800 Subject: [PATCH] Typesafe stream handling (#603) * typesafe stream handling Now it's impossible for your to respond to a broadcast stream. Previously this would just fail due to the TCP connection having been closed by the sender. * remove connected_peers state from `SpaceTime` * use `OutEvent` instead of `tokio::spawn` for `SpaceTime` events * Polling for events > tokio::mpsc * extend max length of sync messages to ~4GB + broadcast done message * Fix "actOS" operating system decode error --- core/src/p2p/p2p_manager.rs | 90 +++++++++---- core/src/p2p/peer_metadata.rs | 12 +- core/src/p2p/protocol.rs | 7 +- crates/p2p/examples/basic.rs | 31 +++-- crates/p2p/src/manager.rs | 10 +- crates/p2p/src/manager_stream.rs | 140 +++++++++++++-------- crates/p2p/src/mdns.rs | 22 ++-- crates/p2p/src/peer.rs | 7 +- crates/p2p/src/spaceblock/mod.rs | 4 +- crates/p2p/src/spacetime/behaviour.rs | 139 +++++--------------- crates/p2p/src/spacetime/connection.rs | 13 +- crates/p2p/src/spacetime/message.rs | 11 -- crates/p2p/src/spacetime/mod.rs | 2 - crates/p2p/src/spacetime/proto_inbound.rs | 24 ++-- crates/p2p/src/spacetime/proto_outbound.rs | 51 +++++--- crates/p2p/src/spacetime/stream.rs | 125 +++++++++++++++--- 16 files changed, 389 insertions(+), 299 deletions(-) delete mode 100644 crates/p2p/src/spacetime/message.rs diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index c93046c35..8d7a9ed28 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc, time::Instant}; +use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant}; use rspc::Type; use sd_p2p::{ @@ -35,6 +35,8 @@ pub enum P2PEvent { pub struct P2PManager { events: broadcast::Sender, + // We hold this only so we don't get errors sending when no frontend's are listening + _events_rx: broadcast::Receiver, pub manager: Arc>, } @@ -69,7 +71,7 @@ impl P2PManager { manager.listen_addrs().await ); - let (events_tx, _) = broadcast::channel(100); + let (events_tx, events_rx) = broadcast::channel(100); let events = events_tx.clone(); tokio::spawn(async move { while let Some(event) = stream.next().await { @@ -116,9 +118,11 @@ impl P2PManager { // TODO: Save to the filesystem } Header::Sync(library_id) => { - let buf_len = event.stream.read_u8().await.unwrap(); + let mut len = [0; 4]; + event.stream.read_exact(&mut len).await.unwrap(); + let len = u32::from_be_bytes(len); - let mut buf = vec![0; buf_len as usize]; // TODO: Designed for easily being able to be DOS the current Node + let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to be DOS the current Node event.stream.read_exact(&mut buf).await.unwrap(); let mut buf: &[u8] = &buf; @@ -144,9 +148,13 @@ impl P2PManager { // https://docs.rs/ctrlc/latest/ctrlc/ // https://docs.rs/system_shutdown/latest/system_shutdown/ - let this = Arc::new(Self { events, manager }); + let this = Arc::new(Self { + events, + _events_rx: events_rx, + manager, + }); - // TODO: Probs remove this + // TODO: Probs remove this once connection timeout/keepalive are working correctly tokio::spawn({ let this = this.clone(); async move { @@ -157,26 +165,51 @@ impl P2PManager { } }); - // TODO: Probs remove this - tokio::spawn({ - let this = this.clone(); - async move { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let mut connected = this - .manager - .get_connected_peers() - .await - .unwrap() - .into_iter(); - if let Some(peer_id) = connected.next() { - info!("Starting Spacedrop to peer '{}'", peer_id); - this.big_bad_spacedrop(peer_id, PathBuf::from("./demo.txt")) + // TODO(@Oscar): Remove this in the future once i'm done using it for testing + if std::env::var("SPACEDROP_DEMO").is_ok() { + // tokio::spawn({ + // let this = this.clone(); + // async move { + // tokio::time::sleep(std::time::Duration::from_secs(5)).await; + // let mut connected = this + // .manager + // .get_connected_peers() + // .await + // .unwrap() + // .into_iter(); + // if let Some(peer_id) = connected.next() { + // info!("Starting Spacedrop to peer '{}'", peer_id); + // this.big_bad_spacedrop(peer_id, PathBuf::from("./demo.txt")) + // .await; + // } else { + // info!("No clients found so skipping Spacedrop demo!"); + // } + // } + // }); + + tokio::spawn({ + let this = this.clone(); + async move { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let mut connected = this + .manager + .get_connected_peers() + .await + .unwrap() + .into_iter(); + if let Some(peer_id) = connected.next() { + info!("Starting Spacedrop to peer '{}'", peer_id); + this.broadcast_sync_events( + Uuid::from_str("e4372586-d028-48f8-8be6-b4ff781a7dc2").unwrap(), + vec![], + ) .await; - } else { - info!("No clients found so skipping Spacedrop demo!"); + } else { + info!("No clients found so skipping Spacedrop demo!"); + } } - } - }); + }); + } this } @@ -189,10 +222,15 @@ impl P2PManager { pub async fn broadcast_sync_events(&self, library_id: Uuid, event: Vec) { let mut head_buf = Header::Sync(library_id).to_bytes(); let mut buf = rmp_serde::to_vec_named(&event).unwrap(); // TODO: Error handling - head_buf.push(buf.len() as u8); // TODO: This is going to overflow quickly so deal with it properly! + + let len: u32 = buf.len().try_into().unwrap(); // Max Sync payload is like 4GB + let mut len_buf = len.to_le_bytes(); + debug_assert_eq!(len_buf.len(), 4); + + head_buf.extend_from_slice(&mut len_buf); head_buf.append(&mut buf); - self.manager.broadcast(buf).await; + self.manager.broadcast(head_buf).await; } pub async fn ping(&self) { diff --git a/core/src/p2p/peer_metadata.rs b/core/src/p2p/peer_metadata.rs index 9a052cf66..4995c9c42 100644 --- a/core/src/p2p/peer_metadata.rs +++ b/core/src/p2p/peer_metadata.rs @@ -103,12 +103,12 @@ impl FromStr for OperatingSystem { fn from_str(s: &str) -> Result { let mut chars = s.chars(); match chars.next() { - Some('w') => Ok(OperatingSystem::Windows), - Some('l') => Ok(OperatingSystem::Linux), - Some('m') => Ok(OperatingSystem::MacOS), - Some('i') => Ok(OperatingSystem::Ios), - Some('a') => Ok(OperatingSystem::Android), - _ => Ok(OperatingSystem::Other(chars.as_str().to_string())), + Some('W') => Ok(OperatingSystem::Windows), + Some('L') => Ok(OperatingSystem::Linux), + Some('M') => Ok(OperatingSystem::MacOS), + Some('I') => Ok(OperatingSystem::Ios), + Some('A') => Ok(OperatingSystem::Android), + _ => Ok(OperatingSystem::Other(s.to_owned())), } } } diff --git a/core/src/p2p/protocol.rs b/core/src/p2p/protocol.rs index 24d75d2a4..a1a2229e6 100644 --- a/core/src/p2p/protocol.rs +++ b/core/src/p2p/protocol.rs @@ -16,7 +16,12 @@ impl Header { let discriminator = stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling match discriminator { - 0 => Ok(Self::Spacedrop(TransferRequest::from_stream(stream).await?)), + 0 => match stream { + SpaceTimeStream::Unicast(stream) => { + Ok(Self::Spacedrop(TransferRequest::from_stream(stream).await?)) + } + _ => todo!(), + }, 1 => Ok(Self::Ping), 2 => { let mut uuid = [0u8; 16]; diff --git a/crates/p2p/examples/basic.rs b/crates/p2p/examples/basic.rs index b9ce2aaab..a871e660c 100644 --- a/crates/p2p/examples/basic.rs +++ b/crates/p2p/examples/basic.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, env, time::Duration}; -use sd_p2p::{Event, Keypair, Manager, Metadata}; +use sd_p2p::{spacetime::SpaceTimeStream, Event, Keypair, Manager, Metadata}; use tokio::{io::AsyncReadExt, time::sleep}; use tracing::{debug, error, info}; @@ -73,17 +73,24 @@ async fn main() { debug!("Peer '{}' established stream", event.peer_id); tokio::spawn(async move { - let mut stream = event.stream; - - let mut buf = [0; 100]; - let n = stream.read(&mut buf).await.unwrap(); - println!("GOT: {:?}", std::str::from_utf8(&buf[..n]).unwrap()); - - // TODO: Allow differentiating whether this is Spacedrop or Sync message here - - // TODO - // event.stream.write_all(response.as_bytes()).await.unwrap(); - // event.stream.flush().await.unwrap(); + match event.stream { + SpaceTimeStream::Broadcast(mut stream) => { + let mut buf = [0; 100]; + let n = stream.read(&mut buf).await.unwrap(); + println!( + "GOT BROADCAST: {:?}", + std::str::from_utf8(&buf[..n]).unwrap() + ); + } + SpaceTimeStream::Unicast(mut stream) => { + let mut buf = [0; 100]; + let n = stream.read(&mut buf).await.unwrap(); + println!( + "GOT UNICAST: {:?}", + std::str::from_utf8(&buf[..n]).unwrap() + ); + } + } }); } _ => debug!("event: {:?}", event), diff --git a/crates/p2p/src/manager.rs b/crates/p2p/src/manager.rs index e082cdf9c..430889af8 100644 --- a/crates/p2p/src/manager.rs +++ b/crates/p2p/src/manager.rs @@ -10,7 +10,7 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use tracing::{debug, error, warn}; use crate::{ - spacetime::{SpaceTime, SpaceTimeStream}, + spacetime::{SpaceTime, UnicastStream}, AsyncFn, DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, Mdns, Metadata, PeerId, }; @@ -113,14 +113,16 @@ impl Manager { }) } - pub async fn stream(&self, peer_id: PeerId) -> Result { + pub async fn stream(&self, peer_id: PeerId) -> Result { // TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that. let (tx, rx) = oneshot::channel(); self.emit(ManagerStreamAction::StartStream(peer_id, tx)) .await; - rx.await.map_err(|_| { + let mut stream = rx.await.map_err(|_| { warn!("failed to queue establishing stream to peer '{peer_id}'!"); - }) + })?; + stream.write_discriminator().await.unwrap(); // TODO: Error handling + Ok(stream) } pub async fn broadcast(&self, data: Vec) { diff --git a/crates/p2p/src/manager_stream.rs b/crates/p2p/src/manager_stream.rs index 77311a8ac..eadeaae2d 100644 --- a/crates/p2p/src/manager_stream.rs +++ b/crates/p2p/src/manager_stream.rs @@ -1,4 +1,5 @@ use std::{ + fmt, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, @@ -19,7 +20,7 @@ use tracing::{debug, error, warn}; use crate::{ quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr, - spacetime::{OutboundRequest, SpaceTime, SpaceTimeStream}, + spacetime::{OutboundRequest, SpaceTime, UnicastStream}, AsyncFn, Event, Manager, Mdns, Metadata, PeerId, }; @@ -35,11 +36,17 @@ pub enum ManagerStreamAction { addresses: Vec, }, /// TODO - StartStream(PeerId, oneshot::Sender), + StartStream(PeerId, oneshot::Sender), /// TODO BroadcastData(Vec), } +impl fmt::Debug for ManagerStreamAction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("ManagerStreamAction") + } +} + impl From> for ManagerStreamAction { fn from(event: Event) -> Self { Self::Event(event) @@ -69,58 +76,25 @@ where // We loop polling internal services until an event comes in that needs to be sent to the parent application. loop { tokio::select! { - _ = self.mdns.poll() => {}, + event = self.mdns.poll() => { + if let Some(event) = event { + return Some(event); + } + continue; + }, event = self.event_stream_rx.recv() => { // If the sender has shut down we return `None` to also shut down too. - match event? { - ManagerStreamAction::Event(event) => return Some(event), - ManagerStreamAction::GetConnectedPeers(response) => { - response.send(self.swarm.behaviour().connected_peers.values().map(|p| p.peer_id).collect::>()).map_err(|_| error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")).ok(); - }, - ManagerStreamAction::Dial { peer_id, addresses } => { - match self.swarm.dial( - DialOpts::peer_id(peer_id.0) - .condition(PeerCondition::Disconnected) - .addresses( - addresses - .iter() - .map(socketaddr_to_quic_multiaddr) - .collect(), - ) - .extend_addresses_through_behaviour() - .build(), - ) { - Ok(_) => {} - Err(err) => warn!( - "error dialing peer '{}' with addresses '{:?}': {}", - peer_id, addresses, err - ), - } - } - ManagerStreamAction::StartStream(peer_id, rx) => { - self.swarm.behaviour_mut().pending_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.0, - handler: NotifyHandler::Any, - event: OutboundRequest::Stream(rx), - }); - } - ManagerStreamAction::BroadcastData(data) => { - let swarm = self.swarm.behaviour_mut(); - for peer in swarm.connected_peers.values() { - swarm.pending_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.peer_id.0, - handler: NotifyHandler::Any, - event: OutboundRequest::Data(data.clone()), - }); - } - } + if let Some(event) = self.handle_manager_stream_action(event?) { + return Some(event); } } event = self.swarm.select_next_some() => { match event { - SwarmEvent::Behaviour(()) => {}, + SwarmEvent::Behaviour(event) => { + if let Some(event) = self.handle_manager_stream_action(event) { + return Some(event); + } + }, SwarmEvent::ConnectionEstablished { .. } => {}, SwarmEvent::ConnectionClosed { .. } => {}, SwarmEvent::IncomingConnection { local_addr, .. } => debug!("incoming connection from '{}'", local_addr), @@ -136,8 +110,8 @@ where self.is_advertisement_queued.store(true, Ordering::Relaxed); self.mdns.advertise(); } - self.manager.emit(Event::AddListenAddr(addr).into()).await; self.mdns.advertise(); + return Some(Event::AddListenAddr(addr)); }, Err(err) => { warn!("error passing listen address: {}", err); @@ -174,12 +148,73 @@ where } } + fn handle_manager_stream_action( + &mut self, + event: ManagerStreamAction, + ) -> Option> { + match event { + ManagerStreamAction::Event(event) => return Some(event), + ManagerStreamAction::GetConnectedPeers(response) => { + response + .send( + self.swarm + .connected_peers() + .map(|v| PeerId(*v)) + .collect::>(), + ) + .map_err(|_| { + error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!") + }) + .ok(); + } + ManagerStreamAction::Dial { peer_id, addresses } => { + match self.swarm.dial( + DialOpts::peer_id(peer_id.0) + .condition(PeerCondition::Disconnected) + .addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect()) + .build(), + ) { + Ok(_) => {} + Err(err) => warn!( + "error dialing peer '{}' with addresses '{:?}': {}", + peer_id, addresses, err + ), + } + } + ManagerStreamAction::StartStream(peer_id, rx) => { + self.swarm.behaviour_mut().pending_events.push_back( + NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.0, + handler: NotifyHandler::Any, + event: OutboundRequest::Unicast(rx), + }, + ); + } + ManagerStreamAction::BroadcastData(data) => { + let connected_peers = self.swarm.connected_peers().map(|v| *v).collect::>(); + let behaviour = self.swarm.behaviour_mut(); + for peer_id in connected_peers { + behaviour + .pending_events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id, + handler: NotifyHandler::Any, + event: OutboundRequest::Broadcast(data.clone()), + }); + } + } + } + + return None; + } + + // TODO: Move into mdns async fn unregister_addr( manager: &Arc>, mdns: &Mdns, is_advertisement_queued: &AtomicBool, address: Multiaddr, - ) -> Result<(), String> { + ) -> Result, String> { match quic_multiaddr_to_socketaddr(address) { Ok(addr) => { debug!("listen address removed: {}", addr); @@ -189,10 +224,7 @@ where is_advertisement_queued.store(true, Ordering::Relaxed); mdns.advertise(); } - manager - .emit(ManagerStreamAction::Event(Event::RemoveListenAddr(addr))) - .await; - Ok(()) + Ok(Event::RemoveListenAddr(addr)) } Err(err) => Err(err), } diff --git a/crates/p2p/src/mdns.rs b/crates/p2p/src/mdns.rs index 8dbf9416b..4bec9c5b5 100644 --- a/crates/p2p/src/mdns.rs +++ b/crates/p2p/src/mdns.rs @@ -123,7 +123,7 @@ where } // TODO: if the channel's sender is dropped will this cause the `tokio::select` in the `manager.rs` to infinitely loop? - pub async fn poll(&mut self) { + pub async fn poll(&mut self) -> Option> { tokio::select! { _ = &mut self.next_mdns_advertisement => { self.advertise(); @@ -143,7 +143,7 @@ where Ok(peer_id) => { // Prevent discovery of the current peer. if peer_id == self.manager.peer_id { - return; + return None; } match TMetadata::from_hashmap( @@ -182,7 +182,7 @@ where discovered_peers.insert(peer_id, peer.clone()); peer }; - self.manager.emit(Event::PeerDiscovered(peer).into()).await; + return Some(Event::PeerDiscovered(peer)); } Err(err) => { error!("error parsing metadata for peer '{}': {}", raw_peer_id, err) @@ -202,7 +202,7 @@ where Ok(peer_id) => { // Prevent discovery of the current peer. if peer_id == self.manager.peer_id { - return; + return None; } { @@ -210,12 +210,10 @@ where self.manager.discovered.write().await; let peer = discovered_peers.remove(&peer_id); - self.manager - .emit(Event::PeerExpired { - id: peer_id, - metadata: peer.map(|p| p.metadata), - }.into()) - .await; + return Some(Event::PeerExpired { + id: peer_id, + metadata: peer.map(|p| p.metadata), + }); } } Err(_) => warn!( @@ -227,6 +225,8 @@ where ServiceEvent::SearchStopped(_) => {} } } - } + }; + + None } } diff --git a/crates/p2p/src/peer.rs b/crates/p2p/src/peer.rs index a6f0dbc01..bfb8ddda7 100644 --- a/crates/p2p/src/peer.rs +++ b/crates/p2p/src/peer.rs @@ -1,6 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; - -use libp2p::{core::ConnectedPoint, swarm::ConnectionId}; +use std::{net::SocketAddr, sync::Arc}; use crate::{Manager, ManagerStreamAction, Metadata, PeerId}; @@ -40,7 +38,4 @@ impl DiscoveredPeer { pub struct ConnectedPeer { /// get the peer id of the discovered peer pub peer_id: PeerId, - /// list of connections between the peer and the local node - #[cfg_attr(any(feature = "serde", feature = "specta"), serde(skip))] - pub connections: HashMap, // TODO: Probs use `thinvec` style thing here } diff --git a/crates/p2p/src/spaceblock/mod.rs b/crates/p2p/src/spaceblock/mod.rs index de4974b1e..200f71547 100644 --- a/crates/p2p/src/spaceblock/mod.rs +++ b/crates/p2p/src/spaceblock/mod.rs @@ -10,7 +10,7 @@ use std::{ use tokio::io::AsyncReadExt; -use crate::spacetime::SpaceTimeStream; +use crate::spacetime::{SpaceTimeStream, UnicastStream}; /// TODO #[derive(Debug, PartialEq, Eq)] @@ -35,7 +35,7 @@ pub struct TransferRequest { } impl TransferRequest { - pub async fn from_stream(stream: &mut SpaceTimeStream) -> Result { + pub async fn from_stream(stream: &mut UnicastStream) -> Result { let name_len = stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling let mut name = vec![0u8; name_len as usize]; stream.read_exact(&mut name).await.map_err(|_| ())?; // TODO: Error handling diff --git a/crates/p2p/src/spacetime/behaviour.rs b/crates/p2p/src/spacetime/behaviour.rs index 89bc95b9c..e6c303724 100644 --- a/crates/p2p/src/spacetime/behaviour.rs +++ b/crates/p2p/src/spacetime/behaviour.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::VecDeque, sync::Arc, task::{Context, Poll}, }; @@ -14,7 +14,7 @@ use libp2p::{ Multiaddr, }; use thiserror::Error; -use tracing::{debug, warn}; +use tracing::debug; use crate::{ConnectedPeer, Event, Manager, ManagerStreamAction, Metadata, PeerId}; @@ -38,7 +38,7 @@ pub struct SpaceTime { NetworkBehaviourAction<::OutEvent, THandlerInEvent>, >, // For future me's sake, DON't try and refactor this to use shared state (for the nth time), it doesn't fit into libp2p's synchronous trait and polling model!!! - pub(crate) connected_peers: HashMap, + // pub(crate) connected_peers: HashMap, } impl SpaceTime { @@ -47,14 +47,14 @@ impl SpaceTime { Self { manager, pending_events: VecDeque::new(), - connected_peers: HashMap::new(), + // connected_peers: HashMap::new(), } } } impl NetworkBehaviour for SpaceTime { type ConnectionHandler = SpaceTimeConnection; - type OutEvent = (); + type OutEvent = ManagerStreamAction; fn handle_established_inbound_connection( &mut self, @@ -69,43 +69,15 @@ impl NetworkBehaviour for SpaceTime { )) } - // TODO: Are we even using the response to this?? - // TODO: Do we need to load from state or can be just pass through the `addresses` arg? fn handle_pending_outbound_connection( &mut self, _connection_id: ConnectionId, - maybe_peer: Option, + _maybe_peer: Option, _addresses: &[Multiaddr], _effective_role: Endpoint, ) -> Result, ConnectionDenied> { - if let Some(peer_id) = maybe_peer { - let mut addresses = Vec::new(); - if let Some(connection) = self.connected_peers.get(&PeerId(peer_id)) { - #[allow(clippy::unnecessary_filter_map)] - // TODO: Clippy is getting annoyed cause of the `todo!()`. Remove this once that is fixed. - addresses.extend( - connection - .connections - .iter() - .filter_map(|(_, cp)| match cp { - ConnectedPoint::Dialer { address, .. } => Some(address.clone()), - ConnectedPoint::Listener { - local_addr, - send_back_addr, - } => { - println!( - "TODO: Handle this case! ({} -> {})", - local_addr, send_back_addr - ); - todo!(); - } - }), - ) - } - Ok(addresses) - } else { - Ok(vec![]) - } + // This should be unused but libp2p still calls it + Ok(vec![]) } fn handle_established_outbound_connection( @@ -125,7 +97,6 @@ impl NetworkBehaviour for SpaceTime { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, - connection_id, endpoint, other_established, .. @@ -140,92 +111,39 @@ impl NetworkBehaviour for SpaceTime { ); let peer_id = PeerId(peer_id); - let endpoint = endpoint.clone(); - let conn = match self.connected_peers.get_mut(&peer_id) { - Some(peer) => { - peer.connections.insert(connection_id, endpoint); - - peer.clone() - } - None => { - self.connected_peers.insert( - peer_id, - ConnectedPeer { - peer_id, - connections: HashMap::from([(connection_id, endpoint)]), - }, - ); - self.connected_peers - .get(&peer_id) - .expect("We legit have a mutable reference") - .clone() - } - }; // TODO: Move this block onto into `connection.rs` -> will probs be required for the ConnectionEstablishmentPayload stuff { debug!("sending establishment request to peer '{}'", peer_id); if other_established == 0 { - let manager = self.manager.clone(); - tokio::spawn(async move { - manager - .emit(ManagerStreamAction::Event(Event::PeerConnected(conn))) - .await; - }); + self.pending_events + .push_back(NetworkBehaviourAction::GenerateEvent( + ManagerStreamAction::Event(Event::PeerConnected(ConnectedPeer { + peer_id, + })), + )); } } } FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, - connection_id, + remaining_established, .. }) => { let peer_id = PeerId(peer_id); - match self.connected_peers.get_mut(&peer_id) { - Some(peer) => { - if peer.connections.len() == 1 { - let conn = self.connected_peers.remove(&peer_id).expect("Literally impossible. We have a mutable reference to it, no shot it's already been removed."); - debug!("Disconnected from peer '{}'", conn.peer_id); - - let manager = self.manager.clone(); - tokio::spawn(async move { - manager - .emit(ManagerStreamAction::Event(Event::PeerDisconnected( - conn.peer_id, - ))) - .await; - }); - } else { - peer.connections.remove(&connection_id); - } - } - None => { - warn!( - "Received connection closed event for peer '{}' but no connection was found!", - peer_id - ); - } + if remaining_established == 0 { + debug!("Disconnected from peer '{}'", peer_id); + self.pending_events + .push_back(NetworkBehaviourAction::GenerateEvent( + ManagerStreamAction::Event(Event::PeerDisconnected(peer_id)), + )); } } - FromSwarm::AddressChange(_event) => { - // TODO: Reenable? - // let new_address = match event.new { - // ConnectedPoint::Dialer { address, .. } => Some(address.clone()), - // ConnectedPoint::Listener { .. } => None, - // }; - // let connected = self - // .manager - // .connections - // .blocking_read() - // .get_mut(&PeerId(event.peer_id)) - // .expect("Address change can only happen on an established connection."); - - // let connection = connected - // .connections - // .iter_mut() - // .find(|c| c.id == event.connection_id) - // .expect("Address change can only happen on an established connection."); - // connection.address = new_address; + FromSwarm::AddressChange(event) => { + debug!( + "Address change event: {:?} {:?} {:?} {:?}", + event.peer_id, event.connection_id, event.old, event.new + ); } FromSwarm::DialFailure(event) => { if let Some(peer_id) = event.peer_id { @@ -267,9 +185,10 @@ impl NetworkBehaviour for SpaceTime { &mut self, _peer_id: libp2p::PeerId, _connection: ConnectionId, - _event: as ConnectionHandler>::OutEvent, + event: as ConnectionHandler>::OutEvent, ) { - todo!(); + self.pending_events + .push_back(NetworkBehaviourAction::GenerateEvent(event)); } fn poll( diff --git a/crates/p2p/src/spacetime/connection.rs b/crates/p2p/src/spacetime/connection.rs index 6a660e878..86a5fea82 100644 --- a/crates/p2p/src/spacetime/connection.rs +++ b/crates/p2p/src/spacetime/connection.rs @@ -1,7 +1,7 @@ use libp2p::swarm::{ handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - KeepAlive, + FullyNegotiatedInbound, KeepAlive, }, SubstreamProtocol, }; @@ -14,7 +14,7 @@ use std::{ }; use tracing::error; -use crate::{Manager, Metadata, PeerId}; +use crate::{Manager, ManagerStreamAction, Metadata, PeerId}; use super::{InboundProtocol, OutboundProtocol, OutboundRequest, EMPTY_QUEUE_SHRINK_THRESHOLD}; @@ -49,7 +49,7 @@ impl SpaceTimeConnection { impl ConnectionHandler for SpaceTimeConnection { type InEvent = OutboundRequest; - type OutEvent = (); + type OutEvent = ManagerStreamAction; type Error = ConnectionHandlerUpgrErr; type InboundProtocol = InboundProtocol; type OutboundProtocol = OutboundProtocol; @@ -117,7 +117,12 @@ impl ConnectionHandler for SpaceTimeConnection { >, ) { match event { - ConnectionEvent::FullyNegotiatedInbound(_) => {} + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => { + self.pending_events + .push_back(ConnectionHandlerEvent::Custom(protocol)); + } ConnectionEvent::FullyNegotiatedOutbound(_) => {} ConnectionEvent::DialUpgradeError(event) => { error!("DialUpgradeError: {:#?}", event.error); diff --git a/crates/p2p/src/spacetime/message.rs b/crates/p2p/src/spacetime/message.rs deleted file mode 100644 index fad5a16f3..000000000 --- a/crates/p2p/src/spacetime/message.rs +++ /dev/null @@ -1,11 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// TODO -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum SpaceTimeMessage { - /// Establish the connection - Establish, - - /// Send data on behalf of application - Application(Vec), -} diff --git a/crates/p2p/src/spacetime/mod.rs b/crates/p2p/src/spacetime/mod.rs index 0f74252b0..6a1742086 100644 --- a/crates/p2p/src/spacetime/mod.rs +++ b/crates/p2p/src/spacetime/mod.rs @@ -4,7 +4,6 @@ mod behaviour; mod connection; mod libp2p; -mod message; mod proto_inbound; mod proto_outbound; mod stream; @@ -12,7 +11,6 @@ mod stream; pub use self::libp2p::*; pub use behaviour::*; pub use connection::*; -pub use message::*; pub use proto_inbound::*; pub use proto_outbound::*; pub use stream::*; diff --git a/crates/p2p/src/spacetime/proto_inbound.rs b/crates/p2p/src/spacetime/proto_inbound.rs index 842d62b3d..1c0d5bb57 100644 --- a/crates/p2p/src/spacetime/proto_inbound.rs +++ b/crates/p2p/src/spacetime/proto_inbound.rs @@ -21,25 +21,21 @@ impl UpgradeInfo for InboundProtocol { } impl InboundUpgrade for InboundProtocol { - type Output = (); + type Output = ManagerStreamAction; type Error = (); type Future = Pin> + Send + 'static>>; fn upgrade_inbound(self, io: NegotiatedSubstream, _: Self::Info) -> Self::Future { Box::pin(async move { - self.manager - .emit(ManagerStreamAction::Event( - PeerMessageEvent { - peer_id: self.peer_id, - manager: self.manager.clone(), - stream: SpaceTimeStream::new(io), - _priv: (), - } - .into(), - )) - .await; - - Ok(()) + Ok(ManagerStreamAction::Event( + PeerMessageEvent { + peer_id: self.peer_id, + manager: self.manager.clone(), + stream: SpaceTimeStream::from_stream(io).await, + _priv: (), + } + .into(), + )) }) } } diff --git a/crates/p2p/src/spacetime/proto_outbound.rs b/crates/p2p/src/spacetime/proto_outbound.rs index efb7075c4..9fea42837 100644 --- a/crates/p2p/src/spacetime/proto_outbound.rs +++ b/crates/p2p/src/spacetime/proto_outbound.rs @@ -1,15 +1,23 @@ -use std::future::{ready, Ready}; +use std::{ + future::{ready, Ready}, + io::ErrorKind, +}; -use libp2p::{core::UpgradeInfo, swarm::NegotiatedSubstream, OutboundUpgrade}; -use tokio::{io::AsyncWriteExt, sync::oneshot}; +use libp2p::{ + core::UpgradeInfo, + futures::{AsyncReadExt, AsyncWriteExt}, + swarm::NegotiatedSubstream, + OutboundUpgrade, +}; +use tokio::sync::oneshot; use tracing::error; -use super::{SpaceTimeProtocolName, SpaceTimeStream}; +use super::{SpaceTimeProtocolName, UnicastStream, BROADCAST_DISCRIMINATOR}; #[derive(Debug)] pub enum OutboundRequest { - Data(Vec), - Stream(oneshot::Sender), + Broadcast(Vec), + Unicast(oneshot::Sender), } pub struct OutboundProtocol(pub(crate) &'static [u8], pub(crate) OutboundRequest); @@ -28,24 +36,33 @@ impl OutboundUpgrade for OutboundProtocol { type Error = (); type Future = Ready>; - fn upgrade_outbound(self, io: NegotiatedSubstream, _protocol: Self::Info) -> Self::Future { - let mut stream = SpaceTimeStream::new(io); + fn upgrade_outbound(self, mut io: NegotiatedSubstream, _protocol: Self::Info) -> Self::Future { match self.1 { - OutboundRequest::Data(data) => { + OutboundRequest::Broadcast(data) => { tokio::spawn(async move { - if let Err(err) = stream.write_all(&data).await { + io.write_all(&[BROADCAST_DISCRIMINATOR]).await.unwrap(); + if let Err(err) = io.write_all(&data).await { // TODO: Print the peer which we failed to send to here error!("Error sending broadcast: {:?}", err); } - stream.flush().await.unwrap(); - stream.close().await.unwrap(); - // TODO: We close the connection here without waiting for a response. - // TODO: If the other side's user-code doesn't account for that on this specific message they will error. - // TODO: Add an abstraction so the user can't respond to fixed size messages. + io.flush().await.unwrap(); + + let mut buf = [0u8; 1]; + io.read_exact(&mut buf).await.unwrap(); + debug_assert_eq!(buf[0], b'D', "Peer should let us know they were done!"); + + match io.close().await { + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::ConnectionReset => {} // The other end shut the connection before us + Err(err) => { + error!("Error closing broadcast stream: {:?}", err); + } + } }); } - OutboundRequest::Stream(sender) => { - sender.send(stream).unwrap(); + OutboundRequest::Unicast(sender) => { + // We write the discriminator to the stream in the `Manager::stream` method before returning the stream to the user to make async a tad nicer. + sender.send(UnicastStream::new(io)).unwrap(); } } diff --git a/crates/p2p/src/spacetime/stream.rs b/crates/p2p/src/spacetime/stream.rs index 51924bced..f7837e1a4 100644 --- a/crates/p2p/src/spacetime/stream.rs +++ b/crates/p2p/src/spacetime/stream.rs @@ -1,29 +1,126 @@ use std::{ - io, + io::{self, ErrorKind}, pin::Pin, task::{Context, Poll}, }; use libp2p::{futures::AsyncWriteExt, swarm::NegotiatedSubstream}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::io::{ + AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt, ReadBuf, +}; use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; +use tracing::error; + +pub const BROADCAST_DISCRIMINATOR: u8 = 0; +pub const UNICAST_DISCRIMINATOR: u8 = 1; #[derive(Debug)] -pub struct SpaceTimeStream(Compat); - -// TODO: Utils for sending msgpack and stuff over the stream. -> Have a max size of reading buffers so we are less susceptible to DoS attacks. +pub enum SpaceTimeStream { + Broadcast(BroadcastStream), + Unicast(UnicastStream), +} impl SpaceTimeStream { - pub fn new(io: NegotiatedSubstream) -> Self { - Self(io.compat()) + pub(crate) async fn from_stream(io: NegotiatedSubstream) -> Self { + let mut io = io.compat(); + let discriminator = io.read_u8().await.unwrap(); // TODO: Timeout on this + match discriminator { + BROADCAST_DISCRIMINATOR => Self::Broadcast(BroadcastStream(Some(io))), + UNICAST_DISCRIMINATOR => Self::Unicast(UnicastStream(io)), + _ => todo!(), // TODO: Error handling + } } pub async fn close(self) -> Result<(), io::Error> { - self.0.into_inner().close().await + match self { + Self::Broadcast(mut stream) => { + if let Some(stream) = stream.0.take() { + BroadcastStream::close_inner(stream).await + } else { + debug_assert!(true, "'BroadcastStream' should never be 'None' here!"); + error!("'BroadcastStream' should never be 'None' here!"); + Ok(()) + } + } + Self::Unicast(stream) => stream.0.into_inner().close().await, + } } } impl AsyncRead for SpaceTimeStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + Self::Broadcast(stream) => Pin::new(stream).poll_read(cx, buf), + Self::Unicast(stream) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +/// A broadcast is a message sent to many peers in the network. +/// Due to this it is not possible to respond to a broadcast. +#[derive(Debug)] +pub struct BroadcastStream(Option>); + +impl BroadcastStream { + async fn close_inner(mut io: Compat) -> Result<(), io::Error> { + io.write_all(&[b'D']).await?; + io.flush().await?; + + match io.into_inner().close().await { + Ok(_) => Ok(()), + Err(err) if err.kind() == ErrorKind::ConnectionReset => Ok(()), // The other end shut the connection before us + Err(err) => Err(err), + } + } +} + +impl AsyncRead for BroadcastStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().0.as_mut().expect("'BroadcastStream' can only be 'None' if this method is called after 'Drop' which ain't happening!")).poll_read(cx, buf) + } +} + +impl Drop for BroadcastStream { + fn drop(&mut self) { + // This may be `None` if the user manually called `Self::close` + if let Some(stream) = self.0.take() { + tokio::spawn(async move { + Self::close_inner(stream).await.unwrap(); + }); + } + } +} + +/// A unicast stream is a direct stream to a specific peer. +#[derive(Debug)] +pub struct UnicastStream(Compat); + +// TODO: Utils for sending msgpack and stuff over the stream. -> Have a max size of reading buffers so we are less susceptible to DoS attacks. + +impl UnicastStream { + pub(crate) fn new(io: NegotiatedSubstream) -> Self { + Self(io.compat()) + } + + pub(crate) async fn write_discriminator(&mut self) -> io::Result<()> { + // TODO: Timeout if the peer doesn't accept the byte quick enough + self.0.write_all(&[UNICAST_DISCRIMINATOR]).await + } + + pub async fn close(self) -> Result<(), io::Error> { + self.0.into_inner().close().await + } +} + +impl AsyncRead for UnicastStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -33,17 +130,7 @@ impl AsyncRead for SpaceTimeStream { } } -// impl AsyncBufRead for SpaceTimeStream { -// fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// Pin::new(&mut self.get_mut().0).poll_fill_buf(cx) -// } - -// fn consume(self: Pin<&mut Self>, amt: usize) { -// Pin::new(&mut self.get_mut().0).consume(amt) -// } -// } - -impl AsyncWrite for SpaceTimeStream { +impl AsyncWrite for UnicastStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>,