Files
fastapi/docs_src/stream_data/tutorial001_py310.py
2026-02-26 23:07:35 +01:00

66 lines
2.2 KiB
Python

from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
message = """
Rick: (stumbles in drunkenly, and turns on the lights) Morty! You gotta come on. You got--... you gotta come with me.
Morty: (rubs his eyes) What, Rick? What's going on?
Rick: I got a surprise for you, Morty.
Morty: It's the middle of the night. What are you talking about?
Rick: (spills alcohol on Morty's bed) Come on, I got a surprise for you. (drags Morty by the ankle) Come on, hurry up. (pulls Morty out of his bed and into the hall)
Morty: Ow! Ow! You're tugging me too hard!
Rick: We gotta go, gotta get outta here, come on. Got a surprise for you Morty.
"""
@app.get("/story/stream", response_class=StreamingResponse)
async def stream_story() -> AsyncIterable[str]:
for line in message.splitlines():
yield line
@app.get("/story/stream-no-async", response_class=StreamingResponse)
def stream_story_no_async() -> Iterable[str]:
for line in message.splitlines():
yield line
@app.get("/story/stream-no-annotation", response_class=StreamingResponse)
async def stream_story_no_annotation():
for line in message.splitlines():
yield line
@app.get("/story/stream-no-async-no-annotation", response_class=StreamingResponse)
def stream_story_no_async_no_annotation():
for line in message.splitlines():
yield line
@app.get("/story/stream-bytes", response_class=StreamingResponse)
async def stream_story_bytes() -> AsyncIterable[bytes]:
for line in message.splitlines():
yield line.encode("utf-8")
@app.get("/story/stream-no-async-bytes", response_class=StreamingResponse)
def stream_story_no_async_bytes() -> Iterable[bytes]:
for line in message.splitlines():
yield line.encode("utf-8")
@app.get("/story/stream-no-annotation-bytes", response_class=StreamingResponse)
async def stream_story_no_annotation_bytes():
for line in message.splitlines():
yield line.encode("utf-8")
@app.get("/story/stream-no-async-no-annotation-bytes", response_class=StreamingResponse)
def stream_story_no_async_no_annotation_bytes():
for line in message.splitlines():
yield line.encode("utf-8")