feat: implement hourly API rate limiting with automatic cap reset scheduler

This commit is contained in:
Admin9705
2025-05-10 09:53:28 -04:00
parent 84989e0282
commit 8003a12dfc
3 changed files with 483 additions and 11 deletions

View File

@@ -13,6 +13,8 @@ import importlib
import logging
import threading
from typing import Dict, List, Optional, Callable, Union, Tuple
import datetime
import traceback
# Define the version number
__version__ = "1.0.0" # Consider updating this based on changes
@@ -27,10 +29,13 @@ from src.primary import config, settings_manager
from src.primary.state import check_state_reset, calculate_reset_time
# from src.primary.utils.app_utils import get_ip_address # No longer used here
# Track active threads and stop flag
# Global state for managing app threads and their status
app_threads: Dict[str, threading.Thread] = {}
stop_event = threading.Event() # Use an event for clearer stop signaling
# Hourly cap scheduler thread
hourly_cap_scheduler_thread = None
def app_specific_loop(app_type: str) -> None:
"""
Main processing loop for a specific Arr application.
@@ -502,22 +507,106 @@ def shutdown_handler(signum, frame):
# NOTE(review): this function is shown as part of a rendered diff with indentation
# stripped; removed and added lines appear to be interleaved (two different
# "Waiting..." log calls and two join loops) — confirm against the real file
# before relying on the statement order or nesting shown here.
def shutdown_threads():
"""Wait for all threads to finish."""
logger.info("Waiting for app threads to finish...")
# Snapshot the registered app threads, then join each with a 15 second timeout.
active_thread_list = list(app_threads.values())
for thread in active_thread_list:
thread.join(timeout=15) # Wait up to 15 seconds per thread
logger.info("Waiting for all app threads to stop...")
# Stop the hourly API cap scheduler
global hourly_cap_scheduler_thread
if hourly_cap_scheduler_thread and hourly_cap_scheduler_thread.is_alive():
# The thread should exit naturally due to the stop_event being set
# NOTE(review): stop_event is not set inside this function — presumably the
# caller (e.g. the shutdown handler) sets it first; verify.
logger.info("Waiting for hourly API cap scheduler to stop...")
# Bounded wait (5 s), then report whether the scheduler thread is still alive.
hourly_cap_scheduler_thread.join(timeout=5.0)
if hourly_cap_scheduler_thread.is_alive():
logger.warning("Hourly API cap scheduler did not stop gracefully")
else:
logger.info("Hourly API cap scheduler stopped")
# Wait for all threads to terminate
for thread in app_threads.values():
if thread.is_alive():
logger.warning(f"Thread {thread.name} did not stop gracefully.")
# NOTE(review): whether this join sits inside the is_alive() branch or directly
# in the loop body cannot be determined from this view — TODO confirm.
thread.join(timeout=10.0)
logger.info("All app threads stopped.")
def hourly_cap_scheduler_loop():
    """Run the hourly API cap scheduler until ``stop_event`` is set.

    Polls the wall clock every 30 seconds and calls ``reset_hourly_caps()``
    when the current minute is 0. Each (date, hour) slot is reset at most once
    after a successful reset, so the second poll that lands inside minute 0
    no longer wipes hits recorded since the first one. A failed reset is
    retried on the next poll that still falls in minute 0.

    All exceptions are logged and swallowed so the thread never propagates an
    error to the interpreter.
    """
    logger.info("Starting hourly API cap scheduler loop")

    try:
        # Imported lazily, as in the original, so this module can be loaded
        # even if the stats manager is unavailable at import time.
        from src.primary.stats_manager import reset_hourly_caps

        # (date, hour) of the last successful reset; None until one happens.
        last_reset_slot = None

        # Initial check in case we're starting right at the top of an hour
        current_time = datetime.datetime.now()
        if current_time.minute == 0:
            logger.info(f"Initial hourly reset triggered at {current_time.hour}:00")
            if reset_hourly_caps():
                last_reset_slot = (current_time.date(), current_time.hour)

        # Main monitoring loop
        while not stop_event.is_set():
            try:
                # Wait up to 30 seconds between checks; wait() returns True
                # as soon as the stop event is set, so shutdown is prompt.
                if stop_event.wait(30):
                    break

                # Check if it's the top of the hour (00 minute mark)
                current_time = datetime.datetime.now()
                slot = (current_time.date(), current_time.hour)
                if current_time.minute == 0 and slot != last_reset_slot:
                    logger.info(f"Hourly reset triggered at {current_time.hour}:00")
                    if reset_hourly_caps():
                        last_reset_slot = slot
                        logger.info(f"Successfully reset hourly API caps at {current_time.hour}:00")
                    else:
                        logger.error(f"Failed to reset hourly API caps at {current_time.hour}:00")

            except Exception as e:
                logger.error(f"Error in hourly cap scheduler: {e}")
                logger.error(traceback.format_exc())
                # Back off briefly to avoid spinning on repeated errors, but
                # stay responsive to the stop event (time.sleep would not be).
                stop_event.wait(5)

    except Exception as e:
        logger.error(f"Fatal error in hourly cap scheduler: {e}")
        logger.error(traceback.format_exc())

    logger.info("Hourly API cap scheduler stopped")
def start_hourly_cap_scheduler():
    """Launch the hourly API cap scheduler as a daemon thread.

    Does nothing (apart from logging) when a scheduler thread is already
    alive. Otherwise a new thread running ``hourly_cap_scheduler_loop`` is
    stored in the module-level ``hourly_cap_scheduler_thread`` and started.
    """
    global hourly_cap_scheduler_thread

    existing = hourly_cap_scheduler_thread
    if existing and existing.is_alive():
        logger.info("Hourly API cap scheduler already running")
        return

    # Publish the new thread object in the global before starting it,
    # matching the order used previously.
    worker = threading.Thread(
        target=hourly_cap_scheduler_loop,
        name="HourlyCapScheduler",
        daemon=True,
    )
    hourly_cap_scheduler_thread = worker
    worker.start()

    logger.info(f"Hourly API cap scheduler started. Thread is alive: {worker.is_alive()}")
def start_huntarr():
"""Main entry point for Huntarr background tasks."""
logger.info(f"--- Starting Huntarr Background Tasks v{__version__} --- ")
# Perform initial settings migration if specified (e.g., via env var or arg)
if os.environ.get("HUNTARR_RUN_MIGRATION", "false").lower() == "true":
logger.info("Running settings migration from huntarr.json (if found)...")
settings_manager.migrate_from_huntarr_json()
# Start the hourly API cap scheduler
try:
start_hourly_cap_scheduler()
logger.info("Hourly API cap scheduler started successfully")
except Exception as e:
logger.error(f"Failed to start hourly API cap scheduler: {e}")
# Log initial configuration for all known apps
for app_name in settings_manager.KNOWN_APP_TYPES: # Corrected attribute name

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""Hourly API Cap Scheduler for Huntarr
Handles checking time and resetting hourly API caps at the top of each hour (00 minute mark)
"""
import threading
import time
import datetime
import traceback
import logging
# Try both import patterns to handle different module contexts
try:
    # When imported from the main app
    from src.primary.utils.logger import get_logger
    from src.primary.stats_manager import reset_hourly_caps
    logger = get_logger("hourly_caps")
except ImportError:
    try:
        # When imported within the package
        from primary.utils.logger import get_logger
        from primary.stats_manager import reset_hourly_caps
        logger = get_logger("hourly_caps")
    except ImportError:
        # Fallback to standard logging in case neither works
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger("hourly_caps")
        logger.error("Failed to import Huntarr modules, using fallback logging")

        def reset_hourly_caps() -> bool:
            """Stand-in used when the stats manager could not be imported.

            Without this, the name would be unbound and every scheduled reset
            would fail with a NameError. It reports failure instead.
            """
            logger.error("Cannot reset hourly caps - stats manager is not available")
            return False

# Print startup message to help with debugging
print("Hourly API Cap Scheduler module loaded")
logger.info("Hourly API Cap Scheduler module initialized")

# Global variables
stop_event = threading.Event()  # set by stop_scheduler() to end the loop
scheduler_thread = None  # the running scheduler thread, if any
# (date, hour) of the most recent successful reset. The scheduler polls every
# 30 seconds, so without this guard minute 0 would trigger two resets and the
# second one would discard hits recorded since the first.
_last_reset_slot = None


def check_and_reset_caps():
    """Reset the hourly API caps if the current minute is 0.

    At most one successful reset is performed per (date, hour); a failed reset
    is retried on the next call that still falls inside minute 0. Exceptions
    are logged and never propagated to the caller.
    """
    global _last_reset_slot

    try:
        current_time = datetime.datetime.now()

        # Only reset at the top of the hour (minute 0)
        if current_time.minute != 0:
            logger.debug(
                f"Not time to reset hourly caps yet. Current time: {current_time.hour}:{current_time.minute:02d}"
            )
            return

        slot = (current_time.date(), current_time.hour)
        if slot == _last_reset_slot:
            logger.debug(f"Hourly caps already reset for {current_time.hour}:00, skipping")
            return

        logger.info(f"Hourly reset triggered at {current_time.hour}:00")
        try:
            if reset_hourly_caps():
                # Remember the slot only on success so failures are retried.
                _last_reset_slot = slot
                logger.info(f"Successfully reset hourly API caps at {current_time.hour}:00")
            else:
                logger.error(f"Failed to reset hourly API caps at {current_time.hour}:00")
        except Exception as e:
            logger.error(f"Exception when resetting hourly caps: {e}")
            logger.error(traceback.format_exc())
    except Exception as e:
        logger.error(f"Unexpected error in check_and_reset_caps: {e}")
        logger.error(traceback.format_exc())
def scheduler_loop():
    """Body of the scheduler thread.

    Runs ``check_and_reset_caps()`` once at startup and then roughly every
    30 seconds until ``stop_event`` is set. Errors are logged and never
    raised out of the thread.
    """
    try:
        logger.info("Starting hourly API cap scheduler")
        print("Hourly API cap scheduler thread starting")

        # Initial check on startup
        check_and_reset_caps()

        while not stop_event.is_set():
            try:
                # Wait up to 30 seconds between checks; wait() returns True
                # immediately once the stop event is set.
                if stop_event.wait(30):
                    logger.info("Stop event detected, exiting scheduler loop")
                    break

                # Check if it's time to reset the caps
                check_and_reset_caps()

            except Exception as e:
                logger.error(f"Error in hourly cap scheduler loop: {e}")
                logger.error(traceback.format_exc())
                # Back off briefly to avoid spinning on repeated errors.
                # Waiting on the event (rather than time.sleep) keeps the
                # thread stoppable within stop_scheduler()'s 5 second join.
                stop_event.wait(5)

        logger.info("Hourly API cap scheduler stopped")

    except Exception as e:
        logger.error(f"Fatal error in scheduler_loop: {e}")
        logger.error(traceback.format_exc())
        print(f"FATAL ERROR in hourly cap scheduler: {e}")
def start_scheduler():
    """Start the hourly API cap scheduler thread.

    Returns:
        True if the scheduler is running after the call (newly started or
        already alive), False if starting it raised an exception.
    """
    global scheduler_thread

    try:
        logger.info("Attempting to start hourly API cap scheduler")
        print("Attempting to start hourly API cap scheduler")

        if scheduler_thread and scheduler_thread.is_alive():
            logger.info("Hourly API cap scheduler already running")
            # Previously this path returned None, which callers testing the
            # result for truthiness would misread as a failure.
            return True

        # Reset the stop event so a previously stopped scheduler can restart
        stop_event.clear()

        # Create and start the scheduler thread
        scheduler_thread = threading.Thread(target=scheduler_loop, name="HourlyCapScheduler", daemon=True)
        scheduler_thread.start()

        logger.info(f"Hourly API cap scheduler started. Thread is alive: {scheduler_thread.is_alive()}")
        print(f"Hourly API cap scheduler started. Thread is alive: {scheduler_thread.is_alive()}")

        return True
    except Exception as e:
        logger.error(f"Failed to start hourly API cap scheduler: {e}")
        logger.error(traceback.format_exc())
        print(f"Failed to start hourly API cap scheduler: {e}")
        return False
def stop_scheduler():
    """Signal the scheduler thread to stop and wait briefly for it to exit.

    Logs and returns immediately when no scheduler thread is alive. Otherwise
    sets ``stop_event``, joins the thread for up to 5 seconds and logs whether
    it terminated in time.
    """
    global scheduler_thread

    worker = scheduler_thread
    if not (worker and worker.is_alive()):
        logger.info("Hourly API cap scheduler not running")
        return

    # Ask the loop to exit, then give it a bounded amount of time to do so.
    stop_event.set()
    worker.join(timeout=5.0)

    if not worker.is_alive():
        logger.info("Hourly API cap scheduler stopped gracefully")
    else:
        logger.warning("Hourly API cap scheduler did not terminate gracefully")

View File

@@ -2,14 +2,17 @@
"""
Statistics Manager for Huntarr
Handles tracking, storing, and retrieving statistics about hunted and upgraded media
and monitoring hourly API usage for rate limiting
"""
import os
import json
import time
import datetime
import threading
from typing import Dict, Any, Optional
from src.primary.utils.logger import get_logger
from src.primary.settings_manager import get_advanced_setting
logger = get_logger("stats")
@@ -22,6 +25,7 @@ STATS_DIRS = [
# Lock for thread-safe operations
stats_lock = threading.Lock()
hourly_lock = threading.Lock()
def find_writable_stats_dir():
"""Find a writable directory for stats from the list of candidates"""
@@ -51,13 +55,21 @@ def find_writable_stats_dir():
# Find the best stats directory
STATS_DIR = find_writable_stats_dir()
STATS_FILE = os.path.join(STATS_DIR, "media_stats.json") if STATS_DIR else None
HOURLY_CAP_FILE = os.path.join(STATS_DIR, "hourly_cap.json") if STATS_DIR else None
# Log the stats file location once at module load time
if STATS_FILE:
logger.info(f"===> Stats will be stored at: {STATS_FILE}")
logger.info(f"===> Hourly API cap tracking will be stored at: {HOURLY_CAP_FILE}")
else:
logger.error("===> CRITICAL: No stats file location could be determined!")
# Store the last hour we checked for resetting hourly caps
last_hour_checked = None
# Schedule the next hourly reset check
next_reset_check = None
def ensure_stats_dir():
"""Ensure the statistics directory exists"""
if not STATS_DIR:
@@ -117,6 +129,214 @@ def get_default_stats() -> Dict[str, Dict[str, int]]:
"swaparr": {"hunted": 0, "upgraded": 0}
}
def get_default_hourly_caps() -> Dict[str, Dict[str, int]]:
    """Build a fresh hourly-caps mapping with every app's hit counter at zero.

    Returns:
        A new dictionary keyed by app name, each value being a new
        ``{"api_hits": 0}`` dictionary.
    """
    tracked_apps = ("sonarr", "radarr", "lidarr", "readarr", "whisparr", "eros")
    return {app_name: {"api_hits": 0} for app_name in tracked_apps}
def load_hourly_caps() -> Dict[str, Dict[str, int]]:
    """
    Load hourly API caps from the caps file

    The loaded data is normalised so callers can always index
    ``caps[app]["api_hits"]``: missing apps are filled in from the defaults,
    and entries that are not a dict with an integer ``api_hits`` are repaired.
    Any read or parse error falls back to the default (all-zero) caps.

    Returns:
        Dictionary containing hourly API usage for each app
    """
    if not ensure_stats_dir() or not HOURLY_CAP_FILE:
        logger.error("Cannot load hourly caps - no valid stats directory available")
        return get_default_hourly_caps()

    default_caps = get_default_hourly_caps()

    try:
        if not os.path.exists(HOURLY_CAP_FILE):
            logger.info(f"Hourly caps file not found at {HOURLY_CAP_FILE}, using default caps")
            return default_caps

        logger.debug(f"Loading hourly caps from: {HOURLY_CAP_FILE}")
        with open(HOURLY_CAP_FILE, 'r') as f:
            caps = json.load(f)

        # A file that parses but is not a JSON object cannot be used at all.
        if not isinstance(caps, dict):
            raise ValueError(f"expected a JSON object, got {type(caps).__name__}")

        # Ensure all apps are in the caps and have a usable counter
        for app, default_entry in default_caps.items():
            if app not in caps:
                caps[app] = default_entry
                continue

            entry = caps[app]
            if not isinstance(entry, dict):
                logger.warning(f"Malformed hourly cap entry for {app}, resetting it to defaults")
                caps[app] = default_entry
                continue

            hits = entry.get("api_hits")
            # bool is a subclass of int, so it is rejected explicitly.
            if isinstance(hits, bool) or not isinstance(hits, int):
                logger.warning(f"Invalid api_hits value for {app}: {hits!r}, resetting it to 0")
                entry["api_hits"] = 0

        logger.debug(f"Loaded hourly caps: {caps}")
        return caps
    except Exception as e:
        logger.error(f"Error loading hourly caps from {HOURLY_CAP_FILE}: {e}")
        return default_caps
def save_hourly_caps(caps: Dict[str, Dict[str, int]]) -> bool:
    """
    Save hourly API caps to the caps file

    The data is written to a sibling ``.tmp`` file, flushed and fsynced, and
    then moved over the real file with ``os.replace`` so readers never see a
    partially written file. If anything fails, the temp file is removed on a
    best-effort basis so no stale partial file is left behind.

    Args:
        caps: Dictionary containing hourly API usage for each app

    Returns:
        True if successful, False otherwise
    """
    if not ensure_stats_dir() or not HOURLY_CAP_FILE:
        logger.error("Cannot save hourly caps - no valid stats directory available")
        return False

    temp_file = f"{HOURLY_CAP_FILE}.tmp"

    try:
        logger.debug(f"Saving hourly caps to: {HOURLY_CAP_FILE}")

        # First write to a temp file, then move it to avoid partial writes
        with open(temp_file, 'w') as f:
            json.dump(caps, f, indent=2)
            f.flush()
            os.fsync(f.fileno())

        # Move the temp file to the actual file
        os.replace(temp_file, HOURLY_CAP_FILE)

        logger.debug(f"Hourly caps saved successfully: {caps}")
        return True
    except Exception as e:
        logger.error(f"Error saving hourly caps to {HOURLY_CAP_FILE}: {e}", exc_info=True)
        # Best-effort cleanup: the temp file may not exist (open failed) or may
        # already have been moved, so a failure to remove it is not an error.
        try:
            os.remove(temp_file)
        except OSError:
            pass
        return False
def reset_hourly_caps() -> bool:
    """
    Reset all hourly API caps to zero at the top of the hour

    The delete-and-rewrite is performed while holding ``hourly_lock`` so it
    cannot interleave with the load-modify-save done by
    ``increment_hourly_cap``, which would otherwise be able to write stale
    pre-reset counts back after the reset.

    NOTE(review): ``hourly_lock`` is a non-reentrant ``threading.Lock``; this
    function must not be called by code that already holds it. No such caller
    is visible in this file - confirm for callers elsewhere.

    Returns:
        True if successful, False otherwise
    """
    logger.info("=== RESETTING HOURLY API CAPS ===")

    if not HOURLY_CAP_FILE:
        logger.error("Error resetting hourly API caps: no hourly caps file location available")
        return False

    try:
        with hourly_lock:
            # Remove the old file first so the counters are effectively reset
            # (a missing file loads as defaults) even if the save below fails.
            try:
                os.remove(HOURLY_CAP_FILE)
                logger.info(f"Deleted hourly caps file at {HOURLY_CAP_FILE} to reset all API caps")
            except FileNotFoundError:
                pass

            # Create a fresh hourly caps file
            save_success = save_hourly_caps(get_default_hourly_caps())

        if save_success:
            logger.info("Successfully reset all hourly API caps to zero")
            return True

        logger.error("Failed to save reset hourly caps")
        return False
    except Exception as e:
        logger.error(f"Error resetting hourly API caps: {e}")
        return False
def check_hourly_reset():
    """
    Check if we need to reset hourly caps based on the current hour

    Caps are reset the first time this is called after the clock hour differs
    from ``last_hour_checked``. Previously the reset happened only if that
    first call fell exactly in minute 0; otherwise the new hour was recorded
    as checked and the previous hour's counts were kept.

    When no hour has been recorded yet (``last_hour_checked`` is None) the
    original rule is kept: reset only if the current minute is 0, so counts
    already stored for the current hour are not discarded.
    """
    global last_hour_checked

    current_time = datetime.datetime.now()
    current_hour = current_time.hour

    # Skip if we've already checked this hour
    if last_hour_checked == current_hour:
        return

    hour_rolled_over = last_hour_checked is not None
    if hour_rolled_over or current_time.minute == 0:
        logger.info(f"Hour changed to {current_hour}:00, resetting hourly API caps")
        reset_hourly_caps()

    last_hour_checked = current_hour
def increment_hourly_cap(app_type: str, count: int = 1) -> bool:
    """
    Increment hourly API usage cap for a specific app

    Logs a warning when usage first crosses 80% of the configured limit and an
    error when it first reaches the limit. The limit is read from the
    ``hourly_cap`` advanced setting (default 20).

    Args:
        app_type: The application type (sonarr, radarr, etc.)
        count: The amount to increment by (default: 1)

    Returns:
        True if successful, False otherwise
    """
    # Validate against the same app list that defines the caps structure
    # instead of a second hard-coded copy.
    if app_type not in get_default_hourly_caps():
        logger.error(f"Invalid app_type for hourly cap: {app_type}")
        return False

    # Check if we need to reset hourly caps
    check_hourly_reset()

    with hourly_lock:
        caps = load_hourly_caps()

        # Tolerate a caps file whose entry is missing or lacks the counter.
        entry = caps.get(app_type)
        if not isinstance(entry, dict):
            entry = {"api_hits": 0}
            caps[app_type] = entry
        prev_value = entry.get("api_hits", 0)
        new_value = prev_value + count
        entry["api_hits"] = new_value

        # Get the hourly cap from settings; fall back to the default when the
        # stored value cannot be interpreted as a number.
        try:
            hourly_limit = int(get_advanced_setting("hourly_cap", 20))
        except (TypeError, ValueError):
            hourly_limit = 20

        # Log current usage vs limit
        logger.info(f"*** HOURLY API INCREMENT *** {app_type} by {count}: {prev_value} -> {new_value} (limit: {hourly_limit})")

        # Warn if approaching limit (only on the call that crosses 80%)
        warn_threshold = int(hourly_limit * 0.8)
        if new_value >= warn_threshold and prev_value < warn_threshold:
            logger.warning(f"{app_type} is approaching hourly API cap: {new_value}/{hourly_limit}")

        # Alert if exceeding limit (only on the call that crosses it)
        if new_value >= hourly_limit and prev_value < hourly_limit:
            logger.error(f"{app_type} has exceeded hourly API cap: {new_value}/{hourly_limit}")

        save_success = save_hourly_caps(caps)

        if not save_success:
            logger.error(f"Failed to save hourly caps after incrementing {app_type}")
            return False

        return True
def get_hourly_cap_status(app_type: str) -> Dict[str, Any]:
    """
    Report the current hourly API usage of one app against the configured limit

    Args:
        app_type: The application type (sonarr, radarr, etc.)

    Returns:
        For a known app, a dictionary with the keys "app", "current_usage",
        "limit", "remaining", "percent_used" and "exceeded". For an unknown
        app, a dictionary containing only an "error" message.
    """
    known_apps = ("sonarr", "radarr", "lidarr", "readarr", "whisparr", "eros")
    if app_type not in known_apps:
        return {"error": f"Invalid app_type: {app_type}"}

    with hourly_lock:
        usage_by_app = load_hourly_caps()
        limit = get_advanced_setting("hourly_cap", 20)
        used = usage_by_app[app_type]["api_hits"]

        # Guard against a zero (or negative) limit before dividing.
        if limit > 0:
            percent = int((used / limit) * 100)
        else:
            percent = 0

        status = {
            "app": app_type,
            "current_usage": used,
            "limit": limit,
            "remaining": max(0, limit - used),
            "percent_used": percent,
            "exceeded": used >= limit,
        }
        return status
def check_hourly_cap_exceeded(app_type: str) -> bool:
    """
    Tell whether an app's hourly API usage has reached its cap

    Args:
        app_type: The application type (sonarr, radarr, etc.)

    Returns:
        The "exceeded" flag from get_hourly_cap_status(), or False when the
        status carries no such flag (e.g. for an invalid app type).
    """
    return get_hourly_cap_status(app_type).get("exceeded", False)
def save_stats(stats: Dict[str, Dict[str, int]]) -> bool:
"""
Save statistics to the stats file
@@ -170,6 +390,10 @@ def increment_stat(app_type: str, stat_type: str, count: int = 1) -> bool:
logger.error(f"Invalid stat_type: {stat_type}")
return False
# Also increment the hourly API cap for this app, unless it's swaparr which doesn't have an API
if app_type != "swaparr":
increment_hourly_cap(app_type, count)
with stats_lock:
stats = load_stats()
prev_value = stats[app_type][stat_type]
@@ -233,9 +457,19 @@ def reset_stats(app_type: Optional[str] = None) -> bool:
return save_stats(stats)
# Initialize stats file with find_writable_stats_dir already called during import
if STATS_DIR and not os.path.exists(STATS_FILE):
logger.info(f"Creating new stats file at: {STATS_FILE}")
save_stats(get_default_stats())
# Initialize the files with find_writable_stats_dir already called during import
if STATS_DIR:
# Initialize stats file if it doesn't exist
if not os.path.exists(STATS_FILE):
logger.info(f"Creating new stats file at: {STATS_FILE}")
save_stats(get_default_stats())
# Initialize hourly caps file if it doesn't exist
if not os.path.exists(HOURLY_CAP_FILE):
logger.info(f"Creating new hourly caps file at: {HOURLY_CAP_FILE}")
save_hourly_caps(get_default_hourly_caps())
# Set up the initial hour check
last_hour_checked = datetime.datetime.now().hour
else:
logger.debug(f"Stats system initialized. Using file: {STATS_FILE}")