mirror of
https://github.com/exo-explore/exo.git
synced 2025-12-23 22:27:50 -05:00
clippy lints
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
//! SEE: https://pyo3.rs/v0.27.1/async-await.html#detaching-from-the-interpreter-across-await
|
||||
//! SEE: <https://pyo3.rs/v0.27.1/async-await.html#detaching-from-the-interpreter-across-await>
|
||||
|
||||
use pyo3::prelude::*;
|
||||
use std::{
|
||||
@@ -8,13 +8,13 @@ use std::{
|
||||
};
|
||||
|
||||
#[repr(transparent)]
|
||||
pub(crate) struct AllowThreads<F>(F);
|
||||
pub struct AllowThreads<F>(F);
|
||||
|
||||
impl<F> AllowThreads<F>
|
||||
where
|
||||
Self: Future,
|
||||
{
|
||||
pub fn new(f: F) -> Self {
|
||||
pub const fn new(f: F) -> Self {
|
||||
Self(f)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use iroh::{EndpointId, SecretKey, endpoint_info::EndpointIdExt};
|
||||
use iroh::{EndpointId, SecretKey, endpoint_info::EndpointIdExt as _};
|
||||
use postcard::ser_flavors::StdVec;
|
||||
|
||||
use crate::ext::ResultExt as _;
|
||||
@@ -23,7 +23,7 @@ impl PyKeypair {
|
||||
}
|
||||
/// Decode a postcard structure into a keypair
|
||||
#[staticmethod]
|
||||
fn from_postcard_encoding(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
|
||||
fn from_postcard_encoding(bytes: &Bound<'_, PyBytes>) -> PyResult<Self> {
|
||||
let bytes = Vec::from(bytes.as_bytes());
|
||||
Ok(Self(postcard::from_bytes(&bytes).pyerr()?))
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use crate::ext::{ByteArrayExt, FutureExt, ResultExt};
|
||||
use crate::ext::{ByteArrayExt as _, FutureExt as _, ResultExt as _};
|
||||
use crate::identity::{PyEndpointId, PyKeypair};
|
||||
use iroh::SecretKey;
|
||||
use iroh::discovery::EndpointInfo;
|
||||
use iroh::discovery::mdns::DiscoveryEvent;
|
||||
use iroh_gossip::api::{ApiError, Event, GossipReceiver, GossipSender, Message};
|
||||
use iroh_networking::ExoNet;
|
||||
use n0_future::{Stream, StreamExt as _};
|
||||
use networking::ExoNet;
|
||||
use pyo3::exceptions::{PyRuntimeError, PyStopAsyncIteration};
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::PyBytes;
|
||||
@@ -17,6 +17,7 @@ use std::sync::{Arc, LazyLock};
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
static RUNTIME: LazyLock<Runtime> =
|
||||
LazyLock::new(|| Runtime::new().expect("Failed to create tokio runtime"));
|
||||
|
||||
@@ -38,11 +39,11 @@ impl PyIpAddress {
|
||||
self.inner.ip().to_string()
|
||||
}
|
||||
|
||||
pub fn port(&self) -> u16 {
|
||||
pub const fn port(&self) -> u16 {
|
||||
self.inner.port()
|
||||
}
|
||||
|
||||
pub fn zone_id(&self) -> Option<u32> {
|
||||
pub const fn zone_id(&self) -> Option<u32> {
|
||||
match self.inner {
|
||||
SocketAddr::V6(ip) => Some(ip.scope_id()),
|
||||
SocketAddr::V4(_) => None,
|
||||
@@ -70,7 +71,7 @@ impl PyNetworkingHandle {
|
||||
.pyerr()?
|
||||
.pyerr()?,
|
||||
);
|
||||
let cloned = net.clone();
|
||||
let cloned = Arc::clone(&net);
|
||||
RUNTIME.spawn(async move { cloned.start_auto_dialer().await });
|
||||
|
||||
Ok(Self { net })
|
||||
@@ -158,34 +159,27 @@ struct PyConnectionReceiver {
|
||||
#[pymethods]
|
||||
impl PyConnectionReceiver {
|
||||
async fn receive(&mut self) -> PyResult<PyConnectionMessage> {
|
||||
loop {
|
||||
let mg_fut = self.inner.lock();
|
||||
let mut lock = pin!(mg_fut).allow_threads_py().await;
|
||||
match lock.next().allow_threads_py().await {
|
||||
// Successful cases
|
||||
Some(DiscoveryEvent::Discovered {
|
||||
endpoint_info: EndpointInfo { endpoint_id, data },
|
||||
..
|
||||
}) => {
|
||||
return Ok(PyConnectionMessage {
|
||||
endpoint_id: endpoint_id.into(),
|
||||
current_transport_addrs: Some(
|
||||
data.ip_addrs()
|
||||
.map(|it| PyIpAddress { inner: it.clone() })
|
||||
.collect(),
|
||||
),
|
||||
});
|
||||
}
|
||||
Some(DiscoveryEvent::Expired { endpoint_id }) => {
|
||||
return Ok(PyConnectionMessage {
|
||||
endpoint_id: endpoint_id.into(),
|
||||
current_transport_addrs: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Failure case
|
||||
None => return Err(PyStopAsyncIteration::new_err("")),
|
||||
}
|
||||
let mg_fut = self.inner.lock();
|
||||
let mut lock = pin!(mg_fut).allow_threads_py().await;
|
||||
match lock.next().allow_threads_py().await {
|
||||
// Successful cases
|
||||
Some(DiscoveryEvent::Discovered {
|
||||
endpoint_info: EndpointInfo { endpoint_id, data },
|
||||
..
|
||||
}) => Ok(PyConnectionMessage {
|
||||
endpoint_id: endpoint_id.into(),
|
||||
current_transport_addrs: Some(
|
||||
data.ip_addrs()
|
||||
.map(|inner| PyIpAddress { inner: *inner })
|
||||
.collect(),
|
||||
),
|
||||
}),
|
||||
Some(DiscoveryEvent::Expired { endpoint_id }) => Ok(PyConnectionMessage {
|
||||
endpoint_id: endpoint_id.into(),
|
||||
current_transport_addrs: None,
|
||||
}),
|
||||
// Failure case
|
||||
None => Err(PyStopAsyncIteration::new_err("")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ pub(crate) mod ext {
|
||||
}
|
||||
|
||||
pub trait FutureExt: Future + Sized {
|
||||
/// SEE: https://pyo3.rs/v0.27.1/async-await.html#detaching-from-the-interpreter-across-await
|
||||
/// SEE: <https://pyo3.rs/v0.27.1/async-await.html#detaching-from-the-interpreter-across-await>
|
||||
fn allow_threads_py(self) -> AllowThreads<Self>
|
||||
where
|
||||
AllowThreads<Self>: Future,
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::collections::BTreeSet;
|
||||
use iroh::{
|
||||
Endpoint, EndpointId, SecretKey, TransportAddr,
|
||||
discovery::{
|
||||
Discovery, EndpointData, IntoDiscoveryError,
|
||||
Discovery as _, EndpointData, IntoDiscoveryError,
|
||||
mdns::{DiscoveryEvent, MdnsDiscovery},
|
||||
},
|
||||
endpoint::BindError,
|
||||
endpoint_info::EndpointIdExt,
|
||||
endpoint_info::EndpointIdExt as _,
|
||||
protocol::Router,
|
||||
};
|
||||
use iroh_gossip::{
|
||||
@@ -16,11 +16,11 @@ use iroh_gossip::{
|
||||
};
|
||||
|
||||
use n0_error::{e, stack_error};
|
||||
use n0_future::{Stream, StreamExt};
|
||||
use n0_future::{Stream, StreamExt as _};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[stack_error(derive, add_meta, from_sources)]
|
||||
pub enum Error {
|
||||
pub enum ExoError {
|
||||
#[error(transparent)]
|
||||
FailedBinding { source: BindError },
|
||||
/// The gossip topic was closed.
|
||||
@@ -42,7 +42,8 @@ pub struct ExoNet {
|
||||
}
|
||||
|
||||
impl ExoNet {
|
||||
pub async fn init_iroh(sk: SecretKey, namespace: &str) -> Result<Self, Error> {
|
||||
#[inline]
|
||||
pub async fn init_iroh(sk: SecretKey, namespace: &str) -> Result<Self, ExoError> {
|
||||
let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
|
||||
.secret_key(sk)
|
||||
.bind()
|
||||
@@ -50,14 +51,12 @@ impl ExoNet {
|
||||
let mdns = MdnsDiscovery::builder().build(endpoint.id())?;
|
||||
let endpoint_addr = endpoint.addr();
|
||||
|
||||
let bound = endpoint_addr
|
||||
.ip_addrs()
|
||||
.map(|it| TransportAddr::Ip(it.clone()));
|
||||
let bound = endpoint_addr.ip_addrs().map(|it| TransportAddr::Ip(*it));
|
||||
|
||||
log::info!("publishing {endpoint_addr:?} with mdns");
|
||||
mdns.publish(&EndpointData::new(bound));
|
||||
endpoint.discovery().add(mdns.clone());
|
||||
let alpn = format!("/exo_discovery_network/{}", namespace).to_owned();
|
||||
let alpn = format!("/exo_discovery_network/{namespace}");
|
||||
// max msg size 4MB
|
||||
let gossip = Gossip::builder()
|
||||
.max_message_size(4 * 1024 * 1024)
|
||||
@@ -75,6 +74,7 @@ impl ExoNet {
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn start_auto_dialer(&self) {
|
||||
let mut recv = self.connection_info().await;
|
||||
|
||||
@@ -117,13 +117,15 @@ impl ExoNet {
|
||||
log::info!("Auto dialer stopping");
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn connection_info(&self) -> impl Stream<Item = DiscoveryEvent> + Unpin + use<> {
|
||||
self.mdns.subscribe().await
|
||||
}
|
||||
|
||||
pub async fn subscribe(&self, topic: &str) -> Result<(GossipSender, GossipReceiver), Error> {
|
||||
#[inline]
|
||||
pub async fn subscribe(&self, topic: &str) -> Result<(GossipSender, GossipReceiver), ExoError> {
|
||||
if self.known_peers.lock().await.is_empty() {
|
||||
return Err(e!(Error::NoPeers));
|
||||
return Err(e!(ExoError::NoPeers));
|
||||
}
|
||||
Ok(self
|
||||
.gossip
|
||||
@@ -135,11 +137,10 @@ impl ExoNet {
|
||||
.split())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(clippy::expect_used)]
|
||||
pub async fn shutdown(&self) {
|
||||
self.router
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("Iroh Router failed to shutdown");
|
||||
self.router.shutdown().await.expect("router panic");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user