From c829c4cf5a79e88db49fdc4e3a8c10f69bd13158 Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Mon, 25 May 2026 11:32:59 +0000 Subject: [PATCH 1/5] Enhance node name extraction logic to accurately classify file formats for PUSH and PULL modes --- test/plugins/test_sync_protocol.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/test/plugins/test_sync_protocol.py b/test/plugins/test_sync_protocol.py index c765394f..ac2da79c 100644 --- a/test/plugins/test_sync_protocol.py +++ b/test/plugins/test_sync_protocol.py @@ -70,15 +70,17 @@ def _get_data(api_token, node_url): def _node_name_from_filename(file_name: str) -> str: """Mirror of the node-name extraction in sync.main() (Mode 3). - Real file formats produced by the system: - PUSH (post-decode): last_result.PLUGIN.decoded.NodeName.N.log - — split on '.decoded.' marker, strip .N.log with rsplit from the right - PULL: last_result.NodeName.log - — strip 'last_result.' prefix and '.log' suffix + PUSH shape: last_result.PLUGIN.(decoded|encoded).NodeName.N.log + — marker present AND the second-to-last segment (before .log) is a digit + PULL shape: last_result.NodeName.log + — no marker, or marker present but no digit counter + (e.g. node name is 'office.encoded.lab') Both forms handle dots anywhere in PLUGIN or NodeName. """ - if '.decoded.' in file_name or '.encoded.' in file_name: + marker_present = '.decoded.' in file_name or '.encoded.' in file_name + is_push = marker_present and file_name.rsplit('.', 2)[1].isdigit() + if is_push: marker = '.decoded.' if '.decoded.' in file_name else '.encoded.' _, after = file_name.split(marker, 1) return after.rsplit('.', 2)[0] @@ -366,6 +368,12 @@ class TestNodeNameExtraction: "last_result.A.B.decoded.x.y.z.1.log" ) == "x.y.z" + def test_pull_with_encoded_in_node_name(self): + # Regression: PULL file whose node name contains '.encoded.' must NOT + # be mis-classified as a PUSH artifact (no digit counter → PULL branch). + assert _node_name_from_filename("last_result.office.encoded.lab.log") == "office.encoded.lab" + assert _node_name_from_filename("last_result.site.decoded.backup.log") == "site.decoded.backup" + # =========================================================================== # CurrentScan candidates filter (Mode 3 – RECEIVE) From f8f7ac38279389b1dc3550813ca0277af108aa05 Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Tue, 26 May 2026 07:33:23 +0000 Subject: [PATCH 2/5] Implement SYNC_BEHAVIOR setting to control hub device write modes and update related documentation #1652 --- docs/ADVISORY_MULTI_NETWORK.md | 2 +- docs/API_SYNC.md | 24 +++- docs/NOTIFICATIONS.md | 2 - docs/REMOTE_NETWORKS.md | 3 + front/plugins/sync/README.md | 65 +++++++-- front/plugins/sync/config.json | 32 +++++ front/plugins/sync/sync.py | 87 +++++++++--- test/db_test_helpers.py | 60 +++++++-- test/plugins/test_sync_protocol.py | 207 +++++++++++++++++++++++++++++ 9 files changed, 438 insertions(+), 44 deletions(-) diff --git a/docs/ADVISORY_MULTI_NETWORK.md b/docs/ADVISORY_MULTI_NETWORK.md index b56f58ae..ab77ca89 100644 --- a/docs/ADVISORY_MULTI_NETWORK.md +++ b/docs/ADVISORY_MULTI_NETWORK.md @@ -8,7 +8,7 @@ Effective multi-network monitoring starts with understanding how NetAlertX "sees * **B. Plan Subnet & Scan Interfaces:** Explicitly configure each accessible segment in `SCAN_SUBNETS` with the corresponding interfaces. * **C. Remote & Inaccessible Networks:** For networks unreachable via ARP, use these strategies: * **Alternate Plugins:** Supplement discovery with [SNMPDSC](SNMPDSC) or [DHCP lease imports](https://docs.netalertx.com/PLUGINS/?h=DHCPLSS#available-plugins). -* **Centralized Multi-Tenant Management using Sync Nodes:** Run secondary NetAlertX instances on isolated networks and aggregate data using the **SYNC plugin**. +* **Centralized Multi-Tenant Management using Sync Nodes:** Run secondary NetAlertX instances on isolated networks and aggregate data using the **SYNC plugin**. Use the [`SYNC_BEHAVIOR`](../front/plugins/sync/README.md#hub-device-write-behavior-sync_behavior) setting on the hub to control whether the hub inherits device config from nodes or manages it independently. * **Manual Entry:** For static assets where only ICMP (ping) status is needed. > [!TIP] diff --git a/docs/API_SYNC.md b/docs/API_SYNC.md index c7569170..f7b6e4c8 100755 --- a/docs/API_SYNC.md +++ b/docs/API_SYNC.md @@ -1,4 +1,4 @@ -# Sync API Endpoint +# Sync API Endpoint --- @@ -35,7 +35,7 @@ curl 'http://:/sync' \ --- -#### 9.2 POST `/sync` +#### 9.2 POST `/sync` The **POST** endpoint is used by nodes to **send data to the hub**. The hub expects the data as **form-encoded fields** (application/x-www-form-urlencoded or multipart/form-data). The hub then stores the data in the plugin log folder for processing. @@ -91,7 +91,7 @@ curl -X POST 'http://:/sync' \ * The `data` field contains JSON with a **`data` array**, where each element is a **device object** or **plugin data object**. * The `plugin` and `node_name` fields allow the hub to **organize and store the file correctly**. -* The data is only processed if the relevant plugins are enabled and run on the target server. +* The data is only processed if the relevant plugins are enabled and run on the target server. --- @@ -112,7 +112,7 @@ last_result..encoded...log * Both encoded and decoded files are tracked, and new submissions increment the sequence number. * If storing fails, the API returns HTTP 500 with an error message. -* The data is only processed if the relevant plugins are enabled and run on the target server. +* The data is only processed if the relevant plugins are enabled and run on the target server. --- @@ -120,6 +120,20 @@ last_result..encoded...log * **Authorization Required** – Both GET and POST require a valid API token. * **Data Integrity** – Ensure that `node_name` and `plugin` are consistent to avoid overwriting files. -* **Monitoring** – Notifications are generated whenever data is sent or received (`write_notification`), which can be used for alerting or auditing. +* **Monitoring** – An in-app log entry is written via `write_notification` whenever data is sent or received, which can be used for auditing. * **Use Case** – Typically used in multi-node deployments to consolidate device and event data on a central hub. +--- + +#### 9.4 Hub Device-Write Behavior (`SYNC_BEHAVIOR`) + +The `SYNC_BEHAVIOR` setting controls how the hub writes devices received from nodes (Mode 3 — RECEIVE). It only affects the hub. + +| Value | Default? | Writes to Devices | +|---|---|---| +| `copy-new` | ✅ | New MACs only (INSERT OR IGNORE) | +| `carbon-copy` | | All MACs every sync (UPSERT) | +| `hub-defaults` | | None — hub pipeline handles it | + +For full details and per-mode behaviour, see [SYNC plugin README — Hub Device-Write Behavior](../front/plugins/sync/README.md#hub-device-write-behavior-sync_behavior). + diff --git a/docs/NOTIFICATIONS.md b/docs/NOTIFICATIONS.md index eb29b1a2..882381e3 100755 --- a/docs/NOTIFICATIONS.md +++ b/docs/NOTIFICATIONS.md @@ -63,5 +63,3 @@ You can completely ignore detected devices globally. This could be because your 1. Ignored MACs (`NEWDEV_ignored_MACs`) - List of MACs to ignore. 2. Ignored IPs (`NEWDEV_ignored_IPs`) - List of IPs to ignore. - - diff --git a/docs/REMOTE_NETWORKS.md b/docs/REMOTE_NETWORKS.md index 0999b303..c2c20f5f 100755 --- a/docs/REMOTE_NETWORKS.md +++ b/docs/REMOTE_NETWORKS.md @@ -45,6 +45,9 @@ You can use supplementary plugins that employ alternate methods. Protocols used If you have servers in different networks, you can set up separate NetAlertX instances on those subnets and synchronize the results into one instance using the [`SYNC` plugin](https://github.com/netalertx/NetAlertX/tree/main/front/plugins/sync). +> [!TIP] +> The [`SYNC_BEHAVIOR`](../front/plugins/sync/README.md#hub-device-write-behavior-sync_behavior) setting controls how the hub handles newly discovered devices from nodes - whether it inherits node config, overwrites on every sync, or applies its own NEWDEV defaults. + ### Manual Entry If you don't need to discover new devices and only need to report on their status (`online`, `offline`, `down`), you can manually enter devices and check their status using the [`ICMP` plugin](https://github.com/netalertx/NetAlertX/blob/main/front/plugins/icmp_scan/), which uses the `ping` command internally. diff --git a/front/plugins/sync/README.md b/front/plugins/sync/README.md index 65d7ecd4..975986de 100755 --- a/front/plugins/sync/README.md +++ b/front/plugins/sync/README.md @@ -53,6 +53,7 @@ The plugin operates in three different modes based on the configuration settings - **Schedule** `[n,h]`: `SYNC_RUN_SCHD` - **Encryption Key** `[n,h]`: `SYNC_encryption_key` - **Nodes to Pull From** `[h]`: `SYNC_nodes` + `GRAPHQL_PORT` of the source nodes +- **Hub Behavior** `[h]`: `SYNC_BEHAVIOR` — controls how the hub writes devices received from nodes (see [below](#hub-device-write-behavior-sync_behavior)) ### Usage @@ -63,11 +64,59 @@ The plugin operates in three different modes based on the configuration settings ### Notes -- Existing devices on the hub will not be updated by the data received from this SYNC plugin if their MAC addresses are already present. +- How existing and new devices are handled on the hub depends on the `SYNC_BEHAVIOR` setting (see below). - It is recommended to use Device synchronization primarily. Plugin data synchronization is more suitable for specific use cases. ![Sync Hub Setup Diagram](/front/plugins/sync/sync_hub.png) +--- + +### Hub Device-Write Behavior (`SYNC_BEHAVIOR`) + +The `SYNC_BEHAVIOR` setting — configured on the **hub only** — controls how the hub writes devices received from nodes. + +| Value | Default? | Devices written | Source of truth | Recommended when | +|---|---|---|---|---| +| `copy-new` | ✅ | New devices only (INSERT OR IGNORE) | Node (first sync), then Hub | You want the hub to start with the node's existing config and manage devices from there. | +| `carbon-copy` | | All devices on every sync (UPSERT) | Node | The node owns device config end-to-end. **All** hub fields are overwritten on every sync, including LOCKED. Do not customize devices on the hub. | +| `hub-defaults` | | None — hub pipeline handles insertion | Hub | Nodes provide presence data only; all device config is set and maintained on the hub. | + +#### `copy-new` (default) + +New devices are inserted using all available column values from the node's existing record (name, alert settings, vendor, etc.). If the device already exists on the hub, the INSERT is silently skipped. + +Subsequent syncs update only empty/unknown fields on the hub (e.g., if the hub's `devName` is `(unknown)` and the node now has a resolved name, it propagates). Fields customized by a user on the hub (fileds with source set to `USER` or `LOCKED`) are never overwritten. + +``` +First sync: INSERT with node's full config +Next syncs: empty fields updated only (name, vendor) via scan pipeline +User edits: protected — never overwritten +``` + +#### `carbon-copy` + +All received devices are upserted on every sync. The node is treated as fully authoritative: its values overwrite **all** hub fields on every sync cycle, including fields with `USER` or `LOCKED` source. + +> ⚠️ Do not customize devices on the hub when using `carbon-copy`. Any hub-side changes will be overwritten on the next sync. + +``` +First sync: UPSERT with node config +Next syncs: UPSERT — node values win (all fields, no exceptions) +User edits: overwritten on next sync +``` + +#### `hub-defaults` + +**The hub is the source of truth.** Nodes contribute only presence data (MAC, IP, vendor from scans). All device configuration — name, alerts, notes, group — should be set on the hub. Node-side values for those fields are ignored. + +Use this mode when you want the hub to behave as a fully independent instance — it receives presence data from nodes but manages its own device configuration. + +``` +First sync: NEWDEV defaults applied +Next syncs: empty fields updated only via scan pipeline +User edits: set and maintained on the hub — never overwritten +``` + ### Example use case: Network Setup with Multiple VLANs and VM Scanning > Thank you to [@richtj999](https://github.com/richtj999) for the use case 🙏 @@ -76,7 +125,7 @@ I have 6 VLANs, all isolated by a firewall, except for one VLAN that has access Initially, I had one virtual machine (VM) with 6 network cards, one for each VLAN. While this setup worked, it introduced delays due to other concurrent scans. To optimize this, I switched to a multi-VM setup: -- I created 6 VMs, each attached to a single VLAN. +- I created 6 VMs, each attached to a single VLAN. - One VM acts as the "server," and the other 5 as "clients." - The server has access to all VLANs (via firewall rules) and collects data from the client VMs, which each scan their own VLAN. @@ -87,22 +136,22 @@ Initially, I had one virtual machine (VM) with 6 network cards, one for each VLA #### Example Setup -- **VM1 ("Server")**: Network 1 (can access all networks) - IP: `10.10.10.106` +- **VM1 ("Server")**: Network 1 (can access all networks) - IP: `10.10.10.106` Receives data from all NetAlertX clients and scans network 1. -- **VM2 ("Client")**: Network 2 (can access only network 2) - IP: `192.168.x.x` +- **VM2 ("Client")**: Network 2 (can access only network 2) - IP: `192.168.x.x` Scans network 2; VM1 retrieves this data. -- **VM3 ("Client")**: Network 3 (can access only network 3) - IP: `192.168.x.x` +- **VM3 ("Client")**: Network 3 (can access only network 3) - IP: `192.168.x.x` Scans network 3; VM1 retrieves this data. -- **VM4 ("Client")**: Network 4 (can access only network 4) - IP: `192.168.x.x` +- **VM4 ("Client")**: Network 4 (can access only network 4) - IP: `192.168.x.x` Scans network 4; VM1 retrieves this data. -- **VM5 ("Client")**: Network 5 (can access only network 5) - IP: `192.168.x.x` +- **VM5 ("Client")**: Network 5 (can access only network 5) - IP: `192.168.x.x` Scans network 5; VM1 retrieves this data. -- **VM6 ("Client")**: Network 6 (can access only network 6) - IP: `192.168.x.x` +- **VM6 ("Client")**: Network 6 (can access only network 6) - IP: `192.168.x.x` Scans network 6; VM1 retrieves this data. --- diff --git a/front/plugins/sync/config.json b/front/plugins/sync/config.json index e364c339..b5db98ea 100755 --- a/front/plugins/sync/config.json +++ b/front/plugins/sync/config.json @@ -597,6 +597,38 @@ } ] }, + { + "function": "BEHAVIOR", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "copy-new", + "options": [ + "copy-new", + "carbon-copy", + "hub-defaults" + ], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Hub behavior [h]" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Controls how the hub handles devices received from nodes.

copy-new (default): New devices are inserted using the node's existing configuration (name, alert settings, etc.). Subsequent node-side changes only update empty fields on the hub. Recommended: customize devices on the hub after first discovery.

carbon-copy: All devices are upserted on every sync — the node is fully authoritative and its values overwrite all hub fields on every sync, including fields with USER or LOCKED source. Do not customize devices on the hub.

hub-defaults: The hub skips the direct INSERT and creates new devices through its own pipeline, applying NEWDEV defaults. The hub is the source of truth; nodes contribute presence data only. Recommended: manage all device settings on the hub." + } + ] + }, { "function": "SET_ALWAYS", "type": { diff --git a/front/plugins/sync/sync.py b/front/plugins/sync/sync.py index 459dad47..20cf11df 100755 --- a/front/plugins/sync/sync.py +++ b/front/plugins/sync/sync.py @@ -257,7 +257,7 @@ def main(): cursor.execute("PRAGMA table_info(Devices)") db_columns = {row[1] for row in cursor.fetchall()} - # Filter out existing devices + # Filter new devices (MACs not yet known on hub). new_devices = [ device for device in device_data if device['devMac'].lower() not in existing_mac_addresses @@ -266,29 +266,80 @@ def main(): mylog('verbose', [f'[{pluginName}] All devices: "{len(device_data)}"']) mylog('verbose', [f'[{pluginName}] New devices: "{len(new_devices)}"']) - # Prepare the insert statement - if new_devices: + # Determine which devices to write and how, based on SYNC_BEHAVIOR. + # + # copy-new (default) — INSERT new devices only, using node config. + # Subsequent node changes only update empty hub fields. + # + # carbon-copy — UPSERT all devices every sync. + # Node is authoritative; overwrites hub values except + # USER/LOCKED-sourced fields (enforced by the + # update_devices_data_from_scan pipeline, not here). + # + # hub-defaults — Skip direct INSERT entirely. + # Hub creates new devices via create_new_devices() + # with its own NEWDEV defaults. + # + # For copy-new/carbon-copy we insert them here (before the Devices INSERT + # would pre-seed the table and block create_new_devices()). + # For hub-defaults, create_new_devices() handles it naturally. - # Only keep keys that are real columns in the target DB; computed - # or unknown fields are silently dropped regardless of source schema. - insert_cols = [k for k in new_devices[0].keys() if k in db_columns] - columns = ', '.join(insert_cols) - placeholders = ', '.join('?' for _ in insert_cols) - sql = f'INSERT INTO Devices ({columns}) VALUES ({placeholders})' + sync_behavior = get_setting_value('SYNC_BEHAVIOR') or 'copy-new' + mylog('verbose', [f'[{pluginName}] SYNC_BEHAVIOR: "{sync_behavior}"']) - # Extract only the whitelisted column values for each device - values = [tuple(device.get(col) for col in insert_cols) for device in new_devices] + if sync_behavior == 'hub-defaults': + mylog('verbose', [f'[{pluginName}] hub-defaults: skipping direct Devices write; hub pipeline handles new devices and events']) - mylog('verbose', [f'[{pluginName}] Inserting Devices SQL : "{sql}"']) - mylog('verbose', [f'[{pluginName}] Inserting Devices VALUES: "{values}"']) + else: + # Fire "New Device" events for genuinely new MACs before the Devices + # INSERT pre-seeds the table (which would block create_new_devices()). + if new_devices: + now = timeNowUTC() + cursor.executemany( + """INSERT OR IGNORE INTO Events + (eveMac, eveIp, eveDateTime, eveEventType, eveAdditionalInfo, evePendingAlertEmail) + VALUES (?, ?, ?, 'New Device', ?, 1)""", + [(d['devMac'], d.get('devLastIP', ''), now, d.get('devVendor', '')) + for d in new_devices] + ) + mylog('verbose', [f'[{pluginName}] Queued "New Device" events for {len(new_devices)} device(s)']) - # Use executemany for batch insertion - cursor.executemany(sql, values) + devices_to_write = new_devices if sync_behavior == 'copy-new' else device_data - message = f'[{pluginName}] Inserted "{len(new_devices)}" new devices' + if devices_to_write: + # Only keep keys that are real DB columns; computed or unknown + # fields are silently dropped regardless of the source schema. + insert_cols = [k for k in devices_to_write[0].keys() if k in db_columns] + columns = ', '.join(insert_cols) + placeholders = ', '.join('?' for _ in insert_cols) - mylog('verbose', [message]) - write_notification(message, 'info', timeNowUTC()) + if sync_behavior == 'carbon-copy': + # UPSERT: on MAC conflict update all columns except devMac. + # devMac is the PRIMARY KEY so it is excluded from the SET clause. + # NOTE: this raw SQL bypasses can_overwrite_field() — ALL fields + # including USER/LOCKED-sourced ones are overwritten. Node is fully + # authoritative in this mode. + update_cols = [col for col in insert_cols if col != 'devMac'] + update_clause = ', '.join(f'{col}=excluded.{col}' for col in update_cols) + sql = ( + f'INSERT INTO Devices ({columns}) VALUES ({placeholders}) ' + f'ON CONFLICT(devMac) DO UPDATE SET {update_clause}' + ) + else: + # copy-new: skip silently if MAC already exists (race-condition safety). + sql = f'INSERT OR IGNORE INTO Devices ({columns}) VALUES ({placeholders})' + + values = [tuple(device.get(col) for col in insert_cols) for device in devices_to_write] + + mylog('verbose', [f'[{pluginName}] Devices SQL : "{sql}"']) + mylog('verbose', [f'[{pluginName}] Devices VALUES: "{values}"']) + + cursor.executemany(sql, values) + + write_count = len(new_devices) if sync_behavior == 'copy-new' else len(devices_to_write) + message = f'[{pluginName}] {sync_behavior}: wrote "{write_count}" device(s) to Devices' + mylog('verbose', [message]) + write_notification(message, 'info', timeNowUTC()) # Commit and close the connection conn.commit() diff --git a/test/db_test_helpers.py b/test/db_test_helpers.py index f92bb308..7af01c4a 100644 --- a/test/db_test_helpers.py +++ b/test/db_test_helpers.py @@ -283,43 +283,83 @@ def sync_insert_devices( conn: sqlite3.Connection, device_data: list, existing_macs: set | None = None, + behavior: str = "copy-new", ) -> int: """ - Schema-aware device INSERT mirroring sync.py's Mode-3 insert block. + Schema-aware device write mirroring sync.py's Mode-3 SYNC_BEHAVIOR block. Parameters ---------- conn: - In-memory (or real) SQLite connection with a Devices table. + In-memory (or real) SQLite connection with Devices and Events tables. device_data: List of device dicts as received from table_devices.json or a node log. existing_macs: - Set of MAC addresses already present in Devices. Rows whose devMac is - in this set are skipped. Pass ``None`` (default) to insert everything. + Set of MAC addresses already present in Devices. Used to compute + genuinely new MACs for the Events INSERT and (for ``copy-new``) to + filter write candidates. Pass ``None`` to treat every device as new. + behavior: + One of ``"copy-new"`` (default), ``"carbon-copy"``, or + ``"hub-defaults"``. - Returns the number of rows actually inserted. + ``copy-new`` — INSERT OR IGNORE for new MACs only (current default). + ``carbon-copy`` — UPSERT (INSERT … ON CONFLICT DO UPDATE) for all MACs. + ``hub-defaults``— skip write entirely; hub pipeline handles new devices + and their Events rows. + + Returns the number of device rows written (0 for ``hub-defaults``). + Side-effect: inserts an Events row with eveEventType='New Device' for each + genuinely new MAC when behavior is ``copy-new`` or ``carbon-copy``. """ - if not device_data: + if not device_data or behavior == "hub-defaults": return 0 cursor = conn.cursor() - candidates = ( + # Genuinely new MACs — drives the Events INSERT for both non-hub-defaults modes. + new_devices = ( [d for d in device_data if d["devMac"] not in existing_macs] if existing_macs is not None else list(device_data) ) + # Fire "New Device" events before the Devices INSERT pre-seeds the table. + if new_devices: + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + cursor.executemany( + """INSERT OR IGNORE INTO Events + (eveMac, eveIp, eveDateTime, eveEventType, eveAdditionalInfo, evePendingAlertEmail) + VALUES (?, ?, ?, 'New Device', ?, 1)""", + [(d["devMac"], d.get("devLastIP", ""), now, d.get("devVendor", "")) + for d in new_devices], + ) + + if behavior == "copy-new": + candidates = new_devices + else: # carbon-copy — process all devices + candidates = list(device_data) + if not candidates: + conn.commit() return 0 cursor.execute("PRAGMA table_info(Devices)") db_columns = {row[1] for row in cursor.fetchall()} - insert_cols = [k for k in candidates[0].keys() if k in db_columns] - columns = ", ".join(insert_cols) + insert_cols = [k for k in candidates[0].keys() if k in db_columns] + columns = ", ".join(insert_cols) placeholders = ", ".join("?" for _ in insert_cols) - sql = f"INSERT INTO Devices ({columns}) VALUES ({placeholders})" + + if behavior == "carbon-copy": + update_cols = [col for col in insert_cols if col != "devMac"] + update_clause = ", ".join(f"{col}=excluded.{col}" for col in update_cols) + sql = ( + f"INSERT INTO Devices ({columns}) VALUES ({placeholders}) " + f"ON CONFLICT(devMac) DO UPDATE SET {update_clause}" + ) + else: + sql = f"INSERT OR IGNORE INTO Devices ({columns}) VALUES ({placeholders})" + values = [tuple(d.get(col) for col in insert_cols) for d in candidates] cursor.executemany(sql, values) conn.commit() diff --git a/test/plugins/test_sync_protocol.py b/test/plugins/test_sync_protocol.py index ac2da79c..1ed9a583 100644 --- a/test/plugins/test_sync_protocol.py +++ b/test/plugins/test_sync_protocol.py @@ -569,3 +569,210 @@ class TestMode3JsonSkip: f.write_text("") with pytest.raises(json.JSONDecodeError): _parse_sync_payload(str(f)) + + +# =========================================================================== +# SYNC_BEHAVIOR — three hub device-write modes (Mode 3 – RECEIVE) +# =========================================================================== + +class TestSyncBehavior: + """Covers the three SYNC_BEHAVIOR modes for hub-side device writes. + + copy-new (default) — INSERT new MACs only, skip existing. + carbon-copy — UPSERT all MACs; node values overwrite hub values. + hub-defaults — skip direct write; let hub pipeline handle it. + """ + + # ------------------------------------------------------------------ + # copy-new (default – backward compatible) + # ------------------------------------------------------------------ + + def test_copy_new_inserts_new_device(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + written = sync_insert_devices(conn, [device], existing_macs=set(), behavior="copy-new") + assert written == 1 + cur = conn.cursor() + cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone() is not None + + def test_copy_new_skips_existing_device(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:01", "Original")) + conn.commit() + + device = make_device_dict(mac="aa:bb:cc:dd:ee:01", devName="Updated") + written = sync_insert_devices(conn, [device], existing_macs={"aa:bb:cc:dd:ee:01"}, behavior="copy-new") + assert written == 0 + cur.execute("SELECT devName FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["devName"] == "Original" + + def test_copy_new_only_new_in_mixed_batch(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:existing", "Existing")) + conn.commit() + + devices = [ + make_device_dict(mac="aa:bb:cc:dd:ee:existing"), + make_device_dict(mac="aa:bb:cc:dd:ee:new1"), + make_device_dict(mac="aa:bb:cc:dd:ee:new2"), + ] + written = sync_insert_devices(conn, devices, existing_macs={"aa:bb:cc:dd:ee:existing"}, behavior="copy-new") + assert written == 2 + + # ------------------------------------------------------------------ + # carbon-copy — UPSERT, node is authoritative + # ------------------------------------------------------------------ + + def test_carbon_copy_inserts_new_device(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + written = sync_insert_devices(conn, [device], behavior="carbon-copy") + assert written == 1 + cur = conn.cursor() + cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone() is not None + + def test_carbon_copy_overwrites_existing_device(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:01", "OldName")) + conn.commit() + + device = make_device_dict(mac="aa:bb:cc:dd:ee:01", devName="NewName") + written = sync_insert_devices(conn, [device], behavior="carbon-copy") + assert written == 1 + cur.execute("SELECT devName FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["devName"] == "NewName" + + def test_carbon_copy_processes_all_devices_in_batch(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:01", "OldName")) + conn.commit() + + devices = [ + make_device_dict(mac="aa:bb:cc:dd:ee:01", devName="UpdatedName"), + make_device_dict(mac="aa:bb:cc:dd:ee:02"), + ] + written = sync_insert_devices(conn, devices, behavior="carbon-copy") + assert written == 2 + + cur.execute("SELECT devName FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["devName"] == "UpdatedName" + + def test_carbon_copy_does_not_duplicate_existing_device(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:01", "Original")) + conn.commit() + + device = make_device_dict(mac="aa:bb:cc:dd:ee:01", devName="Updated") + sync_insert_devices(conn, [device], behavior="carbon-copy") + + cur.execute("SELECT COUNT(*) AS cnt FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["cnt"] == 1 + + # ------------------------------------------------------------------ + # hub-defaults — no direct write, hub pipeline handles it + # ------------------------------------------------------------------ + + def test_hub_defaults_writes_nothing(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + written = sync_insert_devices(conn, [device], behavior="hub-defaults") + assert written == 0 + + def test_hub_defaults_leaves_db_empty(self, conn): + devices = [make_device_dict(mac=f"aa:bb:cc:dd:ee:0{i}") for i in range(3)] + sync_insert_devices(conn, devices, behavior="hub-defaults") + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS cnt FROM Devices") + assert cur.fetchone()["cnt"] == 0 + + def test_hub_defaults_returns_zero_for_empty_input(self, conn): + assert sync_insert_devices(conn, [], behavior="hub-defaults") == 0 + + # ------------------------------------------------------------------ + # "New Device" events — copy-new and carbon-copy must fire; hub-defaults must not + # ------------------------------------------------------------------ + + def test_copy_new_fires_new_device_event(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + sync_insert_devices(conn, [device], existing_macs=set(), behavior="copy-new") + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device' AND eveMac=?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["cnt"] == 1 + + def test_copy_new_does_not_fire_event_for_existing_device(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac) VALUES (?)", ("aa:bb:cc:dd:ee:01",)) + conn.commit() + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + sync_insert_devices(conn, [device], existing_macs={"aa:bb:cc:dd:ee:01"}, behavior="copy-new") + cur.execute("SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device'") + assert cur.fetchone()["cnt"] == 0 + + def test_carbon_copy_fires_new_device_event_for_new_mac(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01") + sync_insert_devices(conn, [device], existing_macs=set(), behavior="carbon-copy") + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device' AND eveMac=?", ("aa:bb:cc:dd:ee:01",)) + assert cur.fetchone()["cnt"] == 1 + + def test_carbon_copy_does_not_fire_event_for_existing_mac(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac) VALUES (?)", ("aa:bb:cc:dd:ee:01",)) + conn.commit() + device = make_device_dict(mac="aa:bb:cc:dd:ee:01", devName="Updated") + sync_insert_devices(conn, [device], existing_macs={"aa:bb:cc:dd:ee:01"}, behavior="carbon-copy") + cur.execute("SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device'") + assert cur.fetchone()["cnt"] == 0 + + def test_hub_defaults_fires_no_events_directly(self, conn): + devices = [make_device_dict(mac=f"aa:bb:cc:dd:ee:0{i}") for i in range(3)] + sync_insert_devices(conn, devices, existing_macs=set(), behavior="hub-defaults") + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device'") + assert cur.fetchone()["cnt"] == 0 + + def test_copy_new_fires_events_only_for_new_macs_in_mixed_batch(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac) VALUES (?)", ("aa:bb:cc:dd:ee:existing",)) + conn.commit() + + devices = [ + make_device_dict(mac="aa:bb:cc:dd:ee:existing"), + make_device_dict(mac="aa:bb:cc:dd:ee:new1"), + make_device_dict(mac="aa:bb:cc:dd:ee:new2"), + ] + sync_insert_devices(conn, devices, existing_macs={"aa:bb:cc:dd:ee:existing"}, behavior="copy-new") + + cur.execute("SELECT eveMac FROM Events WHERE eveEventType='New Device'") + event_macs = {r["eveMac"] for r in cur.fetchall()} + assert event_macs == {"aa:bb:cc:dd:ee:new1", "aa:bb:cc:dd:ee:new2"} + + def test_carbon_copy_fires_events_only_for_new_macs_in_mixed_batch(self, conn): + cur = conn.cursor() + cur.execute("INSERT INTO Devices (devMac, devName) VALUES (?, ?)", ("aa:bb:cc:dd:ee:existing", "Old")) + conn.commit() + + devices = [ + make_device_dict(mac="aa:bb:cc:dd:ee:existing", devName="Updated"), + make_device_dict(mac="aa:bb:cc:dd:ee:new1"), + ] + sync_insert_devices(conn, devices, existing_macs={"aa:bb:cc:dd:ee:existing"}, behavior="carbon-copy") + + cur.execute("SELECT eveMac FROM Events WHERE eveEventType='New Device'") + event_macs = {r["eveMac"] for r in cur.fetchall()} + assert event_macs == {"aa:bb:cc:dd:ee:new1"} + + def test_new_device_event_fields_are_correct(self, conn): + device = make_device_dict(mac="aa:bb:cc:dd:ee:01", devLastIP="10.0.0.1", devVendor="Acme") + sync_insert_devices(conn, [device], existing_macs=set(), behavior="copy-new") + cur = conn.cursor() + cur.execute( + "SELECT * FROM Events WHERE eveEventType='New Device' AND eveMac=?", + ("aa:bb:cc:dd:ee:01",), + ) + row = cur.fetchone() + assert row is not None + assert row["eveMac"] == "aa:bb:cc:dd:ee:01" + assert row["eveIp"] == "10.0.0.1" + assert row["eveAdditionalInfo"] == "Acme" + assert row["evePendingAlertEmail"] == 1 + assert cur.fetchone()["cnt"] == 0 From 5ea999fd57604a8bd1b024ffd51e0da6073ec498 Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Tue, 26 May 2026 07:51:09 +0000 Subject: [PATCH 3/5] Fix typos in README and test files related to SYNC_BEHAVIOR documentation --- front/plugins/sync/README.md | 2 +- front/plugins/sync/sync.py | 9 ++++++--- test/plugins/test_sync_protocol.py | 9 +++++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/front/plugins/sync/README.md b/front/plugins/sync/README.md index 975986de..fa465a70 100755 --- a/front/plugins/sync/README.md +++ b/front/plugins/sync/README.md @@ -85,7 +85,7 @@ The `SYNC_BEHAVIOR` setting — configured on the **hub only** — controls how New devices are inserted using all available column values from the node's existing record (name, alert settings, vendor, etc.). If the device already exists on the hub, the INSERT is silently skipped. -Subsequent syncs update only empty/unknown fields on the hub (e.g., if the hub's `devName` is `(unknown)` and the node now has a resolved name, it propagates). Fields customized by a user on the hub (fileds with source set to `USER` or `LOCKED`) are never overwritten. +Subsequent syncs update only empty/unknown fields on the hub (e.g., if the hub's `devName` is `(unknown)` and the node now has a resolved name, it propagates). Fields customized by a user on the hub (fields with source set to `USER` or `LOCKED`) are never overwritten. ``` First sync: INSERT with node's full config diff --git a/front/plugins/sync/sync.py b/front/plugins/sync/sync.py index 20cf11df..4973cfd1 100755 --- a/front/plugins/sync/sync.py +++ b/front/plugins/sync/sync.py @@ -272,9 +272,12 @@ def main(): # Subsequent node changes only update empty hub fields. # # carbon-copy — UPSERT all devices every sync. - # Node is authoritative; overwrites hub values except - # USER/LOCKED-sourced fields (enforced by the - # update_devices_data_from_scan pipeline, not here). + # Node is fully authoritative; raw SQL bypasses + # can_overwrite_field(), so ALL hub fields are + # overwritten on every sync, including USER/LOCKED- + # sourced fields. (update_devices_data_from_scan + # respects field locks but is not invoked here; + # see README "carbon-copy" for the contract.) # # hub-defaults — Skip direct INSERT entirely. # Hub creates new devices via create_new_devices() diff --git a/test/plugins/test_sync_protocol.py b/test/plugins/test_sync_protocol.py index 1ed9a583..4b5e443e 100644 --- a/test/plugins/test_sync_protocol.py +++ b/test/plugins/test_sync_protocol.py @@ -572,7 +572,7 @@ class TestMode3JsonSkip: # =========================================================================== -# SYNC_BEHAVIOR — three hub device-write modes (Mode 3 – RECEIVE) +# SYNC_BEHAVIOR - three hub device-write modes (Mode 3 - RECEIVE) # =========================================================================== class TestSyncBehavior: @@ -775,4 +775,9 @@ class TestSyncBehavior: assert row["eveIp"] == "10.0.0.1" assert row["eveAdditionalInfo"] == "Acme" assert row["evePendingAlertEmail"] == 1 - assert cur.fetchone()["cnt"] == 0 + # Confirm exactly one event was inserted (no duplicates). + cur.execute( + "SELECT COUNT(*) AS cnt FROM Events WHERE eveEventType='New Device' AND eveMac=?", + ("aa:bb:cc:dd:ee:01",), + ) + assert cur.fetchone()["cnt"] == 1 From cfea84806472af363c7bb04b4f830766aa5a5c4a Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Tue, 26 May 2026 10:40:53 +0000 Subject: [PATCH 4/5] Add validation for node response in main function to skip invalid nodes #1657 --- front/plugins/sync/sync.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/front/plugins/sync/sync.py b/front/plugins/sync/sync.py index 4973cfd1..757e775b 100755 --- a/front/plugins/sync/sync.py +++ b/front/plugins/sync/sync.py @@ -130,6 +130,10 @@ def main(): for node_url in pull_nodes: response_json = get_data(api_token, node_url) + if not isinstance(response_json, dict): + mylog('none', [f'[{pluginName}] Skipping node "{node_url}" due to failed or invalid response']) + continue + # Extract node_name and base64 data node_name = response_json.get('node_name', 'unknown_node') data_base64 = response_json.get('data_base64', '') From d4fe94fb7a1de6e1e91ad767fd62603a07ec784f Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Tue, 26 May 2026 11:30:53 +0000 Subject: [PATCH 5/5] Add error handling for base64 decoding in main function to skip invalid data --- front/plugins/sync/sync.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/front/plugins/sync/sync.py b/front/plugins/sync/sync.py index 757e775b..dc464d52 100755 --- a/front/plugins/sync/sync.py +++ b/front/plugins/sync/sync.py @@ -5,6 +5,7 @@ import sys import requests import json import base64 +import binascii # Define the installation path and extend the system path for plugin imports @@ -139,7 +140,11 @@ def main(): data_base64 = response_json.get('data_base64', '') # Decode base64 data - decoded_data = base64.b64decode(data_base64) + try: + decoded_data = base64.b64decode(data_base64) + except (binascii.Error, ValueError, TypeError) as e: + mylog('none', [f'[{pluginName}] Skipping node "{node_name}": base64 decode failed for data_base64="{data_base64}": {e}']) + continue # Create log file name using node name log_file_name = f'{file_prefix}.{node_name}.log'