mirror of
https://github.com/fastapi/fastapi.git
synced 2026-03-01 13:19:09 -05:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
12ea7be0be | ||
|
|
4cd76acafc | ||
|
|
2238155844 |
@@ -4,6 +4,12 @@ If you want to stream data that can be structured as JSON, you should [Stream JS
|
||||
|
||||
But if you want to **stream pure binary data** or strings, here's how you can do it.
|
||||
|
||||
/// info
|
||||
|
||||
Added in FastAPI 0.134.0.
|
||||
|
||||
///
|
||||
|
||||
## Use Cases { #use-cases }
|
||||
|
||||
You could use this if you want to stream pure strings, for example directly from the output of an **AI LLM** service.
|
||||
|
||||
@@ -7,6 +7,13 @@ hide:
|
||||
|
||||
## Latest Changes
|
||||
|
||||
## 0.135.0
|
||||
|
||||
### Features
|
||||
|
||||
* ✨ Add support for Server Sent Events. PR [#15030](https://github.com/fastapi/fastapi/pull/15030) by [@tiangolo](https://github.com/tiangolo).
|
||||
* New docs: [Server-Sent Events (SSE)](https://fastapi.tiangolo.com/tutorial/server-sent-events/).
|
||||
|
||||
## 0.134.0
|
||||
|
||||
### Features
|
||||
|
||||
120
docs/en/docs/tutorial/server-sent-events.md
Normal file
120
docs/en/docs/tutorial/server-sent-events.md
Normal file
@@ -0,0 +1,120 @@
|
||||
# Server-Sent Events (SSE) { #server-sent-events-sse }
|
||||
|
||||
You can stream data to the client using **Server-Sent Events** (SSE).
|
||||
|
||||
This is similar to [Stream JSON Lines](stream-json-lines.md){.internal-link target=_blank}, but uses the `text/event-stream` format, which is supported natively by browsers with the <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource" class="external-link" target="_blank">`EventSource` API</a>.
|
||||
|
||||
/// info
|
||||
|
||||
Added in FastAPI 0.135.0.
|
||||
|
||||
///
|
||||
|
||||
## What are Server-Sent Events? { #what-are-server-sent-events }
|
||||
|
||||
SSE is a standard for streaming data from the server to the client over HTTP.
|
||||
|
||||
Each event is a small text block with "fields" like `data`, `event`, `id`, and `retry`, separated by blank lines.
|
||||
|
||||
It looks like this:
|
||||
|
||||
```
|
||||
data: {"name": "Portal Gun", "price": 999.99}
|
||||
|
||||
data: {"name": "Plumbus", "price": 32.99}
|
||||
|
||||
```
|
||||
|
||||
SSE is commonly used for AI chat streaming, live notifications, logs and observability, and other cases where the server pushes updates to the client.
|
||||
|
||||
/// tip
|
||||
|
||||
If you want to stream binary data, for example video or audio, check the advanced guide: [Stream Data](../advanced/stream-data.md){.internal-link target=_blank}.
|
||||
|
||||
///
|
||||
|
||||
## Stream SSE with FastAPI { #stream-sse-with-fastapi }
|
||||
|
||||
To stream SSE with FastAPI, use `yield` in your *path operation function* and set `response_class=EventSourceResponse`.
|
||||
|
||||
Import `EventSourceResponse` from `fastapi.sse`:
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[4,22] *}
|
||||
|
||||
Each yielded item is encoded as JSON and sent in the `data:` field of an SSE event.
|
||||
|
||||
If you declare the return type as `AsyncIterable[Item]`, FastAPI will use it to **validate**, **document**, and **serialize** the data using Pydantic.
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[10:12,23] *}
|
||||
|
||||
/// tip
|
||||
|
||||
As Pydantic will serialize it in the **Rust** side, you will get much higher **performance** than if you don't declare a return type.
|
||||
|
||||
///
|
||||
|
||||
### Non-async *path operation functions* { #non-async-path-operation-functions }
|
||||
|
||||
You can also use regular `def` functions (without `async`), and use `yield` the same way.
|
||||
|
||||
FastAPI will make sure it's run correctly so that it doesn't block the event loop.
|
||||
|
||||
As in this case the function is not async, the right return type would be `Iterable[Item]`:
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[28:31] hl[29] *}
|
||||
|
||||
### No Return Type { #no-return-type }
|
||||
|
||||
You can also omit the return type. FastAPI will use the [`jsonable_encoder`](./encoder.md){.internal-link target=_blank} to convert the data and send it.
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *}
|
||||
|
||||
## `ServerSentEvent` { #serversentevent }
|
||||
|
||||
If you need to set SSE fields like `event`, `id`, `retry`, or `comment`, you can yield `ServerSentEvent` objects instead of plain data.
|
||||
|
||||
Import `ServerSentEvent` from `fastapi.sse`:
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
|
||||
|
||||
The `data` field is always encoded as JSON. You can pass any value that can be serialized as JSON, including Pydantic models.
|
||||
|
||||
## Raw Data { #raw-data }
|
||||
|
||||
If you need to send data **without** JSON encoding, use `raw_data` instead of `data`.
|
||||
|
||||
This is useful for sending pre-formatted text, log lines, or special <dfn title="A value used to indicate a special condition or state">"sentinel"</dfn> values like `[DONE]`.
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial003_py310.py hl[17] *}
|
||||
|
||||
/// note
|
||||
|
||||
`data` and `raw_data` are mutually exclusive. You can only set one of them on each `ServerSentEvent`.
|
||||
|
||||
///
|
||||
|
||||
## Resuming with `Last-Event-ID` { #resuming-with-last-event-id }
|
||||
|
||||
When a browser reconnects after a connection drop, it sends the last received `id` in the `Last-Event-ID` header.
|
||||
|
||||
You can read it as a header parameter and use it to resume the stream from where the client left off:
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial004_py310.py hl[25,27,31] *}
|
||||
|
||||
## SSE with POST { #sse-with-post }
|
||||
|
||||
SSE works with **any HTTP method**, not just `GET`.
|
||||
|
||||
This is useful for protocols like <a href="https://modelcontextprotocol.io" class="external-link" target="_blank">MCP</a> that stream SSE over `POST`:
|
||||
|
||||
{* ../../docs_src/server_sent_events/tutorial005_py310.py hl[14] *}
|
||||
|
||||
## Technical Details { #technical-details }
|
||||
|
||||
FastAPI implements some SSE best practices out of the box.
|
||||
|
||||
* Send a **"keep alive" `ping` comment** every 15 seconds when there hasn't been any message, to prevent some proxies from closing the connection, as suggested in the <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html#authoring-notes" class="external-link" target="_blank">HTML specification: Server-Sent Events</a>.
|
||||
* Set the `Cache-Control: no-cache` header to **prevent caching** of the stream.
|
||||
* Set a special header `X-Accel-Buffering: no` to **prevent buffering** in some proxies like Nginx.
|
||||
|
||||
You don't have to do anything about it, it works out of the box. 🤓
|
||||
@@ -2,6 +2,12 @@
|
||||
|
||||
You could have a sequence of data that you would like to send in a "**stream**", you could do it with **JSON Lines**.
|
||||
|
||||
/// info
|
||||
|
||||
Added in FastAPI 0.134.0.
|
||||
|
||||
///
|
||||
|
||||
## What is a Stream? { #what-is-a-stream }
|
||||
|
||||
"**Streaming**" data means that your app will start sending data items to the client without waiting for the entire sequence of items to be ready.
|
||||
@@ -100,6 +106,6 @@ You can also omit the return type. FastAPI will then use the [`jsonable_encoder`
|
||||
|
||||
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[33:36] hl[34] *}
|
||||
|
||||
## Server Sent Events (SSE) { #server-sent-events-sse }
|
||||
## Server-Sent Events (SSE) { #server-sent-events-sse }
|
||||
|
||||
A future version of FastAPI will also have first-class support for Server Sent Events (SSE), which are quite similar, but with a couple of extra details. 🤓
|
||||
FastAPI also has first-class support for Server-Sent Events (SSE), which are quite similar but with a couple of extra details. You can learn about them in the next chapter: [Server-Sent Events (SSE)](server-sent-events.md){.internal-link target=_blank}. 🤓
|
||||
|
||||
@@ -155,6 +155,7 @@ nav:
|
||||
- tutorial/sql-databases.md
|
||||
- tutorial/bigger-applications.md
|
||||
- tutorial/stream-json-lines.md
|
||||
- tutorial/server-sent-events.md
|
||||
- tutorial/background-tasks.md
|
||||
- tutorial/metadata.md
|
||||
- tutorial/static-files.md
|
||||
|
||||
0
docs_src/server_sent_events/__init__.py
Normal file
0
docs_src/server_sent_events/__init__.py
Normal file
43
docs_src/server_sent_events/tutorial001_py310.py
Normal file
43
docs_src/server_sent_events/tutorial001_py310.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from collections.abc import AsyncIterable, Iterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
description: str | None
|
||||
|
||||
|
||||
items = [
|
||||
Item(name="Plumbus", description="A multi-purpose household device."),
|
||||
Item(name="Portal Gun", description="A portal opening device."),
|
||||
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
|
||||
]
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def sse_items() -> AsyncIterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
|
||||
def sse_items_no_async() -> Iterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
|
||||
async def sse_items_no_annotation():
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
|
||||
def sse_items_no_async_no_annotation():
|
||||
for item in items:
|
||||
yield item
|
||||
26
docs_src/server_sent_events/tutorial002_py310.py
Normal file
26
docs_src/server_sent_events/tutorial002_py310.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
|
||||
|
||||
items = [
|
||||
Item(name="Plumbus", price=32.99),
|
||||
Item(name="Portal Gun", price=999.99),
|
||||
Item(name="Meeseeks Box", price=49.99),
|
||||
]
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def stream_items() -> AsyncIterable[ServerSentEvent]:
|
||||
yield ServerSentEvent(comment="stream of item updates")
|
||||
for i, item in enumerate(items):
|
||||
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
|
||||
17
docs_src/server_sent_events/tutorial003_py310.py
Normal file
17
docs_src/server_sent_events/tutorial003_py310.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/logs/stream", response_class=EventSourceResponse)
|
||||
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
|
||||
logs = [
|
||||
"2025-01-01 INFO Application started",
|
||||
"2025-01-01 DEBUG Connected to database",
|
||||
"2025-01-01 WARN High memory usage detected",
|
||||
]
|
||||
for log_line in logs:
|
||||
yield ServerSentEvent(raw_data=log_line)
|
||||
31
docs_src/server_sent_events/tutorial004_py310.py
Normal file
31
docs_src/server_sent_events/tutorial004_py310.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from collections.abc import AsyncIterable
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import FastAPI, Header
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
|
||||
|
||||
items = [
|
||||
Item(name="Plumbus", price=32.99),
|
||||
Item(name="Portal Gun", price=999.99),
|
||||
Item(name="Meeseeks Box", price=49.99),
|
||||
]
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def stream_items(
|
||||
last_event_id: Annotated[int | None, Header()] = None,
|
||||
) -> AsyncIterable[ServerSentEvent]:
|
||||
start = last_event_id + 1 if last_event_id is not None else 0
|
||||
for i, item in enumerate(items):
|
||||
if i < start:
|
||||
continue
|
||||
yield ServerSentEvent(data=item, id=str(i))
|
||||
19
docs_src/server_sent_events/tutorial005_py310.py
Normal file
19
docs_src/server_sent_events/tutorial005_py310.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Prompt(BaseModel):
|
||||
text: str
|
||||
|
||||
|
||||
@app.post("/chat/stream", response_class=EventSourceResponse)
|
||||
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
|
||||
words = prompt.text.split()
|
||||
for word in words:
|
||||
yield ServerSentEvent(data=word, event="token")
|
||||
yield ServerSentEvent(raw_data="[DONE]", event="done")
|
||||
@@ -521,6 +521,56 @@ async def stream_items() -> AsyncIterable[Item]:
|
||||
yield item
|
||||
```
|
||||
|
||||
## Server-Sent Events (SSE)
|
||||
|
||||
To stream Server-Sent Events, use `response_class=EventSourceResponse` and `yield` items from the endpoint.
|
||||
|
||||
Plain objects are automatically JSON-serialized as `data:` fields, declare the return type so the serialization is done by Pydantic:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def stream_items() -> AsyncIterable[Item]:
|
||||
yield Item(name="Plumbus", price=32.99)
|
||||
yield Item(name="Portal Gun", price=999.99)
|
||||
```
|
||||
|
||||
For full control over SSE fields (`event`, `id`, `retry`, `comment`), yield `ServerSentEvent` instances:
|
||||
|
||||
```python
|
||||
from collections.abc import AsyncIterable
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/events", response_class=EventSourceResponse)
|
||||
async def stream_events() -> AsyncIterable[ServerSentEvent]:
|
||||
yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
|
||||
yield ServerSentEvent(data={"progress": 50}, event="progress", id="2")
|
||||
```
|
||||
|
||||
Use `raw_data` instead of `data` to send pre-formatted strings without JSON encoding:
|
||||
|
||||
```python
|
||||
yield ServerSentEvent(raw_data="plain text line", event="log")
|
||||
```
|
||||
|
||||
## Stream bytes
|
||||
|
||||
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""FastAPI framework, high performance, easy to learn, fast to code, ready for production"""
|
||||
|
||||
__version__ = "0.134.0"
|
||||
__version__ = "0.135.0"
|
||||
|
||||
from starlette import status as status
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from fastapi.openapi.constants import METHODS_WITH_BODY, REF_PREFIX
|
||||
from fastapi.openapi.models import OpenAPI
|
||||
from fastapi.params import Body, ParamTypes
|
||||
from fastapi.responses import Response
|
||||
from fastapi.sse import _SSE_EVENT_SCHEMA
|
||||
from fastapi.types import ModelNameMap
|
||||
from fastapi.utils import (
|
||||
deep_dict_update,
|
||||
@@ -372,6 +373,26 @@ def get_openapi_path(
|
||||
operation.setdefault("responses", {}).setdefault(
|
||||
status_code, {}
|
||||
).setdefault("content", {})["application/jsonl"] = jsonl_content
|
||||
elif route.is_sse_stream:
|
||||
sse_content: dict[str, Any] = {}
|
||||
item_schema = copy.deepcopy(_SSE_EVENT_SCHEMA)
|
||||
if route.stream_item_field:
|
||||
content_schema = get_schema_from_model_field(
|
||||
field=route.stream_item_field,
|
||||
model_name_map=model_name_map,
|
||||
field_mapping=field_mapping,
|
||||
separate_input_output_schemas=separate_input_output_schemas,
|
||||
)
|
||||
item_schema["required"] = ["data"]
|
||||
item_schema["properties"]["data"] = {
|
||||
"type": "string",
|
||||
"contentMediaType": "application/json",
|
||||
"contentSchema": content_schema,
|
||||
}
|
||||
sse_content["itemSchema"] = item_schema
|
||||
operation.setdefault("responses", {}).setdefault(
|
||||
status_code, {}
|
||||
).setdefault("content", {})["text/event-stream"] = sse_content
|
||||
elif route_response_media_type:
|
||||
response_schema = {"type": "string"}
|
||||
if lenient_issubclass(current_response_class, JSONResponse):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import Any
|
||||
|
||||
from fastapi.exceptions import FastAPIDeprecationWarning
|
||||
from fastapi.sse import EventSourceResponse as EventSourceResponse # noqa
|
||||
from starlette.responses import FileResponse as FileResponse # noqa
|
||||
from starlette.responses import HTMLResponse as HTMLResponse # noqa
|
||||
from starlette.responses import JSONResponse as JSONResponse # noqa
|
||||
|
||||
@@ -56,6 +56,13 @@ from fastapi.exceptions import (
|
||||
ResponseValidationError,
|
||||
WebSocketRequestValidationError,
|
||||
)
|
||||
from fastapi.sse import (
|
||||
_PING_INTERVAL,
|
||||
KEEPALIVE_COMMENT,
|
||||
EventSourceResponse,
|
||||
ServerSentEvent,
|
||||
format_sse_event,
|
||||
)
|
||||
from fastapi.types import DecoratedCallable, IncEx
|
||||
from fastapi.utils import (
|
||||
create_model_field,
|
||||
@@ -66,7 +73,7 @@ from fastapi.utils import (
|
||||
from starlette import routing
|
||||
from starlette._exception_handler import wrap_app_handling_exceptions
|
||||
from starlette._utils import is_async_callable
|
||||
from starlette.concurrency import run_in_threadpool
|
||||
from starlette.concurrency import iterate_in_threadpool, run_in_threadpool
|
||||
from starlette.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse, Response, StreamingResponse
|
||||
@@ -361,6 +368,7 @@ def get_request_handler(
|
||||
actual_response_class: type[Response] = response_class.value
|
||||
else:
|
||||
actual_response_class = response_class
|
||||
is_sse_stream = lenient_issubclass(actual_response_class, EventSourceResponse)
|
||||
if isinstance(strict_content_type, DefaultPlaceholder):
|
||||
actual_strict_content_type: bool = strict_content_type.value
|
||||
else:
|
||||
@@ -452,35 +460,125 @@ def get_request_handler(
|
||||
errors = solved_result.errors
|
||||
assert dependant.call # For types
|
||||
if not errors:
|
||||
if is_json_stream:
|
||||
# Shared serializer for stream items (JSONL and SSE).
|
||||
# Validates against stream_item_field when set, then
|
||||
# serializes to JSON bytes.
|
||||
def _serialize_data(data: Any) -> bytes:
|
||||
if stream_item_field:
|
||||
value, errors_ = stream_item_field.validate(
|
||||
data, {}, loc=("response",)
|
||||
)
|
||||
if errors_:
|
||||
ctx = endpoint_ctx or EndpointContext()
|
||||
raise ResponseValidationError(
|
||||
errors=errors_,
|
||||
body=data,
|
||||
endpoint_ctx=ctx,
|
||||
)
|
||||
return stream_item_field.serialize_json(
|
||||
value,
|
||||
include=response_model_include,
|
||||
exclude=response_model_exclude,
|
||||
by_alias=response_model_by_alias,
|
||||
exclude_unset=response_model_exclude_unset,
|
||||
exclude_defaults=response_model_exclude_defaults,
|
||||
exclude_none=response_model_exclude_none,
|
||||
)
|
||||
else:
|
||||
data = jsonable_encoder(data)
|
||||
return json.dumps(data).encode("utf-8")
|
||||
|
||||
if is_sse_stream:
|
||||
# Generator endpoint: stream as Server-Sent Events
|
||||
gen = dependant.call(**solved_result.values)
|
||||
|
||||
def _serialize_sse_item(item: Any) -> bytes:
|
||||
if isinstance(item, ServerSentEvent):
|
||||
# User controls the event structure.
|
||||
# Serialize the data payload if present.
|
||||
# For ServerSentEvent items we skip stream_item_field
|
||||
# validation (the user may mix types intentionally).
|
||||
if item.raw_data is not None:
|
||||
data_str: str | None = item.raw_data
|
||||
elif item.data is not None:
|
||||
if hasattr(item.data, "model_dump_json"):
|
||||
data_str = item.data.model_dump_json()
|
||||
else:
|
||||
data_str = json.dumps(jsonable_encoder(item.data))
|
||||
else:
|
||||
data_str = None
|
||||
return format_sse_event(
|
||||
data_str=data_str,
|
||||
event=item.event,
|
||||
id=item.id,
|
||||
retry=item.retry,
|
||||
comment=item.comment,
|
||||
)
|
||||
else:
|
||||
# Plain object: validate + serialize via
|
||||
# stream_item_field (if set) and wrap in data field
|
||||
return format_sse_event(
|
||||
data_str=_serialize_data(item).decode("utf-8")
|
||||
)
|
||||
|
||||
if dependant.is_async_gen_callable:
|
||||
sse_aiter: AsyncIterator[Any] = gen.__aiter__()
|
||||
else:
|
||||
sse_aiter = iterate_in_threadpool(gen)
|
||||
|
||||
async def _async_stream_sse() -> AsyncIterator[bytes]:
|
||||
# Use a memory stream to decouple generator iteration
|
||||
# from the keepalive timer. A producer task pulls items
|
||||
# from the generator independently, so
|
||||
# `anyio.fail_after` never wraps the generator's
|
||||
# `__anext__` directly - avoiding CancelledError that
|
||||
# would finalize the generator and also working for sync
|
||||
# generators running in a thread pool.
|
||||
send_stream, receive_stream = anyio.create_memory_object_stream[
|
||||
bytes
|
||||
](max_buffer_size=1)
|
||||
|
||||
async def _producer() -> None:
|
||||
async with send_stream:
|
||||
async for raw_item in sse_aiter:
|
||||
await send_stream.send(_serialize_sse_item(raw_item))
|
||||
|
||||
async with anyio.create_task_group() as tg:
|
||||
tg.start_soon(_producer)
|
||||
async with receive_stream:
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
with anyio.fail_after(_PING_INTERVAL):
|
||||
data = await receive_stream.receive()
|
||||
yield data
|
||||
# To allow for cancellation to trigger
|
||||
# Ref: https://github.com/fastapi/fastapi/issues/14680
|
||||
await anyio.sleep(0)
|
||||
except TimeoutError:
|
||||
yield KEEPALIVE_COMMENT
|
||||
except anyio.EndOfStream:
|
||||
pass
|
||||
|
||||
sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
|
||||
_async_stream_sse()
|
||||
)
|
||||
|
||||
response = StreamingResponse(
|
||||
sse_stream_content,
|
||||
media_type="text/event-stream",
|
||||
background=solved_result.background_tasks,
|
||||
)
|
||||
response.headers["Cache-Control"] = "no-cache"
|
||||
# For Nginx proxies to not buffer server sent events
|
||||
response.headers["X-Accel-Buffering"] = "no"
|
||||
response.headers.raw.extend(solved_result.response.headers.raw)
|
||||
elif is_json_stream:
|
||||
# Generator endpoint: stream as JSONL
|
||||
gen = dependant.call(**solved_result.values)
|
||||
|
||||
def _serialize_item(item: Any) -> bytes:
|
||||
if stream_item_field:
|
||||
value, errors = stream_item_field.validate(
|
||||
item, {}, loc=("response",)
|
||||
)
|
||||
if errors:
|
||||
ctx = endpoint_ctx or EndpointContext()
|
||||
raise ResponseValidationError(
|
||||
errors=errors,
|
||||
body=item,
|
||||
endpoint_ctx=ctx,
|
||||
)
|
||||
line = stream_item_field.serialize_json(
|
||||
value,
|
||||
include=response_model_include,
|
||||
exclude=response_model_exclude,
|
||||
by_alias=response_model_by_alias,
|
||||
exclude_unset=response_model_exclude_unset,
|
||||
exclude_defaults=response_model_exclude_defaults,
|
||||
exclude_none=response_model_exclude_none,
|
||||
)
|
||||
return line + b"\n"
|
||||
else:
|
||||
data = jsonable_encoder(item)
|
||||
return json.dumps(data).encode("utf-8") + b"\n"
|
||||
return _serialize_data(item) + b"\n"
|
||||
|
||||
if dependant.is_async_gen_callable:
|
||||
|
||||
@@ -491,7 +589,7 @@ def get_request_handler(
|
||||
# Ref: https://github.com/fastapi/fastapi/issues/14680
|
||||
await anyio.sleep(0)
|
||||
|
||||
stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
|
||||
jsonl_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
|
||||
_async_stream_jsonl()
|
||||
)
|
||||
else:
|
||||
@@ -500,10 +598,10 @@ def get_request_handler(
|
||||
for item in gen:
|
||||
yield _serialize_item(item)
|
||||
|
||||
stream_content = _sync_stream_jsonl()
|
||||
jsonl_stream_content = _sync_stream_jsonl()
|
||||
|
||||
response = StreamingResponse(
|
||||
stream_content,
|
||||
jsonl_stream_content,
|
||||
media_type="application/jsonl",
|
||||
background=solved_result.background_tasks,
|
||||
)
|
||||
@@ -709,9 +807,16 @@ class APIRoute(routing.Route):
|
||||
else:
|
||||
stream_item = get_stream_item_type(return_annotation)
|
||||
if stream_item is not None:
|
||||
# Only extract item type for JSONL streaming when no
|
||||
# explicit response_class (e.g. StreamingResponse) was set
|
||||
if isinstance(response_class, DefaultPlaceholder):
|
||||
# Extract item type for JSONL or SSE streaming when
|
||||
# response_class is DefaultPlaceholder (JSONL) or
|
||||
# EventSourceResponse (SSE).
|
||||
# ServerSentEvent is excluded: it's a transport
|
||||
# wrapper, not a data model, so it shouldn't feed
|
||||
# into validation or OpenAPI schema generation.
|
||||
if (
|
||||
isinstance(response_class, DefaultPlaceholder)
|
||||
or lenient_issubclass(response_class, EventSourceResponse)
|
||||
) and not lenient_issubclass(stream_item, ServerSentEvent):
|
||||
self.stream_item_type = stream_item
|
||||
response_model = None
|
||||
else:
|
||||
@@ -814,11 +919,16 @@ class APIRoute(routing.Route):
|
||||
name=self.unique_id,
|
||||
embed_body_fields=self._embed_body_fields,
|
||||
)
|
||||
# Detect generator endpoints that should stream as JSONL
|
||||
# (only when no explicit response_class like StreamingResponse is set)
|
||||
self.is_json_stream = isinstance(response_class, DefaultPlaceholder) and (
|
||||
# Detect generator endpoints that should stream as JSONL or SSE
|
||||
is_generator = (
|
||||
self.dependant.is_async_gen_callable or self.dependant.is_gen_callable
|
||||
)
|
||||
self.is_sse_stream = is_generator and lenient_issubclass(
|
||||
response_class, EventSourceResponse
|
||||
)
|
||||
self.is_json_stream = is_generator and isinstance(
|
||||
response_class, DefaultPlaceholder
|
||||
)
|
||||
self.app = request_response(self.get_route_handler())
|
||||
|
||||
def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
|
||||
|
||||
222
fastapi/sse.py
Normal file
222
fastapi/sse.py
Normal file
@@ -0,0 +1,222 @@
|
||||
from typing import Annotated, Any
|
||||
|
||||
from annotated_doc import Doc
|
||||
from pydantic import AfterValidator, BaseModel, Field, model_validator
|
||||
from starlette.responses import StreamingResponse
|
||||
|
||||
# Canonical SSE event schema matching the OpenAPI 3.2 spec
|
||||
# (Section 4.14.4 "Special Considerations for Server-Sent Events")
|
||||
_SSE_EVENT_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {"type": "integer", "minimum": 0},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class EventSourceResponse(StreamingResponse):
|
||||
"""Streaming response with `text/event-stream` media type.
|
||||
|
||||
Use as `response_class=EventSourceResponse` on a *path operation* that uses `yield`
|
||||
to enable Server Sent Events (SSE) responses.
|
||||
|
||||
Works with **any HTTP method** (`GET`, `POST`, etc.), which makes it compatible
|
||||
with protocols like MCP that stream SSE over `POST`.
|
||||
|
||||
The actual encoding logic lives in the FastAPI routing layer. This class
|
||||
serves mainly as a marker and sets the correct `Content-Type`.
|
||||
"""
|
||||
|
||||
media_type = "text/event-stream"
|
||||
|
||||
|
||||
def _check_id_no_null(v: str | None) -> str | None:
|
||||
if v is not None and "\0" in v:
|
||||
raise ValueError("SSE 'id' must not contain null characters")
|
||||
return v
|
||||
|
||||
|
||||
class ServerSentEvent(BaseModel):
|
||||
"""Represents a single Server-Sent Event.
|
||||
|
||||
When `yield`ed from a *path operation function* that uses
|
||||
`response_class=EventSourceResponse`, each `ServerSentEvent` is encoded
|
||||
into the [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
|
||||
(`text/event-stream`).
|
||||
|
||||
If you yield a plain object (dict, Pydantic model, etc.) instead, it is
|
||||
automatically JSON-encoded and sent as the `data:` field.
|
||||
|
||||
All `data` values **including plain strings** are JSON-serialized.
|
||||
|
||||
For example, `data="hello"` produces `data: "hello"` on the wire (with
|
||||
quotes).
|
||||
"""
|
||||
|
||||
data: Annotated[
|
||||
Any,
|
||||
Doc(
|
||||
"""
|
||||
The event payload.
|
||||
|
||||
Can be any JSON-serializable value: a Pydantic model, dict, list,
|
||||
string, number, etc. It is **always** serialized to JSON: strings
|
||||
are quoted (`"hello"` becomes `data: "hello"` on the wire).
|
||||
|
||||
Mutually exclusive with `raw_data`.
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
raw_data: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Raw string to send as the `data:` field **without** JSON encoding.
|
||||
|
||||
Use this when you need to send pre-formatted text, HTML fragments,
|
||||
CSV lines, or any non-JSON payload. The string is placed directly
|
||||
into the `data:` field as-is.
|
||||
|
||||
Mutually exclusive with `data`.
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
event: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional event type name.
|
||||
|
||||
Maps to `addEventListener(event, ...)` on the browser. When omitted,
|
||||
the browser dispatches on the generic `message` event.
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
id: Annotated[
|
||||
str | None,
|
||||
AfterValidator(_check_id_no_null),
|
||||
Doc(
|
||||
"""
|
||||
Optional event ID.
|
||||
|
||||
The browser sends this value back as the `Last-Event-ID` header on
|
||||
automatic reconnection. **Must not contain null (`\\0`) characters.**
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
retry: Annotated[
|
||||
int | None,
|
||||
Field(ge=0),
|
||||
Doc(
|
||||
"""
|
||||
Optional reconnection time in **milliseconds**.
|
||||
|
||||
Tells the browser how long to wait before reconnecting after the
|
||||
connection is lost. Must be a non-negative integer.
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
comment: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional comment line(s).
|
||||
|
||||
Comment lines start with `:` in the SSE wire format and are ignored by
|
||||
`EventSource` clients. Useful for keep-alive pings to prevent
|
||||
proxy/load-balancer timeouts.
|
||||
"""
|
||||
),
|
||||
] = None
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _check_data_exclusive(self) -> "ServerSentEvent":
|
||||
if self.data is not None and self.raw_data is not None:
|
||||
raise ValueError(
|
||||
"Cannot set both 'data' and 'raw_data' on the same "
|
||||
"ServerSentEvent. Use 'data' for JSON-serialized payloads "
|
||||
"or 'raw_data' for pre-formatted strings."
|
||||
)
|
||||
return self
|
||||
|
||||
|
||||
def format_sse_event(
|
||||
*,
|
||||
data_str: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Pre-serialized data string to use as the `data:` field.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
event: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional event type name (`event:` field).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
id: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional event ID (`id:` field).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
retry: Annotated[
|
||||
int | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional reconnection time in milliseconds (`retry:` field).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
comment: Annotated[
|
||||
str | None,
|
||||
Doc(
|
||||
"""
|
||||
Optional comment line(s) (`:` prefix).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
) -> bytes:
|
||||
"""Build SSE wire-format bytes from **pre-serialized** data.
|
||||
|
||||
The result always ends with `\n\n` (the event terminator).
|
||||
"""
|
||||
lines: list[str] = []
|
||||
|
||||
if comment is not None:
|
||||
for line in comment.splitlines():
|
||||
lines.append(f": {line}")
|
||||
|
||||
if event is not None:
|
||||
lines.append(f"event: {event}")
|
||||
|
||||
if data_str is not None:
|
||||
for line in data_str.splitlines():
|
||||
lines.append(f"data: {line}")
|
||||
|
||||
if id is not None:
|
||||
lines.append(f"id: {id}")
|
||||
|
||||
if retry is not None:
|
||||
lines.append(f"retry: {retry}")
|
||||
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
return "\n".join(lines).encode("utf-8")
|
||||
|
||||
|
||||
# Keep-alive comment, per the SSE spec recommendation
|
||||
KEEPALIVE_COMMENT = b": ping\n\n"
|
||||
|
||||
# Seconds between keep-alive pings when a generator is idle.
|
||||
# Private but importable so tests can monkeypatch it.
|
||||
_PING_INTERVAL: float = 15.0
|
||||
@@ -324,6 +324,7 @@ ignore = [
|
||||
"docs_src/stream_json_lines/tutorial001_py310.py" = ["UP028"]
|
||||
"docs_src/stream_data/tutorial001_py310.py" = ["UP028"]
|
||||
"docs_src/stream_data/tutorial002_py310.py" = ["UP028"]
|
||||
"docs_src/server_sent_events/tutorial001_py310.py" = ["UP028"]
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
known-third-party = ["fastapi", "pydantic", "starlette"]
|
||||
|
||||
318
tests/test_sse.py
Normal file
318
tests/test_sse.py
Normal file
@@ -0,0 +1,318 @@
|
||||
import asyncio
|
||||
import time
|
||||
from collections.abc import AsyncIterable, Iterable
|
||||
|
||||
import fastapi.routing
|
||||
import pytest
|
||||
from fastapi import APIRouter, FastAPI
|
||||
from fastapi.responses import EventSourceResponse
|
||||
from fastapi.sse import ServerSentEvent
|
||||
from fastapi.testclient import TestClient
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
description: str | None = None
|
||||
|
||||
|
||||
items = [
|
||||
Item(name="Plumbus", description="A multi-purpose household device."),
|
||||
Item(name="Portal Gun", description="A portal opening device."),
|
||||
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
|
||||
]
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/items/stream", response_class=EventSourceResponse)
|
||||
async def sse_items() -> AsyncIterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-sync", response_class=EventSourceResponse)
|
||||
def sse_items_sync() -> Iterable[Item]:
|
||||
yield from items
|
||||
|
||||
|
||||
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
|
||||
async def sse_items_no_annotation():
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-sync-no-annotation", response_class=EventSourceResponse)
|
||||
def sse_items_sync_no_annotation():
|
||||
yield from items
|
||||
|
||||
|
||||
@app.get("/items/stream-dict", response_class=EventSourceResponse)
|
||||
async def sse_items_dict():
|
||||
for item in items:
|
||||
yield {"name": item.name, "description": item.description}
|
||||
|
||||
|
||||
@app.get("/items/stream-sse-event", response_class=EventSourceResponse)
|
||||
async def sse_items_event():
|
||||
yield ServerSentEvent(data="hello", event="greeting", id="1")
|
||||
yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2")
|
||||
yield ServerSentEvent(comment="just a comment")
|
||||
yield ServerSentEvent(data="retry-test", retry=5000)
|
||||
|
||||
|
||||
@app.get("/items/stream-mixed", response_class=EventSourceResponse)
|
||||
async def sse_items_mixed() -> AsyncIterable[Item]:
|
||||
yield items[0]
|
||||
yield ServerSentEvent(data="custom-event", event="special")
|
||||
yield items[1]
|
||||
|
||||
|
||||
@app.get("/items/stream-string", response_class=EventSourceResponse)
|
||||
async def sse_items_string():
|
||||
yield ServerSentEvent(data="plain text data")
|
||||
|
||||
|
||||
@app.post("/items/stream-post", response_class=EventSourceResponse)
|
||||
async def sse_items_post() -> AsyncIterable[Item]:
|
||||
for item in items:
|
||||
yield item
|
||||
|
||||
|
||||
@app.get("/items/stream-raw", response_class=EventSourceResponse)
|
||||
async def sse_items_raw():
|
||||
yield ServerSentEvent(raw_data="plain text without quotes")
|
||||
yield ServerSentEvent(raw_data="<div>html fragment</div>", event="html")
|
||||
yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv")
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/events", response_class=EventSourceResponse)
|
||||
async def stream_events():
|
||||
yield {"msg": "hello"}
|
||||
yield {"msg": "world"}
|
||||
|
||||
|
||||
app.include_router(router, prefix="/api")
|
||||
|
||||
|
||||
@pytest.fixture(name="client")
|
||||
def client_fixture():
|
||||
with TestClient(app) as c:
|
||||
yield c
|
||||
|
||||
|
||||
def test_async_generator_with_model(client: TestClient):
|
||||
response = client.get("/items/stream")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
assert response.headers["cache-control"] == "no-cache"
|
||||
assert response.headers["x-accel-buffering"] == "no"
|
||||
|
||||
lines = response.text.strip().split("\n")
|
||||
data_lines = [line for line in lines if line.startswith("data: ")]
|
||||
assert len(data_lines) == 3
|
||||
assert '"name":"Plumbus"' in data_lines[0] or '"name": "Plumbus"' in data_lines[0]
|
||||
assert (
|
||||
'"name":"Portal Gun"' in data_lines[1]
|
||||
or '"name": "Portal Gun"' in data_lines[1]
|
||||
)
|
||||
assert (
|
||||
'"name":"Meeseeks Box"' in data_lines[2]
|
||||
or '"name": "Meeseeks Box"' in data_lines[2]
|
||||
)
|
||||
|
||||
|
||||
def test_sync_generator_with_model(client: TestClient):
|
||||
response = client.get("/items/stream-sync")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
|
||||
def test_async_generator_no_annotation(client: TestClient):
|
||||
response = client.get("/items/stream-no-annotation")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
|
||||
def test_sync_generator_no_annotation(client: TestClient):
|
||||
response = client.get("/items/stream-sync-no-annotation")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
|
||||
def test_dict_items(client: TestClient):
|
||||
response = client.get("/items/stream-dict")
|
||||
assert response.status_code == 200
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
assert '"name"' in data_lines[0]
|
||||
|
||||
|
||||
def test_post_method_sse(client: TestClient):
|
||||
"""SSE should work with POST (needed for MCP compatibility)."""
|
||||
response = client.post("/items/stream-post")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
|
||||
def test_sse_events_with_fields(client: TestClient):
|
||||
response = client.get("/items/stream-sse-event")
|
||||
assert response.status_code == 200
|
||||
text = response.text
|
||||
|
||||
assert "event: greeting\n" in text
|
||||
assert 'data: "hello"\n' in text
|
||||
assert "id: 1\n" in text
|
||||
|
||||
assert "event: json-data\n" in text
|
||||
assert "id: 2\n" in text
|
||||
assert 'data: {"key": "value"}\n' in text
|
||||
|
||||
assert ": just a comment\n" in text
|
||||
|
||||
assert "retry: 5000\n" in text
|
||||
assert 'data: "retry-test"\n' in text
|
||||
|
||||
|
||||
def test_mixed_plain_and_sse_events(client: TestClient):
|
||||
response = client.get("/items/stream-mixed")
|
||||
assert response.status_code == 200
|
||||
text = response.text
|
||||
|
||||
assert "event: special\n" in text
|
||||
assert 'data: "custom-event"\n' in text
|
||||
assert '"name"' in text
|
||||
|
||||
|
||||
def test_string_data_json_encoded(client: TestClient):
|
||||
"""Strings are always JSON-encoded (quoted)."""
|
||||
response = client.get("/items/stream-string")
|
||||
assert response.status_code == 200
|
||||
assert 'data: "plain text data"\n' in response.text
|
||||
|
||||
|
||||
def test_server_sent_event_null_id_rejected():
|
||||
with pytest.raises(ValueError, match="null"):
|
||||
ServerSentEvent(data="test", id="has\0null")
|
||||
|
||||
|
||||
def test_server_sent_event_negative_retry_rejected():
|
||||
with pytest.raises(ValueError):
|
||||
ServerSentEvent(data="test", retry=-1)
|
||||
|
||||
|
||||
def test_server_sent_event_float_retry_rejected():
|
||||
with pytest.raises(ValueError):
|
||||
ServerSentEvent(data="test", retry=1.5) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_raw_data_sent_without_json_encoding(client: TestClient):
|
||||
"""raw_data is sent as-is, not JSON-encoded."""
|
||||
response = client.get("/items/stream-raw")
|
||||
assert response.status_code == 200
|
||||
text = response.text
|
||||
|
||||
# raw_data should appear without JSON quotes
|
||||
assert "data: plain text without quotes\n" in text
|
||||
# Not JSON-quoted
|
||||
assert 'data: "plain text without quotes"' not in text
|
||||
|
||||
assert "event: html\n" in text
|
||||
assert "data: <div>html fragment</div>\n" in text
|
||||
|
||||
assert "event: csv\n" in text
|
||||
assert "data: cpu,87.3,1709145600\n" in text
|
||||
|
||||
|
||||
def test_data_and_raw_data_mutually_exclusive():
|
||||
"""Cannot set both data and raw_data."""
|
||||
with pytest.raises(ValueError, match="Cannot set both"):
|
||||
ServerSentEvent(data="json", raw_data="raw")
|
||||
|
||||
|
||||
def test_sse_on_router_included_in_app(client: TestClient):
|
||||
response = client.get("/api/events")
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 2
|
||||
|
||||
|
||||
# Keepalive ping tests
|
||||
|
||||
|
||||
keepalive_app = FastAPI()
|
||||
|
||||
|
||||
@keepalive_app.get("/slow-async", response_class=EventSourceResponse)
|
||||
async def slow_async_stream():
|
||||
yield {"n": 1}
|
||||
# Sleep longer than the (monkeypatched) ping interval so a keepalive
|
||||
# comment is emitted before the next item.
|
||||
await asyncio.sleep(0.3)
|
||||
yield {"n": 2}
|
||||
|
||||
|
||||
@keepalive_app.get("/slow-sync", response_class=EventSourceResponse)
|
||||
def slow_sync_stream():
|
||||
yield {"n": 1}
|
||||
time.sleep(0.3)
|
||||
yield {"n": 2}
|
||||
|
||||
|
||||
def test_keepalive_ping_async(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05)
|
||||
with TestClient(keepalive_app) as c:
|
||||
response = c.get("/slow-async")
|
||||
assert response.status_code == 200
|
||||
text = response.text
|
||||
# The keepalive comment ": ping" should appear between the two data events
|
||||
assert ": ping\n" in text
|
||||
data_lines = [line for line in text.split("\n") if line.startswith("data: ")]
|
||||
assert len(data_lines) == 2
|
||||
|
||||
|
||||
def test_keepalive_ping_sync(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05)
|
||||
with TestClient(keepalive_app) as c:
|
||||
response = c.get("/slow-sync")
|
||||
assert response.status_code == 200
|
||||
text = response.text
|
||||
assert ": ping\n" in text
|
||||
data_lines = [line for line in text.split("\n") if line.startswith("data: ")]
|
||||
assert len(data_lines) == 2
|
||||
|
||||
|
||||
def test_no_keepalive_when_fast(client: TestClient):
|
||||
"""No keepalive comment when items arrive quickly."""
|
||||
response = client.get("/items/stream")
|
||||
assert response.status_code == 200
|
||||
# KEEPALIVE_COMMENT is ": ping\n\n".
|
||||
assert ": ping\n" not in response.text
|
||||
191
tests/test_tutorial/test_server_sent_events/test_tutorial001.py
Normal file
191
tests/test_tutorial/test_server_sent_events/test_tutorial001.py
Normal file
@@ -0,0 +1,191 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from inline_snapshot import snapshot
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="client",
|
||||
params=[
|
||||
pytest.param("tutorial001_py310"),
|
||||
],
|
||||
)
|
||||
def get_client(request: pytest.FixtureRequest):
|
||||
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
|
||||
|
||||
client = TestClient(mod.app)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path",
|
||||
[
|
||||
"/items/stream",
|
||||
"/items/stream-no-async",
|
||||
"/items/stream-no-annotation",
|
||||
"/items/stream-no-async-no-annotation",
|
||||
],
|
||||
)
|
||||
def test_stream_items(client: TestClient, path: str):
|
||||
response = client.get(path)
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
|
||||
def test_openapi_schema(client: TestClient):
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == snapshot(
|
||||
{
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/stream": {
|
||||
"get": {
|
||||
"summary": "Sse Items",
|
||||
"operationId": "sse_items_items_stream_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": "string",
|
||||
"contentMediaType": "application/json",
|
||||
"contentSchema": {
|
||||
"$ref": "#/components/schemas/Item"
|
||||
},
|
||||
},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
"required": ["data"],
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"/items/stream-no-async": {
|
||||
"get": {
|
||||
"summary": "Sse Items No Async",
|
||||
"operationId": "sse_items_no_async_items_stream_no_async_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": "string",
|
||||
"contentMediaType": "application/json",
|
||||
"contentSchema": {
|
||||
"$ref": "#/components/schemas/Item"
|
||||
},
|
||||
},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
"required": ["data"],
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"/items/stream-no-annotation": {
|
||||
"get": {
|
||||
"summary": "Sse Items No Annotation",
|
||||
"operationId": "sse_items_no_annotation_items_stream_no_annotation_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"/items/stream-no-async-no-annotation": {
|
||||
"get": {
|
||||
"summary": "Sse Items No Async No Annotation",
|
||||
"operationId": "sse_items_no_async_no_annotation_items_stream_no_async_no_annotation_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
"components": {
|
||||
"schemas": {
|
||||
"Item": {
|
||||
"properties": {
|
||||
"name": {"type": "string", "title": "Name"},
|
||||
"description": {
|
||||
"anyOf": [
|
||||
{"type": "string"},
|
||||
{"type": "null"},
|
||||
],
|
||||
"title": "Description",
|
||||
},
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["name", "description"],
|
||||
"title": "Item",
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -0,0 +1,83 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from inline_snapshot import snapshot
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="client",
|
||||
params=[
|
||||
pytest.param("tutorial002_py310"),
|
||||
],
|
||||
)
|
||||
def get_client(request: pytest.FixtureRequest):
|
||||
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
|
||||
client = TestClient(mod.app)
|
||||
return client
|
||||
|
||||
|
||||
def test_stream_items(client: TestClient):
|
||||
response = client.get("/items/stream")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
lines = response.text.strip().split("\n")
|
||||
|
||||
# First event is a comment-only event
|
||||
assert lines[0] == ": stream of item updates"
|
||||
|
||||
# Remaining lines contain event:, data:, id:, retry: fields
|
||||
event_lines = [line for line in lines if line.startswith("event: ")]
|
||||
assert len(event_lines) == 3
|
||||
assert all(line == "event: item_update" for line in event_lines)
|
||||
|
||||
data_lines = [line for line in lines if line.startswith("data: ")]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
id_lines = [line for line in lines if line.startswith("id: ")]
|
||||
assert id_lines == ["id: 1", "id: 2", "id: 3"]
|
||||
|
||||
retry_lines = [line for line in lines if line.startswith("retry: ")]
|
||||
assert len(retry_lines) == 3
|
||||
assert all(line == "retry: 5000" for line in retry_lines)
|
||||
|
||||
|
||||
def test_openapi_schema(client: TestClient):
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == snapshot(
|
||||
{
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/stream": {
|
||||
"get": {
|
||||
"summary": "Stream Items",
|
||||
"operationId": "stream_items_items_stream_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -0,0 +1,73 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from inline_snapshot import snapshot
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="client",
|
||||
params=[
|
||||
pytest.param("tutorial003_py310"),
|
||||
],
|
||||
)
|
||||
def get_client(request: pytest.FixtureRequest):
|
||||
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
|
||||
client = TestClient(mod.app)
|
||||
return client
|
||||
|
||||
|
||||
def test_stream_logs(client: TestClient):
|
||||
response = client.get("/logs/stream")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
# raw_data is sent without JSON encoding (no quotes around the string)
|
||||
assert data_lines[0] == "data: 2025-01-01 INFO Application started"
|
||||
assert data_lines[1] == "data: 2025-01-01 DEBUG Connected to database"
|
||||
assert data_lines[2] == "data: 2025-01-01 WARN High memory usage detected"
|
||||
|
||||
|
||||
def test_openapi_schema(client: TestClient):
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == snapshot(
|
||||
{
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/logs/stream": {
|
||||
"get": {
|
||||
"summary": "Stream Logs",
|
||||
"operationId": "stream_logs_logs_stream_get",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
164
tests/test_tutorial/test_server_sent_events/test_tutorial004.py
Normal file
164
tests/test_tutorial/test_server_sent_events/test_tutorial004.py
Normal file
@@ -0,0 +1,164 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from inline_snapshot import snapshot
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="client",
|
||||
params=[
|
||||
pytest.param("tutorial004_py310"),
|
||||
],
|
||||
)
|
||||
def get_client(request: pytest.FixtureRequest):
|
||||
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
|
||||
client = TestClient(mod.app)
|
||||
return client
|
||||
|
||||
|
||||
def test_stream_all_items(client: TestClient):
|
||||
response = client.get("/items/stream")
|
||||
assert response.status_code == 200, response.text
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 3
|
||||
|
||||
id_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("id: ")
|
||||
]
|
||||
assert id_lines == ["id: 0", "id: 1", "id: 2"]
|
||||
|
||||
|
||||
def test_resume_from_last_event_id(client: TestClient):
|
||||
response = client.get(
|
||||
"/items/stream",
|
||||
headers={"last-event-id": "0"},
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 2
|
||||
|
||||
id_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("id: ")
|
||||
]
|
||||
assert id_lines == ["id: 1", "id: 2"]
|
||||
|
||||
|
||||
def test_resume_from_last_item(client: TestClient):
|
||||
response = client.get(
|
||||
"/items/stream",
|
||||
headers={"last-event-id": "1"},
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
|
||||
data_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("data: ")
|
||||
]
|
||||
assert len(data_lines) == 1
|
||||
|
||||
id_lines = [
|
||||
line for line in response.text.strip().split("\n") if line.startswith("id: ")
|
||||
]
|
||||
assert id_lines == ["id: 2"]
|
||||
|
||||
|
||||
def test_openapi_schema(client: TestClient):
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == snapshot(
|
||||
{
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/items/stream": {
|
||||
"get": {
|
||||
"summary": "Stream Items",
|
||||
"operationId": "stream_items_items_stream_get",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "last-event-id",
|
||||
"in": "header",
|
||||
"required": False,
|
||||
"schema": {
|
||||
"anyOf": [{"type": "integer"}, {"type": "null"}],
|
||||
"title": "Last-Event-Id",
|
||||
},
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
"422": {
|
||||
"description": "Validation Error",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/HTTPValidationError"
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"schemas": {
|
||||
"HTTPValidationError": {
|
||||
"properties": {
|
||||
"detail": {
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/ValidationError"
|
||||
},
|
||||
"type": "array",
|
||||
"title": "Detail",
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"title": "HTTPValidationError",
|
||||
},
|
||||
"ValidationError": {
|
||||
"properties": {
|
||||
"loc": {
|
||||
"items": {
|
||||
"anyOf": [{"type": "string"}, {"type": "integer"}]
|
||||
},
|
||||
"type": "array",
|
||||
"title": "Location",
|
||||
},
|
||||
"msg": {"type": "string", "title": "Message"},
|
||||
"type": {"type": "string", "title": "Error Type"},
|
||||
"input": {"title": "Input"},
|
||||
"ctx": {"type": "object", "title": "Context"},
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["loc", "msg", "type"],
|
||||
"title": "ValidationError",
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
141
tests/test_tutorial/test_server_sent_events/test_tutorial005.py
Normal file
141
tests/test_tutorial/test_server_sent_events/test_tutorial005.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from inline_snapshot import snapshot
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="client",
|
||||
params=[
|
||||
pytest.param("tutorial005_py310"),
|
||||
],
|
||||
)
|
||||
def get_client(request: pytest.FixtureRequest):
|
||||
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
|
||||
client = TestClient(mod.app)
|
||||
return client
|
||||
|
||||
|
||||
def test_stream_chat(client: TestClient):
|
||||
response = client.post(
|
||||
"/chat/stream",
|
||||
json={"text": "hello world"},
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
|
||||
|
||||
lines = response.text.strip().split("\n")
|
||||
|
||||
event_lines = [line for line in lines if line.startswith("event: ")]
|
||||
assert event_lines == [
|
||||
"event: token",
|
||||
"event: token",
|
||||
"event: done",
|
||||
]
|
||||
|
||||
data_lines = [line for line in lines if line.startswith("data: ")]
|
||||
assert data_lines == [
|
||||
'data: "hello"',
|
||||
'data: "world"',
|
||||
"data: [DONE]",
|
||||
]
|
||||
|
||||
|
||||
def test_openapi_schema(client: TestClient):
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.json() == snapshot(
|
||||
{
|
||||
"openapi": "3.1.0",
|
||||
"info": {"title": "FastAPI", "version": "0.1.0"},
|
||||
"paths": {
|
||||
"/chat/stream": {
|
||||
"post": {
|
||||
"summary": "Stream Chat",
|
||||
"operationId": "stream_chat_chat_stream_post",
|
||||
"requestBody": {
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {"$ref": "#/components/schemas/Prompt"}
|
||||
}
|
||||
},
|
||||
"required": True,
|
||||
},
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"text/event-stream": {
|
||||
"itemSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {"type": "string"},
|
||||
"event": {"type": "string"},
|
||||
"id": {"type": "string"},
|
||||
"retry": {
|
||||
"type": "integer",
|
||||
"minimum": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
"422": {
|
||||
"description": "Validation Error",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/HTTPValidationError"
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"schemas": {
|
||||
"HTTPValidationError": {
|
||||
"properties": {
|
||||
"detail": {
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/ValidationError"
|
||||
},
|
||||
"type": "array",
|
||||
"title": "Detail",
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"title": "HTTPValidationError",
|
||||
},
|
||||
"Prompt": {
|
||||
"properties": {"text": {"type": "string", "title": "Text"}},
|
||||
"type": "object",
|
||||
"required": ["text"],
|
||||
"title": "Prompt",
|
||||
},
|
||||
"ValidationError": {
|
||||
"properties": {
|
||||
"loc": {
|
||||
"items": {
|
||||
"anyOf": [{"type": "string"}, {"type": "integer"}]
|
||||
},
|
||||
"type": "array",
|
||||
"title": "Location",
|
||||
},
|
||||
"msg": {"type": "string", "title": "Message"},
|
||||
"type": {"type": "string", "title": "Error Type"},
|
||||
"input": {"title": "Input"},
|
||||
"ctx": {"type": "object", "title": "Context"},
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["loc", "msg", "type"],
|
||||
"title": "ValidationError",
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
Reference in New Issue
Block a user