From 87c72fc1fd20aaa5e8204bdb83129157fcc2380c Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Mon, 11 May 2026 13:15:22 +0100 Subject: [PATCH] Fixes issue #2068 (#2083) ## Motivation To fix https://github.com/exo-explore/exo/issues/2068 ## Changes Adds queue shutdown logic and hard timeouts for closing the server. ## Why It Works Prevents the API from hanging for more than 5 seconds during shutdown. --- src/exo/api/main.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/exo/api/main.py b/src/exo/api/main.py index 4fb6d2d3b..e346a4f92 100644 --- a/src/exo/api/main.py +++ b/src/exo/api/main.py @@ -20,6 +20,7 @@ from fastapi.staticfiles import StaticFiles from hypercorn.asyncio import serve # pyright: ignore[reportUnknownVariableType] from hypercorn.config import Config from hypercorn.typing import ASGIFramework +from hypercorn.utils import LifespanTimeoutError from loguru import logger from exo.api.adapters.chat_completions import ( @@ -1856,12 +1857,21 @@ class API: await anyio.sleep_forever() finally: with anyio.CancelScope(shield=True): + # IMPORTANT: when new queues are added, update this (for proper shutdown semantics) + self._shutdown_queues(self._text_generation_queues) + self._shutdown_queues(self._image_generation_queues) + shutdown_ev.set() finally: self._event_log.close() self.command_sender.close() self.event_receiver.close() + @staticmethod + def _shutdown_queues[K, V](queues: dict[K, Sender[V]]): + for v in queues.values(): + v.close() + async def run_api(self, ev: anyio.Event): cfg = Config() cfg.bind = [f"0.0.0.0:{self.port}"] @@ -1869,12 +1879,23 @@ class API: cfg.accesslog = None cfg.errorlog = "-" cfg.logger_class = InterceptLogger + + # prevents hangs when mid-request and connection refuses to close + cfg.graceful_timeout = 2 # seconds + cfg.shutdown_timeout = 3 # seconds + with anyio.CancelScope(shield=True): - await serve( - cast(ASGIFramework, self.app), - cfg, - shutdown_trigger=ev.wait, - ) + try: + await serve( 
cast(ASGIFramework, self.app), + cfg, + shutdown_trigger=ev.wait, + ) + except LifespanTimeoutError as e: + logger.warning( + "Graceful server shutdown timed out, some connections forcibly closed" + ) + logger.opt(exception=e).debug("") async def _apply_state(self): with self.event_receiver as events: