diff --git a/front/plugins/_publisher_mqtt/mqtt.py b/front/plugins/_publisher_mqtt/mqtt.py index c21671e3..a663555f 100755 --- a/front/plugins/_publisher_mqtt/mqtt.py +++ b/front/plugins/_publisher_mqtt/mqtt.py @@ -1,34 +1,29 @@ #!/usr/bin/env python import json -import subprocess -import argparse import os -import pathlib import sys from datetime import datetime import time import re -import unicodedata import paho.mqtt.client as mqtt # from paho.mqtt import client as mqtt_client # from paho.mqtt import CallbackAPIVersion as mqtt_CallbackAPIVersion import hashlib -import sqlite3 # Register NetAlertX directories -INSTALL_PATH="/app" +INSTALL_PATH = "/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) # NetAlertX modules import conf -from const import apiPath, confFileName, logPath +from const import confFileName, logPath from plugin_utils import getPluginObject from plugin_helper import Plugin_Objects -from logger import mylog, Logger, append_line_to_file -from helper import timeNowTZ, get_setting_value, bytes_to_string, sanitize_string, normalize_string -from models.notification_instance import NotificationInstance +from logger import mylog, Logger +from helper import timeNowTZ, get_setting_value, bytes_to_string, \ + sanitize_string, normalize_string from database import DB, get_device_stats from pytz import timezone @@ -49,20 +44,22 @@ plugin_objects = Plugin_Objects(RESULT_FILE) md5_hash = hashlib.md5() - # globals mqtt_sensors = [] mqtt_connected_to_broker = False mqtt_client = None # mqtt client topic_root = get_setting_value('MQTT_topic_root') + def main(): - - mylog('verbose', [f'[{pluginName}](publisher) In script']) - + + mylog('verbose', [f'[{pluginName}](publisher) In script']) + # Check if basic config settings supplied - if check_config() == False: - mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification gateway not set up correctly. Check your {confFileName} {pluginName}_* variables.']) + if not check_config(): + mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification \ + gateway not set up correctly. Check your {confFileName} \ + {pluginName}_* variables.']) return # Create a database connection @@ -74,56 +71,80 @@ def main(): plugin_objects.write_result_file() - -#------------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- # MQTT -#------------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- def check_config(): - if get_setting_value('MQTT_BROKER') == '' or get_setting_value('MQTT_PORT') == '' or get_setting_value('MQTT_USER') == '' or get_setting_value('MQTT_PASSWORD') == '': - mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up correctly. Check your {confFileName} MQTT_* variables.']) - return False - else: - return True + """ + Checks whether the MQTT configuration settings are properly set. + + Returns: + bool: True if all required MQTT settings + ('MQTT_BROKER', 'MQTT_PORT', 'MQTT_USER', 'MQTT_PASSWORD') + are non-empty; + False otherwise. Logs a verbose error message + if any setting is missing. + """ + if get_setting_value('MQTT_BROKER') == '' \ + or get_setting_value('MQTT_PORT') == '' \ + or get_setting_value('MQTT_USER') == '' \ + or get_setting_value('MQTT_PASSWORD') == '': + mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up \ + correctly. Check your {confFileName} MQTT_* variables.']) + return False + else: + return True -#------------------------------------------------------------------------------- -# Sensor configs are tracking which sensors in NetAlertX exist and if a config has changed +# ----------------------------------------------------------------------------- +# Sensor configs are tracking which sensors in NetAlertX exist +# and if a config has changed class sensor_config: - def __init__(self, deviceId, deviceName, sensorType, sensorName, icon, mac): + def __init__(self, + deviceId, + deviceName, + sensorType, + sensorName, + icon, + mac): """ - Initialize the sensor_config object with provided parameters. Sets up sensor configuration - and generates necessary MQTT topics and messages based on the sensor type. + Initialize the sensor_config object with provided parameters. + Sets up sensor configuration and generates necessary MQTT topics + and messages based on the sensor type. """ # Assign initial attributes self.deviceId = deviceId self.deviceName = deviceName self.sensorType = sensorType self.sensorName = sensorName - self.icon = icon + self.icon = icon self.mac = mac - self.model = deviceName - self.hash = '' + self.model = deviceName + self.hash = '' self.state_topic = '' self.json_attr_topic = '' self.topic = '' self.message = {} # Initialize message as an empty dictionary self.unique_id = '' - # Call helper functions to initialize the message, generate a hash, and handle plugin object + # Call helper functions to initialize the message, generate a hash, + # and handle plugin object self.initialize_message() self.generate_hash() self.handle_plugin_object() def initialize_message(self): """ - Initialize the MQTT message payload based on the sensor type. This method handles sensors of types: + Initialize the MQTT message payload based on the sensor type. + This method handles sensors of types: - 'timestamp' - 'binary_sensor' - 'sensor' - 'device_tracker' """ - # Ensure self.message is initialized as a dictionary if not already done + # Ensure self.message is initialized as a dictionary + # if not already done if not isinstance(self.message, dict): self.message = {} @@ -153,7 +174,6 @@ class sensor_config: "icon": f'mdi:{self.icon}' }) - # Handle 'device_tracker' sensor type elif self.sensorType == 'device_tracker': self.topic = f'homeassistant/device_tracker/{self.deviceId}/config' @@ -229,25 +249,36 @@ class sensor_config: ) -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- def publish_mqtt(mqtt_client, topic, message): + """ + Publishes a message to an MQTT topic using the provided MQTT client. + If the message is not a string, it is converted to a JSON-formatted string. + The function retrieves the desired QoS level from settings and logs the publishing process. + If the client is not connected to the broker, the function logs an error and aborts. + It attempts to publish the message, retrying until the publish status indicates success. + Args: + mqtt_client: The MQTT client instance used to publish the message. + topic (str): The MQTT topic to publish to. + message (Any): The message payload to send. Non-string messages are converted to JSON. + Returns: + bool: True if the message was published successfully, False if not connected to the broker. + """ status = 1 # convert anything but a simple string to json if not isinstance(message, str): - message = json.dumps(message).replace("'",'"') + message = json.dumps(message).replace("'", '"') qos = get_setting_value('MQTT_QOS') - mylog('verbose', [f"[{pluginName}] Sending MQTT topic: {topic}"]) - mylog('verbose', [f"[{pluginName}] Sending MQTT message: {message}"]) + mylog('debug', [f"[{pluginName}] Sending MQTT topic: {topic}"]) + mylog('debug', [f"[{pluginName}] Sending MQTT message: {message}"]) # mylog('verbose', [f"[{pluginName}] get_setting_value('MQTT_QOS'): {qos}"]) - if mqtt_connected_to_broker == False: - - mylog('verbose', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."]) - + if not mqtt_connected_to_broker: + mylog('minimal', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."]) return False while status != 0: @@ -267,45 +298,46 @@ def publish_mqtt(mqtt_client, topic, message): # mylog('verbose', [f"[{pluginName}] status: {status}"]) # mylog('verbose', [f"[{pluginName}] result: {result}"]) - if status != 0: - mylog('verbose', [f"[{pluginName}] Waiting to reconnect to MQTT broker"]) - time.sleep(0.1) + if status != 0: + mylog('debug', [f"[{pluginName}] Waiting to reconnect to MQTT broker"]) + time.sleep(0.1) return True -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------ # Create a generic device for overal stats -def create_generic_device(mqtt_client, deviceId, deviceName): - - create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'online', 'wifi-check') - create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'down', 'wifi-cancel') +def create_generic_device(mqtt_client, deviceId, deviceName): + + create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'online', 'wifi-check') + create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'down', 'wifi-cancel') create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'all', 'wifi') create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'archived', 'wifi-lock') create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'new', 'wifi-plus') create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert') - -#------------------------------------------------------------------------------- + +# ------------------------------------------------------------------------------ # Register sensor config on the broker -def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""): - - global mqtt_sensors +def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""): + global mqtt_sensors # check previous configs - sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac) + sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac) - # send if new - if sensorConfig.isNew: + # Create the HA sensor config if a new device is discovered + if sensorConfig.isNew: # add the sensor to the global list to keep track of succesfully added sensors - if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message): - # hack - delay adding to the queue in case the process is - time.sleep(get_setting_value('MQTT_DELAY_SEC')) # restarted and previous publish processes aborted - # (it takes ~2s to update a sensor config on the broker) - mqtt_sensors.append(sensorConfig) + if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message): + # hack - delay adding to the queue in case the process is + # restarted and previous publish processes aborted + # (it takes ~2s to update a sensor config on the broker) + time.sleep(get_setting_value('MQTT_DELAY_SEC')) + mqtt_sensors.append(sensorConfig) return sensorConfig -#------------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- def mqtt_create_client(): # attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python @@ -313,11 +345,11 @@ def mqtt_create_client(): RECONNECT_RATE = 2 MAX_RECONNECT_COUNT = 12 MAX_RECONNECT_DELAY = 60 - - mytransport = 'tcp' # or 'websockets' + + mytransport = 'tcp' # or 'websockets' def on_disconnect(mqtt_client, userdata, rc): - + global mqtt_connected_to_broker mylog('verbose', [f"[{pluginName}] Connection terminated, reason_code: {rc}"]) @@ -328,7 +360,7 @@ def mqtt_create_client(): try: mqtt_client.reconnect() - mqtt_connected_to_broker = True # Signal connection + mqtt_connected_to_broker = True # Signal connection mylog('verbose', [f"[{pluginName}] Reconnected successfully"]) return except Exception as err: @@ -338,19 +370,18 @@ def mqtt_create_client(): reconnect_delay *= RECONNECT_RATE reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) reconnect_count += 1 - + mqtt_connected_to_broker = False - def on_connect(mqtt_client, userdata, flags, rc, properties): - + global mqtt_connected_to_broker # REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack - if rc == 0: - mylog('verbose', [f"[{pluginName}] Connected to broker"]) - mqtt_connected_to_broker = True # Signal connection - else: + if rc == 0: + mylog('verbose', [f"[{pluginName}] Connected to broker"]) + mqtt_connected_to_broker = True # Signal connection + else: mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"]) mqtt_connected_to_broker = False @@ -367,10 +398,12 @@ def mqtt_create_client(): version = mqtt.MQTTv5 # we now hardcode the client id into here. - # TODO: Add config ffor client id + # TODO: Add config for client id (atm, we use a fixed client id, + # so only one instance of NetAlertX can connect to the broker at any given time) + # If you intend to run multiple instances simultaneously, make sure to set unique client IDs for each instance. mqtt_client = mqtt.Client( client_id='netalertx', - callback_api_version = mqtt.CallbackAPIVersion.VERSION2, + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, transport=mytransport, protocol=version) mqtt_client.on_connect = on_connect @@ -379,8 +412,8 @@ def mqtt_create_client(): if get_setting_value('MQTT_TLS'): mqtt_client.tls_set() - mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD')) - err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT')) + mqtt_client.username_pw_set(username=get_setting_value('MQTT_USER'), password=get_setting_value('MQTT_PASSWORD')) + err_code = mqtt_client.connect(host=get_setting_value('MQTT_BROKER'), port=get_setting_value('MQTT_PORT')) if (err_code == mqtt.MQTT_ERR_SUCCESS): # We (prematurely) set the connection state to connected # the callback may be delayed @@ -389,36 +422,37 @@ def mqtt_create_client(): # Mosquitto works straight away # EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec time.sleep(1) - mqtt_client.loop_start() + mqtt_client.loop_start() return mqtt_client -#------------------------------------------------------------------------------- -def mqtt_start(db): + +# ----------------------------------------------------------------------------- +def mqtt_start(db): global mqtt_client, mqtt_connected_to_broker - if mqtt_connected_to_broker == False: - mqtt_connected_to_broker = True - mqtt_client = mqtt_create_client() + if not mqtt_connected_to_broker: + mqtt_connected_to_broker = True + mqtt_client = mqtt_create_client() deviceName = get_setting_value('MQTT_DEVICE_NAME') - deviceId = get_setting_value('MQTT_DEVICE_ID') - - # General stats + deviceId = get_setting_value('MQTT_DEVICE_ID') + + # General stats # Create a generic device for overal stats - if get_setting_value('MQTT_SEND_STATS') == True: - # Create a new device representing overall stats + if get_setting_value('MQTT_SEND_STATS') == True: + # Create a new device representing overall stats create_generic_device(mqtt_client, deviceId, deviceName) # Get the data - row = get_device_stats(db) + row = get_device_stats(db) # Publish (wrap into {} and remove last ',' from above) - publish_mqtt(mqtt_client, f"{topic_root}/sensor/{deviceId}/state", - { + publish_mqtt(mqtt_client, f"{topic_root}/sensor/{deviceId}/state", + { "online": row[0], "down": row[1], "all": row[2], @@ -429,7 +463,7 @@ def mqtt_start(db): ) # Generate device-specific MQTT messages if enabled - if get_setting_value('MQTT_SEND_DEVICES') == True: + if get_setting_value('MQTT_SEND_DEVICES'): # Specific devices processing @@ -438,37 +472,37 @@ def mqtt_start(db): sec_delay = len(devices) * int(get_setting_value('MQTT_DELAY_SEC'))*5 - mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60,1) , 'min)' ]) + mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60, 1), 'min)']) debug_index = 0 - - for device in devices: + + for device in devices: # # debug statement START 🔻 # if 'Moto' not in device["devName"]: - # mylog('none', [f"[{pluginName}] ALERT - ⚠⚠⚠⚠ DEBUGGING ⚠⚠⚠⚠ - this should not be in uncommented in production"]) + # mylog('none', [f"[{pluginName}] ALERT - ⚠⚠⚠⚠ DEBUGGING ⚠⚠⚠⚠ - this should not be in uncommented in production"]) # continue # # debug statement END 🔺 - + # Create devices in Home Assistant - send config messages deviceId = 'mac_' + device["devMac"].replace(" ", "").replace(":", "_").lower() # Normalize the string and remove unwanted characters - devDisplayName = re.sub('[^a-zA-Z0-9-_\\s]', '', normalize_string(device["devName"])) + devDisplayName = re.sub('[^a-zA-Z0-9-_\\s]', '', normalize_string(device["devName"])) sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_ip', 'ip-network', device["devMac"]) sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'mac_address', 'folder-key-network', device["devMac"]) sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'is_new', 'bell-alert-outline', device["devMac"]) - sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'vendor', 'cog', device["devMac"]) + sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'vendor', 'cog', device["devMac"]) sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'first_connection', 'calendar-start', device["devMac"]) sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_connection', 'calendar-end', device["devMac"]) - + # handle device_tracker # IMPORTANT: shared payload - device_tracker attributes and individual sensors - devJson = { - "last_ip": device["devLastIP"], - "is_new": str(device["devIsNew"]), - "alert_down": str(device["devAlertDown"]), - "vendor": sanitize_string(device["devVendor"]), + devJson = { + "last_ip": device["devLastIP"], + "is_new": str(device["devIsNew"]), + "alert_down": str(device["devAlertDown"]), + "vendor": sanitize_string(device["devVendor"]), "mac_address": str(device["devMac"]), "model": devDisplayName, "last_connection": prepTimeStamp(str(device["devLastConnection"])), @@ -480,37 +514,36 @@ def mqtt_start(db): "network_parent_name": next((dev["devName"] for dev in devices if dev["devMAC"] == device["devParentMAC"]), "") } - # bulk update device sensors in home assistant + # bulk update device sensors in home assistant publish_mqtt(mqtt_client, sensorConfig.state_topic, devJson) # REQUIRED, DON'T DELETE - + # create and update is_present sensor sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'binary_sensor', 'is_present', 'wifi', device["devMac"]) - publish_mqtt(mqtt_client, sensorConfig.state_topic, - { + publish_mqtt(mqtt_client, sensorConfig.state_topic, + { "is_present": to_binary_sensor(str(device["devPresentLastScan"])) } - ) + ) # handle device_tracker - sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"]) + sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"]) # are only valid states state = 'away' if to_binary_sensor(str(device["devPresentLastScan"])) == "ON": state = 'home' - publish_mqtt(mqtt_client, sensorConfig.state_topic, state) - + publish_mqtt(mqtt_client, sensorConfig.state_topic, state) + # publish device_tracker attributes - publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson) + publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson) - -#=============================================================================== +# ============================================================================= # Home Assistant UTILs -#=============================================================================== +# ============================================================================= def to_binary_sensor(input): - # In HA a binary sensor returns ON or OFF + # In HA a binary sensor returns ON or OFF result = "OFF" # bytestring @@ -528,6 +561,7 @@ def to_binary_sensor(input): result = "ON" return result + # ------------------------------------- # Convert to format that is interpretable by Home Assistant def prepTimeStamp(datetime_str): @@ -547,9 +581,7 @@ def prepTimeStamp(datetime_str): # Convert to the required format with 'T' between date and time and ensure the timezone is included return parsed_datetime.isoformat() # This will include the timezone offset + # -------------INIT--------------------- if __name__ == '__main__': sys.exit(main()) - - -