Fast memory profiling

Co-authored-by: Evan <evanev7@gmail.com>
This commit is contained in:
Alex Cheema
2025-10-07 16:23:51 +01:00
committed by GitHub
parent e01f9cf739
commit 84dfc8a738
7 changed files with 198 additions and 79 deletions

View File

@@ -1913,18 +1913,33 @@
const result = {};
if (!clusterState) return result;
// Helper: get numeric bytes from various shapes (number | {in_bytes}|{inBytes})
// Normalize a byte count that may arrive as a plain number or as an
// object shaped { in_bytes } / { inBytes }; anything else yields 0.
function getBytes(value) {
  if (typeof value === 'number') {
    return value;
  }
  if (value !== null && typeof value === 'object') {
    // snake_case key wins when both are present, matching the backend schema.
    const nested = typeof value.in_bytes === 'number' ? value.in_bytes : value.inBytes;
    if (typeof nested === 'number') {
      return nested;
    }
  }
  return 0;
}
// Helper: pick from snake_case or camelCase
// Read a property that may be stored under either a snake_case or a
// camelCase key. Returns `fallback` when `obj` is falsy or neither key
// is defined; a present-but-falsy value (0, '', false) is still returned.
const pick = (obj, snake, camel, fallback = undefined) => {
  if (!obj) {
    return fallback;
  }
  const snakeValue = obj[snake];
  if (snakeValue !== undefined) {
    return snakeValue;
  }
  const camelValue = obj[camel];
  return camelValue !== undefined ? camelValue : fallback;
};
// Process nodes from topology or fallback to node_profiles directly
let nodesToProcess = {};
if (clusterState.topology && clusterState.topology.nodes) {
// Use topology.nodes array
if (clusterState.topology && Array.isArray(clusterState.topology.nodes)) {
clusterState.topology.nodes.forEach(node => {
if (node.node_id && node.node_profile) {
nodesToProcess[node.node_id] = node.node_profile;
}
});
} else if (clusterState.node_profiles) {
// Fallback to node_profiles directly
nodesToProcess = clusterState.node_profiles;
}
@@ -1933,53 +1948,64 @@
const nodeProfile = nodesToProcess[nodeId];
if (!nodeProfile) continue;
// Extract memory information
// Extract memory information (supports new nested schema and old flat numbers)
let memBytesTotal = 0;
let memBytesUsed = 0;
if (nodeProfile.memory) {
memBytesTotal = nodeProfile.memory.ram_total || 0;
const ramAvailable = nodeProfile.memory.ram_available || 0;
memBytesUsed = Math.max(memBytesTotal - ramAvailable, 0);
}
let memBytesAvailable = 0;
const memory = nodeProfile.memory || {};
const ramTotalVal = pick(memory, 'ram_total', 'ramTotal');
const ramAvailVal = pick(memory, 'ram_available', 'ramAvailable');
const swapTotalVal = pick(memory, 'swap_total', 'swapTotal');
const swapAvailVal = pick(memory, 'swap_available', 'swapAvailable');
memBytesTotal = getBytes(ramTotalVal);
memBytesAvailable = getBytes(ramAvailVal);
const memBytesUsed = Math.max(memBytesTotal - memBytesAvailable, 0);
// Extract model information
const modelId = nodeProfile.model_id || 'Unknown';
const chipId = nodeProfile.chip_id || '';
const friendlyName = nodeProfile.friendly_name || `${nodeId.substring(0, 8)}...`;
const modelId = pick(nodeProfile, 'model_id', 'modelId', 'Unknown');
const chipId = pick(nodeProfile, 'chip_id', 'chipId', '');
const friendlyName = pick(nodeProfile, 'friendly_name', 'friendlyName', `${nodeId.substring(0, 8)}...`);
// Extract network addresses
// Extract network addresses (support snake_case and camelCase)
const addrList = [];
if (nodeProfile.network_interfaces) {
nodeProfile.network_interfaces.forEach(intf => {
if (intf.ip_address && !intf.ip_address.startsWith('fe80::')) {
// Filter out link-local IPv6 addresses
addrList.push(intf.ip_address);
}
});
}
const netIfacesSnake = nodeProfile.network_interfaces;
const netIfacesCamel = nodeProfile.networkInterfaces;
const interfaces = Array.isArray(netIfacesSnake) ? netIfacesSnake : (Array.isArray(netIfacesCamel) ? netIfacesCamel : []);
interfaces.forEach(intf => {
const ip = intf.ip_address ?? intf.ipAddress;
if (ip && !String(ip).startsWith('fe80::')) {
addrList.push(ip);
}
});
// Transform system metrics to macmon_info format for compatibility
// Transform system metrics to macmon_info format (support snake_case and camelCase)
const systemInfo = nodeProfile.system || {};
const gpuUsage = pick(systemInfo, 'gpu_usage', 'gpuUsage', 0);
const temp = pick(systemInfo, 'temp', 'temp', null);
const sysPower = pick(systemInfo, 'sys_power', 'sysPower', null);
const pcpuUsage = pick(systemInfo, 'pcpu_usage', 'pcpuUsage', 0);
const ecpuUsage = pick(systemInfo, 'ecpu_usage', 'ecpuUsage', 0);
const anePower = pick(systemInfo, 'ane_power', 'anePower', 0);
const flopsFp16 = pick(systemInfo, 'flops_fp16', 'flopsFp16', 0);
const macmonInfo = {
memory: {
ram_total: memBytesTotal,
ram_usage: memBytesUsed,
ram_available: nodeProfile.memory?.ram_available || 0,
swap_total: nodeProfile.memory?.swap_total || 0,
swap_usage: (nodeProfile.memory?.swap_total || 0) - (nodeProfile.memory?.swap_available || 0)
ram_available: memBytesAvailable,
swap_total: getBytes(swapTotalVal),
swap_usage: Math.max(getBytes(swapTotalVal) - getBytes(swapAvailVal), 0)
},
// Convert new format to old format
gpu_usage: systemInfo.gpu_usage ? [0, systemInfo.gpu_usage] : [0, 0],
gpu_usage: [0, typeof gpuUsage === 'number' ? gpuUsage : 0],
temp: {
cpu_temp_avg: systemInfo.temp || null,
gpu_temp_avg: systemInfo.temp || null // Using same temp for both in new format
cpu_temp_avg: typeof temp === 'number' ? temp : null,
gpu_temp_avg: typeof temp === 'number' ? temp : null
},
sys_power: systemInfo.sys_power || null,
pcpu_usage: systemInfo.pcpu_usage ? [0, systemInfo.pcpu_usage] : [0, 0],
ecpu_usage: systemInfo.ecpu_usage ? [0, systemInfo.ecpu_usage] : [0, 0],
ane_power: systemInfo.ane_power || 0,
flops_fp16: systemInfo.flops_fp16 || 0,
sys_power: typeof sysPower === 'number' ? sysPower : null,
pcpu_usage: [0, typeof pcpuUsage === 'number' ? pcpuUsage : 0],
ecpu_usage: [0, typeof ecpuUsage === 'number' ? ecpuUsage : 0],
ane_power: typeof anePower === 'number' ? anePower : 0,
flops_fp16: typeof flopsFp16 === 'number' ? flopsFp16 : 0,
timestamp: new Date().toISOString()
};
@@ -1987,7 +2013,7 @@
mem: memBytesTotal,
addrs: addrList,
last_addr_update: Date.now() / 1000,
system_info: {
system_info: {
model_id: modelId,
chip_id: chipId
},

View File

@@ -12,6 +12,7 @@ from exo.shared.types.events import (
InstanceCreated,
InstanceDeactivated,
InstanceDeleted,
NodeMemoryMeasured,
NodePerformanceMeasured,
RunnerDeleted,
RunnerStatusUpdated,
@@ -25,7 +26,7 @@ from exo.shared.types.events import (
TopologyNodeCreated,
WorkerStatusUpdated,
)
from exo.shared.types.profiling import NodePerformanceProfile
from exo.shared.types.profiling import NodePerformanceProfile, SystemPerformanceProfile
from exo.shared.types.state import State
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.topology import NodeInfo
@@ -49,6 +50,8 @@ def event_apply(event: Event, state: State) -> State:
return apply_instance_deleted(event, state)
case NodePerformanceMeasured():
return apply_node_performance_measured(event, state)
case NodeMemoryMeasured():
return apply_node_memory_measured(event, state)
case RunnerDeleted():
return apply_runner_deleted(event, state)
case RunnerStatusUpdated():
@@ -197,6 +200,51 @@ def apply_node_performance_measured(
return state.model_copy(update={"topology": topology})
def apply_node_memory_measured(event: NodeMemoryMeasured, state: State) -> State:
    """Apply a NodeMemoryMeasured event, refreshing one node's memory profile.

    If the node has no profile yet, a placeholder NodePerformanceProfile is
    synthesized so the memory sample is not dropped; otherwise only the
    ``memory`` field of the existing profile is replaced.  In both cases the
    node is added to the topology if missing and its topology profile is
    updated.  The original two branches duplicated the profile-map and
    topology-update logic verbatim; this version computes the profile first
    and shares the rest.
    """
    existing = state.node_profiles.get(event.node_id)
    if existing is None:
        # No profile yet: synthesize a placeholder carrying only the memory
        # sample; the other fields are filled in by NodePerformanceMeasured.
        profile = NodePerformanceProfile(
            model_id="unknown",
            chip_id="unknown",
            friendly_name="Unknown",
            memory=event.memory,
            network_interfaces=[],
            system=SystemPerformanceProfile(
                flops_fp16=0.0,
                gpu_usage=0.0,
                temp=0.0,
                sys_power=0.0,
                pcpu_usage=0.0,
                ecpu_usage=0.0,
                ane_power=0.0,
            ),
        )
    else:
        profile = existing.model_copy(update={"memory": event.memory})
    profiles: Mapping[NodeId, NodePerformanceProfile] = {
        **state.node_profiles,
        event.node_id: profile,
    }
    # Shallow-copy before mutating so the previous State's topology object
    # is not modified in place.
    topology = copy.copy(state.topology)
    if not topology.contains_node(event.node_id):
        topology.add_node(NodeInfo(node_id=event.node_id))
    topology.update_node_profile(event.node_id, profile)
    return state.model_copy(
        update={"node_profiles": profiles, "topology": topology}
    )
def apply_worker_status_updated(event: WorkerStatusUpdated, state: State) -> State:
new_node_status: Mapping[NodeId, WorkerStatus] = {
**state.node_status,

View File

@@ -6,6 +6,7 @@ from pydantic import Field
from exo.shared.topology import Connection, NodePerformanceProfile
from exo.shared.types.chunks import CommandId, GenerationChunk
from exo.shared.types.common import ID, NodeId
from exo.shared.types.profiling import MemoryPerformanceProfile
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.worker.common import InstanceId, WorkerStatus
from exo.shared.types.worker.instances import Instance
@@ -51,6 +52,7 @@ class EventType(str, Enum):
# Node Performance Events
WorkerStatusUpdated = "WorkerStatusUpdated"
NodePerformanceMeasured = "NodePerformanceMeasured"
NodeMemoryMeasured = "NodeMemoryMeasured"
# Topology Events
TopologyNodeCreated = "TopologyNodeCreated"
@@ -116,6 +118,11 @@ class NodePerformanceMeasured(BaseEvent):
node_profile: NodePerformanceProfile
class NodeMemoryMeasured(BaseEvent):
    # Lightweight event carrying only a fresh memory sample for one node,
    # unlike NodePerformanceMeasured which carries a full node_profile.
    node_id: NodeId
    memory: MemoryPerformanceProfile
class WorkerStatusUpdated(BaseEvent):
node_id: NodeId
node_state: WorkerStatus
@@ -151,6 +158,7 @@ Event = Union[
RunnerStatusUpdated,
RunnerDeleted,
NodePerformanceMeasured,
NodeMemoryMeasured,
WorkerStatusUpdated,
ChunkGenerated,
TopologyNodeCreated,
@@ -173,6 +181,7 @@ Event = Union[
EventType.RunnerStatusUpdated: RunnerStatusUpdated,
EventType.RunnerDeleted: RunnerDeleted,
EventType.NodePerformanceMeasured: NodePerformanceMeasured,
EventType.NodeMemoryMeasured: NodeMemoryMeasured,
EventType.WorkerStatusUpdated: WorkerStatusUpdated,
EventType.ChunkGenerated: ChunkGenerated,
EventType.TopologyNodeCreated: TopologyNodeCreated,

View File

@@ -21,6 +21,7 @@ from exo.shared.types.events import (
ForwarderEvent,
IndexedEvent,
InstanceDeleted,
NodeMemoryMeasured,
NodePerformanceMeasured,
RunnerDeleted,
RunnerStatusUpdated,
@@ -32,7 +33,7 @@ from exo.shared.types.events import (
)
from exo.shared.types.memory import Memory
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.profiling import NodePerformanceProfile
from exo.shared.types.profiling import MemoryPerformanceProfile, NodePerformanceProfile
from exo.shared.types.state import State
from exo.shared.types.tasks import TaskId, TaskStatus
from exo.shared.types.topology import Connection
@@ -68,7 +69,7 @@ from exo.worker.common import AssignedRunner
from exo.worker.download.shard_downloader import RepoDownloadProgress, ShardDownloader
from exo.worker.plan import plan
from exo.worker.runner.runner_supervisor import RunnerSupervisor
from exo.worker.utils import start_polling_node_metrics
from exo.worker.utils import start_polling_memory_metrics, start_polling_node_metrics
class Worker:
@@ -124,6 +125,15 @@ class Worker:
async with create_task_group() as tg:
self._tg = tg
tg.start_soon(start_polling_node_metrics, resource_monitor_callback)
async def memory_monitor_callback(
memory_profile: MemoryPerformanceProfile,
) -> None:
await self.event_publisher(
NodeMemoryMeasured(node_id=self.node_id, memory=memory_profile)
)
tg.start_soon(start_polling_memory_metrics, memory_monitor_callback)
tg.start_soon(self._connection_message_event_writer)
tg.start_soon(self._resend_out_for_delivery)
tg.start_soon(self._event_applier)

View File

@@ -1,3 +1,6 @@
from .profile import start_polling_node_metrics
from .profile import start_polling_memory_metrics, start_polling_node_metrics
__all__ = ["start_polling_node_metrics"]
__all__ = [
"start_polling_node_metrics",
"start_polling_memory_metrics",
]

View File

@@ -4,6 +4,7 @@ import platform
from typing import Any, Callable, Coroutine
import anyio
import psutil
from loguru import logger
from exo.shared.types.profiling import (
@@ -37,6 +38,54 @@ async def get_metrics_async() -> Metrics:
return Metrics()
async def get_memory_profile_async() -> MemoryPerformanceProfile:
    """Return a MemoryPerformanceProfile sampled via psutil (fast, cross-platform).

    psutil's calls are synchronous, so the sampling runs in a worker thread
    (``asyncio.to_thread``) to avoid blocking the event loop.  The
    OVERRIDE_MEMORY environment variable (a whole number of GiB) replaces the
    reported available RAM when set.

    Raises:
        ValueError: if OVERRIDE_MEMORY is set but is not an integer.
    """

    def _read_psutil() -> MemoryPerformanceProfile:
        vm = psutil.virtual_memory()
        sm = psutil.swap_memory()
        override_memory_env = os.getenv("OVERRIDE_MEMORY")
        # GiB -> bytes. Note: the original used truthiness here and below,
        # which silently ignored an explicit OVERRIDE_MEMORY=0; `is not None`
        # honors it.
        override_memory: int | None = (
            int(override_memory_env) * 2**30
            if override_memory_env is not None
            else None
        )
        return MemoryPerformanceProfile.from_bytes(
            ram_total=int(vm.total),
            ram_available=override_memory
            if override_memory is not None
            else int(vm.available),
            swap_total=int(sm.total),
            swap_available=int(sm.free),
        )

    return await asyncio.to_thread(_read_psutil)
async def start_polling_memory_metrics(
    callback: Callable[[MemoryPerformanceProfile], Coroutine[Any, Any, None]],
    *,
    poll_interval_s: float = 0.5,
) -> None:
    """Poll memory usage forever, invoking `callback` with each fresh sample.

    Parameters
    - callback: coroutine called with a MemoryPerformanceProfile each tick
    - poll_interval_s: interval between polls

    An error in a single tick is logged and the loop continues; the sleep
    sits in a ``finally`` so a failing tick cannot become a busy loop.
    """
    while True:
        try:
            await callback(await get_memory_profile_async())
        except Exception as e:
            logger.opt(exception=e).error("Memory Monitor encountered error")
        finally:
            await anyio.sleep(poll_interval_s)
async def start_polling_node_metrics(
callback: Callable[[NodePerformanceProfile], Coroutine[Any, Any, None]],
):
@@ -46,28 +95,16 @@ async def start_polling_node_metrics(
# Gather metrics & system info with a timeout on each call
metrics = await get_metrics_async()
# Extract memory totals from metrics
total_mem = (
metrics.memory.ram_total
if metrics.memory is not None and metrics.memory.ram_total is not None
else 0
)
used_mem = (
metrics.memory.ram_usage
if metrics.memory is not None and metrics.memory.ram_usage is not None
else 0
)
system_info, network_interfaces, mac_friendly_name = await asyncio.gather(
(
system_info,
network_interfaces,
mac_friendly_name,
memory_profile,
) = await asyncio.gather(
get_mac_system_info_async(),
get_network_interface_info_async(),
get_mac_friendly_name_async(),
)
# Run heavy FLOPs profiling only if enough time has elapsed
override_memory_env = os.getenv("OVERRIDE_MEMORY")
override_memory: int | None = (
int(override_memory_env) * 2**30 if override_memory_env else None
get_memory_profile_async(),
)
await callback(
@@ -76,22 +113,7 @@ async def start_polling_node_metrics(
chip_id=system_info.chip_id,
friendly_name=mac_friendly_name or "Unknown",
network_interfaces=network_interfaces,
memory=MemoryPerformanceProfile.from_bytes(
ram_total=total_mem,
ram_available=override_memory
if override_memory
else total_mem - used_mem,
swap_total=metrics.memory.swap_total
if metrics.memory is not None
and metrics.memory.swap_total is not None
else 0,
swap_available=metrics.memory.swap_total
- metrics.memory.swap_usage
if metrics.memory is not None
and metrics.memory.swap_usage is not None
and metrics.memory.swap_total is not None
else 0,
),
memory=memory_profile,
system=SystemPerformanceProfile(
flops_fp16=0,
gpu_usage=metrics.gpu_usage[1]

3
uv.lock generated
View File

@@ -253,7 +253,7 @@ wheels = [
[[package]]
name = "exo"
version = "0.2.0"
version = "0.3.0"
source = { editable = "." }
dependencies = [
{ name = "aiofiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -1112,6 +1112,7 @@ source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e7/b0/66d96f02120f79eeed86b5c5be04029b6821155f31ed4907a4e9f1460671/rustworkx-0.17.1.tar.gz", hash = "sha256:59ea01b4e603daffa4e8827316c1641eef18ae9032f0b1b14aa0181687e3108e", size = 399407, upload-time = "2025-09-15T16:29:46.429Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/24/8972ed631fa05fdec05a7bb7f1fc0f8e78ee761ab37e8a93d1ed396ba060/rustworkx-0.17.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:c08fb8db041db052da404839b064ebfb47dcce04ba9a3e2eb79d0c65ab011da4", size = 2257491, upload-time = "2025-08-13T01:43:31.466Z" },
{ url = "https://files.pythonhosted.org/packages/23/ae/7b6bbae5e0487ee42072dc6a46edf5db9731a0701ed648db22121fb7490c/rustworkx-0.17.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:4ef8e327dadf6500edd76fedb83f6d888b9266c58bcdbffd5a40c33835c9dd26", size = 2040175, upload-time = "2025-08-13T01:43:33.762Z" },