mirror of
https://github.com/exo-explore/exo.git
synced 2026-01-21 04:22:21 -05:00
Compare commits
1 Commits
foo
...
alexcheema
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cd497c3696 |
@@ -10,6 +10,7 @@ from exo.routing.connection_message import ConnectionMessage, ConnectionMessageT
|
||||
from exo.shared.apply import apply
|
||||
from exo.shared.types.commands import ForwarderCommand, RequestEventLog
|
||||
from exo.shared.types.common import NodeId, SessionId
|
||||
from exo.shared.types.tasks import TaskId
|
||||
from exo.shared.types.events import (
|
||||
Event,
|
||||
EventId,
|
||||
@@ -172,6 +173,7 @@ class Worker:
|
||||
self.state = apply(self.state, IndexedEvent(idx=idx, event=event))
|
||||
|
||||
async def plan_step(self):
|
||||
seen_tasks: set[TaskId] = set()
|
||||
while True:
|
||||
await anyio.sleep(0.1)
|
||||
# 3. based on the updated state, we plan & execute an operation.
|
||||
@@ -186,6 +188,10 @@ class Worker:
|
||||
)
|
||||
if task is None:
|
||||
continue
|
||||
if task.task_id in seen_tasks:
|
||||
logger.warning("Worker tried to plan a duplicate task")
|
||||
continue
|
||||
seen_tasks.add(task.task_id)
|
||||
logger.info(f"Worker plan: {task.__class__.__name__}")
|
||||
assert task.task_status
|
||||
await self.event_sender.send(TaskCreated(task_id=task.task_id, task=task))
|
||||
|
||||
Reference in New Issue
Block a user