Remove rust

This commit is contained in:
Alex Cheema
2025-08-02 08:16:39 -07:00
committed by GitHub
parent a46f8c3cd1
commit 92c9688bf0
48 changed files with 305 additions and 3620 deletions

10
.idea/exo-v2.iml generated
View File

@@ -7,21 +7,11 @@
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/rust/discovery/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rust/discovery/tests" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/rust/exo_pyo3_bindings/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rust/exo_pyo3_bindings/tests" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/rust/master_election/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rust/master_election/tests" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/rust/util/fn_pipe/proc/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rust/util/fn_pipe/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rust/util/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/engines/mlx" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/master" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/shared" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/worker" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
<excludeFolder url="file://$MODULE_DIR$/rust/target" />
<excludeFolder url="file://$MODULE_DIR$/.direnv" />
<excludeFolder url="file://$MODULE_DIR$/build" />
</content>

29
flake.lock generated
View File

@@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1752950548,
"narHash": "sha256-NS6BLD0lxOrnCiEOcvQCDVPXafX1/ek1dfJHX1nUIzc=",
"lastModified": 1753939845,
"narHash": "sha256-K2ViRJfdVGE8tpJejs8Qpvvejks1+A4GQej/lBk5y7I=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "c87b95e25065c028d31a94f06a62927d18763fdf",
"rev": "94def634a20494ee057c76998843c015909d6311",
"type": "github"
},
"original": {
@@ -37,28 +37,7 @@
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1753156081,
"narHash": "sha256-N+8LM+zvS6cP+VG2vxgEEDCyX1T9EUq9wXTSvGwX9TM=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "8610c0f3801fc8dec7eb4b79c95fb39d16f38a80",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
"nixpkgs": "nixpkgs"
}
},
"systems": {

View File

@@ -7,18 +7,13 @@
url = "github:numtide/flake-utils";
inputs.nixpkgs.follows = "nixpkgs";
};
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, rust-overlay, flake-utils }:
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
overlays = [ (import rust-overlay) ];
pkgs = (import nixpkgs) {
inherit system overlays;
inherit system;
};
# Go 1.23 compiler align with go.mod
@@ -38,11 +33,6 @@
buildInputs = with pkgs; [
];
nativeBuildInputs = with pkgs; [
# This sets up the rust suite, automatically selecting the latest nightly version
(rust-bin.selectLatestNightlyWith
(toolchain: toolchain.default.override {
extensions = [ "rust-src" "clippy" ];
}))
];
in
{
@@ -80,7 +70,6 @@
'';
nativeBuildInputs = with pkgs; [
cargo-expand
nixpkgs-fmt
cmake
] ++ buildInputs ++ nativeBuildInputs;

View File

@@ -1,136 +0,0 @@
import asyncio
import logging
from exo_pyo3_bindings import ConnectionUpdate, DiscoveryService, Keypair
from shared.db import AsyncSQLiteEventStorage
from shared.types.common import NodeId
from shared.types.events import TopologyEdgeCreated, TopologyEdgeDeleted
from shared.types.multiaddr import Multiaddr
from shared.types.topology import Connection
class DiscoverySupervisor:
def __init__(self, node_id_keypair: Keypair, node_id: NodeId, global_events: AsyncSQLiteEventStorage,
logger: logging.Logger):
self.global_events = global_events
self.logger = logger
self.node_id = node_id
# configure callbacks
self.discovery_service = DiscoveryService(node_id_keypair)
self._add_connected_callback()
self._add_disconnected_callback()
def _add_connected_callback(self):
stream_get, stream_put = _make_iter()
self.discovery_service.add_connected_callback(stream_put)
async def run():
async for c in stream_get:
await self._connected_callback(c)
return asyncio.create_task(run())
def _add_disconnected_callback(self):
stream_get, stream_put = _make_iter()
async def run():
async for c in stream_get:
await self._disconnected_callback(c)
self.discovery_service.add_disconnected_callback(stream_put)
return asyncio.create_task(run())
async def _connected_callback(self, e: ConnectionUpdate) -> None:
local_node_id = self.node_id
send_back_node_id = NodeId(e.peer_id.to_base58())
local_multiaddr = Multiaddr(address=str(e.local_addr))
send_back_multiaddr = Multiaddr(address=str(e.send_back_addr))
connection_profile = None
if send_back_multiaddr.ipv4_address == local_multiaddr.ipv4_address:
return
topology_edge_created = TopologyEdgeCreated(edge=Connection(
local_node_id=local_node_id,
send_back_node_id=send_back_node_id,
local_multiaddr=local_multiaddr,
send_back_multiaddr=send_back_multiaddr,
connection_profile=connection_profile
))
self.logger.info(
msg=f"CONNECTED CALLBACK: {local_node_id} -> {send_back_node_id}, {local_multiaddr} -> {send_back_multiaddr}")
await self.global_events.append_events(
[topology_edge_created],
self.node_id
)
async def _disconnected_callback(self, e: ConnectionUpdate) -> None:
local_node_id = self.node_id
send_back_node_id = NodeId(e.peer_id.to_base58())
local_multiaddr = Multiaddr(address=str(e.local_addr))
send_back_multiaddr = Multiaddr(address=str(e.send_back_addr))
connection_profile = None
topology_edge_created = TopologyEdgeDeleted(edge=Connection(
local_node_id=local_node_id,
send_back_node_id=send_back_node_id,
local_multiaddr=local_multiaddr,
send_back_multiaddr=send_back_multiaddr,
connection_profile=connection_profile
))
self.logger.error(
msg=f"DISCONNECTED CALLBACK: {local_node_id} -> {send_back_node_id}, {local_multiaddr} -> {send_back_multiaddr}")
await self.global_events.append_events(
[topology_edge_created],
self.node_id
)
def _make_iter(): # TODO: generalize to generic utility
loop = asyncio.get_event_loop()
queue: asyncio.Queue[ConnectionUpdate] = asyncio.Queue()
def put(c: ConnectionUpdate) -> None:
loop.call_soon_threadsafe(queue.put_nowait, c)
async def get():
while True:
yield await queue.get()
return get(), put
# class MyClass: # TODO: figure out how to make pydantic integrate with Multiaddr
# def __init__(self, data: str):
# self.data = data
#
# @staticmethod
# def from_str(s: str, _i: ValidationInfo) -> 'MyClass':
# return MyClass(s)
#
# def __str__(self):
# return self.data
#
# @classmethod
# def __get_pydantic_core_schema__(
# cls, source_type: type[any], handler: GetCoreSchemaHandler
# ) -> CoreSchema:
# return core_schema.with_info_after_validator_function(
# function=MyClass.from_str,
# schema=core_schema.bytes_schema(),
# serialization=core_schema.to_string_ser_schema()
# )
#
#
# # Use directly in a model (no Annotated needed)
# class ExampleModel(BaseModel):
# field: MyClass
#
#
# m = ExampleModel(field=MyClass("foo"))
# d = m.model_dump()
# djs = m.model_dump_json()
#
# print(d)
# print(djs)

View File

@@ -6,11 +6,7 @@ import traceback
from pathlib import Path
from typing import List
from exo_pyo3_bindings import Keypair
from master.api import start_fastapi_server
# from master.discovery_supervisor import DiscoverySupervisor
from master.election_callback import ElectionCallbacks
from master.forwarder_supervisor import ForwarderRole, ForwarderSupervisor
from master.placement import get_instance_placements, get_transition_events
@@ -34,7 +30,7 @@ from shared.types.events.commands import (
from shared.types.state import State
from shared.types.tasks import ChatCompletionTask, TaskId, TaskStatus, TaskType
from shared.types.worker.instances import Instance
from shared.utils import get_node_id_keypair
from shared.utils import Keypair, get_node_id_keypair
class Master:
@@ -42,17 +38,11 @@ class Master:
global_events: AsyncSQLiteEventStorage, worker_events: AsyncSQLiteEventStorage,
forwarder_binary_path: Path, logger: logging.Logger):
self.state = State()
self.node_id_keypair = node_id_keypair
self.node_id = node_id
self.command_buffer = command_buffer
self.global_events = global_events
self.worker_events = worker_events
# self.discovery_supervisor = DiscoverySupervisor(
# node_id_keypair,
# node_id,
# # TODO: needs to be more general for when we have master election
# worker_events if os.getenv('EXO_RUN_AS_REPLICA') in set(['TRUE', 'true', '1']) else global_events,
# logger
# )
self.forwarder_supervisor = ForwarderSupervisor(
self.node_id,
forwarder_binary_path=forwarder_binary_path,
@@ -191,7 +181,7 @@ async def main():
logger.info('Running FastAPI server in a separate thread. Listening on port 8000.')
master = Master(node_id_keypair, node_id, command_buffer, global_events, worker_events,
forwarder_binary_path=Path("./build/forwarder"), logger=logger)
Path("./build/forwarder"), logger)
await master.run()

View File

@@ -5,7 +5,6 @@ from pathlib import Path
from typing import List, Sequence
import pytest
from exo_pyo3_bindings import Keypair
from master.main import Master
from shared.db.sqlite.config import EventLogConfig
@@ -35,6 +34,7 @@ from shared.types.tasks import ChatCompletionTask, TaskStatus, TaskType
from shared.types.worker.common import InstanceId
from shared.types.worker.instances import Instance, InstanceStatus, ShardAssignments
from shared.types.worker.shards import PartitionStrategy, PipelineShardMetadata
from shared.utils import Keypair
def _create_forwarder_dummy_binary() -> Path:

View File

@@ -11,13 +11,14 @@ dependencies = [
"exo-worker",
"types-aiofiles>=24.1.0.20250708",
"typeguard>=4.4.4",
"pydantic>=2.11.7"
"pydantic>=2.11.7",
"base58>=2.1.1",
"cryptography>=45.0.5",
]
# dependencies only required for development
[dependency-groups]
dev = [
"maturin>=1.9.0",
"pytest>=8.4.0",
"pytest-asyncio>=1.0.0",
"ruff>=0.11.13",
@@ -38,8 +39,7 @@ members = [
"master",
"worker",
"shared",
"engines/*",
"rust/exo_pyo3_bindings",
"engines/*"
]
[tool.uv.sources]
@@ -47,7 +47,6 @@ exo-shared = { workspace = true }
exo-master = { workspace = true }
exo-worker = { workspace = true }
exo-engine-mlx = { workspace = true }
exo-pyo3-bindings = { workspace = true }
[build-system]
requires = ["hatchling"]

11
rust/.gitignore vendored
View File

@@ -1,11 +0,0 @@
/target
compile
.*
./*.wacc
*.s
*.core
.wacc
*.png
*.dot
Cargo.lock

View File

@@ -1,166 +0,0 @@
[workspace]
resolver = "3"
members = [
"discovery",
"exo_pyo3_bindings",
"master_election",
"util",
"util/fn_pipe",
"util/fn_pipe/proc",
]
[workspace.package]
version = "0.0.1"
edition = "2024"
[profile.dev]
opt-level = 1
debug = true
[profile.release]
opt-level = 3
# Common shared dependendencies configured once at the workspace
# level, to be re-used more easily across workspace member crates.
#
# Common configurations include versions, paths, features, etc.
[workspace.dependencies]
## Crate members as common dependencies
discovery = { path = "discovery" }
master_election = { path = "master_election" }
util = { path = "util" }
exo_pyo3_bindings = { path = "exo_pyo3_bindings" }
fn_pipe = { path = "util/fn_pipe" }
fn_pipe_proc = { path = "util/fn_pipe/proc" }
# Proc-macro authoring tools
syn = "2.0"
quote = "1.0"
proc-macro2 = "1.0"
darling = "0.20"
# Macro dependecies
extend = "1.2"
delegate = "0.13"
impl-trait-for-tuples = "0.2"
clap = "4.5"
derive_more = { version = "2.0.1", features = ["display"] }
# Utility dependencies
itertools = "0.14"
thiserror = "2"
internment = "0.8"
recursion = "0.5"
regex = "1.11"
once_cell = "1.21"
thread_local = "1.1"
bon = "3.4"
generativity = "1.1"
anyhow = "1.0"
keccak-const = "0.2"
# Functional generics/lenses frameworks
frunk_core = "0.4"
frunk = "0.4"
frunk_utils = "0.2"
frunk-enum-core = "0.3"
# Async dependencies
tokio = "1.46"
futures = "0.3"
futures-util = "0.3"
# Data structures
either = "1.15"
ordered-float = "5.0"
ahash = "0.8"
# networking
libp2p = "0.56"
libp2p-tcp = "0.44"
# interop
pyo3 = "0.25"
#pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev = "d2626600e52452e71095c57e721514de748d419d" } # v0.11 not yet published to crates
pyo3-stub-gen = { git = "https://github.com/cstruct/pyo3-stub-gen.git", rev = "a935099276fa2d273496a2759d4af7177a6acd57" } # This fork adds support for type overrides => not merged yet!!!
pyo3-async-runtimes = "0.25"
[workspace.lints.rust]
static_mut_refs = "warn" # Or use "warn" instead of deny
incomplete_features = "allow"
# Clippy's lint category level configurations;
# every member crate needs to inherit these by adding
#
# ```toml
# [lints]
# workspace = true
# ```
#
# to their `Cargo.toml` files
[workspace.lints.clippy]
# Clippy lint categories meant to be enabled all at once
correctness = { level = "deny", priority = -1 }
suspicious = { level = "warn", priority = -1 }
style = { level = "warn", priority = -1 }
complexity = { level = "warn", priority = -1 }
perf = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }
# Individual Clippy lints from the `restriction` category
arithmetic_side_effects = "warn"
as_conversions = "warn"
assertions_on_result_states = "warn"
clone_on_ref_ptr = "warn"
decimal_literal_representation = "warn"
default_union_representation = "warn"
deref_by_slicing = "warn"
disallowed_script_idents = "deny"
else_if_without_else = "warn"
empty_enum_variants_with_brackets = "warn"
empty_structs_with_brackets = "warn"
error_impl_error = "warn"
exit = "deny"
expect_used = "warn"
float_cmp_const = "warn"
get_unwrap = "warn"
if_then_some_else_none = "warn"
impl_trait_in_params = "warn"
indexing_slicing = "warn"
infinite_loop = "warn"
let_underscore_must_use = "warn"
let_underscore_untyped = "warn"
lossy_float_literal = "warn"
mem_forget = "warn"
missing_inline_in_public_items = "warn"
multiple_inherent_impl = "warn"
multiple_unsafe_ops_per_block = "warn"
mutex_atomic = "warn"
non_zero_suggestions = "warn"
panic = "warn"
partial_pub_fields = "warn"
pattern_type_mismatch = "warn"
pub_without_shorthand = "warn"
rc_buffer = "warn"
rc_mutex = "warn"
redundant_type_annotations = "warn"
renamed_function_params = "warn"
rest_pat_in_fully_bound_structs = "warn"
same_name_method = "warn"
self_named_module_files = "deny"
semicolon_inside_block = "warn"
shadow_same = "warn"
shadow_unrelated = "warn"
str_to_string = "warn"
string_add = "warn"
string_lit_chars_any = "warn"
string_to_string = "warn"
tests_outside_test_module = "warn"
todo = "warn"
try_err = "warn"
undocumented_unsafe_blocks = "warn"
unnecessary_safety_comment = "warn"
unnecessary_safety_doc = "warn"
unneeded_field_pattern = "warn"
unseparated_literal_suffix = "warn"
unused_result_ok = "warn"
unused_trait_names = "warn"
unwrap_used = "warn"
verbose_file_reads = "warn"
static_mut_refs = "warn"

View File

@@ -1,2 +0,0 @@
# we can manually exclude false-positive lint errors for dual packages (if in dependencies)
#allowed-duplicate-crates = ["hashbrown"]

View File

@@ -1,39 +0,0 @@
[package]
name = "discovery"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "discovery"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
impl-trait-for-tuples = { workspace = true }
derive_more = { workspace = true }
# Async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
# utility dependencies
#util = { workspace = true }
#fn_pipe = { workspace = true }
thiserror = { workspace = true }
#internment = { workspace = true }
#recursion = { workspace = true }
#generativity = { workspace = true }
#itertools = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
keccak-const = { workspace = true }
log = "0.4"
# Networking
libp2p = { workspace = true, features = ["full"] }

View File

@@ -1,244 +0,0 @@
use crate::alias::AnyResult;
use libp2p::core::Endpoint;
use libp2p::core::transport::PortUse;
use libp2p::swarm::derive_prelude::Either;
use libp2p::swarm::{
ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId, FromSwarm,
NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId, gossipsub, identity, mdns};
use std::fmt;
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::time::Duration;
/// Custom network behavior for `discovery` network; it combines [`mdns::tokio::Behaviour`] for
/// the actual mDNS discovery, and [`gossipsub::Behaviour`] for PubSub functionality.
#[derive(NetworkBehaviour)]
pub struct DiscoveryBehaviour {
pub mdns: mdns::tokio::Behaviour,
pub gossipsub: gossipsub::Behaviour,
}
// #[doc = "`NetworkBehaviour::ToSwarm` produced by DiscoveryBehaviour."]
// pub enum DiscoveryBehaviourEvent {
// Mdns(<mdns::tokio::Behaviour as NetworkBehaviour>::ToSwarm),
// Gossipsub(<gossipsub::Behaviour as NetworkBehaviour>::ToSwarm),
// }
// impl Debug for DiscoveryBehaviourEvent
// where
// <mdns::tokio::Behaviour as NetworkBehaviour>::ToSwarm: Debug,
// <gossipsub::Behaviour as NetworkBehaviour>::ToSwarm: Debug,
// {
// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// match &self {
// DiscoveryBehaviourEvent::Mdns(event) => {
// f.write_fmt(format_args!("{}: {:?}", "DiscoveryBehaviourEvent", event))
// }
// DiscoveryBehaviourEvent::Gossipsub(event) => {
// f.write_fmt(format_args!("{}: {:?}", "DiscoveryBehaviourEvent", event))
// }
// }
// }
// }
// impl NetworkBehaviour for DiscoveryBehaviour
// where
// mdns::tokio::Behaviour: NetworkBehaviour,
// gossipsub::Behaviour: NetworkBehaviour,
// {
// type ConnectionHandler =
// ConnectionHandlerSelect<THandler<mdns::tokio::Behaviour>, THandler<gossipsub::Behaviour>>;
// type ToSwarm = DiscoveryBehaviourEvent;
// #[allow(clippy::needless_question_mark)]
// fn handle_pending_inbound_connection(
// &mut self,
// connection_id: ConnectionId,
// local_addr: &Multiaddr,
// remote_addr: &Multiaddr,
// ) -> Result<(), ConnectionDenied> {
// NetworkBehaviour::handle_pending_inbound_connection(
// &mut self.mdns,
// connection_id,
// local_addr,
// remote_addr,
// )?;
// NetworkBehaviour::handle_pending_inbound_connection(
// &mut self.gossipsub,
// connection_id,
// local_addr,
// remote_addr,
// )?;
// Ok(())
// }
// #[allow(clippy::needless_question_mark)]
// fn handle_established_inbound_connection(
// &mut self,
// connection_id: ConnectionId,
// peer: PeerId,
// local_addr: &Multiaddr,
// remote_addr: &Multiaddr,
// ) -> Result<THandler<Self>, ConnectionDenied> {
// Ok(ConnectionHandler::select(
// self.mdns.handle_established_inbound_connection(
// connection_id,
// peer,
// local_addr,
// remote_addr,
// )?,
// self.gossipsub.handle_established_inbound_connection(
// connection_id,
// peer,
// local_addr,
// remote_addr,
// )?,
// ))
// }
// #[allow(clippy::needless_question_mark)]
// fn handle_pending_outbound_connection(
// &mut self,
// connection_id: ConnectionId,
// maybe_peer: Option<PeerId>,
// addresses: &[Multiaddr],
// effective_role: Endpoint,
// ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
// let mut combined_addresses = Vec::new();
// combined_addresses.extend(NetworkBehaviour::handle_pending_outbound_connection(
// &mut self.mdns,
// connection_id,
// maybe_peer,
// addresses,
// effective_role,
// )?);
// combined_addresses.extend(NetworkBehaviour::handle_pending_outbound_connection(
// &mut self.gossipsub,
// connection_id,
// maybe_peer,
// addresses,
// effective_role,
// )?);
// Ok(combined_addresses)
// }
// #[allow(clippy::needless_question_mark)]
// fn handle_established_outbound_connection(
// &mut self,
// connection_id: ConnectionId,
// peer: PeerId,
// addr: &Multiaddr,
// role_override: Endpoint,
// port_use: PortUse,
// ) -> Result<THandler<Self>, ConnectionDenied> {
// Ok(ConnectionHandler::select(
// self.mdns.handle_established_outbound_connection(
// connection_id,
// peer,
// addr,
// role_override,
// port_use,
// )?,
// self.gossipsub.handle_established_outbound_connection(
// connection_id,
// peer,
// addr,
// role_override,
// port_use,
// )?,
// ))
// }
// fn on_swarm_event(&mut self, event: FromSwarm) {
// self.mdns.on_swarm_event(event);
// self.gossipsub.on_swarm_event(event);
// }
// fn on_connection_handler_event(
// &mut self,
// peer_id: PeerId,
// connection_id: ConnectionId,
// event: THandlerOutEvent<Self>,
// ) {
// match event {
// Either::Left(ev) => NetworkBehaviour::on_connection_handler_event(
// &mut self.mdns,
// peer_id,
// connection_id,
// ev,
// ),
// Either::Right(ev) => NetworkBehaviour::on_connection_handler_event(
// &mut self.gossipsub,
// peer_id,
// connection_id,
// ev,
// ),
// }
// }
// fn poll(
// &mut self,
// cx: &mut std::task::Context,
// ) -> std::task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// match NetworkBehaviour::poll(&mut self.mdns, cx) {
// std::task::Poll::Ready(e) => {
// return std::task::Poll::Ready(
// e.map_out(DiscoveryBehaviourEvent::Mdns)
// .map_in(|event| Either::Left(event)),
// );
// }
// std::task::Poll::Pending => {}
// }
// match NetworkBehaviour::poll(&mut self.gossipsub, cx) {
// std::task::Poll::Ready(e) => {
// return std::task::Poll::Ready(
// e.map_out(DiscoveryBehaviourEvent::Gossipsub)
// .map_in(|event| Either::Right(event)),
// );
// }
// std::task::Poll::Pending => {}
// }
// std::task::Poll::Pending
// }
// }
fn mdns_behaviour(keypair: &identity::Keypair) -> AnyResult<mdns::tokio::Behaviour> {
use mdns::{Config, tokio};
// mDNS config => enable IPv6
let mdns_config = Config {
enable_ipv6: true,
..Default::default()
};
let mdns_behaviour = tokio::Behaviour::new(mdns_config, keypair.public().to_peer_id());
Ok(mdns_behaviour?)
}
fn gossipsub_behaviour(keypair: &identity::Keypair) -> AnyResult<gossipsub::Behaviour> {
use gossipsub::ConfigBuilder;
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let gossipsub_config = ConfigBuilder::default()
// .mesh_n_low(1
.mesh_n(1) // this is for debugging!!! change to 6
// .mesh_n_for_topic(1, topic.hash()) // this is for debugging!!! change to 6
// .mesh_n_high(1)
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(gossipsub::ValidationMode::None) // This sets the kind of message validation. Skip signing for speed.
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()?; // Temporary hack because `build` does not return a proper `std::error::Error`.
// build a gossipsub network behaviour
let gossipsub_behavior = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config,
)?;
Ok(gossipsub_behavior)
}
pub fn discovery_behaviour(keypair: &identity::Keypair) -> AnyResult<DiscoveryBehaviour> {
Ok(DiscoveryBehaviour {
gossipsub: gossipsub_behaviour(keypair)?,
mdns: mdns_behaviour(keypair)?,
})
}

View File

@@ -1,149 +0,0 @@
//! TODO: crate documentation
//!
//! this is here as a placeholder documentation
//!
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
// #![feature(stmt_expr_attributes)]
// #![feature(unboxed_closures)]
// #![feature(assert_matches)]
// #![feature(async_fn_in_dyn_trait)]
// #![feature(async_for_loop)]
// #![feature(auto_traits)]
// #![feature(negative_impls)]
use crate::behaviour::{discovery_behaviour, DiscoveryBehaviour};
use crate::transport::discovery_transport;
use libp2p::{identity, Swarm, SwarmBuilder};
use std::net::IpAddr;
pub mod behaviour;
pub mod transport;
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {
use std::error::Error;
pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type AnyResult<T> = Result<T, AnyError>;
}
/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {}
pub(crate) mod private {
/// Sealed traits support
pub trait Sealed {}
impl<T: ?Sized> Sealed for T {}
}
/// Create and configure a swarm, and start listening to all ports/OS.
#[inline]
pub fn discovery_swarm(keypair: identity::Keypair) -> alias::AnyResult<Swarm<DiscoveryBehaviour>> {
let peer_id = keypair.public().to_peer_id();
log::info!("RUST: Creating discovery swarm with peer_id: {}", peer_id);
let mut swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_other_transport(discovery_transport)?
.with_behaviour(discovery_behaviour)?
.build();
// Listen on IPv4
let listen_addr_ipv4 = "/ip4/0.0.0.0/tcp/0".parse()?;
log::info!("RUST: Attempting to listen on: {}", listen_addr_ipv4);
swarm.listen_on(listen_addr_ipv4)?;
// Listen on IPv6 - try but don't fail if not available
let listen_addr_ipv6 = "/ip6/::/tcp/0".parse()?;
log::info!("RUST: Attempting to listen on: {}", listen_addr_ipv6);
match swarm.listen_on(listen_addr_ipv6) {
Ok(_) => log::info!("RUST: Successfully listening on IPv6"),
Err(e) => log::warn!("RUST: Failed to listen on IPv6 (this is okay if IPv6 is not available): {:?}", e),
}
Ok(swarm)
}
// TODO: - ensure that all changes to connections means a Disconnect/Reconnect event fired, i.e. if it switched IPs slighty or something
// - ensure that all links are unique, i.e. each connection has some kind of uniquely identifiable hash/multiaddress/whatever => temporally unique???
// - need pnet config, so that forwarder & discovery don't interfere with each-other
// - discovery network needs persistence, so swarm created from existing identity (passed as arg)
// - connect/disconnect events etc. should be handled with callbacks
// - DON'T need gossipsub JUST yet, only mDNS for discovery => potentially use something else instead of gossipsub
#[cfg(test)]
mod tests {
use crate::alias::AnyResult;
use crate::behaviour::DiscoveryBehaviourEvent;
use crate::discovery_swarm;
use futures::stream::StreamExt as _;
use libp2p::{gossipsub, identity, mdns, swarm::SwarmEvent};
use std::hash::Hash;
use tokio::{io, io::AsyncBufReadExt as _, select};
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::EnvFilter;
#[tokio::test]
async fn chatroom_test() -> AnyResult<()> {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::DEBUG.into()))
.try_init();
// Configure swarm
let mut swarm = discovery_swarm(identity::Keypair::generate_ed25519())?;
// Create a Gossipsub topic & subscribe
let topic = gossipsub::IdentTopic::new("test-net");
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
println!(
"Enter messages via STDIN and they will be sent to connected peers using Gossipsub"
);
// Kick it off
loop {
select! {
Ok(Some(line)) = stdin.next_line() => {
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(topic.clone(), line.as_bytes()) {
println!("Publish error: {e:?}");
}
}
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(DiscoveryBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, multiaddr) in list {
println!("mDNS discovered a new peer: {peer_id} on {multiaddr}");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(DiscoveryBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, multiaddr) in list {
println!("mDNS discover peer has expired: {peer_id} on {multiaddr}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(DiscoveryBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => println!(
"\n\nGot message: '{}' with id: {id} from peer: {peer_id}\n\n",
String::from_utf8_lossy(&message.data),
),
SwarmEvent::NewListenAddr { address, .. } => {
println!("Local node is listening on {address}");
}
e => {
println!("Other event {e:?}");
}
}
}
}
}
}

View File

@@ -1,81 +0,0 @@
use crate::alias::AnyResult;
use futures::{AsyncRead, AsyncWrite};
use keccak_const::Sha3_256;
use libp2p::{
core::{muxing, transport::Boxed}, identity,
noise,
pnet, quic, yamux, PeerId, Transport as _,
};
use std::any::Any;
/// Key used for discovery's private network. See [`pnet_upgrade`] for more.
const PNET_PRESHARED_KEY: [u8; 32] = Sha3_256::new().update(b"exo_discovery_network").finalize();
/// Make `discovery` run on a private network, as to not clash with the `forwarder` network.
/// This is implemented as an additional "upgrade" ontop of existing [`libp2p::Transport`] layers.
fn pnet_upgrade<Socket>(
socket: Socket,
_ignored: impl Any,
) -> impl Future<Output = Result<pnet::PnetOutput<Socket>, pnet::PnetError>>
where
Socket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
pnet::PnetConfig::new(pnet::PreSharedKey::new(PNET_PRESHARED_KEY)).handshake(socket)
}
/// TCP/IP transport layer configuration.
fn tcp_transport(
keypair: &identity::Keypair,
) -> AnyResult<Boxed<(PeerId, muxing::StreamMuxerBox)>> {
use libp2p::{
core::upgrade::Version,
tcp::{tokio, Config},
};
// `TCP_NODELAY` enabled => avoid latency
let tcp_config = Config::default()
.nodelay(true);
// V1 + lazy flushing => 0-RTT negotiation
let upgrade_version = Version::V1Lazy;
// Noise is faster than TLS + we don't care much for security
let noise_config = noise::Config::new(keypair)?;
//let tls_config = tls::Config::new(keypair)?; // TODO: add this in if needed?? => look into how `.with_tcp` does it...
// Use default Yamux config for multiplexing
let yamux_config = yamux::Config::default();
// Create new Tokio-driven TCP/IP transport layer
let base_transport = tokio::Transport::new(tcp_config)
.and_then(pnet_upgrade)
.upgrade(upgrade_version)
.authenticate(noise_config)
.multiplex(yamux_config);
// Return boxed transport (to flatten complex type)
Ok(base_transport.boxed())
}
/// QUIC transport layer configuration.
fn quic_transport(keypair: &identity::Keypair) -> Boxed<(PeerId, quic::Connection)> {
use libp2p::quic::{tokio, Config};
let quic_config = Config::new(keypair);
let base_transport = tokio::Transport::new(quic_config).boxed();
//.and_then(); // As of now, QUIC doesn't support PNet's.., ;( TODO: figure out in future how to do
unimplemented!("you cannot use this yet !!!");
base_transport
}
/// Overall composed transport-layer configuration for the `discovery` network.
pub fn discovery_transport(
keypair: &identity::Keypair,
) -> AnyResult<Boxed<(PeerId, muxing::StreamMuxerBox)>> {
// TODO: when QUIC is figured out with PNET, re-enable this
// Ok(tcp_transport(keypair)?
// .or_transport(quic_transport(keypair))
// .boxed())
tcp_transport(keypair)
}

View File

@@ -1,8 +0,0 @@
// maybe this will hold test in the future...??
#[cfg(test)]
mod tests {
#[test]
fn does_nothing() {
}
}

View File

@@ -1,76 +0,0 @@
[package]
name = "exo_pyo3_bindings"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
path = "src/lib.rs"
name = "exo_pyo3_bindings"
# "cdylib" needed to produce shared library for Python to import
# "rlib" needed for stub-gen to run
crate-type = ["cdylib", "rlib"]
[[bin]]
path = "src/bin/stub_gen.rs"
name = "stub_gen"
doc = false
[lints]
workspace = true
[dependencies]
discovery = { workspace = true }
# interop
pyo3 = { workspace = true, features = [
"abi3-py311", # tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.11
"nightly", # enables better-supported GIL integration
"experimental-async", # async support in #[pyfunction] & #[pymethods]
#"experimental-inspect", # inspection of generated binary => easier to automate type-hint generation
#"py-clone", # adding Clone-ing of `Py<T>` without GIL (may cause panics - remove if panics happen)
"multiple-pymethods", # allows multiple #[pymethods] sections per class
# integrations with other libraries
"arc_lock", "bigdecimal", "either", "hashbrown", "indexmap", "num-bigint", "num-complex", "num-rational",
"ordered-float", "rust_decimal", "smallvec",
# "anyhow", "chrono", "chrono-local", "chrono-tz", "eyre", "jiff-02", "lock_api", "parking-lot", "time", "serde",
] }
pyo3-stub-gen = { workspace = true }
pyo3-async-runtimes = { workspace = true, features = ["attributes", "tokio-runtime", "testing"] }
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
impl-trait-for-tuples = { workspace = true }
derive_more = { workspace = true }
# async runtime
tokio = { workspace = true, features = ["full", "tracing"] }
# utility dependencies
once_cell = "1.21.3"
thread_local = "1.1.9"
#util = { workspace = true }
#fn_pipe = { workspace = true }
thiserror = { workspace = true }
#internment = { workspace = true }
#recursion = { workspace = true }
#generativity = { workspace = true }
#itertools = { workspace = true }
# Tracing
#tracing = "0.1"
#tracing-subscriber = "0.3"
#console-subscriber = "0.1.5"
#tracing-log = "0.2.0"
env_logger = "0.11"
log = "0.4"
pyo3-log = "0.12"
# Networking
libp2p = { workspace = true, features = ["full"] }

View File

@@ -1 +0,0 @@
TODO: do something here....

View File

@@ -1,170 +0,0 @@
# This file is automatically generated by pyo3_stub_gen
# ruff: noqa: E501, F401
import builtins
import collections.abc
class ConnectionId:
r"""
TODO: documentation...
"""
@staticmethod
def new_unchecked(id:builtins.int) -> ConnectionId:
r"""
TODO: documentation
"""
def __repr__(self) -> builtins.str: ...
def __str__(self) -> builtins.str: ...
class ConnectionUpdate:
@property
def peer_id(self) -> PeerId:
r"""
Identity of the peer that we have connected to.
"""
@property
def connection_id(self) -> ConnectionId:
r"""
Identifier of the connection.
"""
@property
def local_addr(self) -> Multiaddr:
r"""
Local connection address.
"""
@property
def send_back_addr(self) -> Multiaddr:
r"""
Address used to send back data to the remote.
"""
class DiscoveryService:
def __new__(cls, identity:Keypair) -> DiscoveryService: ...
def add_connected_callback(self, callback:collections.abc.Callable[[ConnectionUpdate], None]) -> None: ...
def add_disconnected_callback(self, callback:collections.abc.Callable[[ConnectionUpdate], None]) -> None: ...
class Keypair:
r"""
TODO: documentation...
"""
@staticmethod
def generate_ed25519() -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def generate_ecdsa() -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def generate_secp256k1() -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def from_protobuf_encoding(bytes:bytes) -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def rsa_from_pkcs8(bytes:bytes) -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def secp256k1_from_der(bytes:bytes) -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def ed25519_from_bytes(bytes:bytes) -> Keypair:
r"""
TODO: documentation
"""
@staticmethod
def ecdsa_from_bytes(bytes:bytes) -> Keypair:
r"""
TODO: documentation
"""
def to_protobuf_encoding(self) -> bytes:
r"""
TODO: documentation
"""
def to_peer_id(self) -> PeerId:
r"""
TODO: documentation
"""
class Multiaddr:
r"""
TODO: documentation...
"""
@staticmethod
def empty() -> Multiaddr:
r"""
TODO: documentation
"""
@staticmethod
def with_capacity(n:builtins.int) -> Multiaddr:
r"""
TODO: documentation
"""
@staticmethod
def from_bytes(bytes:bytes) -> Multiaddr:
r"""
TODO: documentation
"""
@staticmethod
def from_string(string:builtins.str) -> Multiaddr:
r"""
TODO: documentation
"""
def len(self) -> builtins.int:
r"""
TODO: documentation
"""
def is_empty(self) -> builtins.bool:
r"""
TODO: documentation
"""
def to_bytes(self) -> bytes:
r"""
TODO: documentation
"""
def to_string(self) -> builtins.str:
r"""
TODO: documentation
"""
class PeerId:
r"""
TODO: documentation...
"""
@staticmethod
def random() -> PeerId:
r"""
TODO: documentation
"""
@staticmethod
def from_bytes(bytes:bytes) -> PeerId:
r"""
TODO: documentation
"""
def to_bytes(self) -> bytes:
r"""
TODO: documentation
"""
def to_base58(self) -> builtins.str:
r"""
TODO: documentation
"""
def __repr__(self) -> builtins.str:
r"""
TODO: documentation
"""
def __str__(self) -> builtins.str:
r"""
TODO: documentation
"""

View File

@@ -1,35 +0,0 @@
[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"
[project]
name = "exo_pyo3_bindings"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "Andrei Cravtov", email = "the.andrei.cravtov@gmail.com" }
]
requires-python = ">=3.13"
dependencies = []
[dependency-groups]
dev = [
"exo_pyo3_bindings",
"pytest>=8.4.0",
"pytest-asyncio>=1.0.0",
]
#[project.scripts]
#networking = "rust-bindings:main"
[tool.maturin]
#purelib = true
#python-source = "python"
module-name = "exo_pyo3_bindings"
features = ["pyo3/extension-module", "pyo3/experimental-async"]
[tool.pytest.ini_options]
log_cli = true
log_cli_level = "INFO"
asyncio_mode = "auto"

View File

@@ -1,32 +0,0 @@
use pyo3_stub_gen::Result;
fn main() -> Result<()> {
let body = async {
env_logger::Builder::from_env(env_logger::Env::default().filter_or("RUST_LOG", "info"))
.init();
let stub = exo_pyo3_bindings::stub_info()?;
stub.generate()?;
Ok(())
};
#[allow(
clippy::expect_used,
clippy::diverging_sub_expression,
clippy::needless_return
)]
{
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed building the Runtime");
let a = runtime.handle();
return runtime.block_on(body);
}
}
// fn main() -> Result<()> {
// let stub = python_bindings::stub_info()?;
// stub.generate()?;
// Ok(())
// }

View File

@@ -1,520 +0,0 @@
#![allow(
clippy::multiple_inherent_impl,
clippy::unnecessary_wraps,
clippy::unused_self,
clippy::needless_pass_by_value
)]
use crate::ext::ResultExt;
use crate::pylibp2p::connection::PyConnectionId;
use crate::pylibp2p::ident::{PyKeypair, PyPeerId};
use crate::pylibp2p::multiaddr::PyMultiaddr;
use crate::{MPSC_CHANNEL_SIZE, alias, pyclass};
use discovery::behaviour::{DiscoveryBehaviour, DiscoveryBehaviourEvent};
use discovery::discovery_swarm;
use libp2p::core::ConnectedPoint;
use libp2p::futures::StreamExt;
use libp2p::multiaddr::multiaddr;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{ConnectionId, SwarmEvent, ToSwarm};
use libp2p::{Multiaddr, PeerId, Swarm, gossipsub, mdns};
use std::net::IpAddr;
use pyo3::prelude::{PyModule, PyModuleMethods as _};
use pyo3::{Bound, Py, PyObject, PyResult, PyTraverseError, PyVisit, Python, pymethods};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::convert::identity;
use std::error::Error;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
struct ConnectionUpdate {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
}
#[gen_stub_pyclass]
#[pyclass(frozen, name = "ConnectionUpdate")]
#[derive(Debug, Clone)]
struct PyConnectionUpdate {
/// Identity of the peer that we have connected to.
#[pyo3(get)]
peer_id: PyPeerId,
/// Identifier of the connection.
#[pyo3(get)]
connection_id: PyConnectionId,
/// Local connection address.
#[pyo3(get)]
local_addr: PyMultiaddr,
/// Address used to send back data to the remote.
#[pyo3(get)]
send_back_addr: PyMultiaddr,
}
impl PyConnectionUpdate {
fn from_connection_event(
ConnectionUpdate {
peer_id,
connection_id,
local_addr,
send_back_addr,
}: ConnectionUpdate,
) -> Self {
Self {
peer_id: PyPeerId(peer_id),
connection_id: PyConnectionId(connection_id),
local_addr: PyMultiaddr(local_addr),
send_back_addr: PyMultiaddr(send_back_addr),
}
}
}
enum IncomingDiscoveryMessage {
AddConnectedCallback(Box<dyn alias::SendFn<(ConnectionUpdate,), ()>>),
AddDisconnectedCallback(Box<dyn alias::SendFn<(ConnectionUpdate,), ()>>),
}
/// Check if a multiaddr is valid for connection
fn is_address_valid(addr: &Multiaddr) -> bool {
use libp2p::multiaddr::Protocol;
for component in addr.iter() {
match component {
Protocol::Ip4(ip) => {
let ip_addr = IpAddr::V4(ip);
// Filter out loopback and unspecified addresses
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
return false;
}
// Filter out Tailscale ranges (100.64.0.0/10)
if let IpAddr::V4(ipv4) = ip_addr {
let octets = ipv4.octets();
if octets[0] == 100 && octets[1] >= 64 && octets[1] <= 127 {
return false;
}
}
}
Protocol::Ip6(ip) => {
let ip_addr = IpAddr::V6(ip);
// Filter out loopback and unspecified addresses
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
return false;
}
// Filter out Tailscale IPv6 (fd7a:115c:a1e0::/48)
if let IpAddr::V6(ipv6) = ip_addr {
let segments = ipv6.segments();
if segments[0] == 0xfd7a && segments[1] == 0x115c && segments[2] == 0xa1e0 {
return false;
}
}
}
_ => {}
}
}
true
}
#[allow(clippy::enum_glob_use)]
async fn discovery_task(
mut receiver: mpsc::Receiver<IncomingDiscoveryMessage>,
mut swarm: Swarm<DiscoveryBehaviour>,
) {
use DiscoveryBehaviourEvent::*;
use IncomingDiscoveryMessage::*;
use SwarmEvent::*;
use gossipsub::Event::*;
use mdns::Event::*;
log::info!("RUST: discovery task started");
// create callbacks list
let mut connected_callbacks: Vec<Box<dyn alias::SendFn<(ConnectionUpdate,), ()>>> = vec![];
let mut disconnected_callbacks: Vec<Box<dyn alias::SendFn<(ConnectionUpdate,), ()>>> = vec![];
// Create periodic health check timer with adaptive interval
let fast_check_duration = Duration::from_secs(5);
let slow_check_duration = Duration::from_secs(30);
let mut health_check_interval = interval(fast_check_duration);
let mut no_connection_count = 0;
loop {
tokio::select! {
_ = health_check_interval.tick() => {
// Check connection health periodically
let connected_peers = swarm.connected_peers().count();
if connected_peers == 0 {
no_connection_count += 1;
log::info!("RUST: No connected peers (check #{no_connection_count})");
// Keep fast checking when disconnected
if health_check_interval.period() != fast_check_duration {
health_check_interval = interval(fast_check_duration);
log::info!("RUST: Switching to fast health checks (every {:?})", fast_check_duration);
}
// Force mDNS restart after multiple failed checks
if no_connection_count > 1 { // Trigger faster, after 2 checks
log::info!("RUST: Attempting to restart mDNS discovery");
// Note: In rust-libp2p, we can't easily restart mDNS like in Go,
// but we can force a re-announce by changing listening addresses
// This is a workaround to trigger mDNS to re-announce
// Try listening on a new ephemeral port to force re-announcement
match swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) {
Ok(_) => log::info!("RUST: Added new listener to force mDNS re-announcement"),
Err(e) => log::error!("RUST: Failed to add new listener: {e:?}"),
}
// Also try IPv6
match swarm.listen_on("/ip6/::/tcp/0".parse().unwrap()) {
Ok(_) => log::info!("RUST: Added IPv6 listener to force mDNS re-announcement"),
Err(e) => log::error!("RUST: Failed to add IPv6 listener: {e:?}"),
}
}
} else {
if no_connection_count > 0 {
log::info!("RUST: Connection restored, currently connected to {connected_peers} peers");
}
no_connection_count = 0;
// Switch to slow checking when connected
if health_check_interval.period() != slow_check_duration {
health_check_interval = interval(slow_check_duration);
log::info!("RUST: Switching to slow health checks (every {:?})", slow_check_duration);
}
}
}
message = receiver.recv() => {
// handle closed channel
let Some(message) = message else {
log::info!("RUST: channel closed");
break;
};
// attach callbacks for event types
match message {
AddConnectedCallback(callback) => {
log::info!("RUST: received connected callback");
connected_callbacks.push(callback);
}
AddDisconnectedCallback(callback) => {
log::info!("RUST: received disconnected callback");
disconnected_callbacks.push(callback);
}
}
}
swarm_event = swarm.select_next_some() => {
match swarm_event {
Behaviour(Mdns(Discovered(list))) => {
for (peer_id, multiaddr) in list {
log::info!("RUST: mDNS discovered a new peer: {peer_id} on {multiaddr}");
// Filter out invalid addresses
if !is_address_valid(&multiaddr) {
log::info!("RUST: Filtered out invalid address: {multiaddr}");
continue;
}
let local_peer_id = *swarm.local_peer_id();
// To avoid simultaneous dial races, only the lexicographically larger peer_id dials.
if peer_id > local_peer_id {
let dial_opts = DialOpts::peer_id(peer_id)
.addresses(vec![multiaddr.clone()].into())
.condition(libp2p::swarm::dial_opts::PeerCondition::Always)
.build();
match swarm.dial(dial_opts) {
Ok(()) => log::info!("RUST: Dial initiated to {multiaddr}"),
Err(libp2p::swarm::DialError::DialPeerConditionFalse(_)) => {
// Another dial is already in progress; not an error for us.
log::debug!(
"RUST: Dial skipped because another dial is active for {peer_id}"
);
}
Err(e) => {
log::warn!("RUST: Failed to dial {multiaddr}: {e:?}");
}
}
}
// Maintain peer in gossipsub mesh so the connection stays alive once established.
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
log::info!("RUST: Added peer {peer_id} to gossipsub explicit peers");
}
}
Behaviour(Mdns(Expired(list))) => {
for (peer_id, multiaddr) in list {
log::info!("RUST: mDNS discover peer has expired: {peer_id} on {multiaddr}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
Behaviour(Gossipsub(Message {
propagation_source: peer_id,
message_id: id,
message,
})) => log::info!(
"RUST: Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
),
ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established: _num_established,
concurrent_dial_errors,
established_in: _established_in,
} => {
log::info!("RUST: ConnectionEstablished event - peer_id: {peer_id}, connection_id: {connection_id:?}, endpoint: {endpoint:?}");
// log any connection errors
if let Some(concurrent_dial_errors) = concurrent_dial_errors {
for (multiaddr, error) in concurrent_dial_errors {
log::error!("Connection error: multiaddr={multiaddr}, error={error:?}");
}
}
// Extract addresses based on endpoint type
let (local_addr, send_back_addr) = match &endpoint {
ConnectedPoint::Listener { local_addr, send_back_addr } => {
log::info!("RUST: Connection established (Listener) - local_addr: {local_addr}, send_back_addr: {send_back_addr}");
(local_addr.clone(), send_back_addr.clone())
},
ConnectedPoint::Dialer { address, .. } => {
log::info!("RUST: Connection established (Dialer) - remote_addr: {address}");
// For dialer, we use the dialed address as both local and send_back
// This isn't perfect but allows both sides to be notified
(address.clone(), address.clone())
}
};
log::info!("RUST: Number of connected callbacks: {}", connected_callbacks.len());
// trigger callback on connected peer
for connected_callback in &connected_callbacks {
connected_callback(ConnectionUpdate {
peer_id,
connection_id,
local_addr: local_addr.clone(),
send_back_addr: send_back_addr.clone(),
});
}
},
ConnectionClosed { peer_id, connection_id, endpoint, num_established, cause } => {
log::info!("RUST: ConnectionClosed event - peer_id: {peer_id}, connection_id: {connection_id:?}, endpoint: {endpoint:?}, num_established: {num_established}");
// log any connection errors
if let Some(cause) = cause {
log::error!("Connection error: cause={cause:?}");
}
// Extract addresses based on endpoint type
let (local_addr, send_back_addr) = match &endpoint {
ConnectedPoint::Listener { local_addr, send_back_addr } => {
log::info!("RUST: Connection closed (Listener) - local_addr: {local_addr}, send_back_addr: {send_back_addr}");
(local_addr.clone(), send_back_addr.clone())
},
ConnectedPoint::Dialer { address, .. } => {
log::info!("RUST: Connection closed (Dialer) - remote_addr: {address}");
// For dialer, we use the dialed address as both local and send_back
// This isn't perfect but allows both sides to be notified
(address.clone(), address.clone())
}
};
log::info!("RUST: Number of disconnected callbacks: {}", disconnected_callbacks.len());
// trigger callback on connected peer
for disconnected_callback in &disconnected_callbacks {
disconnected_callback(ConnectionUpdate {
peer_id,
connection_id,
local_addr: local_addr.clone(),
send_back_addr: send_back_addr.clone(),
});
}
// If this was the last connection to the peer, try to force mDNS re-discovery
if num_established == 0 {
log::info!("RUST: Last connection to peer {peer_id} closed, triggering mDNS re-discovery");
// Remove from gossipsub to ensure clean state
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
// Force a listen address change to trigger mDNS re-announcement
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("RUST: Delayed mDNS trigger after disconnect");
});
}
}
NewListenAddr { address, .. } => {
log::info!("RUST: Local node is listening on {address}");
let local_peer = swarm.local_peer_id();
log::info!("RUST: Local peer_id: {local_peer}");
}
OutgoingConnectionError { peer_id, error, .. } => {
log::error!("RUST: Outgoing connection error to peer {peer_id:?}: {error:?}");
// Connection failed, might be due to network change
if let Some(peer) = peer_id {
// Remove from gossipsub to allow fresh connection attempts
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
}
}
IncomingConnectionError { send_back_addr, error, .. } => {
log::error!("RUST: Incoming connection error from {send_back_addr}: {error:?}");
}
e => {
log::debug!("RUST: Other event {e:?}");
}
}
}
}
}
log::info!("RUST: discovery task stopped");
}
#[gen_stub_pyclass]
#[pyclass(name = "DiscoveryService")]
#[derive(Debug, Clone)]
struct PyDiscoveryService {
sender: Option<mpsc::Sender<IncomingDiscoveryMessage>>,
}
#[allow(clippy::expect_used)]
impl PyDiscoveryService {
const fn sender(&self) -> &mpsc::Sender<IncomingDiscoveryMessage> {
self.sender
.as_ref()
.expect("The sender should only be None after de-initialization.")
}
const fn sender_mut(&mut self) -> &mut mpsc::Sender<IncomingDiscoveryMessage> {
self.sender
.as_mut()
.expect("The sender should only be None after de-initialization.")
}
const fn new(sender: mpsc::Sender<IncomingDiscoveryMessage>) -> Self {
Self {
sender: Some(sender),
}
}
}
#[gen_stub_pymethods]
#[pymethods]
impl PyDiscoveryService {
#[new]
fn py_new<'py>(identity: Bound<'py, PyKeypair>) -> PyResult<Self> {
use pyo3_async_runtimes::tokio::get_runtime;
// create communication channel
let (sender, receiver) = mpsc::channel::<IncomingDiscoveryMessage>(MPSC_CHANNEL_SIZE);
// get identity
let identity = identity.borrow().0.clone();
log::info!("RUST: Creating DiscoveryService with keypair");
// create discovery swarm (within tokio context!! or it crashes)
let swarm = get_runtime()
.block_on(async { discovery_swarm(identity) })
.pyerr()?;
log::info!("RUST: Discovery swarm created successfully");
// spawn tokio task
get_runtime().spawn(async move {
log::info!("RUST: Starting discovery task");
discovery_task(receiver, swarm).await;
log::info!("RUST: Discovery task ended");
});
Ok(Self::new(sender))
}
#[allow(clippy::expect_used)]
fn add_connected_callback<'py>(
&self,
#[gen_stub(override_type(
type_repr="collections.abc.Callable[[ConnectionUpdate], None]",
imports=("collections.abc")
))]
callback: PyObject,
) -> PyResult<()> {
use pyo3_async_runtimes::tokio::get_runtime;
get_runtime()
.block_on(
self.sender()
.send(IncomingDiscoveryMessage::AddConnectedCallback(Box::new(
move |connection_event| {
Python::with_gil(|py| {
callback
.call1(
py,
(PyConnectionUpdate::from_connection_event(
connection_event,
),),
)
.expect("Callback should always work...");
});
},
))),
)
.pyerr()?;
Ok(())
}
#[allow(clippy::expect_used)]
fn add_disconnected_callback<'py>(
&self,
#[gen_stub(override_type(
type_repr="collections.abc.Callable[[ConnectionUpdate], None]",
imports=("collections.abc")
))]
callback: PyObject,
) -> PyResult<()> {
use pyo3_async_runtimes::tokio::get_runtime;
get_runtime()
.block_on(
self.sender()
.send(IncomingDiscoveryMessage::AddDisconnectedCallback(Box::new(
move |connection_event| {
Python::with_gil(|py| {
callback
.call1(
py,
(PyConnectionUpdate::from_connection_event(
connection_event,
),),
)
.expect("Callback should always work...");
});
},
))),
)
.pyerr()?;
Ok(())
}
#[gen_stub(skip)]
const fn __traverse__(&self, visit: PyVisit<'_>) -> Result<(), PyTraverseError> {
Ok(()) // This is needed purely so `__clear__` can work
}
#[gen_stub(skip)]
fn __clear__(&mut self) {
// TODO: may or may not need to await a "kill-signal" oneshot channel message,
// to ensure that the discovery task is done BEFORE exiting the clear function...
// but this may require GIL?? and it may not be safe to call GIL here??
self.sender = None; // Using Option<T> as a trick to force `sender` channel to be dropped
}
}
pub fn discovery_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyConnectionUpdate>()?;
m.add_class::<PyDiscoveryService>()?;
Ok(())
}

View File

@@ -1,101 +0,0 @@
//! TODO: crate documentation
//!
//! this is here as a placeholder documentation
//!
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
#![feature(tuple_trait)]
#![feature(unboxed_closures)]
// #![feature(stmt_expr_attributes)]
// #![feature(assert_matches)]
// #![feature(async_fn_in_dyn_trait)]
// #![feature(async_for_loop)]
// #![feature(auto_traits)]
// #![feature(negative_impls)]
extern crate core;
pub(crate) mod discovery;
pub(crate) mod pylibp2p;
use crate::discovery::discovery_submodule;
use crate::pylibp2p::connection::connection_submodule;
use crate::pylibp2p::ident::ident_submodule;
use crate::pylibp2p::multiaddr::multiaddr_submodule;
use pyo3::prelude::{PyModule, PyModuleMethods};
use pyo3::{prelude::*, types::*};
use pyo3::{pyclass, pymodule, Bound, PyResult};
use pyo3_stub_gen::define_stub_info_gatherer;
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {
use std::error::Error;
use std::marker::Tuple;
pub trait SendFn<Args: Tuple + Send + 'static, Output> =
Fn<Args, Output = Output> + Send + 'static;
pub type AnyError = Box<dyn Error + Send + Sync + 'static>;
pub type AnyResult<T> = Result<T, AnyError>;
}
/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {
use extend::ext;
use pyo3::exceptions::PyRuntimeError;
use pyo3::PyErr;
#[ext(pub, name = ResultExt)]
impl<T, E> Result<T, E>
where
E: ToString,
{
fn pyerr(self) -> Result<T, PyErr> {
self.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
}
}
pub(crate) mod private {
use std::marker::Sized;
/// Sealed traits support
pub trait Sealed {}
impl<T: ?Sized> Sealed for T {}
}
pub(crate) const MPSC_CHANNEL_SIZE: usize = 8;
/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule(name = "exo_pyo3_bindings")]
fn main_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
// install logger
pyo3_log::init();
// TODO: for now this is all NOT a submodule, but figure out how to make the submodule system
// work with maturin, where the types generate correctly, in the right folder, without
// too many importing issues...
connection_submodule(m)?;
ident_submodule(m)?;
multiaddr_submodule(m)?;
discovery_submodule(m)?;
// top-level constructs
// TODO: ...
Ok(())
}
define_stub_info_gatherer!(stub_info);
/// Test of unit test for testing link problem
#[cfg(test)]
mod tests {
#[test]
fn test() {
assert_eq!(2 + 2, 4);
}
}

View File

@@ -1,36 +0,0 @@
use libp2p::swarm::ConnectionId;
use pyo3::prelude::{PyModule, PyModuleMethods};
use pyo3::{pyclass, pymethods, Bound, PyResult};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
/// TODO: documentation...
#[gen_stub_pyclass]
#[pyclass(name = "ConnectionId")]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct PyConnectionId(pub ConnectionId);
#[gen_stub_pymethods]
#[pymethods]
#[allow(clippy::needless_pass_by_value)]
impl PyConnectionId {
/// TODO: documentation
#[staticmethod]
fn new_unchecked(id: usize) -> Self {
Self(ConnectionId::new_unchecked(id))
}
fn __repr__(&self) -> String {
format!("ConnectionId({})", self.0)
}
fn __str__(&self) -> String {
self.0.to_string()
}
}
pub fn connection_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyConnectionId>()?;
Ok(())
}

View File

@@ -1,160 +0,0 @@
use crate::ext::ResultExt;
use libp2p::identity::{ecdsa, Keypair};
use libp2p::PeerId;
use pyo3::prelude::{PyBytesMethods, PyModule, PyModuleMethods};
use pyo3::types::PyBytes;
use pyo3::{pyclass, pymethods, Bound, PyObject, PyResult, Python};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
/// TODO: documentation...
#[gen_stub_pyclass]
#[pyclass(name = "Keypair")]
#[repr(transparent)]
pub struct PyKeypair(pub Keypair);
#[gen_stub_pymethods]
#[pymethods]
#[allow(clippy::needless_pass_by_value)]
impl PyKeypair {
/// TODO: documentation
#[staticmethod]
fn generate_ed25519() -> Self {
Self(Keypair::generate_ed25519())
}
/// TODO: documentation
#[staticmethod]
fn generate_ecdsa() -> Self {
Self(Keypair::generate_ecdsa())
}
/// TODO: documentation
#[staticmethod]
fn generate_secp256k1() -> Self {
Self(Keypair::generate_secp256k1())
}
/// TODO: documentation
#[staticmethod]
fn from_protobuf_encoding(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let bytes = Vec::from(bytes.as_bytes());
Ok(Self(Keypair::from_protobuf_encoding(&bytes).pyerr()?))
}
/// TODO: documentation
#[staticmethod]
fn rsa_from_pkcs8(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let mut bytes = Vec::from(bytes.as_bytes());
Ok(Self(Keypair::rsa_from_pkcs8(&mut bytes).pyerr()?))
}
/// TODO: documentation
#[staticmethod]
fn secp256k1_from_der(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let mut bytes = Vec::from(bytes.as_bytes());
Ok(Self(Keypair::secp256k1_from_der(&mut bytes).pyerr()?))
}
/// TODO: documentation
#[staticmethod]
fn ed25519_from_bytes(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let mut bytes = Vec::from(bytes.as_bytes());
Ok(Self(Keypair::ed25519_from_bytes(&mut bytes).pyerr()?))
}
/// TODO: documentation
#[staticmethod]
fn ecdsa_from_bytes(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let bytes = Vec::from(bytes.as_bytes());
Ok(Self(Keypair::from(ecdsa::Keypair::from(
ecdsa::SecretKey::try_from_bytes(bytes).pyerr()?,
))))
}
/// TODO: documentation
fn to_protobuf_encoding<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
let bytes = self.0.to_protobuf_encoding().pyerr()?;
Ok(PyBytes::new(py, &bytes))
}
/// TODO: documentation
fn to_peer_id(&self) -> PyPeerId {
PyPeerId(self.0.public().to_peer_id())
}
// /// Hidden constructor for pickling support. TODO: figure out how to do pickling...
// #[gen_stub(skip)]
// #[new]
// fn py_new(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
// Self::from_protobuf_encoding(bytes)
// }
//
// #[gen_stub(skip)]
// fn __setstate__(&mut self, state: Bound<'_, PyBytes>) -> PyResult<()> {
// *self = Self::from_protobuf_encoding(state)?;
// Ok(())
// }
//
// #[gen_stub(skip)]
// fn __getstate__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
// self.to_protobuf_encoding(py)
// }
//
// #[gen_stub(skip)]
// pub fn __getnewargs__<'py>(&self, py: Python<'py>) -> PyResult<(Bound<'py, PyBytes>,)> {
// Ok((self.to_protobuf_encoding(py)?,))
// }
}
/// TODO: documentation...
#[gen_stub_pyclass]
#[pyclass(name = "PeerId")]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct PyPeerId(pub PeerId);
#[gen_stub_pymethods]
#[pymethods]
#[allow(clippy::needless_pass_by_value)]
impl PyPeerId {
/// TODO: documentation
#[staticmethod]
fn random() -> Self {
Self(PeerId::random())
}
/// TODO: documentation
#[staticmethod]
fn from_bytes(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let bytes = Vec::from(bytes.as_bytes());
Ok(Self(PeerId::from_bytes(&bytes).pyerr()?))
}
/// TODO: documentation
fn to_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
let bytes = self.0.to_bytes();
PyBytes::new(py, &bytes)
}
/// TODO: documentation
fn to_base58(&self) -> String {
self.0.to_base58()
}
/// TODO: documentation
fn __repr__(&self) -> String {
format!("PeerId({})", self.to_base58())
}
/// TODO: documentation
fn __str__(&self) -> String {
self.to_base58()
}
}
pub fn ident_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyKeypair>()?;
m.add_class::<PyPeerId>()?;
Ok(())
}

View File

@@ -1,3 +0,0 @@
pub mod connection;
pub mod ident;
pub mod multiaddr;

View File

@@ -1,81 +0,0 @@
use crate::ext::ResultExt;
use libp2p::Multiaddr;
use pyo3::prelude::{PyBytesMethods, PyModule, PyModuleMethods};
use pyo3::types::PyBytes;
use pyo3::{Bound, PyResult, Python, pyclass, pymethods};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::str::FromStr;
/// TODO: documentation...
#[gen_stub_pyclass]
#[pyclass(name = "Multiaddr")]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct PyMultiaddr(pub Multiaddr);
#[gen_stub_pymethods]
#[pymethods]
#[allow(clippy::needless_pass_by_value)]
impl PyMultiaddr {
/// TODO: documentation
#[staticmethod]
fn empty() -> Self {
Self(Multiaddr::empty())
}
/// TODO: documentation
#[staticmethod]
fn with_capacity(n: usize) -> Self {
Self(Multiaddr::with_capacity(n))
}
/// TODO: documentation
#[staticmethod]
fn from_bytes(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let bytes = Vec::from(bytes.as_bytes());
Ok(Self(Multiaddr::try_from(bytes).pyerr()?))
}
/// TODO: documentation
#[staticmethod]
fn from_string(string: String) -> PyResult<Self> {
Ok(Self(Multiaddr::from_str(&string).pyerr()?))
}
/// TODO: documentation
fn len(&self) -> usize {
self.0.len()
}
/// TODO: documentation
fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// TODO: documentation
fn to_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
let bytes = self.0.to_vec();
PyBytes::new(py, &bytes)
}
/// TODO: documentation
fn to_string(&self) -> String {
self.0.to_string()
}
#[gen_stub(skip)]
fn __repr__(&self) -> String {
format!("Multiaddr({})", self.0)
}
#[gen_stub(skip)]
fn __str__(&self) -> String {
self.to_string()
}
}
pub fn multiaddr_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyMultiaddr>()?;
Ok(())
}

View File

@@ -1,54 +0,0 @@
#[cfg(test)]
mod tests {
use core::mem::drop;
use core::option::Option::Some;
use core::time::Duration;
use tokio;
use tokio::sync::mpsc;
#[tokio::test]
async fn test_drop_channel() {
struct Ping;
let (tx, mut rx) = mpsc::channel::<Ping>(10);
let _ = tokio::spawn(async move {
println!("TASK: entered");
loop {
tokio::select! {
result = rx.recv() => {
match result {
Some(_) => {
println!("TASK: pinged");
}
None => {
println!("TASK: closing channel");
break;
}
}
}
_ = tokio::time::sleep(Duration::from_secs_f32(0.1)) => {
println!("TASK: heartbeat");
}
}
}
println!("TASK: exited");
});
let tx2 = tx.clone();
tokio::time::sleep(Duration::from_secs_f32(0.11)).await;
tx.send(Ping).await.expect("Should not fail");
drop(tx);
tokio::time::sleep(Duration::from_secs_f32(0.11)).await;
tx2.send(Ping).await.expect("Should not fail");
drop(tx2);
tokio::time::sleep(Duration::from_secs_f32(0.11)).await;
}
}

View File

@@ -1,129 +0,0 @@
import asyncio
import time
import pytest
from exo_pyo3_bindings import ConnectionUpdate, Keypair, DiscoveryService
# # => `tokio::mpsc` channels are closed when all `Sender` are dropped, or when `Receiver::close` is called
# # => the only sender is `KillableTaskHandle.sender: Option<Option<Sender<KillKillableTask>>>`
# # => integrate with https://pyo3.rs/v0.25.1/class/protocols.html#garbage-collector-integration
# # => set `sender` to `None` to drop the `Sender` & therefore trigger an automatic cleanup
# # => TODO: there could be a bug where dropping `Sender` won't close the channel in time bc of unprocessed events
# # so the handle drops and asyncio loop closes BEFORE the task dies...
# # might wanna figure out some kind of `oneshot` "shutdown confirmed" blocking mechanism or something...??
# # => also there is "cancellable futures" stuff ?? => https://pyo3.rs/main/async-await.html
# #
# # For now, always explicitly call cleanup functions to avoid crashes
# # in the future research tighter integration for automatic cleanup and safety!!!
# # also look into `pyo3_async_runtimes::tokio::get_runtime()` for blocking calls in Rust
# @pytest.mark.asyncio
# async def test_handle_kill() -> None:
# print("PYTHON: starting handle")
# h: KillableTaskHandle = killable_task_spawn()
# time.sleep(0.35)
# # for i in range(0, 4):
# # print(f"PYTHON: waiting... {i}")
# # time.sleep(0.11)
# # print("PYTHON: killing task")
# # h.kill_task()
# def test_keypair_creation() -> None:
# kp = Keypair.generate_ecdsa()
# kp_protobuf = kp.to_protobuf_encoding()
# print(kp_protobuf)
# kp = Keypair.from_protobuf_encoding(kp_protobuf)
# assert kp.to_protobuf_encoding() == kp_protobuf
@pytest.mark.asyncio
async def test_discovery_callbacks() -> None:
ident = Keypair.generate_ed25519()
service = DiscoveryService(ident)
a = _add_connected_callback(service)
d = _add_disconnected_callback(service)
# stream_get_a, stream_put = _make_iter()
# service.add_connected_callback(stream_put)
#
# stream_get_d, stream_put = _make_iter()
# service.add_disconnected_callback(stream_put)
# async for c in stream_get_a:
# await connected_callback(c)
for i in range(0, 10):
print(f"PYTHON: tick {i} of 10")
await asyncio.sleep(1)
print(service, a, d) # only done to prevent GC... TODO: come up with less hacky solution
def _add_connected_callback(d: DiscoveryService):
stream_get, stream_put = _make_iter()
d.add_connected_callback(stream_put)
async def run():
async for c in stream_get:
await connected_callback(c)
return asyncio.create_task(run())
def _add_disconnected_callback(d: DiscoveryService):
stream_get, stream_put = _make_iter()
async def run():
async for c in stream_get:
await disconnected_callback(c)
d.add_disconnected_callback(stream_put)
return asyncio.create_task(run())
async def connected_callback(e: ConnectionUpdate) -> None:
print(f"\n\nPYTHON: Connected callback: {e.peer_id}, {e.connection_id}, {e.local_addr}, {e.send_back_addr}")
print(
f"PYTHON: Connected callback: {e.peer_id.__repr__()}, {e.connection_id.__repr__()}, {e.local_addr.__repr__()}, {e.send_back_addr.__repr__()}\n\n")
async def disconnected_callback(e: ConnectionUpdate) -> None:
print(f"\n\nPYTHON: Disconnected callback: {e.peer_id}, {e.connection_id}, {e.local_addr}, {e.send_back_addr}")
print(
f"PYTHON: Disconnected callback: {e.peer_id.__repr__()}, {e.connection_id.__repr__()}, {e.local_addr.__repr__()}, {e.send_back_addr.__repr__()}\n\n")
def _foo_task() -> None:
print("PYTHON: This simply runs in asyncio context")
def _make_iter():
loop = asyncio.get_event_loop()
queue: asyncio.Queue[ConnectionUpdate] = asyncio.Queue()
def put(c: ConnectionUpdate) -> None:
loop.call_soon_threadsafe(queue.put_nowait, c)
async def get():
while True:
yield await queue.get()
return get(), put
# async def inputstream_generator(channels=1, **kwargs):
# """Generator that yields blocks of input data as NumPy arrays."""
# q_in = asyncio.Queue()
# loop = asyncio.get_event_loop()
#
# def callback(indata, frame_count, time_info, status):
# loop.call_soon_threadsafe(q_in.put_nowait, (indata.copy(), status))
#
# stream = sd.InputStream(callback=callback, channels=channels, **kwargs)
# with stream:
# while True:
# indata, status = await q_in.get()
# yield indata, status

View File

@@ -1,41 +0,0 @@
[package]
name = "master_election"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "master_election"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
impl-trait-for-tuples = { workspace = true }
derive_more = { workspace = true }
# Async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
# utility dependencies
#util = { workspace = true }
#fn_pipe = { workspace = true }
thiserror = { workspace = true }
#internment = { workspace = true }
#recursion = { workspace = true }
#generativity = { workspace = true }
#itertools = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
keccak-const = { workspace = true }
# Data types
ordered-float = { workspace = true }
# Networking
libp2p = { workspace = true, features = ["full"] }

View File

@@ -1,36 +0,0 @@
use crate::cel::data::Map;
use crate::cel::{View, ID};
/// The number of neighbours of a process.
pub fn degree_centrality(known: &Map<ID, View>, id: ID) -> u32 {
todo!()
}
/// Measures average length of the shortest path between the vertex and all other vertices in the graph.
/// The more central is a vertex, the closer it is to all other vertices. The closeness centrality
/// characterizes the ability of a node to spread information over the graph.
///
/// Alex Balevas defined in 1950 the closeness centrality of a vertex as follows:
/// `C_C(x) = \frac{1}{ \sum_y d(x,y) }` where `d(x,y)` is the shortest path between `x` and `y`.
///
/// CEL paper uses this.
pub fn closeness_centrality(known: &Map<ID, View>, id: ID) -> u32 {
todo!()
}
/// Measures the number of times a vertex acts as a relay (router) along
/// shortest paths between other vertices. Even if previous authors
/// have intuitively described centrality as being based on betweenness,
/// betweenness centrality was formally defined by Freeman in 1977.
///
/// The betweenness of a vertex `x` is defined as the sum, for each pair
/// of vertices `(s, t)`, of the number of shortest paths from `s` to `t` that
/// pass through `x`, over the total number of shortest paths between
/// vertices `s` and `t`; it can be represented by the following formula:
/// `C_B(x) = \sum_{ s \neq x \neq t } \frac{ \sigma_{st}(x) }{ \sigma_{st} }`
/// where `\sigma_{st}` denotes the total number of shortest paths from vertex `s`
/// to vertex `t` (with `\sigma_{ss} = 1` by convention), and `\sigma_{st}(x)`
/// is the number of those shorter paths that pass through `x`.
pub fn betweenness_centrality(known: &Map<ID, View>, id: ID) -> u32 {
todo!()
}

View File

@@ -1,57 +0,0 @@
use crate::cel::messaging::data::Probability;
use crate::cel::KnowledgeMessage;
mod data {
use ordered_float::OrderedFloat;
use thiserror::Error;
#[derive(Error, Debug, Copy, Clone, PartialEq, PartialOrd)]
#[error("Floating number `{0}` is not a probability")]
#[repr(transparent)]
pub struct NotProbabilityError(f64);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Probability(OrderedFloat<f64>);
impl Probability {
const MIN_P: OrderedFloat<f64> = OrderedFloat(0.0);
const MAX_P: OrderedFloat<f64> = OrderedFloat(1.0);
pub fn new(p: f64) -> Result<Self, NotProbabilityError> {
let p = OrderedFloat(p);
if Self::MIN_P <= p && p <= Self::MAX_P {
Ok(Self(p))
} else {
Err(NotProbabilityError(p.0))
}
}
pub const fn into_f64(self) -> f64 {
self.0.0
}
}
impl From<Probability> for f64 {
fn from(value: Probability) -> Self {
value.into_f64()
}
}
impl TryFrom<f64> for Probability {
type Error = NotProbabilityError;
fn try_from(value: f64) -> Result<Self, Self::Error> {
Self::new(value)
}
}
}
/// Haas et al. proposed several gossip protocols for *ad hoc networks* that use probabilities.
/// Combined with the number of hops or the number of times the same message is received, the
/// protocols choose if a node broadcast a message to all its neighbors or not, reducing thus
/// the number of messages propagated in the system. The authors show that gossiping with a
/// probability between 0.6 and 0.8 ensures that almost every node of the system gets the message,
/// with up to 35% fewer messages in some networks compared to flooding.
pub fn local_broadcast(message: KnowledgeMessage, rho: Probability) {
//
}

View File

@@ -1,333 +0,0 @@
pub mod centrality;
pub mod messaging;
use crate::cel::data::{Map, Set};
use std::collections::VecDeque;
pub mod data {
use std::marker::PhantomData;
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct Set<V>(PhantomData<V>);
impl<V> Set<V> {
pub fn new() -> Self {
todo!()
}
pub fn add(&mut self, value: V) -> bool {
todo!()
}
pub fn remove(&mut self, value: V) {}
pub fn add_all(&mut self, other: &Set<V>) {}
pub fn values_mut(&mut self) -> &mut [V] {
todo!()
}
pub fn values(&self) -> &[V] {
todo!()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct Map<K, V>(PhantomData<(K, V)>);
impl<K, V> Map<K, V> {
pub fn new() -> Self {
todo!()
}
pub fn set(&mut self, key: K, value: V) {}
pub fn get(&self, key: K) -> &V {
todo!()
}
pub fn get_mut(&mut self, key: K) -> &mut V {
todo!()
}
pub fn kv_mut(&mut self) -> &mut [(K, V)] {
todo!()
}
pub fn contains_key(&self, key: K) -> bool {
todo!()
}
pub fn not_contains_key(&self, key: K) -> bool {
!self.contains_key(key)
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct ID(pub u128);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Clock(pub u64);
impl Clock {
pub const ZERO: Self = Self(0);
pub const ONE: Self = Self(1);
pub fn plus_one(self) -> Self {
Self(self.0 + 1)
}
}
/// `CEL` uses a data structure called a `view`
///
/// A `view` associated to node is composed of two elements:
/// 1) A logical `clock` value, acting as a timestamp and incremented at each connection and disconnection.
/// 2) A set of node `identifiers`, which are the current neighbors of `i` (this node).
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct View {
/// Logical clock
clock: Clock,
/// Neighbors set
neigh: Set<ID>,
}
impl View {
pub fn new(clock: Clock, neigh: Set<ID>) -> Self {
Self { clock, neigh }
}
}
/// The only type of message exchanged between neighbors is the `knowledge` message.
/// It contains the current topological knowledge that the sender node has of the network,
/// i.e. its `known` variable.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct KnowledgeMessage {
pub known: Map<ID, View>,
}
/// Each node `i` maintains a local variable called `known`.
///
/// This variable represents the current topological knowledge that `i` has of its current
/// component (including itself). It is implemented as a map of `view` indexed by node `identifier`.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct Node {
id: ID,
known: Map<ID, View>,
}
impl Node {
/// Firstly, node initializes its `known` variable with its own identifier (`i`),
/// and sets its logical clock to `0`.
pub fn initialization(this_id: ID) -> Self {
let mut neigh = Set::new(); // neigh = \{ i \}
neigh.add(this_id);
let mut known = Map::<ID, View>::new();
known.set(this_id, View::new(Clock::ZERO, neigh));
Self { id: this_id, known }
}
/// When a new node `j` appears in the transmission range of `i`, the crosslayer mechanism of
/// `i` detects `j`, and triggers the `Connection` method.
///
/// Node `j` is added to the neighbors set of node `i`. As the knowledge of has been updated,
/// its logical clock is incremented.
///
/// Since links are assumed bidirectional, i.e. the emission range equals the reception range,
/// if node `i` has no previous knowledge of `j`, the neighbor-aware mechanism adds both
/// `i` and `j` in the set of neighbors of `j`. Then, `i` sets the clock value of `j` to `1`,
/// as `i` was added to the knowledge of node `j`. On the other hand, if node `i` already has
/// information about `j`, `i` is added to the neighbors of `j`, and the logical clock of
/// node `j` is incremented.
///
/// Finally, by calling `LocalBroadcast` method, node `i` shares its
/// knowledge with `j` and informs its neighborhood of its new neighbor `j`.
/// Note that such a method sends a knowledge message to the neighbors
/// of node `i`, with a gossip probability `\rho`, as seen in `Section 2.8`.
/// However, for the first hop, `\rho` is set to `1` to make sure that all neighbors of `i`
/// will be aware of its new neighbor `j`. Note that the cross-layer mechanism
/// of node `j` will also trigger its `Connection` method, and the respective
/// steps will also be achieved on node `j`.
pub fn node_connection(&mut self, other_id: ID) {
let this_known = self.known.get_mut(self.id);
this_known.neigh.add(other_id);
this_known.clock = this_known.clock.plus_one();
if self.known.not_contains_key(other_id) {
let mut other_neigh = Set::new(); // neigh = \{ j, i \}
other_neigh.add(self.id);
other_neigh.add(other_id);
self.known.set(other_id, View::new(Clock::ONE, other_neigh));
} else {
let other_known = self.known.get_mut(other_id);
other_known.neigh.add(self.id);
other_known.clock = other_known.clock.plus_one();
}
// TODO: `LocalBroadcast(knowlege<known>, 1)`
}
/// When a node `j` disappears from the transmission range of node `i`,
/// the cross-layer mechanism stops receiving beacon messages at the
/// MAC level, and triggers the `Disconnection` method. Node `j` is
/// then removed from the knowledge of node `i`, and its clock
/// is incremented as its knowledge was modified.
///
/// Then, the neighbor-aware mechanism assumes that node `i` will also disconnect
/// from `j`. Therefore, `i` is removed from the neighborhood of `j` in the
/// knowledge of node `i`, and the corresponding clock is incremented.
///
/// Finally, node `i` broadcasts its updated knowledge to its neighbors.
pub fn node_disconected(&mut self, other_id: ID) {
let this_known = self.known.get_mut(self.id);
this_known.neigh.remove(other_id);
this_known.clock = this_known.clock.plus_one();
let other_known = self.known.get_mut(other_id);
other_known.neigh.remove(self.id);
other_known.clock = other_known.clock.plus_one();
// TODO: `LocalBroadcast(knowlege<known>, 1)`
}
/// When node receives a knowledge message `known_j`, from node `j`,
/// it looks at each node `n` included in `known_j`. If `n` is an
/// unknown node for `i`, or if `n` is known by node `i` and has a
/// more recent clock value in `known_j`, the clock and neighbors of
/// node `n` are updated in the knowledge of `i`.
///
/// Note that a clock value of `n` higher than the one currently known by
/// node `i` means that node `n` made some connections and/or
/// disconnections of which node `i` is not aware. Then, the `UpdateNeighbors`
/// method is called to update the knowledge of `i` regarding the neighbors
/// of `n`. If the clock value of node `n` is identical to the one of
/// both the knowledge of node `i` and `known_j`, the neighbor-aware
/// mechanism merges the neighbors of node `n` from `known_j` with the
/// known neighbors of `n` in the knowledge of `i`.
///
/// Remark that the clock of node `n` is not updated by the neighbor-aware
/// mechanism, otherwise, `n` would not be able to override this view in the
/// future with more recent information. The `UpdateNeighbors` method is
/// then called. Finally, node `i` broadcasts its knowledge only if
/// this latter was modified.
pub fn receive_knowledge(
&mut self,
other_id: ID,
KnowledgeMessage {
known: mut other_known,
}: KnowledgeMessage,
) {
let mut this_known_updated = false;
for (n, other_known_n) in other_known.kv_mut() {
if self.known.not_contains_key(*n) || other_known_n.clock > self.known.get(*n).clock {
self.known.set(*n, other_known_n.clone());
// TODO: UpdateNeighbors(known_j, n)
} else if other_known_n.clock == self.known.get(*n).clock {
self.known.get_mut(*n).neigh.add_all(&other_known_n.neigh);
// TODO: UpdateNeighbors(known_j, n)
}
}
// TODO: figure out what constitutes "updated", i.e. should any of the two branches count?
// or should each atomic update-op be checked for "change"??
if this_known_updated {
// TODO: TopologicalBroadcast()
}
}
/// The `UpdateNeighbors` method updates the knowledge of `i` with
/// information about the neighbors of node `n`. If the neighbor `k`
/// is an unknown node for `i`, or if `k` is known by `i` but has a
/// more recent clock value in `known_j` (line 38), the clock and neighbors
/// of node `k` are added or updated in the knowledge of node `i`.
/// Otherwise, if the clock of node `k` is identical in the knowledge of node
/// `i` and in `known_j`, the neighbor-aware mechanism merges the
/// neighbors of node `k` in the knowledge of `i`.
fn update_neighbors(&mut self, other_known: &mut Map<ID, View>, n: ID) {
for k in other_known.get(n).neigh.values() {
if self.known.not_contains_key(*k)
|| other_known.get(*k).clock > self.known.get(*k).clock
{
self.known.set(*k, other_known.get(*k).clone());
} else if other_known.get(*k).clock == self.known.get(*k).clock {
self.known
.get_mut(*k)
.neigh
.add_all(&other_known.get(*k).neigh);
}
}
}
/// The `TopologicalBroadcast` method uses a self-pruning approach to broadcast
/// or not the updated knowledge of node `i`, after the reception of a `knowledge`
/// from a neighbor `j`. To this end, node `i` checks whether each of its neighbors
/// has the same neighborhood as itself. In this case, node `n` is supposed to have
/// also received the knowledge message from neighbor node `j`. Therefore, among the
/// neighbors having the same neighborhood than `i`, only the one with
/// the smallest identifier will broadcast the knowledge, with a
/// gossip probability `\rho`. Note that this topological self-pruning
/// mechanism reaches the same neighborhood as multiple broadcasts.
fn topological_broadcast(&self) {
for n in self.known.get(self.id).neigh.values() {
// TODO: ensure this is a value-equality comparison
if self.known.get(*n).neigh == self.known.get(self.id).neigh {
if *n < self.id {
return;
}
}
}
// TODO: `LocalBroadcast(knowlege<known>, \rho)`
}
/// The leader is elected when a process running on node `i` calls the `Leader`
/// function. This function returns the most central leader in the component
/// according the closeness centrality, as seen in Section 2.7, using the
/// knowledge of node `i`. The closeness centrality is chosen instead of the
/// betweenness centrality, because it is faster to compute and requires fewer
/// computational steps, therefore consuming less energy from the mobile node
/// batteries than the latter.
///
/// First, node `i` rebuilds its component according to its topological knowledge.
/// To do so, it computes the entire set of reachable nodes, by adding
/// neighbors, neighbors of its neighbors, and so on.
/// Then, it evaluates the shortest distance between each reachable node and the
/// other ones, and computes the closeness centrality for each of them.
/// Finally, it returns the node having the highest closeness value as the
/// leader. The highest node identifier is used to break ties among
/// identical centrality values. If all nodes of the component have the same
/// topological knowledge, the `Leader()` function will return the same leader
/// node when invoked. Otherwise, it may return different leader nodes.
/// However, when the network topology stops changing, the algorithm
/// ensures that all nodes of a component will eventually have the same
/// topological knowledge and therefore, the `Leader()` function will return
/// the same leader node for all of them.
fn leader(&self) -> ID {
// this just computes the transitive closure of the adj-list graph starting from node `i`
// TODO: its an inefficient BFS impl, swap to better later!!!
let mut component = Set::new();
let mut process_queue =
VecDeque::from_iter(self.known.get(self.id).neigh.values().iter().cloned());
while let Some(j) = process_queue.pop_front() {
let successfully_added = component.add(j);
// was already processed, so don't add neighbors
if !successfully_added {
continue;
}
process_queue.extend(self.known.get(j).neigh.values().iter().cloned());
}
let leader: ID = todo!(); // TODO: `Max (ClosenessCentrality (component))`
return leader;
}
}

View File

@@ -1,35 +0,0 @@
//! Communicator is an abstraction that allows me to "mock" speaking to the network
//!
use crate::participant::{Participant, ParticipantId};
use crate::ElectionMessage;
pub trait Communicator {
fn all_participants(&self) -> &[ParticipantId];
fn broadcast_message(&self, message: ElectionMessage, recipients: &[ParticipantId]) -> ();
fn register_participant(&mut self, participant: &Participant) -> ParticipantId;
}
mod communicator_impls {
macro_rules! as_ref_impl {
() => {
#[inline]
fn all_participants(&self) -> &[ParticipantId] {
self.as_ref().all_participants()
}
#[inline]
fn broadcast_message(&self, message: Message, recipients: &[ParticipantId]) {
self.as_ref().broadcast_message(message, recipients);
}
};
}
// impl<C: Communicator> Communicator for Box<C> {
// as_ref_impl!();
// }
//
// impl<C: Communicator> Communicator for Arc<C> {
// as_ref_impl!();
// }
}

View File

@@ -1,44 +0,0 @@
//! TODO: crate documentation
//!
//! this is here as a placeholder documentation
//!
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
// #![feature(stmt_expr_attributes)]
// #![feature(unboxed_closures)]
// #![feature(assert_matches)]
// #![feature(async_fn_in_dyn_trait)]
// #![feature(async_for_loop)]
// #![feature(auto_traits)]
// #![feature(negative_impls)]
use crate::participant::ParticipantId;
pub mod cel;
mod communicator;
mod participant;
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {}
/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {}
pub(crate) mod private {
/// Sealed traits support
pub trait Sealed {}
impl<T: ?Sized> Sealed for T {}
}
pub enum ElectionMessage {
/// Announce election
Election {
candidate: ParticipantId,
},
Alive,
Victory {
coordinator: ParticipantId,
},
}

View File

@@ -1,203 +0,0 @@
use crate::communicator::Communicator;
use crate::ElectionMessage;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
// trait ParticipantState {} // TODO: make sealed or something??
//
// struct Coordinator; // TODO: change to master
// struct Candidate; // i.e. election candidate
// struct Transient; // transient state, e.g. waiting for election results, declaring themselves winner, etc
// struct Follower; // i.e. a follower of an existing coordinator
//
// mod participant_impl {
// use crate::participant::{Candidate, Coordinator, Follower, ParticipantState, Transient};
//
// impl ParticipantState for Coordinator {}
// impl ParticipantState for Candidate {}
// impl ParticipantState for Transient {}
// impl ParticipantState for Follower {}
// }
pub type ParticipantSelf = Arc<Mutex<Participant>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct ParticipantId(pub u128);
#[derive(Debug, Clone, Copy)]
pub enum ParticipantState {
Coordinator, // i.e. master
ElectionCandidate, // after noticing a master went down, become candidate and `Election` message to all nodes higher than itself
Waiting, // when lower nodes are waiting for results of an election to conclude
Follower { id: ParticipantId }, // when a participant is following a coordinator
Transient, // when the participant is in a neutral/uninitialized state
}
pub struct Participant {
id: ParticipantId,
state: ParticipantState,
on_message_sent: Vec<Box<dyn FnOnce(ElectionMessage, ParticipantId)>>,
}
mod impls {
use crate::participant::{Participant, ParticipantId, ParticipantSelf, ParticipantState};
use crate::ElectionMessage;
impl Participant {
pub fn new_with(id: ParticipantId, state: ParticipantState) -> Self {
Self {
id,
state,
on_message_sent: vec![],
}
}
pub fn add_on_message_sent<F>(&mut self, callback: F)
where
F: FnOnce(ElectionMessage, ParticipantId) + Send + 'static,
{
self.on_message_sent.push(Box::new(callback));
}
pub async fn receive_message(mut self_: ParticipantSelf, message: ElectionMessage) {
let foo = self_.lock_owned().await;
}
}
}
pub const TASK_CHANNEL_SIZE: usize = 8;
pub const ELECTION_VICTORY_TIMEOUT: Duration = Duration::from_secs(1);
pub const VICTORY_WAITING_TIMEOUT: Duration = Duration::from_secs(1);
pub const HEARTBEAT_RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);
pub const HEARTBEAT_SEND_TIMEOUT: Duration = Duration::from_secs(1);
pub enum InMessage {
ElectionMessage(ElectionMessage),
Heartbeat,
}
pub enum OutMessage {
ElectionMessage(ElectionMessage),
Heartbeat,
}
#[derive(Error, Debug)]
pub enum ParticipantError {
#[error("could not send out-message: `{0}`")]
SendError(#[from] mpsc::error::SendError<OutMessage>),
}
pub async fn participant_task<C: Communicator>(
mut in_channel: mpsc::Receiver<InMessage>,
out_channel: mpsc::Sender<OutMessage>,
communicator: C,
) -> Result<(), ParticipantError> {
// task state
let participant_id: ParticipantId = ParticipantId(1234u128); // TODO: replace with dependency injection
let mut participant_state: ParticipantState = ParticipantState::Transient;
// TODO: slot this logic into this somewhere...
// 4. If P receives an Election message from another process with a lower ID it sends an Answer message
// back and if it has not already started an election, it starts the election process at the beginning,
// by sending an Election message to higher-numbered processes.
loop {
match participant_state {
ParticipantState::Transient => {
// When a process P recovers from failure, or the failure detector indicates
// that the current coordinator has failed, P performs the following actions:
//
// 1A) If P has the highest process ID, it sends a Victory message to all other
// processes and becomes the new Coordinator.
let max_id = communicator
.all_participants()
.iter()
.max()
.unwrap_or(&ParticipantId(0u128));
if max_id <= &participant_id {
participant_state = ParticipantState::Coordinator;
communicator.broadcast_message(
ElectionMessage::Victory {
coordinator: participant_id,
},
communicator.all_participants(),
);
continue;
}
// 1B) Otherwise, P broadcasts an Election message to all other processes with
// higher process IDs than itself
participant_state = ParticipantState::ElectionCandidate;
communicator.broadcast_message(
ElectionMessage::Election {
candidate: participant_id,
},
&communicator
.all_participants()
.iter()
.filter(|&p| p > &participant_id)
.copied()
.collect::<Vec<_>>(),
);
}
ParticipantState::ElectionCandidate => {
tokio::select! {
// 2. If P receives no Answer after sending an Election message, then it broadcasts
// a Victory message to all other processes and becomes the Coordinator.
_ = tokio::time::sleep(ELECTION_VICTORY_TIMEOUT) => {
participant_state = ParticipantState::Coordinator;
communicator.broadcast_message(
ElectionMessage::Victory {
coordinator: participant_id,
},
communicator.all_participants(),
);
}
// 3A. If P receives an Answer from a process with a higher ID, it sends no further
// messages for this election and waits for a Victory message. (If there is no Victory
// message after a period of time, it restarts the process at the beginning.)
Some(InMessage::ElectionMessage(ElectionMessage::Alive)) = in_channel.recv() => {
participant_state = ParticipantState::Waiting;
} // TODO: handle all other branches, e.g. channel closure, different messages & so on
}
}
ParticipantState::Waiting => {
tokio::select! {
// 3B. If there is no Victory message after a period of time, it restarts the process
// at the beginning.
_ = tokio::time::sleep(VICTORY_WAITING_TIMEOUT) => {
participant_state = ParticipantState::Transient;
}
// 5. If P receives a Victory message, it treats the sender as the coordinator.
Some(InMessage::ElectionMessage(ElectionMessage::Victory { coordinator })) = in_channel.recv() => {
participant_state = ParticipantState::Follower { id: coordinator };
} // TODO: handle all other branches, e.g. channel closure, different messages & so on
}
}
ParticipantState::Follower { id: coordinator_id } => {
tokio::select! {
// If we do not receive a heartbeat from the coordinator, trigger new election
_ = tokio::time::sleep(VICTORY_WAITING_TIMEOUT) => {
participant_state = ParticipantState::Transient;
}
// If we do receive a heartbeat - keep going
Some(InMessage::Heartbeat) = in_channel.recv() => {
} // TODO: handle all other branches, e.g. channel closure, different messages & so on
}
}
ParticipantState::Coordinator => {
// If we are coordinator - send heart beats
{
out_channel.send(OutMessage::Heartbeat).await?;
tokio::time::sleep(HEARTBEAT_SEND_TIMEOUT).await;
}
}
}
}
}

View File

@@ -1,8 +0,0 @@
// maybe this will hold test in the future...??
#[cfg(test)]
mod tests {
#[test]
fn does_nothing() {
}
}

View File

@@ -1,2 +0,0 @@
[toolchain]
channel = "nightly"

View File

@@ -1,26 +0,0 @@
[package]
name = "util"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "util"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
# macro dependencies
extend = { workspace = true }
# utility dependencies
thiserror = { workspace = true }
once_cell = { workspace = true }
internment = { workspace = true }
derive_more = { workspace = true }
bon = { workspace = true }
recursion = { workspace = true }
fn_pipe = { workspace = true }

View File

@@ -1,16 +0,0 @@
[package]
name = "fn_pipe"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "fn_pipe"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
fn_pipe_proc = { workspace = true }

View File

@@ -1,20 +0,0 @@
[package]
name = "fn_pipe_proc"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
name = "fn_pipe_proc"
path = "src/lib.rs"
proc-macro = true
[lints]
workspace = true
[dependencies]
extend = { workspace = true }
syn = { workspace = true }
quote = { workspace = true }
proc-macro2 = { workspace = true }
darling = { workspace = true }

View File

@@ -1,201 +0,0 @@
//! Proc-macro for implementing `Fn/Pipe*` variants for tuples of a given size;
//! it is only here for this one purpose and no other, should not be used elsewhere
#![allow(clippy::arbitrary_source_item_ordering)]
extern crate proc_macro;
use extend::ext;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, LitInt};
type TokS2 = proc_macro2::TokenStream;
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::arithmetic_side_effects,
clippy::missing_panics_doc,
clippy::too_many_lines
)]
#[proc_macro]
pub fn impl_fn_pipe_for_tuple(item: TokenStream) -> TokenStream {
// DEFINE CONSTANT TOKEN STREAMS UPFRONT
// token streams for Fn/Pipe* variants
let fn_pipe_names = (
(
"Fn".parse_unchecked(),
"FnPipe".parse_unchecked(),
"run".parse_unchecked(),
"call".parse_unchecked(),
),
(
"FnMut".parse_unchecked(),
"FnMutPipe".parse_unchecked(),
"run_mut".parse_unchecked(),
"call_mut".parse_unchecked(),
),
(
"FnOnce".parse_unchecked(),
"FnOncePipe".parse_unchecked(),
"run_once".parse_unchecked(),
"call_once".parse_unchecked(),
),
);
// get the number of tuple parameters to implement this for
let max_tuple_size = match parse_macro_input!(item as LitInt).base10_parse::<usize>() {
Ok(num) => num,
Err(e) => return e.to_compile_error().into(),
};
assert!(
max_tuple_size > 0,
"passed parameter must be greater than zero"
);
// generate generic function type-names, to be used later everywhere
let mut fn_type_names = Vec::with_capacity(max_tuple_size);
for i in 0..max_tuple_size {
fn_type_names.push(format!("_{i}").parse_unchecked());
}
// create a middle type constraint (i.e. not the first one)
let middle_type_constraint = |prev_fn: TokS2, this_fn: TokS2, fn_name: TokS2| {
quote! {
#this_fn: #fn_name<(#prev_fn::Output,)>
}
};
// create call implementation
let impl_call = |n: usize, call: TokS2, base: TokS2| {
let tuple_access = format!("self.{n}").parse_unchecked();
quote! {
#tuple_access.#call((#base,))
}
};
// generic impl block parametrised on the variant and number of params
let impl_per_type_and_n = |n: usize,
(fn_name, fn_pipe_name, run, call): (TokS2, TokS2, TokS2, TokS2),
extra: Option<TokS2>,
ref_style: Option<TokS2>| {
// flatten the extra tokens
let extra = extra.unwrap_or_default();
let fn_type_names_comma_sep = &fn_type_names[0..n].comma_separated();
// get name of first type and create the type constraint for the fist type
let first_fn_type = fn_type_names[0].clone();
let first_type_constraint = quote! {
#first_fn_type: #fn_name<Args>
};
// create the middle type constraint implementations
let middle_type_constraints = (1..n)
.map(|i| {
// get previous and current tokens
let prev_fn = fn_type_names[i - 1].clone();
let this_fn = fn_type_names[i].clone();
// create middle implementation
middle_type_constraint(prev_fn, this_fn, fn_name.clone())
})
.collect::<Vec<_>>();
// combine the two, and comma-separate them into a single block
let type_constraints = [vec![first_type_constraint], middle_type_constraints]
.concat()
.as_slice()
.comma_separated();
// recursive call implementation starting from the base
let mut call_impl = quote! { self.0 .#call(args) };
for i in 1..n {
call_impl = impl_call(i, call.clone(), call_impl);
}
quote! {
#[allow(clippy::type_repetition_in_bounds)]
impl<Args: Tuple, #fn_type_names_comma_sep: ?Sized> #fn_pipe_name<Args> for (#fn_type_names_comma_sep,)
where #type_constraints
{
#extra
#[inline]
extern "rust-call" fn #run(#ref_style self, args: Args) -> Self::Output {
#call_impl
}
}
}
};
// generic impl block parametrised on the number of params
let impl_per_n = |n: usize| {
// create the `Fn/FnPipe` implementation
let mut impl_per_n =
impl_per_type_and_n(n, fn_pipe_names.0.clone(), None, Some(quote! { & }));
// create the `FnMut/FnMutPipe` implementation
impl_per_n.extend(impl_per_type_and_n(
n,
fn_pipe_names.1.clone(),
None,
Some(quote! { &mut }),
));
// create the `FnOnce/FnOncePipe` implementation;
// this implementation additionally needs to specify the associated `type Output`
let last = fn_type_names[n - 1].clone();
impl_per_n.extend(impl_per_type_and_n(
n,
fn_pipe_names.2.clone(),
Some(quote! {
type Output = #last::Output;
}),
None,
));
impl_per_n
};
// we need to implement for all tuple sizes 1 through-to `n`
let mut impls = TokS2::new();
for n in 1..=max_tuple_size {
impls.extend(impl_per_n(n));
}
// return all the impls
impls.into()
}
#[ext]
impl [TokS2] {
#[allow(clippy::unwrap_used, clippy::single_call_fn)]
fn comma_separated(&self) -> TokS2 {
let comma_tok = ",".parse_unchecked();
// get the first token, and turn it into an accumulator
let mut toks = self.iter();
let mut tok: TokS2 = toks.next().unwrap().clone();
// if there are more tokens to come, keep extending with comma
for next in toks {
tok.extend(comma_tok.clone());
tok.extend(next.clone());
}
// return final comma-separated result
tok
}
}
#[ext]
impl str {
fn parse_unchecked(&self) -> TokS2 {
match self.parse::<TokS2>() {
Ok(s) => s,
Err(e) => unimplemented!("{e}"),
}
}
}

View File

@@ -1,35 +0,0 @@
//! TODO: crate documentation
//!
//! this is here as a placeholder documentation
// enable Rust-unstable features for convenience
#![feature(tuple_trait)]
#![feature(unboxed_closures)]
#![feature(fn_traits)]
#![feature(unsized_fn_params)] // this is fine because I am PURELY wrapping around existing `Fn*` traits
// global lints
#![allow(internal_features)]
#![allow(clippy::arbitrary_source_item_ordering)]
use fn_pipe_proc::impl_fn_pipe_for_tuple;
use std::marker::Tuple;
/// A trait representing a pipe of functions, where the output of one will
/// be fed as the input of another, until the entire pipe ran
pub trait FnPipe<Args: Tuple>: FnMutPipe<Args> {
extern "rust-call" fn run(&self, args: Args) -> Self::Output;
}
pub trait FnMutPipe<Args: Tuple>: FnOncePipe<Args> {
extern "rust-call" fn run_mut(&mut self, args: Args) -> Self::Output;
}
pub trait FnOncePipe<Args: Tuple> {
type Output;
extern "rust-call" fn run_once(self, args: Args) -> Self::Output;
}
// implement `Fn/Pipe*` variants for tuples of upto length 26,
// can be increased in the future
impl_fn_pipe_for_tuple!(26usize);

View File

@@ -1,53 +0,0 @@
//! TODO: crate documentation
//!
//! this is here as a placeholder documentation
//!
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
#![feature(stmt_expr_attributes)]
#![feature(type_alias_impl_trait)]
#![feature(specialization)]
#![feature(unboxed_closures)]
#![feature(const_trait_impl)]
#![feature(fn_traits)]
pub mod nonempty;
pub(crate) mod private {
// sealed traits support
pub trait Sealed {}
impl<T: ?Sized> Sealed for T {}
}
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {
}
/// Namespace for crate-wide extension traits/methods
pub mod ext {
use extend::ext;
#[ext(pub, name = BoxedSliceExt)]
impl<T> Box<[T]> {
#[inline]
fn map<B, F>(self, f: F) -> Box<[B]>
where
F: FnMut(T) -> B,
{
self.into_iter().map(f).collect()
}
}
#[ext(pub, name = VecExt)]
impl<T> Vec<T> {
#[inline]
fn map<B, F>(self, f: F) -> Vec<B>
where
F: FnMut(T) -> B,
{
self.into_iter().map(f).collect()
}
}
}

View File

@@ -1,145 +0,0 @@
use fn_pipe::FnMutPipe;
use std::slice::SliceIndex;
use std::{ops, slice};
use thiserror::Error;
#[derive(Error, Debug)]
#[error("Cannot create to `NonemptyArray` because the supplied slice is empty")]
pub struct EmptySliceError;
/// A pointer to a non-empty fixed-size slice allocated on the heap.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct NonemptyArray<T>(Box<[T]>);
#[allow(clippy::arbitrary_source_item_ordering)]
impl<T> NonemptyArray<T> {
#[inline]
pub fn singleton(value: T) -> Self {
Self(Box::new([value]))
}
#[allow(clippy::missing_errors_doc)]
#[inline]
pub fn try_from_boxed_slice<S: Into<Box<[T]>>>(
boxed_slice: S,
) -> Result<Self, EmptySliceError> {
let boxed_slice = boxed_slice.into();
if boxed_slice.is_empty() {
Err(EmptySliceError)
} else {
Ok(Self(boxed_slice))
}
}
#[must_use]
#[inline]
pub fn into_boxed_slice(self) -> Box<[T]> {
self.0
}
#[must_use]
#[inline]
pub fn to_vec(&self) -> Vec<T>
where
T: Clone,
{
self.0.to_vec()
}
#[must_use]
#[inline]
pub const fn as_slice(&self) -> &[T] {
&self.0
}
#[allow(clippy::indexing_slicing)]
#[must_use]
#[inline]
pub fn first(&self) -> &T {
&self.0[0]
}
#[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)]
#[must_use]
#[inline]
pub fn last(&self) -> &T {
&self.0[self.0.len() - 1]
}
#[must_use]
#[inline]
pub fn get<I>(&self, index: I) -> Option<&I::Output>
where
I: SliceIndex<[T]>,
{
self.0.get(index)
}
#[allow(clippy::len_without_is_empty)]
#[must_use]
#[inline]
pub const fn len(&self) -> usize {
self.0.len()
}
#[allow(clippy::iter_without_into_iter)]
#[inline]
pub fn iter(&self) -> slice::Iter<'_, T> {
self.0.iter()
}
#[allow(clippy::iter_without_into_iter)]
#[inline]
pub fn iter_mut(&mut self) -> slice::IterMut<'_, T> {
self.0.iter_mut()
}
#[inline]
#[must_use]
pub fn map<U, F: FnMut(T) -> U>(self, f: F) -> NonemptyArray<U> {
NonemptyArray(self.0.into_iter().map(f).collect())
}
#[inline]
#[must_use]
pub fn pipe<U, P: FnMutPipe(T) -> U>(self, mut p: P) -> NonemptyArray<U> {
self.map(|x| p.run_mut((x,)))
}
}
impl<T> From<NonemptyArray<T>> for Box<[T]> {
#[inline]
fn from(value: NonemptyArray<T>) -> Self {
value.into_boxed_slice()
}
}
impl<T> ops::Index<usize> for NonemptyArray<T> {
type Output = T;
#[inline]
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}
impl<T> IntoIterator for NonemptyArray<T> {
type Item = T;
type IntoIter = std::vec::IntoIter<T>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.into_boxed_slice().into_vec().into_iter()
}
}
impl<'a, T> IntoIterator for &'a NonemptyArray<T> {
type Item = &'a T;
type IntoIter = slice::Iter<'a, T>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

View File

@@ -17,6 +17,8 @@ dependencies = [
"sqlmodel>=0.0.22",
"sqlalchemy[asyncio]>=2.0.0",
"greenlet>=3.2.3",
"cryptography>=44.0.0",
"base58>=2.1.1",
]
[build-system]

View File

@@ -41,3 +41,4 @@ class Host(BaseModel):
if not (0 <= v <= 65535):
raise ValueError("Port must be between 0 and 65535")
return v

View File

@@ -1,40 +1,228 @@
from __future__ import annotations
import hashlib
import logging
import os
from pathlib import Path
from typing import Any, Type
from typing import Any, Type, final
from exo_pyo3_bindings import Keypair
import base58
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from filelock import FileLock
from shared.constants import EXO_NODE_ID_KEYPAIR
@final
class PeerId:
"""
A libp2p peer identifier derived from a cryptographic public key.
Compatible with py-libp2p's PeerID interface.
"""
def __init__(self, peer_id_bytes: bytes) -> None:
self._bytes = peer_id_bytes
@staticmethod
def from_bytes(data: bytes) -> "PeerId":
"""Create PeerId from raw bytes."""
return PeerId(data)
@staticmethod
def from_public_key(public_key_bytes: bytes) -> "PeerId":
"""Create PeerId from a public key by hashing it."""
# For Ed25519 keys, libp2p uses the identity hash (no hashing) for keys <= 42 bytes
# Since Ed25519 public keys are 32 bytes, we use identity hash
if len(public_key_bytes) <= 42:
return PeerId(public_key_bytes)
else:
# For larger keys, use SHA-256
hash_digest = hashlib.sha256(public_key_bytes).digest()
return PeerId(hash_digest)
def to_bytes(self) -> bytes:
"""Return the raw bytes of this PeerId."""
return self._bytes
def to_base58(self) -> str:
"""Return the base58-encoded string representation."""
return base58.b58encode(self._bytes).decode('ascii')
def __str__(self) -> str:
"""Return the base58-encoded string representation."""
return self.to_base58()
def __repr__(self) -> str:
"""Return debug representation."""
return f"PeerId('{self.to_base58()}')"
def __eq__(self, other: object) -> bool:
"""Check equality with another PeerId."""
if not isinstance(other, PeerId):
return False
return self._bytes == other._bytes
def __hash__(self) -> int:
"""Make PeerId hashable."""
return hash(self._bytes)
@final
class Keypair:
"""
A py-libp2p compatible keypair implementation.
Provides the same interface as py-libp2p's KeyPair.
"""
def __init__(self, private_key: ed25519.Ed25519PrivateKey) -> None:
self._private_key = private_key
self._public_key = private_key.public_key()
@staticmethod
def generate_ed25519() -> "Keypair":
"""Generate a new Ed25519 keypair."""
private_key = ed25519.Ed25519PrivateKey.generate()
return Keypair(private_key)
@staticmethod
def from_protobuf_encoding(data: bytes) -> "Keypair":
"""
Deserialize a keypair from libp2p protobuf encoding.
Compatible with py-libp2p's serialization format.
"""
if len(data) < 2:
raise ValueError("Invalid protobuf data: too short")
# Simple protobuf parsing for our specific use case
# We expect: field 1 (type) as varint, field 2 (data) as bytes
offset = 0
# Parse type field (field tag 1, wire type 0 = varint)
if data[offset] != 0x08: # field 1, varint
raise ValueError("Expected type field")
offset += 1
key_type = data[offset]
offset += 1
if key_type != 1: # Ed25519
raise ValueError(f"Unsupported key type: {key_type}")
# Parse data field (field tag 2, wire type 2 = length-delimited)
if offset >= len(data) or data[offset] != 0x12: # field 2, bytes
raise ValueError("Expected data field")
offset += 1
# Parse length
data_length = data[offset]
offset += 1
if data_length not in (32, 64):
raise ValueError(f"Invalid Ed25519 private key length: {data_length}")
if offset + data_length > len(data):
raise ValueError("Truncated private key data")
key_data = data[offset:offset + data_length]
try:
if data_length == 64:
# libp2p format: 32 bytes private key seed + 32 bytes public key
private_key_seed = key_data[:32]
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key_seed)
else:
# Raw 32-byte private key
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(key_data)
return Keypair(private_key)
except Exception as e:
raise ValueError(f"Invalid Ed25519 private key: {e}") from e
def to_protobuf_encoding(self) -> bytes:
"""
Serialize this keypair to libp2p protobuf encoding.
Compatible with py-libp2p's serialization format.
"""
private_key_bytes = self._private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
public_key_bytes = self._public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
# libp2p Ed25519 format: private key seed (32) + public key (32)
combined_key_data = private_key_bytes + public_key_bytes
# Build protobuf manually for our simple case
# Field 1 (type): tag=0x08, value=1 (Ed25519)
# Field 2 (data): tag=0x12, length=64, data=combined_key_data
result = bytearray()
result.extend([0x08, 0x01]) # field 1: type = 1 (Ed25519)
result.extend([0x12, 0x40]) # field 2: length = 64 bytes
result.extend(combined_key_data)
return bytes(result)
def to_peer_id(self) -> PeerId:
"""Generate a PeerId from this keypair's public key."""
public_key_bytes = self._public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return PeerId.from_public_key(public_key_bytes)
def sign(self, data: bytes) -> bytes:
"""Sign data with this keypair's private key."""
return self._private_key.sign(data)
def verify(self, data: bytes, signature: bytes) -> bool:
"""Verify a signature against data using this keypair's public key."""
try:
self._public_key.verify(signature, data)
return True
except Exception:
return False
@property
def public_key_bytes(self) -> bytes:
"""Get the raw public key bytes."""
return self._public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
@property
def private_key_bytes(self) -> bytes:
"""Get the raw private key bytes."""
return self._private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
# py-libp2p compatibility properties
@property
def private_key(self) -> ed25519.Ed25519PrivateKey:
"""Access to the underlying private key for py-libp2p compatibility."""
return self._private_key
@property
def public_key(self) -> ed25519.Ed25519PublicKey:
"""Access to the underlying public key for py-libp2p compatibility."""
return self._public_key
def ensure_type[T](obj: Any, expected_type: Type[T]) -> T: # type: ignore
if not isinstance(obj, expected_type):
raise TypeError(f"Expected {expected_type}, got {type(obj)}") # type: ignore
return obj
# def make_async_iter[T]():
# """
# Creates a pair `<async-iter>, <put-to-iter>` of an asynchronous iterator
# and a synchronous function to put items into that iterator.
# """
#
# loop = asyncio.get_event_loop()
# queue: asyncio.Queue[T] = asyncio.Queue()
#
# def put(c: ConnectionUpdate) -> None:
# loop.call_soon_threadsafe(queue.put_nowait, (c,))
#
# async def get():
# while True:
# yield await queue.get()
#
# return get(), put
def get_node_id_keypair(path: str | bytes | os.PathLike[str] | os.PathLike[bytes] = EXO_NODE_ID_KEYPAIR) -> Keypair:
"""
Obtains the :class:`Keypair` associated with this node-ID.
@@ -54,11 +242,11 @@ def get_node_id_keypair(path: str | bytes | os.PathLike[str] | os.PathLike[bytes
try: # if decoded successfully, save & return
return Keypair.from_protobuf_encoding(protobuf_encoded)
except RuntimeError as e: # on runtime error, assume corrupt file
logging.warning(f"Encountered runtime error when trying to get keypair: {e}")
except ValueError as e: # on runtime error, assume corrupt file
logging.warning(f"Encountered error when trying to get keypair: {e}")
# if no valid credentials, create new ones and persist
with open(path, 'w+b') as f:
keypair = Keypair.generate_ed25519()
f.write(keypair.to_protobuf_encoding())
return keypair
return keypair

118
uv.lock generated
View File

@@ -15,7 +15,6 @@ members = [
"exo",
"exo-engine-mlx",
"exo-master",
"exo-pyo3-bindings",
"exo-shared",
"exo-worker",
]
@@ -125,6 +124,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/77/06/bb80f5f86020c4551da315d78b3ab75e8228f89f0162f2c3a819e407941a/attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3", size = 63815, upload-time = "2025-03-13T11:10:21.14Z" },
]
[[package]]
name = "base58"
version = "2.1.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/7f/45/8ae61209bb9015f516102fa559a2914178da1d5868428bd86a1b4421141d/base58-2.1.1.tar.gz", hash = "sha256:c5d0cb3f5b6e81e8e35da5754388ddcc6d0d14b6c6a132cb93d69ed580a7278c", size = 6528, upload-time = "2021-10-30T22:12:17.858Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/45/ec96b29162a402fc4c1c5512d114d7b3787b9d1c2ec241d9568b4816ee23/base58-2.1.1-py3-none-any.whl", hash = "sha256:11a36f4d3ce51dfc1043f3218591ac4eb1ceb172919cebe05b52a5bcc8d245c2", size = 5621, upload-time = "2021-10-30T22:12:16.658Z" },
]
[[package]]
name = "certifi"
version = "2025.7.14"
@@ -134,6 +142,26 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/4f/52/34c6cf5bb9285074dc3531c437b3919e825d976fde097a7a73f79e726d03/certifi-2025.7.14-py3-none-any.whl", hash = "sha256:6b31f564a415d79ee77df69d757bb49a5bb53bd9f756cbbe24394ffd6fc1f4b2", size = 162722, upload-time = "2025-07-14T03:29:26.863Z" },
]
[[package]]
name = "cffi"
version = "1.17.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pycparser", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/fc/97/c783634659c2920c3fc70419e3af40972dbaf758daa229a7d6ea6135c90d/cffi-1.17.1.tar.gz", hash = "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824", size = 516621, upload-time = "2024-09-04T20:45:21.852Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8d/f8/dd6c246b148639254dad4d6803eb6a54e8c85c6e11ec9df2cffa87571dbe/cffi-1.17.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f3a2b4222ce6b60e2e8b337bb9596923045681d71e5a082783484d845390938e", size = 182989, upload-time = "2024-09-04T20:44:28.956Z" },
{ url = "https://files.pythonhosted.org/packages/8b/f1/672d303ddf17c24fc83afd712316fda78dc6fce1cd53011b839483e1ecc8/cffi-1.17.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0984a4925a435b1da406122d4d7968dd861c1385afe3b45ba82b750f229811e2", size = 178802, upload-time = "2024-09-04T20:44:30.289Z" },
{ url = "https://files.pythonhosted.org/packages/0e/2d/eab2e858a91fdff70533cab61dcff4a1f55ec60425832ddfdc9cd36bc8af/cffi-1.17.1-cp313-cp313-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d01b12eeeb4427d3110de311e1774046ad344f5b1a7403101878976ecd7a10f3", size = 454792, upload-time = "2024-09-04T20:44:32.01Z" },
{ url = "https://files.pythonhosted.org/packages/75/b2/fbaec7c4455c604e29388d55599b99ebcc250a60050610fadde58932b7ee/cffi-1.17.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:706510fe141c86a69c8ddc029c7910003a17353970cff3b904ff0686a5927683", size = 478893, upload-time = "2024-09-04T20:44:33.606Z" },
{ url = "https://files.pythonhosted.org/packages/4f/b7/6e4a2162178bf1935c336d4da8a9352cccab4d3a5d7914065490f08c0690/cffi-1.17.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:de55b766c7aa2e2a3092c51e0483d700341182f08e67c63630d5b6f200bb28e5", size = 485810, upload-time = "2024-09-04T20:44:35.191Z" },
{ url = "https://files.pythonhosted.org/packages/c7/8a/1d0e4a9c26e54746dc08c2c6c037889124d4f59dffd853a659fa545f1b40/cffi-1.17.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c59d6e989d07460165cc5ad3c61f9fd8f1b4796eacbd81cee78957842b834af4", size = 471200, upload-time = "2024-09-04T20:44:36.743Z" },
{ url = "https://files.pythonhosted.org/packages/26/9f/1aab65a6c0db35f43c4d1b4f580e8df53914310afc10ae0397d29d697af4/cffi-1.17.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dd398dbc6773384a17fe0d3e7eeb8d1a21c2200473ee6806bb5e6a8e62bb73dd", size = 479447, upload-time = "2024-09-04T20:44:38.492Z" },
{ url = "https://files.pythonhosted.org/packages/5f/e4/fb8b3dd8dc0e98edf1135ff067ae070bb32ef9d509d6cb0f538cd6f7483f/cffi-1.17.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:3edc8d958eb099c634dace3c7e16560ae474aa3803a5df240542b305d14e14ed", size = 484358, upload-time = "2024-09-04T20:44:40.046Z" },
{ url = "https://files.pythonhosted.org/packages/f1/47/d7145bf2dc04684935d57d67dff9d6d795b2ba2796806bb109864be3a151/cffi-1.17.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:72e72408cad3d5419375fc87d289076ee319835bdfa2caad331e377589aebba9", size = 488469, upload-time = "2024-09-04T20:44:41.616Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.2"
@@ -163,6 +191,37 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215, upload-time = "2025-05-20T23:19:47.796Z" },
]
[[package]]
name = "cryptography"
version = "45.0.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cffi", marker = "(platform_python_implementation != 'PyPy' and sys_platform == 'darwin') or (platform_python_implementation != 'PyPy' and sys_platform == 'linux')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/95/1e/49527ac611af559665f71cbb8f92b332b5ec9c6fbc4e88b0f8e92f5e85df/cryptography-45.0.5.tar.gz", hash = "sha256:72e76caa004ab63accdf26023fccd1d087f6d90ec6048ff33ad0445abf7f605a", size = 744903, upload-time = "2025-07-02T13:06:25.941Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/fb/09e28bc0c46d2c547085e60897fea96310574c70fb21cd58a730a45f3403/cryptography-45.0.5-cp311-abi3-macosx_10_9_universal2.whl", hash = "sha256:101ee65078f6dd3e5a028d4f19c07ffa4dd22cce6a20eaa160f8b5219911e7d8", size = 7043092, upload-time = "2025-07-02T13:05:01.514Z" },
{ url = "https://files.pythonhosted.org/packages/b1/05/2194432935e29b91fb649f6149c1a4f9e6d3d9fc880919f4ad1bcc22641e/cryptography-45.0.5-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:3a264aae5f7fbb089dbc01e0242d3b67dffe3e6292e1f5182122bdf58e65215d", size = 4205926, upload-time = "2025-07-02T13:05:04.741Z" },
{ url = "https://files.pythonhosted.org/packages/07/8b/9ef5da82350175e32de245646b1884fc01124f53eb31164c77f95a08d682/cryptography-45.0.5-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e74d30ec9c7cb2f404af331d5b4099a9b322a8a6b25c4632755c8757345baac5", size = 4429235, upload-time = "2025-07-02T13:05:07.084Z" },
{ url = "https://files.pythonhosted.org/packages/7c/e1/c809f398adde1994ee53438912192d92a1d0fc0f2d7582659d9ef4c28b0c/cryptography-45.0.5-cp311-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3af26738f2db354aafe492fb3869e955b12b2ef2e16908c8b9cb928128d42c57", size = 4209785, upload-time = "2025-07-02T13:05:09.321Z" },
{ url = "https://files.pythonhosted.org/packages/d0/8b/07eb6bd5acff58406c5e806eff34a124936f41a4fb52909ffa4d00815f8c/cryptography-45.0.5-cp311-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:e6c00130ed423201c5bc5544c23359141660b07999ad82e34e7bb8f882bb78e0", size = 3893050, upload-time = "2025-07-02T13:05:11.069Z" },
{ url = "https://files.pythonhosted.org/packages/ec/ef/3333295ed58d900a13c92806b67e62f27876845a9a908c939f040887cca9/cryptography-45.0.5-cp311-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:dd420e577921c8c2d31289536c386aaa30140b473835e97f83bc71ea9d2baf2d", size = 4457379, upload-time = "2025-07-02T13:05:13.32Z" },
{ url = "https://files.pythonhosted.org/packages/d9/9d/44080674dee514dbb82b21d6fa5d1055368f208304e2ab1828d85c9de8f4/cryptography-45.0.5-cp311-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:d05a38884db2ba215218745f0781775806bde4f32e07b135348355fe8e4991d9", size = 4209355, upload-time = "2025-07-02T13:05:15.017Z" },
{ url = "https://files.pythonhosted.org/packages/c9/d8/0749f7d39f53f8258e5c18a93131919ac465ee1f9dccaf1b3f420235e0b5/cryptography-45.0.5-cp311-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:ad0caded895a00261a5b4aa9af828baede54638754b51955a0ac75576b831b27", size = 4456087, upload-time = "2025-07-02T13:05:16.945Z" },
{ url = "https://files.pythonhosted.org/packages/09/d7/92acac187387bf08902b0bf0699816f08553927bdd6ba3654da0010289b4/cryptography-45.0.5-cp311-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:9024beb59aca9d31d36fcdc1604dd9bbeed0a55bface9f1908df19178e2f116e", size = 4332873, upload-time = "2025-07-02T13:05:18.743Z" },
{ url = "https://files.pythonhosted.org/packages/03/c2/840e0710da5106a7c3d4153c7215b2736151bba60bf4491bdb421df5056d/cryptography-45.0.5-cp311-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:91098f02ca81579c85f66df8a588c78f331ca19089763d733e34ad359f474174", size = 4564651, upload-time = "2025-07-02T13:05:21.382Z" },
{ url = "https://files.pythonhosted.org/packages/fe/2b/160ce8c2765e7a481ce57d55eba1546148583e7b6f85514472b1d151711d/cryptography-45.0.5-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:f3562c2f23c612f2e4a6964a61d942f891d29ee320edb62ff48ffb99f3de9ae8", size = 7017143, upload-time = "2025-07-02T13:05:27.229Z" },
{ url = "https://files.pythonhosted.org/packages/c2/e7/2187be2f871c0221a81f55ee3105d3cf3e273c0a0853651d7011eada0d7e/cryptography-45.0.5-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:3fcfbefc4a7f332dece7272a88e410f611e79458fab97b5efe14e54fe476f4fd", size = 4197780, upload-time = "2025-07-02T13:05:29.299Z" },
{ url = "https://files.pythonhosted.org/packages/b9/cf/84210c447c06104e6be9122661159ad4ce7a8190011669afceeaea150524/cryptography-45.0.5-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:460f8c39ba66af7db0545a8c6f2eabcbc5a5528fc1cf6c3fa9a1e44cec33385e", size = 4420091, upload-time = "2025-07-02T13:05:31.221Z" },
{ url = "https://files.pythonhosted.org/packages/3e/6a/cb8b5c8bb82fafffa23aeff8d3a39822593cee6e2f16c5ca5c2ecca344f7/cryptography-45.0.5-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:9b4cf6318915dccfe218e69bbec417fdd7c7185aa7aab139a2c0beb7468c89f0", size = 4198711, upload-time = "2025-07-02T13:05:33.062Z" },
{ url = "https://files.pythonhosted.org/packages/04/f7/36d2d69df69c94cbb2473871926daf0f01ad8e00fe3986ac3c1e8c4ca4b3/cryptography-45.0.5-cp37-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:2089cc8f70a6e454601525e5bf2779e665d7865af002a5dec8d14e561002e135", size = 3883299, upload-time = "2025-07-02T13:05:34.94Z" },
{ url = "https://files.pythonhosted.org/packages/82/c7/f0ea40f016de72f81288e9fe8d1f6748036cb5ba6118774317a3ffc6022d/cryptography-45.0.5-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:0027d566d65a38497bc37e0dd7c2f8ceda73597d2ac9ba93810204f56f52ebc7", size = 4450558, upload-time = "2025-07-02T13:05:37.288Z" },
{ url = "https://files.pythonhosted.org/packages/06/ae/94b504dc1a3cdf642d710407c62e86296f7da9e66f27ab12a1ee6fdf005b/cryptography-45.0.5-cp37-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:be97d3a19c16a9be00edf79dca949c8fa7eff621763666a145f9f9535a5d7f42", size = 4198020, upload-time = "2025-07-02T13:05:39.102Z" },
{ url = "https://files.pythonhosted.org/packages/05/2b/aaf0adb845d5dabb43480f18f7ca72e94f92c280aa983ddbd0bcd6ecd037/cryptography-45.0.5-cp37-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:7760c1c2e1a7084153a0f68fab76e754083b126a47d0117c9ed15e69e2103492", size = 4449759, upload-time = "2025-07-02T13:05:41.398Z" },
{ url = "https://files.pythonhosted.org/packages/91/e4/f17e02066de63e0100a3a01b56f8f1016973a1d67551beaf585157a86b3f/cryptography-45.0.5-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6ff8728d8d890b3dda5765276d1bc6fb099252915a2cd3aff960c4c195745dd0", size = 4319991, upload-time = "2025-07-02T13:05:43.64Z" },
{ url = "https://files.pythonhosted.org/packages/f2/2e/e2dbd629481b499b14516eed933f3276eb3239f7cee2dcfa4ee6b44d4711/cryptography-45.0.5-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:7259038202a47fdecee7e62e0fd0b0738b6daa335354396c6ddebdbe1206af2a", size = 4554189, upload-time = "2025-07-02T13:05:46.045Z" },
]
[[package]]
name = "distro"
version = "1.9.0"
@@ -179,6 +238,8 @@ source = { editable = "." }
dependencies = [
{ name = "aiofiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "aiohttp", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "base58", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "cryptography", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "exo-master", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "exo-worker", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -193,7 +254,6 @@ darwin = [
[package.dev-dependencies]
dev = [
{ name = "maturin", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest-asyncio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "ruff", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -203,6 +263,8 @@ dev = [
requires-dist = [
{ name = "aiofiles", specifier = ">=24.1.0" },
{ name = "aiohttp", specifier = ">=3.12.14" },
{ name = "base58", specifier = ">=2.1.1" },
{ name = "cryptography", specifier = ">=45.0.5" },
{ name = "exo-master", editable = "master" },
{ name = "exo-worker", editable = "worker" },
{ name = "mlx", marker = "extra == 'darwin'" },
@@ -214,7 +276,6 @@ provides-extras = ["darwin"]
[package.metadata.requires-dev]
dev = [
{ name = "maturin", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.4.0" },
{ name = "pytest-asyncio", specifier = ">=1.0.0" },
{ name = "ruff", specifier = ">=0.11.13" },
@@ -242,33 +303,14 @@ requires-dist = [
{ name = "uvicorn", specifier = ">=0.35.0" },
]
[[package]]
name = "exo-pyo3-bindings"
version = "0.1.0"
source = { editable = "rust/exo_pyo3_bindings" }
[package.dev-dependencies]
dev = [
{ name = "exo-pyo3-bindings", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pytest-asyncio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
[package.metadata]
[package.metadata.requires-dev]
dev = [
{ name = "exo-pyo3-bindings", editable = "rust/exo_pyo3_bindings" },
{ name = "pytest", specifier = ">=8.4.0" },
{ name = "pytest-asyncio", specifier = ">=1.0.0" },
]
[[package]]
name = "exo-shared"
version = "0.1.0"
source = { editable = "shared" }
dependencies = [
{ name = "aiosqlite", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "base58", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "cryptography", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "filelock", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "greenlet", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "networkx", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -292,6 +334,8 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiosqlite", specifier = ">=0.20.0" },
{ name = "base58", specifier = ">=2.1.1" },
{ name = "cryptography", specifier = ">=44.0.0" },
{ name = "filelock", specifier = ">=3.18.0" },
{ name = "greenlet", specifier = ">=3.2.3" },
{ name = "networkx", specifier = ">=3.5" },
@@ -592,23 +636,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0d/80/0985960e4b89922cb5a0bac0ed39c5b96cbc1a536a99f30e8c220a996ed9/MarkupSafe-3.0.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:131a3c7689c85f5ad20f9f6fb1b866f402c445b220c19fe4308c0b147ccd2ad9", size = 24098, upload-time = "2024-10-18T15:21:40.813Z" },
]
[[package]]
name = "maturin"
version = "1.9.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/f7/73cf2ae0d6db943a627d28c09f5368735fce6b8b2ad1e1f6bcda2632c80a/maturin-1.9.1.tar.gz", hash = "sha256:97b52fb19d20c1fdc70e4efdc05d79853a4c9c0051030c93a793cd5181dc4ccd", size = 209757, upload-time = "2025-07-08T04:54:43.877Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/48/f2/de43e8954092bd957fbdfbc5b978bf8be40f27aec1a4ebd65e57cfb3ec8a/maturin-1.9.1-py3-none-linux_armv6l.whl", hash = "sha256:fe8f59f9e387fb19635eab6b7381ef718e5dc7a328218e6da604c91f206cbb72", size = 8270244, upload-time = "2025-07-08T04:54:17.962Z" },
{ url = "https://files.pythonhosted.org/packages/b8/72/36966375c2c2bb2d66df4fa756cfcd54175773719b98d4b26a6b4d1f0bfc/maturin-1.9.1-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:6a9c9d176f6df3a8ec1a4c9c72c8a49674ed13668a03c9ead5fab983bbeeb624", size = 16053959, upload-time = "2025-07-08T04:54:21.153Z" },
{ url = "https://files.pythonhosted.org/packages/c4/40/4e0da87e563333ff1605fef15bed5858c2a41c0c0404e47f20086f214473/maturin-1.9.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e14eedbc4369dda1347ce9ddc183ade7c513d9975b7ea2b9c9e4211fb74f597a", size = 8407170, upload-time = "2025-07-08T04:54:23.351Z" },
{ url = "https://files.pythonhosted.org/packages/d9/27/4b29614964c10370effcdfcf34ec57126c9a4b921b7a2c42a94ae3a59cb0/maturin-1.9.1-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:2f05f07bc887e010c44d32a088aea4f36a2104e301f51f408481e4e9759471a7", size = 8258775, upload-time = "2025-07-08T04:54:25.596Z" },
{ url = "https://files.pythonhosted.org/packages/e0/5b/b15ad53e1e6733d8798ce903d25d9e05aa3083b2544f1a6f863ea01dd50d/maturin-1.9.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:e7eb54db3aace213420cd545b24a149842e8d6b1fcec046d0346f299d8adfc34", size = 8787295, upload-time = "2025-07-08T04:54:27.154Z" },
{ url = "https://files.pythonhosted.org/packages/72/d8/b97f4767786eae63bb6b700b342766bcea88da98796bfee290bcddd99fd8/maturin-1.9.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:9d037a37b8ef005eebdea61eaf0e3053ebcad3b740162932fbc120db5fdf5653", size = 8053283, upload-time = "2025-07-08T04:54:28.953Z" },
{ url = "https://files.pythonhosted.org/packages/95/45/770fc005bceac81f5905c96f37c36f65fa9c3da3f4aa8d4e4d2a883aa967/maturin-1.9.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:7c26fb60d80e6a72a8790202bb14dbef956b831044f55d1ce4e2c2e915eb6124", size = 8127120, upload-time = "2025-07-08T04:54:30.779Z" },
{ url = "https://files.pythonhosted.org/packages/2f/a6/be684b4fce58f8b3a9d3b701c23961d5fe0e1710ed484e2216441997e74f/maturin-1.9.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:e0a2c546c123ed97d1ee0c9cc80a912d9174913643c737c12adf4bce46603bb3", size = 10569627, upload-time = "2025-07-08T04:54:32.54Z" },
{ url = "https://files.pythonhosted.org/packages/24/ad/7f8a9d8a1b79c2ed6291aaaa22147c98efee729b23df2803c319dd658049/maturin-1.9.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f5dde6fbcc36a1173fe74e6629fee36e89df76236247b64b23055f1f820bdf35", size = 8934678, upload-time = "2025-07-08T04:54:34.529Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
@@ -813,6 +840,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724, upload-time = "2025-05-28T19:25:53.926Z" },
]
[[package]]
name = "pycparser"
version = "2.22"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1d/b2/31537cf4b1ca988837256c910a668b553fceb8f069bedc4b1c826024b52c/pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6", size = 172736, upload-time = "2024-03-30T13:22:22.564Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552, upload-time = "2024-03-30T13:22:20.476Z" },
]
[[package]]
name = "pydantic"
version = "2.11.7"