Compare commits

..

6 Commits

Author SHA1 Message Date
ciaranbor
19120b1fe7 Eval noise before CFG exchange 2026-01-31 18:18:54 +00:00
ciaranbor
714e1600e7 Reverse negative CFG group ranks (enables guidance exchange for ring topology) 2026-01-31 18:18:25 +00:00
ciaranbor
f3abdb53cd Fix image yield for CFG parallel 2026-01-31 18:16:24 +00:00
ciaranbor
d457e9d07e Implement parallel CFG 2026-01-30 20:36:48 +00:00
ciaranbor
135e894232 Update API to set seed to maintain consistency between nodes 2026-01-30 20:36:20 +00:00
ciaranbor
bebf5a1654 Support parallel CFG sharding 2026-01-30 20:36:06 +00:00
27 changed files with 828 additions and 421 deletions

3
.gitignore vendored
View File

@@ -28,6 +28,3 @@ target/
dashboard/build/
dashboard/node_modules/
dashboard/.svelte-kit/
# host config snapshots
hosts_*.json

View File

@@ -21,7 +21,7 @@ def exo_shard_downloader(max_parallel_downloads: int = 8) -> ShardDownloader:
async def build_base_shard(model_id: ModelId) -> ShardMetadata:
model_card = await ModelCard.load(model_id)
model_card = await ModelCard.from_hf(model_id)
return PipelineShardMetadata(
model_card=model_card,
device_rank=0,
@@ -166,8 +166,9 @@ class ResumableShardDownloader(ShardDownloader):
for task in asyncio.as_completed(tasks):
try:
yield await task
# TODO: except Exception
except Exception as e:
logger.warning(f"Error downloading shard: {type(e).__name__}")
logger.error("Error downloading shard:", e)
async def get_shard_download_status_for_shard(
self, shard: ShardMetadata

View File

@@ -1,6 +1,7 @@
import base64
import contextlib
import json
import random
import time
from collections.abc import AsyncGenerator
from http import HTTPStatus
@@ -65,9 +66,7 @@ from exo.shared.types.api import (
StartDownloadParams,
StartDownloadResponse,
StreamingChoiceResponse,
StreamOptions,
ToolCall,
Usage,
)
from exo.shared.types.chunks import (
ErrorChunk,
@@ -114,10 +113,17 @@ def _format_to_content_type(image_format: Literal["png", "jpeg", "webp"] | None)
return f"image/{image_format or 'png'}"
def _ensure_seed(params: AdvancedImageParams | None) -> AdvancedImageParams:
"""Ensure advanced params has a seed set for distributed consistency."""
if params is None:
return AdvancedImageParams(seed=random.randint(0, 2**32 - 1))
if params.seed is None:
return params.model_copy(update={"seed": random.randint(0, 2**32 - 1)})
return params
def chunk_to_response(
chunk: TokenChunk | ToolCallChunk,
command_id: CommandId,
usage: Usage | None,
chunk: TokenChunk | ToolCallChunk, command_id: CommandId
) -> ChatCompletionResponse:
return ChatCompletionResponse(
id=command_id,
@@ -142,10 +148,21 @@ def chunk_to_response(
finish_reason=chunk.finish_reason,
)
],
usage=usage,
)
async def resolve_model_card(model_id: ModelId) -> ModelCard:
if model_id in MODEL_CARDS:
model_card = MODEL_CARDS[model_id]
return model_card
for card in MODEL_CARDS.values():
if card.model_id == ModelId(model_id):
return card
return await ModelCard.from_hf(model_id)
class API:
def __init__(
self,
@@ -267,7 +284,7 @@ class API:
async def place_instance(self, payload: PlaceInstanceParams):
command = PlaceInstance(
model_card=await ModelCard.load(payload.model_id),
model_card=await resolve_model_card(payload.model_id),
sharding=payload.sharding,
instance_meta=payload.instance_meta,
min_nodes=payload.min_nodes,
@@ -284,7 +301,7 @@ class API:
self, payload: CreateInstanceParams
) -> CreateInstanceResponse:
instance = payload.instance
model_card = await ModelCard.load(instance.shard_assignments.model_id)
model_card = await resolve_model_card(instance.shard_assignments.model_id)
required_memory = model_card.storage_size
available_memory = self._calculate_total_available_memory()
@@ -312,7 +329,7 @@ class API:
instance_meta: InstanceMeta = InstanceMeta.MlxRing,
min_nodes: int = 1,
) -> Instance:
model_card = await ModelCard.load(model_id)
model_card = await resolve_model_card(model_id)
try:
placements = get_instance_placements(
@@ -515,10 +532,9 @@ class API:
del self._chat_completion_queues[command_id]
async def _generate_chat_stream(
self, command_id: CommandId, stream_options: StreamOptions | None = None
self, command_id: CommandId
) -> AsyncGenerator[str, None]:
"""Generate chat completion stream as JSON strings."""
include_usage = stream_options.include_usage if stream_options else False
async for chunk in self._chat_chunk_stream(command_id):
assert not isinstance(chunk, ImageChunk)
@@ -534,10 +550,8 @@ class API:
yield "data: [DONE]\n\n"
return
usage = chunk.usage if include_usage else None
chunk_response: ChatCompletionResponse = chunk_to_response(
chunk, command_id, usage=usage
chunk, command_id
)
logger.debug(f"chunk_response: {chunk_response}")
@@ -553,9 +567,8 @@ class API:
text_parts: list[str] = []
tool_calls: list[ToolCall] = []
model: ModelId | None = None
model: str | None = None
finish_reason: FinishReason | None = None
usage: Usage | None = None
async for chunk in self._chat_chunk_stream(command_id):
if isinstance(chunk, ErrorChunk):
@@ -580,9 +593,6 @@ class API:
for i, tool in enumerate(chunk.tool_calls)
)
if chunk.usage is not None:
usage = chunk.usage
if chunk.finish_reason is not None:
finish_reason = chunk.finish_reason
@@ -604,7 +614,6 @@ class API:
finish_reason=finish_reason,
)
],
usage=usage,
)
async def _collect_chat_completion_with_stats(
@@ -612,7 +621,7 @@ class API:
) -> BenchChatCompletionResponse:
text_parts: list[str] = []
tool_calls: list[ToolCall] = []
model: ModelId | None = None
model: str | None = None
finish_reason: FinishReason | None = None
stats: GenerationStats | None = None
@@ -665,7 +674,7 @@ class API:
)
return resp
async def _trigger_notify_user_to_download_model(self, model_id: ModelId) -> None:
async def _trigger_notify_user_to_download_model(self, model_id: str) -> None:
logger.warning(
"TODO: we should send a notification to the user to download the model"
)
@@ -674,7 +683,7 @@ class API:
self, payload: ChatCompletionTaskParams
) -> ChatCompletionResponse | StreamingResponse:
"""Handle chat completions, supporting both streaming and non-streaming responses."""
model_card = await ModelCard.load(ModelId(payload.model))
model_card = await resolve_model_card(ModelId(payload.model))
payload.model = model_card.model_id
if not any(
@@ -692,7 +701,7 @@ class API:
await self._send(command)
if payload.stream:
return StreamingResponse(
self._generate_chat_stream(command.command_id, payload.stream_options),
self._generate_chat_stream(command.command_id),
media_type="text/event-stream",
)
@@ -701,7 +710,7 @@ class API:
async def bench_chat_completions(
self, payload: BenchChatCompletionTaskParams
) -> BenchChatCompletionResponse:
model_card = await ModelCard.load(ModelId(payload.model))
model_card = await resolve_model_card(ModelId(payload.model))
payload.model = model_card.model_id
if not any(
@@ -721,12 +730,12 @@ class API:
response = await self._collect_chat_completion_with_stats(command.command_id)
return response
async def _validate_image_model(self, model: ModelId) -> ModelId:
async def _validate_image_model(self, model: str) -> ModelId:
"""Validate model exists and return resolved model ID.
Raises HTTPException 404 if no instance is found for the model.
"""
model_card = await ModelCard.load(model)
model_card = await resolve_model_card(ModelId(model))
resolved_model = model_card.model_id
if not any(
instance.shard_assignments.model_id == resolved_model
@@ -772,7 +781,10 @@ class API:
When stream=True and partial_images > 0, returns a StreamingResponse
with SSE-formatted events for partial and final images.
"""
payload.model = await self._validate_image_model(ModelId(payload.model))
payload.model = await self._validate_image_model(payload.model)
payload = payload.model_copy(
update={"advanced_params": _ensure_seed(payload.advanced_params)}
)
command = ImageGeneration(
request_params=payload,
@@ -1017,10 +1029,13 @@ class API:
async def bench_image_generations(
self, request: Request, payload: BenchImageGenerationTaskParams
) -> BenchImageGenerationResponse:
payload.model = await self._validate_image_model(ModelId(payload.model))
payload.model = await self._validate_image_model(payload.model)
payload.stream = False
payload.partial_images = 0
payload = payload.model_copy(
update={"advanced_params": _ensure_seed(payload.advanced_params)}
)
command = ImageGeneration(
request_params=payload,
@@ -1038,7 +1053,7 @@ class API:
self,
image: UploadFile,
prompt: str,
model: ModelId,
model: str,
n: int,
size: str,
response_format: Literal["url", "b64_json"],
@@ -1052,6 +1067,7 @@ class API:
) -> ImageEdits:
"""Prepare and send an image edits command with chunked image upload."""
resolved_model = await self._validate_image_model(model)
advanced_params = _ensure_seed(advanced_params)
image_content = await image.read()
image_data = base64.b64encode(image_content).decode("utf-8")
@@ -1133,7 +1149,7 @@ class API:
command = await self._send_image_edits_command(
image=image,
prompt=prompt,
model=ModelId(model),
model=model,
n=n,
size=size,
response_format=response_format,
@@ -1189,7 +1205,7 @@ class API:
command = await self._send_image_edits_command(
image=image,
prompt=prompt,
model=ModelId(model),
model=model,
n=n,
size=size,
response_format=response_format,

View File

@@ -94,20 +94,35 @@ def get_shard_assignments_for_pipeline_parallel(
runner_to_shard: dict[RunnerId, ShardMetadata] = {}
node_to_runner: dict[NodeId, RunnerId] = {}
# Determine CFG parallelism topology
# CFG parallel only for even node counts with CFG models (2+ nodes)
use_cfg_parallel = model_card.uses_cfg and world_size >= 2 and world_size % 2 == 0
cfg_world_size = 2 if use_cfg_parallel else 1
pipeline_world_size = world_size // cfg_world_size
# For CFG parallel, we only need to allocate layers for one pipeline group
# (both CFG groups run the same layers). Use the first pipeline group's nodes.
pipeline_node_ids = cycle.node_ids[:pipeline_world_size]
pipeline_memory = sum(
(node_memory[node_id].ram_available for node_id in pipeline_node_ids),
start=Memory(),
)
layer_allocations = allocate_layers_proportionally(
total_layers=total_layers,
memory_fractions=[
node_memory[node_id].ram_available.in_bytes / cycle_memory.in_bytes
for node_id in cycle.node_ids
node_memory[node_id].ram_available.in_bytes / pipeline_memory.in_bytes
for node_id in pipeline_node_ids
],
)
# Validate each node has sufficient memory for its assigned layers
memory_per_layer = model_card.storage_size.in_bytes / total_layers
for i, (node_id, node_layers) in enumerate(
zip(cycle.node_ids, layer_allocations, strict=True)
):
required_memory = node_layers * memory_per_layer
# Validate each pipeline node has sufficient memory for its assigned layers
# Use integer arithmetic to avoid floating point precision issues
total_storage_bytes = model_card.storage_size.in_bytes
for i, node_id in enumerate(pipeline_node_ids):
node_layers = layer_allocations[i]
# Integer division then multiply to get conservative estimate
required_memory = (total_storage_bytes * node_layers) // total_layers
available_memory = node_memory[node_id].ram_available.in_bytes
if required_memory > available_memory:
raise ValueError(
@@ -116,24 +131,69 @@ def get_shard_assignments_for_pipeline_parallel(
f"but only has {available_memory / (1024**3):.2f} GB available"
)
layers_assigned = 0
for i, (node_id, node_layers) in enumerate(
zip(cycle.node_ids, layer_allocations, strict=True)
):
# CFG group 0: pipeline ranks in ascending order (0, 1, 2, ...)
# CFG group 1: pipeline ranks in descending order (reversed)
# This places both "last stages" as ring neighbors for CFG exchange.
position_to_cfg_pipeline = [(0, r) for r in range(pipeline_world_size)] + [
(1, r) for r in reversed(range(pipeline_world_size))
]
cfg_pipeline_to_device: dict[tuple[int, int], int] = {
(cfg_rank, pipeline_rank): i
for i, (cfg_rank, pipeline_rank) in enumerate(position_to_cfg_pipeline)
}
for i, node_id in enumerate(cycle.node_ids):
cfg_rank, pipeline_rank = position_to_cfg_pipeline[i]
layers_before = sum(layer_allocations[:pipeline_rank])
node_layers = layer_allocations[pipeline_rank]
is_first_stage = pipeline_rank == 0
is_last_stage = pipeline_rank == pipeline_world_size - 1
if is_last_stage:
next_pipeline_device = None
else:
next_pipeline_device = cfg_pipeline_to_device[(cfg_rank, pipeline_rank + 1)]
if is_first_stage:
prev_pipeline_device = None
else:
prev_pipeline_device = cfg_pipeline_to_device[(cfg_rank, pipeline_rank - 1)]
if is_last_stage and use_cfg_parallel:
other_cfg_rank = 1 - cfg_rank
cfg_peer_device = cfg_pipeline_to_device[(other_cfg_rank, pipeline_rank)]
else:
cfg_peer_device = None
first_pipeline_device = cfg_pipeline_to_device[(cfg_rank, 0)]
last_pipeline_device = cfg_pipeline_to_device[
(cfg_rank, pipeline_world_size - 1)
]
runner_id = RunnerId()
shard = PipelineShardMetadata(
model_card=model_card,
device_rank=i,
world_size=world_size,
start_layer=layers_assigned,
end_layer=layers_assigned + node_layers,
start_layer=layers_before,
end_layer=layers_before + node_layers,
n_layers=total_layers,
cfg_rank=cfg_rank,
cfg_world_size=cfg_world_size,
explicit_pipeline_rank=pipeline_rank,
next_pipeline_device=next_pipeline_device,
prev_pipeline_device=prev_pipeline_device,
cfg_peer_device=cfg_peer_device,
first_pipeline_device=first_pipeline_device,
last_pipeline_device=last_pipeline_device,
)
runner_to_shard[runner_id] = shard
node_to_runner[node_id] = runner_id
layers_assigned += node_layers
shard_assignments = ShardAssignments(
model_id=model_card.model_id,

View File

@@ -5,6 +5,7 @@ from exo.master.placement_utils import (
filter_cycles_by_memory,
get_mlx_jaccl_coordinators,
get_shard_assignments,
get_shard_assignments_for_pipeline_parallel,
get_smallest_cycles,
)
from exo.master.tests.conftest import (
@@ -20,7 +21,7 @@ from exo.shared.types.profiling import (
NodeNetworkInfo,
)
from exo.shared.types.topology import Connection, SocketConnection
from exo.shared.types.worker.shards import Sharding
from exo.shared.types.worker.shards import PipelineShardMetadata, Sharding
def test_filter_cycles_by_memory():
@@ -487,3 +488,195 @@ def test_get_shard_assignments_insufficient_memory_raises():
get_shard_assignments(
model_card, selected_cycle, Sharding.Pipeline, node_memory
)
class TestCfgParallelPlacement:
def _create_ring_topology(self, node_ids: list[NodeId]) -> Topology:
topology = Topology()
for node_id in node_ids:
topology.add_node(node_id)
for i, node_id in enumerate(node_ids):
next_node = node_ids[(i + 1) % len(node_ids)]
conn = Connection(
source=node_id,
sink=next_node,
edge=create_socket_connection(i + 1),
)
topology.add_connection(conn)
return topology
def test_two_nodes_cfg_model_uses_cfg_parallel(self):
"""Two nodes with CFG model should use CFG parallel (no pipeline)."""
node_a = NodeId()
node_b = NodeId()
topology = self._create_ring_topology([node_a, node_b])
cycles = [c for c in topology.get_cycles() if len(c) == 2]
cycle = cycles[0]
node_memory = {
node_a: create_node_memory(1000 * 1024),
node_b: create_node_memory(1000 * 1024),
}
model_card = ModelCard(
model_id=ModelId("qwen-image-test"),
n_layers=60,
storage_size=Memory.from_kb(1000),
hidden_size=1,
supports_tensor=False,
uses_cfg=True,
tasks=[ModelTask.TextToImage],
)
assignments = get_shard_assignments_for_pipeline_parallel(
model_card, cycle, node_memory
)
shards = list(assignments.runner_to_shard.values())
assert len(shards) == 2
# Both nodes should have all layers (no pipeline split)
for shard in shards:
assert isinstance(shard, PipelineShardMetadata)
assert shard.start_layer == 0
assert shard.end_layer == 60
assert shard.cfg_world_size == 2
cfg_ranks = sorted(
s.cfg_rank for s in shards if isinstance(s, PipelineShardMetadata)
)
assert cfg_ranks == [0, 1]
def test_four_nodes_cfg_model_uses_hybrid(self):
"""Four nodes with CFG model should use 2 CFG groups x 2 pipeline stages."""
nodes = [NodeId() for _ in range(4)]
topology = self._create_ring_topology(nodes)
cycles = [c for c in topology.get_cycles() if len(c) == 4]
cycle = cycles[0]
node_memory = {n: create_node_memory(1000 * 1024) for n in nodes}
model_card = ModelCard(
model_id=ModelId("qwen-image-test"),
n_layers=60,
storage_size=Memory.from_kb(1000),
hidden_size=1,
supports_tensor=False,
uses_cfg=True,
tasks=[ModelTask.TextToImage],
)
assignments = get_shard_assignments_for_pipeline_parallel(
model_card, cycle, node_memory
)
shards = list(assignments.runner_to_shard.values())
assert len(shards) == 4
for shard in shards:
assert isinstance(shard, PipelineShardMetadata)
assert shard.cfg_world_size == 2
assert shard.pipeline_world_size == 2
# Check we have 2 nodes in each CFG group
cfg_0_shards = [
s
for s in shards
if isinstance(s, PipelineShardMetadata) and s.cfg_rank == 0
]
cfg_1_shards = [
s
for s in shards
if isinstance(s, PipelineShardMetadata) and s.cfg_rank == 1
]
assert len(cfg_0_shards) == 2
assert len(cfg_1_shards) == 2
# Both CFG groups should have the same layer assignments
cfg_0_layers = [(s.start_layer, s.end_layer) for s in cfg_0_shards]
cfg_1_layers = [(s.start_layer, s.end_layer) for s in cfg_1_shards]
assert sorted(cfg_0_layers) == sorted(cfg_1_layers)
def test_three_nodes_cfg_model_uses_sequential_cfg(self):
"""Three nodes (odd) with CFG model should use sequential CFG."""
nodes = [NodeId() for _ in range(3)]
topology = self._create_ring_topology(nodes)
cycles = [c for c in topology.get_cycles() if len(c) == 3]
cycle = cycles[0]
node_memory = {n: create_node_memory(1000 * 1024) for n in nodes}
model_card = ModelCard(
model_id=ModelId("qwen-image-test"),
n_layers=60,
storage_size=Memory.from_kb(1000),
hidden_size=1,
supports_tensor=False,
uses_cfg=True,
tasks=[ModelTask.TextToImage],
)
assignments = get_shard_assignments_for_pipeline_parallel(
model_card, cycle, node_memory
)
shards = list(assignments.runner_to_shard.values())
assert len(shards) == 3
for shard in shards:
assert isinstance(shard, PipelineShardMetadata)
# cfg_world_size = 1 means sequential CFG
assert shard.cfg_world_size == 1
assert shard.cfg_rank == 0
def test_two_nodes_non_cfg_model_uses_pipeline(self):
"""Two nodes with non-CFG model should use pure pipeline."""
node_a = NodeId()
node_b = NodeId()
topology = self._create_ring_topology([node_a, node_b])
cycles = [c for c in topology.get_cycles() if len(c) == 2]
cycle = cycles[0]
node_memory = {
node_a: create_node_memory(1000 * 1024),
node_b: create_node_memory(1000 * 1024),
}
model_card = ModelCard(
model_id=ModelId("flux-test"),
n_layers=57,
storage_size=Memory.from_kb(1000),
hidden_size=1,
supports_tensor=False,
uses_cfg=False, # Non-CFG model
tasks=[ModelTask.TextToImage],
)
assignments = get_shard_assignments_for_pipeline_parallel(
model_card, cycle, node_memory
)
shards = list(assignments.runner_to_shard.values())
assert len(shards) == 2
for shard in shards:
assert isinstance(shard, PipelineShardMetadata)
# cfg_world_size = 1 means no CFG parallel
assert shard.cfg_world_size == 1
assert shard.cfg_rank == 0
# Should have actual layer sharding (pipeline)
layer_ranges = sorted(
(s.start_layer, s.end_layer)
for s in shards
if isinstance(s, PipelineShardMetadata)
)
# First shard starts at 0, last shard ends at 57
assert layer_ranges[0][0] == 0
assert layer_ranges[-1][1] == 57

View File

@@ -216,8 +216,6 @@ def get_node_id_keypair(
Obtains the :class:`Keypair` associated with this node-ID.
Obtain the :class:`PeerId` by from it.
"""
# TODO(evan): bring back node id persistence once we figure out how to deal with duplicates
return Keypair.generate_ed25519()
def lock_path(path: str | bytes | PathLike[str] | PathLike[bytes]) -> Path:
return Path(str(path) + ".lock")

View File

@@ -47,6 +47,7 @@ class ModelCard(CamelCaseModel):
supports_tensor: bool
tasks: list[ModelTask]
components: list[ComponentInfo] | None = None
uses_cfg: bool = False
@field_validator("tasks", mode="before")
@classmethod
@@ -562,6 +563,7 @@ _IMAGE_BASE_MODEL_CARDS: dict[str, ModelCard] = {
hidden_size=1,
supports_tensor=False,
tasks=[ModelTask.TextToImage],
uses_cfg=True,
components=[
ComponentInfo(
component_name="text_encoder",
@@ -596,6 +598,7 @@ _IMAGE_BASE_MODEL_CARDS: dict[str, ModelCard] = {
hidden_size=1,
supports_tensor=False,
tasks=[ModelTask.ImageToImage],
uses_cfg=True,
components=[
ComponentInfo(
component_name="text_encoder",
@@ -681,6 +684,7 @@ def _generate_image_model_quant_variants(
hidden_size=base_card.hidden_size,
supports_tensor=base_card.supports_tensor,
tasks=base_card.tasks,
uses_cfg=base_card.uses_cfg,
components=with_transformer_size(transformer_bytes),
)
}
@@ -700,6 +704,7 @@ def _generate_image_model_quant_variants(
hidden_size=base_card.hidden_size,
supports_tensor=base_card.supports_tensor,
tasks=base_card.tasks,
uses_cfg=base_card.uses_cfg,
components=with_transformer_size(quant_transformer_bytes),
)

View File

@@ -8,7 +8,7 @@ from multiprocessing.synchronize import Event as EventT
from multiprocessing.synchronize import Semaphore as SemaphoreT
from loguru import logger
from pytest import LogCaptureFixture, mark
from pytest import LogCaptureFixture
from exo.routing.router import get_node_id_keypair
from exo.shared.constants import EXO_NODE_ID_KEYPAIR
@@ -74,7 +74,6 @@ def _delete_if_exists(p: str | bytes | os.PathLike[str] | os.PathLike[bytes]):
os.remove(p)
@mark.skip(reason="this functionality is currently disabled but may return in future")
def test_node_id_fetching(caplog: LogCaptureFixture):
reps = 10

View File

@@ -11,7 +11,7 @@ from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.memory import Memory
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding, ShardMetadata
from exo.utils.pydantic_ext import CamelCaseModel, ConfigDict, TaggedModel
from exo.utils.pydantic_ext import CamelCaseModel
FinishReason = Literal[
"stop", "length", "tool_calls", "content_filter", "function_call", "error"
@@ -116,8 +116,8 @@ class Usage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
prompt_tokens_details: PromptTokensDetails
completion_tokens_details: CompletionTokensDetails
prompt_tokens_details: PromptTokensDetails | None = None
completion_tokens_details: CompletionTokensDetails | None = None
class StreamingChoiceResponse(BaseModel):
@@ -170,13 +170,7 @@ class BenchChatCompletionResponse(ChatCompletionResponse):
generation_stats: GenerationStats | None = None
class StreamOptions(BaseModel):
include_usage: bool = False
class ChatCompletionTaskParams(TaggedModel):
model_config = ConfigDict(extra="ignore")
class ChatCompletionTaskParams(BaseModel):
model: str
frequency_penalty: float | None = None
messages: list[ChatCompletionMessage]
@@ -190,7 +184,6 @@ class ChatCompletionTaskParams(TaggedModel):
seed: int | None = None
stop: str | list[str] | None = None
stream: bool = False
stream_options: StreamOptions | None = None
temperature: float | None = None
top_p: float | None = None
tools: list[dict[str, Any]] | None = None

View File

@@ -2,7 +2,7 @@ from collections.abc import Generator
from typing import Any, Literal
from exo.shared.models.model_cards import ModelId
from exo.shared.types.api import GenerationStats, ImageGenerationStats, Usage
from exo.shared.types.api import GenerationStats, ImageGenerationStats
from exo.utils.pydantic_ext import TaggedModel
from .api import FinishReason
@@ -17,7 +17,6 @@ class BaseChunk(TaggedModel):
class TokenChunk(BaseChunk):
text: str
token_id: int
usage: Usage | None
finish_reason: Literal["stop", "length", "content_filter"] | None = None
stats: GenerationStats | None = None
@@ -29,7 +28,6 @@ class ErrorChunk(BaseChunk):
class ToolCallChunk(BaseChunk):
tool_calls: list[ToolCallItem]
usage: Usage | None
finish_reason: Literal["tool_calls"] = "tool_calls"
stats: GenerationStats | None = None

View File

@@ -2,7 +2,6 @@ from pydantic import Field
from exo.shared.models.model_cards import ModelCard, ModelId
from exo.shared.types.api import (
BenchChatCompletionTaskParams,
ChatCompletionTaskParams,
ImageEditsInternalParams,
ImageGenerationTaskParams,
@@ -23,7 +22,7 @@ class TestCommand(BaseCommand):
class ChatCompletion(BaseCommand):
request_params: ChatCompletionTaskParams | BenchChatCompletionTaskParams
request_params: ChatCompletionTaskParams
class ImageGeneration(BaseCommand):

View File

@@ -3,7 +3,6 @@ from enum import Enum
from pydantic import Field
from exo.shared.types.api import (
BenchChatCompletionTaskParams,
ChatCompletionTaskParams,
ImageEditsInternalParams,
ImageGenerationTaskParams,
@@ -55,7 +54,7 @@ class StartWarmup(BaseTask): # emitted by Worker
class ChatCompletion(BaseTask): # emitted by Master
command_id: CommandId
task_params: ChatCompletionTaskParams | BenchChatCompletionTaskParams
task_params: ChatCompletionTaskParams
error_type: str | None = Field(default=None)
error_message: str | None = Field(default=None)

View File

@@ -6,7 +6,6 @@ from exo.shared.types.api import (
GenerationStats,
ImageGenerationStats,
ToolCallItem,
Usage,
)
from exo.utils.pydantic_ext import TaggedModel
@@ -25,7 +24,6 @@ class GenerationResponse(BaseRunnerResponse):
# logprobs: list[float] | None = None # too big. we can change to be top-k
finish_reason: FinishReason | None = None
stats: GenerationStats | None = None
usage: Usage | None
class ImageGenerationResponse(BaseRunnerResponse):
@@ -59,7 +57,6 @@ class PartialImageResponse(BaseRunnerResponse):
class ToolCallResponse(BaseRunnerResponse):
tool_calls: list[ToolCallItem]
usage: Usage | None
class FinishedResponse(BaseRunnerResponse):

View File

@@ -57,8 +57,62 @@ class PipelineShardMetadata(BaseShardMetadata):
Layers are represented as a half-open interval [start_layer, end_layer),
where start_layer is inclusive and end_layer is exclusive.
CFG parallelism fields:
- cfg_rank: 0 = positive branch, 1 = negative branch (or 0 if no CFG parallel)
- cfg_world_size: 1 = sequential CFG, 2 = parallel CFG
Communication rank fields (explicit to support ring topology):
- next_pipeline_device: device to send to in pipeline forward pass
- prev_pipeline_device: device to receive from in pipeline forward pass
- cfg_peer_device: device for CFG exchange (last stage only)
- first_pipeline_device: device of first stage in same CFG group (for latent return)
"""
cfg_rank: int = 0
cfg_world_size: int = 1
# Explicit pipeline position (CFG group 1 uses reversed pipeline order)
explicit_pipeline_rank: int | None = None
next_pipeline_device: int | None = None
prev_pipeline_device: int | None = None
cfg_peer_device: int | None = None
first_pipeline_device: int | None = None
last_pipeline_device: int | None = None
@property
def pipeline_world_size(self) -> int:
return self.world_size // self.cfg_world_size
@property
def pipeline_rank(self) -> int:
if self.explicit_pipeline_rank is not None:
return self.explicit_pipeline_rank
return self.device_rank % self.pipeline_world_size
@property
def is_pipeline_first(self) -> bool:
return self.pipeline_rank == 0
@property
def is_pipeline_last(self) -> bool:
return self.pipeline_rank == self.pipeline_world_size - 1
def __hash__(self) -> int:
return hash(
(
self.model_card.model_id,
self.start_layer,
self.end_layer,
self.n_layers,
self.device_rank,
self.world_size,
self.cfg_rank,
self.cfg_world_size,
)
)
class TensorShardMetadata(BaseShardMetadata):
pass

View File

@@ -37,7 +37,12 @@ class DistributedImageModel:
config = get_config_for_model(model_id)
adapter = create_adapter_for_model(config, model_id, local_path, quantize)
if group is not None:
has_layer_sharding = (
shard_metadata.start_layer != 0
or shard_metadata.end_layer != shard_metadata.n_layers
)
if group is not None and has_layer_sharding:
adapter.slice_transformer_blocks(
start_layer=shard_metadata.start_layer,
end_layer=shard_metadata.end_layer,

View File

@@ -86,6 +86,27 @@ class PromptData(ABC):
"""
...
@abstractmethod
def get_cfg_branch_data(
self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
"""Get embeddings for a single CFG branch (positive or negative).
Used for sequential CFG and CFG parallel modes where we process
one branch at a time instead of batching.
Args:
positive: True for positive prompt, False for negative prompt
Returns:
Tuple of:
- embeds: [1, seq, hidden] prompt embeddings
- mask: [1, seq] attention mask or None
- pooled: [1, hidden] pooled embeddings or None
- conditioning_latents: [1, latent_seq, latent_dim] or None
"""
...
class ModelAdapter(ABC, Generic[ModelT, TransformerT]):
_config: ImageModelConfig

View File

@@ -64,6 +64,12 @@ class FluxPromptData(PromptData):
) -> tuple[mx.array, mx.array, mx.array | None, mx.array | None] | None:
return None
def get_cfg_branch_data(
self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
"""Flux doesn't use CFG, but we return positive data for compatibility."""
return (self._prompt_embeds, None, self._pooled_prompt_embeds, None)
class FluxModelAdapter(ModelAdapter[Flux1, Transformer]):
def __init__(

View File

@@ -133,6 +133,24 @@ class QwenPromptData(PromptData):
return batched_embeds, batched_mask, None, cond_latents
def get_cfg_branch_data(
self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
if positive:
return (
self._prompt_embeds,
self._prompt_mask,
None,
self.conditioning_latents,
)
else:
return (
self._negative_prompt_embeds,
self._negative_prompt_mask,
None,
self.conditioning_latents,
)
class QwenModelAdapter(ModelAdapter[QwenImage, QwenTransformer]):
"""Adapter for Qwen-Image model.

View File

@@ -153,6 +153,24 @@ class QwenEditPromptData(PromptData):
return batched_embeds, batched_mask, None, batched_cond_latents
def get_cfg_branch_data(
self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
if positive:
return (
self._prompt_embeds,
self._prompt_mask,
None,
self._conditioning_latents,
)
else:
return (
self._negative_prompt_embeds,
self._negative_prompt_mask,
None,
self._conditioning_latents,
)
class QwenEditModelAdapter(ModelAdapter[QwenImageEdit, QwenTransformer]):
"""Adapter for Qwen-Image-Edit model.

View File

@@ -1,5 +1,7 @@
from collections.abc import Iterator
from dataclasses import dataclass
from math import ceil
from typing import Any, Optional
from typing import Any, Optional, final
import mlx.core as mx
from mflux.models.common.config.config import Config
@@ -20,6 +22,16 @@ from exo.worker.engines.image.pipeline.block_wrapper import (
)
@final
@dataclass
class CfgBranch:
positive: bool
embeds: mx.array
mask: mx.array | None
pooled: mx.array | None
cond_latents: mx.array | None
def calculate_patch_heights(
latent_height: int, num_patches: int
) -> tuple[list[int], int]:
@@ -72,22 +84,11 @@ class DiffusionRunner:
self.adapter = adapter
self.group = group
if group is None:
self.rank = 0
self.world_size = 1
self.next_rank = 0
self.prev_rank = 0
self.start_layer = 0
self.end_layer = config.total_blocks
else:
self.rank = shard_metadata.device_rank
self.world_size = shard_metadata.world_size
self.next_rank = (self.rank + 1) % self.world_size
self.prev_rank = (self.rank - 1 + self.world_size) % self.world_size
self.start_layer = shard_metadata.start_layer
self.end_layer = shard_metadata.end_layer
self._init_cfg_topology(shard_metadata)
self.num_patches = num_patches if num_patches else max(1, self.world_size)
self.num_patches = (
num_patches if num_patches else max(1, self.pipeline_world_size)
)
self.total_joint = config.joint_block_count
self.total_single = config.single_block_count
@@ -97,6 +98,48 @@ class DiffusionRunner:
self._compute_assigned_blocks()
def _init_cfg_topology(self, shard_metadata: PipelineShardMetadata) -> None:
"""Initialize CFG and pipeline topology from shard metadata."""
if self.group is None:
self.rank = 0
self.world_size = 1
self.start_layer = 0
self.end_layer = self.config.total_blocks
self.cfg_rank = 0
self.cfg_world_size = 1
self.cfg_parallel = False
self.pipeline_world_size = 1
self.pipeline_rank = 0
self.next_pipeline_rank: int | None = None
self.prev_pipeline_rank: int | None = None
self.cfg_peer_rank: int | None = None
self.first_pipeline_rank: int = 0
self.last_pipeline_rank: int = 0
else:
self.rank = shard_metadata.device_rank
self.world_size = shard_metadata.world_size
self.start_layer = shard_metadata.start_layer
self.end_layer = shard_metadata.end_layer
self.cfg_rank = shard_metadata.cfg_rank
self.cfg_world_size = shard_metadata.cfg_world_size
self.cfg_parallel = self.cfg_world_size > 1
self.pipeline_world_size = shard_metadata.pipeline_world_size
self.pipeline_rank = shard_metadata.pipeline_rank
self.next_pipeline_rank = shard_metadata.next_pipeline_device
self.prev_pipeline_rank = shard_metadata.prev_pipeline_device
self.cfg_peer_rank = shard_metadata.cfg_peer_device
assert shard_metadata.first_pipeline_device is not None
assert shard_metadata.last_pipeline_device is not None
self.first_pipeline_rank = shard_metadata.first_pipeline_device
self.last_pipeline_rank = shard_metadata.last_pipeline_device
def _compute_assigned_blocks(self) -> None:
"""Determine which joint/single blocks this stage owns."""
start = self.start_layer
@@ -133,11 +176,11 @@ class DiffusionRunner:
@property
def is_first_stage(self) -> bool:
return self.rank == 0
return self.pipeline_rank == 0
@property
def is_last_stage(self) -> bool:
return self.rank == self.world_size - 1
return self.pipeline_rank == self.pipeline_world_size - 1
@property
def is_distributed(self) -> bool:
@@ -148,6 +191,97 @@ class DiffusionRunner:
return self._guidance_override
return self.config.guidance_scale
def _get_cfg_branches(self, prompt_data: PromptData) -> Iterator[CfgBranch]:
"""Yield the CFG branches this node should process.
- No CFG: yields one branch (positive)
- CFG parallel: yields one branch (our assigned branch)
- Sequential CFG: yields two branches (positive, then negative)
"""
if not self.adapter.needs_cfg:
embeds, mask, pooled, cond = prompt_data.get_cfg_branch_data(positive=True)
yield CfgBranch(
positive=True,
embeds=embeds,
mask=mask,
pooled=pooled,
cond_latents=cond,
)
elif self.cfg_parallel:
positive = self.cfg_rank == 0
embeds, mask, pooled, cond = prompt_data.get_cfg_branch_data(positive)
yield CfgBranch(
positive=positive,
embeds=embeds,
mask=mask,
pooled=pooled,
cond_latents=cond,
)
else:
pos_embeds, pos_mask, pos_pooled, pos_cond = (
prompt_data.get_cfg_branch_data(positive=True)
)
yield CfgBranch(
positive=True,
embeds=pos_embeds,
mask=pos_mask,
pooled=pos_pooled,
cond_latents=pos_cond,
)
neg_embeds, neg_mask, neg_pooled, neg_cond = (
prompt_data.get_cfg_branch_data(positive=False)
)
yield CfgBranch(
positive=False,
embeds=neg_embeds,
mask=neg_mask,
pooled=neg_pooled,
cond_latents=neg_cond,
)
def _combine_cfg_results(self, results: list[tuple[bool, mx.array]]) -> mx.array:
if len(results) == 1:
positive, noise = results[0]
if self.cfg_parallel and self.is_last_stage:
# TODO(ciaran): try to remove
mx.eval(noise)
return self._exchange_and_apply_guidance(noise, positive)
return noise
noise_neg = next(n for p, n in results if not p)
noise_pos = next(n for p, n in results if p)
return self._apply_guidance(noise_pos, noise_neg)
def _exchange_and_apply_guidance(
self, noise: mx.array, is_positive: bool
) -> mx.array:
assert self.group is not None
assert self.cfg_peer_rank is not None
if is_positive:
noise = mx.distributed.send(noise, self.cfg_peer_rank, group=self.group)
mx.async_eval(noise)
noise_neg = mx.distributed.recv_like(
noise, self.cfg_peer_rank, group=self.group
)
mx.eval(noise_neg)
noise_pos = noise
else:
noise_pos = mx.distributed.recv_like(
noise, self.cfg_peer_rank, group=self.group
)
mx.eval(noise_pos)
noise = mx.distributed.send(noise, self.cfg_peer_rank, group=self.group)
mx.async_eval(noise)
noise_neg = noise
return self._apply_guidance(noise_pos, noise_neg)
def _apply_guidance(self, noise_pos: mx.array, noise_neg: mx.array) -> mx.array:
scale = self._get_effective_guidance_scale()
assert scale is not None
return self.adapter.apply_guidance(noise_pos, noise_neg, scale)
def _ensure_wrappers(
self,
text_seq_len: int,
@@ -464,7 +598,9 @@ class DiffusionRunner:
) -> mx.array:
if self.group is None:
return self._single_node_step(t, config, latents, prompt_data)
elif t < config.init_time_step + num_sync_steps:
elif (
self.pipeline_world_size == 1 or t < config.init_time_step + num_sync_steps
):
return self._sync_pipeline_step(
t,
config,
@@ -488,42 +624,29 @@ class DiffusionRunner:
prompt_data: PromptData,
) -> mx.array:
cond_image_grid = prompt_data.cond_image_grid
needs_cfg = self.adapter.needs_cfg
results: list[tuple[bool, mx.array]] = []
for branch in self._get_cfg_branches(prompt_data):
# Reset caches before each branch to ensure no state contamination
self._reset_all_caches()
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, cond_latents = batched_data
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
)
step_latents = mx.concatenate([latents, latents], axis=0)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
cond_latents = prompt_data.conditioning_latents
step_latents = latents
noise = self._forward_pass(
step_latents,
prompt_embeds,
pooled_embeds,
t=t,
config=config,
encoder_hidden_states_mask=encoder_mask,
cond_image_grid=cond_image_grid,
conditioning_latents=cond_latents,
)
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale=guidance_scale
branch.pooled if branch.pooled is not None else branch.embeds
)
noise = self._forward_pass(
latents,
branch.embeds,
pooled_embeds,
t=t,
config=config,
encoder_hidden_states_mask=branch.mask,
cond_image_grid=cond_image_grid,
conditioning_latents=branch.cond_latents,
)
results.append((branch.positive, noise))
noise = self._combine_cfg_results(results)
return config.scheduler.step(noise=noise, timestep=t, latents=latents) # pyright: ignore[reportAny]
def _create_patches(
@@ -574,7 +697,7 @@ class DiffusionRunner:
)
text_embeddings = self.adapter.compute_text_embeddings(
t, config, pooled_prompt_embeds
t, config, pooled_prompt_embeds, hidden_states=hidden_states
)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
prompt_embeds,
@@ -586,16 +709,17 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
assert self.prev_pipeline_rank is not None
hidden_states = mx.distributed.recv(
(batch_size, num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(hidden_states, encoder_hidden_states)
@@ -620,27 +744,30 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
hidden_states = concatenated
else:
assert self.next_pipeline_rank is not None
concatenated = mx.distributed.send(
concatenated, self.next_rank, group=self.group
concatenated, self.next_pipeline_rank, group=self.group
)
mx.async_eval(concatenated)
elif self.has_joint_blocks and not self.is_last_stage:
assert encoder_hidden_states is not None
assert self.next_pipeline_rank is not None
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
hidden_states, self.next_pipeline_rank, group=self.group
)
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
encoder_hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states, encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
assert self.prev_pipeline_rank is not None
hidden_states = mx.distributed.recv(
(batch_size, text_seq_len + num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(hidden_states)
@@ -655,8 +782,9 @@ class DiffusionRunner:
)
if not self.is_last_stage:
assert self.next_pipeline_rank is not None
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states)
@@ -679,75 +807,65 @@ class DiffusionRunner:
kontext_image_ids: mx.array | None = None,
) -> mx.array:
prev_latents = hidden_states
needs_cfg = self.adapter.needs_cfg
cond_image_grid = prompt_data.cond_image_grid
scaled_hidden_states = config.scheduler.scale_model_input(hidden_states, t) # pyright: ignore[reportAny]
original_latent_tokens: int = scaled_hidden_states.shape[1] # pyright: ignore[reportAny]
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, cond_latents = batched_data
results: list[tuple[bool, mx.array]] = []
for branch in self._get_cfg_branches(prompt_data):
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
branch.pooled if branch.pooled is not None else branch.embeds
)
step_latents = mx.concatenate(
[scaled_hidden_states, scaled_hidden_states], axis=0
cond_latents = branch.cond_latents
if cond_latents is not None:
num_img_tokens: int = original_latent_tokens + cond_latents.shape[1]
else:
num_img_tokens = original_latent_tokens
step_latents: mx.array = scaled_hidden_states # pyright: ignore[reportAny]
if self.is_first_stage and cond_latents is not None:
step_latents = mx.concatenate([step_latents, cond_latents], axis=1)
text_seq_len = branch.embeds.shape[1]
self._ensure_wrappers(text_seq_len, branch.mask)
noise = self._run_sync_pass(
t,
config,
step_latents,
branch.embeds,
pooled_embeds,
branch.mask,
cond_image_grid,
kontext_image_ids,
num_img_tokens,
original_latent_tokens,
cond_latents,
)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
cond_latents = prompt_data.conditioning_latents
step_latents = scaled_hidden_states # pyright: ignore[reportAny]
if cond_latents is not None:
num_img_tokens: int = original_latent_tokens + cond_latents.shape[1]
else:
num_img_tokens = original_latent_tokens
if self.is_first_stage and cond_latents is not None:
step_latents = mx.concatenate([step_latents, cond_latents], axis=1)
text_seq_len = prompt_embeds.shape[1]
self._ensure_wrappers(text_seq_len, encoder_mask)
noise = self._run_sync_pass(
t,
config,
step_latents,
prompt_embeds,
pooled_embeds,
encoder_mask,
cond_image_grid,
kontext_image_ids,
num_img_tokens,
original_latent_tokens,
cond_latents,
)
if self.is_last_stage:
assert noise is not None
results.append((branch.positive, noise))
if self.is_last_stage:
assert noise is not None
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale
)
noise = self._combine_cfg_results(results)
hidden_states = config.scheduler.step( # pyright: ignore[reportAny]
noise=noise, timestep=t, latents=prev_latents
)
if not self.is_first_stage:
hidden_states = mx.distributed.send(hidden_states, 0, group=self.group)
hidden_states = mx.distributed.send(
hidden_states, self.first_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states)
elif self.is_first_stage:
hidden_states = mx.distributed.recv_like(
prev_latents, src=self.world_size - 1, group=self.group
prev_latents, src=self.last_pipeline_rank, group=self.group
)
mx.eval(hidden_states)
@@ -766,39 +884,10 @@ class DiffusionRunner:
kontext_image_ids: mx.array | None = None,
) -> mx.array:
patch_latents, token_indices = self._create_patches(latents, config)
needs_cfg = self.adapter.needs_cfg
cond_image_grid = prompt_data.cond_image_grid
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, _ = batched_data
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
text_seq_len = prompt_embeds.shape[1]
self._ensure_wrappers(text_seq_len, encoder_mask)
self._set_text_seq_len(text_seq_len)
if self.joint_block_wrappers:
for wrapper in self.joint_block_wrappers:
wrapper.set_encoder_mask(encoder_mask)
text_embeddings = self.adapter.compute_text_embeddings(t, config, pooled_embeds)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
prompt_embeds,
config,
encoder_hidden_states_mask=encoder_mask,
cond_image_grid=cond_image_grid,
kontext_image_ids=kontext_image_ids,
)
prev_patch_latents = [p for p in patch_latents]
encoder_hidden_states: mx.array | None = None
for patch_idx in range(len(patch_latents)):
@@ -810,31 +899,52 @@ class DiffusionRunner:
and not is_first_async_step
):
patch = mx.distributed.recv_like(
patch, src=self.prev_rank, group=self.group
patch, src=self.last_pipeline_rank, group=self.group
)
mx.eval(patch)
step_patch = mx.concatenate([patch, patch], axis=0) if needs_cfg else patch
results: list[tuple[bool, mx.array]] = []
noise, encoder_hidden_states = self._run_single_patch_pass(
patch=step_patch,
patch_idx=patch_idx,
token_indices=token_indices[patch_idx],
prompt_embeds=prompt_embeds,
text_embeddings=text_embeddings,
image_rotary_embeddings=image_rotary_embeddings,
encoder_hidden_states=encoder_hidden_states,
)
for branch in self._get_cfg_branches(prompt_data):
pooled_embeds = (
branch.pooled if branch.pooled is not None else branch.embeds
)
text_seq_len = branch.embeds.shape[1]
self._ensure_wrappers(text_seq_len, branch.mask)
self._set_text_seq_len(text_seq_len)
if self.joint_block_wrappers:
for wrapper in self.joint_block_wrappers:
wrapper.set_encoder_mask(branch.mask)
text_embeddings = self.adapter.compute_text_embeddings(
t, config, pooled_embeds
)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
branch.embeds,
config,
encoder_hidden_states_mask=branch.mask,
cond_image_grid=cond_image_grid,
kontext_image_ids=kontext_image_ids,
)
noise, encoder_hidden_states = self._run_single_patch_pass(
patch=patch,
patch_idx=patch_idx,
token_indices=token_indices[patch_idx],
prompt_embeds=branch.embeds,
text_embeddings=text_embeddings,
image_rotary_embeddings=image_rotary_embeddings,
encoder_hidden_states=encoder_hidden_states,
)
if self.is_last_stage:
assert noise is not None
results.append((branch.positive, noise))
if self.is_last_stage:
assert noise is not None
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale
)
noise = self._combine_cfg_results(results)
patch_latents[patch_idx] = config.scheduler.step( # pyright: ignore[reportAny]
noise=noise,
@@ -844,7 +954,9 @@ class DiffusionRunner:
if not self.is_first_stage and t != config.num_inference_steps - 1:
patch_latents[patch_idx] = mx.distributed.send(
patch_latents[patch_idx], self.next_rank, group=self.group
patch_latents[patch_idx],
self.first_pipeline_rank,
group=self.group,
)
mx.async_eval(patch_latents[patch_idx])
@@ -884,11 +996,12 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
assert self.prev_pipeline_rank is not None
patch_len = patch.shape[1]
patch = mx.distributed.recv(
(batch_size, patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(patch)
@@ -897,7 +1010,7 @@ class DiffusionRunner:
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(encoder_hidden_states)
@@ -925,29 +1038,34 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
patch = patch_concat
else:
assert self.next_pipeline_rank is not None
patch_concat = mx.distributed.send(
patch_concat, self.next_rank, group=self.group
patch_concat, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch_concat)
elif self.has_joint_blocks and not self.is_last_stage:
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
assert self.next_pipeline_rank is not None
patch = mx.distributed.send(
patch, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch)
if patch_idx == 0:
assert encoder_hidden_states is not None
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
encoder_hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
assert self.prev_pipeline_rank is not None
patch_len = patch.shape[1]
patch = mx.distributed.recv(
(batch_size, text_seq_len + patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(patch)
@@ -962,7 +1080,10 @@ class DiffusionRunner:
)
if not self.is_last_stage:
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
assert self.next_pipeline_rank is not None
patch = mx.distributed.send(
patch, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch)
noise: mx.array | None = None

View File

@@ -3,7 +3,6 @@ from copy import deepcopy
from typing import Any, cast
import mlx.core as mx
import psutil
from mlx_lm.models.cache import (
KVCache,
QuantizedKVCache,
@@ -13,29 +12,25 @@ from mlx_lm.models.cache import (
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.tokenizer_utils import TokenizerWrapper
from exo.shared.types.memory import Memory
from exo.shared.types.mlx import KVCacheType
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.constants import CACHE_GROUP_SIZE, KV_CACHE_BITS
from exo.worker.runner.bootstrap import logger
# Fraction of device memory above which LRU eviction kicks in
_DEFAULT_MEMORY_THRESHOLD = 0.9
_DEFAULT_MEMORY_THRESHOLD = 0.85
_MEMORY_THRESHOLD = float(
os.environ.get("EXO_MEMORY_THRESHOLD", _DEFAULT_MEMORY_THRESHOLD)
)
class KVPrefixCache:
def __init__(
self, tokenizer: TokenizerWrapper, group: mx.distributed.Group | None = None
):
def __init__(self, tokenizer: TokenizerWrapper):
self.prompts: list[mx.array] = [] # mx array of tokens (ints)
self.caches: list[KVCacheType] = []
self._last_used: list[int] = [] # monotonic counter of last access per entry
self._access_counter: int = 0
self._tokenizer: TokenizerWrapper = tokenizer
self._group = group
def clear(self):
"""Clear all cached prompts and caches."""
@@ -86,13 +81,13 @@ class KVPrefixCache:
best_snapshot_index, best_snapshot_length = None, 0
for i, cached_prompt in enumerate(self.prompts):
length = get_prefix_length(tokenized_prompt, cached_prompt)
length = _get_prefix_length(tokenized_prompt, cached_prompt)
if length == max_length:
# Exact match - cached prompt starts with our entire prompt
# Trim cache to prompt length - 1, return last token for stream_generate
prompt_cache = deepcopy(self.caches[i])
cached_length = cache_length(self.caches[i])
cached_length = _cache_length(self.caches[i])
tokens_to_trim = cached_length - (max_length - 1)
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
@@ -114,7 +109,7 @@ class KVPrefixCache:
prompt_cache = deepcopy(self.caches[best_snapshot_index])
# Trim removes tokens from the end, so we trim (cached_length - prefix_length) to keep the prefix
cached_length = cache_length(self.caches[best_snapshot_index])
cached_length = _cache_length(self.caches[best_snapshot_index])
tokens_to_trim = cached_length - best_snapshot_length
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
@@ -136,37 +131,29 @@ class KVPrefixCache:
return prompt_cache, tokenized_prompt, None
def _evict_if_needed(self):
"""Evict least recently used entries while memory usage is high."""
"""Evict least recently used entries while memory pressure is high."""
if len(self.caches) == 0:
return
active: int = mx.metal.get_active_memory()
limit = int(mx.metal.device_info()["max_recommended_working_set_size"])
if active < limit * _MEMORY_THRESHOLD:
return
# Evict LRU entries until below threshold or only one entry left
while (
len(self.caches) > 1
and self.get_memory_used_percentage() > _MEMORY_THRESHOLD
):
while len(self.caches) > 0:
lru_index = self._last_used.index(min(self._last_used))
evicted_tokens = len(self.prompts[lru_index])
self.prompts.pop(lru_index)
self.caches.pop(lru_index)
self._last_used.pop(lru_index)
logger.info(
f"KV cache evicted LRU entry ({evicted_tokens} tokens) due to memory usage"
f"KV cache evicted LRU entry ({evicted_tokens} tokens) due to memory pressure"
)
def get_memory_used_percentage(self) -> float:
local_pressure: float = get_memory_used_percentage()
if self._group is None:
return local_pressure
all_pressure = mx.distributed.all_gather(
mx.array([local_pressure], dtype=mx.float32),
group=self._group,
)
# .item() evals.
max_pressure = float(mx.max(all_pressure).item())
return max_pressure
active = mx.metal.get_active_memory()
if active < limit * _MEMORY_THRESHOLD:
break
def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
@@ -181,13 +168,13 @@ def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
return mx.array(tokenized_prompt)
def cache_length(cache: KVCacheType) -> int:
def _cache_length(cache: KVCacheType) -> int:
"""Get the number of tokens in a KV cache."""
# Use .offset attribute which all cache types have (len() not implemented in older QuantizedKVCache)
return max(c.offset for c in cache) # type: ignore
def get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
def _get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
"""Find the length of the common prefix between two token arrays."""
n = min(int(prompt.shape[0]), int(cached_prompt.shape[0]))
if n == 0:
@@ -198,17 +185,6 @@ def get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
return int(mx.sum(prefix_mask).item())
def get_available_memory() -> Memory:
mem: int = psutil.virtual_memory().available
return Memory.from_bytes(mem)
def get_memory_used_percentage() -> float:
mem = psutil.virtual_memory()
# percent is 0-100
return float(mem.percent / 100)
def make_kv_cache(
model: Model, max_kv_size: int | None = None, keep: int = 0
) -> KVCacheType:

View File

@@ -10,11 +10,8 @@ from mlx_lm.tokenizer_utils import TokenizerWrapper
from exo.shared.types.api import (
BenchChatCompletionTaskParams,
ChatCompletionMessage,
CompletionTokensDetails,
FinishReason,
GenerationStats,
PromptTokensDetails,
Usage,
)
from exo.shared.types.memory import Memory
from exo.shared.types.mlx import KVCacheType
@@ -42,7 +39,7 @@ def prefill(
sampler: Callable[[mx.array], mx.array],
prompt_tokens: mx.array,
cache: KVCacheType,
) -> tuple[float, int]:
) -> float:
"""Prefill the KV cache with prompt tokens.
This runs the model over the prompt tokens to populate the cache,
@@ -53,7 +50,7 @@ def prefill(
"""
num_tokens = len(prompt_tokens)
if num_tokens == 0:
return 0.0, 0
return 0.0
logger.debug(f"Prefilling {num_tokens} tokens...")
start_time = time.perf_counter()
@@ -88,7 +85,7 @@ def prefill(
f"Prefill complete: {num_tokens} tokens in {elapsed:.2f}s "
f"({tokens_per_sec:.1f} tok/s)"
)
return tokens_per_sec, num_tokens
return tokens_per_sec
def warmup_inference(
@@ -172,8 +169,6 @@ def mlx_generate(
mx.reset_peak_memory()
is_bench: bool = isinstance(task, BenchChatCompletionTaskParams)
logger.info(f"{is_bench=}")
# Currently we support chat-completion tasks only.
logger.debug(f"task_params: {task}")
@@ -209,9 +204,7 @@ def mlx_generate(
)
# Prefill cache with all tokens except the last one
prefill_tps, prefill_tokens = prefill(
model, tokenizer, sampler, prompt_tokens[:-1], caches
)
prefill_tps = prefill(model, tokenizer, sampler, prompt_tokens[:-1], caches)
# stream_generate starts from the last token
last_token = prompt_tokens[-1:]
@@ -219,43 +212,28 @@ def mlx_generate(
max_tokens = task.max_tokens or MAX_TOKENS
generated_text_parts: list[str] = []
generation_start_time = time.perf_counter()
usage: Usage | None = None
in_thinking = False
reasoning_tokens = 0
think_start = tokenizer.think_start
think_end = tokenizer.think_end
for completion_tokens, out in enumerate(
stream_generate(
model=model,
tokenizer=tokenizer,
prompt=last_token,
max_tokens=max_tokens,
sampler=sampler,
logits_processors=logits_processors,
prompt_cache=caches,
# TODO: Dynamically change prefill step size to be the maximum possible without timing out.
prefill_step_size=2048,
kv_group_size=KV_GROUP_SIZE,
kv_bits=KV_BITS,
),
start=1,
for out in stream_generate(
model=model,
tokenizer=tokenizer,
prompt=last_token,
max_tokens=max_tokens,
sampler=sampler,
logits_processors=logits_processors,
prompt_cache=caches,
# TODO: Dynamically change prefill step size to be the maximum possible without timing out.
prefill_step_size=2048,
kv_group_size=KV_GROUP_SIZE,
kv_bits=KV_BITS,
):
generated_text_parts.append(out.text)
logger.info(out.text)
if think_start is not None and out.text == think_start:
in_thinking = True
elif think_end is not None and out.text == think_end:
in_thinking = False
if in_thinking:
reasoning_tokens += 1
stats: GenerationStats | None = None
if out.finish_reason is not None:
stats = GenerationStats(
prompt_tps=float(prefill_tps or out.prompt_tps),
generation_tps=float(out.generation_tps),
prompt_tokens=int(prefill_tokens + out.prompt_tokens),
prompt_tokens=int(out.prompt_tokens),
generation_tokens=int(out.generation_tokens),
peak_memory_usage=Memory.from_gb(out.peak_memory),
)
@@ -267,24 +245,11 @@ def mlx_generate(
f"Model generated unexpected finish_reason: {out.finish_reason}"
)
usage = Usage(
prompt_tokens=int(out.prompt_tokens),
completion_tokens=completion_tokens,
total_tokens=int(out.prompt_tokens) + completion_tokens,
prompt_tokens_details=PromptTokensDetails(
cached_tokens=prefix_hit_length
),
completion_tokens_details=CompletionTokensDetails(
reasoning_tokens=reasoning_tokens
),
)
yield GenerationResponse(
text=out.text,
token=out.token,
finish_reason=cast(FinishReason | None, out.finish_reason),
stats=stats,
usage=usage,
)
if out.finish_reason is not None:

View File

@@ -37,7 +37,6 @@ from exo.shared.types.tasks import (
Shutdown,
StartWarmup,
Task,
TaskId,
TaskStatus,
)
from exo.shared.types.worker.instances import BoundInstance
@@ -62,7 +61,7 @@ from exo.shared.types.worker.runners import (
RunnerStatus,
RunnerWarmingUp,
)
from exo.shared.types.worker.shards import ShardMetadata
from exo.shared.types.worker.shards import PipelineShardMetadata, ShardMetadata
from exo.utils.channels import MpReceiver, MpSender
from exo.worker.engines.image import (
DistributedImageModel,
@@ -112,12 +111,8 @@ def main(
event_sender.send(
RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
)
seen = set[TaskId]()
with task_receiver as tasks:
for task in tasks:
if task.task_id in seen:
logger.warning("repeat task - potential error")
seen.add(task.task_id)
event_sender.send(
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
)
@@ -168,7 +163,7 @@ def main(
logger.info(
f"model has_tool_calling={tokenizer.has_tool_calling}"
)
kv_prefix_cache = KVPrefixCache(tokenizer, group)
kv_prefix_cache = KVPrefixCache(tokenizer)
elif (
ModelTask.TextToImage in shard_metadata.model_card.tasks
@@ -282,11 +277,9 @@ def main(
tokenizer.tool_parser, # pyright: ignore[reportAny]
)
completion_tokens = 0
for response in mlx_generator:
match response:
case GenerationResponse():
completion_tokens += 1
if (
device_rank == 0
and response.finish_reason == "error"
@@ -314,7 +307,6 @@ def main(
model=shard_metadata.model_card.model_id,
text=response.text,
token_id=response.token,
usage=response.usage,
finish_reason=response.finish_reason,
stats=response.stats,
),
@@ -328,7 +320,6 @@ def main(
chunk=ToolCallChunk(
tool_calls=response.tool_calls,
model=shard_metadata.model_card.model_id,
usage=response.usage,
),
)
)
@@ -369,8 +360,9 @@ def main(
image_index = 0
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
isinstance(shard_metadata, PipelineShardMetadata)
and shard_metadata.is_pipeline_last
and shard_metadata.cfg_rank == 0
):
match response:
case PartialImageResponse():
@@ -396,7 +388,11 @@ def main(
image_index += 1
# can we make this more explicit?
except Exception as e:
if shard_metadata.device_rank == shard_metadata.world_size - 1:
if (
isinstance(shard_metadata, PipelineShardMetadata)
and shard_metadata.is_pipeline_last
and shard_metadata.cfg_rank == 0
):
event_sender.send(
ChunkGenerated(
command_id=command_id,
@@ -428,8 +424,9 @@ def main(
image_index = 0
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
isinstance(shard_metadata, PipelineShardMetadata)
and shard_metadata.is_pipeline_last
and shard_metadata.cfg_rank == 0
):
match response:
case PartialImageResponse():
@@ -454,7 +451,11 @@ def main(
)
image_index += 1
except Exception as e:
if shard_metadata.device_rank == shard_metadata.world_size - 1:
if (
isinstance(shard_metadata, PipelineShardMetadata)
and shard_metadata.is_pipeline_last
and shard_metadata.cfg_rank == 0
):
event_sender.send(
ChunkGenerated(
command_id=command_id,
@@ -544,10 +545,10 @@ def parse_gpt_oss(
name=current_tool_name,
arguments="".join(tool_arg_parts).strip(),
)
],
usage=response.usage,
]
)
tool_arg_parts = []
break
current_tool_name = recipient
# If inside a tool call, accumulate arguments
@@ -693,7 +694,7 @@ def parse_tool_calls(
tools = [_validate_single_tool(tool) for tool in parsed]
else:
tools = [_validate_single_tool(parsed)]
yield ToolCallResponse(tool_calls=tools, usage=response.usage)
yield ToolCallResponse(tool_calls=tools)
except (
json.JSONDecodeError,

View File

@@ -127,25 +127,20 @@ class RunnerSupervisor:
self._tg.cancel_scope.cancel()
async def start_task(self, task: Task):
if task.task_id in self.pending:
logger.warning(
f"Skipping invalid task {task} as it has already been submitted"
)
return
if task.task_id in self.completed:
logger.warning(
logger.info(
f"Skipping invalid task {task} as it has already been completed"
)
return
logger.info(f"Starting task {task}")
event = anyio.Event()
self.pending[task.task_id] = event
try:
await self._task_sender.send_async(task)
self._task_sender.send(task)
except ClosedResourceError:
logger.warning(f"Task {task} dropped, runner closed communication.")
return
await event.wait()
logger.info(f"Finished task {task}")
async def _forward_events(self):
with self._ev_recv as events:

View File

@@ -14,9 +14,9 @@ from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.cache import (
KVPrefixCache,
cache_length,
_cache_length,
_get_prefix_length,
encode_prompt,
get_prefix_length,
make_kv_cache,
)
from exo.worker.engines.mlx.generator.generate import mlx_generate, prefill
@@ -35,47 +35,47 @@ class TestGetPrefixLength:
def test_identical_arrays(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3, 4, 5])
assert get_prefix_length(a, b) == 5
assert _get_prefix_length(a, b) == 5
def test_no_common_prefix(self):
a = mx.array([1, 2, 3])
b = mx.array([4, 5, 6])
assert get_prefix_length(a, b) == 0
assert _get_prefix_length(a, b) == 0
def test_partial_prefix(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3, 7, 8])
assert get_prefix_length(a, b) == 3
assert _get_prefix_length(a, b) == 3
def test_prompt_longer_than_cached(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3])
assert get_prefix_length(a, b) == 3
assert _get_prefix_length(a, b) == 3
def test_cached_longer_than_prompt(self):
a = mx.array([1, 2, 3])
b = mx.array([1, 2, 3, 4, 5])
assert get_prefix_length(a, b) == 3
assert _get_prefix_length(a, b) == 3
def test_single_token_match(self):
a = mx.array([1, 2, 3])
b = mx.array([1, 5, 6])
assert get_prefix_length(a, b) == 1
assert _get_prefix_length(a, b) == 1
def test_empty_prompt(self):
a = mx.array([]).astype(mx.int32)
b = mx.array([1, 2, 3])
assert get_prefix_length(a, b) == 0
assert _get_prefix_length(a, b) == 0
def test_empty_cached(self):
a = mx.array([1, 2, 3])
b = mx.array([]).astype(mx.int32)
assert get_prefix_length(a, b) == 0
assert _get_prefix_length(a, b) == 0
def test_both_empty(self):
a = mx.array([]).astype(mx.int32)
b = mx.array([]).astype(mx.int32)
assert get_prefix_length(a, b) == 0
assert _get_prefix_length(a, b) == 0
class TestKVPrefix:
@@ -146,7 +146,7 @@ class TestKVPrefixCacheWithModel:
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
# Cache should now hold the prompt tokens
assert cache_length(cache) == len(tokens)
assert _cache_length(cache) == len(tokens)
def test_add_and_get_exact_match(self, model_and_tokenizer):
model, tokenizer = model_and_tokenizer
@@ -166,7 +166,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache.add_kv_cache(prompt, cache)
assert len(kv_prefix_cache.prompts) == 1
stored_length = cache_length(kv_prefix_cache.caches[0])
stored_length = _cache_length(kv_prefix_cache.caches[0])
assert stored_length > 0
# Retrieve with same prompt: exact match
@@ -209,7 +209,7 @@ class TestKVPrefixCacheWithModel:
long_tokens = encode_prompt(tokenizer, long_prompt)
# The prompts share a prefix (chat template preamble + "Hi")
expected_prefix = get_prefix_length(long_tokens, short_tokens)
expected_prefix = _get_prefix_length(long_tokens, short_tokens)
assert expected_prefix > 0, (
"Prompts should share a prefix from the chat template"
)
@@ -243,7 +243,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = cache_length(kv_prefix_cache.caches[0])
stored_length = _cache_length(kv_prefix_cache.caches[0])
# Get cache and mutate it (simulating what generation does)
result_cache, _, matched_index = kv_prefix_cache.get_kv_cache(model, prompt)
@@ -259,7 +259,7 @@ class TestKVPrefixCacheWithModel:
mx.eval([c.keys for c in result_cache])
# Stored cache must be unchanged
assert cache_length(kv_prefix_cache.caches[0]) == stored_length
assert _cache_length(kv_prefix_cache.caches[0]) == stored_length
def test_stored_cache_survives_repeated_get_mutate_cycles(
self, model_and_tokenizer
@@ -281,7 +281,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = cache_length(kv_prefix_cache.caches[0])
stored_length = _cache_length(kv_prefix_cache.caches[0])
for i in range(3):
result_cache, _, _ = kv_prefix_cache.get_kv_cache(model, prompt)
@@ -293,7 +293,7 @@ class TestKVPrefixCacheWithModel:
layer_cache.update_and_fetch(extra, extra)
mx.eval([c.keys for c in result_cache])
assert cache_length(kv_prefix_cache.caches[0]) == stored_length, (
assert _cache_length(kv_prefix_cache.caches[0]) == stored_length, (
f"Failed on loop {i}"
)
@@ -325,7 +325,7 @@ class TestKVPrefixCacheWithModel:
assert len(kv_prefix_cache.caches) == 1
# Cache should contain prompt + generated tokens
expected_length = len(prompt_tokens) + generated_tokens
assert cache_length(kv_prefix_cache.caches[0]) == expected_length
assert _cache_length(kv_prefix_cache.caches[0]) == expected_length
def test_mlx_generate_second_call_gets_prefix_hit(self, model_and_tokenizer):
"""Second mlx_generate call with same prompt should get a prefix hit from stored cache."""
@@ -400,7 +400,7 @@ class TestKVPrefixCacheWithModel:
first_gen_time = time.perf_counter() - t0
assert len(kv_prefix_cache.prompts) == 1
first_cache_length = cache_length(kv_prefix_cache.caches[0])
first_cache_length = _cache_length(kv_prefix_cache.caches[0])
# Second generation: same long prompt + extra content (simulating multi-turn)
task2 = ChatCompletionTaskParams(
@@ -416,7 +416,7 @@ class TestKVPrefixCacheWithModel:
prompt2_tokens = encode_prompt(tokenizer, prompt2)
# Verify the prompts share a long prefix
prefix_len = get_prefix_length(prompt2_tokens, prompt1_tokens)
prefix_len = _get_prefix_length(prompt2_tokens, prompt1_tokens)
assert prefix_len > 1000, "Prompts must share > 1000 token prefix"
# Second generation should reuse the cached prefix (only prefill new tokens)
@@ -440,7 +440,7 @@ class TestKVPrefixCacheWithModel:
# With prefix_hit > 1000, should update in-place (not add a second entry)
assert len(kv_prefix_cache.prompts) == 1
# Updated cache should be longer (prompt2 + generated > prompt1 + generated)
updated_cache_length = cache_length(kv_prefix_cache.caches[0])
updated_cache_length = _cache_length(kv_prefix_cache.caches[0])
assert updated_cache_length > first_cache_length
def test_mlx_generate_stored_cache_not_mutated(self, model_and_tokenizer):
@@ -465,7 +465,7 @@ class TestKVPrefixCacheWithModel:
):
pass
firstcache_length = cache_length(kv_prefix_cache.caches[0])
first_cache_length = _cache_length(kv_prefix_cache.caches[0])
# Second generation gets the cache and mutates it during generation
for _response in mlx_generate(
@@ -478,7 +478,7 @@ class TestKVPrefixCacheWithModel:
pass
# The first stored cache must not have been mutated by the second generation
assert cache_length(kv_prefix_cache.caches[0]) == firstcache_length
assert _cache_length(kv_prefix_cache.caches[0]) == first_cache_length
def test_evicts_lru_entry_under_memory_pressure(self, model_and_tokenizer):
"""Under memory pressure, adding a new cache entry evicts the least recently used one."""
@@ -540,6 +540,6 @@ class TestKVPrefixCacheWithModel:
assert len(kv_prefix_cache.prompts) == 1
# The surviving entry should be the newly added one
new_tokens = encode_prompt(tokenizer, prompt)
assert get_prefix_length(kv_prefix_cache.prompts[0], new_tokens) == len(
assert _get_prefix_length(kv_prefix_cache.prompts[0], new_tokens) == len(
new_tokens
)

View File

@@ -109,8 +109,8 @@ def assert_events_equal(test_events: Iterable[Event], true_events: Iterable[Even
@pytest.fixture
def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
# initialize_mlx returns a mock group
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(MockGroup()))
# initialize_mlx returns a "group" equal to 1
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(1))
monkeypatch.setattr(mlx_runner, "load_mlx_items", make_nothin((1, MockTokenizer)))
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
@@ -120,7 +120,7 @@ def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(mlx_runner, "detect_thinking_prompt_suffix", make_nothin(False))
def fake_generate(*_1: object, **_2: object):
yield GenerationResponse(token=0, text="hi", finish_reason="stop", usage=None)
yield GenerationResponse(token=0, text="hi", finish_reason="stop")
monkeypatch.setattr(mlx_runner, "mlx_generate", fake_generate)
@@ -147,14 +147,6 @@ class MockTokenizer:
has_tool_calling = False
class MockGroup:
def rank(self) -> int:
return 0
def size(self) -> int:
return 1
def _run(tasks: Iterable[Task]):
bound_instance = get_bound_mlx_ring_instance(
instance_id=INSTANCE_1_ID,
@@ -190,8 +182,6 @@ def test_events_processed_in_correct_order(patch_out_mlx: pytest.MonkeyPatch):
text="hi",
token_id=0,
finish_reason="stop",
usage=None,
stats=None,
),
)

View File

@@ -1,18 +0,0 @@
{
"$schema": "https://opencode.ai/config.json",
"model": "exo/mlx-community/gpt-oss-120b-MXFP4-Q8",
"provider": {
"exo": {
"api": "http://localhost:52415/v1",
"models": {
"mlx-community/gpt-oss-120b-MXFP4-Q8": {
"name": "GPT OSS 120B",
"limit": {
"context": 32768,
"output": 8192
}
}
}
}
}
}