add C90 linting rule

This commit is contained in:
Maximilian Dorninger
2026-05-07 15:56:07 +02:00
parent 25cd4b0724
commit ebb6cb790a
5 changed files with 169 additions and 157 deletions

View File

@@ -2,6 +2,7 @@ import logging
from typing import Any, TypeVar
from uuid import UUID
from pydantic import BaseModel
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session
@@ -11,11 +12,10 @@ from media_manager.exceptions import ConflictError, NotFoundError
log = logging.getLogger(__name__)
T = TypeVar("T")
S = TypeVar("S")
EntityId = UUID | int | str
class BaseRepository[T, S]:
class BaseRepository[T, S: BaseModel]:
"""
Base repository providing common CRUD operations for media models.
"""

View File

@@ -132,35 +132,57 @@ class Jackett(GenericIndexer, TorznabMixin):
indexer=indexer, session=session
)
if params["t"] == "tvsearch":
if not search_capabilities.supports_tv_search:
msg = f"Indexer {indexer} does not support TV search"
raise RuntimeError(msg)
if search_capabilities.supports_tv_search_season and "season" in params:
query_params["season"] = params["season"]
if search_capabilities.supports_tv_search_episode and "ep" in params:
query_params["ep"] = params["ep"]
if search_capabilities.supports_tv_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
self.__add_tv_query_params(
indexer, params, query_params, search_capabilities
)
if params["t"] == "movie":
if not search_capabilities.supports_movie_search:
msg = f"Indexer {indexer} does not support Movie search"
raise RuntimeError(msg)
if search_capabilities.supports_movie_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
self.__add_movie_query_params(
indexer, params, query_params, search_capabilities
)
return query_params
def __add_tv_query_params(
self,
indexer: str,
params: dict,
query_params: dict[str, str],
search_capabilities: IndexerInfo,
) -> None:
if not search_capabilities.supports_tv_search:
msg = f"Indexer {indexer} does not support TV search"
raise RuntimeError(msg)
if search_capabilities.supports_tv_search_season and "season" in params:
query_params["season"] = params["season"]
if search_capabilities.supports_tv_search_episode and "ep" in params:
query_params["ep"] = params["ep"]
if search_capabilities.supports_tv_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
def __add_movie_query_params(
self,
indexer: str,
params: dict,
query_params: dict[str, str],
search_capabilities: IndexerInfo,
) -> None:
if not search_capabilities.supports_movie_search:
msg = f"Indexer {indexer} does not support Movie search"
raise RuntimeError(msg)
if search_capabilities.supports_movie_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
def get_torrents_by_indexer(
self, indexer: str, params: dict, session: requests.Session
) -> list[IndexerQueryResult]:

View File

@@ -18,71 +18,78 @@ class TorznabMixin:
}
for item in xml_tree.findall("channel/item"):
try:
flags = []
seeders = 0
age = 0
indexer_name = "unknown"
if item.find("jackettindexer") is not None:
indexer_name = item.find("jackettindexer").text
if item.find("prowlarrindexer") is not None:
indexer_name = item.find("prowlarrindexer").text
is_usenet = (
item.find("enclosure").attrib["type"] != "application/x-bittorrent"
)
attributes = list(item.findall("torznab:attr", xmlns))
for attribute in attributes:
if is_usenet:
if attribute.attrib["name"] == "usenetdate":
posted_date = parsedate_to_datetime(
attribute.attrib["value"]
)
now = datetime.now(datetime.UTC)
age = int((now - posted_date).total_seconds())
else:
if attribute.attrib["name"] == "seeders":
seeders = int(attribute.attrib["value"])
if attribute.attrib["name"] == "downloadvolumefactor":
download_volume_factor = float(attribute.attrib["value"])
if download_volume_factor == 0:
flags.append("freeleech")
if download_volume_factor == 0.5:
flags.append("halfleech")
if download_volume_factor == 0.75:
flags.append("freeleech75")
if download_volume_factor == 0.25:
flags.append("freeleech25")
if attribute.attrib["name"] == "uploadvolumefactor":
upload_volume_factor = int(attribute.attrib["value"])
if upload_volume_factor == 2:
flags.append("doubleupload")
title = item.find("title").text
size_str = item.find("size")
if size_str is None or size_str.text is None:
log.warning(f"Torznab item {title} has no size, skipping.")
continue
try:
size = int(size_str.text or "0")
except ValueError:
log.warning(f"Torznab item {title} has invalid size, skipping.")
continue
result = IndexerQueryResult(
title=title or "unknown",
download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders,
flags=flags,
size=size,
usenet=is_usenet,
age=age,
indexer=indexer_name,
)
result_list.append(result)
result = self._process_item(item, xmlns)
if result:
result_list.append(result)
except Exception:
log.exception("1 Torznab search result failed")
return result_list
def _process_item(self, item: ET.Element, xmlns: dict) -> IndexerQueryResult | None:
indexer_name = "unknown"
if item.find("jackettindexer") is not None:
indexer_name = item.find("jackettindexer").text or "unknown"
elif item.find("prowlarrindexer") is not None:
indexer_name = item.find("prowlarrindexer").text or "unknown"
is_usenet = item.find("enclosure").attrib["type"] != "application/x-bittorrent"
seeders, age, flags = self._parse_torznab_attributes(item, xmlns, is_usenet)
title = item.find("title").text or "unknown"
size_str = item.find("size")
if size_str is None or size_str.text is None:
log.warning(f"Torznab item {title} has no size, skipping.")
return None
try:
size = int(size_str.text or "0")
except ValueError:
log.warning(f"Torznab item {title} has invalid size, skipping.")
return None
return IndexerQueryResult(
title=title,
download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders,
flags=flags,
size=size,
usenet=is_usenet,
age=age,
indexer=indexer_name,
)
def _parse_torznab_attributes(
self, item: ET.Element, xmlns: dict, is_usenet: bool
) -> tuple[int, int, list[str]]:
seeders = 0
age = 0
flags = []
attributes = list(item.findall("torznab:attr", xmlns))
for attribute in attributes:
name = attribute.attrib["name"]
value = attribute.attrib["value"]
if is_usenet:
if name == "usenetdate":
posted_date = parsedate_to_datetime(value)
now = datetime.now(datetime.UTC)
age = int((now - posted_date).total_seconds())
else:
if name == "seeders":
seeders = int(value)
elif name == "downloadvolumefactor":
self._add_leech_flags(float(value), flags)
elif name == "uploadvolumefactor":
if int(value) == 2:
flags.append("doubleupload")
return seeders, age, flags
def _add_leech_flags(self, factor: float, flags: list[str]) -> None:
if factor == 0:
flags.append("freeleech")
elif factor == 0.5:
flags.append("halfleech")
elif factor == 0.75:
flags.append("freeleech75")
elif factor == 0.25:
flags.append("freeleech25")

View File

@@ -16,69 +16,52 @@ log = logging.getLogger(__name__)
def evaluate_indexer_query_result(
query_result: IndexerQueryResult, ruleset: ScoringRuleSet
) -> tuple[IndexerQueryResult, bool]:
title_rules = MediaManagerConfig().indexers.title_scoring_rules
indexer_flag_rules = MediaManagerConfig().indexers.indexer_flag_scoring_rules
config = MediaManagerConfig().indexers
for rule_name in ruleset.rule_names:
for rule in title_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(
re.search(
rf"\b{re.escape(keyword)}\b",
query_result.title,
re.IGNORECASE,
)
for keyword in rule.keywords
)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
elif (
not any(
re.search(
rf"\b{re.escape(keyword)}\b",
query_result.title,
re.IGNORECASE,
)
for keyword in rule.keywords
)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} did not match for {query_result.title}"
)
for rule in indexer_flag_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(flag in query_result.flags for flag in rule.flags)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
elif (
not any(flag in query_result.flags for flag in rule.flags)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with flags {rule.flags} did not match for {query_result.title} with flags {query_result.flags}"
)
_apply_title_rule(query_result, rule_name, config.title_scoring_rules)
_apply_indexer_flag_rule(
query_result, rule_name, config.indexer_flag_scoring_rules
)
if query_result.score <= 0:
return query_result, False
return query_result, True
def _apply_title_rule(
query_result: IndexerQueryResult, rule_name: str, rules: list
) -> None:
for rule in rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
matched = any(
re.search(rf"\b{re.escape(k)}\b", query_result.title, re.IGNORECASE)
for k in rule.keywords
)
if matched != rule.negate:
log.debug(
f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(f"Rule {rule.name} did not match for {query_result.title}")
def _apply_indexer_flag_rule(
query_result: IndexerQueryResult, rule_name: str, rules: list
) -> None:
for rule in rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
matched = any(f in query_result.flags for f in rule.flags)
if matched != rule.negate:
log.debug(
f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(f"Rule {rule.name} did not match for {query_result.title}")
if query_result.score <= 0:
return query_result, False

View File

@@ -6,10 +6,10 @@ line-ending = "lf"
quote-style = "double"
[lint]
# to be enabled: BLE, C90, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC
# to be enabled: BLE, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC
extend-select = [
"A", "ARG", "ASYNC", "ANN",
"B",
"B", "C90",
"C4", "COM",
"DTZ",
"E", "EM", "EXE",