From 197e3a3cb6fb488e6afe8be76aea1dac203b6a18 Mon Sep 17 00:00:00 2001 From: Nathan Jacobson <22107+natecj@users.noreply.github.com> Date: Wed, 20 May 2026 16:27:25 -0400 Subject: [PATCH 1/6] feat: add adguard_export plugin and tests Adds a new NetAlertX plugin that syncs known devices from the NetAlertX database to AdGuard Home as persistent clients, keeping names, MACs, IP addresses, and device-type tags in sync. Also fixes the adguard_import config.json description placeholder and a minor indentation inconsistency in that file. Co-Authored-By: Claude Sonnet 4.6 --- front/plugins/adguard_export/README.md | 140 ++++++ front/plugins/adguard_export/config.json | 534 +++++++++++++++++++++++ front/plugins/adguard_export/script.py | 499 +++++++++++++++++++++ front/plugins/adguard_import/config.json | 4 +- test/plugins/test_adguard_export.py | 401 +++++++++++++++++ 5 files changed, 1576 insertions(+), 2 deletions(-) create mode 100644 front/plugins/adguard_export/README.md create mode 100644 front/plugins/adguard_export/config.json create mode 100644 front/plugins/adguard_export/script.py create mode 100644 test/plugins/test_adguard_export.py diff --git a/front/plugins/adguard_export/README.md b/front/plugins/adguard_export/README.md new file mode 100644 index 00000000..e0cdc948 --- /dev/null +++ b/front/plugins/adguard_export/README.md @@ -0,0 +1,140 @@ +# adguard_export — NetAlertX Plugin + +> **Direction:** NetAlertX → AdGuard Home +> Syncs known devices from the NetAlertX database to AdGuard Home as **persistent clients**, keeping device names, MAC addresses, and IP identifiers in sync. + +--- + +## What it does + +On every run the plugin: + +1. Reads all (or only *known*) devices from the NetAlertX database. +2. Fetches the current list of persistent clients from AdGuard Home via its REST API. +3. **Adds** clients that are in NetAlertX but not yet in AdGuard Home. +4. **Updates** clients whose name, identifiers, or device-type tag have changed. +5. Optionally **deletes** clients that have been removed from NetAlertX (see `DELETE` setting). + +Device types set in NetAlertX (e.g. `Smartphone`, `Laptop`, `NAS`) are automatically mapped to the corresponding AdGuard Home `device_*` tags (e.g. `device_phone`, `device_laptop`, `device_nas`). + +--- + +## Requirements + +| Requirement | Notes | +|---|---| +| AdGuard Home | v0.107+ (REST API must be enabled) | +| Python packages | `requests`, `pytz` — already present in the NetAlertX container | +| AdGuard credentials | A user account with permission to manage clients | + +--- + +## Installation + +1. Copy the `adguard_export/` folder into `/app/front/plugins/` inside your NetAlertX container (or mount it as a volume). +2. Restart NetAlertX so the plugin is discovered. +3. Open **Settings → Plugins → AdGuard (Device Export)** and configure the settings below. + +--- + +## Settings + +| Setting key | Default | Description | +|---|---|---| +| `ADGUARDEXP_RUN` | `disabled` | When to run: `disabled`, `once`, or `schedule` | +| `ADGUARDEXP_RUN_SCHD` | `0 * * * *` | Cron schedule (default: hourly) | +| `ADGUARDEXP_URL` | `http://192.168.11.1:3000` | Base URL of AdGuard Home web UI | +| `ADGUARDEXP_USER` | `admin` | AdGuard Home username | +| `ADGUARDEXP_PASSWORD` | *(empty)* | AdGuard Home password | +| `ADGUARDEXP_VERIFYSSL` | `true` | Verify TLS cert; set `false` for self-signed certs | +| `ADGUARDEXP_INCLUDE_OFFLINE` | `true` | When `true`, devices not seen in the last scan are still exported | +| `ADGUARDEXP_INCLUDE_NEW` | `false` | When `false`, devices flagged as new/unknown are excluded until identified | +| `ADGUARDEXP_USEMAC` | `true` | Use MAC address as primary client identifier; falls back to IP | +| `ADGUARDEXP_DELETE` | `false` | ⚠ Delete AdGuard clients no longer present in NetAlertX | + +--- + +## AdGuard Home client identifiers + +AdGuard Home identifies a client by one or more **ids**, which can be: + +- A MAC address (e.g. `aa:bb:cc:dd:ee:ff`) +- An IP address (e.g. `192.168.1.42`) +- A CIDR range +- A ClientID string + +When `ADGUARDEXP_USEMAC=true`, the plugin prefers the device's MAC address and includes the last known IP as a secondary identifier. When `ADGUARDEXP_USEMAC=false`, only the IP address is used. + +--- + +## Device type tags + +The plugin maps NetAlertX device types to valid AdGuard Home `device_*` tags automatically: + +| NetAlertX type | AdGuard tag | +|---|---| +| Smartphone, Phone, Mobile | `device_phone` | +| Laptop, Notebook | `device_laptop` | +| Desktop, Server, Hypervisor | `device_pc` | +| Tablet | `device_tablet` | +| Smart TV, SmartTV, TV | `device_tv` | +| NAS | `device_nas` | +| Printer | `device_printer` | +| IP Camera, Camera | `device_camera` | +| Game Console | `device_gameconsole` | +| Speaker, Assistant, Virtual Assistance | `device_audio` | +| AP, Gateway, Router, House Appliance | `device_other` | + +Devices with an unrecognised or empty type are exported without a tag. + +--- + +## Safe deletion + +When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously created — it will never delete clients you added manually in AdGuard Home. Ownership is tracked in a local state file at: + +``` +/app/db/state.ADGUARDEXP.json +``` + +--- + +## Logs + +Plugin logs are written to: + +``` +/tmp/log/plugins/script.ADGUARDEXP.log +``` + +Result rows (used by the NetAlertX UI) are written to: + +``` +/tmp/log/plugins/last_result.ADGUARDEXP.log +``` + +--- + +## Troubleshooting + +| Symptom | Likely cause | +|---|---| +| `Connection failed` in logs | Wrong `ADGUARDEXP_URL` or AdGuard Home is unreachable from the NetAlertX container | +| `HTTP error: 401` | Wrong username / password | +| `HTTP error: 400` | Client already exists with conflicting ids — check AdGuard Home for duplicate entries | +| Devices not appearing | `ADGUARDEXP_INCLUDE_NEW=false` and devices are flagged as new/unknown; identify them in NetAlertX first | +| SSL errors | Set `ADGUARDEXP_VERIFYSSL=false` for self-signed certificates | + +--- + +## Related plugins + +- **adguard_import** — the reverse direction: imports devices *from* AdGuard Home *into* NetAlertX. + +--- + +## Notes + +- Version: 1.0.0 +- Author: [natecj](https://github.com/natecj) +- Release Date: 2026-05-10 diff --git a/front/plugins/adguard_export/config.json b/front/plugins/adguard_export/config.json new file mode 100644 index 00000000..c6da9027 --- /dev/null +++ b/front/plugins/adguard_export/config.json @@ -0,0 +1,534 @@ +{ + "code_name": "adguard_export", + "unique_prefix": "ADGUARDEXP", + "plugin_type": "other", + "execution_order": "Layer_0", + "enabled": true, + "data_source": "script", + "localized": ["display_name", "description", "icon"], + "display_name": [ + { + "language_code": "en_us", + "string": "AdGuard (Device Export)" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Exports known devices from NetAlertX to AdGuard as persistent clients, keeping names and IP/MAC identifiers in sync." + } + ], + "icon": [ + { + "language_code": "en_us", + "string": "" + } + ], + "timeout": 120, + "params": [], + "settings": [ + { + "function": "RUN", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "disabled", + "options": ["disabled", "once", "schedule"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "When to run" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Enable or schedule the export. Use 'schedule' together with the RUN_SCHD setting." + } + ] + }, + { + "function": "RUN_SCHD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "0 * * * *", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Schedule" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Cron expression for how often to run. Default: every hour on the hour." + } + ] + }, + { + "function": "CMD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "python3 /app/front/plugins/adguard_export/script.py", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Command" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Command to execute. Do not change unless you know what you are doing." + } + ] + }, + { + "function": "URL", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "http://localhost:3000", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home URL" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Base URL of your AdGuard Home web interface, e.g. http://192.168.1.1:3000" + } + ] + }, + { + "function": "USER", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "admin", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home Username" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Username for AdGuard Home basic authentication." + } + ] + }, + { + "function": "PASSWORD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [{ "type": "password" }], + "transformers": [] + } + ] + }, + "maxLength": 200, + "default_value": "", + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home Password" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Password for AdGuard Home basic authentication." + } + ] + }, + { + "function": "VERIFYSSL", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Verify SSL Certificate" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Set to 'false' to skip TLS certificate verification (useful for self-signed certs)." + } + ] + }, + { + "function": "INCLUDE_OFFLINE", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Include offline devices" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', devices not seen in the last scan are still exported to AdGuard Home." + } + ] + }, + { + "function": "INCLUDE_NEW", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "false", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Include new/unknown devices" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', devices flagged as new/unknown are also exported. When 'false', only identified devices are exported." + } + ] + }, + { + "function": "USEMAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Use MAC address as identifier" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', MAC addresses are used as the primary client identifier in AdGuard Home. Falls back to IP when no MAC is available." + } + ] + }, + { + "function": "DELETE", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "false", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Delete clients missing from NetAlertX" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Caution: When 'true', AdGuard Home clients that were previously exported by this plugin but are no longer present in NetAlertX will be deleted. Ownership is tracked in a local state file — manually added AdGuard clients are never deleted." + } + ] + } + ], + "database_column_definitions": [ + { + "column": "Object_PrimaryID", + "mapped_to_column": "cur_MAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "filter_info": { + "type": "MAC", + "name": "MAC", + "source": "table" + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "MAC / ID" + } + ] + }, + { + "column": "Object_SecondaryID", + "mapped_to_column": "cur_IP", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "IP Address" + } + ] + }, + { + "column": "Watched_Value1", + "mapped_to_column": "cur_Name", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Device Name" + } + ] + }, + { + "column": "Watched_Value2", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "MAC Address" + } + ] + }, + { + "column": "Watched_Value3", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Last IP" + } + ] + }, + { + "column": "Watched_Value4", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard URL" + } + ] + }, + { + "column": "Dummy", + "mapped_to_column": "scanSourcePlugin", + "mapped_to_column_data": { + "value": "adguard_export" + }, + "css_classes": "col-sm-2", + "show": false, + "type": "label", + "default_value": "", + "options": [], + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "ADGUARDEXP" + } + ] + }, + { + "column": "Extra", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [ + { + "option": "cssClass", + "value": "text-muted small" + } + ], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Status" + } + ] + }, + { + "column": "ForeignKey", + "mapped_to_column": "dev_MAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Device link" + } + ] + } + ] +} diff --git a/front/plugins/adguard_export/script.py b/front/plugins/adguard_export/script.py new file mode 100644 index 00000000..432e037c --- /dev/null +++ b/front/plugins/adguard_export/script.py @@ -0,0 +1,499 @@ +#!/usr/bin/env python +# adguard_export/script.py +# +# NetAlertX plugin: adguard_export +# Syncs known devices from the NetAlertX database to AdGuard Home as +# persistent clients, keeping names, MACs, and IP addresses in sync. +# +# AdGuard Home API reference: +# GET /control/clients – list all persistent clients +# POST /control/clients/add – create a new persistent client +# POST /control/clients/update – update an existing persistent client +# POST /control/clients/delete – remove a persistent client + +import os +import sys +import json +import requests +from pytz import timezone +import sqlite3 +from typing import Dict, List, Optional, Set, Tuple + +# Define the installation path and extend the system path for plugin imports +INSTALL_PATH = os.getenv('NETALERTX_APP', '/app') +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +from const import dataPath, logPath, fullDbPath # noqa: E402, E261 +from plugin_helper import Plugin_Objects # noqa: E402, E261 +from logger import mylog, Logger # noqa: E402, E261 +from helper import get_setting_value # noqa: E402, E261 +import conf # noqa: E402, E261 + +# ---------------------------- +# Plugin metadata +# ---------------------------- +pluginName = "ADGUARDEXP" + +# Make sure the TIMEZONE for logging is correct +conf.tz = timezone(get_setting_value("TIMEZONE")) + +# Make sure log level is initialized correctly +Logger(get_setting_value("LOG_LEVEL")) + +# Define paths +LOG_PATH = logPath + "/plugins" +RESULT_FILE = os.path.join(LOG_PATH, f"last_result.{pluginName}.log") +STATE_FILE = os.path.join(dataPath, f"state.{pluginName}.json") + +plugin_objects = Plugin_Objects(RESULT_FILE) + + +def load_managed_names() -> Set[str]: + """Return the set of AdGuard client names we previously added.""" + try: + with open(STATE_FILE) as f: + return set(json.load(f).get("managed", [])) + except (FileNotFoundError, json.JSONDecodeError): + return set() + + +def save_managed_names(names: Set[str]) -> None: + with open(STATE_FILE, "w") as f: + json.dump({"managed": sorted(names)}, f, indent=2) + + +# --------------------------------------------------------------------------- +# Device type → AdGuard tag mapping +# --------------------------------------------------------------------------- +_TYPE_TAG_MAP: Dict[str, str] = { + "ap": "device_other", + "desktop": "device_pc", + "game console": "device_gameconsole", + "gameconsole": "device_gameconsole", + "gateway": "device_other", + "house appliance": "device_other", + "hypervisor": "device_pc", + "ip camera": "device_camera", + "camera": "device_camera", + "laptop": "device_laptop", + "notebook": "device_laptop", + "nas": "device_nas", + "printer": "device_printer", + "router": "device_other", + "server": "device_pc", + "smarttv": "device_tv", + "smart tv": "device_tv", + "tv": "device_tv", + "smartphone": "device_phone", + "phone": "device_phone", + "mobile": "device_phone", + "smartwatch": "device_phone", + "watch": "device_phone", + "tablet": "device_tablet", + "virtual assistance": "device_audio", + "assistant": "device_audio", + "speaker": "device_audio", +} + + +def device_type_to_tag(dev_type: str) -> str: + """Map a NetAlertX devType string to a valid AdGuard Home tag, or ''.""" + if not dev_type: + return "" + key = dev_type.strip().lower() + if key in _TYPE_TAG_MAP: + return _TYPE_TAG_MAP[key] + # Substring fallback for partial matches + for pattern, tag in _TYPE_TAG_MAP.items(): + if pattern in key: + return tag + return "" + + +# --------------------------------------------------------------------------- +# AdGuard Home client +# --------------------------------------------------------------------------- +class AdGuardClient: + """Thin wrapper around the AdGuard Home /control/clients* API.""" + + def __init__(self, base_url: str, username: str, password: str, verify_ssl: bool = True): + self.base_url = base_url.rstrip("/") + self.auth = (username, password) + self.verify_ssl = verify_ssl + self.session = requests.Session() + self.session.auth = self.auth + + def _url(self, path: str) -> str: + return f"{self.base_url}/control/{path.lstrip('/')}" + + def get_clients(self) -> List[dict]: + """Return the list of persistent (manually added) clients.""" + resp = self.session.get(self._url("clients"), verify=self.verify_ssl, timeout=15) + resp.raise_for_status() + return resp.json().get("clients") or [] + + def add_client(self, client: dict) -> None: + resp = self.session.post( + self._url("clients/add"), + json=client, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + def update_client(self, old_name: str, client: dict) -> None: + payload = {"name": old_name, "data": client} + resp = self.session.post( + self._url("clients/update"), + json=payload, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + def delete_client(self, name: str) -> None: + resp = self.session.post( + self._url("clients/delete"), + json={"name": name}, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + +# --------------------------------------------------------------------------- +# Database helpers +# --------------------------------------------------------------------------- +def get_netalertx_devices(db_path: str, include_offline: bool, include_new: bool) -> List[dict]: + """ + Query NetAlertX's Devices table and return a list of dicts with the + fields we care about: mac, name, last_ip, dev_type + """ + devices = [] + conn = None + try: + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cur = conn.cursor() + + clauses = ["devIsArchived = 0"] + if not include_offline: + clauses.append("devPresentLastScan = 1") + if not include_new: + clauses.append("devIsNew = 0") + where = "WHERE " + " AND ".join(clauses) + + cur.execute( + f""" + SELECT devMac AS mac, + devName AS name, + devLastIP AS last_ip, + devType AS dev_type + FROM Devices + {where} + ORDER BY devMac + """ + ) + for row in cur.fetchall(): + mac = (row["mac"] or "").strip() + name = (row["name"] or "").strip() + last_ip = (row["last_ip"] or "").strip() + dev_type = (row["dev_type"] or "").strip() + + # Skip completely empty rows + if not mac and not last_ip: + continue + + # Fall back to MAC as name when no friendly name is set + if not name: + name = mac or last_ip + + devices.append({"mac": mac, "name": name, "last_ip": last_ip, "dev_type": dev_type}) + + except sqlite3.Error as exc: + mylog("verbose", [f"[{pluginName}] ERROR reading NetAlertX database: {exc}"]) + finally: + if conn: + conn.close() + + return devices + + +# --------------------------------------------------------------------------- +# Sync logic +# --------------------------------------------------------------------------- +def build_agrd_client(device: dict, use_mac: bool) -> dict: + """ + Build an AdGuard Home client object from a NetAlertX device row. + + AdGuard Home identifies a client by its 'ids' list, which may contain + MACs, IPs, CIDRs, or ClientIDs. We prefer MAC when available; fall + back to IP otherwise. + """ + ids = [] + if use_mac and device["mac"] and device["mac"] not in ("", "00:00:00:00:00:00"): + ids.append(device["mac"].lower()) + if device["last_ip"] and device["last_ip"] not in ("", "0.0.0.0"): + ids.append(device["last_ip"]) + + if not ids: + return {} # nothing useful to identify the device + + tag = device_type_to_tag(device.get("dev_type", "")) + return { + "name": device["name"], + "ids": ids, + "tags": [tag] if tag else [], + "use_global_settings": True, + "use_global_blocked_services": True, + "filtering_enabled": False, + "parental_enabled": False, + "safebrowsing_enabled": False, + "safesearch_enabled": False, + "blocked_services": [], + "upstreams": [], + } + + +def sync_to_adguard( + agrd: AdGuardClient, + devices: List[dict], + use_mac: bool, + delete_missing: bool, + existing_clients: Optional[List[dict]] = None, +) -> Tuple[int, int, int, int]: + """ + Core sync routine. Returns (added, updated, skipped, deleted). + Pass existing_clients to reuse a list already fetched (avoids a second + round-trip when the caller performed a connectivity check first). + """ + if existing_clients is None: + existing_clients = agrd.get_clients() + mylog("verbose", [f"[{pluginName}] AdGuard Home currently has {len(existing_clients)} persistent client(s)."]) + + # Build a lookup: identifier → client dict + existing_by_id: Dict[str, dict] = {} + for client in existing_clients: + for cid in client.get("ids", []): + existing_by_id[cid.lower()] = client + + # Also index by name for update / delete operations (warn if AdGuard has duplicate names) + existing_by_name: Dict[str, dict] = {} + for c in existing_clients: + if c["name"] in existing_by_name: + mylog("verbose", [f"[{pluginName}] WARNING duplicate client name in AdGuard Home: {c['name']!r}"]) + existing_by_name[c["name"]] = c + + # Load the set of client names we've previously added so that DELETE mode + # only removes clients we created, not manually-added ones. + managed_names = load_managed_names() + + added = updated = skipped = deleted = 0 + + # ----- add / update ----- + for device in devices: + client_data = build_agrd_client(device, use_mac) + if not client_data: + if not use_mac and not device["last_ip"]: + reason = "no IP address (USEMAC is disabled, IP required)" + else: + reason = "no usable MAC or IP" + mylog("verbose", [f"[{pluginName}] SKIP {device['name']!r} – {reason}"]) + skipped += 1 + continue + + # Check whether any of the ids already exist in AdGuard + existing = None + for cid in client_data["ids"]: + if cid.lower() in existing_by_id: + existing = existing_by_id[cid.lower()] + break + + if existing is None and device["name"] in managed_names: + # Fall back to name match only for clients we previously added — avoids + # accidentally matching a manually-created AdGuard client with the same name. + existing = existing_by_name.get(device["name"]) + if existing: + mylog("verbose", [f"[{pluginName}] WARN matched {device['name']!r} by name (no ID match) — verify no duplicate clients"]) + + if existing: + old_name = existing["name"] + # Preserve existing per-client AdGuard settings; we only manage name, ids, tags. + _our_keys = frozenset(("name", "ids", "tags")) + merged_data = {**client_data, **{k: v for k, v in existing.items() if k not in _our_keys}} + # Only call update when something actually changed to avoid noise + if ( + sorted(i.lower() for i in existing.get("ids", [])) != sorted(i.lower() for i in client_data["ids"]) + or existing.get("name") != client_data["name"] + or sorted(existing.get("tags", [])) != sorted(client_data["tags"]) + ): + try: + agrd.update_client(old_name, merged_data) + mylog("verbose", [f"[{pluginName}] UPDATE {old_name!r} → {device['name']!r} ids={client_data['ids']}"]) + managed_names.discard(old_name) + managed_names.add(device["name"]) + updated += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR updating {device['name']!r}: {exc}"]) + skipped += 1 + else: + mylog("verbose", [f"[{pluginName}] SKIP (no change) {device['name']!r}"]) + managed_names.add(device["name"]) + skipped += 1 + else: + try: + agrd.add_client(client_data) + mylog("verbose", [f"[{pluginName}] ADD {device['name']!r} ids={client_data['ids']}"]) + managed_names.add(device["name"]) + added += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR adding {device['name']!r}: {exc}"]) + skipped += 1 + + # ----- optional delete of AdGuard clients no longer in NetAlertX ----- + if delete_missing: + export_names = {d["name"] for d in devices} + for client in existing_clients: + cname = client.get("name", "") + # Only delete clients that we previously added (tracked in state file) + # so we don't accidentally remove manually-added clients. + if cname in managed_names and cname not in export_names: + try: + agrd.delete_client(cname) + mylog("verbose", [f"[{pluginName}] DELETE {cname!r} (no longer in NetAlertX)"]) + managed_names.discard(cname) + deleted += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR deleting {cname!r}: {exc}"]) + + save_managed_names(managed_names) + return added, updated, skipped, deleted + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- +def main(): + mylog("verbose", [f"[{pluginName}] In script"]) + + # ------------------------------------------------------------------ + # Read settings + # ------------------------------------------------------------------ + agrd_url = get_setting_value("ADGUARDEXP_URL") or "http://localhost:3000" + agrd_user = get_setting_value("ADGUARDEXP_USER") or "" + agrd_pass = get_setting_value("ADGUARDEXP_PASSWORD") or "" + verify_ssl_str = get_setting_value("ADGUARDEXP_VERIFYSSL") or "true" + include_offline_str = get_setting_value("ADGUARDEXP_INCLUDE_OFFLINE") or "true" + include_new_str = get_setting_value("ADGUARDEXP_INCLUDE_NEW") or "false" + use_mac_str = get_setting_value("ADGUARDEXP_USEMAC") or "true" + delete_str = get_setting_value("ADGUARDEXP_DELETE") or "false" + + verify_ssl = verify_ssl_str.strip().lower() not in ("false", "0", "no") + include_offline = include_offline_str.strip().lower() not in ("false", "0", "no") + include_new = include_new_str.strip().lower() not in ("false", "0", "no") + use_mac = use_mac_str.strip().lower() not in ("false", "0", "no") + delete_miss = delete_str.strip().lower() not in ("false", "0", "no") + + mylog("verbose", [f"[{pluginName}] " + ("=" * 60)]) + mylog("verbose", [f"[{pluginName}] AdGuard Home URL : {agrd_url}"]) + mylog("verbose", [f"[{pluginName}] Include offline devs: {include_offline}"]) + mylog("verbose", [f"[{pluginName}] Include new/unknown : {include_new}"]) + mylog("verbose", [f"[{pluginName}] Use MAC as id : {use_mac}"]) + mylog("verbose", [f"[{pluginName}] Delete missing : {delete_miss}"]) + mylog("verbose", [f"[{pluginName}] " + ("=" * 60)]) + + # ------------------------------------------------------------------ + # Load devices from NetAlertX + # ------------------------------------------------------------------ + devices = get_netalertx_devices(fullDbPath, include_offline, include_new) + mylog("verbose", [f"[{pluginName}] Loaded {len(devices)} device(s) from NetAlertX database."]) + + if not devices: + mylog("verbose", ["No devices to sync – exiting."]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "summary", + watched1 = "0", + watched2 = "0", + watched3 = "0", + watched4 = "0", + extra = "No devices found in NetAlertX", + ) + plugin_objects.write_result_file() + return + + # ------------------------------------------------------------------ + # Connect to AdGuard Home and sync + # ------------------------------------------------------------------ + try: + agrd = AdGuardClient(agrd_url, agrd_user, agrd_pass, verify_ssl) + existing_clients = agrd.get_clients() + except requests.exceptions.ConnectionError as exc: + mylog("verbose", [f"[{pluginName}] ERROR – cannot connect to AdGuard Home at {agrd_url}: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"Connection failed: {exc}", + ) + plugin_objects.write_result_file() + return + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an HTTP error: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"HTTP error: {exc}", + ) + plugin_objects.write_result_file() + return + except Exception as exc: + mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an unknown error: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"Unknown error: {exc}", + ) + plugin_objects.write_result_file() + return + + added, updated, skipped, deleted = sync_to_adguard( + agrd, devices, use_mac, delete_miss, existing_clients=existing_clients + ) + + summary = ( + f"Sync complete – added={added} updated={updated} " + f"skipped={skipped} deleted={deleted}" + ) + + # ------------------------------------------------------------------ + # Write plugin result (one summary row + one row per touched device) + # ------------------------------------------------------------------ + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "summary", + watched1 = str(added), + watched2 = str(updated), + watched3 = str(skipped), + watched4 = str(deleted), + extra = summary, + ) + + for device in devices: + plugin_objects.add_object( + primaryId = device["mac"] or device["last_ip"], + secondaryId = device["last_ip"], + watched1 = device["name"], + watched2 = device["mac"], + watched3 = device["last_ip"], + watched4 = agrd_url, + extra = "exported", + foreignKey = device["mac"] or "", + ) + + mylog("verbose", [f"[{pluginName}] {summary}"]) + plugin_objects.write_result_file() + return + + +if __name__ == "__main__": + main() diff --git a/front/plugins/adguard_import/config.json b/front/plugins/adguard_import/config.json index f4df3377..e4c2a6e1 100644 --- a/front/plugins/adguard_import/config.json +++ b/front/plugins/adguard_import/config.json @@ -26,7 +26,7 @@ "description": [ { "language_code": "en_us", - "string": "Plugin to ..." + "string": "Imports known devices from AdGuard to NetAlertX as persistent devices, keeping names and IP/MAC identifiers in sync." } ], "icon": [ @@ -36,7 +36,7 @@ } ], "params": [], - "settings": [ + "settings": [ { "function": "RUN", "events": ["run"], diff --git a/test/plugins/test_adguard_export.py b/test/plugins/test_adguard_export.py new file mode 100644 index 00000000..98051865 --- /dev/null +++ b/test/plugins/test_adguard_export.py @@ -0,0 +1,401 @@ +""" +Tests for adguard_export/script.py + +Run from inside the NetAlertX container (where the full environment is available), +or locally — in that case the NetAlertX-specific modules are stubbed out +automatically before the script is imported. + + pytest test/plugins/test_adguard_export.py -v +""" + +import json +import os +import sqlite3 +import sys +import tempfile +import types +from unittest.mock import MagicMock, call, patch + +import pytest + +# --------------------------------------------------------------------------- +# Stub NetAlertX-specific modules so tests can run outside the container. +# sys.modules.setdefault() is a no-op when the real module is already loaded, +# so this is safe to run inside the container too. +# --------------------------------------------------------------------------- +_tmp_log = tempfile.mkdtemp() + + +def _stub(name: str, **attrs): + if name not in sys.modules: + mod = types.ModuleType(name) + for k, v in attrs.items(): + setattr(mod, k, v) + sys.modules[name] = mod + + +_stub("pytz", timezone=lambda tz: tz) +_stub("conf") +_stub("const", dataPath=_tmp_log, logPath=_tmp_log, fullDbPath=os.path.join(_tmp_log, "test.db")) +_stub("plugin_helper", Plugin_Objects=MagicMock) +_stub("logger", mylog=lambda *a: None, Logger=MagicMock) +_stub("helper", get_setting_value=lambda k: "") + +# Stub requests only when it isn't installed (e.g. bare system Python locally). +# In the container and CI, the real package is present and will be used. +if "requests" not in sys.modules: + _req = types.ModuleType("requests") + _req.Session = MagicMock + _req.HTTPError = type("HTTPError", (Exception,), {}) + _req_exc = types.ModuleType("requests.exceptions") + _req_exc.ConnectionError = type("ConnectionError", (Exception,), {}) + _req.exceptions = _req_exc + sys.modules["requests"] = _req + sys.modules["requests.exceptions"] = _req_exc + +# --------------------------------------------------------------------------- +# Import the functions under test (must come after the stubs above). +# --------------------------------------------------------------------------- +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "front", "plugins", "adguard_export")) + +from script import ( # noqa: E402 + AdGuardClient, + _TYPE_TAG_MAP, + build_agrd_client, + device_type_to_tag, + get_netalertx_devices, + load_managed_names, + save_managed_names, + sync_to_adguard, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_db(path: str, rows: list[dict]) -> None: + """Create a minimal Devices table and populate it with *rows*.""" + conn = sqlite3.connect(path) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS Devices ( + devMac TEXT, + devName TEXT, + devLastIP TEXT, + devType TEXT, + devIsArchived INTEGER DEFAULT 0, + devPresentLastScan INTEGER DEFAULT 1, + devIsNew INTEGER DEFAULT 0 + ) + """ + ) + for row in rows: + conn.execute( + "INSERT INTO Devices VALUES (?,?,?,?,?,?,?)", + ( + row.get("devMac", ""), + row.get("devName", ""), + row.get("devLastIP", ""), + row.get("devType", ""), + row.get("devIsArchived", 0), + row.get("devPresentLastScan", 1), + row.get("devIsNew", 0), + ), + ) + conn.commit() + conn.close() + + +def _mock_agrd(existing=None) -> MagicMock: + """Return a mock AdGuardClient whose get_clients() returns *existing*.""" + agrd = MagicMock(spec=AdGuardClient) + agrd.get_clients.return_value = existing or [] + return agrd + + +# --------------------------------------------------------------------------- +# device_type_to_tag +# --------------------------------------------------------------------------- + + +class TestDeviceTypeToTag: + def test_empty_string_returns_empty(self): + assert device_type_to_tag("") == "" + + def test_none_returns_empty(self): + assert device_type_to_tag(None) == "" + + def test_exact_match_case_insensitive(self): + assert device_type_to_tag("Smartphone") == "device_phone" + assert device_type_to_tag("LAPTOP") == "device_laptop" + assert device_type_to_tag("nas") == "device_nas" + + def test_substring_fallback(self): + # "gaming smartphone" contains "smartphone" + assert device_type_to_tag("gaming smartphone") == "device_phone" + + def test_unknown_type_returns_empty(self): + assert device_type_to_tag("toaster") == "" + + def test_all_map_values_are_valid_adguard_tags(self): + valid_prefixes = ("device_", "ct_", "os_") + for tag in _TYPE_TAG_MAP.values(): + assert any(tag.startswith(p) for p in valid_prefixes), ( + f"{tag!r} is not a valid AdGuard Home tag" + ) + + +# --------------------------------------------------------------------------- +# build_agrd_client +# --------------------------------------------------------------------------- + + +class TestBuildAgrdClient: + def _device(self, **overrides) -> dict: + base = {"mac": "AA:BB:CC:DD:EE:FF", "name": "My PC", "last_ip": "192.168.1.10", "dev_type": "desktop"} + return {**base, **overrides} + + def test_mac_and_ip_both_included_when_use_mac_true(self): + result = build_agrd_client(self._device(), use_mac=True) + assert "aa:bb:cc:dd:ee:ff" in result["ids"] + assert "192.168.1.10" in result["ids"] + + def test_only_ip_when_use_mac_false(self): + result = build_agrd_client(self._device(), use_mac=False) + assert result["ids"] == ["192.168.1.10"] + + def test_returns_empty_dict_when_no_usable_id(self): + result = build_agrd_client( + {"mac": "", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""}, + use_mac=True, + ) + assert result == {} + + def test_null_mac_falls_back_to_ip(self): + result = build_agrd_client( + {"mac": "00:00:00:00:00:00", "name": "Dev", "last_ip": "10.0.0.5", "dev_type": ""}, + use_mac=True, + ) + assert result["ids"] == ["10.0.0.5"] + + def test_device_type_tag_applied(self): + result = build_agrd_client(self._device(dev_type="smartphone"), use_mac=True) + assert result["tags"] == ["device_phone"] + + def test_unknown_device_type_produces_no_tag(self): + result = build_agrd_client(self._device(dev_type=""), use_mac=True) + assert result["tags"] == [] + + def test_mac_is_lowercased(self): + result = build_agrd_client(self._device(mac="AA:BB:CC:DD:EE:FF"), use_mac=True) + assert "aa:bb:cc:dd:ee:ff" in result["ids"] + + +# --------------------------------------------------------------------------- +# load_managed_names / save_managed_names +# --------------------------------------------------------------------------- + + +class TestManagedNames: + def test_round_trip(self, tmp_path): + state = tmp_path / "state.json" + with patch("script.STATE_FILE", str(state)): + save_managed_names({"alpha", "beta", "gamma"}) + loaded = load_managed_names() + assert loaded == {"alpha", "beta", "gamma"} + + def test_missing_file_returns_empty_set(self, tmp_path): + with patch("script.STATE_FILE", str(tmp_path / "nonexistent.json")): + assert load_managed_names() == set() + + def test_corrupt_file_returns_empty_set(self, tmp_path): + state = tmp_path / "state.json" + state.write_text("not valid json") + with patch("script.STATE_FILE", str(state)): + assert load_managed_names() == set() + + def test_save_sorts_names(self, tmp_path): + state = tmp_path / "state.json" + with patch("script.STATE_FILE", str(state)): + save_managed_names({"zebra", "apple", "mango"}) + data = json.loads(state.read_text()) + assert data["managed"] == ["apple", "mango", "zebra"] + + +# --------------------------------------------------------------------------- +# get_netalertx_devices +# --------------------------------------------------------------------------- + + +class TestGetNetalertxDevices: + def test_basic_query(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "AA:BB:CC:00:00:01", "devName": "PC", "devLastIP": "10.0.0.1", "devType": "desktop"}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "PC" + assert result[0]["mac"] == "AA:BB:CC:00:00:01" + + def test_archived_devices_excluded(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Active", "devLastIP": "10.0.0.1", "devIsArchived": 0}, + {"devMac": "AA:00:00:00:00:02", "devName": "Archived", "devLastIP": "10.0.0.2", "devIsArchived": 1}, + ]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "Active" + + def test_offline_excluded_when_flag_false(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Online", "devLastIP": "10.0.0.1", "devPresentLastScan": 1}, + {"devMac": "AA:00:00:00:00:02", "devName": "Offline", "devLastIP": "10.0.0.2", "devPresentLastScan": 0}, + ]) + result = get_netalertx_devices(db, include_offline=False, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "Online" + + def test_new_devices_excluded_when_flag_false(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Known", "devLastIP": "10.0.0.1", "devIsNew": 0}, + {"devMac": "AA:00:00:00:00:02", "devName": "Unknown", "devLastIP": "10.0.0.2", "devIsNew": 1}, + ]) + result = get_netalertx_devices(db, include_offline=True, include_new=False) + assert len(result) == 1 + assert result[0]["name"] == "Known" + + def test_nameless_device_falls_back_to_mac(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "BB:CC:DD:EE:FF:00", "devName": "", "devLastIP": "10.0.0.5"}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert result[0]["name"] == "BB:CC:DD:EE:FF:00" + + def test_row_with_no_mac_and_no_ip_skipped(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "", "devName": "Ghost", "devLastIP": ""}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert result == [] + + def test_missing_db_returns_empty_list(self, tmp_path): + result = get_netalertx_devices(str(tmp_path / "missing.db"), True, True) + assert result == [] + + +# --------------------------------------------------------------------------- +# sync_to_adguard +# --------------------------------------------------------------------------- + + +class TestSyncToAdguard: + def _device(self, name="PC", mac="AA:BB:CC:00:00:01", ip="10.0.0.1", dev_type="desktop") -> dict: + return {"mac": mac, "name": name, "last_ip": ip, "dev_type": dev_type} + + def test_new_device_is_added(self, tmp_path): + agrd = _mock_agrd(existing=[]) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False + ) + assert added == 1 + assert updated == skipped == deleted == 0 + agrd.add_client.assert_called_once() + + def test_unchanged_device_is_skipped(self, tmp_path): + existing = [{"name": "PC", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False + ) + assert skipped == 1 + assert added == updated == deleted == 0 + agrd.update_client.assert_not_called() + + def test_renamed_device_is_updated(self, tmp_path): + existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False + ) + assert updated == 1 + agrd.update_client.assert_called_once_with("Old Name", agrd.update_client.call_args[0][1]) + + def test_missing_device_deleted_when_flag_true(self, tmp_path): + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": ["Gone Device"]})) + existing = [{"name": "Gone Device", "ids": ["10.0.0.99"], "tags": []}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [], use_mac=True, delete_missing=True + ) + assert deleted == 1 + agrd.delete_client.assert_called_once_with("Gone Device") + + def test_unmanaged_device_not_deleted(self, tmp_path): + # State file is empty — we never added this client + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": []})) + existing = [{"name": "Manual Client", "ids": ["10.0.0.50"], "tags": []}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + sync_to_adguard(agrd, [], use_mac=True, delete_missing=True) + agrd.delete_client.assert_not_called() + + def test_device_with_no_usable_id_is_skipped(self, tmp_path): + agrd = _mock_agrd(existing=[]) + device = {"mac": "00:00:00:00:00:00", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""} + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [device], use_mac=True, delete_missing=False + ) + assert skipped == 1 + agrd.add_client.assert_not_called() + + def test_existing_clients_parameter_avoids_extra_api_call(self, tmp_path): + existing = [] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False, + existing_clients=existing, + ) + agrd.get_clients.assert_not_called() + + def test_rename_removes_old_name_from_managed_names(self, tmp_path): + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": ["Old Name"]})) + existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False) + loaded = load_managed_names() + assert "Old Name" not in loaded + assert "New Name" in loaded + + def test_update_preserves_custom_adguard_settings(self, tmp_path): + existing = [{ + "name": "Old Name", + "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], + "tags": ["device_pc"], + "filtering_enabled": True, + "use_global_settings": False, + "parental_enabled": True, + "safebrowsing_enabled": False, + "safesearch_enabled": False, + "use_global_blocked_services": False, + "blocked_services": ["youtube.com"], + "upstreams": ["1.1.1.1"], + }] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False) + _, sent_payload = agrd.update_client.call_args[0] + assert sent_payload["filtering_enabled"] is True + assert sent_payload["use_global_settings"] is False + assert sent_payload["blocked_services"] == ["youtube.com"] + assert sent_payload["upstreams"] == ["1.1.1.1"] From ca7a699ce3c7091f1fae483b4fe92e07320f12d2 Mon Sep 17 00:00:00 2001 From: Nathan Jacobson <22107+natecj@users.noreply.github.com> Date: Sat, 23 May 2026 14:22:58 -0400 Subject: [PATCH 2/6] fix: address PR review feedback on adguard_export plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config.json: add show:true to all visible column definitions so they render in the plugin output table - script.py: fix managed_names adoption bug — update/skip branches no longer add unowned clients to managed state; rename tracking now scoped to plugin-created clients only - README.md: fix ADGUARDEXP_URL default (localhost:3000, not local IP), add language tags to fenced code blocks, normalise metadata block to Other info / Maintainer / DD-Mon-YYYY format - test_adguard_export.py: add regression test for manual client matched by ID not being adopted into managed state Co-Authored-By: Claude Sonnet 4.6 --- front/plugins/adguard_export/README.md | 14 +++++++------- front/plugins/adguard_export/config.json | 8 ++++++++ front/plugins/adguard_export/script.py | 9 ++++++--- test/plugins/test_adguard_export.py | 14 ++++++++++++++ 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/front/plugins/adguard_export/README.md b/front/plugins/adguard_export/README.md index e0cdc948..a7ed0ef0 100644 --- a/front/plugins/adguard_export/README.md +++ b/front/plugins/adguard_export/README.md @@ -43,7 +43,7 @@ Device types set in NetAlertX (e.g. `Smartphone`, `Laptop`, `NAS`) are automatic |---|---|---| | `ADGUARDEXP_RUN` | `disabled` | When to run: `disabled`, `once`, or `schedule` | | `ADGUARDEXP_RUN_SCHD` | `0 * * * *` | Cron schedule (default: hourly) | -| `ADGUARDEXP_URL` | `http://192.168.11.1:3000` | Base URL of AdGuard Home web UI | +| `ADGUARDEXP_URL` | `http://localhost:3000` | Base URL of AdGuard Home web UI | | `ADGUARDEXP_USER` | `admin` | AdGuard Home username | | `ADGUARDEXP_PASSWORD` | *(empty)* | AdGuard Home password | | `ADGUARDEXP_VERIFYSSL` | `true` | Verify TLS cert; set `false` for self-signed certs | @@ -93,7 +93,7 @@ Devices with an unrecognised or empty type are exported without a tag. When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously created — it will never delete clients you added manually in AdGuard Home. Ownership is tracked in a local state file at: -``` +```text /app/db/state.ADGUARDEXP.json ``` @@ -103,13 +103,13 @@ When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously cre Plugin logs are written to: -``` +```text /tmp/log/plugins/script.ADGUARDEXP.log ``` Result rows (used by the NetAlertX UI) are written to: -``` +```text /tmp/log/plugins/last_result.ADGUARDEXP.log ``` @@ -133,8 +133,8 @@ Result rows (used by the NetAlertX UI) are written to: --- -## Notes +### Other info - Version: 1.0.0 -- Author: [natecj](https://github.com/natecj) -- Release Date: 2026-05-10 +- Maintainer: [natecj](https://github.com/natecj) +- Release Date: 10-May-2026 diff --git a/front/plugins/adguard_export/config.json b/front/plugins/adguard_export/config.json index c6da9027..6e19087e 100644 --- a/front/plugins/adguard_export/config.json +++ b/front/plugins/adguard_export/config.json @@ -340,6 +340,7 @@ { "column": "Object_PrimaryID", "mapped_to_column": "cur_MAC", + "show": true, "type": { "dataType": "string", "elements": [ @@ -366,6 +367,7 @@ { "column": "Object_SecondaryID", "mapped_to_column": "cur_IP", + "show": true, "type": { "dataType": "string", "elements": [ @@ -387,6 +389,7 @@ { "column": "Watched_Value1", "mapped_to_column": "cur_Name", + "show": true, "type": { "dataType": "string", "elements": [ @@ -407,6 +410,7 @@ }, { "column": "Watched_Value2", + "show": true, "type": { "dataType": "string", "elements": [ @@ -427,6 +431,7 @@ }, { "column": "Watched_Value3", + "show": true, "type": { "dataType": "string", "elements": [ @@ -447,6 +452,7 @@ }, { "column": "Watched_Value4", + "show": true, "type": { "dataType": "string", "elements": [ @@ -486,6 +492,7 @@ }, { "column": "Extra", + "show": true, "type": { "dataType": "string", "elements": [ @@ -512,6 +519,7 @@ { "column": "ForeignKey", "mapped_to_column": "dev_MAC", + "show": true, "type": { "dataType": "string", "elements": [ diff --git a/front/plugins/adguard_export/script.py b/front/plugins/adguard_export/script.py index 432e037c..03548bb6 100644 --- a/front/plugins/adguard_export/script.py +++ b/front/plugins/adguard_export/script.py @@ -330,15 +330,18 @@ def sync_to_adguard( try: agrd.update_client(old_name, merged_data) mylog("verbose", [f"[{pluginName}] UPDATE {old_name!r} → {device['name']!r} ids={client_data['ids']}"]) - managed_names.discard(old_name) - managed_names.add(device["name"]) + # Only track the rename for clients we already own — never adopt a manually-created client. + if old_name in managed_names: + managed_names.discard(old_name) + managed_names.add(device["name"]) updated += 1 except requests.HTTPError as exc: mylog("verbose", [f"[{pluginName}] ERROR updating {device['name']!r}: {exc}"]) skipped += 1 else: mylog("verbose", [f"[{pluginName}] SKIP (no change) {device['name']!r}"]) - managed_names.add(device["name"]) + # No managed_names update: if we created this client it's already in the state + # file; if it's a manually-created client we must not claim ownership of it. skipped += 1 else: try: diff --git a/test/plugins/test_adguard_export.py b/test/plugins/test_adguard_export.py index 98051865..fe2629f1 100644 --- a/test/plugins/test_adguard_export.py +++ b/test/plugins/test_adguard_export.py @@ -346,6 +346,20 @@ class TestSyncToAdguard: sync_to_adguard(agrd, [], use_mac=True, delete_missing=True) agrd.delete_client.assert_not_called() + def test_manual_client_matched_by_id_not_adopted(self, tmp_path): + # A manually-created AdGuard client whose IP matches a NetAlertX device + # must not be added to managed_names — so DELETE=true won't touch it later. + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": []})) + existing = [{"name": "Manual Client", "ids": ["10.0.0.5"], "tags": []}] + agrd = _mock_agrd(existing=existing) + device = {"mac": "", "name": "Manual Client", "last_ip": "10.0.0.5", "dev_type": ""} + with patch("script.STATE_FILE", str(state)): + sync_to_adguard(agrd, [device], use_mac=True, delete_missing=True) + loaded = load_managed_names() + assert "Manual Client" not in loaded + agrd.delete_client.assert_not_called() + def test_device_with_no_usable_id_is_skipped(self, tmp_path): agrd = _mock_agrd(existing=[]) device = {"mac": "00:00:00:00:00:00", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""} From 8e391f503e89cf2e59699b8431e2697d33ddda35 Mon Sep 17 00:00:00 2001 From: mid Date: Sun, 24 May 2026 09:03:47 +0200 Subject: [PATCH 3/6] Translated using Weblate (Japanese) Currently translated at 100.0% (809 of 809 strings) Translation: NetAlertX/core Translate-URL: https://hosted.weblate.org/projects/pialert/core/ja/ --- front/php/templates/language/ja_jp.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/front/php/templates/language/ja_jp.json b/front/php/templates/language/ja_jp.json index ff84525d..8613049e 100644 --- a/front/php/templates/language/ja_jp.json +++ b/front/php/templates/language/ja_jp.json @@ -66,8 +66,8 @@ "CustProps_cant_remove": "削除できません。少なくとも1つのプロパティが必要です。", "DAYS_TO_KEEP_EVENTS_description": "これはメンテナンス設定です。イベントエントリを保持する日数を指定します。それより古いイベントは定期的に削除されます。プラグインイベント履歴にも適用されます。", "DAYS_TO_KEEP_EVENTS_name": "古いイベントの削除", - "DEEP_SLEEP_description": "", - "DEEP_SLEEP_name": "", + "DEEP_SLEEP_description": "処理サイクル間のアイドル待機時間を延長することで、CPU使用率を低減します。この機能を有効にすると、スキャンが最大1分間遅延したり、UIの反応が悪くなったりする場合があります。", + "DEEP_SLEEP_name": "ディープスリープ", "DISCOVER_PLUGINS_description": "このオプションを無効にすると、初期化と設定の保存が高速化されます。無効にした場合、プラグインは検出されず、LOADED_PLUGINS 設定に新しいプラグインを追加することはできません。", "DISCOVER_PLUGINS_name": "プラグインの検出", "DevDetail_Children_Title": "親子関係", @@ -249,8 +249,8 @@ "Device_TableHead_Name": "名前", "Device_TableHead_NetworkSite": "ネットワークサイト", "Device_TableHead_Owner": "所有者", - "Device_TableHead_ParentRelType": "関係種別", - "Device_TableHead_Parent_MAC": "親ネットワークノード", + "Device_TableHead_ParentRelType": "関係", + "Device_TableHead_Parent_MAC": "親ノード", "Device_TableHead_Port": "ポート", "Device_TableHead_PresentLastScan": "検出", "Device_TableHead_ReqNicsOnline": "OnlineのNICが必要", @@ -346,7 +346,7 @@ "Gen_LockedDB": "エラー - DBがロックされている可能性があります - F12で開発者ツール→コンソールを確認するか、後で試してください。", "Gen_NetworkMask": "ネットワークマスク", "Gen_New": "New", - "Gen_No_Data": "", + "Gen_No_Data": "データなし", "Gen_Offline": "オフライン", "Gen_Okay": "Ok", "Gen_Online": "オンライン", @@ -808,4 +808,4 @@ "settings_system_label": "システム", "settings_update_item_warning": "以下の値を更新してください。以前のフォーマットに従うよう注意してください。検証は行われません。", "test_event_tooltip": "設定をテストする前に、まず変更を保存してください。" -} \ No newline at end of file +} From 93e534cef528ac5619deda3ad5911397c19cdfb4 Mon Sep 17 00:00:00 2001 From: Nathan Jacobson <22107+natecj@users.noreply.github.com> Date: Tue, 26 May 2026 22:03:14 -0400 Subject: [PATCH 4/6] refactor: use DeviceInstance model instead of direct SQLite query Replaces the raw sqlite3 query in get_netalertx_devices() with DeviceInstance().getAll() as suggested in code review, applying the archived/offline/new filters in Python. Removes the sqlite3 and fullDbPath imports. Updates tests to mock DeviceInstance.getAll(). Co-Authored-By: Claude Sonnet 4.6 --- front/plugins/adguard_export/script.py | 64 +++++--------- test/plugins/test_adguard_export.py | 110 ++++++++++--------------- 2 files changed, 64 insertions(+), 110 deletions(-) diff --git a/front/plugins/adguard_export/script.py b/front/plugins/adguard_export/script.py index 03548bb6..f66c74d8 100644 --- a/front/plugins/adguard_export/script.py +++ b/front/plugins/adguard_export/script.py @@ -16,17 +16,17 @@ import sys import json import requests from pytz import timezone -import sqlite3 from typing import Dict, List, Optional, Set, Tuple # Define the installation path and extend the system path for plugin imports INSTALL_PATH = os.getenv('NETALERTX_APP', '/app') sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) -from const import dataPath, logPath, fullDbPath # noqa: E402, E261 +from const import dataPath, logPath # noqa: E402, E261 from plugin_helper import Plugin_Objects # noqa: E402, E261 from logger import mylog, Logger # noqa: E402, E261 from helper import get_setting_value # noqa: E402, E261 +from models.device_instance import DeviceInstance # noqa: E402, E261 import conf # noqa: E402, E261 # ---------------------------- @@ -164,57 +164,35 @@ class AdGuardClient: # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- -def get_netalertx_devices(db_path: str, include_offline: bool, include_new: bool) -> List[dict]: +def get_netalertx_devices(include_offline: bool, include_new: bool) -> List[dict]: """ - Query NetAlertX's Devices table and return a list of dicts with the - fields we care about: mac, name, last_ip, dev_type + Return filtered devices from NetAlertX using the DeviceInstance model. + Fields returned per device: mac, name, last_ip, dev_type """ devices = [] - conn = None try: - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - cur = conn.cursor() - - clauses = ["devIsArchived = 0"] - if not include_offline: - clauses.append("devPresentLastScan = 1") - if not include_new: - clauses.append("devIsNew = 0") - where = "WHERE " + " AND ".join(clauses) - - cur.execute( - f""" - SELECT devMac AS mac, - devName AS name, - devLastIP AS last_ip, - devType AS dev_type - FROM Devices - {where} - ORDER BY devMac - """ - ) - for row in cur.fetchall(): - mac = (row["mac"] or "").strip() - name = (row["name"] or "").strip() - last_ip = (row["last_ip"] or "").strip() - dev_type = (row["dev_type"] or "").strip() - - # Skip completely empty rows - if not mac and not last_ip: + for d in DeviceInstance().getAll(): + if d.get("devIsArchived", 0): + continue + if not include_offline and not d.get("devPresentLastScan", 1): + continue + if not include_new and d.get("devIsNew", 0): continue - # Fall back to MAC as name when no friendly name is set + mac = (d.get("devMac", "") or "").strip() + last_ip = (d.get("devLastIP", "") or "").strip() + name = (d.get("devName", "") or "").strip() + dev_type = (d.get("devType", "") or "").strip() + + if not mac and not last_ip: + continue if not name: name = mac or last_ip devices.append({"mac": mac, "name": name, "last_ip": last_ip, "dev_type": dev_type}) - except sqlite3.Error as exc: - mylog("verbose", [f"[{pluginName}] ERROR reading NetAlertX database: {exc}"]) - finally: - if conn: - conn.close() + except Exception as exc: + mylog("verbose", [f"[{pluginName}] ERROR reading devices: {exc}"]) return devices @@ -408,7 +386,7 @@ def main(): # ------------------------------------------------------------------ # Load devices from NetAlertX # ------------------------------------------------------------------ - devices = get_netalertx_devices(fullDbPath, include_offline, include_new) + devices = get_netalertx_devices(include_offline, include_new) mylog("verbose", [f"[{pluginName}] Loaded {len(devices)} device(s) from NetAlertX database."]) if not devices: diff --git a/test/plugins/test_adguard_export.py b/test/plugins/test_adguard_export.py index fe2629f1..cf0973c6 100644 --- a/test/plugins/test_adguard_export.py +++ b/test/plugins/test_adguard_export.py @@ -10,7 +10,6 @@ automatically before the script is imported. import json import os -import sqlite3 import sys import tempfile import types @@ -40,6 +39,8 @@ _stub("const", dataPath=_tmp_log, logPath=_tmp_log, fullDbPath=os.path.join(_tmp _stub("plugin_helper", Plugin_Objects=MagicMock) _stub("logger", mylog=lambda *a: None, Logger=MagicMock) _stub("helper", get_setting_value=lambda k: "") +_stub("models", ) +_stub("models.device_instance", DeviceInstance=MagicMock) # Stub requests only when it isn't installed (e.g. bare system Python locally). # In the container and CI, the real package is present and will be used. @@ -74,37 +75,18 @@ from script import ( # noqa: E402 # Helpers # --------------------------------------------------------------------------- -def _make_db(path: str, rows: list[dict]) -> None: - """Create a minimal Devices table and populate it with *rows*.""" - conn = sqlite3.connect(path) - conn.execute( - """ - CREATE TABLE IF NOT EXISTS Devices ( - devMac TEXT, - devName TEXT, - devLastIP TEXT, - devType TEXT, - devIsArchived INTEGER DEFAULT 0, - devPresentLastScan INTEGER DEFAULT 1, - devIsNew INTEGER DEFAULT 0 - ) - """ - ) - for row in rows: - conn.execute( - "INSERT INTO Devices VALUES (?,?,?,?,?,?,?)", - ( - row.get("devMac", ""), - row.get("devName", ""), - row.get("devLastIP", ""), - row.get("devType", ""), - row.get("devIsArchived", 0), - row.get("devPresentLastScan", 1), - row.get("devIsNew", 0), - ), - ) - conn.commit() - conn.close() +def _raw_device(**overrides) -> dict: + """Build a raw DeviceInstance.getAll() style dict.""" + base = { + "devMac": "AA:BB:CC:00:00:01", + "devName": "PC", + "devLastIP": "10.0.0.1", + "devType": "desktop", + "devIsArchived": 0, + "devPresentLastScan": 1, + "devIsNew": 0, + } + return {**base, **overrides} def _mock_agrd(existing=None) -> MagicMock: @@ -229,59 +211,53 @@ class TestManagedNames: class TestGetNetalertxDevices: - def test_basic_query(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [{"devMac": "AA:BB:CC:00:00:01", "devName": "PC", "devLastIP": "10.0.0.1", "devType": "desktop"}]) - result = get_netalertx_devices(db, include_offline=True, include_new=True) + def _call(self, rows, include_offline=True, include_new=True): + with patch("script.DeviceInstance") as mock_di: + mock_di.return_value.getAll.return_value = rows + return get_netalertx_devices(include_offline=include_offline, include_new=include_new) + + def test_basic_query(self): + result = self._call([_raw_device()]) assert len(result) == 1 assert result[0]["name"] == "PC" assert result[0]["mac"] == "AA:BB:CC:00:00:01" - def test_archived_devices_excluded(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [ - {"devMac": "AA:00:00:00:00:01", "devName": "Active", "devLastIP": "10.0.0.1", "devIsArchived": 0}, - {"devMac": "AA:00:00:00:00:02", "devName": "Archived", "devLastIP": "10.0.0.2", "devIsArchived": 1}, + def test_archived_devices_excluded(self): + result = self._call([ + _raw_device(devMac="AA:00:00:00:00:01", devName="Active", devIsArchived=0), + _raw_device(devMac="AA:00:00:00:00:02", devName="Archived", devIsArchived=1), ]) - result = get_netalertx_devices(db, include_offline=True, include_new=True) assert len(result) == 1 assert result[0]["name"] == "Active" - def test_offline_excluded_when_flag_false(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [ - {"devMac": "AA:00:00:00:00:01", "devName": "Online", "devLastIP": "10.0.0.1", "devPresentLastScan": 1}, - {"devMac": "AA:00:00:00:00:02", "devName": "Offline", "devLastIP": "10.0.0.2", "devPresentLastScan": 0}, - ]) - result = get_netalertx_devices(db, include_offline=False, include_new=True) + def test_offline_excluded_when_flag_false(self): + result = self._call([ + _raw_device(devMac="AA:00:00:00:00:01", devName="Online", devPresentLastScan=1), + _raw_device(devMac="AA:00:00:00:00:02", devName="Offline", devPresentLastScan=0), + ], include_offline=False) assert len(result) == 1 assert result[0]["name"] == "Online" - def test_new_devices_excluded_when_flag_false(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [ - {"devMac": "AA:00:00:00:00:01", "devName": "Known", "devLastIP": "10.0.0.1", "devIsNew": 0}, - {"devMac": "AA:00:00:00:00:02", "devName": "Unknown", "devLastIP": "10.0.0.2", "devIsNew": 1}, - ]) - result = get_netalertx_devices(db, include_offline=True, include_new=False) + def test_new_devices_excluded_when_flag_false(self): + result = self._call([ + _raw_device(devMac="AA:00:00:00:00:01", devName="Known", devIsNew=0), + _raw_device(devMac="AA:00:00:00:00:02", devName="Unknown", devIsNew=1), + ], include_new=False) assert len(result) == 1 assert result[0]["name"] == "Known" - def test_nameless_device_falls_back_to_mac(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [{"devMac": "BB:CC:DD:EE:FF:00", "devName": "", "devLastIP": "10.0.0.5"}]) - result = get_netalertx_devices(db, include_offline=True, include_new=True) + def test_nameless_device_falls_back_to_mac(self): + result = self._call([_raw_device(devMac="BB:CC:DD:EE:FF:00", devName="", devLastIP="10.0.0.5")]) assert result[0]["name"] == "BB:CC:DD:EE:FF:00" - def test_row_with_no_mac_and_no_ip_skipped(self, tmp_path): - db = str(tmp_path / "na.db") - _make_db(db, [{"devMac": "", "devName": "Ghost", "devLastIP": ""}]) - result = get_netalertx_devices(db, include_offline=True, include_new=True) + def test_row_with_no_mac_and_no_ip_skipped(self): + result = self._call([_raw_device(devMac="", devName="Ghost", devLastIP="")]) assert result == [] - def test_missing_db_returns_empty_list(self, tmp_path): - result = get_netalertx_devices(str(tmp_path / "missing.db"), True, True) - assert result == [] + def test_exception_returns_empty_list(self): + with patch("script.DeviceInstance") as mock_di: + mock_di.return_value.getAll.side_effect = Exception("db error") + assert get_netalertx_devices(True, True) == [] # --------------------------------------------------------------------------- From 6845104f4196cd5fdcf8d6b91053e496d7832563 Mon Sep 17 00:00:00 2001 From: Safeguard Date: Thu, 28 May 2026 10:44:56 +0200 Subject: [PATCH 5/6] Translated using Weblate (Russian) Currently translated at 100.0% (809 of 809 strings) Translation: NetAlertX/core Translate-URL: https://hosted.weblate.org/projects/pialert/core/ru/ --- front/php/templates/language/ru_ru.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/front/php/templates/language/ru_ru.json b/front/php/templates/language/ru_ru.json index 4d6b1732..eb2acbe3 100644 --- a/front/php/templates/language/ru_ru.json +++ b/front/php/templates/language/ru_ru.json @@ -27,7 +27,7 @@ "AppEvents_ObjectType": "Тип объекта", "AppEvents_Plugin": "Плагин", "AppEvents_Type": "Тип", - "BACKEND_API_URL_description": "Используется для обеспечения связи между фронтендом и бэкендом. По умолчанию это значение установлено на /server и, как правило, не должно изменяться.", + "BACKEND_API_URL_description": "Используется для обеспечения связи между фронтендом и бэкендом. По умолчанию это значение установлено на /server и, как правило, не должно изменяться.", "BACKEND_API_URL_name": "URL-адрес серверного API", "BackDevDetail_Actions_Ask_Run": "Вы хотите выполнить действие?", "BackDevDetail_Actions_Not_Registered": "Действие не зарегистрировано:· ", From 6819f144488874c2aaa6270a6d4f301208c88d93 Mon Sep 17 00:00:00 2001 From: Arvuno Date: Wed, 3 Jun 2026 19:54:35 +0000 Subject: [PATCH 6/6] feat(server): optional default for get_setting_value get_setting_value returns an empty string when a key is not found, which forces every call site to remember to treat '' as a sentinel and provide its own fallback. The fallback is sometimes a hard-coded default and sometimes a different code path entirely, leading to inconsistent handling across the codebase. Add an optional argument that defaults to '' (preserves the existing behaviour for every call site) and is returned when the key is not present. New call sites can opt into a more meaningful default without changing the function's signature for existing callers. Refs #1626. --- server/helper.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/server/helper.py b/server/helper.py index bfbf65c1..59278b80 100755 --- a/server/helper.py +++ b/server/helper.py @@ -197,7 +197,7 @@ def get_setting(key): # ------------------------------------------------------------------------------- # Return setting value -def get_setting_value(key): +def get_setting_value(key, default=""): """ Retrieve a setting value from configuration. @@ -208,13 +208,16 @@ def get_setting_value(key): Args: key (str): The setting key to look up. + default (Any): Value to return when the key is not found. Defaults + to "" for backwards compatibility with call sites that already + treat an empty string as "missing". Returns: - Any: The Python-typed setting value, or an empty string if not found. + Any: The Python-typed setting value, or `default` if not found. """ - # Returns empty string if not found - value = "" + # Returns default if not found + value = default # lookup key in secondary cache if key in SETTINGS_SECONDARYCACHE: