From 65a5d35801c95cf10bb0567d64d25a89c249d004 Mon Sep 17 00:00:00 2001 From: xfilo Date: Mon, 24 Feb 2025 10:52:18 +0100 Subject: [PATCH 1/4] OMDSDNOPENAPI - Run command change due to plugin folder rename --- front/plugins/omada_sdn_openapi/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/front/plugins/omada_sdn_openapi/config.json b/front/plugins/omada_sdn_openapi/config.json index b0aa6715..864284be 100755 --- a/front/plugins/omada_sdn_openapi/config.json +++ b/front/plugins/omada_sdn_openapi/config.json @@ -331,7 +331,7 @@ } ] }, - "default_value": "python3 /app/front/plugins/omada_sdn_openapi_import/script.py", + "default_value": "python3 /app/front/plugins/omada_sdn_openapi/script.py", "options": [], "localized": ["name", "description"], "name": [ From 52de3ae872472d33f4c19160443d71c95054a2d0 Mon Sep 17 00:00:00 2001 From: xfilo Date: Mon, 24 Feb 2025 11:36:55 +0100 Subject: [PATCH 2/4] OMDSDNOPENAPI - Updated example image in README.md --- front/plugins/omada_sdn_openapi/README.md | 2 +- ..._settings.png => omada_sdn_openapi_settings.png} | Bin 2 files changed, 1 insertion(+), 1 deletion(-) rename front/plugins/omada_sdn_openapi/{omada_sdn_openapi_import_settings.png => omada_sdn_openapi_settings.png} (100%) diff --git a/front/plugins/omada_sdn_openapi/README.md b/front/plugins/omada_sdn_openapi/README.md index 79853bae..efa705d3 100755 --- a/front/plugins/omada_sdn_openapi/README.md +++ b/front/plugins/omada_sdn_openapi/README.md @@ -65,7 +65,7 @@ - Settings: -- ![settings_example](/front/plugins/omada_sdn_openapi_import/omada_sdn_openapi_import_settings.png) +- ![settings_example](/front/plugins/omada_sdn_openapi/omada_sdn_openapi_settings.png) ### ℹ️ Other info diff --git a/front/plugins/omada_sdn_openapi/omada_sdn_openapi_import_settings.png b/front/plugins/omada_sdn_openapi/omada_sdn_openapi_settings.png similarity index 100% rename from front/plugins/omada_sdn_openapi/omada_sdn_openapi_import_settings.png rename to front/plugins/omada_sdn_openapi/omada_sdn_openapi_settings.png From bf3fdd2766f87fc6c13a4467ab353e556f4c0cf3 Mon Sep 17 00:00:00 2001 From: xfilo Date: Mon, 24 Feb 2025 16:03:07 +0100 Subject: [PATCH 3/4] OMDSDNOPENAPI - Rephrased error messages, improved logging and code logic --- front/plugins/omada_sdn_openapi/script.py | 161 ++++++++++------------ 1 file changed, 76 insertions(+), 85 deletions(-) diff --git a/front/plugins/omada_sdn_openapi/script.py b/front/plugins/omada_sdn_openapi/script.py index 00e64288..0175933d 100755 --- a/front/plugins/omada_sdn_openapi/script.py +++ b/front/plugins/omada_sdn_openapi/script.py @@ -17,6 +17,7 @@ Author: https://github.com/xfilo __author__ = "xfilo" __version__ = 0.1 # Initial version +__version__ = 0.2 # Rephrased error messages, improved logging and code logic import os import sys @@ -59,7 +60,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class OmadaHelper: @staticmethod def log(message: str, level: Literal["minimal", "verbose", "debug", "trace"] = "minimal") -> None: - mylog(level, [f"[{pluginName}] {message}"]) + mylog(level, [f"[{pluginName}] [{level[:1].upper()}] {message}"]) @staticmethod def debug(message: str) -> None: @@ -75,8 +76,7 @@ class OmadaHelper: @staticmethod def response(response_type: str, response_message: str, response_result: Any = None) -> Dict[str, Any]: - return {"response_type": response_type, "response_message": response_message, - "response_result": response_result} + return {"response_type": response_type, "response_message": response_message, "response_result": response_result} @staticmethod def timestamp_to_datetime(ms: int, timezone: str) -> Dict[str, Any]: @@ -127,7 +127,7 @@ class OmadaHelper: if not isinstance(input_data, list): raise Exception(f"Expected a list, but got '{type(input_data)}'.") - OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site {site_name}") + OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}") default_entry = { "mac_address": "", "ip_address": "", @@ -158,45 +158,35 @@ class OmadaHelper: entry["name"] = data.get("name", "") last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone) - entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get( - "response_type") == "success" else "" + entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else "" if input_type == "device": entry["device_type"] = data.get("type") if data.get("type", "") != "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac")) - parent_mac = parent_mac.get("response_result") if isinstance(parent_mac, - dict) and parent_mac.get( - "response_type") == "success" else "" - entry["parent_node_mac_address"] = parent_mac + entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" if input_type == "client": entry["vlan_id"] = data.get("vid") entry["device_type"] = data.get("deviceType") if data.get("connectDevType", "") == "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac")) - entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, - dict) and parent_mac.get( - "response_type") == "success" else "" + entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" entry["parent_node_port"] = data.get("port", "") elif data.get("connectDevType", "") == "switch": parent_mac = OmadaHelper.normalize_mac(data.get("switchMac")) - entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, - dict) and parent_mac.get( - "response_type") == "success" else "" + entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" entry["parent_node_port"] = data.get("port", "") elif data.get("connectDevType", "") == "ap": parent_mac = OmadaHelper.normalize_mac(data.get("apMac")) - entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, - dict) and parent_mac.get( - "response_type") == "success" else "" + entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" entry["parent_node_ssid"] = data.get("ssid", "") result.append(entry) OmadaHelper.debug(f"Processed {input_type} entry: {entry}") msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}" - OmadaHelper.verbose(msg) + OmadaHelper.minimal(msg) final_result = OmadaHelper.response("success", msg, result) except Exception as ex: msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}" @@ -255,7 +245,7 @@ class OmadaAPI: def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]: time.sleep(1) # Sleep before making any request so it does not rate-limited - OmadaHelper.verbose(f"{method} request to endpoint: {endpoint}") + OmadaHelper.debug(f"{method} request to endpoint: {endpoint}") url = f"{getattr(self, 'host')}{endpoint}" headers = self._get_headers(kwargs.pop('include_auth', True)) try: @@ -264,37 +254,42 @@ class OmadaAPI: data = response.json() response_type = "error" if data.get("errorCode", 0) != 0 else "success" msg = f"{method} request completed: {endpoint}" - OmadaHelper.minimal(msg) + OmadaHelper.verbose(msg) return OmadaHelper.response(response_type, msg, data) except requests.exceptions.RequestException as ex: - msg = f"{method} request failed: {str(ex)}" - OmadaHelper.minimal(f"{method} request to {endpoint} failed") - OmadaHelper.verbose(msg) - return OmadaHelper.response("error", msg) + OmadaHelper.minimal(f"{method} request failed: {url}") + OmadaHelper.verbose(f"{method} request error: {str(ex)}") + return OmadaHelper.response("error", f"{method} request failed to endpoint '{endpoint}' with error: {str(ex)}") def authenticate(self) -> Dict[str, any]: """Make an endpoint request to get access token.""" OmadaHelper.verbose("Starting authentication process") + + # Endpoint request endpoint = "/openapi/authorize/token?grant_type=client_credentials" payload = { "omadacId": getattr(self, 'omada_id'), "client_id": getattr(self, 'client_id'), "client_secret": getattr(self, 'client_secret') } - response = self._make_request("POST", endpoint, json=payload, include_auth=False) - if response["response_type"] == "success": - token_data = response["response_result"] - if token_data.get("errorCode") == 0: - self.access_token = token_data["result"]["accessToken"] - self.refresh_token = token_data["result"]["refreshToken"] - OmadaHelper.minimal("Authentication successful") - return OmadaHelper.response("success", "Authenticated successfully") - OmadaHelper.minimal("Authentication failed") + # Successful endpoint response + if response.get("response_type") == "success": + response_result = response.get("response_result") + error_code = response_result.get("errorCode") + access_token = response_result.get("result").get("accessToken") + refresh_token = response_result.get("result").get("refreshToken") + if error_code == 0 and access_token and refresh_token: + self.access_token = access_token + self.refresh_token = refresh_token + msg = "Successfully authenticated" + OmadaHelper.minimal(msg) + return OmadaHelper.response("success", msg) + + # Failed authentication OmadaHelper.debug(f"Authentication response: {response}") - return OmadaHelper.response("error", - f"Authentication failed - error: {response.get('response_result').get('msg')}") + return OmadaHelper.response("error", f"Authentication failed - error: {response.get('response_message', 'Not provided')}") def get_clients(self, site_id: str) -> Dict[str, Any]: """Make an endpoint request to get all online clients on a site.""" @@ -308,52 +303,48 @@ class OmadaAPI: endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites/{site_id}/devices?page=1&pageSize={getattr(self, 'page_size')}" return self._make_request("GET", endpoint) - def get_sites(self) -> Dict[str, Any]: - """Make an endpoint request to populate all accesible sites.""" - OmadaHelper.verbose("Retrieving all accessible sites") - endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}" - return self._make_request("GET", endpoint) - def populate_sites(self) -> Dict[str, Any]: - """Make an endpoint request to populate sites.""" - try: - OmadaHelper.verbose("Starting site population process") + """Make an endpoint request to populate all accessible sites.""" + OmadaHelper.verbose("Starting site population process") - # All allowed sites for credentials - all_sites = self.get_sites()["response_result"].get("result").get("data", []) - OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total") + # Endpoint request + endpoint = f"/openapi/v1/{getattr(self, 'omada_id')}/sites?page=1&pageSize={getattr(self, 'page_size')}" + response = self._make_request("GET", endpoint) - # All available sites - self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites} - OmadaHelper.debug(f"Available sites: {self.available_sites_dict}") + # Successful endpoint response + if response.get("response_type") == "success": + response_result = response.get("response_result") + if response_result.get("errorCode") == 0: + # All allowed sites for credentials + all_sites = response_result.get("result", "").get("data", []) + OmadaHelper.debug(f"Retrieved {len(all_sites)} sites in total") - # All valid sites from input - active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if - site["siteId"] in self.requested_sites()} - active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if - site["name"] in self.requested_sites()} - self.active_sites_dict = active_sites_by_id | active_sites_by_name - OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}") + # All available sites + self.available_sites_dict = {site["siteId"]: site["name"] for site in all_sites} + OmadaHelper.debug(f"Available sites: {self.available_sites_dict}") - # If none of the input sites is valid/accessible, default to the first available site - if not self.active_sites_dict: - OmadaHelper.verbose( - "No valid site requested by configuration options, defaulting to first available site") - first_available_site = next(iter(self.available_sites_dict.items()), (None, None)) - if first_available_site[0]: # Check if there's an available site - self.active_sites_dict = {first_available_site[0]: first_available_site[1]} - OmadaHelper.debug(f"Using first available site: {first_available_site}") + # All valid sites from input + active_sites_by_id = {site["siteId"]: site["name"] for site in all_sites if site["siteId"] in self.requested_sites()} + active_sites_by_name = {site["siteId"]: site["name"] for site in all_sites if site["name"] in self.requested_sites()} + self.active_sites_dict = active_sites_by_id | active_sites_by_name + OmadaHelper.debug(f"Active sites after filtering: {self.active_sites_dict}") - msg = f"Populated {len(self.active_sites_dict)} active sites" - OmadaHelper.verbose(msg) - result = OmadaHelper.response("success", msg) - except Exception as ex: - OmadaHelper.minimal("Failed to populate sites") - msg = f"Site population error: {str(ex)}" - OmadaHelper.verbose(msg) - result = OmadaHelper.response("error", msg) + # If none of the input sites is valid/accessible, default to the first available site + if not self.active_sites_dict: + OmadaHelper.verbose("No valid site requested by configuration options, defaulting to first available site") + first_available_site = next(iter(self.available_sites_dict.items()), (None, None)) + if first_available_site[0]: # Check if there's an available site + self.active_sites_dict = {first_available_site[0]: first_available_site[1]} + OmadaHelper.debug(f"Using first available site: {first_available_site}") - return result + # Successful site population + msg = f"Successfully populated {len(self.active_sites_dict)} site(s)" + OmadaHelper.minimal(msg) + return OmadaHelper.response("success", msg) + + # Failed site population + OmadaHelper.debug(f"Site population response: {response}") + return OmadaHelper.response("error", f"Site population failed - error: {response.get('response_message', 'Not provided')}") def requested_sites(self) -> list: """Returns sites requested by user.""" @@ -370,8 +361,7 @@ class OmadaAPI: def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: if normalized_input_data.get("response_type", "error") != "success": - OmadaHelper.minimal( - f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided.')}") + OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}") return response_result = normalized_input_data.get("response_result", {}) @@ -407,19 +397,18 @@ def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: - OmadaHelper.minimal("Starting data collection process") omada_api = OmadaAPI(OPTIONS) auth_result = omada_api.authenticate() if auth_result["response_type"] == "error": OmadaHelper.minimal("Authentication failed, aborting data collection") - OmadaHelper.debug(f"Authentication error - {auth_result['response_message']}") + OmadaHelper.debug(f"{auth_result['response_message']}") return plugin_objects sites_result = omada_api.populate_sites() if sites_result["response_type"] == "error": OmadaHelper.minimal("Site population failed, aborting data collection") - OmadaHelper.debug(f"Site population error - {auth_result['response_message']}") + OmadaHelper.debug(f"{sites_result['response_message']}") return plugin_objects requested_sites = omada_api.requested_sites() @@ -428,13 +417,15 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: OmadaHelper.verbose(f"Requested sites: {requested_sites}") OmadaHelper.verbose(f"Available sites: {available_sites}") - OmadaHelper.minimal(f"Active sites: {active_sites}") + OmadaHelper.verbose(f"Active sites: {active_sites}") + + OmadaHelper.minimal("Starting data collection process") for site_id, site_name in active_sites.items(): OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})") devices_response = omada_api.get_devices(site_id) - if devices_response["response_type"] == "error": + if devices_response["response_type"] != "success": OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}") else: devices = devices_response["response_result"].get("result").get("data", []) @@ -443,7 +434,7 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: make_entries(plugin_objects, devices) clients_response = omada_api.get_clients(site_id) - if clients_response["response_type"] == "error": + if clients_response["response_type"] != "success": OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}") else: clients = clients_response["response_result"].get("result").get("data", []) @@ -453,7 +444,7 @@ def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})") - OmadaHelper.minimal("Data collection process completed") + OmadaHelper.minimal("Completed data collection process") return plugin_objects From bf151bd69aa807c0c4fa13f614d847cd9853fcbd Mon Sep 17 00:00:00 2001 From: xfilo Date: Mon, 24 Feb 2025 20:51:01 +0100 Subject: [PATCH 4/4] OMDSDNOPENAPI - Refactored data collection into a class, improved code clarity with comments --- front/plugins/omada_sdn_openapi/script.py | 220 +++++++++++++--------- 1 file changed, 131 insertions(+), 89 deletions(-) diff --git a/front/plugins/omada_sdn_openapi/script.py b/front/plugins/omada_sdn_openapi/script.py index 0175933d..d3d6e718 100755 --- a/front/plugins/omada_sdn_openapi/script.py +++ b/front/plugins/omada_sdn_openapi/script.py @@ -9,7 +9,7 @@ However, I found that approach somewhat unstable, so I decided to give it a shot and create a new plugin with the goal of providing same, but more reliable results. -Please note that this is my first plugin, and I’m not a Python developer. +Please note that this is my first plugin, and I'm not a Python developer. Any comments, bug fixes, or contributions are greatly appreciated. Author: https://github.com/xfilo @@ -18,6 +18,7 @@ Author: https://github.com/xfilo __author__ = "xfilo" __version__ = 0.1 # Initial version __version__ = 0.2 # Rephrased error messages, improved logging and code logic +__version__ = 0.3 # Refactored data collection into a class, improved code clarity with comments import os import sys @@ -85,6 +86,7 @@ class OmadaHelper: if not ms or not isinstance(ms, (str, int)): raise ValueError(f"Value '{ms}' is not a valid timestamp") + # Convert UTC millisecond timestamp to datetime in NetAlertX's timezone timestamp = ms / 1000 tz = pytz.timezone("UTC") utc_datetime = datetime.fromtimestamp(timestamp, tz=tz) @@ -111,6 +113,7 @@ class OmadaHelper: if not mac or not isinstance(mac, str) or mac is None: raise Exception(f"Value '{mac}' is not a valid MAC address") + # Replace - with : in a MAC address and make it lowercase result = mac.lower().replace("-", ":") msg = f"Normalized MAC address from {mac} to {result}" OmadaHelper.debug(msg) @@ -128,6 +131,8 @@ class OmadaHelper: raise Exception(f"Expected a list, but got '{type(input_data)}'.") OmadaHelper.verbose(f"Starting normalization of {len(input_data)} {input_type}(s) from site: {site_name}") + + # The default return structure for one device/client default_entry = { "mac_address": "", "ip_address": "", @@ -141,8 +146,11 @@ class OmadaHelper: } result = [] + + # Loop through each device/client for data in input_data: + # Normalize and verify MAC address mac = OmadaHelper.normalize_mac(data.get("mac")) if not isinstance(mac, dict) or mac.get("response_type") != "success": continue @@ -152,23 +160,29 @@ class OmadaHelper: OmadaHelper.debug(f"Skipping {input_type}, not a MAC address: {mac}") continue + # Assigning mandatory return values entry = default_entry.copy() entry["mac_address"] = mac - entry["ip_address"] = data.get("ip", "") - entry["name"] = data.get("name", "") + entry["ip_address"] = data.get("ip") + entry["name"] = data.get("name") + # Assign the last datetime the device/client was seen on the network last_seen = OmadaHelper.timestamp_to_datetime(data.get("lastSeen", 0), timezone) entry["last_seen"] = last_seen.get("response_result") if isinstance(last_seen, dict) and last_seen.get("response_type") == "success" else "" + # Applicable only for DEVICE if input_type == "device": entry["device_type"] = data.get("type") + # If it's not a gateway try to assign parent node MAC if data.get("type", "") != "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("uplinkDeviceMac")) entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" + # Applicable only for CLIENT if input_type == "client": entry["vlan_id"] = data.get("vid") entry["device_type"] = data.get("deviceType") + # Try to assign parent node MAC and PORT/SSID to the CLIENT if data.get("connectDevType", "") == "gateway": parent_mac = OmadaHelper.normalize_mac(data.get("gatewayMac")) entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" @@ -182,18 +196,17 @@ class OmadaHelper: entry["parent_node_mac_address"] = parent_mac.get("response_result") if isinstance(parent_mac, dict) and parent_mac.get("response_type") == "success" else "" entry["parent_node_ssid"] = data.get("ssid", "") + # Add the entry to the result result.append(entry) OmadaHelper.debug(f"Processed {input_type} entry: {entry}") msg = f"Successfully normalized {len(result)} {input_type}(s) from site: {site_name}" OmadaHelper.minimal(msg) - final_result = OmadaHelper.response("success", msg, result) + return OmadaHelper.response("success", msg, result) except Exception as ex: msg = f"Failed normalizing {input_type}(s) from site '{site_name}' - error: {str(ex)}" OmadaHelper.verbose(msg) - final_result = OmadaHelper.response("error", msg) - - return final_result + return OmadaHelper.response("error", msg) class OmadaAPI: @@ -213,14 +226,18 @@ class OmadaAPI: # Validate and set attributes for param_name, param_info in params.items(): + # Get user parameter input, or default value if any value = options.get(param_name, param_info.get("default")) + # Check if a parameter is required and if it's value is non-empty if param_info["required"] and (value is None or (param_info["type"] == str and not value)): raise ValueError(f"{param_name} is required and must be a non-empty {param_info['type'].__name__}") + # Check if a parameter has a correct datatype if not isinstance(value, param_info["type"]): raise TypeError(f"{param_name} must be of type {param_info['type'].__name__}") + # Assign the parameter to the class setattr(self, param_name, value) OmadaHelper.debug(f"Initialized option '{param_name}' with value: {value}") @@ -235,6 +252,7 @@ class OmadaAPI: def _get_headers(self, include_auth: bool = True) -> dict: """Return request headers.""" headers = {"Content-type": "application/json"} + # Add access token to header if requested and available if include_auth == True: if not self.access_token: OmadaHelper.debug("No access token available for headers") @@ -244,14 +262,18 @@ class OmadaAPI: return headers def _make_request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]: + """Make a request to an endpoint.""" time.sleep(1) # Sleep before making any request so it does not rate-limited OmadaHelper.debug(f"{method} request to endpoint: {endpoint}") url = f"{getattr(self, 'host')}{endpoint}" headers = self._get_headers(kwargs.pop('include_auth', True)) try: + # Make the request and get the response response = requests.request(method, url, headers=headers, verify=getattr(self, 'verify_ssl'), **kwargs) response.raise_for_status() data = response.json() + + # Check if the response contains an error code and determine the function response type response_type = "error" if data.get("errorCode", 0) != 0 else "success" msg = f"{method} request completed: {endpoint}" OmadaHelper.verbose(msg) @@ -280,6 +302,8 @@ class OmadaAPI: error_code = response_result.get("errorCode") access_token = response_result.get("result").get("accessToken") refresh_token = response_result.get("result").get("refreshToken") + + # Authentication is successful if there isn't a response error, and access_token and refresh_token are set if error_code == 0 and access_token and refresh_token: self.access_token = access_token self.refresh_token = refresh_token @@ -359,98 +383,116 @@ class OmadaAPI: return self.active_sites_dict -def make_entries(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: - if normalized_input_data.get("response_type", "error") != "success": - OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}") - return +class OmadaData: + @staticmethod + def create_data(plugin_objects: Plugin_Objects, normalized_input_data: dict) -> None: + """Creates plugin object from normalized input data.""" + if normalized_input_data.get("response_type", "error") != "success": + OmadaHelper.minimal(f"Unable to make entries - error: {normalized_input_data.get('response_message', 'Not provided')}") + return - response_result = normalized_input_data.get("response_result", {}) - for entry in response_result: - OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}") + # Loop through every device/client and make an plugin entry + response_result = normalized_input_data.get("response_result", {}) + for entry in response_result: + if len(entry) == 0: + OmadaHelper.minimal(f"Skipping entry, missing data.") + continue - parent_node = entry["parent_node_mac_address"] - if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]): - parent_node = "Internet" + OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}") - device_type = entry["device_type"].lower() - if device_type == "iphone": - device_type = "iPhone" - elif device_type == "pc": - device_type = "PC" - else: - device_type = device_type.capitalize() + # If the device_type is gateway, set the parent_node to Internet + device_type = entry["device_type"].lower() + parent_node = entry["parent_node_mac_address"] + if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]): + parent_node = "Internet" - plugin_objects.add_object( - primaryId=entry["mac_address"], - secondaryId=entry["ip_address"], - watched1=entry["name"], - watched2=parent_node, - watched3=entry["parent_node_port"], - watched4=entry["parent_node_ssid"], - extra=device_type, - foreignKey=entry["mac_address"], - helpVal1=entry["last_seen"], - helpVal2=entry["site_name"], - helpVal3=entry["vlan_id"], - helpVal4="null" - ) + # Some device type naming exceptions + if device_type == "iphone": + device_type = "iPhone" + elif device_type == "pc": + device_type = "PC" + else: + device_type = device_type.capitalize() + # Add the plugin object + plugin_objects.add_object( + primaryId=entry["mac_address"], + secondaryId=entry["ip_address"], + watched1=entry["name"], + watched2=parent_node, + watched3=entry["parent_node_port"], + watched4=entry["parent_node_ssid"], + extra=device_type, + foreignKey=entry["mac_address"], + helpVal1=entry["last_seen"], + helpVal2=entry["site_name"], + helpVal3=entry["vlan_id"], + helpVal4="null" + ) -def get_entries(plugin_objects: Plugin_Objects) -> Plugin_Objects: - omada_api = OmadaAPI(OPTIONS) + @staticmethod + def collect_data(plugin_objects: Plugin_Objects) -> Plugin_Objects: + """Collects device and client data from Omada Controller.""" + omada_api = OmadaAPI(OPTIONS) - auth_result = omada_api.authenticate() - if auth_result["response_type"] == "error": - OmadaHelper.minimal("Authentication failed, aborting data collection") - OmadaHelper.debug(f"{auth_result['response_message']}") + # Authenticate + auth_result = omada_api.authenticate() + if auth_result["response_type"] == "error": + OmadaHelper.minimal("Authentication failed, aborting data collection") + OmadaHelper.debug(f"{auth_result['response_message']}") + return plugin_objects + + # Populate sites + sites_result = omada_api.populate_sites() + if sites_result["response_type"] == "error": + OmadaHelper.minimal("Site population failed, aborting data collection") + OmadaHelper.debug(f"{sites_result['response_message']}") + return plugin_objects + + requested_sites = omada_api.requested_sites() + available_sites = omada_api.available_sites() + active_sites = omada_api.active_sites() + + OmadaHelper.verbose(f"Requested sites: {requested_sites}") + OmadaHelper.verbose(f"Available sites: {available_sites}") + OmadaHelper.verbose(f"Active sites: {active_sites}") + + OmadaHelper.minimal("Starting data collection process") + + # Loop through sites and collect data + for site_id, site_name in active_sites.items(): + OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})") + + # Collect device data + devices_response = omada_api.get_devices(site_id) + if devices_response["response_type"] != "success": + OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}") + else: + devices = devices_response["response_result"].get("result").get("data", []) + OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}") + devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE) + OmadaData.create_data(plugin_objects, devices) + + # Collect client data + clients_response = omada_api.get_clients(site_id) + if clients_response["response_type"] != "success": + OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}") + else: + clients = clients_response["response_result"].get("result").get("data", []) + OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}") + clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE) + OmadaData.create_data(plugin_objects, clients) + + OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})") + + # Complete collection and return plugin object + OmadaHelper.minimal("Completed data collection process") return plugin_objects - sites_result = omada_api.populate_sites() - if sites_result["response_type"] == "error": - OmadaHelper.minimal("Site population failed, aborting data collection") - OmadaHelper.debug(f"{sites_result['response_message']}") - return plugin_objects - - requested_sites = omada_api.requested_sites() - available_sites = omada_api.available_sites() - active_sites = omada_api.active_sites() - - OmadaHelper.verbose(f"Requested sites: {requested_sites}") - OmadaHelper.verbose(f"Available sites: {available_sites}") - OmadaHelper.verbose(f"Active sites: {active_sites}") - - OmadaHelper.minimal("Starting data collection process") - - for site_id, site_name in active_sites.items(): - OmadaHelper.verbose(f"Processing site: {site_name} ({site_id})") - - devices_response = omada_api.get_devices(site_id) - if devices_response["response_type"] != "success": - OmadaHelper.minimal(f"Failed to retrieve devices for site: {site_name}") - else: - devices = devices_response["response_result"].get("result").get("data", []) - OmadaHelper.debug(f"Retrieved {len(devices)} device(s) from site: {site_name}") - devices = OmadaHelper.normalize_data(devices, "device", site_name, TIMEZONE) - make_entries(plugin_objects, devices) - - clients_response = omada_api.get_clients(site_id) - if clients_response["response_type"] != "success": - OmadaHelper.minimal(f"Failed to retrieve clients for site {site_name}") - else: - clients = clients_response["response_result"].get("result").get("data", []) - OmadaHelper.debug(f"Retrieved {len(clients)} client(s) from site: {site_name}") - clients = OmadaHelper.normalize_data(clients, "client", site_name, TIMEZONE) - make_entries(plugin_objects, clients) - - OmadaHelper.verbose(f"Site complete: {site_name} ({site_id})") - - OmadaHelper.minimal("Completed data collection process") - return plugin_objects - def main(): start_time = time.time() - OmadaHelper.minimal("Starting execution") + OmadaHelper.minimal(f"Starting execution, version {__version__}") # Initialize the Plugin object output file plugin_objects = Plugin_Objects(RESULT_FILE) @@ -468,8 +510,8 @@ def main(): } OmadaHelper.verbose("Configuration options loaded") - # Retrieve entries - plugin_objects = get_entries(plugin_objects) + # Retrieve entries and write result + plugin_objects = OmadaData.collect_data(plugin_objects) plugin_objects.write_result_file() # Finish