Compare commits

..

4 Commits

Author SHA1 Message Date
Evan
3029dc6653 maybe maybe 2026-01-27 10:09:42 +00:00
Alex Cheema
44453c4c8b Remove change-detection checks from info gatherer monitors (#1283)
## Summary
- When a node times out, its info gets cleared from state. The monitor
functions only sent data when something changed, leaving no mechanism to
re-populate this info after a timeout.
- Removes change-detection checks from `_monitor_misc`,
`_monitor_system_profiler_thunderbolt_data`, `_watch_system_info`, and
`_monitor_thunderbolt_bridge_status` so data is sent periodically
regardless of whether it changed.

## Test plan
- [ ] Verify type checker passes: `uv run basedpyright`
- [ ] Verify linter passes: `uv run ruff check`
- [ ] Verify tests pass: `uv run pytest`
- [ ] Manually test that node info is re-populated after a timeout by
observing cluster behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 12:23:22 +00:00
Jake Hillion
1290e8ed9f dashboard: fix prettier-svelte rebuilding on every file change
The prettier-svelte package was rebuilding whenever any file in the
repository changed because dashboardStubSrc referenced inputs.self
directly. Since inputs.self's store path hash is computed from the
entire repository contents, any file modification invalidated the
derivation.

Added dashboardLockfileSrc using lib.cleanSourceWith to filter
inputs.self to only include package.json and package-lock.json from
the dashboard directory. Updated dashboardStubSrc to reference this
filtered source instead of inputs.self directly.

This ensures prettier-svelte only rebuilds when the lockfiles actually
change, significantly improving build caching for unrelated changes.

Test plan:
- Built prettier-svelte with nix build .#prettier-svelte
- Modified src/exo/main.py and rebuilt - same store path (no rebuild)
- Modified dashboard/package.json and rebuilt - different store path (rebuild triggered)
- Ran nix flake check successfully
2026-01-26 12:02:05 +00:00
Evan Quiney
d93db3d6bf re enable the evil network script (#1277)
seems like we still need the interfaces to be routable for mdns. at
least we're not dependent on this behaviour anymore.
2026-01-24 13:36:06 +00:00
23 changed files with 515 additions and 237 deletions

24
Cargo.lock generated
View File

@@ -514,6 +514,20 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cluster_membership"
version = "0.0.1"
dependencies = [
"anyhow",
"async-trait",
"futures-lite",
"futures-timer",
"libp2p",
"log",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "colorchoice"
version = "1.0.4"
@@ -998,6 +1012,7 @@ dependencies = [
name = "exo_pyo3_bindings"
version = "0.0.1"
dependencies = [
"cluster_membership",
"delegate",
"derive_more",
"env_logger",
@@ -1030,6 +1045,12 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "ff"
version = "0.13.1"
@@ -1138,7 +1159,10 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]

View File

@@ -4,6 +4,7 @@ members = [
"rust/networking",
"rust/exo_pyo3_bindings",
"rust/util",
"rust/cluster_membership",
]
[workspace.package]
@@ -25,6 +26,7 @@ opt-level = 3
## Crate members as common dependencies
networking = { path = "rust/networking" }
util = { path = "rust/util" }
cluster_membership = { path = "rust/cluster_membership" }
# Proc-macro authoring tools
syn = "2.0"
@@ -62,6 +64,7 @@ frunk-enum-core = "0.3"
# Async dependencies
tokio = "1.46"
futures = "0.3"
futures-lite = "2.6.1"
futures-util = "0.3"
futures-timer = "3.0"

View File

@@ -5,18 +5,18 @@
[X] Fetching download status of all models on start
[X] Deduplication of tasks in plan_step.
[X] resolve_allow_patterns should just be wildcard now.
[X] no mx_barrier in genreate.py mlx_generate at the end.
[] no mx_barrier in genreate.py mlx_generate at the end.
[] cache assertion not needed in auto_parallel.py PipelineLastLayer.
[X] GPTOSS support dropped in auto_parallel.py.
[X] sharding changed "all-to-sharded" became _all_to_sharded in auto_parallel.py.
[X] same as above with "sharded-to-all" became _sharded_to_all in auto_parallel.py.
[X] Dropped support for Ministral3Model, DeepseekV32Model, Glm4MoeModel, Qwen3NextModel, GptOssMode in auto_parallel.py.
[] GPTOSS support dropped in auto_parallel.py.
[] sharding changed "all-to-sharded" became _all_to_sharded in auto_parallel.py.
[] same as above with "sharded-to-all" became _sharded_to_all in auto_parallel.py.
[] Dropped support for Ministral3Model, DeepseekV32Model, Glm4MoeModel, Qwen3NextModel, GptOssMode in auto_parallel.py.
[] Dropped prefill/decode code in auto_parallel.py and utils_mlx.py.
[X] KV_CACHE_BITS should be None to disable quantized KV cache.
[X] Dropped _set_nofile_limit in utils_mlx.py.
[X] We have group optional in load_mlx_items in utils_mlx.py.
[] Dropped _set_nofile_limit in utils_mlx.py.
[] We have group optional in load_mlx_items in utils_mlx.py.
[] Dropped add_missing_chat_templates for GptOss in load_mlx_items in utils_mlx.py.
[X] Dropped model.make_cache in make_kv_cache in utils_mlx.py.
[] Dropped model.make_cache in make_kv_cache in utils_mlx.py.
[X] We put cache limit back in utils_mlx.py.
[] topology.py remove_node removes the connections after checking if node is is in self._node_id_to_rx_id_map. on beta_1 it checks after, so would remove stale connections I guess?
[] Missing Glm 4.7 model cards (this isn't ready yet but should be picked up, probably create an issue... the blocker is transforemrs version doesn't support the tokenizer for Glm 4.7. rc-1 does but we can't upgrade as it breaks other things.)

View File

@@ -31,6 +31,35 @@ enum NetworkSetupHelper {
# Remove Thunderbolt Bridge from VirtualNetworkInterfaces in preferences.plist
/usr/libexec/PlistBuddy -c "Delete :VirtualNetworkInterfaces:Bridge:bridge0" "$PREFS" 2>/dev/null || true
networksetup -listlocations | grep -q exo || {
networksetup -createlocation exo
}
networksetup -switchtolocation exo
networksetup -listallhardwareports \\
| awk -F': ' '/Hardware Port: / {print $2}' \\
| while IFS=":" read -r name; do
case "$name" in
"Ethernet Adapter"*)
;;
"Thunderbolt Bridge")
;;
"Thunderbolt "*)
networksetup -listallnetworkservices \\
| grep -q "EXO $name" \\
|| networksetup -createnetworkservice "EXO $name" "$name" 2>/dev/null \\
|| continue
networksetup -setdhcp "EXO $name"
;;
*)
networksetup -listallnetworkservices \\
| grep -q "$name" \\
|| networksetup -createnetworkservice "$name" "$name" 2>/dev/null \\
|| continue
;;
esac
done
networksetup -listnetworkservices | grep -q "Thunderbolt Bridge" && {
networksetup -setnetworkserviceenabled "Thunderbolt Bridge" off
} || true

View File

@@ -3,12 +3,28 @@
perSystem =
{ pkgs, lib, ... }:
let
# Filter source to ONLY include package.json and package-lock.json
# This ensures prettier-svelte only rebuilds when lockfiles change
dashboardLockfileSrc = lib.cleanSourceWith {
src = inputs.self;
filter =
path: type:
let
baseName = builtins.baseNameOf path;
isDashboardDir = baseName == "dashboard" && type == "directory";
isPackageFile =
(lib.hasInfix "/dashboard/" path || lib.hasSuffix "/dashboard" (builtins.dirOf path))
&& (baseName == "package.json" || baseName == "package-lock.json");
in
isDashboardDir || isPackageFile;
};
# Stub source with lockfiles and minimal files for build to succeed
# This allows prettier-svelte to avoid rebuilding when dashboard source changes
dashboardStubSrc = pkgs.runCommand "dashboard-stub-src" { } ''
mkdir -p $out
cp ${inputs.self}/dashboard/package.json $out/
cp ${inputs.self}/dashboard/package-lock.json $out/
cp ${dashboardLockfileSrc}/dashboard/package.json $out/
cp ${dashboardLockfileSrc}/dashboard/package-lock.json $out/
# Minimal files so vite build succeeds (produces empty output)
echo '<!DOCTYPE html><html><head></head><body></body></html>' > $out/index.html
mkdir -p $out/src

View File

@@ -0,0 +1,23 @@
[package]
name = "cluster_membership"
version.workspace = true
edition.workspace = true
publish = false
[dependencies]
# util
anyhow.workspace = true
log.workspace = true
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
# async
tokio = { workspace = true, features = ["full"] }
futures-timer = { workspace = true }
futures-lite = "2.6.1"
# networking
libp2p = { workspace = true, features = ["full"] }
async-trait = "0.1.89"
[lints]
workspace = true

View File

@@ -0,0 +1,30 @@
use cluster_membership::Peer;
use libp2p::identity::ed25519::SecretKey;
use tokio::io::{self, AsyncBufReadExt};
use tracing_subscriber::{EnvFilter, filter::LevelFilter};
#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
.try_init();
let (mut peer, send, mut recv) =
Peer::new(SecretKey::generate(), "hello".to_string()).expect("peer should always build");
let ch = peer.subscribe("chatroom".to_string());
let jh = tokio::spawn(async move { peer.run().await });
let mut stdin = io::BufReader::new(io::stdin()).lines();
loop {
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {send.send((ch.clone(), line.into_bytes())).await.expect("example");}
Some(r) = recv.recv() => match r {
Ok((_, id, line)) => println!("{:?}:{:?}", id, String::from_utf8_lossy(&line)),
Err(e) => eprintln!("{e:?}"),
},
else => break
}
}
jh.await.expect("task failure");
}

View File

@@ -0,0 +1,227 @@
use libp2p::{
Multiaddr, PeerId, Swarm, SwarmBuilder,
futures::StreamExt,
gossipsub::{self, PublishError, Sha256Topic, TopicHash},
identify,
identity::{Keypair, ed25519},
mdns,
swarm::{NetworkBehaviour, SwarmEvent, dial_opts::DialOpts},
};
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use tokio::{select, sync::mpsc};
const DEFAULT_BUFFER_SIZE: usize = 10;
const MDNS_IGNORE_DURATION_SECS: u64 = 30;
impl Peer {
pub fn new(
identity: ed25519::SecretKey,
namespace: String,
) -> anyhow::Result<(
Self,
mpsc::Sender<(TopicHash, Vec<u8>)>,
mpsc::Receiver<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
)> {
let mut id_bytes = identity.as_ref().to_vec();
let mut swarm =
SwarmBuilder::with_existing_identity(Keypair::ed25519_from_bytes(&mut id_bytes)?)
.with_tokio()
.with_quic()
// TODO(evan): .with_bandwidth_metrics();
.with_behaviour(|kp| Behaviour::new(kp, namespace.clone()))?
.build();
swarm.listen_on("/ip6/::/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
let (to_swarm, from_client) = mpsc::channel(DEFAULT_BUFFER_SIZE);
let (to_client, from_swarm) = mpsc::channel(DEFAULT_BUFFER_SIZE);
Ok((
Self {
swarm,
namespace,
denied: HashMap::new(),
from_client,
to_client,
},
to_swarm,
from_swarm,
))
}
pub fn subscribe(&mut self, topic: String) -> TopicHash {
let topic = Sha256Topic::new(topic);
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&topic)
.expect("topic filtered");
topic.hash()
}
pub async fn run(&mut self) {
loop {
select! {
ev = self.swarm.select_next_some() => {
let Ok(()) = self.handle_swarm_event(ev).await else {
return
};
},
Some(msg) = self.from_client.recv() => {
if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(msg.0, msg.1) {
let Ok(()) = self.to_client.send(Err(e)).await else {
return
};
}
},
}
}
}
async fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent>) -> Result<(), ()> {
let SwarmEvent::Behaviour(event) = event else {
if let SwarmEvent::NewListenAddr {
listener_id: _,
address,
} = event
{
log::info!("new listen address {address}")
}
return Ok(());
};
match event {
BehaviourEvent::Mdns(mdns_event) => match mdns_event {
mdns::Event::Discovered(vec) => {
// Dial everyone
let mut addrs = HashMap::<PeerId, Vec<Multiaddr>>::new();
vec.into_iter()
.filter(|(peer_id, _)| {
self.denied.get(peer_id).is_none_or(|t| {
t.elapsed() > Duration::from_secs(MDNS_IGNORE_DURATION_SECS)
})
})
.for_each(|(peer_id, addr)| addrs.entry(peer_id).or_default().push(addr));
addrs.into_iter().for_each(|(peer_id, addrs)| {
let _ = self
.swarm
.dial(DialOpts::peer_id(peer_id).addresses(addrs).build());
});
}
mdns::Event::Expired(vec) => {
vec.iter().for_each(|(peer_id, _)| {
log::debug!("{peer_id} no longer reachable on mDNS");
self.swarm
.behaviour_mut()
.gossipsub
.remove_explicit_peer(peer_id);
});
}
},
BehaviourEvent::Identify(identify::Event::Received {
connection_id: _,
peer_id,
info,
}) => {
if info
.protocols
.iter()
.any(|p| p.as_ref().contains(&self.namespace))
{
self.passed_namespace(peer_id);
} else {
self.failed_namespace(peer_id);
}
}
BehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: _,
message_id: _,
message:
gossipsub::Message {
topic,
data,
source: Some(source_peer),
..
},
}) => {
let Ok(()) = self.to_client.send(Ok((topic, source_peer, data))).await else {
return Err(());
};
}
_ => {}
}
Ok(())
}
fn passed_namespace(&mut self, peer: PeerId) {
log::info!("new peer {peer:?}");
self.denied.remove(&peer);
self.swarm
.behaviour_mut()
.gossipsub
.remove_blacklisted_peer(&peer);
self.swarm
.behaviour_mut()
.gossipsub
.add_explicit_peer(&peer);
}
fn failed_namespace(&mut self, peer: PeerId) {
log::debug!("{peer} failed handshake");
self.denied.insert(peer, Instant::now());
self.swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
// we don't care if disconnect fails
let _ = self.swarm.disconnect_peer_id(peer);
}
}
pub struct Peer {
pub swarm: Swarm<Behaviour>,
denied: HashMap<PeerId, Instant>,
namespace: String,
to_client: mpsc::Sender<Result<(TopicHash, PeerId, Vec<u8>), PublishError>>,
from_client: mpsc::Receiver<(TopicHash, Vec<u8>)>,
}
#[test]
fn foo() {
fn bar<T: Send>(t: T) {}
let p: Peer = unimplemented!();
bar(p);
}
#[derive(NetworkBehaviour)]
pub struct Behaviour {
mdns: mdns::tokio::Behaviour,
pub gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
}
impl Behaviour {
fn new(kp: &Keypair, namespace: String) -> Self {
let mdns = mdns::tokio::Behaviour::new(Default::default(), kp.public().to_peer_id())
.expect("implementation is infallible");
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(kp.clone()),
gossipsub::ConfigBuilder::default()
.max_transmit_size(1024 * 1024)
.protocol_id_prefix(format!("/exo/gossip/{namespace}/v1"))
.build()
.expect("fixed gossipsub config should always build"),
)
.expect("fixed gossipsub init should always build");
let identify = identify::Behaviour::new(
identify::Config::new_with_signed_peer_record(format!("/exo/identity/v1"), kp)
.with_push_listen_addr_updates(true),
);
Behaviour {
mdns,
gossipsub,
identify,
}
}
}

View File

@@ -22,6 +22,7 @@ doc = false
workspace = true
[dependencies]
cluster_membership.workspace = true
networking = { workspace = true }
# interop

View File

@@ -6,3 +6,41 @@
pub mod ident;
pub mod multiaddr;
use std::sync::Mutex;
use cluster_membership::Peer;
use libp2p::identity::ed25519::Keypair;
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
#[gen_stub_pyclass]
#[pyclass]
#[derive(Clone)]
pub struct PyKeypair(Keypair);
#[gen_stub_pymethods]
#[pymethods]
impl PyKeypair {
#[staticmethod]
fn generate() -> Self {
Self(Keypair::generate())
}
}
#[gen_stub_pyclass]
#[pyclass]
pub struct PyPeer(Mutex<Peer>);
#[gen_stub_pymethods]
#[pymethods]
impl PyPeer {
#[staticmethod]
fn init(kp: PyKeypair, namespace: String) -> PyResult<Self> {
Ok(PyPeer(Mutex::new(
Peer::new(kp.0.secret(), namespace)
.map_err(|e| e.pyerr())?
.0,
)))
}
}

View File

@@ -88,7 +88,6 @@ from exo.shared.types.commands import (
PlaceInstance,
SendInputChunk,
StartDownload,
TaskCancelled,
TaskFinished,
)
from exo.shared.types.common import CommandId, Id, NodeId, SessionId
@@ -509,14 +508,16 @@ class API:
break
except anyio.get_cancelled_exc_class():
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=command)
)
# TODO: TaskCancelled
"""
self.command_sender.send_nowait(
ForwarderCommand(origin=self.node_id, command=command)
)
"""
raise
finally:
await self._send(TaskFinished(finished_command_id=command_id))
command = TaskFinished(finished_command_id=command_id)
await self._send(command)
if command_id in self._chat_completion_queues:
del self._chat_completion_queues[command_id]
@@ -900,11 +901,6 @@ class API:
del image_metadata[key]
except anyio.get_cancelled_exc_class():
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=command)
)
raise
finally:
await self._send(TaskFinished(finished_command_id=command_id))
@@ -986,11 +982,6 @@ class API:
return (images, stats if capture_stats else None)
except anyio.get_cancelled_exc_class():
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=command)
)
raise
finally:
await self._send(TaskFinished(finished_command_id=command_id))

View File

@@ -21,7 +21,6 @@ from exo.shared.types.commands import (
PlaceInstance,
RequestEventLog,
SendInputChunk,
TaskCancelled,
TaskFinished,
TestCommand,
)
@@ -36,7 +35,6 @@ from exo.shared.types.events import (
NodeTimedOut,
TaskCreated,
TaskDeleted,
TaskStatusUpdated,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import (
@@ -248,7 +246,7 @@ class Master:
case DeleteInstance():
placement = delete_instance(command, self.state.instances)
transition_events = get_transition_events(
self.state.instances, placement, self.state.tasks
self.state.instances, placement
)
generated_events.extend(transition_events)
case PlaceInstance():
@@ -260,7 +258,7 @@ class Master:
self.state.node_network,
)
transition_events = get_transition_events(
self.state.instances, placement, self.state.tasks
self.state.instances, placement
)
generated_events.extend(transition_events)
case CreateInstance():
@@ -270,7 +268,7 @@ class Master:
self.state.instances,
)
transition_events = get_transition_events(
self.state.instances, placement, self.state.tasks
self.state.instances, placement
)
generated_events.extend(transition_events)
case SendInputChunk(chunk=chunk):
@@ -280,18 +278,6 @@ class Master:
chunk=chunk,
)
)
case TaskCancelled():
if (
task_id := self.command_task_mapping.get(
command.cancelled_command_id
)
) is not None:
generated_events.append(
TaskStatusUpdated(
task_status=TaskStatus.Cancelled,
task_id=task_id,
)
)
case TaskFinished():
generated_events.append(
TaskDeleted(
@@ -300,9 +286,10 @@ class Master:
]
)
)
self.command_task_mapping.pop(
command.finished_command_id, None
)
if command.finished_command_id in self.command_task_mapping:
del self.command_task_mapping[
command.finished_command_id
]
case RequestEventLog():
# We should just be able to send everything, since other buffers will ignore old messages
for i in range(command.since_idx, len(self._event_log)):

View File

@@ -20,15 +20,9 @@ from exo.shared.types.commands import (
PlaceInstance,
)
from exo.shared.types.common import NodeId
from exo.shared.types.events import (
Event,
InstanceCreated,
InstanceDeleted,
TaskStatusUpdated,
)
from exo.shared.types.events import Event, InstanceCreated, InstanceDeleted
from exo.shared.types.memory import Memory
from exo.shared.types.profiling import MemoryUsage, NodeNetworkInfo
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.worker.instances import (
Instance,
InstanceId,
@@ -186,7 +180,6 @@ def delete_instance(
def get_transition_events(
current_instances: Mapping[InstanceId, Instance],
target_instances: Mapping[InstanceId, Instance],
tasks: Mapping[TaskId, Task],
) -> Sequence[Event]:
events: list[Event] = []
@@ -202,18 +195,6 @@ def get_transition_events(
# find instances to delete
for instance_id in current_instances:
if instance_id not in target_instances:
for task in tasks.values():
if task.instance_id == instance_id and task.task_status in [
TaskStatus.Pending,
TaskStatus.Running,
]:
events.append(
TaskStatusUpdated(
task_status=TaskStatus.Cancelled,
task_id=task.task_id,
)
)
events.append(
InstanceDeleted(
instance_id=instance_id,

View File

@@ -48,10 +48,6 @@ class DeleteInstance(BaseCommand):
instance_id: InstanceId
class TaskCancelled(BaseCommand):
cancelled_command_id: CommandId
class TaskFinished(BaseCommand):
finished_command_id: CommandId
@@ -88,7 +84,6 @@ Command = (
| PlaceInstance
| CreateInstance
| DeleteInstance
| TaskCancelled
| TaskFinished
| SendInputChunk
)

View File

@@ -24,7 +24,6 @@ class TaskStatus(str, Enum):
Complete = "Complete"
TimedOut = "TimedOut"
Failed = "Failed"
Cancelled = "Cancelled"
class BaseTask(TaggedModel):
@@ -61,10 +60,6 @@ class ChatCompletion(BaseTask): # emitted by Master
error_message: str | None = Field(default=None)
class CancelTask(BaseTask):
cancelled_task_id: TaskId
class ImageGeneration(BaseTask): # emitted by Master
command_id: CommandId
task_params: ImageGenerationTaskParams
@@ -92,7 +87,6 @@ Task = (
| LoadModel
| StartWarmup
| ChatCompletion
| CancelTask
| ImageGeneration
| ImageEdits
| Shutdown

View File

@@ -349,13 +349,8 @@ class InfoGatherer:
async def _monitor_misc(self):
if self.misc_poll_interval is None:
return
prev = await MiscData.gather()
await self.info_sender.send(prev)
while True:
curr = await MiscData.gather()
if prev != curr:
prev = curr
await self.info_sender.send(curr)
await self.info_sender.send(await MiscData.gather())
await anyio.sleep(self.misc_poll_interval)
async def _monitor_system_profiler_thunderbolt_data(self):
@@ -365,15 +360,12 @@ class InfoGatherer:
if iface_map is None:
return
old_idents = []
while True:
data = await ThunderboltConnectivity.gather()
assert data is not None
idents = [it for i in data if (it := i.ident(iface_map)) is not None]
if idents != old_idents:
await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
old_idents = idents
await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
conns = [it for i in data if (it := i.conn()) is not None]
await self.info_sender.send(MacThunderboltConnections(conns=conns))
@@ -398,22 +390,17 @@ class InfoGatherer:
async def _watch_system_info(self):
if self.interface_watcher_interval is None:
return
old_nics = []
while True:
nics = await get_network_interfaces()
if nics != old_nics:
old_nics = nics
await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
await anyio.sleep(self.interface_watcher_interval)
async def _monitor_thunderbolt_bridge_status(self):
if self.thunderbolt_bridge_poll_interval is None:
return
prev: ThunderboltBridgeInfo | None = None
while True:
curr = await ThunderboltBridgeInfo.gather()
if curr is not None and prev != curr:
prev = curr
if curr is not None:
await self.info_sender.send(curr)
await anyio.sleep(self.thunderbolt_bridge_poll_interval)

View File

@@ -23,6 +23,7 @@ from exo.worker.engines.mlx.constants import KV_BITS, KV_GROUP_SIZE, MAX_TOKENS
from exo.worker.engines.mlx.utils_mlx import (
apply_chat_template,
make_kv_cache,
mx_barrier,
)
from exo.worker.runner.bootstrap import logger
@@ -89,6 +90,10 @@ def warmup_inference(
logger.info("Generated ALL warmup tokens")
# TODO: Do we want an mx_barrier?
# At least this version is actively incorrect, as it should use mx_barrier(group)
mx_barrier()
return tokens_generated
@@ -181,3 +186,5 @@ def mlx_generate(
if out.finish_reason is not None:
break
# TODO: Do we want an mx_barrier?

View File

@@ -70,6 +70,8 @@ Group = mx.distributed.Group
resource.setrlimit(resource.RLIMIT_NOFILE, (2048, 4096))
# TODO: Test this
# ALSO https://github.com/exo-explore/exo/pull/233#discussion_r2549683673
def get_weights_size(model_shard_meta: ShardMetadata) -> Memory:
return Memory.from_float_kb(
(model_shard_meta.end_layer - model_shard_meta.start_layer)
@@ -87,6 +89,30 @@ class ModelLoadingTimeoutError(Exception):
pass
def mx_barrier(group: Group | None = None):
mx.eval(
mx.distributed.all_sum(
mx.array(1.0),
stream=mx.default_stream(mx.Device(mx.cpu)),
group=group,
)
)
def broadcast_from_zero(value: int, group: Group | None = None):
if group is None:
return value
if group.rank() == 0:
a = mx.array([value], dtype=mx.int32)
else:
a = mx.array([0], dtype=mx.int32)
m = mx.distributed.all_sum(a, stream=mx.Device(mx.DeviceType.cpu), group=group)
mx.eval(m)
return int(m.item())
class HostList(RootModel[list[str]]):
@classmethod
def from_hosts(cls, hosts: list[Host]) -> "HostList":
@@ -510,23 +536,3 @@ def mlx_cleanup(
import gc
gc.collect()
def mx_any(bool_: bool, group: Group | None) -> bool:
if group is None:
return bool_
num_true = mx.distributed.all_sum(
mx.array(bool_), group=group, stream=mx.default_stream(mx.Device(mx.cpu))
)
mx.eval(num_true)
return num_true.item() > 0
def mx_barrier(group: Group | None):
if group is None:
return
mx.eval(
mx.distributed.all_sum(
mx.array(1.0), group=group, stream=mx.default_stream(mx.Device(mx.cpu))
)
)

View File

@@ -33,7 +33,6 @@ from exo.shared.types.events import (
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.state import State
from exo.shared.types.tasks import (
CancelTask,
CreateRunner,
DownloadModel,
ImageEdits,
@@ -116,9 +115,8 @@ class Worker:
self.local_event_sender.close()
self.command_sender.close()
self.download_command_sender.close()
async with create_task_group() as tg:
for runner in self.runners.values():
tg.start_soon(runner.shutdown)
for runner in self.runners.values():
runner.shutdown()
async def _forward_info(self, recv: Receiver[GatheredInfo]):
with recv as info_stream:
@@ -222,22 +220,15 @@ class Worker:
)
)
case Shutdown(runner_id=runner_id):
runner = self.runners.pop(runner_id)
try:
with fail_after(3):
await runner.start_task(task)
await self.runners.pop(runner_id).start_task(task)
except TimeoutError:
await self.event_sender.send(
TaskStatusUpdated(
task_id=task.task_id, task_status=TaskStatus.TimedOut
)
)
finally:
await runner.shutdown()
case CancelTask(cancelled_task_id=cancelled_task_id):
await self.runners[self._task_to_runner_id(task)].cancel_task(
cancelled_task_id
)
case ImageEdits() if task.task_params.total_input_chunks > 0:
# Assemble image from chunks and inject into task
cmd_id = task.command_id
@@ -360,6 +351,8 @@ class Worker:
for event in self.out_for_delivery.copy().values():
await self.local_event_sender.send(event)
## Op Executors
def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor:
"""Creates and stores a new AssignedRunner with initial downloading status."""
runner = RunnerSupervisor.create(

View File

@@ -4,7 +4,6 @@ from collections.abc import Mapping, Sequence
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.tasks import (
CancelTask,
ChatCompletion,
ConnectToGroup,
CreateRunner,
@@ -60,8 +59,7 @@ def plan(
or _init_distributed_backend(runners, all_runners)
or _load_model(runners, all_runners, global_download_status)
or _ready_to_warmup(runners, all_runners)
or _cancel_tasks(runners, tasks)
or _pending_tasks(runners, tasks, all_runners, input_chunk_buffer or {})
or _pending_tasks(runners, tasks, all_runners, input_chunk_buffer)
)
@@ -272,7 +270,7 @@ def _pending_tasks(
runners: Mapping[RunnerId, RunnerSupervisor],
tasks: Mapping[TaskId, Task],
all_runners: Mapping[RunnerId, RunnerStatus],
input_chunk_buffer: Mapping[CommandId, dict[int, str]],
input_chunk_buffer: Mapping[CommandId, dict[int, str]] | None = None,
) -> Task | None:
for task in tasks.values():
# for now, just forward chat completions
@@ -286,7 +284,7 @@ def _pending_tasks(
if isinstance(task, ImageEdits) and task.task_params.total_input_chunks > 0:
cmd_id = task.command_id
expected = task.task_params.total_input_chunks
received = len(input_chunk_buffer.get(cmd_id, {}))
received = len((input_chunk_buffer or {}).get(cmd_id, {}))
if received < expected:
continue # Wait for all chunks to arrive
@@ -294,31 +292,16 @@ def _pending_tasks(
if task.instance_id != runner.bound_instance.instance.instance_id:
continue
# the task status _should_ be set to completed by the LAST runner
# it is currently set by the first
# this is definitely a hack
# I have a design point here; this is a state race in disguise as the task status doesn't get updated to completed fast enough
# however, realistically the task status should be set to completed by the LAST runner, so this is a true race
# the actual solution is somewhat deeper than this bypass - TODO!
if task.task_id in runner.completed:
continue
# TODO: Check ordering aligns with MLX distributeds expectations.
if isinstance(runner.status, RunnerReady) and all(
isinstance(all_runners[global_runner_id], (RunnerReady, RunnerRunning))
for global_runner_id in runner.bound_instance.instance.shard_assignments.runner_to_shard
):
return task
def _cancel_tasks(
runners: Mapping[RunnerId, RunnerSupervisor],
tasks: Mapping[TaskId, Task],
) -> Task | None:
for task in tasks.values():
if task.task_status != TaskStatus.Cancelled:
continue
for runner in runners.values():
if task.instance_id != runner.bound_instance.instance.instance_id:
continue
if task.task_id in runner.cancelled:
continue
return CancelTask(
instance_id=task.instance_id, cancelled_task_id=task.task_id
)

View File

@@ -3,7 +3,7 @@ import os
import loguru
from exo.shared.types.events import Event, RunnerStatusUpdated
from exo.shared.types.tasks import Task, TaskId
from exo.shared.types.tasks import Task
from exo.shared.types.worker.instances import BoundInstance, MlxJacclInstance
from exo.shared.types.worker.runners import RunnerFailed
from exo.utils.channels import ClosedResourceError, MpReceiver, MpSender
@@ -15,7 +15,6 @@ def entrypoint(
bound_instance: BoundInstance,
event_sender: MpSender[Event],
task_receiver: MpReceiver[Task],
cancel_receiver: MpReceiver[TaskId],
_logger: "loguru.Logger",
) -> None:
fast_synch_override = os.environ.get("EXO_FAST_SYNCH")
@@ -39,7 +38,7 @@ def entrypoint(
try:
from exo.worker.runner.runner import main
main(bound_instance, event_sender, task_receiver, cancel_receiver)
main(bound_instance, event_sender, task_receiver)
except ClosedResourceError:
logger.warning("Runner communication closed unexpectedly")
except Exception as e:

View File

@@ -37,7 +37,6 @@ from exo.shared.types.tasks import (
Shutdown,
StartWarmup,
Task,
TaskId,
TaskStatus,
)
from exo.shared.types.worker.instances import BoundInstance
@@ -78,7 +77,6 @@ from exo.worker.engines.mlx.utils_mlx import (
initialize_mlx,
load_mlx_items,
mlx_force_oom,
mx_any,
)
from exo.worker.runner.bootstrap import logger
@@ -87,7 +85,6 @@ def main(
bound_instance: BoundInstance,
event_sender: MpSender[Event],
task_receiver: MpReceiver[Task],
cancel_receiver: MpReceiver[TaskId],
):
instance, runner_id, shard_metadata = (
bound_instance.instance,
@@ -102,11 +99,8 @@ def main(
time.sleep(timeout)
setup_start_time = time.time()
cancelled_tasks = set[TaskId]()
# type checker was unhappy with me - splitting these fixed it
inference_model: Model | None = None
image_model: DistributedImageModel | None = None
model: Model | DistributedImageModel | None = None
tokenizer = None
group = None
@@ -117,7 +111,6 @@ def main(
)
with task_receiver as tasks:
for task in tasks:
cancelled_tasks.discard(TaskId("CANCEL_CURRENT_TASK"))
event_sender.send(
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
)
@@ -162,7 +155,7 @@ def main(
time.sleep(0.5)
if ModelTask.TextGeneration in shard_metadata.model_card.tasks:
inference_model, tokenizer = load_mlx_items(
model, tokenizer = load_mlx_items(
bound_instance, group, on_timeout=on_model_load_timeout
)
logger.info(
@@ -172,7 +165,7 @@ def main(
ModelTask.TextToImage in shard_metadata.model_card.tasks
or ModelTask.ImageToImage in shard_metadata.model_card.tasks
):
image_model = initialize_image_model(bound_instance)
model = initialize_image_model(bound_instance)
else:
raise ValueError(
f"Unknown model task(s): {shard_metadata.model_card.tasks}"
@@ -181,6 +174,8 @@ def main(
current_status = RunnerLoaded()
logger.info("runner loaded")
case StartWarmup() if isinstance(current_status, RunnerLoaded):
assert model
current_status = RunnerWarmingUp()
logger.info("runner warming up")
event_sender.send(
@@ -191,11 +186,11 @@ def main(
logger.info(f"warming up inference for instance: {instance}")
if ModelTask.TextGeneration in shard_metadata.model_card.tasks:
assert inference_model
assert not isinstance(model, DistributedImageModel)
assert tokenizer
toks = warmup_inference(
model=inference_model,
model=model,
tokenizer=tokenizer,
# kv_prefix_cache=kv_prefix_cache, # supply for warmup-time prefix caching
)
@@ -207,8 +202,8 @@ def main(
ModelTask.TextToImage in shard_metadata.model_card.tasks
or ModelTask.ImageToImage in shard_metadata.model_card.tasks
):
assert image_model
image = warmup_image_generator(model=image_model)
assert isinstance(model, DistributedImageModel)
image = warmup_image_generator(model=model)
if image is not None:
logger.info(f"warmed up by generating {image.size} image")
else:
@@ -227,7 +222,7 @@ def main(
runner_id=runner_id, runner_status=current_status
)
)
assert inference_model
assert model and not isinstance(model, DistributedImageModel)
assert tokenizer
assert task_params.messages[0].content is not None
@@ -239,7 +234,7 @@ def main(
# Generate responses using the actual MLX generation
mlx_generator = mlx_generate(
model=inference_model,
model=model,
tokenizer=tokenizer,
task=task_params,
prompt=prompt,
@@ -262,11 +257,11 @@ def main(
patch_glm_tokenizer(tokenizer)
# GPT-OSS specific parsing to match other model formats.
elif isinstance(inference_model, GptOssModel):
elif isinstance(model, GptOssModel):
mlx_generator = parse_gpt_oss(mlx_generator)
if tokenizer.has_tool_calling and not isinstance(
inference_model, GptOssModel
model, GptOssModel
):
assert tokenizer.tool_call_start
assert tokenizer.tool_call_end
@@ -278,19 +273,7 @@ def main(
tokenizer.tool_parser, # pyright: ignore[reportAny]
)
cancel_every = 5
tokens_since_last_cancel_check = 0
for response in mlx_generator:
tokens_since_last_cancel_check += 1
if tokens_since_last_cancel_check >= cancel_every:
tokens_since_last_cancel_check = 0
cancelled_tasks.update(cancel_receiver.collect())
want_to_cancel = (task.task_id in cancelled_tasks) or (
TaskId("CANCEL_CURRENT_TASK") in cancelled_tasks
)
if mx_any(want_to_cancel, group):
break
match response:
case GenerationResponse():
if (
@@ -357,7 +340,7 @@ def main(
case ImageGeneration(
task_params=task_params, command_id=command_id
) if isinstance(current_status, RunnerReady):
assert image_model
assert isinstance(model, DistributedImageModel)
logger.info(f"received image generation request: {str(task)[:500]}")
current_status = RunnerRunning()
logger.info("runner running")
@@ -371,9 +354,7 @@ def main(
# Generate images using the image generation backend
# Track image_index for final images only
image_index = 0
for response in generate_image(
model=image_model, task=task_params
):
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
@@ -420,7 +401,7 @@ def main(
case ImageEdits(task_params=task_params, command_id=command_id) if (
isinstance(current_status, RunnerReady)
):
assert image_model
assert isinstance(model, DistributedImageModel)
logger.info(f"received image edits request: {str(task)[:500]}")
current_status = RunnerRunning()
logger.info("runner running")
@@ -432,9 +413,7 @@ def main(
try:
image_index = 0
for response in generate_image(
model=image_model, task=task_params
):
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
@@ -497,7 +476,7 @@ def main(
RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
)
if isinstance(current_status, RunnerShutdown):
del inference_model, image_model, tokenizer, group
del model, tokenizer, group
mx.clear_cache()
import gc

View File

@@ -49,12 +49,10 @@ class RunnerSupervisor:
_ev_recv: MpReceiver[Event]
_task_sender: MpSender[Task]
_event_sender: Sender[Event]
_cancel_sender: MpSender[TaskId]
_tg: TaskGroup = field(default_factory=create_task_group, init=False)
_tg: TaskGroup | None = field(default=None, init=False)
status: RunnerStatus = field(default_factory=RunnerIdle, init=False)
pending: dict[TaskId, anyio.Event] = field(default_factory=dict, init=False)
completed: set[TaskId] = field(default_factory=set, init=False)
cancelled: set[TaskId] = field(default_factory=set, init=False)
@classmethod
def create(
@@ -65,8 +63,8 @@ class RunnerSupervisor:
initialize_timeout: float = 400,
) -> Self:
ev_send, ev_recv = mp_channel[Event]()
# A task is kind of a runner command
task_sender, task_recv = mp_channel[Task]()
cancel_sender, cancel_recv = mp_channel[TaskId]()
runner_process = Process(
target=entrypoint,
@@ -74,7 +72,6 @@ class RunnerSupervisor:
bound_instance,
ev_send,
task_recv,
cancel_recv,
logger,
),
daemon=True,
@@ -89,7 +86,6 @@ class RunnerSupervisor:
initialize_timeout=initialize_timeout,
_ev_recv=ev_recv,
_task_sender=task_sender,
_cancel_sender=cancel_sender,
_event_sender=event_sender,
)
@@ -97,41 +93,37 @@ class RunnerSupervisor:
async def run(self):
self.runner_process.start()
async with self._tg as tg:
async with create_task_group() as tg:
self._tg = tg
tg.start_soon(self._forward_events)
with anyio.CancelScope(shield=True), contextlib.suppress(ClosedResourceError):
await self._cancel_sender.send_async(TaskId("CANCEL_CURRENT_TASK"))
self._ev_recv.close()
self._task_sender.close()
self._event_sender.close()
await to_thread.run_sync(self.runner_process.join, 30)
if not self.runner_process.is_alive():
return
self._ev_recv.close()
self._task_sender.close()
self._event_sender.close()
self._cancel_sender.close()
# This is overkill but it's not technically bad, just unnecessary.
logger.warning("Runner process didn't shutdown succesfully, terminating")
self.runner_process.terminate()
await to_thread.run_sync(self.runner_process.join, 5)
if not self.runner_process.is_alive():
return
await to_thread.run_sync(self.runner_process.join, 10)
if not self.runner_process.is_alive():
return
logger.critical("Runner process didn't respond to SIGTERM, killing")
self.runner_process.kill()
# This is overkill but it's not technically bad, just unnecessary.
logger.warning("Runner process didn't shutdown succesfully, terminating")
self.runner_process.terminate()
await to_thread.run_sync(self.runner_process.join, 5)
if not self.runner_process.is_alive():
return
await to_thread.run_sync(self.runner_process.join, 5)
if not self.runner_process.is_alive():
return
logger.critical("Runner process didn't respond to SIGTERM, killing")
self.runner_process.kill()
logger.critical(
"Runner process didn't respond to SIGKILL. System resources may have leaked"
)
await to_thread.run_sync(self.runner_process.join, 5)
if not self.runner_process.is_alive():
return
logger.critical(
"Runner process didn't respond to SIGKILL. System resources may have leaked"
)
async def shutdown(self):
await self._cancel_sender.send_async(TaskId("CANCEL_CURRENT_TASK"))
def shutdown(self):
assert self._tg
self._tg.cancel_scope.cancel()
async def start_task(self, task: Task):
@@ -139,7 +131,6 @@ class RunnerSupervisor:
logger.info(
f"Skipping invalid task {task} as it has already been completed"
)
return
logger.info(f"Starting task {task}")
event = anyio.Event()
self.pending[task.task_id] = event
@@ -149,13 +140,7 @@ class RunnerSupervisor:
logger.warning(f"Task {task} dropped, runner closed communication.")
return
await event.wait()
async def cancel_task(self, task_id: TaskId):
if task_id in self.completed:
logger.info(f"Unable to cancel {task_id} as it has been completed")
return
self.cancelled.add(task_id)
await self._cancel_sender.send_async(task_id)
logger.info(f"Finished task {task}")
async def _forward_events(self):
with self._ev_recv as events:
@@ -221,4 +206,4 @@ class RunnerSupervisor:
runner_status=RunnerFailed(error_message=f"Terminated ({cause})"),
)
)
await self.shutdown()
self.shutdown()