mirror of
https://github.com/exo-explore/exo.git
synced 2026-01-24 05:48:44 -05:00
Compare commits
2 Commits
main
...
alexcheema
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
922e8075d3 | ||
|
|
6ee745246d |
@@ -1269,15 +1269,30 @@ class API:
|
||||
self.state.last_event_applied_idx == -1
|
||||
and state.last_event_applied_idx > self.state.last_event_applied_idx
|
||||
):
|
||||
logger.info(
|
||||
f"API catching up state to idx {state.last_event_applied_idx}"
|
||||
)
|
||||
self.event_buffer.store = {}
|
||||
self.event_buffer.next_idx_to_release = (
|
||||
state.last_event_applied_idx + 1
|
||||
# DEBUG: Log buffer state BEFORE clearing
|
||||
logger.warning(
|
||||
f"STATE_CATCHUP: About to catch up. "
|
||||
f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
|
||||
f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
|
||||
f"catching up to idx: {state.last_event_applied_idx}"
|
||||
)
|
||||
|
||||
new_idx = state.last_event_applied_idx + 1
|
||||
self.event_buffer.next_idx_to_release = new_idx
|
||||
# Preserve events that arrived early but are still valid (idx >= new_idx)
|
||||
# Remove stale events (idx < new_idx) to prevent memory growth
|
||||
self.event_buffer.store = {
|
||||
k: v for k, v in self.event_buffer.store.items() if k >= new_idx
|
||||
}
|
||||
self.state = state
|
||||
|
||||
# DEBUG: Log buffer state AFTER clearing
|
||||
logger.warning(
|
||||
f"STATE_CATCHUP: Catchup complete. "
|
||||
f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
|
||||
f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
|
||||
)
|
||||
|
||||
async def _apply_state(self):
|
||||
with self.global_event_receiver as events:
|
||||
async for f_event in events:
|
||||
|
||||
@@ -123,13 +123,16 @@ class Worker:
|
||||
async def _forward_info(self, recv: Receiver[GatheredInfo]):
|
||||
with recv as info_stream:
|
||||
async for info in info_stream:
|
||||
await self.event_sender.send(
|
||||
NodeGatheredInfo(
|
||||
node_id=self.node_id,
|
||||
when=str(datetime.now(tz=timezone.utc)),
|
||||
info=info,
|
||||
)
|
||||
event = NodeGatheredInfo(
|
||||
node_id=self.node_id,
|
||||
when=str(datetime.now(tz=timezone.utc)),
|
||||
info=info,
|
||||
)
|
||||
logger.warning(
|
||||
f"NODE_GATHERED_INFO: Sending event for node {self.node_id}, "
|
||||
f"event_id={event.event_id}"
|
||||
)
|
||||
await self.event_sender.send(event)
|
||||
|
||||
async def _check_catchup_state(self):
|
||||
with self.state_catchup_receiver as states:
|
||||
@@ -138,15 +141,30 @@ class Worker:
|
||||
self.state.last_event_applied_idx == -1
|
||||
and state.last_event_applied_idx > self.state.last_event_applied_idx
|
||||
):
|
||||
logger.info(
|
||||
f"Worker catching up state to idx {state.last_event_applied_idx}"
|
||||
)
|
||||
self.event_buffer.store = {}
|
||||
self.event_buffer.next_idx_to_release = (
|
||||
state.last_event_applied_idx + 1
|
||||
# DEBUG: Log buffer state BEFORE clearing
|
||||
logger.warning(
|
||||
f"STATE_CATCHUP: About to catch up. "
|
||||
f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
|
||||
f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
|
||||
f"catching up to idx: {state.last_event_applied_idx}"
|
||||
)
|
||||
|
||||
new_idx = state.last_event_applied_idx + 1
|
||||
self.event_buffer.next_idx_to_release = new_idx
|
||||
# Preserve events that arrived early but are still valid (idx >= new_idx)
|
||||
# Remove stale events (idx < new_idx) to prevent memory growth
|
||||
self.event_buffer.store = {
|
||||
k: v for k, v in self.event_buffer.store.items() if k >= new_idx
|
||||
}
|
||||
self.state = state
|
||||
|
||||
# DEBUG: Log buffer state AFTER clearing
|
||||
logger.warning(
|
||||
f"STATE_CATCHUP: Catchup complete. "
|
||||
f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
|
||||
f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
|
||||
)
|
||||
|
||||
async def _event_applier(self):
|
||||
with self.global_event_receiver as events:
|
||||
async for f_event in events:
|
||||
@@ -157,8 +175,20 @@ class Worker:
|
||||
if event_id in self.out_for_delivery:
|
||||
del self.out_for_delivery[event_id]
|
||||
|
||||
# DEBUG: Log what was ingested
|
||||
logger.warning(
|
||||
f"EVENT_APPLIER: Ingested event idx={f_event.origin_idx}, "
|
||||
f"buffer keys now: {sorted(self.event_buffer.store.keys())}"
|
||||
)
|
||||
|
||||
# 2. for each event, apply it to the state
|
||||
indexed_events = self.event_buffer.drain_indexed()
|
||||
|
||||
# DEBUG: Log drain results
|
||||
logger.warning(
|
||||
f"EVENT_APPLIER: Drained {len(indexed_events)} events, "
|
||||
f"next_idx_to_release now: {self.event_buffer.next_idx_to_release}"
|
||||
)
|
||||
if indexed_events:
|
||||
self._nack_attempts = 0
|
||||
|
||||
@@ -175,6 +205,12 @@ class Worker:
|
||||
self._nack_cancel_scope.cancel()
|
||||
|
||||
for idx, event in indexed_events:
|
||||
# DEBUG: Log NodeGatheredInfo events
|
||||
if isinstance(event, NodeGatheredInfo):
|
||||
logger.warning(
|
||||
f"NODE_GATHERED_INFO: Applying event idx={idx} for node {event.node_id}, "
|
||||
f"event_id={event.event_id}"
|
||||
)
|
||||
self.state = apply(self.state, IndexedEvent(idx=idx, event=event))
|
||||
|
||||
# Buffer input image chunks for image editing
|
||||
|
||||
Reference in New Issue
Block a user