Dashboard Status & Bugfixes

Matt Beton
2025-08-29 09:34:17 -07:00
committed by GitHub
parent a33787f5fd
commit 35c4311587
9 changed files with 93 additions and 237 deletions

.gitignore

@@ -2,7 +2,7 @@
__pycache__
*.so
hosts_*.json
hosts*.json
# go cache is project local but not tracked
.go_cache


@@ -407,6 +407,26 @@
background-color: #f59e0b;
color: var(--exo-black);
}
/* New runner-status aware pills */
.instance-status.starting {
background-color: #3b82f6; /* blue */
color: var(--exo-black);
}
.instance-status.loaded {
background-color: #2dd4bf; /* teal */
color: var(--exo-black);
}
.instance-status.running {
background-color: #4ade80; /* green */
color: var(--exo-black);
}
.instance-status.failed {
background-color: #ef4444; /* red */
color: white;
}
.instance-delete-button {
background-color: #ef4444;
@@ -984,6 +1004,39 @@
return { isDownloading, progress, downloadingRunners: downloadingRunners.length };
}
// Derive a display status for an instance from its runners.
// Priority: FAILED > DOWNLOADING > STARTING > RUNNING > LOADED > INACTIVE
function deriveInstanceStatus(instance, runners = {}) {
const runnerIds = Object.keys(instance.shard_assignments?.runner_to_shard || {});
const statuses = runnerIds
.map(rid => runners[rid]?.runner_status)
.filter(s => typeof s === 'string');
const has = (s) => statuses.includes(s);
const every = (pred) => statuses.length > 0 && statuses.every(pred);
if (statuses.length === 0) {
const inactive = instance.instance_type === 'INACTIVE';
return { statusText: inactive ? 'INACTIVE' : 'LOADED', statusClass: inactive ? 'inactive' : 'loaded' };
}
if (has('Failed')) return { statusText: 'FAILED', statusClass: 'failed' };
if (has('Downloading')) return { statusText: 'DOWNLOADING', statusClass: 'downloading' };
if (has('Starting')) return { statusText: 'LOADING', statusClass: 'starting' };
if (has('Running')) return { statusText: 'RUNNING', statusClass: 'running' };
const allInactive = every(s => s === 'Inactive');
const loadedOrInactiveOnly = every(s => s === 'Loaded' || s === 'Inactive');
const anyLoaded = statuses.some(s => s === 'Loaded');
if (loadedOrInactiveOnly && anyLoaded) {
return { statusText: 'LOADED', statusClass: 'loaded' };
}
if (allInactive) {
return { statusText: 'INACTIVE', statusClass: 'inactive' };
}
return { statusText: 'LOADED', statusClass: 'loaded' };
}
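
For reference, the same precedence rule condensed into a small Python sketch (illustrative only, not part of the diff; the status strings are the ones the dashboard filters on above):

    # Illustrative sketch of the precedence rule above, not part of the commit.
    PRIORITY = ["Failed", "Downloading", "Starting", "Running"]
    DISPLAY = {"Starting": "LOADING"}  # a Starting runner renders as the LOADING pill

    def derive_instance_status(statuses: list[str], instance_inactive: bool) -> str:
        if not statuses:
            return "INACTIVE" if instance_inactive else "LOADED"
        for status in PRIORITY:
            if status in statuses:
                return DISPLAY.get(status, status.upper())
        if "Loaded" in statuses:
            return "LOADED"
        if all(s == "Inactive" for s in statuses):
            return "INACTIVE"
        return "LOADED"  # same fallback as the JavaScript version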
function renderInstances(instances, runners = {}) {
const instancesArray = Object.values(instances);
@@ -1004,10 +1057,13 @@
// Calculate download status for this instance
const downloadStatus = calculateInstanceDownloadStatus(instance, runners);
// Determine status display - prioritize downloading over original status
const statusText = downloadStatus.isDownloading ? 'DOWNLOADING' : instance.instance_type;
const statusClass = downloadStatus.isDownloading ? 'downloading' : instance.instance_type.toLowerCase();
let statusText, statusClass;
if (downloadStatus.isDownloading) {
({ statusText, statusClass } = { statusText: 'DOWNLOADING', statusClass: 'downloading' });
} else {
({ statusText, statusClass } = deriveInstanceStatus(instance, runners));
}
// Generate download progress HTML
const downloadProgressHTML = downloadStatus.isDownloading


@@ -52,17 +52,6 @@ run_remote () { # $1 host $2 command
return $rc
}
###############################################################################
# Run git command locally
###############################################################################
echo "=== Running 'git $GIT_CMD' locally ==="
if (cd ~/exo && git $GIT_CMD); then
echo "✓ Local git command succeeded"
else
echo "❌ Local git command failed"
exit 1
fi
###############################################################################
# Run git command on remote hosts (parallel)
###############################################################################


@@ -199,7 +199,7 @@ def spin_up_runners(
if (
runner_id in state_runners
and state_runners[runner_id].runner_status
!= RunnerStatusType.Inactive
not in [RunnerStatusType.Inactive, RunnerStatusType.Starting]
):
ready_to_spin = False
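
The effect of this change, sketched below with assumed enum members (only Inactive and Starting appear in the hunk; the others are assumptions): an existing runner no longer blocks spin-up while it is still in the Starting state, which matters now that the worker emits Starting immediately (see the worker change later in this commit).

    # Sketch of the relaxed guard, not part of the commit. RunnerStatusType.Inactive
    # and .Starting appear in the hunk; the remaining members are assumed.
    from enum import Enum

    class RunnerStatusType(str, Enum):
        Inactive = "Inactive"
        Starting = "Starting"
        Loaded = "Loaded"      # assumed
        Running = "Running"    # assumed
        Failed = "Failed"      # assumed

    def blocks_spin_up(status: RunnerStatusType) -> bool:
        # Before: anything other than Inactive blocked spin-up, so a runner that
        # had only reached Starting would presumably stall the flow.
        # After: Starting is treated like Inactive for this check.
        return status not in (RunnerStatusType.Inactive, RunnerStatusType.Starting)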


@@ -52,7 +52,7 @@ def generate_step(
max_kv_size: Optional[int] = None,
prompt_cache: Optional[list[KVCache]] = None,
prefill_step_size: int = 2048,
) -> Generator[Tuple[mx.array, mx.array], None, None]:
) -> Generator[Tuple[int, mx.array], None, None]:
"""
A generator producing token ids based on the given prompt from the model.
@@ -70,7 +70,7 @@ def generate_step(
prefill_step_size (int): Step size for processing the prompt.
Yields:
Tuple[mx.array, mx.array]: One token and a vector of log probabilities.
Tuple[int, mx.array]: One token and a vector of log probabilities.
"""
tokens = None
@@ -128,19 +128,22 @@ def generate_step(
n = 0
next_y: array | None = None
next_logprobs: array | None = None
mx.async_eval(y, logprobs) # type: ignore
n = 0
while True:
if n != max_tokens and n > 0: # Only call _step after first iteration
if n != max_tokens:
assert y is not None
next_y, next_logprobs = _step(y)
mx.async_eval(next_y, next_logprobs) # type: ignore
if n == 0:
mx.eval(y) # type: ignore
if n == max_tokens:
break
yield y, logprobs # y is always defined here, no need for cast
yield int(y.item()), logprobs # type: ignore
if n % 256 == 0:
mx.clear_cache()
if next_y is not None and next_logprobs is not None:
y, logprobs = next_y, next_logprobs
y, logprobs = next_y, next_logprobs
n += 1
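
Condensed, the new decode loop keeps one step in flight: while the caller consumes token n, MLX is already evaluating token n+1. A simplified sketch of the same loop (not part of the commit; mx.async_eval, mx.eval, and mx.clear_cache are mlx.core calls, and _step is the surrounding file's single-token step function, taken as given):

    # Simplified sketch of the pipelined loop above, not part of the commit.
    import mlx.core as mx

    def pipelined_decode(y, logprobs, _step, max_tokens):
        mx.async_eval(y, logprobs)                    # start evaluating the first token
        n = 0
        while True:
            next_y = next_logprobs = None
            if n != max_tokens:
                next_y, next_logprobs = _step(y)      # queue the next token...
                mx.async_eval(next_y, next_logprobs)  # ...and evaluate it in the background
            if n == 0:
                mx.eval(y)                            # block until the first token is ready
            if n == max_tokens:
                break
            yield int(y.item()), logprobs             # hand out a plain int token id
            if n % 256 == 0:
                mx.clear_cache()                      # periodically release cached buffers
            y, logprobs = next_y, next_logprobs
            n += 1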
@@ -153,6 +156,7 @@ def stream_generate(
sampler: Callable[[mx.array], mx.array],
prompt_cache: Optional[list[KVCache]] = None,
prefill_step_size: int = 2048,
warmup: bool = False,
) -> Generator[GenerationResponse, None, None]:
# Try to infer if special tokens are needed
@@ -160,11 +164,12 @@ def stream_generate(
tokenizer.bos_token
)
prompt_array: mx.array = mx.array(tokenizer.encode(prompt, add_special_tokens=add_special_tokens))
runner_write_response(TokenizedResponse(prompt_tokens=len(prompt_array)))
if not warmup:
runner_write_response(TokenizedResponse(prompt_tokens=len(prompt_array)))
detokenizer = tokenizer.detokenizer
token_generator: Generator[Tuple[array, array], None, None] = generate_step(
token_generator: Generator[Tuple[int, array], None, None] = generate_step(
prompt_array,
model,
max_tokens=max_tokens,
@@ -179,12 +184,12 @@ def stream_generate(
if token in tokenizer.eos_token_ids:
break
detokenizer.add_token(int(token))
detokenizer.add_token(token)
# TODO: We could put more metrics on this GenerationResponse if we wish
yield GenerationResponse(
text=detokenizer.last_segment,
token=int(token),
token=token,
finish_reason=None,
)
@@ -192,7 +197,7 @@ def stream_generate(
detokenizer.finalize()
yield GenerationResponse(
text=detokenizer.last_segment,
token=int(token),
token=token,
finish_reason="stop" if token in tokenizer.eos_token_ids else "length",
)
@@ -222,12 +227,13 @@ async def warmup_inference(
def _generate_warmup():
nonlocal tokens_generated
for _ in mlx_stream_generate(
for _ in stream_generate(
model=model,
tokenizer=tokenizer,
prompt=warmup_prompt,
max_tokens=50,
sampler=sampler,
warmup=True,
):
tokens_generated += 1


@@ -25,6 +25,7 @@ from exo.shared.types.worker.runners import (
InactiveRunnerStatus,
LoadedRunnerStatus,
RunningRunnerStatus,
StartingRunnerStatus,
)
from exo.worker.main import Worker
from exo.worker.tests.constants import (
@@ -85,9 +86,11 @@ async def test_runner_up_op(
events = await read_events_op(worker, runner_up_op)
assert len(events) == 1
assert len(events) == 2
assert isinstance(events[0], RunnerStatusUpdated)
assert isinstance(events[0].runner_status, LoadedRunnerStatus)
assert isinstance(events[0].runner_status, StartingRunnerStatus)
assert isinstance(events[1], RunnerStatusUpdated)
assert isinstance(events[1].runner_status, LoadedRunnerStatus)
# Is the runner actually running?
supervisor = next(iter(worker.assigned_runners.values())).runner


@@ -1,203 +0,0 @@
# type: ignore
import contextlib
import os
import time
from pathlib import Path
import mlx.core as mx
import pytest
from mlx_lm.generate import stream_generate
from mlx_lm.sample_utils import make_sampler
from mlx_lm.tokenizer_utils import load_tokenizer
from mlx_lm.utils import load_model
MODEL_ID = "mlx-community/Llama-3.3-70B-Instruct-4bit"
MODEL_PATH = Path(
os.path.expanduser("~/.exo/models/mlx-community--Llama-3.3-70B-Instruct-4bit/")
)
def _get_model_size_gb(path: str) -> float:
"""Calculate total size of directory recursively in GB."""
total_size = 0
for dirpath, _, filenames in os.walk(path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if os.path.isfile(filepath):
total_size += os.path.getsize(filepath)
return total_size / (1024**3) # Convert bytes to GB
@pytest.mark.skipif(
not (os.path.exists(MODEL_PATH) and _get_model_size_gb(MODEL_PATH) > 30),
reason=f"This test only runs when model {MODEL_ID} is downloaded",
)
def test_mlx_profiling():
"""
Test MLX generation directly to profile:
- Time to first token (TTFT)
- Prefill tokens per second (TPS)
- Generation tokens per second (TPS)
For two consecutive prompts using the 70B Llama model.
"""
# How much memory to keep "wired" (resident) and how much freed memory MLX should keep cached
info = mx.metal.device_info() # returns limits & sizes
# Start conservatively: e.g., 70–90% of recommended working set
target_bytes = int(0.8 * info["max_recommended_working_set_size"])
# Keep more freed buffers around for instant reuse
mx.set_cache_limit(target_bytes)
# On macOS 15+ you can wire resident memory to avoid OS paging/compression
with contextlib.suppress(Exception):
mx.set_wired_limit(target_bytes)
print(f"\n=== Loading Model {MODEL_ID} ===")
load_start = time.time()
# Load model and tokenizer
model, _ = load_model(MODEL_PATH, lazy=True, strict=False)
tokenizer = load_tokenizer(MODEL_PATH)
# Evaluate model parameters to load them into memory
mx.eval(model.parameters())
# Create sampler with temperature 0.7
sampler = make_sampler(temp=0.7)
load_time = time.time() - load_start
print(f"Model loaded in {load_time:.2f}s")
# Define test prompts
prompts = [
"Write me a haiku about a robot.",
"Please write a haiku about a flower.",
"Please write a haiku about headlights.",
]
# Prepare messages in chat format
test_messages = [[{"role": "user", "content": prompt}] for prompt in prompts]
results = []
for i, (messages, prompt_text) in enumerate(
zip(test_messages, prompts, strict=False), 1
):
print(f"\n=== Prompt {i}: '{prompt_text}' ===")
# Apply chat template
formatted_prompt = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
# Tokenize to count prompt tokens
prompt_tokens = tokenizer.encode(formatted_prompt)
num_prompt_tokens = len(prompt_tokens)
print(f"Prompt tokens: {num_prompt_tokens}")
# Start timing
start_time = time.time()
first_token_time = None
tokens_generated = 0
generated_text = ""
# Stream generate tokens
for generation in stream_generate(
model=model,
tokenizer=tokenizer,
prompt=formatted_prompt,
max_tokens=100,
sampler=sampler,
):
if first_token_time is None:
first_token_time = time.time()
ttft = first_token_time - start_time
print(f"Time to first token: {ttft:.3f}s")
tokens_generated += 1
generated_text += generation.text
# Stop if we hit the finish reason
if generation.finish_reason:
break
total_time = time.time() - start_time
generation_time = total_time - ttft if first_token_time else total_time
# Calculate metrics
prefill_tps = num_prompt_tokens / ttft if ttft > 0 else 0
generation_tps = (
tokens_generated / generation_time if generation_time > 0 else 0
)
# Store results
result = {
"prompt": prompt_text,
"ttft": ttft,
"total_time": total_time,
"generation_time": generation_time,
"prompt_tokens": num_prompt_tokens,
"tokens_generated": tokens_generated,
"prefill_tps": prefill_tps,
"generation_tps": generation_tps,
"generated_text": generated_text,
}
results.append(result)
# Print results for this prompt
print(f"Total completion time: {total_time:.3f}s")
print(f"Tokens generated: {tokens_generated}")
print(f"Response length: {len(generated_text)} chars")
print(
f"Prefill TPS: {prefill_tps:.1f} tokens/sec ({num_prompt_tokens} prompt tokens / {ttft:.3f}s)"
)
print(
f"Generation TPS: {generation_tps:.1f} tokens/sec ({tokens_generated} tokens / {generation_time:.3f}s)"
)
print(f"Generated text preview: {generated_text[:100]}...")
# Small delay between prompts
if i < len(prompts):
time.sleep(3.0)
# Compare results
print("\n=== Comparison ===")
if len(results) == 2:
r1, r2 = results[0], results[1]
print(f"Second prompt TTFT: {r2['ttft'] / r1['ttft']:.2f}x the first")
print(
f"Second prompt prefill TPS: {r2['prefill_tps'] / r1['prefill_tps']:.2f}x the first"
)
print(
f"Second prompt generation TPS: {r2['generation_tps'] / r1['generation_tps']:.2f}x the first"
)
# Performance expectations
print("\n=== Performance Summary ===")
print("First prompt:")
print(f" TTFT: {r1['ttft']:.3f}s")
print(f" Prefill: {r1['prefill_tps']:.1f} tok/s")
print(f" Generation: {r1['generation_tps']:.1f} tok/s")
print("Second prompt (warmed up):")
print(f" TTFT: {r2['ttft']:.3f}s")
print(f" Prefill: {r2['prefill_tps']:.1f} tok/s")
print(f" Generation: {r2['generation_tps']:.1f} tok/s")
# Basic assertions
for result in results:
assert result["ttft"] > 0, "TTFT must be positive"
assert result["tokens_generated"] > 0, "Must generate at least one token"
assert len(result["generated_text"]) > 0, "Must generate some text"
assert result["prefill_tps"] > 0, "Prefill TPS must be positive"
assert result["generation_tps"] > 0, "Generation TPS must be positive"
print("\n✅ All tests passed!")
if __name__ == "__main__":
test_mlx_profiling()


@@ -74,7 +74,7 @@ def _get_model_size_gb(path: str) -> float:
@pytest.mark.skipif(
not (
True or not (
os.path.exists(
os.path.expanduser(
"~/.exo/models/mlx-community--Llama-3.3-70B-Instruct-4bit/"
@@ -310,7 +310,7 @@ async def test_ttft(
@pytest.mark.skipif(
not (
True or not (
os.path.exists(
os.path.expanduser(
"~/.exo/models/mlx-community--Llama-3.3-70B-Instruct-4bit/"
@@ -419,7 +419,7 @@ async def test_2_runner_inference(
@pytest.mark.skipif(
not (
True or not (
os.path.exists(
os.path.expanduser(
"~/.exo/models/mlx-community--Llama-3.3-70B-Instruct-4bit/"


@@ -42,6 +42,7 @@ from exo.shared.types.worker.runners import (
InactiveRunnerStatus,
LoadedRunnerStatus,
RunningRunnerStatus,
StartingRunnerStatus,
)
from exo.shared.types.worker.shards import ShardMetadata
from exo.worker.common import AssignedRunner
@@ -229,6 +230,10 @@ class Worker:
) -> AsyncGenerator[Event, None]:
assigned_runner = self.assigned_runners[op.runner_id]
# Emit "Starting" status right away so UI can show loading state
assigned_runner.status = StartingRunnerStatus()
yield assigned_runner.status_update_event()
assigned_runner.runner = await RunnerSupervisor.create(
model_shard_meta=assigned_runner.shard_metadata,
hosts=assigned_runner.hosts,
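
Put together with the updated test_runner_up_op above, the runner lifecycle the dashboard now sees is Starting, then Loaded. A rough sketch (not part of the commit; the Starting emission is shown in this hunk, while the subsequent Loaded emission is implied by the test's second event rather than visible here):

    # Rough sketch of the status flow, not part of the commit.
    async def runner_up(assigned_runner):
        assigned_runner.status = StartingRunnerStatus()
        yield assigned_runner.status_update_event()      # dashboard shows the blue LOADING pill
        assigned_runner.runner = await RunnerSupervisor.create(
            model_shard_meta=assigned_runner.shard_metadata,
            hosts=assigned_runner.hosts,
        )
        assigned_runner.status = LoadedRunnerStatus()    # implied by the test, assumed here
        yield assigned_runner.status_update_event()      # pill switches to the teal LOADED state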