mirror of
https://github.com/exo-explore/exo.git
synced 2025-12-23 22:27:50 -05:00
Negative index nack request
This commit is contained in:
@@ -159,8 +159,9 @@ class Worker:
|
||||
or self._nack_cancel_scope.cancel_called
|
||||
):
|
||||
assert self._tg
|
||||
# Request the next index.
|
||||
self._tg.start_soon(
|
||||
self._nack_request, self.state.last_event_applied_idx
|
||||
self._nack_request, self.state.last_event_applied_idx + 1
|
||||
)
|
||||
continue
|
||||
elif indexed_events and self._nack_cancel_scope:
|
||||
@@ -295,6 +296,11 @@ class Worker:
|
||||
# We request all events after (and including) the missing index.
|
||||
# This function is started whenever we receive an event that is out of sequence.
|
||||
# It is cancelled as soon as we receiver an event that is in sequence.
|
||||
|
||||
if since_idx < 0:
|
||||
logger.warning(f"Negative value encountered for nack request {since_idx=}")
|
||||
since_idx = 0
|
||||
|
||||
with CancelScope() as scope:
|
||||
self._nack_cancel_scope = scope
|
||||
delay: float = self._nack_base_seconds * (2.0**self._nack_attempts)
|
||||
@@ -302,6 +308,9 @@ class Worker:
|
||||
self._nack_attempts += 1
|
||||
try:
|
||||
await anyio.sleep(delay)
|
||||
logger.info(
|
||||
f"Nack attempt {self._nack_attempts}: Requesting Event Log from {since_idx}"
|
||||
)
|
||||
await self.command_sender.send(
|
||||
ForwarderCommand(
|
||||
origin=self.node_id,
|
||||
|
||||
Reference in New Issue
Block a user