This commit is contained in:
jokob-sk
2026-02-11 15:33:52 +11:00
44 changed files with 777 additions and 189 deletions

View File

@@ -42,11 +42,18 @@ Nested subprocess calls need their own timeout—outer timeout won't save you.
## Time Utilities
```python
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
timestamp = timeNowDB()
timestamp = timeNowUTC()
```
This is the ONLY function that calls datetime.datetime.now() in the entire codebase.
⚠️ CRITICAL: ALL database timestamps MUST be stored in UTC
This is the SINGLE SOURCE OF TRUTH for current time in NetAlertX
Use timeNowUTC() for DB writes (returns UTC string by default)
Use timeNowUTC(as_string=False) for datetime operations (scheduling, comparisons, logging)
## String Sanitization
Use sanitizers from `server/helper.py` before storing user input.

View File

@@ -12,7 +12,7 @@ on:
type: boolean
default: false
run_backend:
description: '📂 backend/ (SQL Builder & Security)'
description: '📂 backend/ & db/ (SQL Builder, Security & Migration)'
type: boolean
default: false
run_docker_env:
@@ -43,9 +43,9 @@ jobs:
run: |
PATHS=""
# Folder Mapping with 'test/' prefix
if [ "${{ github.event.inputs.scan }}" == "true" ]; then PATHS="$PATHS test/scan/"; fi
if [ "${{ github.event.inputs.run_scan }}" == "true" ]; then PATHS="$PATHS test/scan/"; fi
if [ "${{ github.event.inputs.run_api }}" == "true" ]; then PATHS="$PATHS test/api_endpoints/ test/server/"; fi
if [ "${{ github.event.inputs.run_backend }}" == "true" ]; then PATHS="$PATHS test/backend/"; fi
if [ "${{ github.event.inputs.run_backend }}" == "true" ]; then PATHS="$PATHS test/backend/ test/db/"; fi
if [ "${{ github.event.inputs.run_docker_env }}" == "true" ]; then PATHS="$PATHS test/docker_tests/"; fi
if [ "${{ github.event.inputs.run_ui }}" == "true" ]; then PATHS="$PATHS test/ui/"; fi

View File

@@ -447,21 +447,35 @@ function localizeTimestamp(input) {
return formatSafe(input, tz);
function formatSafe(str, tz) {
const date = new Date(str);
// CHECK: Does the input string have timezone information?
// - Ends with Z: "2026-02-11T11:37:02Z"
// - Has GMT±offset: "Wed Feb 11 2026 12:34:12 GMT+1100 (...)"
// - Has offset at end: "2026-02-11 11:37:02+11:00"
// - Has timezone name in parentheses: "(Australian Eastern Daylight Time)"
const hasOffset = /Z$/i.test(str.trim()) ||
/GMT[+-]\d{2,4}/.test(str) ||
/[+-]\d{2}:?\d{2}$/.test(str.trim()) ||
/\([^)]+\)$/.test(str.trim());
// ⚠️ CRITICAL: All DB timestamps are stored in UTC without timezone markers.
// If no offset is present, we must explicitly mark it as UTC by appending 'Z'
// so JavaScript doesn't interpret it as local browser time.
let isoStr = str.trim();
if (!hasOffset) {
// Ensure proper ISO format before appending Z
// Replace space with 'T' if needed: "2026-02-11 11:37:02" → "2026-02-11T11:37:02Z"
isoStr = isoStr.trim().replace(/^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2})$/, '$1T$2') + 'Z';
}
const date = new Date(isoStr);
if (!isFinite(date)) {
console.error(`ERROR: Couldn't parse date: '${str}' with TIMEZONE ${tz}`);
return 'Failed conversion';
}
// CHECK: Does the input string have an offset (e.g., +11:00 or Z)?
// If it does, and we apply a 'tz' again, we double-shift.
const hasOffset = /[Z|[+-]\d{2}:?\d{2}]$/.test(str.trim());
return new Intl.DateTimeFormat(LOCALE, {
// If it has an offset, we display it as-is (UTC mode in Intl
// effectively means "don't add more hours").
// If no offset, apply your variable 'tz'.
timeZone: hasOffset ? 'UTC' : tz,
// Convert from UTC to user's configured timezone
timeZone: tz,
year: 'numeric', month: '2-digit', day: '2-digit',
hour: '2-digit', minute: '2-digit', second: '2-digit',
hour12: false

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = result,
watched3 = 'null',

View File

@@ -19,7 +19,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_email # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -80,7 +80,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = result,
watched3 = 'null',

View File

@@ -26,7 +26,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, bytes_to_string, \
sanitize_string, normalize_string # noqa: E402 [flake8 lint suppression]
from database import DB, get_device_stats # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
@@ -583,7 +583,7 @@ def publish_notifications(db, mqtt_client):
# Optional: attach meta info
payload["_meta"] = {
"published_at": timeNowDB(),
"published_at": timeNowUTC(),
"source": "NetAlertX",
"notification_GUID": notification["GUID"]
}
@@ -631,7 +631,7 @@ def prepTimeStamp(datetime_str):
except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"])
# Use the current time if the input format is invalid
parsed_datetime = datetime.now(conf.tz)
parsed_datetime = timeNowUTC(as_string=False)
# Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset

View File

@@ -13,7 +13,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -63,7 +63,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_text),
watched3 = response_status_code,

View File

@@ -15,7 +15,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_string # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
from database import DB # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId=pluginName,
secondaryId=timeNowDB(),
secondaryId=timeNowUTC(),
watched1=notification["GUID"],
watched2=handleEmpty(response_text),
watched3=response_status_code,

View File

@@ -13,7 +13,7 @@ from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, hide_string # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
from database import DB # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
@@ -61,7 +61,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_text),
watched3 = response_status_code,

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import confFileName, logPath # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId=pluginName,
secondaryId=timeNowDB(),
secondaryId=timeNowUTC(),
watched1=notification["GUID"],
watched2=result,
watched3='null',

View File

@@ -15,7 +15,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import logPath, confFileName # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, write_file # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
@@ -69,7 +69,7 @@ def main():
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_stdout),
watched3 = handleEmpty(response_stderr),

View File

@@ -4,7 +4,6 @@ import os
import argparse
import sys
import csv
from datetime import datetime
# Register NetAlertX directories
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
@@ -13,6 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
@@ -60,7 +60,7 @@ def main():
if overwrite:
filename = 'devices.csv'
else:
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
timestamp = timeNowUTC(as_string=False).strftime('%Y%m%d%H%M%S')
filename = f'devices_{timestamp}.csv'
fullPath = os.path.join(values.location.split('=')[1], filename)

View File

@@ -22,7 +22,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB, DATETIME_PATTERN # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC, DATETIME_PATTERN # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value("TIMEZONE"))
@@ -151,7 +151,7 @@ def main():
watched1=freebox["name"],
watched2=freebox["operator"],
watched3="Gateway",
watched4=timeNowDB(),
watched4=timeNowUTC(),
extra="",
foreignKey=freebox["mac"],
)

View File

@@ -12,7 +12,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger, append_line_to_file # noqa: E402 [flake8 lint suppression]
from helper import check_IP_format, get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
@@ -74,7 +74,7 @@ def main():
mylog('verbose', [f'[{pluginName}] Curl Fallback (new_internet_IP|cmd_output): {new_internet_IP} | {cmd_output}'])
# logging
append_line_to_file(logPath + '/IP_changes.log', '[' + str(timeNowDB()) + ']\t' + new_internet_IP + '\n')
append_line_to_file(logPath + '/IP_changes.log', '[' + str(timeNowUTC()) + ']\t' + new_internet_IP + '\n')
plugin_objects = Plugin_Objects(RESULT_FILE)

View File

@@ -11,7 +11,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -37,7 +37,7 @@ def main():
speedtest_result = run_speedtest()
plugin_objects.add_object(
primaryId = 'Speedtest',
secondaryId = timeNowDB(),
secondaryId = timeNowUTC(),
watched1 = speedtest_result['download_speed'],
watched2 = speedtest_result['upload_speed'],
watched3 = speedtest_result['full_json'],

View File

@@ -3,7 +3,6 @@
import os
import sys
import subprocess
from datetime import datetime
from pytz import timezone
from functools import reduce
@@ -13,6 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -95,7 +95,7 @@ def parse_neighbors(raw_neighbors: list[str]):
neighbor = {}
neighbor['ip'] = fields[0]
neighbor['mac'] = fields[2]
neighbor['last_seen'] = datetime.now()
neighbor['last_seen'] = timeNowUTC()
# Unknown data
neighbor['hostname'] = '(unknown)'

View File

@@ -11,7 +11,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger, append_line_to_file # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -213,7 +213,7 @@ def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args):
elif 'PORT' in line and 'STATE' in line and 'SERVICE' in line:
startCollecting = False # end reached
elif startCollecting and len(line.split()) == 3:
newEntriesTmp.append(nmap_entry(ip, deviceMACs[devIndex], timeNowDB(), line.split()[0], line.split()[1], line.split()[2]))
newEntriesTmp.append(nmap_entry(ip, deviceMACs[devIndex], timeNowUTC(), line.split()[0], line.split()[1], line.split()[2]))
newPortsPerDevice += 1
elif 'Nmap done' in line:
duration = line.split('scanned in ')[1]

View File

@@ -6,7 +6,6 @@ Imports devices from Pi-hole v6 API (Network endpoints) into NetAlertX plugin re
import os
import sys
import datetime
import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
@@ -18,6 +17,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
pluginName = 'PIHOLEAPI'
from plugin_helper import Plugin_Objects, is_mac # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
@@ -201,7 +201,7 @@ def gather_device_entries():
"""
entries = []
devices = get_pihole_network_devices()
now_ts = int(datetime.datetime.now().timestamp())
now_ts = int(timeNowUTC(as_string=False).timestamp())
for device in devices:
hwaddr = device.get('hwaddr')

View File

@@ -12,7 +12,7 @@ sys.path.append(f"{INSTALL_PATH}/front/plugins")
sys.path.append(f'{INSTALL_PATH}/server')
from logger import mylog # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from const import default_tz, fullConfPath # noqa: E402 [flake8 lint suppression]
@@ -237,7 +237,7 @@ class Plugin_Object:
self.pluginPref = ""
self.primaryId = primaryId
self.secondaryId = secondaryId
self.created = timeNowDB()
self.created = timeNowUTC()
self.changed = ""
self.watched1 = watched1
self.watched2 = watched2

View File

@@ -16,7 +16,7 @@ from utils.plugin_utils import get_plugins_configs, decode_and_rename_files # n
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from utils.crypto_utils import encrypt_data # noqa: E402 [flake8 lint suppression]
from messaging.in_app import write_notification # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
@@ -147,7 +147,7 @@ def main():
message = f'[{pluginName}] Device data from node "{node_name}" written to {log_file_name}'
mylog('verbose', [message])
if lggr.isAbove('verbose'):
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
# Process any received data for the Device DB table (ONLY JSON)
# Create the file path
@@ -253,7 +253,7 @@ def main():
message = f'[{pluginName}] Inserted "{len(new_devices)}" new devices'
mylog('verbose', [message])
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
# Commit and close the connection
conn.commit()
@@ -298,7 +298,7 @@ def send_data(api_token, file_content, encryption_key, file_path, node_name, pre
if response.status_code == 200:
message = f'[{pluginName}] Data for "{file_path}" sent successfully via {final_endpoint}'
mylog('verbose', [message])
write_notification(message, 'info', timeNowDB())
write_notification(message, 'info', timeNowUTC())
return True
except requests.RequestException as e:
@@ -307,7 +307,7 @@ def send_data(api_token, file_content, encryption_key, file_path, node_name, pre
# If all endpoints fail
message = f'[{pluginName}] Failed to send data for "{file_path}" via all endpoints'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return False
@@ -331,7 +331,7 @@ def get_data(api_token, node_url):
except json.JSONDecodeError:
message = f'[{pluginName}] Failed to parse JSON from {final_endpoint}'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return ""
except requests.RequestException as e:
mylog('verbose', [f'[{pluginName}] Error calling {final_endpoint}: {e}'])
@@ -339,7 +339,7 @@ def get_data(api_token, node_url):
# If all endpoints fail
message = f'[{pluginName}] Failed to get data from "{node_url}" via all endpoints'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowDB())
write_notification(message, 'alert', timeNowUTC())
return ""

View File

@@ -25,7 +25,7 @@ import conf
from const import fullConfPath, sql_new_devices
from logger import mylog
from helper import filePermissions
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from api import update_api
from scan.session_events import process_scan
@@ -104,7 +104,7 @@ def main():
pm, all_plugins, imported = importConfigs(pm, db, all_plugins)
# update time started
conf.loop_start_time = timeNowTZ()
conf.loop_start_time = timeNowUTC(as_string=False)
loop_start_time = conf.loop_start_time # TODO fix

View File

@@ -23,7 +23,7 @@ from const import (
)
from logger import mylog
from helper import write_file, get_setting_value
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from models.user_events_queue_instance import UserEventsQueueInstance
@@ -105,7 +105,7 @@ def update_api(
class api_endpoint_class:
def __init__(self, db, forceUpdate, query, path, is_ad_hoc_user_event=False):
current_time = timeNowTZ()
current_time = timeNowUTC(as_string=False)
self.db = db
self.query = query
@@ -163,7 +163,7 @@ class api_endpoint_class:
# ----------------------------------------
def try_write(self, forceUpdate):
current_time = timeNowTZ()
current_time = timeNowUTC(as_string=False)
# Debugging info to understand the issue
# mylog('debug', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event
@@ -183,7 +183,7 @@ class api_endpoint_class:
write_file(self.path, json.dumps(self.jsonData))
self.needsUpdate = False
self.last_update_time = timeNowTZ() # Reset last_update_time after writing
self.last_update_time = timeNowUTC(as_string=False) # Reset last_update_time after writing
# Update user event execution log
# mylog('verbose', [f'[API] api_endpoint_class: is_ad_hoc_user_event {self.is_ad_hoc_user_event}'])

View File

@@ -12,7 +12,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, format_ip_long # noqa: E402 [flake8 lint suppression]
from db.db_helper import get_date_from_period # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB, format_date_iso, format_event_date, format_date_diff, format_date # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC, format_date_iso, format_event_date, format_date_diff, format_date # noqa: E402 [flake8 lint suppression]
# --------------------------
@@ -165,7 +165,7 @@ def get_sessions_calendar(start_date, end_date, mac):
rows = cur.fetchall()
conn.close()
now_iso = timeNowDB()
now_iso = timeNowUTC()
events = []
for row in rows:

View File

@@ -3,7 +3,7 @@ import base64
from flask import jsonify, request
from logger import mylog
from helper import get_setting_value
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from messaging.in_app import write_notification
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
@@ -22,19 +22,19 @@ def handle_sync_get():
raw_data = f.read()
except FileNotFoundError:
msg = f"[Plugin: SYNC] Data file not found: {file_path}"
write_notification(msg, "alert", timeNowDB())
write_notification(msg, "alert", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"error": msg}), 500
response_data = base64.b64encode(raw_data).decode("utf-8")
write_notification("[Plugin: SYNC] Data sent", "info", timeNowDB())
write_notification("[Plugin: SYNC] Data sent", "info", timeNowUTC())
return jsonify({
"node_name": get_setting_value("SYNC_node_name"),
"status": 200,
"message": "OK",
"data_base64": response_data,
"timestamp": timeNowDB()
"timestamp": timeNowUTC()
}), 200
@@ -68,11 +68,11 @@ def handle_sync_post():
f.write(data)
except Exception as e:
msg = f"[Plugin: SYNC] Failed to store data: {e}"
write_notification(msg, "alert", timeNowDB())
write_notification(msg, "alert", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"error": msg}), 500
msg = f"[Plugin: SYNC] Data received ({file_path_new})"
write_notification(msg, "info", timeNowDB())
write_notification(msg, "info", timeNowUTC())
mylog("verbose", [msg])
return jsonify({"message": "Data received and stored successfully"}), 200

View File

@@ -4,7 +4,7 @@ import json
from const import applicationPath, apiPath
from logger import mylog
from helper import checkNewVersion
from utils.datetime_utils import timeNowDB, timeNow
from utils.datetime_utils import timeNowUTC
from api_server.sse_broadcast import broadcast_state_update
# Register NetAlertX directories using runtime configuration
@@ -67,7 +67,7 @@ class app_state_class:
previousState = ""
# Update self
self.lastUpdated = str(timeNowDB())
self.lastUpdated = str(timeNowUTC())
if os.path.exists(stateFile):
try:
@@ -95,7 +95,7 @@ class app_state_class:
self.showSpinner = False
self.processScan = False
self.isNewVersion = checkNewVersion()
self.isNewVersionChecked = int(timeNow().timestamp())
self.isNewVersionChecked = int(timeNowUTC(as_string=False).timestamp())
self.graphQLServerStarted = 0
self.currentState = "Init"
self.pluginsStates = {}
@@ -135,10 +135,10 @@ class app_state_class:
self.buildTimestamp = buildTimestamp
# check for new version every hour and if currently not running new version
if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(
timeNow().timestamp()
timeNowUTC(as_string=False).timestamp()
):
self.isNewVersion = checkNewVersion()
self.isNewVersionChecked = int(timeNow().timestamp())
self.isNewVersionChecked = int(timeNowUTC(as_string=False).timestamp())
# Update .json file
# with open(stateFile, 'w') as json_file:

View File

@@ -17,6 +17,7 @@ from db.db_upgrade import (
ensure_Settings,
ensure_Indexes,
ensure_mac_lowercase_triggers,
migrate_timestamps_to_utc,
)
@@ -187,6 +188,9 @@ class DB:
# Parameters tables setup
ensure_Parameters(self.sql)
# One-time UTC timestamp migration (must run after Parameters table exists)
migrate_timestamps_to_utc(self.sql)
# Plugins tables setup
ensure_plugins_tables(self.sql)

View File

@@ -228,7 +228,7 @@ def ensure_views(sql) -> bool:
)
SELECT
d.*, -- all Device fields
r.* -- all CurrentScan fields
r.* -- all CurrentScan fields
FROM Devices d
LEFT JOIN RankedScans r
ON d.devMac = r.scanMac
@@ -494,3 +494,219 @@ def ensure_plugins_tables(sql) -> bool:
); """)
return True
# ===============================================================================
# UTC Timestamp Migration (added 2026-02-10)
# ===============================================================================
def is_timestamps_in_utc(sql) -> bool:
"""
Check if existing timestamps in Devices table are already in UTC format.
Strategy:
1. Sample 10 non-NULL devFirstConnection timestamps from Devices
2. For each timestamp, assume it's UTC and calculate what it would be in local time
3. Check if timestamps have a consistent offset pattern (indicating local time storage)
4. If offset is consistently > 0, they're likely local timestamps (need migration)
5. If offset is ~0 or inconsistent, they're likely already UTC (skip migration)
Returns:
bool: True if timestamps appear to be in UTC already, False if they need migration
"""
try:
# Get timezone offset in seconds
import conf
from zoneinfo import ZoneInfo
import datetime as dt
now = dt.datetime.now(dt.UTC).replace(microsecond=0)
current_offset_seconds = 0
try:
if isinstance(conf.tz, dt.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
if tz:
local_now = dt.datetime.now(tz).replace(microsecond=0)
local_offset = local_now.utcoffset().total_seconds()
utc_offset = now.utcoffset().total_seconds() if now.utcoffset() else 0
current_offset_seconds = int(local_offset - utc_offset)
# Sample timestamps from Devices table
sql.execute("""
SELECT devFirstConnection, devLastConnection, devLastNotification
FROM Devices
WHERE devFirstConnection IS NOT NULL
LIMIT 10
""")
samples = []
for row in sql.fetchall():
for ts in row:
if ts:
samples.append(ts)
if not samples:
mylog("verbose", "[db_upgrade] No timestamp samples found in Devices - assuming UTC")
return True # Empty DB, assume UTC
# Parse samples and check if they have timezone info (which would indicate migration already done)
has_tz_marker = any('+' in str(ts) or 'Z' in str(ts) for ts in samples)
if has_tz_marker:
mylog("verbose", "[db_upgrade] Timestamps have timezone markers - already migrated to UTC")
return True
mylog("debug", f"[db_upgrade] Sampled {len(samples)} timestamps. Current TZ offset: {current_offset_seconds}s")
mylog("verbose", "[db_upgrade] Timestamps appear to be in system local time - migration needed")
return False
except Exception as e:
mylog("warn", f"[db_upgrade] Error checking UTC status: {e} - assuming UTC")
return True
def migrate_timestamps_to_utc(sql) -> bool:
"""
Migrate all timestamp columns in the database from local time to UTC.
This function determines if migration is needed based on the VERSION setting:
- Fresh installs (no VERSION): Skip migration - timestamps already UTC from timeNowUTC()
- Version >= 26.2.6: Skip migration - already using UTC timestamps
- Version < 26.2.6: Run migration - convert local timestamps to UTC
Affected tables:
- Devices: devFirstConnection, devLastConnection, devLastNotification
- Events: eve_DateTime
- Sessions: ses_DateTimeConnection, ses_DateTimeDisconnection
- Notifications: DateTimeCreated, DateTimePushed
- Online_History: Scan_Date
- Plugins_Objects: DateTimeCreated, DateTimeChanged
- Plugins_Events: DateTimeCreated, DateTimeChanged
- Plugins_History: DateTimeCreated, DateTimeChanged
- AppEvents: DateTimeCreated
Returns:
bool: True if migration completed or wasn't needed, False on error
"""
try:
import conf
from zoneinfo import ZoneInfo
import datetime as dt
# Check VERSION from Settings table (from previous app run)
sql.execute("SELECT setValue FROM Settings WHERE setKey = 'VERSION'")
result = sql.fetchone()
prev_version = result[0] if result else ""
# Fresh install: VERSION is empty → timestamps already UTC from timeNowUTC()
if not prev_version or prev_version == "" or prev_version == "unknown":
mylog("verbose", "[db_upgrade] Fresh install detected - timestamps already in UTC format")
return True
# Parse version - format: "26.2.6" or "v26.2.6"
try:
version_parts = prev_version.strip('v').split('.')
major = int(version_parts[0]) if len(version_parts) > 0 else 0
minor = int(version_parts[1]) if len(version_parts) > 1 else 0
patch = int(version_parts[2]) if len(version_parts) > 2 else 0
# UTC timestamps introduced in v26.2.6
# If upgrading from 26.2.6 or later, timestamps are already UTC
if (major > 26) or (major == 26 and minor > 2) or (major == 26 and minor == 2 and patch >= 6):
mylog("verbose", f"[db_upgrade] Version {prev_version} already uses UTC timestamps - skipping migration")
return True
mylog("verbose", f"[db_upgrade] Upgrading from {prev_version} (< v26.2.6) - migrating timestamps to UTC")
except (ValueError, IndexError) as e:
mylog("warn", f"[db_upgrade] Could not parse version '{prev_version}': {e} - checking timestamps")
# Fallback: use detection logic
if is_timestamps_in_utc(sql):
mylog("verbose", "[db_upgrade] Timestamps appear to be in UTC - skipping migration")
return True
# Get timezone offset
try:
if isinstance(conf.tz, dt.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
if tz:
now_local = dt.datetime.now(tz)
offset_hours = (now_local.utcoffset().total_seconds()) / 3600
else:
offset_hours = 0
mylog("verbose", f"[db_upgrade] Starting UTC timestamp migration (offset: {offset_hours} hours)")
# List of tables and their datetime columns
timestamp_columns = {
'Devices': ['devFirstConnection', 'devLastConnection', 'devLastNotification'],
'Events': ['eve_DateTime'],
'Sessions': ['ses_DateTimeConnection', 'ses_DateTimeDisconnection'],
'Notifications': ['DateTimeCreated', 'DateTimePushed'],
'Online_History': ['Scan_Date'],
'Plugins_Objects': ['DateTimeCreated', 'DateTimeChanged'],
'Plugins_Events': ['DateTimeCreated', 'DateTimeChanged'],
'Plugins_History': ['DateTimeCreated', 'DateTimeChanged'],
'AppEvents': ['DateTimeCreated'],
}
for table, columns in timestamp_columns.items():
try:
# Check if table exists
sql.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'")
if not sql.fetchone():
mylog("debug", f"[db_upgrade] Table '{table}' does not exist - skipping")
continue
for column in columns:
try:
# Update non-NULL timestamps
if offset_hours > 0:
# Convert local to UTC (subtract offset)
sql.execute(f"""
UPDATE {table}
SET {column} = DATETIME({column}, '-{int(offset_hours)} hours', '-{int((offset_hours % 1) * 60)} minutes')
WHERE {column} IS NOT NULL
""")
elif offset_hours < 0:
# Convert local to UTC (add offset absolute value)
abs_hours = abs(int(offset_hours))
abs_mins = int((abs(offset_hours) % 1) * 60)
sql.execute(f"""
UPDATE {table}
SET {column} = DATETIME({column}, '+{abs_hours} hours', '+{abs_mins} minutes')
WHERE {column} IS NOT NULL
""")
row_count = sql.rowcount
if row_count > 0:
mylog("verbose", f"[db_upgrade] Migrated {row_count} timestamps in {table}.{column}")
except Exception as e:
mylog("warn", f"[db_upgrade] Error updating {table}.{column}: {e}")
continue
except Exception as e:
mylog("warn", f"[db_upgrade] Error processing table {table}: {e}")
continue
mylog("none", "[db_upgrade] ✓ UTC timestamp migration completed successfully")
return True
except Exception as e:
mylog("none", f"[db_upgrade] ERROR during timestamp migration: {e}")
return False

View File

@@ -12,7 +12,7 @@ import uuid
import conf
from const import fullConfPath, fullConfFolder, default_tz
from helper import getBuildTimeStampAndVersion, collect_lang_strings, updateSubnets, generate_random_string
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from logger import mylog
from api import update_api
@@ -419,7 +419,7 @@ def importConfigs(pm, db, all_plugins):
# TODO cleanup later ----------------------------------------------------------------------------------
# init all time values as we have timezone - all this shoudl be moved into plugin/plugin settings
conf.time_started = datetime.datetime.now(conf.tz)
conf.time_started = timeNowUTC(as_string=False)
conf.plugins_once_run = False
# timestamps of last execution times
@@ -645,7 +645,7 @@ def importConfigs(pm, db, all_plugins):
if run_val == "schedule":
newSchedule = Cron(run_sch).schedule(
start_date=datetime.datetime.now(conf.tz)
start_date=timeNowUTC(as_string=False)
)
conf.mySchedules.append(
schedule_class(
@@ -682,7 +682,7 @@ def importConfigs(pm, db, all_plugins):
Check out new features and what has changed in the \
<a href="https://github.com/jokob-sk/NetAlertX/releases" target="_blank">📓 release notes</a>.""",
'interrupt',
timeNowDB()
timeNowUTC()
)
# -----------------
@@ -721,7 +721,7 @@ def importConfigs(pm, db, all_plugins):
mylog('minimal', msg)
# front end app log loggging
write_notification(msg, 'info', timeNowDB())
write_notification(msg, 'info', timeNowUTC())
return pm, all_plugins, True
@@ -770,7 +770,7 @@ def renameSettings(config_file):
# If the file contains old settings, proceed with renaming and backup
if contains_old_settings:
# Create a backup file with the suffix "_old_setting_names" and timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
timestamp = timeNowUTC(as_string=False).strftime("%Y%m%d%H%M%S")
backup_file = f"{config_file}_old_setting_names_{timestamp}.bak"
mylog("debug", f"[Config] Old setting names will be replaced and a backup ({backup_file}) of the config created.",)

View File

@@ -9,7 +9,7 @@ import logging
# NetAlertX imports
import conf
from const import logPath
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowUTC
DEFAULT_LEVEL = "none"
@@ -124,7 +124,7 @@ def start_log_writer_thread():
# -------------------------------------------------------------------------------
def file_print(*args):
result = timeNowTZ().strftime("%H:%M:%S") + " "
result = timeNowUTC(as_string=False).strftime("%H:%M:%S") + " "
for arg in args:
if isinstance(arg, list):
arg = " ".join(

View File

@@ -13,7 +13,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
from const import apiPath # noqa: E402 [flake8 lint suppression]
from logger import mylog # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.sse_broadcast import broadcast_unread_notifications_count # noqa: E402 [flake8 lint suppression]
@@ -64,7 +64,7 @@ def write_notification(content, level="alert", timestamp=None):
None
"""
if timestamp is None:
timestamp = timeNowDB()
timestamp = timeNowUTC()
notification = {
"timestamp": str(timestamp),

View File

@@ -18,7 +18,7 @@ from db.authoritative_handler import (
unlock_fields
)
from helper import is_random_mac, get_setting_value
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
class DeviceInstance:
@@ -407,7 +407,7 @@ class DeviceInstance:
def getDeviceData(self, mac, period=""):
"""Fetch device info with children, event stats, and presence calculation."""
now = timeNowDB()
now = timeNowUTC()
# Special case for new device
if mac.lower() == "new":
@@ -639,8 +639,8 @@ class DeviceInstance:
data.get("devSkipRepeated") or 0,
data.get("devIsNew") or 0,
data.get("devIsArchived") or 0,
data.get("devLastConnection") or timeNowDB(),
data.get("devFirstConnection") or timeNowDB(),
data.get("devLastConnection") or timeNowUTC(),
data.get("devFirstConnection") or timeNowUTC(),
data.get("devLastIP") or "",
data.get("devGUID") or "",
data.get("devCustomProps") or "",

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
from logger import mylog
from database import get_temp_db_connection
from db.db_helper import row_to_json, get_date_from_period
from utils.datetime_utils import ensure_datetime
from utils.datetime_utils import ensure_datetime, timeNowUTC
# -------------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class EventInstance:
# Get events in the last 24h
def get_recent(self):
since = datetime.now() - timedelta(hours=24)
since = timeNowUTC(as_string=False) - timedelta(hours=24)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
@@ -59,7 +59,7 @@ class EventInstance:
mylog("warn", f"[Events] get_by_hours({hours}) -> invalid value")
return []
since = datetime.now() - timedelta(hours=hours)
since = timeNowUTC(as_string=False) - timedelta(hours=hours)
conn = self._conn()
rows = conn.execute("""
SELECT * FROM Events
@@ -93,14 +93,14 @@ class EventInstance:
eve_EventType, eve_AdditionalInfo,
eve_PendingAlertEmail, eve_PairEventRowid
) VALUES (?,?,?,?,?,?,?)
""", (mac, ip, datetime.now(), eventType, info,
""", (mac, ip, timeNowUTC(), eventType, info,
1 if pendingAlert else 0, pairRow))
conn.commit()
conn.close()
# Delete old events
def delete_older_than(self, days: int):
cutoff = datetime.now() - timedelta(days=days)
cutoff = timeNowUTC(as_string=False) - timedelta(days=days)
conn = self._conn()
result = conn.execute("DELETE FROM Events WHERE eve_DateTime < ?", (cutoff,))
conn.commit()

View File

@@ -16,7 +16,7 @@ from helper import (
getBuildTimeStampAndVersion,
)
from messaging.in_app import write_notification
from utils.datetime_utils import timeNowDB, get_timezone_offset
from utils.datetime_utils import timeNowUTC, get_timezone_offset
# -----------------------------------------------------------------------------
@@ -68,7 +68,7 @@ class NotificationInstance:
self.HasNotifications = True
self.GUID = str(uuid.uuid4())
self.DateTimeCreated = timeNowDB()
self.DateTimeCreated = timeNowUTC()
self.DateTimePushed = ""
self.Status = "new"
self.JSON = JSON
@@ -107,7 +107,7 @@ class NotificationInstance:
mail_html = mail_html.replace("NEW_VERSION", newVersionText)
# Report "REPORT_DATE" in Header & footer
timeFormated = timeNowDB()
timeFormated = timeNowUTC()
mail_text = mail_text.replace("REPORT_DATE", timeFormated)
mail_html = mail_html.replace("REPORT_DATE", timeFormated)
@@ -208,7 +208,7 @@ class NotificationInstance:
# Updates the Published properties
def updatePublishedVia(self, newPublishedVia):
self.PublishedVia = newPublishedVia
self.DateTimePushed = timeNowDB()
self.DateTimePushed = timeNowUTC()
self.upsert()
# create or update a notification
@@ -274,7 +274,7 @@ class NotificationInstance:
SELECT eve_MAC FROM Events
WHERE eve_PendingAlertEmail = 1
)
""", (timeNowDB(),))
""", (timeNowUTC(),))
self.db.sql.execute("""
UPDATE Events SET eve_PendingAlertEmail = 0

View File

@@ -3,7 +3,7 @@ import uuid
from const import logPath
from logger import mylog
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
class UserEventsQueueInstance:
@@ -90,7 +90,7 @@ class UserEventsQueueInstance:
success - True if the event was successfully added.
message - Log message describing the result.
"""
timestamp = timeNowDB()
timestamp = timeNowUTC()
# Generate GUID
guid = str(uuid.uuid4())

View File

@@ -11,7 +11,7 @@ import conf
from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
from logger import mylog, Logger
from helper import get_file_content, get_setting, get_setting_value
from utils.datetime_utils import timeNowTZ, timeNowDB
from utils.datetime_utils import timeNowUTC
from app_state import updateState
from api import update_api
from utils.plugin_utils import (
@@ -113,7 +113,7 @@ class plugin_manager:
schd = self._cache["schedules"].get(prefix)
if schd:
# note the last time the scheduled plugin run was executed
schd.last_run = timeNowTZ()
schd.last_run = timeNowUTC(as_string=False)
# ===============================================================================
# Handling of user initialized front-end events
@@ -166,14 +166,14 @@ class plugin_manager:
if len(executed_events) > 0 and executed_events:
executed_events_message = ', '.join(executed_events)
mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message])
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowDB())
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowUTC())
return
# -------------------------------------------------------------------------------
def handle_run(self, runType):
mylog('minimal', ['[', timeNowDB(), '] START Run: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] START Run: ', runType])
# run the plugin
for plugin in self.all_plugins:
@@ -190,15 +190,13 @@ class plugin_manager:
pluginsStates={pluginName: current_plugin_state.get(pluginName, {})}
)
mylog('minimal', ['[', timeNowDB(), '] END Run: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] END Run: ', runType])
return
# -------------------------------------------------------------------------------
def handle_test(self, runType):
mylog("minimal", ["[", timeNowTZ(), "] [Test] START Test: ", runType])
mylog('minimal', ['[', timeNowDB(), '] [Test] START Test: ', runType])
mylog('minimal', ['[', timeNowUTC(), '] [Test] START Test: ', runType])
# Prepare test samples
sample_json = json.loads(
@@ -235,7 +233,7 @@ class plugin_manager:
"""
sql = self.db.sql
plugin_states = {}
now_str = timeNowDB()
now_str = timeNowUTC()
if plugin_name: # Only compute for single plugin
sql.execute(
@@ -799,7 +797,7 @@ def process_plugin_events(db, plugin, plugEventsArr):
if isMissing:
# if wasn't missing before, mark as changed
if tmpObj.status != "missing-in-last-scan":
tmpObj.changed = timeNowDB()
tmpObj.changed = timeNowUTC()
tmpObj.status = "missing-in-last-scan"
# mylog('debug', [f'[Plugins] Missing from last scan (PrimaryID | SecondaryID): {tmpObj.primaryId} | {tmpObj.secondaryId}'])

View File

@@ -3,7 +3,7 @@ import os
import re
import ipaddress
from helper import get_setting_value, check_IP_format
from utils.datetime_utils import timeNowDB, normalizeTimeStamp
from utils.datetime_utils import timeNowUTC, normalizeTimeStamp
from logger import mylog, Logger
from const import vendorsPath, vendorsPathNewest, sql_generateGuid, NULL_EQUIVALENTS
from models.device_instance import DeviceInstance
@@ -227,7 +227,7 @@ def update_devLastConnection_from_CurrentScan(db):
Update devLastConnection to current time for all devices seen in CurrentScan.
"""
sql = db.sql
startTime = timeNowDB()
startTime = timeNowUTC()
mylog("debug", f"[Update Devices] - Updating devLastConnection to {startTime}")
sql.execute(f"""
@@ -600,7 +600,7 @@ def print_scan_stats(db):
# -------------------------------------------------------------------------------
def create_new_devices(db):
sql = db.sql # TO-DO
startTime = timeNowDB()
startTime = timeNowUTC()
# Insert events for new devices from CurrentScan (not yet in Devices)
@@ -1109,7 +1109,7 @@ def update_devices_names(pm):
# --- Step 3: Log last checked time ---
# After resolving names, update last checked
pm.plugin_checks = {"DIGSCAN": timeNowDB(), "AVAHISCAN": timeNowDB(), "NSLOOKUP": timeNowDB(), "NBTSCAN": timeNowDB()}
pm.plugin_checks = {"DIGSCAN": timeNowUTC(), "AVAHISCAN": timeNowUTC(), "NSLOOKUP": timeNowUTC(), "NBTSCAN": timeNowUTC()}
# -------------------------------------------------------------------------------

View File

@@ -14,11 +14,11 @@ from scan.device_handling import (
)
from helper import get_setting_value
from db.db_helper import print_table_schema
from utils.datetime_utils import timeNowDB
from utils.datetime_utils import timeNowUTC
from logger import mylog, Logger
from messaging.reporting import skip_repeated_notifications
from messaging.in_app import update_unread_notifications_count
from const import NULL_EQUIVALENTS, NULL_EQUIVALENTS_SQL
from const import NULL_EQUIVALENTS_SQL
# Make sure log level is initialized correctly
@@ -167,7 +167,7 @@ def create_sessions_snapshot(db):
# -------------------------------------------------------------------------------
def insert_events(db):
sql = db.sql # TO-DO
startTime = timeNowDB()
startTime = timeNowUTC()
# Check device down
mylog("debug", "[Events] - 1 - Devices down")
@@ -234,7 +234,7 @@ def insert_events(db):
def insertOnlineHistory(db):
sql = db.sql # TO-DO: Implement sql object
scanTimestamp = timeNowDB()
scanTimestamp = timeNowUTC()
# Query to fetch all relevant device counts in one go
query = """

View File

@@ -3,7 +3,7 @@
import datetime
from logger import mylog
import conf
from utils.datetime_utils import timeNowUTC
# -------------------------------------------------------------------------------
@@ -28,11 +28,11 @@ class schedule_class:
# Initialize the last run time if never run before
if self.last_run == 0:
self.last_run = (
datetime.datetime.now(conf.tz) - datetime.timedelta(days=365)
timeNowUTC(as_string=False) - datetime.timedelta(days=365)
).replace(microsecond=0)
# get the current time with the currently specified timezone
nowTime = datetime.datetime.now(conf.tz).replace(microsecond=0)
nowTime = timeNowUTC(as_string=False)
# Run the schedule if the current time is past the schedule time we saved last time and
# (maybe the following check is unnecessary)

View File

@@ -20,45 +20,41 @@ DATETIME_PATTERN = "%Y-%m-%d %H:%M:%S"
DATETIME_REGEX = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$')
def timeNowTZ():
if conf.tz:
return datetime.datetime.now(conf.tz).replace(microsecond=0)
else:
return datetime.datetime.now().replace(microsecond=0)
# ⚠️ CRITICAL: ALL database timestamps MUST be stored in UTC
# This is the SINGLE SOURCE OF TRUTH for current time in NetAlertX
# Use timeNowUTC() for DB writes (returns UTC string by default)
# Use timeNowUTC(as_string=False) for datetime operations (scheduling, comparisons, logging)
def timeNowUTC(as_string=True):
"""
Return the current time in UTC.
This is the ONLY function that calls datetime.datetime.now() in the entire codebase.
All timestamps stored in the database MUST use UTC format.
def timeNow():
return datetime.datetime.now().replace(microsecond=0)
Args:
as_string (bool): If True, returns formatted string for DB storage.
If False, returns datetime object for operations.
Returns:
str: UTC timestamp as 'YYYY-MM-DD HH:MM:SS' when as_string=True
datetime.datetime: UTC datetime object when as_string=False
Examples:
timeNowUTC() → '2025-11-04 07:09:11' (for DB writes)
timeNowUTC(as_string=False) → datetime.datetime(2025, 11, 4, 7, 9, 11, tzinfo=UTC)
"""
utc_now = datetime.datetime.now(datetime.UTC).replace(microsecond=0)
return utc_now.strftime(DATETIME_PATTERN) if as_string else utc_now
def get_timezone_offset():
now = datetime.datetime.now(conf.tz)
offset_hours = now.utcoffset().total_seconds() / 3600
offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60))
return offset_formatted
def timeNowDB(local=True):
"""
Return the current time (local or UTC) as ISO 8601 for DB storage.
Safe for SQLite, PostgreSQL, etc.
Example local: '2025-11-04 18:09:11'
Example UTC: '2025-11-04 07:09:11'
"""
if local:
try:
if isinstance(conf.tz, datetime.tzinfo):
tz = conf.tz
elif conf.tz:
tz = ZoneInfo(conf.tz)
else:
tz = None
except Exception:
tz = None
return datetime.datetime.now(tz).strftime(DATETIME_PATTERN)
if conf.tz:
now = timeNowUTC(as_string=False).astimezone(conf.tz)
offset_hours = now.utcoffset().total_seconds() / 3600
else:
return datetime.datetime.now(datetime.UTC).strftime(DATETIME_PATTERN)
offset_hours = 0
offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60))
return offset_formatted
# -------------------------------------------------------------------------------
@@ -113,7 +109,10 @@ def normalizeTimeStamp(inputTimeStamp):
# -------------------------------------------------------------------------------------------
def format_date_iso(date_val: str) -> Optional[str]:
"""Ensures a date string from DB is returned as a proper ISO string with TZ."""
"""Ensures a date string from DB is returned as a proper ISO string with TZ.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
"""
if not date_val:
return None
@@ -125,10 +124,14 @@ def format_date_iso(date_val: str) -> Optional[str]:
else:
dt = date_val
# 2. If it has no timezone, ATTACH (don't convert) your config TZ
# 2. If it has no timezone, assume it's UTC (our DB storage format)
# then CONVERT to user's configured timezone
if dt.tzinfo is None:
# Mark as UTC first
dt = dt.replace(tzinfo=datetime.UTC)
# Convert to user's timezone
target_tz = conf.tz if isinstance(conf.tz, datetime.tzinfo) else ZoneInfo(conf.tz)
dt = dt.replace(tzinfo=target_tz)
dt = dt.astimezone(target_tz)
# 3. Return the string. .isoformat() will now include the +11:00 or +10:00
return dt.isoformat()
@@ -151,7 +154,7 @@ def format_event_date(date_str: str, event_type: str) -> str:
# -------------------------------------------------------------------------------------------
def ensure_datetime(dt: Union[str, datetime.datetime, None]) -> datetime.datetime:
if dt is None:
return timeNowTZ()
return timeNowUTC(as_string=False)
if isinstance(dt, str):
return datetime.datetime.fromisoformat(dt)
return dt
@@ -172,6 +175,10 @@ def parse_datetime(dt_str):
def format_date(date_str: str) -> str:
"""Format a date string from DB for display.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
"""
try:
if not date_str:
return ""
@@ -179,25 +186,21 @@ def format_date(date_str: str) -> str:
date_str = re.sub(r"\s+", " ", str(date_str).strip())
dt = parse_datetime(date_str)
if dt.tzinfo is None:
if isinstance(conf.tz, str):
dt = dt.replace(tzinfo=ZoneInfo(conf.tz))
else:
dt = dt.replace(tzinfo=conf.tz)
if not dt:
return f"invalid:{repr(date_str)}"
# If the DB has no timezone, we tell Python what it IS,
# we don't CONVERT it.
# If the DB timestamp has no timezone, assume it's UTC (our storage format)
# then CONVERT to user's configured timezone
if dt.tzinfo is None:
# Option A: If the DB time is already AEDT, use AEDT.
# Option B: Use conf.tz if that is your 'source of truth'
dt = dt.replace(tzinfo=conf.tz)
# Mark as UTC first
dt = dt.replace(tzinfo=datetime.UTC)
# Convert to user's timezone
if isinstance(conf.tz, str):
dt = dt.astimezone(ZoneInfo(conf.tz))
else:
dt = dt.astimezone(conf.tz)
# IMPORTANT: Return the ISO format of the object AS IS.
# Calling .astimezone() here triggers a conversion to the
# System Local Time , which is causing your shift.
# Return ISO format with timezone offset
return dt.isoformat()
except Exception as e:
@@ -207,7 +210,7 @@ def format_date(date_str: str) -> str:
def format_date_diff(date1, date2, tz_name):
"""
Return difference between two datetimes as 'Xd HH:MM'.
Uses app timezone if datetime is naive.
Assumes DB timestamps are stored in UTC and converts them to user's configured timezone.
date2 can be None (uses now).
"""
# Get timezone from settings
@@ -215,20 +218,22 @@ def format_date_diff(date1, date2, tz_name):
def parse_dt(dt):
if dt is None:
return datetime.datetime.now(tz)
# Get current UTC time and convert to user's timezone
return timeNowUTC(as_string=False).astimezone(tz)
if isinstance(dt, str):
try:
dt_parsed = email.utils.parsedate_to_datetime(dt)
except (ValueError, TypeError):
# fallback: parse ISO string
dt_parsed = datetime.datetime.fromisoformat(dt)
# convert naive GMT/UTC to app timezone
# If naive (no timezone), assume it's UTC from DB, then convert to user's timezone
if dt_parsed.tzinfo is None:
dt_parsed = tz.localize(dt_parsed)
dt_parsed = dt_parsed.replace(tzinfo=datetime.UTC).astimezone(tz)
else:
dt_parsed = dt_parsed.astimezone(tz)
return dt_parsed
return dt if dt.tzinfo else tz.localize(dt)
# If datetime object without timezone, assume it's UTC from DB
return dt.astimezone(tz) if dt.tzinfo else dt.replace(tzinfo=datetime.UTC).astimezone(tz)
dt1 = parse_dt(date1)
dt2 = parse_dt(date2)

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -43,7 +43,7 @@ def b64(sql: str) -> str:
# -----------------------------
def test_dbquery_create_device(client, api_token, test_mac):
now = timeNowDB()
now = timeNowUTC()
sql = f"""
INSERT INTO Devices (devMac, devName, devVendor, devOwner, devFirstConnection, devLastConnection, devLastIP)

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowTZ # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -38,7 +38,7 @@ def create_event(client, api_token, mac, event="UnitTest Event", days_old=None):
# Calculate the event_time if days_old is given
if days_old is not None:
event_time = timeNowTZ() - timedelta(days=days_old)
event_time = timeNowUTC(as_string=False) - timedelta(days=days_old)
# ISO 8601 string
payload["event_time"] = event_time.isoformat()
@@ -140,7 +140,7 @@ def test_delete_events_dynamic_days(client, api_token, test_mac):
# Count pre-existing events younger than 30 days for test_mac
# These will remain after delete operation
from datetime import datetime
thirty_days_ago = timeNowTZ() - timedelta(days=30)
thirty_days_ago = timeNowUTC(as_string=False) - timedelta(days=30)
initial_younger_count = 0
for ev in initial_events:
if ev.get("eve_MAC") == test_mac and ev.get("eve_DateTime"):

View File

@@ -8,7 +8,7 @@ INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowTZ, timeNowDB # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from api_server.api_server_start import app # noqa: E402 [flake8 lint suppression]
@@ -50,7 +50,7 @@ def test_create_session(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB(),
"start_time": timeNowUTC(),
"event_type_conn": "Connected",
"event_type_disc": "Disconnected"
}
@@ -65,7 +65,7 @@ def test_list_sessions(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
@@ -82,7 +82,7 @@ def test_device_sessions_by_period(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.200",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
resp_create = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
assert resp_create.status_code == 200
@@ -117,7 +117,7 @@ def test_device_session_events(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.250",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
resp_create = client.post(
"/sessions/create",
@@ -166,7 +166,7 @@ def test_delete_session(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowDB()
"start_time": timeNowUTC()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
@@ -188,7 +188,7 @@ def test_get_sessions_calendar(client, api_token, test_mac):
Cleans up test sessions after test.
"""
# --- Setup: create two sessions for the test MAC ---
now = timeNowTZ()
now = timeNowUTC(as_string=False)
start1 = (now - timedelta(days=2)).isoformat(timespec="seconds")
end1 = (now - timedelta(days=1, hours=20)).isoformat(timespec="seconds")

View File

@@ -0,0 +1,238 @@
"""
Unit tests for database timestamp migration to UTC.
Tests verify that:
- Migration detects version correctly from Settings table
- Fresh installs skip migration (empty VERSION)
- Upgrades from v26.2.6+ skip migration (already UTC)
- Upgrades from <v26.2.6 run migration (convert local→UTC)
- Migration handles timezone offset calculations correctly
- Migration is idempotent (safe to run multiple times)
"""
import sys
import os
import pytest
import sqlite3
import tempfile
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from db.db_upgrade import migrate_timestamps_to_utc, is_timestamps_in_utc # noqa: E402
from utils.datetime_utils import timeNowUTC # noqa: E402
@pytest.fixture
def temp_db():
"""Create a temporary database for testing"""
fd, db_path = tempfile.mkstemp(suffix='.db')
os.close(fd)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create Settings table
cursor.execute("""
CREATE TABLE Settings (
setKey TEXT PRIMARY KEY,
setValue TEXT
)
""")
# Create Devices table with timestamp columns
cursor.execute("""
CREATE TABLE Devices (
devMac TEXT PRIMARY KEY,
devFirstConnection TEXT,
devLastConnection TEXT,
devLastNotification TEXT
)
""")
conn.commit()
yield cursor, conn
conn.close()
os.unlink(db_path)
class TestTimestampMigration:
"""Test suite for UTC timestamp migration"""
def test_migrate_fresh_install_skips_migration(self, temp_db):
"""Test that fresh install with empty VERSION skips migration"""
cursor, conn = temp_db
# Empty Settings table (fresh install)
result = migrate_timestamps_to_utc(cursor)
assert result is True
# Should return without error
def test_migrate_unknown_version_skips_migration(self, temp_db):
"""Test that 'unknown' VERSION skips migration"""
cursor, conn = temp_db
# Insert 'unknown' VERSION
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'unknown')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_26_2_6_skips_migration(self, temp_db):
"""Test that v26.2.6 skips migration (already UTC)"""
cursor, conn = temp_db
# Insert VERSION v26.2.6
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.2.6')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_27_0_0_skips_migration(self, temp_db):
"""Test that v27.0.0 skips migration (newer version)"""
cursor, conn = temp_db
# Insert VERSION v27.0.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '27.0.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_26_3_0_skips_migration(self, temp_db):
"""Test that v26.3.0 skips migration (newer minor version)"""
cursor, conn = temp_db
# Insert VERSION v26.3.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.3.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_old_version_triggers_migration(self, temp_db):
"""Test that v25.x.x triggers migration"""
cursor, conn = temp_db
# Insert VERSION v25.1.0
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '25.1.0')")
# Insert a sample device with timestamp
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection, devLastConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?, ?)
""", (now_str, now_str))
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_version_with_v_prefix(self, temp_db):
"""Test that version string with 'v' prefix is parsed correctly"""
cursor, conn = temp_db
# Insert VERSION with 'v' prefix
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'v26.2.6')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_malformed_version_uses_fallback(self, temp_db):
"""Test that malformed version string uses timestamp detection fallback"""
cursor, conn = temp_db
# Insert malformed VERSION
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', 'invalid.version')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
# Should not crash, should use fallback detection
assert result is True
def test_migrate_version_26_2_5_triggers_migration(self, temp_db):
"""Test that v26.2.5 (one patch before UTC) triggers migration"""
cursor, conn = temp_db
# Insert VERSION v26.2.5
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '26.2.5')")
# Insert sample device
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (now_str,))
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_migrate_does_not_crash_on_empty_devices_table(self, temp_db):
"""Test that migration handles empty Devices table gracefully"""
cursor, conn = temp_db
# Insert old VERSION but no devices
cursor.execute("INSERT INTO Settings (setKey, setValue) VALUES ('VERSION', '25.1.0')")
conn.commit()
result = migrate_timestamps_to_utc(cursor)
assert result is True
def test_is_timestamps_in_utc_returns_true_for_empty_table(self, temp_db):
"""Test that is_timestamps_in_utc returns True for empty Devices table"""
cursor, conn = temp_db
result = is_timestamps_in_utc(cursor)
assert result is True
def test_is_timestamps_in_utc_detects_utc_timestamps(self, temp_db):
"""Test that is_timestamps_in_utc correctly identifies UTC timestamps"""
cursor, conn = temp_db
# Insert devices with UTC timestamps
now_str = timeNowUTC()
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (now_str,))
conn.commit()
result = is_timestamps_in_utc(cursor)
# Should return False for naive timestamps (no timezone marker)
# This is expected behavior - naive timestamps need migration check
assert result is False
def test_is_timestamps_in_utc_detects_timezone_markers(self, temp_db):
"""Test that is_timestamps_in_utc detects timestamps with timezone info"""
cursor, conn = temp_db
# Insert device with timezone marker
timestamp_with_tz = "2026-02-11 11:37:02+00:00"
cursor.execute("""
INSERT INTO Devices (devMac, devFirstConnection)
VALUES ('aa:bb:cc:dd:ee:ff', ?)
""", (timestamp_with_tz,))
conn.commit()
result = is_timestamps_in_utc(cursor)
# Should detect timezone marker
assert result is True

View File

@@ -0,0 +1,106 @@
"""
Unit tests for datetime_utils.py UTC timestamp functions.
Tests verify that:
- timeNowUTC() returns correct formats (string and datetime object)
- All timestamps are in UTC timezone
- No other functions call datetime.datetime.now() (single source of truth)
"""
import sys
import os
import datetime
import pytest
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from utils.datetime_utils import timeNowUTC, DATETIME_PATTERN # noqa: E402
class TestTimeNowUTC:
"""Test suite for timeNowUTC() function"""
def test_timeNowUTC_returns_string_by_default(self):
"""Test that timeNowUTC() returns a string by default"""
result = timeNowUTC()
assert isinstance(result, str)
assert len(result) == 19 # 'YYYY-MM-DD HH:MM:SS' format
def test_timeNowUTC_string_format(self):
"""Test that timeNowUTC() returns correct string format"""
result = timeNowUTC()
# Verify format matches DATETIME_PATTERN
try:
datetime.datetime.strptime(result, DATETIME_PATTERN)
except ValueError:
pytest.fail(f"timeNowUTC() returned invalid format: {result}")
def test_timeNowUTC_returns_datetime_object_when_false(self):
"""Test that timeNowUTC(as_string=False) returns datetime object"""
result = timeNowUTC(as_string=False)
assert isinstance(result, datetime.datetime)
def test_timeNowUTC_datetime_has_UTC_timezone(self):
"""Test that datetime object has UTC timezone"""
result = timeNowUTC(as_string=False)
assert result.tzinfo is datetime.UTC
def test_timeNowUTC_datetime_no_microseconds(self):
"""Test that datetime object has microseconds set to 0"""
result = timeNowUTC(as_string=False)
assert result.microsecond == 0
def test_timeNowUTC_consistency_between_modes(self):
"""Test that string and datetime modes return consistent values"""
dt_obj = timeNowUTC(as_string=False)
str_result = timeNowUTC(as_string=True)
# Convert datetime to string and compare (within 1 second tolerance)
dt_str = dt_obj.strftime(DATETIME_PATTERN)
# Parse both to compare timestamps
t1 = datetime.datetime.strptime(dt_str, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(str_result, DATETIME_PATTERN)
diff = abs((t1 - t2).total_seconds())
assert diff <= 1 # Allow 1 second difference
def test_timeNowUTC_is_actually_UTC(self):
"""Test that timeNowUTC() returns actual UTC time, not local time"""
utc_now = datetime.datetime.now(datetime.UTC).replace(microsecond=0)
result = timeNowUTC(as_string=False)
# Should be within 1 second
diff = abs((utc_now - result).total_seconds())
assert diff <= 1
def test_timeNowUTC_string_matches_datetime_conversion(self):
"""Test that string result matches datetime object conversion"""
dt_obj = timeNowUTC(as_string=False)
str_result = timeNowUTC(as_string=True)
# Convert datetime to string using same format
expected = dt_obj.strftime(DATETIME_PATTERN)
# Should be same or within 1 second
t1 = datetime.datetime.strptime(expected, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(str_result, DATETIME_PATTERN)
diff = abs((t1 - t2).total_seconds())
assert diff <= 1
def test_timeNowUTC_explicit_true_parameter(self):
"""Test that timeNowUTC(as_string=True) explicitly returns string"""
result = timeNowUTC(as_string=True)
assert isinstance(result, str)
def test_timeNowUTC_multiple_calls_increase(self):
"""Test that subsequent calls return increasing timestamps"""
import time
t1_str = timeNowUTC()
time.sleep(0.1)
t2_str = timeNowUTC()
t1 = datetime.datetime.strptime(t1_str, DATETIME_PATTERN)
t2 = datetime.datetime.strptime(t2_str, DATETIME_PATTERN)
assert t2 >= t1