mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-06-01 12:34:40 -04:00
Merge pull request #1649 from natecj/feat/adguard-export-plugin
feat: add adguard_export plugin
This commit is contained in:
140
front/plugins/adguard_export/README.md
Normal file
140
front/plugins/adguard_export/README.md
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
# adguard_export — NetAlertX Plugin
|
||||||
|
|
||||||
|
> **Direction:** NetAlertX → AdGuard Home
|
||||||
|
> Syncs known devices from the NetAlertX database to AdGuard Home as **persistent clients**, keeping device names, MAC addresses, and IP identifiers in sync.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## What it does
|
||||||
|
|
||||||
|
On every run the plugin:
|
||||||
|
|
||||||
|
1. Reads all (or only *known*) devices from the NetAlertX database.
|
||||||
|
2. Fetches the current list of persistent clients from AdGuard Home via its REST API.
|
||||||
|
3. **Adds** clients that are in NetAlertX but not yet in AdGuard Home.
|
||||||
|
4. **Updates** clients whose name, identifiers, or device-type tag have changed.
|
||||||
|
5. Optionally **deletes** clients that have been removed from NetAlertX (see `DELETE` setting).
|
||||||
|
|
||||||
|
Device types set in NetAlertX (e.g. `Smartphone`, `Laptop`, `NAS`) are automatically mapped to the corresponding AdGuard Home `device_*` tags (e.g. `device_phone`, `device_laptop`, `device_nas`).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Requirements
|
||||||
|
|
||||||
|
| Requirement | Notes |
|
||||||
|
|---|---|
|
||||||
|
| AdGuard Home | v0.107+ (REST API must be enabled) |
|
||||||
|
| Python packages | `requests`, `pytz` — already present in the NetAlertX container |
|
||||||
|
| AdGuard credentials | A user account with permission to manage clients |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
1. Copy the `adguard_export/` folder into `/app/front/plugins/` inside your NetAlertX container (or mount it as a volume).
|
||||||
|
2. Restart NetAlertX so the plugin is discovered.
|
||||||
|
3. Open **Settings → Plugins → AdGuard (Device Export)** and configure the settings below.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Settings
|
||||||
|
|
||||||
|
| Setting key | Default | Description |
|
||||||
|
|---|---|---|
|
||||||
|
| `ADGUARDEXP_RUN` | `disabled` | When to run: `disabled`, `once`, or `schedule` |
|
||||||
|
| `ADGUARDEXP_RUN_SCHD` | `0 * * * *` | Cron schedule (default: hourly) |
|
||||||
|
| `ADGUARDEXP_URL` | `http://localhost:3000` | Base URL of AdGuard Home web UI |
|
||||||
|
| `ADGUARDEXP_USER` | `admin` | AdGuard Home username |
|
||||||
|
| `ADGUARDEXP_PASSWORD` | *(empty)* | AdGuard Home password |
|
||||||
|
| `ADGUARDEXP_VERIFYSSL` | `true` | Verify TLS cert; set `false` for self-signed certs |
|
||||||
|
| `ADGUARDEXP_INCLUDE_OFFLINE` | `true` | When `true`, devices not seen in the last scan are still exported |
|
||||||
|
| `ADGUARDEXP_INCLUDE_NEW` | `false` | When `false`, devices flagged as new/unknown are excluded until identified |
|
||||||
|
| `ADGUARDEXP_USEMAC` | `true` | Use MAC address as primary client identifier; falls back to IP |
|
||||||
|
| `ADGUARDEXP_DELETE` | `false` | ⚠ Delete AdGuard clients no longer present in NetAlertX |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## AdGuard Home client identifiers
|
||||||
|
|
||||||
|
AdGuard Home identifies a client by one or more **ids**, which can be:
|
||||||
|
|
||||||
|
- A MAC address (e.g. `aa:bb:cc:dd:ee:ff`)
|
||||||
|
- An IP address (e.g. `192.168.1.42`)
|
||||||
|
- A CIDR range
|
||||||
|
- A ClientID string
|
||||||
|
|
||||||
|
When `ADGUARDEXP_USEMAC=true`, the plugin prefers the device's MAC address and includes the last known IP as a secondary identifier. When `ADGUARDEXP_USEMAC=false`, only the IP address is used.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Device type tags
|
||||||
|
|
||||||
|
The plugin maps NetAlertX device types to valid AdGuard Home `device_*` tags automatically:
|
||||||
|
|
||||||
|
| NetAlertX type | AdGuard tag |
|
||||||
|
|---|---|
|
||||||
|
| Smartphone, Phone, Mobile | `device_phone` |
|
||||||
|
| Laptop, Notebook | `device_laptop` |
|
||||||
|
| Desktop, Server, Hypervisor | `device_pc` |
|
||||||
|
| Tablet | `device_tablet` |
|
||||||
|
| Smart TV, SmartTV, TV | `device_tv` |
|
||||||
|
| NAS | `device_nas` |
|
||||||
|
| Printer | `device_printer` |
|
||||||
|
| IP Camera, Camera | `device_camera` |
|
||||||
|
| Game Console | `device_gameconsole` |
|
||||||
|
| Speaker, Assistant, Virtual Assistance | `device_audio` |
|
||||||
|
| AP, Gateway, Router, House Appliance | `device_other` |
|
||||||
|
|
||||||
|
Devices with an unrecognised or empty type are exported without a tag.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Safe deletion
|
||||||
|
|
||||||
|
When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously created — it will never delete clients you added manually in AdGuard Home. Ownership is tracked in a local state file at:
|
||||||
|
|
||||||
|
```text
|
||||||
|
/app/db/state.ADGUARDEXP.json
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Logs
|
||||||
|
|
||||||
|
Plugin logs are written to:
|
||||||
|
|
||||||
|
```text
|
||||||
|
/tmp/log/plugins/script.ADGUARDEXP.log
|
||||||
|
```
|
||||||
|
|
||||||
|
Result rows (used by the NetAlertX UI) are written to:
|
||||||
|
|
||||||
|
```text
|
||||||
|
/tmp/log/plugins/last_result.ADGUARDEXP.log
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
| Symptom | Likely cause |
|
||||||
|
|---|---|
|
||||||
|
| `Connection failed` in logs | Wrong `ADGUARDEXP_URL` or AdGuard Home is unreachable from the NetAlertX container |
|
||||||
|
| `HTTP error: 401` | Wrong username / password |
|
||||||
|
| `HTTP error: 400` | Client already exists with conflicting ids — check AdGuard Home for duplicate entries |
|
||||||
|
| Devices not appearing | `ADGUARDEXP_INCLUDE_NEW=false` and devices are flagged as new/unknown; identify them in NetAlertX first |
|
||||||
|
| SSL errors | Set `ADGUARDEXP_VERIFYSSL=false` for self-signed certificates |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Related plugins
|
||||||
|
|
||||||
|
- **adguard_import** — the reverse direction: imports devices *from* AdGuard Home *into* NetAlertX.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Other info
|
||||||
|
|
||||||
|
- Version: 1.0.0
|
||||||
|
- Maintainer: [natecj](https://github.com/natecj)
|
||||||
|
- Release Date: 10-May-2026
|
||||||
542
front/plugins/adguard_export/config.json
Normal file
542
front/plugins/adguard_export/config.json
Normal file
@@ -0,0 +1,542 @@
|
|||||||
|
{
|
||||||
|
"code_name": "adguard_export",
|
||||||
|
"unique_prefix": "ADGUARDEXP",
|
||||||
|
"plugin_type": "other",
|
||||||
|
"execution_order": "Layer_0",
|
||||||
|
"enabled": true,
|
||||||
|
"data_source": "script",
|
||||||
|
"localized": ["display_name", "description", "icon"],
|
||||||
|
"display_name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "AdGuard (Device Export)"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Exports known devices from NetAlertX to AdGuard as persistent clients, keeping names and IP/MAC identifiers in sync."
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"icon": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "<i class=\"fa fa-shield\"></i>"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"timeout": 120,
|
||||||
|
"params": [],
|
||||||
|
"settings": [
|
||||||
|
{
|
||||||
|
"function": "RUN",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "disabled",
|
||||||
|
"options": ["disabled", "once", "schedule"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "When to run"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Enable or schedule the export. Use 'schedule' together with the RUN_SCHD setting."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "RUN_SCHD",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "input",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "0 * * * *",
|
||||||
|
"options": [],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Schedule"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Cron expression for how often to run. Default: every hour on the hour."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "CMD",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "input",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "python3 /app/front/plugins/adguard_export/script.py",
|
||||||
|
"options": [],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Command"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Command to execute. Do not change unless you know what you are doing."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "URL",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "input",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "http://localhost:3000",
|
||||||
|
"options": [],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "AdGuard Home URL"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Base URL of your AdGuard Home web interface, e.g. http://192.168.1.1:3000"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "USER",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "input",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "admin",
|
||||||
|
"options": [],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "AdGuard Home Username"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Username for AdGuard Home basic authentication."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "PASSWORD",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "input",
|
||||||
|
"elementOptions": [{ "type": "password" }],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"maxLength": 200,
|
||||||
|
"default_value": "",
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "AdGuard Home Password"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Password for AdGuard Home basic authentication."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "VERIFYSSL",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "true",
|
||||||
|
"options": ["true", "false"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Verify SSL Certificate"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Set to 'false' to skip TLS certificate verification (useful for self-signed certs)."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "INCLUDE_OFFLINE",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "true",
|
||||||
|
"options": ["true", "false"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Include offline devices"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "When 'true', devices not seen in the last scan are still exported to AdGuard Home."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "INCLUDE_NEW",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "false",
|
||||||
|
"options": ["true", "false"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Include new/unknown devices"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "When 'true', devices flagged as new/unknown are also exported. When 'false', only identified devices are exported."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "USEMAC",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "true",
|
||||||
|
"options": ["true", "false"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Use MAC address as identifier"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "When 'true', MAC addresses are used as the primary client identifier in AdGuard Home. Falls back to IP when no MAC is available."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"function": "DELETE",
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "select",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default_value": "false",
|
||||||
|
"options": ["true", "false"],
|
||||||
|
"localized": ["name", "description"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Delete clients missing from NetAlertX"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Caution: When 'true', AdGuard Home clients that were previously exported by this plugin but are no longer present in NetAlertX will be deleted. Ownership is tracked in a local state file — manually added AdGuard clients are never deleted."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"database_column_definitions": [
|
||||||
|
{
|
||||||
|
"column": "Object_PrimaryID",
|
||||||
|
"mapped_to_column": "cur_MAC",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"filter_info": {
|
||||||
|
"type": "MAC",
|
||||||
|
"name": "MAC",
|
||||||
|
"source": "table"
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "MAC / ID"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Object_SecondaryID",
|
||||||
|
"mapped_to_column": "cur_IP",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "IP Address"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Watched_Value1",
|
||||||
|
"mapped_to_column": "cur_Name",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Device Name"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Watched_Value2",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "MAC Address"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Watched_Value3",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Last IP"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Watched_Value4",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "AdGuard URL"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Dummy",
|
||||||
|
"mapped_to_column": "scanSourcePlugin",
|
||||||
|
"mapped_to_column_data": {
|
||||||
|
"value": "adguard_export"
|
||||||
|
},
|
||||||
|
"css_classes": "col-sm-2",
|
||||||
|
"show": false,
|
||||||
|
"type": "label",
|
||||||
|
"default_value": "",
|
||||||
|
"options": [],
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "ADGUARDEXP"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "Extra",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [
|
||||||
|
{
|
||||||
|
"option": "cssClass",
|
||||||
|
"value": "text-muted small"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Status"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"column": "ForeignKey",
|
||||||
|
"mapped_to_column": "dev_MAC",
|
||||||
|
"show": true,
|
||||||
|
"type": {
|
||||||
|
"dataType": "string",
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"elementType": "label",
|
||||||
|
"elementOptions": [],
|
||||||
|
"transformers": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"localized": ["name"],
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"language_code": "en_us",
|
||||||
|
"string": "Device link"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
480
front/plugins/adguard_export/script.py
Normal file
480
front/plugins/adguard_export/script.py
Normal file
@@ -0,0 +1,480 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# adguard_export/script.py
|
||||||
|
#
|
||||||
|
# NetAlertX plugin: adguard_export
|
||||||
|
# Syncs known devices from the NetAlertX database to AdGuard Home as
|
||||||
|
# persistent clients, keeping names, MACs, and IP addresses in sync.
|
||||||
|
#
|
||||||
|
# AdGuard Home API reference:
|
||||||
|
# GET /control/clients – list all persistent clients
|
||||||
|
# POST /control/clients/add – create a new persistent client
|
||||||
|
# POST /control/clients/update – update an existing persistent client
|
||||||
|
# POST /control/clients/delete – remove a persistent client
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import requests
|
||||||
|
from pytz import timezone
|
||||||
|
from typing import Dict, List, Optional, Set, Tuple
|
||||||
|
|
||||||
|
# Define the installation path and extend the system path for plugin imports
|
||||||
|
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
||||||
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||||
|
|
||||||
|
from const import dataPath, logPath # noqa: E402, E261
|
||||||
|
from plugin_helper import Plugin_Objects # noqa: E402, E261
|
||||||
|
from logger import mylog, Logger # noqa: E402, E261
|
||||||
|
from helper import get_setting_value # noqa: E402, E261
|
||||||
|
from models.device_instance import DeviceInstance # noqa: E402, E261
|
||||||
|
import conf # noqa: E402, E261
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# Plugin metadata
|
||||||
|
# ----------------------------
|
||||||
|
pluginName = "ADGUARDEXP"
|
||||||
|
|
||||||
|
# Make sure the TIMEZONE for logging is correct
|
||||||
|
conf.tz = timezone(get_setting_value("TIMEZONE"))
|
||||||
|
|
||||||
|
# Make sure log level is initialized correctly
|
||||||
|
Logger(get_setting_value("LOG_LEVEL"))
|
||||||
|
|
||||||
|
# Define paths
|
||||||
|
LOG_PATH = logPath + "/plugins"
|
||||||
|
RESULT_FILE = os.path.join(LOG_PATH, f"last_result.{pluginName}.log")
|
||||||
|
STATE_FILE = os.path.join(dataPath, f"state.{pluginName}.json")
|
||||||
|
|
||||||
|
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||||
|
|
||||||
|
|
||||||
|
def load_managed_names() -> Set[str]:
|
||||||
|
"""Return the set of AdGuard client names we previously added."""
|
||||||
|
try:
|
||||||
|
with open(STATE_FILE) as f:
|
||||||
|
return set(json.load(f).get("managed", []))
|
||||||
|
except (FileNotFoundError, json.JSONDecodeError):
|
||||||
|
return set()
|
||||||
|
|
||||||
|
|
||||||
|
def save_managed_names(names: Set[str]) -> None:
|
||||||
|
with open(STATE_FILE, "w") as f:
|
||||||
|
json.dump({"managed": sorted(names)}, f, indent=2)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Device type → AdGuard tag mapping
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_TYPE_TAG_MAP: Dict[str, str] = {
|
||||||
|
"ap": "device_other",
|
||||||
|
"desktop": "device_pc",
|
||||||
|
"game console": "device_gameconsole",
|
||||||
|
"gameconsole": "device_gameconsole",
|
||||||
|
"gateway": "device_other",
|
||||||
|
"house appliance": "device_other",
|
||||||
|
"hypervisor": "device_pc",
|
||||||
|
"ip camera": "device_camera",
|
||||||
|
"camera": "device_camera",
|
||||||
|
"laptop": "device_laptop",
|
||||||
|
"notebook": "device_laptop",
|
||||||
|
"nas": "device_nas",
|
||||||
|
"printer": "device_printer",
|
||||||
|
"router": "device_other",
|
||||||
|
"server": "device_pc",
|
||||||
|
"smarttv": "device_tv",
|
||||||
|
"smart tv": "device_tv",
|
||||||
|
"tv": "device_tv",
|
||||||
|
"smartphone": "device_phone",
|
||||||
|
"phone": "device_phone",
|
||||||
|
"mobile": "device_phone",
|
||||||
|
"smartwatch": "device_phone",
|
||||||
|
"watch": "device_phone",
|
||||||
|
"tablet": "device_tablet",
|
||||||
|
"virtual assistance": "device_audio",
|
||||||
|
"assistant": "device_audio",
|
||||||
|
"speaker": "device_audio",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def device_type_to_tag(dev_type: str) -> str:
|
||||||
|
"""Map a NetAlertX devType string to a valid AdGuard Home tag, or ''."""
|
||||||
|
if not dev_type:
|
||||||
|
return ""
|
||||||
|
key = dev_type.strip().lower()
|
||||||
|
if key in _TYPE_TAG_MAP:
|
||||||
|
return _TYPE_TAG_MAP[key]
|
||||||
|
# Substring fallback for partial matches
|
||||||
|
for pattern, tag in _TYPE_TAG_MAP.items():
|
||||||
|
if pattern in key:
|
||||||
|
return tag
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# AdGuard Home client
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class AdGuardClient:
|
||||||
|
"""Thin wrapper around the AdGuard Home /control/clients* API."""
|
||||||
|
|
||||||
|
def __init__(self, base_url: str, username: str, password: str, verify_ssl: bool = True):
|
||||||
|
self.base_url = base_url.rstrip("/")
|
||||||
|
self.auth = (username, password)
|
||||||
|
self.verify_ssl = verify_ssl
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.session.auth = self.auth
|
||||||
|
|
||||||
|
def _url(self, path: str) -> str:
|
||||||
|
return f"{self.base_url}/control/{path.lstrip('/')}"
|
||||||
|
|
||||||
|
def get_clients(self) -> List[dict]:
|
||||||
|
"""Return the list of persistent (manually added) clients."""
|
||||||
|
resp = self.session.get(self._url("clients"), verify=self.verify_ssl, timeout=15)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json().get("clients") or []
|
||||||
|
|
||||||
|
def add_client(self, client: dict) -> None:
|
||||||
|
resp = self.session.post(
|
||||||
|
self._url("clients/add"),
|
||||||
|
json=client,
|
||||||
|
verify=self.verify_ssl,
|
||||||
|
timeout=15,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
def update_client(self, old_name: str, client: dict) -> None:
|
||||||
|
payload = {"name": old_name, "data": client}
|
||||||
|
resp = self.session.post(
|
||||||
|
self._url("clients/update"),
|
||||||
|
json=payload,
|
||||||
|
verify=self.verify_ssl,
|
||||||
|
timeout=15,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
def delete_client(self, name: str) -> None:
|
||||||
|
resp = self.session.post(
|
||||||
|
self._url("clients/delete"),
|
||||||
|
json={"name": name},
|
||||||
|
verify=self.verify_ssl,
|
||||||
|
timeout=15,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Database helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def get_netalertx_devices(include_offline: bool, include_new: bool) -> List[dict]:
|
||||||
|
"""
|
||||||
|
Return filtered devices from NetAlertX using the DeviceInstance model.
|
||||||
|
Fields returned per device: mac, name, last_ip, dev_type
|
||||||
|
"""
|
||||||
|
devices = []
|
||||||
|
try:
|
||||||
|
for d in DeviceInstance().getAll():
|
||||||
|
if d.get("devIsArchived", 0):
|
||||||
|
continue
|
||||||
|
if not include_offline and not d.get("devPresentLastScan", 1):
|
||||||
|
continue
|
||||||
|
if not include_new and d.get("devIsNew", 0):
|
||||||
|
continue
|
||||||
|
|
||||||
|
mac = (d.get("devMac", "") or "").strip()
|
||||||
|
last_ip = (d.get("devLastIP", "") or "").strip()
|
||||||
|
name = (d.get("devName", "") or "").strip()
|
||||||
|
dev_type = (d.get("devType", "") or "").strip()
|
||||||
|
|
||||||
|
if not mac and not last_ip:
|
||||||
|
continue
|
||||||
|
if not name:
|
||||||
|
name = mac or last_ip
|
||||||
|
|
||||||
|
devices.append({"mac": mac, "name": name, "last_ip": last_ip, "dev_type": dev_type})
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR reading devices: {exc}"])
|
||||||
|
|
||||||
|
return devices
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sync logic
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def build_agrd_client(device: dict, use_mac: bool) -> dict:
|
||||||
|
"""
|
||||||
|
Build an AdGuard Home client object from a NetAlertX device row.
|
||||||
|
|
||||||
|
AdGuard Home identifies a client by its 'ids' list, which may contain
|
||||||
|
MACs, IPs, CIDRs, or ClientIDs. We prefer MAC when available; fall
|
||||||
|
back to IP otherwise.
|
||||||
|
"""
|
||||||
|
ids = []
|
||||||
|
if use_mac and device["mac"] and device["mac"] not in ("", "00:00:00:00:00:00"):
|
||||||
|
ids.append(device["mac"].lower())
|
||||||
|
if device["last_ip"] and device["last_ip"] not in ("", "0.0.0.0"):
|
||||||
|
ids.append(device["last_ip"])
|
||||||
|
|
||||||
|
if not ids:
|
||||||
|
return {} # nothing useful to identify the device
|
||||||
|
|
||||||
|
tag = device_type_to_tag(device.get("dev_type", ""))
|
||||||
|
return {
|
||||||
|
"name": device["name"],
|
||||||
|
"ids": ids,
|
||||||
|
"tags": [tag] if tag else [],
|
||||||
|
"use_global_settings": True,
|
||||||
|
"use_global_blocked_services": True,
|
||||||
|
"filtering_enabled": False,
|
||||||
|
"parental_enabled": False,
|
||||||
|
"safebrowsing_enabled": False,
|
||||||
|
"safesearch_enabled": False,
|
||||||
|
"blocked_services": [],
|
||||||
|
"upstreams": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def sync_to_adguard(
|
||||||
|
agrd: AdGuardClient,
|
||||||
|
devices: List[dict],
|
||||||
|
use_mac: bool,
|
||||||
|
delete_missing: bool,
|
||||||
|
existing_clients: Optional[List[dict]] = None,
|
||||||
|
) -> Tuple[int, int, int, int]:
|
||||||
|
"""
|
||||||
|
Core sync routine. Returns (added, updated, skipped, deleted).
|
||||||
|
Pass existing_clients to reuse a list already fetched (avoids a second
|
||||||
|
round-trip when the caller performed a connectivity check first).
|
||||||
|
"""
|
||||||
|
if existing_clients is None:
|
||||||
|
existing_clients = agrd.get_clients()
|
||||||
|
mylog("verbose", [f"[{pluginName}] AdGuard Home currently has {len(existing_clients)} persistent client(s)."])
|
||||||
|
|
||||||
|
# Build a lookup: identifier → client dict
|
||||||
|
existing_by_id: Dict[str, dict] = {}
|
||||||
|
for client in existing_clients:
|
||||||
|
for cid in client.get("ids", []):
|
||||||
|
existing_by_id[cid.lower()] = client
|
||||||
|
|
||||||
|
# Also index by name for update / delete operations (warn if AdGuard has duplicate names)
|
||||||
|
existing_by_name: Dict[str, dict] = {}
|
||||||
|
for c in existing_clients:
|
||||||
|
if c["name"] in existing_by_name:
|
||||||
|
mylog("verbose", [f"[{pluginName}] WARNING duplicate client name in AdGuard Home: {c['name']!r}"])
|
||||||
|
existing_by_name[c["name"]] = c
|
||||||
|
|
||||||
|
# Load the set of client names we've previously added so that DELETE mode
|
||||||
|
# only removes clients we created, not manually-added ones.
|
||||||
|
managed_names = load_managed_names()
|
||||||
|
|
||||||
|
added = updated = skipped = deleted = 0
|
||||||
|
|
||||||
|
# ----- add / update -----
|
||||||
|
for device in devices:
|
||||||
|
client_data = build_agrd_client(device, use_mac)
|
||||||
|
if not client_data:
|
||||||
|
if not use_mac and not device["last_ip"]:
|
||||||
|
reason = "no IP address (USEMAC is disabled, IP required)"
|
||||||
|
else:
|
||||||
|
reason = "no usable MAC or IP"
|
||||||
|
mylog("verbose", [f"[{pluginName}] SKIP {device['name']!r} – {reason}"])
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check whether any of the ids already exist in AdGuard
|
||||||
|
existing = None
|
||||||
|
for cid in client_data["ids"]:
|
||||||
|
if cid.lower() in existing_by_id:
|
||||||
|
existing = existing_by_id[cid.lower()]
|
||||||
|
break
|
||||||
|
|
||||||
|
if existing is None and device["name"] in managed_names:
|
||||||
|
# Fall back to name match only for clients we previously added — avoids
|
||||||
|
# accidentally matching a manually-created AdGuard client with the same name.
|
||||||
|
existing = existing_by_name.get(device["name"])
|
||||||
|
if existing:
|
||||||
|
mylog("verbose", [f"[{pluginName}] WARN matched {device['name']!r} by name (no ID match) — verify no duplicate clients"])
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
old_name = existing["name"]
|
||||||
|
# Preserve existing per-client AdGuard settings; we only manage name, ids, tags.
|
||||||
|
_our_keys = frozenset(("name", "ids", "tags"))
|
||||||
|
merged_data = {**client_data, **{k: v for k, v in existing.items() if k not in _our_keys}}
|
||||||
|
# Only call update when something actually changed to avoid noise
|
||||||
|
if (
|
||||||
|
sorted(i.lower() for i in existing.get("ids", [])) != sorted(i.lower() for i in client_data["ids"])
|
||||||
|
or existing.get("name") != client_data["name"]
|
||||||
|
or sorted(existing.get("tags", [])) != sorted(client_data["tags"])
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
agrd.update_client(old_name, merged_data)
|
||||||
|
mylog("verbose", [f"[{pluginName}] UPDATE {old_name!r} → {device['name']!r} ids={client_data['ids']}"])
|
||||||
|
# Only track the rename for clients we already own — never adopt a manually-created client.
|
||||||
|
if old_name in managed_names:
|
||||||
|
managed_names.discard(old_name)
|
||||||
|
managed_names.add(device["name"])
|
||||||
|
updated += 1
|
||||||
|
except requests.HTTPError as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR updating {device['name']!r}: {exc}"])
|
||||||
|
skipped += 1
|
||||||
|
else:
|
||||||
|
mylog("verbose", [f"[{pluginName}] SKIP (no change) {device['name']!r}"])
|
||||||
|
# No managed_names update: if we created this client it's already in the state
|
||||||
|
# file; if it's a manually-created client we must not claim ownership of it.
|
||||||
|
skipped += 1
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
agrd.add_client(client_data)
|
||||||
|
mylog("verbose", [f"[{pluginName}] ADD {device['name']!r} ids={client_data['ids']}"])
|
||||||
|
managed_names.add(device["name"])
|
||||||
|
added += 1
|
||||||
|
except requests.HTTPError as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR adding {device['name']!r}: {exc}"])
|
||||||
|
skipped += 1
|
||||||
|
|
||||||
|
# ----- optional delete of AdGuard clients no longer in NetAlertX -----
|
||||||
|
if delete_missing:
|
||||||
|
export_names = {d["name"] for d in devices}
|
||||||
|
for client in existing_clients:
|
||||||
|
cname = client.get("name", "")
|
||||||
|
# Only delete clients that we previously added (tracked in state file)
|
||||||
|
# so we don't accidentally remove manually-added clients.
|
||||||
|
if cname in managed_names and cname not in export_names:
|
||||||
|
try:
|
||||||
|
agrd.delete_client(cname)
|
||||||
|
mylog("verbose", [f"[{pluginName}] DELETE {cname!r} (no longer in NetAlertX)"])
|
||||||
|
managed_names.discard(cname)
|
||||||
|
deleted += 1
|
||||||
|
except requests.HTTPError as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR deleting {cname!r}: {exc}"])
|
||||||
|
|
||||||
|
save_managed_names(managed_names)
|
||||||
|
return added, updated, skipped, deleted
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Entry point
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def main():
|
||||||
|
mylog("verbose", [f"[{pluginName}] In script"])
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Read settings
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
agrd_url = get_setting_value("ADGUARDEXP_URL") or "http://localhost:3000"
|
||||||
|
agrd_user = get_setting_value("ADGUARDEXP_USER") or ""
|
||||||
|
agrd_pass = get_setting_value("ADGUARDEXP_PASSWORD") or ""
|
||||||
|
verify_ssl_str = get_setting_value("ADGUARDEXP_VERIFYSSL") or "true"
|
||||||
|
include_offline_str = get_setting_value("ADGUARDEXP_INCLUDE_OFFLINE") or "true"
|
||||||
|
include_new_str = get_setting_value("ADGUARDEXP_INCLUDE_NEW") or "false"
|
||||||
|
use_mac_str = get_setting_value("ADGUARDEXP_USEMAC") or "true"
|
||||||
|
delete_str = get_setting_value("ADGUARDEXP_DELETE") or "false"
|
||||||
|
|
||||||
|
verify_ssl = verify_ssl_str.strip().lower() not in ("false", "0", "no")
|
||||||
|
include_offline = include_offline_str.strip().lower() not in ("false", "0", "no")
|
||||||
|
include_new = include_new_str.strip().lower() not in ("false", "0", "no")
|
||||||
|
use_mac = use_mac_str.strip().lower() not in ("false", "0", "no")
|
||||||
|
delete_miss = delete_str.strip().lower() not in ("false", "0", "no")
|
||||||
|
|
||||||
|
mylog("verbose", [f"[{pluginName}] " + ("=" * 60)])
|
||||||
|
mylog("verbose", [f"[{pluginName}] AdGuard Home URL : {agrd_url}"])
|
||||||
|
mylog("verbose", [f"[{pluginName}] Include offline devs: {include_offline}"])
|
||||||
|
mylog("verbose", [f"[{pluginName}] Include new/unknown : {include_new}"])
|
||||||
|
mylog("verbose", [f"[{pluginName}] Use MAC as id : {use_mac}"])
|
||||||
|
mylog("verbose", [f"[{pluginName}] Delete missing : {delete_miss}"])
|
||||||
|
mylog("verbose", [f"[{pluginName}] " + ("=" * 60)])
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Load devices from NetAlertX
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
devices = get_netalertx_devices(include_offline, include_new)
|
||||||
|
mylog("verbose", [f"[{pluginName}] Loaded {len(devices)} device(s) from NetAlertX database."])
|
||||||
|
|
||||||
|
if not devices:
|
||||||
|
mylog("verbose", ["No devices to sync – exiting."])
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = "adguard_export",
|
||||||
|
secondaryId = "summary",
|
||||||
|
watched1 = "0",
|
||||||
|
watched2 = "0",
|
||||||
|
watched3 = "0",
|
||||||
|
watched4 = "0",
|
||||||
|
extra = "No devices found in NetAlertX",
|
||||||
|
)
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
return
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Connect to AdGuard Home and sync
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
try:
|
||||||
|
agrd = AdGuardClient(agrd_url, agrd_user, agrd_pass, verify_ssl)
|
||||||
|
existing_clients = agrd.get_clients()
|
||||||
|
except requests.exceptions.ConnectionError as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR – cannot connect to AdGuard Home at {agrd_url}: {exc}"])
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = "adguard_export",
|
||||||
|
secondaryId = "error",
|
||||||
|
extra = f"Connection failed: {exc}",
|
||||||
|
)
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
return
|
||||||
|
except requests.HTTPError as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an HTTP error: {exc}"])
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = "adguard_export",
|
||||||
|
secondaryId = "error",
|
||||||
|
extra = f"HTTP error: {exc}",
|
||||||
|
)
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
return
|
||||||
|
except Exception as exc:
|
||||||
|
mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an unknown error: {exc}"])
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = "adguard_export",
|
||||||
|
secondaryId = "error",
|
||||||
|
extra = f"Unknown error: {exc}",
|
||||||
|
)
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
return
|
||||||
|
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, devices, use_mac, delete_miss, existing_clients=existing_clients
|
||||||
|
)
|
||||||
|
|
||||||
|
summary = (
|
||||||
|
f"Sync complete – added={added} updated={updated} "
|
||||||
|
f"skipped={skipped} deleted={deleted}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Write plugin result (one summary row + one row per touched device)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = "adguard_export",
|
||||||
|
secondaryId = "summary",
|
||||||
|
watched1 = str(added),
|
||||||
|
watched2 = str(updated),
|
||||||
|
watched3 = str(skipped),
|
||||||
|
watched4 = str(deleted),
|
||||||
|
extra = summary,
|
||||||
|
)
|
||||||
|
|
||||||
|
for device in devices:
|
||||||
|
plugin_objects.add_object(
|
||||||
|
primaryId = device["mac"] or device["last_ip"],
|
||||||
|
secondaryId = device["last_ip"],
|
||||||
|
watched1 = device["name"],
|
||||||
|
watched2 = device["mac"],
|
||||||
|
watched3 = device["last_ip"],
|
||||||
|
watched4 = agrd_url,
|
||||||
|
extra = "exported",
|
||||||
|
foreignKey = device["mac"] or "",
|
||||||
|
)
|
||||||
|
|
||||||
|
mylog("verbose", [f"[{pluginName}] {summary}"])
|
||||||
|
plugin_objects.write_result_file()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -26,7 +26,7 @@
|
|||||||
"description": [
|
"description": [
|
||||||
{
|
{
|
||||||
"language_code": "en_us",
|
"language_code": "en_us",
|
||||||
"string": "Plugin to import devices from AdGuard."
|
"string": "Imports known devices from AdGuard to NetAlertX as persistent devices, keeping names and IP/MAC identifiers in sync."
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"icon": [
|
"icon": [
|
||||||
@@ -36,7 +36,7 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"params": [],
|
"params": [],
|
||||||
"settings": [
|
"settings": [
|
||||||
{
|
{
|
||||||
"function": "RUN",
|
"function": "RUN",
|
||||||
"events": ["run"],
|
"events": ["run"],
|
||||||
|
|||||||
391
test/plugins/test_adguard_export.py
Normal file
391
test/plugins/test_adguard_export.py
Normal file
@@ -0,0 +1,391 @@
|
|||||||
|
"""
|
||||||
|
Tests for adguard_export/script.py
|
||||||
|
|
||||||
|
Run from inside the NetAlertX container (where the full environment is available),
|
||||||
|
or locally — in that case the NetAlertX-specific modules are stubbed out
|
||||||
|
automatically before the script is imported.
|
||||||
|
|
||||||
|
pytest test/plugins/test_adguard_export.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import types
|
||||||
|
from unittest.mock import MagicMock, call, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stub NetAlertX-specific modules so tests can run outside the container.
|
||||||
|
# sys.modules.setdefault() is a no-op when the real module is already loaded,
|
||||||
|
# so this is safe to run inside the container too.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_tmp_log = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
|
||||||
|
def _stub(name: str, **attrs):
|
||||||
|
if name not in sys.modules:
|
||||||
|
mod = types.ModuleType(name)
|
||||||
|
for k, v in attrs.items():
|
||||||
|
setattr(mod, k, v)
|
||||||
|
sys.modules[name] = mod
|
||||||
|
|
||||||
|
|
||||||
|
_stub("pytz", timezone=lambda tz: tz)
|
||||||
|
_stub("conf")
|
||||||
|
_stub("const", dataPath=_tmp_log, logPath=_tmp_log, fullDbPath=os.path.join(_tmp_log, "test.db"))
|
||||||
|
_stub("plugin_helper", Plugin_Objects=MagicMock)
|
||||||
|
_stub("logger", mylog=lambda *a: None, Logger=MagicMock)
|
||||||
|
_stub("helper", get_setting_value=lambda k: "")
|
||||||
|
_stub("models", )
|
||||||
|
_stub("models.device_instance", DeviceInstance=MagicMock)
|
||||||
|
|
||||||
|
# Stub requests only when it isn't installed (e.g. bare system Python locally).
|
||||||
|
# In the container and CI, the real package is present and will be used.
|
||||||
|
if "requests" not in sys.modules:
|
||||||
|
_req = types.ModuleType("requests")
|
||||||
|
_req.Session = MagicMock
|
||||||
|
_req.HTTPError = type("HTTPError", (Exception,), {})
|
||||||
|
_req_exc = types.ModuleType("requests.exceptions")
|
||||||
|
_req_exc.ConnectionError = type("ConnectionError", (Exception,), {})
|
||||||
|
_req.exceptions = _req_exc
|
||||||
|
sys.modules["requests"] = _req
|
||||||
|
sys.modules["requests.exceptions"] = _req_exc
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Import the functions under test (must come after the stubs above).
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "front", "plugins", "adguard_export"))
|
||||||
|
|
||||||
|
from script import ( # noqa: E402
|
||||||
|
AdGuardClient,
|
||||||
|
_TYPE_TAG_MAP,
|
||||||
|
build_agrd_client,
|
||||||
|
device_type_to_tag,
|
||||||
|
get_netalertx_devices,
|
||||||
|
load_managed_names,
|
||||||
|
save_managed_names,
|
||||||
|
sync_to_adguard,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _raw_device(**overrides) -> dict:
|
||||||
|
"""Build a raw DeviceInstance.getAll() style dict."""
|
||||||
|
base = {
|
||||||
|
"devMac": "AA:BB:CC:00:00:01",
|
||||||
|
"devName": "PC",
|
||||||
|
"devLastIP": "10.0.0.1",
|
||||||
|
"devType": "desktop",
|
||||||
|
"devIsArchived": 0,
|
||||||
|
"devPresentLastScan": 1,
|
||||||
|
"devIsNew": 0,
|
||||||
|
}
|
||||||
|
return {**base, **overrides}
|
||||||
|
|
||||||
|
|
||||||
|
def _mock_agrd(existing=None) -> MagicMock:
|
||||||
|
"""Return a mock AdGuardClient whose get_clients() returns *existing*."""
|
||||||
|
agrd = MagicMock(spec=AdGuardClient)
|
||||||
|
agrd.get_clients.return_value = existing or []
|
||||||
|
return agrd
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# device_type_to_tag
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeviceTypeToTag:
|
||||||
|
def test_empty_string_returns_empty(self):
|
||||||
|
assert device_type_to_tag("") == ""
|
||||||
|
|
||||||
|
def test_none_returns_empty(self):
|
||||||
|
assert device_type_to_tag(None) == ""
|
||||||
|
|
||||||
|
def test_exact_match_case_insensitive(self):
|
||||||
|
assert device_type_to_tag("Smartphone") == "device_phone"
|
||||||
|
assert device_type_to_tag("LAPTOP") == "device_laptop"
|
||||||
|
assert device_type_to_tag("nas") == "device_nas"
|
||||||
|
|
||||||
|
def test_substring_fallback(self):
|
||||||
|
# "gaming smartphone" contains "smartphone"
|
||||||
|
assert device_type_to_tag("gaming smartphone") == "device_phone"
|
||||||
|
|
||||||
|
def test_unknown_type_returns_empty(self):
|
||||||
|
assert device_type_to_tag("toaster") == ""
|
||||||
|
|
||||||
|
def test_all_map_values_are_valid_adguard_tags(self):
|
||||||
|
valid_prefixes = ("device_", "ct_", "os_")
|
||||||
|
for tag in _TYPE_TAG_MAP.values():
|
||||||
|
assert any(tag.startswith(p) for p in valid_prefixes), (
|
||||||
|
f"{tag!r} is not a valid AdGuard Home tag"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# build_agrd_client
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestBuildAgrdClient:
|
||||||
|
def _device(self, **overrides) -> dict:
|
||||||
|
base = {"mac": "AA:BB:CC:DD:EE:FF", "name": "My PC", "last_ip": "192.168.1.10", "dev_type": "desktop"}
|
||||||
|
return {**base, **overrides}
|
||||||
|
|
||||||
|
def test_mac_and_ip_both_included_when_use_mac_true(self):
|
||||||
|
result = build_agrd_client(self._device(), use_mac=True)
|
||||||
|
assert "aa:bb:cc:dd:ee:ff" in result["ids"]
|
||||||
|
assert "192.168.1.10" in result["ids"]
|
||||||
|
|
||||||
|
def test_only_ip_when_use_mac_false(self):
|
||||||
|
result = build_agrd_client(self._device(), use_mac=False)
|
||||||
|
assert result["ids"] == ["192.168.1.10"]
|
||||||
|
|
||||||
|
def test_returns_empty_dict_when_no_usable_id(self):
|
||||||
|
result = build_agrd_client(
|
||||||
|
{"mac": "", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""},
|
||||||
|
use_mac=True,
|
||||||
|
)
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
def test_null_mac_falls_back_to_ip(self):
|
||||||
|
result = build_agrd_client(
|
||||||
|
{"mac": "00:00:00:00:00:00", "name": "Dev", "last_ip": "10.0.0.5", "dev_type": ""},
|
||||||
|
use_mac=True,
|
||||||
|
)
|
||||||
|
assert result["ids"] == ["10.0.0.5"]
|
||||||
|
|
||||||
|
def test_device_type_tag_applied(self):
|
||||||
|
result = build_agrd_client(self._device(dev_type="smartphone"), use_mac=True)
|
||||||
|
assert result["tags"] == ["device_phone"]
|
||||||
|
|
||||||
|
def test_unknown_device_type_produces_no_tag(self):
|
||||||
|
result = build_agrd_client(self._device(dev_type=""), use_mac=True)
|
||||||
|
assert result["tags"] == []
|
||||||
|
|
||||||
|
def test_mac_is_lowercased(self):
|
||||||
|
result = build_agrd_client(self._device(mac="AA:BB:CC:DD:EE:FF"), use_mac=True)
|
||||||
|
assert "aa:bb:cc:dd:ee:ff" in result["ids"]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# load_managed_names / save_managed_names
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestManagedNames:
|
||||||
|
def test_round_trip(self, tmp_path):
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
save_managed_names({"alpha", "beta", "gamma"})
|
||||||
|
loaded = load_managed_names()
|
||||||
|
assert loaded == {"alpha", "beta", "gamma"}
|
||||||
|
|
||||||
|
def test_missing_file_returns_empty_set(self, tmp_path):
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "nonexistent.json")):
|
||||||
|
assert load_managed_names() == set()
|
||||||
|
|
||||||
|
def test_corrupt_file_returns_empty_set(self, tmp_path):
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
state.write_text("not valid json")
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
assert load_managed_names() == set()
|
||||||
|
|
||||||
|
def test_save_sorts_names(self, tmp_path):
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
save_managed_names({"zebra", "apple", "mango"})
|
||||||
|
data = json.loads(state.read_text())
|
||||||
|
assert data["managed"] == ["apple", "mango", "zebra"]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# get_netalertx_devices
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetNetalertxDevices:
|
||||||
|
def _call(self, rows, include_offline=True, include_new=True):
|
||||||
|
with patch("script.DeviceInstance") as mock_di:
|
||||||
|
mock_di.return_value.getAll.return_value = rows
|
||||||
|
return get_netalertx_devices(include_offline=include_offline, include_new=include_new)
|
||||||
|
|
||||||
|
def test_basic_query(self):
|
||||||
|
result = self._call([_raw_device()])
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["name"] == "PC"
|
||||||
|
assert result[0]["mac"] == "AA:BB:CC:00:00:01"
|
||||||
|
|
||||||
|
def test_archived_devices_excluded(self):
|
||||||
|
result = self._call([
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:01", devName="Active", devIsArchived=0),
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:02", devName="Archived", devIsArchived=1),
|
||||||
|
])
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["name"] == "Active"
|
||||||
|
|
||||||
|
def test_offline_excluded_when_flag_false(self):
|
||||||
|
result = self._call([
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:01", devName="Online", devPresentLastScan=1),
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:02", devName="Offline", devPresentLastScan=0),
|
||||||
|
], include_offline=False)
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["name"] == "Online"
|
||||||
|
|
||||||
|
def test_new_devices_excluded_when_flag_false(self):
|
||||||
|
result = self._call([
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:01", devName="Known", devIsNew=0),
|
||||||
|
_raw_device(devMac="AA:00:00:00:00:02", devName="Unknown", devIsNew=1),
|
||||||
|
], include_new=False)
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["name"] == "Known"
|
||||||
|
|
||||||
|
def test_nameless_device_falls_back_to_mac(self):
|
||||||
|
result = self._call([_raw_device(devMac="BB:CC:DD:EE:FF:00", devName="", devLastIP="10.0.0.5")])
|
||||||
|
assert result[0]["name"] == "BB:CC:DD:EE:FF:00"
|
||||||
|
|
||||||
|
def test_row_with_no_mac_and_no_ip_skipped(self):
|
||||||
|
result = self._call([_raw_device(devMac="", devName="Ghost", devLastIP="")])
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_exception_returns_empty_list(self):
|
||||||
|
with patch("script.DeviceInstance") as mock_di:
|
||||||
|
mock_di.return_value.getAll.side_effect = Exception("db error")
|
||||||
|
assert get_netalertx_devices(True, True) == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# sync_to_adguard
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestSyncToAdguard:
|
||||||
|
def _device(self, name="PC", mac="AA:BB:CC:00:00:01", ip="10.0.0.1", dev_type="desktop") -> dict:
|
||||||
|
return {"mac": mac, "name": name, "last_ip": ip, "dev_type": dev_type}
|
||||||
|
|
||||||
|
def test_new_device_is_added(self, tmp_path):
|
||||||
|
agrd = _mock_agrd(existing=[])
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, [self._device()], use_mac=True, delete_missing=False
|
||||||
|
)
|
||||||
|
assert added == 1
|
||||||
|
assert updated == skipped == deleted == 0
|
||||||
|
agrd.add_client.assert_called_once()
|
||||||
|
|
||||||
|
def test_unchanged_device_is_skipped(self, tmp_path):
|
||||||
|
existing = [{"name": "PC", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, [self._device()], use_mac=True, delete_missing=False
|
||||||
|
)
|
||||||
|
assert skipped == 1
|
||||||
|
assert added == updated == deleted == 0
|
||||||
|
agrd.update_client.assert_not_called()
|
||||||
|
|
||||||
|
def test_renamed_device_is_updated(self, tmp_path):
|
||||||
|
existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False
|
||||||
|
)
|
||||||
|
assert updated == 1
|
||||||
|
agrd.update_client.assert_called_once_with("Old Name", agrd.update_client.call_args[0][1])
|
||||||
|
|
||||||
|
def test_missing_device_deleted_when_flag_true(self, tmp_path):
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
state.write_text(json.dumps({"managed": ["Gone Device"]}))
|
||||||
|
existing = [{"name": "Gone Device", "ids": ["10.0.0.99"], "tags": []}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, [], use_mac=True, delete_missing=True
|
||||||
|
)
|
||||||
|
assert deleted == 1
|
||||||
|
agrd.delete_client.assert_called_once_with("Gone Device")
|
||||||
|
|
||||||
|
def test_unmanaged_device_not_deleted(self, tmp_path):
|
||||||
|
# State file is empty — we never added this client
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
state.write_text(json.dumps({"managed": []}))
|
||||||
|
existing = [{"name": "Manual Client", "ids": ["10.0.0.50"], "tags": []}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
sync_to_adguard(agrd, [], use_mac=True, delete_missing=True)
|
||||||
|
agrd.delete_client.assert_not_called()
|
||||||
|
|
||||||
|
def test_manual_client_matched_by_id_not_adopted(self, tmp_path):
|
||||||
|
# A manually-created AdGuard client whose IP matches a NetAlertX device
|
||||||
|
# must not be added to managed_names — so DELETE=true won't touch it later.
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
state.write_text(json.dumps({"managed": []}))
|
||||||
|
existing = [{"name": "Manual Client", "ids": ["10.0.0.5"], "tags": []}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
device = {"mac": "", "name": "Manual Client", "last_ip": "10.0.0.5", "dev_type": ""}
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
sync_to_adguard(agrd, [device], use_mac=True, delete_missing=True)
|
||||||
|
loaded = load_managed_names()
|
||||||
|
assert "Manual Client" not in loaded
|
||||||
|
agrd.delete_client.assert_not_called()
|
||||||
|
|
||||||
|
def test_device_with_no_usable_id_is_skipped(self, tmp_path):
|
||||||
|
agrd = _mock_agrd(existing=[])
|
||||||
|
device = {"mac": "00:00:00:00:00:00", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""}
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
added, updated, skipped, deleted = sync_to_adguard(
|
||||||
|
agrd, [device], use_mac=True, delete_missing=False
|
||||||
|
)
|
||||||
|
assert skipped == 1
|
||||||
|
agrd.add_client.assert_not_called()
|
||||||
|
|
||||||
|
def test_existing_clients_parameter_avoids_extra_api_call(self, tmp_path):
|
||||||
|
existing = []
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
sync_to_adguard(
|
||||||
|
agrd, [self._device()], use_mac=True, delete_missing=False,
|
||||||
|
existing_clients=existing,
|
||||||
|
)
|
||||||
|
agrd.get_clients.assert_not_called()
|
||||||
|
|
||||||
|
def test_rename_removes_old_name_from_managed_names(self, tmp_path):
|
||||||
|
state = tmp_path / "state.json"
|
||||||
|
state.write_text(json.dumps({"managed": ["Old Name"]}))
|
||||||
|
existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(state)):
|
||||||
|
sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False)
|
||||||
|
loaded = load_managed_names()
|
||||||
|
assert "Old Name" not in loaded
|
||||||
|
assert "New Name" in loaded
|
||||||
|
|
||||||
|
def test_update_preserves_custom_adguard_settings(self, tmp_path):
|
||||||
|
existing = [{
|
||||||
|
"name": "Old Name",
|
||||||
|
"ids": ["aa:bb:cc:00:00:01", "10.0.0.1"],
|
||||||
|
"tags": ["device_pc"],
|
||||||
|
"filtering_enabled": True,
|
||||||
|
"use_global_settings": False,
|
||||||
|
"parental_enabled": True,
|
||||||
|
"safebrowsing_enabled": False,
|
||||||
|
"safesearch_enabled": False,
|
||||||
|
"use_global_blocked_services": False,
|
||||||
|
"blocked_services": ["youtube.com"],
|
||||||
|
"upstreams": ["1.1.1.1"],
|
||||||
|
}]
|
||||||
|
agrd = _mock_agrd(existing=existing)
|
||||||
|
with patch("script.STATE_FILE", str(tmp_path / "state.json")):
|
||||||
|
sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False)
|
||||||
|
_, sent_payload = agrd.update_client.call_args[0]
|
||||||
|
assert sent_payload["filtering_enabled"] is True
|
||||||
|
assert sent_payload["use_global_settings"] is False
|
||||||
|
assert sent_payload["blocked_services"] == ["youtube.com"]
|
||||||
|
assert sent_payload["upstreams"] == ["1.1.1.1"]
|
||||||
Reference in New Issue
Block a user