mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-03-04 07:09:09 -05:00
whitespace
This commit is contained in:
@@ -140,7 +140,7 @@ def main():
|
||||
# Only device_scanner plugins have meaningful next_scan times for user display
|
||||
scanner_prefixes = {p["unique_prefix"] for p in all_plugins if p.get("plugin_type") == "device_scanner"}
|
||||
scanner_next = [s.last_next_schedule for s in conf.mySchedules if s.service in scanner_prefixes]
|
||||
|
||||
|
||||
# Get the earliest next scan time across all device scanners and broadcast.
|
||||
# updateState validates the value is in the future before storing/broadcasting.
|
||||
if scanner_next:
|
||||
|
||||
@@ -91,68 +91,68 @@ def get_timezone_offset():
|
||||
def is_datetime_future(dt, current_threshold=None):
|
||||
"""
|
||||
Check if a datetime is strictly in the future.
|
||||
|
||||
|
||||
Utility for validating that a datetime hasn't already passed.
|
||||
Used after retrieving pre-computed schedule times to ensure they're still valid.
|
||||
|
||||
|
||||
Args:
|
||||
dt: datetime.datetime object to validate
|
||||
current_threshold: datetime to compare against. If None, uses timeNowUTC(as_string=False)
|
||||
|
||||
|
||||
Returns:
|
||||
bool: True if dt is in the future, False otherwise
|
||||
|
||||
|
||||
Examples:
|
||||
if is_datetime_future(next_scan_dt):
|
||||
broadcast_to_frontend(next_scan_dt)
|
||||
"""
|
||||
if dt is None:
|
||||
return False
|
||||
|
||||
|
||||
if current_threshold is None:
|
||||
current_threshold = timeNowUTC(as_string=False)
|
||||
|
||||
|
||||
return dt > current_threshold
|
||||
|
||||
|
||||
def ensure_future_datetime(schedule_obj, current_threshold=None, max_retries=5):
|
||||
"""
|
||||
Ensure a schedule's next() call returns a datetime strictly in the future.
|
||||
|
||||
|
||||
This is a defensive utility for cron/schedule libraries that should always return
|
||||
future times but may have edge cases. Validates and retries if needed.
|
||||
|
||||
|
||||
Args:
|
||||
schedule_obj: A schedule object with a .next() method (e.g., from croniter/APScheduler)
|
||||
current_threshold: datetime to compare against. If None, uses timeNowTZ(as_string=False)
|
||||
max_retries: Maximum times to call .next() if result is not in future (default: 5)
|
||||
|
||||
|
||||
Returns:
|
||||
datetime.datetime: A guaranteed future datetime from schedule_obj.next()
|
||||
|
||||
|
||||
Raises:
|
||||
RuntimeError: If max_retries exceeded without getting a future time
|
||||
|
||||
|
||||
Examples:
|
||||
newSchedule = Cron(run_sch).schedule(start_date=timeNowUTC(as_string=False))
|
||||
next_time = ensure_future_datetime(newSchedule)
|
||||
"""
|
||||
if current_threshold is None:
|
||||
current_threshold = timeNowTZ(as_string=False)
|
||||
|
||||
|
||||
next_time = schedule_obj.next()
|
||||
retries = 0
|
||||
|
||||
|
||||
while next_time <= current_threshold and retries < max_retries:
|
||||
next_time = schedule_obj.next()
|
||||
retries += 1
|
||||
|
||||
|
||||
if next_time <= current_threshold:
|
||||
raise RuntimeError(
|
||||
f"[ensure_future_datetime] Failed to get future time after {max_retries} retries. "
|
||||
f"Last attempt: {next_time}, Current time: {current_threshold}"
|
||||
)
|
||||
|
||||
|
||||
return next_time
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user