Compare commits

...

10 Commits

Author SHA1 Message Date
ciaranbor
958ccf7250 Add CfgShardMetadata 2026-02-04 14:01:46 +00:00
ciaranbor
bac6d2f57c Fix image models 2026-02-04 14:01:46 +00:00
ciaranbor
e1e5e8a53a Simplify placement 2026-02-04 14:00:03 +00:00
ciaranbor
65a62768da Use 0.25 num_sync_steps_factor for Qwen models 2026-02-04 14:00:03 +00:00
ciaranbor
7cf3c57cdc Eval noise before CFG exchange 2026-02-04 14:00:03 +00:00
ciaranbor
5f188553cb Reverse negative CFG group ranks (enables guidance exchange for ring topology) 2026-02-04 14:00:03 +00:00
ciaranbor
289e3cb5f2 Fix image yield for CFG parallel 2026-02-04 14:00:03 +00:00
ciaranbor
eae0365d99 Implement parallel CFG 2026-02-04 14:00:03 +00:00
ciaranbor
a951b00c62 Update API to set seed to maintain consistency between nodes 2026-02-04 14:00:03 +00:00
ciaranbor
faef685ac6 Support parallel CFG sharding 2026-02-04 14:00:03 +00:00
20 changed files with 879 additions and 310 deletions

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
[storage_size]
in_bytes = 26799533856

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
[storage_size]
in_bytes = 37014734400

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
[storage_size]
in_bytes = 26799533856

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
[storage_size]
in_bytes = 37014734400

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["ImageToImage"]
uses_cfg = true
[storage_size]
in_bytes = 57445135488

View File

@@ -3,6 +3,7 @@ n_layers = 60
hidden_size = 1
supports_tensor = false
tasks = ["TextToImage"]
uses_cfg = true
[storage_size]
in_bytes = 57445135488

View File

@@ -1,6 +1,7 @@
import base64
import contextlib
import json
import random
import time
from collections.abc import AsyncGenerator, Awaitable, Callable
from datetime import datetime, timezone
@@ -150,6 +151,15 @@ def _format_to_content_type(image_format: Literal["png", "jpeg", "webp"] | None)
return f"image/{image_format or 'png'}"
def _ensure_seed(params: AdvancedImageParams | None) -> AdvancedImageParams:
    """Return advanced params that are guaranteed to carry a seed.

    Distributed image generation needs every node to sample identical
    noise, so a concrete seed is filled in whenever the caller left it
    unset. An already-present seed is never overwritten, and randomness
    is only drawn when a seed actually has to be generated.
    """
    if params is None:
        return AdvancedImageParams(seed=random.randint(0, 2**32 - 1))
    if params.seed is not None:
        return params
    return params.model_copy(update={"seed": random.randint(0, 2**32 - 1)})
class API:
def __init__(
self,
@@ -709,6 +719,9 @@ class API:
with SSE-formatted events for partial and final images.
"""
payload.model = await self._validate_image_model(ModelId(payload.model))
payload = payload.model_copy(
update={"advanced_params": _ensure_seed(payload.advanced_params)}
)
command = ImageGeneration(
task_params=payload,
@@ -957,6 +970,9 @@ class API:
payload.stream = False
payload.partial_images = 0
payload = payload.model_copy(
update={"advanced_params": _ensure_seed(payload.advanced_params)}
)
command = ImageGeneration(
task_params=payload,
@@ -988,6 +1004,7 @@ class API:
) -> ImageEdits:
"""Prepare and send an image edits command with chunked image upload."""
resolved_model = await self._validate_image_model(model)
advanced_params = _ensure_seed(advanced_params)
image_content = await image.read()
image_data = base64.b64encode(image_content).decode("utf-8")

View File

@@ -10,6 +10,7 @@ from exo.shared.types.profiling import MemoryUsage, NodeNetworkInfo
from exo.shared.types.topology import Cycle, RDMAConnection, SocketConnection
from exo.shared.types.worker.runners import RunnerId, ShardAssignments
from exo.shared.types.worker.shards import (
CfgShardMetadata,
PipelineShardMetadata,
Sharding,
ShardMetadata,
@@ -79,6 +80,118 @@ def get_shard_assignments_for_pipeline_parallel(
cycle: Cycle,
node_memory: Mapping[NodeId, MemoryUsage],
):
"""Create shard assignments for pipeline parallel execution.
Routes to CFG-aware or standard pipeline placement based on model_card.uses_cfg.
"""
# Determine CFG parallelism topology
# CFG parallel only for even node counts with CFG models (2+ nodes)
world_size = len(cycle)
use_cfg_parallel = model_card.uses_cfg and world_size >= 2 and world_size % 2 == 0
if use_cfg_parallel:
return _get_shard_assignments_for_cfg_parallel(model_card, cycle, node_memory)
else:
return _get_shard_assignments_for_pure_pipeline(model_card, cycle, node_memory)
def _get_shard_assignments_for_cfg_parallel(
    model_card: ModelCard,
    cycle: Cycle,
    node_memory: Mapping[NodeId, MemoryUsage],
) -> ShardAssignments:
    """Create shard assignments for CFG parallel execution (returns CfgShardMetadata).

    The cycle is split into two CFG groups (positive / negative branch) of
    equal size; each group runs the same pipeline layer split, so layers are
    allocated once for the first group and mirrored onto the second.

    Args:
        model_card: Model being placed; must describe n_layers and storage size.
        cycle: Ring of nodes to place onto. Length is assumed even
            (the caller routes odd counts to pure pipeline placement).
        node_memory: Per-node memory usage, keyed by node id.

    Returns:
        ShardAssignments mapping every node/runner to a CfgShardMetadata.

    Raises:
        ValueError: if the cycle is empty, total available memory is zero,
            or any node lacks memory for its assigned layers.
    """
    if not cycle.node_ids:
        raise ValueError("Cannot create shard assignments for empty node cycle")
    cycle_memory = sum(
        (node_memory[node_id].ram_available for node_id in cycle.node_ids),
        start=Memory(),
    )
    if cycle_memory.in_bytes == 0:
        raise ValueError("Cannot create shard assignments: total available memory is 0")
    total_layers = model_card.n_layers
    world_size = len(cycle)
    runner_to_shard: dict[RunnerId, ShardMetadata] = {}
    node_to_runner: dict[NodeId, RunnerId] = {}
    # Exactly two CFG branches: positive and negative guidance.
    cfg_world_size = 2
    pipeline_world_size = world_size // cfg_world_size
    # For CFG parallel, we only need to allocate layers for one pipeline group
    # (both CFG groups run the same layers). Use the first pipeline group's nodes.
    pipeline_node_ids = cycle.node_ids[:pipeline_world_size]
    pipeline_memory = sum(
        (node_memory[node_id].ram_available for node_id in pipeline_node_ids),
        start=Memory(),
    )
    # Split layers proportionally to each node's share of the group's RAM.
    layer_allocations = allocate_layers_proportionally(
        total_layers=total_layers,
        memory_fractions=[
            node_memory[node_id].ram_available.in_bytes / pipeline_memory.in_bytes
            for node_id in pipeline_node_ids
        ],
    )
    # Validate each pipeline node has sufficient memory for its assigned layers
    # Use integer arithmetic to avoid floating point precision issues
    total_storage_bytes = model_card.storage_size.in_bytes
    for i, node_id in enumerate(pipeline_node_ids):
        node_layers = layer_allocations[i]
        # Integer division then multiply to get conservative estimate
        required_memory = (total_storage_bytes * node_layers) // total_layers
        available_memory = node_memory[node_id].ram_available.in_bytes
        if required_memory > available_memory:
            raise ValueError(
                f"Node {i} ({node_id}) has insufficient memory: "
                f"requires {required_memory / (1024**3):.2f} GB for {node_layers} layers, "
                f"but only has {available_memory / (1024**3):.2f} GB available"
            )
    # CFG group 0: pipeline ranks in ascending order (0, 1, 2, ...)
    # CFG group 1: pipeline ranks in descending order (reversed)
    # This places both "last stages" as ring neighbors for CFG exchange.
    position_to_cfg_pipeline = [(0, r) for r in range(pipeline_world_size)] + [
        (1, r) for r in reversed(range(pipeline_world_size))
    ]
    for i, node_id in enumerate(cycle.node_ids):
        cfg_rank, pipeline_rank = position_to_cfg_pipeline[i]
        # Layer range is determined by the pipeline rank only, so the two
        # CFG groups end up with identical layer splits.
        layers_before = sum(layer_allocations[:pipeline_rank])
        node_layers = layer_allocations[pipeline_rank]
        runner_id = RunnerId()
        shard = CfgShardMetadata(
            model_card=model_card,
            device_rank=i,
            world_size=world_size,
            start_layer=layers_before,
            end_layer=layers_before + node_layers,
            n_layers=total_layers,
            cfg_rank=cfg_rank,
            cfg_world_size=cfg_world_size,
        )
        runner_to_shard[runner_id] = shard
        node_to_runner[node_id] = runner_id
    return ShardAssignments(
        model_id=model_card.model_id,
        runner_to_shard=runner_to_shard,
        node_to_runner=node_to_runner,
    )
def _get_shard_assignments_for_pure_pipeline(
model_card: ModelCard,
cycle: Cycle,
node_memory: Mapping[NodeId, MemoryUsage],
) -> ShardAssignments:
"""Create shard assignments for pure pipeline execution (returns PipelineShardMetadata)."""
if not cycle.node_ids:
raise ValueError("Cannot create shard assignments for empty node cycle")
@@ -103,11 +216,10 @@ def get_shard_assignments_for_pipeline_parallel(
)
# Validate each node has sufficient memory for its assigned layers
memory_per_layer = model_card.storage_size.in_bytes / total_layers
for i, (node_id, node_layers) in enumerate(
zip(cycle.node_ids, layer_allocations, strict=True)
):
required_memory = node_layers * memory_per_layer
total_storage_bytes = model_card.storage_size.in_bytes
for i, node_id in enumerate(cycle.node_ids):
node_layers = layer_allocations[i]
required_memory = (total_storage_bytes * node_layers) // total_layers
available_memory = node_memory[node_id].ram_available.in_bytes
if required_memory > available_memory:
raise ValueError(
@@ -116,33 +228,30 @@ def get_shard_assignments_for_pipeline_parallel(
f"but only has {available_memory / (1024**3):.2f} GB available"
)
layers_assigned = 0
for i, (node_id, node_layers) in enumerate(
zip(cycle.node_ids, layer_allocations, strict=True)
):
for i, node_id in enumerate(cycle.node_ids):
layers_before = sum(layer_allocations[:i])
node_layers = layer_allocations[i]
runner_id = RunnerId()
shard = PipelineShardMetadata(
model_card=model_card,
device_rank=i,
world_size=world_size,
start_layer=layers_assigned,
end_layer=layers_assigned + node_layers,
start_layer=layers_before,
end_layer=layers_before + node_layers,
n_layers=total_layers,
)
runner_to_shard[runner_id] = shard
node_to_runner[node_id] = runner_id
layers_assigned += node_layers
shard_assignments = ShardAssignments(
return ShardAssignments(
model_id=model_card.model_id,
runner_to_shard=runner_to_shard,
node_to_runner=node_to_runner,
)
return shard_assignments
def get_shard_assignments_for_tensor_parallel(
model_card: ModelCard,

View File

@@ -5,6 +5,7 @@ from exo.master.placement_utils import (
filter_cycles_by_memory,
get_mlx_jaccl_coordinators,
get_shard_assignments,
get_shard_assignments_for_pipeline_parallel,
get_smallest_cycles,
)
from exo.master.tests.conftest import (
@@ -20,7 +21,11 @@ from exo.shared.types.profiling import (
NodeNetworkInfo,
)
from exo.shared.types.topology import Connection, SocketConnection
from exo.shared.types.worker.shards import Sharding
from exo.shared.types.worker.shards import (
CfgShardMetadata,
PipelineShardMetadata,
Sharding,
)
def test_filter_cycles_by_memory():
@@ -487,3 +492,189 @@ def test_get_shard_assignments_insufficient_memory_raises():
get_shard_assignments(
model_card, selected_cycle, Sharding.Pipeline, node_memory
)
class TestCfgParallelPlacement:
    """Placement tests for CFG-aware pipeline sharding.

    Exercises get_shard_assignments_for_pipeline_parallel routing:
    CFG models on even node counts get CfgShardMetadata (CFG parallel),
    while odd node counts and non-CFG models fall back to
    PipelineShardMetadata (pure pipeline / sequential CFG).
    """

    def _create_ring_topology(self, node_ids: list[NodeId]) -> Topology:
        """Build a unidirectional ring: node i connects to node (i + 1) % n."""
        topology = Topology()
        for node_id in node_ids:
            topology.add_node(node_id)
        for i, node_id in enumerate(node_ids):
            next_node = node_ids[(i + 1) % len(node_ids)]
            conn = Connection(
                source=node_id,
                sink=next_node,
                edge=create_socket_connection(i + 1),
            )
            topology.add_connection(conn)
        return topology

    def test_two_nodes_cfg_model_uses_cfg_parallel(self):
        """Two nodes with CFG model should use CFG parallel (no pipeline)."""
        node_a = NodeId()
        node_b = NodeId()
        topology = self._create_ring_topology([node_a, node_b])
        cycles = [c for c in topology.get_cycles() if len(c) == 2]
        cycle = cycles[0]
        node_memory = {
            node_a: create_node_memory(1000 * 1024),
            node_b: create_node_memory(1000 * 1024),
        }
        model_card = ModelCard(
            model_id=ModelId("qwen-image-test"),
            n_layers=60,
            storage_size=Memory.from_kb(1000),
            hidden_size=1,
            supports_tensor=False,
            uses_cfg=True,
            tasks=[ModelTask.TextToImage],
        )
        assignments = get_shard_assignments_for_pipeline_parallel(
            model_card, cycle, node_memory
        )
        shards = list(assignments.runner_to_shard.values())
        assert len(shards) == 2
        # CFG models should get CfgShardMetadata
        for shard in shards:
            assert isinstance(shard, CfgShardMetadata)
            # Both nodes should have all layers (no pipeline split)
            assert shard.start_layer == 0
            assert shard.end_layer == 60
            assert shard.cfg_world_size == 2
        # One shard per CFG branch: rank 0 (positive) and rank 1 (negative).
        cfg_ranks = sorted(
            s.cfg_rank for s in shards if isinstance(s, CfgShardMetadata)
        )
        assert cfg_ranks == [0, 1]

    def test_four_nodes_cfg_model_uses_hybrid(self):
        """Four nodes with CFG model should use 2 CFG groups x 2 pipeline stages."""
        nodes = [NodeId() for _ in range(4)]
        topology = self._create_ring_topology(nodes)
        cycles = [c for c in topology.get_cycles() if len(c) == 4]
        cycle = cycles[0]
        node_memory = {n: create_node_memory(1000 * 1024) for n in nodes}
        model_card = ModelCard(
            model_id=ModelId("qwen-image-test"),
            n_layers=60,
            storage_size=Memory.from_kb(1000),
            hidden_size=1,
            supports_tensor=False,
            uses_cfg=True,
            tasks=[ModelTask.TextToImage],
        )
        assignments = get_shard_assignments_for_pipeline_parallel(
            model_card, cycle, node_memory
        )
        shards = list(assignments.runner_to_shard.values())
        assert len(shards) == 4
        # CFG models should get CfgShardMetadata
        for shard in shards:
            assert isinstance(shard, CfgShardMetadata)
            assert shard.cfg_world_size == 2
            assert shard.world_size // shard.cfg_world_size == 2
        # Check we have 2 nodes in each CFG group
        cfg_0_shards = [
            s for s in shards if isinstance(s, CfgShardMetadata) and s.cfg_rank == 0
        ]
        cfg_1_shards = [
            s for s in shards if isinstance(s, CfgShardMetadata) and s.cfg_rank == 1
        ]
        assert len(cfg_0_shards) == 2
        assert len(cfg_1_shards) == 2
        # Both CFG groups should have the same layer assignments
        cfg_0_layers = [(s.start_layer, s.end_layer) for s in cfg_0_shards]
        cfg_1_layers = [(s.start_layer, s.end_layer) for s in cfg_1_shards]
        assert sorted(cfg_0_layers) == sorted(cfg_1_layers)

    def test_three_nodes_cfg_model_uses_sequential_cfg(self):
        """Three nodes (odd) with CFG model should use sequential CFG (PipelineShardMetadata)."""
        nodes = [NodeId() for _ in range(3)]
        topology = self._create_ring_topology(nodes)
        cycles = [c for c in topology.get_cycles() if len(c) == 3]
        cycle = cycles[0]
        node_memory = {n: create_node_memory(1000 * 1024) for n in nodes}
        model_card = ModelCard(
            model_id=ModelId("qwen-image-test"),
            n_layers=60,
            storage_size=Memory.from_kb(1000),
            hidden_size=1,
            supports_tensor=False,
            uses_cfg=True,
            tasks=[ModelTask.TextToImage],
        )
        assignments = get_shard_assignments_for_pipeline_parallel(
            model_card, cycle, node_memory
        )
        shards = list(assignments.runner_to_shard.values())
        assert len(shards) == 3
        # Odd node count with CFG model falls back to PipelineShardMetadata (sequential CFG)
        for shard in shards:
            assert isinstance(shard, PipelineShardMetadata)

    def test_two_nodes_non_cfg_model_uses_pipeline(self):
        """Two nodes with non-CFG model should use pure pipeline (PipelineShardMetadata)."""
        node_a = NodeId()
        node_b = NodeId()
        topology = self._create_ring_topology([node_a, node_b])
        cycles = [c for c in topology.get_cycles() if len(c) == 2]
        cycle = cycles[0]
        node_memory = {
            node_a: create_node_memory(1000 * 1024),
            node_b: create_node_memory(1000 * 1024),
        }
        model_card = ModelCard(
            model_id=ModelId("flux-test"),
            n_layers=57,
            storage_size=Memory.from_kb(1000),
            hidden_size=1,
            supports_tensor=False,
            uses_cfg=False,  # Non-CFG model
            tasks=[ModelTask.TextToImage],
        )
        assignments = get_shard_assignments_for_pipeline_parallel(
            model_card, cycle, node_memory
        )
        shards = list(assignments.runner_to_shard.values())
        assert len(shards) == 2
        # Non-CFG models should get PipelineShardMetadata
        for shard in shards:
            assert isinstance(shard, PipelineShardMetadata)
        # Should have actual layer sharding (pipeline)
        layer_ranges = sorted(
            (s.start_layer, s.end_layer)
            for s in shards
            if isinstance(s, PipelineShardMetadata)
        )
        # First shard starts at 0, last shard ends at 57
        assert layer_ranges[0][0] == 0
        assert layer_ranges[-1][1] == 57

View File

@@ -65,9 +65,9 @@ class ComponentInfo(CamelCaseModel):
component_name: str
component_path: str
storage_size: Memory
n_layers: PositiveInt | None
n_layers: PositiveInt | None = None
can_shard: bool
safetensors_index_filename: str | None
safetensors_index_filename: str | None = None
class ModelCard(CamelCaseModel):
@@ -82,6 +82,7 @@ class ModelCard(CamelCaseModel):
quantization: str = ""
base_model: str = ""
capabilities: list[str] = []
uses_cfg: bool = False
@field_validator("tasks", mode="before")
@classmethod
@@ -155,87 +156,6 @@ def is_custom_card(model_id: ModelId) -> bool:
return os.path.isfile(str(card_path))
# TODO: quantizing and dynamically creating model cards
def _generate_image_model_quant_variants(  # pyright: ignore[reportUnusedFunction]
    base_name: str,
    base_card: ModelCard,
) -> dict[str, ModelCard]:
    """Create quantized variants of an image model card.

    Only the transformer component is quantized; text encoders stay at bf16.
    Sizes are calculated exactly from the base card's component sizes.

    Args:
        base_name: Registry key for the unquantized card; variant keys are
            derived as f"{base_name}-{quant}bit".
        base_card: Full-precision card; must have components defined.

    Returns:
        Mapping of registry key -> ModelCard, containing the base card plus
        one entry per quantization level.

    Raises:
        ValueError: if the base card has no components.
    """
    if base_card.components is None:
        raise ValueError(f"Image model {base_name} must have components defined")
    # quantizations = [8, 6, 5, 4, 3]
    quantizations = [8, 4]
    # Size of the (only) transformer component; this is the part that shrinks.
    num_transformer_bytes = next(
        c.storage_size.in_bytes
        for c in base_card.components
        if c.component_name == "transformer"
    )
    transformer_bytes = Memory.from_bytes(num_transformer_bytes)
    # Everything else (text encoders, VAE, ...) keeps its full-precision size.
    remaining_bytes = Memory.from_bytes(
        sum(
            c.storage_size.in_bytes
            for c in base_card.components
            if c.component_name != "transformer"
        )
    )

    def with_transformer_size(new_size: Memory) -> list[ComponentInfo]:
        # Clone the component list, swapping in new_size for the transformer.
        assert base_card.components is not None
        return [
            ComponentInfo(
                component_name=c.component_name,
                component_path=c.component_path,
                storage_size=new_size
                if c.component_name == "transformer"
                else c.storage_size,
                n_layers=c.n_layers,
                can_shard=c.can_shard,
                safetensors_index_filename=c.safetensors_index_filename,
            )
            for c in base_card.components
        ]

    variants = {
        base_name: ModelCard(
            model_id=base_card.model_id,
            storage_size=transformer_bytes + remaining_bytes,
            n_layers=base_card.n_layers,
            hidden_size=base_card.hidden_size,
            supports_tensor=base_card.supports_tensor,
            tasks=base_card.tasks,
            components=with_transformer_size(transformer_bytes),
        )
    }
    for quant in quantizations:
        # Base weights are 16-bit, so a quant-bit transformer is quant/16 the size.
        quant_transformer_bytes = Memory.from_bytes(
            (num_transformer_bytes * quant) // 16
        )
        total_bytes = remaining_bytes + quant_transformer_bytes
        model_id = ModelId(base_card.model_id + f"-{quant}bit")
        variants[f"{base_name}-{quant}bit"] = ModelCard(
            model_id=model_id,
            storage_size=total_bytes,
            n_layers=base_card.n_layers,
            hidden_size=base_card.hidden_size,
            supports_tensor=base_card.supports_tensor,
            tasks=base_card.tasks,
            components=with_transformer_size(quant_transformer_bytes),
        )
    return variants
class ConfigData(BaseModel):
model_config = {"extra": "ignore"} # Allow unknown fields

View File

@@ -1,4 +1,5 @@
from enum import Enum
from typing import TypeAlias, final
from pydantic import Field
@@ -51,6 +52,7 @@ class BaseShardMetadata(TaggedModel):
)
@final
class PipelineShardMetadata(BaseShardMetadata):
"""
Pipeline parallelism shard meta.
@@ -60,8 +62,23 @@ class PipelineShardMetadata(BaseShardMetadata):
"""
@final
class CfgShardMetadata(BaseShardMetadata):
    """
    Shard metadata for CFG-enabled models.

    Effectively two separate instances of pipeline parallel: the world is
    split into cfg_world_size groups, each running the same layer split for
    one guidance branch.
    """

    cfg_rank: int  # 0 = positive branch, 1 = negative branch
    cfg_world_size: int = 2  # number of CFG branches run in parallel
@final
class TensorShardMetadata(BaseShardMetadata):
    """Tensor parallelism shard metadata; adds no fields beyond the base."""

    pass
ShardMetadata = PipelineShardMetadata | TensorShardMetadata
ShardMetadata: TypeAlias = (
PipelineShardMetadata | CfgShardMetadata | TensorShardMetadata
)

View File

@@ -9,7 +9,7 @@ from PIL import Image
from exo.download.download_utils import build_model_path
from exo.shared.types.api import AdvancedImageParams
from exo.shared.types.worker.instances import BoundInstance
from exo.shared.types.worker.shards import PipelineShardMetadata
from exo.shared.types.worker.shards import CfgShardMetadata, PipelineShardMetadata
from exo.worker.engines.image.config import ImageModelConfig
from exo.worker.engines.image.models import (
create_adapter_for_model,
@@ -30,14 +30,19 @@ class DistributedImageModel:
self,
model_id: str,
local_path: Path,
shard_metadata: PipelineShardMetadata,
shard_metadata: PipelineShardMetadata | CfgShardMetadata,
group: Optional[mx.distributed.Group] = None,
quantize: int | None = None,
):
config = get_config_for_model(model_id)
adapter = create_adapter_for_model(config, model_id, local_path, quantize)
if group is not None:
has_layer_sharding = (
shard_metadata.start_layer != 0
or shard_metadata.end_layer != shard_metadata.n_layers
)
if group is not None and has_layer_sharding:
adapter.slice_transformer_blocks(
start_layer=shard_metadata.start_layer,
end_layer=shard_metadata.end_layer,
@@ -75,8 +80,10 @@ class DistributedImageModel:
model_path = build_model_path(model_id)
shard_metadata = bound_instance.bound_shard
if not isinstance(shard_metadata, PipelineShardMetadata):
raise ValueError("Expected PipelineShardMetadata for image generation")
if not isinstance(shard_metadata, (PipelineShardMetadata, CfgShardMetadata)):
raise ValueError(
"Expected PipelineShardMetadata or CfgShardMetadata for image generation"
)
is_distributed = (
len(bound_instance.instance.shard_assignments.node_to_runner) > 1

View File

@@ -86,6 +86,27 @@ class PromptData(ABC):
"""
...
@abstractmethod
def get_cfg_branch_data(
    self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
    """Get embeddings for a single CFG branch (positive or negative).

    Used for sequential CFG and CFG parallel modes where we process
    one branch at a time instead of batching.

    Args:
        positive: True for positive prompt, False for negative prompt

    Returns:
        Tuple of:
        - embeds: [1, seq, hidden] prompt embeddings
        - mask: [1, seq] attention mask or None
        - pooled: [1, hidden] pooled embeddings or None
        - conditioning_latents: [1, latent_seq, latent_dim] or None
    """
    ...
class ModelAdapter(ABC, Generic[ModelT, TransformerT]):
_config: ImageModelConfig

View File

@@ -64,6 +64,12 @@ class FluxPromptData(PromptData):
) -> tuple[mx.array, mx.array, mx.array | None, mx.array | None] | None:
return None
def get_cfg_branch_data(
    self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
    """Flux doesn't use CFG, but we return positive data for compatibility."""
    # The `positive` flag is intentionally ignored: Flux has no negative
    # branch, so both requests resolve to the same (positive) embeddings.
    return (self._prompt_embeds, None, self._pooled_prompt_embeds, None)
class FluxModelAdapter(ModelAdapter[Flux1, Transformer]):
def __init__(

View File

@@ -133,6 +133,24 @@ class QwenPromptData(PromptData):
return batched_embeds, batched_mask, None, cond_latents
def get_cfg_branch_data(
    self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
    """Return (embeds, mask, pooled, cond_latents) for one CFG branch.

    Qwen has no pooled embeddings (always None); the conditioning
    latents are shared between both branches.
    """
    embeds = self._prompt_embeds if positive else self._negative_prompt_embeds
    mask = self._prompt_mask if positive else self._negative_prompt_mask
    return (embeds, mask, None, self.conditioning_latents)
class QwenModelAdapter(ModelAdapter[QwenImage, QwenTransformer]):
"""Adapter for Qwen-Image model.

View File

@@ -12,7 +12,7 @@ QWEN_IMAGE_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps_factor=0.125, # ~3 sync steps for medium (30 steps)
num_sync_steps_factor=0.25,
guidance_scale=3.5, # Set to None or < 1.0 to disable CFG
)
@@ -24,6 +24,6 @@ QWEN_IMAGE_EDIT_CONFIG = ImageModelConfig(
),
),
default_steps={"low": 10, "medium": 25, "high": 50},
num_sync_steps_factor=0.125,
num_sync_steps_factor=0.25,
guidance_scale=3.5,
)

View File

@@ -153,6 +153,24 @@ class QwenEditPromptData(PromptData):
return batched_embeds, batched_mask, None, batched_cond_latents
def get_cfg_branch_data(
    self, positive: bool
) -> tuple[mx.array, mx.array | None, mx.array | None, mx.array | None]:
    """Return (embeds, mask, pooled, cond_latents) for one CFG branch.

    Pooled embeddings are always None for Qwen edit; the image
    conditioning latents are shared between both branches.
    """
    embeds = self._prompt_embeds if positive else self._negative_prompt_embeds
    mask = self._prompt_mask if positive else self._negative_prompt_mask
    return (embeds, mask, None, self._conditioning_latents)
class QwenEditModelAdapter(ModelAdapter[QwenImageEdit, QwenTransformer]):
"""Adapter for Qwen-Image-Edit model.

View File

@@ -1,5 +1,7 @@
from collections.abc import Iterator
from dataclasses import dataclass
from math import ceil
from typing import Any, Optional
from typing import Any, Optional, final
import mlx.core as mx
from mflux.models.common.config.config import Config
@@ -11,7 +13,7 @@ from exo.shared.tracing import (
clear_trace_buffer,
trace,
)
from exo.shared.types.worker.shards import PipelineShardMetadata
from exo.shared.types.worker.shards import CfgShardMetadata, PipelineShardMetadata
from exo.worker.engines.image.config import ImageModelConfig
from exo.worker.engines.image.models.base import (
ModelAdapter,
@@ -25,6 +27,16 @@ from exo.worker.engines.image.pipeline.block_wrapper import (
)
@final
@dataclass(frozen=True)
class CfgBranch:
    """One classifier-free-guidance branch to be denoised.

    Groups the per-branch tensors yielded by PromptData.get_cfg_branch_data
    so the diffusion loop can treat positive/negative branches uniformly.
    """

    positive: bool  # True = positive (conditional) prompt, False = negative
    embeds: mx.array  # prompt embeddings for this branch
    mask: mx.array | None  # attention mask, if the model uses one
    pooled: mx.array | None  # pooled embeddings, if the model provides them
    cond_latents: mx.array | None  # conditioning latents, if any
def calculate_patch_heights(
latent_height: int, num_patches: int
) -> tuple[list[int], int]:
@@ -70,29 +82,18 @@ class DiffusionRunner:
config: ImageModelConfig,
adapter: ModelAdapter[Any, Any],
group: Optional[mx.distributed.Group],
shard_metadata: PipelineShardMetadata,
shard_metadata: PipelineShardMetadata | CfgShardMetadata,
num_patches: Optional[int] = None,
):
self.config = config
self.adapter = adapter
self.group = group
if group is None:
self.rank = 0
self.world_size = 1
self.next_rank = 0
self.prev_rank = 0
self.start_layer = 0
self.end_layer = config.total_blocks
else:
self.rank = shard_metadata.device_rank
self.world_size = shard_metadata.world_size
self.next_rank = (self.rank + 1) % self.world_size
self.prev_rank = (self.rank - 1 + self.world_size) % self.world_size
self.start_layer = shard_metadata.start_layer
self.end_layer = shard_metadata.end_layer
self._init_cfg_topology(shard_metadata)
self.num_patches = num_patches if num_patches else max(1, self.world_size)
self.num_patches = (
num_patches if num_patches else max(1, self.pipeline_world_size)
)
self.total_joint = config.joint_block_count
self.total_single = config.single_block_count
@@ -102,6 +103,111 @@ class DiffusionRunner:
self._compute_assigned_blocks()
def _init_cfg_topology(
    self, shard_metadata: PipelineShardMetadata | CfgShardMetadata
) -> None:
    """Initialize CFG and pipeline topology from shard metadata.

    Handles both CfgShardMetadata (for CFG-enabled models) and
    PipelineShardMetadata (for non-CFG models with sequential CFG).

    Sets, among others: rank/world_size (device-level), pipeline_rank /
    pipeline_world_size (within this node's CFG group), cfg_rank /
    cfg_world_size / cfg_parallel, next/prev pipeline neighbor device
    ranks (None at the ends), and cfg_peer_rank (the matching last-stage
    device in the other CFG group, None on non-last stages).
    """
    if self.group is None:
        # Single-node case: no distribution, all layers local, no CFG peer.
        self.rank = 0
        self.world_size = 1
        self.start_layer = 0
        self.end_layer = self.config.total_blocks
        self.cfg_rank = 0
        self.cfg_world_size = 1
        self.cfg_parallel = False
        self.pipeline_world_size = 1
        self.pipeline_rank = 0
        self.next_pipeline_rank: int | None = None
        self.prev_pipeline_rank: int | None = None
        self.cfg_peer_rank: int | None = None
        self.first_pipeline_rank: int = 0
        self.last_pipeline_rank: int = 0
    elif isinstance(shard_metadata, CfgShardMetadata):
        # CFG parallel: two pipeline groups share the ring.
        self.rank = shard_metadata.device_rank
        self.world_size = shard_metadata.world_size
        self.start_layer = shard_metadata.start_layer
        self.end_layer = shard_metadata.end_layer
        self.cfg_rank = shard_metadata.cfg_rank
        self.cfg_world_size = shard_metadata.cfg_world_size
        self.cfg_parallel = self.cfg_world_size > 1
        self.pipeline_world_size = self.world_size // self.cfg_world_size
        # Ring topology: CFG group 0 at positions [0..pipeline_world_size-1]
        # CFG group 1 at positions [world_size-1..pipeline_world_size] (reversed)
        if self.cfg_rank == 0:
            self.pipeline_rank = self.rank
        else:
            self.pipeline_rank = self.world_size - 1 - self.rank
        is_first = self.pipeline_rank == 0
        is_last = self.pipeline_rank == self.pipeline_world_size - 1
        # Map (cfg_rank, pipeline_rank) <-> device rank, mirroring the
        # placement order used by the master's shard assignment.
        position_to_cfg_pipeline = [
            (0, r) for r in range(self.pipeline_world_size)
        ] + [(1, r) for r in reversed(range(self.pipeline_world_size))]
        cfg_pipeline_to_device: dict[tuple[int, int], int] = {
            (cfg_r, pipe_r): i
            for i, (cfg_r, pipe_r) in enumerate(position_to_cfg_pipeline)
        }
        if is_last:
            self.next_pipeline_rank = None
        else:
            self.next_pipeline_rank = cfg_pipeline_to_device[
                (self.cfg_rank, self.pipeline_rank + 1)
            ]
        if is_first:
            self.prev_pipeline_rank = None
        else:
            self.prev_pipeline_rank = cfg_pipeline_to_device[
                (self.cfg_rank, self.pipeline_rank - 1)
            ]
        if is_last:
            # Only the two last stages exchange guidance; the reversed
            # ordering of group 1 makes them ring neighbors.
            other_cfg_rank = 1 - self.cfg_rank
            self.cfg_peer_rank = cfg_pipeline_to_device[
                (other_cfg_rank, self.pipeline_rank)
            ]
        else:
            self.cfg_peer_rank = None
        if self.cfg_rank == 0:
            self.first_pipeline_rank = 0
            self.last_pipeline_rank = self.pipeline_world_size - 1
        else:
            # Group 1 is laid out in reverse: its pipeline rank 0 sits at
            # device world_size-1, its last stage at device pipeline_world_size.
            self.first_pipeline_rank = self.world_size - 1
            self.last_pipeline_rank = self.pipeline_world_size
    else:
        # Plain pipeline parallel (sequential CFG, if the model needs CFG).
        self.rank = shard_metadata.device_rank
        self.world_size = shard_metadata.world_size
        self.start_layer = shard_metadata.start_layer
        self.end_layer = shard_metadata.end_layer
        self.cfg_rank = 0
        self.cfg_world_size = 1
        self.cfg_parallel = False
        self.pipeline_world_size = self.world_size
        self.pipeline_rank = self.rank
        is_first = self.pipeline_rank == 0
        is_last = self.pipeline_rank == self.pipeline_world_size - 1
        self.next_pipeline_rank = None if is_last else self.pipeline_rank + 1
        self.prev_pipeline_rank = None if is_first else self.pipeline_rank - 1
        self.cfg_peer_rank = None
        self.first_pipeline_rank = 0
        self.last_pipeline_rank = self.pipeline_world_size - 1
def _compute_assigned_blocks(self) -> None:
"""Determine which joint/single blocks this stage owns."""
start = self.start_layer
@@ -138,11 +244,11 @@ class DiffusionRunner:
@property
def is_first_stage(self) -> bool:
return self.rank == 0
return self.pipeline_rank == 0
@property
def is_last_stage(self) -> bool:
return self.rank == self.world_size - 1
return self.pipeline_rank == self.pipeline_world_size - 1
@property
def is_distributed(self) -> bool:
@@ -153,6 +259,97 @@ class DiffusionRunner:
return self._guidance_override
return self.config.guidance_scale
def _get_cfg_branches(self, prompt_data: PromptData) -> Iterator[CfgBranch]:
    """Yield the CFG branches this node should process.

    - No CFG: yields one branch (positive)
    - CFG parallel: yields one branch (our assigned branch)
    - Sequential CFG: yields two branches (positive, then negative)
    """
    if not self.adapter.needs_cfg:
        wanted = [True]
    elif self.cfg_parallel:
        # cfg_rank 0 owns the positive branch, cfg_rank 1 the negative one.
        wanted = [self.cfg_rank == 0]
    else:
        wanted = [True, False]
    for is_positive in wanted:
        embeds, mask, pooled, cond = prompt_data.get_cfg_branch_data(
            positive=is_positive
        )
        yield CfgBranch(
            positive=is_positive,
            embeds=embeds,
            mask=mask,
            pooled=pooled,
            cond_latents=cond,
        )
def _combine_cfg_results(self, results: list[tuple[bool, mx.array]]) -> mx.array:
    """Reduce per-branch (positive, noise) results to one guided noise tensor.

    Sequential CFG produces both branches locally; CFG parallel produces
    one branch and (on the last stage only) swaps with the peer group.
    """
    if len(results) > 1:
        # Sequential CFG: exactly one positive and one negative result.
        noise_by_sign = {is_pos: noise for is_pos, noise in results}
        return self._apply_guidance(noise_by_sign[True], noise_by_sign[False])
    positive, noise = results[0]
    if self.cfg_parallel and self.is_last_stage:
        # TODO(ciaran): try to remove
        mx.eval(noise)
        return self._exchange_and_apply_guidance(noise, positive)
    return noise
def _exchange_and_apply_guidance(
    self, noise: mx.array, is_positive: bool
) -> mx.array:
    """Swap noise predictions with the peer CFG branch, then apply guidance.

    Only called on last pipeline stages in CFG parallel mode, where
    cfg_peer_rank identifies the last stage of the other branch. The two
    sides run mirrored send/recv orders (positive sends first, negative
    receives first) so the point-to-point exchange cannot deadlock.
    """
    assert self.group is not None
    assert self.cfg_peer_rank is not None
    if is_positive:
        # We hold the positive noise: ship it out, then pull the negative.
        noise = mx.distributed.send(noise, self.cfg_peer_rank, group=self.group)
        mx.async_eval(noise)
        noise_neg = mx.distributed.recv_like(
            noise, self.cfg_peer_rank, group=self.group
        )
        mx.eval(noise_neg)
        noise_pos = noise
    else:
        # We hold the negative noise: mirror the order (receive, then send).
        noise_pos = mx.distributed.recv_like(
            noise, self.cfg_peer_rank, group=self.group
        )
        mx.eval(noise_pos)
        noise = mx.distributed.send(noise, self.cfg_peer_rank, group=self.group)
        mx.async_eval(noise)
        noise_neg = noise
    return self._apply_guidance(noise_pos, noise_neg)
def _apply_guidance(self, noise_pos: mx.array, noise_neg: mx.array) -> mx.array:
    """Blend positive/negative noise predictions via the adapter's CFG rule."""
    guidance = self._get_effective_guidance_scale()
    # Branch pairs are only produced when guidance is enabled, so the
    # effective scale must be configured here.
    assert guidance is not None
    return self.adapter.apply_guidance(noise_pos, noise_neg, guidance)
def _ensure_wrappers(
self,
text_seq_len: int,
@@ -470,7 +667,9 @@ class DiffusionRunner:
) -> mx.array:
if self.group is None:
return self._single_node_step(t, config, latents, prompt_data)
elif t < config.init_time_step + num_sync_steps:
elif (
self.pipeline_world_size == 1 or t < config.init_time_step + num_sync_steps
):
with trace(name=f"sync {t}", rank=self.rank, category="sync"):
return self._sync_pipeline_step(
t,
@@ -496,42 +695,29 @@ class DiffusionRunner:
prompt_data: PromptData,
) -> mx.array:
cond_image_grid = prompt_data.cond_image_grid
needs_cfg = self.adapter.needs_cfg
results: list[tuple[bool, mx.array]] = []
for branch in self._get_cfg_branches(prompt_data):
# Reset caches before each branch to ensure no state contamination
self._reset_all_caches()
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, cond_latents = batched_data
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
)
step_latents = mx.concatenate([latents, latents], axis=0)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
cond_latents = prompt_data.conditioning_latents
step_latents = latents
noise = self._forward_pass(
step_latents,
prompt_embeds,
pooled_embeds,
t=t,
config=config,
encoder_hidden_states_mask=encoder_mask,
cond_image_grid=cond_image_grid,
conditioning_latents=cond_latents,
)
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale=guidance_scale
branch.pooled if branch.pooled is not None else branch.embeds
)
noise = self._forward_pass(
latents,
branch.embeds,
pooled_embeds,
t=t,
config=config,
encoder_hidden_states_mask=branch.mask,
cond_image_grid=cond_image_grid,
conditioning_latents=branch.cond_latents,
)
results.append((branch.positive, noise))
noise = self._combine_cfg_results(results)
return config.scheduler.step(noise=noise, timestep=t, latents=latents) # pyright: ignore[reportAny]
def _create_patches(
@@ -582,7 +768,7 @@ class DiffusionRunner:
)
text_embeddings = self.adapter.compute_text_embeddings(
t, config, pooled_prompt_embeds
t, config, pooled_prompt_embeds, hidden_states=hidden_states
)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
prompt_embeds,
@@ -594,19 +780,22 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
assert self.prev_pipeline_rank is not None
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.prev_pipeline_rank}",
rank=self.rank,
category="comms",
):
hidden_states = mx.distributed.recv(
(batch_size, num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(hidden_states, encoder_hidden_states)
@@ -639,34 +828,45 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
hidden_states = concatenated
else:
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
concatenated = mx.distributed.send(
concatenated, self.next_rank, group=self.group
concatenated, self.next_pipeline_rank, group=self.group
)
mx.async_eval(concatenated)
elif self.has_joint_blocks and not self.is_last_stage:
assert encoder_hidden_states is not None
with trace(name=f"send {self.next_rank}", rank=self.rank, category="comms"):
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
hidden_states, self.next_pipeline_rank, group=self.group
)
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
encoder_hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states, encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
assert self.prev_pipeline_rank is not None
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.prev_pipeline_rank}",
rank=self.rank,
category="comms",
):
hidden_states = mx.distributed.recv(
(batch_size, text_seq_len + num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(hidden_states)
@@ -689,11 +889,14 @@ class DiffusionRunner:
mx.eval(hidden_states)
if not self.is_last_stage:
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states)
@@ -716,83 +919,67 @@ class DiffusionRunner:
kontext_image_ids: mx.array | None = None,
) -> mx.array:
prev_latents = hidden_states
needs_cfg = self.adapter.needs_cfg
cond_image_grid = prompt_data.cond_image_grid
scaled_hidden_states = config.scheduler.scale_model_input(hidden_states, t) # pyright: ignore[reportAny]
original_latent_tokens: int = scaled_hidden_states.shape[1] # pyright: ignore[reportAny]
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, cond_latents = batched_data
results: list[tuple[bool, mx.array]] = []
for branch in self._get_cfg_branches(prompt_data):
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
branch.pooled if branch.pooled is not None else branch.embeds
)
step_latents = mx.concatenate(
[scaled_hidden_states, scaled_hidden_states], axis=0
cond_latents = branch.cond_latents
if cond_latents is not None:
num_img_tokens: int = original_latent_tokens + cond_latents.shape[1]
else:
num_img_tokens = original_latent_tokens
step_latents: mx.array = scaled_hidden_states # pyright: ignore[reportAny]
if self.is_first_stage and cond_latents is not None:
step_latents = mx.concatenate([step_latents, cond_latents], axis=1)
text_seq_len = branch.embeds.shape[1]
self._ensure_wrappers(text_seq_len, branch.mask)
noise = self._run_sync_pass(
t,
config,
step_latents,
branch.embeds,
pooled_embeds,
branch.mask,
cond_image_grid,
kontext_image_ids,
num_img_tokens,
original_latent_tokens,
cond_latents,
)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
cond_latents = prompt_data.conditioning_latents
step_latents = scaled_hidden_states # pyright: ignore[reportAny]
if cond_latents is not None:
num_img_tokens: int = original_latent_tokens + cond_latents.shape[1]
else:
num_img_tokens = original_latent_tokens
if self.is_first_stage and cond_latents is not None:
step_latents = mx.concatenate([step_latents, cond_latents], axis=1)
text_seq_len = prompt_embeds.shape[1]
self._ensure_wrappers(text_seq_len, encoder_mask)
noise = self._run_sync_pass(
t,
config,
step_latents,
prompt_embeds,
pooled_embeds,
encoder_mask,
cond_image_grid,
kontext_image_ids,
num_img_tokens,
original_latent_tokens,
cond_latents,
)
if self.is_last_stage:
assert noise is not None
results.append((branch.positive, noise))
if self.is_last_stage:
assert noise is not None
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale
)
noise = self._combine_cfg_results(results)
hidden_states = config.scheduler.step( # pyright: ignore[reportAny]
noise=noise, timestep=t, latents=prev_latents
)
if not self.is_first_stage:
with trace(name="send 0", rank=self.rank, category="comms"):
hidden_states = mx.distributed.send(
hidden_states, 0, group=self.group
)
mx.async_eval(hidden_states)
hidden_states = mx.distributed.send(
hidden_states, self.first_pipeline_rank, group=self.group
)
mx.async_eval(hidden_states)
elif self.is_first_stage:
with trace(
name=f"recv {self.world_size - 1}", rank=self.rank, category="comms"
):
hidden_states = mx.distributed.recv_like(
prev_latents, src=self.world_size - 1, group=self.group
)
mx.eval(hidden_states)
hidden_states = mx.distributed.recv_like(
prev_latents, src=self.last_pipeline_rank, group=self.group
)
mx.eval(hidden_states)
else:
hidden_states = prev_latents
@@ -809,39 +996,10 @@ class DiffusionRunner:
kontext_image_ids: mx.array | None = None,
) -> mx.array:
patch_latents, token_indices = self._create_patches(latents, config)
needs_cfg = self.adapter.needs_cfg
cond_image_grid = prompt_data.cond_image_grid
if needs_cfg:
batched_data = prompt_data.get_batched_cfg_data()
assert batched_data is not None, "CFG model must provide batched data"
prompt_embeds, encoder_mask, batched_pooled, _ = batched_data
pooled_embeds = (
batched_pooled if batched_pooled is not None else prompt_embeds
)
else:
prompt_embeds = prompt_data.prompt_embeds
pooled_embeds = prompt_data.pooled_prompt_embeds
encoder_mask = prompt_data.get_encoder_hidden_states_mask(positive=True)
text_seq_len = prompt_embeds.shape[1]
self._ensure_wrappers(text_seq_len, encoder_mask)
self._set_text_seq_len(text_seq_len)
if self.joint_block_wrappers:
for wrapper in self.joint_block_wrappers:
wrapper.set_encoder_mask(encoder_mask)
text_embeddings = self.adapter.compute_text_embeddings(t, config, pooled_embeds)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
prompt_embeds,
config,
encoder_hidden_states_mask=encoder_mask,
cond_image_grid=cond_image_grid,
kontext_image_ids=kontext_image_ids,
)
prev_patch_latents = [p for p in patch_latents]
encoder_hidden_states: mx.array | None = None
for patch_idx in range(len(patch_latents)):
@@ -853,34 +1011,57 @@ class DiffusionRunner:
and not is_first_async_step
):
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.last_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch = mx.distributed.recv_like(
patch, src=self.prev_rank, group=self.group
patch, src=self.last_pipeline_rank, group=self.group
)
mx.eval(patch)
step_patch = mx.concatenate([patch, patch], axis=0) if needs_cfg else patch
results: list[tuple[bool, mx.array]] = []
noise, encoder_hidden_states = self._run_single_patch_pass(
patch=step_patch,
patch_idx=patch_idx,
token_indices=token_indices[patch_idx],
prompt_embeds=prompt_embeds,
text_embeddings=text_embeddings,
image_rotary_embeddings=image_rotary_embeddings,
encoder_hidden_states=encoder_hidden_states,
)
for branch in self._get_cfg_branches(prompt_data):
pooled_embeds = (
branch.pooled if branch.pooled is not None else branch.embeds
)
text_seq_len = branch.embeds.shape[1]
self._ensure_wrappers(text_seq_len, branch.mask)
self._set_text_seq_len(text_seq_len)
if self.joint_block_wrappers:
for wrapper in self.joint_block_wrappers:
wrapper.set_encoder_mask(branch.mask)
text_embeddings = self.adapter.compute_text_embeddings(
t, config, pooled_embeds
)
image_rotary_embeddings = self.adapter.compute_rotary_embeddings(
branch.embeds,
config,
encoder_hidden_states_mask=branch.mask,
cond_image_grid=cond_image_grid,
kontext_image_ids=kontext_image_ids,
)
noise, encoder_hidden_states = self._run_single_patch_pass(
patch=patch,
patch_idx=patch_idx,
token_indices=token_indices[patch_idx],
prompt_embeds=branch.embeds,
text_embeddings=text_embeddings,
image_rotary_embeddings=image_rotary_embeddings,
encoder_hidden_states=encoder_hidden_states,
)
if self.is_last_stage:
assert noise is not None
results.append((branch.positive, noise))
if self.is_last_stage:
assert noise is not None
if needs_cfg:
noise_pos, noise_neg = mx.split(noise, 2, axis=0)
guidance_scale = self._get_effective_guidance_scale()
assert guidance_scale is not None
noise = self.adapter.apply_guidance(
noise_pos, noise_neg, guidance_scale
)
noise = self._combine_cfg_results(results)
patch_latents[patch_idx] = config.scheduler.step( # pyright: ignore[reportAny]
noise=noise,
@@ -890,10 +1071,14 @@ class DiffusionRunner:
if not self.is_first_stage and t != config.num_inference_steps - 1:
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.first_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch_latents[patch_idx] = mx.distributed.send(
patch_latents[patch_idx], self.next_rank, group=self.group
patch_latents[patch_idx],
self.first_pipeline_rank,
group=self.group,
)
mx.async_eval(patch_latents[patch_idx])
@@ -933,26 +1118,31 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
assert self.prev_pipeline_rank is not None
patch_len = patch.shape[1]
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.prev_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch = mx.distributed.recv(
(batch_size, patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(patch)
if patch_idx == 0:
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.prev_pipeline_rank}",
rank=self.rank,
category="comms",
):
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(encoder_hidden_states)
@@ -988,39 +1178,54 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
patch = patch_concat
else:
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch_concat = mx.distributed.send(
patch_concat, self.next_rank, group=self.group
patch_concat, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch_concat)
elif self.has_joint_blocks and not self.is_last_stage:
with trace(name=f"send {self.next_rank}", rank=self.rank, category="comms"):
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch = mx.distributed.send(
patch, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch)
if patch_idx == 0:
assert encoder_hidden_states is not None
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
encoder_hidden_states, self.next_pipeline_rank, group=self.group
)
mx.async_eval(encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
assert self.prev_pipeline_rank is not None
patch_len = patch.shape[1]
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
name=f"recv {self.prev_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch = mx.distributed.recv(
(batch_size, text_seq_len + patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
self.prev_pipeline_rank,
group=self.group,
)
mx.eval(patch)
@@ -1043,15 +1248,20 @@ class DiffusionRunner:
mx.eval(patch)
if not self.is_last_stage:
assert self.next_pipeline_rank is not None
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
name=f"send {self.next_pipeline_rank}",
rank=self.rank,
category="comms",
):
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
patch = mx.distributed.send(
patch, self.next_pipeline_rank, group=self.group
)
mx.async_eval(patch)
noise: mx.array | None = None
if self.is_last_stage:
patch = patch[:, text_seq_len:, :]
noise = self.adapter.final_projection(patch, text_embeddings)
patch_img_only = patch[:, text_seq_len:, :]
noise = self.adapter.final_projection(patch_img_only, text_embeddings)
return noise, encoder_hidden_states

View File

@@ -48,6 +48,7 @@ from exo.shared.types.worker.instances import (
MlxRingInstance,
)
from exo.shared.types.worker.shards import (
CfgShardMetadata,
PipelineShardMetadata,
ShardMetadata,
TensorShardMetadata,
@@ -274,6 +275,11 @@ def shard_and_load(
logger.info(f"loading model from {model_path} with pipeline parallelism")
model = pipeline_auto_parallel(model, group, shard_metadata)
eval_with_timeout(model.parameters(), timeout_seconds, on_timeout)
case CfgShardMetadata():
raise ValueError(
"CfgShardMetadata is not supported for text model loading - "
"this metadata type is only for image generation models"
)
# TODO: Do we need this?
mx.eval(model)

View File

@@ -66,7 +66,11 @@ from exo.shared.types.worker.runners import (
RunnerStatus,
RunnerWarmingUp,
)
from exo.shared.types.worker.shards import ShardMetadata
from exo.shared.types.worker.shards import (
CfgShardMetadata,
PipelineShardMetadata,
ShardMetadata,
)
from exo.utils.channels import MpReceiver, MpSender
from exo.worker.engines.image import (
DistributedImageModel,
@@ -87,6 +91,28 @@ from exo.worker.engines.mlx.utils_mlx import (
from exo.worker.runner.bootstrap import logger
def _is_primary_output_node(shard_metadata: ShardMetadata) -> bool:
    """Check if this node is the primary output node for image generation.

    For CFG models: the node with cfg_rank == 0 that is also the last
    pipeline stage of its CFG group.
    For non-CFG (pipeline) models: the node with device_rank == world_size - 1.
    Any other metadata type never produces output.
    """
    if isinstance(shard_metadata, CfgShardMetadata):
        # Only CFG group 0 can be primary. (The original code also computed
        # a reversed pipeline_rank for group 1 — ring topology places group 1
        # at positions [world_size-1 .. pipeline_world_size] — but that value
        # could never affect the result since cfg_rank != 0 always returns
        # False; the dead computation is removed here.)
        if shard_metadata.cfg_rank != 0:
            return False
        pipeline_world_size = (
            shard_metadata.world_size // shard_metadata.cfg_world_size
        )
        # For CFG group 0 the pipeline rank equals the device rank.
        return shard_metadata.device_rank == pipeline_world_size - 1
    if isinstance(shard_metadata, PipelineShardMetadata):
        # For pipeline metadata: device_rank == world_size - 1 means last stage.
        return shard_metadata.device_rank == shard_metadata.world_size - 1
    return False
def main(
bound_instance: BoundInstance,
event_sender: MpSender[Event],
@@ -371,10 +397,10 @@ def main(
# Track image_index for final images only
image_index = 0
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
):
# Only the primary output node (last pipeline stage, CFG rank 0) sends results
is_primary_output = _is_primary_output_node(shard_metadata)
if is_primary_output:
match response:
case PartialImageResponse():
logger.info(
@@ -399,7 +425,7 @@ def main(
image_index += 1
# can we make this more explicit?
except Exception as e:
if shard_metadata.device_rank == shard_metadata.world_size - 1:
if _is_primary_output_node(shard_metadata):
event_sender.send(
ChunkGenerated(
command_id=command_id,
@@ -434,10 +460,7 @@ def main(
try:
image_index = 0
for response in generate_image(model=model, task=task_params):
if (
shard_metadata.device_rank
== shard_metadata.world_size - 1
):
if _is_primary_output_node(shard_metadata):
match response:
case PartialImageResponse():
logger.info(
@@ -461,7 +484,7 @@ def main(
)
image_index += 1
except Exception as e:
if shard_metadata.device_rank == shard_metadata.world_size - 1:
if _is_primary_output_node(shard_metadata):
event_sender.send(
ChunkGenerated(
command_id=command_id,