Files
fastapi/docs_src/security/tutorial004.py
Sebastián Ramírez 1f01ce9615 📝 Use Optional in docs (#1644)
* Updated .py files with Optional tag (up to body_nested_models)

* Update optionals

* docs_src/ all updates, few I was unsure of

* Updated markdown files with Optional param

* es: Add Optional typing to index.md

* Last of markdown files updated with Optional param

* Update highlight lines

* it: Add Optional typings

* README.md: Update with Optional typings

* Update more highlight increments

* Update highlights

* schema-extra-example.md: Update highlights

* updating highlighting on website to reflect .py changes

* Update highlighting for query-params & response-directly

* Address PR comments

* Get rid of unnecessary comment

*  Revert Optional in Chinese docs as it probably also requires changes in text

* 🎨 Apply format

*  Revert modified example

* ♻️ Simplify example in docs

* 📝 Update OpenAPI callback example to use Optional

*  Add Optional types to tests

* 📝 Update docs about query params, default to using Optional

* 🎨 Update code examples line highlighting

* 📝 Update nested models docs to use "type parameters" instead of "subtypes"

* 📝 Add notes about FastAPI usage of None

including:

= None

and

= Query(None)

and clarify relationship with Optional[str]

* 📝 Add note about response_model_by_alias

* ♻️ Simplify query param list example

* 🔥 Remove test for removed example

*  Update test for updated example

Co-authored-by: Christopher Nguyen <chrisngyn99@gmail.com>
Co-authored-by: yk396 <yk396@cornell.edu>
Co-authored-by: Kai Chen <kaichen120@gmail.com>
2020-06-28 20:13:30 +02:00

140 lines
4.0 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt import PyJWTError
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]