Merge pull request #1649 from natecj/feat/adguard-export-plugin

feat: add adguard_export plugin
This commit is contained in:
Jokob @NetAlertX
2026-05-30 09:18:56 +10:00
committed by GitHub
5 changed files with 1555 additions and 2 deletions

View File

@@ -0,0 +1,140 @@
# adguard_export — NetAlertX Plugin
> **Direction:** NetAlertX → AdGuard Home
> Syncs known devices from the NetAlertX database to AdGuard Home as **persistent clients**, keeping device names, MAC addresses, and IP identifiers in sync.
---
## What it does
On every run the plugin:
1. Reads all (or only *known*) devices from the NetAlertX database.
2. Fetches the current list of persistent clients from AdGuard Home via its REST API.
3. **Adds** clients that are in NetAlertX but not yet in AdGuard Home.
4. **Updates** clients whose name, identifiers, or device-type tag have changed.
5. Optionally **deletes** clients that have been removed from NetAlertX (see `DELETE` setting).
Device types set in NetAlertX (e.g. `Smartphone`, `Laptop`, `NAS`) are automatically mapped to the corresponding AdGuard Home `device_*` tags (e.g. `device_phone`, `device_laptop`, `device_nas`).
---
## Requirements
| Requirement | Notes |
|---|---|
| AdGuard Home | v0.107+ (REST API must be enabled) |
| Python packages | `requests`, `pytz` — already present in the NetAlertX container |
| AdGuard credentials | A user account with permission to manage clients |
---
## Installation
1. Copy the `adguard_export/` folder into `/app/front/plugins/` inside your NetAlertX container (or mount it as a volume).
2. Restart NetAlertX so the plugin is discovered.
3. Open **Settings → Plugins → AdGuard (Device Export)** and configure the settings below.
---
## Settings
| Setting key | Default | Description |
|---|---|---|
| `ADGUARDEXP_RUN` | `disabled` | When to run: `disabled`, `once`, or `schedule` |
| `ADGUARDEXP_RUN_SCHD` | `0 * * * *` | Cron schedule (default: hourly) |
| `ADGUARDEXP_URL` | `http://localhost:3000` | Base URL of AdGuard Home web UI |
| `ADGUARDEXP_USER` | `admin` | AdGuard Home username |
| `ADGUARDEXP_PASSWORD` | *(empty)* | AdGuard Home password |
| `ADGUARDEXP_VERIFYSSL` | `true` | Verify TLS cert; set `false` for self-signed certs |
| `ADGUARDEXP_INCLUDE_OFFLINE` | `true` | When `true`, devices not seen in the last scan are still exported |
| `ADGUARDEXP_INCLUDE_NEW` | `false` | When `false`, devices flagged as new/unknown are excluded until identified |
| `ADGUARDEXP_USEMAC` | `true` | Use MAC address as primary client identifier; falls back to IP |
| `ADGUARDEXP_DELETE` | `false` | ⚠ Delete AdGuard clients no longer present in NetAlertX |
---
## AdGuard Home client identifiers
AdGuard Home identifies a client by one or more **ids**, which can be:
- A MAC address (e.g. `aa:bb:cc:dd:ee:ff`)
- An IP address (e.g. `192.168.1.42`)
- A CIDR range
- A ClientID string
When `ADGUARDEXP_USEMAC=true`, the plugin prefers the device's MAC address and includes the last known IP as a secondary identifier. When `ADGUARDEXP_USEMAC=false`, only the IP address is used.
---
## Device type tags
The plugin maps NetAlertX device types to valid AdGuard Home `device_*` tags automatically:
| NetAlertX type | AdGuard tag |
|---|---|
| Smartphone, Phone, Mobile | `device_phone` |
| Laptop, Notebook | `device_laptop` |
| Desktop, Server, Hypervisor | `device_pc` |
| Tablet | `device_tablet` |
| Smart TV, SmartTV, TV | `device_tv` |
| NAS | `device_nas` |
| Printer | `device_printer` |
| IP Camera, Camera | `device_camera` |
| Game Console | `device_gameconsole` |
| Speaker, Assistant, Virtual Assistance | `device_audio` |
| AP, Gateway, Router, House Appliance | `device_other` |
Devices with an unrecognised or empty type are exported without a tag.
---
## Safe deletion
When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously created — it will never delete clients you added manually in AdGuard Home. Ownership is tracked in a local state file at:
```text
/app/db/state.ADGUARDEXP.json
```
---
## Logs
Plugin logs are written to:
```text
/tmp/log/plugins/script.ADGUARDEXP.log
```
Result rows (used by the NetAlertX UI) are written to:
```text
/tmp/log/plugins/last_result.ADGUARDEXP.log
```
---
## Troubleshooting
| Symptom | Likely cause |
|---|---|
| `Connection failed` in logs | Wrong `ADGUARDEXP_URL` or AdGuard Home is unreachable from the NetAlertX container |
| `HTTP error: 401` | Wrong username / password |
| `HTTP error: 400` | Client already exists with conflicting ids — check AdGuard Home for duplicate entries |
| Devices not appearing | `ADGUARDEXP_INCLUDE_NEW=false` and devices are flagged as new/unknown; identify them in NetAlertX first |
| SSL errors | Set `ADGUARDEXP_VERIFYSSL=false` for self-signed certificates |
---
## Related plugins
- **adguard_import** — the reverse direction: imports devices *from* AdGuard Home *into* NetAlertX.
---
### Other info
- Version: 1.0.0
- Maintainer: [natecj](https://github.com/natecj)
- Release Date: 10-May-2026

View File

@@ -0,0 +1,542 @@
{
"code_name": "adguard_export",
"unique_prefix": "ADGUARDEXP",
"plugin_type": "other",
"execution_order": "Layer_0",
"enabled": true,
"data_source": "script",
"localized": ["display_name", "description", "icon"],
"display_name": [
{
"language_code": "en_us",
"string": "AdGuard (Device Export)"
}
],
"description": [
{
"language_code": "en_us",
"string": "Exports known devices from NetAlertX to AdGuard as persistent clients, keeping names and IP/MAC identifiers in sync."
}
],
"icon": [
{
"language_code": "en_us",
"string": "<i class=\"fa fa-shield\"></i>"
}
],
"timeout": 120,
"params": [],
"settings": [
{
"function": "RUN",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "disabled",
"options": ["disabled", "once", "schedule"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "When to run"
}
],
"description": [
{
"language_code": "en_us",
"string": "Enable or schedule the export. Use 'schedule' together with the RUN_SCHD setting."
}
]
},
{
"function": "RUN_SCHD",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "0 * * * *",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Schedule"
}
],
"description": [
{
"language_code": "en_us",
"string": "Cron expression for how often to run. Default: every hour on the hour."
}
]
},
{
"function": "CMD",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "python3 /app/front/plugins/adguard_export/script.py",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Command"
}
],
"description": [
{
"language_code": "en_us",
"string": "Command to execute. Do not change unless you know what you are doing."
}
]
},
{
"function": "URL",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "http://localhost:3000",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "AdGuard Home URL"
}
],
"description": [
{
"language_code": "en_us",
"string": "Base URL of your AdGuard Home web interface, e.g. http://192.168.1.1:3000"
}
]
},
{
"function": "USER",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "admin",
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "AdGuard Home Username"
}
],
"description": [
{
"language_code": "en_us",
"string": "Username for AdGuard Home basic authentication."
}
]
},
{
"function": "PASSWORD",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "input",
"elementOptions": [{ "type": "password" }],
"transformers": []
}
]
},
"maxLength": 200,
"default_value": "",
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "AdGuard Home Password"
}
],
"description": [
{
"language_code": "en_us",
"string": "Password for AdGuard Home basic authentication."
}
]
},
{
"function": "VERIFYSSL",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "true",
"options": ["true", "false"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Verify SSL Certificate"
}
],
"description": [
{
"language_code": "en_us",
"string": "Set to 'false' to skip TLS certificate verification (useful for self-signed certs)."
}
]
},
{
"function": "INCLUDE_OFFLINE",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "true",
"options": ["true", "false"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Include offline devices"
}
],
"description": [
{
"language_code": "en_us",
"string": "When 'true', devices not seen in the last scan are still exported to AdGuard Home."
}
]
},
{
"function": "INCLUDE_NEW",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "false",
"options": ["true", "false"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Include new/unknown devices"
}
],
"description": [
{
"language_code": "en_us",
"string": "When 'true', devices flagged as new/unknown are also exported. When 'false', only identified devices are exported."
}
]
},
{
"function": "USEMAC",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "true",
"options": ["true", "false"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Use MAC address as identifier"
}
],
"description": [
{
"language_code": "en_us",
"string": "When 'true', MAC addresses are used as the primary client identifier in AdGuard Home. Falls back to IP when no MAC is available."
}
]
},
{
"function": "DELETE",
"type": {
"dataType": "string",
"elements": [
{
"elementType": "select",
"elementOptions": [],
"transformers": []
}
]
},
"default_value": "false",
"options": ["true", "false"],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Delete clients missing from NetAlertX"
}
],
"description": [
{
"language_code": "en_us",
"string": "Caution: When 'true', AdGuard Home clients that were previously exported by this plugin but are no longer present in NetAlertX will be deleted. Ownership is tracked in a local state file — manually added AdGuard clients are never deleted."
}
]
}
],
"database_column_definitions": [
{
"column": "Object_PrimaryID",
"mapped_to_column": "cur_MAC",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"filter_info": {
"type": "MAC",
"name": "MAC",
"source": "table"
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "MAC / ID"
}
]
},
{
"column": "Object_SecondaryID",
"mapped_to_column": "cur_IP",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "IP Address"
}
]
},
{
"column": "Watched_Value1",
"mapped_to_column": "cur_Name",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Device Name"
}
]
},
{
"column": "Watched_Value2",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "MAC Address"
}
]
},
{
"column": "Watched_Value3",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Last IP"
}
]
},
{
"column": "Watched_Value4",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "AdGuard URL"
}
]
},
{
"column": "Dummy",
"mapped_to_column": "scanSourcePlugin",
"mapped_to_column_data": {
"value": "adguard_export"
},
"css_classes": "col-sm-2",
"show": false,
"type": "label",
"default_value": "",
"options": [],
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "ADGUARDEXP"
}
]
},
{
"column": "Extra",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [
{
"option": "cssClass",
"value": "text-muted small"
}
],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Status"
}
]
},
{
"column": "ForeignKey",
"mapped_to_column": "dev_MAC",
"show": true,
"type": {
"dataType": "string",
"elements": [
{
"elementType": "label",
"elementOptions": [],
"transformers": []
}
]
},
"localized": ["name"],
"name": [
{
"language_code": "en_us",
"string": "Device link"
}
]
}
]
}

View File

@@ -0,0 +1,480 @@
#!/usr/bin/env python
# adguard_export/script.py
#
# NetAlertX plugin: adguard_export
# Syncs known devices from the NetAlertX database to AdGuard Home as
# persistent clients, keeping names, MACs, and IP addresses in sync.
#
# AdGuard Home API reference:
# GET /control/clients list all persistent clients
# POST /control/clients/add create a new persistent client
# POST /control/clients/update update an existing persistent client
# POST /control/clients/delete remove a persistent client
import os
import sys
import json
import requests
from pytz import timezone
from typing import Dict, List, Optional, Set, Tuple
# Define the installation path and extend the system path for plugin imports
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from const import dataPath, logPath # noqa: E402, E261
from plugin_helper import Plugin_Objects # noqa: E402, E261
from logger import mylog, Logger # noqa: E402, E261
from helper import get_setting_value # noqa: E402, E261
from models.device_instance import DeviceInstance # noqa: E402, E261
import conf # noqa: E402, E261
# ----------------------------
# Plugin metadata
# ----------------------------
pluginName = "ADGUARDEXP"
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value("TIMEZONE"))
# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
# Define paths
LOG_PATH = logPath + "/plugins"
RESULT_FILE = os.path.join(LOG_PATH, f"last_result.{pluginName}.log")
STATE_FILE = os.path.join(dataPath, f"state.{pluginName}.json")
plugin_objects = Plugin_Objects(RESULT_FILE)
def load_managed_names() -> Set[str]:
"""Return the set of AdGuard client names we previously added."""
try:
with open(STATE_FILE) as f:
return set(json.load(f).get("managed", []))
except (FileNotFoundError, json.JSONDecodeError):
return set()
def save_managed_names(names: Set[str]) -> None:
with open(STATE_FILE, "w") as f:
json.dump({"managed": sorted(names)}, f, indent=2)
# ---------------------------------------------------------------------------
# Device type → AdGuard tag mapping
# ---------------------------------------------------------------------------
_TYPE_TAG_MAP: Dict[str, str] = {
"ap": "device_other",
"desktop": "device_pc",
"game console": "device_gameconsole",
"gameconsole": "device_gameconsole",
"gateway": "device_other",
"house appliance": "device_other",
"hypervisor": "device_pc",
"ip camera": "device_camera",
"camera": "device_camera",
"laptop": "device_laptop",
"notebook": "device_laptop",
"nas": "device_nas",
"printer": "device_printer",
"router": "device_other",
"server": "device_pc",
"smarttv": "device_tv",
"smart tv": "device_tv",
"tv": "device_tv",
"smartphone": "device_phone",
"phone": "device_phone",
"mobile": "device_phone",
"smartwatch": "device_phone",
"watch": "device_phone",
"tablet": "device_tablet",
"virtual assistance": "device_audio",
"assistant": "device_audio",
"speaker": "device_audio",
}
def device_type_to_tag(dev_type: str) -> str:
"""Map a NetAlertX devType string to a valid AdGuard Home tag, or ''."""
if not dev_type:
return ""
key = dev_type.strip().lower()
if key in _TYPE_TAG_MAP:
return _TYPE_TAG_MAP[key]
# Substring fallback for partial matches
for pattern, tag in _TYPE_TAG_MAP.items():
if pattern in key:
return tag
return ""
# ---------------------------------------------------------------------------
# AdGuard Home client
# ---------------------------------------------------------------------------
class AdGuardClient:
"""Thin wrapper around the AdGuard Home /control/clients* API."""
def __init__(self, base_url: str, username: str, password: str, verify_ssl: bool = True):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self.verify_ssl = verify_ssl
self.session = requests.Session()
self.session.auth = self.auth
def _url(self, path: str) -> str:
return f"{self.base_url}/control/{path.lstrip('/')}"
def get_clients(self) -> List[dict]:
"""Return the list of persistent (manually added) clients."""
resp = self.session.get(self._url("clients"), verify=self.verify_ssl, timeout=15)
resp.raise_for_status()
return resp.json().get("clients") or []
def add_client(self, client: dict) -> None:
resp = self.session.post(
self._url("clients/add"),
json=client,
verify=self.verify_ssl,
timeout=15,
)
resp.raise_for_status()
def update_client(self, old_name: str, client: dict) -> None:
payload = {"name": old_name, "data": client}
resp = self.session.post(
self._url("clients/update"),
json=payload,
verify=self.verify_ssl,
timeout=15,
)
resp.raise_for_status()
def delete_client(self, name: str) -> None:
resp = self.session.post(
self._url("clients/delete"),
json={"name": name},
verify=self.verify_ssl,
timeout=15,
)
resp.raise_for_status()
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def get_netalertx_devices(include_offline: bool, include_new: bool) -> List[dict]:
"""
Return filtered devices from NetAlertX using the DeviceInstance model.
Fields returned per device: mac, name, last_ip, dev_type
"""
devices = []
try:
for d in DeviceInstance().getAll():
if d.get("devIsArchived", 0):
continue
if not include_offline and not d.get("devPresentLastScan", 1):
continue
if not include_new and d.get("devIsNew", 0):
continue
mac = (d.get("devMac", "") or "").strip()
last_ip = (d.get("devLastIP", "") or "").strip()
name = (d.get("devName", "") or "").strip()
dev_type = (d.get("devType", "") or "").strip()
if not mac and not last_ip:
continue
if not name:
name = mac or last_ip
devices.append({"mac": mac, "name": name, "last_ip": last_ip, "dev_type": dev_type})
except Exception as exc:
mylog("verbose", [f"[{pluginName}] ERROR reading devices: {exc}"])
return devices
# ---------------------------------------------------------------------------
# Sync logic
# ---------------------------------------------------------------------------
def build_agrd_client(device: dict, use_mac: bool) -> dict:
"""
Build an AdGuard Home client object from a NetAlertX device row.
AdGuard Home identifies a client by its 'ids' list, which may contain
MACs, IPs, CIDRs, or ClientIDs. We prefer MAC when available; fall
back to IP otherwise.
"""
ids = []
if use_mac and device["mac"] and device["mac"] not in ("", "00:00:00:00:00:00"):
ids.append(device["mac"].lower())
if device["last_ip"] and device["last_ip"] not in ("", "0.0.0.0"):
ids.append(device["last_ip"])
if not ids:
return {} # nothing useful to identify the device
tag = device_type_to_tag(device.get("dev_type", ""))
return {
"name": device["name"],
"ids": ids,
"tags": [tag] if tag else [],
"use_global_settings": True,
"use_global_blocked_services": True,
"filtering_enabled": False,
"parental_enabled": False,
"safebrowsing_enabled": False,
"safesearch_enabled": False,
"blocked_services": [],
"upstreams": [],
}
def sync_to_adguard(
agrd: AdGuardClient,
devices: List[dict],
use_mac: bool,
delete_missing: bool,
existing_clients: Optional[List[dict]] = None,
) -> Tuple[int, int, int, int]:
"""
Core sync routine. Returns (added, updated, skipped, deleted).
Pass existing_clients to reuse a list already fetched (avoids a second
round-trip when the caller performed a connectivity check first).
"""
if existing_clients is None:
existing_clients = agrd.get_clients()
mylog("verbose", [f"[{pluginName}] AdGuard Home currently has {len(existing_clients)} persistent client(s)."])
# Build a lookup: identifier → client dict
existing_by_id: Dict[str, dict] = {}
for client in existing_clients:
for cid in client.get("ids", []):
existing_by_id[cid.lower()] = client
# Also index by name for update / delete operations (warn if AdGuard has duplicate names)
existing_by_name: Dict[str, dict] = {}
for c in existing_clients:
if c["name"] in existing_by_name:
mylog("verbose", [f"[{pluginName}] WARNING duplicate client name in AdGuard Home: {c['name']!r}"])
existing_by_name[c["name"]] = c
# Load the set of client names we've previously added so that DELETE mode
# only removes clients we created, not manually-added ones.
managed_names = load_managed_names()
added = updated = skipped = deleted = 0
# ----- add / update -----
for device in devices:
client_data = build_agrd_client(device, use_mac)
if not client_data:
if not use_mac and not device["last_ip"]:
reason = "no IP address (USEMAC is disabled, IP required)"
else:
reason = "no usable MAC or IP"
mylog("verbose", [f"[{pluginName}] SKIP {device['name']!r} {reason}"])
skipped += 1
continue
# Check whether any of the ids already exist in AdGuard
existing = None
for cid in client_data["ids"]:
if cid.lower() in existing_by_id:
existing = existing_by_id[cid.lower()]
break
if existing is None and device["name"] in managed_names:
# Fall back to name match only for clients we previously added — avoids
# accidentally matching a manually-created AdGuard client with the same name.
existing = existing_by_name.get(device["name"])
if existing:
mylog("verbose", [f"[{pluginName}] WARN matched {device['name']!r} by name (no ID match) — verify no duplicate clients"])
if existing:
old_name = existing["name"]
# Preserve existing per-client AdGuard settings; we only manage name, ids, tags.
_our_keys = frozenset(("name", "ids", "tags"))
merged_data = {**client_data, **{k: v for k, v in existing.items() if k not in _our_keys}}
# Only call update when something actually changed to avoid noise
if (
sorted(i.lower() for i in existing.get("ids", [])) != sorted(i.lower() for i in client_data["ids"])
or existing.get("name") != client_data["name"]
or sorted(existing.get("tags", [])) != sorted(client_data["tags"])
):
try:
agrd.update_client(old_name, merged_data)
mylog("verbose", [f"[{pluginName}] UPDATE {old_name!r}{device['name']!r} ids={client_data['ids']}"])
# Only track the rename for clients we already own — never adopt a manually-created client.
if old_name in managed_names:
managed_names.discard(old_name)
managed_names.add(device["name"])
updated += 1
except requests.HTTPError as exc:
mylog("verbose", [f"[{pluginName}] ERROR updating {device['name']!r}: {exc}"])
skipped += 1
else:
mylog("verbose", [f"[{pluginName}] SKIP (no change) {device['name']!r}"])
# No managed_names update: if we created this client it's already in the state
# file; if it's a manually-created client we must not claim ownership of it.
skipped += 1
else:
try:
agrd.add_client(client_data)
mylog("verbose", [f"[{pluginName}] ADD {device['name']!r} ids={client_data['ids']}"])
managed_names.add(device["name"])
added += 1
except requests.HTTPError as exc:
mylog("verbose", [f"[{pluginName}] ERROR adding {device['name']!r}: {exc}"])
skipped += 1
# ----- optional delete of AdGuard clients no longer in NetAlertX -----
if delete_missing:
export_names = {d["name"] for d in devices}
for client in existing_clients:
cname = client.get("name", "")
# Only delete clients that we previously added (tracked in state file)
# so we don't accidentally remove manually-added clients.
if cname in managed_names and cname not in export_names:
try:
agrd.delete_client(cname)
mylog("verbose", [f"[{pluginName}] DELETE {cname!r} (no longer in NetAlertX)"])
managed_names.discard(cname)
deleted += 1
except requests.HTTPError as exc:
mylog("verbose", [f"[{pluginName}] ERROR deleting {cname!r}: {exc}"])
save_managed_names(managed_names)
return added, updated, skipped, deleted
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
mylog("verbose", [f"[{pluginName}] In script"])
# ------------------------------------------------------------------
# Read settings
# ------------------------------------------------------------------
agrd_url = get_setting_value("ADGUARDEXP_URL") or "http://localhost:3000"
agrd_user = get_setting_value("ADGUARDEXP_USER") or ""
agrd_pass = get_setting_value("ADGUARDEXP_PASSWORD") or ""
verify_ssl_str = get_setting_value("ADGUARDEXP_VERIFYSSL") or "true"
include_offline_str = get_setting_value("ADGUARDEXP_INCLUDE_OFFLINE") or "true"
include_new_str = get_setting_value("ADGUARDEXP_INCLUDE_NEW") or "false"
use_mac_str = get_setting_value("ADGUARDEXP_USEMAC") or "true"
delete_str = get_setting_value("ADGUARDEXP_DELETE") or "false"
verify_ssl = verify_ssl_str.strip().lower() not in ("false", "0", "no")
include_offline = include_offline_str.strip().lower() not in ("false", "0", "no")
include_new = include_new_str.strip().lower() not in ("false", "0", "no")
use_mac = use_mac_str.strip().lower() not in ("false", "0", "no")
delete_miss = delete_str.strip().lower() not in ("false", "0", "no")
mylog("verbose", [f"[{pluginName}] " + ("=" * 60)])
mylog("verbose", [f"[{pluginName}] AdGuard Home URL : {agrd_url}"])
mylog("verbose", [f"[{pluginName}] Include offline devs: {include_offline}"])
mylog("verbose", [f"[{pluginName}] Include new/unknown : {include_new}"])
mylog("verbose", [f"[{pluginName}] Use MAC as id : {use_mac}"])
mylog("verbose", [f"[{pluginName}] Delete missing : {delete_miss}"])
mylog("verbose", [f"[{pluginName}] " + ("=" * 60)])
# ------------------------------------------------------------------
# Load devices from NetAlertX
# ------------------------------------------------------------------
devices = get_netalertx_devices(include_offline, include_new)
mylog("verbose", [f"[{pluginName}] Loaded {len(devices)} device(s) from NetAlertX database."])
if not devices:
mylog("verbose", ["No devices to sync exiting."])
plugin_objects.add_object(
primaryId = "adguard_export",
secondaryId = "summary",
watched1 = "0",
watched2 = "0",
watched3 = "0",
watched4 = "0",
extra = "No devices found in NetAlertX",
)
plugin_objects.write_result_file()
return
# ------------------------------------------------------------------
# Connect to AdGuard Home and sync
# ------------------------------------------------------------------
try:
agrd = AdGuardClient(agrd_url, agrd_user, agrd_pass, verify_ssl)
existing_clients = agrd.get_clients()
except requests.exceptions.ConnectionError as exc:
mylog("verbose", [f"[{pluginName}] ERROR cannot connect to AdGuard Home at {agrd_url}: {exc}"])
plugin_objects.add_object(
primaryId = "adguard_export",
secondaryId = "error",
extra = f"Connection failed: {exc}",
)
plugin_objects.write_result_file()
return
except requests.HTTPError as exc:
mylog("verbose", [f"[{pluginName}] ERROR AdGuard Home returned an HTTP error: {exc}"])
plugin_objects.add_object(
primaryId = "adguard_export",
secondaryId = "error",
extra = f"HTTP error: {exc}",
)
plugin_objects.write_result_file()
return
except Exception as exc:
mylog("verbose", [f"[{pluginName}] ERROR AdGuard Home returned an unknown error: {exc}"])
plugin_objects.add_object(
primaryId = "adguard_export",
secondaryId = "error",
extra = f"Unknown error: {exc}",
)
plugin_objects.write_result_file()
return
added, updated, skipped, deleted = sync_to_adguard(
agrd, devices, use_mac, delete_miss, existing_clients=existing_clients
)
summary = (
f"Sync complete added={added} updated={updated} "
f"skipped={skipped} deleted={deleted}"
)
# ------------------------------------------------------------------
# Write plugin result (one summary row + one row per touched device)
# ------------------------------------------------------------------
plugin_objects.add_object(
primaryId = "adguard_export",
secondaryId = "summary",
watched1 = str(added),
watched2 = str(updated),
watched3 = str(skipped),
watched4 = str(deleted),
extra = summary,
)
for device in devices:
plugin_objects.add_object(
primaryId = device["mac"] or device["last_ip"],
secondaryId = device["last_ip"],
watched1 = device["name"],
watched2 = device["mac"],
watched3 = device["last_ip"],
watched4 = agrd_url,
extra = "exported",
foreignKey = device["mac"] or "",
)
mylog("verbose", [f"[{pluginName}] {summary}"])
plugin_objects.write_result_file()
return
if __name__ == "__main__":
main()

View File

@@ -26,7 +26,7 @@
"description": [
{
"language_code": "en_us",
"string": "Plugin to import devices from AdGuard."
"string": "Imports known devices from AdGuard to NetAlertX as persistent devices, keeping names and IP/MAC identifiers in sync."
}
],
"icon": [
@@ -36,7 +36,7 @@
}
],
"params": [],
"settings": [
"settings": [
{
"function": "RUN",
"events": ["run"],

View File

@@ -0,0 +1,391 @@
"""
Tests for adguard_export/script.py
Run from inside the NetAlertX container (where the full environment is available),
or locally — in that case the NetAlertX-specific modules are stubbed out
automatically before the script is imported.
pytest test/plugins/test_adguard_export.py -v
"""
import json
import os
import sys
import tempfile
import types
from unittest.mock import MagicMock, call, patch
import pytest
# ---------------------------------------------------------------------------
# Stub NetAlertX-specific modules so tests can run outside the container.
# sys.modules.setdefault() is a no-op when the real module is already loaded,
# so this is safe to run inside the container too.
# ---------------------------------------------------------------------------
_tmp_log = tempfile.mkdtemp()
def _stub(name: str, **attrs):
if name not in sys.modules:
mod = types.ModuleType(name)
for k, v in attrs.items():
setattr(mod, k, v)
sys.modules[name] = mod
_stub("pytz", timezone=lambda tz: tz)
_stub("conf")
_stub("const", dataPath=_tmp_log, logPath=_tmp_log, fullDbPath=os.path.join(_tmp_log, "test.db"))
_stub("plugin_helper", Plugin_Objects=MagicMock)
_stub("logger", mylog=lambda *a: None, Logger=MagicMock)
_stub("helper", get_setting_value=lambda k: "")
_stub("models", )
_stub("models.device_instance", DeviceInstance=MagicMock)
# Stub requests only when it isn't installed (e.g. bare system Python locally).
# In the container and CI, the real package is present and will be used.
if "requests" not in sys.modules:
_req = types.ModuleType("requests")
_req.Session = MagicMock
_req.HTTPError = type("HTTPError", (Exception,), {})
_req_exc = types.ModuleType("requests.exceptions")
_req_exc.ConnectionError = type("ConnectionError", (Exception,), {})
_req.exceptions = _req_exc
sys.modules["requests"] = _req
sys.modules["requests.exceptions"] = _req_exc
# ---------------------------------------------------------------------------
# Import the functions under test (must come after the stubs above).
# ---------------------------------------------------------------------------
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "front", "plugins", "adguard_export"))
from script import ( # noqa: E402
AdGuardClient,
_TYPE_TAG_MAP,
build_agrd_client,
device_type_to_tag,
get_netalertx_devices,
load_managed_names,
save_managed_names,
sync_to_adguard,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _raw_device(**overrides) -> dict:
"""Build a raw DeviceInstance.getAll() style dict."""
base = {
"devMac": "AA:BB:CC:00:00:01",
"devName": "PC",
"devLastIP": "10.0.0.1",
"devType": "desktop",
"devIsArchived": 0,
"devPresentLastScan": 1,
"devIsNew": 0,
}
return {**base, **overrides}
def _mock_agrd(existing=None) -> MagicMock:
"""Return a mock AdGuardClient whose get_clients() returns *existing*."""
agrd = MagicMock(spec=AdGuardClient)
agrd.get_clients.return_value = existing or []
return agrd
# ---------------------------------------------------------------------------
# device_type_to_tag
# ---------------------------------------------------------------------------
class TestDeviceTypeToTag:
def test_empty_string_returns_empty(self):
assert device_type_to_tag("") == ""
def test_none_returns_empty(self):
assert device_type_to_tag(None) == ""
def test_exact_match_case_insensitive(self):
assert device_type_to_tag("Smartphone") == "device_phone"
assert device_type_to_tag("LAPTOP") == "device_laptop"
assert device_type_to_tag("nas") == "device_nas"
def test_substring_fallback(self):
# "gaming smartphone" contains "smartphone"
assert device_type_to_tag("gaming smartphone") == "device_phone"
def test_unknown_type_returns_empty(self):
assert device_type_to_tag("toaster") == ""
def test_all_map_values_are_valid_adguard_tags(self):
valid_prefixes = ("device_", "ct_", "os_")
for tag in _TYPE_TAG_MAP.values():
assert any(tag.startswith(p) for p in valid_prefixes), (
f"{tag!r} is not a valid AdGuard Home tag"
)
# ---------------------------------------------------------------------------
# build_agrd_client
# ---------------------------------------------------------------------------
class TestBuildAgrdClient:
def _device(self, **overrides) -> dict:
base = {"mac": "AA:BB:CC:DD:EE:FF", "name": "My PC", "last_ip": "192.168.1.10", "dev_type": "desktop"}
return {**base, **overrides}
def test_mac_and_ip_both_included_when_use_mac_true(self):
result = build_agrd_client(self._device(), use_mac=True)
assert "aa:bb:cc:dd:ee:ff" in result["ids"]
assert "192.168.1.10" in result["ids"]
def test_only_ip_when_use_mac_false(self):
result = build_agrd_client(self._device(), use_mac=False)
assert result["ids"] == ["192.168.1.10"]
def test_returns_empty_dict_when_no_usable_id(self):
result = build_agrd_client(
{"mac": "", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""},
use_mac=True,
)
assert result == {}
def test_null_mac_falls_back_to_ip(self):
result = build_agrd_client(
{"mac": "00:00:00:00:00:00", "name": "Dev", "last_ip": "10.0.0.5", "dev_type": ""},
use_mac=True,
)
assert result["ids"] == ["10.0.0.5"]
def test_device_type_tag_applied(self):
result = build_agrd_client(self._device(dev_type="smartphone"), use_mac=True)
assert result["tags"] == ["device_phone"]
def test_unknown_device_type_produces_no_tag(self):
result = build_agrd_client(self._device(dev_type=""), use_mac=True)
assert result["tags"] == []
def test_mac_is_lowercased(self):
result = build_agrd_client(self._device(mac="AA:BB:CC:DD:EE:FF"), use_mac=True)
assert "aa:bb:cc:dd:ee:ff" in result["ids"]
# ---------------------------------------------------------------------------
# load_managed_names / save_managed_names
# ---------------------------------------------------------------------------
class TestManagedNames:
def test_round_trip(self, tmp_path):
state = tmp_path / "state.json"
with patch("script.STATE_FILE", str(state)):
save_managed_names({"alpha", "beta", "gamma"})
loaded = load_managed_names()
assert loaded == {"alpha", "beta", "gamma"}
def test_missing_file_returns_empty_set(self, tmp_path):
with patch("script.STATE_FILE", str(tmp_path / "nonexistent.json")):
assert load_managed_names() == set()
def test_corrupt_file_returns_empty_set(self, tmp_path):
state = tmp_path / "state.json"
state.write_text("not valid json")
with patch("script.STATE_FILE", str(state)):
assert load_managed_names() == set()
def test_save_sorts_names(self, tmp_path):
state = tmp_path / "state.json"
with patch("script.STATE_FILE", str(state)):
save_managed_names({"zebra", "apple", "mango"})
data = json.loads(state.read_text())
assert data["managed"] == ["apple", "mango", "zebra"]
# ---------------------------------------------------------------------------
# get_netalertx_devices
# ---------------------------------------------------------------------------
class TestGetNetalertxDevices:
def _call(self, rows, include_offline=True, include_new=True):
with patch("script.DeviceInstance") as mock_di:
mock_di.return_value.getAll.return_value = rows
return get_netalertx_devices(include_offline=include_offline, include_new=include_new)
def test_basic_query(self):
result = self._call([_raw_device()])
assert len(result) == 1
assert result[0]["name"] == "PC"
assert result[0]["mac"] == "AA:BB:CC:00:00:01"
def test_archived_devices_excluded(self):
result = self._call([
_raw_device(devMac="AA:00:00:00:00:01", devName="Active", devIsArchived=0),
_raw_device(devMac="AA:00:00:00:00:02", devName="Archived", devIsArchived=1),
])
assert len(result) == 1
assert result[0]["name"] == "Active"
def test_offline_excluded_when_flag_false(self):
result = self._call([
_raw_device(devMac="AA:00:00:00:00:01", devName="Online", devPresentLastScan=1),
_raw_device(devMac="AA:00:00:00:00:02", devName="Offline", devPresentLastScan=0),
], include_offline=False)
assert len(result) == 1
assert result[0]["name"] == "Online"
def test_new_devices_excluded_when_flag_false(self):
result = self._call([
_raw_device(devMac="AA:00:00:00:00:01", devName="Known", devIsNew=0),
_raw_device(devMac="AA:00:00:00:00:02", devName="Unknown", devIsNew=1),
], include_new=False)
assert len(result) == 1
assert result[0]["name"] == "Known"
def test_nameless_device_falls_back_to_mac(self):
result = self._call([_raw_device(devMac="BB:CC:DD:EE:FF:00", devName="", devLastIP="10.0.0.5")])
assert result[0]["name"] == "BB:CC:DD:EE:FF:00"
def test_row_with_no_mac_and_no_ip_skipped(self):
result = self._call([_raw_device(devMac="", devName="Ghost", devLastIP="")])
assert result == []
def test_exception_returns_empty_list(self):
with patch("script.DeviceInstance") as mock_di:
mock_di.return_value.getAll.side_effect = Exception("db error")
assert get_netalertx_devices(True, True) == []
# ---------------------------------------------------------------------------
# sync_to_adguard
# ---------------------------------------------------------------------------
class TestSyncToAdguard:
def _device(self, name="PC", mac="AA:BB:CC:00:00:01", ip="10.0.0.1", dev_type="desktop") -> dict:
return {"mac": mac, "name": name, "last_ip": ip, "dev_type": dev_type}
def test_new_device_is_added(self, tmp_path):
agrd = _mock_agrd(existing=[])
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
added, updated, skipped, deleted = sync_to_adguard(
agrd, [self._device()], use_mac=True, delete_missing=False
)
assert added == 1
assert updated == skipped == deleted == 0
agrd.add_client.assert_called_once()
def test_unchanged_device_is_skipped(self, tmp_path):
existing = [{"name": "PC", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
added, updated, skipped, deleted = sync_to_adguard(
agrd, [self._device()], use_mac=True, delete_missing=False
)
assert skipped == 1
assert added == updated == deleted == 0
agrd.update_client.assert_not_called()
def test_renamed_device_is_updated(self, tmp_path):
existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
added, updated, skipped, deleted = sync_to_adguard(
agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False
)
assert updated == 1
agrd.update_client.assert_called_once_with("Old Name", agrd.update_client.call_args[0][1])
def test_missing_device_deleted_when_flag_true(self, tmp_path):
state = tmp_path / "state.json"
state.write_text(json.dumps({"managed": ["Gone Device"]}))
existing = [{"name": "Gone Device", "ids": ["10.0.0.99"], "tags": []}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(state)):
added, updated, skipped, deleted = sync_to_adguard(
agrd, [], use_mac=True, delete_missing=True
)
assert deleted == 1
agrd.delete_client.assert_called_once_with("Gone Device")
def test_unmanaged_device_not_deleted(self, tmp_path):
# State file is empty — we never added this client
state = tmp_path / "state.json"
state.write_text(json.dumps({"managed": []}))
existing = [{"name": "Manual Client", "ids": ["10.0.0.50"], "tags": []}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(state)):
sync_to_adguard(agrd, [], use_mac=True, delete_missing=True)
agrd.delete_client.assert_not_called()
def test_manual_client_matched_by_id_not_adopted(self, tmp_path):
# A manually-created AdGuard client whose IP matches a NetAlertX device
# must not be added to managed_names — so DELETE=true won't touch it later.
state = tmp_path / "state.json"
state.write_text(json.dumps({"managed": []}))
existing = [{"name": "Manual Client", "ids": ["10.0.0.5"], "tags": []}]
agrd = _mock_agrd(existing=existing)
device = {"mac": "", "name": "Manual Client", "last_ip": "10.0.0.5", "dev_type": ""}
with patch("script.STATE_FILE", str(state)):
sync_to_adguard(agrd, [device], use_mac=True, delete_missing=True)
loaded = load_managed_names()
assert "Manual Client" not in loaded
agrd.delete_client.assert_not_called()
def test_device_with_no_usable_id_is_skipped(self, tmp_path):
agrd = _mock_agrd(existing=[])
device = {"mac": "00:00:00:00:00:00", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""}
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
added, updated, skipped, deleted = sync_to_adguard(
agrd, [device], use_mac=True, delete_missing=False
)
assert skipped == 1
agrd.add_client.assert_not_called()
def test_existing_clients_parameter_avoids_extra_api_call(self, tmp_path):
existing = []
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
sync_to_adguard(
agrd, [self._device()], use_mac=True, delete_missing=False,
existing_clients=existing,
)
agrd.get_clients.assert_not_called()
def test_rename_removes_old_name_from_managed_names(self, tmp_path):
state = tmp_path / "state.json"
state.write_text(json.dumps({"managed": ["Old Name"]}))
existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(state)):
sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False)
loaded = load_managed_names()
assert "Old Name" not in loaded
assert "New Name" in loaded
def test_update_preserves_custom_adguard_settings(self, tmp_path):
existing = [{
"name": "Old Name",
"ids": ["aa:bb:cc:00:00:01", "10.0.0.1"],
"tags": ["device_pc"],
"filtering_enabled": True,
"use_global_settings": False,
"parental_enabled": True,
"safebrowsing_enabled": False,
"safesearch_enabled": False,
"use_global_blocked_services": False,
"blocked_services": ["youtube.com"],
"upstreams": ["1.1.1.1"],
}]
agrd = _mock_agrd(existing=existing)
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False)
_, sent_payload = agrd.update_client.call_args[0]
assert sent_payload["filtering_enabled"] is True
assert sent_payload["use_global_settings"] is False
assert sent_payload["blocked_services"] == ["youtube.com"]
assert sent_payload["upstreams"] == ["1.1.1.1"]