From dfd6fe7816e7af5d97aab7b21078656ed40a2aa2 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Mon, 23 Feb 2026 17:48:55 +0000 Subject: [PATCH] swarm: replace manual Stream impl with async_stream select loop (#1597) The Swarm's manual `impl Stream` had a fairness issue: it drained all client commands before polling the inner libp2p swarm, which could theoretically starve network event delivery under heavy command load. Replaced the hand-rolled `poll_next` with `tokio::select!` inside an `async_stream::stream!` generator. This gives fair, randomized polling between the client command channel and the inner swarm. Extracted `on_message` and `filter_swarm_event` as free functions, removed `pin_project` dependency, and changed callers to use `.into_stream()`. Test plan: - CI --- Cargo.lock | 23 +++ Cargo.toml | 1 + rust/exo_pyo3_bindings/src/networking.rs | 9 +- rust/networking/Cargo.toml | 3 +- rust/networking/examples/chatroom.rs | 3 +- rust/networking/src/swarm.rs | 190 +++++++++++------------ 6 files changed, 121 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b3e9e24..96819c82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -2759,6 +2781,7 @@ dependencies = [ name = "networking" version = "0.0.1" dependencies = [ + "async-stream", "delegate", "either", "extend", diff --git a/Cargo.toml b/Cargo.toml index ffa9022c..16be6eef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ delegate = "0.13" keccak-const = "0.2" # Async dependencies +async-stream = "0.3" tokio = "1.46" futures-lite = "2.6.1" futures-timer = "3.0" diff --git a/rust/exo_pyo3_bindings/src/networking.rs b/rust/exo_pyo3_bindings/src/networking.rs index 96349c89..0e4f1063 100644 --- a/rust/exo_pyo3_bindings/src/networking.rs +++ b/rust/exo_pyo3_bindings/src/networking.rs @@ -1,3 +1,4 @@ +use std::pin::Pin; use std::sync::Arc; use crate::r#const::MPSC_CHANNEL_SIZE; @@ -8,9 +9,9 @@ use crate::networking::exception::{ PyAllQueuesFullError, PyMessageTooLargeError, PyNoPeersSubscribedToTopicError, }; use crate::pyclass; -use futures_lite::StreamExt as _; +use futures_lite::{Stream, StreamExt as _}; use libp2p::gossipsub::PublishError; -use networking::swarm::{FromSwarm, Swarm, ToSwarm, create_swarm}; +use networking::swarm::{FromSwarm, ToSwarm, create_swarm}; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::{PyModule, PyModuleMethods as _}; use pyo3::types::PyBytes; @@ -133,7 +134,7 @@ mod exception { struct PyNetworkingHandle { // channels pub to_swarm: mpsc::Sender, - pub swarm: Arc>, + pub swarm: Arc + Send>>>>, } #[gen_stub_pyclass_complex_enum] @@ -188,7 +189,7 @@ impl PyNetworkingHandle { // create networking swarm (within tokio context!! or it crashes) let _guard = pyo3_async_runtimes::tokio::get_runtime().enter(); - let swarm = { create_swarm(identity, from_client).pyerr()? }; + let swarm = create_swarm(identity, from_client).pyerr()?.into_stream(); Ok(Self { swarm: Arc::new(Mutex::new(swarm)), diff --git a/rust/networking/Cargo.toml b/rust/networking/Cargo.toml index cbe30f40..80d6dbf5 100644 --- a/rust/networking/Cargo.toml +++ b/rust/networking/Cargo.toml @@ -21,9 +21,10 @@ extend = { workspace = true } delegate = { workspace = true } # async -tokio = { workspace = true, features = ["full"] } +async-stream = { workspace = true } futures-lite = { workspace = true } futures-timer = { workspace = true } +tokio = { workspace = true, features = ["full"] } # utility dependencies util = { workspace = true } diff --git a/rust/networking/examples/chatroom.rs b/rust/networking/examples/chatroom.rs index 22beae1f..cd25c99a 100644 --- a/rust/networking/examples/chatroom.rs +++ b/rust/networking/examples/chatroom.rs @@ -17,7 +17,8 @@ async fn main() { // Configure swarm let mut swarm = swarm::create_swarm(identity::Keypair::generate_ed25519(), from_client) - .expect("Swarm creation failed"); + .expect("Swarm creation failed") + .into_stream(); // Create a Gossipsub topic & subscribe let (tx, rx) = oneshot::channel(); diff --git a/rust/networking/src/swarm.rs b/rust/networking/src/swarm.rs index 73d0b47d..511c919b 100644 --- a/rust/networking/src/swarm.rs +++ b/rust/networking/src/swarm.rs @@ -1,10 +1,9 @@ use std::pin::Pin; -use std::task::Poll; use crate::swarm::transport::tcp_transport; use crate::{alias, discovery}; pub use behaviour::{Behaviour, BehaviourEvent}; -use futures_lite::Stream; +use futures_lite::{Stream, StreamExt}; use libp2p::{PeerId, SwarmBuilder, gossipsub, identity, swarm::SwarmEvent}; use tokio::sync::{mpsc, oneshot}; @@ -46,111 +45,101 @@ pub enum FromSwarm { peer_id: PeerId, }, } -#[pin_project::pin_project] + pub struct Swarm { - #[pin] - inner: libp2p::Swarm, + swarm: libp2p::Swarm, from_client: mpsc::Receiver, } + impl Swarm { - fn on_message(mut self: Pin<&mut Self>, message: ToSwarm) { - match message { - ToSwarm::Subscribe { - topic, - result_sender, - } => { - // try to subscribe - let result = self - .inner - .behaviour_mut() - .gossipsub - .subscribe(&gossipsub::IdentTopic::new(topic)); - - // send response oneshot - _ = result_sender.send(result) - } - ToSwarm::Unsubscribe { - topic, - result_sender, - } => { - // try to unsubscribe from the topic - let result = self - .inner - .behaviour_mut() - .gossipsub - .unsubscribe(&gossipsub::IdentTopic::new(topic)); - - // send response oneshot (or exit if connection closed) - _ = result_sender.send(result) - } - ToSwarm::Publish { - topic, - data, - result_sender, - } => { - // try to publish the data -> catch NoPeersSubscribedToTopic error & convert to correct exception - let result = self - .inner - .behaviour_mut() - .gossipsub - .publish(gossipsub::IdentTopic::new(topic), data); - // send response oneshot (or exit if connection closed) - _ = result_sender.send(result) - } - } - } -} -impl Stream for Swarm { - type Item = FromSwarm; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - let recv = self.as_mut().project().from_client; - match recv.poll_recv(cx) { - Poll::Ready(Some(msg)) => { - self.as_mut().on_message(msg); - // continue to re-poll after consumption - continue; + pub fn into_stream(self) -> Pin + Send>> { + let Swarm { + mut swarm, + mut from_client, + } = self; + let stream = async_stream::stream! { + loop { + tokio::select! { + msg = from_client.recv() => { + let Some(msg) = msg else { break }; + on_message(&mut swarm, msg); + } + event = swarm.next() => { + let Some(event) = event else { break }; + if let Some(item) = filter_swarm_event(event) { + yield item; + } + } } - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => {} } - let inner = self.as_mut().project().inner; - return match inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(swarm_event)) => match swarm_event { - SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( - gossipsub::Event::Message { - message: - gossipsub::Message { - source: Some(peer_id), - topic, - data, - .. - }, - .. - }, - )) => Poll::Ready(Some(FromSwarm::Message { - from: peer_id, - topic: topic.into_string(), - data, - })), - SwarmEvent::Behaviour(BehaviourEvent::Discovery( - discovery::Event::ConnectionEstablished { peer_id, .. }, - )) => Poll::Ready(Some(FromSwarm::Discovered { peer_id })), - SwarmEvent::Behaviour(BehaviourEvent::Discovery( - discovery::Event::ConnectionClosed { peer_id, .. }, - )) => Poll::Ready(Some(FromSwarm::Expired { peer_id })), - // continue to re-poll after consumption - _ => continue, - }, - }; + }; + Box::pin(stream) + } +} + +fn on_message(swarm: &mut libp2p::Swarm, message: ToSwarm) { + match message { + ToSwarm::Subscribe { + topic, + result_sender, + } => { + let result = swarm + .behaviour_mut() + .gossipsub + .subscribe(&gossipsub::IdentTopic::new(topic)); + _ = result_sender.send(result); + } + ToSwarm::Unsubscribe { + topic, + result_sender, + } => { + let result = swarm + .behaviour_mut() + .gossipsub + .unsubscribe(&gossipsub::IdentTopic::new(topic)); + _ = result_sender.send(result); + } + ToSwarm::Publish { + topic, + data, + result_sender, + } => { + let result = swarm + .behaviour_mut() + .gossipsub + .publish(gossipsub::IdentTopic::new(topic), data); + _ = result_sender.send(result); } } } + +fn filter_swarm_event(event: SwarmEvent) -> Option { + match event { + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message { + message: + gossipsub::Message { + source: Some(peer_id), + topic, + data, + .. + }, + .. + })) => Some(FromSwarm::Message { + from: peer_id, + topic: topic.into_string(), + data, + }), + SwarmEvent::Behaviour(BehaviourEvent::Discovery( + discovery::Event::ConnectionEstablished { peer_id, .. }, + )) => Some(FromSwarm::Discovered { peer_id }), + SwarmEvent::Behaviour(BehaviourEvent::Discovery(discovery::Event::ConnectionClosed { + peer_id, + .. + })) => Some(FromSwarm::Expired { peer_id }), + _ => None, + } +} + /// Create and configure a swarm which listens to all ports on OS pub fn create_swarm( keypair: identity::Keypair, @@ -164,10 +153,7 @@ pub fn create_swarm( // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - Ok(Swarm { - inner: swarm, - from_client, - }) + Ok(Swarm { swarm, from_client }) } mod transport {