Compare commits


33 Commits

Author SHA1 Message Date
Antonio Lujano Luna
9c29eb7d48 Add proxy and custom SSL certificate support for corporate networks (#1189)
Support HTTPS_PROXY/HTTP_PROXY environment variables for proxy
configuration and SSL_CERT_FILE for custom CA certificates, enabling use
in corporate environments with SSL inspection.

## Motivation
Users in corporate environments often need to route traffic through HTTP
proxies and use custom CA certificates for SSL inspection. Without this
support, exo cannot download models in these network configurations.

## Changes
- Added `HTTPS_PROXY`/`HTTP_PROXY` environment variable support to
`create_http_session()` in `download_utils.py`
- Added `SSL_CERT_FILE` environment variable support for custom CA
certificate bundles, falling back to certifi's default bundle

## Why It Works
- `aiohttp.ClientSession` natively supports the `proxy` parameter for
routing requests through HTTP proxies
- `ssl.create_default_context(cafile=...)` accepts a custom CA bundle
path, allowing corporate CAs to be trusted
- Using environment variables is consistent with the codebase's existing
configuration patterns (e.g., `EXO_HOME`, `HF_ENDPOINT`)
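
A minimal sketch of the resulting session construction, assuming the helper lives in `download_utils.py` (exact signature and names in the codebase may differ):

```python
import os
import ssl

import aiohttp
import certifi


def create_http_session() -> aiohttp.ClientSession:
    # Use a custom CA bundle when SSL_CERT_FILE is set, falling back to certifi's.
    cafile = os.environ.get("SSL_CERT_FILE") or certifi.where()
    ssl_context = ssl.create_default_context(cafile=cafile)
    return aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context))


def proxy_url() -> str | None:
    # HTTPS_PROXY takes precedence over HTTP_PROXY; None disables proxying.
    return os.environ.get("HTTPS_PROXY") or os.environ.get("HTTP_PROXY")
```

Each request would then pass the proxy per call, e.g. `session.get(url, proxy=proxy_url())`.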

## Test Plan
### Manual Testing
- Set `HTTPS_PROXY` environment variable and verified model downloads
route through proxy
- Set `SSL_CERT_FILE` to custom CA bundle and verified SSL verification
succeeds with corporate SSL inspection

### Automated Testing
- No automated tests added; this change is configuration-only and does
not alter existing behavior when environment variables are unset
2026-01-18 12:05:50 +00:00
Alex Cheema
c5158bee53 Add pre-commit checks documentation to AGENTS.md (#1184)
## Motivation

CI failures can be avoided by running checks locally before committing.
This adds clear documentation to AGENTS.md so that AI agents (and
humans) know exactly which checks must pass before pushing code.

## Changes

Added a new "Pre-Commit Checks (REQUIRED)" section to AGENTS.md that:
- Lists all 4 required checks (basedpyright, ruff, nix fmt, pytest)
- Provides a one-liner to run all checks in sequence
- Notes that `nix fmt` changes must be staged before committing
- Explains that CI runs `nix flake check` which verifies everything

## Why It Works

Clear documentation prevents CI failures by ensuring contributors run
checks locally first. The one-liner command makes it easy to run all
checks before committing.

## Test Plan

### Manual Testing
- Verified the documented commands work correctly

### Automated Testing
- N/A - documentation only change

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 21:50:24 +00:00
rltakashige
5c8a237940 Handle model timeouts (#1177)
- Add eval with a timeout.
- Add fast synch flag

## Motivation

Because of the experimental fast synch flag, some models may not work.
This PR catches when this occurs and lets users run without fast synch.

## Changes

- Adds a flag to enable or disable fast synch (--fast-synch and
--no-fast-synch)
- Adds a heuristic timeout
- Reduces exo_bench default timeout to 10 minutes.

## Why It Works

The heuristic timeout assumes normal loading times on Mac devices: 60 +
(model size in GB) / 5 seconds. For example, DeepSeek takes up to 120
seconds to load with tensor parallelism, so its timeout is 60 + 120 = 180s.

We could raise this value if necessary.
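
A sketch of that heuristic in code (function name hypothetical):

```python
def load_timeout_seconds(model_size_gb: float) -> float:
    # 60s base plus one second per 5 GB of weights; a model that takes
    # ~120s to load (e.g. DeepSeek on tensor parallel) gets 60 + 120 = 180s.
    return 60.0 + model_size_gb / 5.0
```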

## Test Plan

### Manual Testing
Verified that the timeout catches GPT OSS failing to load with tensor RDMA,
and that launching with the --no-fast-synch flag lets GPT OSS run.

**GPT OSS 20B**
TP with fast synch
<img width="3064" height="456" alt="image"
src="https://github.com/user-attachments/assets/f6e25cd8-8621-4e99-99fe-292ee05c4035"
/>

TP without fast synch
<img width="3098" height="496" alt="image"
src="https://github.com/user-attachments/assets/d36453d9-6686-4cfe-aa7c-a7d458369d4d"
/>
[Note: the performance is really not great as fast synch is off]

(As a sanity check)
PP with fast synch
<img width="3124" height="496" alt="image"
src="https://github.com/user-attachments/assets/e97d4547-c6fa-483d-badb-4b371b900b4c"
/>

PP without fast synch
<img width="3078" height="508" alt="image"
src="https://github.com/user-attachments/assets/b2e20dfd-4b0e-4295-8a92-417dfe745c28"
/>

PP without RDMA
<img width="3070" height="498" alt="image"
src="https://github.com/user-attachments/assets/a8509d68-0aef-4cda-bca5-a67d39a0801e"
/>

TP without RDMA
<img width="3068" height="496" alt="image"
src="https://github.com/user-attachments/assets/b5691429-89f4-4369-bcf2-8fde2ad7154a"
/>
2026-01-16 20:25:12 +00:00
rltakashige
745343c705 Return error responses for Chat Completions (#1173)
- Error chunks
- Use error handling in exo_bench.py

## Motivation

Return an error when one occurs so that generation stops. Adding timeouts
for model loading and chat completions is a separate TODO.

## Changes

- Return HTTP exceptions as JSON responses in an OpenAI compatible
format.
- Context manager for generation to catch and return error messages.
- Use error handling in exo_bench.py.
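
A minimal sketch of the OpenAI-compatible error shape, assuming a FastAPI app (handler name and field values are illustrative):

```python
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()


@app.exception_handler(HTTPException)
async def openai_style_errors(request: Request, exc: HTTPException) -> JSONResponse:
    # Wrap HTTP exceptions in the {"error": {...}} envelope OpenAI clients expect.
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "message": str(exc.detail),
                "type": "api_error",
                "code": exc.status_code,
            }
        },
    )
```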

## Test Plan

### Manual Testing
Manually tested that exo_bench returns on failures within and outside
generation

2026-01-16 19:24:37 +00:00
Alex Cheema
5e28664c41 Fix draft release detection (attempt 3) (#1176)
## Motivation

The previous fix still failed in CI. We suspect a permissions issue where
GITHUB_TOKEN cannot see draft releases via the API.

## Changes

1. Add explicit `permissions: contents: write` to the job
2. Use `gh release list` first to check if draft exists (this uses a
different code path that might work better)
3. Add debug echo statements

## Test Plan

Delete v1.0.63 tag and re-push after merging.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 17:26:06 +00:00
Alex Cheema
ae0a804ccb Fix draft release detection query (#1175)
## Motivation

Fixes the draft release detection that failed on the v1.0.63 release
attempt.

## Changes

The jq query was piped to `head -1` which truncated multi-line JSON
output to just `{`, causing the empty check to fail.

Changed to use `first // empty` in jq instead.

## Test Plan

Tested locally:
```bash
GITHUB_REF_NAME="v1.0.63"
gh api repos/exo-explore/exo/releases --jq "[.[] | select(.draft == true) | select(.name == \"$GITHUB_REF_NAME\")] | first // empty"
# Returns the full draft release JSON (2711 chars)
```

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 17:05:24 +00:00
Alex Cheema
07cf2c1aa1 Add GitHub releases with Sparkle release notes integration (#1172)
## Motivation

Closes #1140

Currently releases are uploaded to S3 for Sparkle updates but there's no
GitHub Release created, and Sparkle update dialogs don't show release
notes. Users have no visibility into what changed.

## Changes

- Added release workflow documentation comment at top of `build-app.yml`
- Added "Fetch release notes for Sparkle" step that converts markdown
from draft GitHub release to HTML
- Added "Inject release notes into appcast" step that embeds HTML in
appcast.xml with CDATA
- Added "Publish GitHub Release" step that attaches DMG and publishes
the draft

## Why It Works

- Sparkle's `<description>` tag supports HTML wrapped in CDATA for
rendering in update dialogs
- GitHub's markdown API (`/markdown`) converts the release notes to HTML
with proper formatting
- Draft releases allow writing polished notes before the build, then the
workflow publishes them automatically
- The workflow fails if no draft release exists, ensuring release notes
are always provided

## Test Plan

### Manual Testing
1. Create a draft GitHub release for a new tag with markdown release
notes
2. Push the tag to trigger the workflow
3. Verify the GitHub release is published with DMG attached
4. Download appcast.xml from S3 and verify
`<description><![CDATA[...]]></description>` contains HTML
5. Test Sparkle update dialog on macOS to confirm release notes appear

### Automated Testing
No automated tests added - this is CI workflow configuration.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 16:47:33 +00:00
Evan
83c5285a80 reduce logs
the previous commits' logs were too verbose; this tones them down a bit
2026-01-16 14:05:47 +00:00
Evan Quiney
39ee2bf7bd switch from synchronous threaded pinging to an async implementation (#1170)
still seeing churn in our networking - let's properly rate limit it

## changes

added a persistent httpx AsyncClient with a cap on max connections
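
a minimal sketch of the pattern (limit and timeout values illustrative):

```python
import httpx

# One persistent client shared by all pings; the connection pool limits
# act as the rate limiter instead of unbounded per-ping threads.
client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=10),
    timeout=httpx.Timeout(3.0),
)


async def ping(url: str) -> bool:
    try:
        response = await client.get(url)
        return response.is_success
    except httpx.HTTPError:
        return False
```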

## testing

deployed on cluster, discovery VASTLY more stable (the only deleted
edges were those discovered by mdns)
2026-01-16 13:20:03 +00:00
Sami Khan
991adfbd6f fix local network warning (#1136)
## Motivation

Local network warning banner was showing on fresh install even though
mDNS was working. The check would fail before the user had a chance to
grant permission via the macOS prompt.

## Changes

- Added `hasWorkedBefore` flag persisted in UserDefaults
- Only show warning if permission previously worked but now doesn't

## Why It Works

On fresh install, the check may fail (no permission yet), but
`hasWorkedBefore` is false so no warning shows. Once the user grants
permission and a check succeeds, we record it. Future failures (zombie
permission after restart) will show the warning since `hasWorkedBefore`
is now true.

## Test Plan

### Manual Testing
Run locally

### Automated Testing
N/A
2026-01-16 13:10:50 +00:00
rltakashige
4b3de6b984 Fix exo bench for transformers 5.x (#1168)
## Motivation
The prompt sizer was broken: transformers 5.x tokenizers return
`BatchEncoding` objects, essentially a dictionary of `{"input_ids": [...]}`,
instead of a plain list of input ids.
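
The fix, visible in the exo_bench diff further down, boils down to a small shim:

```python
ids = tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True)
# transformers 5.x returns a BatchEncoding rather than a plain list of ids.
if hasattr(ids, "input_ids"):
    ids = ids.input_ids
prompt_tokens = len(ids)
```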

## Test Plan

### Manual Testing
Tested that exo bench runs as expected.

2026-01-16 12:39:22 +00:00
Evan
c8de3b90ea quiet rust logs
rust logs were too verbose - now only warnings propagate to python

entirely happy not to merge this and to clean up rust logging instead,
but this felt saner right now
2026-01-16 12:34:28 +00:00
Sami Khan
6e6567a802 resolve issue #1070 (#1076)
## Motivation

https://github.com/exo-explore/exo/issues/1070

## Changes

Added check in ChatForm.svelte to reset selectedChatModel when it no
longer matches any running instance.

## Why It Works

The $effect now detects when the selected model is stale (not in
availableModels()) and resets to the first available model.

## Test Plan

### Manual Testing

1. Create instance of Model A → Delete it → Create instance of Model B →
Chat
2. Verify request goes to Model B (not Model A)

---------

Co-authored-by: Alex Cheema <41707476+AlexCheema@users.noreply.github.com>
2026-01-15 20:00:41 +00:00
rltakashige
a735dad667 Parse GPT OSS in runner (#1160)
## Motivation

Simplification of API + moving model specific code to the runner


## Test Plan

### Manual Testing
Tested that GPT OSS outputs are parsed correctly on the dashboard.

2026-01-15 19:53:55 +00:00
rltakashige
aaf4e36bc3 FIX GPT OSS (#1165)
## Motivation

Adds several unmerged fixes for GPT OSS.
Also switches GPT OSS 20B MXFP4 to Q8 instead of Q4 for numerical stability
(Q4 is unstable for MLX LM too).

## Test Plan

### Manual Testing
Manually tested. No further gibberish responses.

### Automated Testing
Ran EXO Bench - pipeline, tensor and single node work on both 20B and
120B models
2026-01-15 19:20:17 +00:00
Evan Quiney
3e623ccf0d up http timeout to 3 seconds and retry on BadStatusLine (#1164)
we're seeing a lot of network churn - perhaps this is a connection timeout
issue? Let's also retry after a second.
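
A sketch of the described behavior using the standard library (names hypothetical):

```python
import http.client
import time


def request_with_retry(host: str, path: str, retries: int = 1) -> http.client.HTTPResponse:
    # Retry once after a second on BadStatusLine, which we suspect indicates
    # a connection that timed out server-side; the timeout is now 3 seconds.
    for attempt in range(retries + 1):
        connection = http.client.HTTPConnection(host, timeout=3.0)
        try:
            connection.request("GET", path)
            return connection.getresponse()
        except http.client.BadStatusLine:
            if attempt == retries:
                raise
            time.sleep(1.0)
    raise AssertionError("unreachable")
```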

## testing
none yet

---------

Co-authored-by: Alex Cheema <alexcheema123@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-15 18:15:12 +00:00
Evan Quiney
c22dad8a7d dashboard: add peer: true to package lock (#1162)
this happens every time I run npm install - let's upstream it

## testing
dashboard builds and renders
2026-01-15 17:01:43 +00:00
Evan
4bc4d50685 rust: remove dead code
the system custodian has been made unnecessary with the swift app - we
can remove it

## testing
everything still builds
2026-01-15 16:51:46 +00:00
Jake Hillion
e0aab46fd8 model_cards.py: clean up commented out code
Clean up the commented-out code and make sure the comments are unified.
Carrying around the commented-out code means people making changes to
model_cards are supposed to update it, but that's not clear and won't be
picked up by type checking etc. Drop it for now - it's in the git
history.

Also make the rest of the comments a bit more uniform, and place
comments about a specific model card inside the model card (instead of
above) so they don't get lost when code is added/moved around.

Test plan:
- my eyes
2026-01-15 13:21:58 +00:00
Evan Quiney
82ba42bae9 add glm-47, minimax-m21 (#1147)
Adds support for GLM 4.7 and MiniMax M2.1.

Manual testing:
Tensor + Pipeline execution of both models.

Closes #1141 and #1142
2026-01-14 16:33:17 +00:00
Jake Hillion
3671528fa4 nix: add dashboard build with dream2nix
Continue working towards a fully Nix based build by building the
dashboard with Nix. Continuing the theme of using the existing lock
files, use dream2nix to parse the lock file and build the tree of
dependency derivations.

dream2nix doesn't like the bundleDependencies, so we apply a small patch
to the lock file that drops all dependencies that are bundled. This
should ideally be contributed upstream but that can be done later.

Use this new dashboard build in the build-app CI workflow, meaning
future macOS apps will include this reproducible dashboard.

Test plan:
- Built a DMG, shipped to a cluster, loaded in a browser with no cache
  and the dashboard looks good.

- Directory layout is as expected:
```
$ nix build .#dashboard
$ find result/
...
result/_app/immutable/entry
result/_app/immutable/entry/app.CTPAnMjf.js
result/_app/immutable/entry/start.fUSEa-2O.js
result/_app/immutable/nodes
result/_app/immutable/nodes/3.DqQr1Obm.js
result/_app/immutable/nodes/0.DgEY44RO.js
result/_app/immutable/nodes/2.BjZg_lJh.js
result/_app/immutable/nodes/1.D6vGUYYT.js
result/_app/env.js
result/_app/version.json
result/exo-logo.png
result/favicon.ico
result/index.html
```
2026-01-14 15:58:16 +01:00
Jake Hillion
e6434ec446 nix: add Rust builds with crane and fenix
The Rust workspace lacked Nix build support, making it difficult to
build packages reproducibly or run checks in CI.

Added a flake-parts module at rust/parts.nix that uses crane for Rust
builds and fenix for the nightly toolchain. The source filter isolates
rust/ and root Cargo files to prevent Python/docs changes from
triggering Rust rebuilds. Exports packages (system_custodian,
exo_pyo3_bindings wheel, exo-rust-workspace) and checks (cargo-nextest,
cargo-doc) for all three target platforms.

The devShell now uses inputsFrom to inherit build dependencies from the
workspace package, removing the need for manual pkg-config/openssl setup.

Test plan:
- Ran `nix flake check` successfully
- Built `nix build ".#checks.x86_64-linux.cargo-nextest"` and tests pass
- Built `nix build ".#exo_pyo3_bindings"` and wheel is produced
2026-01-14 11:52:29 +00:00
Jake Hillion
bdb43e1dbb nix: drop noisy echos from devshell
Drop all the printing when entering a devshell. It's annoying, and not a
super accurate description of how to develop exo anyway.
2026-01-14 10:04:57 +00:00
Jake Hillion
e4a01e2b0e chore(deps): nix lock file maintenance
Update nix flake inputs. Add a second input because Swift is currently
broken in nixpkgs on Linux for `swift-format`, and we want `nix fmt` to
remain reproducible everywhere.
2026-01-13 19:57:14 +01:00
Evan Quiney
1200a7db64 Add tensor sharding for GPT-OSS (#1144)
## Motivation

GPT OSS did not previously support tensor sharding

## Changes

Add GPT sharding support in tensor_auto_parallel.
Code is mostly @rltakashige's

## Test Plan

### Manual Testing
Tested GPT-OSS. MLX fast sync causes issues with tensor RDMA; this is a general problem at the moment.
2026-01-13 17:25:52 +00:00
Evan Quiney
47ceb54bc1 up the rlimit (#1148)
Fixes #1117 
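
A minimal sketch of what raising the limit typically looks like (the actual change may differ):

```python
import resource

# Raise the open-file soft limit to the hard maximum so that many
# concurrent instances don't exhaust file descriptors.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```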

Manual testing:
Launched 100 instances. worked. yay.
2026-01-13 15:00:54 +00:00
Jake Hillion
f8112fdf25 nix: convert to flake-parts
Preparing to add a flake-parts module for Rust builds. The flake-utils
library doesn't support the module system needed for cleanly separating
the Rust build configuration.

Converted from flake-utils to flake-parts, switching to the treefmt-nix
flakeModule import pattern. The devShell and formatter outputs remain
functionally equivalent.

Test plan:
- Ran `nix flake check` successfully
- Verified `nix develop` provides the same environment
2026-01-13 15:06:44 +01:00
Alex Cheema
e388f59480 docs: add AGENTS.md for AI coding agents guidance (#1132)
## Motivation

Add documentation to help AI coding agents (Claude Code, Cursor, GitHub
Copilot, etc.) understand the exo codebase and contribute effectively.

## Changes

- Add `AGENTS.md` with guidance for AI agents working on the codebase
- Add symlink `CLAUDE.md -> AGENTS.md` for backwards compatibility with
Claude Code

## Why It Works

`AGENTS.md` is becoming a standard convention for AI agent instructions.
The symlink ensures Claude Code (which looks for `CLAUDE.md`) continues
to work while supporting the broader `AGENTS.md` convention.

## Test Plan

### Manual Testing
- Verified symlink works correctly

### Automated Testing
- N/A (documentation only)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 13:05:47 +00:00
Alex Cheema
e5e74e1eef Upgrade mlx-lm to 0.30.2 with transformers 5.x compatibility (#1125)
## Motivation

Upgrade mlx-lm to version 0.30.2 which requires transformers 5.0.0rc2 as
a prerelease dependency. This enables support for newer models like Kimi
K2 Thinking while maintaining compatibility with existing models.

The transformers 5.x release includes breaking changes that affect
custom tokenizers like Kimi's TikTokenTokenizer, requiring compatibility
fixes.

## Changes

### Core Changes
- **mlx-lm upgrade**: Bump to 0.30.2 with locked exact versions for
mlx/mlx-lm to prevent breaking changes
- **transformers 5.x compatibility**: Enable prerelease transformers
dependency

### Kimi K2 Tokenizer Fixes
- Add `bytes_to_unicode` monkey-patch to restore function moved in
transformers 5.0.0rc2
- Load `TikTokenTokenizer` directly instead of via `AutoTokenizer` to
bypass transformers 5.x bug with `auto_map` fallback
- Patch `encode()` to use tiktoken directly with `allowed_special="all"`
to handle special tokens from chat templates

### Other Changes
- Dashboard: Show disk usage for completed model downloads
- CI: Add `workflow_dispatch` trigger to build-app workflow
- Docs: Add basic API documentation

### Testing
- Add comprehensive tokenizer unit tests for all supported models
- Tests verify encode/decode, special token handling, and chat template
encoding

## Why It Works

**bytes_to_unicode issue**: transformers 5.0.0rc2 moved
`bytes_to_unicode` from `transformers.models.gpt2.tokenization_gpt2` to
`transformers.convert_slow_tokenizer`. Kimi's `tokenization_kimi.py`
imports from the old location. The monkey-patch restores it at module
load time.
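
A sketch of such a monkey-patch, using the module paths named above:

```python
import transformers.models.gpt2.tokenization_gpt2 as gpt2_tokenization

# Restore bytes_to_unicode at its pre-5.x location so Kimi's
# tokenization_kimi.py can keep importing it from there.
if not hasattr(gpt2_tokenization, "bytes_to_unicode"):
    from transformers.convert_slow_tokenizer import bytes_to_unicode

    gpt2_tokenization.bytes_to_unicode = bytes_to_unicode
```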

**AutoTokenizer issue**: transformers 5.x has a bug where
`tokenizer_class_from_name('TikTokenTokenizer')` returns `None` for
custom tokenizers with `auto_map`. Loading the tokenizer directly
bypasses this.

**encode() issue**: transformers 5.x's `pad()` method fails for slow
tokenizers. Using tiktoken's encode directly with
`allowed_special="all"` avoids this path and properly handles special
tokens like `<|im_user|>` from chat templates.

## Test Plan

### Manual Testing
- Hardware: 2x Mac Studios connected via Thunderbolt 5 (mike22 and
james21)
- Tested Kimi K2 Thinking, GPT-OSS-120B, GPT-OSS-20B, Llama-3.1-8B-bf16, and
qwen3-30B-A3B-8bit models with pipeline parallelism across both nodes
- Verified warmup inference completes successfully
- Verified chat completions work with special tokens

### Automated Testing
- Added `test_tokenizers.py` with 31 tests covering:
  - Basic encode/decode for all model families (deepseek, kimi, llama, qwen, gpt-oss, glm)
  - Special token encoding (critical for chat templates)
  - Chat template application and encoding
  - Kimi-specific and GLM-specific edge cases
- All tests pass: `uv run pytest
src/exo/worker/tests/unittests/test_mlx/test_tokenizers.py`

### Failing Tests
RDMA with all models.

---------

Co-authored-by: Evan <evanev7@gmail.com>
2026-01-13 12:06:04 +00:00
Jake Hillion
b968d6f0a0 ci: remove old commented out job 2026-01-13 12:42:04 +01:00
Jake Hillion
3bfffd9b4f ci: build all Nix outputs on all platforms and push to cachix
The CI was only running `nix flake check` on ubuntu-latest, missing
builds for other platforms and not caching packages or devShells.

Added a matrix-based `nix-build` job that runs on macos-26 (aarch64-darwin),
ubuntu-latest (x86_64-linux), and ubuntu-24.04-arm (aarch64-linux). Each
job enumerates all packages and devShells via `nix flake show --json`,
builds them in a single `nix build` call for parallelization, then runs
`nix flake check`. The cachix-action pushes all built outputs automatically.

This ensures all Nix outputs are built and cached for every supported
platform, speeding up local development and CI runs.

Test plan:
- Tested jq enumeration command locally, correctly outputs devShell paths
- Verified xargs pipeline works with the enumerated outputs
2026-01-13 12:37:12 +01:00
Jake Hillion
007eb80029 nix: enable cachix
Enable cachix and push to it in the pipeline.yml workflow. This won't
cache a huge amount yet but will automatically extend our caching as we
build more of the repo with Nix in CI. It can also be used by local
users by accepting our cache to improve the speed of local builds.

Test plan:
- CI
2026-01-12 17:24:59 +01:00
Jake Hillion
8d7b6789b3 dashboard: show disk usage for completed models
The downloads dashboard showed "Completed" for finished model downloads
but provided no indication of how much disk space each model or the
total models on a node were using.

Added total_bytes field to DownloadCompleted type so the size is
preserved when a download completes. Updated the dashboard to display
the model size next to "Completed" status (e.g., "Completed (251.1GB)")
and a total disk usage line below the model count for each node (e.g.,
"502.2GB on disk").

Test plan:
- Ran unit tests for download apply and planning logic
- Type checked all modified files with basedpyright
2026-01-12 16:34:29 +01:00
113 changed files with 2933 additions and 25573 deletions

View File

@@ -1,5 +1,16 @@
name: Build EXO macOS DMG
# Release workflow:
# 1. Create a draft GitHub Release with the tag name (e.g. v1.0.0) and write release notes in markdown
# 2. Push the tag: git tag v1.0.0 && git push origin v1.0.0
# 3. This workflow builds, signs, and notarizes the DMG
# 4. Release notes are embedded in appcast.xml for Sparkle (rendered as markdown)
# 5. DMG and appcast.xml are uploaded to S3
# 6. The draft GitHub Release is published with the DMG attached
#
# For alpha releases (e.g. v1.0.0-alpha.1): draft release and notes are optional.
# If no draft exists, a release is auto-created with generated notes.
on:
workflow_dispatch:
push:
@@ -11,8 +22,10 @@ on:
jobs:
build-macos-app:
runs-on: "macos-26"
permissions:
contents: write
env:
SPARKLE_VERSION: 2.8.1
SPARKLE_VERSION: 2.9.0-beta.1
SPARKLE_DOWNLOAD_PREFIX: ${{ secrets.SPARKLE_DOWNLOAD_PREFIX }}
SPARKLE_FEED_URL: ${{ secrets.SPARKLE_FEED_URL }}
SPARKLE_ED25519_PUBLIC: ${{ secrets.SPARKLE_ED25519_PUBLIC }}
@@ -87,6 +100,52 @@ jobs:
exit 1
fi
- name: Fetch and validate release notes
if: github.ref_type == 'tag'
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Find draft release by name using gh release list (more reliable with default token)
echo "Looking for draft release named '$GITHUB_REF_NAME'..."
DRAFT_EXISTS=$(gh release list --json name,isDraft --jq ".[] | select(.isDraft == true) | select(.name == \"$GITHUB_REF_NAME\") | .name" 2>/dev/null || echo "")
if [[ -z "$DRAFT_EXISTS" ]]; then
if [[ "$IS_ALPHA" == "true" ]]; then
echo "No draft release found for alpha tag $GITHUB_REF_NAME (optional for alphas)"
echo "HAS_RELEASE_NOTES=false" >> $GITHUB_ENV
exit 0
fi
echo "ERROR: No draft release found for tag $GITHUB_REF_NAME"
echo "Please create a draft release with release notes before pushing the tag."
exit 1
fi
# Fetch full release details via API to get body and ID
echo "Found draft release, fetching details..."
RELEASE_JSON=$(gh api repos/${{ github.repository }}/releases --jq ".[] | select(.draft == true) | select(.name == \"$GITHUB_REF_NAME\")" 2>/dev/null || echo "")
# Extract release notes
NOTES=$(echo "$RELEASE_JSON" | jq -r '.body // ""')
if [[ -z "$NOTES" || "$NOTES" == "null" ]]; then
if [[ "$IS_ALPHA" == "true" ]]; then
echo "Draft release has no notes (optional for alphas)"
echo "HAS_RELEASE_NOTES=false" >> $GITHUB_ENV
exit 0
fi
echo "ERROR: Draft release exists but has no release notes"
echo "Please add release notes to the draft release before pushing the tag."
exit 1
fi
# Save release ID for later publishing
RELEASE_ID=$(echo "$RELEASE_JSON" | jq -r '.id')
echo "DRAFT_RELEASE_ID=$RELEASE_ID" >> $GITHUB_ENV
echo "HAS_RELEASE_NOTES=true" >> $GITHUB_ENV
echo "Found draft release (ID: $RELEASE_ID), saving release notes..."
echo "$NOTES" > /tmp/release_notes.md
echo "RELEASE_NOTES_FILE=/tmp/release_notes.md" >> $GITHUB_ENV
# ============================================================
# Install dependencies
# ============================================================
@@ -113,11 +172,22 @@ jobs:
uv python install
uv sync --locked
- name: Install Nix
uses: cachix/install-nix-action@v31
with:
nix_path: nixpkgs=channel:nixos-unstable
- name: Configure Cachix
uses: cachix/cachix-action@v14
with:
name: exo
authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}"
- name: Build dashboard
run: |
cd dashboard
npm ci
npm run build
DASHBOARD_OUT=$(nix build .#dashboard --print-build-logs --no-link --print-out-paths)
mkdir -p dashboard/build
cp -r "$DASHBOARD_OUT"/* dashboard/build/
- name: Install Sparkle CLI
run: |
@@ -293,6 +363,28 @@ jobs:
$CHANNEL_FLAG \
.
- name: Inject release notes into appcast
if: github.ref_type == 'tag' && env.HAS_RELEASE_NOTES == 'true'
env:
RELEASE_VERSION: ${{ env.RELEASE_VERSION }}
run: |
# Inject markdown release notes with sparkle:format="markdown" (Sparkle 2.9+)
export NOTES=$(cat "$RELEASE_NOTES_FILE")
# Insert description after the enclosure tag for this version
awk '
/<enclosure[^>]*>/ && index($0, ENVIRON["RELEASE_VERSION"]) {
print
print " <description sparkle:format=\"markdown\"><![CDATA["
print ENVIRON["NOTES"]
print " ]]></description>"
next
}
{ print }
' output/appcast.xml > output/appcast.xml.tmp && mv output/appcast.xml.tmp output/appcast.xml
echo "Injected markdown release notes for version $RELEASE_VERSION"
# ============================================================
# Upload artifacts
# ============================================================
@@ -325,3 +417,26 @@ jobs:
aws s3 cp "$DMG_NAME" "s3://${SPARKLE_S3_BUCKET}/${PREFIX}EXO-latest.dmg"
aws s3 cp appcast.xml "s3://${SPARKLE_S3_BUCKET}/${PREFIX}appcast.xml" --content-type application/xml --cache-control no-cache
fi
- name: Publish GitHub Release
if: github.ref_type == 'tag'
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
DMG_PATH="output/EXO-${RELEASE_VERSION}.dmg"
if [[ "$HAS_RELEASE_NOTES" == "true" ]]; then
# Update the draft release with the tag and upload DMG
gh api --method PATCH "repos/${{ github.repository }}/releases/$DRAFT_RELEASE_ID" \
-f tag_name="$GITHUB_REF_NAME" \
-F draft=false
gh release upload "$GITHUB_REF_NAME" "$DMG_PATH" --clobber
echo "Published release $GITHUB_REF_NAME with DMG attached"
else
# Alpha without draft release - create one with auto-generated notes
gh release create "$GITHUB_REF_NAME" "$DMG_PATH" \
--title "$GITHUB_REF_NAME" \
--generate-notes \
--prerelease
echo "Created alpha release $GITHUB_REF_NAME with auto-generated notes"
fi

View File

@@ -20,6 +20,12 @@ jobs:
with:
nix_path: nixpkgs=channel:nixos-unstable
- uses: cachix/cachix-action@v14
name: Configure Cachix
with:
name: exo
authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}"
- name: Configure git user
run: |
git config --local user.email "github-actions@users.noreply.github.com"
@@ -88,9 +94,19 @@ jobs:
- uses: ./.github/actions/typecheck
nix-flake-check:
name: Check Nix flake
runs-on: ubuntu-latest
nix:
name: Build and check (${{ matrix.system }})
runs-on: ${{ matrix.runner }}
strategy:
fail-fast: false
matrix:
include:
- runner: macos-26
system: aarch64-darwin
- runner: ubuntu-latest
system: x86_64-linux
- runner: ubuntu-24.04-arm
system: aarch64-linux
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -101,83 +117,20 @@ jobs:
with:
nix_path: nixpkgs=channel:nixos-unstable
- name: Run nix flake check
run: |
nix flake check
shell: bash
- uses: cachix/cachix-action@v14
name: Configure Cachix
with:
name: exo
authToken: "${{ secrets.CACHIX_AUTH_TOKEN }}"
# ci:
# needs: typecheck
# runs-on: ubuntu-latest
# permissions:
# contents: read
# env:
# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# steps:
# - name: Checkout repository
# uses: actions/checkout@v4
# with:
# fetch-depth: 0
# token: ${{ secrets.GITHUB_TOKEN }}
# lfs: true
#
# - name: Configure git user
# run: |
# git config --local user.email "github-actions@users.noreply.github.com"
# git config --local user.name "github-actions bot"
# shell: bash
#
# - name: Pull LFS files
# run: |
# echo "Pulling Git LFS files..."
# git lfs pull
# shell: bash
#
# - name: Setup EXO_HOME and API_PORT
# run: |
# EXO_HOME=$(mktemp -d -t exo-ci-XXXXXXXX)
# # Generate random port (macOS compatible method)
# API_PORT=$((49152 + RANDOM % (65535 - 49152 + 1)))
# echo "EXO_HOME=$EXO_HOME" >> $GITHUB_ENV
# echo "API_PORT=$API_PORT" >> $GITHUB_ENV
# echo "Created EXO_HOME: $EXO_HOME"
# echo "Generated API_PORT: $API_PORT"
# shell: bash
#
# - name: Setup Nix Environment
# run: |
# echo "Checking for nix installation..."
#
# # Check if nix binary exists directly
# if [ -f /nix/var/nix/profiles/default/bin/nix ]; then
# echo "Found nix binary at /nix/var/nix/profiles/default/bin/nix"
# export PATH="/nix/var/nix/profiles/default/bin:$PATH"
# echo "PATH=$PATH" >> $GITHUB_ENV
# nix --version
# elif [ -f /nix/var/nix/profiles/default/etc/profile.d/nix-daemon.sh ]; then
# echo "Found nix profile script, sourcing..."
# source /nix/var/nix/profiles/default/etc/profile.d/nix-daemon.sh
# nix --version
# elif command -v nix >/dev/null 2>&1; then
# echo "Nix already in PATH"
# nix --version
# else
# echo "Nix not found. Debugging info:"
# echo "Contents of /nix/var/nix/profiles/default/:"
# ls -la /nix/var/nix/profiles/default/ 2>/dev/null || echo "Directory not found"
# echo "Contents of /nix/var/nix/profiles/default/bin/:"
# ls -la /nix/var/nix/profiles/default/bin/ 2>/dev/null || echo "Directory not found"
# exit 1
# fi
# shell: bash
#
# - uses: ./.github/actions/lint-check
#
# - uses: ./.github/actions/unit-test
#
# - name: Cleanup EXO_HOME
# run: |
# echo "Cleaning up EXO_HOME: $EXO_HOME"
# rm -rf "$EXO_HOME"
# shell: bash
# if: always()
- name: Build all Nix outputs
run: |
nix flake show --json | jq -r '
[
(.packages."${{ matrix.system }}" // {} | keys[] | ".#packages.${{ matrix.system }}.\(.)"),
(.devShells."${{ matrix.system }}" // {} | keys[] | ".#devShells.${{ matrix.system }}.\(.)")
] | .[]
' | xargs nix build
- name: Run nix flake check
run: nix flake check

View File

@@ -0,0 +1,156 @@
"""Type stubs for mlx_lm.models.deepseek_v3"""
from dataclasses import dataclass
from typing import Any, Dict, Optional
import mlx.core as mx
import mlx.nn as nn
from .base import BaseModelArgs
from .switch_layers import SwitchGLU
@dataclass
class ModelArgs(BaseModelArgs):
model_type: str
vocab_size: int
hidden_size: int
intermediate_size: int
moe_intermediate_size: int
num_hidden_layers: int
num_attention_heads: int
num_key_value_heads: int
n_shared_experts: Optional[int]
n_routed_experts: Optional[int]
routed_scaling_factor: float
kv_lora_rank: int
q_lora_rank: Optional[int]
qk_rope_head_dim: int
v_head_dim: int
qk_nope_head_dim: int
topk_method: str
scoring_func: str
norm_topk_prob: bool
n_group: int
topk_group: int
num_experts_per_tok: int
moe_layer_freq: int
first_k_dense_replace: int
max_position_embeddings: int
rms_norm_eps: float
rope_theta: float
rope_scaling: Optional[Dict[str, Any]]
attention_bias: bool
class DeepseekV3Attention(nn.Module):
config: ModelArgs
hidden_size: int
num_heads: int
max_position_embeddings: int
rope_theta: float
q_lora_rank: Optional[int]
qk_rope_head_dim: int
kv_lora_rank: int
v_head_dim: int
qk_nope_head_dim: int
q_head_dim: int
scale: float
q_proj: nn.Linear
q_a_proj: nn.Linear
q_a_layernorm: nn.RMSNorm
q_b_proj: nn.Linear
kv_a_proj_with_mqa: nn.Linear
kv_a_layernorm: nn.RMSNorm
kv_b_proj: nn.Linear
o_proj: nn.Linear
rope: Any
def __init__(self, config: ModelArgs) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class DeepseekV3MLP(nn.Module):
config: ModelArgs
hidden_size: int
intermediate_size: int
gate_proj: nn.Linear
up_proj: nn.Linear
down_proj: nn.Linear
def __init__(
self,
config: ModelArgs,
hidden_size: Optional[int] = None,
intermediate_size: Optional[int] = None,
) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class MoEGate(nn.Module):
config: ModelArgs
top_k: int
norm_topk_prob: bool
n_routed_experts: Optional[int]
routed_scaling_factor: float
n_group: int
topk_group: int
weight: mx.array
e_score_correction_bias: mx.array
def __init__(self, config: ModelArgs) -> None: ...
def __call__(self, x: mx.array) -> tuple[mx.array, mx.array]: ...
class DeepseekV3MoE(nn.Module):
config: ModelArgs
num_experts_per_tok: int
switch_mlp: SwitchGLU
gate: MoEGate
shared_experts: DeepseekV3MLP
sharding_group: Optional[mx.distributed.Group]
def __init__(self, config: ModelArgs) -> None: ...
def __call__(self, x: mx.array) -> mx.array: ...
class DeepseekV3DecoderLayer(nn.Module):
self_attn: DeepseekV3Attention
mlp: DeepseekV3MLP | DeepseekV3MoE
input_layernorm: nn.RMSNorm
post_attention_layernorm: nn.RMSNorm
def __init__(self, config: ModelArgs, layer_idx: int) -> None: ...
def __call__(
self,
x: mx.array,
mask: Optional[mx.array] = None,
cache: Optional[Any] = None,
) -> mx.array: ...
class DeepseekV3Model(nn.Module):
vocab_size: int
embed_tokens: nn.Embedding
layers: list[DeepseekV3DecoderLayer]
norm: nn.RMSNorm
def __init__(self, config: ModelArgs) -> None: ...
def __call__(
self,
x: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
class Model(nn.Module):
model_type: str
model: DeepseekV3Model
lm_head: nn.Linear
def __init__(self, config: ModelArgs) -> None: ...
def __call__(
self,
inputs: mx.array,
cache: Optional[Any] = None,
) -> mx.array: ...
def sanitize(self, weights: dict[str, Any]) -> dict[str, Any]: ...
@property
def layers(self) -> list[DeepseekV3DecoderLayer]: ...

View File

@@ -57,6 +57,11 @@ class SwiGLU(nn.Module):
def __call__(self, x, gate): ...
class SwitchGLU(nn.Module):
gate_proj: SwitchLinear
up_proj: SwitchLinear
down_proj: SwitchLinear
activation: SwiGLU
def __init__(
self,
input_dims: int,

View File

@@ -4,6 +4,7 @@ This type stub file was generated by pyright.
from functools import partial
from pathlib import Path
from typing import Any
from transformers import PreTrainedTokenizerFast
@@ -103,37 +104,55 @@ class TokenizerWrapper:
Accessing any attribute other than the ``detokenizer`` is forwarded to the
huggingface tokenizer.
"""
def __init__(self, tokenizer, detokenizer_class=..., eos_token_ids=...) -> None: ...
def add_eos_token(self, token: str): # -> None:
...
@property
def has_thinking(self): # -> bool:
...
@property
def think_start(self): # -> str | None:
...
@property
def think_end(self): # -> str | None:
...
@property
def has_tool_calling(self): # -> bool:
...
@property
def tool_call_start(self): # -> str | None:
...
@property
def tool_call_end(self): # -> str | None:
...
@property
def detokenizer(self): # -> NaiveStreamingDetokenizer:
"""
Get a stateful streaming detokenizer.
"""
def __getattr__(self, attr): # -> set[Any] | Any:
...
def __setattr__(self, attr, value): # -> None:
...
_tokenizer: PreTrainedTokenizerFast
eos_token_id: int | None
eos_token: str | None
bos_token_id: int | None
bos_token: str | None
vocab_size: int
all_special_tokens: list[str]
def __init__(
self,
tokenizer: Any,
detokenizer_class: Any = ...,
eos_token_ids: list[int] | None = ...,
chat_template: Any = ...,
tool_parser: Any = ...,
tool_call_start: str | None = ...,
tool_call_end: str | None = ...,
) -> None: ...
def encode(self, text: str, **kwargs: Any) -> list[int]: ...
def decode(self, token_ids: list[int], **kwargs: Any) -> str: ...
def apply_chat_template(
self,
messages: list[dict[str, Any]],
tokenize: bool = False,
add_generation_prompt: bool = False,
tools: Any = None,
**kwargs: Any,
) -> str: ...
def get_vocab(self) -> dict[str, int]: ...
def add_eos_token(self, token: str) -> None: ...
@property
def has_thinking(self) -> bool: ...
@property
def think_start(self) -> str | None: ...
@property
def think_end(self) -> str | None: ...
@property
def has_tool_calling(self) -> bool: ...
@property
def tool_call_start(self) -> str | None: ...
@property
def tool_call_end(self) -> str | None: ...
@property
def detokenizer(self) -> NaiveStreamingDetokenizer:
"""Get a stateful streaming detokenizer."""
def __getattr__(self, attr: str) -> Any: ...
def __setattr__(self, attr: str, value: Any) -> None: ...
class NewlineTokenizer(PreTrainedTokenizerFast):
"""A tokenizer that replaces newlines with <n> and <n> with new line."""
@@ -146,18 +165,11 @@ class NewlineTokenizer(PreTrainedTokenizerFast):
def batch_decode(self, *args, **kwargs): # -> list[str]:
...
def load_tokenizer(
def load(
model_path: Path,
tokenizer_config_extra=...,
return_tokenizer=...,
eos_token_ids=...,
) -> (
TokenizerWrapper
| type[SPMStreamingDetokenizer]
| partial[SPMStreamingDetokenizer]
| type[BPEStreamingDetokenizer]
| type[NaiveStreamingDetokenizer]
):
tokenizer_config_extra: dict[str, Any] | None = None,
eos_token_ids: list[int] | int | None = None,
) -> TokenizerWrapper:
"""Load a huggingface tokenizer and try to infer the type of streaming
detokenizer to use.
@@ -165,4 +177,7 @@ def load_tokenizer(
a Hugging Face repo ID.
"""
def no_bos_or_eos(sequence: list, bos: int, eos: int) -> list: ...
# Alias for backward compatibility
load_tokenizer = load
def no_bos_or_eos(sequence: list[int], bos: int, eos: int) -> list[int]: ...

121
AGENTS.md Normal file
View File

@@ -0,0 +1,121 @@
# AGENTS.md
This file provides guidance to AI coding agents when working with code in this repository.
## Project Overview
exo is a distributed AI inference system that connects multiple devices into a cluster. It enables running large language models across multiple machines using MLX as the inference backend and libp2p for peer-to-peer networking.
## Build & Run Commands
```bash
# Build the dashboard (required before running exo)
cd dashboard && npm install && npm run build && cd ..
# Run exo (starts both master and worker with API at http://localhost:52415)
uv run exo
# Run with verbose logging
uv run exo -v # or -vv for more verbose
# Run tests (excludes slow tests by default)
uv run pytest
# Run all tests including slow tests
uv run pytest -m ""
# Run a specific test file
uv run pytest src/exo/shared/tests/test_election.py
# Run a specific test function
uv run pytest src/exo/shared/tests/test_election.py::test_function_name
# Type checking (strict mode)
uv run basedpyright
# Linting
uv run ruff check
# Format code (using nix)
nix fmt
```
## Pre-Commit Checks (REQUIRED)
**IMPORTANT: Always run these checks before committing code. CI will fail if these don't pass.**
```bash
# 1. Type checking - MUST pass with 0 errors
uv run basedpyright
# 2. Linting - MUST pass
uv run ruff check
# 3. Formatting - MUST be applied
nix fmt
# 4. Tests - MUST pass
uv run pytest
```
Run all checks in sequence:
```bash
uv run basedpyright && uv run ruff check && nix fmt && uv run pytest
```
If `nix fmt` changes any files, stage them before committing. The CI runs `nix flake check` which verifies formatting, linting, and runs Rust tests.
## Architecture
### Node Composition
A single exo `Node` (src/exo/main.py) runs multiple components:
- **Router**: libp2p-based pub/sub messaging via Rust bindings (exo_pyo3_bindings)
- **Worker**: Handles inference tasks, downloads models, manages runner processes
- **Master**: Coordinates cluster state, places model instances across nodes
- **Election**: Bully algorithm for master election
- **API**: FastAPI server for OpenAI-compatible chat completions
### Message Flow
Components communicate via typed pub/sub topics (src/exo/routing/topics.py):
- `GLOBAL_EVENTS`: Master broadcasts indexed events to all workers
- `LOCAL_EVENTS`: Workers send events to master for indexing
- `COMMANDS`: Workers/API send commands to master
- `ELECTION_MESSAGES`: Election protocol messages
- `CONNECTION_MESSAGES`: libp2p connection updates
### Event Sourcing
The system uses event sourcing for state management:
- `State` (src/exo/shared/types/state.py): Immutable state object
- `apply()` (src/exo/shared/apply.py): Pure function that applies events to state
- Master indexes events and broadcasts; workers apply indexed events
### Key Type Hierarchy
- `src/exo/shared/types/`: Pydantic models for all shared types
- `events.py`: Event types (discriminated union)
- `commands.py`: Command types
- `tasks.py`: Task types for worker execution
- `state.py`: Cluster state model
### Rust Components
Rust code in `rust/` provides:
- `networking`: libp2p networking (gossipsub, peer discovery)
- `exo_pyo3_bindings`: PyO3 bindings exposing Rust to Python
- `system_custodian`: System-level operations
### Dashboard
Svelte 5 + TypeScript frontend in `dashboard/`. Build output goes to `dashboard/build/` and is served by the API.
## Code Style Requirements
From .cursorrules:
- Strict, exhaustive typing - never bypass the type-checker
- Use `Literal[...]` for enum-like sets, `typing.NewType` for primitives
- Pydantic models with `frozen=True` and `strict=True`
- Pure functions with injectable effect handlers for side-effects
- Descriptive names - no abbreviations or 3-letter acronyms
- Catch exceptions only where you can handle them meaningfully
- Use `@final` and immutability wherever applicable
## Testing
Tests use pytest-asyncio with `asyncio_mode = "auto"`. Tests are in `tests/` subdirectories alongside the code they test. The `EXO_TESTS=1` env var is set during tests.

1
CLAUDE.md Symbolic link
View File

@@ -0,0 +1 @@
AGENTS.md

1624
Cargo.lock generated
View File

File diff suppressed because it is too large

View File

@@ -2,9 +2,7 @@
resolver = "3"
members = [
"rust/networking",
"rust/downloads",
"rust/exo_pyo3_bindings",
"rust/system_custodian",
"rust/util",
]
@@ -26,8 +24,6 @@ opt-level = 3
[workspace.dependencies]
## Crate members as common dependencies
networking = { path = "rust/networking" }
downloads = { path = "rust/downloads" }
system_custodian = { path = "rust/system_custodian" }
util = { path = "rust/util" }
# Proc-macro authoring tools

41
MISSED_THINGS.md Normal file
View File

@@ -0,0 +1,41 @@
# Missed things
[X] Log EXO_LIBP2P_NAMESPACE on start in exo/main.py
[X] Ordering of warmup was changed, which is wrong. It was changed to rank < n-1, then rank=n-1. It should be rank!=0 then rank=0 (this matches the auto_parallel implementation. NOTE: we use a different convention to mlx-lm, our terminal rank is rank=n-1 whereas mlx-lm is rank=0 hence i can see why this was changed wrongly).
[X] Downloads keying by model_id not shard_metadata (worker/plan.py, worker/main.py).
[X] Fetching download status of all models on start
[X] Deduplication of tasks in plan_step.
[X] resolve_allow_patterns should just be wildcard now.
[] no mx_barrier in generate.py mlx_generate at the end.
[] cache assertion not needed in auto_parallel.py PipelineLastLayer.
[] GPTOSS support dropped in auto_parallel.py.
[] sharding changed "all-to-sharded" became _all_to_sharded in auto_parallel.py.
[] same as above with "sharded-to-all" became _sharded_to_all in auto_parallel.py.
[] Dropped support for Ministral3Model, DeepseekV32Model, Glm4MoeModel, Qwen3NextModel, GptOssMode in auto_parallel.py.
[] Dropped prefill/decode code in auto_parallel.py and utils_mlx.py.
[X] KV_CACHE_BITS should be None to disable quantized KV cache.
[] Dropped _set_nofile_limit in utils_mlx.py.
[] We have group optional in load_mlx_items in utils_mlx.py.
[] Dropped add_missing_chat_templates for GptOss in load_mlx_items in utils_mlx.py.
[] Dropped model.make_cache in make_kv_cache in utils_mlx.py.
[X] We put cache limit back in utils_mlx.py.
[] topology.py remove_node removes the connections after checking if node is in self._node_id_to_rx_id_map. on beta_1 it checks after, so would remove stale connections I guess?
[] Missing Glm 4.7 model cards (this isn't ready yet but should be picked up, probably create an issue... the blocker is that the transformers version doesn't support the tokenizer for Glm 4.7. rc-1 does but we can't upgrade as it breaks other things.)
[] try-except in _command_processor only excepts ValueError. This was silently failing, leading to un-debuggable errors (we had a KeyError that was happening). Changed this to catch Exception instead of ValueError. See exo-v2 89ae38405e0052e3c22405daf094b065878aa873 and fb99fea69b5a39017efc90c5dad0072e677455f0.
[X] In placement.py, place_instance no longer looks at model_meta.supports_tensor and check if this tensor parallel number of nodes is supported by the model's tensor dimensions.
[X] In placement.py, place_instance, we no longer have the special case to exclude DeepSeek v3.1 pipeline parallel (it doesn't work).
[] logger.warning("You have likely selected ibv for a single node instance; falling back to MlxRing") was changed to debug. That will spam this warning since it happens every time we query instance previews.
[X] In placement_utils.py, get_mlx_jaccl_coordinators, We no longer prioritise Jaccl Coordinator IP. Now it picks the first one, which is unstable (Jaccl coordinator over TB5 is unstable).
[X] Downloads keying by model_id not shard_metadata (worker/plan.py, worker/main.py).
[X] Fetching download status of all models on start
[X] Deduplication of tasks in plan_step.
[X] resolve_allow_patterns should just be wildcard now.
[X] KV_CACHE_BITS should be None to disable quantized KV cache.
[X] We put cache limit back in utils_mlx.py.
[X] In placement.py, place_instance no longer looks at model_meta.supports_tensor and check if this tensor parallel number of nodes is supported by the model's tensor dimensions.
[X] In placement.py, place_instance, we no longer have the special case to exclude DeepSeek v3.1 pipeline parallel (it doesn't work).
[X] In placement_utils.py, get_mlx_jaccl_coordinators, We no longer prioritise Jaccl Coordinator IP. Now it picks the first one, which is unstable (Jaccl coordinator over TB5 is unstable).

View File

@@ -585,7 +585,7 @@
repositoryURL = "https://github.com/sparkle-project/Sparkle.git";
requirement = {
kind = upToNextMajorVersion;
minimumVersion = 2.8.1;
minimumVersion = 2.9.0-beta.1;
};
};
/* End XCRemoteSwiftPackageReference section */

View File

@@ -6,8 +6,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/sparkle-project/Sparkle.git",
"state" : {
"revision" : "5581748cef2bae787496fe6d61139aebe0a451f6",
"version" : "2.8.1"
"revision" : "e641adb41915a8409895e2e30666aa64e487b637",
"version" : "2.9.0-beta.1"
}
}
],

View File

@@ -56,6 +56,11 @@ struct ContentView: View {
}
private var shouldShowLocalNetworkWarning: Bool {
// Show warning if local network is not working and EXO is running.
// The checker uses a longer timeout on first launch to allow time for
// the permission prompt, so this correctly handles both:
// 1. User denied permission on first launch
// 2. Permission broke after restart (macOS TCC bug)
if case .notWorking = localNetworkChecker.status {
return controller.status != .stopped
}

View File

@@ -5,8 +5,8 @@ import os.log
/// Checks if the app's local network permission is actually functional.
///
/// macOS local network permission can appear enabled in System Preferences but not
/// actually work after a restart. This service detects this by creating a UDP
/// connection to the mDNS multicast address (224.0.0.251:5353).
/// actually work after a restart. This service uses NWConnection to mDNS multicast
/// to verify actual connectivity.
@MainActor
final class LocalNetworkChecker: ObservableObject {
enum Status: Equatable {
@@ -35,30 +35,43 @@ final class LocalNetworkChecker: ObservableObject {
}
private static let logger = Logger(subsystem: "io.exo.EXO", category: "LocalNetworkChecker")
private static let hasCompletedInitialCheckKey = "LocalNetworkChecker.hasCompletedInitialCheck"
@Published private(set) var status: Status = .unknown
@Published private(set) var lastConnectionState: String = "none"
private var connection: NWConnection?
private var checkTask: Task<Void, Never>?
/// Whether we've completed at least one check (stored in UserDefaults)
private var hasCompletedInitialCheck: Bool {
get { UserDefaults.standard.bool(forKey: Self.hasCompletedInitialCheckKey) }
set { UserDefaults.standard.set(newValue, forKey: Self.hasCompletedInitialCheckKey) }
}
/// Checks if local network access is working.
func check() {
checkTask?.cancel()
status = .checking
lastConnectionState = "connecting"
// Use longer timeout on first launch to allow time for permission prompt
let isFirstCheck = !hasCompletedInitialCheck
let timeout: UInt64 = isFirstCheck ? 30_000_000_000 : 3_000_000_000
checkTask = Task { [weak self] in
guard let self else { return }
let result = await self.performCheck()
Self.logger.info("Checking local network connectivity (first check: \(isFirstCheck))")
let result = await self.checkConnectivity(timeout: timeout)
self.status = result
self.hasCompletedInitialCheck = true
Self.logger.info("Local network check complete: \(result.displayText)")
}
}
private func performCheck() async -> Status {
Self.logger.info("Checking local network access via UDP multicast")
/// Checks connectivity using NWConnection to mDNS multicast.
/// The connection attempt triggers the permission prompt if not yet shown.
private func checkConnectivity(timeout: UInt64) async -> Status {
connection?.cancel()
connection = nil
@@ -84,22 +97,7 @@ final class LocalNetworkChecker: ObservableObject {
continuation.resume(returning: status)
}
conn.stateUpdateHandler = { [weak self] state in
let stateStr: String
switch state {
case .setup: stateStr = "setup"
case .preparing: stateStr = "preparing"
case .ready: stateStr = "ready"
case .waiting(let e): stateStr = "waiting(\(e))"
case .failed(let e): stateStr = "failed(\(e))"
case .cancelled: stateStr = "cancelled"
@unknown default: stateStr = "unknown"
}
Task { @MainActor in
self?.lastConnectionState = stateStr
}
conn.stateUpdateHandler = { state in
switch state {
case .ready:
resumeOnce(.working)
@@ -108,6 +106,7 @@ final class LocalNetworkChecker: ObservableObject {
if errorStr.contains("54") || errorStr.contains("ECONNRESET") {
resumeOnce(.notWorking(reason: "Connection blocked"))
}
// Otherwise keep waiting - might be showing permission prompt
case .failed(let error):
let errorStr = "\(error)"
if errorStr.contains("65") || errorStr.contains("EHOSTUNREACH")
@@ -127,7 +126,7 @@ final class LocalNetworkChecker: ObservableObject {
conn.start(queue: .main)
Task {
try? await Task.sleep(nanoseconds: 3_000_000_000)
try? await Task.sleep(nanoseconds: timeout)
let state = conn.state
switch state {
case .ready:

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import argparse
import contextlib
import http.client
import json
import os
@@ -26,7 +27,7 @@ class ExoHttpError(RuntimeError):
class ExoClient:
def __init__(self, host: str, port: int, timeout_s: float = 2400.0):
def __init__(self, host: str, port: int, timeout_s: float = 600.0):
self.host = host
self.port = port
self.timeout_s = timeout_s
@@ -104,22 +105,46 @@ def runner_ready(runner: dict[str, Any]) -> bool:
return "RunnerReady" in runner
def runner_failed(runner: dict[str, Any]) -> bool:
return "RunnerFailed" in runner
def get_runner_failed_message(runner: dict[str, Any]) -> str | None:
if "RunnerFailed" in runner:
return runner["RunnerFailed"].get("errorMessage")
return None
def wait_for_instance_ready(
client: ExoClient, instance_id: str, timeout: float = 24000.0
) -> None:
start_time = time.time()
instance_existed = False
while time.time() - start_time < timeout:
state = client.request_json("GET", "/state")
instances = state.get("instances", {})
if instance_id not in instances:
if instance_existed:
# Instance was deleted after being created - likely due to runner failure
raise RuntimeError(
f"Instance {instance_id} was deleted (runner may have failed)"
)
time.sleep(0.1)
continue
instance_existed = True
instance = instances[instance_id]
runner_ids = runner_ids_from_instance(instance)
runners = state.get("runners", {})
# Check for failed runners first
for rid in runner_ids:
runner = runners.get(rid, {})
if runner_failed(runner):
error_msg = get_runner_failed_message(runner) or "Unknown error"
raise RuntimeError(f"Runner {rid} failed: {error_msg}")
if all(runner_ready(runners.get(rid, {})) for rid in runner_ids):
return
@@ -241,6 +266,9 @@ class PromptSizer:
ids = tokenizer.apply_chat_template(
messages, tokenize=True, add_generation_prompt=True
)
# Fix for transformers 5.x
if hasattr(ids, "input_ids"):
ids = ids.input_ids
return int(len(ids))
return count_fn
@@ -296,6 +324,12 @@ def main() -> int:
default=4,
help="Only consider placements using <= this many nodes.",
)
ap.add_argument(
"--min-nodes",
type=int,
default=1,
help="Only consider placements using >= this many nodes.",
)
ap.add_argument(
"--instance-meta", choices=["ring", "jaccl", "both"], default="both"
)
@@ -317,7 +351,7 @@ def main() -> int:
help="Warmup runs per placement (uses first pp/tg).",
)
ap.add_argument(
"--timeout", type=float, default=2400.0, help="HTTP timeout (seconds)."
"--timeout", type=float, default=600.0, help="HTTP timeout (seconds)."
)
ap.add_argument(
"--json-out",
@@ -396,7 +430,7 @@ def main() -> int:
):
continue
if 0 < n <= args.max_nodes:
if args.min_nodes <= n <= args.max_nodes:
selected.append(p)
if not selected:
@@ -438,7 +472,13 @@ def main() -> int:
)
client.request_json("POST", "/instance", body={"instance": instance})
wait_for_instance_ready(client, instance_id)
try:
wait_for_instance_ready(client, instance_id)
except (RuntimeError, TimeoutError) as e:
logger.error(f"Failed to initialize placement: {e}")
with contextlib.suppress(ExoHttpError):
client.request_json("DELETE", f"/instance/{instance_id}")
continue
time.sleep(1)

60
dashboard/dashboard.nix Normal file
View File

@@ -0,0 +1,60 @@
{ lib
, config
, dream2nix
, ...
}:
let
# Read and parse the lock file
rawLockFile = builtins.fromJSON (builtins.readFile "${config.deps.dashboardSrc}/package-lock.json");
# For packages with bundleDependencies, filter out deps that are bundled
# (bundled deps are inside the tarball, not separate lockfile entries)
fixedPackages = lib.mapAttrs
(path: entry:
if entry ? bundleDependencies && entry.bundleDependencies != [ ]
then entry // {
dependencies = lib.filterAttrs
(name: _: !(lib.elem name entry.bundleDependencies))
(entry.dependencies or { });
}
else entry
)
(rawLockFile.packages or { });
fixedLockFile = rawLockFile // { packages = fixedPackages; };
in
{
imports = [
dream2nix.modules.dream2nix.nodejs-package-lock-v3
dream2nix.modules.dream2nix.nodejs-granular-v3
];
name = "exo-dashboard";
version = "1.0.0";
mkDerivation = {
src = config.deps.dashboardSrc;
buildPhase = ''
runHook preBuild
npm run build
runHook postBuild
'';
installPhase = ''
runHook preInstall
cp -r build $out/build
runHook postInstall
'';
};
deps = { nixpkgs, ... }: {
inherit (nixpkgs) stdenv;
dashboardSrc = null; # Injected by parts.nix
};
nodejs-package-lock-v3 = {
# Don't use packageLockFile - provide the fixed lock content directly
packageLock = fixedLockFile;
};
}

View File

@@ -863,6 +863,7 @@
"integrity": "sha512-oH8tXw7EZnie8FdOWYrF7Yn4IKrqTFHhXvl8YxXxbKwTMcD/5NNCryUSEXRk2ZR4ojnub0P8rNrsVGHXWqIDtA==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@standard-schema/spec": "^1.0.0",
"@sveltejs/acorn-typescript": "^1.0.5",
@@ -902,6 +903,7 @@
"integrity": "sha512-Y1Cs7hhTc+a5E9Va/xwKlAJoariQyHY+5zBgCZg4PFWNYQ1nMN9sjK1zhw1gK69DuqVP++sht/1GZg1aRwmAXQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@sveltejs/vite-plugin-svelte-inspector": "^4.0.1",
"debug": "^4.4.1",
@@ -1518,6 +1520,7 @@
"integrity": "sha512-LCCV0HdSZZZb34qifBsyWlUmok6W7ouER+oQIGBScS8EsZsQbrtFTUrDX4hOl+CS6p7cnNC4td+qrSVGSCTUfQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"undici-types": "~6.21.0"
}
@@ -1527,6 +1530,7 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz",
"integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
"license": "MIT",
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -1939,6 +1943,7 @@
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"dev": true,
"license": "ISC",
"peer": true,
"engines": {
"node": ">=12"
}
@@ -2646,6 +2651,7 @@
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@@ -2833,6 +2839,7 @@
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.45.3.tgz",
"integrity": "sha512-ngKXNhNvwPzF43QqEhDOue7TQTrG09em1sd4HBxVF0Wr2gopAmdEWan+rgbdgK4fhBtSOTJO8bYU4chUG7VXZQ==",
"license": "MIT",
"peer": true,
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",
@@ -2977,6 +2984,7 @@
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"dev": true,
"license": "Apache-2.0",
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -2998,6 +3006,7 @@
"integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.4.4",

dashboard/parts.nix Normal file

@@ -0,0 +1,44 @@
{ inputs, ... }:
{
perSystem =
{ pkgs, lib, ... }:
let
# Filter source to only include dashboard directory
src = lib.cleanSourceWith {
src = inputs.self;
filter =
path: type:
let
baseName = builtins.baseNameOf path;
inDashboardDir =
(lib.hasInfix "/dashboard/" path)
|| (lib.hasSuffix "/dashboard" (builtins.dirOf path))
|| (baseName == "dashboard" && type == "directory");
in
inDashboardDir;
};
# Build the dashboard with dream2nix (includes node_modules in output)
dashboardFull = inputs.dream2nix.lib.evalModules {
packageSets.nixpkgs = pkgs;
modules = [
./dashboard.nix
{
paths.projectRoot = inputs.self;
paths.projectRootFile = "flake.nix";
paths.package = inputs.self + "/dashboard";
}
# Inject the filtered source
{
deps.dashboardSrc = lib.mkForce "${src}/dashboard";
}
];
};
in
{
# Extract just the static site from the full build
packages.dashboard = pkgs.runCommand "exo-dashboard" { } ''
cp -r ${dashboardFull}/build $out
'';
};
}


@@ -60,12 +60,39 @@
return models;
});
// Auto-select the first available model if none is selected
// Track previous model IDs to detect newly added models (plain variable to avoid reactive loop)
let previousModelIds: Set<string> = new Set();
// Auto-select the first available model if none is selected, if current selection is stale, or if a new model is added
$effect(() => {
const models = availableModels();
if (models.length > 0 && !currentModel) {
setSelectedChatModel(models[0].id);
const currentModelIds = new Set(models.map(m => m.id));
if (models.length > 0) {
// Find newly added models (in current but not in previous)
const newModels = models.filter(m => !previousModelIds.has(m.id));
// If no model selected, select the first available
if (!currentModel) {
setSelectedChatModel(models[0].id);
}
// If current model is stale (no longer has a running instance), reset to first available
else if (!models.some(m => m.id === currentModel)) {
setSelectedChatModel(models[0].id);
}
// If a new model was just added, select it
else if (newModels.length > 0 && previousModelIds.size > 0) {
setSelectedChatModel(newModels[0].id);
}
} else {
// No instances running - clear the selected model
if (currentModel) {
setSelectedChatModel('');
}
}
// Update previous model IDs for next comparison
previousModelIds = currentModelIds;
});
function getInstanceModelId(instanceWrapped: unknown): string {


@@ -400,10 +400,8 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
const errorText = await response.text();
console.error('Failed to launch instance:', errorText);
} else {
// Auto-select the launched model only if no model is currently selected
if (!selectedChatModel()) {
setSelectedChatModel(modelId);
}
// Always auto-select the newly launched model so the user chats with the model they just launched
setSelectedChatModel(modelId);
// Scroll to the bottom of instances container to show the new instance
// Use multiple attempts to ensure DOM has updated with the new instance
@@ -763,6 +761,10 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
async function deleteInstance(instanceId: string) {
if (!confirm(`Delete instance ${instanceId.slice(0, 8)}...?`)) return;
// Get the model ID of the instance being deleted before we delete it
const deletedInstanceModelId = getInstanceModelId(instanceData[instanceId]);
const wasSelected = selectedChatModel() === deletedInstanceModelId;
try {
const response = await fetch(`/instance/${instanceId}`, {
method: 'DELETE',
@@ -771,6 +773,24 @@ function toggleInstanceDownloadDetails(nodeId: string): void {
if (!response.ok) {
console.error('Failed to delete instance:', response.status);
} else if (wasSelected) {
// If we deleted the currently selected model, switch to another available model
// Find another instance that isn't the one we just deleted
const remainingInstances = Object.entries(instanceData).filter(([id]) => id !== instanceId);
if (remainingInstances.length > 0) {
// Select the last instance (most recently added, since objects preserve insertion order)
const [, lastInstance] = remainingInstances[remainingInstances.length - 1];
const newModelId = getInstanceModelId(lastInstance);
if (newModelId && newModelId !== 'Unknown' && newModelId !== 'Unknown Model') {
setSelectedChatModel(newModelId);
} else {
// Clear selection if no valid model found
setSelectedChatModel('');
}
} else {
// No more instances, clear the selection
setSelectedChatModel('');
}
}
} catch (error) {
console.error('Error deleting instance:', error);

flake.lock generated

@@ -1,5 +1,42 @@
{
"nodes": {
"crane": {
"locked": {
"lastModified": 1767744144,
"narHash": "sha256-9/9ntI0D+HbN4G0TrK3KmHbTvwgswz7p8IEJsWyef8Q=",
"owner": "ipetkov",
"repo": "crane",
"rev": "2fb033290bf6b23f226d4c8b32f7f7a16b043d7e",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"dream2nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"purescript-overlay": "purescript-overlay",
"pyproject-nix": "pyproject-nix"
},
"locked": {
"lastModified": 1765953015,
"narHash": "sha256-5FBZbbWR1Csp3Y2icfRkxMJw/a/5FGg8hCXej2//bbI=",
"owner": "nix-community",
"repo": "dream2nix",
"rev": "69eb01fa0995e1e90add49d8ca5bcba213b0416f",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "dream2nix",
"type": "github"
}
},
"fenix": {
"inputs": {
"nixpkgs": [
@@ -8,11 +45,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1761893049,
"narHash": "sha256-1TtFDPhC+ZsrOOtBnry1EZC+WipTTvsOVjIEVugqji8=",
"lastModified": 1768287139,
"narHash": "sha256-nsXFt0OzUi6K7dUzzJD5/v9e0Ic+fvclfIW936/43ZM=",
"owner": "nix-community",
"repo": "fenix",
"rev": "c2ac9a5c0d6d16630c3b225b874bd14528d1abe6",
"rev": "a4a3aa956931f90f35453cb519e4545e9ad7f773",
"type": "github"
},
"original": {
@@ -21,25 +58,59 @@
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"lastModified": 1696426674,
"narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-parts": {
"inputs": {
"nixpkgs-lib": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1768135262,
"narHash": "sha256-PVvu7OqHBGWN16zSi6tEmPwwHQ4rLPU9Plvs8/1TUBY=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "80daad04eddbbf5a4d883996a73f3f542fa437ac",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1768127708,
"narHash": "sha256-1Sm77VfZh3mU0F5OqKABNLWxOuDeHIlcFjsXeeiPazs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "ffbc9f8cbaacfb331b6017d5a5abb21a492c9a38",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-swift": {
"locked": {
"lastModified": 1761672384,
"narHash": "sha256-o9KF3DJL7g7iYMZq9SWgfS1BFlNbsm6xplRjVlOCkXI=",
@@ -50,27 +121,74 @@
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"rev": "08dacfca559e1d7da38f3cf05f1f45ee9bfd213c",
"type": "github"
}
},
"purescript-overlay": {
"inputs": {
"flake-compat": "flake-compat",
"nixpkgs": [
"dream2nix",
"nixpkgs"
],
"slimlock": "slimlock"
},
"locked": {
"lastModified": 1728546539,
"narHash": "sha256-Sws7w0tlnjD+Bjck1nv29NjC5DbL6nH5auL9Ex9Iz2A=",
"owner": "thomashoneyman",
"repo": "purescript-overlay",
"rev": "4ad4c15d07bd899d7346b331f377606631eb0ee4",
"type": "github"
},
"original": {
"owner": "thomashoneyman",
"repo": "purescript-overlay",
"type": "github"
}
},
"pyproject-nix": {
"inputs": {
"nixpkgs": [
"dream2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1763017646,
"narHash": "sha256-Z+R2lveIp6Skn1VPH3taQIuMhABg1IizJd8oVdmdHsQ=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "47bd6f296502842643078d66128f7b5e5370790c",
"type": "github"
},
"original": {
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"type": "github"
}
},
"root": {
"inputs": {
"crane": "crane",
"dream2nix": "dream2nix",
"fenix": "fenix",
"flake-utils": "flake-utils",
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs",
"nixpkgs-swift": "nixpkgs-swift",
"treefmt-nix": "treefmt-nix"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1761849405,
"narHash": "sha256-igXdvC+WCUN+3gnfk+ptT7rMmxQuY6WbIg1rXMUN1DM=",
"lastModified": 1768224240,
"narHash": "sha256-Pp1dDrXKPBUJReZnnDElFyHYn67XTd48zRhToheLjtk=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "f7de8ae045a5fe80f1203c5a1c3015b05f7c3550",
"rev": "725349602e525df37f377701e001fe8aab807878",
"type": "github"
},
"original": {
@@ -80,18 +198,25 @@
"type": "github"
}
},
"systems": {
"slimlock": {
"inputs": {
"nixpkgs": [
"dream2nix",
"purescript-overlay",
"nixpkgs"
]
},
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"lastModified": 1688756706,
"narHash": "sha256-xzkkMv3neJJJ89zo3o2ojp7nFeaZc2G0fYwNXNJRFlo=",
"owner": "thomashoneyman",
"repo": "slimlock",
"rev": "cf72723f59e2340d24881fd7bf61cb113b4c407c",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"owner": "thomashoneyman",
"repo": "slimlock",
"type": "github"
}
},
@@ -102,11 +227,11 @@
]
},
"locked": {
"lastModified": 1762938485,
"narHash": "sha256-AlEObg0syDl+Spi4LsZIBrjw+snSVU4T8MOeuZJUJjM=",
"lastModified": 1768158989,
"narHash": "sha256-67vyT1+xClLldnumAzCTBvU0jLZ1YBcf4vANRWP3+Ak=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "5b4ee75aeefd1e2d5a1cc43cf6ba65eba75e83e4",
"rev": "e96d59dff5c0d7fddb9d113ba108f03c3ef99eca",
"type": "github"
},
"original": {

flake.nix

@@ -3,132 +3,134 @@
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
# Provides Rust dev-env integration:
flake-parts = {
url = "github:hercules-ci/flake-parts";
inputs.nixpkgs-lib.follows = "nixpkgs";
};
crane.url = "github:ipetkov/crane";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
# Provides formatting infrastructure:
treefmt-nix = {
url = "github:numtide/treefmt-nix";
inputs.nixpkgs.follows = "nixpkgs";
};
dream2nix = {
url = "github:nix-community/dream2nix";
inputs.nixpkgs.follows = "nixpkgs";
};
# Pinned nixpkgs for swift-format (swift is broken on x86_64-linux in newer nixpkgs)
nixpkgs-swift.url = "github:NixOS/nixpkgs/08dacfca559e1d7da38f3cf05f1f45ee9bfd213c";
};
# TODO: figure out caching story
# nixConfig = {
# # nix community cachix
# extra-trusted-public-keys = "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=";
# extra-substituters = "https://nix-community.cachix.org";
# };
nixConfig = {
extra-trusted-public-keys = "exo.cachix.org-1:okq7hl624TBeAR3kV+g39dUFSiaZgLRkLsFBCuJ2NZI=";
extra-substituters = "https://exo.cachix.org";
};
outputs =
inputs:
let
inputs.flake-parts.lib.mkFlake { inherit inputs; } {
systems = [
"x86_64-linux"
"aarch64-darwin"
"aarch64-linux"
];
fenixToolchain = system: inputs.fenix.packages.${system}.complete;
in
inputs.flake-utils.lib.eachSystem systems (
system:
let
pkgs = import inputs.nixpkgs {
inherit system;
overlays = [ inputs.fenix.overlays.default ];
};
treefmtEval = inputs.treefmt-nix.lib.evalModule pkgs {
projectRootFile = "flake.nix";
programs = {
nixpkgs-fmt.enable = true;
ruff-format = {
enable = true;
excludes = [ "rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi" ];
imports = [
inputs.treefmt-nix.flakeModule
./dashboard/parts.nix
./rust/parts.nix
];
perSystem =
{ config, self', inputs', pkgs, lib, system, ... }:
let
fenixToolchain = inputs'.fenix.packages.complete;
# Use pinned nixpkgs for swift-format (swift is broken on x86_64-linux in newer nixpkgs)
pkgsSwift = import inputs.nixpkgs-swift { inherit system; };
in
{
treefmt = {
projectRootFile = "flake.nix";
programs = {
nixpkgs-fmt.enable = true;
ruff-format = {
enable = true;
excludes = [ "rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi" ];
};
rustfmt = {
enable = true;
package = config.rust.toolchain;
};
prettier = {
enable = true;
includes = [ "*.ts" ];
};
swift-format = {
enable = true;
package = pkgsSwift.swiftPackages.swift-format;
};
};
rustfmt = {
enable = true;
package = (fenixToolchain system).rustfmt;
};
prettier = {
enable = true;
includes = [ "*.ts" ];
};
swift-format.enable = true;
};
};
in
{
formatter = treefmtEval.config.build.wrapper;
checks.formatting = treefmtEval.config.build.check inputs.self;
checks.lint = pkgs.runCommand "lint-check" { } ''
export RUFF_CACHE_DIR="$TMPDIR/ruff-cache"
${pkgs.ruff}/bin/ruff check ${inputs.self}/
touch $out
'';
devShells.default = pkgs.mkShell {
packages =
with pkgs;
[
# FORMATTING
treefmtEval.config.build.wrapper
# PYTHON
python313
uv
ruff
basedpyright
# RUST
((fenixToolchain system).withComponents [
"cargo"
"rustc"
"clippy"
"rustfmt"
"rust-src"
])
rustup # Just here to make RustRover happy
# NIX
nixpkgs-fmt
# SVELTE
nodejs
# MISC
just
jq
]
++ (pkgs.lib.optionals pkgs.stdenv.isLinux [
# IFCONFIG
unixtools.ifconfig
# Build dependencies for Linux
pkg-config
openssl
])
++ (pkgs.lib.optionals pkgs.stdenv.isDarwin [
# MACMON
macmon
]);
shellHook = ''
# PYTHON
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:${pkgs.python313}/lib"
${pkgs.lib.optionalString pkgs.stdenv.isLinux ''
# Build environment for Linux
export PKG_CONFIG_PATH="${pkgs.openssl.dev}/lib/pkgconfig:$PKG_CONFIG_PATH"
export LD_LIBRARY_PATH="${pkgs.openssl.out}/lib:$LD_LIBRARY_PATH"
''}
echo
echo "🍎🍎 Run 'just <recipe>' to get started"
just --list
checks.lint = pkgs.runCommand "lint-check" { } ''
export RUFF_CACHE_DIR="$TMPDIR/ruff-cache"
${pkgs.ruff}/bin/ruff check ${inputs.self}/
touch $out
'';
devShells.default = with pkgs; pkgs.mkShell {
inputsFrom = [ self'.checks.cargo-build ];
packages =
[
# FORMATTING
config.treefmt.build.wrapper
# PYTHON
python313
uv
ruff
basedpyright
# RUST
config.rust.toolchain
maturin
# NIX
nixpkgs-fmt
# SVELTE
nodejs
# MISC
just
jq
]
++ lib.optionals stdenv.isLinux [
unixtools.ifconfig
]
++ lib.optionals stdenv.isDarwin [
macmon
];
OPENSSL_NO_VENDOR = "1";
shellHook = ''
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:${python313}/lib"
${lib.optionalString stdenv.isLinux ''
export LD_LIBRARY_PATH="${openssl.out}/lib:$LD_LIBRARY_PATH"
''}
'';
};
};
}
);
};
}


@@ -1,3 +1,5 @@
export NIX_CONFIG := "extra-experimental-features = nix-command flakes"
fmt:
nix fmt


@@ -17,12 +17,13 @@ dependencies = [
"loguru>=0.7.3",
"exo_pyo3_bindings", # rust bindings
"anyio==4.11.0",
"mlx>=0.30.1; sys_platform == 'darwin'",
"mlx[cpu]>=0.30.1; sys_platform == 'linux'",
"mlx-lm>=0.28.3",
"mlx==0.30.1; sys_platform == 'darwin'",
"mlx[cpu]==0.30.1; sys_platform == 'linux'",
"mlx-lm @ git+https://github.com/AlexCheema/mlx-lm.git@fix-transformers-5.0.0rc2",
"tiktoken>=0.12.0", # required for kimi k2 tokenizer
"hypercorn>=0.18.0",
"openai-harmony>=0.0.8",
"httpx>=0.28.1",
]
[project.scripts]
@@ -33,6 +34,7 @@ exo = "exo.main:main"
# dependencies only required for development
[dependency-groups]
dev = [
"basedpyright>=1.29.0",
"pyinstaller>=6.17.0",
"pytest>=8.4.0",
"pytest-asyncio>=1.0.0",
@@ -98,6 +100,7 @@ root = "src"
# supported platforms for this project
[tool.uv]
prerelease = "allow"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",


@@ -1,40 +0,0 @@
[package]
name = "downloads"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "downloads"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
# macro dependencies
derive_more = { workspace = true }
# async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
futures-util = { workspace = true }
# utility dependencies
util = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
itertools = { workspace = true }
# tracing/logging
log = { workspace = true }
# BitTorrent library
librqbit = { git = "https://github.com/JakeHillion/rqbit", rev = "c4e2ecf81d03bd8acd96a0803d06a70b34d5da19" }
# Embed torrent files
include_dir = "0.7"
# Serialization
serde = { version = "1.0", features = ["derive"] }


@@ -1,162 +0,0 @@
//! Bencode encoding for BitTorrent tracker responses
//!
//! Implements the subset of bencoding needed for tracker announce responses.
use std::collections::BTreeMap;
/// Parameters from a tracker announce request
#[derive(Debug, Clone)]
pub struct AnnounceParams {
/// 20-byte info hash of the torrent
pub info_hash: [u8; 20],
/// 20-byte peer ID of the client
pub peer_id: [u8; 20],
/// Port the client is listening on
pub port: u16,
/// Total bytes uploaded
pub uploaded: u64,
/// Total bytes downloaded
pub downloaded: u64,
/// Bytes remaining to download
pub left: u64,
/// Whether to return compact peer list (6 bytes per peer)
pub compact: bool,
/// Optional event (started, stopped, completed)
pub event: Option<AnnounceEvent>,
}
/// Announce event types
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnnounceEvent {
Started,
Stopped,
Completed,
}
/// A bencoded value
#[derive(Debug, Clone)]
pub enum BencodeValue {
Integer(i64),
Bytes(Vec<u8>),
List(Vec<BencodeValue>),
Dict(BTreeMap<Vec<u8>, BencodeValue>),
}
impl BencodeValue {
/// Create a string value from a &str
#[inline]
pub fn string(s: &str) -> Self {
Self::Bytes(s.as_bytes().to_vec())
}
/// Create an integer value
#[inline]
pub fn integer(i: i64) -> Self {
Self::Integer(i)
}
/// Create an empty list
#[inline]
pub fn list() -> Self {
Self::List(Vec::new())
}
/// Create an empty dict
#[inline]
pub fn dict() -> Self {
Self::Dict(BTreeMap::new())
}
/// Add an item to a list (builder pattern)
#[inline]
pub fn push(mut self, value: BencodeValue) -> Self {
if let Self::List(ref mut list) = self {
list.push(value);
}
self
}
/// Insert a key-value pair into a dict (builder pattern)
#[inline]
pub fn insert(mut self, key: &str, value: BencodeValue) -> Self {
if let Self::Dict(ref mut dict) = self {
dict.insert(key.as_bytes().to_vec(), value);
}
self
}
/// Encode to bencoded bytes
pub fn encode(&self) -> Vec<u8> {
let mut buf = Vec::new();
self.encode_into(&mut buf);
buf
}
/// Encode into an existing buffer
pub fn encode_into(&self, buf: &mut Vec<u8>) {
match self {
Self::Integer(i) => {
buf.push(b'i');
buf.extend_from_slice(i.to_string().as_bytes());
buf.push(b'e');
}
Self::Bytes(bytes) => {
buf.extend_from_slice(bytes.len().to_string().as_bytes());
buf.push(b':');
buf.extend_from_slice(bytes);
}
Self::List(list) => {
buf.push(b'l');
for item in list {
item.encode_into(buf);
}
buf.push(b'e');
}
Self::Dict(dict) => {
buf.push(b'd');
// BTreeMap keeps keys sorted
for (key, value) in dict {
buf.extend_from_slice(key.len().to_string().as_bytes());
buf.push(b':');
buf.extend_from_slice(key);
value.encode_into(buf);
}
buf.push(b'e');
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encode_integer() {
assert_eq!(BencodeValue::integer(42).encode(), b"i42e");
assert_eq!(BencodeValue::integer(-1).encode(), b"i-1e");
assert_eq!(BencodeValue::integer(0).encode(), b"i0e");
}
#[test]
fn test_encode_string() {
assert_eq!(BencodeValue::string("spam").encode(), b"4:spam");
assert_eq!(BencodeValue::string("").encode(), b"0:");
}
#[test]
fn test_encode_list() {
let list = BencodeValue::list()
.push(BencodeValue::string("spam"))
.push(BencodeValue::integer(42));
assert_eq!(list.encode(), b"l4:spami42ee");
}
#[test]
fn test_encode_dict() {
let dict = BencodeValue::dict()
.insert("bar", BencodeValue::string("spam"))
.insert("foo", BencodeValue::integer(42));
assert_eq!(dict.encode(), b"d3:bar4:spam3:fooi42ee");
}
}
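The bencoding rules exercised by these tests are compact enough to restate as a Python sketch (an editorial illustration of the format, not part of the crate):

def bencode(value) -> bytes:
    # Minimal encoder for the same subset: ints, strings, lists, dicts.
    if isinstance(value, bool):
        raise TypeError("bools are not bencodable")
    if isinstance(value, int):
        return b"i%de" % value
    if isinstance(value, (str, bytes)):
        raw = value.encode() if isinstance(value, str) else value
        return b"%d:%s" % (len(raw), raw)
    if isinstance(value, list):
        return b"l" + b"".join(bencode(v) for v in value) + b"e"
    if isinstance(value, dict):
        out = b"d"
        for key in sorted(value):  # bencode requires sorted dict keys
            out += bencode(key) + bencode(value[key])
        return out + b"e"
    raise TypeError(f"cannot bencode {type(value)!r}")

assert bencode({"bar": "spam", "foo": 42}) == b"d3:bar4:spam3:fooi42ee"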


@@ -1,108 +0,0 @@
//! Embedded torrent file access
//!
//! Provides access to .torrent files embedded in the binary at compile time.
//! Each model/revision can have multiple torrent variants (e.g., "small", "large").
use include_dir::{Dir, include_dir};
/// Embedded torrent files directory
static TORRENTS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/torrents");
/// Get all embedded torrent variants for a model_id and revision
///
/// # Arguments
/// * `model_id` - Model identifier (e.g., "mlx-community/Qwen3-30B-A3B-4bit")
/// * `revision` - Git commit hash
///
/// # Returns
/// Vec of (variant_name, torrent_data) tuples, e.g., [("small", data), ("large", data)]
/// Returns empty Vec if no torrents found for this model/revision.
#[inline]
pub fn get_embedded_torrents(model_id: &str, revision: &str) -> Vec<(String, Vec<u8>)> {
let dir_path = format!("{model_id}");
let Some(model_dir) = TORRENTS.get_dir(&dir_path) else {
return Vec::new();
};
let mut results = Vec::new();
let prefix = format!("{revision}.");
let suffix = ".torrent";
for file in model_dir.files() {
let Some(name) = file.path().file_name().and_then(|n| n.to_str()) else {
continue;
};
// Match files like "{revision}.small.torrent" or "{revision}.large.torrent"
if name.starts_with(&prefix) && name.ends_with(suffix) {
// Extract variant: "{revision}.{variant}.torrent" -> "{variant}"
let middle = &name[prefix.len()..name.len() - suffix.len()];
// Skip plain "{revision}.torrent" files (wrong format)
if middle.is_empty() {
continue;
}
results.push((middle.to_string(), file.contents().to_vec()));
}
}
// Sort by variant name for consistent ordering
results.sort_by(|a, b| a.0.cmp(&b.0));
results
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_embedded_torrents() {
// Test with the Qwen3 torrent we have
let result = get_embedded_torrents(
"mlx-community/Qwen3-30B-A3B-4bit",
"d388dead1515f5e085ef7a0431dd8fadf0886c57",
);
assert!(!result.is_empty(), "Expected to find embedded torrents");
// Should have both small and large variants
let variants: Vec<&str> = result.iter().map(|(v, _)| v.as_str()).collect();
assert!(
variants.contains(&"small"),
"Expected 'small' variant, got: {variants:?}"
);
assert!(
variants.contains(&"large"),
"Expected 'large' variant, got: {variants:?}"
);
// Verify data is not empty
for (variant, data) in &result {
assert!(!data.is_empty(), "Torrent data for '{variant}' should not be empty");
}
}
#[test]
fn test_missing_torrent() {
let result = get_embedded_torrents("nonexistent/model", "abc123");
assert!(result.is_empty(), "Expected empty Vec for missing torrent");
}
#[test]
fn test_variant_ordering() {
let result = get_embedded_torrents(
"mlx-community/Qwen3-30B-A3B-4bit",
"d388dead1515f5e085ef7a0431dd8fadf0886c57",
);
if result.len() >= 2 {
// Verify alphabetical ordering
let variants: Vec<&str> = result.iter().map(|(v, _)| v.as_str()).collect();
let mut sorted = variants.clone();
sorted.sort();
assert_eq!(variants, sorted, "Variants should be sorted alphabetically");
}
}
}
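The "{revision}.{variant}.torrent" naming convention that get_embedded_torrents parses can be summarized with this Python sketch (editorial, mirroring the Rust prefix/suffix logic above):

def variant_from_name(name: str, revision: str) -> str | None:
    # "{revision}.{variant}.torrent" -> "{variant}"; anything else -> None
    prefix, suffix = f"{revision}.", ".torrent"
    if not (name.startswith(prefix) and name.endswith(suffix)):
        return None
    middle = name[len(prefix):len(name) - len(suffix)]
    return middle or None  # plain "{revision}.torrent" is skipped

assert variant_from_name("abc123.small.torrent", "abc123") == "small"
assert variant_from_name("abc123.torrent", "abc123") is None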


@@ -1,22 +0,0 @@
//! BitTorrent-based download system for model shards using rqbit
//!
//! This crate provides:
//! - Torrent session management via rqbit
//! - Embedded torrent file access
//! - Private tracker announce handling
//! - Selective file download based on shard layer ranges
#![allow(clippy::missing_inline_in_public_items)]
pub mod bencode;
pub mod embedded;
pub mod progress;
pub mod session;
pub mod torrent_files;
pub mod tracker;
pub use bencode::AnnounceParams;
pub use embedded::get_embedded_torrents;
pub use session::{DownloadProgress, TorrentSession};
pub use torrent_files::{get_torrent_file_list, TorrentFileInfo};
pub use tracker::{handle_announce, PeerInfo, TopologyData};


@@ -1,77 +0,0 @@
//! Download progress tracking
//!
//! Types for tracking and reporting download progress to Python
use std::collections::HashMap;
/// Progress update for a torrent download
#[derive(Debug, Clone)]
pub struct DownloadProgress {
/// Total bytes to download
pub total_bytes: u64,
/// Bytes downloaded so far
pub downloaded_bytes: u64,
/// Number of pieces completed
pub pieces_completed: usize,
/// Total number of pieces
pub total_pieces: usize,
/// Number of peers connected
pub peers_connected: usize,
/// Download speed in bytes/second
pub speed_bytes_per_sec: f64,
/// Estimated time remaining in seconds
pub eta_seconds: Option<f64>,
/// Per-file progress
pub files: HashMap<String, FileProgress>,
}
#[derive(Debug, Clone)]
pub struct FileProgress {
/// Total file size
pub total_bytes: u64,
/// Bytes downloaded for this file
pub downloaded_bytes: u64,
/// Whether the file is complete
pub complete: bool,
}
impl DownloadProgress {
#[inline]
pub fn new(total_bytes: u64, total_pieces: usize) -> Self {
Self {
total_bytes,
downloaded_bytes: 0,
pieces_completed: 0,
total_pieces,
peers_connected: 0,
speed_bytes_per_sec: 0.0,
eta_seconds: None,
files: HashMap::new(),
}
}
#[inline]
pub fn progress_fraction(&self) -> f64 {
if self.total_bytes == 0 {
0.0
} else {
#[allow(clippy::cast_precision_loss)]
let fraction = self.downloaded_bytes as f64 / self.total_bytes as f64;
fraction
}
}
#[inline]
pub fn is_complete(&self) -> bool {
self.pieces_completed >= self.total_pieces
}
}


@@ -1,166 +0,0 @@
//! Torrent session management using rqbit
//!
//! Provides a wrapper around rqbit's Session for managing torrent downloads
//! with persistent seeding and selective file downloads.
use anyhow::{Context, Result};
use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ManagedTorrent, Session, SessionOptions, SessionPersistenceConfig};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Download progress information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadProgress {
pub downloaded_bytes: u64,
pub total_bytes: u64,
pub download_speed: f64,
pub upload_speed: f64,
pub peers_connected: usize,
pub is_finished: bool,
}
/// Torrent session handle for managing multiple torrents
pub struct TorrentSession {
session: Arc<Session>,
api: Arc<Api>,
session_dir: PathBuf,
torrents: Arc<RwLock<HashMap<String, Arc<ManagedTorrent>>>>,
}
impl TorrentSession {
/// Create a new torrent session
///
/// # Arguments
/// * `session_dir` - Directory to store session state and downloaded files
pub async fn new(session_dir: PathBuf) -> Result<Self> {
std::fs::create_dir_all(&session_dir).context("Failed to create session directory")?;
let opts = SessionOptions {
disable_dht: false,
disable_dht_persistence: false,
dht_config: None,
persistence: Some(SessionPersistenceConfig::Json { folder: None }),
fastresume: true,
..Default::default()
};
let session = Session::new_with_opts(session_dir.clone(), opts)
.await
.context("Failed to create rqbit session")?;
let api = Api::new(Arc::clone(&session), None);
Ok(Self {
session,
api: Arc::new(api),
session_dir,
torrents: Arc::new(RwLock::new(HashMap::new())),
})
}
/// Add a torrent from raw bytes
///
/// # Arguments
/// * `torrent_data` - Raw .torrent file contents
/// * `save_path` - Where to save the downloaded files
/// * `file_indices` - Optional list of file indices to download (None = all files)
///
/// # Returns
/// Info hash as hex string
pub async fn add_torrent(
&self,
torrent_data: Vec<u8>,
save_path: PathBuf,
file_indices: Option<Vec<usize>>,
) -> Result<String> {
let opts = AddTorrentOptions {
overwrite: false,
only_files_regex: None,
only_files: file_indices,
output_folder: Some(save_path.to_string_lossy().to_string()),
..Default::default()
};
let add_torrent = AddTorrent::from_bytes(torrent_data);
let response = self
.session
.add_torrent(add_torrent, Some(opts))
.await
.context("Failed to add torrent")?;
let handle = match response {
AddTorrentResponse::Added(_, handle) => handle,
AddTorrentResponse::AlreadyManaged(_, handle) => handle,
AddTorrentResponse::ListOnly(_) => anyhow::bail!("Torrent was list-only, not added"),
};
let info_hash = handle.info_hash().as_string();
self.torrents
.write()
.await
.insert(info_hash.clone(), handle);
Ok(info_hash)
}
/// Get download progress for a torrent
pub async fn get_progress(&self, info_hash: &str) -> Result<DownloadProgress> {
let torrents = self.torrents.read().await;
let handle = torrents.get(info_hash).context("Torrent not found")?;
let stats = handle.stats();
Ok(DownloadProgress {
downloaded_bytes: stats.progress_bytes,
total_bytes: stats.total_bytes,
download_speed: stats.live.as_ref().map_or(0.0, |l| l.download_speed.mbps * 1024.0 * 1024.0),
upload_speed: stats.live.as_ref().map_or(0.0, |l| l.upload_speed.mbps * 1024.0 * 1024.0),
peers_connected: stats.live.as_ref().map_or(0, |l| l.snapshot.peer_stats.live as usize),
is_finished: stats.finished,
})
}
/// Wait until torrent download is completed
pub async fn wait_until_completed(&self, info_hash: &str) -> Result<()> {
let torrents = self.torrents.read().await;
let handle = torrents.get(info_hash).context("Torrent not found")?;
handle
.wait_until_completed()
.await
.context("Failed to wait for completion")?;
Ok(())
}
/// Enable seeding for a completed torrent
///
/// Note: rqbit seeds by default after completion, so this is a no-op
/// kept for API compatibility.
pub async fn enable_seeding(&self, _info_hash: &str) -> Result<()> {
// rqbit automatically seeds after download completion
// This is kept for API compatibility
Ok(())
}
/// Remove a torrent from the session
pub async fn remove_torrent(&self, info_hash: &str) -> Result<()> {
let mut torrents = self.torrents.write().await;
if let Some(handle) = torrents.remove(info_hash) {
drop(handle);
}
Ok(())
}
/// Get list of all torrent info hashes in the session
pub async fn list_torrents(&self) -> Vec<String> {
self.torrents.read().await.keys().cloned().collect()
}
}


@@ -1,100 +0,0 @@
//! Torrent file list parsing
//!
//! Provides functionality to extract file information from torrent metadata
//! without adding the torrent to a session.
use anyhow::{Context, Result};
use librqbit::torrent_from_bytes;
use serde::{Deserialize, Serialize};
/// Information about a file in a torrent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TorrentFileInfo {
/// File index (0-based)
pub index: usize,
/// File path relative to torrent root
pub path: String,
/// File size in bytes
pub size: u64,
}
/// Get the list of files in a torrent from its raw bytes
///
/// # Arguments
/// * `torrent_data` - Raw .torrent file contents
///
/// # Returns
/// List of file information (index, path, size)
pub fn get_torrent_file_list(torrent_data: &[u8]) -> Result<Vec<TorrentFileInfo>> {
let torrent_meta = torrent_from_bytes(torrent_data).context("Failed to parse torrent")?;
// Access the data inside WithRawBytes wrapper
let info = &torrent_meta.info.data;
let mut files = Vec::new();
// Handle both single-file and multi-file torrents
if let Some(ref file_list) = info.files {
// Multi-file torrent
for (index, file) in file_list.iter().enumerate() {
let path = file
.path
.iter()
.map(|buf| String::from_utf8_lossy(buf.0).to_string())
.collect::<Vec<_>>()
.join("/");
files.push(TorrentFileInfo {
index,
path,
size: file.length,
});
}
} else {
// Single-file torrent
let name = match &info.name {
Some(n) => String::from_utf8_lossy(n.0).to_string(),
None => String::new(),
};
files.push(TorrentFileInfo {
index: 0,
path: name,
size: info.length.unwrap_or(0),
});
}
Ok(files)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::get_embedded_torrents;
#[test]
fn test_get_torrent_file_list() {
// Use an embedded torrent for testing
let torrents = get_embedded_torrents(
"mlx-community/Qwen3-30B-A3B-4bit",
"d388dead1515f5e085ef7a0431dd8fadf0886c57",
);
assert!(!torrents.is_empty(), "Expected to find embedded torrents");
for (variant, data) in torrents {
let files = get_torrent_file_list(&data).expect("Failed to parse torrent");
assert!(!files.is_empty(), "Expected files in {variant} variant");
// Verify file info makes sense
for file in &files {
assert!(!file.path.is_empty(), "File path should not be empty");
assert!(file.size > 0, "File size should be positive");
}
println!("Variant '{variant}' has {} files", files.len());
for file in files.iter().take(5) {
println!(" [{}] {} ({} bytes)", file.index, file.path, file.size);
}
}
}
}


@@ -1,185 +0,0 @@
//! Fake tracker implementation for Exo topology-based peer discovery
//!
//! Instead of contacting real BitTorrent trackers, this module generates
//! tracker announce responses using Exo's cluster topology data.
use std::net::Ipv4Addr;
use anyhow::Result;
use crate::bencode::{AnnounceParams, BencodeValue};
/// Information about a peer in the Exo topology
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Unique node identifier in the Exo cluster
pub node_id: String,
/// IPv4 address of the peer
pub ip: Ipv4Addr,
/// BitTorrent listening port
pub port: u16,
/// Whether this peer has the complete torrent
pub has_complete: bool,
/// Priority for peer selection (higher = prefer)
pub priority: i32,
}
/// Topology data containing available peers
#[derive(Debug, Clone)]
pub struct TopologyData {
/// List of peers in the topology
pub peers: Vec<PeerInfo>,
}
/// Default announce interval in seconds
const DEFAULT_INTERVAL: i64 = 1800;
/// Handle a tracker announce request using Exo topology data
///
/// Returns a bencoded tracker response containing peers from the topology.
///
/// # Arguments
/// * `params` - Announce request parameters
/// * `topology` - Current Exo cluster topology
///
/// # Returns
/// Bencoded announce response as bytes
pub fn handle_announce(params: &AnnounceParams, topology: &TopologyData) -> Result<Vec<u8>> {
// Sort peers by priority (descending) for better peer selection
let mut peers: Vec<_> = topology.peers.iter().collect();
peers.sort_by(|a, b| b.priority.cmp(&a.priority));
let response = if params.compact {
// Compact format: 6 bytes per peer (4 IP + 2 port)
let mut peer_data = Vec::with_capacity(peers.len() * 6);
for peer in &peers {
peer_data.extend_from_slice(&peer.ip.octets());
peer_data.extend_from_slice(&peer.port.to_be_bytes());
}
BencodeValue::dict()
.insert("interval", BencodeValue::integer(DEFAULT_INTERVAL))
.insert("peers", BencodeValue::Bytes(peer_data))
} else {
// Non-compact format: list of dicts
let mut peer_list = BencodeValue::list();
for peer in &peers {
let peer_dict = BencodeValue::dict()
.insert("ip", BencodeValue::string(&peer.ip.to_string()))
.insert("port", BencodeValue::integer(i64::from(peer.port)))
.insert("peer id", BencodeValue::Bytes(vec![0u8; 20])); // Placeholder peer ID
peer_list = peer_list.push(peer_dict);
}
BencodeValue::dict()
.insert("interval", BencodeValue::integer(DEFAULT_INTERVAL))
.insert("peers", peer_list)
};
Ok(response.encode())
}
#[cfg(test)]
mod tests {
use super::*;
fn make_test_params(compact: bool) -> AnnounceParams {
AnnounceParams {
info_hash: [0u8; 20],
peer_id: [0u8; 20],
port: 6881,
uploaded: 0,
downloaded: 0,
left: 1000,
compact,
event: None,
}
}
fn make_test_topology() -> TopologyData {
TopologyData {
peers: vec![
PeerInfo {
node_id: "node1".to_string(),
ip: Ipv4Addr::new(192, 168, 1, 1),
port: 6881,
has_complete: true,
priority: 10,
},
PeerInfo {
node_id: "node2".to_string(),
ip: Ipv4Addr::new(192, 168, 1, 2),
port: 6882,
has_complete: false,
priority: 5,
},
],
}
}
#[test]
fn test_compact_response() {
let params = make_test_params(true);
let topology = make_test_topology();
let response = handle_announce(&params, &topology).unwrap();
// Should contain "interval" and "peers" keys
assert!(response.starts_with(b"d"));
assert!(response.ends_with(b"e"));
// Verify we have 12 bytes of peer data (2 peers * 6 bytes)
// The compact peers field should be "12:<12 bytes>"
let response_str = String::from_utf8_lossy(&response);
assert!(response_str.contains("8:interval"));
assert!(response_str.contains("5:peers"));
}
#[test]
fn test_non_compact_response() {
let params = make_test_params(false);
let topology = make_test_topology();
let response = handle_announce(&params, &topology).unwrap();
// Should contain peers as a list
let response_str = String::from_utf8_lossy(&response);
assert!(response_str.contains("8:interval"));
assert!(response_str.contains("5:peers"));
assert!(response_str.contains("2:ip"));
assert!(response_str.contains("4:port"));
}
#[test]
fn test_peer_priority_ordering() {
let params = make_test_params(true);
let topology = make_test_topology();
let response = handle_announce(&params, &topology).unwrap();
// In compact format, first peer should be node1 (priority 10)
// which is 192.168.1.1:6881
// Look for the peer data after "5:peers12:"
let peers_marker = b"5:peers12:";
let pos = response
.windows(peers_marker.len())
.position(|w| w == peers_marker)
.unwrap();
let peer_data = &response[pos + peers_marker.len()..pos + peers_marker.len() + 6];
// First peer should be 192.168.1.1 (node1 with higher priority)
assert_eq!(&peer_data[0..4], &[192, 168, 1, 1]);
}
#[test]
fn test_empty_topology() {
let params = make_test_params(true);
let topology = TopologyData { peers: vec![] };
let response = handle_announce(&params, &topology).unwrap();
// Should still be valid bencoded response with empty peers
assert!(response.starts_with(b"d"));
assert!(response.ends_with(b"e"));
}
}
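A client-side view of the compact format, as a Python sketch (editorial; the 6-byte layout of a 4-byte IPv4 address plus a 2-byte big-endian port is the standard BitTorrent compact peer encoding):

import struct

def parse_compact_peers(blob: bytes) -> list[tuple[str, int]]:
    # Each peer occupies 6 bytes: 4 for the IPv4 address, 2 for the port.
    peers = []
    for off in range(0, len(blob) - len(blob) % 6, 6):
        ip = ".".join(str(b) for b in blob[off:off + 4])
        (port,) = struct.unpack("!H", blob[off + 4:off + 6])
        peers.append((ip, port))
    return peers

assert parse_compact_peers(bytes([192, 168, 1, 1, 0x1A, 0xE1])) == [("192.168.1.1", 6881)]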

(Four file diffs suppressed because one or more lines are too long.)

(Ten deleted .torrent files elided here as binary bencode dumps, plus one further diff suppressed because its lines are too long. Each torrent announced to udp://tracker.opentrackr.org:1337 and carried huggingface.co web-seed URLs for mlx-community models, including Kimi-K2-Instruct-4bit, Kimi-K2-Thinking, Llama-3.3-70B-Instruct in 4bit/8bit/fp16, SmolLM-135M-4bit, gpt-oss-120b-MXFP4-Q8, and gpt-oss-20b-MXFP4-Q4.)


@@ -23,7 +23,6 @@ workspace = true
[dependencies]
networking = { workspace = true }
downloads = { workspace = true }
# interop
pyo3 = { version = "0.27.1", features = [


@@ -1,334 +0,0 @@
//! Downloads module - BitTorrent downloads PyO3 bindings
use crate::ext::*;
use downloads::bencode::AnnounceParams;
use downloads::tracker::{PeerInfo, TopologyData, handle_announce as rust_handle_announce};
use downloads::{DownloadProgress, TorrentSession};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict};
use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Handle a tracker announce request
///
/// Args:
/// params: Dictionary with announce parameters (info_hash, peer_id, port, etc.)
/// peers: List of peer dictionaries (node_id, ip, port, has_complete, priority)
///
/// Returns:
/// Bencoded announce response as bytes
#[pyfunction]
fn handle_tracker_announce(
py: Python<'_>,
params: &Bound<'_, PyDict>,
peers: &Bound<'_, pyo3::types::PyList>,
) -> PyResult<Py<PyBytes>> {
// Parse announce params
let info_hash = {
let info_hash_item = params
.get_item("info_hash")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing info_hash"))?;
let info_hash_bytes: &[u8] = info_hash_item.extract()?;
if info_hash_bytes.len() != 20 {
return Err(pyo3::exceptions::PyValueError::new_err(
"info_hash must be 20 bytes",
));
}
let mut info_hash = [0u8; 20];
info_hash.copy_from_slice(info_hash_bytes);
info_hash
};
let peer_id = {
let peer_id_item = params
.get_item("peer_id")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing peer_id"))?;
let peer_id_bytes: &[u8] = peer_id_item.extract()?;
if peer_id_bytes.len() != 20 {
return Err(pyo3::exceptions::PyValueError::new_err(
"peer_id must be 20 bytes",
));
}
let mut peer_id = [0u8; 20];
peer_id.copy_from_slice(peer_id_bytes);
peer_id
};
let port: u16 = params
.get_item("port")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing port"))?
.extract()?;
let uploaded: u64 = params
.get_item("uploaded")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing uploaded"))?
.extract()?;
let downloaded: u64 = params
.get_item("downloaded")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing downloaded"))?
.extract()?;
let left: u64 = params
.get_item("left")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing left"))?
.extract()?;
let compact: bool = params
.get_item("compact")?
.map(|v| v.extract().unwrap_or(true))
.unwrap_or(true);
let announce_params = AnnounceParams {
info_hash,
peer_id,
port,
uploaded,
downloaded,
left,
compact,
event: None, // TODO: parse event if needed
};
// Parse peer list
let peer_infos: Result<Vec<PeerInfo>, PyErr> = peers
.iter()
.map(|peer_item| {
let peer_dict: &Bound<'_, PyDict> = peer_item.downcast()?;
let node_id: String = peer_dict
.get_item("node_id")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing node_id"))?
.extract()?;
let ip_str: String = peer_dict
.get_item("ip")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing ip"))?
.extract()?;
let ip: Ipv4Addr = ip_str
.parse()
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Invalid IP address"))?;
let port: u16 = peer_dict
.get_item("port")?
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Missing port"))?
.extract()?;
let has_complete: bool = peer_dict
.get_item("has_complete")?
.map(|v: Bound<'_, pyo3::PyAny>| v.extract().unwrap_or(false))
.unwrap_or(false);
let priority: i32 = peer_dict
.get_item("priority")?
.map(|v: Bound<'_, pyo3::PyAny>| v.extract().unwrap_or(0))
.unwrap_or(0);
Ok(PeerInfo {
node_id,
ip,
port,
has_complete,
priority,
})
})
.collect();
let peer_infos = peer_infos?;
let topology = TopologyData { peers: peer_infos };
// Call Rust tracker handler
let response_bytes = rust_handle_announce(&announce_params, &topology).pyerr()?;
// Return as Python bytes
Ok(PyBytes::new(py, &response_bytes).unbind())
}
/// Get all embedded torrent variants for a model
///
/// Args:
/// model_id: Model identifier (e.g., "mlx-community/Qwen3-30B-A3B-4bit")
/// revision: Git commit hash
///
/// Returns:
/// List of (variant_name, torrent_data) tuples, e.g., [("small", bytes), ("large", bytes)]
/// Returns empty list if no torrents found.
#[pyfunction]
fn get_embedded_torrents(
py: Python<'_>,
model_id: String,
revision: String,
) -> PyResult<Vec<(String, Py<PyBytes>)>> {
let torrents = downloads::get_embedded_torrents(&model_id, &revision);
Ok(torrents
.into_iter()
.map(|(variant, data)| (variant, PyBytes::new(py, &data).unbind()))
.collect())
}
/// Get file list from torrent data
///
/// Args:
/// torrent_data: Raw .torrent file contents
///
/// Returns:
/// List of (index, path, size_bytes) tuples for each file in the torrent
#[pyfunction]
fn get_torrent_file_list(torrent_data: Vec<u8>) -> PyResult<Vec<(usize, String, u64)>> {
let files = downloads::get_torrent_file_list(&torrent_data).pyerr()?;
Ok(files
.into_iter()
.map(|f| (f.index, f.path, f.size))
.collect())
}
/// Python wrapper for TorrentSession
#[pyclass]
struct TorrentSessionHandle {
session: Arc<Mutex<TorrentSession>>,
}
#[pymethods]
impl TorrentSessionHandle {
/// Create a new torrent session
///
/// Args:
/// session_dir: Directory to store session state and downloads
#[new]
fn new(session_dir: String) -> PyResult<Self> {
let session_path = PathBuf::from(session_dir);
let session = tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { TorrentSession::new(session_path).await })
.pyerr()?;
Ok(Self {
session: Arc::new(Mutex::new(session)),
})
}
/// Add a torrent from bytes
///
/// Args:
/// torrent_data: Raw .torrent file contents
/// save_path: Where to save downloaded files
/// file_indices: Optional list of file indices to download
///
/// Returns:
/// Info hash as hex string
fn add_torrent(
&self,
_py: Python<'_>,
torrent_data: Vec<u8>,
save_path: String,
file_indices: Option<Vec<usize>>,
) -> PyResult<String> {
let session = Arc::clone(&self.session);
let save_path = PathBuf::from(save_path);
tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async {
session
.lock()
.await
.add_torrent(torrent_data, save_path, file_indices)
.await
})
.pyerr()
}
/// Get download progress for a torrent
///
/// Args:
/// info_hash: Torrent info hash
///
/// Returns:
/// Dictionary with progress information
fn get_progress(&self, py: Python<'_>, info_hash: String) -> PyResult<Py<PyDict>> {
let session = Arc::clone(&self.session);
let progress: DownloadProgress = tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { session.lock().await.get_progress(&info_hash).await })
.pyerr()?;
let dict = PyDict::new(py);
dict.set_item("downloaded_bytes", progress.downloaded_bytes)?;
dict.set_item("total_bytes", progress.total_bytes)?;
dict.set_item("download_speed", progress.download_speed)?;
dict.set_item("upload_speed", progress.upload_speed)?;
dict.set_item("peers_connected", progress.peers_connected)?;
dict.set_item("is_finished", progress.is_finished)?;
Ok(dict.unbind())
}
/// Wait until torrent download is completed
///
/// Args:
/// info_hash: Torrent info hash
fn wait_until_completed(&self, _py: Python<'_>, info_hash: String) -> PyResult<()> {
let session = Arc::clone(&self.session);
tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { session.lock().await.wait_until_completed(&info_hash).await })
.pyerr()
}
/// Enable seeding for a torrent
///
/// Args:
/// info_hash: Torrent info hash
fn enable_seeding(&self, _py: Python<'_>, info_hash: String) -> PyResult<()> {
let session = Arc::clone(&self.session);
tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { session.lock().await.enable_seeding(&info_hash).await })
.pyerr()
}
/// Remove a torrent from the session
///
/// Args:
/// info_hash: Torrent info hash
fn remove_torrent(&self, _py: Python<'_>, info_hash: String) -> PyResult<()> {
let session = Arc::clone(&self.session);
tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { session.lock().await.remove_torrent(&info_hash).await })
.pyerr()
}
/// List all torrents in the session
///
/// Returns:
/// List of info hashes
fn list_torrents(&self, _py: Python<'_>) -> PyResult<Vec<String>> {
let session = Arc::clone(&self.session);
tokio::runtime::Runtime::new()
.pyerr()?
.block_on(async { Ok(session.lock().await.list_torrents().await) })
}
}
/// Downloads submodule
pub(crate) fn downloads_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(handle_tracker_announce, m)?)?;
m.add_function(wrap_pyfunction!(get_embedded_torrents, m)?)?;
m.add_function(wrap_pyfunction!(get_torrent_file_list, m)?)?;
m.add_class::<TorrentSessionHandle>()?;
Ok(())
}
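Taken together, these bindings were driven from Python roughly as in the sketch below (the import path is inferred from the exo_pyo3_bindings crate name, and the whole flow is illustrative only, since this module is removed in this diff):

from exo_pyo3_bindings import TorrentSessionHandle, get_embedded_torrents

session = TorrentSessionHandle("/tmp/exo-torrent-session")
variants = get_embedded_torrents(
    "mlx-community/Qwen3-30B-A3B-4bit",
    "d388dead1515f5e085ef7a0431dd8fadf0886c57",
)
if variants:
    _name, torrent_data = variants[0]
    # Download only file 0; pass None to fetch every file in the torrent.
    info_hash = session.add_torrent(torrent_data, "/tmp/models", [0])
    session.wait_until_completed(info_hash)
    print(session.get_progress(info_hash))  # dict of progress fields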


@@ -17,12 +17,10 @@
extern crate core;
mod allow_threading;
pub(crate) mod downloads;
mod examples;
pub(crate) mod networking;
pub(crate) mod pylibp2p;
use crate::downloads::downloads_submodule;
use crate::networking::networking_submodule;
use crate::pylibp2p::ident::ident_submodule;
use crate::pylibp2p::multiaddr::multiaddr_submodule;
@@ -209,7 +207,6 @@ fn main_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
ident_submodule(m)?;
multiaddr_submodule(m)?;
networking_submodule(m)?;
downloads_submodule(m)?;
// top-level constructs
// TODO: ...

rust/parts.nix Normal file

@@ -0,0 +1,145 @@
{ inputs, ... }:
{
perSystem =
{ config, self', inputs', pkgs, lib, ... }:
let
# Fenix nightly toolchain with all components
fenixPkgs = inputs'.fenix.packages;
rustToolchain = fenixPkgs.complete.withComponents [
"cargo"
"rustc"
"clippy"
"rustfmt"
"rust-src"
"rust-analyzer"
];
# Crane with fenix toolchain
craneLib = (inputs.crane.mkLib pkgs).overrideToolchain rustToolchain;
# Source filtering - only include rust/ directory and root Cargo files
# This ensures changes to Python/docs/etc don't trigger Rust rebuilds
src = lib.cleanSourceWith {
src = inputs.self;
filter =
path: type:
let
baseName = builtins.baseNameOf path;
parentDir = builtins.dirOf path;
inRustDir =
(lib.hasInfix "/rust/" path)
|| (lib.hasSuffix "/rust" parentDir)
|| (baseName == "rust" && type == "directory");
isRootCargoFile =
(baseName == "Cargo.toml" || baseName == "Cargo.lock")
&& (builtins.dirOf path == toString inputs.self);
in
isRootCargoFile
|| (inRustDir && (craneLib.filterCargoSources path type || lib.hasSuffix ".toml" path || lib.hasSuffix ".md" path));
};
# Common arguments for all Rust builds
commonArgs = {
inherit src;
pname = "exo-rust";
version = "0.0.1";
strictDeps = true;
nativeBuildInputs = [
pkgs.pkg-config
pkgs.python313 # Required for pyo3-build-config
];
buildInputs = [
pkgs.openssl
pkgs.python313 # Required for pyo3 tests
];
OPENSSL_NO_VENDOR = "1";
# Required for pyo3 tests to find libpython
LD_LIBRARY_PATH = lib.makeLibraryPath [ pkgs.python313 ];
};
# Build dependencies once for caching
cargoArtifacts = craneLib.buildDepsOnly (
commonArgs
// {
cargoExtraArgs = "--workspace";
}
);
in
{
# Export toolchain for use in treefmt and devShell
options.rust = {
toolchain = lib.mkOption {
type = lib.types.package;
default = rustToolchain;
description = "The Rust toolchain to use";
};
};
config = {
packages = {
# Python bindings wheel via maturin
exo_pyo3_bindings = craneLib.buildPackage (
commonArgs
// {
inherit cargoArtifacts;
pname = "exo_pyo3_bindings";
nativeBuildInputs = commonArgs.nativeBuildInputs ++ [
pkgs.maturin
];
buildPhaseCargoCommand = ''
maturin build \
--release \
--manylinux off \
--manifest-path rust/exo_pyo3_bindings/Cargo.toml \
--features "pyo3/extension-module,pyo3/experimental-async" \
--interpreter ${pkgs.python313}/bin/python \
--out dist
'';
# Don't use crane's default install behavior
doNotPostBuildInstallCargoBinaries = true;
installPhaseCommand = ''
mkdir -p $out
cp dist/*.whl $out/
'';
}
);
};
checks = {
# Full workspace build (all crates)
cargo-build = craneLib.buildPackage (
commonArgs
// {
inherit cargoArtifacts;
cargoExtraArgs = "--workspace";
}
);
# Run tests with nextest
cargo-nextest = craneLib.cargoNextest (
commonArgs
// {
inherit cargoArtifacts;
cargoExtraArgs = "--workspace";
}
);
# Build documentation
cargo-doc = craneLib.cargoDoc (
commonArgs
// {
inherit cargoArtifacts;
cargoExtraArgs = "--workspace";
}
);
};
};
};
}


@@ -1,47 +0,0 @@
[package]
name = "system_custodian"
version = { workspace = true }
edition = { workspace = true }
publish = false
[lib]
doctest = false
name = "system_custodian"
path = "src/lib.rs"
[[bin]]
path = "src/bin/main.rs"
name = "system_custodian"
doc = false
[lints]
workspace = true
[dependencies]
# datastructures
either = { workspace = true }
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
impl-trait-for-tuples = { workspace = true }
derive_more = { workspace = true }
# async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
futures-timer = { workspace = true }
# utility dependencies
util = { workspace = true }
thiserror = { workspace = true }
#internment = { workspace = true }
#recursion = { workspace = true }
#generativity = { workspace = true }
#itertools = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["default", "env-filter"] }
keccak-const = { workspace = true }
# tracing/logging
log = { workspace = true }


@@ -1,4 +0,0 @@
//! TODO: documentation
//!
fn main() {}


@@ -1,69 +0,0 @@
//! This crate defines the logic of, and ways to interact with, Exo's **_System Custodian_** daemon.
//!
//! The **_System Custodian_** daemon is meant to be a long-lived process that precedes the
//! launch of the Exo application and is responsible for ensuring the system (configuration,
//! settings, etc.) is in an appropriate state for running the Exo application.
//! The **_System Custodian_** daemon shall expose a [D-Bus](https://www.freedesktop.org/wiki/Software/dbus/)
//! service which the Exo application uses to _control & query_ it.
//!
//! # Lifecycle
//! When the Exo application starts, it will _wake_ the **_System Custodian_** daemon for the
//! duration of its lifetime, and after it has terminated the daemon will go back to sleep. When
//! the daemon wakes up, it will configure the system into a state suitable for the Exo Application;
//! When the daemon goes to sleep, it will revert those changes as much as it can in case they were
//! destructive to the user's pre-existing configurations.
//!
//! # Responsibilities
//! TODO: these are macOS-only for now; broaden to other platforms later
//! The **_System Custodian_** daemon is responsible for using System Configuration framework to
//! 1. duplicate the current network set
//! 2. modify existing services to turn on IPv6 if not there
//! 3. remove any bridge services & add any missing services that AREN'T bridge
//! TODO: In the future:
//! 1. run a dummy AWDL service to [allow for macOS peer-to-peer wireless networking](https://yggdrasil-network.github.io/2019/08/19/awdl.html)
//! 2. toggle some GPU/memory configurations to speed up GPU (ask Alex what those configurations are)
//! 3. if we ever decide to provide our **own network interfaces** that abstract over some userland
//! logic, this would be the place to spin that up.
//!
//! Then it will watch the SCDynamicStore for:
//! 1. all __actual__ network interfaces -> collect information on them e.g. their BSD name, MAC
//! address, MTU, IPv6 addresses, etc. -> and set up watchers/notifiers to inform the DBus
//! interface of any changes
//! 2. watch for any __undesirable__ changes to configuration and revert it
//!
//! It should somehow (probably through system sockets and/or BSD interface) trigger IPv6 NDP on
//! each of the interfaces & also listen to/query for any changes on the OS routing cache??
//! Basically emulate the `ping6 ff02::1%enX` and `ndp -an` commands BUT BETTER!!!
//! 1. all that info should coalesce back to the overall state collected -> should be queryable
//! over D-Bus
//! TODO:
//! 1. we might potentially add to this step a handshake of some kind...? To ensure that we can
//! ACTUALLY communicate with that machine over that link over e.g. TCP, UDP, etc. Will the
//! handshake require to know Node ID? Will the handshake require heartbeats? Who knows...
//! 2. if we ever decide to write proprietary L2/L3 protocols for quicker communication,
//! e.g. [AF_NDRV](https://www.zerotier.com/blog/how-zerotier-eliminated-kernel-extensions-on-macos/)
//! for raw ethernet frame communication, or even a [custom thunderbolt PCIe driver](https://developer.apple.com/documentation/pcidriverkit/creating-custom-pcie-drivers-for-thunderbolt-devices),
//! then this would be the place to carry out discovery and proper handshakes with devices
//! on the other end of the link.
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
#![feature(stmt_expr_attributes)]
#![feature(type_alias_impl_trait)]
#![feature(specialization)]
#![feature(unboxed_closures)]
#![feature(const_trait_impl)]
#![feature(fn_traits)]
pub(crate) mod private {
// sealed traits support
pub trait Sealed {}
impl<T: ?Sized> Sealed for T {}
}
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {}
/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {}


@@ -1,58 +0,0 @@
#!/usr/bin/env nix-shell
#!nix-shell -i bash -p mktorrent -p python3Packages.huggingface-hub -p git -p git-lfs
set -euo pipefail
set -x
MODEL="$1"
mkdir -p "$MODEL"
# Step 1: Clone/fetch the repo and get the hash of head
if test -d "$MODEL/git"; then
# Refresh the existing clone (assumes its origin is correct)
git -C "$MODEL/git" fetch
else
git clone "https://huggingface.co/$MODEL" "$MODEL/git"
fi
HASH=$(git -C "$MODEL/git" rev-parse origin/main)
LARGE_FILES=$(git -C "$MODEL/git" lfs ls-files --all --name-only)
SMALL_DIR="$MODEL/$HASH-small"
LARGE_DIR="$MODEL/$HASH-large"
mkdir -p "$SMALL_DIR" "$LARGE_DIR"
# Step 2: Prepare files. Two torrents: one for large files and one for metadata.
git -C "$MODEL/git" archive "$HASH" | tar -x -C "$SMALL_DIR"
echo "$LARGE_FILES" | xargs -I{} rm "$SMALL_DIR/{}"
echo "$LARGE_FILES" | xargs hf download "$MODEL" --revision "$HASH" --local-dir "$LARGE_DIR" --cache-dir "$(realpath .cache)" --include
if test -d "$LARGE_DIR/.cache"; then
echo ".cache created against our wishes, deleting it..."
rm -r "$LARGE_DIR/.cache"
fi
# Step 3: Create both torrents
mkdir -p "torrents/$MODEL/"
SMALL_TORRENT_PATH="torrents/$MODEL/${HASH}.small.torrent"
LARGE_TORRENT_PATH="torrents/$MODEL/${HASH}.large.torrent"
mktorrent "$SMALL_DIR/" --output="$SMALL_TORRENT_PATH" \
-n "$HASH" \
--web-seed="https://huggingface.co/$MODEL/raw/" \
--no-date \
--announce="udp://tracker.opentrackr.org:1337/announce"
# --private
mktorrent "$LARGE_DIR/" --output="$LARGE_TORRENT_PATH" \
-n "$HASH" \
--web-seed="https://huggingface.co/$MODEL/resolve/" \
--piece-length=24 \
--no-date \
--announce="udp://tracker.opentrackr.org:1337/announce"
# --private
echo "Successfully created torrent files in:"
echo "$SMALL_TORRENT_PATH"
echo "$LARGE_TORRENT_PATH"


@@ -1,6 +1,7 @@
import argparse
import multiprocessing as mp
import os
import resource
import signal
from dataclasses import dataclass, field
from typing import Self
@@ -35,7 +36,6 @@ class Node:
api: API | None
node_id: NodeId
enable_torrents: bool
_tg: TaskGroup = field(init=False, default_factory=anyio.create_task_group)
@classmethod
@@ -67,8 +67,7 @@ class Node:
worker = Worker(
node_id,
session_id,
exo_shard_downloader(enable_torrents=args.enable_torrents),
initial_connection_messages=[],
exo_shard_downloader(),
connection_message_receiver=router.receiver(topics.CONNECTION_MESSAGES),
global_event_receiver=router.receiver(topics.GLOBAL_EVENTS),
local_event_sender=router.sender(topics.LOCAL_EVENTS),
@@ -76,6 +75,7 @@ class Node:
)
else:
worker = None
# We start every node with a master
master = Master(
node_id,
@@ -99,7 +99,7 @@ class Node:
election_result_sender=er_send,
)
return cls(router, worker, election, er_recv, master, api, node_id, args.enable_torrents)
return cls(router, worker, election, er_recv, master, api, node_id)
async def run(self):
async with self._tg as tg:
@@ -176,7 +176,7 @@ class Node:
self.worker = Worker(
self.node_id,
result.session_id,
exo_shard_downloader(enable_torrents=self.enable_torrents),
exo_shard_downloader(),
connection_message_receiver=self.router.receiver(
topics.CONNECTION_MESSAGES
),
@@ -196,6 +196,8 @@ class Node:
def main():
args = Args.parse()
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, 65535), hard))
mp.set_start_method("spawn")
# TODO: Refactor the current verbosity system
@@ -203,6 +205,14 @@ def main():
logger.info("Starting EXO")
logger.info(f"EXO_LIBP2P_NAMESPACE: {os.getenv('EXO_LIBP2P_NAMESPACE')}")
# Set FAST_SYNCH override env var for runner subprocesses
if args.fast_synch is True:
os.environ["EXO_FAST_SYNCH"] = "on"
logger.info("FAST_SYNCH forced ON")
elif args.fast_synch is False:
os.environ["EXO_FAST_SYNCH"] = "off"
logger.info("FAST_SYNCH forced OFF")
node = anyio.run(Node.create, args)
anyio.run(node.run)
logger.info("EXO Shutdown complete")
@@ -216,7 +226,7 @@ class Args(CamelCaseModel):
api_port: PositiveInt = 52415
tb_only: bool = False
no_worker: bool = False
enable_torrents: bool = False
fast_synch: bool | None = None # None = auto, True = force on, False = force off
@classmethod
def parse(cls) -> Self:
@@ -258,11 +268,19 @@ class Args(CamelCaseModel):
"--no-worker",
action="store_true",
)
parser.add_argument(
"--enable-torrents",
fast_synch_group = parser.add_mutually_exclusive_group()
fast_synch_group.add_argument(
"--fast-synch",
action="store_true",
dest="enable_torrents",
help="Enable BitTorrent-based downloads (experimental)",
dest="fast_synch",
default=None,
help="Force MLX FAST_SYNCH on (for JACCL backend)",
)
fast_synch_group.add_argument(
"--no-fast-synch",
action="store_false",
dest="fast_synch",
help="Force MLX FAST_SYNCH off",
)
args = parser.parse_args()
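A stand-alone sketch of the tri-state flag behavior (the flag names mirror the diff above; the `EXO_FAST_SYNCH` mapping follows `main()`):

```python
# Tri-state --fast-synch flag: None = auto, True = force on, False = force off.
import argparse
import os

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("--fast-synch", action="store_true", dest="fast_synch", default=None)
group.add_argument("--no-fast-synch", action="store_false", dest="fast_synch")

for argv in ([], ["--fast-synch"], ["--no-fast-synch"]):
    args = parser.parse_args(argv)
    print(argv, "->", args.fast_synch)  # [] -> None, on -> True, off -> False

# main() then forwards the override to runner subprocesses via an env var,
# leaving EXO_FAST_SYNCH unset in the auto (None) case:
args = parser.parse_args(["--no-fast-synch"])
if args.fast_synch is True:
    os.environ["EXO_FAST_SYNCH"] = "on"
elif args.fast_synch is False:
    os.environ["EXO_FAST_SYNCH"] = "off"
```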


@@ -1,24 +1,19 @@
import time
from collections.abc import AsyncGenerator
from http import HTTPStatus
from typing import cast
import anyio
from anyio import create_task_group
from anyio import BrokenResourceError, create_task_group
from anyio.abc import TaskGroup
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, StreamingResponse
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from hypercorn.asyncio import serve # pyright: ignore[reportUnknownVariableType]
from hypercorn.config import Config
from hypercorn.typing import ASGIFramework
from loguru import logger
from openai_harmony import ( # pyright: ignore[reportMissingTypeStubs]
HarmonyEncodingName,
Role,
StreamableParser,
load_harmony_encoding,
)
from exo.master.placement import place_instance as get_instance_placements
from exo.shared.apply import apply
@@ -35,6 +30,8 @@ from exo.shared.types.api import (
CreateInstanceParams,
CreateInstanceResponse,
DeleteInstanceResponse,
ErrorInfo,
ErrorResponse,
FinishReason,
GenerationStats,
ModelList,
@@ -55,7 +52,12 @@ from exo.shared.types.commands import (
TaskFinished,
)
from exo.shared.types.common import CommandId, NodeId, SessionId
from exo.shared.types.events import ChunkGenerated, Event, ForwarderEvent, IndexedEvent
from exo.shared.types.events import (
ChunkGenerated,
Event,
ForwarderEvent,
IndexedEvent,
)
from exo.shared.types.memory import Memory
from exo.shared.types.models import ModelId, ModelMetadata
from exo.shared.types.state import State
@@ -67,8 +69,6 @@ from exo.utils.channels import Receiver, Sender, channel
from exo.utils.dashboard_path import find_dashboard
from exo.utils.event_buffer import OrderedBuffer
encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
def chunk_to_response(
chunk: TokenChunk, command_id: CommandId
@@ -123,6 +123,7 @@ class API:
self.paused_ev: anyio.Event = anyio.Event()
self.app = FastAPI()
self._setup_exception_handlers()
self._setup_cors()
self._setup_routes()
@@ -153,6 +154,20 @@ class API:
self.paused_ev.set()
self.paused_ev = anyio.Event()
def _setup_exception_handlers(self) -> None:
@self.app.exception_handler(HTTPException)
async def http_exception_handler( # pyright: ignore[reportUnusedFunction]
_: Request, exc: HTTPException
) -> JSONResponse:
err = ErrorResponse(
error=ErrorInfo(
message=exc.detail,
type=HTTPStatus(exc.status_code).phrase,
code=exc.status_code,
)
)
return JSONResponse(err.model_dump(), status_code=exc.status_code)
def _setup_cors(self) -> None:
self.app.add_middleware(
CORSMiddleware,
@@ -178,7 +193,6 @@ class API:
self.app.post("/bench/chat/completions")(self.bench_chat_completions)
self.app.get("/state")(lambda: self.state)
self.app.get("/events")(lambda: self._event_log)
self.app.get("/_internal/announce")(self.tracker_announce)
async def place_instance(self, payload: PlaceInstanceParams):
command = PlaceInstance(
@@ -382,35 +396,8 @@ class API:
instance_id=instance_id,
)
async def _process_gpt_oss(self, token_chunks: Receiver[TokenChunk]):
stream = StreamableParser(encoding, role=Role.ASSISTANT)
thinking = False
async for chunk in token_chunks:
stream.process(chunk.token_id)
delta = stream.last_content_delta
ch = stream.current_channel
if ch == "analysis" and not thinking:
thinking = True
yield chunk.model_copy(update={"text": "<think>"})
if ch != "analysis" and thinking:
thinking = False
yield chunk.model_copy(update={"text": "</think>"})
if delta:
yield chunk.model_copy(update={"text": delta})
if chunk.finish_reason is not None:
if thinking:
yield chunk.model_copy(update={"text": "</think>"})
yield chunk
break
async def _chat_chunk_stream(
self, command_id: CommandId, parse_gpt_oss: bool
self, command_id: CommandId
) -> AsyncGenerator[TokenChunk, None]:
"""Yield `TokenChunk`s for a given command until completion."""
@@ -418,16 +405,10 @@ class API:
self._chat_completion_queues[command_id], recv = channel[TokenChunk]()
with recv as token_chunks:
if parse_gpt_oss:
async for chunk in self._process_gpt_oss(token_chunks):
yield chunk
if chunk.finish_reason is not None:
break
else:
async for chunk in token_chunks:
yield chunk
if chunk.finish_reason is not None:
break
async for chunk in token_chunks:
yield chunk
if chunk.finish_reason is not None:
break
except anyio.get_cancelled_exc_class():
# TODO: TaskCancelled
@@ -443,11 +424,23 @@ class API:
del self._chat_completion_queues[command_id]
async def _generate_chat_stream(
self, command_id: CommandId, parse_gpt_oss: bool
self, command_id: CommandId
) -> AsyncGenerator[str, None]:
"""Generate chat completion stream as JSON strings."""
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
async for chunk in self._chat_chunk_stream(command_id):
if chunk.finish_reason == "error":
error_response = ErrorResponse(
error=ErrorInfo(
message=chunk.error_message or "Internal server error",
type="InternalServerError",
code=500,
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return
chunk_response: ChatCompletionResponse = chunk_to_response(
chunk, command_id
)
@@ -459,7 +452,7 @@ class API:
yield "data: [DONE]\n\n"
async def _collect_chat_completion(
self, command_id: CommandId, parse_gpt_oss: bool
self, command_id: CommandId
) -> ChatCompletionResponse:
"""Collect all token chunks for a chat completion and return a single response."""
@@ -467,7 +460,13 @@ class API:
model: str | None = None
finish_reason: FinishReason | None = None
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
async for chunk in self._chat_chunk_stream(command_id):
if chunk.finish_reason == "error":
raise HTTPException(
status_code=500,
detail=chunk.error_message or "Internal server error",
)
if model is None:
model = chunk.model
@@ -496,7 +495,7 @@ class API:
)
async def _collect_chat_completion_with_stats(
self, command_id: CommandId, parse_gpt_oss: bool
self, command_id: CommandId
) -> BenchChatCompletionResponse:
text_parts: list[str] = []
model: str | None = None
@@ -504,7 +503,13 @@ class API:
stats: GenerationStats | None = None
async for chunk in self._chat_chunk_stream(command_id, parse_gpt_oss):
async for chunk in self._chat_chunk_stream(command_id):
if chunk.finish_reason == "error":
raise HTTPException(
status_code=500,
detail=chunk.error_message or "Internal server error",
)
if model is None:
model = chunk.model
@@ -545,8 +550,6 @@ class API:
"""Handle chat completions, supporting both streaming and non-streaming responses."""
model_meta = await resolve_model_meta(payload.model)
payload.model = model_meta.model_id
parse_gpt_oss = "gpt-oss" in model_meta.model_id.lower()
logger.info(f"{parse_gpt_oss=}")
if not any(
instance.shard_assignments.model_id == payload.model
@@ -563,17 +566,16 @@ class API:
await self._send(command)
if payload.stream:
return StreamingResponse(
self._generate_chat_stream(command.command_id, parse_gpt_oss),
self._generate_chat_stream(command.command_id),
media_type="text/event-stream",
)
return await self._collect_chat_completion(command.command_id, parse_gpt_oss)
return await self._collect_chat_completion(command.command_id)
async def bench_chat_completions(
self, payload: BenchChatCompletionTaskParams
) -> BenchChatCompletionResponse:
model_meta = await resolve_model_meta(payload.model)
parse_gpt_oss = "gpt-oss" in model_meta.model_id.lower()
payload.model = model_meta.model_id
if not any(
@@ -590,10 +592,7 @@ class API:
command = ChatCompletion(request_params=payload)
await self._send(command)
response = await self._collect_chat_completion_with_stats(
command.command_id,
parse_gpt_oss,
)
response = await self._collect_chat_completion_with_stats(command.command_id)
return response
def _calculate_total_available_memory(self) -> Memory:
@@ -623,65 +622,6 @@ class API:
]
)
async def tracker_announce(self, request: Request) -> Response:
"""BitTorrent tracker announce endpoint for private tracker."""
try:
from exo_pyo3_bindings import handle_tracker_announce # type: ignore
except ImportError as e:
raise HTTPException(
status_code=501,
detail="Torrent support not available (exo_pyo3_bindings not installed)",
) from e
# Parse announce parameters from query string
query_params = dict(request.query_params)
# Extract required parameters
try:
info_hash_hex = query_params.get("info_hash", "")
peer_id_hex = query_params.get("peer_id", "")
# URL decode and convert to bytes
info_hash = bytes.fromhex(info_hash_hex) if info_hash_hex else b""
peer_id = bytes.fromhex(peer_id_hex) if peer_id_hex else b""
if len(info_hash) != 20 or len(peer_id) != 20:
raise ValueError("info_hash and peer_id must be 20 bytes")
params = {
"info_hash": info_hash,
"peer_id": peer_id,
"port": int(query_params.get("port", "6881")),
"uploaded": int(query_params.get("uploaded", "0")),
"downloaded": int(query_params.get("downloaded", "0")),
"left": int(query_params.get("left", "0")),
"compact": query_params.get("compact", "1") == "1",
}
except (ValueError, KeyError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid announce parameters: {e}",
) from e
# Build peer list from topology
# TODO: Implement _build_peer_list_from_topology() to extract peers from self.state.topology
peers = [] # For now, return empty peer list
# Call Rust tracker handler
try:
response_bytes: bytes = handle_tracker_announce(params, peers) # type: ignore
return Response(
content=response_bytes,
media_type="text/plain",
headers={"Content-Type": "text/plain"},
)
except Exception as e:
logger.error(f"Tracker announce error: {e}")
raise HTTPException(
status_code=500,
detail=f"Tracker announce failed: {e}",
) from e
async def run(self):
cfg = Config()
cfg.bind = f"0.0.0.0:{self.port}"
@@ -714,14 +654,14 @@ class API:
for idx, event in self.event_buffer.drain_indexed():
self._event_log.append(event)
self.state = apply(self.state, IndexedEvent(event=event, idx=idx))
if (
isinstance(event, ChunkGenerated)
and event.command_id in self._chat_completion_queues
):
if isinstance(event, ChunkGenerated):
assert isinstance(event.chunk, TokenChunk)
await self._chat_completion_queues[event.command_id].send(
event.chunk
)
queue = self._chat_completion_queues.get(event.command_id)
if queue is not None:
try:
await queue.send(event.chunk)
except BrokenResourceError:
self._chat_completion_queues.pop(event.command_id, None)
async def _pause_on_new_election(self):
with self.election_receiver as ems:
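Taken together, the error paths above give API clients two shapes to handle: non-streaming requests get an OpenAI-style JSON error body (via the `HTTPException` handler), while streaming requests get an SSE error frame followed by `[DONE]`. A hedged client-side sketch (`httpx` and the delta field names are assumptions following the OpenAI chat-completions format, not confirmed by this diff):

```python
# Client-side sketch for the streaming error frames; field names for normal
# chunks are assumed to follow the OpenAI chat-completions format.
import json

import httpx  # any HTTP client with streaming support works


def read_chat_stream(url: str, payload: dict) -> str:
    parts: list[str] = []
    with httpx.stream("POST", url, json=payload) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                break
            event = json.loads(data)
            if "error" in event:  # frame emitted by _generate_chat_stream on failure
                raise RuntimeError(event["error"]["message"])
            parts.append(event["choices"][0]["delta"].get("content") or "")
    return "".join(parts)
```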


@@ -0,0 +1,107 @@
# pyright: reportUnusedFunction=false, reportAny=false
from typing import Any, get_args
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from exo.shared.types.api import ErrorInfo, ErrorResponse, FinishReason
from exo.shared.types.chunks import TokenChunk
from exo.worker.tests.constants import MODEL_A_ID
def test_http_exception_handler_formats_openai_style() -> None:
"""Test that HTTPException is converted to OpenAI-style error format."""
from exo.master.api import API
app = FastAPI()
# Setup exception handler
api = object.__new__(API)
api.app = app
api._setup_exception_handlers() # pyright: ignore[reportPrivateUsage]
# Add test routes that raise HTTPException
@app.get("/test-error")
async def _test_error() -> None:
raise HTTPException(status_code=500, detail="Test error message")
@app.get("/test-not-found")
async def _test_not_found() -> None:
raise HTTPException(status_code=404, detail="Resource not found")
client = TestClient(app)
# Test 500 error
response = client.get("/test-error")
assert response.status_code == 500
data: dict[str, Any] = response.json()
assert "error" in data
assert data["error"]["message"] == "Test error message"
assert data["error"]["type"] == "Internal Server Error"
assert data["error"]["code"] == 500
# Test 404 error
response = client.get("/test-not-found")
assert response.status_code == 404
data = response.json()
assert "error" in data
assert data["error"]["message"] == "Resource not found"
assert data["error"]["type"] == "Not Found"
assert data["error"]["code"] == 404
def test_finish_reason_includes_error() -> None:
valid_reasons = get_args(FinishReason)
assert "error" in valid_reasons
def test_token_chunk_with_error_fields() -> None:
chunk = TokenChunk(
idx=0,
model=MODEL_A_ID,
text="",
token_id=0,
finish_reason="error",
error_message="Something went wrong",
)
assert chunk.finish_reason == "error"
assert chunk.error_message == "Something went wrong"
def test_token_chunk_without_error() -> None:
chunk = TokenChunk(
idx=1,
model=MODEL_A_ID,
text="Hello",
token_id=42,
finish_reason=None,
)
assert chunk.finish_reason is None
assert chunk.error_message is None
def test_error_response_construction() -> None:
error_response = ErrorResponse(
error=ErrorInfo(
message="Generation failed",
type="InternalServerError",
code=500,
)
)
assert error_response.error.message == "Generation failed"
assert error_response.error.code == 500
def test_normal_finish_reasons_still_work() -> None:
for reason in ["stop", "length", "tool_calls", "content_filter", "function_call"]:
chunk = TokenChunk(
idx=0,
model=MODEL_A_ID,
text="done",
token_id=100,
finish_reason=reason, # type: ignore[arg-type]
)
assert chunk.finish_reason == reason


@@ -29,6 +29,11 @@ class _InterceptHandler(logging.Handler):
def logger_setup(log_file: Path | None, verbosity: int = 0):
"""Set up logging for this process - formatting, file handles, verbosity and output"""
logging.getLogger("exo_pyo3_bindings").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger.remove()
# replace all stdlib loggers with _InterceptHandlers that log to loguru


@@ -14,32 +14,6 @@ class ModelCard(CamelCaseModel):
MODEL_CARDS: dict[str, ModelCard] = {
# deepseek v3
# "deepseek-v3-0324:4bit": ModelCard(
# short_id="deepseek-v3-0324:4bit",
# model_id="mlx-community/DeepSeek-V3-0324-4bit",
# name="DeepSeek V3 0324 (4-bit)",
# description="""DeepSeek V3 is a large language model trained on the DeepSeek V3 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-V3-0324-4bit"),
# pretty_name="DeepSeek V3 0324 (4-bit)",
# storage_size=Memory.from_kb(409706307),
# n_layers=61,
# ),
# ),
# "deepseek-v3-0324": ModelCard(
# short_id="deepseek-v3-0324",
# model_id="mlx-community/DeepSeek-v3-0324-8bit",
# name="DeepSeek V3 0324 (8-bit)",
# description="""DeepSeek V3 is a large language model trained on the DeepSeek V3 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-v3-0324-8bit"),
# pretty_name="DeepSeek V3 0324 (8-bit)",
# storage_size=Memory.from_kb(754706307),
# n_layers=61,
# ),
# ),
"deepseek-v3.1-4bit": ModelCard(
short_id="deepseek-v3.1-4bit",
model_id=ModelId("mlx-community/DeepSeek-V3.1-4bit"),
@@ -70,63 +44,6 @@ MODEL_CARDS: dict[str, ModelCard] = {
supports_tensor=True,
),
),
# "deepseek-v3.2": ModelCard(
# short_id="deepseek-v3.2",
# model_id=ModelId("mlx-community/DeepSeek-V3.2-8bit"),
# name="DeepSeek V3.2 (8-bit)",
# description="""DeepSeek V3.2 is a large language model trained on the DeepSeek V3.2 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-V3.2-8bit"),
# pretty_name="DeepSeek V3.2 (8-bit)",
# storage_size=Memory.from_kb(754706307),
# n_layers=61,
# hidden_size=7168,
# ),
# ),
# "deepseek-v3.2-4bit": ModelCard(
# short_id="deepseek-v3.2-4bit",
# model_id=ModelId("mlx-community/DeepSeek-V3.2-4bit"),
# name="DeepSeek V3.2 (4-bit)",
# description="""DeepSeek V3.2 is a large language model trained on the DeepSeek V3.2 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-V3.2-4bit"),
# pretty_name="DeepSeek V3.2 (4-bit)",
# storage_size=Memory.from_kb(754706307 // 2), # TODO !!!!!
# n_layers=61,
# hidden_size=7168,
# ),
# ),
# deepseek r1
# "deepseek-r1-0528-4bit": ModelCard(
# short_id="deepseek-r1-0528-4bit",
# model_id="mlx-community/DeepSeek-R1-0528-4bit",
# name="DeepSeek-R1-0528 (4-bit)",
# description="""DeepSeek R1 is a large language model trained on the DeepSeek R1 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-R1-0528-4bit"),
# pretty_name="DeepSeek R1 671B (4-bit)",
# storage_size=Memory.from_kb(409706307),
# n_layers=61,
# hidden_size=7168,
# ),
# ),
# "deepseek-r1-0528": ModelCard(
# short_id="deepseek-r1-0528",
# model_id="mlx-community/DeepSeek-R1-0528-8bit",
# name="DeepSeek-R1-0528 (8-bit)",
# description="""DeepSeek R1 is a large language model trained on the DeepSeek R1 dataset.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/DeepSeek-R1-0528-8bit"),
# pretty_name="DeepSeek R1 671B (8-bit)",
# storage_size=Memory.from_bytes(754998771712),
# n_layers=61,
# hidden_size=7168,
# ),
# ),
# kimi k2
"kimi-k2-instruct-4bit": ModelCard(
short_id="kimi-k2-instruct-4bit",
@@ -508,23 +425,24 @@ MODEL_CARDS: dict[str, ModelCard] = {
supports_tensor=True,
),
),
"gpt-oss-20b-4bit": ModelCard(
short_id="gpt-oss-20b-4bit",
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q4"),
name="GPT-OSS 20B (MXFP4-Q4, MLX)",
description="""OpenAI's GPT-OSS 20B is a medium-sized MoE model for lower-latency and local or specialized use cases; this MLX variant uses MXFP4 4-bit quantization.""",
"gpt-oss-20b-MXFP4-Q8": ModelCard(
short_id="gpt-oss-20b-MXFP4-Q8",
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q8"),
name="GPT-OSS 20B (MXFP4-Q8, MLX)",
description="""OpenAI's GPT-OSS 20B is a medium-sized MoE model for lower-latency and local or specialized use cases; this variant is a 4-bit MLX conversion for Apple Silicon.""",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q4"),
pretty_name="GPT-OSS 20B (MXFP4-Q4, MLX)",
model_id=ModelId("mlx-community/gpt-oss-20b-MXFP4-Q8"),
pretty_name="GPT-OSS 20B (MXFP4-Q8, MLX)",
storage_size=Memory.from_kb(11_744_051),
n_layers=24,
hidden_size=2880,
supports_tensor=True,
),
),
# Needs to be quantized g32 or g16.
# glm 4.5
"glm-4.5-air-8bit": ModelCard(
# Needs to be quantized g32 or g16 to work with tensor parallel
short_id="glm-4.5-air-8bit",
model_id=ModelId("mlx-community/GLM-4.5-Air-8bit"),
name="GLM 4.5 Air 8bit",
@@ -554,19 +472,81 @@ MODEL_CARDS: dict[str, ModelCard] = {
supports_tensor=True,
),
),
# "devstral-2-123b-instruct-2512-8bit": ModelCard(
# short_id="devstral-2-123b-instruct-2512-8bit",
# model_id=ModelId("mlx-community/Devstral-2-123B-Instruct-2512-8bit"),
# name="Devstral 2 123B Instruct 2512 (8-bit, MLX)",
# description="""Mistral AI's Devstral 2 123B Instruct (2512) is an agentic coding model.""",
# tags=[],
# metadata=ModelMetadata(
# model_id=ModelId("mlx-community/Devstral-2-123B-Instruct-2512-8bit"),
# pretty_name="Devstral 2 123B Instruct 2512 (8-bit, MLX)",
# storage_size=Memory.from_kb(133_000_000),
# n_layers=88,
# hidden_size=12288,
# supports_tensor=True,
# ),
# ),
# glm 4.7
"glm-4.7-4bit": ModelCard(
short_id="glm-4.7-4bit",
model_id=ModelId("mlx-community/GLM-4.7-4bit"),
name="GLM 4.7 4bit",
description="GLM 4.7 4bit",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/GLM-4.7-4bit"),
pretty_name="GLM 4.7 4bit",
storage_size=Memory.from_bytes(198556925568),
n_layers=91,
hidden_size=5120,
supports_tensor=True,
),
),
"glm-4.7-6bit": ModelCard(
short_id="glm-4.7-6bit",
model_id=ModelId("mlx-community/GLM-4.7-6bit"),
name="GLM 4.7 6bit",
description="GLM 4.7 6bit",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/GLM-4.7-6bit"),
pretty_name="GLM 4.7 6bit",
storage_size=Memory.from_bytes(286737579648),
n_layers=91,
hidden_size=5120,
supports_tensor=True,
),
),
"glm-4.7-8bit-gs32": ModelCard(
short_id="glm-4.7-8bit-gs32",
model_id=ModelId("mlx-community/GLM-4.7-8bit-gs32"),
name="GLM 4.7 8bit (gs32)",
description="GLM 4.7 8bit (gs32)",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/GLM-4.7-8bit-gs32"),
pretty_name="GLM 4.7 8bit (gs32)",
storage_size=Memory.from_bytes(396963397248),
n_layers=91,
hidden_size=5120,
supports_tensor=True,
),
),
# minimax-m2
"minimax-m2.1-8bit": ModelCard(
short_id="minimax-m2.1-8bit",
model_id=ModelId("mlx-community/MiniMax-M2.1-8bit"),
name="MiniMax M2.1 8bit",
description="MiniMax M2.1 8bit",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/MiniMax-M2.1-8bit"),
pretty_name="MiniMax M2.1 8bit",
storage_size=Memory.from_bytes(242986745856),
n_layers=61,
hidden_size=3072,
supports_tensor=True,
),
),
"minimax-m2.1-3bit": ModelCard(
short_id="minimax-m2.1-3bit",
model_id=ModelId("mlx-community/MiniMax-M2.1-3bit"),
name="MiniMax M2.1 3bit",
description="MiniMax M2.1 3bit",
tags=[],
metadata=ModelMetadata(
model_id=ModelId("mlx-community/MiniMax-M2.1-3bit"),
pretty_name="MiniMax M2.1 3bit",
storage_size=Memory.from_bytes(100086644736),
n_layers=61,
hidden_size=3072,
supports_tensor=True,
),
),
}


@@ -11,10 +11,21 @@ from exo.shared.types.worker.instances import Instance, InstanceId, InstanceMeta
from exo.shared.types.worker.shards import Sharding
FinishReason = Literal[
"stop", "length", "tool_calls", "content_filter", "function_call"
"stop", "length", "tool_calls", "content_filter", "function_call", "error"
]
class ErrorInfo(BaseModel):
message: str
type: str
param: str | None = None
code: int
class ErrorResponse(BaseModel):
error: ErrorInfo
class ModelListModel(BaseModel):
id: str
object: str = "model"
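For reference, a quick sketch of the wire format the new error models serialize to (runnable against the types above):

```python
from exo.shared.types.api import ErrorInfo, ErrorResponse

body = ErrorResponse(
    error=ErrorInfo(message="Resource not found", type="Not Found", code=404)
)
print(body.model_dump_json())
# {"error":{"message":"Resource not found","type":"Not Found","param":null,"code":404}}
```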


@@ -22,6 +22,7 @@ class TokenChunk(BaseChunk):
token_id: int
finish_reason: FinishReason | None = None
stats: GenerationStats | None = None
error_message: str | None = None
class ImageChunk(BaseChunk):


@@ -16,4 +16,3 @@ class ModelMetadata(CamelCaseModel):
n_layers: PositiveInt
hidden_size: PositiveInt
supports_tensor: bool
revision: str | None = None # Git commit hash for torrent lookup

Some files were not shown because too many files have changed in this diff.