mirror of
https://github.com/exo-explore/exo.git
synced 2026-05-19 12:15:07 -04:00
## Motivation To fix https://github.com/exo-explore/exo/issues/2068 ## Changes Adds queue shutdown logic & hard-timeouts for closing server. ## Why It Works Prevents API from hanging more than 5 seconds.
This commit is contained in:
@@ -20,6 +20,7 @@ from fastapi.staticfiles import StaticFiles
|
||||
from hypercorn.asyncio import serve # pyright: ignore[reportUnknownVariableType]
|
||||
from hypercorn.config import Config
|
||||
from hypercorn.typing import ASGIFramework
|
||||
from hypercorn.utils import LifespanTimeoutError
|
||||
from loguru import logger
|
||||
|
||||
from exo.api.adapters.chat_completions import (
|
||||
@@ -1856,12 +1857,21 @@ class API:
|
||||
await anyio.sleep_forever()
|
||||
finally:
|
||||
with anyio.CancelScope(shield=True):
|
||||
# IMPORTANT: when new queues are added, update this (for proper shutdown semantics)
|
||||
self._shutdown_queues(self._text_generation_queues)
|
||||
self._shutdown_queues(self._image_generation_queues)
|
||||
|
||||
shutdown_ev.set()
|
||||
finally:
|
||||
self._event_log.close()
|
||||
self.command_sender.close()
|
||||
self.event_receiver.close()
|
||||
|
||||
@staticmethod
|
||||
def _shutdown_queues[K, V](queues: dict[K, Sender[V]]):
|
||||
for v in queues.values():
|
||||
v.close()
|
||||
|
||||
async def run_api(self, ev: anyio.Event):
|
||||
cfg = Config()
|
||||
cfg.bind = [f"0.0.0.0:{self.port}"]
|
||||
@@ -1869,12 +1879,23 @@ class API:
|
||||
cfg.accesslog = None
|
||||
cfg.errorlog = "-"
|
||||
cfg.logger_class = InterceptLogger
|
||||
|
||||
# prevents hangs when mid-request and connection refuses to close
|
||||
cfg.graceful_timeout = 2 # seconds
|
||||
cfg.shutdown_timeout = 3 # seconds
|
||||
|
||||
with anyio.CancelScope(shield=True):
|
||||
await serve(
|
||||
cast(ASGIFramework, self.app),
|
||||
cfg,
|
||||
shutdown_trigger=ev.wait,
|
||||
)
|
||||
try:
|
||||
await serve(
|
||||
cast(ASGIFramework, self.app),
|
||||
cfg,
|
||||
shutdown_trigger=ev.wait,
|
||||
)
|
||||
except LifespanTimeoutError as e:
|
||||
logger.warning(
|
||||
"Graceful server shutdown timed out, some connections forcebly closed"
|
||||
)
|
||||
logger.opt(exception=e).debug("")
|
||||
|
||||
async def _apply_state(self):
|
||||
with self.event_receiver as events:
|
||||
|
||||
Reference in New Issue
Block a user