mirror of
https://github.com/exo-explore/exo.git
synced 2025-12-23 22:27:50 -05:00
auto dial
This commit is contained in:
@@ -13,7 +13,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
|
||||
use std::collections::BTreeSet;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::{Pin, pin};
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@@ -53,7 +53,7 @@ impl PyIpAddress {
|
||||
#[gen_stub_pyclass]
|
||||
#[pyclass(name = "RustNetworkingHandle")]
|
||||
pub struct PyNetworkingHandle {
|
||||
net: Mutex<ExoNet>,
|
||||
net: Arc<ExoNet>,
|
||||
}
|
||||
|
||||
#[gen_stub_pymethods]
|
||||
@@ -62,28 +62,29 @@ impl PyNetworkingHandle {
|
||||
#[staticmethod]
|
||||
pub async fn create(identity: PyKeypair, namespace: String) -> PyResult<Self> {
|
||||
let loc: SecretKey = identity.0.clone();
|
||||
let net = RUNTIME
|
||||
.spawn(async move { ExoNet::init_iroh(loc, &namespace).await })
|
||||
.await
|
||||
// todo: pyerr better
|
||||
.pyerr()?
|
||||
.pyerr()?;
|
||||
Ok(Self {
|
||||
net: Mutex::new(net),
|
||||
})
|
||||
let net = Arc::new(
|
||||
RUNTIME
|
||||
.spawn(async move { ExoNet::init_iroh(loc, &namespace).await })
|
||||
.await
|
||||
// todo: pyerr better
|
||||
.pyerr()?
|
||||
.pyerr()?,
|
||||
);
|
||||
let cloned = net.clone();
|
||||
RUNTIME.spawn(async move { cloned.start_auto_dialer().await });
|
||||
|
||||
Ok(Self { net })
|
||||
}
|
||||
|
||||
async fn subscribe(&mut self, topic: String) -> PyResult<(PySender, PyReceiver)> {
|
||||
let mut lock = self.net.lock().await;
|
||||
let fut = lock.subscribe(&topic);
|
||||
async fn subscribe(&self, topic: String) -> PyResult<(PySender, PyReceiver)> {
|
||||
let fut = self.net.subscribe(&topic);
|
||||
let (send, recv) = pin!(fut).allow_threads_py().await.pyerr()?;
|
||||
Ok((PySender { inner: send }, PyReceiver { inner: recv }))
|
||||
}
|
||||
|
||||
async fn get_connection_receiver(&mut self) -> PyResult<PyConnectionReceiver> {
|
||||
let mut lock = self.net.lock().await;
|
||||
let fut = lock.connection_info();
|
||||
let stream = fut.await;
|
||||
async fn get_connection_receiver(&self) -> PyResult<PyConnectionReceiver> {
|
||||
let fut = self.net.connection_info();
|
||||
let stream = pin!(fut).allow_threads_py().await;
|
||||
Ok(PyConnectionReceiver {
|
||||
inner: Mutex::new(Box::pin(stream)),
|
||||
})
|
||||
|
||||
@@ -7,6 +7,7 @@ edition.workspace = true
|
||||
blake3 = { workspace = true, features = ["neon", "rayon"] }
|
||||
iroh = { workspace = true, features = ["discovery-local-network"] }
|
||||
iroh-gossip = { workspace = true }
|
||||
log.workspace = true
|
||||
n0-error = { workspace = true }
|
||||
n0-future = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
|
||||
@@ -9,7 +9,7 @@ async fn main() {
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let mut net = ExoNet::init_iroh(SecretKey::generate(&mut rand::rng()), "")
|
||||
let net = ExoNet::init_iroh(SecretKey::generate(&mut rand::rng()), "")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use iroh::{
|
||||
Endpoint, SecretKey,
|
||||
discovery::{
|
||||
@@ -5,6 +7,7 @@ use iroh::{
|
||||
mdns::{DiscoveryEvent, MdnsDiscovery},
|
||||
},
|
||||
endpoint::BindError,
|
||||
endpoint_info::EndpointIdExt,
|
||||
protocol::Router,
|
||||
};
|
||||
use iroh_gossip::{
|
||||
@@ -13,7 +16,7 @@ use iroh_gossip::{
|
||||
};
|
||||
|
||||
use n0_error::stack_error;
|
||||
use n0_future::Stream;
|
||||
use n0_future::{Stream, StreamExt};
|
||||
|
||||
#[stack_error(derive, add_meta, from_sources)]
|
||||
pub enum Error {
|
||||
@@ -28,6 +31,7 @@ pub enum Error {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ExoNet {
|
||||
alpn: String,
|
||||
router: Router,
|
||||
gossip: Gossip,
|
||||
mdns: MdnsDiscovery,
|
||||
@@ -47,20 +51,42 @@ impl ExoNet {
|
||||
.accept(&alpn, gossip.clone())
|
||||
.spawn();
|
||||
Ok(Self {
|
||||
alpn,
|
||||
router,
|
||||
gossip,
|
||||
mdns,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn connection_info(&mut self) -> impl Stream<Item = DiscoveryEvent> + Unpin + use<> {
|
||||
pub async fn start_auto_dialer(&self) {
|
||||
let mut dialed = BTreeSet::new();
|
||||
let mut recv = self.connection_info().await;
|
||||
while let Some(item) = recv.next().await {
|
||||
match item {
|
||||
DiscoveryEvent::Discovered { endpoint_info, .. } => {
|
||||
if !dialed.contains(&endpoint_info.endpoint_id) {
|
||||
log::info!("Dialing new peer {}", endpoint_info.endpoint_id.to_z32());
|
||||
let _ = self
|
||||
.router
|
||||
.endpoint()
|
||||
.connect(endpoint_info, self.alpn.as_bytes())
|
||||
.await;
|
||||
} else {
|
||||
dialed.insert(endpoint_info.endpoint_id);
|
||||
}
|
||||
}
|
||||
DiscoveryEvent::Expired { endpoint_id } => {
|
||||
dialed.remove(&endpoint_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connection_info(&self) -> impl Stream<Item = DiscoveryEvent> + Unpin + use<> {
|
||||
self.mdns.subscribe().await
|
||||
}
|
||||
|
||||
pub async fn subscribe(
|
||||
&mut self,
|
||||
topic: &str,
|
||||
) -> Result<(GossipSender, GossipReceiver), Error> {
|
||||
pub async fn subscribe(&self, topic: &str) -> Result<(GossipSender, GossipReceiver), Error> {
|
||||
Ok(self
|
||||
.gossip
|
||||
.subscribe(str_to_topic_id(topic), vec![])
|
||||
@@ -68,7 +94,7 @@ impl ExoNet {
|
||||
.split())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&mut self) {
|
||||
pub async fn shutdown(&self) {
|
||||
self.router
|
||||
.shutdown()
|
||||
.await
|
||||
|
||||
@@ -167,9 +167,11 @@ class Router:
|
||||
recv = await self._net.get_connection_receiver()
|
||||
while True:
|
||||
message = await recv.receive()
|
||||
logger.warning(f"yo!, {message}!")
|
||||
logger.trace(
|
||||
f"Received message on connection_messages with payload {message}"
|
||||
)
|
||||
|
||||
if CONNECTION_MESSAGES.topic in self.topic_routers:
|
||||
router = self.topic_routers[CONNECTION_MESSAGES.topic]
|
||||
assert router.topic.model_type == ConnectionMessage
|
||||
|
||||
Reference in New Issue
Block a user