mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-06-20 22:30:11 -04:00
Add REST Import plugin and corresponding tests
- Implemented the REST Import plugin (rest_import.py) to handle importing data from REST APIs. - Added functionality for configurable HTTP methods, authentication types, and custom headers. - Included error handling for various HTTP response statuses and connection issues. - Created unit tests for the plugin covering header building, path resolution, MAC validation, record mapping, and authentication methods. - Ensured that module-level side effects are patched during tests to prevent live interactions.
This commit is contained in:
16
.gemini/skills/plugin-development/plugin-skill.md
Normal file
16
.gemini/skills/plugin-development/plugin-skill.md
Normal file
@@ -0,0 +1,16 @@
|
||||
# Plugin development skill
|
||||
|
||||
1. Assess requirements
|
||||
2. Read `docs/PLUGINS_DEV.md`
|
||||
3. Confirm field mapping
|
||||
4. Ask clarifying questions
|
||||
5. code placed in `front/plugins/<new plugin name>`
|
||||
|
||||
|
||||
Plugin prefix:
|
||||
|
||||
- has to be uppercase letters only (no underscores)
|
||||
- must be unique
|
||||
- keep short but readable if possible
|
||||
|
||||
|
||||
@@ -1846,6 +1846,13 @@ textarea[readonly],
|
||||
display: grid;
|
||||
}
|
||||
|
||||
@media (min-width: 768px) {
|
||||
.modal-dialog {
|
||||
width: 750px;
|
||||
margin: 30px auto;
|
||||
}
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------- */
|
||||
/* NETWORK page */
|
||||
/* ----------------------------------------------------------------- */
|
||||
|
||||
@@ -208,7 +208,12 @@ function showModalPopupForm(
|
||||
}
|
||||
}
|
||||
|
||||
const fieldOptionsOverride = field.type?.elements[0]?.elementOptions || [];
|
||||
// For select elements use field.options (the selectable values); for all other
|
||||
// element types fall back to elementOptions (used for input attributes like placeholder).
|
||||
const elementType = field.type?.elements[0]?.elementType;
|
||||
const fieldOptionsOverride = (elementType === 'select' && field.options?.length)
|
||||
? field.options
|
||||
: (field.type?.elements[0]?.elementOptions || []);
|
||||
const setValue = initialValue;
|
||||
const setType = JSON.stringify(field.type);
|
||||
const setEvents = field.events || []; // default to empty array if missing
|
||||
|
||||
@@ -1175,6 +1175,7 @@ function generateFormHtml(settingsData, set, overrideValue, overrideOptions, ori
|
||||
|
||||
// Parse the setType JSON string
|
||||
// console.log(processQuotes(setType));
|
||||
console.log(setType);
|
||||
|
||||
const setTypeObject = JSON.parse(processQuotes(setType))
|
||||
const dataType = setTypeObject.dataType;
|
||||
|
||||
@@ -129,7 +129,7 @@
|
||||
|
||||
<div id="modal-form-message" class="modal-body"> Modal message </div>
|
||||
|
||||
<div id="modal-form-plc"></div>
|
||||
<div id="modal-form-plc" style="overflow-y: auto; max-height: 60vh; padding: 0 15px;"></div>
|
||||
|
||||
<div class="modal-footer">
|
||||
<button id="modal-form-cancel" type="button" class="btn btn-outline pull-left" style="min-width: 80px;" data-dismiss="modal"> Cancel </button>
|
||||
|
||||
84
front/plugins/rest_import/README.md
Normal file
84
front/plugins/rest_import/README.md
Normal file
@@ -0,0 +1,84 @@
|
||||
## Overview
|
||||
|
||||
Import devices from any REST API that returns JSON
|
||||
|
||||
Designed to replace small vendor-specific plugins for systems like OPNsense, RouterOS, asset management tools, and custom inventory APIs. Multiple endpoints can be configured independently in a single plugin instance.
|
||||
|
||||
### Quick setup guide
|
||||
|
||||
Navigate to **Settings** > **REST Import** and click **Add** to create an import definition.
|
||||
|
||||
| Field | Description |
|
||||
|---|---|
|
||||
| **Name** | Friendly label shown in logs (e.g. `OPNsense DHCP`) |
|
||||
| **URL** | Full REST endpoint URL |
|
||||
| **Method** | `GET` or `POST` |
|
||||
| **Verify SSL** | Disable for self-signed certificates |
|
||||
| **Auth Type** | `none`, `basic`, or `bearer` |
|
||||
| **Username / API Key** | Used with `basic` auth |
|
||||
| **Password / API Secret** | Used with `basic` auth |
|
||||
| **Bearer Token** | Used with `bearer` auth |
|
||||
| **Custom Headers** | Optional. One `Key: Value` per line |
|
||||
| **POST Body** | Optional JSON body for `POST` requests |
|
||||
| **Device List Path** | Dot-separated path to the array of records (e.g. `rows`, `data.devices`). Leave blank if the response is already an array |
|
||||
|
||||
### Field mapping
|
||||
|
||||
Map API response fields to NetAlertX scan fields. Leave a mapping blank to skip that field.
|
||||
|
||||
| Popup field | Scan column |
|
||||
|---|---|
|
||||
| MAC Address | `scanMac` |
|
||||
| IP Address | `scanLastIP` |
|
||||
| Device Name | `scanName` |
|
||||
| Vendor | `scanVendor` |
|
||||
| SSID | `scanSSID` |
|
||||
| Device Type | `scanType` |
|
||||
| Parent MAC | `scanParentMAC` |
|
||||
| Parent Port | `scanParentPort` |
|
||||
| Site | `scanSite` |
|
||||
| VLAN | `scanVlan` |
|
||||
|
||||
### Fake MAC generation
|
||||
|
||||
Enable **Generate Fake MAC** when the API does not expose MAC addresses (e.g. remote ICMP scans, IP-only inventory systems). A deterministic fake MAC is derived from the IP address using the `fa:ce:` prefix. Requires a valid IP mapping.
|
||||
|
||||
> [!WARNING]
|
||||
> Fake MACs can cause inconsistencies if IP addresses change between scans. Static IPs are strongly recommended when using this feature.
|
||||
|
||||
### Example: OPNsense Dnsmasq API
|
||||
|
||||
**Response:**
|
||||
```json
|
||||
{
|
||||
"rows": [
|
||||
{
|
||||
"hwaddr": "94:18:65:de:22:01",
|
||||
"address": "192.168.1.2",
|
||||
"hostname": "Orbi-Base-Station",
|
||||
"mac_info": "NETGEAR"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
**Configuration:**
|
||||
```
|
||||
Name: OPNsense DHCP
|
||||
URL: https://firewall/api/dnsmasq/leases/search
|
||||
Method: GET
|
||||
Auth Type: basic
|
||||
Username: admin
|
||||
Device List Path: rows
|
||||
MAC Address: hwaddr
|
||||
IP Address: address
|
||||
Device Name: hostname
|
||||
Vendor: mac_info
|
||||
```
|
||||
|
||||
### Notes
|
||||
|
||||
- Version: 1.0.0
|
||||
- Author: `jokob-sk`
|
||||
- Records with missing or invalid MAC addresses are skipped unless **Generate Fake MAC** is enabled
|
||||
- Each import definition executes independently; failed imports do not block others
|
||||
1459
front/plugins/rest_import/config.json
Normal file
1459
front/plugins/rest_import/config.json
Normal file
File diff suppressed because it is too large
Load Diff
294
front/plugins/rest_import/rest_import.py
Normal file
294
front/plugins/rest_import/rest_import.py
Normal file
@@ -0,0 +1,294 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import requests
|
||||
from pytz import timezone
|
||||
|
||||
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from plugin_helper import Plugin_Objects, decode_settings_base64, normalize_mac # noqa: E402
|
||||
from logger import mylog, Logger # noqa: E402
|
||||
from helper import get_setting_value # noqa: E402
|
||||
from const import logPath # noqa: E402
|
||||
from utils.crypto_utils import string_to_fake_mac # noqa: E402
|
||||
import conf # noqa: E402
|
||||
|
||||
conf.tz = timezone(get_setting_value('TIMEZONE'))
|
||||
Logger(get_setting_value('LOG_LEVEL'))
|
||||
|
||||
pluginName = 'RSTIMPRT'
|
||||
|
||||
LOG_PATH = logPath + '/plugins'
|
||||
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
||||
|
||||
VALID_METHODS = ('GET', 'POST')
|
||||
VALID_AUTH_TYPES = ('none', 'basic', 'bearer')
|
||||
|
||||
# Valid normalized MAC: exactly xx:xx:xx:xx:xx:xx with hex digits
|
||||
_MAC_RE = re.compile(r'^([0-9a-f]{2}:){5}[0-9a-f]{2}$')
|
||||
|
||||
# Ordered scan field keys as they appear in add_object() positional slots
|
||||
SCAN_FIELD_SLOTS = [
|
||||
'RSTIMPRT_scanName', # watched1
|
||||
'RSTIMPRT_scanVendor', # watched2
|
||||
'RSTIMPRT_scanSSID', # watched3
|
||||
'RSTIMPRT_scanType', # watched4
|
||||
'RSTIMPRT_scanParentMAC', # helpVal1
|
||||
'RSTIMPRT_scanParentPort', # helpVal2
|
||||
'RSTIMPRT_scanSite', # helpVal3
|
||||
'RSTIMPRT_scanVlan', # helpVal4
|
||||
]
|
||||
|
||||
|
||||
def main():
|
||||
mylog('verbose', [f'[{pluginName}] In script'])
|
||||
|
||||
import_configs = get_setting_value('RSTIMPRT_imports')
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
if not import_configs:
|
||||
mylog('none', [f'[{pluginName}] No import definitions configured'])
|
||||
plugin_objects.write_result_file()
|
||||
return 0
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Import definitions: {len(import_configs)}'])
|
||||
|
||||
for raw_config in import_configs:
|
||||
cfg = decode_settings_base64(raw_config)
|
||||
process_import(cfg, plugin_objects)
|
||||
|
||||
plugin_objects.write_result_file()
|
||||
return 0
|
||||
|
||||
|
||||
def process_import(cfg, plugin_objects):
|
||||
name = cfg.get('RSTIMPRT_name', 'Unnamed')
|
||||
url = cfg.get('RSTIMPRT_url', '').strip()
|
||||
method = cfg.get('RSTIMPRT_method', 'GET').strip().upper()
|
||||
verify_ssl = bool(cfg.get('RSTIMPRT_verify_ssl', True))
|
||||
auth_type = cfg.get('RSTIMPRT_auth_type', 'none').strip().lower()
|
||||
username = cfg.get('RSTIMPRT_username', '')
|
||||
password = cfg.get('RSTIMPRT_password', '')
|
||||
bearer_token = cfg.get('RSTIMPRT_bearer_token', '')
|
||||
raw_headers = cfg.get('RSTIMPRT_headers', '')
|
||||
post_body = cfg.get('RSTIMPRT_post_body', '')
|
||||
device_path = cfg.get('RSTIMPRT_device_path', '').strip()
|
||||
mac_field = cfg.get('RSTIMPRT_scanMac', '').strip()
|
||||
ip_field = cfg.get('RSTIMPRT_scanLastIP', '').strip()
|
||||
fake_mac = bool(cfg.get('RSTIMPRT_fake_mac', False))
|
||||
|
||||
mylog('none', [f'[{pluginName}] {name}'])
|
||||
|
||||
if not url:
|
||||
mylog('none', [f'[{pluginName}] Skipping "{name}" - URL is empty'])
|
||||
return
|
||||
|
||||
if method not in VALID_METHODS:
|
||||
mylog('none', [f'[{pluginName}] Skipping "{name}" - invalid method "{method}"'])
|
||||
return
|
||||
|
||||
if auth_type not in VALID_AUTH_TYPES:
|
||||
mylog('none', [f'[{pluginName}] Skipping "{name}" - invalid auth_type "{auth_type}"'])
|
||||
return
|
||||
|
||||
headers = build_headers(raw_headers)
|
||||
auth = build_auth(auth_type, username, password, bearer_token, headers)
|
||||
|
||||
response = make_request(name, url, method, verify_ssl, auth, headers, post_body)
|
||||
if response is None:
|
||||
return
|
||||
|
||||
try:
|
||||
data = response.json()
|
||||
except ValueError:
|
||||
mylog('none', [f'[{pluginName}] Invalid JSON response from "{name}"'])
|
||||
return
|
||||
|
||||
records = resolve_path(name, data, device_path)
|
||||
if records is None:
|
||||
return
|
||||
|
||||
imported = 0
|
||||
skipped = 0
|
||||
|
||||
for idx, record in enumerate(records):
|
||||
result = map_record(name, idx, record, cfg, mac_field, ip_field, fake_mac)
|
||||
if result is None:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
plugin_objects.add_object(
|
||||
primaryId = result['mac'],
|
||||
secondaryId = result['ip'],
|
||||
watched1 = result['name'],
|
||||
watched2 = result['vendor'],
|
||||
watched3 = result['ssid'],
|
||||
watched4 = result['dev_type'],
|
||||
extra = '',
|
||||
foreignKey = result['mac'],
|
||||
helpVal1 = result['parent_mac'],
|
||||
helpVal2 = result['parent_port'],
|
||||
helpVal3 = result['site'],
|
||||
helpVal4 = result['vlan'],
|
||||
)
|
||||
imported += 1
|
||||
|
||||
mylog('none', [f'[{pluginName}] Retrieved {len(records)} records'])
|
||||
mylog('none', [f'[{pluginName}] Imported {imported} devices'])
|
||||
if skipped:
|
||||
mylog('none', [f'[{pluginName}] Skipped {skipped} devices'])
|
||||
|
||||
|
||||
def build_headers(raw_headers_str):
|
||||
headers = {}
|
||||
if not raw_headers_str:
|
||||
return headers
|
||||
for line in raw_headers_str.splitlines():
|
||||
line = line.strip()
|
||||
if not line or ':' not in line:
|
||||
continue
|
||||
key, _, value = line.partition(':')
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
if key:
|
||||
headers[key] = value
|
||||
return headers
|
||||
|
||||
|
||||
def build_auth(auth_type, username, password, bearer_token, headers):
|
||||
if auth_type == 'basic' and username:
|
||||
return (username, password)
|
||||
if auth_type == 'bearer' and bearer_token:
|
||||
headers['Authorization'] = f'Bearer {bearer_token}'
|
||||
return None
|
||||
|
||||
|
||||
def make_request(name, url, method, verify_ssl, auth, headers, post_body):
|
||||
try:
|
||||
body_data = None
|
||||
if method == 'POST' and post_body:
|
||||
body_data = post_body
|
||||
|
||||
response = requests.request(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
auth=auth,
|
||||
data=body_data,
|
||||
verify=verify_ssl,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] {name} HTTP {response.status_code}'])
|
||||
|
||||
if response.status_code == 401:
|
||||
mylog('none', [f'[{pluginName}] Authentication failed for "{name}"'])
|
||||
return None
|
||||
|
||||
if not response.ok:
|
||||
mylog('none', [f'[{pluginName}] HTTP error {response.status_code} for "{name}"'])
|
||||
return None
|
||||
|
||||
return response
|
||||
|
||||
except requests.exceptions.SSLError:
|
||||
mylog('none', [f'[{pluginName}] SSL error for "{name}" - try disabling Verify SSL'])
|
||||
return None
|
||||
except requests.exceptions.ConnectionError:
|
||||
mylog('none', [f'[{pluginName}] Connection error for "{name}" - check URL'])
|
||||
return None
|
||||
except requests.exceptions.Timeout:
|
||||
mylog('none', [f'[{pluginName}] Request timed out for "{name}"'])
|
||||
return None
|
||||
except requests.exceptions.RequestException as e:
|
||||
mylog('none', [f'[{pluginName}] Request error for "{name}": {e}'])
|
||||
return None
|
||||
|
||||
|
||||
def resolve_path(name, data, path):
|
||||
if not path:
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
mylog('none', [f'[{pluginName}] Device path not configured and response is not an array for "{name}"'])
|
||||
return None
|
||||
|
||||
node = data
|
||||
for key in path.split('.'):
|
||||
if not isinstance(node, dict) or key not in node:
|
||||
mylog('none', [f'[{pluginName}] Device path not found: {path} for "{name}"'])
|
||||
return None
|
||||
node = node[key]
|
||||
|
||||
if not isinstance(node, list):
|
||||
mylog('none', [f'[{pluginName}] Device path "{path}" does not point to an array for "{name}"'])
|
||||
return None
|
||||
|
||||
return node
|
||||
|
||||
|
||||
def map_record(name, idx, record, cfg, mac_field, ip_field, fake_mac):
|
||||
raw_mac = record.get(mac_field, '') if mac_field else ''
|
||||
raw_ip = record.get(ip_field, '') if ip_field else ''
|
||||
|
||||
mac = _resolve_mac(name, idx, raw_mac, raw_ip, fake_mac)
|
||||
if mac is None:
|
||||
return None
|
||||
|
||||
ip = str(raw_ip).strip() if raw_ip else ''
|
||||
|
||||
return {
|
||||
'mac': mac,
|
||||
'ip': ip,
|
||||
'name': _get_field(record, cfg, 'RSTIMPRT_scanName'),
|
||||
'vendor': _get_field(record, cfg, 'RSTIMPRT_scanVendor'),
|
||||
'ssid': _get_field(record, cfg, 'RSTIMPRT_scanSSID'),
|
||||
'dev_type': _get_field(record, cfg, 'RSTIMPRT_scanType'),
|
||||
'parent_mac': _get_field(record, cfg, 'RSTIMPRT_scanParentMAC'),
|
||||
'parent_port': _get_field(record, cfg, 'RSTIMPRT_scanParentPort'),
|
||||
'site': _get_field(record, cfg, 'RSTIMPRT_scanSite'),
|
||||
'vlan': _get_field(record, cfg, 'RSTIMPRT_scanVlan'),
|
||||
}
|
||||
|
||||
|
||||
def _resolve_mac(name, idx, raw_mac, raw_ip, fake_mac):
|
||||
if raw_mac:
|
||||
mac = validate_mac(str(raw_mac))
|
||||
if mac:
|
||||
return mac
|
||||
|
||||
if fake_mac:
|
||||
ip = str(raw_ip).strip() if raw_ip else ''
|
||||
if ip:
|
||||
return string_to_fake_mac(ip)
|
||||
mylog('none', [f'[{pluginName}] Skipped record {idx} - no IP for fake MAC generation for "{name}"'])
|
||||
return None
|
||||
|
||||
mylog('none', [f'[{pluginName}] Skipped record {idx} - invalid MAC address for "{name}"'])
|
||||
return None
|
||||
|
||||
|
||||
def _get_field(record, cfg, config_key):
|
||||
api_field = cfg.get(config_key, '').strip()
|
||||
if not api_field:
|
||||
return ''
|
||||
value = record.get(api_field, '')
|
||||
return str(value).strip() if value is not None else ''
|
||||
|
||||
|
||||
def validate_mac(raw):
|
||||
if not raw:
|
||||
return None
|
||||
mac = normalize_mac(raw)
|
||||
if not mac:
|
||||
return None
|
||||
# normalize_mac reformats any string — verify result is actually a MAC
|
||||
if not _MAC_RE.match(mac.lower()):
|
||||
return None
|
||||
return mac
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
237
test/plugins/test_rest_import.py
Normal file
237
test/plugins/test_rest_import.py
Normal file
@@ -0,0 +1,237 @@
|
||||
"""
|
||||
Tests for the REST Import plugin (rest_import.py).
|
||||
|
||||
Module-level side effects (get_setting_value, Logger, Plugin_Objects,
|
||||
conf.tz) are patched before import to prevent live config reads,
|
||||
log file creation, or network calls during tests.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path setup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
_SERVER = os.path.join(_ROOT, 'server')
|
||||
_PLUGIN_DIR = os.path.join(_ROOT, 'front', 'plugins', 'rest_import')
|
||||
|
||||
for _p in [_ROOT, _SERVER, _PLUGIN_DIR, os.path.join(_ROOT, 'front', 'plugins')]:
|
||||
if _p not in sys.path:
|
||||
sys.path.insert(0, _p)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Import module with side effects patched
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
with patch('helper.get_setting_value', return_value='UTC'), \
|
||||
patch('logger.Logger'), \
|
||||
patch('plugin_helper.Plugin_Objects'):
|
||||
import rest_import # noqa: E402
|
||||
|
||||
from utils.crypto_utils import string_to_fake_mac # noqa: E402
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_headers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildHeaders:
|
||||
def test_empty_string_returns_empty_dict(self):
|
||||
assert rest_import.build_headers('') == {}
|
||||
|
||||
def test_none_returns_empty_dict(self):
|
||||
assert rest_import.build_headers(None) == {}
|
||||
|
||||
def test_single_header(self):
|
||||
result = rest_import.build_headers('Accept: application/json')
|
||||
assert result == {'Accept': 'application/json'}
|
||||
|
||||
def test_multiple_headers(self):
|
||||
raw = 'Accept: application/json\nX-API-Key: abc123'
|
||||
result = rest_import.build_headers(raw)
|
||||
assert result == {'Accept': 'application/json', 'X-API-Key': 'abc123'}
|
||||
|
||||
def test_value_with_colon_preserved(self):
|
||||
result = rest_import.build_headers('Authorization: Bearer tok:en')
|
||||
assert result == {'Authorization': 'Bearer tok:en'}
|
||||
|
||||
def test_malformed_line_without_colon_is_skipped(self):
|
||||
result = rest_import.build_headers('nocolonhere\nValid: yes')
|
||||
assert result == {'Valid': 'yes'}
|
||||
|
||||
def test_blank_lines_skipped(self):
|
||||
result = rest_import.build_headers('\n \nAccept: text/plain\n')
|
||||
assert result == {'Accept': 'text/plain'}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# resolve_path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestResolvePath:
|
||||
def test_empty_path_with_list_response(self):
|
||||
data = [{'mac': 'AA:BB:CC:DD:EE:FF'}]
|
||||
result = rest_import.resolve_path('test', data, '')
|
||||
assert result == data
|
||||
|
||||
def test_flat_key(self):
|
||||
data = {'rows': [{'mac': 'AA:BB:CC:DD:EE:FF'}]}
|
||||
result = rest_import.resolve_path('test', data, 'rows')
|
||||
assert result == data['rows']
|
||||
|
||||
def test_dot_notation(self):
|
||||
data = {'data': {'devices': [{'mac': 'AA:BB:CC:DD:EE:FF'}]}}
|
||||
result = rest_import.resolve_path('test', data, 'data.devices')
|
||||
assert result == data['data']['devices']
|
||||
|
||||
def test_missing_path_returns_none(self):
|
||||
data = {'rows': []}
|
||||
result = rest_import.resolve_path('test', data, 'missing.key')
|
||||
assert result is None
|
||||
|
||||
def test_path_points_to_non_list_returns_none(self):
|
||||
data = {'rows': {'not': 'a list'}}
|
||||
result = rest_import.resolve_path('test', data, 'rows')
|
||||
assert result is None
|
||||
|
||||
def test_empty_path_non_list_response_returns_none(self):
|
||||
result = rest_import.resolve_path('test', {'key': 'value'}, '')
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate_mac
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestValidateMac:
|
||||
def test_colon_separated_valid(self):
|
||||
mac = rest_import.validate_mac('AA:BB:CC:DD:EE:FF')
|
||||
assert mac is not None
|
||||
|
||||
def test_dash_separated_valid(self):
|
||||
mac = rest_import.validate_mac('AA-BB-CC-DD-EE-FF')
|
||||
assert mac is not None
|
||||
|
||||
def test_bare_hex_valid(self):
|
||||
mac = rest_import.validate_mac('AABBCCDDEEFF')
|
||||
assert mac is not None
|
||||
|
||||
def test_invalid_string_returns_none(self):
|
||||
assert rest_import.validate_mac('not-a-mac') is None
|
||||
|
||||
def test_empty_returns_none(self):
|
||||
assert rest_import.validate_mac('') is None
|
||||
|
||||
def test_placeholder_unknown_returns_none(self):
|
||||
assert rest_import.validate_mac('unknown') is None
|
||||
|
||||
def test_placeholder_star_returns_none(self):
|
||||
assert rest_import.validate_mac('*') is None
|
||||
|
||||
def test_null_string_returns_none(self):
|
||||
assert rest_import.validate_mac('null') is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# map_record
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMapRecord:
|
||||
def _cfg(self, mac_field='hwaddr', ip_field='address', fake_mac=False, **extra):
|
||||
cfg = {
|
||||
'RSTIMPRT_scanMac': mac_field,
|
||||
'RSTIMPRT_scanLastIP': ip_field,
|
||||
'RSTIMPRT_scanName': 'hostname',
|
||||
'RSTIMPRT_scanVendor': '',
|
||||
'RSTIMPRT_scanSSID': '',
|
||||
'RSTIMPRT_scanType': '',
|
||||
'RSTIMPRT_scanParentMAC': '',
|
||||
'RSTIMPRT_scanParentPort': '',
|
||||
'RSTIMPRT_scanSite': '',
|
||||
'RSTIMPRT_scanVlan': '',
|
||||
'RSTIMPRT_fake_mac': fake_mac,
|
||||
}
|
||||
cfg.update(extra)
|
||||
return cfg
|
||||
|
||||
def test_valid_record_returns_dict(self):
|
||||
record = {'hwaddr': 'AA:BB:CC:DD:EE:FF', 'address': '192.168.1.10', 'hostname': 'mydevice'}
|
||||
cfg = self._cfg()
|
||||
result = rest_import.map_record('test', 0, record, cfg, 'hwaddr', 'address', False)
|
||||
assert result is not None
|
||||
assert result['ip'] == '192.168.1.10'
|
||||
assert result['name'] == 'mydevice'
|
||||
|
||||
def test_invalid_mac_without_fake_mac_returns_none(self):
|
||||
record = {'hwaddr': 'not-a-mac', 'address': '192.168.1.10'}
|
||||
cfg = self._cfg()
|
||||
result = rest_import.map_record('test', 0, record, cfg, 'hwaddr', 'address', False)
|
||||
assert result is None
|
||||
|
||||
def test_missing_mac_with_fake_mac_enabled_generates_mac(self):
|
||||
record = {'address': '192.168.1.50', 'hostname': 'server01'}
|
||||
cfg = self._cfg(mac_field='', fake_mac=True)
|
||||
result = rest_import.map_record('test', 0, record, cfg, '', 'address', True)
|
||||
assert result is not None
|
||||
expected_mac = string_to_fake_mac('192.168.1.50')
|
||||
assert result['mac'] == expected_mac
|
||||
|
||||
def test_fake_mac_enabled_but_no_ip_returns_none(self):
|
||||
record = {'hostname': 'noip'}
|
||||
cfg = self._cfg(mac_field='', ip_field='', fake_mac=True)
|
||||
result = rest_import.map_record('test', 0, record, cfg, '', '', True)
|
||||
assert result is None
|
||||
|
||||
def test_optional_fields_default_to_empty_when_not_configured(self):
|
||||
record = {'hwaddr': 'AA:BB:CC:DD:EE:FF', 'address': '10.0.0.1'}
|
||||
cfg = self._cfg()
|
||||
result = rest_import.map_record('test', 0, record, cfg, 'hwaddr', 'address', False)
|
||||
assert result is not None
|
||||
assert result['vendor'] == ''
|
||||
assert result['ssid'] == ''
|
||||
assert result['vlan'] == ''
|
||||
|
||||
def test_valid_mac_takes_precedence_over_fake_mac_setting(self):
|
||||
record = {'hwaddr': 'AA:BB:CC:DD:EE:FF', 'address': '192.168.1.10'}
|
||||
cfg = self._cfg(fake_mac=True)
|
||||
result = rest_import.map_record('test', 0, record, cfg, 'hwaddr', 'address', True)
|
||||
assert result is not None
|
||||
# Should use the real MAC, not a fake one
|
||||
assert not result['mac'].startswith('fa:ce:')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_auth
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildAuth:
|
||||
def test_none_auth_returns_none(self):
|
||||
headers = {}
|
||||
auth = rest_import.build_auth('none', 'user', 'pass', 'token', headers)
|
||||
assert auth is None
|
||||
|
||||
def test_basic_auth_returns_tuple(self):
|
||||
headers = {}
|
||||
auth = rest_import.build_auth('basic', 'myuser', 'mypass', '', headers)
|
||||
assert auth == ('myuser', 'mypass')
|
||||
|
||||
def test_bearer_auth_injects_header(self):
|
||||
headers = {}
|
||||
auth = rest_import.build_auth('bearer', '', '', 'mytoken', headers)
|
||||
assert auth is None
|
||||
assert headers.get('Authorization') == 'Bearer mytoken'
|
||||
|
||||
def test_basic_auth_without_username_returns_none(self):
|
||||
headers = {}
|
||||
auth = rest_import.build_auth('basic', '', 'pass', '', headers)
|
||||
assert auth is None
|
||||
|
||||
def test_bearer_without_token_does_not_inject_header(self):
|
||||
headers = {}
|
||||
rest_import.build_auth('bearer', '', '', '', headers)
|
||||
assert 'Authorization' not in headers
|
||||
Reference in New Issue
Block a user