mirror of
https://github.com/exo-explore/exo.git
synced 2026-02-27 19:56:25 -05:00
Compare commits
1 Commits
main
...
leo/prepar
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
76260fb36b |
@@ -1,6 +1,10 @@
|
||||
[workspace]
|
||||
resolver = "3"
|
||||
members = ["rust/networking", "rust/exo_pyo3_bindings", "rust/util"]
|
||||
members = [
|
||||
"rust/networking",
|
||||
"rust/exo_pyo3_bindings",
|
||||
"rust/util",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "0.0.1"
|
||||
|
||||
@@ -2,4 +2,6 @@
|
||||
#
|
||||
# Lists the suite files to include. Each file defines benchmarks
|
||||
# with shared constraints, topology, and default args.
|
||||
include = ["single-m3-ultra.toml"]
|
||||
include = [
|
||||
"single-m3-ultra.toml",
|
||||
]
|
||||
|
||||
@@ -4,13 +4,13 @@ version = "0.1.0"
|
||||
description = "Benchmarking tool for exo distributed inference"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"httpx>=0.27.0",
|
||||
"loguru>=0.7.3",
|
||||
"transformers>=5.0.0",
|
||||
"huggingface-hub>=0.33.4",
|
||||
"tiktoken>=0.12.0",
|
||||
"jinja2>=3.1.0",
|
||||
"protobuf>=5.29.0",
|
||||
"httpx>=0.27.0",
|
||||
"loguru>=0.7.3",
|
||||
"transformers>=5.0.0",
|
||||
"huggingface-hub>=0.33.4",
|
||||
"tiktoken>=0.12.0",
|
||||
"jinja2>=3.1.0",
|
||||
"protobuf>=5.29.0",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
#
|
||||
# Shared constraints applied to ALL benchmarks in this file.
|
||||
constraints = [
|
||||
"All(MacOsBuild(=25D125))",
|
||||
"Hosts(=1)",
|
||||
"All(Chip(m3_ultra))",
|
||||
"All(GpuCores(=80))",
|
||||
"All(MacOsBuild(=25D125))",
|
||||
"Hosts(=1)",
|
||||
"All(Chip(m3_ultra))",
|
||||
"All(GpuCores(=80))",
|
||||
]
|
||||
|
||||
[topology]
|
||||
|
||||
@@ -3158,23 +3158,6 @@ class AppStore {
|
||||
return (await response.json()) as TraceStatsResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete traces by task IDs
|
||||
*/
|
||||
async deleteTraces(
|
||||
taskIds: string[],
|
||||
): Promise<{ deleted: string[]; notFound: string[] }> {
|
||||
const response = await fetch("/v1/traces/delete", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ taskIds }),
|
||||
});
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to delete traces: ${response.status}`);
|
||||
}
|
||||
return await response.json();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the URL for the raw trace file (for Perfetto)
|
||||
*/
|
||||
@@ -3318,5 +3301,3 @@ export const fetchTraceStats = (taskId: string) =>
|
||||
appStore.fetchTraceStats(taskId);
|
||||
export const getTraceRawUrl = (taskId: string) =>
|
||||
appStore.getTraceRawUrl(taskId);
|
||||
export const deleteTraces = (taskIds: string[]) =>
|
||||
appStore.deleteTraces(taskIds);
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
import {
|
||||
listTraces,
|
||||
getTraceRawUrl,
|
||||
deleteTraces,
|
||||
type TraceListItem,
|
||||
} from "$lib/stores/app.svelte";
|
||||
import HeaderNav from "$lib/components/HeaderNav.svelte";
|
||||
@@ -11,51 +10,6 @@
|
||||
let traces = $state<TraceListItem[]>([]);
|
||||
let loading = $state(true);
|
||||
let error = $state<string | null>(null);
|
||||
let selectedIds = $state<Set<string>>(new Set());
|
||||
let deleting = $state(false);
|
||||
|
||||
let allSelected = $derived(
|
||||
traces.length > 0 && selectedIds.size === traces.length,
|
||||
);
|
||||
|
||||
function toggleSelect(taskId: string) {
|
||||
const next = new Set(selectedIds);
|
||||
if (next.has(taskId)) {
|
||||
next.delete(taskId);
|
||||
} else {
|
||||
next.add(taskId);
|
||||
}
|
||||
selectedIds = next;
|
||||
}
|
||||
|
||||
function toggleSelectAll() {
|
||||
if (allSelected) {
|
||||
selectedIds = new Set();
|
||||
} else {
|
||||
selectedIds = new Set(traces.map((t) => t.taskId));
|
||||
}
|
||||
}
|
||||
|
||||
async function handleDelete() {
|
||||
if (selectedIds.size === 0) return;
|
||||
const count = selectedIds.size;
|
||||
if (
|
||||
!confirm(
|
||||
`Delete ${count} trace${count === 1 ? "" : "s"}? This cannot be undone.`,
|
||||
)
|
||||
)
|
||||
return;
|
||||
deleting = true;
|
||||
try {
|
||||
await deleteTraces([...selectedIds]);
|
||||
selectedIds = new Set();
|
||||
await refresh();
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : "Failed to delete traces";
|
||||
} finally {
|
||||
deleting = false;
|
||||
}
|
||||
}
|
||||
|
||||
function formatBytes(bytes: number): string {
|
||||
if (!bytes || bytes <= 0) return "0B";
|
||||
@@ -155,16 +109,6 @@
|
||||
</h1>
|
||||
</div>
|
||||
<div class="flex items-center gap-3">
|
||||
{#if selectedIds.size > 0}
|
||||
<button
|
||||
type="button"
|
||||
class="text-xs font-mono text-red-400 hover:text-red-300 transition-colors uppercase border border-red-500/40 px-2 py-1 rounded"
|
||||
onclick={handleDelete}
|
||||
disabled={deleting}
|
||||
>
|
||||
{deleting ? "Deleting..." : `Delete (${selectedIds.size})`}
|
||||
</button>
|
||||
{/if}
|
||||
<button
|
||||
type="button"
|
||||
class="text-xs font-mono text-exo-light-gray hover:text-exo-yellow transition-colors uppercase border border-exo-medium-gray/40 px-2 py-1 rounded"
|
||||
@@ -199,41 +143,14 @@
|
||||
</div>
|
||||
{:else}
|
||||
<div class="space-y-3">
|
||||
<div class="flex items-center gap-2 px-1">
|
||||
<button
|
||||
type="button"
|
||||
class="text-xs font-mono uppercase transition-colors {allSelected
|
||||
? 'text-exo-yellow'
|
||||
: 'text-exo-light-gray hover:text-exo-yellow'}"
|
||||
onclick={toggleSelectAll}
|
||||
>
|
||||
{allSelected ? "Deselect all" : "Select all"}
|
||||
</button>
|
||||
</div>
|
||||
{#each traces as trace}
|
||||
{@const isSelected = selectedIds.has(trace.taskId)}
|
||||
<!-- svelte-ignore a11y_no_static_element_interactions -->
|
||||
<div
|
||||
role="button"
|
||||
tabindex="0"
|
||||
class="w-full text-left rounded border-l-2 border-r border-t border-b transition-all p-4 flex items-center justify-between gap-4 cursor-pointer {isSelected
|
||||
? 'bg-exo-yellow/10 border-l-exo-yellow border-r-exo-medium-gray/30 border-t-exo-medium-gray/30 border-b-exo-medium-gray/30'
|
||||
: 'bg-exo-black/30 border-l-transparent border-r-exo-medium-gray/30 border-t-exo-medium-gray/30 border-b-exo-medium-gray/30 hover:bg-white/[0.03]'}"
|
||||
onclick={() => toggleSelect(trace.taskId)}
|
||||
onkeydown={(e) => {
|
||||
if (e.key === "Enter" || e.key === " ") {
|
||||
e.preventDefault();
|
||||
toggleSelect(trace.taskId);
|
||||
}
|
||||
}}
|
||||
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-4 flex items-center justify-between gap-4"
|
||||
>
|
||||
<div class="min-w-0 flex-1">
|
||||
<a
|
||||
href="#/traces/{trace.taskId}"
|
||||
class="text-sm font-mono transition-colors truncate block {isSelected
|
||||
? 'text-exo-yellow'
|
||||
: 'text-white hover:text-exo-yellow'}"
|
||||
onclick={(e) => e.stopPropagation()}
|
||||
class="text-sm font-mono text-white hover:text-exo-yellow transition-colors truncate block"
|
||||
>
|
||||
{trace.taskId}
|
||||
</a>
|
||||
@@ -243,11 +160,7 @@
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
<!-- svelte-ignore a11y_click_events_have_key_events -->
|
||||
<div
|
||||
class="flex items-center gap-2 shrink-0"
|
||||
onclick={(e) => e.stopPropagation()}
|
||||
>
|
||||
<div class="flex items-center gap-2 shrink-0">
|
||||
<a
|
||||
href="#/traces/{trace.taskId}"
|
||||
class="text-xs font-mono text-exo-light-gray hover:text-exo-yellow transition-colors uppercase border border-exo-medium-gray/40 px-2 py-1 rounded"
|
||||
|
||||
@@ -108,7 +108,6 @@
|
||||
package = pkgsSwift.swiftPackages.swift-format;
|
||||
};
|
||||
shfmt.enable = true;
|
||||
taplo.enable = true;
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
100
pyproject.toml
100
pyproject.toml
@@ -5,31 +5,31 @@ description = "Exo"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"aiofiles>=24.1.0",
|
||||
"aiohttp>=3.12.14",
|
||||
"types-aiofiles>=24.1.0.20250708",
|
||||
"pydantic>=2.11.7",
|
||||
"fastapi>=0.116.1",
|
||||
"filelock>=3.18.0",
|
||||
"rustworkx>=0.17.1",
|
||||
"huggingface-hub>=0.33.4",
|
||||
"psutil>=7.0.0",
|
||||
"loguru>=0.7.3",
|
||||
"exo_pyo3_bindings", # rust bindings
|
||||
"anyio==4.11.0",
|
||||
"mlx; sys_platform == 'darwin'",
|
||||
"mlx[cpu]==0.30.6; sys_platform == 'linux'",
|
||||
"mlx-lm==0.30.7",
|
||||
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
|
||||
"hypercorn>=0.18.0",
|
||||
"openai-harmony>=0.0.8",
|
||||
"httpx>=0.28.1",
|
||||
"tomlkit>=0.14.0",
|
||||
"pillow>=11.0,<12.0", # compatibility with mflux
|
||||
"mflux==0.15.5",
|
||||
"python-multipart>=0.0.21",
|
||||
"msgspec>=0.19.0",
|
||||
"zstandard>=0.23.0",
|
||||
"aiofiles>=24.1.0",
|
||||
"aiohttp>=3.12.14",
|
||||
"types-aiofiles>=24.1.0.20250708",
|
||||
"pydantic>=2.11.7",
|
||||
"fastapi>=0.116.1",
|
||||
"filelock>=3.18.0",
|
||||
"rustworkx>=0.17.1",
|
||||
"huggingface-hub>=0.33.4",
|
||||
"psutil>=7.0.0",
|
||||
"loguru>=0.7.3",
|
||||
"exo_pyo3_bindings", # rust bindings
|
||||
"anyio==4.11.0",
|
||||
"mlx; sys_platform == 'darwin'",
|
||||
"mlx[cpu]==0.30.6; sys_platform == 'linux'",
|
||||
"mlx-lm==0.30.7",
|
||||
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
|
||||
"hypercorn>=0.18.0",
|
||||
"openai-harmony>=0.0.8",
|
||||
"httpx>=0.28.1",
|
||||
"tomlkit>=0.14.0",
|
||||
"pillow>=11.0,<12.0", # compatibility with mflux
|
||||
"mflux==0.15.5",
|
||||
"python-multipart>=0.0.21",
|
||||
"msgspec>=0.19.0",
|
||||
"zstandard>=0.23.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
@@ -38,12 +38,12 @@ exo = "exo.main:main"
|
||||
# dependencies only required for development
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"basedpyright>=1.29.0",
|
||||
"pyinstaller>=6.17.0",
|
||||
"pytest>=8.4.0",
|
||||
"pytest-asyncio>=1.0.0",
|
||||
"pytest-env",
|
||||
"ruff>=0.11.13",
|
||||
"basedpyright>=1.29.0",
|
||||
"pyinstaller>=6.17.0",
|
||||
"pytest>=8.4.0",
|
||||
"pytest-asyncio>=1.0.0",
|
||||
"pytest-env",
|
||||
"ruff>=0.11.13",
|
||||
]
|
||||
|
||||
# mlx[cuda] requires a newer version of mlx. the ideal on linux is: default to mlx[cpu] unless[cuda] specified.
|
||||
@@ -57,7 +57,10 @@ dev = [
|
||||
###
|
||||
|
||||
[tool.uv.workspace]
|
||||
members = ["rust/exo_pyo3_bindings", "bench"]
|
||||
members = [
|
||||
"rust/exo_pyo3_bindings",
|
||||
"bench",
|
||||
]
|
||||
|
||||
[tool.uv.sources]
|
||||
exo_pyo3_bindings = { workspace = true }
|
||||
@@ -92,15 +95,7 @@ reportUnnecessaryTypeIgnoreComment = "error"
|
||||
pythonVersion = "3.13"
|
||||
pythonPlatform = "Darwin"
|
||||
|
||||
exclude = [
|
||||
"**/.venv",
|
||||
"**/venv",
|
||||
"**/__pycache__",
|
||||
"**/exo_scripts",
|
||||
"**/.direnv",
|
||||
"**/rust",
|
||||
"**/.github",
|
||||
]
|
||||
exclude = ["**/.venv", "**/venv", "**/__pycache__", "**/exo_scripts", "**/.direnv", "**/rust", "**/.github"]
|
||||
stubPath = ".mlx_typings"
|
||||
|
||||
[[tool.basedpyright.executionEnvironments]]
|
||||
@@ -114,18 +109,17 @@ root = "src"
|
||||
[tool.uv]
|
||||
required-version = ">=0.8.6"
|
||||
prerelease = "allow"
|
||||
environments = ["sys_platform == 'darwin'", "sys_platform == 'linux'"]
|
||||
environments = [
|
||||
"sys_platform == 'darwin'",
|
||||
"sys_platform == 'linux'",
|
||||
]
|
||||
|
||||
###
|
||||
# ruff configuration
|
||||
###
|
||||
|
||||
[tool.ruff]
|
||||
extend-exclude = [
|
||||
"shared/protobufs/**",
|
||||
"*mlx_typings/**",
|
||||
"rust/exo_pyo3_bindings/**",
|
||||
]
|
||||
extend-exclude = ["shared/protobufs/**", "*mlx_typings/**", "rust/exo_pyo3_bindings/**"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
extend-select = ["I", "N", "B", "A", "PIE", "SIM"]
|
||||
@@ -133,7 +127,13 @@ extend-select = ["I", "N", "B", "A", "PIE", "SIM"]
|
||||
[tool.pytest.ini_options]
|
||||
pythonpath = "."
|
||||
asyncio_mode = "auto"
|
||||
markers = ["slow: marks tests as slow (deselected by default)"]
|
||||
env = ["EXO_TESTS=1"]
|
||||
markers = [
|
||||
"slow: marks tests as slow (deselected by default)"
|
||||
]
|
||||
env = [
|
||||
"EXO_TESTS=1"
|
||||
]
|
||||
addopts = "-m 'not slow' --ignore=tests/start_distributed_test.py"
|
||||
filterwarnings = ["ignore:builtin type Swig:DeprecationWarning"]
|
||||
filterwarnings = [
|
||||
"ignore:builtin type Swig:DeprecationWarning",
|
||||
]
|
||||
|
||||
@@ -26,24 +26,20 @@ networking = { workspace = true }
|
||||
|
||||
# interop
|
||||
pyo3 = { version = "0.27.2", features = [
|
||||
# "abi3-py313", # tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.13
|
||||
# "nightly", # enables better-supported GIL integration
|
||||
"experimental-async", # async support in #[pyfunction] & #[pymethods]
|
||||
#"experimental-inspect", # inspection of generated binary => easier to automate type-hint generation
|
||||
#"py-clone", # adding Clone-ing of `Py<T>` without GIL (may cause panics - remove if panics happen)
|
||||
# "multiple-pymethods", # allows multiple #[pymethods] sections per class
|
||||
# "abi3-py313", # tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.13
|
||||
# "nightly", # enables better-supported GIL integration
|
||||
"experimental-async", # async support in #[pyfunction] & #[pymethods]
|
||||
#"experimental-inspect", # inspection of generated binary => easier to automate type-hint generation
|
||||
#"py-clone", # adding Clone-ing of `Py<T>` without GIL (may cause panics - remove if panics happen)
|
||||
# "multiple-pymethods", # allows multiple #[pymethods] sections per class
|
||||
|
||||
# integrations with other libraries
|
||||
# "arc_lock", "bigdecimal", "either", "hashbrown", "indexmap", "num-bigint", "num-complex", "num-rational",
|
||||
# "ordered-float", "rust_decimal", "smallvec",
|
||||
# "anyhow", "chrono", "chrono-local", "chrono-tz", "eyre", "jiff-02", "lock_api", "parking-lot", "time", "serde",
|
||||
# integrations with other libraries
|
||||
# "arc_lock", "bigdecimal", "either", "hashbrown", "indexmap", "num-bigint", "num-complex", "num-rational",
|
||||
# "ordered-float", "rust_decimal", "smallvec",
|
||||
# "anyhow", "chrono", "chrono-local", "chrono-tz", "eyre", "jiff-02", "lock_api", "parking-lot", "time", "serde",
|
||||
] }
|
||||
pyo3-stub-gen = { version = "0.17.2" }
|
||||
pyo3-async-runtimes = { version = "0.27.0", features = [
|
||||
"attributes",
|
||||
"tokio-runtime",
|
||||
"testing",
|
||||
] }
|
||||
pyo3-async-runtimes = { version = "0.27.0", features = ["attributes", "tokio-runtime", "testing"] }
|
||||
pyo3-log = "0.13.2"
|
||||
|
||||
# macro dependencies
|
||||
|
||||
@@ -8,14 +8,18 @@ version = "0.2.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
authors = [
|
||||
{ name = "Andrei Cravtov", email = "the.andrei.cravtov@gmail.com" },
|
||||
{ name = "Evan Quiney", email = "evanev7@gmail.com" },
|
||||
{ name = "Andrei Cravtov", email = "the.andrei.cravtov@gmail.com" },
|
||||
{ name = "Evan Quiney", email = "evanev7@gmail.com" }
|
||||
]
|
||||
requires-python = ">=3.13"
|
||||
dependencies = []
|
||||
|
||||
[dependency-groups]
|
||||
dev = ["exo_pyo3_bindings", "pytest>=8.4.0", "pytest-asyncio>=1.0.0"]
|
||||
dev = [
|
||||
"exo_pyo3_bindings",
|
||||
"pytest>=8.4.0",
|
||||
"pytest-asyncio>=1.0.0",
|
||||
]
|
||||
|
||||
[tool.maturin]
|
||||
#purelib = true
|
||||
|
||||
@@ -28,10 +28,7 @@ tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
# utility dependencies
|
||||
util = { workspace = true }
|
||||
tracing-subscriber = { version = "0.3.19", features = [
|
||||
"default",
|
||||
"env-filter",
|
||||
] }
|
||||
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
|
||||
keccak-const = { workspace = true }
|
||||
|
||||
# tracing/logging
|
||||
|
||||
@@ -81,8 +81,6 @@ from exo.shared.types.api import (
|
||||
CreateInstanceResponse,
|
||||
DeleteDownloadResponse,
|
||||
DeleteInstanceResponse,
|
||||
DeleteTracesRequest,
|
||||
DeleteTracesResponse,
|
||||
ErrorInfo,
|
||||
ErrorResponse,
|
||||
FinishReason,
|
||||
@@ -342,7 +340,6 @@ class API:
|
||||
self.app.post("/download/start")(self.start_download)
|
||||
self.app.delete("/download/{node_id}/{model_id:path}")(self.delete_download)
|
||||
self.app.get("/v1/traces")(self.list_traces)
|
||||
self.app.post("/v1/traces/delete")(self.delete_traces)
|
||||
self.app.get("/v1/traces/{task_id}")(self.get_trace)
|
||||
self.app.get("/v1/traces/{task_id}/stats")(self.get_trace_stats)
|
||||
self.app.get("/v1/traces/{task_id}/raw")(self.get_trace_raw)
|
||||
@@ -1710,12 +1707,8 @@ class API:
|
||||
await self._send_download(command)
|
||||
return DeleteDownloadResponse(command_id=command.command_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_trace_path(task_id: str) -> Path:
|
||||
trace_path = EXO_TRACING_CACHE_DIR / f"trace_{task_id}.json"
|
||||
if not trace_path.resolve().is_relative_to(EXO_TRACING_CACHE_DIR.resolve()):
|
||||
raise HTTPException(status_code=400, detail=f"Invalid task ID: {task_id}")
|
||||
return trace_path
|
||||
def _get_trace_path(self, task_id: str) -> Path:
|
||||
return EXO_TRACING_CACHE_DIR / f"trace_{task_id}.json"
|
||||
|
||||
async def list_traces(self) -> TraceListResponse:
|
||||
traces: list[TraceListItem] = []
|
||||
@@ -1814,18 +1807,6 @@ class API:
|
||||
filename=f"trace_{task_id}.json",
|
||||
)
|
||||
|
||||
async def delete_traces(self, request: DeleteTracesRequest) -> DeleteTracesResponse:
|
||||
deleted: list[str] = []
|
||||
not_found: list[str] = []
|
||||
for task_id in request.task_ids:
|
||||
trace_path = self._get_trace_path(task_id)
|
||||
if trace_path.exists():
|
||||
trace_path.unlink()
|
||||
deleted.append(task_id)
|
||||
else:
|
||||
not_found.append(task_id)
|
||||
return DeleteTracesResponse(deleted=deleted, not_found=not_found)
|
||||
|
||||
async def get_onboarding(self) -> JSONResponse:
|
||||
return JSONResponse({"completed": ONBOARDING_COMPLETE_FILE.exists()})
|
||||
|
||||
|
||||
@@ -258,6 +258,6 @@ def get_node_id_keypair(
|
||||
|
||||
# if no valid credentials, create new ones and persist
|
||||
with open(path, "w+b") as f:
|
||||
keypair = Keypair.generate_ed25519()
|
||||
keypair = Keypair.generate()
|
||||
f.write(keypair.to_bytes())
|
||||
return keypair
|
||||
|
||||
@@ -437,12 +437,3 @@ class TraceListItem(CamelCaseModel):
|
||||
|
||||
class TraceListResponse(CamelCaseModel):
|
||||
traces: list[TraceListItem]
|
||||
|
||||
|
||||
class DeleteTracesRequest(CamelCaseModel):
|
||||
task_ids: list[str]
|
||||
|
||||
|
||||
class DeleteTracesResponse(CamelCaseModel):
|
||||
deleted: list[str]
|
||||
not_found: list[str]
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import contextlib
|
||||
import multiprocessing as mp
|
||||
from collections.abc import Generator
|
||||
from dataclasses import dataclass, field
|
||||
from math import inf
|
||||
from multiprocessing.synchronize import Event
|
||||
@@ -282,6 +283,54 @@ class MpReceiver[T]:
|
||||
return d
|
||||
|
||||
|
||||
class NonBlockingGenerator[T](Generator[T | None, None, None]):
|
||||
def __init__(self, source: MpReceiver[T] | Generator[T | None, None, None]) -> None:
|
||||
self._receiver: MpReceiver[T] | None = None
|
||||
self._inner: Generator[T | None, None, None] | None = None
|
||||
if isinstance(source, MpReceiver):
|
||||
self._receiver = source
|
||||
else:
|
||||
self._inner = source
|
||||
self._exhausted = False
|
||||
|
||||
def send(self, value: None, /) -> T | None:
|
||||
if self._exhausted:
|
||||
raise StopIteration
|
||||
if self._inner is not None:
|
||||
try:
|
||||
return next(self._inner)
|
||||
except (StopIteration, ClosedResourceError):
|
||||
self._exhausted = True
|
||||
raise StopIteration from None
|
||||
assert self._receiver is not None
|
||||
try:
|
||||
return self._receiver.receive_nowait()
|
||||
except WouldBlock:
|
||||
return None
|
||||
except (EndOfStream, ClosedResourceError):
|
||||
self._exhausted = True
|
||||
raise StopIteration from None
|
||||
|
||||
def throw(
|
||||
self,
|
||||
typ: type[BaseException] | BaseException,
|
||||
val: BaseException | object = None,
|
||||
tb: TracebackType | None = None,
|
||||
/,
|
||||
) -> T | None:
|
||||
raise StopIteration
|
||||
|
||||
@property
|
||||
def is_exhausted(self) -> bool:
|
||||
return self._exhausted
|
||||
|
||||
def try_receive(self) -> T | None:
|
||||
try:
|
||||
return next(self)
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
|
||||
class channel[T]: # noqa: N801
|
||||
"""Create a pair of asynchronous channels for communicating within the same process"""
|
||||
|
||||
|
||||
@@ -437,6 +437,7 @@ def mlx_generate(
|
||||
group: mx.distributed.Group | None,
|
||||
on_prefill_progress: Callable[[int, int], None] | None = None,
|
||||
distributed_prompt_progress_callback: Callable[[], None] | None = None,
|
||||
on_generation_token: Callable[[], None] | None = None,
|
||||
) -> Generator[GenerationResponse]:
|
||||
# Ensure that generation stats only contains peak memory for this generation
|
||||
mx.reset_peak_memory()
|
||||
@@ -644,6 +645,9 @@ def mlx_generate(
|
||||
full_prompt_tokens, caches, cache_snapshots
|
||||
)
|
||||
|
||||
if on_generation_token is not None:
|
||||
on_generation_token()
|
||||
|
||||
yield GenerationResponse(
|
||||
text=text,
|
||||
token=out.token,
|
||||
|
||||
@@ -297,10 +297,10 @@ def _pending_tasks(
|
||||
# the task status _should_ be set to completed by the LAST runner
|
||||
# it is currently set by the first
|
||||
# this is definitely a hack
|
||||
if task.task_id in runner.completed:
|
||||
if task.task_id in runner.completed or task.task_id in runner.pending:
|
||||
continue
|
||||
|
||||
if isinstance(runner.status, RunnerReady) and all(
|
||||
if isinstance(runner.status, (RunnerReady, RunnerRunning)) and all(
|
||||
isinstance(all_runners[global_runner_id], (RunnerReady, RunnerRunning))
|
||||
for global_runner_id in runner.bound_instance.instance.shard_assignments.runner_to_shard
|
||||
):
|
||||
|
||||
@@ -33,10 +33,15 @@ def entrypoint(
|
||||
try:
|
||||
if bound_instance.is_image_model:
|
||||
from exo.worker.runner.image_models.runner import main
|
||||
else:
|
||||
from exo.worker.runner.llm_inference.runner import main
|
||||
|
||||
main(bound_instance, event_sender, task_receiver, cancel_receiver)
|
||||
main(bound_instance, event_sender, task_receiver, cancel_receiver)
|
||||
else:
|
||||
from exo.worker.runner.llm_inference.runner import Runner
|
||||
|
||||
runner = Runner(
|
||||
bound_instance, event_sender, task_receiver, cancel_receiver
|
||||
)
|
||||
runner.main()
|
||||
|
||||
except ClosedResourceError:
|
||||
logger.warning("Runner communication closed unexpectedly")
|
||||
|
||||
185
src/exo/worker/runner/llm_inference/batch_generator.py
Normal file
185
src/exo/worker/runner/llm_inference/batch_generator.py
Normal file
@@ -0,0 +1,185 @@
|
||||
from collections import deque
|
||||
from collections.abc import Generator
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import mlx.core as mx
|
||||
from mlx_lm.tokenizer_utils import TokenizerWrapper
|
||||
|
||||
from exo.shared.types.chunks import ErrorChunk, PrefillProgressChunk
|
||||
from exo.shared.types.common import ModelId
|
||||
from exo.shared.types.events import ChunkGenerated, Event
|
||||
from exo.shared.types.mlx import Model
|
||||
from exo.shared.types.tasks import TaskId, TextGeneration
|
||||
from exo.shared.types.text_generation import TextGenerationTaskParams
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse
|
||||
from exo.utils.channels import MpReceiver, MpSender
|
||||
from exo.worker.engines.mlx.cache import KVPrefixCache
|
||||
from exo.worker.engines.mlx.generator.generate import PrefillCancelled, mlx_generate
|
||||
from exo.worker.engines.mlx.utils_mlx import (
|
||||
apply_chat_template,
|
||||
mx_any,
|
||||
)
|
||||
|
||||
EXO_RUNNER_MUST_FAIL = "EXO RUNNER MUST FAIL"
|
||||
EXO_RUNNER_MUST_OOM = "EXO RUNNER MUST OOM"
|
||||
EXO_RUNNER_MUST_TIMEOUT = "EXO RUNNER MUST TIMEOUT"
|
||||
|
||||
|
||||
def _check_for_debug_prompts(task_params: TextGenerationTaskParams) -> None:
|
||||
"""Check for debug prompt triggers in the input."""
|
||||
import time
|
||||
|
||||
from exo.worker.engines.mlx.utils_mlx import mlx_force_oom
|
||||
|
||||
if len(task_params.input) == 0:
|
||||
return
|
||||
prompt = task_params.input[0].content
|
||||
if not prompt:
|
||||
return
|
||||
if EXO_RUNNER_MUST_FAIL in prompt:
|
||||
raise Exception("Artificial runner exception - for testing purposes only.")
|
||||
if EXO_RUNNER_MUST_OOM in prompt:
|
||||
mlx_force_oom()
|
||||
if EXO_RUNNER_MUST_TIMEOUT in prompt:
|
||||
time.sleep(100)
|
||||
|
||||
|
||||
@dataclass(eq=False)
|
||||
class BatchGenerator:
|
||||
model: Model
|
||||
tokenizer: TokenizerWrapper
|
||||
group: mx.distributed.Group | None
|
||||
kv_prefix_cache: KVPrefixCache | None
|
||||
model_id: ModelId
|
||||
device_rank: int
|
||||
cancel_receiver: MpReceiver[TaskId]
|
||||
cancelled_tasks: set[TaskId]
|
||||
event_sender: MpSender[Event]
|
||||
check_for_cancel_every: int
|
||||
|
||||
_queue: deque[tuple[TextGeneration, MpSender[GenerationResponse]]] = field(
|
||||
default_factory=deque, init=False
|
||||
)
|
||||
_active: (
|
||||
tuple[
|
||||
TextGeneration,
|
||||
MpSender[GenerationResponse],
|
||||
Generator[GenerationResponse],
|
||||
]
|
||||
| None
|
||||
) = field(default=None, init=False)
|
||||
_pending_close: MpSender[GenerationResponse] | None = field(
|
||||
default=None, init=False
|
||||
)
|
||||
|
||||
def submit(
|
||||
self,
|
||||
task: TextGeneration,
|
||||
sender: MpSender[GenerationResponse],
|
||||
) -> None:
|
||||
self._queue.append((task, sender))
|
||||
if self._active is None:
|
||||
self._start_next()
|
||||
|
||||
def step(self) -> None:
|
||||
if self._pending_close is not None:
|
||||
self._pending_close.close()
|
||||
self._pending_close = None
|
||||
|
||||
if self._active is None:
|
||||
if self._queue:
|
||||
self._start_next()
|
||||
else:
|
||||
return
|
||||
|
||||
if self._active is None:
|
||||
return
|
||||
|
||||
task, sender, gen = self._active
|
||||
try:
|
||||
response = next(gen)
|
||||
sender.send(response)
|
||||
except (StopIteration, PrefillCancelled):
|
||||
self._pending_close = sender
|
||||
self._active = None
|
||||
if self._queue:
|
||||
self._start_next()
|
||||
except Exception as e:
|
||||
self._send_error(task, e)
|
||||
sender.close()
|
||||
self._active = None
|
||||
raise
|
||||
|
||||
def _start_next(self) -> None:
|
||||
task, sender = self._queue.popleft()
|
||||
try:
|
||||
gen = self._build_generator(task)
|
||||
except Exception as e:
|
||||
self._send_error(task, e)
|
||||
sender.close()
|
||||
raise
|
||||
self._active = (task, sender, gen)
|
||||
|
||||
def _send_error(self, task: TextGeneration, e: Exception) -> None:
|
||||
if self.device_rank == 0:
|
||||
self.event_sender.send(
|
||||
ChunkGenerated(
|
||||
command_id=task.command_id,
|
||||
chunk=ErrorChunk(
|
||||
model=self.model_id,
|
||||
finish_reason="error",
|
||||
error_message=str(e),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
def _build_generator(self, task: TextGeneration) -> Generator[GenerationResponse]:
|
||||
_check_for_debug_prompts(task.task_params)
|
||||
prompt = apply_chat_template(self.tokenizer, task.task_params)
|
||||
|
||||
def on_prefill_progress(processed: int, total: int) -> None:
|
||||
if self.device_rank == 0:
|
||||
self.event_sender.send(
|
||||
ChunkGenerated(
|
||||
command_id=task.command_id,
|
||||
chunk=PrefillProgressChunk(
|
||||
model=self.model_id,
|
||||
processed_tokens=processed,
|
||||
total_tokens=total,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
def distributed_prompt_progress_callback() -> None:
|
||||
self.cancelled_tasks.update(self.cancel_receiver.collect())
|
||||
want_to_cancel = (task.task_id in self.cancelled_tasks) or (
|
||||
TaskId("CANCEL_CURRENT_TASK") in self.cancelled_tasks
|
||||
)
|
||||
if mx_any(want_to_cancel, self.group):
|
||||
raise PrefillCancelled()
|
||||
|
||||
tokens_since_cancel_check = self.check_for_cancel_every
|
||||
|
||||
def on_generation_token() -> None:
|
||||
nonlocal tokens_since_cancel_check
|
||||
tokens_since_cancel_check += 1
|
||||
if tokens_since_cancel_check >= self.check_for_cancel_every:
|
||||
tokens_since_cancel_check = 0
|
||||
self.cancelled_tasks.update(self.cancel_receiver.collect())
|
||||
want_to_cancel = (task.task_id in self.cancelled_tasks) or (
|
||||
TaskId("CANCEL_CURRENT_TASK") in self.cancelled_tasks
|
||||
)
|
||||
if mx_any(want_to_cancel, self.group):
|
||||
raise PrefillCancelled()
|
||||
|
||||
return mlx_generate(
|
||||
model=self.model,
|
||||
tokenizer=self.tokenizer,
|
||||
task=task.task_params,
|
||||
prompt=prompt,
|
||||
kv_prefix_cache=self.kv_prefix_cache,
|
||||
on_prefill_progress=on_prefill_progress,
|
||||
distributed_prompt_progress_callback=distributed_prompt_progress_callback,
|
||||
on_generation_token=on_generation_token,
|
||||
group=self.group,
|
||||
)
|
||||
341
src/exo/worker/runner/llm_inference/model_output_parsers.py
Normal file
341
src/exo/worker/runner/llm_inference/model_output_parsers.py
Normal file
@@ -0,0 +1,341 @@
|
||||
from collections.abc import Generator
|
||||
from functools import cache
|
||||
|
||||
from mlx_lm.tokenizer_utils import TokenizerWrapper
|
||||
from openai_harmony import ( # pyright: ignore[reportMissingTypeStubs]
|
||||
HarmonyEncodingName,
|
||||
HarmonyError, # pyright: ignore[reportUnknownVariableType]
|
||||
Role,
|
||||
StreamableParser,
|
||||
load_harmony_encoding,
|
||||
)
|
||||
|
||||
from exo.shared.types.api import ToolCallItem
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse, ToolCallResponse
|
||||
from exo.worker.runner.bootstrap import logger
|
||||
from exo.worker.runner.llm_inference.tool_parsers import ToolParser
|
||||
|
||||
|
||||
@cache
|
||||
def get_gpt_oss_encoding():
|
||||
encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
|
||||
return encoding
|
||||
|
||||
|
||||
def parse_gpt_oss(
|
||||
responses: Generator[GenerationResponse | None],
|
||||
) -> Generator[GenerationResponse | ToolCallResponse | None]:
|
||||
encoding = get_gpt_oss_encoding()
|
||||
stream = StreamableParser(encoding, role=Role.ASSISTANT)
|
||||
thinking = False
|
||||
current_tool_name: str | None = None
|
||||
tool_arg_parts: list[str] = []
|
||||
|
||||
for response in responses:
|
||||
if response is None:
|
||||
yield None
|
||||
continue
|
||||
try:
|
||||
stream.process(response.token)
|
||||
except HarmonyError:
|
||||
logger.error("Encountered critical Harmony Error, returning early")
|
||||
return
|
||||
|
||||
delta = stream.last_content_delta
|
||||
ch = stream.current_channel
|
||||
recipient = stream.current_recipient
|
||||
|
||||
# Debug: log every token with state
|
||||
logger.debug(
|
||||
f"parse_gpt_oss token={response.token} text={response.text!r} "
|
||||
f"recipient={recipient!r} ch={ch!r} delta={delta!r} "
|
||||
f"state={stream.state} current_tool={current_tool_name!r}"
|
||||
)
|
||||
|
||||
if recipient != current_tool_name:
|
||||
if current_tool_name is not None:
|
||||
prefix = "functions."
|
||||
if current_tool_name.startswith(prefix):
|
||||
current_tool_name = current_tool_name[len(prefix) :]
|
||||
logger.info(
|
||||
f"parse_gpt_oss yielding tool call: name={current_tool_name!r}"
|
||||
)
|
||||
yield ToolCallResponse(
|
||||
tool_calls=[
|
||||
ToolCallItem(
|
||||
name=current_tool_name,
|
||||
arguments="".join(tool_arg_parts).strip(),
|
||||
)
|
||||
],
|
||||
usage=response.usage,
|
||||
)
|
||||
tool_arg_parts = []
|
||||
current_tool_name = recipient
|
||||
|
||||
# If inside a tool call, accumulate arguments
|
||||
if current_tool_name is not None:
|
||||
if delta:
|
||||
tool_arg_parts.append(delta)
|
||||
continue
|
||||
|
||||
if ch == "analysis" and not thinking:
|
||||
thinking = True
|
||||
|
||||
if ch != "analysis" and thinking:
|
||||
thinking = False
|
||||
|
||||
if delta:
|
||||
yield response.model_copy(update={"text": delta, "is_thinking": thinking})
|
||||
|
||||
if response.finish_reason is not None:
|
||||
yield response
|
||||
|
||||
|
||||
def parse_deepseek_v32(
|
||||
responses: Generator[GenerationResponse | None],
|
||||
) -> Generator[GenerationResponse | ToolCallResponse | None]:
|
||||
"""Parse DeepSeek V3.2 DSML tool calls from the generation stream.
|
||||
|
||||
Uses accumulated-text matching (not per-token marker checks) because
|
||||
DSML markers like <|DSML|function_calls> may span multiple tokens.
|
||||
Also handles <think>...</think> blocks for thinking mode.
|
||||
"""
|
||||
from exo.worker.engines.mlx.dsml_encoding import (
|
||||
THINKING_END,
|
||||
THINKING_START,
|
||||
TOOL_CALLS_END,
|
||||
TOOL_CALLS_START,
|
||||
parse_dsml_output,
|
||||
)
|
||||
|
||||
accumulated = ""
|
||||
in_tool_call = False
|
||||
thinking = False
|
||||
# Tokens buffered while we detect the start of a DSML block
|
||||
pending_buffer: list[GenerationResponse] = []
|
||||
# Text accumulated during a tool call block
|
||||
tool_call_text = ""
|
||||
|
||||
for response in responses:
|
||||
if response is None:
|
||||
yield None
|
||||
continue
|
||||
|
||||
# ── Handle thinking tags ──
|
||||
if not thinking and THINKING_START in response.text:
|
||||
thinking = True
|
||||
# Yield any text before the <think> tag
|
||||
before = response.text[: response.text.index(THINKING_START)]
|
||||
if before:
|
||||
yield response.model_copy(update={"text": before})
|
||||
continue
|
||||
|
||||
if thinking and THINKING_END in response.text:
|
||||
thinking = False
|
||||
# Yield any text after the </think> tag
|
||||
after = response.text[
|
||||
response.text.index(THINKING_END) + len(THINKING_END) :
|
||||
]
|
||||
if after:
|
||||
yield response.model_copy(update={"text": after, "is_thinking": False})
|
||||
continue
|
||||
|
||||
if thinking:
|
||||
yield response.model_copy(update={"is_thinking": True})
|
||||
continue
|
||||
|
||||
# ── Handle tool call accumulation ──
|
||||
if in_tool_call:
|
||||
tool_call_text += response.text
|
||||
if TOOL_CALLS_END in tool_call_text:
|
||||
# Parse the accumulated DSML block
|
||||
parsed = parse_dsml_output(tool_call_text)
|
||||
if parsed is not None:
|
||||
logger.info(f"parsed DSML tool calls: {parsed}")
|
||||
yield ToolCallResponse(
|
||||
tool_calls=parsed,
|
||||
usage=response.usage,
|
||||
stats=response.stats,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"DSML tool call parsing failed for: {tool_call_text}"
|
||||
)
|
||||
yield response.model_copy(update={"text": tool_call_text})
|
||||
in_tool_call = False
|
||||
tool_call_text = ""
|
||||
continue
|
||||
|
||||
# EOS reached before end marker — yield buffered text as-is
|
||||
if response.finish_reason is not None:
|
||||
logger.info("DSML tool call parsing interrupted by EOS")
|
||||
yield response.model_copy(update={"text": tool_call_text})
|
||||
in_tool_call = False
|
||||
tool_call_text = ""
|
||||
continue
|
||||
|
||||
# ── Detect start of tool call block ──
|
||||
accumulated += response.text
|
||||
|
||||
if TOOL_CALLS_START in accumulated:
|
||||
# The start marker might be split across pending_buffer + current token
|
||||
start_idx = accumulated.index(TOOL_CALLS_START)
|
||||
# Yield any pending tokens that are purely before the marker
|
||||
pre_text = accumulated[:start_idx]
|
||||
if pre_text:
|
||||
# Flush pending buffer tokens that contributed text before the marker
|
||||
for buf_resp in pending_buffer:
|
||||
if pre_text:
|
||||
chunk = buf_resp.text
|
||||
if len(chunk) <= len(pre_text):
|
||||
yield buf_resp
|
||||
pre_text = pre_text[len(chunk) :]
|
||||
else:
|
||||
yield buf_resp.model_copy(update={"text": pre_text})
|
||||
pre_text = ""
|
||||
pending_buffer = []
|
||||
tool_call_text = accumulated[start_idx:]
|
||||
accumulated = ""
|
||||
|
||||
# Check if the end marker is already present (entire tool call in one token)
|
||||
if TOOL_CALLS_END in tool_call_text:
|
||||
parsed = parse_dsml_output(tool_call_text)
|
||||
if parsed is not None:
|
||||
logger.info(f"parsed DSML tool calls: {parsed}")
|
||||
yield ToolCallResponse(
|
||||
tool_calls=parsed,
|
||||
usage=response.usage,
|
||||
stats=response.stats,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"DSML tool call parsing failed for: {tool_call_text}"
|
||||
)
|
||||
yield response.model_copy(update={"text": tool_call_text})
|
||||
tool_call_text = ""
|
||||
else:
|
||||
in_tool_call = True
|
||||
continue
|
||||
|
||||
# Check if accumulated text might be the start of a DSML marker
|
||||
# Buffer tokens if we see a partial match at the end
|
||||
if _could_be_dsml_prefix(accumulated):
|
||||
pending_buffer.append(response)
|
||||
continue
|
||||
|
||||
# No partial match — flush all pending tokens and the current one
|
||||
for buf_resp in pending_buffer:
|
||||
yield buf_resp
|
||||
pending_buffer = []
|
||||
accumulated = ""
|
||||
yield response
|
||||
|
||||
# Flush any remaining pending buffer at generator end
|
||||
for buf_resp in pending_buffer:
|
||||
yield buf_resp
|
||||
|
||||
|
||||
def _could_be_dsml_prefix(text: str) -> bool:
|
||||
"""Check if the end of text could be the start of a DSML function_calls marker.
|
||||
|
||||
We look for suffixes of text that are prefixes of the TOOL_CALLS_START pattern.
|
||||
This allows us to buffer tokens until we can determine if a tool call is starting.
|
||||
"""
|
||||
from exo.worker.engines.mlx.dsml_encoding import TOOL_CALLS_START
|
||||
|
||||
# Only check the last portion of text that could overlap with the marker
|
||||
max_check = len(TOOL_CALLS_START)
|
||||
tail = text[-max_check:] if len(text) > max_check else text
|
||||
|
||||
# Check if any suffix of tail is a prefix of TOOL_CALLS_START
|
||||
for i in range(len(tail)):
|
||||
suffix = tail[i:]
|
||||
if TOOL_CALLS_START.startswith(suffix):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def parse_thinking_models(
|
||||
responses: Generator[GenerationResponse | None],
|
||||
tokenizer: TokenizerWrapper,
|
||||
starts_in_thinking: bool = True,
|
||||
) -> Generator[GenerationResponse | None]:
|
||||
"""Route thinking tokens via is_thinking flag.
|
||||
|
||||
Swallows think tag tokens, sets is_thinking on all others.
|
||||
Always yields tokens with finish_reason to avoid hanging the chunk stream.
|
||||
"""
|
||||
in_thinking = starts_in_thinking
|
||||
for response in responses:
|
||||
if response is None:
|
||||
yield None
|
||||
continue
|
||||
if isinstance(response, ToolCallResponse):
|
||||
yield response
|
||||
continue
|
||||
|
||||
is_think_tag = (
|
||||
tokenizer.think_end is not None and response.text == tokenizer.think_end
|
||||
) or (
|
||||
tokenizer.think_start is not None and response.text == tokenizer.think_start
|
||||
)
|
||||
|
||||
if is_think_tag:
|
||||
in_thinking = response.text != tokenizer.think_end
|
||||
# Never swallow finish_reason — the chunk stream needs it to terminate.
|
||||
if response.finish_reason is not None:
|
||||
yield response.model_copy(update={"text": "", "is_thinking": False})
|
||||
continue
|
||||
yield response.model_copy(update={"is_thinking": in_thinking})
|
||||
|
||||
|
||||
def parse_tool_calls(
|
||||
responses: Generator[GenerationResponse | None], tool_parser: ToolParser
|
||||
) -> Generator[GenerationResponse | ToolCallResponse | None]:
|
||||
in_tool_call = False
|
||||
tool_call_text_parts: list[str] = []
|
||||
for response in responses:
|
||||
if response is None:
|
||||
yield None
|
||||
continue
|
||||
if not in_tool_call and response.text.startswith(tool_parser.start_parsing):
|
||||
in_tool_call = True
|
||||
|
||||
if in_tool_call:
|
||||
tool_call_text_parts.append(response.text)
|
||||
if response.text.endswith(tool_parser.end_parsing):
|
||||
# parse the actual tool calls from the tool call text
|
||||
parsed = tool_parser.parse_tool_calls(
|
||||
"".join(tool_call_text_parts).strip()
|
||||
)
|
||||
logger.info(f"parsed {tool_call_text_parts=} into {parsed=}")
|
||||
if parsed is not None:
|
||||
yield ToolCallResponse(
|
||||
tool_calls=parsed, usage=response.usage, stats=response.stats
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"tool call parsing failed for text {''.join(tool_call_text_parts)}"
|
||||
)
|
||||
response.text = "".join(tool_call_text_parts)
|
||||
yield response
|
||||
|
||||
in_tool_call = False
|
||||
tool_call_text_parts = []
|
||||
continue
|
||||
|
||||
if response.finish_reason is not None:
|
||||
logger.info(
|
||||
"tool call parsing interrupted, yield partial tool call as text"
|
||||
)
|
||||
response = response.model_copy(
|
||||
update={
|
||||
"text": "".join(tool_call_text_parts),
|
||||
"token": 0,
|
||||
}
|
||||
)
|
||||
yield response
|
||||
|
||||
else:
|
||||
# fallthrough
|
||||
yield response
|
||||
File diff suppressed because it is too large
Load Diff
@@ -172,7 +172,7 @@ class RunnerSupervisor:
|
||||
if isinstance(event, RunnerStatusUpdated):
|
||||
self.status = event.runner_status
|
||||
if isinstance(event, TaskAcknowledged):
|
||||
self.pending.pop(event.task_id).set()
|
||||
self.pending[event.task_id].set()
|
||||
continue
|
||||
if (
|
||||
isinstance(event, TaskStatusUpdated)
|
||||
@@ -190,6 +190,7 @@ class RunnerSupervisor:
|
||||
),
|
||||
)
|
||||
self.completed.add(event.task_id)
|
||||
self.pending.pop(event.task_id, None)
|
||||
await self._event_sender.send(event)
|
||||
except (ClosedResourceError, BrokenResourceError) as e:
|
||||
await self._check_runner(e)
|
||||
|
||||
@@ -20,6 +20,7 @@ class FakeRunnerSupervisor:
|
||||
bound_instance: BoundInstance
|
||||
status: RunnerStatus
|
||||
completed: set[TaskId] = field(default_factory=set)
|
||||
pending: dict[TaskId, object] = field(default_factory=dict)
|
||||
|
||||
|
||||
class OtherTask(BaseTask):
|
||||
|
||||
@@ -19,7 +19,7 @@ from exo.worker.engines.mlx.dsml_encoding import (
|
||||
encode_messages,
|
||||
parse_dsml_output,
|
||||
)
|
||||
from exo.worker.runner.llm_inference.runner import parse_deepseek_v32
|
||||
from exo.worker.runner.llm_inference.model_output_parsers import parse_deepseek_v32
|
||||
|
||||
# ── Shared fixtures ──────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Callable
|
||||
import mlx.core as mx
|
||||
import pytest
|
||||
|
||||
import exo.worker.runner.llm_inference.batch_generator as mlx_batch_generator
|
||||
import exo.worker.runner.llm_inference.runner as mlx_runner
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.events import (
|
||||
@@ -115,26 +116,32 @@ def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(MockGroup()))
|
||||
monkeypatch.setattr(mlx_runner, "load_mlx_items", make_nothin((1, MockTokenizer)))
|
||||
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
|
||||
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
|
||||
monkeypatch.setattr(mlx_runner, "mx_any", make_nothin(False))
|
||||
monkeypatch.setattr(mlx_batch_generator, "_check_for_debug_prompts", nothin)
|
||||
monkeypatch.setattr(mlx_batch_generator, "mx_any", make_nothin(False))
|
||||
# Mock apply_chat_template since we're using a fake tokenizer (integer 1).
|
||||
# Returns a prompt without thinking tag so detect_thinking_prompt_suffix returns None.
|
||||
monkeypatch.setattr(mlx_runner, "apply_chat_template", make_nothin("test prompt"))
|
||||
monkeypatch.setattr(
|
||||
mlx_batch_generator, "apply_chat_template", make_nothin("test prompt")
|
||||
)
|
||||
monkeypatch.setattr(mlx_runner, "detect_thinking_prompt_suffix", make_nothin(False))
|
||||
|
||||
def fake_generate(*_1: object, **_2: object):
|
||||
yield GenerationResponse(token=0, text="hi", finish_reason="stop", usage=None)
|
||||
|
||||
monkeypatch.setattr(mlx_runner, "mlx_generate", fake_generate)
|
||||
monkeypatch.setattr(mlx_batch_generator, "mlx_generate", fake_generate)
|
||||
|
||||
|
||||
# Use a fake event_sender to remove test flakiness.
|
||||
class EventCollector:
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, on_event: Callable[[Event], None] | None = None) -> None:
|
||||
self.events: list[Event] = []
|
||||
self._on_event = on_event
|
||||
|
||||
def send(self, event: Event) -> None:
|
||||
self.events.append(event)
|
||||
if self._on_event:
|
||||
self._on_event(event)
|
||||
|
||||
def close(self) -> None:
|
||||
pass
|
||||
@@ -159,7 +166,7 @@ class MockGroup:
|
||||
return 1
|
||||
|
||||
|
||||
def _run(tasks: Iterable[Task]):
|
||||
def _run(tasks: Iterable[Task], send_after_ready: list[Task] | None = None):
|
||||
bound_instance = get_bound_mlx_ring_instance(
|
||||
instance_id=INSTANCE_1_ID,
|
||||
model_id=MODEL_A_ID,
|
||||
@@ -169,7 +176,23 @@ def _run(tasks: Iterable[Task]):
|
||||
|
||||
task_sender, task_receiver = mp_channel[Task]()
|
||||
_cancel_sender, cancel_receiver = mp_channel[TaskId]()
|
||||
event_sender = EventCollector()
|
||||
|
||||
on_event: Callable[[Event], None] | None = None
|
||||
if send_after_ready:
|
||||
_saw_running = False
|
||||
|
||||
def _on_event(event: Event) -> None:
|
||||
nonlocal _saw_running
|
||||
if isinstance(event, RunnerStatusUpdated):
|
||||
if isinstance(event.runner_status, RunnerRunning):
|
||||
_saw_running = True
|
||||
elif _saw_running and isinstance(event.runner_status, RunnerReady):
|
||||
for t in send_after_ready:
|
||||
task_sender.send(t)
|
||||
|
||||
on_event = _on_event
|
||||
|
||||
event_sender = EventCollector(on_event=on_event)
|
||||
|
||||
with task_sender:
|
||||
for t in tasks:
|
||||
@@ -183,18 +206,22 @@ def _run(tasks: Iterable[Task]):
|
||||
"exo.worker.runner.llm_inference.runner.mx.distributed.all_gather",
|
||||
make_nothin(mx.array([1])),
|
||||
):
|
||||
mlx_runner.main(
|
||||
runner = mlx_runner.Runner(
|
||||
bound_instance,
|
||||
event_sender, # pyright: ignore[reportArgumentType]
|
||||
task_receiver,
|
||||
cancel_receiver,
|
||||
)
|
||||
runner.main()
|
||||
|
||||
return event_sender.events
|
||||
|
||||
|
||||
def test_events_processed_in_correct_order(patch_out_mlx: pytest.MonkeyPatch):
|
||||
events = _run([INIT_TASK, LOAD_TASK, WARMUP_TASK, CHAT_TASK, SHUTDOWN_TASK])
|
||||
events = _run(
|
||||
[INIT_TASK, LOAD_TASK, WARMUP_TASK, CHAT_TASK],
|
||||
send_after_ready=[SHUTDOWN_TASK],
|
||||
)
|
||||
|
||||
expected_chunk = ChunkGenerated(
|
||||
command_id=COMMAND_1_ID,
|
||||
|
||||
@@ -4,7 +4,7 @@ from exo.shared.types.worker.runner_response import (
|
||||
GenerationResponse,
|
||||
ToolCallResponse,
|
||||
)
|
||||
from exo.worker.runner.llm_inference.runner import parse_gpt_oss
|
||||
from exo.worker.runner.llm_inference.model_output_parsers import parse_gpt_oss
|
||||
|
||||
# Token IDs from mlx-community/gpt-oss-20b-MXFP4-Q8 tokenizer.
|
||||
# These are stable since they come from the model's vocabulary.
|
||||
@@ -107,7 +107,7 @@ def _collect(
|
||||
def _gen() -> Generator[GenerationResponse, None, None]:
|
||||
yield from _make_gen_responses(tokens)
|
||||
|
||||
return list(parse_gpt_oss(_gen()))
|
||||
return list(x for x in parse_gpt_oss(_gen()) if x is not None)
|
||||
|
||||
|
||||
def _get_tool_call(
|
||||
|
||||
@@ -4,7 +4,7 @@ from collections.abc import Generator
|
||||
from typing import Any
|
||||
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse, ToolCallResponse
|
||||
from exo.worker.runner.llm_inference.runner import parse_tool_calls
|
||||
from exo.worker.runner.llm_inference.model_output_parsers import parse_tool_calls
|
||||
from exo.worker.runner.llm_inference.tool_parsers import make_mlx_parser
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user