Compare commits

..

1 Commits

Author SHA1 Message Date
github-actions[bot]
b537d6858a 👥 Update FastAPI People - Contributors and Translators 2026-03-01 03:58:37 +00:00
31 changed files with 366 additions and 2052 deletions

View File

@@ -1,13 +1,18 @@
tiangolo:
login: tiangolo
count: 871
count: 922
avatarUrl: https://avatars.githubusercontent.com/u/1326112?u=cb5d06e73a9e1998141b1641aa88e443c6717651&v=4
url: https://github.com/tiangolo
dependabot:
login: dependabot
count: 133
count: 142
avatarUrl: https://avatars.githubusercontent.com/in/29110?v=4
url: https://github.com/apps/dependabot
YuriiMotov:
login: YuriiMotov
count: 57
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
alejsdev:
login: alejsdev
count: 53
@@ -18,11 +23,6 @@ pre-commit-ci:
count: 50
avatarUrl: https://avatars.githubusercontent.com/in/68672?v=4
url: https://github.com/apps/pre-commit-ci
YuriiMotov:
login: YuriiMotov
count: 38
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
github-actions:
login: github-actions
count: 26
@@ -33,16 +33,16 @@ Kludex:
count: 25
avatarUrl: https://avatars.githubusercontent.com/u/7353520?u=df8a3f06ba8f55ae1967a3e2d5ed882903a4e330&v=4
url: https://github.com/Kludex
svlandeg:
login: svlandeg
count: 18
avatarUrl: https://avatars.githubusercontent.com/u/8796347?u=556c97650c27021911b0b9447ec55e75987b0e8a&v=4
url: https://github.com/svlandeg
dmontagu:
login: dmontagu
count: 17
avatarUrl: https://avatars.githubusercontent.com/u/35119617?u=540f30c937a6450812628b9592a1dfe91bbe148e&v=4
url: https://github.com/dmontagu
svlandeg:
login: svlandeg
count: 17
avatarUrl: https://avatars.githubusercontent.com/u/8796347?u=556c97650c27021911b0b9447ec55e75987b0e8a&v=4
url: https://github.com/svlandeg
nilslindemann:
login: nilslindemann
count: 15
@@ -148,6 +148,11 @@ AlexWendland:
count: 4
avatarUrl: https://avatars.githubusercontent.com/u/3949212?u=c4c0c615e0ea33d00bfe16b779cf6ebc0f58071c&v=4
url: https://github.com/AlexWendland
valentinDruzhinin:
login: valentinDruzhinin
count: 4
avatarUrl: https://avatars.githubusercontent.com/u/12831905?u=aae1ebc675c91e8fa582df4fcc4fc4128106344d&v=4
url: https://github.com/valentinDruzhinin
divums:
login: divums
count: 3
@@ -283,11 +288,6 @@ hamidrasti:
count: 3
avatarUrl: https://avatars.githubusercontent.com/u/43915620?v=4
url: https://github.com/hamidrasti
valentinDruzhinin:
login: valentinDruzhinin
count: 3
avatarUrl: https://avatars.githubusercontent.com/u/12831905?u=aae1ebc675c91e8fa582df4fcc4fc4128106344d&v=4
url: https://github.com/valentinDruzhinin
kkinder:
login: kkinder
count: 2
@@ -521,7 +521,7 @@ s111d:
estebanx64:
login: estebanx64
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/10840422?u=812422ae5d6a4bc5ff331c901fc54f9ab3cecf5c&v=4
avatarUrl: https://avatars.githubusercontent.com/u/10840422?u=2ca073ee47a625e495a9573bd374ddcd7be5ec91&v=4
url: https://github.com/estebanx64
ndimares:
login: ndimares
@@ -573,3 +573,8 @@ Taoup:
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/22348542?v=4
url: https://github.com/Taoup
jonathan-fulton:
login: jonathan-fulton
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/4665111?u=bda1c12e5137bd7771a6aa24d9515b87c11da150&v=4
url: https://github.com/jonathan-fulton

View File

@@ -31,7 +31,7 @@ hard-coders:
hasansezertasan:
login: hasansezertasan
count: 95
avatarUrl: https://avatars.githubusercontent.com/u/13135006?u=99f0b0f0fc47e88e8abb337b4447357939ef93e7&v=4
avatarUrl: https://avatars.githubusercontent.com/u/13135006?u=d36995e41a00590da64e6204cfd112e0484ac1ca&v=4
url: https://github.com/hasansezertasan
alv2017:
login: alv2017
@@ -43,21 +43,31 @@ nazarepiedady:
count: 87
avatarUrl: https://avatars.githubusercontent.com/u/31008635?u=f69ddc4ea8bda3bdfac7aa0e2ea38de282e6ee2d&v=4
url: https://github.com/nazarepiedady
tiangolo:
login: tiangolo
count: 82
avatarUrl: https://avatars.githubusercontent.com/u/1326112?u=cb5d06e73a9e1998141b1641aa88e443c6717651&v=4
url: https://github.com/tiangolo
AlertRED:
login: AlertRED
count: 81
avatarUrl: https://avatars.githubusercontent.com/u/15695000?u=f5a4944c6df443030409c88da7d7fa0b7ead985c&v=4
url: https://github.com/AlertRED
tiangolo:
login: tiangolo
count: 78
avatarUrl: https://avatars.githubusercontent.com/u/1326112?u=cb5d06e73a9e1998141b1641aa88e443c6717651&v=4
url: https://github.com/tiangolo
Alexandrhub:
login: Alexandrhub
count: 68
avatarUrl: https://avatars.githubusercontent.com/u/119126536?u=9fc0d48f3307817bafecc5861eb2168401a6cb04&v=4
url: https://github.com/Alexandrhub
nilslindemann:
login: nilslindemann
count: 67
avatarUrl: https://avatars.githubusercontent.com/u/6892179?u=1dca6a22195d6cd1ab20737c0e19a4c55d639472&v=4
url: https://github.com/nilslindemann
YuriiMotov:
login: YuriiMotov
count: 65
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
cassiobotaro:
login: cassiobotaro
count: 64
@@ -68,21 +78,11 @@ waynerv:
count: 63
avatarUrl: https://avatars.githubusercontent.com/u/39515546?u=ec35139777597cdbbbddda29bf8b9d4396b429a9&v=4
url: https://github.com/waynerv
nilslindemann:
login: nilslindemann
count: 61
avatarUrl: https://avatars.githubusercontent.com/u/6892179?u=1dca6a22195d6cd1ab20737c0e19a4c55d639472&v=4
url: https://github.com/nilslindemann
mattwang44:
login: mattwang44
count: 61
avatarUrl: https://avatars.githubusercontent.com/u/24987826?u=58e37fb3927b9124b458945ac4c97aa0f1062d85&v=4
url: https://github.com/mattwang44
YuriiMotov:
login: YuriiMotov
count: 56
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
Laineyzhang55:
login: Laineyzhang55
count: 48
@@ -128,6 +128,11 @@ solomein-sv:
count: 38
avatarUrl: https://avatars.githubusercontent.com/u/46193920?u=789927ee09cfabd752d3bd554fa6baf4850d2777&v=4
url: https://github.com/solomein-sv
mezgoodle:
login: mezgoodle
count: 38
avatarUrl: https://avatars.githubusercontent.com/u/41520940?u=4a9c765af688389d54296845d18b8f6cd6ddf09a&v=4
url: https://github.com/mezgoodle
JavierSanchezCastro:
login: JavierSanchezCastro
count: 38
@@ -138,11 +143,6 @@ alejsdev:
count: 37
avatarUrl: https://avatars.githubusercontent.com/u/90076947?u=0facffe3abf87f57a1f05fa773d1119cc5c2f6a5&v=4
url: https://github.com/alejsdev
mezgoodle:
login: mezgoodle
count: 37
avatarUrl: https://avatars.githubusercontent.com/u/41520940?u=4a9c765af688389d54296845d18b8f6cd6ddf09a&v=4
url: https://github.com/mezgoodle
stlucasgarcia:
login: stlucasgarcia
count: 36
@@ -163,21 +163,21 @@ rjNemo:
count: 34
avatarUrl: https://avatars.githubusercontent.com/u/56785022?u=d5c3a02567c8649e146fcfc51b6060ccaf8adef8&v=4
url: https://github.com/rjNemo
codingjenny:
login: codingjenny
yychanlee:
login: yychanlee
count: 34
avatarUrl: https://avatars.githubusercontent.com/u/103817302?u=3a042740dc0ff58615da0d8679230966fd7693e8&v=4
url: https://github.com/codingjenny
url: https://github.com/yychanlee
Vincy1230:
login: Vincy1230
count: 34
avatarUrl: https://avatars.githubusercontent.com/u/81342412?u=ab5e256a4077a4a91f3f9cd2115ba80780454cbe&v=4
url: https://github.com/Vincy1230
akarev0:
login: akarev0
count: 33
avatarUrl: https://avatars.githubusercontent.com/u/53393089?u=6e528bb4789d56af887ce6fe237bea4010885406&v=4
url: https://github.com/akarev0
Vincy1230:
login: Vincy1230
count: 33
avatarUrl: https://avatars.githubusercontent.com/u/81342412?u=ab5e256a4077a4a91f3f9cd2115ba80780454cbe&v=4
url: https://github.com/Vincy1230
romashevchenko:
login: romashevchenko
count: 32
@@ -313,6 +313,11 @@ sattosan:
count: 19
avatarUrl: https://avatars.githubusercontent.com/u/20574756?u=b0d8474d2938189c6954423ae8d81d91013f80a8&v=4
url: https://github.com/sattosan
maru0123-2004:
login: maru0123-2004
count: 19
avatarUrl: https://avatars.githubusercontent.com/u/43961566?u=16ed8603a4d6a4665cb6c53a7aece6f31379b769&v=4
url: https://github.com/maru0123-2004
yes0ng:
login: yes0ng
count: 19
@@ -383,11 +388,6 @@ Joao-Pedro-P-Holanda:
count: 16
avatarUrl: https://avatars.githubusercontent.com/u/110267046?u=331bd016326dac4cf3df4848f6db2dbbf8b5f978&v=4
url: https://github.com/Joao-Pedro-P-Holanda
maru0123-2004:
login: maru0123-2004
count: 16
avatarUrl: https://avatars.githubusercontent.com/u/43961566?u=16ed8603a4d6a4665cb6c53a7aece6f31379b769&v=4
url: https://github.com/maru0123-2004
JaeHyuckSa:
login: JaeHyuckSa
count: 16
@@ -683,6 +683,11 @@ JoaoGustavoRogel:
count: 9
avatarUrl: https://avatars.githubusercontent.com/u/29525510?u=a0a91251f5e43e132608d55d28ccb8645c5ea405&v=4
url: https://github.com/JoaoGustavoRogel
valentinDruzhinin:
login: valentinDruzhinin
count: 9
avatarUrl: https://avatars.githubusercontent.com/u/12831905?u=aae1ebc675c91e8fa582df4fcc4fc4128106344d&v=4
url: https://github.com/valentinDruzhinin
Yarous:
login: Yarous
count: 9
@@ -738,6 +743,11 @@ sungchan1:
count: 8
avatarUrl: https://avatars.githubusercontent.com/u/28076127?u=fadbf24840186aca639d344bb3e0ecf7ff3441cf&v=4
url: https://github.com/sungchan1
roli2py:
login: roli2py
count: 8
avatarUrl: https://avatars.githubusercontent.com/u/61126128?u=bcb7a286e435a6b9d6a84b07db1232580ee796d4&v=4
url: https://github.com/roli2py
Serrones:
login: Serrones
count: 7
@@ -783,11 +793,6 @@ d2a-raudenaerde:
count: 7
avatarUrl: https://avatars.githubusercontent.com/u/5213150?u=e6d0ef65c571c7e544fc1c7ec151c7c0a72fb6bb&v=4
url: https://github.com/d2a-raudenaerde
valentinDruzhinin:
login: valentinDruzhinin
count: 7
avatarUrl: https://avatars.githubusercontent.com/u/12831905?u=aae1ebc675c91e8fa582df4fcc4fc4128106344d&v=4
url: https://github.com/valentinDruzhinin
Zerohertz:
login: Zerohertz
count: 7
@@ -1271,7 +1276,7 @@ rafsaf:
frnsimoes:
login: frnsimoes
count: 3
avatarUrl: https://avatars.githubusercontent.com/u/66239468?u=cba345870d8d6b25dd6d56ee18f7120581e3c573&v=4
avatarUrl: https://avatars.githubusercontent.com/u/66239468?u=98fb2a38bcac765ea9651af8a0ab8f37df86570d&v=4
url: https://github.com/frnsimoes
lieryan:
login: lieryan
@@ -1371,7 +1376,7 @@ nymous:
EpsilonRationes:
login: EpsilonRationes
count: 3
avatarUrl: https://avatars.githubusercontent.com/u/148639079?v=4
avatarUrl: https://avatars.githubusercontent.com/u/148639079?u=5dd6c4a3f570dea44d208465fd10b709bcdfa69a&v=4
url: https://github.com/EpsilonRationes
SametEmin:
login: SametEmin
@@ -1611,7 +1616,7 @@ raphaelauv:
Fahad-Md-Kamal:
login: Fahad-Md-Kamal
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/34704464?u=141086368c5557d5a1a533fe291f21f9fc584458&v=4
avatarUrl: https://avatars.githubusercontent.com/u/34704464?u=0b1da22a9b88b14d99e7e4368eadde7ecd695366&v=4
url: https://github.com/Fahad-Md-Kamal
zxcq544:
login: zxcq544

View File

@@ -10,7 +10,7 @@ jaystone776:
url: https://github.com/jaystone776
tiangolo:
login: tiangolo
count: 31
count: 46
avatarUrl: https://avatars.githubusercontent.com/u/1326112?u=cb5d06e73a9e1998141b1641aa88e443c6717651&v=4
url: https://github.com/tiangolo
ceb10n:
@@ -33,10 +33,15 @@ SwftAlpc:
count: 23
avatarUrl: https://avatars.githubusercontent.com/u/52768429?u=6a3aa15277406520ad37f6236e89466ed44bc5b8&v=4
url: https://github.com/SwftAlpc
YuriiMotov:
login: YuriiMotov
count: 23
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
hasansezertasan:
login: hasansezertasan
count: 22
avatarUrl: https://avatars.githubusercontent.com/u/13135006?u=99f0b0f0fc47e88e8abb337b4447357939ef93e7&v=4
avatarUrl: https://avatars.githubusercontent.com/u/13135006?u=d36995e41a00590da64e6204cfd112e0484ac1ca&v=4
url: https://github.com/hasansezertasan
waynerv:
login: waynerv
@@ -58,11 +63,11 @@ Joao-Pedro-P-Holanda:
count: 14
avatarUrl: https://avatars.githubusercontent.com/u/110267046?u=331bd016326dac4cf3df4848f6db2dbbf8b5f978&v=4
url: https://github.com/Joao-Pedro-P-Holanda
codingjenny:
login: codingjenny
yychanlee:
login: yychanlee
count: 14
avatarUrl: https://avatars.githubusercontent.com/u/103817302?u=3a042740dc0ff58615da0d8679230966fd7693e8&v=4
url: https://github.com/codingjenny
url: https://github.com/yychanlee
Xewus:
login: Xewus
count: 13
@@ -108,11 +113,6 @@ pablocm83:
count: 8
avatarUrl: https://avatars.githubusercontent.com/u/28315068?u=3310fbb05bb8bfc50d2c48b6cb64ac9ee4a14549&v=4
url: https://github.com/pablocm83
YuriiMotov:
login: YuriiMotov
count: 8
avatarUrl: https://avatars.githubusercontent.com/u/109919500?u=bc48be95c429989224786106b027f3c5e40cc354&v=4
url: https://github.com/YuriiMotov
ptt3199:
login: ptt3199
count: 7
@@ -466,7 +466,7 @@ ArtemKhymenko:
hasnatsajid:
login: hasnatsajid
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/86589885?v=4
avatarUrl: https://avatars.githubusercontent.com/u/86589885?u=3712c0362d7a4000d76022339c545cf46aa5903f&v=4
url: https://github.com/hasnatsajid
alperiox:
login: alperiox
@@ -481,7 +481,7 @@ emrhnsyts:
vusallyv:
login: vusallyv
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/85983771?u=6fb8e2f876bca06e9f846606423c8f18fb46ad06&v=4
avatarUrl: https://avatars.githubusercontent.com/u/85983771?u=620ce103dcdc47953c952bb8d402a9cf8199014d&v=4
url: https://github.com/vusallyv
jackleeio:
login: jackleeio
@@ -496,7 +496,7 @@ choi-haram:
imtiaz101325:
login: imtiaz101325
count: 2
avatarUrl: https://avatars.githubusercontent.com/u/54007087?u=194d972b501b9ea9d2ddeaed757c492936e0121a&v=4
avatarUrl: https://avatars.githubusercontent.com/u/54007087?u=61e79c4c39798cd4d339788045dc44d4c6252bde&v=4
url: https://github.com/imtiaz101325
fabianfalon:
login: fabianfalon

View File

@@ -4,12 +4,6 @@ If you want to stream data that can be structured as JSON, you should [Stream JS
But if you want to **stream pure binary data** or strings, here's how you can do it.
/// info
Added in FastAPI 0.134.0.
///
## Use Cases { #use-cases }
You could use this if you want to stream pure strings, for example directly from the output of an **AI LLM** service.

View File

@@ -7,17 +7,6 @@ hide:
## Latest Changes
### Docs
* 📝 Update Skill, optimize context, trim and refactor into references. PR [#15031](https://github.com/fastapi/fastapi/pull/15031) by [@tiangolo](https://github.com/tiangolo).
## 0.135.0
### Features
* ✨ Add support for Server Sent Events. PR [#15030](https://github.com/fastapi/fastapi/pull/15030) by [@tiangolo](https://github.com/tiangolo).
* New docs: [Server-Sent Events (SSE)](https://fastapi.tiangolo.com/tutorial/server-sent-events/).
## 0.134.0
### Features

View File

@@ -1,120 +0,0 @@
# Server-Sent Events (SSE) { #server-sent-events-sse }
You can stream data to the client using **Server-Sent Events** (SSE).
This is similar to [Stream JSON Lines](stream-json-lines.md){.internal-link target=_blank}, but uses the `text/event-stream` format, which is supported natively by browsers with the <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource" class="external-link" target="_blank">`EventSource` API</a>.
/// info
Added in FastAPI 0.135.0.
///
## What are Server-Sent Events? { #what-are-server-sent-events }
SSE is a standard for streaming data from the server to the client over HTTP.
Each event is a small text block with "fields" like `data`, `event`, `id`, and `retry`, separated by blank lines.
It looks like this:
```
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
```
SSE is commonly used for AI chat streaming, live notifications, logs and observability, and other cases where the server pushes updates to the client.
/// tip
If you want to stream binary data, for example video or audio, check the advanced guide: [Stream Data](../advanced/stream-data.md){.internal-link target=_blank}.
///
## Stream SSE with FastAPI { #stream-sse-with-fastapi }
To stream SSE with FastAPI, use `yield` in your *path operation function* and set `response_class=EventSourceResponse`.
Import `EventSourceResponse` from `fastapi.sse`:
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[4,22] *}
Each yielded item is encoded as JSON and sent in the `data:` field of an SSE event.
If you declare the return type as `AsyncIterable[Item]`, FastAPI will use it to **validate**, **document**, and **serialize** the data using Pydantic.
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[10:12,23] *}
/// tip
As Pydantic will serialize it in the **Rust** side, you will get much higher **performance** than if you don't declare a return type.
///
### Non-async *path operation functions* { #non-async-path-operation-functions }
You can also use regular `def` functions (without `async`), and use `yield` the same way.
FastAPI will make sure it's run correctly so that it doesn't block the event loop.
As in this case the function is not async, the right return type would be `Iterable[Item]`:
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[28:31] hl[29] *}
### No Return Type { #no-return-type }
You can also omit the return type. FastAPI will use the [`jsonable_encoder`](./encoder.md){.internal-link target=_blank} to convert the data and send it.
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *}
## `ServerSentEvent` { #serversentevent }
If you need to set SSE fields like `event`, `id`, `retry`, or `comment`, you can yield `ServerSentEvent` objects instead of plain data.
Import `ServerSentEvent` from `fastapi.sse`:
{* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
The `data` field is always encoded as JSON. You can pass any value that can be serialized as JSON, including Pydantic models.
## Raw Data { #raw-data }
If you need to send data **without** JSON encoding, use `raw_data` instead of `data`.
This is useful for sending pre-formatted text, log lines, or special <dfn title="A value used to indicate a special condition or state">"sentinel"</dfn> values like `[DONE]`.
{* ../../docs_src/server_sent_events/tutorial003_py310.py hl[17] *}
/// note
`data` and `raw_data` are mutually exclusive. You can only set one of them on each `ServerSentEvent`.
///
## Resuming with `Last-Event-ID` { #resuming-with-last-event-id }
When a browser reconnects after a connection drop, it sends the last received `id` in the `Last-Event-ID` header.
You can read it as a header parameter and use it to resume the stream from where the client left off:
{* ../../docs_src/server_sent_events/tutorial004_py310.py hl[25,27,31] *}
## SSE with POST { #sse-with-post }
SSE works with **any HTTP method**, not just `GET`.
This is useful for protocols like <a href="https://modelcontextprotocol.io" class="external-link" target="_blank">MCP</a> that stream SSE over `POST`:
{* ../../docs_src/server_sent_events/tutorial005_py310.py hl[14] *}
## Technical Details { #technical-details }
FastAPI implements some SSE best practices out of the box.
* Send a **"keep alive" `ping` comment** every 15 seconds when there hasn't been any message, to prevent some proxies from closing the connection, as suggested in the <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html#authoring-notes" class="external-link" target="_blank">HTML specification: Server-Sent Events</a>.
* Set the `Cache-Control: no-cache` header to **prevent caching** of the stream.
* Set a special header `X-Accel-Buffering: no` to **prevent buffering** in some proxies like Nginx.
You don't have to do anything about it, it works out of the box. 🤓

View File

@@ -2,12 +2,6 @@
You could have a sequence of data that you would like to send in a "**stream**", you could do it with **JSON Lines**.
/// info
Added in FastAPI 0.134.0.
///
## What is a Stream? { #what-is-a-stream }
"**Streaming**" data means that your app will start sending data items to the client without waiting for the entire sequence of items to be ready.
@@ -106,6 +100,6 @@ You can also omit the return type. FastAPI will then use the [`jsonable_encoder`
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[33:36] hl[34] *}
## Server-Sent Events (SSE) { #server-sent-events-sse }
## Server Sent Events (SSE) { #server-sent-events-sse }
FastAPI also has first-class support for Server-Sent Events (SSE), which are quite similar but with a couple of extra details. You can learn about them in the next chapter: [Server-Sent Events (SSE)](server-sent-events.md){.internal-link target=_blank}. 🤓
A future version of FastAPI will also have first-class support for Server Sent Events (SSE), which are quite similar, but with a couple of extra details. 🤓

View File

@@ -155,7 +155,6 @@ nav:
- tutorial/sql-databases.md
- tutorial/bigger-applications.md
- tutorial/stream-json-lines.md
- tutorial/server-sent-events.md
- tutorial/background-tasks.md
- tutorial/metadata.md
- tutorial/static-files.md

View File

View File

@@ -1,43 +0,0 @@
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item

View File

@@ -1,26 +0,0 @@
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

View File

@@ -1,17 +0,0 @@
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)

View File

@@ -1,31 +0,0 @@
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))

View File

@@ -1,19 +0,0 @@
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")

View File

@@ -290,11 +290,146 @@ Apply shared dependencies at the router level via `dependencies=[Depends(...)]`.
## Dependency Injection
See [the dependency injection reference](references/dependencies.md) for detailed patterns including `yield` with `scope`, and class dependencies.
Use dependencies when:
Use dependencies when the logic can't be declared in Pydantic validation, depends on external resources, needs cleanup (with `yield`), or is shared across endpoints.
* They can't be declared in Pydantic validation and require additional logic
* The logic depends on external resources or could block in any other way
* Other dependencies need their results (it's a sub-dependency)
* The logic can be shared by multiple endpoints to do things like error early, authentication, etc.
* They need to handle cleanup (e.g., DB sessions, file handles), using dependencies with `yield`
* Their logic needs input data from the request, like headers, query parameters, etc.
Apply shared dependencies at the router level via `dependencies=[Depends(...)]`.
### Dependencies with `yield` and `scope`
When using dependencies with `yield`, they can have a `scope` that defines when the exit code is run.
Use the default scope `"request"` to run the exit code after the response is sent back.
```python
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
def get_db():
db = DBSession()
try:
yield db
finally:
db.close()
DBDep = Annotated[DBSession, Depends(get_db)]
@app.get("/items/")
async def read_items(db: DBDep):
return db.query(Item).all()
```
Use the scope `"function"` when they should run the exit code after the response data is generated but before the response is sent back to the client.
```python
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
def get_username():
try:
yield "Rick"
finally:
print("Cleanup up before response is sent")
UserNameDep = Annotated[str, Depends(get_username, scope="function")]
@app.get("/users/me")
def get_user_me(username: UserNameDep):
return username
```
### Class Dependencies
Avoid creating class dependencies when possible.
If a class is needed, instead create a regular function dependency that returns a class instance.
Do this:
```python
from dataclasses import dataclass
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
@dataclass
class DatabasePaginator:
offset: int = 0
limit: int = 100
q: str | None = None
def get_page(self) -> dict:
# Simulate a page of data
return {
"offset": self.offset,
"limit": self.limit,
"q": self.q,
"items": [],
}
def get_db_paginator(
offset: int = 0, limit: int = 100, q: str | None = None
) -> DatabasePaginator:
return DatabasePaginator(offset=offset, limit=limit, q=q)
PaginatorDep = Annotated[DatabasePaginator, Depends(get_db_paginator)]
@app.get("/items/")
async def read_items(paginator: PaginatorDep):
return paginator.get_page()
```
instead of this:
```python
# DO NOT DO THIS
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
class DatabasePaginator:
def __init__(self, offset: int = 0, limit: int = 100, q: str | None = None):
self.offset = offset
self.limit = limit
self.q = q
def get_page(self) -> dict:
# Simulate a page of data
return {
"offset": self.offset,
"limit": self.limit,
"q": self.q,
"items": [],
}
@app.get("/items/")
async def read_items(paginator: Annotated[DatabasePaginator, Depends()]):
return paginator.get_page()
```
## Async vs Sync *path operations*
@@ -326,23 +461,120 @@ The same rules apply to dependencies.
Make sure blocking code is not run inside of `async` functions. The logic will work, but will damage the performance heavily.
When needing to mix blocking and async code, see Asyncer in [the other tools reference](references/other-tools.md).
### Asyncer
## Streaming (JSON Lines, SSE, bytes)
When needing to run blocking code inside of async functions, or async code inside of blocking functions, suggest using Asyncer.
See [the streaming reference](references/streaming.md) for JSON Lines, Server-Sent Events (`EventSourceResponse`, `ServerSentEvent`), and byte streaming (`StreamingResponse`) patterns.
Install:
## Tooling
```bash
uv add asyncer
```
See [the other tools reference](references/other-tools.md) for details on uv, Ruff, ty for package management, linting, type checking, formatting, etc.
Run blocking sync code inside of async with `asyncify()`:
## Other Libraries
```python
from asyncer import asyncify
from fastapi import FastAPI
See [the other tools reference](references/other-tools.md) for details on other libraries:
app = FastAPI()
* Asyncer for handling async and await, concurrency, mixing async and blocking code, prefer it over AnyIO or asyncio.
* SQLModel for working with SQL databases, prefer it over SQLAlchemy.
* HTTPX for interacting with HTTP (other APIs), prefer it over Requests.
def do_blocking_work(name: str) -> str:
# Some blocking I/O operation
return f"Hello {name}"
@app.get("/items/")
async def read_items():
result = await asyncify(do_blocking_work)(name="World")
return {"message": result}
```
And run async code inside of blocking sync code with `syncify()`:
```python
from asyncer import syncify
from fastapi import FastAPI
app = FastAPI()
async def do_async_work(name: str) -> str:
return f"Hello {name}"
@app.get("/items/")
def read_items():
result = syncify(do_async_work)(name="World")
return {"message": result}
```
## Stream JSON Lines
To stream JSON Lines, declare the return type and use `yield` to return the data.
```python
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
```
## Stream bytes
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from app.utils import read_image
app = FastAPI()
class PNGStreamingResponse(StreamingResponse):
media_type = "image/png"
@app.get("/image", response_class=PNGStreamingResponse)
def stream_image_no_async_no_annotation():
with read_image() as image_file:
yield from image_file
```
prefer this over returning a `StreamingResponse` directly:
```python
# DO NOT DO THIS
import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from app.utils import read_image
app = FastAPI()
class PNGStreamingResponse(StreamingResponse):
media_type = "image/png"
@app.get("/")
async def main():
return PNGStreamingResponse(read_image())
```
## Use uv, ruff, ty
If uv is available, use it to manage dependencies.
If Ruff is available, use it to lint and format the code. Consider enabling the FastAPI rules.
If ty is available, use it to check types.
## SQLModel for SQL databases
When working with SQL databases, prefer using SQLModel as it is integrated with Pydantic and will allow declaring data validation with the same models.
## Do not use Pydantic RootModels

View File

@@ -1,142 +0,0 @@
# Dependency Injection
Use dependencies when:
* They can't be declared in Pydantic validation and require additional logic
* The logic depends on external resources or could block in any other way
* Other dependencies need their results (it's a sub-dependency)
* The logic can be shared by multiple endpoints to do things like error early, authentication, etc.
* They need to handle cleanup (e.g., DB sessions, file handles), using dependencies with `yield`
* Their logic needs input data from the request, like headers, query parameters, etc.
## Dependencies with `yield` and `scope`
When using dependencies with `yield`, they can have a `scope` that defines when the exit code is run.
Use the default scope `"request"` to run the exit code after the response is sent back.
```python
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
def get_db():
db = DBSession()
try:
yield db
finally:
db.close()
DBDep = Annotated[DBSession, Depends(get_db)]
@app.get("/items/")
async def read_items(db: DBDep):
return db.query(Item).all()
```
Use the scope `"function"` when they should run the exit code after the response data is generated but before the response is sent back to the client.
```python
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
def get_username():
try:
yield "Rick"
finally:
print("Cleanup up before response is sent")
UserNameDep = Annotated[str, Depends(get_username, scope="function")]
@app.get("/users/me")
def get_user_me(username: UserNameDep):
return username
```
## Class Dependencies
Avoid creating class dependencies when possible.
If a class is needed, instead create a regular function dependency that returns a class instance.
Do this:
```python
from dataclasses import dataclass
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
@dataclass
class DatabasePaginator:
offset: int = 0
limit: int = 100
q: str | None = None
def get_page(self) -> dict:
# Simulate a page of data
return {
"offset": self.offset,
"limit": self.limit,
"q": self.q,
"items": [],
}
def get_db_paginator(
offset: int = 0, limit: int = 100, q: str | None = None
) -> DatabasePaginator:
return DatabasePaginator(offset=offset, limit=limit, q=q)
PaginatorDep = Annotated[DatabasePaginator, Depends(get_db_paginator)]
@app.get("/items/")
async def read_items(paginator: PaginatorDep):
return paginator.get_page()
```
instead of this:
```python
# DO NOT DO THIS
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
class DatabasePaginator:
def __init__(self, offset: int = 0, limit: int = 100, q: str | None = None):
self.offset = offset
self.limit = limit
self.q = q
def get_page(self) -> dict:
# Simulate a page of data
return {
"offset": self.offset,
"limit": self.limit,
"q": self.q,
"items": [],
}
@app.get("/items/")
async def read_items(paginator: Annotated[DatabasePaginator, Depends()]):
return paginator.get_page()
```

View File

@@ -1,76 +0,0 @@
# Other Tools
## uv
If uv is available, use it to manage dependencies.
## Ruff
If Ruff is available, use it to lint and format the code. Consider enabling the FastAPI rules.
## ty
If ty is available, use it to check types.
## Asyncer
When needing to run blocking code inside of async functions, or async code inside of blocking functions, suggest using Asyncer.
Prefer it over AnyIO or asyncio.
Install:
```bash
uv add asyncer
```
Run blocking sync code inside of async with `asyncify()`:
```python
from asyncer import asyncify
from fastapi import FastAPI
app = FastAPI()
def do_blocking_work(name: str) -> str:
# Some blocking I/O operation
return f"Hello {name}"
@app.get("/items/")
async def read_items():
result = await asyncify(do_blocking_work)(name="World")
return {"message": result}
```
And run async code inside of blocking sync code with `syncify()`:
```python
from asyncer import syncify
from fastapi import FastAPI
app = FastAPI()
async def do_async_work(name: str) -> str:
return f"Hello {name}"
@app.get("/items/")
def read_items():
result = syncify(do_async_work)(name="World")
return {"message": result}
```
## SQLModel for SQL databases
When working with SQL databases, prefer using SQLModel as it is integrated with Pydantic and will allow declaring data validation with the same models.
Prefer it over SQLAlchemy.
## HTTPX
Use HTTPX for handling HTTP communication (e.g. with other APIs). It support sync and async usage.
Prefer it over Requests.

View File

@@ -1,105 +0,0 @@
# Streaming
## Stream JSON Lines
To stream JSON Lines, declare the return type and use `yield` to return the data.
```python
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
```
## Server-Sent Events (SSE)
To stream Server-Sent Events, use `response_class=EventSourceResponse` and `yield` items from the endpoint.
Plain objects are automatically JSON-serialized as `data:` fields, declare the return type so the serialization is done by Pydantic:
```python
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[Item]:
yield Item(name="Plumbus", price=32.99)
yield Item(name="Portal Gun", price=999.99)
```
For full control over SSE fields (`event`, `id`, `retry`, `comment`), yield `ServerSentEvent` instances:
```python
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/events", response_class=EventSourceResponse)
async def stream_events() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
yield ServerSentEvent(data={"progress": 50}, event="progress", id="2")
```
Use `raw_data` instead of `data` to send pre-formatted strings without JSON encoding:
```python
yield ServerSentEvent(raw_data="plain text line", event="log")
```
## Stream bytes
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from app.utils import read_image
app = FastAPI()
class PNGStreamingResponse(StreamingResponse):
media_type = "image/png"
@app.get("/image", response_class=PNGStreamingResponse)
def stream_image_no_async_no_annotation():
with read_image() as image_file:
yield from image_file
```
prefer this over returning a `StreamingResponse` directly:
```python
# DO NOT DO THIS
import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from app.utils import read_image
app = FastAPI()
class PNGStreamingResponse(StreamingResponse):
media_type = "image/png"
@app.get("/")
async def main():
return PNGStreamingResponse(read_image())
```

View File

@@ -1,6 +1,6 @@
"""FastAPI framework, high performance, easy to learn, fast to code, ready for production"""
__version__ = "0.135.0"
__version__ = "0.134.0"
from starlette import status as status

View File

@@ -29,7 +29,6 @@ from fastapi.openapi.constants import METHODS_WITH_BODY, REF_PREFIX
from fastapi.openapi.models import OpenAPI
from fastapi.params import Body, ParamTypes
from fastapi.responses import Response
from fastapi.sse import _SSE_EVENT_SCHEMA
from fastapi.types import ModelNameMap
from fastapi.utils import (
deep_dict_update,
@@ -373,26 +372,6 @@ def get_openapi_path(
operation.setdefault("responses", {}).setdefault(
status_code, {}
).setdefault("content", {})["application/jsonl"] = jsonl_content
elif route.is_sse_stream:
sse_content: dict[str, Any] = {}
item_schema = copy.deepcopy(_SSE_EVENT_SCHEMA)
if route.stream_item_field:
content_schema = get_schema_from_model_field(
field=route.stream_item_field,
model_name_map=model_name_map,
field_mapping=field_mapping,
separate_input_output_schemas=separate_input_output_schemas,
)
item_schema["required"] = ["data"]
item_schema["properties"]["data"] = {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": content_schema,
}
sse_content["itemSchema"] = item_schema
operation.setdefault("responses", {}).setdefault(
status_code, {}
).setdefault("content", {})["text/event-stream"] = sse_content
elif route_response_media_type:
response_schema = {"type": "string"}
if lenient_issubclass(current_response_class, JSONResponse):

View File

@@ -1,7 +1,6 @@
from typing import Any
from fastapi.exceptions import FastAPIDeprecationWarning
from fastapi.sse import EventSourceResponse as EventSourceResponse # noqa
from starlette.responses import FileResponse as FileResponse # noqa
from starlette.responses import HTMLResponse as HTMLResponse # noqa
from starlette.responses import JSONResponse as JSONResponse # noqa

View File

@@ -56,13 +56,6 @@ from fastapi.exceptions import (
ResponseValidationError,
WebSocketRequestValidationError,
)
from fastapi.sse import (
_PING_INTERVAL,
KEEPALIVE_COMMENT,
EventSourceResponse,
ServerSentEvent,
format_sse_event,
)
from fastapi.types import DecoratedCallable, IncEx
from fastapi.utils import (
create_model_field,
@@ -73,7 +66,7 @@ from fastapi.utils import (
from starlette import routing
from starlette._exception_handler import wrap_app_handling_exceptions
from starlette._utils import is_async_callable
from starlette.concurrency import iterate_in_threadpool, run_in_threadpool
from starlette.concurrency import run_in_threadpool
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
@@ -368,7 +361,6 @@ def get_request_handler(
actual_response_class: type[Response] = response_class.value
else:
actual_response_class = response_class
is_sse_stream = lenient_issubclass(actual_response_class, EventSourceResponse)
if isinstance(strict_content_type, DefaultPlaceholder):
actual_strict_content_type: bool = strict_content_type.value
else:
@@ -460,125 +452,35 @@ def get_request_handler(
errors = solved_result.errors
assert dependant.call # For types
if not errors:
# Shared serializer for stream items (JSONL and SSE).
# Validates against stream_item_field when set, then
# serializes to JSON bytes.
def _serialize_data(data: Any) -> bytes:
if stream_item_field:
value, errors_ = stream_item_field.validate(
data, {}, loc=("response",)
)
if errors_:
ctx = endpoint_ctx or EndpointContext()
raise ResponseValidationError(
errors=errors_,
body=data,
endpoint_ctx=ctx,
)
return stream_item_field.serialize_json(
value,
include=response_model_include,
exclude=response_model_exclude,
by_alias=response_model_by_alias,
exclude_unset=response_model_exclude_unset,
exclude_defaults=response_model_exclude_defaults,
exclude_none=response_model_exclude_none,
)
else:
data = jsonable_encoder(data)
return json.dumps(data).encode("utf-8")
if is_sse_stream:
# Generator endpoint: stream as Server-Sent Events
gen = dependant.call(**solved_result.values)
def _serialize_sse_item(item: Any) -> bytes:
if isinstance(item, ServerSentEvent):
# User controls the event structure.
# Serialize the data payload if present.
# For ServerSentEvent items we skip stream_item_field
# validation (the user may mix types intentionally).
if item.raw_data is not None:
data_str: str | None = item.raw_data
elif item.data is not None:
if hasattr(item.data, "model_dump_json"):
data_str = item.data.model_dump_json()
else:
data_str = json.dumps(jsonable_encoder(item.data))
else:
data_str = None
return format_sse_event(
data_str=data_str,
event=item.event,
id=item.id,
retry=item.retry,
comment=item.comment,
)
else:
# Plain object: validate + serialize via
# stream_item_field (if set) and wrap in data field
return format_sse_event(
data_str=_serialize_data(item).decode("utf-8")
)
if dependant.is_async_gen_callable:
sse_aiter: AsyncIterator[Any] = gen.__aiter__()
else:
sse_aiter = iterate_in_threadpool(gen)
async def _async_stream_sse() -> AsyncIterator[bytes]:
# Use a memory stream to decouple generator iteration
# from the keepalive timer. A producer task pulls items
# from the generator independently, so
# `anyio.fail_after` never wraps the generator's
# `__anext__` directly - avoiding CancelledError that
# would finalize the generator and also working for sync
# generators running in a thread pool.
send_stream, receive_stream = anyio.create_memory_object_stream[
bytes
](max_buffer_size=1)
async def _producer() -> None:
async with send_stream:
async for raw_item in sse_aiter:
await send_stream.send(_serialize_sse_item(raw_item))
async with anyio.create_task_group() as tg:
tg.start_soon(_producer)
async with receive_stream:
try:
while True:
try:
with anyio.fail_after(_PING_INTERVAL):
data = await receive_stream.receive()
yield data
# To allow for cancellation to trigger
# Ref: https://github.com/fastapi/fastapi/issues/14680
await anyio.sleep(0)
except TimeoutError:
yield KEEPALIVE_COMMENT
except anyio.EndOfStream:
pass
sse_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
_async_stream_sse()
)
response = StreamingResponse(
sse_stream_content,
media_type="text/event-stream",
background=solved_result.background_tasks,
)
response.headers["Cache-Control"] = "no-cache"
# For Nginx proxies to not buffer server sent events
response.headers["X-Accel-Buffering"] = "no"
response.headers.raw.extend(solved_result.response.headers.raw)
elif is_json_stream:
if is_json_stream:
# Generator endpoint: stream as JSONL
gen = dependant.call(**solved_result.values)
def _serialize_item(item: Any) -> bytes:
return _serialize_data(item) + b"\n"
if stream_item_field:
value, errors = stream_item_field.validate(
item, {}, loc=("response",)
)
if errors:
ctx = endpoint_ctx or EndpointContext()
raise ResponseValidationError(
errors=errors,
body=item,
endpoint_ctx=ctx,
)
line = stream_item_field.serialize_json(
value,
include=response_model_include,
exclude=response_model_exclude,
by_alias=response_model_by_alias,
exclude_unset=response_model_exclude_unset,
exclude_defaults=response_model_exclude_defaults,
exclude_none=response_model_exclude_none,
)
return line + b"\n"
else:
data = jsonable_encoder(item)
return json.dumps(data).encode("utf-8") + b"\n"
if dependant.is_async_gen_callable:
@@ -589,7 +491,7 @@ def get_request_handler(
# Ref: https://github.com/fastapi/fastapi/issues/14680
await anyio.sleep(0)
jsonl_stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
stream_content: AsyncIterator[bytes] | Iterator[bytes] = (
_async_stream_jsonl()
)
else:
@@ -598,10 +500,10 @@ def get_request_handler(
for item in gen:
yield _serialize_item(item)
jsonl_stream_content = _sync_stream_jsonl()
stream_content = _sync_stream_jsonl()
response = StreamingResponse(
jsonl_stream_content,
stream_content,
media_type="application/jsonl",
background=solved_result.background_tasks,
)
@@ -807,16 +709,9 @@ class APIRoute(routing.Route):
else:
stream_item = get_stream_item_type(return_annotation)
if stream_item is not None:
# Extract item type for JSONL or SSE streaming when
# response_class is DefaultPlaceholder (JSONL) or
# EventSourceResponse (SSE).
# ServerSentEvent is excluded: it's a transport
# wrapper, not a data model, so it shouldn't feed
# into validation or OpenAPI schema generation.
if (
isinstance(response_class, DefaultPlaceholder)
or lenient_issubclass(response_class, EventSourceResponse)
) and not lenient_issubclass(stream_item, ServerSentEvent):
# Only extract item type for JSONL streaming when no
# explicit response_class (e.g. StreamingResponse) was set
if isinstance(response_class, DefaultPlaceholder):
self.stream_item_type = stream_item
response_model = None
else:
@@ -919,16 +814,11 @@ class APIRoute(routing.Route):
name=self.unique_id,
embed_body_fields=self._embed_body_fields,
)
# Detect generator endpoints that should stream as JSONL or SSE
is_generator = (
# Detect generator endpoints that should stream as JSONL
# (only when no explicit response_class like StreamingResponse is set)
self.is_json_stream = isinstance(response_class, DefaultPlaceholder) and (
self.dependant.is_async_gen_callable or self.dependant.is_gen_callable
)
self.is_sse_stream = is_generator and lenient_issubclass(
response_class, EventSourceResponse
)
self.is_json_stream = is_generator and isinstance(
response_class, DefaultPlaceholder
)
self.app = request_response(self.get_route_handler())
def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:

View File

@@ -1,222 +0,0 @@
from typing import Annotated, Any
from annotated_doc import Doc
from pydantic import AfterValidator, BaseModel, Field, model_validator
from starlette.responses import StreamingResponse
# Canonical SSE event schema matching the OpenAPI 3.2 spec
# (Section 4.14.4 "Special Considerations for Server-Sent Events")
_SSE_EVENT_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {"type": "integer", "minimum": 0},
},
}
class EventSourceResponse(StreamingResponse):
"""Streaming response with `text/event-stream` media type.
Use as `response_class=EventSourceResponse` on a *path operation* that uses `yield`
to enable Server Sent Events (SSE) responses.
Works with **any HTTP method** (`GET`, `POST`, etc.), which makes it compatible
with protocols like MCP that stream SSE over `POST`.
The actual encoding logic lives in the FastAPI routing layer. This class
serves mainly as a marker and sets the correct `Content-Type`.
"""
media_type = "text/event-stream"
def _check_id_no_null(v: str | None) -> str | None:
if v is not None and "\0" in v:
raise ValueError("SSE 'id' must not contain null characters")
return v
class ServerSentEvent(BaseModel):
"""Represents a single Server-Sent Event.
When `yield`ed from a *path operation function* that uses
`response_class=EventSourceResponse`, each `ServerSentEvent` is encoded
into the [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
(`text/event-stream`).
If you yield a plain object (dict, Pydantic model, etc.) instead, it is
automatically JSON-encoded and sent as the `data:` field.
All `data` values **including plain strings** are JSON-serialized.
For example, `data="hello"` produces `data: "hello"` on the wire (with
quotes).
"""
data: Annotated[
Any,
Doc(
"""
The event payload.
Can be any JSON-serializable value: a Pydantic model, dict, list,
string, number, etc. It is **always** serialized to JSON: strings
are quoted (`"hello"` becomes `data: "hello"` on the wire).
Mutually exclusive with `raw_data`.
"""
),
] = None
raw_data: Annotated[
str | None,
Doc(
"""
Raw string to send as the `data:` field **without** JSON encoding.
Use this when you need to send pre-formatted text, HTML fragments,
CSV lines, or any non-JSON payload. The string is placed directly
into the `data:` field as-is.
Mutually exclusive with `data`.
"""
),
] = None
event: Annotated[
str | None,
Doc(
"""
Optional event type name.
Maps to `addEventListener(event, ...)` on the browser. When omitted,
the browser dispatches on the generic `message` event.
"""
),
] = None
id: Annotated[
str | None,
AfterValidator(_check_id_no_null),
Doc(
"""
Optional event ID.
The browser sends this value back as the `Last-Event-ID` header on
automatic reconnection. **Must not contain null (`\\0`) characters.**
"""
),
] = None
retry: Annotated[
int | None,
Field(ge=0),
Doc(
"""
Optional reconnection time in **milliseconds**.
Tells the browser how long to wait before reconnecting after the
connection is lost. Must be a non-negative integer.
"""
),
] = None
comment: Annotated[
str | None,
Doc(
"""
Optional comment line(s).
Comment lines start with `:` in the SSE wire format and are ignored by
`EventSource` clients. Useful for keep-alive pings to prevent
proxy/load-balancer timeouts.
"""
),
] = None
@model_validator(mode="after")
def _check_data_exclusive(self) -> "ServerSentEvent":
if self.data is not None and self.raw_data is not None:
raise ValueError(
"Cannot set both 'data' and 'raw_data' on the same "
"ServerSentEvent. Use 'data' for JSON-serialized payloads "
"or 'raw_data' for pre-formatted strings."
)
return self
def format_sse_event(
*,
data_str: Annotated[
str | None,
Doc(
"""
Pre-serialized data string to use as the `data:` field.
"""
),
] = None,
event: Annotated[
str | None,
Doc(
"""
Optional event type name (`event:` field).
"""
),
] = None,
id: Annotated[
str | None,
Doc(
"""
Optional event ID (`id:` field).
"""
),
] = None,
retry: Annotated[
int | None,
Doc(
"""
Optional reconnection time in milliseconds (`retry:` field).
"""
),
] = None,
comment: Annotated[
str | None,
Doc(
"""
Optional comment line(s) (`:` prefix).
"""
),
] = None,
) -> bytes:
"""Build SSE wire-format bytes from **pre-serialized** data.
The result always ends with `\n\n` (the event terminator).
"""
lines: list[str] = []
if comment is not None:
for line in comment.splitlines():
lines.append(f": {line}")
if event is not None:
lines.append(f"event: {event}")
if data_str is not None:
for line in data_str.splitlines():
lines.append(f"data: {line}")
if id is not None:
lines.append(f"id: {id}")
if retry is not None:
lines.append(f"retry: {retry}")
lines.append("")
lines.append("")
return "\n".join(lines).encode("utf-8")
# Keep-alive comment, per the SSE spec recommendation
KEEPALIVE_COMMENT = b": ping\n\n"
# Seconds between keep-alive pings when a generator is idle.
# Private but importable so tests can monkeypatch it.
_PING_INTERVAL: float = 15.0

View File

@@ -324,7 +324,6 @@ ignore = [
"docs_src/stream_json_lines/tutorial001_py310.py" = ["UP028"]
"docs_src/stream_data/tutorial001_py310.py" = ["UP028"]
"docs_src/stream_data/tutorial002_py310.py" = ["UP028"]
"docs_src/server_sent_events/tutorial001_py310.py" = ["UP028"]
[tool.ruff.lint.isort]
known-third-party = ["fastapi", "pydantic", "starlette"]

View File

@@ -1,318 +0,0 @@
import asyncio
import time
from collections.abc import AsyncIterable, Iterable
import fastapi.routing
import pytest
from fastapi import APIRouter, FastAPI
from fastapi.responses import EventSourceResponse
from fastapi.sse import ServerSentEvent
from fastapi.testclient import TestClient
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str | None = None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
app = FastAPI()
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-sync", response_class=EventSourceResponse)
def sse_items_sync() -> Iterable[Item]:
yield from items
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-sync-no-annotation", response_class=EventSourceResponse)
def sse_items_sync_no_annotation():
yield from items
@app.get("/items/stream-dict", response_class=EventSourceResponse)
async def sse_items_dict():
for item in items:
yield {"name": item.name, "description": item.description}
@app.get("/items/stream-sse-event", response_class=EventSourceResponse)
async def sse_items_event():
yield ServerSentEvent(data="hello", event="greeting", id="1")
yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2")
yield ServerSentEvent(comment="just a comment")
yield ServerSentEvent(data="retry-test", retry=5000)
@app.get("/items/stream-mixed", response_class=EventSourceResponse)
async def sse_items_mixed() -> AsyncIterable[Item]:
yield items[0]
yield ServerSentEvent(data="custom-event", event="special")
yield items[1]
@app.get("/items/stream-string", response_class=EventSourceResponse)
async def sse_items_string():
yield ServerSentEvent(data="plain text data")
@app.post("/items/stream-post", response_class=EventSourceResponse)
async def sse_items_post() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-raw", response_class=EventSourceResponse)
async def sse_items_raw():
yield ServerSentEvent(raw_data="plain text without quotes")
yield ServerSentEvent(raw_data="<div>html fragment</div>", event="html")
yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv")
router = APIRouter()
@router.get("/events", response_class=EventSourceResponse)
async def stream_events():
yield {"msg": "hello"}
yield {"msg": "world"}
app.include_router(router, prefix="/api")
@pytest.fixture(name="client")
def client_fixture():
with TestClient(app) as c:
yield c
def test_async_generator_with_model(client: TestClient):
response = client.get("/items/stream")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
assert response.headers["cache-control"] == "no-cache"
assert response.headers["x-accel-buffering"] == "no"
lines = response.text.strip().split("\n")
data_lines = [line for line in lines if line.startswith("data: ")]
assert len(data_lines) == 3
assert '"name":"Plumbus"' in data_lines[0] or '"name": "Plumbus"' in data_lines[0]
assert (
'"name":"Portal Gun"' in data_lines[1]
or '"name": "Portal Gun"' in data_lines[1]
)
assert (
'"name":"Meeseeks Box"' in data_lines[2]
or '"name": "Meeseeks Box"' in data_lines[2]
)
def test_sync_generator_with_model(client: TestClient):
response = client.get("/items/stream-sync")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_async_generator_no_annotation(client: TestClient):
response = client.get("/items/stream-no-annotation")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_sync_generator_no_annotation(client: TestClient):
response = client.get("/items/stream-sync-no-annotation")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_dict_items(client: TestClient):
response = client.get("/items/stream-dict")
assert response.status_code == 200
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
assert '"name"' in data_lines[0]
def test_post_method_sse(client: TestClient):
"""SSE should work with POST (needed for MCP compatibility)."""
response = client.post("/items/stream-post")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_sse_events_with_fields(client: TestClient):
response = client.get("/items/stream-sse-event")
assert response.status_code == 200
text = response.text
assert "event: greeting\n" in text
assert 'data: "hello"\n' in text
assert "id: 1\n" in text
assert "event: json-data\n" in text
assert "id: 2\n" in text
assert 'data: {"key": "value"}\n' in text
assert ": just a comment\n" in text
assert "retry: 5000\n" in text
assert 'data: "retry-test"\n' in text
def test_mixed_plain_and_sse_events(client: TestClient):
response = client.get("/items/stream-mixed")
assert response.status_code == 200
text = response.text
assert "event: special\n" in text
assert 'data: "custom-event"\n' in text
assert '"name"' in text
def test_string_data_json_encoded(client: TestClient):
"""Strings are always JSON-encoded (quoted)."""
response = client.get("/items/stream-string")
assert response.status_code == 200
assert 'data: "plain text data"\n' in response.text
def test_server_sent_event_null_id_rejected():
with pytest.raises(ValueError, match="null"):
ServerSentEvent(data="test", id="has\0null")
def test_server_sent_event_negative_retry_rejected():
with pytest.raises(ValueError):
ServerSentEvent(data="test", retry=-1)
def test_server_sent_event_float_retry_rejected():
with pytest.raises(ValueError):
ServerSentEvent(data="test", retry=1.5) # type: ignore[arg-type]
def test_raw_data_sent_without_json_encoding(client: TestClient):
"""raw_data is sent as-is, not JSON-encoded."""
response = client.get("/items/stream-raw")
assert response.status_code == 200
text = response.text
# raw_data should appear without JSON quotes
assert "data: plain text without quotes\n" in text
# Not JSON-quoted
assert 'data: "plain text without quotes"' not in text
assert "event: html\n" in text
assert "data: <div>html fragment</div>\n" in text
assert "event: csv\n" in text
assert "data: cpu,87.3,1709145600\n" in text
def test_data_and_raw_data_mutually_exclusive():
"""Cannot set both data and raw_data."""
with pytest.raises(ValueError, match="Cannot set both"):
ServerSentEvent(data="json", raw_data="raw")
def test_sse_on_router_included_in_app(client: TestClient):
response = client.get("/api/events")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 2
# Keepalive ping tests
keepalive_app = FastAPI()
@keepalive_app.get("/slow-async", response_class=EventSourceResponse)
async def slow_async_stream():
yield {"n": 1}
# Sleep longer than the (monkeypatched) ping interval so a keepalive
# comment is emitted before the next item.
await asyncio.sleep(0.3)
yield {"n": 2}
@keepalive_app.get("/slow-sync", response_class=EventSourceResponse)
def slow_sync_stream():
yield {"n": 1}
time.sleep(0.3)
yield {"n": 2}
def test_keepalive_ping_async(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05)
with TestClient(keepalive_app) as c:
response = c.get("/slow-async")
assert response.status_code == 200
text = response.text
# The keepalive comment ": ping" should appear between the two data events
assert ": ping\n" in text
data_lines = [line for line in text.split("\n") if line.startswith("data: ")]
assert len(data_lines) == 2
def test_keepalive_ping_sync(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05)
with TestClient(keepalive_app) as c:
response = c.get("/slow-sync")
assert response.status_code == 200
text = response.text
assert ": ping\n" in text
data_lines = [line for line in text.split("\n") if line.startswith("data: ")]
assert len(data_lines) == 2
def test_no_keepalive_when_fast(client: TestClient):
"""No keepalive comment when items arrive quickly."""
response = client.get("/items/stream")
assert response.status_code == 200
# KEEPALIVE_COMMENT is ": ping\n\n".
assert ": ping\n" not in response.text

View File

@@ -1,191 +0,0 @@
import importlib
import pytest
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
@pytest.fixture(
name="client",
params=[
pytest.param("tutorial001_py310"),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
client = TestClient(mod.app)
return client
@pytest.mark.parametrize(
"path",
[
"/items/stream",
"/items/stream-no-async",
"/items/stream-no-annotation",
"/items/stream-no-async-no-annotation",
],
)
def test_stream_items(client: TestClient, path: str):
response = client.get(path)
assert response.status_code == 200, response.text
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/items/stream": {
"get": {
"summary": "Sse Items",
"operationId": "sse_items_items_stream_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": {
"$ref": "#/components/schemas/Item"
},
},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
"required": ["data"],
}
}
},
}
},
}
},
"/items/stream-no-async": {
"get": {
"summary": "Sse Items No Async",
"operationId": "sse_items_no_async_items_stream_no_async_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": {
"$ref": "#/components/schemas/Item"
},
},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
"required": ["data"],
}
}
},
}
},
}
},
"/items/stream-no-annotation": {
"get": {
"summary": "Sse Items No Annotation",
"operationId": "sse_items_no_annotation_items_stream_no_annotation_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
}
},
}
},
"/items/stream-no-async-no-annotation": {
"get": {
"summary": "Sse Items No Async No Annotation",
"operationId": "sse_items_no_async_no_annotation_items_stream_no_async_no_annotation_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
}
},
}
},
},
"components": {
"schemas": {
"Item": {
"properties": {
"name": {"type": "string", "title": "Name"},
"description": {
"anyOf": [
{"type": "string"},
{"type": "null"},
],
"title": "Description",
},
},
"type": "object",
"required": ["name", "description"],
"title": "Item",
}
}
},
}
)

View File

@@ -1,83 +0,0 @@
import importlib
import pytest
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
@pytest.fixture(
name="client",
params=[
pytest.param("tutorial002_py310"),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
client = TestClient(mod.app)
return client
def test_stream_items(client: TestClient):
response = client.get("/items/stream")
assert response.status_code == 200, response.text
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
lines = response.text.strip().split("\n")
# First event is a comment-only event
assert lines[0] == ": stream of item updates"
# Remaining lines contain event:, data:, id:, retry: fields
event_lines = [line for line in lines if line.startswith("event: ")]
assert len(event_lines) == 3
assert all(line == "event: item_update" for line in event_lines)
data_lines = [line for line in lines if line.startswith("data: ")]
assert len(data_lines) == 3
id_lines = [line for line in lines if line.startswith("id: ")]
assert id_lines == ["id: 1", "id: 2", "id: 3"]
retry_lines = [line for line in lines if line.startswith("retry: ")]
assert len(retry_lines) == 3
assert all(line == "retry: 5000" for line in retry_lines)
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/items/stream": {
"get": {
"summary": "Stream Items",
"operationId": "stream_items_items_stream_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
}
},
}
}
},
}
)

View File

@@ -1,73 +0,0 @@
import importlib
import pytest
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
@pytest.fixture(
name="client",
params=[
pytest.param("tutorial003_py310"),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
client = TestClient(mod.app)
return client
def test_stream_logs(client: TestClient):
response = client.get("/logs/stream")
assert response.status_code == 200, response.text
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
# raw_data is sent without JSON encoding (no quotes around the string)
assert data_lines[0] == "data: 2025-01-01 INFO Application started"
assert data_lines[1] == "data: 2025-01-01 DEBUG Connected to database"
assert data_lines[2] == "data: 2025-01-01 WARN High memory usage detected"
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/logs/stream": {
"get": {
"summary": "Stream Logs",
"operationId": "stream_logs_logs_stream_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
}
},
}
}
},
}
)

View File

@@ -1,164 +0,0 @@
import importlib
import pytest
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
@pytest.fixture(
name="client",
params=[
pytest.param("tutorial004_py310"),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
client = TestClient(mod.app)
return client
def test_stream_all_items(client: TestClient):
response = client.get("/items/stream")
assert response.status_code == 200, response.text
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
id_lines = [
line for line in response.text.strip().split("\n") if line.startswith("id: ")
]
assert id_lines == ["id: 0", "id: 1", "id: 2"]
def test_resume_from_last_event_id(client: TestClient):
response = client.get(
"/items/stream",
headers={"last-event-id": "0"},
)
assert response.status_code == 200, response.text
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 2
id_lines = [
line for line in response.text.strip().split("\n") if line.startswith("id: ")
]
assert id_lines == ["id: 1", "id: 2"]
def test_resume_from_last_item(client: TestClient):
response = client.get(
"/items/stream",
headers={"last-event-id": "1"},
)
assert response.status_code == 200, response.text
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 1
id_lines = [
line for line in response.text.strip().split("\n") if line.startswith("id: ")
]
assert id_lines == ["id: 2"]
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/items/stream": {
"get": {
"summary": "Stream Items",
"operationId": "stream_items_items_stream_get",
"parameters": [
{
"name": "last-event-id",
"in": "header",
"required": False,
"schema": {
"anyOf": [{"type": "integer"}, {"type": "null"}],
"title": "Last-Event-Id",
},
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
}
}
},
"components": {
"schemas": {
"HTTPValidationError": {
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ValidationError"
},
"type": "array",
"title": "Detail",
}
},
"type": "object",
"title": "HTTPValidationError",
},
"ValidationError": {
"properties": {
"loc": {
"items": {
"anyOf": [{"type": "string"}, {"type": "integer"}]
},
"type": "array",
"title": "Location",
},
"msg": {"type": "string", "title": "Message"},
"type": {"type": "string", "title": "Error Type"},
"input": {"title": "Input"},
"ctx": {"type": "object", "title": "Context"},
},
"type": "object",
"required": ["loc", "msg", "type"],
"title": "ValidationError",
},
}
},
}
)

View File

@@ -1,141 +0,0 @@
import importlib
import pytest
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
@pytest.fixture(
name="client",
params=[
pytest.param("tutorial005_py310"),
],
)
def get_client(request: pytest.FixtureRequest):
mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}")
client = TestClient(mod.app)
return client
def test_stream_chat(client: TestClient):
response = client.post(
"/chat/stream",
json={"text": "hello world"},
)
assert response.status_code == 200, response.text
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
lines = response.text.strip().split("\n")
event_lines = [line for line in lines if line.startswith("event: ")]
assert event_lines == [
"event: token",
"event: token",
"event: done",
]
data_lines = [line for line in lines if line.startswith("data: ")]
assert data_lines == [
'data: "hello"',
'data: "world"',
"data: [DONE]",
]
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/chat/stream": {
"post": {
"summary": "Stream Chat",
"operationId": "stream_chat_chat_stream_post",
"requestBody": {
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Prompt"}
}
},
"required": True,
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {"type": "string"},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {
"type": "integer",
"minimum": 0,
},
},
}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
}
}
},
"components": {
"schemas": {
"HTTPValidationError": {
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ValidationError"
},
"type": "array",
"title": "Detail",
}
},
"type": "object",
"title": "HTTPValidationError",
},
"Prompt": {
"properties": {"text": {"type": "string", "title": "Text"}},
"type": "object",
"required": ["text"],
"title": "Prompt",
},
"ValidationError": {
"properties": {
"loc": {
"items": {
"anyOf": [{"type": "string"}, {"type": "integer"}]
},
"type": "array",
"title": "Location",
},
"msg": {"type": "string", "title": "Message"},
"type": {"type": "string", "title": "Error Type"},
"input": {"title": "Input"},
"ctx": {"type": "object", "title": "Context"},
},
"type": "object",
"required": ["loc", "msg", "type"],
"title": "ValidationError",
},
}
},
}
)