♻️ Refactor internals, simplify Pydantic v2/v1 utils, create_model_field, better types for lenient_issubclass (#14860)

This commit is contained in:
Sebastián Ramírez
2026-02-07 00:08:07 -08:00
committed by GitHub
parent cc6ced6345
commit d59fbc3494
8 changed files with 43 additions and 84 deletions

View File

@@ -1,7 +1,6 @@
from .shared import PYDANTIC_VERSION_MINOR_TUPLE as PYDANTIC_VERSION_MINOR_TUPLE
from .shared import annotation_is_pydantic_v1 as annotation_is_pydantic_v1
from .shared import field_annotation_is_scalar as field_annotation_is_scalar
from .shared import is_pydantic_v1_model_class as is_pydantic_v1_model_class
from .shared import is_pydantic_v1_model_instance as is_pydantic_v1_model_instance
from .shared import (
is_uploadfile_or_nonable_uploadfile_annotation as is_uploadfile_or_nonable_uploadfile_annotation,
@@ -12,21 +11,19 @@ from .shared import (
from .shared import lenient_issubclass as lenient_issubclass
from .shared import sequence_types as sequence_types
from .shared import value_is_sequence as value_is_sequence
from .v2 import BaseConfig as BaseConfig
from .v2 import ModelField as ModelField
from .v2 import PydanticSchemaGenerationError as PydanticSchemaGenerationError
from .v2 import RequiredParam as RequiredParam
from .v2 import Undefined as Undefined
from .v2 import UndefinedType as UndefinedType
from .v2 import Url as Url
from .v2 import Validator as Validator
from .v2 import _regenerate_error_with_loc as _regenerate_error_with_loc
from .v2 import copy_field_info as copy_field_info
from .v2 import create_body_model as create_body_model
from .v2 import evaluate_forwardref as evaluate_forwardref
from .v2 import get_cached_model_fields as get_cached_model_fields
from .v2 import get_definitions as get_definitions
from .v2 import get_flat_models_from_fields as get_flat_models_from_fields
from .v2 import get_missing_field_error as get_missing_field_error
from .v2 import get_model_name_map as get_model_name_map
from .v2 import get_schema_from_model_field as get_schema_from_model_field
from .v2 import is_bytes_field as is_bytes_field
from .v2 import is_bytes_sequence_field as is_bytes_sequence_field

View File

@@ -8,6 +8,7 @@ from dataclasses import is_dataclass
from typing import (
Annotated,
Any,
TypeVar,
Union,
)
@@ -15,7 +16,9 @@ from fastapi.types import UnionType
from pydantic import BaseModel
from pydantic.version import VERSION as PYDANTIC_VERSION
from starlette.datastructures import UploadFile
from typing_extensions import get_args, get_origin
from typing_extensions import TypeGuard, get_args, get_origin
_T = TypeVar("_T")
# Copy from Pydantic: pydantic/_internal/_typing_extra.py
if sys.version_info < (3, 10):
@@ -39,15 +42,13 @@ sequence_annotation_to_type = {
deque: deque,
}
sequence_types = tuple(sequence_annotation_to_type.keys())
Url: type[Any]
sequence_types: tuple[type[Any], ...] = tuple(sequence_annotation_to_type.keys())
# Copy of Pydantic: pydantic/_internal/_utils.py
# Copy of Pydantic: pydantic/_internal/_utils.py with added TypeGuard
def lenient_issubclass(
cls: Any, class_or_tuple: Union[type[Any], tuple[type[Any], ...], None]
) -> bool:
cls: Any, class_or_tuple: Union[type[_T], tuple[type[_T], ...], None]
) -> TypeGuard[type[_T]]:
try:
return isinstance(cls, type) and issubclass(cls, class_or_tuple) # type: ignore[arg-type]
except TypeError: # pragma: no cover
@@ -177,16 +178,26 @@ def is_uploadfile_sequence_annotation(annotation: Any) -> bool:
def is_pydantic_v1_model_instance(obj: Any) -> bool:
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
from pydantic import v1
# TODO: remove this function once the required version of Pydantic fully
# removes pydantic.v1
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
from pydantic import v1
except ImportError: # pragma: no cover
return False
return isinstance(obj, v1.BaseModel)
def is_pydantic_v1_model_class(cls: Any) -> bool:
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
from pydantic import v1
# TODO: remove this function once the required version of Pydantic fully
# removes pydantic.v1
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
from pydantic import v1
except ImportError: # pragma: no cover
return False
return lenient_issubclass(cls, v1.BaseModel)

View File

@@ -12,7 +12,7 @@ from typing import (
cast,
)
from fastapi._compat import shared
from fastapi._compat import lenient_issubclass, shared
from fastapi.openapi.constants import REF_TEMPLATE
from fastapi.types import IncEx, ModelNameMap, UnionType
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, create_model
@@ -23,29 +23,20 @@ from pydantic._internal._schema_generation_shared import ( # type: ignore[attr-
GetJsonSchemaHandler as GetJsonSchemaHandler,
)
from pydantic._internal._typing_extra import eval_type_lenient
from pydantic._internal._utils import lenient_issubclass as lenient_issubclass
from pydantic.fields import FieldInfo as FieldInfo
from pydantic.json_schema import GenerateJsonSchema as GenerateJsonSchema
from pydantic.json_schema import JsonSchemaValue as JsonSchemaValue
from pydantic_core import CoreSchema as CoreSchema
from pydantic_core import PydanticUndefined, PydanticUndefinedType
from pydantic_core import PydanticUndefined
from pydantic_core import Url as Url
from pydantic_core.core_schema import (
with_info_plain_validator_function as with_info_plain_validator_function,
)
from typing_extensions import Literal, get_args, get_origin
try:
from pydantic_core.core_schema import (
with_info_plain_validator_function as with_info_plain_validator_function,
)
except ImportError: # pragma: no cover
from pydantic_core.core_schema import (
general_plain_validator_function as with_info_plain_validator_function, # noqa: F401
)
RequiredParam = PydanticUndefined
Undefined = PydanticUndefined
UndefinedType = PydanticUndefinedType
evaluate_forwardref = eval_type_lenient
Validator = Any
# TODO: remove when dropping support for Pydantic < v2.12.3
_Attrs = {
@@ -87,14 +78,6 @@ def asdict(field_info: FieldInfo) -> dict[str, Any]:
}
class BaseConfig:
pass
class ErrorWrapper(Exception):
pass
@dataclass
class ModelField:
field_info: FieldInfo
@@ -143,8 +126,8 @@ class ModelField:
warnings.simplefilter(
"ignore", category=UnsupportedFieldAttributeWarning
)
# TODO: remove after dropping support for Python 3.8 and
# setting the min Pydantic to v2.12.3 that adds asdict()
# TODO: remove after setting the min Pydantic to v2.12.3
# that adds asdict(), and use self.field_info.asdict() instead
field_dict = asdict(self.field_info)
annotated_args = (
field_dict["annotation"],
@@ -432,10 +415,11 @@ def get_flat_models_from_annotation(
origin = get_origin(annotation)
if origin is not None:
for arg in get_args(annotation):
if lenient_issubclass(arg, (BaseModel, Enum)) and arg not in known_models:
known_models.add(arg)
if lenient_issubclass(arg, BaseModel):
get_flat_models_from_model(arg, known_models=known_models)
if lenient_issubclass(arg, (BaseModel, Enum)):
if arg not in known_models:
known_models.add(arg) # type: ignore[arg-type]
if lenient_issubclass(arg, BaseModel):
get_flat_models_from_model(arg, known_models=known_models)
else:
get_flat_models_from_annotation(arg, known_models=known_models)
return known_models

View File

@@ -512,7 +512,6 @@ def analyze_param(
type_=use_annotation_from_field_info,
default=field_info.default,
alias=alias,
required=field_info.default in (RequiredParam, Undefined),
field_info=field_info,
)
if is_path_param:
@@ -523,11 +522,7 @@ def analyze_param(
assert (
is_scalar_field(field)
or is_scalar_sequence_field(field)
or (
lenient_issubclass(field.type_, BaseModel)
# For Pydantic v1
and getattr(field, "shape", 1) == 1
)
or lenient_issubclass(field.type_, BaseModel)
), f"Query parameter {param_name!r} must be one of the supported types"
return ParamDetails(type_annotation=type_annotation, depends=depends, field=field)
@@ -1021,7 +1016,6 @@ def get_body_field(
final_field = create_model_field(
name="body",
type_=BodyModel,
required=required,
alias="body",
field_info=BodyFieldInfo(**BodyFieldInfo_kwargs),
)

View File

@@ -143,10 +143,7 @@ class Schema(BaseModelWithConfig):
else_: Optional["SchemaOrBool"] = Field(default=None, alias="else")
dependentSchemas: Optional[dict[str, "SchemaOrBool"]] = None
prefixItems: Optional[list["SchemaOrBool"]] = None
# TODO: uncomment and remove below when deprecating Pydantic v1
# It generates a list of schemas for tuples, before prefixItems was available
# items: Optional["SchemaOrBool"] = None
items: Optional[Union["SchemaOrBool", list["SchemaOrBool"]]] = None
items: Optional["SchemaOrBool"] = None
contains: Optional["SchemaOrBool"] = None
properties: Optional[dict[str, "SchemaOrBool"]] = None
patternProperties: Optional[dict[str, "SchemaOrBool"]] = None

View File

@@ -10,12 +10,10 @@ from fastapi._compat import (
ModelField,
Undefined,
get_definitions,
get_schema_from_model_field,
lenient_issubclass,
)
from fastapi._compat.v2 import (
get_flat_models_from_fields,
get_model_name_map,
get_schema_from_model_field,
lenient_issubclass,
)
from fastapi.datastructures import DefaultPlaceholder
from fastapi.dependencies.models import Dependant

View File

@@ -34,7 +34,6 @@ from fastapi import params
from fastapi._compat import (
ModelField,
Undefined,
annotation_is_pydantic_v1,
lenient_issubclass,
)
from fastapi.datastructures import Default, DefaultPlaceholder
@@ -52,7 +51,6 @@ from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import (
EndpointContext,
FastAPIError,
PydanticV1NotSupportedError,
RequestValidationError,
ResponseValidationError,
WebSocketRequestValidationError,
@@ -638,11 +636,6 @@ class APIRoute(routing.Route):
f"Status code {status_code} must not have a response body"
)
response_name = "Response_" + self.unique_id
if annotation_is_pydantic_v1(self.response_model):
raise PydanticV1NotSupportedError(
"pydantic.v1 models are no longer supported by FastAPI."
f" Please update the response model {self.response_model!r}."
)
self.response_field = create_model_field(
name=response_name,
type_=self.response_model,
@@ -664,11 +657,6 @@ class APIRoute(routing.Route):
f"Status code {additional_status_code} must not have a response body"
)
response_name = f"Response_{additional_status_code}_{self.unique_id}"
if annotation_is_pydantic_v1(model):
raise PydanticV1NotSupportedError(
"pydantic.v1 models are no longer supported by FastAPI."
f" In responses={{}}, please update {model}."
)
response_field = create_model_field(
name=response_name, type_=model, mode="serialization"
)

View File

@@ -9,12 +9,9 @@ from typing import (
import fastapi
from fastapi._compat import (
BaseConfig,
ModelField,
PydanticSchemaGenerationError,
Undefined,
UndefinedType,
Validator,
annotation_is_pydantic_v1,
)
from fastapi.datastructures import DefaultPlaceholder, DefaultType
@@ -63,26 +60,19 @@ _invalid_args_message = (
def create_model_field(
name: str,
type_: Any,
class_validators: Optional[dict[str, Validator]] = None,
default: Optional[Any] = Undefined,
required: Union[bool, UndefinedType] = Undefined,
model_config: Union[type[BaseConfig], None] = None,
field_info: Optional[FieldInfo] = None,
alias: Optional[str] = None,
mode: Literal["validation", "serialization"] = "validation",
version: Literal["1", "auto"] = "auto",
) -> ModelField:
if annotation_is_pydantic_v1(type_):
raise PydanticV1NotSupportedError(
"pydantic.v1 models are no longer supported by FastAPI."
f" Please update the response model {type_!r}."
)
class_validators = class_validators or {}
field_info = field_info or FieldInfo(annotation=type_, default=default, alias=alias)
kwargs = {"mode": mode, "name": name, "field_info": field_info}
try:
return v2.ModelField(**kwargs) # type: ignore[arg-type]
return v2.ModelField(mode=mode, name=name, field_info=field_info)
except PydanticSchemaGenerationError:
raise fastapi.exceptions.FastAPIError(
_invalid_args_message.format(type_=type_)