mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-06-21 06:39:52 -04:00
- Added `queryByConditions` method to `DeviceInstance` for flexible device querying based on dynamic conditions. - Introduced `interpolate_tokens` function to replace placeholders in action values with actual device data. - Updated `UpdateFieldAction` to handle cross-device updates and archive conflicting MAC addresses. - Implemented cascade prevention in `WorkflowManager` to avoid processing events for devices modified in the same batch. - Added unit tests for new functionalities, including token interpolation, condition querying, and action execution. - Created constants for device column validation to enhance security and maintainability. - Established a structured research skill specification to guide development practices.
404 lines
15 KiB
Python
404 lines
15 KiB
Python
"""
|
|
Unit tests for Workflow Engine v2 — cross-device targeting.
|
|
|
|
Covers:
|
|
- interpolate_tokens()
|
|
- WorkflowManager.VALID_DEVICE_COLUMNS token validation
|
|
- WorkflowManager._validate_workflow_tokens()
|
|
- WorkflowManager.load_workflows() rejects invalid-token workflows
|
|
- DeviceInstance.queryByConditions()
|
|
- UpdateFieldAction boolean column casting
|
|
- UpdateFieldAction _archive_conflicting_mac guard
|
|
- WorkflowManager._mutated_guids cascade prevention
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
from db_test_helpers import make_db, make_device_dict, insert_device_from_dict
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared test helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_app_event(guid="evt-guid-1", obj_guid="dev-guid-1", obj_type="Devices",
|
|
event_type="update", index=1):
|
|
"""Return a dict mimicking an AppEvents sqlite3.Row."""
|
|
return {
|
|
"guid": guid,
|
|
"objectGuid": obj_guid,
|
|
"objectType": obj_type,
|
|
"appEventType": event_type,
|
|
"appEventProcessed": 0,
|
|
"index": index,
|
|
}
|
|
|
|
|
|
def make_stub_manager():
|
|
"""Return a WorkflowManager with a mock DB and no workflows loaded."""
|
|
from workflows.manager import WorkflowManager
|
|
db = MagicMock()
|
|
db.sql = MagicMock()
|
|
db.sql.execute.return_value.fetchall.return_value = []
|
|
db.commitDB = MagicMock()
|
|
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
|
|
mgr = WorkflowManager(db)
|
|
return mgr
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# interpolate_tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInterpolateTokens(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
from workflows.actions import interpolate_tokens
|
|
self.interpolate = interpolate_tokens
|
|
|
|
def test_replaces_known_token(self):
|
|
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
|
|
result = self.interpolate("{{trigger.devLastIP}}", device)
|
|
self.assertEqual(result, "10.0.0.5")
|
|
|
|
def test_replaces_multiple_tokens(self):
|
|
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
|
|
result = self.interpolate("ip={{trigger.devLastIP}} mac={{trigger.devMac}}", device)
|
|
self.assertEqual(result, "ip=10.0.0.5 mac=aa:bb:cc:dd:ee:ff")
|
|
|
|
def test_leaves_unknown_token_unchanged(self):
|
|
device = {"devLastIP": "10.0.0.5"}
|
|
result = self.interpolate("{{trigger.doesNotExist}}", device)
|
|
self.assertEqual(result, "{{trigger.doesNotExist}}")
|
|
|
|
def test_non_string_value_returned_as_is(self):
|
|
device = {}
|
|
self.assertEqual(self.interpolate(42, device), 42)
|
|
self.assertIsNone(self.interpolate(None, device))
|
|
|
|
def test_empty_device_dict_leaves_token_unchanged(self):
|
|
result = self.interpolate("{{trigger.devMac}}", {})
|
|
self.assertEqual(result, "{{trigger.devMac}}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateWorkflowTokens(unittest.TestCase):
|
|
|
|
def test_valid_token_passes(self):
|
|
mgr = make_stub_manager()
|
|
wf = {"name": "test", "actions": [{"value": "{{trigger.devLastIP}}"}]}
|
|
self.assertTrue(mgr._validate_workflow_tokens(wf))
|
|
|
|
def test_invalid_token_fails(self):
|
|
mgr = make_stub_manager()
|
|
wf = {"name": "test", "actions": [{"value": "{{trigger.ip_address}}"}]}
|
|
self.assertFalse(mgr._validate_workflow_tokens(wf))
|
|
|
|
def test_nested_invalid_token_fails(self):
|
|
mgr = make_stub_manager()
|
|
wf = {
|
|
"name": "test",
|
|
"actions": [{
|
|
"target": {
|
|
"conditions": [{"value": "{{trigger.bad_field}}"}]
|
|
}
|
|
}]
|
|
}
|
|
self.assertFalse(mgr._validate_workflow_tokens(wf))
|
|
|
|
def test_no_tokens_passes(self):
|
|
mgr = make_stub_manager()
|
|
wf = {"name": "test", "conditions": [], "actions": [{"value": "static"}]}
|
|
self.assertTrue(mgr._validate_workflow_tokens(wf))
|
|
|
|
|
|
class TestLoadWorkflowsRejectsInvalidTokens(unittest.TestCase):
|
|
|
|
def _make_manager_loading(self, raw_workflows):
|
|
"""Build a WorkflowManager whose load_workflows() reads from a temp file."""
|
|
import workflows.manager as wf_mod
|
|
from workflows.manager import WorkflowManager
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
wf_path = os.path.join(tmpdir, "workflows.json")
|
|
with open(wf_path, "w") as f:
|
|
json.dump(raw_workflows, f)
|
|
|
|
orig = wf_mod.fullConfFolder
|
|
wf_mod.fullConfFolder = tmpdir
|
|
try:
|
|
db = MagicMock()
|
|
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
|
|
mgr = WorkflowManager(db)
|
|
mgr.workflows = mgr.load_workflows()
|
|
finally:
|
|
wf_mod.fullConfFolder = orig
|
|
return mgr
|
|
|
|
def test_valid_workflow_loaded(self):
|
|
wf = {
|
|
"name": "Valid WF", "enabled": "Yes",
|
|
"trigger": {"object_type": "Devices", "event_type": "update"},
|
|
"conditions": [],
|
|
"actions": [{"type": "update_field", "field": "devIsNew",
|
|
"value": "{{trigger.devLastIP}}"}]
|
|
}
|
|
mgr = self._make_manager_loading([wf])
|
|
self.assertEqual(len(mgr.workflows), 1)
|
|
|
|
def test_invalid_token_workflow_rejected(self):
|
|
wf = {
|
|
"name": "Bad WF", "enabled": "Yes",
|
|
"trigger": {"object_type": "Devices", "event_type": "update"},
|
|
"conditions": [],
|
|
"actions": [{"type": "update_field", "field": "devIsNew",
|
|
"value": "{{trigger.nonexistent_field}}"}]
|
|
}
|
|
mgr = self._make_manager_loading([wf])
|
|
self.assertEqual(len(mgr.workflows), 0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DeviceInstance.queryByConditions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQueryByConditions(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.conn = make_db()
|
|
dev_a = make_device_dict("aa:bb:cc:dd:ee:01", devLastIP="192.168.1.10",
|
|
devGUID="guid-a", devIsArchived=0)
|
|
dev_b = make_device_dict("aa:bb:cc:dd:ee:02", devLastIP="192.168.1.10",
|
|
devGUID="guid-b", devIsArchived=0)
|
|
dev_c = make_device_dict("aa:bb:cc:dd:ee:03", devLastIP="192.168.1.20",
|
|
devGUID="guid-c", devIsArchived=0)
|
|
for d in [dev_a, dev_b, dev_c]:
|
|
insert_device_from_dict(self.conn, d)
|
|
|
|
def _instance(self):
|
|
from models.device_instance import DeviceInstance
|
|
inst = DeviceInstance()
|
|
# Patch _fetchall to use our in-memory connection
|
|
def _fetchall(q, p=()):
|
|
rows = self.conn.execute(q, p).fetchall()
|
|
return [dict(r) for r in rows]
|
|
inst._fetchall = _fetchall
|
|
return inst
|
|
|
|
def test_equals_returns_matching_devices(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([
|
|
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"}
|
|
])
|
|
macs = {r["devMac"] for r in results}
|
|
self.assertIn("aa:bb:cc:dd:ee:01", macs)
|
|
self.assertIn("aa:bb:cc:dd:ee:02", macs)
|
|
self.assertNotIn("aa:bb:cc:dd:ee:03", macs)
|
|
|
|
def test_multiple_conditions_and_logic(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([
|
|
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"},
|
|
{"field": "devMac", "operator": "equals", "value": "aa:bb:cc:dd:ee:01"},
|
|
])
|
|
self.assertEqual(len(results), 1)
|
|
self.assertEqual(results[0]["devMac"], "aa:bb:cc:dd:ee:01")
|
|
|
|
def test_contains_operator(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([
|
|
{"field": "devLastIP", "operator": "contains", "value": "192.168.1"}
|
|
])
|
|
self.assertEqual(len(results), 3)
|
|
|
|
def test_empty_conditions_returns_empty(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([])
|
|
self.assertEqual(results, [])
|
|
|
|
def test_unknown_field_skipped_returns_empty(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([
|
|
{"field": "nonexistent_column", "operator": "equals", "value": "x"}
|
|
])
|
|
self.assertEqual(results, [])
|
|
|
|
def test_unknown_operator_skipped_returns_empty(self):
|
|
inst = self._instance()
|
|
results = inst.queryByConditions([
|
|
{"field": "devLastIP", "operator": "regex", "value": ".*"}
|
|
])
|
|
self.assertEqual(results, [])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UpdateFieldAction — boolean cast
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateFieldActionBooleanCast(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.conn = make_db()
|
|
dev = make_device_dict("aa:bb:cc:dd:ee:ff", devGUID="guid-1", devIsArchived=0)
|
|
insert_device_from_dict(self.conn, dev)
|
|
|
|
def _make_action(self, field, value, target_device):
|
|
from workflows.actions import UpdateFieldAction
|
|
trigger = MagicMock()
|
|
trigger.object = None
|
|
trigger.object_type = "Devices"
|
|
db = MagicMock()
|
|
|
|
action = UpdateFieldAction(db, field, value, trigger, target_device)
|
|
|
|
# Patch DeviceInstance.updateField to capture what value is written
|
|
self.written_value = None
|
|
def fake_update(guid, f, v):
|
|
self.written_value = v
|
|
with patch("workflows.actions.DeviceInstance") as MockDI:
|
|
MockDI.return_value.updateField.side_effect = fake_update
|
|
action.execute()
|
|
|
|
return self.written_value
|
|
|
|
def test_string_one_cast_to_int_for_boolean_column(self):
|
|
target = {"devGUID": "guid-1", "devIsArchived": 0}
|
|
written = self._make_action("devIsArchived", "1", target)
|
|
self.assertEqual(written, 1)
|
|
self.assertIsInstance(written, int)
|
|
|
|
def test_string_zero_cast_to_int_for_boolean_column(self):
|
|
target = {"devGUID": "guid-1", "devIsArchived": 1}
|
|
written = self._make_action("devIsArchived", "0", target)
|
|
self.assertEqual(written, 0)
|
|
self.assertIsInstance(written, int)
|
|
|
|
def test_non_boolean_column_not_cast(self):
|
|
target = {"devGUID": "guid-1", "devName": "OldName"}
|
|
written = self._make_action("devName", "NewName", target)
|
|
self.assertEqual(written, "NewName")
|
|
self.assertIsInstance(written, str)
|
|
|
|
def test_invalid_boolean_value_skips_update(self):
|
|
target = {"devGUID": "guid-1", "devIsArchived": 0}
|
|
written = self._make_action("devIsArchived", "not_an_int", target)
|
|
self.assertIsNone(written)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UpdateFieldAction — devMac conflict archive guard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateFieldActionMacGuard(unittest.TestCase):
|
|
|
|
def test_conflicting_mac_device_archived(self):
|
|
from workflows.actions import UpdateFieldAction
|
|
trigger = MagicMock()
|
|
trigger.object = None
|
|
db = MagicMock()
|
|
|
|
conflicting = {"devGUID": "guid-conflict", "devMac": "aa:bb:cc:dd:ee:ff"}
|
|
current_guid = "guid-current"
|
|
target_device = {"devGUID": current_guid, "devMac": "11:22:33:44:55:66"}
|
|
|
|
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
|
|
|
|
archived_guid = None
|
|
def fake_update(guid, field, value):
|
|
nonlocal archived_guid
|
|
if field == "devIsArchived":
|
|
archived_guid = guid
|
|
|
|
with patch("workflows.actions.DeviceInstance") as MockDI:
|
|
MockDI.return_value.getByMac.return_value = conflicting
|
|
MockDI.return_value.updateField.side_effect = fake_update
|
|
action.execute()
|
|
|
|
self.assertEqual(archived_guid, "guid-conflict")
|
|
|
|
def test_no_conflicting_mac_no_archive(self):
|
|
from workflows.actions import UpdateFieldAction
|
|
trigger = MagicMock()
|
|
trigger.object = None
|
|
db = MagicMock()
|
|
|
|
target_device = {"devGUID": "guid-current", "devMac": "11:22:33:44:55:66"}
|
|
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
|
|
|
|
archived_guid = None
|
|
def fake_update(guid, field, value):
|
|
nonlocal archived_guid
|
|
if field == "devIsArchived":
|
|
archived_guid = guid
|
|
|
|
with patch("workflows.actions.DeviceInstance") as MockDI:
|
|
MockDI.return_value.getByMac.return_value = None
|
|
MockDI.return_value.updateField.side_effect = fake_update
|
|
action.execute()
|
|
|
|
self.assertIsNone(archived_guid)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cascade prevention — _mutated_guids
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCascadePrevention(unittest.TestCase):
|
|
|
|
def test_mutated_guid_blocks_event(self):
|
|
mgr = make_stub_manager()
|
|
mgr._mutated_guids.add("dev-guid-42")
|
|
|
|
event = _make_app_event(guid="evt-1", obj_guid="dev-guid-42")
|
|
# Make event dict-accessible
|
|
event = MagicMock()
|
|
event.__getitem__ = lambda s, k: {"guid": "evt-1", "objectGuid": "dev-guid-42",
|
|
"index": 1}[k]
|
|
|
|
# process_event should skip without calling execute_workflow
|
|
with patch.object(mgr, "execute_workflow") as mock_exec:
|
|
mgr.process_event(event)
|
|
mock_exec.assert_not_called()
|
|
|
|
def test_get_new_app_events_clears_mutated_guids(self):
|
|
mgr = make_stub_manager()
|
|
mgr._mutated_guids.add("some-guid")
|
|
|
|
mgr.db.sql.execute.return_value.fetchall.return_value = []
|
|
mgr.get_new_app_events()
|
|
|
|
self.assertEqual(len(mgr._mutated_guids), 0)
|
|
|
|
def test_execute_actions_adds_to_mutated_guids(self):
|
|
mgr = make_stub_manager()
|
|
|
|
target_device = {"devGUID": "guid-mutated", "devIsArchived": 0}
|
|
|
|
actions = [{"type": "update_field", "field": "devIsArchived", "value": "1"}]
|
|
|
|
trigger = MagicMock()
|
|
trigger.object = None
|
|
|
|
with patch("workflows.manager.DeviceInstance"), \
|
|
patch("workflows.actions.DeviceInstance") as MockDI:
|
|
MockDI.return_value.updateField = MagicMock()
|
|
with patch.object(mgr, "_resolve_target_devices", return_value=[target_device]):
|
|
mgr.execute_actions(actions, trigger)
|
|
|
|
self.assertIn("guid-mutated", mgr._mutated_guids)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|