Mirror of https://github.com/exo-explore/exo.git (synced 2026-02-18 14:55:13 -05:00)

Compare commits: fix/respon… ... handle-mes… (7 commits)
| SHA1 |
|---|
| c4d24a24e4 |
| 83af8c63fa |
| eccc6298d1 |
| c8997217cf |
| 490d2e46ba |
| facf2d4d03 |
| a962a28afc |
@@ -26,7 +26,6 @@ from exo.shared.types.openai_responses import (
     ResponseOutputText,
     ResponsesRequest,
     ResponsesResponse,
-    ResponsesStreamEvent,
     ResponseTextDeltaEvent,
     ResponseTextDoneEvent,
     ResponseUsage,
@@ -34,11 +33,6 @@ from exo.shared.types.openai_responses import (
 from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams


-def _format_sse(event: ResponsesStreamEvent) -> str:
-    """Format a streaming event as an SSE message."""
-    return f"event: {event.type}\ndata: {event.model_dump_json()}\n\n"
-
-
 def _extract_content(content: str | list[ResponseContentPart]) -> str:
     """Extract plain text from a content field that may be a string or list of parts."""
     if isinstance(content, str):
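For reference, the removed `_format_sse` helper and the inline f-strings that replace it in the hunks below emit the same Server-Sent Events framing: an `event:` line naming the event type, a `data:` line carrying the event JSON, and a blank line terminating the message. A minimal sketch of that framing and a naive line-based parser, assuming standard `text/event-stream` semantics; `format_sse_message` and `iter_sse_messages` are illustrative names, not part of exo:

```python
import json
from typing import Iterator


def format_sse_message(event_type: str, payload: dict) -> str:
    # Same wire format as the diff's f-strings: "event: <type>\ndata: <json>\n\n".
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def iter_sse_messages(lines: Iterator[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event_type, payload) pairs; a blank line ends each SSE message."""
    event_type, data_lines = None, []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("event: "):
            event_type = line[len("event: "):]
        elif line.startswith("data: "):
            data_lines.append(line[len("data: "):])
        elif line == "" and event_type is not None:
            yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = None, []


if __name__ == "__main__":
    message = format_sse_message("response.created", {"sequence_number": 0})
    print(list(iter_sse_messages(iter(message.splitlines(keepends=True)))))
```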
@@ -213,13 +207,13 @@ async def generate_responses_stream(
     created_event = ResponseCreatedEvent(
         sequence_number=next(seq), response=initial_response
     )
-    yield _format_sse(created_event)
+    yield f"event: response.created\ndata: {created_event.model_dump_json()}\n\n"

     # response.in_progress
     in_progress_event = ResponseInProgressEvent(
         sequence_number=next(seq), response=initial_response
     )
-    yield _format_sse(in_progress_event)
+    yield f"event: response.in_progress\ndata: {in_progress_event.model_dump_json()}\n\n"

     # response.output_item.added
     initial_item = ResponseMessageItem(
@@ -230,7 +224,7 @@ async def generate_responses_stream(
     item_added = ResponseOutputItemAddedEvent(
         sequence_number=next(seq), output_index=0, item=initial_item
     )
-    yield _format_sse(item_added)
+    yield f"event: response.output_item.added\ndata: {item_added.model_dump_json()}\n\n"

     # response.content_part.added
     initial_part = ResponseOutputText(text="")
@@ -241,7 +235,7 @@ async def generate_responses_stream(
         content_index=0,
         part=initial_part,
     )
-    yield _format_sse(part_added)
+    yield f"event: response.content_part.added\ndata: {part_added.model_dump_json()}\n\n"

     accumulated_text = ""
     function_call_items: list[ResponseFunctionCallItem] = []
@@ -272,7 +266,7 @@ async def generate_responses_stream(
                 output_index=next_output_index,
                 item=fc_item,
             )
-            yield _format_sse(fc_added)
+            yield f"event: response.output_item.added\ndata: {fc_added.model_dump_json()}\n\n"

             # response.function_call_arguments.delta
             args_delta = ResponseFunctionCallArgumentsDeltaEvent(
@@ -281,7 +275,7 @@ async def generate_responses_stream(
                 output_index=next_output_index,
                 delta=tool.arguments,
             )
-            yield _format_sse(args_delta)
+            yield f"event: response.function_call_arguments.delta\ndata: {args_delta.model_dump_json()}\n\n"

             # response.function_call_arguments.done
             args_done = ResponseFunctionCallArgumentsDoneEvent(
@@ -291,7 +285,7 @@ async def generate_responses_stream(
                 name=tool.name,
                 arguments=tool.arguments,
             )
-            yield _format_sse(args_done)
+            yield f"event: response.function_call_arguments.done\ndata: {args_done.model_dump_json()}\n\n"

             # response.output_item.done
             fc_done_item = ResponseFunctionCallItem(
@@ -306,7 +300,7 @@ async def generate_responses_stream(
                 output_index=next_output_index,
                 item=fc_done_item,
             )
-            yield _format_sse(fc_item_done)
+            yield f"event: response.output_item.done\ndata: {fc_item_done.model_dump_json()}\n\n"

             function_call_items.append(fc_done_item)
             next_output_index += 1
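On the consumer side, the function-call hunks above arrive as an `output_item.added` announcement followed by `function_call_arguments.delta` fragments and the matching `done` events. A hedged sketch of a client-side accumulator keyed by `output_index`, assuming the serialized events expose the same `output_index`, `delta`, `name`, and `arguments` fields that the constructors in the diff set (the exact wire field names are an assumption):

```python
from dataclasses import dataclass, field


@dataclass
class PendingFunctionCall:
    name: str | None = None
    argument_chunks: list[str] = field(default_factory=list)

    @property
    def arguments(self) -> str:
        return "".join(self.argument_chunks)


def accumulate_function_calls(events: list[tuple[str, dict]]) -> dict[int, PendingFunctionCall]:
    """Fold streamed function-call events into complete calls, keyed by output_index."""
    calls: dict[int, PendingFunctionCall] = {}
    for event_type, payload in events:
        index = payload.get("output_index", 0)
        if event_type == "response.function_call_arguments.delta":
            calls.setdefault(index, PendingFunctionCall()).argument_chunks.append(payload["delta"])
        elif event_type == "response.function_call_arguments.done":
            call = calls.setdefault(index, PendingFunctionCall())
            call.name = payload.get("name", call.name)
            # The done event carries the full argument string, so prefer it over the deltas.
            call.argument_chunks = [payload.get("arguments", call.arguments)]
    return calls
```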
@@ -322,7 +316,7 @@ async def generate_responses_stream(
                 content_index=0,
                 delta=chunk.text,
             )
-            yield _format_sse(delta_event)
+            yield f"event: response.output_text.delta\ndata: {delta_event.model_dump_json()}\n\n"

     # response.output_text.done
     text_done = ResponseTextDoneEvent(
@@ -332,7 +326,7 @@ async def generate_responses_stream(
         content_index=0,
         text=accumulated_text,
     )
-    yield _format_sse(text_done)
+    yield f"event: response.output_text.done\ndata: {text_done.model_dump_json()}\n\n"

     # response.content_part.done
     final_part = ResponseOutputText(text=accumulated_text)
@@ -343,7 +337,7 @@ async def generate_responses_stream(
         content_index=0,
         part=final_part,
     )
-    yield _format_sse(part_done)
+    yield f"event: response.content_part.done\ndata: {part_done.model_dump_json()}\n\n"

     # response.output_item.done
     final_message_item = ResponseMessageItem(
@@ -354,7 +348,7 @@ async def generate_responses_stream(
     item_done = ResponseOutputItemDoneEvent(
         sequence_number=next(seq), output_index=0, item=final_message_item
     )
-    yield _format_sse(item_done)
+    yield f"event: response.output_item.done\ndata: {item_done.model_dump_json()}\n\n"

     # Create usage from usage data if available
     usage = None
@@ -379,4 +373,4 @@ async def generate_responses_stream(
     completed_event = ResponseCompletedEvent(
         sequence_number=next(seq), response=final_response
     )
-    yield _format_sse(completed_event)
+    yield f"event: response.completed\ndata: {completed_event.model_dump_json()}\n\n"
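Taken together, these hunks change only how each chunk is serialized; the event lifecycle a client sees is unchanged: `response.created`, `response.in_progress`, `response.output_item.added`, `response.content_part.added`, then text or function-call deltas with their `done` counterparts, and finally `response.completed`. A hedged sketch of consuming that stream over HTTP; the `httpx` client, the `/v1/responses` path, and the request shape are assumptions, while the event names and `event:`/`data:` framing come from the diff:

```python
import json

import httpx  # assumption: any HTTP client that can stream response lines works the same way


def stream_response_text(base_url: str, payload: dict) -> str:
    """Collect assistant text from a Responses-style SSE stream."""
    text_parts: list[str] = []
    current_event = ""
    with httpx.stream("POST", f"{base_url}/v1/responses", json=payload, timeout=None) as resp:
        for line in resp.iter_lines():
            if line.startswith("event: "):
                current_event = line[len("event: "):]
            elif line.startswith("data: ") and current_event == "response.output_text.delta":
                text_parts.append(json.loads(line[len("data: "):]).get("delta", ""))
            elif line.startswith("data: ") and current_event == "response.completed":
                break
    return "".join(text_parts)
```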
@@ -211,6 +211,14 @@ class Router:
                 pass
             except AllQueuesFullError:
                 logger.warning(f"All peer queues full, dropping message on {topic}")
+            except RuntimeError as e:
+                if "MessageTooLarge" in str(e):
+                    logger.error(
+                        f"Message too large for gossipsub on topic {topic} "
+                        f"({len(data)} bytes), dropping message"
+                    )
+                else:
+                    raise


 def get_node_id_keypair(
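The new branch matches on the exception text because the oversized-publish failure surfaces here only as a `RuntimeError` mentioning `MessageTooLarge`. A complementary guard is to check the payload size before publishing so that path is rarely hit; a minimal sketch, where the size limit and the injected `publish` callable are hypothetical stand-ins rather than exo's API:

```python
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

# Hypothetical cap: gossipsub deployments commonly limit message size (often around 1 MiB),
# but the real limit depends on how the pubsub layer is configured.
MAX_GOSSIPSUB_MESSAGE_BYTES = 1024 * 1024


async def publish_with_size_guard(
    publish: Callable[[str, bytes], Awaitable[None]],
    topic: str,
    data: bytes,
) -> bool:
    """Publish data on topic, dropping oversized payloads up front; returns False if dropped."""
    if len(data) > MAX_GOSSIPSUB_MESSAGE_BYTES:
        logger.error(
            "Message too large for gossipsub on topic %s (%d bytes), dropping message",
            topic,
            len(data),
        )
        return False
    try:
        await publish(topic, data)
    except RuntimeError as e:
        # Same string check as the Router change above: the transport surfaces the
        # failure as a RuntimeError whose message mentions MessageTooLarge.
        if "MessageTooLarge" in str(e):
            logger.error("Message too large for gossipsub on topic %s, dropping message", topic)
            return False
        raise
    return True
```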