From 8003a12dfc0dcd8dba0fee17fa4fd7efe77810ae Mon Sep 17 00:00:00 2001 From: Admin9705 <9705@duck.com> Date: Sat, 10 May 2025 09:53:28 -0400 Subject: [PATCH] feat: implement hourly API rate limiting with automatic cap reset scheduler --- src/primary/background.py | 103 +++++++++++- src/primary/hourly_cap_scheduler.py | 149 +++++++++++++++++ src/primary/stats_manager.py | 242 +++++++++++++++++++++++++++- 3 files changed, 483 insertions(+), 11 deletions(-) create mode 100644 src/primary/hourly_cap_scheduler.py diff --git a/src/primary/background.py b/src/primary/background.py index 689771c0..8bd1c064 100644 --- a/src/primary/background.py +++ b/src/primary/background.py @@ -13,6 +13,8 @@ import importlib import logging import threading from typing import Dict, List, Optional, Callable, Union, Tuple +import datetime +import traceback # Define the version number __version__ = "1.0.0" # Consider updating this based on changes @@ -27,10 +29,13 @@ from src.primary import config, settings_manager from src.primary.state import check_state_reset, calculate_reset_time # from src.primary.utils.app_utils import get_ip_address # No longer used here -# Track active threads and stop flag +# Global state for managing app threads and their status app_threads: Dict[str, threading.Thread] = {} stop_event = threading.Event() # Use an event for clearer stop signaling +# Hourly cap scheduler thread +hourly_cap_scheduler_thread = None + def app_specific_loop(app_type: str) -> None: """ Main processing loop for a specific Arr application. @@ -502,22 +507,106 @@ def shutdown_handler(signum, frame): def shutdown_threads(): """Wait for all threads to finish.""" - logger.info("Waiting for app threads to finish...") - active_thread_list = list(app_threads.values()) - for thread in active_thread_list: - thread.join(timeout=15) # Wait up to 15 seconds per thread + logger.info("Waiting for all app threads to stop...") + + # Stop the hourly API cap scheduler + global hourly_cap_scheduler_thread + if hourly_cap_scheduler_thread and hourly_cap_scheduler_thread.is_alive(): + # The thread should exit naturally due to the stop_event being set + logger.info("Waiting for hourly API cap scheduler to stop...") + hourly_cap_scheduler_thread.join(timeout=5.0) + if hourly_cap_scheduler_thread.is_alive(): + logger.warning("Hourly API cap scheduler did not stop gracefully") + else: + logger.info("Hourly API cap scheduler stopped") + + # Wait for all threads to terminate + for thread in app_threads.values(): if thread.is_alive(): - logger.warning(f"Thread {thread.name} did not stop gracefully.") + thread.join(timeout=10.0) + logger.info("All app threads stopped.") +def hourly_cap_scheduler_loop(): + """Main loop for the hourly API cap scheduler thread + Checks time every 30 seconds and resets caps if needed at the top of the hour + """ + logger.info("Starting hourly API cap scheduler loop") + + try: + from src.primary.stats_manager import reset_hourly_caps + + # Initial check in case we're starting right at the top of an hour + current_time = datetime.datetime.now() + if current_time.minute == 0: + logger.info(f"Initial hourly reset triggered at {current_time.hour}:00") + reset_hourly_caps() + + # Main monitoring loop + while not stop_event.is_set(): + try: + # Sleep for 30 seconds between checks + # This ensures we won't miss the top of the hour + stop_event.wait(30) + + if stop_event.is_set(): + break + + # Check if it's the top of the hour (00 minute mark) + current_time = datetime.datetime.now() + if current_time.minute == 0: + logger.info(f"Hourly reset triggered at {current_time.hour}:00") + success = reset_hourly_caps() + if success: + logger.info(f"Successfully reset hourly API caps at {current_time.hour}:00") + else: + logger.error(f"Failed to reset hourly API caps at {current_time.hour}:00") + + except Exception as e: + logger.error(f"Error in hourly cap scheduler: {e}") + logger.error(traceback.format_exc()) + # Sleep briefly to avoid spinning in case of repeated errors + time.sleep(5) + + except Exception as e: + logger.error(f"Fatal error in hourly cap scheduler: {e}") + logger.error(traceback.format_exc()) + + logger.info("Hourly API cap scheduler stopped") + +def start_hourly_cap_scheduler(): + """Start the hourly API cap scheduler thread""" + global hourly_cap_scheduler_thread + + if hourly_cap_scheduler_thread and hourly_cap_scheduler_thread.is_alive(): + logger.info("Hourly API cap scheduler already running") + return + + # Create and start the scheduler thread + hourly_cap_scheduler_thread = threading.Thread( + target=hourly_cap_scheduler_loop, + name="HourlyCapScheduler", + daemon=True + ) + hourly_cap_scheduler_thread.start() + + logger.info(f"Hourly API cap scheduler started. Thread is alive: {hourly_cap_scheduler_thread.is_alive()}") + def start_huntarr(): """Main entry point for Huntarr background tasks.""" logger.info(f"--- Starting Huntarr Background Tasks v{__version__} --- ") - + # Perform initial settings migration if specified (e.g., via env var or arg) if os.environ.get("HUNTARR_RUN_MIGRATION", "false").lower() == "true": logger.info("Running settings migration from huntarr.json (if found)...") settings_manager.migrate_from_huntarr_json() + + # Start the hourly API cap scheduler + try: + start_hourly_cap_scheduler() + logger.info("Hourly API cap scheduler started successfully") + except Exception as e: + logger.error(f"Failed to start hourly API cap scheduler: {e}") # Log initial configuration for all known apps for app_name in settings_manager.KNOWN_APP_TYPES: # Corrected attribute name diff --git a/src/primary/hourly_cap_scheduler.py b/src/primary/hourly_cap_scheduler.py new file mode 100644 index 00000000..ed8852bc --- /dev/null +++ b/src/primary/hourly_cap_scheduler.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +"""Hourly API Cap Scheduler for Huntarr +Handles checking time and resetting hourly API caps at the top of each hour (00 minute mark) +""" + +import threading +import time +import datetime +import traceback +import logging + +# Try both import patterns to handle different module contexts +try: + # When imported from the main app + from src.primary.utils.logger import get_logger + from src.primary.stats_manager import reset_hourly_caps + logger = get_logger("hourly_caps") +except ImportError: + try: + # When imported within the package + from primary.utils.logger import get_logger + from primary.stats_manager import reset_hourly_caps + logger = get_logger("hourly_caps") + except ImportError: + # Fallback to standard logging in case neither works + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger("hourly_caps") + logger.error("Failed to import Huntarr modules, using fallback logging") + +# Print startup message to help with debugging +print("Hourly API Cap Scheduler module loaded") +logger.info("Hourly API Cap Scheduler module initialized") + +# Global variables +stop_event = threading.Event() +scheduler_thread = None + +def check_and_reset_caps(): + """ + Check if it's the top of the hour (00 minute mark) and reset hourly API caps if needed + """ + try: + current_time = datetime.datetime.now() + + # Only reset at the top of the hour (minute 0) + if current_time.minute == 0: + logger.info(f"Hourly reset triggered at {current_time.hour}:00") + try: + success = reset_hourly_caps() + if success: + logger.info(f"Successfully reset hourly API caps at {current_time.hour}:00") + else: + logger.error(f"Failed to reset hourly API caps at {current_time.hour}:00") + except Exception as e: + logger.error(f"Exception when resetting hourly caps: {e}") + logger.error(traceback.format_exc()) + else: + logger.debug(f"Not time to reset hourly caps yet. Current time: {current_time.hour}:{current_time.minute}") + except Exception as e: + logger.error(f"Unexpected error in check_and_reset_caps: {e}") + logger.error(traceback.format_exc()) + +def scheduler_loop(): + """ + Main loop for the scheduler thread + Checks time every 30 seconds and resets caps if needed + """ + try: + logger.info("Starting hourly API cap scheduler") + print("Hourly API cap scheduler thread starting") + + # Initial check on startup + check_and_reset_caps() + + while not stop_event.is_set(): + try: + # Sleep for 30 seconds between checks + # This ensures we won't miss the top of the hour + stop_event.wait(30) + + if stop_event.is_set(): + logger.info("Stop event detected, exiting scheduler loop") + break + + # Check if it's time to reset the caps + check_and_reset_caps() + + except Exception as e: + logger.error(f"Error in hourly cap scheduler loop: {e}") + logger.error(traceback.format_exc()) + # Sleep briefly to avoid spinning in case of repeated errors + time.sleep(5) + + logger.info("Hourly API cap scheduler stopped") + except Exception as e: + logger.error(f"Fatal error in scheduler_loop: {e}") + logger.error(traceback.format_exc()) + print(f"FATAL ERROR in hourly cap scheduler: {e}") + +def start_scheduler(): + """ + Start the hourly API cap scheduler thread + """ + global scheduler_thread + + try: + logger.info("Attempting to start hourly API cap scheduler") + print("Attempting to start hourly API cap scheduler") + + if scheduler_thread and scheduler_thread.is_alive(): + logger.info("Hourly API cap scheduler already running") + return + + # Reset the stop event + stop_event.clear() + + # Create and start the scheduler thread + scheduler_thread = threading.Thread(target=scheduler_loop, name="HourlyCapScheduler", daemon=True) + scheduler_thread.start() + + logger.info(f"Hourly API cap scheduler started. Thread is alive: {scheduler_thread.is_alive()}") + print(f"Hourly API cap scheduler started. Thread is alive: {scheduler_thread.is_alive()}") + return True + except Exception as e: + logger.error(f"Failed to start hourly API cap scheduler: {e}") + logger.error(traceback.format_exc()) + print(f"Failed to start hourly API cap scheduler: {e}") + return False + +def stop_scheduler(): + """ + Stop the hourly API cap scheduler thread + """ + global scheduler_thread + + if not scheduler_thread or not scheduler_thread.is_alive(): + logger.info("Hourly API cap scheduler not running") + return + + # Signal the thread to stop + stop_event.set() + + # Wait for the thread to terminate (with timeout) + scheduler_thread.join(timeout=5.0) + + if scheduler_thread.is_alive(): + logger.warning("Hourly API cap scheduler did not terminate gracefully") + else: + logger.info("Hourly API cap scheduler stopped gracefully") diff --git a/src/primary/stats_manager.py b/src/primary/stats_manager.py index 51aff12d..178a3421 100644 --- a/src/primary/stats_manager.py +++ b/src/primary/stats_manager.py @@ -2,14 +2,17 @@ """ Statistics Manager for Huntarr Handles tracking, storing, and retrieving statistics about hunted and upgraded media +and monitoring hourly API usage for rate limiting """ import os import json import time +import datetime import threading from typing import Dict, Any, Optional from src.primary.utils.logger import get_logger +from src.primary.settings_manager import get_advanced_setting logger = get_logger("stats") @@ -22,6 +25,7 @@ STATS_DIRS = [ # Lock for thread-safe operations stats_lock = threading.Lock() +hourly_lock = threading.Lock() def find_writable_stats_dir(): """Find a writable directory for stats from the list of candidates""" @@ -51,13 +55,21 @@ def find_writable_stats_dir(): # Find the best stats directory STATS_DIR = find_writable_stats_dir() STATS_FILE = os.path.join(STATS_DIR, "media_stats.json") if STATS_DIR else None +HOURLY_CAP_FILE = os.path.join(STATS_DIR, "hourly_cap.json") if STATS_DIR else None # Log the stats file location once at module load time if STATS_FILE: logger.info(f"===> Stats will be stored at: {STATS_FILE}") + logger.info(f"===> Hourly API cap tracking will be stored at: {HOURLY_CAP_FILE}") else: logger.error("===> CRITICAL: No stats file location could be determined!") +# Store the last hour we checked for resetting hourly caps +last_hour_checked = None + +# Schedule the next hourly reset check +next_reset_check = None + def ensure_stats_dir(): """Ensure the statistics directory exists""" if not STATS_DIR: @@ -117,6 +129,214 @@ def get_default_stats() -> Dict[str, Dict[str, int]]: "swaparr": {"hunted": 0, "upgraded": 0} } +def get_default_hourly_caps() -> Dict[str, Dict[str, int]]: + """Get the default hourly caps structure""" + return { + "sonarr": {"api_hits": 0}, + "radarr": {"api_hits": 0}, + "lidarr": {"api_hits": 0}, + "readarr": {"api_hits": 0}, + "whisparr": {"api_hits": 0}, + "eros": {"api_hits": 0} + } + +def load_hourly_caps() -> Dict[str, Dict[str, int]]: + """ + Load hourly API caps from the caps file + + Returns: + Dictionary containing hourly API usage for each app + """ + if not ensure_stats_dir() or not HOURLY_CAP_FILE: + logger.error("Cannot load hourly caps - no valid stats directory available") + return get_default_hourly_caps() + + default_caps = get_default_hourly_caps() + + try: + if os.path.exists(HOURLY_CAP_FILE): + logger.debug(f"Loading hourly caps from: {HOURLY_CAP_FILE}") + with open(HOURLY_CAP_FILE, 'r') as f: + caps = json.load(f) + + # Ensure all apps are in the caps + for app in default_caps: + if app not in caps: + caps[app] = default_caps[app] + + logger.debug(f"Loaded hourly caps: {caps}") + return caps + else: + logger.info(f"Hourly caps file not found at {HOURLY_CAP_FILE}, using default caps") + return default_caps + except Exception as e: + logger.error(f"Error loading hourly caps from {HOURLY_CAP_FILE}: {e}") + return default_caps + +def save_hourly_caps(caps: Dict[str, Dict[str, int]]) -> bool: + """ + Save hourly API caps to the caps file + + Args: + caps: Dictionary containing hourly API usage for each app + + Returns: + True if successful, False otherwise + """ + if not ensure_stats_dir() or not HOURLY_CAP_FILE: + logger.error("Cannot save hourly caps - no valid stats directory available") + return False + + try: + logger.debug(f"Saving hourly caps to: {HOURLY_CAP_FILE}") + # First write to a temp file, then move it to avoid partial writes + temp_file = f"{HOURLY_CAP_FILE}.tmp" + with open(temp_file, 'w') as f: + json.dump(caps, f, indent=2) + f.flush() + os.fsync(f.fileno()) + + # Move the temp file to the actual file + os.replace(temp_file, HOURLY_CAP_FILE) + + logger.debug(f"Hourly caps saved successfully: {caps}") + return True + except Exception as e: + logger.error(f"Error saving hourly caps to {HOURLY_CAP_FILE}: {e}", exc_info=True) + return False + +def reset_hourly_caps() -> bool: + """ + Reset all hourly API caps to zero at the top of the hour + + Returns: + True if successful, False otherwise + """ + logger.info("=== RESETTING HOURLY API CAPS ===") + try: + if os.path.exists(HOURLY_CAP_FILE): + os.remove(HOURLY_CAP_FILE) + logger.info(f"Deleted hourly caps file at {HOURLY_CAP_FILE} to reset all API caps") + + # Create a fresh hourly caps file + caps = get_default_hourly_caps() + save_success = save_hourly_caps(caps) + + if save_success: + logger.info("Successfully reset all hourly API caps to zero") + return True + else: + logger.error("Failed to save reset hourly caps") + return False + except Exception as e: + logger.error(f"Error resetting hourly API caps: {e}") + return False + +def check_hourly_reset(): + """ + Check if we need to reset hourly caps based on the current hour + """ + global last_hour_checked, next_reset_check + + current_time = datetime.datetime.now() + current_hour = current_time.hour + + # Skip if we've already checked this hour + if last_hour_checked == current_hour: + return + + # Only reset at the top of the hour (00 minute mark) + if current_time.minute == 0: + logger.info(f"Hour changed to {current_hour}:00, resetting hourly API caps") + reset_hourly_caps() + last_hour_checked = current_hour + +def increment_hourly_cap(app_type: str, count: int = 1) -> bool: + """ + Increment hourly API usage cap for a specific app + + Args: + app_type: The application type (sonarr, radarr, etc.) + count: The amount to increment by (default: 1) + + Returns: + True if successful, False otherwise + """ + if app_type not in ["sonarr", "radarr", "lidarr", "readarr", "whisparr", "eros"]: + logger.error(f"Invalid app_type for hourly cap: {app_type}") + return False + + # Check if we need to reset hourly caps + check_hourly_reset() + + with hourly_lock: + caps = load_hourly_caps() + prev_value = caps[app_type]["api_hits"] + caps[app_type]["api_hits"] += count + new_value = caps[app_type]["api_hits"] + + # Get the hourly cap from settings + hourly_limit = get_advanced_setting("hourly_cap", 20) + + # Log current usage vs limit + logger.info(f"*** HOURLY API INCREMENT *** {app_type} by {count}: {prev_value} -> {new_value} (limit: {hourly_limit})") + + # Warn if approaching limit + if new_value >= int(hourly_limit * 0.8) and prev_value < int(hourly_limit * 0.8): + logger.warning(f"{app_type} is approaching hourly API cap: {new_value}/{hourly_limit}") + + # Alert if exceeding limit + if new_value >= hourly_limit and prev_value < hourly_limit: + logger.error(f"{app_type} has exceeded hourly API cap: {new_value}/{hourly_limit}") + + save_success = save_hourly_caps(caps) + + if not save_success: + logger.error(f"Failed to save hourly caps after incrementing {app_type}") + return False + + return True + +def get_hourly_cap_status(app_type: str) -> Dict[str, Any]: + """ + Get current API usage status for an app + + Args: + app_type: The application type (sonarr, radarr, etc.) + + Returns: + Dictionary with usage information + """ + if app_type not in ["sonarr", "radarr", "lidarr", "readarr", "whisparr", "eros"]: + return {"error": f"Invalid app_type: {app_type}"} + + with hourly_lock: + caps = load_hourly_caps() + hourly_limit = get_advanced_setting("hourly_cap", 20) + current_usage = caps[app_type]["api_hits"] + + return { + "app": app_type, + "current_usage": current_usage, + "limit": hourly_limit, + "remaining": max(0, hourly_limit - current_usage), + "percent_used": int((current_usage / hourly_limit) * 100) if hourly_limit > 0 else 0, + "exceeded": current_usage >= hourly_limit + } + +def check_hourly_cap_exceeded(app_type: str) -> bool: + """ + Check if an app has exceeded its hourly API cap + + Args: + app_type: The application type (sonarr, radarr, etc.) + + Returns: + True if exceeded, False otherwise + """ + status = get_hourly_cap_status(app_type) + return status.get("exceeded", False) + def save_stats(stats: Dict[str, Dict[str, int]]) -> bool: """ Save statistics to the stats file @@ -170,6 +390,10 @@ def increment_stat(app_type: str, stat_type: str, count: int = 1) -> bool: logger.error(f"Invalid stat_type: {stat_type}") return False + # Also increment the hourly API cap for this app, unless it's swaparr which doesn't have an API + if app_type != "swaparr": + increment_hourly_cap(app_type, count) + with stats_lock: stats = load_stats() prev_value = stats[app_type][stat_type] @@ -233,9 +457,19 @@ def reset_stats(app_type: Optional[str] = None) -> bool: return save_stats(stats) -# Initialize stats file with find_writable_stats_dir already called during import -if STATS_DIR and not os.path.exists(STATS_FILE): - logger.info(f"Creating new stats file at: {STATS_FILE}") - save_stats(get_default_stats()) +# Initialize the files with find_writable_stats_dir already called during import +if STATS_DIR: + # Initialize stats file if it doesn't exist + if not os.path.exists(STATS_FILE): + logger.info(f"Creating new stats file at: {STATS_FILE}") + save_stats(get_default_stats()) + + # Initialize hourly caps file if it doesn't exist + if not os.path.exists(HOURLY_CAP_FILE): + logger.info(f"Creating new hourly caps file at: {HOURLY_CAP_FILE}") + save_hourly_caps(get_default_hourly_caps()) + + # Set up the initial hour check + last_hour_checked = datetime.datetime.now().hour else: logger.debug(f"Stats system initialized. Using file: {STATS_FILE}") \ No newline at end of file