diff --git a/src/exo/shared/apply.py b/src/exo/shared/apply.py index c3714907..feae2fa4 100644 --- a/src/exo/shared/apply.py +++ b/src/exo/shared/apply.py @@ -214,6 +214,7 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State: topology = copy.deepcopy(state.topology) info = event.info profile = state.node_profiles.get(event.node_id, NodePerformanceProfile()) + # TODO: should be broken up into individual events instead of this monster match info: case MacmonMetrics(): profile.system = info.system_profile @@ -272,7 +273,7 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State: last_seen = {**state.last_seen, event.node_id: datetime.fromisoformat(event.when)} new_profiles = {**state.node_profiles, event.node_id: profile} return state.model_copy( - update={"node_profiles": new_profiles, "last_seen": last_seen} + update={"node_profiles": new_profiles, "last_seen": last_seen, "topology": topology} ) diff --git a/src/exo/utils/info_gatherer/info_gatherer.py b/src/exo/utils/info_gatherer/info_gatherer.py index 30e7edd4..78546742 100644 --- a/src/exo/utils/info_gatherer/info_gatherer.py +++ b/src/exo/utils/info_gatherer/info_gatherer.py @@ -12,6 +12,7 @@ import anyio from anyio import create_task_group, open_process from anyio.abc import TaskGroup from anyio.streams.text import TextReceiveStream +from anyio.streams.buffered import BufferedByteReceiveStream from loguru import logger from exo.shared.constants import EXO_CONFIG_FILE @@ -49,9 +50,9 @@ class NodeConfig(CamelCaseModel): @classmethod async def gather(cls) -> Self | None: - f = anyio.Path(EXO_CONFIG_FILE) - await f.touch(exist_ok=True) - async with await f.open("rb") as f: + cfg_file = anyio.Path(EXO_CONFIG_FILE) + await cfg_file.touch(exist_ok=True) + async with await cfg_file.open("rb") as f: try: contents = (await f.read()).decode("utf-8") data = tomllib.loads(contents) @@ -82,9 +83,9 @@ async def _gather_iface_map() -> dict[str, str] | None: port = "" for line in proc.stdout.decode("utf-8").split("\n"): if line.startswith("Hardware Port:"): - port = line.strip()[15:] + port = line.split(": ")[1] elif line.startswith("Device:"): - ports[port] = line.strip()[8:] + ports[port] = line.split(": ")[1] port = "" if "" in ports: del ports[""] @@ -147,11 +148,9 @@ class InfoGatherer: return old_idents = [] - old_conns = [] while True: data = await TBConnectivity.gather() - if data is None: - break + assert data is not None idents = [it for i in data if (it := i.ident(iface_map)) is not None] if idents != old_idents: @@ -159,9 +158,9 @@ class InfoGatherer: old_idents = idents conns = [it for i in data if (it := i.conn()) is not None] - if conns != old_conns: - await self.info_sender.send(conns) - old_conns = conns + await self.info_sender.send(conns) + + await anyio.sleep(self.system_profiler_interval) async def _monitor_memory_usage(self): override_memory_env = os.getenv("OVERRIDE_MEMORY_MB") @@ -200,9 +199,8 @@ class InfoGatherer: if not p.stdout: logger.critical("MacMon closed stdout") return - async for text in TextReceiveStream(p.stdout): + async for text in TextReceiveStream(BufferedByteReceiveStream(p.stdout)): await self.info_sender.send(MacmonMetrics.from_raw_json(text)) - except CalledProcessError as e: stderr_msg = "no stderr" stderr_output = cast(bytes | str | None, e.stderr)