Files
LocalAI/backend/python/vllm/test.py
Richard Palethorpe 4916f8c880 feat(vllm): expose AsyncEngineArgs via generic engine_args YAML map (#9563)
* feat(vllm): expose AsyncEngineArgs via generic engine_args YAML map

LocalAI's vLLM backend wraps a small typed subset of vLLM's
AsyncEngineArgs (quantization, tensor_parallel_size, dtype, etc.).
Anything outside that subset -- pipeline/data/expert parallelism,
speculative_config, kv_transfer_config, all2all_backend, prefix
caching, chunked prefill, etc. -- requires a new protobuf field, a
Go struct field, an options.go line, and a backend.py mapping per
feature. That cadence is the bottleneck on shipping vLLM's
production feature set.

Add a generic `engine_args:` map on the model YAML that is
JSON-serialised into a new ModelOptions.EngineArgs proto field and
applied verbatim to AsyncEngineArgs at LoadModel time. Validation
is done by the Python backend via dataclasses.fields(); unknown
keys fail with the closest valid name as a hint.
dataclasses.replace() is used so vLLM's __post_init__ re-runs and
auto-converts dict values into nested config dataclasses
(CompilationConfig, AttentionConfig, ...). speculative_config and
kv_transfer_config flow through as dicts; vLLM converts them at
engine init.

Operators can now write:

  engine_args:
    data_parallel_size: 8
    enable_expert_parallel: true
    all2all_backend: deepep_low_latency
    speculative_config:
      method: deepseek_mtp
      num_speculative_tokens: 3
    kv_cache_dtype: fp8

without further proto/Go/Python plumbing per field.

Production defaults seeded by hooks_vllm.go: enable_prefix_caching
and enable_chunked_prefill default to true unless explicitly set.

Existing typed YAML fields (gpu_memory_utilization,
tensor_parallel_size, etc.) remain for back-compat; engine_args
overrides them when both are set.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* chore(vllm): pin cublas13 to vLLM 0.20.0 cu130 wheel

vLLM's PyPI wheel is built against CUDA 12 (libcudart.so.12) and won't
load on a cu130 host. Switch the cublas13 build to vLLM's per-tag cu130
simple-index (https://wheels.vllm.ai/0.20.0/cu130/) and pin
vllm==0.20.0. The cu130-flavoured wheel ships libcudart.so.13 and
includes the DFlash speculative-decoding method that landed in 0.20.0.

cublas13 install gets --index-strategy=unsafe-best-match so uv consults
both the cu130 index and PyPI when resolving — PyPI also publishes
vllm==0.20.0, but with cu12 binaries that error at import time.

Verified: Qwen3.5-4B + z-lab/Qwen3.5-4B-DFlash loads and serves chat
completions on RTX 5070 Ti (sm_120, cu130).

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* ci(vllm): bot job to bump cublas13 vLLM wheel pin

vLLM's cu130 wheel index URL is itself version-locked
(wheels.vllm.ai/<TAG>/cu130/, no /latest/ alias upstream), so a vLLM
bump means rewriting two values atomically — the URL segment and the
version constraint. bump_deps.sh handles git-sha-in-Makefile only;
add a sibling bump_vllm_wheel.sh and a matching workflow job that
mirrors the existing matrix's PR-creation pattern.

The bumper queries /releases/latest (which excludes prereleases),
strips the leading 'v', and seds both lines unconditionally. When the
file is already on the latest tag the rewrite is a no-op and
peter-evans/create-pull-request opens no PR.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* docs(vllm): document engine_args and speculative decoding

The new engine_args: map plumbs arbitrary AsyncEngineArgs through to
vLLM, but the public docs only covered the basic typed fields. Add a
short subsection in the vLLM section explaining the typed/generic
split and showing a worked DFlash speculative-decoding config, with
pointers to vLLM's SpeculativeConfig reference and z-lab's drafter
collection.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
Co-authored-by: Ettore Di Giacinto <mudler@users.noreply.github.com>
2026-04-29 00:49:28 +02:00

281 lines
11 KiB
Python

import unittest
import subprocess
import time
import backend_pb2
import backend_pb2_grpc
import grpc
import unittest
import subprocess
import time
import grpc
import backend_pb2_grpc
import backend_pb2
class TestBackendServicer(unittest.TestCase):
"""
TestBackendServicer is the class that tests the gRPC service.
This class contains methods to test the startup and shutdown of the gRPC service.
"""
def setUp(self):
self.service = subprocess.Popen(["python", "backend.py", "--addr", "localhost:50051"])
time.sleep(10)
def tearDown(self) -> None:
self.service.terminate()
self.service.wait()
def test_server_startup(self):
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.Health(backend_pb2.HealthMessage())
self.assertEqual(response.message, b'OK')
except Exception as err:
print(err)
self.fail("Server failed to start")
finally:
self.tearDown()
def test_load_model(self):
"""
This method tests if the model is loaded successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="facebook/opt-125m"))
self.assertTrue(response.success)
self.assertEqual(response.message, "Model loaded successfully")
except Exception as err:
print(err)
self.fail("LoadModel service failed")
finally:
self.tearDown()
def test_text(self):
"""
This method tests if the embeddings are generated successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="facebook/opt-125m"))
self.assertTrue(response.success)
req = backend_pb2.PredictOptions(Prompt="The capital of France is")
resp = stub.Predict(req)
self.assertIsNotNone(resp.message)
except Exception as err:
print(err)
self.fail("text service failed")
finally:
self.tearDown()
def test_sampling_params(self):
"""
This method tests if all sampling parameters are correctly processed
NOTE: this does NOT test for correctness, just that we received a compatible response
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="facebook/opt-125m"))
self.assertTrue(response.success)
req = backend_pb2.PredictOptions(
Prompt="The capital of France is",
TopP=0.8,
Tokens=50,
Temperature=0.7,
TopK=40,
PresencePenalty=0.1,
FrequencyPenalty=0.2,
RepetitionPenalty=1.1,
MinP=0.05,
Seed=42,
StopPrompts=["\n"],
StopTokenIds=[50256],
BadWords=["badword"],
IncludeStopStrInOutput=True,
IgnoreEOS=True,
MinTokens=5,
Logprobs=5,
PromptLogprobs=5,
SkipSpecialTokens=True,
SpacesBetweenSpecialTokens=True,
TruncatePromptTokens=10,
GuidedDecoding=True,
N=2,
)
resp = stub.Predict(req)
self.assertIsNotNone(resp.message)
self.assertIsNotNone(resp.logprobs)
except Exception as err:
print(err)
self.fail("sampling params service failed")
finally:
self.tearDown()
def test_messages_to_dicts(self):
"""
Tests _messages_to_dicts conversion of proto Messages to dicts.
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend import BackendServicer
servicer = BackendServicer()
msgs = [
backend_pb2.Message(role="user", content="hello"),
backend_pb2.Message(
role="assistant",
content="",
tool_calls='[{"id":"call_1","type":"function","function":{"name":"foo","arguments":"{}"}}]',
reasoning_content="thinking...",
),
backend_pb2.Message(role="tool", content="result", name="foo", tool_call_id="call_1"),
]
result = servicer._messages_to_dicts(msgs)
self.assertEqual(len(result), 3)
self.assertEqual(result[0], {"role": "user", "content": "hello"})
self.assertEqual(result[1]["reasoning_content"], "thinking...")
self.assertIsInstance(result[1]["tool_calls"], list)
self.assertEqual(result[1]["tool_calls"][0]["id"], "call_1")
self.assertEqual(result[2]["tool_call_id"], "call_1")
self.assertEqual(result[2]["name"], "foo")
def test_parse_options(self):
"""
Tests _parse_options correctly parses key:value strings.
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend import BackendServicer
servicer = BackendServicer()
opts = servicer._parse_options([
"tool_parser:hermes",
"reasoning_parser:deepseek_r1",
"invalid_no_colon",
"key_with_colons:a:b:c",
])
self.assertEqual(opts["tool_parser"], "hermes")
self.assertEqual(opts["reasoning_parser"], "deepseek_r1")
self.assertEqual(opts["key_with_colons"], "a:b:c")
self.assertNotIn("invalid_no_colon", opts)
def test_apply_engine_args_known_keys(self):
"""
Tests _apply_engine_args overlays user-supplied JSON onto AsyncEngineArgs.
"""
import sys, os, json as _json
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend import BackendServicer
from vllm.engine.arg_utils import AsyncEngineArgs
servicer = BackendServicer()
base = AsyncEngineArgs(model="facebook/opt-125m")
extras = _json.dumps({
"trust_remote_code": True,
"max_num_seqs": 32,
})
out = servicer._apply_engine_args(base, extras)
self.assertTrue(out.trust_remote_code)
self.assertEqual(out.max_num_seqs, 32)
# untouched fields preserved
self.assertEqual(out.model, "facebook/opt-125m")
def test_apply_engine_args_unknown_key_raises(self):
"""
Tests _apply_engine_args rejects unknown keys with a helpful suggestion.
"""
import sys, os, json as _json
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend import BackendServicer
from vllm.engine.arg_utils import AsyncEngineArgs
servicer = BackendServicer()
base = AsyncEngineArgs(model="facebook/opt-125m")
with self.assertRaises(ValueError) as ctx:
servicer._apply_engine_args(base, _json.dumps({"trustremotecode": True}))
self.assertIn("trustremotecode", str(ctx.exception))
# close-match hint for the typo
self.assertIn("trust_remote_code", str(ctx.exception))
def test_apply_engine_args_empty_passthrough(self):
"""
Tests that empty engine_args returns the base unchanged.
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend import BackendServicer
from vllm.engine.arg_utils import AsyncEngineArgs
servicer = BackendServicer()
base = AsyncEngineArgs(model="facebook/opt-125m")
self.assertIs(servicer._apply_engine_args(base, ""), base)
self.assertIs(servicer._apply_engine_args(base, None), base)
def test_tokenize_string(self):
"""
Tests the TokenizeString RPC returns valid tokens.
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="facebook/opt-125m"))
self.assertTrue(response.success)
resp = stub.TokenizeString(backend_pb2.PredictOptions(Prompt="Hello world"))
self.assertGreater(resp.length, 0)
self.assertEqual(len(resp.tokens), resp.length)
except Exception as err:
print(err)
self.fail("TokenizeString service failed")
finally:
self.tearDown()
def test_free(self):
"""
Tests the Free RPC doesn't crash.
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="facebook/opt-125m"))
self.assertTrue(response.success)
free_resp = stub.Free(backend_pb2.HealthMessage())
self.assertTrue(free_resp.success)
except Exception as err:
print(err)
self.fail("Free service failed")
finally:
self.tearDown()
def test_embedding(self):
"""
This method tests if the embeddings are generated successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="intfloat/e5-mistral-7b-instruct"))
self.assertTrue(response.success)
embedding_request = backend_pb2.PredictOptions(Embeddings="This is a test sentence.")
embedding_response = stub.Embedding(embedding_request)
self.assertIsNotNone(embedding_response.embeddings)
# assert that is a list of floats
self.assertIsInstance(embedding_response.embeddings, list)
# assert that the list is not empty
self.assertTrue(len(embedding_response.embeddings) > 0)
except Exception as err:
print(err)
self.fail("Embedding service failed")
finally:
self.tearDown()