Compare commits

...

43 Commits

Author SHA1 Message Date
Alex Cheema
eadb6de1f7 Merge main into uncertainty-visualization branch
Resolve conflicts by keeping main's structure (TextGenerationTaskParams,
tool calling, KV prefix cache, Claude/OpenAI APIs) and surgically adding
the uncertainty visualization features on top:

- Add logprob/top_logprobs fields to GenerationResponse and TokenChunk
- Add extract_top_logprobs() to MLX generator for per-token logprob extraction
- Build Logprobs in chat completions adapter for streaming responses
- Add SSE headers (Cache-Control, Connection, X-Accel-Buffering) to streaming endpoints
- Add TokenHeatmap component and uncertainty toggle in dashboard
- Add logprobs collection in streaming response handler
- Add regenerateFromToken method for re-generation from specific tokens
- Strip token data from localStorage to avoid storage bloat

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 11:33:24 -08:00
Alex Cheema
d826d309b3 chore: gitignore hosts_*.json files (#1343)
## Motivation

`hosts_*.json` files are local host configuration snapshots that
shouldn't be tracked in version control.

## Changes

Added `hosts_*.json` pattern to `.gitignore`.

## Why It Works

The glob pattern `hosts_*.json` matches any file starting with `hosts_`
and ending with `.json`. Since the pattern contains no slash, git applies it
at any directory depth, not just the repo root.

## Test Plan

### Manual Testing
- Verified that `hosts_*.json` files are ignored by git after this
change.

### Automated Testing
- No automated tests needed for a `.gitignore` change.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 16:14:11 +00:00
Alex Cheema
c3537980bd feat: add Claude Messages API and OpenAI Responses API support (#1167)
## Motivation

Add support for Claude Messages API and OpenAI Responses API to allow
users to interact with exo using these popular API formats. This enables
broader compatibility with existing tooling and SDKs that expect these
API formats.

## Architecture

Adapter logic lives exclusively in the API layer
(`src/exo/master/adapters/`). On the way in, each adapter converts its
API-specific request type (`ChatCompletionRequest`,
`ClaudeMessagesRequest`, `ResponsesRequest`) into
`TextGenerationTaskParams`. On the way out, each adapter converts the
`TokenChunk` stream back into its API-specific response format.
Everything inside the application — commands, worker, runner, event
sourcing — only sees `TextGenerationTaskParams` and `TokenChunk`. No
API-specific types cross the boundary.

```
                          API layer                         │  Application internals
                                                            │
Chat Completions → [adapter] → TextGenerationTaskParams ──→ │ ──→ TextGeneration command → Runner → TokenChunk ──→ │ ──→ [adapter] → ChatCompletionResponse
Claude Messages  → [adapter] → TextGenerationTaskParams ──→ │ ──→ TextGeneration command → Runner → TokenChunk ──→ │ ──→ [adapter] → ClaudeMessagesResponse
Responses API    → [adapter] → TextGenerationTaskParams ──→ │ ──→ TextGeneration command → Runner → TokenChunk ──→ │ ──→ [adapter] → ResponsesResponse
```
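
To make the inbound half of this flow concrete, here is a minimal sketch of what a request adapter might look like. `claude_request_to_internal` is named in a later commit in this range; the field lists on both models are illustrative assumptions, not the repo's exact schemas.

```python
# Sketch only: fields are assumed for illustration, not exo's real schemas.
from pydantic import BaseModel


class ClaudeMessagesRequest(BaseModel):
    model: str
    max_tokens: int
    messages: list[dict]
    stream: bool = False


class TextGenerationTaskParams(BaseModel):
    model: str
    messages: list[dict]
    max_tokens: int | None = None


def claude_request_to_internal(req: ClaudeMessagesRequest) -> TextGenerationTaskParams:
    """Convert the API-specific request into the internal task params."""
    return TextGenerationTaskParams(
        model=req.model,
        messages=req.messages,
        max_tokens=req.max_tokens,
    )
```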

## Changes

### New Files
- `src/exo/shared/types/claude_api.py` - Pydantic types for Claude
Messages API
- `src/exo/shared/types/openai_responses.py` - Pydantic types for OpenAI
Responses API
- `src/exo/shared/types/text_generation.py` - Shared
`TextGenerationTaskParams` internal type
- `src/exo/master/adapters/chat_completions.py` - Chat Completions
adapter (streaming/non-streaming)
- `src/exo/master/adapters/claude.py` - Claude Messages adapter
(streaming/non-streaming)
- `src/exo/master/adapters/responses.py` - OpenAI Responses adapter
(streaming/non-streaming)

### Modified Files
- `src/exo/master/api.py` - Refactored to use adapters uniformly for all
endpoints; extracted `_resolve_and_validate_text_model` helper to
deduplicate model validation across all text endpoints; removed ad-hoc
`try/except ValueError` blocks from non-streaming paths

### New Endpoints
- `POST /v1/messages` - Claude Messages API (streaming and
non-streaming)
- `POST /v1/responses` - OpenAI Responses API (streaming and
non-streaming)

## Why It Works

All APIs are implemented as pure conversion adapters at the edge of the
application:
1. Adapter functions in `src/exo/master/adapters/` convert incoming
requests to `TextGenerationTaskParams`
2. `api.py` wraps the params in a `TextGeneration` command and sends it
through the existing command/event flow
3. The worker, runner, and event sourcing layers only handle
`TextGenerationTaskParams` and `TokenChunk` — they have no awareness of
Chat Completions, Claude, or Responses API formats
4. On response, adapter functions convert the `TokenChunk` stream back
to the caller's expected format
5. Model validation is handled by a single shared helper
(`_resolve_and_validate_text_model`), mirroring the existing
`_validate_image_model` pattern for image endpoints

No changes to core inference logic were needed.

### Streaming Formats
- **Chat Completions**: Uses `data: {...}\n\n` with `[DONE]` terminator
- **Claude**: Uses event types `message_start`, `content_block_start`,
`content_block_delta`, `content_block_stop`, `message_delta`,
`message_stop`
- **OpenAI Responses**: Uses event types `response.created`,
`response.in_progress`, `response.output_item.added`,
`response.content_part.added`, `response.output_text.delta`,
`response.output_text.done`, `response.content_part.done`,
`response.output_item.done`, `response.completed`
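
To make the Claude event sequence above concrete, here is a hedged sketch of a streaming adapter that turns the internal `TokenChunk` stream into Claude-style SSE events; the `.text` field on the chunk and the helper names are assumptions.

```python
# Illustrative only: chunk.text and the exact event payloads are assumptions.
import json
from collections.abc import AsyncIterator
from typing import Any


def sse(event: str, data: dict) -> str:
    """Frame one server-sent event in Claude's `event:`/`data:` format."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def claude_event_stream(chunks: AsyncIterator[Any]) -> AsyncIterator[str]:
    yield sse("message_start", {"type": "message_start"})
    yield sse("content_block_start", {"type": "content_block_start", "index": 0})
    async for chunk in chunks:
        yield sse(
            "content_block_delta",
            {"type": "content_block_delta", "index": 0,
             "delta": {"type": "text_delta", "text": chunk.text}},
        )
    yield sse("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield sse("message_delta", {"type": "message_delta", "delta": {"stop_reason": "end_turn"}})
    yield sse("message_stop", {"type": "message_stop"})
```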

## Test Plan

### Manual Testing
Hardware: MacBook Pro M3 Max

**Non-streaming tests:**
```bash
# Chat Completions API
curl -X POST http://localhost:52415/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 20}'

# Claude Messages API
curl -X POST http://localhost:52415/v1/messages \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "max_tokens": 50, "messages": [{"role": "user", "content": "Hello"}]}'

# OpenAI Responses API
curl -X POST http://localhost:52415/v1/responses \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "input": "Hello", "max_output_tokens": 20}'
```

**Streaming tests:**
```bash
# Chat Completions API (streaming)
curl -N -X POST http://localhost:52415/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "messages": [{"role": "user", "content": "Hello"}], "stream": true, "max_tokens": 20}'

# Claude Messages API (streaming)
curl -N -X POST http://localhost:52415/v1/messages \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "max_tokens": 50, "messages": [{"role": "user", "content": "Hello"}], "stream": true}'

# OpenAI Responses API (streaming)
curl -N -X POST http://localhost:52415/v1/responses \
  -H "Content-Type: application/json" \
  -d '{"model": "llama-3.2-1b", "input": "Hello", "stream": true, "max_output_tokens": 20}'
```

All endpoints tested successfully with proper response formats and
streaming events.

### Automated Testing
- Tests in `src/exo/master/tests/` all pass (85 tests)
- Type checker (basedpyright) passes with 0 errors
- Linter (ruff) passes

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Evan <evanev7@gmail.com>
2026-02-02 15:58:37 +00:00
rltakashige
21d477f1cb Update exo bench (#1357)
## Motivation

Make exo bench faster for longer prompts, lengthen the default timeouts, and
run pp and tg as pairs (zip) rather than always taking all combinations.

## Changes

- Use binary search to find the prompt length that hits the target token count (see the sketch below)
- Add a flag to force all pp×tg combinations when that is desired
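
A minimal sketch of the binary-search sizing (it mirrors the `PromptSizer` change in the diff below); `count_tokens` stands in for the tokenizer-backed counting function.

```python
# Sketch of binary-searching the number of repeated "atoms" whose token
# count reaches a target prompt length; count_tokens is a placeholder.
from collections.abc import Callable


def atoms_for_target(atom: str, target: int, count_tokens: Callable[[str], int]) -> str:
    overhead = count_tokens("")                       # chat-template overhead
    sample = 100
    per_atom = (count_tokens(atom * sample) - overhead) / sample
    low, high = 0, int((target - overhead) / per_atom) * 2 + 100
    while low < high:                                 # O(log n) token counts instead of O(n)
        mid = (low + high) // 2
        if count_tokens(atom * mid) < target:
            low = mid + 1
        else:
            high = mid
    return atom * low
```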
2026-02-02 15:46:15 +00:00
Jake Hillion
b2579c78fe nix: add macmon to PATH in wrapper scripts on Darwin
`nix run .#exo` couldn't find `macmon` because the Nix wrapper scripts
didn't include it in PATH, causing `shutil.which("macmon")` to fail.

Added `--prefix PATH : ${pkgs.macmon}/bin` to the `makeWrapper` call,
conditional on Darwin via `lib.optionalString`, so macmon's binary is
available at runtime without modifying the user's system PATH.

Test plan:
- Verified `nix build .#exo` succeeds
- Checked wrapper script contains macmon store path in PATH prefix
2026-02-02 13:42:36 +00:00
Evan Quiney
cd946742f7 fix skipping logic in worker plan (#1342)
the worker plan function was missing some skipping logic, leading to tasks
being double-submitted.
2026-01-30 14:31:40 +00:00
rltakashige
a5bc38ad1f Check all nodes to evict (#1341)
## Motivation

If nodes have uneven memory, one node may evict cache that remains on
another node. This will break prefill on some setups.

2026-01-30 13:42:09 +00:00
Evan Quiney
2a4e0d4629 make node-ids unique per-session (#1338)
we currently have no strict requirement that node ids persist across
sessions, so we can generate fresh node ids each time

this avoids issues like #1332, but prevents further features such as
caching downloads or node-id dialling

Co-authored-by: rltakashige <rl.takashige@gmail.com>
2026-01-30 13:33:31 +00:00
Evan Quiney
46a14153dd switch to ModelCard.load outside of download log (#1339)
some attempts to load model cards (i.e. build_base_shard) always went
through networking rather than using downloaded model cards. we should
always default to ModelCard.load in these scenarios
2026-01-30 11:20:20 +00:00
Evan
9ba61f3733 improve log message in shard downloader
closes #1336
2026-01-30 10:35:01 +00:00
rltakashige
d9eca75895 Add usage stats (#1333)
## Motivation

(Probably) the final missing piece of the Chat Completions API 

## Changes

Add UsageStats 
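
For context, a hedged sketch of the usage block shape, following the OpenAI Chat Completions convention; exo's actual `UsageStats` fields may differ.

```python
# Assumed shape, mirroring OpenAI's Chat Completions "usage" object.
from pydantic import BaseModel


class UsageStats(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
```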

## Why It Works

OpenCode reviewed my PR and gave me stats:

<img width="1150" height="802" alt="image"
src="https://github.com/user-attachments/assets/ebc06bae-797f-4087-87d5-2f26cf60fc48"
/>


## Test Plan

### Automated Testing
No tests were broken.
2026-01-30 10:23:08 +00:00
rltakashige
9dabde7e57 Fix bench after recent updates (#1331)
## Motivation

A lot of changes happened without much attention to the state of exo
bench.

## Changes

Use TaggedModel for BenchChatCompletion so it serialises properly.
Don't break after a GPT OSS tool call, to preserve parity with the rest of
the codebase.

## Test Plan

### Manual Testing
<img width="2856" height="678" alt="image"
src="https://github.com/user-attachments/assets/2e18cf0d-c0f8-467c-9763-1a6a59c8a327"
/>

Also tested GPT OSS tool calling in OpenCode
2026-01-29 19:14:40 +00:00
ciaranbor
a31942ce12 Ciaran/image non streaming (#1328)
## Motivation

The dashboard UI attempted to parse all image generation responses as
SSE streams, even when streaming was disabled. This broke non-streaming
image generation.

## Changes

- Parse JSON responses directly when not streaming; use the SSE parser only
when stream=true AND partialImages > 0
- Explicitly disable partial images when not streaming

## Why It Works

Both API and dashboard now use the same condition (stream &&
partialImages > 0) to determine response format, ensuring correct
parsing.

## Test Plan

### Manual Testing

Non-streamed image generation results appear in the UI. Streamed image
generation still works.
2026-01-29 17:24:32 +00:00
Alex Cheema
7cc313b22a Treat Swift/Xcode build warnings as errors (#1322)
## Motivation

Warnings that go unchecked tend to accumulate and hide real issues.
Treating them as errors ensures they are addressed immediately, both
locally during development and in CI.

## Changes

Added `SWIFT_TREAT_WARNINGS_AS_ERRORS = YES` and
`GCC_TREAT_WARNINGS_AS_ERRORS = YES` to the **project-level** Debug and
Release build configurations in `project.pbxproj`. This applies to all
targets (EXO, EXOTests, EXOUITests).

## Why It Works

Xcode's `SWIFT_TREAT_WARNINGS_AS_ERRORS` and
`GCC_TREAT_WARNINGS_AS_ERRORS` build settings promote Swift and C/ObjC
warnings to errors at compile time. Setting them at the project level
means all targets inherit the policy without needing per-target or
CI-level overrides.

## Test Plan

### Manual Testing
- Built the EXO scheme in Release configuration with `xcodebuild` — no
warning-as-error failures from Swift or C/ObjC sources.

### Automated Testing
- CI already builds with `-configuration Release`, so it will
automatically enforce warnings-as-errors via the inherited project
settings — no CI changes needed.
2026-01-29 17:15:49 +00:00
rltakashige
2837225dc7 Load pipeline layers sequentially (#1329)
## Motivation

Slightly annoyed by needing this change, but same story as for tensor
loading...
2026-01-29 17:08:38 +00:00
Jake Hillion
e4c6a7dbb4 nix: add Python packaging with uv2nix
Add uv2nix to build Python packages from uv.lock. This creates a fully
Nix-managed Python environment with the Rust bindings injected via overlay.

Changes:
- Add pyproject-nix, uv2nix, and pyproject-build-systems flake inputs
- Create python/parts.nix with overlays to inject Nix-built Rust wheel
- Export packages.exo on macOS (wraps exo/exo-master/exo-worker with dashboard)
- Add checks.lint (ruff, all platforms) and checks.pytest (macOS only)
- Simplify CI typecheck job using nicknovitski/nix-develop action
- Delete .github/actions/typecheck composite action (no longer needed)
- Add no-build-package for MLX packages in pyproject.toml (use wheels)

The Python build is currently macOS-only since MLX requires Metal. Linux
support will be added once the pyproject dependencies are simplified.

Test plan:
- Run `nix flake check` on macOS to verify pytest and lint pass
- Build exo package on macOS: `nix build .#exo`
- Verify CI pipeline passes with simplified typecheck job
2026-01-29 16:35:58 +00:00
Evan
b1e88a3d06 shfmt
adds shfmt, a shell formatter, and formats the bash files
2026-01-29 15:24:36 +00:00
Jake Hillion
ebeddfb308 mlx: build with Nix (#1285)
In order to make testing and deployment simpler and more reproducible,
we want to provide a Nix derivation for our macOS .app build. We already
build the Rust and dashboard with Nix, but so far the Python has been
blocked because we haven't had an MLX build.

This change adds a Metal compiler derivation that uses `requireFile` to
be provided a NAR of the unfree macOS Metal compiler. It is documented
how to get this file, but effectively you have to trigger the download,
mount the DMG, and NAR the result. Once this is added to the store by
hash we can build MLX using it. The MLX build itself is quite
self-explanatory.

Test plan:
- CI. We follow the instructions to grab the Metal compiler. Once this
is in Cachix we should really never do this again, and I can pin the
path too to ensure it doesn't leave.
- MLX tests run as part of the MLX derivation's build. They pass.
- `NIXPKGS_ALLOW_UNFREE=1 nix build .#mlx.passthru.tests.mlxTest
--impure --option sandbox false`

---------

Co-authored-by: Ryuichi Leo Takashige <leo@exolabs.net>
2026-01-29 14:07:00 +00:00
Alex Cheema
9111575997 Add startup delay and update network setup message (#1309)
## Summary
- Add 20-second startup delay to wait for macOS to finish network setup
after boot
- Update user-facing message to clarify the service configures local
networking, disables Thunderbolt Bridge (preventing packet storms), and
installs a Network Location

## Test plan
- [ ] Manual verification of Swift syntax
- [ ] Test network setup on macOS device after reboot

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: rltakashige <rl.takashige@gmail.com>
2026-01-29 13:05:50 +00:00
Sami Khan
ffacabe7e4 Fix uninstall button error (#1306)
## Motivation

Fix "Network setup script failed" error when clicking uninstall button
and resolve Xcode compiler warnings.

## Changes

- NetworkSetupHelper.swift: Add || true guards and explicit return 0 in
find_and_enable_thunderbolt_bridge to prevent script failures with set
-euo pipefail
- ThunderboltBridgeService.swift: Use withCString and
withUnsafeMutablePointer for Authorization API calls to fix pointer
lifetime warnings
- EXOApp.swift: Mark showNotification as nonisolated to fix main actor
isolation warning

## Why It Works

- The uninstall script's Thunderbolt re-enable function could exit
non-zero in edge cases (no bridges, no matches). Since this is a cleanup
step, failures should not abort uninstall.
- Swift requires explicit pointer lifetime management when passing
strings/structs to C APIs.
- showNotification is called from a nonisolated delegate method and uses
thread-safe APIs.

## Test Plan

### Manual Testing
Hardware: MacBook Pro

- Clicked Uninstall button, verified it completes without error
- Built in Xcode, verified no warnings   

### Automated Testing
N/A
2026-01-29 12:57:48 +00:00
rltakashige
9e58a57599 Add RDMA caveats to README.md (#1316)
## Motivation

Running RDMA from source is not well documented as is. There were several
surprising things that took time to debug internally, too.

The app should be updated to detect macOS versions in the future.
2026-01-28 18:44:00 +00:00
Evan Quiney
748a026071 fix configdata validation for kimi-k2 (#1314)
## Motivation
Our shard downloader could not correctly fetch data for kimi-k2, as the
model's config defers some values to a text_config field.
## Changes
config_data now prioritizes this field, when it exists, for values such as
layer_count.
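
A hedged sketch of the lookup order described above; the `num_hidden_layers` key is an assumption for illustration.

```python
# Prefer text_config when present, otherwise fall back to the top-level config.
def layer_count(config_data: dict) -> int:
    source = config_data.get("text_config") or config_data
    return int(source["num_hidden_layers"])  # key name assumed
```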
2026-01-28 14:29:36 +00:00
Alex Cheema
f1a2d054ec Update tagline to "Run frontier AI locally" (#1313)
- Update README tagline from "Run your own AI cluster at home with
everyday devices" to "Run frontier AI locally"
2026-01-28 12:38:14 +00:00
Alex Cheema
b3c8f85fc8 Update MLX to 0.30.4 (#1311)
## Summary
- Bump mlx from 0.30.3 to 0.30.4

## Test plan
- [x] `uv lock` succeeds
- [x] Type checking passes (`uv run basedpyright`)
- [x] Run inference tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 04:30:21 -08:00
rltakashige
a562114ba5 Add Kimi K2.5 support (#1302)

---------

Co-authored-by: Alex Cheema <41707476+AlexCheema@users.noreply.github.com>
2026-01-28 05:44:19 +00:00
Evan Quiney
991d278119 replace nix fmt with treefmt in just lint (#1301)
man evaluating the nix flake is so slow. treefmt speeeedy
2026-01-27 17:03:01 +00:00
Alex Cheema
ba7148ccec style: format app.svelte.ts with nix fmt 2026-01-22 11:53:43 +00:00
Alex Cheema
a64b8addc6 Fix localStorage quota issues by stripping tokens and auto-pruning
- Strip tokens (logprobs data) from messages before saving to localStorage
  since they're large and not essential for persistence
- Add pruneOldConversations() to automatically remove oldest conversations
  when quota is exceeded
- This prevents QuotaExceededError from crashing the app

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
e6599a9408 Fix ReferenceError: controller undefined in sendMessage finally block
Move AbortController creation before the try block in both
sendMessageWithLogprobs and regenerateFromToken functions.
Previously, controller was defined inside the try block but
referenced in the finally block, causing a ReferenceError
if an exception was thrown before the controller was created.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
93f4753598 Add SSE headers to properly close streaming connections
Add Cache-Control, Connection: close, and X-Accel-Buffering headers
to all SSE streaming responses. These headers help ensure:
- No caching of streaming responses
- Connection closes when stream ends (instead of keep-alive)
- No proxy buffering that could delay stream closure

This should fix the issue where the frontend stays on "PROCESSING"
even after receiving the complete response.
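
A minimal sketch of wiring these headers onto a streaming response, assuming a Starlette/FastAPI-style `StreamingResponse`; the endpoint plumbing around it is omitted.

```python
# The three headers from this commit; the wrapper itself is illustrative.
from starlette.responses import StreamingResponse

SSE_HEADERS = {
    "Cache-Control": "no-cache",   # never cache a live stream
    "Connection": "close",         # close when the stream ends instead of keep-alive
    "X-Accel-Buffering": "no",     # ask reverse proxies not to buffer the stream
}


def sse_response(event_generator):
    return StreamingResponse(
        event_generator,
        media_type="text/event-stream",
        headers=SSE_HEADERS,
    )
```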

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
75fe505275 Add debug logging to generate_chat_stream
Add logging to help diagnose why streaming might not be ending properly.
This will show when [DONE] is yielded, when return is called, and when
the finally block runs.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
d7c044e349 Fix streaming not ending after [DONE] is yielded
Add missing return statement after yielding [DONE] in generate_chat_stream.
Without this, the async generator continues waiting for more chunks from
chunk_stream even though generation is complete, causing the stream to hang
indefinitely. The frontend waits for the stream to close (reader.done) which
never happens, resulting in the chat button staying on "PROCESSING" forever.
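
A minimal illustration of the bug and the fix, assuming `chunk_stream` yields already-serialized chunks plus a final-chunk flag (both assumptions):

```python
# Without the `return`, the generator keeps awaiting chunk_stream after
# emitting [DONE], so the HTTP response never closes.
async def generate_chat_stream(chunk_stream):
    async for payload, is_final in chunk_stream:
        yield f"data: {payload}\n\n"
        if is_final:
            yield "data: [DONE]\n\n"
            return  # the fix: end the generator so the stream actually closes
```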

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
53b6d56e9f fix: restore extract_top_logprobs function for uncertainty visualization
The extract_top_logprobs function was lost during rebases. This function
processes the out.logprobs array (full vocabulary logprobs from MLX) to
extract the selected token's logprob and top-k alternatives.

The previous code tried to use getattr(out, "logprob", None) which
doesn't exist - mlx_lm returns logprobs as an mx.array, not individual
values.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
7fe0a61230 fix: remove unsupported logprob params from stream_generate
The mlx_lm.stream_generate already returns logprobs in its output -
we don't need to pass return_logprob or return_top_logprobs kwargs.
The uncertainty visualization feature extracts logprobs from the
existing out.logprobs field.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
5a36542631 feat: add uncertainty visualization with token-level logprobs
- Add TokenHeatmap component for visualizing token confidence
- Collect and stream logprobs in generation pipeline
- Add regenerate-from-token feature with continue_from_prefix
- Add AbortController for request cancellation
- Support continue_final_message for seamless prefix continuation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:53:43 +00:00
Alex Cheema
955e0105b3 fix: resolve import and type errors from rebase
- Use claude_request_to_internal instead of old function name
- Fix ModelId imports in runner.py and test files
- Update test_mlx/conftest.py to use ResponsesRequest format
- Remove unused imports

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:36:11 +00:00
Evan
4d1eb1d9bd fix: rebase fix 2026-01-22 11:32:46 +00:00
Alex Cheema
365416c65e style: move inline imports to top of file in api.py
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:32:26 +00:00
Alex Cheema
04af76e10f fix: restore try/except structure in runner.py
Replace non-existent context manager with proper try/except block
and remove unused ModelId import.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:32:04 +00:00
Alex Cheema
a84c3431cd style: fix formatting issues caught by treefmt
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:31:45 +00:00
Alex Cheema
52445b21f6 refactor: use ResponsesRequest as canonical internal type
- Extend ResponsesRequest with fields: top_k, seed, stop, tools
- Remove redundant InternalTaskParams and InputMessage types
- Update all adapters to convert to ResponsesRequest
- Simplify Responses API (no conversion needed - native passthrough)
- Update all imports across codebase and tests

This eliminates type duplication and makes the Responses API
relationship explicit throughout the codebase.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:31:44 +00:00
Alex Cheema
435bd7f6fa refactor: make Responses API the canonical internal format
Restructure the API layer so that OpenAI Responses API is the native
format, with Chat Completions and Claude Messages as adapters on top.

Changes:
- Add new chat_completions.py adapter with streaming/non-streaming support
- Update responses.py with collect_responses_response() for non-streaming
- Update claude.py with collect_claude_response() for non-streaming
- Refactor api.py so all endpoints use adapters uniformly
- Rename _chat_chunk_stream to _token_chunk_stream (generic internal format)
- Remove unused chat_response_to_* converter functions
- Update tests to remove tests for deleted functions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:30:27 +00:00
Alex Cheema
dd25b5b90e feat: add Claude Messages API and OpenAI Responses API support
Adds two new API endpoints that wrap the existing chat completions:

- /v1/messages - Claude Messages API compatible endpoint
- /v1/responses - OpenAI Responses API compatible endpoint

Both support streaming (SSE) and non-streaming modes with proper
token usage reporting from actual inference stats.

Also adds top_k sampling parameter and stop sequence support to the
MLX inference engine.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 11:28:49 +00:00
68 changed files with 5703 additions and 2039 deletions

View File

@@ -1,12 +0,0 @@
name: Type Check
description: "Run type checker"
runs:
using: "composite"
steps:
- name: Run type checker
run: |
nix --extra-experimental-features nix-command --extra-experimental-features flakes develop -c just sync
nix --extra-experimental-features nix-command --extra-experimental-features flakes develop -c just check
shell: bash

View File

@@ -26,73 +26,14 @@ jobs:
name: exo
authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}"
- name: Configure git user
run: |
git config --local user.email "github-actions@users.noreply.github.com"
git config --local user.name "github-actions bot"
shell: bash
- name: Load nix develop environment
run: nix run github:nicknovitski/nix-develop/v1
- name: Pull LFS files
run: |
echo "Pulling Git LFS files..."
git lfs pull
shell: bash
- name: Sync dependencies
run: uv sync --all-packages
- name: Setup Nix Environment
run: |
echo "Checking for nix installation..."
# Check if nix binary exists directly
if [ -f /nix/var/nix/profiles/default/bin/nix ]; then
echo "Found nix binary at /nix/var/nix/profiles/default/bin/nix"
export PATH="/nix/var/nix/profiles/default/bin:$PATH"
echo "PATH=$PATH" >> $GITHUB_ENV
nix --version
elif [ -f /nix/var/nix/profiles/default/etc/profile.d/nix-daemon.sh ]; then
echo "Found nix profile script, sourcing..."
source /nix/var/nix/profiles/default/etc/profile.d/nix-daemon.sh
nix --version
elif command -v nix >/dev/null 2>&1; then
echo "Nix already in PATH"
nix --version
else
echo "Nix not found. Debugging info:"
echo "Contents of /nix/var/nix/profiles/default/:"
ls -la /nix/var/nix/profiles/default/ 2>/dev/null || echo "Directory not found"
echo "Contents of /nix/var/nix/profiles/default/bin/:"
ls -la /nix/var/nix/profiles/default/bin/ 2>/dev/null || echo "Directory not found"
exit 1
fi
shell: bash
- name: Configure basedpyright include for local MLX
run: |
RUNNER_LABELS='${{ toJSON(runner.labels) }}'
if echo "$RUNNER_LABELS" | grep -q "local_mlx"; then
if [ -d "/Users/Shared/mlx" ]; then
echo "Updating [tool.basedpyright].include to use /Users/Shared/mlx"
awk '
BEGIN { in=0 }
/^\[tool\.basedpyright\]/ { in=1; print; next }
in && /^\[/ { in=0 } # next section
in && /^[ \t]*include[ \t]*=/ {
print "include = [\"/Users/Shared/mlx\"]"
next
}
{ print }
' pyproject.toml > pyproject.toml.tmp && mv pyproject.toml.tmp pyproject.toml
echo "New [tool.basedpyright] section:"
sed -n '/^\[tool\.basedpyright\]/,/^\[/p' pyproject.toml | sed '$d' || true
else
echo "local_mlx tag present but /Users/Shared/mlx not found; leaving pyproject unchanged."
fi
else
echo "Runner does not have 'local_mlx' tag; leaving pyproject unchanged."
fi
shell: bash
- uses: ./.github/actions/typecheck
- name: Run type checker
run: uv run basedpyright --project pyproject.toml
nix:
name: Build and check (${{ matrix.system }})
@@ -123,6 +64,63 @@ jobs:
name: exo
authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}"
- name: Build Metal packages (macOS only)
if: runner.os == 'macOS'
run: |
# Try to build metal-toolchain first (may succeed via cachix cache hit)
if nix build .#metal-toolchain 2>/dev/null; then
echo "metal-toolchain built successfully (likely cache hit)"
else
echo "metal-toolchain build failed, extracting from Xcode..."
NAR_HASH="sha256-ayR5mXN4sZAddwKEG2OszGRF93k9ZFc7H0yi2xbylQw="
NAR_NAME="metal-toolchain-17C48.nar"
# Use RUNNER_TEMP to avoid /tmp symlink issues on macOS
WORK_DIR="${RUNNER_TEMP}/metal-work"
mkdir -p "$WORK_DIR"
# Download the Metal toolchain component
xcodebuild -downloadComponent MetalToolchain
# Find and mount the DMG
DMG_PATH=$(find /System/Library/AssetsV2/com_apple_MobileAsset_MetalToolchain -name '*.dmg' 2>/dev/null | head -1)
if [ -z "$DMG_PATH" ]; then
echo "Error: Could not find Metal toolchain DMG"
exit 1
fi
echo "Found DMG at: $DMG_PATH"
hdiutil attach "$DMG_PATH" -mountpoint "${WORK_DIR}/metal-dmg"
# Copy the toolchain
cp -R "${WORK_DIR}/metal-dmg/Metal.xctoolchain" "${WORK_DIR}/metal-export"
hdiutil detach "${WORK_DIR}/metal-dmg"
# Create NAR and add to store
nix nar pack "${WORK_DIR}/metal-export" > "${WORK_DIR}/${NAR_NAME}"
STORE_PATH=$(nix store add --mode flat "${WORK_DIR}/${NAR_NAME}")
echo "Added NAR to store: $STORE_PATH"
# Verify the hash matches
ACTUAL_HASH=$(nix hash file "${WORK_DIR}/${NAR_NAME}")
if [ "$ACTUAL_HASH" != "$NAR_HASH" ]; then
echo "Warning: NAR hash mismatch!"
echo "Expected: $NAR_HASH"
echo "Actual: $ACTUAL_HASH"
echo "The metal-toolchain.nix may need updating"
fi
# Clean up
rm -rf "$WORK_DIR"
# Retry the build now that NAR is in store
nix build .#metal-toolchain
fi
# Build mlx (depends on metal-toolchain)
nix build .#mlx
- name: Build all Nix outputs
run: |
nix flake show --json | jq -r '
@@ -134,3 +132,14 @@ jobs:
- name: Run nix flake check
run: nix flake check
- name: Run pytest (macOS only)
if: runner.os == 'macOS'
run: |
# Build the test environment (requires relaxed sandbox for uv2nix on macOS)
TEST_ENV=$(nix build '.#exo-test-env' --option sandbox relaxed --print-out-paths)
# Run pytest outside sandbox (needs GPU access for MLX)
export HOME="$RUNNER_TEMP"
export EXO_TESTS=1
$TEST_ENV/bin/python -m pytest src -m "not slow" --import-mode=importlib

.gitignore
View File

@@ -28,3 +28,6 @@ target/
dashboard/build/
dashboard/node_modules/
dashboard/.svelte-kit/
# host config snapshots
hosts_*.json

View File

@@ -5,7 +5,7 @@
<img alt="exo logo" src="/docs/imgs/exo-logo-transparent.png" width="50%" height="50%">
</picture>
exo: Run your own AI cluster at home with everyday devices. Maintained by [exo labs](https://x.com/exolabs).
exo: Run frontier AI locally. Maintained by [exo labs](https://x.com/exolabs).
<p align="center">
<a href="https://discord.gg/TJ4P57arEm" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/Discord-Join%20Server-5865F2?logo=discord&logoColor=white" alt="Discord"></a>
@@ -107,6 +107,10 @@ uv run exo
This starts the exo dashboard and API at http://localhost:52415/
*Please view the section on RDMA to enable this feature on MacOS >=26.2!*
### Run from Source (Linux)
**Prerequisites:**
@@ -230,7 +234,7 @@ This removes:
RDMA is a new capability added to macOS 26.2. It works on any Mac with Thunderbolt 5 (M4 Pro Mac Mini, M4 Max Mac Studio, M4 Max MacBook Pro, M3 Ultra Mac Studio).
Note that on Mac Studio, you cannot use the Thunderbolt 5 port next to the Ethernet port.
Please refer to the caveats for immediate troubleshooting.
To enable RDMA on macOS, follow these steps:
@@ -247,6 +251,14 @@ To enable RDMA on macOS, follow these steps:
After that, RDMA will be enabled in macOS and exo will take care of the rest.
**Important Caveats**
1. Devices that wish to be part of an RDMA cluster must be connected to all other devices in the cluster.
2. The cables must support TB5.
3. On a Mac Studio, you cannot use the Thunderbolt 5 port next to the Ethernet port.
4. If running from source, please use the script found at `tmp/set_rdma_network_config.sh`, which will disable Thunderbolt Bridge and set dhcp on each RDMA port.
5. RDMA ports may be unable to discover each other on different versions of MacOS. Please ensure that OS versions match exactly (even beta version numbers) on all devices.
---
### Using the API

View File

@@ -342,6 +342,8 @@
SDKROOT = macosx;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = "DEBUG $(inherited)";
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
SWIFT_TREAT_WARNINGS_AS_ERRORS = YES;
GCC_TREAT_WARNINGS_AS_ERRORS = YES;
};
name = Debug;
};
@@ -397,6 +399,8 @@
MTL_FAST_MATH = YES;
SDKROOT = macosx;
SWIFT_COMPILATION_MODE = wholemodule;
SWIFT_TREAT_WARNINGS_AS_ERRORS = YES;
GCC_TREAT_WARNINGS_AS_ERRORS = YES;
};
name = Release;
};

View File

@@ -225,7 +225,7 @@ private final class ExoUpdaterDelegate: NSObject, SPUUpdaterDelegate {
}
}
private func showNotification(title: String, body: String) {
nonisolated private func showNotification(title: String, body: String) {
let center = UNUserNotificationCenter.current()
let content = UNMutableNotificationContent()
content.title = title

View File

@@ -293,7 +293,7 @@ struct ClusterTask {
let modelName: String?
let promptPreview: String?
let errorMessage: String?
let parameters: ChatCompletionTaskParameters?
let parameters: TextGenerationTaskParameters?
var sortPriority: Int {
switch status {
@@ -330,12 +330,12 @@ struct ClusterTaskPayload: Decodable {
let taskStatus: TaskStatus?
let instanceId: String?
let commandId: String?
let taskParams: ChatCompletionTaskParameters?
let taskParams: TextGenerationTaskParameters?
let errorType: String?
let errorMessage: String?
}
struct ChatCompletionTaskParameters: Decodable, Equatable {
struct TextGenerationTaskParameters: Decodable, Equatable {
let model: String?
let messages: [ChatCompletionMessage]?
let maxTokens: Int?
@@ -374,7 +374,7 @@ extension ClusterTask {
guard let id = payload.taskId else { return nil }
let status = payload.taskStatus ?? .unknown
switch kindKey {
case "ChatCompletion":
case "TextGeneration":
self.init(
id: id,
status: status,

View File

@@ -18,6 +18,9 @@ enum NetworkSetupHelper {
set -euo pipefail
# Wait for macOS to finish network setup after boot
sleep 20
PREFS="/Library/Preferences/SystemConfiguration/preferences.plist"
# Remove bridge0 interface
@@ -80,7 +83,7 @@ enum NetworkSetupHelper {
let alert = NSAlert()
alert.messageText = "EXO Network Configuration"
alert.informativeText =
"EXO needs to install a system service to automatically disable Thunderbolt Bridge on startup. This prevents network loops when connecting multiple Macs via Thunderbolt.\n\nYou will be prompted for your administrator password."
"EXO needs to install a system service to configure local networking. This will disable Thunderbolt Bridge (preventing packet storms) and install a Network Location.\n\nYou will be prompted for your password."
alert.alertStyle = .informational
alert.addButton(withTitle: "Install")
alert.addButton(withTitle: "Not Now")
@@ -241,11 +244,11 @@ enum NetworkSetupHelper {
rm -f "$LOG_OUT" "$LOG_ERR"
# Switch back to Automatic network location
networksetup -switchtolocation Automatic 2>/dev/null || true
networksetup -switchtolocation Automatic >/dev/null 2>&1 || true
# Delete the exo network location if it exists
networksetup -listlocations | grep -q '^exo$' && {
networksetup -deletelocation exo 2>/dev/null || true
networksetup -listlocations 2>/dev/null | grep -q '^exo$' && {
networksetup -deletelocation exo >/dev/null 2>&1 || true
} || true
# Re-enable any Thunderbolt Bridge service if it exists
@@ -255,12 +258,12 @@ enum NetworkSetupHelper {
tb_devices=$(networksetup -listallhardwareports 2>/dev/null | awk '
/^Hardware Port:/ { port = tolower(substr($0, 16)) }
/^Device:/ { if (port ~ /thunderbolt/) print substr($0, 9) }
')
') || true
[ -z "$tb_devices" ] && return 0
# For each bridge device, check if it contains Thunderbolt interfaces
for bridge in bridge0 bridge1 bridge2; do
members=$(ifconfig "$bridge" 2>/dev/null | awk '/member:/ {print $2}')
members=$(ifconfig "$bridge" 2>/dev/null | awk '/member:/ {print $2}') || true
[ -z "$members" ] && continue
for tb_dev in $tb_devices; do
@@ -269,7 +272,7 @@ enum NetworkSetupHelper {
service_name=$(networksetup -listnetworkserviceorder 2>/dev/null | awk -v dev="$bridge" '
/^\\([0-9*]/ { gsub(/^\\([0-9*]+\\) /, ""); svc = $0 }
/Device:/ && $0 ~ dev { print svc; exit }
')
') || true
if [ -n "$service_name" ]; then
networksetup -setnetworkserviceenabled "$service_name" on 2>/dev/null || true
return 0
@@ -277,8 +280,9 @@ enum NetworkSetupHelper {
fi
done
done
return 0
}
find_and_enable_thunderbolt_bridge
find_and_enable_thunderbolt_bridge || true
echo "EXO network components removed successfully"
"""

View File

@@ -127,21 +127,24 @@ final class ThunderboltBridgeService: ObservableObject {
// 2. Request specific network configuration rights
let rightName = "system.services.systemconfiguration.network"
var item = AuthorizationItem(
name: rightName,
valueLength: 0,
value: nil,
flags: 0
)
var rights = AuthorizationRights(count: 1, items: &item)
status = AuthorizationCopyRights(
authRef,
&rights,
nil,
[.extendRights, .interactionAllowed],
nil
)
status = rightName.withCString { nameCString in
var item = AuthorizationItem(
name: nameCString,
valueLength: 0,
value: nil,
flags: 0
)
return withUnsafeMutablePointer(to: &item) { itemPointer in
var rights = AuthorizationRights(count: 1, items: itemPointer)
return AuthorizationCopyRights(
authRef,
&rights,
nil,
[.extendRights, .interactionAllowed],
nil
)
}
}
guard status == errAuthorizationSuccess else {
if status == errAuthorizationCanceled {
throw ThunderboltBridgeError.authorizationCanceled

View File

@@ -29,21 +29,21 @@ YELLOW='\033[1;33m'
NC='\033[0m' # No Color
echo_info() {
echo -e "${GREEN}[INFO]${NC} $1"
echo -e "${GREEN}[INFO]${NC} $1"
}
echo_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
echo -e "${YELLOW}[WARN]${NC} $1"
}
echo_error() {
echo -e "${RED}[ERROR]${NC} $1"
echo -e "${RED}[ERROR]${NC} $1"
}
# Check if running as root
if [[ $EUID -ne 0 ]]; then
echo_error "This script must be run as root (use sudo)"
exit 1
echo_error "This script must be run as root (use sudo)"
exit 1
fi
echo ""
@@ -55,64 +55,64 @@ echo ""
# Unload the LaunchDaemon if running
echo_info "Stopping network setup daemon..."
if launchctl list | grep -q "$LABEL"; then
launchctl bootout system/"$LABEL" 2>/dev/null || true
echo_info "Daemon stopped"
launchctl bootout system/"$LABEL" 2>/dev/null || true
echo_info "Daemon stopped"
else
echo_warn "Daemon was not running"
echo_warn "Daemon was not running"
fi
# Remove LaunchDaemon plist
if [[ -f "$PLIST_DEST" ]]; then
rm -f "$PLIST_DEST"
echo_info "Removed LaunchDaemon plist"
if [[ -f $PLIST_DEST ]]; then
rm -f "$PLIST_DEST"
echo_info "Removed LaunchDaemon plist"
else
echo_warn "LaunchDaemon plist not found (already removed?)"
echo_warn "LaunchDaemon plist not found (already removed?)"
fi
# Remove the script and parent directory
if [[ -f "$SCRIPT_DEST" ]]; then
rm -f "$SCRIPT_DEST"
echo_info "Removed network setup script"
if [[ -f $SCRIPT_DEST ]]; then
rm -f "$SCRIPT_DEST"
echo_info "Removed network setup script"
else
echo_warn "Network setup script not found (already removed?)"
echo_warn "Network setup script not found (already removed?)"
fi
# Remove EXO directory if empty
if [[ -d "/Library/Application Support/EXO" ]]; then
rmdir "/Library/Application Support/EXO" 2>/dev/null && \
echo_info "Removed EXO support directory" || \
echo_warn "EXO support directory not empty, leaving in place"
rmdir "/Library/Application Support/EXO" 2>/dev/null &&
echo_info "Removed EXO support directory" ||
echo_warn "EXO support directory not empty, leaving in place"
fi
# Remove log files
if [[ -f "$LOG_OUT" ]] || [[ -f "$LOG_ERR" ]]; then
rm -f "$LOG_OUT" "$LOG_ERR"
echo_info "Removed log files"
if [[ -f $LOG_OUT ]] || [[ -f $LOG_ERR ]]; then
rm -f "$LOG_OUT" "$LOG_ERR"
echo_info "Removed log files"
else
echo_warn "Log files not found (already removed?)"
echo_warn "Log files not found (already removed?)"
fi
# Switch back to Automatic network location
echo_info "Restoring network configuration..."
if networksetup -listlocations | grep -q "^Automatic$"; then
networksetup -switchtolocation Automatic 2>/dev/null || true
echo_info "Switched to Automatic network location"
networksetup -switchtolocation Automatic 2>/dev/null || true
echo_info "Switched to Automatic network location"
else
echo_warn "Automatic network location not found"
echo_warn "Automatic network location not found"
fi
# Delete the exo network location if it exists
if networksetup -listlocations | grep -q "^exo$"; then
networksetup -deletelocation exo 2>/dev/null || true
echo_info "Deleted 'exo' network location"
networksetup -deletelocation exo 2>/dev/null || true
echo_info "Deleted 'exo' network location"
else
echo_warn "'exo' network location not found (already removed?)"
echo_warn "'exo' network location not found (already removed?)"
fi
# Re-enable Thunderbolt Bridge if it exists
if networksetup -listnetworkservices 2>/dev/null | grep -q "Thunderbolt Bridge"; then
networksetup -setnetworkserviceenabled "Thunderbolt Bridge" on 2>/dev/null || true
echo_info "Re-enabled Thunderbolt Bridge"
networksetup -setnetworkserviceenabled "Thunderbolt Bridge" on 2>/dev/null || true
echo_info "Re-enabled Thunderbolt Bridge"
fi
# Note about launch at login registration
@@ -124,14 +124,14 @@ echo_warn " System Settings → General → Login Items → Remove EXO"
# Check if EXO.app exists in common locations
APP_FOUND=false
for app_path in "/Applications/EXO.app" "$HOME/Applications/EXO.app"; do
if [[ -d "$app_path" ]]; then
if [[ "$APP_FOUND" == false ]]; then
echo ""
APP_FOUND=true
fi
echo_warn "EXO.app found at: $app_path"
echo_warn "You may want to move it to Trash manually."
if [[ -d $app_path ]]; then
if [[ $APP_FOUND == false ]]; then
echo ""
APP_FOUND=true
fi
echo_warn "EXO.app found at: $app_path"
echo_warn "You may want to move it to Trash manually."
fi
done
echo ""
@@ -151,4 +151,3 @@ echo ""
echo "Manual step required:"
echo " Remove EXO from Login Items in System Settings → General → Login Items"
echo ""

View File

@@ -5,10 +5,13 @@ from __future__ import annotations
import argparse
import contextlib
import http.client
import itertools
import json
import os
import sys
import time
from collections.abc import Callable
from pathlib import Path
from statistics import mean
from typing import Any
from urllib.parse import urlencode
@@ -16,6 +19,84 @@ from urllib.parse import urlencode
from loguru import logger
from transformers import AutoTokenizer
# Monkey-patch for transformers 5.x compatibility
# Kimi's tokenization_kimi.py imports bytes_to_unicode from the old location
# which was moved in transformers 5.0.0rc2
try:
import transformers.models.gpt2.tokenization_gpt2 as gpt2_tokenization
from transformers.convert_slow_tokenizer import bytes_to_unicode
if not hasattr(gpt2_tokenization, "bytes_to_unicode"):
gpt2_tokenization.bytes_to_unicode = bytes_to_unicode # type: ignore[attr-defined]
except ImportError:
pass # transformers < 5.0 or bytes_to_unicode not available
def load_tokenizer_for_bench(model_id: str) -> Any:
"""
Load tokenizer for benchmarking, with special handling for Kimi models.
Kimi uses a custom TikTokenTokenizer that transformers 5.x can't load via AutoTokenizer.
This function replicates the logic from utils_mlx.py for bench compatibility.
"""
model_id_lower = model_id.lower()
if "kimi-k2" in model_id_lower:
import importlib.util
import types
from huggingface_hub import snapshot_download
# Download/get the model path
model_path = Path(
snapshot_download(
model_id,
allow_patterns=["*.json", "*.py", "*.tiktoken"],
)
)
sys.path.insert(0, str(model_path))
# Load tool_declaration_ts first (tokenization_kimi imports it with relative import)
tool_decl_path = model_path / "tool_declaration_ts.py"
if tool_decl_path.exists():
spec = importlib.util.spec_from_file_location(
"tool_declaration_ts", tool_decl_path
)
if spec and spec.loader:
tool_decl_module = importlib.util.module_from_spec(spec)
sys.modules["tool_declaration_ts"] = tool_decl_module
spec.loader.exec_module(tool_decl_module)
# Load tokenization_kimi with patched source (convert relative to absolute import)
tok_path = model_path / "tokenization_kimi.py"
source = tok_path.read_text()
source = source.replace("from .tool_declaration_ts", "from tool_declaration_ts")
spec = importlib.util.spec_from_file_location("tokenization_kimi", tok_path)
if spec:
tok_module = types.ModuleType("tokenization_kimi")
tok_module.__file__ = str(tok_path)
sys.modules["tokenization_kimi"] = tok_module
exec(compile(source, tok_path, "exec"), tok_module.__dict__) # noqa: S102
TikTokenTokenizer = tok_module.TikTokenTokenizer # noqa: N806
else:
from tokenization_kimi import TikTokenTokenizer # type: ignore[import-not-found] # noqa: I001
hf_tokenizer: Any = TikTokenTokenizer.from_pretrained(model_path)
# Patch encode to use internal tiktoken model directly
# transformers 5.x has a bug in the encode->pad path for slow tokenizers
def _patched_encode(text: str, **kwargs: object) -> list[int]:
# Pass allowed_special="all" to handle special tokens like <|im_user|>
return list(hf_tokenizer.model.encode(text, allowed_special="all"))
hf_tokenizer.encode = _patched_encode
return hf_tokenizer
# Default: use AutoTokenizer
return AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
class ExoHttpError(RuntimeError):
def __init__(self, status: int, reason: str, body_preview: str):
@@ -24,7 +105,7 @@ class ExoHttpError(RuntimeError):
class ExoClient:
def __init__(self, host: str, port: int, timeout_s: float = 600.0):
def __init__(self, host: str, port: int, timeout_s: float = 7200.0):
self.host = host
self.port = port
self.timeout_s = timeout_s
@@ -180,14 +261,7 @@ def parse_int_list(values: list[str]) -> list[int]:
part = part.strip()
if part:
items.append(int(part))
seen: set[int] = set()
out: list[int] = []
for x in items:
if x not in seen:
out.append(x)
seen.add(x)
return out
return items
def resolve_model_short_id(client: ExoClient, model_arg: str) -> tuple[str, str]:
@@ -240,7 +314,11 @@ def run_one_completion(
stats = out.get("generation_stats")
preview = (out.get("choices") or [{}])[0]["message"]["content"][:200]
# Extract preview, handling None content (common for thinking models)
choices = out.get("choices") or [{}]
message = choices[0].get("message", {}) if choices else {}
content = message.get("content") or ""
preview = content[:200] if content else ""
return {
"elapsed_s": elapsed,
@@ -277,12 +355,29 @@ class PromptSizer:
f"Target ({target}) is smaller than template overhead ({self.base_tokens})."
)
content = ""
tok = self.count_fn(content)
# Estimate tokens per atom using a sample
sample_count = 100
sample_content = self.atom * sample_count
sample_tokens = self.count_fn(sample_content) - self.base_tokens
tokens_per_atom = sample_tokens / sample_count
while tok < target:
content += self.atom
tok = self.count_fn(content)
# Estimate starting point
needed_tokens = target - self.base_tokens
estimated_atoms = int(needed_tokens / tokens_per_atom)
# Binary search to find exact atom count
low, high = 0, estimated_atoms * 2 + 100
while low < high:
mid = (low + high) // 2
tok = self.count_fn(self.atom * mid)
if tok < target:
low = mid + 1
else:
high = mid
content = self.atom * low
tok = self.count_fn(content)
logger.info(f"{tok=}")
if tok != target:
raise RuntimeError(
@@ -348,7 +443,7 @@ def main() -> int:
help="Warmup runs per placement (uses first pp/tg).",
)
ap.add_argument(
"--timeout", type=float, default=600.0, help="HTTP timeout (seconds)."
"--timeout", type=float, default=7200.0, help="HTTP timeout (seconds)."
)
ap.add_argument(
"--json-out",
@@ -358,6 +453,11 @@ def main() -> int:
ap.add_argument(
"--dry-run", action="store_true", help="List selected placements and exit."
)
ap.add_argument(
"--all-combinations",
action="store_true",
help="Force all pp×tg combinations (cartesian product) even when lists have equal length.",
)
args = ap.parse_args()
pp_list = parse_int_list(args.pp)
@@ -369,6 +469,15 @@ def main() -> int:
logger.error("--repeat must be >= 1")
return 2
# Log pairing mode
use_combinations = args.all_combinations or len(pp_list) != len(tg_list)
if use_combinations:
logger.info(
f"pp/tg mode: combinations (product) - {len(pp_list) * len(tg_list)} pairs"
)
else:
logger.info(f"pp/tg mode: tandem (zip) - {len(pp_list)} pairs")
client = ExoClient(args.host, args.port, timeout_s=args.timeout)
short_id, full_model_id = resolve_model_short_id(client, args.model)
@@ -377,10 +486,7 @@ def main() -> int:
)
previews = previews_resp.get("previews") or []
tokenizer = AutoTokenizer.from_pretrained(
full_model_id,
trust_remote_code=True,
)
tokenizer = load_tokenizer_for_bench(full_model_id)
if tokenizer is None:
raise RuntimeError("[exo-bench] tokenizer load failed")
@@ -486,60 +592,55 @@ def main() -> int:
)
logger.debug(f" warmup {i + 1}/{args.warmup} done")
for pp in pp_list:
# if (
# pp * n_nodes > 2048
# and "ring" in instance_meta.lower()
# and "tensor" in sharding.lower()
# ):
# model_card = MODEL_CARDS[short_id]
# if model_card.metadata.storage_size > Memory.from_gb(10):
# logger.info(
# f"Skipping tensor ring as this is too slow for model of size {model_card.metadata.storage_size} on {n_nodes=}"
# )
# continue
for tg in tg_list:
runs: list[dict[str, Any]] = []
for r in range(args.repeat):
time.sleep(3)
try:
row, actual_pp_tokens = run_one_completion(
client, full_model_id, pp, tg, prompt_sizer
)
except Exception as e:
logger.error(e)
continue
row.update(
{
"model_short_id": short_id,
"model_id": full_model_id,
"placement_sharding": sharding,
"placement_instance_meta": instance_meta,
"placement_nodes": n_nodes,
"instance_id": instance_id,
"pp_tokens": actual_pp_tokens,
"tg": tg,
"repeat_index": r,
}
)
runs.append(row)
all_rows.append(row)
# If pp and tg lists have same length, run in tandem (zip)
# Otherwise (or if --all-combinations), run all combinations (cartesian product)
if use_combinations:
pp_tg_pairs = list(itertools.product(pp_list, tg_list))
else:
pp_tg_pairs = list(zip(pp_list, tg_list, strict=True))
if runs:
prompt_tps = mean(x["stats"]["prompt_tps"] for x in runs)
gen_tps = mean(x["stats"]["generation_tps"] for x in runs)
ptok = mean(x["stats"]["prompt_tokens"] for x in runs)
gtok = mean(x["stats"]["generation_tokens"] for x in runs)
peak = mean(
x["stats"]["peak_memory_usage"]["inBytes"] for x in runs
for pp, tg in pp_tg_pairs:
runs: list[dict[str, Any]] = []
for r in range(args.repeat):
time.sleep(3)
try:
row, actual_pp_tokens = run_one_completion(
client, full_model_id, pp, tg, prompt_sizer
)
except Exception as e:
logger.error(e)
continue
row.update(
{
"model_short_id": short_id,
"model_id": full_model_id,
"placement_sharding": sharding,
"placement_instance_meta": instance_meta,
"placement_nodes": n_nodes,
"instance_id": instance_id,
"pp_tokens": actual_pp_tokens,
"tg": tg,
"repeat_index": r,
}
)
runs.append(row)
all_rows.append(row)
logger.info(
f"prompt_tps={prompt_tps:.2f} gen_tps={gen_tps:.2f} "
f"prompt_tokens={ptok} gen_tokens={gtok} "
f"peak_memory={format_peak_memory(peak)}\n"
)
time.sleep(2)
if runs:
prompt_tps = mean(x["stats"]["prompt_tps"] for x in runs)
gen_tps = mean(x["stats"]["generation_tps"] for x in runs)
ptok = mean(x["stats"]["prompt_tokens"] for x in runs)
gtok = mean(x["stats"]["generation_tokens"] for x in runs)
peak = mean(
x["stats"]["peak_memory_usage"]["inBytes"] for x in runs
)
logger.info(
f"prompt_tps={prompt_tps:.2f} gen_tps={gen_tps:.2f} "
f"prompt_tokens={ptok} gen_tokens={gtok} "
f"peak_memory={format_peak_memory(peak)}\n"
)
time.sleep(2)
finally:
try:
client.request_json("DELETE", f"/instance/{instance_id}")

View File

@@ -865,7 +865,6 @@
"integrity": "sha512-oH8tXw7EZnie8FdOWYrF7Yn4IKrqTFHhXvl8YxXxbKwTMcD/5NNCryUSEXRk2ZR4ojnub0P8rNrsVGHXWqIDtA==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@standard-schema/spec": "^1.0.0",
"@sveltejs/acorn-typescript": "^1.0.5",
@@ -905,7 +904,6 @@
"integrity": "sha512-Y1Cs7hhTc+a5E9Va/xwKlAJoariQyHY+5zBgCZg4PFWNYQ1nMN9sjK1zhw1gK69DuqVP++sht/1GZg1aRwmAXQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@sveltejs/vite-plugin-svelte-inspector": "^4.0.1",
"debug": "^4.4.1",
@@ -1522,7 +1520,6 @@
"integrity": "sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"undici-types": "~6.21.0"
}
@@ -1532,7 +1529,6 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz",
"integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
"license": "MIT",
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -1945,7 +1941,6 @@
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"dev": true,
"license": "ISC",
"peer": true,
"engines": {
"node": ">=12"
}
@@ -2653,7 +2648,6 @@
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@@ -2696,7 +2690,6 @@
"integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==",
"dev": true,
"license": "MIT",
"peer": true,
"bin": {
"prettier": "bin/prettier.cjs"
},
@@ -2869,7 +2862,6 @@
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.45.3.tgz",
"integrity": "sha512-ngKXNhNvwPzF43QqEhDOue7TQTrG09em1sd4HBxVF0Wr2gopAmdEWan+rgbdgK4fhBtSOTJO8bYU4chUG7VXZQ==",
"license": "MIT",
"peer": true,
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",
@@ -3014,7 +3006,6 @@
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"dev": true,
"license": "Apache-2.0",
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -3036,7 +3027,6 @@
"integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.4.4",

View File

@@ -6,11 +6,13 @@
deleteMessage,
editAndRegenerate,
regenerateLastResponse,
regenerateFromToken,
setEditingImage,
} from "$lib/stores/app.svelte";
import type { Message } from "$lib/stores/app.svelte";
import type { MessageAttachment } from "$lib/stores/app.svelte";
import MarkdownContent from "./MarkdownContent.svelte";
import TokenHeatmap from "./TokenHeatmap.svelte";
interface Props {
class?: string;
@@ -99,6 +101,23 @@
let copiedMessageId = $state<string | null>(null);
let expandedThinkingMessageIds = $state<Set<string>>(new Set());
// Uncertainty heatmap toggle
let heatmapMessageIds = $state<Set<string>>(new Set());
function toggleHeatmap(messageId: string) {
const next = new Set(heatmapMessageIds);
if (next.has(messageId)) {
next.delete(messageId);
} else {
next.add(messageId);
}
heatmapMessageIds = next;
}
function isHeatmapVisible(messageId: string): boolean {
return heatmapMessageIds.has(messageId);
}
function formatTimestamp(timestamp: number): string {
return new Date(timestamp).toLocaleTimeString("en-US", {
hour12: false,
@@ -548,13 +567,23 @@
>
</div>
{:else if message.content || (loading && !message.attachments?.some((a) => a.type === "generated-image"))}
<MarkdownContent
content={message.content || (loading ? response : "")}
/>
{#if loading && !message.content}
<span
class="inline-block w-2 h-4 bg-exo-yellow/70 ml-1 cursor-blink"
></span>
{#if isHeatmapVisible(message.id) && message.tokens && message.tokens.length > 0}
<TokenHeatmap
tokens={message.tokens}
isGenerating={loading &&
isLastAssistantMessage(message.id)}
onRegenerateFrom={(tokenIndex) =>
regenerateFromToken(message.id, tokenIndex)}
/>
{:else}
<MarkdownContent
content={message.content || (loading ? response : "")}
/>
{#if loading && !message.content}
<span
class="inline-block w-2 h-4 bg-exo-yellow/70 ml-1 cursor-blink"
></span>
{/if}
{/if}
{/if}
</div>
@@ -629,6 +658,35 @@
</button>
{/if}
<!-- Uncertainty heatmap toggle (assistant messages with tokens) -->
{#if message.role === "assistant" && message.tokens && message.tokens.length > 0}
<button
onclick={() => toggleHeatmap(message.id)}
class="p-1.5 transition-colors rounded cursor-pointer {isHeatmapVisible(
message.id,
)
? 'text-exo-yellow'
: 'text-exo-light-gray hover:text-exo-yellow'}"
title={isHeatmapVisible(message.id)
? "Hide uncertainty heatmap"
: "Show uncertainty heatmap"}
>
<svg
class="w-3.5 h-3.5"
fill="none"
viewBox="0 0 24 24"
stroke="currentColor"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M9 19v-6a2 2 0 00-2-2H5a2 2 0 00-2 2v6a2 2 0 002 2h2a2 2 0 002-2zm0 0V9a2 2 0 012-2h2a2 2 0 012 2v10m-6 0a2 2 0 002 2h2a2 2 0 002-2m0 0V5a2 2 0 012-2h2a2 2 0 012 2v14a2 2 0 01-2 2h-2a2 2 0 01-2-2z"
/>
</svg>
</button>
{/if}
<!-- Regenerate button (last assistant message only) -->
{#if message.role === "assistant" && isLastAssistantMessage(message.id) && !loading}
<button


@@ -0,0 +1,229 @@
<script lang="ts">
import type { TokenData } from "$lib/stores/app.svelte";
interface Props {
tokens: TokenData[];
class?: string;
isGenerating?: boolean;
onRegenerateFrom?: (tokenIndex: number) => void;
}
let {
tokens,
class: className = "",
isGenerating = false,
onRegenerateFrom,
}: Props = $props();
// Tooltip state - track both token data and index
let hoveredTokenIndex = $state<number | null>(null);
let hoveredPosition = $state<{ x: number; y: number } | null>(null);
let isTooltipHovered = $state(false);
let hideTimeoutId: ReturnType<typeof setTimeout> | null = null;
// Derive the hovered token from the index (stable across re-renders)
const hoveredToken = $derived(
hoveredTokenIndex !== null && hoveredPosition && tokens[hoveredTokenIndex]
? {
token: tokens[hoveredTokenIndex],
index: hoveredTokenIndex,
...hoveredPosition,
}
: null,
);
/**
* Get confidence styling based on probability.
* Following Apple design principles: high confidence tokens blend in,
* only uncertainty draws attention.
*/
function getConfidenceClass(probability: number): string {
if (probability > 0.8) return "text-inherit"; // Expected tokens - blend in
if (probability > 0.5) return "bg-gray-500/10 text-inherit"; // Slight hint
if (probability > 0.2) return "bg-amber-500/15 text-amber-200/90"; // Subtle warmth
return "bg-red-500/20 text-red-200/90"; // Draws attention
}
/**
* Get border/underline styling for uncertain tokens
*/
function getBorderClass(probability: number): string {
if (probability > 0.8) return "border-transparent"; // No border for expected
if (probability > 0.5) return "border-gray-500/20";
if (probability > 0.2) return "border-amber-500/30";
return "border-red-500/40";
}
function clearHideTimeout() {
if (hideTimeoutId) {
clearTimeout(hideTimeoutId);
hideTimeoutId = null;
}
}
function handleMouseEnter(
event: MouseEvent,
token: TokenData,
index: number,
) {
clearHideTimeout();
const rect = (event.target as HTMLElement).getBoundingClientRect();
hoveredTokenIndex = index;
hoveredPosition = {
x: rect.left + rect.width / 2,
y: rect.top - 10,
};
}
function handleMouseLeave() {
clearHideTimeout();
// Use longer delay during generation to account for re-renders
const delay = isGenerating ? 300 : 100;
hideTimeoutId = setTimeout(() => {
if (!isTooltipHovered) {
hoveredTokenIndex = null;
hoveredPosition = null;
}
}, delay);
}
function handleTooltipEnter() {
clearHideTimeout();
isTooltipHovered = true;
}
function handleTooltipLeave() {
isTooltipHovered = false;
hoveredTokenIndex = null;
hoveredPosition = null;
}
function handleRegenerate() {
if (hoveredToken && onRegenerateFrom) {
const indexToRegenerate = hoveredToken.index;
// Clear hover state immediately
hoveredTokenIndex = null;
hoveredPosition = null;
isTooltipHovered = false;
// Call regenerate
onRegenerateFrom(indexToRegenerate);
}
}
function formatProbability(prob: number): string {
return (prob * 100).toFixed(1) + "%";
}
function formatLogprob(logprob: number): string {
return logprob.toFixed(3);
}
function getProbabilityColor(probability: number): string {
if (probability > 0.8) return "text-gray-300";
if (probability > 0.5) return "text-gray-400";
if (probability > 0.2) return "text-amber-400";
return "text-red-400";
}
</script>
<div class="token-heatmap leading-relaxed {className}">
{#each tokens as tokenData, i (i)}
<span
role="button"
tabindex="0"
class="token-span inline rounded px-0.5 py-0.5 cursor-pointer transition-all duration-150 border {getConfidenceClass(
tokenData.probability,
)} {getBorderClass(tokenData.probability)} hover:opacity-80"
onmouseenter={(e) => handleMouseEnter(e, tokenData, i)}
onmouseleave={handleMouseLeave}>{tokenData.token}</span
>
{/each}
</div>
<!-- Tooltip -->
{#if hoveredToken}
<div
class="fixed z-50"
style="left: {hoveredToken.x}px; top: {hoveredToken.y}px; transform: translate(-50%, -100%);"
onmouseenter={handleTooltipEnter}
onmouseleave={handleTooltipLeave}
>
<div
class="bg-gray-900/95 backdrop-blur-sm border border-gray-700/50 rounded-xl shadow-xl p-3 text-sm min-w-48"
>
<!-- Token info -->
<div class="mb-2">
<span class="text-gray-500 text-xs">Token:</span>
<span class="text-white font-mono ml-1"
>"{hoveredToken.token.token}"</span
>
<span class="{getProbabilityColor(hoveredToken.token.probability)} ml-2"
>{formatProbability(hoveredToken.token.probability)}</span
>
</div>
<div class="text-gray-400 text-xs mb-1">
logprob: <span class="text-gray-300 font-mono"
>{formatLogprob(hoveredToken.token.logprob)}</span
>
</div>
<!-- Top alternatives -->
{#if hoveredToken.token.topLogprobs.length > 0}
<div class="border-t border-gray-700/50 mt-2 pt-2">
<div class="text-gray-500 text-xs mb-1">Alternatives:</div>
{#each hoveredToken.token.topLogprobs.slice(0, 5) as alt, idx (idx)}
{@const altProb = Math.exp(alt.logprob)}
<div class="flex justify-between items-center text-xs py-0.5">
<span class="text-gray-300 font-mono truncate max-w-24"
>"{alt.token}"</span
>
<span class="text-gray-400 ml-2"
>{formatProbability(altProb)}</span
>
</div>
{/each}
</div>
{/if}
<!-- Regenerate button -->
{#if onRegenerateFrom}
<button
onclick={handleRegenerate}
class="w-full mt-2 pt-2 border-t border-gray-700/50 flex items-center justify-center gap-1.5 text-xs text-gray-400 hover:text-white transition-colors cursor-pointer"
>
<svg
class="w-3 h-3"
fill="none"
viewBox="0 0 24 24"
stroke="currentColor"
>
<path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15"
/>
</svg>
Regenerate from here
</button>
{/if}
</div>
<!-- Arrow -->
<div class="absolute left-1/2 -translate-x-1/2 top-full">
<div class="border-8 border-transparent border-t-gray-900"></div>
</div>
</div>
{/if}
<style>
.token-heatmap {
word-wrap: break-word;
white-space: pre-wrap;
}
.token-span {
margin: 0;
border-width: 1px;
}
</style>
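
The heatmap colours each token by the model's confidence, which the dashboard derives from the streamed logprob via probability = exp(logprob) and then buckets at the 0.8 / 0.5 / 0.2 thresholds used in `getConfidenceClass`. A minimal sketch of that mapping, assuming the `TokenData` shape from the dashboard store; the `bucket` helper is hypothetical and only mirrors the component's thresholds:

```
// Minimal sketch: how a streamed logprob becomes a heatmap bucket.
// `TokenData` mirrors the dashboard store type; `bucket` is a hypothetical
// helper following the same 0.8 / 0.5 / 0.2 thresholds as getConfidenceClass.
interface TokenData {
  token: string;
  logprob: number;
  probability: number;
  topLogprobs: { token: string; logprob: number; bytes: number[] | null }[];
}

function toTokenData(token: string, logprob: number): TokenData {
  return { token, logprob, probability: Math.exp(logprob), topLogprobs: [] };
}

function bucket(p: number): "expected" | "hint" | "uncertain" | "very-uncertain" {
  if (p > 0.8) return "expected";   // blends in with surrounding text
  if (p > 0.5) return "hint";       // slight gray tint
  if (p > 0.2) return "uncertain";  // amber
  return "very-uncertain";          // red, draws attention
}

const t = toTokenData("Paris", -0.105); // exp(-0.105) ≈ 0.90
console.log(bucket(t.probability));     // "expected"
```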

View File

@@ -173,6 +173,11 @@ export interface PlacementPreviewResponse {
previews: PlacementPreview[];
}
interface ImageApiResponse {
created: number;
data: Array<{ b64_json?: string; url?: string }>;
}
interface RawStateResponse {
topology?: RawTopology;
instances?: Record<
@@ -207,6 +212,19 @@ export interface MessageAttachment {
mimeType?: string;
}
export interface TopLogprob {
token: string;
logprob: number;
bytes: number[] | null;
}
export interface TokenData {
token: string;
logprob: number;
probability: number;
topLogprobs: TopLogprob[];
}
export interface Message {
id: string;
role: "user" | "assistant" | "system";
@@ -218,6 +236,7 @@ export interface Message {
tps?: number; // Tokens per second (for assistant messages)
requestType?: "chat" | "image-generation" | "image-editing";
sourceImageDataUrl?: string; // For image editing regeneration
tokens?: TokenData[];
}
export interface Conversation {
@@ -505,7 +524,18 @@ class AppStore {
*/
private saveConversationsToStorage() {
try {
localStorage.setItem(STORAGE_KEY, JSON.stringify(this.conversations));
// Strip tokens from messages before saving to avoid bloating localStorage
const stripped = this.conversations.map((conv) => ({
...conv,
messages: conv.messages.map((msg) => {
if (msg.tokens) {
const { tokens: _, ...rest } = msg;
return rest;
}
return msg;
}),
}));
localStorage.setItem(STORAGE_KEY, JSON.stringify(stripped));
} catch (error) {
console.error("Failed to save conversations:", error);
}
@@ -1410,6 +1440,213 @@ class AppStore {
}
}
/**
* Regenerate response from a specific token index.
* Truncates the assistant message at the given token and re-generates from there.
*/
async regenerateFromToken(
messageId: string,
tokenIndex: number,
): Promise<void> {
if (this.isLoading) return;
const targetConversationId = this.activeConversationId;
if (!targetConversationId) return;
const msgIndex = this.messages.findIndex((m) => m.id === messageId);
if (msgIndex === -1) return;
const msg = this.messages[msgIndex];
if (
msg.role !== "assistant" ||
!msg.tokens ||
tokenIndex >= msg.tokens.length
)
return;
// Keep tokens up to (not including) the specified index
const tokensToKeep = msg.tokens.slice(0, tokenIndex);
const prefixText = tokensToKeep.map((t) => t.token).join("");
// Remove all messages after this assistant message
this.messages = this.messages.slice(0, msgIndex + 1);
// Update the message to show the prefix
this.messages[msgIndex].content = prefixText;
this.messages[msgIndex].tokens = tokensToKeep;
this.updateActiveConversation();
// Set up for continuation - modify the existing message in place
this.isLoading = true;
this.currentResponse = prefixText;
this.ttftMs = null;
this.tps = null;
this.totalTokens = tokensToKeep.length;
try {
// Build messages for API - include the partial assistant message
const systemPrompt = {
role: "system" as const,
content:
"You are a helpful AI assistant. Respond directly and concisely. Do not show your reasoning or thought process.",
};
const apiMessages = [
systemPrompt,
...this.messages.map((m) => {
let msgContent = m.content;
if (m.attachments) {
for (const attachment of m.attachments) {
if (attachment.type === "text" && attachment.content) {
msgContent += `\n\n[File: ${attachment.name}]\n\`\`\`\n${attachment.content}\n\`\`\``;
}
}
}
return { role: m.role, content: msgContent };
}),
];
const modelToUse = this.getModelForRequest();
if (!modelToUse) {
throw new Error("No model available");
}
const requestStartTime = performance.now();
let firstTokenTime: number | null = null;
let tokenCount = tokensToKeep.length;
const response = await fetch("/v1/chat/completions", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
model: modelToUse,
messages: apiMessages,
stream: true,
logprobs: true,
top_logprobs: 5,
}),
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`API error: ${response.status} - ${errorText}`);
}
const reader = response.body?.getReader();
if (!reader) throw new Error("No response body");
let fullContent = prefixText;
const collectedTokens: TokenData[] = [...tokensToKeep];
interface ChatCompletionChunk {
choices?: Array<{
delta?: { content?: string };
logprobs?: {
content?: Array<{
token: string;
logprob: number;
top_logprobs?: Array<{
token: string;
logprob: number;
bytes: number[] | null;
}>;
}>;
};
}>;
}
await this.parseSSEStream<ChatCompletionChunk>(
reader,
targetConversationId,
(parsed) => {
const choice = parsed.choices?.[0];
const delta = choice?.delta?.content;
// Collect logprobs data
const logprobsContent = choice?.logprobs?.content;
if (logprobsContent) {
for (const item of logprobsContent) {
collectedTokens.push({
token: item.token,
logprob: item.logprob,
probability: Math.exp(item.logprob),
topLogprobs: (item.top_logprobs || []).map((t) => ({
token: t.token,
logprob: t.logprob,
bytes: t.bytes,
})),
});
}
}
if (delta) {
if (firstTokenTime === null) {
firstTokenTime = performance.now();
this.ttftMs = firstTokenTime - requestStartTime;
}
tokenCount += 1;
this.totalTokens = tokenCount;
if (firstTokenTime !== null && tokenCount > tokensToKeep.length) {
const elapsed = performance.now() - firstTokenTime;
this.tps = ((tokenCount - tokensToKeep.length) / elapsed) * 1000;
}
fullContent += delta;
const { displayContent, thinkingContent } =
this.stripThinkingTags(fullContent);
if (this.activeConversationId === targetConversationId) {
this.currentResponse = displayContent;
}
// Update existing message in place
this.updateConversationMessage(
targetConversationId,
messageId,
(m) => {
m.content = displayContent;
m.thinking = thinkingContent || undefined;
m.tokens = [...collectedTokens];
},
);
this.syncActiveMessagesIfNeeded(targetConversationId);
this.persistConversation(targetConversationId);
}
},
);
// Final update
if (this.conversationExists(targetConversationId)) {
const { displayContent, thinkingContent } =
this.stripThinkingTags(fullContent);
this.updateConversationMessage(targetConversationId, messageId, (m) => {
m.content = displayContent;
m.thinking = thinkingContent || undefined;
m.tokens = [...collectedTokens];
if (this.ttftMs !== null) m.ttftMs = this.ttftMs;
if (this.tps !== null) m.tps = this.tps;
});
this.syncActiveMessagesIfNeeded(targetConversationId);
this.persistConversation(targetConversationId);
}
} catch (error) {
console.error("Error regenerating from token:", error);
if (this.conversationExists(targetConversationId)) {
this.updateConversationMessage(targetConversationId, messageId, (m) => {
m.content = `${prefixText}\n\nError: ${error instanceof Error ? error.message : "Unknown error"}`;
});
this.syncActiveMessagesIfNeeded(targetConversationId);
this.persistConversation(targetConversationId);
}
} finally {
this.isLoading = false;
this.currentResponse = "";
this.saveConversationsToStorage();
}
}
/**
* Helper method to regenerate a chat completion response
*/
@@ -1478,6 +1715,8 @@ class AppStore {
model: modelToUse,
messages: apiMessages,
stream: true,
logprobs: true,
top_logprobs: 5,
}),
});
@@ -1492,16 +1731,49 @@ class AppStore {
}
let streamedContent = "";
const collectedTokens: TokenData[] = [];
interface ChatCompletionChunk {
choices?: Array<{ delta?: { content?: string } }>;
choices?: Array<{
delta?: { content?: string };
logprobs?: {
content?: Array<{
token: string;
logprob: number;
top_logprobs?: Array<{
token: string;
logprob: number;
bytes: number[] | null;
}>;
}>;
};
}>;
}
await this.parseSSEStream<ChatCompletionChunk>(
reader,
targetConversationId,
(parsed) => {
const delta = parsed.choices?.[0]?.delta?.content;
const choice = parsed.choices?.[0];
const delta = choice?.delta?.content;
// Collect logprobs data
const logprobsContent = choice?.logprobs?.content;
if (logprobsContent) {
for (const item of logprobsContent) {
collectedTokens.push({
token: item.token,
logprob: item.logprob,
probability: Math.exp(item.logprob),
topLogprobs: (item.top_logprobs || []).map((t) => ({
token: t.token,
logprob: t.logprob,
bytes: t.bytes,
})),
});
}
}
if (delta) {
streamedContent += delta;
const { displayContent, thinkingContent } =
@@ -1519,6 +1791,7 @@ class AppStore {
(msg) => {
msg.content = displayContent;
msg.thinking = thinkingContent || undefined;
msg.tokens = [...collectedTokens];
},
);
this.syncActiveMessagesIfNeeded(targetConversationId);
@@ -1537,6 +1810,7 @@ class AppStore {
(msg) => {
msg.content = displayContent;
msg.thinking = thinkingContent || undefined;
msg.tokens = [...collectedTokens];
},
);
this.syncActiveMessagesIfNeeded(targetConversationId);
@@ -2095,107 +2369,137 @@ class AppStore {
throw new Error(`API error: ${response.status} - ${errorText}`);
}
const reader = response.body?.getReader();
if (!reader) {
throw new Error("No response body");
}
// Streaming requires both stream=true AND partialImages > 0
const isStreaming = params.stream && params.partialImages > 0;
interface ImageGenerationChunk {
data?: { b64_json?: string };
format?: string;
type?: "partial" | "final";
image_index?: number;
partial_index?: number;
total_partials?: number;
}
if (!isStreaming) {
// Non-streaming: parse JSON response directly
const jsonResponse = (await response.json()) as ImageApiResponse;
const format = params.outputFormat || "png";
const mimeType = `image/${format}`;
const numImages = params.numImages;
const attachments: MessageAttachment[] = jsonResponse.data
.filter((img) => img.b64_json)
.map((img, index) => ({
type: "generated-image" as const,
name: `generated-image-${index + 1}.${format}`,
preview: `data:${mimeType};base64,${img.b64_json}`,
mimeType,
}));
await this.parseSSEStream<ImageGenerationChunk>(
reader,
targetConversationId,
(parsed) => {
const imageData = parsed.data?.b64_json;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = "";
msg.attachments = attachments;
},
);
this.syncActiveMessagesIfNeeded(targetConversationId);
} else {
// Streaming mode: use SSE parser
const reader = response.body?.getReader();
if (!reader) {
throw new Error("No response body");
}
if (imageData) {
const format = parsed.format || "png";
const mimeType = `image/${format}`;
const imageIndex = parsed.image_index ?? 0;
interface ImageGenerationChunk {
data?: { b64_json?: string };
format?: string;
type?: "partial" | "final";
image_index?: number;
partial_index?: number;
total_partials?: number;
}
if (parsed.type === "partial") {
// Update with partial image and progress
const partialNum = (parsed.partial_index ?? 0) + 1;
const totalPartials = parsed.total_partials ?? 3;
const progressText =
numImages > 1
? `Generating image ${imageIndex + 1}/${numImages}... ${partialNum}/${totalPartials}`
: `Generating... ${partialNum}/${totalPartials}`;
const numImages = params.numImages;
const partialAttachment: MessageAttachment = {
type: "generated-image",
name: `generated-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
};
await this.parseSSEStream<ImageGenerationChunk>(
reader,
targetConversationId,
(parsed) => {
const imageData = parsed.data?.b64_json;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = progressText;
if (imageIndex === 0) {
// First image - safe to replace attachments with partial preview
msg.attachments = [partialAttachment];
} else {
// Subsequent images - keep existing finals, show partial at current position
const existingAttachments = msg.attachments || [];
// Keep only the completed final images (up to current imageIndex)
const finals = existingAttachments.slice(0, imageIndex);
msg.attachments = [...finals, partialAttachment];
}
},
);
} else if (parsed.type === "final") {
// Final image - replace partial at this position
const newAttachment: MessageAttachment = {
type: "generated-image",
name: `generated-image-${imageIndex + 1}.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
};
if (imageData) {
const format = parsed.format || "png";
const mimeType = `image/${format}`;
const imageIndex = parsed.image_index ?? 0;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
if (imageIndex === 0) {
// First final image - replace any partial preview
msg.attachments = [newAttachment];
} else {
// Subsequent images - keep previous finals, replace partial at current position
const existingAttachments = msg.attachments || [];
// Slice keeps indices 0 to imageIndex-1 (the previous final images)
const previousFinals = existingAttachments.slice(
0,
imageIndex,
);
msg.attachments = [...previousFinals, newAttachment];
}
if (parsed.type === "partial") {
// Update with partial image and progress
const partialNum = (parsed.partial_index ?? 0) + 1;
const totalPartials = parsed.total_partials ?? 3;
const progressText =
numImages > 1
? `Generating image ${imageIndex + 1}/${numImages}... ${partialNum}/${totalPartials}`
: `Generating... ${partialNum}/${totalPartials}`;
// Update progress message for multiple images
if (numImages > 1 && imageIndex < numImages - 1) {
msg.content = `Generating image ${imageIndex + 2}/${numImages}...`;
} else {
msg.content = "";
}
},
);
const partialAttachment: MessageAttachment = {
type: "generated-image",
name: `generated-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
};
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = progressText;
if (imageIndex === 0) {
// First image - safe to replace attachments with partial preview
msg.attachments = [partialAttachment];
} else {
// Subsequent images - keep existing finals, show partial at current position
const existingAttachments = msg.attachments || [];
// Keep only the completed final images (up to current imageIndex)
const finals = existingAttachments.slice(0, imageIndex);
msg.attachments = [...finals, partialAttachment];
}
},
);
} else if (parsed.type === "final") {
// Final image - replace partial at this position
const newAttachment: MessageAttachment = {
type: "generated-image",
name: `generated-image-${imageIndex + 1}.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
};
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
if (imageIndex === 0) {
// First final image - replace any partial preview
msg.attachments = [newAttachment];
} else {
// Subsequent images - keep previous finals, replace partial at current position
const existingAttachments = msg.attachments || [];
// Slice keeps indices 0 to imageIndex-1 (the previous final images)
const previousFinals = existingAttachments.slice(
0,
imageIndex,
);
msg.attachments = [...previousFinals, newAttachment];
}
// Update progress message for multiple images
if (numImages > 1 && imageIndex < numImages - 1) {
msg.content = `Generating image ${imageIndex + 2}/${numImages}...`;
} else {
msg.content = "";
}
},
);
}
this.syncActiveMessagesIfNeeded(targetConversationId);
}
this.syncActiveMessagesIfNeeded(targetConversationId);
}
},
);
},
);
}
} catch (error) {
console.error("Error generating image:", error);
this.handleStreamingError(
@@ -2343,69 +2647,98 @@ class AppStore {
throw new Error(`API error: ${apiResponse.status} - ${errorText}`);
}
const reader = apiResponse.body?.getReader();
if (!reader) {
throw new Error("No response body");
}
// Streaming requires both stream=true AND partialImages > 0
const isStreaming = params.stream && params.partialImages > 0;
interface ImageEditChunk {
data?: { b64_json?: string };
format?: string;
type?: "partial" | "final";
partial_index?: number;
total_partials?: number;
}
if (!isStreaming) {
// Non-streaming: parse JSON response directly
const jsonResponse = (await apiResponse.json()) as ImageApiResponse;
const format = params.outputFormat || "png";
const mimeType = `image/${format}`;
const attachments: MessageAttachment[] = jsonResponse.data
.filter((img) => img.b64_json)
.map((img) => ({
type: "generated-image" as const,
name: `edited-image.${format}`,
preview: `data:${mimeType};base64,${img.b64_json}`,
mimeType,
}));
await this.parseSSEStream<ImageEditChunk>(
reader,
targetConversationId,
(parsed) => {
const imageData = parsed.data?.b64_json;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = "";
msg.attachments = attachments;
},
);
this.syncActiveMessagesIfNeeded(targetConversationId);
} else {
// Streaming mode: use SSE parser
const reader = apiResponse.body?.getReader();
if (!reader) {
throw new Error("No response body");
}
if (imageData) {
const format = parsed.format || "png";
const mimeType = `image/${format}`;
if (parsed.type === "partial") {
// Update with partial image and progress
const partialNum = (parsed.partial_index ?? 0) + 1;
const totalPartials = parsed.total_partials ?? 3;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = `Editing... ${partialNum}/${totalPartials}`;
msg.attachments = [
{
type: "generated-image",
name: `edited-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
},
];
},
);
} else if (parsed.type === "final") {
// Final image
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = "";
msg.attachments = [
{
type: "generated-image",
name: `edited-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
},
];
},
);
interface ImageEditChunk {
data?: { b64_json?: string };
format?: string;
type?: "partial" | "final";
partial_index?: number;
total_partials?: number;
}
await this.parseSSEStream<ImageEditChunk>(
reader,
targetConversationId,
(parsed) => {
const imageData = parsed.data?.b64_json;
if (imageData) {
const format = parsed.format || "png";
const mimeType = `image/${format}`;
if (parsed.type === "partial") {
// Update with partial image and progress
const partialNum = (parsed.partial_index ?? 0) + 1;
const totalPartials = parsed.total_partials ?? 3;
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = `Editing... ${partialNum}/${totalPartials}`;
msg.attachments = [
{
type: "generated-image",
name: `edited-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
},
];
},
);
} else if (parsed.type === "final") {
// Final image
this.updateConversationMessage(
targetConversationId,
assistantMessage.id,
(msg) => {
msg.content = "";
msg.attachments = [
{
type: "generated-image",
name: `edited-image.${format}`,
preview: `data:${mimeType};base64,${imageData}`,
mimeType,
},
];
},
);
}
this.syncActiveMessagesIfNeeded(targetConversationId);
}
this.syncActiveMessagesIfNeeded(targetConversationId);
}
},
);
},
);
}
} catch (error) {
console.error("Error editing image:", error);
this.handleStreamingError(
@@ -2556,6 +2889,8 @@ export const editMessage = (messageId: string, newContent: string) =>
export const editAndRegenerate = (messageId: string, newContent: string) =>
appStore.editAndRegenerate(messageId, newContent);
export const regenerateLastResponse = () => appStore.regenerateLastResponse();
export const regenerateFromToken = (messageId: string, tokenIndex: number) =>
appStore.regenerateFromToken(messageId, tokenIndex);
// Conversation actions
export const conversations = () => appStore.conversations;

65 flake.lock generated

@@ -21,7 +21,9 @@
"nixpkgs"
],
"purescript-overlay": "purescript-overlay",
"pyproject-nix": "pyproject-nix"
"pyproject-nix": [
"pyproject-nix"
]
},
"locked": {
"lastModified": 1765953015,
@@ -149,19 +151,44 @@
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-nix": [
"pyproject-nix"
],
"uv2nix": [
"uv2nix"
]
},
"locked": {
"lastModified": 1763662255,
"narHash": "sha256-4bocaOyLa3AfiS8KrWjZQYu+IAta05u3gYZzZ6zXbT0=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "042904167604c681a090c07eb6967b4dd4dae88c",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"dream2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1763017646,
"narHash": "sha256-Z+R2lveIp6Skn1VPH3taQIuMhABg1IizJd8oVdmdHsQ=",
"lastModified": 1764134915,
"narHash": "sha256-xaKvtPx6YAnA3HQVp5LwyYG1MaN4LLehpQI8xEdBvBY=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "47bd6f296502842643078d66128f7b5e5370790c",
"rev": "2c8df1383b32e5443c921f61224b198a2282a657",
"type": "github"
},
"original": {
@@ -178,7 +205,10 @@
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs",
"nixpkgs-swift": "nixpkgs-swift",
"treefmt-nix": "treefmt-nix"
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"treefmt-nix": "treefmt-nix",
"uv2nix": "uv2nix"
}
},
"rust-analyzer-src": {
@@ -239,6 +269,29 @@
"repo": "treefmt-nix",
"type": "github"
}
},
"uv2nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"pyproject-nix": [
"pyproject-nix"
]
},
"locked": {
"lastModified": 1767701098,
"narHash": "sha256-CJhKZnWb3gumR9oTRjFvCg/6lYTGbZRU7xtvcyWIRwU=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "9d357f0d2ce6f5f35ec7959d7e704452352eb4da",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "uv2nix",
"type": "github"
}
}
},
"root": "root",


@@ -24,6 +24,26 @@
dream2nix = {
url = "github:nix-community/dream2nix";
inputs.nixpkgs.follows = "nixpkgs";
inputs.pyproject-nix.follows = "pyproject-nix";
};
# Python packaging with uv2nix
pyproject-nix = {
url = "github:pyproject-nix/pyproject.nix";
inputs.nixpkgs.follows = "nixpkgs";
};
uv2nix = {
url = "github:pyproject-nix/uv2nix";
inputs.pyproject-nix.follows = "pyproject-nix";
inputs.nixpkgs.follows = "nixpkgs";
};
pyproject-build-systems = {
url = "github:pyproject-nix/build-system-pkgs";
inputs.pyproject-nix.follows = "pyproject-nix";
inputs.uv2nix.follows = "uv2nix";
inputs.nixpkgs.follows = "nixpkgs";
};
# Pinned nixpkgs for swift-format (swift is broken on x86_64-linux in newer nixpkgs)
@@ -48,6 +68,7 @@
inputs.treefmt-nix.flakeModule
./dashboard/parts.nix
./rust/parts.nix
./python/parts.nix
];
perSystem =
@@ -58,6 +79,11 @@
pkgsSwift = import inputs.nixpkgs-swift { inherit system; };
in
{
# Allow unfree for metal-toolchain (needed for Darwin Metal packages)
_module.args.pkgs = import inputs.nixpkgs {
inherit system;
config.allowUnfreePredicate = pkg: (pkg.pname or "") == "metal-toolchain";
};
treefmt = {
projectRootFile = "flake.nix";
programs = {
@@ -79,14 +105,24 @@
enable = true;
package = pkgsSwift.swiftPackages.swift-format;
};
shfmt.enable = true;
};
};
checks.lint = pkgs.runCommand "lint-check" { } ''
export RUFF_CACHE_DIR="$TMPDIR/ruff-cache"
${pkgs.ruff}/bin/ruff check ${inputs.self}/
touch $out
'';
packages = lib.optionalAttrs pkgs.stdenv.hostPlatform.isDarwin (
let
uvLock = builtins.fromTOML (builtins.readFile ./uv.lock);
mlxPackage = builtins.head (builtins.filter (p: p.name == "mlx") uvLock.package);
uvLockMlxVersion = mlxPackage.version;
in
{
metal-toolchain = pkgs.callPackage ./nix/metal-toolchain.nix { };
mlx = pkgs.callPackage ./nix/mlx.nix {
metal-toolchain = self'.packages.metal-toolchain;
inherit uvLockMlxVersion;
};
}
);
devShells.default = with pkgs; pkgs.mkShell {
inputsFrom = [ self'.checks.cargo-build ];


@@ -1,7 +1,7 @@
export NIX_CONFIG := "extra-experimental-features = nix-command flakes"
fmt:
nix fmt
treefmt || nix fmt
lint:
uv run ruff check --fix


@@ -0,0 +1,79 @@
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0ed30932..d8528132 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -177,11 +177,7 @@ if(MLX_BUILD_METAL)
add_compile_definitions(MLX_METAL_DEBUG)
endif()
- # Throw an error if xcrun not found
- execute_process(
- COMMAND zsh "-c" "/usr/bin/xcrun -sdk macosx --show-sdk-version"
- OUTPUT_VARIABLE MACOS_SDK_VERSION
- OUTPUT_STRIP_TRAILING_WHITESPACE COMMAND_ERROR_IS_FATAL ANY)
+ set(MACOS_SDK_VERSION @sdkVersion@)
if(${MACOS_SDK_VERSION} LESS 14.0)
message(
@@ -199,11 +195,8 @@ if(MLX_BUILD_METAL)
endif()
set(XCRUN_FLAGS "-mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET}")
endif()
- execute_process(
- COMMAND
- zsh "-c"
- "echo \"__METAL_VERSION__\" | xcrun -sdk macosx metal ${XCRUN_FLAGS} -E -x metal -P - | tail -1 | tr -d '\n'"
- OUTPUT_VARIABLE MLX_METAL_VERSION COMMAND_ERROR_IS_FATAL ANY)
+ set(
+ MLX_METAL_VERSION @metalVersion@)
FetchContent_Declare(metal_cpp URL ${METAL_CPP_URL})
FetchContent_MakeAvailable(metal_cpp)
target_include_directories(
diff --git a/cmake/extension.cmake b/cmake/extension.cmake
index 13db804a..5b385132 100644
--- a/cmake/extension.cmake
+++ b/cmake/extension.cmake
@@ -36,7 +36,7 @@ macro(mlx_build_metallib)
add_custom_command(
OUTPUT ${MTLLIB_BUILD_TARGET}
COMMAND
- xcrun -sdk macosx metal
+ metal -fmodules-cache-path=${CMAKE_BINARY_DIR}/metal-cache
"$<LIST:TRANSFORM,${MTLLIB_INCLUDE_DIRS},PREPEND,-I>"
${MTLLIB_COMPILE_OPTIONS} ${MTLLIB_SOURCES} -o ${MTLLIB_BUILD_TARGET}
DEPENDS ${MTLLIB_DEPS} ${MTLLIB_SOURCES}
diff --git a/mlx/backend/metal/kernels/CMakeLists.txt b/mlx/backend/metal/kernels/CMakeLists.txt
index 262b0495..5c7446ad 100644
--- a/mlx/backend/metal/kernels/CMakeLists.txt
+++ b/mlx/backend/metal/kernels/CMakeLists.txt
@@ -29,7 +29,7 @@ function(build_kernel_base TARGET SRCFILE DEPS)
"-mmacosx-version-min=${CMAKE_OSX_DEPLOYMENT_TARGET}")
endif()
add_custom_command(
- COMMAND xcrun -sdk macosx metal ${METAL_FLAGS} -c ${SRCFILE}
+ COMMAND metal -fmodules-cache-path=${CMAKE_BINARY_DIR}/metal-cache ${METAL_FLAGS} -c ${SRCFILE}
-I${PROJECT_SOURCE_DIR} -o ${TARGET}.air
DEPENDS ${SRCFILE} ${DEPS} ${BASE_HEADERS}
OUTPUT ${TARGET}.air
@@ -170,7 +170,7 @@ endif()
add_custom_command(
OUTPUT ${MLX_METAL_PATH}/mlx.metallib
- COMMAND xcrun -sdk macosx metallib ${KERNEL_AIR} -o
+ COMMAND metallib ${KERNEL_AIR} -o
${MLX_METAL_PATH}/mlx.metallib
DEPENDS ${KERNEL_AIR}
COMMENT "Building mlx.metallib"
diff --git a/mlx/backend/metal/make_compiled_preamble.sh b/mlx/backend/metal/make_compiled_preamble.sh
index bb55ed3a..94ea7dd7 100644
--- a/mlx/backend/metal/make_compiled_preamble.sh
+++ b/mlx/backend/metal/make_compiled_preamble.sh
@@ -31,7 +31,7 @@ OUTPUT_FILE=${OUTPUT_DIR}/${SRC_NAME}.cpp
mkdir -p "$OUTPUT_DIR"
# Use the metal compiler to get a list of headers (with depth)
-CCC="xcrun -sdk macosx metal -x metal"
+CCC="metal -x metal -fmodules-cache-path=${OUTPUT_DIR}/metal-cache"
HDRS=$( $CCC -I"$SRC_DIR" -I"$JIT_INCLUDES" -DMLX_METAL_JIT -E -P -CC -C -H "$INPUT_FILE" $CFLAGS -w 2>&1 1>/dev/null )
# Remove any included system frameworks (for MetalPerformancePrimitive headers)

56 nix/metal-toolchain.nix Normal file

@@ -0,0 +1,56 @@
{ lib, stdenvNoCC, requireFile, nix }:
let
narFile = requireFile {
name = "metal-toolchain-17C48.nar";
message = ''
The Metal Toolchain NAR must be available.
If you have cachix configured for exo.cachix.org, this should be automatic.
Otherwise:
1. Install Xcode 26+ from the App Store
2. Run: xcodebuild -downloadComponent MetalToolchain
3. Export the toolchain:
hdiutil attach "$(find /System/Library/AssetsV2/com_apple_MobileAsset_MetalToolchain -name '*.dmg' | head -1)" -mountpoint /tmp/metal-dmg
cp -R /tmp/metal-dmg/Metal.xctoolchain /tmp/metal-export
hdiutil detach /tmp/metal-dmg
4. Create NAR and add to store:
nix nar pack /tmp/metal-export > /tmp/metal-toolchain-17C48.nar
nix store add --mode flat /tmp/metal-toolchain-17C48.nar
'';
hash = "sha256-ayR5mXN4sZAddwKEG2OszGRF93k9ZFc7H0yi2xbylQw=";
};
in
stdenvNoCC.mkDerivation {
pname = "metal-toolchain";
version = "17C48";
dontUnpack = true;
dontBuild = true;
dontFixup = true;
nativeBuildInputs = [ nix ];
installPhase = ''
runHook preInstall
nix-store --restore $out < ${narFile}
# Create bin directory with symlinks for PATH
mkdir -p $out/bin
ln -s $out/usr/bin/metal $out/bin/metal
ln -s $out/usr/bin/metallib $out/bin/metallib
runHook postInstall
'';
# Metal language version for CMake (from: echo __METAL_VERSION__ | metal -E -x metal -P -)
passthru.metalVersion = "400";
meta = {
description = "Apple Metal compiler toolchain";
platforms = [ "aarch64-darwin" ];
license = lib.licenses.unfree;
};
}

158 nix/mlx.nix Normal file

@@ -0,0 +1,158 @@
{ stdenv
, lib
, fetchFromGitHub
, replaceVars
, fetchzip
, cmake
, nlohmann_json
, apple-sdk_26
, metal-toolchain
, runCommand
, fmt
, python313Packages
, uvLockMlxVersion
}:
assert stdenv.isDarwin;
let
python = python313Packages.python;
# Static dependencies included directly during compilation
gguf-tools = fetchFromGitHub {
owner = "antirez";
repo = "gguf-tools";
rev = "8fa6eb65236618e28fd7710a0fba565f7faa1848";
hash = "sha256-15FvyPOFqTOr5vdWQoPnZz+mYH919++EtghjozDlnSA=";
};
metal_cpp = fetchzip {
url = "https://developer.apple.com/metal/cpp/files/metal-cpp_26.zip";
hash = "sha256-7n2eI2lw/S+Us6l7YPAATKwcIbRRpaQ8VmES7S8ZjY8=";
};
nanobind = fetchFromGitHub {
owner = "wjakob";
repo = "nanobind";
rev = "v2.10.2";
hash = "sha256-io44YhN+VpfHFWyvvLWSanRgbzA0whK8WlDNRi3hahU=";
fetchSubmodules = true;
};
mlx = stdenv.mkDerivation rec {
pname = "mlx";
version = let v = "0.30.4"; in
assert v == uvLockMlxVersion || throw "MLX version mismatch: nix/mlx.nix has ${v} but uv.lock has ${uvLockMlxVersion}. Update both the version and hash in nix/mlx.nix.";
v;
pyproject = true;
src = fetchFromGitHub {
owner = "ml-explore";
repo = "mlx";
tag = "v${version}";
hash = "sha256-OJk6jPlbaSlsUdk3ADz3tWcRzTWXRof3/q8Soe1AO6w=";
};
patches = [
(replaceVars ./darwin-build-fixes.patch {
sdkVersion = apple-sdk_26.version;
metalVersion = metal-toolchain.metalVersion;
})
];
postPatch = ''
substituteInPlace mlx/backend/cpu/jit_compiler.cpp \
--replace-fail "g++" "$CXX"
'';
dontUseCmakeConfigure = true;
enableParallelBuilding = true;
# Allows multiple cores to be used in Python builds.
postUnpack = ''
export MAKEFLAGS+="''${enableParallelBuilding:+-j$NIX_BUILD_CORES}"
'';
# Automated bulk updates would bump the wrong fetcher's rev attribute, so skip them
passthru.skipBulkUpdate = true;
env = {
DEV_RELEASE = 1;
CMAKE_ARGS = toString [
(lib.cmakeBool "USE_SYSTEM_FMT" true)
(lib.cmakeOptionType "filepath" "FETCHCONTENT_SOURCE_DIR_GGUFLIB" "${gguf-tools}")
(lib.cmakeOptionType "filepath" "FETCHCONTENT_SOURCE_DIR_JSON" "${nlohmann_json.src}")
(lib.cmakeOptionType "filepath" "FETCHCONTENT_SOURCE_DIR_NANOBIND" "${nanobind}")
(lib.cmakeBool "FETCHCONTENT_FULLY_DISCONNECTED" true)
(lib.cmakeBool "MLX_BUILD_METAL" true)
(lib.cmakeOptionType "filepath" "FETCHCONTENT_SOURCE_DIR_METAL_CPP" "${metal_cpp}")
(lib.cmakeOptionType "string" "CMAKE_OSX_DEPLOYMENT_TARGET" "${apple-sdk_26.version}")
(lib.cmakeOptionType "filepath" "CMAKE_OSX_SYSROOT" "${apple-sdk_26.passthru.sdkroot}")
];
SDKROOT = apple-sdk_26.passthru.sdkroot;
MACOSX_DEPLOYMENT_TARGET = apple-sdk_26.version;
};
build-system = [
python313Packages.setuptools
];
nativeBuildInputs = [
cmake
metal-toolchain
python313Packages.pypaBuildHook
python313Packages.pypaInstallHook
python313Packages.setuptools
python313Packages.typing-extensions
python313Packages.wheel
python313Packages.cmake
python313Packages.ninja
];
buildInputs = [
fmt
gguf-tools
python313Packages.nanobind
python313Packages.pybind11
apple-sdk_26
];
# Tests require Metal GPU access which isn't available in the Nix sandbox.
# To run tests, build with: nix build --option sandbox false .#mlx.passthru.tests.mlxTest
doCheck = false;
pythonImportsCheck = [ "mlx" ];
passthru.tests = {
# Runs example scripts to verify MLX works. Requires --option sandbox false
# since Metal GPU access is needed.
mlxTest =
runCommand "run-mlx-examples"
{
buildInputs = [ mlx ];
nativeBuildInputs = [ python ];
}
''
cp ${src}/examples/python/logistic_regression.py .
${python.interpreter} logistic_regression.py
rm logistic_regression.py
cp ${src}/examples/python/linear_regression.py .
${python.interpreter} linear_regression.py
rm linear_regression.py
touch $out
'';
};
meta = {
homepage = "https://github.com/ml-explore/mlx";
description = "Array framework for Apple silicon";
changelog = "https://github.com/ml-explore/mlx/releases/tag/${src.tag}";
license = lib.licenses.mit;
platforms = [ "aarch64-darwin" ];
};
};
in
mlx


@@ -17,9 +17,9 @@ dependencies = [
"loguru>=0.7.3",
"exo_pyo3_bindings", # rust bindings
"anyio==4.11.0",
"mlx==0.30.3; sys_platform == 'darwin'",
"mlx[cpu]==0.30.3; sys_platform == 'linux'",
"mlx-lm==0.30.5",
"mlx==0.30.4; sys_platform == 'darwin'",
"mlx[cpu]==0.30.4; sys_platform == 'linux'",
"mlx-lm",
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
"hypercorn>=0.18.0",
"openai-harmony>=0.0.8",
@@ -63,6 +63,7 @@ members = [
[tool.uv.sources]
exo_pyo3_bindings = { workspace = true }
mlx-lm = { git = "https://github.com/ml-explore/mlx-lm", branch = "main" }
# Uncomment to use local mlx/mlx-lm development versions:
# mlx = { path = "/Users/Shared/mlx", editable=true }
# mlx-lm = { path = "/Users/Shared/mlx-lm", editable=true }

94 python/parts.nix Normal file

@@ -0,0 +1,94 @@
{ inputs, ... }:
{
perSystem =
{ config, self', pkgs, lib, system, ... }:
let
# Load workspace from uv.lock
workspace = inputs.uv2nix.lib.workspace.loadWorkspace {
workspaceRoot = inputs.self;
};
# Create overlay from workspace
# Use wheels from PyPI for most packages; we override mlx with our pure Nix Metal build
overlay = workspace.mkPyprojectOverlay { sourcePreference = "wheel"; };
# Override overlay to inject Nix-built components
exoOverlay = final: prev: {
# Replace workspace exo_pyo3_bindings with Nix-built wheel
exo-pyo3-bindings = pkgs.stdenv.mkDerivation {
pname = "exo-pyo3-bindings";
version = "0.1.0";
src = self'.packages.exo_pyo3_bindings;
# Install from pre-built wheel
nativeBuildInputs = [ final.pyprojectWheelHook ];
dontStrip = true;
};
};
python = pkgs.python313;
# Overlay to provide build systems and custom packages
buildSystemsOverlay = final: prev: {
# Use our pure Nix-built MLX with Metal support
mlx = self'.packages.mlx;
# mlx-lm is a git dependency that needs setuptools
mlx-lm = prev.mlx-lm.overrideAttrs (old: {
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [
final.setuptools
];
});
};
pythonSet = (pkgs.callPackage inputs.pyproject-nix.build.packages {
inherit python;
}).overrideScope (
lib.composeManyExtensions [
inputs.pyproject-build-systems.overlays.default
overlay
exoOverlay
buildSystemsOverlay
]
);
exoVenv = pythonSet.mkVirtualEnv "exo-env" workspace.deps.default;
# Virtual environment with dev dependencies for testing
testVenv = pythonSet.mkVirtualEnv "exo-test-env" (
workspace.deps.default // {
exo = [ "dev" ]; # Include pytest, pytest-asyncio, pytest-env
}
);
exoPackage = pkgs.runCommand "exo"
{
nativeBuildInputs = [ pkgs.makeWrapper ];
}
''
mkdir -p $out/bin
# Create wrapper scripts
for script in exo exo-master exo-worker; do
makeWrapper ${exoVenv}/bin/$script $out/bin/$script \
--set DASHBOARD_DIR ${self'.packages.dashboard} \
${lib.optionalString pkgs.stdenv.isDarwin "--prefix PATH : ${pkgs.macmon}/bin"}
done
'';
in
{
# Python package only available on macOS (requires MLX/Metal)
packages = lib.optionalAttrs pkgs.stdenv.hostPlatform.isDarwin {
exo = exoPackage;
# Test environment for running pytest outside of Nix sandbox (needs GPU access)
exo-test-env = testVenv;
};
checks = {
# Ruff linting (works on all platforms)
lint = pkgs.runCommand "ruff-lint" { } ''
export RUFF_CACHE_DIR="$TMPDIR/ruff-cache"
${pkgs.ruff}/bin/ruff check ${inputs.self}/
touch $out
'';
};
};
}


@@ -21,7 +21,7 @@ def exo_shard_downloader(max_parallel_downloads: int = 8) -> ShardDownloader:
async def build_base_shard(model_id: ModelId) -> ShardMetadata:
model_card = await ModelCard.from_hf(model_id)
model_card = await ModelCard.load(model_id)
return PipelineShardMetadata(
model_card=model_card,
device_rank=0,
@@ -166,9 +166,8 @@ class ResumableShardDownloader(ShardDownloader):
for task in asyncio.as_completed(tasks):
try:
yield await task
# TODO: except Exception
except Exception as e:
logger.error("Error downloading shard:", e)
logger.warning(f"Error downloading shard: {type(e).__name__}")
async def get_shard_download_status_for_shard(
self, shard: ShardMetadata


@@ -0,0 +1 @@
"""API adapters for different API formats (Claude, OpenAI Responses, etc.)."""


@@ -0,0 +1,230 @@
"""OpenAI Chat Completions API adapter for converting requests/responses."""
import time
from collections.abc import AsyncGenerator
from typing import Any
from uuid import uuid4
from exo.shared.types.api import (
ChatCompletionChoice,
ChatCompletionMessage,
ChatCompletionMessageText,
ChatCompletionRequest,
ChatCompletionResponse,
ErrorInfo,
ErrorResponse,
FinishReason,
Logprobs,
LogprobsContentItem,
StreamingChoiceResponse,
ToolCall,
)
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
def chat_request_to_text_generation(
request: ChatCompletionRequest,
) -> TextGenerationTaskParams:
instructions: str | None = None
input_messages: list[InputMessage] = []
chat_template_messages: list[dict[str, Any]] = []
for msg in request.messages:
# Normalize content to string
content: str
if msg.content is None:
content = ""
elif isinstance(msg.content, str):
content = msg.content
elif isinstance(msg.content, ChatCompletionMessageText):
content = msg.content.text
else:
# List of ChatCompletionMessageText
content = "\n".join(item.text for item in msg.content)
# Extract system message as instructions
if msg.role == "system":
if instructions is None:
instructions = content
else:
# Append additional system messages
instructions = f"{instructions}\n{content}"
chat_template_messages.append({"role": "system", "content": content})
else:
# Skip messages with no meaningful content
if msg.content is None and msg.thinking is None and msg.tool_calls is None:
continue
if msg.role in ("user", "assistant", "developer"):
input_messages.append(InputMessage(role=msg.role, content=content))
# Build full message dict for chat template (preserves tool_calls etc.)
# Normalize content for model_dump
msg_copy = msg.model_copy(update={"content": content})
dumped: dict[str, Any] = msg_copy.model_dump(exclude_none=True)
chat_template_messages.append(dumped)
return TextGenerationTaskParams(
model=request.model,
input=input_messages if input_messages else "",
instructions=instructions,
max_output_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p,
top_k=request.top_k,
stop=request.stop,
seed=request.seed,
stream=request.stream,
tools=request.tools,
chat_template_messages=chat_template_messages
if chat_template_messages
else None,
logprobs=request.logprobs or False,
top_logprobs=request.top_logprobs,
)
def chunk_to_response(
chunk: TokenChunk, command_id: CommandId
) -> ChatCompletionResponse:
"""Convert a TokenChunk to a streaming ChatCompletionResponse."""
# Build logprobs if available
logprobs: Logprobs | None = None
if chunk.logprob is not None:
logprobs = Logprobs(
content=[
LogprobsContentItem(
token=chunk.text,
logprob=chunk.logprob,
top_logprobs=chunk.top_logprobs or [],
)
]
)
return ChatCompletionResponse(
id=command_id,
created=int(time.time()),
model=chunk.model,
choices=[
StreamingChoiceResponse(
index=0,
delta=ChatCompletionMessage(role="assistant", content=chunk.text),
logprobs=logprobs,
finish_reason=chunk.finish_reason,
)
],
)
async def generate_chat_stream(
command_id: CommandId,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> AsyncGenerator[str, None]:
"""Generate Chat Completions API streaming events from chunks."""
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
error_response = ErrorResponse(
error=ErrorInfo(
message=chunk.error_message or "Internal server error",
type="InternalServerError",
code=500,
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return
if isinstance(chunk, ToolCallChunk):
tool_call_deltas = [
ToolCall(
id=str(uuid4()),
index=i,
function=tool,
)
for i, tool in enumerate(chunk.tool_calls)
]
tool_response = ChatCompletionResponse(
id=command_id,
created=int(time.time()),
model=chunk.model,
choices=[
StreamingChoiceResponse(
index=0,
delta=ChatCompletionMessage(
role="assistant",
tool_calls=tool_call_deltas,
),
finish_reason="tool_calls",
)
],
)
yield f"data: {tool_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return
chunk_response = chunk_to_response(chunk, command_id)
yield f"data: {chunk_response.model_dump_json()}\n\n"
if chunk.finish_reason is not None:
yield "data: [DONE]\n\n"
async def collect_chat_response(
command_id: CommandId,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> ChatCompletionResponse:
"""Collect all token chunks and return a single ChatCompletionResponse."""
text_parts: list[str] = []
tool_calls: list[ToolCall] = []
model: str | None = None
finish_reason: FinishReason | None = None
error_message: str | None = None
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
error_message = chunk.error_message or "Internal server error"
break
if model is None:
model = chunk.model
if isinstance(chunk, TokenChunk):
text_parts.append(chunk.text)
if isinstance(chunk, ToolCallChunk):
tool_calls.extend(
ToolCall(
id=str(uuid4()),
index=i,
function=tool,
)
for i, tool in enumerate(chunk.tool_calls)
)
if chunk.finish_reason is not None:
finish_reason = chunk.finish_reason
if error_message is not None:
raise ValueError(error_message)
combined_text = "".join(text_parts)
assert model is not None
return ChatCompletionResponse(
id=command_id,
created=int(time.time()),
model=model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatCompletionMessage(
role="assistant",
content=combined_text,
tool_calls=tool_calls if tool_calls else None,
),
finish_reason=finish_reason,
)
],
)


@@ -0,0 +1,321 @@
"""Claude Messages API adapter for converting requests/responses."""
import json
from collections.abc import AsyncGenerator
from typing import Any
from uuid import uuid4
from exo.shared.types.api import FinishReason
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
from exo.shared.types.claude_api import (
ClaudeContentBlock,
ClaudeContentBlockDeltaEvent,
ClaudeContentBlockStartEvent,
ClaudeContentBlockStopEvent,
ClaudeInputJsonDelta,
ClaudeMessageDelta,
ClaudeMessageDeltaEvent,
ClaudeMessageDeltaUsage,
ClaudeMessagesRequest,
ClaudeMessagesResponse,
ClaudeMessageStart,
ClaudeMessageStartEvent,
ClaudeMessageStopEvent,
ClaudeStopReason,
ClaudeTextBlock,
ClaudeTextDelta,
ClaudeToolResultBlock,
ClaudeToolUseBlock,
ClaudeUsage,
)
from exo.shared.types.common import CommandId
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
def finish_reason_to_claude_stop_reason(
finish_reason: FinishReason | None,
) -> ClaudeStopReason | None:
"""Map OpenAI finish_reason to Claude stop_reason."""
if finish_reason is None:
return None
mapping: dict[FinishReason, ClaudeStopReason] = {
"stop": "end_turn",
"length": "max_tokens",
"tool_calls": "tool_use",
"content_filter": "end_turn",
"function_call": "tool_use",
}
return mapping.get(finish_reason, "end_turn")
def _extract_tool_result_text(block: ClaudeToolResultBlock) -> str:
"""Extract plain text from a tool_result content field."""
if block.content is None:
return ""
if isinstance(block.content, str):
return block.content
return "".join(sub_block.text for sub_block in block.content)
def claude_request_to_text_generation(
request: ClaudeMessagesRequest,
) -> TextGenerationTaskParams:
# Handle system message
instructions: str | None = None
chat_template_messages: list[dict[str, Any]] = []
if request.system:
if isinstance(request.system, str):
instructions = request.system
else:
instructions = "".join(block.text for block in request.system)
chat_template_messages.append({"role": "system", "content": instructions})
# Convert messages to input
input_messages: list[InputMessage] = []
for msg in request.messages:
if isinstance(msg.content, str):
input_messages.append(InputMessage(role=msg.role, content=msg.content))
chat_template_messages.append({"role": msg.role, "content": msg.content})
continue
# Process structured content blocks
text_parts: list[str] = []
tool_calls: list[dict[str, Any]] = []
tool_results: list[ClaudeToolResultBlock] = []
for block in msg.content:
if isinstance(block, ClaudeTextBlock):
text_parts.append(block.text)
elif isinstance(block, ClaudeToolUseBlock):
tool_calls.append(
{
"id": block.id,
"type": "function",
"function": {
"name": block.name,
"arguments": json.dumps(block.input),
},
}
)
elif isinstance(block, ClaudeToolResultBlock):
tool_results.append(block)
content = "".join(text_parts)
# Build InputMessage from text content
if msg.role in ("user", "assistant"):
input_messages.append(InputMessage(role=msg.role, content=content))
# Build chat_template_messages preserving tool structure
if tool_calls:
chat_template_messages.append(
{"role": "assistant", "content": content, "tool_calls": tool_calls}
)
elif tool_results:
for tr in tool_results:
chat_template_messages.append(
{
"role": "tool",
"tool_call_id": tr.tool_use_id,
"content": _extract_tool_result_text(tr),
}
)
else:
chat_template_messages.append({"role": msg.role, "content": content})
# Convert Claude tool definitions to OpenAI-style function tools
tools: list[dict[str, Any]] | None = None
if request.tools:
tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": tool.input_schema,
},
}
for tool in request.tools
]
return TextGenerationTaskParams(
model=request.model,
input=input_messages if input_messages else "",
instructions=instructions,
max_output_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p,
top_k=request.top_k,
stop=request.stop_sequences,
stream=request.stream,
tools=tools,
chat_template_messages=chat_template_messages
if chat_template_messages
else None,
)
async def collect_claude_response(
command_id: CommandId,
model: str,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> ClaudeMessagesResponse:
"""Collect all token chunks and return a single ClaudeMessagesResponse."""
text_parts: list[str] = []
tool_use_blocks: list[ClaudeToolUseBlock] = []
stop_reason: ClaudeStopReason | None = None
last_stats = None
error_message: str | None = None
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
error_message = chunk.error_message or "Internal server error"
break
if isinstance(chunk, ToolCallChunk):
for tool in chunk.tool_calls:
tool_use_blocks.append(
ClaudeToolUseBlock(
id=f"toolu_{uuid4().hex[:24]}",
name=tool.name,
input=json.loads(tool.arguments), # pyright: ignore[reportAny]
)
)
last_stats = chunk.stats or last_stats
stop_reason = "tool_use"
continue
text_parts.append(chunk.text)
last_stats = chunk.stats or last_stats
if chunk.finish_reason is not None:
stop_reason = finish_reason_to_claude_stop_reason(chunk.finish_reason)
if error_message is not None:
raise ValueError(error_message)
combined_text = "".join(text_parts)
# Build content blocks
content: list[ClaudeContentBlock] = []
if combined_text:
content.append(ClaudeTextBlock(text=combined_text))
content.extend(tool_use_blocks)
# If no content at all, include empty text block
if not content:
content.append(ClaudeTextBlock(text=""))
# Use actual usage data from stats if available
input_tokens = last_stats.prompt_tokens if last_stats else 0
output_tokens = last_stats.generation_tokens if last_stats else 0
return ClaudeMessagesResponse(
id=f"msg_{command_id}",
model=model,
content=content,
stop_reason=stop_reason,
usage=ClaudeUsage(
input_tokens=input_tokens,
output_tokens=output_tokens,
),
)
async def generate_claude_stream(
command_id: CommandId,
model: str,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> AsyncGenerator[str, None]:
"""Generate Claude Messages API streaming events from TokenChunks."""
# Initial message_start event
initial_message = ClaudeMessageStart(
id=f"msg_{command_id}",
model=model,
content=[],
stop_reason=None,
usage=ClaudeUsage(input_tokens=0, output_tokens=0),
)
start_event = ClaudeMessageStartEvent(message=initial_message)
yield f"event: message_start\ndata: {start_event.model_dump_json()}\n\n"
# content_block_start for text block at index 0
block_start = ClaudeContentBlockStartEvent(
index=0, content_block=ClaudeTextBlock(text="")
)
yield f"event: content_block_start\ndata: {block_start.model_dump_json()}\n\n"
output_tokens = 0
stop_reason: ClaudeStopReason | None = None
last_stats = None
next_block_index = 1 # text block is 0, tool blocks start at 1
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
# Close text block and bail
break
if isinstance(chunk, ToolCallChunk):
last_stats = chunk.stats or last_stats
stop_reason = "tool_use"
# Emit tool_use content blocks
for tool in chunk.tool_calls:
tool_id = f"toolu_{uuid4().hex[:24]}"
tool_input_json = tool.arguments
# content_block_start for tool_use
tool_block_start = ClaudeContentBlockStartEvent(
index=next_block_index,
content_block=ClaudeToolUseBlock(
id=tool_id, name=tool.name, input={}
),
)
yield f"event: content_block_start\ndata: {tool_block_start.model_dump_json()}\n\n"
# content_block_delta with input_json_delta
tool_delta_event = ClaudeContentBlockDeltaEvent(
index=next_block_index,
delta=ClaudeInputJsonDelta(partial_json=tool_input_json),
)
yield f"event: content_block_delta\ndata: {tool_delta_event.model_dump_json()}\n\n"
# content_block_stop
tool_block_stop = ClaudeContentBlockStopEvent(index=next_block_index)
yield f"event: content_block_stop\ndata: {tool_block_stop.model_dump_json()}\n\n"
next_block_index += 1
continue
output_tokens += 1 # Count each chunk as one token
last_stats = chunk.stats or last_stats
# content_block_delta
delta_event = ClaudeContentBlockDeltaEvent(
index=0,
delta=ClaudeTextDelta(text=chunk.text),
)
yield f"event: content_block_delta\ndata: {delta_event.model_dump_json()}\n\n"
if chunk.finish_reason is not None:
stop_reason = finish_reason_to_claude_stop_reason(chunk.finish_reason)
# Use actual token count from stats if available
if last_stats is not None:
output_tokens = last_stats.generation_tokens
# content_block_stop for text block
block_stop = ClaudeContentBlockStopEvent(index=0)
yield f"event: content_block_stop\ndata: {block_stop.model_dump_json()}\n\n"
# message_delta
message_delta = ClaudeMessageDeltaEvent(
delta=ClaudeMessageDelta(stop_reason=stop_reason),
usage=ClaudeMessageDeltaUsage(output_tokens=output_tokens),
)
yield f"event: message_delta\ndata: {message_delta.model_dump_json()}\n\n"
# message_stop
message_stop = ClaudeMessageStopEvent()
yield f"event: message_stop\ndata: {message_stop.model_dump_json()}\n\n"

View File

@@ -0,0 +1,369 @@
"""OpenAI Responses API adapter for converting requests/responses."""
from collections.abc import AsyncGenerator
from itertools import count
from typing import Any
from uuid import uuid4
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId
from exo.shared.types.openai_responses import (
FunctionCallInputItem,
ResponseCompletedEvent,
ResponseContentPart,
ResponseContentPartAddedEvent,
ResponseContentPartDoneEvent,
ResponseCreatedEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionCallArgumentsDoneEvent,
ResponseFunctionCallItem,
ResponseInProgressEvent,
ResponseInputMessage,
ResponseItem,
ResponseMessageItem,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseOutputText,
ResponsesRequest,
ResponsesResponse,
ResponseTextDeltaEvent,
ResponseTextDoneEvent,
ResponseUsage,
)
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
def _extract_content(content: str | list[ResponseContentPart]) -> str:
"""Extract plain text from a content field that may be a string or list of parts."""
if isinstance(content, str):
return content
return "".join(part.text for part in content)
def responses_request_to_text_generation(
request: ResponsesRequest,
) -> TextGenerationTaskParams:
input_value: str | list[InputMessage]
built_chat_template: list[dict[str, Any]] | None = None
if isinstance(request.input, str):
input_value = request.input
else:
input_messages: list[InputMessage] = []
chat_template_messages: list[dict[str, Any]] = []
if request.instructions is not None:
chat_template_messages.append(
{"role": "system", "content": request.instructions}
)
for item in request.input:
if isinstance(item, ResponseInputMessage):
content = _extract_content(item.content)
if item.role in ("user", "assistant", "developer"):
input_messages.append(InputMessage(role=item.role, content=content))
if item.role == "system":
chat_template_messages.append(
{"role": "system", "content": content}
)
else:
chat_template_messages.append(
{"role": item.role, "content": content}
)
elif isinstance(item, FunctionCallInputItem):
chat_template_messages.append(
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": item.call_id,
"type": "function",
"function": {
"name": item.name,
"arguments": item.arguments,
},
}
],
}
)
else:
chat_template_messages.append(
{
"role": "tool",
"tool_call_id": item.call_id,
"content": item.output,
}
)
input_value = input_messages if input_messages else ""
built_chat_template = chat_template_messages if chat_template_messages else None
return TextGenerationTaskParams(
model=request.model,
input=input_value,
instructions=request.instructions,
max_output_tokens=request.max_output_tokens,
temperature=request.temperature,
top_p=request.top_p,
stream=request.stream,
tools=request.tools,
top_k=request.top_k,
stop=request.stop,
seed=request.seed,
chat_template_messages=built_chat_template or request.chat_template_messages,
)
async def collect_responses_response(
command_id: CommandId,
model: str,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> ResponsesResponse:
"""Collect all token chunks and return a single ResponsesResponse."""
response_id = f"resp_{command_id}"
item_id = f"item_{command_id}"
accumulated_text = ""
function_call_items: list[ResponseFunctionCallItem] = []
last_stats = None
error_message: str | None = None
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
error_message = chunk.error_message or "Internal server error"
break
if isinstance(chunk, ToolCallChunk):
for tool in chunk.tool_calls:
function_call_items.append(
ResponseFunctionCallItem(
id=f"fc_{uuid4().hex[:24]}",
call_id=f"call_{uuid4().hex[:24]}",
name=tool.name,
arguments=tool.arguments,
)
)
last_stats = chunk.stats or last_stats
continue
accumulated_text += chunk.text
last_stats = chunk.stats or last_stats
if error_message is not None:
raise ValueError(error_message)
# Create usage from stats if available
usage = None
if last_stats is not None:
usage = ResponseUsage(
input_tokens=last_stats.prompt_tokens,
output_tokens=last_stats.generation_tokens,
total_tokens=last_stats.prompt_tokens + last_stats.generation_tokens,
)
output: list[ResponseItem] = [
ResponseMessageItem(
id=item_id,
content=[ResponseOutputText(text=accumulated_text)],
status="completed",
)
]
output.extend(function_call_items)
return ResponsesResponse(
id=response_id,
model=model,
status="completed",
output=output,
output_text=accumulated_text,
usage=usage,
)
async def generate_responses_stream(
command_id: CommandId,
model: str,
chunk_stream: AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None],
) -> AsyncGenerator[str, None]:
"""Generate OpenAI Responses API streaming events from TokenChunks."""
response_id = f"resp_{command_id}"
item_id = f"item_{command_id}"
seq = count(1)
# response.created
initial_response = ResponsesResponse(
id=response_id,
model=model,
status="in_progress",
output=[],
output_text="",
)
created_event = ResponseCreatedEvent(
sequence_number=next(seq), response=initial_response
)
yield f"event: response.created\ndata: {created_event.model_dump_json()}\n\n"
# response.in_progress
in_progress_event = ResponseInProgressEvent(
sequence_number=next(seq), response=initial_response
)
yield f"event: response.in_progress\ndata: {in_progress_event.model_dump_json()}\n\n"
# response.output_item.added
initial_item = ResponseMessageItem(
id=item_id,
content=[ResponseOutputText(text="")],
status="in_progress",
)
item_added = ResponseOutputItemAddedEvent(
sequence_number=next(seq), output_index=0, item=initial_item
)
yield f"event: response.output_item.added\ndata: {item_added.model_dump_json()}\n\n"
# response.content_part.added
initial_part = ResponseOutputText(text="")
part_added = ResponseContentPartAddedEvent(
sequence_number=next(seq),
item_id=item_id,
output_index=0,
content_index=0,
part=initial_part,
)
yield f"event: response.content_part.added\ndata: {part_added.model_dump_json()}\n\n"
accumulated_text = ""
function_call_items: list[ResponseFunctionCallItem] = []
last_stats = None
next_output_index = 1 # message item is at 0
async for chunk in chunk_stream:
if isinstance(chunk, ErrorChunk):
break
if isinstance(chunk, ToolCallChunk):
last_stats = chunk.stats or last_stats
for tool in chunk.tool_calls:
fc_id = f"fc_{uuid4().hex[:24]}"
call_id = f"call_{uuid4().hex[:24]}"
# response.output_item.added for function_call
fc_item = ResponseFunctionCallItem(
id=fc_id,
call_id=call_id,
name=tool.name,
arguments="",
status="in_progress",
)
fc_added = ResponseOutputItemAddedEvent(
sequence_number=next(seq),
output_index=next_output_index,
item=fc_item,
)
yield f"event: response.output_item.added\ndata: {fc_added.model_dump_json()}\n\n"
# response.function_call_arguments.delta
args_delta = ResponseFunctionCallArgumentsDeltaEvent(
sequence_number=next(seq),
item_id=fc_id,
output_index=next_output_index,
delta=tool.arguments,
)
yield f"event: response.function_call_arguments.delta\ndata: {args_delta.model_dump_json()}\n\n"
# response.function_call_arguments.done
args_done = ResponseFunctionCallArgumentsDoneEvent(
sequence_number=next(seq),
item_id=fc_id,
output_index=next_output_index,
name=tool.name,
arguments=tool.arguments,
)
yield f"event: response.function_call_arguments.done\ndata: {args_done.model_dump_json()}\n\n"
# response.output_item.done
fc_done_item = ResponseFunctionCallItem(
id=fc_id,
call_id=call_id,
name=tool.name,
arguments=tool.arguments,
status="completed",
)
fc_item_done = ResponseOutputItemDoneEvent(
sequence_number=next(seq),
output_index=next_output_index,
item=fc_done_item,
)
yield f"event: response.output_item.done\ndata: {fc_item_done.model_dump_json()}\n\n"
function_call_items.append(fc_done_item)
next_output_index += 1
continue
accumulated_text += chunk.text
last_stats = chunk.stats or last_stats
# response.output_text.delta
delta_event = ResponseTextDeltaEvent(
sequence_number=next(seq),
item_id=item_id,
output_index=0,
content_index=0,
delta=chunk.text,
)
yield f"event: response.output_text.delta\ndata: {delta_event.model_dump_json()}\n\n"
# response.output_text.done
text_done = ResponseTextDoneEvent(
sequence_number=next(seq),
item_id=item_id,
output_index=0,
content_index=0,
text=accumulated_text,
)
yield f"event: response.output_text.done\ndata: {text_done.model_dump_json()}\n\n"
# response.content_part.done
final_part = ResponseOutputText(text=accumulated_text)
part_done = ResponseContentPartDoneEvent(
sequence_number=next(seq),
item_id=item_id,
output_index=0,
content_index=0,
part=final_part,
)
yield f"event: response.content_part.done\ndata: {part_done.model_dump_json()}\n\n"
# response.output_item.done
final_message_item = ResponseMessageItem(
id=item_id,
content=[ResponseOutputText(text=accumulated_text)],
status="completed",
)
item_done = ResponseOutputItemDoneEvent(
sequence_number=next(seq), output_index=0, item=final_message_item
)
yield f"event: response.output_item.done\ndata: {item_done.model_dump_json()}\n\n"
# Create usage from stats if available
usage = None
if last_stats is not None:
usage = ResponseUsage(
input_tokens=last_stats.prompt_tokens,
output_tokens=last_stats.generation_tokens,
total_tokens=last_stats.prompt_tokens + last_stats.generation_tokens,
)
# response.completed
output: list[ResponseItem] = [final_message_item]
output.extend(function_call_items)
final_response = ResponsesResponse(
id=response_id,
model=model,
status="completed",
output=output,
output_text=accumulated_text,
usage=usage,
)
completed_event = ResponseCompletedEvent(
sequence_number=next(seq), response=final_response
)
yield f"event: response.completed\ndata: {completed_event.model_dump_json()}\n\n"

View File

@@ -2,7 +2,7 @@ import base64
import contextlib
import json
import time
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Awaitable, Callable
from http import HTTPStatus
from typing import Annotated, Literal, cast
from uuid import uuid4
@@ -19,6 +19,21 @@ from hypercorn.config import Config
from hypercorn.typing import ASGIFramework
from loguru import logger
from exo.master.adapters.chat_completions import (
chat_request_to_text_generation,
collect_chat_response,
generate_chat_stream,
)
from exo.master.adapters.claude import (
claude_request_to_text_generation,
collect_claude_response,
generate_claude_stream,
)
from exo.master.adapters.responses import (
collect_responses_response,
generate_responses_stream,
responses_request_to_text_generation,
)
from exo.master.image_store import ImageStore
from exo.master.placement import place_instance as get_instance_placements
from exo.shared.apply import apply
@@ -35,12 +50,13 @@ from exo.shared.models.model_cards import (
)
from exo.shared.types.api import (
AdvancedImageParams,
BenchChatCompletionRequest,
BenchChatCompletionResponse,
BenchChatCompletionTaskParams,
BenchImageGenerationResponse,
BenchImageGenerationTaskParams,
ChatCompletionChoice,
ChatCompletionMessage,
ChatCompletionRequest,
ChatCompletionResponse,
CreateInstanceParams,
CreateInstanceResponse,
@@ -51,7 +67,7 @@ from exo.shared.types.api import (
FinishReason,
GenerationStats,
ImageData,
ImageEditsInternalParams,
ImageEditsTaskParams,
ImageGenerationResponse,
ImageGenerationStats,
ImageGenerationTaskParams,
@@ -64,7 +80,6 @@ from exo.shared.types.api import (
PlacementPreviewResponse,
StartDownloadParams,
StartDownloadResponse,
StreamingChoiceResponse,
ToolCall,
)
from exo.shared.types.chunks import (
@@ -74,8 +89,11 @@ from exo.shared.types.chunks import (
TokenChunk,
ToolCallChunk,
)
from exo.shared.types.claude_api import (
ClaudeMessagesRequest,
ClaudeMessagesResponse,
)
from exo.shared.types.commands import (
ChatCompletion,
Command,
CreateInstance,
DeleteDownload,
@@ -89,6 +107,7 @@ from exo.shared.types.commands import (
SendInputChunk,
StartDownload,
TaskFinished,
TextGeneration,
)
from exo.shared.types.common import CommandId, Id, NodeId, SessionId
from exo.shared.types.events import (
@@ -98,8 +117,11 @@ from exo.shared.types.events import (
IndexedEvent,
)
from exo.shared.types.memory import Memory
from exo.shared.types.openai_responses import (
ResponsesRequest,
ResponsesResponse,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding
from exo.utils.banner import print_startup_banner
@@ -112,35 +134,6 @@ def _format_to_content_type(image_format: Literal["png", "jpeg", "webp"] | None)
return f"image/{image_format or 'png'}"
def chunk_to_response(
chunk: TokenChunk | ToolCallChunk, command_id: CommandId
) -> ChatCompletionResponse:
return ChatCompletionResponse(
id=command_id,
created=int(time.time()),
model=chunk.model,
choices=[
StreamingChoiceResponse(
index=0,
delta=ChatCompletionMessage(role="assistant", content=chunk.text)
if isinstance(chunk, TokenChunk)
else ChatCompletionMessage(
role="assistant",
tool_calls=[
ToolCall(
id=str(uuid4()),
index=i,
function=tool,
)
for i, tool in enumerate(chunk.tool_calls)
],
),
finish_reason=chunk.finish_reason,
)
],
)
async def resolve_model_card(model_id: ModelId) -> ModelCard:
if model_id in MODEL_CARDS:
model_card = MODEL_CARDS[model_id]
@@ -183,6 +176,15 @@ class API:
self.paused_ev: anyio.Event = anyio.Event()
self.app = FastAPI()
@self.app.middleware("http")
async def _log_requests( # pyright: ignore[reportUnusedFunction]
request: Request,
call_next: Callable[[Request], Awaitable[StreamingResponse]],
) -> StreamingResponse:
logger.debug(f"API request: {request.method} {request.url.path}")
return await call_next(request)
self._setup_exception_handlers()
self._setup_cors()
self._setup_routes()
@@ -196,7 +198,7 @@ class API:
name="dashboard",
)
self._chat_completion_queues: dict[
self._text_generation_queues: dict[
CommandId, Sender[TokenChunk | ErrorChunk | ToolCallChunk]
] = {}
self._image_generation_queues: dict[
@@ -210,7 +212,7 @@ class API:
self.state = State()
self.session_id = new_session_id
self.event_buffer = OrderedBuffer[Event]()
self._chat_completion_queues = {}
self._text_generation_queues = {}
self._image_generation_queues = {}
self.unpause(result_clock)
@@ -267,6 +269,8 @@ class API:
self.app.post("/bench/images/edits")(self.bench_image_edits)
self.app.get("/images")(self.list_images)
self.app.get("/images/{image_id}")(self.get_image)
self.app.post("/v1/messages", response_model=None)(self.claude_messages)
self.app.post("/v1/responses", response_model=None)(self.openai_responses)
self.app.get("/state")(lambda: self.state)
self.app.get("/events")(lambda: self._event_log)
self.app.post("/download/start")(self.start_download)
@@ -274,7 +278,7 @@ class API:
async def place_instance(self, payload: PlaceInstanceParams):
command = PlaceInstance(
model_card=await resolve_model_card(payload.model_id),
model_card=await ModelCard.load(payload.model_id),
sharding=payload.sharding,
instance_meta=payload.instance_meta,
min_nodes=payload.min_nodes,
@@ -291,7 +295,7 @@ class API:
self, payload: CreateInstanceParams
) -> CreateInstanceResponse:
instance = payload.instance
model_card = await resolve_model_card(instance.shard_assignments.model_id)
model_card = await ModelCard.load(instance.shard_assignments.model_id)
required_memory = model_card.storage_size
available_memory = self._calculate_total_available_memory()
@@ -319,7 +323,7 @@ class API:
instance_meta: InstanceMeta = InstanceMeta.MlxRing,
min_nodes: int = 1,
) -> Instance:
model_card = await resolve_model_card(model_id)
model_card = await ModelCard.load(model_id)
try:
placements = get_instance_placements(
@@ -491,13 +495,15 @@ class API:
instance_id=instance_id,
)
async def _chat_chunk_stream(
async def _token_chunk_stream(
self, command_id: CommandId
) -> AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None]:
"""Yield `TokenChunk`s for a given command until completion."""
"""Yield chunks for a given command until completion.
This is the internal low-level stream used by all API adapters.
"""
try:
self._chat_completion_queues[command_id], recv = channel[
self._text_generation_queues[command_id], recv = channel[
ErrorChunk | ToolCallChunk | TokenChunk
]()
@@ -518,105 +524,20 @@ class API:
finally:
command = TaskFinished(finished_command_id=command_id)
await self._send(command)
if command_id in self._chat_completion_queues:
del self._chat_completion_queues[command_id]
if command_id in self._text_generation_queues:
del self._text_generation_queues[command_id]
async def _generate_chat_stream(
self, command_id: CommandId
) -> AsyncGenerator[str, None]:
"""Generate chat completion stream as JSON strings."""
async for chunk in self._chat_chunk_stream(command_id):
assert not isinstance(chunk, ImageChunk)
if chunk.finish_reason == "error":
error_response = ErrorResponse(
error=ErrorInfo(
message=chunk.error_message or "Internal server error",
type="InternalServerError",
code=500,
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return
chunk_response: ChatCompletionResponse = chunk_to_response(
chunk, command_id
)
logger.debug(f"chunk_response: {chunk_response}")
yield f"data: {chunk_response.model_dump_json()}\n\n"
if chunk.finish_reason is not None:
yield "data: [DONE]\n\n"
async def _collect_chat_completion(
self, command_id: CommandId
) -> ChatCompletionResponse:
"""Collect all token chunks for a chat completion and return a single response."""
text_parts: list[str] = []
tool_calls: list[ToolCall] = []
model: str | None = None
finish_reason: FinishReason | None = None
async for chunk in self._chat_chunk_stream(command_id):
if isinstance(chunk, ErrorChunk):
raise HTTPException(
status_code=500,
detail=chunk.error_message or "Internal server error",
)
if model is None:
model = chunk.model
if isinstance(chunk, TokenChunk):
text_parts.append(chunk.text)
if isinstance(chunk, ToolCallChunk):
tool_calls.extend(
ToolCall(
id=str(uuid4()),
index=i,
function=tool,
)
for i, tool in enumerate(chunk.tool_calls)
)
if chunk.finish_reason is not None:
finish_reason = chunk.finish_reason
combined_text = "".join(text_parts)
assert model is not None
return ChatCompletionResponse(
id=command_id,
created=int(time.time()),
model=model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatCompletionMessage(
role="assistant",
content=combined_text,
tool_calls=tool_calls,
),
finish_reason=finish_reason,
)
],
)
async def _collect_chat_completion_with_stats(
async def _collect_text_generation_with_stats(
self, command_id: CommandId
) -> BenchChatCompletionResponse:
text_parts: list[str] = []
tool_calls: list[ToolCall] = []
model: str | None = None
model: ModelId | None = None
finish_reason: FinishReason | None = None
stats: GenerationStats | None = None
async for chunk in self._chat_chunk_stream(command_id):
async for chunk in self._token_chunk_stream(command_id):
if chunk.finish_reason == "error":
raise HTTPException(
status_code=500,
@@ -655,7 +576,9 @@ class API:
ChatCompletionChoice(
index=0,
message=ChatCompletionMessage(
role="assistant", content=combined_text, tool_calls=tool_calls
role="assistant",
content=combined_text,
tool_calls=tool_calls if tool_calls else None,
),
finish_reason=finish_reason,
)
@@ -664,68 +587,84 @@ class API:
)
return resp
async def _trigger_notify_user_to_download_model(self, model_id: str) -> None:
async def _trigger_notify_user_to_download_model(self, model_id: ModelId) -> None:
logger.warning(
"TODO: we should send a notification to the user to download the model"
)
async def chat_completions(
self, payload: ChatCompletionTaskParams
self, payload: ChatCompletionRequest
) -> ChatCompletionResponse | StreamingResponse:
"""Handle chat completions, supporting both streaming and non-streaming responses."""
model_card = await resolve_model_card(ModelId(payload.model))
payload.model = model_card.model_id
if not any(
instance.shard_assignments.model_id == payload.model
for instance in self.state.instances.values()
):
await self._trigger_notify_user_to_download_model(payload.model)
raise HTTPException(
status_code=404, detail=f"No instance found for model {payload.model}"
)
command = ChatCompletion(
request_params=payload,
"""OpenAI Chat Completions API - adapter."""
task_params = chat_request_to_text_generation(payload)
resolved_model = await self._resolve_and_validate_text_model(
ModelId(task_params.model)
)
task_params = task_params.model_copy(update={"model": resolved_model})
command = TextGeneration(task_params=task_params)
await self._send(command)
if payload.stream:
return StreamingResponse(
self._generate_chat_stream(command.command_id),
generate_chat_stream(
command.command_id,
self._token_chunk_stream(command.command_id),
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "close",
"X-Accel-Buffering": "no",
},
)
return await self._collect_chat_completion(command.command_id)
return await collect_chat_response(
command.command_id,
self._token_chunk_stream(command.command_id),
)
async def bench_chat_completions(
self, payload: BenchChatCompletionTaskParams
self, payload: BenchChatCompletionRequest
) -> BenchChatCompletionResponse:
model_card = await resolve_model_card(ModelId(payload.model))
payload.model = model_card.model_id
task_params = chat_request_to_text_generation(payload)
resolved_model = await self._resolve_and_validate_text_model(
ModelId(task_params.model)
)
task_params = task_params.model_copy(update={"model": resolved_model})
if not any(
instance.shard_assignments.model_id == payload.model
for instance in self.state.instances.values()
):
await self._trigger_notify_user_to_download_model(payload.model)
raise HTTPException(
status_code=404, detail=f"No instance found for model {payload.model}"
)
task_params = task_params.model_copy(update={"stream": False, "bench": True})
payload.stream = False
command = ChatCompletion(request_params=payload)
command = TextGeneration(task_params=task_params)
await self._send(command)
response = await self._collect_chat_completion_with_stats(command.command_id)
response = await self._collect_text_generation_with_stats(command.command_id)
return response
async def _validate_image_model(self, model: str) -> ModelId:
async def _resolve_and_validate_text_model(self, model: ModelId) -> ModelId:
"""Validate a text model exists and return the resolved model ID.
Raises HTTPException 404 if no instance is found for the model.
"""
model_card = await resolve_model_card(model)
resolved = model_card.model_id
if not any(
instance.shard_assignments.model_id == resolved
for instance in self.state.instances.values()
):
await self._trigger_notify_user_to_download_model(resolved)
raise HTTPException(
status_code=404,
detail=f"No instance found for model {resolved}",
)
return resolved
async def _validate_image_model(self, model: ModelId) -> ModelId:
"""Validate model exists and return resolved model ID.
Raises HTTPException 404 if no instance is found for the model.
"""
model_card = await resolve_model_card(ModelId(model))
model_card = await ModelCard.load(model)
resolved_model = model_card.model_id
if not any(
instance.shard_assignments.model_id == resolved_model
@@ -771,10 +710,10 @@ class API:
When stream=True and partial_images > 0, returns a StreamingResponse
with SSE-formatted events for partial and final images.
"""
payload.model = await self._validate_image_model(payload.model)
payload.model = await self._validate_image_model(ModelId(payload.model))
command = ImageGeneration(
request_params=payload,
task_params=payload,
)
await self._send(command)
@@ -1016,13 +955,13 @@ class API:
async def bench_image_generations(
self, request: Request, payload: BenchImageGenerationTaskParams
) -> BenchImageGenerationResponse:
payload.model = await self._validate_image_model(payload.model)
payload.model = await self._validate_image_model(ModelId(payload.model))
payload.stream = False
payload.partial_images = 0
command = ImageGeneration(
request_params=payload,
task_params=payload,
)
await self._send(command)
@@ -1037,7 +976,7 @@ class API:
self,
image: UploadFile,
prompt: str,
model: str,
model: ModelId,
n: int,
size: str,
response_format: Literal["url", "b64_json"],
@@ -1064,7 +1003,7 @@ class API:
total_chunks = len(data_chunks)
command = ImageEdits(
request_params=ImageEditsInternalParams(
task_params=ImageEditsTaskParams(
image_data="",
total_input_chunks=total_chunks,
prompt=prompt,
@@ -1132,7 +1071,7 @@ class API:
command = await self._send_image_edits_command(
image=image,
prompt=prompt,
model=model,
model=ModelId(model),
n=n,
size=size,
response_format=response_format,
@@ -1188,7 +1127,7 @@ class API:
command = await self._send_image_edits_command(
image=image,
prompt=prompt,
model=model,
model=ModelId(model),
n=n,
size=size,
response_format=response_format,
@@ -1208,6 +1147,72 @@ class API:
response_format=response_format,
)
async def claude_messages(
self, payload: ClaudeMessagesRequest
) -> ClaudeMessagesResponse | StreamingResponse:
"""Claude Messages API - adapter."""
task_params = claude_request_to_text_generation(payload)
resolved_model = await self._resolve_and_validate_text_model(
ModelId(task_params.model)
)
task_params = task_params.model_copy(update={"model": resolved_model})
command = TextGeneration(task_params=task_params)
await self._send(command)
if payload.stream:
return StreamingResponse(
generate_claude_stream(
command.command_id,
payload.model,
self._token_chunk_stream(command.command_id),
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "close",
"X-Accel-Buffering": "no",
},
)
return await collect_claude_response(
command.command_id,
payload.model,
self._token_chunk_stream(command.command_id),
)
async def openai_responses(
self, payload: ResponsesRequest
) -> ResponsesResponse | StreamingResponse:
"""OpenAI Responses API."""
task_params = responses_request_to_text_generation(payload)
resolved_model = await self._resolve_and_validate_text_model(task_params.model)
task_params = task_params.model_copy(update={"model": resolved_model})
command = TextGeneration(task_params=task_params)
await self._send(command)
if payload.stream:
return StreamingResponse(
generate_responses_stream(
command.command_id,
payload.model,
self._token_chunk_stream(command.command_id),
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "close",
"X-Accel-Buffering": "no",
},
)
return await collect_responses_response(
command.command_id,
payload.model,
self._token_chunk_stream(command.command_id),
)
def _calculate_total_available_memory(self) -> Memory:
"""Calculate total available memory across all nodes in bytes."""
total_available = Memory()
@@ -1280,14 +1285,14 @@ class API:
self._image_generation_queues.pop(
event.command_id, None
)
if queue := self._chat_completion_queues.get(
if queue := self._text_generation_queues.get(
event.command_id, None
):
assert not isinstance(event.chunk, ImageChunk)
try:
await queue.send(event.chunk)
except BrokenResourceError:
self._chat_completion_queues.pop(event.command_id, None)
self._text_generation_queues.pop(event.command_id, None)
async def _pause_on_new_election(self):
with self.election_receiver as ems:

View File

@@ -12,7 +12,6 @@ from exo.master.placement import (
)
from exo.shared.apply import apply
from exo.shared.types.commands import (
ChatCompletion,
CreateInstance,
DeleteInstance,
ForwarderCommand,
@@ -23,6 +22,7 @@ from exo.shared.types.commands import (
SendInputChunk,
TaskFinished,
TestCommand,
TextGeneration,
)
from exo.shared.types.common import CommandId, NodeId, SessionId
from exo.shared.types.events import (
@@ -37,9 +37,6 @@ from exo.shared.types.events import (
TaskDeleted,
)
from exo.shared.types.state import State
from exo.shared.types.tasks import (
ChatCompletion as ChatCompletionTask,
)
from exo.shared.types.tasks import (
ImageEdits as ImageEditsTask,
)
@@ -50,6 +47,9 @@ from exo.shared.types.tasks import (
TaskId,
TaskStatus,
)
from exo.shared.types.tasks import (
TextGeneration as TextGenerationTask,
)
from exo.shared.types.worker.instances import InstanceId
from exo.utils.channels import Receiver, Sender, channel
from exo.utils.event_buffer import MultiSourceBuffer
@@ -117,11 +117,11 @@ class Master:
match command:
case TestCommand():
pass
case ChatCompletion():
case TextGeneration():
for instance in self.state.instances.values():
if (
instance.shard_assignments.model_id
== command.request_params.model
== command.task_params.model
):
task_count = sum(
1
@@ -134,7 +134,7 @@ class Master:
if not instance_task_counts:
raise ValueError(
f"No instance found for model {command.request_params.model}"
f"No instance found for model {command.task_params.model}"
)
available_instance_ids = sorted(
@@ -148,12 +148,12 @@ class Master:
generated_events.append(
TaskCreated(
task_id=task_id,
task=ChatCompletionTask(
task=TextGenerationTask(
task_id=task_id,
command_id=command.command_id,
instance_id=available_instance_ids[0],
task_status=TaskStatus.Pending,
task_params=command.request_params,
task_params=command.task_params,
),
)
)
@@ -163,7 +163,7 @@ class Master:
for instance in self.state.instances.values():
if (
instance.shard_assignments.model_id
== command.request_params.model
== command.task_params.model
):
task_count = sum(
1
@@ -176,7 +176,7 @@ class Master:
if not instance_task_counts:
raise ValueError(
f"No instance found for model {command.request_params.model}"
f"No instance found for model {command.task_params.model}"
)
available_instance_ids = sorted(
@@ -195,7 +195,7 @@ class Master:
command_id=command.command_id,
instance_id=available_instance_ids[0],
task_status=TaskStatus.Pending,
task_params=command.request_params,
task_params=command.task_params,
),
)
)
@@ -205,7 +205,7 @@ class Master:
for instance in self.state.instances.values():
if (
instance.shard_assignments.model_id
== command.request_params.model
== command.task_params.model
):
task_count = sum(
1
@@ -218,7 +218,7 @@ class Master:
if not instance_task_counts:
raise ValueError(
f"No instance found for model {command.request_params.model}"
f"No instance found for model {command.task_params.model}"
)
available_instance_ids = sorted(
@@ -237,7 +237,7 @@ class Master:
command_id=command.command_id,
instance_id=available_instance_ids[0],
task_status=TaskStatus.Pending,
task_params=command.request_params,
task_params=command.task_params,
),
)
)

View File

@@ -0,0 +1,182 @@
"""Tests for Claude Messages API conversion functions and types."""
import pydantic
import pytest
from exo.master.adapters.claude import (
claude_request_to_text_generation,
finish_reason_to_claude_stop_reason,
)
from exo.shared.types.claude_api import (
ClaudeMessage,
ClaudeMessagesRequest,
ClaudeTextBlock,
)
from exo.shared.types.common import ModelId
class TestFinishReasonToClaudeStopReason:
"""Tests for finish_reason to Claude stop_reason mapping."""
def test_stop_maps_to_end_turn(self):
assert finish_reason_to_claude_stop_reason("stop") == "end_turn"
def test_length_maps_to_max_tokens(self):
assert finish_reason_to_claude_stop_reason("length") == "max_tokens"
def test_tool_calls_maps_to_tool_use(self):
assert finish_reason_to_claude_stop_reason("tool_calls") == "tool_use"
def test_function_call_maps_to_tool_use(self):
assert finish_reason_to_claude_stop_reason("function_call") == "tool_use"
def test_content_filter_maps_to_end_turn(self):
assert finish_reason_to_claude_stop_reason("content_filter") == "end_turn"
def test_none_returns_none(self):
assert finish_reason_to_claude_stop_reason(None) is None
class TestClaudeRequestToInternal:
"""Tests for converting Claude Messages API requests to TextGenerationTaskParams."""
def test_basic_request_conversion(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
messages=[
ClaudeMessage(role="user", content="Hello"),
],
)
params = claude_request_to_text_generation(request)
assert params.model == "claude-3-opus"
assert params.max_output_tokens == 100
assert isinstance(params.input, list)
assert len(params.input) == 1
assert params.input[0].role == "user"
assert params.input[0].content == "Hello"
assert params.instructions is None
def test_request_with_system_string(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
system="You are a helpful assistant.",
messages=[
ClaudeMessage(role="user", content="Hello"),
],
)
params = claude_request_to_text_generation(request)
assert params.instructions == "You are a helpful assistant."
assert isinstance(params.input, list)
assert len(params.input) == 1
assert params.input[0].role == "user"
assert params.input[0].content == "Hello"
def test_request_with_system_text_blocks(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
system=[
ClaudeTextBlock(text="You are helpful. "),
ClaudeTextBlock(text="Be concise."),
],
messages=[
ClaudeMessage(role="user", content="Hello"),
],
)
params = claude_request_to_text_generation(request)
assert params.instructions == "You are helpful. Be concise."
assert isinstance(params.input, list)
assert len(params.input) == 1
def test_request_with_content_blocks(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
messages=[
ClaudeMessage(
role="user",
content=[
ClaudeTextBlock(text="First part. "),
ClaudeTextBlock(text="Second part."),
],
),
],
)
params = claude_request_to_text_generation(request)
assert isinstance(params.input, list)
assert len(params.input) == 1
assert params.input[0].content == "First part. Second part."
def test_request_with_multi_turn_conversation(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
messages=[
ClaudeMessage(role="user", content="Hello"),
ClaudeMessage(role="assistant", content="Hi there!"),
ClaudeMessage(role="user", content="How are you?"),
],
)
params = claude_request_to_text_generation(request)
assert isinstance(params.input, list)
assert len(params.input) == 3
assert params.input[0].role == "user"
assert params.input[1].role == "assistant"
assert params.input[2].role == "user"
def test_request_with_optional_parameters(self):
request = ClaudeMessagesRequest(
model=ModelId("claude-3-opus"),
max_tokens=100,
messages=[ClaudeMessage(role="user", content="Hello")],
temperature=0.7,
top_p=0.9,
top_k=40,
stop_sequences=["STOP", "END"],
stream=True,
)
params = claude_request_to_text_generation(request)
assert params.temperature == 0.7
assert params.top_p == 0.9
assert params.top_k == 40
assert params.stop == ["STOP", "END"]
assert params.stream is True
class TestClaudeMessagesRequestValidation:
"""Tests for Claude Messages API request validation."""
def test_request_requires_model(self):
with pytest.raises(pydantic.ValidationError):
ClaudeMessagesRequest.model_validate(
{
"max_tokens": 100,
"messages": [{"role": "user", "content": "Hello"}],
}
)
def test_request_requires_max_tokens(self):
with pytest.raises(pydantic.ValidationError):
ClaudeMessagesRequest.model_validate(
{
"model": "claude-3-opus",
"messages": [{"role": "user", "content": "Hello"}],
}
)
def test_request_requires_messages(self):
with pytest.raises(pydantic.ValidationError):
ClaudeMessagesRequest.model_validate(
{
"model": "claude-3-opus",
"max_tokens": 100,
}
)

View File

@@ -0,0 +1,265 @@
"""Tests for Claude Messages API tool_use support in the adapter."""
import json
from collections.abc import AsyncGenerator
from typing import Any, cast
from exo.master.adapters.claude import collect_claude_response, generate_claude_stream
from exo.shared.types.api import ToolCallItem
from exo.shared.types.chunks import ErrorChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId, ModelId
async def _chunks_to_stream(
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk],
) -> AsyncGenerator[ErrorChunk | ToolCallChunk | TokenChunk, None]:
for chunk in chunks:
yield chunk
MODEL = ModelId("test-model")
COMMAND_ID = CommandId("cmd_test123")
def _parse_sse_events(events: list[str]) -> list[dict[str, Any]]:
"""Parse SSE event strings into JSON dicts."""
parsed: list[dict[str, Any]] = []
for event_str in events:
for line in event_str.strip().split("\n"):
if line.startswith("data: "):
parsed.append(cast(dict[str, Any], json.loads(line[6:])))
return parsed
class TestCollectClaudeResponseToolUse:
"""Tests for non-streaming tool_use response collection."""
async def test_tool_call_chunk_produces_tool_use_blocks(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(
name="get_weather",
arguments='{"location": "San Francisco"}',
)
],
),
]
response = await collect_claude_response(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
)
assert response.stop_reason == "tool_use"
tool_blocks = [b for b in response.content if b.type == "tool_use"]
assert len(tool_blocks) == 1
block = tool_blocks[0]
assert block.type == "tool_use"
assert block.name == "get_weather"
assert block.input == {"location": "San Francisco"}
assert block.id.startswith("toolu_")
async def test_multiple_tool_calls(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(
name="get_weather",
arguments='{"location": "SF"}',
),
ToolCallItem(
name="get_time",
arguments='{"timezone": "PST"}',
),
],
),
]
response = await collect_claude_response(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
)
assert response.stop_reason == "tool_use"
tool_blocks = [b for b in response.content if b.type == "tool_use"]
assert len(tool_blocks) == 2
assert tool_blocks[0].name == "get_weather"
assert tool_blocks[1].name == "get_time"
async def test_mixed_text_and_tool_use(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
TokenChunk(model=MODEL, text="Let me check ", token_id=1, usage=None),
TokenChunk(model=MODEL, text="the weather.", token_id=2, usage=None),
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(
name="get_weather",
arguments='{"location": "NYC"}',
)
],
),
]
response = await collect_claude_response(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
)
assert response.stop_reason == "tool_use"
text_blocks = [b for b in response.content if b.type == "text"]
tool_blocks = [b for b in response.content if b.type == "tool_use"]
assert len(text_blocks) == 1
assert text_blocks[0].text == "Let me check the weather."
assert len(tool_blocks) == 1
assert tool_blocks[0].name == "get_weather"
async def test_no_content_produces_empty_text_block(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = []
response = await collect_claude_response(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
)
assert len(response.content) == 1
assert response.content[0].type == "text"
class TestGenerateClaudeStreamToolUse:
"""Tests for streaming tool_use event generation."""
async def test_tool_call_emits_tool_use_events(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(
name="get_weather",
arguments='{"location": "SF"}',
)
],
),
]
events: list[str] = []
async for event in generate_claude_stream(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
):
events.append(event)
parsed = _parse_sse_events(events)
# Find tool_use content_block_start
tool_starts = [
e
for e in parsed
if e.get("type") == "content_block_start"
and cast(dict[str, Any], e.get("content_block", {})).get("type")
== "tool_use"
]
assert len(tool_starts) == 1
content_block = cast(dict[str, Any], tool_starts[0]["content_block"])
assert content_block["name"] == "get_weather"
assert content_block["input"] == {}
assert cast(str, content_block["id"]).startswith("toolu_")
# Find input_json_delta
json_deltas = [
e
for e in parsed
if e.get("type") == "content_block_delta"
and cast(dict[str, Any], e.get("delta", {})).get("type")
== "input_json_delta"
]
assert len(json_deltas) == 1
delta = cast(dict[str, Any], json_deltas[0]["delta"])
assert json.loads(cast(str, delta["partial_json"])) == {"location": "SF"}
# Find message_delta with tool_use stop reason
msg_deltas = [e for e in parsed if e.get("type") == "message_delta"]
assert len(msg_deltas) == 1
assert cast(dict[str, Any], msg_deltas[0]["delta"])["stop_reason"] == "tool_use"
async def test_streaming_mixed_text_and_tool_use(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
TokenChunk(model=MODEL, text="Hello ", token_id=1, usage=None),
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(
name="search",
arguments='{"query": "test"}',
)
],
),
]
events: list[str] = []
async for event in generate_claude_stream(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
):
events.append(event)
parsed = _parse_sse_events(events)
# Should have text delta at index 0
text_deltas = [
e
for e in parsed
if e.get("type") == "content_block_delta"
and cast(dict[str, Any], e.get("delta", {})).get("type") == "text_delta"
]
assert len(text_deltas) == 1
assert text_deltas[0]["index"] == 0
assert cast(dict[str, Any], text_deltas[0]["delta"])["text"] == "Hello "
# Tool block at index 1
tool_starts = [
e
for e in parsed
if e.get("type") == "content_block_start"
and cast(dict[str, Any], e.get("content_block", {})).get("type")
== "tool_use"
]
assert len(tool_starts) == 1
assert tool_starts[0]["index"] == 1
# Stop reason should be tool_use
msg_deltas = [e for e in parsed if e.get("type") == "message_delta"]
assert cast(dict[str, Any], msg_deltas[0]["delta"])["stop_reason"] == "tool_use"
async def test_streaming_tool_block_stop_events(self):
chunks: list[ErrorChunk | ToolCallChunk | TokenChunk] = [
ToolCallChunk(
model=MODEL,
usage=None,
tool_calls=[
ToolCallItem(name="fn1", arguments="{}"),
ToolCallItem(name="fn2", arguments='{"a": 1}'),
],
),
]
events: list[str] = []
async for event in generate_claude_stream(
COMMAND_ID, "test-model", _chunks_to_stream(chunks)
):
events.append(event)
parsed = _parse_sse_events(events)
# Two tool block starts (at indices 1 and 2)
tool_starts = [
e
for e in parsed
if e.get("type") == "content_block_start"
and cast(dict[str, Any], e.get("content_block", {})).get("type")
== "tool_use"
]
assert len(tool_starts) == 2
assert tool_starts[0]["index"] == 1
assert tool_starts[1]["index"] == 2
# Two tool block stops (at indices 1 and 2), plus text block stop at 0
block_stops = [e for e in parsed if e.get("type") == "content_block_stop"]
stop_indices = [e["index"] for e in block_stops]
assert 0 in stop_indices
assert 1 in stop_indices
assert 2 in stop_indices

View File

@@ -7,15 +7,14 @@ from loguru import logger
from exo.master.main import Master
from exo.routing.router import get_node_id_keypair
from exo.shared.models.model_cards import ModelCard, ModelId, ModelTask
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
from exo.shared.models.model_cards import ModelCard, ModelTask
from exo.shared.types.commands import (
ChatCompletion,
CommandId,
ForwarderCommand,
PlaceInstance,
TextGeneration,
)
from exo.shared.types.common import NodeId, SessionId
from exo.shared.types.common import ModelId, NodeId, SessionId
from exo.shared.types.events import (
ForwarderEvent,
IndexedEvent,
@@ -27,8 +26,9 @@ from exo.shared.types.memory import Memory
from exo.shared.types.profiling import (
MemoryUsage,
)
from exo.shared.types.tasks import ChatCompletion as ChatCompletionTask
from exo.shared.types.tasks import TaskStatus
from exo.shared.types.tasks import TextGeneration as TextGenerationTask
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import (
InstanceMeta,
MlxRingInstance,
@@ -127,20 +127,16 @@ async def test_master():
logger.info("wait for an instance")
while len(master.state.instances.keys()) == 0:
await anyio.sleep(0.001)
logger.info("inject a ChatCompletion Command")
logger.info("inject a TextGeneration Command")
await command_sender.send(
ForwarderCommand(
origin=node_id,
command=(
ChatCompletion(
TextGeneration(
command_id=CommandId(),
request_params=ChatCompletionTaskParams(
model="llama-3.2-1b",
messages=[
ChatCompletionMessage(
role="user", content="Hello, how are you?"
)
],
task_params=TextGenerationTaskParams(
model=ModelId("llama-3.2-1b"),
input="Hello, how are you?",
),
)
),
@@ -190,12 +186,10 @@ async def test_master():
assert created_instance.ephemeral_port > 0
assert isinstance(events[2].event, TaskCreated)
assert events[2].event.task.task_status == TaskStatus.Pending
assert isinstance(events[2].event.task, ChatCompletionTask)
assert events[2].event.task.task_params == ChatCompletionTaskParams(
model="llama-3.2-1b",
messages=[
ChatCompletionMessage(role="user", content="Hello, how are you?")
],
assert isinstance(events[2].event.task, TextGenerationTask)
assert events[2].event.task.task_params == TextGenerationTaskParams(
model=ModelId("llama-3.2-1b"),
input="Hello, how are you?",
)
await master.shutdown()

View File

@@ -0,0 +1,48 @@
"""Tests for OpenAI Responses API wire types.
ResponsesRequest is the API wire type for the Responses endpoint.
The responses adapter converts it to TextGenerationTaskParams for the pipeline.
"""
import pydantic
import pytest
from exo.shared.types.common import ModelId
from exo.shared.types.openai_responses import (
ResponseInputMessage,
ResponsesRequest,
)
class TestResponsesRequestValidation:
"""Tests for OpenAI Responses API request validation."""
def test_request_requires_model(self):
with pytest.raises(pydantic.ValidationError):
ResponsesRequest.model_validate(
{
"input": "Hello",
}
)
def test_request_requires_input(self):
with pytest.raises(pydantic.ValidationError):
ResponsesRequest.model_validate(
{
"model": "gpt-4o",
}
)
def test_request_accepts_string_input(self):
request = ResponsesRequest(
model=ModelId("gpt-4o"),
input="Hello",
)
assert request.input == "Hello"
def test_request_accepts_message_array_input(self):
request = ResponsesRequest(
model=ModelId("gpt-4o"),
input=[ResponseInputMessage(role="user", content="Hello")],
)
assert len(request.input) == 1

View File

@@ -216,6 +216,8 @@ def get_node_id_keypair(
Obtains the :class:`Keypair` associated with this node-ID.
Obtain the :class:`PeerId` from it.
"""
# TODO(evan): bring back node id persistence once we figure out how to deal with duplicates
return Keypair.generate_ed25519()
def lock_path(path: str | bytes | PathLike[str] | PathLike[bytes]) -> Path:
return Path(str(path) + ".lock")

View File

@@ -1,5 +1,5 @@
from enum import Enum
from typing import Annotated
from typing import Annotated, Any
import aiofiles
import aiofiles.os as aios
@@ -7,7 +7,14 @@ import tomlkit
from anyio import Path, open_file
from huggingface_hub import model_info
from loguru import logger
from pydantic import BaseModel, Field, PositiveInt, field_validator
from pydantic import (
AliasChoices,
BaseModel,
Field,
PositiveInt,
field_validator,
model_validator,
)
from exo.shared.constants import EXO_ENABLE_IMAGE_MODELS
from exo.shared.types.common import ModelId
@@ -121,6 +128,14 @@ MODEL_CARDS: dict[str, ModelCard] = {
supports_tensor=True,
tasks=[ModelTask.TextGeneration],
),
"kimi-k2.5": ModelCard(
model_id=ModelId("mlx-community/Kimi-K2.5"),
storage_size=Memory.from_gb(617),
n_layers=61,
hidden_size=7168,
supports_tensor=True,
tasks=[ModelTask.TextGeneration],
),
# llama-3.1
"llama-3.1-8b": ModelCard(
model_id=ModelId("mlx-community/Meta-Llama-3.1-8B-Instruct-4bit"),
@@ -255,7 +270,7 @@ MODEL_CARDS: dict[str, ModelCard] = {
),
"qwen3-80b-a3B-thinking-4bit": ModelCard(
model_id=ModelId("mlx-community/Qwen3-Next-80B-A3B-Thinking-4bit"),
storage_size=Memory.from_mb(84700),
storage_size=Memory.from_mb(44900),
n_layers=48,
hidden_size=2048,
supports_tensor=True,
@@ -703,15 +718,18 @@ if EXO_ENABLE_IMAGE_MODELS:
class ConfigData(BaseModel):
model_config = {"extra": "ignore"} # Allow unknown fields
# Common field names for number of layers across different architectures
num_hidden_layers: Annotated[int, Field(ge=0)] | None = None
num_layers: Annotated[int, Field(ge=0)] | None = None
n_layer: Annotated[int, Field(ge=0)] | None = None
n_layers: Annotated[int, Field(ge=0)] | None = None # Sometimes used
num_decoder_layers: Annotated[int, Field(ge=0)] | None = None # Transformer models
decoder_layers: Annotated[int, Field(ge=0)] | None = None # Some architectures
hidden_size: Annotated[int, Field(ge=0)] | None = None
architectures: list[str] | None = None
hidden_size: Annotated[int, Field(ge=0)] | None = None
layer_count: int = Field(
validation_alias=AliasChoices(
"num_hidden_layers",
"num_layers",
"n_layer",
"n_layers",
"num_decoder_layers",
"decoder_layers",
)
)
@property
def supports_tensor(self) -> bool:
@@ -726,25 +744,27 @@ class ConfigData(BaseModel):
["GptOssForCausalLM"],
]
@property
def layer_count(self) -> int:
# Check common field names for layer count
layer_fields = [
self.num_hidden_layers,
self.num_layers,
self.n_layer,
self.n_layers,
self.num_decoder_layers,
self.decoder_layers,
]
@model_validator(mode="before")
@classmethod
def defer_to_text_config(cls, data: dict[str, Any]):
text_config = data.get("text_config")
if text_config is None:
return data
for layer_count in layer_fields:
if layer_count is not None:
return layer_count
for field in [
"architectures",
"hidden_size",
"num_hidden_layers",
"num_layers",
"n_layer",
"n_layers",
"num_decoder_layers",
"decoder_layers",
]:
if (val := text_config.get(field)) is not None: # pyright: ignore[reportAny]
data[field] = val
raise ValueError(
f"No layer count found in config.json: {self.model_dump_json()}"
)
return data
async def get_config_data(model_id: ModelId) -> ConfigData:
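A minimal sketch of the refactored `ConfigData` behaviour, assuming it remains importable from `exo.shared.models.model_cards`: `layer_count` now resolves through the `AliasChoices` list, and the `before` validator lifts fields out of a nested `text_config`.

```python
# Sketch only: illustrative config dicts, not real model configs.
from exo.shared.models.model_cards import ConfigData

flat = ConfigData.model_validate({"num_hidden_layers": 32, "hidden_size": 4096})
assert flat.layer_count == 32

nested = ConfigData.model_validate(
    {"text_config": {"n_layer": 24, "hidden_size": 2048}}
)
assert nested.layer_count == 24
assert nested.hidden_size == 2048
```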

View File

@@ -8,7 +8,7 @@ from multiprocessing.synchronize import Event as EventT
from multiprocessing.synchronize import Semaphore as SemaphoreT
from loguru import logger
from pytest import LogCaptureFixture
from pytest import LogCaptureFixture, mark
from exo.routing.router import get_node_id_keypair
from exo.shared.constants import EXO_NODE_ID_KEYPAIR
@@ -74,6 +74,7 @@ def _delete_if_exists(p: str | bytes | os.PathLike[str] | os.PathLike[bytes]):
os.remove(p)
@mark.skip(reason="this functionality is currently disabled but may return in future")
def test_node_id_fetching(caplog: LogCaptureFixture):
reps = 10

View File

@@ -2,7 +2,6 @@ import time
from collections.abc import Generator
from typing import Annotated, Any, Literal
from fastapi import UploadFile
from pydantic import BaseModel, Field, field_validator
from pydantic_core import PydanticUseDefault
@@ -116,8 +115,8 @@ class Usage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
prompt_tokens_details: PromptTokensDetails | None = None
completion_tokens_details: CompletionTokensDetails | None = None
prompt_tokens_details: PromptTokensDetails
completion_tokens_details: CompletionTokensDetails
class StreamingChoiceResponse(BaseModel):
@@ -170,8 +169,12 @@ class BenchChatCompletionResponse(ChatCompletionResponse):
generation_stats: GenerationStats | None = None
class ChatCompletionTaskParams(BaseModel):
model: str
class StreamOptions(BaseModel):
include_usage: bool = False
class ChatCompletionRequest(BaseModel):
model: ModelId
frequency_penalty: float | None = None
messages: list[ChatCompletionMessage]
logit_bias: dict[str, int] | None = None
@@ -184,15 +187,17 @@ class ChatCompletionTaskParams(BaseModel):
seed: int | None = None
stop: str | list[str] | None = None
stream: bool = False
stream_options: StreamOptions | None = None
temperature: float | None = None
top_p: float | None = None
top_k: int | None = None
tools: list[dict[str, Any]] | None = None
tool_choice: str | dict[str, Any] | None = None
parallel_tool_calls: bool | None = None
user: str | None = None
class BenchChatCompletionTaskParams(ChatCompletionTaskParams):
class BenchChatCompletionRequest(ChatCompletionRequest):
pass
@@ -276,28 +281,7 @@ class BenchImageGenerationTaskParams(ImageGenerationTaskParams):
class ImageEditsTaskParams(BaseModel):
image: UploadFile
prompt: str
background: str | None = None
input_fidelity: float | None = None
mask: UploadFile | None = None
model: str
n: int | None = 1
output_compression: int | None = None
output_format: Literal["png", "jpeg", "webp"] = "png"
partial_images: int | None = 0
quality: Literal["high", "medium", "low"] | None = "medium"
response_format: Literal["url", "b64_json"] | None = "b64_json"
size: str | None = "1024x1024"
stream: bool | None = False
user: str | None = None
advanced_params: AdvancedImageParams | None = None
# Internal flag for benchmark mode - set by API, preserved through serialization
bench: bool = False
class ImageEditsInternalParams(BaseModel):
"""Serializable version of ImageEditsTaskParams for distributed task execution."""
"""Internal task params for image-editing requests."""
image_data: str = "" # Base64-encoded image (empty when using chunked transfer)
total_input_chunks: int = 0

View File

@@ -2,7 +2,12 @@ from collections.abc import Generator
from typing import Any, Literal
from exo.shared.models.model_cards import ModelId
from exo.shared.types.api import GenerationStats, ImageGenerationStats
from exo.shared.types.api import (
GenerationStats,
ImageGenerationStats,
TopLogprobItem,
Usage,
)
from exo.utils.pydantic_ext import TaggedModel
from .api import FinishReason
@@ -17,8 +22,11 @@ class BaseChunk(TaggedModel):
class TokenChunk(BaseChunk):
text: str
token_id: int
usage: Usage | None
finish_reason: Literal["stop", "length", "content_filter"] | None = None
stats: GenerationStats | None = None
logprob: float | None = None
top_logprobs: list[TopLogprobItem] | None = None
class ErrorChunk(BaseChunk):
@@ -28,6 +36,7 @@ class ErrorChunk(BaseChunk):
class ToolCallChunk(BaseChunk):
tool_calls: list[ToolCallItem]
usage: Usage | None
finish_reason: Literal["tool_calls"] = "tool_calls"
stats: GenerationStats | None = None
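An illustrative construction of the extended `TokenChunk`, showing the `usage` field and the new per-token uncertainty fields; the values here are made up for the sketch.

```python
# Sketch only: field names follow the definitions above; values are illustrative.
from exo.shared.types.chunks import TokenChunk
from exo.shared.types.common import ModelId

chunk = TokenChunk(
    model=ModelId("llama-3.2-1b"),
    text="Hello",
    token_id=9906,
    usage=None,
    logprob=-0.12,
    top_logprobs=None,
)
assert chunk.finish_reason is None and chunk.stats is None
```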

View File

@@ -0,0 +1,214 @@
"""Claude Messages API types for request/response conversion."""
from typing import Any, Literal
from pydantic import BaseModel, Field
from exo.shared.types.common import ModelId
# Tool definition types
ClaudeToolInputSchema = dict[str, Any]
class ClaudeToolDefinition(BaseModel, frozen=True):
"""Tool definition in Claude Messages API request."""
name: str
description: str | None = None
input_schema: ClaudeToolInputSchema
# Type aliases
ClaudeRole = Literal["user", "assistant"]
ClaudeStopReason = Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]
# Content block types
class ClaudeTextBlock(BaseModel, frozen=True):
"""Text content block in Claude Messages API."""
type: Literal["text"] = "text"
text: str
class ClaudeImageSource(BaseModel, frozen=True):
"""Image source for Claude image blocks."""
type: Literal["base64", "url"]
media_type: str | None = None
data: str | None = None
url: str | None = None
class ClaudeImageBlock(BaseModel, frozen=True):
"""Image content block in Claude Messages API."""
type: Literal["image"] = "image"
source: ClaudeImageSource
class ClaudeToolUseBlock(BaseModel, frozen=True):
"""Tool use content block in Claude Messages API."""
type: Literal["tool_use"] = "tool_use"
id: str
name: str
input: dict[str, Any]
class ClaudeToolResultBlock(BaseModel, frozen=True):
"""Tool result content block in Claude Messages API request."""
type: Literal["tool_result"] = "tool_result"
tool_use_id: str
content: str | list[ClaudeTextBlock] | None = None
is_error: bool | None = None
cache_control: dict[str, str] | None = None
ClaudeContentBlock = ClaudeTextBlock | ClaudeImageBlock | ClaudeToolUseBlock
# Input content blocks can also include tool_result (sent by user after tool_use)
ClaudeInputContentBlock = (
ClaudeTextBlock | ClaudeImageBlock | ClaudeToolUseBlock | ClaudeToolResultBlock
)
# Request types
class ClaudeMessage(BaseModel, frozen=True):
"""Message in Claude Messages API request."""
role: ClaudeRole
content: str | list[ClaudeInputContentBlock]
class ClaudeMessagesRequest(BaseModel):
"""Request body for Claude Messages API."""
model: ModelId
max_tokens: int
messages: list[ClaudeMessage]
system: str | list[ClaudeTextBlock] | None = None
stop_sequences: list[str] | None = None
stream: bool = False
temperature: float | None = None
top_p: float | None = None
top_k: int | None = None
tools: list[ClaudeToolDefinition] | None = None
metadata: dict[str, str] | None = None
# Response types
class ClaudeUsage(BaseModel, frozen=True):
"""Token usage in Claude Messages API response."""
input_tokens: int
output_tokens: int
class ClaudeMessagesResponse(BaseModel, frozen=True):
"""Response body for Claude Messages API."""
id: str
type: Literal["message"] = "message"
role: Literal["assistant"] = "assistant"
content: list[ClaudeContentBlock]
model: str
stop_reason: ClaudeStopReason | None = None
stop_sequence: str | None = None
usage: ClaudeUsage
# Streaming event types
class ClaudeMessageStart(BaseModel, frozen=True):
"""Partial message in message_start event."""
id: str
type: Literal["message"] = "message"
role: Literal["assistant"] = "assistant"
content: list[ClaudeTextBlock] = Field(default_factory=list)
model: str
stop_reason: ClaudeStopReason | None = None
stop_sequence: str | None = None
usage: ClaudeUsage
class ClaudeMessageStartEvent(BaseModel, frozen=True):
"""Event sent at start of message stream."""
type: Literal["message_start"] = "message_start"
message: ClaudeMessageStart
class ClaudeContentBlockStartEvent(BaseModel, frozen=True):
"""Event sent at start of a content block."""
type: Literal["content_block_start"] = "content_block_start"
index: int
content_block: ClaudeTextBlock | ClaudeToolUseBlock
class ClaudeTextDelta(BaseModel, frozen=True):
"""Delta for text content block."""
type: Literal["text_delta"] = "text_delta"
text: str
class ClaudeInputJsonDelta(BaseModel, frozen=True):
"""Delta for tool use input JSON content block."""
type: Literal["input_json_delta"] = "input_json_delta"
partial_json: str
class ClaudeContentBlockDeltaEvent(BaseModel, frozen=True):
"""Event sent for content block delta."""
type: Literal["content_block_delta"] = "content_block_delta"
index: int
delta: ClaudeTextDelta | ClaudeInputJsonDelta
class ClaudeContentBlockStopEvent(BaseModel, frozen=True):
"""Event sent at end of a content block."""
type: Literal["content_block_stop"] = "content_block_stop"
index: int
class ClaudeMessageDeltaUsage(BaseModel, frozen=True):
"""Usage in message_delta event."""
output_tokens: int
class ClaudeMessageDelta(BaseModel, frozen=True):
"""Delta in message_delta event."""
stop_reason: ClaudeStopReason | None = None
stop_sequence: str | None = None
class ClaudeMessageDeltaEvent(BaseModel, frozen=True):
"""Event sent with final message delta."""
type: Literal["message_delta"] = "message_delta"
delta: ClaudeMessageDelta
usage: ClaudeMessageDeltaUsage
class ClaudeMessageStopEvent(BaseModel, frozen=True):
"""Event sent at end of message stream."""
type: Literal["message_stop"] = "message_stop"
ClaudeStreamEvent = (
ClaudeMessageStartEvent
| ClaudeContentBlockStartEvent
| ClaudeContentBlockDeltaEvent
| ClaudeContentBlockStopEvent
| ClaudeMessageDeltaEvent
| ClaudeMessageStopEvent
)
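Aside (illustrative only, not part of the diff): a minimal sketch of constructing a request and a streaming event from the types above; the model id is a placeholder, and ModelId is the import at the top of this file:

req = ClaudeMessagesRequest(
    model=ModelId("llama-3.2-1b-instruct"),  # placeholder model id
    max_tokens=256,
    messages=[ClaudeMessage(role="user", content="Hello!")],
    stream=True,
)
event = ClaudeContentBlockDeltaEvent(index=0, delta=ClaudeTextDelta(text="Hi"))
print(req.model_dump_json(exclude_none=True))
print(event.model_dump_json())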

View File

@@ -2,12 +2,12 @@ from pydantic import Field
from exo.shared.models.model_cards import ModelCard, ModelId
from exo.shared.types.api import (
ChatCompletionTaskParams,
ImageEditsInternalParams,
ImageEditsTaskParams,
ImageGenerationTaskParams,
)
from exo.shared.types.chunks import InputImageChunk
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding, ShardMetadata
from exo.utils.pydantic_ext import CamelCaseModel, TaggedModel
@@ -21,16 +21,16 @@ class TestCommand(BaseCommand):
__test__ = False
class ChatCompletion(BaseCommand):
request_params: ChatCompletionTaskParams
class TextGeneration(BaseCommand):
task_params: TextGenerationTaskParams
class ImageGeneration(BaseCommand):
request_params: ImageGenerationTaskParams
task_params: ImageGenerationTaskParams
class ImageEdits(BaseCommand):
request_params: ImageEditsInternalParams
task_params: ImageEditsTaskParams
class PlaceInstance(BaseCommand):
@@ -78,7 +78,7 @@ DownloadCommand = StartDownload | DeleteDownload
Command = (
TestCommand
| RequestEventLog
| ChatCompletion
| TextGeneration
| ImageGeneration
| ImageEdits
| PlaceInstance

View File

@@ -0,0 +1,296 @@
"""OpenAI Responses API wire types.
These types model the OpenAI Responses API request/response format.
ResponsesRequest is the API-level wire type; for the canonical internal
task params type used by the inference pipeline, see
``exo.shared.types.text_generation.TextGenerationTaskParams``.
"""
import time
from typing import Any, Literal
from pydantic import BaseModel, Field
from exo.shared.types.common import ModelId
# Type aliases
ResponseStatus = Literal["completed", "failed", "in_progress", "incomplete"]
ResponseRole = Literal["user", "assistant", "system", "developer"]
# Request input content part types
class ResponseInputTextPart(BaseModel, frozen=True):
"""Text content part in a Responses API input message."""
type: Literal["input_text"] = "input_text"
text: str
class ResponseOutputTextPart(BaseModel, frozen=True):
"""Output text content part (used when replaying assistant messages in input)."""
type: Literal["output_text"] = "output_text"
text: str
ResponseContentPart = ResponseInputTextPart | ResponseOutputTextPart
# Request input item types
class ResponseInputMessage(BaseModel, frozen=True):
"""Input message for Responses API."""
role: ResponseRole
content: str | list[ResponseContentPart]
type: Literal["message"] = "message"
class FunctionCallInputItem(BaseModel, frozen=True):
"""Function call item replayed in input (from a previous assistant response)."""
type: Literal["function_call"] = "function_call"
id: str | None = None
call_id: str
name: str
arguments: str
status: ResponseStatus | None = None
class FunctionCallOutputInputItem(BaseModel, frozen=True):
"""Function call output item in input (user providing tool results)."""
type: Literal["function_call_output"] = "function_call_output"
call_id: str
output: str
id: str | None = None
status: ResponseStatus | None = None
ResponseInputItem = (
ResponseInputMessage | FunctionCallInputItem | FunctionCallOutputInputItem
)
class ResponsesRequest(BaseModel, frozen=True):
"""Request body for OpenAI Responses API.
This is the API wire type for the Responses endpoint. The canonical
internal task params type is ``TextGenerationTaskParams``; see the
``responses_request_to_text_generation`` adapter for conversion.
"""
# --- OpenAI Responses API standard fields ---
model: ModelId
input: str | list[ResponseInputItem]
instructions: str | None = None
max_output_tokens: int | None = None
temperature: float | None = None
top_p: float | None = None
stream: bool = False
tools: list[dict[str, Any]] | None = None
metadata: dict[str, str] | None = None
# --- exo extensions (not in OpenAI Responses API spec) ---
top_k: int | None = Field(
default=None,
description="[exo extension] Top-k sampling parameter. Not part of the OpenAI Responses API.",
json_schema_extra={"x-exo-extension": True},
)
stop: str | list[str] | None = Field(
default=None,
description="[exo extension] Stop sequence(s). Not part of the OpenAI Responses API.",
json_schema_extra={"x-exo-extension": True},
)
seed: int | None = Field(
default=None,
description="[exo extension] Seed for deterministic sampling. Not part of the OpenAI Responses API.",
json_schema_extra={"x-exo-extension": True},
)
# --- Internal fields (preserved during serialization, hidden from OpenAPI schema) ---
chat_template_messages: list[dict[str, Any]] | None = Field(
default=None,
description="Internal: pre-formatted messages for tokenizer chat template. Not part of the OpenAI Responses API.",
json_schema_extra={"x-exo-internal": True},
)
# Response types
class ResponseOutputText(BaseModel, frozen=True):
"""Text content in response output."""
type: Literal["output_text"] = "output_text"
text: str
annotations: list[dict[str, str]] = Field(default_factory=list)
class ResponseMessageItem(BaseModel, frozen=True):
"""Message item in response output array."""
type: Literal["message"] = "message"
id: str
role: Literal["assistant"] = "assistant"
content: list[ResponseOutputText]
status: ResponseStatus = "completed"
class ResponseFunctionCallItem(BaseModel, frozen=True):
"""Function call item in response output array."""
type: Literal["function_call"] = "function_call"
id: str
call_id: str
name: str
arguments: str
status: ResponseStatus = "completed"
ResponseItem = ResponseMessageItem | ResponseFunctionCallItem
class ResponseUsage(BaseModel, frozen=True):
"""Token usage in Responses API response."""
input_tokens: int
output_tokens: int
total_tokens: int
class ResponsesResponse(BaseModel, frozen=True):
"""Response body for OpenAI Responses API."""
id: str
object: Literal["response"] = "response"
created_at: int = Field(default_factory=lambda: int(time.time()))
status: ResponseStatus = "completed"
model: str
output: list[ResponseItem]
output_text: str
usage: ResponseUsage | None = None
# Streaming event types
class ResponseCreatedEvent(BaseModel, frozen=True):
"""Event sent when response is created."""
type: Literal["response.created"] = "response.created"
sequence_number: int
response: ResponsesResponse
class ResponseInProgressEvent(BaseModel, frozen=True):
"""Event sent when response starts processing."""
type: Literal["response.in_progress"] = "response.in_progress"
sequence_number: int
response: ResponsesResponse
class ResponseOutputItemAddedEvent(BaseModel, frozen=True):
"""Event sent when an output item is added."""
type: Literal["response.output_item.added"] = "response.output_item.added"
sequence_number: int
output_index: int
item: ResponseItem
class ResponseContentPartAddedEvent(BaseModel, frozen=True):
"""Event sent when a content part is added."""
type: Literal["response.content_part.added"] = "response.content_part.added"
sequence_number: int
item_id: str
output_index: int
content_index: int
part: ResponseOutputText
class ResponseTextDeltaEvent(BaseModel, frozen=True):
"""Event sent for text delta during streaming."""
type: Literal["response.output_text.delta"] = "response.output_text.delta"
sequence_number: int
item_id: str
output_index: int
content_index: int
delta: str
class ResponseTextDoneEvent(BaseModel, frozen=True):
"""Event sent when text content is done."""
type: Literal["response.output_text.done"] = "response.output_text.done"
sequence_number: int
item_id: str
output_index: int
content_index: int
text: str
class ResponseContentPartDoneEvent(BaseModel, frozen=True):
"""Event sent when a content part is done."""
type: Literal["response.content_part.done"] = "response.content_part.done"
sequence_number: int
item_id: str
output_index: int
content_index: int
part: ResponseOutputText
class ResponseOutputItemDoneEvent(BaseModel, frozen=True):
"""Event sent when an output item is done."""
type: Literal["response.output_item.done"] = "response.output_item.done"
sequence_number: int
output_index: int
item: ResponseItem
class ResponseFunctionCallArgumentsDeltaEvent(BaseModel, frozen=True):
"""Event sent for function call arguments delta during streaming."""
type: Literal["response.function_call_arguments.delta"] = (
"response.function_call_arguments.delta"
)
sequence_number: int
item_id: str
output_index: int
delta: str
class ResponseFunctionCallArgumentsDoneEvent(BaseModel, frozen=True):
"""Event sent when function call arguments are complete."""
type: Literal["response.function_call_arguments.done"] = (
"response.function_call_arguments.done"
)
sequence_number: int
item_id: str
output_index: int
name: str
arguments: str
class ResponseCompletedEvent(BaseModel, frozen=True):
"""Event sent when response is completed."""
type: Literal["response.completed"] = "response.completed"
sequence_number: int
response: ResponsesResponse
ResponsesStreamEvent = (
ResponseCreatedEvent
| ResponseInProgressEvent
| ResponseOutputItemAddedEvent
| ResponseContentPartAddedEvent
| ResponseTextDeltaEvent
| ResponseTextDoneEvent
| ResponseContentPartDoneEvent
| ResponseOutputItemDoneEvent
| ResponseFunctionCallArgumentsDeltaEvent
| ResponseFunctionCallArgumentsDoneEvent
| ResponseCompletedEvent
)
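Aside (illustrative only, not part of the diff): a minimal sketch of a Responses API request and one streaming delta event built from the types above; all values are placeholders:

req = ResponsesRequest(
    model=ModelId("llama-3.2-1b-instruct"),  # placeholder model id
    input=[ResponseInputMessage(role="user", content="What is exo?")],
    max_output_tokens=64,
    stream=True,
)
delta = ResponseTextDeltaEvent(
    sequence_number=3,
    item_id="msg_1",
    output_index=0,
    content_index=0,
    delta="exo is",
)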

View File

@@ -3,11 +3,11 @@ from enum import Enum
from pydantic import Field
from exo.shared.types.api import (
ChatCompletionTaskParams,
ImageEditsInternalParams,
ImageEditsTaskParams,
ImageGenerationTaskParams,
)
from exo.shared.types.common import CommandId, Id
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import BoundInstance, InstanceId
from exo.shared.types.worker.runners import RunnerId
from exo.shared.types.worker.shards import ShardMetadata
@@ -52,9 +52,9 @@ class StartWarmup(BaseTask): # emitted by Worker
pass
class ChatCompletion(BaseTask): # emitted by Master
class TextGeneration(BaseTask): # emitted by Master
command_id: CommandId
task_params: ChatCompletionTaskParams
task_params: TextGenerationTaskParams
error_type: str | None = Field(default=None)
error_message: str | None = Field(default=None)
@@ -70,7 +70,7 @@ class ImageGeneration(BaseTask): # emitted by Master
class ImageEdits(BaseTask): # emitted by Master
command_id: CommandId
task_params: ImageEditsInternalParams
task_params: ImageEditsTaskParams
error_type: str | None = Field(default=None)
error_message: str | None = Field(default=None)
@@ -86,7 +86,7 @@ Task = (
| ConnectToGroup
| LoadModel
| StartWarmup
| ChatCompletion
| TextGeneration
| ImageGeneration
| ImageEdits
| Shutdown

View File

@@ -0,0 +1,44 @@
"""Canonical internal type for text generation task parameters.
All external API formats (Chat Completions, Claude Messages, OpenAI Responses)
are converted to TextGenerationTaskParams at the API boundary via adapters.
"""
from typing import Any, Literal
from pydantic import BaseModel
from exo.shared.types.common import ModelId
MessageRole = Literal["user", "assistant", "system", "developer"]
class InputMessage(BaseModel, frozen=True):
"""Internal message for text generation pipelines."""
role: MessageRole
content: str
class TextGenerationTaskParams(BaseModel, frozen=True):
"""Canonical internal task params for text generation.
Every API adapter converts its wire type into this before handing
off to the master/worker pipeline.
"""
model: ModelId
input: str | list[InputMessage]
instructions: str | None = None
max_output_tokens: int | None = None
temperature: float | None = None
top_p: float | None = None
stream: bool = False
tools: list[dict[str, Any]] | None = None
bench: bool = False
top_k: int | None = None
stop: str | list[str] | None = None
seed: int | None = None
chat_template_messages: list[dict[str, Any]] | None = None
logprobs: bool = False
top_logprobs: int | None = None
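Aside (illustrative only, not part of the diff): a hedged sketch of the adapter direction this type exists for: Claude Messages fields mapped onto the canonical params. The helper name and the flattening of content blocks are assumptions for illustration, not the actual adapter in src/exo/master/adapters/claude.py:

from exo.shared.types.claude_api import ClaudeMessagesRequest

def claude_to_text_generation(req: ClaudeMessagesRequest) -> TextGenerationTaskParams:
    # Content-block flattening is elided; only plain string content is handled here.
    messages = [
        InputMessage(role=m.role, content=m.content)
        for m in req.messages
        if isinstance(m.content, str)
    ]
    return TextGenerationTaskParams(
        model=req.model,
        input=messages,
        instructions=req.system if isinstance(req.system, str) else None,
        max_output_tokens=req.max_tokens,
        temperature=req.temperature,
        top_p=req.top_p,
        top_k=req.top_k,
        stop=req.stop_sequences,
        stream=req.stream,
    )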

View File

@@ -6,6 +6,8 @@ from exo.shared.types.api import (
GenerationStats,
ImageGenerationStats,
ToolCallItem,
TopLogprobItem,
Usage,
)
from exo.utils.pydantic_ext import TaggedModel
@@ -21,9 +23,11 @@ class TokenizedResponse(BaseRunnerResponse):
class GenerationResponse(BaseRunnerResponse):
text: str
token: int
# logprobs: list[float] | None = None # too big. we can change to be top-k
logprob: float | None = None
top_logprobs: list[TopLogprobItem] | None = None
finish_reason: FinishReason | None = None
stats: GenerationStats | None = None
usage: Usage | None
class ImageGenerationResponse(BaseRunnerResponse):
@@ -57,6 +61,7 @@ class PartialImageResponse(BaseRunnerResponse):
class ToolCallResponse(BaseRunnerResponse):
tool_calls: list[ToolCallItem]
usage: Usage | None
class FinishedResponse(BaseRunnerResponse):

View File

@@ -13,6 +13,7 @@ from anyio.abc import TaskGroup
from anyio.streams.buffered import BufferedByteReceiveStream
from anyio.streams.text import TextReceiveStream
from loguru import logger
from pydantic import ValidationError
from exo.shared.constants import EXO_CONFIG_FILE
from exo.shared.types.memory import Memory
@@ -267,7 +268,7 @@ class NodeConfig(TaggedModel):
contents = (await f.read()).decode("utf-8")
data = tomllib.loads(contents)
return cls.model_validate(data)
except (tomllib.TOMLDecodeError, UnicodeDecodeError):
except (tomllib.TOMLDecodeError, UnicodeDecodeError, ValidationError):
logger.warning("Invalid config file, skipping...")
return None

View File

@@ -11,7 +11,7 @@ from PIL import Image
from exo.shared.types.api import (
AdvancedImageParams,
ImageEditsInternalParams,
ImageEditsTaskParams,
ImageGenerationStats,
ImageGenerationTaskParams,
)
@@ -67,7 +67,7 @@ def warmup_image_generator(model: DistributedImageModel) -> Image.Image | None:
def generate_image(
model: DistributedImageModel,
task: ImageGenerationTaskParams | ImageEditsInternalParams,
task: ImageGenerationTaskParams | ImageEditsTaskParams,
) -> Generator[ImageGenerationResponse | PartialImageResponse, None, None]:
"""Generate image(s), optionally yielding partial results.
@@ -98,14 +98,14 @@ def generate_image(
partial_images = (
task.partial_images
if task.partial_images is not None
else (3 if task.stream else 0)
if task.partial_images is not None and task.stream is not None and task.stream
else 0
)
image_path: Path | None = None
with tempfile.TemporaryDirectory() as tmpdir:
if isinstance(task, ImageEditsInternalParams):
if isinstance(task, ImageEditsTaskParams):
# Decode base64 image data and save to temp file
image_path = Path(tmpdir) / "input.png"
image_path.write_bytes(base64.b64decode(task.image_data))

View File

@@ -348,6 +348,7 @@ class DiffusionRunner:
ctx.in_loop( # pyright: ignore[reportAny]
t=t,
latents=latents,
time_steps=time_steps,
)
mx.eval(latents)

View File

@@ -23,6 +23,7 @@ from mlx_lm.models.glm4_moe_lite import Glm4MoeLiteDecoderLayer, Glm4MoeLiteMLP
from mlx_lm.models.glm4_moe_lite import Model as GLM4MoeLiteModel
from mlx_lm.models.gpt_oss import GptOssMoeModel
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.models.kimi_k25 import Model as KimiK25Model
from mlx_lm.models.llama import Model as LlamaModel
from mlx_lm.models.minimax import Model as MiniMaxModel
from mlx_lm.models.ministral3 import Model as Ministral3Model
@@ -200,6 +201,9 @@ def pipeline_auto_parallel(
device_rank, world_size = model_shard_meta.device_rank, model_shard_meta.world_size
layers = layers[start_layer:end_layer]
for layer in layers:
mx.eval(layer) # type: ignore
layers[0] = PipelineFirstLayer(layers[0], device_rank, group=group)
layers[-1] = PipelineLastLayer(
layers[-1],
@@ -344,7 +348,7 @@ def tensor_auto_parallel(
all_to_sharded_linear_in_place,
sharded_to_all_linear_in_place,
)
elif isinstance(model, (DeepseekV3Model, DeepseekV32Model)):
elif isinstance(model, (DeepseekV3Model, DeepseekV32Model, KimiK25Model)):
tensor_parallel_sharding_strategy = DeepSeekShardingStrategy(
group,
all_to_sharded_linear,
@@ -453,7 +457,7 @@ def _set_layers(model: nn.Module, layers: list[_LayerCallable]) -> None:
# Update DeepSeek V3 specific parameters when layers are shrunk
if isinstance(
model, (DeepseekV3Model, DeepseekV32Model, Glm4MoeModel)
model, (DeepseekV3Model, DeepseekV32Model, Glm4MoeModel, KimiK25Model)
) and hasattr(inner_model_instance, "num_layers"):
logger.info(
f"Setting num_layers to {len(layers)} for model {model.model.__class__.__name__}"

View File

@@ -3,6 +3,7 @@ from copy import deepcopy
from typing import Any, cast
import mlx.core as mx
import psutil
from mlx_lm.models.cache import (
KVCache,
QuantizedKVCache,
@@ -12,25 +13,29 @@ from mlx_lm.models.cache import (
from mlx_lm.models.gpt_oss import Model as GptOssModel
from mlx_lm.tokenizer_utils import TokenizerWrapper
from exo.shared.types.memory import Memory
from exo.shared.types.mlx import KVCacheType
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.constants import CACHE_GROUP_SIZE, KV_CACHE_BITS
from exo.worker.runner.bootstrap import logger
# Fraction of device memory above which LRU eviction kicks in
_DEFAULT_MEMORY_THRESHOLD = 0.85
_DEFAULT_MEMORY_THRESHOLD = 0.9
_MEMORY_THRESHOLD = float(
os.environ.get("EXO_MEMORY_THRESHOLD", _DEFAULT_MEMORY_THRESHOLD)
)
class KVPrefixCache:
def __init__(self, tokenizer: TokenizerWrapper):
def __init__(
self, tokenizer: TokenizerWrapper, group: mx.distributed.Group | None = None
):
self.prompts: list[mx.array] = [] # mx array of tokens (ints)
self.caches: list[KVCacheType] = []
self._last_used: list[int] = [] # monotonic counter of last access per entry
self._access_counter: int = 0
self._tokenizer: TokenizerWrapper = tokenizer
self._group = group
def clear(self):
"""Clear all cached prompts and caches."""
@@ -81,13 +86,13 @@ class KVPrefixCache:
best_snapshot_index, best_snapshot_length = None, 0
for i, cached_prompt in enumerate(self.prompts):
length = _get_prefix_length(tokenized_prompt, cached_prompt)
length = get_prefix_length(tokenized_prompt, cached_prompt)
if length == max_length:
# Exact match - cached prompt starts with our entire prompt
# Trim cache to prompt length - 1, return last token for stream_generate
prompt_cache = deepcopy(self.caches[i])
cached_length = _cache_length(self.caches[i])
cached_length = cache_length(self.caches[i])
tokens_to_trim = cached_length - (max_length - 1)
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
@@ -109,7 +114,7 @@ class KVPrefixCache:
prompt_cache = deepcopy(self.caches[best_snapshot_index])
# Trim removes tokens from the end, so we trim (cached_length - prefix_length) to keep the prefix
cached_length = _cache_length(self.caches[best_snapshot_index])
cached_length = cache_length(self.caches[best_snapshot_index])
tokens_to_trim = cached_length - best_snapshot_length
if tokens_to_trim > 0:
trim_prompt_cache(cast(list[Any], prompt_cache), tokens_to_trim)
@@ -131,29 +136,37 @@ class KVPrefixCache:
return prompt_cache, tokenized_prompt, None
def _evict_if_needed(self):
"""Evict least recently used entries while memory pressure is high."""
"""Evict least recently used entries while memory usage is high."""
if len(self.caches) == 0:
return
active: int = mx.metal.get_active_memory()
limit = int(mx.metal.device_info()["max_recommended_working_set_size"])
if active < limit * _MEMORY_THRESHOLD:
return
# Evict LRU entries until below threshold or only one entry left
while len(self.caches) > 0:
while (
len(self.caches) > 1
and self.get_memory_used_percentage() > _MEMORY_THRESHOLD
):
lru_index = self._last_used.index(min(self._last_used))
evicted_tokens = len(self.prompts[lru_index])
self.prompts.pop(lru_index)
self.caches.pop(lru_index)
self._last_used.pop(lru_index)
logger.info(
f"KV cache evicted LRU entry ({evicted_tokens} tokens) due to memory pressure"
f"KV cache evicted LRU entry ({evicted_tokens} tokens) due to memory usage"
)
active = mx.metal.get_active_memory()
if active < limit * _MEMORY_THRESHOLD:
break
def get_memory_used_percentage(self) -> float:
local_pressure: float = get_memory_used_percentage()
if self._group is None:
return local_pressure
all_pressure = mx.distributed.all_gather(
mx.array([local_pressure], dtype=mx.float32),
group=self._group,
)
# .item() forces evaluation of the all_gather result.
max_pressure = float(mx.max(all_pressure).item())
return max_pressure
def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
@@ -168,13 +181,13 @@ def encode_prompt(tokenizer: TokenizerWrapper, prompt: str) -> mx.array:
return mx.array(tokenized_prompt)
def _cache_length(cache: KVCacheType) -> int:
def cache_length(cache: KVCacheType) -> int:
"""Get the number of tokens in a KV cache."""
# Use .offset attribute which all cache types have (len() not implemented in older QuantizedKVCache)
return max(c.offset for c in cache) # type: ignore
def _get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
def get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
"""Find the length of the common prefix between two token arrays."""
n = min(int(prompt.shape[0]), int(cached_prompt.shape[0]))
if n == 0:
@@ -185,6 +198,17 @@ def _get_prefix_length(prompt: mx.array, cached_prompt: mx.array) -> int:
return int(mx.sum(prefix_mask).item())
def get_available_memory() -> Memory:
mem: int = psutil.virtual_memory().available
return Memory.from_bytes(mem)
def get_memory_used_percentage() -> float:
mem = psutil.virtual_memory()
# percent is 0-100
return float(mem.percent / 100)
def make_kv_cache(
model: Model, max_kv_size: int | None = None, keep: int = 0
) -> KVCacheType:
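Aside (illustrative only, not part of the diff): a usage sketch of the prefix cache as wired in this change. Call signatures are taken from this file and its tests; model, tokenizer, prompt and group are assumed to already exist:

kv_prefix_cache = KVPrefixCache(tokenizer, group)  # group=None on a single node
prompt_cache, tokens, hit_index = kv_prefix_cache.get_kv_cache(model, prompt)
# ... run generation against prompt_cache, then store the extended cache for reuse ...
kv_prefix_cache.add_kv_cache(prompt, prompt_cache)
print(f"cluster-wide memory used: {kv_prefix_cache.get_memory_used_percentage():.0%}")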

View File

@@ -11,5 +11,7 @@ QUANTIZE_MODEL_MODE: str | None = "affine"
CACHE_GROUP_SIZE: int = 64
KV_CACHE_BITS: int | None = None
DEFAULT_TOP_LOGPROBS: int = 5
# TODO: We should really make this opt-in, but Kimi requires trust_remote_code=True
TRUST_REMOTE_CODE: bool = True

View File

@@ -8,20 +8,28 @@ from mlx_lm.sample_utils import make_sampler
from mlx_lm.tokenizer_utils import TokenizerWrapper
from exo.shared.types.api import (
BenchChatCompletionTaskParams,
ChatCompletionMessage,
CompletionTokensDetails,
FinishReason,
GenerationStats,
PromptTokensDetails,
TopLogprobItem,
Usage,
)
from exo.shared.types.common import ModelId
from exo.shared.types.memory import Memory
from exo.shared.types.mlx import KVCacheType
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.runner_response import (
GenerationResponse,
)
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.cache import KVPrefixCache, encode_prompt, make_kv_cache
from exo.worker.engines.mlx.constants import KV_BITS, KV_GROUP_SIZE, MAX_TOKENS
from exo.worker.engines.mlx.constants import (
DEFAULT_TOP_LOGPROBS,
KV_BITS,
KV_GROUP_SIZE,
MAX_TOKENS,
)
from exo.worker.engines.mlx.utils_mlx import (
apply_chat_template,
mx_barrier,
@@ -39,7 +47,7 @@ def prefill(
sampler: Callable[[mx.array], mx.array],
prompt_tokens: mx.array,
cache: KVCacheType,
) -> float:
) -> tuple[float, int]:
"""Prefill the KV cache with prompt tokens.
This runs the model over the prompt tokens to populate the cache,
@@ -50,7 +58,7 @@ def prefill(
"""
num_tokens = len(prompt_tokens)
if num_tokens == 0:
return 0.0
return 0.0, 0
logger.debug(f"Prefilling {num_tokens} tokens...")
start_time = time.perf_counter()
@@ -85,7 +93,7 @@ def prefill(
f"Prefill complete: {num_tokens} tokens in {elapsed:.2f}s "
f"({tokens_per_sec:.1f} tok/s)"
)
return tokens_per_sec
return tokens_per_sec, num_tokens
def warmup_inference(
@@ -96,14 +104,9 @@ def warmup_inference(
warmup_prompt = apply_chat_template(
tokenizer=tokenizer,
chat_task_data=ChatCompletionTaskParams(
model="",
messages=[
ChatCompletionMessage(
role="user",
content=content,
)
],
task_params=TextGenerationTaskParams(
model=ModelId(""),
input=content,
),
)
@@ -158,24 +161,74 @@ def eos_ids_from_tokenizer(tokenizer: TokenizerWrapper) -> list[int]:
return eos
def extract_top_logprobs(
logprobs: mx.array,
tokenizer: TokenizerWrapper,
top_k: int,
selected_token: int,
) -> tuple[float, list[TopLogprobItem]]:
"""Extract the selected token's logprob and top-k alternative tokens.
Args:
logprobs: Full vocabulary logprobs array from MLX
tokenizer: Tokenizer for decoding token IDs to strings
top_k: Number of top alternatives to return
selected_token: The token ID that was actually sampled
Returns:
Tuple of (selected_token_logprob, list of TopLogprobItem for top-k tokens)
"""
# Get the logprob of the selected token
selected_logprob = float(logprobs[selected_token].item())
# Get top-k indices (most probable tokens)
# mx.argpartition gives indices that would partition the array
# We negate logprobs since argpartition finds smallest, and we want largest
top_k = min(top_k, logprobs.shape[0]) # Don't exceed vocab size
top_indices = mx.argpartition(-logprobs, top_k)[:top_k]
# Get the actual logprob values for these indices
top_values = logprobs[top_indices]
# Sort by logprob (descending) for consistent ordering
sort_order = mx.argsort(-top_values)
top_indices = top_indices[sort_order]
top_values = top_values[sort_order]
# Convert to list of TopLogprobItem
top_logprob_items: list[TopLogprobItem] = []
for i in range(top_k):
token_id = int(top_indices[i].item())
token_logprob = float(top_values[i].item())
# Decode token ID to string
token_str = tokenizer.decode([token_id])
# Get byte representation
token_bytes = list(token_str.encode("utf-8"))
top_logprob_items.append(
TopLogprobItem(
token=token_str,
logprob=token_logprob,
bytes=token_bytes,
)
)
return selected_logprob, top_logprob_items
def mlx_generate(
model: Model,
tokenizer: TokenizerWrapper,
task: ChatCompletionTaskParams,
task: TextGenerationTaskParams,
prompt: str,
kv_prefix_cache: KVPrefixCache | None = None,
) -> Generator[GenerationResponse]:
# Ensure that generation stats only contains peak memory for this generation
mx.reset_peak_memory()
is_bench: bool = isinstance(task, BenchChatCompletionTaskParams)
# Currently we support chat-completion tasks only.
logger.debug(f"task_params: {task}")
if task.seed is not None:
mx.random.seed(task.seed)
# Do not use the prefix cache if we are trying to do benchmarks.
is_bench = task.bench
if is_bench:
kv_prefix_cache = None
@@ -201,58 +254,130 @@ def mlx_generate(
sampler = make_sampler(
temp=task.temperature if task.temperature is not None else 0.7,
top_p=task.top_p if task.top_p is not None else 1.0,
top_k=task.top_k if task.top_k is not None else 0,
)
# Normalize stop sequences to a list
stop_sequences: list[str] = (
([task.stop] if isinstance(task.stop, str) else task.stop)
if task.stop is not None
else []
)
max_stop_len = max((len(s) for s in stop_sequences), default=0)
# Prefill cache with all tokens except the last one
prefill_tps = prefill(model, tokenizer, sampler, prompt_tokens[:-1], caches)
prefill_tps, prefill_tokens = prefill(
model, tokenizer, sampler, prompt_tokens[:-1], caches
)
# stream_generate starts from the last token
last_token = prompt_tokens[-1:]
max_tokens = task.max_tokens or MAX_TOKENS
max_tokens = task.max_output_tokens or MAX_TOKENS
accumulated_text = ""
generated_text_parts: list[str] = []
generation_start_time = time.perf_counter()
for out in stream_generate(
model=model,
tokenizer=tokenizer,
prompt=last_token,
max_tokens=max_tokens,
sampler=sampler,
logits_processors=logits_processors,
prompt_cache=caches,
# TODO: Dynamically change prefill step size to be the maximum possible without timing out.
prefill_step_size=2048,
kv_group_size=KV_GROUP_SIZE,
kv_bits=KV_BITS,
usage: Usage | None = None
in_thinking = False
reasoning_tokens = 0
think_start = tokenizer.think_start
think_end = tokenizer.think_end
for completion_tokens, out in enumerate(
stream_generate(
model=model,
tokenizer=tokenizer,
prompt=last_token,
max_tokens=max_tokens,
sampler=sampler,
logits_processors=logits_processors,
prompt_cache=caches,
# TODO: Dynamically change prefill step size to be the maximum possible without timing out.
prefill_step_size=2048,
kv_group_size=KV_GROUP_SIZE,
kv_bits=KV_BITS,
),
start=1,
):
generated_text_parts.append(out.text)
logger.info(out.text)
accumulated_text += out.text
if think_start is not None and out.text == think_start:
in_thinking = True
elif think_end is not None and out.text == think_end:
in_thinking = False
if in_thinking:
reasoning_tokens += 1
# Check for stop sequences
text = out.text
finish_reason: FinishReason | None = cast(
FinishReason | None, out.finish_reason
)
stop_matched = False
if stop_sequences:
for stop_seq in stop_sequences:
if stop_seq in accumulated_text:
# Trim text to just before the stop sequence
stop_index = accumulated_text.find(stop_seq)
text_before_stop = accumulated_text[:stop_index]
chunk_start = len(accumulated_text) - len(out.text)
text = text_before_stop[chunk_start:]
finish_reason = "stop"
stop_matched = True
break
is_done = finish_reason is not None
stats: GenerationStats | None = None
if out.finish_reason is not None:
if is_done:
stats = GenerationStats(
prompt_tps=float(prefill_tps or out.prompt_tps),
generation_tps=float(out.generation_tps),
prompt_tokens=int(out.prompt_tokens),
prompt_tokens=int(prefill_tokens + out.prompt_tokens),
generation_tokens=int(out.generation_tokens),
peak_memory_usage=Memory.from_gb(out.peak_memory),
)
if out.finish_reason not in get_args(FinishReason):
# We don't raise here since an unexpected finish_reason is harmless;
# just log a warning and move on.
if not stop_matched and out.finish_reason not in get_args(FinishReason):
logger.warning(
f"Model generated unexpected finish_reason: {out.finish_reason}"
)
usage = Usage(
prompt_tokens=int(out.prompt_tokens),
completion_tokens=completion_tokens,
total_tokens=int(out.prompt_tokens) + completion_tokens,
prompt_tokens_details=PromptTokensDetails(
cached_tokens=prefix_hit_length
),
completion_tokens_details=CompletionTokensDetails(
reasoning_tokens=reasoning_tokens
),
)
# Extract logprobs from the full vocabulary logprobs array
logprob: float | None = None
top_logprobs: list[TopLogprobItem] | None = None
if task.logprobs:
logprob, top_logprobs = extract_top_logprobs(
logprobs=out.logprobs,
tokenizer=tokenizer,
top_k=task.top_logprobs or DEFAULT_TOP_LOGPROBS,
selected_token=out.token,
)
yield GenerationResponse(
text=out.text,
text=text,
token=out.token,
finish_reason=cast(FinishReason | None, out.finish_reason),
logprob=logprob,
top_logprobs=top_logprobs,
finish_reason=finish_reason,
stats=stats,
usage=usage,
)
if out.finish_reason is not None:
if is_done:
# Log generation stats
generation_elapsed = time.perf_counter() - generation_start_time
generated_tokens = len(generated_text_parts)
@@ -275,4 +400,8 @@ def mlx_generate(
kv_prefix_cache.add_kv_cache(full_prompt, caches)
break
# Limit accumulated_text to what's needed for stop sequence detection
if max_stop_len > 0 and len(accumulated_text) > max_stop_len:
accumulated_text = accumulated_text[-max_stop_len:]
# TODO: Do we want an mx_barrier?
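Aside (illustrative only, not part of the diff): a toy walk-through of the top-k selection performed by extract_top_logprobs, using a fake four-token vocabulary instead of a real tokenizer:

import math
import mlx.core as mx

logprobs = mx.array([-0.1, -3.0, -2.3, -0.9])  # fake vocabulary of 4 tokens
top_k = 2
top_indices = mx.argpartition(-logprobs, top_k)[:top_k]  # unordered top-2 indices
order = mx.argsort(-logprobs[top_indices])  # sort descending by logprob
for idx in top_indices[order].tolist():
    lp = float(logprobs[idx].item())
    print(idx, round(lp, 2), round(math.exp(lp), 2))
# prints token 0 (p ~= 0.90) first, then token 3 (p ~= 0.41)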

View File

@@ -39,10 +39,9 @@ from mlx_lm.utils import load_model
from pydantic import RootModel
from exo.download.download_utils import build_model_path
from exo.shared.types.api import ChatCompletionMessageText
from exo.shared.types.common import Host
from exo.shared.types.memory import Memory
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import (
BoundInstance,
MlxJacclInstance,
@@ -165,12 +164,11 @@ def mlx_distributed_init(
jaccl_coordinator = jaccl_coordinators[bound_instance.bound_node_id]
# TODO: update once upstream fixes
logger.info(
f"rank {rank} MLX_JACCL_DEVICES: {coordination_file} with devices: {jaccl_devices_json}"
f"rank {rank} MLX_IBV_DEVICES: {coordination_file} with devices: {jaccl_devices_json}"
)
logger.info(f"rank {rank} MLX_JACCL_COORDINATOR: {jaccl_coordinator}")
os.environ["MLX_JACCL_DEVICES"] = coordination_file
os.environ["MLX_IBV_DEVICES"] = coordination_file
os.environ["MLX_RANK"] = str(rank)
os.environ["MLX_JACCL_COORDINATOR"] = jaccl_coordinator
group = mx.distributed.init(backend="jaccl", strict=True)
@@ -259,10 +257,10 @@ def shard_and_load(
logger.info(f"Group size: {group.size()}, group rank: {group.rank()}")
# Estimate timeout based on model size
base_timeout = float(os.environ.get("EXO_MODEL_LOAD_TIMEOUT", "60"))
# Estimate timeout based on model size (5x default for large queued workloads)
base_timeout = float(os.environ.get("EXO_MODEL_LOAD_TIMEOUT", "300"))
model_size_gb = get_weights_size(shard_metadata).in_bytes / (1024**3)
timeout_seconds = base_timeout + model_size_gb / 5
timeout_seconds = base_timeout + model_size_gb
logger.info(
f"Evaluating model parameters with timeout of {timeout_seconds:.0f}s "
f"(model size: {model_size_gb:.1f}GB)"
@@ -339,8 +337,35 @@ def load_tokenizer_for_model_id(
# Kimi uses a custom TikTokenTokenizer that transformers 5.x can't load via AutoTokenizer
if "kimi-k2" in model_id_lower:
import importlib.util
import types
sys.path.insert(0, str(model_path))
from tokenization_kimi import TikTokenTokenizer # type: ignore[import-not-found] # noqa: I001
# Load tool_declaration_ts first (tokenization_kimi imports it with relative import)
tool_decl_path = model_path / "tool_declaration_ts.py"
if tool_decl_path.exists():
spec = importlib.util.spec_from_file_location(
"tool_declaration_ts", tool_decl_path
)
if spec and spec.loader:
tool_decl_module = importlib.util.module_from_spec(spec)
sys.modules["tool_declaration_ts"] = tool_decl_module
spec.loader.exec_module(tool_decl_module)
# Load tokenization_kimi with patched source (convert relative to absolute import)
tok_path = model_path / "tokenization_kimi.py"
source = tok_path.read_text()
source = source.replace("from .tool_declaration_ts", "from tool_declaration_ts")
spec = importlib.util.spec_from_file_location("tokenization_kimi", tok_path)
if spec:
tok_module = types.ModuleType("tokenization_kimi")
tok_module.__file__ = str(tok_path)
sys.modules["tokenization_kimi"] = tok_module
exec(compile(source, tok_path, "exec"), tok_module.__dict__) # noqa: S102
TikTokenTokenizer = tok_module.TikTokenTokenizer # type: ignore[attr-defined] # noqa: N806
else:
from tokenization_kimi import TikTokenTokenizer # type: ignore[import-not-found] # noqa: I001
hf_tokenizer: Any = TikTokenTokenizer.from_pretrained(model_path) # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
@@ -363,8 +388,7 @@ def load_tokenizer_for_model_id(
def _normalize_tool_calls(msg_dict: dict[str, Any]) -> None:
"""
Normalize tool_calls in a message dict.
"""Normalize tool_calls in a message dict.
OpenAI format has tool_calls[].function.arguments as a JSON string,
but some chat templates (e.g., GLM) expect it as a dict.
@@ -387,42 +411,47 @@ def _normalize_tool_calls(msg_dict: dict[str, Any]) -> None:
def apply_chat_template(
tokenizer: TokenizerWrapper,
chat_task_data: ChatCompletionTaskParams,
task_params: TextGenerationTaskParams,
) -> str:
messages = chat_task_data.messages
tools = chat_task_data.tools
"""Convert TextGenerationTaskParams to a chat template prompt.
Converts the internal format (input + instructions) to a messages list
that can be processed by the tokenizer's chat template.
When chat_template_messages is available (from Chat Completions API),
uses those directly to preserve tool_calls, thinking, and other fields.
Otherwise builds messages from the task params input/instructions.
"""
formatted_messages: list[dict[str, Any]] = []
for message in messages:
if isinstance(message.content, ChatCompletionMessageText):
message.content = message.content.text
if isinstance(message.content, list):
if len(message.content) == 0:
logger.warning("Received prompt with no content, skipping")
continue
if task_params.chat_template_messages is not None:
# Use pre-formatted messages that preserve tool_calls, thinking, etc.
formatted_messages = list(task_params.chat_template_messages)
for msg in formatted_messages:
_normalize_tool_calls(msg)
else:
# Add system message (instructions) if present
if task_params.instructions:
formatted_messages.append(
{"role": "system", "content": task_params.instructions}
)
message.content = "\n".join(c.text for c in message.content).strip()
if (
message.content is None
and message.thinking is None
and message.tool_calls is None
):
continue
# Null values are not valid when applying templates in tokenizer
dumped: dict[str, Any] = message.model_dump()
msg_dict: dict[str, Any] = {k: v for k, v in dumped.items() if v is not None} # pyright: ignore[reportAny]
# Parse tool_calls arguments from JSON string to dict for templates that expect dicts
_normalize_tool_calls(msg_dict)
formatted_messages.append(msg_dict)
# Convert input to messages
if isinstance(task_params.input, str):
# Simple string input becomes a single user message
formatted_messages.append({"role": "user", "content": task_params.input})
else:
# List of InputMessage
for msg in task_params.input:
if not msg.content:
logger.warning("Received message with empty content, skipping")
continue
formatted_messages.append({"role": msg.role, "content": msg.content})
prompt: str = tokenizer.apply_chat_template(
formatted_messages,
tokenize=False,
add_generation_prompt=True,
tools=tools,
tools=task_params.tools,
)
logger.info(prompt)
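Aside (illustrative only, not part of the diff): for a request with no chat_template_messages, the new path above builds the messages list from instructions and input. A self-contained sketch with a placeholder model id:

from exo.shared.types.common import ModelId
from exo.shared.types.text_generation import TextGenerationTaskParams

params = TextGenerationTaskParams(
    model=ModelId("llama-3.2-1b-instruct"),  # placeholder model id
    input="What is exo?",
    instructions="Answer briefly.",
)
# apply_chat_template(tokenizer, params) templates approximately:
#   [{"role": "system", "content": "Answer briefly."},
#    {"role": "user", "content": "What is exo?"}]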

View File

@@ -10,7 +10,7 @@ from loguru import logger
from exo.routing.connection_message import ConnectionMessage, ConnectionMessageType
from exo.shared.apply import apply
from exo.shared.models.model_cards import ModelId
from exo.shared.types.api import ImageEditsInternalParams
from exo.shared.types.api import ImageEditsTaskParams
from exo.shared.types.commands import (
ForwarderCommand,
ForwarderDownloadCommand,
@@ -244,7 +244,7 @@ class Worker:
command_id=task.command_id,
instance_id=task.instance_id,
task_status=task.task_status,
task_params=ImageEditsInternalParams(
task_params=ImageEditsTaskParams(
image_data=assembled,
total_input_chunks=task.task_params.total_input_chunks,
prompt=task.task_params.prompt,

View File

@@ -4,7 +4,6 @@ from collections.abc import Mapping, Sequence
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.tasks import (
ChatCompletion,
ConnectToGroup,
CreateRunner,
DownloadModel,
@@ -16,6 +15,7 @@ from exo.shared.types.tasks import (
Task,
TaskId,
TaskStatus,
TextGeneration,
)
from exo.shared.types.worker.downloads import (
DownloadCompleted,
@@ -275,7 +275,7 @@ def _pending_tasks(
for task in tasks.values():
# for now, just forward chat completions
# TODO(ciaran): do this better!
if not isinstance(task, (ChatCompletion, ImageGeneration, ImageEdits)):
if not isinstance(task, (TextGeneration, ImageGeneration, ImageEdits)):
continue
if task.task_status not in (TaskStatus.Pending, TaskStatus.Running):
continue

View File

@@ -18,7 +18,7 @@ from pydantic import ValidationError
from exo.shared.constants import EXO_MAX_CHUNK_SIZE
from exo.shared.models.model_cards import ModelId, ModelTask
from exo.shared.types.api import ChatCompletionMessageText, ImageGenerationStats
from exo.shared.types.api import ImageGenerationStats
from exo.shared.types.chunks import ErrorChunk, ImageChunk, TokenChunk, ToolCallChunk
from exo.shared.types.common import CommandId
from exo.shared.types.events import (
@@ -29,7 +29,6 @@ from exo.shared.types.events import (
TaskStatusUpdated,
)
from exo.shared.types.tasks import (
ChatCompletion,
ConnectToGroup,
ImageEdits,
ImageGeneration,
@@ -37,8 +36,11 @@ from exo.shared.types.tasks import (
Shutdown,
StartWarmup,
Task,
TaskId,
TaskStatus,
TextGeneration,
)
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import BoundInstance
from exo.shared.types.worker.runner_response import (
GenerationResponse,
@@ -111,8 +113,12 @@ def main(
event_sender.send(
RunnerStatusUpdated(runner_id=runner_id, runner_status=current_status)
)
seen = set[TaskId]()
with task_receiver as tasks:
for task in tasks:
if task.task_id in seen:
logger.warning("repeat task - potential error")
seen.add(task.task_id)
event_sender.send(
TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Running)
)
@@ -163,7 +169,7 @@ def main(
logger.info(
f"model has_tool_calling={tokenizer.has_tool_calling}"
)
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache = KVPrefixCache(tokenizer, group)
elif (
ModelTask.TextToImage in shard_metadata.model_card.tasks
@@ -214,7 +220,7 @@ def main(
current_status = RunnerReady()
logger.info("runner ready")
case ChatCompletion(task_params=task_params, command_id=command_id) if (
case TextGeneration(task_params=task_params, command_id=command_id) if (
isinstance(current_status, RunnerReady)
):
logger.info(f"received chat request: {task}")
@@ -227,10 +233,9 @@ def main(
)
assert model and not isinstance(model, DistributedImageModel)
assert tokenizer
assert task_params.messages[0].content is not None
try:
_check_for_debug_prompts(task_params.messages[0].content)
_check_for_debug_prompts(task_params)
# Build prompt once - used for both generation and thinking detection
prompt = apply_chat_template(tokenizer, task_params)
@@ -277,9 +282,11 @@ def main(
tokenizer.tool_parser, # pyright: ignore[reportAny]
)
completion_tokens = 0
for response in mlx_generator:
match response:
case GenerationResponse():
completion_tokens += 1
if (
device_rank == 0
and response.finish_reason == "error"
@@ -307,8 +314,11 @@ def main(
model=shard_metadata.model_card.model_id,
text=response.text,
token_id=response.token,
usage=response.usage,
finish_reason=response.finish_reason,
stats=response.stats,
logprob=response.logprob,
top_logprobs=response.top_logprobs,
),
)
)
@@ -320,6 +330,7 @@ def main(
chunk=ToolCallChunk(
tool_calls=response.tool_calls,
model=shard_metadata.model_card.model_id,
usage=response.usage,
),
)
)
@@ -535,10 +546,10 @@ def parse_gpt_oss(
name=current_tool_name,
arguments="".join(tool_arg_parts).strip(),
)
]
],
usage=response.usage,
)
tool_arg_parts = []
break
current_tool_name = recipient
# If inside a tool call, accumulate arguments
@@ -684,7 +695,7 @@ def parse_tool_calls(
tools = [_validate_single_tool(tool) for tool in parsed]
else:
tools = [_validate_single_tool(parsed)]
yield ToolCallResponse(tool_calls=tools)
yield ToolCallResponse(tool_calls=tools, usage=response.usage)
except (
json.JSONDecodeError,
@@ -853,17 +864,23 @@ EXO_RUNNER_MUST_OOM = "EXO RUNNER MUST OOM"
EXO_RUNNER_MUST_TIMEOUT = "EXO RUNNER MUST TIMEOUT"
def _check_for_debug_prompts(
prompt: str | ChatCompletionMessageText | list[ChatCompletionMessageText],
):
if isinstance(prompt, list):
if len(prompt) == 0:
logger.debug("Empty message prompt received in debug prompt")
return
prompt = prompt[0]
def _check_for_debug_prompts(task_params: TextGenerationTaskParams) -> None:
"""Check for debug prompt triggers in the input.
if isinstance(prompt, ChatCompletionMessageText):
prompt = prompt.text
Extracts the first user input text and checks for debug triggers.
"""
prompt: str
if isinstance(task_params.input, str):
prompt = task_params.input
else:
# List of InputMessage - get first message content
if len(task_params.input) == 0:
logger.debug("Empty message list in debug prompt check")
return
prompt = task_params.input[0].content
if not prompt:
return
if EXO_RUNNER_MUST_FAIL in prompt:
logger.info("raising exception")

View File

@@ -127,20 +127,25 @@ class RunnerSupervisor:
self._tg.cancel_scope.cancel()
async def start_task(self, task: Task):
if task.task_id in self.pending:
logger.warning(
f"Skipping invalid task {task} as it has already been submitted"
)
return
if task.task_id in self.completed:
logger.info(
logger.warning(
f"Skipping invalid task {task} as it has already been completed"
)
return
logger.info(f"Starting task {task}")
event = anyio.Event()
self.pending[task.task_id] = event
try:
self._task_sender.send(task)
await self._task_sender.send_async(task)
except ClosedResourceError:
logger.warning(f"Task {task} dropped, runner closed communication.")
return
await event.wait()
logger.info(f"Finished task {task}")
async def _forward_events(self):
with self._ev_recv as events:

View File

@@ -12,10 +12,9 @@ import mlx.nn as nn
from exo.shared.constants import EXO_MODELS_DIR
from exo.shared.models.model_cards import ModelCard, ModelTask
from exo.shared.types.api import ChatCompletionMessage
from exo.shared.types.common import ModelId
from exo.shared.types.memory import Memory
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.shards import PipelineShardMetadata, TensorShardMetadata
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.generator.generate import mlx_generate
@@ -113,10 +112,10 @@ def run_gpt_oss_pipeline_device(
tokens = tokens[:prompt_tokens]
prompt_text = tokenizer.decode(tokens)
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content=prompt_text)],
max_tokens=max_tokens,
input=prompt_text,
max_output_tokens=max_tokens,
)
prompt = apply_chat_template(tokenizer, task)
@@ -181,10 +180,10 @@ def run_gpt_oss_tensor_parallel_device(
tokens = tokens[:prompt_tokens]
prompt_text = tokenizer.decode(tokens)
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content=prompt_text)],
max_tokens=max_tokens,
input=prompt_text,
max_output_tokens=max_tokens,
)
prompt = apply_chat_template(tokenizer, task)

View File

@@ -8,15 +8,14 @@ import pytest
from mlx_lm.models.cache import KVCache
from mlx_lm.sample_utils import make_sampler
from exo.shared.types.api import ChatCompletionMessage
from exo.shared.types.common import ModelId
from exo.shared.types.tasks import ChatCompletionTaskParams
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
from exo.worker.engines.mlx import Model
from exo.worker.engines.mlx.cache import (
KVPrefixCache,
_cache_length,
_get_prefix_length,
cache_length,
encode_prompt,
get_prefix_length,
make_kv_cache,
)
from exo.worker.engines.mlx.generator.generate import mlx_generate, prefill
@@ -35,47 +34,47 @@ class TestGetPrefixLength:
def test_identical_arrays(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3, 4, 5])
assert _get_prefix_length(a, b) == 5
assert get_prefix_length(a, b) == 5
def test_no_common_prefix(self):
a = mx.array([1, 2, 3])
b = mx.array([4, 5, 6])
assert _get_prefix_length(a, b) == 0
assert get_prefix_length(a, b) == 0
def test_partial_prefix(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3, 7, 8])
assert _get_prefix_length(a, b) == 3
assert get_prefix_length(a, b) == 3
def test_prompt_longer_than_cached(self):
a = mx.array([1, 2, 3, 4, 5])
b = mx.array([1, 2, 3])
assert _get_prefix_length(a, b) == 3
assert get_prefix_length(a, b) == 3
def test_cached_longer_than_prompt(self):
a = mx.array([1, 2, 3])
b = mx.array([1, 2, 3, 4, 5])
assert _get_prefix_length(a, b) == 3
assert get_prefix_length(a, b) == 3
def test_single_token_match(self):
a = mx.array([1, 2, 3])
b = mx.array([1, 5, 6])
assert _get_prefix_length(a, b) == 1
assert get_prefix_length(a, b) == 1
def test_empty_prompt(self):
a = mx.array([]).astype(mx.int32)
b = mx.array([1, 2, 3])
assert _get_prefix_length(a, b) == 0
assert get_prefix_length(a, b) == 0
def test_empty_cached(self):
a = mx.array([1, 2, 3])
b = mx.array([]).astype(mx.int32)
assert _get_prefix_length(a, b) == 0
assert get_prefix_length(a, b) == 0
def test_both_empty(self):
a = mx.array([]).astype(mx.int32)
b = mx.array([]).astype(mx.int32)
assert _get_prefix_length(a, b) == 0
assert get_prefix_length(a, b) == 0
class TestKVPrefix:
@@ -134,10 +133,10 @@ class TestKVPrefixCacheWithModel:
def test_prefill_populates_cache(self, model_and_tokenizer):
model, tokenizer = model_and_tokenizer
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Hello!!")],
max_tokens=1,
input=[InputMessage(role="user", content="Hello!!")],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -146,15 +145,15 @@ class TestKVPrefixCacheWithModel:
prefill(model, tokenizer, make_sampler(0.0), tokens, cache)
# Cache should now hold the prompt tokens
assert _cache_length(cache) == len(tokens)
assert cache_length(cache) == len(tokens)
def test_add_and_get_exact_match(self, model_and_tokenizer):
model, tokenizer = model_and_tokenizer
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Test exact")],
max_tokens=1,
input=[InputMessage(role="user", content="Test exact")],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -166,7 +165,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache.add_kv_cache(prompt, cache)
assert len(kv_prefix_cache.prompts) == 1
stored_length = _cache_length(kv_prefix_cache.caches[0])
stored_length = cache_length(kv_prefix_cache.caches[0])
assert stored_length > 0
# Retrieve with same prompt: exact match
@@ -183,10 +182,10 @@ class TestKVPrefixCacheWithModel:
"""get_kv_cache with a longer prompt sharing prefix should return partial match."""
model, tokenizer = model_and_tokenizer
short_task = ChatCompletionTaskParams(
short_task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Hi")],
max_tokens=1,
input=[InputMessage(role="user", content="Hi")],
max_output_tokens=1,
)
short_prompt = apply_chat_template(tokenizer, short_task)
short_tokens = encode_prompt(tokenizer, short_prompt)
@@ -198,18 +197,16 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache.add_kv_cache(short_prompt, cache)
# Query with longer prompt that shares the chat template prefix
long_task = ChatCompletionTaskParams(
long_task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[
ChatCompletionMessage(role="user", content="Hi there, how are you?")
],
max_tokens=1,
input=[InputMessage(role="user", content="Hi there, how are you?")],
max_output_tokens=1,
)
long_prompt = apply_chat_template(tokenizer, long_task)
long_tokens = encode_prompt(tokenizer, long_prompt)
# The prompts share a prefix (chat template preamble + "Hi")
expected_prefix = _get_prefix_length(long_tokens, short_tokens)
expected_prefix = get_prefix_length(long_tokens, short_tokens)
assert expected_prefix > 0, (
"Prompts should share a prefix from the chat template"
)
@@ -229,10 +226,10 @@ class TestKVPrefixCacheWithModel:
"""Getting a cache and then mutating it (as generation does) must not corrupt stored cache."""
model, tokenizer = model_and_tokenizer
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Mutation test")],
max_tokens=1,
input=[InputMessage(role="user", content="Mutation test")],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -243,7 +240,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = _cache_length(kv_prefix_cache.caches[0])
stored_length = cache_length(kv_prefix_cache.caches[0])
# Get cache and mutate it (simulating what generation does)
result_cache, _, matched_index = kv_prefix_cache.get_kv_cache(model, prompt)
@@ -259,7 +256,7 @@ class TestKVPrefixCacheWithModel:
mx.eval([c.keys for c in result_cache])
# Stored cache must be unchanged
assert _cache_length(kv_prefix_cache.caches[0]) == stored_length
assert cache_length(kv_prefix_cache.caches[0]) == stored_length
def test_stored_cache_survives_repeated_get_mutate_cycles(
self, model_and_tokenizer
@@ -267,10 +264,10 @@ class TestKVPrefixCacheWithModel:
"""Multiple get+mutate cycles (like repeated user requests) must not corrupt cache."""
model, tokenizer = model_and_tokenizer
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Repeat test")],
max_tokens=1,
input=[InputMessage(role="user", content="Repeat test")],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -281,7 +278,7 @@ class TestKVPrefixCacheWithModel:
kv_prefix_cache = KVPrefixCache(tokenizer)
kv_prefix_cache.add_kv_cache(prompt, cache)
stored_length = _cache_length(kv_prefix_cache.caches[0])
stored_length = cache_length(kv_prefix_cache.caches[0])
for i in range(3):
result_cache, _, _ = kv_prefix_cache.get_kv_cache(model, prompt)
@@ -293,7 +290,7 @@ class TestKVPrefixCacheWithModel:
layer_cache.update_and_fetch(extra, extra)
mx.eval([c.keys for c in result_cache])
assert _cache_length(kv_prefix_cache.caches[0]) == stored_length, (
assert cache_length(kv_prefix_cache.caches[0]) == stored_length, (
f"Failed on loop {i}"
)
@@ -302,10 +299,10 @@ class TestKVPrefixCacheWithModel:
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache(tokenizer)
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Hello")],
max_tokens=5,
input=[InputMessage(role="user", content="Hello")],
max_output_tokens=5,
)
prompt = apply_chat_template(tokenizer, task)
prompt_tokens = encode_prompt(tokenizer, prompt)
@@ -325,17 +322,17 @@ class TestKVPrefixCacheWithModel:
assert len(kv_prefix_cache.caches) == 1
# Cache should contain prompt + generated tokens
expected_length = len(prompt_tokens) + generated_tokens
assert _cache_length(kv_prefix_cache.caches[0]) == expected_length
assert cache_length(kv_prefix_cache.caches[0]) == expected_length
def test_mlx_generate_second_call_gets_prefix_hit(self, model_and_tokenizer):
"""Second mlx_generate call with same prompt should get a prefix hit from stored cache."""
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache(tokenizer)
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Reuse test")],
max_tokens=5,
input=[InputMessage(role="user", content="Reuse test")],
max_output_tokens=5,
)
prompt = apply_chat_template(tokenizer, task)
prompt_tokens = encode_prompt(tokenizer, prompt)
@@ -376,10 +373,10 @@ class TestKVPrefixCacheWithModel:
repeats = (1200 // len(base_tokens)) + 2
long_content = base_text * repeats
task1 = ChatCompletionTaskParams(
task1 = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content=long_content)],
max_tokens=5,
input=[InputMessage(role="user", content=long_content)],
max_output_tokens=5,
)
prompt1 = apply_chat_template(tokenizer, task1)
prompt1_tokens = encode_prompt(tokenizer, prompt1)
@@ -400,23 +397,23 @@ class TestKVPrefixCacheWithModel:
first_gen_time = time.perf_counter() - t0
assert len(kv_prefix_cache.prompts) == 1
first_cache_length = _cache_length(kv_prefix_cache.caches[0])
first_cache_length = cache_length(kv_prefix_cache.caches[0])
# Second generation: same long prompt + extra content (simulating multi-turn)
task2 = ChatCompletionTaskParams(
task2 = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[
ChatCompletionMessage(role="user", content=long_content),
ChatCompletionMessage(role="assistant", content="Sure, I can help."),
ChatCompletionMessage(role="user", content="Tell me more."),
input=[
InputMessage(role="user", content=long_content),
InputMessage(role="assistant", content="Sure, I can help."),
InputMessage(role="user", content="Tell me more."),
],
max_tokens=5,
max_output_tokens=5,
)
prompt2 = apply_chat_template(tokenizer, task2)
prompt2_tokens = encode_prompt(tokenizer, prompt2)
# Verify the prompts share a long prefix
prefix_len = _get_prefix_length(prompt2_tokens, prompt1_tokens)
prefix_len = get_prefix_length(prompt2_tokens, prompt1_tokens)
assert prefix_len > 1000, "Prompts must share > 1000 token prefix"
# Second generation should reuse the cached prefix (only prefill new tokens)
@@ -440,7 +437,7 @@ class TestKVPrefixCacheWithModel:
# With prefix_hit > 1000, should update in-place (not add a second entry)
assert len(kv_prefix_cache.prompts) == 1
# Updated cache should be longer (prompt2 + generated > prompt1 + generated)
updated_cache_length = _cache_length(kv_prefix_cache.caches[0])
updated_cache_length = cache_length(kv_prefix_cache.caches[0])
assert updated_cache_length > first_cache_length
def test_mlx_generate_stored_cache_not_mutated(self, model_and_tokenizer):
@@ -448,10 +445,10 @@ class TestKVPrefixCacheWithModel:
model, tokenizer = model_and_tokenizer
kv_prefix_cache = KVPrefixCache(tokenizer)
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="Immutable test")],
max_tokens=5,
input=[InputMessage(role="user", content="Immutable test")],
max_output_tokens=5,
)
prompt = apply_chat_template(tokenizer, task)
@@ -465,7 +462,7 @@ class TestKVPrefixCacheWithModel:
):
pass
first_cache_length = _cache_length(kv_prefix_cache.caches[0])
first_cache_length = cache_length(kv_prefix_cache.caches[0])
# Second generation gets the cache and mutates it during generation
for _response in mlx_generate(
@@ -478,7 +475,7 @@ class TestKVPrefixCacheWithModel:
pass
# The first stored cache must not have been mutated by the second generation
assert _cache_length(kv_prefix_cache.caches[0]) == first_cache_length
assert cache_length(kv_prefix_cache.caches[0]) == first_cache_length
def test_evicts_lru_entry_under_memory_pressure(self, model_and_tokenizer):
"""Under memory pressure, adding a new cache entry evicts the least recently used one."""
@@ -489,10 +486,10 @@ class TestKVPrefixCacheWithModel:
# Add three cache entries with different prompts
prompts = ["First entry", "Second entry", "Third entry"]
for i, content in enumerate(prompts):
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content=content)],
max_tokens=1,
input=[InputMessage(role="user", content=content)],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -523,10 +520,10 @@ class TestKVPrefixCacheWithModel:
),
):
# Trigger eviction by adding a new entry
task = ChatCompletionTaskParams(
task = TextGenerationTaskParams(
model=DEFAULT_GPT_OSS_MODEL_ID,
messages=[ChatCompletionMessage(role="user", content="New entry")],
max_tokens=1,
input=[InputMessage(role="user", content="New entry")],
max_output_tokens=1,
)
prompt = apply_chat_template(tokenizer, task)
tokens = encode_prompt(tokenizer, prompt)
@@ -540,6 +537,6 @@ class TestKVPrefixCacheWithModel:
assert len(kv_prefix_cache.prompts) == 1
# The surviving entry should be the newly added one
new_tokens = encode_prompt(tokenizer, prompt)
assert _get_prefix_length(kv_prefix_cache.prompts[0], new_tokens) == len(
assert get_prefix_length(kv_prefix_cache.prompts[0], new_tokens) == len(
new_tokens
)
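Note that the renamed helpers cache_length and get_prefix_length are only exercised by these tests, not defined in this diff. A minimal sketch of the behaviour the tests appear to assume (illustrative only; the real helpers in the KV prefix cache module may differ, and the .offset attribute is an assumption based on mlx-lm style per-layer caches):

    def get_prefix_length(a: list[int], b: list[int]) -> int:
        # Count how many leading tokens the two token sequences share.
        n = 0
        for x, y in zip(a, b):
            if x != y:
                break
            n += 1
        return n

    def cache_length(cache) -> int:
        # Token length of a per-layer KV cache; assumes each layer exposes
        # an `.offset` attribute (as mlx-lm style cache layers do).
        return max(layer.offset for layer in cache)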

View File

@@ -1,8 +1,8 @@
from typing import cast
import exo.worker.plan as plan_mod
from exo.shared.types.api import ChatCompletionTaskParams
from exo.shared.types.tasks import ChatCompletion, Task, TaskId, TaskStatus
from exo.shared.types.tasks import Task, TaskId, TaskStatus, TextGeneration
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import BoundInstance, InstanceId
from exo.shared.types.worker.runners import (
RunnerIdle,
@@ -29,7 +29,7 @@ from exo.worker.tests.unittests.conftest import (
def test_plan_forwards_pending_chat_completion_when_runner_ready():
"""
When there is a pending ChatCompletion for the local instance and all
When there is a pending TextGeneration for the local instance and all
runners are Ready/Running, plan() should forward that task.
"""
shard0 = get_pipeline_shard_metadata(MODEL_A_ID, device_rank=0, world_size=2)
@@ -54,12 +54,12 @@ def test_plan_forwards_pending_chat_completion_when_runner_ready():
RUNNER_2_ID: RunnerReady(),
}
task = ChatCompletion(
task = TextGeneration(
task_id=TASK_1_ID,
instance_id=INSTANCE_1_ID,
task_status=TaskStatus.Pending,
command_id=COMMAND_1_ID,
task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
task_params=TextGenerationTaskParams(model=MODEL_A_ID, input=""),
)
result = plan_mod.plan(
@@ -76,7 +76,7 @@ def test_plan_forwards_pending_chat_completion_when_runner_ready():
def test_plan_does_not_forward_chat_completion_if_any_runner_not_ready():
"""
Even with a pending ChatCompletion, plan() should not forward it unless
Even with a pending TextGeneration, plan() should not forward it unless
all runners for the instance are Ready/Running.
"""
shard1 = get_pipeline_shard_metadata(MODEL_A_ID, device_rank=0, world_size=2)
@@ -101,12 +101,12 @@ def test_plan_does_not_forward_chat_completion_if_any_runner_not_ready():
RUNNER_2_ID: RunnerIdle(),
}
task = ChatCompletion(
task = TextGeneration(
task_id=TASK_1_ID,
instance_id=INSTANCE_1_ID,
task_status=TaskStatus.Pending,
command_id=COMMAND_1_ID,
task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
task_params=TextGenerationTaskParams(model=MODEL_A_ID, input=""),
)
result = plan_mod.plan(
@@ -123,7 +123,7 @@ def test_plan_does_not_forward_chat_completion_if_any_runner_not_ready():
def test_plan_does_not_forward_tasks_for_other_instances():
"""
plan() should ignore pending ChatCompletion tasks whose instance_id does
plan() should ignore pending TextGeneration tasks whose instance_id does
not match the local instance.
"""
shard = get_pipeline_shard_metadata(model_id=MODEL_A_ID, device_rank=0)
@@ -145,12 +145,12 @@ def test_plan_does_not_forward_tasks_for_other_instances():
all_runners = {RUNNER_1_ID: RunnerReady()}
other_instance_id = InstanceId("instance-2")
foreign_task = ChatCompletion(
foreign_task = TextGeneration(
task_id=TaskId("other-task"),
instance_id=other_instance_id,
task_status=TaskStatus.Pending,
command_id=COMMAND_1_ID,
task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
task_params=TextGenerationTaskParams(model=MODEL_A_ID, input=""),
)
result = plan_mod.plan(
@@ -167,7 +167,7 @@ def test_plan_does_not_forward_tasks_for_other_instances():
def test_plan_ignores_non_pending_or_non_chat_tasks():
"""
_pending_tasks should not forward tasks that are either not ChatCompletion
_pending_tasks should not forward tasks that are either not TextGeneration
or not in Pending/Running states.
"""
shard0 = get_pipeline_shard_metadata(MODEL_A_ID, device_rank=0, world_size=2)
@@ -193,12 +193,12 @@ def test_plan_ignores_non_pending_or_non_chat_tasks():
RUNNER_2_ID: RunnerReady(),
}
completed_task = ChatCompletion(
completed_task = TextGeneration(
task_id=TASK_1_ID,
instance_id=INSTANCE_1_ID,
task_status=TaskStatus.Complete,
command_id=COMMAND_1_ID,
task_params=ChatCompletionTaskParams(model=MODEL_A_ID, messages=[]),
task_params=TextGenerationTaskParams(model=MODEL_A_ID, input=""),
)
other_task_id = TaskId("other-task")

View File

@@ -5,7 +5,6 @@ from typing import Callable
import pytest
import exo.worker.runner.runner as mlx_runner
from exo.shared.types.api import ChatCompletionMessage
from exo.shared.types.chunks import TokenChunk
from exo.shared.types.events import (
ChunkGenerated,
@@ -15,15 +14,15 @@ from exo.shared.types.events import (
TaskStatusUpdated,
)
from exo.shared.types.tasks import (
ChatCompletion,
ChatCompletionTaskParams,
ConnectToGroup,
LoadModel,
Shutdown,
StartWarmup,
Task,
TaskStatus,
TextGeneration,
)
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.runner_response import GenerationResponse
from exo.shared.types.worker.runners import (
RunnerConnected,
@@ -85,15 +84,15 @@ SHUTDOWN_TASK = Shutdown(
runner_id=RUNNER_1_ID,
)
CHAT_PARAMS = ChatCompletionTaskParams(
model=str(MODEL_A_ID),
messages=[ChatCompletionMessage(role="user", content="hello")],
CHAT_PARAMS = TextGenerationTaskParams(
model=MODEL_A_ID,
input="hello",
stream=True,
max_tokens=4,
max_output_tokens=4,
temperature=0.0,
)
CHAT_TASK = ChatCompletion(
CHAT_TASK = TextGeneration(
task_id=CHAT_COMPLETION_TASK_ID,
command_id=COMMAND_1_ID,
task_params=CHAT_PARAMS,
@@ -109,8 +108,8 @@ def assert_events_equal(test_events: Iterable[Event], true_events: Iterable[Even
@pytest.fixture
def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
# initialize_mlx returns a "group" equal to 1
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(1))
# initialize_mlx returns a mock group
monkeypatch.setattr(mlx_runner, "initialize_mlx", make_nothin(MockGroup()))
monkeypatch.setattr(mlx_runner, "load_mlx_items", make_nothin((1, MockTokenizer)))
monkeypatch.setattr(mlx_runner, "warmup_inference", make_nothin(1))
monkeypatch.setattr(mlx_runner, "_check_for_debug_prompts", nothin)
@@ -120,7 +119,7 @@ def patch_out_mlx(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(mlx_runner, "detect_thinking_prompt_suffix", make_nothin(False))
def fake_generate(*_1: object, **_2: object):
yield GenerationResponse(token=0, text="hi", finish_reason="stop")
yield GenerationResponse(token=0, text="hi", finish_reason="stop", usage=None)
monkeypatch.setattr(mlx_runner, "mlx_generate", fake_generate)
@@ -147,6 +146,14 @@ class MockTokenizer:
has_tool_calling = False
class MockGroup:
def rank(self) -> int:
return 0
def size(self) -> int:
return 1
def _run(tasks: Iterable[Task]):
bound_instance = get_bound_mlx_ring_instance(
instance_id=INSTANCE_1_ID,
@@ -182,6 +189,8 @@ def test_events_processed_in_correct_order(patch_out_mlx: pytest.MonkeyPatch):
text="hi",
token_id=0,
finish_reason="stop",
usage=None,
stats=None,
),
)

View File

@@ -0,0 +1,110 @@
"""Tests for GLM tool call argument parsing regex."""
import regex as re
# Replicate the regex patterns from runner.py to test them in isolation
_func_name_regex = re.compile(r"^(.*?)<arg_key>", re.DOTALL)
_func_arg_regex = re.compile(
r"<arg_key>(.*?)</arg_key>(?:\n|\s)*<arg_value>(.*?)(?:</arg_value>|(?=<arg_key>)|$)",
re.DOTALL,
)
def _parse_args(text: str) -> list[tuple[str, str]]:
"""Extract (key, value) pairs from GLM tool call text."""
pairs = _func_arg_regex.findall(text)
return [(k.strip(), v.strip()) for k, v in pairs] # pyright: ignore[reportAny]
def _parse_func_name(text: str) -> str:
"""Extract function name from GLM tool call text."""
match = _func_name_regex.search(text)
if match is None:
raise ValueError(f"Could not parse function name: {text!r}")
return match.group(1).strip()
class TestGlmToolParsingWithClosingTags:
"""Tests for normal format with closing tags present."""
def test_single_argument(self):
text = (
"get_weather<arg_key>location</arg_key><arg_value>San Francisco</arg_value>"
)
assert _parse_func_name(text) == "get_weather"
pairs = _parse_args(text)
assert pairs == [("location", "San Francisco")]
def test_multiple_arguments(self):
text = (
"search<arg_key>query</arg_key><arg_value>python</arg_value>"
"<arg_key>limit</arg_key><arg_value>10</arg_value>"
)
assert _parse_func_name(text) == "search"
pairs = _parse_args(text)
assert pairs == [("query", "python"), ("limit", "10")]
def test_arguments_with_whitespace_between(self):
text = (
"fn<arg_key>a</arg_key>\n<arg_value>1</arg_value>\n"
"<arg_key>b</arg_key> <arg_value>2</arg_value>"
)
pairs = _parse_args(text)
assert pairs == [("a", "1"), ("b", "2")]
class TestGlmToolParsingMissingClosingTags:
"""Tests for format where </arg_value> closing tags are missing."""
def test_single_argument_no_closing(self):
text = "get_weather<arg_key>location</arg_key><arg_value>San Francisco"
assert _parse_func_name(text) == "get_weather"
pairs = _parse_args(text)
assert pairs == [("location", "San Francisco")]
def test_multiple_arguments_no_closing(self):
text = (
"search<arg_key>query</arg_key><arg_value>python"
"<arg_key>limit</arg_key><arg_value>10"
)
assert _parse_func_name(text) == "search"
pairs = _parse_args(text)
assert pairs == [("query", "python"), ("limit", "10")]
def test_mixed_closing_tags(self):
"""First arg has closing tag, second does not."""
text = (
"fn<arg_key>a</arg_key><arg_value>1</arg_value>"
"<arg_key>b</arg_key><arg_value>2"
)
pairs = _parse_args(text)
assert pairs == [("a", "1"), ("b", "2")]
def test_value_with_trailing_whitespace(self):
text = "fn<arg_key>x</arg_key><arg_value>hello world \n"
pairs = _parse_args(text)
assert pairs == [("x", "hello world")]
def test_value_with_newlines_no_closing(self):
text = "fn<arg_key>data</arg_key><arg_value>line1\nline2"
pairs = _parse_args(text)
assert pairs == [("data", "line1\nline2")]
class TestGlmToolParsingEdgeCases:
"""Edge case tests for GLM tool call parsing."""
def test_empty_value_with_closing(self):
text = "fn<arg_key>empty</arg_key><arg_value></arg_value>"
pairs = _parse_args(text)
assert pairs == [("empty", "")]
def test_value_with_json_content(self):
text = 'fn<arg_key>data</arg_key><arg_value>{"key": "value"}</arg_value>'
pairs = _parse_args(text)
assert pairs == [("data", '{"key": "value"}')]
def test_value_with_json_no_closing(self):
text = 'fn<arg_key>data</arg_key><arg_value>{"key": "value"}'
pairs = _parse_args(text)
assert pairs == [("data", '{"key": "value"}')]
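The patterns above deliberately tolerate a missing </arg_value> by also terminating a value at the next <arg_key> or at end of input. As a hedged sketch of how the extracted name and key/value pairs could be assembled into a tool-call payload (build_tool_call is illustrative and not taken from runner.py):

    import json
    import regex as re

    # Same patterns as in the test above.
    _func_name_regex = re.compile(r"^(.*?)<arg_key>", re.DOTALL)
    _func_arg_regex = re.compile(
        r"<arg_key>(.*?)</arg_key>(?:\n|\s)*<arg_value>(.*?)(?:</arg_value>|(?=<arg_key>)|$)",
        re.DOTALL,
    )

    def build_tool_call(text: str) -> dict[str, str]:
        # Illustrative only: parse a GLM-style segment into name + JSON arguments.
        match = _func_name_regex.search(text)
        if match is None:
            raise ValueError(f"Could not parse function name: {text!r}")
        args = {k.strip(): v.strip() for k, v in _func_arg_regex.findall(text)}
        return {"name": match.group(1).strip(), "arguments": json.dumps(args)}

    # build_tool_call("get_weather<arg_key>location</arg_key><arg_value>Paris")
    # -> {"name": "get_weather", "arguments": '{"location": "Paris"}'}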

View File

@@ -0,0 +1,87 @@
"""Tests for parse_tool_calls generator, especially unclosed tool call handling."""
from collections.abc import Generator
from typing import Any
from exo.shared.types.worker.runner_response import GenerationResponse, ToolCallResponse
from exo.worker.runner.runner import parse_tool_calls
def _make_responses(
texts: list[str],
finish_on_last: bool = True,
) -> Generator[GenerationResponse | ToolCallResponse]:
"""Create a sequence of GenerationResponses from text strings."""
for i, text in enumerate(texts):
is_last = i == len(texts) - 1
yield GenerationResponse(
text=text,
token=i,
finish_reason="stop" if (is_last and finish_on_last) else None,
usage=None,
)
def _dummy_parser(text: str) -> dict[str, Any]:
return {"name": "test_fn", "arguments": {"arg": text}}
class TestParseToolCalls:
"""Tests for parse_tool_calls generator."""
def test_closed_tool_call_works_normally(self):
"""Normal tool call flow should not be affected."""
texts = ["<tool_call>", "test_fn", "</tool_call>"]
results = list(
parse_tool_calls(
_make_responses(texts, finish_on_last=False),
"<tool_call>",
"</tool_call>",
_dummy_parser,
)
)
assert len(results) == 1
assert isinstance(results[0], ToolCallResponse)
def test_no_tool_call_passes_through(self):
"""Responses without tool calls should pass through unchanged."""
texts = ["Hello", " world"]
results = list(
parse_tool_calls(
_make_responses(texts),
"<tool_call>",
"</tool_call>",
_dummy_parser,
)
)
assert len(results) == 2
assert all(isinstance(r, GenerationResponse) for r in results)
r0 = results[0]
r1 = results[1]
assert isinstance(r0, GenerationResponse)
assert isinstance(r1, GenerationResponse)
assert r0.text == "Hello"
assert r1.text == " world"
assert r1.finish_reason == "stop"
def test_failed_parse_yields_text(self):
"""When tool call parsing fails, the text should be yielded as-is."""
def _failing_parser(text: str) -> dict[str, Any]:
raise ValueError("parse failed")
texts = ["<tool_call>", "bad content", "</tool_call>"]
results = list(
parse_tool_calls(
_make_responses(texts, finish_on_last=False),
"<tool_call>",
"</tool_call>",
_failing_parser,
)
)
assert len(results) == 1
assert isinstance(results[0], GenerationResponse)
assert results[0].text == "<tool_call>bad content</tool_call>"

View File

@@ -17,18 +17,18 @@ from exo.download.impl_shard_downloader import (
)
from exo.shared.logging import InterceptLogger, logger_setup
from exo.shared.models.model_cards import MODEL_CARDS, ModelId
from exo.shared.types.api import ChatCompletionMessage, ChatCompletionTaskParams
from exo.shared.types.commands import CommandId
from exo.shared.types.common import Host, NodeId
from exo.shared.types.events import Event
from exo.shared.types.tasks import (
ChatCompletion,
ConnectToGroup,
LoadModel,
Shutdown,
StartWarmup,
Task,
TextGeneration,
)
from exo.shared.types.text_generation import TextGenerationTaskParams
from exo.shared.types.worker.instances import (
BoundInstance,
Instance,
@@ -179,17 +179,11 @@ async def execute_test(test: Tests, instance: Instance, hn: str):
case "inference":
send.send(StartWarmup(instance_id=iid))
send.send(
ChatCompletion(
task_params=ChatCompletionTaskParams(
TextGeneration(
task_params=TextGenerationTaskParams(
model=test.model_id,
messages=[
ChatCompletionMessage(
role="system", content="You are a helpful assistant"
),
ChatCompletionMessage(
role="user", content="What is the capital of France?"
),
],
instructions="You are a helpful assistant",
input="What is the capital of France?",
),
command_id=CommandId("yo"),
instance_id=iid,

View File

@@ -11,7 +11,6 @@ if [[ $# -lt 2 ]]; then
exit 1
fi
kind=$1
shift
@@ -31,14 +30,14 @@ for name in "${hostnames[@]}"; do
weaved+=("$name" "$ip")
done
devs_raw=$(printf "[\"%s\", \"%s\"], " "${weaved[@]}")
devs_raw=$(printf '["%s", "%s"], ' "${weaved[@]}")
devs="[${devs_raw%, }]"
model_ids=("qwen3-30b" "gpt-oss-120b-MXFP4-Q8" "kimi-k2-thinking")
for model_id in "${model_ids[@]}"; do
for i in "${!ips[@]}"; do
{
for i in "${!ips[@]}"; do
{
req="{
\"model_id\": \"${model_id}\",
\"devs\": ${devs},
@@ -48,9 +47,8 @@ for model_id in "${model_ids[@]}"; do
curl -sN \
-X POST "http://${ips[$i]}:52415/${kind}" \
-H "Content-Type: application/json" -d "$req" \
2>&1 | sed "s/^/\n${hostnames[$i]}@${ips[$i]}: /" || echo "curl to ${hostnames[$i]} failed" && exit 1
2>&1 | sed "s/^/\n${hostnames[$i]}@${ips[$i]}: /" || echo "curl to ${hostnames[$i]} failed" && exit 1
} &
done
wait
done
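For readers untangling the quoting fix: the printf/trim pair builds a JSON array of [hostname, ip] pairs that is then embedded in the request body. A rough Python equivalent of the string it produces (hostnames and ips are placeholder values):

    import json

    hostnames = ["node1", "node2"]      # placeholder values
    ips = ["10.0.0.1", "10.0.0.2"]      # placeholder values

    # Same shape as: devs_raw=$(printf '["%s", "%s"], ' "${weaved[@]}"); devs="[${devs_raw%, }]"
    devs = json.dumps([[name, ip] for name, ip in zip(hostnames, ips)])
    # -> [["node1", "10.0.0.1"], ["node2", "10.0.0.2"]]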

View File

@@ -0,0 +1,18 @@
{
"$schema": "https://opencode.ai/config.json",
"model": "exo/mlx-community/gpt-oss-120b-MXFP4-Q8",
"provider": {
"exo": {
"api": "http://localhost:52415/v1",
"models": {
"mlx-community/gpt-oss-120b-MXFP4-Q8": {
"name": "GPT OSS 120B",
"limit": {
"context": 32768,
"output": 8192
}
}
}
}
}
}
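This config points opencode at exo's OpenAI-compatible endpoint. As a quick smoke test of the same endpoint outside opencode (assuming a node is serving on localhost:52415 and the standard /v1/chat/completions path; adjust the model and port for your setup):

    import json
    import urllib.request

    req = urllib.request.Request(
        "http://localhost:52415/v1/chat/completions",
        data=json.dumps({
            "model": "mlx-community/gpt-oss-120b-MXFP4-Q8",
            "messages": [{"role": "user", "content": "Say hello"}],
            "max_tokens": 16,
        }).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        print(json.load(resp)["choices"][0]["message"]["content"])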

47 tmp/set_rdma_network_config.sh Executable file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env bash
set -euo pipefail
PREFS="/Library/Preferences/SystemConfiguration/preferences.plist"
# Remove bridge0 interface
ifconfig bridge0 &>/dev/null && {
ifconfig bridge0 | grep -q 'member' && {
ifconfig bridge0 | awk '/member/ {print $2}' | xargs -n1 ifconfig bridge0 deletem 2>/dev/null || true
}
ifconfig bridge0 destroy 2>/dev/null || true
}
# Remove Thunderbolt Bridge from VirtualNetworkInterfaces in preferences.plist
/usr/libexec/PlistBuddy -c "Delete :VirtualNetworkInterfaces:Bridge:bridge0" "$PREFS" 2>/dev/null || true
networksetup -listlocations | grep -q exo || {
networksetup -createlocation exo
}
networksetup -switchtolocation exo
networksetup -listallhardwareports |
awk -F': ' '/Hardware Port: / {print $2}' |
while IFS=":" read -r name; do
case "$name" in
"Ethernet Adapter"*) ;;
"Thunderbolt Bridge") ;;
"Thunderbolt "*)
networksetup -listallnetworkservices |
grep -q "EXO $name" ||
networksetup -createnetworkservice "EXO $name" "$name" 2>/dev/null ||
continue
networksetup -setdhcp "EXO $name"
;;
*)
networksetup -listallnetworkservices |
grep -q "$name" ||
networksetup -createnetworkservice "$name" "$name" 2>/dev/null ||
continue
;;
esac
done
networksetup -listallnetworkservices | grep -q "Thunderbolt Bridge" && {
networksetup -setnetworkserviceenabled "Thunderbolt Bridge" off
} || true

2174 uv.lock generated
View File

File diff suppressed because it is too large