#!/usr/bin/env python3
"""
Pytest-based Mount Diagnostic Tests for NetAlertX

Tests all possible mount configurations for each path to validate the diagnostic tool.
Uses the pytest framework for proper test discovery and execution.

All tests use the mounts table. For reference, the mounts table looks like this:

Path                                 | Writeable | Mount | RAMDisk | Performance | DataLoss
-------------------------------------+-----------+-------+---------+-------------+----------
/app/db                              | ✅        | ❌    | ➖      | ➖          | ❌
/app/config                          | ✅        | ❌    | ➖      | ➖          | ❌
/app/api                             | ✅        | ❌    | ❌      | ❌          | ✅
/app/log                             | ✅        | ❌    | ❌      | ❌          | ✅
/services/run                        | ✅        | ❌    | ❌      | ❌          | ✅
/services/config/nginx/conf.active   | ✅        | ❌    | ❌      | ❌          | ✅

Table Assertions:
- Use assert_table_row(output, path, writeable=True/False/None, mount=True/False/None, ...)
- Emojis are converted: ✅=True, ❌=False, ➖=None
- Example: assert_table_row(output, "/app/db", writeable=True, mount=False, dataloss=False)
"""

import os
import subprocess
import pytest
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

# Test configurations directory
CONFIG_DIR = Path(__file__).parent / "configurations"

@dataclass
class MountTableRow:
    """Represents a parsed row from the mount diagnostic table."""
    path: str
    writeable: bool
    mount: bool
    ramdisk: Optional[bool]  # None for ➖
    performance: Optional[bool]  # None for ➖
    dataloss: bool

def parse_mount_table(output: str) -> List[MountTableRow]:
    """Parse the mount diagnostic table from stdout."""
    rows = []

    # Find the table in the output
    lines = output.split('\n')
    table_start = None

    for i, line in enumerate(lines):
        if line.startswith(' Path ') and '|' in line:
            table_start = i
            break

    if table_start is None:
        return rows

    # Skip header and separator lines
    data_lines = lines[table_start + 2:]

    for line in data_lines:
        if '|' not in line or line.strip() == '':
            continue

        # Split by | and clean up
        parts = [part.strip() for part in line.split('|')]
        if len(parts) < 6:
            continue

        path = parts[0]
        if not path:
            continue

        # Convert emojis to boolean/None
        def emoji_to_bool(emoji: str) -> Optional[bool]:
            emoji = emoji.strip()
            if emoji == '✅':
                return True
            elif emoji == '❌':
                return False
            elif emoji == '➖':
                return None
            return None

        try:
            row = MountTableRow(
                path=path,
                writeable=emoji_to_bool(parts[1]),
                mount=emoji_to_bool(parts[2]),
                ramdisk=emoji_to_bool(parts[3]),
                performance=emoji_to_bool(parts[4]),
                dataloss=emoji_to_bool(parts[5])
            )
            rows.append(row)
        except (IndexError, ValueError):
            continue

    return rows

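# Illustrative example (not part of the original suite): given diagnostic output such as
#
#    " Path    | Writeable | Mount | RAMDisk | Performance | DataLoss"
#    "---------+-----------+-------+---------+-------------+----------"
#    " /app/db | ✅        | ❌    | ➖      | ➖          | ❌"
#
# parse_mount_table() returns a single MountTableRow(path="/app/db", writeable=True,
# mount=False, ramdisk=None, performance=None, dataloss=False). Note that the header
# line must begin with " Path " for the table to be located at all.
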
def assert_table_row(output: str, expected_path: str,
                     writeable: Optional[bool] = None,
                     mount: Optional[bool] = None,
                     ramdisk: Optional[bool] = None,
                     performance: Optional[bool] = None,
                     dataloss: Optional[bool] = None) -> MountTableRow:
    """Assert that a specific table row matches expected values."""

    rows = parse_mount_table(output)

    # Find the row for the expected path
    matching_row = None
    for row in rows:
        if row.path == expected_path:
            matching_row = row
            break

    assert matching_row is not None, f"Path '{expected_path}' not found in table. Available paths: {[r.path for r in rows]}"

    # Check each field if specified
    if writeable is not None:
        assert matching_row.writeable == writeable, f"Path '{expected_path}': expected writeable={writeable}, got {matching_row.writeable}"

    if mount is not None:
        assert matching_row.mount == mount, f"Path '{expected_path}': expected mount={mount}, got {matching_row.mount}"

    if ramdisk is not None:
        assert matching_row.ramdisk == ramdisk, f"Path '{expected_path}': expected ramdisk={ramdisk}, got {matching_row.ramdisk}"

    if performance is not None:
        assert matching_row.performance == performance, f"Path '{expected_path}': expected performance={performance}, got {matching_row.performance}"

    if dataloss is not None:
        assert matching_row.dataloss == dataloss, f"Path '{expected_path}': expected dataloss={dataloss}, got {matching_row.dataloss}"

    return matching_row

@dataclass
class TestScenario:
    """Represents a test scenario for a specific path configuration."""
    __test__ = False  # Prevent pytest from collecting this as a test class
    name: str
    path_var: str
    container_path: str
    is_persistent: bool
    docker_compose: str
    expected_issues: List[str]  # List of expected issue types
    expected_exit_code: int  # Expected container exit code

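# Illustrative example of one scenario produced by create_test_scenarios() below
# (values derived from the path and scenario tables there, shown only as a reading aid):
#
#   TestScenario(
#       name="db_no-mount",
#       path_var="NETALERTX_DB",
#       container_path="/app/db",
#       is_persistent=True,
#       docker_compose="docker-compose.mount-test.db_no-mount.yml",
#       expected_issues=["table_issues", "warning_message"],
#       expected_exit_code=0,
#   )
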
@pytest.fixture(scope="session")
def netalertx_test_image():
    """Ensure the netalertx-test image exists."""
    image_name = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")

    # Check if image exists
    result = subprocess.run(
        ["docker", "images", "-q", image_name],
        capture_output=True,
        text=True
    )

    if not result.stdout.strip():
        pytest.skip(f"NetAlertX test image '{image_name}' not found. Build it first.")

    return image_name

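# The image name can be overridden per run via the environment variable read above;
# for example (the tag shown here is an assumption, use whatever tag the image was
# actually built with):
#
#   NETALERTX_TEST_IMAGE=netalertx-test:local pytest -m docker
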
@pytest.fixture
def test_scenario(request):
    """Fixture that provides test scenarios."""
    return request.param

def create_test_scenarios() -> List[TestScenario]:
    """Create all test scenarios."""

    scenarios = []

    # Define paths to test: (name, container_path, is_persistent, env_var)
    paths = [
        ("db", "/app/db", True, "NETALERTX_DB"),
        ("config", "/app/config", True, "NETALERTX_CONFIG"),
        ("api", "/app/api", False, "NETALERTX_API"),
        ("log", "/app/log", False, "NETALERTX_LOG"),
        ("run", "/services/run", False, "SYSTEM_SERVICES_RUN"),
        ("active_config", "/services/config/nginx/conf.active", False, "SYSTEM_SERVICES_ACTIVE_CONFIG"),
    ]

    # Test scenarios for each path: (scenario_name, base expected issues)
    test_scenarios = [
        ("no-mount", ["table_issues", "warning_message"]),    # Always issues
        ("ramdisk", []),                                      # Good for non-persistent, bad for persistent
        ("mounted", ["table_issues", "warning_message"]),     # Bad for non-persistent, good for persistent
        ("unwritable", ["table_issues", "warning_message"]),  # Always issues
    ]

    for path_name, container_path, is_persistent, env_var in paths:
        for scenario_name, base_expected_issues in test_scenarios:
            # Adjust expected issues based on persistence and scenario
            expected_issues = list(base_expected_issues)  # Copy the list

            if scenario_name == "ramdisk" and is_persistent:
                # Ramdisk is bad for persistent paths
                expected_issues = ["table_issues", "warning_message"]
            elif scenario_name == "mounted" and is_persistent:
                # Mounted is good for persistent paths
                expected_issues = []
            elif path_name == "active_config" and scenario_name == "unwritable":
                # active_config unwritable: RAM disk issues detected
                expected_issues = ["table_issues", "warning_message"]

            compose_file = f"docker-compose.mount-test.{path_name}_{scenario_name}.yml"

            # Determine expected exit code
            expected_exit_code = 1 if scenario_name == "unwritable" else 0

            scenarios.append(TestScenario(
                name=f"{path_name}_{scenario_name}",
                path_var=env_var,
                container_path=container_path,
                is_persistent=is_persistent,
                docker_compose=compose_file,
                expected_issues=expected_issues,
                expected_exit_code=expected_exit_code
            ))

    return scenarios

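# Summary of the expectations encoded above (derived from the loops, shown here only
# as a reading aid):
#
#   scenario    | persistent path (db, config)  | non-persistent path (api, log, run, active_config)
#   ------------+--------------------------------+----------------------------------------------------
#   no-mount    | issues expected                | issues expected
#   ramdisk     | issues expected                | no issues expected
#   mounted     | no issues expected             | issues expected
#   unwritable  | issues expected, exit code 1   | issues expected, exit code 1
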
def validate_scenario_table_output(output: str, test_scenario: TestScenario) -> None:
    """Validate the diagnostic table for scenarios that should report issues."""

    if not test_scenario.expected_issues:
        return

    try:
        if test_scenario.name.startswith('db_'):
            if test_scenario.name == 'db_ramdisk':
                # db on ramdisk: mount=True, ramdisk=False (detected), dataloss=False (risk)
                assert_table_row(output, '/app/db', mount=True, ramdisk=False, dataloss=False)
            elif test_scenario.name == 'db_no-mount':
                # db not mounted: mount=False, dataloss=False (risk)
                assert_table_row(output, '/app/db', mount=False, dataloss=False)
            elif test_scenario.name == 'db_unwritable':
                # db read-only: writeable=False
                assert_table_row(output, '/app/db', writeable=False)

        elif test_scenario.name.startswith('config_'):
            if test_scenario.name == 'config_ramdisk':
                # config on ramdisk: mount=True, ramdisk=False (detected), dataloss=False (risk)
                assert_table_row(output, '/app/config', mount=True, ramdisk=False, dataloss=False)
            elif test_scenario.name == 'config_no-mount':
                # config not mounted: mount=False, dataloss=False (risk)
                assert_table_row(output, '/app/config', mount=False, dataloss=False)
            elif test_scenario.name == 'config_unwritable':
                # config read-only: writeable=False
                assert_table_row(output, '/app/config', writeable=False)

        elif test_scenario.name.startswith('api_'):
            if test_scenario.name == 'api_mounted':
                # api with volume mount: mount=True, performance=False (not ramdisk)
                assert_table_row(output, '/app/api', mount=True, performance=False)
            elif test_scenario.name == 'api_no-mount':
                # api not mounted: mount=False, performance=False (not ramdisk)
                assert_table_row(output, '/app/api', mount=False, performance=False)
            elif test_scenario.name == 'api_unwritable':
                # api read-only: writeable=False
                assert_table_row(output, '/app/api', writeable=False)

        elif test_scenario.name.startswith('log_'):
            if test_scenario.name == 'log_mounted':
                # log with volume mount: mount=True, performance=False (not ramdisk)
                assert_table_row(output, '/app/log', mount=True, performance=False)
            elif test_scenario.name == 'log_no-mount':
                # log not mounted: mount=False, performance=False (not ramdisk)
                assert_table_row(output, '/app/log', mount=False, performance=False)
            elif test_scenario.name == 'log_unwritable':
                # log read-only: writeable=False
                assert_table_row(output, '/app/log', writeable=False)

        elif test_scenario.name.startswith('run_'):
            if test_scenario.name == 'run_mounted':
                # run with volume mount: mount=True, performance=False (not ramdisk)
                assert_table_row(output, '/services/run', mount=True, performance=False)
            elif test_scenario.name == 'run_no-mount':
                # run not mounted: mount=False, performance=False (not ramdisk)
                assert_table_row(output, '/services/run', mount=False, performance=False)
            elif test_scenario.name == 'run_unwritable':
                # run read-only: writeable=False
                assert_table_row(output, '/services/run', writeable=False)

        elif test_scenario.name.startswith('active_config_'):
            if test_scenario.name == 'active_config_mounted':
                # active_config with volume mount: mount=True, performance=False (not ramdisk)
                assert_table_row(output, '/services/config/nginx/conf.active', mount=True, performance=False)
            elif test_scenario.name == 'active_config_no-mount':
                # active_config not mounted: mount=False, performance=False (not ramdisk)
                assert_table_row(output, '/services/config/nginx/conf.active', mount=False, performance=False)
            elif test_scenario.name == 'active_config_unwritable':
                # active_config unwritable: RAM disk issues detected
                assert_table_row(output, '/services/config/nginx/conf.active', ramdisk=False, performance=False)

    except AssertionError as e:
        pytest.fail(f"Table validation failed for {test_scenario.name}: {e}")

@pytest.mark.parametrize("test_scenario", create_test_scenarios(), ids=lambda s: s.name)
@pytest.mark.docker
def test_mount_diagnostic(netalertx_test_image, test_scenario):
    """Test that the mount diagnostic tool correctly identifies issues for each configuration."""

    # Use the pre-generated docker-compose file
    compose_file = CONFIG_DIR / "mount-tests" / test_scenario.docker_compose
    assert compose_file.exists(), f"Docker compose file not found: {compose_file}"

    # Start container
    project_name = f"mount-test-{test_scenario.name.replace('_', '-')}"

    # Remove any existing containers with the same project name
    cmd_down = [
        "docker-compose", "-f", str(compose_file),
        "-p", project_name, "down", "-v"
    ]
    subprocess.run(cmd_down, capture_output=True, timeout=30)

    cmd_up = [
        "docker-compose", "-f", str(compose_file),
        "-p", project_name, "up", "-d"
    ]

    result_up = subprocess.run(cmd_up, capture_output=True, text=True, timeout=60)
    if result_up.returncode != 0:
        pytest.fail(
            f"Failed to start container: {result_up.stderr}\n"
            f"STDOUT: {result_up.stdout}"
        )

    try:
        # Wait for container to be ready
        import time
        time.sleep(4)

        # Check if container is still running
        container_name = f"netalertx-test-mount-{test_scenario.name}"
        result_ps = subprocess.run(
            ["docker", "ps", "-q", "-f", f"name={container_name}"],
            capture_output=True, text=True
        )

        if not result_ps.stdout.strip():
            # Container exited - check the exit code
            result_inspect = subprocess.run(
                ["docker", "inspect", container_name, "--format={{.State.ExitCode}}"],
                capture_output=True, text=True
            )
            actual_exit_code = int(result_inspect.stdout.strip())

            # Assert the exit code matches expected
            assert actual_exit_code == test_scenario.expected_exit_code, (
                f"Container {container_name} exited with code {actual_exit_code}, "
                f"expected {test_scenario.expected_exit_code}"
            )

            # Check the logs to see if it detected the expected issues
            result_logs = subprocess.run(
                ["docker", "logs", container_name],
                capture_output=True, text=True
            )

            logs = result_logs.stdout + result_logs.stderr

            if test_scenario.expected_issues:
                validate_scenario_table_output(logs, test_scenario)

            return  # Test passed - container correctly detected issues and exited

        # Container is still running - run diagnostic tool
        cmd_exec = [
            "docker", "exec", "--user", "netalertx", container_name,
            "python3", "/entrypoint.d/10-mounts.py"
        ]
        result_exec = subprocess.run(cmd_exec, capture_output=True, text=True, timeout=30)
        diagnostic_output = result_exec.stdout + result_exec.stderr

        # The diagnostic tool returns 1 for unwritable paths except active_config, which only warns
        if test_scenario.name.startswith('active_config_') and 'unwritable' in test_scenario.name:
            expected_tool_exit = 0
        elif 'unwritable' in test_scenario.name:
            expected_tool_exit = 1
        else:
            expected_tool_exit = 0

        assert result_exec.returncode == expected_tool_exit, (
            f"Diagnostic tool failed: {result_exec.stderr}"
        )

        if test_scenario.expected_issues:
            validate_scenario_table_output(diagnostic_output, test_scenario)
            assert "⚠️" in diagnostic_output, (
                f"Issue scenario {test_scenario.name} should include a warning symbol, got: {result_exec.stderr}"
            )
        else:
            # Should have table output but no warning message
            assert "Path" in result_exec.stdout, f"Good config {test_scenario.name} should show table, got: {result_exec.stdout}"
            assert "⚠️" not in diagnostic_output, (
                f"Good config {test_scenario.name} should not show warning, got stderr: {result_exec.stderr}"
            )

        return  # Test passed - diagnostic output validated

    finally:
        # Stop container
        cmd_down = [
            "docker-compose", "-f", str(compose_file),
            "-p", project_name, "down", "-v"
        ]
        subprocess.run(cmd_down, capture_output=True, timeout=30)

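# Hedged usage sketch: a single scenario can be selected by its parametrize id, which
# is the TestScenario.name supplied via the ids= lambda above. For example (the module
# path below is an assumption, adjust it to wherever this file actually lives):
#
#   pytest "tests/test_mount_diagnostics.py::test_mount_diagnostic[db_unwritable]" -v
#
# Running `pytest -m docker` selects all Docker-backed tests in this module at once.
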
def test_table_parsing():
    """Test the table parsing and assertion functions."""

    sample_output = """
 Path                                | Writeable | Mount | RAMDisk | Performance | DataLoss
-------------------------------------+-----------+-------+---------+-------------+----------
 /app/db                             | ✅        | ❌    | ➖      | ➖          | ❌
 /app/api                            | ✅        | ✅    | ✅      | ✅          | ✅
"""

    # Test parsing
    rows = parse_mount_table(sample_output)
    assert len(rows) == 2

    # Test assertions
    assert_table_row(sample_output, "/app/db", writeable=True, mount=False, ramdisk=None, performance=None, dataloss=False)
    assert_table_row(sample_output, "/app/api", writeable=True, mount=True, ramdisk=True, performance=True, dataloss=True)
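
# Note: test_table_parsing needs no Docker daemon or image; it exercises only the
# parsing/assertion helpers above and can be run standalone, e.g. with
# `pytest -k test_table_parsing`.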