Compare commits

...

8 Commits

Author     SHA1        Message                                       Date
ciaranbor  92de82bdf6  Add traces page to UI                         2026-01-30 18:59:02 +00:00
ciaranbor  e792b19f5d  Integrate trace collection with exo runners   2026-01-30 18:59:02 +00:00
ciaranbor  edc3a88b12  Minor tweaks                                  2026-01-30 18:58:30 +00:00
ciaranbor  0ae0c788d5  Average per step                              2026-01-30 18:58:30 +00:00
ciaranbor  09abf44d49  Add statistics interpretation                 2026-01-30 18:58:30 +00:00
ciaranbor  ac4b78349b  Require external evals                        2026-01-30 18:58:30 +00:00
ciaranbor  96c440c3b1  Instrument distributed runner                 2026-01-30 18:58:30 +00:00
ciaranbor  c14d63cf61  Add tracing utils                             2026-01-30 18:58:30 +00:00
12 changed files with 1570 additions and 133 deletions

View File

@@ -178,6 +178,36 @@ interface ImageApiResponse {
data: Array<{ b64_json?: string; url?: string }>;
}
// Trace API response types
export interface TraceCategoryStats {
totalUs: number;
count: number;
minUs: number;
maxUs: number;
avgUs: number;
}
export interface TraceRankStats {
byCategory: Record<string, TraceCategoryStats>;
}
export interface TraceStatsResponse {
taskId: string;
totalWallTimeUs: number;
byCategory: Record<string, TraceCategoryStats>;
byRank: Record<number, TraceRankStats>;
}
export interface TraceListItem {
taskId: string;
createdAt: string;
fileSize: number;
}
export interface TraceListResponse {
traces: TraceListItem[];
}
interface RawStateResponse {
topology?: RawTopology;
instances?: Record<
@@ -2555,6 +2585,49 @@ class AppStore {
throw error;
}
}
/**
* List all available traces
*/
async listTraces(): Promise<TraceListResponse> {
const response = await fetch("/v1/traces");
if (!response.ok) {
throw new Error(`Failed to list traces: ${response.status}`);
}
return (await response.json()) as TraceListResponse;
}
/**
* Check if a trace exists for a given task ID
*/
async checkTraceExists(taskId: string): Promise<boolean> {
try {
const response = await fetch(`/v1/traces/${encodeURIComponent(taskId)}`);
return response.ok;
} catch {
return false;
}
}
/**
* Get computed statistics for a task's trace
*/
async fetchTraceStats(taskId: string): Promise<TraceStatsResponse> {
const response = await fetch(
`/v1/traces/${encodeURIComponent(taskId)}/stats`,
);
if (!response.ok) {
throw new Error(`Failed to fetch trace stats: ${response.status}`);
}
return (await response.json()) as TraceStatsResponse;
}
/**
* Get the URL for the raw trace file (for Perfetto)
*/
getTraceRawUrl(taskId: string): string {
return `/v1/traces/${encodeURIComponent(taskId)}/raw`;
}
}
export const appStore = new AppStore();
@@ -2666,3 +2739,12 @@ export const startDownload = (nodeId: string, shardMetadata: object) =>
appStore.startDownload(nodeId, shardMetadata);
export const deleteDownload = (nodeId: string, modelId: string) =>
appStore.deleteDownload(nodeId, modelId);
// Trace actions
export const listTraces = () => appStore.listTraces();
export const checkTraceExists = (taskId: string) =>
appStore.checkTraceExists(taskId);
export const fetchTraceStats = (taskId: string) =>
appStore.fetchTraceStats(taskId);
export const getTraceRawUrl = (taskId: string) =>
appStore.getTraceRawUrl(taskId);

View File

@@ -0,0 +1,172 @@
<script lang="ts">
import { onMount } from "svelte";
import {
listTraces,
getTraceRawUrl,
type TraceListItem,
} from "$lib/stores/app.svelte";
import HeaderNav from "$lib/components/HeaderNav.svelte";
let traces = $state<TraceListItem[]>([]);
let loading = $state(true);
let error = $state<string | null>(null);
function formatBytes(bytes: number): string {
if (!bytes || bytes <= 0) return "0B";
const units = ["B", "KB", "MB", "GB"];
const i = Math.min(
Math.floor(Math.log(bytes) / Math.log(1024)),
units.length - 1,
);
const val = bytes / Math.pow(1024, i);
return `${val.toFixed(val >= 10 ? 0 : 1)}${units[i]}`;
}
function formatDate(isoString: string): string {
const date = new Date(isoString);
return date.toLocaleString();
}
async function openInPerfetto(taskId: string) {
// Fetch trace data from our local API
const response = await fetch(getTraceRawUrl(taskId));
const traceData = await response.arrayBuffer();
// Open Perfetto UI
const perfettoWindow = window.open("https://ui.perfetto.dev");
if (!perfettoWindow) {
alert("Failed to open Perfetto. Please allow popups.");
return;
}
// Wait for Perfetto to be ready, then send trace via postMessage
const onMessage = (e: MessageEvent) => {
if (e.data === "PONG") {
window.removeEventListener("message", onMessage);
perfettoWindow.postMessage(
{
perfetto: {
buffer: traceData,
title: `Trace ${taskId}`,
},
},
"https://ui.perfetto.dev",
);
}
};
window.addEventListener("message", onMessage);
// Ping Perfetto until it responds
const pingInterval = setInterval(() => {
perfettoWindow.postMessage("PING", "https://ui.perfetto.dev");
}, 50);
// Clean up after 10 seconds
setTimeout(() => {
clearInterval(pingInterval);
window.removeEventListener("message", onMessage);
}, 10000);
}
async function refresh() {
loading = true;
error = null;
try {
const response = await listTraces();
traces = response.traces;
} catch (e) {
error = e instanceof Error ? e.message : "Failed to load traces";
} finally {
loading = false;
}
}
onMount(() => {
refresh();
});
</script>
<div class="min-h-screen bg-exo-dark-gray text-white">
<HeaderNav showHome={true} />
<div class="max-w-7xl mx-auto px-4 lg:px-8 py-6 space-y-6">
<div class="flex items-center justify-between gap-4 flex-wrap">
<div>
<h1
class="text-2xl font-mono tracking-[0.2em] uppercase text-exo-yellow"
>
Traces
</h1>
</div>
<div class="flex items-center gap-3">
<button
type="button"
class="text-xs font-mono text-exo-light-gray hover:text-exo-yellow transition-colors uppercase border border-exo-medium-gray/40 px-2 py-1 rounded"
onclick={refresh}
disabled={loading}
>
Refresh
</button>
</div>
</div>
{#if loading}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-6 text-center text-exo-light-gray"
>
<div class="text-sm">Loading traces...</div>
</div>
{:else if error}
<div
class="rounded border border-red-500/30 bg-red-500/10 p-6 text-center text-red-400"
>
<div class="text-sm">{error}</div>
</div>
{:else if traces.length === 0}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-6 text-center text-exo-light-gray space-y-2"
>
<div class="text-sm">No traces found.</div>
<div class="text-xs text-exo-light-gray/70">
Run exo with EXO_TRACING_ENABLED=1 to collect traces.
</div>
</div>
{:else}
<div class="space-y-3">
{#each traces as trace}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-4 flex items-center justify-between gap-4"
>
<div class="min-w-0 flex-1">
<a
href="#/traces/{trace.taskId}"
class="text-sm font-mono text-white hover:text-exo-yellow transition-colors truncate block"
>
{trace.taskId}
</a>
<div class="text-xs text-exo-light-gray font-mono mt-1">
{formatDate(trace.createdAt)} &bull; {formatBytes(
trace.fileSize,
)}
</div>
</div>
<div class="flex items-center gap-2 shrink-0">
<a
href="#/traces/{trace.taskId}"
class="text-xs font-mono text-exo-light-gray hover:text-exo-yellow transition-colors uppercase border border-exo-medium-gray/40 px-2 py-1 rounded"
>
View Stats
</a>
<button
type="button"
class="text-xs font-mono text-exo-dark-gray bg-exo-yellow hover:bg-exo-yellow/90 transition-colors uppercase px-2 py-1 rounded font-semibold"
onclick={() => openInPerfetto(trace.taskId)}
>
View Trace
</button>
</div>
</div>
{/each}
</div>
{/if}
</div>
</div>

View File

@@ -0,0 +1,347 @@
<script lang="ts">
import { page } from "$app/stores";
import { onMount } from "svelte";
import {
fetchTraceStats,
getTraceRawUrl,
type TraceStatsResponse,
type TraceCategoryStats,
} from "$lib/stores/app.svelte";
import HeaderNav from "$lib/components/HeaderNav.svelte";
const taskId = $derived($page.params.taskId);
let stats = $state<TraceStatsResponse | null>(null);
let loading = $state(true);
let error = $state<string | null>(null);
function formatDuration(us: number): string {
if (us < 1000) return `${us.toFixed(0)}us`;
if (us < 1_000_000) return `${(us / 1000).toFixed(2)}ms`;
return `${(us / 1_000_000).toFixed(2)}s`;
}
function formatPercentage(part: number, total: number): string {
if (total === 0) return "0.0%";
return `${((part / total) * 100).toFixed(1)}%`;
}
// Parse hierarchical categories like "sync/compute" into phases
type PhaseData = {
name: string;
subcategories: { name: string; stats: TraceCategoryStats }[];
totalUs: number; // From outer span (e.g., "sync" category)
stepCount: number; // Count of outer span events
};
function parsePhases(
byCategory: Record<string, TraceCategoryStats>,
): PhaseData[] {
const phases = new Map<
string,
{
subcats: Map<string, TraceCategoryStats>;
outerStats: TraceCategoryStats | null;
}
>();
for (const [category, catStats] of Object.entries(byCategory)) {
if (category.includes("/")) {
const [phase, subcat] = category.split("/", 2);
if (!phases.has(phase)) {
phases.set(phase, { subcats: new Map(), outerStats: null });
}
phases.get(phase)!.subcats.set(subcat, catStats);
} else {
// Outer span - this IS the phase total
if (!phases.has(category)) {
phases.set(category, { subcats: new Map(), outerStats: null });
}
phases.get(category)!.outerStats = catStats;
}
}
return Array.from(phases.entries())
.filter(([_, data]) => data.outerStats !== null) // Only phases with outer spans
.map(([name, data]) => ({
name,
subcategories: Array.from(data.subcats.entries())
.map(([subName, subStats]) => ({ name: subName, stats: subStats }))
.sort((a, b) => b.stats.totalUs - a.stats.totalUs),
totalUs: data.outerStats!.totalUs, // Outer span total
stepCount: data.outerStats!.count, // Number of steps
}))
.sort((a, b) => b.totalUs - a.totalUs);
}
async function openInPerfetto() {
if (!taskId) return;
// Fetch trace data from our local API
const response = await fetch(getTraceRawUrl(taskId));
const traceData = await response.arrayBuffer();
// Open Perfetto UI
const perfettoWindow = window.open("https://ui.perfetto.dev");
if (!perfettoWindow) {
alert("Failed to open Perfetto. Please allow popups.");
return;
}
// Wait for Perfetto to be ready, then send trace via postMessage
const onMessage = (e: MessageEvent) => {
if (e.data === "PONG") {
window.removeEventListener("message", onMessage);
perfettoWindow.postMessage(
{
perfetto: {
buffer: traceData,
title: `Trace ${taskId}`,
},
},
"https://ui.perfetto.dev",
);
}
};
window.addEventListener("message", onMessage);
// Ping Perfetto until it responds
const pingInterval = setInterval(() => {
perfettoWindow.postMessage("PING", "https://ui.perfetto.dev");
}, 50);
// Clean up after 10 seconds
setTimeout(() => {
clearInterval(pingInterval);
window.removeEventListener("message", onMessage);
}, 10000);
}
onMount(async () => {
if (!taskId) {
error = "No task ID provided";
loading = false;
return;
}
try {
stats = await fetchTraceStats(taskId);
} catch (e) {
error = e instanceof Error ? e.message : "Failed to load trace";
} finally {
loading = false;
}
});
const phases = $derived(stats ? parsePhases(stats.byCategory) : []);
const sortedRanks = $derived(
stats
? Object.keys(stats.byRank)
.map(Number)
.sort((a, b) => a - b)
: [],
);
const nodeCount = $derived(sortedRanks.length || 1);
</script>
<div class="min-h-screen bg-exo-dark-gray text-white">
<HeaderNav showHome={true} />
<div class="max-w-7xl mx-auto px-4 lg:px-8 py-6 space-y-6">
<div class="flex items-center justify-between gap-4 flex-wrap">
<div>
<h1
class="text-2xl font-mono tracking-[0.2em] uppercase text-exo-yellow"
>
Trace
</h1>
<p class="text-sm text-exo-light-gray font-mono truncate max-w-lg">
{taskId}
</p>
</div>
<div class="flex items-center gap-3">
<a
href="#/traces"
class="text-xs font-mono text-exo-light-gray hover:text-exo-yellow transition-colors uppercase border border-exo-medium-gray/40 px-3 py-1.5 rounded"
>
All Traces
</a>
<button
type="button"
class="text-xs font-mono text-exo-dark-gray bg-exo-yellow hover:bg-exo-yellow/90 transition-colors uppercase px-3 py-1.5 rounded font-semibold"
onclick={openInPerfetto}
disabled={loading || !!error}
>
View Trace
</button>
</div>
</div>
{#if loading}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-6 text-center text-exo-light-gray"
>
<div class="text-sm">Loading trace data...</div>
</div>
{:else if error}
<div
class="rounded border border-red-500/30 bg-red-500/10 p-6 text-center text-red-400"
>
<div class="text-sm">{error}</div>
</div>
{:else if stats}
<!-- Wall Time Summary -->
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-4 space-y-2"
>
<h2
class="text-sm font-mono uppercase tracking-wider text-exo-light-gray"
>
Summary
</h2>
<div class="text-3xl font-mono text-exo-yellow">
{formatDuration(stats.totalWallTimeUs)}
</div>
<div class="text-xs text-exo-light-gray">Total wall time</div>
</div>
<!-- By Phase -->
{#if phases.length > 0}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-4 space-y-4"
>
<h2
class="text-sm font-mono uppercase tracking-wider text-exo-light-gray"
>
By Phase <span class="text-exo-light-gray/50">(avg per node)</span>
</h2>
<div class="space-y-4">
{#each phases as phase}
{@const normalizedTotal = phase.totalUs / nodeCount}
{@const normalizedStepCount = phase.stepCount / nodeCount}
<div class="space-y-2">
<div class="flex items-center justify-between">
<span class="text-sm font-mono text-white">{phase.name}</span>
<span class="text-sm font-mono">
<span class="text-exo-yellow"
>{formatDuration(normalizedTotal)}</span
>
<span class="text-exo-light-gray ml-2">
({normalizedStepCount} steps, {formatDuration(
normalizedTotal / normalizedStepCount,
)}/step)
</span>
</span>
</div>
{#if phase.subcategories.length > 0}
<div class="pl-4 space-y-1.5">
{#each phase.subcategories as subcat}
{@const normalizedSubcat =
subcat.stats.totalUs / nodeCount}
{@const pct = formatPercentage(
normalizedSubcat,
normalizedTotal,
)}
{@const perStep = normalizedSubcat / normalizedStepCount}
<div
class="flex items-center justify-between text-xs font-mono"
>
<span class="text-exo-light-gray">{subcat.name}</span>
<span class="text-white">
{formatDuration(normalizedSubcat)}
<span class="text-exo-light-gray ml-2">({pct})</span>
<span class="text-exo-light-gray/60 ml-2"
>{formatDuration(perStep)}/step</span
>
</span>
</div>
<!-- Progress bar -->
<div
class="relative h-1.5 bg-exo-black/60 rounded-sm overflow-hidden"
>
<div
class="absolute inset-y-0 left-0 bg-gradient-to-r from-exo-yellow to-exo-yellow/70 transition-all duration-300"
style="width: {pct}"
></div>
</div>
{/each}
</div>
{/if}
</div>
{/each}
</div>
</div>
{/if}
<!-- By Rank -->
{#if sortedRanks.length > 0}
<div
class="rounded border border-exo-medium-gray/30 bg-exo-black/30 p-4 space-y-4"
>
<h2
class="text-sm font-mono uppercase tracking-wider text-exo-light-gray"
>
By Rank
</h2>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
{#each sortedRanks as rank}
{@const rankStats = stats.byRank[rank]}
{@const rankPhases = parsePhases(rankStats.byCategory)}
<div
class="rounded border border-exo-medium-gray/20 bg-exo-dark-gray/60 p-3 space-y-3"
>
<div class="text-sm font-mono text-exo-yellow">
Rank {rank}
</div>
<div class="space-y-2">
{#each rankPhases as phase}
<div class="space-y-1">
<div class="flex items-center justify-between text-xs">
<span class="font-mono text-exo-light-gray"
>{phase.name}</span
>
<span class="font-mono text-white">
{formatDuration(phase.totalUs)}
<span class="text-exo-light-gray/50 ml-1">
({phase.stepCount}x)
</span>
</span>
</div>
{#if phase.subcategories.length > 0}
<div class="pl-2 space-y-0.5">
{#each phase.subcategories as subcat}
{@const pct = formatPercentage(
subcat.stats.totalUs,
phase.totalUs,
)}
{@const perStep =
subcat.stats.totalUs / phase.stepCount}
<div
class="flex items-center justify-between text-[10px] font-mono"
>
<span class="text-exo-light-gray/70"
>{subcat.name}</span
>
<span class="text-exo-light-gray">
{formatDuration(subcat.stats.totalUs)}
<span class="text-exo-light-gray/50"
>({pct})</span
>
<span class="text-exo-light-gray/30 ml-1"
>{formatDuration(perStep)}/step</span
>
</span>
</div>
{/each}
</div>
{/if}
</div>
{/each}
</div>
</div>
{/each}
</div>
</div>
{/if}
{/if}
</div>
</div>

View File

@@ -3,7 +3,9 @@ import contextlib
import json
import time
from collections.abc import AsyncGenerator
from datetime import datetime, timezone
from http import HTTPStatus
from pathlib import Path
from typing import Annotated, Literal, cast
from uuid import uuid4
@@ -33,6 +35,7 @@ from exo.shared.models.model_cards import (
ModelCard,
ModelId,
)
from exo.shared.tracing import compute_stats, load_trace_file
from exo.shared.types.api import (
AdvancedImageParams,
BenchChatCompletionResponse,
@@ -67,6 +70,13 @@ from exo.shared.types.api import (
StreamingChoiceResponse,
StreamOptions,
ToolCall,
TraceCategoryStats,
TraceEventResponse,
TraceListItem,
TraceListResponse,
TraceRankStats,
TraceResponse,
TraceStatsResponse,
Usage,
)
from exo.shared.types.chunks import (
@@ -264,6 +274,10 @@ class API:
self.app.get("/events")(lambda: self._event_log)
self.app.post("/download/start")(self.start_download)
self.app.delete("/download/{node_id}/{model_id:path}")(self.delete_download)
self.app.get("/v1/traces")(self.list_traces)
self.app.get("/v1/traces/{task_id}")(self.get_trace)
self.app.get("/v1/traces/{task_id}/stats")(self.get_trace_stats)
self.app.get("/v1/traces/{task_id}/raw")(self.get_trace_raw)
async def place_instance(self, payload: PlaceInstanceParams):
command = PlaceInstance(
@@ -1336,3 +1350,110 @@ class API:
)
await self._send_download(command)
return DeleteDownloadResponse(command_id=command.command_id)
def _get_traces_dir(self) -> Path:
return Path.home() / ".exo" / "traces"
def _get_trace_path(self, task_id: str) -> Path:
return self._get_traces_dir() / f"trace_{task_id}.json"
async def list_traces(self) -> TraceListResponse:
traces_dir = self._get_traces_dir()
traces: list[TraceListItem] = []
if not traces_dir.exists():
return TraceListResponse(traces=[])
for trace_file in sorted(
traces_dir.glob("trace_*.json"),
key=lambda p: p.stat().st_mtime,
reverse=True,
):
# Extract task_id from filename (trace_{task_id}.json)
task_id = trace_file.stem.removeprefix("trace_")
stat = trace_file.stat()
created_at = datetime.fromtimestamp(
stat.st_mtime, tz=timezone.utc
).isoformat()
traces.append(
TraceListItem(
task_id=task_id,
created_at=created_at,
file_size=stat.st_size,
)
)
return TraceListResponse(traces=traces)
async def get_trace(self, task_id: str) -> TraceResponse:
trace_path = self._get_trace_path(task_id)
if not trace_path.exists():
raise HTTPException(status_code=404, detail=f"Trace not found: {task_id}")
trace_events = load_trace_file(trace_path)
return TraceResponse(
task_id=task_id,
traces=[
TraceEventResponse(
name=event.name,
start_us=event.start_us,
duration_us=event.duration_us,
rank=event.rank,
category=event.category,
)
for event in trace_events
],
)
async def get_trace_stats(self, task_id: str) -> TraceStatsResponse:
trace_path = self._get_trace_path(task_id)
if not trace_path.exists():
raise HTTPException(status_code=404, detail=f"Trace not found: {task_id}")
trace_events = load_trace_file(trace_path)
stats = compute_stats(trace_events)
return TraceStatsResponse(
task_id=task_id,
total_wall_time_us=stats.total_wall_time_us,
by_category={
category: TraceCategoryStats(
total_us=cat_stats.total_us,
count=cat_stats.count,
min_us=cat_stats.min_us,
max_us=cat_stats.max_us,
avg_us=cat_stats.avg_us,
)
for category, cat_stats in stats.by_category.items()
},
by_rank={
rank: TraceRankStats(
by_category={
category: TraceCategoryStats(
total_us=cat_stats.total_us,
count=cat_stats.count,
min_us=cat_stats.min_us,
max_us=cat_stats.max_us,
avg_us=cat_stats.avg_us,
)
for category, cat_stats in rank_stats.items()
}
)
for rank, rank_stats in stats.by_rank.items()
},
)
async def get_trace_raw(self, task_id: str) -> FileResponse:
trace_path = self._get_trace_path(task_id)
if not trace_path.exists():
raise HTTPException(status_code=404, detail=f"Trace not found: {task_id}")
return FileResponse(
path=trace_path,
media_type="application/json",
filename=f"trace_{task_id}.json",
)
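
For reference, a minimal client sketch for these trace endpoints (illustrative only; the base URL and port are assumptions, not taken from this diff):

# Illustrative client sketch using only the standard library; the base URL is hypothetical.
import json
from urllib.request import urlopen

BASE = "http://localhost:52415"  # assumed exo API address

with urlopen(f"{BASE}/v1/traces") as resp:
    listing = json.load(resp)  # TraceListResponse: {"traces": [{"taskId", "createdAt", "fileSize"}, ...]}

if listing["traces"]:
    task_id = listing["traces"][0]["taskId"]
    with urlopen(f"{BASE}/v1/traces/{task_id}/stats") as resp:
        stats = json.load(resp)  # TraceStatsResponse: taskId, totalWallTimeUs, byCategory, byRank
    print(stats["totalWallTimeUs"])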

View File

@@ -1,4 +1,5 @@
from datetime import datetime, timedelta, timezone
from pathlib import Path
import anyio
from anyio.abc import TaskGroup
@@ -11,6 +12,7 @@ from exo.master.placement import (
place_instance,
)
from exo.shared.apply import apply
from exo.shared.tracing import TraceEvent, export_trace, is_tracing_enabled
from exo.shared.types.commands import (
ChatCompletion,
CreateInstance,
@@ -35,6 +37,8 @@ from exo.shared.types.events import (
NodeTimedOut,
TaskCreated,
TaskDeleted,
TraceEventData,
TracesCollected,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import (
@@ -86,6 +90,8 @@ class Master:
self._multi_buffer = MultiSourceBuffer[NodeId, Event]()
# TODO: not have this
self._event_log: list[Event] = []
self._pending_traces: dict[TaskId, dict[int, list[TraceEventData]]] = {}
self._expected_ranks: dict[TaskId, set[int]] = {}
async def run(self):
logger.info("Starting Master")
@@ -187,13 +193,14 @@ class Master:
)
task_id = TaskId()
selected_instance_id = available_instance_ids[0]
generated_events.append(
TaskCreated(
task_id=task_id,
task=ImageGenerationTask(
task_id=task_id,
command_id=command.command_id,
instance_id=available_instance_ids[0],
instance_id=selected_instance_id,
task_status=TaskStatus.Pending,
task_params=command.request_params,
),
@@ -201,6 +208,17 @@ class Master:
)
self.command_task_mapping[command.command_id] = task_id
if is_tracing_enabled():
selected_instance = self.state.instances.get(
selected_instance_id
)
if selected_instance:
ranks = set(
shard.device_rank
for shard in selected_instance.shard_assignments.runner_to_shard.values()
)
self._expected_ranks[task_id] = ranks
case ImageEdits():
for instance in self.state.instances.values():
if (
@@ -229,13 +247,14 @@ class Master:
)
task_id = TaskId()
selected_instance_id = available_instance_ids[0]
generated_events.append(
TaskCreated(
task_id=task_id,
task=ImageEditsTask(
task_id=task_id,
command_id=command.command_id,
instance_id=available_instance_ids[0],
instance_id=selected_instance_id,
task_status=TaskStatus.Pending,
task_params=command.request_params,
),
@@ -243,6 +262,17 @@ class Master:
)
self.command_task_mapping[command.command_id] = task_id
if is_tracing_enabled():
selected_instance = self.state.instances.get(
selected_instance_id
)
if selected_instance:
ranks = set(
shard.device_rank
for shard in selected_instance.shard_assignments.runner_to_shard.values()
)
self._expected_ranks[task_id] = ranks
case DeleteInstance():
placement = delete_instance(command, self.state.instances)
transition_events = get_transition_events(
@@ -335,6 +365,10 @@ class Master:
local_event.origin,
)
for event in self._multi_buffer.drain():
if isinstance(event, TracesCollected):
self._handle_traces_collected(event)
continue
logger.debug(f"Master indexing event: {str(event)[:100]}")
indexed = IndexedEvent(event=event, idx=len(self._event_log))
self.state = apply(self.state, indexed)
@@ -373,3 +407,38 @@ class Master:
event=event.event,
)
)
def _handle_traces_collected(self, event: TracesCollected) -> None:
task_id = event.task_id
if task_id not in self._pending_traces:
self._pending_traces[task_id] = {}
self._pending_traces[task_id][event.rank] = event.traces
if (
task_id in self._expected_ranks
and set(self._pending_traces[task_id].keys())
>= self._expected_ranks[task_id]
):
self._merge_and_save_traces(task_id)
def _merge_and_save_traces(self, task_id: TaskId) -> None:
all_traces: list[TraceEvent] = []
for trace_data in self._pending_traces[task_id].values():
for t in trace_data:
all_traces.append(
TraceEvent(
name=t.name,
start_us=t.start_us,
duration_us=t.duration_us,
rank=t.rank,
category=t.category,
)
)
output_path = Path.home() / ".exo" / "traces" / f"trace_{task_id}.json"
export_trace(all_traces, output_path)
logger.info(f"Merged traces saved to {output_path}")
del self._pending_traces[task_id]
if task_id in self._expected_ranks:
del self._expected_ranks[task_id]

View File

@@ -25,6 +25,7 @@ from exo.shared.types.events import (
TestEvent,
TopologyEdgeCreated,
TopologyEdgeDeleted,
TracesCollected,
)
from exo.shared.types.profiling import (
NodeIdentity,
@@ -55,7 +56,11 @@ def event_apply(event: Event, state: State) -> State:
"""Apply an event to state."""
match event:
case (
TestEvent() | ChunkGenerated() | TaskAcknowledged() | InputChunkReceived()
TestEvent()
| ChunkGenerated()
| TaskAcknowledged()
| InputChunkReceived()
| TracesCollected()
): # Pass-through events that don't modify state
return state
case InstanceCreated():

View File

@@ -53,3 +53,9 @@ EXO_IMAGE_CACHE_DIR = EXO_CACHE_HOME / "images"
EXO_ENABLE_IMAGE_MODELS = (
os.getenv("EXO_ENABLE_IMAGE_MODELS", "false").lower() == "true"
)
EXO_TRACING_ENABLED = os.getenv("EXO_TRACING_ENABLED", "").lower() in (
"1",
"true",
"yes",
)
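
A quick sketch of how the flag is consumed (illustrative, not part of the diff): the variable is read once when exo.shared.constants is imported, so it must be set in the environment before exo starts.

# Illustrative only: set the flag before any exo module is imported.
import os

os.environ.setdefault("EXO_TRACING_ENABLED", "1")  # "1", "true", or "yes" (case-insensitive) enables tracing

from exo.shared.tracing import is_tracing_enabled

print(is_tracing_enabled())  # True once the flag was set before import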

src/exo/shared/tracing.py (new file, 450 lines)
View File

@@ -0,0 +1,450 @@
from __future__ import annotations
import json
import logging
import time
from collections import defaultdict
from collections.abc import Generator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from pathlib import Path
from typing import cast, final
from exo.shared.constants import EXO_TRACING_ENABLED
logger = logging.getLogger(__name__)
# Context variable to track the current trace category for hierarchical nesting
_current_category: ContextVar[str | None] = ContextVar("current_category", default=None)
@final
@dataclass(frozen=True)
class TraceEvent:
name: str
start_us: int
duration_us: int
rank: int
category: str
@final
@dataclass
class CategoryStats:
total_us: int = 0
count: int = 0
min_us: int = 0
max_us: int = 0
def add(self, duration_us: int) -> None:
if self.count == 0:
self.min_us = duration_us
self.max_us = duration_us
else:
self.min_us = min(self.min_us, duration_us)
self.max_us = max(self.max_us, duration_us)
self.total_us += duration_us
self.count += 1
@property
def avg_us(self) -> float:
return self.total_us / self.count if self.count > 0 else 0.0
@final
@dataclass
class TraceStats:
total_wall_time_us: int = 0
by_category: dict[str, CategoryStats] = field(default_factory=dict)
by_rank: dict[int, dict[str, CategoryStats]] = field(default_factory=dict)
# Global trace buffer - each rank accumulates traces here
_trace_buffer: list[TraceEvent] = []
def is_tracing_enabled() -> bool:
"""Check if tracing is enabled via environment variable."""
return EXO_TRACING_ENABLED
def _record_span(
name: str, start_us: int, duration_us: int, rank: int, category: str
) -> None:
_trace_buffer.append(
TraceEvent(
name=name,
start_us=start_us,
duration_us=duration_us,
rank=rank,
category=category,
)
)
@contextmanager
def trace(
name: str,
rank: int,
category: str = "compute",
) -> Generator[None, None, None]:
"""Context manager to trace any operation.
Nested traces automatically inherit the parent category, creating hierarchical
categories like "sync/compute" or "async/comms".
Args:
name: Name of the operation (e.g., "recv 0", "send 1", "joint_blocks")
rank: This rank's ID
category: Category for grouping in trace viewer (e.g. "comms", "compute", "sync")
Example:
with trace(f"sync {t}", rank, "sync"):
with trace("joint_blocks", rank, "compute"):
# Recorded with category "sync/compute"
hidden_states = some_computation(...)
"""
if not is_tracing_enabled():
yield
return
# Combine with parent category if nested
parent = _current_category.get()
full_category = f"{parent}/{category}" if parent else category
# Set as current for nested traces
token = _current_category.set(full_category)
try:
start_us = int(time.time() * 1_000_000)
start_perf = time.perf_counter()
yield
duration_us = int((time.perf_counter() - start_perf) * 1_000_000)
_record_span(name, start_us, duration_us, rank, full_category)
finally:
_current_category.reset(token)
def get_trace_buffer() -> list[TraceEvent]:
return list(_trace_buffer)
def clear_trace_buffer() -> None:
_trace_buffer.clear()
def export_trace(traces: list[TraceEvent], output_path: Path) -> None:
trace_events: list[dict[str, object]] = []
for event in traces:
# Chrome trace format uses "X" for complete events (with duration)
chrome_event: dict[str, object] = {
"name": event.name,
"cat": event.category,
"ph": "X",
"ts": event.start_us,
"dur": event.duration_us,
"pid": 0,
"tid": event.rank,
"args": {"rank": event.rank},
}
trace_events.append(chrome_event)
ranks_seen = set(t.rank for t in traces)
for rank in ranks_seen:
trace_events.append(
{
"name": "thread_name",
"ph": "M", # Metadata event
"pid": 0,
"tid": rank,
"args": {"name": f"Rank {rank}"},
}
)
chrome_trace = {"traceEvents": trace_events}
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(chrome_trace, f, indent=2)
except OSError as e:
logger.warning("Failed to export trace to %s: %s", output_path, e)
def export_local_traces(rank: int) -> None:
if not is_tracing_enabled():
return
local_traces = get_trace_buffer()
if local_traces:
output_path = Path.home() / ".exo" / "traces" / f"trace_{rank}.json"
try:
export_trace(local_traces, output_path)
except Exception as e:
logger.warning("Failed to export local traces for rank %d: %s", rank, e)
clear_trace_buffer()
def merge_trace_files(trace_dir: Path | None = None) -> Path | None:
if trace_dir is None:
trace_dir = Path.home() / ".exo" / "traces"
if not trace_dir.exists():
return None
trace_files = sorted(trace_dir.glob("trace_*.json"))
if not trace_files:
return None
merged_events: list[dict[str, object]] = []
for trace_file in trace_files:
file_rank = int(trace_file.stem.split("_")[1])
with open(trace_file) as f:
raw = f.read()
data = cast(dict[str, list[dict[str, object]]], json.loads(raw))
events: list[dict[str, object]] = data.get("traceEvents", [])
for event in events:
event["tid"] = file_rank
if "args" in event and isinstance(event["args"], dict):
event["args"]["rank"] = file_rank
merged_events.extend(events)
output_path = Path.home() / ".exo" / "traces" / "merged_trace.json"
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump({"traceEvents": merged_events}, f, indent=2)
except OSError as e:
logger.warning("Failed to write merged trace to %s: %s", output_path, e)
return None
return output_path
def _format_duration(us: int | float) -> str:
if us < 1000:
return f"{us:.0f}µs"
elif us < 1_000_000:
return f"{us / 1000:.2f}ms"
else:
return f"{us / 1_000_000:.2f}s"
def load_trace_file(path: Path) -> list[TraceEvent]:
"""Load a Chrome Trace Format JSON file into TraceEvent objects."""
with open(path) as f:
data = cast(dict[str, list[dict[str, object]]], json.load(f))
events = data.get("traceEvents", [])
traces: list[TraceEvent] = []
for event in events:
# Skip metadata events
if event.get("ph") == "M":
continue
name = str(event.get("name", ""))
category = str(event.get("cat", ""))
ts_value = event.get("ts", 0)
dur_value = event.get("dur", 0)
tid_value = event.get("tid", 0)
start_us = int(ts_value) if isinstance(ts_value, (int, float, str)) else 0
duration_us = int(dur_value) if isinstance(dur_value, (int, float, str)) else 0
# Get rank from tid or args
rank = int(tid_value) if isinstance(tid_value, (int, float, str)) else 0
args = event.get("args")
if isinstance(args, dict):
args_dict = cast(dict[str, object], args)
rank_from_args = args_dict.get("rank")
if isinstance(rank_from_args, (int, float, str)):
rank = int(rank_from_args)
traces.append(
TraceEvent(
name=name,
start_us=start_us,
duration_us=duration_us,
rank=rank,
category=category,
)
)
return traces
def compute_stats(traces: list[TraceEvent]) -> TraceStats:
"""Compute comprehensive statistics from trace events."""
stats = TraceStats()
if not traces:
return stats
# Calculate wall time from earliest start to latest end
min_start = min(t.start_us for t in traces)
max_end = max(t.start_us + t.duration_us for t in traces)
stats.total_wall_time_us = max_end - min_start
# Initialize nested dicts
by_category: dict[str, CategoryStats] = defaultdict(CategoryStats)
by_rank: dict[int, dict[str, CategoryStats]] = defaultdict(
lambda: defaultdict(CategoryStats)
)
for event in traces:
# By category
by_category[event.category].add(event.duration_us)
# By rank and category
by_rank[event.rank][event.category].add(event.duration_us)
stats.by_category = dict(by_category)
stats.by_rank = {k: dict(v) for k, v in by_rank.items()}
return stats
def print_stats(stats: TraceStats) -> None:
"""Print formatted trace statistics."""
print("=== Trace Statistics ===")
print()
print(f"Wall Time: {_format_duration(stats.total_wall_time_us)}")
print()
# Parse hierarchical categories (e.g., "sync/compute" -> phase="sync", subcat="compute")
if stats.by_category:
phases: dict[str, dict[str, CategoryStats]] = defaultdict(dict)
has_hierarchical = False
for cat, cat_stats in stats.by_category.items():
if "/" in cat:
phase, subcat = cat.split("/", 1)
phases[phase][subcat] = cat_stats
has_hierarchical = True
else:
phases[cat]["_total"] = cat_stats
if has_hierarchical:
print("By Phase:")
for phase in sorted(phases.keys()):
subcats = phases[phase]
# Skip phases that only have _total (non-hierarchical top-level categories)
non_total_subcats = {k: v for k, v in subcats.items() if k != "_total"}
if not non_total_subcats:
continue
phase_total = sum(s.total_us for s in non_total_subcats.values())
print(f" {phase}:")
for subcat, subcat_stats in sorted(
non_total_subcats.items(),
key=lambda x: x[1].total_us,
reverse=True,
):
pct = (
subcat_stats.total_us / phase_total * 100 if phase_total else 0
)
# Use parent phase's step count for per-step average
phase_step_count = subcats.get("_total", CategoryStats()).count
if phase_step_count > 0:
avg_per_step = subcat_stats.total_us / phase_step_count
else:
avg_per_step = subcat_stats.avg_us # fallback
print(
f" {subcat:12s} {_format_duration(subcat_stats.total_us):>10s} "
f"({pct:5.1f}%) avg: {_format_duration(avg_per_step)}"
)
print()
else:
# Fall back to flat category display if no hierarchical categories
print("By Category:")
total_time = sum(c.total_us for c in stats.by_category.values())
for category, cat_stats in sorted(
stats.by_category.items(), key=lambda x: x[1].total_us, reverse=True
):
pct = (cat_stats.total_us / total_time * 100) if total_time > 0 else 0
print(
f" {category:12s} {_format_duration(cat_stats.total_us):>10s} "
f"({pct:5.1f}%) avg: {_format_duration(cat_stats.avg_us):>8s} "
f"count: {cat_stats.count}"
)
print()
# By Rank
if stats.by_rank:
print("By Rank:")
for rank in sorted(stats.by_rank.keys()):
rank_stats = stats.by_rank[rank]
print(f" Rank {rank}:")
# Parse hierarchical categories for this rank
rank_phases: dict[str, dict[str, CategoryStats]] = defaultdict(dict)
has_hierarchical = False
for cat, cat_stats in rank_stats.items():
if "/" in cat:
phase, subcat = cat.split("/", 1)
rank_phases[phase][subcat] = cat_stats
has_hierarchical = True
else:
rank_phases[cat]["_total"] = cat_stats
if has_hierarchical:
for phase in sorted(rank_phases.keys()):
subcats = rank_phases[phase]
non_total_subcats = {
k: v for k, v in subcats.items() if k != "_total"
}
if not non_total_subcats:
continue
phase_total = sum(s.total_us for s in non_total_subcats.values())
print(f" {phase}:")
for subcat, subcat_stats in sorted(
non_total_subcats.items(),
key=lambda x: x[1].total_us,
reverse=True,
):
pct = (
subcat_stats.total_us / phase_total * 100
if phase_total
else 0
)
# Use parent phase's step count for per-step average
phase_step_count = subcats.get("_total", CategoryStats()).count
if phase_step_count > 0:
avg_per_step = subcat_stats.total_us / phase_step_count
else:
avg_per_step = subcat_stats.avg_us # fallback
print(
f" {subcat:12s} {_format_duration(subcat_stats.total_us):>10s} "
f"({pct:5.1f}%) avg: {_format_duration(avg_per_step)}"
)
else:
# Flat display fallback
for category, cat_stats in sorted(
rank_stats.items(), key=lambda x: x[1].total_us, reverse=True
):
print(f" {category}: {_format_duration(cat_stats.total_us)}")
print()
if __name__ == "__main__":
import sys
path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("trace.json")
if not path.exists():
print(f"Error: File not found: {path}")
sys.exit(1)
traces = load_trace_file(path)
if not traces:
print("No trace events found in file.")
sys.exit(0)
computed_stats = compute_stats(traces)
print_stats(computed_stats)
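
A minimal end-to-end usage sketch of these utilities (illustrative; assumes EXO_TRACING_ENABLED is set before exo is imported): nested trace() spans produce hierarchical categories such as "sync/compute", which compute_stats then aggregates per category and per rank.

# Illustrative sketch, not part of the diff.
from pathlib import Path

from exo.shared.tracing import (
    compute_stats,
    export_trace,
    get_trace_buffer,
    print_stats,
    trace,
)

rank = 0
with trace("sync 0", rank, category="sync"):               # recorded with category "sync"
    with trace("joint_blocks", rank, category="compute"):  # nested span -> "sync/compute"
        pass  # ... a model step would run here ...

events = get_trace_buffer()
export_trace(events, Path.home() / ".exo" / "traces" / "trace_demo.json")
print_stats(compute_stats(events))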

View File

@@ -373,3 +373,45 @@ class StartDownloadResponse(CamelCaseModel):
class DeleteDownloadResponse(CamelCaseModel):
command_id: CommandId
class TraceEventResponse(CamelCaseModel):
name: str
start_us: int
duration_us: int
rank: int
category: str
class TraceResponse(CamelCaseModel):
task_id: str
traces: list[TraceEventResponse]
class TraceCategoryStats(CamelCaseModel):
total_us: int
count: int
min_us: int
max_us: int
avg_us: float
class TraceRankStats(CamelCaseModel):
by_category: dict[str, TraceCategoryStats]
class TraceStatsResponse(CamelCaseModel):
task_id: str
total_wall_time_us: int
by_category: dict[str, TraceCategoryStats]
by_rank: dict[int, TraceRankStats]
class TraceListItem(CamelCaseModel):
task_id: str
created_at: str
file_size: int
class TraceListResponse(CamelCaseModel):
traces: list[TraceListItem]

View File

@@ -1,4 +1,5 @@
from datetime import datetime
from typing import final
from pydantic import Field
@@ -10,7 +11,7 @@ from exo.shared.types.worker.downloads import DownloadProgress
from exo.shared.types.worker.instances import Instance, InstanceId
from exo.shared.types.worker.runners import RunnerId, RunnerStatus
from exo.utils.info_gatherer.info_gatherer import GatheredInfo
from exo.utils.pydantic_ext import CamelCaseModel, TaggedModel
from exo.utils.pydantic_ext import CamelCaseModel, FrozenModel, TaggedModel
class EventId(Id):
@@ -109,6 +110,22 @@ class TopologyEdgeDeleted(BaseEvent):
conn: Connection
@final
class TraceEventData(FrozenModel):
name: str
start_us: int
duration_us: int
rank: int
category: str
@final
class TracesCollected(BaseEvent):
task_id: TaskId
rank: int
traces: list[TraceEventData]
Event = (
TestEvent
| TaskCreated
@@ -127,6 +144,7 @@ Event = (
| InputChunkReceived
| TopologyEdgeCreated
| TopologyEdgeDeleted
| TracesCollected
)

View File

@@ -6,6 +6,11 @@ from mflux.models.common.config.config import Config
from mflux.utils.exceptions import StopImageGenerationException
from tqdm import tqdm
from exo.shared.tracing import (
clear_trace_buffer,
is_tracing_enabled,
trace,
)
from exo.shared.types.worker.shards import PipelineShardMetadata
from exo.worker.engines.image.config import ImageModelConfig
from exo.worker.engines.image.models.base import (
@@ -324,6 +329,7 @@ class DiffusionRunner:
capture_steps = set()
self._reset_all_caches()
clear_trace_buffer()
time_steps = tqdm(range(runtime_config.num_inference_steps))
@@ -465,20 +471,22 @@ class DiffusionRunner:
if self.group is None:
return self._single_node_step(t, config, latents, prompt_data)
elif t < config.init_time_step + num_sync_steps:
return self._sync_pipeline_step(
t,
config,
latents,
prompt_data,
)
with trace(name=f"sync {t}", rank=self.rank, category="sync"):
return self._sync_pipeline_step(
t,
config,
latents,
prompt_data,
)
else:
return self._async_pipeline_step(
t,
config,
latents,
prompt_data,
is_first_async_step=t == config.init_time_step + num_sync_steps,
)
with trace(name=f"async {t}", rank=self.rank, category="async"):
return self._async_pipeline_step(
t,
config,
latents,
prompt_data,
is_first_async_step=t == config.init_time_step + num_sync_steps,
)
def _single_node_step(
self,
@@ -586,30 +594,41 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
hidden_states = mx.distributed.recv(
(batch_size, num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
mx.eval(hidden_states, encoder_hidden_states)
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
hidden_states = mx.distributed.recv(
(batch_size, num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
mx.eval(hidden_states, encoder_hidden_states)
assert self.joint_block_wrappers is not None
assert encoder_hidden_states is not None
for wrapper in self.joint_block_wrappers:
wrapper.set_patch(BlockWrapperMode.CACHING)
encoder_hidden_states, hidden_states = wrapper(
hidden_states=hidden_states,
encoder_hidden_states=encoder_hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
with trace(
name="joint_blocks",
rank=self.rank,
category="compute",
):
for wrapper in self.joint_block_wrappers:
wrapper.set_patch(BlockWrapperMode.CACHING)
encoder_hidden_states, hidden_states = wrapper(
hidden_states=hidden_states,
encoder_hidden_states=encoder_hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
if is_tracing_enabled():
mx.eval(encoder_hidden_states, hidden_states)
if self.owns_concat_stage:
assert encoder_hidden_states is not None
@@ -620,45 +639,63 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
hidden_states = concatenated
else:
concatenated = mx.distributed.send(
concatenated, self.next_rank, group=self.group
)
mx.async_eval(concatenated)
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
concatenated = mx.distributed.send(
concatenated, self.next_rank, group=self.group
)
mx.async_eval(concatenated)
elif self.has_joint_blocks and not self.is_last_stage:
assert encoder_hidden_states is not None
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
)
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
)
mx.async_eval(hidden_states, encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
hidden_states = mx.distributed.recv(
(batch_size, text_seq_len + num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
mx.eval(hidden_states)
assert self.single_block_wrappers is not None
for wrapper in self.single_block_wrappers:
wrapper.set_patch(BlockWrapperMode.CACHING)
hidden_states = wrapper(
hidden_states=hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
if not self.is_last_stage:
with trace(name=f"send {self.next_rank}", rank=self.rank, category="comms"):
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
)
mx.async_eval(hidden_states)
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
)
mx.async_eval(hidden_states, encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
hidden_states = mx.distributed.recv(
(batch_size, text_seq_len + num_img_tokens, hidden_dim),
dtype,
self.prev_rank,
group=self.group,
)
mx.eval(hidden_states)
assert self.single_block_wrappers is not None
with trace(
name="single blocks",
rank=self.rank,
category="compute",
):
for wrapper in self.single_block_wrappers:
wrapper.set_patch(BlockWrapperMode.CACHING)
hidden_states = wrapper(
hidden_states=hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
if is_tracing_enabled():
mx.eval(hidden_states)
if not self.is_last_stage:
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
hidden_states = mx.distributed.send(
hidden_states, self.next_rank, group=self.group
)
mx.async_eval(hidden_states)
hidden_states = hidden_states[:, text_seq_len:, ...]
@@ -742,14 +779,20 @@ class DiffusionRunner:
)
if not self.is_first_stage:
hidden_states = mx.distributed.send(hidden_states, 0, group=self.group)
mx.async_eval(hidden_states)
with trace(name="send 0", rank=self.rank, category="comms"):
hidden_states = mx.distributed.send(
hidden_states, 0, group=self.group
)
mx.async_eval(hidden_states)
elif self.is_first_stage:
hidden_states = mx.distributed.recv_like(
prev_latents, src=self.world_size - 1, group=self.group
)
mx.eval(hidden_states)
with trace(
name=f"recv {self.world_size - 1}", rank=self.rank, category="comms"
):
hidden_states = mx.distributed.recv_like(
prev_latents, src=self.world_size - 1, group=self.group
)
mx.eval(hidden_states)
else:
hidden_states = prev_latents
@@ -809,10 +852,13 @@ class DiffusionRunner:
and not self.is_last_stage
and not is_first_async_step
):
patch = mx.distributed.recv_like(
patch, src=self.prev_rank, group=self.group
)
mx.eval(patch)
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
patch = mx.distributed.recv_like(
patch, src=self.prev_rank, group=self.group
)
mx.eval(patch)
step_patch = mx.concatenate([patch, patch], axis=0) if needs_cfg else patch
@@ -843,10 +889,13 @@ class DiffusionRunner:
)
if not self.is_first_stage and t != config.num_inference_steps - 1:
patch_latents[patch_idx] = mx.distributed.send(
patch_latents[patch_idx], self.next_rank, group=self.group
)
mx.async_eval(patch_latents[patch_idx])
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
patch_latents[patch_idx] = mx.distributed.send(
patch_latents[patch_idx], self.next_rank, group=self.group
)
mx.async_eval(patch_latents[patch_idx])
return mx.concatenate(patch_latents, axis=1)
@@ -885,22 +934,28 @@ class DiffusionRunner:
if self.has_joint_blocks:
if not self.is_first_stage:
patch_len = patch.shape[1]
patch = mx.distributed.recv(
(batch_size, patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
group=self.group,
)
mx.eval(patch)
if patch_idx == 0:
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
patch = mx.distributed.recv(
(batch_size, patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
group=self.group,
)
mx.eval(encoder_hidden_states)
mx.eval(patch)
if patch_idx == 0:
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
encoder_hidden_states = mx.distributed.recv(
(batch_size, text_seq_len, hidden_dim),
patch.dtype,
self.prev_rank,
group=self.group,
)
mx.eval(encoder_hidden_states)
if self.is_first_stage:
patch, encoder_hidden_states = self.adapter.compute_embeddings(
@@ -909,14 +964,22 @@ class DiffusionRunner:
assert self.joint_block_wrappers is not None
assert encoder_hidden_states is not None
for wrapper in self.joint_block_wrappers:
wrapper.set_patch(BlockWrapperMode.PATCHED, start_token, end_token)
encoder_hidden_states, patch = wrapper(
hidden_states=patch,
encoder_hidden_states=encoder_hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
with trace(
name=f"joint patch {patch_idx}",
rank=self.rank,
category="compute",
):
for wrapper in self.joint_block_wrappers:
wrapper.set_patch(BlockWrapperMode.PATCHED, start_token, end_token)
encoder_hidden_states, patch = wrapper(
hidden_states=patch,
encoder_hidden_states=encoder_hidden_states,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
if is_tracing_enabled():
mx.eval(encoder_hidden_states, patch)
if self.owns_concat_stage:
assert encoder_hidden_states is not None
@@ -925,49 +988,70 @@ class DiffusionRunner:
if self.has_single_blocks or self.is_last_stage:
patch = patch_concat
else:
patch_concat = mx.distributed.send(
patch_concat, self.next_rank, group=self.group
)
mx.async_eval(patch_concat)
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
patch_concat = mx.distributed.send(
patch_concat, self.next_rank, group=self.group
)
mx.async_eval(patch_concat)
elif self.has_joint_blocks and not self.is_last_stage:
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
mx.async_eval(patch)
with trace(name=f"send {self.next_rank}", rank=self.rank, category="comms"):
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
mx.async_eval(patch)
if patch_idx == 0:
assert encoder_hidden_states is not None
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
)
mx.async_eval(encoder_hidden_states)
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
encoder_hidden_states = mx.distributed.send(
encoder_hidden_states, self.next_rank, group=self.group
)
mx.async_eval(encoder_hidden_states)
if self.has_single_blocks:
if not self.owns_concat_stage and not self.is_first_stage:
patch_len = patch.shape[1]
patch = mx.distributed.recv(
(batch_size, text_seq_len + patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
group=self.group,
)
mx.eval(patch)
with trace(
name=f"recv {self.prev_rank}", rank=self.rank, category="comms"
):
patch = mx.distributed.recv(
(batch_size, text_seq_len + patch_len, hidden_dim),
patch.dtype,
self.prev_rank,
group=self.group,
)
mx.eval(patch)
assert self.single_block_wrappers is not None
for wrapper in self.single_block_wrappers:
wrapper.set_patch(BlockWrapperMode.PATCHED, start_token, end_token)
patch = wrapper(
hidden_states=patch,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
with trace(
name=f"single patch {patch_idx}",
rank=self.rank,
category="compute",
):
for wrapper in self.single_block_wrappers:
wrapper.set_patch(BlockWrapperMode.PATCHED, start_token, end_token)
patch = wrapper(
hidden_states=patch,
text_embeddings=text_embeddings,
rotary_embeddings=image_rotary_embeddings,
)
if is_tracing_enabled():
mx.eval(patch)
if not self.is_last_stage:
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
mx.async_eval(patch)
with trace(
name=f"send {self.next_rank}", rank=self.rank, category="comms"
):
patch = mx.distributed.send(patch, self.next_rank, group=self.group)
mx.async_eval(patch)
noise: mx.array | None = None
if self.is_last_stage:
patch_img_only = patch[:, text_seq_len:, :]
noise = self.adapter.final_projection(patch_img_only, text_embeddings)
patch = patch[:, text_seq_len:, :]
noise = self.adapter.final_projection(patch, text_embeddings)
return noise, encoder_hidden_states

View File

@@ -18,6 +18,7 @@ from pydantic import ValidationError
from exo.shared.constants import EXO_MAX_CHUNK_SIZE
from exo.shared.models.model_cards import ModelId, ModelTask
from exo.shared.tracing import clear_trace_buffer, get_trace_buffer, is_tracing_enabled
from exo.shared.types.api import ChatCompletionMessageText, ImageGenerationStats
from exo.shared.types.chunks import ErrorChunk, ImageChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId
@@ -27,6 +28,8 @@ from exo.shared.types.events import (
RunnerStatusUpdated,
TaskAcknowledged,
TaskStatusUpdated,
TraceEventData,
TracesCollected,
)
from exo.shared.types.tasks import (
ChatCompletion,
@@ -408,6 +411,10 @@ def main(
)
)
raise
finally:
_send_traces_if_enabled(
event_sender, task.task_id, shard_metadata.device_rank
)
current_status = RunnerReady()
logger.info("runner ready")
@@ -466,6 +473,10 @@ def main(
)
)
raise
finally:
_send_traces_if_enabled(
event_sender, task.task_id, shard_metadata.device_rank
)
current_status = RunnerReady()
logger.info("runner ready")
@@ -640,6 +651,36 @@ def _send_image_chunk(
)
def _send_traces_if_enabled(
event_sender: MpSender[Event],
task_id: TaskId,
rank: int,
) -> None:
if not is_tracing_enabled():
return
traces = get_trace_buffer()
if traces:
trace_data = [
TraceEventData(
name=t.name,
start_us=t.start_us,
duration_us=t.duration_us,
rank=t.rank,
category=t.category,
)
for t in traces
]
event_sender.send(
TracesCollected(
task_id=task_id,
rank=rank,
traces=trace_data,
)
)
clear_trace_buffer()
def _process_image_response(
response: ImageGenerationResponse | PartialImageResponse,
command_id: CommandId,