Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Cheema
cd497c3696 Deduplicate tasks in plan_step 2025-12-30 17:28:19 +00:00

View File

@@ -10,6 +10,7 @@ from exo.routing.connection_message import ConnectionMessage, ConnectionMessageT
from exo.shared.apply import apply
from exo.shared.types.commands import ForwarderCommand, RequestEventLog
from exo.shared.types.common import NodeId, SessionId
from exo.shared.types.tasks import TaskId
from exo.shared.types.events import (
Event,
EventId,
@@ -172,6 +173,7 @@ class Worker:
self.state = apply(self.state, IndexedEvent(idx=idx, event=event))
async def plan_step(self):
seen_tasks: set[TaskId] = set()
while True:
await anyio.sleep(0.1)
# 3. based on the updated state, we plan & execute an operation.
@@ -186,6 +188,10 @@ class Worker:
)
if task is None:
continue
if task.task_id in seen_tasks:
logger.warning("Worker tried to plan a duplicate task")
continue
seen_tasks.add(task.task_id)
logger.info(f"Worker plan: {task.__class__.__name__}")
assert task.task_status
await self.event_sender.send(TaskCreated(task_id=task.task_id, task=task))