mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-05-25 00:55:53 -04:00
Merge pull request #1653 from netalertx/next_release
Refactor sync data processing to handle PUSH and PULL modes with impr…
This commit is contained in:
1
.github/skills/code-standards/SKILL.md
vendored
1
.github/skills/code-standards/SKILL.md
vendored
@@ -23,6 +23,7 @@ description: NetAlertX coding standards and conventions. Use this when writing c
|
||||
- use environment variables for runtime paths, never hardcode paths or use relative paths
|
||||
- follow existing code style and structure, and ensure backward compatibility with existing installations when submitting PRs
|
||||
- all code needs to be scalable to handle large networks with thousands of devices (10k+) without performance degradation
|
||||
- no inline imports, all imports must be at the top of the file
|
||||
|
||||
|
||||
## File Length
|
||||
|
||||
@@ -175,23 +175,35 @@ def main():
|
||||
if file_name != 'last_result.log':
|
||||
mylog('verbose', [f'[{pluginName}] Processing: "{file_name}"'])
|
||||
|
||||
# make sure the file has the correct name (e.g last_result.encoded.Node_1.1.log) to skip any otehr plugin files
|
||||
if len(file_name.split('.')) > 2:
|
||||
# Extract node name from either last_result.decoded.Node_1.1.log or last_result.Node_1.log
|
||||
parts = file_name.split('.')
|
||||
# If decoded/encoded file, node name is at index 2; otherwise at index 1
|
||||
syncHubNodeName = parts[2] if 'decoded' in file_name or 'encoded' in file_name else parts[1]
|
||||
# Only process sync artifacts:
|
||||
# PUSH mode (decoded): last_result.PLUGIN.decoded.NodeName.N.log (6 parts)
|
||||
# PULL mode: last_result.NodeName.log (3 parts, valid JSON)
|
||||
# Local plugin result files (last_result.ARPSCAN.log) are also 3 parts but
|
||||
# are pipe-delimited — catch and skip them via the JSONDecodeError guard below.
|
||||
parts = file_name.split('.')
|
||||
if len(parts) > 2:
|
||||
# Extract node name:
|
||||
# decoded/encoded: last_result.PLUGIN.decoded.NodeName.N.log → parts[3]
|
||||
# pull mode: last_result.NodeName.log → parts[1]
|
||||
if 'decoded' in file_name or 'encoded' in file_name:
|
||||
syncHubNodeName = parts[3]
|
||||
else:
|
||||
syncHubNodeName = parts[1]
|
||||
|
||||
file_path = f"{LOG_PATH}/{file_name}"
|
||||
|
||||
with open(file_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
for device in data['data']:
|
||||
device['devMac'] = str(device['devMac']).lower()
|
||||
if device['devMac'].lower() not in unique_mac_addresses:
|
||||
device['devSyncHubNode'] = syncHubNodeName
|
||||
unique_mac_addresses.add(device['devMac'].lower())
|
||||
device_data.append(device)
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
mylog('verbose', [f'[{pluginName}] Skipping "{file_name}" - not a valid sync JSON payload'])
|
||||
continue
|
||||
|
||||
# Rename the file to "processed_" + current name
|
||||
new_file_name = f"processed_{file_name}"
|
||||
|
||||
@@ -68,9 +68,14 @@ def _get_data(api_token, node_url):
|
||||
|
||||
|
||||
def _node_name_from_filename(file_name: str) -> str:
|
||||
"""Mirror of the node-name extraction in sync.main()."""
|
||||
"""Mirror of the node-name extraction in sync.main() (Mode 3).
|
||||
|
||||
Real file formats produced by the system:
|
||||
PUSH (post-decode): last_result.PLUGIN.decoded.NodeName.N.log → parts[3]
|
||||
PULL: last_result.NodeName.log → parts[1]
|
||||
"""
|
||||
parts = file_name.split(".")
|
||||
return parts[2] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
|
||||
return parts[3] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
|
||||
|
||||
|
||||
def _should_delete_after_process(filename: str) -> bool:
|
||||
@@ -306,23 +311,33 @@ class TestGetData:
|
||||
|
||||
class TestNodeNameExtraction:
|
||||
|
||||
def test_simple_filename(self):
|
||||
# last_result.MyNode.log → "MyNode"
|
||||
def test_pull_mode_filename(self):
|
||||
# PULL mode: last_result.MyNode.log → "MyNode"
|
||||
assert _node_name_from_filename("last_result.MyNode.log") == "MyNode"
|
||||
|
||||
def test_decoded_filename(self):
|
||||
# last_result.decoded.MyNode.1.log → "MyNode"
|
||||
assert _node_name_from_filename("last_result.decoded.MyNode.1.log") == "MyNode"
|
||||
def test_push_decoded_filename(self):
|
||||
# PUSH mode (post-decode): last_result.ARPSCAN.decoded.MyNode.1.log → "MyNode"
|
||||
assert _node_name_from_filename("last_result.ARPSCAN.decoded.MyNode.1.log") == "MyNode"
|
||||
|
||||
def test_encoded_filename(self):
|
||||
# last_result.encoded.MyNode.1.log → "MyNode"
|
||||
assert _node_name_from_filename("last_result.encoded.MyNode.1.log") == "MyNode"
|
||||
def test_push_encoded_filename(self):
|
||||
# PUSH mode (pre-decode): last_result.ARPSCAN.encoded.MyNode.1.log → "MyNode"
|
||||
assert _node_name_from_filename("last_result.ARPSCAN.encoded.MyNode.1.log") == "MyNode"
|
||||
|
||||
def test_node_name_with_underscores(self):
|
||||
def test_pull_node_name_with_underscores(self):
|
||||
assert _node_name_from_filename("last_result.Wladek_Site.log") == "Wladek_Site"
|
||||
|
||||
def test_decoded_node_name_with_underscores(self):
|
||||
assert _node_name_from_filename("last_result.decoded.Wladek_Site.1.log") == "Wladek_Site"
|
||||
def test_push_decoded_node_name_with_underscores(self):
|
||||
assert _node_name_from_filename("last_result.ARPSCAN.decoded.Wladek_Site.1.log") == "Wladek_Site"
|
||||
|
||||
def test_push_decoded_node_name_with_counter_gt_1(self):
|
||||
# Counter increments when multiple pushes arrive before SYNC runs
|
||||
assert _node_name_from_filename("last_result.ARPSCAN.decoded.Node_Vlan01.3.log") == "Node_Vlan01"
|
||||
|
||||
def test_push_decoded_different_plugins(self):
|
||||
for plugin in ("NMAP", "PIHOLE", "DHCPLEASES"):
|
||||
fname = f"last_result.{plugin}.decoded.HubNode.1.log"
|
||||
assert _node_name_from_filename(fname) == "HubNode", \
|
||||
f"Expected 'HubNode' from {fname}"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
@@ -465,3 +480,57 @@ class TestPluginFileRetention:
|
||||
|
||||
def test_empty_device_list_returns_zero(self, conn):
|
||||
assert sync_insert_devices(conn, [], existing_macs=set()) == 0
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Mode 3 JSON-skip behaviour
|
||||
# Regression: local plugin result files (pipe-delimited) must not crash Mode 3.
|
||||
# ===========================================================================
|
||||
|
||||
def _parse_sync_payload(file_path: str) -> list:
|
||||
"""Mirror of the json.load + data['data'] block in sync.main() Mode 3.
|
||||
|
||||
Returns the list of device dicts on success, or raises nothing on invalid
|
||||
input — callers should catch JSONDecodeError / KeyError and skip the file.
|
||||
"""
|
||||
with open(file_path, "r") as f:
|
||||
data = json.load(f)
|
||||
return data["data"]
|
||||
|
||||
|
||||
class TestMode3JsonSkip:
|
||||
"""Regression for the crash when Mode 3 encountered pipe-delimited plugin files.
|
||||
|
||||
Before the fix, sync.py called json.load() on every last_result.*.log file
|
||||
returned by decode_and_rename_files(), including local plugin result files
|
||||
(e.g. last_result.DIGSCAN.log) which are pipe-delimited and not JSON. The
|
||||
fix wraps the load in try/except(JSONDecodeError, KeyError) and continues.
|
||||
"""
|
||||
|
||||
def test_valid_sync_payload_is_parsed(self, tmp_path):
|
||||
payload = {"data": [{"devMac": "aa:bb:cc:dd:ee:01", "devName": "TestDevice"}]}
|
||||
f = tmp_path / "last_result.ARPSCAN.decoded.Node1.1.log"
|
||||
f.write_text(json.dumps(payload))
|
||||
result = _parse_sync_payload(str(f))
|
||||
assert len(result) == 1
|
||||
assert result[0]["devMac"] == "aa:bb:cc:dd:ee:01"
|
||||
|
||||
def test_pipe_delimited_file_raises_json_error(self, tmp_path):
|
||||
"""Pipe-delimited plugin file must raise JSONDecodeError so callers can skip it."""
|
||||
f = tmp_path / "last_result.DIGSCAN.log"
|
||||
f.write_text("aa:bb:cc:dd:ee:01|192.168.1.1|2026-01-01 00:00:00|hostname||subnet||DIGSCAN|||||\n")
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
_parse_sync_payload(str(f))
|
||||
|
||||
def test_json_without_data_key_raises_key_error(self, tmp_path):
|
||||
"""JSON that lacks the 'data' key must raise KeyError so callers can skip it."""
|
||||
f = tmp_path / "last_result.UNKNOWN.log"
|
||||
f.write_text(json.dumps({"result": []}))
|
||||
with pytest.raises(KeyError):
|
||||
_parse_sync_payload(str(f))
|
||||
|
||||
def test_empty_file_raises_json_error(self, tmp_path):
|
||||
f = tmp_path / "last_result.EMPTY.log"
|
||||
f.write_text("")
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
_parse_sync_payload(str(f))
|
||||
|
||||
Reference in New Issue
Block a user