mirror of
https://github.com/exo-explore/exo.git
synced 2026-01-27 07:20:14 -05:00
Compare commits
1 Commits
main
...
rust-explo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3029dc6653 |
24
Cargo.lock
generated
24
Cargo.lock
generated
@@ -514,6 +514,20 @@ version = "0.7.6"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cluster_membership"
|
||||||
|
version = "0.0.1"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"futures-lite",
|
||||||
|
"futures-timer",
|
||||||
|
"libp2p",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "colorchoice"
|
name = "colorchoice"
|
||||||
version = "1.0.4"
|
version = "1.0.4"
|
||||||
@@ -998,6 +1012,7 @@ dependencies = [
|
|||||||
name = "exo_pyo3_bindings"
|
name = "exo_pyo3_bindings"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"cluster_membership",
|
||||||
"delegate",
|
"delegate",
|
||||||
"derive_more",
|
"derive_more",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
@@ -1030,6 +1045,12 @@ dependencies = [
|
|||||||
"syn 2.0.111",
|
"syn 2.0.111",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fastrand"
|
||||||
|
version = "2.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ff"
|
name = "ff"
|
||||||
version = "0.13.1"
|
version = "0.13.1"
|
||||||
@@ -1138,7 +1159,10 @@ version = "2.6.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
|
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"fastrand",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"parking",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ members = [
|
|||||||
"rust/networking",
|
"rust/networking",
|
||||||
"rust/exo_pyo3_bindings",
|
"rust/exo_pyo3_bindings",
|
||||||
"rust/util",
|
"rust/util",
|
||||||
|
"rust/cluster_membership",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
@@ -25,6 +26,7 @@ opt-level = 3
|
|||||||
## Crate members as common dependencies
|
## Crate members as common dependencies
|
||||||
networking = { path = "rust/networking" }
|
networking = { path = "rust/networking" }
|
||||||
util = { path = "rust/util" }
|
util = { path = "rust/util" }
|
||||||
|
cluster_membership = { path = "rust/cluster_membership" }
|
||||||
|
|
||||||
# Proc-macro authoring tools
|
# Proc-macro authoring tools
|
||||||
syn = "2.0"
|
syn = "2.0"
|
||||||
@@ -62,6 +64,7 @@ frunk-enum-core = "0.3"
|
|||||||
# Async dependencies
|
# Async dependencies
|
||||||
tokio = "1.46"
|
tokio = "1.46"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
futures-lite = "2.6.1"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
futures-timer = "3.0"
|
futures-timer = "3.0"
|
||||||
|
|
||||||
|
|||||||
23
rust/cluster_membership/Cargo.toml
Normal file
23
rust/cluster_membership/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
[package]
|
||||||
|
name = "cluster_membership"
|
||||||
|
version.workspace = true
|
||||||
|
edition.workspace = true
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
# util
|
||||||
|
anyhow.workspace = true
|
||||||
|
log.workspace = true
|
||||||
|
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
|
||||||
|
|
||||||
|
# async
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
futures-timer = { workspace = true }
|
||||||
|
futures-lite = "2.6.1"
|
||||||
|
|
||||||
|
# networking
|
||||||
|
libp2p = { workspace = true, features = ["full"] }
|
||||||
|
async-trait = "0.1.89"
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
30
rust/cluster_membership/examples/chatroom.rs
Normal file
30
rust/cluster_membership/examples/chatroom.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use cluster_membership::Peer;
|
||||||
|
use libp2p::identity::ed25519::SecretKey;
|
||||||
|
use tokio::io::{self, AsyncBufReadExt};
|
||||||
|
use tracing_subscriber::{EnvFilter, filter::LevelFilter};
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let _ = tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
|
||||||
|
.try_init();
|
||||||
|
|
||||||
|
let (mut peer, send, mut recv) =
|
||||||
|
Peer::new(SecretKey::generate(), "hello".to_string()).expect("peer should always build");
|
||||||
|
|
||||||
|
let ch = peer.subscribe("chatroom".to_string());
|
||||||
|
let jh = tokio::spawn(async move { peer.run().await });
|
||||||
|
|
||||||
|
let mut stdin = io::BufReader::new(io::stdin()).lines();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Ok(Some(line)) = stdin.next_line() => {send.send((ch.clone(), line.into_bytes())).await.expect("example");}
|
||||||
|
Some(r) = recv.recv() => match r {
|
||||||
|
Ok((_, id, line)) => println!("{:?}:{:?}", id, String::from_utf8_lossy(&line)),
|
||||||
|
Err(e) => eprintln!("{e:?}"),
|
||||||
|
},
|
||||||
|
else => break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jh.await.expect("task failure");
|
||||||
|
}
|
||||||
227
rust/cluster_membership/src/lib.rs
Normal file
227
rust/cluster_membership/src/lib.rs
Normal file
@@ -0,0 +1,227 @@
|
|||||||
|
use libp2p::{
|
||||||
|
Multiaddr, PeerId, Swarm, SwarmBuilder,
|
||||||
|
futures::StreamExt,
|
||||||
|
gossipsub::{self, PublishError, Sha256Topic, TopicHash},
|
||||||
|
identify,
|
||||||
|
identity::{Keypair, ed25519},
|
||||||
|
mdns,
|
||||||
|
swarm::{NetworkBehaviour, SwarmEvent, dial_opts::DialOpts},
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
use tokio::{select, sync::mpsc};
|
||||||
|
|
||||||
|
const DEFAULT_BUFFER_SIZE: usize = 10;
|
||||||
|
const MDNS_IGNORE_DURATION_SECS: u64 = 30;
|
||||||
|
|
||||||
|
impl Peer {
|
||||||
|
pub fn new(
|
||||||
|
identity: ed25519::SecretKey,
|
||||||
|
namespace: String,
|
||||||
|
) -> anyhow::Result<(
|
||||||
|
Self,
|
||||||
|
mpsc::Sender<(TopicHash, Vec<u8>)>,
|
||||||
|
mpsc::Receiver<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
|
||||||
|
)> {
|
||||||
|
let mut id_bytes = identity.as_ref().to_vec();
|
||||||
|
|
||||||
|
let mut swarm =
|
||||||
|
SwarmBuilder::with_existing_identity(Keypair::ed25519_from_bytes(&mut id_bytes)?)
|
||||||
|
.with_tokio()
|
||||||
|
.with_quic()
|
||||||
|
// TODO(evan): .with_bandwidth_metrics();
|
||||||
|
.with_behaviour(|kp| Behaviour::new(kp, namespace.clone()))?
|
||||||
|
.build();
|
||||||
|
|
||||||
|
swarm.listen_on("/ip6/::/udp/0/quic-v1".parse()?)?;
|
||||||
|
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
|
||||||
|
let (to_swarm, from_client) = mpsc::channel(DEFAULT_BUFFER_SIZE);
|
||||||
|
let (to_client, from_swarm) = mpsc::channel(DEFAULT_BUFFER_SIZE);
|
||||||
|
Ok((
|
||||||
|
Self {
|
||||||
|
swarm,
|
||||||
|
namespace,
|
||||||
|
denied: HashMap::new(),
|
||||||
|
from_client,
|
||||||
|
to_client,
|
||||||
|
},
|
||||||
|
to_swarm,
|
||||||
|
from_swarm,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe(&mut self, topic: String) -> TopicHash {
|
||||||
|
let topic = Sha256Topic::new(topic);
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.subscribe(&topic)
|
||||||
|
.expect("topic filtered");
|
||||||
|
topic.hash()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) {
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
ev = self.swarm.select_next_some() => {
|
||||||
|
let Ok(()) = self.handle_swarm_event(ev).await else {
|
||||||
|
return
|
||||||
|
};
|
||||||
|
},
|
||||||
|
Some(msg) = self.from_client.recv() => {
|
||||||
|
if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(msg.0, msg.1) {
|
||||||
|
let Ok(()) = self.to_client.send(Err(e)).await else {
|
||||||
|
return
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent>) -> Result<(), ()> {
|
||||||
|
let SwarmEvent::Behaviour(event) = event else {
|
||||||
|
if let SwarmEvent::NewListenAddr {
|
||||||
|
listener_id: _,
|
||||||
|
address,
|
||||||
|
} = event
|
||||||
|
{
|
||||||
|
log::info!("new listen address {address}")
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
match event {
|
||||||
|
BehaviourEvent::Mdns(mdns_event) => match mdns_event {
|
||||||
|
mdns::Event::Discovered(vec) => {
|
||||||
|
// Dial everyone
|
||||||
|
let mut addrs = HashMap::<PeerId, Vec<Multiaddr>>::new();
|
||||||
|
vec.into_iter()
|
||||||
|
.filter(|(peer_id, _)| {
|
||||||
|
self.denied.get(peer_id).is_none_or(|t| {
|
||||||
|
t.elapsed() > Duration::from_secs(MDNS_IGNORE_DURATION_SECS)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.for_each(|(peer_id, addr)| addrs.entry(peer_id).or_default().push(addr));
|
||||||
|
addrs.into_iter().for_each(|(peer_id, addrs)| {
|
||||||
|
let _ = self
|
||||||
|
.swarm
|
||||||
|
.dial(DialOpts::peer_id(peer_id).addresses(addrs).build());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
mdns::Event::Expired(vec) => {
|
||||||
|
vec.iter().for_each(|(peer_id, _)| {
|
||||||
|
log::debug!("{peer_id} no longer reachable on mDNS");
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.remove_explicit_peer(peer_id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
BehaviourEvent::Identify(identify::Event::Received {
|
||||||
|
connection_id: _,
|
||||||
|
peer_id,
|
||||||
|
info,
|
||||||
|
}) => {
|
||||||
|
if info
|
||||||
|
.protocols
|
||||||
|
.iter()
|
||||||
|
.any(|p| p.as_ref().contains(&self.namespace))
|
||||||
|
{
|
||||||
|
self.passed_namespace(peer_id);
|
||||||
|
} else {
|
||||||
|
self.failed_namespace(peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
BehaviourEvent::Gossipsub(gossipsub::Event::Message {
|
||||||
|
propagation_source: _,
|
||||||
|
message_id: _,
|
||||||
|
message:
|
||||||
|
gossipsub::Message {
|
||||||
|
topic,
|
||||||
|
data,
|
||||||
|
source: Some(source_peer),
|
||||||
|
..
|
||||||
|
},
|
||||||
|
}) => {
|
||||||
|
let Ok(()) = self.to_client.send(Ok((topic, source_peer, data))).await else {
|
||||||
|
return Err(());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn passed_namespace(&mut self, peer: PeerId) {
|
||||||
|
log::info!("new peer {peer:?}");
|
||||||
|
self.denied.remove(&peer);
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.remove_blacklisted_peer(&peer);
|
||||||
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.gossipsub
|
||||||
|
.add_explicit_peer(&peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn failed_namespace(&mut self, peer: PeerId) {
|
||||||
|
log::debug!("{peer} failed handshake");
|
||||||
|
self.denied.insert(peer, Instant::now());
|
||||||
|
self.swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
|
||||||
|
// we don't care if disconnect fails
|
||||||
|
let _ = self.swarm.disconnect_peer_id(peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Peer {
|
||||||
|
pub swarm: Swarm<Behaviour>,
|
||||||
|
denied: HashMap<PeerId, Instant>,
|
||||||
|
namespace: String,
|
||||||
|
to_client: mpsc::Sender<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
|
||||||
|
from_client: mpsc::Receiver<(TopicHash, Vec<u8>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn foo() {
|
||||||
|
fn bar<T: Send>(t: T) {}
|
||||||
|
let p: Peer = unimplemented!();
|
||||||
|
bar(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(NetworkBehaviour)]
|
||||||
|
pub struct Behaviour {
|
||||||
|
mdns: mdns::tokio::Behaviour,
|
||||||
|
pub gossipsub: gossipsub::Behaviour,
|
||||||
|
identify: identify::Behaviour,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Behaviour {
|
||||||
|
fn new(kp: &Keypair, namespace: String) -> Self {
|
||||||
|
let mdns = mdns::tokio::Behaviour::new(Default::default(), kp.public().to_peer_id())
|
||||||
|
.expect("implementation is infallible");
|
||||||
|
let gossipsub = gossipsub::Behaviour::new(
|
||||||
|
gossipsub::MessageAuthenticity::Signed(kp.clone()),
|
||||||
|
gossipsub::ConfigBuilder::default()
|
||||||
|
.max_transmit_size(1024 * 1024)
|
||||||
|
.protocol_id_prefix(format!("/exo/gossip/{namespace}/v1"))
|
||||||
|
.build()
|
||||||
|
.expect("fixed gossipsub config should always build"),
|
||||||
|
)
|
||||||
|
.expect("fixed gossipsub init should always build");
|
||||||
|
|
||||||
|
let identify = identify::Behaviour::new(
|
||||||
|
identify::Config::new_with_signed_peer_record(format!("/exo/identity/v1"), kp)
|
||||||
|
.with_push_listen_addr_updates(true),
|
||||||
|
);
|
||||||
|
|
||||||
|
Behaviour {
|
||||||
|
mdns,
|
||||||
|
gossipsub,
|
||||||
|
identify,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,6 +22,7 @@ doc = false
|
|||||||
workspace = true
|
workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
cluster_membership.workspace = true
|
||||||
networking = { workspace = true }
|
networking = { workspace = true }
|
||||||
|
|
||||||
# interop
|
# interop
|
||||||
|
|||||||
@@ -6,3 +6,41 @@
|
|||||||
|
|
||||||
pub mod ident;
|
pub mod ident;
|
||||||
pub mod multiaddr;
|
pub mod multiaddr;
|
||||||
|
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use cluster_membership::Peer;
|
||||||
|
use libp2p::identity::ed25519::Keypair;
|
||||||
|
use pyo3::prelude::*;
|
||||||
|
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
|
||||||
|
|
||||||
|
#[gen_stub_pyclass]
|
||||||
|
#[pyclass]
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PyKeypair(Keypair);
|
||||||
|
|
||||||
|
#[gen_stub_pymethods]
|
||||||
|
#[pymethods]
|
||||||
|
impl PyKeypair {
|
||||||
|
#[staticmethod]
|
||||||
|
fn generate() -> Self {
|
||||||
|
Self(Keypair::generate())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[gen_stub_pyclass]
|
||||||
|
#[pyclass]
|
||||||
|
pub struct PyPeer(Mutex<Peer>);
|
||||||
|
|
||||||
|
#[gen_stub_pymethods]
|
||||||
|
#[pymethods]
|
||||||
|
impl PyPeer {
|
||||||
|
#[staticmethod]
|
||||||
|
fn init(kp: PyKeypair, namespace: String) -> PyResult<Self> {
|
||||||
|
Ok(PyPeer(Mutex::new(
|
||||||
|
Peer::new(kp.0.secret(), namespace)
|
||||||
|
.map_err(|e| e.pyerr())?
|
||||||
|
.0,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user