next scan dsiplay DRY

This commit is contained in:
Jokob @NetAlertX
2026-03-03 12:31:50 +00:00
parent c533c2267c
commit ea77112315
5 changed files with 96 additions and 14 deletions

View File

@@ -124,16 +124,10 @@ def main():
# last time any scan or maintenance/upkeep was run
conf.last_scan_run = loop_start_time
# Compute the next scheduled run time across enabled device_scanner plugins
scanner_prefixes = {p["unique_prefix"] for p in all_plugins if p.get("plugin_type") == "device_scanner"}
scanner_next = [s.last_next_schedule for s in conf.mySchedules if s.service in scanner_prefixes]
next_scan_dt = min(scanner_next, default=None)
next_scan_time_iso = next_scan_dt.replace(microsecond=0).isoformat() if next_scan_dt else ""
# Header (also broadcasts last_scan_run + next_scan_time to frontend via SSE / app_state.json)
# Header (also broadcasts last_scan_run to frontend via SSE / app_state.json)
updateState("Process: Start",
last_scan_run=loop_start_time.replace(microsecond=0).isoformat(),
next_scan_time=next_scan_time_iso)
next_scan_time="")
# Timestamp
startTime = loop_start_time
@@ -142,6 +136,17 @@ def main():
# Check if any plugins need to run on schedule
pm.run_plugin_scripts("schedule")
# Compute the next scheduled run time AFTER schedule check (which updates last_next_schedule)
# Only device_scanner plugins have meaningful next_scan times for user display
scanner_prefixes = {p["unique_prefix"] for p in all_plugins if p.get("plugin_type") == "device_scanner"}
scanner_next = [s.last_next_schedule for s in conf.mySchedules if s.service in scanner_prefixes]
# Get the earliest next scan time across all device scanners and broadcast.
# updateState validates the value is in the future before storing/broadcasting.
if scanner_next:
next_scan_dt = min(scanner_next)
updateState(next_scan_time=next_scan_dt.replace(microsecond=0).isoformat())
# determine run/scan type based on passed time
# --------------------------------------------

View File

@@ -4,7 +4,7 @@ import json
from const import applicationPath, apiPath
from logger import mylog
from helper import checkNewVersion
from utils.datetime_utils import timeNowUTC
from utils.datetime_utils import timeNowUTC, is_datetime_future, normalizeTimeStamp
from api_server.sse_broadcast import broadcast_state_update
# Register NetAlertX directories using runtime configuration
@@ -142,7 +142,12 @@ class app_state_class:
if last_scan_run is not None:
self.last_scan_run = last_scan_run
if next_scan_time is not None:
self.next_scan_time = next_scan_time
# Guard against stale/past timestamps — only store if genuinely in the future.
# This enforces correctness regardless of which caller sets next_scan_time.
if next_scan_time == "" or is_datetime_future(normalizeTimeStamp(next_scan_time)):
self.next_scan_time = next_scan_time
else:
self.next_scan_time = ""
# check for new version every hour and if currently not running new version
if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(
timeNowUTC(as_string=False).timestamp()

View File

@@ -13,7 +13,7 @@ import conf
from const import fullConfPath, fullConfFolder, default_tz, applicationPath
from db.db_upgrade import ensure_views
from helper import getBuildTimeStampAndVersion, collect_lang_strings, updateSubnets, generate_random_string
from utils.datetime_utils import timeNowUTC
from utils.datetime_utils import timeNowUTC, ensure_future_datetime
from app_state import updateState
from logger import mylog
from api import update_api
@@ -682,9 +682,12 @@ def importConfigs(pm, db, all_plugins):
newSchedule = Cron(run_sch).schedule(
start_date=timeNowUTC(as_string=False)
)
# Get initial next schedule time, ensuring it's in the future
next_schedule_time = ensure_future_datetime(newSchedule, timeNowUTC(as_string=False))
conf.mySchedules.append(
schedule_class(
plugin["unique_prefix"], newSchedule, newSchedule.next(), False
plugin["unique_prefix"], newSchedule, next_schedule_time, False
)
)

View File

@@ -3,7 +3,7 @@
import datetime
from logger import mylog
from utils.datetime_utils import timeNowTZ
from utils.datetime_utils import timeNowTZ, ensure_future_datetime
# -------------------------------------------------------------------------------
@@ -48,6 +48,7 @@ class schedule_class:
if self.was_last_schedule_used:
self.was_last_schedule_used = False
self.last_next_schedule = self.scheduleObject.next()
# Get the next scheduled time, ensuring it's in the future
self.last_next_schedule = ensure_future_datetime(self.scheduleObject, timeNowTZ(as_string=False))
return result

View File

@@ -88,6 +88,74 @@ def get_timezone_offset():
# Date and time methods
# -------------------------------------------------------------------------------
def is_datetime_future(dt, current_threshold=None):
"""
Check if a datetime is strictly in the future.
Utility for validating that a datetime hasn't already passed.
Used after retrieving pre-computed schedule times to ensure they're still valid.
Args:
dt: datetime.datetime object to validate
current_threshold: datetime to compare against. If None, uses timeNowUTC(as_string=False)
Returns:
bool: True if dt is in the future, False otherwise
Examples:
if is_datetime_future(next_scan_dt):
broadcast_to_frontend(next_scan_dt)
"""
if dt is None:
return False
if current_threshold is None:
current_threshold = timeNowUTC(as_string=False)
return dt > current_threshold
def ensure_future_datetime(schedule_obj, current_threshold=None, max_retries=5):
"""
Ensure a schedule's next() call returns a datetime strictly in the future.
This is a defensive utility for cron/schedule libraries that should always return
future times but may have edge cases. Validates and retries if needed.
Args:
schedule_obj: A schedule object with a .next() method (e.g., from croniter/APScheduler)
current_threshold: datetime to compare against. If None, uses timeNowTZ(as_string=False)
max_retries: Maximum times to call .next() if result is not in future (default: 5)
Returns:
datetime.datetime: A guaranteed future datetime from schedule_obj.next()
Raises:
RuntimeError: If max_retries exceeded without getting a future time
Examples:
newSchedule = Cron(run_sch).schedule(start_date=timeNowUTC(as_string=False))
next_time = ensure_future_datetime(newSchedule)
"""
if current_threshold is None:
current_threshold = timeNowTZ(as_string=False)
next_time = schedule_obj.next()
retries = 0
while next_time <= current_threshold and retries < max_retries:
next_time = schedule_obj.next()
retries += 1
if next_time <= current_threshold:
raise RuntimeError(
f"[ensure_future_datetime] Failed to get future time after {max_retries} retries. "
f"Last attempt: {next_time}, Current time: {current_threshold}"
)
return next_time
def normalizeTimeStamp(inputTimeStamp):
"""
Normalize various timestamp formats into a datetime.datetime object.