Compare commits

..

1 Commits

Author SHA1 Message Date
Alex Cheema
e3465afae3 Split NodePerformanceProfile state storage into separate mappings
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 21:28:42 +00:00
13 changed files with 440 additions and 266 deletions

View File

@@ -40,31 +40,6 @@ uv run ruff check
nix fmt
```
## Pre-Commit Checks (REQUIRED)
**IMPORTANT: Always run these checks before committing code. CI will fail if these don't pass.**
```bash
# 1. Type checking - MUST pass with 0 errors
uv run basedpyright
# 2. Linting - MUST pass
uv run ruff check
# 3. Formatting - MUST be applied
nix fmt
# 4. Tests - MUST pass
uv run pytest
```
Run all checks in sequence:
```bash
uv run basedpyright && uv run ruff check && nix fmt && uv run pytest
```
If `nix fmt` changes any files, stage them before committing. The CI runs `nix flake check` which verifies formatting, linting, and runs Rust tests.
## Architecture
### Node Composition

View File

@@ -71,35 +71,36 @@ export interface Instance {
};
}
interface RawNodeProfile {
modelId?: string;
chipId?: string;
friendlyName?: string;
networkInterfaces?: Array<{
name?: string;
ipAddress?: string;
addresses?: Array<{ address?: string } | string>;
ipv4?: string;
ipv6?: string;
ipAddresses?: string[];
ips?: string[];
}>;
memory?: {
ramTotal?: { inBytes: number };
ramAvailable?: { inBytes: number };
swapTotal?: { inBytes: number };
swapAvailable?: { inBytes: number };
};
system?: {
gpuUsage?: number;
temp?: number;
sysPower?: number;
};
// Split state interfaces
interface RawNodeIdentity {
modelId: string;
chipId: string;
friendlyName: string;
}
interface RawNodeMemory {
ramTotal: { inBytes: number };
ramAvailable: { inBytes: number };
swapTotal: { inBytes: number };
swapAvailable: { inBytes: number };
}
interface RawNodeSystem {
gpuUsage?: number;
temp?: number;
sysPower?: number;
pcpuUsage?: number;
ecpuUsage?: number;
anePower?: number;
}
interface RawNetworkInterface {
name: string;
ipAddress: string;
}
interface RawTopologyNode {
nodeId: string;
nodeProfile: RawNodeProfile;
}
interface RawTopologyConnection {
@@ -115,8 +116,6 @@ interface RawTopology {
connections?: RawTopologyConnection[];
}
type RawNodeProfiles = Record<string, RawNodeProfile>;
export interface DownloadProgress {
totalBytes: number;
downloadedBytes: number;
@@ -171,7 +170,11 @@ interface RawStateResponse {
>;
runners?: Record<string, unknown>;
downloads?: Record<string, unknown[]>;
nodeProfiles?: RawNodeProfiles;
// Split state fields
nodeIdentities?: Record<string, RawNodeIdentity>;
nodeMemories?: Record<string, RawNodeMemory>;
nodeSystems?: Record<string, RawNodeSystem>;
nodeNetworks?: Record<string, RawNetworkInterface[]>;
}
export interface MessageAttachment {
@@ -208,66 +211,41 @@ const STORAGE_KEY = "exo-conversations";
function transformTopology(
raw: RawTopology,
profiles?: RawNodeProfiles,
identities?: Record<string, RawNodeIdentity>,
memories?: Record<string, RawNodeMemory>,
systems?: Record<string, RawNodeSystem>,
networks?: Record<string, RawNetworkInterface[]>,
): TopologyData {
const nodes: Record<string, NodeInfo> = {};
const edges: TopologyEdge[] = [];
for (const node of raw.nodes || []) {
const mergedProfile = profiles?.[node.nodeId];
const profile = { ...(node.nodeProfile ?? {}), ...(mergedProfile ?? {}) };
const ramTotal = profile?.memory?.ramTotal?.inBytes ?? 0;
const ramAvailable = profile?.memory?.ramAvailable?.inBytes ?? 0;
// Get split state fields (may be undefined if events haven't arrived yet)
const identity = identities?.[node.nodeId];
const memory = memories?.[node.nodeId];
const system = systems?.[node.nodeId];
const network = networks?.[node.nodeId];
const ramTotal = memory?.ramTotal?.inBytes ?? 0;
const ramAvailable = memory?.ramAvailable?.inBytes ?? 0;
const ramUsage = Math.max(ramTotal - ramAvailable, 0);
const networkInterfaces = (profile?.networkInterfaces || []).map(
(iface) => {
const addresses: string[] = [];
if (iface.ipAddress && typeof iface.ipAddress === "string") {
addresses.push(iface.ipAddress);
}
if (Array.isArray(iface.addresses)) {
for (const addr of iface.addresses) {
if (typeof addr === "string") addresses.push(addr);
else if (addr && typeof addr === "object" && addr.address)
addresses.push(addr.address);
}
}
if (Array.isArray(iface.ipAddresses)) {
addresses.push(
...iface.ipAddresses.filter(
(a): a is string => typeof a === "string",
),
);
}
if (Array.isArray(iface.ips)) {
addresses.push(
...iface.ips.filter((a): a is string => typeof a === "string"),
);
}
if (iface.ipv4 && typeof iface.ipv4 === "string")
addresses.push(iface.ipv4);
if (iface.ipv6 && typeof iface.ipv6 === "string")
addresses.push(iface.ipv6);
return {
name: iface.name,
addresses: Array.from(new Set(addresses)),
};
},
);
const networkInterfaces = (network ?? []).map((iface) => ({
name: iface.name,
addresses: [iface.ipAddress],
}));
const ipToInterface: Record<string, string> = {};
for (const iface of networkInterfaces) {
for (const addr of iface.addresses || []) {
ipToInterface[addr] = iface.name ?? "";
for (const addr of iface.addresses) {
ipToInterface[addr] = iface.name;
}
}
nodes[node.nodeId] = {
system_info: {
model_id: profile?.modelId ?? "Unknown",
chip: profile?.chipId,
model_id: identity?.modelId ?? "Unknown",
chip: identity?.chipId,
memory: ramTotal,
},
network_interfaces: networkInterfaces,
@@ -278,17 +256,15 @@ function transformTopology(
ram_total: ramTotal,
},
temp:
profile?.system?.temp !== undefined
? { gpu_temp_avg: profile.system.temp }
system?.temp !== undefined
? { gpu_temp_avg: system.temp }
: undefined,
gpu_usage:
profile?.system?.gpuUsage !== undefined
? [0, profile.system.gpuUsage]
: undefined,
sys_power: profile?.system?.sysPower,
system?.gpuUsage !== undefined ? [0, system.gpuUsage] : undefined,
sys_power: system?.sysPower,
},
last_macmon_update: Date.now() / 1000,
friendly_name: profile?.friendlyName,
friendly_name: identity?.friendlyName,
};
}
@@ -868,7 +844,13 @@ class AppStore {
const data: RawStateResponse = await response.json();
if (data.topology) {
this.topologyData = transformTopology(data.topology, data.nodeProfiles);
this.topologyData = transformTopology(
data.topology,
data.nodeIdentities,
data.nodeMemories,
data.nodeSystems,
data.nodeNetworks,
);
}
if (data.instances) {
this.instances = data.instances;

View File

@@ -599,9 +599,8 @@ class API:
"""Calculate total available memory across all nodes in bytes."""
total_available = Memory()
for node in self.state.topology.list_nodes():
if node.node_profile is not None:
total_available += node.node_profile.memory.ram_available
for memory in self.state.node_memories.values():
total_available += memory.ram_available
return total_available

View File

@@ -113,6 +113,7 @@ def place_instance(
node.node_profile.memory.ram_available
for node in cycle
if node.node_profile is not None
and node.node_profile.memory is not None
),
start=Memory(),
),

View File

@@ -25,7 +25,10 @@ class NodeWithProfile(BaseModel):
def narrow_all_nodes(nodes: list[NodeInfo]) -> TypeGuard[list[NodeWithProfile]]:
return all(node.node_profile is not None for node in nodes)
return all(
node.node_profile is not None and node.node_profile.memory is not None
for node in nodes
)
def filter_cycles_by_memory(
@@ -36,8 +39,14 @@ def filter_cycles_by_memory(
if not narrow_all_nodes(cycle):
continue
# narrow_all_nodes guarantees memory is not None
total_mem = sum(
(node.node_profile.memory.ram_available for node in cycle), start=Memory()
(
node.node_profile.memory.ram_available
for node in cycle
if node.node_profile.memory is not None
),
start=Memory(),
)
if total_mem >= required_memory:
filtered_cycles.append(cast(list[NodeInfo], cycle))
@@ -53,8 +62,13 @@ def get_shard_assignments_for_pipeline_parallel(
model_meta: ModelMetadata,
selected_cycle: list[NodeWithProfile],
):
# NodeWithProfile guarantees memory is not None
cycle_memory = sum(
(node.node_profile.memory.ram_available for node in selected_cycle),
(
node.node_profile.memory.ram_available
for node in selected_cycle
if node.node_profile.memory is not None
),
start=Memory(),
)
total_layers = model_meta.n_layers
@@ -67,6 +81,8 @@ def get_shard_assignments_for_pipeline_parallel(
if i == len(selected_cycle) - 1:
node_layers = total_layers - layers_assigned
else:
# NodeWithProfile guarantees memory is not None
assert node.node_profile.memory is not None
node_layers = round(
total_layers
* (

View File

@@ -19,16 +19,13 @@ from exo.shared.types.events import (
ForwarderEvent,
IndexedEvent,
InstanceCreated,
NodePerformanceMeasured,
NodeIdentityMeasured,
NodeMemoryMeasured,
TaskCreated,
)
from exo.shared.types.memory import Memory
from exo.shared.types.models import ModelId, ModelMetadata
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NodePerformanceProfile,
SystemPerformanceProfile,
)
from exo.shared.types.profiling import MemoryPerformanceProfile
from exo.shared.types.tasks import ChatCompletion as ChatCompletionTask
from exo.shared.types.tasks import TaskStatus
from exo.shared.types.worker.instances import (
@@ -75,29 +72,39 @@ async def test_master():
tg.start_soon(master.run)
sender_node_id = NodeId(f"{keypair.to_peer_id().to_base58()}_sender")
# inject a NodePerformanceProfile event
logger.info("inject a NodePerformanceProfile event")
# inject NodeIdentityMeasured and NodeMemoryMeasured events
logger.info("inject NodeIdentityMeasured event")
await local_event_sender.send(
ForwarderEvent(
origin_idx=0,
origin=sender_node_id,
session=session_id,
event=(
NodePerformanceMeasured(
NodeIdentityMeasured(
when=str(datetime.now(tz=timezone.utc)),
node_id=node_id,
node_profile=NodePerformanceProfile(
model_id="maccy",
chip_id="arm",
friendly_name="test",
memory=MemoryPerformanceProfile(
ram_total=Memory.from_bytes(678948 * 1024),
ram_available=Memory.from_bytes(678948 * 1024),
swap_total=Memory.from_bytes(0),
swap_available=Memory.from_bytes(0),
),
network_interfaces=[],
system=SystemPerformanceProfile(),
model_id="maccy",
chip_id="arm",
friendly_name="test",
)
),
)
)
logger.info("inject NodeMemoryMeasured event")
await local_event_sender.send(
ForwarderEvent(
origin_idx=1,
origin=sender_node_id,
session=session_id,
event=(
NodeMemoryMeasured(
when=str(datetime.now(tz=timezone.utc)),
node_id=node_id,
memory=MemoryPerformanceProfile(
ram_total=Memory.from_bytes(678948 * 1024),
ram_available=Memory.from_bytes(678948 * 1024),
swap_total=Memory.from_bytes(0),
swap_available=Memory.from_bytes(0),
),
)
),
@@ -108,7 +115,7 @@ async def test_master():
logger.info("wait for initial topology event")
while len(list(master.state.topology.list_nodes())) == 0:
await anyio.sleep(0.001)
while len(master.state.node_profiles) == 0:
while len(master.state.node_identities) == 0:
await anyio.sleep(0.001)
logger.info("inject a CreateInstance Command")
@@ -155,17 +162,19 @@ async def test_master():
),
)
)
while len(_get_events()) < 3:
while len(_get_events()) < 4:
await anyio.sleep(0.01)
events = _get_events()
assert len(events) == 3
assert len(events) == 4
assert events[0].idx == 0
assert events[1].idx == 1
assert events[2].idx == 2
assert isinstance(events[0].event, NodePerformanceMeasured)
assert isinstance(events[1].event, InstanceCreated)
created_instance = events[1].event.instance
assert events[3].idx == 3
assert isinstance(events[0].event, NodeIdentityMeasured)
assert isinstance(events[1].event, NodeMemoryMeasured)
assert isinstance(events[2].event, InstanceCreated)
created_instance = events[2].event.instance
assert isinstance(created_instance, MlxRingInstance)
runner_id = list(created_instance.shard_assignments.runner_to_shard.keys())[0]
# Validate the shard assignments
@@ -197,10 +206,10 @@ async def test_master():
assert len(created_instance.hosts_by_node[node_id]) == 1
assert created_instance.hosts_by_node[node_id][0].ip == "0.0.0.0"
assert created_instance.ephemeral_port > 0
assert isinstance(events[2].event, TaskCreated)
assert events[2].event.task.task_status == TaskStatus.Pending
assert isinstance(events[2].event.task, ChatCompletionTask)
assert events[2].event.task.task_params == ChatCompletionTaskParams(
assert isinstance(events[3].event, TaskCreated)
assert events[3].event.task.task_status == TaskStatus.Pending
assert isinstance(events[3].event.task, ChatCompletionTask)
assert events[3].event.task.task_params == ChatCompletionTaskParams(
model="llama-3.2-1b",
messages=[
ChatCompletionMessage(role="user", content="Hello, how are you?")

View File

@@ -13,8 +13,10 @@ from exo.shared.types.events import (
InstanceDeleted,
NodeCreated,
NodeDownloadProgress,
NodeIdentityMeasured,
NodeMemoryMeasured,
NodePerformanceMeasured,
NodeNetworkMeasured,
NodeSystemMeasured,
NodeTimedOut,
RunnerDeleted,
RunnerStatusUpdated,
@@ -27,7 +29,13 @@ from exo.shared.types.events import (
TopologyEdgeCreated,
TopologyEdgeDeleted,
)
from exo.shared.types.profiling import NodePerformanceProfile, SystemPerformanceProfile
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NetworkInterfaceInfo,
NodeIdentity,
NodePerformanceProfile,
SystemPerformanceProfile,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.topology import NodeInfo
@@ -51,8 +59,12 @@ def event_apply(event: Event, state: State) -> State:
return apply_topology_node_created(event, state)
case NodeTimedOut():
return apply_node_timed_out(event, state)
case NodePerformanceMeasured():
return apply_node_performance_measured(event, state)
case NodeIdentityMeasured():
return apply_node_identity_measured(event, state)
case NodeSystemMeasured():
return apply_node_system_measured(event, state)
case NodeNetworkMeasured():
return apply_node_network_measured(event, state)
case NodeDownloadProgress():
return apply_node_download_progress(event, state)
case NodeMemoryMeasured():
@@ -190,8 +202,19 @@ def apply_runner_deleted(event: RunnerDeleted, state: State) -> State:
def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
topology = copy.copy(state.topology)
state.topology.remove_node(event.node_id)
node_profiles = {
key: value for key, value in state.node_profiles.items() if key != event.node_id
node_identities = {
key: value
for key, value in state.node_identities.items()
if key != event.node_id
}
node_memories = {
key: value for key, value in state.node_memories.items() if key != event.node_id
}
node_systems = {
key: value for key, value in state.node_systems.items() if key != event.node_id
}
node_networks = {
key: value for key, value in state.node_networks.items() if key != event.node_id
}
last_seen = {
key: value for key, value in state.last_seen.items() if key != event.node_id
@@ -199,32 +222,120 @@ def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
return state.model_copy(
update={
"topology": topology,
"node_profiles": node_profiles,
"node_identities": node_identities,
"node_memories": node_memories,
"node_systems": node_systems,
"node_networks": node_networks,
"last_seen": last_seen,
}
)
def apply_node_performance_measured(
event: NodePerformanceMeasured, state: State
) -> State:
new_profiles: Mapping[NodeId, NodePerformanceProfile] = {
**state.node_profiles,
event.node_id: event.node_profile,
def _reconstruct_profile(
node_id: NodeId,
state: State,
*,
identity: NodeIdentity | None = None,
memory: MemoryPerformanceProfile | None = None,
system: SystemPerformanceProfile | None = None,
network_interfaces: list[NetworkInterfaceInfo] | None = None,
) -> NodePerformanceProfile:
"""Reconstruct a NodePerformanceProfile from split state storage.
Uses provided overrides, falling back to state values.
"""
ident = identity or state.node_identities.get(node_id)
mem = memory or state.node_memories.get(node_id)
sys = system or state.node_systems.get(node_id)
nets = (
network_interfaces
if network_interfaces is not None
else state.node_networks.get(node_id, [])
)
return NodePerformanceProfile(
model_id=ident.model_id if ident else None,
chip_id=ident.chip_id if ident else None,
friendly_name=ident.friendly_name if ident else None,
memory=mem,
network_interfaces=nets,
system=sys,
)
def apply_node_identity_measured(event: NodeIdentityMeasured, state: State) -> State:
topology = copy.copy(state.topology)
identity = NodeIdentity(
model_id=event.model_id,
chip_id=event.chip_id,
friendly_name=event.friendly_name,
)
new_identities: Mapping[NodeId, NodeIdentity] = {
**state.node_identities,
event.node_id: identity,
}
last_seen: Mapping[NodeId, datetime] = {
**state.last_seen,
event.node_id: datetime.fromisoformat(event.when),
}
state = state.model_copy(update={"node_profiles": new_profiles})
topology = copy.copy(state.topology)
# TODO: NodeCreated
if not topology.contains_node(event.node_id):
topology.add_node(NodeInfo(node_id=event.node_id))
topology.update_node_profile(event.node_id, event.node_profile)
reconstructed = _reconstruct_profile(event.node_id, state, identity=identity)
topology.update_node_profile(event.node_id, reconstructed)
return state.model_copy(
update={
"node_profiles": new_profiles,
"node_identities": new_identities,
"topology": topology,
"last_seen": last_seen,
}
)
def apply_node_system_measured(event: NodeSystemMeasured, state: State) -> State:
topology = copy.copy(state.topology)
new_systems: Mapping[NodeId, SystemPerformanceProfile] = {
**state.node_systems,
event.node_id: event.system,
}
last_seen: Mapping[NodeId, datetime] = {
**state.last_seen,
event.node_id: datetime.fromisoformat(event.when),
}
if not topology.contains_node(event.node_id):
topology.add_node(NodeInfo(node_id=event.node_id))
reconstructed = _reconstruct_profile(event.node_id, state, system=event.system)
topology.update_node_profile(event.node_id, reconstructed)
return state.model_copy(
update={
"node_systems": new_systems,
"topology": topology,
"last_seen": last_seen,
}
)
def apply_node_network_measured(event: NodeNetworkMeasured, state: State) -> State:
topology = copy.copy(state.topology)
new_networks: Mapping[NodeId, list[NetworkInterfaceInfo]] = {
**state.node_networks,
event.node_id: event.network_interfaces,
}
last_seen: Mapping[NodeId, datetime] = {
**state.last_seen,
event.node_id: datetime.fromisoformat(event.when),
}
if not topology.contains_node(event.node_id):
topology.add_node(NodeInfo(node_id=event.node_id))
reconstructed = _reconstruct_profile(
event.node_id, state, network_interfaces=event.network_interfaces
)
topology.update_node_profile(event.node_id, reconstructed)
return state.model_copy(
update={
"node_networks": new_networks,
"topology": topology,
"last_seen": last_seen,
}
@@ -232,57 +343,26 @@ def apply_node_performance_measured(
def apply_node_memory_measured(event: NodeMemoryMeasured, state: State) -> State:
existing = state.node_profiles.get(event.node_id)
topology = copy.copy(state.topology)
if existing is None:
created = NodePerformanceProfile(
model_id="unknown",
chip_id="unknown",
friendly_name="Unknown",
memory=event.memory,
network_interfaces=[],
system=SystemPerformanceProfile(
# TODO: flops_fp16=0.0,
gpu_usage=0.0,
temp=0.0,
sys_power=0.0,
pcpu_usage=0.0,
ecpu_usage=0.0,
ane_power=0.0,
),
)
created_profiles: Mapping[NodeId, NodePerformanceProfile] = {
**state.node_profiles,
event.node_id: created,
}
last_seen: Mapping[NodeId, datetime] = {
**state.last_seen,
event.node_id: datetime.fromisoformat(event.when),
}
if not topology.contains_node(event.node_id):
topology.add_node(NodeInfo(node_id=event.node_id))
# TODO: NodeCreated
topology.update_node_profile(event.node_id, created)
return state.model_copy(
update={
"node_profiles": created_profiles,
"topology": topology,
"last_seen": last_seen,
}
)
updated = existing.model_copy(update={"memory": event.memory})
updated_profiles: Mapping[NodeId, NodePerformanceProfile] = {
**state.node_profiles,
event.node_id: updated,
new_memories: Mapping[NodeId, MemoryPerformanceProfile] = {
**state.node_memories,
event.node_id: event.memory,
}
last_seen: Mapping[NodeId, datetime] = {
**state.last_seen,
event.node_id: datetime.fromisoformat(event.when),
}
# TODO: NodeCreated
if not topology.contains_node(event.node_id):
topology.add_node(NodeInfo(node_id=event.node_id))
topology.update_node_profile(event.node_id, updated)
reconstructed = _reconstruct_profile(event.node_id, state, memory=event.memory)
topology.update_node_profile(event.node_id, reconstructed)
return state.model_copy(
update={"node_profiles": updated_profiles, "topology": topology}
update={
"node_memories": new_memories,
"topology": topology,
"last_seen": last_seen,
}
)

View File

@@ -2,10 +2,14 @@ from datetime import datetime
from pydantic import Field
from exo.shared.topology import Connection, NodePerformanceProfile
from exo.shared.topology import Connection
from exo.shared.types.chunks import GenerationChunk
from exo.shared.types.common import CommandId, Id, NodeId, SessionId
from exo.shared.types.profiling import MemoryPerformanceProfile
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NetworkInterfaceInfo,
SystemPerformanceProfile,
)
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.worker.downloads import DownloadProgress
from exo.shared.types.worker.instances import Instance, InstanceId
@@ -85,13 +89,35 @@ class NodeTimedOut(BaseEvent):
node_id: NodeId
class NodePerformanceMeasured(BaseEvent):
class NodeIdentityMeasured(BaseEvent):
"""Static identity info - emitted once at startup."""
node_id: NodeId
when: str # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
node_profile: NodePerformanceProfile
model_id: str
chip_id: str
friendly_name: str
class NodeSystemMeasured(BaseEvent):
"""Dynamic system metrics (GPU, temp, power) - emitted at 1s intervals."""
node_id: NodeId
when: str # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
system: SystemPerformanceProfile
class NodeNetworkMeasured(BaseEvent):
"""Semi-static network interface info - emitted at 30s intervals."""
node_id: NodeId
when: str # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
network_interfaces: list[NetworkInterfaceInfo]
class NodeMemoryMeasured(BaseEvent):
"""Dynamic memory metrics - emitted at 0.5s intervals."""
node_id: NodeId
when: str # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
memory: MemoryPerformanceProfile
@@ -127,7 +153,9 @@ Event = (
| RunnerDeleted
| NodeCreated
| NodeTimedOut
| NodePerformanceMeasured
| NodeIdentityMeasured
| NodeSystemMeasured
| NodeNetworkMeasured
| NodeMemoryMeasured
| NodeDownloadProgress
| ChunkGenerated

View File

@@ -52,13 +52,21 @@ class NetworkInterfaceInfo(CamelCaseModel):
ip_address: str
class NodePerformanceProfile(CamelCaseModel):
class NodeIdentity(CamelCaseModel):
"""Static identity info for a node."""
model_id: str
chip_id: str
friendly_name: str
memory: MemoryPerformanceProfile
class NodePerformanceProfile(CamelCaseModel):
model_id: str | None = None
chip_id: str | None = None
friendly_name: str | None = None
memory: MemoryPerformanceProfile | None = None
network_interfaces: list[NetworkInterfaceInfo] = []
system: SystemPerformanceProfile
system: SystemPerformanceProfile | None = None
class ConnectionProfile(CamelCaseModel):

View File

@@ -7,7 +7,12 @@ from pydantic.alias_generators import to_camel
from exo.shared.topology import Topology, TopologySnapshot
from exo.shared.types.common import NodeId
from exo.shared.types.profiling import NodePerformanceProfile
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NetworkInterfaceInfo,
NodeIdentity,
SystemPerformanceProfile,
)
from exo.shared.types.tasks import Task, TaskId
from exo.shared.types.worker.downloads import DownloadProgress
from exo.shared.types.worker.instances import Instance, InstanceId
@@ -35,7 +40,10 @@ class State(CamelCaseModel):
runners: Mapping[RunnerId, RunnerStatus] = {}
downloads: Mapping[NodeId, Sequence[DownloadProgress]] = {}
tasks: Mapping[TaskId, Task] = {}
node_profiles: Mapping[NodeId, NodePerformanceProfile] = {}
node_identities: Mapping[NodeId, NodeIdentity] = {}
node_memories: Mapping[NodeId, MemoryPerformanceProfile] = {}
node_systems: Mapping[NodeId, SystemPerformanceProfile] = {}
node_networks: Mapping[NodeId, list[NetworkInterfaceInfo]] = {}
last_seen: Mapping[NodeId, datetime] = {}
topology: Topology = Field(default_factory=Topology)
last_event_applied_idx: int = Field(default=-1, ge=-1)

View File

@@ -16,8 +16,10 @@ from exo.shared.types.events import (
ForwarderEvent,
IndexedEvent,
NodeDownloadProgress,
NodeIdentityMeasured,
NodeMemoryMeasured,
NodePerformanceMeasured,
NodeNetworkMeasured,
NodeSystemMeasured,
TaskCreated,
TaskStatusUpdated,
TopologyEdgeCreated,
@@ -25,7 +27,11 @@ from exo.shared.types.events import (
)
from exo.shared.types.models import ModelId
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.profiling import MemoryPerformanceProfile, NodePerformanceProfile
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NetworkInterfaceInfo,
SystemPerformanceProfile,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import (
CreateRunner,
@@ -51,7 +57,13 @@ from exo.worker.download.download_utils import (
from exo.worker.download.shard_downloader import RepoDownloadProgress, ShardDownloader
from exo.worker.plan import plan
from exo.worker.runner.runner_supervisor import RunnerSupervisor
from exo.worker.utils import start_polling_memory_metrics, start_polling_node_metrics
from exo.worker.utils import (
IdentityMetrics,
start_polling_identity_metrics,
start_polling_memory_metrics,
start_polling_network_metrics,
start_polling_system_metrics,
)
from exo.worker.utils.net_profile import check_reachable
@@ -98,37 +110,51 @@ class Worker:
async def run(self):
logger.info("Starting Worker")
# TODO: CLEANUP HEADER
async def resource_monitor_callback(
node_performance_profile: NodePerformanceProfile,
) -> None:
async def identity_callback(identity: IdentityMetrics) -> None:
await self.event_sender.send(
NodePerformanceMeasured(
NodeIdentityMeasured(
node_id=self.node_id,
node_profile=node_performance_profile,
model_id=identity.model_id,
chip_id=identity.chip_id,
friendly_name=identity.friendly_name,
when=str(datetime.now(tz=timezone.utc)),
),
)
async def memory_monitor_callback(
memory_profile: MemoryPerformanceProfile,
) -> None:
async def system_callback(system: SystemPerformanceProfile) -> None:
await self.event_sender.send(
NodeSystemMeasured(
node_id=self.node_id,
system=system,
when=str(datetime.now(tz=timezone.utc)),
),
)
async def network_callback(interfaces: list[NetworkInterfaceInfo]) -> None:
await self.event_sender.send(
NodeNetworkMeasured(
node_id=self.node_id,
network_interfaces=interfaces,
when=str(datetime.now(tz=timezone.utc)),
),
)
async def memory_callback(memory: MemoryPerformanceProfile) -> None:
await self.event_sender.send(
NodeMemoryMeasured(
node_id=self.node_id,
memory=memory_profile,
memory=memory,
when=str(datetime.now(tz=timezone.utc)),
)
)
# END CLEANUP
async with create_task_group() as tg:
self._tg = tg
tg.start_soon(self.plan_step)
tg.start_soon(start_polling_node_metrics, resource_monitor_callback)
tg.start_soon(start_polling_memory_metrics, memory_monitor_callback)
tg.start_soon(start_polling_identity_metrics, identity_callback)
tg.start_soon(start_polling_system_metrics, system_callback)
tg.start_soon(start_polling_network_metrics, network_callback)
tg.start_soon(start_polling_memory_metrics, memory_callback)
tg.start_soon(self._emit_existing_download_progress)
tg.start_soon(self._connection_message_event_writer)
tg.start_soon(self._resend_out_for_delivery)

View File

@@ -1,6 +1,15 @@
from .profile import start_polling_memory_metrics, start_polling_node_metrics
from .profile import (
IdentityMetrics,
start_polling_identity_metrics,
start_polling_memory_metrics,
start_polling_network_metrics,
start_polling_system_metrics,
)
__all__ = [
"start_polling_node_metrics",
"IdentityMetrics",
"start_polling_identity_metrics",
"start_polling_memory_metrics",
"start_polling_network_metrics",
"start_polling_system_metrics",
]

View File

@@ -1,6 +1,7 @@
import asyncio
import os
import platform
from dataclasses import dataclass
from typing import Any, Callable, Coroutine
import anyio
@@ -9,7 +10,7 @@ from loguru import logger
from exo.shared.types.memory import Memory
from exo.shared.types.profiling import (
MemoryPerformanceProfile,
NodePerformanceProfile,
NetworkInterfaceInfo,
SystemPerformanceProfile,
)
@@ -27,6 +28,13 @@ from .system_info import (
)
@dataclass(frozen=True)
class IdentityMetrics:
model_id: str
chip_id: str
friendly_name: str
async def get_metrics_async() -> Metrics | None:
"""Return detailed Metrics on macOS or a minimal fallback elsewhere."""
@@ -67,48 +75,73 @@ async def start_polling_memory_metrics(
await anyio.sleep(poll_interval_s)
async def start_polling_node_metrics(
callback: Callable[[NodePerformanceProfile], Coroutine[Any, Any, None]],
):
poll_interval_s = 1.0
async def start_polling_identity_metrics(
callback: Callable[[IdentityMetrics], Coroutine[Any, Any, None]],
*,
poll_interval_s: float = 30.0,
) -> None:
"""Continuously poll and emit identity metrics at 30s intervals."""
while True:
try:
model_id, chip_id = await get_model_and_chip()
friendly_name = await get_friendly_name()
await callback(
IdentityMetrics(
model_id=model_id,
chip_id=chip_id,
friendly_name=friendly_name,
)
)
except Exception as e:
logger.opt(exception=e).error("Failed to emit identity metrics")
finally:
await anyio.sleep(poll_interval_s)
async def start_polling_system_metrics(
callback: Callable[[SystemPerformanceProfile], Coroutine[Any, Any, None]],
*,
poll_interval_s: float = 1.0,
) -> None:
"""Continuously poll and emit system metrics (GPU, temp, power) at 1s intervals."""
while True:
try:
metrics = await get_metrics_async()
if metrics is None:
return
network_interfaces = get_network_interfaces()
# these awaits could be joined but realistically they should be cached
model_id, chip_id = await get_model_and_chip()
friendly_name = await get_friendly_name()
# do the memory profile last to get a fresh reading to not conflict with the other memory profiling loop
memory_profile = get_memory_profile()
await callback(
NodePerformanceProfile(
model_id=model_id,
chip_id=chip_id,
friendly_name=friendly_name,
network_interfaces=network_interfaces,
memory=memory_profile,
system=SystemPerformanceProfile(
gpu_usage=metrics.gpu_usage[1],
temp=metrics.temp.gpu_temp_avg,
sys_power=metrics.sys_power,
pcpu_usage=metrics.pcpu_usage[1],
ecpu_usage=metrics.ecpu_usage[1],
ane_power=metrics.ane_power,
),
SystemPerformanceProfile(
gpu_usage=metrics.gpu_usage[1],
temp=metrics.temp.gpu_temp_avg,
sys_power=metrics.sys_power,
pcpu_usage=metrics.pcpu_usage[1],
ecpu_usage=metrics.ecpu_usage[1],
ane_power=metrics.ane_power,
)
)
except asyncio.TimeoutError:
logger.warning(
"[resource_monitor] Operation timed out after 30s, skipping this cycle."
"[system_monitor] Operation timed out after 30s, skipping this cycle."
)
except MacMonError as e:
logger.opt(exception=e).error("Resource Monitor encountered error")
logger.opt(exception=e).error("System Monitor encountered error")
return
finally:
await anyio.sleep(poll_interval_s)
async def start_polling_network_metrics(
callback: Callable[[list[NetworkInterfaceInfo]], Coroutine[Any, Any, None]],
*,
poll_interval_s: float = 30.0,
) -> None:
"""Continuously poll and emit network interface info at 30s intervals."""
while True:
try:
network_interfaces = get_network_interfaces()
await callback(network_interfaces)
except Exception as e:
logger.opt(exception=e).error("Network Monitor encountered error")
finally:
await anyio.sleep(poll_interval_s)