mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-26 00:18:20 -05:00
704 lines
25 KiB
Python
Executable File
704 lines
25 KiB
Python
Executable File
import os
|
|
import base64
|
|
import re
|
|
import sqlite3
|
|
import csv
|
|
from io import StringIO
|
|
from front.plugins.plugin_helper import is_mac
|
|
from logger import mylog
|
|
from models.plugin_object_instance import PluginObjectInstance
|
|
from database import get_temp_db_connection
|
|
from db.db_helper import get_table_json, get_device_condition_by_status, row_to_json, get_date_from_period
|
|
from helper import is_random_mac, get_setting_value
|
|
from utils.datetime_utils import timeNowDB, format_date
|
|
|
|
|
|
class DeviceInstance:
|
|
|
|
# --- helpers --------------------------------------------------------------
|
|
def _fetchall(self, query, params=()):
|
|
conn = get_temp_db_connection()
|
|
rows = conn.execute(query, params).fetchall()
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
def _fetchone(self, query, params=()):
|
|
conn = get_temp_db_connection()
|
|
row = conn.execute(query, params).fetchone()
|
|
conn.close()
|
|
return dict(row) if row else None
|
|
|
|
def _execute(self, query, params=()):
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute(query, params)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# --- public API -----------------------------------------------------------
|
|
def getAll(self):
|
|
return self._fetchall("SELECT * FROM Devices")
|
|
|
|
def getUnknown(self):
|
|
return self._fetchall("""
|
|
SELECT * FROM Devices
|
|
WHERE devName IN ("(unknown)", "(name not found)", "")
|
|
""")
|
|
|
|
def getValueWithMac(self, column_name, devMac):
|
|
row = self._fetchone(f"""
|
|
SELECT {column_name} FROM Devices WHERE devMac = ?
|
|
""", (devMac,))
|
|
return row.get(column_name) if row else None
|
|
|
|
def getDown(self):
|
|
return self._fetchall("""
|
|
SELECT * FROM Devices
|
|
WHERE devAlertDown = 1 AND devPresentLastScan = 0
|
|
""")
|
|
|
|
def getOffline(self):
|
|
return self._fetchall("""
|
|
SELECT * FROM Devices
|
|
WHERE devPresentLastScan = 0
|
|
""")
|
|
|
|
def getByGUID(self, devGUID):
|
|
return self._fetchone("""
|
|
SELECT * FROM Devices WHERE devGUID = ?
|
|
""", (devGUID,))
|
|
|
|
def exists(self, devGUID):
|
|
row = self._fetchone("""
|
|
SELECT COUNT(*) as count FROM Devices WHERE devGUID = ?
|
|
""", (devGUID,))
|
|
return row['count'] > 0 if row else False
|
|
|
|
def getByIP(self, ip):
|
|
return self._fetchone("""
|
|
SELECT * FROM Devices WHERE devLastIP = ?
|
|
""", (ip,))
|
|
|
|
def search(self, query):
|
|
like = f"%{query}%"
|
|
return self._fetchall("""
|
|
SELECT * FROM Devices
|
|
WHERE devMac LIKE ? OR devName LIKE ? OR devLastIP LIKE ?
|
|
""", (like, like, like))
|
|
|
|
def getLatest(self):
|
|
return self._fetchone("""
|
|
SELECT * FROM Devices
|
|
ORDER BY devFirstConnection DESC LIMIT 1
|
|
""")
|
|
|
|
def getFavorite(self):
|
|
return self._fetchall("""
|
|
SELECT * FROM Devices
|
|
WHERE devFavorite = 1
|
|
""")
|
|
|
|
def getNetworkTopology(self):
|
|
rows = self._fetchall("""
|
|
SELECT devName, devMac, devParentMAC, devParentPort, devVendor FROM Devices
|
|
""")
|
|
nodes = [{"id": r["devMac"], "name": r["devName"], "vendor": r["devVendor"]} for r in rows]
|
|
links = [{"source": r["devParentMAC"], "target": r["devMac"], "port": r["devParentPort"]}
|
|
for r in rows if r["devParentMAC"]]
|
|
return {"nodes": nodes, "links": links}
|
|
|
|
def updateField(self, devGUID, field, value):
|
|
if not self.exists(devGUID):
|
|
msg = f"[Device] updateField: GUID {devGUID} not found"
|
|
mylog("none", msg)
|
|
raise ValueError(msg)
|
|
self._execute(f"UPDATE Devices SET {field}=? WHERE devGUID=?", (value, devGUID))
|
|
|
|
def delete(self, devGUID):
|
|
if not self.exists(devGUID):
|
|
msg = f"[Device] delete: GUID {devGUID} not found"
|
|
mylog("none", msg)
|
|
raise ValueError(msg)
|
|
self._execute("DELETE FROM Devices WHERE devGUID=?", (devGUID,))
|
|
|
|
def resolvePrimaryID(self, target):
|
|
if is_mac(target):
|
|
return target.lower()
|
|
dev = self.getByIP(target)
|
|
return dev['devMac'].lower() if dev else None
|
|
|
|
def getOpenPorts(self, target):
|
|
primary = self.resolvePrimaryID(target)
|
|
if not primary:
|
|
return []
|
|
|
|
objs = PluginObjectInstance().getByField(
|
|
plugPrefix='NMAP',
|
|
matchedColumn='Object_PrimaryID',
|
|
matchedKey=primary,
|
|
returnFields=['Object_SecondaryID', 'Watched_Value2']
|
|
)
|
|
|
|
ports = []
|
|
for o in objs:
|
|
|
|
port = int(o.get('Object_SecondaryID') or 0)
|
|
|
|
ports.append({"port": port, "service": o.get('Watched_Value2', '')})
|
|
|
|
return ports
|
|
|
|
# --- devices_endpoint.py methods (HTTP response layer) -------------------
|
|
|
|
def getAll_AsResponse(self):
|
|
"""Return all devices as raw data (not jsonified)."""
|
|
return self.getAll()
|
|
|
|
def deleteDevices(self, macs):
|
|
"""
|
|
Delete devices from the Devices table.
|
|
- If `macs` is None → delete ALL devices.
|
|
- If `macs` is a list → delete only matching MACs (supports wildcard '*').
|
|
"""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
|
|
if not macs:
|
|
# No MACs provided → delete all
|
|
cur.execute("DELETE FROM Devices")
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True, "deleted": "all"}
|
|
|
|
deleted_count = 0
|
|
|
|
for mac in macs:
|
|
if "*" in mac:
|
|
# Wildcard matching
|
|
sql_pattern = mac.replace("*", "%")
|
|
cur.execute("DELETE FROM Devices WHERE devMAC LIKE ?", (sql_pattern,))
|
|
else:
|
|
# Exact match
|
|
cur.execute("DELETE FROM Devices WHERE devMAC = ?", (mac,))
|
|
deleted_count += cur.rowcount
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True, "deleted_count": deleted_count}
|
|
|
|
def deleteAllWithEmptyMacs(self):
|
|
"""Delete devices with empty MAC addresses."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute("DELETE FROM Devices WHERE devMAC IS NULL OR devMAC = ''")
|
|
deleted = cur.rowcount
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True, "deleted": deleted}
|
|
|
|
def deleteUnknownDevices(self):
|
|
"""Delete devices marked as unknown."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"""DELETE FROM Devices WHERE devName='(unknown)' OR devName='(name not found)'"""
|
|
)
|
|
deleted = cur.rowcount
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True, "deleted": deleted}
|
|
|
|
def exportDevices(self, export_format):
|
|
"""
|
|
Export devices from the Devices table in the desired format.
|
|
"""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
|
|
# Fetch all devices
|
|
devices_json = get_table_json(cur, "SELECT * FROM Devices")
|
|
conn.close()
|
|
|
|
# Ensure columns exist
|
|
columns = devices_json.columnNames or (
|
|
list(devices_json["data"][0].keys()) if devices_json["data"] else []
|
|
)
|
|
|
|
if export_format == "json":
|
|
return {
|
|
"format": "json",
|
|
"data": [row for row in devices_json["data"]],
|
|
"columns": list(columns)
|
|
}
|
|
elif export_format == "csv":
|
|
si = StringIO()
|
|
writer = csv.DictWriter(si, fieldnames=columns, quoting=csv.QUOTE_ALL)
|
|
writer.writeheader()
|
|
for row in devices_json.json["data"]:
|
|
writer.writerow(row)
|
|
return {
|
|
"format": "csv",
|
|
"content": si.getvalue(),
|
|
}
|
|
else:
|
|
return {"error": f"Unsupported format '{export_format}'"}
|
|
|
|
def importCSV(self, file_storage=None, json_content=None):
|
|
"""
|
|
Import devices from CSV.
|
|
- json_content: base64-encoded CSV string
|
|
- file_storage: uploaded file object
|
|
- fallback: read from config/devices.csv
|
|
"""
|
|
data = ""
|
|
skipped = []
|
|
|
|
# 1. Try JSON `content` (base64-encoded CSV)
|
|
if json_content:
|
|
try:
|
|
data = base64.b64decode(json_content, validate=True).decode("utf-8")
|
|
except Exception as e:
|
|
return {"success": False, "error": f"Base64 decode failed: {e}"}
|
|
|
|
# 2. Otherwise, try uploaded file
|
|
elif file_storage:
|
|
try:
|
|
data = file_storage.read().decode("utf-8")
|
|
except Exception as e:
|
|
return {"success": False, "error": f"File read failed: {e}"}
|
|
|
|
# 3. Fallback: try local file (same as PHP `$file = '../../../config/devices.csv';`)
|
|
else:
|
|
config_root = os.environ.get("NETALERTX_CONFIG", "/data/config")
|
|
local_file = os.path.join(config_root, "devices.csv")
|
|
try:
|
|
with open(local_file, "r", encoding="utf-8") as f:
|
|
data = f.read()
|
|
except FileNotFoundError:
|
|
return {"success": False, "error": "CSV file missing"}
|
|
|
|
if not data:
|
|
return {"success": False, "error": "No CSV data found"}
|
|
|
|
# --- Clean up newlines inside quoted fields ---
|
|
data = re.sub(r'"([^"]*)"', lambda m: m.group(0).replace("\n", " "), data)
|
|
|
|
# --- Parse CSV ---
|
|
lines = data.splitlines()
|
|
reader = csv.reader(lines)
|
|
try:
|
|
header = [h.strip() for h in next(reader)]
|
|
except StopIteration:
|
|
return {"success": False, "error": "CSV missing header"}
|
|
|
|
# --- Wipe Devices table ---
|
|
conn = get_temp_db_connection()
|
|
sql = conn.cursor()
|
|
sql.execute("DELETE FROM Devices")
|
|
|
|
# --- Prepare insert ---
|
|
placeholders = ",".join(["?"] * len(header))
|
|
insert_sql = f"INSERT INTO Devices ({', '.join(header)}) VALUES ({placeholders})"
|
|
|
|
row_count = 0
|
|
for idx, row in enumerate(reader, start=1):
|
|
if len(row) != len(header):
|
|
skipped.append(idx)
|
|
continue
|
|
try:
|
|
sql.execute(insert_sql, [col.strip() for col in row])
|
|
row_count += 1
|
|
except sqlite3.Error as e:
|
|
mylog("error", [f"[ImportCSV] SQL ERROR row {idx}: {e}"])
|
|
skipped.append(idx)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True, "inserted": row_count, "skipped_lines": skipped}
|
|
|
|
def getTotals(self):
|
|
"""Get device totals by status."""
|
|
conn = get_temp_db_connection()
|
|
sql = conn.cursor()
|
|
|
|
# Build a combined query with sub-selects for each status
|
|
query = f"""
|
|
SELECT
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("my")}) AS devices,
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("connected")}) AS connected,
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("favorites")}) AS favorites,
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("new")}) AS new,
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("down")}) AS down,
|
|
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("archived")}) AS archived
|
|
"""
|
|
sql.execute(query)
|
|
row = sql.fetchone()
|
|
conn.close()
|
|
|
|
return list(row) if row else []
|
|
|
|
def getByStatus(self, status=None):
|
|
"""
|
|
Return devices filtered by status. Returns all if no status provided.
|
|
Possible statuses: my, connected, favorites, new, down, archived
|
|
"""
|
|
conn = get_temp_db_connection()
|
|
sql = conn.cursor()
|
|
|
|
# Build condition for SQL
|
|
condition = get_device_condition_by_status(status) if status else ""
|
|
|
|
query = f"SELECT * FROM Devices {condition}"
|
|
sql.execute(query)
|
|
|
|
table_data = []
|
|
for row in sql.fetchall():
|
|
r = dict(row) # Convert sqlite3.Row to dict for .get()
|
|
dev_name = r.get("devName", "")
|
|
if r.get("devFavorite") == 1:
|
|
dev_name = f'<span class="text-yellow">★</span> {dev_name}'
|
|
|
|
# Start with all fields from the device record
|
|
device_record = r.copy()
|
|
# Override with formatted fields
|
|
device_record["id"] = r.get("devMac", "")
|
|
device_record["title"] = dev_name
|
|
device_record["favorite"] = r.get("devFavorite", 0)
|
|
|
|
table_data.append(device_record)
|
|
|
|
conn.close()
|
|
return table_data
|
|
|
|
# --- device_endpoint.py methods -------------------------------------------
|
|
|
|
def getDeviceData(self, mac, period=""):
|
|
"""Fetch device info with children, event stats, and presence calculation."""
|
|
now = timeNowDB()
|
|
|
|
# Special case for new device
|
|
if mac.lower() == "new":
|
|
device_data = {
|
|
"devMac": "",
|
|
"devName": "",
|
|
"devOwner": "",
|
|
"devType": "",
|
|
"devVendor": "",
|
|
"devFavorite": 0,
|
|
"devGroup": "",
|
|
"devComments": "",
|
|
"devFirstConnection": now,
|
|
"devLastConnection": now,
|
|
"devLastIP": "",
|
|
"devStaticIP": 0,
|
|
"devScan": 0,
|
|
"devLogEvents": 0,
|
|
"devAlertEvents": 0,
|
|
"devAlertDown": 0,
|
|
"devParentRelType": "default",
|
|
"devReqNicsOnline": 0,
|
|
"devSkipRepeated": 0,
|
|
"devLastNotification": "",
|
|
"devPresentLastScan": 0,
|
|
"devIsNew": 1,
|
|
"devLocation": "",
|
|
"devIsArchived": 0,
|
|
"devParentMAC": "",
|
|
"devParentPort": "",
|
|
"devIcon": "",
|
|
"devGUID": "",
|
|
"devSite": "",
|
|
"devSSID": "",
|
|
"devSyncHubNode": "",
|
|
"devSourcePlugin": "",
|
|
"devCustomProps": "",
|
|
"devStatus": "Unknown",
|
|
"devIsRandomMAC": False,
|
|
"devSessions": 0,
|
|
"devEvents": 0,
|
|
"devDownAlerts": 0,
|
|
"devPresenceHours": 0,
|
|
"devFQDN": "",
|
|
}
|
|
return device_data
|
|
|
|
# Compute period date for sessions/events
|
|
period_date_sql = get_date_from_period(period)
|
|
|
|
# Fetch device info + computed fields
|
|
sql = f"""
|
|
SELECT
|
|
d.*,
|
|
CASE
|
|
WHEN d.devAlertDown != 0 AND d.devPresentLastScan = 0 THEN 'Down'
|
|
WHEN d.devPresentLastScan = 1 THEN 'On-line'
|
|
ELSE 'Off-line'
|
|
END AS devStatus,
|
|
|
|
(SELECT COUNT(*) FROM Sessions
|
|
WHERE ses_MAC = d.devMac AND (
|
|
ses_DateTimeConnection >= {period_date_sql} OR
|
|
ses_DateTimeDisconnection >= {period_date_sql} OR
|
|
ses_StillConnected = 1
|
|
)) AS devSessions,
|
|
|
|
(SELECT COUNT(*) FROM Events
|
|
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
|
|
AND eve_EventType NOT IN ('Connected','Disconnected')) AS devEvents,
|
|
|
|
(SELECT COUNT(*) FROM Events
|
|
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
|
|
AND eve_EventType = 'Device Down') AS devDownAlerts,
|
|
|
|
(SELECT CAST(MAX(0, SUM(
|
|
julianday(IFNULL(ses_DateTimeDisconnection,'{now}')) -
|
|
julianday(CASE WHEN ses_DateTimeConnection < {period_date_sql}
|
|
THEN {period_date_sql} ELSE ses_DateTimeConnection END)
|
|
) * 24) AS INT)
|
|
FROM Sessions
|
|
WHERE ses_MAC = d.devMac
|
|
AND ses_DateTimeConnection IS NOT NULL
|
|
AND (ses_DateTimeDisconnection IS NOT NULL OR ses_StillConnected = 1)
|
|
AND (ses_DateTimeConnection >= {period_date_sql}
|
|
OR ses_DateTimeDisconnection >= {period_date_sql} OR ses_StillConnected = 1)
|
|
) AS devPresenceHours
|
|
|
|
FROM Devices d
|
|
WHERE d.devMac = ? OR CAST(d.rowid AS TEXT) = ?
|
|
"""
|
|
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (mac, mac))
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
conn.close()
|
|
return None
|
|
|
|
device_data = row_to_json(list(row.keys()), row)
|
|
device_data["devFirstConnection"] = format_date(device_data["devFirstConnection"])
|
|
device_data["devLastConnection"] = format_date(device_data["devLastConnection"])
|
|
device_data["devIsRandomMAC"] = is_random_mac(device_data["devMac"])
|
|
|
|
# Fetch children
|
|
cur.execute(
|
|
"SELECT * FROM Devices WHERE devParentMAC = ? ORDER BY devPresentLastScan DESC",
|
|
(device_data["devMac"],),
|
|
)
|
|
children_rows = cur.fetchall()
|
|
children = [row_to_json(list(r.keys()), r) for r in children_rows]
|
|
children_nics = [c for c in children if c.get("devParentRelType") == "nic"]
|
|
|
|
device_data["devChildrenDynamic"] = children
|
|
device_data["devChildrenNicsDynamic"] = children_nics
|
|
|
|
conn.close()
|
|
return device_data
|
|
|
|
def setDeviceData(self, mac, data):
|
|
"""Update or create a device."""
|
|
try:
|
|
if data.get("createNew", False):
|
|
sql = """
|
|
INSERT INTO Devices (
|
|
devMac, devName, devOwner, devType, devVendor, devIcon,
|
|
devFavorite, devGroup, devLocation, devComments,
|
|
devParentMAC, devParentPort, devSSID, devSite,
|
|
devStaticIP, devScan, devAlertEvents, devAlertDown,
|
|
devParentRelType, devReqNicsOnline, devSkipRepeated,
|
|
devIsNew, devIsArchived, devLastConnection,
|
|
devFirstConnection, devLastIP, devGUID, devCustomProps,
|
|
devSourcePlugin
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
"""
|
|
|
|
values = (
|
|
mac,
|
|
data.get("devName", ""),
|
|
data.get("devOwner", ""),
|
|
data.get("devType", ""),
|
|
data.get("devVendor", ""),
|
|
data.get("devIcon", ""),
|
|
data.get("devFavorite", 0),
|
|
data.get("devGroup", ""),
|
|
data.get("devLocation", ""),
|
|
data.get("devComments", ""),
|
|
data.get("devParentMAC", ""),
|
|
data.get("devParentPort", ""),
|
|
data.get("devSSID", ""),
|
|
data.get("devSite", ""),
|
|
data.get("devStaticIP", 0),
|
|
data.get("devScan", 0),
|
|
data.get("devAlertEvents", 0),
|
|
data.get("devAlertDown", 0),
|
|
data.get("devParentRelType", "default"),
|
|
data.get("devReqNicsOnline", 0),
|
|
data.get("devSkipRepeated", 0),
|
|
data.get("devIsNew", 0),
|
|
data.get("devIsArchived", 0),
|
|
data.get("devLastConnection", timeNowDB()),
|
|
data.get("devFirstConnection", timeNowDB()),
|
|
data.get("devLastIP", ""),
|
|
data.get("devGUID", ""),
|
|
data.get("devCustomProps", ""),
|
|
data.get("devSourcePlugin", "DUMMY"),
|
|
)
|
|
|
|
else:
|
|
sql = """
|
|
UPDATE Devices SET
|
|
devName=?, devOwner=?, devType=?, devVendor=?, devIcon=?,
|
|
devFavorite=?, devGroup=?, devLocation=?, devComments=?,
|
|
devParentMAC=?, devParentPort=?, devSSID=?, devSite=?,
|
|
devStaticIP=?, devScan=?, devAlertEvents=?, devAlertDown=?,
|
|
devParentRelType=?, devReqNicsOnline=?, devSkipRepeated=?,
|
|
devIsNew=?, devIsArchived=?, devCustomProps=?
|
|
WHERE devMac=?
|
|
"""
|
|
values = (
|
|
data.get("devName", ""),
|
|
data.get("devOwner", ""),
|
|
data.get("devType", ""),
|
|
data.get("devVendor", ""),
|
|
data.get("devIcon", ""),
|
|
data.get("devFavorite", 0),
|
|
data.get("devGroup", ""),
|
|
data.get("devLocation", ""),
|
|
data.get("devComments", ""),
|
|
data.get("devParentMAC", ""),
|
|
data.get("devParentPort", ""),
|
|
data.get("devSSID", ""),
|
|
data.get("devSite", ""),
|
|
data.get("devStaticIP", 0),
|
|
data.get("devScan", 0),
|
|
data.get("devAlertEvents", 0),
|
|
data.get("devAlertDown", 0),
|
|
data.get("devParentRelType", "default"),
|
|
data.get("devReqNicsOnline", 0),
|
|
data.get("devSkipRepeated", 0),
|
|
data.get("devIsNew", 0),
|
|
data.get("devIsArchived", 0),
|
|
data.get("devCustomProps", ""),
|
|
mac,
|
|
)
|
|
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute(sql, values)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
mylog("debug", f"[DeviceInstance] setDeviceData SQL: {sql.strip()}")
|
|
mylog("debug", f"[DeviceInstance] setDeviceData VALUES:{values}")
|
|
|
|
return {"success": True}
|
|
except Exception as e:
|
|
if conn:
|
|
conn.rollback()
|
|
|
|
# Optional: your existing logger
|
|
mylog("none", f"[DeviceInstance] setDeviceData({mac}) failed: {e}")
|
|
|
|
return {
|
|
"success": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def deleteDeviceByMAC(self, mac):
|
|
"""Delete a device by MAC."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute("DELETE FROM Devices WHERE devMac=?", (mac,))
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True}
|
|
|
|
def deleteDeviceEvents(self, mac):
|
|
"""Delete all events for a device."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute("DELETE FROM Events WHERE eve_MAC=?", (mac,))
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True}
|
|
|
|
def resetDeviceProps(self, mac):
|
|
"""Reset device custom properties to default."""
|
|
default_props = get_setting_value("NEWDEV_devCustomProps")
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"UPDATE Devices SET devCustomProps=? WHERE devMac=?",
|
|
(default_props, mac),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True}
|
|
|
|
def updateDeviceColumn(self, mac, column_name, column_value):
|
|
"""Update a specific column for a given device."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
|
|
# Build safe SQL with column name
|
|
sql = f"UPDATE Devices SET {column_name}=? WHERE devMac=?"
|
|
cur.execute(sql, (column_value, mac))
|
|
conn.commit()
|
|
|
|
if cur.rowcount > 0:
|
|
result = {"success": True}
|
|
else:
|
|
result = {"success": False, "error": "Device not found"}
|
|
|
|
conn.close()
|
|
return result
|
|
|
|
def copyDevice(self, mac_from, mac_to):
|
|
"""Copy a device entry from one MAC to another."""
|
|
conn = get_temp_db_connection()
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
# Drop temporary table if exists
|
|
cur.execute("DROP TABLE IF EXISTS temp_devices")
|
|
|
|
# Create temporary table with source device
|
|
cur.execute(
|
|
"CREATE TABLE temp_devices AS SELECT * FROM Devices WHERE devMac = ?",
|
|
(mac_from,),
|
|
)
|
|
|
|
# Update temporary table to target MAC
|
|
cur.execute("UPDATE temp_devices SET devMac = ?", (mac_to,))
|
|
|
|
# Delete previous entry with target MAC
|
|
cur.execute("DELETE FROM Devices WHERE devMac = ?", (mac_to,))
|
|
|
|
# Insert new entry from temporary table
|
|
cur.execute(
|
|
"INSERT INTO Devices SELECT * FROM temp_devices WHERE devMac = ?", (mac_to,)
|
|
)
|
|
|
|
# Drop temporary table
|
|
cur.execute("DROP TABLE temp_devices")
|
|
|
|
conn.commit()
|
|
return {
|
|
"success": True,
|
|
"message": f"Device copied from {mac_from} to {mac_to}",
|
|
}
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
return {"success": False, "error": str(e)}
|
|
|
|
finally:
|
|
conn.close()
|