Mirror of https://github.com/exo-explore/exo.git (synced 2026-01-20). Comparing commits `leo/test-s`...`main` — 4 commits: 176ab5ba40, f5e6aa82d2, 39f0ed6018, ee43b598fe.
`dashboard/package-lock.json` (generated, 9 changed lines). Each hunk below drops exactly one line; from the old/new line counts in the hunk headers, the removed line in every case appears to be the `"peer": true` flag:
```diff
@@ -863,7 +863,6 @@
   "integrity": "sha512-oH8tXw7EZnie8FdOWYrF7Yn4IKrqTFHhXvl8YxXxbKwTMcD/5NNCryUSEXRk2ZR4ojnub0P8rNrsVGHXWqIDtA==",
   "dev": true,
   "license": "MIT",
-  "peer": true,
   "dependencies": {
     "@standard-schema/spec": "^1.0.0",
     "@sveltejs/acorn-typescript": "^1.0.5",
@@ -903,7 +902,6 @@
   "integrity": "sha512-Y1Cs7hhTc+a5E9Va/xwKlAJoariQyHY+5zBgCZg4PFWNYQ1nMN9sjK1zhw1gK69DuqVP++sht/1GZg1aRwmAXQ==",
   "dev": true,
   "license": "MIT",
-  "peer": true,
   "dependencies": {
     "@sveltejs/vite-plugin-svelte-inspector": "^4.0.1",
     "debug": "^4.4.1",
@@ -1520,7 +1518,6 @@
   "integrity": "sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ==",
   "dev": true,
   "license": "MIT",
-  "peer": true,
   "dependencies": {
     "undici-types": "~6.21.0"
   }
@@ -1530,7 +1527,6 @@
   "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz",
   "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
   "license": "MIT",
-  "peer": true,
   "bin": {
     "acorn": "bin/acorn"
   },
@@ -1943,7 +1939,6 @@
   "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
   "dev": true,
   "license": "ISC",
-  "peer": true,
   "engines": {
     "node": ">=12"
   }
@@ -2651,7 +2646,6 @@
   "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
   "dev": true,
   "license": "MIT",
-  "peer": true,
   "engines": {
     "node": ">=12"
   },
@@ -2839,7 +2833,6 @@
   "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.45.3.tgz",
   "integrity": "sha512-ngKXNhNvwPzF43QqEhDOue7TQTrG09em1sd4HBxVF0Wr2gopAmdEWan+rgbdgK4fhBtSOTJO8bYU4chUG7VXZQ==",
   "license": "MIT",
-  "peer": true,
   "dependencies": {
     "@jridgewell/remapping": "^2.3.4",
     "@jridgewell/sourcemap-codec": "^1.5.0",
@@ -2984,7 +2977,6 @@
   "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
   "dev": true,
   "license": "Apache-2.0",
-  "peer": true,
   "bin": {
     "tsc": "bin/tsc",
     "tsserver": "bin/tsserver"
@@ -3006,7 +2998,6 @@
   "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==",
   "dev": true,
   "license": "MIT",
-  "peer": true,
   "dependencies": {
     "esbuild": "^0.25.0",
     "fdir": "^6.4.4",
```
Dashboard state store (TypeScript) — `RawNodeProfile` is replaced by granular per-node state types, and `transformTopology` now reads identity, memory, system, and network data from separate mappings:

```diff
@@ -71,44 +71,46 @@ export interface Instance {
   };
 }
 
-interface RawNodeProfile {
+// Granular node state types from the new state structure
+interface RawNodeIdentity {
   modelId?: string;
   chipId?: string;
   friendlyName?: string;
-  networkInterfaces?: Array<{
-    name?: string;
-    ipAddress?: string;
-    addresses?: Array<{ address?: string } | string>;
-    ipv4?: string;
-    ipv6?: string;
-    ipAddresses?: string[];
-    ips?: string[];
-  }>;
-  memory?: {
-    ramTotal?: { inBytes: number };
-    ramAvailable?: { inBytes: number };
-    swapTotal?: { inBytes: number };
-    swapAvailable?: { inBytes: number };
-  };
-  system?: {
-    gpuUsage?: number;
-    temp?: number;
-    sysPower?: number;
-  };
 }
 
-interface RawTopologyNode {
-  nodeId: string;
-  nodeProfile?: RawNodeProfile;
+interface RawMemoryUsage {
+  ramTotal?: { inBytes: number };
+  ramAvailable?: { inBytes: number };
+  swapTotal?: { inBytes: number };
+  swapAvailable?: { inBytes: number };
 }
 
+interface RawSystemPerformanceProfile {
+  gpuUsage?: number;
+  temp?: number;
+  sysPower?: number;
+  pcpuUsage?: number;
+  ecpuUsage?: number;
+}
+
+interface RawNetworkInterfaceInfo {
+  name?: string;
+  ipAddress?: string;
+  addresses?: Array<{ address?: string } | string>;
+  ipv4?: string;
+  ipv6?: string;
+  ipAddresses?: string[];
+  ips?: string[];
+}
+
+interface RawNodeNetworkInfo {
+  interfaces?: RawNetworkInterfaceInfo[];
+}
+
 // New connection edge types from Python SocketConnection/RDMAConnection
 interface RawSocketConnection {
   sinkMultiaddr?: {
     address?: string;
     // Multiaddr uses snake_case (no camelCase alias)
     ip_address?: string;
     ipAddress?: string; // fallback in case it changes
     address_type?: string;
     port?: number;
   };
@@ -125,14 +127,10 @@ type RawConnectionEdge = RawSocketConnection | RawRDMAConnection;
 type RawConnectionsMap = Record<string, Record<string, RawConnectionEdge[]>>;
 
 interface RawTopology {
-  // nodes can be array of strings (node IDs) or array of objects with nodeId/nodeProfile
-  nodes: (string | RawTopologyNode)[];
+  // New nested mapping format
+  nodes: string[];
   connections?: RawConnectionsMap;
 }
 
-type RawNodeProfiles = Record<string, RawNodeProfile>;
-
 export interface DownloadProgress {
   totalBytes: number;
   downloadedBytes: number;
@@ -187,7 +185,11 @@ interface RawStateResponse {
   >;
   runners?: Record<string, unknown>;
   downloads?: Record<string, unknown[]>;
-  nodeProfiles?: RawNodeProfiles;
+  // New granular node state fields
+  nodeIdentities?: Record<string, RawNodeIdentity>;
+  nodeMemory?: Record<string, RawMemoryUsage>;
+  nodeSystem?: Record<string, RawSystemPerformanceProfile>;
+  nodeNetwork?: Record<string, RawNodeNetworkInfo>;
 }
 
 export interface MessageAttachment {
@@ -222,65 +224,69 @@ export interface Conversation {
 
 const STORAGE_KEY = "exo-conversations";
 
+interface GranularNodeState {
+  nodeIdentities?: Record<string, RawNodeIdentity>;
+  nodeMemory?: Record<string, RawMemoryUsage>;
+  nodeSystem?: Record<string, RawSystemPerformanceProfile>;
+  nodeNetwork?: Record<string, RawNodeNetworkInfo>;
+}
+
+function transformNetworkInterface(iface: RawNetworkInterfaceInfo): {
+  name?: string;
+  addresses: string[];
+} {
+  const addresses: string[] = [];
+  if (iface.ipAddress && typeof iface.ipAddress === "string") {
+    addresses.push(iface.ipAddress);
+  }
+  if (Array.isArray(iface.addresses)) {
+    for (const addr of iface.addresses) {
+      if (typeof addr === "string") addresses.push(addr);
+      else if (addr && typeof addr === "object" && addr.address)
+        addresses.push(addr.address);
+    }
+  }
+  if (Array.isArray(iface.ipAddresses)) {
+    addresses.push(
+      ...iface.ipAddresses.filter((a): a is string => typeof a === "string"),
+    );
+  }
+  if (Array.isArray(iface.ips)) {
+    addresses.push(
+      ...iface.ips.filter((a): a is string => typeof a === "string"),
+    );
+  }
+  if (iface.ipv4 && typeof iface.ipv4 === "string") addresses.push(iface.ipv4);
+  if (iface.ipv6 && typeof iface.ipv6 === "string") addresses.push(iface.ipv6);
+
+  return {
+    name: iface.name,
+    addresses: Array.from(new Set(addresses)),
+  };
+}
+
 function transformTopology(
   raw: RawTopology,
-  profiles?: RawNodeProfiles,
+  granularState: GranularNodeState,
 ): TopologyData {
   const nodes: Record<string, NodeInfo> = {};
   const edges: TopologyEdge[] = [];
 
-  // Handle nodes - can be array of strings (node IDs) or array of objects with nodeId/nodeProfile
-  for (const node of raw.nodes || []) {
-    // Determine the node ID - could be a string or an object with nodeId property
-    const nodeId = typeof node === "string" ? node : node.nodeId;
+  for (const nodeId of raw.nodes || []) {
     if (!nodeId) continue;
 
-    // Get the profile - from the separate profiles map or from the node object itself
-    const profileFromMap = profiles?.[nodeId];
-    const profileFromNode =
-      typeof node === "object" ? node.nodeProfile : undefined;
-    const profile = { ...(profileFromNode ?? {}), ...(profileFromMap ?? {}) };
+    // Get data from granular state mappings
+    const identity = granularState.nodeIdentities?.[nodeId];
+    const memory = granularState.nodeMemory?.[nodeId];
+    const system = granularState.nodeSystem?.[nodeId];
+    const network = granularState.nodeNetwork?.[nodeId];
 
-    const ramTotal = profile?.memory?.ramTotal?.inBytes ?? 0;
-    const ramAvailable = profile?.memory?.ramAvailable?.inBytes ?? 0;
+    const ramTotal = memory?.ramTotal?.inBytes ?? 0;
+    const ramAvailable = memory?.ramAvailable?.inBytes ?? 0;
     const ramUsage = Math.max(ramTotal - ramAvailable, 0);
 
-    const networkInterfaces = (profile?.networkInterfaces || []).map(
-      (iface) => {
-        const addresses: string[] = [];
-        if (iface.ipAddress && typeof iface.ipAddress === "string") {
-          addresses.push(iface.ipAddress);
-        }
-        if (Array.isArray(iface.addresses)) {
-          for (const addr of iface.addresses) {
-            if (typeof addr === "string") addresses.push(addr);
-            else if (addr && typeof addr === "object" && addr.address)
-              addresses.push(addr.address);
-          }
-        }
-        if (Array.isArray(iface.ipAddresses)) {
-          addresses.push(
-            ...iface.ipAddresses.filter(
-              (a): a is string => typeof a === "string",
-            ),
-          );
-        }
-        if (Array.isArray(iface.ips)) {
-          addresses.push(
-            ...iface.ips.filter((a): a is string => typeof a === "string"),
-          );
-        }
-        if (iface.ipv4 && typeof iface.ipv4 === "string")
-          addresses.push(iface.ipv4);
-        if (iface.ipv6 && typeof iface.ipv6 === "string")
-          addresses.push(iface.ipv6);
-
-        return {
-          name: iface.name,
-          addresses: Array.from(new Set(addresses)),
-        };
-      },
-    );
+    const rawInterfaces = network?.interfaces || [];
+    const networkInterfaces = rawInterfaces.map(transformNetworkInterface);
 
     const ipToInterface: Record<string, string> = {};
     for (const iface of networkInterfaces) {
@@ -291,8 +297,8 @@ function transformTopology(
 
     nodes[nodeId] = {
       system_info: {
-        model_id: profile?.modelId ?? "Unknown",
-        chip: profile?.chipId,
+        model_id: identity?.modelId ?? "Unknown",
+        chip: identity?.chipId,
         memory: ramTotal,
       },
       network_interfaces: networkInterfaces,
@@ -303,17 +309,15 @@ function transformTopology(
         ram_total: ramTotal,
       },
       temp:
-        profile?.system?.temp !== undefined
-          ? { gpu_temp_avg: profile.system.temp }
+        system?.temp !== undefined
+          ? { gpu_temp_avg: system.temp }
           : undefined,
       gpu_usage:
-        profile?.system?.gpuUsage !== undefined
-          ? [0, profile.system.gpuUsage]
-          : undefined,
-      sys_power: profile?.system?.sysPower,
+        system?.gpuUsage !== undefined ? [0, system.gpuUsage] : undefined,
+      sys_power: system?.sysPower,
       },
       last_macmon_update: Date.now() / 1000,
-      friendly_name: profile?.friendlyName,
+      friendly_name: identity?.friendlyName,
     };
   }
 
@@ -325,19 +329,15 @@ function transformTopology(
     for (const [sink, edgeList] of Object.entries(sinks)) {
       if (!Array.isArray(edgeList)) continue;
      for (const edge of edgeList) {
        // Extract IP from SocketConnection (uses snake_case: ip_address)
        let sendBackIp: string | undefined;
        if (edge && typeof edge === "object" && "sinkMultiaddr" in edge) {
          const multiaddr = edge.sinkMultiaddr;
          if (multiaddr) {
            // Try both snake_case (actual) and camelCase (in case it changes)
            sendBackIp =
              multiaddr.ip_address ||
              multiaddr.ipAddress ||
              extractIpFromMultiaddr(multiaddr.address);
          }
        }
        // RDMAConnection (sourceRdmaIface/sinkRdmaIface) has no IP - edge just shows connection exists

        if (nodes[source] && nodes[sink] && source !== sink) {
          edges.push({ source, target: sink, sendBackIp });
@@ -898,7 +898,12 @@ class AppStore {
     const data: RawStateResponse = await response.json();
 
     if (data.topology) {
-      this.topologyData = transformTopology(data.topology, data.nodeProfiles);
+      this.topologyData = transformTopology(data.topology, {
+        nodeIdentities: data.nodeIdentities,
+        nodeMemory: data.nodeMemory,
+        nodeSystem: data.nodeSystem,
+        nodeNetwork: data.nodeNetwork,
+      });
     }
     if (data.instances) {
       this.instances = data.instances;
```
Project dependency pins — mlx is bumped from 0.30.1 to 0.30.3:

```diff
@@ -17,8 +17,8 @@ dependencies = [
     "loguru>=0.7.3",
     "exo_pyo3_bindings", # rust bindings
     "anyio==4.11.0",
-    "mlx==0.30.1; sys_platform == 'darwin'",
-    "mlx[cpu]==0.30.1; sys_platform == 'linux'",
+    "mlx==0.30.3; sys_platform == 'darwin'",
+    "mlx[cpu]==0.30.3; sys_platform == 'linux'",
     "mlx-lm @ git+https://github.com/AlexCheema/mlx-lm.git@fix-transformers-5.0.0rc2",
     "tiktoken>=0.12.0", # required for kimi k2 tokenizer
     "hypercorn>=0.18.0",
```
|
||||
instance_meta=instance_meta,
|
||||
min_nodes=min_nodes,
|
||||
),
|
||||
node_profiles=self.state.node_profiles,
|
||||
node_memory=self.state.node_memory,
|
||||
node_network=self.state.node_network,
|
||||
topology=self.state.topology,
|
||||
current_instances=self.state.instances,
|
||||
)
|
||||
@@ -308,7 +309,8 @@ class API:
|
||||
instance_meta=instance_meta,
|
||||
min_nodes=min_nodes,
|
||||
),
|
||||
node_profiles=self.state.node_profiles,
|
||||
node_memory=self.state.node_memory,
|
||||
node_network=self.state.node_network,
|
||||
topology=self.state.topology,
|
||||
current_instances=self.state.instances,
|
||||
)
|
||||
@@ -602,8 +604,8 @@ class API:
|
||||
"""Calculate total available memory across all nodes in bytes."""
|
||||
total_available = Memory()
|
||||
|
||||
for profile in self.state.node_profiles.values():
|
||||
total_available += profile.memory.ram_available
|
||||
for memory in self.state.node_memory.values():
|
||||
total_available += memory.ram_available
|
||||
|
||||
return total_available
|
||||
|
||||
|
||||
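The last hunk folds per-node available RAM with `sum(..., start=Memory())`. A minimal, self-contained sketch of that pattern — the stand-in `Memory` and `MemoryUsage` classes below are assumptions, not the real `exo.shared.types` definitions:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Memory:
    """Stand-in for exo.shared.types.memory.Memory (assumed shape)."""

    in_bytes: int = 0

    def __add__(self, other: "Memory") -> "Memory":
        return Memory(self.in_bytes + other.in_bytes)


@dataclass(frozen=True)
class MemoryUsage:
    """Stand-in for the granular per-node memory record."""

    ram_total: Memory
    ram_available: Memory


def total_available(node_memory: dict[str, MemoryUsage]) -> Memory:
    # Fold ram_available across all nodes, starting from a zero Memory so
    # the sum is well-typed even when the mapping is empty.
    return sum((m.ram_available for m in node_memory.values()), start=Memory())


usage = {
    "node-a": MemoryUsage(Memory(16 << 30), Memory(8 << 30)),
    "node-b": MemoryUsage(Memory(32 << 30), Memory(24 << 30)),
}
assert total_available(usage).in_bytes == 32 << 30
```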
Master command loop — `place_instance` receives the granular mappings:

```diff
@@ -159,7 +159,8 @@ class Master:
                 command,
                 self.state.topology,
                 self.state.instances,
-                self.state.node_profiles,
+                self.state.node_memory,
+                self.state.node_network,
             )
             transition_events = get_transition_events(
                 self.state.instances, placement
```
Instance placement — `place_instance` takes `node_memory` and `node_network` instead of `node_profiles`:

```diff
@@ -24,7 +24,7 @@ from exo.shared.types.common import NodeId
 from exo.shared.types.events import Event, InstanceCreated, InstanceDeleted
 from exo.shared.types.memory import Memory
 from exo.shared.types.models import ModelId
-from exo.shared.types.profiling import NodePerformanceProfile
+from exo.shared.types.profiling import MemoryUsage, NodeNetworkInfo
 from exo.shared.types.worker.instances import (
     Instance,
     InstanceId,
@@ -54,12 +54,13 @@ def place_instance(
     command: PlaceInstance,
     topology: Topology,
     current_instances: Mapping[InstanceId, Instance],
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_memory: Mapping[NodeId, MemoryUsage],
+    node_network: Mapping[NodeId, NodeNetworkInfo],
 ) -> dict[InstanceId, Instance]:
     cycles = topology.get_cycles()
     candidate_cycles = list(filter(lambda it: len(it) >= command.min_nodes, cycles))
     cycles_with_sufficient_memory = filter_cycles_by_memory(
-        candidate_cycles, node_profiles, command.model_meta.storage_size
+        candidate_cycles, node_memory, command.model_meta.storage_size
     )
     if len(cycles_with_sufficient_memory) == 0:
         raise ValueError("No cycles found with sufficient memory")
@@ -104,13 +105,13 @@ def place_instance(
     selected_cycle = max(
         cycles_with_leaf_nodes if cycles_with_leaf_nodes != [] else smallest_cycles,
         key=lambda cycle: sum(
-            (node_profiles[node_id].memory.ram_available for node_id in cycle),
+            (node_memory[node_id].ram_available for node_id in cycle),
             start=Memory(),
         ),
     )
 
     shard_assignments = get_shard_assignments(
-        command.model_meta, selected_cycle, command.sharding, node_profiles
+        command.model_meta, selected_cycle, command.sharding, node_memory
     )
 
     cycle_digraph: Topology = topology.get_subgraph_from_nodes(selected_cycle.node_ids)
@@ -136,7 +137,7 @@ def place_instance(
             coordinator=selected_cycle.node_ids[0],
             coordinator_port=random_ephemeral_port(),
             cycle_digraph=cycle_digraph,
-            node_profiles=node_profiles,
+            node_network=node_network,
         )
         target_instances[instance_id] = MlxJacclInstance(
             instance_id=instance_id,
@@ -150,7 +151,7 @@ def place_instance(
             selected_cycle=selected_cycle,
             cycle_digraph=cycle_digraph,
             ephemeral_port=ephemeral_port,
-            node_profiles=node_profiles,
+            node_network=node_network,
         )
         target_instances[instance_id] = MlxRingInstance(
             instance_id=instance_id,
```
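The selection rule in the `-104` hunk is unchanged apart from its data source: prefer cycles containing leaf nodes, then take the candidate with the most total available RAM. A small sketch of that rule, with plain lists and ints standing in for the real `Cycle` and `Memory` types:

```python
# Plain-data sketch of the cycle selection above; real code uses Cycle/Memory.
def select_cycle(
    cycles_with_leaf_nodes: list[list[str]],
    smallest_cycles: list[list[str]],
    ram_available: dict[str, int],
) -> list[str]:
    # Prefer cycles that include leaf nodes; fall back to the smallest cycles.
    candidates = (
        cycles_with_leaf_nodes if cycles_with_leaf_nodes != [] else smallest_cycles
    )
    # Break ties by total available RAM across the cycle's nodes.
    return max(candidates, key=lambda cycle: sum(ram_available[n] for n in cycle))


ram = {"a": 500, "b": 600, "c": 600}
assert select_cycle([], [["a", "b"], ["b", "c"]], ram) == ["b", "c"]
```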
Placement utilities — memory filtering and IP lookup switch from `NodePerformanceProfile` to `MemoryUsage` and `NodeNetworkInfo`:

```diff
@@ -6,7 +6,7 @@ from exo.shared.topology import Topology
 from exo.shared.types.common import Host, NodeId
 from exo.shared.types.memory import Memory
 from exo.shared.types.models import ModelMetadata
-from exo.shared.types.profiling import NodePerformanceProfile
+from exo.shared.types.profiling import MemoryUsage, NodeNetworkInfo
 from exo.shared.types.topology import Cycle, RDMAConnection, SocketConnection
 from exo.shared.types.worker.runners import RunnerId, ShardAssignments
 from exo.shared.types.worker.shards import (
@@ -19,16 +19,16 @@ from exo.shared.types.worker.shards import (
 
 def filter_cycles_by_memory(
     cycles: list[Cycle],
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_memory: Mapping[NodeId, MemoryUsage],
     required_memory: Memory,
 ) -> list[Cycle]:
     filtered_cycles: list[Cycle] = []
     for cycle in cycles:
-        if not all(node in node_profiles for node in cycle):
+        if not all(node in node_memory for node in cycle):
             continue
 
         total_mem = sum(
-            (node_profiles[node_id].memory.ram_available for node_id in cycle.node_ids),
+            (node_memory[node_id].ram_available for node_id in cycle.node_ids),
             start=Memory(),
         )
         if total_mem >= required_memory:
@@ -77,13 +77,13 @@ def allocate_layers_proportionally(
 def get_shard_assignments_for_pipeline_parallel(
     model_meta: ModelMetadata,
     cycle: Cycle,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_memory: Mapping[NodeId, MemoryUsage],
 ):
     if not cycle.node_ids:
         raise ValueError("Cannot create shard assignments for empty node cycle")
 
     cycle_memory = sum(
-        (node_profiles[node_id].memory.ram_available for node_id in cycle.node_ids),
+        (node_memory[node_id].ram_available for node_id in cycle.node_ids),
         start=Memory(),
     )
 
@@ -98,7 +98,7 @@ def get_shard_assignments_for_pipeline_parallel(
     layer_allocations = allocate_layers_proportionally(
         total_layers=total_layers,
         memory_fractions=[
-            node_profiles[node_id].memory.ram_available.in_bytes / cycle_memory.in_bytes
+            node_memory[node_id].ram_available.in_bytes / cycle_memory.in_bytes
             for node_id in cycle.node_ids
         ],
     )
@@ -109,7 +109,7 @@ def get_shard_assignments_for_pipeline_parallel(
         zip(cycle.node_ids, layer_allocations, strict=True)
     ):
         required_memory = node_layers * memory_per_layer
-        available_memory = node_profiles[node_id].memory.ram_available.in_bytes
+        available_memory = node_memory[node_id].ram_available.in_bytes
         if required_memory > available_memory:
             raise ValueError(
                 f"Node {i} ({node_id}) has insufficient memory: "
@@ -182,14 +182,14 @@ def get_shard_assignments(
     model_meta: ModelMetadata,
     cycle: Cycle,
     sharding: Sharding,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_memory: Mapping[NodeId, MemoryUsage],
 ) -> ShardAssignments:
     match sharding:
         case Sharding.Pipeline:
             return get_shard_assignments_for_pipeline_parallel(
                 model_meta=model_meta,
                 cycle=cycle,
-                node_profiles=node_profiles,
+                node_memory=node_memory,
             )
         case Sharding.Tensor:
             return get_shard_assignments_for_tensor_parallel(
@@ -288,10 +288,10 @@ def _find_connection_ip(
 
 
 def _find_interface_name_for_ip(
-    ip_address: str, node_profile: NodePerformanceProfile
+    ip_address: str, node_network: NodeNetworkInfo
 ) -> str | None:
     """Find the interface name for an IP address on a node (any interface)."""
-    for interface in node_profile.network_interfaces:
+    for interface in node_network.interfaces:
         if interface.ip_address == ip_address:
             return interface.name
 
@@ -302,7 +302,7 @@ def _find_ip_prioritised(
     node_id: NodeId,
     other_node_id: NodeId,
     cycle_digraph: Topology,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_network: Mapping[NodeId, NodeNetworkInfo],
 ) -> str | None:
     # TODO: Actually prioritize in the correct Ethernet > Wifi > Non-TB > TB order.
     """Find an IP address between nodes with prioritization.
@@ -316,7 +316,9 @@ def _find_ip_prioritised(
     ips = list(_find_connection_ip(node_id, other_node_id, cycle_digraph))
     # We expect a unique iface -> ip mapping
     iface_map = {
-        _find_interface_name_for_ip(ip, node_profiles[other_node_id]): ip
+        _find_interface_name_for_ip(
+            ip, node_network.get(other_node_id, NodeNetworkInfo())
+        ): ip
         for ip, _ in ips
     }
 
@@ -345,7 +347,7 @@ def get_mlx_ring_hosts_by_node(
     selected_cycle: Cycle,
     cycle_digraph: Topology,
     ephemeral_port: int,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_network: Mapping[NodeId, NodeNetworkInfo],
 ) -> dict[NodeId, list[Host]]:
     """Generate per-node host lists for MLX ring backend.
 
@@ -377,7 +379,7 @@ def get_mlx_ring_hosts_by_node(
                 continue
 
             connection_ip = _find_ip_prioritised(
-                node_id, other_node_id, cycle_digraph, node_profiles
+                node_id, other_node_id, cycle_digraph, node_network
             )
             if connection_ip is None:
                 logger.warning(
@@ -398,7 +400,7 @@ def get_mlx_jaccl_coordinators(
     coordinator: NodeId,
     coordinator_port: int,
     cycle_digraph: Topology,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_network: Mapping[NodeId, NodeNetworkInfo],
 ) -> dict[NodeId, str]:
     """Get the coordinator addresses for MLX JACCL (rank 0 device).
 
@@ -411,7 +413,7 @@ def get_mlx_jaccl_coordinators(
         if n == coordinator:
             return "0.0.0.0"
 
-        ip = _find_ip_prioritised(n, coordinator, cycle_digraph, node_profiles)
+        ip = _find_ip_prioritised(n, coordinator, cycle_digraph, node_network)
         if ip is not None:
             return ip
```
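`allocate_layers_proportionally` is called in the `-98` hunk but its body is outside this diff. A plausible largest-remainder implementation is sketched below purely for illustration; the repository's actual rounding scheme may differ:

```python
def allocate_layers_proportionally(
    total_layers: int, memory_fractions: list[float]
) -> list[int]:
    # Ideal (fractional) share of layers for each node.
    ideal = [f * total_layers for f in memory_fractions]
    floors = [int(x) for x in ideal]
    remainder = total_layers - sum(floors)
    # Hand leftover layers to the nodes with the largest fractional parts.
    order = sorted(range(len(ideal)), key=lambda i: ideal[i] - floors[i], reverse=True)
    for i in order[:remainder]:
        floors[i] += 1
    return floors


# 48 layers over nodes holding 50% / 30% / 20% of the cycle's free RAM:
assert allocate_layers_proportionally(48, [0.5, 0.3, 0.2]) == [24, 14, 10]
```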
Master test fixtures — `create_node_profile` is split into `create_node_memory` and `create_node_network`:

```diff
@@ -2,28 +2,26 @@ from exo.shared.types.multiaddr import Multiaddr
 from exo.shared.types.profiling import (
     MemoryUsage,
     NetworkInterfaceInfo,
-    NodePerformanceProfile,
-    SystemPerformanceProfile,
+    NodeNetworkInfo,
 )
 from exo.shared.types.topology import RDMAConnection, SocketConnection
 
 
-def create_node_profile(memory: int) -> NodePerformanceProfile:
-    return NodePerformanceProfile(
-        model_id="test",
-        chip_id="test",
-        friendly_name="test",
-        memory=MemoryUsage.from_bytes(
-            ram_total=1000,
-            ram_available=memory,
-            swap_total=1000,
-            swap_available=1000,
-        ),
-        network_interfaces=[
+def create_node_memory(memory: int) -> MemoryUsage:
+    return MemoryUsage.from_bytes(
+        ram_total=1000,
+        ram_available=memory,
+        swap_total=1000,
+        swap_available=1000,
+    )
+
+
+def create_node_network() -> NodeNetworkInfo:
+    return NodeNetworkInfo(
+        interfaces=[
             NetworkInterfaceInfo(name="en0", ip_address=f"169.254.0.{i}")
             for i in range(10)
-        ],
-        system=SystemPerformanceProfile(),
+        ]
     )
```
Master test — renamed event injection and the state field it waits on:

```diff
@@ -73,8 +73,8 @@ async def test_master():
         tg.start_soon(master.run)
 
         sender_node_id = NodeId(f"{keypair.to_peer_id().to_base58()}_sender")
-        # inject a NodePerformanceProfile event
-        logger.info("inject a NodePerformanceProfile event")
+        # inject a NodeGatheredInfo event
+        logger.info("inject a NodeGatheredInfo event")
         await local_event_sender.send(
             ForwarderEvent(
                 origin_idx=0,
@@ -99,7 +99,7 @@ async def test_master():
         logger.info("wait for initial topology event")
         while len(list(master.state.topology.list_nodes())) == 0:
             await anyio.sleep(0.001)
-        while len(master.state.node_profiles) == 0:
+        while len(master.state.node_memory) == 0:
             await anyio.sleep(0.001)
 
         logger.info("inject a CreateInstance Command")
```
Placement tests — updated to the new `place_instance` signature:

```diff
@@ -5,7 +5,8 @@ from exo.master.placement import (
     place_instance,
 )
 from exo.master.tests.conftest import (
-    create_node_profile,
+    create_node_memory,
+    create_node_network,
     create_rdma_connection,
     create_socket_connection,
 )
@@ -16,7 +17,7 @@ from exo.shared.types.events import InstanceCreated, InstanceDeleted
 from exo.shared.types.memory import Memory
 from exo.shared.types.models import ModelId, ModelMetadata
 from exo.shared.types.multiaddr import Multiaddr
-from exo.shared.types.profiling import NetworkInterfaceInfo
+from exo.shared.types.profiling import NetworkInterfaceInfo, NodeNetworkInfo
 from exo.shared.types.topology import Connection, SocketConnection
 from exo.shared.types.worker.instances import (
     Instance,
@@ -109,10 +110,15 @@ def test_get_instance_placements_create_instance(
         source=node_id_b, sink=node_id_a, edge=create_socket_connection(6)
     )
 
-    profiles = {
-        node_id_a: create_node_profile(available_memory[0]),
-        node_id_b: create_node_profile(available_memory[1]),
-        node_id_c: create_node_profile(available_memory[2]),
+    node_memory = {
+        node_id_a: create_node_memory(available_memory[0]),
+        node_id_b: create_node_memory(available_memory[1]),
+        node_id_c: create_node_memory(available_memory[2]),
     }
+    node_network = {
+        node_id_a: create_node_network(),
+        node_id_b: create_node_network(),
+        node_id_c: create_node_network(),
+    }
     topology.add_node(node_id_a)
     topology.add_node(node_id_b)
@@ -125,7 +131,7 @@ def test_get_instance_placements_create_instance(
     topology.add_connection(conn_b_a)
 
     # act
-    placements = place_instance(cic, topology, {}, profiles)
+    placements = place_instance(cic, topology, {}, node_memory, node_network)
 
     # assert
     assert len(placements) == 1
@@ -155,7 +161,8 @@ def test_get_instance_placements_one_node_exact_fit() -> None:
     topology = Topology()
     node_id = NodeId()
     topology.add_node(node_id)
-    profiles = {node_id: create_node_profile(1000 * 1024)}
+    node_memory = {node_id: create_node_memory(1000 * 1024)}
+    node_network = {node_id: create_node_network()}
     cic = place_instance_command(
         ModelMetadata(
             model_id=ModelId("test-model"),
@@ -166,7 +173,7 @@ def test_get_instance_placements_one_node_exact_fit() -> None:
             supports_tensor=True,
         ),
     )
-    placements = place_instance(cic, topology, {}, profiles)
+    placements = place_instance(cic, topology, {}, node_memory, node_network)
 
     assert len(placements) == 1
     instance_id = list(placements.keys())[0]
@@ -181,7 +188,8 @@ def test_get_instance_placements_one_node_fits_with_extra_memory() -> None:
     topology = Topology()
     node_id = NodeId()
     topology.add_node(node_id)
-    profiles = {node_id: create_node_profile(1001 * 1024)}
+    node_memory = {node_id: create_node_memory(1001 * 1024)}
+    node_network = {node_id: create_node_network()}
     cic = place_instance_command(
         ModelMetadata(
             model_id=ModelId("test-model"),
@@ -192,7 +200,7 @@ def test_get_instance_placements_one_node_fits_with_extra_memory() -> None:
             supports_tensor=True,
         ),
     )
-    placements = place_instance(cic, topology, {}, profiles)
+    placements = place_instance(cic, topology, {}, node_memory, node_network)
 
     assert len(placements) == 1
     instance_id = list(placements.keys())[0]
@@ -207,7 +215,8 @@ def test_get_instance_placements_one_node_not_fit() -> None:
     topology = Topology()
     node_id = NodeId()
     topology.add_node(node_id)
-    profiles = {node_id: create_node_profile(1000 * 1024)}
+    node_memory = {node_id: create_node_memory(1000 * 1024)}
+    node_network = {node_id: create_node_network()}
     cic = place_instance_command(
         model_meta=ModelMetadata(
             model_id=ModelId("test-model"),
@@ -220,7 +229,7 @@ def test_get_instance_placements_one_node_not_fit() -> None:
     )
 
     with pytest.raises(ValueError, match="No cycles found with sufficient memory"):
-        place_instance(cic, topology, {}, profiles)
+        place_instance(cic, topology, {}, node_memory, node_network)
 
 
 def test_get_transition_events_no_change(instance: Instance):
@@ -278,11 +287,17 @@ def test_placement_selects_leaf_nodes(
     node_id_c = NodeId()
     node_id_d = NodeId()
 
-    profiles = {
-        node_id_a: create_node_profile(500),
-        node_id_b: create_node_profile(600),
-        node_id_c: create_node_profile(600),
-        node_id_d: create_node_profile(500),
+    node_memory = {
+        node_id_a: create_node_memory(500),
+        node_id_b: create_node_memory(600),
+        node_id_c: create_node_memory(600),
+        node_id_d: create_node_memory(500),
     }
+    node_network = {
+        node_id_a: create_node_network(),
+        node_id_b: create_node_network(),
+        node_id_c: create_node_network(),
+        node_id_d: create_node_network(),
+    }
 
     topology.add_node(node_id_a)
@@ -313,7 +328,7 @@ def test_placement_selects_leaf_nodes(
     cic = place_instance_command(model_meta=model_meta)
 
     # act
-    placements = place_instance(cic, topology, {}, profiles)
+    placements = place_instance(cic, topology, {}, node_memory, node_network)
 
     # assert
     assert len(placements) == 1
@@ -340,10 +355,10 @@ def test_tensor_rdma_backend_connectivity_matrix(
     node_b = NodeId()
     node_c = NodeId()
 
-    profiles = {
-        node_a: create_node_profile(500),
-        node_b: create_node_profile(500),
-        node_c: create_node_profile(500),
+    node_memory = {
+        node_a: create_node_memory(500),
+        node_b: create_node_memory(500),
+        node_c: create_node_memory(500),
     }
 
     ethernet_interface = NetworkInterfaceInfo(
@@ -354,9 +369,11 @@ def test_tensor_rdma_backend_connectivity_matrix(
         sink_multiaddr=Multiaddr(address="/ip4/10.0.0.1/tcp/8000")
     )
 
-    profiles[node_a].network_interfaces = [ethernet_interface]
-    profiles[node_b].network_interfaces = [ethernet_interface]
-    profiles[node_c].network_interfaces = [ethernet_interface]
+    node_network = {
+        node_a: NodeNetworkInfo(interfaces=[ethernet_interface]),
+        node_b: NodeNetworkInfo(interfaces=[ethernet_interface]),
+        node_c: NodeNetworkInfo(interfaces=[ethernet_interface]),
+    }
 
     topology.add_node(node_a)
     topology.add_node(node_b)
@@ -399,7 +416,7 @@ def test_tensor_rdma_backend_connectivity_matrix(
     )
 
     # act
-    placements = place_instance(cic, topology, {}, profiles)
+    placements = place_instance(cic, topology, {}, node_memory, node_network)
 
     # assert
     assert len(placements) == 1
```
Placement-utility tests — node profiles replaced by memory and network fixtures:

```diff
@@ -1,5 +1,3 @@
-from copy import copy
-
 import pytest
 
 from exo.master.placement_utils import (
@@ -10,16 +8,17 @@ from exo.master.placement_utils import (
     get_shard_assignments,
     get_smallest_cycles,
 )
-from exo.master.tests.conftest import create_node_profile, create_socket_connection
+from exo.master.tests.conftest import (
+    create_node_memory,
+    create_socket_connection,
+)
 from exo.shared.topology import Topology
 from exo.shared.types.common import Host, NodeId
 from exo.shared.types.memory import Memory
 from exo.shared.types.models import ModelId, ModelMetadata
 from exo.shared.types.profiling import (
     MemoryUsage,
     NetworkInterfaceInfo,
-    NodePerformanceProfile,
-    SystemPerformanceProfile,
+    NodeNetworkInfo,
 )
 from exo.shared.types.topology import Connection, SocketConnection
 from exo.shared.types.worker.shards import Sharding
@@ -36,9 +35,9 @@ def test_filter_cycles_by_memory():
         source=node2_id, sink=node1_id, edge=create_socket_connection(2)
     )
 
-    node1 = create_node_profile(1000 * 1024)
-    node2 = create_node_profile(1000 * 1024)
-    node_profiles = {node1_id: node1, node2_id: node2}
+    node1_mem = create_node_memory(1000 * 1024)
+    node2_mem = create_node_memory(1000 * 1024)
+    node_memory = {node1_id: node1_mem, node2_id: node2_mem}
 
     topology = Topology()
     topology.add_node(node1_id)
@@ -51,9 +50,7 @@ def test_filter_cycles_by_memory():
     assert len(cycles[0]) == 2
 
     # act
-    filtered_cycles = filter_cycles_by_memory(
-        cycles, node_profiles, Memory.from_bytes(1)
-    )
+    filtered_cycles = filter_cycles_by_memory(cycles, node_memory, Memory.from_bytes(1))
 
     # assert
     assert len(filtered_cycles) == 1
@@ -72,9 +69,9 @@ def test_filter_cycles_by_insufficient_memory():
         source=node2_id, sink=node1_id, edge=create_socket_connection(2)
     )
 
-    node1 = create_node_profile(1000 * 1024)
-    node2 = create_node_profile(1000 * 1024)
-    node_profiles = {node1_id: node1, node2_id: node2}
+    node1_mem = create_node_memory(1000 * 1024)
+    node2_mem = create_node_memory(1000 * 1024)
+    node_memory = {node1_id: node1_mem, node2_id: node2_mem}
 
     topology = Topology()
     topology.add_node(node1_id)
@@ -84,7 +81,7 @@ def test_filter_cycles_by_insufficient_memory():
 
     # act
     filtered_cycles = filter_cycles_by_memory(
-        topology.get_cycles(), node_profiles, Memory.from_kb(2001)
+        topology.get_cycles(), node_memory, Memory.from_kb(2001)
     )
 
     # assert
@@ -109,13 +106,13 @@ def test_filter_multiple_cycles_by_memory():
         source=node_c_id, sink=node_b_id, edge=create_socket_connection(4)
     )
 
-    node_a = create_node_profile(500 * 1024)
-    node_b = create_node_profile(500 * 1024)
-    node_c = create_node_profile(1000 * 1024)
-    node_profiles = {
-        node_a_id: node_a,
-        node_b_id: node_b,
-        node_c_id: node_c,
+    node_a_mem = create_node_memory(500 * 1024)
+    node_b_mem = create_node_memory(500 * 1024)
+    node_c_mem = create_node_memory(1000 * 1024)
+    node_memory = {
+        node_a_id: node_a_mem,
+        node_b_id: node_b_mem,
+        node_c_id: node_c_mem,
     }
 
     topology = Topology()
@@ -130,9 +127,7 @@ def test_filter_multiple_cycles_by_memory():
     cycles = topology.get_cycles()
 
     # act
-    filtered_cycles = filter_cycles_by_memory(
-        cycles, node_profiles, Memory.from_kb(1500)
-    )
+    filtered_cycles = filter_cycles_by_memory(cycles, node_memory, Memory.from_kb(1500))
 
     # assert
     assert len(filtered_cycles) == 1
@@ -228,13 +223,13 @@ def test_get_shard_assignments(
     topology.add_connection(connection3)
     topology.add_connection(connection4)
 
-    node_a = create_node_profile(available_memory[0] * 1024)
-    node_b = create_node_profile(available_memory[1] * 1024)
-    node_c = create_node_profile(available_memory[2] * 1024)
-    node_profiles = {
-        node_a_id: node_a,
-        node_b_id: node_b,
-        node_c_id: node_c,
+    node_a_mem = create_node_memory(available_memory[0] * 1024)
+    node_b_mem = create_node_memory(available_memory[1] * 1024)
+    node_c_mem = create_node_memory(available_memory[2] * 1024)
+    node_memory = {
+        node_a_id: node_a_mem,
+        node_b_id: node_b_mem,
+        node_c_id: node_c_mem,
     }
 
     model_meta = ModelMetadata(
@@ -253,7 +248,7 @@ def test_get_shard_assignments(
 
     # act
     shard_assignments = get_shard_assignments(
-        model_meta, selected_cycle, Sharding.Pipeline, node_profiles=node_profiles
+        model_meta, selected_cycle, Sharding.Pipeline, node_memory=node_memory
     )
 
     # assert
@@ -343,38 +338,28 @@ def test_get_mlx_jaccl_coordinators():
         source=node_a_id, sink=node_c_id, edge=create_socket_connection(6)
     )
 
-    npp = NodePerformanceProfile(
-        model_id="test",
-        chip_id="test",
-        friendly_name="test",
-        memory=MemoryUsage.from_bytes(
-            ram_total=0,
-            ram_available=0,
-            swap_total=0,
-            swap_available=0,
-        ),
-        network_interfaces=[],
-        system=SystemPerformanceProfile(),
+    network_a = NodeNetworkInfo(
+        interfaces=[
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.5"),
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.2"),
+        ]
     )
-    npp_a = copy(npp)
-    npp_a.network_interfaces = [
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.5"),
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.2"),
-    ]
-    npp_b = copy(npp)
-    npp_b.network_interfaces = [
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.1"),
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.4"),
-    ]
-    npp_c = copy(npp)
-    npp_c.network_interfaces = [
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.3"),
-        NetworkInterfaceInfo(name="en0", ip_address="169.254.0.6"),
-    ]
-    node_profiles = {
-        node_a_id: npp_a,
-        node_b_id: npp_b,
-        node_c_id: npp_c,
+    network_b = NodeNetworkInfo(
+        interfaces=[
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.1"),
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.4"),
+        ]
+    )
+    network_c = NodeNetworkInfo(
+        interfaces=[
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.3"),
+            NetworkInterfaceInfo(name="en0", ip_address="169.254.0.6"),
+        ]
+    )
+    node_network = {
+        node_a_id: network_a,
+        node_b_id: network_b,
+        node_c_id: network_c,
     }
 
     topology = Topology()
@@ -394,7 +379,7 @@ def test_get_mlx_jaccl_coordinators():
         node_a_id,
         coordinator_port=5000,
         cycle_digraph=topology,
-        node_profiles=node_profiles,
+        node_network=node_network,
     )
 
     # assert
@@ -496,9 +481,9 @@ def test_get_shard_assignments_insufficient_memory_raises():
     topology = Topology()
 
     # Node C has only 10 KB but would need 50 KB for 1 layer (1000 KB / 20 layers)
-    node_a = create_node_profile(900 * 1024)
-    node_b = create_node_profile(50 * 1024)
-    node_c = create_node_profile(10 * 1024)  # Insufficient memory
+    node_a_mem = create_node_memory(900 * 1024)
+    node_b_mem = create_node_memory(50 * 1024)
+    node_c_mem = create_node_memory(10 * 1024)  # Insufficient memory
 
     topology.add_node(node_a_id)
     topology.add_node(node_b_id)
@@ -521,10 +506,10 @@ def test_get_shard_assignments_insufficient_memory_raises():
     topology.add_connection(conn_c_a)
     topology.add_connection(conn_b_a)
 
-    profiles = {
-        node_a_id: node_a,
-        node_b_id: node_b,
-        node_c_id: node_c,
+    node_memory = {
+        node_a_id: node_a_mem,
+        node_b_id: node_b_mem,
+        node_c_id: node_c_mem,
     }
 
     model_meta = ModelMetadata(
@@ -539,4 +524,6 @@ def test_get_shard_assignments_insufficient_memory_raises():
     selected_cycle = cycles[0]
 
     with pytest.raises(ValueError, match="insufficient memory"):
-        get_shard_assignments(model_meta, selected_cycle, Sharding.Pipeline, profiles)
+        get_shard_assignments(
+            model_meta, selected_cycle, Sharding.Pipeline, node_memory
+        )
```
Topology test fixtures — the unused `node_profile` fixture is removed:

```diff
@@ -3,11 +3,6 @@ import pytest
 from exo.shared.topology import Topology
 from exo.shared.types.common import NodeId
 from exo.shared.types.multiaddr import Multiaddr
-from exo.shared.types.profiling import (
-    MemoryUsage,
-    NodePerformanceProfile,
-    SystemPerformanceProfile,
-)
 from exo.shared.types.topology import Connection, SocketConnection
 
 
@@ -23,22 +18,6 @@ def socket_connection() -> SocketConnection:
     )
 
 
-@pytest.fixture
-def node_profile() -> NodePerformanceProfile:
-    memory_profile = MemoryUsage.from_bytes(
-        ram_total=1000, ram_available=1000, swap_total=1000, swap_available=1000
-    )
-    system_profile = SystemPerformanceProfile()
-    return NodePerformanceProfile(
-        model_id="test",
-        chip_id="test",
-        friendly_name="test",
-        memory=memory_profile,
-        network_interfaces=[],
-        system=system_profile,
-    )
-
-
 def test_add_node(topology: Topology):
     # arrange
     node_id = NodeId()
```
State event application — node timeouts and gathered info now maintain the granular mappings immutably:

```diff
@@ -25,7 +25,11 @@ from exo.shared.types.events import (
     TopologyEdgeCreated,
     TopologyEdgeDeleted,
 )
-from exo.shared.types.profiling import NodePerformanceProfile
+from exo.shared.types.profiling import (
+    NodeIdentity,
+    NodeNetworkInfo,
+    NodeThunderboltInfo,
+)
 from exo.shared.types.state import State
 from exo.shared.types.tasks import Task, TaskId, TaskStatus
 from exo.shared.types.topology import Connection, RDMAConnection
@@ -193,22 +197,43 @@ def apply_runner_deleted(event: RunnerDeleted, state: State) -> State:
 
 def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
     topology = copy.deepcopy(state.topology)
-    state.topology.remove_node(event.node_id)
-    node_profiles = {
-        key: value for key, value in state.node_profiles.items() if key != event.node_id
-    }
+    topology.remove_node(event.node_id)
     last_seen = {
         key: value for key, value in state.last_seen.items() if key != event.node_id
     }
     downloads = {
         key: value for key, value in state.downloads.items() if key != event.node_id
     }
+    # Clean up all granular node mappings
+    node_identities = {
+        key: value
+        for key, value in state.node_identities.items()
+        if key != event.node_id
+    }
+    node_memory = {
+        key: value for key, value in state.node_memory.items() if key != event.node_id
+    }
+    node_system = {
+        key: value for key, value in state.node_system.items() if key != event.node_id
+    }
+    node_network = {
+        key: value for key, value in state.node_network.items() if key != event.node_id
+    }
+    node_thunderbolt = {
+        key: value
+        for key, value in state.node_thunderbolt.items()
+        if key != event.node_id
+    }
     return state.model_copy(
         update={
             "downloads": downloads,
             "topology": topology,
-            "node_profiles": node_profiles,
             "last_seen": last_seen,
+            "node_identities": node_identities,
+            "node_memory": node_memory,
+            "node_system": node_system,
+            "node_network": node_network,
+            "node_thunderbolt": node_thunderbolt,
         }
     )
@@ -217,29 +242,60 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State:
     topology = copy.deepcopy(state.topology)
     topology.add_node(event.node_id)
     info = event.info
-    profile = state.node_profiles.get(event.node_id, NodePerformanceProfile())
+
+    # Build update dict with only the mappings that change
+    update: dict[str, object] = {
+        "last_seen": {
+            **state.last_seen,
+            event.node_id: datetime.fromisoformat(event.when),
+        },
+        "topology": topology,
+    }
 
     match info:
         case MacmonMetrics():
-            profile.system = info.system_profile
-            profile.memory = info.memory
+            update["node_system"] = {
+                **state.node_system,
+                event.node_id: info.system_profile,
+            }
+            update["node_memory"] = {**state.node_memory, event.node_id: info.memory}
         case MemoryUsage():
-            profile.memory = info
+            update["node_memory"] = {**state.node_memory, event.node_id: info}
         case NodeConfig():
             pass
         case MiscData():
-            profile.friendly_name = info.friendly_name
+            current_identity = state.node_identities.get(event.node_id, NodeIdentity())
+            new_identity = current_identity.model_copy(
+                update={"friendly_name": info.friendly_name}
+            )
+            update["node_identities"] = {
+                **state.node_identities,
+                event.node_id: new_identity,
+            }
         case StaticNodeInformation():
-            profile.model_id = info.model
-            profile.chip_id = info.chip
+            current_identity = state.node_identities.get(event.node_id, NodeIdentity())
+            new_identity = current_identity.model_copy(
+                update={"model_id": info.model, "chip_id": info.chip}
+            )
+            update["node_identities"] = {
+                **state.node_identities,
+                event.node_id: new_identity,
+            }
         case NodeNetworkInterfaces():
-            profile.network_interfaces = info.ifaces
+            update["node_network"] = {
+                **state.node_network,
+                event.node_id: NodeNetworkInfo(interfaces=info.ifaces),
+            }
         case MacThunderboltIdentifiers():
-            profile.tb_interfaces = info.idents
+            update["node_thunderbolt"] = {
+                **state.node_thunderbolt,
+                event.node_id: NodeThunderboltInfo(interfaces=info.idents),
+            }
         case MacThunderboltConnections():
             conn_map = {
                 tb_ident.domain_uuid: (nid, tb_ident.rdma_interface)
-                for nid in state.node_profiles
-                for tb_ident in state.node_profiles[nid].tb_interfaces
+                for nid in state.node_thunderbolt
+                for tb_ident in state.node_thunderbolt[nid].interfaces
             }
             as_rdma_conns = [
                 Connection(
@@ -256,15 +312,7 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State:
             ]
             topology.replace_all_out_rdma_connections(event.node_id, as_rdma_conns)
 
-    last_seen = {**state.last_seen, event.node_id: datetime.fromisoformat(event.when)}
-    new_profiles = {**state.node_profiles, event.node_id: profile}
-    return state.model_copy(
-        update={
-            "node_profiles": new_profiles,
-            "last_seen": last_seen,
-            "topology": topology,
-        }
-    )
+    return state.model_copy(update=update)
 
 
 def apply_topology_edge_created(event: TopologyEdgeCreated, state: State) -> State:
```
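Each handler now produces a fresh `State` via pydantic's `model_copy(update=...)` instead of mutating a shared profile object in place. A minimal sketch of the pattern with a toy two-field state (the real `State` model has many more fields):

```python
from pydantic import BaseModel


class State(BaseModel):
    node_memory: dict[str, int] = {}
    node_system: dict[str, float] = {}


def apply_memory_event(state: State, node_id: str, ram_available: int) -> State:
    # Rebuild only the mapping that changed; model_copy carries everything
    # else over, so readers never observe a half-applied event.
    return state.model_copy(
        update={"node_memory": {**state.node_memory, node_id: ram_available}}
    )


s0 = State()
s1 = apply_memory_event(s0, "node-a", 8 << 30)
assert s0.node_memory == {} and s1.node_memory["node-a"] == 8 << 30
```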
Model cards — four GLM 4.7 Flash quantizations are added:

```diff
@@ -518,6 +518,67 @@ MODEL_CARDS: dict[str, ModelCard] = {
             supports_tensor=True,
         ),
     ),
+    # glm 4.7 flash
+    "glm-4.7-flash-4bit": ModelCard(
+        short_id="glm-4.7-flash-4bit",
+        model_id=ModelId("mlx-community/GLM-4.7-Flash-4bit"),
+        name="GLM 4.7 Flash 4bit",
+        description="GLM 4.7 Flash 4bit",
+        tags=[],
+        metadata=ModelMetadata(
+            model_id=ModelId("mlx-community/GLM-4.7-Flash-4bit"),
+            pretty_name="GLM 4.7 Flash 4bit",
+            storage_size=Memory.from_gb(18),
+            n_layers=47,
+            hidden_size=2048,
+            supports_tensor=True,
+        ),
+    ),
+    "glm-4.7-flash-5bit": ModelCard(
+        short_id="glm-4.7-flash-5bit",
+        model_id=ModelId("mlx-community/GLM-4.7-Flash-5bit"),
+        name="GLM 4.7 Flash 5bit",
+        description="GLM 4.7 Flash 5bit",
+        tags=[],
+        metadata=ModelMetadata(
+            model_id=ModelId("mlx-community/GLM-4.7-Flash-5bit"),
+            pretty_name="GLM 4.7 Flash 5bit",
+            storage_size=Memory.from_gb(21),
+            n_layers=47,
+            hidden_size=2048,
+            supports_tensor=True,
+        ),
+    ),
+    "glm-4.7-flash-6bit": ModelCard(
+        short_id="glm-4.7-flash-6bit",
+        model_id=ModelId("mlx-community/GLM-4.7-Flash-6bit"),
+        name="GLM 4.7 Flash 6bit",
+        description="GLM 4.7 Flash 6bit",
+        tags=[],
+        metadata=ModelMetadata(
+            model_id=ModelId("mlx-community/GLM-4.7-Flash-6bit"),
+            pretty_name="GLM 4.7 Flash 6bit",
+            storage_size=Memory.from_gb(25),
+            n_layers=47,
+            hidden_size=2048,
+            supports_tensor=True,
+        ),
+    ),
+    "glm-4.7-flash-8bit": ModelCard(
+        short_id="glm-4.7-flash-8bit",
+        model_id=ModelId("mlx-community/GLM-4.7-Flash-8bit"),
+        name="GLM 4.7 Flash 8bit",
+        description="GLM 4.7 Flash 8bit",
+        tags=[],
+        metadata=ModelMetadata(
+            model_id=ModelId("mlx-community/GLM-4.7-Flash-8bit"),
+            pretty_name="GLM 4.7 Flash 8bit",
+            storage_size=Memory.from_gb(32),
+            n_layers=47,
+            hidden_size=2048,
+            supports_tensor=True,
+        ),
+    ),
     # minimax-m2
     "minimax-m2.1-8bit": ModelCard(
         short_id="minimax-m2.1-8bit",
```
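Since pipeline placement divides a model's `storage_size` across its `n_layers`, the per-layer footprint of these new cards is easy to estimate (rough arithmetic only, ignoring activation and cache overhead):

```python
# All four GLM 4.7 Flash variants declare n_layers=47.
cards_gb = {"4bit": 18, "5bit": 21, "6bit": 25, "8bit": 32}
for quant, gb in cards_gb.items():
    print(f"{quant}: ~{gb / 47:.2f} GB per layer")
# 4bit: ~0.38, 5bit: ~0.45, 6bit: ~0.53, 8bit: ~0.68 GB per layer
```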
Profiling types — `NodePerformanceProfile` is split into `NodeIdentity`, `NodeNetworkInfo`, and `NodeThunderboltInfo`:

```diff
@@ -53,13 +53,21 @@ class NetworkInterfaceInfo(CamelCaseModel):
     ip_address: str
 
 
-class NodePerformanceProfile(CamelCaseModel):
+class NodeIdentity(CamelCaseModel):
+    """Static and slow-changing node identification data."""
+
     model_id: str = "Unknown"
     chip_id: str = "Unknown"
     friendly_name: str = "Unknown"
-    memory: MemoryUsage = MemoryUsage.from_bytes(
-        ram_total=0, ram_available=0, swap_total=0, swap_available=0
-    )
-    network_interfaces: Sequence[NetworkInterfaceInfo] = []
-    tb_interfaces: Sequence[ThunderboltIdentifier] = []
-    system: SystemPerformanceProfile = SystemPerformanceProfile()
+
+
+class NodeNetworkInfo(CamelCaseModel):
+    """Network interface information for a node."""
+
+    interfaces: Sequence[NetworkInterfaceInfo] = []
+
+
+class NodeThunderboltInfo(CamelCaseModel):
+    """Thunderbolt interface identifiers for a node."""
+
+    interfaces: Sequence[ThunderboltIdentifier] = []
```
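These models extend `CamelCaseModel`, which is not itself part of the diff; a typical pydantic v2 definition consistent with the camelCase JSON the dashboard's `RawNodeIdentity` interface parses would look like this (an assumption, not the repository's verbatim code):

```python
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel


class CamelCaseModel(BaseModel):
    # Serialize snake_case fields under camelCase aliases, while still
    # accepting the Python field names on construction.
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)


class NodeIdentity(CamelCaseModel):
    model_id: str = "Unknown"
    chip_id: str = "Unknown"
    friendly_name: str = "Unknown"


ident = NodeIdentity(friendly_name="studio-1")
assert ident.model_dump(by_alias=True)["friendlyName"] == "studio-1"
```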
Shared state model — the single `node_profiles` mapping becomes five granular mappings:

```diff
@@ -7,7 +7,13 @@ from pydantic.alias_generators import to_camel
 
 from exo.shared.topology import Topology, TopologySnapshot
 from exo.shared.types.common import NodeId
-from exo.shared.types.profiling import NodePerformanceProfile
+from exo.shared.types.profiling import (
+    MemoryUsage,
+    NodeIdentity,
+    NodeNetworkInfo,
+    NodeThunderboltInfo,
+    SystemPerformanceProfile,
+)
 from exo.shared.types.tasks import Task, TaskId
 from exo.shared.types.worker.downloads import DownloadProgress
 from exo.shared.types.worker.instances import Instance, InstanceId
@@ -35,11 +41,17 @@ class State(CamelCaseModel):
     runners: Mapping[RunnerId, RunnerStatus] = {}
     downloads: Mapping[NodeId, Sequence[DownloadProgress]] = {}
     tasks: Mapping[TaskId, Task] = {}
-    node_profiles: Mapping[NodeId, NodePerformanceProfile] = {}
     last_seen: Mapping[NodeId, datetime] = {}
     topology: Topology = Field(default_factory=Topology)
     last_event_applied_idx: int = Field(default=-1, ge=-1)
 
+    # Granular node state mappings (update independently at different frequencies)
+    node_identities: Mapping[NodeId, NodeIdentity] = {}
+    node_memory: Mapping[NodeId, MemoryUsage] = {}
+    node_system: Mapping[NodeId, SystemPerformanceProfile] = {}
+    node_network: Mapping[NodeId, NodeNetworkInfo] = {}
+    node_thunderbolt: Mapping[NodeId, NodeThunderboltInfo] = {}
+
     @field_serializer("topology", mode="plain")
     def _encode_topology(self, value: Topology) -> TopologySnapshot:
         return value.to_snapshot()
```
Reachability checks — driven by `node_network` instead of node profiles:

```diff
@@ -7,7 +7,7 @@ from loguru import logger
 
 from exo.shared.topology import Topology
 from exo.shared.types.common import NodeId
-from exo.shared.types.profiling import NodePerformanceProfile
+from exo.shared.types.profiling import NodeNetworkInfo
 
 REACHABILITY_ATTEMPTS = 3
 
@@ -79,7 +79,7 @@ async def check_reachability(
 async def check_reachable(
     topology: Topology,
     self_node_id: NodeId,
-    node_profiles: Mapping[NodeId, NodePerformanceProfile],
+    node_network: Mapping[NodeId, NodeNetworkInfo],
 ) -> dict[NodeId, set[str]]:
     """Check which nodes are reachable and return their IPs."""
 
@@ -98,11 +98,11 @@ async def check_reachable(
         create_task_group() as tg,
     ):
         for node_id in topology.list_nodes():
-            if node_id not in node_profiles:
+            if node_id not in node_network:
                 continue
             if node_id == self_node_id:
                 continue
-            for iface in node_profiles[node_id].network_interfaces:
+            for iface in node_network[node_id].interfaces:
                 tg.start_soon(
                     check_reachability,
                     iface.ip_address,
```
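`check_reachability`'s body is not part of the diff; the sketch below shows one plausible shape for a TCP probe in the same anyio style — the port number, timeout, and result-collection strategy are all assumptions:

```python
import anyio

REACHABILITY_ATTEMPTS = 3


async def check_reachability(
    ip_address: str,
    node_id: str,
    reachable: dict[str, set[str]],
    port: int = 52415,  # assumed probe port, not taken from the diff
) -> None:
    for _ in range(REACHABILITY_ATTEMPTS):
        try:
            with anyio.fail_after(1.0):
                stream = await anyio.connect_tcp(ip_address, port)
                await stream.aclose()
        except Exception:
            continue  # retry on timeout or refused connection
        reachable.setdefault(node_id, set()).add(ip_address)
        return
```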
mlx-lm type stubs — the local `Detokenizer` and `TokenizerWrapper` stubs are removed:

```diff
@@ -1,5 +1,3 @@
-from typing import Any
-
 import mlx.core as mx
 import mlx.nn as nn
 from mlx_lm.models.cache import KVCache
@@ -17,27 +15,3 @@ class Model(nn.Module):
         cache: list[KVCache] | None,
         input_embeddings: mx.array | None = None,
     ) -> mx.array: ...
-
-
-class Detokenizer:
-    def reset(self) -> None: ...
-    def add_token(self, token: int) -> None: ...
-    def finalize(self) -> None: ...
-
-    @property
-    def last_segment(self) -> str: ...
-
-
-class TokenizerWrapper:
-    bos_token: str | None
-    eos_token_ids: list[int]
-    detokenizer: Detokenizer
-
-    def encode(self, text: str, add_special_tokens: bool = True) -> list[int]: ...
-
-    def apply_chat_template(
-        self,
-        messages_dicts: list[dict[str, Any]],
-        tokenize: bool = False,
-        add_generation_prompt: bool = True,
-    ) -> str: ...
```
@@ -1,7 +1,10 @@
 import os
+import threading
 from abc import ABC, abstractmethod
+from collections.abc import Callable
 from functools import partial
 from inspect import signature
-from typing import TYPE_CHECKING, Callable, Protocol, cast
+from typing import TYPE_CHECKING, Any, Protocol, cast

 import mlx.core as mx
 import mlx.nn as nn
@@ -29,6 +32,40 @@ from mlx_lm.models.qwen3_next import Qwen3NextSparseMoeBlock
 from exo.shared.logging import logger
 from exo.shared.types.worker.shards import PipelineShardMetadata

+TimeoutCallback = Callable[[], None]
+
+
+def eval_with_timeout(
+    mlx_item: Any,  # pyright: ignore[reportAny]
+    timeout_seconds: float = 60.0,
+    on_timeout: TimeoutCallback | None = None,
+) -> None:
+    """Evaluate MLX item with a hard timeout.
+
+    If on_timeout callback is provided, it will be called before terminating
+    the process. This allows the runner to send a failure event before exit.
+    """
+    completed = threading.Event()
+
+    def watchdog() -> None:
+        if not completed.wait(timeout=timeout_seconds):
+            logger.error(
+                f"mlx_item evaluation timed out after {timeout_seconds:.0f}s. "
+                "This may indicate an issue with FAST_SYNCH and tensor parallel sharding. "
+                "Terminating process."
+            )
+            if on_timeout is not None:
+                on_timeout()
+            os._exit(1)
+
+    watchdog_thread = threading.Thread(target=watchdog, daemon=True)
+    watchdog_thread.start()
+
+    try:
+        mx.eval(mlx_item)  # pyright: ignore[reportAny]
+    finally:
+        completed.set()
+
+
 class _LayerCallable(Protocol):
     """Structural type that any compatible layer must satisfy.
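A hedged usage sketch for the new helper: wrap one blocking weight materialization so a hang is reported and the process is killed rather than stalling forever (the callback name is illustrative, not part of this diff):

def notify_coordinator() -> None:  # illustrative callback
    print("model load hung; emitting failure event before exit")

# If mx.eval blocks past 30s, the watchdog logs, runs the callback, then os._exit(1).
eval_with_timeout(layer.parameters(), timeout_seconds=30.0, on_timeout=notify_coordinator)

os._exit is the right tool here: the main thread is stuck inside mx.eval, so an exception raised from the watchdog thread could never unwind it.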
@@ -225,9 +262,37 @@ def patch_pipeline_model[T](model: T, group: mx.distributed.Group) -> T:
     return model


+def patch_tensor_model[T](model: T) -> T:
+    """Patch model's __call__ to ensure distributed ops sync during inference."""
+    cls = model.__class__
+    original_call = cls.__call__
+    call_signature = signature(original_call)
+
+    def patched_call(
+        self: T,
+        *args: object,
+        **kwargs: object,
+    ) -> mx.array:
+        logits: mx.array = original_call(self, *args, **kwargs)  # pyright: ignore[reportAny]
+        cache = call_signature.bind_partial(self, *args, **kwargs).arguments.get(
+            "cache", None
+        )
+
+        # Add dependency to last cache entry to ensure distributed ops are evaluated
+        if cache is not None and len(cache) > 0:  # pyright: ignore[reportAny]
+            cache[-1].state = mx.depends(cache[-1].state, logits)  # pyright: ignore[reportAny,reportUnknownMemberType]
+
+        return logits
+
+    cls.__call__ = patched_call
+    return model
+
+
 def tensor_auto_parallel(
     model: nn.Module,
     group: mx.distributed.Group,
+    timeout_seconds: float = 60.0,
+    on_timeout: TimeoutCallback | None = None,
 ) -> nn.Module:
     all_to_sharded_linear = partial(
         shard_linear,
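The mx.depends call above leans on MLX's lazy evaluation: it returns its first argument semantically unchanged while recording the second as a dependency, so the next evaluation of the KV cache also flushes the distributed ops that produced the logits. A standalone illustration of the primitive:

import mlx.core as mx

a = mx.array([1.0, 2.0])
b = a * 2  # recorded lazily, not yet computed

# a_tied behaves exactly like `a`, but evaluating it now forces `b` too.
a_tied = mx.depends(a, b)
mx.eval(a_tied)  # materializes both a_tied and b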
@@ -272,7 +337,7 @@ def tensor_auto_parallel(
     if hasattr(model, "shard"):
         try:
            model.shard(group)  # type: ignore
-            return model
+            return patch_tensor_model(model)
         except (AttributeError, TypeError, NameError):
             pass
@@ -322,7 +387,10 @@ def tensor_auto_parallel(
     else:
         raise ValueError(f"Unsupported model type: {type(model)}")

-    return tensor_parallel_sharding_strategy.shard_model(model)
+    model = tensor_parallel_sharding_strategy.shard_model(
+        model, timeout_seconds, on_timeout
+    )
+    return patch_tensor_model(model)


 class TensorParallelShardingStrategy(ABC):
@@ -342,13 +410,27 @@ class TensorParallelShardingStrategy:
         self.N = group.size()

     @abstractmethod
-    def shard_model(self, model: nn.Module) -> nn.Module: ...
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module: ...


 class LlamaShardingStrategy(TensorParallelShardingStrategy):
-    def shard_model(self, model: nn.Module) -> nn.Module:
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module:
         model = cast(LlamaModel, model)
         for layer in model.layers:
+            # Force load weights before sharding to avoid FAST_SYNCH deadlock
+            eval_with_timeout(
+                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
+            )
             layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
             layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
             layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
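Note the budget split: every layer's eval gets timeout_seconds / len(model.layers), and each call starts its own fresh watchdog, so a single stuck layer fails fast instead of silently consuming the whole window. For example:

timeout_seconds = 120.0   # e.g. the size-scaled budget computed in shard_and_load
num_layers = 80           # illustrative layer count
per_layer = timeout_seconds / num_layers  # 1.5s of slack per layer eval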
@@ -391,9 +473,17 @@ def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:


 class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
-    def shard_model(self, model: nn.Module) -> nn.Module:
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module:
         model = cast(DeepseekV3Model, model)
         for layer in model.layers:
+            eval_with_timeout(
+                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
+            )
             # Shard the self attention
             if layer.self_attn.q_lora_rank is None:
                 layer.self_attn.q_proj = self.all_to_sharded_linear(
@@ -445,9 +535,17 @@ class ShardedDeepseekV3MoE(CustomMlxLayer):


 class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
-    def shard_model(self, model: nn.Module) -> nn.Module:
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module:
         model = cast(MiniMaxModel, model)
         for layer in model.layers:
+            eval_with_timeout(
+                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
+            )
             # Shard the self attention
             layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
             layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
@@ -468,15 +566,23 @@ class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
             layer.block_sparse_moe.switch_mlp.up_proj
         )
         layer.block_sparse_moe = ShardedQwenMoE(layer.block_sparse_moe)  # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
-        layer.block_sparse_moe.sharding_group = self.group
+        layer.block_sparse_moe.sharding_group = self.group  # pyright: ignore[reportAttributeAccessIssue]

         return model


 class QwenShardingStrategy(TensorParallelShardingStrategy):
-    def shard_model(self, model: nn.Module) -> nn.Module:
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module:
         model = cast(Qwen3MoeModel, model)
         for layer in model.layers:
+            eval_with_timeout(
+                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
+            )
             # Shard the self attention
             layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
             layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
@@ -520,10 +626,18 @@ class ShardedQwenMoE(CustomMlxLayer):


 class GptOssShardingStrategy(TensorParallelShardingStrategy):
-    def shard_model(self, model: nn.Module) -> nn.Module:
+    def shard_model(
+        self,
+        model: nn.Module,
+        timeout_seconds: float,
+        on_timeout: TimeoutCallback | None,
+    ) -> nn.Module:
         model = cast(GptOssMoeModel, model)

         for layer in model.layers:
+            eval_with_timeout(
+                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
+            )
             layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
             layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
             layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
@@ -547,7 +661,7 @@ class GptOssShardingStrategy(TensorParallelShardingStrategy):
         self.all_to_sharded_linear_in_place(layer.mlp.experts.up_proj)

         layer.mlp = ShardedGptOssMoE(layer.mlp)  # type: ignore
-        layer.mlp.sharding_group = self.group
+        layer.mlp.sharding_group = self.group  # pyright: ignore[reportAttributeAccessIssue]

         return model
@@ -119,6 +119,7 @@ def mlx_generate(
     model: Model,
     tokenizer: TokenizerWrapper,
     task: ChatCompletionTaskParams,
+    prompt: str,
 ) -> Generator[GenerationResponse]:
     # Ensure that generation stats only contains peak memory for this generation
     mx.reset_peak_memory()
@@ -130,11 +131,6 @@ def mlx_generate(
     if task.seed is not None:
         mx.random.seed(task.seed)

-    prompt = apply_chat_template(
-        tokenizer=tokenizer,
-        chat_task_data=task,
-    )
-
     caches = make_kv_cache(model=model)

     logits_processors: list[Callable[[mx.array, mx.array], mx.array]] = []
@@ -2,9 +2,7 @@ import json
 import os
 import resource
 import sys
-import threading
 import time
-from collections.abc import Callable
 from pathlib import Path
 from typing import Any, cast

@@ -59,6 +57,8 @@ from exo.shared.types.worker.shards import (
 from exo.worker.download.download_utils import build_model_path
 from exo.worker.engines.mlx import Model
 from exo.worker.engines.mlx.auto_parallel import (
+    TimeoutCallback,
+    eval_with_timeout,
     pipeline_auto_parallel,
     tensor_auto_parallel,
 )
@@ -88,41 +88,6 @@ class ModelLoadingTimeoutError(Exception):
     pass


-TimeoutCallback = Callable[[], None]
-
-
-def eval_with_timeout(
-    mlx_item: Any,  # pyright: ignore[reportAny]
-    timeout_seconds: float = 60.0,
-    on_timeout: TimeoutCallback | None = None,
-) -> None:
-    """Evaluate MLX item with a hard timeout.
-
-    If on_timeout callback is provided, it will be called before terminating
-    the process. This allows the runner to send a failure event before exit.
-    """
-    completed = threading.Event()
-
-    def watchdog() -> None:
-        if not completed.wait(timeout=timeout_seconds):
-            logger.error(
-                f"mlx_item evaluation timed out after {timeout_seconds:.0f}s. "
-                "This may indicate an issue with FAST_SYNCH and tensor parallel sharding. "
-                "Terminating process."
-            )
-            if on_timeout is not None:
-                on_timeout()
-            os._exit(1)
-
-    watchdog_thread = threading.Thread(target=watchdog, daemon=True)
-    watchdog_thread.start()
-
-    try:
-        mx.eval(mlx_item)  # pyright: ignore[reportAny]
-    finally:
-        completed.set()
-
-
 def mx_barrier(group: Group | None = None):
     mx.eval(
         mx.distributed.all_sum(
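mx_barrier works because a collective cannot complete until every rank participates, so forcing evaluation of a throwaway all-reduce doubles as a group-wide rendezvous. The same idea in isolation:

import mlx.core as mx

def barrier(group: mx.distributed.Group | None = None) -> None:
    # all_sum blocks until all ranks contribute; eval forces it to run now.
    mx.eval(mx.distributed.all_sum(mx.array(1.0), group=group))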
@@ -204,10 +169,10 @@ def mlx_distributed_init(

     # TODO: update once upstream fixes
     logger.info(
-        f"rank {rank} MLX_IBV_DEVICES: {coordination_file} with devices: {jaccl_devices_json}"
+        f"rank {rank} MLX_JACCL_DEVICES: {coordination_file} with devices: {jaccl_devices_json}"
     )
     logger.info(f"rank {rank} MLX_JACCL_COORDINATOR: {jaccl_coordinator}")
-    os.environ["MLX_IBV_DEVICES"] = coordination_file
+    os.environ["MLX_JACCL_DEVICES"] = coordination_file
     os.environ["MLX_RANK"] = str(rank)
     os.environ["MLX_JACCL_COORDINATOR"] = jaccl_coordinator
     group = mx.distributed.init(backend="jaccl", strict=True)
@@ -296,14 +261,6 @@ def shard_and_load(

     logger.info(f"Group size: {group.size()}, group rank: {group.rank()}")

-    match shard_metadata:
-        case TensorShardMetadata():
-            logger.info(f"loading model from {model_path} with tensor parallelism")
-            model = tensor_auto_parallel(model, group)
-        case PipelineShardMetadata():
-            logger.info(f"loading model from {model_path} with pipeline parallelism")
-            model = pipeline_auto_parallel(model, group, shard_metadata)
-
     # Estimate timeout based on model size
     base_timeout = float(os.environ.get("EXO_MODEL_LOAD_TIMEOUT", "60"))
     model_size_gb = get_weights_size(shard_metadata).in_bytes / (1024**3)
@@ -312,7 +269,15 @@ def shard_and_load(
         f"Evaluating model parameters with timeout of {timeout_seconds:.0f}s "
         f"(model size: {model_size_gb:.1f}GB)"
     )
-    eval_with_timeout(model.parameters(), timeout_seconds, on_timeout)
+
+    match shard_metadata:
+        case TensorShardMetadata():
+            logger.info(f"loading model from {model_path} with tensor parallelism")
+            model = tensor_auto_parallel(model, group, timeout_seconds, on_timeout)
+        case PipelineShardMetadata():
+            logger.info(f"loading model from {model_path} with pipeline parallelism")
+            model = pipeline_auto_parallel(model, group, shard_metadata)
+            eval_with_timeout(model.parameters(), timeout_seconds, on_timeout)

     # TODO: Do we need this?
     mx.eval(model)
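The size-to-timeout scaling itself sits outside this hunk; a plausible sketch of the estimate, where only EXO_MODEL_LOAD_TIMEOUT and the GiB conversion come from the diff and the 2 s/GiB factor is an assumption:

import os

base_timeout = float(os.environ.get("EXO_MODEL_LOAD_TIMEOUT", "60"))
weights_bytes = 120 * 1024**3             # illustrative 120 GiB of shard weights
model_size_gb = weights_bytes / (1024**3)

timeout_seconds = base_timeout + 2.0 * model_size_gb  # hypothetical linear scaling
print(f"{timeout_seconds:.0f}s budget for {model_size_gb:.1f}GB")  # 300s for 120.0GB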
@@ -347,6 +312,9 @@ def get_eos_token_ids_for_model(model_id: str) -> list[int] | None:
     model_id_lower = model_id.lower()
     if "kimi-k2" in model_id_lower:
         return [163586]
+    elif "glm-4.7-flash" in model_id_lower:
+        # 154820: <|endoftext|>, 154827: <|user|>, 154829: <|observation|>
+        return [154820, 154827, 154829]
     elif "glm" in model_id_lower:
         return [151336, 151329, 151338]
     return None
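Branch order is load-bearing here: every "glm-4.7-flash" id also contains "glm", so the specific check must come before the generic one or it would be shadowed. With illustrative model ids:

# Hypothetical ids, shown only to demonstrate the substring-match ordering.
assert get_eos_token_ids_for_model("mlx-community/GLM-4.7-Flash-4bit") == [154820, 154827, 154829]
assert get_eos_token_ids_for_model("mlx-community/GLM-4-9B") == [151336, 151329, 151338]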
@@ -431,6 +399,16 @@ def apply_chat_template(
     return prompt


+def detect_thinking_prompt_suffix(prompt: str, tokenizer: TokenizerWrapper) -> bool:
+    """
+    Detect if prompt ends with a thinking opening tag that should be
+    prepended to the output stream.
+    """
+    think_token = tokenizer.think_start
+
+    return think_token is not None and prompt.rstrip().endswith(think_token)
+
+
 class NullKVCache(KVCache):
     """
     A KVCache that pretends to exist but holds zero tokens.
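A quick illustration, assuming a GLM-style tokenizer whose think_start is "<think>" (the prompt text is a stand-in):

# The chat template steered the model into reasoning mode by ending the
# prompt with "<think>", so the suffix check fires and the tag can be
# re-emitted on the output stream.
prompt = "[gMASK]<|user|>hi<|assistant|>\n<think>"
detect_thinking_prompt_suffix(prompt, tokenizer)          # True
detect_thinking_prompt_suffix("plain prompt", tokenizer)  # False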
@@ -409,7 +409,7 @@ class Worker:
             conns = await check_reachable(
                 self.state.topology,
                 self.node_id,
-                self.state.node_profiles,
+                self.state.node_network,
             )
             for nid in conns:
                 for ip in conns[nid]:
@@ -4,6 +4,7 @@ from functools import cache

 import mlx.core as mx
+from mlx_lm.models.gpt_oss import Model as GptOssModel
 from mlx_lm.tokenizer_utils import TokenizerWrapper
 from openai_harmony import (  # pyright: ignore[reportMissingTypeStubs]
     HarmonyEncodingName,
     Role,
@@ -50,6 +51,8 @@ from exo.shared.types.worker.runners import (
 from exo.utils.channels import MpReceiver, MpSender
 from exo.worker.engines.mlx.generator.generate import mlx_generate, warmup_inference
 from exo.worker.engines.mlx.utils_mlx import (
+    apply_chat_template,
+    detect_thinking_prompt_suffix,
     initialize_mlx,
     load_mlx_items,
     mlx_force_oom,
@@ -177,17 +180,28 @@ def main(
             try:
                 _check_for_debug_prompts(task_params.messages[0].content)

+                # Build prompt once - used for both generation and thinking detection
+                prompt = apply_chat_template(tokenizer, task_params)
+
                 # Generate responses using the actual MLX generation
                 mlx_generator = mlx_generate(
                     model=model,
                     tokenizer=tokenizer,
                     task=task_params,
+                    prompt=prompt,
                 )

                 # GPT-OSS specific parsing to match other model formats.
                 if isinstance(model, GptOssModel):
                     mlx_generator = parse_gpt_oss(mlx_generator)

+                # For other thinking models (GLM, etc.), check if we need to
+                # prepend the thinking tag that was consumed by the chat template
+                if detect_thinking_prompt_suffix(prompt, tokenizer):
+                    mlx_generator = parse_thinking_models(
+                        mlx_generator, tokenizer
+                    )
+
                 # TODO: Add tool call parser here

                 for response in mlx_generator:
@@ -293,6 +307,28 @@ def parse_gpt_oss(
             break


+def parse_thinking_models(
+    responses: Generator[GenerationResponse],
+    tokenizer: TokenizerWrapper,
+) -> Generator[GenerationResponse]:
+    """
+    For models that inject thinking tags in the prompt (like GLM-4.7),
+    prepend the thinking tag to the output stream so the frontend
+    can properly parse thinking content.
+    """
+    first = True
+    for response in responses:
+        if first:
+            first = False
+            yield response.model_copy(
+                update={
+                    "text": tokenizer.think_start,
+                    "token": tokenizer.think_start_id,  # type: ignore
+                }
+            )
+        yield response
+
+
 EXO_RUNNER_MUST_FAIL = "EXO RUNNER MUST FAIL"
 EXO_RUNNER_MUST_OOM = "EXO RUNNER MUST OOM"
 EXO_RUNNER_MUST_TIMEOUT = "EXO RUNNER MUST TIMEOUT"
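Concretely, the wrapper prepends one synthetic chunk carrying the think tag before the first real chunk and passes everything else through unmodified, so the overhead is O(1) regardless of stream length. Sketch with stand-in values:

# Given chunks ["Hello", " world"], the wrapped stream yields:
#   "<think>"  (synthetic: text=tokenizer.think_start, token=tokenizer.think_start_id)
#   "Hello"
#   " world"
for chunk in parse_thinking_models(mlx_generator, tokenizer):
    print(chunk.text, end="")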
@@ -114,6 +114,10 @@ def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
     monkeypatch.setattr(mlx_runner, "load_mlx_items", make_nothin((1, 1)))
     monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
     monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
+    # Mock apply_chat_template since we're using a fake tokenizer (integer 1).
+    # Returns a prompt without a thinking tag, so detect_thinking_prompt_suffix returns False.
+    monkeypatch.setattr(mlx_runner, "apply_chat_template", make_nothin("test prompt"))
+    monkeypatch.setattr(mlx_runner, "detect_thinking_prompt_suffix", make_nothin(False))

     def fake_generate(*_1: object, **_2: object):
         yield GenerationResponse(token=0, text="hi", finish_reason="stop")