From ca321cfcc2edc19f8bc7100cc4267c8fa49ee74c Mon Sep 17 00:00:00 2001 From: Evan Date: Thu, 15 Jan 2026 16:06:44 +0000 Subject: [PATCH] mapping of conns --- src/exo/master/main.py | 3 +++ src/exo/shared/topology.py | 14 ++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/exo/master/main.py b/src/exo/master/main.py index 798f4d8d..8f213d1d 100644 --- a/src/exo/master/main.py +++ b/src/exo/master/main.py @@ -27,6 +27,7 @@ from exo.shared.types.events import ( ForwarderEvent, IndexedEvent, InstanceDeleted, + NodeGatheredInfo, NodeTimedOut, TaskCreated, TaskDeleted, @@ -236,6 +237,8 @@ class Master: self.state = apply(self.state, indexed) event._master_time_stamp = datetime.now(tz=timezone.utc) # pyright: ignore[reportPrivateUsage] + if isinstance(event, NodeGatheredInfo): + event.when = str(datetime.now(tz=timezone.utc)) self._event_log.append(event) await self._send_event(indexed) diff --git a/src/exo/shared/topology.py b/src/exo/shared/topology.py index bcd8d545..65c26add 100644 --- a/src/exo/shared/topology.py +++ b/src/exo/shared/topology.py @@ -17,7 +17,9 @@ from exo.shared.types.topology import ( class TopologySnapshot(BaseModel): nodes: Sequence[NodeId] - connections: Iterable[Connection] + connections: Mapping[ + NodeId, Mapping[NodeId, Sequence[SocketConnection | RDMAConnection]] + ] model_config = ConfigDict(frozen=True, extra="forbid") @@ -31,7 +33,7 @@ class Topology: def to_snapshot(self) -> TopologySnapshot: return TopologySnapshot( - nodes=list(self.list_nodes()), connections=self.list_connections() + nodes=list(self.list_nodes()), connections=self.map_connections() ) @classmethod @@ -42,8 +44,12 @@ class Topology: with contextlib.suppress(ValueError): topology.add_node(node_id) - for conn in snapshot.connections: - topology.add_connection(conn) + for source in snapshot.connections: + for sink in snapshot.connections[source]: + for edge in snapshot.connections[source][sink]: + topology.add_connection( + Connection(source=source, sink=sink, edge=edge) + ) return topology