Compare commits

..

1 Commits

Author SHA1 Message Date
Evan
b694552402 remove timing out in all but topology 2026-02-06 13:00:28 +00:00
74 changed files with 1391 additions and 3546 deletions

6
.gitignore vendored
View File

@@ -32,9 +32,3 @@ dashboard/.svelte-kit/
# host config snapshots
hosts_*.json
.swp
# bench files
bench/**/*.json
# tmp
tmp/models

View File

@@ -1,7 +0,0 @@
"""
This type stub file was generated by pyright.
"""
from mflux.models.flux.variants.kontext.flux_kontext import Flux1Kontext
__all__ = ["Flux1Kontext"]

View File

@@ -1,49 +0,0 @@
"""
This type stub file was generated by pyright.
"""
from pathlib import Path
from typing import Any
from mlx import nn
from mflux.models.common.config.model_config import ModelConfig
from mflux.models.flux.model.flux_text_encoder.clip_encoder.clip_encoder import (
CLIPEncoder,
)
from mflux.models.flux.model.flux_text_encoder.t5_encoder.t5_encoder import T5Encoder
from mflux.models.flux.model.flux_transformer.transformer import Transformer
from mflux.models.flux.model.flux_vae.vae import VAE
from mflux.utils.generated_image import GeneratedImage
class Flux1Kontext(nn.Module):
vae: VAE
transformer: Transformer
t5_text_encoder: T5Encoder
clip_text_encoder: CLIPEncoder
bits: int | None
lora_paths: list[str] | None
lora_scales: list[float] | None
prompt_cache: dict[str, Any]
tokenizers: dict[str, Any]
def __init__(
self,
quantize: int | None = ...,
model_path: str | None = ...,
lora_paths: list[str] | None = ...,
lora_scales: list[float] | None = ...,
model_config: ModelConfig = ...,
) -> None: ...
def generate_image(
self,
seed: int,
prompt: str,
num_inference_steps: int = ...,
height: int = ...,
width: int = ...,
guidance: float = ...,
image_path: Path | str | None = ...,
image_strength: float | None = ...,
scheduler: str = ...,
) -> GeneratedImage: ...

View File

@@ -1,16 +0,0 @@
"""
This type stub file was generated by pyright.
"""
import mlx.core as mx
from mflux.models.flux.model.flux_vae.vae import VAE
class KontextUtil:
@staticmethod
def create_image_conditioning_latents(
vae: VAE,
height: int,
width: int,
image_path: str,
) -> tuple[mx.array, mx.array]: ...

View File

@@ -1139,7 +1139,7 @@ class array:
) -> array:
"""See :func:`flatten`."""
def reshape(self, *shape: int, stream: Stream | Device | None = ...) -> array:
def reshape(self, *shape, stream: Stream | Device | None = ...) -> array:
"""
Equivalent to :func:`reshape` but the shape can be passed either as a
:obj:`tuple` or as separate arguments.
@@ -1222,7 +1222,7 @@ class array:
) -> array:
"""See :func:`swapaxes`."""
def transpose(self, *axes: int, stream: Stream | Device | None = ...) -> array:
def transpose(self, *axes, stream: Stream | Device | None = ...) -> array:
"""
Equivalent to :func:`transpose` but the axes can be passed either as
a tuple or as separate arguments.

View File

@@ -30,9 +30,6 @@ class Conv1d(Module):
bias (bool, optional): If ``True`` add a learnable bias to the output.
Default: ``True``
"""
weight: mx.array
groups: int
def __init__(
self,
in_channels: int,

View File

@@ -11,10 +11,7 @@ import mlx.core as mx
class Cache(Protocol):
keys: mx.array
values: mx.array
offset: int
def update_and_fetch(
self, keys: mx.array, values: mx.array
) -> tuple[mx.array, mx.array]: ...
def update_and_fetch(self, keys: mx.array, values: mx.array) -> None: ...
@property
def state(self) -> tuple[mx.array, mx.array]: ...
@state.setter
@@ -90,7 +87,6 @@ def create_attention_mask(
class _BaseCache(Cache):
keys: mx.array
values: mx.array
offset: int
@property
def state(self) -> tuple[mx.array, mx.array]: ...
@state.setter

View File

@@ -5,7 +5,6 @@ from typing import Any, Dict, Optional
import mlx.core as mx
import mlx.nn as nn
from mlx_lm.models.mla import MultiLinear
from .base import BaseModelArgs
from .switch_layers import SwitchGLU
@@ -61,10 +60,7 @@ class DeepseekV3Attention(nn.Module):
q_b_proj: nn.Linear
kv_a_proj_with_mqa: nn.Linear
kv_a_layernorm: nn.RMSNorm
# kv_b_proj: nn.Linear
embed_q: MultiLinear
unembed_out: MultiLinear
kv_b_proj: nn.Linear
o_proj: nn.Linear
rope: Any

View File

@@ -1,153 +0,0 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional
import mlx.core as mx
import mlx.nn as nn
from .base import BaseModelArgs
from .switch_layers import SwitchGLU
@dataclass
class ModelArgs(BaseModelArgs):
model_type: str
vocab_size: int
hidden_size: int
intermediate_size: int
max_position_embeddings: int
moe_intermediate_size: int
norm_topk_prob: bool
num_attention_heads: int
n_group: int
head_dim: int
topk_group: int
n_shared_experts: int
n_routed_experts: int
routed_scaling_factor: float
num_experts_per_tok: int
first_k_dense_replace: int
num_hidden_layers: int
num_key_value_heads: int
rms_norm_eps: float
rope_theta: float
rope_scaling: Optional[Dict[str, Any]]
use_qk_norm: bool
tie_word_embeddings: bool
attention_bias: bool
partial_rotary_factor: float
scoring_func: str
topk_method: str
class Attention(nn.Module):
n_heads: int
n_kv_heads: int
scale: float
q_proj: nn.Linear
k_proj: nn.Linear
v_proj: nn.Linear
o_proj: nn.Linear
use_qk_norm: bool
q_norm: nn.RMSNorm
k_norm: nn.RMSNorm
rope: nn.RoPE
def __init__(self, args: ModelArgs) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class MLP(nn.Module):
config: ModelArgs
hidden_size: int
intermediate_size: int
gate_proj: nn.Linear
up_proj: nn.Linear
down_proj: nn.Linear
def __init__(
self,
config: ModelArgs,
hidden_size: Optional[int] = None,
intermediate_size: Optional[int] = None,
) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class MoEGate(nn.Module):
config: ModelArgs
top_k: int
norm_topk_prob: bool
n_routed_experts: int
routed_scaling_factor: float
n_group: int
topk_group: int
weight: mx.array
e_score_correction_bias: mx.array
def __init__(self, config: ModelArgs) -> None: ...
def __call__(self, x: mx.array) -> tuple[mx.array, mx.array]: ...
class MoE(nn.Module):
config: ModelArgs
num_experts_per_tok: int
switch_mlp: SwitchGLU
gate: MoEGate
shared_experts: MLP
sharding_group: Optional[mx.distributed.Group]
def __init__(self, config: ModelArgs) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class DecoderLayer(nn.Module):
self_attn: Attention
mlp: MLP | MoE
input_layernorm: nn.RMSNorm
post_attention_layernorm: nn.RMSNorm
def __init__(self, config: ModelArgs, layer_idx: int) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class LanguageModel(nn.Module):
vocab_size: int
embed_tokens: nn.Embedding
layers: list[DecoderLayer]
norm: nn.RMSNorm
pipeline_rank: int
pipeline_size: int
start_idx: int
end_idx: Optional[int]
num_layers: int
def __init__(self, config: ModelArgs) -> None: ...
def __call__(
self,
x: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
@property
def pipeline_layers(self) -> list[DecoderLayer]: ...
class Model(nn.Module):
args: ModelArgs
model_type: str
model: LanguageModel
lm_head: nn.Linear
def __init__(self, config: ModelArgs) -> None: ...
def __call__(
self,
inputs: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
def sanitize(self, weights: dict[str, Any]) -> dict[str, Any]: ...
def shard(self, group: Optional[mx.distributed.Group] = None) -> None: ...
@property
def layers(self) -> list[DecoderLayer]: ...
@property
def cast_predicate(self) -> Any: ...

View File

@@ -1,114 +0,0 @@
"""Type stubs for mlx_lm.models.qwen3_next"""
from typing import Any, Optional
import mlx.core as mx
import mlx.nn as nn
from .switch_layers import SwitchGLU
class Qwen3NextMLP(nn.Module):
gate_proj: nn.Linear
down_proj: nn.Linear
up_proj: nn.Linear
def __init__(self, dim: int, hidden_dim: int) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class Qwen3NextGatedDeltaNet(nn.Module):
hidden_size: int
num_v_heads: int
num_k_heads: int
head_k_dim: int
head_v_dim: int
key_dim: int
value_dim: int
conv_kernel_size: int
conv_dim: int
conv1d: nn.Conv1d
in_proj_qkvz: nn.Linear
in_proj_ba: nn.Linear
dt_bias: mx.array
A_log: mx.array
out_proj: nn.Linear
def __init__(self, config: Any) -> None: ...
def __call__(
self,
inputs: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class Qwen3NextAttention(nn.Module):
num_attention_heads: int
num_key_value_heads: int
head_dim: int
scale: float
q_proj: nn.Linear
k_proj: nn.Linear
v_proj: nn.Linear
o_proj: nn.Linear
def __init__(self, args: Any) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class Qwen3NextSparseMoeBlock(nn.Module):
norm_topk_prob: bool
num_experts: int
top_k: int
gate: nn.Linear
switch_mlp: SwitchGLU
shared_expert: Qwen3NextMLP
shared_expert_gate: nn.Linear
def __init__(self, args: Any) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class Qwen3NextDecoderLayer(nn.Module):
is_linear: bool
linear_attn: Qwen3NextGatedDeltaNet
self_attn: Qwen3NextAttention
input_layernorm: nn.RMSNorm
post_attention_layernorm: nn.RMSNorm
mlp: Qwen3NextMLP | Qwen3NextSparseMoeBlock
def __init__(self, args: Any, layer_idx: int) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class Qwen3NextModel(nn.Module):
embed_tokens: nn.Embedding
layers: list[Qwen3NextDecoderLayer]
norm: nn.RMSNorm
def __init__(self, args: Any) -> None: ...
def __call__(
self,
inputs: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
class Model(nn.Module):
model_type: str
model: Qwen3NextModel
lm_head: nn.Linear
def __init__(self, args: Any) -> None: ...
def __call__(
self,
inputs: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
def sanitize(self, weights: dict[str, Any]) -> dict[str, Any]: ...
@property
def layers(self) -> list[Qwen3NextDecoderLayer]: ...

View File

@@ -113,10 +113,6 @@ class TokenizerWrapper:
bos_token: str | None
vocab_size: int
all_special_tokens: list[str]
think_start: str | None
think_end: str | None
think_start_id: int | None
think_end_id: int | None
def __init__(
self,

View File

@@ -431,12 +431,7 @@ def main() -> int:
ap.add_argument(
"--skip-pipeline-jaccl",
action="store_true",
help="Skip pipeline+jaccl placements, as it's often pointless.",
)
ap.add_argument(
"--skip-tensor-ring",
action="store_true",
help="Skip tensor+ring placements, as it's so slow.",
help="Pipeline jaccl is often pointless, skip by default",
)
ap.add_argument(
"--repeat", type=int, default=1, help="Repetitions per (pp,tg) pair."
@@ -455,7 +450,6 @@ def main() -> int:
default="bench/results.json",
help="Write raw per-run results JSON to this path.",
)
ap.add_argument("--stdout", action="store_true", help="Write results to stdout")
ap.add_argument(
"--dry-run", action="store_true", help="List selected placements and exit."
)
@@ -539,16 +533,6 @@ def main() -> int:
):
continue
if (
args.skip_tensor_ring
and (
args.instance_meta == "both"
and "ring" in p.get("instance_meta", "").lower()
)
and (args.sharding == "both" and "tensor" in p.get("sharding", "").lower())
):
continue
if args.min_nodes <= n <= args.max_nodes:
selected.append(p)
@@ -668,9 +652,7 @@ def main() -> int:
time.sleep(5)
if args.stdout:
json.dump(all_rows, sys.stdout, indent=2, ensure_ascii=False)
elif args.json_out:
if args.json_out:
with open(args.json_out, "w", encoding="utf-8") as f:
json.dump(all_rows, f, indent=2, ensure_ascii=False)
logger.debug(f"\nWrote results JSON: {args.json_out}")

View File

@@ -1,16 +0,0 @@
[project]
name = "exo-bench"
version = "0.1.0"
description = "Benchmarking tool for exo distributed inference"
requires-python = ">=3.13"
dependencies = [
"loguru>=0.7.3",
"transformers>=5.0.0",
"huggingface-hub>=0.33.4",
"tiktoken>=0.12.0",
"jinja2>=3.1.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

View File

@@ -58,18 +58,6 @@
d="M17.962 1.844h-4.326l-3.425 7.81H5.369V1.878H1.5V22h3.87v-8.477h6.824a3.025 3.025 0 002.743-1.75V22h3.87v-8.477a3.87 3.87 0 00-3.588-3.86v-.01h-2.125a3.94 3.94 0 002.323-2.12l2.545-5.689z"
/>
</svg>
{:else if family === "flux"}
<svg class="w-6 h-6 {className}" viewBox="0 0 24 24" fill="currentColor">
<path
d="M12 2L2 19h7.5l2.5-4.5L14.5 19H22L12 2zm0 4.5L16.5 17h-3l-1.5-2.7L10.5 17h-3L12 6.5z"
/>
</svg>
{:else if family === "qwen-image"}
<svg class="w-6 h-6 {className}" viewBox="0 0 24 24" fill="currentColor">
<path
d="M12.604 1.34c.393.69.784 1.382 1.174 2.075a.18.18 0 00.157.091h5.552c.174 0 .322.11.446.327l1.454 2.57c.19.337.24.478.024.837-.26.43-.513.864-.76 1.3l-.367.658c-.106.196-.223.28-.04.512l2.652 4.637c.172.301.111.494-.043.77-.437.785-.882 1.564-1.335 2.34-.159.272-.352.375-.68.37-.777-.016-1.552-.01-2.327.016a.099.099 0 00-.081.05 575.097 575.097 0 01-2.705 4.74c-.169.293-.38.363-.725.364-.997.003-2.002.004-3.017.002a.537.537 0 01-.465-.271l-1.335-2.323a.09.09 0 00-.083-.049H4.982c-.285.03-.553-.001-.805-.092l-1.603-2.77a.543.543 0 01-.002-.54l1.207-2.12a.198.198 0 000-.197 550.951 550.951 0 01-1.875-3.272l-.79-1.395c-.16-.31-.173-.496.095-.965.465-.813.927-1.625 1.387-2.436.132-.234.304-.334.584-.335a338.3 338.3 0 012.589-.001.124.124 0 00.107-.063l2.806-4.895a.488.488 0 01.422-.246c.524-.001 1.053 0 1.583-.006L11.704 1c.341-.003.724.032.9.34zm-3.432.403a.06.06 0 00-.052.03L6.254 6.788a.157.157 0 01-.135.078H3.253c-.056 0-.07.025-.041.074l5.81 10.156c.025.042.013.062-.034.063l-2.795.015a.218.218 0 00-.2.116l-1.32 2.31c-.044.078-.021.118.068.118l5.716.008c.046 0 .08.02.104.061l1.403 2.454c.046.081.092.082.139 0l5.006-8.76.783-1.382a.055.055 0 01.096 0l1.424 2.53a.122.122 0 00.107.062l2.763-.02a.04.04 0 00.035-.02.041.041 0 000-.04l-2.9-5.086a.108.108 0 010-.113l.293-.507 1.12-1.977c.024-.041.012-.062-.035-.062H9.2c-.059 0-.073-.026-.043-.077l1.434-2.505a.107.107 0 000-.114L9.225 1.774a.06.06 0 00-.053-.031zm6.29 8.02c.046 0 .058.02.034.06l-.832 1.465-2.613 4.585a.056.056 0 01-.05.029.058.058 0 01-.05-.029L8.498 9.841c-.02-.034-.01-.052.028-.054l.216-.012 6.722-.012z"
/>
</svg>
{:else if family === "huggingface"}
<svg class="w-6 h-6 {className}" viewBox="0 0 24 24" fill="currentColor">
<path

View File

@@ -22,8 +22,6 @@
glm: "GLM",
minimax: "MiniMax",
kimi: "Kimi",
flux: "FLUX",
"qwen-image": "Qwen Img",
};
function getFamilyName(family: string): string {
@@ -34,7 +32,7 @@
</script>
<div
class="flex flex-col gap-1 py-2 px-1 border-r border-exo-yellow/10 bg-exo-medium-gray/30 min-w-[64px] overflow-y-auto scrollbar-hide"
class="flex flex-col gap-1 py-2 px-1 border-r border-exo-yellow/10 bg-exo-medium-gray/30 min-w-[64px]"
>
<!-- All models (no filter) -->
<button

View File

@@ -64,8 +64,6 @@
"1024x1024",
"1024x768",
"768x1024",
"1024x1365",
"1365x1024",
];
const qualityOptions: ImageGenerationParams["quality"][] = [
@@ -150,15 +148,6 @@
setImageGenerationParams({ guidance: null });
}
function handleNumSyncStepsChange(event: Event) {
const value = parseInt((event.target as HTMLInputElement).value, 10);
setImageGenerationParams({ numSyncSteps: value });
}
function clearNumSyncSteps() {
setImageGenerationParams({ numSyncSteps: null });
}
function handleReset() {
resetImageGenerationParams();
showAdvanced = false;
@@ -168,8 +157,7 @@
params.seed !== null ||
params.numInferenceSteps !== null ||
params.guidance !== null ||
(params.negativePrompt !== null && params.negativePrompt.trim() !== "") ||
params.numSyncSteps !== null,
(params.negativePrompt !== null && params.negativePrompt.trim() !== ""),
);
</script>
@@ -590,50 +578,7 @@
</div>
</div>
<!-- Row 3: Sync Steps -->
<div class="flex items-center gap-1.5">
<span
class="text-xs text-exo-light-gray uppercase tracking-wider whitespace-nowrap"
>SYNC STEPS:</span
>
<div class="flex items-center gap-2 flex-1 max-w-xs">
<input
type="range"
min="1"
max="100"
value={params.numSyncSteps ?? 1}
oninput={handleNumSyncStepsChange}
class="flex-1 h-1 bg-exo-medium-gray/50 rounded appearance-none cursor-pointer accent-exo-yellow"
/>
<span class="text-xs font-mono text-exo-yellow w-8 text-right">
{params.numSyncSteps ?? "--"}
</span>
{#if params.numSyncSteps !== null}
<button
type="button"
onclick={clearNumSyncSteps}
class="text-exo-light-gray hover:text-exo-yellow transition-colors"
title="Clear"
>
<svg
class="w-3 h-3"
fill="none"
viewBox="0 0 24 24"
stroke="currentColor"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M6 18L18 6M6 6l12 12"
/>
</svg>
</button>
{/if}
</div>
</div>
<!-- Row 4: Negative Prompt -->
<!-- Row 3: Negative Prompt -->
<div class="flex flex-col gap-1.5">
<span class="text-xs text-exo-light-gray uppercase tracking-wider"
>NEGATIVE PROMPT:</span

View File

@@ -24,8 +24,6 @@
{ id: "thinking", label: "Thinking" },
{ id: "code", label: "Code" },
{ id: "vision", label: "Vision" },
{ id: "image_gen", label: "Image Gen" },
{ id: "image_edit", label: "Image Edit" },
];
// Size ranges
@@ -144,45 +142,6 @@
stroke-linejoin="round"
/><circle cx="12" cy="12" r="3" /></svg
>
{:else if cap.id === "image_gen"}
<svg
class="w-3.5 h-3.5 inline-block"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
><rect
x="3"
y="3"
width="18"
height="18"
rx="2"
ry="2"
stroke-linecap="round"
stroke-linejoin="round"
/><circle cx="8.5" cy="8.5" r="1.5" /><path
d="M21 15l-5-5L5 21"
stroke-linecap="round"
stroke-linejoin="round"
/></svg
>
{:else if cap.id === "image_edit"}
<svg
class="w-3.5 h-3.5 inline-block"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
><path
d="M11 4H4a2 2 0 0 0-2 2v14a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2v-7"
stroke-linecap="round"
stroke-linejoin="round"
/><path
d="M18.5 2.5a2.121 2.121 0 0 1 3 3L12 15l-4 1 1-4 9.5-9.5z"
stroke-linecap="round"
stroke-linejoin="round"
/></svg
>
{/if}
<span class="ml-1">{cap.label}</span>
</button>

View File

@@ -200,42 +200,9 @@
stroke-width="1.5"
title="Supports image generation"
>
<rect
x="3"
y="3"
width="18"
height="18"
rx="2"
ry="2"
stroke-linecap="round"
stroke-linejoin="round"
/>
<rect x="3" y="3" width="18" height="18" rx="2" ry="2" />
<circle cx="8.5" cy="8.5" r="1.5" />
<path
d="M21 15l-5-5L5 21"
stroke-linecap="round"
stroke-linejoin="round"
/>
</svg>
{:else if cap === "image_edit"}
<svg
class="w-3.5 h-3.5 text-white/40 flex-shrink-0"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
title="Supports image editing"
>
<path
d="M11 4H4a2 2 0 0 0-2 2v14a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2v-7"
stroke-linecap="round"
stroke-linejoin="round"
/>
<path
d="M18.5 2.5a2.121 2.121 0 0 1 3 3L12 15l-4 1 1-4 9.5-9.5z"
stroke-linecap="round"
stroke-linejoin="round"
/>
<path d="M21 15l-5-5L5 21" />
</svg>
{/if}
{/each}

View File

@@ -363,8 +363,6 @@
"deepseek",
"gpt-oss",
"llama",
"flux",
"qwen-image",
];
return Array.from(families).sort((a, b) => {
const aIdx = familyOrder.indexOf(a);
@@ -608,7 +606,7 @@
/>
<!-- Model list -->
<div class="flex-1 overflow-y-auto scrollbar-hide flex flex-col">
<div class="flex-1 overflow-y-auto flex flex-col">
{#if selectedFamily === "huggingface"}
<!-- HuggingFace Hub view -->
<div class="flex-1 flex flex-col min-h-0">
@@ -626,7 +624,7 @@
</div>
<!-- Results list -->
<div class="flex-1 overflow-y-auto scrollbar-hide">
<div class="flex-1 overflow-y-auto">
{#if hfIsLoadingTrending && hfTrendingModels.length === 0}
<div
class="flex items-center justify-center py-12 text-white/40"

View File

@@ -286,14 +286,7 @@ const IMAGE_PARAMS_STORAGE_KEY = "exo-image-generation-params";
// Image generation params interface matching backend API
export interface ImageGenerationParams {
// Basic params
size:
| "512x512"
| "768x768"
| "1024x1024"
| "1024x768"
| "768x1024"
| "1024x1365"
| "1365x1024";
size: "512x512" | "768x768" | "1024x1024" | "1024x768" | "768x1024";
quality: "low" | "medium" | "high";
outputFormat: "png" | "jpeg";
numImages: number;
@@ -305,7 +298,6 @@ export interface ImageGenerationParams {
numInferenceSteps: number | null;
guidance: number | null;
negativePrompt: string | null;
numSyncSteps: number | null;
// Edit mode params
inputFidelity: "low" | "high";
}
@@ -327,7 +319,6 @@ const DEFAULT_IMAGE_PARAMS: ImageGenerationParams = {
numInferenceSteps: null,
guidance: null,
negativePrompt: null,
numSyncSteps: null,
inputFidelity: "low",
};
@@ -2405,9 +2396,7 @@ class AppStore {
params.seed !== null ||
params.numInferenceSteps !== null ||
params.guidance !== null ||
(params.negativePrompt !== null &&
params.negativePrompt.trim() !== "") ||
params.numSyncSteps !== null;
(params.negativePrompt !== null && params.negativePrompt.trim() !== "");
const requestBody: Record<string, unknown> = {
model,
@@ -2432,9 +2421,6 @@ class AppStore {
params.negativePrompt.trim() !== "" && {
negative_prompt: params.negativePrompt,
}),
...(params.numSyncSteps !== null && {
num_sync_steps: params.numSyncSteps,
}),
};
}
@@ -2684,19 +2670,11 @@ class AppStore {
formData.append("input_fidelity", params.inputFidelity);
// Advanced params
const hasAdvancedParams =
params.seed !== null ||
params.numInferenceSteps !== null ||
params.guidance !== null ||
(params.negativePrompt !== null &&
params.negativePrompt.trim() !== "") ||
params.numSyncSteps !== null;
if (hasAdvancedParams) {
if (params.seed !== null) {
formData.append(
"advanced_params",
JSON.stringify({
...(params.seed !== null && { seed: params.seed }),
seed: params.seed,
...(params.numInferenceSteps !== null && {
num_inference_steps: params.numInferenceSteps,
}),
@@ -2705,9 +2683,24 @@ class AppStore {
params.negativePrompt.trim() !== "" && {
negative_prompt: params.negativePrompt,
}),
...(params.numSyncSteps !== null && {
num_sync_steps: params.numSyncSteps,
}),
);
} else if (
params.numInferenceSteps !== null ||
params.guidance !== null ||
(params.negativePrompt !== null && params.negativePrompt.trim() !== "")
) {
formData.append(
"advanced_params",
JSON.stringify({
...(params.numInferenceSteps !== null && {
num_inference_steps: params.numInferenceSteps,
}),
...(params.guidance !== null && { guidance: params.guidance }),
...(params.negativePrompt !== null &&
params.negativePrompt.trim() !== "" && {
negative_prompt: params.negativePrompt,
}),
}),
);
}

View File

@@ -41,7 +41,7 @@ let
mlx = stdenv.mkDerivation rec {
pname = "mlx";
version = let v = "0.30.5"; in
version = let v = "0.30.4"; in
assert v == uvLockMlxVersion || throw "MLX version mismatch: nix/mlx.nix has ${v} but uv.lock has ${uvLockMlxVersion}. Update both the version and hash in nix/mlx.nix.";
v;
pyproject = true;
@@ -50,7 +50,7 @@ let
owner = "ml-explore";
repo = "mlx";
tag = "v${version}";
hash = "sha256-SV/3MXt+SuJ69XfLfXycold6KgtXSM7OE0KwMSNw+eE=";
hash = "sha256-OJk6jPlbaSlsUdk3ADz3tWcRzTWXRof3/q8Soe1AO6w=";
};
patches = [

View File

@@ -17,16 +17,16 @@ dependencies = [
"loguru>=0.7.3",
"exo_pyo3_bindings", # rust bindings
"anyio==4.11.0",
"mlx==0.30.5; sys_platform == 'darwin'",
"mlx[cpu]==0.30.5; sys_platform == 'linux'",
"mlx-lm==0.30.6",
"mlx==0.30.4; sys_platform == 'darwin'",
"mlx[cpu]==0.30.4; sys_platform == 'linux'",
"mlx-lm",
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
"hypercorn>=0.18.0",
"openai-harmony>=0.0.8",
"httpx>=0.28.1",
"tomlkit>=0.14.0",
"pillow>=11.0,<12.0", # compatibility with mflux
"mflux==0.15.5",
"mflux==0.15.4",
"python-multipart>=0.0.21",
]
@@ -57,12 +57,11 @@ dev = [
[tool.uv.workspace]
members = [
"rust/exo_pyo3_bindings",
"bench",
]
[tool.uv.sources]
exo_pyo3_bindings = { workspace = true }
#mlx-lm = { git = "https://github.com/davidmcc73/mlx-lm", branch = "stable" }
mlx-lm = { git = "https://github.com/ml-explore/mlx-lm", branch = "main" }
# Uncomment to use local mlx/mlx-lm development versions:
# mlx = { path = "/Users/Shared/mlx", editable=true }
# mlx-lm = { path = "/Users/Shared/mlx-lm", editable=true }
@@ -104,7 +103,6 @@ root = "src"
# supported platforms for this project
[tool.uv]
required-version = ">=0.8.6"
prerelease = "allow"
environments = [
"sys_platform == 'darwin'",

View File

@@ -66,23 +66,7 @@
EXO_DASHBOARD_DIR = self'.packages.dashboard;
EXO_RESOURCES_DIR = inputs.self + /resources;
};
text = ''exec python ${path} "$@"'';
};
benchVenv = pythonSet.mkVirtualEnv "exo-bench-env" {
exo-bench = [ ];
};
mkBenchScript = name: path: pkgs.writeShellApplication {
inherit name;
runtimeInputs = [ benchVenv ];
text = ''exec python ${path} "$@"'';
};
mkSimplePythonScript = name: path: pkgs.writeShellApplication {
inherit name;
runtimeInputs = [ pkgs.python313 ];
text = ''exec python ${path} "$@"'';
text = ''python ${path}'';
};
exoPackage = pkgs.runCommand "exo"
@@ -101,14 +85,12 @@
in
{
# Python package only available on macOS (requires MLX/Metal)
packages = lib.optionalAttrs pkgs.stdenv.hostPlatform.isDarwin
{
exo = exoPackage;
# Test environment for running pytest outside of Nix sandbox (needs GPU access)
exo-test-env = testVenv;
} // {
exo-bench = mkBenchScript "exo-bench" (inputs.self + /bench/exo_bench.py);
exo-get-all-models-on-cluster = mkSimplePythonScript "exo-get-all-models-on-cluster" (inputs.self + /tests/get_all_models_on_cluster.py);
packages = lib.optionalAttrs pkgs.stdenv.hostPlatform.isDarwin {
exo = exoPackage;
# Test environment for running pytest outside of Nix sandbox (needs GPU access)
exo-test-env = testVenv;
exo-bench = mkPythonScript "exo-bench" (inputs.self + /bench/exo_bench.py);
exo-distributed-test = mkPythonScript "exo-distributed-test" (inputs.self + /tests/headless_runner.py);
};
checks = {

View File

@@ -1,49 +0,0 @@
model_id = "exolabs/FLUX.1-Kontext-dev-4bit"
n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
family = "flux"
quantization = "4bit"
base_model = "FLUX.1 Kontext"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 15475325472
[[components]]
component_name = "text_encoder"
component_path = "text_encoder/"
n_layers = 12
can_shard = false
[components.storage_size]
in_bytes = 0
[[components]]
component_name = "text_encoder_2"
component_path = "text_encoder_2/"
n_layers = 24
can_shard = false
safetensors_index_filename = "model.safetensors.index.json"
[components.storage_size]
in_bytes = 9524621312
[[components]]
component_name = "transformer"
component_path = "transformer/"
n_layers = 57
can_shard = true
safetensors_index_filename = "diffusion_pytorch_model.safetensors.index.json"
[components.storage_size]
in_bytes = 5950704160
[[components]]
component_name = "vae"
component_path = "vae/"
can_shard = false
[components.storage_size]
in_bytes = 0

View File

@@ -1,49 +0,0 @@
model_id = "exolabs/FLUX.1-Kontext-dev-8bit"
n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
family = "flux"
quantization = "8bit"
base_model = "FLUX.1 Kontext"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 21426029632
[[components]]
component_name = "text_encoder"
component_path = "text_encoder/"
n_layers = 12
can_shard = false
[components.storage_size]
in_bytes = 0
[[components]]
component_name = "text_encoder_2"
component_path = "text_encoder_2/"
n_layers = 24
can_shard = false
safetensors_index_filename = "model.safetensors.index.json"
[components.storage_size]
in_bytes = 9524621312
[[components]]
component_name = "transformer"
component_path = "transformer/"
n_layers = 57
can_shard = true
safetensors_index_filename = "diffusion_pytorch_model.safetensors.index.json"
[components.storage_size]
in_bytes = 11901408320
[[components]]
component_name = "vae"
component_path = "vae/"
can_shard = false
[components.storage_size]
in_bytes = 0

View File

@@ -1,49 +0,0 @@
model_id = "exolabs/FLUX.1-Kontext-dev"
n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
family = "flux"
quantization = ""
base_model = "FLUX.1 Kontext"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 33327437952
[[components]]
component_name = "text_encoder"
component_path = "text_encoder/"
n_layers = 12
can_shard = false
[components.storage_size]
in_bytes = 0
[[components]]
component_name = "text_encoder_2"
component_path = "text_encoder_2/"
n_layers = 24
can_shard = false
safetensors_index_filename = "model.safetensors.index.json"
[components.storage_size]
in_bytes = 9524621312
[[components]]
component_name = "transformer"
component_path = "transformer/"
n_layers = 57
can_shard = true
safetensors_index_filename = "diffusion_pytorch_model.safetensors.index.json"
[components.storage_size]
in_bytes = 23802816640
[[components]]
component_name = "vae"
component_path = "vae/"
can_shard = false
[components.storage_size]
in_bytes = 0

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "4bit"
base_model = "FLUX.1 Krea"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 15475325472

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "8bit"
base_model = "FLUX.1 Krea"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 21426029632

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = ""
base_model = "FLUX.1 Krea"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 33327437952

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "4bit"
base_model = "FLUX.1 Dev"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 15475325472

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "8bit"
base_model = "FLUX.1 Dev"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 21426029632

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = ""
base_model = "FLUX.1 Dev"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 33327437952

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "4bit"
base_model = "FLUX.1 Schnell"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 15470210592

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = "8bit"
base_model = "FLUX.1 Schnell"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 21415799872

View File

@@ -3,10 +3,6 @@ n_layers = 57
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
family = "flux"
quantization = ""
base_model = "FLUX.1 Schnell"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 33306978432

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
family = "qwen-image"
quantization = "4bit"
base_model = "Qwen Image"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 26799533856

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
family = "qwen-image"
quantization = "8bit"
base_model = "Qwen Image"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 37014734400

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
family = "qwen-image"
quantization = "4bit"
base_model = "Qwen Image Edit"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 26799533856

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
family = "qwen-image"
quantization = "8bit"
base_model = "Qwen Image Edit"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 37014734400

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
family = "qwen-image"
quantization = ""
base_model = "Qwen Image Edit"
capabilities = ["image_edit"]
[storage_size]
in_bytes = 57445135488

View File

@@ -4,10 +4,6 @@ hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
family = "qwen-image"
quantization = ""
base_model = "Qwen Image"
capabilities = ["image_gen"]
[storage_size]
in_bytes = 57445135488

View File

@@ -158,78 +158,6 @@ async def seed_models(seed_dir: str | Path):
logger.error(traceback.format_exc())
async def _build_file_list_from_local_directory(
model_id: ModelId,
recursive: bool = False,
) -> list[FileListEntry] | None:
"""Build a file list from locally existing model files.
We can only figure out the files we need from safetensors index, so
a local directory must contain a *.safetensors.index.json and
safetensors listed there.
"""
model_dir = (await ensure_models_dir()) / model_id.normalize()
if not await aios.path.exists(model_dir):
return None
def _scan() -> list[FileListEntry] | None:
index_files = list(model_dir.glob("**/*.safetensors.index.json"))
if not index_files:
return None
entries_by_path: dict[str, FileListEntry] = {}
if recursive:
for dirpath, _, filenames in os.walk(model_dir):
for filename in filenames:
if filename.endswith(".partial"):
continue
full_path = Path(dirpath) / filename
rel_path = str(full_path.relative_to(model_dir))
entries_by_path[rel_path] = FileListEntry(
type="file",
path=rel_path,
size=full_path.stat().st_size,
)
else:
for item in model_dir.iterdir():
if item.is_file() and not item.name.endswith(".partial"):
entries_by_path[item.name] = FileListEntry(
type="file",
path=item.name,
size=item.stat().st_size,
)
# Add expected weight files from index that haven't been downloaded yet
for index_file in index_files:
try:
index_data = ModelSafetensorsIndex.model_validate_json(
index_file.read_text()
)
relative_dir = index_file.parent.relative_to(model_dir)
for filename in set(index_data.weight_map.values()):
rel_path = (
str(relative_dir / filename)
if relative_dir != Path(".")
else filename
)
if rel_path not in entries_by_path:
entries_by_path[rel_path] = FileListEntry(
type="file",
path=rel_path,
size=None,
)
except Exception:
continue
return list(entries_by_path.values())
file_list = await asyncio.to_thread(_scan)
if not file_list:
return None
return file_list
_fetched_file_lists_this_session: set[str] = set()
@@ -255,14 +183,6 @@ async def fetch_file_list_with_cache(
if await aios.path.exists(cache_file):
async with aiofiles.open(cache_file, "r") as f:
return TypeAdapter(list[FileListEntry]).validate_json(await f.read())
local_file_list = await _build_file_list_from_local_directory(
model_id, recursive
)
if local_file_list is not None:
logger.warning(
f"No internet and no cached file list for {model_id} - using local file list"
)
return local_file_list
raise FileNotFoundError(
f"No internet connection and no cached file list for {model_id}"
)
@@ -283,18 +203,10 @@ async def fetch_file_list_with_cache(
except Exception as e:
if await aios.path.exists(cache_file):
logger.warning(
f"No internet and no cached file list for {model_id} - using local file list"
f"Failed to fetch file list for {model_id}, using cached data: {e}"
)
async with aiofiles.open(cache_file, "r") as f:
return TypeAdapter(list[FileListEntry]).validate_json(await f.read())
local_file_list = await _build_file_list_from_local_directory(
model_id, recursive
)
if local_file_list is not None:
logger.warning(
f"Failed to fetch file list for {model_id} and no cache exists, "
)
return local_file_list
raise FileNotFoundError(f"Failed to fetch file list for {model_id}: {e}") from e

View File

@@ -195,10 +195,6 @@ class ResumableShardDownloader(ShardDownloader):
self, shard: ShardMetadata
) -> RepoDownloadProgress:
_, progress = await download_shard(
shard,
self.on_progress_wrapper,
skip_download=True,
skip_internet=not self.internet_connection,
on_connection_lost=lambda: self.set_internet_connection(False),
shard, self.on_progress_wrapper, skip_download=True
)
return progress

View File

@@ -3,6 +3,7 @@
import time
from collections.abc import AsyncGenerator
from typing import Any
from uuid import uuid4
from exo.shared.types.api import (
ChatCompletionChoice,
@@ -140,7 +141,7 @@ async def generate_chat_stream(
if isinstance(chunk, ToolCallChunk):
tool_call_deltas = [
ToolCall(
id=tool.id,
id=str(uuid4()),
index=i,
function=tool,
)
@@ -206,7 +207,7 @@ async def collect_chat_response(
if isinstance(chunk, ToolCallChunk):
tool_calls.extend(
ToolCall(
id=tool.id,
id=str(uuid4()),
index=i,
function=tool,
)

View File

@@ -3,6 +3,7 @@
import json
from collections.abc import AsyncGenerator
from typing import Any
from uuid import uuid4
from exo.shared.types.api import FinishReason
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
@@ -178,7 +179,7 @@ async def collect_claude_response(
for tool in chunk.tool_calls:
tool_use_blocks.append(
ClaudeToolUseBlock(
id=f"toolu_{tool.id}",
id=f"toolu_{uuid4().hex[:24]}",
name=tool.name,
input=json.loads(tool.arguments), # pyright: ignore[reportAny]
)
@@ -263,7 +264,7 @@ async def generate_claude_stream(
# Emit tool_use content blocks
for tool in chunk.tool_calls:
tool_id = f"toolu_{tool.id}"
tool_id = f"toolu_{uuid4().hex[:24]}"
tool_input_json = tool.arguments
# content_block_start for tool_use

View File

@@ -3,6 +3,7 @@
from collections.abc import AsyncGenerator
from itertools import count
from typing import Any
from uuid import uuid4
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId
@@ -139,8 +140,8 @@ async def collect_responses_response(
for tool in chunk.tool_calls:
function_call_items.append(
ResponseFunctionCallItem(
id=f"fc_{tool.id}",
call_id=f"call_{tool.id}",
id=f"fc_{uuid4().hex[:24]}",
call_id=f"call_{uuid4().hex[:24]}",
name=tool.name,
arguments=tool.arguments,
)
@@ -245,8 +246,8 @@ async def generate_responses_stream(
if isinstance(chunk, ToolCallChunk):
last_stats = chunk.stats or last_stats
for tool in chunk.tool_calls:
fc_id = f"fc_{tool.id}"
call_id = f"call_{tool.id}"
fc_id = f"fc_{uuid4().hex[:24]}"
call_id = f"call_{uuid4().hex[:24]}"
# response.output_item.added for function_call
fc_item = ResponseFunctionCallItem(

View File

@@ -332,11 +332,7 @@ class Master:
]
case RequestEventLog():
# We should just be able to send everything, since other buffers will ignore old messages
# rate limit to 1000 at a time
for i in range(
command.since_idx,
min(command.since_idx + 1000, len(self._event_log)),
):
for i in range(command.since_idx, len(self._event_log)):
await self._send_event(
IndexedEvent(idx=i, event=self._event_log[i])
)

View File

@@ -208,58 +208,19 @@ def apply_runner_deleted(event: RunnerDeleted, state: State) -> State:
def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
topology = copy.deepcopy(state.topology)
topology.remove_node(event.node_id)
last_seen = {
key: value for key, value in state.last_seen.items() if key != event.node_id
}
downloads = {
key: value for key, value in state.downloads.items() if key != event.node_id
}
# Clean up all granular node mappings
node_identities = {
key: value
for key, value in state.node_identities.items()
if key != event.node_id
}
node_memory = {
key: value for key, value in state.node_memory.items() if key != event.node_id
}
node_system = {
key: value for key, value in state.node_system.items() if key != event.node_id
}
node_network = {
key: value for key, value in state.node_network.items() if key != event.node_id
}
node_thunderbolt = {
key: value
for key, value in state.node_thunderbolt.items()
if key != event.node_id
}
node_thunderbolt_bridge = {
key: value
for key, value in state.node_thunderbolt_bridge.items()
if key != event.node_id
}
# Only recompute cycles if the leaving node had TB bridge enabled
leaving_node_status = state.node_thunderbolt_bridge.get(event.node_id)
leaving_node_had_tb_enabled = (
leaving_node_status is not None and leaving_node_status.enabled
)
thunderbolt_bridge_cycles = (
topology.get_thunderbolt_bridge_cycles(node_thunderbolt_bridge, node_network)
topology.get_thunderbolt_bridge_cycles(state.node_thunderbolt_bridge, state.node_network)
if leaving_node_had_tb_enabled
else [list(cycle) for cycle in state.thunderbolt_bridge_cycles]
)
return state.model_copy(
update={
"downloads": downloads,
"topology": topology,
"last_seen": last_seen,
"node_identities": node_identities,
"node_memory": node_memory,
"node_system": node_system,
"node_network": node_network,
"node_thunderbolt": node_thunderbolt,
"node_thunderbolt_bridge": node_thunderbolt_bridge,
"thunderbolt_bridge_cycles": thunderbolt_bridge_cycles,
}
)

View File

@@ -1,7 +1,6 @@
import time
from collections.abc import Generator
from typing import Annotated, Any, Literal
from uuid import uuid4
from pydantic import BaseModel, Field, field_validator
from pydantic_core import PydanticUseDefault
@@ -61,7 +60,6 @@ class ChatCompletionMessageText(BaseModel):
class ToolCallItem(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
name: str
arguments: str
@@ -274,7 +272,6 @@ class AdvancedImageParams(BaseModel):
num_inference_steps: Annotated[int, Field(ge=1, le=100)] | None = None
guidance: Annotated[float, Field(ge=1.0, le=20.0)] | None = None
negative_prompt: str | None = None
num_sync_steps: Annotated[int, Field(ge=1, le=100)] | None = None
class ImageGenerationTaskParams(BaseModel):

View File

@@ -3,11 +3,10 @@
from collections.abc import Sequence
from mlx_lm.models.cache import (
ArraysCache,
KVCache,
QuantizedKVCache,
RotatingKVCache,
)
# This list contains one cache entry per transformer layer
KVCacheType = Sequence[KVCache | RotatingKVCache | QuantizedKVCache | ArraysCache]
KVCacheType = Sequence[KVCache | RotatingKVCache | QuantizedKVCache]

View File

@@ -1,4 +1,5 @@
from enum import Enum
from math import ceil
from pydantic import BaseModel
@@ -22,7 +23,7 @@ class ImageModelConfig(BaseModel):
block_configs: tuple[TransformerBlockConfig, ...]
default_steps: dict[str, int] # {"low": X, "medium": Y, "high": Z}
num_sync_steps: int # Number of sync steps for distributed inference
num_sync_steps_factor: float # Fraction of steps for sync phase
guidance_scale: float | None = None # None or <= 1.0 disables CFG
@@ -44,3 +45,6 @@ class ImageModelConfig(BaseModel):
def get_steps_for_quality(self, quality: str) -> int:
return self.default_steps[quality]
def get_num_sync_steps(self, steps: int) -> int:
return ceil(steps * self.num_sync_steps_factor)

View File

@@ -150,10 +150,7 @@ class DistributedImageModel:
guidance=guidance_override if guidance_override is not None else 4.0,
)
if advanced_params is not None and advanced_params.num_sync_steps is not None:
num_sync_steps = advanced_params.num_sync_steps
else:
num_sync_steps = self._config.num_sync_steps
num_sync_steps = self._config.get_num_sync_steps(steps)
for result in self._runner.generate_image(
runtime_config=config,

View File

@@ -5,9 +5,7 @@ from exo.worker.engines.image.config import ImageModelConfig
from exo.worker.engines.image.models.base import ModelAdapter
from exo.worker.engines.image.models.flux import (
FLUX_DEV_CONFIG,
FLUX_KONTEXT_CONFIG,
FLUX_SCHNELL_CONFIG,
FluxKontextModelAdapter,
FluxModelAdapter,
)
from exo.worker.engines.image.models.qwen import (
@@ -28,16 +26,13 @@ AdapterFactory = Callable[
# Registry maps model_family string to adapter factory
_ADAPTER_REGISTRY: dict[str, AdapterFactory] = {
"flux": FluxModelAdapter,
"flux-kontext": FluxKontextModelAdapter,
"qwen-edit": QwenEditModelAdapter,
"qwen": QwenModelAdapter,
}
# Config registry: maps model ID patterns to configs
# Order matters: longer/more-specific patterns must come before shorter ones
_CONFIG_REGISTRY: dict[str, ImageModelConfig] = {
"flux.1-schnell": FLUX_SCHNELL_CONFIG,
"flux.1-kontext": FLUX_KONTEXT_CONFIG, # Must come before "flux.1-dev" for pattern matching
"flux.1-krea-dev": FLUX_DEV_CONFIG, # Must come before "flux.1-dev" for pattern matching
"flux.1-dev": FLUX_DEV_CONFIG,
"qwen-image-edit": QWEN_IMAGE_EDIT_CONFIG, # Must come before "qwen-image" for pattern matching

View File

@@ -66,19 +66,6 @@ class PromptData(ABC):
"""
...
@property
@abstractmethod
def kontext_image_ids(self) -> mx.array | None:
"""Kontext-style position IDs for image conditioning.
For FLUX.1-Kontext models, returns position IDs with first_coord=1
to distinguish conditioning tokens from generation tokens (first_coord=0).
Returns:
Position IDs array [1, seq_len, 3] for Kontext, None for other models.
"""
...
@abstractmethod
def get_batched_cfg_data(
self,

View File

@@ -1,17 +1,11 @@
from exo.worker.engines.image.models.flux.adapter import FluxModelAdapter
from exo.worker.engines.image.models.flux.config import (
FLUX_DEV_CONFIG,
FLUX_KONTEXT_CONFIG,
FLUX_SCHNELL_CONFIG,
)
from exo.worker.engines.image.models.flux.kontext_adapter import (
FluxKontextModelAdapter,
)
__all__ = [
"FluxModelAdapter",
"FluxKontextModelAdapter",
"FLUX_DEV_CONFIG",
"FLUX_KONTEXT_CONFIG",
"FLUX_SCHNELL_CONFIG",
]

View File

@@ -59,10 +59,6 @@ class FluxPromptData(PromptData):
def conditioning_latents(self) -> mx.array | None:
return None
@property
def kontext_image_ids(self) -> mx.array | None:
return None
def get_batched_cfg_data(
self,
) -> tuple[mx.array, mx.array, mx.array | None, mx.array | None] | None:

View File

@@ -15,7 +15,7 @@ FLUX_SCHNELL_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 1, "medium": 2, "high": 4},
num_sync_steps=1,
num_sync_steps_factor=0.5, # 1 sync step for medium (2 steps)
)
@@ -30,21 +30,5 @@ FLUX_DEV_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps=4,
)
FLUX_KONTEXT_CONFIG = ImageModelConfig(
model_family="flux-kontext",
block_configs=(
TransformerBlockConfig(
block_type=BlockType.JOINT, count=19, has_separate_text_output=True
),
TransformerBlockConfig(
block_type=BlockType.SINGLE, count=38, has_separate_text_output=False
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps=4,
guidance_scale=4.0,
num_sync_steps_factor=0.125, # ~3 sync steps for medium (25 steps)
)

View File

@@ -1,348 +0,0 @@
import math
from pathlib import Path
from typing import Any, final
import mlx.core as mx
from mflux.models.common.config.config import Config
from mflux.models.common.config.model_config import ModelConfig
from mflux.models.flux.latent_creator.flux_latent_creator import FluxLatentCreator
from mflux.models.flux.model.flux_text_encoder.prompt_encoder import PromptEncoder
from mflux.models.flux.model.flux_transformer.transformer import Transformer
from mflux.models.flux.variants.kontext.flux_kontext import Flux1Kontext
from mflux.models.flux.variants.kontext.kontext_util import KontextUtil
from exo.worker.engines.image.config import ImageModelConfig
from exo.worker.engines.image.models.base import (
ModelAdapter,
PromptData,
RotaryEmbeddings,
)
from exo.worker.engines.image.models.flux.wrappers import (
FluxJointBlockWrapper,
FluxSingleBlockWrapper,
)
from exo.worker.engines.image.pipeline.block_wrapper import (
JointBlockWrapper,
SingleBlockWrapper,
)
@final
class FluxKontextPromptData(PromptData):
"""Prompt data for FLUX.1-Kontext image editing.
Stores text embeddings along with conditioning latents and position IDs
for the input image.
"""
def __init__(
self,
prompt_embeds: mx.array,
pooled_prompt_embeds: mx.array,
conditioning_latents: mx.array,
kontext_image_ids: mx.array,
):
self._prompt_embeds = prompt_embeds
self._pooled_prompt_embeds = pooled_prompt_embeds
self._conditioning_latents = conditioning_latents
self._kontext_image_ids = kontext_image_ids
@property
def prompt_embeds(self) -> mx.array:
return self._prompt_embeds
@property
def pooled_prompt_embeds(self) -> mx.array:
return self._pooled_prompt_embeds
@property
def negative_prompt_embeds(self) -> mx.array | None:
return None
@property
def negative_pooled_prompt_embeds(self) -> mx.array | None:
return None
def get_encoder_hidden_states_mask(self, positive: bool = True) -> mx.array | None:
return None
@property
def cond_image_grid(
self,
) -> tuple[int, int, int] | list[tuple[int, int, int]] | None:
return None
@property
def conditioning_latents(self) -> mx.array | None:
"""VAE-encoded input image latents for Kontext conditioning."""
return self._conditioning_latents
@property
def kontext_image_ids(self) -> mx.array | None:
"""Position IDs for Kontext conditioning (first_coord=1)."""
return self._kontext_image_ids
def get_cfg_branch_data(
self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
"""Kontext doesn't use CFG, but we return positive data for compatibility."""
return (
self._prompt_embeds,
None,
self._pooled_prompt_embeds,
self._conditioning_latents,
)
def get_batched_cfg_data(
self,
) -> tuple[mx.array, mx.array, mx.array | None, mx.array | None] | None:
# Kontext doesn't use CFG
return None
@final
class FluxKontextModelAdapter(ModelAdapter[Flux1Kontext, Transformer]):
"""Adapter for FLUX.1-Kontext image editing model.
Key differences from standard FluxModelAdapter:
- Takes an input image and computes output dimensions from it
- Creates conditioning latents from the input image via VAE
- Creates special position IDs (kontext_image_ids) for conditioning tokens
- Creates pure noise latents (not img2img blending)
"""
def __init__(
self,
config: ImageModelConfig,
model_id: str,
local_path: Path,
quantize: int | None = None,
):
self._config = config
self._model = Flux1Kontext(
model_config=ModelConfig.from_name(model_name=model_id, base_model=None),
model_path=str(local_path),
quantize=quantize,
)
self._transformer = self._model.transformer
# Stores image path and computed dimensions after set_image_dimensions
self._image_path: str | None = None
self._output_height: int | None = None
self._output_width: int | None = None
@property
def hidden_dim(self) -> int:
return self._transformer.x_embedder.weight.shape[0] # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
@property
def needs_cfg(self) -> bool:
return False
def _get_latent_creator(self) -> type:
return FluxLatentCreator
def get_joint_block_wrappers(
self,
text_seq_len: int,
encoder_hidden_states_mask: mx.array | None = None,
) -> list[JointBlockWrapper[Any]]:
"""Create wrapped joint blocks for Flux Kontext."""
return [
FluxJointBlockWrapper(block, text_seq_len)
for block in self._transformer.transformer_blocks
]
def get_single_block_wrappers(
self,
text_seq_len: int,
) -> list[SingleBlockWrapper[Any]]:
"""Create wrapped single blocks for Flux Kontext."""
return [
FluxSingleBlockWrapper(block, text_seq_len)
for block in self._transformer.single_transformer_blocks
]
def slice_transformer_blocks(
self,
start_layer: int,
end_layer: int,
):
all_joint = list(self._transformer.transformer_blocks)
all_single = list(self._transformer.single_transformer_blocks)
total_joint_blocks = len(all_joint)
if end_layer <= total_joint_blocks:
# All assigned are joint blocks
joint_start, joint_end = start_layer, end_layer
single_start, single_end = 0, 0
elif start_layer >= total_joint_blocks:
# All assigned are single blocks
joint_start, joint_end = 0, 0
single_start = start_layer - total_joint_blocks
single_end = end_layer - total_joint_blocks
else:
# Spans both joint and single
joint_start, joint_end = start_layer, total_joint_blocks
single_start = 0
single_end = end_layer - total_joint_blocks
self._transformer.transformer_blocks = all_joint[joint_start:joint_end]
self._transformer.single_transformer_blocks = all_single[
single_start:single_end
]
def set_image_dimensions(self, image_path: Path) -> tuple[int, int]:
"""Compute and store dimensions from input image.
Also stores image_path for use in encode_prompt().
Args:
image_path: Path to the input image
Returns:
(output_width, output_height) for runtime config
"""
from mflux.utils.image_util import ImageUtil
pil_image = ImageUtil.load_image(str(image_path)).convert("RGB")
image_size = pil_image.size
# Compute output dimensions from input image aspect ratio
# Target area of 1024x1024 = ~1M pixels
target_area = 1024 * 1024
ratio = image_size[0] / image_size[1]
output_width = math.sqrt(target_area * ratio)
output_height = output_width / ratio
output_width = round(output_width / 32) * 32
output_height = round(output_height / 32) * 32
# Ensure multiple of 16 for VAE
vae_scale_factor = 8
multiple_of = vae_scale_factor * 2
output_width = output_width // multiple_of * multiple_of
output_height = output_height // multiple_of * multiple_of
self._image_path = str(image_path)
self._output_width = int(output_width)
self._output_height = int(output_height)
return self._output_width, self._output_height
def create_latents(self, seed: int, runtime_config: Config) -> mx.array:
"""Create initial noise latents for Kontext.
Unlike standard img2img which blends noise with encoded input,
Kontext uses pure noise latents. The input image is provided
separately as conditioning.
"""
return FluxLatentCreator.create_noise(
seed=seed,
height=runtime_config.height,
width=runtime_config.width,
)
def encode_prompt(
self, prompt: str, negative_prompt: str | None = None
) -> FluxKontextPromptData:
"""Encode prompt and create conditioning from stored input image.
Must call set_image_dimensions() before this method.
Args:
prompt: Text prompt for editing
negative_prompt: Ignored (Kontext doesn't use CFG)
Returns:
FluxKontextPromptData with text embeddings and image conditioning
"""
del negative_prompt # Kontext doesn't support negative prompts or CFG
if (
self._image_path is None
or self._output_height is None
or self._output_width is None
):
raise RuntimeError(
"set_image_dimensions() must be called before encode_prompt() "
"for FluxKontextModelAdapter"
)
assert isinstance(self.model.prompt_cache, dict)
assert isinstance(self.model.tokenizers, dict)
# Encode text prompt
prompt_embeds, pooled_prompt_embeds = PromptEncoder.encode_prompt(
prompt=prompt,
prompt_cache=self.model.prompt_cache,
t5_tokenizer=self.model.tokenizers["t5"], # pyright: ignore[reportAny]
clip_tokenizer=self.model.tokenizers["clip"], # pyright: ignore[reportAny]
t5_text_encoder=self.model.t5_text_encoder,
clip_text_encoder=self.model.clip_text_encoder,
)
# Create conditioning latents from input image
conditioning_latents, kontext_image_ids = (
KontextUtil.create_image_conditioning_latents(
vae=self.model.vae,
height=self._output_height,
width=self._output_width,
image_path=self._image_path,
)
)
return FluxKontextPromptData(
prompt_embeds=prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
conditioning_latents=conditioning_latents,
kontext_image_ids=kontext_image_ids,
)
def compute_embeddings(
self,
hidden_states: mx.array,
prompt_embeds: mx.array,
) -> tuple[mx.array, mx.array]:
embedded_hidden = self._transformer.x_embedder(hidden_states)
embedded_encoder = self._transformer.context_embedder(prompt_embeds)
return embedded_hidden, embedded_encoder
def compute_text_embeddings(
self,
t: int,
runtime_config: Config,
pooled_prompt_embeds: mx.array | None = None,
hidden_states: mx.array | None = None,
) -> mx.array:
if pooled_prompt_embeds is None:
raise ValueError(
"pooled_prompt_embeds is required for Flux Kontext text embeddings"
)
return Transformer.compute_text_embeddings(
t, pooled_prompt_embeds, self._transformer.time_text_embed, runtime_config
)
def compute_rotary_embeddings(
self,
prompt_embeds: mx.array,
runtime_config: Config,
encoder_hidden_states_mask: mx.array | None = None,
cond_image_grid: tuple[int, int, int]
| list[tuple[int, int, int]]
| None = None,
kontext_image_ids: mx.array | None = None,
) -> RotaryEmbeddings:
return Transformer.compute_rotary_embeddings(
prompt_embeds,
self._transformer.pos_embed,
runtime_config,
kontext_image_ids,
)
def apply_guidance(
self,
noise_positive: mx.array,
noise_negative: mx.array,
guidance_scale: float,
) -> mx.array:
raise NotImplementedError("Flux Kontext does not use classifier-free guidance")

View File

@@ -69,10 +69,6 @@ class QwenPromptData(PromptData):
def conditioning_latents(self) -> mx.array | None:
return None
@property
def kontext_image_ids(self) -> mx.array | None:
return None
def get_batched_cfg_data(
self,
) -> tuple[mx.array, mx.array, mx.array | None, mx.array | None] | None:

View File

@@ -12,7 +12,7 @@ QWEN_IMAGE_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps=7,
num_sync_steps_factor=0.25,
guidance_scale=3.5, # Set to None or < 1.0 to disable CFG
)
@@ -24,6 +24,6 @@ QWEN_IMAGE_EDIT_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps=7,
num_sync_steps_factor=0.25,
guidance_scale=3.5,
)

View File

@@ -85,10 +85,6 @@ class QwenEditPromptData(PromptData):
def qwen_image_ids(self) -> mx.array:
return self._qwen_image_ids
@property
def kontext_image_ids(self) -> mx.array | None:
return None
@property
def is_edit_mode(self) -> bool:
return True

View File

@@ -567,7 +567,6 @@ class DiffusionRunner:
| list[tuple[int, int, int]]
| None = None,
conditioning_latents: mx.array | None = None,
kontext_image_ids: mx.array | None = None,
) -> mx.array:
"""Run a single forward pass through the transformer.
Args:
@@ -579,7 +578,6 @@ class DiffusionRunner:
encoder_hidden_states_mask: Attention mask for text (Qwen)
cond_image_grid: Conditioning image grid dimensions (Qwen edit)
conditioning_latents: Conditioning latents for edit mode
kontext_image_ids: Position IDs for Kontext conditioning (Flux Kontext)
Returns:
Noise prediction tensor
@@ -612,7 +610,6 @@ class DiffusionRunner:
config,
encoder_hidden_states_mask=encoder_hidden_states_mask,
cond_image_grid=cond_image_grid,
kontext_image_ids=kontext_image_ids,
)
assert self.joint_block_wrappers is not None
@@ -684,7 +681,6 @@ class DiffusionRunner:
prompt_data: PromptData,
) -> mx.array:
cond_image_grid = prompt_data.cond_image_grid
kontext_image_ids = prompt_data.kontext_image_ids
results: list[tuple[bool, mx.array]] = []
for branch in self._get_cfg_branches(prompt_data):
@@ -704,7 +700,6 @@ class DiffusionRunner:
encoder_hidden_states_mask=branch.mask,
cond_image_grid=cond_image_grid,
conditioning_latents=branch.cond_latents,
kontext_image_ids=kontext_image_ids,
)
results.append((branch.positive, noise))
@@ -907,10 +902,10 @@ class DiffusionRunner:
config: Config,
hidden_states: mx.array,
prompt_data: PromptData,
kontext_image_ids: mx.array | None = None,
) -> mx.array:
prev_latents = hidden_states
cond_image_grid = prompt_data.cond_image_grid
kontext_image_ids = prompt_data.kontext_image_ids
scaled_hidden_states = config.scheduler.scale_model_input(hidden_states, t) # pyright: ignore[reportAny]
original_latent_tokens: int = scaled_hidden_states.shape[1] # pyright: ignore[reportAny]
@@ -984,10 +979,10 @@ class DiffusionRunner:
latents: mx.array,
prompt_data: PromptData,
is_first_async_step: bool,
kontext_image_ids: mx.array | None = None,
) -> mx.array:
patch_latents, token_indices = self._create_patches(latents, config)
cond_image_grid = prompt_data.cond_image_grid
kontext_image_ids = prompt_data.kontext_image_ids
prev_patch_latents = [p for p in patch_latents]

View File

@@ -13,9 +13,6 @@ from mlx.nn.layers.distributed import (
shard_linear,
sum_gradients,
)
from mlx_lm.models.base import (
scaled_dot_product_attention, # pyright: ignore[reportUnknownVariableType]
)
from mlx_lm.models.deepseek_v3 import DeepseekV3MLP
from mlx_lm.models.deepseek_v3 import Model as DeepseekV3Model
from mlx_lm.models.deepseek_v32 import DeepseekV32MLP
@@ -28,21 +25,16 @@ from mlx_lm.models.gpt_oss import GptOssMoeModel
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.models.kimi_k25 import Model as KimiK25Model
from mlx_lm.models.llama import Model as LlamaModel
from mlx_lm.models.minimax import MiniMaxAttention
from mlx_lm.models.minimax import Model as MiniMaxModel
from mlx_lm.models.ministral3 import Model as Ministral3Model
from mlx_lm.models.qwen3_moe import Model as Qwen3MoeModel
from mlx_lm.models.qwen3_moe import Qwen3MoeSparseMoeBlock
from mlx_lm.models.qwen3_next import Model as Qwen3NextModel
from mlx_lm.models.qwen3_next import Qwen3NextDecoderLayer, Qwen3NextSparseMoeBlock
from transformers.models.qwen3.modeling_qwen3 import Qwen3DecoderLayer
from mlx_lm.models.qwen3_next import Qwen3NextSparseMoeBlock
from exo.shared.logging import logger
from exo.shared.types.worker.shards import PipelineShardMetadata
if TYPE_CHECKING:
from mlx_lm.models.cache import Cache
TimeoutCallback = Callable[[], None]
@@ -386,15 +378,7 @@ def tensor_auto_parallel(
all_to_sharded_linear_in_place,
sharded_to_all_linear_in_place,
)
elif isinstance(model, Glm4MoeModel):
tensor_parallel_sharding_strategy = Glm4MoeShardingStrategy(
group,
all_to_sharded_linear,
sharded_to_all_linear,
all_to_sharded_linear_in_place,
sharded_to_all_linear_in_place,
)
elif isinstance(model, (Qwen3MoeModel, Qwen3NextModel)):
elif isinstance(model, (Qwen3MoeModel, Glm4MoeModel, Qwen3NextModel)):
tensor_parallel_sharding_strategy = QwenShardingStrategy(
group,
all_to_sharded_linear,
@@ -519,21 +503,12 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
layer.self_attn.q_b_proj = self.all_to_sharded_linear(
layer.self_attn.q_b_proj
)
layer.self_attn.kv_b_proj = self.all_to_sharded_linear(
layer.self_attn.kv_b_proj
)
layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
layer.self_attn.num_heads //= self.N
# Logic from upstream mlx
num_heads = layer.self_attn.num_heads
sh = self.group.rank() * num_heads
eh = sh + num_heads
def shard_heads(w: mx.array, sh: int = sh, eh: int = eh) -> mx.array:
return w[sh:eh]
layer.self_attn.embed_q.apply(shard_heads)
layer.self_attn.unembed_out.apply(shard_heads)
# Shard the MLP
if isinstance(layer.mlp, (DeepseekV3MLP, DeepseekV32MLP)):
layer.mlp.gate_proj = self.all_to_sharded_linear(layer.mlp.gate_proj)
@@ -549,7 +524,7 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
layer.mlp = ShardedMoE(layer.mlp) # type: ignore
layer.mlp = ShardedDeepseekV3MoE(layer.mlp) # type: ignore
layer.mlp.sharding_group = self.group
mx.eval(layer)
@@ -557,9 +532,7 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
return model
class ShardedMoE(CustomMlxLayer):
"""Wraps any MoE layer with distributed sum_gradients / all_sum."""
class ShardedDeepseekV3MoE(CustomMlxLayer):
def __init__(self, layer: _LayerCallable):
super().__init__(layer)
self.sharding_group: mx.distributed.Group | None = None
@@ -630,89 +603,25 @@ class GLM4MoeLiteShardingStrategy(TensorParallelShardingStrategy):
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
layer.mlp = ShardedMoE(layer.mlp) # type: ignore
layer.mlp = ShardedGLM4MoeLiteMoE(layer.mlp) # type: ignore
layer.mlp.sharding_group = self.group # type: ignore
mx.eval(layer)
return model
class WrappedMiniMaxAttention(CustomMlxLayer):
def __init__(self, layer: _LayerCallable, group: mx.distributed.Group):
class ShardedGLM4MoeLiteMoE(CustomMlxLayer):
def __init__(self, layer: _LayerCallable):
super().__init__(layer)
self.group = group
self.sharding_group: mx.distributed.Group | None = None
def __call__(
self,
x: mx.array,
mask: mx.array | None = None,
cache: "Cache | None" = None,
) -> mx.array:
batch_dim, seq_dim, _ = x.shape
self._original_layer = cast(MiniMaxAttention, self.original_layer) # type: ignore
queries: mx.array = self._original_layer.q_proj(x)
keys: mx.array = self._original_layer.k_proj(x)
values: mx.array = self._original_layer.v_proj(x)
if getattr(self, "use_qk_norm", False):
q_dim = queries.shape[-1]
k_dim = keys.shape[-1]
n = self.group.size()
qk = mx.concatenate(
[queries, keys], axis=-1
) # (batch_dim, seq_dim, q_dim + k_dim)
qk = mx.distributed.all_gather(
qk, group=self.group
) # (n*batch_dim, seq_dim, q_dim + k_dim)
qk = qk.reshape(n, batch_dim, seq_dim, q_dim + k_dim).transpose(1, 2, 0, 3)
queries = qk[..., :q_dim].reshape(
batch_dim, seq_dim, -1
) # (batch_dim, seq_dim, n * q_dim)
keys = qk[..., q_dim:].reshape(
batch_dim, seq_dim, -1
) # (batch_dim, seq_dim, n * k_dim)
queries = self._original_layer.q_norm(queries)
keys = self._original_layer.k_norm(keys)
# Split back and take this rank's portion
queries = mx.split(queries, n, axis=-1)[self.group.rank()]
keys = mx.split(keys, n, axis=-1)[self.group.rank()]
queries = queries.reshape(
batch_dim, seq_dim, self._original_layer.num_attention_heads, -1
).transpose(0, 2, 1, 3)
keys = keys.reshape(
batch_dim, seq_dim, self._original_layer.num_key_value_heads, -1
).transpose(0, 2, 1, 3)
values = values.reshape(
batch_dim, seq_dim, self._original_layer.num_key_value_heads, -1
).transpose(0, 2, 1, 3)
if cache is not None:
queries = self._original_layer.rope(queries, offset=cache.offset)
keys = self._original_layer.rope(keys, offset=cache.offset)
keys, values = cache.update_and_fetch(keys, values)
else:
queries = self._original_layer.rope(queries)
keys = self._original_layer.rope(keys)
output = scaled_dot_product_attention(
queries,
keys,
values,
cache=cache,
scale=self._original_layer.scale, # type: ignore
mask=mask,
)
output = output.transpose(0, 2, 1, 3).reshape(batch_dim, seq_dim, -1)
return self._original_layer.o_proj(output)
def __call__(self, x: mx.array) -> mx.array:
if self.sharding_group is not None:
x = sum_gradients(self.sharding_group)(x)
y = self.original_layer.__call__(x)
if self.sharding_group is not None:
y = mx.distributed.all_sum(y, group=self.sharding_group)
return y
class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
@@ -723,6 +632,7 @@ class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
on_timeout: TimeoutCallback | None,
) -> nn.Module:
model = cast(MiniMaxModel, model)
rank = self.group.rank()
for layer in model.layers:
eval_with_timeout(
layer.parameters(), timeout_seconds / len(model.layers), on_timeout
@@ -733,11 +643,18 @@ class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
# Shard qk_norm weights if present (must match sharded head count)
if getattr(layer.self_attn, "use_qk_norm", False):
layer.self_attn.q_norm.weight = layer.self_attn.q_norm.weight.split( # type: ignore
self.N, axis=-1
)[rank]
layer.self_attn.k_norm.weight = layer.self_attn.k_norm.weight.split( # type: ignore
self.N, axis=-1
)[rank]
layer.self_attn.num_attention_heads //= self.N
layer.self_attn.num_key_value_heads //= self.N
layer.self_attn = WrappedMiniMaxAttention(layer.self_attn, self.group) # pyright: ignore[reportAttributeAccessIssue,reportArgumentType]
# Shard the MoE. Shard in place since the MoE should be responsible
# for aggregating the results.
self.all_to_sharded_linear_in_place(
@@ -749,7 +666,7 @@ class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
self.all_to_sharded_linear_in_place(
layer.block_sparse_moe.switch_mlp.up_proj
)
layer.block_sparse_moe = ShardedMoE(layer.block_sparse_moe) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
layer.block_sparse_moe = ShardedQwenMoE(layer.block_sparse_moe) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
layer.block_sparse_moe.sharding_group = self.group # pyright: ignore[reportAttributeAccessIssue]
mx.eval(layer)
return model
@@ -762,111 +679,28 @@ class QwenShardingStrategy(TensorParallelShardingStrategy):
timeout_seconds: float,
on_timeout: TimeoutCallback | None,
) -> nn.Module:
model = cast(Qwen3MoeModel | Qwen3NextModel, model)
model = cast(Qwen3MoeModel, model)
for layer in model.layers:
eval_with_timeout(
layer.parameters(), timeout_seconds / len(model.layers), on_timeout
)
# Shard the self attention
if isinstance(layer, Qwen3DecoderLayer):
layer.self_attn.q_proj = self.all_to_sharded_linear(
layer.self_attn.q_proj
)
layer.self_attn.k_proj = self.all_to_sharded_linear(
layer.self_attn.k_proj
)
layer.self_attn.v_proj = self.all_to_sharded_linear(
layer.self_attn.v_proj
)
layer.self_attn.o_proj = self.sharded_to_all_linear(
layer.self_attn.o_proj
)
else:
assert isinstance(layer, Qwen3NextDecoderLayer)
if hasattr(layer, "linear_attn"):
linear_attn = layer.linear_attn
linear_attn.in_proj_qkvz = self.all_to_sharded_linear(
linear_attn.in_proj_qkvz
)
linear_attn.in_proj_ba = self.all_to_sharded_linear(
linear_attn.in_proj_ba
)
linear_attn.out_proj = self.sharded_to_all_linear(
linear_attn.out_proj
)
# Shard conv1d: depthwise conv with non-contiguous channel slicing.
# Channel layout is [q(key_dim), k(key_dim), v(value_dim)].
# Each rank takes its head-slice from each of the three sections.
rank = self.group.rank()
key_dim = linear_attn.key_dim
value_dim = linear_attn.value_dim
key_dim_shard = key_dim // self.N
value_dim_shard = value_dim // self.N
q_idx = mx.arange(rank * key_dim_shard, (rank + 1) * key_dim_shard)
k_idx = mx.arange(
key_dim + rank * key_dim_shard,
key_dim + (rank + 1) * key_dim_shard,
)
v_idx = mx.arange(
2 * key_dim + rank * value_dim_shard,
2 * key_dim + (rank + 1) * value_dim_shard,
)
conv_indices = mx.concatenate([q_idx, k_idx, v_idx])
linear_attn.conv1d.weight = linear_attn.conv1d.weight[conv_indices]
new_conv_dim = key_dim_shard * 2 + value_dim_shard
linear_attn.conv1d.groups = new_conv_dim
num_v_shard = linear_attn.num_v_heads // self.N
v_start = rank * num_v_shard
v_end = v_start + num_v_shard
linear_attn.A_log = linear_attn.A_log[v_start:v_end]
linear_attn.dt_bias = linear_attn.dt_bias[v_start:v_end]
linear_attn.num_k_heads //= self.N
linear_attn.num_v_heads //= self.N
linear_attn.key_dim = (
linear_attn.head_k_dim * linear_attn.num_k_heads
)
linear_attn.value_dim = (
linear_attn.head_v_dim * linear_attn.num_v_heads
)
linear_attn.conv_dim = (
linear_attn.key_dim * 2 + linear_attn.value_dim
)
else:
layer.self_attn.q_proj = self.all_to_sharded_linear(
layer.self_attn.q_proj
)
layer.self_attn.k_proj = self.all_to_sharded_linear(
layer.self_attn.k_proj
)
layer.self_attn.v_proj = self.all_to_sharded_linear(
layer.self_attn.v_proj
)
layer.self_attn.o_proj = self.sharded_to_all_linear(
layer.self_attn.o_proj
)
layer.self_attn.num_attention_heads //= self.N
layer.self_attn.num_key_value_heads //= self.N
layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
layer.self_attn.n_heads //= self.N
layer.self_attn.n_kv_heads //= self.N
# Shard the MoE. Shard in place since the MoE should be responsible
# for aggregating the results.
if isinstance(layer.mlp, (Qwen3MoeSparseMoeBlock, Qwen3NextSparseMoeBlock)):
if isinstance(
layer.mlp, (Qwen3MoeSparseMoeBlock, MoE, Qwen3NextSparseMoeBlock)
):
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
if isinstance(layer.mlp, Qwen3NextSparseMoeBlock):
self.all_to_sharded_linear_in_place(
layer.mlp.shared_expert.gate_proj
)
self.sharded_to_all_linear_in_place(
layer.mlp.shared_expert.down_proj
)
self.all_to_sharded_linear_in_place(layer.mlp.shared_expert.up_proj)
layer.mlp = ShardedMoE(layer.mlp) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
layer.mlp = ShardedQwenMoE(layer.mlp) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
layer.mlp.sharding_group = self.group
# Shard the MLP
@@ -879,50 +713,18 @@ class QwenShardingStrategy(TensorParallelShardingStrategy):
return model
class Glm4MoeShardingStrategy(TensorParallelShardingStrategy):
def shard_model(
self,
model: nn.Module,
timeout_seconds: float,
on_timeout: TimeoutCallback | None,
) -> nn.Module:
model = cast(Glm4MoeModel, model)
for layer in model.layers:
eval_with_timeout(
layer.parameters(), timeout_seconds / len(model.layers), on_timeout
)
class ShardedQwenMoE(CustomMlxLayer):
def __init__(self, layer: _LayerCallable):
super().__init__(layer)
self.sharding_group: mx.distributed.Group | None = None
layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
layer.self_attn.n_heads //= self.N
layer.self_attn.n_kv_heads //= self.N
if isinstance(layer.mlp, MoE):
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
if getattr(layer.mlp, "shared_experts", None) is not None:
self.all_to_sharded_linear_in_place(
layer.mlp.shared_experts.gate_proj
)
self.sharded_to_all_linear_in_place(
layer.mlp.shared_experts.down_proj
)
self.all_to_sharded_linear_in_place(
layer.mlp.shared_experts.up_proj
)
layer.mlp = ShardedMoE(layer.mlp) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
layer.mlp.sharding_group = self.group
else:
layer.mlp.gate_proj = self.all_to_sharded_linear(layer.mlp.gate_proj)
layer.mlp.down_proj = self.sharded_to_all_linear(layer.mlp.down_proj)
layer.mlp.up_proj = self.all_to_sharded_linear(layer.mlp.up_proj)
mx.eval(layer)
return model
def __call__(self, x: mx.array) -> mx.array:
if self.sharding_group is not None:
x = sum_gradients(self.sharding_group)(x)
y = self.original_layer.__call__(x)
if self.sharding_group is not None:
y = mx.distributed.all_sum(y, group=self.sharding_group)
return y
class GptOssShardingStrategy(TensorParallelShardingStrategy):
@@ -960,7 +762,21 @@ class GptOssShardingStrategy(TensorParallelShardingStrategy):
self.sharded_to_all_linear_in_place(layer.mlp.experts.down_proj)
self.all_to_sharded_linear_in_place(layer.mlp.experts.up_proj)
layer.mlp = ShardedMoE(layer.mlp) # type: ignore
layer.mlp = ShardedGptOssMoE(layer.mlp) # type: ignore
layer.mlp.sharding_group = self.group # pyright: ignore[reportAttributeAccessIssue]
mx.eval(layer)
return model
class ShardedGptOssMoE(CustomMlxLayer):
def __init__(self, layer: nn.Module):
super().__init__(layer)
self.sharding_group: mx.distributed.Group | None = None
def __call__(self, x: mx.array) -> mx.array:
if self.sharding_group is not None:
x = sum_gradients(self.sharding_group)(x)
y = self.original_layer(x)
if self.sharding_group is not None:
y = mx.distributed.all_sum(y, group=self.sharding_group)
return y

View File

@@ -1,14 +1,16 @@
import os
from copy import deepcopy
from typing import Any, cast
import mlx.core as mx
import psutil
from mlx_lm.models.cache import (
ArraysCache,
KVCache,
QuantizedKVCache,
RotatingKVCache,
trim_prompt_cache,
)
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.tokenizer_utils import TokenizerWrapper
from exo.shared.types.memory import Memory
@@ -24,119 +26,51 @@ _MEMORY_THRESHOLD = float(
)
class CacheSnapshot:
"""Snapshot of states at a known token position."""
def __init__(
self, states: list[RotatingKVCache | ArraysCache | None], token_count: int
):
self.states = states
self.token_count = token_count
def snapshot_ssm_states(cache: KVCacheType) -> CacheSnapshot:
states: list[ArraysCache | RotatingKVCache | None] = []
for c in cache:
if isinstance(c, (ArraysCache, RotatingKVCache)):
states.append(deepcopy(c))
else:
states.append(None)
token_count = cache_length(cache)
return CacheSnapshot(states=states, token_count=token_count)
def _find_nearest_snapshot(
snapshots: list[CacheSnapshot],
target_token_count: int,
) -> CacheSnapshot | None:
best: CacheSnapshot | None = None
for snap in snapshots:
if snap.token_count <= target_token_count and (
best is None or snap.token_count > best.token_count
):
best = snap
return best
def has_non_kv_caches(cache: KVCacheType) -> bool:
"""Check if a cache contains any ArraysCache (SSM) entries."""
return any(isinstance(c, (ArraysCache, RotatingKVCache)) for c in cache)
class KVPrefixCache:
def __init__(self, group: mx.distributed.Group | None = None):
def __init__(
self, tokenizer: TokenizerWrapper, group: mx.distributed.Group | None = None
):
self.prompts: list[mx.array] = [] # mx array of tokens (ints)
self.caches: list[KVCacheType] = []
self._snapshots: list[list[CacheSnapshot] | None] = []
self._last_used: list[int] = [] # monotonic counter of last access per entry
self._access_counter: int = 0
self._tokenizer: TokenizerWrapper = tokenizer
self._group = group
def clear(self):
"""Clear all cached prompts and caches."""
self.prompts.clear()
self.caches.clear()
self._snapshots.clear()
self._last_used.clear()
def add_kv_cache(
self,
prompt_tokens: mx.array,
cache: KVCacheType,
ssm_snapshots: list[CacheSnapshot] | None = None,
):
def add_kv_cache(self, prompt: str, cache: KVCacheType):
"""Add a new cache entry. Evicts LRU entries if memory is high."""
self._evict_if_needed()
self.prompts.append(prompt_tokens)
tokenized_prompt = encode_prompt(self._tokenizer, prompt)
self.prompts.append(tokenized_prompt)
self.caches.append(deepcopy(cache))
self._snapshots.append(ssm_snapshots)
self._access_counter += 1
self._last_used.append(self._access_counter)
logger.info(f"KV cache added: {len(prompt_tokens)} tokens")
logger.info(f"KV cache added: {len(tokenized_prompt)} tokens")
def update_kv_cache(
self,
index: int,
prompt_tokens: mx.array,
prompt: str,
cache: KVCacheType,
snapshots: list[CacheSnapshot] | None,
restore_pos: int,
):
"""Update an existing cache entry in-place."""
old_snapshots = self._snapshots[index]
merged: list[CacheSnapshot] = []
if old_snapshots:
merged = [s for s in old_snapshots if s.token_count <= restore_pos]
if snapshots:
merged.extend(snapshots)
self.prompts[index] = prompt_tokens
tokenized_prompt = encode_prompt(self._tokenizer, prompt)
self.prompts[index] = tokenized_prompt
self.caches[index] = deepcopy(cache)
self._snapshots[index] = merged or None
self._access_counter += 1
self._last_used[index] = self._access_counter
logger.info(f"KV cache updated (index {index}): {len(prompt_tokens)} tokens")
def _get_snapshot(
self, entry_index: int, target_token_count: int
) -> tuple[int, CacheSnapshot | None]:
if not has_non_kv_caches(self.caches[entry_index]):
return target_token_count, None
snapshots = self._snapshots[entry_index]
if not snapshots:
return 0, None
snap = _find_nearest_snapshot(snapshots, target_token_count)
if snap is not None:
return snap.token_count, snap
return 0, None
logger.info(f"KV cache updated (index {index}): {len(tokenized_prompt)} tokens")
def get_kv_cache(
self,
model: Model,
prompt_tokens: mx.array,
prompt: str,
) -> tuple[KVCacheType, mx.array, int | None]:
"""Get KV cache for prompt, returning remaining tokens to prefill.
@@ -145,71 +79,76 @@ class KVPrefixCache:
- cache: KV cache to use for generation
- remaining_tokens: tokens that still need prefilling
- matched_index: index of the matched entry (None if no match)
For models with SSM layers (which are ArraysCache in mlx), the cache is trimmed to the
nearest SSM snapshot position at or before the match point for correctness.
Same for rotating KV Cache.
"""
max_length = len(prompt_tokens)
tokenized_prompt = encode_prompt(self._tokenizer, prompt)
max_length = len(tokenized_prompt)
best_index: int | None = None
best_length = 0
is_exact = False
best_snapshot_index, best_snapshot_length = None, 0
# Find best cache
for i, cached_prompt in enumerate(self.prompts):
length = get_prefix_length(prompt_tokens, cached_prompt)
if length > best_length:
best_index, best_length = i, length
length = get_prefix_length(tokenized_prompt, cached_prompt)
if length == max_length:
is_exact = True
best_index, best_length = i, length
break
# Exact match - cached prompt starts with our entire prompt
# Trim cache to prompt length - 1, return last token for stream_generate
prompt_cache = deepcopy(self.caches[i])
cached_length = cache_length(self.caches[i])
tokens_to_trim = cached_length - (max_length - 1)
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
self._access_counter += 1
self._last_used[i] = self._access_counter
logger.info(f"KV cache exact match: {max_length} tokens (instant)")
return prompt_cache, tokenized_prompt[-1:], i
if best_index is None:
return make_kv_cache(model), prompt_tokens, None
if length > best_snapshot_length:
best_snapshot_index, best_snapshot_length = i, length
# For exact match: trim to max_length-1 so remaining has the last token
# For partial match: trim to best_length, remaining has suffix to prefill
# This ensures stream_generate always has at least one token to start with
target = (max_length - 1) if is_exact else best_length
restore_pos, restore_snap = self._get_snapshot(best_index, target)
if best_snapshot_index is not None:
new_tokens = max_length - best_snapshot_length
logger.info(
f"KV cache prefix match: {best_snapshot_length}/{max_length} tokens "
f"(reusing {best_snapshot_length}, need to prefill {new_tokens})"
)
# No usable snapshot — need fresh cache
if restore_snap is None and has_non_kv_caches(self.caches[best_index]):
return make_kv_cache(model), prompt_tokens, None
prompt_cache = deepcopy(self.caches[best_snapshot_index])
prompt_cache = deepcopy(self.caches[best_index])
cached_length = cache_length(self.caches[best_index])
tokens_to_trim = cached_length - restore_pos
if tokens_to_trim > 0:
trim_cache(prompt_cache, tokens_to_trim, restore_snap)
# Reset cache offset to match trimmed length
for c in prompt_cache:
if hasattr(c, "offset"):
c.offset = restore_pos
# Trim removes tokens from the end, so we trim (cached_length - prefix_length) to keep the prefix
cached_length = cache_length(self.caches[best_snapshot_index])
tokens_to_trim = cached_length - best_snapshot_length
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
self._access_counter += 1
self._last_used[best_index] = self._access_counter
remaining = prompt_tokens[restore_pos:]
self._access_counter += 1
self._last_used[best_snapshot_index] = self._access_counter
remaining_tokens = tokenized_prompt[best_snapshot_length:]
return prompt_cache, remaining_tokens, best_snapshot_index
return prompt_cache, remaining, best_index
else:
prompt_cache = make_kv_cache(model)
if len(self.prompts) == 0:
logger.info(f"KV cache empty, need to prefill {max_length} tokens")
else:
logger.info(
f"KV cache no prefix match, need to prefill {max_length} tokens"
)
return prompt_cache, tokenized_prompt, None
def _evict_if_needed(self):
"""Evict least recently used entries while memory usage is high."""
if len(self.caches) == 0:
return
# Evict LRU entries until below threshold
# Evict LRU entries until below threshold or only one entry left
while (
len(self.caches) > 0
len(self.caches) > 1
and self.get_memory_used_percentage() > _MEMORY_THRESHOLD
):
lru_index = self._last_used.index(min(self._last_used))
evicted_tokens = len(self.prompts[lru_index])
self.prompts.pop(lru_index)
self.caches.pop(lru_index)
self._snapshots.pop(lru_index)
self._last_used.pop(lru_index)
logger.info(
f"KV cache evicted LRU entry ({evicted_tokens} tokens) due to memory usage"
@@ -230,21 +169,6 @@ class KVPrefixCache:
return max_pressure
def trim_cache(
cache: KVCacheType,
num_tokens: int,
snapshot: CacheSnapshot | None = None,
) -> None:
for i, c in enumerate(cache):
if isinstance(c, (ArraysCache, RotatingKVCache)):
if snapshot is not None and snapshot.states[i] is not None:
cache[i] = deepcopy(snapshot.states[i]) # type: ignore
else:
c.state = [None] * len(c.state) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
else:
c.trim(num_tokens) # pyright: ignore[reportUnknownMemberType]
def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
"""Encode a prompt string to token array.
@@ -253,14 +177,14 @@ def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
that would corrupt the prompt structure.
"""
# Chat templates define their own structure - don't add BOS/EOS
prompt_tokens = tokenizer.encode(prompt, add_special_tokens=False)
return mx.array(prompt_tokens)
tokenized_prompt = tokenizer.encode(prompt, add_special_tokens=False)
return mx.array(tokenized_prompt)
def cache_length(cache: KVCacheType) -> int:
"""Get the number of tokens in a KV cache."""
# Use .offset attribute which KVCache types have (len() not implemented in older QuantizedKVCache).
return max(getattr(c, "offset", 0) for c in cache)
# Use .offset attribute which all cache types have (len() not implemented in older QuantizedKVCache)
return max(c.offset for c in cache) # type: ignore
def get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
@@ -290,7 +214,8 @@ def make_kv_cache(
) -> KVCacheType:
assert hasattr(model, "layers")
if hasattr(model, "make_cache"):
# TODO: Do this for all models
if hasattr(model, "make_cache") and isinstance(model, GptOssModel):
logger.info("Using MLX LM's make cache")
return model.make_cache() # type: ignore

View File

@@ -1,10 +1,9 @@
import time
from copy import deepcopy
from typing import Callable, Generator, cast, get_args
from typing import Any, Callable, Generator, cast, get_args
import mlx.core as mx
from mlx_lm.generate import stream_generate
from mlx_lm.models.cache import ArraysCache, RotatingKVCache
from mlx_lm.models.cache import trim_prompt_cache
from mlx_lm.sample_utils import make_sampler
from mlx_lm.tokenizer_utils import TokenizerWrapper
@@ -24,14 +23,7 @@ from exo.shared.types.worker.runner_response import (
GenerationResponse,
)
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.cache import (
CacheSnapshot,
KVPrefixCache,
encode_prompt,
has_non_kv_caches,
make_kv_cache,
snapshot_ssm_states,
)
from exo.worker.engines.mlx.cache import KVPrefixCache, encode_prompt, make_kv_cache
from exo.worker.engines.mlx.constants import (
DEFAULT_TOP_LOGPROBS,
KV_BITS,
@@ -40,7 +32,6 @@ from exo.worker.engines.mlx.constants import (
)
from exo.worker.engines.mlx.utils_mlx import (
apply_chat_template,
fix_unmatched_think_end_tokens,
mx_barrier,
)
from exo.worker.runner.bootstrap import logger
@@ -56,7 +47,7 @@ def prefill(
sampler: Callable[[mx.array], mx.array],
prompt_tokens: mx.array,
cache: KVCacheType,
) -> tuple[float, int, list[CacheSnapshot]]:
) -> tuple[float, int]:
"""Prefill the KV cache with prompt tokens.
This runs the model over the prompt tokens to populate the cache,
@@ -67,21 +58,17 @@ def prefill(
"""
num_tokens = len(prompt_tokens)
if num_tokens == 0:
return 0.0, 0, []
return 0.0, 0
logger.debug(f"Prefilling {num_tokens} tokens...")
start_time = time.perf_counter()
has_ssm = has_non_kv_caches(cache)
snapshots: list[CacheSnapshot] = []
def progress_callback(processed: int, total: int) -> None:
elapsed = time.perf_counter() - start_time
elapsed = time.time() - start_time
tok_per_sec = processed / elapsed if elapsed > 0 else 0
logger.debug(
f"Prefill progress: {processed}/{total} tokens ({tok_per_sec:.1f} tok/s)"
)
if has_ssm:
snapshots.append(snapshot_ssm_states(cache))
# Use max_tokens=1 because max_tokens=0 does not work.
# We just throw away the generated token - we only care about filling the cache
@@ -98,18 +85,7 @@ def prefill(
prompt_progress_callback=progress_callback,
):
break # Stop after first iteration - cache is now filled
# stream_generate added 1 extra generated token to the cache, so we should trim it.
# Because of needing to roll back arrays cache, we will generate on 2 tokens so trim 1 more.
pre_gen = deepcopy(snapshots[-2]) if has_ssm else None
for i, c in enumerate(cache):
if has_ssm and isinstance(c, (ArraysCache, RotatingKVCache)):
assert pre_gen is not None
if pre_gen.states[i] is not None:
cache[i] = deepcopy(pre_gen.states[i]) # type: ignore
else:
assert not isinstance(c, (ArraysCache, RotatingKVCache))
c.trim(2) # pyright: ignore[reportUnknownMemberType]
trim_prompt_cache(cast(list[Any], cache), 1)
elapsed = time.perf_counter() - start_time
tokens_per_sec = num_tokens / elapsed if elapsed > 0 else 0.0
@@ -117,14 +93,12 @@ def prefill(
f"Prefill complete: {num_tokens} tokens in {elapsed:.2f}s "
f"({tokens_per_sec:.1f} tok/s)"
)
# Exclude the last snapshot
return tokens_per_sec, num_tokens, snapshots[:-1] if snapshots else []
return tokens_per_sec, num_tokens
def warmup_inference(
model: Model,
tokenizer: TokenizerWrapper,
group: mx.distributed.Group | None = None,
) -> int:
content = "Prompt to warm up the inference engine. Repeat this."
@@ -143,9 +117,7 @@ def warmup_inference(
)
# Use a default sampler for warmup
sampler = make_sampler(temp=0.0)
mx_barrier(group)
sampler = make_sampler(temp=0.7)
logger.info("Generating warmup tokens")
for _r in stream_generate(
@@ -164,7 +136,9 @@ def warmup_inference(
logger.info("Generated ALL warmup tokens")
mx_barrier(group)
# TODO: Do we want an mx_barrier?
# At least this version is actively incorrect, as it should use mx_barrier(group)
mx_barrier()
return tokens_generated
@@ -247,17 +221,11 @@ def mlx_generate(
task: TextGenerationTaskParams,
prompt: str,
kv_prefix_cache: KVPrefixCache | None = None,
group: mx.distributed.Group | None = None,
) -> Generator[GenerationResponse]:
# Ensure that generation stats only contains peak memory for this generation
mx.reset_peak_memory()
# TODO: Randomise task seed and set in taskparams, instead of hard coding as 42.
seed = task.seed or 42
mx.random.seed(seed)
# Encode prompt once at the top and fix unmatched think tags
all_prompt_tokens = encode_prompt(tokenizer, prompt)
all_prompt_tokens = fix_unmatched_think_end_tokens(all_prompt_tokens, tokenizer)
if task.seed is not None:
mx.random.seed(task.seed)
# Do not use the prefix cache if we are trying to do benchmarks.
is_bench = task.bench
@@ -269,16 +237,13 @@ def mlx_generate(
matched_index: int | None = None
if kv_prefix_cache is None:
caches = make_kv_cache(model=model)
prompt_tokens = all_prompt_tokens
prompt_tokens = encode_prompt(tokenizer, prompt)
else:
caches, prompt_tokens, matched_index = kv_prefix_cache.get_kv_cache(
model, all_prompt_tokens
model, prompt
)
all_prompt_tokens = encode_prompt(tokenizer, prompt)
prefix_hit_length = len(all_prompt_tokens) - len(prompt_tokens)
if prefix_hit_length > 0:
logger.info(
f"KV cache hit: {prefix_hit_length}/{len(all_prompt_tokens)} tokens cached ({100 * prefix_hit_length / len(all_prompt_tokens):.1f}%)"
)
logits_processors: list[Callable[[mx.array, mx.array], mx.array]] = []
if is_bench:
@@ -300,21 +265,13 @@ def mlx_generate(
)
max_stop_len = max((len(s) for s in stop_sequences), default=0)
mx_barrier(group)
logger.info("Ready to prefill")
# Prefill cache with all tokens except the last one
prefill_tps, prefill_tokens, ssm_snapshots_list = prefill(
model,
tokenizer,
sampler,
prompt_tokens[:-1],
caches,
prefill_tps, prefill_tokens = prefill(
model, tokenizer, sampler, prompt_tokens[:-1], caches
)
cache_snapshots: list[CacheSnapshot] | None = ssm_snapshots_list or None
# stream_generate starts from the last token
last_token = prompt_tokens[-2:]
last_token = prompt_tokens[-1:]
max_tokens = task.max_output_tokens or MAX_TOKENS
accumulated_text = ""
@@ -342,6 +299,7 @@ def mlx_generate(
start=1,
):
generated_text_parts.append(out.text)
logger.info(out.text)
accumulated_text += out.text
if think_start is not None and out.text == think_start:
@@ -409,6 +367,16 @@ def mlx_generate(
selected_token=out.token,
)
yield GenerationResponse(
text=text,
token=out.token,
logprob=logprob,
top_logprobs=top_logprobs,
finish_reason=finish_reason,
stats=stats,
usage=usage,
)
if is_done:
# Log generation stats
generation_elapsed = time.perf_counter() - generation_start_time
@@ -422,44 +390,18 @@ def mlx_generate(
f"{generation_tps:.1f} tok/s"
)
if kv_prefix_cache is not None:
generated_tokens_array = mx.array(
tokenizer.encode(
"".join(generated_text_parts), add_special_tokens=False
)
)
full_prompt_tokens = mx.concatenate(
[all_prompt_tokens, generated_tokens_array]
)
full_prompt = prompt + "".join(generated_text_parts)
if (
matched_index is not None
and prefix_hit_length >= _MIN_PREFIX_HIT_TO_UPDATE
):
kv_prefix_cache.update_kv_cache(
matched_index,
full_prompt_tokens,
caches,
cache_snapshots,
restore_pos=prefix_hit_length,
)
kv_prefix_cache.update_kv_cache(matched_index, full_prompt, caches)
else:
kv_prefix_cache.add_kv_cache(
full_prompt_tokens, caches, cache_snapshots
)
yield GenerationResponse(
text=text,
token=out.token,
logprob=logprob,
top_logprobs=top_logprobs,
finish_reason=finish_reason,
stats=stats,
usage=usage,
)
if is_done:
mx_barrier(group)
kv_prefix_cache.add_kv_cache(full_prompt, caches)
break
# Limit accumulated_text to what's needed for stop sequence detection
if max_stop_len > 0 and len(accumulated_text) > max_stop_len:
accumulated_text = accumulated_text[-max_stop_len:]
# TODO: Do we want an mx_barrier?

View File

@@ -490,30 +490,6 @@ def detect_thinking_prompt_suffix(prompt: str, tokenizer: TokenizerWrapper) -> b
return think_token is not None and prompt.rstrip().endswith(think_token)
def fix_unmatched_think_end_tokens(
tokens: mx.array, tokenizer: TokenizerWrapper
) -> mx.array:
if not tokenizer.has_thinking:
return tokens
assert tokenizer.think_start_id
assert tokenizer.think_end_id
think_start_id: int = tokenizer.think_start_id
think_end_id: int = tokenizer.think_end_id
token_list: list[int] = cast(list[int], tokens.tolist())
result: list[int] = []
depth = 0
for token in token_list:
if token == think_start_id:
depth += 1
elif token == think_end_id:
if depth == 0:
result.append(think_start_id)
else:
depth -= 1
result.append(token)
return mx.array(result)
class NullKVCache(KVCache):
"""
A KVCache that pretends to exist but holds zero tokens.

View File

@@ -193,7 +193,7 @@ def main(
logger.info(
f"model has_tool_calling={tokenizer.has_tool_calling}"
)
kv_prefix_cache = KVPrefixCache(group)
kv_prefix_cache = KVPrefixCache(tokenizer, group)
elif (
ModelTask.TextToImage in shard_metadata.model_card.tasks
@@ -226,7 +226,6 @@ def main(
toks = warmup_inference(
model=model,
tokenizer=tokenizer,
group=group,
# kv_prefix_cache=kv_prefix_cache, # supply for warmup-time prefix caching
)
logger.info(f"warmed up by generating {toks} tokens")
@@ -275,7 +274,6 @@ def main(
task=task_params,
prompt=prompt,
kv_prefix_cache=kv_prefix_cache,
group=group,
)
# For other thinking models (GLM, etc.), check if we need to
@@ -629,7 +627,7 @@ def parse_thinking_models(
yield response.model_copy(
update={
"text": tokenizer.think_start,
"token": tokenizer.think_start_id,
"token": tokenizer.think_start_id, # type: ignore
}
)
yield response
@@ -810,9 +808,8 @@ def patch_kimi_tokenizer(tokenizer: TokenizerWrapper):
# kimi has a fixed function naming scheme, with a json formatted arg
# functions.multiply:0 <|tool_call_argument_begin|> {"a": 2, "b": 3}
# Also needs to handle tools like call_0<|tool_call_argument_begin|>{"filePath": "..."}
_func_name_regex = re.compile(
r"^\s*(.+)[:](\d+)\s*<\|tool_call_argument_begin\|>", re.DOTALL
r"^\s*(.+):\d+\s*<\|tool_call_argument_begin\|>", re.DOTALL
)
_func_arg_regex = re.compile(r"<\|tool_call_argument_begin\|>\s*(.*)\s*", re.DOTALL)
@@ -836,10 +833,9 @@ def patch_kimi_tokenizer(tokenizer: TokenizerWrapper):
func_name_match = _func_name_regex.search(text)
if func_name_match is None:
raise ValueError(f"Could not parse function name from tool call: {text!r}")
original_func_name = func_name_match.group(1)
tool_id = func_name_match.group(2)
func_name = func_name_match.group(1)
# strip off the `functions.` prefix, if it exists.
func_name = original_func_name[original_func_name.find(".") + 1 :]
func_name = func_name[func_name.find(".") + 1 :]
func_args_match = _func_arg_regex.search(text)
if func_args_match is None:
@@ -848,11 +844,7 @@ def patch_kimi_tokenizer(tokenizer: TokenizerWrapper):
# the args should be valid json - no need to check against our tools to deserialize
arg_dct = _deserialize(func_args) # pyright: ignore[reportAny]
return dict(
id=f"{original_func_name}:{tool_id}",
name=func_name,
arguments=arg_dct, # pyright: ignore[reportAny]
)
return dict(name=func_name, arguments=arg_dct) # pyright: ignore[reportAny]
tokenizer._tool_call_start = tool_call_start
tokenizer._tool_call_end = tool_call_end
@@ -935,13 +927,7 @@ def _validate_single_tool(obj: dict[str, Any]) -> ToolCallItem:
and ((args := obj.get("arguments")) is not None)
and isinstance(name, str)
):
raw_id: object = obj.get("id")
extra = {"id": str(raw_id)} if raw_id is not None else {}
return ToolCallItem(
**extra,
name=name,
arguments=json.dumps(args),
)
return ToolCallItem(name=name, arguments=json.dumps(args))
else:
raise ValidationError

View File

@@ -88,12 +88,12 @@ class TestKVPrefix:
return tokenizer
def test_starts_empty(self, mock_tokenizer):
cache = KVPrefixCache()
cache = KVPrefixCache(mock_tokenizer)
assert len(cache.prompts) == 0
assert len(cache.caches) == 0
def test_clear_empties_cache(self, mock_tokenizer):
cache = KVPrefixCache()
cache = KVPrefixCache(mock_tokenizer)
cache.prompts.append(mx.array([1, 2, 3]))
cache.caches.append([KVCache()])
cache.clear()
@@ -101,7 +101,7 @@ class TestKVPrefix:
assert len(cache.caches) == 0
def test_clear_on_empty_cache(self, mock_tokenizer):
cache = KVPrefixCache()
cache = KVPrefixCache(mock_tokenizer)
cache.clear()
assert len(cache.prompts) == 0
@@ -142,12 +142,10 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
_, _, snapshots = prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
# Cache should now hold the prompt tokens minus one
assert cache_length(cache) == len(tokens) - 1
# Snapshots should be available for models with non-KV caches
assert len(snapshots) > 0
# Cache should now hold the prompt tokens
assert cache_length(cache) == len(tokens)
def test_add_and_get_exact_match(self, model_and_tokenizer):
model, tokenizer = model_and_tokenizer
@@ -161,10 +159,10 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
_, _, snapshots = prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache.add_kv_cache(tokens, cache, snapshots)
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
assert len(kv_prefix_cache.prompts) == 1
stored_length = cache_length(kv_prefix_cache.caches[0])
@@ -172,7 +170,7 @@ class TestKVPrefixCacheWithModel:
# Retrieve with same prompt: exact match
result_cache, remaining_tokens, matched_index = kv_prefix_cache.get_kv_cache(
model, tokens
model, prompt
)
assert matched_index == 0
@@ -193,12 +191,10 @@ class TestKVPrefixCacheWithModel:
short_tokens = encode_prompt(tokenizer, short_prompt)
cache = make_kv_cache(model)
_, _, snapshots = prefill(
model, tokenizer, make_sampler(0.0), short_tokens, cache
)
prefill(model, tokenizer, make_sampler(0.0), short_tokens, cache)
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache.add_kv_cache(short_tokens, cache, snapshots)
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(short_prompt, cache)
# Query with longer prompt that shares the chat template prefix
long_task = TextGenerationTaskParams(
@@ -216,12 +212,13 @@ class TestKVPrefixCacheWithModel:
)
result_cache, remaining_tokens, matched_index = kv_prefix_cache.get_kv_cache(
model, long_tokens
model, long_prompt
)
assert matched_index == 0
# remaining_tokens covers from snapshot restore position to end
assert len(remaining_tokens) >= len(long_tokens) - expected_prefix
# remaining_tokens should be the suffix after the shared prefix
assert len(remaining_tokens) == len(long_tokens) - expected_prefix
assert mx.array_equal(remaining_tokens, long_tokens[expected_prefix:])
def test_stored_cache_not_mutated_after_get_and_generation(
self, model_and_tokenizer
@@ -238,15 +235,15 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
_, _, snapshots = prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache.add_kv_cache(tokens, cache, snapshots)
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = cache_length(kv_prefix_cache.caches[0])
# Get cache and mutate it (simulating what generation does)
result_cache, _, matched_index = kv_prefix_cache.get_kv_cache(model, tokens)
result_cache, _, matched_index = kv_prefix_cache.get_kv_cache(model, prompt)
assert matched_index == 0
# Simulate generation: feed many additional tokens through the cache
@@ -276,15 +273,15 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
_, _, snapshots = prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache.add_kv_cache(tokens, cache, snapshots)
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = cache_length(kv_prefix_cache.caches[0])
for i in range(3):
result_cache, _, _ = kv_prefix_cache.get_kv_cache(model, tokens)
result_cache, _, _ = kv_prefix_cache.get_kv_cache(model, prompt)
head_dim = result_cache[0].keys.shape[-1]
num_heads = result_cache[0].keys.shape[1]
@@ -301,7 +298,7 @@ class TestKVPrefixCacheWithModel:
"""mlx_generate should save the cache after generation completes."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache = KVPrefixCache(tokenizer)
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
input=[InputMessage(role="user", content="Hello")],
@@ -331,7 +328,7 @@ class TestKVPrefixCacheWithModel:
"""Second mlx_generate call with same prompt should get a prefix hit from stored cache."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache = KVPrefixCache(tokenizer)
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
input=[InputMessage(role="user", content="Reuse test")],
@@ -355,20 +352,20 @@ class TestKVPrefixCacheWithModel:
# Second call should find a prefix match (the stored cache contains
# prompt + generated tokens, which shares the prompt prefix)
result_cache, remaining_tokens, matched_index = kv_prefix_cache.get_kv_cache(
model, prompt_tokens
model, prompt
)
# The stored cache is longer than the prompt (it includes generated tokens),
# so this is a prefix match where our prompt is fully contained
assert matched_index == 0
# Exact match: remaining_tokens is just the last token and the one before
assert len(remaining_tokens) == 2
assert mx.array_equal(remaining_tokens, prompt_tokens[-2:])
# Exact match: remaining_tokens is just the last token
assert len(remaining_tokens) == 1
assert mx.array_equal(remaining_tokens, prompt_tokens[-1:])
def test_mlx_generate_long_prompt_updates_cache_in_place(self, model_and_tokenizer):
"""With a prompt > 1000 tokens, second generation should update the cache entry in-place."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache = KVPrefixCache(tokenizer)
# Build a long user message (> 1000 tokens) to exceed _MIN_PREFIX_HIT_TO_UPDATE
base_text = "The quick brown fox jumps over the lazy dog. "
@@ -447,7 +444,7 @@ class TestKVPrefixCacheWithModel:
"""After mlx_generate saves a cache, a second generation must not corrupt the stored copy."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache = KVPrefixCache(tokenizer)
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
input=[InputMessage(role="user", content="Immutable test")],
@@ -484,7 +481,7 @@ class TestKVPrefixCacheWithModel:
"""Under memory pressure, adding a new cache entry evicts the least recently used one."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache()
kv_prefix_cache = KVPrefixCache(tokenizer)
# Add three cache entries with different prompts
prompts = ["First entry", "Second entry", "Third entry"]
@@ -498,7 +495,7 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
kv_prefix_cache.add_kv_cache(tokens, cache)
kv_prefix_cache.add_kv_cache(prompt, cache)
# Stagger _last_used so LRU order is deterministic
kv_prefix_cache._last_used[i] = float(i)
@@ -508,10 +505,19 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache._last_used[2] = 100.0
# Entry 0 (_last_used=0.0) is LRU, entry 1 (_last_used=1.0) is next
# Simulate memory pressure: return usage above _MEMORY_THRESHOLD (0.9)
with patch(
"exo.worker.engines.mlx.cache.get_memory_used_percentage",
return_value=0.95,
# Simulate memory pressure: active memory exceeds threshold
fake_limit = 1000
fake_active = int(fake_limit * 0.90) # Above _MEMORY_THRESHOLD (0.85)
with (
patch(
"exo.worker.engines.mlx.cache.mx.metal.get_active_memory",
return_value=fake_active,
),
patch(
"exo.worker.engines.mlx.cache.mx.metal.device_info",
return_value={"max_recommended_working_set_size": fake_limit},
),
):
# Trigger eviction by adding a new entry
task = TextGenerationTaskParams(
@@ -523,11 +529,14 @@ class TestKVPrefixCacheWithModel:
tokens = encode_prompt(tokenizer, prompt)
cache = make_kv_cache(model)
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
kv_prefix_cache.add_kv_cache(tokens, cache)
kv_prefix_cache.add_kv_cache(prompt, cache)
# LRU entries should have been evicted (entries 0, 1, 2 in order of _last_used)
# Since fake_active stays above threshold after each eviction (we don't change it),
# all old entries get evicted, leaving only the newly added one
assert len(kv_prefix_cache.prompts) == 1
# The surviving entry should be the newly added one
assert get_prefix_length(kv_prefix_cache.prompts[0], tokens) == len(tokens)
new_tokens = encode_prompt(tokenizer, prompt)
assert get_prefix_length(kv_prefix_cache.prompts[0], new_tokens) == len(
new_tokens
)

View File

@@ -34,7 +34,6 @@ TOKENIZER_FILE_PATTERNS = [
"added_tokens.json",
"tokenizer.model",
"tokenization_*.py", # Custom tokenizer implementations
"tool_declaration_ts.py", # Dependency of tokenization_kimi.py
]

View File

@@ -1,53 +0,0 @@
#!/usr/bin/env bash
[ $# -lt 1 ] && {
echo "Usage: $0 host1 [host2 ...]"
exit 1
}
[ -z "$(git status --porcelain)" ] || {
echo "Uncommitted changes"
exit 1
}
commit=$(git rev-parse HEAD)
git fetch -q origin
git branch -r --contains "$commit" | grep -qE '^\s*origin/' || {
echo "Not pushed to origin"
exit 1
}
hosts=("$@")
cleanup() {
for host in "${hosts[@]}"; do
ssh -T -o BatchMode=yes "$host@$host" "pkill -f bin/exo" &
done
sleep 1
jobs -pr | xargs -r kill 2>/dev/null || true
}
trap 'cleanup' EXIT INT TERM
for host; do
ssh -T -o BatchMode=yes -o ServerAliveInterval=30 "$host@$host" \
"EXO_LIBP2P_NAMESPACE=$commit /nix/var/nix/profiles/default/bin/nix build github:exo-explore/exo/$commit" &
done
wait
for host; do
ssh -T -o BatchMode=yes -o ServerAliveInterval=30 "$host@$host" \
"EXO_LIBP2P_NAMESPACE=$commit /nix/var/nix/profiles/default/bin/nix run github:exo-explore/exo/$commit" &>/dev/null &
done
for host; do
echo "Waiting for $host..." 1>&2
until curl -sf "http://$host:52415/models" &>/dev/null; do sleep 1; done
done
echo "Waiting 30s for cluster setup" 1>&2
sleep 30
echo "EXO loaded" 1>&2
bench_runner="${hosts[0]}"
mkdir -p "./bench/$commit"
nix run .#exo-get-all-models-on-cluster -- "$bench_runner" | while IFS= read -r model; do
echo "running bench for $model" 1>&2
ssh -Tn -o BatchMode=yes -o ServerAliveInterval=30 "$bench_runner@$bench_runner" "/nix/var/nix/profiles/default/bin/nix run github:exo-explore/exo/$commit#exo-bench -- --model $model --pp 128 4096 --tg 128 --stdout --skip-tensor-ring" >>"./bench/$commit/${model//\//--}.json"
echo
done

View File

@@ -1,36 +0,0 @@
#!/usr/bin/env python3
# pyright: reportAny=false
import json
import subprocess
import sys
from typing import Any, cast
from urllib.request import urlopen
h = sys.argv[1] if len(sys.argv) > 1 else sys.exit(f"USAGE: {sys.argv[0]} host")
ts = subprocess.run(
["tailscale", "status"], check=True, text=True, capture_output=True
).stdout.splitlines()
ip = next(
(sl[0] for line in ts if len(sl := line.split()) >= 2 if sl[1] == h), None
) or sys.exit(f"{h} not found in tailscale")
with urlopen(f"http://{ip}:52415/state", timeout=5) as r:
data = json.loads(r.read()).get("downloads", {})
def mid(x: dict[str, Any]) -> str | None:
for k in (
"DownloadCompleted",
"shardMetadata",
"PipelineShardMetadata",
"modelCard",
"modelId",
):
x = x.get(k, {})
return cast(str | None, x if x != {} else None)
common = set[str].intersection(
*[{m for d in nid if (m := mid(d))} for nid in data.values()]
)
for c in common:
print(c)

View File

@@ -1,377 +0,0 @@
#!/usr/bin/env python3
"""
Download an mflux model, quantize it, and upload to HuggingFace.
Usage (run from mflux project directory):
cd /path/to/mflux
uv run python /path/to/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev
uv run python /path/to/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev --skip-base --skip-8bit
uv run python /path/to/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev --dry-run
Requires:
- Must be run from mflux project directory using `uv run`
- huggingface_hub installed (add to mflux deps or install separately)
- HuggingFace authentication: run `huggingface-cli login` or set HF_TOKEN
"""
from __future__ import annotations
import argparse
import re
import shutil
import sys
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mflux.models.flux.variants.txt2img.flux import Flux1
HF_ORG = "exolabs"
def get_model_class(model_name: str) -> type:
"""Get the appropriate model class based on model name."""
from mflux.models.fibo.variants.txt2img.fibo import FIBO
from mflux.models.flux.variants.txt2img.flux import Flux1
from mflux.models.flux2.variants.txt2img.flux2_klein import Flux2Klein
from mflux.models.qwen.variants.txt2img.qwen_image import QwenImage
from mflux.models.z_image.variants.turbo.z_image_turbo import ZImageTurbo
model_name_lower = model_name.lower()
if "qwen" in model_name_lower:
return QwenImage
elif "fibo" in model_name_lower:
return FIBO
elif "z-image" in model_name_lower or "zimage" in model_name_lower:
return ZImageTurbo
elif "flux2" in model_name_lower or "flux.2" in model_name_lower:
return Flux2Klein
else:
return Flux1
def get_repo_name(model_name: str, bits: int | None) -> str:
"""Get the HuggingFace repo name for a model variant."""
# Extract repo name from HF path (e.g., "black-forest-labs/FLUX.1-Kontext-dev" -> "FLUX.1-Kontext-dev")
base_name = model_name.split("/")[-1] if "/" in model_name else model_name
suffix = f"-{bits}bit" if bits else ""
return f"{HF_ORG}/{base_name}{suffix}"
def get_local_path(output_dir: Path, model_name: str, bits: int | None) -> Path:
"""Get the local save path for a model variant."""
# Extract repo name from HF path (e.g., "black-forest-labs/FLUX.1-Kontext-dev" -> "FLUX.1-Kontext-dev")
base_name = model_name.split("/")[-1] if "/" in model_name else model_name
suffix = f"-{bits}bit" if bits else ""
return output_dir / f"{base_name}{suffix}"
def copy_source_repo(
source_repo: str,
local_path: Path,
dry_run: bool = False,
) -> None:
"""Copy all files from source repo (replicating original HF structure)."""
print(f"\n{'=' * 60}")
print(f"Copying full repo from source: {source_repo}")
print(f"Output path: {local_path}")
print(f"{'=' * 60}")
if dry_run:
print("[DRY RUN] Would download all files from source repo")
return
from huggingface_hub import snapshot_download
# Download all files to our local path
snapshot_download(
repo_id=source_repo,
local_dir=local_path,
)
# Remove root-level safetensors files (flux.1-dev.safetensors, etc.)
# These are redundant with the component directories
for f in local_path.glob("*.safetensors"):
print(f"Removing root-level safetensors: {f.name}")
if not dry_run:
f.unlink()
print(f"Source repo copied to {local_path}")
def load_and_save_quantized_model(
model_name: str,
bits: int,
output_path: Path,
dry_run: bool = False,
) -> None:
"""Load a model with quantization and save it in mflux format."""
print(f"\n{'=' * 60}")
print(f"Loading {model_name} with {bits}-bit quantization...")
print(f"Output path: {output_path}")
print(f"{'=' * 60}")
if dry_run:
print("[DRY RUN] Would load and save quantized model")
return
from mflux.models.common.config.model_config import ModelConfig
model_class = get_model_class(model_name)
model_config = ModelConfig.from_name(model_name=model_name, base_model=None)
model: Flux1 = model_class(
quantize=bits,
model_config=model_config,
)
print(f"Saving model to {output_path}...")
model.save_model(str(output_path))
print(f"Model saved successfully to {output_path}")
def copy_source_metadata(
source_repo: str,
local_path: Path,
dry_run: bool = False,
) -> None:
"""Copy metadata files (LICENSE, README, etc.) from source repo, excluding safetensors."""
print(f"\n{'=' * 60}")
print(f"Copying metadata from source repo: {source_repo}")
print(f"{'=' * 60}")
if dry_run:
print("[DRY RUN] Would download metadata files (excluding *.safetensors)")
return
from huggingface_hub import snapshot_download
# Download all files except safetensors to our local path
snapshot_download(
repo_id=source_repo,
local_dir=local_path,
ignore_patterns=["*.safetensors"],
)
print(f"Metadata files copied to {local_path}")
def upload_to_huggingface(
local_path: Path,
repo_id: str,
dry_run: bool = False,
clean_remote: bool = False,
) -> None:
"""Upload a saved model to HuggingFace."""
print(f"\n{'=' * 60}")
print(f"Uploading to HuggingFace: {repo_id}")
print(f"Local path: {local_path}")
print(f"Clean remote first: {clean_remote}")
print(f"{'=' * 60}")
if dry_run:
print("[DRY RUN] Would upload to HuggingFace")
return
from huggingface_hub import HfApi
api = HfApi()
# Create the repo if it doesn't exist
print(f"Creating/verifying repo: {repo_id}")
api.create_repo(repo_id=repo_id, repo_type="model", exist_ok=True)
# Clean remote repo if requested (delete old mflux-format files)
if clean_remote:
print("Cleaning old mflux-format files from remote...")
try:
# Pattern for mflux numbered shards: <dir>/<number>.safetensors
numbered_pattern = re.compile(r".*/\d+\.safetensors$")
repo_files = api.list_repo_files(repo_id=repo_id, repo_type="model")
for file_path in repo_files:
# Delete numbered safetensors (mflux format) and mflux index files
if numbered_pattern.match(file_path) or file_path.endswith(
"/model.safetensors.index.json"
):
print(f" Deleting: {file_path}")
api.delete_file(
path_in_repo=file_path, repo_id=repo_id, repo_type="model"
)
except Exception as e:
print(f"Warning: Could not clean remote files: {e}")
# Upload the folder
print("Uploading folder contents...")
api.upload_folder(
folder_path=str(local_path),
repo_id=repo_id,
repo_type="model",
)
print(f"Upload complete: https://huggingface.co/{repo_id}")
def clean_local_files(local_path: Path, dry_run: bool = False) -> None:
"""Remove local model files after upload."""
print(f"\nCleaning up: {local_path}")
if dry_run:
print("[DRY RUN] Would remove local files")
return
if local_path.exists():
shutil.rmtree(local_path)
print(f"Removed {local_path}")
def main() -> int:
parser = argparse.ArgumentParser(
description="Download an mflux model, quantize it, and upload to HuggingFace.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Process all variants (base, 4-bit, 8-bit) for FLUX.1-Kontext-dev
python tmp/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev
# Only process 4-bit variant
python tmp/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev --skip-base --skip-8bit
# Save locally without uploading
python tmp/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev --skip-upload
# Preview what would happen
python tmp/quantize_and_upload.py --model black-forest-labs/FLUX.1-Kontext-dev --dry-run
""",
)
parser.add_argument(
"--model",
"-m",
required=True,
help="HuggingFace model path (e.g., black-forest-labs/FLUX.1-Kontext-dev)",
)
parser.add_argument(
"--output-dir",
type=Path,
default=Path("./tmp/models"),
help="Local directory to save models (default: ./tmp/models)",
)
parser.add_argument(
"--skip-base",
action="store_true",
help="Skip base model (no quantization)",
)
parser.add_argument(
"--skip-4bit",
action="store_true",
help="Skip 4-bit quantized model",
)
parser.add_argument(
"--skip-8bit",
action="store_true",
help="Skip 8-bit quantized model",
)
parser.add_argument(
"--skip-download",
action="store_true",
help="Skip downloading/processing, only do upload/clean operations",
)
parser.add_argument(
"--skip-upload",
action="store_true",
help="Only save locally, don't upload to HuggingFace",
)
parser.add_argument(
"--clean",
action="store_true",
help="Remove local files after upload",
)
parser.add_argument(
"--clean-remote",
action="store_true",
help="Delete old mflux-format files from remote repo before uploading",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print actions without executing",
)
args = parser.parse_args()
# Determine which variants to process
variants: list[int | None] = []
if not args.skip_base:
variants.append(None) # Base model (no quantization)
if not args.skip_4bit:
variants.append(4)
if not args.skip_8bit:
variants.append(8)
if not variants:
print("Error: All variants skipped. Nothing to do.")
return 1
# Create output directory
args.output_dir.mkdir(parents=True, exist_ok=True)
print(f"Model: {args.model}")
print(f"Output directory: {args.output_dir}")
print(
f"Variants to process: {['base' if v is None else f'{v}-bit' for v in variants]}"
)
print(f"Upload to HuggingFace: {not args.skip_upload}")
print(f"Clean after upload: {args.clean}")
if args.dry_run:
print("\n*** DRY RUN MODE - No actual changes will be made ***")
# Process each variant
for bits in variants:
local_path = get_local_path(args.output_dir, args.model, bits)
repo_id = get_repo_name(args.model, bits)
if not args.skip_download:
if bits is None:
# Base model: copy original HF repo structure (no mflux conversion)
copy_source_repo(
source_repo=args.model,
local_path=local_path,
dry_run=args.dry_run,
)
else:
# Quantized model: load, quantize, and save with mflux
load_and_save_quantized_model(
model_name=args.model,
bits=bits,
output_path=local_path,
dry_run=args.dry_run,
)
# Copy metadata from source repo (LICENSE, README, etc.)
copy_source_metadata(
source_repo=args.model,
local_path=local_path,
dry_run=args.dry_run,
)
# Upload
if not args.skip_upload:
upload_to_huggingface(
local_path=local_path,
repo_id=repo_id,
dry_run=args.dry_run,
clean_remote=args.clean_remote,
)
# Clean up if requested
if args.clean:
clean_local_files(local_path, dry_run=args.dry_run)
print("\n" + "=" * 60)
print("All done!")
print("=" * 60)
return 0
if __name__ == "__main__":
sys.exit(main())

2199
uv.lock generated
View File

File diff suppressed because it is too large Load Diff