Typesafe stream handling (#603)

* typesafe stream handling
Now it's impossible for your to respond to a broadcast stream. Previously this would just fail due to the TCP connection having been closed by the sender.

* remove connected_peers state from `SpaceTime`

* use `OutEvent` instead of `tokio::spawn` for `SpaceTime` events

* Polling for events > tokio::mpsc

* extend max length of sync messages to ~4GB + broadcast done message

* Fix "actOS" operating system decode error
This commit is contained in:
Oscar Beaumont
2023-03-13 16:58:35 +08:00
committed by GitHub
parent 4a6b057872
commit 0c25239c53
16 changed files with 389 additions and 299 deletions

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc, time::Instant};
use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant};
use rspc::Type;
use sd_p2p::{
@@ -35,6 +35,8 @@ pub enum P2PEvent {
pub struct P2PManager {
events: broadcast::Sender<P2PEvent>,
// We hold this only so we don't get errors sending when no frontend's are listening
_events_rx: broadcast::Receiver<P2PEvent>,
pub manager: Arc<Manager<PeerMetadata>>,
}
@@ -69,7 +71,7 @@ impl P2PManager {
manager.listen_addrs().await
);
let (events_tx, _) = broadcast::channel(100);
let (events_tx, events_rx) = broadcast::channel(100);
let events = events_tx.clone();
tokio::spawn(async move {
while let Some(event) = stream.next().await {
@@ -116,9 +118,11 @@ impl P2PManager {
// TODO: Save to the filesystem
}
Header::Sync(library_id) => {
let buf_len = event.stream.read_u8().await.unwrap();
let mut len = [0; 4];
event.stream.read_exact(&mut len).await.unwrap();
let len = u32::from_be_bytes(len);
let mut buf = vec![0; buf_len as usize]; // TODO: Designed for easily being able to be DOS the current Node
let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to be DOS the current Node
event.stream.read_exact(&mut buf).await.unwrap();
let mut buf: &[u8] = &buf;
@@ -144,9 +148,13 @@ impl P2PManager {
// https://docs.rs/ctrlc/latest/ctrlc/
// https://docs.rs/system_shutdown/latest/system_shutdown/
let this = Arc::new(Self { events, manager });
let this = Arc::new(Self {
events,
_events_rx: events_rx,
manager,
});
// TODO: Probs remove this
// TODO: Probs remove this once connection timeout/keepalive are working correctly
tokio::spawn({
let this = this.clone();
async move {
@@ -157,26 +165,51 @@ impl P2PManager {
}
});
// TODO: Probs remove this
tokio::spawn({
let this = this.clone();
async move {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let mut connected = this
.manager
.get_connected_peers()
.await
.unwrap()
.into_iter();
if let Some(peer_id) = connected.next() {
info!("Starting Spacedrop to peer '{}'", peer_id);
this.big_bad_spacedrop(peer_id, PathBuf::from("./demo.txt"))
// TODO(@Oscar): Remove this in the future once i'm done using it for testing
if std::env::var("SPACEDROP_DEMO").is_ok() {
// tokio::spawn({
// let this = this.clone();
// async move {
// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
// let mut connected = this
// .manager
// .get_connected_peers()
// .await
// .unwrap()
// .into_iter();
// if let Some(peer_id) = connected.next() {
// info!("Starting Spacedrop to peer '{}'", peer_id);
// this.big_bad_spacedrop(peer_id, PathBuf::from("./demo.txt"))
// .await;
// } else {
// info!("No clients found so skipping Spacedrop demo!");
// }
// }
// });
tokio::spawn({
let this = this.clone();
async move {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let mut connected = this
.manager
.get_connected_peers()
.await
.unwrap()
.into_iter();
if let Some(peer_id) = connected.next() {
info!("Starting Spacedrop to peer '{}'", peer_id);
this.broadcast_sync_events(
Uuid::from_str("e4372586-d028-48f8-8be6-b4ff781a7dc2").unwrap(),
vec![],
)
.await;
} else {
info!("No clients found so skipping Spacedrop demo!");
} else {
info!("No clients found so skipping Spacedrop demo!");
}
}
}
});
});
}
this
}
@@ -189,10 +222,15 @@ impl P2PManager {
pub async fn broadcast_sync_events(&self, library_id: Uuid, event: Vec<CRDTOperation>) {
let mut head_buf = Header::Sync(library_id).to_bytes();
let mut buf = rmp_serde::to_vec_named(&event).unwrap(); // TODO: Error handling
head_buf.push(buf.len() as u8); // TODO: This is going to overflow quickly so deal with it properly!
let len: u32 = buf.len().try_into().unwrap(); // Max Sync payload is like 4GB
let mut len_buf = len.to_le_bytes();
debug_assert_eq!(len_buf.len(), 4);
head_buf.extend_from_slice(&mut len_buf);
head_buf.append(&mut buf);
self.manager.broadcast(buf).await;
self.manager.broadcast(head_buf).await;
}
pub async fn ping(&self) {

View File

@@ -103,12 +103,12 @@ impl FromStr for OperatingSystem {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut chars = s.chars();
match chars.next() {
Some('w') => Ok(OperatingSystem::Windows),
Some('l') => Ok(OperatingSystem::Linux),
Some('m') => Ok(OperatingSystem::MacOS),
Some('i') => Ok(OperatingSystem::Ios),
Some('a') => Ok(OperatingSystem::Android),
_ => Ok(OperatingSystem::Other(chars.as_str().to_string())),
Some('W') => Ok(OperatingSystem::Windows),
Some('L') => Ok(OperatingSystem::Linux),
Some('M') => Ok(OperatingSystem::MacOS),
Some('I') => Ok(OperatingSystem::Ios),
Some('A') => Ok(OperatingSystem::Android),
_ => Ok(OperatingSystem::Other(s.to_owned())),
}
}
}

View File

@@ -16,7 +16,12 @@ impl Header {
let discriminator = stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling
match discriminator {
0 => Ok(Self::Spacedrop(TransferRequest::from_stream(stream).await?)),
0 => match stream {
SpaceTimeStream::Unicast(stream) => {
Ok(Self::Spacedrop(TransferRequest::from_stream(stream).await?))
}
_ => todo!(),
},
1 => Ok(Self::Ping),
2 => {
let mut uuid = [0u8; 16];

View File

@@ -1,6 +1,6 @@
use std::{collections::HashMap, env, time::Duration};
use sd_p2p::{Event, Keypair, Manager, Metadata};
use sd_p2p::{spacetime::SpaceTimeStream, Event, Keypair, Manager, Metadata};
use tokio::{io::AsyncReadExt, time::sleep};
use tracing::{debug, error, info};
@@ -73,17 +73,24 @@ async fn main() {
debug!("Peer '{}' established stream", event.peer_id);
tokio::spawn(async move {
let mut stream = event.stream;
let mut buf = [0; 100];
let n = stream.read(&mut buf).await.unwrap();
println!("GOT: {:?}", std::str::from_utf8(&buf[..n]).unwrap());
// TODO: Allow differentiating whether this is Spacedrop or Sync message here
// TODO
// event.stream.write_all(response.as_bytes()).await.unwrap();
// event.stream.flush().await.unwrap();
match event.stream {
SpaceTimeStream::Broadcast(mut stream) => {
let mut buf = [0; 100];
let n = stream.read(&mut buf).await.unwrap();
println!(
"GOT BROADCAST: {:?}",
std::str::from_utf8(&buf[..n]).unwrap()
);
}
SpaceTimeStream::Unicast(mut stream) => {
let mut buf = [0; 100];
let n = stream.read(&mut buf).await.unwrap();
println!(
"GOT UNICAST: {:?}",
std::str::from_utf8(&buf[..n]).unwrap()
);
}
}
});
}
_ => debug!("event: {:?}", event),

View File

@@ -10,7 +10,7 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use tracing::{debug, error, warn};
use crate::{
spacetime::{SpaceTime, SpaceTimeStream},
spacetime::{SpaceTime, UnicastStream},
AsyncFn, DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, Mdns, Metadata, PeerId,
};
@@ -113,14 +113,16 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
})
}
pub async fn stream(&self, peer_id: PeerId) -> Result<SpaceTimeStream, ()> {
pub async fn stream(&self, peer_id: PeerId) -> Result<UnicastStream, ()> {
// TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that.
let (tx, rx) = oneshot::channel();
self.emit(ManagerStreamAction::StartStream(peer_id, tx))
.await;
rx.await.map_err(|_| {
let mut stream = rx.await.map_err(|_| {
warn!("failed to queue establishing stream to peer '{peer_id}'!");
})
})?;
stream.write_discriminator().await.unwrap(); // TODO: Error handling
Ok(stream)
}
pub async fn broadcast(&self, data: Vec<u8>) {

View File

@@ -1,4 +1,5 @@
use std::{
fmt,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
@@ -19,7 +20,7 @@ use tracing::{debug, error, warn};
use crate::{
quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr,
spacetime::{OutboundRequest, SpaceTime, SpaceTimeStream},
spacetime::{OutboundRequest, SpaceTime, UnicastStream},
AsyncFn, Event, Manager, Mdns, Metadata, PeerId,
};
@@ -35,11 +36,17 @@ pub enum ManagerStreamAction<TMetadata: Metadata> {
addresses: Vec<SocketAddr>,
},
/// TODO
StartStream(PeerId, oneshot::Sender<SpaceTimeStream>),
StartStream(PeerId, oneshot::Sender<UnicastStream>),
/// TODO
BroadcastData(Vec<u8>),
}
impl<TMetadata: Metadata> fmt::Debug for ManagerStreamAction<TMetadata> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ManagerStreamAction")
}
}
impl<TMetadata: Metadata> From<Event<TMetadata>> for ManagerStreamAction<TMetadata> {
fn from(event: Event<TMetadata>) -> Self {
Self::Event(event)
@@ -69,58 +76,25 @@ where
// We loop polling internal services until an event comes in that needs to be sent to the parent application.
loop {
tokio::select! {
_ = self.mdns.poll() => {},
event = self.mdns.poll() => {
if let Some(event) = event {
return Some(event);
}
continue;
},
event = self.event_stream_rx.recv() => {
// If the sender has shut down we return `None` to also shut down too.
match event? {
ManagerStreamAction::Event(event) => return Some(event),
ManagerStreamAction::GetConnectedPeers(response) => {
response.send(self.swarm.behaviour().connected_peers.values().map(|p| p.peer_id).collect::<Vec<_>>()).map_err(|_| error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")).ok();
},
ManagerStreamAction::Dial { peer_id, addresses } => {
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
.addresses(
addresses
.iter()
.map(socketaddr_to_quic_multiaddr)
.collect(),
)
.extend_addresses_through_behaviour()
.build(),
) {
Ok(_) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
),
}
}
ManagerStreamAction::StartStream(peer_id, rx) => {
self.swarm.behaviour_mut().pending_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Stream(rx),
});
}
ManagerStreamAction::BroadcastData(data) => {
let swarm = self.swarm.behaviour_mut();
for peer in swarm.connected_peers.values() {
swarm.pending_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Data(data.clone()),
});
}
}
if let Some(event) = self.handle_manager_stream_action(event?) {
return Some(event);
}
}
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(()) => {},
SwarmEvent::Behaviour(event) => {
if let Some(event) = self.handle_manager_stream_action(event) {
return Some(event);
}
},
SwarmEvent::ConnectionEstablished { .. } => {},
SwarmEvent::ConnectionClosed { .. } => {},
SwarmEvent::IncomingConnection { local_addr, .. } => debug!("incoming connection from '{}'", local_addr),
@@ -136,8 +110,8 @@ where
self.is_advertisement_queued.store(true, Ordering::Relaxed);
self.mdns.advertise();
}
self.manager.emit(Event::AddListenAddr(addr).into()).await;
self.mdns.advertise();
return Some(Event::AddListenAddr(addr));
},
Err(err) => {
warn!("error passing listen address: {}", err);
@@ -174,12 +148,73 @@ where
}
}
fn handle_manager_stream_action(
&mut self,
event: ManagerStreamAction<TMetadata>,
) -> Option<Event<TMetadata>> {
match event {
ManagerStreamAction::Event(event) => return Some(event),
ManagerStreamAction::GetConnectedPeers(response) => {
response
.send(
self.swarm
.connected_peers()
.map(|v| PeerId(*v))
.collect::<Vec<_>>(),
)
.map_err(|_| {
error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")
})
.ok();
}
ManagerStreamAction::Dial { peer_id, addresses } => {
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
.addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect())
.build(),
) {
Ok(_) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
),
}
}
ManagerStreamAction::StartStream(peer_id, rx) => {
self.swarm.behaviour_mut().pending_events.push_back(
NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(rx),
},
);
}
ManagerStreamAction::BroadcastData(data) => {
let connected_peers = self.swarm.connected_peers().map(|v| *v).collect::<Vec<_>>();
let behaviour = self.swarm.behaviour_mut();
for peer_id in connected_peers {
behaviour
.pending_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id,
handler: NotifyHandler::Any,
event: OutboundRequest::Broadcast(data.clone()),
});
}
}
}
return None;
}
// TODO: Move into mdns
async fn unregister_addr(
manager: &Arc<Manager<TMetadata>>,
mdns: &Mdns<TMetadata, TMetadataFn>,
is_advertisement_queued: &AtomicBool,
address: Multiaddr,
) -> Result<(), String> {
) -> Result<Event<TMetadata>, String> {
match quic_multiaddr_to_socketaddr(address) {
Ok(addr) => {
debug!("listen address removed: {}", addr);
@@ -189,10 +224,7 @@ where
is_advertisement_queued.store(true, Ordering::Relaxed);
mdns.advertise();
}
manager
.emit(ManagerStreamAction::Event(Event::RemoveListenAddr(addr)))
.await;
Ok(())
Ok(Event::RemoveListenAddr(addr))
}
Err(err) => Err(err),
}

View File

@@ -123,7 +123,7 @@ where
}
// TODO: if the channel's sender is dropped will this cause the `tokio::select` in the `manager.rs` to infinitely loop?
pub async fn poll(&mut self) {
pub async fn poll(&mut self) -> Option<Event<TMetadata>> {
tokio::select! {
_ = &mut self.next_mdns_advertisement => {
self.advertise();
@@ -143,7 +143,7 @@ where
Ok(peer_id) => {
// Prevent discovery of the current peer.
if peer_id == self.manager.peer_id {
return;
return None;
}
match TMetadata::from_hashmap(
@@ -182,7 +182,7 @@ where
discovered_peers.insert(peer_id, peer.clone());
peer
};
self.manager.emit(Event::PeerDiscovered(peer).into()).await;
return Some(Event::PeerDiscovered(peer));
}
Err(err) => {
error!("error parsing metadata for peer '{}': {}", raw_peer_id, err)
@@ -202,7 +202,7 @@ where
Ok(peer_id) => {
// Prevent discovery of the current peer.
if peer_id == self.manager.peer_id {
return;
return None;
}
{
@@ -210,12 +210,10 @@ where
self.manager.discovered.write().await;
let peer = discovered_peers.remove(&peer_id);
self.manager
.emit(Event::PeerExpired {
id: peer_id,
metadata: peer.map(|p| p.metadata),
}.into())
.await;
return Some(Event::PeerExpired {
id: peer_id,
metadata: peer.map(|p| p.metadata),
});
}
}
Err(_) => warn!(
@@ -227,6 +225,8 @@ where
ServiceEvent::SearchStopped(_) => {}
}
}
}
};
None
}
}

View File

@@ -1,6 +1,4 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use libp2p::{core::ConnectedPoint, swarm::ConnectionId};
use std::{net::SocketAddr, sync::Arc};
use crate::{Manager, ManagerStreamAction, Metadata, PeerId};
@@ -40,7 +38,4 @@ impl<TMetadata: Metadata> DiscoveredPeer<TMetadata> {
pub struct ConnectedPeer {
/// get the peer id of the discovered peer
pub peer_id: PeerId,
/// list of connections between the peer and the local node
#[cfg_attr(any(feature = "serde", feature = "specta"), serde(skip))]
pub connections: HashMap<ConnectionId, ConnectedPoint>, // TODO: Probs use `thinvec` style thing here
}

View File

@@ -10,7 +10,7 @@ use std::{
use tokio::io::AsyncReadExt;
use crate::spacetime::SpaceTimeStream;
use crate::spacetime::{SpaceTimeStream, UnicastStream};
/// TODO
#[derive(Debug, PartialEq, Eq)]
@@ -35,7 +35,7 @@ pub struct TransferRequest {
}
impl TransferRequest {
pub async fn from_stream(stream: &mut SpaceTimeStream) -> Result<Self, ()> {
pub async fn from_stream(stream: &mut UnicastStream) -> Result<Self, ()> {
let name_len = stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling
let mut name = vec![0u8; name_len as usize];
stream.read_exact(&mut name).await.map_err(|_| ())?; // TODO: Error handling

View File

@@ -1,5 +1,5 @@
use std::{
collections::{HashMap, VecDeque},
collections::VecDeque,
sync::Arc,
task::{Context, Poll},
};
@@ -14,7 +14,7 @@ use libp2p::{
Multiaddr,
};
use thiserror::Error;
use tracing::{debug, warn};
use tracing::debug;
use crate::{ConnectedPeer, Event, Manager, ManagerStreamAction, Metadata, PeerId};
@@ -38,7 +38,7 @@ pub struct SpaceTime<TMetadata: Metadata> {
NetworkBehaviourAction<<Self as NetworkBehaviour>::OutEvent, THandlerInEvent<Self>>,
>,
// For future me's sake, DON't try and refactor this to use shared state (for the nth time), it doesn't fit into libp2p's synchronous trait and polling model!!!
pub(crate) connected_peers: HashMap<PeerId, ConnectedPeer>,
// pub(crate) connected_peers: HashMap<PeerId, ConnectedPeer>,
}
impl<TMetadata: Metadata> SpaceTime<TMetadata> {
@@ -47,14 +47,14 @@ impl<TMetadata: Metadata> SpaceTime<TMetadata> {
Self {
manager,
pending_events: VecDeque::new(),
connected_peers: HashMap::new(),
// connected_peers: HashMap::new(),
}
}
}
impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
type ConnectionHandler = SpaceTimeConnection<TMetadata>;
type OutEvent = ();
type OutEvent = ManagerStreamAction<TMetadata>;
fn handle_established_inbound_connection(
&mut self,
@@ -69,43 +69,15 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
))
}
// TODO: Are we even using the response to this??
// TODO: Do we need to load from state or can be just pass through the `addresses` arg?
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
maybe_peer: Option<libp2p::PeerId>,
_maybe_peer: Option<libp2p::PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
if let Some(peer_id) = maybe_peer {
let mut addresses = Vec::new();
if let Some(connection) = self.connected_peers.get(&PeerId(peer_id)) {
#[allow(clippy::unnecessary_filter_map)]
// TODO: Clippy is getting annoyed cause of the `todo!()`. Remove this once that is fixed.
addresses.extend(
connection
.connections
.iter()
.filter_map(|(_, cp)| match cp {
ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => {
println!(
"TODO: Handle this case! ({} -> {})",
local_addr, send_back_addr
);
todo!();
}
}),
)
}
Ok(addresses)
} else {
Ok(vec![])
}
// This should be unused but libp2p still calls it
Ok(vec![])
}
fn handle_established_outbound_connection(
@@ -125,7 +97,6 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint,
other_established,
..
@@ -140,92 +111,39 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
);
let peer_id = PeerId(peer_id);
let endpoint = endpoint.clone();
let conn = match self.connected_peers.get_mut(&peer_id) {
Some(peer) => {
peer.connections.insert(connection_id, endpoint);
peer.clone()
}
None => {
self.connected_peers.insert(
peer_id,
ConnectedPeer {
peer_id,
connections: HashMap::from([(connection_id, endpoint)]),
},
);
self.connected_peers
.get(&peer_id)
.expect("We legit have a mutable reference")
.clone()
}
};
// TODO: Move this block onto into `connection.rs` -> will probs be required for the ConnectionEstablishmentPayload stuff
{
debug!("sending establishment request to peer '{}'", peer_id);
if other_established == 0 {
let manager = self.manager.clone();
tokio::spawn(async move {
manager
.emit(ManagerStreamAction::Event(Event::PeerConnected(conn)))
.await;
});
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
ManagerStreamAction::Event(Event::PeerConnected(ConnectedPeer {
peer_id,
})),
));
}
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
remaining_established,
..
}) => {
let peer_id = PeerId(peer_id);
match self.connected_peers.get_mut(&peer_id) {
Some(peer) => {
if peer.connections.len() == 1 {
let conn = self.connected_peers.remove(&peer_id).expect("Literally impossible. We have a mutable reference to it, no shot it's already been removed.");
debug!("Disconnected from peer '{}'", conn.peer_id);
let manager = self.manager.clone();
tokio::spawn(async move {
manager
.emit(ManagerStreamAction::Event(Event::PeerDisconnected(
conn.peer_id,
)))
.await;
});
} else {
peer.connections.remove(&connection_id);
}
}
None => {
warn!(
"Received connection closed event for peer '{}' but no connection was found!",
peer_id
);
}
if remaining_established == 0 {
debug!("Disconnected from peer '{}'", peer_id);
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
ManagerStreamAction::Event(Event::PeerDisconnected(peer_id)),
));
}
}
FromSwarm::AddressChange(_event) => {
// TODO: Reenable?
// let new_address = match event.new {
// ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
// ConnectedPoint::Listener { .. } => None,
// };
// let connected = self
// .manager
// .connections
// .blocking_read()
// .get_mut(&PeerId(event.peer_id))
// .expect("Address change can only happen on an established connection.");
// let connection = connected
// .connections
// .iter_mut()
// .find(|c| c.id == event.connection_id)
// .expect("Address change can only happen on an established connection.");
// connection.address = new_address;
FromSwarm::AddressChange(event) => {
debug!(
"Address change event: {:?} {:?} {:?} {:?}",
event.peer_id, event.connection_id, event.old, event.new
);
}
FromSwarm::DialFailure(event) => {
if let Some(peer_id) = event.peer_id {
@@ -267,9 +185,10 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
&mut self,
_peer_id: libp2p::PeerId,
_connection: ConnectionId,
_event: <SpaceTimeConnection<TMetadata> as ConnectionHandler>::OutEvent,
event: <SpaceTimeConnection<TMetadata> as ConnectionHandler>::OutEvent,
) {
todo!();
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
fn poll(

View File

@@ -1,7 +1,7 @@
use libp2p::swarm::{
handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
KeepAlive,
FullyNegotiatedInbound, KeepAlive,
},
SubstreamProtocol,
};
@@ -14,7 +14,7 @@ use std::{
};
use tracing::error;
use crate::{Manager, Metadata, PeerId};
use crate::{Manager, ManagerStreamAction, Metadata, PeerId};
use super::{InboundProtocol, OutboundProtocol, OutboundRequest, EMPTY_QUEUE_SHRINK_THRESHOLD};
@@ -49,7 +49,7 @@ impl<TMetadata: Metadata> SpaceTimeConnection<TMetadata> {
impl<TMetadata: Metadata> ConnectionHandler for SpaceTimeConnection<TMetadata> {
type InEvent = OutboundRequest;
type OutEvent = ();
type OutEvent = ManagerStreamAction<TMetadata>;
type Error = ConnectionHandlerUpgrErr<io::Error>;
type InboundProtocol = InboundProtocol<TMetadata>;
type OutboundProtocol = OutboundProtocol;
@@ -117,7 +117,12 @@ impl<TMetadata: Metadata> ConnectionHandler for SpaceTimeConnection<TMetadata> {
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(_) => {}
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol, ..
}) => {
self.pending_events
.push_back(ConnectionHandlerEvent::Custom(protocol));
}
ConnectionEvent::FullyNegotiatedOutbound(_) => {}
ConnectionEvent::DialUpgradeError(event) => {
error!("DialUpgradeError: {:#?}", event.error);

View File

@@ -1,11 +0,0 @@
use serde::{Deserialize, Serialize};
/// TODO
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpaceTimeMessage {
/// Establish the connection
Establish,
/// Send data on behalf of application
Application(Vec<u8>),
}

View File

@@ -4,7 +4,6 @@
mod behaviour;
mod connection;
mod libp2p;
mod message;
mod proto_inbound;
mod proto_outbound;
mod stream;
@@ -12,7 +11,6 @@ mod stream;
pub use self::libp2p::*;
pub use behaviour::*;
pub use connection::*;
pub use message::*;
pub use proto_inbound::*;
pub use proto_outbound::*;
pub use stream::*;

View File

@@ -21,25 +21,21 @@ impl<TMetadata: Metadata> UpgradeInfo for InboundProtocol<TMetadata> {
}
impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtocol<TMetadata> {
type Output = ();
type Output = ManagerStreamAction<TMetadata>;
type Error = ();
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send + 'static>>;
fn upgrade_inbound(self, io: NegotiatedSubstream, _: Self::Info) -> Self::Future {
Box::pin(async move {
self.manager
.emit(ManagerStreamAction::Event(
PeerMessageEvent {
peer_id: self.peer_id,
manager: self.manager.clone(),
stream: SpaceTimeStream::new(io),
_priv: (),
}
.into(),
))
.await;
Ok(())
Ok(ManagerStreamAction::Event(
PeerMessageEvent {
peer_id: self.peer_id,
manager: self.manager.clone(),
stream: SpaceTimeStream::from_stream(io).await,
_priv: (),
}
.into(),
))
})
}
}

View File

@@ -1,15 +1,23 @@
use std::future::{ready, Ready};
use std::{
future::{ready, Ready},
io::ErrorKind,
};
use libp2p::{core::UpgradeInfo, swarm::NegotiatedSubstream, OutboundUpgrade};
use tokio::{io::AsyncWriteExt, sync::oneshot};
use libp2p::{
core::UpgradeInfo,
futures::{AsyncReadExt, AsyncWriteExt},
swarm::NegotiatedSubstream,
OutboundUpgrade,
};
use tokio::sync::oneshot;
use tracing::error;
use super::{SpaceTimeProtocolName, SpaceTimeStream};
use super::{SpaceTimeProtocolName, UnicastStream, BROADCAST_DISCRIMINATOR};
#[derive(Debug)]
pub enum OutboundRequest {
Data(Vec<u8>),
Stream(oneshot::Sender<SpaceTimeStream>),
Broadcast(Vec<u8>),
Unicast(oneshot::Sender<UnicastStream>),
}
pub struct OutboundProtocol(pub(crate) &'static [u8], pub(crate) OutboundRequest);
@@ -28,24 +36,33 @@ impl OutboundUpgrade<NegotiatedSubstream> for OutboundProtocol {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn upgrade_outbound(self, io: NegotiatedSubstream, _protocol: Self::Info) -> Self::Future {
let mut stream = SpaceTimeStream::new(io);
fn upgrade_outbound(self, mut io: NegotiatedSubstream, _protocol: Self::Info) -> Self::Future {
match self.1 {
OutboundRequest::Data(data) => {
OutboundRequest::Broadcast(data) => {
tokio::spawn(async move {
if let Err(err) = stream.write_all(&data).await {
io.write_all(&[BROADCAST_DISCRIMINATOR]).await.unwrap();
if let Err(err) = io.write_all(&data).await {
// TODO: Print the peer which we failed to send to here
error!("Error sending broadcast: {:?}", err);
}
stream.flush().await.unwrap();
stream.close().await.unwrap();
// TODO: We close the connection here without waiting for a response.
// TODO: If the other side's user-code doesn't account for that on this specific message they will error.
// TODO: Add an abstraction so the user can't respond to fixed size messages.
io.flush().await.unwrap();
let mut buf = [0u8; 1];
io.read_exact(&mut buf).await.unwrap();
debug_assert_eq!(buf[0], b'D', "Peer should let us know they were done!");
match io.close().await {
Ok(_) => {}
Err(err) if err.kind() == ErrorKind::ConnectionReset => {} // The other end shut the connection before us
Err(err) => {
error!("Error closing broadcast stream: {:?}", err);
}
}
});
}
OutboundRequest::Stream(sender) => {
sender.send(stream).unwrap();
OutboundRequest::Unicast(sender) => {
// We write the discriminator to the stream in the `Manager::stream` method before returning the stream to the user to make async a tad nicer.
sender.send(UnicastStream::new(io)).unwrap();
}
}

View File

@@ -1,29 +1,126 @@
use std::{
io,
io::{self, ErrorKind},
pin::Pin,
task::{Context, Poll},
};
use libp2p::{futures::AsyncWriteExt, swarm::NegotiatedSubstream};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt, ReadBuf,
};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
use tracing::error;
pub const BROADCAST_DISCRIMINATOR: u8 = 0;
pub const UNICAST_DISCRIMINATOR: u8 = 1;
#[derive(Debug)]
pub struct SpaceTimeStream(Compat<NegotiatedSubstream>);
// TODO: Utils for sending msgpack and stuff over the stream. -> Have a max size of reading buffers so we are less susceptible to DoS attacks.
pub enum SpaceTimeStream {
Broadcast(BroadcastStream),
Unicast(UnicastStream),
}
impl SpaceTimeStream {
pub fn new(io: NegotiatedSubstream) -> Self {
Self(io.compat())
pub(crate) async fn from_stream(io: NegotiatedSubstream) -> Self {
let mut io = io.compat();
let discriminator = io.read_u8().await.unwrap(); // TODO: Timeout on this
match discriminator {
BROADCAST_DISCRIMINATOR => Self::Broadcast(BroadcastStream(Some(io))),
UNICAST_DISCRIMINATOR => Self::Unicast(UnicastStream(io)),
_ => todo!(), // TODO: Error handling
}
}
pub async fn close(self) -> Result<(), io::Error> {
self.0.into_inner().close().await
match self {
Self::Broadcast(mut stream) => {
if let Some(stream) = stream.0.take() {
BroadcastStream::close_inner(stream).await
} else {
debug_assert!(true, "'BroadcastStream' should never be 'None' here!");
error!("'BroadcastStream' should never be 'None' here!");
Ok(())
}
}
Self::Unicast(stream) => stream.0.into_inner().close().await,
}
}
}
impl AsyncRead for SpaceTimeStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.get_mut() {
Self::Broadcast(stream) => Pin::new(stream).poll_read(cx, buf),
Self::Unicast(stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
/// A broadcast is a message sent to many peers in the network.
/// Due to this it is not possible to respond to a broadcast.
#[derive(Debug)]
pub struct BroadcastStream(Option<Compat<NegotiatedSubstream>>);
impl BroadcastStream {
async fn close_inner(mut io: Compat<NegotiatedSubstream>) -> Result<(), io::Error> {
io.write_all(&[b'D']).await?;
io.flush().await?;
match io.into_inner().close().await {
Ok(_) => Ok(()),
Err(err) if err.kind() == ErrorKind::ConnectionReset => Ok(()), // The other end shut the connection before us
Err(err) => Err(err),
}
}
}
impl AsyncRead for BroadcastStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().0.as_mut().expect("'BroadcastStream' can only be 'None' if this method is called after 'Drop' which ain't happening!")).poll_read(cx, buf)
}
}
impl Drop for BroadcastStream {
fn drop(&mut self) {
// This may be `None` if the user manually called `Self::close`
if let Some(stream) = self.0.take() {
tokio::spawn(async move {
Self::close_inner(stream).await.unwrap();
});
}
}
}
/// A unicast stream is a direct stream to a specific peer.
#[derive(Debug)]
pub struct UnicastStream(Compat<NegotiatedSubstream>);
// TODO: Utils for sending msgpack and stuff over the stream. -> Have a max size of reading buffers so we are less susceptible to DoS attacks.
impl UnicastStream {
pub(crate) fn new(io: NegotiatedSubstream) -> Self {
Self(io.compat())
}
pub(crate) async fn write_discriminator(&mut self) -> io::Result<()> {
// TODO: Timeout if the peer doesn't accept the byte quick enough
self.0.write_all(&[UNICAST_DISCRIMINATOR]).await
}
pub async fn close(self) -> Result<(), io::Error> {
self.0.into_inner().close().await
}
}
impl AsyncRead for UnicastStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -33,17 +130,7 @@ impl AsyncRead for SpaceTimeStream {
}
}
// impl AsyncBufRead for SpaceTimeStream {
// fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
// Pin::new(&mut self.get_mut().0).poll_fill_buf(cx)
// }
// fn consume(self: Pin<&mut Self>, amt: usize) {
// Pin::new(&mut self.get_mut().0).consume(amt)
// }
// }
impl AsyncWrite for SpaceTimeStream {
impl AsyncWrite for UnicastStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,