Compare commits

..

1 Commits

Author SHA1 Message Date
Evan Quiney
ba611f9cd0 Revert "report macmon failures more aggressively (#1618)" (#1625)
this pr broke macmon - revert it
2026-02-25 19:15:55 +00:00
2 changed files with 18 additions and 18 deletions

View File

@@ -542,13 +542,10 @@ class InfoGatherer:
if not p.stdout:
logger.critical("MacMon closed stdout")
return
t = TextReceiveStream(BufferedByteReceiveStream(p.stdout))
while True:
with anyio.fail_after(self.macmon_interval * 3):
macmon_output = await t.receive()
await self.info_sender.send(
MacmonMetrics.from_raw_json(macmon_output)
)
async for text in TextReceiveStream(
BufferedByteReceiveStream(p.stdout)
):
await self.info_sender.send(MacmonMetrics.from_raw_json(text))
except CalledProcessError as e:
stderr_msg = "no stderr"
stderr_output = cast(bytes | str | None, e.stderr)
@@ -559,12 +556,8 @@ class InfoGatherer:
else str(stderr_output)
)
logger.warning(
f"memory monitor failed with return code {e.returncode}: {stderr_msg}"
)
except TimeoutError:
logger.warning(
f"memory monitor silent for {self.macmon_interval * 3}s - reloading"
f"MacMon failed with return code {e.returncode}: {stderr_msg}"
)
except Exception as e:
logger.opt(exception=e).warning("Error in memory monitor")
logger.warning(f"Error in macmon monitor: {e}")
await anyio.sleep(self.macmon_interval)

View File

@@ -2,7 +2,6 @@ import json
import os
import re
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, cast
@@ -99,13 +98,14 @@ def mlx_distributed_init(
rank = bound_instance.bound_shard.device_rank
logger.info(f"Starting initialization for rank {rank}")
with tempfile.TemporaryDirectory() as tmpdir:
coordination_file = str(
Path(tmpdir) / f"hosts_{bound_instance.instance.instance_id}_{rank}.json"
)
coordination_file = None
try:
# TODO: singleton instances
match bound_instance.instance:
case MlxRingInstance(hosts_by_node=hosts_by_node, ephemeral_port=_):
coordination_file = (
f"./hosts_{bound_instance.instance.instance_id}_{rank}.json"
)
hosts_for_node = hosts_by_node[bound_instance.bound_node_id]
hosts_json = HostList.from_hosts(hosts_for_node).model_dump_json()
@@ -128,6 +128,9 @@ def mlx_distributed_init(
jaccl_devices[i][i] is None for i in range(len(jaccl_devices))
)
# Use RDMA connectivity matrix
coordination_file = (
f"./hosts_{bound_instance.instance.instance_id}_{rank}.json"
)
jaccl_devices_json = json.dumps(jaccl_devices)
with open(coordination_file, "w") as f:
@@ -147,6 +150,10 @@ def mlx_distributed_init(
logger.info(f"Rank {rank} mlx distributed initialization complete")
return group
finally:
with contextlib.suppress(FileNotFoundError):
if coordination_file:
os.remove(coordination_file)
def initialize_mlx(