Mirror of https://github.com/exo-explore/exo.git (synced 2026-01-27 07:20:14 -05:00)

Compare commits: 2 commits on rust-explo

| Author | SHA1 |
|---|---|
| alexcheema | 922e8075d3 |
| alexcheema | 6ee745246d |

Cargo.lock (generated): 24 changed lines
@@ -514,20 +514,6 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"

[[package]]
name = "cluster_membership"
version = "0.0.1"
dependencies = [
 "anyhow",
 "async-trait",
 "futures-lite",
 "futures-timer",
 "libp2p",
 "log",
 "tokio",
 "tracing-subscriber",
]

[[package]]
name = "colorchoice"
version = "1.0.4"
@@ -1012,7 +998,6 @@ dependencies = [
name = "exo_pyo3_bindings"
version = "0.0.1"
dependencies = [
 "cluster_membership",
 "delegate",
 "derive_more",
 "env_logger",
@@ -1045,12 +1030,6 @@ dependencies = [
 "syn 2.0.111",
]

[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"

[[package]]
name = "ff"
version = "0.13.1"
@@ -1159,10 +1138,7 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
 "fastrand",
 "futures-core",
 "futures-io",
 "parking",
 "pin-project-lite",
]
Cargo.toml

@@ -4,7 +4,6 @@ members = [
    "rust/networking",
    "rust/exo_pyo3_bindings",
    "rust/util",
    "rust/cluster_membership",
]

[workspace.package]

@@ -26,7 +25,6 @@ opt-level = 3
## Crate members as common dependencies
networking = { path = "rust/networking" }
util = { path = "rust/util" }
cluster_membership = { path = "rust/cluster_membership" }

# Proc-macro authoring tools
syn = "2.0"

@@ -64,7 +62,6 @@ frunk-enum-core = "0.3"
# Async dependencies
tokio = "1.46"
futures = "0.3"
futures-lite = "2.6.1"
futures-util = "0.3"
futures-timer = "3.0"
@@ -31,35 +31,6 @@ enum NetworkSetupHelper {
# Remove Thunderbolt Bridge from VirtualNetworkInterfaces in preferences.plist
/usr/libexec/PlistBuddy -c "Delete :VirtualNetworkInterfaces:Bridge:bridge0" "$PREFS" 2>/dev/null || true

networksetup -listlocations | grep -q exo || {
    networksetup -createlocation exo
}

networksetup -switchtolocation exo
networksetup -listallhardwareports \\
    | awk -F': ' '/Hardware Port: / {print $2}' \\
    | while IFS=":" read -r name; do
        case "$name" in
            "Ethernet Adapter"*)
                ;;
            "Thunderbolt Bridge")
                ;;
            "Thunderbolt "*)
                networksetup -listallnetworkservices \\
                    | grep -q "EXO $name" \\
                    || networksetup -createnetworkservice "EXO $name" "$name" 2>/dev/null \\
                    || continue
                networksetup -setdhcp "EXO $name"
                ;;
            *)
                networksetup -listallnetworkservices \\
                    | grep -q "$name" \\
                    || networksetup -createnetworkservice "$name" "$name" 2>/dev/null \\
                    || continue
                ;;
        esac
    done

networksetup -listnetworkservices | grep -q "Thunderbolt Bridge" && {
    networksetup -setnetworkserviceenabled "Thunderbolt Bridge" off
} || true
@@ -3,28 +3,12 @@
  perSystem =
    { pkgs, lib, ... }:
    let
      # Filter source to ONLY include package.json and package-lock.json
      # This ensures prettier-svelte only rebuilds when lockfiles change
      dashboardLockfileSrc = lib.cleanSourceWith {
        src = inputs.self;
        filter =
          path: type:
          let
            baseName = builtins.baseNameOf path;
            isDashboardDir = baseName == "dashboard" && type == "directory";
            isPackageFile =
              (lib.hasInfix "/dashboard/" path || lib.hasSuffix "/dashboard" (builtins.dirOf path))
              && (baseName == "package.json" || baseName == "package-lock.json");
          in
          isDashboardDir || isPackageFile;
      };

      # Stub source with lockfiles and minimal files for build to succeed
      # This allows prettier-svelte to avoid rebuilding when dashboard source changes
      dashboardStubSrc = pkgs.runCommand "dashboard-stub-src" { } ''
        mkdir -p $out
        cp ${dashboardLockfileSrc}/dashboard/package.json $out/
        cp ${dashboardLockfileSrc}/dashboard/package-lock.json $out/
        cp ${inputs.self}/dashboard/package.json $out/
        cp ${inputs.self}/dashboard/package-lock.json $out/
        # Minimal files so vite build succeeds (produces empty output)
        echo '<!DOCTYPE html><html><head></head><body></body></html>' > $out/index.html
        mkdir -p $out/src
rust/cluster_membership/Cargo.toml (file deleted)

@@ -1,23 +0,0 @@
[package]
name = "cluster_membership"
version.workspace = true
edition.workspace = true
publish = false

[dependencies]
# util
anyhow.workspace = true
log.workspace = true
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }

# async
tokio = { workspace = true, features = ["full"] }
futures-timer = { workspace = true }
futures-lite = "2.6.1"

# networking
libp2p = { workspace = true, features = ["full"] }
async-trait = "0.1.89"

[lints]
workspace = true
cluster_membership example (file deleted)

@@ -1,30 +0,0 @@
use cluster_membership::Peer;
use libp2p::identity::ed25519::SecretKey;
use tokio::io::{self, AsyncBufReadExt};
use tracing_subscriber::{EnvFilter, filter::LevelFilter};

#[tokio::main]
async fn main() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
        .try_init();

    let (mut peer, send, mut recv) =
        Peer::new(SecretKey::generate(), "hello".to_string()).expect("peer should always build");

    let ch = peer.subscribe("chatroom".to_string());
    let jh = tokio::spawn(async move { peer.run().await });

    let mut stdin = io::BufReader::new(io::stdin()).lines();
    loop {
        tokio::select! {
            Ok(Some(line)) = stdin.next_line() => { send.send((ch.clone(), line.into_bytes())).await.expect("example"); }
            Some(r) = recv.recv() => match r {
                Ok((_, id, line)) => println!("{:?}:{:?}", id, String::from_utf8_lossy(&line)),
                Err(e) => eprintln!("{e:?}"),
            },
            else => break
        }
    }
    jh.await.expect("task failure");
}
cluster_membership library source (file deleted)

@@ -1,227 +0,0 @@
use libp2p::{
    Multiaddr, PeerId, Swarm, SwarmBuilder,
    futures::StreamExt,
    gossipsub::{self, PublishError, Sha256Topic, TopicHash},
    identify,
    identity::{Keypair, ed25519},
    mdns,
    swarm::{NetworkBehaviour, SwarmEvent, dial_opts::DialOpts},
};
use std::{
    collections::HashMap,
    time::{Duration, Instant},
};
use tokio::{select, sync::mpsc};

const DEFAULT_BUFFER_SIZE: usize = 10;
const MDNS_IGNORE_DURATION_SECS: u64 = 30;

impl Peer {
    pub fn new(
        identity: ed25519::SecretKey,
        namespace: String,
    ) -> anyhow::Result<(
        Self,
        mpsc::Sender<(TopicHash, Vec<u8>)>,
        mpsc::Receiver<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
    )> {
        let mut id_bytes = identity.as_ref().to_vec();

        let mut swarm =
            SwarmBuilder::with_existing_identity(Keypair::ed25519_from_bytes(&mut id_bytes)?)
                .with_tokio()
                .with_quic()
                // TODO(evan): .with_bandwidth_metrics();
                .with_behaviour(|kp| Behaviour::new(kp, namespace.clone()))?
                .build();

        swarm.listen_on("/ip6/::/udp/0/quic-v1".parse()?)?;
        swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
        let (to_swarm, from_client) = mpsc::channel(DEFAULT_BUFFER_SIZE);
        let (to_client, from_swarm) = mpsc::channel(DEFAULT_BUFFER_SIZE);
        Ok((
            Self {
                swarm,
                namespace,
                denied: HashMap::new(),
                from_client,
                to_client,
            },
            to_swarm,
            from_swarm,
        ))
    }

    pub fn subscribe(&mut self, topic: String) -> TopicHash {
        let topic = Sha256Topic::new(topic);
        self.swarm
            .behaviour_mut()
            .gossipsub
            .subscribe(&topic)
            .expect("topic filtered");
        topic.hash()
    }

    pub async fn run(&mut self) {
        loop {
            select! {
                ev = self.swarm.select_next_some() => {
                    let Ok(()) = self.handle_swarm_event(ev).await else {
                        return
                    };
                },
                Some(msg) = self.from_client.recv() => {
                    if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(msg.0, msg.1) {
                        let Ok(()) = self.to_client.send(Err(e)).await else {
                            return
                        };
                    }
                },
            }
        }
    }

    async fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent>) -> Result<(), ()> {
        let SwarmEvent::Behaviour(event) = event else {
            if let SwarmEvent::NewListenAddr {
                listener_id: _,
                address,
            } = event
            {
                log::info!("new listen address {address}")
            }
            return Ok(());
        };
        match event {
            BehaviourEvent::Mdns(mdns_event) => match mdns_event {
                mdns::Event::Discovered(vec) => {
                    // Dial everyone
                    let mut addrs = HashMap::<PeerId, Vec<Multiaddr>>::new();
                    vec.into_iter()
                        .filter(|(peer_id, _)| {
                            self.denied.get(peer_id).is_none_or(|t| {
                                t.elapsed() > Duration::from_secs(MDNS_IGNORE_DURATION_SECS)
                            })
                        })
                        .for_each(|(peer_id, addr)| addrs.entry(peer_id).or_default().push(addr));
                    addrs.into_iter().for_each(|(peer_id, addrs)| {
                        let _ = self
                            .swarm
                            .dial(DialOpts::peer_id(peer_id).addresses(addrs).build());
                    });
                }
                mdns::Event::Expired(vec) => {
                    vec.iter().for_each(|(peer_id, _)| {
                        log::debug!("{peer_id} no longer reachable on mDNS");
                        self.swarm
                            .behaviour_mut()
                            .gossipsub
                            .remove_explicit_peer(peer_id);
                    });
                }
            },
            BehaviourEvent::Identify(identify::Event::Received {
                connection_id: _,
                peer_id,
                info,
            }) => {
                if info
                    .protocols
                    .iter()
                    .any(|p| p.as_ref().contains(&self.namespace))
                {
                    self.passed_namespace(peer_id);
                } else {
                    self.failed_namespace(peer_id);
                }
            }
            BehaviourEvent::Gossipsub(gossipsub::Event::Message {
                propagation_source: _,
                message_id: _,
                message:
                    gossipsub::Message {
                        topic,
                        data,
                        source: Some(source_peer),
                        ..
                    },
            }) => {
                let Ok(()) = self.to_client.send(Ok((topic, source_peer, data))).await else {
                    return Err(());
                };
            }
            _ => {}
        }
        Ok(())
    }

    fn passed_namespace(&mut self, peer: PeerId) {
        log::info!("new peer {peer:?}");
        self.denied.remove(&peer);
        self.swarm
            .behaviour_mut()
            .gossipsub
            .remove_blacklisted_peer(&peer);
        self.swarm
            .behaviour_mut()
            .gossipsub
            .add_explicit_peer(&peer);
    }

    fn failed_namespace(&mut self, peer: PeerId) {
        log::debug!("{peer} failed handshake");
        self.denied.insert(peer, Instant::now());
        self.swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
        // we don't care if disconnect fails
        let _ = self.swarm.disconnect_peer_id(peer);
    }
}

pub struct Peer {
    pub swarm: Swarm<Behaviour>,
    denied: HashMap<PeerId, Instant>,
    namespace: String,
    to_client: mpsc::Sender<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
    from_client: mpsc::Receiver<(TopicHash, Vec<u8>)>,
}

#[test]
fn foo() {
    fn bar<T: Send>(t: T) {}
    let p: Peer = unimplemented!();
    bar(p);
}

#[derive(NetworkBehaviour)]
pub struct Behaviour {
    mdns: mdns::tokio::Behaviour,
    pub gossipsub: gossipsub::Behaviour,
    identify: identify::Behaviour,
}

impl Behaviour {
    fn new(kp: &Keypair, namespace: String) -> Self {
        let mdns = mdns::tokio::Behaviour::new(Default::default(), kp.public().to_peer_id())
            .expect("implementation is infallible");
        let gossipsub = gossipsub::Behaviour::new(
            gossipsub::MessageAuthenticity::Signed(kp.clone()),
            gossipsub::ConfigBuilder::default()
                .max_transmit_size(1024 * 1024)
                .protocol_id_prefix(format!("/exo/gossip/{namespace}/v1"))
                .build()
                .expect("fixed gossipsub config should always build"),
        )
        .expect("fixed gossipsub init should always build");

        let identify = identify::Behaviour::new(
            identify::Config::new_with_signed_peer_record(format!("/exo/identity/v1"), kp)
                .with_push_listen_addr_updates(true),
        );

        Behaviour {
            mdns,
            gossipsub,
            identify,
        }
    }
}
@@ -22,7 +22,6 @@ doc = false
workspace = true

[dependencies]
cluster_membership.workspace = true
networking = { workspace = true }

# interop
@@ -6,41 +6,3 @@
pub mod ident;
pub mod multiaddr;

use std::sync::Mutex;

use cluster_membership::Peer;
use libp2p::identity::ed25519::Keypair;
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};

#[gen_stub_pyclass]
#[pyclass]
#[derive(Clone)]
pub struct PyKeypair(Keypair);

#[gen_stub_pymethods]
#[pymethods]
impl PyKeypair {
    #[staticmethod]
    fn generate() -> Self {
        Self(Keypair::generate())
    }
}

#[gen_stub_pyclass]
#[pyclass]
pub struct PyPeer(Mutex<Peer>);

#[gen_stub_pymethods]
#[pymethods]
impl PyPeer {
    #[staticmethod]
    fn init(kp: PyKeypair, namespace: String) -> PyResult<Self> {
        Ok(PyPeer(Mutex::new(
            Peer::new(kp.0.secret(), namespace)
                .map_err(|e| e.pyerr())?
                .0,
        )))
    }
}
@@ -53,6 +53,7 @@ class Node:
        await router.register_topic(topics.COMMANDS)
        await router.register_topic(topics.ELECTION_MESSAGES)
        await router.register_topic(topics.CONNECTION_MESSAGES)
        await router.register_topic(topics.STATE_CATCHUP)
        await router.register_topic(topics.DOWNLOAD_COMMANDS)

        logger.info(f"Starting node {node_id}")

@@ -82,6 +83,7 @@ class Node:
                command_sender=router.sender(topics.COMMANDS),
                download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
                election_receiver=router.receiver(topics.ELECTION_MESSAGES),
                state_catchup_receiver=router.receiver(topics.STATE_CATCHUP),
            )
        else:
            api = None

@@ -94,6 +96,7 @@ class Node:
            global_event_receiver=router.receiver(topics.GLOBAL_EVENTS),
            local_event_sender=router.sender(topics.LOCAL_EVENTS),
            command_sender=router.sender(topics.COMMANDS),
            state_catchup_receiver=router.receiver(topics.STATE_CATCHUP),
            download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
            event_index_counter=event_index_counter,
        )

@@ -107,6 +110,7 @@ class Node:
            global_event_sender=router.sender(topics.GLOBAL_EVENTS),
            local_event_receiver=router.receiver(topics.LOCAL_EVENTS),
            command_receiver=router.receiver(topics.COMMANDS),
            state_catchup_sender=router.sender(topics.STATE_CATCHUP),
        )

        er_send, er_recv = channel[ElectionResult]()

@@ -189,6 +193,7 @@ class Node:
                global_event_sender=self.router.sender(topics.GLOBAL_EVENTS),
                local_event_receiver=self.router.receiver(topics.LOCAL_EVENTS),
                command_receiver=self.router.receiver(topics.COMMANDS),
                state_catchup_sender=self.router.sender(topics.STATE_CATCHUP),
            )
            self._tg.start_soon(self.master.run)
        elif (

@@ -235,6 +240,9 @@ class Node:
                    ),
                    local_event_sender=self.router.sender(topics.LOCAL_EVENTS),
                    command_sender=self.router.sender(topics.COMMANDS),
                    state_catchup_receiver=self.router.receiver(
                        topics.STATE_CATCHUP
                    ),
                    download_command_sender=self.router.sender(
                        topics.DOWNLOAD_COMMANDS
                    ),
@@ -166,6 +166,7 @@ class API:
        download_command_sender: Sender[ForwarderDownloadCommand],
        # This lets us pause the API if an election is running
        election_receiver: Receiver[ElectionMessage],
        state_catchup_receiver: Receiver[State],
    ) -> None:
        self.state = State()
        self._event_log: list[Event] = []

@@ -173,6 +174,7 @@ class API:
        self.download_command_sender = download_command_sender
        self.global_event_receiver = global_event_receiver
        self.election_receiver = election_receiver
        self.state_catchup_receiver = state_catchup_receiver
        self.event_buffer: OrderedBuffer[Event] = OrderedBuffer[Event]()
        self.node_id: NodeId = node_id
        self.session_id: SessionId = session_id

@@ -1249,6 +1251,7 @@ class API:
            tg.start_soon(self._apply_state)
            tg.start_soon(self._pause_on_new_election)
            tg.start_soon(self._cleanup_expired_images)
            tg.start_soon(self._state_catchup)
            print_startup_banner(self.port)
            await serve(
                cast(ASGIFramework, self.app),

@@ -1259,6 +1262,37 @@ class API:
        self.command_sender.close()
        self.global_event_receiver.close()

    async def _state_catchup(self):
        with self.state_catchup_receiver as states:
            async for state in states:
                if (
                    self.state.last_event_applied_idx == -1
                    and state.last_event_applied_idx > self.state.last_event_applied_idx
                ):
                    # DEBUG: Log buffer state BEFORE clearing
                    logger.warning(
                        f"STATE_CATCHUP: About to catch up. "
                        f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
                        f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
                        f"catching up to idx: {state.last_event_applied_idx}"
                    )

                    new_idx = state.last_event_applied_idx + 1
                    self.event_buffer.next_idx_to_release = new_idx
                    # Preserve events that arrived early but are still valid (idx >= new_idx)
                    # Remove stale events (idx < new_idx) to prevent memory growth
                    self.event_buffer.store = {
                        k: v for k, v in self.event_buffer.store.items() if k >= new_idx
                    }
                    self.state = state

                    # DEBUG: Log buffer state AFTER clearing
                    logger.warning(
                        f"STATE_CATCHUP: Catchup complete. "
                        f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
                        f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
                    )

    async def _apply_state(self):
        with self.global_event_receiver as events:
            async for f_event in events:
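The catchup handler above fast-forwards an OrderedBuffer past a snapshot while keeping early-arriving events. A minimal sketch of the buffer semantics this relies on (not exo's actual OrderedBuffer, just the interface the diff exercises): events are held until their index is next in line, and moving next_idx_to_release forward must drop only indices the snapshot already covers.

class OrderedBufferSketch:
    def __init__(self) -> None:
        self.store: dict[int, str] = {}  # idx -> event (str stands in for Event)
        self.next_idx_to_release = 0

    def ingest(self, idx: int, event: str) -> None:
        if idx >= self.next_idx_to_release:  # already-released indices are ignored
            self.store[idx] = event

    def drain_indexed(self) -> list[tuple[int, str]]:
        # Release the longest contiguous run starting at next_idx_to_release.
        out: list[tuple[int, str]] = []
        while self.next_idx_to_release in self.store:
            idx = self.next_idx_to_release
            out.append((idx, self.store.pop(idx)))
            self.next_idx_to_release += 1
        return out

buf = OrderedBufferSketch()
buf.ingest(7, "early")        # arrived out of order, still valid after catchup
buf.ingest(1, "stale")        # covered by the snapshot, must be dropped

# Snapshot says everything up to idx 5 is already applied:
new_idx = 5 + 1
buf.next_idx_to_release = new_idx
buf.store = {k: v for k, v in buf.store.items() if k >= new_idx}

buf.ingest(6, "next")
assert buf.drain_indexed() == [(6, "next"), (7, "early")]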
@@ -68,6 +68,8 @@ class Master:
        # Send events to the forwarder to be indexed (usually from command processing)
        # Ideally these would be MasterForwarderEvents but type system says no :(
        global_event_sender: Sender[ForwarderEvent],
        # not a fan but - send the entire state to a node so it can catch up without the whole event log.
        state_catchup_sender: Sender[State],
    ):
        self.state = State()
        self._tg: TaskGroup = anyio.create_task_group()

@@ -77,6 +79,7 @@ class Master:
        self.command_receiver = command_receiver
        self.local_event_receiver = local_event_receiver
        self.global_event_sender = global_event_sender
        self.state_catchup_sender = state_catchup_sender
        send, recv = channel[Event]()
        self.event_sender: Sender[Event] = send
        self._loopback_event_receiver: Receiver[Event] = recv

@@ -84,7 +87,6 @@ class Master:
            local_event_receiver.clone_sender()
        )
        self._multi_buffer = MultiSourceBuffer[NodeId, Event]()
        # TODO: not have this
        self._event_log: list[Event] = []

    async def run(self):

@@ -291,11 +293,17 @@ class Master:
                        command.finished_command_id
                    ]
                case RequestEventLog():
                    # We should just be able to send everything, since other buffers will ignore old messages
                    for i in range(command.since_idx, len(self._event_log)):
                        await self._send_event(
                            IndexedEvent(idx=i, event=self._event_log[i])
                    if command.since_idx == 0:
                        # This is an optimization, and should not be relied upon in theory.
                        logger.info(
                            f"Master sending catchup state for index {self.state.last_event_applied_idx}"
                        )
                        await self.state_catchup_sender.send(self.state)
                    else:
                        for i in range(command.since_idx, len(self._event_log)):
                            await self._send_event(
                                IndexedEvent(idx=i, event=self._event_log[i])
                            )
            for event in generated_events:
                await self.event_sender.send(event)
        except ValueError as e:
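The RequestEventLog change above swaps a full log replay for a single snapshot when the requester starts from zero. A condensed sketch of that decision (names simplified, not the real Master): a node that knows nothing gets one State snapshot, anyone else gets indexed events from since_idx onward, and receivers' ordered buffers discard anything already applied.

from dataclasses import dataclass, field

@dataclass
class IndexedEventSketch:
    idx: int
    event: str

@dataclass
class MasterSketch:
    state_snapshot: str = "state@2"                 # stand-in for State
    event_log: list[str] = field(default_factory=lambda: ["e0", "e1", "e2"])
    sent: list[object] = field(default_factory=list)

    def handle_request_event_log(self, since_idx: int) -> None:
        if since_idx == 0:
            # Optimization: one snapshot instead of replaying the whole log.
            self.sent.append(self.state_snapshot)
        else:
            for i in range(since_idx, len(self.event_log)):
                self.sent.append(IndexedEventSketch(idx=i, event=self.event_log[i]))

m = MasterSketch()
m.handle_request_event_log(0)  # fresh node: receives the snapshot
m.handle_request_event_log(1)  # lagging node: receives events 1..2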
@@ -27,6 +27,7 @@ from exo.shared.types.memory import Memory
from exo.shared.types.profiling import (
    MemoryUsage,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import ChatCompletion as ChatCompletionTask
from exo.shared.types.tasks import TaskStatus
from exo.shared.types.worker.instances import (

@@ -47,6 +48,7 @@ async def test_master():
    ge_sender, global_event_receiver = channel[ForwarderEvent]()
    command_sender, co_receiver = channel[ForwarderCommand]()
    local_event_sender, le_receiver = channel[ForwarderEvent]()
    st_s, _st_r = channel[State]()

    all_events: list[IndexedEvent] = []

@@ -67,6 +69,7 @@ async def test_master():
        global_event_sender=ge_sender,
        local_event_receiver=le_receiver,
        command_receiver=co_receiver,
        state_catchup_sender=st_s,
    )
    logger.info("run the master")
    async with anyio.create_task_group() as tg:
@@ -7,6 +7,7 @@ from exo.shared.types.commands import ForwarderCommand, ForwarderDownloadCommand
from exo.shared.types.events import (
    ForwarderEvent,
)
from exo.shared.types.state import State
from exo.utils.pydantic_ext import CamelCaseModel

@@ -45,6 +46,7 @@ ELECTION_MESSAGES = TypedTopic(
CONNECTION_MESSAGES = TypedTopic(
    "connection_messages", PublishPolicy.Never, ConnectionMessage
)
STATE_CATCHUP = TypedTopic("state_catchup", PublishPolicy.Always, State)
DOWNLOAD_COMMANDS = TypedTopic(
    "download_commands", PublishPolicy.Always, ForwarderDownloadCommand
)
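For readers unfamiliar with the topic registry: a TypedTopic pairs a wire name, a publish policy, and the payload model, which is what lets router.sender(...)/receiver(...) hand back typed channels. A rough sketch of that shape, assuming a structure like the diff implies (not exo's actual implementation):

from dataclasses import dataclass
from enum import Enum, auto
from typing import Generic, Type, TypeVar

T = TypeVar("T")

class PublishPolicySketch(Enum):
    Always = auto()  # publish over the network unconditionally
    Never = auto()   # keep the topic node-local

@dataclass(frozen=True)
class TypedTopicSketch(Generic[T]):
    name: str          # wire name of the topic
    policy: PublishPolicySketch
    model: Type[T]     # payload type used for (de)serialization

class StateSketch:  # stand-in for exo.shared.types.state.State
    pass

# Mirrors the diff: catchup snapshots always go over the wire.
STATE_CATCHUP = TypedTopicSketch("state_catchup", PublishPolicySketch.Always, StateSketch)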
@@ -349,8 +349,13 @@ class InfoGatherer:
    async def _monitor_misc(self):
        if self.misc_poll_interval is None:
            return
        prev = await MiscData.gather()
        await self.info_sender.send(prev)
        while True:
            await self.info_sender.send(await MiscData.gather())
            curr = await MiscData.gather()
            if prev != curr:
                prev = curr
                await self.info_sender.send(curr)
            await anyio.sleep(self.misc_poll_interval)

    async def _monitor_system_profiler_thunderbolt_data(self):

@@ -360,12 +365,15 @@ class InfoGatherer:
        if iface_map is None:
            return

        old_idents = []
        while True:
            data = await ThunderboltConnectivity.gather()
            assert data is not None

            idents = [it for i in data if (it := i.ident(iface_map)) is not None]
            await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
            if idents != old_idents:
                await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
                old_idents = idents

            conns = [it for i in data if (it := i.conn()) is not None]
            await self.info_sender.send(MacThunderboltConnections(conns=conns))

@@ -390,17 +398,22 @@ class InfoGatherer:
    async def _watch_system_info(self):
        if self.interface_watcher_interval is None:
            return
        old_nics = []
        while True:
            nics = await get_network_interfaces()
            await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
            if nics != old_nics:
                old_nics = nics
                await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
            await anyio.sleep(self.interface_watcher_interval)

    async def _monitor_thunderbolt_bridge_status(self):
        if self.thunderbolt_bridge_poll_interval is None:
            return
        prev: ThunderboltBridgeInfo | None = None
        while True:
            curr = await ThunderboltBridgeInfo.gather()
            if curr is not None:
            if curr is not None and prev != curr:
                prev = curr
                await self.info_sender.send(curr)
            await anyio.sleep(self.thunderbolt_bridge_poll_interval)
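The InfoGatherer hunks above all make the same change: poll loops that used to send every tick now send only when the reading differs from the last one. A generic sketch of that pattern, using a hypothetical helper (not in exo) with anyio-style sleeping:

import anyio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def poll_on_change(
    gather: Callable[[], Awaitable[T]],
    send: Callable[[T], Awaitable[None]],
    interval: float,
) -> None:
    """Send the first reading, then only readings that differ from the last."""
    prev = await gather()
    await send(prev)
    while True:
        curr = await gather()
        if curr != prev:  # suppress duplicate events on unchanged readings
            prev = curr
            await send(curr)
        await anyio.sleep(interval)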
@@ -60,9 +60,8 @@ class Worker:
        connection_message_receiver: Receiver[ConnectionMessage],
        global_event_receiver: Receiver[ForwarderEvent],
        local_event_sender: Sender[ForwarderEvent],
        # This is for requesting updates. It doesn't need to be a general command sender right now,
        # but I think it's the correct way to be thinking about commands
        command_sender: Sender[ForwarderCommand],
        state_catchup_receiver: Receiver[State],
        download_command_sender: Sender[ForwarderDownloadCommand],
        event_index_counter: Iterator[int],
    ):

@@ -71,6 +70,8 @@ class Worker:

        self.global_event_receiver = global_event_receiver
        self.local_event_sender = local_event_sender
        self.state_catchup_receiver = state_catchup_receiver
        self.local_event_index = 0
        self.event_index_counter = event_index_counter
        self.command_sender = command_sender
        self.download_command_sender = download_command_sender

@@ -110,6 +111,7 @@ class Worker:
            tg.start_soon(self._event_applier)
            tg.start_soon(self._forward_events)
            tg.start_soon(self._poll_connection_updates)
            tg.start_soon(self._check_catchup_state)

        # Actual shutdown code - waits for all tasks to complete before executing.
        self.local_event_sender.close()

@@ -121,13 +123,47 @@ class Worker:
    async def _forward_info(self, recv: Receiver[GatheredInfo]):
        with recv as info_stream:
            async for info in info_stream:
                await self.event_sender.send(
                    NodeGatheredInfo(
                        node_id=self.node_id,
                        when=str(datetime.now(tz=timezone.utc)),
                        info=info,
                    )
                event = NodeGatheredInfo(
                    node_id=self.node_id,
                    when=str(datetime.now(tz=timezone.utc)),
                    info=info,
                )
                logger.warning(
                    f"NODE_GATHERED_INFO: Sending event for node {self.node_id}, "
                    f"event_id={event.event_id}"
                )
                await self.event_sender.send(event)

    async def _check_catchup_state(self):
        with self.state_catchup_receiver as states:
            async for state in states:
                if (
                    self.state.last_event_applied_idx == -1
                    and state.last_event_applied_idx > self.state.last_event_applied_idx
                ):
                    # DEBUG: Log buffer state BEFORE clearing
                    logger.warning(
                        f"STATE_CATCHUP: About to catch up. "
                        f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
                        f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
                        f"catching up to idx: {state.last_event_applied_idx}"
                    )

                    new_idx = state.last_event_applied_idx + 1
                    self.event_buffer.next_idx_to_release = new_idx
                    # Preserve events that arrived early but are still valid (idx >= new_idx)
                    # Remove stale events (idx < new_idx) to prevent memory growth
                    self.event_buffer.store = {
                        k: v for k, v in self.event_buffer.store.items() if k >= new_idx
                    }
                    self.state = state

                    # DEBUG: Log buffer state AFTER clearing
                    logger.warning(
                        f"STATE_CATCHUP: Catchup complete. "
                        f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
                        f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
                    )

    async def _event_applier(self):
        with self.global_event_receiver as events:

@@ -139,8 +175,20 @@ class Worker:
                if event_id in self.out_for_delivery:
                    del self.out_for_delivery[event_id]

                # DEBUG: Log what was ingested
                logger.warning(
                    f"EVENT_APPLIER: Ingested event idx={f_event.origin_idx}, "
                    f"buffer keys now: {sorted(self.event_buffer.store.keys())}"
                )

                # 2. for each event, apply it to the state
                indexed_events = self.event_buffer.drain_indexed()

                # DEBUG: Log drain results
                logger.warning(
                    f"EVENT_APPLIER: Drained {len(indexed_events)} events, "
                    f"next_idx_to_release now: {self.event_buffer.next_idx_to_release}"
                )
                if indexed_events:
                    self._nack_attempts = 0

@@ -157,6 +205,12 @@ class Worker:
                    self._nack_cancel_scope.cancel()

                for idx, event in indexed_events:
                    # DEBUG: Log NodeGatheredInfo events
                    if isinstance(event, NodeGatheredInfo):
                        logger.warning(
                            f"NODE_GATHERED_INFO: Applying event idx={idx} for node {event.node_id}, "
                            f"event_id={event.event_id}"
                        )
                    self.state = apply(self.state, IndexedEvent(idx=idx, event=event))

                    # Buffer input image chunks for image editing

@@ -318,10 +372,7 @@ class Worker:
        # We request all events after (and including) the missing index.
        # This function is started whenever we receive an event that is out of sequence.
        # It is cancelled as soon as we receive an event that is in sequence.

        if since_idx < 0:
            logger.warning(f"Negative value encountered for nack request {since_idx=}")
            since_idx = 0
        assert since_idx >= 0

        with CancelScope() as scope:
            self._nack_cancel_scope = scope
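A simplified sketch of the NACK flow described in that last hunk, not the real Worker: on a gap, keep re-requesting the event log from the first missing index inside a cancel scope; the event applier cancels the scope once an in-sequence event arrives. The make_request_event_log helper and the backoff timing here are invented for illustration; command_sender and the cancel-scope handoff follow the diff.

import anyio
from anyio import CancelScope

async def request_missing_events(worker, since_idx: int) -> None:
    # Clamp instead of asserting: a fresh node can compute -1 before catchup,
    # which is exactly the case the diff's warning-and-clamp replaces.
    if since_idx < 0:
        since_idx = 0
    with CancelScope() as scope:
        worker._nack_cancel_scope = scope  # cancelled by the event applier
        attempt = 0
        while True:
            await worker.command_sender.send(
                worker.make_request_event_log(since_idx)  # hypothetical helper
            )
            attempt += 1
            await anyio.sleep(min(2 ** attempt, 30))  # illustrative backoff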