From 70298ce0a9bcf68c978b3157de402de8a148fdec Mon Sep 17 00:00:00 2001 From: rltakashige Date: Tue, 9 Dec 2025 15:57:28 +0000 Subject: [PATCH] Negative index nack request --- src/exo/worker/main.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/exo/worker/main.py b/src/exo/worker/main.py index d31e7fa4..6028c2b4 100644 --- a/src/exo/worker/main.py +++ b/src/exo/worker/main.py @@ -159,8 +159,9 @@ class Worker: or self._nack_cancel_scope.cancel_called ): assert self._tg + # Request the next index. self._tg.start_soon( - self._nack_request, self.state.last_event_applied_idx + self._nack_request, self.state.last_event_applied_idx + 1 ) continue elif indexed_events and self._nack_cancel_scope: @@ -295,6 +296,11 @@ class Worker: # We request all events after (and including) the missing index. # This function is started whenever we receive an event that is out of sequence. # It is cancelled as soon as we receiver an event that is in sequence. + + if since_idx < 0: + logger.warning(f"Negative value encountered for nack request {since_idx=}") + since_idx = 0 + with CancelScope() as scope: self._nack_cancel_scope = scope delay: float = self._nack_base_seconds * (2.0**self._nack_attempts) @@ -302,6 +308,9 @@ class Worker: self._nack_attempts += 1 try: await anyio.sleep(delay) + logger.info( + f"Nack attempt {self._nack_attempts}: Requesting Event Log from {since_idx}" + ) await self.command_sender.send( ForwarderCommand( origin=self.node_id,