Redirect runner stdout/stderr to file logs (#2084)

## Motivation

We want to use log mining tools like
[Drain3](https://github.com/logpai/Drain3) to get standardized error
formats, but for that we should record runner stdout/stderr in a massive
append-only log to gather training data for such tools. Also useful for
future opt-in telemetry.

## Changes

The stdout/stderr from runner now splits into 3 tasks: 
1) raw write to dedicated runner logs 
2) sanitized line-by-line logging with log-guru 
3) stub for further error-processing (i.e. turning lines into errors)

### Manual Testing
Works on 4x mac mini clusted connected as TB4 ring.
This commit is contained in:
Andrei Cravtov
2026-05-12 11:48:08 +01:00
committed by GitHub
parent 87c72fc1fd
commit ed2d10bdc6
4 changed files with 150 additions and 42 deletions

View File

@@ -68,6 +68,10 @@ DASHBOARD_DIR = (
# Log files (data/logs or cache)
EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log"
EXO_LOG = EXO_LOG_DIR / "exo.log"
EXO_RUNNER_LOG_DIR = EXO_LOG_DIR / "runner_log"
EXO_RUNNER_STDOUT_LOG = EXO_RUNNER_LOG_DIR / "stdout.log"
EXO_RUNNER_STDERR_LOG = EXO_RUNNER_LOG_DIR / "stderr.log"
EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log"
EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid"

View File

@@ -223,7 +223,7 @@ class Worker:
# lets not kill the worker if a runner is unresponsive
match task:
case CreateRunner():
self._create_supervisor(task)
await self._create_supervisor(task)
self._instance_backoff.record_attempt(task.instance_id)
await self.event_sender.send(
TaskStatusUpdated(
@@ -370,9 +370,9 @@ class Worker:
instance.shard_assignments.node_to_runner[self.node_id]
].start_task(task)
def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor:
async def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor:
"""Creates and stores a new AssignedRunner with initial downloading status."""
runner = RunnerSupervisor.create(
runner = await RunnerSupervisor.create(
bound_instance=task.bound_instance,
event_sender=self.event_sender.clone(),
)

View File

@@ -1,16 +1,20 @@
import codecs
import contextlib
import signal
from dataclasses import dataclass, field
from typing import Self
from os import PathLike
from typing import Callable, Self
import anyio
from anyio import (
AsyncFile,
BrokenResourceError,
CancelScope,
ClosedResourceError,
EndOfStream,
)
from loguru import logger
from exo.shared.constants import EXO_RUNNER_STDERR_LOG, EXO_RUNNER_STDOUT_LOG
from exo.shared.types.chunks import ErrorChunk
from exo.shared.types.events import (
ChunkGenerated,
@@ -42,6 +46,7 @@ from exo.shared.types.worker.runners import (
from exo.shared.types.worker.shards import ShardMetadata
from exo.utils.async_process import AsyncProcess
from exo.utils.channels import MpReceiver, MpSender, Receiver, Sender, mp_channel
from exo.utils.fs import ensure_parent_directory_exists
from exo.utils.task_group import TaskGroup
from exo.worker.runner.bootstrap import entrypoint
@@ -49,11 +54,127 @@ PREFILL_TIMEOUT_SECONDS = 60
DECODE_TIMEOUT_SECONDS = 5
@dataclass(eq=False)
class RunnerStdioHandler:
_stdout_rx: Receiver[bytes]
_stderr_rx: Receiver[bytes]
_stdout_log: AsyncFile[str]
_stderr_log: AsyncFile[str]
_tg: TaskGroup = field(default_factory=TaskGroup, init=False)
@classmethod
async def create(
cls,
*,
stdout_rx: Receiver[bytes],
stderr_rx: Receiver[bytes],
stdout_log_path: PathLike[str] = EXO_RUNNER_STDOUT_LOG,
stderr_log_path: PathLike[str] = EXO_RUNNER_STDERR_LOG,
) -> Self:
# these are append only logs used to gather data for log template mining
#
# TODO: in the future use [Drain3](https://github.com/logpai/Drain3)
# to mine these logs
ensure_parent_directory_exists(stdout_log_path)
ensure_parent_directory_exists(stderr_log_path)
stdout_log = await anyio.open_file(stdout_log_path, "a")
stderr_log = await anyio.open_file(stderr_log_path, "a")
# instantiate and return
self = cls(
_stdout_rx=stdout_rx,
_stderr_rx=stderr_rx,
_stdout_log=stdout_log,
_stderr_log=stderr_log,
)
return self
async def run(self):
try:
async with self._tg as tg:
tg.start_soon( # pyright: ignore[reportUnknownArgumentType]
self._handle_runner_output,
self._stdout_rx,
self._stdout_log,
lambda line: logger.info(f"Runner stdout: {line}"), # pyright: ignore[reportUnknownLambdaType]
)
tg.start_soon( # pyright: ignore[reportUnknownArgumentType]
self._handle_runner_output,
self._stderr_rx,
self._stderr_log,
lambda line: logger.warning(f"Runner stderr: {line}"), # pyright: ignore[reportUnknownLambdaType]
)
finally:
with CancelScope(shield=True):
await self._stdout_log.aclose()
await self._stderr_log.aclose()
async def _handle_runner_output(
self,
rx: Receiver[bytes],
logfile: AsyncFile[str],
log_line: Callable[[str], None],
):
# TODO: right now it logs them as warnings, but in the future they should be split
# into being logged AND a seperate task which tries to best-effort figure out cause
# of error and package into error enum, which then is used by rest of app to act on it;
# inferring what the error is would be done by pattern-matching in the text for things
# e.g. certain VLLM error codes and so on
# not using TextReceiveStream because it doesn't do final=True handling on errors
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
pending_line = ""
async def handle_line(line: str):
# preserve whitespace for later log-mining
line = line.removesuffix("\r")
if not line:
return
# Send to logger & error recovery task
log_line(line)
# TODO: error recovery task
async def handle_text(text: str):
nonlocal pending_line
if not text:
return
await logfile.write(text)
await logfile.flush()
# newline buffering
pending_line += text
lines = pending_line.split("\n")
pending_line = lines.pop()
for line in lines:
await handle_line(line)
try:
with rx:
async for chunk in rx:
await handle_text(decoder.decode(chunk, final=False))
except (ClosedResourceError, BrokenResourceError):
logger.warning("Runner stdio stream closed before clean EOF")
finally:
with CancelScope(shield=True):
await handle_text(decoder.decode(b"", final=True))
await logfile.flush()
if pending_line:
await handle_line(pending_line)
pending_line = ""
@dataclass(eq=False)
class RunnerSupervisor:
shard_metadata: ShardMetadata
bound_instance: BoundInstance
runner_process: AsyncProcess
_runner_stdio_handler: RunnerStdioHandler
initialize_timeout: float
_ev_recv: MpReceiver[Event]
_task_sender: MpSender[Task]
@@ -70,7 +191,7 @@ class RunnerSupervisor:
)
@classmethod
def create(
async def create(
cls,
*,
bound_instance: BoundInstance,
@@ -92,6 +213,9 @@ class RunnerSupervisor:
),
daemon=True,
)
runner_stdio_handler = await RunnerStdioHandler.create(
stdout_rx=runner_process.stdout, stderr_rx=runner_process.stderr
)
shard_metadata = bound_instance.bound_shard
@@ -99,6 +223,7 @@ class RunnerSupervisor:
bound_instance=bound_instance,
shard_metadata=shard_metadata,
runner_process=runner_process,
_runner_stdio_handler=runner_stdio_handler,
initialize_timeout=initialize_timeout,
_ev_recv=ev_recv,
_task_sender=task_sender,
@@ -111,22 +236,9 @@ class RunnerSupervisor:
async def run(self):
try:
async with self._tg as tg:
# start the process itself
# start the process itself & handle its stdout/stderr
await tg.start(self.runner_process.run)
# start tasks to drain/collect stdout/stderr into usable errors
#
# TODO: right now it logs them as warnings, but in the future they should be split
# into being logged AND a seperate task which tries to best-effort figure out cause
# of error and package into error enum, which then is used by rest of app to act on it;
# inferring what the error is would be done by pattern-matching in the text for things
# e.g. certain VLLM error codes and so on
tg.start_soon(
self._forward_runner_output, "stdout", self.runner_process.stdout
)
tg.start_soon(
self._forward_runner_output, "stderr", self.runner_process.stderr
)
tg.start_soon(self._runner_stdio_handler.run)
tg.start_soon(self._watch_runner)
tg.start_soon(self._forward_events)
@@ -235,25 +347,6 @@ class RunnerSupervisor:
if not self.runner_process.is_alive():
await self._check_runner(RuntimeError("Runner found to be dead"))
async def _forward_runner_output(
self,
stream_name: str,
stream: Receiver[bytes],
) -> None:
while True:
try:
chunk = await stream.receive()
except (EndOfStream, ClosedResourceError, BrokenResourceError):
return
message = chunk.decode("utf-8", errors="replace").rstrip()
if not message:
continue
if stream_name == "stderr":
logger.warning(f"Runner stderr: {message}")
else:
logger.debug(f"Runner stdout: {message}")
async def _check_runner(self, e: Exception) -> None:
if not self._cancel_watch_runner.cancel_called:
self._cancel_watch_runner.cancel()

View File

@@ -17,11 +17,17 @@ from exo.shared.types.worker.instances import BoundInstance, InstanceId
from exo.shared.types.worker.runners import RunnerFailed, RunnerId
from exo.utils.async_process import AsyncProcess
from exo.utils.channels import channel, mp_channel
from exo.worker.runner.supervisor import RunnerSupervisor
from exo.worker.runner.supervisor import RunnerStdioHandler, RunnerSupervisor
from exo.worker.tests.unittests.conftest import get_bound_mlx_ring_instance
class _DeadProcess:
def __init__(self):
rx1, _ = channel[bytes]()
rx2, _ = channel[bytes]()
self.stdout = rx1
self.stderr = rx2
exitcode = -6
def is_alive(self) -> bool:
@@ -42,10 +48,15 @@ async def test_check_runner_emits_error_chunk_for_inflight_text_generation() ->
node_id=NodeId("node-a"),
)
proc = cast(AsyncProcess, cast(object, _DeadProcess()))
handler = await RunnerStdioHandler.create(
stdout_rx=proc.stdout, stderr_rx=proc.stderr
)
supervisor = RunnerSupervisor(
shard_metadata=bound_instance.bound_shard,
bound_instance=bound_instance,
runner_process=cast(AsyncProcess, cast(object, _DeadProcess())),
runner_process=proc,
_runner_stdio_handler=handler,
initialize_timeout=400,
_ev_recv=ev_recv,
_task_sender=task_sender,