mirror of
https://github.com/exo-explore/exo.git
synced 2025-12-23 22:27:50 -05:00
actually update the topology
This commit is contained in:
@@ -214,6 +214,7 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State:
|
||||
topology = copy.deepcopy(state.topology)
|
||||
info = event.info
|
||||
profile = state.node_profiles.get(event.node_id, NodePerformanceProfile())
|
||||
# TODO: should be broken up into individual events instead of this monster
|
||||
match info:
|
||||
case MacmonMetrics():
|
||||
profile.system = info.system_profile
|
||||
@@ -272,7 +273,7 @@ def apply_node_gathered_info(event: NodeGatheredInfo, state: State) -> State:
|
||||
last_seen = {**state.last_seen, event.node_id: datetime.fromisoformat(event.when)}
|
||||
new_profiles = {**state.node_profiles, event.node_id: profile}
|
||||
return state.model_copy(
|
||||
update={"node_profiles": new_profiles, "last_seen": last_seen}
|
||||
update={"node_profiles": new_profiles, "last_seen": last_seen, "topology": topology}
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import anyio
|
||||
from anyio import create_task_group, open_process
|
||||
from anyio.abc import TaskGroup
|
||||
from anyio.streams.text import TextReceiveStream
|
||||
from anyio.streams.buffered import BufferedByteReceiveStream
|
||||
from loguru import logger
|
||||
|
||||
from exo.shared.constants import EXO_CONFIG_FILE
|
||||
@@ -49,9 +50,9 @@ class NodeConfig(CamelCaseModel):
|
||||
|
||||
@classmethod
|
||||
async def gather(cls) -> Self | None:
|
||||
f = anyio.Path(EXO_CONFIG_FILE)
|
||||
await f.touch(exist_ok=True)
|
||||
async with await f.open("rb") as f:
|
||||
cfg_file = anyio.Path(EXO_CONFIG_FILE)
|
||||
await cfg_file.touch(exist_ok=True)
|
||||
async with await cfg_file.open("rb") as f:
|
||||
try:
|
||||
contents = (await f.read()).decode("utf-8")
|
||||
data = tomllib.loads(contents)
|
||||
@@ -82,9 +83,9 @@ async def _gather_iface_map() -> dict[str, str] | None:
|
||||
port = ""
|
||||
for line in proc.stdout.decode("utf-8").split("\n"):
|
||||
if line.startswith("Hardware Port:"):
|
||||
port = line.strip()[15:]
|
||||
port = line.split(": ")[1]
|
||||
elif line.startswith("Device:"):
|
||||
ports[port] = line.strip()[8:]
|
||||
ports[port] = line.split(": ")[1]
|
||||
port = ""
|
||||
if "" in ports:
|
||||
del ports[""]
|
||||
@@ -147,11 +148,9 @@ class InfoGatherer:
|
||||
return
|
||||
|
||||
old_idents = []
|
||||
old_conns = []
|
||||
while True:
|
||||
data = await TBConnectivity.gather()
|
||||
if data is None:
|
||||
break
|
||||
assert data is not None
|
||||
|
||||
idents = [it for i in data if (it := i.ident(iface_map)) is not None]
|
||||
if idents != old_idents:
|
||||
@@ -159,9 +158,9 @@ class InfoGatherer:
|
||||
old_idents = idents
|
||||
|
||||
conns = [it for i in data if (it := i.conn()) is not None]
|
||||
if conns != old_conns:
|
||||
await self.info_sender.send(conns)
|
||||
old_conns = conns
|
||||
await self.info_sender.send(conns)
|
||||
|
||||
await anyio.sleep(self.system_profiler_interval)
|
||||
|
||||
async def _monitor_memory_usage(self):
|
||||
override_memory_env = os.getenv("OVERRIDE_MEMORY_MB")
|
||||
@@ -200,9 +199,8 @@ class InfoGatherer:
|
||||
if not p.stdout:
|
||||
logger.critical("MacMon closed stdout")
|
||||
return
|
||||
async for text in TextReceiveStream(p.stdout):
|
||||
async for text in TextReceiveStream(BufferedByteReceiveStream(p.stdout)):
|
||||
await self.info_sender.send(MacmonMetrics.from_raw_json(text))
|
||||
|
||||
except CalledProcessError as e:
|
||||
stderr_msg = "no stderr"
|
||||
stderr_output = cast(bytes | str | None, e.stderr)
|
||||
|
||||
Reference in New Issue
Block a user