Files
NetAlertX/server/models/event_instance.py
Jokob @NetAlertX c7399215ec Refactor event and session column names to camelCase
- Updated test cases to reflect new column names (eve_MAC -> eveMac, eve_DateTime -> eveDateTime, etc.) across various test files.
- Modified SQL table definitions in the database cleanup and migration tests to use camelCase naming conventions.
- Implemented migration tests to ensure legacy column names are correctly renamed to camelCase equivalents.
- Ensured that existing data is preserved during the migration process and that views referencing old column names are dropped before renaming.
- Verified that the migration function is idempotent, allowing for safe re-execution without data loss.
2026-03-16 10:11:22 +00:00

268 lines
9.1 KiB
Python

from datetime import datetime, timedelta
from logger import mylog
from database import get_temp_db_connection
from db.db_helper import row_to_json, get_date_from_period
from utils.datetime_utils import ensure_datetime, timeNowUTC
# -------------------------------------------------------------------------------
# Event handling (Matches table: Events)
# -------------------------------------------------------------------------------
class EventInstance:
def _conn(self):
"""Always return a new DB connection (thread-safe)."""
return get_temp_db_connection()
def _rows_to_list(self, rows):
return [dict(r) for r in rows]
# Get all events
def get_all(self):
conn = self._conn()
rows = conn.execute(
"SELECT * FROM Events ORDER BY eveDateTime DESC"
).fetchall()
conn.close()
return self._rows_to_list(rows)
# --- Get last n events ---
def get_last_n(self, n=10):
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
ORDER BY eveDateTime DESC
LIMIT ?
""", (n,)).fetchall()
conn.close()
return self._rows_to_list(rows)
# --- Specific helper for last 10 ---
def get_last(self):
return self.get_last_n(10)
# Get events in the last 24h
def get_recent(self):
since = timeNowUTC(as_string=False) - timedelta(hours=24)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
WHERE eveDateTime >= ?
ORDER BY eveDateTime DESC
""", (since,)).fetchall()
conn.close()
return self._rows_to_list(rows)
# Get events from last N hours
def get_by_hours(self, hours: int):
if hours <= 0:
mylog("warn", f"[Events] get_by_hours({hours}) -> invalid value")
return []
since = timeNowUTC(as_string=False) - timedelta(hours=hours)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
WHERE eveDateTime >= ?
ORDER BY eveDateTime DESC
""", (since,)).fetchall()
conn.close()
return self._rows_to_list(rows)
# Get events in a date range
def get_by_range(self, start: datetime, end: datetime):
if end < start:
mylog("error", f"[Events] get_by_range invalid: {start} > {end}")
raise ValueError("Start must not be after end")
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
WHERE eveDateTime BETWEEN ? AND ?
ORDER BY eveDateTime DESC
""", (start, end)).fetchall()
conn.close()
return self._rows_to_list(rows)
# Insert new event
def add(self, mac, ip, eventType, info="", pendingAlert=True, pairRow=None):
conn = self._conn()
conn.execute("""
INSERT OR IGNORE INTO Events (
eveMac, eveIp, eveDateTime,
eveEventType, eveAdditionalInfo,
evePendingAlertEmail, evePairEventRowid
) VALUES (?,?,?,?,?,?,?)
""", (mac, ip, timeNowUTC(), eventType, info,
1 if pendingAlert else 0, pairRow))
conn.commit()
conn.close()
# Delete old events
def delete_older_than(self, days: int):
cutoff = timeNowUTC(as_string=False) - timedelta(days=days)
conn = self._conn()
result = conn.execute("DELETE FROM Events WHERE eveDateTime < ?", (cutoff,))
conn.commit()
deleted_count = result.rowcount
conn.close()
return deleted_count
# --- events_endpoint.py methods ---
def createEvent(self, mac: str, ip: str, event_type: str = "Device Down", additional_info: str = "", pending_alert: int = 1, event_time: datetime | None = None):
"""
Insert a single event into the Events table.
Returns dict with success status.
"""
if isinstance(event_time, str):
start_time = ensure_datetime(event_time)
else:
start_time = ensure_datetime(event_time)
conn = self._conn()
cur = conn.cursor()
cur.execute(
"""
INSERT OR IGNORE INTO Events (eveMac, eveIp, eveDateTime, eveEventType, eveAdditionalInfo, evePendingAlertEmail)
VALUES (?, ?, ?, ?, ?, ?)
""",
(mac, ip, start_time, event_type, additional_info, pending_alert),
)
conn.commit()
conn.close()
mylog("debug", f"[Events] Created event for {mac} ({event_type})")
return {"success": True, "message": f"Created event for {mac}"}
def getEvents(self, mac=None):
"""
Fetch all events, or events for a specific MAC if provided.
Returns list of events.
"""
conn = self._conn()
cur = conn.cursor()
if mac:
sql = "SELECT * FROM Events WHERE eveMac=? ORDER BY eveDateTime DESC"
cur.execute(sql, (mac,))
else:
sql = "SELECT * FROM Events ORDER BY eveDateTime DESC"
cur.execute(sql)
rows = cur.fetchall()
events = [row_to_json(list(r.keys()), r) for r in rows]
conn.close()
return events
def deleteEventsOlderThan(self, days):
"""Delete all events older than a specified number of days"""
conn = self._conn()
cur = conn.cursor()
# Use a parameterized query with sqlite date function
sql = "DELETE FROM Events WHERE eveDateTime <= date('now', ?)"
cur.execute(sql, [f"-{days} days"])
conn.commit()
conn.close()
return {"success": True, "message": f"Deleted events older than {days} days"}
def deleteAllEvents(self):
"""Delete all events"""
conn = self._conn()
cur = conn.cursor()
sql = "DELETE FROM Events"
cur.execute(sql)
conn.commit()
conn.close()
return {"success": True, "message": "Deleted all events"}
def getEventsTotals(self, period: str = "7 days"):
"""
Return counts for events and sessions totals over a given period.
period: "7 days", "1 month", "1 year", "100 years"
Returns list with counts: [all_events, sessions, missing, voided, new, down]
"""
# Convert period to SQLite date expression
period_date_sql = get_date_from_period(period)
conn = self._conn()
cur = conn.cursor()
sql = f"""
SELECT
(SELECT COUNT(*) FROM Events WHERE eveDateTime >= {period_date_sql}) AS all_events,
(SELECT COUNT(*) FROM Sessions WHERE
sesDateTimeConnection >= {period_date_sql}
OR sesDateTimeDisconnection >= {period_date_sql}
OR sesStillConnected = 1
) AS sessions,
(SELECT COUNT(*) FROM Sessions WHERE
(sesDateTimeConnection IS NULL AND sesDateTimeDisconnection >= {period_date_sql})
OR (sesDateTimeDisconnection IS NULL AND sesStillConnected = 0 AND sesDateTimeConnection >= {period_date_sql})
) AS missing,
(SELECT COUNT(*) FROM Events WHERE eveDateTime >= {period_date_sql} AND eveEventType LIKE 'VOIDED%') AS voided,
(SELECT COUNT(*) FROM Events WHERE eveDateTime >= {period_date_sql} AND eveEventType LIKE 'New Device') AS new,
(SELECT COUNT(*) FROM Events WHERE eveDateTime >= {period_date_sql} AND eveEventType LIKE 'Device Down') AS down
"""
cur.execute(sql)
row = cur.fetchone()
conn.close()
# Return as list
return [row[0], row[1], row[2], row[3], row[4], row[5]]
def get_unstable_devices(self, hours: int = 1, threshold: int = 3, macs_only: bool = True):
"""
Return unstable devices based on flap detection.
A device is considered unstable if it has >= threshold events within the last `hours`.
Events considered:
- Connected
- Disconnected
- Device Down
- Down Reconnected
Args:
hours (int): Time window in hours (default: 1)
threshold (int): Minimum number of events to be considered unstable (default: 3)
macs_only (bool): If True, return only MAC addresses (set). Otherwise return full rows.
Returns:
set[str] OR list[dict]
"""
if hours <= 0 or threshold <= 0:
mylog("warn", f"[Events] get_unstable_devices invalid params: hours={hours}, threshold={threshold}")
return set() if macs_only else []
conn = self._conn()
sql = """
SELECT eveMac, COUNT(*) as event_count
FROM Events
WHERE eveEventType IN ('Connected','Disconnected','Device Down','Down Reconnected')
AND eveDateTime >= datetime('now', ?)
GROUP BY eveMac
HAVING COUNT(*) >= ?
"""
# SQLite expects "-1 hours" format
window = f"-{hours} hours"
rows = conn.execute(sql, (window, threshold)).fetchall()
conn.close()
if macs_only:
return {row["eveMac"] for row in rows}
return [dict(row) for row in rows]