Fix a bunch of issues found during testing

This commit is contained in:
ciaranbor
2026-04-17 20:11:17 +01:00
parent bffdcc690b
commit 46c3ed1eb4
6 changed files with 173 additions and 57 deletions

View File

@@ -39,13 +39,13 @@
total: number;
modelDirectory?: string;
}
| { kind: "failed"; modelDirectory?: string }
| { kind: "failed"; errorMessage?: string; modelDirectory?: string }
| {
kind: "rejected";
reason: string;
requiredBytes: number;
availableBytes: number;
limitBytes: number;
limitBytes?: number;
modelDirectory?: string;
}
| { kind: "not_present" };
@@ -231,6 +231,18 @@
? Math.round((storageConfigNode.diskTotal ?? 0) / 1024 ** 3)
: 0,
);
// Capacity, in whole GiB, that the node being configured could hold for models:
// its free disk bytes plus the bytes getNodeUsedStorage reports for that node.
// Evaluates to 0 while no node is selected in the storage config dialog.
let configEffectiveCapacityGb = $derived.by(() => {
  if (!storageConfigNode) return 0;
  const freeBytes = storageConfigNode.diskAvailable ?? 0;
  const usedBytes = getNodeUsedStorage(storageConfigNode.nodeId);
  const totalBytes = freeBytes + usedBytes;
  return Math.round(totalBytes / 1024 ** 3);
});
// True when a limit is set (not "no limit", value present), that limit is
// larger than the effective capacity, and the effective capacity is non-zero.
let configLimitExceedsDisk = $derived.by(() => {
  if (configNoLimit || configMaxGb == null) return false;
  // Written as a negated ">" so a non-comparable value (NaN) still yields false.
  if (!(configMaxGb > configEffectiveCapacityGb)) return false;
  return configEffectiveCapacityGb > 0;
});
function openStorageConfig(col: NodeColumn) {
storageConfigNode = col;
@@ -378,7 +390,11 @@
modelDirectory,
};
} else if (tag === "ModelDownloadFailed") {
cell = { kind: "failed", modelDirectory };
const errorMessage =
(payload.error_message as string) ??
(payload.errorMessage as string) ??
undefined;
cell = { kind: "failed", errorMessage, modelDirectory };
} else {
const downloaded = getBytes(
payload.downloaded ??
@@ -581,7 +597,12 @@
</th>
{#each nodeColumns as col}
{@const usedStorage = getNodeUsedStorage(col.nodeId)}
{@const storageMax = col.storageLimit ?? col.diskTotal ?? 0}
{@const quotaLimit = col.storageLimit}
{@const diskAvail = col.diskAvailable ?? 0}
{@const storageMax =
quotaLimit != null
? Math.min(quotaLimit, diskAvail + usedStorage)
: diskAvail + usedStorage}
{@const storagePercent =
storageMax > 0
? Math.min(100, (usedStorage / storageMax) * 100)
@@ -842,35 +863,36 @@
{/if}
</div>
{:else if cell.kind === "failed"}
<div
class="flex flex-col items-center gap-1"
title="Download failed"
>
<svg
class="w-7 h-7 text-red-400"
viewBox="0 0 20 20"
fill="currentColor"
>
<path
fill-rule="evenodd"
d="M4.293 4.293a1 1 0 011.414 0L10 8.586l4.293-4.293a1 1 0 111.414 1.414L11.414 10l4.293 4.293a1 1 0 01-1.414 1.414L10 11.414l-4.293 4.293a1 1 0 01-1.414-1.414L8.586 10 4.293 5.707a1 1 0 010-1.414z"
clip-rule="evenodd"
></path>
</svg>
<div class="flex gap-1">
{#if row.shardMetadata}
<button
type="button"
class="text-white/50 hover:text-exo-yellow transition-colors cursor-pointer"
onclick={() =>
startDownload(col.nodeId, row.shardMetadata!)}
title="Retry download on this node"
>
{@render downloadIcon()}
</button>
{/if}
{@render deleteButton(col.nodeId, row.modelId)}
<div class="flex flex-col items-center gap-1">
<!-- Error icon with tooltip -->
<div class="relative group">
<svg
class="w-7 h-7 text-red-400 cursor-help"
viewBox="0 0 20 20"
fill="currentColor"
>
<path
fill-rule="evenodd"
d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-7 4a1 1 0 11-2 0 1 1 0 012 0zm-1-9a1 1 0 00-1 1v4a1 1 0 102 0V6a1 1 0 00-1-1z"
clip-rule="evenodd"
></path>
</svg>
<div
class="absolute top-full left-1/2 -translate-x-1/2 mt-2 px-3 py-2 bg-black/95 border border-red-500/30 rounded-lg text-[10px] text-red-300 opacity-0 group-hover:opacity-100 transition-opacity pointer-events-none z-50 max-w-[300px] text-center"
>
{cell.errorMessage ?? "Download failed"}
</div>
</div>
{#if row.shardMetadata}
<button
type="button"
class="text-[9px] text-white/50 hover:text-exo-yellow transition-colors cursor-pointer border border-white/10 hover:border-exo-yellow/40 rounded px-1.5 py-0.5"
onclick={() =>
startDownload(col.nodeId, row.shardMetadata!)}
>
Retry
</button>
{/if}
</div>
{:else}
<div
@@ -1153,6 +1175,29 @@
</div>
</div>
<!-- Disk capacity warning -->
{#if configLimitExceedsDisk}
<div
class="flex items-start gap-2 px-3 py-2 rounded bg-orange-500/10 border border-orange-500/20"
>
<svg
class="w-4 h-4 text-orange-400 shrink-0 mt-0.5"
viewBox="0 0 20 20"
fill="currentColor"
>
<path
fill-rule="evenodd"
d="M8.257 3.099c.765-1.36 2.722-1.36 3.486 0l5.58 9.92c.75 1.334-.213 2.98-1.742 2.98H4.42c-1.53 0-2.493-1.646-1.743-2.98l5.58-9.92zM11 13a1 1 0 11-2 0 1 1 0 012 0zm-1-8a1 1 0 00-1 1v3a1 1 0 002 0V6a1 1 0 00-1-1z"
clip-rule="evenodd"
/>
</svg>
<p class="text-[10px] text-orange-300/80 font-mono">
Disk only has {configEffectiveCapacityGb} GB available for models. The
{configMaxGb} GB limit has no effect.
</p>
</div>
{/if}
<!-- Policy selector -->
<div class="space-y-1.5">
<div

View File

@@ -22,6 +22,7 @@ from exo.download.shard_downloader import ShardDownloader
from exo.shared.constants import (
EXO_DEFAULT_MODELS_DIR,
EXO_MODEL_USAGE_FILE,
EXO_MODELS_DIRS,
EXO_MODELS_READ_ONLY_DIRS,
)
from exo.shared.models.model_cards import ModelId, get_model_cards
@@ -96,6 +97,21 @@ class DownloadCoordinator:
def _default_model_dir(model_id: ModelId) -> str:
return str(EXO_DEFAULT_MODELS_DIR / model_id.normalize())
@staticmethod
def _get_disk_free() -> Memory | None:
    """Return the free disk space of the first usable models directory.

    Walks ``EXO_MODELS_DIRS`` in order. Entries that do not exist are skipped,
    as are entries for which reading the disk usage raises ``OSError``.

    Returns:
        The free space of the first directory that yields a reading, wrapped
        via ``Memory.from_bytes``, or ``None`` when no directory does.
    """
    import shutil

    for models_dir in EXO_MODELS_DIRS:
        if models_dir.exists():
            try:
                return Memory.from_bytes(shutil.disk_usage(models_dir).free)
            except OSError:
                # Unreadable location: fall through to the next candidate.
                pass
    return None
def _completed_from_path(
self,
shard: ShardMetadata,
@@ -240,15 +256,15 @@ class DownloadCoordinator:
async def _start_download(self, shard: ShardMetadata) -> None:
model_id = shard.model_card.model_id
# Check if already downloading, complete, or recently failed
# Check if already downloading or complete
if model_id in self.download_status:
status = self.download_status[model_id]
if isinstance(status, (ModelDownloading, ModelReady, ModelDownloadFailed)):
if isinstance(status, (ModelDownloading, ModelReady)):
logger.debug(
f"Download for {model_id} already in progress, complete, or failed, skipping"
f"Download for {model_id} skipped: current status is {type(status).__name__}"
)
return
if isinstance(status, ModelRejected):
if isinstance(status, (ModelRejected, ModelDownloadFailed)):
del self.download_status[model_id]
# Check all model directories for pre-existing complete models
@@ -264,12 +280,14 @@ class DownloadCoordinator:
)
return
disk_free = await to_thread.run_sync(self._get_disk_free)
action = decide_storage_action(
shard.model_card.storage_size,
self.storage_config,
list(self.download_status.values()),
self._model_last_used,
frozenset(self._active_model_ids),
disk_free=disk_free,
)
match action:
case StorageReject(reason=reason, available=available):
@@ -438,7 +456,10 @@ class DownloadCoordinator:
if model_id in self.active_downloads:
continue
if isinstance(self.download_status.get(model_id), ModelRejected):
if isinstance(
self.download_status.get(model_id),
(ModelRejected, ModelDownloadFailed),
):
continue
if progress.status == "complete":
@@ -477,7 +498,7 @@ class DownloadCoordinator:
else:
continue
self.download_status[progress.shard.model_card.model_id] = status
self.download_status[model_id] = status
await self.event_sender.send(
NodeDownloadProgress(download_progress=status)
)
@@ -522,7 +543,6 @@ class DownloadCoordinator:
async def _reject_download(
self, shard: ShardMetadata, reason: str, available: Memory
) -> None:
assert self.storage_config.max_storage is not None
model_id = shard.model_card.model_id
rejected = ModelRejected(
shard_metadata=shard,
@@ -557,8 +577,11 @@ class DownloadCoordinator:
current_used = calculate_used_storage(
list(self.download_status.values())
)
assert self.storage_config.max_storage is not None
current_available = self.storage_config.max_storage - current_used
if self.storage_config.max_storage is not None:
current_available = self.storage_config.max_storage - current_used
else:
disk_free = self._get_disk_free()
current_available = disk_free if disk_free is not None else Memory()
await self._reject_download(
shard,
f"Failed to delete model {evict_model_id} from disk",

View File

@@ -67,7 +67,7 @@ from exo.shared.types.tasks import (
from exo.shared.types.tasks import (
TextGeneration as TextGenerationTask,
)
from exo.shared.types.worker.downloads import ModelRejected
from exo.shared.types.worker.downloads import ModelDownloadFailed, ModelRejected
from exo.shared.types.worker.instances import InstanceId
from exo.utils.channels import Receiver, Sender
from exo.utils.disk_event_log import DiskEventLog
@@ -125,7 +125,9 @@ class Master:
with self.command_receiver as commands:
async for forwarder_command in commands:
try:
logger.info(f"Executing command: {forwarder_command.command}")
logger.info(
f"Executing command from {forwarder_command.origin}: {forwarder_command.command}"
)
generated_events: list[Event] = []
command = forwarder_command.command
@@ -430,18 +432,18 @@ class Master:
self.state = apply(self.state, indexed)
if isinstance(event, NodeDownloadProgress) and isinstance(
event.download_progress, ModelRejected
event.download_progress, (ModelRejected, ModelDownloadFailed)
):
dp = event.download_progress
rejection_events = get_download_rejected_events(
cleanup_events = get_download_rejected_events(
dp.shard_metadata.model_card.model_id,
dp.node_id,
self.state.instances,
self.state.tasks,
)
for rejection_event in rejection_events:
logger.info(f"Download rejected cleanup: {rejection_event}")
await self.event_sender.send(rejection_event)
for cleanup_event in cleanup_events:
logger.info(f"Download failure cleanup: {cleanup_event}")
await self.event_sender.send(cleanup_event)
event._master_time_stamp = datetime.now(tz=timezone.utc) # pyright: ignore[reportPrivateUsage]
if isinstance(event, NodeGatheredInfo):

View File

@@ -108,18 +108,50 @@ def decide_storage_action(
downloads: Sequence[ModelStatus],
model_last_used: Mapping[ModelId, datetime],
active_model_ids: frozenset[ModelId],
disk_free: Memory | None = None,
) -> StorageDecision:
"""Pure decision function: given storage state, decide whether to allow, evict, or reject."""
if config.max_storage is None:
return StorageAllow()
"""Pure decision function: given storage state, decide whether to allow, evict, or reject.
allowed, reason = check_storage_quota(model_size, config, downloads)
if allowed:
If ``disk_free`` is provided, it is used alongside the quota to determine
the effective available space. This ensures evictions are triggered when the
physical disk is full, even if the quota accounting says there is room.
"""
if config.max_storage is None:
if disk_free is not None and model_size > disk_free:
# No quota set, but disk is physically full — try auto-evict if enabled
if config.storage_policy == "auto-evict":
candidates = get_lru_eviction_candidates(
downloads, model_last_used, active_model_ids
)
to_evict = compute_evictions_needed(model_size, disk_free, candidates)
if to_evict is not None:
return StorageEvict(model_ids=to_evict)
return StorageReject(
reason=f"Need {model_size.in_gb:.1f} GiB but only {disk_free.in_gb:.1f} GiB free on disk",
available=disk_free,
)
return StorageAllow()
used = calculate_used_storage(downloads)
raw_available = config.max_storage - used
available = raw_available if raw_available.in_bytes >= 0 else Memory()
raw_quota_available = config.max_storage - used
quota_available = (
raw_quota_available if raw_quota_available.in_bytes >= 0 else Memory()
)
# Effective available is the minimum of quota headroom and physical disk free space
available = quota_available
if disk_free is not None and disk_free < available:
available = disk_free
if model_size <= available:
return StorageAllow()
reason = (
f"Need {model_size.in_gb:.1f} GiB, only {max(0, available.in_gb):.1f} GiB available"
f" (quota: {quota_available.in_gb:.1f} GiB, disk: {disk_free.in_gb:.1f} GiB)"
if disk_free is not None
else f"Need {model_size.in_gb:.1f} GiB, only {max(0, available.in_gb):.1f} GiB available within {config.max_storage.in_gb:.1f} GiB limit"
)
if config.storage_policy == "auto-evict":
candidates = get_lru_eviction_candidates(

View File

@@ -51,7 +51,7 @@ class ModelRejected(BaseModelStatus):
reason: str
required: Memory
available: Memory
limit: Memory
limit: Memory | None = None
ModelStatus = (

View File

@@ -63,7 +63,7 @@ def plan(
or _model_needs_download(
node_id, runners, global_download_status, download_backoff
)
or _init_distributed_backend(runners, all_runners)
or _init_distributed_backend(runners, all_runners, global_download_status)
or _load_model(runners, all_runners, global_download_status)
or _ready_to_warmup(runners, all_runners)
or _pending_tasks(runners, tasks, all_runners, input_chunk_buffer)
@@ -168,6 +168,7 @@ def _model_needs_download(
def _init_distributed_backend(
runners: Mapping[RunnerId, RunnerSupervisor],
all_runners: Mapping[RunnerId, RunnerStatus],
global_download_status: Mapping[NodeId, Sequence[ModelStatus]],
):
for runner in runners.values():
instance = runner.bound_instance.instance
@@ -177,6 +178,19 @@ def _init_distributed_backend(
if is_single_node_instance:
continue
# Don't connect until all nodes have downloaded the model
all_downloads_complete = all(
nid in global_download_status
and any(
isinstance(dp, ModelReady)
and dp.shard_metadata.model_card.model_id == shard_assignments.model_id
for dp in global_download_status[nid]
)
for nid in shard_assignments.node_to_runner
)
if not all_downloads_complete:
continue
runner_is_idle = isinstance(runner.status, RunnerIdle)
all_runners_connecting = all(
isinstance(