Merge pull request #1653 from netalertx/next_release

Refactor sync data processing to handle PUSH and PULL modes with impr…
This commit is contained in:
Jokob @NetAlertX
2026-05-24 12:22:32 +10:00
committed by GitHub
3 changed files with 103 additions and 21 deletions

View File

@@ -23,6 +23,7 @@ description: NetAlertX coding standards and conventions. Use this when writing c
- use environment variables for runtime paths, never hardcode paths or use relative paths
- follow existing code style and structure, and ensure backward compatibility with existing installations when submitting PRs
- all code needs to be scalable to handle large networks with thousands of devices (10k+) without performance degradation
- no inline imports, all imports must be at the top of the file
## File Length

View File

@@ -175,23 +175,35 @@ def main():
if file_name != 'last_result.log':
mylog('verbose', [f'[{pluginName}] Processing: "{file_name}"'])
# make sure the file has the correct name (e.g last_result.encoded.Node_1.1.log) to skip any otehr plugin files
if len(file_name.split('.')) > 2:
# Extract node name from either last_result.decoded.Node_1.1.log or last_result.Node_1.log
parts = file_name.split('.')
# If decoded/encoded file, node name is at index 2; otherwise at index 1
syncHubNodeName = parts[2] if 'decoded' in file_name or 'encoded' in file_name else parts[1]
# Only process sync artifacts:
# PUSH mode (decoded): last_result.PLUGIN.decoded.NodeName.N.log (6 parts)
# PULL mode: last_result.NodeName.log (3 parts, valid JSON)
# Local plugin result files (last_result.ARPSCAN.log) are also 3 parts but
# are pipe-delimited — catch and skip them via the JSONDecodeError guard below.
parts = file_name.split('.')
if len(parts) > 2:
# Extract node name:
# decoded/encoded: last_result.PLUGIN.decoded.NodeName.N.log → parts[3]
# pull mode: last_result.NodeName.log → parts[1]
if 'decoded' in file_name or 'encoded' in file_name:
syncHubNodeName = parts[3]
else:
syncHubNodeName = parts[1]
file_path = f"{LOG_PATH}/{file_name}"
with open(file_path, 'r') as f:
data = json.load(f)
try:
with open(file_path, 'r') as f:
data = json.load(f)
for device in data['data']:
device['devMac'] = str(device['devMac']).lower()
if device['devMac'].lower() not in unique_mac_addresses:
device['devSyncHubNode'] = syncHubNodeName
unique_mac_addresses.add(device['devMac'].lower())
device_data.append(device)
except (json.JSONDecodeError, KeyError):
mylog('verbose', [f'[{pluginName}] Skipping "{file_name}" - not a valid sync JSON payload'])
continue
# Rename the file to "processed_" + current name
new_file_name = f"processed_{file_name}"

View File

@@ -68,9 +68,14 @@ def _get_data(api_token, node_url):
def _node_name_from_filename(file_name: str) -> str:
"""Mirror of the node-name extraction in sync.main()."""
"""Mirror of the node-name extraction in sync.main() (Mode 3).
Real file formats produced by the system:
PUSH (post-decode): last_result.PLUGIN.decoded.NodeName.N.log → parts[3]
PULL: last_result.NodeName.log → parts[1]
"""
parts = file_name.split(".")
return parts[2] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
return parts[3] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
def _should_delete_after_process(filename: str) -> bool:
@@ -306,23 +311,33 @@ class TestGetData:
class TestNodeNameExtraction:
def test_simple_filename(self):
# last_result.MyNode.log → "MyNode"
def test_pull_mode_filename(self):
# PULL mode: last_result.MyNode.log → "MyNode"
assert _node_name_from_filename("last_result.MyNode.log") == "MyNode"
def test_decoded_filename(self):
# last_result.decoded.MyNode.1.log → "MyNode"
assert _node_name_from_filename("last_result.decoded.MyNode.1.log") == "MyNode"
def test_push_decoded_filename(self):
# PUSH mode (post-decode): last_result.ARPSCAN.decoded.MyNode.1.log → "MyNode"
assert _node_name_from_filename("last_result.ARPSCAN.decoded.MyNode.1.log") == "MyNode"
def test_encoded_filename(self):
# last_result.encoded.MyNode.1.log → "MyNode"
assert _node_name_from_filename("last_result.encoded.MyNode.1.log") == "MyNode"
def test_push_encoded_filename(self):
# PUSH mode (pre-decode): last_result.ARPSCAN.encoded.MyNode.1.log → "MyNode"
assert _node_name_from_filename("last_result.ARPSCAN.encoded.MyNode.1.log") == "MyNode"
def test_node_name_with_underscores(self):
def test_pull_node_name_with_underscores(self):
assert _node_name_from_filename("last_result.Wladek_Site.log") == "Wladek_Site"
def test_decoded_node_name_with_underscores(self):
assert _node_name_from_filename("last_result.decoded.Wladek_Site.1.log") == "Wladek_Site"
def test_push_decoded_node_name_with_underscores(self):
assert _node_name_from_filename("last_result.ARPSCAN.decoded.Wladek_Site.1.log") == "Wladek_Site"
def test_push_decoded_node_name_with_counter_gt_1(self):
# Counter increments when multiple pushes arrive before SYNC runs
assert _node_name_from_filename("last_result.ARPSCAN.decoded.Node_Vlan01.3.log") == "Node_Vlan01"
def test_push_decoded_different_plugins(self):
for plugin in ("NMAP", "PIHOLE", "DHCPLEASES"):
fname = f"last_result.{plugin}.decoded.HubNode.1.log"
assert _node_name_from_filename(fname) == "HubNode", \
f"Expected 'HubNode' from {fname}"
# ===========================================================================
@@ -465,3 +480,57 @@ class TestPluginFileRetention:
def test_empty_device_list_returns_zero(self, conn):
assert sync_insert_devices(conn, [], existing_macs=set()) == 0
# ===========================================================================
# Mode 3 JSON-skip behaviour
# Regression: local plugin result files (pipe-delimited) must not crash Mode 3.
# ===========================================================================
def _parse_sync_payload(file_path: str) -> list:
"""Mirror of the json.load + data['data'] block in sync.main() Mode 3.
Returns the list of device dicts on success, or raises nothing on invalid
input — callers should catch JSONDecodeError / KeyError and skip the file.
"""
with open(file_path, "r") as f:
data = json.load(f)
return data["data"]
class TestMode3JsonSkip:
"""Regression for the crash when Mode 3 encountered pipe-delimited plugin files.
Before the fix, sync.py called json.load() on every last_result.*.log file
returned by decode_and_rename_files(), including local plugin result files
(e.g. last_result.DIGSCAN.log) which are pipe-delimited and not JSON. The
fix wraps the load in try/except(JSONDecodeError, KeyError) and continues.
"""
def test_valid_sync_payload_is_parsed(self, tmp_path):
payload = {"data": [{"devMac": "aa:bb:cc:dd:ee:01", "devName": "TestDevice"}]}
f = tmp_path / "last_result.ARPSCAN.decoded.Node1.1.log"
f.write_text(json.dumps(payload))
result = _parse_sync_payload(str(f))
assert len(result) == 1
assert result[0]["devMac"] == "aa:bb:cc:dd:ee:01"
def test_pipe_delimited_file_raises_json_error(self, tmp_path):
"""Pipe-delimited plugin file must raise JSONDecodeError so callers can skip it."""
f = tmp_path / "last_result.DIGSCAN.log"
f.write_text("aa:bb:cc:dd:ee:01|192.168.1.1|2026-01-01 00:00:00|hostname||subnet||DIGSCAN|||||\n")
with pytest.raises(json.JSONDecodeError):
_parse_sync_payload(str(f))
def test_json_without_data_key_raises_key_error(self, tmp_path):
"""JSON that lacks the 'data' key must raise KeyError so callers can skip it."""
f = tmp_path / "last_result.UNKNOWN.log"
f.write_text(json.dumps({"result": []}))
with pytest.raises(KeyError):
_parse_sync_payload(str(f))
def test_empty_file_raises_json_error(self, tmp_path):
f = tmp_path / "last_result.EMPTY.log"
f.write_text("")
with pytest.raises(json.JSONDecodeError):
_parse_sync_payload(str(f))