mirror of
https://github.com/exo-explore/exo.git
synced 2026-02-05 19:52:16 -05:00
Compare commits
1 Commits
ciaran/num
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b907398a4 |
@@ -148,15 +148,6 @@
|
||||
setImageGenerationParams({ guidance: null });
|
||||
}
|
||||
|
||||
function handleNumSyncStepsChange(event: Event) {
|
||||
const value = parseInt((event.target as HTMLInputElement).value, 10);
|
||||
setImageGenerationParams({ numSyncSteps: value });
|
||||
}
|
||||
|
||||
function clearNumSyncSteps() {
|
||||
setImageGenerationParams({ numSyncSteps: null });
|
||||
}
|
||||
|
||||
function handleReset() {
|
||||
resetImageGenerationParams();
|
||||
showAdvanced = false;
|
||||
@@ -166,8 +157,7 @@
|
||||
params.seed !== null ||
|
||||
params.numInferenceSteps !== null ||
|
||||
params.guidance !== null ||
|
||||
(params.negativePrompt !== null && params.negativePrompt.trim() !== "") ||
|
||||
params.numSyncSteps !== null,
|
||||
(params.negativePrompt !== null && params.negativePrompt.trim() !== ""),
|
||||
);
|
||||
</script>
|
||||
|
||||
@@ -588,50 +578,7 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Row 3: Sync Steps -->
|
||||
<div class="flex items-center gap-1.5">
|
||||
<span
|
||||
class="text-xs text-exo-light-gray uppercase tracking-wider whitespace-nowrap"
|
||||
>SYNC STEPS:</span
|
||||
>
|
||||
<div class="flex items-center gap-2 flex-1 max-w-xs">
|
||||
<input
|
||||
type="range"
|
||||
min="1"
|
||||
max="100"
|
||||
value={params.numSyncSteps ?? 1}
|
||||
oninput={handleNumSyncStepsChange}
|
||||
class="flex-1 h-1 bg-exo-medium-gray/50 rounded appearance-none cursor-pointer accent-exo-yellow"
|
||||
/>
|
||||
<span class="text-xs font-mono text-exo-yellow w-8 text-right">
|
||||
{params.numSyncSteps ?? "--"}
|
||||
</span>
|
||||
{#if params.numSyncSteps !== null}
|
||||
<button
|
||||
type="button"
|
||||
onclick={clearNumSyncSteps}
|
||||
class="text-exo-light-gray hover:text-exo-yellow transition-colors"
|
||||
title="Clear"
|
||||
>
|
||||
<svg
|
||||
class="w-3 h-3"
|
||||
fill="none"
|
||||
viewBox="0 0 24 24"
|
||||
stroke="currentColor"
|
||||
>
|
||||
<path
|
||||
stroke-linecap="round"
|
||||
stroke-linejoin="round"
|
||||
stroke-width="2"
|
||||
d="M6 18L18 6M6 6l12 12"
|
||||
/>
|
||||
</svg>
|
||||
</button>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Row 4: Negative Prompt -->
|
||||
<!-- Row 3: Negative Prompt -->
|
||||
<div class="flex flex-col gap-1.5">
|
||||
<span class="text-xs text-exo-light-gray uppercase tracking-wider"
|
||||
>NEGATIVE PROMPT:</span
|
||||
|
||||
@@ -298,7 +298,6 @@ export interface ImageGenerationParams {
|
||||
numInferenceSteps: number | null;
|
||||
guidance: number | null;
|
||||
negativePrompt: string | null;
|
||||
numSyncSteps: number | null;
|
||||
// Edit mode params
|
||||
inputFidelity: "low" | "high";
|
||||
}
|
||||
@@ -320,7 +319,6 @@ const DEFAULT_IMAGE_PARAMS: ImageGenerationParams = {
|
||||
numInferenceSteps: null,
|
||||
guidance: null,
|
||||
negativePrompt: null,
|
||||
numSyncSteps: null,
|
||||
inputFidelity: "low",
|
||||
};
|
||||
|
||||
@@ -2398,9 +2396,7 @@ class AppStore {
|
||||
params.seed !== null ||
|
||||
params.numInferenceSteps !== null ||
|
||||
params.guidance !== null ||
|
||||
(params.negativePrompt !== null &&
|
||||
params.negativePrompt.trim() !== "") ||
|
||||
params.numSyncSteps !== null;
|
||||
(params.negativePrompt !== null && params.negativePrompt.trim() !== "");
|
||||
|
||||
const requestBody: Record<string, unknown> = {
|
||||
model,
|
||||
@@ -2425,9 +2421,6 @@ class AppStore {
|
||||
params.negativePrompt.trim() !== "" && {
|
||||
negative_prompt: params.negativePrompt,
|
||||
}),
|
||||
...(params.numSyncSteps !== null && {
|
||||
num_sync_steps: params.numSyncSteps,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2677,19 +2670,11 @@ class AppStore {
|
||||
formData.append("input_fidelity", params.inputFidelity);
|
||||
|
||||
// Advanced params
|
||||
const hasAdvancedParams =
|
||||
params.seed !== null ||
|
||||
params.numInferenceSteps !== null ||
|
||||
params.guidance !== null ||
|
||||
(params.negativePrompt !== null &&
|
||||
params.negativePrompt.trim() !== "") ||
|
||||
params.numSyncSteps !== null;
|
||||
|
||||
if (hasAdvancedParams) {
|
||||
if (params.seed !== null) {
|
||||
formData.append(
|
||||
"advanced_params",
|
||||
JSON.stringify({
|
||||
...(params.seed !== null && { seed: params.seed }),
|
||||
seed: params.seed,
|
||||
...(params.numInferenceSteps !== null && {
|
||||
num_inference_steps: params.numInferenceSteps,
|
||||
}),
|
||||
@@ -2698,9 +2683,24 @@ class AppStore {
|
||||
params.negativePrompt.trim() !== "" && {
|
||||
negative_prompt: params.negativePrompt,
|
||||
}),
|
||||
...(params.numSyncSteps !== null && {
|
||||
num_sync_steps: params.numSyncSteps,
|
||||
}),
|
||||
);
|
||||
} else if (
|
||||
params.numInferenceSteps !== null ||
|
||||
params.guidance !== null ||
|
||||
(params.negativePrompt !== null && params.negativePrompt.trim() !== "")
|
||||
) {
|
||||
formData.append(
|
||||
"advanced_params",
|
||||
JSON.stringify({
|
||||
...(params.numInferenceSteps !== null && {
|
||||
num_inference_steps: params.numInferenceSteps,
|
||||
}),
|
||||
...(params.guidance !== null && { guidance: params.guidance }),
|
||||
...(params.negativePrompt !== null &&
|
||||
params.negativePrompt.trim() !== "" && {
|
||||
negative_prompt: params.negativePrompt,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
2
justfile
2
justfile
@@ -20,7 +20,7 @@ sync-clean:
|
||||
|
||||
rust-rebuild:
|
||||
cargo run --bin stub_gen
|
||||
just sync-clean
|
||||
uv sync --reinstall-package exo_pyo3_bindings
|
||||
|
||||
build-dashboard:
|
||||
#!/usr/bin/env bash
|
||||
|
||||
@@ -16,6 +16,7 @@ from exo.download.download_utils import (
|
||||
from exo.download.shard_downloader import ShardDownloader
|
||||
from exo.shared.models.model_cards import ModelId
|
||||
from exo.shared.types.commands import (
|
||||
CancelDownload,
|
||||
DeleteDownload,
|
||||
ForwarderDownloadCommand,
|
||||
StartDownload,
|
||||
@@ -107,6 +108,13 @@ class DownloadCoordinator:
|
||||
await self._start_download(shard)
|
||||
case DeleteDownload(model_id=model_id):
|
||||
await self._delete_download(model_id)
|
||||
case CancelDownload(model_id=model_id):
|
||||
await self._cancel_download(model_id)
|
||||
|
||||
async def _cancel_download(self, model_id: ModelId) -> None:
|
||||
if model_id in self.active_downloads and model_id in self.download_status:
|
||||
logger.info(f"Cancelling download for {model_id}")
|
||||
self.active_downloads.pop(model_id).cancel()
|
||||
|
||||
async def _start_download(self, shard: ShardMetadata) -> None:
|
||||
model_id = shard.model_card.model_id
|
||||
|
||||
@@ -105,6 +105,7 @@ class Node:
|
||||
global_event_sender=router.sender(topics.GLOBAL_EVENTS),
|
||||
local_event_receiver=router.receiver(topics.LOCAL_EVENTS),
|
||||
command_receiver=router.receiver(topics.COMMANDS),
|
||||
download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
|
||||
)
|
||||
|
||||
er_send, er_recv = channel[ElectionResult]()
|
||||
@@ -188,6 +189,9 @@ class Node:
|
||||
global_event_sender=self.router.sender(topics.GLOBAL_EVENTS),
|
||||
local_event_receiver=self.router.receiver(topics.LOCAL_EVENTS),
|
||||
command_receiver=self.router.receiver(topics.COMMANDS),
|
||||
download_command_sender=self.router.sender(
|
||||
topics.DOWNLOAD_COMMANDS
|
||||
),
|
||||
)
|
||||
self._tg.start_soon(self.master.run)
|
||||
elif (
|
||||
|
||||
@@ -6,6 +6,7 @@ from loguru import logger
|
||||
|
||||
from exo.master.placement import (
|
||||
add_instance_to_placements,
|
||||
cancel_unnecessary_downloads,
|
||||
delete_instance,
|
||||
get_transition_events,
|
||||
place_instance,
|
||||
@@ -16,6 +17,7 @@ from exo.shared.types.commands import (
|
||||
CreateInstance,
|
||||
DeleteInstance,
|
||||
ForwarderCommand,
|
||||
ForwarderDownloadCommand,
|
||||
ImageEdits,
|
||||
ImageGeneration,
|
||||
PlaceInstance,
|
||||
@@ -66,12 +68,9 @@ class Master:
|
||||
session_id: SessionId,
|
||||
*,
|
||||
command_receiver: Receiver[ForwarderCommand],
|
||||
# Receiving indexed events from the forwarder to be applied to state
|
||||
# Ideally these would be WorkerForwarderEvents but type system says no :(
|
||||
local_event_receiver: Receiver[ForwarderEvent],
|
||||
# Send events to the forwarder to be indexed (usually from command processing)
|
||||
# Ideally these would be MasterForwarderEvents but type system says no :(
|
||||
global_event_sender: Sender[ForwarderEvent],
|
||||
download_command_sender: Sender[ForwarderDownloadCommand],
|
||||
):
|
||||
self.state = State()
|
||||
self._tg: TaskGroup = anyio.create_task_group()
|
||||
@@ -81,6 +80,7 @@ class Master:
|
||||
self.command_receiver = command_receiver
|
||||
self.local_event_receiver = local_event_receiver
|
||||
self.global_event_sender = global_event_sender
|
||||
self.download_command_sender = download_command_sender
|
||||
send, recv = channel[Event]()
|
||||
self.event_sender: Sender[Event] = send
|
||||
self._loopback_event_receiver: Receiver[Event] = recv
|
||||
@@ -280,6 +280,14 @@ class Master:
|
||||
transition_events = get_transition_events(
|
||||
self.state.instances, placement
|
||||
)
|
||||
for cmd in cancel_unnecessary_downloads(
|
||||
placement, self.state.downloads
|
||||
):
|
||||
await self.download_command_sender.send(
|
||||
ForwarderDownloadCommand(
|
||||
origin=self.node_id, command=cmd
|
||||
)
|
||||
)
|
||||
generated_events.extend(transition_events)
|
||||
case PlaceInstance():
|
||||
placement = place_instance(
|
||||
|
||||
@@ -15,14 +15,20 @@ from exo.master.placement_utils import (
|
||||
from exo.shared.models.model_cards import ModelId
|
||||
from exo.shared.topology import Topology
|
||||
from exo.shared.types.commands import (
|
||||
CancelDownload,
|
||||
CreateInstance,
|
||||
DeleteInstance,
|
||||
DownloadCommand,
|
||||
PlaceInstance,
|
||||
)
|
||||
from exo.shared.types.common import NodeId
|
||||
from exo.shared.types.events import Event, InstanceCreated, InstanceDeleted
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.profiling import MemoryUsage, NodeNetworkInfo
|
||||
from exo.shared.types.worker.downloads import (
|
||||
DownloadOngoing,
|
||||
DownloadProgress,
|
||||
)
|
||||
from exo.shared.types.worker.instances import (
|
||||
Instance,
|
||||
InstanceId,
|
||||
@@ -202,3 +208,29 @@ def get_transition_events(
|
||||
)
|
||||
|
||||
return events
|
||||
|
||||
|
||||
def cancel_unnecessary_downloads(
|
||||
instances: Mapping[InstanceId, Instance],
|
||||
download_status: Mapping[NodeId, Sequence[DownloadProgress]],
|
||||
) -> Sequence[DownloadCommand]:
|
||||
commands: list[DownloadCommand] = []
|
||||
currently_downloading = [
|
||||
(k, v.shard_metadata.model_card.model_id)
|
||||
for k, vs in download_status.items()
|
||||
for v in vs
|
||||
if isinstance(v, (DownloadOngoing))
|
||||
]
|
||||
active_models = set(
|
||||
(
|
||||
node_id,
|
||||
instance.shard_assignments.runner_to_shard[runner_id].model_card.model_id,
|
||||
)
|
||||
for instance in instances.values()
|
||||
for node_id, runner_id in instance.shard_assignments.node_to_runner.items()
|
||||
)
|
||||
for pair in currently_downloading:
|
||||
if pair not in active_models:
|
||||
commands.append(CancelDownload(target_node_id=pair[0], model_id=pair[1]))
|
||||
|
||||
return commands
|
||||
|
||||
@@ -11,6 +11,7 @@ from exo.shared.models.model_cards import ModelCard, ModelTask
|
||||
from exo.shared.types.commands import (
|
||||
CommandId,
|
||||
ForwarderCommand,
|
||||
ForwarderDownloadCommand,
|
||||
PlaceInstance,
|
||||
TextGeneration,
|
||||
)
|
||||
@@ -47,6 +48,7 @@ async def test_master():
|
||||
ge_sender, global_event_receiver = channel[ForwarderEvent]()
|
||||
command_sender, co_receiver = channel[ForwarderCommand]()
|
||||
local_event_sender, le_receiver = channel[ForwarderEvent]()
|
||||
fcds, _fcdr = channel[ForwarderDownloadCommand]()
|
||||
|
||||
all_events: list[IndexedEvent] = []
|
||||
|
||||
@@ -67,6 +69,7 @@ async def test_master():
|
||||
global_event_sender=ge_sender,
|
||||
local_event_receiver=le_receiver,
|
||||
command_receiver=co_receiver,
|
||||
download_command_sender=fcds,
|
||||
)
|
||||
logger.info("run the master")
|
||||
async with anyio.create_task_group() as tg:
|
||||
|
||||
@@ -272,7 +272,6 @@ class AdvancedImageParams(BaseModel):
|
||||
num_inference_steps: Annotated[int, Field(ge=1, le=100)] | None = None
|
||||
guidance: Annotated[float, Field(ge=1.0, le=20.0)] | None = None
|
||||
negative_prompt: str | None = None
|
||||
num_sync_steps: Annotated[int, Field(ge=1, le=100)] | None = None
|
||||
|
||||
|
||||
class ImageGenerationTaskParams(BaseModel):
|
||||
|
||||
@@ -72,7 +72,12 @@ class DeleteDownload(BaseCommand):
|
||||
model_id: ModelId
|
||||
|
||||
|
||||
DownloadCommand = StartDownload | DeleteDownload
|
||||
class CancelDownload(BaseCommand):
|
||||
target_node_id: NodeId
|
||||
model_id: ModelId
|
||||
|
||||
|
||||
DownloadCommand = StartDownload | DeleteDownload | CancelDownload
|
||||
|
||||
|
||||
Command = (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from enum import Enum
|
||||
from math import ceil
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -22,7 +23,7 @@ class ImageModelConfig(BaseModel):
|
||||
block_configs: tuple[TransformerBlockConfig, ...]
|
||||
|
||||
default_steps: dict[str, int] # {"low": X, "medium": Y, "high": Z}
|
||||
num_sync_steps: int # Number of sync steps for distributed inference
|
||||
num_sync_steps_factor: float # Fraction of steps for sync phase
|
||||
|
||||
guidance_scale: float | None = None # None or <= 1.0 disables CFG
|
||||
|
||||
@@ -44,3 +45,6 @@ class ImageModelConfig(BaseModel):
|
||||
|
||||
def get_steps_for_quality(self, quality: str) -> int:
|
||||
return self.default_steps[quality]
|
||||
|
||||
def get_num_sync_steps(self, steps: int) -> int:
|
||||
return ceil(steps * self.num_sync_steps_factor)
|
||||
|
||||
@@ -150,10 +150,7 @@ class DistributedImageModel:
|
||||
guidance=guidance_override if guidance_override is not None else 4.0,
|
||||
)
|
||||
|
||||
if advanced_params is not None and advanced_params.num_sync_steps is not None:
|
||||
num_sync_steps = advanced_params.num_sync_steps
|
||||
else:
|
||||
num_sync_steps = self._config.num_sync_steps
|
||||
num_sync_steps = self._config.get_num_sync_steps(steps)
|
||||
|
||||
for result in self._runner.generate_image(
|
||||
runtime_config=config,
|
||||
|
||||
@@ -15,7 +15,7 @@ FLUX_SCHNELL_CONFIG = ImageModelConfig(
|
||||
),
|
||||
),
|
||||
default_steps={"low": 1, "medium": 2, "high": 4},
|
||||
num_sync_steps=1,
|
||||
num_sync_steps_factor=0.5, # 1 sync step for medium (2 steps)
|
||||
)
|
||||
|
||||
|
||||
@@ -30,5 +30,5 @@ FLUX_DEV_CONFIG = ImageModelConfig(
|
||||
),
|
||||
),
|
||||
default_steps={"low": 10, "medium": 25, "high": 50},
|
||||
num_sync_steps=4,
|
||||
num_sync_steps_factor=0.125, # ~3 sync steps for medium (25 steps)
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@ QWEN_IMAGE_CONFIG = ImageModelConfig(
|
||||
),
|
||||
),
|
||||
default_steps={"low": 10, "medium": 25, "high": 50},
|
||||
num_sync_steps=7,
|
||||
num_sync_steps_factor=0.25,
|
||||
guidance_scale=3.5, # Set to None or < 1.0 to disable CFG
|
||||
)
|
||||
|
||||
@@ -24,6 +24,6 @@ QWEN_IMAGE_EDIT_CONFIG = ImageModelConfig(
|
||||
),
|
||||
),
|
||||
default_steps={"low": 10, "medium": 25, "high": 50},
|
||||
num_sync_steps=7,
|
||||
num_sync_steps_factor=0.25,
|
||||
guidance_scale=3.5,
|
||||
)
|
||||
|
||||
@@ -35,7 +35,7 @@ i=0
|
||||
for host; do
|
||||
colour=${colours[i++ % 4]}
|
||||
ssh -T -o BatchMode=yes -o ServerAliveInterval=30 "$host@$host" \
|
||||
"/nix/var/nix/profiles/default/bin/nix run github:exo-explore/exo/$commit" |&
|
||||
"EXO_LIBP2P_NAMESPACE=$commit /nix/var/nix/profiles/default/bin/nix run github:exo-explore/exo/$commit" |&
|
||||
awk -v p="${colour}[${host}]${reset}" '{ print p $0; fflush() }' &
|
||||
done
|
||||
|
||||
|
||||
Reference in New Issue
Block a user