mirror of https://github.com/exo-explore/exo.git
synced 2026-02-05 11:43:17 -05:00

Compare commits: 31 commits, all authored by alexcheema.

f94870f2c0, 18928d4d9e, 937da476b0, 1c1c286127, 258785be84, 13a6b9819a,
1733d07cb3, b3e4c9b1e5, 4c74792373, eadb6de1f7, 7ba2408eed, ce4d7f4d43,
6727523eab, a8f81e0495, ba7148ccec, a64b8addc6, e6599a9408, 93f4753598,
75fe505275, d7c044e349, 53b6d56e9f, 7fe0a61230, 5a36542631, 955e0105b3,
4d1eb1d9bd, 365416c65e, 04af76e10f, a84c3431cd, 52445b21f6, 435bd7f6fa,
dd25b5b90e
```diff
@@ -9,7 +9,6 @@
  regenerateFromToken,
  setEditingImage,
} from "$lib/stores/app.svelte";
import type { Message } from "$lib/stores/app.svelte";
import type { MessageAttachment } from "$lib/stores/app.svelte";
import MarkdownContent from "./MarkdownContent.svelte";
import TokenHeatmap from "./TokenHeatmap.svelte";
```
dashboard/src/lib/components/PrefillProgressBar.svelte (new file, 51 lines):

```svelte
<script lang="ts">
  import type { PrefillProgress } from "$lib/stores/app.svelte";

  interface Props {
    progress: PrefillProgress;
    class?: string;
  }

  let { progress, class: className = "" }: Props = $props();

  const percentage = $derived(
    progress.total > 0
      ? Math.round((progress.processed / progress.total) * 100)
      : 0,
  );

  function formatTokenCount(count: number): string {
    if (count >= 1000) {
      return `${(count / 1000).toFixed(1)}k`;
    }
    return count.toString();
  }
</script>

<div class="prefill-progress {className}">
  <div
    class="flex items-center justify-between text-xs text-exo-light-gray mb-1"
  >
    <span>Processing prompt</span>
    <span class="font-mono">
      {formatTokenCount(progress.processed)} / {formatTokenCount(
        progress.total,
      )} tokens
    </span>
  </div>
  <div class="h-1.5 bg-exo-black/60 rounded-full overflow-hidden">
    <div
      class="h-full bg-exo-yellow rounded-full transition-all duration-150 ease-out"
      style="width: {percentage}%"
    ></div>
  </div>
  <div class="text-right text-xs text-exo-light-gray/70 mt-0.5 font-mono">
    {percentage}%
  </div>
</div>

<style>
  .prefill-progress {
    width: 100%;
  }
</style>
```
```diff
@@ -255,6 +255,12 @@ export interface TokenData {
   topLogprobs: TopLogprob[];
 }
 
+export interface PrefillProgress {
+  processed: number;
+  total: number;
+}
+
+
 export interface Message {
   id: string;
   role: "user" | "assistant" | "system";
```
```diff
@@ -19,7 +19,12 @@ from exo.shared.types.api import (
     StreamingChoiceResponse,
     ToolCall,
 )
-from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
+from exo.shared.types.chunks import (
+    ErrorChunk,
+    PrefillProgressData,
+    TokenChunk,
+    ToolCallChunk,
+)
 from exo.shared.types.common import CommandId
 from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
 
@@ -122,55 +127,65 @@ def chunk_to_response(
 
 async def generate_chat_stream(
     command_id: CommandId,
-    chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
+    event_stream: AsyncGenerator[
+        PrefillProgressData | ErrorChunk | ToolCallChunk | TokenChunk, None
+    ],
 ) -> AsyncGenerator[str, None]:
-    """Generate Chat Completions API streaming events from chunks."""
-    async for chunk in chunk_stream:
-        if isinstance(chunk, ErrorChunk):
-            error_response = ErrorResponse(
-                error=ErrorInfo(
-                    message=chunk.error_message or "Internal server error",
-                    type="InternalServerError",
-                    code=500,
-                )
-            )
-            yield f"data: {error_response.model_dump_json()}\n\n"
-            yield "data: [DONE]\n\n"
-            return
-
-        if isinstance(chunk, ToolCallChunk):
-            tool_call_deltas = [
-                ToolCall(
-                    id=str(uuid4()),
-                    index=i,
-                    function=tool,
-                )
-                for i, tool in enumerate(chunk.tool_calls)
-            ]
-            tool_response = ChatCompletionResponse(
-                id=command_id,
-                created=int(time.time()),
-                model=chunk.model,
-                choices=[
-                    StreamingChoiceResponse(
-                        index=0,
-                        delta=ChatCompletionMessage(
-                            role="assistant",
-                            tool_calls=tool_call_deltas,
-                        ),
-                        finish_reason="tool_calls",
-                    )
-                ],
-            )
-            yield f"data: {tool_response.model_dump_json()}\n\n"
-            yield "data: [DONE]\n\n"
-            return
-
-        chunk_response = chunk_to_response(chunk, command_id)
-        yield f"data: {chunk_response.model_dump_json()}\n\n"
-
-        if chunk.finish_reason is not None:
-            yield "data: [DONE]\n\n"
+    """Generate Chat Completions API streaming events from StreamEvents.
+
+    Handles PrefillProgressData, ErrorChunk, ToolCallChunk, and TokenChunk.
+    """
+    async for event in event_stream:
+        match event:
+            case PrefillProgressData():
+                yield f"event: prefill_progress\ndata: {event.model_dump_json()}\n\n"
+
+            case ErrorChunk():
+                error_response = ErrorResponse(
+                    error=ErrorInfo(
+                        message=event.error_message or "Internal server error",
+                        type="InternalServerError",
+                        code=500,
+                    )
+                )
+                yield f"data: {error_response.model_dump_json()}\n\n"
+                yield "data: [DONE]\n\n"
+                return
+
+            case ToolCallChunk():
+                tool_call_deltas = [
+                    ToolCall(
+                        id=str(uuid4()),
+                        index=i,
+                        function=tool,
+                    )
+                    for i, tool in enumerate(event.tool_calls)
+                ]
+                tool_response = ChatCompletionResponse(
+                    id=command_id,
+                    created=int(time.time()),
+                    model=event.model,
+                    choices=[
+                        StreamingChoiceResponse(
+                            index=0,
+                            delta=ChatCompletionMessage(
+                                role="assistant",
+                                tool_calls=tool_call_deltas,
+                            ),
+                            finish_reason="tool_calls",
+                        )
+                    ],
+                )
+                yield f"data: {tool_response.model_dump_json()}\n\n"
+                yield "data: [DONE]\n\n"
+                return
+
+            case TokenChunk():
+                chunk_response = chunk_to_response(event, command_id)
+                yield f"data: {chunk_response.model_dump_json()}\n\n"
+
+                if event.finish_reason is not None:
+                    yield "data: [DONE]\n\n"
 
 
 async def collect_chat_response(
```
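On the wire, prefill progress arrives as a named SSE event (`event: prefill_progress`) interleaved with the usual `data:` chunks of the Chat Completions stream, and `data: [DONE]` still terminates the stream. A minimal consumer sketch, assuming a local exo API at `http://localhost:52415/v1/chat/completions` and a placeholder model id (both are assumptions for illustration, not taken from this diff):

```python
# Minimal sketch: read the SSE stream and split prefill-progress events from token chunks.
# The host/port, endpoint path, and model id below are assumptions for illustration.
import json

import httpx

with httpx.stream(
    "POST",
    "http://localhost:52415/v1/chat/completions",
    json={
        "model": "some-model-id",
        "messages": [{"role": "user", "content": "hi"}],
        "stream": True,
    },
    timeout=None,
) as response:
    current_event = "message"  # SSE default event name when no "event:" line is sent
    for line in response.iter_lines():
        if line.startswith("event: "):
            current_event = line.removeprefix("event: ")
        elif line.startswith("data: "):
            payload = line.removeprefix("data: ")
            if payload == "[DONE]":
                break
            if current_event == "prefill_progress":
                progress = json.loads(payload)
                print(f"prefill: {progress['processed_tokens']}/{progress['total_tokens']}")
            else:
                chunk = json.loads(payload)  # regular Chat Completions streaming chunk
            current_event = "message"  # reset after each data frame
```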
```diff
@@ -103,6 +103,7 @@ from exo.shared.types.chunks import (
     ErrorChunk,
     ImageChunk,
     InputImageChunk,
+    PrefillProgressData,
     TokenChunk,
     ToolCallChunk,
 )
@@ -132,6 +133,7 @@ from exo.shared.types.events import (
     Event,
     ForwarderEvent,
     IndexedEvent,
+    PrefillProgress,
     TracesMerged,
 )
 from exo.shared.types.memory import Memory
@@ -213,7 +215,8 @@ class API:
         )
 
         self._text_generation_queues: dict[
-            CommandId, Sender[TokenChunk | ErrorChunk | ToolCallChunk]
+            CommandId,
+            Sender[TokenChunk | ErrorChunk | ToolCallChunk | PrefillProgressData],
         ] = {}
         self._image_generation_queues: dict[
             CommandId, Sender[ImageChunk | ErrorChunk]
@@ -510,22 +513,27 @@ class API:
             instance_id=instance_id,
         )
 
-    async def _token_chunk_stream(
+    async def _stream_events(
         self, command_id: CommandId
-    ) -> AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None]:
-        """Yield chunks for a given command until completion.
+    ) -> AsyncGenerator[
+        TokenChunk | ErrorChunk | ToolCallChunk | PrefillProgressData, None
+    ]:
+        """Yield stream events for a command.
 
         This is the internal low-level stream used by all API adapters.
         """
         try:
             self._text_generation_queues[command_id], recv = channel[
-                ErrorChunk | ToolCallChunk | TokenChunk
+                TokenChunk | ErrorChunk | ToolCallChunk | PrefillProgressData
             ]()
 
-            with recv as token_chunks:
-                async for chunk in token_chunks:
-                    yield chunk
-                    if chunk.finish_reason is not None:
+            with recv as events:
+                async for event in events:
+                    yield event
+                    if (
+                        isinstance(event, TokenChunk)
+                        and event.finish_reason is not None
+                    ):
                         break
 
         except anyio.get_cancelled_exc_class():
@@ -542,6 +550,14 @@ class API:
             if command_id in self._text_generation_queues:
                 del self._text_generation_queues[command_id]
 
+    async def _chunk_stream(
+        self, command_id: CommandId
+    ) -> AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None]:
+        """Yield chunks, filtering out prefill progress events."""
+        async for event in self._stream_events(command_id):
+            if not isinstance(event, PrefillProgressData):
+                yield event
+
     async def _collect_text_generation_with_stats(
         self, command_id: CommandId
     ) -> BenchChatCompletionResponse:
@@ -552,7 +568,7 @@ class API:
 
         stats: GenerationStats | None = None
 
-        async for chunk in self._token_chunk_stream(command_id):
+        async for chunk in self._chunk_stream(command_id):
             if chunk.finish_reason == "error":
                 raise HTTPException(
                     status_code=500,
@@ -624,7 +640,7 @@ class API:
             return StreamingResponse(
                 generate_chat_stream(
                     command.command_id,
-                    self._token_chunk_stream(command.command_id),
+                    self._stream_events(command.command_id),
                 ),
                 media_type="text/event-stream",
                 headers={
@@ -634,10 +650,13 @@ class API:
                 },
             )
 
-        return await collect_chat_response(
-            command.command_id,
-            self._token_chunk_stream(command.command_id),
-        )
+        try:
+            return await collect_chat_response(
+                command.command_id,
+                self._chunk_stream(command.command_id),
+            )
+        except ValueError as e:
+            raise HTTPException(status_code=500, detail=str(e)) from e
 
     async def bench_chat_completions(
         self, payload: BenchChatCompletionRequest
@@ -1185,7 +1204,7 @@ class API:
                 generate_claude_stream(
                     command.command_id,
                     payload.model,
-                    self._token_chunk_stream(command.command_id),
+                    self._chunk_stream(command.command_id),
                 ),
                 media_type="text/event-stream",
                 headers={
@@ -1195,11 +1214,14 @@ class API:
                 },
             )
 
-        return await collect_claude_response(
-            command.command_id,
-            payload.model,
-            self._token_chunk_stream(command.command_id),
-        )
+        try:
+            return await collect_claude_response(
+                command.command_id,
+                payload.model,
+                self._chunk_stream(command.command_id),
+            )
+        except ValueError as e:
+            raise HTTPException(status_code=500, detail=str(e)) from e
 
     async def openai_responses(
         self, payload: ResponsesRequest
@@ -1217,7 +1239,7 @@ class API:
                 generate_responses_stream(
                     command.command_id,
                     payload.model,
-                    self._token_chunk_stream(command.command_id),
+                    self._chunk_stream(command.command_id),
                 ),
                 media_type="text/event-stream",
                 headers={
@@ -1227,11 +1249,14 @@ class API:
                 },
             )
 
-        return await collect_responses_response(
-            command.command_id,
-            payload.model,
-            self._token_chunk_stream(command.command_id),
-        )
+        try:
+            return await collect_responses_response(
+                command.command_id,
+                payload.model,
+                self._chunk_stream(command.command_id),
+            )
+        except ValueError as e:
+            raise HTTPException(status_code=500, detail=str(e)) from e
 
     def _calculate_total_available_memory(self) -> Memory:
         """Calculate total available memory across all nodes in bytes."""
@@ -1373,6 +1398,20 @@ class API:
                     except BrokenResourceError:
                         self._text_generation_queues.pop(event.command_id, None)
 
+            elif isinstance(event, PrefillProgress):
+                if queue := self._text_generation_queues.get(
+                    event.command_id, None
+                ):
+                    try:
+                        await queue.send(
+                            PrefillProgressData(
+                                processed_tokens=event.processed_tokens,
+                                total_tokens=event.total_tokens,
+                            )
+                        )
+                    except BrokenResourceError:
+                        self._text_generation_queues.pop(event.command_id, None)
+
             if isinstance(event, TracesMerged):
                 self._save_merged_trace(event)
 
```
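The new split keeps progress reporting a streaming-only concern: `_stream_events` is the raw per-command stream, while `_chunk_stream` drops `PrefillProgressData` so collectors (`collect_chat_response`, the Claude and Responses collectors, and the bench path) never see it. A self-contained sketch of that filtering contract, using stand-in dataclasses rather than exo's real types:

```python
# Illustrative sketch of the _chunk_stream filtering idea: progress events are internal
# to the SSE path and must not leak into chunk consumers. Types here are stand-ins.
import asyncio
from dataclasses import dataclass


@dataclass
class TokenChunk:
    text: str
    finish_reason: str | None = None


@dataclass
class PrefillProgressData:
    processed_tokens: int
    total_tokens: int


async def stream_events():
    # Mixed stream as the API would see it: progress first, then tokens.
    yield PrefillProgressData(1024, 4096)
    yield PrefillProgressData(4096, 4096)
    yield TokenChunk("Hello")
    yield TokenChunk("", finish_reason="stop")


async def chunk_stream():
    # Same shape as the new _chunk_stream: drop progress, pass everything else through.
    async for event in stream_events():
        if not isinstance(event, PrefillProgressData):
            yield event


async def main():
    chunks = [c async for c in chunk_stream()]
    assert all(isinstance(c, TokenChunk) for c in chunks)
    print(chunks)


asyncio.run(main())
```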
```diff
@@ -15,6 +15,7 @@ from exo.shared.types.events import (
     NodeDownloadProgress,
     NodeGatheredInfo,
     NodeTimedOut,
+    PrefillProgress,
     RunnerDeleted,
     RunnerStatusUpdated,
     TaskAcknowledged,
@@ -61,6 +62,7 @@ def event_apply(event: Event, state: State) -> State:
             | ChunkGenerated()
             | TaskAcknowledged()
             | InputChunkReceived()
+            | PrefillProgress()
             | TracesCollected()
             | TracesMerged()
         ): # Pass-through events that don't modify state
```
```diff
@@ -77,3 +77,13 @@ class InputImageChunk(BaseChunk):
 
 
 GenerationChunk = TokenChunk | ImageChunk | ToolCallChunk | ErrorChunk
+
+
+class PrefillProgressData(TaggedModel):
+    """Data class for prefill progress events during streaming."""
+
+    processed_tokens: int
+    total_tokens: int
+
+
+StreamEvent = TokenChunk | PrefillProgressData
```
```diff
@@ -102,6 +102,12 @@ class InputChunkReceived(BaseEvent):
     chunk: InputImageChunk
 
 
+class PrefillProgress(BaseEvent):
+    command_id: CommandId
+    processed_tokens: int
+    total_tokens: int
+
+
 class TopologyEdgeCreated(BaseEvent):
     conn: Connection
 
@@ -148,6 +154,7 @@ Event = (
     | NodeDownloadProgress
     | ChunkGenerated
     | InputChunkReceived
+    | PrefillProgress
     | TopologyEdgeCreated
     | TopologyEdgeDeleted
     | TracesCollected
```
```diff
@@ -66,3 +66,8 @@ class ToolCallResponse(BaseRunnerResponse):
 
 class FinishedResponse(BaseRunnerResponse):
     pass
+
+
+class PrefillProgressResponse(BaseRunnerResponse):
+    processed_tokens: int
+    total_tokens: int
```
```diff
@@ -79,7 +79,7 @@ def prefill(
         max_tokens=1,
         sampler=sampler,
         prompt_cache=cache,
-        prefill_step_size=2048,
+        prefill_step_size=1024,
         kv_group_size=KV_GROUP_SIZE,
         kv_bits=KV_BITS,
         prompt_progress_callback=progress_callback,
@@ -127,7 +127,7 @@
         max_tokens=50,
         sampler=sampler,
         prompt_cache=cache,
-        prefill_step_size=2048,
+        prefill_step_size=1024,
         kv_group_size=KV_GROUP_SIZE,
         kv_bits=KV_BITS,
     ):
@@ -221,6 +221,7 @@ def mlx_generate(
     task: TextGenerationTaskParams,
     prompt: str,
     kv_prefix_cache: KVPrefixCache | None = None,
+    on_prefill_progress: Callable[[int, int], None] | None = None,
 ) -> Generator[GenerationResponse]:
     # Ensure that generation stats only contains peak memory for this generation
     mx.reset_peak_memory()
@@ -292,9 +293,10 @@
             logits_processors=logits_processors,
             prompt_cache=caches,
             # TODO: Dynamically change prefill step size to be the maximum possible without timing out.
-            prefill_step_size=2048,
+            prefill_step_size=1024,
             kv_group_size=KV_GROUP_SIZE,
             kv_bits=KV_BITS,
+            prompt_progress_callback=on_prefill_progress,
         ),
         start=1,
     ):
```
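The prompt is prefilled in chunks of `prefill_step_size` tokens, with `prompt_progress_callback(processed, total)` reported as chunks complete, so the step size sets the granularity of progress updates; dropping it from 2048 to 1024 roughly doubles the number of updates on long prompts. A rough standalone sketch of that cadence (pure arithmetic; the exact callback timing inside mlx_lm may differ):

```python
# Rough illustration of how many progress callbacks a prompt of a given length produces.
# This mirrors only the chunked-prefill idea, not mlx_lm's exact behaviour.
import math


def expected_progress_updates(prompt_tokens: int, prefill_step_size: int) -> int:
    """Approximate number of prefill steps, i.e. callback invocations."""
    return math.ceil(prompt_tokens / prefill_step_size)


for prompt_tokens in (800, 4_096, 32_000):
    old = expected_progress_updates(prompt_tokens, 2048)
    new = expected_progress_updates(prompt_tokens, 1024)
    print(f"{prompt_tokens} prompt tokens: ~{old} updates at 2048, ~{new} at 1024")
```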
```diff
@@ -25,6 +25,7 @@ from exo.shared.types.common import CommandId
 from exo.shared.types.events import (
     ChunkGenerated,
     Event,
+    PrefillProgress,
     RunnerStatusUpdated,
     TaskAcknowledged,
     TaskStatusUpdated,
@@ -261,6 +262,17 @@ def main(
             assert model and not isinstance(model, DistributedImageModel)
             assert tokenizer
 
+            # Define callback to send prefill progress events directly
+            def on_prefill_progress(processed: int, total: int) -> None:
+                if device_rank == 0:
+                    event_sender.send(
+                        PrefillProgress(
+                            command_id=command_id,
+                            processed_tokens=processed,
+                            total_tokens=total,
+                        )
+                    )
+
             try:
                 _check_for_debug_prompts(task_params)
 
@@ -274,6 +286,7 @@ def main(
                     task=task_params,
                     prompt=prompt,
                     kv_prefix_cache=kv_prefix_cache,
+                    on_prefill_progress=on_prefill_progress,
                 )
 
                 # For other thinking models (GLM, etc.), check if we need to
```
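Only rank 0 forwards progress, so each completed prefill step on the lead device becomes one `PrefillProgress` event on the runner's event stream. If that cadence ever needs damping, the callback could be wrapped before it is handed to `mlx_generate`; the helper below is a hypothetical refinement for illustration, not part of this change:

```python
# Hypothetical helper: wrap a (processed, total) progress callback so it fires at most
# once per `min_interval` seconds, while always letting the final update through.
import time
from typing import Callable


def throttle_progress(
    callback: Callable[[int, int], None], min_interval: float = 0.25
) -> Callable[[int, int], None]:
    last_emit = 0.0

    def wrapped(processed: int, total: int) -> None:
        nonlocal last_emit
        now = time.monotonic()
        # Always emit the final update; otherwise rate-limit to min_interval.
        if processed >= total or now - last_emit >= min_interval:
            last_emit = now
            callback(processed, total)

    return wrapped


# Usage sketch: on_prefill_progress = throttle_progress(on_prefill_progress)
```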