mirror of
https://github.com/exo-explore/exo.git
synced 2025-12-23 22:27:50 -05:00
fix a race condition
This commit is contained in:
@@ -193,6 +193,7 @@ class Master:
|
||||
logger.debug(f"Master indexing event: {str(event)[:100]}")
|
||||
indexed = IndexedEvent(event=event, idx=len(self._event_log))
|
||||
self.state = apply(self.state, indexed)
|
||||
|
||||
# TODO: SQL
|
||||
self._event_log.append(event)
|
||||
await self._send_event(indexed)
|
||||
@@ -225,6 +226,7 @@ class Master:
|
||||
)
|
||||
local_index += 1
|
||||
|
||||
# This function is re-entrant, take care!
|
||||
async def _send_event(self, event: IndexedEvent):
|
||||
# Convenience method since this line is ugly
|
||||
await self.global_event_sender.send(
|
||||
|
||||
@@ -19,6 +19,9 @@ class OrderedBuffer[T]:
|
||||
if idx < self.next_idx_to_release:
|
||||
return
|
||||
if idx in self.store:
|
||||
assert self.store[idx] == t, (
|
||||
"Received different messages with identical indices, probable race condition"
|
||||
)
|
||||
return
|
||||
self.store[idx] = t
|
||||
|
||||
|
||||
@@ -630,18 +630,21 @@ class Worker:
|
||||
async for event in self.fail_runner(e, runner_id):
|
||||
yield event
|
||||
|
||||
|
||||
|
||||
# This function is re-entrant, take care!
|
||||
async def event_publisher(self, event: Event) -> None:
|
||||
fe = ForwarderEvent(
|
||||
origin_idx=self.local_event_index,
|
||||
origin=self.node_id,
|
||||
event=event,
|
||||
)
|
||||
await self.local_event_sender.send(fe)
|
||||
self.out_for_delivery[event.event_id] = fe
|
||||
logger.debug(
|
||||
f"Worker published event {self.local_event_index}: {str(event)[:100]}"
|
||||
)
|
||||
self.local_event_index += 1
|
||||
await self.local_event_sender.send(fe)
|
||||
self.out_for_delivery[event.event_id] = fe
|
||||
|
||||
|
||||
def event_relevant_to_worker(event: Event, worker: Worker):
|
||||
|
||||
Reference in New Issue
Block a user