Compare commits


1 Commit

Author    SHA1          Message                       Date
Evan      1bca96747d    add new kill bridge script    2026-01-20 14:03:14 +00:00
9 changed files with 241 additions and 88 deletions

View File

@@ -276,7 +276,9 @@ def test_placement_selects_leaf_nodes(
     # arrange
     topology = Topology()
-    model_card.storage_size = Memory.from_bytes(1000)
+    # Model requires more than any single node but fits within a 3-node cycle
+    model_card.storage_size.in_bytes = 1500
+    model_card.n_layers = 12
     node_id_a = NodeId()
     node_id_b = NodeId()
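
Note: for intuition, a tiny arithmetic check of the comment above, using hypothetical per-node capacities (the test's actual node memory values are not shown in this diff):

    # Hypothetical numbers for illustration only.
    model_bytes = 1500
    node_capacity = 600                        # assumed free memory per node
    assert model_bytes > node_capacity         # too large for any single node
    assert model_bytes <= 3 * node_capacity    # fits across a 3-node cycle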

View File

@@ -477,6 +477,53 @@ async def get_downloaded_size(path: Path) -> int:
     return 0
+async def download_progress_for_local_path(
+    repo_id: str, shard: ShardMetadata, local_path: Path
+) -> RepoDownloadProgress:
+    file_progress: dict[str, RepoFileDownloadProgress] = {}
+    total_files = 0
+    total_bytes = 0
+    if await aios.path.isdir(local_path):
+        for root, _, files in os.walk(local_path):
+            for f in files:
+                if f.endswith((".safetensors", ".bin", ".pt", ".gguf", ".json")):
+                    file_path = Path(root) / f
+                    size = (await aios.stat(file_path)).st_size
+                    rel_path = str(file_path.relative_to(local_path))
+                    file_progress[rel_path] = RepoFileDownloadProgress(
+                        repo_id=repo_id,
+                        repo_revision="local",
+                        file_path=rel_path,
+                        downloaded=Memory.from_bytes(size),
+                        downloaded_this_session=Memory.from_bytes(0),
+                        total=Memory.from_bytes(size),
+                        speed=0,
+                        eta=timedelta(0),
+                        status="complete",
+                        start_time=time.time(),
+                    )
+                    total_files += 1
+                    total_bytes += size
+    else:
+        raise ValueError(f"Local path {local_path} is not a directory")
+    return RepoDownloadProgress(
+        repo_id=repo_id,
+        repo_revision="local",
+        shard=shard,
+        completed_files=total_files,
+        total_files=total_files,
+        downloaded_bytes=Memory.from_bytes(total_bytes),
+        downloaded_bytes_this_session=Memory.from_bytes(0),
+        total_bytes=Memory.from_bytes(total_bytes),
+        overall_speed=0,
+        overall_eta=timedelta(0),
+        status="complete",
+        file_progress=file_progress,
+    )
 async def download_shard(
     shard: ShardMetadata,
     on_progress: Callable[[ShardMetadata, RepoDownloadProgress], Awaitable[None]],
@@ -487,6 +534,14 @@ async def download_shard(
     if not skip_download:
         logger.info(f"Downloading {shard.model_card.model_id=}")
+    # Handle local paths
+    if await aios.path.exists(str(shard.model_card.model_id)):
+        logger.info(f"Using local model path {shard.model_card.model_id}")
+        local_path = Path(str(shard.model_card.model_id))
+        return local_path, await download_progress_for_local_path(
+            str(shard.model_card.model_id), shard, local_path
+        )
     revision = "main"
     target_dir = await ensure_models_dir() / str(shard.model_card.model_id).replace(
         "/", "--"
@@ -497,8 +552,7 @@ async def download_shard(
     if not allow_patterns:
         allow_patterns = await resolve_allow_patterns(shard)
-    if not skip_download:
-        logger.info(f"Downloading {shard.model_card.model_id=} with {allow_patterns=}")
+    logger.info(f"Downloading {shard.model_card.model_id=} with {allow_patterns=}")
     all_start_time = time.time()
     # TODO: currently not recursive. Some models might require subdirectories - thus this will need to be changed.

View File

@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
 from collections.abc import Callable
 from functools import partial
 from inspect import signature
-from typing import TYPE_CHECKING, Any, cast
+from typing import TYPE_CHECKING, Any, Protocol, cast
 import mlx.core as mx
 import mlx.nn as nn
@@ -67,16 +67,27 @@ def eval_with_timeout(
         completed.set()
+class _LayerCallable(Protocol):
+    """Structural type that any compatible layer must satisfy.
+    We require a single positional input of type ``mx.array`` and an
+    ``mx.array`` output, while permitting arbitrary *args / **kwargs so this
+    protocol matches the vast majority of `mlx.nn.Module` subclasses.
+    """
+    def __call__(self, x: mx.array, *args: object, **kwargs: object) -> mx.array: ...
 class CustomMlxLayer(nn.Module):
     """Base class for replacing an MLX layer with a custom implementation."""
-    def __init__(self, original_layer: nn.Module):
+    def __init__(self, original_layer: _LayerCallable):
         super().__init__()
         object.__setattr__(self, "_original_layer", original_layer)
     @property
-    def original_layer(self) -> nn.Module:
-        return cast(nn.Module, object.__getattribute__(self, "_original_layer"))
+    def original_layer(self) -> _LayerCallable:
+        return cast(_LayerCallable, object.__getattribute__(self, "_original_layer"))
     # Calls __getattr__ for any attributes not found on nn.Module (e.g. use_sliding)
     if not TYPE_CHECKING:
@@ -89,53 +100,52 @@ class CustomMlxLayer(nn.Module):
             return getattr(original_layer, name)
-def patch_pipeline_first_layer(
-    pipeline_layer: nn.Module, group: mx.distributed.Group
-) -> nn.Module:
-    cls = type(pipeline_layer)
-    orig_call = cast(Callable[..., mx.array], cls.__call__)
-    rank = group.rank()
-    class PatchedFirstLayer(cls):
-        def __call__(self, x: mx.array, *args: object, **kwargs: object) -> mx.array:
-            if rank != 0:
-                x = mx.distributed.recv_like(x, (rank - 1), group=group)
-            return orig_call(self, x, *args, **kwargs)
-    pipeline_layer.__class__ = PatchedFirstLayer
-    return pipeline_layer
-def patch_pipeline_last_layer(
-    pipeline_layer: nn.Module, group: mx.distributed.Group
-) -> nn.Module:
-    cls = type(pipeline_layer)
-    orig_call = cast(Callable[..., mx.array], cls.__call__)
-    orig_call_sig = signature(orig_call)
-    rank = group.rank()
-    size = group.size()
-    class PatchedLastLayer(cls):
-        def __call__(self, x: mx.array, *args: object, **kwargs: object) -> mx.array:
-            cache = orig_call_sig.bind_partial(x, *args, **kwargs).arguments.get(
-                "cache", None
-            )
-            output: mx.array = orig_call(self, x, *args, **kwargs)
-            if rank != size - 1:
-                output = mx.distributed.send(output, (rank + 1) % size, group=group)
-            if cache is not None:
-                cache.keys = mx.depends(cache.keys, output)  # type: ignore[reportUnknownMemberType]
-            return output
-    pipeline_layer.__class__ = PatchedLastLayer
-    return pipeline_layer
+class PipelineFirstLayer(CustomMlxLayer):
+    def __init__(
+        self,
+        original_layer: _LayerCallable,
+        r: int,
+        group: mx.distributed.Group,
+    ):
+        super().__init__(original_layer)
+        self.r: int = r
+        self.group = group
+    def __call__(self, x: mx.array, *args: object, **kwargs: object) -> mx.array:
+        if self.r != 0:
+            x = mx.distributed.recv_like(x, (self.r - 1), group=self.group)
+        return self.original_layer(x, *args, **kwargs)
+class PipelineLastLayer(CustomMlxLayer):
+    def __init__(
+        self,
+        original_layer: _LayerCallable,
+        r: int,
+        s: int,
+        group: mx.distributed.Group,
+    ):
+        super().__init__(original_layer)
+        self.r: int = r
+        self.s: int = s
+        self.group = group
+        self.original_layer_signature = signature(self.original_layer.__call__)
+    def __call__(self, x: mx.array, *args: object, **kwargs: object) -> mx.array:
+        cache = self.original_layer_signature.bind_partial(
+            x, *args, **kwargs
+        ).arguments.get("cache", None)
+        output: mx.array = self.original_layer(x, *args, **kwargs)
+        if self.r != self.s - 1:
+            output = mx.distributed.send(
+                output, (self.r + 1) % self.s, group=self.group
+            )
+        if cache is not None:
+            cache.keys = mx.depends(cache.keys, output)  # type: ignore[reportUnknownMemberType]
+        return output
 def _inner_model(model: nn.Module) -> nn.Module:
@@ -150,13 +160,13 @@ def _inner_model(model: nn.Module) -> nn.Module:
raise ValueError("Model must either have a 'model' or 'transformer' attribute") raise ValueError("Model must either have a 'model' or 'transformer' attribute")
def _get_layers(inner_model_instance: nn.Module) -> list[nn.Module]: def _get_layers(inner_model_instance: nn.Module) -> list[_LayerCallable]:
# Handle both model.layers and model.h cases # Handle both model.layers and model.h cases
layers: list[nn.Module] layers: list[_LayerCallable]
if hasattr(inner_model_instance, "layers"): if hasattr(inner_model_instance, "layers"):
layers = cast(list[nn.Module], inner_model_instance.layers) layers = cast(list[_LayerCallable], inner_model_instance.layers)
elif hasattr(inner_model_instance, "h"): elif hasattr(inner_model_instance, "h"):
layers = cast(list[nn.Module], inner_model_instance.h) layers = cast(list[_LayerCallable], inner_model_instance.h)
else: else:
raise ValueError("Model must have either a 'layers' or 'h' attribute") raise ValueError("Model must have either a 'layers' or 'h' attribute")
@@ -181,12 +191,15 @@ def pipeline_auto_parallel(
     layers = _get_layers(inner_model_instance)
     start_layer, end_layer = model_shard_meta.start_layer, model_shard_meta.end_layer
+    device_rank, world_size = model_shard_meta.device_rank, model_shard_meta.world_size
     layers = layers[start_layer:end_layer]
-    layers[0] = patch_pipeline_first_layer(layers[0], group)
-    layers[-1] = patch_pipeline_last_layer(
+    layers[0] = PipelineFirstLayer(layers[0], device_rank, group=group)
+    layers[-1] = PipelineLastLayer(
         layers[-1],
-        group,
+        device_rank,
+        world_size,
+        group=group,
     )
     if isinstance(inner_model_instance, GptOssMoeModel):
@@ -433,7 +446,7 @@ class LlamaShardingStrategy(TensorParallelShardingStrategy):
         return model
-def _set_layers(model: nn.Module, layers: list[nn.Module]) -> None:
+def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:
     inner_model_instance = _inner_model(model)
     if hasattr(inner_model_instance, "layers"):
         inner_model_instance.layers = layers
@@ -508,17 +521,17 @@ class DeepSeekShardingStrategy(TensorParallelShardingStrategy):
 class ShardedDeepseekV3MoE(CustomMlxLayer):
-    def __init__(self, layer: nn.Module):
+    def __init__(self, layer: _LayerCallable):
         super().__init__(layer)
         self.sharding_group: mx.distributed.Group | None = None
     def __call__(self, x: mx.array) -> mx.array:
         if self.sharding_group is not None:
             x = sum_gradients(self.sharding_group)(x)
-        y = self.original_layer.__call__(x)  # type: ignore
+        y = self.original_layer.__call__(x)
         if self.sharding_group is not None:
-            y = mx.distributed.all_sum(y, group=self.sharding_group)  # type: ignore
-        return y  # type: ignore
+            y = mx.distributed.all_sum(y, group=self.sharding_group)
+        return y
 class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
@@ -552,7 +565,7 @@ class MiniMaxShardingStrategy(TensorParallelShardingStrategy):
             self.all_to_sharded_linear_in_place(
                 layer.block_sparse_moe.switch_mlp.up_proj
             )
-            layer.block_sparse_moe = ShardedQwenMoE(layer.block_sparse_moe)  # pyright: ignore[reportAttributeAccessIssue]
+            layer.block_sparse_moe = ShardedQwenMoE(layer.block_sparse_moe)  # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
             layer.block_sparse_moe.sharding_group = self.group  # pyright: ignore[reportAttributeAccessIssue]
         return model
@@ -586,7 +599,7 @@ class QwenShardingStrategy(TensorParallelShardingStrategy):
             self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.gate_proj)
             self.sharded_to_all_linear_in_place(layer.mlp.switch_mlp.down_proj)
             self.all_to_sharded_linear_in_place(layer.mlp.switch_mlp.up_proj)
-            layer.mlp = ShardedQwenMoE(layer.mlp)  # pyright: ignore[reportAttributeAccessIssue]
+            layer.mlp = ShardedQwenMoE(layer.mlp)  # pyright: ignore[reportAttributeAccessIssue, reportArgumentType]
             layer.mlp.sharding_group = self.group
             # Shard the MLP
@@ -599,17 +612,17 @@ class QwenShardingStrategy(TensorParallelShardingStrategy):
 class ShardedQwenMoE(CustomMlxLayer):
-    def __init__(self, layer: nn.Module):
+    def __init__(self, layer: _LayerCallable):
         super().__init__(layer)
         self.sharding_group: mx.distributed.Group | None = None
     def __call__(self, x: mx.array) -> mx.array:
         if self.sharding_group is not None:
             x = sum_gradients(self.sharding_group)(x)
-        y = self.original_layer.__call__(x)  # type: ignore
+        y = self.original_layer.__call__(x)
         if self.sharding_group is not None:
-            y = mx.distributed.all_sum(y, group=self.sharding_group)  # type: ignore
-        return y  # type: ignore
+            y = mx.distributed.all_sum(y, group=self.sharding_group)
+        return y
 class GptOssShardingStrategy(TensorParallelShardingStrategy):
@@ -661,7 +674,7 @@ class ShardedGptOssMoE(CustomMlxLayer):
     def __call__(self, x: mx.array) -> mx.array:
         if self.sharding_group is not None:
             x = sum_gradients(self.sharding_group)(x)
-        y = self.original_layer(x)  # type: ignore
+        y = self.original_layer(x)
         if self.sharding_group is not None:
-            y = mx.distributed.all_sum(y, group=self.sharding_group)  # type: ignore
-        return y  # type: ignore
+            y = mx.distributed.all_sum(y, group=self.sharding_group)
+        return y
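
Note: the point of _LayerCallable is structural typing. A layer never has to inherit from or register with the protocol; exposing a matching __call__ is enough, which is why the pyright ignores above can be narrowed. A minimal MLX-free sketch of the same idea (types and names here are illustrative, not part of the codebase):

    from typing import Protocol

    class VectorLayer(Protocol):
        # Any object with this call shape satisfies the protocol.
        def __call__(self, x: list[float], *args: object, **kwargs: object) -> list[float]: ...

    class Doubler:
        # Note: no inheritance from VectorLayer; the signature alone is enough.
        def __call__(self, x: list[float], *args: object, **kwargs: object) -> list[float]:
            return [2.0 * v for v in x]

    def apply_layer(layer: VectorLayer, x: list[float]) -> list[float]:
        return layer(x)

    print(apply_layer(Doubler(), [1.0, 2.0]))  # [2.0, 4.0]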

View File

@@ -413,6 +413,11 @@ class Worker:
             )
             for nid in conns:
                 for ip in conns[nid]:
+                    if "127.0.0.1" in ip or "localhost" in ip:
+                        logger.warning(
+                            f"Loopback connection should not happen: {ip=} for {nid=}"
+                        )
                     edge = SocketConnection(
                         # nonsense multiaddr
                         sink_multiaddr=Multiaddr(address=f"/ip4/{ip}/tcp/52415")
@@ -433,9 +438,6 @@ class Worker:
         for conn in self.state.topology.out_edges(self.node_id):
             if not isinstance(conn.edge, SocketConnection):
                 continue
-            # ignore mDNS discovered connections
-            if conn.edge.sink_multiaddr.port != 52415:
-                continue
             if (
                 conn.sink not in conns
                 or conn.edge.sink_multiaddr.ip_address
@@ -449,7 +451,7 @@ class Worker:
     async def _emit_existing_download_progress(self) -> None:
         try:
             while True:
-                logger.debug("Fetching and emitting existing download progress...")
+                logger.info("Fetching and emitting existing download progress...")
                 async for (
                     _,
                     progress,
@@ -480,7 +482,7 @@ class Worker:
                     await self.event_sender.send(
                         NodeDownloadProgress(download_progress=status)
                     )
-                logger.debug("Done emitting existing download progress.")
+                logger.info("Done emitting existing download progress.")
                 await anyio.sleep(5 * 60)  # 5 minutes
         except Exception as e:
             logger.error(f"Error emitting existing download progress: {e}")

View File

@@ -18,7 +18,7 @@ from exo.shared.types.tasks import ChatCompletionTaskParams
 from exo.shared.types.worker.shards import PipelineShardMetadata, TensorShardMetadata
 from exo.worker.engines.mlx import Model
 from exo.worker.engines.mlx.generator.generate import mlx_generate
-from exo.worker.engines.mlx.utils_mlx import shard_and_load, apply_chat_template
+from exo.worker.engines.mlx.utils_mlx import shard_and_load
 class MockLayer(nn.Module):
@@ -116,11 +116,12 @@ def run_gpt_oss_pipeline_device(
         messages=[ChatCompletionMessage(role="user", content=prompt_text)],
         max_tokens=max_tokens,
     )
-    prompt = apply_chat_template(tokenizer, task)
     generated_text = ""
     for response in mlx_generate(
-        model=model, tokenizer=tokenizer, task=task, prompt=prompt
+        model=model,
+        tokenizer=tokenizer,
+        task=task,
     ):
         generated_text += response.text
         if response.finish_reason is not None:
@@ -182,11 +183,11 @@ def run_gpt_oss_tensor_parallel_device(
         max_tokens=max_tokens,
     )
-    prompt = apply_chat_template(tokenizer, task)
     generated_text = ""
     for response in mlx_generate(
-        model=model, tokenizer=tokenizer, task=task, prompt=prompt
+        model=model,
+        tokenizer=tokenizer,
+        task=task,
     ):
         generated_text += response.text
         if response.finish_reason is not None:

View File

@@ -10,8 +10,8 @@ import pytest
 from exo.worker.engines.mlx.auto_parallel import (
     CustomMlxLayer,
-    patch_pipeline_first_layer,
-    patch_pipeline_last_layer,
+    PipelineFirstLayer,
+    PipelineLastLayer,
     patch_pipeline_model,
 )
 from exo.worker.tests.unittests.test_mlx.conftest import MockLayer
@@ -50,8 +50,8 @@ def run_pipeline_device(
     group = mx.distributed.init(backend="ring", strict=True)
     mock = MockLayerInner()
-    first = patch_pipeline_first_layer(mock, group)
-    composed = patch_pipeline_last_layer(first, group)
+    first = PipelineFirstLayer(mock, r=rank, group=group)
+    composed = PipelineLastLayer(first, r=rank, s=world_size, group=group)
     # Wrap in a mock model, then wrap in PipelineParallelModel for all_gather
     inner_model = MockModel([composed])
@@ -78,8 +78,8 @@ def test_composed_wrappers_delegate_attributes() -> None:
     mock = MockLayer()
     group = mx.distributed.init()
-    first = patch_pipeline_first_layer(mock, group)
-    composed = patch_pipeline_last_layer(first, group)
+    first = PipelineFirstLayer(mock, r=0, group=group)
+    composed = PipelineLastLayer(first, r=0, s=1, group=group)
     assert composed.custom_attr == "test_value"  # type: ignore[attr-defined]
     assert composed.use_sliding is True  # type: ignore[attr-defined]
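
Note: the delegation these assertions rely on comes from CustomMlxLayer storing the wrapped layer with object.__setattr__ and forwarding unknown attributes through __getattr__. A minimal standalone sketch of that pattern (names are illustrative, not the project API):

    class Wrapper:
        def __init__(self, inner: object):
            object.__setattr__(self, "_inner", inner)

        def __getattr__(self, name: str) -> object:
            # Only called for attributes not found on Wrapper itself.
            return getattr(object.__getattribute__(self, "_inner"), name)

    class Layer:
        custom_attr = "test_value"

    assert Wrapper(Layer()).custom_attr == "test_value"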

View File

@@ -18,7 +18,6 @@ def _check_model_exists() -> bool:
 pytestmark = [
-    pytest.mark.slow,
     pytest.mark.skipif(
         not _check_model_exists(),
         reason=f"GPT-OSS model not found at {DEFAULT_GPT_OSS_CONFIG.model_path}",

View File

@@ -89,8 +89,6 @@ def get_test_models() -> list[tuple[str, ModelCard]]:
 TEST_MODELS: list[tuple[str, ModelCard]] = get_test_models()
-pytestmark = pytest.mark.slow
 @pytest.fixture(scope="module")
 def event_loop():

tmp/kill_bridge_plist.sh (new file, 84 lines added)
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env bash
set -euo pipefail
PREFS="${PREFS:-/Library/Preferences/SystemConfiguration/preferences.plist}"
tmpdir="$(mktemp -d)"
trap 'rm -rf "$tmpdir"' EXIT
injson="$tmpdir/in.json"
outjson="$tmpdir/out.json"
plutil -convert json -o "$injson" "$PREFS"
perl -Mstrict -Mwarnings -MJSON::PP -e '
  my ($in, $out) = @ARGV;

  # Read the JSON-converted preferences file.
  open my $fh, "<", $in or die "open $in: $!";
  local $/;
  my $txt = <$fh>;
  close $fh;

  my $json = JSON::PP->new->utf8->relaxed(1);
  my $d = $json->decode($txt);

  # Drop the bridge0 virtual interface definition.
  if (ref($d->{VirtualNetworkInterfaces}) eq "HASH"
      && ref($d->{VirtualNetworkInterfaces}{Bridge}) eq "HASH") {
    delete $d->{VirtualNetworkInterfaces}{Bridge}{bridge0};
  }

  # Collect and remove every network service bound to bridge0.
  my @bridge_svcs;
  if (ref($d->{NetworkServices}) eq "HASH") {
    for my $k (keys %{ $d->{NetworkServices} }) {
      my $svc = $d->{NetworkServices}{$k};
      next unless ref($svc) eq "HASH";
      my $iface = $svc->{Interface};
      next unless ref($iface) eq "HASH";
      my $dev = $iface->{DeviceName};
      if (defined $dev && $dev eq "bridge0") {
        push @bridge_svcs, $k;
      }
    }
    delete @{ $d->{NetworkServices} }{ @bridge_svcs } if @bridge_svcs;
  }

  my %is_bridge = map { $_ => 1 } @bridge_svcs;

  # Scrub bridge0 and its services from every location (Set),
  # including the IPv4 service order.
  if (ref($d->{Sets}) eq "HASH") {
    for my $setk (keys %{ $d->{Sets} }) {
      my $set = $d->{Sets}{$setk};
      next unless ref($set) eq "HASH";
      my $net = $set->{Network};
      next unless ref($net) eq "HASH";
      if (ref($net->{Interface}) eq "HASH") {
        delete $net->{Interface}{bridge0};
      }
      if (ref($net->{Service}) eq "HASH" && @bridge_svcs) {
        for my $svc (@bridge_svcs) {
          delete $net->{Service}{$svc};
        }
      }
      my $g = $net->{Global};
      if (ref($g) eq "HASH"
          && ref($g->{IPv4}) eq "HASH"
          && ref($g->{IPv4}{ServiceOrder}) eq "ARRAY"
          && @bridge_svcs) {
        my @so = @{ $g->{IPv4}{ServiceOrder} };
        @so = grep { !defined($_) || !$is_bridge{$_} } @so;
        $g->{IPv4}{ServiceOrder} = \@so;
      }
    }
  }

  # Write the cleaned structure back out as JSON.
  open my $oh, ">", $out or die "open $out: $!";
  print $oh JSON::PP->new->utf8->canonical(1)->pretty(1)->encode($d);
  close $oh;
' "$injson" "$outjson"
# Convert JSON -> plist (write back as binary1; change to xml1 if you prefer)
plutil -convert xml1 -o "$PREFS" "$outjson"
# Ask configd to reload SystemConfiguration state
killall -HUP configd 2>/dev/null || true
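
Note: a possible way to run it (untested sketch; the script rewrites the system preferences plist, so it needs root, and backing the file up first seems prudent):

    sudo cp /Library/Preferences/SystemConfiguration/preferences.plist /tmp/preferences.plist.bak
    sudo bash tmp/kill_bridge_plist.sh
    # To rehearse against a copy instead, point PREFS at it:
    # PREFS=/tmp/preferences.plist.bak bash tmp/kill_bridge_plist.sh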