mirror of https://github.com/exo-explore/exo.git
synced 2026-01-26 23:10:01 -05:00

Compare commits: 2 commits on improve-di ...

| Author | SHA1 |
|---|---|
| alexcheema | 922e8075d3 |
| alexcheema | 6ee745246d |
@@ -31,35 +31,6 @@ enum NetworkSetupHelper {
    # Remove Thunderbolt Bridge from VirtualNetworkInterfaces in preferences.plist
    /usr/libexec/PlistBuddy -c "Delete :VirtualNetworkInterfaces:Bridge:bridge0" "$PREFS" 2>/dev/null || true

    networksetup -listlocations | grep -q exo || {
        networksetup -createlocation exo
    }

    networksetup -switchtolocation exo
    networksetup -listallhardwareports \\
        | awk -F': ' '/Hardware Port: / {print $2}' \\
        | while IFS=":" read -r name; do
            case "$name" in
                "Ethernet Adapter"*)
                    ;;
                "Thunderbolt Bridge")
                    ;;
                "Thunderbolt "*)
                    networksetup -listallnetworkservices \\
                        | grep -q "EXO $name" \\
                        || networksetup -createnetworkservice "EXO $name" "$name" 2>/dev/null \\
                        || continue
                    networksetup -setdhcp "EXO $name"
                    ;;
                *)
                    networksetup -listallnetworkservices \\
                        | grep -q "$name" \\
                        || networksetup -createnetworkservice "$name" "$name" 2>/dev/null \\
                        || continue
                    ;;
            esac
        done

    networksetup -listnetworkservices | grep -q "Thunderbolt Bridge" && {
        networksetup -setnetworkserviceenabled "Thunderbolt Bridge" off
    } || true

@@ -3,28 +3,12 @@
perSystem =
  { pkgs, lib, ... }:
  let
    # Filter source to ONLY include package.json and package-lock.json
    # This ensures prettier-svelte only rebuilds when lockfiles change
    dashboardLockfileSrc = lib.cleanSourceWith {
      src = inputs.self;
      filter =
        path: type:
        let
          baseName = builtins.baseNameOf path;
          isDashboardDir = baseName == "dashboard" && type == "directory";
          isPackageFile =
            (lib.hasInfix "/dashboard/" path || lib.hasSuffix "/dashboard" (builtins.dirOf path))
            && (baseName == "package.json" || baseName == "package-lock.json");
        in
        isDashboardDir || isPackageFile;
    };

    # Stub source with lockfiles and minimal files for build to succeed
    # This allows prettier-svelte to avoid rebuilding when dashboard source changes
    dashboardStubSrc = pkgs.runCommand "dashboard-stub-src" { } ''
      mkdir -p $out
      cp ${dashboardLockfileSrc}/dashboard/package.json $out/
      cp ${dashboardLockfileSrc}/dashboard/package-lock.json $out/
      cp ${inputs.self}/dashboard/package.json $out/
      cp ${inputs.self}/dashboard/package-lock.json $out/
      # Minimal files so vite build succeeds (produces empty output)
      echo '<!DOCTYPE html><html><head></head><body></body></html>' > $out/index.html
      mkdir -p $out/src

@@ -53,6 +53,7 @@ class Node:
        await router.register_topic(topics.COMMANDS)
        await router.register_topic(topics.ELECTION_MESSAGES)
        await router.register_topic(topics.CONNECTION_MESSAGES)
        await router.register_topic(topics.STATE_CATCHUP)
        await router.register_topic(topics.DOWNLOAD_COMMANDS)

        logger.info(f"Starting node {node_id}")
@@ -82,6 +83,7 @@ class Node:
                command_sender=router.sender(topics.COMMANDS),
                download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
                election_receiver=router.receiver(topics.ELECTION_MESSAGES),
                state_catchup_receiver=router.receiver(topics.STATE_CATCHUP),
            )
        else:
            api = None
@@ -94,6 +96,7 @@ class Node:
            global_event_receiver=router.receiver(topics.GLOBAL_EVENTS),
            local_event_sender=router.sender(topics.LOCAL_EVENTS),
            command_sender=router.sender(topics.COMMANDS),
            state_catchup_receiver=router.receiver(topics.STATE_CATCHUP),
            download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
            event_index_counter=event_index_counter,
        )
@@ -107,6 +110,7 @@ class Node:
                global_event_sender=router.sender(topics.GLOBAL_EVENTS),
                local_event_receiver=router.receiver(topics.LOCAL_EVENTS),
                command_receiver=router.receiver(topics.COMMANDS),
                state_catchup_sender=router.sender(topics.STATE_CATCHUP),
            )

        er_send, er_recv = channel[ElectionResult]()
@@ -189,6 +193,7 @@ class Node:
                global_event_sender=self.router.sender(topics.GLOBAL_EVENTS),
                local_event_receiver=self.router.receiver(topics.LOCAL_EVENTS),
                command_receiver=self.router.receiver(topics.COMMANDS),
                state_catchup_sender=self.router.sender(topics.STATE_CATCHUP),
            )
            self._tg.start_soon(self.master.run)
        elif (
@@ -235,6 +240,9 @@ class Node:
                ),
                local_event_sender=self.router.sender(topics.LOCAL_EVENTS),
                command_sender=self.router.sender(topics.COMMANDS),
                state_catchup_receiver=self.router.receiver(
                    topics.STATE_CATCHUP
                ),
                download_command_sender=self.router.sender(
                    topics.DOWNLOAD_COMMANDS
                ),

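All of the Node changes above follow one pattern: a typed topic is registered once, then handed out as paired sender/receiver endpoints. As a rough illustration only (exo's actual Router, TypedTopic, and channel types are not shown in this diff), a toy version of that pattern might look like:

```python
# Toy fan-out router mirroring the register_topic / sender / receiver calls
# above. Names and semantics are assumptions, not exo's real API.
from collections import defaultdict
from typing import Any

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream


class Router:
    def __init__(self) -> None:
        # topic name -> send endpoints of every subscriber
        self._subs: dict[str, list[MemoryObjectSendStream[Any]]] = defaultdict(list)

    async def register_topic(self, topic: str) -> None:
        self._subs.setdefault(topic, [])  # topic becomes known before wiring

    def receiver(self, topic: str) -> MemoryObjectReceiveStream[Any]:
        send, recv = anyio.create_memory_object_stream(100)
        self._subs[topic].append(send)
        return recv

    def sender(self, topic: str):
        async def send(item: Any) -> None:
            for stream in self._subs[topic]:  # deliver to every subscriber
                await stream.send(item)
        return send
```
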
@@ -166,6 +166,7 @@ class API:
        download_command_sender: Sender[ForwarderDownloadCommand],
        # This lets us pause the API if an election is running
        election_receiver: Receiver[ElectionMessage],
        state_catchup_receiver: Receiver[State],
    ) -> None:
        self.state = State()
        self._event_log: list[Event] = []
@@ -173,6 +174,7 @@ class API:
        self.download_command_sender = download_command_sender
        self.global_event_receiver = global_event_receiver
        self.election_receiver = election_receiver
        self.state_catchup_receiver = state_catchup_receiver
        self.event_buffer: OrderedBuffer[Event] = OrderedBuffer[Event]()
        self.node_id: NodeId = node_id
        self.session_id: SessionId = session_id
@@ -1249,6 +1251,7 @@ class API:
            tg.start_soon(self._apply_state)
            tg.start_soon(self._pause_on_new_election)
            tg.start_soon(self._cleanup_expired_images)
            tg.start_soon(self._state_catchup)
            print_startup_banner(self.port)
            await serve(
                cast(ASGIFramework, self.app),
@@ -1259,6 +1262,37 @@ class API:
        self.command_sender.close()
        self.global_event_receiver.close()

    async def _state_catchup(self):
        with self.state_catchup_receiver as states:
            async for state in states:
                if (
                    self.state.last_event_applied_idx == -1
                    and state.last_event_applied_idx > self.state.last_event_applied_idx
                ):
                    # DEBUG: Log buffer state BEFORE clearing
                    logger.warning(
                        f"STATE_CATCHUP: About to catch up. "
                        f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
                        f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
                        f"catching up to idx: {state.last_event_applied_idx}"
                    )

                    new_idx = state.last_event_applied_idx + 1
                    self.event_buffer.next_idx_to_release = new_idx
                    # Preserve events that arrived early but are still valid (idx >= new_idx)
                    # Remove stale events (idx < new_idx) to prevent memory growth
                    self.event_buffer.store = {
                        k: v for k, v in self.event_buffer.store.items() if k >= new_idx
                    }
                    self.state = state

                    # DEBUG: Log buffer state AFTER clearing
                    logger.warning(
                        f"STATE_CATCHUP: Catchup complete. "
                        f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
                        f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
                    )

    async def _apply_state(self):
        with self.global_event_receiver as events:
            async for f_event in events:

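The new `_state_catchup` task fast-forwards the API's OrderedBuffer: the release cursor jumps past everything the snapshot covers, early-arrived events at or beyond the cursor are kept, and stale entries are dropped. A minimal sketch of the buffer contract implied here; only `store`, `next_idx_to_release`, and `drain_indexed` appear in the diff, so `ingest` and the exact release semantics are assumptions:

```python
# Minimal sketch of the OrderedBuffer behavior implied by the hunk above.
# `ingest` and `fast_forward` are illustrative, not exo's real methods.
from typing import Generic, TypeVar

T = TypeVar("T")


class OrderedBuffer(Generic[T]):
    def __init__(self) -> None:
        self.store: dict[int, T] = {}   # out-of-order events parked by index
        self.next_idx_to_release = 0    # next index we may hand out

    def ingest(self, idx: int, item: T) -> None:
        if idx >= self.next_idx_to_release:  # ignore stale indices
            self.store[idx] = item

    def drain_indexed(self) -> list[tuple[int, T]]:
        # Release the longest contiguous run starting at the cursor.
        out: list[tuple[int, T]] = []
        while self.next_idx_to_release in self.store:
            idx = self.next_idx_to_release
            out.append((idx, self.store.pop(idx)))
            self.next_idx_to_release += 1
        return out

    def fast_forward(self, last_applied_idx: int) -> None:
        # What _state_catchup does inline: jump the cursor past the snapshot
        # and keep only events that are still in the future.
        new_idx = last_applied_idx + 1
        self.next_idx_to_release = new_idx
        self.store = {k: v for k, v in self.store.items() if k >= new_idx}
```
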
@@ -68,6 +68,8 @@ class Master:
        # Send events to the forwarder to be indexed (usually from command processing)
        # Ideally these would be MasterForwarderEvents but type system says no :(
        global_event_sender: Sender[ForwarderEvent],
        # not a fan but - send the entire state to a node so it can catchup without the whole event log.
        state_catchup_sender: Sender[State],
    ):
        self.state = State()
        self._tg: TaskGroup = anyio.create_task_group()
@@ -77,6 +79,7 @@ class Master:
        self.command_receiver = command_receiver
        self.local_event_receiver = local_event_receiver
        self.global_event_sender = global_event_sender
        self.state_catchup_sender = state_catchup_sender
        send, recv = channel[Event]()
        self.event_sender: Sender[Event] = send
        self._loopback_event_receiver: Receiver[Event] = recv
@@ -84,7 +87,6 @@ class Master:
            local_event_receiver.clone_sender()
        )
        self._multi_buffer = MultiSourceBuffer[NodeId, Event]()
        # TODO: not have this
        self._event_log: list[Event] = []

    async def run(self):
@@ -291,11 +293,17 @@ class Master:
                        command.finished_command_id
                    ]
                case RequestEventLog():
                    # We should just be able to send everything, since other buffers will ignore old messages
                    for i in range(command.since_idx, len(self._event_log)):
                        await self._send_event(
                            IndexedEvent(idx=i, event=self._event_log[i])
                    if command.since_idx == 0:
                        # This is an optimization, and should not be relied upon in theory.
                        logger.info(
                            f"Master sending catchup state for index {self.state.last_event_applied_idx}"
                        )
                        await self.state_catchup_sender.send(self.state)
                    else:
                        for i in range(command.since_idx, len(self._event_log)):
                            await self._send_event(
                                IndexedEvent(idx=i, event=self._event_log[i])
                            )
            for event in generated_events:
                await self.event_sender.send(event)
        except ValueError as e:

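The branch added here is a snapshot-versus-replay decision: a requester starting from index 0 has no state, so one State snapshot replaces replaying the entire log. A self-contained sketch of that decision, with simplified stand-ins for exo's types:

```python
# Distills the branch added above: a fresh follower (since_idx == 0) gets one
# state snapshot; anyone else gets the missing suffix of the event log replayed.
# IndexedEvent here is a simplified stand-in for exo's type.
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class IndexedEvent:
    idx: int
    event: Any


async def answer_event_log_request(
    since_idx: int,
    event_log: list[Any],
    state: Any,
    send_event: Callable[[IndexedEvent], Awaitable[None]],
    send_snapshot: Callable[[Any], Awaitable[None]],
) -> None:
    if since_idx == 0:
        # Optimization from the diff: skip replay entirely for brand-new nodes.
        await send_snapshot(state)
    else:
        # Node is only missing a suffix of the log: replay just that.
        for i in range(since_idx, len(event_log)):
            await send_event(IndexedEvent(idx=i, event=event_log[i]))
```
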
@@ -27,6 +27,7 @@ from exo.shared.types.memory import Memory
from exo.shared.types.profiling import (
    MemoryUsage,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import ChatCompletion as ChatCompletionTask
from exo.shared.types.tasks import TaskStatus
from exo.shared.types.worker.instances import (
@@ -47,6 +48,7 @@ async def test_master():
    ge_sender, global_event_receiver = channel[ForwarderEvent]()
    command_sender, co_receiver = channel[ForwarderCommand]()
    local_event_sender, le_receiver = channel[ForwarderEvent]()
    st_s, _st_r = channel[State]()

    all_events: list[IndexedEvent] = []

@@ -67,6 +69,7 @@ async def test_master():
        global_event_sender=ge_sender,
        local_event_receiver=le_receiver,
        command_receiver=co_receiver,
        state_catchup_sender=st_s,
    )
    logger.info("run the master")
    async with anyio.create_task_group() as tg:

@@ -7,6 +7,7 @@ from exo.shared.types.commands import ForwarderCommand, ForwarderDownloadCommand
from exo.shared.types.events import (
    ForwarderEvent,
)
from exo.shared.types.state import State
from exo.utils.pydantic_ext import CamelCaseModel


@@ -45,6 +46,7 @@ ELECTION_MESSAGES = TypedTopic(
CONNECTION_MESSAGES = TypedTopic(
    "connection_messages", PublishPolicy.Never, ConnectionMessage
)
STATE_CATCHUP = TypedTopic("state_catchup", PublishPolicy.Always, State)
DOWNLOAD_COMMANDS = TypedTopic(
    "download_commands", PublishPolicy.Always, ForwarderDownloadCommand
)

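From the constructor calls above, a TypedTopic appears to pair a wire name, a publish policy, and a payload type. A hypothetical reconstruction (the real TypedTopic and PublishPolicy definitions are not part of this diff):

```python
# Hypothetical reconstruction of the TypedTopic shape used above;
# exo's real definitions are not shown in this diff.
from dataclasses import dataclass
from enum import Enum, auto
from typing import Generic, TypeVar

T = TypeVar("T")


class PublishPolicy(Enum):
    Always = auto()  # assumed: always forwarded across the network
    Never = auto()   # assumed: stays local to the node


@dataclass(frozen=True)
class TypedTopic(Generic[T]):
    name: str                      # wire-level topic name
    publish_policy: PublishPolicy
    payload_type: type[T]          # model carried on this topic


class State:  # stand-in for exo.shared.types.state.State
    pass


STATE_CATCHUP = TypedTopic("state_catchup", PublishPolicy.Always, State)
```
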
@@ -349,8 +349,13 @@ class InfoGatherer:
    async def _monitor_misc(self):
        if self.misc_poll_interval is None:
            return
        prev = await MiscData.gather()
        await self.info_sender.send(prev)
        while True:
            await self.info_sender.send(await MiscData.gather())
            curr = await MiscData.gather()
            if prev != curr:
                prev = curr
                await self.info_sender.send(curr)
            await anyio.sleep(self.misc_poll_interval)

    async def _monitor_system_profiler_thunderbolt_data(self):
@@ -360,12 +365,15 @@ class InfoGatherer:
        if iface_map is None:
            return

        old_idents = []
        while True:
            data = await ThunderboltConnectivity.gather()
            assert data is not None

            idents = [it for i in data if (it := i.ident(iface_map)) is not None]
            await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
            if idents != old_idents:
                await self.info_sender.send(MacThunderboltIdentifiers(idents=idents))
                old_idents = idents

            conns = [it for i in data if (it := i.conn()) is not None]
            await self.info_sender.send(MacThunderboltConnections(conns=conns))
@@ -390,17 +398,22 @@ class InfoGatherer:
    async def _watch_system_info(self):
        if self.interface_watcher_interval is None:
            return
        old_nics = []
        while True:
            nics = await get_network_interfaces()
            await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
            if nics != old_nics:
                old_nics = nics
                await self.info_sender.send(NodeNetworkInterfaces(ifaces=nics))
            await anyio.sleep(self.interface_watcher_interval)

    async def _monitor_thunderbolt_bridge_status(self):
        if self.thunderbolt_bridge_poll_interval is None:
            return
        prev: ThunderboltBridgeInfo | None = None
        while True:
            curr = await ThunderboltBridgeInfo.gather()
            if curr is not None:
            if curr is not None and prev != curr:
                prev = curr
                await self.info_sender.send(curr)
            await anyio.sleep(self.thunderbolt_bridge_poll_interval)

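All three monitors in this commit switch from publish-every-poll to publish-on-change. The shared pattern, distilled into a hedged, self-contained sketch (here `gather` and `send` stand in for the real MiscData.gather / info_sender calls):

```python
# Generic send-on-change poll loop distilled from the hunks above.
# `gather` and `send` are placeholders for the real gather/send calls.
from typing import Awaitable, Callable, TypeVar

import anyio

T = TypeVar("T")


async def poll_on_change(
    gather: Callable[[], Awaitable[T]],
    send: Callable[[T], Awaitable[None]],
    interval: float,
) -> None:
    prev = await gather()
    await send(prev)          # always publish the initial reading
    while True:
        curr = await gather()
        if curr != prev:      # only publish when something actually changed
            prev = curr
            await send(curr)
        await anyio.sleep(interval)
```
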
@@ -60,9 +60,8 @@ class Worker:
        connection_message_receiver: Receiver[ConnectionMessage],
        global_event_receiver: Receiver[ForwarderEvent],
        local_event_sender: Sender[ForwarderEvent],
        # This is for requesting updates. It doesn't need to be a general command sender right now,
        # but I think it's the correct way to be thinking about commands
        command_sender: Sender[ForwarderCommand],
        state_catchup_receiver: Receiver[State],
        download_command_sender: Sender[ForwarderDownloadCommand],
        event_index_counter: Iterator[int],
    ):
@@ -71,6 +70,8 @@ class Worker:

        self.global_event_receiver = global_event_receiver
        self.local_event_sender = local_event_sender
        self.state_catchup_receiver = state_catchup_receiver
        self.local_event_index = 0
        self.event_index_counter = event_index_counter
        self.command_sender = command_sender
        self.download_command_sender = download_command_sender
@@ -110,6 +111,7 @@ class Worker:
            tg.start_soon(self._event_applier)
            tg.start_soon(self._forward_events)
            tg.start_soon(self._poll_connection_updates)
            tg.start_soon(self._check_catchup_state)

        # Actual shutdown code - waits for all tasks to complete before executing.
        self.local_event_sender.close()
@@ -121,13 +123,47 @@ class Worker:
    async def _forward_info(self, recv: Receiver[GatheredInfo]):
        with recv as info_stream:
            async for info in info_stream:
                await self.event_sender.send(
                    NodeGatheredInfo(
                        node_id=self.node_id,
                        when=str(datetime.now(tz=timezone.utc)),
                        info=info,
                    )
                event = NodeGatheredInfo(
                    node_id=self.node_id,
                    when=str(datetime.now(tz=timezone.utc)),
                    info=info,
                )
                logger.warning(
                    f"NODE_GATHERED_INFO: Sending event for node {self.node_id}, "
                    f"event_id={event.event_id}"
                )
                await self.event_sender.send(event)

    async def _check_catchup_state(self):
        with self.state_catchup_receiver as states:
            async for state in states:
                if (
                    self.state.last_event_applied_idx == -1
                    and state.last_event_applied_idx > self.state.last_event_applied_idx
                ):
                    # DEBUG: Log buffer state BEFORE clearing
                    logger.warning(
                        f"STATE_CATCHUP: About to catch up. "
                        f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
                        f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
                        f"catching up to idx: {state.last_event_applied_idx}"
                    )

                    new_idx = state.last_event_applied_idx + 1
                    self.event_buffer.next_idx_to_release = new_idx
                    # Preserve events that arrived early but are still valid (idx >= new_idx)
                    # Remove stale events (idx < new_idx) to prevent memory growth
                    self.event_buffer.store = {
                        k: v for k, v in self.event_buffer.store.items() if k >= new_idx
                    }
                    self.state = state

                    # DEBUG: Log buffer state AFTER clearing
                    logger.warning(
                        f"STATE_CATCHUP: Catchup complete. "
                        f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
                        f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
                    )

    async def _event_applier(self):
        with self.global_event_receiver as events:
@@ -139,8 +175,20 @@ class Worker:
                if event_id in self.out_for_delivery:
                    del self.out_for_delivery[event_id]

                # DEBUG: Log what was ingested
                logger.warning(
                    f"EVENT_APPLIER: Ingested event idx={f_event.origin_idx}, "
                    f"buffer keys now: {sorted(self.event_buffer.store.keys())}"
                )

                # 2. for each event, apply it to the state
                indexed_events = self.event_buffer.drain_indexed()

                # DEBUG: Log drain results
                logger.warning(
                    f"EVENT_APPLIER: Drained {len(indexed_events)} events, "
                    f"next_idx_to_release now: {self.event_buffer.next_idx_to_release}"
                )
                if indexed_events:
                    self._nack_attempts = 0

@@ -157,6 +205,12 @@ class Worker:
                    self._nack_cancel_scope.cancel()

                for idx, event in indexed_events:
                    # DEBUG: Log NodeGatheredInfo events
                    if isinstance(event, NodeGatheredInfo):
                        logger.warning(
                            f"NODE_GATHERED_INFO: Applying event idx={idx} for node {event.node_id}, "
                            f"event_id={event.event_id}"
                        )
                    self.state = apply(self.state, IndexedEvent(idx=idx, event=event))

                    # Buffer input image chunks for image editing
@@ -318,10 +372,7 @@ class Worker:
        # We request all events after (and including) the missing index.
        # This function is started whenever we receive an event that is out of sequence.
        # It is cancelled as soon as we receive an event that is in sequence.

        if since_idx < 0:
            logger.warning(f"Negative value encountered for nack request {since_idx=}")
            since_idx = 0
        assert since_idx >= 0

        with CancelScope() as scope:
            self._nack_cancel_scope = scope

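The comments describe the NACK loop: it starts when an out-of-sequence event arrives and is cancelled by the first in-sequence one. A sketch of that shape; RequestEventLog and CancelScope appear in the diff, while the retry cadence and the stand-in command class are assumptions:

```python
# Sketch of the NACK mechanism the comments above describe; not exo's
# actual Worker method.
from dataclasses import dataclass

import anyio
from anyio import CancelScope


@dataclass
class RequestEventLog:  # stand-in for the command Master matches on above
    since_idx: int


async def nack_missing_events(worker, since_idx: int) -> None:
    """Keep requesting replay from `since_idx` until an in-order event cancels us."""
    assert since_idx >= 0
    with CancelScope() as scope:
        worker._nack_cancel_scope = scope  # _event_applier cancels this on success
        while True:
            await worker.command_sender.send(RequestEventLog(since_idx=since_idx))
            await anyio.sleep(1.0)  # assumed retry cadence
```
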
@@ -7,7 +7,6 @@ from exo.shared.types.tasks import Task
from exo.shared.types.worker.instances import BoundInstance, MlxJacclInstance
from exo.shared.types.worker.runners import RunnerFailed
from exo.utils.channels import ClosedResourceError, MpReceiver, MpSender
from exo.worker.tests.patches import load_null_model

logger: "loguru.Logger" = loguru.logger

@@ -17,8 +16,6 @@ def entrypoint(
    event_sender: MpSender[Event],
    task_receiver: MpReceiver[Task],
    _logger: "loguru.Logger",
    *,
    _load_null_models: bool = False,
) -> None:
    fast_synch_override = os.environ.get("EXO_FAST_SYNCH")
    if fast_synch_override == "on" or (
@@ -32,13 +29,6 @@ def entrypoint(
    else:
        os.environ["MLX_METAL_FAST_SYNCH"] = "0"

    p = None
    if _load_null_models:
        from unittest.mock import patch

        p = patch("mlx_lm.utils.load_model", new=load_null_model)
        p.start()

    global logger
    logger = _logger

@@ -62,8 +52,6 @@ def entrypoint(
            )
        )
    finally:
        if p is not None:
            p.stop()
        try:
            event_sender.close()
            task_receiver.close()

@@ -1,50 +0,0 @@
# type: ignore

import importlib
import json
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from exo.worker.engines.mlx import Model


def load_null_model(path: Path, **_: object) -> "tuple[Model, dict[str, Any]]":
    with open(path / "config.json", "r") as f:
        cfg = json.load(f)
    model, args = _get_classes(cfg)
    model = model(args.from_dict(cfg))
    return model, cfg


def _get_classes(config: dict):
    """
    Retrieve the model and model args classes based on the configuration.

    Args:
        config (dict): The model configuration.

    Returns:
        A tuple containing the Model class and the ModelArgs class.
    """
    model_type = config["model_type"]
    model_type = MODEL_REMAPPING.get(model_type, model_type)
    try:
        arch = importlib.import_module(f"mlx_lm.models.{model_type}")
    except ImportError:
        msg = f"Model type {model_type} not supported."
        raise ValueError(msg) from None

    return arch.Model, arch.ModelArgs


MODEL_REMAPPING = {
    "mistral": "llama",
    "llava": "mistral3",
    "phi-msft": "phixtral",
    "falcon_mamba": "mamba",
    "kimi_k2": "deepseek_v3",
    "qwen2_5_vl": "qwen2_vl",
    "minimax_m2": "minimax",
    "iquestcoder": "llama",
}

@@ -1,6 +1,7 @@
import multiprocessing as mp
import socket
import time
import typing

import anyio
from fastapi import FastAPI
@@ -10,12 +11,16 @@ from hypercorn.asyncio import serve  # pyright: ignore[reportUnknownVariableType
from loguru import logger
from pydantic import BaseModel

from exo.download.impl_shard_downloader import (
    build_full_shard,
    exo_shard_downloader,
)
from exo.shared.logging import InterceptLogger, logger_setup
from exo.shared.models.model_cards import MODEL_CARDS, ModelId
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
from exo.shared.types.commands import CommandId
from exo.shared.types.common import Host, NodeId
from exo.shared.types.events import Event, RunnerStatusUpdated
from exo.shared.types.events import Event
from exo.shared.types.tasks import (
    ChatCompletion,
    ConnectToGroup,
@@ -31,17 +36,18 @@ from exo.shared.types.worker.instances import (
    MlxJacclInstance,
    MlxRingInstance,
)
from exo.shared.types.worker.runners import RunnerFailed, RunnerId, ShardAssignments
from exo.shared.types.worker.runners import RunnerId, ShardAssignments
from exo.shared.types.worker.shards import PipelineShardMetadata, TensorShardMetadata
from exo.utils.channels import MpReceiver, MpSender, channel, mp_channel
from exo.utils.info_gatherer.info_gatherer import GatheredInfo, InfoGatherer
from exo.worker.runner.bootstrap import entrypoint

MODEL_CARDS = {"haha": MODEL_CARDS["qwen3-coder-480b-a35b-8bit"]}

class Tests(BaseModel):
    # list[hostname, ip addr]
    devs: list[list[str]]
    model_id: str
    kind: typing.Literal["init", "warmup", "inference"]


mp.set_start_method("spawn", force=True)
@@ -50,14 +56,16 @@ logger_setup(None)

async def main():
    logger.info("starting cool server majig")
    await assert_downloads()
    cfg = Config()
    cfg.bind = "0.0.0.0:8000"
    cfg.bind = "0.0.0.0:52415"
    # nb: shared.logging needs updating if any of this changes
    cfg.accesslog = "-"
    cfg.errorlog = "-"
    cfg.logger_class = InterceptLogger
    app = FastAPI()
    app.post("/run_test")(run_test)
    app.post("/ring")(ring_backend)
    app.post("/jaccl")(jaccl_backend)
    app.post("/tb_detection")(tb_detection)
    shutdown = anyio.Event()
    await serve(
@@ -79,7 +87,28 @@ async def tb_detection():
    return recv.collect()


async def run_test(test: Tests):
async def assert_downloads():
    sd = exo_shard_downloader()
    # await sd.ensure_shard(await build_full_shard(MODEL_CARDS["qwen3-0.6b"].model_id))
    await sd.ensure_shard(
        await build_full_shard(MODEL_CARDS["llama-3.1-8b-bf16"].model_id)
    )
    await sd.ensure_shard(await build_full_shard(MODEL_CARDS["qwen3-30b"].model_id))
    await sd.ensure_shard(
        await build_full_shard(MODEL_CARDS["gpt-oss-120b-MXFP4-Q8"].model_id)
    )
    await sd.ensure_shard(
        await build_full_shard(MODEL_CARDS["gpt-oss-20b-4bit"].model_id)
    )
    await sd.ensure_shard(
        await build_full_shard(MODEL_CARDS["glm-4.7-8bit-gs32"].model_id)
    )
    await sd.ensure_shard(
        await build_full_shard(MODEL_CARDS["minimax-m2.1-8bit"].model_id)
    )


async def ring_backend(test: Tests):
    iid = InstanceId(str(hash(str(test.devs))))
    weird_hn = socket.gethostname()
    for dev in test.devs:
@@ -88,30 +117,10 @@ async def run_test(test: Tests):
            break
    else:
        raise ValueError(f"{weird_hn} not in {test.devs}")

    async def run():
        for card in MODEL_CARDS.values():
            for instance in (
                ring_instance(test, card.model_id, iid, hn),
                jaccl_instance(test, card.model_id, iid),
            ):
                recv = await execute_test(test, instance, hn)

                with recv:
                    try:
                        async for item in recv:
                            yield item.model_dump_json() + "\n"
                            if isinstance(item, RunnerStatusUpdated) and isinstance(
                                item.runner_status, RunnerFailed
                            ):
                                return
                    except anyio.ClosedResourceError:
                        pass

    return StreamingResponse(run())
    return await execute_test(test, ring_instance(test, iid, hn), hn)


def ring_instance(test: Tests, model_id: ModelId, iid: InstanceId, hn: str) -> Instance:
def ring_instance(test: Tests, iid: InstanceId, hn: str) -> Instance:
    hbn = [Host(ip="i dont care", port=52416) for _ in test.devs]
    world_size = len(test.devs)
    for i in range(world_size):
@@ -126,13 +135,13 @@ def ring_instance(test: Tests, model_id: ModelId, iid: InstanceId, hn: str) -> I
    else:
        raise ValueError(f"{hn} not in {test.devs}")

    card = next(card for card in MODEL_CARDS.values() if card.model_id == model_id)
    card = MODEL_CARDS[test.model_id]
    instance = MlxRingInstance(
        instance_id=iid,
        ephemeral_port=52416,
        hosts_by_node={NodeId(hn): hbn},
        shard_assignments=ShardAssignments(
            model_id=model_id,
            model_id=ModelId(test.model_id),
            node_to_runner={NodeId(host[0]): RunnerId(host[0]) for host in test.devs},
            runner_to_shard={
                RunnerId(test.devs[i][0]): PipelineShardMetadata(
@@ -154,7 +163,7 @@ def ring_instance(test: Tests, model_id: ModelId, iid: InstanceId, hn: str) -> I
    return instance


async def execute_test(test: Tests, instance: Instance, hn: str) -> MpReceiver[Event]:
async def execute_test(test: Tests, instance: Instance, hn: str):
    world_size = len(test.devs)
    iid = InstanceId(str(hash(str(test.devs))))
    _handle, recv, send = new_runner(instance, hn)
@@ -162,33 +171,60 @@ async def execute_test(test: Tests, instance: Instance, hn: str) -> MpReceiver[E
    send.send(ConnectToGroup(instance_id=iid))
    send.send(LoadModel(instance_id=iid))

    for card in MODEL_CARDS.values():
        send.send(StartWarmup(instance_id=iid))
        send.send(
            ChatCompletion(
                task_params=ChatCompletionTaskParams(
                    model=card.model_id,
                    messages=[
                        ChatCompletionMessage(
                            role="system", content="You are a helpful assistant"
                        ),
                        ChatCompletionMessage(
                            role="user", content="What is the capital of France?"
                        ),
                    ],
                ),
                command_id=CommandId("yo"),
                instance_id=iid,
    match test.kind:
        case "init":
            pass
        case "warmup":
            send.send(StartWarmup(instance_id=iid))
        case "inference":
            send.send(StartWarmup(instance_id=iid))
            send.send(
                ChatCompletion(
                    task_params=ChatCompletionTaskParams(
                        model=test.model_id,
                        messages=[
                            ChatCompletionMessage(
                                role="system", content="You are a helpful assistant"
                            ),
                            ChatCompletionMessage(
                                role="user", content="What is the capital of France?"
                            ),
                        ],
                    ),
                    command_id=CommandId("yo"),
                    instance_id=iid,
                )
            )
        )

    send.send(Shutdown(runner_id=RunnerId(hn), instance_id=iid))

    return recv
    async def map_recv():
        with recv:
            try:
                async for item in recv:
                    yield item.model_dump_json() + "\n"
            except anyio.ClosedResourceError:
                pass

    ret = StreamingResponse(map_recv())
    ret._pls_dont_gc = _handle  # type: ignore
    return ret


def jaccl_instance(test: Tests, model_id: ModelId, iid: InstanceId):
    card = next(card for card in MODEL_CARDS.values() if card.model_id == model_id)
async def jaccl_backend(test: Tests):
    iid = InstanceId(str(hash(str(test.devs))))
    weird_hn = socket.gethostname()
    for dev in test.devs:
        if weird_hn.startswith(dev[0]) or dev[0].startswith(weird_hn):
            hn = dev[0]
            break
    else:
        raise ValueError(f"{weird_hn} not in {test.devs}")
    return await execute_test(test, jaccl_instance(test, iid), hn)


def jaccl_instance(test: Tests, iid: InstanceId):
    card = MODEL_CARDS[test.model_id]
    world_size = len(test.devs)

    return MlxJacclInstance(
@@ -199,7 +235,7 @@ def jaccl_instance(test: Tests, model_id: ModelId, iid: InstanceId):
            NodeId(host[0]): test.devs[0][1] + ":52416" for host in test.devs
        },
        shard_assignments=ShardAssignments(
            model_id=model_id,
            model_id=ModelId(test.model_id),
            node_to_runner={NodeId(host[0]): RunnerId(host[0]) for host in test.devs},
            runner_to_shard={
                RunnerId(test.devs[i][0]): TensorShardMetadata(
@@ -234,7 +270,6 @@ def new_runner(
            task_recv,
            logger,
        ),
        kwargs={"_load_null_models": True},
    )
    runner_process._pls_dont_gc = (ev_send, task_recv)  # type: ignore
    runner_process.start()

@@ -6,8 +6,19 @@ query() {
    tailscale status | awk -v find="$1" '$2 == find { print $1 }'
}

if [[ $# -lt 1 ]]; then
    echo "USAGE: $0 [host1] [host2] ..."
if [[ $# -lt 2 ]]; then
    echo "USAGE: $0 <test kind> [host1] [host2] ..."
    exit 1
fi


kind=$1
shift

test_kinds="ring jaccl"

if ! echo "$test_kinds" | grep -q "$kind"; then
    printf "%s is not a known test kind.\nCurrent test kinds are %s" "$kind" "$test_kinds"
    exit 1
fi

@@ -23,12 +34,23 @@ done
devs_raw=$(printf "[\"%s\", \"%s\"], " "${weaved[@]}")
devs="[${devs_raw%, }]"

for i in "${!ips[@]}"; do
    {
        curl -sN \
            -X POST "http://${ips[$i]}:8000/run_test" \
            -H "Content-Type: application/json" -d "{\"devs\": ${devs}}" \
            2>&1 | sed "s/^/\n${hostnames[$i]}@${ips[$i]}: /" || echo "curl to ${hostnames[$i]} failed" && exit 1
    } &
model_ids=("qwen3-30b" "gpt-oss-120b-MXFP4-Q8" "kimi-k2-thinking")

for model_id in "${model_ids[@]}"; do
    for i in "${!ips[@]}"; do
        {
            req="{
                \"model_id\": \"${model_id}\",
                \"devs\": ${devs},
                \"kind\": \"inference\"
            }"
            echo "req $req"
            curl -sN \
                -X POST "http://${ips[$i]}:52415/${kind}" \
                -H "Content-Type: application/json" -d "$req" \
                2>&1 | sed "s/^/\n${hostnames[$i]}@${ips[$i]}: /" || echo "curl to ${hostnames[$i]} failed" && exit 1
        } &
    done
    wait
done
wait