#!/usr/bin/env python3
# pyright: reportAny=false, reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false
"""
exo-eval: Evaluation harness for the exo inference system.
Supports multiple evaluation frameworks via TOML configuration:
- lm_eval: Language model evaluation using EleutherAI's lm-evaluation-harness
- livecodebench: Code generation benchmark (https://livecodebench.github.io/)
- swe_bench: SWE-bench evaluation (placeholder for future implementation)
- custom: Custom evaluation scripts
Usage:
uv run python -m bench.exo_eval --config bench/eval_config.toml --model Llama-3.2-1b-Instruct-4bit
uv run python -m bench.exo_eval --config bench/eval_config.toml --model Llama-3.2-1b-Instruct-4bit --dry-run
# Run LiveCodeBench (requires livecodebench package):
# First: git clone https://github.com/LiveCodeBench/LiveCodeBench && cd LiveCodeBench && uv pip install -e .
# Then set type = "livecodebench" in eval_config.toml
"""
from __future__ import annotations
import argparse
import contextlib
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal
# Add parent directory to path for direct script execution
if __name__ == "__main__" and __package__ is None:
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import tomlkit
from huggingface_hub import get_token as get_hf_token
from loguru import logger
from tomlkit.exceptions import TOMLKitError
from bench.exo_bench import (
ExoClient,
ExoHttpError,
instance_id_from_instance,
nodes_used_in_instance,
placement_filter,
resolve_model_short_id,
sharding_filter,
wait_for_instance_gone,
wait_for_instance_ready,
)
EvalType = Literal["lm_eval", "swe_bench", "livecodebench", "custom"]
def load_config(config_path: str) -> dict[str, Any]:
"""Load and parse TOML configuration file."""
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(path, encoding="utf-8") as f:
return dict(tomlkit.load(f))
def get_eval_type(config: dict[str, Any]) -> EvalType:
"""Extract evaluation type from config."""
eval_section = config.get("eval", {})
eval_type = eval_section.get("type", "lm_eval")
if eval_type not in ("lm_eval", "swe_bench", "livecodebench", "custom"):
raise ValueError(f"Unknown eval type: {eval_type}")
return eval_type
def check_hf_token(config: dict[str, Any]) -> bool:
"""Check if HuggingFace token is available when required.
Returns True if token is available or not required, False otherwise.
"""
eval_section = config.get("eval", {})
require_hf_token = eval_section.get("require_hf_token", True)
if not require_hf_token:
return True
token = get_hf_token()
if token is None:
logger.error(
"HuggingFace token not found. "
"Set HF_TOKEN environment variable or run 'huggingface-cli login'. "
"To disable this check, set require_hf_token = false in [eval] config."
)
return False
logger.info("HuggingFace token found")
return True
def select_placement(
client: ExoClient,
full_model_id: str,
config: dict[str, Any],
) -> dict[str, Any] | None:
"""Select a placement based on config preferences."""
instance_config = config.get("instance", {})
# If an explicit instance is provided in the config, use it directly
if "instance" in instance_config:
return instance_config["instance"]
# Otherwise, select from previews based on preferences
instance_meta_pref = instance_config.get("instance_meta", "ring")
sharding_pref = instance_config.get("sharding", "pipeline")
max_nodes = instance_config.get("max_nodes", 4)
min_nodes = instance_config.get("min_nodes", 1)
previews_resp = client.request_json(
"GET", "/instance/previews", params={"model_id": full_model_id}
)
previews = previews_resp.get("previews") or []
selected: list[dict[str, Any]] = []
for p in previews:
if p.get("error") is not None:
continue
if not placement_filter(str(p.get("instance_meta", "")), instance_meta_pref):
continue
if not sharding_filter(str(p.get("sharding", "")), sharding_pref):
continue
instance = p.get("instance")
if not isinstance(instance, dict):
continue
n = nodes_used_in_instance(instance)
if min_nodes <= n <= max_nodes:
selected.append(p)
if not selected:
return None
# Sort by preference: instance_meta match, then sharding match, then node count (all descending)
def sort_key(p: dict[str, Any]) -> tuple[int, int, int]:
meta_match = (
1 if instance_meta_pref in str(p.get("instance_meta", "")).lower() else 0
)
sharding_match = 1 if sharding_pref in str(p.get("sharding", "")).lower() else 0
n_nodes = nodes_used_in_instance(p["instance"])
return (meta_match, sharding_match, n_nodes)
selected.sort(key=sort_key, reverse=True)
return selected[0]
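# Worked example of the preference sort above (hypothetical previews, assuming
# instance_meta = "ring" and sharding = "pipeline" in the config): a preview
# scoring (meta_match=1, sharding_match=1, n_nodes=4) beats (1, 1, 2), which in
# turn beats (1, 0, 4). Tuples compare element by element, so the instance_meta
# match is decided first, then the sharding match, then node count, all
# descending because of reverse=True.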
def setup_instance(
client: ExoClient,
full_model_id: str,
config: dict[str, Any],
dry_run: bool,
) -> tuple[str | None, dict[str, Any] | None]:
"""Create and wait for an instance to be ready. Returns (instance_id, preview)."""
preview = select_placement(client, full_model_id, config)
if preview is None:
logger.error("No valid placement found matching config preferences")
return None, None
instance_data = preview.get("instance")
instance: dict[str, Any] = (
instance_data if isinstance(instance_data, dict) else preview
)
instance_id = instance_id_from_instance(instance)
sharding = str(preview.get("sharding", "unknown"))
instance_meta = str(preview.get("instance_meta", "unknown"))
n_nodes = nodes_used_in_instance(instance)
logger.info(f"Selected placement: {sharding} / {instance_meta} / nodes={n_nodes}")
logger.info(f"Instance ID: {instance_id}")
if dry_run:
logger.info("[dry-run] Would create instance and wait for ready")
return instance_id, preview
# Create instance
client.request_json("POST", "/instance", body={"instance": instance})
try:
wait_for_instance_ready(client, instance_id)
logger.info("Instance is ready")
time.sleep(1) # Brief pause after ready
return instance_id, preview
except (RuntimeError, TimeoutError) as e:
logger.error(f"Failed to initialize instance: {e}")
with contextlib.suppress(ExoHttpError):
client.request_json("DELETE", f"/instance/{instance_id}")
return None, None
def teardown_instance(client: ExoClient, instance_id: str) -> None:
"""Delete an instance and wait for it to be gone."""
try:
client.request_json("DELETE", f"/instance/{instance_id}")
except ExoHttpError as e:
if e.status != 404:
raise
except (ConnectionRefusedError, OSError):
logger.warning(
f"Could not connect to exo to delete instance {instance_id} (server may be down)"
)
return
try:
wait_for_instance_gone(client, instance_id)
except (ConnectionRefusedError, OSError, TimeoutError):
logger.warning("Could not verify instance deletion (server may be down)")
return
logger.info(f"Instance {instance_id} deleted")
def build_lm_eval_args(
config: dict[str, Any],
base_url: str,
model: str,
output_path: str | None,
limit: int | None,
use_completions: bool,
) -> list[str]:
"""Build command-line arguments for lm_eval."""
lm_eval_config = config.get("lm_eval", {})
# Choose model type based on whether tasks need completions API
if use_completions:
model_type = "local-completions"
endpoint_url = f"{base_url}/v1/completions"
else:
model_type = "local-chat-completions"
endpoint_url = f"{base_url}/v1/chat/completions"
# Build model_args string with num_concurrent and timeout
model_args_parts = [f"model={model}", f"base_url={endpoint_url}"]
num_concurrent = lm_eval_config.get("num_concurrent")
if num_concurrent is not None and num_concurrent > 1:
model_args_parts.append(f"num_concurrent={num_concurrent}")
# Use a very long timeout (1 week) to handle large request queues
timeout = lm_eval_config.get("timeout", 604800)
model_args_parts.append(f"timeout={timeout}")
model_args = ",".join(model_args_parts)
args = [
sys.executable,
"-m",
"bench.lm_eval_patched",
"--model",
model_type,
"--model_args",
model_args,
"--verbosity",
"WARNING",
]
# Tasks
tasks = lm_eval_config.get("tasks", ["mmlu"])
tasks_str = ",".join(tasks) if isinstance(tasks, list) else str(tasks)
args.extend(["--tasks", tasks_str])
# Few-shot
num_fewshot = lm_eval_config.get("num_fewshot")
if num_fewshot is not None:
args.extend(["--num_fewshot", str(num_fewshot)])
# Batch size (defaults to 1 for API models; "auto" doesn't work here)
batch_size = lm_eval_config.get("batch_size", 1)
args.extend(["--batch_size", str(batch_size)])
# Apply chat template for instruct/chat models (default: true).
# Only added when targeting the chat completions endpoint.
apply_chat_template = lm_eval_config.get("apply_chat_template", True)
if apply_chat_template and not use_completions:
args.append("--apply_chat_template")
# Fewshot as multiturn (optional, works with chat template)
fewshot_as_multiturn = lm_eval_config.get("fewshot_as_multiturn", False)
if fewshot_as_multiturn and not use_completions:
args.append("--fewshot_as_multiturn")
# Limit (command line overrides config)
effective_limit = limit if limit is not None else lm_eval_config.get("limit")
if effective_limit is not None:
args.extend(["--limit", str(effective_limit)])
# Output path
effective_output = output_path or lm_eval_config.get("output_path")
if effective_output:
args.extend(["--output_path", effective_output])
# Always request sample logging so model responses can be analyzed post hoc
args.append("--log_samples")
return args
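# Illustrative command produced by build_lm_eval_args for the defaults above
# (the host, port, and model ID are assumed example values):
#
#   python -m bench.lm_eval_patched \
#     --model local-chat-completions \
#     --model_args model=<model-id>,base_url=http://localhost:52415/v1/chat/completions,timeout=604800 \
#     --verbosity WARNING \
#     --tasks mmlu \
#     --batch_size 1 \
#     --apply_chat_template \
#     --log_samples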
def run_lm_eval(
config: dict[str, Any],
host: str,
port: int,
model: str,
output_path: str | None,
limit: int | None,
dry_run: bool,
) -> tuple[int, dict[str, Any] | None, float | None]:
"""Run lm_eval evaluation."""
lm_eval_config = config.get("lm_eval", {})
tasks = lm_eval_config.get("tasks", ["mmlu"])
if isinstance(tasks, str):
tasks = [tasks]
exo_base_url = f"http://{host}:{port}"
effective_output = output_path or lm_eval_config.get("output_path")
# Build args - the chat completions endpoint is used (use_completions=False)
args = build_lm_eval_args(
config, exo_base_url, model, output_path, limit, use_completions=False
)
logger.info(f"lm_eval command: {' '.join(args)}")
if dry_run:
logger.info("[dry-run] Would execute the above command")
return 0, None, None
try:
start_time = time.perf_counter()
result = subprocess.run(args, check=False)
elapsed_seconds = time.perf_counter() - start_time
# Fetch and return token usage summary from exo
usage: dict[str, Any] | None = None
try:
import httpx
usage_resp = httpx.get(f"{exo_base_url}/v1/usage", timeout=5)
if usage_resp.status_code == 200:
usage_data: dict[str, Any] = usage_resp.json()
usage = usage_data
logger.info("--- Token Usage (Total) ---")
logger.info(
f" Requests: {usage_data.get('total_requests', 0)}"
)
logger.info(
f" Prompt tokens: {usage_data.get('total_prompt_tokens', 0)}"
)
logger.info(
f" Completion tokens: {usage_data.get('total_completion_tokens', 0)}"
)
logger.info(
f" Reasoning tokens: {usage_data.get('total_reasoning_tokens', 0)}"
)
logger.info(f" Total tokens: {usage_data.get('total_tokens', 0)}")
by_model = usage_data.get("by_model", {})
if by_model:
for model_name, counters in by_model.items():
logger.info(f"--- Token Usage ({model_name}) ---")
logger.info(
f" Requests: {counters.get('requests', 0)}"
)
logger.info(
f" Prompt tokens: {counters.get('prompt_tokens', 0)}"
)
logger.info(
f" Completion tokens: {counters.get('completion_tokens', 0)}"
)
logger.info(
f" Reasoning tokens: {counters.get('reasoning_tokens', 0)}"
)
except Exception:
pass # Usage endpoint not available
logger.info(f"Evaluation completed in {elapsed_seconds:.2f}s")
# Append token usage to lm_eval's results.json
if effective_output and usage:
_append_token_usage_to_results(effective_output, usage, elapsed_seconds)
return result.returncode, usage, elapsed_seconds
except FileNotFoundError:
logger.error("lm_eval not found. Install with: uv sync --extra eval")
return 1, None, None
def _append_token_usage_to_results(
output_path: str, usage: dict[str, Any], elapsed_seconds: float
) -> None:
"""Append token usage data to lm_eval's results.json file."""
output_dir = Path(output_path)
results_file = output_dir / "results.json"
if not results_file.exists():
# lm_eval may put results in a subdirectory named after the model
for subdir in output_dir.iterdir():
if subdir.is_dir():
candidate = subdir / "results.json"
if candidate.exists():
results_file = candidate
break
if not results_file.exists():
logger.warning(f"Could not find results.json in {output_path}")
return
try:
with open(results_file, encoding="utf-8") as f:
results = json.load(f)
# Add token usage to the results
results["token_usage"] = {
"prompt_tokens": usage.get("total_prompt_tokens", 0),
"completion_tokens": usage.get("total_completion_tokens", 0),
"reasoning_tokens": usage.get("total_reasoning_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
"total_requests": usage.get("total_requests", 0),
"by_model": usage.get("by_model"),
}
results["elapsed_seconds"] = elapsed_seconds
with open(results_file, "w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
logger.info(f"Added token usage to: {results_file}")
except Exception as e:
logger.warning(f"Failed to append token usage to results.json: {e}")
def run_swe_bench(
config: dict[str, Any],
host: str,
port: int,
model: str,
output_path: str | None,
dry_run: bool,
) -> tuple[int, dict[str, Any] | None, float | None]:
"""Run SWE-bench evaluation (placeholder)."""
swe_config = config.get("swe_bench", {})
dataset = swe_config.get("dataset", "princeton-nlp/SWE-bench_Lite")
max_workers = swe_config.get("max_workers", 8)
predictions_path = output_path or swe_config.get(
"predictions_path", "bench/predictions"
)
logger.info("SWE-bench evaluation configuration:")
logger.info(f" Dataset: {dataset}")
logger.info(f" Model: {model}")
logger.info(f" API endpoint: http://{host}:{port}/v1")
logger.info(f" Max workers: {max_workers}")
logger.info(f" Predictions path: {predictions_path}")
if dry_run:
logger.info("[dry-run] SWE-bench evaluation would be executed")
return 0, None, None
logger.warning(
"SWE-bench integration is a placeholder. "
"Implement swebench inference and evaluation logic as needed."
)
return 0, None, None
def run_livecodebench(
config: dict[str, Any],
host: str,
port: int,
model: str,
output_path: str | None,
limit: int | None,
dry_run: bool,
) -> tuple[int, dict[str, Any] | None, float | None]:
"""Run LiveCodeBench evaluation.
LiveCodeBench is a contamination-free benchmark for code generation that
continuously collects new problems from LeetCode, AtCoder, and Codeforces.
See: https://livecodebench.github.io/
"""
lcb_config = config.get("livecodebench", {})
scenario = lcb_config.get("scenario", "codegeneration")
release_version = lcb_config.get("release_version", "release_v5")
# Defaults match Artificial Analysis methodology:
# https://artificialanalysis.ai/methodology/intelligence-benchmarking
temperature = lcb_config.get("temperature", 0) # 0 for non-reasoning models
n_samples = lcb_config.get("n_samples", 1) # pass@1
max_tokens = lcb_config.get("max_tokens", 16384)
use_cache = lcb_config.get("use_cache", False)
fast = lcb_config.get("fast", True) # Use code_generation_lite by default
evaluate = lcb_config.get("evaluate", True)
multiprocess = lcb_config.get("multiprocess", 4)
# Timeouts (high defaults for slow inference)
timeout = lcb_config.get("timeout", 1800) # 30 min per problem
openai_timeout = lcb_config.get("openai_timeout", 3600) # 1 hour per request
exo_base_url = f"http://{host}:{port}/v1"
effective_output = output_path or lcb_config.get("output_path", "bench/lcb_results")
logger.info("LiveCodeBench evaluation configuration:")
logger.info(f" Scenario: {scenario}")
logger.info(f" Release version: {release_version}")
logger.info(f" Model: {model}")
logger.info(f" API endpoint: {exo_base_url}")
logger.info(f" Temperature: {temperature}")
logger.info(f" N samples: {n_samples}")
logger.info(f" Max tokens: {max_tokens}")
logger.info(f" Output path: {effective_output}")
# Build command using our wrapper script that handles:
# 1. Registering custom models in LiveCodeBench's registry
# 2. Patching the OpenAI client to use exo's endpoint
args = [
sys.executable,
"-m",
"bench.livecodebench_runner",
"--base-url",
exo_base_url,
"--model",
model,
"--scenario",
scenario,
"--release_version",
release_version,
"--temperature",
str(temperature),
"--n",
str(n_samples),
"--codegen_n",
str(n_samples),
"--max_tokens",
str(max_tokens),
"--output-dir",
effective_output,
]
if use_cache:
args.append("--use_cache")
if not fast:
args.append("--not_fast")
if evaluate:
args.append("--evaluate")
if multiprocess > 1:
args.extend(["--multiprocess", str(multiprocess)])
# Add timeouts
args.extend(["--timeout", str(timeout)])
args.extend(["--openai_timeout", str(openai_timeout)])
if limit is not None:
args.extend(["--limit", str(limit)])
logger.info(f"LiveCodeBench command: {' '.join(args)}")
if dry_run:
logger.info("[dry-run] Would execute the above command")
return 0, None, None
# Environment is set up by the wrapper script
env = os.environ.copy()
try:
start_time = time.perf_counter()
result = subprocess.run(args, env=env, check=False)
elapsed_seconds = time.perf_counter() - start_time
# Fetch token usage from exo
usage: dict[str, Any] | None = None
try:
import httpx
usage_resp = httpx.get(f"http://{host}:{port}/v1/usage", timeout=5)
if usage_resp.status_code == 200:
usage_data: dict[str, Any] = usage_resp.json()
usage = usage_data
logger.info("--- Token Usage (Total) ---")
logger.info(
f" Requests: {usage_data.get('total_requests', 0)}"
)
logger.info(
f" Prompt tokens: {usage_data.get('total_prompt_tokens', 0)}"
)
logger.info(
f" Completion tokens: {usage_data.get('total_completion_tokens', 0)}"
)
logger.info(
f" Total tokens: {usage_data.get('total_tokens', 0)}"
)
except Exception:
pass # Usage endpoint not available
logger.info(f"LiveCodeBench evaluation completed in {elapsed_seconds:.2f}s")
return result.returncode, usage, elapsed_seconds
except FileNotFoundError:
logger.error(
"LiveCodeBench not found. Install with: "
"pip install livecodebench OR "
"git clone https://github.com/LiveCodeBench/LiveCodeBench && "
"cd LiveCodeBench && uv pip install -e ."
)
return 1, None, None
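# Illustrative [livecodebench] config section. The keys mirror the lookups
# above and the values shown are the in-code defaults, so a real config only
# needs the keys it wants to override:
#
#   [livecodebench]
#   scenario = "codegeneration"
#   release_version = "release_v5"
#   temperature = 0
#   n_samples = 1
#   max_tokens = 16384
#   use_cache = false
#   fast = true
#   evaluate = true
#   multiprocess = 4
#   timeout = 1800
#   openai_timeout = 3600
#   output_path = "bench/lcb_results"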
def run_custom_eval(
config: dict[str, Any],
host: str,
port: int,
model: str,
output_path: str | None,
dry_run: bool,
) -> tuple[int, dict[str, Any] | None, float | None]:
"""Run custom evaluation script."""
custom_config = config.get("custom", {})
script = custom_config.get("script")
if not script:
logger.error("No script specified in [custom] config section")
return 1, None, None
script_path = Path(script)
if not script_path.exists():
logger.error(f"Custom script not found: {script}")
return 1, None, None
script_args = custom_config.get("args", [])
if not isinstance(script_args, list):
script_args = [str(script_args)]
# Build environment with exo connection info
env = os.environ.copy()
env["EXO_HOST"] = host
env["EXO_PORT"] = str(port)
env["EXO_MODEL"] = model
if output_path:
env["EXO_OUTPUT_PATH"] = output_path
cmd = [sys.executable, str(script_path), *script_args]
logger.info(f"Custom eval command: {' '.join(cmd)}")
if dry_run:
logger.info("[dry-run] Would execute the above command")
return 0, None, None
start_time = time.perf_counter()
result = subprocess.run(cmd, env=env, check=False)
elapsed_seconds = time.perf_counter() - start_time
logger.info(f"Custom evaluation completed in {elapsed_seconds:.2f}s")
return result.returncode, None, elapsed_seconds
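# A script plugged in via [custom].script only needs to read the environment
# variables exported above (EXO_HOST, EXO_PORT, EXO_MODEL, and optionally
# EXO_OUTPUT_PATH). Minimal hypothetical sketch (my_eval.py is an assumed name,
# not part of this repo):
#
#   import os
#   import httpx
#
#   base_url = f"http://{os.environ['EXO_HOST']}:{os.environ['EXO_PORT']}/v1"
#   model = os.environ["EXO_MODEL"]
#   resp = httpx.post(
#       f"{base_url}/chat/completions",
#       json={"model": model, "messages": [{"role": "user", "content": "ping"}]},
#       timeout=60,
#   )
#   print(resp.json())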
def write_results_metadata(
output_path: str,
config: dict[str, Any],
host: str,
port: int,
model: str,
eval_type: EvalType,
return_code: int,
preview: dict[str, Any] | None,
usage: dict[str, Any] | None,
elapsed_seconds: float | None,
) -> None:
"""Write evaluation metadata to a JSON file."""
metadata: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"eval_type": eval_type,
"model": model,
"api_endpoint": f"http://{host}:{port}/v1",
"config": config,
"return_code": return_code,
}
if elapsed_seconds is not None:
metadata["elapsed_seconds"] = elapsed_seconds
if preview:
metadata["placement"] = {
"sharding": preview.get("sharding"),
"instance_meta": preview.get("instance_meta"),
"instance_id": instance_id_from_instance(preview["instance"])
if "instance" in preview
else None,
}
if usage:
metadata["token_usage"] = {
"prompt_tokens": usage.get("total_prompt_tokens", 0),
"completion_tokens": usage.get("total_completion_tokens", 0),
"reasoning_tokens": usage.get("total_reasoning_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
"total_requests": usage.get("total_requests", 0),
"by_model": usage.get("by_model"),
}
output_dir = Path(output_path)
output_dir.mkdir(parents=True, exist_ok=True)
metadata_path = output_dir / "eval_metadata.json"
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False, default=str)
logger.info(f"Wrote evaluation metadata to: {metadata_path}")
def main() -> int:
"""Main entry point for exo-eval."""
ap = argparse.ArgumentParser(
prog="exo-eval",
description="Evaluation harness for exo inference system.",
)
ap.add_argument(
"--config",
required=True,
help="Path to TOML configuration file",
)
ap.add_argument(
"--host",
default=os.environ.get("EXO_HOST", "localhost"),
help="exo API host (default: localhost or EXO_HOST env var)",
)
ap.add_argument(
"--port",
type=int,
default=int(os.environ.get("EXO_PORT", "52415")),
help="exo API port (default: 52415 or EXO_PORT env var)",
)
ap.add_argument(
"--model",
required=True,
help="Model name/ID to evaluate",
)
ap.add_argument(
"--output",
default=None,
help="Output path for results (overrides config)",
)
ap.add_argument(
"--limit",
type=int,
default=None,
help="Limit samples per task (overrides config, lm_eval only)",
)
ap.add_argument(
"--timeout",
type=float,
default=604800.0,
help="HTTP timeout in seconds (default: 604800 = 1 week)",
)
ap.add_argument(
"--skip-instance-setup",
action="store_true",
help="Skip instance creation (assume instance already running)",
)
ap.add_argument(
"--pipeline",
type=int,
default=None,
metavar="N",
help="Use pipeline sharding with exactly N nodes (overrides config)",
)
ap.add_argument(
"--instance-meta",
choices=["ring", "jaccl", "both"],
default=None,
help="Instance meta preference (overrides config)",
)
ap.add_argument(
"--dry-run",
action="store_true",
help="Print commands without executing",
)
args = ap.parse_args()
logger.info(f"exo-eval starting with config: {args.config}")
try:
config = load_config(args.config)
except FileNotFoundError as e:
logger.error(str(e))
return 1
except TOMLKitError as e:
logger.error(f"Failed to parse config: {e}")
return 1
eval_type = get_eval_type(config)
logger.info(f"Evaluation type: {eval_type}")
logger.info(f"Model: {args.model}")
logger.info(f"API endpoint: http://{args.host}:{args.port}/v1")
# Apply CLI overrides to instance config
if args.pipeline is not None or args.instance_meta is not None:
instance_config = config.setdefault("instance", {})
if args.pipeline is not None:
instance_config["sharding"] = "pipeline"
instance_config["min_nodes"] = args.pipeline
instance_config["max_nodes"] = args.pipeline
logger.info(f"CLI override: pipeline={args.pipeline} nodes")
# Limit concurrency for pipeline to avoid GPU timeouts
if args.pipeline >= 2:
lm_eval_config = config.setdefault("lm_eval", {})
lm_eval_config["num_concurrent"] = 4
logger.info("CLI override: num_concurrent=4 (pipeline>=2)")
if args.instance_meta is not None:
instance_config["instance_meta"] = args.instance_meta
logger.info(f"CLI override: instance_meta={args.instance_meta}")
# Check HuggingFace token if required
if not check_hf_token(config):
return 1
# Setup instance and resolve model
instance_id: str | None = None
preview: dict[str, Any] | None = None
client: ExoClient | None = None
if args.skip_instance_setup:
# Use model name as-is when skipping instance setup
full_model_id = args.model
logger.info(f"Using model: {full_model_id} (instance setup skipped)")
else:
client = ExoClient(args.host, args.port, timeout_s=args.timeout)
# Resolve model
try:
short_id, full_model_id = resolve_model_short_id(client, args.model)
logger.info(f"Resolved model: {short_id} -> {full_model_id}")
except Exception as e:
logger.error(f"Failed to resolve model: {e}")
return 1
instance_id, preview = setup_instance(
client, full_model_id, config, args.dry_run
)
if instance_id is None and not args.dry_run:
return 1
try:
# Run evaluation
usage: dict[str, Any] | None = None
elapsed_seconds: float | None = None
if eval_type == "lm_eval":
return_code, usage, elapsed_seconds = run_lm_eval(
config,
args.host,
args.port,
full_model_id,
args.output,
args.limit,
args.dry_run,
)
elif eval_type == "swe_bench":
return_code, usage, elapsed_seconds = run_swe_bench(
config,
args.host,
args.port,
full_model_id,
args.output,
args.dry_run,
)
elif eval_type == "livecodebench":
return_code, usage, elapsed_seconds = run_livecodebench(
config,
args.host,
args.port,
full_model_id,
args.output,
args.limit,
args.dry_run,
)
elif eval_type == "custom":
return_code, usage, elapsed_seconds = run_custom_eval(
config,
args.host,
args.port,
full_model_id,
args.output,
args.dry_run,
)
else:
logger.error(f"Unknown eval type: {eval_type}")
return 1
# Write metadata if output path specified and not dry-run
output_path = args.output or config.get(eval_type, {}).get("output_path")
if output_path and not args.dry_run:
write_results_metadata(
output_path,
config,
args.host,
args.port,
full_model_id,
eval_type,
return_code,
preview,
usage,
elapsed_seconds,
)
return return_code
finally:
# Teardown instance
if instance_id and client and not args.skip_instance_setup and not args.dry_run:
teardown_instance(client, instance_id)
if __name__ == "__main__":
raise SystemExit(main())