Mirror of https://github.com/exo-explore/exo.git, synced 2026-01-18 02:50:24 -05:00

Compare commits: 1 commit

| Author | SHA1 | Date |
|---|---|---|
| alexcheema | e3465afae3 | |

Changed files include AGENTS.md (25 lines changed).
@@ -40,31 +40,6 @@ uv run ruff check
nix fmt
```

## Pre-Commit Checks (REQUIRED)

**IMPORTANT: Always run these checks before committing code. CI will fail if these don't pass.**

```bash
# 1. Type checking - MUST pass with 0 errors
uv run basedpyright

# 2. Linting - MUST pass
uv run ruff check

# 3. Formatting - MUST be applied
nix fmt

# 4. Tests - MUST pass
uv run pytest
```

Run all checks in sequence:
```bash
uv run basedpyright && uv run ruff check && nix fmt && uv run pytest
```

If `nix fmt` changes any files, stage them before committing. The CI runs `nix flake check` which verifies formatting, linting, and runs Rust tests.
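
The same sequence can be wired into a local git hook so it runs automatically before every commit. The following is a minimal sketch only; the hook file is hypothetical and not part of the repository:

```bash
#!/usr/bin/env bash
# Hypothetical .git/hooks/pre-commit: runs the checks listed above and
# aborts the commit if any of them fail.
set -euo pipefail

uv run basedpyright   # type checking - must report 0 errors
uv run ruff check     # linting
nix fmt               # formatting
uv run pytest         # tests

# nix fmt may rewrite files; if it did, ask the committer to stage them.
if ! git diff --quiet; then
  echo "nix fmt modified files - stage them and commit again." >&2
  exit 1
fi
```

Make the hook executable with `chmod +x .git/hooks/pre-commit`; CI still runs `nix flake check` regardless.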

## Architecture

### Node Composition

@@ -1,16 +1,14 @@
<script lang="ts">
import {
messages,
currentResponse,
import {
messages,
currentResponse,
isLoading,
deleteMessage,
editAndRegenerate,
regenerateLastResponse,
regenerateFromToken
regenerateLastResponse
} from '$lib/stores/app.svelte';
import type { MessageAttachment } from '$lib/stores/app.svelte';
import MarkdownContent from './MarkdownContent.svelte';
import TokenHeatmap from './TokenHeatmap.svelte';

interface Props {
class?: string;
@@ -97,23 +95,6 @@
let copiedMessageId = $state<string | null>(null);
let expandedThinkingMessageIds = $state<Set<string>>(new Set());

// Uncertainty view state - tracks which messages show token heatmap
let uncertaintyViewMessageIds = $state<Set<string>>(new Set());

function toggleUncertaintyView(messageId: string) {
const newSet = new Set(uncertaintyViewMessageIds);
if (newSet.has(messageId)) {
newSet.delete(messageId);
} else {
newSet.add(messageId);
}
uncertaintyViewMessageIds = newSet;
}

function isUncertaintyViewEnabled(messageId: string): boolean {
return uncertaintyViewMessageIds.has(messageId);
}

function formatTimestamp(timestamp: number): string {
return new Date(timestamp).toLocaleTimeString('en-US', {
hour12: false,
@@ -385,17 +366,7 @@ function isThinkingExpanded(messageId: string): boolean {
</div>
{/if}
<div class="text-xs text-foreground">
{#if message.role === 'assistant' && isUncertaintyViewEnabled(message.id) && message.tokens && message.tokens.length > 0}
<!-- Uncertainty heatmap view -->
<TokenHeatmap
tokens={message.tokens}
isGenerating={loading}
onRegenerateFrom={(tokenIndex) => regenerateFromToken(message.id, tokenIndex)}
/>
{:else}
<!-- Normal markdown view -->
<MarkdownContent content={message.content || (loading ? response : '')} />
{/if}
<MarkdownContent content={message.content || (loading ? response : '')} />
{#if loading && !message.content}
<span class="inline-block w-2 h-4 bg-exo-yellow/70 ml-1 cursor-blink"></span>
{/if}
@@ -448,20 +419,7 @@ function isThinkingExpanded(messageId: string): boolean {
</svg>
</button>
{/if}

<!-- Uncertainty view toggle (assistant messages with tokens only) -->
{#if message.role === 'assistant' && message.tokens && message.tokens.length > 0}
<button
onclick={() => toggleUncertaintyView(message.id)}
class="p-1.5 transition-colors rounded cursor-pointer {isUncertaintyViewEnabled(message.id) ? 'text-exo-yellow' : 'text-exo-light-gray hover:text-exo-yellow'}"
title={isUncertaintyViewEnabled(message.id) ? 'Hide uncertainty' : 'Show uncertainty'}
>
<svg class="w-3.5 h-3.5" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 19v-6a2 2 0 00-2-2H5a2 2 0 00-2 2v6a2 2 0 002 2h2a2 2 0 002-2zm0 0V9a2 2 0 012-2h2a2 2 0 012 2v10m-6 0a2 2 0 002 2h2a2 2 0 002-2m0 0V5a2 2 0 012-2h2a2 2 0 012 2v14a2 2 0 01-2 2h-2a2 2 0 01-2-2z" />
</svg>
</button>
{/if}


<!-- Delete button -->
<button
onclick={() => handleDeleteClick(message.id)}
@@ -1,192 +0,0 @@
|
||||
<script lang="ts">
|
||||
import type { TokenData } from '$lib/stores/app.svelte';
|
||||
|
||||
interface Props {
|
||||
tokens: TokenData[];
|
||||
class?: string;
|
||||
isGenerating?: boolean;
|
||||
onRegenerateFrom?: (tokenIndex: number) => void;
|
||||
}
|
||||
|
||||
let { tokens, class: className = '', isGenerating = false, onRegenerateFrom }: Props = $props();
|
||||
|
||||
// Tooltip state - track both token data and index
|
||||
let hoveredTokenIndex = $state<number | null>(null);
|
||||
let hoveredPosition = $state<{ x: number; y: number } | null>(null);
|
||||
let isTooltipHovered = $state(false);
|
||||
let hideTimeoutId: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
// Derive the hovered token from the index (stable across re-renders)
|
||||
const hoveredToken = $derived(
|
||||
hoveredTokenIndex !== null && hoveredPosition && tokens[hoveredTokenIndex]
|
||||
? { token: tokens[hoveredTokenIndex], index: hoveredTokenIndex, ...hoveredPosition }
|
||||
: null
|
||||
);
|
||||
|
||||
/**
|
||||
* Get confidence styling based on probability.
|
||||
* Following Apple design principles: high confidence tokens blend in,
|
||||
* only uncertainty draws attention.
|
||||
*/
|
||||
function getConfidenceClass(probability: number): string {
|
||||
if (probability > 0.8) return 'text-inherit'; // Expected tokens - blend in
|
||||
if (probability > 0.5) return 'bg-gray-500/10 text-inherit'; // Slight hint
|
||||
if (probability > 0.2) return 'bg-amber-500/15 text-amber-200/90'; // Subtle warmth
|
||||
return 'bg-red-500/20 text-red-200/90'; // Draws attention
|
||||
}
|
||||
|
||||
/**
|
||||
* Get border/underline styling for uncertain tokens
|
||||
*/
|
||||
function getBorderClass(probability: number): string {
|
||||
if (probability > 0.8) return 'border-transparent'; // No border for expected
|
||||
if (probability > 0.5) return 'border-gray-500/20';
|
||||
if (probability > 0.2) return 'border-amber-500/30';
|
||||
return 'border-red-500/40';
|
||||
}
|
||||
|
||||
function clearHideTimeout() {
|
||||
if (hideTimeoutId) {
|
||||
clearTimeout(hideTimeoutId);
|
||||
hideTimeoutId = null;
|
||||
}
|
||||
}
|
||||
|
||||
function handleMouseEnter(event: MouseEvent, token: TokenData, index: number) {
|
||||
clearHideTimeout();
|
||||
const rect = (event.target as HTMLElement).getBoundingClientRect();
|
||||
hoveredTokenIndex = index;
|
||||
hoveredPosition = {
|
||||
x: rect.left + rect.width / 2,
|
||||
y: rect.top - 10
|
||||
};
|
||||
}
|
||||
|
||||
function handleMouseLeave() {
|
||||
clearHideTimeout();
|
||||
// Use longer delay during generation to account for re-renders
|
||||
const delay = isGenerating ? 300 : 100;
|
||||
hideTimeoutId = setTimeout(() => {
|
||||
if (!isTooltipHovered) {
|
||||
hoveredTokenIndex = null;
|
||||
hoveredPosition = null;
|
||||
}
|
||||
}, delay);
|
||||
}
|
||||
|
||||
function handleTooltipEnter() {
|
||||
clearHideTimeout();
|
||||
isTooltipHovered = true;
|
||||
}
|
||||
|
||||
function handleTooltipLeave() {
|
||||
isTooltipHovered = false;
|
||||
hoveredTokenIndex = null;
|
||||
hoveredPosition = null;
|
||||
}
|
||||
|
||||
function handleRegenerate() {
|
||||
if (hoveredToken && onRegenerateFrom) {
|
||||
const indexToRegenerate = hoveredToken.index;
|
||||
// Clear hover state immediately
|
||||
hoveredTokenIndex = null;
|
||||
hoveredPosition = null;
|
||||
isTooltipHovered = false;
|
||||
// Call regenerate
|
||||
onRegenerateFrom(indexToRegenerate);
|
||||
}
|
||||
}
|
||||
|
||||
function formatProbability(prob: number): string {
|
||||
return (prob * 100).toFixed(1) + '%';
|
||||
}
|
||||
|
||||
function formatLogprob(logprob: number): string {
|
||||
return logprob.toFixed(3);
|
||||
}
|
||||
|
||||
function getProbabilityColor(probability: number): string {
|
||||
if (probability > 0.8) return 'text-gray-300';
|
||||
if (probability > 0.5) return 'text-gray-400';
|
||||
if (probability > 0.2) return 'text-amber-400';
|
||||
return 'text-red-400';
|
||||
}
|
||||
</script>
|
||||
|
||||
<div class="token-heatmap leading-relaxed {className}">
|
||||
{#each tokens as tokenData, i (i)}
|
||||
<span
|
||||
role="button"
|
||||
tabindex="0"
|
||||
class="token-span inline rounded px-0.5 py-0.5 cursor-pointer transition-all duration-150 border {getConfidenceClass(tokenData.probability)} {getBorderClass(tokenData.probability)} hover:opacity-80"
|
||||
onmouseenter={(e) => handleMouseEnter(e, tokenData, i)}
|
||||
onmouseleave={handleMouseLeave}
|
||||
>{tokenData.token}</span>
|
||||
{/each}
|
||||
</div>
|
||||
|
||||
<!-- Tooltip -->
|
||||
{#if hoveredToken}
|
||||
<div
|
||||
class="fixed z-50"
|
||||
style="left: {hoveredToken.x}px; top: {hoveredToken.y}px; transform: translate(-50%, -100%);"
|
||||
onmouseenter={handleTooltipEnter}
|
||||
onmouseleave={handleTooltipLeave}
|
||||
>
|
||||
<div class="bg-gray-900/95 backdrop-blur-sm border border-gray-700/50 rounded-xl shadow-xl p-3 text-sm min-w-48">
|
||||
<!-- Token info -->
|
||||
<div class="mb-2">
|
||||
<span class="text-gray-500 text-xs">Token:</span>
|
||||
<span class="text-white font-mono ml-1">"{hoveredToken.token.token}"</span>
|
||||
<span class="{getProbabilityColor(hoveredToken.token.probability)} ml-2">{formatProbability(hoveredToken.token.probability)}</span>
|
||||
</div>
|
||||
|
||||
<div class="text-gray-400 text-xs mb-1">
|
||||
logprob: <span class="text-gray-300 font-mono">{formatLogprob(hoveredToken.token.logprob)}</span>
|
||||
</div>
|
||||
|
||||
<!-- Top alternatives -->
|
||||
{#if hoveredToken.token.topLogprobs.length > 0}
|
||||
<div class="border-t border-gray-700/50 mt-2 pt-2">
|
||||
<div class="text-gray-500 text-xs mb-1">Alternatives:</div>
|
||||
{#each hoveredToken.token.topLogprobs.slice(0, 5) as alt, idx (idx)}
|
||||
{@const altProb = Math.exp(alt.logprob)}
|
||||
<div class="flex justify-between items-center text-xs py-0.5">
|
||||
<span class="text-gray-300 font-mono truncate max-w-24">"{alt.token}"</span>
|
||||
<span class="text-gray-400 ml-2">{formatProbability(altProb)}</span>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
<!-- Regenerate button -->
|
||||
{#if onRegenerateFrom}
|
||||
<button
|
||||
onclick={handleRegenerate}
|
||||
class="w-full mt-2 pt-2 border-t border-gray-700/50 flex items-center justify-center gap-1.5 text-xs text-gray-400 hover:text-white transition-colors cursor-pointer"
|
||||
>
|
||||
<svg class="w-3 h-3" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
|
||||
</svg>
|
||||
Regenerate from here
|
||||
</button>
|
||||
{/if}
|
||||
</div>
|
||||
<!-- Arrow -->
|
||||
<div class="absolute left-1/2 -translate-x-1/2 top-full">
|
||||
<div class="border-8 border-transparent border-t-gray-900"></div>
|
||||
</div>
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
<style>
|
||||
.token-heatmap {
|
||||
word-wrap: break-word;
|
||||
white-space: pre-wrap;
|
||||
}
|
||||
|
||||
.token-span {
|
||||
margin: 0;
|
||||
border-width: 1px;
|
||||
}
|
||||
</style>
|
||||
@@ -71,35 +71,36 @@ export interface Instance {
|
||||
};
|
||||
}
|
||||
|
||||
interface RawNodeProfile {
|
||||
modelId?: string;
|
||||
chipId?: string;
|
||||
friendlyName?: string;
|
||||
networkInterfaces?: Array<{
|
||||
name?: string;
|
||||
ipAddress?: string;
|
||||
addresses?: Array<{ address?: string } | string>;
|
||||
ipv4?: string;
|
||||
ipv6?: string;
|
||||
ipAddresses?: string[];
|
||||
ips?: string[];
|
||||
}>;
|
||||
memory?: {
|
||||
ramTotal?: { inBytes: number };
|
||||
ramAvailable?: { inBytes: number };
|
||||
swapTotal?: { inBytes: number };
|
||||
swapAvailable?: { inBytes: number };
|
||||
};
|
||||
system?: {
|
||||
gpuUsage?: number;
|
||||
temp?: number;
|
||||
sysPower?: number;
|
||||
};
|
||||
// Split state interfaces
|
||||
interface RawNodeIdentity {
|
||||
modelId: string;
|
||||
chipId: string;
|
||||
friendlyName: string;
|
||||
}
|
||||
|
||||
interface RawNodeMemory {
|
||||
ramTotal: { inBytes: number };
|
||||
ramAvailable: { inBytes: number };
|
||||
swapTotal: { inBytes: number };
|
||||
swapAvailable: { inBytes: number };
|
||||
}
|
||||
|
||||
interface RawNodeSystem {
|
||||
gpuUsage?: number;
|
||||
temp?: number;
|
||||
sysPower?: number;
|
||||
pcpuUsage?: number;
|
||||
ecpuUsage?: number;
|
||||
anePower?: number;
|
||||
}
|
||||
|
||||
interface RawNetworkInterface {
|
||||
name: string;
|
||||
ipAddress: string;
|
||||
}
|
||||
|
||||
interface RawTopologyNode {
|
||||
nodeId: string;
|
||||
nodeProfile: RawNodeProfile;
|
||||
}
|
||||
|
||||
interface RawTopologyConnection {
|
||||
@@ -115,8 +116,6 @@ interface RawTopology {
|
||||
connections?: RawTopologyConnection[];
|
||||
}
|
||||
|
||||
type RawNodeProfiles = Record<string, RawNodeProfile>;
|
||||
|
||||
export interface DownloadProgress {
|
||||
totalBytes: number;
|
||||
downloadedBytes: number;
|
||||
@@ -171,7 +170,11 @@ interface RawStateResponse {
|
||||
>;
|
||||
runners?: Record<string, unknown>;
|
||||
downloads?: Record<string, unknown[]>;
|
||||
nodeProfiles?: RawNodeProfiles;
|
||||
// Split state fields
|
||||
nodeIdentities?: Record<string, RawNodeIdentity>;
|
||||
nodeMemories?: Record<string, RawNodeMemory>;
|
||||
nodeSystems?: Record<string, RawNodeSystem>;
|
||||
nodeNetworks?: Record<string, RawNetworkInterface[]>;
|
||||
}
|
||||
|
||||
export interface MessageAttachment {
|
||||
@@ -182,20 +185,6 @@ export interface MessageAttachment {
|
||||
mimeType?: string;
|
||||
}
|
||||
|
||||
// Token-level data for uncertainty visualization
|
||||
export interface TopLogprob {
|
||||
token: string;
|
||||
logprob: number;
|
||||
bytes?: number[];
|
||||
}
|
||||
|
||||
export interface TokenData {
|
||||
token: string;
|
||||
logprob: number;
|
||||
probability: number; // exp(logprob)
|
||||
topLogprobs: TopLogprob[];
|
||||
}
|
||||
|
||||
export interface Message {
|
||||
id: string;
|
||||
role: "user" | "assistant" | "system";
|
||||
@@ -205,7 +194,6 @@ export interface Message {
|
||||
attachments?: MessageAttachment[];
|
||||
ttftMs?: number; // Time to first token in ms (for assistant messages)
|
||||
tps?: number; // Tokens per second (for assistant messages)
|
||||
tokens?: TokenData[]; // Token-level data for uncertainty visualization
|
||||
}
|
||||
|
||||
export interface Conversation {
|
||||
@@ -223,66 +211,41 @@ const STORAGE_KEY = "exo-conversations";
|
||||
|
||||
function transformTopology(
|
||||
raw: RawTopology,
|
||||
profiles?: RawNodeProfiles,
|
||||
identities?: Record<string, RawNodeIdentity>,
|
||||
memories?: Record<string, RawNodeMemory>,
|
||||
systems?: Record<string, RawNodeSystem>,
|
||||
networks?: Record<string, RawNetworkInterface[]>,
|
||||
): TopologyData {
|
||||
const nodes: Record<string, NodeInfo> = {};
|
||||
const edges: TopologyEdge[] = [];
|
||||
|
||||
for (const node of raw.nodes || []) {
|
||||
const mergedProfile = profiles?.[node.nodeId];
|
||||
const profile = { ...(node.nodeProfile ?? {}), ...(mergedProfile ?? {}) };
|
||||
const ramTotal = profile?.memory?.ramTotal?.inBytes ?? 0;
|
||||
const ramAvailable = profile?.memory?.ramAvailable?.inBytes ?? 0;
|
||||
// Get split state fields (may be undefined if events haven't arrived yet)
|
||||
const identity = identities?.[node.nodeId];
|
||||
const memory = memories?.[node.nodeId];
|
||||
const system = systems?.[node.nodeId];
|
||||
const network = networks?.[node.nodeId];
|
||||
|
||||
const ramTotal = memory?.ramTotal?.inBytes ?? 0;
|
||||
const ramAvailable = memory?.ramAvailable?.inBytes ?? 0;
|
||||
const ramUsage = Math.max(ramTotal - ramAvailable, 0);
|
||||
|
||||
const networkInterfaces = (profile?.networkInterfaces || []).map(
|
||||
(iface) => {
|
||||
const addresses: string[] = [];
|
||||
if (iface.ipAddress && typeof iface.ipAddress === "string") {
|
||||
addresses.push(iface.ipAddress);
|
||||
}
|
||||
if (Array.isArray(iface.addresses)) {
|
||||
for (const addr of iface.addresses) {
|
||||
if (typeof addr === "string") addresses.push(addr);
|
||||
else if (addr && typeof addr === "object" && addr.address)
|
||||
addresses.push(addr.address);
|
||||
}
|
||||
}
|
||||
if (Array.isArray(iface.ipAddresses)) {
|
||||
addresses.push(
|
||||
...iface.ipAddresses.filter(
|
||||
(a): a is string => typeof a === "string",
|
||||
),
|
||||
);
|
||||
}
|
||||
if (Array.isArray(iface.ips)) {
|
||||
addresses.push(
|
||||
...iface.ips.filter((a): a is string => typeof a === "string"),
|
||||
);
|
||||
}
|
||||
if (iface.ipv4 && typeof iface.ipv4 === "string")
|
||||
addresses.push(iface.ipv4);
|
||||
if (iface.ipv6 && typeof iface.ipv6 === "string")
|
||||
addresses.push(iface.ipv6);
|
||||
|
||||
return {
|
||||
name: iface.name,
|
||||
addresses: Array.from(new Set(addresses)),
|
||||
};
|
||||
},
|
||||
);
|
||||
const networkInterfaces = (network ?? []).map((iface) => ({
|
||||
name: iface.name,
|
||||
addresses: [iface.ipAddress],
|
||||
}));
|
||||
|
||||
const ipToInterface: Record<string, string> = {};
|
||||
for (const iface of networkInterfaces) {
|
||||
for (const addr of iface.addresses || []) {
|
||||
ipToInterface[addr] = iface.name ?? "";
|
||||
for (const addr of iface.addresses) {
|
||||
ipToInterface[addr] = iface.name;
|
||||
}
|
||||
}
|
||||
|
||||
nodes[node.nodeId] = {
|
||||
system_info: {
|
||||
model_id: profile?.modelId ?? "Unknown",
|
||||
chip: profile?.chipId,
|
||||
model_id: identity?.modelId ?? "Unknown",
|
||||
chip: identity?.chipId,
|
||||
memory: ramTotal,
|
||||
},
|
||||
network_interfaces: networkInterfaces,
|
||||
@@ -293,17 +256,15 @@ function transformTopology(
|
||||
ram_total: ramTotal,
|
||||
},
|
||||
temp:
|
||||
profile?.system?.temp !== undefined
|
||||
? { gpu_temp_avg: profile.system.temp }
|
||||
system?.temp !== undefined
|
||||
? { gpu_temp_avg: system.temp }
|
||||
: undefined,
|
||||
gpu_usage:
|
||||
profile?.system?.gpuUsage !== undefined
|
||||
? [0, profile.system.gpuUsage]
|
||||
: undefined,
|
||||
sys_power: profile?.system?.sysPower,
|
||||
system?.gpuUsage !== undefined ? [0, system.gpuUsage] : undefined,
|
||||
sys_power: system?.sysPower,
|
||||
},
|
||||
last_macmon_update: Date.now() / 1000,
|
||||
friendly_name: profile?.friendlyName,
|
||||
friendly_name: identity?.friendlyName,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -383,21 +344,6 @@ class AppStore {
|
||||
private fetchInterval: ReturnType<typeof setInterval> | null = null;
|
||||
private previewsInterval: ReturnType<typeof setInterval> | null = null;
|
||||
private lastConversationPersistTs = 0;
|
||||
private currentRequestController: AbortController | null = null;
|
||||
|
||||
/**
|
||||
* Abort any in-flight generation request
|
||||
*/
|
||||
abortCurrentRequest(): boolean {
|
||||
if (this.currentRequestController) {
|
||||
this.currentRequestController.abort();
|
||||
this.currentRequestController = null;
|
||||
this.isLoading = false;
|
||||
this.currentResponse = "";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
if (browser) {
|
||||
@@ -898,7 +844,13 @@ class AppStore {
|
||||
const data: RawStateResponse = await response.json();
|
||||
|
||||
if (data.topology) {
|
||||
this.topologyData = transformTopology(data.topology, data.nodeProfiles);
|
||||
this.topologyData = transformTopology(
|
||||
data.topology,
|
||||
data.nodeIdentities,
|
||||
data.nodeMemories,
|
||||
data.nodeSystems,
|
||||
data.nodeNetworks,
|
||||
);
|
||||
}
|
||||
if (data.instances) {
|
||||
this.instances = data.instances;
|
||||
@@ -1428,10 +1380,6 @@ class AppStore {
|
||||
let firstTokenTime: number | null = null;
|
||||
let tokenCount = 0;
|
||||
|
||||
// Create abort controller for this request
|
||||
const controller = new AbortController();
|
||||
this.currentRequestController = controller;
|
||||
|
||||
const response = await fetch("/v1/chat/completions", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
@@ -1442,10 +1390,7 @@ class AppStore {
|
||||
messages: apiMessages,
|
||||
temperature: 0.7,
|
||||
stream: true,
|
||||
logprobs: true,
|
||||
top_logprobs: 5,
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
@@ -1461,7 +1406,6 @@ class AppStore {
|
||||
const decoder = new TextDecoder();
|
||||
let fullContent = "";
|
||||
let buffer = "";
|
||||
const collectedTokens: TokenData[] = [];
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
@@ -1483,8 +1427,8 @@ class AppStore {
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(data);
|
||||
const delta = parsed.choices?.[0]?.delta?.content;
|
||||
if (delta) {
|
||||
const tokenContent = parsed.choices?.[0]?.delta?.content;
|
||||
if (tokenContent) {
|
||||
// Track first token for TTFT
|
||||
if (firstTokenTime === null) {
|
||||
firstTokenTime = performance.now();
|
||||
@@ -1501,30 +1445,7 @@ class AppStore {
|
||||
this.tps = (tokenCount / elapsed) * 1000;
|
||||
}
|
||||
|
||||
// Extract logprobs for uncertainty visualization
|
||||
const logprobsData = parsed.choices?.[0]?.logprobs;
|
||||
if (logprobsData?.content?.[0]) {
|
||||
const logprobItem = logprobsData.content[0];
|
||||
const tokenData: TokenData = {
|
||||
token: logprobItem.token || delta,
|
||||
logprob: logprobItem.logprob ?? 0,
|
||||
probability: Math.exp(logprobItem.logprob ?? 0),
|
||||
topLogprobs: (logprobItem.top_logprobs || []).map(
|
||||
(item: {
|
||||
token: string;
|
||||
logprob: number;
|
||||
bytes?: number[];
|
||||
}) => ({
|
||||
token: item.token,
|
||||
logprob: item.logprob,
|
||||
bytes: item.bytes,
|
||||
}),
|
||||
),
|
||||
};
|
||||
collectedTokens.push(tokenData);
|
||||
}
|
||||
|
||||
fullContent += delta;
|
||||
fullContent += tokenContent;
|
||||
|
||||
// Strip thinking tags for display and extract thinking content
|
||||
const { displayContent, thinkingContent } =
|
||||
@@ -1538,7 +1459,6 @@ class AppStore {
|
||||
if (idx !== -1) {
|
||||
this.messages[idx].content = displayContent;
|
||||
this.messages[idx].thinking = thinkingContent || undefined;
|
||||
this.messages[idx].tokens = [...collectedTokens];
|
||||
}
|
||||
this.persistActiveConversation();
|
||||
}
|
||||
@@ -1586,16 +1506,9 @@ class AppStore {
|
||||
if (this.tps !== null) {
|
||||
this.messages[idx].tps = this.tps;
|
||||
}
|
||||
if (collectedTokens.length > 0) {
|
||||
this.messages[idx].tokens = collectedTokens;
|
||||
}
|
||||
}
|
||||
this.persistActiveConversation();
|
||||
} catch (error) {
|
||||
// Don't show error for aborted requests (user cancelled)
|
||||
if (error instanceof Error && error.name === "AbortError") {
|
||||
return;
|
||||
}
|
||||
console.error("Error sending message:", error);
|
||||
// Update the assistant message with error
|
||||
const idx = this.messages.findIndex((m) => m.id === assistantMessage.id);
|
||||
@@ -1605,237 +1518,6 @@ class AppStore {
|
||||
}
|
||||
this.persistActiveConversation();
|
||||
} finally {
|
||||
// Clean up controller if this is still the active request
|
||||
if (this.currentRequestController === controller) {
|
||||
this.currentRequestController = null;
|
||||
}
|
||||
this.isLoading = false;
|
||||
this.currentResponse = "";
|
||||
this.updateActiveConversation();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Regenerate from a specific token in an assistant message.
|
||||
* Keeps content up to and including the specified token, then continues generation.
|
||||
* If a generation is already in progress, it will be aborted first.
|
||||
*/
|
||||
async regenerateFromToken(
|
||||
messageId: string,
|
||||
tokenIndex: number,
|
||||
): Promise<void> {
|
||||
// Abort any in-flight request first
|
||||
this.abortCurrentRequest();
|
||||
|
||||
const messageIdx = this.messages.findIndex((m) => m.id === messageId);
|
||||
if (messageIdx === -1) return;
|
||||
|
||||
const message = this.messages[messageIdx];
|
||||
if (message.role !== "assistant" || !message.tokens) return;
|
||||
|
||||
// Get tokens up to and including the specified index
|
||||
const tokensToKeep = message.tokens.slice(0, tokenIndex + 1);
|
||||
const prefixText = tokensToKeep.map((t) => t.token).join("");
|
||||
|
||||
// Remove all messages after this assistant message
|
||||
this.messages = this.messages.slice(0, messageIdx + 1);
|
||||
|
||||
// Update the message to show the prefix
|
||||
this.messages[messageIdx].content = prefixText;
|
||||
this.messages[messageIdx].tokens = tokensToKeep;
|
||||
|
||||
// Set up for continuation
|
||||
this.isLoading = true;
|
||||
this.currentResponse = prefixText;
|
||||
this.ttftMs = null;
|
||||
this.tps = null;
|
||||
this.totalTokens = tokensToKeep.length;
|
||||
|
||||
try {
|
||||
// Build messages for API - include the partial assistant message
|
||||
const systemPrompt = {
|
||||
role: "system" as const,
|
||||
content:
|
||||
"You are a helpful AI assistant. Respond directly and concisely. Do not show your reasoning or thought process.",
|
||||
};
|
||||
|
||||
// Get all messages up to and including the one we're regenerating from
|
||||
const apiMessages = [
|
||||
systemPrompt,
|
||||
...this.messages.map((m) => {
|
||||
let msgContent = m.content;
|
||||
if (m.attachments) {
|
||||
for (const attachment of m.attachments) {
|
||||
if (attachment.type === "text" && attachment.content) {
|
||||
msgContent += `\n\n[File: ${attachment.name}]\n\`\`\`\n${attachment.content}\n\`\`\``;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { role: m.role, content: msgContent };
|
||||
}),
|
||||
];
|
||||
|
||||
// Determine model
|
||||
let modelToUse = this.selectedChatModel;
|
||||
if (!modelToUse) {
|
||||
for (const [, instanceWrapper] of Object.entries(this.instances)) {
|
||||
if (instanceWrapper && typeof instanceWrapper === "object") {
|
||||
const keys = Object.keys(
|
||||
instanceWrapper as Record<string, unknown>,
|
||||
);
|
||||
if (keys.length === 1) {
|
||||
const instance = (instanceWrapper as Record<string, unknown>)[
|
||||
keys[0]
|
||||
] as { shardAssignments?: { modelId?: string } };
|
||||
if (instance?.shardAssignments?.modelId) {
|
||||
modelToUse = instance.shardAssignments.modelId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!modelToUse) {
|
||||
throw new Error("No model available");
|
||||
}
|
||||
|
||||
// Start timing
|
||||
const requestStartTime = performance.now();
|
||||
let firstTokenTime: number | null = null;
|
||||
let tokenCount = tokensToKeep.length;
|
||||
|
||||
// Create abort controller
|
||||
const controller = new AbortController();
|
||||
this.currentRequestController = controller;
|
||||
|
||||
const response = await fetch("/v1/chat/completions", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
model: modelToUse,
|
||||
messages: apiMessages,
|
||||
stream: true,
|
||||
logprobs: true,
|
||||
top_logprobs: 5,
|
||||
continue_from_prefix: true,
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text();
|
||||
throw new Error(`API error: ${response.status} - ${errorText}`);
|
||||
}
|
||||
|
||||
const reader = response.body?.getReader();
|
||||
if (!reader) throw new Error("No response body");
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
let fullContent = prefixText;
|
||||
let buffer = "";
|
||||
const collectedTokens: TokenData[] = [...tokensToKeep];
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed || trimmed === "data: [DONE]") continue;
|
||||
|
||||
if (trimmed.startsWith("data: ")) {
|
||||
try {
|
||||
const json = JSON.parse(trimmed.slice(6));
|
||||
const delta = json.choices?.[0]?.delta?.content;
|
||||
if (delta) {
|
||||
if (firstTokenTime === null) {
|
||||
firstTokenTime = performance.now();
|
||||
this.ttftMs = firstTokenTime - requestStartTime;
|
||||
}
|
||||
|
||||
tokenCount += 1;
|
||||
this.totalTokens = tokenCount;
|
||||
|
||||
if (
|
||||
firstTokenTime !== null &&
|
||||
tokenCount > tokensToKeep.length
|
||||
) {
|
||||
const elapsed = performance.now() - firstTokenTime;
|
||||
this.tps =
|
||||
((tokenCount - tokensToKeep.length) / elapsed) * 1000;
|
||||
}
|
||||
|
||||
// Extract logprobs
|
||||
const logprobsData = json.choices?.[0]?.logprobs;
|
||||
if (logprobsData?.content?.[0]) {
|
||||
const logprobItem = logprobsData.content[0];
|
||||
collectedTokens.push({
|
||||
token: logprobItem.token || delta,
|
||||
logprob: logprobItem.logprob ?? 0,
|
||||
probability: Math.exp(logprobItem.logprob ?? 0),
|
||||
topLogprobs: (logprobItem.top_logprobs || []).map(
|
||||
(item: {
|
||||
token: string;
|
||||
logprob: number;
|
||||
bytes?: number[];
|
||||
}) => ({
|
||||
token: item.token,
|
||||
logprob: item.logprob,
|
||||
bytes: item.bytes,
|
||||
}),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
fullContent += delta;
|
||||
const { displayContent, thinkingContent } =
|
||||
this.stripThinkingTags(fullContent);
|
||||
this.currentResponse = displayContent;
|
||||
|
||||
this.messages[messageIdx].content = displayContent;
|
||||
this.messages[messageIdx].thinking =
|
||||
thinkingContent || undefined;
|
||||
this.messages[messageIdx].tokens = [...collectedTokens];
|
||||
this.persistActiveConversation();
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed JSON
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Final update
|
||||
const { displayContent, thinkingContent } =
|
||||
this.stripThinkingTags(fullContent);
|
||||
this.messages[messageIdx].content = displayContent;
|
||||
this.messages[messageIdx].thinking = thinkingContent || undefined;
|
||||
this.messages[messageIdx].tokens = collectedTokens;
|
||||
|
||||
if (this.ttftMs !== null) {
|
||||
this.messages[messageIdx].ttftMs = this.ttftMs;
|
||||
}
|
||||
if (this.tps !== null) {
|
||||
this.messages[messageIdx].tps = this.tps;
|
||||
}
|
||||
this.persistActiveConversation();
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.name === "AbortError") {
|
||||
return;
|
||||
}
|
||||
console.error("Error regenerating from token:", error);
|
||||
this.messages[messageIdx].content =
|
||||
`${prefixText}\n\nError: ${error instanceof Error ? error.message : "Unknown error"}`;
|
||||
this.persistActiveConversation();
|
||||
} finally {
|
||||
if (this.currentRequestController === controller) {
|
||||
this.currentRequestController = null;
|
||||
}
|
||||
this.isLoading = false;
|
||||
this.currentResponse = "";
|
||||
this.updateActiveConversation();
|
||||
@@ -1915,8 +1597,6 @@ export const editMessage = (messageId: string, newContent: string) =>
|
||||
export const editAndRegenerate = (messageId: string, newContent: string) =>
|
||||
appStore.editAndRegenerate(messageId, newContent);
|
||||
export const regenerateLastResponse = () => appStore.regenerateLastResponse();
|
||||
export const regenerateFromToken = (messageId: string, tokenIndex: number) =>
|
||||
appStore.regenerateFromToken(messageId, tokenIndex);
|
||||
|
||||
// Conversation actions
|
||||
export const conversations = () => appStore.conversations;
|
||||
|
||||
@@ -1 +0,0 @@
"""API adapters for different API formats (Claude, OpenAI Responses, etc.)."""
@@ -1,175 +0,0 @@
"""OpenAI Chat Completions API adapter for converting requests/responses."""
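
This adapter backs the `/v1/chat/completions` route registered in `api.py`, so the external shape it accepts is easiest to see as an HTTP request. A rough sketch; the listen address is a placeholder and `<model-id>` stands for whatever model instance is actually running:

```bash
# EXO_API is a hypothetical placeholder for the address of a running exo node.
EXO_API="http://localhost:8000"

# Streaming request with token logprobs; the reply is a stream of SSE
# "data: {...}" lines ending in "data: [DONE]" (see generate_chat_stream below).
curl -N "$EXO_API/v1/chat/completions" \
  -H "Content-Type: application/json" \
  -d '{
        "model": "<model-id>",
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": true,
        "logprobs": true,
        "top_logprobs": 5
      }'
```
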
import time
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from exo.shared.types.api import (
|
||||
ChatCompletionChoice,
|
||||
ChatCompletionMessage,
|
||||
ChatCompletionMessageText,
|
||||
ChatCompletionResponse,
|
||||
ChatCompletionTaskParams,
|
||||
ErrorInfo,
|
||||
ErrorResponse,
|
||||
FinishReason,
|
||||
Logprobs,
|
||||
LogprobsContentItem,
|
||||
StreamingChoiceResponse,
|
||||
)
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.common import CommandId
|
||||
from exo.shared.types.openai_responses import ResponseInputMessage, ResponsesRequest
|
||||
|
||||
|
||||
def chat_request_to_internal(request: ChatCompletionTaskParams) -> ResponsesRequest:
|
||||
"""Convert Chat Completions API request to ResponsesRequest (canonical internal format).
|
||||
|
||||
Extracts system message as instructions, converts messages to input.
|
||||
"""
|
||||
instructions: str | None = None
|
||||
input_messages: list[ResponseInputMessage] = []
|
||||
|
||||
for msg in request.messages:
|
||||
# Normalize content to string
|
||||
content: str
|
||||
if msg.content is None:
|
||||
content = ""
|
||||
elif isinstance(msg.content, str):
|
||||
content = msg.content
|
||||
elif isinstance(msg.content, ChatCompletionMessageText):
|
||||
content = msg.content.text
|
||||
else:
|
||||
# List of ChatCompletionMessageText
|
||||
content = "\n".join(item.text for item in msg.content)
|
||||
|
||||
# Extract system message as instructions
|
||||
if msg.role == "system":
|
||||
if instructions is None:
|
||||
instructions = content
|
||||
else:
|
||||
# Append additional system messages
|
||||
instructions = f"{instructions}\n{content}"
|
||||
else:
|
||||
# Convert to ResponseInputMessage (only user, assistant, developer roles)
|
||||
if msg.role in ("user", "assistant", "developer"):
|
||||
input_messages.append(
|
||||
ResponseInputMessage(role=msg.role, content=content)
|
||||
)
|
||||
|
||||
return ResponsesRequest(
|
||||
model=request.model,
|
||||
input=input_messages if input_messages else "",
|
||||
instructions=instructions,
|
||||
max_output_tokens=request.max_tokens,
|
||||
temperature=request.temperature,
|
||||
top_p=request.top_p,
|
||||
top_k=request.top_k,
|
||||
stop=request.stop,
|
||||
seed=request.seed,
|
||||
stream=request.stream,
|
||||
tools=request.tools,
|
||||
continue_from_prefix=request.continue_from_prefix,
|
||||
)
|
||||
|
||||
|
||||
def chunk_to_response(
|
||||
chunk: TokenChunk, command_id: CommandId
|
||||
) -> ChatCompletionResponse:
|
||||
"""Convert a TokenChunk to a streaming ChatCompletionResponse."""
|
||||
# Build logprobs if available
|
||||
logprobs: Logprobs | None = None
|
||||
if chunk.logprob is not None:
|
||||
logprobs = Logprobs(
|
||||
content=[
|
||||
LogprobsContentItem(
|
||||
token=chunk.text,
|
||||
logprob=chunk.logprob,
|
||||
top_logprobs=chunk.top_logprobs or [],
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
return ChatCompletionResponse(
|
||||
id=command_id,
|
||||
created=int(time.time()),
|
||||
model=chunk.model,
|
||||
choices=[
|
||||
StreamingChoiceResponse(
|
||||
index=0,
|
||||
delta=ChatCompletionMessage(role="assistant", content=chunk.text),
|
||||
logprobs=logprobs,
|
||||
finish_reason=chunk.finish_reason,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
async def generate_chat_stream(
|
||||
command_id: CommandId,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Generate Chat Completions API streaming events from TokenChunks."""
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.finish_reason == "error":
|
||||
error_response = ErrorResponse(
|
||||
error=ErrorInfo(
|
||||
message=chunk.error_message or "Internal server error",
|
||||
type="InternalServerError",
|
||||
code=500,
|
||||
)
|
||||
)
|
||||
yield f"data: {error_response.model_dump_json()}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
chunk_response = chunk_to_response(chunk, command_id)
|
||||
yield f"data: {chunk_response.model_dump_json()}\n\n"
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
|
||||
async def collect_chat_response(
|
||||
command_id: CommandId,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> ChatCompletionResponse:
|
||||
"""Collect all token chunks and return a single ChatCompletionResponse."""
|
||||
text_parts: list[str] = []
|
||||
model: str | None = None
|
||||
finish_reason: FinishReason | None = None
|
||||
error_message: str | None = None
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.finish_reason == "error":
|
||||
error_message = chunk.error_message or "Internal server error"
|
||||
break
|
||||
|
||||
if model is None:
|
||||
model = chunk.model
|
||||
|
||||
text_parts.append(chunk.text)
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
finish_reason = chunk.finish_reason
|
||||
|
||||
if error_message is not None:
|
||||
raise ValueError(error_message)
|
||||
|
||||
combined_text = "".join(text_parts)
|
||||
assert model is not None
|
||||
|
||||
return ChatCompletionResponse(
|
||||
id=command_id,
|
||||
created=int(time.time()),
|
||||
model=model,
|
||||
choices=[
|
||||
ChatCompletionChoice(
|
||||
index=0,
|
||||
message=ChatCompletionMessage(
|
||||
role="assistant",
|
||||
content=combined_text,
|
||||
),
|
||||
finish_reason=finish_reason,
|
||||
)
|
||||
],
|
||||
)
|
||||
@@ -1,190 +0,0 @@
"""Claude Messages API adapter for converting requests/responses."""
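
This adapter serves the `/v1/messages` route in the Claude Messages format. A minimal sketch of such a request; the listen address and model id are placeholders, and the exact accepted fields are defined by `ClaudeMessagesRequest`:

```bash
# EXO_API is a hypothetical placeholder for the address of a running exo node.
EXO_API="http://localhost:8000"

curl "$EXO_API/v1/messages" \
  -H "Content-Type: application/json" \
  -d '{
        "model": "<model-id>",
        "max_tokens": 256,
        "system": "You are a helpful assistant.",
        "messages": [{"role": "user", "content": "Hello"}]
      }'
```
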
from collections.abc import AsyncGenerator
|
||||
|
||||
from exo.shared.types.api import FinishReason
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.claude_api import (
|
||||
ClaudeContentBlockDeltaEvent,
|
||||
ClaudeContentBlockStartEvent,
|
||||
ClaudeContentBlockStopEvent,
|
||||
ClaudeMessageDelta,
|
||||
ClaudeMessageDeltaEvent,
|
||||
ClaudeMessageDeltaUsage,
|
||||
ClaudeMessagesRequest,
|
||||
ClaudeMessagesResponse,
|
||||
ClaudeMessageStart,
|
||||
ClaudeMessageStartEvent,
|
||||
ClaudeMessageStopEvent,
|
||||
ClaudeStopReason,
|
||||
ClaudeTextBlock,
|
||||
ClaudeTextDelta,
|
||||
ClaudeUsage,
|
||||
)
|
||||
from exo.shared.types.common import CommandId
|
||||
from exo.shared.types.openai_responses import ResponseInputMessage, ResponsesRequest
|
||||
|
||||
|
||||
def finish_reason_to_claude_stop_reason(
|
||||
finish_reason: FinishReason | None,
|
||||
) -> ClaudeStopReason | None:
|
||||
"""Map OpenAI finish_reason to Claude stop_reason."""
|
||||
if finish_reason is None:
|
||||
return None
|
||||
mapping: dict[FinishReason, ClaudeStopReason] = {
|
||||
"stop": "end_turn",
|
||||
"length": "max_tokens",
|
||||
"tool_calls": "tool_use",
|
||||
"content_filter": "end_turn",
|
||||
"function_call": "tool_use",
|
||||
}
|
||||
return mapping.get(finish_reason, "end_turn")
|
||||
|
||||
|
||||
def claude_request_to_internal(request: ClaudeMessagesRequest) -> ResponsesRequest:
|
||||
"""Convert Claude Messages API request to ResponsesRequest (canonical internal format).
|
||||
|
||||
Converts Claude's system parameter to instructions,
|
||||
and messages to input.
|
||||
"""
|
||||
# Handle system message
|
||||
instructions: str | None = None
|
||||
if request.system:
|
||||
if isinstance(request.system, str):
|
||||
instructions = request.system
|
||||
else:
|
||||
# List of text blocks
|
||||
instructions = "".join(block.text for block in request.system)
|
||||
|
||||
# Convert messages to input
|
||||
input_messages: list[ResponseInputMessage] = []
|
||||
for msg in request.messages:
|
||||
content: str
|
||||
if isinstance(msg.content, str):
|
||||
content = msg.content
|
||||
else:
|
||||
# Concatenate text blocks (images not supported for MVP)
|
||||
text_parts: list[str] = []
|
||||
for block in msg.content:
|
||||
if isinstance(block, ClaudeTextBlock):
|
||||
text_parts.append(block.text)
|
||||
content = "".join(text_parts)
|
||||
|
||||
# Claude uses "user" and "assistant" roles
|
||||
input_messages.append(ResponseInputMessage(role=msg.role, content=content))
|
||||
|
||||
return ResponsesRequest(
|
||||
model=request.model,
|
||||
input=input_messages if input_messages else "",
|
||||
instructions=instructions,
|
||||
max_output_tokens=request.max_tokens,
|
||||
temperature=request.temperature,
|
||||
top_p=request.top_p,
|
||||
top_k=request.top_k,
|
||||
stop=request.stop_sequences,
|
||||
stream=request.stream,
|
||||
)
|
||||
|
||||
|
||||
async def collect_claude_response(
|
||||
command_id: CommandId,
|
||||
model: str,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> ClaudeMessagesResponse:
|
||||
"""Collect all token chunks and return a single ClaudeMessagesResponse."""
|
||||
text_parts: list[str] = []
|
||||
stop_reason: ClaudeStopReason | None = None
|
||||
last_stats = None
|
||||
error_message: str | None = None
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.finish_reason == "error":
|
||||
error_message = chunk.error_message or "Internal server error"
|
||||
break
|
||||
|
||||
text_parts.append(chunk.text)
|
||||
last_stats = chunk.stats or last_stats
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
stop_reason = finish_reason_to_claude_stop_reason(chunk.finish_reason)
|
||||
|
||||
if error_message is not None:
|
||||
raise ValueError(error_message)
|
||||
|
||||
combined_text = "".join(text_parts)
|
||||
|
||||
# Use actual usage data from stats if available
|
||||
input_tokens = last_stats.prompt_tokens if last_stats else 0
|
||||
output_tokens = last_stats.generation_tokens if last_stats else 0
|
||||
|
||||
return ClaudeMessagesResponse(
|
||||
id=f"msg_{command_id}",
|
||||
model=model,
|
||||
content=[ClaudeTextBlock(text=combined_text)],
|
||||
stop_reason=stop_reason,
|
||||
usage=ClaudeUsage(
|
||||
input_tokens=input_tokens,
|
||||
output_tokens=output_tokens,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def generate_claude_stream(
|
||||
command_id: CommandId,
|
||||
model: str,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Generate Claude Messages API streaming events from TokenChunks."""
|
||||
# Initial message_start event
|
||||
initial_message = ClaudeMessageStart(
|
||||
id=f"msg_{command_id}",
|
||||
model=model,
|
||||
content=[],
|
||||
stop_reason=None,
|
||||
usage=ClaudeUsage(input_tokens=0, output_tokens=0),
|
||||
)
|
||||
start_event = ClaudeMessageStartEvent(message=initial_message)
|
||||
yield f"event: message_start\ndata: {start_event.model_dump_json()}\n\n"
|
||||
|
||||
# content_block_start
|
||||
block_start = ClaudeContentBlockStartEvent(
|
||||
index=0, content_block=ClaudeTextBlock(text="")
|
||||
)
|
||||
yield f"event: content_block_start\ndata: {block_start.model_dump_json()}\n\n"
|
||||
|
||||
output_tokens = 0
|
||||
stop_reason: ClaudeStopReason | None = None
|
||||
last_stats = None
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
output_tokens += 1 # Count each chunk as one token
|
||||
last_stats = chunk.stats or last_stats
|
||||
|
||||
# content_block_delta
|
||||
delta_event = ClaudeContentBlockDeltaEvent(
|
||||
index=0,
|
||||
delta=ClaudeTextDelta(text=chunk.text),
|
||||
)
|
||||
yield f"event: content_block_delta\ndata: {delta_event.model_dump_json()}\n\n"
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
stop_reason = finish_reason_to_claude_stop_reason(chunk.finish_reason)
|
||||
|
||||
# Use actual token count from stats if available
|
||||
if last_stats is not None:
|
||||
output_tokens = last_stats.generation_tokens
|
||||
|
||||
# content_block_stop
|
||||
block_stop = ClaudeContentBlockStopEvent(index=0)
|
||||
yield f"event: content_block_stop\ndata: {block_stop.model_dump_json()}\n\n"
|
||||
|
||||
# message_delta
|
||||
message_delta = ClaudeMessageDeltaEvent(
|
||||
delta=ClaudeMessageDelta(stop_reason=stop_reason),
|
||||
usage=ClaudeMessageDeltaUsage(output_tokens=output_tokens),
|
||||
)
|
||||
yield f"event: message_delta\ndata: {message_delta.model_dump_json()}\n\n"
|
||||
|
||||
# message_stop
|
||||
message_stop = ClaudeMessageStopEvent()
|
||||
yield f"event: message_stop\ndata: {message_stop.model_dump_json()}\n\n"
|
||||
@@ -1,173 +0,0 @@
"""OpenAI Responses API adapter for converting requests/responses.

ResponsesRequest is the canonical internal format. Responses API is the most featureful,
making it the natural choice for the internal format. All other API formats (Chat
Completions, Claude) are converted TO ResponsesRequest.
"""
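
Since this is the canonical format, the `/v1/responses` route accepts it directly. A minimal sketch; the listen address and model id are placeholders, and `input` may be a plain string or a list of role/content messages:

```bash
# EXO_API is a hypothetical placeholder for the address of a running exo node.
EXO_API="http://localhost:8000"

curl "$EXO_API/v1/responses" \
  -H "Content-Type: application/json" \
  -d '{
        "model": "<model-id>",
        "instructions": "You are a helpful assistant.",
        "input": "Hello",
        "stream": false
      }'
```
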
from collections.abc import AsyncGenerator
|
||||
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.common import CommandId
|
||||
from exo.shared.types.openai_responses import (
|
||||
ResponseCompletedEvent,
|
||||
ResponseContentPartAddedEvent,
|
||||
ResponseContentPartDoneEvent,
|
||||
ResponseCreatedEvent,
|
||||
ResponseInProgressEvent,
|
||||
ResponseMessageItem,
|
||||
ResponseOutputItemAddedEvent,
|
||||
ResponseOutputItemDoneEvent,
|
||||
ResponseOutputText,
|
||||
ResponsesResponse,
|
||||
ResponseTextDeltaEvent,
|
||||
ResponseTextDoneEvent,
|
||||
ResponseUsage,
|
||||
)
|
||||
|
||||
|
||||
async def collect_responses_response(
|
||||
command_id: CommandId,
|
||||
model: str,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> ResponsesResponse:
|
||||
"""Collect all token chunks and return a single ResponsesResponse."""
|
||||
response_id = f"resp_{command_id}"
|
||||
item_id = f"item_{command_id}"
|
||||
accumulated_text = ""
|
||||
last_stats = None
|
||||
error_message: str | None = None
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.finish_reason == "error":
|
||||
error_message = chunk.error_message or "Internal server error"
|
||||
break
|
||||
|
||||
accumulated_text += chunk.text
|
||||
last_stats = chunk.stats or last_stats
|
||||
|
||||
if error_message is not None:
|
||||
raise ValueError(error_message)
|
||||
|
||||
# Create usage from stats if available
|
||||
usage = None
|
||||
if last_stats is not None:
|
||||
usage = ResponseUsage(
|
||||
input_tokens=last_stats.prompt_tokens,
|
||||
output_tokens=last_stats.generation_tokens,
|
||||
total_tokens=last_stats.prompt_tokens + last_stats.generation_tokens,
|
||||
)
|
||||
|
||||
output_item = ResponseMessageItem(
|
||||
id=item_id,
|
||||
content=[ResponseOutputText(text=accumulated_text)],
|
||||
status="completed",
|
||||
)
|
||||
|
||||
return ResponsesResponse(
|
||||
id=response_id,
|
||||
model=model,
|
||||
status="completed",
|
||||
output=[output_item],
|
||||
output_text=accumulated_text,
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
|
||||
async def generate_responses_stream(
|
||||
command_id: CommandId,
|
||||
model: str,
|
||||
chunk_stream: AsyncGenerator[TokenChunk, None],
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Generate OpenAI Responses API streaming events from TokenChunks."""
|
||||
response_id = f"resp_{command_id}"
|
||||
item_id = f"item_{command_id}"
|
||||
|
||||
# response.created
|
||||
initial_response = ResponsesResponse(
|
||||
id=response_id,
|
||||
model=model,
|
||||
status="in_progress",
|
||||
output=[],
|
||||
output_text="",
|
||||
)
|
||||
created_event = ResponseCreatedEvent(response=initial_response)
|
||||
yield f"event: response.created\ndata: {created_event.model_dump_json()}\n\n"
|
||||
|
||||
# response.in_progress
|
||||
in_progress_event = ResponseInProgressEvent(response=initial_response)
|
||||
yield f"event: response.in_progress\ndata: {in_progress_event.model_dump_json()}\n\n"
|
||||
|
||||
# response.output_item.added
|
||||
initial_item = ResponseMessageItem(
|
||||
id=item_id,
|
||||
content=[ResponseOutputText(text="")],
|
||||
status="in_progress",
|
||||
)
|
||||
item_added = ResponseOutputItemAddedEvent(output_index=0, item=initial_item)
|
||||
yield f"event: response.output_item.added\ndata: {item_added.model_dump_json()}\n\n"
|
||||
|
||||
# response.content_part.added
|
||||
initial_part = ResponseOutputText(text="")
|
||||
part_added = ResponseContentPartAddedEvent(
|
||||
output_index=0, content_index=0, part=initial_part
|
||||
)
|
||||
yield f"event: response.content_part.added\ndata: {part_added.model_dump_json()}\n\n"
|
||||
|
||||
accumulated_text = ""
|
||||
last_stats = None
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
accumulated_text += chunk.text
|
||||
last_stats = chunk.stats or last_stats
|
||||
|
||||
# response.output_text.delta
|
||||
delta_event = ResponseTextDeltaEvent(
|
||||
output_index=0,
|
||||
content_index=0,
|
||||
delta=chunk.text,
|
||||
)
|
||||
yield f"event: response.output_text.delta\ndata: {delta_event.model_dump_json()}\n\n"
|
||||
|
||||
# response.output_text.done
|
||||
text_done = ResponseTextDoneEvent(
|
||||
output_index=0, content_index=0, text=accumulated_text
|
||||
)
|
||||
yield f"event: response.output_text.done\ndata: {text_done.model_dump_json()}\n\n"
|
||||
|
||||
# response.content_part.done
|
||||
final_part = ResponseOutputText(text=accumulated_text)
|
||||
part_done = ResponseContentPartDoneEvent(
|
||||
output_index=0, content_index=0, part=final_part
|
||||
)
|
||||
yield f"event: response.content_part.done\ndata: {part_done.model_dump_json()}\n\n"
|
||||
|
||||
# response.output_item.done
|
||||
final_item = ResponseMessageItem(
|
||||
id=item_id,
|
||||
content=[ResponseOutputText(text=accumulated_text)],
|
||||
status="completed",
|
||||
)
|
||||
item_done = ResponseOutputItemDoneEvent(output_index=0, item=final_item)
|
||||
yield f"event: response.output_item.done\ndata: {item_done.model_dump_json()}\n\n"
|
||||
|
||||
# Create usage from stats if available
|
||||
usage = None
|
||||
if last_stats is not None:
|
||||
usage = ResponseUsage(
|
||||
input_tokens=last_stats.prompt_tokens,
|
||||
output_tokens=last_stats.generation_tokens,
|
||||
total_tokens=last_stats.prompt_tokens + last_stats.generation_tokens,
|
||||
)
|
||||
|
||||
# response.completed
|
||||
final_response = ResponsesResponse(
|
||||
id=response_id,
|
||||
model=model,
|
||||
status="completed",
|
||||
output=[final_item],
|
||||
output_text=accumulated_text,
|
||||
usage=usage,
|
||||
)
|
||||
completed_event = ResponseCompletedEvent(response=final_response)
|
||||
yield f"event: response.completed\ndata: {completed_event.model_dump_json()}\n\n"
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from collections.abc import AsyncGenerator
|
||||
from http import HTTPStatus
|
||||
from typing import cast
|
||||
@@ -14,20 +15,6 @@ from hypercorn.config import Config
|
||||
from hypercorn.typing import ASGIFramework
|
||||
from loguru import logger
|
||||
|
||||
from exo.master.adapters.chat_completions import (
|
||||
chat_request_to_internal,
|
||||
collect_chat_response,
|
||||
generate_chat_stream,
|
||||
)
|
||||
from exo.master.adapters.claude import (
|
||||
claude_request_to_internal,
|
||||
collect_claude_response,
|
||||
generate_claude_stream,
|
||||
)
|
||||
from exo.master.adapters.responses import (
|
||||
collect_responses_response,
|
||||
generate_responses_stream,
|
||||
)
|
||||
from exo.master.placement import place_instance as get_instance_placements
|
||||
from exo.shared.apply import apply
|
||||
from exo.shared.election import ElectionMessage
|
||||
@@ -40,24 +27,21 @@ from exo.shared.types.api import (
|
||||
ChatCompletionChoice,
|
||||
ChatCompletionMessage,
|
||||
ChatCompletionResponse,
|
||||
ChatCompletionTaskParams,
|
||||
CreateInstanceParams,
|
||||
CreateInstanceResponse,
|
||||
DeleteInstanceResponse,
|
||||
ErrorInfo,
|
||||
ErrorResponse,
|
||||
FinishReason,
|
||||
GenerationStats,
|
||||
ModelList,
|
||||
ModelListModel,
|
||||
PlaceInstanceParams,
|
||||
PlacementPreview,
|
||||
PlacementPreviewResponse,
|
||||
StreamingChoiceResponse,
|
||||
)
|
||||
from exo.shared.types.chunks import TokenChunk
|
||||
from exo.shared.types.claude_api import (
|
||||
ClaudeMessagesRequest,
|
||||
ClaudeMessagesResponse,
|
||||
)
|
||||
from exo.shared.types.commands import (
|
||||
ChatCompletion,
|
||||
Command,
|
||||
@@ -76,11 +60,8 @@ from exo.shared.types.events import (
|
||||
)
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.models import ModelId, ModelMetadata
|
||||
from exo.shared.types.openai_responses import (
|
||||
ResponsesRequest,
|
||||
ResponsesResponse,
|
||||
)
|
||||
from exo.shared.types.state import State
|
||||
from exo.shared.types.tasks import ChatCompletionTaskParams
|
||||
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
|
||||
from exo.shared.types.worker.shards import Sharding
|
||||
from exo.utils.banner import print_startup_banner
|
||||
@@ -89,6 +70,23 @@ from exo.utils.dashboard_path import find_dashboard
|
||||
from exo.utils.event_buffer import OrderedBuffer
|
||||
|
||||
|
||||
def chunk_to_response(
|
||||
chunk: TokenChunk, command_id: CommandId
|
||||
) -> ChatCompletionResponse:
|
||||
return ChatCompletionResponse(
|
||||
id=command_id,
|
||||
created=int(time.time()),
|
||||
model=chunk.model,
|
||||
choices=[
|
||||
StreamingChoiceResponse(
|
||||
index=0,
|
||||
delta=ChatCompletionMessage(role="assistant", content=chunk.text),
|
||||
finish_reason=chunk.finish_reason,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
async def resolve_model_meta(model_id: str) -> ModelMetadata:
|
||||
if model_id in MODEL_CARDS:
|
||||
model_card = MODEL_CARDS[model_id]
|
||||
@@ -193,8 +191,6 @@ class API:
self.chat_completions
)
self.app.post("/bench/chat/completions")(self.bench_chat_completions)
self.app.post("/v1/messages", response_model=None)(self.claude_messages)
self.app.post("/v1/responses", response_model=None)(self.openai_responses)
self.app.get("/state")(lambda: self.state)
self.app.get("/events")(lambda: self._event_log)
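
The two read-only routes registered above are convenient for sanity-checking a node; a quick sketch, again with a placeholder address:

```bash
# EXO_API is a hypothetical placeholder for the address of a running exo node.
EXO_API="http://localhost:8000"

curl "$EXO_API/state"    # current cluster state (topology, instances, downloads)
curl "$EXO_API/events"   # ordered event log
```
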
@@ -400,13 +396,11 @@ class API:
|
||||
instance_id=instance_id,
|
||||
)
|
||||
|
||||
async def _token_chunk_stream(
|
||||
async def _chat_chunk_stream(
|
||||
self, command_id: CommandId
|
||||
) -> AsyncGenerator[TokenChunk, None]:
|
||||
"""Yield `TokenChunk`s for a given command until completion.
|
||||
"""Yield `TokenChunk`s for a given command until completion."""
|
||||
|
||||
This is the internal low-level stream used by all API adapters.
|
||||
"""
|
||||
try:
|
||||
self._chat_completion_queues[command_id], recv = channel[TokenChunk]()
|
||||
|
||||
@@ -429,20 +423,87 @@ class API:
|
||||
await self._send(command)
|
||||
del self._chat_completion_queues[command_id]
|
||||
|
||||
async def _generate_chat_stream(
|
||||
self, command_id: CommandId
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""Generate chat completion stream as JSON strings."""
|
||||
|
||||
async for chunk in self._chat_chunk_stream(command_id):
|
||||
if chunk.finish_reason == "error":
|
||||
error_response = ErrorResponse(
|
||||
error=ErrorInfo(
|
||||
message=chunk.error_message or "Internal server error",
|
||||
type="InternalServerError",
|
||||
code=500,
|
||||
)
|
||||
)
|
||||
yield f"data: {error_response.model_dump_json()}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
chunk_response: ChatCompletionResponse = chunk_to_response(
|
||||
chunk, command_id
|
||||
)
|
||||
logger.debug(f"chunk_response: {chunk_response}")
|
||||
|
||||
yield f"data: {chunk_response.model_dump_json()}\n\n"
|
||||
|
||||
if chunk.finish_reason is not None:
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
    async def _collect_chat_completion(
        self, command_id: CommandId
    ) -> ChatCompletionResponse:
        """Collect all token chunks for a chat completion and return a single response."""

        text_parts: list[str] = []
        model: str | None = None
        finish_reason: FinishReason | None = None

        async for chunk in self._chat_chunk_stream(command_id):
            if chunk.finish_reason == "error":
                raise HTTPException(
                    status_code=500,
                    detail=chunk.error_message or "Internal server error",
                )

            if model is None:
                model = chunk.model

            text_parts.append(chunk.text)

            if chunk.finish_reason is not None:
                finish_reason = chunk.finish_reason

        combined_text = "".join(text_parts)
        assert model is not None

        return ChatCompletionResponse(
            id=command_id,
            created=int(time.time()),
            model=model,
            choices=[
                ChatCompletionChoice(
                    index=0,
                    message=ChatCompletionMessage(
                        role="assistant",
                        content=combined_text,
                    ),
                    finish_reason=finish_reason,
                )
            ],
        )

    async def _collect_chat_completion_with_stats(
        self, command_id: CommandId
    ) -> BenchChatCompletionResponse:
        import time

        from exo.shared.types.api import FinishReason

        text_parts: list[str] = []
        model: str | None = None
        finish_reason: FinishReason | None = None

        stats: GenerationStats | None = None

        async for chunk in self._token_chunk_stream(command_id):
        async for chunk in self._chat_chunk_stream(command_id):
            if chunk.finish_reason == "error":
                raise HTTPException(
                    status_code=500,
|
||||
@@ -486,154 +547,60 @@ class API:
    async def chat_completions(
        self, payload: ChatCompletionTaskParams
    ) -> ChatCompletionResponse | StreamingResponse:
        """OpenAI Chat Completions API - adapter."""
        internal_params = chat_request_to_internal(payload)
        model_meta = await resolve_model_meta(internal_params.model)
        internal_params.model = model_meta.model_id
        """Handle chat completions, supporting both streaming and non-streaming responses."""
        model_meta = await resolve_model_meta(payload.model)
        payload.model = model_meta.model_id

        if not any(
            instance.shard_assignments.model_id == internal_params.model
            instance.shard_assignments.model_id == payload.model
            for instance in self.state.instances.values()
        ):
            await self._trigger_notify_user_to_download_model(internal_params.model)
            await self._trigger_notify_user_to_download_model(payload.model)
            raise HTTPException(
                status_code=404,
                detail=f"No instance found for model {internal_params.model}",
                status_code=404, detail=f"No instance found for model {payload.model}"
            )

        command = ChatCompletion(request_params=internal_params)
        command = ChatCompletion(
            request_params=payload,
        )
        await self._send(command)

        if payload.stream:
            return StreamingResponse(
                generate_chat_stream(
                    command.command_id,
                    self._token_chunk_stream(command.command_id),
                ),
                self._generate_chat_stream(command.command_id),
                media_type="text/event-stream",
            )

        try:
            return await collect_chat_response(
                command.command_id,
                self._token_chunk_stream(command.command_id),
            )
        except ValueError as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        return await self._collect_chat_completion(command.command_id)
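For reference, a non-streaming call against the rewritten handler could look like the sketch below. The host and port are assumptions; the response shape matches the `ChatCompletionResponse` assembled by `_collect_chat_completion`.

```python
# Sketch only: a blocking, non-streaming request to /v1/chat/completions.
import httpx

payload = {
    "model": "llama-3.2-1b",
    "stream": False,
    "messages": [{"role": "user", "content": "Hello, how are you?"}],
}

response = httpx.post(
    "http://localhost:8000/v1/chat/completions", json=payload, timeout=None
)
response.raise_for_status()
body = response.json()
# The aggregated assistant message built by _collect_chat_completion:
print(body["choices"][0]["message"]["content"])
```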
|
||||
|
||||
async def bench_chat_completions(
|
||||
self, payload: BenchChatCompletionTaskParams
|
||||
) -> BenchChatCompletionResponse:
|
||||
# Convert to internal format (BenchChatCompletionTaskParams extends ChatCompletionTaskParams)
|
||||
internal_params = chat_request_to_internal(payload)
|
||||
model_meta = await resolve_model_meta(internal_params.model)
|
||||
internal_params.model = model_meta.model_id
|
||||
model_meta = await resolve_model_meta(payload.model)
|
||||
payload.model = model_meta.model_id
|
||||
|
||||
if not any(
|
||||
instance.shard_assignments.model_id == internal_params.model
|
||||
instance.shard_assignments.model_id == payload.model
|
||||
for instance in self.state.instances.values()
|
||||
):
|
||||
await self._trigger_notify_user_to_download_model(internal_params.model)
|
||||
await self._trigger_notify_user_to_download_model(payload.model)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No instance found for model {internal_params.model}",
|
||||
status_code=404, detail=f"No instance found for model {payload.model}"
|
||||
)
|
||||
|
||||
internal_params.stream = False
|
||||
payload.stream = False
|
||||
|
||||
command = ChatCompletion(request_params=internal_params)
|
||||
command = ChatCompletion(request_params=payload)
|
||||
await self._send(command)
|
||||
|
||||
response = await self._collect_chat_completion_with_stats(command.command_id)
|
||||
return response
|
||||
|
||||
async def claude_messages(
|
||||
self, payload: ClaudeMessagesRequest
|
||||
) -> ClaudeMessagesResponse | StreamingResponse:
|
||||
"""Claude Messages API - adapter."""
|
||||
internal_params = claude_request_to_internal(payload)
|
||||
model_meta = await resolve_model_meta(internal_params.model)
|
||||
internal_params.model = model_meta.model_id
|
||||
|
||||
if not any(
|
||||
instance.shard_assignments.model_id == internal_params.model
|
||||
for instance in self.state.instances.values()
|
||||
):
|
||||
await self._trigger_notify_user_to_download_model(internal_params.model)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No instance found for model {internal_params.model}",
|
||||
)
|
||||
|
||||
command = ChatCompletion(request_params=internal_params)
|
||||
await self._send(command)
|
||||
|
||||
if payload.stream:
|
||||
return StreamingResponse(
|
||||
generate_claude_stream(
|
||||
command.command_id,
|
||||
payload.model,
|
||||
self._token_chunk_stream(command.command_id),
|
||||
),
|
||||
media_type="text/event-stream",
|
||||
)
|
||||
|
||||
try:
|
||||
return await collect_claude_response(
|
||||
command.command_id,
|
||||
payload.model,
|
||||
self._token_chunk_stream(command.command_id),
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
async def openai_responses(
|
||||
self, payload: ResponsesRequest
|
||||
) -> ResponsesResponse | StreamingResponse:
|
||||
"""OpenAI Responses API - native format (no conversion needed)."""
|
||||
model_meta = await resolve_model_meta(payload.model)
|
||||
# Update model to resolved model_id
|
||||
request_params = payload.model_copy(update={"model": model_meta.model_id})
|
||||
|
||||
if not any(
|
||||
instance.shard_assignments.model_id == request_params.model
|
||||
for instance in self.state.instances.values()
|
||||
):
|
||||
await self._trigger_notify_user_to_download_model(request_params.model)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No instance found for model {request_params.model}",
|
||||
)
|
||||
|
||||
command = ChatCompletion(request_params=request_params)
|
||||
await self._send(command)
|
||||
|
||||
if payload.stream:
|
||||
return StreamingResponse(
|
||||
generate_responses_stream(
|
||||
command.command_id,
|
||||
payload.model,
|
||||
self._token_chunk_stream(command.command_id),
|
||||
),
|
||||
media_type="text/event-stream",
|
||||
)
|
||||
|
||||
try:
|
||||
return await collect_responses_response(
|
||||
command.command_id,
|
||||
payload.model,
|
||||
self._token_chunk_stream(command.command_id),
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
    def _calculate_total_available_memory(self) -> Memory:
        """Calculate total available memory across all nodes in bytes."""
        total_available = Memory()

        for node in self.state.topology.list_nodes():
            if node.node_profile is not None:
                total_available += node.node_profile.memory.ram_available
        for memory in self.state.node_memories.values():
            total_available += memory.ram_available

        return total_available
|
||||
|
||||
|
||||
@@ -113,6 +113,7 @@ def place_instance(
|
||||
node.node_profile.memory.ram_available
|
||||
for node in cycle
|
||||
if node.node_profile is not None
|
||||
and node.node_profile.memory is not None
|
||||
),
|
||||
start=Memory(),
|
||||
),
|
||||
|
||||
@@ -25,7 +25,10 @@ class NodeWithProfile(BaseModel):


def narrow_all_nodes(nodes: list[NodeInfo]) -> TypeGuard[list[NodeWithProfile]]:
    return all(node.node_profile is not None for node in nodes)
    return all(
        node.node_profile is not None and node.node_profile.memory is not None
        for node in nodes
    )
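Because `narrow_all_nodes` is a `TypeGuard`, a truthy return lets the type checker treat the whole list as `list[NodeWithProfile]` with a non-None memory profile. A small illustrative call site (not part of this diff; assumes the surrounding module's imports):

```python
# Illustrative call site: after the guard passes, the checker narrows the type.
def first_node_ram(nodes: list[NodeInfo]) -> Memory | None:
    if narrow_all_nodes(nodes):
        # nodes is now list[NodeWithProfile]; profile and memory are non-None.
        return nodes[0].node_profile.memory.ram_available
    return None
```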
|
||||
|
||||
|
||||
def filter_cycles_by_memory(
|
||||
@@ -36,8 +39,14 @@ def filter_cycles_by_memory(
|
||||
if not narrow_all_nodes(cycle):
|
||||
continue
|
||||
|
||||
# narrow_all_nodes guarantees memory is not None
|
||||
total_mem = sum(
|
||||
(node.node_profile.memory.ram_available for node in cycle), start=Memory()
|
||||
(
|
||||
node.node_profile.memory.ram_available
|
||||
for node in cycle
|
||||
if node.node_profile.memory is not None
|
||||
),
|
||||
start=Memory(),
|
||||
)
|
||||
if total_mem >= required_memory:
|
||||
filtered_cycles.append(cast(list[NodeInfo], cycle))
|
||||
@@ -53,8 +62,13 @@ def get_shard_assignments_for_pipeline_parallel(
|
||||
model_meta: ModelMetadata,
|
||||
selected_cycle: list[NodeWithProfile],
|
||||
):
|
||||
# NodeWithProfile guarantees memory is not None
|
||||
cycle_memory = sum(
|
||||
(node.node_profile.memory.ram_available for node in selected_cycle),
|
||||
(
|
||||
node.node_profile.memory.ram_available
|
||||
for node in selected_cycle
|
||||
if node.node_profile.memory is not None
|
||||
),
|
||||
start=Memory(),
|
||||
)
|
||||
total_layers = model_meta.n_layers
|
||||
@@ -67,6 +81,8 @@ def get_shard_assignments_for_pipeline_parallel(
|
||||
if i == len(selected_cycle) - 1:
|
||||
node_layers = total_layers - layers_assigned
|
||||
else:
|
||||
# NodeWithProfile guarantees memory is not None
|
||||
assert node.node_profile.memory is not None
|
||||
node_layers = round(
|
||||
total_layers
|
||||
* (
|
||||
|
||||
@@ -1,283 +0,0 @@
|
||||
"""Tests for Claude Messages API conversion functions and types."""
|
||||
|
||||
import json
|
||||
from typing import Any, cast
|
||||
|
||||
import pydantic
|
||||
import pytest
|
||||
|
||||
from exo.master.adapters.claude import (
|
||||
claude_request_to_internal,
|
||||
finish_reason_to_claude_stop_reason,
|
||||
)
|
||||
from exo.shared.types.claude_api import (
|
||||
ClaudeContentBlockDeltaEvent,
|
||||
ClaudeContentBlockStartEvent,
|
||||
ClaudeContentBlockStopEvent,
|
||||
ClaudeMessage,
|
||||
ClaudeMessageDelta,
|
||||
ClaudeMessageDeltaEvent,
|
||||
ClaudeMessageDeltaUsage,
|
||||
ClaudeMessagesRequest,
|
||||
ClaudeMessageStart,
|
||||
ClaudeMessageStartEvent,
|
||||
ClaudeMessageStopEvent,
|
||||
ClaudeTextBlock,
|
||||
ClaudeTextDelta,
|
||||
ClaudeUsage,
|
||||
)
|
||||
|
||||
|
||||
class TestFinishReasonToClaudeStopReason:
|
||||
"""Tests for finish_reason to Claude stop_reason mapping."""
|
||||
|
||||
def test_stop_maps_to_end_turn(self):
|
||||
assert finish_reason_to_claude_stop_reason("stop") == "end_turn"
|
||||
|
||||
def test_length_maps_to_max_tokens(self):
|
||||
assert finish_reason_to_claude_stop_reason("length") == "max_tokens"
|
||||
|
||||
def test_tool_calls_maps_to_tool_use(self):
|
||||
assert finish_reason_to_claude_stop_reason("tool_calls") == "tool_use"
|
||||
|
||||
def test_function_call_maps_to_tool_use(self):
|
||||
assert finish_reason_to_claude_stop_reason("function_call") == "tool_use"
|
||||
|
||||
def test_content_filter_maps_to_end_turn(self):
|
||||
assert finish_reason_to_claude_stop_reason("content_filter") == "end_turn"
|
||||
|
||||
def test_none_returns_none(self):
|
||||
assert finish_reason_to_claude_stop_reason(None) is None
|
||||
|
||||
|
||||
class TestClaudeRequestToInternal:
|
||||
"""Tests for converting Claude Messages API requests to ResponsesRequest."""
|
||||
|
||||
def test_basic_request_conversion(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
messages=[
|
||||
ClaudeMessage(role="user", content="Hello"),
|
||||
],
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert params.model == "claude-3-opus"
|
||||
assert params.max_output_tokens == 100
|
||||
assert isinstance(params.input, list)
|
||||
assert len(params.input) == 1
|
||||
assert params.input[0].role == "user"
|
||||
assert params.input[0].content == "Hello"
|
||||
assert params.instructions is None
|
||||
|
||||
def test_request_with_system_string(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
system="You are a helpful assistant.",
|
||||
messages=[
|
||||
ClaudeMessage(role="user", content="Hello"),
|
||||
],
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert params.instructions == "You are a helpful assistant."
|
||||
assert isinstance(params.input, list)
|
||||
assert len(params.input) == 1
|
||||
assert params.input[0].role == "user"
|
||||
assert params.input[0].content == "Hello"
|
||||
|
||||
def test_request_with_system_text_blocks(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
system=[
|
||||
ClaudeTextBlock(text="You are helpful. "),
|
||||
ClaudeTextBlock(text="Be concise."),
|
||||
],
|
||||
messages=[
|
||||
ClaudeMessage(role="user", content="Hello"),
|
||||
],
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert params.instructions == "You are helpful. Be concise."
|
||||
assert isinstance(params.input, list)
|
||||
assert len(params.input) == 1
|
||||
|
||||
def test_request_with_content_blocks(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
messages=[
|
||||
ClaudeMessage(
|
||||
role="user",
|
||||
content=[
|
||||
ClaudeTextBlock(text="First part. "),
|
||||
ClaudeTextBlock(text="Second part."),
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert isinstance(params.input, list)
|
||||
assert len(params.input) == 1
|
||||
assert params.input[0].content == "First part. Second part."
|
||||
|
||||
def test_request_with_multi_turn_conversation(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
messages=[
|
||||
ClaudeMessage(role="user", content="Hello"),
|
||||
ClaudeMessage(role="assistant", content="Hi there!"),
|
||||
ClaudeMessage(role="user", content="How are you?"),
|
||||
],
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert isinstance(params.input, list)
|
||||
assert len(params.input) == 3
|
||||
assert params.input[0].role == "user"
|
||||
assert params.input[1].role == "assistant"
|
||||
assert params.input[2].role == "user"
|
||||
|
||||
def test_request_with_optional_parameters(self):
|
||||
request = ClaudeMessagesRequest(
|
||||
model="claude-3-opus",
|
||||
max_tokens=100,
|
||||
messages=[ClaudeMessage(role="user", content="Hello")],
|
||||
temperature=0.7,
|
||||
top_p=0.9,
|
||||
top_k=40,
|
||||
stop_sequences=["STOP", "END"],
|
||||
stream=True,
|
||||
)
|
||||
params = claude_request_to_internal(request)
|
||||
|
||||
assert params.temperature == 0.7
|
||||
assert params.top_p == 0.9
|
||||
assert params.top_k == 40
|
||||
assert params.stop == ["STOP", "END"]
|
||||
assert params.stream is True
|
||||
|
||||
|
||||
class TestClaudeMessagesRequestValidation:
|
||||
"""Tests for Claude Messages API request validation."""
|
||||
|
||||
def test_request_requires_model(self):
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
ClaudeMessagesRequest.model_validate(
|
||||
{
|
||||
"max_tokens": 100,
|
||||
"messages": [{"role": "user", "content": "Hello"}],
|
||||
}
|
||||
)
|
||||
|
||||
def test_request_requires_max_tokens(self):
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
ClaudeMessagesRequest.model_validate(
|
||||
{
|
||||
"model": "claude-3-opus",
|
||||
"messages": [{"role": "user", "content": "Hello"}],
|
||||
}
|
||||
)
|
||||
|
||||
def test_request_requires_messages(self):
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
ClaudeMessagesRequest.model_validate(
|
||||
{
|
||||
"model": "claude-3-opus",
|
||||
"max_tokens": 100,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class TestClaudeStreamingEvents:
|
||||
"""Tests for Claude Messages API streaming event serialization."""
|
||||
|
||||
def test_message_start_event_format(self):
|
||||
message = ClaudeMessageStart(
|
||||
id="msg_123",
|
||||
model="claude-3-opus",
|
||||
content=[],
|
||||
stop_reason=None,
|
||||
usage=ClaudeUsage(input_tokens=10, output_tokens=0),
|
||||
)
|
||||
event = ClaudeMessageStartEvent(message=message)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "message_start"
|
||||
assert parsed["message"]["id"] == "msg_123"
|
||||
assert parsed["message"]["type"] == "message"
|
||||
assert parsed["message"]["role"] == "assistant"
|
||||
assert parsed["message"]["model"] == "claude-3-opus"
|
||||
|
||||
def test_content_block_start_event_format(self):
|
||||
event = ClaudeContentBlockStartEvent(
|
||||
index=0,
|
||||
content_block=ClaudeTextBlock(text=""),
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "content_block_start"
|
||||
assert parsed["index"] == 0
|
||||
assert parsed["content_block"]["type"] == "text"
|
||||
assert parsed["content_block"]["text"] == ""
|
||||
|
||||
def test_content_block_delta_event_format(self):
|
||||
event = ClaudeContentBlockDeltaEvent(
|
||||
index=0,
|
||||
delta=ClaudeTextDelta(text="Hello"),
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "content_block_delta"
|
||||
assert parsed["index"] == 0
|
||||
assert parsed["delta"]["type"] == "text_delta"
|
||||
assert parsed["delta"]["text"] == "Hello"
|
||||
|
||||
def test_content_block_stop_event_format(self):
|
||||
event = ClaudeContentBlockStopEvent(index=0)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "content_block_stop"
|
||||
assert parsed["index"] == 0
|
||||
|
||||
def test_message_delta_event_format(self):
|
||||
event = ClaudeMessageDeltaEvent(
|
||||
delta=ClaudeMessageDelta(stop_reason="end_turn"),
|
||||
usage=ClaudeMessageDeltaUsage(output_tokens=25),
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "message_delta"
|
||||
assert parsed["delta"]["stop_reason"] == "end_turn"
|
||||
assert parsed["usage"]["output_tokens"] == 25
|
||||
|
||||
def test_message_stop_event_format(self):
|
||||
event = ClaudeMessageStopEvent()
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "message_stop"
|
||||
|
||||
def test_sse_format(self):
|
||||
"""Test that SSE format is correctly generated."""
|
||||
event = ClaudeContentBlockDeltaEvent(
|
||||
index=0,
|
||||
delta=ClaudeTextDelta(text="Hello"),
|
||||
)
|
||||
# Simulate the SSE format used in the streaming generator
|
||||
sse_line = f"event: content_block_delta\ndata: {event.model_dump_json()}\n\n"
|
||||
|
||||
assert sse_line.startswith("event: content_block_delta\n")
|
||||
assert "data: " in sse_line
|
||||
assert sse_line.endswith("\n\n")
|
||||
@@ -7,6 +7,7 @@ from loguru import logger
|
||||
|
||||
from exo.master.main import Master
|
||||
from exo.routing.router import get_node_id_keypair
|
||||
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
|
||||
from exo.shared.types.commands import (
|
||||
ChatCompletion,
|
||||
CommandId,
|
||||
@@ -18,17 +19,13 @@ from exo.shared.types.events import (
|
||||
ForwarderEvent,
|
||||
IndexedEvent,
|
||||
InstanceCreated,
|
||||
NodePerformanceMeasured,
|
||||
NodeIdentityMeasured,
|
||||
NodeMemoryMeasured,
|
||||
TaskCreated,
|
||||
)
|
||||
from exo.shared.types.memory import Memory
|
||||
from exo.shared.types.models import ModelId, ModelMetadata
|
||||
from exo.shared.types.openai_responses import ResponsesRequest
|
||||
from exo.shared.types.profiling import (
|
||||
MemoryPerformanceProfile,
|
||||
NodePerformanceProfile,
|
||||
SystemPerformanceProfile,
|
||||
)
|
||||
from exo.shared.types.profiling import MemoryPerformanceProfile
|
||||
from exo.shared.types.tasks import ChatCompletion as ChatCompletionTask
|
||||
from exo.shared.types.tasks import TaskStatus
|
||||
from exo.shared.types.worker.instances import (
|
||||
@@ -75,29 +72,39 @@ async def test_master():
|
||||
tg.start_soon(master.run)
|
||||
|
||||
sender_node_id = NodeId(f"{keypair.to_peer_id().to_base58()}_sender")
|
||||
# inject a NodePerformanceProfile event
|
||||
logger.info("inject a NodePerformanceProfile event")
|
||||
# inject NodeIdentityMeasured and NodeMemoryMeasured events
|
||||
logger.info("inject NodeIdentityMeasured event")
|
||||
await local_event_sender.send(
|
||||
ForwarderEvent(
|
||||
origin_idx=0,
|
||||
origin=sender_node_id,
|
||||
session=session_id,
|
||||
event=(
|
||||
NodePerformanceMeasured(
|
||||
NodeIdentityMeasured(
|
||||
when=str(datetime.now(tz=timezone.utc)),
|
||||
node_id=node_id,
|
||||
node_profile=NodePerformanceProfile(
|
||||
model_id="maccy",
|
||||
chip_id="arm",
|
||||
friendly_name="test",
|
||||
memory=MemoryPerformanceProfile(
|
||||
ram_total=Memory.from_bytes(678948 * 1024),
|
||||
ram_available=Memory.from_bytes(678948 * 1024),
|
||||
swap_total=Memory.from_bytes(0),
|
||||
swap_available=Memory.from_bytes(0),
|
||||
),
|
||||
network_interfaces=[],
|
||||
system=SystemPerformanceProfile(),
|
||||
model_id="maccy",
|
||||
chip_id="arm",
|
||||
friendly_name="test",
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
logger.info("inject NodeMemoryMeasured event")
|
||||
await local_event_sender.send(
|
||||
ForwarderEvent(
|
||||
origin_idx=1,
|
||||
origin=sender_node_id,
|
||||
session=session_id,
|
||||
event=(
|
||||
NodeMemoryMeasured(
|
||||
when=str(datetime.now(tz=timezone.utc)),
|
||||
node_id=node_id,
|
||||
memory=MemoryPerformanceProfile(
|
||||
ram_total=Memory.from_bytes(678948 * 1024),
|
||||
ram_available=Memory.from_bytes(678948 * 1024),
|
||||
swap_total=Memory.from_bytes(0),
|
||||
swap_available=Memory.from_bytes(0),
|
||||
),
|
||||
)
|
||||
),
|
||||
@@ -108,7 +115,7 @@ async def test_master():
|
||||
logger.info("wait for initial topology event")
|
||||
while len(list(master.state.topology.list_nodes())) == 0:
|
||||
await anyio.sleep(0.001)
|
||||
while len(master.state.node_profiles) == 0:
|
||||
while len(master.state.node_identities) == 0:
|
||||
await anyio.sleep(0.001)
|
||||
|
||||
logger.info("inject a CreateInstance Command")
|
||||
@@ -143,25 +150,31 @@ async def test_master():
|
||||
command=(
|
||||
ChatCompletion(
|
||||
command_id=CommandId(),
|
||||
request_params=ResponsesRequest(
|
||||
request_params=ChatCompletionTaskParams(
|
||||
model="llama-3.2-1b",
|
||||
input="Hello, how are you?",
|
||||
messages=[
|
||||
ChatCompletionMessage(
|
||||
role="user", content="Hello, how are you?"
|
||||
)
|
||||
],
|
||||
),
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
while len(_get_events()) < 3:
|
||||
while len(_get_events()) < 4:
|
||||
await anyio.sleep(0.01)
|
||||
|
||||
events = _get_events()
|
||||
assert len(events) == 3
|
||||
assert len(events) == 4
|
||||
assert events[0].idx == 0
|
||||
assert events[1].idx == 1
|
||||
assert events[2].idx == 2
|
||||
assert isinstance(events[0].event, NodePerformanceMeasured)
|
||||
assert isinstance(events[1].event, InstanceCreated)
|
||||
created_instance = events[1].event.instance
|
||||
assert events[3].idx == 3
|
||||
assert isinstance(events[0].event, NodeIdentityMeasured)
|
||||
assert isinstance(events[1].event, NodeMemoryMeasured)
|
||||
assert isinstance(events[2].event, InstanceCreated)
|
||||
created_instance = events[2].event.instance
|
||||
assert isinstance(created_instance, MlxRingInstance)
|
||||
runner_id = list(created_instance.shard_assignments.runner_to_shard.keys())[0]
|
||||
# Validate the shard assignments
|
||||
@@ -193,12 +206,14 @@ async def test_master():
|
||||
assert len(created_instance.hosts_by_node[node_id]) == 1
|
||||
assert created_instance.hosts_by_node[node_id][0].ip == "0.0.0.0"
|
||||
assert created_instance.ephemeral_port > 0
|
||||
assert isinstance(events[2].event, TaskCreated)
|
||||
assert events[2].event.task.task_status == TaskStatus.Pending
|
||||
assert isinstance(events[2].event.task, ChatCompletionTask)
|
||||
assert events[2].event.task.task_params == ResponsesRequest(
|
||||
assert isinstance(events[3].event, TaskCreated)
|
||||
assert events[3].event.task.task_status == TaskStatus.Pending
|
||||
assert isinstance(events[3].event.task, ChatCompletionTask)
|
||||
assert events[3].event.task.task_params == ChatCompletionTaskParams(
|
||||
model="llama-3.2-1b",
|
||||
input="Hello, how are you?",
|
||||
messages=[
|
||||
ChatCompletionMessage(role="user", content="Hello, how are you?")
|
||||
],
|
||||
)
|
||||
|
||||
await master.shutdown()
|
||||
|
||||
@@ -1,293 +0,0 @@
|
||||
"""Tests for OpenAI Responses API types.
|
||||
|
||||
ResponsesRequest is the canonical internal type used throughout the pipeline.
|
||||
No conversion is needed for Responses API requests.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any, cast
|
||||
|
||||
import pydantic
|
||||
import pytest
|
||||
|
||||
from exo.shared.types.openai_responses import (
|
||||
ResponseCompletedEvent,
|
||||
ResponseContentPartAddedEvent,
|
||||
ResponseCreatedEvent,
|
||||
ResponseInputMessage,
|
||||
ResponseMessageItem,
|
||||
ResponseOutputItemAddedEvent,
|
||||
ResponseOutputItemDoneEvent,
|
||||
ResponseOutputText,
|
||||
ResponsesRequest,
|
||||
ResponsesResponse,
|
||||
ResponseTextDeltaEvent,
|
||||
ResponseTextDoneEvent,
|
||||
ResponseUsage,
|
||||
)
|
||||
|
||||
|
||||
class TestResponsesRequestAsCanonicalType:
|
||||
"""Tests for ResponsesRequest as the canonical internal type."""
|
||||
|
||||
def test_string_input(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input="Hello, how are you?",
|
||||
)
|
||||
|
||||
assert request.model == "gpt-4o"
|
||||
assert request.input == "Hello, how are you?"
|
||||
assert request.instructions is None
|
||||
|
||||
def test_message_array_input(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input=[
|
||||
ResponseInputMessage(role="user", content="Hello"),
|
||||
ResponseInputMessage(role="assistant", content="Hi there!"),
|
||||
ResponseInputMessage(role="user", content="How are you?"),
|
||||
],
|
||||
)
|
||||
|
||||
assert isinstance(request.input, list)
|
||||
assert len(request.input) == 3
|
||||
assert request.input[0].role == "user"
|
||||
assert request.input[0].content == "Hello"
|
||||
assert request.input[1].role == "assistant"
|
||||
assert request.input[1].content == "Hi there!"
|
||||
assert request.input[2].role == "user"
|
||||
assert request.input[2].content == "How are you?"
|
||||
|
||||
def test_request_with_instructions(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input="Hello",
|
||||
instructions="You are a helpful assistant. Be concise.",
|
||||
)
|
||||
|
||||
assert request.input == "Hello"
|
||||
assert request.instructions == "You are a helpful assistant. Be concise."
|
||||
|
||||
def test_request_with_optional_parameters(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input="Hello",
|
||||
max_output_tokens=500,
|
||||
temperature=0.8,
|
||||
top_p=0.95,
|
||||
stream=True,
|
||||
)
|
||||
|
||||
assert request.max_output_tokens == 500
|
||||
assert request.temperature == 0.8
|
||||
assert request.top_p == 0.95
|
||||
assert request.stream is True
|
||||
|
||||
def test_request_with_new_fields(self):
|
||||
"""Test the additional fields added for internal use."""
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input="Hello",
|
||||
top_k=40,
|
||||
seed=42,
|
||||
stop=["STOP", "END"],
|
||||
tools=[{"type": "function", "function": {"name": "test"}}],
|
||||
)
|
||||
|
||||
assert request.top_k == 40
|
||||
assert request.seed == 42
|
||||
assert request.stop == ["STOP", "END"]
|
||||
assert request.tools == [{"type": "function", "function": {"name": "test"}}]
|
||||
|
||||
def test_request_with_system_role_in_messages(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input=[
|
||||
ResponseInputMessage(role="system", content="Be helpful"),
|
||||
ResponseInputMessage(role="user", content="Hello"),
|
||||
],
|
||||
)
|
||||
|
||||
assert isinstance(request.input, list)
|
||||
assert len(request.input) == 2
|
||||
assert request.input[0].role == "system"
|
||||
assert request.input[1].role == "user"
|
||||
|
||||
def test_request_with_developer_role(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input=[
|
||||
ResponseInputMessage(role="developer", content="Internal note"),
|
||||
ResponseInputMessage(role="user", content="Hello"),
|
||||
],
|
||||
)
|
||||
|
||||
assert isinstance(request.input, list)
|
||||
assert len(request.input) == 2
|
||||
assert request.input[0].role == "developer"
|
||||
|
||||
|
||||
class TestResponsesRequestValidation:
|
||||
"""Tests for OpenAI Responses API request validation."""
|
||||
|
||||
def test_request_requires_model(self):
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
ResponsesRequest.model_validate(
|
||||
{
|
||||
"input": "Hello",
|
||||
}
|
||||
)
|
||||
|
||||
def test_request_requires_input(self):
|
||||
with pytest.raises(pydantic.ValidationError):
|
||||
ResponsesRequest.model_validate(
|
||||
{
|
||||
"model": "gpt-4o",
|
||||
}
|
||||
)
|
||||
|
||||
def test_request_accepts_string_input(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input="Hello",
|
||||
)
|
||||
assert request.input == "Hello"
|
||||
|
||||
def test_request_accepts_message_array_input(self):
|
||||
request = ResponsesRequest(
|
||||
model="gpt-4o",
|
||||
input=[ResponseInputMessage(role="user", content="Hello")],
|
||||
)
|
||||
assert len(request.input) == 1
|
||||
|
||||
|
||||
class TestResponsesStreamingEvents:
|
||||
"""Tests for OpenAI Responses API streaming event serialization."""
|
||||
|
||||
def test_response_created_event_format(self):
|
||||
response = ResponsesResponse(
|
||||
id="resp_123",
|
||||
model="gpt-4o",
|
||||
status="in_progress",
|
||||
output=[],
|
||||
output_text="",
|
||||
)
|
||||
event = ResponseCreatedEvent(response=response)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.created"
|
||||
assert parsed["response"]["id"] == "resp_123"
|
||||
assert parsed["response"]["object"] == "response"
|
||||
assert parsed["response"]["status"] == "in_progress"
|
||||
|
||||
def test_output_item_added_event_format(self):
|
||||
item = ResponseMessageItem(
|
||||
id="item_123",
|
||||
content=[ResponseOutputText(text="")],
|
||||
status="in_progress",
|
||||
)
|
||||
event = ResponseOutputItemAddedEvent(output_index=0, item=item)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.output_item.added"
|
||||
assert parsed["output_index"] == 0
|
||||
assert parsed["item"]["type"] == "message"
|
||||
assert parsed["item"]["id"] == "item_123"
|
||||
assert parsed["item"]["role"] == "assistant"
|
||||
|
||||
def test_content_part_added_event_format(self):
|
||||
part = ResponseOutputText(text="")
|
||||
event = ResponseContentPartAddedEvent(
|
||||
output_index=0,
|
||||
content_index=0,
|
||||
part=part,
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.content_part.added"
|
||||
assert parsed["output_index"] == 0
|
||||
assert parsed["content_index"] == 0
|
||||
assert parsed["part"]["type"] == "output_text"
|
||||
|
||||
def test_text_delta_event_format(self):
|
||||
event = ResponseTextDeltaEvent(
|
||||
output_index=0,
|
||||
content_index=0,
|
||||
delta="Hello",
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.output_text.delta"
|
||||
assert parsed["output_index"] == 0
|
||||
assert parsed["content_index"] == 0
|
||||
assert parsed["delta"] == "Hello"
|
||||
|
||||
def test_text_done_event_format(self):
|
||||
event = ResponseTextDoneEvent(
|
||||
output_index=0,
|
||||
content_index=0,
|
||||
text="Hello, world!",
|
||||
)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.output_text.done"
|
||||
assert parsed["text"] == "Hello, world!"
|
||||
|
||||
def test_output_item_done_event_format(self):
|
||||
item = ResponseMessageItem(
|
||||
id="item_123",
|
||||
content=[ResponseOutputText(text="Hello, world!")],
|
||||
status="completed",
|
||||
)
|
||||
event = ResponseOutputItemDoneEvent(output_index=0, item=item)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.output_item.done"
|
||||
assert parsed["item"]["status"] == "completed"
|
||||
assert parsed["item"]["content"][0]["text"] == "Hello, world!"
|
||||
|
||||
def test_response_completed_event_format(self):
|
||||
item = ResponseMessageItem(
|
||||
id="item_123",
|
||||
content=[ResponseOutputText(text="Hello!")],
|
||||
status="completed",
|
||||
)
|
||||
response = ResponsesResponse(
|
||||
id="resp_123",
|
||||
model="gpt-4o",
|
||||
status="completed",
|
||||
output=[item],
|
||||
output_text="Hello!",
|
||||
usage=ResponseUsage(input_tokens=10, output_tokens=5, total_tokens=15),
|
||||
)
|
||||
event = ResponseCompletedEvent(response=response)
|
||||
json_str = event.model_dump_json()
|
||||
parsed = cast(dict[str, Any], json.loads(json_str))
|
||||
|
||||
assert parsed["type"] == "response.completed"
|
||||
assert parsed["response"]["status"] == "completed"
|
||||
assert parsed["response"]["output_text"] == "Hello!"
|
||||
assert parsed["response"]["usage"]["total_tokens"] == 15
|
||||
|
||||
def test_sse_format(self):
|
||||
"""Test that SSE format is correctly generated."""
|
||||
event = ResponseTextDeltaEvent(
|
||||
output_index=0,
|
||||
content_index=0,
|
||||
delta="Hello",
|
||||
)
|
||||
# Simulate the SSE format used in the streaming generator
|
||||
sse_line = (
|
||||
f"event: response.output_text.delta\ndata: {event.model_dump_json()}\n\n"
|
||||
)
|
||||
|
||||
assert sse_line.startswith("event: response.output_text.delta\n")
|
||||
assert "data: " in sse_line
|
||||
assert sse_line.endswith("\n\n")
|
||||
@@ -13,8 +13,10 @@ from exo.shared.types.events import (
|
||||
InstanceDeleted,
|
||||
NodeCreated,
|
||||
NodeDownloadProgress,
|
||||
NodeIdentityMeasured,
|
||||
NodeMemoryMeasured,
|
||||
NodePerformanceMeasured,
|
||||
NodeNetworkMeasured,
|
||||
NodeSystemMeasured,
|
||||
NodeTimedOut,
|
||||
RunnerDeleted,
|
||||
RunnerStatusUpdated,
|
||||
@@ -27,7 +29,13 @@ from exo.shared.types.events import (
|
||||
TopologyEdgeCreated,
|
||||
TopologyEdgeDeleted,
|
||||
)
|
||||
from exo.shared.types.profiling import NodePerformanceProfile, SystemPerformanceProfile
|
||||
from exo.shared.types.profiling import (
|
||||
MemoryPerformanceProfile,
|
||||
NetworkInterfaceInfo,
|
||||
NodeIdentity,
|
||||
NodePerformanceProfile,
|
||||
SystemPerformanceProfile,
|
||||
)
|
||||
from exo.shared.types.state import State
|
||||
from exo.shared.types.tasks import Task, TaskId, TaskStatus
|
||||
from exo.shared.types.topology import NodeInfo
|
||||
@@ -51,8 +59,12 @@ def event_apply(event: Event, state: State) -> State:
|
||||
return apply_topology_node_created(event, state)
|
||||
case NodeTimedOut():
|
||||
return apply_node_timed_out(event, state)
|
||||
case NodePerformanceMeasured():
|
||||
return apply_node_performance_measured(event, state)
|
||||
case NodeIdentityMeasured():
|
||||
return apply_node_identity_measured(event, state)
|
||||
case NodeSystemMeasured():
|
||||
return apply_node_system_measured(event, state)
|
||||
case NodeNetworkMeasured():
|
||||
return apply_node_network_measured(event, state)
|
||||
case NodeDownloadProgress():
|
||||
return apply_node_download_progress(event, state)
|
||||
case NodeMemoryMeasured():
|
||||
@@ -190,8 +202,19 @@ def apply_runner_deleted(event: RunnerDeleted, state: State) -> State:
|
||||
def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
|
||||
topology = copy.copy(state.topology)
|
||||
state.topology.remove_node(event.node_id)
|
||||
node_profiles = {
|
||||
key: value for key, value in state.node_profiles.items() if key != event.node_id
|
||||
node_identities = {
|
||||
key: value
|
||||
for key, value in state.node_identities.items()
|
||||
if key != event.node_id
|
||||
}
|
||||
node_memories = {
|
||||
key: value for key, value in state.node_memories.items() if key != event.node_id
|
||||
}
|
||||
node_systems = {
|
||||
key: value for key, value in state.node_systems.items() if key != event.node_id
|
||||
}
|
||||
node_networks = {
|
||||
key: value for key, value in state.node_networks.items() if key != event.node_id
|
||||
}
|
||||
last_seen = {
|
||||
key: value for key, value in state.last_seen.items() if key != event.node_id
|
||||
@@ -199,32 +222,120 @@ def apply_node_timed_out(event: NodeTimedOut, state: State) -> State:
|
||||
return state.model_copy(
|
||||
update={
|
||||
"topology": topology,
|
||||
"node_profiles": node_profiles,
|
||||
"node_identities": node_identities,
|
||||
"node_memories": node_memories,
|
||||
"node_systems": node_systems,
|
||||
"node_networks": node_networks,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def apply_node_performance_measured(
|
||||
event: NodePerformanceMeasured, state: State
|
||||
) -> State:
|
||||
new_profiles: Mapping[NodeId, NodePerformanceProfile] = {
|
||||
**state.node_profiles,
|
||||
event.node_id: event.node_profile,
|
||||
def _reconstruct_profile(
    node_id: NodeId,
    state: State,
    *,
    identity: NodeIdentity | None = None,
    memory: MemoryPerformanceProfile | None = None,
    system: SystemPerformanceProfile | None = None,
    network_interfaces: list[NetworkInterfaceInfo] | None = None,
) -> NodePerformanceProfile:
    """Reconstruct a NodePerformanceProfile from split state storage.

    Uses provided overrides, falling back to state values.
    """
    ident = identity or state.node_identities.get(node_id)
    mem = memory or state.node_memories.get(node_id)
    sys = system or state.node_systems.get(node_id)
    nets = (
        network_interfaces
        if network_interfaces is not None
        else state.node_networks.get(node_id, [])
    )

    return NodePerformanceProfile(
        model_id=ident.model_id if ident else None,
        chip_id=ident.chip_id if ident else None,
        friendly_name=ident.friendly_name if ident else None,
        memory=mem,
        network_interfaces=nets,
        system=sys,
    )
|
||||
|
||||
|
||||
def apply_node_identity_measured(event: NodeIdentityMeasured, state: State) -> State:
|
||||
topology = copy.copy(state.topology)
|
||||
|
||||
identity = NodeIdentity(
|
||||
model_id=event.model_id,
|
||||
chip_id=event.chip_id,
|
||||
friendly_name=event.friendly_name,
|
||||
)
|
||||
new_identities: Mapping[NodeId, NodeIdentity] = {
|
||||
**state.node_identities,
|
||||
event.node_id: identity,
|
||||
}
|
||||
last_seen: Mapping[NodeId, datetime] = {
|
||||
**state.last_seen,
|
||||
event.node_id: datetime.fromisoformat(event.when),
|
||||
}
|
||||
state = state.model_copy(update={"node_profiles": new_profiles})
|
||||
topology = copy.copy(state.topology)
|
||||
# TODO: NodeCreated
|
||||
if not topology.contains_node(event.node_id):
|
||||
topology.add_node(NodeInfo(node_id=event.node_id))
|
||||
topology.update_node_profile(event.node_id, event.node_profile)
|
||||
reconstructed = _reconstruct_profile(event.node_id, state, identity=identity)
|
||||
topology.update_node_profile(event.node_id, reconstructed)
|
||||
return state.model_copy(
|
||||
update={
|
||||
"node_profiles": new_profiles,
|
||||
"node_identities": new_identities,
|
||||
"topology": topology,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def apply_node_system_measured(event: NodeSystemMeasured, state: State) -> State:
|
||||
topology = copy.copy(state.topology)
|
||||
|
||||
new_systems: Mapping[NodeId, SystemPerformanceProfile] = {
|
||||
**state.node_systems,
|
||||
event.node_id: event.system,
|
||||
}
|
||||
last_seen: Mapping[NodeId, datetime] = {
|
||||
**state.last_seen,
|
||||
event.node_id: datetime.fromisoformat(event.when),
|
||||
}
|
||||
if not topology.contains_node(event.node_id):
|
||||
topology.add_node(NodeInfo(node_id=event.node_id))
|
||||
reconstructed = _reconstruct_profile(event.node_id, state, system=event.system)
|
||||
topology.update_node_profile(event.node_id, reconstructed)
|
||||
return state.model_copy(
|
||||
update={
|
||||
"node_systems": new_systems,
|
||||
"topology": topology,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def apply_node_network_measured(event: NodeNetworkMeasured, state: State) -> State:
|
||||
topology = copy.copy(state.topology)
|
||||
|
||||
new_networks: Mapping[NodeId, list[NetworkInterfaceInfo]] = {
|
||||
**state.node_networks,
|
||||
event.node_id: event.network_interfaces,
|
||||
}
|
||||
last_seen: Mapping[NodeId, datetime] = {
|
||||
**state.last_seen,
|
||||
event.node_id: datetime.fromisoformat(event.when),
|
||||
}
|
||||
if not topology.contains_node(event.node_id):
|
||||
topology.add_node(NodeInfo(node_id=event.node_id))
|
||||
reconstructed = _reconstruct_profile(
|
||||
event.node_id, state, network_interfaces=event.network_interfaces
|
||||
)
|
||||
topology.update_node_profile(event.node_id, reconstructed)
|
||||
return state.model_copy(
|
||||
update={
|
||||
"node_networks": new_networks,
|
||||
"topology": topology,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
@@ -232,57 +343,26 @@ def apply_node_performance_measured(
|
||||
|
||||
|
||||
def apply_node_memory_measured(event: NodeMemoryMeasured, state: State) -> State:
|
||||
existing = state.node_profiles.get(event.node_id)
|
||||
topology = copy.copy(state.topology)
|
||||
|
||||
if existing is None:
|
||||
created = NodePerformanceProfile(
|
||||
model_id="unknown",
|
||||
chip_id="unknown",
|
||||
friendly_name="Unknown",
|
||||
memory=event.memory,
|
||||
network_interfaces=[],
|
||||
system=SystemPerformanceProfile(
|
||||
# TODO: flops_fp16=0.0,
|
||||
gpu_usage=0.0,
|
||||
temp=0.0,
|
||||
sys_power=0.0,
|
||||
pcpu_usage=0.0,
|
||||
ecpu_usage=0.0,
|
||||
ane_power=0.0,
|
||||
),
|
||||
)
|
||||
created_profiles: Mapping[NodeId, NodePerformanceProfile] = {
|
||||
**state.node_profiles,
|
||||
event.node_id: created,
|
||||
}
|
||||
last_seen: Mapping[NodeId, datetime] = {
|
||||
**state.last_seen,
|
||||
event.node_id: datetime.fromisoformat(event.when),
|
||||
}
|
||||
if not topology.contains_node(event.node_id):
|
||||
topology.add_node(NodeInfo(node_id=event.node_id))
|
||||
# TODO: NodeCreated
|
||||
topology.update_node_profile(event.node_id, created)
|
||||
return state.model_copy(
|
||||
update={
|
||||
"node_profiles": created_profiles,
|
||||
"topology": topology,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
updated = existing.model_copy(update={"memory": event.memory})
|
||||
updated_profiles: Mapping[NodeId, NodePerformanceProfile] = {
|
||||
**state.node_profiles,
|
||||
event.node_id: updated,
|
||||
new_memories: Mapping[NodeId, MemoryPerformanceProfile] = {
|
||||
**state.node_memories,
|
||||
event.node_id: event.memory,
|
||||
}
|
||||
last_seen: Mapping[NodeId, datetime] = {
|
||||
**state.last_seen,
|
||||
event.node_id: datetime.fromisoformat(event.when),
|
||||
}
|
||||
# TODO: NodeCreated
|
||||
if not topology.contains_node(event.node_id):
|
||||
topology.add_node(NodeInfo(node_id=event.node_id))
|
||||
topology.update_node_profile(event.node_id, updated)
|
||||
reconstructed = _reconstruct_profile(event.node_id, state, memory=event.memory)
|
||||
topology.update_node_profile(event.node_id, reconstructed)
|
||||
return state.model_copy(
|
||||
update={"node_profiles": updated_profiles, "topology": topology}
|
||||
update={
|
||||
"node_memories": new_memories,
|
||||
"topology": topology,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -157,13 +157,10 @@ class ChatCompletionTaskParams(BaseModel):
|
||||
stream: bool = False
|
||||
temperature: float | None = None
|
||||
top_p: float | None = None
|
||||
top_k: int | None = None
|
||||
tools: list[dict[str, Any]] | None = None
|
||||
tool_choice: str | dict[str, Any] | None = None
|
||||
parallel_tool_calls: bool | None = None
|
||||
user: str | None = None
|
||||
# When True, continue the last assistant message without EOS tokens
|
||||
continue_from_prefix: bool = False
|
||||
|
||||
|
||||
class BenchChatCompletionTaskParams(ChatCompletionTaskParams):
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from enum import Enum
|
||||
|
||||
from exo.shared.types.api import GenerationStats, TopLogprobItem
|
||||
from exo.shared.types.api import GenerationStats
|
||||
from exo.utils.pydantic_ext import TaggedModel
|
||||
|
||||
from .api import FinishReason
|
||||
@@ -20,8 +20,6 @@ class BaseChunk(TaggedModel):
|
||||
class TokenChunk(BaseChunk):
|
||||
text: str
|
||||
token_id: int
|
||||
logprob: float | None = None # Log probability of the selected token
|
||||
top_logprobs: list[TopLogprobItem] | None = None # Top-k alternative tokens
|
||||
finish_reason: FinishReason | None = None
|
||||
stats: GenerationStats | None = None
|
||||
error_message: str | None = None
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
"""Claude Messages API types for request/response conversion."""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Type aliases
|
||||
ClaudeRole = Literal["user", "assistant"]
|
||||
ClaudeStopReason = Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]
|
||||
|
||||
|
||||
# Content block types
|
||||
class ClaudeTextBlock(BaseModel, frozen=True):
|
||||
"""Text content block in Claude Messages API."""
|
||||
|
||||
type: Literal["text"] = "text"
|
||||
text: str
|
||||
|
||||
|
||||
class ClaudeImageSource(BaseModel, frozen=True):
|
||||
"""Image source for Claude image blocks."""
|
||||
|
||||
type: Literal["base64", "url"]
|
||||
media_type: str | None = None
|
||||
data: str | None = None
|
||||
url: str | None = None
|
||||
|
||||
|
||||
class ClaudeImageBlock(BaseModel, frozen=True):
|
||||
"""Image content block in Claude Messages API."""
|
||||
|
||||
type: Literal["image"] = "image"
|
||||
source: ClaudeImageSource
|
||||
|
||||
|
||||
ClaudeContentBlock = ClaudeTextBlock | ClaudeImageBlock
|
||||
|
||||
|
||||
# Request types
|
||||
class ClaudeMessage(BaseModel, frozen=True):
|
||||
"""Message in Claude Messages API request."""
|
||||
|
||||
role: ClaudeRole
|
||||
content: str | list[ClaudeContentBlock]
|
||||
|
||||
|
||||
class ClaudeMessagesRequest(BaseModel):
|
||||
"""Request body for Claude Messages API."""
|
||||
|
||||
model: str
|
||||
max_tokens: int
|
||||
messages: list[ClaudeMessage]
|
||||
system: str | list[ClaudeTextBlock] | None = None
|
||||
stop_sequences: list[str] | None = None
|
||||
stream: bool = False
|
||||
temperature: float | None = None
|
||||
top_p: float | None = None
|
||||
top_k: int | None = None
|
||||
metadata: dict[str, str] | None = None
|
||||
|
||||
|
||||
# Response types
|
||||
class ClaudeUsage(BaseModel, frozen=True):
|
||||
"""Token usage in Claude Messages API response."""
|
||||
|
||||
input_tokens: int
|
||||
output_tokens: int
|
||||
|
||||
|
||||
class ClaudeMessagesResponse(BaseModel, frozen=True):
|
||||
"""Response body for Claude Messages API."""
|
||||
|
||||
id: str
|
||||
type: Literal["message"] = "message"
|
||||
role: Literal["assistant"] = "assistant"
|
||||
content: list[ClaudeTextBlock]
|
||||
model: str
|
||||
stop_reason: ClaudeStopReason | None = None
|
||||
stop_sequence: str | None = None
|
||||
usage: ClaudeUsage
|
||||
|
||||
|
||||
# Streaming event types
|
||||
class ClaudeMessageStart(BaseModel, frozen=True):
|
||||
"""Partial message in message_start event."""
|
||||
|
||||
id: str
|
||||
type: Literal["message"] = "message"
|
||||
role: Literal["assistant"] = "assistant"
|
||||
content: list[ClaudeTextBlock] = Field(default_factory=list)
|
||||
model: str
|
||||
stop_reason: ClaudeStopReason | None = None
|
||||
stop_sequence: str | None = None
|
||||
usage: ClaudeUsage
|
||||
|
||||
|
||||
class ClaudeMessageStartEvent(BaseModel, frozen=True):
|
||||
"""Event sent at start of message stream."""
|
||||
|
||||
type: Literal["message_start"] = "message_start"
|
||||
message: ClaudeMessageStart
|
||||
|
||||
|
||||
class ClaudeContentBlockStartEvent(BaseModel, frozen=True):
|
||||
"""Event sent at start of a content block."""
|
||||
|
||||
type: Literal["content_block_start"] = "content_block_start"
|
||||
index: int
|
||||
content_block: ClaudeTextBlock
|
||||
|
||||
|
||||
class ClaudeTextDelta(BaseModel, frozen=True):
|
||||
"""Delta for text content block."""
|
||||
|
||||
type: Literal["text_delta"] = "text_delta"
|
||||
text: str
|
||||
|
||||
|
||||
class ClaudeContentBlockDeltaEvent(BaseModel, frozen=True):
|
||||
"""Event sent for content block delta."""
|
||||
|
||||
type: Literal["content_block_delta"] = "content_block_delta"
|
||||
index: int
|
||||
delta: ClaudeTextDelta
|
||||
|
||||
|
||||
class ClaudeContentBlockStopEvent(BaseModel, frozen=True):
|
||||
"""Event sent at end of a content block."""
|
||||
|
||||
type: Literal["content_block_stop"] = "content_block_stop"
|
||||
index: int
|
||||
|
||||
|
||||
class ClaudeMessageDeltaUsage(BaseModel, frozen=True):
|
||||
"""Usage in message_delta event."""
|
||||
|
||||
output_tokens: int
|
||||
|
||||
|
||||
class ClaudeMessageDelta(BaseModel, frozen=True):
|
||||
"""Delta in message_delta event."""
|
||||
|
||||
stop_reason: ClaudeStopReason | None = None
|
||||
stop_sequence: str | None = None
|
||||
|
||||
|
||||
class ClaudeMessageDeltaEvent(BaseModel, frozen=True):
|
||||
"""Event sent with final message delta."""
|
||||
|
||||
type: Literal["message_delta"] = "message_delta"
|
||||
delta: ClaudeMessageDelta
|
||||
usage: ClaudeMessageDeltaUsage
|
||||
|
||||
|
||||
class ClaudeMessageStopEvent(BaseModel, frozen=True):
|
||||
"""Event sent at end of message stream."""
|
||||
|
||||
type: Literal["message_stop"] = "message_stop"
|
||||
|
||||
|
||||
ClaudeStreamEvent = (
|
||||
ClaudeMessageStartEvent
|
||||
| ClaudeContentBlockStartEvent
|
||||
| ClaudeContentBlockDeltaEvent
|
||||
| ClaudeContentBlockStopEvent
|
||||
| ClaudeMessageDeltaEvent
|
||||
| ClaudeMessageStopEvent
|
||||
)
|
||||
@@ -1,8 +1,8 @@
from pydantic import Field

from exo.shared.types.api import ChatCompletionTaskParams
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.models import ModelMetadata
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding
from exo.utils.pydantic_ext import CamelCaseModel, TaggedModel
@@ -17,7 +17,7 @@ class TestCommand(BaseCommand):


class ChatCompletion(BaseCommand):
    request_params: ResponsesRequest
    request_params: ChatCompletionTaskParams


class PlaceInstance(BaseCommand):
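With this change the `ChatCompletion` command carries the Chat Completions request directly again. A construction sketch, mirroring the request used in the master test elsewhere in this diff (values are illustrative only):

```python
# Illustrative only: building the command with the restored request type.
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
from exo.shared.types.commands import ChatCompletion

command = ChatCompletion(
    request_params=ChatCompletionTaskParams(
        model="llama-3.2-1b",
        messages=[ChatCompletionMessage(role="user", content="Hello, how are you?")],
    )
)
```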
|
||||
|
||||
@@ -2,10 +2,14 @@ from datetime import datetime

from pydantic import Field

from exo.shared.topology import Connection, NodePerformanceProfile
from exo.shared.topology import Connection
from exo.shared.types.chunks import GenerationChunk
from exo.shared.types.common import CommandId, Id, NodeId, SessionId
from exo.shared.types.profiling import MemoryPerformanceProfile
from exo.shared.types.profiling import (
    MemoryPerformanceProfile,
    NetworkInterfaceInfo,
    SystemPerformanceProfile,
)
from exo.shared.types.tasks import Task, TaskId, TaskStatus
from exo.shared.types.worker.downloads import DownloadProgress
from exo.shared.types.worker.instances import Instance, InstanceId
@@ -85,13 +89,35 @@ class NodeTimedOut(BaseEvent):
    node_id: NodeId


class NodePerformanceMeasured(BaseEvent):
class NodeIdentityMeasured(BaseEvent):
    """Static identity info - emitted once at startup."""

    node_id: NodeId
    when: str  # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
    node_profile: NodePerformanceProfile
    model_id: str
    chip_id: str
    friendly_name: str


class NodeSystemMeasured(BaseEvent):
    """Dynamic system metrics (GPU, temp, power) - emitted at 1s intervals."""

    node_id: NodeId
    when: str  # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
    system: SystemPerformanceProfile


class NodeNetworkMeasured(BaseEvent):
    """Semi-static network interface info - emitted at 30s intervals."""

    node_id: NodeId
    when: str  # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
    network_interfaces: list[NetworkInterfaceInfo]


class NodeMemoryMeasured(BaseEvent):
    """Dynamic memory metrics - emitted at 0.5s intervals."""

    node_id: NodeId
    when: str  # this is a manually cast datetime overrode by the master when the event is indexed, rather than the local time on the device
    memory: MemoryPerformanceProfile
@@ -127,7 +153,9 @@ Event = (
    | RunnerDeleted
    | NodeCreated
    | NodeTimedOut
    | NodePerformanceMeasured
    | NodeIdentityMeasured
    | NodeSystemMeasured
    | NodeNetworkMeasured
    | NodeMemoryMeasured
    | NodeDownloadProgress
    | ChunkGenerated
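The docstrings above give each measurement event its own cadence: identity once at startup, system metrics every 1s, network interfaces every 30s, memory every 0.5s. A rough emission loop consistent with that split is sketched below; the `measure_*` helpers and the `emit` callback are illustrative assumptions, not code from this diff.

```python
# Sketch only: emit the split measurement events on the cadences described
# in the docstrings above. The measure_* helpers and emit() are assumptions.
import anyio


async def emit_measurements(node_id, emit, measure_system, measure_network, measure_memory):
    async def every(interval: float, build_event) -> None:
        while True:
            await emit(build_event(node_id))
            await anyio.sleep(interval)

    async with anyio.create_task_group() as tg:
        tg.start_soon(every, 1.0, measure_system)    # GPU / temp / power
        tg.start_soon(every, 30.0, measure_network)  # network interfaces
        tg.start_soon(every, 0.5, measure_memory)    # RAM / swap
```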

@@ -1,190 +0,0 @@
"""OpenAI Responses API types for request/response conversion.

ResponsesRequest serves as both:
1. The external API request type for /v1/responses
2. The canonical internal type used throughout the inference pipeline

All external API formats (Chat Completions, Claude) are converted to
ResponsesRequest at the API boundary.
"""

import time
from typing import Any, Literal

from pydantic import BaseModel, Field

# Type aliases
ResponseStatus = Literal["completed", "failed", "in_progress", "incomplete"]
ResponseRole = Literal["user", "assistant", "system", "developer"]


# Request types
class ResponseInputMessage(BaseModel, frozen=True):
    """Input message for Responses API.

    This is also used as the internal message format throughout the pipeline.
    """

    role: ResponseRole
    content: str


class ResponsesRequest(BaseModel):
    """Request body for OpenAI Responses API.

    This is also the canonical internal task params format used throughout
    the inference pipeline. All external API formats are converted to this
    format at the API boundary.

    Field mapping from other APIs:
    - input: Replaces 'messages' from Chat Completions
    - instructions: System message, extracted from messages or Claude's 'system'
    - max_output_tokens: Replaces 'max_tokens' from Chat Completions
    """

    model: str
    input: str | list[ResponseInputMessage]
    instructions: str | None = None
    max_output_tokens: int | None = None
    temperature: float | None = None
    top_p: float | None = None
    top_k: int | None = None
    stop: str | list[str] | None = None
    seed: int | None = None
    stream: bool = False
    # Tools support
    tools: list[dict[str, Any]] | None = None
    # previous_response_id not supported in MVP
    metadata: dict[str, str] | None = None
    # When True, continue the last assistant message without EOS tokens
    continue_from_prefix: bool = False
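
# Illustrative sketch (assumed example, not from the repo): the field mapping the
# docstring above describes, applied to a Chat Completions-style payload. The
# payload contents and model name are made up for illustration.
_chat_payload = {
    "model": "example-model",
    "messages": [
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Hello"},
    ],
    "max_tokens": 64,
}
_converted = ResponsesRequest(
    model=_chat_payload["model"],
    # 'instructions' takes the system message; everything else becomes 'input'
    instructions=next(
        (m["content"] for m in _chat_payload["messages"] if m["role"] == "system"),
        None,
    ),
    input=[
        ResponseInputMessage(role=m["role"], content=m["content"])
        for m in _chat_payload["messages"]
        if m["role"] != "system"
    ],
    # 'max_output_tokens' replaces Chat Completions' 'max_tokens'
    max_output_tokens=_chat_payload["max_tokens"],
)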


# Response types
class ResponseOutputText(BaseModel, frozen=True):
    """Text content in response output."""

    type: Literal["output_text"] = "output_text"
    text: str
    annotations: list[dict[str, str]] = Field(default_factory=list)


class ResponseMessageItem(BaseModel, frozen=True):
    """Message item in response output array."""

    type: Literal["message"] = "message"
    id: str
    role: Literal["assistant"] = "assistant"
    content: list[ResponseOutputText]
    status: ResponseStatus = "completed"


ResponseItem = ResponseMessageItem  # Can expand for function_call, reasoning, etc.


class ResponseUsage(BaseModel, frozen=True):
    """Token usage in Responses API response."""

    input_tokens: int
    output_tokens: int
    total_tokens: int


class ResponsesResponse(BaseModel, frozen=True):
    """Response body for OpenAI Responses API."""

    id: str
    object: Literal["response"] = "response"
    created_at: int = Field(default_factory=lambda: int(time.time()))
    status: ResponseStatus = "completed"
    model: str
    output: list[ResponseItem]
    output_text: str
    usage: ResponseUsage | None = None


# Streaming event types
class ResponseCreatedEvent(BaseModel, frozen=True):
    """Event sent when response is created."""

    type: Literal["response.created"] = "response.created"
    response: ResponsesResponse


class ResponseInProgressEvent(BaseModel, frozen=True):
    """Event sent when response starts processing."""

    type: Literal["response.in_progress"] = "response.in_progress"
    response: ResponsesResponse


class ResponseOutputItemAddedEvent(BaseModel, frozen=True):
    """Event sent when an output item is added."""

    type: Literal["response.output_item.added"] = "response.output_item.added"
    output_index: int
    item: ResponseItem


class ResponseContentPartAddedEvent(BaseModel, frozen=True):
    """Event sent when a content part is added."""

    type: Literal["response.content_part.added"] = "response.content_part.added"
    output_index: int
    content_index: int
    part: ResponseOutputText


class ResponseTextDeltaEvent(BaseModel, frozen=True):
    """Event sent for text delta during streaming."""

    type: Literal["response.output_text.delta"] = "response.output_text.delta"
    output_index: int
    content_index: int
    delta: str


class ResponseTextDoneEvent(BaseModel, frozen=True):
    """Event sent when text content is done."""

    type: Literal["response.output_text.done"] = "response.output_text.done"
    output_index: int
    content_index: int
    text: str


class ResponseContentPartDoneEvent(BaseModel, frozen=True):
    """Event sent when a content part is done."""

    type: Literal["response.content_part.done"] = "response.content_part.done"
    output_index: int
    content_index: int
    part: ResponseOutputText


class ResponseOutputItemDoneEvent(BaseModel, frozen=True):
    """Event sent when an output item is done."""

    type: Literal["response.output_item.done"] = "response.output_item.done"
    output_index: int
    item: ResponseItem


class ResponseCompletedEvent(BaseModel, frozen=True):
    """Event sent when response is completed."""

    type: Literal["response.completed"] = "response.completed"
    response: ResponsesResponse


ResponsesStreamEvent = (
    ResponseCreatedEvent
    | ResponseInProgressEvent
    | ResponseOutputItemAddedEvent
    | ResponseContentPartAddedEvent
    | ResponseTextDeltaEvent
    | ResponseTextDoneEvent
    | ResponseContentPartDoneEvent
    | ResponseOutputItemDoneEvent
    | ResponseCompletedEvent
)
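
# Illustrative sketch (assumed helper, not from the repo): one way a client could
# consume the ResponsesStreamEvent union above, accumulating text deltas and
# returning the final text once the completed event arrives.
def collect_stream_text(events: list[ResponsesStreamEvent]) -> str:
    chunks: list[str] = []
    for event in events:
        if isinstance(event, ResponseTextDeltaEvent):
            chunks.append(event.delta)
        elif isinstance(event, ResponseCompletedEvent):
            # The completed event carries the full response, including output_text.
            return event.response.output_text
    return "".join(chunks)
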
@@ -52,13 +52,21 @@ class NetworkInterfaceInfo(CamelCaseModel):
    ip_address: str


class NodePerformanceProfile(CamelCaseModel):
class NodeIdentity(CamelCaseModel):
    """Static identity info for a node."""

    model_id: str
    chip_id: str
    friendly_name: str
    memory: MemoryPerformanceProfile


class NodePerformanceProfile(CamelCaseModel):
    model_id: str | None = None
    chip_id: str | None = None
    friendly_name: str | None = None
    memory: MemoryPerformanceProfile | None = None
    network_interfaces: list[NetworkInterfaceInfo] = []
    system: SystemPerformanceProfile
    system: SystemPerformanceProfile | None = None


class ConnectionProfile(CamelCaseModel):

@@ -7,7 +7,12 @@ from pydantic.alias_generators import to_camel

from exo.shared.topology import Topology, TopologySnapshot
from exo.shared.types.common import NodeId
from exo.shared.types.profiling import NodePerformanceProfile
from exo.shared.types.profiling import (
    MemoryPerformanceProfile,
    NetworkInterfaceInfo,
    NodeIdentity,
    SystemPerformanceProfile,
)
from exo.shared.types.tasks import Task, TaskId
from exo.shared.types.worker.downloads import DownloadProgress
from exo.shared.types.worker.instances import Instance, InstanceId
@@ -35,7 +40,10 @@ class State(CamelCaseModel):
    runners: Mapping[RunnerId, RunnerStatus] = {}
    downloads: Mapping[NodeId, Sequence[DownloadProgress]] = {}
    tasks: Mapping[TaskId, Task] = {}
    node_profiles: Mapping[NodeId, NodePerformanceProfile] = {}
    node_identities: Mapping[NodeId, NodeIdentity] = {}
    node_memories: Mapping[NodeId, MemoryPerformanceProfile] = {}
    node_systems: Mapping[NodeId, SystemPerformanceProfile] = {}
    node_networks: Mapping[NodeId, list[NetworkInterfaceInfo]] = {}
    last_seen: Mapping[NodeId, datetime] = {}
    topology: Topology = Field(default_factory=Topology)
    last_event_applied_idx: int = Field(default=-1, ge=-1)

@@ -2,8 +2,8 @@ from enum import Enum

from pydantic import Field

from exo.shared.types.api import ChatCompletionTaskParams
from exo.shared.types.common import CommandId, Id
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.worker.instances import BoundInstance, InstanceId
from exo.shared.types.worker.runners import RunnerId
from exo.shared.types.worker.shards import ShardMetadata
@@ -50,7 +50,7 @@ class StartWarmup(BaseTask): # emitted by Worker

class ChatCompletion(BaseTask): # emitted by Master
    command_id: CommandId
    task_params: ResponsesRequest
    task_params: ChatCompletionTaskParams

    error_type: str | None = Field(default=None)
    error_message: str | None = Field(default=None)

@@ -1,4 +1,4 @@
from exo.shared.types.api import FinishReason, GenerationStats, TopLogprobItem
from exo.shared.types.api import FinishReason, GenerationStats
from exo.utils.pydantic_ext import TaggedModel


@@ -13,8 +13,7 @@ class TokenizedResponse(BaseRunnerResponse):
class GenerationResponse(BaseRunnerResponse):
    text: str
    token: int
    logprob: float | None = None  # Log probability of the selected token
    top_logprobs: list[TopLogprobItem] | None = None  # Top-k alternative tokens
    # logprobs: list[float] | None = None # too big. we can change to be top-k
    finish_reason: FinishReason | None = None
    stats: GenerationStats | None = None


@@ -40,6 +40,4 @@ class TokenizerWrapper:
        messages_dicts: list[dict[str, Any]],
        tokenize: bool = False,
        add_generation_prompt: bool = True,
        continue_final_message: bool = False,
        tools: list[dict[str, Any]] | None = None,
    ) -> str: ...

@@ -8,12 +8,13 @@ from mlx_lm.tokenizer_utils import TokenizerWrapper

# from exo.engines.mlx.cache import KVPrefixCache
from exo.shared.types.api import (
    BenchChatCompletionTaskParams,
    ChatCompletionMessage,
    FinishReason,
    GenerationStats,
    TopLogprobItem,
)
from exo.shared.types.memory import Memory
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.worker.runner_response import (
    GenerationResponse,
)
@@ -52,9 +53,14 @@ def warmup_inference(

    warmup_prompt = apply_chat_template(
        tokenizer=tokenizer,
        task_params=ResponsesRequest(
        chat_task_data=ChatCompletionTaskParams(
            model="",
            input=content,
            messages=[
                ChatCompletionMessage(
                    role="user",
                    content=content,
                )
            ],
        ),
    )

@@ -112,11 +118,11 @@ def eos_ids_from_tokenizer(tokenizer: TokenizerWrapper) -> list[int]:
def mlx_generate(
    model: Model,
    tokenizer: TokenizerWrapper,
    task: ResponsesRequest,
    is_bench: bool = False,
    task: ChatCompletionTaskParams,
) -> Generator[GenerationResponse]:
    # Ensure that generation stats only contains peak memory for this generation
    mx.reset_peak_memory()
    is_bench: bool = isinstance(task, BenchChatCompletionTaskParams)

    # Currently we support chat-completion tasks only.
    logger.info(f"task_params: {task}")
@@ -126,7 +132,7 @@ def mlx_generate(

    prompt = apply_chat_template(
        tokenizer=tokenizer,
        task_params=task,
        chat_task_data=task,
    )

    caches = make_kv_cache(model=model)
@@ -140,20 +146,9 @@ def mlx_generate(
    sampler = make_sampler(
        temp=task.temperature if task.temperature is not None else 0.7,
        top_p=task.top_p if task.top_p is not None else 1.0,
        top_k=task.top_k if task.top_k is not None else 0,
    )

    # Normalize stop sequences to a list
    stop_sequences: list[str] = (
        ([task.stop] if isinstance(task.stop, str) else task.stop)
        if task.stop is not None
        else []
    )
    max_stop_len = max((len(s) for s in stop_sequences), default=0)

    max_tokens = task.max_output_tokens or MAX_TOKENS
    accumulated_text = ""

    max_tokens = task.max_tokens or MAX_TOKENS
    for out in stream_generate(
        model=model,
        tokenizer=tokenizer,
@@ -166,34 +161,11 @@ def mlx_generate(
        prefill_step_size=2048,
        kv_group_size=KV_GROUP_SIZE,
        kv_bits=KV_BITS,
        return_logprob=True,
        return_top_logprobs=5,
    ):
        logger.info(out.text)
        accumulated_text += out.text

        # Check for stop sequences
        text = out.text
        finish_reason: FinishReason | None = cast(
            FinishReason | None, out.finish_reason
        )
        stop_matched = False

        if stop_sequences:
            for stop_seq in stop_sequences:
                if stop_seq in accumulated_text:
                    # Trim text to just before the stop sequence
                    stop_index = accumulated_text.find(stop_seq)
                    text_before_stop = accumulated_text[:stop_index]
                    chunk_start = len(accumulated_text) - len(out.text)
                    text = text_before_stop[chunk_start:]
                    finish_reason = "stop"
                    stop_matched = True
                    break

        is_done = finish_reason is not None
        stats: GenerationStats | None = None
        if is_done:
        if out.finish_reason is not None:
            stats = GenerationStats(
                prompt_tps=float(out.prompt_tps),
                generation_tps=float(out.generation_tps),
@@ -201,41 +173,22 @@ def mlx_generate(
                generation_tokens=int(out.generation_tokens),
                peak_memory_usage=Memory.from_gb(out.peak_memory),
            )
            if not stop_matched and out.finish_reason not in get_args(FinishReason):

            if out.finish_reason not in get_args(FinishReason):
                # We don't throw here as this failure case is really not all that bad
                # Just log the error and move on
                logger.warning(
                    f"Model generated unexpected finish_reason: {out.finish_reason}"
                )

        # Extract logprobs if available
        logprob: float | None = getattr(out, "logprob", None)
        top_logprobs_raw: list[tuple[int, float]] | None = getattr(
            out, "top_logprobs", None
        )

        top_logprobs: list[TopLogprobItem] | None = None
        if top_logprobs_raw is not None:
            top_logprobs = [
                TopLogprobItem(
                    token=text if i == 0 else tokenizer.decode([tok_id]),
                    logprob=float(lp),
                )
                for i, (tok_id, lp) in enumerate(top_logprobs_raw)
            ]

        yield GenerationResponse(
            text=text,
            text=out.text,
            token=out.token,
            logprob=logprob,
            top_logprobs=top_logprobs,
            finish_reason=finish_reason,
            finish_reason=cast(FinishReason | None, out.finish_reason),
            stats=stats,
        )

        if is_done:
        if out.finish_reason is not None:
            break

        # Limit accumulated_text to what's needed for stop sequence detection
        if max_stop_len > 0 and len(accumulated_text) > max_stop_len:
            accumulated_text = accumulated_text[-max_stop_len:]

    # TODO: Do we want an mx_barrier?

@@ -42,9 +42,10 @@ import mlx.nn as nn
from mlx_lm.utils import load_model
from pydantic import RootModel

from exo.shared.types.api import ChatCompletionMessageText
from exo.shared.types.common import Host
from exo.shared.types.memory import Memory
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.worker.instances import (
    BoundInstance,
    MlxJacclInstance,
@@ -389,54 +390,36 @@ def load_tokenizer_for_model_id(model_id: str, model_path: Path) -> TokenizerWra

def apply_chat_template(
    tokenizer: TokenizerWrapper,
    task_params: ResponsesRequest,
    chat_task_data: ChatCompletionTaskParams,
) -> str:
    """Convert ResponsesRequest to a chat template prompt.
    # Now we can properly access the messages
    messages = chat_task_data.messages

    Converts the internal format (input + instructions) to a messages list
    that can be processed by the tokenizer's chat template.
    """
    formatted_messages: list[dict[str, Any]] = []

    # Add system message (instructions) if present
    if task_params.instructions:
        formatted_messages.append(
            {"role": "system", "content": task_params.instructions}
        )

    # Convert input to messages
    if isinstance(task_params.input, str):
        # Simple string input becomes a single user message
        formatted_messages.append({"role": "user", "content": task_params.input})
    else:
        # List of InputMessage
        for msg in task_params.input:
            if not msg.content:
                logger.warning("Received message with empty content, skipping")
    for message in messages:
        if isinstance(message.content, ChatCompletionMessageText):
            message.content = message.content.text
        if isinstance(message.content, list):
            if len(message.content) == 0:
                logger.warning("Received prompt with no content, skipping")
                continue
            formatted_messages.append({"role": msg.role, "content": msg.content})

    # Use continue_final_message when continuing from prefix (e.g., regenerate from token)
    # This keeps the final assistant message open without EOS tokens
    # Note: explicitly set add_generation_prompt=False when using continue_final_message
    # because some tokenizers (e.g., Kimi) default add_generation_prompt=True
    prompt: str
    if task_params.continue_from_prefix:
        prompt = tokenizer.apply_chat_template(
            formatted_messages,
            tokenize=False,
            continue_final_message=True,
            add_generation_prompt=False,
            tools=task_params.tools,
        )
    else:
        prompt = tokenizer.apply_chat_template(
            formatted_messages,
            tokenize=False,
            add_generation_prompt=True,
            tools=task_params.tools,
            message.content = "\n".join(c.text for c in message.content).strip()
        if message.content is None and message.thinking is None:
            continue

        # Null values are not valid when applying templates in tokenizer
        formatted_messages.append(
            {k: v for k, v in message.model_dump().items() if v is not None} # type: ignore
        )

    prompt: str = tokenizer.apply_chat_template(
        formatted_messages,
        tokenize=False,
        add_generation_prompt=True,
        tools=chat_task_data.tools,
    )

    logger.info(prompt)

    return prompt
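
# Illustrative sketch (assumed inputs, not from the repo): the continue_final_message
# technique described in the comments above, shown directly against a loaded
# tokenizer. With continue_final_message=True the chat template leaves the trailing
# assistant message open (no EOS, no new generation prompt), so decoding resumes
# mid-message; add_generation_prompt is forced to False because some templates
# default it to True.
def build_resume_prompt(tokenizer: TokenizerWrapper, prefix: str) -> str:
    return tokenizer.apply_chat_template(
        [
            {"role": "user", "content": "Write a haiku about the sea"},
            {"role": "assistant", "content": prefix},  # partial answer to continue
        ],
        tokenize=False,
        continue_final_message=True,
        add_generation_prompt=False,
    )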

@@ -16,8 +16,10 @@ from exo.shared.types.events import (
    ForwarderEvent,
    IndexedEvent,
    NodeDownloadProgress,
    NodeIdentityMeasured,
    NodeMemoryMeasured,
    NodePerformanceMeasured,
    NodeNetworkMeasured,
    NodeSystemMeasured,
    TaskCreated,
    TaskStatusUpdated,
    TopologyEdgeCreated,
@@ -25,7 +27,11 @@ from exo.shared.types.events import (
)
from exo.shared.types.models import ModelId
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.profiling import MemoryPerformanceProfile, NodePerformanceProfile
from exo.shared.types.profiling import (
    MemoryPerformanceProfile,
    NetworkInterfaceInfo,
    SystemPerformanceProfile,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import (
    CreateRunner,
@@ -51,7 +57,13 @@ from exo.worker.download.download_utils import (
from exo.worker.download.shard_downloader import RepoDownloadProgress, ShardDownloader
from exo.worker.plan import plan
from exo.worker.runner.runner_supervisor import RunnerSupervisor
from exo.worker.utils import start_polling_memory_metrics, start_polling_node_metrics
from exo.worker.utils import (
    IdentityMetrics,
    start_polling_identity_metrics,
    start_polling_memory_metrics,
    start_polling_network_metrics,
    start_polling_system_metrics,
)
from exo.worker.utils.net_profile import check_reachable


@@ -98,37 +110,51 @@ class Worker:
    async def run(self):
        logger.info("Starting Worker")

        # TODO: CLEANUP HEADER
        async def resource_monitor_callback(
            node_performance_profile: NodePerformanceProfile,
        ) -> None:
        async def identity_callback(identity: IdentityMetrics) -> None:
            await self.event_sender.send(
                NodePerformanceMeasured(
                NodeIdentityMeasured(
                    node_id=self.node_id,
                    node_profile=node_performance_profile,
                    model_id=identity.model_id,
                    chip_id=identity.chip_id,
                    friendly_name=identity.friendly_name,
                    when=str(datetime.now(tz=timezone.utc)),
                ),
            )

        async def memory_monitor_callback(
            memory_profile: MemoryPerformanceProfile,
        ) -> None:
        async def system_callback(system: SystemPerformanceProfile) -> None:
            await self.event_sender.send(
                NodeSystemMeasured(
                    node_id=self.node_id,
                    system=system,
                    when=str(datetime.now(tz=timezone.utc)),
                ),
            )

        async def network_callback(interfaces: list[NetworkInterfaceInfo]) -> None:
            await self.event_sender.send(
                NodeNetworkMeasured(
                    node_id=self.node_id,
                    network_interfaces=interfaces,
                    when=str(datetime.now(tz=timezone.utc)),
                ),
            )

        async def memory_callback(memory: MemoryPerformanceProfile) -> None:
            await self.event_sender.send(
                NodeMemoryMeasured(
                    node_id=self.node_id,
                    memory=memory_profile,
                    memory=memory,
                    when=str(datetime.now(tz=timezone.utc)),
                )
            )

        # END CLEANUP

        async with create_task_group() as tg:
            self._tg = tg
            tg.start_soon(self.plan_step)
            tg.start_soon(start_polling_node_metrics, resource_monitor_callback)

            tg.start_soon(start_polling_memory_metrics, memory_monitor_callback)
            tg.start_soon(start_polling_identity_metrics, identity_callback)
            tg.start_soon(start_polling_system_metrics, system_callback)
            tg.start_soon(start_polling_network_metrics, network_callback)
            tg.start_soon(start_polling_memory_metrics, memory_callback)
            tg.start_soon(self._emit_existing_download_progress)
            tg.start_soon(self._connection_message_event_writer)
            tg.start_soon(self._resend_out_for_delivery)

@@ -13,6 +13,7 @@ from openai_harmony import ( # pyright: ignore[reportMissingTypeStubs]
    load_harmony_encoding,
)

from exo.shared.types.api import ChatCompletionMessageText
from exo.shared.types.chunks import TokenChunk
from exo.shared.types.common import CommandId
from exo.shared.types.events import (
@@ -23,7 +24,6 @@ from exo.shared.types.events import (
    TaskStatusUpdated,
)
from exo.shared.types.models import ModelId
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.tasks import (
    ChatCompletion,
    ConnectToGroup,
@@ -209,7 +209,8 @@ def main(
):
    assert model
    assert tokenizer
    _check_for_debug_prompts(task_params)
    assert task_params.messages[0].content is not None
    _check_for_debug_prompts(task_params.messages[0].content)

    # Generate responses using the actual MLX generation
    mlx_generator = mlx_generate(
@@ -236,8 +237,6 @@ def main(
                    model=shard_metadata.model_meta.model_id,
                    text=response.text,
                    token_id=response.token,
                    logprob=response.logprob,
                    top_logprobs=response.top_logprobs,
                    finish_reason=response.finish_reason,
                    stats=response.stats,
                ),
@@ -316,23 +315,17 @@ EXO_RUNNER_MUST_OOM = "EXO RUNNER MUST OOM"
EXO_RUNNER_MUST_TIMEOUT = "EXO RUNNER MUST TIMEOUT"


def _check_for_debug_prompts(task_params: ResponsesRequest) -> None:
    """Check for debug prompt triggers in the input.

    Extracts the first user input text and checks for debug triggers.
    """
    prompt: str
    if isinstance(task_params.input, str):
        prompt = task_params.input
    else:
        # List of InputMessage - get first message content
        if len(task_params.input) == 0:
            logger.debug("Empty message list in debug prompt check")
def _check_for_debug_prompts(
    prompt: str | ChatCompletionMessageText | list[ChatCompletionMessageText],
):
    if isinstance(prompt, list):
        if len(prompt) == 0:
            logger.debug("Empty message prompt received in debug prompt")
            return
            prompt = task_params.input[0].content
        prompt = prompt[0]

    if not prompt:
        return
    if isinstance(prompt, ChatCompletionMessageText):
        prompt = prompt.text

    if EXO_RUNNER_MUST_FAIL in prompt:
        logger.info("raising exception")

@@ -1,7 +1,7 @@
from typing import cast

import exo.worker.plan as plan_mod
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.api import ChatCompletionTaskParams
from exo.shared.types.tasks import ChatCompletion, Task, TaskId, TaskStatus
from exo.shared.types.worker.instances import BoundInstance, InstanceId
from exo.shared.types.worker.runners import (
@@ -59,7 +59,7 @@ def test_plan_forwards_pending_chat_completion_when_runner_ready():
        instance_id=INSTANCE_1_ID,
        task_status=TaskStatus.Pending,
        command_id=COMMAND_1_ID,
        task_params=ResponsesRequest(model=MODEL_A_ID, input=""),
        task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
    )

    result = plan_mod.plan(
@@ -107,7 +107,7 @@ def test_plan_does_not_forward_chat_completion_if_any_runner_not_ready():
        instance_id=INSTANCE_1_ID,
        task_status=TaskStatus.Pending,
        command_id=COMMAND_1_ID,
        task_params=ResponsesRequest(model=MODEL_A_ID, input=""),
        task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
    )

    result = plan_mod.plan(
@@ -152,7 +152,7 @@ def test_plan_does_not_forward_tasks_for_other_instances():
        instance_id=other_instance_id,
        task_status=TaskStatus.Pending,
        command_id=COMMAND_1_ID,
        task_params=ResponsesRequest(model=MODEL_A_ID, input=""),
        task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
    )

    result = plan_mod.plan(
@@ -201,7 +201,7 @@ def test_plan_ignores_non_pending_or_non_chat_tasks():
        instance_id=INSTANCE_1_ID,
        task_status=TaskStatus.Complete,
        command_id=COMMAND_1_ID,
        task_params=ResponsesRequest(model=MODEL_A_ID, input=""),
        task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
    )

    other_task_id = TaskId("other-task")

@@ -5,6 +5,7 @@ from typing import Callable
import pytest

import exo.worker.runner.runner as mlx_runner
from exo.shared.types.api import ChatCompletionMessage
from exo.shared.types.chunks import TokenChunk
from exo.shared.types.events import (
    ChunkGenerated,
@@ -13,9 +14,9 @@ from exo.shared.types.events import (
    TaskAcknowledged,
    TaskStatusUpdated,
)
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.tasks import (
    ChatCompletion,
    ChatCompletionTaskParams,
    ConnectToGroup,
    LoadModel,
    Shutdown,
@@ -84,11 +85,11 @@ SHUTDOWN_TASK = Shutdown(
    runner_id=RUNNER_1_ID,
)

CHAT_PARAMS = ResponsesRequest(
CHAT_PARAMS = ChatCompletionTaskParams(
    model=str(MODEL_A_ID),
    input="hello",
    messages=[ChatCompletionMessage(role="user", content="hello")],
    stream=True,
    max_output_tokens=4,
    max_tokens=4,
    temperature=0.0,
)


@@ -1,6 +1,15 @@
from .profile import start_polling_memory_metrics, start_polling_node_metrics
from .profile import (
    IdentityMetrics,
    start_polling_identity_metrics,
    start_polling_memory_metrics,
    start_polling_network_metrics,
    start_polling_system_metrics,
)

__all__ = [
    "start_polling_node_metrics",
    "IdentityMetrics",
    "start_polling_identity_metrics",
    "start_polling_memory_metrics",
    "start_polling_network_metrics",
    "start_polling_system_metrics",
]

@@ -1,6 +1,7 @@
import asyncio
import os
import platform
from dataclasses import dataclass
from typing import Any, Callable, Coroutine

import anyio
@@ -9,7 +10,7 @@ from loguru import logger
from exo.shared.types.memory import Memory
from exo.shared.types.profiling import (
    MemoryPerformanceProfile,
    NodePerformanceProfile,
    NetworkInterfaceInfo,
    SystemPerformanceProfile,
)

@@ -27,6 +28,13 @@ from .system_info import (
)


@dataclass(frozen=True)
class IdentityMetrics:
    model_id: str
    chip_id: str
    friendly_name: str


async def get_metrics_async() -> Metrics | None:
    """Return detailed Metrics on macOS or a minimal fallback elsewhere."""

@@ -67,48 +75,73 @@ async def start_polling_memory_metrics(
        await anyio.sleep(poll_interval_s)


async def start_polling_node_metrics(
    callback: Callable[[NodePerformanceProfile], Coroutine[Any, Any, None]],
):
    poll_interval_s = 1.0
async def start_polling_identity_metrics(
    callback: Callable[[IdentityMetrics], Coroutine[Any, Any, None]],
    *,
    poll_interval_s: float = 30.0,
) -> None:
    """Continuously poll and emit identity metrics at 30s intervals."""
    while True:
        try:
            model_id, chip_id = await get_model_and_chip()
            friendly_name = await get_friendly_name()
            await callback(
                IdentityMetrics(
                    model_id=model_id,
                    chip_id=chip_id,
                    friendly_name=friendly_name,
                )
            )
        except Exception as e:
            logger.opt(exception=e).error("Failed to emit identity metrics")
        finally:
            await anyio.sleep(poll_interval_s)


async def start_polling_system_metrics(
    callback: Callable[[SystemPerformanceProfile], Coroutine[Any, Any, None]],
    *,
    poll_interval_s: float = 1.0,
) -> None:
    """Continuously poll and emit system metrics (GPU, temp, power) at 1s intervals."""
    while True:
        try:
            metrics = await get_metrics_async()
            if metrics is None:
                return

            network_interfaces = get_network_interfaces()
            # these awaits could be joined but realistically they should be cached
            model_id, chip_id = await get_model_and_chip()
            friendly_name = await get_friendly_name()

            # do the memory profile last to get a fresh reading so it does not conflict with the other memory profiling loop
            memory_profile = get_memory_profile()

            await callback(
                NodePerformanceProfile(
                    model_id=model_id,
                    chip_id=chip_id,
                    friendly_name=friendly_name,
                    network_interfaces=network_interfaces,
                    memory=memory_profile,
                    system=SystemPerformanceProfile(
                        gpu_usage=metrics.gpu_usage[1],
                        temp=metrics.temp.gpu_temp_avg,
                        sys_power=metrics.sys_power,
                        pcpu_usage=metrics.pcpu_usage[1],
                        ecpu_usage=metrics.ecpu_usage[1],
                        ane_power=metrics.ane_power,
                    ),
                SystemPerformanceProfile(
                    gpu_usage=metrics.gpu_usage[1],
                    temp=metrics.temp.gpu_temp_avg,
                    sys_power=metrics.sys_power,
                    pcpu_usage=metrics.pcpu_usage[1],
                    ecpu_usage=metrics.ecpu_usage[1],
                    ane_power=metrics.ane_power,
                )
            )

        except asyncio.TimeoutError:
            logger.warning(
                "[resource_monitor] Operation timed out after 30s, skipping this cycle."
                "[system_monitor] Operation timed out after 30s, skipping this cycle."
            )
        except MacMonError as e:
            logger.opt(exception=e).error("Resource Monitor encountered error")
            logger.opt(exception=e).error("System Monitor encountered error")
            return
        finally:
            await anyio.sleep(poll_interval_s)


async def start_polling_network_metrics(
    callback: Callable[[list[NetworkInterfaceInfo]], Coroutine[Any, Any, None]],
    *,
    poll_interval_s: float = 30.0,
) -> None:
    """Continuously poll and emit network interface info at 30s intervals."""
    while True:
        try:
            network_interfaces = get_network_interfaces()
            await callback(network_interfaces)
        except Exception as e:
            logger.opt(exception=e).error("Network Monitor encountered error")
        finally:
            await anyio.sleep(poll_interval_s)

@@ -13,10 +13,10 @@ from pydantic import BaseModel

from exo.shared.logging import InterceptLogger, logger_setup
from exo.shared.models.model_cards import MODEL_CARDS, ModelId
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
from exo.shared.types.commands import CommandId
from exo.shared.types.common import Host, NodeId
from exo.shared.types.events import Event
from exo.shared.types.openai_responses import ResponsesRequest
from exo.shared.types.tasks import (
    ChatCompletion,
    ConnectToGroup,
@@ -169,10 +169,16 @@ async def execute_test(test: Tests, instance: Instance, hn: str):
    send.send(StartWarmup(instance_id=iid))
    send.send(
        ChatCompletion(
            task_params=ResponsesRequest(
            task_params=ChatCompletionTaskParams(
                model=test.model_id,
                instructions="You are a helpful assistant",
                input="What is the capital of France?",
                messages=[
                    ChatCompletionMessage(
                        role="system", content="You are a helpful assistant"
                    ),
                    ChatCompletionMessage(
                        role="user", content="What is the capital of France?"
                    ),
                ],
            ),
            command_id=CommandId("yo"),
            instance_id=iid,
