From 84dfc8a738e006a3e52e0cb8b0f38a9910ce4c57 Mon Sep 17 00:00:00 2001 From: Alex Cheema <41707476+AlexCheema@users.noreply.github.com> Date: Tue, 7 Oct 2025 16:23:51 +0100 Subject: [PATCH] Fast memory profiling Co-authored-by: Evan --- dashboard/index.html | 102 +++++++++++++++++++------------ src/exo/shared/apply.py | 50 ++++++++++++++- src/exo/shared/types/events.py | 9 +++ src/exo/worker/main.py | 14 ++++- src/exo/worker/utils/__init__.py | 7 ++- src/exo/worker/utils/profile.py | 92 +++++++++++++++++----------- uv.lock | 3 +- 7 files changed, 198 insertions(+), 79 deletions(-) diff --git a/dashboard/index.html b/dashboard/index.html index 85f94589..7560298f 100644 --- a/dashboard/index.html +++ b/dashboard/index.html @@ -1913,18 +1913,33 @@ const result = {}; if (!clusterState) return result; + // Helper: get numeric bytes from various shapes (number | {in_bytes}|{inBytes}) + function getBytes(value) { + if (typeof value === 'number') return value; + if (value && typeof value === 'object') { + if (typeof value.in_bytes === 'number') return value.in_bytes; + if (typeof value.inBytes === 'number') return value.inBytes; + } + return 0; + } + + // Helper: pick from snake_case or camelCase + const pick = (obj, snake, camel, fallback = undefined) => { + if (!obj) return fallback; + if (obj[snake] !== undefined) return obj[snake]; + if (obj[camel] !== undefined) return obj[camel]; + return fallback; + }; + // Process nodes from topology or fallback to node_profiles directly let nodesToProcess = {}; - - if (clusterState.topology && clusterState.topology.nodes) { - // Use topology.nodes array + if (clusterState.topology && Array.isArray(clusterState.topology.nodes)) { clusterState.topology.nodes.forEach(node => { if (node.node_id && node.node_profile) { nodesToProcess[node.node_id] = node.node_profile; } }); } else if (clusterState.node_profiles) { - // Fallback to node_profiles directly nodesToProcess = clusterState.node_profiles; } @@ -1933,53 +1948,64 @@ const nodeProfile = nodesToProcess[nodeId]; if (!nodeProfile) continue; - // Extract memory information + // Extract memory information (supports new nested schema and old flat numbers) let memBytesTotal = 0; - let memBytesUsed = 0; - - if (nodeProfile.memory) { - memBytesTotal = nodeProfile.memory.ram_total || 0; - const ramAvailable = nodeProfile.memory.ram_available || 0; - memBytesUsed = Math.max(memBytesTotal - ramAvailable, 0); - } + let memBytesAvailable = 0; + const memory = nodeProfile.memory || {}; + const ramTotalVal = pick(memory, 'ram_total', 'ramTotal'); + const ramAvailVal = pick(memory, 'ram_available', 'ramAvailable'); + const swapTotalVal = pick(memory, 'swap_total', 'swapTotal'); + const swapAvailVal = pick(memory, 'swap_available', 'swapAvailable'); + + memBytesTotal = getBytes(ramTotalVal); + memBytesAvailable = getBytes(ramAvailVal); + const memBytesUsed = Math.max(memBytesTotal - memBytesAvailable, 0); // Extract model information - const modelId = nodeProfile.model_id || 'Unknown'; - const chipId = nodeProfile.chip_id || ''; - const friendlyName = nodeProfile.friendly_name || `${nodeId.substring(0, 8)}...`; + const modelId = pick(nodeProfile, 'model_id', 'modelId', 'Unknown'); + const chipId = pick(nodeProfile, 'chip_id', 'chipId', ''); + const friendlyName = pick(nodeProfile, 'friendly_name', 'friendlyName', `${nodeId.substring(0, 8)}...`); - // Extract network addresses + // Extract network addresses (support snake_case and camelCase) const addrList = []; - if (nodeProfile.network_interfaces) { - 
nodeProfile.network_interfaces.forEach(intf => { - if (intf.ip_address && !intf.ip_address.startsWith('fe80::')) { - // Filter out link-local IPv6 addresses - addrList.push(intf.ip_address); - } - }); - } + const netIfacesSnake = nodeProfile.network_interfaces; + const netIfacesCamel = nodeProfile.networkInterfaces; + const interfaces = Array.isArray(netIfacesSnake) ? netIfacesSnake : (Array.isArray(netIfacesCamel) ? netIfacesCamel : []); + interfaces.forEach(intf => { + const ip = intf.ip_address ?? intf.ipAddress; + if (ip && !String(ip).startsWith('fe80::')) { + addrList.push(ip); + } + }); - // Transform system metrics to macmon_info format for compatibility + // Transform system metrics to macmon_info format (support snake_case and camelCase) const systemInfo = nodeProfile.system || {}; + const gpuUsage = pick(systemInfo, 'gpu_usage', 'gpuUsage', 0); + const temp = pick(systemInfo, 'temp', 'temp', null); + const sysPower = pick(systemInfo, 'sys_power', 'sysPower', null); + const pcpuUsage = pick(systemInfo, 'pcpu_usage', 'pcpuUsage', 0); + const ecpuUsage = pick(systemInfo, 'ecpu_usage', 'ecpuUsage', 0); + const anePower = pick(systemInfo, 'ane_power', 'anePower', 0); + const flopsFp16 = pick(systemInfo, 'flops_fp16', 'flopsFp16', 0); + const macmonInfo = { memory: { ram_total: memBytesTotal, ram_usage: memBytesUsed, - ram_available: nodeProfile.memory?.ram_available || 0, - swap_total: nodeProfile.memory?.swap_total || 0, - swap_usage: (nodeProfile.memory?.swap_total || 0) - (nodeProfile.memory?.swap_available || 0) + ram_available: memBytesAvailable, + swap_total: getBytes(swapTotalVal), + swap_usage: Math.max(getBytes(swapTotalVal) - getBytes(swapAvailVal), 0) }, - // Convert new format to old format - gpu_usage: systemInfo.gpu_usage ? [0, systemInfo.gpu_usage] : [0, 0], + gpu_usage: [0, typeof gpuUsage === 'number' ? gpuUsage : 0], temp: { - cpu_temp_avg: systemInfo.temp || null, - gpu_temp_avg: systemInfo.temp || null // Using same temp for both in new format + cpu_temp_avg: typeof temp === 'number' ? temp : null, + gpu_temp_avg: typeof temp === 'number' ? temp : null }, - sys_power: systemInfo.sys_power || null, - pcpu_usage: systemInfo.pcpu_usage ? [0, systemInfo.pcpu_usage] : [0, 0], - ecpu_usage: systemInfo.ecpu_usage ? [0, systemInfo.ecpu_usage] : [0, 0], - ane_power: systemInfo.ane_power || 0, - flops_fp16: systemInfo.flops_fp16 || 0, + sys_power: typeof sysPower === 'number' ? sysPower : null, + pcpu_usage: [0, typeof pcpuUsage === 'number' ? pcpuUsage : 0], + ecpu_usage: [0, typeof ecpuUsage === 'number' ? ecpuUsage : 0], + ane_power: typeof anePower === 'number' ? anePower : 0, + flops_fp16: typeof flopsFp16 === 'number' ? 
flopsFp16 : 0, timestamp: new Date().toISOString() }; @@ -1987,7 +2013,7 @@ mem: memBytesTotal, addrs: addrList, last_addr_update: Date.now() / 1000, - system_info: { + system_info: { model_id: modelId, chip_id: chipId }, diff --git a/src/exo/shared/apply.py b/src/exo/shared/apply.py index 3c0f2d5d..1ba73d7b 100644 --- a/src/exo/shared/apply.py +++ b/src/exo/shared/apply.py @@ -12,6 +12,7 @@ from exo.shared.types.events import ( InstanceCreated, InstanceDeactivated, InstanceDeleted, + NodeMemoryMeasured, NodePerformanceMeasured, RunnerDeleted, RunnerStatusUpdated, @@ -25,7 +26,7 @@ from exo.shared.types.events import ( TopologyNodeCreated, WorkerStatusUpdated, ) -from exo.shared.types.profiling import NodePerformanceProfile +from exo.shared.types.profiling import NodePerformanceProfile, SystemPerformanceProfile from exo.shared.types.state import State from exo.shared.types.tasks import Task, TaskId, TaskStatus from exo.shared.types.topology import NodeInfo @@ -49,6 +50,8 @@ def event_apply(event: Event, state: State) -> State: return apply_instance_deleted(event, state) case NodePerformanceMeasured(): return apply_node_performance_measured(event, state) + case NodeMemoryMeasured(): + return apply_node_memory_measured(event, state) case RunnerDeleted(): return apply_runner_deleted(event, state) case RunnerStatusUpdated(): @@ -197,6 +200,51 @@ def apply_node_performance_measured( return state.model_copy(update={"topology": topology}) +def apply_node_memory_measured(event: NodeMemoryMeasured, state: State) -> State: + existing = state.node_profiles.get(event.node_id) + topology = copy.copy(state.topology) + + if existing is None: + created = NodePerformanceProfile( + model_id="unknown", + chip_id="unknown", + friendly_name="Unknown", + memory=event.memory, + network_interfaces=[], + system=SystemPerformanceProfile( + flops_fp16=0.0, + gpu_usage=0.0, + temp=0.0, + sys_power=0.0, + pcpu_usage=0.0, + ecpu_usage=0.0, + ane_power=0.0, + ), + ) + created_profiles: Mapping[NodeId, NodePerformanceProfile] = { + **state.node_profiles, + event.node_id: created, + } + if not topology.contains_node(event.node_id): + topology.add_node(NodeInfo(node_id=event.node_id)) + topology.update_node_profile(event.node_id, created) + return state.model_copy( + update={"node_profiles": created_profiles, "topology": topology} + ) + + updated = existing.model_copy(update={"memory": event.memory}) + updated_profiles: Mapping[NodeId, NodePerformanceProfile] = { + **state.node_profiles, + event.node_id: updated, + } + if not topology.contains_node(event.node_id): + topology.add_node(NodeInfo(node_id=event.node_id)) + topology.update_node_profile(event.node_id, updated) + return state.model_copy( + update={"node_profiles": updated_profiles, "topology": topology} + ) + + def apply_worker_status_updated(event: WorkerStatusUpdated, state: State) -> State: new_node_status: Mapping[NodeId, WorkerStatus] = { **state.node_status, diff --git a/src/exo/shared/types/events.py b/src/exo/shared/types/events.py index 8d9aa32c..074457a3 100644 --- a/src/exo/shared/types/events.py +++ b/src/exo/shared/types/events.py @@ -6,6 +6,7 @@ from pydantic import Field from exo.shared.topology import Connection, NodePerformanceProfile from exo.shared.types.chunks import CommandId, GenerationChunk from exo.shared.types.common import ID, NodeId +from exo.shared.types.profiling import MemoryPerformanceProfile from exo.shared.types.tasks import Task, TaskId, TaskStatus from exo.shared.types.worker.common import InstanceId, WorkerStatus from 
exo.shared.types.worker.instances import Instance @@ -51,6 +52,7 @@ class EventType(str, Enum): # Node Performance Events WorkerStatusUpdated = "WorkerStatusUpdated" NodePerformanceMeasured = "NodePerformanceMeasured" + NodeMemoryMeasured = "NodeMemoryMeasured" # Topology Events TopologyNodeCreated = "TopologyNodeCreated" @@ -116,6 +118,11 @@ class NodePerformanceMeasured(BaseEvent): node_profile: NodePerformanceProfile +class NodeMemoryMeasured(BaseEvent): + node_id: NodeId + memory: MemoryPerformanceProfile + + class WorkerStatusUpdated(BaseEvent): node_id: NodeId node_state: WorkerStatus @@ -151,6 +158,7 @@ Event = Union[ RunnerStatusUpdated, RunnerDeleted, NodePerformanceMeasured, + NodeMemoryMeasured, WorkerStatusUpdated, ChunkGenerated, TopologyNodeCreated, @@ -173,6 +181,7 @@ Event = Union[ EventType.RunnerStatusUpdated: RunnerStatusUpdated, EventType.RunnerDeleted: RunnerDeleted, EventType.NodePerformanceMeasured: NodePerformanceMeasured, + EventType.NodeMemoryMeasured: NodeMemoryMeasured, EventType.WorkerStatusUpdated: WorkerStatusUpdated, EventType.ChunkGenerated: ChunkGenerated, EventType.TopologyNodeCreated: TopologyNodeCreated, diff --git a/src/exo/worker/main.py b/src/exo/worker/main.py index 24c60323..59cf2ca6 100644 --- a/src/exo/worker/main.py +++ b/src/exo/worker/main.py @@ -21,6 +21,7 @@ from exo.shared.types.events import ( ForwarderEvent, IndexedEvent, InstanceDeleted, + NodeMemoryMeasured, NodePerformanceMeasured, RunnerDeleted, RunnerStatusUpdated, @@ -32,7 +33,7 @@ from exo.shared.types.events import ( ) from exo.shared.types.memory import Memory from exo.shared.types.multiaddr import Multiaddr -from exo.shared.types.profiling import NodePerformanceProfile +from exo.shared.types.profiling import MemoryPerformanceProfile, NodePerformanceProfile from exo.shared.types.state import State from exo.shared.types.tasks import TaskId, TaskStatus from exo.shared.types.topology import Connection @@ -68,7 +69,7 @@ from exo.worker.common import AssignedRunner from exo.worker.download.shard_downloader import RepoDownloadProgress, ShardDownloader from exo.worker.plan import plan from exo.worker.runner.runner_supervisor import RunnerSupervisor -from exo.worker.utils import start_polling_node_metrics +from exo.worker.utils import start_polling_memory_metrics, start_polling_node_metrics class Worker: @@ -124,6 +125,15 @@ class Worker: async with create_task_group() as tg: self._tg = tg tg.start_soon(start_polling_node_metrics, resource_monitor_callback) + + async def memory_monitor_callback( + memory_profile: MemoryPerformanceProfile, + ) -> None: + await self.event_publisher( + NodeMemoryMeasured(node_id=self.node_id, memory=memory_profile) + ) + + tg.start_soon(start_polling_memory_metrics, memory_monitor_callback) tg.start_soon(self._connection_message_event_writer) tg.start_soon(self._resend_out_for_delivery) tg.start_soon(self._event_applier) diff --git a/src/exo/worker/utils/__init__.py b/src/exo/worker/utils/__init__.py index 386a613c..9a94e028 100644 --- a/src/exo/worker/utils/__init__.py +++ b/src/exo/worker/utils/__init__.py @@ -1,3 +1,6 @@ -from .profile import start_polling_node_metrics +from .profile import start_polling_memory_metrics, start_polling_node_metrics -__all__ = ["start_polling_node_metrics"] +__all__ = [ + "start_polling_node_metrics", + "start_polling_memory_metrics", +] diff --git a/src/exo/worker/utils/profile.py b/src/exo/worker/utils/profile.py index 174c1a41..45f8c4b0 100644 --- a/src/exo/worker/utils/profile.py +++ b/src/exo/worker/utils/profile.py @@ 
-4,6 +4,7 @@ import platform from typing import Any, Callable, Coroutine import anyio +import psutil from loguru import logger from exo.shared.types.profiling import ( @@ -37,6 +38,54 @@ async def get_metrics_async() -> Metrics: return Metrics() +async def get_memory_profile_async() -> MemoryPerformanceProfile: + """Return MemoryPerformanceProfile using psutil (fast, cross-platform). + + Uses synchronous psutil calls in a worker thread to avoid blocking the event loop. + """ + + def _read_psutil() -> MemoryPerformanceProfile: + vm = psutil.virtual_memory() + sm = psutil.swap_memory() + + override_memory_env = os.getenv("OVERRIDE_MEMORY") + override_memory: int | None = ( + int(override_memory_env) * 2**30 if override_memory_env else None + ) + + return MemoryPerformanceProfile.from_bytes( + ram_total=int(vm.total), + ram_available=int(override_memory) + if override_memory + else int(vm.available), + swap_total=int(sm.total), + swap_available=int(sm.free), + ) + + return await asyncio.to_thread(_read_psutil) + + +async def start_polling_memory_metrics( + callback: Callable[[MemoryPerformanceProfile], Coroutine[Any, Any, None]], + *, + poll_interval_s: float = 0.5, +) -> None: + """Continuously poll and emit memory-only metrics at a faster cadence. + + Parameters + - callback: coroutine called with a fresh MemoryPerformanceProfile each tick + - poll_interval_s: interval between polls + """ + while True: + try: + mem = await get_memory_profile_async() + await callback(mem) + except Exception as e: + logger.opt(exception=e).error("Memory Monitor encountered error") + finally: + await anyio.sleep(poll_interval_s) + + async def start_polling_node_metrics( callback: Callable[[NodePerformanceProfile], Coroutine[Any, Any, None]], ): @@ -46,28 +95,16 @@ async def start_polling_node_metrics( # Gather metrics & system info with a timeout on each call metrics = await get_metrics_async() - # Extract memory totals from metrics - total_mem = ( - metrics.memory.ram_total - if metrics.memory is not None and metrics.memory.ram_total is not None - else 0 - ) - used_mem = ( - metrics.memory.ram_usage - if metrics.memory is not None and metrics.memory.ram_usage is not None - else 0 - ) - - system_info, network_interfaces, mac_friendly_name = await asyncio.gather( + ( + system_info, + network_interfaces, + mac_friendly_name, + memory_profile, + ) = await asyncio.gather( get_mac_system_info_async(), get_network_interface_info_async(), get_mac_friendly_name_async(), - ) - - # Run heavy FLOPs profiling only if enough time has elapsed - override_memory_env = os.getenv("OVERRIDE_MEMORY") - override_memory: int | None = ( - int(override_memory_env) * 2**30 if override_memory_env else None + get_memory_profile_async(), ) await callback( @@ -76,22 +113,7 @@ async def start_polling_node_metrics( chip_id=system_info.chip_id, friendly_name=mac_friendly_name or "Unknown", network_interfaces=network_interfaces, - memory=MemoryPerformanceProfile.from_bytes( - ram_total=total_mem, - ram_available=override_memory - if override_memory - else total_mem - used_mem, - swap_total=metrics.memory.swap_total - if metrics.memory is not None - and metrics.memory.swap_total is not None - else 0, - swap_available=metrics.memory.swap_total - - metrics.memory.swap_usage - if metrics.memory is not None - and metrics.memory.swap_usage is not None - and metrics.memory.swap_total is not None - else 0, - ), + memory=memory_profile, system=SystemPerformanceProfile( flops_fp16=0, gpu_usage=metrics.gpu_usage[1] diff --git a/uv.lock b/uv.lock index 
798b19d4..6ef6edd7 100644 --- a/uv.lock +++ b/uv.lock @@ -253,7 +253,7 @@ wheels = [ [[package]] name = "exo" -version = "0.2.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "aiofiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -1112,6 +1112,7 @@ source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "numpy", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/e7/b0/66d96f02120f79eeed86b5c5be04029b6821155f31ed4907a4e9f1460671/rustworkx-0.17.1.tar.gz", hash = "sha256:59ea01b4e603daffa4e8827316c1641eef18ae9032f0b1b14aa0181687e3108e", size = 399407, upload-time = "2025-09-15T16:29:46.429Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/20/24/8972ed631fa05fdec05a7bb7f1fc0f8e78ee761ab37e8a93d1ed396ba060/rustworkx-0.17.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:c08fb8db041db052da404839b064ebfb47dcce04ba9a3e2eb79d0c65ab011da4", size = 2257491, upload-time = "2025-08-13T01:43:31.466Z" }, { url = "https://files.pythonhosted.org/packages/23/ae/7b6bbae5e0487ee42072dc6a46edf5db9731a0701ed648db22121fb7490c/rustworkx-0.17.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:4ef8e327dadf6500edd76fedb83f6d888b9266c58bcdbffd5a40c33835c9dd26", size = 2040175, upload-time = "2025-08-13T01:43:33.762Z" },
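
A minimal standalone sketch of the fast memory-polling pattern this patch introduces in src/exo/worker/utils/profile.py: synchronous psutil reads are pushed to a worker thread so the event loop stays responsive, and a snapshot is emitted on a fixed 0.5 s cadence with read failures logged rather than fatal. The plain dict payload, print callback, and asyncio.sleep below are illustrative stand-ins for MemoryPerformanceProfile, the NodeMemoryMeasured event publisher, and anyio; they are not part of the patch.

import asyncio

import psutil


async def read_memory_snapshot() -> dict:
    """Read RAM/swap figures via psutil without blocking the event loop."""

    def _read() -> dict:
        vm = psutil.virtual_memory()
        sm = psutil.swap_memory()
        return {
            "ram_total": vm.total,
            "ram_available": vm.available,
            "swap_total": sm.total,
            "swap_available": sm.free,
        }

    # psutil calls are synchronous, so run them in a worker thread.
    return await asyncio.to_thread(_read)


async def poll_memory(callback, poll_interval_s: float = 0.5) -> None:
    """Emit a fresh snapshot every poll_interval_s seconds; a failed read
    is reported but never kills the polling loop."""
    while True:
        try:
            await callback(await read_memory_snapshot())
        except Exception as exc:
            print(f"memory poll failed: {exc}")
        finally:
            await asyncio.sleep(poll_interval_s)


async def _print_snapshot(snapshot: dict) -> None:
    print(snapshot)


if __name__ == "__main__":
    asyncio.run(poll_memory(_print_snapshot))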