Files
NetAlertX/test/backend/test_workflows.py
Jokob @NetAlertX 26b337d6a7 feat(workflows): Enhance device management with query capabilities and conflict resolution
- Added `queryByConditions` method to `DeviceInstance` for flexible device querying based on dynamic conditions.
- Introduced `interpolate_tokens` function to replace placeholders in action values with actual device data.
- Updated `UpdateFieldAction` to handle cross-device updates and archive conflicting MAC addresses.
- Implemented cascade prevention in `WorkflowManager` to avoid processing events for devices modified in the same batch.
- Added unit tests for new functionalities, including token interpolation, condition querying, and action execution.
- Created constants for device column validation to enhance security and maintainability.
- Established a structured research skill specification to guide development practices.
2026-06-16 11:40:28 +00:00

404 lines
15 KiB
Python

"""
Unit tests for Workflow Engine v2 — cross-device targeting.
Covers:
- interpolate_tokens()
- WorkflowManager.VALID_DEVICE_COLUMNS token validation
- WorkflowManager._validate_workflow_tokens()
- WorkflowManager.load_workflows() rejects invalid-token workflows
- DeviceInstance.queryByConditions()
- UpdateFieldAction boolean column casting
- UpdateFieldAction _archive_conflicting_mac guard
- WorkflowManager._mutated_guids cascade prevention
"""
import sys
import os
import json
import tempfile
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import make_db, make_device_dict, insert_device_from_dict
# ---------------------------------------------------------------------------
# Shared test helpers
# ---------------------------------------------------------------------------
def _make_app_event(guid="evt-guid-1", obj_guid="dev-guid-1", obj_type="Devices",
event_type="update", index=1):
"""Return a dict mimicking an AppEvents sqlite3.Row."""
return {
"guid": guid,
"objectGuid": obj_guid,
"objectType": obj_type,
"appEventType": event_type,
"appEventProcessed": 0,
"index": index,
}
def make_stub_manager():
"""Return a WorkflowManager with a mock DB and no workflows loaded."""
from workflows.manager import WorkflowManager
db = MagicMock()
db.sql = MagicMock()
db.sql.execute.return_value.fetchall.return_value = []
db.commitDB = MagicMock()
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
mgr = WorkflowManager(db)
return mgr
# ---------------------------------------------------------------------------
# interpolate_tokens
# ---------------------------------------------------------------------------
class TestInterpolateTokens(unittest.TestCase):
def setUp(self):
from workflows.actions import interpolate_tokens
self.interpolate = interpolate_tokens
def test_replaces_known_token(self):
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
result = self.interpolate("{{trigger.devLastIP}}", device)
self.assertEqual(result, "10.0.0.5")
def test_replaces_multiple_tokens(self):
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
result = self.interpolate("ip={{trigger.devLastIP}} mac={{trigger.devMac}}", device)
self.assertEqual(result, "ip=10.0.0.5 mac=aa:bb:cc:dd:ee:ff")
def test_leaves_unknown_token_unchanged(self):
device = {"devLastIP": "10.0.0.5"}
result = self.interpolate("{{trigger.doesNotExist}}", device)
self.assertEqual(result, "{{trigger.doesNotExist}}")
def test_non_string_value_returned_as_is(self):
device = {}
self.assertEqual(self.interpolate(42, device), 42)
self.assertIsNone(self.interpolate(None, device))
def test_empty_device_dict_leaves_token_unchanged(self):
result = self.interpolate("{{trigger.devMac}}", {})
self.assertEqual(result, "{{trigger.devMac}}")
# ---------------------------------------------------------------------------
# Token validation
# ---------------------------------------------------------------------------
class TestValidateWorkflowTokens(unittest.TestCase):
def test_valid_token_passes(self):
mgr = make_stub_manager()
wf = {"name": "test", "actions": [{"value": "{{trigger.devLastIP}}"}]}
self.assertTrue(mgr._validate_workflow_tokens(wf))
def test_invalid_token_fails(self):
mgr = make_stub_manager()
wf = {"name": "test", "actions": [{"value": "{{trigger.ip_address}}"}]}
self.assertFalse(mgr._validate_workflow_tokens(wf))
def test_nested_invalid_token_fails(self):
mgr = make_stub_manager()
wf = {
"name": "test",
"actions": [{
"target": {
"conditions": [{"value": "{{trigger.bad_field}}"}]
}
}]
}
self.assertFalse(mgr._validate_workflow_tokens(wf))
def test_no_tokens_passes(self):
mgr = make_stub_manager()
wf = {"name": "test", "conditions": [], "actions": [{"value": "static"}]}
self.assertTrue(mgr._validate_workflow_tokens(wf))
class TestLoadWorkflowsRejectsInvalidTokens(unittest.TestCase):
def _make_manager_loading(self, raw_workflows):
"""Build a WorkflowManager whose load_workflows() reads from a temp file."""
import workflows.manager as wf_mod
from workflows.manager import WorkflowManager
with tempfile.TemporaryDirectory() as tmpdir:
wf_path = os.path.join(tmpdir, "workflows.json")
with open(wf_path, "w") as f:
json.dump(raw_workflows, f)
orig = wf_mod.fullConfFolder
wf_mod.fullConfFolder = tmpdir
try:
db = MagicMock()
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
mgr = WorkflowManager(db)
mgr.workflows = mgr.load_workflows()
finally:
wf_mod.fullConfFolder = orig
return mgr
def test_valid_workflow_loaded(self):
wf = {
"name": "Valid WF", "enabled": "Yes",
"trigger": {"object_type": "Devices", "event_type": "update"},
"conditions": [],
"actions": [{"type": "update_field", "field": "devIsNew",
"value": "{{trigger.devLastIP}}"}]
}
mgr = self._make_manager_loading([wf])
self.assertEqual(len(mgr.workflows), 1)
def test_invalid_token_workflow_rejected(self):
wf = {
"name": "Bad WF", "enabled": "Yes",
"trigger": {"object_type": "Devices", "event_type": "update"},
"conditions": [],
"actions": [{"type": "update_field", "field": "devIsNew",
"value": "{{trigger.nonexistent_field}}"}]
}
mgr = self._make_manager_loading([wf])
self.assertEqual(len(mgr.workflows), 0)
# ---------------------------------------------------------------------------
# DeviceInstance.queryByConditions
# ---------------------------------------------------------------------------
class TestQueryByConditions(unittest.TestCase):
def setUp(self):
self.conn = make_db()
dev_a = make_device_dict("aa:bb:cc:dd:ee:01", devLastIP="192.168.1.10",
devGUID="guid-a", devIsArchived=0)
dev_b = make_device_dict("aa:bb:cc:dd:ee:02", devLastIP="192.168.1.10",
devGUID="guid-b", devIsArchived=0)
dev_c = make_device_dict("aa:bb:cc:dd:ee:03", devLastIP="192.168.1.20",
devGUID="guid-c", devIsArchived=0)
for d in [dev_a, dev_b, dev_c]:
insert_device_from_dict(self.conn, d)
def _instance(self):
from models.device_instance import DeviceInstance
inst = DeviceInstance()
# Patch _fetchall to use our in-memory connection
def _fetchall(q, p=()):
rows = self.conn.execute(q, p).fetchall()
return [dict(r) for r in rows]
inst._fetchall = _fetchall
return inst
def test_equals_returns_matching_devices(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"}
])
macs = {r["devMac"] for r in results}
self.assertIn("aa:bb:cc:dd:ee:01", macs)
self.assertIn("aa:bb:cc:dd:ee:02", macs)
self.assertNotIn("aa:bb:cc:dd:ee:03", macs)
def test_multiple_conditions_and_logic(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"},
{"field": "devMac", "operator": "equals", "value": "aa:bb:cc:dd:ee:01"},
])
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["devMac"], "aa:bb:cc:dd:ee:01")
def test_contains_operator(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "contains", "value": "192.168.1"}
])
self.assertEqual(len(results), 3)
def test_empty_conditions_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([])
self.assertEqual(results, [])
def test_unknown_field_skipped_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "nonexistent_column", "operator": "equals", "value": "x"}
])
self.assertEqual(results, [])
def test_unknown_operator_skipped_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "regex", "value": ".*"}
])
self.assertEqual(results, [])
# ---------------------------------------------------------------------------
# UpdateFieldAction — boolean cast
# ---------------------------------------------------------------------------
class TestUpdateFieldActionBooleanCast(unittest.TestCase):
def setUp(self):
self.conn = make_db()
dev = make_device_dict("aa:bb:cc:dd:ee:ff", devGUID="guid-1", devIsArchived=0)
insert_device_from_dict(self.conn, dev)
def _make_action(self, field, value, target_device):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
trigger.object_type = "Devices"
db = MagicMock()
action = UpdateFieldAction(db, field, value, trigger, target_device)
# Patch DeviceInstance.updateField to capture what value is written
self.written_value = None
def fake_update(guid, f, v):
self.written_value = v
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
return self.written_value
def test_string_one_cast_to_int_for_boolean_column(self):
target = {"devGUID": "guid-1", "devIsArchived": 0}
written = self._make_action("devIsArchived", "1", target)
self.assertEqual(written, 1)
self.assertIsInstance(written, int)
def test_string_zero_cast_to_int_for_boolean_column(self):
target = {"devGUID": "guid-1", "devIsArchived": 1}
written = self._make_action("devIsArchived", "0", target)
self.assertEqual(written, 0)
self.assertIsInstance(written, int)
def test_non_boolean_column_not_cast(self):
target = {"devGUID": "guid-1", "devName": "OldName"}
written = self._make_action("devName", "NewName", target)
self.assertEqual(written, "NewName")
self.assertIsInstance(written, str)
def test_invalid_boolean_value_skips_update(self):
target = {"devGUID": "guid-1", "devIsArchived": 0}
written = self._make_action("devIsArchived", "not_an_int", target)
self.assertIsNone(written)
# ---------------------------------------------------------------------------
# UpdateFieldAction — devMac conflict archive guard
# ---------------------------------------------------------------------------
class TestUpdateFieldActionMacGuard(unittest.TestCase):
def test_conflicting_mac_device_archived(self):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
db = MagicMock()
conflicting = {"devGUID": "guid-conflict", "devMac": "aa:bb:cc:dd:ee:ff"}
current_guid = "guid-current"
target_device = {"devGUID": current_guid, "devMac": "11:22:33:44:55:66"}
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
archived_guid = None
def fake_update(guid, field, value):
nonlocal archived_guid
if field == "devIsArchived":
archived_guid = guid
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.getByMac.return_value = conflicting
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
self.assertEqual(archived_guid, "guid-conflict")
def test_no_conflicting_mac_no_archive(self):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
db = MagicMock()
target_device = {"devGUID": "guid-current", "devMac": "11:22:33:44:55:66"}
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
archived_guid = None
def fake_update(guid, field, value):
nonlocal archived_guid
if field == "devIsArchived":
archived_guid = guid
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.getByMac.return_value = None
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
self.assertIsNone(archived_guid)
# ---------------------------------------------------------------------------
# Cascade prevention — _mutated_guids
# ---------------------------------------------------------------------------
class TestCascadePrevention(unittest.TestCase):
def test_mutated_guid_blocks_event(self):
mgr = make_stub_manager()
mgr._mutated_guids.add("dev-guid-42")
event = _make_app_event(guid="evt-1", obj_guid="dev-guid-42")
# Make event dict-accessible
event = MagicMock()
event.__getitem__ = lambda s, k: {"guid": "evt-1", "objectGuid": "dev-guid-42",
"index": 1}[k]
# process_event should skip without calling execute_workflow
with patch.object(mgr, "execute_workflow") as mock_exec:
mgr.process_event(event)
mock_exec.assert_not_called()
def test_get_new_app_events_clears_mutated_guids(self):
mgr = make_stub_manager()
mgr._mutated_guids.add("some-guid")
mgr.db.sql.execute.return_value.fetchall.return_value = []
mgr.get_new_app_events()
self.assertEqual(len(mgr._mutated_guids), 0)
def test_execute_actions_adds_to_mutated_guids(self):
mgr = make_stub_manager()
target_device = {"devGUID": "guid-mutated", "devIsArchived": 0}
actions = [{"type": "update_field", "field": "devIsArchived", "value": "1"}]
trigger = MagicMock()
trigger.object = None
with patch("workflows.manager.DeviceInstance"), \
patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.updateField = MagicMock()
with patch.object(mgr, "_resolve_target_devices", return_value=[target_device]):
mgr.execute_actions(actions, trigger)
self.assertIn("guid-mutated", mgr._mutated_guids)
if __name__ == "__main__":
unittest.main()