@@ -13,17 +13,11 @@ from mlx.nn.layers.distributed import (
    shard_linear,
    sum_gradients,
)
from mlx_lm.models.deepseek_v3 import DeepseekV3MLP
from mlx_lm.models.deepseek_v3 import Model as DeepseekV3Model
from mlx_lm.models.deepseek_v32 import DeepseekV32MLP
from mlx_lm.models.deepseek_v32 import Model as DeepseekV32Model
from mlx_lm.models.glm4_moe import Model as Glm4MoeModel
from mlx_lm.models.glm4_moe import MoE
from mlx_lm.models.gpt_oss import GptOssMoeModel
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.models.llama import Model as LlamaModel
from mlx_lm.models.minimax import Model as MiniMaxModel
from mlx_lm.models.ministral3 import Model as Ministral3Model
from mlx_lm.models.qwen3_moe import Model as Qwen3MoeModel
from mlx_lm.models.qwen3_moe import Qwen3MoeSparseMoeBlock
from mlx_lm.models.qwen3_next import Model as Qwen3NextModel
@@ -341,33 +335,7 @@ def tensor_auto_parallel(
    except (AttributeError, TypeError, NameError):
        pass

    if isinstance(model, (LlamaModel, Ministral3Model)):
        logger.warning("shouldn't be hit - upstream sharding exists")
        tensor_parallel_sharding_strategy = LlamaShardingStrategy(
            group,
            all_to_sharded_linear,
            sharded_to_all_linear,
            all_to_sharded_linear_in_place,
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, (DeepseekV3Model, DeepseekV32Model)):
        logger.warning("shouldn't be hit - upstream sharding exists")
        tensor_parallel_sharding_strategy = DeepSeekShardingStrategy(
            group,
            all_to_sharded_linear,
            sharded_to_all_linear,
            all_to_sharded_linear_in_place,
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, MiniMaxModel):
        tensor_parallel_sharding_strategy = MiniMaxShardingStrategy(
            group,
            all_to_sharded_linear,
            sharded_to_all_linear,
            all_to_sharded_linear_in_place,
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, (Qwen3MoeModel, Glm4MoeModel, Qwen3NextModel)):
        tensor_parallel_sharding_strategy = QwenShardingStrategy(
            group,
            all_to_sharded_linear,
@@ -375,15 +343,6 @@ def tensor_auto_parallel(
            all_to_sharded_linear_in_place,
            sharded_to_all_linear_in_place,
        )
    elif isinstance(model, GptOssModel):
        tensor_parallel_sharding_strategy = GptOssShardingStrategy(
            group,
            all_to_sharded_linear,
            sharded_to_all_linear,
            all_to_sharded_linear_in_place,
            sharded_to_all_linear_in_place,
        )

    else:
        raise ValueError(f"Unsupported model type: {type(model)}")
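    # Each branch above wires the same five collaborators (the communication
    # group plus the four shard-linear helpers) into an architecture-specific
    # strategy; only the strategy class changes per model family. Unsupported
    # architectures fail fast here instead of falling through to a partially
    # sharded model.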
@@ -418,34 +377,6 @@ class TensorParallelShardingStrategy(ABC):
    ) -> nn.Module: ...


class LlamaShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(
        self,
        model: nn.Module,
        timeout_seconds: float,
        on_timeout: TimeoutCallback | None,
    ) -> nn.Module:
        model = cast(LlamaModel, model)
        for layer in model.layers:
            # Force load weights before sharding to avoid FAST_SYNCH deadlock
            eval_with_timeout(
                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
            )
            layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
            layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
            layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
            layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
            layer.self_attn.n_heads //= self.N
            if layer.self_attn.n_kv_heads is not None:
                layer.self_attn.n_kv_heads //= self.N

            layer.mlp.gate_proj = self.all_to_sharded_linear(layer.mlp.gate_proj)
            layer.mlp.down_proj = self.sharded_to_all_linear(layer.mlp.down_proj)
            layer.mlp.up_proj = self.all_to_sharded_linear(layer.mlp.up_proj)

        return model
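
# The projections follow the standard tensor-parallel split: q/k/v and
# gate/up are sharded across output features (all_to_sharded), so each rank
# serves a 1/N slice of the heads and hidden units, while o_proj and
# down_proj are sharded across input features (sharded_to_all), which reduces
# the partial results back into full-width activations. Dividing the head
# counts by N keeps the attention reshapes consistent with the narrower
# projections; with N=2 and 32 query heads, for example, each rank is left
# with 16 heads and half-width q/k/v outputs.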


def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:
    inner_model_instance = _inner_model(model)
    if hasattr(inner_model_instance, "layers"):
@@ -472,105 +403,6 @@ def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:
        raise ValueError("Model must have either a 'layers' or 'h' attribute")


class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(
        self,
        model: nn.Module,
        timeout_seconds: float,
        on_timeout: TimeoutCallback | None,
    ) -> nn.Module:
        model = cast(DeepseekV3Model, model)
        for layer in model.layers:
            eval_with_timeout(
                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
            )
            # Shard the self attention
            if layer.self_attn.q_lora_rank is None:
                layer.self_attn.q_proj = self.all_to_sharded_linear(
                    layer.self_attn.q_proj
                )
            else:
                layer.self_attn.q_b_proj = self.all_to_sharded_linear(
                    layer.self_attn.q_b_proj
                )
            layer.self_attn.kv_b_proj = self.all_to_sharded_linear(
                layer.self_attn.kv_b_proj
            )
            layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
            layer.self_attn.num_heads //= self.N

            # Shard the MLP
            if isinstance(layer.mlp, (DeepseekV3MLP, DeepseekV32MLP)):
                layer.mlp.gate_proj = self.all_to_sharded_linear(layer.mlp.gate_proj)
                layer.mlp.down_proj = self.sharded_to_all_linear(layer.mlp.down_proj)
                layer.mlp.up_proj = self.all_to_sharded_linear(layer.mlp.up_proj)

            # Shard the MoE. Shard in place since the MoE should be responsible
            # for aggregating the results.
            else:
                self.all_to_sharded_linear_in_place(layer.mlp.shared_experts.gate_proj)
                self.sharded_to_all_linear_in_place(layer.mlp.shared_experts.down_proj)
                self.all_to_sharded_linear_in_place(layer.mlp.shared_experts.up_proj)
                self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
                self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
                self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
                layer.mlp = ShardedDeepseekV3MoE(layer.mlp)  # type: ignore
                layer.mlp.sharding_group = self.group

        return model
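
# When q_lora_rank is set, DeepSeek attention factors the query projection
# into a low-rank pair (the MLA path), so only the up-projection q_b_proj is
# width-sharded; otherwise the plain q_proj is sharded. kv_b_proj and o_proj
# are treated the same way as in the dense attention strategies.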


class ShardedDeepseekV3MoE(CustomMlxLayer):
    def __init__(self, layer: _LayerCallable):
        super().__init__(layer)
        self.sharding_group: mx.distributed.Group | None = None

    def __call__(self, x: mx.array) -> mx.array:
        if self.sharding_group is not None:
            x = sum_gradients(self.sharding_group)(x)
        y = self.original_layer.__call__(x)
        if self.sharding_group is not None:
            y = mx.distributed.all_sum(y, group=self.sharding_group)
        return y
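
# The wrapper leaves the routed MoE untouched: every rank holds a 1/N slice
# of each expert (the in-place shards above), so its output is only a partial
# sum, and all_sum reduces those partials across the group. sum_gradients is,
# as the name suggests, the backward-pass counterpart, accumulating gradients
# across ranks while acting as an identity on the forward activations.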


class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(
        self,
        model: nn.Module,
        timeout_seconds: float,
        on_timeout: TimeoutCallback | None,
    ) -> nn.Module:
        model = cast(MiniMaxModel, model)
        for layer in model.layers:
            eval_with_timeout(
                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
            )
            # Shard the self attention
            layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
            layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
            layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
            layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)
            layer.self_attn.num_attention_heads //= self.N
            layer.self_attn.num_key_value_heads //= self.N

            # Shard the MoE. Shard in place since the MoE should be responsible
            # for aggregating the results.
            self.all_to_sharded_linear_in_place(
                layer.block_sparse_moe.switch_mlp.gate_proj
            )
            self.sharded_to_all_linear_in_place(
                layer.block_sparse_moe.switch_mlp.down_proj
            )
            self.all_to_sharded_linear_in_place(
                layer.block_sparse_moe.switch_mlp.up_proj
            )
            layer.block_sparse_moe = ShardedQwenMoE(layer.block_sparse_moe) # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
            layer.block_sparse_moe.sharding_group = self.group # pyright: ignore[reportAttributeAccessIssue]

        return model
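
# MiniMax keeps its routed experts under block_sparse_moe.switch_mlp, which
# is close enough to the Qwen layout that ShardedQwenMoE is reused for the
# output all-reduce; the pyright ignores only paper over the nominal type
# difference.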


class QwenShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(
        self,
@@ -623,58 +455,3 @@ class ShardedQwenMoE(CustomMlxLayer):
        if self.sharding_group is not None:
            y = mx.distributed.all_sum(y, group=self.sharding_group)
        return y


class GptOssShardingStrategy(TensorParallelShardingStrategy):
    def shard_model(
        self,
        model: nn.Module,
        timeout_seconds: float,
        on_timeout: TimeoutCallback | None,
    ) -> nn.Module:
        model = cast(GptOssMoeModel, model)

        for layer in model.layers:
            eval_with_timeout(
                layer.parameters(), timeout_seconds / len(model.layers), on_timeout
            )
            layer.self_attn.q_proj = self.all_to_sharded_linear(layer.self_attn.q_proj)
            layer.self_attn.k_proj = self.all_to_sharded_linear(layer.self_attn.k_proj)
            layer.self_attn.v_proj = self.all_to_sharded_linear(layer.self_attn.v_proj)
            layer.self_attn.o_proj = self.sharded_to_all_linear(layer.self_attn.o_proj)

            layer.self_attn.num_attention_heads //= self.N
            layer.self_attn.num_key_value_heads //= self.N
            layer.self_attn.num_key_value_groups = (
                layer.self_attn.num_attention_heads
                // layer.self_attn.num_key_value_heads
            )

            layer.self_attn.sinks = layer.self_attn.sinks[
                layer.self_attn.num_attention_heads
                * self.group.rank() : layer.self_attn.num_attention_heads
                * (self.group.rank() + 1)
            ]

            self.all_to_sharded_linear_in_place(layer.mlp.experts.gate_proj)
            self.sharded_to_all_linear_in_place(layer.mlp.experts.down_proj)
            self.all_to_sharded_linear_in_place(layer.mlp.experts.up_proj)

            layer.mlp = ShardedGptOssMoE(layer.mlp) # type: ignore
            layer.mlp.sharding_group = self.group # pyright: ignore[reportAttributeAccessIssue]

        return model
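
# gpt-oss keeps one attention-sink logit per head, so after the head counts
# are divided by N the sinks vector is sliced to this rank's contiguous block
# of heads, and num_key_value_groups is recomputed so grouped-query attention
# still sees the right query-to-KV head ratio on each rank.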


class ShardedGptOssMoE(CustomMlxLayer):
    def __init__(self, layer: nn.Module):
        super().__init__(layer)
        self.sharding_group: mx.distributed.Group | None = None

    def __call__(self, x: mx.array) -> mx.array:
        if self.sharding_group is not None:
            x = sum_gradients(self.sharding_group)(x)
        y = self.original_layer(x)
        if self.sharding_group is not None:
            y = mx.distributed.all_sum(y, group=self.sharding_group)
        return y