From 46c3ed1eb41ccd2f77ef1b7ea487500e12322e56 Mon Sep 17 00:00:00 2001 From: ciaranbor Date: Fri, 17 Apr 2026 20:11:17 +0100 Subject: [PATCH] Fix a bunch of issues found during testing --- dashboard/src/routes/downloads/+page.svelte | 109 ++++++++++++++------ src/exo/download/coordinator.py | 41 ++++++-- src/exo/master/main.py | 16 +-- src/exo/shared/storage.py | 46 +++++++-- src/exo/shared/types/worker/downloads.py | 2 +- src/exo/worker/plan.py | 16 ++- 6 files changed, 173 insertions(+), 57 deletions(-) diff --git a/dashboard/src/routes/downloads/+page.svelte b/dashboard/src/routes/downloads/+page.svelte index d6810fc0c..2f64c381d 100644 --- a/dashboard/src/routes/downloads/+page.svelte +++ b/dashboard/src/routes/downloads/+page.svelte @@ -39,13 +39,13 @@ total: number; modelDirectory?: string; } - | { kind: "failed"; modelDirectory?: string } + | { kind: "failed"; errorMessage?: string; modelDirectory?: string } | { kind: "rejected"; reason: string; requiredBytes: number; availableBytes: number; - limitBytes: number; + limitBytes?: number; modelDirectory?: string; } | { kind: "not_present" }; @@ -231,6 +231,18 @@ ? Math.round((storageConfigNode.diskTotal ?? 0) / 1024 ** 3) : 0, ); + let configEffectiveCapacityGb = $derived.by(() => { + if (!storageConfigNode) return 0; + const diskAvail = storageConfigNode.diskAvailable ?? 0; + const exoUsed = getNodeUsedStorage(storageConfigNode.nodeId); + return Math.round((diskAvail + exoUsed) / 1024 ** 3); + }); + let configLimitExceedsDisk = $derived( + !configNoLimit && + configMaxGb != null && + configMaxGb > configEffectiveCapacityGb && + configEffectiveCapacityGb > 0, + ); function openStorageConfig(col: NodeColumn) { storageConfigNode = col; @@ -378,7 +390,11 @@ modelDirectory, }; } else if (tag === "ModelDownloadFailed") { - cell = { kind: "failed", modelDirectory }; + const errorMessage = + (payload.error_message as string) ?? + (payload.errorMessage as string) ?? + undefined; + cell = { kind: "failed", errorMessage, modelDirectory }; } else { const downloaded = getBytes( payload.downloaded ?? @@ -581,7 +597,12 @@ {#each nodeColumns as col} {@const usedStorage = getNodeUsedStorage(col.nodeId)} - {@const storageMax = col.storageLimit ?? col.diskTotal ?? 0} + {@const quotaLimit = col.storageLimit} + {@const diskAvail = col.diskAvailable ?? 0} + {@const storageMax = + quotaLimit != null + ? Math.min(quotaLimit, diskAvail + usedStorage) + : diskAvail + usedStorage} {@const storagePercent = storageMax > 0 ? Math.min(100, (usedStorage / storageMax) * 100) @@ -842,35 +863,36 @@ {/if} {:else if cell.kind === "failed"} -
- - - -
- {#if row.shardMetadata} - - {/if} - {@render deleteButton(col.nodeId, row.modelId)} +
+ +
+ + + +
+ {cell.errorMessage ?? "Download failed"} +
+ {#if row.shardMetadata} + + {/if}
{:else}
+ + {#if configLimitExceedsDisk} +
+ + + +

+ Disk only has {configEffectiveCapacityGb} GB available for models. The + {configMaxGb} GB limit has no effect. +

+
+ {/if} +
str: return str(EXO_DEFAULT_MODELS_DIR / model_id.normalize()) + @staticmethod + def _get_disk_free() -> Memory | None: + """Get free disk space for the first available models directory.""" + import shutil + + for candidate_dir in EXO_MODELS_DIRS: + if not candidate_dir.exists(): + continue + try: + usage = shutil.disk_usage(candidate_dir) + return Memory.from_bytes(usage.free) + except OSError: + continue + return None + def _completed_from_path( self, shard: ShardMetadata, @@ -240,15 +256,15 @@ class DownloadCoordinator: async def _start_download(self, shard: ShardMetadata) -> None: model_id = shard.model_card.model_id - # Check if already downloading, complete, or recently failed + # Check if already downloading or complete if model_id in self.download_status: status = self.download_status[model_id] - if isinstance(status, (ModelDownloading, ModelReady, ModelDownloadFailed)): + if isinstance(status, (ModelDownloading, ModelReady)): logger.debug( - f"Download for {model_id} already in progress, complete, or failed, skipping" + f"Download for {model_id} skipped: current status is {type(status).__name__}" ) return - if isinstance(status, ModelRejected): + if isinstance(status, (ModelRejected, ModelDownloadFailed)): del self.download_status[model_id] # Check all model directories for pre-existing complete models @@ -264,12 +280,14 @@ class DownloadCoordinator: ) return + disk_free = await to_thread.run_sync(self._get_disk_free) action = decide_storage_action( shard.model_card.storage_size, self.storage_config, list(self.download_status.values()), self._model_last_used, frozenset(self._active_model_ids), + disk_free=disk_free, ) match action: case StorageReject(reason=reason, available=available): @@ -438,7 +456,10 @@ class DownloadCoordinator: if model_id in self.active_downloads: continue - if isinstance(self.download_status.get(model_id), ModelRejected): + if isinstance( + self.download_status.get(model_id), + (ModelRejected, ModelDownloadFailed), + ): continue if progress.status == "complete": @@ -477,7 +498,7 @@ class DownloadCoordinator: else: continue - self.download_status[progress.shard.model_card.model_id] = status + self.download_status[model_id] = status await self.event_sender.send( NodeDownloadProgress(download_progress=status) ) @@ -522,7 +543,6 @@ class DownloadCoordinator: async def _reject_download( self, shard: ShardMetadata, reason: str, available: Memory ) -> None: - assert self.storage_config.max_storage is not None model_id = shard.model_card.model_id rejected = ModelRejected( shard_metadata=shard, @@ -557,8 +577,11 @@ class DownloadCoordinator: current_used = calculate_used_storage( list(self.download_status.values()) ) - assert self.storage_config.max_storage is not None - current_available = self.storage_config.max_storage - current_used + if self.storage_config.max_storage is not None: + current_available = self.storage_config.max_storage - current_used + else: + disk_free = self._get_disk_free() + current_available = disk_free if disk_free is not None else Memory() await self._reject_download( shard, f"Failed to delete model {evict_model_id} from disk", diff --git a/src/exo/master/main.py b/src/exo/master/main.py index b07d7e279..54d986436 100644 --- a/src/exo/master/main.py +++ b/src/exo/master/main.py @@ -67,7 +67,7 @@ from exo.shared.types.tasks import ( from exo.shared.types.tasks import ( TextGeneration as TextGenerationTask, ) -from exo.shared.types.worker.downloads import ModelRejected +from exo.shared.types.worker.downloads import ModelDownloadFailed, ModelRejected from exo.shared.types.worker.instances import InstanceId from exo.utils.channels import Receiver, Sender from exo.utils.disk_event_log import DiskEventLog @@ -125,7 +125,9 @@ class Master: with self.command_receiver as commands: async for forwarder_command in commands: try: - logger.info(f"Executing command: {forwarder_command.command}") + logger.info( + f"Executing command from {forwarder_command.origin}: {forwarder_command.command}" + ) generated_events: list[Event] = [] command = forwarder_command.command @@ -430,18 +432,18 @@ class Master: self.state = apply(self.state, indexed) if isinstance(event, NodeDownloadProgress) and isinstance( - event.download_progress, ModelRejected + event.download_progress, (ModelRejected, ModelDownloadFailed) ): dp = event.download_progress - rejection_events = get_download_rejected_events( + cleanup_events = get_download_rejected_events( dp.shard_metadata.model_card.model_id, dp.node_id, self.state.instances, self.state.tasks, ) - for rejection_event in rejection_events: - logger.info(f"Download rejected cleanup: {rejection_event}") - await self.event_sender.send(rejection_event) + for cleanup_event in cleanup_events: + logger.info(f"Download failure cleanup: {cleanup_event}") + await self.event_sender.send(cleanup_event) event._master_time_stamp = datetime.now(tz=timezone.utc) # pyright: ignore[reportPrivateUsage] if isinstance(event, NodeGatheredInfo): diff --git a/src/exo/shared/storage.py b/src/exo/shared/storage.py index 964b91b4e..ef93f86e9 100644 --- a/src/exo/shared/storage.py +++ b/src/exo/shared/storage.py @@ -108,18 +108,50 @@ def decide_storage_action( downloads: Sequence[ModelStatus], model_last_used: Mapping[ModelId, datetime], active_model_ids: frozenset[ModelId], + disk_free: Memory | None = None, ) -> StorageDecision: - """Pure decision function: given storage state, decide whether to allow, evict, or reject.""" - if config.max_storage is None: - return StorageAllow() + """Pure decision function: given storage state, decide whether to allow, evict, or reject. - allowed, reason = check_storage_quota(model_size, config, downloads) - if allowed: + If ``disk_free`` is provided, it is used alongside the quota to determine + the effective available space. This ensures evictions are triggered when the + physical disk is full, even if the quota accounting says there is room. + """ + if config.max_storage is None: + if disk_free is not None and model_size > disk_free: + # No quota set, but disk is physically full — try auto-evict if enabled + if config.storage_policy == "auto-evict": + candidates = get_lru_eviction_candidates( + downloads, model_last_used, active_model_ids + ) + to_evict = compute_evictions_needed(model_size, disk_free, candidates) + if to_evict is not None: + return StorageEvict(model_ids=to_evict) + return StorageReject( + reason=f"Need {model_size.in_gb:.1f} GiB but only {disk_free.in_gb:.1f} GiB free on disk", + available=disk_free, + ) return StorageAllow() used = calculate_used_storage(downloads) - raw_available = config.max_storage - used - available = raw_available if raw_available.in_bytes >= 0 else Memory() + raw_quota_available = config.max_storage - used + quota_available = ( + raw_quota_available if raw_quota_available.in_bytes >= 0 else Memory() + ) + + # Effective available is the minimum of quota headroom and physical disk free space + available = quota_available + if disk_free is not None and disk_free < available: + available = disk_free + + if model_size <= available: + return StorageAllow() + + reason = ( + f"Need {model_size.in_gb:.1f} GiB, only {max(0, available.in_gb):.1f} GiB available" + f" (quota: {quota_available.in_gb:.1f} GiB, disk: {disk_free.in_gb:.1f} GiB)" + if disk_free is not None + else f"Need {model_size.in_gb:.1f} GiB, only {max(0, available.in_gb):.1f} GiB available within {config.max_storage.in_gb:.1f} GiB limit" + ) if config.storage_policy == "auto-evict": candidates = get_lru_eviction_candidates( diff --git a/src/exo/shared/types/worker/downloads.py b/src/exo/shared/types/worker/downloads.py index fbc1683d4..f79f731dc 100644 --- a/src/exo/shared/types/worker/downloads.py +++ b/src/exo/shared/types/worker/downloads.py @@ -51,7 +51,7 @@ class ModelRejected(BaseModelStatus): reason: str required: Memory available: Memory - limit: Memory + limit: Memory | None = None ModelStatus = ( diff --git a/src/exo/worker/plan.py b/src/exo/worker/plan.py index d56661071..56dd74080 100644 --- a/src/exo/worker/plan.py +++ b/src/exo/worker/plan.py @@ -63,7 +63,7 @@ def plan( or _model_needs_download( node_id, runners, global_download_status, download_backoff ) - or _init_distributed_backend(runners, all_runners) + or _init_distributed_backend(runners, all_runners, global_download_status) or _load_model(runners, all_runners, global_download_status) or _ready_to_warmup(runners, all_runners) or _pending_tasks(runners, tasks, all_runners, input_chunk_buffer) @@ -168,6 +168,7 @@ def _model_needs_download( def _init_distributed_backend( runners: Mapping[RunnerId, RunnerSupervisor], all_runners: Mapping[RunnerId, RunnerStatus], + global_download_status: Mapping[NodeId, Sequence[ModelStatus]], ): for runner in runners.values(): instance = runner.bound_instance.instance @@ -177,6 +178,19 @@ def _init_distributed_backend( if is_single_node_instance: continue + # Don't connect until all nodes have downloaded the model + all_downloads_complete = all( + nid in global_download_status + and any( + isinstance(dp, ModelReady) + and dp.shard_metadata.model_card.model_id == shard_assignments.model_id + for dp in global_download_status[nid] + ) + for nid in shard_assignments.node_to_runner + ) + if not all_downloads_complete: + continue + runner_is_idle = isinstance(runner.status, RunnerIdle) all_runners_connecting = all( isinstance(