mirror of
https://github.com/exo-explore/exo.git
synced 2026-01-16 01:51:03 -05:00
Compare commits
1 Commits
alexcheema
...
model-card
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
642b1bb1b4 |
@@ -276,23 +276,24 @@ class BatchGenerator:
|
||||
logprobs: mx.array
|
||||
finish_reason: Optional[str]
|
||||
|
||||
unprocessed_prompts: List[Any]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: nn.Module,
|
||||
model,
|
||||
max_tokens: int = ...,
|
||||
stop_tokens: Optional[set[int]] = ...,
|
||||
stop_tokens: Optional[set] = ...,
|
||||
sampler: Optional[Callable[[mx.array], mx.array]] = ...,
|
||||
completion_batch_size: int = ...,
|
||||
prefill_batch_size: int = ...,
|
||||
prefill_step_size: int = ...,
|
||||
) -> None: ...
|
||||
def insert(
|
||||
self, prompts: List[List[int]], max_tokens: Union[List[int], int, None] = ...
|
||||
) -> List[int]: ...
|
||||
def stats(self) -> BatchStats: ...
|
||||
def next(self) -> List[Response]: ...
|
||||
self, prompts, max_tokens: Union[List[int], int, None] = ...
|
||||
): # -> list[Any]:
|
||||
...
|
||||
def stats(self): # -> BatchStats:
|
||||
...
|
||||
def next(self): # -> list[Any]:
|
||||
...
|
||||
|
||||
def batch_generate(
|
||||
model,
|
||||
|
||||
@@ -39,18 +39,12 @@ class StreamingDetokenizer:
|
||||
"""
|
||||
|
||||
__slots__ = ...
|
||||
tokens: list[int]
|
||||
def reset(self) -> None: ...
|
||||
def add_token(self, token: int) -> None: ...
|
||||
def finalize(self) -> None: ...
|
||||
def reset(self): ...
|
||||
def add_token(self, token): ...
|
||||
def finalize(self): ...
|
||||
@property
|
||||
def text(self) -> str:
|
||||
"""The full text decoded so far."""
|
||||
...
|
||||
@property
|
||||
def last_segment(self) -> str:
|
||||
def last_segment(self):
|
||||
"""Return the last segment of readable text since last time this property was accessed."""
|
||||
...
|
||||
|
||||
class NaiveStreamingDetokenizer(StreamingDetokenizer):
|
||||
"""NaiveStreamingDetokenizer relies on the underlying tokenizer
|
||||
@@ -114,7 +108,6 @@ class TokenizerWrapper:
|
||||
_tokenizer: PreTrainedTokenizerFast
|
||||
eos_token_id: int | None
|
||||
eos_token: str | None
|
||||
eos_token_ids: list[int] | None
|
||||
bos_token_id: int | None
|
||||
bos_token: str | None
|
||||
vocab_size: int
|
||||
|
||||
39
AGENTS.md
39
AGENTS.md
@@ -91,45 +91,6 @@ From .cursorrules:
|
||||
- Catch exceptions only where you can handle them meaningfully
|
||||
- Use `@final` and immutability wherever applicable
|
||||
|
||||
## Model Storage
|
||||
|
||||
Downloaded models are stored in `~/.exo/models/` (not the standard HuggingFace cache location).
|
||||
|
||||
## Creating Model Instances via API
|
||||
|
||||
When testing with the API, you must first create a model instance before sending chat completions:
|
||||
|
||||
```bash
|
||||
# 1. Get instance previews for a model
|
||||
curl "http://localhost:52415/instance/previews?model_id=llama-3.2-1b"
|
||||
|
||||
# 2. Create an instance from the first valid preview
|
||||
INSTANCE=$(curl -s "http://localhost:52415/instance/previews?model_id=llama-3.2-1b" | jq -c '.previews[] | select(.error == null) | .instance' | head -n1)
|
||||
curl -X POST http://localhost:52415/instance -H 'Content-Type: application/json' -d "{\"instance\": $INSTANCE}"
|
||||
|
||||
# 3. Wait for the runner to become ready (check logs for "runner ready")
|
||||
|
||||
# 4. Send chat completions using the full model ID
|
||||
curl -X POST http://localhost:52415/v1/chat/completions \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"model": "mlx-community/Llama-3.2-1B-Instruct-4bit", "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 50}'
|
||||
```
|
||||
|
||||
## Logs
|
||||
|
||||
Exo logs are stored in `~/.exo/exo.log`. This is useful for debugging runner crashes and distributed issues.
|
||||
|
||||
## Testing
|
||||
|
||||
Tests use pytest-asyncio with `asyncio_mode = "auto"`. Tests are in `tests/` subdirectories alongside the code they test. The `EXO_TESTS=1` env var is set during tests.
|
||||
|
||||
### Distributed Testing
|
||||
|
||||
When running distributed tests across multiple machines, use `EXO_LIBP2P_NAMESPACE` to isolate your test cluster from other exo instances on the same network:
|
||||
|
||||
```bash
|
||||
# On each machine in the test cluster, use the same unique namespace
|
||||
EXO_LIBP2P_NAMESPACE=my-test-cluster uv run exo
|
||||
```
|
||||
|
||||
This prevents your test cluster from discovering and interfering with production or other developers' exo clusters.
|
||||
|
||||
@@ -60,39 +60,12 @@
|
||||
return models;
|
||||
});
|
||||
|
||||
// Track previous model IDs to detect newly added models (plain variable to avoid reactive loop)
|
||||
let previousModelIds: Set<string> = new Set();
|
||||
|
||||
// Auto-select the first available model if none is selected, if current selection is stale, or if a new model is added
|
||||
// Auto-select the first available model if none is selected
|
||||
$effect(() => {
|
||||
const models = availableModels();
|
||||
const currentModelIds = new Set(models.map(m => m.id));
|
||||
|
||||
if (models.length > 0) {
|
||||
// Find newly added models (in current but not in previous)
|
||||
const newModels = models.filter(m => !previousModelIds.has(m.id));
|
||||
|
||||
// If no model selected, select the first available
|
||||
if (!currentModel) {
|
||||
setSelectedChatModel(models[0].id);
|
||||
}
|
||||
// If current model is stale (no longer has a running instance), reset to first available
|
||||
else if (!models.some(m => m.id === currentModel)) {
|
||||
setSelectedChatModel(models[0].id);
|
||||
}
|
||||
// If a new model was just added, select it
|
||||
else if (newModels.length > 0 && previousModelIds.size > 0) {
|
||||
setSelectedChatModel(newModels[0].id);
|
||||
}
|
||||
} else {
|
||||
// No instances running - clear the selected model
|
||||
if (currentModel) {
|
||||
setSelectedChatModel('');
|
||||
}
|
||||
if (models.length > 0 && !currentModel) {
|
||||
setSelectedChatModel(models[0].id);
|
||||
}
|
||||
|
||||
// Update previous model IDs for next comparison
|
||||
previousModelIds = currentModelIds;
|
||||
});
|
||||
|
||||
function getInstanceModelId(instanceWrapped: unknown): string {
|
||||
|
||||
@@ -400,8 +400,10 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
|
||||
const errorText = await response.text();
|
||||
console.error('Failed to launch instance:', errorText);
|
||||
} else {
|
||||
// Always auto-select the newly launched model so the user chats to what they just launched
|
||||
setSelectedChatModel(modelId);
|
||||
// Auto-select the launched model only if no model is currently selected
|
||||
if (!selectedChatModel()) {
|
||||
setSelectedChatModel(modelId);
|
||||
}
|
||||
|
||||
// Scroll to the bottom of instances container to show the new instance
|
||||
// Use multiple attempts to ensure DOM has updated with the new instance
|
||||
@@ -761,10 +763,6 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
|
||||
async function deleteInstance(instanceId: string) {
|
||||
if (!confirm(`Delete instance ${instanceId.slice(0, 8)}...?`)) return;
|
||||
|
||||
// Get the model ID of the instance being deleted before we delete it
|
||||
const deletedInstanceModelId = getInstanceModelId(instanceData[instanceId]);
|
||||
const wasSelected = selectedChatModel() === deletedInstanceModelId;
|
||||
|
||||
try {
|
||||
const response = await fetch(`/instance/${instanceId}`, {
|
||||
method: 'DELETE',
|
||||
@@ -773,24 +771,6 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
|
||||
|
||||
if (!response.ok) {
|
||||
console.error('Failed to delete instance:', response.status);
|
||||
} else if (wasSelected) {
|
||||
// If we deleted the currently selected model, switch to another available model
|
||||
// Find another instance that isn't the one we just deleted
|
||||
const remainingInstances = Object.entries(instanceData).filter(([id]) => id !== instanceId);
|
||||
if (remainingInstances.length > 0) {
|
||||
// Select the last instance (most recently added, since objects preserve insertion order)
|
||||
const [, lastInstance] = remainingInstances[remainingInstances.length - 1];
|
||||
const newModelId = getInstanceModelId(lastInstance);
|
||||
if (newModelId && newModelId !== 'Unknown' && newModelId !== 'Unknown Model') {
|
||||
setSelectedChatModel(newModelId);
|
||||
} else {
|
||||
// Clear selection if no valid model found
|
||||
setSelectedChatModel('');
|
||||
}
|
||||
} else {
|
||||
// No more instances, clear the selection
|
||||
setSelectedChatModel('');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error deleting instance:', error);
|
||||
|
||||
2
justfile
2
justfile
@@ -1,5 +1,3 @@
|
||||
export NIX_CONFIG := "extra-experimental-features = nix-command flakes"
|
||||
|
||||
fmt:
|
||||
nix fmt
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ dependencies = [
|
||||
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
|
||||
"hypercorn>=0.18.0",
|
||||
"openai-harmony>=0.0.8",
|
||||
"tomlkit>=0.14.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
15
resources/model_cards/deepseek-v3.1-4bit.toml
Normal file
15
resources/model_cards/deepseek-v3.1-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "deepseek-v3.1-4bit"
|
||||
model_id = "mlx-community/DeepSeek-V3.1-4bit"
|
||||
name = "DeepSeek V3.1 (4-bit)"
|
||||
description = "DeepSeek V3.1 is a large language model trained on the DeepSeek V3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/DeepSeek-V3.1-4bit"
|
||||
pretty_name = "DeepSeek V3.1 (4-bit)"
|
||||
n_layers = 61
|
||||
hidden_size = 7168
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 405874409472
|
||||
15
resources/model_cards/deepseek-v3.1-8bit.toml
Normal file
15
resources/model_cards/deepseek-v3.1-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "deepseek-v3.1-8bit"
|
||||
model_id = "mlx-community/DeepSeek-V3.1-8bit"
|
||||
name = "DeepSeek V3.1 (8-bit)"
|
||||
description = "DeepSeek V3.1 is a large language model trained on the DeepSeek V3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/DeepSeek-V3.1-8bit"
|
||||
pretty_name = "DeepSeek V3.1 (8-bit)"
|
||||
n_layers = 61
|
||||
hidden_size = 7168
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 765577920512
|
||||
15
resources/model_cards/glm-4.5-air-8bit.toml
Normal file
15
resources/model_cards/glm-4.5-air-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "glm-4.5-air-8bit"
|
||||
model_id = "mlx-community/GLM-4.5-Air-8bit"
|
||||
name = "GLM 4.5 Air 8bit"
|
||||
description = "GLM 4.5 Air 8bit"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/GLM-4.5-Air-8bit"
|
||||
pretty_name = "GLM 4.5 Air 8bit"
|
||||
n_layers = 46
|
||||
hidden_size = 4096
|
||||
supports_tensor = false
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 122406567936
|
||||
15
resources/model_cards/glm-4.5-air-bf16.toml
Normal file
15
resources/model_cards/glm-4.5-air-bf16.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "glm-4.5-air-bf16"
|
||||
model_id = "mlx-community/GLM-4.5-Air-bf16"
|
||||
name = "GLM 4.5 Air bf16"
|
||||
description = "GLM 4.5 Air bf16"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/GLM-4.5-Air-bf16"
|
||||
pretty_name = "GLM 4.5 Air bf16"
|
||||
n_layers = 46
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 229780750336
|
||||
15
resources/model_cards/glm-4.7-4bit.toml
Normal file
15
resources/model_cards/glm-4.7-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "glm-4.7-4bit"
|
||||
model_id = "mlx-community/GLM-4.7-4bit"
|
||||
name = "GLM 4.7 4bit"
|
||||
description = "GLM 4.7 4bit"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/GLM-4.7-4bit"
|
||||
pretty_name = "GLM 4.7 4bit"
|
||||
n_layers = 91
|
||||
hidden_size = 5120
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 198556925568
|
||||
15
resources/model_cards/glm-4.7-6bit.toml
Normal file
15
resources/model_cards/glm-4.7-6bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "glm-4.7-6bit"
|
||||
model_id = "mlx-community/GLM-4.7-6bit"
|
||||
name = "GLM 4.7 6bit"
|
||||
description = "GLM 4.7 6bit"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/GLM-4.7-6bit"
|
||||
pretty_name = "GLM 4.7 6bit"
|
||||
n_layers = 91
|
||||
hidden_size = 5120
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 286737579648
|
||||
15
resources/model_cards/glm-4.7-8bit-gs32.toml
Normal file
15
resources/model_cards/glm-4.7-8bit-gs32.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "glm-4.7-8bit-gs32"
|
||||
model_id = "mlx-community/GLM-4.7-8bit-gs32"
|
||||
name = "GLM 4.7 8bit (gs32)"
|
||||
description = "GLM 4.7 8bit (gs32)"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/GLM-4.7-8bit-gs32"
|
||||
pretty_name = "GLM 4.7 8bit (gs32)"
|
||||
n_layers = 91
|
||||
hidden_size = 5120
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 396963397248
|
||||
15
resources/model_cards/gpt-oss-120b-MXFP4-Q8.toml
Normal file
15
resources/model_cards/gpt-oss-120b-MXFP4-Q8.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "gpt-oss-120b-MXFP4-Q8"
|
||||
model_id = "mlx-community/gpt-oss-120b-MXFP4-Q8"
|
||||
name = "GPT-OSS 120B (MXFP4-Q8, MLX)"
|
||||
description = "OpenAI's GPT-OSS 120B is a 117B-parameter Mixture-of-Experts model designed for high-reasoning and general-purpose use; this variant is a 4-bit MLX conversion for Apple Silicon."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/gpt-oss-120b-MXFP4-Q8"
|
||||
pretty_name = "GPT-OSS 120B (MXFP4-Q8, MLX)"
|
||||
n_layers = 36
|
||||
hidden_size = 2880
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 70652212224
|
||||
15
resources/model_cards/gpt-oss-20b-4bit.toml
Normal file
15
resources/model_cards/gpt-oss-20b-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "gpt-oss-20b-4bit"
|
||||
model_id = "mlx-community/gpt-oss-20b-MXFP4-Q4"
|
||||
name = "GPT-OSS 20B (MXFP4-Q4, MLX)"
|
||||
description = "OpenAI's GPT-OSS 20B is a medium-sized MoE model for lower-latency and local or specialized use cases; this MLX variant uses MXFP4 4-bit quantization."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/gpt-oss-20b-MXFP4-Q4"
|
||||
pretty_name = "GPT-OSS 20B (MXFP4-Q4, MLX)"
|
||||
n_layers = 24
|
||||
hidden_size = 2880
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 12025908224
|
||||
15
resources/model_cards/kimi-k2-instruct-4bit.toml
Normal file
15
resources/model_cards/kimi-k2-instruct-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "kimi-k2-instruct-4bit"
|
||||
model_id = "mlx-community/Kimi-K2-Instruct-4bit"
|
||||
name = "Kimi K2 Instruct (4-bit)"
|
||||
description = "Kimi K2 is a large language model trained on the Kimi K2 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Kimi-K2-Instruct-4bit"
|
||||
pretty_name = "Kimi K2 Instruct (4-bit)"
|
||||
n_layers = 61
|
||||
hidden_size = 7168
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 620622774272
|
||||
15
resources/model_cards/kimi-k2-thinking.toml
Normal file
15
resources/model_cards/kimi-k2-thinking.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "kimi-k2-thinking"
|
||||
model_id = "mlx-community/Kimi-K2-Thinking"
|
||||
name = "Kimi K2 Thinking (4-bit)"
|
||||
description = "Kimi K2 Thinking is the latest, most capable version of open-source thinking model."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Kimi-K2-Thinking"
|
||||
pretty_name = "Kimi K2 Thinking (4-bit)"
|
||||
n_layers = 61
|
||||
hidden_size = 7168
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 706522120192
|
||||
15
resources/model_cards/llama-3.1-70b.toml
Normal file
15
resources/model_cards/llama-3.1-70b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.1-70b"
|
||||
model_id = "mlx-community/Meta-Llama-3.1-70B-Instruct-4bit"
|
||||
name = "Llama 3.1 70B (4-bit)"
|
||||
description = "Llama 3.1 is a large language model trained on the Llama 3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Meta-Llama-3.1-70B-Instruct-4bit"
|
||||
pretty_name = "Llama 3.1 70B (4-bit)"
|
||||
n_layers = 80
|
||||
hidden_size = 8192
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 40652242944
|
||||
15
resources/model_cards/llama-3.1-8b-8bit.toml
Normal file
15
resources/model_cards/llama-3.1-8b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.1-8b-8bit"
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-8bit"
|
||||
name = "Llama 3.1 8B (8-bit)"
|
||||
description = "Llama 3.1 is a large language model trained on the Llama 3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-8bit"
|
||||
pretty_name = "Llama 3.1 8B (8-bit)"
|
||||
n_layers = 32
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 8954839040
|
||||
15
resources/model_cards/llama-3.1-8b-bf16.toml
Normal file
15
resources/model_cards/llama-3.1-8b-bf16.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.1-8b-bf16"
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-bf16"
|
||||
name = "Llama 3.1 8B (BF16)"
|
||||
description = "Llama 3.1 is a large language model trained on the Llama 3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-bf16"
|
||||
pretty_name = "Llama 3.1 8B (BF16)"
|
||||
n_layers = 32
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 16882073600
|
||||
15
resources/model_cards/llama-3.1-8b.toml
Normal file
15
resources/model_cards/llama-3.1-8b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.1-8b"
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-4bit"
|
||||
name = "Llama 3.1 8B (4-bit)"
|
||||
description = "Llama 3.1 is a large language model trained on the Llama 3.1 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Meta-Llama-3.1-8B-Instruct-4bit"
|
||||
pretty_name = "Llama 3.1 8B (4-bit)"
|
||||
n_layers = 32
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 4637851648
|
||||
15
resources/model_cards/llama-3.2-1b.toml
Normal file
15
resources/model_cards/llama-3.2-1b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.2-1b"
|
||||
model_id = "mlx-community/Llama-3.2-1B-Instruct-4bit"
|
||||
name = "Llama 3.2 1B (4-bit)"
|
||||
description = "Llama 3.2 is a large language model trained on the Llama 3.2 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Llama-3.2-1B-Instruct-4bit"
|
||||
pretty_name = "Llama 3.2 1B (4-bit)"
|
||||
n_layers = 16
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 729808896
|
||||
15
resources/model_cards/llama-3.2-3b-8bit.toml
Normal file
15
resources/model_cards/llama-3.2-3b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.2-3b-8bit"
|
||||
model_id = "mlx-community/Llama-3.2-3B-Instruct-8bit"
|
||||
name = "Llama 3.2 3B (8-bit)"
|
||||
description = "Llama 3.2 is a large language model trained on the Llama 3.2 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Llama-3.2-3B-Instruct-8bit"
|
||||
pretty_name = "Llama 3.2 3B (8-bit)"
|
||||
n_layers = 28
|
||||
hidden_size = 3072
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 3501195264
|
||||
15
resources/model_cards/llama-3.2-3b.toml
Normal file
15
resources/model_cards/llama-3.2-3b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.2-3b"
|
||||
model_id = "mlx-community/Llama-3.2-3B-Instruct-4bit"
|
||||
name = "Llama 3.2 3B (4-bit)"
|
||||
description = "Llama 3.2 is a large language model trained on the Llama 3.2 dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Llama-3.2-3B-Instruct-4bit"
|
||||
pretty_name = "Llama 3.2 3B (4-bit)"
|
||||
n_layers = 28
|
||||
hidden_size = 3072
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 1863319552
|
||||
15
resources/model_cards/llama-3.3-70b-8bit.toml
Normal file
15
resources/model_cards/llama-3.3-70b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.3-70b-8bit"
|
||||
model_id = "mlx-community/Llama-3.3-70B-Instruct-8bit"
|
||||
name = "Llama 3.3 70B (8-bit)"
|
||||
description = "The Meta Llama 3.3 multilingual large language model (LLM) is an instruction tuned generative model in 70B (text in/text out)"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Llama-3.3-70B-Instruct-8bit"
|
||||
pretty_name = "Llama 3.3 70B (8-bit)"
|
||||
n_layers = 80
|
||||
hidden_size = 8192
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 76799803392
|
||||
15
resources/model_cards/llama-3.3-70b-fp16.toml
Normal file
15
resources/model_cards/llama-3.3-70b-fp16.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.3-70b-fp16"
|
||||
model_id = "mlx-community/llama-3.3-70b-instruct-fp16"
|
||||
name = "Llama 3.3 70B (FP16)"
|
||||
description = "The Meta Llama 3.3 multilingual large language model (LLM) is an instruction tuned generative model in 70B (text in/text out)"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/llama-3.3-70b-instruct-fp16"
|
||||
pretty_name = "Llama 3.3 70B (FP16)"
|
||||
n_layers = 80
|
||||
hidden_size = 8192
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 144383672320
|
||||
15
resources/model_cards/llama-3.3-70b.toml
Normal file
15
resources/model_cards/llama-3.3-70b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "llama-3.3-70b"
|
||||
model_id = "mlx-community/Llama-3.3-70B-Instruct-4bit"
|
||||
name = "Llama 3.3 70B (4-bit)"
|
||||
description = "The Meta Llama 3.3 multilingual large language model (LLM) is an instruction tuned generative model in 70B (text in/text out)"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Llama-3.3-70B-Instruct-4bit"
|
||||
pretty_name = "Llama 3.3 70B"
|
||||
n_layers = 80
|
||||
hidden_size = 8192
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 40652242944
|
||||
15
resources/model_cards/minimax-m2.1-3bit.toml
Normal file
15
resources/model_cards/minimax-m2.1-3bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "minimax-m2.1-3bit"
|
||||
model_id = "mlx-community/MiniMax-M2.1-3bit"
|
||||
name = "MiniMax M2.1 3bit"
|
||||
description = "MiniMax M2.1 3bit"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/MiniMax-M2.1-3bit"
|
||||
pretty_name = "MiniMax M2.1 3bit"
|
||||
n_layers = 61
|
||||
hidden_size = 3072
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 100086644736
|
||||
15
resources/model_cards/minimax-m2.1-8bit.toml
Normal file
15
resources/model_cards/minimax-m2.1-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "minimax-m2.1-8bit"
|
||||
model_id = "mlx-community/MiniMax-M2.1-8bit"
|
||||
name = "MiniMax M2.1 8bit"
|
||||
description = "MiniMax M2.1 8bit"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/MiniMax-M2.1-8bit"
|
||||
pretty_name = "MiniMax M2.1 8bit"
|
||||
n_layers = 61
|
||||
hidden_size = 3072
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 242986745856
|
||||
15
resources/model_cards/qwen3-0.6b-8bit.toml
Normal file
15
resources/model_cards/qwen3-0.6b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-0.6b-8bit"
|
||||
model_id = "mlx-community/Qwen3-0.6B-8bit"
|
||||
name = "Qwen3 0.6B (8-bit)"
|
||||
description = "Qwen3 0.6B is a large language model trained on the Qwen3 0.6B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-0.6B-8bit"
|
||||
pretty_name = "Qwen3 0.6B (8-bit)"
|
||||
n_layers = 28
|
||||
hidden_size = 1024
|
||||
supports_tensor = false
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 698351616
|
||||
15
resources/model_cards/qwen3-0.6b.toml
Normal file
15
resources/model_cards/qwen3-0.6b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-0.6b"
|
||||
model_id = "mlx-community/Qwen3-0.6B-4bit"
|
||||
name = "Qwen3 0.6B (4-bit)"
|
||||
description = "Qwen3 0.6B is a large language model trained on the Qwen3 0.6B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-0.6B-4bit"
|
||||
pretty_name = "Qwen3 0.6B (4-bit)"
|
||||
n_layers = 28
|
||||
hidden_size = 1024
|
||||
supports_tensor = false
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 342884352
|
||||
15
resources/model_cards/qwen3-235b-a22b-4bit.toml
Normal file
15
resources/model_cards/qwen3-235b-a22b-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-235b-a22b-4bit"
|
||||
model_id = "mlx-community/Qwen3-235B-A22B-Instruct-2507-4bit"
|
||||
name = "Qwen3 235B A22B (4-bit)"
|
||||
description = "Qwen3 235B (Active 22B) is a large language model trained on the Qwen3 235B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-235B-A22B-Instruct-2507-4bit"
|
||||
pretty_name = "Qwen3 235B A22B (4-bit)"
|
||||
n_layers = 94
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 141733920768
|
||||
15
resources/model_cards/qwen3-235b-a22b-8bit.toml
Normal file
15
resources/model_cards/qwen3-235b-a22b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-235b-a22b-8bit"
|
||||
model_id = "mlx-community/Qwen3-235B-A22B-Instruct-2507-8bit"
|
||||
name = "Qwen3 235B A22B (8-bit)"
|
||||
description = "Qwen3 235B (Active 22B) is a large language model trained on the Qwen3 235B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-235B-A22B-Instruct-2507-8bit"
|
||||
pretty_name = "Qwen3 235B A22B (8-bit)"
|
||||
n_layers = 94
|
||||
hidden_size = 4096
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 268435456000
|
||||
15
resources/model_cards/qwen3-30b-8bit.toml
Normal file
15
resources/model_cards/qwen3-30b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-30b-8bit"
|
||||
model_id = "mlx-community/Qwen3-30B-A3B-8bit"
|
||||
name = "Qwen3 30B A3B (8-bit)"
|
||||
description = "Qwen3 30B is a large language model trained on the Qwen3 30B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-30B-A3B-8bit"
|
||||
pretty_name = "Qwen3 30B A3B (8-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 33279705088
|
||||
15
resources/model_cards/qwen3-30b.toml
Normal file
15
resources/model_cards/qwen3-30b.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-30b"
|
||||
model_id = "mlx-community/Qwen3-30B-A3B-4bit"
|
||||
name = "Qwen3 30B A3B (4-bit)"
|
||||
description = "Qwen3 30B is a large language model trained on the Qwen3 30B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-30B-A3B-4bit"
|
||||
pretty_name = "Qwen3 30B A3B (4-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 17612931072
|
||||
15
resources/model_cards/qwen3-80b-a3B-4bit.toml
Normal file
15
resources/model_cards/qwen3-80b-a3B-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-80b-a3B-4bit"
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Instruct-4bit"
|
||||
name = "Qwen3 80B A3B (4-bit)"
|
||||
description = "Qwen3 80B"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Instruct-4bit"
|
||||
pretty_name = "Qwen3 80B A3B (4-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 46976204800
|
||||
15
resources/model_cards/qwen3-80b-a3B-8bit.toml
Normal file
15
resources/model_cards/qwen3-80b-a3B-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-80b-a3B-8bit"
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Instruct-8bit"
|
||||
name = "Qwen3 80B A3B (8-bit)"
|
||||
description = "Qwen3 80B"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Instruct-8bit"
|
||||
pretty_name = "Qwen3 80B A3B (8-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 88814387200
|
||||
15
resources/model_cards/qwen3-80b-a3B-thinking-4bit.toml
Normal file
15
resources/model_cards/qwen3-80b-a3B-thinking-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-80b-a3B-thinking-4bit"
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Thinking-4bit"
|
||||
name = "Qwen3 80B A3B Thinking (4-bit)"
|
||||
description = "Qwen3 80B Reasoning model"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Thinking-4bit"
|
||||
pretty_name = "Qwen3 80B A3B (4-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 88814387200
|
||||
15
resources/model_cards/qwen3-80b-a3B-thinking-8bit.toml
Normal file
15
resources/model_cards/qwen3-80b-a3B-thinking-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-80b-a3B-thinking-8bit"
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Thinking-8bit"
|
||||
name = "Qwen3 80B A3B Thinking (8-bit)"
|
||||
description = "Qwen3 80B Reasoning model"
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Next-80B-A3B-Thinking-8bit"
|
||||
pretty_name = "Qwen3 80B A3B (8-bit)"
|
||||
n_layers = 48
|
||||
hidden_size = 2048
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 88814387200
|
||||
15
resources/model_cards/qwen3-coder-480b-a35b-4bit.toml
Normal file
15
resources/model_cards/qwen3-coder-480b-a35b-4bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-coder-480b-a35b-4bit"
|
||||
model_id = "mlx-community/Qwen3-Coder-480B-A35B-Instruct-4bit"
|
||||
name = "Qwen3 Coder 480B A35B (4-bit)"
|
||||
description = "Qwen3 Coder 480B (Active 35B) is a large language model trained on the Qwen3 Coder 480B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Coder-480B-A35B-Instruct-4bit"
|
||||
pretty_name = "Qwen3 Coder 480B A35B (4-bit)"
|
||||
n_layers = 62
|
||||
hidden_size = 6144
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 289910292480
|
||||
15
resources/model_cards/qwen3-coder-480b-a35b-8bit.toml
Normal file
15
resources/model_cards/qwen3-coder-480b-a35b-8bit.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
short_id = "qwen3-coder-480b-a35b-8bit"
|
||||
model_id = "mlx-community/Qwen3-Coder-480B-A35B-Instruct-8bit"
|
||||
name = "Qwen3 Coder 480B A35B (8-bit)"
|
||||
description = "Qwen3 Coder 480B (Active 35B) is a large language model trained on the Qwen3 Coder 480B dataset."
|
||||
tags = []
|
||||
|
||||
[metadata]
|
||||
model_id = "mlx-community/Qwen3-Coder-480B-A35B-Instruct-8bit"
|
||||
pretty_name = "Qwen3 Coder 480B A35B (8-bit)"
|
||||
n_layers = 62
|
||||
hidden_size = 6144
|
||||
supports_tensor = true
|
||||
|
||||
[metadata.storage_size]
|
||||
in_bytes = 579820584960
|
||||
@@ -13,6 +13,12 @@ from hypercorn.asyncio import serve # pyright: ignore[reportUnknownVariableType
|
||||
from hypercorn.config import Config
|
||||
from hypercorn.typing import ASGIFramework
|
||||
from loguru import logger
|
||||
from openai_harmony import ( # pyright: ignore[reportMissingTypeStubs]
|
||||
HarmonyEncodingName,
|
||||
Role,
|
||||
StreamableParser,
|
||||
load_harmony_encoding,
|
||||
)
|
||||
|
||||
from exo.master.placement import place_instance as get_instance_placements
|
||||
from exo.shared.apply import apply
|
||||
@@ -61,6 +67,8 @@ from exo.utils.channels import Receiver, Sender, channel
|
||||
from exo.utils.dashboard_path import find_dashboard
|
||||
from exo.utils.event_buffer import OrderedBuffer
|
||||
|
||||
encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
|
||||
|
||||
|
||||
def chunk_to_response(
|
||||
chunk: TokenChunk, command_id: CommandId
|
||||
@@ -373,8 +381,35 @@ class API:
|
||||
instance_id=instance_id,
|
||||
)
|
||||
|
||||
async def _process_gpt_oss(self, token_chunks: Receiver[TokenChunk]):
|
||||
stream = StreamableParser(encoding, role=Role.ASSISTANT)
|
||||
thinking = False
|
||||
|
||||
async for chunk in token_chunks:
|
||||
stream.process(chunk.token_id)
|
||||
|
||||
delta = stream.last_content_delta
|
||||
ch = stream.current_channel
|
||||
|
||||
if ch == "analysis" and not thinking:
|
||||
thinking = True
|
||||
yield chunk.model_copy(update={"text": "<think>"})
|
||||
|
||||
if ch != "analysis" and thinking:
|
||||
thinking = False
|
||||
yield chunk.model_copy(update={"text": "</think>"})
|
||||
|
||||
if delta:
|
||||
yield chunk.model_copy(update={"text": delta})
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
if thinking:
|
||||
yield chunk.model_copy(update={"text": "</think>"})
|
||||
yield chunk
|
||||
break
|
||||
|
||||
async def _chat_chunk_stream(
|
||||
self, command_id: CommandId
|
||||
self, command_id: CommandId, parse_gpt_oss: bool
|
||||
) -> AsyncGenerator[TokenChunk, None]:
|
||||
"""Yield `TokenChunk`s for a given command until completion."""
|
||||
|
||||
@@ -382,10 +417,16 @@ class API:
|
||||
self._chat_completion_queues[command_id], recv = channel[TokenChunk]()
|
||||
|
||||
with recv as token_chunks:
|
||||
async for chunk in token_chunks:
|
||||
yield chunk
|
||||
if chunk.finish_reason is not None:
|
||||
break
|
||||
if parse_gpt_oss:
|
||||
async for chunk in self._process_gpt_oss(token_chunks):
|
||||
yield chunk
|
||||
if chunk.finish_reason is not None:
|
||||
break
|
||||
else:
|
||||
async for chunk in token_chunks:
|
||||
yield chunk
|
||||
if chunk.finish_reason is not None:
|
||||
break
|
||||
|
||||
except anyio.get_cancelled_exc_class():
|
||||
# TODO: TaskCancelled
|
||||
@@ -401,11 +442,11 @@ class API:
|
||||
del self._chat_completion_queues[command_id]
|
||||
|
||||
async def _generate_chat_stream(
|
||||
self, command_id: CommandId
|
||||
self, command_id: CommandId, parse_gpt_oss: bool
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Generate chat completion stream as JSON strings."""
|
||||
|
||||
async for chunk in self._chat_chunk_stream(command_id):
|
||||
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
|
||||
chunk_response: ChatCompletionResponse = chunk_to_response(
|
||||
chunk, command_id
|
||||
)
|
||||
@@ -417,7 +458,7 @@ class API:
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
async def _collect_chat_completion(
|
||||
self, command_id: CommandId
|
||||
self, command_id: CommandId, parse_gpt_oss: bool
|
||||
) -> ChatCompletionResponse:
|
||||
"""Collect all token chunks for a chat completion and return a single response."""
|
||||
|
||||
@@ -425,7 +466,7 @@ class API:
|
||||
model: str | None = None
|
||||
finish_reason: FinishReason | None = None
|
||||
|
||||
async for chunk in self._chat_chunk_stream(command_id):
|
||||
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
|
||||
if model is None:
|
||||
model = chunk.model
|
||||
|
||||
@@ -454,7 +495,7 @@ class API:
|
||||
)
|
||||
|
||||
async def _collect_chat_completion_with_stats(
|
||||
self, command_id: CommandId
|
||||
self, command_id: CommandId, parse_gpt_oss: bool
|
||||
) -> BenchChatCompletionResponse:
|
||||
text_parts: list[str] = []
|
||||
model: str | None = None
|
||||
@@ -462,7 +503,7 @@ class API:
|
||||
|
||||
stats: GenerationStats | None = None
|
||||
|
||||
async for chunk in self._chat_chunk_stream(command_id):
|
||||
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
|
||||
if model is None:
|
||||
model = chunk.model
|
||||
|
||||
@@ -503,6 +544,8 @@ class API:
|
||||
"""Handle chat completions, supporting both streaming and non-streaming responses."""
|
||||
model_meta = await resolve_model_meta(payload.model)
|
||||
payload.model = model_meta.model_id
|
||||
parse_gpt_oss = "gpt-oss" in model_meta.model_id.lower()
|
||||
logger.info(f"{parse_gpt_oss=}")
|
||||
|
||||
if not any(
|
||||
instance.shard_assignments.model_id == payload.model
|
||||
@@ -519,16 +562,17 @@ class API:
|
||||
await self._send(command)
|
||||
if payload.stream:
|
||||
return StreamingResponse(
|
||||
self._generate_chat_stream(command.command_id),
|
||||
self._generate_chat_stream(command.command_id, parse_gpt_oss),
|
||||
media_type="text/event-stream",
|
||||
)
|
||||
|
||||
return await self._collect_chat_completion(command.command_id)
|
||||
return await self._collect_chat_completion(command.command_id, parse_gpt_oss)
|
||||
|
||||
async def bench_chat_completions(
|
||||
self, payload: BenchChatCompletionTaskParams
|
||||
) -> BenchChatCompletionResponse:
|
||||
model_meta = await resolve_model_meta(payload.model)
|
||||
parse_gpt_oss = "gpt-oss" in model_meta.model_id.lower()
|
||||
payload.model = model_meta.model_id
|
||||
|
||||
if not any(
|
||||
@@ -545,7 +589,10 @@ class API:
|
||||
command = ChatCompletion(request_params=payload)
|
||||
await self._send(command)
|
||||
|
||||
response = await self._collect_chat_completion_with_stats(command.command_id)
|
||||
response = await self._collect_chat_completion_with_stats(
|
||||
command.command_id,
|
||||
parse_gpt_oss,
|
||||
)
|
||||
return response
|
||||
|
||||
def _calculate_total_available_memory(self) -> Memory:
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from anyio import Path, open_file
|
||||
import tomlkit
|
||||
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.models import ModelId, ModelMetadata
|
||||
from exo.utils.pydantic_ext import CamelCaseModel
|
||||
@@ -11,6 +14,21 @@ class ModelCard(CamelCaseModel):
|
||||
tags: list[str]
|
||||
metadata: ModelMetadata
|
||||
|
||||
@staticmethod
|
||||
async def load(path: Path) -> "ModelCard":
|
||||
async with await open_file(path) as f:
|
||||
data = await f.read()
|
||||
py = tomlkit.loads(data)
|
||||
return ModelCard.model_validate(py)
|
||||
|
||||
async def save(self, path: Path):
|
||||
async with await open_file(path, "w") as f:
|
||||
py = self.model_dump()
|
||||
data = tomlkit.dumps(py) # pyright: ignore[reportUnknownMemberType]
|
||||
await f.write(data)
|
||||
|
||||
|
||||
|
||||
|
||||
MODEL_CARDS: dict[str, ModelCard] = {
|
||||
# deepseek v3
|
||||
@@ -425,15 +443,15 @@ MODEL_CARDS: dict[str, ModelCard] = {
|
||||
supports_tensor=True,
|
||||
),
|
||||
),
|
||||
"gpt-oss-20b-MXFP4-Q8": ModelCard(
|
||||
short_id="gpt-oss-20b-MXFP4-Q8",
|
||||
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q8"),
|
||||
name="GPT-OSS 20B (MXFP4-Q8, MLX)",
|
||||
description="""OpenAI's GPT-OSS 20B is a medium-sized MoE model for lower-latency and local or specialized use cases; this variant is a 4-bit MLX conversion for Apple Silicon.""",
|
||||
"gpt-oss-20b-4bit": ModelCard(
|
||||
short_id="gpt-oss-20b-4bit",
|
||||
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q4"),
|
||||
name="GPT-OSS 20B (MXFP4-Q4, MLX)",
|
||||
description="""OpenAI's GPT-OSS 20B is a medium-sized MoE model for lower-latency and local or specialized use cases; this MLX variant uses MXFP4 4-bit quantization.""",
|
||||
tags=[],
|
||||
metadata=ModelMetadata(
|
||||
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q8"),
|
||||
pretty_name="GPT-OSS 20B (MXFP4-Q8, MLX)",
|
||||
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q4"),
|
||||
pretty_name="GPT-OSS 20B (MXFP4-Q4, MLX)",
|
||||
storage_size=Memory.from_kb(11_744_051),
|
||||
n_layers=24,
|
||||
hidden_size=2880,
|
||||
|
||||
@@ -50,9 +50,7 @@ class RunnerReady(BaseRunnerStatus):
|
||||
|
||||
|
||||
class RunnerRunning(BaseRunnerStatus):
|
||||
"""Runner is processing requests and can accept more (continuous batching)."""
|
||||
|
||||
active_requests: int = 0
|
||||
pass
|
||||
|
||||
|
||||
class RunnerShuttingDown(BaseRunnerStatus):
|
||||
|
||||
@@ -1,302 +0,0 @@
|
||||
"""Batch generation engine using mlx_lm's BatchGenerator for continuous batching."""
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import mlx.core as mx
|
||||
from mlx_lm.generate import BatchGenerator
|
||||
from mlx_lm.sample_utils import make_sampler
|
||||
from mlx_lm.tokenizer_utils import StreamingDetokenizer, TokenizerWrapper
|
||||
|
||||
from exo.shared.types.api import FinishReason, GenerationStats
|
||||
from exo.shared.types.common import CommandId
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.tasks import ChatCompletionTaskParams, TaskId
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse
|
||||
from exo.worker.engines.mlx import Model
|
||||
from exo.worker.engines.mlx.constants import MAX_TOKENS
|
||||
from exo.worker.engines.mlx.generator.distributed_sync import share_object
|
||||
from exo.worker.engines.mlx.utils_mlx import apply_chat_template
|
||||
from exo.worker.runner.bootstrap import logger
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActiveRequest:
|
||||
"""Tracks an active request in the batch."""
|
||||
|
||||
command_id: CommandId
|
||||
task_id: TaskId
|
||||
uid: int # BatchGenerator's internal ID
|
||||
detokenizer: StreamingDetokenizer
|
||||
tokens_generated: int = 0
|
||||
prompt_tokens: int = 0
|
||||
start_time: float = field(default_factory=time.perf_counter)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchedGenerationResponse:
|
||||
"""Response from batch engine, tagged with command_id and task_id."""
|
||||
|
||||
command_id: CommandId
|
||||
task_id: TaskId
|
||||
response: GenerationResponse
|
||||
|
||||
|
||||
class BatchGenerationEngine:
|
||||
"""Manages continuous batching using mlx_lm's BatchGenerator."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: Model,
|
||||
tokenizer: TokenizerWrapper,
|
||||
group: mx.distributed.Group | None = None,
|
||||
max_tokens: int = MAX_TOKENS,
|
||||
completion_batch_size: int = 32,
|
||||
prefill_batch_size: int = 8,
|
||||
prefill_step_size: int = 2048,
|
||||
):
|
||||
self.model = model
|
||||
self.tokenizer = tokenizer
|
||||
self.max_tokens = max_tokens
|
||||
self.active_requests: dict[int, ActiveRequest] = {}
|
||||
self._pending_inserts: list[
|
||||
tuple[CommandId, TaskId, ChatCompletionTaskParams]
|
||||
] = []
|
||||
self._pending_completions: list[
|
||||
int
|
||||
] = [] # UIDs completed but not yet synced/removed
|
||||
|
||||
self.group = group
|
||||
self.rank = group.rank() if group else 0
|
||||
self.is_distributed = group is not None and group.size() > 1
|
||||
|
||||
sampler = make_sampler(temp=0.7, top_p=1.0)
|
||||
|
||||
eos_tokens: set[int] = set(tokenizer.eos_token_ids or [])
|
||||
|
||||
self.batch_gen: BatchGenerator = BatchGenerator(
|
||||
model=model,
|
||||
max_tokens=max_tokens,
|
||||
stop_tokens=eos_tokens,
|
||||
sampler=sampler,
|
||||
completion_batch_size=completion_batch_size,
|
||||
prefill_batch_size=prefill_batch_size,
|
||||
prefill_step_size=prefill_step_size,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"BatchGenerationEngine initialized with completion_batch_size={completion_batch_size}, "
|
||||
f"prefill_batch_size={prefill_batch_size}, distributed={self.is_distributed}"
|
||||
)
|
||||
|
||||
def queue_request(
|
||||
self,
|
||||
command_id: CommandId,
|
||||
task_id: TaskId,
|
||||
task_params: ChatCompletionTaskParams,
|
||||
) -> None:
|
||||
"""Queue a request for insertion. Only rank 0 should call this.
|
||||
|
||||
In distributed mode, rank 0 receives tasks from the control plane and
|
||||
queues them here. The actual insertion happens in sync_and_insert_pending()
|
||||
which ensures all ranks insert the same requests together.
|
||||
"""
|
||||
assert self.rank == 0, "Only rank 0 should queue requests"
|
||||
self._pending_inserts.append((command_id, task_id, task_params))
|
||||
logger.info(
|
||||
f"Queued request {command_id} for insertion (pending={len(self._pending_inserts)})"
|
||||
)
|
||||
|
||||
def sync_and_insert_pending(self) -> list[int]:
|
||||
"""Sync pending inserts across ranks and insert them. Returns UIDs.
|
||||
|
||||
This method ensures all ranks insert the same requests in the same order.
|
||||
In non-distributed mode, it simply inserts all pending requests.
|
||||
In distributed mode, it broadcasts pending requests from rank 0 to all ranks.
|
||||
|
||||
Batches all pending inserts into a single batch_gen.insert() call for
|
||||
efficient prefill batching.
|
||||
"""
|
||||
inserts_to_process: list[tuple[CommandId, TaskId, ChatCompletionTaskParams]]
|
||||
|
||||
if not self.is_distributed:
|
||||
# Non-distributed: just insert directly from pending
|
||||
inserts_to_process = list(self._pending_inserts)
|
||||
else:
|
||||
# Distributed: broadcast pending inserts from rank 0 to all ranks
|
||||
assert self.group is not None
|
||||
pending_data = self._pending_inserts if self.rank == 0 else None
|
||||
synced_data = share_object(pending_data, self.rank, self.group)
|
||||
|
||||
if synced_data is None:
|
||||
self._pending_inserts.clear()
|
||||
return []
|
||||
|
||||
inserts_to_process = synced_data
|
||||
|
||||
if not inserts_to_process:
|
||||
self._pending_inserts.clear()
|
||||
return []
|
||||
|
||||
# Prepare all requests for batched insertion
|
||||
all_tokens: list[list[int]] = []
|
||||
all_max_tokens: list[int] = []
|
||||
all_prompt_tokens: list[int] = []
|
||||
request_info: list[tuple[CommandId, TaskId]] = []
|
||||
|
||||
for cmd_id, task_id, params in inserts_to_process:
|
||||
prompt_str = apply_chat_template(self.tokenizer, params)
|
||||
tokens: list[int] = self.tokenizer.encode(
|
||||
prompt_str, add_special_tokens=False
|
||||
)
|
||||
max_tokens = params.max_tokens or self.max_tokens
|
||||
|
||||
all_tokens.append(tokens)
|
||||
all_max_tokens.append(max_tokens)
|
||||
all_prompt_tokens.append(len(tokens))
|
||||
request_info.append((cmd_id, task_id))
|
||||
|
||||
# Single batched insert for efficient prefill
|
||||
uids = self.batch_gen.insert(all_tokens, max_tokens=all_max_tokens)
|
||||
|
||||
# Track all inserted requests
|
||||
for i, uid in enumerate(uids):
|
||||
cmd_id, task_id = request_info[i]
|
||||
self.active_requests[uid] = ActiveRequest(
|
||||
command_id=cmd_id,
|
||||
task_id=task_id,
|
||||
uid=uid,
|
||||
detokenizer=self.tokenizer.detokenizer,
|
||||
prompt_tokens=all_prompt_tokens[i],
|
||||
)
|
||||
logger.info(
|
||||
f"Inserted request {cmd_id} with uid={uid}, prompt_tokens={all_prompt_tokens[i]}, max_tokens={all_max_tokens[i]}"
|
||||
)
|
||||
|
||||
self._pending_inserts.clear()
|
||||
return uids
|
||||
|
||||
def step(self) -> list[BatchedGenerationResponse]:
|
||||
"""Run one decode step. Tracks completions but does not sync - call sync_completions() at budget boundaries."""
|
||||
responses = self.batch_gen.next()
|
||||
if not responses:
|
||||
return []
|
||||
|
||||
results: list[BatchedGenerationResponse] = []
|
||||
|
||||
for r in responses:
|
||||
uid: int = r.uid
|
||||
req = self.active_requests.get(uid)
|
||||
if req is None:
|
||||
logger.warning(f"Received response for unknown uid={uid}")
|
||||
continue
|
||||
|
||||
req.tokens_generated += 1
|
||||
|
||||
# Decode the token
|
||||
token: int = r.token
|
||||
req.detokenizer.add_token(token)
|
||||
text: str = req.detokenizer.last_segment
|
||||
|
||||
stats: GenerationStats | None = None
|
||||
finish_reason: FinishReason | None = None
|
||||
|
||||
raw_finish_reason: str | None = r.finish_reason
|
||||
if raw_finish_reason is not None:
|
||||
# Finalize to get remaining text
|
||||
req.detokenizer.finalize()
|
||||
text = req.detokenizer.last_segment
|
||||
|
||||
elapsed = time.perf_counter() - req.start_time
|
||||
generation_tps = req.tokens_generated / elapsed if elapsed > 0 else 0.0
|
||||
|
||||
stats = GenerationStats(
|
||||
prompt_tps=0.0, # Not tracked per-request in batch mode
|
||||
generation_tps=generation_tps,
|
||||
prompt_tokens=req.prompt_tokens,
|
||||
generation_tokens=req.tokens_generated,
|
||||
peak_memory_usage=Memory.from_gb(mx.get_peak_memory() / 1e9),
|
||||
)
|
||||
|
||||
if raw_finish_reason == "stop":
|
||||
finish_reason = "stop"
|
||||
elif raw_finish_reason == "length":
|
||||
finish_reason = "length"
|
||||
else:
|
||||
logger.warning(f"Unknown finish_reason: {raw_finish_reason}")
|
||||
finish_reason = "stop"
|
||||
|
||||
# Track completion but don't remove yet - wait for sync_completions()
|
||||
self._pending_completions.append(uid)
|
||||
logger.info(
|
||||
f"Request {req.command_id} completed: {req.tokens_generated} tokens, {generation_tps:.2f} tps, reason={finish_reason}"
|
||||
)
|
||||
|
||||
results.append(
|
||||
BatchedGenerationResponse(
|
||||
command_id=req.command_id,
|
||||
task_id=req.task_id,
|
||||
response=GenerationResponse(
|
||||
text=text, token=token, finish_reason=finish_reason, stats=stats
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
# In non-distributed mode, clean up completions immediately
|
||||
if not self.is_distributed:
|
||||
self._remove_completed()
|
||||
|
||||
return results
|
||||
|
||||
def sync_completions(self) -> None:
|
||||
"""Sync and remove completed requests. Call at time budget boundaries in distributed mode."""
|
||||
if not self.is_distributed:
|
||||
# Non-distributed: early return if nothing to do
|
||||
if not self._pending_completions:
|
||||
return
|
||||
self._remove_completed()
|
||||
return
|
||||
|
||||
# Distributed mode: ALWAYS sync to ensure all ranks participate in collective op
|
||||
# This prevents deadlock if one rank has completions and another doesn't
|
||||
assert self.group is not None
|
||||
synced_uids = share_object(
|
||||
self._pending_completions if self.rank == 0 else None,
|
||||
self.rank,
|
||||
self.group,
|
||||
)
|
||||
if synced_uids:
|
||||
self._pending_completions = synced_uids
|
||||
|
||||
self._remove_completed()
|
||||
|
||||
def _remove_completed(self) -> None:
|
||||
"""Remove completed requests from tracking."""
|
||||
for uid in self._pending_completions:
|
||||
if uid in self.active_requests:
|
||||
del self.active_requests[uid]
|
||||
self._pending_completions.clear()
|
||||
|
||||
@property
|
||||
def has_active_requests(self) -> bool:
|
||||
return bool(self.active_requests or self.batch_gen.unprocessed_prompts)
|
||||
|
||||
@property
|
||||
def has_pending_inserts(self) -> bool:
|
||||
return bool(self._pending_inserts)
|
||||
|
||||
@property
|
||||
def active_count(self) -> int:
|
||||
return len(self.active_requests)
|
||||
|
||||
@property
|
||||
def pending_count(self) -> int:
|
||||
return len(self.batch_gen.unprocessed_prompts)
|
||||
|
||||
@property
|
||||
def pending_insert_count(self) -> int:
|
||||
return len(self._pending_inserts)
|
||||
|
||||
@property
|
||||
def has_pending_completions(self) -> bool:
|
||||
return bool(self._pending_completions)
|
||||
@@ -1,30 +0,0 @@
|
||||
"""Distributed sync utilities using mx.distributed.all_sum() to broadcast from rank 0."""
|
||||
|
||||
# pyright: reportAny=false
|
||||
|
||||
import pickle
|
||||
from typing import TypeVar, cast
|
||||
|
||||
import mlx.core as mx
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def share_object(obj: T | None, rank: int, group: mx.distributed.Group) -> T | None:
|
||||
"""Broadcast object from rank 0 to all ranks. Two-phase: size then data."""
|
||||
if rank == 0:
|
||||
if obj is None:
|
||||
mx.eval(mx.distributed.all_sum(mx.array([0]), group=group))
|
||||
return None
|
||||
data = mx.array(list(pickle.dumps(obj)), dtype=mx.uint8)
|
||||
mx.eval(mx.distributed.all_sum(mx.array([data.size]), group=group))
|
||||
mx.eval(mx.distributed.all_sum(data, group=group))
|
||||
return obj
|
||||
else:
|
||||
size = int(mx.distributed.all_sum(mx.array([0]), group=group).item())
|
||||
if size == 0:
|
||||
return None
|
||||
data = mx.zeros(size, dtype=mx.uint8)
|
||||
data = mx.distributed.all_sum(data, group=group)
|
||||
mx.eval(data)
|
||||
return cast(T, pickle.loads(bytes(cast(list[int], data.tolist()))))
|
||||
@@ -1,104 +0,0 @@
|
||||
"""Time budget iterator for controlling generation loop timing in distributed mode.
|
||||
|
||||
Based on mlx-lm's TimeBudget pattern - runs for a time budget then syncs,
|
||||
rather than syncing every token. This reduces distributed sync overhead.
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Iterator
|
||||
|
||||
import mlx.core as mx
|
||||
|
||||
from exo.worker.runner.bootstrap import logger
|
||||
|
||||
generation_stream = mx.new_stream(mx.default_device())
|
||||
|
||||
|
||||
class TimeBudget(Iterator[None]):
|
||||
"""Controls generation loop timing, syncing across ranks periodically.
|
||||
|
||||
In distributed mode, periodically syncs timing across all ranks to
|
||||
dynamically adjust iteration count based on actual performance.
|
||||
|
||||
In non-distributed mode, simply runs for the time budget.
|
||||
|
||||
Usage:
|
||||
for _ in TimeBudget(budget=0.5):
|
||||
batch_engine.step()
|
||||
# ... process responses ...
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
budget: float = 0.5,
|
||||
iterations: int = 25,
|
||||
sync_frequency: int = 10,
|
||||
group: mx.distributed.Group | None = None,
|
||||
):
|
||||
"""Initialize TimeBudget.
|
||||
|
||||
Args:
|
||||
budget: Time budget in seconds before yielding control
|
||||
iterations: Initial number of iterations per budget period (distributed only)
|
||||
sync_frequency: How often to sync timing across ranks (distributed only)
|
||||
group: Distributed group, or None for non-distributed mode
|
||||
"""
|
||||
self._budget = budget
|
||||
self._iterations = iterations
|
||||
self._sync_frequency = sync_frequency
|
||||
self._group = group
|
||||
self._is_distributed = group is not None and group.size() > 1
|
||||
|
||||
# Runtime state
|
||||
self._start: float = 0.0
|
||||
self._current_iterations: int = 0
|
||||
self._loops: int = 0
|
||||
self._time_spent: float = 0.0
|
||||
|
||||
def __iter__(self) -> "TimeBudget":
|
||||
self._start = time.perf_counter()
|
||||
self._current_iterations = 0
|
||||
return self
|
||||
|
||||
def __next__(self) -> None:
|
||||
if not self._is_distributed:
|
||||
# Non-distributed: just check time budget
|
||||
if time.perf_counter() - self._start > self._budget:
|
||||
raise StopIteration()
|
||||
return None
|
||||
|
||||
# Distributed mode: iteration-based with periodic timing sync
|
||||
self._current_iterations += 1
|
||||
if self._current_iterations > self._iterations:
|
||||
self._loops += 1
|
||||
self._time_spent += time.perf_counter() - self._start
|
||||
|
||||
if self._loops % self._sync_frequency == 0:
|
||||
# Sync timing across all ranks
|
||||
assert self._group is not None
|
||||
with mx.stream(generation_stream):
|
||||
time_array = mx.array([self._time_spent], dtype=mx.float32)
|
||||
total_time = mx.distributed.all_sum(time_array, group=self._group)
|
||||
mx.eval(total_time)
|
||||
loop_time = float(total_time.item())
|
||||
|
||||
avg_loop_time = loop_time / (self._group.size() * self._sync_frequency)
|
||||
|
||||
if avg_loop_time > 0:
|
||||
factor = self._budget / avg_loop_time
|
||||
self._iterations = max(round(self._iterations * factor), 1)
|
||||
logger.debug(
|
||||
f"TimeBudget adjusted iterations to {self._iterations}"
|
||||
)
|
||||
|
||||
self._loops = 0
|
||||
self._time_spent = 0.0
|
||||
|
||||
raise StopIteration()
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def iterations(self) -> int:
|
||||
"""Current iterations per budget period."""
|
||||
return self._iterations
|
||||
@@ -20,7 +20,6 @@ except ImportError:
|
||||
|
||||
from mlx_lm.models.cache import KVCache, QuantizedKVCache, RotatingKVCache
|
||||
from mlx_lm.models.deepseek_v3 import DeepseekV3Model
|
||||
from mlx_lm.models.gpt_oss import Model as GptOssModel
|
||||
from mlx_lm.tokenizer_utils import TokenizerWrapper
|
||||
|
||||
from exo.worker.engines.mlx.constants import (
|
||||
@@ -366,8 +365,6 @@ def apply_chat_template(
|
||||
tools=chat_task_data.tools,
|
||||
)
|
||||
|
||||
logger.info(prompt)
|
||||
|
||||
return prompt
|
||||
|
||||
|
||||
@@ -399,11 +396,6 @@ def make_kv_cache(
|
||||
) -> list[KVCache | RotatingKVCache | QuantizedKVCache]:
|
||||
assert hasattr(model, "layers")
|
||||
|
||||
# TODO: Do this for all models
|
||||
if hasattr(model, "make_cache") and isinstance(model, GptOssModel):
|
||||
logger.info("Using MLX LM's make cache")
|
||||
return model.make_cache() # type: ignore
|
||||
|
||||
if max_kv_size is None:
|
||||
if KV_CACHE_BITS is None:
|
||||
logger.info("Using default KV cache")
|
||||
|
||||
@@ -277,14 +277,12 @@ def _pending_tasks(
|
||||
# I have a design point here; this is a state race in disguise as the task status doesn't get updated to completed fast enough
|
||||
# however, realistically the task status should be set to completed by the LAST runner, so this is a true race
|
||||
# the actual solution is somewhat deeper than this bypass - TODO!
|
||||
# Also skip tasks in pending to prevent duplicate forwarding with continuous batching
|
||||
if task.task_id in runner.completed or task.task_id in runner.pending:
|
||||
if task.task_id in runner.completed:
|
||||
continue
|
||||
|
||||
# TODO: Check ordering aligns with MLX distributeds expectations.
|
||||
|
||||
# Allow forwarding tasks when runner is Ready or Running (for continuous batching)
|
||||
if isinstance(runner.status, (RunnerReady, RunnerRunning)) and all(
|
||||
if isinstance(runner.status, RunnerReady) and all(
|
||||
isinstance(all_runners[global_runner_id], (RunnerReady, RunnerRunning))
|
||||
for global_runner_id in runner.bound_instance.instance.shard_assignments.runner_to_shard
|
||||
):
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import gc
|
||||
import time
|
||||
|
||||
import mlx.core as mx
|
||||
from anyio import WouldBlock
|
||||
|
||||
from exo.shared.types.api import ChatCompletionMessageText
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
@@ -23,6 +21,9 @@ from exo.shared.types.tasks import (
|
||||
TaskStatus,
|
||||
)
|
||||
from exo.shared.types.worker.instances import BoundInstance
|
||||
from exo.shared.types.worker.runner_response import (
|
||||
GenerationResponse,
|
||||
)
|
||||
from exo.shared.types.worker.runners import (
|
||||
RunnerConnected,
|
||||
RunnerConnecting,
|
||||
@@ -38,9 +39,7 @@ from exo.shared.types.worker.runners import (
|
||||
RunnerWarmingUp,
|
||||
)
|
||||
from exo.utils.channels import MpReceiver, MpSender
|
||||
from exo.worker.engines.mlx.generator.batch_engine import BatchGenerationEngine
|
||||
from exo.worker.engines.mlx.generator.generate import warmup_inference
|
||||
from exo.worker.engines.mlx.generator.time_budget import TimeBudget
|
||||
from exo.worker.engines.mlx.generator.generate import mlx_generate, warmup_inference
|
||||
from exo.worker.engines.mlx.utils_mlx import (
|
||||
initialize_mlx,
|
||||
load_mlx_items,
|
||||
@@ -70,318 +69,142 @@ def main(
|
||||
model = None
|
||||
tokenizer = None
|
||||
group = None
|
||||
batch_engine: BatchGenerationEngine | None = None
|
||||
pending_shutdown: Shutdown | None = None
|
||||
|
||||
current_status: RunnerStatus = RunnerIdle()
|
||||
|
||||
def send_status(status: RunnerStatus) -> None:
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(runner_id=runner_id, runner_status=status)
|
||||
)
|
||||
|
||||
logger.info("runner created")
|
||||
send_status(current_status)
|
||||
|
||||
def handle_task(task: Task, is_deferred: bool = False) -> bool:
|
||||
nonlocal current_status, model, tokenizer, group, batch_engine, pending_shutdown
|
||||
|
||||
# For Shutdown, check if we need to defer BEFORE sending Running/Acknowledged
|
||||
if (
|
||||
isinstance(task, Shutdown)
|
||||
and not is_deferred
|
||||
and batch_engine is not None
|
||||
and (batch_engine.has_active_requests or batch_engine.has_pending_inserts)
|
||||
):
|
||||
logger.info("deferring shutdown until active requests complete")
|
||||
pending_shutdown = task
|
||||
return True
|
||||
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
|
||||
)
|
||||
event_sender.send(TaskAcknowledged(task_id=task.task_id))
|
||||
|
||||
match task:
|
||||
case ConnectToGroup() if isinstance(
|
||||
current_status, (RunnerIdle, RunnerFailed)
|
||||
):
|
||||
logger.info("runner connecting")
|
||||
current_status = RunnerConnecting()
|
||||
send_status(current_status)
|
||||
group = initialize_mlx(bound_instance)
|
||||
|
||||
logger.info("runner connected")
|
||||
current_status = RunnerConnected()
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=task.task_id, task_status=TaskStatus.Complete
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
|
||||
)
|
||||
with task_receiver as tasks:
|
||||
for task in tasks:
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
|
||||
)
|
||||
event_sender.send(TaskAcknowledged(task_id=task.task_id))
|
||||
match task:
|
||||
case ConnectToGroup() if isinstance(
|
||||
current_status, (RunnerIdle, RunnerFailed)
|
||||
):
|
||||
logger.info("runner connecting")
|
||||
current_status = RunnerConnecting()
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(
|
||||
runner_id=runner_id, runner_status=current_status
|
||||
)
|
||||
)
|
||||
)
|
||||
send_status(current_status)
|
||||
group = initialize_mlx(bound_instance)
|
||||
|
||||
case LoadModel() if (
|
||||
isinstance(current_status, RunnerConnected) and group is not None
|
||||
) or (isinstance(current_status, RunnerIdle) and group is None):
|
||||
current_status = RunnerLoading()
|
||||
logger.info("runner loading")
|
||||
send_status(current_status)
|
||||
logger.info("runner connected")
|
||||
current_status = RunnerConnected()
|
||||
|
||||
model, tokenizer = load_mlx_items(bound_instance, group)
|
||||
|
||||
current_status = RunnerLoaded()
|
||||
logger.info("runner loaded")
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=task.task_id, task_status=TaskStatus.Complete
|
||||
# we load the model if it's connected with a group, or idle without a group. we should never tell a model to connect if it doesn't need to
|
||||
case LoadModel() if (
|
||||
isinstance(current_status, RunnerConnected) and group is not None
|
||||
) or (isinstance(current_status, RunnerIdle) and group is None):
|
||||
current_status = RunnerLoading()
|
||||
logger.info("runner loading")
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(
|
||||
runner_id=runner_id, runner_status=current_status
|
||||
)
|
||||
)
|
||||
)
|
||||
send_status(current_status)
|
||||
|
||||
case StartWarmup() if isinstance(current_status, RunnerLoaded):
|
||||
assert model is not None
|
||||
assert tokenizer is not None
|
||||
current_status = RunnerWarmingUp()
|
||||
logger.info("runner warming up")
|
||||
send_status(current_status)
|
||||
model, tokenizer = load_mlx_items(bound_instance, group)
|
||||
|
||||
logger.info(f"warming up inference for instance: {instance}")
|
||||
toks = warmup_inference(model=model, tokenizer=tokenizer)
|
||||
logger.info(f"warmed up by generating {toks} tokens")
|
||||
logger.info(
|
||||
f"runner initialized in {time.time() - setup_start_time} seconds"
|
||||
)
|
||||
|
||||
batch_engine = BatchGenerationEngine(
|
||||
model=model, tokenizer=tokenizer, group=group
|
||||
)
|
||||
|
||||
current_status = RunnerReady()
|
||||
logger.info("runner ready")
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=task.task_id, task_status=TaskStatus.Complete
|
||||
current_status = RunnerLoaded()
|
||||
logger.info("runner loaded")
|
||||
case StartWarmup() if isinstance(current_status, RunnerLoaded):
|
||||
assert model
|
||||
assert tokenizer
|
||||
current_status = RunnerWarmingUp()
|
||||
logger.info("runner warming up")
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(
|
||||
runner_id=runner_id, runner_status=current_status
|
||||
)
|
||||
)
|
||||
)
|
||||
send_status(current_status)
|
||||
|
||||
case ChatCompletion(task_params=task_params, command_id=command_id) if (
|
||||
isinstance(current_status, (RunnerReady, RunnerRunning))
|
||||
):
|
||||
assert batch_engine is not None
|
||||
|
||||
# In distributed mode, only rank 0 should queue requests
|
||||
# Other ranks should skip - they'll participate in sync_and_insert_pending()
|
||||
is_distributed_mode = group is not None and group.size() > 1
|
||||
if is_distributed_mode and shard_metadata.device_rank != 0:
|
||||
logger.debug(
|
||||
f"Rank {shard_metadata.device_rank} skipping ChatCompletionTask (only rank 0 queues)"
|
||||
logger.info(f"warming up inference for instance: {instance}")
|
||||
toks = warmup_inference(
|
||||
model=model,
|
||||
tokenizer=tokenizer,
|
||||
# kv_prefix_cache=kv_prefix_cache, # supply for warmup-time prefix caching
|
||||
)
|
||||
return True
|
||||
|
||||
if task_params.messages and task_params.messages[0].content is not None:
|
||||
logger.info(f"warmed up by generating {toks} tokens")
|
||||
logger.info(
|
||||
f"runner initialized in {time.time() - setup_start_time} seconds"
|
||||
)
|
||||
current_status = RunnerReady()
|
||||
logger.info("runner ready")
|
||||
case ChatCompletion(task_params=task_params, command_id=command_id) if (
|
||||
isinstance(current_status, RunnerReady)
|
||||
):
|
||||
assert model
|
||||
assert tokenizer
|
||||
logger.info(f"received chat request: {str(task)[:500]}")
|
||||
current_status = RunnerRunning()
|
||||
logger.info("runner running")
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(
|
||||
runner_id=runner_id, runner_status=current_status
|
||||
)
|
||||
)
|
||||
assert task_params.messages[0].content is not None
|
||||
_check_for_debug_prompts(task_params.messages[0].content)
|
||||
|
||||
# Queue the request - actual insertion happens in sync_and_insert_pending()
|
||||
batch_engine.queue_request(
|
||||
command_id=command_id, task_id=task.task_id, task_params=task_params
|
||||
)
|
||||
|
||||
# Status will be updated after actual insertion in the main loop
|
||||
# For now, set to RunnerRunning to indicate we're processing
|
||||
current_status = RunnerRunning(
|
||||
active_requests=batch_engine.active_count
|
||||
+ batch_engine.pending_insert_count
|
||||
)
|
||||
send_status(current_status)
|
||||
|
||||
case Shutdown():
|
||||
current_status = RunnerShuttingDown()
|
||||
logger.info("runner shutting down")
|
||||
send_status(current_status)
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=task.task_id, task_status=TaskStatus.Complete
|
||||
)
|
||||
)
|
||||
current_status = RunnerShutdown()
|
||||
send_status(current_status)
|
||||
return False
|
||||
|
||||
case _:
|
||||
raise ValueError(
|
||||
f"Received {task.__class__.__name__} outside of state machine in {current_status=}"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
with task_receiver as tasks:
|
||||
running = True
|
||||
is_rank_0 = shard_metadata.device_rank == 0
|
||||
|
||||
while running:
|
||||
# Use batch_engine.is_distributed since it's set correctly after group initialization
|
||||
# (the group variable is None at loop start, but set by ConnectToGroup task)
|
||||
if batch_engine is not None and batch_engine.is_distributed:
|
||||
assert group is not None
|
||||
assert batch_engine is not None
|
||||
|
||||
# Distributed mode: tasks wake up all ranks, then we sync and generate
|
||||
|
||||
# Check deferred shutdown FIRST - all ranks must check and process together
|
||||
# This must run before any collective operations to prevent deadlock
|
||||
if (
|
||||
pending_shutdown is not None
|
||||
and not batch_engine.has_active_requests
|
||||
and not batch_engine.has_pending_inserts
|
||||
):
|
||||
handle_task(pending_shutdown, is_deferred=True)
|
||||
running = False
|
||||
continue
|
||||
|
||||
# When idle, block waiting for task (exo sends tasks to all ranks)
|
||||
# When active, poll non-blocking to batch incoming requests
|
||||
if (
|
||||
not batch_engine.has_active_requests
|
||||
and not batch_engine.has_pending_inserts
|
||||
):
|
||||
# IDLE: Block until task arrives (all ranks receive the same task)
|
||||
task = tasks.receive()
|
||||
task_result = handle_task(task)
|
||||
if not task_result:
|
||||
running = False
|
||||
continue
|
||||
else:
|
||||
# ACTIVE: Poll for new tasks without blocking
|
||||
while True:
|
||||
try:
|
||||
task = tasks.receive_nowait()
|
||||
task_result = handle_task(task)
|
||||
if not task_result:
|
||||
running = False
|
||||
break
|
||||
except WouldBlock:
|
||||
break
|
||||
if not running:
|
||||
continue
|
||||
|
||||
# Sync and insert pending requests (collective operation)
|
||||
# Rank 0 broadcasts its pending to all ranks
|
||||
inserted = batch_engine.sync_and_insert_pending()
|
||||
if is_rank_0 and inserted:
|
||||
current_status = RunnerRunning(
|
||||
active_requests=batch_engine.active_count
|
||||
)
|
||||
send_status(current_status)
|
||||
|
||||
# Run generation for time budget
|
||||
if batch_engine.has_active_requests:
|
||||
time_budget = TimeBudget(budget=0.5, group=group)
|
||||
for _ in time_budget:
|
||||
if not batch_engine.has_active_requests:
|
||||
break
|
||||
for resp in batch_engine.step():
|
||||
# Send token IMMEDIATELY for smooth streaming (only rank 0)
|
||||
if is_rank_0:
|
||||
event_sender.send(
|
||||
ChunkGenerated(
|
||||
command_id=resp.command_id,
|
||||
chunk=TokenChunk(
|
||||
idx=resp.response.token,
|
||||
model=shard_metadata.model_meta.model_id,
|
||||
text=resp.response.text,
|
||||
token_id=resp.response.token,
|
||||
finish_reason=resp.response.finish_reason,
|
||||
stats=resp.response.stats,
|
||||
),
|
||||
)
|
||||
)
|
||||
if resp.response.finish_reason is not None:
|
||||
# Generate responses using the actual MLX generation
|
||||
for response in mlx_generate(
|
||||
model=model,
|
||||
tokenizer=tokenizer,
|
||||
task=task_params,
|
||||
):
|
||||
match response:
|
||||
case GenerationResponse():
|
||||
if shard_metadata.device_rank == 0:
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=resp.task_id,
|
||||
task_status=TaskStatus.Complete,
|
||||
ChunkGenerated(
|
||||
command_id=command_id,
|
||||
chunk=TokenChunk(
|
||||
idx=response.token,
|
||||
model=shard_metadata.model_meta.model_id,
|
||||
text=response.text,
|
||||
token_id=response.token,
|
||||
finish_reason=response.finish_reason,
|
||||
stats=response.stats,
|
||||
),
|
||||
)
|
||||
)
|
||||
# case TokenizedResponse():
|
||||
# TODO: something here ig
|
||||
|
||||
# Sync completions at budget boundary (always call - it's a collective operation)
|
||||
batch_engine.sync_completions()
|
||||
|
||||
# Update status after budget
|
||||
if is_rank_0:
|
||||
current_status = (
|
||||
RunnerRunning(active_requests=batch_engine.active_count)
|
||||
if batch_engine.has_active_requests
|
||||
else RunnerReady()
|
||||
)
|
||||
send_status(current_status)
|
||||
|
||||
else:
|
||||
# Non-distributed mode: original logic with queue + insert
|
||||
while True:
|
||||
try:
|
||||
task = tasks.receive_nowait()
|
||||
running = handle_task(task)
|
||||
if not running:
|
||||
break
|
||||
except WouldBlock:
|
||||
break
|
||||
|
||||
if not running:
|
||||
break
|
||||
|
||||
# Insert any queued requests (non-distributed just inserts directly)
|
||||
# Status was already sent in handle_task when queueing
|
||||
if batch_engine is not None and batch_engine.has_pending_inserts:
|
||||
batch_engine.sync_and_insert_pending()
|
||||
|
||||
if batch_engine is not None and batch_engine.has_active_requests:
|
||||
for resp in batch_engine.step():
|
||||
if shard_metadata.device_rank == 0:
|
||||
event_sender.send(
|
||||
ChunkGenerated(
|
||||
command_id=resp.command_id,
|
||||
chunk=TokenChunk(
|
||||
idx=resp.response.token,
|
||||
model=shard_metadata.model_meta.model_id,
|
||||
text=resp.response.text,
|
||||
token_id=resp.response.token,
|
||||
finish_reason=resp.response.finish_reason,
|
||||
stats=resp.response.stats,
|
||||
),
|
||||
)
|
||||
)
|
||||
if resp.response.finish_reason is not None:
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(
|
||||
task_id=resp.task_id,
|
||||
task_status=TaskStatus.Complete,
|
||||
)
|
||||
)
|
||||
|
||||
if batch_engine.has_active_requests:
|
||||
current_status = RunnerRunning(
|
||||
active_requests=batch_engine.active_count
|
||||
current_status = RunnerReady()
|
||||
logger.info("runner ready")
|
||||
case Shutdown():
|
||||
current_status = RunnerShuttingDown()
|
||||
logger.info("runner shutting down")
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(
|
||||
runner_id=runner_id, runner_status=current_status
|
||||
)
|
||||
else:
|
||||
current_status = RunnerReady()
|
||||
send_status(current_status)
|
||||
)
|
||||
current_status = RunnerShutdown()
|
||||
case _:
|
||||
raise ValueError(
|
||||
f"Received {task.__class__.__name__} outside of state machine in {current_status=}"
|
||||
)
|
||||
event_sender.send(
|
||||
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Complete)
|
||||
)
|
||||
event_sender.send(
|
||||
RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
|
||||
)
|
||||
if isinstance(current_status, RunnerShutdown):
|
||||
del model, tokenizer, group
|
||||
mx.clear_cache()
|
||||
import gc
|
||||
|
||||
# Process deferred shutdown after all requests complete
|
||||
if (
|
||||
pending_shutdown is not None
|
||||
and not batch_engine.has_active_requests
|
||||
and not batch_engine.has_pending_inserts
|
||||
):
|
||||
running = handle_task(pending_shutdown, is_deferred=True)
|
||||
else:
|
||||
task = tasks.receive()
|
||||
running = handle_task(task)
|
||||
|
||||
# Cleanup
|
||||
del model, tokenizer, group, batch_engine
|
||||
mx.clear_cache()
|
||||
gc.collect()
|
||||
gc.collect()
|
||||
break
|
||||
|
||||
|
||||
EXO_RUNNER_MUST_FAIL = "EXO RUNNER MUST FAIL"
|
||||
|
||||
@@ -105,7 +105,7 @@ class RunnerSupervisor:
|
||||
return
|
||||
|
||||
# This is overkill but it's not technically bad, just unnecessary.
|
||||
logger.warning("Runner process didn't shutdown successfully, terminating")
|
||||
logger.warning("Runner process didn't shutdown succesfully, terminating")
|
||||
self.runner_process.terminate()
|
||||
await to_thread.run_sync(self.runner_process.join, 5)
|
||||
if not self.runner_process.is_alive():
|
||||
@@ -128,11 +128,9 @@ class RunnerSupervisor:
|
||||
|
||||
async def start_task(self, task: Task):
|
||||
if task.task_id in self.completed:
|
||||
logger.info(f"Skipping task {task.task_id} - already completed")
|
||||
return
|
||||
if task.task_id in self.pending:
|
||||
logger.info(f"Skipping task {task.task_id} - already pending")
|
||||
return
|
||||
logger.info(
|
||||
f"Skipping invalid task {task} as it has already been completed"
|
||||
)
|
||||
logger.info(f"Starting task {task}")
|
||||
event = anyio.Event()
|
||||
self.pending[task.task_id] = event
|
||||
@@ -151,17 +149,13 @@ class RunnerSupervisor:
|
||||
if isinstance(event, RunnerStatusUpdated):
|
||||
self.status = event.runner_status
|
||||
if isinstance(event, TaskAcknowledged):
|
||||
# Just set the event to unblock start_task, but keep in pending
|
||||
# to prevent duplicate forwarding until completion
|
||||
if event.task_id in self.pending:
|
||||
self.pending[event.task_id].set()
|
||||
self.pending.pop(event.task_id).set()
|
||||
continue
|
||||
if isinstance(event, TaskStatusUpdated) and event.task_status in (
|
||||
TaskStatus.Complete,
|
||||
TaskStatus.TimedOut,
|
||||
TaskStatus.Failed,
|
||||
if (
|
||||
isinstance(event, TaskStatusUpdated)
|
||||
and event.task_status == TaskStatus.Complete
|
||||
):
|
||||
# If a task has just finished, we should be working on it.
|
||||
# If a task has just been completed, we should be working on it.
|
||||
assert isinstance(
|
||||
self.status,
|
||||
(
|
||||
@@ -172,8 +166,6 @@ class RunnerSupervisor:
|
||||
RunnerShuttingDown,
|
||||
),
|
||||
)
|
||||
# Now safe to remove from pending and add to completed
|
||||
self.pending.pop(event.task_id, None)
|
||||
self.completed.add(event.task_id)
|
||||
await self._event_sender.send(event)
|
||||
except (ClosedResourceError, BrokenResourceError) as e:
|
||||
|
||||
@@ -20,7 +20,6 @@ class FakeRunnerSupervisor:
|
||||
bound_instance: BoundInstance
|
||||
status: RunnerStatus
|
||||
completed: set[TaskId] = field(default_factory=set)
|
||||
pending: dict[TaskId, object] = field(default_factory=dict)
|
||||
|
||||
|
||||
class OtherTask(BaseTask):
|
||||
|
||||
@@ -1,319 +0,0 @@
|
||||
"""
|
||||
Tests for continuous batching behavior in the runner.
|
||||
|
||||
These tests verify that:
|
||||
1. Single requests work through the batch path
|
||||
2. Multiple concurrent requests batch together
|
||||
3. Tokens are routed to the correct requests
|
||||
4. Requests complete at different times appropriately
|
||||
"""
|
||||
|
||||
# pyright: reportAny=false
|
||||
# pyright: reportUnknownArgumentType=false
|
||||
# pyright: reportUnknownMemberType=false
|
||||
# pyright: reportAttributeAccessIssue=false
|
||||
# pyright: reportInvalidTypeVarUse=false
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
import exo.worker.runner.runner as mlx_runner
|
||||
from exo.shared.types.api import ChatCompletionMessage
|
||||
from exo.shared.types.common import CommandId, NodeId
|
||||
from exo.shared.types.events import (
|
||||
Event,
|
||||
RunnerStatusUpdated,
|
||||
TaskStatusUpdated,
|
||||
)
|
||||
from exo.shared.types.tasks import (
|
||||
ChatCompletion,
|
||||
ChatCompletionTaskParams,
|
||||
ConnectToGroup,
|
||||
LoadModel,
|
||||
Shutdown,
|
||||
StartWarmup,
|
||||
Task,
|
||||
TaskId,
|
||||
TaskStatus,
|
||||
)
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse
|
||||
from exo.shared.types.worker.runners import RunnerRunning
|
||||
from exo.utils.channels import mp_channel
|
||||
from exo.worker.engines.mlx.generator.batch_engine import (
|
||||
BatchedGenerationResponse,
|
||||
)
|
||||
from exo.worker.tests.constants import (
|
||||
INSTANCE_1_ID,
|
||||
MODEL_A_ID,
|
||||
NODE_A,
|
||||
RUNNER_1_ID,
|
||||
)
|
||||
from exo.worker.tests.unittests.conftest import get_bound_mlx_ring_instance
|
||||
|
||||
|
||||
class FakeBatchEngineWithTokens:
|
||||
"""
|
||||
Fake batch engine that generates a specified number of tokens per request.
|
||||
|
||||
This simulates realistic batch generation behavior where:
|
||||
- Requests are queued on insert
|
||||
- Each step() call generates one token for all active requests
|
||||
- Requests complete when they've generated all their tokens
|
||||
"""
|
||||
|
||||
def __init__(self, *_args: Any, **_kwargs: Any):
|
||||
self._active_requests: dict[int, tuple[CommandId, TaskId, int, int]] = {}
|
||||
self._pending_inserts: list[
|
||||
tuple[CommandId, TaskId, ChatCompletionTaskParams]
|
||||
] = []
|
||||
self._uid_counter = 0
|
||||
self._tokens_per_request = 3 # Default: generate 3 tokens before completing
|
||||
self.rank = 0 # Fake rank for testing
|
||||
|
||||
def queue_request(
|
||||
self,
|
||||
command_id: CommandId,
|
||||
task_id: TaskId,
|
||||
task_params: ChatCompletionTaskParams,
|
||||
) -> None:
|
||||
"""Queue a request for insertion."""
|
||||
self._pending_inserts.append((command_id, task_id, task_params))
|
||||
|
||||
def sync_and_insert_pending(self) -> list[int]:
|
||||
"""Insert all pending requests."""
|
||||
uids: list[int] = []
|
||||
for command_id, task_id, task_params in self._pending_inserts:
|
||||
uid = self._do_insert(command_id, task_id, task_params)
|
||||
uids.append(uid)
|
||||
self._pending_inserts.clear()
|
||||
return uids
|
||||
|
||||
@property
|
||||
def has_pending_inserts(self) -> bool:
|
||||
return len(self._pending_inserts) > 0
|
||||
|
||||
def _do_insert(
|
||||
self,
|
||||
command_id: CommandId,
|
||||
task_id: TaskId,
|
||||
task_params: ChatCompletionTaskParams | None,
|
||||
) -> int:
|
||||
uid = self._uid_counter
|
||||
self._uid_counter += 1
|
||||
# Track: (command_id, task_id, tokens_generated, max_tokens)
|
||||
max_tokens = task_params.max_tokens if task_params else self._tokens_per_request
|
||||
self._active_requests[uid] = (command_id, task_id, 0, max_tokens or 3)
|
||||
return uid
|
||||
|
||||
def step(self) -> list[BatchedGenerationResponse]:
|
||||
results: list[BatchedGenerationResponse] = []
|
||||
uids_to_remove: list[int] = []
|
||||
|
||||
for uid, (command_id, task_id, tokens_gen, max_tokens) in list(
|
||||
self._active_requests.items()
|
||||
):
|
||||
tokens_gen += 1
|
||||
finish_reason = "stop" if tokens_gen >= max_tokens else None
|
||||
text = f"token{tokens_gen}"
|
||||
|
||||
if finish_reason:
|
||||
uids_to_remove.append(uid)
|
||||
else:
|
||||
self._active_requests[uid] = (
|
||||
command_id,
|
||||
task_id,
|
||||
tokens_gen,
|
||||
max_tokens,
|
||||
)
|
||||
|
||||
results.append(
|
||||
BatchedGenerationResponse(
|
||||
command_id=command_id,
|
||||
task_id=task_id,
|
||||
response=GenerationResponse(
|
||||
token=tokens_gen,
|
||||
text=text,
|
||||
finish_reason=finish_reason,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
for uid in uids_to_remove:
|
||||
del self._active_requests[uid]
|
||||
|
||||
return results
|
||||
|
||||
@property
|
||||
def has_active_requests(self) -> bool:
|
||||
return len(self._active_requests) > 0
|
||||
|
||||
@property
|
||||
def active_count(self) -> int:
|
||||
return len(self._active_requests)
|
||||
|
||||
@property
|
||||
def pending_insert_count(self) -> int:
|
||||
return len(self._pending_inserts)
|
||||
|
||||
|
||||
def make_nothin[T, U, V](res: T):
|
||||
def nothin(*_1: U, **_2: V) -> T:
|
||||
return res
|
||||
|
||||
return nothin
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_batch_engine(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Patch MLX dependencies and use FakeBatchEngineWithTokens."""
|
||||
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(MagicMock()))
|
||||
monkeypatch.setattr(
|
||||
mlx_runner, "load_mlx_items", make_nothin((MagicMock(), MagicMock()))
|
||||
)
|
||||
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
|
||||
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", make_nothin(None))
|
||||
monkeypatch.setattr(mlx_runner, "BatchGenerationEngine", FakeBatchEngineWithTokens)
|
||||
|
||||
|
||||
def _run_with_tasks(tasks: list[Task]) -> list[Event]:
|
||||
"""
|
||||
Run tasks through the runner, adding shutdown at the end.
|
||||
|
||||
Tasks are sent in order, with shutdown sent last.
|
||||
The batch engine processes between task handling.
|
||||
"""
|
||||
bound_instance = get_bound_mlx_ring_instance(
|
||||
instance_id=INSTANCE_1_ID,
|
||||
model_id=MODEL_A_ID,
|
||||
runner_id=RUNNER_1_ID,
|
||||
node_id=NodeId(NODE_A),
|
||||
)
|
||||
|
||||
task_sender, task_receiver = mp_channel[Task]()
|
||||
event_sender, event_receiver = mp_channel[Event]()
|
||||
|
||||
shutdown_task = Shutdown(
|
||||
task_id=TaskId("shutdown"),
|
||||
instance_id=INSTANCE_1_ID,
|
||||
runner_id=RUNNER_1_ID,
|
||||
)
|
||||
|
||||
with task_sender, event_receiver:
|
||||
# Send all tasks including shutdown
|
||||
for t in tasks:
|
||||
task_sender.send(t)
|
||||
task_sender.send(shutdown_task)
|
||||
|
||||
# Disable cleanup methods to prevent issues
|
||||
event_sender.close = lambda: None
|
||||
event_sender.join = lambda: None
|
||||
task_receiver.close = lambda: None
|
||||
task_receiver.join = lambda: None
|
||||
|
||||
mlx_runner.main(bound_instance, event_sender, task_receiver)
|
||||
|
||||
return event_receiver.collect()
|
||||
|
||||
|
||||
INIT_TASK = ConnectToGroup(task_id=TaskId("init"), instance_id=INSTANCE_1_ID)
|
||||
LOAD_TASK = LoadModel(task_id=TaskId("load"), instance_id=INSTANCE_1_ID)
|
||||
WARMUP_TASK = StartWarmup(task_id=TaskId("warmup"), instance_id=INSTANCE_1_ID)
|
||||
|
||||
|
||||
def make_chat_task(
|
||||
task_id: str, command_id: str, max_tokens: int = 3
|
||||
) -> ChatCompletion:
|
||||
return ChatCompletion(
|
||||
task_id=TaskId(task_id),
|
||||
command_id=CommandId(command_id),
|
||||
task_params=ChatCompletionTaskParams(
|
||||
model=str(MODEL_A_ID),
|
||||
messages=[ChatCompletionMessage(role="user", content="hello")],
|
||||
stream=True,
|
||||
max_tokens=max_tokens,
|
||||
),
|
||||
instance_id=INSTANCE_1_ID,
|
||||
)
|
||||
|
||||
|
||||
def test_single_request_generates_tokens(patch_batch_engine: None):
|
||||
"""
|
||||
Verify a single request generates the expected tokens through the batch path.
|
||||
|
||||
Note: With the current non-blocking design, shutdown is processed before
|
||||
batch steps run when all tasks are queued together. This test verifies
|
||||
the runner status reflects active requests.
|
||||
"""
|
||||
chat_task = make_chat_task("chat1", "cmd1", max_tokens=3)
|
||||
events = _run_with_tasks([INIT_TASK, LOAD_TASK, WARMUP_TASK, chat_task])
|
||||
|
||||
# Find RunnerRunning status events - this shows the request was inserted
|
||||
running_events = [
|
||||
e
|
||||
for e in events
|
||||
if isinstance(e, RunnerStatusUpdated)
|
||||
and isinstance(e.runner_status, RunnerRunning)
|
||||
]
|
||||
|
||||
assert len(running_events) >= 1, "Expected at least one RunnerRunning event"
|
||||
assert running_events[0].runner_status.active_requests == 1
|
||||
|
||||
|
||||
def test_runner_status_reflects_active_requests(patch_batch_engine: None):
|
||||
"""Verify RunnerRunning status includes active_requests count."""
|
||||
chat_task = make_chat_task("chat1", "cmd1", max_tokens=2)
|
||||
events = _run_with_tasks([INIT_TASK, LOAD_TASK, WARMUP_TASK, chat_task])
|
||||
|
||||
# Find RunnerRunning status events
|
||||
running_events = [
|
||||
e
|
||||
for e in events
|
||||
if isinstance(e, RunnerStatusUpdated)
|
||||
and isinstance(e.runner_status, RunnerRunning)
|
||||
]
|
||||
|
||||
assert len(running_events) > 0, "Expected at least one RunnerRunning event"
|
||||
assert running_events[0].runner_status.active_requests == 1
|
||||
|
||||
|
||||
def test_chat_task_acknowledged(patch_batch_engine: None):
|
||||
"""Verify chat completion task is acknowledged with proper status updates."""
|
||||
chat_task = make_chat_task("chat1", "cmd1", max_tokens=2)
|
||||
events = _run_with_tasks([INIT_TASK, LOAD_TASK, WARMUP_TASK, chat_task])
|
||||
|
||||
# Find the chat task status events
|
||||
chat_running = [
|
||||
e
|
||||
for e in events
|
||||
if isinstance(e, TaskStatusUpdated)
|
||||
and e.task_id == TaskId("chat1")
|
||||
and e.task_status == TaskStatus.Running
|
||||
]
|
||||
|
||||
assert len(chat_running) == 1, "Expected exactly one chat task Running status"
|
||||
|
||||
|
||||
def test_multiple_requests_tracked(patch_batch_engine: None):
|
||||
"""Verify multiple concurrent requests are tracked in active_requests."""
|
||||
chat1 = make_chat_task("chat1", "cmd1", max_tokens=2)
|
||||
chat2 = make_chat_task("chat2", "cmd2", max_tokens=2)
|
||||
events = _run_with_tasks([INIT_TASK, LOAD_TASK, WARMUP_TASK, chat1, chat2])
|
||||
|
||||
# Find RunnerRunning status events
|
||||
running_events = [
|
||||
e
|
||||
for e in events
|
||||
if isinstance(e, RunnerStatusUpdated)
|
||||
and isinstance(e.runner_status, RunnerRunning)
|
||||
]
|
||||
|
||||
# Should have at least 2 RunnerRunning events (one per request inserted)
|
||||
assert len(running_events) >= 2, (
|
||||
f"Expected at least 2 RunnerRunning events, got {len(running_events)}"
|
||||
)
|
||||
|
||||
# First should have 1 active request, second should have 2
|
||||
assert running_events[0].runner_status.active_requests == 1
|
||||
assert running_events[1].runner_status.active_requests == 2
|
||||
@@ -1,17 +1,12 @@
|
||||
# Check tasks are complete before runner is ever ready.
|
||||
|
||||
# pyright: reportAny=false
|
||||
|
||||
from collections.abc import Iterable
|
||||
from typing import Any, Callable
|
||||
from unittest.mock import MagicMock
|
||||
from typing import Callable
|
||||
|
||||
import pytest
|
||||
|
||||
import exo.worker.runner.runner as mlx_runner
|
||||
from exo.shared.types.api import ChatCompletionMessage
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.common import CommandId
|
||||
from exo.shared.types.events import (
|
||||
ChunkGenerated,
|
||||
Event,
|
||||
@@ -27,7 +22,6 @@ from exo.shared.types.tasks import (
|
||||
Shutdown,
|
||||
StartWarmup,
|
||||
Task,
|
||||
TaskId,
|
||||
TaskStatus,
|
||||
)
|
||||
from exo.shared.types.worker.runner_response import GenerationResponse
|
||||
@@ -44,9 +38,6 @@ from exo.shared.types.worker.runners import (
|
||||
RunnerWarmingUp,
|
||||
)
|
||||
from exo.utils.channels import mp_channel
|
||||
from exo.worker.engines.mlx.generator.batch_engine import (
|
||||
BatchedGenerationResponse,
|
||||
)
|
||||
|
||||
from ...constants import (
|
||||
CHAT_COMPLETION_TASK_ID,
|
||||
@@ -116,89 +107,18 @@ def assert_events_equal(test_events: Iterable[Event], true_events: Iterable[Even
|
||||
assert test_event == true_event, f"{test_event} != {true_event}"
|
||||
|
||||
|
||||
class FakeBatchEngine:
|
||||
"""
|
||||
Fake batch engine for testing.
|
||||
|
||||
Queues requests on insert, returns one token per step.
|
||||
The runner's non-blocking loop drains all tasks before running batch steps,
|
||||
so this engine queues requests and has_active_requests returns True only
|
||||
after at least one request has been inserted.
|
||||
"""
|
||||
|
||||
def __init__(self, *_args: Any, **_kwargs: Any):
|
||||
self._active_requests: dict[int, tuple[CommandId, TaskId]] = {}
|
||||
self._pending_inserts: list[
|
||||
tuple[CommandId, TaskId, ChatCompletionTaskParams]
|
||||
] = []
|
||||
self._uid_counter = 0
|
||||
self.rank = 0 # Fake rank for testing
|
||||
|
||||
def queue_request(
|
||||
self,
|
||||
command_id: CommandId,
|
||||
task_id: TaskId,
|
||||
task_params: ChatCompletionTaskParams,
|
||||
) -> None:
|
||||
"""Queue a request for insertion."""
|
||||
self._pending_inserts.append((command_id, task_id, task_params))
|
||||
|
||||
def sync_and_insert_pending(self) -> list[int]:
|
||||
"""Insert all pending requests."""
|
||||
uids: list[int] = []
|
||||
for command_id, task_id, _task_params in self._pending_inserts:
|
||||
uid = self._uid_counter
|
||||
self._uid_counter += 1
|
||||
self._active_requests[uid] = (command_id, task_id)
|
||||
uids.append(uid)
|
||||
self._pending_inserts.clear()
|
||||
return uids
|
||||
|
||||
@property
|
||||
def has_pending_inserts(self) -> bool:
|
||||
return len(self._pending_inserts) > 0
|
||||
|
||||
def step(self) -> list[BatchedGenerationResponse]:
|
||||
results: list[BatchedGenerationResponse] = []
|
||||
# Process all active requests - return one token and complete
|
||||
for uid, (command_id, task_id) in list(self._active_requests.items()):
|
||||
results.append(
|
||||
BatchedGenerationResponse(
|
||||
command_id=command_id,
|
||||
task_id=task_id,
|
||||
response=GenerationResponse(
|
||||
token=0,
|
||||
text="hi",
|
||||
finish_reason="stop",
|
||||
),
|
||||
)
|
||||
)
|
||||
del self._active_requests[uid]
|
||||
return results
|
||||
|
||||
@property
|
||||
def has_active_requests(self) -> bool:
|
||||
return len(self._active_requests) > 0
|
||||
|
||||
@property
|
||||
def active_count(self) -> int:
|
||||
return len(self._active_requests)
|
||||
|
||||
@property
|
||||
def pending_insert_count(self) -> int:
|
||||
return len(self._pending_inserts)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
|
||||
# initialize_mlx returns a fake "group" (non-None for state machine)
|
||||
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(MagicMock()))
|
||||
monkeypatch.setattr(
|
||||
mlx_runner, "load_mlx_items", make_nothin((MagicMock(), MagicMock()))
|
||||
)
|
||||
# initialize_mlx returns a "group" equal to 1
|
||||
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(1))
|
||||
monkeypatch.setattr(mlx_runner, "load_mlx_items", make_nothin((1, 1)))
|
||||
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
|
||||
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
|
||||
monkeypatch.setattr(mlx_runner, "BatchGenerationEngine", FakeBatchEngine)
|
||||
|
||||
def fake_generate(*_1: object, **_2: object):
|
||||
yield GenerationResponse(token=0, text="hi", finish_reason="stop")
|
||||
|
||||
monkeypatch.setattr(mlx_runner, "mlx_generate", fake_generate)
|
||||
|
||||
|
||||
def _run(tasks: Iterable[Task]):
|
||||
@@ -228,8 +148,7 @@ def _run(tasks: Iterable[Task]):
|
||||
return event_receiver.collect()
|
||||
|
||||
|
||||
def test_chat_completion_generates_and_completes(patch_out_mlx: pytest.MonkeyPatch):
|
||||
"""Verify chat completion generates tokens, completes, and runner returns to Ready."""
|
||||
def test_events_processed_in_correct_order(patch_out_mlx: pytest.MonkeyPatch):
|
||||
events = _run([INIT_TASK, LOAD_TASK, WARMUP_TASK, CHAT_TASK, SHUTDOWN_TASK])
|
||||
|
||||
expected_chunk = ChunkGenerated(
|
||||
@@ -272,9 +191,7 @@ def test_chat_completion_generates_and_completes(patch_out_mlx: pytest.MonkeyPat
|
||||
task_id=CHAT_COMPLETION_TASK_ID, task_status=TaskStatus.Running
|
||||
),
|
||||
TaskAcknowledged(task_id=CHAT_COMPLETION_TASK_ID),
|
||||
RunnerStatusUpdated(
|
||||
runner_id=RUNNER_1_ID, runner_status=RunnerRunning(active_requests=1)
|
||||
),
|
||||
RunnerStatusUpdated(runner_id=RUNNER_1_ID, runner_status=RunnerRunning()),
|
||||
expected_chunk,
|
||||
TaskStatusUpdated(
|
||||
task_id=CHAT_COMPLETION_TASK_ID, task_status=TaskStatus.Complete
|
||||
@@ -289,6 +206,7 @@ def test_chat_completion_generates_and_completes(patch_out_mlx: pytest.MonkeyPat
|
||||
TaskStatusUpdated(
|
||||
task_id=SHUTDOWN_TASK_ID, task_status=TaskStatus.Complete
|
||||
),
|
||||
# SPECIAL EXCEPTION FOR RUNNER SHUTDOWN
|
||||
RunnerStatusUpdated(runner_id=RUNNER_1_ID, runner_status=RunnerShutdown()),
|
||||
],
|
||||
)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import http.client
|
||||
import time
|
||||
|
||||
from anyio import create_task_group, to_thread
|
||||
from loguru import logger
|
||||
@@ -7,8 +6,6 @@ from loguru import logger
|
||||
from exo.shared.topology import Topology
|
||||
from exo.shared.types.common import NodeId
|
||||
|
||||
BAD_STATUSLINE_ATTEMPTS = 3
|
||||
|
||||
|
||||
async def check_reachability(
|
||||
target_ip: str,
|
||||
@@ -18,9 +15,8 @@ async def check_reachability(
|
||||
) -> None:
|
||||
"""Check if a node is reachable at the given IP and verify its identity."""
|
||||
|
||||
# TODO: use an async http client
|
||||
def _fetch_remote_node_id(*, attempt: int = 1) -> NodeId | None:
|
||||
connection = http.client.HTTPConnection(target_ip, 52415, timeout=3)
|
||||
def _fetch_remote_node_id() -> NodeId | None:
|
||||
connection = http.client.HTTPConnection(target_ip, 52415, timeout=1)
|
||||
try:
|
||||
connection.request("GET", "/node_id")
|
||||
response = connection.getresponse()
|
||||
@@ -36,16 +32,7 @@ async def check_reachability(
|
||||
return NodeId(body) or None
|
||||
except OSError:
|
||||
return None
|
||||
except http.client.BadStatusLine:
|
||||
if attempt >= BAD_STATUSLINE_ATTEMPTS:
|
||||
logger.warning(
|
||||
f"BadStatusLine from {target_ip}, after {attempt} attempts, assuming connection to {expected_node_id} has dropped"
|
||||
)
|
||||
return None
|
||||
time.sleep(1)
|
||||
return _fetch_remote_node_id(attempt=attempt + 1)
|
||||
except http.client.HTTPException as e:
|
||||
logger.warning(f"HTTPException from {target_ip}: {type(e).__name__}: {e}")
|
||||
except http.client.HTTPException:
|
||||
return None
|
||||
finally:
|
||||
connection.close()
|
||||
|
||||
Reference in New Issue
Block a user