Merge pull request #1675 from netalertx/next_release

Next release
This commit is contained in:
Jokob @NetAlertX
2026-06-17 15:37:23 +10:00
committed by GitHub
13 changed files with 1167 additions and 133 deletions

View File

@@ -0,0 +1,131 @@
### 🧠 NAX Research Skill — Specification
This defines a **Research Skill module for NAX (NetAlertX)** focused on safe, structured analysis before any implementation work.
---
## 1. Purpose
Ensure all work begins with **documentation-first understanding**, **PRD validation**, and **conflict detection**, before any planning or coding.
---
## 2. Core Workflow
### Step 1 — Documentation First
* Always begin by reading relevant repository documentation.
* Priority order:
1. `/CONTRIBUTING.md`
2. `/README.md`
3. `/.github/skills/code-standards/SKILL.md`
4. `/docs/**`
5. Related module/code context if referenced
* Extract:
* Architecture expectations
* Coding standards
* Plugin or module conventions
* Existing workflows or constraints
---
### Step 2 — PRD Check
* If a PRD (Product Requirements Document) is NOT provided:
* Explicitly request it before proceeding further
* Do not assume requirements
* If PRD is provided:
* Parse and restate key requirements internally
* Identify scope boundaries
---
### Step 3 — Clarification Gate
If anything is unclear:
* Stop immediately
* Ask targeted clarifying questions
* Do NOT propose solutions yet
---
### Step 4 — Codebase Cross-Check
* Compare PRD + documentation against existing codebase
* Identify:
* Conflicting behavior
* Outdated patterns
* Duplicate logic
* Breaking assumptions
* Plugin or API mismatches
* Clearly report inconsistencies before proceeding
---
### Step 5 — Planning Requirement (Strict)
Before any implementation:
* Produce a structured plan including:
* Approach overview
* Files/modules affected
* Dependencies
* Risk areas
* Migration considerations (if any)
* Explicitly label:
> “WAITING FOR USER CONFIRMATION”
---
### Step 6 — Implementation Gate (Hard Rule)
* Do NOT start implementation until user explicitly confirms the plan
* No partial coding, no early patches, no assumptions
---
## 3. Behavioral Constraints
* Always prioritize correctness over speed
* Never skip PRD validation
* Never proceed past ambiguity
* Never implement without approval
* Always surface contradictions in source material
* Always prefer asking questions over guessing
---
## 4. Output Style Rules
* Be structured and technical
* Avoid unnecessary verbosity
* Separate:
* Findings
* Risks
* Questions
* Plan
* No hidden assumptions
---
## 5. Summary Flow
```
Docs → PRD → Clarify → Codebase Check → Plan → User Approval → Implement
```

1
.gitignore vendored
View File

@@ -19,6 +19,7 @@ db/pialert.db
db/app.db
front/log/*
/log/*
.gemini/internal-docs/PRDs/*
/log/plugins/*
front/api/*
/api/*

View File

@@ -54,6 +54,7 @@ Sometimes devices are manually archived (e.g., no longer expected on the network
* `devIsArchived` is `1` (archived).
* `devPresentLastScan` is `1` (device was detected in the latest scan).
* **Action**:
* Updates the device to set `devIsArchived` to `0` (unarchived).
@@ -110,6 +111,7 @@ When new devices join your network, assigning them to the correct network node i
* **Conditions**:
* `devLastIP` contains `192.168.1.` (matches subnet).
* **Action**:
* Sets `devNetworkNode` to the specified MAC address.
@@ -175,6 +177,7 @@ You may want to automatically clear out newly detected Google devices (such as C
* `devVendor` contains `Google`.
* `devIsNew` is `1` (device marked as new).
* **Actions**:
1. Sets `devIsNew` to `0` (mark as not new).
@@ -182,4 +185,70 @@ You may want to automatically clear out newly detected Google devices (such as C
### ✅ Result
Any newly detected Google devices are cleaned up instantly — first marked as not new, then deleted — helping you avoid clutter in your device records.
Any newly detected Google devices are cleaned up instantly — first marked as not new, then deleted — helping you avoid clutter in your device records.
---
## Example 4: On new device discovery archive the old device with the same ip
This workflow automatically archives devices if a new device is discovered with an already assigned IP.
### 📋 Use Case
This workflow is useful if you are assigning static IPs to your devices. This workflow can also help with archiving device entries with [random MAC addresses](./RANDOM_MAC.md).
### ⚙️ Workflow Configuration
```json
{
"name": "Archive device with same ip",
"trigger": {
"object_type": "Devices",
"event_type": "insert"
},
"conditions": [
{
"logic": "AND",
"conditions": []
}
],
"actions": [
{
"type": "update_field",
"field": "devIsArchived",
"value": "1",
"target": {
"strategy": "query",
"conditions": [
{
"field": "devLastIP",
"operator": "equals",
"value": "{{trigger.devLastIP}}"
}
]
}
}
]
}
```
### 🔍 Explanation
* **Trigger**: Runs on a new device being inserted.
* **Conditions**:
* `N/A`
* **Target Conditions**:
* `devLastIP` of the target is the same as the newly discovered device's `devLastIP` value.
* **Actions**:
1. Sets `devIsArchived` to `1` (mark target device as archived).
### ✅ Result
Any newly detected device that has the same IP as an existing device will automatically trigger the archival of the old device.

View File

@@ -2338,6 +2338,17 @@ textarea[readonly],
padding: 5px;
}
.workflows .add-target-condition
{
margin: 10px;
text-align: center;
}
.workflows .inline-hint
{
margin: 5px;
}
.workflows
{
max-width: 800px;
@@ -2430,6 +2441,18 @@ textarea[readonly],
color: var(--color-green) !important;
}
.workflows .action-target-conditions
{
opacity: 0.8;
}
.workflows .bckg-icon-base
{
display: block;
position: absolute;
opacity: 0.1;
right: 0.1em;
}
.workflows .bckg-icon-1-line
{
font-size: 3em;

View File

@@ -742,6 +742,9 @@
"VERSION_name": "Version or timestamp",
"WF_Action_Add": "Add Action",
"WF_Action_field": "Field",
"WF_Action_target": "Apply action to",
"WF_Action_target_conditions": "Target device conditions",
"WF_Action_token_hint": "Use <code>{{trigger.COLUMN}}</code> to reference the triggering device (e.g. <code>{{trigger.devLastIP}}</code>, <code>{{trigger.devMac}}</code>)",
"WF_Action_type": "Type",
"WF_Action_value": "Value",
"WF_Actions": "Actions",

View File

@@ -295,8 +295,86 @@ function generateWorkflowUI(wf, wfIndex) {
});
// Dropdown for action.type
let $actionDropdown= createEditableDropdown(
// how big should the background icon be — computed after all content decisions
let numberOfLines = 1
// ------------------------------------------------------------------
// Target selector — shown first so user picks the target before the action
// Applies to update_field and delete_device actions
// ------------------------------------------------------------------
if (action.type == "update_field" || action.type == "delete_device") {
let currentStrategy = (action.target && action.target.strategy) ? action.target.strategy : "triggering_device";
let $targetDropdown = createEditableDropdown(
`[${wfIndex}].actions[${actionIndex}].target.strategy`,
getString("WF_Action_target"),
["triggering_device", "query"],
currentStrategy,
`wf-${wfIndex}-actionIndex-${actionIndex}-target-strategy`
);
$actionEl.append($targetDropdown);
// Conditional query conditions sub-form
let $targetConditionsWrap = $("<div>", {
class: `action-target-conditions panel col-sm-12 col-sx-12 ${currentStrategy === "query" ? "" : "hidden"}`,
id: `wf-${wfIndex}-actionIndex-${actionIndex}-target-conditions-wrap`
});
let $targetConditionsTitle = $("<div>", { class: "section-title" })
.append($("<i>", { class: "fa-solid fa-crosshairs" }))
.append(` ${getString("WF_Action_target_conditions")}:`);
let $tokenHint = $("<div>", { class: "text-muted inline-hint small col-sm-12 col-xs-12" })
.html(getString("WF_Action_token_hint"));
$targetConditionsWrap.append($targetConditionsTitle);
$targetConditionsWrap.append($tokenHint);
let targetConditions = (action.target && action.target.conditions) ? action.target.conditions : [];
let targetBasePath = `[${wfIndex}].actions[${actionIndex}].target`;
$.each(targetConditions, function(tcIdx, tc) {
let $tcRow = createTargetConditionRow(wfIndex, actionIndex, tcIdx, tc, targetBasePath);
$targetConditionsWrap.append($tcRow);
});
let $addTargetCondBtn = $("<div>", {
class: "pointer add-target-condition green-hover-text col-sm-12",
wfIndex: wfIndex,
actionIndex: actionIndex
}).append($("<i>", { class: "fa-solid fa-plus" })).append(` ${getString("WF_Add_Condition")}`);
$targetConditionsWrap.append($addTargetCondBtn);
$actionEl.append($targetConditionsWrap);
// Show/hide conditions sub-form when strategy dropdown changes
$targetDropdown.find("select").on("change", function() {
let val = $(this).val();
let $wrap = $(`#wf-${wfIndex}-actionIndex-${actionIndex}-target-conditions-wrap`);
if (val === "query") {
$wrap.removeClass("hidden");
} else {
$wrap.addClass("hidden");
// Strip target.conditions from the in-memory object when switching away from query
let wfs = getWorkflowsJson();
if (wfs[wfIndex] && wfs[wfIndex].actions[actionIndex] && wfs[wfIndex].actions[actionIndex].target) {
delete wfs[wfIndex].actions[actionIndex].target.conditions;
}
updateWorkflowsJson(wfs);
}
});
// numberOfLines: 1 (target dropdown) = 1 base for both action types
// query mode adds: 1 (section title+hint) + N×3 (each condition: field/op/value) + 1 (add btn)
let conditionLines = currentStrategy === "query"
? 2 + (targetConditions.length * 3)
: 0;
numberOfLines = 1 + conditionLines;
}
// Dropdown for action.type — rendered after target so user reads: who → what
let $actionDropdown = createEditableDropdown(
`[${wfIndex}].actions[${actionIndex}].type`,
getString("WF_Action_type"),
actionTypes,
@@ -304,15 +382,13 @@ function generateWorkflowUI(wf, wfIndex) {
`wf-${wfIndex}-actionIndex-${actionIndex}-type`
);
$actionEl.append($actionDropdown);
// how big should the background icon be
let numberOfLines = 1
numberOfLines += 1;
if(action.type == "update_field")
{
numberOfLines = 3
// +2 for field dropdown and value input rows
numberOfLines += 2;
// Dropdown for action.field
let $fieldDropdown = createEditableDropdown(
@@ -356,7 +432,8 @@ function generateWorkflowUI(wf, wfIndex) {
$actionRemoveButtonWrap.append($actionRemoveButton);
let $actionIcon = $("<i>", {
class: `fa-solid fa-person-running fa-flip-horizontal bckg-icon-${numberOfLines}-line `
class: `fa-solid fa-person-running fa-flip-horizontal bckg-icon-base`,
style: `font-size: ${numberOfLines * 3}em`
});
$actionEl.prepend($actionIcon)
@@ -721,6 +798,57 @@ function createEditableInput(jsonPath, labelText, value, id, className = "") {
return $wrapper;
}
// --------------------------------------
// Render a single row in a target-conditions sub-form (cross-device query targeting v2)
function createTargetConditionRow(wfIndex, actionIndex, tcIdx, tc, targetBasePath) {
let basePath = `${targetBasePath}.conditions[${tcIdx}]`;
let $row = $("<div>", { class: "panel col-sm-12 col-sx-12 target-condition-row" });
let $icon = $("<i>", { class: "fa-solid fa-crosshairs bckg-icon-3-line" });
$row.append($icon);
let $inner = $("<div>", { class: "col-sm-11 col-sx-12" });
let $fieldDropdown = createEditableDropdown(
`${basePath}.field`,
getString("WF_Condition_field"),
fieldOptions,
tc.field || "",
`wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-field`
);
let $operatorDropdown = createEditableDropdown(
`${basePath}.operator`,
getString("WF_Condition_operator"),
operatorTypes,
tc.operator || "equals",
`wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-operator`
);
let $valueInput = createEditableInput(
`${basePath}.value`,
getString("WF_Condition_value"),
tc.value || "",
`wf-${wfIndex}-act-${actionIndex}-tc-${tcIdx}-value`,
"condition-value-input"
);
$inner.append($fieldDropdown).append($operatorDropdown).append($valueInput);
let $removeWrap = $("<div>", { class: "button-container col-sm-1 col-sx-12" });
let $removeBtn = $("<div>", {
class: "pointer red-hover-text remove-target-condition",
wfIndex: wfIndex,
actionIndex: actionIndex,
tcIdx: tcIdx
}).append($("<i>", { class: "fa-solid fa-trash" }));
$removeWrap.append($removeBtn);
$row.append($inner).append($removeWrap);
return $row;
}
// --------------------------------------
// Updating the in-memory workflow object
function updateWorkflowObject(newValue, jsonPath) {
@@ -1097,15 +1225,23 @@ function getEmptyWorkflowJson()
// Save workflows JSON
function saveWorkflows()
{
showSpinner();
// encode for import
appConfBase64 = btoa(JSON.stringify(getWorkflowsJson()))
// import
$.post('php/server/query_replace_config.php', { base64data: appConfBase64, fileName: "workflows.json" }, function(msg) {
console.log(msg);
// showMessage(msg);
write_notification(`[WF]: ${msg}`, 'interrupt');
});
$.post('php/server/query_replace_config.php', { base64data: appConfBase64, fileName: "workflows.json" })
.done(function(msg) {
console.log(msg);
write_notification(`[WF]: ${msg}`, 'interrupt');
})
.fail(function(jqXHR, textStatus, errorThrown) {
console.warn("Failed to save workflows.json:", textStatus, errorThrown);
write_notification(`[WF]: Save failed (${textStatus})`, 'interrupt');
})
.always(function() {
hideSpinner();
});
}
// ---------------------------------------------------
@@ -1168,6 +1304,42 @@ $(document).on("click", ".remove-action", function () {
removeAction(getWorkflowsJson(), wfIndex, actionIndex);
});
// Event Listeners for target condition rows (v2 cross-device targeting)
$(document).on("click", ".add-target-condition", function () {
let wfIndex = parseInt($(this).attr("wfIndex"), 10);
let actionIndex = parseInt($(this).attr("actionIndex"), 10);
let wfs = getWorkflowsJson();
if (!wfs[wfIndex].actions[actionIndex].target) {
wfs[wfIndex].actions[actionIndex].target = { strategy: "query", conditions: [] };
}
if (!wfs[wfIndex].actions[actionIndex].target.conditions) {
wfs[wfIndex].actions[actionIndex].target.conditions = [];
}
wfs[wfIndex].actions[actionIndex].target.conditions.push({
field: fieldOptions[0],
operator: "equals",
value: ""
});
updateWorkflowsJson(wfs);
renderWorkflows();
});
$(document).on("click", ".remove-target-condition", function () {
let wfIndex = parseInt($(this).attr("wfIndex"), 10);
let actionIndex = parseInt($(this).attr("actionIndex"), 10);
let tcIdx = parseInt($(this).attr("tcIdx"), 10);
let wfs = getWorkflowsJson();
if (wfs[wfIndex].actions[actionIndex].target && wfs[wfIndex].actions[actionIndex].target.conditions) {
wfs[wfIndex].actions[actionIndex].target.conditions.splice(tcIdx, 1);
updateWorkflowsJson(wfs);
renderWorkflows();
}
});
// Event Listeners for Removing Condition Groups
$(document).on("click", ".remove-condition-group", function () {
let wfIndex = $(this).attr("wfindex");

View File

@@ -18,6 +18,7 @@ from db.authoritative_handler import (
unlock_fields
)
from helper import is_random_mac, get_setting_value
from workflows.constants import VALID_DEVICE_COLUMNS
from utils.datetime_utils import timeNowUTC
@@ -85,6 +86,11 @@ class DeviceInstance:
SELECT * FROM Devices WHERE devGUID = ?
""", (devGUID,))
def getByMac(self, mac):
return self._fetchone("""
SELECT * FROM Devices WHERE devMac = ?
""", (mac,))
def exists(self, devGUID):
row = self._fetchone("""
SELECT COUNT(*) as count FROM Devices WHERE devGUID = ?
@@ -96,6 +102,49 @@ class DeviceInstance:
SELECT * FROM Devices WHERE devLastIP = ?
""", (ip,))
def queryByConditions(self, conditions):
"""Query Devices using a list of condition dicts.
Each condition dict must have ``field``, ``operator``, and ``value`` keys.
Supported operators: ``equals``, ``contains``.
Returns a list of device dicts (may be empty). Only fields present in
the Devices schema are accepted; unrecognised fields are skipped with a
warning to prevent SQL injection.
"""
clauses = []
params = []
for cond in conditions:
field = cond.get("field", "")
operator = cond.get("operator", "")
value = cond.get("value", "")
if field not in VALID_DEVICE_COLUMNS:
mylog("none", [f"[WF] queryByConditions: unknown field '{field}' — skipped"])
continue
# Normalize MAC values before comparison to match stored format
if field == "devMac" and value:
value = normalize_mac(value)
if operator == "equals":
clauses.append(f"{field} = ?")
params.append(value)
elif operator == "contains":
clauses.append(f"{field} LIKE ?")
params.append(f"%{value}%")
else:
mylog("none", [f"[WF] queryByConditions: unsupported operator '{operator}' — skipped"])
continue
if not clauses:
mylog("none", ["[WF] queryByConditions: no valid conditions — returning empty result"])
return []
where = " AND ".join(clauses)
return self._fetchall(f"SELECT * FROM Devices WHERE {where}", tuple(params))
def search(self, query):
like = f"%{query}%"
return self._fetchall("""

View File

@@ -1,13 +1,31 @@
import sqlite3
from logger import mylog, Logger
from helper import get_setting_value
from front.plugins.plugin_helper import normalize_mac
from models.device_instance import DeviceInstance
from models.plugin_object_instance import PluginObjectInstance
from workflows.constants import BOOLEAN_COLUMNS, TOKEN_RE
# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
def interpolate_tokens(value, trigger_device):
"""Replace every ``{{trigger.COLUMN}}`` placeholder in *value* with the
corresponding field from *trigger_device* (a plain dict).
Unknown columns are left as-is so callers can log them separately.
"""
if not isinstance(value, str):
return value
def _replace(match):
col = match.group(1)
return str(trigger_device.get(col, match.group(0)))
return TOKEN_RE.sub(_replace, value)
class Action:
"""Base class for all actions."""
@@ -28,90 +46,122 @@ class Action:
class UpdateFieldAction(Action):
"""Action to update a specific field of an object."""
"""Action to update a specific field of a device.
def __init__(self, db, field, value, trigger):
When *target_device* is supplied the action operates on that device rather
than the one that raised the event, enabling cross-device targeting (v2).
*trigger* is still required for context / logging.
"""
def __init__(self, db, field, value, trigger, target_device=None):
super().__init__(trigger)
self.field = field
self.value = value
self.db = db
self.target_device = target_device
def execute(self):
mylog("verbose", f"[WF] Updating field '{self.field}' to '{self.value}' for event object {self.trigger.object_type}")
# Resolve the device to operate on
obj = self.target_device if self.target_device is not None else self.get_object()
obj = self.get_object()
if isinstance(obj, sqlite3.Row):
obj = dict(obj)
if obj is None:
mylog("none", "[WF] Object no longer exists")
mylog("none", "[WF] UpdateFieldAction: target device no longer exists")
return None
if isinstance(obj, dict) and "objectGuid" in obj:
# Interpolate {{trigger.X}} tokens in the value using the triggering device
trigger_obj = self.get_object() or {}
final_value = interpolate_tokens(self.value, trigger_obj)
# Cast to int for boolean CHECK columns to satisfy SQLite constraints
if self.field in BOOLEAN_COLUMNS:
try:
final_value = int(final_value)
except (ValueError, TypeError):
mylog("none", [f"[WF] Cannot cast value '{final_value}' to int for boolean field '{self.field}' — skipping"])
return None
mylog("verbose", f"[WF] Updating field '{self.field}' to '{final_value}' on device {obj.get('devGUID', '?')}")
if "objectGuid" in obj:
mylog("debug", f"[WF] Updating Object '{obj}'")
PluginObjectInstance().updateField(
obj["objectGuid"],
self.field,
self.value,
)
PluginObjectInstance().updateField(obj["objectGuid"], self.field, final_value)
return obj
if isinstance(obj, dict) and "devGUID" in obj:
mylog("debug", f"[WF] Updating Device '{obj}'")
DeviceInstance().updateField(
obj["devGUID"],
self.field,
self.value,
)
if "devGUID" in obj:
# Guard: if mutating devMac, normalize the value and archive any
# existing device already holding that MAC before writing to avoid
# a PK UNIQUE constraint violation.
if self.field == "devMac":
final_value = normalize_mac(final_value)
self._archive_conflicting_mac(final_value, obj["devGUID"])
mylog("debug", f"[WF] Updating Device '{obj.get('devGUID')}'")
DeviceInstance().updateField(obj["devGUID"], self.field, final_value)
return obj
mylog("none", f"[WF] Unsupported object format: {obj}")
mylog("none", f"[WF] UpdateFieldAction: unsupported object format: {obj}")
return None
def _archive_conflicting_mac(self, new_mac, current_guid):
"""If another device already holds *new_mac*, archive it before the
primary-key mutation so SQLite's UNIQUE constraint is not violated."""
normalized = normalize_mac(new_mac)
existing = DeviceInstance().getByMac(normalized)
if existing and existing.get("devGUID") != current_guid:
mylog("none", [
f"[WF] Archiving conflicting device {existing['devGUID']} "
f"(MAC {normalized}) before devMac update"
])
DeviceInstance().updateField(existing["devGUID"], "devIsArchived", 1)
class DeleteObjectAction(Action):
"""Action to delete an object."""
"""Action to delete a device or plugin object.
def __init__(self, db, trigger):
When *target_device* is supplied the action deletes that device rather than
the one that raised the event, enabling cross-device targeting (v2).
"""
def __init__(self, db, trigger, target_device=None):
super().__init__(trigger)
self.db = db
self.target_device = target_device
def execute(self):
mylog("verbose", f"[WF] Deleting event object {self.trigger.object_type}")
obj = self.target_device if self.target_device is not None else self.get_object()
obj = self.get_object()
if isinstance(obj, sqlite3.Row):
obj = dict(obj)
if obj is None:
mylog("none", "[WF] Object no longer exists")
mylog("none", "[WF] DeleteObjectAction: target device no longer exists")
return None
if isinstance(obj, dict) and "objectGuid" in obj:
mylog("verbose", f"[WF] Deleting device {obj.get('devGUID', obj.get('objectGuid', '?'))}")
if "objectGuid" in obj:
mylog("debug", f"[WF] Deleting Object '{obj}'")
PluginObjectInstance().delete(obj["objectGuid"])
return obj
if isinstance(obj, dict) and "devGUID" in obj:
mylog("debug", f"[WF] Deleting Device '{obj}'")
if "devGUID" in obj:
mylog("debug", f"[WF] Deleting Device '{obj.get('devGUID')}'")
DeviceInstance().delete(obj["devGUID"])
return obj
mylog("none", f"[WF] Unsupported object format: {obj}")
mylog("none", f"[WF] DeleteObjectAction: unsupported object format: {obj}")
return None
class RunPluginAction(Action):
"""Action to run a specific plugin."""
def __init__(self, plugin_name, params, trigger):
def __init__(self, db, plugin_name, params, trigger):
super().__init__(trigger)
self.db = db
self.plugin_name = plugin_name
self.params = params

View File

@@ -162,7 +162,28 @@ class AppEvent_obj:
self.db.commitDB()
# Manage prefixes of column names
# ---------------------------------------------------------------------------
# AppEvents query helpers
# ---------------------------------------------------------------------------
def get_unprocessed(db):
"""Return all unprocessed AppEvents rows ordered by creation time."""
return db.sql.execute("""
SELECT * FROM AppEvents
WHERE appEventProcessed = 0
ORDER BY dateTimeCreated ASC
""").fetchall()
def mark_processed(db, event_index):
"""Mark a single AppEvent row as processed and commit."""
db.sql.execute(
'UPDATE AppEvents SET appEventProcessed = 1 WHERE "index" = ?',
(event_index,),
)
db.commitDB()
def manage_prefix(field, event):
if event == "delete":
return field.replace("NEW.", "OLD.")

View File

@@ -0,0 +1,41 @@
"""
Shared constants for the workflow engine.
Centralised here so that manager, actions, and models can all import from a
single source of truth rather than duplicating schema knowledge across files.
"""
import re
# ---------------------------------------------------------------------------
# Devices table column whitelist
# ---------------------------------------------------------------------------
# Every column present in the Devices table schema. Used in two ways:
# 1. Token validation — {{trigger.COLUMN}} tokens are rejected at workflow
# load time if COLUMN is not in this set.
# 2. Query safety — queryByConditions() refuses to build WHERE clauses for
# columns not in this set, preventing SQL injection via workflow JSON.
VALID_DEVICE_COLUMNS = frozenset([
"devMac", "devName", "devOwner", "devType", "devVendor", "devFavorite",
"devGroup", "devComments", "devFirstConnection", "devLastConnection",
"devLastIP", "devPrimaryIPv4", "devPrimaryIPv6", "devVlan", "devForceStatus",
"devStaticIP", "devScan", "devLogEvents", "devAlertEvents", "devAlertDown",
"devSkipRepeated", "devLastNotification", "devPresentLastScan", "devIsNew",
"devLocation", "devIsArchived", "devParentMAC", "devParentPort",
"devParentRelType", "devIcon", "devGUID", "devSite", "devSSID",
"devSyncHubNode", "devSourcePlugin", "devFQDN", "devMacSource",
"devNameSource", "devFQDNSource", "devLastIPSource", "devVendorSource",
"devSSIDSource", "devParentMACSource", "devParentPortSource",
"devParentRelTypeSource", "devVlanSource", "devCustomProps",
])
# Devices table columns whose CHECK constraint requires a strict integer 0 or 1.
# Values destined for these columns are cast to int before being written to DB.
BOOLEAN_COLUMNS = frozenset([
"devFavorite", "devStaticIP", "devLogEvents", "devAlertEvents",
"devAlertDown", "devPresentLastScan", "devIsNew", "devIsArchived",
])
# Compiled regex for {{trigger.COLUMN_NAME}} token substitution and validation.
TOKEN_RE = re.compile(r"\{\{trigger\.(\w+)\}\}")

View File

@@ -1,11 +1,15 @@
import json
import sqlite3
from const import fullConfFolder
from logger import mylog, Logger
from helper import get_setting_value
from models.device_instance import DeviceInstance
from workflows.constants import VALID_DEVICE_COLUMNS, TOKEN_RE
from workflows.app_events import get_unprocessed, mark_processed
from workflows.triggers import Trigger
from workflows.conditions import ConditionGroup
from workflows.actions import DeleteObjectAction, RunPluginAction, UpdateFieldAction
from workflows.actions import DeleteObjectAction, RunPluginAction, UpdateFieldAction, interpolate_tokens
# Make sure log level is initialized correctly
@@ -17,140 +21,185 @@ class WorkflowManager:
self.db = db
self.workflows = self.load_workflows()
self.update_api = False
# Tracks devGUIDs mutated by workflow actions within the current event batch.
# Events whose objectGuid appears here are skipped to prevent cascade loops.
# Cleared at the start of each new event batch via get_new_app_events().
self._mutated_guids = set()
# -------------------------------------------------------------------------
# Token validation
def _validate_workflow_tokens(self, workflow):
"""Recursively scan a workflow dict for {{trigger.X}} tokens.
Returns True if every token maps to a valid Devices column."""
def _scan(node):
if isinstance(node, str):
for col in TOKEN_RE.findall(node):
if col not in VALID_DEVICE_COLUMNS:
mylog("none", [
f"[WF] Invalid token '{{{{trigger.{col}}}}}' in workflow "
f"'{workflow.get('name', '?')}' — must be a valid Devices column"
])
return False
return True
if isinstance(node, dict):
return all(_scan(v) for v in node.values())
if isinstance(node, list):
return all(_scan(item) for item in node)
return True
return _scan(workflow)
# -------------------------------------------------------------------------
# Loading
def load_workflows(self):
"""Load workflows from workflows.json."""
"""Load workflows from workflows.json, rejecting any with invalid tokens."""
try:
workflows_json_path = fullConfFolder + "/workflows.json"
with open(workflows_json_path, "r") as f:
workflows = json.load(f)
return workflows
raw = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
mylog("none", ["[WF] Failed to load workflows.json"])
return []
valid = []
for wf in raw:
if self._validate_workflow_tokens(wf):
valid.append(wf)
else:
mylog("none", [f"[WF] Workflow '{wf.get('name', '?')}' rejected — contains invalid trigger tokens"])
return valid
# -------------------------------------------------------------------------
# Event fetching
def get_new_app_events(self):
"""Get new unprocessed events from the AppEvents table."""
result = self.db.sql.execute("""
SELECT * FROM AppEvents
WHERE appEventProcessed = 0
ORDER BY dateTimeCreated ASC
""").fetchall()
"""Get new unprocessed events from the AppEvents table.
Resets _mutated_guids to start a fresh cascade-prevention window for this batch."""
self._mutated_guids.clear()
result = get_unprocessed(self.db)
mylog("none", [f"[WF] get_new_app_events - new events count: {len(result)}"])
return result
def process_event(self, event):
"""Process the events. Check if events match a workflow trigger"""
# -------------------------------------------------------------------------
# Event processing
def process_event(self, event):
"""Process one AppEvent against all enabled workflows."""
evGuid = event["guid"]
obj_guid = event["objectGuid"]
# Cascade prevention: skip events for devices already mutated this batch
if obj_guid in self._mutated_guids:
mylog("debug", [f"[WF] Skipping event {evGuid} — device {obj_guid} was mutated by a workflow in this batch"])
mark_processed(self.db, event["index"])
return
mylog("verbose", [f"[WF] Processing event with GUID {evGuid}"])
# Check if the trigger conditions match
for workflow in self.workflows:
# Ensure workflow is enabled before proceeding
if workflow.get("enabled", "No").lower() == "yes":
wfName = workflow["name"]
mylog("debug", f"[WF] Checking if '{evGuid}' triggers the workflow '{wfName}'")
# construct trigger object which also evaluates if the current event triggers it
trigger = Trigger(workflow["trigger"], event, self.db)
if trigger.triggered:
mylog("verbose", f"[WF] Event with GUID '{evGuid}' triggered the workflow '{wfName}'")
self.execute_workflow(workflow, trigger)
# After processing the event, mark the event as processed (set AppEventProcessed to 1)
self.db.sql.execute(
"""
UPDATE AppEvents
SET appEventProcessed = 1
WHERE "index" = ?
""",
(event["index"],),
) # Pass the event's unique identifier
self.db.commitDB()
mark_processed(self.db, event["index"])
# -------------------------------------------------------------------------
# Workflow execution
def execute_workflow(self, workflow, trigger):
"""Execute the actions in the given workflow if conditions are met."""
"""Execute workflow actions if any condition group evaluates to True."""
wfName = workflow["name"]
# Ensure conditions exist
if not isinstance(workflow.get("conditions"), list):
m = "[WF] workflow['conditions'] must be a list"
mylog("none", [m])
raise ValueError(m)
# Evaluate each condition group separately
for condition_group in workflow["conditions"]:
evaluator = ConditionGroup(condition_group)
if evaluator.evaluate(trigger): # If any group evaluates to True
if evaluator.evaluate(trigger):
mylog("none", f"[WF] Workflow {wfName} will be executed - conditions were evaluated as TRUE")
mylog("debug", [f"[WF] Workflow condition_group: {condition_group}"])
self.execute_actions(workflow["actions"], trigger)
return # Stop if a condition group succeeds
return
mylog("none", ["[WF] No condition group matched. Actions not executed."])
def _resolve_target_devices(self, action, trigger_device):
"""Return the list of device dicts that the action should be applied to.
- No ``target`` key or ``strategy == "triggering_device"`` → legacy behaviour,
targets only the device that raised the event.
- ``strategy == "query"`` → query the Devices table using the action's
nested conditions (with {{trigger.X}} tokens already interpolated).
"""
target_block = action.get("target", {})
strategy = target_block.get("strategy", "triggering_device")
if strategy == "triggering_device":
return [trigger_device] if trigger_device is not None else []
if strategy == "query":
raw_conditions = target_block.get("conditions", [])
compiled_conditions = []
for cond in raw_conditions:
compiled = dict(cond)
compiled["value"] = interpolate_tokens(cond["value"], trigger_device or {})
compiled_conditions.append(compiled)
return DeviceInstance().queryByConditions(compiled_conditions)
mylog("none", [f"[WF] Unknown target strategy '{strategy}' — skipping action"])
return []
def execute_actions(self, actions, trigger):
"""Execute the actions defined in a workflow."""
"""Execute all actions defined in a workflow against their resolved targets."""
# Normalise trigger object to a plain dict for token operations
trigger_obj = trigger.object
if isinstance(trigger_obj, sqlite3.Row):
trigger_obj = dict(trigger_obj)
for action in actions:
if action["type"] == "update_field":
field = action["field"]
value = action["value"]
action_instance = UpdateFieldAction(self.db, field, value, trigger)
# indicate if the api has to be updated
self.update_api = True
action_type = action["type"]
elif action["type"] == "run_plugin":
plugin_name = action["plugin"]
params = action["params"]
action_instance = RunPluginAction(self.db, plugin_name, params, trigger)
# run_plugin does not support query targeting — always uses the trigger context
if action_type == "run_plugin":
RunPluginAction(self.db, action["plugin"], action["params"], trigger).execute()
continue
elif action["type"] == "delete_device":
action_instance = DeleteObjectAction(self.db, trigger)
target_devices = self._resolve_target_devices(action, trigger_obj)
# elif action["type"] == "send_notification":
# method = action["method"]
# message = action["message"]
# action_instance = SendNotificationAction(method, message, trigger)
if not target_devices:
mylog("debug", [f"[WF] No target devices matched for action '{action_type}'"])
continue
else:
m = f"[WF] Unsupported action type: {action['type']}"
mylog("none", [m])
raise ValueError(m)
for target_device in target_devices:
if action_type == "update_field":
action_instance = UpdateFieldAction(
self.db, action["field"], action["value"], trigger, target_device
)
self.update_api = True
action_instance.execute() # Execute the action
elif action_type == "delete_device":
action_instance = DeleteObjectAction(self.db, trigger, target_device)
# if result:
# # Iterate through actions and execute them
# for action in workflow["actions"]:
# if action["type"] == "update_field":
# # Action type is "update_field", so map to UpdateFieldAction
# field = action["field"]
# value = action["value"]
# action_instance = UpdateFieldAction(field, value)
# action_instance.execute(trigger.event)
else:
m = f"[WF] Unsupported action type: {action_type}"
mylog("none", [m])
raise ValueError(m)
# elif action["type"] == "run_plugin":
# # Action type is "run_plugin", so map to RunPluginAction
# plugin_name = action["plugin"]
# params = action["params"]
# action_instance = RunPluginAction(plugin_name, params)
# action_instance.execute(trigger.event)
# elif action["type"] == "send_notification":
# # Action type is "send_notification", so map to SendNotificationAction
# method = action["method"]
# message = action["message"]
# action_instance = SendNotificationAction(method, message)
# action_instance.execute(trigger.event)
# else:
# # Handle unsupported action types
# raise ValueError(f"Unsupported action type: {action['type']}")
action_instance.execute()
# Record this device's GUID so cascade events are suppressed in this batch
if isinstance(target_device, dict) and target_device.get("devGUID"):
self._mutated_guids.add(target_device["devGUID"])

View File

@@ -0,0 +1,403 @@
"""
Unit tests for Workflow Engine v2 — cross-device targeting.
Covers:
- interpolate_tokens()
- WorkflowManager.VALID_DEVICE_COLUMNS token validation
- WorkflowManager._validate_workflow_tokens()
- WorkflowManager.load_workflows() rejects invalid-token workflows
- DeviceInstance.queryByConditions()
- UpdateFieldAction boolean column casting
- UpdateFieldAction _archive_conflicting_mac guard
- WorkflowManager._mutated_guids cascade prevention
"""
import sys
import os
import json
import tempfile
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import make_db, make_device_dict, insert_device_from_dict
# ---------------------------------------------------------------------------
# Shared test helpers
# ---------------------------------------------------------------------------
def _make_app_event(guid="evt-guid-1", obj_guid="dev-guid-1", obj_type="Devices",
event_type="update", index=1):
"""Return a dict mimicking an AppEvents sqlite3.Row."""
return {
"guid": guid,
"objectGuid": obj_guid,
"objectType": obj_type,
"appEventType": event_type,
"appEventProcessed": 0,
"index": index,
}
def make_stub_manager():
"""Return a WorkflowManager with a mock DB and no workflows loaded."""
from workflows.manager import WorkflowManager
db = MagicMock()
db.sql = MagicMock()
db.sql.execute.return_value.fetchall.return_value = []
db.commitDB = MagicMock()
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
mgr = WorkflowManager(db)
return mgr
# ---------------------------------------------------------------------------
# interpolate_tokens
# ---------------------------------------------------------------------------
class TestInterpolateTokens(unittest.TestCase):
def setUp(self):
from workflows.actions import interpolate_tokens
self.interpolate = interpolate_tokens
def test_replaces_known_token(self):
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
result = self.interpolate("{{trigger.devLastIP}}", device)
self.assertEqual(result, "10.0.0.5")
def test_replaces_multiple_tokens(self):
device = {"devLastIP": "10.0.0.5", "devMac": "aa:bb:cc:dd:ee:ff"}
result = self.interpolate("ip={{trigger.devLastIP}} mac={{trigger.devMac}}", device)
self.assertEqual(result, "ip=10.0.0.5 mac=aa:bb:cc:dd:ee:ff")
def test_leaves_unknown_token_unchanged(self):
device = {"devLastIP": "10.0.0.5"}
result = self.interpolate("{{trigger.doesNotExist}}", device)
self.assertEqual(result, "{{trigger.doesNotExist}}")
def test_non_string_value_returned_as_is(self):
device = {}
self.assertEqual(self.interpolate(42, device), 42)
self.assertIsNone(self.interpolate(None, device))
def test_empty_device_dict_leaves_token_unchanged(self):
result = self.interpolate("{{trigger.devMac}}", {})
self.assertEqual(result, "{{trigger.devMac}}")
# ---------------------------------------------------------------------------
# Token validation
# ---------------------------------------------------------------------------
class TestValidateWorkflowTokens(unittest.TestCase):
def test_valid_token_passes(self):
mgr = make_stub_manager()
wf = {"name": "test", "actions": [{"value": "{{trigger.devLastIP}}"}]}
self.assertTrue(mgr._validate_workflow_tokens(wf))
def test_invalid_token_fails(self):
mgr = make_stub_manager()
wf = {"name": "test", "actions": [{"value": "{{trigger.ip_address}}"}]}
self.assertFalse(mgr._validate_workflow_tokens(wf))
def test_nested_invalid_token_fails(self):
mgr = make_stub_manager()
wf = {
"name": "test",
"actions": [{
"target": {
"conditions": [{"value": "{{trigger.bad_field}}"}]
}
}]
}
self.assertFalse(mgr._validate_workflow_tokens(wf))
def test_no_tokens_passes(self):
mgr = make_stub_manager()
wf = {"name": "test", "conditions": [], "actions": [{"value": "static"}]}
self.assertTrue(mgr._validate_workflow_tokens(wf))
class TestLoadWorkflowsRejectsInvalidTokens(unittest.TestCase):
def _make_manager_loading(self, raw_workflows):
"""Build a WorkflowManager whose load_workflows() reads from a temp file."""
import workflows.manager as wf_mod
from workflows.manager import WorkflowManager
with tempfile.TemporaryDirectory() as tmpdir:
wf_path = os.path.join(tmpdir, "workflows.json")
with open(wf_path, "w") as f:
json.dump(raw_workflows, f)
orig = wf_mod.fullConfFolder
wf_mod.fullConfFolder = tmpdir
try:
db = MagicMock()
with patch.object(WorkflowManager, "load_workflows", return_value=[]):
mgr = WorkflowManager(db)
mgr.workflows = mgr.load_workflows()
finally:
wf_mod.fullConfFolder = orig
return mgr
def test_valid_workflow_loaded(self):
wf = {
"name": "Valid WF", "enabled": "Yes",
"trigger": {"object_type": "Devices", "event_type": "update"},
"conditions": [],
"actions": [{"type": "update_field", "field": "devIsNew",
"value": "{{trigger.devLastIP}}"}]
}
mgr = self._make_manager_loading([wf])
self.assertEqual(len(mgr.workflows), 1)
def test_invalid_token_workflow_rejected(self):
wf = {
"name": "Bad WF", "enabled": "Yes",
"trigger": {"object_type": "Devices", "event_type": "update"},
"conditions": [],
"actions": [{"type": "update_field", "field": "devIsNew",
"value": "{{trigger.nonexistent_field}}"}]
}
mgr = self._make_manager_loading([wf])
self.assertEqual(len(mgr.workflows), 0)
# ---------------------------------------------------------------------------
# DeviceInstance.queryByConditions
# ---------------------------------------------------------------------------
class TestQueryByConditions(unittest.TestCase):
def setUp(self):
self.conn = make_db()
dev_a = make_device_dict("aa:bb:cc:dd:ee:01", devLastIP="192.168.1.10",
devGUID="guid-a", devIsArchived=0)
dev_b = make_device_dict("aa:bb:cc:dd:ee:02", devLastIP="192.168.1.10",
devGUID="guid-b", devIsArchived=0)
dev_c = make_device_dict("aa:bb:cc:dd:ee:03", devLastIP="192.168.1.20",
devGUID="guid-c", devIsArchived=0)
for d in [dev_a, dev_b, dev_c]:
insert_device_from_dict(self.conn, d)
def _instance(self):
from models.device_instance import DeviceInstance
inst = DeviceInstance()
# Patch _fetchall to use our in-memory connection
def _fetchall(q, p=()):
rows = self.conn.execute(q, p).fetchall()
return [dict(r) for r in rows]
inst._fetchall = _fetchall
return inst
def test_equals_returns_matching_devices(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"}
])
macs = {r["devMac"] for r in results}
self.assertIn("aa:bb:cc:dd:ee:01", macs)
self.assertIn("aa:bb:cc:dd:ee:02", macs)
self.assertNotIn("aa:bb:cc:dd:ee:03", macs)
def test_multiple_conditions_and_logic(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "equals", "value": "192.168.1.10"},
{"field": "devMac", "operator": "equals", "value": "aa:bb:cc:dd:ee:01"},
])
self.assertEqual(len(results), 1)
self.assertEqual(results[0]["devMac"], "aa:bb:cc:dd:ee:01")
def test_contains_operator(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "contains", "value": "192.168.1"}
])
self.assertEqual(len(results), 3)
def test_empty_conditions_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([])
self.assertEqual(results, [])
def test_unknown_field_skipped_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "nonexistent_column", "operator": "equals", "value": "x"}
])
self.assertEqual(results, [])
def test_unknown_operator_skipped_returns_empty(self):
inst = self._instance()
results = inst.queryByConditions([
{"field": "devLastIP", "operator": "regex", "value": ".*"}
])
self.assertEqual(results, [])
# ---------------------------------------------------------------------------
# UpdateFieldAction — boolean cast
# ---------------------------------------------------------------------------
class TestUpdateFieldActionBooleanCast(unittest.TestCase):
def setUp(self):
self.conn = make_db()
dev = make_device_dict("aa:bb:cc:dd:ee:ff", devGUID="guid-1", devIsArchived=0)
insert_device_from_dict(self.conn, dev)
def _make_action(self, field, value, target_device):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
trigger.object_type = "Devices"
db = MagicMock()
action = UpdateFieldAction(db, field, value, trigger, target_device)
# Patch DeviceInstance.updateField to capture what value is written
self.written_value = None
def fake_update(guid, f, v):
self.written_value = v
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
return self.written_value
def test_string_one_cast_to_int_for_boolean_column(self):
target = {"devGUID": "guid-1", "devIsArchived": 0}
written = self._make_action("devIsArchived", "1", target)
self.assertEqual(written, 1)
self.assertIsInstance(written, int)
def test_string_zero_cast_to_int_for_boolean_column(self):
target = {"devGUID": "guid-1", "devIsArchived": 1}
written = self._make_action("devIsArchived", "0", target)
self.assertEqual(written, 0)
self.assertIsInstance(written, int)
def test_non_boolean_column_not_cast(self):
target = {"devGUID": "guid-1", "devName": "OldName"}
written = self._make_action("devName", "NewName", target)
self.assertEqual(written, "NewName")
self.assertIsInstance(written, str)
def test_invalid_boolean_value_skips_update(self):
target = {"devGUID": "guid-1", "devIsArchived": 0}
written = self._make_action("devIsArchived", "not_an_int", target)
self.assertIsNone(written)
# ---------------------------------------------------------------------------
# UpdateFieldAction — devMac conflict archive guard
# ---------------------------------------------------------------------------
class TestUpdateFieldActionMacGuard(unittest.TestCase):
def test_conflicting_mac_device_archived(self):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
db = MagicMock()
conflicting = {"devGUID": "guid-conflict", "devMac": "aa:bb:cc:dd:ee:ff"}
current_guid = "guid-current"
target_device = {"devGUID": current_guid, "devMac": "11:22:33:44:55:66"}
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
archived_guid = None
def fake_update(guid, field, value):
nonlocal archived_guid
if field == "devIsArchived":
archived_guid = guid
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.getByMac.return_value = conflicting
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
self.assertEqual(archived_guid, "guid-conflict")
def test_no_conflicting_mac_no_archive(self):
from workflows.actions import UpdateFieldAction
trigger = MagicMock()
trigger.object = None
db = MagicMock()
target_device = {"devGUID": "guid-current", "devMac": "11:22:33:44:55:66"}
action = UpdateFieldAction(db, "devMac", "aa:bb:cc:dd:ee:ff", trigger, target_device)
archived_guid = None
def fake_update(guid, field, value):
nonlocal archived_guid
if field == "devIsArchived":
archived_guid = guid
with patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.getByMac.return_value = None
MockDI.return_value.updateField.side_effect = fake_update
action.execute()
self.assertIsNone(archived_guid)
# ---------------------------------------------------------------------------
# Cascade prevention — _mutated_guids
# ---------------------------------------------------------------------------
class TestCascadePrevention(unittest.TestCase):
def test_mutated_guid_blocks_event(self):
mgr = make_stub_manager()
mgr._mutated_guids.add("dev-guid-42")
event = _make_app_event(guid="evt-1", obj_guid="dev-guid-42")
# Make event dict-accessible
event = MagicMock()
event.__getitem__ = lambda s, k: {"guid": "evt-1", "objectGuid": "dev-guid-42",
"index": 1}[k]
# process_event should skip without calling execute_workflow
with patch.object(mgr, "execute_workflow") as mock_exec:
mgr.process_event(event)
mock_exec.assert_not_called()
def test_get_new_app_events_clears_mutated_guids(self):
mgr = make_stub_manager()
mgr._mutated_guids.add("some-guid")
mgr.db.sql.execute.return_value.fetchall.return_value = []
mgr.get_new_app_events()
self.assertEqual(len(mgr._mutated_guids), 0)
def test_execute_actions_adds_to_mutated_guids(self):
mgr = make_stub_manager()
target_device = {"devGUID": "guid-mutated", "devIsArchived": 0}
actions = [{"type": "update_field", "field": "devIsArchived", "value": "1"}]
trigger = MagicMock()
trigger.object = None
with patch("workflows.manager.DeviceInstance"), \
patch("workflows.actions.DeviceInstance") as MockDI:
MockDI.return_value.updateField = MagicMock()
with patch.object(mgr, "_resolve_target_devices", return_value=[target_device]):
mgr.execute_actions(actions, trigger)
self.assertIn("guid-mutated", mgr._mutated_guids)
if __name__ == "__main__":
unittest.main()

View File

@@ -377,6 +377,28 @@ def down_event_macs(cur) -> set:
return {r["eveMac"].lower() for r in cur.fetchall()}
def insert_device_from_dict(conn: sqlite3.Connection, device: dict) -> None:
"""Insert a device dict (as produced by make_device_dict) into Devices.
Uses INSERT OR IGNORE so duplicate MACs are silently skipped. Accepts any
subset of Devices columns — only keys present in the table are written.
"""
cur = conn.cursor()
cur.execute("PRAGMA table_info(Devices)")
db_columns = {row[1] for row in cur.fetchall()}
cols = [k for k in device.keys() if k in db_columns]
placeholders = ", ".join("?" for _ in cols)
col_list = ", ".join(cols)
values = [device[c] for c in cols]
cur.execute(
f"INSERT OR IGNORE INTO Devices ({col_list}) VALUES ({placeholders})",
values,
)
conn.commit()
# ---------------------------------------------------------------------------
# DummyDB — minimal wrapper used by scan.session_events helpers
# ---------------------------------------------------------------------------