mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-03-29 04:43:22 -04:00
- Added support for pagination (page and limit) in the session events endpoint. - Implemented sorting functionality based on specified columns and directions. - Introduced free-text search capability for session events. - Updated SQL queries to retrieve all events and added a new SQL constant for events. - Refactored GraphQL types and helpers to support new plugin and event queries. - Created new GraphQL resolvers for plugins and events with pagination and filtering. - Added comprehensive tests for new GraphQL endpoints and session events functionality.
434 lines
14 KiB
Python
434 lines
14 KiB
Python
import sys
|
|
import random
|
|
import pytest
|
|
|
|
INSTALL_PATH = "/app"
|
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
|
|
|
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
|
|
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def api_token():
|
|
return get_setting_value("API_TOKEN")
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
with app.test_client() as client:
|
|
yield client
|
|
|
|
|
|
@pytest.fixture
|
|
def test_mac():
|
|
# Generate a unique MAC for each test run
|
|
return "aa:bb:cc:" + ":".join(f"{random.randint(0, 255):02X}" for _ in range(3)).lower().lower()
|
|
|
|
|
|
def auth_headers(token):
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
def test_graphql_debug_get(client):
|
|
"""GET /graphql should return the debug string"""
|
|
resp = client.get("/graphql")
|
|
assert resp.status_code == 200
|
|
assert resp.data.decode() == "NetAlertX GraphQL server running."
|
|
|
|
|
|
def test_graphql_post_unauthorized(client):
|
|
"""POST /graphql without token should return 403"""
|
|
query = {"query": "{ devices { devName devMac } }"}
|
|
resp = client.post("/graphql", json=query)
|
|
assert resp.status_code == 403
|
|
assert "Unauthorized access attempt" in resp.json.get("message", "")
|
|
assert "Forbidden" in resp.json.get("error", "")
|
|
|
|
# --- DEVICES TESTS ---
|
|
|
|
|
|
def test_graphql_post_devices(client, api_token):
|
|
"""POST /graphql with a valid token should return device data"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
devices {
|
|
devices {
|
|
devGUID
|
|
devGroup
|
|
devIsRandomMac
|
|
devParentChildrenCount
|
|
}
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
|
|
body = resp.get_json()
|
|
|
|
# GraphQL spec: response always under "data"
|
|
assert "data" in body
|
|
data = body["data"]
|
|
|
|
assert "devices" in data
|
|
assert isinstance(data["devices"]["devices"], list)
|
|
assert isinstance(data["devices"]["count"], int)
|
|
|
|
|
|
# --- SETTINGS TESTS ---
|
|
def test_graphql_post_settings(client, api_token):
|
|
"""POST /graphql should return settings data"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
settings {
|
|
settings { setKey setValue setGroup }
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
data = resp.json.get("data", {})
|
|
assert "settings" in data
|
|
assert isinstance(data["settings"]["settings"], list)
|
|
|
|
|
|
# --- LANGSTRINGS TESTS ---
|
|
def test_graphql_post_langstrings_specific(client, api_token):
|
|
"""Retrieve a specific langString in a given language"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
langStrings(langCode: "en_us", langStringKey: "settings_other_scanners") {
|
|
langStrings { langCode langStringKey langStringText }
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
data = resp.json.get("data", {}).get("langStrings", {})
|
|
assert data["count"] >= 1
|
|
for entry in data["langStrings"]:
|
|
assert entry["langCode"] == "en_us"
|
|
assert entry["langStringKey"] == "settings_other_scanners"
|
|
assert isinstance(entry["langStringText"], str)
|
|
|
|
|
|
def test_graphql_post_langstrings_fallback(client, api_token):
|
|
"""Fallback to en_us if requested language string is empty"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
langStrings(langCode: "de_de", langStringKey: "settings_other_scanners") {
|
|
langStrings { langCode langStringKey langStringText }
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
data = resp.json.get("data", {}).get("langStrings", {})
|
|
assert data["count"] >= 1
|
|
# Ensure fallback occurred if de_de text is empty
|
|
for entry in data["langStrings"]:
|
|
assert entry["langStringText"] != ""
|
|
|
|
|
|
def test_graphql_post_langstrings_all_languages(client, api_token):
|
|
"""Retrieve all languages for a given key"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
enStrings: langStrings(langCode: "en_us", langStringKey: "settings_other_scanners") {
|
|
langStrings { langCode langStringKey langStringText }
|
|
count
|
|
}
|
|
deStrings: langStrings(langCode: "de_de", langStringKey: "settings_other_scanners") {
|
|
langStrings { langCode langStringKey langStringText }
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
data = resp.json.get("data", {})
|
|
assert "enStrings" in data
|
|
assert "deStrings" in data
|
|
# At least one string in each language
|
|
assert data["enStrings"]["count"] >= 1
|
|
assert data["deStrings"]["count"] >= 1
|
|
# Ensure langCode matches
|
|
assert all(e["langCode"] == "en_us" for e in data["enStrings"]["langStrings"])
|
|
|
|
|
|
def test_graphql_langstrings_excludes_languages_json(client, api_token):
|
|
"""languages.json must never appear as a language string entry (langCode='languages')"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
langStrings {
|
|
langStrings { langCode langStringKey langStringText }
|
|
count
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
all_strings = resp.json.get("data", {}).get("langStrings", {}).get("langStrings", [])
|
|
# No entry should have langCode == "languages" (i.e. from languages.json)
|
|
polluted = [s for s in all_strings if s.get("langCode") == "languages"]
|
|
assert polluted == [], (
|
|
f"languages.json leaked into langStrings as {len(polluted)} entries; "
|
|
"graphql_endpoint.py must exclude it from the directory scan"
|
|
)
|
|
|
|
|
|
# --- PLUGINS_OBJECTS TESTS ---
|
|
|
|
def test_graphql_plugins_objects_no_options(client, api_token):
|
|
"""pluginsObjects without options returns valid schema (entries list + count fields)"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
pluginsObjects {
|
|
dbCount
|
|
count
|
|
entries {
|
|
index
|
|
plugin
|
|
objectPrimaryId
|
|
status
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["pluginsObjects"]
|
|
assert isinstance(result["entries"], list)
|
|
assert isinstance(result["dbCount"], int)
|
|
assert isinstance(result["count"], int)
|
|
assert result["dbCount"] >= result["count"]
|
|
|
|
|
|
def test_graphql_plugins_objects_pagination(client, api_token):
|
|
"""pluginsObjects with limit=5 returns at most 5 entries and count reflects filter total"""
|
|
query = {
|
|
"query": """
|
|
query PluginsObjectsPaged($options: PluginQueryOptionsInput) {
|
|
pluginsObjects(options: $options) {
|
|
dbCount
|
|
count
|
|
entries { index plugin }
|
|
}
|
|
}
|
|
""",
|
|
"variables": {"options": {"page": 1, "limit": 5}}
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["pluginsObjects"]
|
|
assert len(result["entries"]) <= 5
|
|
assert result["count"] >= len(result["entries"])
|
|
|
|
|
|
def test_graphql_plugins_events_no_options(client, api_token):
|
|
"""pluginsEvents without options returns valid schema"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
pluginsEvents {
|
|
dbCount
|
|
count
|
|
entries { index plugin objectPrimaryId dateTimeCreated }
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["pluginsEvents"]
|
|
assert isinstance(result["entries"], list)
|
|
assert isinstance(result["count"], int)
|
|
|
|
|
|
def test_graphql_plugins_history_no_options(client, api_token):
|
|
"""pluginsHistory without options returns valid schema"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
pluginsHistory {
|
|
dbCount
|
|
count
|
|
entries { index plugin watchedValue1 }
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["pluginsHistory"]
|
|
assert isinstance(result["entries"], list)
|
|
assert isinstance(result["count"], int)
|
|
|
|
|
|
def test_graphql_plugins_hard_cap(client, api_token):
|
|
"""limit=99999 is clamped server-side to at most 1000 entries"""
|
|
query = {
|
|
"query": """
|
|
query PluginsHardCap($options: PluginQueryOptionsInput) {
|
|
pluginsObjects(options: $options) {
|
|
count
|
|
entries { index }
|
|
}
|
|
}
|
|
""",
|
|
"variables": {"options": {"page": 1, "limit": 99999}}
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
entries = body["data"]["pluginsObjects"]["entries"]
|
|
assert len(entries) <= 1000, f"Hard cap violated: got {len(entries)} entries"
|
|
|
|
|
|
# --- EVENTS TESTS ---
|
|
|
|
def test_graphql_events_no_options(client, api_token):
|
|
"""events without options returns valid schema (entries list + count fields)"""
|
|
query = {
|
|
"query": """
|
|
{
|
|
events {
|
|
dbCount
|
|
count
|
|
entries {
|
|
eveMac
|
|
eveIp
|
|
eveDateTime
|
|
eveEventType
|
|
eveAdditionalInfo
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["events"]
|
|
assert isinstance(result["entries"], list)
|
|
assert isinstance(result["count"], int)
|
|
assert isinstance(result["dbCount"], int)
|
|
|
|
|
|
def test_graphql_events_filter_by_mac(client, api_token):
|
|
"""events filtered by eveMac='00:00:00:00:00:00' returns only that MAC (or empty)"""
|
|
query = {
|
|
"query": """
|
|
query EventsByMac($options: EventQueryOptionsInput) {
|
|
events(options: $options) {
|
|
count
|
|
entries { eveMac eveEventType eveDateTime }
|
|
}
|
|
}
|
|
""",
|
|
"variables": {"options": {"eveMac": "00:00:00:00:00:00", "limit": 50}}
|
|
}
|
|
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
body = resp.get_json()
|
|
assert "errors" not in body
|
|
result = body["data"]["events"]
|
|
for entry in result["entries"]:
|
|
assert entry["eveMac"].upper() == "00:00:00:00:00:00", (
|
|
f"MAC filter leaked a non-matching row: {entry['eveMac']}"
|
|
)
|
|
|
|
|
|
# --- PLUGIN FILTER SCOPING TESTS ---
|
|
|
|
def test_graphql_plugins_objects_dbcount_scoped_to_plugin(client, api_token):
|
|
"""dbCount should reflect only the rows for the requested plugin, not the entire table."""
|
|
# First, get the unscoped total
|
|
query_all = {
|
|
"query": "{ pluginsObjects { dbCount count } }"
|
|
}
|
|
resp_all = client.post("/graphql", json=query_all, headers=auth_headers(api_token))
|
|
assert resp_all.status_code == 200
|
|
total_all = resp_all.get_json()["data"]["pluginsObjects"]["dbCount"]
|
|
|
|
# Now request a non-existent plugin — dbCount must be 0
|
|
query_fake = {
|
|
"query": """
|
|
query Scoped($options: PluginQueryOptionsInput) {
|
|
pluginsObjects(options: $options) { dbCount count entries { plugin } }
|
|
}
|
|
""",
|
|
"variables": {"options": {"plugin": "NONEXISTENT_PLUGIN_XYZ"}}
|
|
}
|
|
resp_fake = client.post("/graphql", json=query_fake, headers=auth_headers(api_token))
|
|
assert resp_fake.status_code == 200
|
|
body_fake = resp_fake.get_json()
|
|
assert "errors" not in body_fake
|
|
result_fake = body_fake["data"]["pluginsObjects"]
|
|
assert result_fake["dbCount"] == 0, (
|
|
f"dbCount should be 0 for non-existent plugin, got {result_fake['dbCount']}"
|
|
)
|
|
assert result_fake["count"] == 0
|
|
assert result_fake["entries"] == []
|
|
|
|
|
|
def test_graphql_plugins_objects_scoped_entries_match_plugin(client, api_token):
|
|
"""When filtering by plugin, all returned entries must belong to that plugin."""
|
|
# Get first available plugin prefix from the unscoped query
|
|
query_sample = {
|
|
"query": "{ pluginsObjects(options: {page: 1, limit: 1}) { entries { plugin } } }"
|
|
}
|
|
resp = client.post("/graphql", json=query_sample, headers=auth_headers(api_token))
|
|
assert resp.status_code == 200
|
|
entries = resp.get_json()["data"]["pluginsObjects"]["entries"]
|
|
if not entries:
|
|
pytest.skip("No plugin objects in database")
|
|
target = entries[0]["plugin"]
|
|
|
|
# Query scoped to that plugin
|
|
query_scoped = {
|
|
"query": """
|
|
query Scoped($options: PluginQueryOptionsInput) {
|
|
pluginsObjects(options: $options) { dbCount count entries { plugin } }
|
|
}
|
|
""",
|
|
"variables": {"options": {"plugin": target, "page": 1, "limit": 100}}
|
|
}
|
|
resp2 = client.post("/graphql", json=query_scoped, headers=auth_headers(api_token))
|
|
assert resp2.status_code == 200
|
|
result = resp2.get_json()["data"]["pluginsObjects"]
|
|
assert result["dbCount"] > 0
|
|
for entry in result["entries"]:
|
|
assert entry["plugin"].upper() == target.upper(), (
|
|
f"Plugin filter leaked: expected {target}, got {entry['plugin']}"
|
|
)
|