Compare commits

..

1 Commits

Author SHA1 Message Date
Alex Cheema
24418e3788 fix: import ResponsesStreamEvent and DRY up SSE formatting in responses adapter
ResponsesStreamEvent was defined in openai_responses.py but never imported
or used anywhere. Import it in the responses adapter and add a _format_sse()
helper that uses the event's .type field, replacing 13 hardcoded SSE format
strings with a single typed function.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:14:46 -08:00
4 changed files with 30 additions and 79 deletions

View File

@@ -26,6 +26,7 @@ from exo.shared.types.openai_responses import (
ResponseOutputText,
ResponsesRequest,
ResponsesResponse,
ResponsesStreamEvent,
ResponseTextDeltaEvent,
ResponseTextDoneEvent,
ResponseUsage,
@@ -33,6 +34,11 @@ from exo.shared.types.openai_responses import (
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
def _format_sse(event: ResponsesStreamEvent) -> str:
"""Format a streaming event as an SSE message."""
return f"event: {event.type}\ndata: {event.model_dump_json()}\n\n"
def _extract_content(content: str | list[ResponseContentPart]) -> str:
"""Extract plain text from a content field that may be a string or list of parts."""
if isinstance(content, str):
@@ -207,13 +213,13 @@ async def generate_responses_stream(
created_event = ResponseCreatedEvent(
sequence_number=next(seq), response=initial_response
)
yield f"event: response.created\ndata: {created_event.model_dump_json()}\n\n"
yield _format_sse(created_event)
# response.in_progress
in_progress_event = ResponseInProgressEvent(
sequence_number=next(seq), response=initial_response
)
yield f"event: response.in_progress\ndata: {in_progress_event.model_dump_json()}\n\n"
yield _format_sse(in_progress_event)
# response.output_item.added
initial_item = ResponseMessageItem(
@@ -224,7 +230,7 @@ async def generate_responses_stream(
item_added = ResponseOutputItemAddedEvent(
sequence_number=next(seq), output_index=0, item=initial_item
)
yield f"event: response.output_item.added\ndata: {item_added.model_dump_json()}\n\n"
yield _format_sse(item_added)
# response.content_part.added
initial_part = ResponseOutputText(text="")
@@ -235,7 +241,7 @@ async def generate_responses_stream(
content_index=0,
part=initial_part,
)
yield f"event: response.content_part.added\ndata: {part_added.model_dump_json()}\n\n"
yield _format_sse(part_added)
accumulated_text = ""
function_call_items: list[ResponseFunctionCallItem] = []
@@ -266,7 +272,7 @@ async def generate_responses_stream(
output_index=next_output_index,
item=fc_item,
)
yield f"event: response.output_item.added\ndata: {fc_added.model_dump_json()}\n\n"
yield _format_sse(fc_added)
# response.function_call_arguments.delta
args_delta = ResponseFunctionCallArgumentsDeltaEvent(
@@ -275,7 +281,7 @@ async def generate_responses_stream(
output_index=next_output_index,
delta=tool.arguments,
)
yield f"event: response.function_call_arguments.delta\ndata: {args_delta.model_dump_json()}\n\n"
yield _format_sse(args_delta)
# response.function_call_arguments.done
args_done = ResponseFunctionCallArgumentsDoneEvent(
@@ -285,7 +291,7 @@ async def generate_responses_stream(
name=tool.name,
arguments=tool.arguments,
)
yield f"event: response.function_call_arguments.done\ndata: {args_done.model_dump_json()}\n\n"
yield _format_sse(args_done)
# response.output_item.done
fc_done_item = ResponseFunctionCallItem(
@@ -300,7 +306,7 @@ async def generate_responses_stream(
output_index=next_output_index,
item=fc_done_item,
)
yield f"event: response.output_item.done\ndata: {fc_item_done.model_dump_json()}\n\n"
yield _format_sse(fc_item_done)
function_call_items.append(fc_done_item)
next_output_index += 1
@@ -316,7 +322,7 @@ async def generate_responses_stream(
content_index=0,
delta=chunk.text,
)
yield f"event: response.output_text.delta\ndata: {delta_event.model_dump_json()}\n\n"
yield _format_sse(delta_event)
# response.output_text.done
text_done = ResponseTextDoneEvent(
@@ -326,7 +332,7 @@ async def generate_responses_stream(
content_index=0,
text=accumulated_text,
)
yield f"event: response.output_text.done\ndata: {text_done.model_dump_json()}\n\n"
yield _format_sse(text_done)
# response.content_part.done
final_part = ResponseOutputText(text=accumulated_text)
@@ -337,7 +343,7 @@ async def generate_responses_stream(
content_index=0,
part=final_part,
)
yield f"event: response.content_part.done\ndata: {part_done.model_dump_json()}\n\n"
yield _format_sse(part_done)
# response.output_item.done
final_message_item = ResponseMessageItem(
@@ -348,7 +354,7 @@ async def generate_responses_stream(
item_done = ResponseOutputItemDoneEvent(
sequence_number=next(seq), output_index=0, item=final_message_item
)
yield f"event: response.output_item.done\ndata: {item_done.model_dump_json()}\n\n"
yield _format_sse(item_done)
# Create usage from usage data if available
usage = None
@@ -373,4 +379,4 @@ async def generate_responses_stream(
completed_event = ResponseCompletedEvent(
sequence_number=next(seq), response=final_response
)
yield f"event: response.completed\ndata: {completed_event.model_dump_json()}\n\n"
yield _format_sse(completed_event)

View File

@@ -165,7 +165,6 @@ def is_custom_card(model_id: ModelId) -> bool:
class ConfigData(BaseModel):
model_config = {"extra": "ignore"} # Allow unknown fields
model_type: str | None = None
architectures: list[str] | None = None
hidden_size: Annotated[int, Field(ge=0)] | None = None
layer_count: int = Field(
@@ -201,7 +200,6 @@ class ConfigData(BaseModel):
return data
for field in [
"model_type",
"architectures",
"hidden_size",
"num_hidden_layers",

View File

@@ -269,52 +269,19 @@ def get_tokenizer(model_path: Path, shard_metadata: ShardMetadata) -> TokenizerW
return load_tokenizer_for_model_id(shard_metadata.model_card.model_id, model_path)
def _read_model_type_from_config(model_path: Path) -> str | None:
"""Read the model_type field from config.json at the given model path.
Returns None if config.json doesn't exist or doesn't contain model_type.
def get_eos_token_ids_for_model(model_id: ModelId) -> list[int] | None:
"""
config_path = model_path / "config.json"
if not config_path.exists():
return None
try:
with open(config_path) as f:
config: dict[str, Any] = json.load(f) # pyright: ignore[reportAny]
model_type: Any = config.get("model_type")
if model_type is None:
text_config: Any = config.get("text_config")
if isinstance(text_config, dict):
model_type = text_config.get("model_type") # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType]
return model_type if isinstance(model_type, str) else None
except (json.JSONDecodeError, OSError):
return None
Get the EOS token IDs for a model based on its ID.
def get_eos_token_ids_for_model(
model_id: ModelId, model_type: str | None = None
) -> list[int] | None:
"""Get the EOS token IDs for a model based on its architecture type.
Uses model_type from config.json when available, falls back to model_id
string matching for backward compatibility.
Some models require explicit EOS token configuration that isn't in their
tokenizer config. This function returns the known EOS token IDs for such models.
Args:
model_id: The HuggingFace model ID
model_type: The model_type field from config.json (e.g., "kimi", "glm4")
Returns:
List of EOS token IDs, or None if the model uses standard tokenizer config
"""
if model_type is not None:
if model_type == "kimi":
return [163586]
elif model_type == "glm4_moe_lite":
# 154820: <|endoftext|>, 154827: <|user|>, 154829: <|observation|>
return [154820, 154827, 154829]
elif model_type.startswith("glm"):
return [151336, 151329, 151338]
# Fallback: string matching on model_id
model_id_lower = model_id.lower()
if "kimi-k2" in model_id_lower:
return [163586]
@@ -329,10 +296,11 @@ def get_eos_token_ids_for_model(
def load_tokenizer_for_model_id(
model_id: ModelId, model_path: Path
) -> TokenizerWrapper:
"""Load tokenizer for a model given its ID and local path.
"""
Load tokenizer for a model given its ID and local path.
Uses model_type from config.json for architecture detection when available,
falling back to model_id string matching for backward compatibility.
This is the core tokenizer loading logic, handling special cases for different
model families (Kimi, GLM, etc.) and transformers 5.x compatibility.
Args:
model_id: The HuggingFace model ID (e.g., "moonshotai/Kimi-K2-Instruct")
@@ -341,21 +309,11 @@ def load_tokenizer_for_model_id(
Returns:
TokenizerWrapper instance configured for the model
"""
model_type = _read_model_type_from_config(model_path)
model_id_lower = model_id.lower()
eos_token_ids = get_eos_token_ids_for_model(model_id, model_type=model_type)
is_kimi = (
model_type == "kimi" if model_type is not None else "kimi-k2" in model_id_lower
)
is_gemma3 = (
model_type == "gemma3"
if model_type is not None
else "gemma-3" in model_id_lower
)
eos_token_ids = get_eos_token_ids_for_model(model_id)
# Kimi uses a custom TikTokenTokenizer that transformers 5.x can't load via AutoTokenizer
if is_kimi:
if "kimi-k2" in model_id_lower:
import importlib.util
import types
@@ -409,7 +367,7 @@ def load_tokenizer_for_model_id(
eos_token_ids=eos_token_ids,
)
if is_gemma3:
if "gemma-3" in model_id_lower:
gemma_3_eos_id = 1
gemma_3_end_of_turn_id = 106
if tokenizer.eos_token_ids is not None:

View File

@@ -24,7 +24,6 @@ from exo.worker.engines.mlx.utils_mlx import (
# Files needed for tokenizer functionality
TOKENIZER_FILE_PATTERNS = [
"config.json",
"tokenizer.json",
"tokenizer_config.json",
"special_tokens_map.json",
@@ -339,9 +338,6 @@ async def test_kimi_tokenizer_specifically():
# Verify EOS token is set
assert eos_token_ids == [163586], "Kimi EOS token should be [163586]"
# Verify architecture-based detection gives same result
assert get_eos_token_ids_for_model(model_id, model_type="kimi") == [163586]
# Test GLM tokenizer since it also has special handling
@pytest.mark.asyncio
@@ -382,10 +378,3 @@ async def test_glm_tokenizer_specifically():
151329,
151338,
], "GLM EOS tokens should be correct"
# Verify architecture-based detection gives same result
assert get_eos_token_ids_for_model(model_id, model_type="glm4") == [
151336,
151329,
151338,
]