mirror of
https://github.com/fastapi/fastapi.git
synced 2026-03-02 05:37:08 -05:00
223 lines
6.2 KiB
Python
223 lines
6.2 KiB
Python
from typing import Annotated, Any
|
|
|
|
from annotated_doc import Doc
|
|
from pydantic import AfterValidator, BaseModel, Field, model_validator
|
|
from starlette.responses import StreamingResponse
|
|
|
|
# JSON Schema describing a single SSE event. This is the canonical shape
# given by the OpenAPI 3.2 spec
# (Section 4.14.4 "Special Considerations for Server-Sent Events").
_SSE_EVENT_SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {
        # Payload and routing fields are plain strings on the wire.
        "data": {
            "type": "string",
        },
        "event": {
            "type": "string",
        },
        "id": {
            "type": "string",
        },
        # Reconnection delay; the schema forbids negative values.
        "retry": {
            "type": "integer",
            "minimum": 0,
        },
    },
}
|
|
|
|
|
|
class EventSourceResponse(StreamingResponse):
    """`StreamingResponse` subclass whose media type is `text/event-stream`.

    Pass it as `response_class=EventSourceResponse` on a *path operation*
    that uses `yield` to turn the response into a Server Sent Events (SSE)
    stream.

    It is not tied to a particular HTTP method: `GET`, `POST`, etc. all work,
    so it can be used with protocols such as MCP that stream SSE over `POST`.

    This class is mostly a marker. It sets the correct `Content-Type`, while
    the actual event encoding is done by the FastAPI routing layer.
    """

    media_type = "text/event-stream"
|
|
|
|
|
|
def _check_id_no_null(v: str | None) -> str | None:
|
|
if v is not None and "\0" in v:
|
|
raise ValueError("SSE 'id' must not contain null characters")
|
|
return v
|
|
|
|
|
|
class ServerSentEvent(BaseModel):
    """A single Server-Sent Event.

    Yield instances of this model from a *path operation function* declared
    with `response_class=EventSourceResponse`; each one is encoded into the
    [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
    (`text/event-stream`).

    Yielding a plain object (dict, Pydantic model, etc.) instead is also
    supported: it is automatically JSON-encoded and sent as the `data:` field.

    Every `data` value is JSON-serialized, **plain strings included**, so
    `data="hello"` goes out as `data: "hello"` (with the quotes). Use
    `raw_data` to send a string without JSON encoding. `data` and `raw_data`
    cannot both be set on the same event.
    """

    data: Annotated[
        Any,
        Doc(
            """
            The event payload.

            Can be any JSON-serializable value: a Pydantic model, dict, list,
            string, number, etc. It is **always** serialized to JSON: strings
            are quoted (`"hello"` becomes `data: "hello"` on the wire).

            Mutually exclusive with `raw_data`.
            """
        ),
    ] = None
    raw_data: Annotated[
        str | None,
        Doc(
            """
            Raw string to send as the `data:` field **without** JSON encoding.

            Use this when you need to send pre-formatted text, HTML fragments,
            CSV lines, or any non-JSON payload. The string is placed directly
            into the `data:` field as-is.

            Mutually exclusive with `data`.
            """
        ),
    ] = None
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name.

            Maps to `addEventListener(event, ...)` on the browser. When omitted,
            the browser dispatches on the generic `message` event.
            """
        ),
    ] = None
    id: Annotated[
        str | None,
        AfterValidator(_check_id_no_null),
        Doc(
            """
            Optional event ID.

            The browser sends this value back as the `Last-Event-ID` header on
            automatic reconnection. **Must not contain null (`\\0`) characters.**
            """
        ),
    ] = None
    retry: Annotated[
        int | None,
        Field(ge=0),
        Doc(
            """
            Optional reconnection time in **milliseconds**.

            Tells the browser how long to wait before reconnecting after the
            connection is lost. Must be a non-negative integer.
            """
        ),
    ] = None
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s).

            Comment lines start with `:` in the SSE wire format and are ignored by
            `EventSource` clients. Useful for keep-alive pings to prevent
            proxy/load-balancer timeouts.
            """
        ),
    ] = None

    @model_validator(mode="after")
    def _check_data_exclusive(self) -> "ServerSentEvent":
        """Reject an event that sets both `data` and `raw_data`."""
        # Either field alone (or neither) is fine; only the combination is
        # ambiguous, because both map to the same `data:` wire field.
        only_one_or_none = self.data is None or self.raw_data is None
        if only_one_or_none:
            return self
        raise ValueError(
            "Cannot set both 'data' and 'raw_data' on the same "
            "ServerSentEvent. Use 'data' for JSON-serialized payloads "
            "or 'raw_data' for pre-formatted strings."
        )
|
|
|
|
|
|
def _split_sse_lines(text: str) -> list[str]:
    """Split `text` on the only line terminators SSE recognizes.

    An event stream treats CRLF, LF and CR as line ends and nothing else.
    `str.splitlines()` additionally breaks on characters such as U+2028,
    U+2029, U+0085, vertical tab and form feed. Those may appear unescaped
    inside a JSON string, so splitting on them would cut a payload into
    several `data:` lines that the client rejoins with LF, corrupting it.

    Empty segments are kept, so an empty string yields one empty line and a
    trailing terminator yields a trailing empty line.
    """
    # Collapse CRLF first so it counts as one line break rather than two.
    return text.replace("\r\n", "\n").replace("\r", "\n").split("\n")


def _reject_line_breaks(field_name: str, value: str) -> str:
    """Return `value` unchanged, or raise `ValueError` if it spans lines."""
    if "\r" in value or "\n" in value:
        raise ValueError(f"SSE '{field_name}' must not contain line breaks")
    return value


def format_sse_event(
    *,
    data_str: Annotated[
        str | None,
        Doc(
            """
            Pre-serialized data string to use as the `data:` field.
            """
        ),
    ] = None,
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name (`event:` field).
            """
        ),
    ] = None,
    id: Annotated[
        str | None,
        Doc(
            """
            Optional event ID (`id:` field).
            """
        ),
    ] = None,
    retry: Annotated[
        int | None,
        Doc(
            """
            Optional reconnection time in milliseconds (`retry:` field).
            """
        ),
    ] = None,
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s) (`:` prefix).
            """
        ),
    ] = None,
) -> bytes:
    """Build SSE wire-format bytes from **pre-serialized** data.

    Fields are written in the order comment, `event`, `data`, `id`, `retry`,
    one per line, and only for arguments that are not `None`.

    `data_str` is split on CRLF, LF and CR only (the line terminators of the
    event-stream format) and every resulting line, including empty ones, gets
    its own `data:` prefix. The client therefore reconstructs exactly the
    string that was passed in, with line breaks normalized to LF. An empty
    string produces a single empty `data:` line.

    `event` and `id` are single-line fields: a CR or LF inside them would
    terminate the field early and let the remainder be parsed as separate
    fields, so such values are rejected.

    Returns:
        The UTF-8 encoded event. When at least one field is written, the
        result ends with a blank line (LF LF), which is the event terminator.
        When every argument is `None`, the result is a single LF.

    Raises:
        ValueError: If `event` or `id` contains a CR or LF character.
    """
    lines: list[str] = []

    if comment is not None:
        # Comments carry no payload, so `splitlines()` over-splitting on
        # exotic separators is harmless here: every piece is still a comment.
        lines.extend(f": {line}" for line in comment.splitlines())

    if event is not None:
        lines.append(f"event: {_reject_line_breaks('event', event)}")

    if data_str is not None:
        lines.extend(f"data: {line}" for line in _split_sse_lines(data_str))

    if id is not None:
        lines.append(f"id: {_reject_line_breaks('id', id)}")

    if retry is not None:
        lines.append(f"retry: {retry}")

    # Two empty entries make the join end the last field line and then add
    # the blank line that tells the client to dispatch the event.
    lines.extend(("", ""))
    return "\n".join(lines).encode("utf-8")
|
|
|
|
|
|
# Ready-to-send keep-alive frame, per the SSE spec recommendation: a single
# comment line (`: ping`) followed by the blank line that ends the frame.
# Already encoded as bytes so it can be written to the stream as-is.
KEEPALIVE_COMMENT = b": ping\n\n"

# Seconds between keep-alive pings when a generator is idle.
# Private but importable so tests can monkeypatch it.
_PING_INTERVAL: float = 15.0
|