Enhance code standards, update contributing guidelines, and add tests for SYNC plugin functionality

This commit is contained in:
Jokob @NetAlertX
2026-03-07 21:34:38 +00:00
parent abf024d4d3
commit d49abd9d02
8 changed files with 709 additions and 26 deletions

View File

@@ -5,12 +5,12 @@ description: NetAlertX coding standards and conventions. Use this when writing c
# Code Standards
- ask me to review before going to each next step (mention n step out of x)
- before starting, prepare implementation plan
- ask me to review before going to each next step (mention n step out of x) (AI only)
- before starting, prepare implementation plan (AI only)
- ask me to review it and ask any clarifying questions first
- add test creation as last step - follow repo architecture patterns - do not place in the root of /test
- code has to be maintainable, no duplicate code
- follow DRY principle
- follow DRY principle - maintainability of code is more important than speed of implementation
- code files should be less than 500 LOC for better maintainability
## File Length

View File

@@ -3,6 +3,10 @@ name: 🧪 Manual Test Suite Selector
on:
workflow_dispatch:
inputs:
run_all:
description: '✅ Run ALL tests (overrides individual selectors)'
type: boolean
default: false
run_scan:
description: '📂 scan/ (Scan, Logic, Locks, IPs)'
type: boolean
@@ -23,6 +27,10 @@ on:
description: '📂 ui/ (Selenium & Dashboard)'
type: boolean
default: false
run_plugins:
description: '📂 plugins/ (SYNC schema-aware insert logic)'
type: boolean
default: false
run_root_files:
description: '📄 Root Test Files (WOL, Atomicity, etc.)'
type: boolean
@@ -42,12 +50,20 @@ jobs:
id: builder
run: |
PATHS=""
# run_all overrides everything
if [ "${{ github.event.inputs.run_all }}" == "true" ]; then
echo "final_paths=test/" >> $GITHUB_OUTPUT
exit 0
fi
# Folder Mapping with 'test/' prefix
if [ "${{ github.event.inputs.run_scan }}" == "true" ]; then PATHS="$PATHS test/scan/"; fi
if [ "${{ github.event.inputs.run_api }}" == "true" ]; then PATHS="$PATHS test/api_endpoints/ test/server/"; fi
if [ "${{ github.event.inputs.run_backend }}" == "true" ]; then PATHS="$PATHS test/backend/ test/db/"; fi
if [ "${{ github.event.inputs.run_docker_env }}" == "true" ]; then PATHS="$PATHS test/docker_tests/"; fi
if [ "${{ github.event.inputs.run_ui }}" == "true" ]; then PATHS="$PATHS test/ui/"; fi
if [ "${{ github.event.inputs.run_plugins }}" == "true" ]; then PATHS="$PATHS test/plugins/"; fi
# Root Files Mapping (files sitting directly in /test/)
if [ "${{ github.event.inputs.run_root_files }}" == "true" ]; then

View File

@@ -1,23 +1,23 @@
# 🤝 Contributing to NetAlertX
# Contributing to NetAlertX
First off, **thank you** for taking the time to contribute! NetAlertX is built and improved with the help of passionate people like you.
---
## 📂 Issues, Bugs, and Feature Requests
## Issues, Bugs, and Feature Requests
Please use the [GitHub Issue Tracker](https://github.com/netalertx/NetAlertX/issues) for:
- Bug reports 🐞
- Feature requests 💡
- Documentation feedback 📖
- Bug reports
- Feature requests
- Documentation feedback
Before opening a new issue:
- 🛑 [Check Common Issues & Debug Tips](https://docs.netalertx.com/DEBUG_TIPS#common-issues)
- 🔍 [Search Closed Issues](https://github.com/netalertx/NetAlertX/issues?q=is%3Aissue+is%3Aclosed)
- [Check Common Issues & Debug Tips](https://docs.netalertx.com/DEBUG_TIPS#common-issues)
- [Search Closed Issues](https://github.com/netalertx/NetAlertX/issues?q=is%3Aissue+is%3Aclosed)
---
## 🚀 Submitting Pull Requests (PRs)
## Submitting Pull Requests (PRs)
We welcome PRs to improve the code, docs, or UI!
@@ -29,9 +29,14 @@ Please:
- If relevant, add or update tests and documentation
- For plugins, refer to the [Plugin Dev Guide](https://docs.netalertx.com/PLUGINS_DEV)
## Code quality
- read and follow the [code-standards](/.github/skills/code-standards/SKILL.md)
---
## 🌟 First-Time Contributors
## First-Time Contributors
New to open source? Check out these resources:
- [How to Fork and Submit a PR](https://opensource.guide/how-to-contribute/)
@@ -39,15 +44,15 @@ New to open source? Check out these resources:
---
## 🔐 Code of Conduct
## Code of Conduct
By participating, you agree to follow our [Code of Conduct](./CODE_OF_CONDUCT.md), which ensures a respectful and welcoming community.
---
## 📬 Contact
## Contact
If you have more in-depth questions or want to discuss contributing in other ways, feel free to reach out at:
📧 [jokob@duck.com](mailto:jokob@duck.com?subject=NetAlertX%20Contribution)
[jokob.sk@gmail.com](mailto:jokob.sk@gmail.com?subject=NetAlertX%20Contribution)
We appreciate every contribution, big or small! 💙

View File

@@ -222,27 +222,30 @@ def main():
extra = '',
foreignKey = device['devGUID'])
# Resolve the actual columns that exist in the Devices table once.
# This automatically excludes computed/virtual fields (e.g. devStatus,
# devIsSleeping) and 'rowid' without needing a maintained exclusion list.
cursor.execute("PRAGMA table_info(Devices)")
db_columns = {row[1] for row in cursor.fetchall()}
# Filter out existing devices
new_devices = [device for device in device_data if device['devMac'] not in existing_mac_addresses]
# Remove 'rowid' key if it exists
for device in new_devices:
device.pop('rowid', None)
device.pop('devStatus', None)
mylog('verbose', [f'[{pluginName}] All devices: "{len(device_data)}"'])
mylog('verbose', [f'[{pluginName}] New devices: "{len(new_devices)}"'])
# Prepare the insert statement
if new_devices:
# creating insert statement, removing 'rowid', 'devStatus' as handled on the target and devStatus is resolved on the fly
columns = ', '.join(k for k in new_devices[0].keys() if k not in ['rowid', 'devStatus'])
placeholders = ', '.join('?' for k in new_devices[0] if k not in ['rowid', 'devStatus'])
# Only keep keys that are real columns in the target DB; computed
# or unknown fields are silently dropped regardless of source schema.
insert_cols = [k for k in new_devices[0].keys() if k in db_columns]
columns = ', '.join(insert_cols)
placeholders = ', '.join('?' for _ in insert_cols)
sql = f'INSERT INTO Devices ({columns}) VALUES ({placeholders})'
# Extract values for the new devices
values = [tuple(device.values()) for device in new_devices]
# Extract only the whitelisted column values for each device
values = [tuple(device.get(col) for col in insert_cols) for device in new_devices]
mylog('verbose', [f'[{pluginName}] Inserting Devices SQL : "{sql}"'])
mylog('verbose', [f'[{pluginName}] Inserting Devices VALUES: "{values}"'])

View File

@@ -5,7 +5,7 @@ Import from any test subdirectory with:
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import make_db, insert_device, minutes_ago, DummyDB, down_event_macs
from db_test_helpers import make_db, insert_device, minutes_ago, DummyDB, down_event_macs, make_device_dict, sync_insert_devices
"""
import sqlite3
@@ -202,6 +202,125 @@ def insert_device(
)
def make_device_dict(mac: str = "aa:bb:cc:dd:ee:ff", **overrides) -> dict:
    """
    Build a fully-populated Devices row dict with safe defaults.

    Every column declared in CREATE_DEVICES receives a value, so the result
    can be passed straight to sync_insert_devices() or similar helpers.
    Keyword arguments override any individual field (and may add extra keys).

    Computed/view-only columns (devStatus, devIsSleeping, devFlapping,
    rowid, …) are intentionally absent — tests that need to verify they are
    dropped should add them after calling this function.
    """
    defaults = {
        "devMac": mac,
        "devName": "Test Device",
        "devOwner": "",
        "devType": "",
        "devVendor": "Acme",
        "devFavorite": 0,
        "devGroup": "",
        "devComments": "",
        "devFirstConnection": "2024-01-01 00:00:00",
        "devLastConnection": "2024-01-02 00:00:00",
        "devLastIP": "192.168.1.10",
        "devPrimaryIPv4": "192.168.1.10",
        "devPrimaryIPv6": "",
        "devVlan": "",
        "devForceStatus": "",
        "devStaticIP": "",
        "devScan": 1,
        "devLogEvents": 1,
        "devAlertEvents": 1,
        "devAlertDown": 1,
        "devCanSleep": 0,
        "devSkipRepeated": 0,
        "devLastNotification": "",
        "devPresentLastScan": 1,
        "devIsNew": 0,
        "devLocation": "",
        "devIsArchived": 0,
        "devParentMAC": "",
        "devParentPort": "",
        "devIcon": "",
        "devGUID": "test-guid-1",
        "devSite": "",
        "devSSID": "",
        "devSyncHubNode": "node1",
        "devSourcePlugin": "",
        "devCustomProps": "",
        "devFQDN": "",
        "devParentRelType": "",
        "devReqNicsOnline": 0,
        "devMacSource": "",
        "devNameSource": "",
        "devFQDNSource": "",
        "devLastIPSource": "",
        "devVendorSource": "",
        "devSSIDSource": "",
        "devParentMACSource": "",
        "devParentPortSource": "",
        "devParentRelTypeSource": "",
        "devVlanSource": "",
    }
    # Merge semantics match dict.update(): overrides win, new keys append.
    return {**defaults, **overrides}
# ---------------------------------------------------------------------------
# Sync insert helper (shared by test/plugins/test_sync_insert.py and
# test/plugins/test_sync_protocol.py — mirrors sync.py's insert block)
# ---------------------------------------------------------------------------
def sync_insert_devices(
conn: sqlite3.Connection,
device_data: list,
existing_macs: set | None = None,
) -> int:
"""
Schema-aware device INSERT mirroring sync.py's Mode-3 insert block.
Parameters
----------
conn:
In-memory (or real) SQLite connection with a Devices table.
device_data:
List of device dicts as received from table_devices.json or a node log.
existing_macs:
Set of MAC addresses already present in Devices. Rows whose devMac is
in this set are skipped. Pass ``None`` (default) to insert everything.
Returns the number of rows actually inserted.
"""
if not device_data:
return 0
cursor = conn.cursor()
candidates = (
[d for d in device_data if d["devMac"] not in existing_macs]
if existing_macs is not None
else list(device_data)
)
if not candidates:
return 0
cursor.execute("PRAGMA table_info(Devices)")
db_columns = {row[1] for row in cursor.fetchall()}
insert_cols = [k for k in candidates[0].keys() if k in db_columns]
columns = ", ".join(insert_cols)
placeholders = ", ".join("?" for _ in insert_cols)
sql = f"INSERT INTO Devices ({columns}) VALUES ({placeholders})"
values = [tuple(d.get(col) for col in insert_cols) for d in candidates]
cursor.executemany(sql, values)
conn.commit()
return len(values)
# ---------------------------------------------------------------------------
# Assertion helpers
# ---------------------------------------------------------------------------

0
test/plugins/__init__.py Normal file
View File

View File

@@ -0,0 +1,130 @@
"""
Tests for the SYNC plugin's schema-aware device insert logic.
The core invariant: only columns that actually exist in the Devices table
are included in the INSERT statement. Computed/virtual fields (devStatus,
devIsSleeping, devFlapping) and unknown future columns must be silently
dropped — never cause an OperationalError.
"""
import sys
import os
import pytest
# Ensure shared helpers and server code are importable.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
from db_test_helpers import make_db, make_device_dict, sync_insert_devices # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Fresh in-memory DB with the Devices table and all views."""
    # A brand-new connection per test keeps every case fully isolated.
    return make_db()
class TestSyncInsertSchemaAware:
    """
    Core invariant under test: only columns that physically exist in the
    Devices table appear in the INSERT. Computed/view fields, rowid and
    unknown future columns must be dropped, never raise OperationalError.
    """

    def test_clean_device_inserts_successfully(self, conn):
        """Happy path: a well-formed device dict inserts without error."""
        device = make_device_dict()
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1
        cur = conn.cursor()
        cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", (device["devMac"],))
        row = cur.fetchone()
        assert row is not None

    def test_computed_devStatus_is_silently_dropped(self, conn):
        """devStatus is a computed view column — must NOT raise OperationalError."""
        device = make_device_dict()
        device["devStatus"] = "Online"  # computed in DevicesView, not in Devices table
        # Pre-fix this would raise: sqlite3.OperationalError: table Devices has no column named devStatus
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_computed_devIsSleeping_is_silently_dropped(self, conn):
        """devIsSleeping is a CTE/view column — must NOT raise OperationalError."""
        device = make_device_dict()
        device["devIsSleeping"] = 0  # the exact field that triggered the original bug report
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_computed_devFlapping_is_silently_dropped(self, conn):
        """devFlapping is also computed in the view."""
        device = make_device_dict()
        device["devFlapping"] = 0
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_rowid_is_silently_dropped(self, conn):
        """rowid must never appear in an INSERT column list."""
        device = make_device_dict()
        device["rowid"] = 42
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_all_computed_fields_at_once(self, conn):
        """All known computed/virtual columns together — none should abort the insert."""
        device = make_device_dict()
        device["rowid"] = 99
        device["devStatus"] = "Online"
        device["devIsSleeping"] = 0
        device["devFlapping"] = 0
        device["totally_unknown_future_column"] = "ignored"
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_batch_insert_multiple_devices(self, conn):
        """Multiple devices with computed fields all insert correctly."""
        devices = []
        for i in range(3):
            d = make_device_dict(mac=f"aa:bb:cc:dd:ee:{i:02x}")
            d["devGUID"] = f"guid-{i}"
            d["devStatus"] = "Online"  # computed
            d["devIsSleeping"] = 0  # computed
            devices.append(d)
        inserted = sync_insert_devices(conn, devices)
        assert inserted == len(devices)

    def test_values_aligned_with_columns_after_filtering(self, conn):
        """Values must be extracted in the same order as insert_cols (alignment bug guard)."""
        device = make_device_dict()
        device["devStatus"] = "SHOULD_BE_DROPPED"
        device["devIsSleeping"] = 999
        sync_insert_devices(conn, [device])
        cur = conn.cursor()
        cur.execute("SELECT devName, devVendor, devLastIP FROM Devices WHERE devMac = ?", (device["devMac"],))
        row = cur.fetchone()
        assert row["devName"] == "Test Device"
        assert row["devVendor"] == "Acme"
        assert row["devLastIP"] == "192.168.1.10"

    def test_unknown_column_does_not_prevent_insert(self, conn):
        """A column that was added on the node but doesn't exist on the hub is dropped."""
        device = make_device_dict()
        device["devNewFeatureOnlyOnNode"] = "some_value"
        # Must not raise — hub schema wins
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_empty_device_list_returns_zero(self, conn):
        """Edge case: empty list should not raise and should return 0."""
        inserted = sync_insert_devices(conn, [])
        assert inserted == 0

View File

@@ -0,0 +1,410 @@
"""
Tests for SYNC plugin push/pull/receive behaviour.
Three modes exercised:
Mode 1 PUSH (NODE): send_data() POSTs encrypted device data to the hub.
Mode 2 PULL (HUB): get_data() GETs a base64 JSON blob from each node.
Mode 3 RECEIVE: hub parses decoded log files and upserts devices into DB.
sync.py is intentionally NOT imported here — its module-level code has side
effects (reads live config, initialises logging). Instead, the pure logic
under test is duplicated into thin local mirrors of the production
implementation; keep them in step manually whenever sync.py's logic changes.
"""
import base64
import json
import os
import sys
from unittest.mock import MagicMock, patch
import pytest
import requests
# Make shared helpers + server packages importable from test/plugins/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
from db_test_helpers import make_db, make_device_dict, sync_insert_devices # noqa: E402
from utils.crypto_utils import encrypt_data, decrypt_data # noqa: E402
# ---------------------------------------------------------------------------
# Local mirrors of sync.py logic (no module-level side-effects on import)
# ---------------------------------------------------------------------------
# Route suffix appended to the hub/node base URL for all sync traffic.
API_ENDPOINT = "/sync"
def _send_data(api_token, file_content, encryption_key, file_path, node_name, pref, hub_url):
    """Mirror of sync.send_data() — returns True on HTTP 200, False otherwise."""
    # The file content travels encrypted; metadata stays in the clear.
    payload = {
        "data": encrypt_data(file_content, encryption_key),
        "file_path": file_path,
        "plugin": pref,
        "node_name": node_name,
    }
    auth_headers = {"Authorization": f"Bearer {api_token}"}
    try:
        resp = requests.post(hub_url + API_ENDPOINT, data=payload, headers=auth_headers, timeout=5)
    except requests.RequestException:
        # Network-level failure (refused, DNS, timeout, …) → not delivered.
        return False
    return resp.status_code == 200
def _get_data(api_token, node_url):
    """
    Mirror of sync.get_data() — returns parsed JSON dict or '' on any failure.

    Fix: ``response.json()`` can raise the stdlib ``json.JSONDecodeError``
    (a ValueError, NOT a ``requests.RequestException``) on a malformed body —
    e.g. from a mocked response or older requests versions — which previously
    escaped the except clause instead of yielding ''. It is now caught
    explicitly so any failure path returns the '' sentinel.
    """
    headers = {"Authorization": f"Bearer {api_token}"}
    try:
        response = requests.get(node_url + API_ENDPOINT, headers=headers, timeout=5)
        if response.status_code == 200:
            return response.json()
    except (requests.RequestException, json.JSONDecodeError):
        pass
    # Non-200, transport error, or undecodable body all collapse to ''.
    return ""
def _node_name_from_filename(file_name: str) -> str:
"""Mirror of the node-name extraction in sync.main()."""
parts = file_name.split(".")
return parts[2] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
def _determine_mode(hub_url: str, send_devices: bool, plugins_to_sync: list, pull_nodes: list):
"""Mirror of the is_hub / is_node detection block in sync.main()."""
is_node = len(hub_url) > 0 and (send_devices or bool(plugins_to_sync))
is_hub = len(pull_nodes) > 0
return is_hub, is_node
def _currentscan_candidates(device_data: list[dict]) -> list[dict]:
"""
Mirror of the plugin_objects.add_object() filter in sync.main().
Only online (devPresentLastScan=1) and non-internet devices are eligible
to be written to the CurrentScan / plugin result file.
"""
return [
d for d in device_data
if d.get("devPresentLastScan") == 1 and str(d.get("devMac", "")).lower() != "internet"
]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Shared fixture constants — arbitrary but stable values reused by every
# test class below; none of them reach a real network endpoint.
ENCRYPTION_KEY = "test-secret-key"
API_TOKEN = "tok_abc123"
HUB_URL = "http://hub.local:20211"
NODE_URL = "http://node.local:20211"
@pytest.fixture
def conn():
    """Fresh in-memory DB with Devices table and all views."""
    # One connection per test — no cross-test state leaks.
    return make_db()
# ===========================================================================
# Mode detection
# ===========================================================================
class TestModeDetection:
    """Hub/node role detection mirror of sync.main()'s config checks."""

    def test_is_node_when_hub_url_and_send_devices(self):
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=True, plugins_to_sync=[], pull_nodes=[])
        assert is_node is True
        assert is_hub is False

    def test_is_node_when_hub_url_and_plugins_set(self):
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=False, plugins_to_sync=["NMAP"], pull_nodes=[])
        assert is_node is True
        assert is_hub is False

    def test_is_hub_when_pull_nodes_set(self):
        is_hub, is_node = _determine_mode("", send_devices=False, plugins_to_sync=[], pull_nodes=[NODE_URL])
        assert is_hub is True
        assert is_node is False

    def test_is_both_hub_and_node(self):
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=True, plugins_to_sync=[], pull_nodes=[NODE_URL])
        assert is_hub is True
        assert is_node is True

    def test_neither_when_no_config(self):
        is_hub, is_node = _determine_mode("", send_devices=False, plugins_to_sync=[], pull_nodes=[])
        assert is_hub is False
        assert is_node is False

    def test_no_hub_url_means_not_node_even_with_send_devices(self):
        is_hub, is_node = _determine_mode("", send_devices=True, plugins_to_sync=[], pull_nodes=[])
        assert is_node is False
# ===========================================================================
# send_data (Mode 1 PUSH)
# ===========================================================================
class TestSendData:
    """Mode 1 PUSH: _send_data() POSTs encrypted payloads to the hub."""

    def _mock_post(self, status_code=200):
        # Patch requests.post with a canned response of the given status.
        resp = MagicMock()
        resp.status_code = status_code
        return patch("requests.post", return_value=resp)

    def test_returns_true_on_http_200(self):
        with self._mock_post(200):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is True

    def test_returns_false_on_non_200(self):
        for code in (400, 401, 403, 500, 503):
            with self._mock_post(code):
                result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                    "/tmp/file.log", "node1", "SYNC", HUB_URL)
            assert result is False, f"Expected False for HTTP {code}"

    def test_returns_false_on_connection_error(self):
        with patch("requests.post", side_effect=requests.ConnectionError("refused")):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is False

    def test_returns_false_on_timeout(self):
        with patch("requests.post", side_effect=requests.Timeout("timed out")):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is False

    def test_posts_to_correct_endpoint(self):
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            url_called = mock_post.call_args[0][0]
        assert url_called == HUB_URL + "/sync"

    def test_bearer_auth_header_sent(self):
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            headers = mock_post.call_args[1]["headers"]
        assert headers["Authorization"] == f"Bearer {API_TOKEN}"

    def test_payload_contains_expected_fields(self):
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            payload = mock_post.call_args[1]["data"]
        assert "data" in payload  # encrypted blob
        assert payload["file_path"] == "/tmp/file.log"
        assert payload["plugin"] == "SYNC"
        assert payload["node_name"] == "node1"

    def test_payload_data_is_encrypted_not_plaintext(self):
        """The 'data' field in the POST must be encrypted, not the raw content."""
        plaintext = '{"secret": "do_not_expose"}'
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, plaintext, ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            transmitted = mock_post.call_args[1]["data"]["data"]
        assert transmitted != plaintext
        # Verify it round-trips correctly
        assert decrypt_data(transmitted, ENCRYPTION_KEY) == plaintext
# ===========================================================================
# get_data (Mode 2 PULL)
# ===========================================================================
class TestGetData:
    """Mode 2 PULL: _get_data() fetches and parses each node's JSON blob."""

    def _mock_get(self, status_code=200, json_body=None, side_effect=None):
        # Patch requests.get with either a canned response or a raised exception.
        resp = MagicMock()
        resp.status_code = status_code
        if json_body is not None:
            resp.json.return_value = json_body
        if side_effect is not None:
            return patch("requests.get", side_effect=side_effect)
        return patch("requests.get", return_value=resp)

    def test_returns_parsed_json_on_200(self):
        body = {"node_name": "node1", "data_base64": base64.b64encode(b"hello").decode()}
        with self._mock_get(200, json_body=body):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == body

    def test_gets_from_correct_endpoint(self):
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {}
        with patch("requests.get", return_value=resp) as mock_get:
            _get_data(API_TOKEN, NODE_URL)
            url_called = mock_get.call_args[0][0]
        assert url_called == NODE_URL + "/sync"

    def test_bearer_auth_header_sent(self):
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {}
        with patch("requests.get", return_value=resp) as mock_get:
            _get_data(API_TOKEN, NODE_URL)
            headers = mock_get.call_args[1]["headers"]
        assert headers["Authorization"] == f"Bearer {API_TOKEN}"

    def test_returns_empty_string_on_json_decode_error(self):
        resp = MagicMock()
        resp.status_code = 200
        resp.json.side_effect = json.JSONDecodeError("bad json", "", 0)
        with patch("requests.get", return_value=resp):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_connection_error(self):
        with patch("requests.get", side_effect=requests.ConnectionError("refused")):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_timeout(self):
        with patch("requests.get", side_effect=requests.Timeout("timed out")):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_non_200(self):
        resp = MagicMock()
        resp.status_code = 401
        with patch("requests.get", return_value=resp):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""
# ===========================================================================
# Node name extraction from filename (Mode 3 RECEIVE)
# ===========================================================================
class TestNodeNameExtraction:
    """Mode 3 RECEIVE: node name parsed out of the sync log file name."""

    def test_simple_filename(self):
        # last_result.MyNode.log → "MyNode"
        assert _node_name_from_filename("last_result.MyNode.log") == "MyNode"

    def test_decoded_filename(self):
        # last_result.decoded.MyNode.1.log → "MyNode"
        assert _node_name_from_filename("last_result.decoded.MyNode.1.log") == "MyNode"

    def test_encoded_filename(self):
        # last_result.encoded.MyNode.1.log → "MyNode"
        assert _node_name_from_filename("last_result.encoded.MyNode.1.log") == "MyNode"

    def test_node_name_with_underscores(self):
        assert _node_name_from_filename("last_result.Wladek_Site.log") == "Wladek_Site"

    def test_decoded_node_name_with_underscores(self):
        assert _node_name_from_filename("last_result.decoded.Wladek_Site.1.log") == "Wladek_Site"
# ===========================================================================
# CurrentScan candidates filter (Mode 3 RECEIVE)
# ===========================================================================
class TestCurrentScanCandidates:
    """Mode 3 RECEIVE: filter deciding which devices reach CurrentScan."""

    def test_online_device_is_included(self):
        d = make_device_dict(devPresentLastScan=1)
        assert len(_currentscan_candidates([d])) == 1

    def test_offline_device_is_excluded(self):
        d = make_device_dict(devPresentLastScan=0)
        assert len(_currentscan_candidates([d])) == 0

    def test_internet_mac_is_excluded(self):
        d = make_device_dict(mac="internet", devPresentLastScan=1)
        assert len(_currentscan_candidates([d])) == 0

    def test_internet_mac_case_insensitive(self):
        for mac in ("INTERNET", "Internet", "iNtErNeT"):
            d = make_device_dict(mac=mac, devPresentLastScan=1)
            assert len(_currentscan_candidates([d])) == 0, f"mac={mac!r} should be excluded"

    def test_mixed_batch(self):
        devices = [
            make_device_dict(mac="aa:bb:cc:dd:ee:01", devPresentLastScan=1),  # included
            make_device_dict(mac="aa:bb:cc:dd:ee:02", devPresentLastScan=0),  # offline
            make_device_dict(mac="internet", devPresentLastScan=1),  # root node
            make_device_dict(mac="aa:bb:cc:dd:ee:03", devPresentLastScan=1),  # included
        ]
        result = _currentscan_candidates(devices)
        macs = [d["devMac"] for d in result]
        assert "aa:bb:cc:dd:ee:01" in macs
        assert "aa:bb:cc:dd:ee:03" in macs
        assert "aa:bb:cc:dd:ee:02" not in macs
        assert "internet" not in macs
# ===========================================================================
# DB insert filtering new vs existing devices (Mode 3 RECEIVE)
# ===========================================================================
class TestReceiveInsert:
    """Mode 3 RECEIVE: new-vs-existing device filtering on hub insert."""

    def test_new_device_is_inserted(self, conn):
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        inserted = sync_insert_devices(conn, [device], existing_macs=set())
        assert inserted == 1
        cur = conn.cursor()
        cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",))
        assert cur.fetchone() is not None

    def test_existing_device_is_not_reinserted(self, conn):
        # Pre-populate Devices
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO Devices (devMac, devName) VALUES (?, ?)",
            ("aa:bb:cc:dd:ee:01", "Existing"),
        )
        conn.commit()
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        inserted = sync_insert_devices(conn, [device], existing_macs={"aa:bb:cc:dd:ee:01"})
        assert inserted == 0

    def test_only_new_devices_inserted_in_mixed_batch(self, conn):
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO Devices (devMac, devName) VALUES (?, ?)",
            ("aa:bb:cc:dd:ee:existing", "Existing"),
        )
        conn.commit()
        devices = [
            make_device_dict(mac="aa:bb:cc:dd:ee:existing"),
            make_device_dict(mac="aa:bb:cc:dd:ee:new1"),
            make_device_dict(mac="aa:bb:cc:dd:ee:new2"),
        ]
        inserted = sync_insert_devices(
            conn, devices, existing_macs={"aa:bb:cc:dd:ee:existing"}
        )
        assert inserted == 2

    def test_computed_fields_in_payload_do_not_abort_insert(self, conn):
        """Regression: devIsSleeping / devStatus / devFlapping must be silently dropped."""
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        device["devIsSleeping"] = 0
        device["devStatus"] = "Online"
        device["devFlapping"] = 0
        device["rowid"] = 99
        # Must not raise OperationalError
        inserted = sync_insert_devices(conn, [device], existing_macs=set())
        assert inserted == 1

    def test_empty_device_list_returns_zero(self, conn):
        assert sync_insert_devices(conn, [], existing_macs=set()) == 0