diff --git a/src/exo/master/main.py b/src/exo/master/main.py index 3c851f9d..2c78af87 100644 --- a/src/exo/master/main.py +++ b/src/exo/master/main.py @@ -95,6 +95,7 @@ class Master: self._tg.cancel_scope.cancel() async def _command_processor(self) -> None: + retry_num = 0 with self.command_receiver as commands: async for forwarder_command in commands: try: @@ -186,8 +187,11 @@ class Master: command.finished_command_id ] case RequestEventLog(): + retry_num += 1 # We should just be able to send everything, since other buffers will ignore old messages for i in range(command.since_idx, len(self._event_log)): + ev = self._event_log[i] + ev._retry = retry_num await self._send_event( IndexedEvent(idx=i, event=self._event_log[i]) ) diff --git a/src/exo/shared/types/events.py b/src/exo/shared/types/events.py index 29b750ef..efe4edea 100644 --- a/src/exo/shared/types/events.py +++ b/src/exo/shared/types/events.py @@ -22,7 +22,8 @@ class EventId(Id): class BaseEvent(TaggedModel): event_id: EventId = Field(default_factory=EventId) # Internal, for debugging. Please don't rely on this field for anything! - _master_time_stamp: None | datetime = None + _master_time_stamp: datetime | None = None + _retry: int | None = None class TestEvent(BaseEvent):