Files
NetAlertX/test/docker_tests/test_container_environment.py
2025-12-22 02:08:50 +00:00

1260 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
This set of tests requires netalertx-test image built. Ensure netalertx-test image is built prior
to starting these tests or they will fail. netalertx-test image is generally rebuilt using the
Build Unit Test Docker Image task, but can be created manually with the following command executed
in the workspace:
docker buildx build -t netalertx-test .
"""
import os
import pathlib
import shutil
import subprocess
import uuid
import re
import pytest
# Docker image under test; override with NETALERTX_TEST_IMAGE.
IMAGE = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
# Seconds to let the entrypoint run before tests send SIGTERM.
GRACE_SECONDS = float(os.environ.get("NETALERTX_TEST_GRACE", "2"))
# Capabilities re-added after `--cap-drop ALL` (network scanning needs these).
DEFAULT_CAPS = ["NET_RAW", "NET_ADMIN", "NET_BIND_SERVICE"]
# Upper bound for every helper `subprocess.run` call in this module.
SUBPROCESS_TIMEOUT_SECONDS = float(os.environ.get("NETALERTX_TEST_SUBPROCESS_TIMEOUT", "60"))
# Logical mount key -> path inside the container. Several keys alias the same
# target (e.g. "app_db"/"data_db") so tests can use either naming style.
CONTAINER_TARGETS: dict[str, str] = {
    "data": "/data",
    "app_db": "/data/db",
    "data_db": "/data/db",
    "app_config": "/data/config",
    "data_config": "/data/config",
    "app_log": "/tmp/log",
    "log": "/tmp/log",
    "app_api": os.environ.get("NETALERTX_API", "/tmp/api"),
    "api": os.environ.get("NETALERTX_API", "/tmp/api"),
    "nginx_conf": "/tmp/nginx/active-config",
    "nginx_active": "/tmp/nginx/active-config",
    "services_run": "/tmp/run",
}
# Keys that live under the /data root on the host tree.
DATA_SUBDIR_KEYS = ("app_db", "app_config")
# Keys whose targets live under /tmp inside the container (ephemeral mounts).
OPTIONAL_TMP_KEYS = ("app_log", "app_api", "nginx_conf", "services_run")
# Alias kept for readability at call sites (VOLUME_MAP["app_db"] etc.).
VOLUME_MAP = CONTAINER_TARGETS
# All tests in this module require docker and exercise the full feature set.
pytestmark = [pytest.mark.docker, pytest.mark.feature_complete]
def _unique_label(prefix: str) -> str:
return f"{prefix.upper()}__NETALERTX_INTENTIONAL__{uuid.uuid4().hex[:6]}"
def _repo_root() -> pathlib.Path:
env = os.environ.get("NETALERTX_REPO_ROOT")
if env:
return pathlib.Path(env)
cur = pathlib.Path(__file__).resolve()
for parent in cur.parents:
if any(
[
(parent / "pyproject.toml").exists(),
(parent / ".git").exists(),
(parent / "back").exists() and (parent / "db").exists(),
]
):
return parent
return cur.parents[2]
def _docker_visible_tmp_root() -> pathlib.Path:
"""Return a docker-daemon-visible scratch directory for bind mounts.
Pytest's default tmp_path lives under /tmp inside the devcontainer, which may
not be visible to the Docker daemon that evaluates bind mount source paths.
We use /tmp/pytest-docker-mounts instead of the repo.
"""
root = pathlib.Path("/tmp/pytest-docker-mounts")
root.mkdir(parents=True, exist_ok=True)
try:
root.chmod(0o777)
except PermissionError:
# Best-effort; the directory only needs to be writable by the current user.
pass
return root
def _docker_visible_path(path: pathlib.Path) -> pathlib.Path:
    """Map *path* into `_docker_visible_tmp_root()` when it lives under /tmp."""
    try:
        in_tmp = str(path).startswith("/tmp/")
        mapped = _docker_visible_tmp_root() / path.name if in_tmp else path
    except Exception:
        # Defensive: on any mapping failure, fall back to the original path.
        mapped = path
    return mapped
def _setup_mount_tree(
    tmp_path: pathlib.Path,
    prefix: str,
    *,
    seed_config: bool = True,
    seed_db: bool = True,
) -> dict[str, pathlib.Path]:
    """Create a compose-like host tree with permissive perms for arbitrary UID/GID.

    Builds a uniquely-labelled directory tree under the docker-visible scratch
    root (the `tmp_path` argument is intentionally unused — pytest tmp dirs may
    not be visible to the Docker daemon). Optionally seeds app.conf / app.db
    from the repo so the container finds existing data.

    Returns:
        Mapping of mount keys (see CONTAINER_TARGETS) to host paths; several
        keys alias the same directory (e.g. "app_db" and "data_db").
    """
    label = _unique_label(prefix)
    base = _docker_visible_tmp_root() / f"{label}_MOUNT_ROOT"
    base.mkdir()
    base.chmod(0o777)
    paths: dict[str, pathlib.Path] = {}
    data_root = base / f"{label}_DATA_INTENTIONAL_NETALERTX_TEST"
    data_root.mkdir(parents=True, exist_ok=True)
    data_root.chmod(0o777)
    paths["data"] = data_root
    db_dir = data_root / "db"
    db_dir.mkdir(exist_ok=True)
    db_dir.chmod(0o777)
    paths["app_db"] = db_dir
    paths["data_db"] = db_dir
    config_dir = data_root / "config"
    config_dir.mkdir(exist_ok=True)
    config_dir.chmod(0o777)
    paths["app_config"] = config_dir
    paths["data_config"] = config_dir
    for key in OPTIONAL_TMP_KEYS:
        folder_name = f"{label}_{key.upper()}_INTENTIONAL_NETALERTX_TEST"
        host_path = base / folder_name
        host_path.mkdir(parents=True, exist_ok=True)
        host_path.chmod(0o777)
        paths[key] = host_path
        # Register the short aliases used alongside CONTAINER_TARGETS keys.
        if key == "app_log":
            paths["log"] = host_path
        elif key == "app_api":
            paths["api"] = host_path
        elif key == "nginx_conf":
            paths["nginx_active"] = host_path
    repo_root = _repo_root()
    if seed_config:
        # Best-effort seeding: skip silently when the repo copy is absent.
        config_src = repo_root / "back" / "app.conf"
        config_dst = paths["app_config"] / "app.conf"
        if config_src.exists():
            shutil.copyfile(config_src, config_dst)
            config_dst.chmod(0o666)
    if seed_db:
        db_src = repo_root / "db" / "app.db"
        db_dst = paths["app_db"] / "app.db"
        if db_src.exists():
            shutil.copyfile(db_src, db_dst)
            db_dst.chmod(0o666)
    # Ensure every mount point is world-writable so arbitrary UID/GID can write
    for p in paths.values():
        if p.is_dir():
            p.chmod(0o777)
            for child in p.iterdir():
                if child.is_dir():
                    child.chmod(0o777)
                else:
                    child.chmod(0o666)
        else:
            p.chmod(0o666)
    return paths
def _setup_fixed_mount_tree(base: pathlib.Path) -> dict[str, pathlib.Path]:
    """Create a mount tree at a fixed (non-unique) base, wiping any prior tree.

    Same layout as `_setup_mount_tree` but without unique labels and without
    seeding app.conf/app.db, for tests that need deterministic host paths.

    Returns:
        Mapping of mount keys to host paths (with the same short aliases).
    """
    base = _docker_visible_path(base)
    # Start from a clean slate so stale files from previous runs cannot leak in.
    if base.exists():
        shutil.rmtree(base)
    base.mkdir(parents=True)
    try:
        base.chmod(0o777)
    except PermissionError:
        pass
    paths: dict[str, pathlib.Path] = {}
    data_root = base / "DATA_NETALERTX_TEST"
    data_root.mkdir(parents=True, exist_ok=True)
    data_root.chmod(0o777)
    paths["data"] = data_root
    db_dir = data_root / "db"
    db_dir.mkdir(exist_ok=True)
    db_dir.chmod(0o777)
    paths["app_db"] = db_dir
    paths["data_db"] = db_dir
    config_dir = data_root / "config"
    config_dir.mkdir(exist_ok=True)
    config_dir.chmod(0o777)
    paths["app_config"] = config_dir
    paths["data_config"] = config_dir
    for key in OPTIONAL_TMP_KEYS:
        host_path = base / f"{key.upper()}_NETALERTX_TEST"
        host_path.mkdir(parents=True, exist_ok=True)
        host_path.chmod(0o777)
        paths[key] = host_path
        # Register the short aliases used alongside CONTAINER_TARGETS keys.
        if key == "app_log":
            paths["log"] = host_path
        elif key == "app_api":
            paths["api"] = host_path
        elif key == "nginx_conf":
            paths["nginx_active"] = host_path
    return paths
def _build_volume_args(
    paths: dict[str, pathlib.Path],
) -> list[tuple[str, str, bool]]:
    """Default bindings: bind only the /data root, read-write."""
    default_keys = {"data"}
    return _build_volume_args_for_keys(paths, default_keys)
def _build_volume_args_for_keys(
    paths: dict[str, pathlib.Path],
    keys: set[str],
    read_only: set[str] | None = None,
) -> list[tuple[str, str, bool]]:
    """Translate mount keys into (host_path, container_target, read_only) tuples.

    Raises:
        KeyError: For a key unknown to CONTAINER_TARGETS or absent from *paths*.
    """
    ro_keys = read_only or set()
    result: list[tuple[str, str, bool]] = []
    for mount_key in keys:
        if mount_key not in CONTAINER_TARGETS:
            raise KeyError(f"Unknown mount key {mount_key}")
        container_target = CONTAINER_TARGETS[mount_key]
        if mount_key not in paths:
            raise KeyError(f"Missing host path for key {mount_key}")
        result.append((str(paths[mount_key]), container_target, mount_key in ro_keys))
    return result
def _chown_path(host_path: pathlib.Path, uid: int, gid: int) -> None:
    """Chown a host path using the test image with host user namespace.

    Raises:
        RuntimeError: When the path does not exist or the chown container fails.
    """
    if not host_path.exists():
        raise RuntimeError(f"Cannot chown missing path {host_path}")
    chown_cmd = [
        "docker", "run", "--rm",
        "--userns", "host",
        "--user", "0:0",
        "--entrypoint", "/bin/chown",
        "-v", f"{host_path}:/mnt",
        IMAGE,
        "-R", f"{uid}:{gid}", "/mnt",
    ]
    try:
        subprocess.run(
            chown_cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(f"Failed to chown {host_path} to {uid}:{gid}") from exc
def _chown_root(host_path: pathlib.Path) -> None:
    """Recursively give *host_path* to root:root via the helper container."""
    _chown_path(host_path, 0, 0)
def _chown_netalertx(host_path: pathlib.Path) -> None:
    """Recursively give *host_path* to the image's netalertx user (20211:20211)."""
    _chown_path(host_path, 20211, 20211)
def _docker_volume_rm(volume_name: str) -> None:
    """Force-remove a named docker volume, ignoring failures (it may not exist)."""
    rm_cmd = ["docker", "volume", "rm", "-f", volume_name]
    subprocess.run(
        rm_cmd,
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _docker_volume_create(volume_name: str) -> None:
    """Create a named docker volume; raises CalledProcessError on failure."""
    create_cmd = ["docker", "volume", "create", volume_name]
    subprocess.run(
        create_cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _fresh_named_volume(prefix: str) -> str:
    """Return a unique docker volume name, removing any stale volume first."""
    volume_name = _unique_label(prefix).lower().replace("__", "-")
    # Ensure we're exercising Docker's fresh-volume copy-up behavior.
    _docker_volume_rm(volume_name)
    return volume_name
def _ensure_volume_copy_up(volume_name: str) -> None:
    """Ensure a named volume is initialized from the NetAlertX image.

    If we write into the volume first (e.g., with an Alpine helper container),
    Docker will not perform the image-to-volume copy-up and the volume root may
    stay root:root 0755, breaking arbitrary UID/GID runs.
    """
    # A no-op command is enough: mounting the fresh volume triggers copy-up.
    init_cmd = [
        "docker", "run", "--rm",
        "--userns", "host",
        "-v", f"{volume_name}:/data",
        "--entrypoint", "/bin/sh",
        IMAGE,
        "-c", "true",
    ]
    subprocess.run(
        init_cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _seed_volume_text_file(
    volume_name: str,
    container_path: str,
    content: str,
    *,
    chmod_mode: str = "644",
    user: str | None = None,
) -> None:
    """Create/overwrite a text file inside a named volume.

    Uses a tiny helper container so we don't rely on bind mounts (which are
    resolved on the Docker daemon host).

    Args:
        volume_name: Named volume mounted at /data inside the helper.
        container_path: Absolute path (inside the helper) of the file to write.
        content: Text piped to the helper's stdin and written verbatim.
        chmod_mode: Octal mode string applied after writing.
        user: Optional UID:GID for the helper (controls resulting ownership).
    """
    cmd = [
        "docker",
        "run",
        "--rm",
        "--userns",
        "host",
    ]
    if user:
        cmd.extend(["--user", user])
    # NOTE: container_path/chmod_mode are interpolated into a shell command;
    # callers in this module only pass fixed literal values.
    cmd.extend(
        [
            "-v",
            f"{volume_name}:/data",
            "alpine:3.22",
            "sh",
            "-c",
            f"set -eu; mkdir -p \"$(dirname '{container_path}')\"; cat > '{container_path}'; chmod {chmod_mode} '{container_path}'",
        ]
    )
    # `input=content` feeds the file body through the helper's stdin.
    subprocess.run(
        cmd,
        input=content,
        text=True,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _volume_has_file(volume_name: str, container_path: str) -> bool:
    """Return True when *container_path* exists as a regular file in the volume."""
    probe_cmd = [
        "docker",
        "run",
        "--rm",
        "--userns",
        "host",
        "-v",
        f"{volume_name}:/data",
        "alpine:3.22",
        "sh",
        "-c",
        f"test -f '{container_path}'",
    ]
    completed = subprocess.run(
        probe_cmd,
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    return completed.returncode == 0
@pytest.mark.parametrize(
    "uid_gid",
    [
        (1001, 1001),
        (1502, 1502),
    ],
)
def test_nonroot_custom_uid_logs_note(
    tmp_path: pathlib.Path,
    uid_gid: tuple[int, int],
) -> None:
    """Ensure arbitrary non-root UID/GID can run with compose-like mounts."""
    uid, gid = uid_gid
    vol = _fresh_named_volume(f"note_uid_{uid}")
    try:
        # Fresh named volume at /data: matches default docker-compose UX.
        result = _run_container(
            f"note-uid-{uid}",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            user=f"{uid}:{gid}",
            sleep_seconds=5,
        )
    finally:
        # Always reclaim the volume, even when the run itself fails.
        _docker_volume_rm(vol)
    # The entrypoint logs a note when running with a non-default UID/GID.
    _assert_contains(result, f"NetAlertX note: current UID {uid} GID {gid}", result.args)
    assert "expected UID" in result.output
    assert result.returncode == 0
def _run_container(
    label: str,
    volumes: list[tuple[str, str, bool]] | None = None,
    *,
    env: dict[str, str] | None = None,
    user: str | None = None,
    drop_caps: list[str] | None = None,
    network_mode: str | None = "host",
    extra_args: list[str] | None = None,
    volume_specs: list[str] | None = None,
    sleep_seconds: float = GRACE_SECONDS,
    wait_for_exit: bool = False,
    pre_entrypoint: str | None = None,
    userns_mode: str | None = "host",
    image: str = IMAGE,
) -> subprocess.CompletedProcess[str]:
    """Run the NetAlertX image once and capture its combined output.

    Args:
        label: Short label folded into the unique container name.
        volumes: (host_path, container_target, read_only) bind-mount triples.
        env: Extra environment variables passed with -e.
        user: Optional "UID:GID" override for the container process.
        drop_caps: Capabilities to drop; None means drop ALL and re-add
            DEFAULT_CAPS.
        network_mode: Docker network mode; None omits --network entirely.
        extra_args: Additional raw `docker run` arguments.
        volume_specs: Raw -v specs (e.g. named volumes) appended verbatim.
        sleep_seconds: How long to let the entrypoint run before SIGTERM
            (ignored when wait_for_exit is True).
        wait_for_exit: Run the entrypoint to completion instead of killing it.
        pre_entrypoint: Shell snippet executed just before the entrypoint.
        userns_mode: --userns value; None omits the flag.
        image: Image to run (defaults to the test image).

    Returns:
        CompletedProcess with an extra ``output`` attribute holding the
        ANSI-stripped concatenation of stdout and stderr.
    """
    name = f"netalertx-test-{label}-{uuid.uuid4().hex[:8]}".lower()
    # /tmp tmpfs ownership defaults to the image's netalertx user.
    tmp_uid = 20211
    tmp_gid = 20211
    if user:
        try:
            u_str, g_str = user.split(":", 1)
            tmp_uid = int(u_str)
            tmp_gid = int(g_str)
        except Exception:
            # Keep defaults if user format is unexpected.
            tmp_uid = 20211
            tmp_gid = 20211
    # Clean up any existing container with this name
    subprocess.run(
        ["docker", "rm", "-f", name],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    cmd: list[str] = ["docker", "run", "--rm", "--name", name]
    # Avoid flakiness in host-network runs when the host already uses the
    # default NetAlertX ports. Tests can still override explicitly via `env`.
    effective_env: dict[str, str] = dict(env or {})
    if network_mode == "host":
        if "PORT" not in effective_env:
            effective_env["PORT"] = str(30000 + (int(uuid.uuid4().hex[:4], 16) % 20000))
        if "GRAPHQL_PORT" not in effective_env:
            gql = 30000 + (int(uuid.uuid4().hex[4:8], 16) % 20000)
            # Ensure the two randomized ports never collide.
            if str(gql) == effective_env["PORT"]:
                gql = 30000 + ((gql + 1) % 20000)
            effective_env["GRAPHQL_PORT"] = str(gql)
    if network_mode:
        cmd.extend(["--network", network_mode])
    if userns_mode:
        cmd.extend(["--userns", userns_mode])
    # Match docker-compose UX: /tmp is tmpfs with 1700 and owned by the runtime UID/GID.
    cmd.extend(["--tmpfs", f"/tmp:mode=1700,uid={tmp_uid},gid={tmp_gid}"])
    if user:
        cmd.extend(["--user", user])
    if drop_caps is not None:
        for cap in drop_caps:
            cmd.extend(["--cap-drop", cap])
    else:
        # Default hardening: drop everything, re-add only what NetAlertX needs.
        cmd.extend(["--cap-drop", "ALL"])
        for cap in DEFAULT_CAPS:
            cmd.extend(["--cap-add", cap])
    if effective_env:
        for key, value in effective_env.items():
            cmd.extend(["-e", f"{key}={value}"])
    if extra_args:
        cmd.extend(extra_args)
    for host_path, target, readonly in volumes or []:
        mount = f"{host_path}:{target}"
        if readonly:
            mount += ":ro"
        cmd.extend(["-v", mount])
    if volume_specs:
        for spec in volume_specs:
            cmd.extend(["-v", spec])
    # Diagnostic wrapper: list ownership and perms of mounted targets inside
    # the container before running the real entrypoint. This helps debug
    # permission failures by capturing the container's view of the host mounts.
    mounts_ls = """
echo "--- MOUNT PERMS (container view) ---";
ls -ldn \
"""
    for _, target, _ in volumes or []:
        mounts_ls += f" {target}"
    mounts_ls += " || true; echo '--- END MOUNTS ---'; \n"
    setup_script = ""
    if pre_entrypoint:
        setup_script = pre_entrypoint
        if not setup_script.endswith("\n"):
            setup_script += "\n"
    if wait_for_exit:
        script = mounts_ls + setup_script + "sh /entrypoint.sh"
    else:
        # Background the entrypoint, give it sleep_seconds, then SIGTERM it;
        # exit code 143 (128+SIGTERM) is normalized to success.
        script = "".join([
            mounts_ls,
            setup_script,
            "sh /entrypoint.sh & pid=$!; ",
            f"sleep {sleep_seconds}; ",
            "if kill -0 $pid >/dev/null 2>&1; then kill -TERM $pid >/dev/null 2>&1 || true; fi; ",
            "wait $pid; code=$?; if [ $code -eq 143 ]; then exit 0; fi; exit $code"
        ])
    cmd.extend(["--entrypoint", "/bin/sh", image, "-c", script])
    # Print the full Docker command for debugging
    print("\n--- DOCKER CMD ---\n", " ".join(cmd), "\n--- END CMD ---\n")
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=max(SUBPROCESS_TIMEOUT_SECONDS, sleep_seconds + 30),
        check=False,
    )
    # Combine and clean stdout and stderr
    stdouterr = re.sub(r"\x1b\[[0-9;]*m", "", result.stdout or "") + re.sub(
        r"\x1b\[[0-9;]*m", "", result.stderr or ""
    )
    # Dynamic attribute consumed by _assert_contains and the tests.
    result.output = stdouterr
    # Print container output for debugging in every test run.
    print("\n--- CONTAINER OUTPUT START ---")
    print(result.output)
    print("--- CONTAINER OUTPUT END ---\n")
    return result
def _assert_contains(result, snippet: str, cmd: list[str] = None) -> None:
output = result.output + result.stderr
if snippet not in output:
cmd_str = " ".join(cmd) if cmd else ""
raise AssertionError(
f"Expected to find '{snippet}' in container output.\n"
f"STDOUT:\n{result.output}\n"
f"STDERR:\n{result.stderr}\n"
f"Combined output:\n{output}\n"
f"Container command:\n{cmd_str}"
)
def _extract_mount_rows(output: str) -> dict[str, list[str]]:
rows: dict[str, list[str]] = {}
in_table = False
expected_cols = 0
for raw_line in (output or "").splitlines():
line = raw_line.rstrip()
if not in_table:
if line.startswith(" Path") and "Writeable" in line:
# Legacy format: Path | Writeable | Mount | RAMDisk | Performance | DataLoss
in_table = True
expected_cols = 5
elif line.startswith(" Path") and "| R" in line and "| W" in line:
# Current format: Path | R | W | Mount | RAMDisk | Performance | DataLoss
in_table = True
expected_cols = 6
continue
if not line.strip():
break
if line.lstrip().startswith("Path"):
continue
if set(line.strip()) <= {"-", "+"}:
continue
parts = [part.strip() for part in line.split("|")]
if len(parts) < 1 + expected_cols:
continue
path = parts[0].strip()
if not path:
continue
rows[path] = parts[1 : 1 + expected_cols]
return rows
def _assert_mount_row(
    result,
    path: str,
    *,
    write: str | None = None,
    mount: str | None = None,
    ramdisk: str | None = None,
    performance: str | None = None,
    dataloss: str | None = None,
) -> None:
    """Assert selected columns of one row in the entrypoint's mounts table.

    Only columns passed as non-None keyword arguments are compared (exact
    string equality); ``""`` therefore asserts the column is blank.

    Raises:
        AssertionError: If the row is missing, the column count is neither 5
            (legacy) nor 6 (current), or any checked column differs.
    """
    rows = _extract_mount_rows(result.output)
    if path not in rows:
        raise AssertionError(
            f"Mount table row for {path} not found. Rows: {sorted(rows)}\nOutput:\n{result.output}"
        )
    columns = rows[path]
    # Legacy: [Writeable, Mount, RAMDisk, Performance, DataLoss]
    # Current: [R, W, Mount, RAMDisk, Performance, DataLoss]
    if len(columns) == 5:
        label_to_value = {
            "Writeable": columns[0],
            "Mount": columns[1],
            "RAMDisk": columns[2],
            "Performance": columns[3],
            "DataLoss": columns[4],
        }
        write_label = "Writeable"
    elif len(columns) == 6:
        label_to_value = {
            "R": columns[0],
            "W": columns[1],
            "Mount": columns[2],
            "RAMDisk": columns[3],
            "Performance": columns[4],
            "DataLoss": columns[5],
        }
        write_label = "W"
    else:
        raise AssertionError(
            f"Unexpected mount table column count for {path}: {len(columns)}. Columns: {columns}\nOutput:\n{result.output}"
        )
    # The `write` expectation maps to whichever writable column this format has.
    checks = [
        (write_label, write),
        ("Mount", mount),
        ("RAMDisk", ramdisk),
        ("Performance", performance),
        ("DataLoss", dataloss),
    ]
    for label, expected in checks:
        if expected is None:
            continue
        actual = label_to_value.get(label)
        if actual != expected:
            raise AssertionError(
                f"{path} {label} expected {expected}, got {actual}.\n"
                f"Row: {label_to_value}\nOutput:\n{result.output}"
            )
def _setup_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
"""Set up a directory with files and zero permissions for testing."""
if key in ["app_db", "app_config"]:
# Files already exist from _setup_mount_tree seeding
pass
else:
# Create a dummy file for other directories
(paths[key] / "dummy.txt").write_text("dummy")
# Chmod all files in the directory to 000
for f in paths[key].iterdir():
f.chmod(0)
# Chmod the directory itself to 000
paths[key].chmod(0)
def _restore_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
"""Restore permissions after zero perm test."""
# Chmod directory back to 700
paths[key].chmod(0o700)
# Chmod files back to appropriate permissions
for f in paths[key].iterdir():
if f.name in ["app.db", "app.conf"]:
f.chmod(0o600)
else:
f.chmod(0o644)
def test_missing_capabilities_triggers_warning(tmp_path: pathlib.Path) -> None:
    """Test missing required capabilities - simulates insufficient container privileges.

    5. Missing Required Capabilities: Simulates running without NET_ADMIN, NET_RAW,
    NET_BIND_SERVICE capabilities. Required for ARP scanning and network operations.
    Expected: "exec /bin/sh: operation not permitted" error, guidance to add capabilities.
    Check script: N/A (capability check happens at container runtime)
    Sample message: "exec /bin/sh: operation not permitted"
    """
    paths = _setup_mount_tree(tmp_path, "missing_caps")
    volumes = _build_volume_args_for_keys(paths, {"data"})
    # drop_caps=["ALL"] suppresses the default cap re-add in _run_container,
    # so the container runs with no capabilities at all.
    result = _run_container(
        "missing-caps",
        volumes,
        drop_caps=["ALL"],
    )
    _assert_contains(result, "exec /bin/sh: operation not permitted", result.args)
    assert result.returncode != 0
def test_running_as_root_is_blocked(tmp_path: pathlib.Path) -> None:
    """Test running as root user - simulates insecure container execution.

    6. Running as Root User: Simulates running container as root (UID 0) instead of
    dedicated netalertx user. Warning about security risks, special permission fix mode.
    Expected: Warning about security risks, guidance to use UID 20211.
    Check script: /entrypoint.d/0-storage-permission.sh
    Sample message: "🚨 CRITICAL SECURITY ALERT: NetAlertX is running as ROOT (UID 0)!"
    """
    paths = _setup_mount_tree(tmp_path, "run_as_root")
    volumes = _build_volume_args_for_keys(paths, {"data", "nginx_conf"})
    result = _run_container(
        "run-as-root",
        volumes,
        user="0",
    )
    _assert_contains(result, "NetAlertX is running as ROOT", result.args)
    # Root runs also trigger the automatic permission-fix path.
    _assert_contains(result, "Permissions fixed for read-write paths.", result.args)
    assert (
        result.returncode == 0
    )  # container warns but continues running, then terminated by test framework
def test_missing_host_network_warns(tmp_path: pathlib.Path) -> None:
    # Asserts only on the warning text; the exit code is not checked here.
    """Test missing host networking - simulates running without host network mode.

    8. Missing Host Networking: Simulates running without network_mode: host.
    Limits ARP scanning capabilities for network discovery.
    Expected: Warning about ARP scanning limitations, guidance to use host networking.
    Check script: check-network-mode.sh
    Sample message: "⚠️ ATTENTION: NetAlertX is not running with --network=host. Bridge networking..."
    """
    vol = _fresh_named_volume("missing_host_network")
    try:
        # network_mode=None omits --network entirely, yielding bridge networking.
        result = _run_container(
            "missing-host-network",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            network_mode=None,
            sleep_seconds=15,
        )
    finally:
        _docker_volume_rm(vol)
    _assert_contains(result, "not running with --network=host", result.args)
# NOTE: The following runtime-behavior tests depended on the entrypoint continuing even when
# /data was mounted without write permissions. With fail-fast enabled we must supply a pre-owned
# (UID/GID 20211) data volume, which this dev container cannot provide for bind mounts. Once the
# docker tests switch to compose-managed fixtures, restore these cases by moving them back to the
# top level.
def test_missing_app_conf_triggers_seed(tmp_path: pathlib.Path) -> None:
    """Test missing configuration file seeding - simulates corrupted/missing app.conf.

    9. Missing Configuration File: Simulates corrupted/missing app.conf.
    Container automatically regenerates default configuration on startup.
    Expected: Automatic regeneration of default configuration.
    Check script: /entrypoint.d/15-first-run-config.sh
    Sample message: "Default configuration written to"
    """
    vol = _fresh_named_volume("missing_app_conf")
    try:
        # A fresh, unseeded volume means no app.conf exists at startup.
        result = _run_container(
            "missing-app-conf",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            sleep_seconds=15,
        )
    finally:
        _docker_volume_rm(vol)
    _assert_contains(result, "Default configuration written to", result.args)
    assert result.returncode == 0
def test_missing_app_db_triggers_seed(tmp_path: pathlib.Path) -> None:
    """Test missing database file seeding - simulates corrupted/missing app.db.

    10. Missing Database File: Simulates corrupted/missing app.db.
    Container automatically creates initial database schema on startup.
    Expected: Automatic creation of initial database schema.
    Check script: /entrypoint.d/20-first-run-db.sh
    Sample message: "Building initial database schema"
    """
    vol = _fresh_named_volume("missing_app_db")
    try:
        # Initialize the volume via image copy-up so ownership allows UID 20211.
        _ensure_volume_copy_up(vol)
        # Seed only app.conf; leave app.db missing to trigger first-run DB schema creation.
        _seed_volume_text_file(
            vol,
            "/data/config/app.conf",
            "TIMEZONE='UTC'\n",
            chmod_mode="644",
            user="20211:20211",
        )
        result = _run_container(
            "missing-app-db",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            user="20211:20211",
            sleep_seconds=20,
        )
        # The entrypoint must have created the database inside the volume.
        assert _volume_has_file(vol, "/data/db/app.db")
    finally:
        _docker_volume_rm(vol)
    assert result.returncode == 0
def test_custom_port_without_writable_conf(tmp_path: pathlib.Path) -> None:
    """Test custom port configuration without writable nginx config mount.

    4. Custom Port Without Nginx Config Mount: Simulates setting custom LISTEN_ADDR/PORT
    without mounting nginx config. Container starts but uses default address.
    Expected: Container starts but uses default address, warning about missing config mount.
    Check script: check-nginx-config.sh
    Sample messages: "⚠️ ATTENTION: Nginx configuration mount /tmp/nginx/active-config is missing."
    "⚠️ ATTENTION: Unable to write to /tmp/nginx/active-config/netalertx.conf."
    """
    vol = _fresh_named_volume("custom_port_ro_conf")
    # Mount the nginx config dir as a read-only (mode=500) tmpfs so the
    # entrypoint cannot write netalertx.conf there.
    extra_args = [
        "--tmpfs",
        f"{VOLUME_MAP['nginx_conf']}:uid=20211,gid=20211,mode=500",
    ]
    try:
        result = _run_container(
            "custom-port-ro-conf",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            env={"PORT": "24444", "LISTEN_ADDR": "127.0.0.1"},
            user="20211:20211",
            extra_args=extra_args,
            sleep_seconds=15,
        )
    finally:
        _docker_volume_rm(vol)
    _assert_contains(result, "Unable to write to", result.args)
    _assert_contains(
        result, f"{VOLUME_MAP['nginx_conf']}/netalertx.conf", result.args
    )
    assert result.returncode != 0
def test_excessive_capabilities_warning(tmp_path: pathlib.Path) -> None:
    """Test excessive capabilities detection - simulates container with extra capabilities.

    11. Excessive Capabilities: Simulates container with capabilities beyond the required
    NET_ADMIN, NET_RAW, and NET_BIND_SERVICE.
    Expected: Warning about excessive capabilities detected.
    Check script: 90-excessive-capabilities.sh
    Sample message: "Excessive capabilities detected"
    """
    vol = _fresh_named_volume("excessive_caps")
    try:
        # SYS_ADMIN / NET_BROADCAST are beyond the required capability set.
        result = _run_container(
            "excessive-caps",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            extra_args=["--cap-add=SYS_ADMIN", "--cap-add=NET_BROADCAST"],
            sleep_seconds=15,
        )
    finally:
        _docker_volume_rm(vol)
    _assert_contains(result, "Excessive capabilities detected", result.args)
    _assert_contains(result, "bounding caps:", result.args)
def test_appliance_integrity_read_write_mode(tmp_path: pathlib.Path) -> None:
    """Test appliance integrity - simulates running with read-write root filesystem.

    12. Appliance Integrity: Simulates running container with read-write root filesystem
    instead of read-only mode.
    Expected: Warning about running in read-write mode instead of read-only.
    Check script: 95-appliance-integrity.sh
    Sample message: "Container is running as read-write, not in read-only mode"
    """
    vol = _fresh_named_volume("appliance_integrity")
    try:
        # No --read-only flag is passed, so the rootfs stays writable.
        result = _run_container(
            "appliance-integrity",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            sleep_seconds=15,
        )
    finally:
        _docker_volume_rm(vol)
    _assert_contains(
        result, "Container is running as read-write, not in read-only mode", result.args
    )
def test_zero_permissions_app_db_dir(tmp_path: pathlib.Path) -> None:
    """Test zero permissions - simulates mounting directories/files with no permissions.

    2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
    Tests directories and files with no read/write/execute permissions.
    Expected: Mounts table shows ❌ for writeable status, configuration issues detected.
    """
    paths = _setup_mount_tree(tmp_path, "chmod_app_db")
    _setup_zero_perm_dir(paths, "app_db")
    volumes = _build_volume_args_for_keys(paths, {"data"})
    try:
        # wait_for_exit: the entrypoint is expected to fail fast on its own.
        result = _run_container(
            "chmod-app-db",
            volumes,
            user="20211:20211",
            wait_for_exit=True,
        )
        # Check that the mounts table shows the app_db directory as not writeable
        _assert_mount_row(result, VOLUME_MAP["app_db"], write="")
        # Check that configuration issues are detected
        _assert_contains(result, "Configuration issues detected", result.args)
        assert result.returncode != 0
    finally:
        # Restore perms so the host tree can be cleaned up afterwards.
        _restore_zero_perm_dir(paths, "app_db")
def test_zero_permissions_app_config_dir(tmp_path: pathlib.Path) -> None:
    """Test zero permissions - simulates mounting directories/files with no permissions.

    2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
    Tests directories and files with no read/write/execute permissions.
    Expected: Mounts table shows ❌ for writeable status, configuration issues detected.
    """
    paths = _setup_mount_tree(tmp_path, "chmod_app_config")
    _setup_zero_perm_dir(paths, "app_config")
    volumes = _build_volume_args_for_keys(paths, {"data"})
    try:
        # wait_for_exit: the entrypoint is expected to fail fast on its own.
        result = _run_container(
            "chmod-app-config",
            volumes,
            user="20211:20211",
            wait_for_exit=True,
        )
        # Check that the mounts table shows the app_config directory as not writeable
        _assert_mount_row(result, VOLUME_MAP["app_config"], write="")
        # Check that configuration issues are detected
        _assert_contains(result, "Configuration issues detected", result.args)
        assert result.returncode != 0
    finally:
        # Restore perms so the host tree can be cleaned up afterwards.
        _restore_zero_perm_dir(paths, "app_config")
def test_mandatory_folders_creation(tmp_path: pathlib.Path) -> None:
    """Test mandatory folders creation - simulates missing plugins log directory.

    1. Mandatory Folders: Simulates missing required directories and log files.
    Container automatically creates plugins log, system services run log/tmp directories,
    and required log files on startup.
    Expected: Automatic creation of all required directories and files.
    Check script: 25-mandatory-folders.sh
    Sample message: "Creating Plugins log"
    """
    paths = _setup_mount_tree(tmp_path, "mandatory_folders")
    # Remove the plugins log directory to simulate missing mandatory folder
    plugins_log_dir = paths["app_log"] / "plugins"
    if plugins_log_dir.exists():
        shutil.rmtree(plugins_log_dir)
    # Ensure other directories are writable and owned by netalertx user so container gets past mounts.py
    for key in [
        "data",
        "app_db",
        "app_config",
        "app_log",
        "app_api",
        "services_run",
        "nginx_conf",
    ]:
        paths[key].chmod(0o777)
        _chown_netalertx(paths[key])  # Ensure all directories are owned by netalertx
    volumes = _build_volume_args_for_keys(paths, {"data"})
    result = _run_container(
        "mandatory-folders", volumes, user="20211:20211", sleep_seconds=5
    )
    _assert_contains(result, "Creating Plugins log", result.args)
    # The container will fail at writable config due to permission issues, but we just want to verify
    # that mandatory folders creation ran successfully
def test_writable_config_validation(tmp_path: pathlib.Path) -> None:
    """Test writable config validation - simulates invalid config file type.

    3. Writable Config Validation: Simulates app.conf being a non-regular file (directory).
    Container verifies it can read from and write to critical config and database files.
    Expected: "Path is not a regular file" warning for config file.
    Check script: 35-writable-config.sh
    Sample message: "Path is not a regular file"
    """
    paths = _setup_mount_tree(tmp_path, "writable_config")
    # Force a non-regular file for /data/config/app.conf to exercise the correct warning branch.
    config_path = paths["app_config"] / "app.conf"
    if config_path.exists():
        if config_path.is_dir():
            shutil.rmtree(config_path)
        else:
            config_path.unlink()
    # Replace the seeded file with a directory of the same name.
    config_path.mkdir(parents=False)
    config_path.chmod(0o777)
    _chown_netalertx(config_path)
    # Ensure directories are writable and owned by netalertx user so container gets past mounts.py
    for key in [
        "data",
        "app_db",
        "app_config",
        "app_log",
        "app_api",
        "services_run",
        "nginx_conf",
    ]:
        paths[key].chmod(0o777)
        _chown_netalertx(paths[key])
    volumes = _build_volume_args_for_keys(paths, {"data"})
    result = _run_container(
        "writable-config", volumes, user="20211:20211", sleep_seconds=5.0
    )
    _assert_contains(result, "ATTENTION: Path is not a regular file.", result.args)
def test_mount_analysis_ram_disk_performance(tmp_path: pathlib.Path) -> None:
    """Test mount analysis for RAM disk performance issues.

    Tests 10-mounts.py detection of persistent paths on RAM disks (tmpfs) which can cause
    performance issues and data loss on container restart.
    Expected: Mounts table shows ❌ for RAMDisk on persistent paths, performance warnings.
    Check script: 10-mounts.py
    Sample message: "Configuration issues detected"
    """
    paths = _setup_mount_tree(tmp_path, "ram_disk_mount")
    # Bind-mount only the ephemeral paths; the persistent ones go on tmpfs below.
    volumes = [
        (str(paths["app_log"]), VOLUME_MAP["app_log"], False),
        (str(paths["app_api"]), VOLUME_MAP["app_api"], False),
        (str(paths["services_run"]), VOLUME_MAP["services_run"], False),
        (str(paths["nginx_conf"]), VOLUME_MAP["nginx_conf"], False),
    ]
    # Use tmpfs mounts for persistent paths with proper permissions
    extra_args = [
        "--tmpfs",
        f"{VOLUME_MAP['app_db']}:uid=20211,gid=20211,mode=755",
        "--tmpfs",
        f"{VOLUME_MAP['app_config']}:uid=20211,gid=20211,mode=755",
    ]
    result = _run_container(
        "ram-disk-mount", volumes=volumes, extra_args=extra_args, user="20211:20211"
    )
    # Check that mounts table shows RAM disk detection for persistent paths
    # (empty-string expectations assert the corresponding columns are blank).
    _assert_mount_row(
        result,
        VOLUME_MAP["app_db"],
        write="",
        mount="",
        ramdisk="",
        performance="",
        dataloss="",
    )
    _assert_mount_row(
        result,
        VOLUME_MAP["app_config"],
        write="",
        mount="",
        ramdisk="",
        performance="",
        dataloss="",
    )
    # Check that configuration issues are detected due to dataloss risk
    _assert_contains(result, "Configuration issues detected", result.args)
    assert result.returncode != 0
def test_mount_analysis_dataloss_risk(tmp_path: pathlib.Path) -> None:
    """Test mount analysis for dataloss risk on non-persistent filesystems.

    Tests 10-mounts.py detection when persistent database/config paths are
    mounted on non-persistent filesystems (tmpfs, ramfs).
    Expected: Mounts table shows dataloss risk warnings for persistent paths on tmpfs.
    Check script: 10-mounts.py
    Sample message: "Configuration issues detected"
    """
    paths = _setup_mount_tree(tmp_path, "dataloss_risk")
    # Bind-mount only the ephemeral paths; the persistent ones go on tmpfs below.
    volumes = [
        (str(paths["app_log"]), VOLUME_MAP["app_log"], False),
        (str(paths["app_api"]), VOLUME_MAP["app_api"], False),
        (str(paths["services_run"]), VOLUME_MAP["services_run"], False),
        (str(paths["nginx_conf"]), VOLUME_MAP["nginx_conf"], False),
    ]
    # Use tmpfs mounts for persistent paths with proper permissions
    extra_args = [
        "--tmpfs",
        f"{VOLUME_MAP['app_db']}:uid=20211,gid=20211,mode=755",
        "--tmpfs",
        f"{VOLUME_MAP['app_config']}:uid=20211,gid=20211,mode=755",
    ]
    result = _run_container(
        "dataloss-risk", volumes=volumes, extra_args=extra_args, user="20211:20211"
    )
    # Check that mounts table shows dataloss risk for persistent paths on tmpfs
    # (empty-string expectations assert the corresponding columns are blank).
    _assert_mount_row(
        result,
        VOLUME_MAP["app_db"],
        write="",
        mount="",
        ramdisk="",
        performance="",
        dataloss="",
    )
    _assert_mount_row(
        result,
        VOLUME_MAP["app_config"],
        write="",
        mount="",
        ramdisk="",
        performance="",
        dataloss="",
    )
    # Check that configuration issues are detected due to dataloss risk
    _assert_contains(result, "Configuration issues detected", result.args)
    assert result.returncode != 0
def test_restrictive_permissions_handling(tmp_path: pathlib.Path) -> None:
    """Test handling of restrictive permissions on bind mounts.

    Simulates a user mounting a directory with restrictive permissions (e.g. 755 root:root).
    The container should either fail gracefully or handle it if running as root (which triggers fix).
    If running as non-root (default), it should fail to write if it doesn't have access.
    """
    paths = _setup_mount_tree(tmp_path, "restrictive_perms")

    # Helper to chown without userns host (workaround for potential devcontainer hang)
    def _chown_root_safe(host_path: pathlib.Path) -> None:
        cmd = [
            "docker", "run", "--rm",
            # "--userns", "host",  # Removed to avoid hang
            "--user", "0:0",
            "--entrypoint", "/bin/chown",
            "-v", f"{host_path}:/mnt",
            IMAGE,
            "-R", "0:0", "/mnt",
        ]
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )

    # Set up a restrictive directory (root owned, 755)
    target_dir = paths["app_db"]
    _chown_root_safe(target_dir)
    target_dir.chmod(0o755)
    # Mount ALL volumes to avoid 'find' errors in 0-storage-permission.sh
    keys = {"data", "app_db", "app_config", "app_log", "app_api", "services_run", "nginx_conf"}
    volumes = _build_volume_args_for_keys(paths, keys)
    # Case 1: Running as non-root (default) - Should fail to write
    # We disable host network/userns to avoid potential hangs in devcontainer environment
    result = _run_container(
        "restrictive-perms-user",
        volumes,
        user="20211:20211",
        sleep_seconds=5,
        network_mode=None,
        userns_mode=None
    )
    assert result.returncode != 0 or "Permission denied" in result.output or "Unable to write" in result.output
    # Case 2: Running as root - Should trigger the fix script
    result_root = _run_container(
        "restrictive-perms-root",
        volumes,
        user="0:0",
        sleep_seconds=5,
        network_mode=None,
        userns_mode=None
    )
    _assert_contains(result_root, "NetAlertX is running as ROOT", result_root.args)
    _assert_contains(result_root, "Permissions fixed for read-write paths", result_root.args)
    # Verify the root run actually repaired permissions: a fresh non-root
    # container must now be able to write into /data/db.
    check_cmd = [
        "docker", "run", "--rm",
        "--entrypoint", "/bin/sh",
        "--user", "20211:20211",
        IMAGE,
        "-c", "ls -ldn /data/db && touch /data/db/test_write_after_fix"
    ]
    # Add all volumes to check_cmd too
    for host_path, target, _readonly in volumes:
        check_cmd.extend(["-v", f"{host_path}:{target}"])
    check_result = subprocess.run(
        check_cmd,
        capture_output=True,
        text=True,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    if check_result.returncode != 0:
        print(f"Check command failed. Cmd: {check_cmd}")
        print(f"Stderr: {check_result.stderr}")
        print(f"Stdout: {check_result.stdout}")
    assert check_result.returncode == 0, f"Should be able to write after root fix script runs. Stderr: {check_result.stderr}. Stdout: {check_result.stdout}"