From 7980554924b68d07331e0d633aebae910d4f1d9f Mon Sep 17 00:00:00 2001 From: jokob-sk Date: Mon, 7 Jul 2025 09:05:35 +1000 Subject: [PATCH] small refactor, docs --- docs/UPDATES.md | 34 ++ server/scan/device_handling.py | 743 +++++++++---------------------- server/scan/device_heuristics.py | 320 +++++++++++++ server/scan/session_events.py | 2 +- 4 files changed, 571 insertions(+), 528 deletions(-) create mode 100755 server/scan/device_heuristics.py diff --git a/docs/UPDATES.md b/docs/UPDATES.md index 58cf046a..d2588722 100755 --- a/docs/UPDATES.md +++ b/docs/UPDATES.md @@ -8,6 +8,7 @@ This guide outlines approaches for updating Docker containers, usually when upgr - Manual: Direct commands to stop, remove, and rebuild containers. - Dockcheck: Semi-automated with more control, suited for bulk updates. - Watchtower: Fully automated, runs continuously to check and update containers. +- Portainer: Setup depending, mostly manual with UI. You can choose any approach that fits your workflow. @@ -107,6 +108,39 @@ docker run -d \ ``` +## 4. Portainer controlled image + +This assumes you're using Portainer to manage Docker (or Docker Swarm) and want to pull the latest version of an image and redeploy the container. + +> [!NOTE] +> * Portainer does **not auto-update** containers. For automation, use **Watchtower** or similar tools. +> * Make sure you have the [persistent volumes mounted or backups ready](BACKUPS.md) before recreating. + +### 4.1 Steps to Update an Image in Portainer (Standalone Docker) + +1. **Login to Portainer.** +2. Go to **"Containers"** in the left sidebar. +3. Find the container you want to update, click its name. +4. Click **"Recreate"** (top right). +5. ✅ **Tick**: + * **Pull latest image** (this ensures Portainer fetches the newest version from Docker Hub or your registry). +6. Click **"Recreate"** again. +7. Wait for the container to be stopped, removed, and recreated with the updated image. + +--- + +### 4.2 For Docker Swarm Services + +If you're using Docker Swarm (under **"Stacks"** or **"Services"**): + +1. Go to **"Stacks"**. +2. Select the stack managing the container. +3. Click **"Editor"** (or "Update the Stack"). +4. Add a version tag or use `:latest` if your image tag is `latest` (not recommended for production). +5. Click **"Update the Stack"**. + > ⚠ Portainer will not pull the new image unless the tag changes OR the stack is forced to recreate. +6. If image tag hasn't changed, go to **"Services"**, find the service, and click **"Force Update"**. + ## Summary - Manual: Ideal for individual or critical updates. diff --git a/server/scan/device_handling.py b/server/scan/device_handling.py index e44b8a6e..0c481f3f 100755 --- a/server/scan/device_handling.py +++ b/server/scan/device_handling.py @@ -1,5 +1,4 @@ import sys -from typing import Optional, List, Tuple, Dict import subprocess import conf import os @@ -9,11 +8,12 @@ import re INSTALL_PATH="/app" sys.path.extend([f"{INSTALL_PATH}/server"]) -from helper import timeNowTZ, get_setting, get_setting_value, list_to_where, check_IP_format, sanitize_SQL_input +from helper import timeNowTZ, get_setting_value, list_to_where, check_IP_format, sanitize_SQL_input from logger import mylog from const import vendorsPath, vendorsPathNewest, sql_generateGuid from models.device_instance import DeviceInstance from scan.name_resolution import NameResolver +from scan.device_heuristics import guess_icon, guess_type #------------------------------------------------------------------------------- # Removing devices from the CurrentScan DB table which the user chose to ignore by MAC or IP @@ -45,9 +45,221 @@ def exclude_ignored_devices(db): mylog('debug', f'[New Devices] Excluding Ignored Devices Query: {query}') sql.execute(query) - +#------------------------------------------------------------------------------- +def update_devices_data_from_scan (db): + sql = db.sql #TO-DO + startTime = timeNowTZ().strftime('%Y-%m-%d %H:%M:%S') + + # Update Last Connection + mylog('debug', '[Update Devices] 1 Last Connection') + sql.execute(f"""UPDATE Devices SET devLastConnection = '{startTime}', + devPresentLastScan = 1 + WHERE devPresentLastScan = 0 + AND EXISTS (SELECT 1 FROM CurrentScan + WHERE devMac = cur_MAC) """) + + # Clean no active devices + mylog('debug', '[Update Devices] 2 Clean no active devices') + sql.execute("""UPDATE Devices SET devPresentLastScan = 0 + WHERE NOT EXISTS (SELECT 1 FROM CurrentScan + WHERE devMac = cur_MAC) """) + + # Update IP + mylog('debug', '[Update Devices] - cur_IP -> devLastIP (always updated)') + sql.execute("""UPDATE Devices + SET devLastIP = (SELECT cur_IP FROM CurrentScan + WHERE devMac = cur_MAC) + WHERE EXISTS (SELECT 1 FROM CurrentScan + WHERE devMac = cur_MAC) """) + + # Update only devices with empty, NULL or (u(U)nknown) vendors + mylog('debug', '[Update Devices] - cur_Vendor -> (if empty) devVendor') + sql.execute("""UPDATE Devices + SET devVendor = ( + SELECT cur_Vendor + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devVendor IS NULL OR devVendor IN ("", "null", "(unknown)", "(Unknown)")) + AND EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + )""") + + # Update only devices with empty or NULL devParentPort + mylog('debug', '[Update Devices] - (if not empty) cur_Port -> devParentPort') + sql.execute("""UPDATE Devices + SET devParentPort = ( + SELECT cur_Port + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devParentPort IS NULL OR devParentPort IN ("", "null", "(unknown)", "(Unknown)")) + AND + EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + AND CurrentScan.cur_Port IS NOT NULL AND CurrentScan.cur_Port NOT IN ("", "null") + )""") + + # Update only devices with empty or NULL devParentMAC + mylog('debug', '[Update Devices] - (if not empty) cur_NetworkNodeMAC -> devParentMAC') + sql.execute("""UPDATE Devices + SET devParentMAC = ( + SELECT cur_NetworkNodeMAC + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devParentMAC IS NULL OR devParentMAC IN ("", "null", "(unknown)", "(Unknown)")) + AND + EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + AND CurrentScan.cur_NetworkNodeMAC IS NOT NULL AND CurrentScan.cur_NetworkNodeMAC NOT IN ("", "null") + ) + """) + + + # Update only devices with empty or NULL devSite + mylog('debug', '[Update Devices] - (if not empty) cur_NetworkSite -> (if empty) devSite') + sql.execute("""UPDATE Devices + SET devSite = ( + SELECT cur_NetworkSite + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devSite IS NULL OR devSite IN ("", "null")) + AND EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + AND CurrentScan.cur_NetworkSite IS NOT NULL AND CurrentScan.cur_NetworkSite NOT IN ("", "null") + )""") + + # Update only devices with empty or NULL devSSID + mylog('debug', '[Update Devices] - (if not empty) cur_SSID -> (if empty) devSSID') + sql.execute("""UPDATE Devices + SET devSSID = ( + SELECT cur_SSID + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devSSID IS NULL OR devSSID IN ("", "null")) + AND EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + AND CurrentScan.cur_SSID IS NOT NULL AND CurrentScan.cur_SSID NOT IN ("", "null") + )""") + + # Update only devices with empty or NULL devType + mylog('debug', '[Update Devices] - (if not empty) cur_Type -> (if empty) devType') + sql.execute("""UPDATE Devices + SET devType = ( + SELECT cur_Type + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + ) + WHERE + (devType IS NULL OR devType IN ("", "null")) + AND EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE Devices.devMac = CurrentScan.cur_MAC + AND CurrentScan.cur_Type IS NOT NULL AND CurrentScan.cur_Type NOT IN ("", "null") + )""") + + # Update (unknown) or (name not found) Names if available + mylog('debug','[Update Devices] - (if not empty) cur_Name -> (if empty) devName') + sql.execute (""" UPDATE Devices + SET devName = COALESCE(( + SELECT cur_Name + FROM CurrentScan + WHERE cur_MAC = devMac + AND cur_Name IS NOT NULL + AND cur_Name <> 'null' + AND cur_Name <> '' + ), devName) + WHERE (devName IN ('(unknown)', '(name not found)', '') + OR devName IS NULL) + AND EXISTS ( + SELECT 1 + FROM CurrentScan + WHERE cur_MAC = devMac + AND cur_Name IS NOT NULL + AND cur_Name <> 'null' + AND cur_Name <> '' + ) """) + + # Update VENDORS + recordsToUpdate = [] + query = """SELECT * FROM Devices + WHERE devVendor IS NULL OR devVendor IN ("", "null", "(unknown)", "(Unknown)") + """ + + for device in sql.execute (query) : + vendor = query_MAC_vendor (device['devMac']) + if vendor != -1 and vendor != -2 : + recordsToUpdate.append ([vendor, device['devMac']]) + + if len(recordsToUpdate) > 0: + sql.executemany ("UPDATE Devices SET devVendor = ? WHERE devMac = ? ", recordsToUpdate ) + + # Guess ICONS + recordsToUpdate = [] + + default_icon = get_setting_value('NEWDEV_devIcon') + + if get_setting_value('NEWDEV_replace_preset_icon'): + query = f"""SELECT * FROM Devices + WHERE devIcon in ('', 'null', '{default_icon}') + OR devIcon IS NULL""" + else: + query = """SELECT * FROM Devices + WHERE devIcon in ('', 'null') + OR devIcon IS NULL""" + + for device in sql.execute (query) : + # Conditional logic for devIcon guessing + devIcon = guess_icon(device['devVendor'], device['devMac'], device['devLastIP'], device['devName'], default_icon) + + recordsToUpdate.append ([devIcon, device['devMac']]) + + + mylog('debug',f'[Update Devices] recordsToUpdate: {recordsToUpdate}') + + if len(recordsToUpdate) > 0: + sql.executemany ("UPDATE Devices SET devIcon = ? WHERE devMac = ? ", recordsToUpdate ) + + # Guess Type + recordsToUpdate = [] + query = """SELECT * FROM Devices + WHERE devType in ('', 'null') + OR devType IS NULL""" + default_type = get_setting_value('NEWDEV_devType') + + for device in sql.execute (query) : + # Conditional logic for devIcon guessing + devType = guess_type(device['devVendor'], device['devMac'], device['devLastIP'], device['devName'], default_type) + + recordsToUpdate.append ([devType, device['devMac']]) + + if len(recordsToUpdate) > 0: + sql.executemany ("UPDATE Devices SET devType = ? WHERE devMac = ? ", recordsToUpdate ) + + + mylog('debug','[Update Devices] Update devices end') + #------------------------------------------------------------------------------- def save_scanned_devices (db): sql = db.sql #TO-DO @@ -267,218 +479,6 @@ def create_new_devices (db): db.commitDB() -#------------------------------------------------------------------------------- -def update_devices_data_from_scan (db): - sql = db.sql #TO-DO - startTime = timeNowTZ().strftime('%Y-%m-%d %H:%M:%S') - - # Update Last Connection - mylog('debug', '[Update Devices] 1 Last Connection') - sql.execute(f"""UPDATE Devices SET devLastConnection = '{startTime}', - devPresentLastScan = 1 - WHERE devPresentLastScan = 0 - AND EXISTS (SELECT 1 FROM CurrentScan - WHERE devMac = cur_MAC) """) - - # Clean no active devices - mylog('debug', '[Update Devices] 2 Clean no active devices') - sql.execute("""UPDATE Devices SET devPresentLastScan = 0 - WHERE NOT EXISTS (SELECT 1 FROM CurrentScan - WHERE devMac = cur_MAC) """) - - # Update IP - mylog('debug', '[Update Devices] - cur_IP -> devLastIP (always updated)') - sql.execute("""UPDATE Devices - SET devLastIP = (SELECT cur_IP FROM CurrentScan - WHERE devMac = cur_MAC) - WHERE EXISTS (SELECT 1 FROM CurrentScan - WHERE devMac = cur_MAC) """) - - # Update only devices with empty, NULL or (u(U)nknown) vendors - mylog('debug', '[Update Devices] - cur_Vendor -> (if empty) devVendor') - sql.execute("""UPDATE Devices - SET devVendor = ( - SELECT cur_Vendor - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devVendor IS NULL OR devVendor IN ("", "null", "(unknown)", "(Unknown)")) - AND EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - )""") - - # Update only devices with empty or NULL devParentPort - mylog('debug', '[Update Devices] - (if not empty) cur_Port -> devParentPort') - sql.execute("""UPDATE Devices - SET devParentPort = ( - SELECT cur_Port - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devParentPort IS NULL OR devParentPort IN ("", "null", "(unknown)", "(Unknown)")) - AND - EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - AND CurrentScan.cur_Port IS NOT NULL AND CurrentScan.cur_Port NOT IN ("", "null") - )""") - - # Update only devices with empty or NULL devParentMAC - mylog('debug', '[Update Devices] - (if not empty) cur_NetworkNodeMAC -> devParentMAC') - sql.execute("""UPDATE Devices - SET devParentMAC = ( - SELECT cur_NetworkNodeMAC - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devParentMAC IS NULL OR devParentMAC IN ("", "null", "(unknown)", "(Unknown)")) - AND - EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - AND CurrentScan.cur_NetworkNodeMAC IS NOT NULL AND CurrentScan.cur_NetworkNodeMAC NOT IN ("", "null") - ) - """) - - - # Update only devices with empty or NULL devSite - mylog('debug', '[Update Devices] - (if not empty) cur_NetworkSite -> (if empty) devSite') - sql.execute("""UPDATE Devices - SET devSite = ( - SELECT cur_NetworkSite - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devSite IS NULL OR devSite IN ("", "null")) - AND EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - AND CurrentScan.cur_NetworkSite IS NOT NULL AND CurrentScan.cur_NetworkSite NOT IN ("", "null") - )""") - - # Update only devices with empty or NULL devSSID - mylog('debug', '[Update Devices] - (if not empty) cur_SSID -> (if empty) devSSID') - sql.execute("""UPDATE Devices - SET devSSID = ( - SELECT cur_SSID - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devSSID IS NULL OR devSSID IN ("", "null")) - AND EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - AND CurrentScan.cur_SSID IS NOT NULL AND CurrentScan.cur_SSID NOT IN ("", "null") - )""") - - # Update only devices with empty or NULL devType - mylog('debug', '[Update Devices] - (if not empty) cur_Type -> (if empty) devType') - sql.execute("""UPDATE Devices - SET devType = ( - SELECT cur_Type - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - ) - WHERE - (devType IS NULL OR devType IN ("", "null")) - AND EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE Devices.devMac = CurrentScan.cur_MAC - AND CurrentScan.cur_Type IS NOT NULL AND CurrentScan.cur_Type NOT IN ("", "null") - )""") - - # Update (unknown) or (name not found) Names if available - mylog('debug','[Update Devices] - (if not empty) cur_Name -> (if empty) devName') - sql.execute (""" UPDATE Devices - SET devName = COALESCE(( - SELECT cur_Name - FROM CurrentScan - WHERE cur_MAC = devMac - AND cur_Name IS NOT NULL - AND cur_Name <> 'null' - AND cur_Name <> '' - ), devName) - WHERE (devName IN ('(unknown)', '(name not found)', '') - OR devName IS NULL) - AND EXISTS ( - SELECT 1 - FROM CurrentScan - WHERE cur_MAC = devMac - AND cur_Name IS NOT NULL - AND cur_Name <> 'null' - AND cur_Name <> '' - ) """) - - # Update VENDORS - recordsToUpdate = [] - query = """SELECT * FROM Devices - WHERE devVendor IS NULL OR devVendor IN ("", "null", "(unknown)", "(Unknown)") - """ - - for device in sql.execute (query) : - vendor = query_MAC_vendor (device['devMac']) - if vendor != -1 and vendor != -2 : - recordsToUpdate.append ([vendor, device['devMac']]) - - if len(recordsToUpdate) > 0: - sql.executemany ("UPDATE Devices SET devVendor = ? WHERE devMac = ? ", recordsToUpdate ) - - # Guess ICONS - recordsToUpdate = [] - - default_icon = get_setting_value('NEWDEV_devIcon') - - if get_setting_value('NEWDEV_replace_preset_icon'): - query = f"""SELECT * FROM Devices - WHERE devIcon in ('', 'null', '{default_icon}') - OR devIcon IS NULL""" - else: - query = """SELECT * FROM Devices - WHERE devIcon in ('', 'null') - OR devIcon IS NULL""" - - for device in sql.execute (query) : - # Conditional logic for devIcon guessing - devIcon = guess_icon(device['devVendor'], device['devMac'], device['devLastIP'], device['devName'], default_icon) - - recordsToUpdate.append ([devIcon, device['devMac']]) - - - mylog('debug',f'[Update Devices] recordsToUpdate: {recordsToUpdate}') - - if len(recordsToUpdate) > 0: - sql.executemany ("UPDATE Devices SET devIcon = ? WHERE devMac = ? ", recordsToUpdate ) - - # Guess Type - recordsToUpdate = [] - query = """SELECT * FROM Devices - WHERE devType in ('', 'null') - OR devType IS NULL""" - default_type = get_setting_value('NEWDEV_devType') - - for device in sql.execute (query) : - # Conditional logic for devIcon guessing - devType = guess_type(device['devVendor'], device['devMac'], device['devLastIP'], device['devName'], default_type) - - recordsToUpdate.append ([devType, device['devMac']]) - - if len(recordsToUpdate) > 0: - sql.executemany ("UPDATE Devices SET devType = ? WHERE devMac = ? ", recordsToUpdate ) - - - mylog('debug','[Update Devices] Update devices end') #------------------------------------------------------------------------------- def update_devices_names(db): @@ -597,12 +597,8 @@ def check_mac_or_internet(input_str): else: return False - -#=============================================================================== -# Lookup unknown vendors on devices -#=============================================================================== - #------------------------------------------------------------------------------- +# Lookup unknown vendors on devices def query_MAC_vendor (pMAC): pMACstr = str(pMAC) @@ -642,311 +638,4 @@ def query_MAC_vendor (pMAC): return -1 -#=============================================================================== -# Icons -#=============================================================================== -#------------------------------------------------------------------------------- -# Base64 encoded HTML strings for FontAwesome icons, now with an extended icons dictionary for broader device coverage -ICONS = { - "globe": "PGkgY2xhc3M9ImZhcyBmYS1nbG9iZSI+PC9pPg==", # Internet or global network - "phone": "PGkgY2xhc3M9ImZhcyBmYS1tb2JpbGUtYWx0Ij48L2k+", # Smartphone - "laptop": "PGkgY2xhc3M9ImZhIGZhLWxhcHRvcCI+PC9pPg==", # Laptop - "printer": "PGkgY2xhc3M9ImZhIGZhLXByaW50ZXIiPjwvaT4=", # Printer - "router": "PGkgY2xhc3M9ImZhcyBmYS1yYW5kb20iPjwvaT4=", # Router or network switch - "tv": "PGkgY2xhc3M9ImZhIGZhLXR2Ij48L2k+", # Television - "desktop": "PGkgY2xhc3M9ImZhIGZhLWRlc2t0b3AiPjwvaT4=", # Desktop PC - "tablet": "PGkgY2xhc3M9ImZhIGZhLXRhYmxldCI+PC9pPg==", # Tablet - "watch": "PGkgY2xhc3M9ImZhcyBmYS1jbG9jayI+PC9pPg==", # Fallback to clock since smartwatch is nonfree in FontAwesome - "camera": "PGkgY2xhc3M9ImZhIGZhLWNhbWVyYSI+PC9pPg==", # Camera or webcam - "home": "PGkgY2xhc3M9ImZhIGZhLWhvbWUiPjwvaT4=", # Smart home device - "apple": "PGkgY2xhc3M9ImZhYiBmYS1hcHBsZSI+PC9pPg==", # Apple device - "ethernet": "PGkgY2xhc3M9ImZhcyBmYS1uZXR3b3JrLXdpcmVkIj48L2k+", # Free alternative for ethernet icon in FontAwesome - "google": "PGkgY2xhc3M9ImZhYiBmYS1nb29nbGUiPjwvaT4=", # Google device - "raspberry": "PGkgY2xhc3M9ImZhYiBmYS1yYXNwYmVycnktcGkiPjwvaT4=", # Raspberry Pi - "microchip": "PGkgY2xhc3M9ImZhcyBmYS1taWNyb2NoaXAiPjwvaT4=", # IoT or embedded device - "server": "PGkgY2xhc3M9ImZhcyBmYS1zZXJ2ZXIiPjwvaT4=", # Server - "gamepad": "PGkgY2xhc3M9ImZhcyBmYS1nYW1lcGFkIj48L2k+", # Gaming console - "lightbulb": "PGkgY2xhc3M9ImZhcyBmYS1saWdodGJ1bGIiPjwvaT4=", # Smart light - "speaker": "PGkgY2xhc3M9ImZhcyBmYS12b2x1bWUtdXAiPjwvaT4=", # Free speaker alt icon for smart speakers in FontAwesome - "lock": "PGkgY2xhc3M9ImZhcyBmYS1sb2NrIj48L2k+", # Security device -} -# Extended device types for comprehensive classification -DEVICE_TYPES = { - "Internet": "Internet Gateway", - "Phone": "Smartphone", - "Laptop": "Laptop", - "Printer": "Printer", - "Router": "Router", - "TV": "Television", - "Desktop": "Desktop PC", - "Tablet": "Tablet", - "Smartwatch": "Smartwatch", - "Camera": "Camera", - "SmartHome": "Smart Home Device", - "Server": "Server", - "GamingConsole": "Gaming Console", - "IoT": "IoT Device", - "NetworkSwitch": "Network Switch", - "AccessPoint": "Access Point", - "SmartLight": "Smart Light", - "SmartSpeaker": "Smart Speaker", - "SecurityDevice": "Security Device", - "Unknown": "Unknown Device", -} - -#------------------------------------------------------------------------------- -# Guess device attributes such as type of device and associated device icon -def guess_device_attributes( - vendor: Optional[str], - mac: Optional[str], - ip: Optional[str], - name: Optional[str], - default_icon: str, - default_type: str - ) -> Tuple[str, str]: - """ - Guess the appropriate FontAwesome icon and device type based on device attributes. - - Args: - vendor: Device vendor name. - mac: Device MAC address. - ip: Device IP address. - name: Device name. - default_icon: Default icon to return if no match is found. - default_type: Default type to return if no match is found. - - Returns: - Tuple[str, str]: A tuple containing the guessed icon (Base64-encoded HTML string) - and the guessed device type (string). - """ - mylog('debug', f"[guess_device_attributes] Guessing attributes for (vendor|mac|ip|name): ('{vendor}'|'{mac}'|'{ip}'|'{name}')") - # Normalize inputs - vendor = str(vendor).lower().strip() if vendor else "unknown" - mac = str(mac).upper().strip() if mac else "00:00:00:00:00:00" - ip = str(ip).strip() if ip else "169.254.0.0" # APIPA address for unknown IPs per RFC 3927 - name = str(name).lower().strip() if name else "(unknown)" - - # --- Icon Guessing Logic --- - if mac == "INTERNET": - icon = ICONS.get("globe", default_icon) - else: - # Vendor-based icon guessing - icon_vendor_patterns = { - "apple": "apple", - "samsung|motorola|xiaomi|huawei": "phone", - "dell|lenovo|asus|acer": "laptop", - "hp|epson|canon|brother": "printer", - "cisco|ubiquiti|netgear|tp-link|d-link|mikrotik": "router", - "lg|samsung electronics|sony|vizio": "tv", - "raspberry pi": "raspberry", - "google": "google", - "espressif|particle": "microchip", - "intel|amd": "desktop", - "amazon": "speaker", - "philips hue|lifx": "lightbulb", - "aruba|meraki": "ethernet", - "qnap|synology": "server", - "nintendo|sony interactive|microsoft": "gamepad", - "ring|blink|arlo": "camera", - "nest": "home", - } - for pattern, icon_key in icon_vendor_patterns.items(): - if re.search(pattern, vendor, re.IGNORECASE): - icon = ICONS.get(icon_key, default_icon) - break - else: - # MAC-based icon guessing - mac_clean = mac.replace(':', '').replace('-', '').upper() - icon_mac_patterns = { - "001A79|B0BE83|BC926B": "apple", - "001B63|BC4C4C": "tablet", - "74ACB9|002468": "ethernet", - "B827EB": "raspberry", - "001422|001874": "desktop", - "001CBF|002186": "server", - } - for pattern_str, icon_key in icon_mac_patterns.items(): - patterns = [p.replace(':', '').replace('-', '').upper() for p in pattern_str.split('|')] - if any(mac_clean.startswith(p) for p in patterns): - icon = ICONS.get(icon_key, default_icon) - break - else: - # Name-based icon guessing - icon_name_patterns = { - "iphone|ipad|macbook|imac": "apple", - "pixel|galaxy|redmi": "phone", - "laptop|notebook": "laptop", - "printer|print": "printer", - "router|gateway|ap|access[ -]?point": "router", - "tv|television|smarttv": "tv", - "desktop|pc|computer": "desktop", - "tablet|pad": "tablet", - "watch|wear": "watch", - "camera|cam|webcam": "camera", - "echo|alexa|dot": "speaker", - "hue|lifx|bulb": "lightbulb", - "server|nas": "server", - "playstation|xbox|switch": "gamepad", - "raspberry|pi": "raspberry", - "google|chromecast|nest": "google", - "doorbell|lock|security": "lock", - } - for pattern, icon_key in icon_name_patterns.items(): - if re.search(pattern, name, re.IGNORECASE): - icon = ICONS.get(icon_key, default_icon) - break - else: - # IP-based icon guessing - icon_ip_patterns = { - r"^192\.168\.[0-1]\.1$": "router", - r"^10\.0\.0\.1$": "router", - r"^192\.168\.[0-1]\.[2-9]$": "desktop", - r"^192\.168\.[0-1]\.1\d{2}$": "phone", - } - for pattern, icon_key in icon_ip_patterns.items(): - if re.match(pattern, ip): - icon = ICONS.get(icon_key, default_icon) - break - else: - icon = default_icon - - # --- Type Guessing Logic --- - if mac == "INTERNET": - type_ = DEVICE_TYPES.get("Internet", default_type) - else: - # Vendor-based type guessing - type_vendor_patterns = { - "apple|samsung|motorola|xiaomi|huawei": "Phone", - "dell|lenovo|asus|acer|hp": "Laptop", - "epson|canon|brother": "Printer", - "cisco|ubiquiti|netgear|tp-link|d-link|mikrotik|aruba|meraki": "Router", - "lg|samsung electronics|sony|vizio": "TV", - "raspberry pi": "IoT", - "google|nest": "SmartHome", - "espressif|particle": "IoT", - "intel|amd": "Desktop", - "amazon": "SmartSpeaker", - "philips hue|lifx": "SmartLight", - "qnap|synology": "Server", - "nintendo|sony interactive|microsoft": "GamingConsole", - "ring|blink|arlo": "Camera", - } - for pattern, type_key in type_vendor_patterns.items(): - if re.search(pattern, vendor, re.IGNORECASE): - type_ = DEVICE_TYPES.get(type_key, default_type) - break - else: - # MAC-based type guessing - mac_clean = mac.replace(':', '').replace('-', '').upper() - type_mac_patterns = { - "00:1A:79|B0:BE:83|BC:92:6B": "Phone", - "00:1B:63|BC:4C:4C": "Tablet", - "74:AC:B9|00:24:68": "AccessPoint", - "B8:27:EB": "IoT", - "00:14:22|00:18:74": "Desktop", - "00:1C:BF|00:21:86": "Server", - } - for pattern_str, type_key in type_mac_patterns.items(): - patterns = [p.replace(':', '').replace('-', '').upper() for p in pattern_str.split('|')] - if any(mac_clean.startswith(p) for p in patterns): - type_ = DEVICE_TYPES.get(type_key, default_type) - break - else: - # Name-based type guessing - type_name_patterns = { - "iphone|ipad": "Phone", - "macbook|imac": "Laptop", - "pixel|galaxy|redmi": "Phone", - "laptop|notebook": "Laptop", - "printer|print": "Printer", - "router|gateway|ap|access[ -]?point": "Router", - "tv|television|smarttv": "TV", - "desktop|pc|computer": "Desktop", - "tablet|pad": "Tablet", - "watch|wear": "Smartwatch", - "camera|cam|webcam": "Camera", - "echo|alexa|dot": "SmartSpeaker", - "hue|lifx|bulb": "SmartLight", - "server|nas": "Server", - "playstation|xbox|switch": "GamingConsole", - "raspberry|pi": "IoT", - "google|chromecast|nest": "SmartHome", - "doorbell|lock|security": "SecurityDevice", - } - for pattern, type_key in type_name_patterns.items(): - if re.search(pattern, name, re.IGNORECASE): - type_ = DEVICE_TYPES.get(type_key, default_type) - break - else: - # IP-based type guessing - type_ip_patterns = { - r"^192\.168\.[0-1]\.1$": "Router", - r"^10\.0\.0\.1$": "Router", - r"^192\.168\.[0-1]\.[2-9]$": "Desktop", - r"^192\.168\.[0-1]\.1\d{2}$": "Phone", - } - for pattern, type_key in type_ip_patterns.items(): - if re.match(pattern, ip): - type_ = DEVICE_TYPES.get(type_key, default_type) - break - else: - type_ = default_type - - return icon, type_ - -# Deprecated functions with redirects (To be removed once all calls for these have been adjusted to use the updated function) -def guess_icon( - vendor: Optional[str], - mac: Optional[str], - ip: Optional[str], - name: Optional[str], - default: str - ) -> str: - """ - [DEPRECATED] Guess the appropriate FontAwesome icon for a device based on its attributes. - Use guess_device_attributes instead. - - Args: - vendor: Device vendor name. - mac: Device MAC address. - ip: Device IP address. - name: Device name. - default: Default icon to return if no match is found. - - Returns: - str: Base64-encoded FontAwesome icon HTML string. - """ - - icon, _ = guess_device_attributes(vendor, mac, ip, name, default, "unknown_type") - return icon - -def guess_type( - vendor: Optional[str], - mac: Optional[str], - ip: Optional[str], - name: Optional[str], - default: str - ) -> str: - """ - [DEPRECATED] Guess the device type based on its attributes. - Use guess_device_attributes instead. - - Args: - vendor: Device vendor name. - mac: Device MAC address. - ip: Device IP address. - name: Device name. - default: Default type to return if no match is found. - - Returns: - str: Device type from DEVICE_TYPES dictionary. - """ - - _, type_ = guess_device_attributes(vendor, mac, ip, name, "unknown_icon", default) - return type_ - -# Handler for when this is run as a program instead of called as a module. -if __name__ == "__main__": - mylog('error', "This module is not intended to be run directly.") - diff --git a/server/scan/device_heuristics.py b/server/scan/device_heuristics.py new file mode 100755 index 00000000..9eedba3b --- /dev/null +++ b/server/scan/device_heuristics.py @@ -0,0 +1,320 @@ +import sys +import re +from typing import Optional, List, Tuple, Dict + +# Register NetAlertX directories +INSTALL_PATH = "/app" +sys.path.extend([f"{INSTALL_PATH}/server"]) + +import conf +from const import * +from logger import mylog +from helper import timeNowTZ, get_setting_value + +#------------------------------------------------------------------------------- +# Base64 encoded HTML strings for FontAwesome icons, now with an extended icons dictionary for broader device coverage +ICONS = { + "globe": "PGkgY2xhc3M9ImZhcyBmYS1nbG9iZSI+PC9pPg==", # Internet or global network + "phone": "PGkgY2xhc3M9ImZhcyBmYS1tb2JpbGUtYWx0Ij48L2k+", # Smartphone + "laptop": "PGkgY2xhc3M9ImZhIGZhLWxhcHRvcCI+PC9pPg==", # Laptop + "printer": "PGkgY2xhc3M9ImZhIGZhLXByaW50ZXIiPjwvaT4=", # Printer + "router": "PGkgY2xhc3M9ImZhcyBmYS1yYW5kb20iPjwvaT4=", # Router or network switch + "tv": "PGkgY2xhc3M9ImZhIGZhLXR2Ij48L2k+", # Television + "desktop": "PGkgY2xhc3M9ImZhIGZhLWRlc2t0b3AiPjwvaT4=", # Desktop PC + "tablet": "PGkgY2xhc3M9ImZhIGZhLXRhYmxldCI+PC9pPg==", # Tablet + "watch": "PGkgY2xhc3M9ImZhcyBmYS1jbG9jayI+PC9pPg==", # Fallback to clock since smartwatch is nonfree in FontAwesome + "camera": "PGkgY2xhc3M9ImZhIGZhLWNhbWVyYSI+PC9pPg==", # Camera or webcam + "home": "PGkgY2xhc3M9ImZhIGZhLWhvbWUiPjwvaT4=", # Smart home device + "apple": "PGkgY2xhc3M9ImZhYiBmYS1hcHBsZSI+PC9pPg==", # Apple device + "ethernet": "PGkgY2xhc3M9ImZhcyBmYS1uZXR3b3JrLXdpcmVkIj48L2k+", # Free alternative for ethernet icon in FontAwesome + "google": "PGkgY2xhc3M9ImZhYiBmYS1nb29nbGUiPjwvaT4=", # Google device + "raspberry": "PGkgY2xhc3M9ImZhYiBmYS1yYXNwYmVycnktcGkiPjwvaT4=", # Raspberry Pi + "microchip": "PGkgY2xhc3M9ImZhcyBmYS1taWNyb2NoaXAiPjwvaT4=", # IoT or embedded device + "server": "PGkgY2xhc3M9ImZhcyBmYS1zZXJ2ZXIiPjwvaT4=", # Server + "gamepad": "PGkgY2xhc3M9ImZhcyBmYS1nYW1lcGFkIj48L2k+", # Gaming console + "lightbulb": "PGkgY2xhc3M9ImZhcyBmYS1saWdodGJ1bGIiPjwvaT4=", # Smart light + "speaker": "PGkgY2xhc3M9ImZhcyBmYS12b2x1bWUtdXAiPjwvaT4=", # Free speaker alt icon for smart speakers in FontAwesome + "lock": "PGkgY2xhc3M9ImZhcyBmYS1sb2NrIj48L2k+", # Security device +} + +# Extended device types for comprehensive classification +DEVICE_TYPES = { + "Internet": "Internet Gateway", + "Phone": "Smartphone", + "Laptop": "Laptop", + "Printer": "Printer", + "Router": "Router", + "TV": "Television", + "Desktop": "Desktop PC", + "Tablet": "Tablet", + "Smartwatch": "Smartwatch", + "Camera": "Camera", + "SmartHome": "Smart Home Device", + "Server": "Server", + "GamingConsole": "Gaming Console", + "IoT": "IoT Device", + "NetworkSwitch": "Network Switch", + "AccessPoint": "Access Point", + "SmartLight": "Smart Light", + "SmartSpeaker": "Smart Speaker", + "SecurityDevice": "Security Device", + "Unknown": "Unknown Device", +} + + + +#------------------------------------------------------------------------------- +# Guess device attributes such as type of device and associated device icon +def guess_device_attributes( + vendor: Optional[str], + mac: Optional[str], + ip: Optional[str], + name: Optional[str], + default_icon: str, + default_type: str + ) -> Tuple[str, str]: + """ + Guess the appropriate FontAwesome icon and device type based on device attributes. + + Args: + vendor: Device vendor name. + mac: Device MAC address. + ip: Device IP address. + name: Device name. + default_icon: Default icon to return if no match is found. + default_type: Default type to return if no match is found. + + Returns: + Tuple[str, str]: A tuple containing the guessed icon (Base64-encoded HTML string) + and the guessed device type (string). + """ + mylog('debug', f"[guess_device_attributes] Guessing attributes for (vendor|mac|ip|name): ('{vendor}'|'{mac}'|'{ip}'|'{name}')") + # Normalize inputs + vendor = str(vendor).lower().strip() if vendor else "unknown" + mac = str(mac).upper().strip() if mac else "00:00:00:00:00:00" + ip = str(ip).strip() if ip else "169.254.0.0" # APIPA address for unknown IPs per RFC 3927 + name = str(name).lower().strip() if name else "(unknown)" + + # --- Icon Guessing Logic --- + if mac == "INTERNET": + icon = ICONS.get("globe", default_icon) + else: + # Vendor-based icon guessing + icon_vendor_patterns = { + "apple": "apple", + "samsung|motorola|xiaomi|huawei": "phone", + "dell|lenovo|asus|acer": "laptop", + "hp|epson|canon|brother": "printer", + "cisco|ubiquiti|netgear|tp-link|d-link|mikrotik": "router", + "lg|samsung electronics|sony|vizio": "tv", + "raspberry pi": "raspberry", + "google": "google", + "espressif|particle": "microchip", + "intel|amd": "desktop", + "amazon": "speaker", + "philips hue|lifx": "lightbulb", + "aruba|meraki": "ethernet", + "qnap|synology": "server", + "nintendo|sony interactive|microsoft": "gamepad", + "ring|blink|arlo": "camera", + "nest": "home", + } + for pattern, icon_key in icon_vendor_patterns.items(): + if re.search(pattern, vendor, re.IGNORECASE): + icon = ICONS.get(icon_key, default_icon) + break + else: + # MAC-based icon guessing + mac_clean = mac.replace(':', '').replace('-', '').upper() + icon_mac_patterns = { + "001A79|B0BE83|BC926B": "apple", + "001B63|BC4C4C": "tablet", + "74ACB9|002468": "ethernet", + "B827EB": "raspberry", + "001422|001874": "desktop", + "001CBF|002186": "server", + } + for pattern_str, icon_key in icon_mac_patterns.items(): + patterns = [p.replace(':', '').replace('-', '').upper() for p in pattern_str.split('|')] + if any(mac_clean.startswith(p) for p in patterns): + icon = ICONS.get(icon_key, default_icon) + break + else: + # Name-based icon guessing + icon_name_patterns = { + "iphone|ipad|macbook|imac": "apple", + "pixel|galaxy|redmi": "phone", + "laptop|notebook": "laptop", + "printer|print": "printer", + "router|gateway|ap|access[ -]?point": "router", + "tv|television|smarttv": "tv", + "desktop|pc|computer": "desktop", + "tablet|pad": "tablet", + "watch|wear": "watch", + "camera|cam|webcam": "camera", + "echo|alexa|dot": "speaker", + "hue|lifx|bulb": "lightbulb", + "server|nas": "server", + "playstation|xbox|switch": "gamepad", + "raspberry|pi": "raspberry", + "google|chromecast|nest": "google", + "doorbell|lock|security": "lock", + } + for pattern, icon_key in icon_name_patterns.items(): + if re.search(pattern, name, re.IGNORECASE): + icon = ICONS.get(icon_key, default_icon) + break + else: + # IP-based icon guessing + icon_ip_patterns = { + r"^192\.168\.[0-1]\.1$": "router", + r"^10\.0\.0\.1$": "router", + r"^192\.168\.[0-1]\.[2-9]$": "desktop", + r"^192\.168\.[0-1]\.1\d{2}$": "phone", + } + for pattern, icon_key in icon_ip_patterns.items(): + if re.match(pattern, ip): + icon = ICONS.get(icon_key, default_icon) + break + else: + icon = default_icon + + # --- Type Guessing Logic --- + if mac == "INTERNET": + type_ = DEVICE_TYPES.get("Internet", default_type) + else: + # Vendor-based type guessing + type_vendor_patterns = { + "apple|samsung|motorola|xiaomi|huawei": "Phone", + "dell|lenovo|asus|acer|hp": "Laptop", + "epson|canon|brother": "Printer", + "cisco|ubiquiti|netgear|tp-link|d-link|mikrotik|aruba|meraki": "Router", + "lg|samsung electronics|sony|vizio": "TV", + "raspberry pi": "IoT", + "google|nest": "SmartHome", + "espressif|particle": "IoT", + "intel|amd": "Desktop", + "amazon": "SmartSpeaker", + "philips hue|lifx": "SmartLight", + "qnap|synology": "Server", + "nintendo|sony interactive|microsoft": "GamingConsole", + "ring|blink|arlo": "Camera", + } + for pattern, type_key in type_vendor_patterns.items(): + if re.search(pattern, vendor, re.IGNORECASE): + type_ = DEVICE_TYPES.get(type_key, default_type) + break + else: + # MAC-based type guessing + mac_clean = mac.replace(':', '').replace('-', '').upper() + type_mac_patterns = { + "00:1A:79|B0:BE:83|BC:92:6B": "Phone", + "00:1B:63|BC:4C:4C": "Tablet", + "74:AC:B9|00:24:68": "AccessPoint", + "B8:27:EB": "IoT", + "00:14:22|00:18:74": "Desktop", + "00:1C:BF|00:21:86": "Server", + } + for pattern_str, type_key in type_mac_patterns.items(): + patterns = [p.replace(':', '').replace('-', '').upper() for p in pattern_str.split('|')] + if any(mac_clean.startswith(p) for p in patterns): + type_ = DEVICE_TYPES.get(type_key, default_type) + break + else: + # Name-based type guessing + type_name_patterns = { + "iphone|ipad": "Phone", + "macbook|imac": "Laptop", + "pixel|galaxy|redmi": "Phone", + "laptop|notebook": "Laptop", + "printer|print": "Printer", + "router|gateway|ap|access[ -]?point": "Router", + "tv|television|smarttv": "TV", + "desktop|pc|computer": "Desktop", + "tablet|pad": "Tablet", + "watch|wear": "Smartwatch", + "camera|cam|webcam": "Camera", + "echo|alexa|dot": "SmartSpeaker", + "hue|lifx|bulb": "SmartLight", + "server|nas": "Server", + "playstation|xbox|switch": "GamingConsole", + "raspberry|pi": "IoT", + "google|chromecast|nest": "SmartHome", + "doorbell|lock|security": "SecurityDevice", + } + for pattern, type_key in type_name_patterns.items(): + if re.search(pattern, name, re.IGNORECASE): + type_ = DEVICE_TYPES.get(type_key, default_type) + break + else: + # IP-based type guessing + type_ip_patterns = { + r"^192\.168\.[0-1]\.1$": "Router", + r"^10\.0\.0\.1$": "Router", + r"^192\.168\.[0-1]\.[2-9]$": "Desktop", + r"^192\.168\.[0-1]\.1\d{2}$": "Phone", + } + for pattern, type_key in type_ip_patterns.items(): + if re.match(pattern, ip): + type_ = DEVICE_TYPES.get(type_key, default_type) + break + else: + type_ = default_type + + return icon, type_ + +# Deprecated functions with redirects (To be removed once all calls for these have been adjusted to use the updated function) +def guess_icon( + vendor: Optional[str], + mac: Optional[str], + ip: Optional[str], + name: Optional[str], + default: str + ) -> str: + """ + [DEPRECATED] Guess the appropriate FontAwesome icon for a device based on its attributes. + Use guess_device_attributes instead. + + Args: + vendor: Device vendor name. + mac: Device MAC address. + ip: Device IP address. + name: Device name. + default: Default icon to return if no match is found. + + Returns: + str: Base64-encoded FontAwesome icon HTML string. + """ + + icon, _ = guess_device_attributes(vendor, mac, ip, name, default, "unknown_type") + return icon + +def guess_type( + vendor: Optional[str], + mac: Optional[str], + ip: Optional[str], + name: Optional[str], + default: str + ) -> str: + """ + [DEPRECATED] Guess the device type based on its attributes. + Use guess_device_attributes instead. + + Args: + vendor: Device vendor name. + mac: Device MAC address. + ip: Device IP address. + name: Device name. + default: Default type to return if no match is found. + + Returns: + str: Device type from DEVICE_TYPES dictionary. + """ + + _, type_ = guess_device_attributes(vendor, mac, ip, name, "unknown_icon", default) + return type_ + +# Handler for when this is run as a program instead of called as a module. +if __name__ == "__main__": + mylog('error', "This module is not intended to be run directly.") + \ No newline at end of file diff --git a/server/scan/session_events.py b/server/scan/session_events.py index 855d4b86..bd0b166a 100755 --- a/server/scan/session_events.py +++ b/server/scan/session_events.py @@ -5,7 +5,7 @@ INSTALL_PATH="/app" sys.path.extend([f"{INSTALL_PATH}/server"]) import conf -from scan.device_handling import create_new_devices, print_scan_stats, save_scanned_devices, update_devices_data_from_scan, exclude_ignored_devices +from scan.device_handling import create_new_devices, print_scan_stats, save_scanned_devices, exclude_ignored_devices, update_devices_data_from_scan from helper import timeNowTZ, print_table_schema, get_setting_value from logger import mylog, Logger from messaging.reporting import skip_repeated_notifications