From 62d0a7e902eb180aef8ca5d2535c0f0e1725c0c2 Mon Sep 17 00:00:00 2001 From: Evan Date: Thu, 27 Nov 2025 14:58:48 +0000 Subject: [PATCH] auto dial --- rust/exo_pyo3_bindings/src/iroh_networking.rs | 37 ++++++++--------- rust/iroh_networking/Cargo.toml | 1 + rust/iroh_networking/src/bin/main.rs | 2 +- rust/iroh_networking/src/lib.rs | 40 +++++++++++++++---- src/exo/routing/router.py | 2 + 5 files changed, 56 insertions(+), 26 deletions(-) diff --git a/rust/exo_pyo3_bindings/src/iroh_networking.rs b/rust/exo_pyo3_bindings/src/iroh_networking.rs index bbd88e3d..4d31f48e 100644 --- a/rust/exo_pyo3_bindings/src/iroh_networking.rs +++ b/rust/exo_pyo3_bindings/src/iroh_networking.rs @@ -13,7 +13,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use std::collections::BTreeSet; use std::net::SocketAddr; use std::pin::{Pin, pin}; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; use tokio::runtime::Runtime; use tokio::sync::Mutex; @@ -53,7 +53,7 @@ impl PyIpAddress { #[gen_stub_pyclass] #[pyclass(name = "RustNetworkingHandle")] pub struct PyNetworkingHandle { - net: Mutex, + net: Arc, } #[gen_stub_pymethods] @@ -62,28 +62,29 @@ impl PyNetworkingHandle { #[staticmethod] pub async fn create(identity: PyKeypair, namespace: String) -> PyResult { let loc: SecretKey = identity.0.clone(); - let net = RUNTIME - .spawn(async move { ExoNet::init_iroh(loc, &namespace).await }) - .await - // todo: pyerr better - .pyerr()? - .pyerr()?; - Ok(Self { - net: Mutex::new(net), - }) + let net = Arc::new( + RUNTIME + .spawn(async move { ExoNet::init_iroh(loc, &namespace).await }) + .await + // todo: pyerr better + .pyerr()? + .pyerr()?, + ); + let cloned = net.clone(); + RUNTIME.spawn(async move { cloned.start_auto_dialer().await }); + + Ok(Self { net }) } - async fn subscribe(&mut self, topic: String) -> PyResult<(PySender, PyReceiver)> { - let mut lock = self.net.lock().await; - let fut = lock.subscribe(&topic); + async fn subscribe(&self, topic: String) -> PyResult<(PySender, PyReceiver)> { + let fut = self.net.subscribe(&topic); let (send, recv) = pin!(fut).allow_threads_py().await.pyerr()?; Ok((PySender { inner: send }, PyReceiver { inner: recv })) } - async fn get_connection_receiver(&mut self) -> PyResult { - let mut lock = self.net.lock().await; - let fut = lock.connection_info(); - let stream = fut.await; + async fn get_connection_receiver(&self) -> PyResult { + let fut = self.net.connection_info(); + let stream = pin!(fut).allow_threads_py().await; Ok(PyConnectionReceiver { inner: Mutex::new(Box::pin(stream)), }) diff --git a/rust/iroh_networking/Cargo.toml b/rust/iroh_networking/Cargo.toml index a2234949..73c7993e 100644 --- a/rust/iroh_networking/Cargo.toml +++ b/rust/iroh_networking/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true blake3 = { workspace = true, features = ["neon", "rayon"] } iroh = { workspace = true, features = ["discovery-local-network"] } iroh-gossip = { workspace = true } +log.workspace = true n0-error = { workspace = true } n0-future = { workspace = true } rand = { workspace = true } diff --git a/rust/iroh_networking/src/bin/main.rs b/rust/iroh_networking/src/bin/main.rs index 6001a099..f6877c9b 100644 --- a/rust/iroh_networking/src/bin/main.rs +++ b/rust/iroh_networking/src/bin/main.rs @@ -9,7 +9,7 @@ async fn main() { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .init(); - let mut net = ExoNet::init_iroh(SecretKey::generate(&mut rand::rng()), "") + let net = ExoNet::init_iroh(SecretKey::generate(&mut rand::rng()), "") .await .unwrap(); diff --git a/rust/iroh_networking/src/lib.rs b/rust/iroh_networking/src/lib.rs index 3104ca88..e7a9ec03 100644 --- a/rust/iroh_networking/src/lib.rs +++ b/rust/iroh_networking/src/lib.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use iroh::{ Endpoint, SecretKey, discovery::{ @@ -5,6 +7,7 @@ use iroh::{ mdns::{DiscoveryEvent, MdnsDiscovery}, }, endpoint::BindError, + endpoint_info::EndpointIdExt, protocol::Router, }; use iroh_gossip::{ @@ -13,7 +16,7 @@ use iroh_gossip::{ }; use n0_error::stack_error; -use n0_future::Stream; +use n0_future::{Stream, StreamExt}; #[stack_error(derive, add_meta, from_sources)] pub enum Error { @@ -28,6 +31,7 @@ pub enum Error { #[derive(Debug)] pub struct ExoNet { + alpn: String, router: Router, gossip: Gossip, mdns: MdnsDiscovery, @@ -47,20 +51,42 @@ impl ExoNet { .accept(&alpn, gossip.clone()) .spawn(); Ok(Self { + alpn, router, gossip, mdns, }) } - pub async fn connection_info(&mut self) -> impl Stream + Unpin + use<> { + pub async fn start_auto_dialer(&self) { + let mut dialed = BTreeSet::new(); + let mut recv = self.connection_info().await; + while let Some(item) = recv.next().await { + match item { + DiscoveryEvent::Discovered { endpoint_info, .. } => { + if !dialed.contains(&endpoint_info.endpoint_id) { + log::info!("Dialing new peer {}", endpoint_info.endpoint_id.to_z32()); + let _ = self + .router + .endpoint() + .connect(endpoint_info, self.alpn.as_bytes()) + .await; + } else { + dialed.insert(endpoint_info.endpoint_id); + } + } + DiscoveryEvent::Expired { endpoint_id } => { + dialed.remove(&endpoint_id); + } + } + } + } + + pub async fn connection_info(&self) -> impl Stream + Unpin + use<> { self.mdns.subscribe().await } - pub async fn subscribe( - &mut self, - topic: &str, - ) -> Result<(GossipSender, GossipReceiver), Error> { + pub async fn subscribe(&self, topic: &str) -> Result<(GossipSender, GossipReceiver), Error> { Ok(self .gossip .subscribe(str_to_topic_id(topic), vec![]) @@ -68,7 +94,7 @@ impl ExoNet { .split()) } - pub async fn shutdown(&mut self) { + pub async fn shutdown(&self) { self.router .shutdown() .await diff --git a/src/exo/routing/router.py b/src/exo/routing/router.py index 17e239f9..c36bed26 100644 --- a/src/exo/routing/router.py +++ b/src/exo/routing/router.py @@ -167,9 +167,11 @@ class Router: recv = await self._net.get_connection_receiver() while True: message = await recv.receive() + logger.warning(f"yo!, {message}!") logger.trace( f"Received message on connection_messages with payload {message}" ) + if CONNECTION_MESSAGES.topic in self.topic_routers: router = self.topic_routers[CONNECTION_MESSAGES.topic] assert router.topic.model_type == ConnectionMessage