From ed2d10bdc6db92c8bd0450060d8f0c03425d7e8e Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Tue, 12 May 2026 11:48:08 +0100 Subject: [PATCH] Redirect runner stdout/stderr to file logs (#2084) ## Motivation We want to use log mining tools like [Drain3](https://github.com/logpai/Drain3) to get standardized error formats, but for that we should record runner stdout/stderr in a massive append-only log to gather training data for such tools. Also useful for future opt-in telemetry. ## Changes The stdout/stderr from runner now splits into 3 tasks: 1) raw write to dedicated runner logs 2) sanitized line-by-line logging with log-guru 3) stub for further error-processing (i.e. turning lines into errors) ### Manual Testing Works on 4x mac mini clusted connected as TB4 ring. --- src/exo/shared/constants.py | 4 + src/exo/worker/main.py | 6 +- src/exo/worker/runner/supervisor.py | 167 ++++++++++++++---- .../test_runner/test_runner_supervisor.py | 15 +- 4 files changed, 150 insertions(+), 42 deletions(-) diff --git a/src/exo/shared/constants.py b/src/exo/shared/constants.py index bd1c537bf..fd0869c46 100644 --- a/src/exo/shared/constants.py +++ b/src/exo/shared/constants.py @@ -68,6 +68,10 @@ DASHBOARD_DIR = ( # Log files (data/logs or cache) EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log" EXO_LOG = EXO_LOG_DIR / "exo.log" +EXO_RUNNER_LOG_DIR = EXO_LOG_DIR / "runner_log" +EXO_RUNNER_STDOUT_LOG = EXO_RUNNER_LOG_DIR / "stdout.log" +EXO_RUNNER_STDERR_LOG = EXO_RUNNER_LOG_DIR / "stderr.log" + EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log" EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid" diff --git a/src/exo/worker/main.py b/src/exo/worker/main.py index 9d33cc23e..5c34e7b4b 100644 --- a/src/exo/worker/main.py +++ b/src/exo/worker/main.py @@ -223,7 +223,7 @@ class Worker: # lets not kill the worker if a runner is unresponsive match task: case CreateRunner(): - self._create_supervisor(task) + await self._create_supervisor(task) self._instance_backoff.record_attempt(task.instance_id) await self.event_sender.send( TaskStatusUpdated( @@ -370,9 +370,9 @@ class Worker: instance.shard_assignments.node_to_runner[self.node_id] ].start_task(task) - def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor: + async def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor: """Creates and stores a new AssignedRunner with initial downloading status.""" - runner = RunnerSupervisor.create( + runner = await RunnerSupervisor.create( bound_instance=task.bound_instance, event_sender=self.event_sender.clone(), ) diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index bc90d4181..7e4ee9ae1 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -1,16 +1,20 @@ +import codecs import contextlib import signal from dataclasses import dataclass, field -from typing import Self +from os import PathLike +from typing import Callable, Self import anyio from anyio import ( + AsyncFile, BrokenResourceError, + CancelScope, ClosedResourceError, - EndOfStream, ) from loguru import logger +from exo.shared.constants import EXO_RUNNER_STDERR_LOG, EXO_RUNNER_STDOUT_LOG from exo.shared.types.chunks import ErrorChunk from exo.shared.types.events import ( ChunkGenerated, @@ -42,6 +46,7 @@ from exo.shared.types.worker.runners import ( from exo.shared.types.worker.shards import ShardMetadata from exo.utils.async_process import AsyncProcess from exo.utils.channels import MpReceiver, MpSender, Receiver, Sender, mp_channel +from exo.utils.fs import ensure_parent_directory_exists from exo.utils.task_group import TaskGroup from exo.worker.runner.bootstrap import entrypoint @@ -49,11 +54,127 @@ PREFILL_TIMEOUT_SECONDS = 60 DECODE_TIMEOUT_SECONDS = 5 +@dataclass(eq=False) +class RunnerStdioHandler: + _stdout_rx: Receiver[bytes] + _stderr_rx: Receiver[bytes] + _stdout_log: AsyncFile[str] + _stderr_log: AsyncFile[str] + + _tg: TaskGroup = field(default_factory=TaskGroup, init=False) + + @classmethod + async def create( + cls, + *, + stdout_rx: Receiver[bytes], + stderr_rx: Receiver[bytes], + stdout_log_path: PathLike[str] = EXO_RUNNER_STDOUT_LOG, + stderr_log_path: PathLike[str] = EXO_RUNNER_STDERR_LOG, + ) -> Self: + # these are append only logs used to gather data for log template mining + # + # TODO: in the future use [Drain3](https://github.com/logpai/Drain3) + # to mine these logs + ensure_parent_directory_exists(stdout_log_path) + ensure_parent_directory_exists(stderr_log_path) + stdout_log = await anyio.open_file(stdout_log_path, "a") + stderr_log = await anyio.open_file(stderr_log_path, "a") + + # instantiate and return + self = cls( + _stdout_rx=stdout_rx, + _stderr_rx=stderr_rx, + _stdout_log=stdout_log, + _stderr_log=stderr_log, + ) + return self + + async def run(self): + try: + async with self._tg as tg: + tg.start_soon( # pyright: ignore[reportUnknownArgumentType] + self._handle_runner_output, + self._stdout_rx, + self._stdout_log, + lambda line: logger.info(f"Runner stdout: {line}"), # pyright: ignore[reportUnknownLambdaType] + ) + tg.start_soon( # pyright: ignore[reportUnknownArgumentType] + self._handle_runner_output, + self._stderr_rx, + self._stderr_log, + lambda line: logger.warning(f"Runner stderr: {line}"), # pyright: ignore[reportUnknownLambdaType] + ) + finally: + with CancelScope(shield=True): + await self._stdout_log.aclose() + await self._stderr_log.aclose() + + async def _handle_runner_output( + self, + rx: Receiver[bytes], + logfile: AsyncFile[str], + log_line: Callable[[str], None], + ): + # TODO: right now it logs them as warnings, but in the future they should be split + # into being logged AND a seperate task which tries to best-effort figure out cause + # of error and package into error enum, which then is used by rest of app to act on it; + # inferring what the error is would be done by pattern-matching in the text for things + # e.g. certain VLLM error codes and so on + + # not using TextReceiveStream because it doesn't do final=True handling on errors + decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + pending_line = "" + + async def handle_line(line: str): + # preserve whitespace for later log-mining + line = line.removesuffix("\r") + if not line: + return + + # Send to logger & error recovery task + log_line(line) + # TODO: error recovery task + + async def handle_text(text: str): + nonlocal pending_line + + if not text: + return + + await logfile.write(text) + await logfile.flush() + + # newline buffering + pending_line += text + lines = pending_line.split("\n") + pending_line = lines.pop() + + for line in lines: + await handle_line(line) + + try: + with rx: + async for chunk in rx: + await handle_text(decoder.decode(chunk, final=False)) + except (ClosedResourceError, BrokenResourceError): + logger.warning("Runner stdio stream closed before clean EOF") + finally: + with CancelScope(shield=True): + await handle_text(decoder.decode(b"", final=True)) + await logfile.flush() + + if pending_line: + await handle_line(pending_line) + pending_line = "" + + @dataclass(eq=False) class RunnerSupervisor: shard_metadata: ShardMetadata bound_instance: BoundInstance runner_process: AsyncProcess + _runner_stdio_handler: RunnerStdioHandler initialize_timeout: float _ev_recv: MpReceiver[Event] _task_sender: MpSender[Task] @@ -70,7 +191,7 @@ class RunnerSupervisor: ) @classmethod - def create( + async def create( cls, *, bound_instance: BoundInstance, @@ -92,6 +213,9 @@ class RunnerSupervisor: ), daemon=True, ) + runner_stdio_handler = await RunnerStdioHandler.create( + stdout_rx=runner_process.stdout, stderr_rx=runner_process.stderr + ) shard_metadata = bound_instance.bound_shard @@ -99,6 +223,7 @@ class RunnerSupervisor: bound_instance=bound_instance, shard_metadata=shard_metadata, runner_process=runner_process, + _runner_stdio_handler=runner_stdio_handler, initialize_timeout=initialize_timeout, _ev_recv=ev_recv, _task_sender=task_sender, @@ -111,22 +236,9 @@ class RunnerSupervisor: async def run(self): try: async with self._tg as tg: - # start the process itself + # start the process itself & handle its stdout/stderr await tg.start(self.runner_process.run) - - # start tasks to drain/collect stdout/stderr into usable errors - # - # TODO: right now it logs them as warnings, but in the future they should be split - # into being logged AND a seperate task which tries to best-effort figure out cause - # of error and package into error enum, which then is used by rest of app to act on it; - # inferring what the error is would be done by pattern-matching in the text for things - # e.g. certain VLLM error codes and so on - tg.start_soon( - self._forward_runner_output, "stdout", self.runner_process.stdout - ) - tg.start_soon( - self._forward_runner_output, "stderr", self.runner_process.stderr - ) + tg.start_soon(self._runner_stdio_handler.run) tg.start_soon(self._watch_runner) tg.start_soon(self._forward_events) @@ -235,25 +347,6 @@ class RunnerSupervisor: if not self.runner_process.is_alive(): await self._check_runner(RuntimeError("Runner found to be dead")) - async def _forward_runner_output( - self, - stream_name: str, - stream: Receiver[bytes], - ) -> None: - while True: - try: - chunk = await stream.receive() - except (EndOfStream, ClosedResourceError, BrokenResourceError): - return - - message = chunk.decode("utf-8", errors="replace").rstrip() - if not message: - continue - if stream_name == "stderr": - logger.warning(f"Runner stderr: {message}") - else: - logger.debug(f"Runner stdout: {message}") - async def _check_runner(self, e: Exception) -> None: if not self._cancel_watch_runner.cancel_called: self._cancel_watch_runner.cancel() diff --git a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py index 3ea7c261a..2845ea013 100644 --- a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py +++ b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py @@ -17,11 +17,17 @@ from exo.shared.types.worker.instances import BoundInstance, InstanceId from exo.shared.types.worker.runners import RunnerFailed, RunnerId from exo.utils.async_process import AsyncProcess from exo.utils.channels import channel, mp_channel -from exo.worker.runner.supervisor import RunnerSupervisor +from exo.worker.runner.supervisor import RunnerStdioHandler, RunnerSupervisor from exo.worker.tests.unittests.conftest import get_bound_mlx_ring_instance class _DeadProcess: + def __init__(self): + rx1, _ = channel[bytes]() + rx2, _ = channel[bytes]() + self.stdout = rx1 + self.stderr = rx2 + exitcode = -6 def is_alive(self) -> bool: @@ -42,10 +48,15 @@ async def test_check_runner_emits_error_chunk_for_inflight_text_generation() -> node_id=NodeId("node-a"), ) + proc = cast(AsyncProcess, cast(object, _DeadProcess())) + handler = await RunnerStdioHandler.create( + stdout_rx=proc.stdout, stderr_rx=proc.stderr + ) supervisor = RunnerSupervisor( shard_metadata=bound_instance.bound_shard, bound_instance=bound_instance, - runner_process=cast(AsyncProcess, cast(object, _DeadProcess())), + runner_process=proc, + _runner_stdio_handler=handler, initialize_timeout=400, _ev_recv=ev_recv, _task_sender=task_sender,