mirror of
https://github.com/fastapi/fastapi.git
synced 2026-03-02 05:37:08 -05:00
32 lines
795 B
Python
32 lines
795 B
Python
from collections.abc import AsyncIterable
|
|
from typing import Annotated
|
|
|
|
from fastapi import FastAPI, Header
|
|
from fastapi.sse import EventSourceResponse, ServerSentEvent
|
|
from pydantic import BaseModel
|
|
|
|
# Application instance; the streaming route in this file is registered on it.
app = FastAPI()
|
|
|
|
|
|
class Item(BaseModel):
    # Data model for one entry that the streaming endpoint sends as the
    # payload of an event.

    # Display name of the item.
    name: str
    # Price of the item (currency is not specified in this file).
    price: float
|
|
|
|
|
|
# Fixed in-memory data set served by `stream_items`. The position of an
# entry in this list is what the endpoint uses as that entry's event id,
# so reordering the list changes the ids clients see.
items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]
|
|
|
|
|
|
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    # Yields one ServerSentEvent per entry of `items`, with the entry as the
    # event data and the entry's list position (as a string) as the event id.
    #
    # `last_event_id` is an optional integer header parameter. When it is
    # given, only entries whose position is greater than it are yielded,
    # so a client can resume after the last event it received. When it is
    # absent, every entry is yielded.
    if last_event_id is None:
        first_position = 0
    else:
        first_position = last_event_id + 1

    for position, entry in enumerate(items):
        # Entries before the resume point were already delivered; skip them.
        if position >= first_position:
            yield ServerSentEvent(data=entry, id=str(position))
|