Compare commits

...

8 Commits

Author SHA1 Message Date
Sebastián Ramírez
c59ddc8a24 🔖 Release 0.11.0 2019-04-03 15:51:44 +04:00
Sebastián Ramírez
378b39bbbc 📝 Update release notes 2019-04-03 15:49:58 +04:00
Sebastián Ramírez
37e0306517 📝 Update default error response in SQL tutorial 2019-04-03 15:49:40 +04:00
Sebastián Ramírez
fad3a9e1dc Add auto_error to security utils (#134)
to allow them to be optional, also allowing the declaration of multiple security schemes
2019-04-03 15:44:52 +04:00
Sebastián Ramírez
b35b0a9a90 📝 Update release notes 2019-03-31 22:05:03 +04:00
Alex Iribarren
1426b6200a 🗃️ Close the DB even if exceptions are raised (#89)
* Close the DB even if exceptions are raised

* 📝 Add note about closing DB in finally
2019-03-31 22:01:32 +04:00
Sebastián Ramírez
40e5f3764e 📝 Update release notes 2019-03-31 20:56:52 +04:00
Alif Jahan
e5c75807ce 🔥 removed duplicate dependency in pyproject.toml (#128) 2019-03-31 20:55:12 +04:00
19 changed files with 1068 additions and 71 deletions

View File

@@ -1,5 +1,13 @@
## Next release
## 0.11.0
* Add `auto_error` parameter to security utility functions. Allowing them to be optional. Also allowing to have multiple alternative security schemes that are then checked in a single dependency instead of each one verifying and returning the error to the client automatically when not satisfied. PR <a href="https://github.com/tiangolo/fastapi/pull/134" target="_blank">#134</a>.
* Update <a href="https://fastapi.tiangolo.com/tutorial/sql-databases/#create-a-middleware-to-handle-sessions" target="_blank">SQL Tutorial</a> to close database sessions even when there are exceptions. PR <a href="https://github.com/tiangolo/fastapi/pull/89" target="_blank">#89</a> by <a href="https://github.com/alexiri" target="_blank">@alexiri</a>.
* Fix duplicate dependency in `pyproject.toml`. PR <a href="https://github.com/tiangolo/fastapi/pull/128" target="_blank">#128</a> by <a href="https://github.com/zxalif" target="_blank">@zxalif</a>.
## 0.10.3
* Add Gitter chat, badge, links, etc. <a href="https://gitter.im/tiangolo/fastapi" target="_blank">https://gitter.im/tiangolo/fastapi

View File

@@ -3,6 +3,7 @@ from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import Session, sessionmaker
from starlette.requests import Request
from starlette.responses import Response
# SQLAlchemy specific code, as with any other app
SQLALCHEMY_DATABASE_URI = "sqlite:///./test.db"
@@ -66,7 +67,10 @@ def read_user(user_id: int, db: Session = Depends(get_db)):
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request.state.db = SessionLocal()
response = await call_next(request)
request.state.db.close()
response = Response("Internal server error", status_code=500)
try:
request.state.db = SessionLocal()
response = await call_next(request)
finally:
request.state.db.close()
return response

View File

@@ -37,7 +37,7 @@ For now, don't pay attention to the rest, only the imports:
Define the database that SQLAlchemy should "connect" to:
```Python hl_lines="8"
```Python hl_lines="9"
{!./src/sql_databases/tutorial001.py!}
```
@@ -59,7 +59,7 @@ SQLALCHEMY_DATABASE_URI = "postgresql://user:password@postgresserver/db"
## Create the SQLAlchemy `engine`
```Python hl_lines="11 12 13"
```Python hl_lines="12 13 14"
{!./src/sql_databases/tutorial001.py!}
```
@@ -90,7 +90,7 @@ We will use `Session` to declare types later and getter better editor support an
For now, create the `SessionLocal`:
```Python hl_lines="14"
```Python hl_lines="15"
{!./src/sql_databases/tutorial001.py!}
```
@@ -108,10 +108,17 @@ A "middleware" is a function that is always executed for each request, and have
This middleware (just a function) will create a new SQLAlchemy `SessionLocal` for each request, add it to the request and then close it once the request is finished.
```Python hl_lines="67 68 69 70 71 72"
```Python hl_lines="68 69 70 71 72 73 74 75 76"
{!./src/sql_databases/tutorial001.py!}
```
!!! info
We put the creation of the `SessionLocal()` and handling of the requests in a `try` block.
And then we close it in the `finally` block.
This way we make sure the database session is always closed after the request. Even if there was an exception in the middle.
### About `request.state`
<a href="https://www.starlette.io/requests/#other-state" target="_blank">`request.state` is a property of each Starlette `Request` object</a>, it is there to store arbitrary objects attached to the request itself, like the database session in this case.
@@ -126,7 +133,7 @@ And when using the dependency in a path operation function, we declare it with t
This will then give us better editor support inside the path operation function, because the editor will know that the `db` parameter is of type `Session`.
```Python hl_lines="53 54 68"
```Python hl_lines="54 55 69"
{!./src/sql_databases/tutorial001.py!}
```
@@ -145,13 +152,13 @@ That way you don't have to declare them explicitly in every model.
So, your models will behave very similarly to, for example, Flask-SQLAlchemy.
```Python hl_lines="17 18 19 20 21"
```Python hl_lines="18 19 20 21 22"
{!./src/sql_databases/tutorial001.py!}
```
## Create the SQLAlchemy `Base` model
```Python hl_lines="24"
```Python hl_lines="25"
{!./src/sql_databases/tutorial001.py!}
```
@@ -161,7 +168,7 @@ Now this is finally code specific to your app.
Here's a user model that will be a table in the database:
```Python hl_lines="27 28 29 30 31"
```Python hl_lines="28 29 30 31 32"
{!./src/sql_databases/tutorial001.py!}
```
@@ -169,7 +176,7 @@ Here's a user model that will be a table in the database:
In a very simplistic way, initialize your database (create the tables, etc) and make sure you have a first user:
```Python hl_lines="34 36 38 39 40 41 42 44"
```Python hl_lines="35 37 39 40 41 42 43 45"
{!./src/sql_databases/tutorial001.py!}
```
@@ -197,7 +204,7 @@ Also, as all the functionality is self-contained in the same code, you can copy
By creating a function that is only dedicated to getting your user from a `user_id` (or any other parameter) independent of your path operation function, you can more easily re-use it in multiple parts and also add <abbr title="Automated tests, written in code, that check if another piece of code is working correctly.">unit tests</abbr> for it:
```Python hl_lines="48 49"
```Python hl_lines="49 50"
{!./src/sql_databases/tutorial001.py!}
```
@@ -207,7 +214,7 @@ Now, finally, here's the standard **FastAPI** code.
Create your app and path operation function:
```Python hl_lines="58 61 62 63 64"
```Python hl_lines="59 62 63 64 65"
{!./src/sql_databases/tutorial001.py!}
```
@@ -243,7 +250,7 @@ user = get_user(db_session, user_id=user_id)
Then we should declare the path operation without `async def`, just with a normal `def`:
```Python hl_lines="62"
```Python hl_lines="63"
{!./src/sql_databases/tutorial001.py!}
```

View File

@@ -1,6 +1,6 @@
"""FastAPI framework, high performance, easy to learn, fast to code, ready for production"""
__version__ = "0.10.3"
__version__ = "0.11.0"
from starlette.background import BackgroundTasks

View File

@@ -1,3 +1,5 @@
from typing import Optional
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.security.base import SecurityBase
from starlette.exceptions import HTTPException
@@ -10,42 +12,54 @@ class APIKeyBase(SecurityBase):
class APIKeyQuery(APIKeyBase):
def __init__(self, *, name: str, scheme_name: str = None):
self.model = APIKey(**{"in": APIKeyIn.query}, name=name)
def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True):
self.model: APIKey = APIKey(**{"in": APIKeyIn.query}, name=name)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
api_key: str = request.query_params.get(self.model.name)
if not api_key:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return api_key
class APIKeyHeader(APIKeyBase):
def __init__(self, *, name: str, scheme_name: str = None):
self.model = APIKey(**{"in": APIKeyIn.header}, name=name)
def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True):
self.model: APIKey = APIKey(**{"in": APIKeyIn.header}, name=name)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
api_key: str = request.headers.get(self.model.name)
if not api_key:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return api_key
class APIKeyCookie(APIKeyBase):
def __init__(self, *, name: str, scheme_name: str = None):
self.model = APIKey(**{"in": APIKeyIn.cookie}, name=name)
def __init__(self, *, name: str, scheme_name: str = None, auto_error: bool = True):
self.model: APIKey = APIKey(**{"in": APIKeyIn.cookie}, name=name)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
api_key: str = request.cookies.get(self.model.name)
if not api_key:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return api_key

View File

@@ -1,5 +1,6 @@
import binascii
from base64 import b64decode
from typing import Optional
from fastapi.openapi.models import (
HTTPBase as HTTPBaseModel,
@@ -24,27 +25,38 @@ class HTTPAuthorizationCredentials(BaseModel):
class HTTPBase(SecurityBase):
def __init__(self, *, scheme: str, scheme_name: str = None):
def __init__(
self, *, scheme: str, scheme_name: str = None, auto_error: bool = True
):
self.model = HTTPBaseModel(scheme=scheme)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(
self, request: Request
) -> Optional[HTTPAuthorizationCredentials]:
authorization: str = request.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
class HTTPBasic(HTTPBase):
def __init__(self, *, scheme_name: str = None, realm: str = None):
def __init__(
self, *, scheme_name: str = None, realm: str = None, auto_error: bool = True
):
self.model = HTTPBaseModel(scheme="basic")
self.scheme_name = scheme_name or self.__class__.__name__
self.realm = realm
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
# before implementing headers with 401 errors, wait for: https://github.com/encode/starlette/issues/295
@@ -53,9 +65,12 @@ class HTTPBasic(HTTPBase):
status_code=HTTP_403_FORBIDDEN, detail="Invalid authentication credentials"
)
if not authorization or scheme.lower() != "basic":
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
try:
data = b64decode(param).decode("ascii")
except (ValueError, UnicodeDecodeError, binascii.Error):
@@ -67,17 +82,29 @@ class HTTPBasic(HTTPBase):
class HTTPBearer(HTTPBase):
def __init__(self, *, bearerFormat: str = None, scheme_name: str = None):
def __init__(
self,
*,
bearerFormat: str = None,
scheme_name: str = None,
auto_error: bool = True
):
self.model = HTTPBearerModel(bearerFormat=bearerFormat)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(
self, request: Request
) -> Optional[HTTPAuthorizationCredentials]:
authorization: str = request.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
if scheme.lower() != "bearer":
raise HTTPException(
status_code=HTTP_403_FORBIDDEN,
@@ -87,17 +114,23 @@ class HTTPBearer(HTTPBase):
class HTTPDigest(HTTPBase):
def __init__(self, *, scheme_name: str = None):
def __init__(self, *, scheme_name: str = None, auto_error: bool = True):
self.model = HTTPBaseModel(scheme="digest")
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(
self, request: Request
) -> Optional[HTTPAuthorizationCredentials]:
authorization: str = request.headers.get("Authorization")
scheme, credentials = get_authorization_scheme_param(authorization)
if not (authorization and scheme and credentials):
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
if scheme.lower() != "digest":
raise HTTPException(
status_code=HTTP_403_FORBIDDEN,

View File

@@ -113,32 +113,49 @@ class OAuth2PasswordRequestFormStrict(OAuth2PasswordRequestForm):
class OAuth2(SecurityBase):
def __init__(
self, *, flows: OAuthFlowsModel = OAuthFlowsModel(), scheme_name: str = None
self,
*,
flows: OAuthFlowsModel = OAuthFlowsModel(),
scheme_name: str = None,
auto_error: bool = True
):
self.model = OAuth2Model(flows=flows)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
if not authorization:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return authorization
class OAuth2PasswordBearer(OAuth2):
def __init__(self, tokenUrl: str, scheme_name: str = None, scopes: dict = None):
def __init__(
self,
tokenUrl: str,
scheme_name: str = None,
scopes: dict = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(flows=flows, scheme_name=scheme_name)
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return param

View File

@@ -1,3 +1,5 @@
from typing import Optional
from fastapi.openapi.models import OpenIdConnect as OpenIdConnectModel
from fastapi.security.base import SecurityBase
from starlette.exceptions import HTTPException
@@ -6,14 +8,20 @@ from starlette.status import HTTP_403_FORBIDDEN
class OpenIdConnect(SecurityBase):
def __init__(self, *, openIdConnectUrl: str, scheme_name: str = None):
def __init__(
self, *, openIdConnectUrl: str, scheme_name: str = None, auto_error: bool = True
):
self.model = OpenIdConnectModel(openIdConnectUrl=openIdConnectUrl)
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> str:
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
if not authorization:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
)
else:
return None
return authorization

View File

@@ -58,7 +58,6 @@ all = [
"pyyaml",
"graphene",
"ujson",
"ujson",
"email_validator",
"uvicorn",
]

View File

@@ -0,0 +1,75 @@
from typing import Optional
from fastapi import Depends, FastAPI, Security
from fastapi.security import APIKeyCookie
from pydantic import BaseModel
from starlette.testclient import TestClient
app = FastAPI()
api_key = APIKeyCookie(name="key", auto_error=False)
class User(BaseModel):
username: str
def get_current_user(oauth_header: Optional[str] = Security(api_key)):
if oauth_header is None:
return None
user = User(username=oauth_header)
return user
@app.get("/users/me")
def read_current_user(current_user: User = Depends(get_current_user)):
if current_user is None:
return {"msg": "Create an account first"}
else:
return current_user
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"APIKeyCookie": []}],
}
}
},
"components": {
"securitySchemes": {
"APIKeyCookie": {"type": "apiKey", "name": "key", "in": "cookie"}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_api_key():
response = client.get("/users/me", cookies={"key": "secret"})
assert response.status_code == 200
assert response.json() == {"username": "secret"}
def test_security_api_key_no_key():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}

View File

@@ -0,0 +1,74 @@
from typing import Optional
from fastapi import Depends, FastAPI, Security
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from starlette.testclient import TestClient
app = FastAPI()
api_key = APIKeyHeader(name="key", auto_error=False)
class User(BaseModel):
username: str
def get_current_user(oauth_header: Optional[str] = Security(api_key)):
if oauth_header is None:
return None
user = User(username=oauth_header)
return user
@app.get("/users/me")
def read_current_user(current_user: Optional[User] = Depends(get_current_user)):
if current_user is None:
return {"msg": "Create an account first"}
return current_user
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"APIKeyHeader": []}],
}
}
},
"components": {
"securitySchemes": {
"APIKeyHeader": {"type": "apiKey", "name": "key", "in": "header"}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_api_key():
response = client.get("/users/me", headers={"key": "secret"})
assert response.status_code == 200
assert response.json() == {"username": "secret"}
def test_security_api_key_no_key():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}

View File

@@ -0,0 +1,74 @@
from typing import Optional
from fastapi import Depends, FastAPI, Security
from fastapi.security import APIKeyQuery
from pydantic import BaseModel
from starlette.testclient import TestClient
app = FastAPI()
api_key = APIKeyQuery(name="key", auto_error=False)
class User(BaseModel):
username: str
def get_current_user(oauth_header: Optional[str] = Security(api_key)):
if oauth_header is None:
return None
user = User(username=oauth_header)
return user
@app.get("/users/me")
def read_current_user(current_user: Optional[User] = Depends(get_current_user)):
if current_user is None:
return {"msg": "Create an account first"}
return current_user
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"APIKeyQuery": []}],
}
}
},
"components": {
"securitySchemes": {
"APIKeyQuery": {"type": "apiKey", "name": "key", "in": "query"}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_api_key():
response = client.get("/users/me?key=secret")
assert response.status_code == 200
assert response.json() == {"username": "secret"}
def test_security_api_key_no_key():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}

View File

@@ -0,0 +1,62 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBase
from starlette.testclient import TestClient
app = FastAPI()
security = HTTPBase(scheme="Other", auto_error=False)
@app.get("/users/me")
def read_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
):
if credentials is None:
return {"msg": "Create an account first"}
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"HTTPBase": []}],
}
}
},
"components": {
"securitySchemes": {"HTTPBase": {"type": "http", "scheme": "Other"}}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_http_base():
response = client.get("/users/me", headers={"Authorization": "Other foobar"})
assert response.status_code == 200
assert response.json() == {"scheme": "Other", "credentials": "foobar"}
def test_security_http_base_no_credentials():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}

View File

@@ -0,0 +1,79 @@
from base64 import b64encode
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from requests.auth import HTTPBasicAuth
from starlette.testclient import TestClient
app = FastAPI()
security = HTTPBasic(auto_error=False)
@app.get("/users/me")
def read_current_user(credentials: Optional[HTTPBasicCredentials] = Security(security)):
if credentials is None:
return {"msg": "Create an account first"}
return {"username": credentials.username, "password": credentials.password}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"HTTPBasic": []}],
}
}
},
"components": {
"securitySchemes": {"HTTPBasic": {"type": "http", "scheme": "basic"}}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_http_basic():
auth = HTTPBasicAuth(username="john", password="secret")
response = client.get("/users/me", auth=auth)
assert response.status_code == 200
assert response.json() == {"username": "john", "password": "secret"}
def test_security_http_basic_no_credentials():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}
def test_security_http_basic_invalid_credentials():
response = client.get(
"/users/me", headers={"Authorization": "Basic notabase64token"}
)
assert response.status_code == 403
assert response.json() == {"detail": "Invalid authentication credentials"}
def test_security_http_basic_non_basic_credentials():
payload = b64encode(b"johnsecret").decode("ascii")
auth_header = f"Basic {payload}"
response = client.get("/users/me", headers={"Authorization": auth_header})
assert response.status_code == 403
assert response.json() == {"detail": "Invalid authentication credentials"}

View File

@@ -0,0 +1,68 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette.testclient import TestClient
app = FastAPI()
security = HTTPBearer(auto_error=False)
@app.get("/users/me")
def read_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
):
if credentials is None:
return {"msg": "Create an account first"}
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"HTTPBearer": []}],
}
}
},
"components": {
"securitySchemes": {"HTTPBearer": {"type": "http", "scheme": "bearer"}}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_http_bearer():
response = client.get("/users/me", headers={"Authorization": "Bearer foobar"})
assert response.status_code == 200
assert response.json() == {"scheme": "Bearer", "credentials": "foobar"}
def test_security_http_bearer_no_credentials():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}
def test_security_http_bearer_incorrect_scheme_credentials():
response = client.get("/users/me", headers={"Authorization": "Basic notreally"})
assert response.status_code == 403
assert response.json() == {"detail": "Invalid authentication credentials"}

View File

@@ -0,0 +1,70 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest
from starlette.testclient import TestClient
app = FastAPI()
security = HTTPDigest(auto_error=False)
@app.get("/users/me")
def read_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
):
if credentials is None:
return {"msg": "Create an account first"}
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"HTTPDigest": []}],
}
}
},
"components": {
"securitySchemes": {"HTTPDigest": {"type": "http", "scheme": "digest"}}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_http_digest():
response = client.get("/users/me", headers={"Authorization": "Digest foobar"})
assert response.status_code == 200
assert response.json() == {"scheme": "Digest", "credentials": "foobar"}
def test_security_http_digest_no_credentials():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}
def test_security_http_digest_incorrect_scheme_credentials():
response = client.get(
"/users/me", headers={"Authorization": "Other invalidauthorization"}
)
assert response.status_code == 403
assert response.json() == {"detail": "Invalid authentication credentials"}

View File

@@ -0,0 +1,254 @@
from typing import Optional
import pytest
from fastapi import Depends, FastAPI, Security
from fastapi.security import OAuth2
from fastapi.security.oauth2 import OAuth2PasswordRequestFormStrict
from pydantic import BaseModel
from starlette.testclient import TestClient
app = FastAPI()
reusable_oauth2 = OAuth2(
flows={
"password": {
"tokenUrl": "/token",
"scopes": {"read:users": "Read the users", "write:users": "Create users"},
}
},
auto_error=False,
)
class User(BaseModel):
username: str
def get_current_user(oauth_header: Optional[str] = Security(reusable_oauth2)):
if oauth_header is None:
return None
user = User(username=oauth_header)
return user
@app.post("/login")
def read_current_user(form_data: OAuth2PasswordRequestFormStrict = Depends()):
return form_data
@app.get("/users/me")
def read_current_user(current_user: Optional[User] = Depends(get_current_user)):
if current_user is None:
return {"msg": "Create an account first"}
return current_user
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/login": {
"post": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
"summary": "Read Current User Post",
"operationId": "read_current_user_login_post",
"requestBody": {
"content": {
"application/x-www-form-urlencoded": {
"schema": {
"$ref": "#/components/schemas/Body_read_current_user"
}
}
},
"required": True,
},
}
},
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"OAuth2": []}],
}
},
},
"components": {
"schemas": {
"Body_read_current_user": {
"title": "Body_read_current_user",
"required": ["grant_type", "username", "password"],
"type": "object",
"properties": {
"grant_type": {
"title": "Grant_Type",
"pattern": "password",
"type": "string",
},
"username": {"title": "Username", "type": "string"},
"password": {"title": "Password", "type": "string"},
"scope": {"title": "Scope", "type": "string", "default": ""},
"client_id": {"title": "Client_Id", "type": "string"},
"client_secret": {"title": "Client_Secret", "type": "string"},
},
},
"ValidationError": {
"title": "ValidationError",
"required": ["loc", "msg", "type"],
"type": "object",
"properties": {
"loc": {
"title": "Location",
"type": "array",
"items": {"type": "string"},
},
"msg": {"title": "Message", "type": "string"},
"type": {"title": "Error Type", "type": "string"},
},
},
"HTTPValidationError": {
"title": "HTTPValidationError",
"type": "object",
"properties": {
"detail": {
"title": "Detail",
"type": "array",
"items": {"$ref": "#/components/schemas/ValidationError"},
}
},
},
},
"securitySchemes": {
"OAuth2": {
"type": "oauth2",
"flows": {
"password": {
"scopes": {
"read:users": "Read the users",
"write:users": "Create users",
},
"tokenUrl": "/token",
}
},
}
},
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_oauth2():
response = client.get("/users/me", headers={"Authorization": "Bearer footokenbar"})
assert response.status_code == 200
assert response.json() == {"username": "Bearer footokenbar"}
def test_security_oauth2_password_other_header():
response = client.get("/users/me", headers={"Authorization": "Other footokenbar"})
assert response.status_code == 200
assert response.json() == {"username": "Other footokenbar"}
def test_security_oauth2_password_bearer_no_header():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}
required_params = {
"detail": [
{
"loc": ["body", "grant_type"],
"msg": "field required",
"type": "value_error.missing",
},
{
"loc": ["body", "username"],
"msg": "field required",
"type": "value_error.missing",
},
{
"loc": ["body", "password"],
"msg": "field required",
"type": "value_error.missing",
},
]
}
grant_type_required = {
"detail": [
{
"loc": ["body", "grant_type"],
"msg": "field required",
"type": "value_error.missing",
}
]
}
grant_type_incorrect = {
"detail": [
{
"loc": ["body", "grant_type"],
"msg": 'string does not match regex "password"',
"type": "value_error.str.regex",
"ctx": {"pattern": "password"},
}
]
}
@pytest.mark.parametrize(
"data,expected_status,expected_response",
[
(None, 422, required_params),
({"username": "johndoe", "password": "secret"}, 422, grant_type_required),
(
{"username": "johndoe", "password": "secret", "grant_type": "incorrect"},
422,
grant_type_incorrect,
),
(
{"username": "johndoe", "password": "secret", "grant_type": "password"},
200,
{
"grant_type": "password",
"username": "johndoe",
"password": "secret",
"scopes": [],
"client_id": None,
"client_secret": None,
},
),
],
)
def test_strict_login(data, expected_status, expected_response):
response = client.post("/login", data=data)
assert response.status_code == expected_status
assert response.json() == expected_response

View File

@@ -0,0 +1,71 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import OAuth2PasswordBearer
from starlette.testclient import TestClient
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=False)
@app.get("/items/")
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
if token is None:
return {"msg": "Create an account first"}
return {"token": token}
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/items/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Items Get",
"operationId": "read_items_items__get",
"security": [{"OAuth2PasswordBearer": []}],
}
}
},
"components": {
"securitySchemes": {
"OAuth2PasswordBearer": {
"type": "oauth2",
"flows": {"password": {"scopes": {}, "tokenUrl": "/token"}},
}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_no_token():
response = client.get("/items")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}
def test_token():
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
assert response.status_code == 200
assert response.json() == {"token": "testtoken"}
def test_incorrect_token():
response = client.get("/items", headers={"Authorization": "Notexistent testtoken"})
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}

View File

@@ -0,0 +1,80 @@
from typing import Optional
from fastapi import Depends, FastAPI, Security
from fastapi.security.open_id_connect_url import OpenIdConnect
from pydantic import BaseModel
from starlette.testclient import TestClient
app = FastAPI()
oid = OpenIdConnect(openIdConnectUrl="/openid", auto_error=False)
class User(BaseModel):
username: str
def get_current_user(oauth_header: Optional[str] = Security(oid)):
if oauth_header is None:
return None
user = User(username=oauth_header)
return user
@app.get("/users/me")
def read_current_user(current_user: Optional[User] = Depends(get_current_user)):
if current_user is None:
return {"msg": "Create an account first"}
return current_user
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/users/me": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Current User Get",
"operationId": "read_current_user_users_me_get",
"security": [{"OpenIdConnect": []}],
}
}
},
"components": {
"securitySchemes": {
"OpenIdConnect": {"type": "openIdConnect", "openIdConnectUrl": "/openid"}
}
},
}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_security_oauth2():
response = client.get("/users/me", headers={"Authorization": "Bearer footokenbar"})
assert response.status_code == 200
assert response.json() == {"username": "Bearer footokenbar"}
def test_security_oauth2_password_other_header():
response = client.get("/users/me", headers={"Authorization": "Other footokenbar"})
assert response.status_code == 200
assert response.json() == {"username": "Other footokenbar"}
def test_security_oauth2_password_bearer_no_header():
response = client.get("/users/me")
assert response.status_code == 200
assert response.json() == {"msg": "Create an account first"}