mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-30 18:38:13 -05:00
1260 lines
42 KiB
Python
1260 lines
42 KiB
Python
"""
|
||
This set of tests requires netalertx-test image built. Ensure netalertx-test image is built prior
|
||
to starting these tests or they will fail. netalertx-test image is generally rebuilt using the
|
||
Build Unit Test Docker Image task. but can be created manually with the following command executed
|
||
in the workspace:
|
||
docker buildx build -t netalertx-test .
|
||
"""
|
||
|
||
import os
|
||
import pathlib
|
||
import shutil
|
||
import subprocess
|
||
import uuid
|
||
import re
|
||
import pytest
|
||
|
||
IMAGE = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
|
||
GRACE_SECONDS = float(os.environ.get("NETALERTX_TEST_GRACE", "2"))
|
||
DEFAULT_CAPS = ["NET_RAW", "NET_ADMIN", "NET_BIND_SERVICE"]
|
||
SUBPROCESS_TIMEOUT_SECONDS = float(os.environ.get("NETALERTX_TEST_SUBPROCESS_TIMEOUT", "60"))
|
||
|
||
CONTAINER_TARGETS: dict[str, str] = {
|
||
"data": "/data",
|
||
"app_db": "/data/db",
|
||
"data_db": "/data/db",
|
||
"app_config": "/data/config",
|
||
"data_config": "/data/config",
|
||
"app_log": "/tmp/log",
|
||
"log": "/tmp/log",
|
||
"app_api": os.environ.get("NETALERTX_API", "/tmp/api"),
|
||
"api": os.environ.get("NETALERTX_API", "/tmp/api"),
|
||
"nginx_conf": "/tmp/nginx/active-config",
|
||
"nginx_active": "/tmp/nginx/active-config",
|
||
"services_run": "/tmp/run",
|
||
}
|
||
|
||
DATA_SUBDIR_KEYS = ("app_db", "app_config")
|
||
OPTIONAL_TMP_KEYS = ("app_log", "app_api", "nginx_conf", "services_run")
|
||
|
||
VOLUME_MAP = CONTAINER_TARGETS
|
||
|
||
pytestmark = [pytest.mark.docker, pytest.mark.feature_complete]
|
||
|
||
|
||
def _unique_label(prefix: str) -> str:
|
||
return f"{prefix.upper()}__NETALERTX_INTENTIONAL__{uuid.uuid4().hex[:6]}"
|
||
|
||
|
||
def _repo_root() -> pathlib.Path:
|
||
env = os.environ.get("NETALERTX_REPO_ROOT")
|
||
if env:
|
||
return pathlib.Path(env)
|
||
cur = pathlib.Path(__file__).resolve()
|
||
for parent in cur.parents:
|
||
if any(
|
||
[
|
||
(parent / "pyproject.toml").exists(),
|
||
(parent / ".git").exists(),
|
||
(parent / "back").exists() and (parent / "db").exists(),
|
||
]
|
||
):
|
||
return parent
|
||
return cur.parents[2]
|
||
|
||
|
||
def _docker_visible_tmp_root() -> pathlib.Path:
|
||
"""Return a docker-daemon-visible scratch directory for bind mounts.
|
||
|
||
Pytest's default tmp_path lives under /tmp inside the devcontainer, which may
|
||
not be visible to the Docker daemon that evaluates bind mount source paths.
|
||
We use /tmp/pytest-docker-mounts instead of the repo.
|
||
"""
|
||
|
||
root = pathlib.Path("/tmp/pytest-docker-mounts")
|
||
root.mkdir(parents=True, exist_ok=True)
|
||
try:
|
||
root.chmod(0o777)
|
||
except PermissionError:
|
||
# Best-effort; the directory only needs to be writable by the current user.
|
||
pass
|
||
return root
|
||
|
||
|
||
def _docker_visible_path(path: pathlib.Path) -> pathlib.Path:
|
||
"""Map a path into `_docker_visible_tmp_root()` when it lives under /tmp."""
|
||
|
||
try:
|
||
if str(path).startswith("/tmp/"):
|
||
return _docker_visible_tmp_root() / path.name
|
||
except Exception:
|
||
pass
|
||
return path
|
||
|
||
|
||
def _setup_mount_tree(
|
||
tmp_path: pathlib.Path,
|
||
prefix: str,
|
||
*,
|
||
seed_config: bool = True,
|
||
seed_db: bool = True,
|
||
) -> dict[str, pathlib.Path]:
|
||
"""Create a compose-like host tree with permissive perms for arbitrary UID/GID."""
|
||
|
||
label = _unique_label(prefix)
|
||
base = _docker_visible_tmp_root() / f"{label}_MOUNT_ROOT"
|
||
base.mkdir()
|
||
base.chmod(0o777)
|
||
|
||
paths: dict[str, pathlib.Path] = {}
|
||
|
||
data_root = base / f"{label}_DATA_INTENTIONAL_NETALERTX_TEST"
|
||
data_root.mkdir(parents=True, exist_ok=True)
|
||
data_root.chmod(0o777)
|
||
paths["data"] = data_root
|
||
|
||
db_dir = data_root / "db"
|
||
db_dir.mkdir(exist_ok=True)
|
||
db_dir.chmod(0o777)
|
||
paths["app_db"] = db_dir
|
||
paths["data_db"] = db_dir
|
||
|
||
config_dir = data_root / "config"
|
||
config_dir.mkdir(exist_ok=True)
|
||
config_dir.chmod(0o777)
|
||
paths["app_config"] = config_dir
|
||
paths["data_config"] = config_dir
|
||
|
||
for key in OPTIONAL_TMP_KEYS:
|
||
folder_name = f"{label}_{key.upper()}_INTENTIONAL_NETALERTX_TEST"
|
||
host_path = base / folder_name
|
||
host_path.mkdir(parents=True, exist_ok=True)
|
||
host_path.chmod(0o777)
|
||
paths[key] = host_path
|
||
if key == "app_log":
|
||
paths["log"] = host_path
|
||
elif key == "app_api":
|
||
paths["api"] = host_path
|
||
elif key == "nginx_conf":
|
||
paths["nginx_active"] = host_path
|
||
|
||
repo_root = _repo_root()
|
||
if seed_config:
|
||
config_src = repo_root / "back" / "app.conf"
|
||
config_dst = paths["app_config"] / "app.conf"
|
||
if config_src.exists():
|
||
shutil.copyfile(config_src, config_dst)
|
||
config_dst.chmod(0o666)
|
||
if seed_db:
|
||
db_src = repo_root / "db" / "app.db"
|
||
db_dst = paths["app_db"] / "app.db"
|
||
if db_src.exists():
|
||
shutil.copyfile(db_src, db_dst)
|
||
db_dst.chmod(0o666)
|
||
|
||
# Ensure every mount point is world-writable so arbitrary UID/GID can write
|
||
for p in paths.values():
|
||
if p.is_dir():
|
||
p.chmod(0o777)
|
||
for child in p.iterdir():
|
||
if child.is_dir():
|
||
child.chmod(0o777)
|
||
else:
|
||
child.chmod(0o666)
|
||
else:
|
||
p.chmod(0o666)
|
||
|
||
return paths
|
||
|
||
|
||
def _setup_fixed_mount_tree(base: pathlib.Path) -> dict[str, pathlib.Path]:
|
||
base = _docker_visible_path(base)
|
||
|
||
if base.exists():
|
||
shutil.rmtree(base)
|
||
base.mkdir(parents=True)
|
||
try:
|
||
base.chmod(0o777)
|
||
except PermissionError:
|
||
pass
|
||
|
||
paths: dict[str, pathlib.Path] = {}
|
||
|
||
data_root = base / "DATA_NETALERTX_TEST"
|
||
data_root.mkdir(parents=True, exist_ok=True)
|
||
data_root.chmod(0o777)
|
||
paths["data"] = data_root
|
||
|
||
db_dir = data_root / "db"
|
||
db_dir.mkdir(exist_ok=True)
|
||
db_dir.chmod(0o777)
|
||
paths["app_db"] = db_dir
|
||
paths["data_db"] = db_dir
|
||
|
||
config_dir = data_root / "config"
|
||
config_dir.mkdir(exist_ok=True)
|
||
config_dir.chmod(0o777)
|
||
paths["app_config"] = config_dir
|
||
paths["data_config"] = config_dir
|
||
|
||
for key in OPTIONAL_TMP_KEYS:
|
||
host_path = base / f"{key.upper()}_NETALERTX_TEST"
|
||
host_path.mkdir(parents=True, exist_ok=True)
|
||
host_path.chmod(0o777)
|
||
paths[key] = host_path
|
||
if key == "app_log":
|
||
paths["log"] = host_path
|
||
elif key == "app_api":
|
||
paths["api"] = host_path
|
||
elif key == "nginx_conf":
|
||
paths["nginx_active"] = host_path
|
||
return paths
|
||
|
||
|
||
def _build_volume_args(
|
||
paths: dict[str, pathlib.Path],
|
||
) -> list[tuple[str, str, bool]]:
|
||
return _build_volume_args_for_keys(paths, {"data"})
|
||
|
||
|
||
def _build_volume_args_for_keys(
|
||
paths: dict[str, pathlib.Path],
|
||
keys: set[str],
|
||
read_only: set[str] | None = None,
|
||
) -> list[tuple[str, str, bool]]:
|
||
bindings: list[tuple[str, str, bool]] = []
|
||
read_only = read_only or set()
|
||
for key in keys:
|
||
if key not in CONTAINER_TARGETS:
|
||
raise KeyError(f"Unknown mount key {key}")
|
||
target = CONTAINER_TARGETS[key]
|
||
if key not in paths:
|
||
raise KeyError(f"Missing host path for key {key}")
|
||
bindings.append((str(paths[key]), target, key in read_only))
|
||
return bindings
|
||
|
||
|
||
def _chown_path(host_path: pathlib.Path, uid: int, gid: int) -> None:
|
||
"""Chown a host path using the test image with host user namespace."""
|
||
|
||
if not host_path.exists():
|
||
raise RuntimeError(f"Cannot chown missing path {host_path}")
|
||
|
||
cmd = [
|
||
"docker",
|
||
"run",
|
||
"--rm",
|
||
"--userns",
|
||
"host",
|
||
"--user",
|
||
"0:0",
|
||
"--entrypoint",
|
||
"/bin/chown",
|
||
"-v",
|
||
f"{host_path}:/mnt",
|
||
IMAGE,
|
||
"-R",
|
||
f"{uid}:{gid}",
|
||
"/mnt",
|
||
]
|
||
|
||
try:
|
||
subprocess.run(
|
||
cmd,
|
||
check=True,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
except subprocess.CalledProcessError as exc:
|
||
raise RuntimeError(f"Failed to chown {host_path} to {uid}:{gid}") from exc
|
||
|
||
|
||
def _chown_root(host_path: pathlib.Path) -> None:
|
||
_chown_path(host_path, 0, 0)
|
||
|
||
|
||
def _chown_netalertx(host_path: pathlib.Path) -> None:
|
||
_chown_path(host_path, 20211, 20211)
|
||
|
||
|
||
def _docker_volume_rm(volume_name: str) -> None:
|
||
subprocess.run(
|
||
["docker", "volume", "rm", "-f", volume_name],
|
||
check=False,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
|
||
def _docker_volume_create(volume_name: str) -> None:
|
||
subprocess.run(
|
||
["docker", "volume", "create", volume_name],
|
||
check=True,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
|
||
def _fresh_named_volume(prefix: str) -> str:
|
||
name = _unique_label(prefix).lower().replace("__", "-")
|
||
# Ensure we're exercising Docker's fresh-volume copy-up behavior.
|
||
_docker_volume_rm(name)
|
||
return name
|
||
|
||
|
||
def _ensure_volume_copy_up(volume_name: str) -> None:
|
||
"""Ensure a named volume is initialized from the NetAlertX image.
|
||
|
||
If we write into the volume first (e.g., with an Alpine helper container),
|
||
Docker will not perform the image-to-volume copy-up and the volume root may
|
||
stay root:root 0755, breaking arbitrary UID/GID runs.
|
||
"""
|
||
|
||
subprocess.run(
|
||
[
|
||
"docker",
|
||
"run",
|
||
"--rm",
|
||
"--userns",
|
||
"host",
|
||
"-v",
|
||
f"{volume_name}:/data",
|
||
"--entrypoint",
|
||
"/bin/sh",
|
||
IMAGE,
|
||
"-c",
|
||
"true",
|
||
],
|
||
check=True,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
|
||
def _seed_volume_text_file(
|
||
volume_name: str,
|
||
container_path: str,
|
||
content: str,
|
||
*,
|
||
chmod_mode: str = "644",
|
||
user: str | None = None,
|
||
) -> None:
|
||
"""Create/overwrite a text file inside a named volume.
|
||
|
||
Uses a tiny helper container so we don't rely on bind mounts (which are
|
||
resolved on the Docker daemon host).
|
||
"""
|
||
|
||
cmd = [
|
||
"docker",
|
||
"run",
|
||
"--rm",
|
||
"--userns",
|
||
"host",
|
||
]
|
||
if user:
|
||
cmd.extend(["--user", user])
|
||
cmd.extend(
|
||
[
|
||
"-v",
|
||
f"{volume_name}:/data",
|
||
"alpine:3.22",
|
||
"sh",
|
||
"-c",
|
||
f"set -eu; mkdir -p \"$(dirname '{container_path}')\"; cat > '{container_path}'; chmod {chmod_mode} '{container_path}'",
|
||
]
|
||
)
|
||
|
||
subprocess.run(
|
||
cmd,
|
||
input=content,
|
||
text=True,
|
||
check=True,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
|
||
def _volume_has_file(volume_name: str, container_path: str) -> bool:
|
||
return (
|
||
subprocess.run(
|
||
[
|
||
"docker",
|
||
"run",
|
||
"--rm",
|
||
"--userns",
|
||
"host",
|
||
"-v",
|
||
f"{volume_name}:/data",
|
||
"alpine:3.22",
|
||
"sh",
|
||
"-c",
|
||
f"test -f '{container_path}'",
|
||
],
|
||
check=False,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
).returncode
|
||
== 0
|
||
)
|
||
|
||
|
||
@pytest.mark.parametrize(
|
||
"uid_gid",
|
||
[
|
||
(1001, 1001),
|
||
(1502, 1502),
|
||
],
|
||
)
|
||
def test_nonroot_custom_uid_logs_note(
|
||
tmp_path: pathlib.Path,
|
||
uid_gid: tuple[int, int],
|
||
) -> None:
|
||
"""Ensure arbitrary non-root UID/GID can run with compose-like mounts."""
|
||
|
||
uid, gid = uid_gid
|
||
|
||
vol = _fresh_named_volume(f"note_uid_{uid}")
|
||
try:
|
||
# Fresh named volume at /data: matches default docker-compose UX.
|
||
result = _run_container(
|
||
f"note-uid-{uid}",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
user=f"{uid}:{gid}",
|
||
sleep_seconds=5,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
|
||
_assert_contains(result, f"NetAlertX note: current UID {uid} GID {gid}", result.args)
|
||
assert "expected UID" in result.output
|
||
assert result.returncode == 0
|
||
|
||
|
||
def _run_container(
|
||
label: str,
|
||
volumes: list[tuple[str, str, bool]] | None = None,
|
||
*,
|
||
env: dict[str, str] | None = None,
|
||
user: str | None = None,
|
||
drop_caps: list[str] | None = None,
|
||
network_mode: str | None = "host",
|
||
extra_args: list[str] | None = None,
|
||
volume_specs: list[str] | None = None,
|
||
sleep_seconds: float = GRACE_SECONDS,
|
||
wait_for_exit: bool = False,
|
||
pre_entrypoint: str | None = None,
|
||
userns_mode: str | None = "host",
|
||
image: str = IMAGE,
|
||
) -> subprocess.CompletedProcess[str]:
|
||
name = f"netalertx-test-{label}-{uuid.uuid4().hex[:8]}".lower()
|
||
|
||
tmp_uid = 20211
|
||
tmp_gid = 20211
|
||
if user:
|
||
try:
|
||
u_str, g_str = user.split(":", 1)
|
||
tmp_uid = int(u_str)
|
||
tmp_gid = int(g_str)
|
||
except Exception:
|
||
# Keep defaults if user format is unexpected.
|
||
tmp_uid = 20211
|
||
tmp_gid = 20211
|
||
|
||
# Clean up any existing container with this name
|
||
subprocess.run(
|
||
["docker", "rm", "-f", name],
|
||
check=False,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
cmd: list[str] = ["docker", "run", "--rm", "--name", name]
|
||
|
||
# Avoid flakiness in host-network runs when the host already uses the
|
||
# default NetAlertX ports. Tests can still override explicitly via `env`.
|
||
effective_env: dict[str, str] = dict(env or {})
|
||
if network_mode == "host":
|
||
if "PORT" not in effective_env:
|
||
effective_env["PORT"] = str(30000 + (int(uuid.uuid4().hex[:4], 16) % 20000))
|
||
if "GRAPHQL_PORT" not in effective_env:
|
||
gql = 30000 + (int(uuid.uuid4().hex[4:8], 16) % 20000)
|
||
if str(gql) == effective_env["PORT"]:
|
||
gql = 30000 + ((gql + 1) % 20000)
|
||
effective_env["GRAPHQL_PORT"] = str(gql)
|
||
|
||
if network_mode:
|
||
cmd.extend(["--network", network_mode])
|
||
if userns_mode:
|
||
cmd.extend(["--userns", userns_mode])
|
||
# Match docker-compose UX: /tmp is tmpfs with 1700 and owned by the runtime UID/GID.
|
||
cmd.extend(["--tmpfs", f"/tmp:mode=1700,uid={tmp_uid},gid={tmp_gid}"])
|
||
if user:
|
||
cmd.extend(["--user", user])
|
||
if drop_caps is not None:
|
||
for cap in drop_caps:
|
||
cmd.extend(["--cap-drop", cap])
|
||
else:
|
||
cmd.extend(["--cap-drop", "ALL"])
|
||
for cap in DEFAULT_CAPS:
|
||
cmd.extend(["--cap-add", cap])
|
||
if effective_env:
|
||
for key, value in effective_env.items():
|
||
cmd.extend(["-e", f"{key}={value}"])
|
||
if extra_args:
|
||
cmd.extend(extra_args)
|
||
for host_path, target, readonly in volumes or []:
|
||
mount = f"{host_path}:{target}"
|
||
if readonly:
|
||
mount += ":ro"
|
||
cmd.extend(["-v", mount])
|
||
if volume_specs:
|
||
for spec in volume_specs:
|
||
cmd.extend(["-v", spec])
|
||
|
||
# Diagnostic wrapper: list ownership and perms of mounted targets inside
|
||
# the container before running the real entrypoint. This helps debug
|
||
# permission failures by capturing the container's view of the host mounts.
|
||
mounts_ls = """
|
||
echo "--- MOUNT PERMS (container view) ---";
|
||
ls -ldn \
|
||
"""
|
||
for _, target, _ in volumes or []:
|
||
mounts_ls += f" {target}"
|
||
mounts_ls += " || true; echo '--- END MOUNTS ---'; \n"
|
||
|
||
setup_script = ""
|
||
if pre_entrypoint:
|
||
setup_script = pre_entrypoint
|
||
if not setup_script.endswith("\n"):
|
||
setup_script += "\n"
|
||
|
||
if wait_for_exit:
|
||
script = mounts_ls + setup_script + "sh /entrypoint.sh"
|
||
else:
|
||
script = "".join([
|
||
mounts_ls,
|
||
setup_script,
|
||
"sh /entrypoint.sh & pid=$!; ",
|
||
f"sleep {sleep_seconds}; ",
|
||
"if kill -0 $pid >/dev/null 2>&1; then kill -TERM $pid >/dev/null 2>&1 || true; fi; ",
|
||
"wait $pid; code=$?; if [ $code -eq 143 ]; then exit 0; fi; exit $code"
|
||
])
|
||
cmd.extend(["--entrypoint", "/bin/sh", image, "-c", script])
|
||
|
||
# Print the full Docker command for debugging
|
||
print("\n--- DOCKER CMD ---\n", " ".join(cmd), "\n--- END CMD ---\n")
|
||
result = subprocess.run(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
timeout=max(SUBPROCESS_TIMEOUT_SECONDS, sleep_seconds + 30),
|
||
check=False,
|
||
)
|
||
# Combine and clean stdout and stderr
|
||
stdouterr = re.sub(r"\x1b\[[0-9;]*m", "", result.stdout or "") + re.sub(
|
||
r"\x1b\[[0-9;]*m", "", result.stderr or ""
|
||
)
|
||
result.output = stdouterr
|
||
# Print container output for debugging in every test run.
|
||
print("\n--- CONTAINER OUTPUT START ---")
|
||
print(result.output)
|
||
print("--- CONTAINER OUTPUT END ---\n")
|
||
|
||
return result
|
||
|
||
|
||
def _assert_contains(result, snippet: str, cmd: list[str] = None) -> None:
|
||
output = result.output + result.stderr
|
||
if snippet not in output:
|
||
cmd_str = " ".join(cmd) if cmd else ""
|
||
raise AssertionError(
|
||
f"Expected to find '{snippet}' in container output.\n"
|
||
f"STDOUT:\n{result.output}\n"
|
||
f"STDERR:\n{result.stderr}\n"
|
||
f"Combined output:\n{output}\n"
|
||
f"Container command:\n{cmd_str}"
|
||
)
|
||
|
||
|
||
def _extract_mount_rows(output: str) -> dict[str, list[str]]:
|
||
rows: dict[str, list[str]] = {}
|
||
in_table = False
|
||
expected_cols = 0
|
||
|
||
for raw_line in (output or "").splitlines():
|
||
line = raw_line.rstrip()
|
||
if not in_table:
|
||
if line.startswith(" Path") and "Writeable" in line:
|
||
# Legacy format: Path | Writeable | Mount | RAMDisk | Performance | DataLoss
|
||
in_table = True
|
||
expected_cols = 5
|
||
elif line.startswith(" Path") and "| R" in line and "| W" in line:
|
||
# Current format: Path | R | W | Mount | RAMDisk | Performance | DataLoss
|
||
in_table = True
|
||
expected_cols = 6
|
||
continue
|
||
|
||
if not line.strip():
|
||
break
|
||
if line.lstrip().startswith("Path"):
|
||
continue
|
||
if set(line.strip()) <= {"-", "+"}:
|
||
continue
|
||
|
||
parts = [part.strip() for part in line.split("|")]
|
||
if len(parts) < 1 + expected_cols:
|
||
continue
|
||
path = parts[0].strip()
|
||
if not path:
|
||
continue
|
||
rows[path] = parts[1 : 1 + expected_cols]
|
||
|
||
return rows
|
||
|
||
|
||
def _assert_mount_row(
|
||
result,
|
||
path: str,
|
||
*,
|
||
write: str | None = None,
|
||
mount: str | None = None,
|
||
ramdisk: str | None = None,
|
||
performance: str | None = None,
|
||
dataloss: str | None = None,
|
||
) -> None:
|
||
rows = _extract_mount_rows(result.output)
|
||
if path not in rows:
|
||
raise AssertionError(
|
||
f"Mount table row for {path} not found. Rows: {sorted(rows)}\nOutput:\n{result.output}"
|
||
)
|
||
columns = rows[path]
|
||
|
||
# Legacy: [Writeable, Mount, RAMDisk, Performance, DataLoss]
|
||
# Current: [R, W, Mount, RAMDisk, Performance, DataLoss]
|
||
if len(columns) == 5:
|
||
label_to_value = {
|
||
"Writeable": columns[0],
|
||
"Mount": columns[1],
|
||
"RAMDisk": columns[2],
|
||
"Performance": columns[3],
|
||
"DataLoss": columns[4],
|
||
}
|
||
write_label = "Writeable"
|
||
elif len(columns) == 6:
|
||
label_to_value = {
|
||
"R": columns[0],
|
||
"W": columns[1],
|
||
"Mount": columns[2],
|
||
"RAMDisk": columns[3],
|
||
"Performance": columns[4],
|
||
"DataLoss": columns[5],
|
||
}
|
||
write_label = "W"
|
||
else:
|
||
raise AssertionError(
|
||
f"Unexpected mount table column count for {path}: {len(columns)}. Columns: {columns}\nOutput:\n{result.output}"
|
||
)
|
||
|
||
checks = [
|
||
(write_label, write),
|
||
("Mount", mount),
|
||
("RAMDisk", ramdisk),
|
||
("Performance", performance),
|
||
("DataLoss", dataloss),
|
||
]
|
||
|
||
for label, expected in checks:
|
||
if expected is None:
|
||
continue
|
||
actual = label_to_value.get(label)
|
||
if actual != expected:
|
||
raise AssertionError(
|
||
f"{path} {label} expected {expected}, got {actual}.\n"
|
||
f"Row: {label_to_value}\nOutput:\n{result.output}"
|
||
)
|
||
|
||
|
||
def _setup_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
|
||
"""Set up a directory with files and zero permissions for testing."""
|
||
if key in ["app_db", "app_config"]:
|
||
# Files already exist from _setup_mount_tree seeding
|
||
pass
|
||
else:
|
||
# Create a dummy file for other directories
|
||
(paths[key] / "dummy.txt").write_text("dummy")
|
||
|
||
# Chmod all files in the directory to 000
|
||
for f in paths[key].iterdir():
|
||
f.chmod(0)
|
||
|
||
# Chmod the directory itself to 000
|
||
paths[key].chmod(0)
|
||
|
||
|
||
def _restore_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
|
||
"""Restore permissions after zero perm test."""
|
||
# Chmod directory back to 700
|
||
paths[key].chmod(0o700)
|
||
|
||
# Chmod files back to appropriate permissions
|
||
for f in paths[key].iterdir():
|
||
if f.name in ["app.db", "app.conf"]:
|
||
f.chmod(0o600)
|
||
else:
|
||
f.chmod(0o644)
|
||
|
||
|
||
def test_missing_capabilities_triggers_warning(tmp_path: pathlib.Path) -> None:
|
||
"""Test missing required capabilities - simulates insufficient container privileges.
|
||
|
||
5. Missing Required Capabilities: Simulates running without NET_ADMIN, NET_RAW,
|
||
NET_BIND_SERVICE capabilities. Required for ARP scanning and network operations.
|
||
Expected: "exec /bin/sh: operation not permitted" error, guidance to add capabilities.
|
||
|
||
Check script: N/A (capability check happens at container runtime)
|
||
Sample message: "exec /bin/sh: operation not permitted"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "missing_caps")
|
||
volumes = _build_volume_args_for_keys(paths, {"data"})
|
||
result = _run_container(
|
||
"missing-caps",
|
||
volumes,
|
||
drop_caps=["ALL"],
|
||
)
|
||
_assert_contains(result, "exec /bin/sh: operation not permitted", result.args)
|
||
assert result.returncode != 0
|
||
|
||
|
||
def test_running_as_root_is_blocked(tmp_path: pathlib.Path) -> None:
|
||
"""Test running as root user - simulates insecure container execution.
|
||
|
||
6. Running as Root User: Simulates running container as root (UID 0) instead of
|
||
dedicated netalertx user. Warning about security risks, special permission fix mode.
|
||
Expected: Warning about security risks, guidance to use UID 20211.
|
||
|
||
Check script: /entrypoint.d/0-storage-permission.sh
|
||
Sample message: "🚨 CRITICAL SECURITY ALERT: NetAlertX is running as ROOT (UID 0)!"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "run_as_root")
|
||
volumes = _build_volume_args_for_keys(paths, {"data", "nginx_conf"})
|
||
result = _run_container(
|
||
"run-as-root",
|
||
volumes,
|
||
user="0",
|
||
)
|
||
_assert_contains(result, "NetAlertX is running as ROOT", result.args)
|
||
_assert_contains(result, "Permissions fixed for read-write paths.", result.args)
|
||
assert (
|
||
result.returncode == 0
|
||
) # container warns but continues running, then terminated by test framework
|
||
|
||
|
||
def test_missing_host_network_warns(tmp_path: pathlib.Path) -> None:
|
||
# No output assertion, just returncode check
|
||
"""Test missing host networking - simulates running without host network mode.
|
||
|
||
8. Missing Host Networking: Simulates running without network_mode: host.
|
||
Limits ARP scanning capabilities for network discovery.
|
||
Expected: Warning about ARP scanning limitations, guidance to use host networking.
|
||
|
||
Check script: check-network-mode.sh
|
||
Sample message: "⚠️ ATTENTION: NetAlertX is not running with --network=host. Bridge networking..."
|
||
"""
|
||
vol = _fresh_named_volume("missing_host_network")
|
||
try:
|
||
result = _run_container(
|
||
"missing-host-network",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
network_mode=None,
|
||
sleep_seconds=15,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
_assert_contains(result, "not running with --network=host", result.args)
|
||
|
||
|
||
# NOTE: The following runtime-behavior tests depended on the entrypoint continuing even when
|
||
# /data was mounted without write permissions. With fail-fast enabled we must supply a pre-owned
|
||
# (UID/GID 20211) data volume, which this dev container cannot provide for bind mounts. Once the
|
||
# docker tests switch to compose-managed fixtures, restore these cases by moving them back to the
|
||
# top level.
|
||
|
||
|
||
|
||
def test_missing_app_conf_triggers_seed(tmp_path: pathlib.Path) -> None:
|
||
"""Test missing configuration file seeding - simulates corrupted/missing app.conf.
|
||
|
||
9. Missing Configuration File: Simulates corrupted/missing app.conf.
|
||
Container automatically regenerates default configuration on startup.
|
||
Expected: Automatic regeneration of default configuration.
|
||
|
||
Check script: /entrypoint.d/15-first-run-config.sh
|
||
Sample message: "Default configuration written to"
|
||
"""
|
||
vol = _fresh_named_volume("missing_app_conf")
|
||
try:
|
||
result = _run_container(
|
||
"missing-app-conf",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
sleep_seconds=15,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
_assert_contains(result, "Default configuration written to", result.args)
|
||
assert result.returncode == 0
|
||
|
||
|
||
def test_missing_app_db_triggers_seed(tmp_path: pathlib.Path) -> None:
|
||
"""Test missing database file seeding - simulates corrupted/missing app.db.
|
||
|
||
10. Missing Database File: Simulates corrupted/missing app.db.
|
||
Container automatically creates initial database schema on startup.
|
||
Expected: Automatic creation of initial database schema.
|
||
|
||
Check script: /entrypoint.d/20-first-run-db.sh
|
||
Sample message: "Building initial database schema"
|
||
"""
|
||
vol = _fresh_named_volume("missing_app_db")
|
||
try:
|
||
_ensure_volume_copy_up(vol)
|
||
# Seed only app.conf; leave app.db missing to trigger first-run DB schema creation.
|
||
_seed_volume_text_file(
|
||
vol,
|
||
"/data/config/app.conf",
|
||
"TIMEZONE='UTC'\n",
|
||
chmod_mode="644",
|
||
user="20211:20211",
|
||
)
|
||
result = _run_container(
|
||
"missing-app-db",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
user="20211:20211",
|
||
sleep_seconds=20,
|
||
)
|
||
assert _volume_has_file(vol, "/data/db/app.db")
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
assert result.returncode == 0
|
||
|
||
|
||
def test_custom_port_without_writable_conf(tmp_path: pathlib.Path) -> None:
|
||
"""Test custom port configuration without writable nginx config mount.
|
||
|
||
4. Custom Port Without Nginx Config Mount: Simulates setting custom LISTEN_ADDR/PORT
|
||
without mounting nginx config. Container starts but uses default address.
|
||
Expected: Container starts but uses default address, warning about missing config mount.
|
||
|
||
Check script: check-nginx-config.sh
|
||
Sample messages: "⚠️ ATTENTION: Nginx configuration mount /tmp/nginx/active-config is missing."
|
||
"⚠️ ATTENTION: Unable to write to /tmp/nginx/active-config/netalertx.conf."
|
||
"""
|
||
vol = _fresh_named_volume("custom_port_ro_conf")
|
||
extra_args = [
|
||
"--tmpfs",
|
||
f"{VOLUME_MAP['nginx_conf']}:uid=20211,gid=20211,mode=500",
|
||
]
|
||
try:
|
||
result = _run_container(
|
||
"custom-port-ro-conf",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
env={"PORT": "24444", "LISTEN_ADDR": "127.0.0.1"},
|
||
user="20211:20211",
|
||
extra_args=extra_args,
|
||
sleep_seconds=15,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
_assert_contains(result, "Unable to write to", result.args)
|
||
_assert_contains(
|
||
result, f"{VOLUME_MAP['nginx_conf']}/netalertx.conf", result.args
|
||
)
|
||
assert result.returncode != 0
|
||
|
||
def test_excessive_capabilities_warning(tmp_path: pathlib.Path) -> None:
|
||
"""Test excessive capabilities detection - simulates container with extra capabilities.
|
||
|
||
11. Excessive Capabilities: Simulates container with capabilities beyond the required
|
||
NET_ADMIN, NET_RAW, and NET_BIND_SERVICE.
|
||
Expected: Warning about excessive capabilities detected.
|
||
|
||
Check script: 90-excessive-capabilities.sh
|
||
Sample message: "Excessive capabilities detected"
|
||
"""
|
||
vol = _fresh_named_volume("excessive_caps")
|
||
try:
|
||
result = _run_container(
|
||
"excessive-caps",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
extra_args=["--cap-add=SYS_ADMIN", "--cap-add=NET_BROADCAST"],
|
||
sleep_seconds=15,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
_assert_contains(result, "Excessive capabilities detected", result.args)
|
||
_assert_contains(result, "bounding caps:", result.args)
|
||
|
||
def test_appliance_integrity_read_write_mode(tmp_path: pathlib.Path) -> None:
|
||
"""Test appliance integrity - simulates running with read-write root filesystem.
|
||
|
||
12. Appliance Integrity: Simulates running container with read-write root filesystem
|
||
instead of read-only mode.
|
||
Expected: Warning about running in read-write mode instead of read-only.
|
||
|
||
Check script: 95-appliance-integrity.sh
|
||
Sample message: "Container is running as read-write, not in read-only mode"
|
||
"""
|
||
vol = _fresh_named_volume("appliance_integrity")
|
||
try:
|
||
result = _run_container(
|
||
"appliance-integrity",
|
||
volumes=None,
|
||
volume_specs=[f"{vol}:/data"],
|
||
sleep_seconds=15,
|
||
)
|
||
finally:
|
||
_docker_volume_rm(vol)
|
||
_assert_contains(
|
||
result, "Container is running as read-write, not in read-only mode", result.args
|
||
)
|
||
|
||
|
||
def test_zero_permissions_app_db_dir(tmp_path: pathlib.Path) -> None:
|
||
"""Test zero permissions - simulates mounting directories/files with no permissions.
|
||
|
||
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
|
||
Tests directories and files with no read/write/execute permissions.
|
||
Expected: Mounts table shows ❌ for writeable status, configuration issues detected.
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "chmod_app_db")
|
||
_setup_zero_perm_dir(paths, "app_db")
|
||
volumes = _build_volume_args_for_keys(paths, {"data"})
|
||
try:
|
||
result = _run_container(
|
||
"chmod-app-db",
|
||
volumes,
|
||
user="20211:20211",
|
||
wait_for_exit=True,
|
||
)
|
||
# Check that the mounts table shows the app_db directory as not writeable
|
||
_assert_mount_row(result, VOLUME_MAP["app_db"], write="❌")
|
||
# Check that configuration issues are detected
|
||
_assert_contains(result, "Configuration issues detected", result.args)
|
||
assert result.returncode != 0
|
||
finally:
|
||
_restore_zero_perm_dir(paths, "app_db")
|
||
|
||
|
||
def test_zero_permissions_app_config_dir(tmp_path: pathlib.Path) -> None:
|
||
"""Test zero permissions - simulates mounting directories/files with no permissions.
|
||
|
||
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
|
||
Tests directories and files with no read/write/execute permissions.
|
||
Expected: Mounts table shows ❌ for writeable status, configuration issues detected.
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "chmod_app_config")
|
||
_setup_zero_perm_dir(paths, "app_config")
|
||
volumes = _build_volume_args_for_keys(paths, {"data"})
|
||
try:
|
||
result = _run_container(
|
||
"chmod-app-config",
|
||
volumes,
|
||
user="20211:20211",
|
||
wait_for_exit=True,
|
||
)
|
||
# Check that the mounts table shows the app_config directory as not writeable
|
||
_assert_mount_row(result, VOLUME_MAP["app_config"], write="❌")
|
||
# Check that configuration issues are detected
|
||
_assert_contains(result, "Configuration issues detected", result.args)
|
||
assert result.returncode != 0
|
||
finally:
|
||
_restore_zero_perm_dir(paths, "app_config")
|
||
|
||
|
||
def test_mandatory_folders_creation(tmp_path: pathlib.Path) -> None:
|
||
"""Test mandatory folders creation - simulates missing plugins log directory.
|
||
|
||
1. Mandatory Folders: Simulates missing required directories and log files.
|
||
Container automatically creates plugins log, system services run log/tmp directories,
|
||
and required log files on startup.
|
||
Expected: Automatic creation of all required directories and files.
|
||
|
||
Check script: 25-mandatory-folders.sh
|
||
Sample message: "Creating Plugins log"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "mandatory_folders")
|
||
# Remove the plugins log directory to simulate missing mandatory folder
|
||
plugins_log_dir = paths["app_log"] / "plugins"
|
||
if plugins_log_dir.exists():
|
||
shutil.rmtree(plugins_log_dir)
|
||
|
||
# Ensure other directories are writable and owned by netalertx user so container gets past mounts.py
|
||
for key in [
|
||
"data",
|
||
"app_db",
|
||
"app_config",
|
||
"app_log",
|
||
"app_api",
|
||
"services_run",
|
||
"nginx_conf",
|
||
]:
|
||
paths[key].chmod(0o777)
|
||
_chown_netalertx(paths[key]) # Ensure all directories are owned by netalertx
|
||
|
||
volumes = _build_volume_args_for_keys(paths, {"data"})
|
||
result = _run_container(
|
||
"mandatory-folders", volumes, user="20211:20211", sleep_seconds=5
|
||
)
|
||
_assert_contains(result, "Creating Plugins log", result.args)
|
||
# The container will fail at writable config due to permission issues, but we just want to verify
|
||
# that mandatory folders creation ran successfully
|
||
|
||
|
||
def test_writable_config_validation(tmp_path: pathlib.Path) -> None:
|
||
"""Test writable config validation - simulates invalid config file type.
|
||
|
||
3. Writable Config Validation: Simulates app.conf being a non-regular file (directory).
|
||
Container verifies it can read from and write to critical config and database files.
|
||
Expected: "Path is not a regular file" warning for config file.
|
||
|
||
Check script: 35-writable-config.sh
|
||
Sample message: "Path is not a regular file"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "writable_config")
|
||
# Force a non-regular file for /data/config/app.conf to exercise the correct warning branch.
|
||
config_path = paths["app_config"] / "app.conf"
|
||
if config_path.exists():
|
||
if config_path.is_dir():
|
||
shutil.rmtree(config_path)
|
||
else:
|
||
config_path.unlink()
|
||
config_path.mkdir(parents=False)
|
||
config_path.chmod(0o777)
|
||
_chown_netalertx(config_path)
|
||
|
||
# Ensure directories are writable and owned by netalertx user so container gets past mounts.py
|
||
for key in [
|
||
"data",
|
||
"app_db",
|
||
"app_config",
|
||
"app_log",
|
||
"app_api",
|
||
"services_run",
|
||
"nginx_conf",
|
||
]:
|
||
paths[key].chmod(0o777)
|
||
_chown_netalertx(paths[key])
|
||
|
||
volumes = _build_volume_args_for_keys(paths, {"data"})
|
||
result = _run_container(
|
||
"writable-config", volumes, user="20211:20211", sleep_seconds=5.0
|
||
)
|
||
_assert_contains(result, "ATTENTION: Path is not a regular file.", result.args)
|
||
|
||
|
||
def test_mount_analysis_ram_disk_performance(tmp_path: pathlib.Path) -> None:
|
||
"""Test mount analysis for RAM disk performance issues.
|
||
|
||
Tests 10-mounts.py detection of persistent paths on RAM disks (tmpfs) which can cause
|
||
performance issues and data loss on container restart.
|
||
Expected: Mounts table shows ❌ for RAMDisk on persistent paths, performance warnings.
|
||
|
||
Check script: 10-mounts.py
|
||
Sample message: "Configuration issues detected"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "ram_disk_mount")
|
||
# Mount persistent paths (db, config) on tmpfs to simulate RAM disk
|
||
volumes = [
|
||
(str(paths["app_log"]), VOLUME_MAP["app_log"], False),
|
||
(str(paths["app_api"]), VOLUME_MAP["app_api"], False),
|
||
(str(paths["services_run"]), VOLUME_MAP["services_run"], False),
|
||
(str(paths["nginx_conf"]), VOLUME_MAP["nginx_conf"], False),
|
||
]
|
||
# Use tmpfs mounts for persistent paths with proper permissions
|
||
extra_args = [
|
||
"--tmpfs",
|
||
f"{VOLUME_MAP['app_db']}:uid=20211,gid=20211,mode=755",
|
||
"--tmpfs",
|
||
f"{VOLUME_MAP['app_config']}:uid=20211,gid=20211,mode=755",
|
||
]
|
||
result = _run_container(
|
||
"ram-disk-mount", volumes=volumes, extra_args=extra_args, user="20211:20211"
|
||
)
|
||
# Check that mounts table shows RAM disk detection for persistent paths
|
||
_assert_mount_row(
|
||
result,
|
||
VOLUME_MAP["app_db"],
|
||
write="✅",
|
||
mount="✅",
|
||
ramdisk="❌",
|
||
performance="➖",
|
||
dataloss="❌",
|
||
)
|
||
_assert_mount_row(
|
||
result,
|
||
VOLUME_MAP["app_config"],
|
||
write="✅",
|
||
mount="✅",
|
||
ramdisk="❌",
|
||
performance="➖",
|
||
dataloss="❌",
|
||
)
|
||
# Check that configuration issues are detected due to dataloss risk
|
||
_assert_contains(result, "Configuration issues detected", result.args)
|
||
assert result.returncode != 0
|
||
|
||
|
||
def test_mount_analysis_dataloss_risk(tmp_path: pathlib.Path) -> None:
|
||
"""Test mount analysis for dataloss risk on non-persistent filesystems.
|
||
|
||
Tests 10-mounts.py detection when persistent database/config paths are
|
||
mounted on non-persistent filesystems (tmpfs, ramfs).
|
||
Expected: Mounts table shows dataloss risk warnings for persistent paths on tmpfs.
|
||
|
||
Check script: 10-mounts.py
|
||
Sample message: "Configuration issues detected"
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "dataloss_risk")
|
||
# Mount persistent paths (db, config) on tmpfs to simulate non-persistent storage
|
||
volumes = [
|
||
(str(paths["app_log"]), VOLUME_MAP["app_log"], False),
|
||
(str(paths["app_api"]), VOLUME_MAP["app_api"], False),
|
||
(str(paths["services_run"]), VOLUME_MAP["services_run"], False),
|
||
(str(paths["nginx_conf"]), VOLUME_MAP["nginx_conf"], False),
|
||
]
|
||
# Use tmpfs mounts for persistent paths with proper permissions
|
||
extra_args = [
|
||
"--tmpfs",
|
||
f"{VOLUME_MAP['app_db']}:uid=20211,gid=20211,mode=755",
|
||
"--tmpfs",
|
||
f"{VOLUME_MAP['app_config']}:uid=20211,gid=20211,mode=755",
|
||
]
|
||
result = _run_container(
|
||
"dataloss-risk", volumes=volumes, extra_args=extra_args, user="20211:20211"
|
||
)
|
||
# Check that mounts table shows dataloss risk for persistent paths on tmpfs
|
||
_assert_mount_row(
|
||
result,
|
||
VOLUME_MAP["app_db"],
|
||
write="✅",
|
||
mount="✅",
|
||
ramdisk="❌",
|
||
performance="➖",
|
||
dataloss="❌",
|
||
)
|
||
_assert_mount_row(
|
||
result,
|
||
VOLUME_MAP["app_config"],
|
||
write="✅",
|
||
mount="✅",
|
||
ramdisk="❌",
|
||
performance="➖",
|
||
dataloss="❌",
|
||
)
|
||
# Check that configuration issues are detected due to dataloss risk
|
||
_assert_contains(result, "Configuration issues detected", result.args)
|
||
assert result.returncode != 0
|
||
|
||
|
||
def test_restrictive_permissions_handling(tmp_path: pathlib.Path) -> None:
|
||
"""Test handling of restrictive permissions on bind mounts.
|
||
|
||
Simulates a user mounting a directory with restrictive permissions (e.g., 755 root:root).
|
||
The container should either fail gracefully or handle it if running as root (which triggers fix).
|
||
If running as non-root (default), it should fail to write if it doesn't have access.
|
||
"""
|
||
paths = _setup_mount_tree(tmp_path, "restrictive_perms")
|
||
|
||
# Helper to chown without userns host (workaround for potential devcontainer hang)
|
||
def _chown_root_safe(host_path: pathlib.Path) -> None:
|
||
cmd = [
|
||
"docker", "run", "--rm",
|
||
# "--userns", "host", # Removed to avoid hang
|
||
"--user", "0:0",
|
||
"--entrypoint", "/bin/chown",
|
||
"-v", f"{host_path}:/mnt",
|
||
IMAGE,
|
||
"-R", "0:0", "/mnt",
|
||
]
|
||
subprocess.run(
|
||
cmd,
|
||
check=True,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
# Set up a restrictive directory (root owned, 755)
|
||
target_dir = paths["app_db"]
|
||
_chown_root_safe(target_dir)
|
||
target_dir.chmod(0o755)
|
||
|
||
# Mount ALL volumes to avoid 'find' errors in 0-storage-permission.sh
|
||
keys = {"data", "app_db", "app_config", "app_log", "app_api", "services_run", "nginx_conf"}
|
||
volumes = _build_volume_args_for_keys(paths, keys)
|
||
|
||
# Case 1: Running as non-root (default) - Should fail to write
|
||
# We disable host network/userns to avoid potential hangs in devcontainer environment
|
||
result = _run_container(
|
||
"restrictive-perms-user",
|
||
volumes,
|
||
user="20211:20211",
|
||
sleep_seconds=5,
|
||
network_mode=None,
|
||
userns_mode=None
|
||
)
|
||
assert result.returncode != 0 or "Permission denied" in result.output or "Unable to write" in result.output
|
||
|
||
# Case 2: Running as root - Should trigger the fix script
|
||
result_root = _run_container(
|
||
"restrictive-perms-root",
|
||
volumes,
|
||
user="0:0",
|
||
sleep_seconds=5,
|
||
network_mode=None,
|
||
userns_mode=None
|
||
)
|
||
|
||
_assert_contains(result_root, "NetAlertX is running as ROOT", result_root.args)
|
||
_assert_contains(result_root, "Permissions fixed for read-write paths", result_root.args)
|
||
|
||
check_cmd = [
|
||
"docker", "run", "--rm",
|
||
"--entrypoint", "/bin/sh",
|
||
"--user", "20211:20211",
|
||
IMAGE,
|
||
"-c", "ls -ldn /data/db && touch /data/db/test_write_after_fix"
|
||
]
|
||
# Add all volumes to check_cmd too
|
||
for host_path, target, _readonly in volumes:
|
||
check_cmd.extend(["-v", f"{host_path}:{target}"])
|
||
|
||
check_result = subprocess.run(
|
||
check_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=SUBPROCESS_TIMEOUT_SECONDS,
|
||
)
|
||
|
||
if check_result.returncode != 0:
|
||
print(f"Check command failed. Cmd: {check_cmd}")
|
||
print(f"Stderr: {check_result.stderr}")
|
||
print(f"Stdout: {check_result.stdout}")
|
||
|
||
assert check_result.returncode == 0, f"Should be able to write after root fix script runs. Stderr: {check_result.stderr}. Stdout: {check_result.stdout}"
|
||
|