Files
exo/tests/conftest.py
ciaranbor fa57131374 Integration tests infra (#1995)
## Motivation

No automated integration tests exist for exo. Manual testing against
real hardware clusters is slow and error-prone. We need a pytest
framework that deploys clusters via `eco`, runs inference scenarios, and
tears down cleanly.

## Changes

- **`tools/src/exo_tools/`** — New workspace member shared by bench,
  eval, and tests:
  - `client.py` — `ExoClient` HTTP client (extracted from
    `bench/harness.py`)
  - `harness.py` — instance lifecycle helpers (placement, wait-for-ready,
    etc.)
  - `cluster.py` — `EcoSession` for eco cluster lifecycle
    (deploy/stop/start/release/logs/exec) with unique `USER=<prefix>-<uuid>`
    per session and atexit/signal cleanup
- **`tests/integration/`** — 17 pytest tests across 5 files:
  - `test_1node.py` — place, chat, multi-turn, delete, state/models
    endpoints, cluster snapshot, download-from-scratch
  - `test_2node.py` — parametrized tensor/jaccl + pipeline/ring inference
    and multi-turn
  - `test_4node.py` — parametrized 4-node pipeline/ring inference, cluster
    state
  - `test_resilience.py` — full disconnect/reconnect cycle (2-node →
    disconnect → 1-node → reconnect → 2-node)
  - `test_dashboard.py` — Playwright: dashboard loads, shows node info,
    chat flow
  - `helpers.py` — placement/inference helpers, re-exports from
    `exo_tools`
  - `conftest.py` — session-scoped cluster fixtures with constraint-based
    eco reservations; `--hosts` override; `EXO_REF` env var for CI
    deployments from a GitHub branch
- **`bench/`** — Updated imports from `exo_tools.client` /
`exo_tools.harness`
- **`pyproject.toml`** — Added `tools` workspace member, `playwright`
dev dep, `--ignore=tests/integration`

## Why It Works

Tests use `eco` for cluster lifecycle and `ExoClient` for API
interactions — same tools humans use. Session-scoped fixtures deploy
once per file. Unique eco users prevent test runs from interfering with
each other or manual usage.

## Test Plan

### Automated Testing

- `uv run pytest tests/integration/ -v -s` — full suite (~4-5 min, 17/17
passing)
- `uv run pytest tests/integration/ -v -s --hosts s4,s9,s10,s22` — pin
specific hosts
- `EXO_REF=main uv run pytest tests/integration/ -v` — deploy from a
GitHub branch (CI)
- `uv run pytest` — confirms integration tests are excluded from default
runs
2026-05-08 17:15:08 +01:00

182 lines
5.5 KiB
Python

# type: ignore
"""Pytest configuration for marker-driven exo integration tests.

Test authors declare requirements via markers:

    @pytest.mark.cluster(count=2, thunderbolt='a2a')
    @pytest.mark.instance('mlx-community/Llama-3.2-1B-Instruct-4bit',
                          sharding='tensor', comm='jaccl')
    def test_jaccl_inference(session):
        resp = session.chat('What is 2+2?')
        assert '4' in resp

Clusters are cached by `ClusterSpec`; tests with the same cluster_spec
share a deployment. Each test places its own instance (matching its
`@pytest.mark.instance`), and instances are cleaned up after the test.

Run with:

    uv run pytest tests/ -v
    uv run pytest tests/ -v --hosts s2,s4,s9,s10
"""
from __future__ import annotations

import contextlib
import json
from collections.abc import Iterator

import pytest
from exo_tools.cluster import ClusterInfo, EcoSession
from exo_tools.harness import cleanup_all_instances, place_instance

from .framework import (
    ClusterSpec,
    Session,
    parse_cluster_marker,
    parse_instance_marker,
)
# Single eco session for the entire test process. It is created at import
# time, so the report-header hook and every fixture in this file share it.
eco = EcoSession(user_prefix="test")

# Cluster cache keyed by ClusterSpec — tests with the same spec share a deployment.
# An entry is evicted when a test leaves hosts stopped; the whole cache is
# cleared at session teardown.
_cluster_cache: dict[ClusterSpec, ClusterInfo] = {}
def pytest_addoption(parser):
    """Register the ``--hosts`` command-line option with pytest."""
    hosts_help = (
        "Comma-separated list of hosts (e.g. s2,s4,s9,s10). "
        "Overrides constraint-based reservation."
    )
    parser.addoption("--hosts", default=None, help=hosts_help)
def pytest_configure(config):
    """Declare the ``cluster`` and ``instance`` markers used by this suite."""
    marker_docs = (
        "cluster(count=N, thunderbolt=Thunderbolt|None, min_memory=GB, chip=PATTERN): "
        "declare cluster requirements for a test",
        "instance(model_id, sharding=Sharding, comm=Comm, min_nodes=N): "
        "declare instance placement for a test",
    )
    # One ini line per marker, in declaration order.
    for marker_doc in marker_docs:
        config.addinivalue_line("markers", marker_doc)
def pytest_report_header(config):
    """Return header lines naming the eco user and any ``--hosts`` override."""
    override = config.getoption("--hosts")
    header = [f"eco user: {eco.user}"]
    if not override:
        return header
    header.append(f"hosts override: {override}")
    return header
@pytest.fixture(scope="session")
def _host_pool(request) -> list[str] | None:
    """Parse ``--hosts`` into a list of host names.

    Returns ``None`` when the option is unset or empty. Otherwise the value is
    split on commas, each entry is stripped, and blank entries are dropped.
    """
    option = request.config.getoption("--hosts")
    if not option:
        return None
    stripped = (part.strip() for part in option.split(","))
    return [host for host in stripped if host]
def _deploy_cluster(
    cluster_spec: ClusterSpec, host_pool: list[str] | None
) -> ClusterInfo:
    """Deploy a new cluster for *cluster_spec* through eco (``wait=True``).

    With a ``--hosts`` pool the first ``cluster_spec.count`` hosts are used;
    otherwise the spec's constraints are handed to eco for reservation.
    Fails the requesting test if the pool is smaller than the requested count.
    """
    if not host_pool:
        return eco.start_deploy(
            count=cluster_spec.count,
            thunderbolt=cluster_spec.thunderbolt,
            chip=cluster_spec.chip,
            min_memory_gb=cluster_spec.min_memory_gb,
            wait=True,
        )

    hosts = host_pool[: cluster_spec.count]
    # Slicing truncates silently, so an undersized pool would otherwise deploy
    # a smaller cluster than the test declared and fail in confusing ways.
    if cluster_spec.count is not None and len(hosts) < cluster_spec.count:
        pytest.fail(
            f"--hosts provides {len(host_pool)} host(s) but this test requires "
            f"a cluster of {cluster_spec.count}",
            pytrace=False,
        )
    # NOTE(review): specs with different counts take overlapping prefixes of
    # the pool while earlier clusters are still cached — confirm eco tolerates
    # redeploying onto hosts that a cached cluster still occupies.
    return eco.start_deploy(hosts=hosts, wait=True)


@pytest.fixture
def session(request, _host_pool) -> Iterator[Session]:
    """Per-test fixture providing a Session matching the test's markers.

    Reads ``@pytest.mark.cluster`` and ``@pytest.mark.instance`` from the test,
    deploys a matching cluster (cached across tests with the same spec), places
    the model when an instance marker is present, and yields a Session for the
    test to interact with.

    Teardown removes the instances created during the test. If the test left
    hosts stopped, the cluster is instead evicted from the cache and stopped so
    that the next test deploys a fresh one.
    """
    cluster_spec = parse_cluster_marker(request.node.get_closest_marker("cluster"))
    instance_spec = parse_instance_marker(request.node.get_closest_marker("instance"))

    # Deploy or reuse a cluster matching the spec.
    cluster = _cluster_cache.get(cluster_spec)
    if cluster is None:
        cluster = _deploy_cluster(cluster_spec, _host_pool)
        _cluster_cache[cluster_spec] = cluster

    # Place an instance for this test if the test specified one.
    instance_id = None
    if instance_spec is not None:
        client = cluster.make_client()
        try:
            instance_id = place_instance(
                client,
                instance_spec.model_id,
                sharding=instance_spec.sharding,
                comm=instance_spec.comm,
                min_nodes=instance_spec.min_nodes,
            )
        except Exception:
            # The teardown below never runs when setup fails, so remove
            # whatever was partially placed; otherwise it lingers on the
            # cached cluster and leaks into the next test that reuses it.
            with contextlib.suppress(Exception):
                cleanup_all_instances(client)
            raise

    sess = Session(
        cluster=cluster,
        eco=eco,
        instance_spec=instance_spec,
        instance_id=instance_id,
    )
    yield sess

    # ---- Teardown ----
    # If the test left nodes disconnected, invalidate the cluster cache and
    # stop the cluster so the next test deploys fresh.
    if sess._stopped_hosts:
        _cluster_cache.pop(cluster_spec, None)
        with contextlib.suppress(Exception):
            eco.stop(sess.cluster.hosts)
        return

    # Otherwise, clean up any instances created during the test (best-effort).
    with contextlib.suppress(Exception):
        cleanup_all_instances(sess.client)
# ---------------------------------------------------------------------------
# Session-level teardown — stop all cached clusters
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session", autouse=True)
def _teardown_clusters():
    """Stop every cluster still cached once the test session is over."""
    yield
    # Best-effort: a failure stopping one cluster must not prevent stopping
    # the others or emptying the cache.
    for info in _cluster_cache.values():
        with contextlib.suppress(Exception):
            eco.stop(info.hosts)
    _cluster_cache.clear()
def pytest_runtest_makereport(item, call):
    """Attach cluster logs to the test report when a test fails.

    Only exceptions raised in the ``call`` phase of tests that used the
    ``session`` fixture are considered. Log collection is best-effort: any
    error while fetching or serialising the logs is swallowed so that it
    cannot mask the original test failure.
    """
    if call.when != "call" or call.excinfo is None:
        return
    # Items that are not Python test functions (e.g. contributed by other
    # plugins) may carry no ``funcargs``; the hook must not raise on them.
    funcargs = getattr(item, "funcargs", None) or {}
    sess = funcargs.get("session")
    if sess is None:
        return
    with contextlib.suppress(Exception):
        logs = eco.logs(sess.cluster.hosts, lines=200)
        item.add_report_section("call", "Cluster Logs", json.dumps(logs, indent=2))