"""Stream a fixed in-memory list of items as Server-Sent Events, resumable by id."""

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


# NOTE: the model and the endpoint below are documented with comments rather
# than docstrings on purpose: docstrings there are published as descriptions
# in the generated schema, which would change the app's output.

# One entry of the item list: a name and a price.
class Item(BaseModel):
    name: str
    price: float


# The data being streamed. An item's position in this list is used as the
# id of the event that carries it.
items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


# Yields one ServerSentEvent per entry of `items`, with the entry as the event
# data and its list index (as a string) as the event id.
#
# `last_event_id` is read from a request header (presumably `Last-Event-ID`,
# the header SSE clients send when reconnecting -- confirm against the
# FastAPI `Header` name conversion). When it is present, only entries whose
# index is greater than it are emitted; when it is absent, every entry is.
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    # Resume just past the last event the client reports having seen.
    if last_event_id is None:
        first_index = 0
    else:
        first_index = last_event_id + 1

    for position, entry in enumerate(items):
        if position >= first_index:
            yield ServerSentEvent(data=entry, id=str(position))