mirror of https://github.com/exo-explore/exo.git
synced 2026-01-16 01:51:03 -05:00

Compare commits: sami/flash ... debug/gpt- (16 commits)
Commits (newest first): 771a94d944, 0c266151ca, 556f5a0f6d, 1d0b121457, f036add84f, d63c8c86a8, 80608eaf64, fc32199653, 028e29a6d8, 3941855ad6, 1933b224c9, 737d97a2d4, 3e623ccf0d, c22dad8a7d, 4bc4d50685, e0aab46fd8
AGENTS.md (45 changes)
@@ -91,6 +91,51 @@ From .cursorrules:
- Catch exceptions only where you can handle them meaningfully
- Use `@final` and immutability wherever applicable

## API Reference

The API is served at `http://localhost:52415` by default. Key files:
- `docs/api.md`: Full API documentation
- `src/exo/master/api.py`: FastAPI implementation
- `src/exo/shared/types/api.py`: Request/response Pydantic models

### Key Endpoints

```
GET    /node_id                 # Current master node ID
GET    /state                   # Full cluster state (topology, instances, downloads, etc.)
GET    /events                  # Event log for debugging

POST   /instance                # Create model instance
GET    /instance/{id}           # Get instance details
DELETE /instance/{id}           # Delete instance
GET    /instance/previews       # Preview placements for a model
GET    /instance/placement      # Compute placement without creating

GET    /models                  # List available models
GET    /v1/models               # OpenAI-compatible model list

POST   /v1/chat/completions     # OpenAI-compatible chat completions (streaming/non-streaming)
POST   /bench/chat/completions  # Chat completions with performance stats
```

### Useful curl Commands

```bash
# Check cluster state
curl -s http://localhost:52415/state | python3 -m json.tool

# List models
curl -s http://localhost:52415/models | python3 -m json.tool

# Preview placements for a model
curl -s "http://localhost:52415/instance/previews?model_id=llama-3.2-1b" | python3 -m json.tool

# Chat completion
curl -X POST http://localhost:52415/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "messages": [{"role": "user", "content": "Hello"}]}'
```
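The same calls can be scripted from Python. Below is a minimal sketch using only the standard library; the endpoint, port, and model id are taken from the curl examples above, and the response shape assumes the documented OpenAI compatibility:

```python
import json
from urllib.request import Request, urlopen

# Non-streaming chat completion against a local exo master (default port 52415).
payload = {
    "model": "llama-3.2-1b",
    "messages": [{"role": "user", "content": "Hello"}],
}
req = Request(
    "http://localhost:52415/v1/chat/completions",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urlopen(req, timeout=60) as resp:
    body = json.loads(resp.read().decode("utf-8"))

# OpenAI-compatible responses carry the text under choices[0].message.content.
print(body["choices"][0]["message"]["content"])
```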
## Testing

Tests use pytest-asyncio with `asyncio_mode = "auto"`. Tests are in `tests/` subdirectories alongside the code they test. The `EXO_TESTS=1` env var is set during tests.
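A minimal sketch of what a test looks like under this configuration; the `/node_id` check and the `httpx` client are illustrative assumptions, not existing tests:

```python
import os

import httpx  # assumed available; any async HTTP client works


# With asyncio_mode = "auto", plain async functions are collected as tests
# without needing a @pytest.mark.asyncio marker on each one.
async def test_node_id_endpoint():
    assert os.environ.get("EXO_TESTS") == "1"  # set by the test harness
    async with httpx.AsyncClient(base_url="http://localhost:52415") as client:
        resp = await client.get("/node_id")
        assert resp.status_code == 200
```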
Cargo.lock (generated, 19 changes)
@@ -4340,25 +4340,6 @@ dependencies = [
 "libc",
]

[[package]]
name = "system_custodian"
version = "0.0.1"
dependencies = [
 "delegate",
 "derive_more",
 "either",
 "extend",
 "futures",
 "futures-timer",
 "impl-trait-for-tuples",
 "keccak-const",
 "log",
 "thiserror 2.0.17",
 "tokio",
 "tracing-subscriber",
 "util",
]

[[package]]
name = "tagptr"
version = "0.2.0"
@@ -3,7 +3,6 @@ resolver = "3"
members = [
    "rust/networking",
    "rust/exo_pyo3_bindings",
    "rust/system_custodian",
    "rust/util",
]

@@ -25,7 +24,6 @@ opt-level = 3
[workspace.dependencies]
## Crate members as common dependencies
networking = { path = "rust/networking" }
system_custodian = { path = "rust/system_custodian" }
util = { path = "rust/util" }

# Proc-macro authoring tools
dashboard/package-lock.json (generated, 9 changes)
@@ -863,6 +863,7 @@
"integrity": "sha512-oH8tXw7EZnie8FdOWYrF7Yn4IKrqTFHhXvl8YxXxbKwTMcD/5NNCryUSEXRk2ZR4ojnub0P8rNrsVGHXWqIDtA==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@standard-schema/spec": "^1.0.0",
"@sveltejs/acorn-typescript": "^1.0.5",
@@ -902,6 +903,7 @@
"integrity": "sha512-Y1Cs7hhTc+a5E9Va/xwKlAJoariQyHY+5zBgCZg4PFWNYQ1nMN9sjK1zhw1gK69DuqVP++sht/1GZg1aRwmAXQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@sveltejs/vite-plugin-svelte-inspector": "^4.0.1",
"debug": "^4.4.1",
@@ -1518,6 +1520,7 @@
"integrity": "sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"undici-types": "~6.21.0"
}
@@ -1527,6 +1530,7 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz",
"integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
"license": "MIT",
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -1939,6 +1943,7 @@
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"dev": true,
"license": "ISC",
"peer": true,
"engines": {
"node": ">=12"
}
@@ -2646,6 +2651,7 @@
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@@ -2833,6 +2839,7 @@
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.45.3.tgz",
"integrity": "sha512-ngKXNhNvwPzF43QqEhDOue7TQTrG09em1sd4HBxVF0Wr2gopAmdEWan+rgbdgK4fhBtSOTJO8bYU4chUG7VXZQ==",
"license": "MIT",
"peer": true,
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",
@@ -2977,6 +2984,7 @@
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"dev": true,
"license": "Apache-2.0",
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -2998,6 +3006,7 @@
"integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.4.4",
@@ -29,7 +29,6 @@ dependencies = [
exo-master = "exo.master.main:main"
exo-worker = "exo.worker.main:main"
exo = "exo.main:main"
exo-rsh = "exo.rsh.client:main"

# dependencies only required for development
[dependency-groups]
@@ -81,20 +81,6 @@

config = {
  packages = {
    # The system_custodian binary
    system_custodian = craneLib.buildPackage (
      commonArgs
      // {
        inherit cargoArtifacts;
        cargoExtraArgs = "-p system_custodian";

        meta = {
          description = "System custodian daemon for exo";
          mainProgram = "system_custodian";
        };
      }
    );

    # Python bindings wheel via maturin
    exo_pyo3_bindings = craneLib.buildPackage (
      commonArgs
@@ -1,47 +0,0 @@
[package]
name = "system_custodian"
version = { workspace = true }
edition = { workspace = true }
publish = false

[lib]
doctest = false
name = "system_custodian"
path = "src/lib.rs"

[[bin]]
path = "src/bin/main.rs"
name = "system_custodian"
doc = false

[lints]
workspace = true

[dependencies]
# datastructures
either = { workspace = true }

# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
impl-trait-for-tuples = { workspace = true }
derive_more = { workspace = true }

# async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
futures-timer = { workspace = true }

# utility dependencies
util = { workspace = true }
thiserror = { workspace = true }
#internment = { workspace = true }
#recursion = { workspace = true }
#generativity = { workspace = true }
#itertools = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
keccak-const = { workspace = true }

# tracing/logging
log = { workspace = true }
@@ -1,4 +0,0 @@
//! TODO: documentation
//!

fn main() {}
@@ -1,69 +0,0 @@
//! This crate defines the logic of, and ways to interact with, Exo's **_System Custodian_** daemon.
//!
//! The **_System Custodian_** daemon is supposed to be a long-living process that precedes the
//! launch of the Exo application, and is responsible for ensuring the system (configuration, settings,
//! etc.) is in an appropriate state to facilitate the running of the Exo application.
//! The **_System Custodian_** daemon shall expose a [D-Bus](https://www.freedesktop.org/wiki/Software/dbus/)
//! service which the Exo application uses to _control & query_ it.
//!
//! # Lifecycle
//! When the Exo application starts, it will _wake_ the **_System Custodian_** daemon for the
//! duration of its lifetime, and after it has terminated the daemon will go back to sleep. When
//! the daemon wakes up, it will configure the system into a state suitable for the Exo application;
//! when the daemon goes to sleep, it will revert those changes as much as it can in case they were
//! destructive to the user's pre-existing configurations.
//!
//! # Responsibilities
//! TODO: these are purely on macOS, but change to be more broad
//! The **_System Custodian_** daemon is responsible for using the System Configuration framework to
//! 1. duplicate the current network set
//! 2. modify existing services to turn on IPv6 if not there
//! 3. remove any bridge services & add any missing services that AREN'T bridge
//! TODO: In the future:
//! 1. run a dummy AWDL service to [allow for macOS peer-to-peer wireless networking](https://yggdrasil-network.github.io/2019/08/19/awdl.html)
//! 2. toggle some GPU/memory configurations to speed up GPU (ask Alex what those configurations are)
//! 3. if we ever decide to provide our **own network interfaces** that abstract over some userland
//!    logic, this would be the place to spin that up.
//!
//! Then it will watch the SCDynamicStore for:
//! 1. all __actual__ network interfaces -> collect information on them e.g. their BSD name, MAC
//!    address, MTU, IPv6 addresses, etc. -> and set up watchers/notifiers to inform the D-Bus
//!    interface of any changes
//! 2. any __undesirable__ changes to configuration, which it will revert
//!
//! It should somehow (probably through system sockets and/or the BSD interface) trigger IPv6 NDP on
//! each of the interfaces & also listen to/query for any changes on the OS routing cache??
//! Basically emulate the `ping6 ff02::1%enX` and `ndp -an` commands BUT BETTER!!!
//! 1. all that info should coalesce back into the overall state collected -> should be queryable
//!    over D-Bus
//! TODO:
//! 1. we might potentially add to this step a handshake of some kind...? To ensure that we can
//!    ACTUALLY communicate with that machine over that link over e.g. TCP, UDP, etc. Will the
//!    handshake require knowing the Node ID? Will the handshake require heartbeats? Who knows...
//! 2. if we ever decide to write proprietary L2/L3 protocols for quicker communication,
//!    e.g. [AF_NDRV](https://www.zerotier.com/blog/how-zerotier-eliminated-kernel-extensions-on-macos/)
//!    for raw ethernet frame communication, or even a [custom thunderbolt PCIe driver](https://developer.apple.com/documentation/pcidriverkit/creating-custom-pcie-drivers-for-thunderbolt-devices),
//!    then this would be the place to carry out discovery and proper handshakes with devices
//!    on the other end of the link.
//!

// enable Rust-unstable features for convenience
#![feature(trait_alias)]
#![feature(stmt_expr_attributes)]
#![feature(type_alias_impl_trait)]
#![feature(specialization)]
#![feature(unboxed_closures)]
#![feature(const_trait_impl)]
#![feature(fn_traits)]

pub(crate) mod private {
    // sealed traits support
    pub trait Sealed {}
    impl<T: ?Sized> Sealed for T {}
}

/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {}

/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {}
@@ -1,8 +1,6 @@
import asyncio
import os
import time
from collections.abc import AsyncGenerator
from typing import Any, Optional, cast
from typing import cast

import anyio
from anyio import create_task_group
@@ -21,7 +19,6 @@ from openai_harmony import (  # pyright: ignore[reportMissingTypeStubs]
    StreamableParser,
    load_harmony_encoding,
)
from pydantic import BaseModel

from exo.master.placement import place_instance as get_instance_placements
from exo.shared.apply import apply
@@ -54,9 +51,7 @@ from exo.shared.types.commands import (
    CreateInstance,
    DeleteInstance,
    ForwarderCommand,
    LaunchFLASH,
    PlaceInstance,
    StopFLASH,
    TaskFinished,
)
from exo.shared.types.common import CommandId, NodeId, SessionId
@@ -65,12 +60,7 @@ from exo.shared.types.memory import Memory
from exo.shared.types.models import ModelId, ModelMetadata
from exo.shared.types.state import State
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.worker.instances import (
    FLASHInstance,
    Instance,
    InstanceId,
    InstanceMeta,
)
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding
from exo.utils.banner import print_startup_banner
from exo.utils.channels import Receiver, Sender, channel
@@ -80,22 +70,6 @@ from exo.utils.event_buffer import OrderedBuffer
encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)


class ExecuteRequest(BaseModel):
    """Request to execute a command."""

    command: list[str]
    cwd: Optional[str] = None
    env: Optional[dict[str, str]] = None


class ExecuteResponse(BaseModel):
    """Response from command execution."""

    exit_code: int
    stdout: str
    stderr: str


def chunk_to_response(
    chunk: TokenChunk, command_id: CommandId
) -> ChatCompletionResponse:
@@ -204,12 +178,6 @@ class API:
        self.app.post("/bench/chat/completions")(self.bench_chat_completions)
        self.app.get("/state")(lambda: self.state)
        self.app.get("/events")(lambda: self._event_log)
        # FLASH simulation endpoints
        self.app.post("/flash/launch")(self.launch_flash)
        self.app.delete("/flash/{instance_id}")(self.stop_flash)
        self.app.get("/flash/instances")(self.list_flash_instances)
        # Remote execution endpoint (used by exo-rsh for MPI)
        self.app.post("/execute")(self.execute)

    async def place_instance(self, payload: PlaceInstanceParams):
        command = PlaceInstance(
@@ -654,145 +622,6 @@ class API:
            ]
        )

    async def launch_flash(
        self,
        simulation_name: str,
        flash_executable_path: str,
        working_directory: str,
        parameter_file_path: str = "",
        ranks_per_node: int = 1,
        min_nodes: int = 1,
        hosts: str = "",
    ) -> dict[str, str]:
        """Launch a FLASH MPI simulation across the cluster.

        Args:
            hosts: Optional comma-separated hostnames (e.g., "s14,james21-1").
                If not provided, IPs are discovered from topology edges.
        """
        command = LaunchFLASH(
            simulation_name=simulation_name,
            flash_executable_path=flash_executable_path,
            parameter_file_path=parameter_file_path,
            working_directory=working_directory,
            ranks_per_node=ranks_per_node,
            min_nodes=min_nodes,
            hosts=hosts,
        )
        await self._send(command)

        return {
            "message": "FLASH launch command received",
            "command_id": str(command.command_id),
            "simulation_name": simulation_name,
        }

    async def stop_flash(self, instance_id: InstanceId) -> dict[str, str]:
        """Stop a running FLASH simulation."""
        if instance_id not in self.state.instances:
            raise HTTPException(status_code=404, detail="Instance not found")

        instance = self.state.instances[instance_id]
        if not isinstance(instance, FLASHInstance):
            raise HTTPException(
                status_code=400, detail="Instance is not a FLASH simulation"
            )

        command = StopFLASH(instance_id=instance_id)
        await self._send(command)

        return {
            "message": "Stop command received",
            "command_id": str(command.command_id),
            "instance_id": str(instance_id),
        }

    async def list_flash_instances(self) -> list[dict[str, Any]]:
        """List all FLASH simulation instances."""
        flash_instances: list[dict[str, Any]] = []
        for instance_id, instance in self.state.instances.items():
            if isinstance(instance, FLASHInstance):
                # Get runner statuses for this instance
                runner_statuses: dict[str, str | None] = {}
                for (
                    node_id,
                    runner_id,
                ) in instance.shard_assignments.node_to_runner.items():
                    runner_status = self.state.runners.get(runner_id)
                    runner_statuses[str(node_id)] = (
                        str(runner_status) if runner_status else None
                    )

                flash_instances.append(
                    {
                        "instance_id": str(instance_id),
                        "simulation_name": instance.simulation_name,
                        "total_ranks": instance.total_ranks,
                        "working_directory": instance.working_directory,
                        "runner_statuses": runner_statuses,
                    }
                )
        return flash_instances

    async def execute(self, request: ExecuteRequest) -> ExecuteResponse:
        """Execute a command locally. Used by exo-rsh for MPI remote execution."""
        cmd_str = " ".join(request.command)
        logger.info(f"Executing: {cmd_str}")

        try:
            # Build environment
            env = os.environ.copy()
            if request.env:
                env.update(request.env)

            # Check if command contains shell metacharacters
            # If so, run through shell. mpirun sends complex commands like:
            # "VAR=value;export VAR;/path/to/prted --args"
            needs_shell = any(c in cmd_str for c in ";|&$`")

            if needs_shell:
                process = await asyncio.create_subprocess_shell(
                    cmd_str,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=request.cwd,
                    env=env,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    *request.command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=request.cwd,
                    env=env,
                )

            stdout, stderr = await process.communicate()
            exit_code = process.returncode or 0

            logger.info(f"Command completed with exit code {exit_code}")

            return ExecuteResponse(
                exit_code=exit_code,
                stdout=stdout.decode("utf-8", errors="replace"),
                stderr=stderr.decode("utf-8", errors="replace"),
            )

        except FileNotFoundError:
            logger.error(f"Command not found: {request.command[0]}")
            return ExecuteResponse(
                exit_code=127,
                stdout="",
                stderr=f"Command not found: {request.command[0]}",
            )
        except Exception as e:
            logger.error(f"Execution error: {e}")
            return ExecuteResponse(
                exit_code=1,
                stdout="",
                stderr=str(e),
            )

    async def run(self):
        cfg = Config()
        cfg.bind = f"0.0.0.0:{self.port}"
@@ -8,7 +8,6 @@ from exo.master.placement import (
    add_instance_to_placements,
    delete_instance,
    get_transition_events,
    place_flash_instance,
    place_instance,
)
from exo.shared.apply import apply
@@ -17,10 +16,8 @@ from exo.shared.types.commands import (
    CreateInstance,
    DeleteInstance,
    ForwarderCommand,
    LaunchFLASH,
    PlaceInstance,
    RequestEventLog,
    StopFLASH,
    TaskFinished,
    TestCommand,
)
@@ -176,26 +173,6 @@ class Master:
                    self.state.instances, placement
                )
                generated_events.extend(transition_events)
            case LaunchFLASH():
                placement = place_flash_instance(
                    command,
                    self.state.topology,
                    self.state.instances,
                )
                transition_events = get_transition_events(
                    self.state.instances, placement
                )
                generated_events.extend(transition_events)
            case StopFLASH():
                # Reuse delete_instance logic to stop FLASH simulation
                placement = delete_instance(
                    DeleteInstance(instance_id=command.instance_id),
                    self.state.instances,
                )
                transition_events = get_transition_events(
                    self.state.instances, placement
                )
                generated_events.extend(transition_events)
            case TaskFinished():
                generated_events.append(
                    TaskDeleted(
@@ -17,24 +17,20 @@ from exo.shared.topology import Topology
from exo.shared.types.commands import (
    CreateInstance,
    DeleteInstance,
    LaunchFLASH,
    PlaceInstance,
)
from exo.shared.types.common import Host, NodeId
from exo.shared.types.events import Event, InstanceCreated, InstanceDeleted
from exo.shared.types.memory import Memory
from exo.shared.types.models import ModelId, ModelMetadata
from exo.shared.types.models import ModelId
from exo.shared.types.topology import NodeInfo
from exo.shared.types.worker.instances import (
    FLASHInstance,
    Instance,
    InstanceId,
    InstanceMeta,
    MlxJacclInstance,
    MlxRingInstance,
)
from exo.shared.types.worker.runners import RunnerId, ShardAssignments
from exo.shared.types.worker.shards import PipelineShardMetadata, Sharding
from exo.shared.types.worker.shards import Sharding


def random_ephemeral_port() -> int:
@@ -169,9 +165,6 @@ def place_instance(
                hosts_by_node=hosts_by_node,
                ephemeral_port=ephemeral_port,
            )
        case InstanceMeta.FLASH:
            # FLASH instances are handled by place_flash_instance()
            raise ValueError("FLASH instances should use place_flash_instance()")

    return target_instances

@@ -187,148 +180,6 @@ def delete_instance(
        raise ValueError(f"Instance {command.instance_id} not found")


def place_flash_instance(
    command: LaunchFLASH,
    topology: Topology,
    current_instances: Mapping[InstanceId, Instance],
) -> dict[InstanceId, Instance]:
    """Place a FLASH simulation instance across available nodes.

    Unlike MLX instances which use ring/JACCL topology for tensor parallelism,
    FLASH instances use MPI for communication. We just need to provide the
    node IPs so the runner can generate an MPI hostfile.
    """
    instance_id = InstanceId()
    target_instances = dict(deepcopy(current_instances))

    all_nodes = list(topology.list_nodes())

    if len(all_nodes) < command.min_nodes:
        raise ValueError(
            f"Not enough nodes: need {command.min_nodes}, have {len(all_nodes)}"
        )

    # Select nodes (take the first min_nodes)
    selected_nodes = all_nodes[: command.min_nodes]

    logger.info(
        f"Placing FLASH instance '{command.simulation_name}' on {len(selected_nodes)} nodes"
    )

    # Build shard assignments (one runner per node for FLASH)
    runner_to_shard: dict[RunnerId, PipelineShardMetadata] = {}
    node_to_runner: dict[NodeId, RunnerId] = {}

    # Create a dummy ModelMetadata for FLASH (required by ShardMetadata interface)
    flash_model_meta = ModelMetadata(
        model_id=ModelId(command.simulation_name),
        pretty_name=f"FLASH: {command.simulation_name}",
        storage_size=Memory(in_bytes=0),
        n_layers=1,
        hidden_size=1,
        supports_tensor=False,
    )

    for i, node_info in enumerate(selected_nodes):
        runner_id = RunnerId()
        node_to_runner[node_info.node_id] = runner_id
        runner_to_shard[runner_id] = PipelineShardMetadata(
            device_rank=i,
            world_size=len(selected_nodes),
            model_meta=flash_model_meta,
            start_layer=0,
            end_layer=1,
            n_layers=1,
        )

    shard_assignments = ShardAssignments(
        model_id=ModelId(command.simulation_name),
        runner_to_shard=runner_to_shard,
        node_to_runner=node_to_runner,
    )

    # Build hosts_by_node - get hostnames/IPs for MPI hostfile generation
    hosts_by_node: dict[NodeId, list[Host]] = {}

    # If explicit hosts are provided, use them directly
    if command.hosts:
        explicit_hosts = [h.strip() for h in command.hosts.split(",") if h.strip()]
        logger.info(f"FLASH placement: explicit hosts provided: {explicit_hosts}")
        for i, node_info in enumerate(selected_nodes):
            if i < len(explicit_hosts):
                hosts_by_node[node_info.node_id] = [Host(ip=explicit_hosts[i], port=0)]
                logger.info(
                    f"FLASH placement: node {node_info.node_id} (rank {i}) -> IP {explicit_hosts[i]}"
                )
            else:
                logger.warning(
                    f"Not enough hosts provided for node {i}, using localhost"
                )
                hosts_by_node[node_info.node_id] = [Host(ip="127.0.0.1", port=0)]
        logger.info(
            f"FLASH placement: coordinator will be rank 0 at IP {explicit_hosts[0]}"
        )
    else:
        # Try to get IPs from topology edges
        for node_info in selected_nodes:
            node_hosts: list[Host] = []

            # Get IP from outgoing edges (connections to other nodes via mDNS discovery)
            for _, edge_data in topology.out_edges(node_info.node_id):
                if hasattr(edge_data, "send_back_multiaddr"):
                    # Extract IP from multiaddr like /ip4/192.168.1.100/tcp/52415
                    multiaddr = str(edge_data.send_back_multiaddr)
                    if "/ip4/" in multiaddr:
                        parts = multiaddr.split("/")
                        try:
                            ip_idx = parts.index("ip4") + 1
                            ip = parts[ip_idx]
                            # Skip link-local and localhost addresses
                            if not ip.startswith("169.254.") and not ip.startswith(
                                "127."
                            ):
                                node_hosts.append(Host(ip=ip, port=0))
                                break
                        except (ValueError, IndexError):
                            pass

            # Last resort: use localhost (will only work for single-node)
            if not node_hosts:
                logger.warning(
                    f"Could not determine IP for node {node_info.node_id}, using localhost"
                )
                node_hosts.append(Host(ip="127.0.0.1", port=0))

            hosts_by_node[node_info.node_id] = node_hosts

    total_ranks = len(selected_nodes) * command.ranks_per_node

    # Determine coordinator IP - first node's first host IP
    first_node_id: NodeId = next(iter(hosts_by_node.keys()))
    coordinator_ip: str = (
        hosts_by_node[first_node_id][0].ip
        if hosts_by_node[first_node_id]
        else "127.0.0.1"
    )

    target_instances[instance_id] = FLASHInstance(
        instance_id=instance_id,
        shard_assignments=shard_assignments,
        hosts_by_node=hosts_by_node,
        flash_executable_path=command.flash_executable_path,
        parameter_file_path=command.parameter_file_path,
        working_directory=command.working_directory,
        ranks_per_node=command.ranks_per_node,
        total_ranks=total_ranks,
        simulation_name=command.simulation_name,
        coordinator_ip=coordinator_ip,
    )

    logger.info(f"Created FLASH instance {instance_id} with {total_ranks} total ranks")

    return target_instances


def get_transition_events(
    current_instances: Mapping[InstanceId, Instance],
    target_instances: Mapping[InstanceId, Instance],
@@ -1,13 +0,0 @@
"""Exo RSH - Remote Shell for MPI without SSH.

This module provides a remote execution mechanism that allows mpirun to spawn
processes on remote nodes without requiring SSH setup. It works by:

1. Each Exo node runs an API server on port 52415 with an /execute endpoint
2. The exo-rsh script acts as a drop-in replacement for ssh
3. When mpirun calls "exo-rsh hostname command", it HTTP POSTs to the target's /execute
4. The target executes the command and returns output

Usage:
    mpirun --mca plm_rsh_agent exo-rsh -np 4 --hostfile hosts.txt ./program
"""
@@ -1,101 +0,0 @@
#!/usr/bin/env python3
"""exo-rsh - Remote shell client for MPI.

This script is called by mpirun as a replacement for ssh.
Usage: exo-rsh [ssh-options...] hostname command [args...]

It connects to the target node's Exo API (port 52415) and executes the command.
"""

import json
import socket
import sys
from typing import Any, cast
from urllib.error import URLError
from urllib.request import Request, urlopen

# Use the same port as Exo's API server
EXO_API_PORT = 52415


def resolve_hostname(hostname: str) -> str:
    """Resolve hostname to IP address."""
    try:
        return socket.gethostbyname(hostname)
    except socket.gaierror:
        # If resolution fails, try using the hostname directly
        return hostname


def main():
    # Parse arguments - mpirun calls us like: exo-rsh [options] hostname command [args...]
    # SSH options we might see: -x (disable X11), -o options, etc.
    args = sys.argv[1:]

    # Skip SSH-style options
    hostname = None
    command_start = 0

    i = 0
    while i < len(args):
        arg = args[i]
        if arg.startswith("-"):
            # Skip option and its value if needed
            if arg in ("-o", "-i", "-l", "-p", "-F"):
                i += 2  # Skip option and its argument
                continue
            i += 1
            continue
        else:
            # First non-option is the hostname
            hostname = arg
            command_start = i + 1
            break

    if hostname is None or command_start >= len(args):
        print("Usage: exo-rsh [options] hostname command [args...]", file=sys.stderr)
        sys.exit(1)

    command = args[command_start:]

    # Resolve hostname to IP
    ip = resolve_hostname(hostname)

    # Make request to Exo API
    url = f"http://{ip}:{EXO_API_PORT}/execute"
    data = json.dumps({"command": command}).encode("utf-8")

    try:
        req = Request(url, data=data, headers={"Content-Type": "application/json"})
        with urlopen(req, timeout=300) as response:  # pyright: ignore[reportAny]
            response_body: bytes = cast(bytes, response.read())  # pyright: ignore[reportAny]
            result: dict[str, Any] = json.loads(response_body.decode("utf-8"))  # pyright: ignore[reportAny]

        # Output stdout/stderr
        stdout: str = cast(str, result.get("stdout", ""))
        stderr: str = cast(str, result.get("stderr", ""))
        exit_code: int = cast(int, result.get("exit_code", 0))

        if stdout:
            sys.stdout.write(stdout)
            sys.stdout.flush()
        if stderr:
            sys.stderr.write(stderr)
            sys.stderr.flush()

        sys.exit(exit_code)

    except URLError as e:
        print(
            f"exo-rsh: Failed to connect to {hostname}:{EXO_API_PORT}: {e}",
            file=sys.stderr,
        )
        sys.exit(255)
    except Exception as e:
        print(f"exo-rsh: Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
@@ -14,32 +14,6 @@ class ModelCard(CamelCaseModel):

MODEL_CARDS: dict[str, ModelCard] = {
    # deepseek v3
    # "deepseek-v3-0324:4bit": ModelCard(
    #     short_id="deepseek-v3-0324:4bit",
    #     model_id="mlx-community/DeepSeek-V3-0324-4bit",
    #     name="DeepSeek V3 0324 (4-bit)",
    #     description="""DeepSeek V3 is a large language model trained on the DeepSeek V3 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-V3-0324-4bit"),
    #         pretty_name="DeepSeek V3 0324 (4-bit)",
    #         storage_size=Memory.from_kb(409706307),
    #         n_layers=61,
    #     ),
    # ),
    # "deepseek-v3-0324": ModelCard(
    #     short_id="deepseek-v3-0324",
    #     model_id="mlx-community/DeepSeek-v3-0324-8bit",
    #     name="DeepSeek V3 0324 (8-bit)",
    #     description="""DeepSeek V3 is a large language model trained on the DeepSeek V3 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-v3-0324-8bit"),
    #         pretty_name="DeepSeek V3 0324 (8-bit)",
    #         storage_size=Memory.from_kb(754706307),
    #         n_layers=61,
    #     ),
    # ),
    "deepseek-v3.1-4bit": ModelCard(
        short_id="deepseek-v3.1-4bit",
        model_id=ModelId("mlx-community/DeepSeek-V3.1-4bit"),
@@ -70,65 +44,6 @@ MODEL_CARDS: dict[str, ModelCard] = {
            supports_tensor=True,
        ),
    ),
    # "deepseek-v3.2": ModelCard(
    #     short_id="deepseek-v3.2",
    #     model_id=ModelId("mlx-community/DeepSeek-V3.2-8bit"),
    #     name="DeepSeek V3.2 (8-bit)",
    #     description="""DeepSeek V3.2 is a large language model trained on the DeepSeek V3.2 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-V3.2-8bit"),
    #         pretty_name="DeepSeek V3.2 (8-bit)",
    #         storage_size=Memory.from_kb(754706307),
    #         n_layers=61,
    #         hidden_size=7168,
    #         supports_tensor=True,
    #     ),
    # ),
    # "deepseek-v3.2-4bit": ModelCard(
    #     short_id="deepseek-v3.2-4bit",
    #     model_id=ModelId("mlx-community/DeepSeek-V3.2-4bit"),
    #     name="DeepSeek V3.2 (4-bit)",
    #     description="""DeepSeek V3.2 is a large language model trained on the DeepSeek V3.2 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-V3.2-4bit"),
    #         pretty_name="DeepSeek V3.2 (4-bit)",
    #         storage_size=Memory.from_kb(754706307 // 2),  # TODO !!!!!
    #         n_layers=61,
    #         hidden_size=7168,
    #         supports_tensor=True,
    #     ),
    # ),
    # deepseek r1
    # "deepseek-r1-0528-4bit": ModelCard(
    #     short_id="deepseek-r1-0528-4bit",
    #     model_id="mlx-community/DeepSeek-R1-0528-4bit",
    #     name="DeepSeek-R1-0528 (4-bit)",
    #     description="""DeepSeek R1 is a large language model trained on the DeepSeek R1 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-R1-0528-4bit"),
    #         pretty_name="DeepSeek R1 671B (4-bit)",
    #         storage_size=Memory.from_kb(409706307),
    #         n_layers=61,
    #         hidden_size=7168,
    #     ),
    # ),
    # "deepseek-r1-0528": ModelCard(
    #     short_id="deepseek-r1-0528",
    #     model_id="mlx-community/DeepSeek-R1-0528-8bit",
    #     name="DeepSeek-R1-0528 (8-bit)",
    #     description="""DeepSeek R1 is a large language model trained on the DeepSeek R1 dataset.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/DeepSeek-R1-0528-8bit"),
    #         pretty_name="DeepSeek R1 671B (8-bit)",
    #         storage_size=Memory.from_bytes(754998771712),
    #         n_layers=61,
    #         hidden_size=7168,
    #     ),
    # ),
    # kimi k2
    "kimi-k2-instruct-4bit": ModelCard(
        short_id="kimi-k2-instruct-4bit",
@@ -525,8 +440,9 @@ MODEL_CARDS: dict[str, ModelCard] = {
            supports_tensor=True,
        ),
    ),
    # Needs to be quantized g32 or g16.
    # glm 4.5
    "glm-4.5-air-8bit": ModelCard(
        # Needs to be quantized g32 or g16 to work with tensor parallel
        short_id="glm-4.5-air-8bit",
        model_id=ModelId("mlx-community/GLM-4.5-Air-8bit"),
        name="GLM 4.5 Air 8bit",
@@ -556,6 +472,7 @@ MODEL_CARDS: dict[str, ModelCard] = {
            supports_tensor=True,
        ),
    ),
    # glm 4.7
    "glm-4.7-4bit": ModelCard(
        short_id="glm-4.7-4bit",
        model_id=ModelId("mlx-community/GLM-4.7-4bit"),
@@ -601,6 +518,7 @@ MODEL_CARDS: dict[str, ModelCard] = {
            supports_tensor=True,
        ),
    ),
    # minimax-m2
    "minimax-m2.1-8bit": ModelCard(
        short_id="minimax-m2.1-8bit",
        model_id=ModelId("mlx-community/MiniMax-M2.1-8bit"),
@@ -631,19 +549,4 @@ MODEL_CARDS: dict[str, ModelCard] = {
            supports_tensor=True,
        ),
    ),
    # "devstral-2-123b-instruct-2512-8bit": ModelCard(
    #     short_id="devstral-2-123b-instruct-2512-8bit",
    #     model_id=ModelId("mlx-community/Devstral-2-123B-Instruct-2512-8bit"),
    #     name="Devstral 2 123B Instruct 2512 (8-bit, MLX)",
    #     description="""Mistral AI's Devstral 2 123B Instruct (2512) is an agentic coding model.""",
    #     tags=[],
    #     metadata=ModelMetadata(
    #         model_id=ModelId("mlx-community/Devstral-2-123B-Instruct-2512-8bit"),
    #         pretty_name="Devstral 2 123B Instruct 2512 (8-bit, MLX)",
    #         storage_size=Memory.from_kb(133_000_000),
    #         n_layers=88,
    #         hidden_size=12288,
    #         supports_tensor=True,
    #     ),
    # ),
}
@@ -35,26 +35,6 @@ class DeleteInstance(BaseCommand):
    instance_id: InstanceId


class LaunchFLASH(BaseCommand):
    """Command to launch a FLASH MPI simulation."""

    simulation_name: str
    flash_executable_path: str
    parameter_file_path: str
    working_directory: str
    ranks_per_node: int = 1
    min_nodes: int = 1
    # Optional: explicit hostnames for MPI (e.g., "s14,james21-1")
    # Used when topology edges don't contain IP addresses
    hosts: str = ""


class StopFLASH(BaseCommand):
    """Command to stop a running FLASH simulation."""

    instance_id: InstanceId


class TaskFinished(BaseCommand):
    finished_command_id: CommandId

@@ -70,8 +50,6 @@ Command = (
    | PlaceInstance
    | CreateInstance
    | DeleteInstance
    | LaunchFLASH
    | StopFLASH
    | TaskFinished
)
@@ -14,7 +14,6 @@ class InstanceId(Id):
class InstanceMeta(str, Enum):
    MlxRing = "MlxRing"
    MlxJaccl = "MlxJaccl"
    FLASH = "FLASH"


class BaseInstance(TaggedModel):
@@ -35,27 +34,8 @@ class MlxJacclInstance(BaseInstance):
    jaccl_coordinators: dict[NodeId, str]


class FLASHInstance(BaseInstance):
    """Instance for FLASH MPI simulation.

    Unlike MLX instances which do tensor parallelism, FLASH instances
    coordinate MPI processes across nodes. Each node runs one or more
    MPI ranks of the FLASH simulation.
    """

    hosts_by_node: dict[NodeId, list[Host]]
    flash_executable_path: str
    parameter_file_path: str
    working_directory: str
    ranks_per_node: int = 1
    total_ranks: int
    simulation_name: str
    coordinator_ip: str
    network_interface: str = "en0"  # Network interface for MPI (e.g., en0, eth0)


# TODO: Single node instance
Instance = MlxRingInstance | MlxJacclInstance | FLASHInstance
Instance = MlxRingInstance | MlxJacclInstance


class BoundInstance(CamelCaseModel):
@@ -228,15 +228,10 @@ def tensor_auto_parallel(
        group=group,
    )

    if hasattr(model, "shard"):
        try:
            model.shard(group)  # type: ignore
            return model
        except (AttributeError, TypeError, NameError):
            pass
    logger.info(f"tensor_auto_parallel: model type = {type(model).__name__}")

    if isinstance(model, (LlamaModel, Ministral3Model)):
        logger.warning("shouldn't be hit - upstream sharding exists")
        logger.info("Using LlamaShardingStrategy")
        tensor_parallel_sharding_strategy = LlamaShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -245,7 +240,7 @@ def tensor_auto_parallel(
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, (DeepseekV3Model, DeepseekV32Model)):
        logger.warning("shouldn't be hit - upstream sharding exists")
        logger.info("Using DeepSeekShardingStrategy")
        tensor_parallel_sharding_strategy = DeepSeekShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -254,6 +249,7 @@ def tensor_auto_parallel(
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, MiniMaxModel):
        logger.info("Using MiniMaxShardingStrategy")
        tensor_parallel_sharding_strategy = MiniMaxShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -262,6 +258,7 @@ def tensor_auto_parallel(
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, (Qwen3MoeModel, Glm4MoeModel, Qwen3NextModel)):
        logger.info("Using QwenShardingStrategy")
        tensor_parallel_sharding_strategy = QwenShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -270,6 +267,7 @@ def tensor_auto_parallel(
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, GptOssModel):
        logger.info("Using GptOssShardingStrategy for tensor parallelism")
        tensor_parallel_sharding_strategy = GptOssShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -352,6 +350,8 @@ def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:
class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(self, model: nn.Module) -> nn.Module:
        model = cast(DeepseekV3Model, model)
        dense_count = 0
        moe_count = 0
        for layer in model.layers:
            # Shard the self attention
            if layer.self_attn.q_lora_rank is None:
@@ -370,6 +370,7 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):

            # Shard the MLP
            if isinstance(layer.mlp, (DeepseekV3MLP, DeepseekV32MLP)):
                dense_count += 1
                layer.mlp.gate_proj = self.all_to_sharded_linear(layer.mlp.gate_proj)
                layer.mlp.down_proj = self.sharded_to_all_linear(layer.mlp.down_proj)
                layer.mlp.up_proj = self.all_to_sharded_linear(layer.mlp.up_proj)
@@ -377,6 +378,7 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
            # Shard the MoE. Shard in place since the MoE should be responsible
            # for aggregating the results.
            else:
                moe_count += 1
                self.all_to_sharded_linear_in_place(layer.mlp.shared_experts.gate_proj)
                self.sharded_to_all_linear_in_place(layer.mlp.shared_experts.down_proj)
                self.all_to_sharded_linear_in_place(layer.mlp.shared_experts.up_proj)
@@ -386,6 +388,7 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
            layer.mlp = ShardedDeepseekV3MoE(layer.mlp)  # type: ignore
            layer.mlp.sharding_group = self.group

        logger.info(f"DeepSeekShardingStrategy: {dense_count} dense layers (shard_linear), {moe_count} MoE layers (shard_inplace)")
        return model


@@ -481,7 +484,6 @@ class ShardedQwenMoE(CustomMlxLayer):
class GptOssShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(self, model: nn.Module) -> nn.Module:
        model = cast(GptOssMoeModel, model)

        for layer in model.layers:
            layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
            layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
@@ -162,12 +162,9 @@ def mlx_distributed_init(
            os.environ["MLX_IBV_DEVICES"] = coordination_file
            os.environ["MLX_RANK"] = str(rank)
            os.environ["MLX_JACCL_COORDINATOR"] = jaccl_coordinator
            logger.info(f"rank {rank} BEFORE mx.distributed.init(backend='jaccl')")
            group = mx.distributed.init(backend="jaccl", strict=True)

        case _:
            raise ValueError(
                f"Unsupported instance type for MLX distributed: {type(bound_instance.instance)}"
            )
    logger.info(f"rank {rank} AFTER mx.distributed.init - group created")

    logger.info(f"Rank {rank} mlx distributed initialization complete")

@@ -204,10 +201,12 @@ def load_mlx_items(
        tokenizer = get_tokenizer(model_path, bound_instance.bound_shard)

    else:
        logger.info("Starting distributed init")
        logger.info("Starting distributed shard_and_load")
        start_time = time.perf_counter()
        logger.info(f"BEFORE shard_and_load for model {bound_instance.bound_shard.model_meta.model_id}")
        model, tokenizer = shard_and_load(bound_instance.bound_shard, group=group)
        end_time = time.perf_counter()
        logger.info("AFTER shard_and_load completed")
        logger.info(
            f"Time taken to shard and load model: {(end_time - start_time):.2f}s"
        )
@@ -222,8 +221,10 @@ def shard_and_load(
    group: Group,
) -> tuple[nn.Module, TokenizerWrapper]:
    model_path = build_model_path(shard_metadata.model_meta.model_id)

    logger.info(f"shard_and_load: model_path={model_path}")
    logger.info("BEFORE load_model (lazy=True)")
    model, _ = load_model(model_path, lazy=True, strict=False)
    logger.info("AFTER load_model")
    logger.debug(model)
    if hasattr(model, "model") and isinstance(model.model, DeepseekV3Model):  # type: ignore
        pass
@@ -257,8 +258,6 @@ def shard_and_load(
    model = pipeline_auto_parallel(model, group, shard_metadata)

    mx.eval(model.parameters())

    # TODO: Do we need this?
    mx.eval(model)

    logger.debug("SHARDED")
@@ -21,12 +21,7 @@ from exo.shared.types.worker.downloads import (
    DownloadOngoing,
    DownloadProgress,
)
from exo.shared.types.worker.instances import (
    BoundInstance,
    FLASHInstance,
    Instance,
    InstanceId,
)
from exo.shared.types.worker.instances import BoundInstance, Instance, InstanceId
from exo.shared.types.worker.runners import (
    RunnerConnected,
    RunnerConnecting,
@@ -55,11 +50,6 @@ def plan(
    all_runners: Mapping[RunnerId, RunnerStatus],  # all global
    tasks: Mapping[TaskId, Task],
) -> Task | None:
    # Check for FLASH instance tasks first
    flash_task = _plan_flash(runners, instances)
    if flash_task is not None:
        return flash_task

    # Python short circuiting OR logic should evaluate these sequentially.
    return (
        _kill_runner(runners, all_runners, instances)
@@ -72,34 +62,6 @@ def plan(
    )


def _plan_flash(
    runners: Mapping[RunnerId, RunnerSupervisor],
    instances: Mapping[InstanceId, Instance],
) -> Task | None:
    """Plan tasks specifically for FLASH instances.

    FLASH instances have a simpler lifecycle:
    - CreateRunner (handled by _create_runner)
    - LoadModel (starts the simulation immediately)
    - Shutdown (handled by _kill_runner)

    This function handles the LoadModel step for FLASH instances,
    skipping the MLX-specific download/init/warmup steps.
    """
    for runner in runners.values():
        instance = runner.bound_instance.instance

        # Only handle FLASH instances
        if not isinstance(instance, FLASHInstance):
            continue

        # If runner is idle, emit LoadModel to start the simulation
        if isinstance(runner.status, RunnerIdle):
            return LoadModel(instance_id=instance.instance_id)

    return None


def _kill_runner(
    runners: Mapping[RunnerId, RunnerSupervisor],
    all_runners: Mapping[RunnerId, RunnerStatus],
@@ -152,10 +114,6 @@ def _model_needs_download(
    download_status: Mapping[ModelId, DownloadProgress],
) -> DownloadModel | None:
    for runner in runners.values():
        # FLASH instances don't need model downloads
        if isinstance(runner.bound_instance.instance, FLASHInstance):
            continue

        model_id = runner.bound_instance.bound_shard.model_meta.model_id
        if isinstance(runner.status, RunnerIdle) and (
            model_id not in download_status
@@ -4,11 +4,7 @@ import loguru

from exo.shared.types.events import Event, RunnerStatusUpdated
from exo.shared.types.tasks import Task
from exo.shared.types.worker.instances import (
    BoundInstance,
    FLASHInstance,
    MlxJacclInstance,
)
from exo.shared.types.worker.instances import BoundInstance, MlxJacclInstance
from exo.shared.types.worker.runners import RunnerFailed
from exo.utils.channels import ClosedResourceError, MpReceiver, MpSender

@@ -21,27 +17,20 @@ def entrypoint(
    task_receiver: MpReceiver[Task],
    _logger: "loguru.Logger",
) -> None:
    if (
        isinstance(bound_instance.instance, MlxJacclInstance)
        and len(bound_instance.instance.ibv_devices) >= 2
    ):
        os.environ["MLX_METAL_FAST_SYNCH"] = "1"

    global logger
    logger = _logger

    # Route based on instance type
    # Import main after setting global logger - this lets us just import logger from this module
    try:
        if isinstance(bound_instance.instance, FLASHInstance):
            # FLASH MPI simulation runner
            from exo.worker.runner.flash_runner import main
        from exo.worker.runner.runner import main

            main(bound_instance, event_sender, task_receiver)
        else:
            # MLX runner (default)
            if (
                isinstance(bound_instance.instance, MlxJacclInstance)
                and len(bound_instance.instance.ibv_devices) >= 2
            ):
                os.environ["MLX_METAL_FAST_SYNCH"] = "1"

            from exo.worker.runner.runner import main

            main(bound_instance, event_sender, task_receiver)
        main(bound_instance, event_sender, task_receiver)
    except ClosedResourceError:
        logger.warning("Runner communication closed unexpectedly")
    except Exception as e:
@@ -1,301 +0,0 @@
"""FLASH MPI Runner - spawns and monitors FLASH simulations.

Exo-native distributed MPI:
- Exo handles node discovery and coordination
- Coordinator generates hostfile from Exo topology
- mpirun uses exo-rsh (no SSH required) to spawn on remote nodes
- exo-rsh connects to each node's Exo API (/execute endpoint) for remote execution
- Workers just report ready and wait
"""

import os
import shutil
import socket
import subprocess
import threading

from exo.shared.types.events import (
    Event,
    RunnerStatusUpdated,
    TaskAcknowledged,
    TaskStatusUpdated,
)
from exo.shared.types.tasks import (
    LoadModel,
    Shutdown,
    Task,
    TaskStatus,
)
from exo.shared.types.worker.instances import BoundInstance, FLASHInstance
from exo.shared.types.worker.runners import (
    RunnerFailed,
    RunnerIdle,
    RunnerLoading,
    RunnerReady,
    RunnerRunning,
    RunnerShutdown,
    RunnerShuttingDown,
    RunnerStatus,
)
from exo.utils.channels import MpReceiver, MpSender
from exo.worker.runner.bootstrap import logger

# Find mpirun in PATH, fallback to common locations
MPIRUN_PATH = shutil.which("mpirun") or "/opt/homebrew/bin/mpirun"

# exo-rsh is installed as console script by exo package
_exo_rsh_path = shutil.which("exo-rsh")
if not _exo_rsh_path:
    raise RuntimeError("exo-rsh not found in PATH - this should be installed with exo")
EXO_RSH_PATH: str = _exo_rsh_path


def get_my_rank(instance: FLASHInstance, my_node_id: str) -> int:
    """Determine this node's rank based on position in hosts_by_node."""
    for i, node_id in enumerate(instance.hosts_by_node.keys()):
        if str(node_id) == str(my_node_id):
            return i
    return -1


def get_coordinator_host(instance: FLASHInstance) -> str:
    """Get the IP of the coordinator node."""
    return instance.coordinator_ip


def resolve_host(host: str) -> str:
    """Resolve host string to a usable hostname for MPI hostfile.

    Accepts either an IP address or hostname. For IPs, attempts to resolve
    to a hostname via DNS/mDNS. Hostnames are returned as-is after validation.
    """
    # Check if input is already a hostname (not an IP)
    try:
        socket.inet_aton(host)
        is_ip = True
    except socket.error:
        is_ip = False

    if not is_ip:
        # Already a hostname, verify it resolves and return as-is
        try:
            socket.gethostbyname(host)
            return host
        except socket.gaierror:
            logger.warning(f"Hostname {host} does not resolve, using anyway")
            return host

    # It's an IP address, try to resolve to hostname
    try:
        hostname, _, _ = socket.gethostbyaddr(host)
        hostname = hostname.split(".")[0]
        logger.info(f"Resolved {host} to {hostname}")
        return hostname
    except socket.herror:
        pass

    # Fall back to IP
    logger.warning(f"Could not resolve {host} to hostname, using IP directly")
    return host


def generate_hostfile(instance: FLASHInstance, working_dir: str) -> str:
    """Generate MPI hostfile from instance topology."""
    hostfile_path = os.path.join(working_dir, "flash_hosts.txt")
    with open(hostfile_path, "w") as f:
        for _node_id, hosts in instance.hosts_by_node.items():
            if hosts:
                host = resolve_host(hosts[0].ip)
                f.write(f"{host} slots={instance.ranks_per_node}\n")
    logger.info(f"Generated hostfile at {hostfile_path}")
    with open(hostfile_path, "r") as f:
        logger.info(f"Hostfile contents:\n{f.read()}")
    return hostfile_path


def main(
    bound_instance: BoundInstance,
    event_sender: MpSender[Event],
    task_receiver: MpReceiver[Task],
):
    """Main FLASH runner loop.

    Coordinator: generates hostfile and runs mpirun (uses exo-rsh instead of SSH)
    Workers: just report ready and wait for mpirun to spawn processes on them
    """
    assert isinstance(bound_instance.instance, FLASHInstance)
    instance = bound_instance.instance
    runner_id = bound_instance.bound_runner_id
    my_node_id = str(bound_instance.bound_node_id)

    logger.info(f"FLASH runner starting for simulation: {instance.simulation_name}")

    my_rank = get_my_rank(instance, my_node_id)
    world_size = len(instance.hosts_by_node)
    is_coordinator = my_rank == 0
    coordinator_ip = get_coordinator_host(instance)

    logger.info(
        f"FLASH node: rank={my_rank}, world_size={world_size}, coordinator={is_coordinator}"
    )
    logger.info(f"FLASH coordinator IP: {coordinator_ip}")

    process: subprocess.Popen[bytes] | None = None
    current_status: RunnerStatus = RunnerIdle()
    shutdown_requested = False

    event_sender.send(
        RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
    )

    def monitor_output(proc: subprocess.Popen[bytes]) -> None:
        """Monitor FLASH stdout for progress updates."""
        if proc.stdout is None:
            return
        for line in iter(proc.stdout.readline, b""):
            if shutdown_requested:
                break
            try:
                decoded: str = line.decode("utf-8", errors="replace").strip()
                if decoded:
                    logger.info(f"[FLASH] {decoded}")
            except Exception as e:
                logger.warning(f"Error parsing FLASH output: {e}")

    with task_receiver as tasks:
        for task in tasks:
            event_sender.send(
                TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
            )
            event_sender.send(TaskAcknowledged(task_id=task.task_id))

            match task:
                case LoadModel() if isinstance(current_status, RunnerIdle):
                    current_status = RunnerLoading()
                    logger.info("Starting FLASH simulation")
                    event_sender.send(
                        RunnerStatusUpdated(
                            runner_id=runner_id, runner_status=current_status
                        )
                    )

                    try:
                        if is_coordinator:
                            # Coordinator: generate hostfile and run mpirun
                            hostfile = generate_hostfile(
                                instance, instance.working_directory
                            )

                            iface = instance.network_interface
                            cmd = [
                                MPIRUN_PATH,
                                "-np",
                                str(instance.total_ranks),
                                "--hostfile",
                                hostfile,
                                "--wdir",
                                instance.working_directory,
                                "--oversubscribe",
                                "--mca",
                                "btl",
                                "tcp,self",
                                "--mca",
                                "btl_tcp_if_include",
                                iface,
                                "--mca",
                                "oob_tcp_if_include",
                                iface,
                                "--mca",
                                "plm_rsh_no_tree_spawn",
                                "1",
                            ]

                            # Use exo-rsh for remote execution (no SSH needed)
                            cmd.extend(["--mca", "plm_rsh_agent", EXO_RSH_PATH])

                            cmd.append(instance.flash_executable_path)

                            logger.info(f"FLASH distributed launch: {' '.join(cmd)}")

                            process = subprocess.Popen(
                                cmd,
                                cwd=instance.working_directory,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                            )

                            monitor_thread = threading.Thread(
                                target=monitor_output, args=(process,), daemon=True
                            )
                            monitor_thread.start()

                            current_status = RunnerRunning()
                            logger.info(
                                f"FLASH running on {world_size} nodes with {instance.total_ranks} ranks"
                            )

                        else:
                            # Worker: mpirun on coordinator will use exo-rsh to spawn processes here
                            logger.info(
                                f"Worker {my_rank}: Ready for mpirun to spawn processes via exo-rsh"
                            )
                            current_status = RunnerRunning()

                    except Exception as e:
                        logger.error(f"Failed to start FLASH: {e}")
                        import traceback

                        logger.error(traceback.format_exc())
                        current_status = RunnerFailed(error_message=str(e))

                case Shutdown():
                    shutdown_requested = True
                    current_status = RunnerShuttingDown()
                    logger.info("FLASH runner shutting down")
                    event_sender.send(
                        RunnerStatusUpdated(
                            runner_id=runner_id, runner_status=current_status
                        )
                    )

                    if process and process.poll() is None:
                        logger.info("Terminating FLASH simulation")
                        process.terminate()
                        try:
                            process.wait(timeout=10)
                        except subprocess.TimeoutExpired:
                            logger.warning("FLASH didn't terminate, killing")
                            process.kill()
                            process.wait()

                    current_status = RunnerShutdown()

                case _:
                    if process and process.poll() is not None:
                        exit_code = process.returncode
                        if exit_code == 0:
                            logger.info("FLASH simulation completed successfully")
                            current_status = RunnerReady()
                        else:
                            logger.error(
                                f"FLASH simulation failed with code {exit_code}"
                            )
                            current_status = RunnerFailed(
                                error_message=f"Exit code {exit_code}"
                            )

            event_sender.send(
                TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Complete)
            )
            event_sender.send(
                RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
            )

            if isinstance(current_status, RunnerShutdown):
                break

    if process and process.poll() is None:
        process.terminate()
        process.wait(timeout=5)

    logger.info("FLASH runner exiting")
@@ -1,4 +1,5 @@
import http.client
import time

from anyio import create_task_group, to_thread
from loguru import logger
@@ -6,6 +7,8 @@ from loguru import logger
from exo.shared.topology import Topology
from exo.shared.types.common import NodeId

BAD_STATUSLINE_ATTEMPTS = 3


async def check_reachability(
    target_ip: str,
@@ -15,8 +18,9 @@ async def check_reachability(
) -> None:
    """Check if a node is reachable at the given IP and verify its identity."""

    def _fetch_remote_node_id() -> NodeId | None:
        connection = http.client.HTTPConnection(target_ip, 52415, timeout=1)
    # TODO: use an async http client
    def _fetch_remote_node_id(*, attempt: int = 1) -> NodeId | None:
        connection = http.client.HTTPConnection(target_ip, 52415, timeout=3)
        try:
            connection.request("GET", "/node_id")
            response = connection.getresponse()
@@ -32,7 +36,16 @@ async def check_reachability(
            return NodeId(body) or None
        except OSError:
            return None
        except http.client.HTTPException:
        except http.client.BadStatusLine:
            if attempt >= BAD_STATUSLINE_ATTEMPTS:
                logger.warning(
                    f"BadStatusLine from {target_ip}, after {attempt} attempts, assuming connection to {expected_node_id} has dropped"
                )
                return None
            time.sleep(1)
            return _fetch_remote_node_id(attempt=attempt + 1)
        except http.client.HTTPException as e:
            logger.warning(f"HTTPException from {target_ip}: {type(e).__name__}: {e}")
            return None
        finally:
            connection.close()