mirror of
https://github.com/exo-explore/exo.git
synced 2026-02-19 07:17:30 -05:00
Compare commits
2 Commits
gh-screens
...
cancel-tas
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d42ded97f | ||
|
|
e84d570291 |
@@ -1084,7 +1084,7 @@
|
||||
return {
|
||||
isDownloading: false,
|
||||
isFailed: statusInfo.statusText === "FAILED",
|
||||
errorMessage: null,
|
||||
errorMessage: statusInfo.errorMessage,
|
||||
progress: null,
|
||||
statusText: statusInfo.statusText,
|
||||
perNode: [],
|
||||
@@ -1142,10 +1142,15 @@
|
||||
function deriveInstanceStatus(instanceWrapped: unknown): {
|
||||
statusText: string;
|
||||
statusClass: string;
|
||||
errorMessage: string | null;
|
||||
} {
|
||||
const [, instance] = getTagged(instanceWrapped);
|
||||
if (!instance || typeof instance !== "object") {
|
||||
return { statusText: "PREPARING", statusClass: "inactive" };
|
||||
return {
|
||||
statusText: "PREPARING",
|
||||
statusClass: "inactive",
|
||||
errorMessage: null,
|
||||
};
|
||||
}
|
||||
|
||||
const inst = instance as {
|
||||
@@ -1153,50 +1158,106 @@
|
||||
};
|
||||
const runnerIds = Object.keys(inst.shardAssignments?.runnerToShard || {});
|
||||
|
||||
const statusMap: Record<string, string> = {
|
||||
RunnerWaitingForInitialization: "WaitingForInitialization",
|
||||
RunnerInitializingBackend: "InitializingBackend",
|
||||
RunnerWaitingForModel: "WaitingForModel",
|
||||
RunnerLoading: "Loading",
|
||||
RunnerLoaded: "Loaded",
|
||||
RunnerWarmingUp: "WarmingUp",
|
||||
RunnerReady: "Ready",
|
||||
RunnerRunning: "Running",
|
||||
RunnerShutdown: "Shutdown",
|
||||
RunnerFailed: "Failed",
|
||||
};
|
||||
|
||||
const statuses = runnerIds
|
||||
.map((rid) => {
|
||||
const r = runnersData[rid];
|
||||
if (!r) return null;
|
||||
const [kind] = getTagged(r);
|
||||
const statusMap: Record<string, string> = {
|
||||
RunnerWaitingForInitialization: "WaitingForInitialization",
|
||||
RunnerInitializingBackend: "InitializingBackend",
|
||||
RunnerWaitingForModel: "WaitingForModel",
|
||||
RunnerLoading: "Loading",
|
||||
RunnerLoaded: "Loaded",
|
||||
RunnerWarmingUp: "WarmingUp",
|
||||
RunnerReady: "Ready",
|
||||
RunnerRunning: "Running",
|
||||
RunnerShutdown: "Shutdown",
|
||||
RunnerFailed: "Failed",
|
||||
};
|
||||
return kind ? statusMap[kind] || null : null;
|
||||
const [kind, payload] = getTagged(r);
|
||||
if (!kind || !statusMap[kind]) return null;
|
||||
const errorMessage =
|
||||
kind === "RunnerFailed" && payload && typeof payload === "object"
|
||||
? (((payload as Record<string, unknown>).errorMessage as string) ??
|
||||
null)
|
||||
: null;
|
||||
return { status: statusMap[kind], errorMessage };
|
||||
})
|
||||
.filter((s): s is string => s !== null);
|
||||
.filter(
|
||||
(s): s is { status: string; errorMessage: string | null } => s !== null,
|
||||
);
|
||||
|
||||
const has = (s: string) => statuses.includes(s);
|
||||
const has = (s: string) => statuses.some((e) => e.status === s);
|
||||
|
||||
if (statuses.length === 0)
|
||||
return { statusText: "PREPARING", statusClass: "inactive" };
|
||||
if (has("Failed")) return { statusText: "FAILED", statusClass: "failed" };
|
||||
return {
|
||||
statusText: "PREPARING",
|
||||
statusClass: "inactive",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("Failed")) {
|
||||
const failedEntry = statuses.find(
|
||||
(e) => e.status === "Failed" && e.errorMessage,
|
||||
);
|
||||
return {
|
||||
statusText: "FAILED",
|
||||
statusClass: "failed",
|
||||
errorMessage: failedEntry?.errorMessage ?? null,
|
||||
};
|
||||
}
|
||||
if (has("Shutdown"))
|
||||
return { statusText: "SHUTDOWN", statusClass: "inactive" };
|
||||
return {
|
||||
statusText: "SHUTDOWN",
|
||||
statusClass: "inactive",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("Loading"))
|
||||
return { statusText: "LOADING", statusClass: "starting" };
|
||||
return {
|
||||
statusText: "LOADING",
|
||||
statusClass: "starting",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("WarmingUp"))
|
||||
return { statusText: "WARMING UP", statusClass: "starting" };
|
||||
return {
|
||||
statusText: "WARMING UP",
|
||||
statusClass: "starting",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("Running"))
|
||||
return { statusText: "RUNNING", statusClass: "running" };
|
||||
if (has("Ready")) return { statusText: "READY", statusClass: "loaded" };
|
||||
if (has("Loaded")) return { statusText: "LOADED", statusClass: "loaded" };
|
||||
return {
|
||||
statusText: "RUNNING",
|
||||
statusClass: "running",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("Ready"))
|
||||
return { statusText: "READY", statusClass: "loaded", errorMessage: null };
|
||||
if (has("Loaded"))
|
||||
return {
|
||||
statusText: "LOADED",
|
||||
statusClass: "loaded",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("WaitingForModel"))
|
||||
return { statusText: "WAITING", statusClass: "starting" };
|
||||
return {
|
||||
statusText: "WAITING",
|
||||
statusClass: "starting",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("InitializingBackend"))
|
||||
return { statusText: "INITIALIZING", statusClass: "starting" };
|
||||
return {
|
||||
statusText: "INITIALIZING",
|
||||
statusClass: "starting",
|
||||
errorMessage: null,
|
||||
};
|
||||
if (has("WaitingForInitialization"))
|
||||
return { statusText: "INITIALIZING", statusClass: "starting" };
|
||||
return {
|
||||
statusText: "INITIALIZING",
|
||||
statusClass: "starting",
|
||||
errorMessage: null,
|
||||
};
|
||||
|
||||
return { statusText: "RUNNING", statusClass: "active" };
|
||||
return { statusText: "RUNNING", statusClass: "active", errorMessage: null };
|
||||
}
|
||||
|
||||
function getBytes(value: unknown): number {
|
||||
|
||||
@@ -30,7 +30,7 @@
|
||||
modelDirectory?: string;
|
||||
}
|
||||
| { kind: "pending"; modelDirectory?: string }
|
||||
| { kind: "failed"; modelDirectory?: string }
|
||||
| { kind: "failed"; modelDirectory?: string; errorMessage?: string }
|
||||
| { kind: "not_present" };
|
||||
|
||||
type ModelCardInfo = {
|
||||
@@ -263,7 +263,10 @@
|
||||
modelDirectory,
|
||||
};
|
||||
} else if (tag === "DownloadFailed") {
|
||||
cell = { kind: "failed", modelDirectory };
|
||||
const errorMsg =
|
||||
((payload.errorMessage ?? payload.error_message) as string) ||
|
||||
undefined;
|
||||
cell = { kind: "failed", modelDirectory, errorMessage: errorMsg };
|
||||
} else {
|
||||
cell = { kind: "pending", modelDirectory };
|
||||
}
|
||||
@@ -499,7 +502,7 @@
|
||||
{:else if cell.kind === "failed"}
|
||||
<div
|
||||
class="flex flex-col items-center gap-0.5"
|
||||
title="Download failed"
|
||||
title={cell.errorMessage ?? "Download failed"}
|
||||
>
|
||||
<svg
|
||||
class="w-5 h-5 text-red-400"
|
||||
@@ -512,6 +515,14 @@
|
||||
clip-rule="evenodd"
|
||||
></path>
|
||||
</svg>
|
||||
{#if cell.errorMessage}
|
||||
<span
|
||||
class="text-[9px] text-red-400/70 max-w-[120px] truncate"
|
||||
title={cell.errorMessage}
|
||||
>
|
||||
{cell.errorMessage}
|
||||
</span>
|
||||
{/if}
|
||||
{#if row.shardMetadata}
|
||||
<button
|
||||
type="button"
|
||||
@@ -707,6 +718,11 @@
|
||||
({clampPercent(cellStatus.percentage).toFixed(0)}%)
|
||||
{/if}
|
||||
</span>
|
||||
{#if cellStatus.kind === "failed" && "errorMessage" in cellStatus && cellStatus.errorMessage}
|
||||
<span class="text-[9px] text-red-400/70 break-all pl-1">
|
||||
{cellStatus.errorMessage}
|
||||
</span>
|
||||
{/if}
|
||||
{#if "modelDirectory" in cellStatus && cellStatus.modelDirectory}
|
||||
<span
|
||||
class="text-[9px] text-white/30 break-all pl-1"
|
||||
|
||||
@@ -14,10 +14,12 @@ from exo.shared.models.model_cards import ModelCard, ModelId, ModelTask
|
||||
from exo.shared.topology import Topology
|
||||
from exo.shared.types.commands import PlaceInstance
|
||||
from exo.shared.types.common import CommandId, NodeId
|
||||
from exo.shared.types.events import InstanceCreated, InstanceDeleted
|
||||
from exo.shared.types.events import InstanceCreated, InstanceDeleted, TaskStatusUpdated
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.multiaddr import Multiaddr
|
||||
from exo.shared.types.profiling import NetworkInterfaceInfo, NodeNetworkInfo
|
||||
from exo.shared.types.tasks import TaskId, TaskStatus, TextGeneration
|
||||
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
|
||||
from exo.shared.types.topology import Connection, SocketConnection
|
||||
from exo.shared.types.worker.instances import (
|
||||
Instance,
|
||||
@@ -456,3 +458,117 @@ def test_tensor_rdma_backend_connectivity_matrix(
|
||||
else:
|
||||
ip_part = coordinator.split(":")[0]
|
||||
assert len(ip_part.split(".")) == 4
|
||||
|
||||
|
||||
def _make_task(
|
||||
instance_id: InstanceId,
|
||||
status: TaskStatus = TaskStatus.Running,
|
||||
) -> TextGeneration:
|
||||
return TextGeneration(
|
||||
task_id=TaskId(),
|
||||
task_status=status,
|
||||
instance_id=instance_id,
|
||||
command_id=CommandId(),
|
||||
task_params=TextGenerationTaskParams(
|
||||
model=ModelId("test-model"),
|
||||
input=[InputMessage(role="user", content="hello")],
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def test_get_transition_events_delete_instance_cancels_running_tasks(
|
||||
instance: Instance,
|
||||
):
|
||||
# arrange
|
||||
instance_id = InstanceId()
|
||||
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
|
||||
target_instances: dict[InstanceId, Instance] = {}
|
||||
task = _make_task(instance_id, TaskStatus.Running)
|
||||
tasks = {task.task_id: task}
|
||||
|
||||
# act
|
||||
events = get_transition_events(current_instances, target_instances, tasks)
|
||||
|
||||
# assert – cancellation event should come before the deletion event
|
||||
assert len(events) == 2
|
||||
assert isinstance(events[0], TaskStatusUpdated)
|
||||
assert events[0].task_id == task.task_id
|
||||
assert events[0].task_status == TaskStatus.Cancelled
|
||||
assert isinstance(events[1], InstanceDeleted)
|
||||
assert events[1].instance_id == instance_id
|
||||
|
||||
|
||||
def test_get_transition_events_delete_instance_cancels_pending_tasks(
|
||||
instance: Instance,
|
||||
):
|
||||
# arrange
|
||||
instance_id = InstanceId()
|
||||
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
|
||||
target_instances: dict[InstanceId, Instance] = {}
|
||||
task = _make_task(instance_id, TaskStatus.Pending)
|
||||
tasks = {task.task_id: task}
|
||||
|
||||
# act
|
||||
events = get_transition_events(current_instances, target_instances, tasks)
|
||||
|
||||
# assert
|
||||
assert len(events) == 2
|
||||
assert isinstance(events[0], TaskStatusUpdated)
|
||||
assert events[0].task_id == task.task_id
|
||||
assert events[0].task_status == TaskStatus.Cancelled
|
||||
assert isinstance(events[1], InstanceDeleted)
|
||||
|
||||
|
||||
def test_get_transition_events_delete_instance_ignores_completed_tasks(
|
||||
instance: Instance,
|
||||
):
|
||||
# arrange
|
||||
instance_id = InstanceId()
|
||||
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
|
||||
target_instances: dict[InstanceId, Instance] = {}
|
||||
tasks = {
|
||||
t.task_id: t
|
||||
for t in [
|
||||
_make_task(instance_id, TaskStatus.Complete),
|
||||
_make_task(instance_id, TaskStatus.Failed),
|
||||
_make_task(instance_id, TaskStatus.TimedOut),
|
||||
_make_task(instance_id, TaskStatus.Cancelled),
|
||||
]
|
||||
}
|
||||
|
||||
# act
|
||||
events = get_transition_events(current_instances, target_instances, tasks)
|
||||
|
||||
# assert – only the InstanceDeleted event, no cancellations
|
||||
assert len(events) == 1
|
||||
assert isinstance(events[0], InstanceDeleted)
|
||||
|
||||
|
||||
def test_get_transition_events_delete_instance_cancels_only_matching_tasks(
|
||||
instance: Instance,
|
||||
):
|
||||
# arrange
|
||||
instance_id_a = InstanceId()
|
||||
instance_id_b = InstanceId()
|
||||
current_instances: dict[InstanceId, Instance] = {
|
||||
instance_id_a: instance,
|
||||
instance_id_b: instance,
|
||||
}
|
||||
# only delete instance A, keep instance B
|
||||
target_instances: dict[InstanceId, Instance] = {instance_id_b: instance}
|
||||
|
||||
task_a = _make_task(instance_id_a, TaskStatus.Running)
|
||||
task_b = _make_task(instance_id_b, TaskStatus.Running)
|
||||
tasks = {task_a.task_id: task_a, task_b.task_id: task_b}
|
||||
|
||||
# act
|
||||
events = get_transition_events(current_instances, target_instances, tasks)
|
||||
|
||||
# assert – only task_a should be cancelled
|
||||
cancel_events = [e for e in events if isinstance(e, TaskStatusUpdated)]
|
||||
delete_events = [e for e in events if isinstance(e, InstanceDeleted)]
|
||||
assert len(cancel_events) == 1
|
||||
assert cancel_events[0].task_id == task_a.task_id
|
||||
assert cancel_events[0].task_status == TaskStatus.Cancelled
|
||||
assert len(delete_events) == 1
|
||||
assert delete_events[0].instance_id == instance_id_a
|
||||
|
||||
Reference in New Issue
Block a user