mirror of
https://github.com/exo-explore/exo.git
synced 2026-02-23 17:58:36 -05:00
swarm: replace manual Stream impl with async_stream select loop (#1597)
The Swarm's manual `impl Stream` had a fairness issue: it drained all client commands before polling the inner libp2p swarm, which could theoretically starve network event delivery under heavy command load. Replaced the hand-rolled `poll_next` with `tokio::select!` inside an `async_stream::stream!` generator. This gives fair, randomized polling between the client command channel and the inner swarm. Extracted `on_message` and `filter_swarm_event` as free functions, removed `pin_project` dependency, and changed callers to use `.into_stream()`. Test plan: - CI
This commit is contained in:
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -216,6 +216,28 @@ dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.111",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.89"
|
||||
@@ -2759,6 +2781,7 @@ dependencies = [
|
||||
name = "networking"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"delegate",
|
||||
"either",
|
||||
"extend",
|
||||
|
||||
@@ -34,6 +34,7 @@ delegate = "0.13"
|
||||
keccak-const = "0.2"
|
||||
|
||||
# Async dependencies
|
||||
async-stream = "0.3"
|
||||
tokio = "1.46"
|
||||
futures-lite = "2.6.1"
|
||||
futures-timer = "3.0"
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::r#const::MPSC_CHANNEL_SIZE;
|
||||
@@ -8,9 +9,9 @@ use crate::networking::exception::{
|
||||
PyAllQueuesFullError, PyMessageTooLargeError, PyNoPeersSubscribedToTopicError,
|
||||
};
|
||||
use crate::pyclass;
|
||||
use futures_lite::StreamExt as _;
|
||||
use futures_lite::{Stream, StreamExt as _};
|
||||
use libp2p::gossipsub::PublishError;
|
||||
use networking::swarm::{FromSwarm, Swarm, ToSwarm, create_swarm};
|
||||
use networking::swarm::{FromSwarm, ToSwarm, create_swarm};
|
||||
use pyo3::exceptions::PyRuntimeError;
|
||||
use pyo3::prelude::{PyModule, PyModuleMethods as _};
|
||||
use pyo3::types::PyBytes;
|
||||
@@ -133,7 +134,7 @@ mod exception {
|
||||
struct PyNetworkingHandle {
|
||||
// channels
|
||||
pub to_swarm: mpsc::Sender<ToSwarm>,
|
||||
pub swarm: Arc<Mutex<Swarm>>,
|
||||
pub swarm: Arc<Mutex<Pin<Box<dyn Stream<Item = FromSwarm> + Send>>>>,
|
||||
}
|
||||
|
||||
#[gen_stub_pyclass_complex_enum]
|
||||
@@ -188,7 +189,7 @@ impl PyNetworkingHandle {
|
||||
|
||||
// create networking swarm (within tokio context!! or it crashes)
|
||||
let _guard = pyo3_async_runtimes::tokio::get_runtime().enter();
|
||||
let swarm = { create_swarm(identity, from_client).pyerr()? };
|
||||
let swarm = create_swarm(identity, from_client).pyerr()?.into_stream();
|
||||
|
||||
Ok(Self {
|
||||
swarm: Arc::new(Mutex::new(swarm)),
|
||||
|
||||
@@ -21,9 +21,10 @@ extend = { workspace = true }
|
||||
delegate = { workspace = true }
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
async-stream = { workspace = true }
|
||||
futures-lite = { workspace = true }
|
||||
futures-timer = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
# utility dependencies
|
||||
util = { workspace = true }
|
||||
|
||||
@@ -17,7 +17,8 @@ async fn main() {
|
||||
|
||||
// Configure swarm
|
||||
let mut swarm = swarm::create_swarm(identity::Keypair::generate_ed25519(), from_client)
|
||||
.expect("Swarm creation failed");
|
||||
.expect("Swarm creation failed")
|
||||
.into_stream();
|
||||
|
||||
// Create a Gossipsub topic & subscribe
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
|
||||
use crate::swarm::transport::tcp_transport;
|
||||
use crate::{alias, discovery};
|
||||
pub use behaviour::{Behaviour, BehaviourEvent};
|
||||
use futures_lite::Stream;
|
||||
use futures_lite::{Stream, StreamExt};
|
||||
use libp2p::{PeerId, SwarmBuilder, gossipsub, identity, swarm::SwarmEvent};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
@@ -46,111 +45,101 @@ pub enum FromSwarm {
|
||||
peer_id: PeerId,
|
||||
},
|
||||
}
|
||||
#[pin_project::pin_project]
|
||||
|
||||
pub struct Swarm {
|
||||
#[pin]
|
||||
inner: libp2p::Swarm<Behaviour>,
|
||||
swarm: libp2p::Swarm<Behaviour>,
|
||||
from_client: mpsc::Receiver<ToSwarm>,
|
||||
}
|
||||
|
||||
impl Swarm {
|
||||
fn on_message(mut self: Pin<&mut Self>, message: ToSwarm) {
|
||||
match message {
|
||||
ToSwarm::Subscribe {
|
||||
topic,
|
||||
result_sender,
|
||||
} => {
|
||||
// try to subscribe
|
||||
let result = self
|
||||
.inner
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.subscribe(&gossipsub::IdentTopic::new(topic));
|
||||
|
||||
// send response oneshot
|
||||
_ = result_sender.send(result)
|
||||
}
|
||||
ToSwarm::Unsubscribe {
|
||||
topic,
|
||||
result_sender,
|
||||
} => {
|
||||
// try to unsubscribe from the topic
|
||||
let result = self
|
||||
.inner
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.unsubscribe(&gossipsub::IdentTopic::new(topic));
|
||||
|
||||
// send response oneshot (or exit if connection closed)
|
||||
_ = result_sender.send(result)
|
||||
}
|
||||
ToSwarm::Publish {
|
||||
topic,
|
||||
data,
|
||||
result_sender,
|
||||
} => {
|
||||
// try to publish the data -> catch NoPeersSubscribedToTopic error & convert to correct exception
|
||||
let result = self
|
||||
.inner
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.publish(gossipsub::IdentTopic::new(topic), data);
|
||||
// send response oneshot (or exit if connection closed)
|
||||
_ = result_sender.send(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Stream for Swarm {
|
||||
type Item = FromSwarm;
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
let recv = self.as_mut().project().from_client;
|
||||
match recv.poll_recv(cx) {
|
||||
Poll::Ready(Some(msg)) => {
|
||||
self.as_mut().on_message(msg);
|
||||
// continue to re-poll after consumption
|
||||
continue;
|
||||
pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = FromSwarm> + Send>> {
|
||||
let Swarm {
|
||||
mut swarm,
|
||||
mut from_client,
|
||||
} = self;
|
||||
let stream = async_stream::stream! {
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = from_client.recv() => {
|
||||
let Some(msg) = msg else { break };
|
||||
on_message(&mut swarm, msg);
|
||||
}
|
||||
event = swarm.next() => {
|
||||
let Some(event) = event else { break };
|
||||
if let Some(item) = filter_swarm_event(event) {
|
||||
yield item;
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => return Poll::Ready(None),
|
||||
Poll::Pending => {}
|
||||
}
|
||||
let inner = self.as_mut().project().inner;
|
||||
return match inner.poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(Some(swarm_event)) => match swarm_event {
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(
|
||||
gossipsub::Event::Message {
|
||||
message:
|
||||
gossipsub::Message {
|
||||
source: Some(peer_id),
|
||||
topic,
|
||||
data,
|
||||
..
|
||||
},
|
||||
..
|
||||
},
|
||||
)) => Poll::Ready(Some(FromSwarm::Message {
|
||||
from: peer_id,
|
||||
topic: topic.into_string(),
|
||||
data,
|
||||
})),
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Discovery(
|
||||
discovery::Event::ConnectionEstablished { peer_id, .. },
|
||||
)) => Poll::Ready(Some(FromSwarm::Discovered { peer_id })),
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Discovery(
|
||||
discovery::Event::ConnectionClosed { peer_id, .. },
|
||||
)) => Poll::Ready(Some(FromSwarm::Expired { peer_id })),
|
||||
// continue to re-poll after consumption
|
||||
_ => continue,
|
||||
},
|
||||
};
|
||||
};
|
||||
Box::pin(stream)
|
||||
}
|
||||
}
|
||||
|
||||
fn on_message(swarm: &mut libp2p::Swarm<Behaviour>, message: ToSwarm) {
|
||||
match message {
|
||||
ToSwarm::Subscribe {
|
||||
topic,
|
||||
result_sender,
|
||||
} => {
|
||||
let result = swarm
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.subscribe(&gossipsub::IdentTopic::new(topic));
|
||||
_ = result_sender.send(result);
|
||||
}
|
||||
ToSwarm::Unsubscribe {
|
||||
topic,
|
||||
result_sender,
|
||||
} => {
|
||||
let result = swarm
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.unsubscribe(&gossipsub::IdentTopic::new(topic));
|
||||
_ = result_sender.send(result);
|
||||
}
|
||||
ToSwarm::Publish {
|
||||
topic,
|
||||
data,
|
||||
result_sender,
|
||||
} => {
|
||||
let result = swarm
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.publish(gossipsub::IdentTopic::new(topic), data);
|
||||
_ = result_sender.send(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn filter_swarm_event(event: SwarmEvent<BehaviourEvent>) -> Option<FromSwarm> {
|
||||
match event {
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message {
|
||||
message:
|
||||
gossipsub::Message {
|
||||
source: Some(peer_id),
|
||||
topic,
|
||||
data,
|
||||
..
|
||||
},
|
||||
..
|
||||
})) => Some(FromSwarm::Message {
|
||||
from: peer_id,
|
||||
topic: topic.into_string(),
|
||||
data,
|
||||
}),
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Discovery(
|
||||
discovery::Event::ConnectionEstablished { peer_id, .. },
|
||||
)) => Some(FromSwarm::Discovered { peer_id }),
|
||||
SwarmEvent::Behaviour(BehaviourEvent::Discovery(discovery::Event::ConnectionClosed {
|
||||
peer_id,
|
||||
..
|
||||
})) => Some(FromSwarm::Expired { peer_id }),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create and configure a swarm which listens to all ports on OS
|
||||
pub fn create_swarm(
|
||||
keypair: identity::Keypair,
|
||||
@@ -164,10 +153,7 @@ pub fn create_swarm(
|
||||
|
||||
// Listen on all interfaces and whatever port the OS assigns
|
||||
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
|
||||
Ok(Swarm {
|
||||
inner: swarm,
|
||||
from_client,
|
||||
})
|
||||
Ok(Swarm { swarm, from_client })
|
||||
}
|
||||
|
||||
mod transport {
|
||||
|
||||
Reference in New Issue
Block a user