Mirror of https://github.com/Dictionarry-Hub/profilarr.git, synced 2026-01-02 11:47:46 -05:00
Latest commit:
- added option to set Radarr/Sonarr-specific scores that Profilarr's compiler will handle on import
- revised design for the arr settings container - now styled as a table
- completely rewrote the import module; it now uses connection pooling to reuse connections
- fixed import progress bug where one failed format caused all other formats to be labelled as failed (even if they succeeded)
- fixed bug where sync wasn't working on pull
- improved styling for the link / unlink database modals
- fixed issue where 0-score formats were removed in selective mode
182 lines
5.9 KiB
Python
# app/task/tasks.py

from abc import ABC, abstractmethod
from datetime import datetime
import logging
import re

from apscheduler.schedulers.background import BackgroundScheduler

from ..db import get_db

task_logger = logging.getLogger('task_system')
task_logger.setLevel(logging.DEBUG)

class Task(ABC):

    def __init__(self, id=None, name=None, interval_minutes=None):
        self.id = id
        self.name = name
        self.interval_minutes = interval_minutes
        self.last_run = None
        self.status = 'pending'

    @abstractmethod
    def run_job(self):
        pass

    def update_status(self, status):
        task_logger.info(
            f"Task {self.name} (ID: {self.id}) status changed to: {status}")
        with get_db() as conn:
            conn.execute(
                '''
                UPDATE scheduled_tasks
                SET status = ?, last_run = ?
                WHERE id = ?
                ''', (status, datetime.now(), self.id))
            conn.commit()

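# Note: the UPDATE above and the SELECT in TaskScheduler.load_tasks_from_db below
# assume a scheduled_tasks table roughly along these lines. This is a sketch
# inferred from the columns referenced in this module, not the authoritative
# schema or migration:
#
#   CREATE TABLE scheduled_tasks (
#       id               INTEGER PRIMARY KEY,
#       name             TEXT,
#       type             TEXT,      -- maps to a Task subclass: 'Sync', 'Backup', 'ImportSchedule'
#       interval_minutes INTEGER,
#       status           TEXT,      -- 'pending' / 'running' / 'success' / 'failed'
#       last_run         TIMESTAMP
#   );
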
class TaskScheduler:
    _instance = None

    def __init__(self):
        self.scheduler = BackgroundScheduler()
        self.logger = logging.getLogger('TaskScheduler')
        TaskScheduler._instance = self

    @classmethod
    def get_instance(cls):
        return cls._instance

    def load_tasks_from_db(self):
        """
        Reload tasks from the DB, removing all old jobs first
        so we don't collide with existing job IDs.
        """
        self.logger.debug(
            "[TaskScheduler] remove_all_jobs to avoid duplicates")
        self.scheduler.remove_all_jobs()

        with get_db() as conn:
            task_rows = conn.execute(
                'SELECT * FROM scheduled_tasks').fetchall()
            for row in task_rows:
                task_class = self.get_task_class(row['type'])
                if task_class:
                    task = task_class(id=row['id'],
                                      name=row['name'],
                                      interval_minutes=row['interval_minutes'])
                    self.schedule_task(task)

    def schedule_task(self, task):
        self.scheduler.add_job(self._run_task_wrapper(task),
                               'interval',
                               minutes=task.interval_minutes,
                               id=str(task.id))

    def _run_task_wrapper(self, task):

        def wrapped():
            task_logger.info(f"Starting task: {task.name} (ID: {task.id})")
            start_time = datetime.now()
            try:
                task.update_status('running')
                task.run_job()
                end_time = datetime.now()
                duration = (end_time - start_time).total_seconds()
                task_logger.info(
                    f"Task {task.name} completed successfully in {duration:.2f} seconds"
                )
                task.update_status('success')
            except Exception as e:
                end_time = datetime.now()
                duration = (end_time - start_time).total_seconds()
                task_logger.error(
                    f"Task {task.name} failed after {duration:.2f} seconds: {str(e)}"
                )
                task.update_status('failed')

        return wrapped

    def start(self):
        self.scheduler.start()

    @staticmethod
    def get_task_class(task_type):
        task_classes = {
            'Sync': SyncTask,
            'Backup': BackupTask,
            'ImportSchedule': ImportScheduleTask,
        }
        return task_classes.get(task_type)

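# A minimal usage sketch (an assumption about app wiring, not necessarily how
# profilarr boots): construct the singleton once at startup, load the persisted
# jobs, and start the background scheduler.
#
#   scheduler = TaskScheduler()
#   scheduler.load_tasks_from_db()
#   scheduler.start()
#
# Other modules can then reach the running instance via TaskScheduler.get_instance(),
# which returns None until the scheduler has been constructed.
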
class SyncTask(Task):

    def run_job(self):
        """Updates remote git status and performs other sync operations"""
        from ..git.status.status import GitStatusManager
        import os
        from ..config.config import config

        repo_path = config.DB_DIR

        # Quick check if there's a valid git repo
        if not os.path.exists(os.path.join(repo_path, '.git')):
            task_logger.info("No valid git repository found - skipping sync")
            return

        # If we have a valid repo, proceed with sync
        status_manager = GitStatusManager.get_instance(repo_path)
        if status_manager:
            success = status_manager.update_remote_status()
            if not success:
                task_logger.error("Failed to update remote git status")

class BackupTask(Task):

    def run_job(self):
        """Performs configuration backup and cleanup"""
        from .backup.backup import BackupManager

        logger = logging.getLogger(__name__)
        logger.info(f"Running backup task {self.name}")

        manager = BackupManager()
        success, backup_name = manager.create_backup()

        if success:
            logger.info(f"Backup created successfully: {backup_name}")
            # Run cleanup to remove old backups
            manager.cleanup_old_backups()
        else:
            # backup_name contains the error message in this case
            logger.error(f"Backup failed: {backup_name}")

class ImportScheduleTask(Task):
    """
    A scheduled task that runs the "run_import_for_config" logic for a specific ARR config
    (inferred by parsing the config ID from the task's 'name').
    For example, if the scheduled_tasks.name is 'Import for ARR #1 - radarr',
    we parse '1' out of that string to know which arr_config to import.
    """

    def run_job(self):
        from ..importer import handle_scheduled_import

        task_logger.info(
            f"[ImportScheduleTask] Running scheduled import for task_id={self.id} ({self.name})"
        )
        result = handle_scheduled_import(self.id)
        if not result.get('success'):
            task_logger.error(
                f"[ImportScheduleTask] Scheduled import failed for task_id={self.id}: {result}"
            )
        else:
            task_logger.info(
                f"[ImportScheduleTask] Scheduled import completed for task_id={self.id}: "
                f"added={result.get('added', 0)}, updated={result.get('updated', 0)}, "
                f"failed={result.get('failed', 0)}"
            )
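

# Illustrative only (not called anywhere in this module): the ImportScheduleTask
# docstring describes parsing the ARR config ID out of the task name, e.g.
# 'Import for ARR #1 - radarr' -> 1. A sketch of that parsing, using the `re`
# module imported at the top of this file, could look like the hypothetical
# helper below; the real lookup happens inside handle_scheduled_import, which is
# only given the task id here.
def _parse_arr_config_id(task_name):
    """Hypothetical helper: return 1 for 'Import for ARR #1 - radarr', else None."""
    match = re.search(r'#(\d+)', task_name)
    return int(match.group(1)) if match else None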