mirror of
https://github.com/morpheus65535/bazarr.git
synced 2026-04-18 05:08:50 -04:00
539 lines
19 KiB
Python
539 lines
19 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import absolute_import
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
|
|
from requests import Session
|
|
from requests.exceptions import JSONDecodeError
|
|
|
|
from subliminal.exceptions import ConfigurationError, ProviderError
|
|
from subliminal.video import Episode, Movie
|
|
|
|
from subliminal_patch.exceptions import APIThrottled
|
|
from subliminal_patch.providers import Provider
|
|
from subliminal_patch.providers.utils import (
|
|
get_archive_from_bytes,
|
|
get_subtitle_from_archive,
|
|
update_matches,
|
|
)
|
|
from subliminal_patch.subtitle import Subtitle
|
|
|
|
from subzero.language import Language
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root URL of the SubX API; endpoint paths are appended to it.
_SUBX_BASE_URL = "https://subx-api.duckdns.org"

# Regex to detect Spain Spanish in descriptions (same as Subdivx provider).
# Matching is case-insensitive; a hit anywhere in the text counts.
_SPANISH_RE = re.compile(
    r"españa|ib[eé]rico|castellano|gallego|castilla|europ[ae]",
    flags=re.IGNORECASE,
)
|
|
|
|
|
|
# ---------------------------
|
|
# Helpers
|
|
# ---------------------------
|
|
|
|
def _series_sanitizer(title):
    """Normalize a series title for use as a search term.

    Runs of dots and underscores become a space, whitespace runs are
    collapsed to a single space and both ends are trimmed. A falsy title
    (``None`` or ``""``) yields an empty string.
    """
    if not title:
        return ""
    spaced = re.sub(r"[._]+", " ", title)
    return re.sub(r"\s+", " ", spaced).strip()
|
|
|
|
|
|
def _unique_nonempty(seq):
    """Return the truthy items of *seq*, de-duplicated, in first-seen order."""
    # dict keys keep insertion order and silently ignore repeats, which is
    # exactly "unique, order-preserving" for hashable items.
    return list(dict.fromkeys(item for item in seq if item))
|
|
|
|
|
|
def _collect_titles(video, episode, max_alts=5):
    """Gather the primary title of *video* followed by its alternatives.

    Args:
        video: Object exposing ``series`` / ``alternative_series`` (read
            when *episode* is truthy) or ``title`` / ``alternative_titles``.
        episode: Truthy to read the series attributes, falsy for the
            movie ones.
        max_alts: Cap on the length of the returned list; the primary
            title counts toward it.

    Returns:
        Non-empty, de-duplicated titles in order, primary title first.
    """
    if episode:
        primary, alt_attr = video.series, "alternative_series"
    else:
        primary, alt_attr = video.title, "alternative_titles"

    collected = [primary]
    try:
        extra = getattr(video, alt_attr, None)
        if extra:
            collected.extend(extra)
    except Exception:
        # Best effort: alternative titles are optional, so a failure while
        # reading them simply leaves whatever was collected so far.
        pass

    return _unique_nonempty(collected)[:max_alts]
|
|
|
|
|
|
# ---------------------------
|
|
# Subtitle Class
|
|
# ---------------------------
|
|
|
|
class SubxSubtitle(Subtitle):
    """A single subtitle entry returned by the SubX API.

    Besides what the base ``Subtitle`` stores, an instance keeps the
    ``video`` it was found for, its ``download_url``, ``uploader``,
    ``season`` / ``episode`` numbers and a ``release_info`` string built
    from the API title and description.
    """

    provider_name = "subx"
    hash_verifiable = False

    def __init__(
        self,
        language,
        video,
        page_link,
        title,
        description,
        uploader,
        download_url,
        season=None,
        episode=None,
    ):
        """
        Build a subtitle from the fields of one search result.

        Args:
            language: Language assigned to the subtitle
            video: Video the search was run for
            page_link: URL of the subtitle page (also used as ``id``)
            title: Title reported by the API (may be None)
            description: Free-text description (may be empty)
            uploader: Name of the uploader
            download_url: URL the archive is downloaded from
            season: Season number, if any
            episode: Episode number; None means a season pack
        """
        super(SubxSubtitle, self).__init__(
            language,
            hearing_impaired=False,
            page_link=page_link,
        )

        self.video = video
        self.download_url = download_url
        self.uploader = uploader
        self.season = season
        self.episode = episode

        # A missing title must not leak into release_info as the literal
        # text "None": release_info is later parsed to derive matches.
        title_text = "" if title is None else str(title).strip()
        self.release_info = " | ".join(
            part for part in (title_text, description) if part
        )

    @property
    def id(self):
        """Identifier of the subtitle: its page link."""
        return self.page_link

    def get_matches(self, video):
        """
        Determine which features of this subtitle match the video.

        Args:
            video: Video to compare against

        Returns:
            Set of match names (also stored on ``self.matches``)
        """
        self.matches = set()

        # A subtitle without an episode number is treated as a season pack.
        is_season_pack = isinstance(video, Episode) and self.episode is None

        if isinstance(video, Episode):
            self.matches.update({"title", "series", "year"})

            # Match season if it aligns
            if self.season == video.season:
                self.matches.add("season")

            # A specific episode must be the right one; a season pack is
            # counted as an episode match so Bazarr can accept it.
            if is_season_pack or self.episode == video.episode:
                self.matches.add("episode")

        elif isinstance(video, Movie):
            self.matches.update({"title", "year"})

        # Add whatever can be derived from the release info text.
        update_matches(self.matches, video, self.release_info)

        # Keep the episode match for season packs regardless of what
        # update_matches did to the set.
        if is_season_pack:
            self.matches.add("episode")

        return self.matches
|
|
|
|
|
|
# ---------------------------
|
|
# Provider Class
|
|
# ---------------------------
|
|
|
|
class SubxSubtitlesProvider(Provider):
    """SubX subtitle provider for Spanish.

    Searches the SubX HTTP API, labels each result as Spain Spanish or
    Latin American Spanish from its description, and downloads the
    selected archive.
    """

    provider_name = "subx"
    hash_verifiable = False

    languages = {
        Language.fromalpha2("es"),
        Language("spa", "MX"),
    }

    video_types = (Episode, Movie)
    subtitle_class = SubxSubtitle

    def __init__(self, api_key=None):
        """
        Initialize SubX provider.

        Args:
            api_key: SubX API key (required)

        Raises:
            ConfigurationError: if no API key is given
        """
        if not api_key:
            raise ConfigurationError("SubX API key is required")

        self.session = Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2"),
        })

    def initialize(self):
        """Initialize session (nothing to do: it is created in __init__)."""
        pass

    def terminate(self):
        """Close session."""
        self.session.close()

    # ---------------------------
    # Search helpers
    # ---------------------------

    @staticmethod
    def _retry_after_seconds(headers, default):
        """Return the Retry-After delay in seconds, or *default*.

        *default* is used when the header is absent or is not a plain
        integer (for example the HTTP-date form). Negative values are
        clamped to 0 so the result is always safe for ``time.sleep``.
        """
        raw = headers.get("Retry-After")
        if raw is None:
            return default
        try:
            return max(0, int(raw))
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _throttle_if_near_limit(headers):
        """Sleep when the rate-limit headers show under 20% of quota left.

        Does nothing when the headers are missing or not integers.
        """
        remaining = headers.get("X-RateLimit-Remaining")
        limit = headers.get("X-RateLimit-Limit")
        if remaining is None or limit is None:
            return

        reset = headers.get("X-RateLimit-Reset")
        try:
            remaining_int = int(remaining)
            limit_int = int(limit)

            # Slow down only when below 20% of quota
            if remaining_int >= limit_int * 0.2:
                return

            if reset is not None:
                # Wait until the window resets.
                # NOTE(review): assumes X-RateLimit-Reset is a Unix
                # timestamp in seconds - confirm against the API docs.
                wait_time = max(0, int(reset) - int(time.time()))
            else:
                wait_time = 2  # Fallback
        except ValueError:
            return

        logger.warning(
            "Approaching SubX rate limit (%d/%d remaining), waiting %ds",
            remaining_int, limit_int, wait_time
        )
        time.sleep(wait_time)

    def _fetch_search_results(self, params, max_retries=3):
        """Call the search endpoint, retrying transient failures.

        Args:
            params: Query-string parameters for the search
            max_retries: Total number of attempts

        Returns:
            The decoded JSON object, or None when there is nothing usable
            (bad request, 404, persistent errors, unexpected payload).

        Raises:
            ConfigurationError: the API answered 401
            APIThrottled: still rate limited (429) on the last attempt
        """
        url = f"{_SUBX_BASE_URL}/api/subtitles/search"

        for attempt in range(max_retries):
            last_attempt = attempt >= max_retries - 1
            try:
                response = self.session.get(
                    url,
                    params=params,
                    timeout=10,  # 10s timeout for search (per API docs)
                )
                status = response.status_code

                if status == 400:
                    logger.error("Bad request to SubX API: %s", response.text)
                    return None  # Don't retry on bad requests

                if status == 401:
                    logger.error("Invalid SubX API key")
                    raise ConfigurationError("Invalid SubX API key")

                if status == 404:
                    logger.debug("No results found (404)")
                    return None

                if status == 429:
                    if last_attempt:
                        logger.error("Rate limit exceeded after %d retries", max_retries)
                        raise APIThrottled("SubX rate limit exceeded")
                    # Rate limited - use Retry-After header if available
                    wait_time = self._retry_after_seconds(
                        response.headers, 60 * (attempt + 1)
                    )
                    logger.warning("Rate limit hit, waiting %ds before retry %d/%d",
                                   wait_time, attempt + 1, max_retries)
                    time.sleep(wait_time)
                    continue

                if status >= 500:
                    if last_attempt:
                        logger.error("Server error persists after %d retries", max_retries)
                        return None
                    # Server error - retry with backoff
                    wait_time = 2 ** attempt
                    logger.warning("Server error %d, retrying in %ds (attempt %d/%d)",
                                   status, wait_time, attempt + 1, max_retries)
                    time.sleep(wait_time)
                    continue

                # Success (any other 4xx raises here and is retried below)
                response.raise_for_status()
                data = response.json()

            except (ConfigurationError, APIThrottled):
                # Deliberate signals for the caller: they must not be
                # swallowed by the generic retry handler below.
                raise
            except Exception as e:
                if last_attempt:
                    logger.error("SubX API error after %d retries: %s", max_retries, e)
                    return None
                logger.warning("SubX API error (attempt %d/%d): %s",
                               attempt + 1, max_retries, e)
                time.sleep(2 ** attempt)
                continue

            # Proactively slow down if approaching rate limit
            self._throttle_if_near_limit(response.headers)

            if data is None:
                logger.error("No data received from SubX API")
                return None
            if not isinstance(data, dict):
                logger.error("Unexpected SubX API payload type: %s", type(data).__name__)
                return None
            return data

        return None

    def _build_subtitle(self, item, video):
        """Turn one API result into a subtitle object.

        Returns None for an item without an ``id``, since no download URL
        can be built for it.
        """
        item_id = item.get("id")
        if item_id is None:
            logger.debug("Skipping item without id: %s", item.get("title"))
            return None

        item_url = f"{_SUBX_BASE_URL}/api/subtitles/{item_id}"
        page_url = item.get("page_url") or item_url

        # Detect language variant (Spain vs LatAm) from description
        description = item.get("description") or ""
        spain = _SPANISH_RE.search(description.lower()) is not None
        language = Language.fromalpha2("es") if spain else Language("spa", "MX")

        return self.subtitle_class(
            language=language,
            video=video,
            page_link=page_url,
            title=item.get("title"),
            description=description,
            uploader=item.get("uploader_name", "unknown"),
            download_url=f"{item_url}/download",
            season=item.get("season"),
            episode=item.get("episode"),
        )

    def run_query(self, query, video, video_type, season=None, episode=None):
        """
        Execute a search on SubX API.

        Args:
            query: Search term (ignored when the video has an imdb_id)
            video: Video object
            video_type: Video type ('episode' or 'movie')
            season: Season number to filter (optional)
            episode: Episode number to filter (optional)

        Returns:
            List of found subtitles. When an episode is requested and no
            exact match exists, season packs of that season are returned
            instead.

        Raises:
            ConfigurationError: the API rejected the key (401)
            APIThrottled: the API kept answering 429
        """
        # Build search parameters
        params = {
            "limit": 200,
            "video_type": video_type,
        }

        # Prefer IMDb ID for more accurate results (per API docs)
        imdb_id = getattr(video, "imdb_id", None)
        if imdb_id:
            params["imdb_id"] = imdb_id
            logger.debug("Using IMDb ID for search: %s", imdb_id)
        elif query:
            # Fallback to title search
            params["title"] = query
        else:
            logger.error("No search criteria provided (no imdb_id or query)")
            return []

        # Add year if available (helps narrow results)
        year = getattr(video, "year", None)
        if year:
            params["year"] = year

        logger.debug("SubX search params: %s", params)

        data = self._fetch_search_results(params)
        if data is None:
            return []

        items = data.get("items") or []
        logger.debug(
            "SubX API response: total=%s | items=%d",
            data.get("total"),
            len(items),
        )

        subtitles = []
        filtered_count = 0
        season_packs = []  # Season packs kept as fallback

        for item in items:
            item_season = item.get("season")
            item_episode = item.get("episode")

            logger.debug("Item: season=%s, episode=%s, title=%s",
                         item_season, item_episode, item.get("title"))

            # Skip if season doesn't match
            if season is not None and item_season != season:
                logger.debug("Skipping - season mismatch (want %s, got %s)", season, item_season)
                filtered_count += 1
                continue

            # If looking for specific episode
            if episode is not None:
                if item_episode == episode:
                    # Exact episode match - highest priority
                    logger.debug("Found exact episode match")
                elif item_episode is None and item_season == season:
                    # Season pack (episode=None) - save as fallback
                    logger.debug("Found season pack - saving as fallback")
                    season_packs.append(item)
                    continue
                else:
                    # Different episode - skip
                    logger.debug("Skipping - episode mismatch (want %s, got %s)", episode, item_episode)
                    filtered_count += 1
                    continue

            subtitle = self._build_subtitle(item, video)
            if subtitle is not None:
                subtitles.append(subtitle)

        # If no exact episode matches found, use season packs as fallback
        if episode is not None and not subtitles and season_packs:
            logger.info("No exact episode matches, using %d season pack(s) as fallback", len(season_packs))
            for item in season_packs:
                subtitle = self._build_subtitle(item, video)
                if subtitle is not None:
                    subtitles.append(subtitle)

        logger.debug("After filtering: %d subtitles (filtered out %d)", len(subtitles), filtered_count)

        return subtitles

    def _search_episode(self, video, title, skip_title_only=False):
        """Run the episode search cascade for one (sanitized) title.

        Tries the exact episode, then the season, then - unless
        *skip_title_only* is set - the bare series title. Returns the
        first non-empty result, or an empty list.
        """
        season = video.season
        episode = video.episode

        # 1. First try: Exact episode (e.g., "Breaking Bad S03E13")
        logger.debug("Searching for %s S%02dE%02d", title, season, episode)
        subtitles = self.run_query(
            f"{title} S{season:02d}E{episode:02d}",
            video,
            "episode",
            season=season,
            episode=episode,
        )
        if subtitles:
            logger.debug("Found %d subtitles for exact episode", len(subtitles))
            return subtitles

        # 2. Second try: Season only (e.g., "Breaking Bad S03")
        logger.debug("No exact match, trying season: %s S%02d", title, season)
        subtitles = self.run_query(
            f"{title} S{season:02d}",
            video,
            "episode",
            season=season,
            episode=None,  # Accept any episode from this season
        )
        if subtitles:
            logger.debug("Found %d subtitles for season", len(subtitles))
            return subtitles

        if skip_title_only:
            return []

        # 3. Last try: Series title only (fallback for poorly tagged content)
        logger.debug("No season match, trying series title only: %s", title)
        subtitles = self.run_query(
            title,
            video,
            "episode",
            season=season,
            episode=None,
        )
        if subtitles:
            logger.debug("Found %d subtitles from series title search", len(subtitles))
        return subtitles

    def list_subtitles(self, video, languages):
        """
        List available subtitles for video.

        Args:
            video: Video object
            languages: Requested languages (not used for filtering here)

        Returns:
            List of found subtitles
        """
        is_episode = isinstance(video, Episode)
        titles = _collect_titles(video, episode=is_episode, max_alts=3)

        # run_query searches by IMDb ID and ignores the title whenever the
        # video has one, so alternative titles (and the title-only step)
        # would only repeat the very same request.
        has_imdb_id = bool(getattr(video, "imdb_id", None))
        if has_imdb_id:
            titles = titles[:1]

        logger.debug("Titles to search: %s", titles)

        subtitles = []
        for index, raw_title in enumerate(titles):
            if index:
                time.sleep(1)  # Small delay between different title attempts

            if is_episode:
                subtitles = self._search_episode(
                    video,
                    _series_sanitizer(raw_title),
                    skip_title_only=has_imdb_id,
                )
            else:
                logger.debug("Searching for movie: %s", raw_title)
                subtitles = self.run_query(raw_title, video, "movie")
                if subtitles:
                    logger.debug("Found %d subtitles for movie", len(subtitles))

            if subtitles:
                break

        return subtitles

    def download_subtitle(self, subtitle):
        """
        Download subtitle content.

        Fetches the archive from ``subtitle.download_url`` and stores the
        extracted subtitle in ``subtitle.content``.

        Args:
            subtitle: Subtitle object to download

        Raises:
            APIThrottled: the download failed or the archive format is
                not recognized
        """
        try:
            response = self.session.get(
                subtitle.download_url,
                timeout=30,
            )
            response.raise_for_status()
        except Exception as e:
            logger.error("Failed to download subtitle: %s", e)
            raise APIThrottled("Failed to download subtitle") from e

        # Process compressed file
        archive = get_archive_from_bytes(response.content)
        if archive is None:
            raise APIThrottled("Unknown or unsupported archive format")

        # For episodes, pass the episode number so the right file can be
        # picked out of a season pack.
        episode = (
            subtitle.video.episode
            if isinstance(subtitle.video, Episode)
            else None
        )

        subtitle.content = get_subtitle_from_archive(
            archive,
            episode=episode,
        )
|