diff --git a/front/plugins/omada_sdn_openapi/script.py b/front/plugins/omada_sdn_openapi/script.py index 0175933d..d3d6e718 100755 --- a/front/plugins/omada_sdn_openapi/script.py +++ b/front/plugins/omada_sdn_openapi/script.py @@ -9,7 +9,7 @@ However, I found that approach somewhat unstable, so I decided to give it a shot and create a new plugin with the goal of providing same, but more reliable results. -Please note that this is my first plugin, and I’m not a Python developer. +Please note that this is my first plugin, and I'm not a Python developer. Any comments, bug fixes, or contributions are greatly appreciated. Author: https://github.com/xfilo @@ -18,6 +18,7 @@ Author: https://github.com/xfilo __author__ = "xfilo" __version__ = 0.1 # Initial version __version__ = 0.2 # Rephrased error messages, improved logging and code logic +__version__ = 0.3 # Refactored data collection into a class, improved code clarity with comments import os import sys @@ -85,6 +86,7 @@ class OmadaHelper: if not ms or not isinstance(ms, (str, int)): raise ValueError(f"Value '{ms}' is not a valid timestamp") + # Convert UTC millisecond timestamp to datetime in NetAlertX's timezone timestamp = ms / 1000 tz = pytz.timezone("UTC") utc_datetime = datetime.fromtimestamp(timestamp, tz=tz) @@ -111,6 +113,7 @@ class OmadaHelper: if not mac or not isinstance(mac, str) or mac is None: raise Exception(f"Value '{mac}' is not a valid MAC address") + # Replace - with : in a MAC address and make it lowercase result = mac.lower().replace("-", ":") msg = f"Normalized MAC address from {mac} to {result}" OmadaHelper.debug(msg) @@ -128,6 +131,8 @@ class OmadaHelper: raise Exception(f"Expected a list, but got '{type(input_data)}'.") OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}") + + # The default return structure for one device/client default_entry = { "mac_address": "", "ip_address": "", @@ -141,8 +146,11 @@ class OmadaHelper: } result = [] + + # Loop through each device/client for data in input_data: + # Normalize and verify MAC address mac = OmadaHelper.normalize_mac(data.get("mac")) if not isinstance(mac, dict) or mac.get("response_type") != "success": continue @@ -152,23 +160,29 @@ class OmadaHelper: OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}") continue + # Assigning mandatory return values entry = default_entry.copy() entry["mac_address"] = mac - entry["ip_address"] = data.get("ip", "") - entry["name"] = data.get("name", "") + entry["ip_address"] = data.get("ip") + entry["name"] = data.get("name") + # Assign the last datetime the device/client was seen on the network last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone) entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else "" + # Applicable only for DEVICE if input_type == "device": entry["device_type"] = data.get("type") + # If it's not a gateway try to assign parent node MAC if data.get("type", "") != "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac")) entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" + # Applicable only for CLIENT if input_type == "client": entry["vlan_id"] = data.get("vid") entry["device_type"] = data.get("deviceType") + # Try to assign parent node MAC and PORT/SSID to the CLIENT if data.get("connectDevType", "") == "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac")) entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" @@ -182,18 +196,17 @@ class OmadaHelper: entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" entry["parent_node_ssid"] = data.get("ssid", "") + # Add the entry to the result result.append(entry) OmadaHelper.debug(f"Processed {input_type} entry: {entry}") msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}" OmadaHelper.minimal(msg) - final_result = OmadaHelper.response("success", msg, result) + return OmadaHelper.response("success", msg, result) except Exception as ex: msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}" OmadaHelper.verbose(msg) - final_result = OmadaHelper.response("error", msg) - - return final_result + return OmadaHelper.response("error", msg) class OmadaAPI: @@ -213,14 +226,18 @@ class OmadaAPI: # Validate and set attributes for param_name, param_info in params.items(): + # Get user parameter input, or default value if any value = options.get(param_name, param_info.get("default")) + # Check if a parameter is required and if it's value is non-empty if param_info["required"] and (value is None or (param_info["type"] == str and not value)): raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}") + # Check if a parameter has a correct datatype if not isinstance(value, param_info["type"]): raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}") + # Assign the parameter to the class setattr(self, param_name, value) OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}") @@ -235,6 +252,7 @@ class OmadaAPI: def _get_headers(self, include_auth: bool = True) -> dict: """Return request headers.""" headers = {"Content-type": "application/json"} + # Add access token to header if requested and available if include_auth == True: if not self.access_token: OmadaHelper.debug("No access token available for headers") @@ -244,14 +262,18 @@ class OmadaAPI: return headers def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]: + """Make a request to an endpoint.""" time.sleep(1) # Sleep before making any request so it does not rate-limited OmadaHelper.debug(f"{method} request to endpoint: {endpoint}") url = f"{getattr(self, 'host')}{endpoint}" headers = self._get_headers(kwargs.pop('include_auth', True)) try: + # Make the request and get the response response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs) response.raise_for_status() data = response.json() + + # Check if the response contains an error code and determine the function response type response_type = "error" if data.get("errorCode", 0) != 0 else "success" msg = f"{method} request completed: {endpoint}" OmadaHelper.verbose(msg) @@ -280,6 +302,8 @@ class OmadaAPI: error_code = response_result.get("errorCode") access_token = response_result.get("result").get("accessToken") refresh_token = response_result.get("result").get("refreshToken") + + # Authentication is successful if there isn't a response error, and access_token and refresh_token are set if error_code == 0 and access_token and refresh_token: self.access_token = access_token self.refresh_token = refresh_token @@ -359,98 +383,116 @@ class OmadaAPI: return self.active_sites_dict -def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: - if normalized_input_data.get("response_type", "error") != "success": - OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}") - return +class OmadaData: + @staticmethod + def create_data(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: + """Creates plugin object from normalized input data.""" + if normalized_input_data.get("response_type", "error") != "success": + OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}") + return - response_result = normalized_input_data.get("response_result", {}) - for entry in response_result: - OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}") + # Loop through every device/client and make an plugin entry + response_result = normalized_input_data.get("response_result", {}) + for entry in response_result: + if len(entry) == 0: + OmadaHelper.minimal(f"Skipping entry, missing data.") + continue - parent_node = entry["parent_node_mac_address"] - if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]): - parent_node = "Internet" + OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}") - device_type = entry["device_type"].lower() - if device_type == "iphone": - device_type = "iPhone" - elif device_type == "pc": - device_type = "PC" - else: - device_type = device_type.capitalize() + # If the device_type is gateway, set the parent_node to Internet + device_type = entry["device_type"].lower() + parent_node = entry["parent_node_mac_address"] + if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]): + parent_node = "Internet" - plugin_objects.add_object( - primaryId=entry["mac_address"], - secondaryId=entry["ip_address"], - watched1=entry["name"], - watched2=parent_node, - watched3=entry["parent_node_port"], - watched4=entry["parent_node_ssid"], - extra=device_type, - foreignKey=entry["mac_address"], - helpVal1=entry["last_seen"], - helpVal2=entry["site_name"], - helpVal3=entry["vlan_id"], - helpVal4="null" - ) + # Some device type naming exceptions + if device_type == "iphone": + device_type = "iPhone" + elif device_type == "pc": + device_type = "PC" + else: + device_type = device_type.capitalize() + # Add the plugin object + plugin_objects.add_object( + primaryId=entry["mac_address"], + secondaryId=entry["ip_address"], + watched1=entry["name"], + watched2=parent_node, + watched3=entry["parent_node_port"], + watched4=entry["parent_node_ssid"], + extra=device_type, + foreignKey=entry["mac_address"], + helpVal1=entry["last_seen"], + helpVal2=entry["site_name"], + helpVal3=entry["vlan_id"], + helpVal4="null" + ) -def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: - omada_api = OmadaAPI(OPTIONS) + @staticmethod + def collect_data(plugin_objects: Plugin_Objects) -> Plugin_Objects: + """Collects device and client data from Omada Controller.""" + omada_api = OmadaAPI(OPTIONS) - auth_result = omada_api.authenticate() - if auth_result["response_type"] == "error": - OmadaHelper.minimal("Authentication failed, aborting data collection") - OmadaHelper.debug(f"{auth_result['response_message']}") + # Authenticate + auth_result = omada_api.authenticate() + if auth_result["response_type"] == "error": + OmadaHelper.minimal("Authentication failed, aborting data collection") + OmadaHelper.debug(f"{auth_result['response_message']}") + return plugin_objects + + # Populate sites + sites_result = omada_api.populate_sites() + if sites_result["response_type"] == "error": + OmadaHelper.minimal("Site population failed, aborting data collection") + OmadaHelper.debug(f"{sites_result['response_message']}") + return plugin_objects + + requested_sites = omada_api.requested_sites() + available_sites = omada_api.available_sites() + active_sites = omada_api.active_sites() + + OmadaHelper.verbose(f"Requested sites: {requested_sites}") + OmadaHelper.verbose(f"Available sites: {available_sites}") + OmadaHelper.verbose(f"Active sites: {active_sites}") + + OmadaHelper.minimal("Starting data collection process") + + # Loop through sites and collect data + for site_id, site_name in active_sites.items(): + OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})") + + # Collect device data + devices_response = omada_api.get_devices(site_id) + if devices_response["response_type"] != "success": + OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}") + else: + devices = devices_response["response_result"].get("result").get("data", []) + OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}") + devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE) + OmadaData.create_data(plugin_objects, devices) + + # Collect client data + clients_response = omada_api.get_clients(site_id) + if clients_response["response_type"] != "success": + OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}") + else: + clients = clients_response["response_result"].get("result").get("data", []) + OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}") + clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE) + OmadaData.create_data(plugin_objects, clients) + + OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})") + + # Complete collection and return plugin object + OmadaHelper.minimal("Completed data collection process") return plugin_objects - sites_result = omada_api.populate_sites() - if sites_result["response_type"] == "error": - OmadaHelper.minimal("Site population failed, aborting data collection") - OmadaHelper.debug(f"{sites_result['response_message']}") - return plugin_objects - - requested_sites = omada_api.requested_sites() - available_sites = omada_api.available_sites() - active_sites = omada_api.active_sites() - - OmadaHelper.verbose(f"Requested sites: {requested_sites}") - OmadaHelper.verbose(f"Available sites: {available_sites}") - OmadaHelper.verbose(f"Active sites: {active_sites}") - - OmadaHelper.minimal("Starting data collection process") - - for site_id, site_name in active_sites.items(): - OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})") - - devices_response = omada_api.get_devices(site_id) - if devices_response["response_type"] != "success": - OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}") - else: - devices = devices_response["response_result"].get("result").get("data", []) - OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}") - devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE) - make_entries(plugin_objects, devices) - - clients_response = omada_api.get_clients(site_id) - if clients_response["response_type"] != "success": - OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}") - else: - clients = clients_response["response_result"].get("result").get("data", []) - OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}") - clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE) - make_entries(plugin_objects, clients) - - OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})") - - OmadaHelper.minimal("Completed data collection process") - return plugin_objects - def main(): start_time = time.time() - OmadaHelper.minimal("Starting execution") + OmadaHelper.minimal(f"Starting execution, version {__version__}") # Initialize the Plugin object output file plugin_objects = Plugin_Objects(RESULT_FILE) @@ -468,8 +510,8 @@ def main(): } OmadaHelper.verbose("Configuration options loaded") - # Retrieve entries - plugin_objects = get_entries(plugin_objects) + # Retrieve entries and write result + plugin_objects = OmadaData.collect_data(plugin_objects) plugin_objects.write_result_file() # Finish