mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-06-21 14:49:59 -04:00
- Implemented the REST Import plugin (rest_import.py) to handle importing data from REST APIs. - Added functionality for configurable HTTP methods, authentication types, and custom headers. - Included error handling for various HTTP response statuses and connection issues. - Created unit tests for the plugin covering header building, path resolution, MAC validation, record mapping, and authentication methods. - Ensured that module-level side effects are patched during tests to prevent live interactions.
295 lines
9.3 KiB
Python
295 lines
9.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import requests
|
|
from pytz import timezone
|
|
|
|
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
|
|
|
from plugin_helper import Plugin_Objects, decode_settings_base64, normalize_mac # noqa: E402
|
|
from logger import mylog, Logger # noqa: E402
|
|
from helper import get_setting_value # noqa: E402
|
|
from const import logPath # noqa: E402
|
|
from utils.crypto_utils import string_to_fake_mac # noqa: E402
|
|
import conf # noqa: E402
|
|
|
|
conf.tz = timezone(get_setting_value('TIMEZONE'))
|
|
Logger(get_setting_value('LOG_LEVEL'))
|
|
|
|
pluginName = 'RSTIMPRT'
|
|
|
|
LOG_PATH = logPath + '/plugins'
|
|
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
|
|
|
VALID_METHODS = ('GET', 'POST')
|
|
VALID_AUTH_TYPES = ('none', 'basic', 'bearer')
|
|
|
|
# Valid normalized MAC: exactly xx:xx:xx:xx:xx:xx with hex digits
|
|
_MAC_RE = re.compile(r'^([0-9a-f]{2}:){5}[0-9a-f]{2}$')
|
|
|
|
# Ordered scan field keys as they appear in add_object() positional slots
|
|
SCAN_FIELD_SLOTS = [
|
|
'RSTIMPRT_scanName', # watched1
|
|
'RSTIMPRT_scanVendor', # watched2
|
|
'RSTIMPRT_scanSSID', # watched3
|
|
'RSTIMPRT_scanType', # watched4
|
|
'RSTIMPRT_scanParentMAC', # helpVal1
|
|
'RSTIMPRT_scanParentPort', # helpVal2
|
|
'RSTIMPRT_scanSite', # helpVal3
|
|
'RSTIMPRT_scanVlan', # helpVal4
|
|
]
|
|
|
|
|
|
def main():
|
|
mylog('verbose', [f'[{pluginName}] In script'])
|
|
|
|
import_configs = get_setting_value('RSTIMPRT_imports')
|
|
plugin_objects = Plugin_Objects(RESULT_FILE)
|
|
|
|
if not import_configs:
|
|
mylog('none', [f'[{pluginName}] No import definitions configured'])
|
|
plugin_objects.write_result_file()
|
|
return 0
|
|
|
|
mylog('verbose', [f'[{pluginName}] Import definitions: {len(import_configs)}'])
|
|
|
|
for raw_config in import_configs:
|
|
cfg = decode_settings_base64(raw_config)
|
|
process_import(cfg, plugin_objects)
|
|
|
|
plugin_objects.write_result_file()
|
|
return 0
|
|
|
|
|
|
def process_import(cfg, plugin_objects):
|
|
name = cfg.get('RSTIMPRT_name', 'Unnamed')
|
|
url = cfg.get('RSTIMPRT_url', '').strip()
|
|
method = cfg.get('RSTIMPRT_method', 'GET').strip().upper()
|
|
verify_ssl = bool(cfg.get('RSTIMPRT_verify_ssl', True))
|
|
auth_type = cfg.get('RSTIMPRT_auth_type', 'none').strip().lower()
|
|
username = cfg.get('RSTIMPRT_username', '')
|
|
password = cfg.get('RSTIMPRT_password', '')
|
|
bearer_token = cfg.get('RSTIMPRT_bearer_token', '')
|
|
raw_headers = cfg.get('RSTIMPRT_headers', '')
|
|
post_body = cfg.get('RSTIMPRT_post_body', '')
|
|
device_path = cfg.get('RSTIMPRT_device_path', '').strip()
|
|
mac_field = cfg.get('RSTIMPRT_scanMac', '').strip()
|
|
ip_field = cfg.get('RSTIMPRT_scanLastIP', '').strip()
|
|
fake_mac = bool(cfg.get('RSTIMPRT_fake_mac', False))
|
|
|
|
mylog('none', [f'[{pluginName}] {name}'])
|
|
|
|
if not url:
|
|
mylog('none', [f'[{pluginName}] Skipping "{name}" - URL is empty'])
|
|
return
|
|
|
|
if method not in VALID_METHODS:
|
|
mylog('none', [f'[{pluginName}] Skipping "{name}" - invalid method "{method}"'])
|
|
return
|
|
|
|
if auth_type not in VALID_AUTH_TYPES:
|
|
mylog('none', [f'[{pluginName}] Skipping "{name}" - invalid auth_type "{auth_type}"'])
|
|
return
|
|
|
|
headers = build_headers(raw_headers)
|
|
auth = build_auth(auth_type, username, password, bearer_token, headers)
|
|
|
|
response = make_request(name, url, method, verify_ssl, auth, headers, post_body)
|
|
if response is None:
|
|
return
|
|
|
|
try:
|
|
data = response.json()
|
|
except ValueError:
|
|
mylog('none', [f'[{pluginName}] Invalid JSON response from "{name}"'])
|
|
return
|
|
|
|
records = resolve_path(name, data, device_path)
|
|
if records is None:
|
|
return
|
|
|
|
imported = 0
|
|
skipped = 0
|
|
|
|
for idx, record in enumerate(records):
|
|
result = map_record(name, idx, record, cfg, mac_field, ip_field, fake_mac)
|
|
if result is None:
|
|
skipped += 1
|
|
continue
|
|
|
|
plugin_objects.add_object(
|
|
primaryId = result['mac'],
|
|
secondaryId = result['ip'],
|
|
watched1 = result['name'],
|
|
watched2 = result['vendor'],
|
|
watched3 = result['ssid'],
|
|
watched4 = result['dev_type'],
|
|
extra = '',
|
|
foreignKey = result['mac'],
|
|
helpVal1 = result['parent_mac'],
|
|
helpVal2 = result['parent_port'],
|
|
helpVal3 = result['site'],
|
|
helpVal4 = result['vlan'],
|
|
)
|
|
imported += 1
|
|
|
|
mylog('none', [f'[{pluginName}] Retrieved {len(records)} records'])
|
|
mylog('none', [f'[{pluginName}] Imported {imported} devices'])
|
|
if skipped:
|
|
mylog('none', [f'[{pluginName}] Skipped {skipped} devices'])
|
|
|
|
|
|
def build_headers(raw_headers_str):
|
|
headers = {}
|
|
if not raw_headers_str:
|
|
return headers
|
|
for line in raw_headers_str.splitlines():
|
|
line = line.strip()
|
|
if not line or ':' not in line:
|
|
continue
|
|
key, _, value = line.partition(':')
|
|
key = key.strip()
|
|
value = value.strip()
|
|
if key:
|
|
headers[key] = value
|
|
return headers
|
|
|
|
|
|
def build_auth(auth_type, username, password, bearer_token, headers):
|
|
if auth_type == 'basic' and username:
|
|
return (username, password)
|
|
if auth_type == 'bearer' and bearer_token:
|
|
headers['Authorization'] = f'Bearer {bearer_token}'
|
|
return None
|
|
|
|
|
|
def make_request(name, url, method, verify_ssl, auth, headers, post_body):
|
|
try:
|
|
body_data = None
|
|
if method == 'POST' and post_body:
|
|
body_data = post_body
|
|
|
|
response = requests.request(
|
|
method=method,
|
|
url=url,
|
|
headers=headers,
|
|
auth=auth,
|
|
data=body_data,
|
|
verify=verify_ssl,
|
|
timeout=30,
|
|
)
|
|
|
|
mylog('verbose', [f'[{pluginName}] {name} HTTP {response.status_code}'])
|
|
|
|
if response.status_code == 401:
|
|
mylog('none', [f'[{pluginName}] Authentication failed for "{name}"'])
|
|
return None
|
|
|
|
if not response.ok:
|
|
mylog('none', [f'[{pluginName}] HTTP error {response.status_code} for "{name}"'])
|
|
return None
|
|
|
|
return response
|
|
|
|
except requests.exceptions.SSLError:
|
|
mylog('none', [f'[{pluginName}] SSL error for "{name}" - try disabling Verify SSL'])
|
|
return None
|
|
except requests.exceptions.ConnectionError:
|
|
mylog('none', [f'[{pluginName}] Connection error for "{name}" - check URL'])
|
|
return None
|
|
except requests.exceptions.Timeout:
|
|
mylog('none', [f'[{pluginName}] Request timed out for "{name}"'])
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
mylog('none', [f'[{pluginName}] Request error for "{name}": {e}'])
|
|
return None
|
|
|
|
|
|
def resolve_path(name, data, path):
|
|
if not path:
|
|
if isinstance(data, list):
|
|
return data
|
|
mylog('none', [f'[{pluginName}] Device path not configured and response is not an array for "{name}"'])
|
|
return None
|
|
|
|
node = data
|
|
for key in path.split('.'):
|
|
if not isinstance(node, dict) or key not in node:
|
|
mylog('none', [f'[{pluginName}] Device path not found: {path} for "{name}"'])
|
|
return None
|
|
node = node[key]
|
|
|
|
if not isinstance(node, list):
|
|
mylog('none', [f'[{pluginName}] Device path "{path}" does not point to an array for "{name}"'])
|
|
return None
|
|
|
|
return node
|
|
|
|
|
|
def map_record(name, idx, record, cfg, mac_field, ip_field, fake_mac):
|
|
raw_mac = record.get(mac_field, '') if mac_field else ''
|
|
raw_ip = record.get(ip_field, '') if ip_field else ''
|
|
|
|
mac = _resolve_mac(name, idx, raw_mac, raw_ip, fake_mac)
|
|
if mac is None:
|
|
return None
|
|
|
|
ip = str(raw_ip).strip() if raw_ip else ''
|
|
|
|
return {
|
|
'mac': mac,
|
|
'ip': ip,
|
|
'name': _get_field(record, cfg, 'RSTIMPRT_scanName'),
|
|
'vendor': _get_field(record, cfg, 'RSTIMPRT_scanVendor'),
|
|
'ssid': _get_field(record, cfg, 'RSTIMPRT_scanSSID'),
|
|
'dev_type': _get_field(record, cfg, 'RSTIMPRT_scanType'),
|
|
'parent_mac': _get_field(record, cfg, 'RSTIMPRT_scanParentMAC'),
|
|
'parent_port': _get_field(record, cfg, 'RSTIMPRT_scanParentPort'),
|
|
'site': _get_field(record, cfg, 'RSTIMPRT_scanSite'),
|
|
'vlan': _get_field(record, cfg, 'RSTIMPRT_scanVlan'),
|
|
}
|
|
|
|
|
|
def _resolve_mac(name, idx, raw_mac, raw_ip, fake_mac):
|
|
if raw_mac:
|
|
mac = validate_mac(str(raw_mac))
|
|
if mac:
|
|
return mac
|
|
|
|
if fake_mac:
|
|
ip = str(raw_ip).strip() if raw_ip else ''
|
|
if ip:
|
|
return string_to_fake_mac(ip)
|
|
mylog('none', [f'[{pluginName}] Skipped record {idx} - no IP for fake MAC generation for "{name}"'])
|
|
return None
|
|
|
|
mylog('none', [f'[{pluginName}] Skipped record {idx} - invalid MAC address for "{name}"'])
|
|
return None
|
|
|
|
|
|
def _get_field(record, cfg, config_key):
|
|
api_field = cfg.get(config_key, '').strip()
|
|
if not api_field:
|
|
return ''
|
|
value = record.get(api_field, '')
|
|
return str(value).strip() if value is not None else ''
|
|
|
|
|
|
def validate_mac(raw):
|
|
if not raw:
|
|
return None
|
|
mac = normalize_mac(raw)
|
|
if not mac:
|
|
return None
|
|
# normalize_mac reformats any string — verify result is actually a MAC
|
|
if not _MAC_RE.match(mac.lower()):
|
|
return None
|
|
return mac
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|