Compare commits

...

1 Commits

Author SHA1 Message Date
Evan
3029dc6653 maybe maybe 2026-01-27 10:09:42 +00:00
7 changed files with 346 additions and 0 deletions

24
Cargo.lock generated
View File

@@ -514,6 +514,20 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cluster_membership"
version = "0.0.1"
dependencies = [
"anyhow",
"async-trait",
"futures-lite",
"futures-timer",
"libp2p",
"log",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "colorchoice"
version = "1.0.4"
@@ -998,6 +1012,7 @@ dependencies = [
name = "exo_pyo3_bindings"
version = "0.0.1"
dependencies = [
"cluster_membership",
"delegate",
"derive_more",
"env_logger",
@@ -1030,6 +1045,12 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "ff"
version = "0.13.1"
@@ -1138,7 +1159,10 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]

View File

@@ -4,6 +4,7 @@ members = [
"rust/networking",
"rust/exo_pyo3_bindings",
"rust/util",
"rust/cluster_membership",
]
[workspace.package]
@@ -25,6 +26,7 @@ opt-level = 3
## Crate members as common dependencies
networking = { path = "rust/networking" }
util = { path = "rust/util" }
cluster_membership = { path = "rust/cluster_membership" }
# Proc-macro authoring tools
syn = "2.0"
@@ -62,6 +64,7 @@ frunk-enum-core = "0.3"
# Async dependencies
tokio = "1.46"
futures = "0.3"
futures-lite = "2.6.1"
futures-util = "0.3"
futures-timer = "3.0"

View File

@@ -0,0 +1,23 @@
[package]
name = "cluster_membership"
version.workspace = true
edition.workspace = true
publish = false
[dependencies]
# util
anyhow.workspace = true
log.workspace = true
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
# async
tokio = { workspace = true, features = ["full"] }
futures-timer = { workspace = true }
futures-lite = "2.6.1"
# networking
libp2p = { workspace = true, features = ["full"] }
async-trait = "0.1.89"
[lints]
workspace = true

View File

@@ -0,0 +1,30 @@
use cluster_membership::Peer;
use libp2p::identity::ed25519::SecretKey;
use tokio::io::{self, AsyncBufReadExt};
use tracing_subscriber::{EnvFilter, filter::LevelFilter};
#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
.try_init();
let (mut peer, send, mut recv) =
Peer::new(SecretKey::generate(), "hello".to_string()).expect("peer should always build");
let ch = peer.subscribe("chatroom".to_string());
let jh = tokio::spawn(async move { peer.run().await });
let mut stdin = io::BufReader::new(io::stdin()).lines();
loop {
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {send.send((ch.clone(), line.into_bytes())).await.expect("example");}
Some(r) = recv.recv() => match r {
Ok((_, id, line)) => println!("{:?}:{:?}", id, String::from_utf8_lossy(&line)),
Err(e) => eprintln!("{e:?}"),
},
else => break
}
}
jh.await.expect("task failure");
}

View File

@@ -0,0 +1,227 @@
use libp2p::{
Multiaddr, PeerId, Swarm, SwarmBuilder,
futures::StreamExt,
gossipsub::{self, PublishError, Sha256Topic, TopicHash},
identify,
identity::{Keypair, ed25519},
mdns,
swarm::{NetworkBehaviour, SwarmEvent, dial_opts::DialOpts},
};
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use tokio::{select, sync::mpsc};
const DEFAULT_BUFFER_SIZE: usize = 10;
const MDNS_IGNORE_DURATION_SECS: u64 = 30;
impl Peer {
pub fn new(
identity: ed25519::SecretKey,
namespace: String,
) -> anyhow::Result<(
Self,
mpsc::Sender<(TopicHash, Vec<u8>)>,
mpsc::Receiver<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
)> {
let mut id_bytes = identity.as_ref().to_vec();
let mut swarm =
SwarmBuilder::with_existing_identity(Keypair::ed25519_from_bytes(&mut id_bytes)?)
.with_tokio()
.with_quic()
// TODO(evan): .with_bandwidth_metrics();
.with_behaviour(|kp| Behaviour::new(kp, namespace.clone()))?
.build();
swarm.listen_on("/ip6/::/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
let (to_swarm, from_client) = mpsc::channel(DEFAULT_BUFFER_SIZE);
let (to_client, from_swarm) = mpsc::channel(DEFAULT_BUFFER_SIZE);
Ok((
Self {
swarm,
namespace,
denied: HashMap::new(),
from_client,
to_client,
},
to_swarm,
from_swarm,
))
}
pub fn subscribe(&mut self, topic: String) -> TopicHash {
let topic = Sha256Topic::new(topic);
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&topic)
.expect("topic filtered");
topic.hash()
}
pub async fn run(&mut self) {
loop {
select! {
ev = self.swarm.select_next_some() => {
let Ok(()) = self.handle_swarm_event(ev).await else {
return
};
},
Some(msg) = self.from_client.recv() => {
if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(msg.0, msg.1) {
let Ok(()) = self.to_client.send(Err(e)).await else {
return
};
}
},
}
}
}
async fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent>) -> Result<(), ()> {
let SwarmEvent::Behaviour(event) = event else {
if let SwarmEvent::NewListenAddr {
listener_id: _,
address,
} = event
{
log::info!("new listen address {address}")
}
return Ok(());
};
match event {
BehaviourEvent::Mdns(mdns_event) => match mdns_event {
mdns::Event::Discovered(vec) => {
// Dial everyone
let mut addrs = HashMap::<PeerId, Vec<Multiaddr>>::new();
vec.into_iter()
.filter(|(peer_id, _)| {
self.denied.get(peer_id).is_none_or(|t| {
t.elapsed() > Duration::from_secs(MDNS_IGNORE_DURATION_SECS)
})
})
.for_each(|(peer_id, addr)| addrs.entry(peer_id).or_default().push(addr));
addrs.into_iter().for_each(|(peer_id, addrs)| {
let _ = self
.swarm
.dial(DialOpts::peer_id(peer_id).addresses(addrs).build());
});
}
mdns::Event::Expired(vec) => {
vec.iter().for_each(|(peer_id, _)| {
log::debug!("{peer_id} no longer reachable on mDNS");
self.swarm
.behaviour_mut()
.gossipsub
.remove_explicit_peer(peer_id);
});
}
},
BehaviourEvent::Identify(identify::Event::Received {
connection_id: _,
peer_id,
info,
}) => {
if info
.protocols
.iter()
.any(|p| p.as_ref().contains(&self.namespace))
{
self.passed_namespace(peer_id);
} else {
self.failed_namespace(peer_id);
}
}
BehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: _,
message_id: _,
message:
gossipsub::Message {
topic,
data,
source: Some(source_peer),
..
},
}) => {
let Ok(()) = self.to_client.send(Ok((topic, source_peer, data))).await else {
return Err(());
};
}
_ => {}
}
Ok(())
}
fn passed_namespace(&mut self, peer: PeerId) {
log::info!("new peer {peer:?}");
self.denied.remove(&peer);
self.swarm
.behaviour_mut()
.gossipsub
.remove_blacklisted_peer(&peer);
self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer);
}
fn failed_namespace(&mut self, peer: PeerId) {
log::debug!("{peer} failed handshake");
self.denied.insert(peer, Instant::now());
self.swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
// we don't care if disconnect fails
let _ = self.swarm.disconnect_peer_id(peer);
}
}
pub struct Peer {
pub swarm: Swarm<Behaviour>,
denied: HashMap<PeerId, Instant>,
namespace: String,
to_client: mpsc::Sender<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
from_client: mpsc::Receiver<(TopicHash, Vec<u8>)>,
}
#[test]
fn foo() {
fn bar<T: Send>(t: T) {}
let p: Peer = unimplemented!();
bar(p);
}
#[derive(NetworkBehaviour)]
pub struct Behaviour {
mdns: mdns::tokio::Behaviour,
pub gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
}
impl Behaviour {
fn new(kp: &Keypair, namespace: String) -> Self {
let mdns = mdns::tokio::Behaviour::new(Default::default(), kp.public().to_peer_id())
.expect("implementation is infallible");
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(kp.clone()),
gossipsub::ConfigBuilder::default()
.max_transmit_size(1024 * 1024)
.protocol_id_prefix(format!("/exo/gossip/{namespace}/v1"))
.build()
.expect("fixed gossipsub config should always build"),
)
.expect("fixed gossipsub init should always build");
let identify = identify::Behaviour::new(
identify::Config::new_with_signed_peer_record(format!("/exo/identity/v1"), kp)
.with_push_listen_addr_updates(true),
);
Behaviour {
mdns,
gossipsub,
identify,
}
}
}

View File

@@ -22,6 +22,7 @@ doc = false
workspace = true
[dependencies]
cluster_membership.workspace = true
networking = { workspace = true }
# interop

View File

@@ -6,3 +6,41 @@
pub mod ident;
pub mod multiaddr;
use std::sync::Mutex;
use cluster_membership::Peer;
use libp2p::identity::ed25519::Keypair;
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
#[gen_stub_pyclass]
#[pyclass]
#[derive(Clone)]
pub struct PyKeypair(Keypair);
#[gen_stub_pymethods]
#[pymethods]
impl PyKeypair {
#[staticmethod]
fn generate() -> Self {
Self(Keypair::generate())
}
}
#[gen_stub_pyclass]
#[pyclass]
pub struct PyPeer(Mutex<Peer>);
#[gen_stub_pymethods]
#[pymethods]
impl PyPeer {
#[staticmethod]
fn init(kp: PyKeypair, namespace: String) -> PyResult<Self> {
Ok(PyPeer(Mutex::new(
Peer::new(kp.0.secret(), namespace)
.map_err(|e| e.pyerr())?
.0,
)))
}
}