mirror of
https://github.com/exo-explore/exo.git
synced 2026-05-24 06:35:32 -04:00
## Motivation

No automated integration tests exist for exo. Manual testing against real hardware clusters is slow and error-prone. We need a pytest framework that deploys clusters via `eco`, runs inference scenarios, and tears down cleanly.

## Changes

- **`tools/src/exo_tools/`** — New workspace member shared by bench, eval, and tests:
  - `client.py` — `ExoClient` HTTP client (extracted from `bench/harness.py`)
  - `harness.py` — instance lifecycle helpers (placement, wait-for-ready, etc.)
  - `cluster.py` — `EcoSession` for eco cluster lifecycle (deploy/stop/start/release/logs/exec) with unique `USER=<prefix>-<uuid>` per session and atexit/signal cleanup
- **`tests/integration/`** — 17 pytest tests across 5 files:
  - `test_1node.py` — place, chat, multi-turn, delete, state/models endpoints, cluster snapshot, download-from-scratch
  - `test_2node.py` — parametrized tensor/jaccl + pipeline/ring inference and multi-turn
  - `test_4node.py` — parametrized 4-node pipeline/ring inference, cluster state
  - `test_resilience.py` — full disconnect/reconnect cycle (2-node → disconnect → 1-node → reconnect → 2-node)
  - `test_dashboard.py` — Playwright: dashboard loads, shows node info, chat flow
  - `helpers.py` — placement/inference helpers, re-exports from `exo_tools`
  - `conftest.py` — session-scoped cluster fixtures with constraint-based eco reservations; `--hosts` override; `EXO_REF` env var for CI deployments from a GitHub branch
- **`bench/`** — Updated imports from `exo_tools.client` / `exo_tools.harness`
- **`pyproject.toml`** — Added `tools` workspace member, `playwright` dev dep, `--ignore=tests/integration`

## Why It Works

Tests use `eco` for cluster lifecycle and `ExoClient` for API interactions — same tools humans use. Session-scoped fixtures deploy once per file. Unique eco users prevent test runs from interfering with each other or manual usage.

## Test Plan

### Automated Testing

- `uv run pytest tests/integration/ -v -s` — full suite (~4-5 min, 17/17 passing)
- `uv run pytest tests/integration/ -v -s --hosts s4,s9,s10,s22` — pin specific hosts
- `EXO_REF=main uv run pytest tests/integration/ -v` — deploy from a GitHub branch (CI)
- `uv run pytest` — confirms integration tests are excluded from default runs
182 lines
5.5 KiB
Python
182 lines
5.5 KiB
Python
# type: ignore
"""Pytest configuration for marker-driven exo integration tests.

Test authors declare requirements via markers:

    @pytest.mark.cluster(count=2, thunderbolt='a2a')
    @pytest.mark.instance('mlx-community/Llama-3.2-1B-Instruct-4bit',
                          sharding='tensor', comm='jaccl')
    def test_jaccl_inference(session):
        resp = session.chat('What is 2+2?')
        assert '4' in resp

Clusters are cached by `ClusterSpec`; tests with the same cluster_spec
share a deployment. Each test places its own instance (matching its
`@pytest.mark.instance`), and instances are cleaned up after the test.

Run with:

    uv run pytest tests/ -v
    uv run pytest tests/ -v --hosts s2,s4,s9,s10
"""

from __future__ import annotations

import contextlib
import json
from collections.abc import Iterator

import pytest
from exo_tools.cluster import ClusterInfo, EcoSession
from exo_tools.harness import cleanup_all_instances, place_instance

from .framework import (
    ClusterSpec,
    Session,
    parse_cluster_marker,
    parse_instance_marker,
)

# Single eco session for the entire test process.
eco = EcoSession(user_prefix="test")

# Cluster cache keyed by ClusterSpec — tests with the same spec share a deployment.
# Cleared at session teardown.
_cluster_cache: dict[ClusterSpec, ClusterInfo] = {}


def pytest_addoption(parser):
    """Register the ``--hosts`` command-line option.

    Args:
        parser: The option parser pytest passes to this hook.

    The option defaults to ``None``. When supplied it is a comma-separated
    host list that overrides constraint-based reservation.
    """
    parser.addoption(
        "--hosts",
        default=None,
        help=(
            "Comma-separated list of hosts (e.g. s2,s4,s9,s10). "
            "Overrides constraint-based reservation."
        ),
    )


def pytest_configure(config):
    """Register the custom ``cluster`` and ``instance`` markers.

    Args:
        config: The pytest config object passed to this hook. Each marker
            description is appended to its ``markers`` ini setting.
    """
    marker_docs = (
        "cluster(count=N, thunderbolt=Thunderbolt|None, min_memory=GB, chip=PATTERN): "
        "declare cluster requirements for a test",
        "instance(model_id, sharding=Sharding, comm=Comm, min_nodes=N): "
        "declare instance placement for a test",
    )
    for doc in marker_docs:
        config.addinivalue_line("markers", doc)


def pytest_report_header(config):
    """Return report-header lines for this test session.

    Args:
        config: The pytest config object; its ``--hosts`` option is read.

    Returns:
        A list of strings: always the eco user of the module-level ``eco``
        session, plus the raw ``--hosts`` value when one was given.
    """
    header = [f"eco user: {eco.user}"]
    hosts = config.getoption("--hosts")
    if hosts:
        header.append(f"hosts override: {hosts}")
    return header


@pytest.fixture(scope="session")
def _host_pool(request) -> list[str] | None:
    """Parse the ``--hosts`` option into a list of host names.

    Entries are split on commas and stripped of surrounding whitespace;
    empty entries are dropped.

    Returns:
        The host names in the order given, or ``None`` when ``--hosts`` was
        not supplied or contained no usable entry (e.g. ``","``), so that
        "no override" is always represented by ``None``.
    """
    raw = request.config.getoption("--hosts")
    if not raw:
        return None
    hosts = [name for name in (part.strip() for part in raw.split(",")) if name]
    return hosts or None


@pytest.fixture
def session(request, _host_pool) -> Iterator[Session]:
    """Per-test fixture providing a Session matching the test's markers.

    Reads @pytest.mark.cluster and @pytest.mark.instance from the test, deploys
    a matching cluster (cached across tests with the same spec), places the
    model, and yields a Session for the test to interact with. Cleans up the
    instance after the test, and invalidates the cluster cache if the test
    left nodes disconnected.

    Yields:
        A ``Session`` bound to the cluster, the module-level eco session, the
        parsed instance spec and the placed instance id (``None`` when the
        test declared no instance).

    Raises:
        Whatever ``place_instance`` raises; instances on the cluster are
        cleaned up (best effort) before the error propagates.
    """
    cluster_marker = request.node.get_closest_marker("cluster")
    instance_marker = request.node.get_closest_marker("instance")

    cluster_spec = parse_cluster_marker(cluster_marker)
    instance_spec = parse_instance_marker(instance_marker)

    # Deploy or reuse a cluster matching the spec.
    cluster = _cluster_cache.get(cluster_spec)
    if cluster is None:
        cluster = _deploy_cluster(cluster_spec, _host_pool)
        _cluster_cache[cluster_spec] = cluster

    # Place an instance for this test if the test specified one.
    instance_id = None
    if instance_spec is not None:
        client = cluster.make_client()
        try:
            instance_id = place_instance(
                client,
                instance_spec.model_id,
                sharding=instance_spec.sharding,
                comm=instance_spec.comm,
                min_nodes=instance_spec.min_nodes,
            )
        except Exception:
            # A failure here happens before the yield, so the teardown below
            # never runs. The cluster is cached and shared, so clean up any
            # instance left behind before propagating the error.
            with contextlib.suppress(Exception):
                cleanup_all_instances(client)
            raise

    sess = Session(
        cluster=cluster,
        eco=eco,
        instance_spec=instance_spec,
        instance_id=instance_id,
    )

    yield sess

    # ---- Teardown ----
    if sess._stopped_hosts:
        # The test left nodes disconnected: invalidate the cluster cache and
        # stop the cluster so the next test deploys fresh.
        _cluster_cache.pop(cluster_spec, None)
        with contextlib.suppress(Exception):
            eco.stop(sess.cluster.hosts)
    else:
        # Otherwise, clean up any instances created during the test.
        with contextlib.suppress(Exception):
            cleanup_all_instances(sess.client)


def _deploy_cluster(cluster_spec, host_pool):
    """Deploy a new cluster for *cluster_spec* and return eco's result.

    With a host pool, the first ``cluster_spec.count`` hosts of the pool are
    deployed; otherwise eco reserves hosts from the spec's constraints.
    """
    if host_pool:
        # NOTE(review): if the pool holds fewer than ``cluster_spec.count``
        # hosts, the slice silently yields a smaller cluster — confirm whether
        # this should fail or skip instead.
        return eco.start_deploy(hosts=host_pool[: cluster_spec.count], wait=True)
    return eco.start_deploy(
        count=cluster_spec.count,
        thunderbolt=cluster_spec.thunderbolt,
        chip=cluster_spec.chip,
        min_memory_gb=cluster_spec.min_memory_gb,
        wait=True,
    )


# ---------------------------------------------------------------------------
# Session-level teardown — stop all cached clusters
# ---------------------------------------------------------------------------


@pytest.fixture(scope="session", autouse=True)
def _teardown_clusters() -> Iterator[None]:
    """Stop every cached cluster once the test session finishes.

    Nothing happens at setup. After the session, each cluster still in
    ``_cluster_cache`` is stopped on a best-effort basis (errors from
    ``eco.stop`` are suppressed so one failure does not block the rest),
    and the cache is then emptied.
    """
    yield
    try:
        for cluster in _cluster_cache.values():
            with contextlib.suppress(Exception):
                eco.stop(cluster.hosts)
    finally:
        # Clear even if an interrupt escapes the loop, so no stale entries
        # outlive the session.
        _cluster_cache.clear()


def pytest_runtest_makereport(item, call):
    """Attach cluster logs to the test report when a test fails.

    Only acts on the ``call`` phase, when an exception was recorded and the
    test used the ``session`` fixture. The last 200 log lines of the
    session's cluster hosts are added as a "Cluster Logs" report section.
    If the logs cannot be fetched or serialized, the section records the
    reason instead, so the test's own failure is never masked.

    Args:
        item: The test item being reported on.
        call: The call info for the current phase.

    Returns:
        Always ``None``.
    """
    if call.when != "call" or call.excinfo is None:
        return

    # Not every item type is guaranteed to carry ``funcargs``.
    sess = getattr(item, "funcargs", {}).get("session")
    if sess is None:
        return

    try:
        logs = eco.logs(sess.cluster.hosts, lines=200)
        content = json.dumps(logs, indent=2)
    except Exception as exc:
        # Best effort: log collection must never raise out of the hook, but
        # the reason is recorded rather than silently dropped.
        content = f"<failed to collect cluster logs: {exc!r}>"
    item.add_report_section("call", "Cluster Logs", content)