Compare commits

..

44 Commits

Author SHA1 Message Date
Alex Cheema
17cb9e5425 fix: resolve post-rebase type errors (duplicate decls, missing tasks param)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
b72a5497e3 fix: cancel active tasks on meta-instance cascade delete
Previously, DeleteMetaInstance cascade-deleted backing instances without
cancelling their active tasks, leaving orphaned task references. Now emits
TaskStatusUpdated(Cancelled) for Pending/Running tasks before InstanceDeleted.

Also adds lifecycle logging for meta-instance operations, a GET /meta_instances
endpoint, and 2 regression tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
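A minimal sketch of the event ordering this commit describes, using the event names from the message (TaskStatusUpdated, InstanceDeleted, MetaInstanceDeleted); the state shape and the helper itself are assumptions, not the repo's actual handler:

```python
# Hypothetical helper: collect cascade-delete events for one meta-instance,
# cancelling live tasks on each backing instance before deleting it.
def cascade_delete_events(state, meta_instance_id):
    events = []
    for instance_id, instance in state.instances.items():
        if instance.meta_instance_id != meta_instance_id:
            continue
        for task in state.tasks.values():
            if task.instance_id == instance_id and task.task_status in (
                TaskStatus.Pending,
                TaskStatus.Running,
            ):
                # Cancel before InstanceDeleted so no task reference is orphaned.
                events.append(TaskStatusUpdated(
                    task_id=task.task_id, task_status=TaskStatus.Cancelled
                ))
        events.append(InstanceDeleted(instance_id=instance_id))
    events.append(MetaInstanceDeleted(meta_instance_id=meta_instance_id))
    return events
```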
Alex Cheema
d379aba2a3 fix: dashboard derived reactivity bug and tautological check in meta-instance UI
1. DERIVED REACTIVITY BUG: `unifiedDisplayItems` used `$derived(fn)` which
   made the derived value the function itself instead of its result. Svelte
   never tracked reactive dependencies in the function body, so the instance
   list didn't update when metaInstances or instances changed. Fixed by using
   `$derived.by(fn)` and removing the `()` call-sites in the template.

2. TAUTOLOGICAL CHECK: In `getMetaInstancePlacingStatus`, the `lastError ? ...
   : null` guard inside the `failures > 0` branch was always true because
   `lastFailureError` and `consecutiveFailures` are always set together in
   `apply_instance_retrying` and `apply_instance_deleted`. Removed the dead
   `: null` branch.

Also fixes pyright errors in test file by using proper pytest.MonkeyPatch type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
5db1d023ec fix: add timeout and error handling for ModelCard.load in MetaInstanceReconciler
ModelCard.load() does async I/O inside the 1-second reconcile loop. A slow
or failing load blocked all reconciliation (health checks, node timeouts,
other meta-instances). Adds a 10-second timeout, per-meta-instance error
handling with MetaInstancePlacementFailed events, and documents the
intentional early return in apply_instance_retrying.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
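A sketch of the timeout pattern, assuming anyio (which the API code in the diffs below already uses) and that ModelCard.load takes a model id; the MetaInstancePlacementFailed field names are illustrative:

```python
import anyio

async def load_card_with_timeout(meta_instance, events):
    # Bound the async I/O so one slow or failing load cannot stall the
    # 1-second reconcile loop for every other meta-instance.
    try:
        with anyio.fail_after(10):
            return await ModelCard.load(meta_instance.model_id)
    except TimeoutError:
        events.append(MetaInstancePlacementFailed(
            meta_instance_id=meta_instance.id,
            error="timed out loading model card after 10s",
        ))
    except Exception as exc:
        # Per-meta-instance handling: one bad model card does not block
        # placement of the others.
        events.append(MetaInstancePlacementFailed(
            meta_instance_id=meta_instance.id, error=str(exc)
        ))
    return None
```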
Alex Cheema
bd148d815a fix: validate placement before sending command to prevent silent failures (BUG-001c)
The place_instance API endpoint used fire-and-forget: it sent the command
and returned HTTP 200 immediately. On a fresh cluster start, the master's
state often lacks topology/memory data, so placement raises ValueError
which was silently caught and logged. The caller never learned it failed.

Two fixes:
- API: validate placement locally before sending, return HTTP 400 on
  failure instead of silently accepting an unprocessable command
- Master: emit MetaInstancePlacementFailed on immediate placement error
  in CreateMetaInstance handler so the error surfaces in state right away

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
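A hypothetical sketch of the API-side check; `validate_placement`, `http_400`/`http_ok`, and the state fields stand in for the repo's real placement helper, web framework, and state object:

```python
async def place_instance(request):
    try:
        # Dry-run the same placement logic against the API's local view of
        # state; a fresh cluster with no topology/memory data fails here.
        validate_placement(
            request.meta_instance,
            instances=state.instances,
            node_memory=state.node_memory,
            node_network=state.node_network,
        )
    except ValueError as exc:
        return http_400(f"placement failed: {exc}")
    # Only send the command once we know the master can process it.
    await command_sender.send(CreateMetaInstance(meta_instance=request.meta_instance))
    return http_ok()
```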
Alex Cheema
fe508320e3 fix: place_instance now uses all available nodes to prevent OOM (BUG-001d)
The placement algorithm previously selected the smallest viable cycle,
causing large models to be distributed across too few nodes and running
out of memory. Changed get_smallest_cycles to get_largest_cycles so that
all healthy nodes are utilized, spreading layers more evenly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
2d37a2be16 feat: add task cancellation for client disconnect handling (BUG-001)
- Add TaskCancelled command and Cancelled task status
- Detect API client disconnects in master/api.py
- Handle TaskCancelled in master state machine
- Add _cancel_tasks to worker for graceful task cleanup
- Add cancel_receiver to runner for inference abort
- Add mx_any helper in MLX utils for cancellable operations
- Guard instance lookup in worker to prevent KeyError
- Update tests for cancellation flow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
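The disconnect path mirrors the api.py hunks in the diff below; `stream_tokens` is a hypothetical stand-in for the real token-streaming loop:

```python
import anyio

async def stream_response(command_id, command_sender, node_id):
    try:
        async for chunk in stream_tokens(command_id):
            yield chunk
    except anyio.get_cancelled_exc_class():
        # The client disconnected: ask the master to cancel the backing task,
        # from a shielded scope so the send itself cannot be cancelled.
        command = TaskCancelled(cancelled_command_id=command_id)
        with anyio.CancelScope(shield=True):
            await command_sender.send(
                ForwarderCommand(origin=node_id, command=command)
            )
        raise
```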
Alex Cheema
ce06eab63b Fix model discovery reporting DownloadPending for fully-downloaded models
On startup, _emit_existing_download_progress() used
downloaded_bytes_this_session to decide between DownloadPending and
DownloadOngoing. Since downloaded_bytes_this_session is always 0 on
startup (it tracks the current session only), fully-downloaded models
were incorrectly reported as DownloadPending.

Now checks actual disk state: if downloaded_bytes >= total_bytes, emit
DownloadCompleted regardless of session bytes. This fixes the UI showing
models as pending when they're already available.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
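A sketch of the corrected check; the progress field names follow the DownloadCoordinator hunk further down, and the DownloadCompleted/DownloadOngoing keyword arguments are assumed to mirror DownloadPending's:

```python
if progress.downloaded_bytes.in_bytes >= progress.total_bytes.in_bytes:
    # Everything is already on disk, so report completed even though this
    # session has downloaded zero bytes.
    status = DownloadCompleted(node_id=self.node_id, shard_metadata=progress.shard)
elif progress.downloaded_bytes.in_bytes == 0:
    status = DownloadPending(node_id=self.node_id, shard_metadata=progress.shard)
else:
    status = DownloadOngoing(node_id=self.node_id, shard_metadata=progress.shard)
```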
Alex Cheema
b59a5ade51 fix: use force=True for multiprocessing set_start_method
Prevents RuntimeError when the context has already been set,
e.g. when Terminal.app reuses a tab or the process restarts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
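For reference, the stdlib call in question; "spawn" is shown here because it is the macOS default the surrounding commits rely on:

```python
import multiprocessing

# force=True replaces any start method already set in this interpreter,
# instead of raising "RuntimeError: context has already been set".
multiprocessing.set_start_method("spawn", force=True)
```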
Alex Cheema
010e4fe96d fix: eliminate command/reconciler interleaving race in meta-instance
Two race conditions existed in the meta-instance lifecycle:

1. CreateMetaInstance buffered MetaInstanceCreated but didn't apply it
   before awaiting ModelCard.load(). The reconciler could interleave
   during the await, leading to duplicate placements.

   Fix: apply MetaInstanceCreated eagerly via _apply_and_broadcast,
   then re-check satisfaction after the await so placement uses fresh
   state and skips if the reconciler already handled it.

2. delete_meta_instance (API handler) sent DeleteMetaInstance then
   read self.state.instances for cascade deletion. State was stale,
   so backing instances created between the send and the read were
   missed — permanently orphaning them.

   Fix: move cascade delete into the command processor's
   DeleteMetaInstance handler, where InstanceDeleted events are
   generated atomically with MetaInstanceDeleted.

Reproduced on a 4-node Mac Mini cluster: 28K anomalies in a stress test,
including 21 permanently orphaned instances. After the fix, the cascade
delete and placement are race-free.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
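A sketch of the reordered CreateMetaInstance handler described in point 1; `meta_instance_is_satisfied` and `place_events` are illustrative stand-ins and the method signatures are assumed:

```python
async def handle_create_meta_instance(self, cmd):
    # Apply eagerly so the reconciler sees the meta-instance if it ticks
    # while we are awaiting the model card load.
    self._apply_and_broadcast([MetaInstanceCreated(meta_instance=cmd.meta_instance)])
    card = await ModelCard.load(cmd.meta_instance.model_id)  # reconciler may interleave here
    # Re-check against fresh state: skip placement if the reconciler already
    # created a backing instance during the await.
    if meta_instance_is_satisfied(self.state, cmd.meta_instance.id):
        return
    self._apply_and_broadcast(place_events(self.state, cmd.meta_instance, card))
```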
Alex Cheema
2301dde064 test: add 25 edge-case tests for MetaInstance lifecycle
Cover retry logic, error handling, backward compatibility,
concurrent scenarios, placement error tracking, and serialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
5b1ec42632 Fix JACCL SideChannel bytes serialization for JSON round-trip
TaggedModel's wrap validator converts JSON→Python validation context,
which breaks strict-mode bytes deserialization from JSON strings.
Use Base64Bytes type to encode/decode bytes as base64 strings in JSON.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
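A self-contained illustration of the Base64Bytes round-trip (the model and field names here are made up, not the actual SideChannel types):

```python
from pydantic import Base64Bytes, BaseModel

class SideChannelFrame(BaseModel):
    # Validation decodes a base64 JSON string into bytes; serialization
    # encodes back to base64, which sidesteps the strict-mode
    # bytes-from-JSON-string problem described above.
    payload: Base64Bytes

frame = SideChannelFrame.model_validate_json('{"payload": "AAEC/w=="}')
assert frame.payload == b"\x00\x01\x02\xff"
assert "AAEC" in frame.model_dump_json()
```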
Alex Cheema
b7d64c94b4 Use named pipes (FIFOs) for JACCL SideChannel relay
Anonymous pipes from os.pipe() don't survive multiprocessing.Process
spawn on macOS (default since Python 3.8). The FD numbers are passed
but the actual file descriptors don't exist in the child process,
causing EBADF errors.

Switch to named pipes (FIFOs) which the child opens by path in the
spawned process, getting valid FDs for the C++ SideChannel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
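A self-contained sketch of the FIFO approach: the child opens the pipe by path after spawn instead of relying on inherited FD numbers (paths and names here are illustrative):

```python
import multiprocessing
import os
import tempfile

def child(path: str) -> None:
    # The spawned process gets a valid FD by opening the FIFO by path; an
    # inherited os.pipe() FD number would be dangling here (EBADF).
    fd = os.open(path, os.O_RDONLY)
    try:
        print("child read:", os.read(fd, 5))
    finally:
        os.close(fd)

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn", force=True)
    fifo_path = os.path.join(tempfile.mkdtemp(), "side_channel.fifo")
    os.mkfifo(fifo_path)
    p = multiprocessing.Process(target=child, args=(fifo_path,))
    p.start()
    wfd = os.open(fifo_path, os.O_WRONLY)  # blocks until the child opens the read end
    os.write(wfd, b"hello")
    os.close(wfd)
    p.join()
```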
Alex Cheema
84ecd01a98 Add pipe-based JACCL SideChannel relay via exo control plane
Replace fragile TCP SideChannel with anonymous pipes relayed through
exo's event-sourced control plane. RunnerSupervisor creates pipe pairs
for MlxJaccl instances and relays all_gather rounds via JacclSideChannelData/
JacclSideChannelGathered events through the master, eliminating errno=57
crashes from Thunderbolt RDMA driver instability.

Also includes dashboard RDMA warning improvements and instance retry fixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
0f28c7280d Preserve last_failure_error across instance recreation, fix RDMA banner wording
- apply_instance_created no longer clears last_failure_error so the
  error context persists while the new instance starts up
- Dashboard retryError shows the error without (N/3) prefix when
  consecutiveFailures is 0 (instance was recreated)
- Jaccl warning tooltip now says "experimental RDMA driver in macOS"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
6f22f38df5 chore: remove temporary screenshot files
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
d97bce053e temp: add jaccl warning screenshots for PR comment
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
6a12efb669 dashboard: show warning banner for [jaccl] RDMA driver errors
Detect errors containing "[jaccl]" in MetaInstance failure errors and
display a red dismissible alert banner. The tooltip explains this is a
macOS RDMA driver issue and that the affected machine needs to be
restarted. Alert re-appears if a new error arrives after dismissal.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
388e28eace Retry runners within the same Instance instead of recreating
When runners fail for a MetaInstance-backed Instance, retry up to 3
times by restarting runners within the same Instance rather than
deleting and recreating it each time. After 3 failures, delete the
Instance so MetaInstanceReconciler can create a fresh one.

- Add InstanceRetrying event that removes runners from state (signaling
  workers to restart) and increments consecutive_failures on MetaInstance
- InstanceHealthReconciler emits InstanceRetrying when under retry limit,
  InstanceDeleted when exhausted or no MetaInstance
- Worker _kill_runner detects retry signal (runner deleted from state +
  terminal supervisor) and cleans up for _create_runner to recreate
- Worker _create_runner guards against oscillation by blocking creation
  while any peer runner has explicit terminal status
- InstanceCreated resets consecutive_failures for fresh starts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
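A sketch of the retry-or-delete decision this commit describes (event constructors and field names are assumptions):

```python
MAX_RETRIES = 3

def on_runner_failure(instance, meta_instance):
    if meta_instance is None:
        # Not backed by a MetaInstance: nothing will recreate it, just delete.
        return InstanceDeleted(instance_id=instance.id)
    if meta_instance.consecutive_failures < MAX_RETRIES:
        # Remove runners from state so workers restart them in place.
        return InstanceRetrying(instance_id=instance.id)
    # Retries exhausted: delete so MetaInstanceReconciler places a fresh instance.
    return InstanceDeleted(instance_id=instance.id)
```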
Alex Cheema
675041748a Remove timestamp-based retry cooldown
Remove last_failure_at field and RETRY_COOLDOWN_SECONDS logic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
1a69e14e79 Consolidate failure state onto MetaInstance, add 5s retry cooldown
Move placement_error, consecutive_failures, last_failure_error, and
last_failure_at directly onto the MetaInstance model instead of keeping
them as separate State mappings (meta_instance_errors, InstanceFailureInfo,
meta_instance_failure_info). Adds a 5-second cooldown between retry attempts
to prevent rapid instance churn when runners fail instantly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
0a8446cbcc Show retry attempt count with error message, e.g. (2/3)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
3a998410e5 Include node friendly names in runner error messages
Each error in the combined message is now prefixed with the node's friendly
name (e.g. "MacBook Pro: OOM; Mac Studio: connection reset") so the root
cause node is easily identifiable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
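A minimal sketch of the combined-message format matching the example in the message (the helper name and argument shapes are illustrative):

```python
def combine_runner_errors(
    errors_by_node: dict[str, str], friendly_names: dict[str, str]
) -> str:
    # e.g. "MacBook Pro: OOM; Mac Studio: connection reset"
    return "; ".join(
        f"{friendly_names.get(node_id, node_id)}: {message}"
        for node_id, message in errors_by_node.items()
    )
```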
Alex Cheema
4af9b32f19 Remove permanent retry blocking, allow continuous retry batches
The dashboard % 3 logic already handles displaying retry progress in batches
(RETRYING 1/3, 2/3, 3/3, then PLACING with error, repeat). No need to
permanently block placement after 3 failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
972cd53521 Show retry count in exceeded retry limit message (3/3)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
ab5c86d478 Collect all runner error messages instead of just the last one
When multiple runners fail, concatenate all error messages with "; " so the
real error isn't hidden by generic side-effect failures from other runners.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
4cff623b84 Stop infinite retries after 3 failures, show errors persistently in dashboard
MetaInstanceReconciler now checks failure count before placement — after 3
consecutive failures it emits MetaInstancePlacementFailed instead of retrying
forever. Dashboard shows "Retrying after error: <msg>" in orange throughout
the retry cycle, not just during the brief window with no backing instance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
7c90c43509 Add instance retry logic with max 3 retries and failure tracking
- Extend InstanceDeleted with failure_error field for runner crash info
- Add InstanceFailureInfo model tracking consecutive failures per MetaInstance
- InstanceHealthReconciler now detects runner failures (all terminal with
  at least one RunnerFailed) in addition to connection failures
- apply_instance_deleted increments failure counter for meta-bound instances
- Dashboard shows RETRYING (N/3) status with error messages, and
  "Instance re-created due to failure" after 3 consecutive failures
- Extract and display RunnerFailed error messages in instance status

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
fba47b802d Fix MetaInstance.node_ids frozenset failing JSON deserialization
frozenset serializes to a JSON array but cannot be deserialized back
in strict mode through the TaggedModel wrap validator (list → frozenset
coercion is rejected). Changed to list[NodeId] since the model is
already frozen/immutable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
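A self-contained repro of the strict-mode coercion failure described here; the model and field names are illustrative, and in the repo the python-mode re-validation comes from the TaggedModel wrap validator:

```python
from pydantic import BaseModel, ConfigDict, ValidationError

class StrictFrozenset(BaseModel):
    model_config = ConfigDict(strict=True, frozen=True)
    node_ids: frozenset[str]

class StrictList(BaseModel):
    model_config = ConfigDict(strict=True, frozen=True)
    node_ids: list[str]

try:
    # A JSON array arrives as a Python list; strict mode refuses to coerce it.
    StrictFrozenset.model_validate({"node_ids": ["node-a", "node-b"]})
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # strict frozenset error

# list[str] accepts it, and the frozen model keeps it effectively immutable.
StrictList.model_validate({"node_ids": ["node-a", "node-b"]})
```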
Alex Cheema
8dd19c9817 Send node_ids from placement preview when launching instances
The dashboard now extracts node IDs from the selected preview's
memory_delta_by_node, ensuring the backend places on exactly the
nodes the user was shown. Also reverts incorrect RDMA min_nodes >= 2
enforcement since single-node RDMA is valid.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
22026fd31f Enforce min_nodes >= 2 for RDMA (MlxJaccl) instances
RDMA requires at least 2 nodes — a single-node RDMA instance is
nonsensical. Enforce this in both the dashboard (when building the
launch request) and the backend placement (when filtering cycles).
Previously, selecting RDMA would still place on 1 node because
min_nodes defaulted to 1 and the placement silently switched to Ring.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
d7066e7531 Ensure min_nodes >= node filter size when launching
When user selects specific nodes via the filter, min_nodes should be at
least the number of filtered nodes to prevent placement from picking a
smaller cycle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
8840dfad1d Send node_ids from dashboard, error on RDMA when unavailable
Dashboard was not including the user's node filter in the POST to
/meta_instance, so placement ignored which nodes the user selected.
Also, placement silently fell back to Ring when RDMA was requested but
no RDMA-connected cycles were available — now raises an error that
surfaces via MetaInstancePlacementFailed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
e29c2ba170 Fix use_default validator silently ignoring sharding/instance_meta
The mode="plain" validator bypassed Pydantic's string-to-enum coercion,
so JSON strings like "Tensor" and "MlxJaccl" from the dashboard failed
the isinstance check and silently fell back to Pipeline/MlxRing defaults.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
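A minimal reproduction of the mode="plain" pitfall, assuming the validator looked roughly like this (class and field names are illustrative, not the repo's exact models):

```python
from enum import Enum
from pydantic import BaseModel, field_validator

class Sharding(str, Enum):
    Pipeline = "Pipeline"
    Tensor = "Tensor"

class LaunchRequest(BaseModel):
    sharding: Sharding = Sharding.Pipeline

    # mode="plain" replaces Pydantic's own validation, so the JSON string
    # "Tensor" is never coerced to Sharding.Tensor before this function runs.
    @field_validator("sharding", mode="plain")
    @classmethod
    def use_default(cls, value: object) -> Sharding:
        return value if isinstance(value, Sharding) else Sharding.Pipeline

print(LaunchRequest.model_validate({"sharding": "Tensor"}).sharding)
# Sharding.Pipeline: the user's selection was silently dropped
```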
Alex Cheema
d303df2848 Add placement error feedback and per-node loading status
Show why MetaInstance placement fails instead of leaving the status stuck at
"PLACING", and show per-node runner status while multi-node instances load.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
d79e1243e3 Show MetaInstance sharding/type while PLACING, fix MlxIbv references
When a MetaInstance has no backing instance yet, derive the strategy
display from the MetaInstance's own sharding and instanceMeta fields
rather than showing "Unknown (Unknown)".

Also clean up all stale MlxIbv references across the dashboard —
the backend enum is MlxJaccl.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
00e7f7942b Extract reconciler into ProcessManager protocol, fix RDMA instance type
- Replace inline _plan() with ProcessManager loop (_reconcile), tick
  every 1s instead of 10s — safe because all PMs are idempotent
- Fix dashboard sending "MlxIbv" instead of "MlxJaccl" for RDMA
  instance type, which silently fell back to MlxRing default
- Remove all stale MlxIbv references from dashboard

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
74d0577b2f Extract reconciler into ProcessManager protocol
Replace inline _plan() steps with a list of ProcessManagers, each
implementing async reconcile(State) -> Sequence[Event]. Tick every
1s instead of 10s — safe because all PMs are idempotent against state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
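The protocol in question, sketched from the description in these two commits; State and Event are the repo's types, referenced here only as forward declarations:

```python
from collections.abc import Sequence
from typing import Protocol

class ProcessManager(Protocol):
    # Each PM inspects current state and returns the events needed to move it
    # toward the desired state; ticking every 1s is safe because a reconcile
    # that finds nothing to do simply returns no events (idempotent).
    async def reconcile(self, state: "State") -> Sequence["Event"]: ...
```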
Alex Cheema
ad8db70915 Simplify MetaInstance binding: put meta_instance_id on Instance
The separate MetaInstanceBound event + meta_instance_backing map
introduced two bugs: stale exclusion sets in the reconciler loop and
a delete ordering race. Embedding meta_instance_id directly on
BaseInstance eliminates the binding mechanism entirely — when an
instance is created for a MetaInstance it carries the ID, when
deleted the binding is gone. No separate map, no cleanup, no races.

Also fixes delete_meta_instance to cascade-delete backing instances.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:10 -08:00
Alex Cheema
eebd087977 Add explicit MetaInstance binding, slim MetaInstance to use ModelId
- Add MetaInstanceBound event and meta_instance_backing State field
  for explicit MetaInstance → Instance binding (prevents ambiguous
  linking when two MetaInstances have identical constraints)
- Replace model_card: ModelCard with model_id: ModelId on MetaInstance
  (load ModelCard on-demand at placement time)
- Add MetaInstance API endpoints (POST /meta_instance, DELETE)
- Update dashboard to use MetaInstances as primary primitive with
  unified display items merging MetaInstances and orphan instances
- Dashboard launches via MetaInstance instead of direct Instance creation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:09 -08:00
Alex Cheema
35c992d3b1 Add MetaInstance declarative layer with connection health checking
Introduces MetaInstance as a declarative constraint ensuring an instance
matching given parameters (model, sharding, min_nodes) always exists.
The master's reconciliation loop continuously checks for unsatisfied
meta-instances and attempts placement. Connection health checking
verifies that specific IPs (MlxRing) and RDMA interfaces (MlxJaccl)
stored on instances still exist as topology edges, enabling automatic
recovery when cables are swapped or interfaces change.

Also eliminates the master's loopback event path, unifying all event
emission through _apply_and_broadcast for simpler control flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:09:09 -08:00
rltakashige
f2be929211 Leo/address rdma gpu locks 2 (#1515)
Same as #1489. Had to revert and redo thanks to Claude.

---------

Co-authored-by: Jake Hillion <jake@hillion.co.uk>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:00:52 -08:00
rltakashige
83af8c63fa Revert "Use custom fork that resolves GPU locks" (#1502)
Reverts exo-explore/exo#1489

Goddammit Claude...
2026-02-17 18:18:54 +00:00
Evan Quiney
eccc6298d1 Revert "Add MetaInstance declarative layer (#1447)"
This reverts commit a962a28afc.
2026-02-17 18:11:47 +00:00
13 changed files with 58 additions and 52 deletions

View File

@@ -324,7 +324,7 @@ class DownloadCoordinator:
shard_metadata=progress.shard,
total_bytes=progress.total_bytes,
)
elif progress.downloaded_bytes.in_bytes == 0:
elif progress.downloaded_bytes_this_session.in_bytes == 0:
status = DownloadPending(
node_id=self.node_id,
shard_metadata=progress.shard,

View File

@@ -603,10 +603,10 @@ class API:
break
except anyio.get_cancelled_exc_class():
cancel_command = TaskCancelled(cancelled_command_id=command_id)
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=cancel_command)
ForwarderCommand(origin=self.node_id, command=command)
)
raise
finally:
@@ -946,10 +946,10 @@ class API:
del image_metadata[key]
except anyio.get_cancelled_exc_class():
cancel_command = TaskCancelled(cancelled_command_id=command_id)
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=cancel_command)
ForwarderCommand(origin=self.node_id, command=command)
)
raise
finally:
@@ -1032,10 +1032,10 @@ class API:
return (images, stats if capture_stats else None)
except anyio.get_cancelled_exc_class():
cancel_command = TaskCancelled(cancelled_command_id=command_id)
command = TaskCancelled(cancelled_command_id=command_id)
with anyio.CancelScope(shield=True):
await self.command_sender.send(
ForwarderCommand(origin=self.node_id, command=cancel_command)
ForwarderCommand(origin=self.node_id, command=command)
)
raise
finally:

View File

@@ -417,19 +417,16 @@ class Master:
)
case TaskCancelled():
if (
command.cancelled_command_id
in self.command_task_mapping
):
task_id := self.command_task_mapping.get(
command.cancelled_command_id
)
) is not None:
generated_events.append(
TaskDeleted(
task_id=self.command_task_mapping[
command.cancelled_command_id
]
TaskStatusUpdated(
task_status=TaskStatus.Cancelled,
task_id=task_id,
)
)
del self.command_task_mapping[
command.cancelled_command_id
]
case TaskFinished():
generated_events.append(
TaskDeleted(
@@ -438,10 +435,9 @@ class Master:
]
)
)
if command.finished_command_id in self.command_task_mapping:
del self.command_task_mapping[
command.finished_command_id
]
self.command_task_mapping.pop(
command.finished_command_id, None
)
case RequestEventLog():
# We should just be able to send everything, since other buffers will ignore old messages
# rate limit to 1000 at a time

View File

@@ -200,7 +200,7 @@ def try_place_for_meta_instance(
current_instances: Mapping[InstanceId, Instance],
node_memory: Mapping[NodeId, MemoryUsage],
node_network: Mapping[NodeId, NodeNetworkInfo],
tasks: Mapping[TaskId, Task],
tasks: Mapping[TaskId, Task] | None = None,
) -> PlacementResult:
"""Try to place an instance satisfying the meta-instance constraints.
@@ -233,7 +233,7 @@ def try_place_for_meta_instance(
)
return PlacementResult(
events=list(
get_transition_events(current_instances, target_instances, tasks)
get_transition_events(current_instances, target_instances, tasks or {})
),
error=None,
)

View File

@@ -105,6 +105,7 @@ Command = (
| TaskCancelled
| CreateMetaInstance
| DeleteMetaInstance
| TaskCancelled
| TaskFinished
| SendInputChunk
)

View File

@@ -61,7 +61,7 @@ class TextGeneration(BaseTask): # emitted by Master
error_message: str | None = Field(default=None)
class CancelTask(BaseTask): # emitted by Worker when master cancels a task
class CancelTask(BaseTask):
cancelled_task_id: TaskId
runner_id: RunnerId

View File

@@ -125,7 +125,9 @@ class MpSender[T]:
self._state.buffer.put(item, block=True)
async def send_async(self, item: T) -> None:
await to_thread.run_sync(self.send, item, limiter=CapacityLimiter(1))
await to_thread.run_sync(
self.send, item, limiter=CapacityLimiter(1), abandon_on_cancel=True
)
def close(self) -> None:
if not self._state.closed.is_set():

View File

@@ -34,6 +34,7 @@ from exo.shared.types.events import (
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.state import State
from exo.shared.types.tasks import (
CancelTask,
CreateRunner,
DownloadModel,
ImageEdits,
@@ -234,15 +235,22 @@ class Worker:
)
)
case Shutdown(runner_id=runner_id):
runner = self.runners.pop(runner_id)
try:
with fail_after(3):
await self.runners.pop(runner_id).start_task(task)
await runner.start_task(task)
except TimeoutError:
await self.event_sender.send(
TaskStatusUpdated(
task_id=task.task_id, task_status=TaskStatus.TimedOut
)
)
finally:
runner.shutdown()
case CancelTask(
cancelled_task_id=cancelled_task_id, runner_id=runner_id
):
await self.runners[runner_id].cancel_task(cancelled_task_id)
case ImageEdits() if task.task_params.total_input_chunks > 0:
# Assemble image from chunks and inject into task
cmd_id = task.command_id
@@ -280,18 +288,18 @@ class Worker:
del self.input_chunk_buffer[cmd_id]
if cmd_id in self.input_chunk_counts:
del self.input_chunk_counts[cmd_id]
await self.runners[self._task_to_runner_id(task)].start_task(
modified_task
)
await self._start_runner_task(modified_task)
case task:
await self.runners[self._task_to_runner_id(task)].start_task(task)
await self._start_runner_task(task)
def shutdown(self):
self._tg.cancel_scope.cancel()
def _task_to_runner_id(self, task: Task):
instance = self.state.instances[task.instance_id]
return instance.shard_assignments.node_to_runner[self.node_id]
async def _start_runner_task(self, task: Task):
if (instance := self.state.instances.get(task.instance_id)) is not None:
await self.runners[
instance.shard_assignments.node_to_runner[self.node_id]
].start_task(task)
async def _nack_request(self, since_idx: int) -> None:
# We request all events after (and including) the missing index.

View File

@@ -328,8 +328,7 @@ def _pending_tasks(
def _cancel_tasks(
runners: Mapping[RunnerId, RunnerSupervisor],
tasks: Mapping[TaskId, Task],
) -> CancelTask | None:
"""Find a cancelled task that hasn't been sent to the runner yet."""
) -> Task | None:
for task in tasks.values():
if task.task_status != TaskStatus.Cancelled:
continue

View File

@@ -67,9 +67,7 @@ def entrypoint(
try:
event_sender.close()
task_receiver.close()
cancel_receiver.close()
finally:
event_sender.join()
task_receiver.join()
cancel_receiver.join()
logger.info("bye from the runner")

View File

@@ -243,7 +243,7 @@ def main(
assert inference_model
assert tokenizer
t = time.perf_counter()
t = time.monotonic()
toks = warmup_inference(
model=inference_model,
tokenizer=tokenizer,
@@ -251,7 +251,7 @@ def main(
)
logger.info(f"warmed up by generating {toks} tokens")
check_for_cancel_every = min(
math.ceil(toks / max(time.perf_counter() - t, 0.001)), 100
math.ceil(toks / min(time.monotonic() - t, 0.001)), 100
)
if group is not None:
check_for_cancel_every = int(

View File

@@ -72,8 +72,8 @@ class RunnerSupervisor:
initialize_timeout: float
_ev_recv: MpReceiver[Event]
_task_sender: MpSender[Task]
_cancel_sender: MpSender[TaskId]
_event_sender: Sender[Event]
_cancel_sender: MpSender[TaskId]
_pipe_read_fd: int | None = None # Python reads runner's pipe output
_pipe_write_fd: int | None = None # Python writes gathered data to runner
_child_pipe_fds: tuple[int, int] | None = None # fds to close after fork
@@ -188,6 +188,8 @@ class RunnerSupervisor:
self._cancel_sender.send(TaskId("CANCEL_CURRENT_TASK"))
self._cancel_sender.close()
self._event_sender.close()
self._cancel_sender.send(TaskId("CANCEL_CURRENT_TASK"))
self._cancel_sender.close()
self._close_pipe_fds()
self.runner_process.join(1)
if not self.runner_process.is_alive():

View File

@@ -1,7 +1,9 @@
# Check tasks are complete before runner is ever ready.
import unittest.mock
from collections.abc import Iterable
from typing import Callable
import mlx.core as mx
import pytest
import exo.worker.runner.runner as mlx_runner
@@ -115,12 +117,6 @@ def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
monkeypatch.setattr(mlx_runner, "mx_any", make_nothin(False))
# Mock mx.distributed.all_gather so MockGroup doesn't hit real MLX C++ bindings.
def _mock_all_gather(x: object, **_kw: object) -> object:
return x
monkeypatch.setattr(mlx_runner.mx.distributed, "all_gather", _mock_all_gather)
# Mock apply_chat_template since we're using a fake tokenizer (integer 1).
# Returns a prompt without thinking tag so detect_thinking_prompt_suffix returns None.
monkeypatch.setattr(mlx_runner, "apply_chat_template", make_nothin("test prompt"))
@@ -185,12 +181,16 @@ def _run(tasks: Iterable[Task]):
cancel_receiver.close = nothin
cancel_receiver.join = nothin
mlx_runner.main(
bound_instance,
event_sender, # pyright: ignore[reportArgumentType]
task_receiver,
cancel_receiver,
)
with unittest.mock.patch(
"exo.worker.runner.runner.mx.distributed.all_gather",
make_nothin(mx.array([1])),
):
mlx_runner.main(
bound_instance,
event_sender, # pyright: ignore[reportArgumentType]
task_receiver,
cancel_receiver,
)
return event_sender.events