Files
MediaManager/media_manager/indexer/utils.py
Marcel Hellwig 96b84d45db Adding some more new lints (#393)
Enable `UP` and `TRY` lint
2026-02-01 18:04:15 +01:00

160 lines
6.5 KiB
Python

import logging
from urllib.parse import urljoin
import requests
from media_manager.config import MediaManagerConfig
from media_manager.indexer.config import ScoringRuleSet
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.movies.schemas import Movie
from media_manager.tv.schemas import Show
log = logging.getLogger(__name__)
def evaluate_indexer_query_result(
query_result: IndexerQueryResult, ruleset: ScoringRuleSet
) -> tuple[IndexerQueryResult, bool]:
title_rules = MediaManagerConfig().indexers.title_scoring_rules
indexer_flag_rules = MediaManagerConfig().indexers.indexer_flag_scoring_rules
for rule_name in ruleset.rule_names:
for rule in title_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(
keyword.lower() in query_result.title.lower()
for keyword in rule.keywords
)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
elif (
not any(
keyword.lower() in query_result.title.lower()
for keyword in rule.keywords
)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} did not match for {query_result.title}"
)
for rule in indexer_flag_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(flag in query_result.flags for flag in rule.flags)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
elif (
not any(flag in query_result.flags for flag in rule.flags)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with flags {rule.flags} did not match for {query_result.title} with flags {query_result.flags}"
)
if query_result.score <= 0:
return query_result, False
return query_result, True
def evaluate_indexer_query_results(
query_results: list[IndexerQueryResult], media: Show | Movie, is_tv: bool
) -> list[IndexerQueryResult]:
scoring_rulesets: list[ScoringRuleSet] = (
MediaManagerConfig().indexers.scoring_rule_sets
)
for ruleset in scoring_rulesets:
if (
(media.library in ruleset.libraries)
or ("ALL_TV" in ruleset.libraries and is_tv)
or ("ALL_MOVIES" in ruleset.libraries and not is_tv)
):
log.debug(
f"Applying scoring ruleset {ruleset.name} for {media.name} ({media.year}) to {len(query_results)} results."
)
for result in query_results:
log.debug(
f"Applying scoring ruleset {ruleset.name} for IndexerQueryResult {result.title} for {media.name} ({media.year})"
)
result, passed = evaluate_indexer_query_result(
query_result=result, ruleset=ruleset
)
if not passed:
log.debug(
f"Indexer query result {result.title} did not pass scoring ruleset {ruleset.name} with score {result.score}, removing from results."
)
else:
log.debug(
f"Indexer query result {result.title} passed scoring ruleset {ruleset.name} with score {result.score}."
)
query_results = [result for result in query_results if result.score >= 0]
query_results.sort(reverse=True)
log.info(f"{len(query_results)} passed the scoring rulesets")
return query_results
def follow_redirects_to_final_torrent_url(
initial_url: str, session: requests.Session, timeout: float = 10
) -> str:
"""
Follows redirects to get the final torrent URL.
:param initial_url: The initial URL to follow.
:param session: A requests session to use for the requests.
:param timeout: Timeout in seconds for each redirect request.
:return: The final torrent URL.
:raises: RuntimeError if it fails.
"""
current_url = initial_url
try:
for _ in range(10): # Limit redirects to prevent infinite loops
response = session.get(current_url, allow_redirects=False, timeout=timeout)
if 300 <= response.status_code < 400:
redirect_url = response.headers.get("Location")
if not redirect_url:
msg = "Redirect response without Location header"
raise RuntimeError(msg)
# Resolve relative redirects against the last URL
current_url = urljoin(current_url, redirect_url)
log.debug(f"Following redirect to: {current_url}")
if current_url.startswith("magnet:"):
return current_url
else:
response.raise_for_status() # Raise an exception for bad status codes
return current_url
else:
msg = "Exceeded maximum number of redirects"
raise RuntimeError(msg)
except requests.exceptions.RequestException as e:
log.debug(
f"An error occurred during the request for {initial_url}",
exc_info=True,
)
msg = "An error occurred during the request"
raise RuntimeError(msg) from e
return current_url