Files
OpenLLM/src/openllm/_configuration.py

616 lines
28 KiB
Python

# Copyright 2023 BentoML Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Configuration utilities for OpenLLM. All model configuration will inherit from openllm.configuration_utils.LLMConfig.
Note that ``openllm.LLMConfig`` is a subclass of ``pydantic.BaseModel``. It also
has a ``to_click_options`` that returns a list of Click-compatible options for the model.
Such options will then be parsed to ``openllm.__main__.cli``.
Each fields in ``openllm.LLMConfig`` will also automatically generate a environment
variable based on its name field.
For example, the following config class:
```python
class FlanT5Config(openllm.LLMConfig):
class GenerationConfig:
temperature: float = 0.75
max_new_tokens: int = 3000
top_k: int = 50
top_p: float = 0.4
repetition_penalty = 1.0
```
"""
from __future__ import annotations
import os
import types
import typing as t
from abc import ABC
import click
import inflection
import orjson
import pydantic
import yaml
from bentoml._internal.models.model import ModelSignature
from click_option_group import optgroup
import openllm
from .exceptions import GpuNotAvailableError, OpenLLMException
from .utils import LazyType
from .utils.dantic import allows_multiple, parse_default
if t.TYPE_CHECKING:
P = t.ParamSpec("P")
F = t.Callable[P, t.Any]
ReprArgs: t.TypeAlias = t.Iterable[tuple[str | None, t.Any]]
import tensorflow as tf
import torch
import transformers
from pydantic.fields import FieldInfo
from transformers.generation.beam_constraints import Constraint
DictStrAny = dict[str, t.Any]
else:
from transformers.utils.dummy_pt_objects import Constraint
DictStrAny = dict
transformers = openllm.utils.LazyLoader("transformers", globals(), "transformers")
torch = openllm.utils.LazyLoader("torch", globals(), "torch")
tf = openllm.utils.LazyLoader("tf", globals(), "tensorflow")
__all__ = ["LLMConfig", "ModelSignature"]
_object_setattr = object.__setattr__
def field_to_options(
name: str, field: FieldInfo, model_name: str, suffix_generation: bool = False
) -> t.Callable[[F[P]], F[P]]:
# TODO: support parsing nested model in FieldInfo
envvar = field.json_schema_extra.get("env") if field.json_schema_extra else None
dasherized = inflection.dasherize(name)
underscored = inflection.underscore(name)
full_option_name = f"--{dasherized}"
if field.annotation is bool:
full_option_name += f"/--no-{dasherized}"
if suffix_generation:
identifier = f"{model_name}_generation_{underscored}"
else:
identifier = f"{model_name}_{underscored}"
return optgroup.option(
identifier,
full_option_name,
type=field.annotation,
required=field.is_required(),
default=parse_default(field.default, field.annotation),
show_default=True if field.default else False,
multiple=allows_multiple(field.annotation),
help=field.description,
show_envvar=True if envvar else False,
envvar=envvar,
)
def generate_kwargs_from_envvar(model: GenerationConfig | LLMConfig) -> dict[str, t.Any]:
# NOTE: We can safe cast here since all of the fields in GenerationConfig or LLMConfig
# will have a `env` field in `json_schema_extra`
return {
key: os.environ.get(t.cast("dict[str, t.Any]", field.json_schema_extra)["env"], field.default)
for key, field in model.model_fields.items()
}
class GenerationConfig(pydantic.BaseModel):
"""Generation config provides the configuration to then be parsed to ``transformers.GenerationConfig``,
with some additional validation and environment constructor.
Note that we always set `do_sample=True` and `return_dict_in_generate=False`
"""
# NOTE: parameters for controlling the length of the output
max_new_tokens: int = pydantic.Field(
20, ge=0, description="The maximum numbers of tokens to generate, ignoring the number of tokens in the prompt."
)
min_length: int = pydantic.Field(
0,
ge=0,
description="""The minimum length of the sequence to be generated. Corresponds to the length of the
input prompt + `min_new_tokens`. Its effect is overridden by `min_new_tokens`, if also set.""",
)
min_new_tokens: int = pydantic.Field(
None, description="The minimum numbers of tokens to generate, ignoring the number of tokens in the prompt."
)
early_stopping: bool = pydantic.Field(
False,
description="""Controls the stopping condition for beam-based methods, like beam-search. It accepts the
following values:
- `True`, where the generation stops as soon as there are `num_beams` complete candidates;
- `False`, where an heuristic is applied and the generation stops when is it very unlikely to find
better candidates;
- `"never"`, where the beam search procedure only stops when there cannot be better candidates
(canonical beam search algorithm)
""",
)
max_time: float = pydantic.Field(
None,
description="""The maximum amount of time you allow the computation to run for in seconds. generation will
still finish the current pass after allocated time has been passed.""",
)
# NOTE: Parameters for controling generaiton strategies
num_beams: int = pydantic.Field(1, description="Number of beams for beam search. 1 means no beam search.")
num_beam_groups: int = pydantic.Field(
1,
description="""Number of groups to divide `num_beams` into in order to ensure diversity among different
groups of beams. [this paper](https://arxiv.org/pdf/1610.02424.pdf) for more details.""",
)
penalty_alpha: float = pydantic.Field(
None,
description="""The values balance the model confidence and the degeneration penalty in
contrastive search decoding.""",
)
use_cache: bool = pydantic.Field(
True,
description="""Whether or not the model should use the past last
key/values attentions (if applicable to the model) to speed up decoding.""",
)
# NOTE: Parameters for manipulation of the model output logits
temperature: float = pydantic.Field(
1.0, ge=0.0, le=1.0, description="The value used to modulate the next token probabilities."
)
top_k: int = pydantic.Field(
50, description="The number of highest probability vocabulary tokens to keep for top-k-filtering."
)
top_p: float = pydantic.Field(
1.0,
description="""If set to float < 1, only the smallest set of most probable tokens with
probabilities that add up to `top_p` or higher are kept for generation.""",
)
typical_p: float = pydantic.Field(
1.0,
description="""Local typicality measures how similar the conditional probability of predicting a target
token next is to the expected conditional probability of predicting a random token next, given the
partial text already generated. If set to float < 1, the smallest set of the most locally typical
tokens with probabilities that add up to `typical_p` or higher are kept for generation. See [this
paper](https://arxiv.org/pdf/2202.00666.pdf) for more details.
""",
)
epsilon_cutoff: float = pydantic.Field(
0.0,
description="""\
If set to float strictly between 0 and 1, only tokens with a conditional probability greater than
`epsilon_cutoff` will be sampled. In the paper, suggested values range from 3e-4 to 9e-4, depending on the
size of the model. See [Truncation Sampling as Language Model Desmoothing](https://arxiv.org/abs/2210.15191)
for more details.
""",
)
eta_cutoff: float = pydantic.Field(
0.0,
description="""Eta sampling is a hybrid of locally typical sampling and epsilon sampling.
If set to float strictly between 0 and 1, a token is only considered if it is greater than
either `eta_cutoff` or `sqrt(eta_cutoff) * exp(-entropy(softmax(next_token_logits)))`. The latter term is
intuitively the expected next token probability, scaled by `sqrt(eta_cutoff)`. In the paper, suggested
values range from 3e-4 to 2e-3, depending on the size of the model.
See [Truncation Sampling as Language Model Desmoothing](https://arxiv.org/abs/2210.15191) for more details.
""",
)
diversity_penalty: float = pydantic.Field(
0.0,
description="""This value is subtracted from a beam's score if it generates a token same
as any beam from other group at a particular time. Note that `diversity_penalty` is only
effective if `group beam search` is enabled.
""",
)
repetition_penalty: float = pydantic.Field(
1.0,
description="""The parameter for repetition penalty. 1.0 means no penalty.
See [this paper](https://arxiv.org/pdf/1909.05858.pdf) for more details.""",
)
encoder_repetition_penalty: float = pydantic.Field(
1.0,
description="""The paramater for encoder_repetition_penalty. An exponential penalty on sequences that are
not in the original input. 1.0 means no penalty.""",
)
length_penalty: float = pydantic.Field(
1.0,
description="""Exponential penalty to the length that is used with beam-based generation. It is applied
as an exponent to the sequence length, which in turn is used to divide the score of the sequence. Since
the score is the log likelihood of the sequence (i.e. negative), `length_penalty` > 0.0 promotes longer
sequences, while `length_penalty` < 0.0 encourages shorter sequences.
""",
)
no_repeat_ngram_size: int = pydantic.Field(
0, description="If set to int > 0, all ngrams of that size can only occur once."
)
bad_words_ids: t.List[t.List[int]] = pydantic.Field(
None,
description="""List of token ids that are not allowed to be generated. In order to get the token ids
of the words that should not appear in the generated text, use
`tokenizer(bad_words, add_prefix_space=True, add_special_tokens=False).input_ids`.
""",
)
# NOTE: t.Union is not yet supported on CLI, but the environment variable should already be available.
force_words_ids: t.Union[t.List[t.List[int]], t.List[t.List[t.List[int]]]] = pydantic.Field(
None,
description="""List of token ids that must be generated. If given a `List[List[int]]`, this is treated
as a simple list of words that must be included, the opposite to `bad_words_ids`.
If given `List[List[List[int]]]`, this triggers a
[disjunctive constraint](https://github.com/huggingface/transformers/issues/14081), where one
can allow different forms of each word.
""",
)
renormalize_logits: bool = pydantic.Field(
False,
description="""Whether to renormalize the logits after applying all the logits processors or warpers
(including the custom ones). It's highly recommended to set this flag to `True` as the search
algorithms suppose the score logits are normalized but some logit processors or warpers break the normalization.
""",
)
constraints: t.List["Constraint"] = pydantic.Field(
None,
description="""Custom constraints that can be added to the generation to ensure that the output
will contain the use of certain tokens as defined by ``Constraint`` objects, in the most sensible way possible.
""",
)
forced_bos_token_id: int = pydantic.Field(
None,
description="""The id of the token to force as the first generated token after the
``decoder_start_token_id``. Useful for multilingual models like
[mBART](https://huggingface.co/docs/transformers/model_doc/mbart) where the first generated token needs
to be the target language token.
""",
)
forced_eos_token_id: t.Union[int, t.List[int]] = pydantic.Field(
None,
description="""The id of the token to force as the last generated token when `max_length` is reached.
Optionally, use a list to set multiple *end-of-sequence* tokens.""",
)
remove_invalid_values: bool = pydantic.Field(
False,
description="""Whether to remove possible *nan* and *inf* outputs of the model to prevent the
generation method to crash. Note that using `remove_invalid_values` can slow down generation.""",
)
exponential_decay_length_penalty: t.Tuple[int, float] = pydantic.Field(
None,
description="""This tuple adds an exponentially increasing length penalty, after a certain amount of tokens
have been generated. The tuple shall consist of: `(start_index, decay_factor)` where `start_index`
indicates where penalty starts and `decay_factor` represents the factor of exponential decay
""",
)
suppress_tokens: t.List[int] = pydantic.Field(
None,
description="""A list of tokens that will be suppressed at generation. The `SupressTokens` logit
processor will set their log probs to `-inf` so that they are not sampled.
""",
)
begin_suppress_tokens: t.List[int] = pydantic.Field(
None,
description="""A list of tokens that will be suppressed at the beginning of the generation. The
`SupressBeginTokens` logit processor will set their log probs to `-inf` so that they are not sampled.
""",
)
forced_decoder_ids: t.List[t.List[int]] = pydantic.Field(
None,
description="""A list of pairs of integers which indicates a mapping from generation indices to token indices
that will be forced before sampling. For example, `[[1, 123]]` means the second generated token will always
be a token of index 123.
""",
)
# NOTE: Parameters that define the output variables of `generate`
num_return_sequences: int = pydantic.Field(
1, description="The number of independently computed returned sequences for each element in the batch."
)
output_attentions: bool = pydantic.Field(
False,
description="""Whether or not to return the attentions tensors of all attention layers.
See `attentions` under returned tensors for more details. """,
)
output_hidden_states: bool = pydantic.Field(
False,
description="""Whether or not to return the hidden states of all layers.
See `hidden_states` under returned tensors for more details.
""",
)
output_scores: bool = pydantic.Field(
False,
description="""Whether or not to return the prediction scores. See `scores` under returned
tensors for more details.""",
)
# NOTE: Special tokens that can be used at generation time
pad_token_id: int = pydantic.Field(None, description="The id of the *padding* token.")
bos_token_id: int = pydantic.Field(None, description="The id of the *beginning-of-sequence* token.")
eos_token_id: t.Union[int, t.List[int]] = pydantic.Field(
None,
description="""The id of the *end-of-sequence* token. Optionally, use a list to set
multiple *end-of-sequence* tokens.""",
)
# NOTE: Generation parameters exclusive to encoder-decoder models
encoder_no_repeat_ngram_size: int = pydantic.Field(
0,
description="""If set to int > 0, all ngrams of that size that occur in the
`encoder_input_ids` cannot occur in the `decoder_input_ids`.
""",
)
decoder_start_token_id: int = pydantic.Field(
None,
description="""If an encoder-decoder model starts decoding with a
different token than *bos*, the id of that token.
""",
)
# NOTE: pydantic definition
model_config = {"extra": "forbid", "arbitrary_types_allowed": True}
if t.TYPE_CHECKING:
# The following is handled via __pydantic_init_subclass__
__openllm_env_name__: str
__openllm_model_name__: str
def __init_subclass__(cls, *, _internal: bool = False, **attrs: t.Any) -> None:
if not _internal:
raise RuntimeError(
"GenerationConfig is not meant to be used directly, "
"but you can access this via a LLMConfig.generation_config"
)
model_name = attrs.get("model_name", None)
if model_name is None:
raise RuntimeError("Failed to initialize GenerationConfig subclass (missing model_name)")
cls.__openllm_model_name__ = inflection.underscore(model_name)
cls.__openllm_env_name__ = cls.__openllm_model_name__.upper()
@classmethod
def construct_from_llm_config(cls, llm_config: type[LLMConfig]) -> GenerationConfig:
"""Parse ModelConfig.GenerationConfig into a GenerationConfig object."""
return cls.model_validate(
{
k: v
for k, v in vars(llm_config.GenerationConfig).items()
if not k.startswith("_") and k in cls.model_fields
}
)
def model_post_init(self, _: t.Any):
# NOTE: I don't know how to do this more efficiently in pydantic v2 yet, will probably
# need to consult the pydantic team on this.
for key, field in self.model_fields.items():
if not field.json_schema_extra:
field.json_schema_extra = {}
if "env" in field.json_schema_extra:
continue
field.json_schema_extra["env"] = f"OPENLLM_{self.__openllm_env_name__}_GENERATION_{key.upper()}"
class LLMConfig(pydantic.BaseModel, ABC):
# NOTE: we need this to allow such that we can parse generation_config from configuration.
model_config = {"extra": "allow"}
def __getattr__(self, attr: t.Any) -> t.Any:
if attr in self.generation_config.model_fields:
return getattr(self.generation_config, attr)
return getattr(self, attr)
if t.TYPE_CHECKING:
# The following is handled via __pydantic_init_subclass__, and is only used for TYPE_CHECKING
__openllm_model_name__: str
__openllm_start_name__: str
__openllm_timeout__: int = 3600
__openllm_name_type__: t.Literal["dasherize", "lowercase"] = "dasherize"
__openllm_trust_remote_code__: bool = False
__openllm_requires_gpu__: bool = False
__openllm_env__: openllm.utils.ModelEnv
GenerationConfig: type[t.Any] = GenerationConfig
def __init_subclass__(
cls,
*,
default_timeout: int | None = None,
name_type: t.Literal["dasherize", "lowercase"] = "dasherize",
trust_remote_code: bool = False,
requires_gpu: bool = False,
**attrs: t.Any,
):
if default_timeout is None:
default_timeout = 3600
cls.__openllm_timeout__ = default_timeout
if name_type not in ("dasherize", "lowercase"):
raise RuntimeError(f"Unknown name_type {name_type}. Only allowed are 'dasherize' and 'lowercase'.")
cls.__openllm_name_type__ = name_type
cls.__openllm_trust_remote_code__ = trust_remote_code
cls.__openllm_requires_gpu__ = requires_gpu
super(LLMConfig, cls).__init_subclass__(**attrs)
@classmethod
def check_if_gpu_is_available(cls, implementation: t.Literal["pt", "tf", "flax"] = "pt"):
try:
if cls.__openllm_requires_gpu__:
if implementation in ("tf", "flax") and len(tf.config.list_physical_devices("GPU")) == 0:
raise OpenLLMException("Required GPU for given model")
else:
if not torch.cuda.is_available():
raise OpenLLMException("Required GPU for given model")
except OpenLLMException:
raise GpuNotAvailableError(f"{cls} only supports running with GPU (None available).") from None
@classmethod
def __pydantic_init_subclass__(cls, **_: t.Any):
if cls.__openllm_name_type__ == "dasherize":
cls.__openllm_model_name__ = inflection.underscore(cls.__name__.replace("Config", ""))
cls.__openllm_start_name__ = inflection.dasherize(cls.__openllm_model_name__)
else:
cls.__openllm_model_name__ = cls.__name__.replace("Config", "").lower()
cls.__openllm_start_name__ = cls.__openllm_model_name__
cls.__openllm_env__ = openllm.utils.ModelEnv(cls.__openllm_model_name__)
if hasattr(cls, "GenerationConfig"):
cls.generation_config = t.cast(
"type[GenerationConfig]",
types.new_class(
cls.__name__.replace("Config", "") + "GenerationConfig",
(GenerationConfig,),
{"model_name": cls.__openllm_model_name__, "_internal": True},
),
).construct_from_llm_config(cls)
delattr(cls, "GenerationConfig")
for key, field in cls.model_fields.items():
if not field.json_schema_extra:
field.json_schema_extra = {}
if "env" in field.json_schema_extra:
continue
field.json_schema_extra["env"] = f"OPENLLM_{cls.__openllm_model_name__.upper()}_{key.upper()}"
def model_post_init(self, _: t.Any):
if self.__pydantic_extra__:
generation_config: dict[str, t.Any] | None = self.__pydantic_extra__.pop("generation_config", None)
if generation_config is not None:
assert LazyType[DictStrAny](dict).isinstance(generation_config), "generation_config must be a dict."
self.generation_config = self.generation_config.model_copy(update=generation_config, deep=True)
else:
# The rest of the extras fields should just be the generation_config.
self.generation_config = self.generation_config.model_copy(update=self.__pydantic_extra__, deep=True)
# NOTE: Non need to maintain key from __pydantic_extra__ that is already parsed into generation_config
_object_setattr(
self,
"__pydantic_extra__",
{k: v for k, v in self.__pydantic_extra__.items() if k not in self.generation_config.model_fields},
)
def model_dump_yaml(self):
try:
return yaml.safe_dump(self.model_dump(), sort_keys=False)
except yaml.YAMLError as e:
raise openllm.exceptions.ValidationError(f"Failed to dump configuration to yaml: {e}") from e
@classmethod
def model_validate_yaml(cls, path: str, ctx: str | None = None, strict: bool = True) -> LLMConfig:
with open(openllm.utils.resolve_user_filepath(path, ctx=ctx), "rb") as f:
content = yaml.safe_load(f)
try:
return cls.model_validate(content, strict=strict)
except pydantic.ValidationError as e:
raise openllm.exceptions.ValidationError(f"Failed to parse configuration to {cls}: {e}") from e
def model_dump(self, flatten: bool = False, **attrs: t.Any):
try:
to_dump = super().model_dump(**attrs)
generation_config = self.generation_config.model_dump(exclude_none=True)
if not flatten:
to_dump["generation_config"] = generation_config
else:
to_dump.update(generation_config)
return to_dump
except pydantic.ValidationError as e:
raise openllm.exceptions.ValidationError(f"Failed to dump configuration to dict: {e}") from e
@classmethod
def model_construct_env(cls, __llm_config__: LLMConfig | None = None, **attrs: t.Any) -> LLMConfig:
"""A helpers that respect configuration values that
sets from environment variables for any given configuration class.
"""
env_json_string = os.environ.get(cls.__openllm_env__.model_config, None)
if env_json_string is not None:
try:
self = cls.model_construct(**orjson.loads(env_json_string))
except pydantic.ValidationError as e:
raise RuntimeError(f"Failed to parse '{cls.__openllm_env__.model_config}' as valid JSON string.") from e
else:
self = cls.model_construct()
if __llm_config__ is not None:
# NOTE: Only hit this branch on the server. Client shouldn't use __llm_config__
# as it is not set.
return self.model_construct(**__llm_config__.model_dump(flatten=True))
# filtered out None values
attrs = {k: v for k, v in attrs.items() if v is not None}
construct_attrs = generate_kwargs_from_envvar(self)
construct_attrs.update(generate_kwargs_from_envvar(self.generation_config))
construct_attrs.update(attrs)
return self.model_construct(**construct_attrs)
def model_validate_click(self, **attrs: t.Any) -> tuple[LLMConfig, dict[str, t.Any]]:
"""Parse given click attributes into a LLMConfig and return the remaining click attributes."""
llm_config_attrs = {}
key_to_remove: list[str] = []
for k, v in attrs.items():
if k.startswith(f"{self.__openllm_model_name__}_"):
llm_config_attrs[k[len(self.__openllm_model_name__) + 1 :]] = v
key_to_remove.append(k)
elif k.startswith(f"{self.__openllm_model_name__}_generation_"):
llm_config_attrs[k[len(self.__openllm_model_name__ + "_generation") + 1 :]] = v
key_to_remove.append(k)
return self.model_construct_env(**llm_config_attrs), {k: v for k, v in attrs.items() if k not in key_to_remove}
@t.overload
def to_generation_config(self, return_as_dict: t.Literal[True] = ...) -> dict[str, t.Any]:
...
@t.overload
def to_generation_config(self, return_as_dict: t.Literal[False] = ...) -> transformers.GenerationConfig:
...
def to_generation_config(self, return_as_dict: bool = False) -> transformers.GenerationConfig | dict[str, t.Any]:
config = transformers.GenerationConfig(**self.generation_config.model_dump())
if return_as_dict:
output = config.to_dict()
output.pop("transformers_version")
return output
return config
def to_click_options(self, f: F[P]) -> t.Callable[[F[P]], click.Command]:
"""
Convert current model to click options. This can be used as a decorator for click commands.
Note that the identifier for all LLMConfig will be prefixed with '<model_name>_*', and the generation config
will be prefixed with '<model_name>_generation_*'.
"""
for name, field in self.generation_config.model_fields.items():
if t.get_origin(field.annotation) is t.Union:
# NOTE: Union type is currently not yet supported, we probably just need to use environment instead.
continue
f = field_to_options(name, field, self.__openllm_model_name__, suffix_generation=True)(f)
f = optgroup.group(f"{self.__class__.__name__} generation options")(f)
if len(self.model_fields.values()) == 0:
return f
for name, field in self.model_fields.items():
if t.get_origin(field.annotation) is t.Union:
# NOTE: Union type is currently not yet supported, we probably just need to use environment instead.
continue
f = field_to_options(name, field, self.__openllm_model_name__)(f)
return optgroup.group(f"{self.__class__.__name__} options")(f)