Compare commits


3 Commits

Author SHA1 Message Date
Alex Cheema
7d42ded97f test: verify instance deletion cancels ongoing tasks (#1215)
The cancellation logic already exists in get_transition_events() but had
no test coverage. Add tests confirming that deleting an instance emits
TaskStatusUpdated(Cancelled) events for all Pending/Running tasks on
that instance while leaving tasks on other instances untouched.
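
For orientation, a minimal sketch of the behavior these tests pin down, assuming `get_transition_events()` walks current versus target instances; the real function in exo may be structured differently, and the event constructors below use only the fields the new tests assert on (`task_id`, `task_status`, `instance_id`):

```python
# Hedged sketch of the behavior the new tests verify; not the actual
# implementation in exo, whose internals are not shown in this diff.
from exo.shared.types.events import InstanceDeleted, TaskStatusUpdated
from exo.shared.types.tasks import TaskStatus

CANCELLABLE = {TaskStatus.Pending, TaskStatus.Running}

def deletion_events(current_instances, target_instances, tasks):
    events = []
    for instance_id in current_instances:
        if instance_id in target_instances:
            continue  # instance survives this transition, nothing to cancel
        # Cancel every still-active task bound to the instance being removed...
        for task in tasks.values():
            if task.instance_id == instance_id and task.task_status in CANCELLABLE:
                events.append(
                    TaskStatusUpdated(
                        task_id=task.task_id,
                        task_status=TaskStatus.Cancelled,
                    )
                )
        # ...then emit the deletion itself, matching the ordering the tests
        # assert (cancellations come before InstanceDeleted).
        events.append(InstanceDeleted(instance_id=instance_id))
    return events
```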

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 10:05:08 -08:00
Alex Cheema
e84d570291 dashboard: show failure reasons for runner and download errors (#1350)
Surface error messages that the backend already captures but the
dashboard was discarding. For runner failures, the dashboard now extracts
errorMessage from the RunnerFailed status; for download failures, it
displays errorMessage from DownloadFailed payloads on both the main page
and the downloads page.
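
The dashboard-side change is in the `+page.svelte` diff below. The reason it can read `errorMessage` at all is that exo's event models are serialized camel-cased on the wire; a minimal, self-contained illustration using plain pydantic stand-ins (not exo's actual classes):

```python
# Illustration only: simplified stand-ins for exo's CamelCaseModel-based
# payloads, showing how a backend error_message field reaches the
# dashboard as errorMessage.
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel

class CamelCaseModel(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

class DownloadFailed(CamelCaseModel):  # stand-in; the real model lives in exo
    node_id: str
    error_message: str

event = DownloadFailed(node_id="node-1", error_message="disk full")
print(event.model_dump_json(by_alias=True))
# {"nodeId":"node-1","errorMessage":"disk full"}  <- what the dashboard now shows
```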

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:49:31 -08:00
Alex Cheema
d6301ed593 dashboard: redesign downloads page as model×node table (#1465)
## Motivation

The current downloads page uses a node-centric card grid layout that is
messy and hard to read — the same model across different nodes appears
in separate cards, and deep nesting wastes space. This makes it
difficult to quickly see which models are on which nodes.

## Changes

Rewrote the downloads page
(`dashboard/src/routes/downloads/+page.svelte`) from a card grid to a
clean table layout (see the sketch after this list):

- **Rows** = models (unique across all nodes)
- **Columns** = nodes (with disk free shown in header)
- **Cells** show status at a glance:
  - ✅ Green checkmark + size for completed downloads
  - 🟡 Yellow percentage + mini progress bar + speed for active downloads
  - `...` for pending downloads
  - ❌ Red X for failed downloads
  - `--` for models not present on a node
- Delete/download action buttons appear on row hover
- Model name column is sticky on horizontal scroll (for many-node
clusters)
- Models sorted by number of nodes with completed downloads
- Imported shared utilities from `$lib/utils/downloads` instead of
inline re-implementations
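
The table itself is rendered in `+page.svelte`; the row/column shaping it performs can be sketched as a simple pivot. This is illustrative Python, not the actual TypeScript, and the names and status strings are made up for the example:

```python
# Hypothetical pivot of per-node download statuses into the model x node
# matrix described above; field names and states are illustrative only.
from collections import defaultdict

def build_download_table(statuses):
    """statuses: iterable of dicts like
    {"model": "llama-3.1-8b", "node": "mac-1", "state": "completed", ...}"""
    cells: dict[str, dict[str, dict]] = defaultdict(dict)  # model -> node -> cell
    nodes: set[str] = set()
    for s in statuses:
        cells[s["model"]][s["node"]] = s
        nodes.add(s["node"])

    def completed_nodes(model: str) -> int:
        return sum(1 for c in cells[model].values() if c["state"] == "completed")

    # Rows = models, sorted by how many nodes have the download completed;
    # a missing (model, node) cell renders as "--".
    rows = sorted(cells, key=completed_nodes, reverse=True)
    return rows, sorted(nodes), cells
```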

### Backend: model directory in download events

- Added `model_directory` field to `BaseDownloadProgress` so all
download status events include the on-disk path
- Added `_model_dir()` helper to `DownloadCoordinator` to compute the
path from `EXO_MODELS_DIR`
- Dashboard uses this to show the file location and enable "open in
Finder" for completed downloads (see the sketch below)
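
Condensed from the `DownloadCoordinator` and types diffs further down the page (the helper is a method on the coordinator in the real code):

```python
# Condensed from the diffs below: every download status event now carries
# the model's on-disk location.
from exo.shared.constants import EXO_MODELS_DIR
from exo.shared.models.model_cards import ModelId

def model_dir(model_id: ModelId) -> str:
    # Same computation as DownloadCoordinator._model_dir()
    return str(EXO_MODELS_DIR / model_id.normalize())

# BaseDownloadProgress gains the field with an empty-string default (see the
# types diff at the end of the page):
#     model_directory: str = ""
# and each emit site passes model_directory=self._model_dir(model_id).
```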

### Info modal

- Clicking a model name opens an info modal showing card details
(family, quantization, capabilities, storage size, layer count, tensor
parallelism support)

### Other fixes

- Fixed model name truncation in the table
- Excluded `tests/start_distributed_test.py` from pytest collection (a CLI
script that calls `sys.exit()` at import time)

## Test Plan

- [x] `uv run basedpyright` — 0 errors
- [x] `uv run ruff check` — all passed
- [x] `nix fmt` — clean
- [x] `uv run pytest` — 188 passed, 1 skipped

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 14:31:47 +00:00
6 changed files with 781 additions and 498 deletions


@@ -1084,7 +1084,7 @@
return {
isDownloading: false,
isFailed: statusInfo.statusText === "FAILED",
errorMessage: null,
errorMessage: statusInfo.errorMessage,
progress: null,
statusText: statusInfo.statusText,
perNode: [],
@@ -1142,10 +1142,15 @@
function deriveInstanceStatus(instanceWrapped: unknown): {
statusText: string;
statusClass: string;
errorMessage: string | null;
} {
const [, instance] = getTagged(instanceWrapped);
if (!instance || typeof instance !== "object") {
return { statusText: "PREPARING", statusClass: "inactive" };
return {
statusText: "PREPARING",
statusClass: "inactive",
errorMessage: null,
};
}
const inst = instance as {
@@ -1153,50 +1158,106 @@
};
const runnerIds = Object.keys(inst.shardAssignments?.runnerToShard || {});
const statusMap: Record<string, string> = {
RunnerWaitingForInitialization: "WaitingForInitialization",
RunnerInitializingBackend: "InitializingBackend",
RunnerWaitingForModel: "WaitingForModel",
RunnerLoading: "Loading",
RunnerLoaded: "Loaded",
RunnerWarmingUp: "WarmingUp",
RunnerReady: "Ready",
RunnerRunning: "Running",
RunnerShutdown: "Shutdown",
RunnerFailed: "Failed",
};
const statuses = runnerIds
.map((rid) => {
const r = runnersData[rid];
if (!r) return null;
const [kind] = getTagged(r);
const statusMap: Record<string, string> = {
RunnerWaitingForInitialization: "WaitingForInitialization",
RunnerInitializingBackend: "InitializingBackend",
RunnerWaitingForModel: "WaitingForModel",
RunnerLoading: "Loading",
RunnerLoaded: "Loaded",
RunnerWarmingUp: "WarmingUp",
RunnerReady: "Ready",
RunnerRunning: "Running",
RunnerShutdown: "Shutdown",
RunnerFailed: "Failed",
};
return kind ? statusMap[kind] || null : null;
const [kind, payload] = getTagged(r);
if (!kind || !statusMap[kind]) return null;
const errorMessage =
kind === "RunnerFailed" && payload && typeof payload === "object"
? (((payload as Record<string, unknown>).errorMessage as string) ??
null)
: null;
return { status: statusMap[kind], errorMessage };
})
.filter((s): s is string => s !== null);
.filter(
(s): s is { status: string; errorMessage: string | null } => s !== null,
);
const has = (s: string) => statuses.includes(s);
const has = (s: string) => statuses.some((e) => e.status === s);
if (statuses.length === 0)
return { statusText: "PREPARING", statusClass: "inactive" };
if (has("Failed")) return { statusText: "FAILED", statusClass: "failed" };
return {
statusText: "PREPARING",
statusClass: "inactive",
errorMessage: null,
};
if (has("Failed")) {
const failedEntry = statuses.find(
(e) => e.status === "Failed" && e.errorMessage,
);
return {
statusText: "FAILED",
statusClass: "failed",
errorMessage: failedEntry?.errorMessage ?? null,
};
}
if (has("Shutdown"))
return { statusText: "SHUTDOWN", statusClass: "inactive" };
return {
statusText: "SHUTDOWN",
statusClass: "inactive",
errorMessage: null,
};
if (has("Loading"))
return { statusText: "LOADING", statusClass: "starting" };
return {
statusText: "LOADING",
statusClass: "starting",
errorMessage: null,
};
if (has("WarmingUp"))
return { statusText: "WARMING UP", statusClass: "starting" };
return {
statusText: "WARMING UP",
statusClass: "starting",
errorMessage: null,
};
if (has("Running"))
return { statusText: "RUNNING", statusClass: "running" };
if (has("Ready")) return { statusText: "READY", statusClass: "loaded" };
if (has("Loaded")) return { statusText: "LOADED", statusClass: "loaded" };
return {
statusText: "RUNNING",
statusClass: "running",
errorMessage: null,
};
if (has("Ready"))
return { statusText: "READY", statusClass: "loaded", errorMessage: null };
if (has("Loaded"))
return {
statusText: "LOADED",
statusClass: "loaded",
errorMessage: null,
};
if (has("WaitingForModel"))
return { statusText: "WAITING", statusClass: "starting" };
return {
statusText: "WAITING",
statusClass: "starting",
errorMessage: null,
};
if (has("InitializingBackend"))
return { statusText: "INITIALIZING", statusClass: "starting" };
return {
statusText: "INITIALIZING",
statusClass: "starting",
errorMessage: null,
};
if (has("WaitingForInitialization"))
return { statusText: "INITIALIZING", statusClass: "starting" };
return {
statusText: "INITIALIZING",
statusClass: "starting",
errorMessage: null,
};
return { statusText: "RUNNING", statusClass: "active" };
return { statusText: "RUNNING", statusClass: "active", errorMessage: null };
}
function getBytes(value: unknown): number {

File diff suppressed because it is too large

@@ -132,7 +132,7 @@ markers = [
env = [
"EXO_TESTS=1"
]
addopts = "-m 'not slow'"
addopts = "-m 'not slow' --ignore=tests/start_distributed_test.py"
filterwarnings = [
"ignore:builtin type Swig:DeprecationWarning",
]


@@ -14,6 +14,7 @@ from exo.download.download_utils import (
map_repo_download_progress_to_download_progress_data,
)
from exo.download.shard_downloader import ShardDownloader
from exo.shared.constants import EXO_MODELS_DIR
from exo.shared.models.model_cards import ModelId
from exo.shared.types.commands import (
CancelDownload,
@@ -63,6 +64,9 @@ class DownloadCoordinator:
self.event_sender, self.event_receiver = channel[Event]()
self.shard_downloader.on_progress(self._download_progress_callback)
def _model_dir(self, model_id: ModelId) -> str:
return str(EXO_MODELS_DIR / model_id.normalize())
async def _download_progress_callback(
self, callback_shard: ShardMetadata, progress: RepoDownloadProgress
) -> None:
@@ -74,6 +78,7 @@ class DownloadCoordinator:
shard_metadata=callback_shard,
node_id=self.node_id,
total_bytes=progress.total_bytes,
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = completed
await self.event_sender.send(
@@ -93,6 +98,7 @@ class DownloadCoordinator:
download_progress=map_repo_download_progress_to_download_progress_data(
progress
),
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = ongoing
await self.event_sender.send(
@@ -170,7 +176,11 @@ class DownloadCoordinator:
return
# Emit pending status
progress = DownloadPending(shard_metadata=shard, node_id=self.node_id)
progress = DownloadPending(
shard_metadata=shard,
node_id=self.node_id,
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = progress
await self.event_sender.send(NodeDownloadProgress(download_progress=progress))
@@ -184,6 +194,7 @@ class DownloadCoordinator:
shard_metadata=shard,
node_id=self.node_id,
total_bytes=initial_progress.total_bytes,
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = completed
await self.event_sender.send(
@@ -206,6 +217,7 @@ class DownloadCoordinator:
download_progress=map_repo_download_progress_to_download_progress_data(
initial_progress
),
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = status
self.event_sender.send_nowait(NodeDownloadProgress(download_progress=status))
@@ -219,6 +231,7 @@ class DownloadCoordinator:
shard_metadata=shard,
node_id=self.node_id,
error_message=str(e),
model_directory=self._model_dir(model_id),
)
self.download_status[model_id] = failed
await self.event_sender.send(
@@ -253,6 +266,7 @@ class DownloadCoordinator:
pending = DownloadPending(
shard_metadata=current_status.shard_metadata,
node_id=self.node_id,
model_directory=self._model_dir(model_id),
)
await self.event_sender.send(
NodeDownloadProgress(download_progress=pending)
@@ -295,11 +309,18 @@ class DownloadCoordinator:
node_id=self.node_id,
shard_metadata=progress.shard,
total_bytes=progress.total_bytes,
model_directory=self._model_dir(
progress.shard.model_card.model_id
),
)
elif progress.status in ["in_progress", "not_started"]:
if progress.downloaded_bytes_this_session.in_bytes == 0:
status = DownloadPending(
node_id=self.node_id, shard_metadata=progress.shard
node_id=self.node_id,
shard_metadata=progress.shard,
model_directory=self._model_dir(
progress.shard.model_card.model_id
),
)
else:
status = DownloadOngoing(
@@ -308,6 +329,9 @@ class DownloadCoordinator:
download_progress=map_repo_download_progress_to_download_progress_data(
progress
),
model_directory=self._model_dir(
progress.shard.model_card.model_id
),
)
else:
continue


@@ -14,10 +14,12 @@ from exo.shared.models.model_cards import ModelCard, ModelId, ModelTask
from exo.shared.topology import Topology
from exo.shared.types.commands import PlaceInstance
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.events import InstanceCreated, InstanceDeleted
from exo.shared.types.events import InstanceCreated, InstanceDeleted, TaskStatusUpdated
from exo.shared.types.memory import Memory
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.profiling import NetworkInterfaceInfo, NodeNetworkInfo
from exo.shared.types.tasks import TaskId, TaskStatus, TextGeneration
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
from exo.shared.types.topology import Connection, SocketConnection
from exo.shared.types.worker.instances import (
Instance,
@@ -456,3 +458,117 @@ def test_tensor_rdma_backend_connectivity_matrix(
else:
ip_part = coordinator.split(":")[0]
assert len(ip_part.split(".")) == 4
def _make_task(
instance_id: InstanceId,
status: TaskStatus = TaskStatus.Running,
) -> TextGeneration:
return TextGeneration(
task_id=TaskId(),
task_status=status,
instance_id=instance_id,
command_id=CommandId(),
task_params=TextGenerationTaskParams(
model=ModelId("test-model"),
input=[InputMessage(role="user", content="hello")],
),
)
def test_get_transition_events_delete_instance_cancels_running_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
task = _make_task(instance_id, TaskStatus.Running)
tasks = {task.task_id: task}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert cancellation event should come before the deletion event
assert len(events) == 2
assert isinstance(events[0], TaskStatusUpdated)
assert events[0].task_id == task.task_id
assert events[0].task_status == TaskStatus.Cancelled
assert isinstance(events[1], InstanceDeleted)
assert events[1].instance_id == instance_id
def test_get_transition_events_delete_instance_cancels_pending_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
task = _make_task(instance_id, TaskStatus.Pending)
tasks = {task.task_id: task}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert
assert len(events) == 2
assert isinstance(events[0], TaskStatusUpdated)
assert events[0].task_id == task.task_id
assert events[0].task_status == TaskStatus.Cancelled
assert isinstance(events[1], InstanceDeleted)
def test_get_transition_events_delete_instance_ignores_completed_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
tasks = {
t.task_id: t
for t in [
_make_task(instance_id, TaskStatus.Complete),
_make_task(instance_id, TaskStatus.Failed),
_make_task(instance_id, TaskStatus.TimedOut),
_make_task(instance_id, TaskStatus.Cancelled),
]
}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert only the InstanceDeleted event, no cancellations
assert len(events) == 1
assert isinstance(events[0], InstanceDeleted)
def test_get_transition_events_delete_instance_cancels_only_matching_tasks(
instance: Instance,
):
# arrange
instance_id_a = InstanceId()
instance_id_b = InstanceId()
current_instances: dict[InstanceId, Instance] = {
instance_id_a: instance,
instance_id_b: instance,
}
# only delete instance A, keep instance B
target_instances: dict[InstanceId, Instance] = {instance_id_b: instance}
task_a = _make_task(instance_id_a, TaskStatus.Running)
task_b = _make_task(instance_id_b, TaskStatus.Running)
tasks = {task_a.task_id: task_a, task_b.task_id: task_b}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert only task_a should be cancelled
cancel_events = [e for e in events if isinstance(e, TaskStatusUpdated)]
delete_events = [e for e in events if isinstance(e, InstanceDeleted)]
assert len(cancel_events) == 1
assert cancel_events[0].task_id == task_a.task_id
assert cancel_events[0].task_status == TaskStatus.Cancelled
assert len(delete_events) == 1
assert delete_events[0].instance_id == instance_id_a


@@ -26,6 +26,7 @@ class DownloadProgressData(CamelCaseModel):
class BaseDownloadProgress(TaggedModel):
node_id: NodeId
shard_metadata: ShardMetadata
model_directory: str = ""
class DownloadPending(BaseDownloadProgress):