Compare commits

...

2 Commits

commit 922e8075d3
Author: Alex Cheema
Date:   2026-01-23 16:03:22 -08:00

    debug: add logging for NodeGatheredInfo event flow

    Track when NodeGatheredInfo events are sent and applied to help
    diagnose why joining nodes stay as "unknown".

    Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

commit 6ee745246d
Author: Alex Cheema
Date:   2026-01-23 15:56:33 -08:00

    fix: preserve early-arriving events during state catchup

    When a node joins a cluster and catches up state, events arriving before
    catchup completes were being lost because the buffer was cleared entirely.
    This could cause nodes to remain "unknown" if their NodeGatheredInfo event
    arrived during this window.

    Now we preserve events with idx >= new_idx instead of clearing all events.
    Added debug logging to help diagnose any remaining issues.

    Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2 changed files with 69 additions and 18 deletions
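
The fix in 6ee745246d hinges on pruning the event buffer rather than clearing it. Below is a minimal runnable sketch of the failure mode and of the new behavior; the EventBuffer class here is a hypothetical stand-in that assumes only the store and next_idx_to_release fields visible in the diffs, not the repository's real implementation.

# Hypothetical stand-in: a buffer of pending events keyed by global index.
class EventBuffer:
    def __init__(self) -> None:
        self.store: dict[int, str] = {}
        self.next_idx_to_release = 0

buf = EventBuffer()

# A joining node is catching up to a state snapshot at idx 100. Meanwhile
# its own NodeGatheredInfo event, assigned idx 105, arrives and is buffered.
buf.store[105] = "NodeGatheredInfo"
new_idx = 100 + 1

# Old behavior: buf.store = {} dropped idx 105 along with the stale entries,
# so the event was never applied and the node stayed "unknown".

# New behavior: drop only stale entries (idx < new_idx), keep early arrivals.
buf.store = {k: v for k, v in buf.store.items() if k >= new_idx}
buf.next_idx_to_release = new_idx

assert 105 in buf.store  # the early event survives catchup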


@@ -1269,15 +1269,30 @@ class API:
                 self.state.last_event_applied_idx == -1
                 and state.last_event_applied_idx > self.state.last_event_applied_idx
             ):
-                logger.info(
-                    f"API catching up state to idx {state.last_event_applied_idx}"
-                )
-                self.event_buffer.store = {}
-                self.event_buffer.next_idx_to_release = (
-                    state.last_event_applied_idx + 1
+                # DEBUG: Log buffer state BEFORE clearing
+                logger.warning(
+                    f"STATE_CATCHUP: About to catch up. "
+                    f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
+                    f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
+                    f"catching up to idx: {state.last_event_applied_idx}"
                 )
+
+                new_idx = state.last_event_applied_idx + 1
+                self.event_buffer.next_idx_to_release = new_idx
+                # Preserve events that arrived early but are still valid (idx >= new_idx)
+                # Remove stale events (idx < new_idx) to prevent memory growth
+                self.event_buffer.store = {
+                    k: v for k, v in self.event_buffer.store.items() if k >= new_idx
+                }
                 self.state = state
+
+                # DEBUG: Log buffer state AFTER clearing
+                logger.warning(
+                    f"STATE_CATCHUP: Catchup complete. "
+                    f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
+                    f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
+                )

     async def _apply_state(self):
         with self.global_event_receiver as events:
             async for f_event in events:
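
Replaying the sketch above (idx 105 buffered while catching up to a snapshot at idx 100, nothing released yet), the two new warnings would read roughly as follows; this is illustrative output reconstructed from the f-strings, not a captured log:

STATE_CATCHUP: About to catch up. Current buffer indices: [105], next_idx_to_release: 0, catching up to idx: 100
STATE_CATCHUP: Catchup complete. Buffer preserved indices: [105], new next_idx_to_release: 101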


@@ -123,13 +123,16 @@ class Worker:
     async def _forward_info(self, recv: Receiver[GatheredInfo]):
         with recv as info_stream:
             async for info in info_stream:
-                await self.event_sender.send(
-                    NodeGatheredInfo(
-                        node_id=self.node_id,
-                        when=str(datetime.now(tz=timezone.utc)),
-                        info=info,
-                    )
+                event = NodeGatheredInfo(
+                    node_id=self.node_id,
+                    when=str(datetime.now(tz=timezone.utc)),
+                    info=info,
                 )
+                logger.warning(
+                    f"NODE_GATHERED_INFO: Sending event for node {self.node_id}, "
+                    f"event_id={event.event_id}"
+                )
+                await self.event_sender.send(event)

     async def _check_catchup_state(self):
         with self.state_catchup_receiver as states:
@@ -138,15 +141,30 @@ class Worker:
                 self.state.last_event_applied_idx == -1
                 and state.last_event_applied_idx > self.state.last_event_applied_idx
             ):
-                logger.info(
-                    f"Worker catching up state to idx {state.last_event_applied_idx}"
-                )
-                self.event_buffer.store = {}
-                self.event_buffer.next_idx_to_release = (
-                    state.last_event_applied_idx + 1
+                # DEBUG: Log buffer state BEFORE clearing
+                logger.warning(
+                    f"STATE_CATCHUP: About to catch up. "
+                    f"Current buffer indices: {sorted(self.event_buffer.store.keys())}, "
+                    f"next_idx_to_release: {self.event_buffer.next_idx_to_release}, "
+                    f"catching up to idx: {state.last_event_applied_idx}"
                 )
+
+                new_idx = state.last_event_applied_idx + 1
+                self.event_buffer.next_idx_to_release = new_idx
+                # Preserve events that arrived early but are still valid (idx >= new_idx)
+                # Remove stale events (idx < new_idx) to prevent memory growth
+                self.event_buffer.store = {
+                    k: v for k, v in self.event_buffer.store.items() if k >= new_idx
+                }
                 self.state = state
+
+                # DEBUG: Log buffer state AFTER clearing
+                logger.warning(
+                    f"STATE_CATCHUP: Catchup complete. "
+                    f"Buffer preserved indices: {sorted(self.event_buffer.store.keys())}, "
+                    f"new next_idx_to_release: {self.event_buffer.next_idx_to_release}"
+                )

     async def _event_applier(self):
         with self.global_event_receiver as events:
             async for f_event in events:
@@ -157,8 +175,20 @@ class Worker:
                 if event_id in self.out_for_delivery:
                     del self.out_for_delivery[event_id]

+                # DEBUG: Log what was ingested
+                logger.warning(
+                    f"EVENT_APPLIER: Ingested event idx={f_event.origin_idx}, "
+                    f"buffer keys now: {sorted(self.event_buffer.store.keys())}"
+                )
+
                 # 2. for each event, apply it to the state
                 indexed_events = self.event_buffer.drain_indexed()

+                # DEBUG: Log drain results
+                logger.warning(
+                    f"EVENT_APPLIER: Drained {len(indexed_events)} events, "
+                    f"next_idx_to_release now: {self.event_buffer.next_idx_to_release}"
+                )
+
                 if indexed_events:
                     self._nack_attempts = 0
@@ -175,6 +205,12 @@ class Worker:
                         self._nack_cancel_scope.cancel()

                     for idx, event in indexed_events:
+                        # DEBUG: Log NodeGatheredInfo events
+                        if isinstance(event, NodeGatheredInfo):
+                            logger.warning(
+                                f"NODE_GATHERED_INFO: Applying event idx={idx} for node {event.node_id}, "
+                                f"event_id={event.event_id}"
+                            )
                         self.state = apply(self.state, IndexedEvent(idx=idx, event=event))

                 # Buffer input image chunks for image editing
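
The applier loop above depends on drain_indexed releasing events strictly in contiguous index order. The following sketch shows that inferred contract, written against the hypothetical EventBuffer stand-in from the earlier example; it is an assumption about the interface, not the repository's actual implementation.

def drain_indexed(buf: EventBuffer) -> list[tuple[int, str]]:
    released: list[tuple[int, str]] = []
    # Release buffered events only in contiguous index order; an event
    # whose predecessors have not yet arrived stays parked in buf.store.
    while buf.next_idx_to_release in buf.store:
        idx = buf.next_idx_to_release
        released.append((idx, buf.store.pop(idx)))
        buf.next_idx_to_release += 1
    return released

Under such a contract an event is applied only once every lower index has been released, so an entry dropped from the store during catchup is never applied unless the transport redelivers it; preserving entries with idx >= new_idx removes that dependency.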