refactor torrent module to use dependency injection and decouple tv and torrent module for better separation of concerns

This commit is contained in:
maxDorninger
2025-06-08 11:23:18 +02:00
parent b9f955fa3f
commit 1fddf876c8
8 changed files with 255 additions and 244 deletions

View File

@@ -1 +0,0 @@

View File

@@ -4,10 +4,16 @@ from fastapi import Depends
from media_manager.database import DbSessionDependency
from media_manager.torrent.service import TorrentService
from media_manager.torrent.repository import TorrentRepository
def get_torrent_service(db: DbSessionDependency) -> TorrentService:
return TorrentService(db=db)
def get_torrent_repository(db: DbSessionDependency) -> TorrentRepository:
return TorrentRepository(db=db)
TorrentRepositoryDependency = Annotated[TorrentRepository, Depends(get_torrent_repository)]
def get_torrent_service(torrent_repository: TorrentRepositoryDependency) -> TorrentService:
return TorrentService(torrent_repository=torrent_repository)
TorrentServiceDependency = Annotated[TorrentService, Depends(get_torrent_service)]

View File

@@ -1,47 +1,49 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from media_manager.database import DbSessionDependency
from media_manager.torrent.models import Torrent
from media_manager.torrent.schemas import TorrentId, Torrent as TorrentSchema
from media_manager.tv.models import SeasonFile, Show, Season
from media_manager.tv.schemas import SeasonFile as SeasonFileSchema, Show as ShowSchema
def get_seasons_files_of_torrent(
db: Session, torrent_id: TorrentId
) -> list[SeasonFileSchema]:
stmt = select(SeasonFile).where(SeasonFile.torrent_id == torrent_id)
result = db.execute(stmt).scalars().all()
return [SeasonFileSchema.model_validate(season_file) for season_file in result]
class TorrentRepository:
def __init__(self, db: DbSessionDependency):
self.db = db
def get_seasons_files_of_torrent(
self, torrent_id: TorrentId
) -> list[SeasonFileSchema]:
stmt = select(SeasonFile).where(SeasonFile.torrent_id == torrent_id)
result = self.db.execute(stmt).scalars().all()
return [SeasonFileSchema.model_validate(season_file) for season_file in result]
def get_show_of_torrent(db: Session, torrent_id: TorrentId) -> ShowSchema:
stmt = (
select(Show)
.join(SeasonFile.season)
.join(Season.show)
.where(SeasonFile.torrent_id == torrent_id)
)
result = db.execute(stmt).unique().scalar_one_or_none()
return ShowSchema.model_validate(result)
def get_show_of_torrent(self, torrent_id: TorrentId) -> ShowSchema:
stmt = (
select(Show)
.join(SeasonFile.season)
.join(Season.show)
.where(SeasonFile.torrent_id == torrent_id)
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
return ShowSchema.model_validate(result)
def save_torrent(self, torrent: TorrentSchema) -> TorrentSchema:
self.db.merge(Torrent(**torrent.model_dump()))
self.db.commit()
return TorrentSchema.model_validate(torrent)
def save_torrent(db: Session, torrent_schema: TorrentSchema) -> TorrentSchema:
db.merge(Torrent(**torrent_schema.model_dump()))
db.commit()
return TorrentSchema.model_validate(torrent_schema)
def get_all_torrents(self) -> list[TorrentSchema]:
stmt = select(Torrent)
result = self.db.execute(stmt).scalars().all()
return [
TorrentSchema.model_validate(torrent_schema) for torrent_schema in result
]
def get_all_torrents(db: Session) -> list[TorrentSchema]:
stmt = select(Torrent)
result = db.execute(stmt).scalars().all()
def get_torrent_by_id(self, torrent_id: TorrentId) -> TorrentSchema:
return TorrentSchema.model_validate(self.db.get(Torrent, torrent_id))
return [TorrentSchema.model_validate(torrent_schema) for torrent_schema in result]
def get_torrent_by_id(db: Session, torrent_id: TorrentId) -> TorrentSchema:
return TorrentSchema.model_validate(db.get(Torrent, torrent_id))
def delete_torrent(db: Session, torrent_id: TorrentId):
db.delete(db.get(Torrent, torrent_id))
def delete_torrent(self, torrent_id: TorrentId):
self.db.delete(self.db.get(Torrent, torrent_id))

View File

@@ -12,23 +12,16 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy.orm import Session
import media_manager.torrent.repository
import media_manager.tv.repository
import media_manager.tv.service
from media_manager.config import BasicConfig
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.torrent.repository import (
get_seasons_files_of_torrent,
get_show_of_torrent,
save_torrent,
)
from media_manager.torrent.repository import TorrentRepository
from media_manager.torrent.schemas import Torrent, TorrentStatus, TorrentId
from media_manager.torrent.utils import (
list_files_recursively,
get_torrent_filepath,
import_file,
extract_archives,
)
from media_manager.tv.schemas import SeasonFile, Show
from media_manager.tv.schemas import SeasonFile
log = logging.getLogger(__name__)
@@ -65,17 +58,25 @@ class TorrentService:
UNKNOWN_STATE = ("unknown",)
api_client = qbittorrentapi.Client(**TorrentServiceConfig().model_dump())
def __init__(self, db: Session):
def __init__(self, torrent_repository: TorrentRepository):
try:
self.api_client.auth_log_in()
log.info("Successfully logged into qbittorrent")
self.db = db
self.torrent_repository = torrent_repository
except Exception as e:
log.error(f"Failed to log into qbittorrent: {e}")
raise
finally:
self.api_client.auth_log_out()
def get_season_files_of_torrent(self, torrent: Torrent) -> list[SeasonFile]:
"""
Returns all season files of a torrent
:param torrent: the torrent to get the season files of
:return: list of season files
"""
return self.torrent_repository.get_seasons_files_of_torrent(torrent_id=torrent.id)
def download(self, indexer_result: IndexerQueryResult) -> Torrent:
log.info(f"Attempting to download torrent: {indexer_result.title}")
torrent = Torrent(
@@ -136,7 +137,7 @@ class TorrentService:
torrent.status = TorrentStatus.unknown
else:
torrent.status = TorrentStatus.error
save_torrent(db=self.db, torrent_schema=torrent)
self.torrent_repository.save_torrent(torrent=torrent)
return torrent
def cancel_download(self, torrent: Torrent, delete_files: bool = False) -> Torrent:
@@ -170,120 +171,6 @@ class TorrentService:
self.api_client.torrents_resume(torrent_hashes=torrent.hash)
return self.get_torrent_status(torrent=torrent)
def import_torrent(self, torrent: Torrent) -> Torrent:
log.info(f"importing torrent {torrent}")
# get all files, extract archives if necessary and get all files (extracted) files again
all_files = list_files_recursively(path=get_torrent_filepath(torrent=torrent))
log.debug(f"Found {len(all_files)} files downloaded by the torrent")
extract_archives(all_files)
all_files = list_files_recursively(path=get_torrent_filepath(torrent=torrent))
# Filter videos and subtitles from all files
video_files = []
subtitle_files = []
for file in all_files:
file_type = mimetypes.guess_file_type(file)
if file_type[0] is not None:
if file_type[0].startswith("video"):
video_files.append(file)
log.debug(f"File is a video, it will be imported: {file}")
elif file_type[0].startswith("text") and file.suffix == ".srt":
subtitle_files.append(file)
log.debug(f"File is a subtitle, it will be imported: {file}")
else:
log.debug(
f"File is neither a video nor a subtitle, will not be imported: {file}"
)
log.info(
f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files)
)
# Fetch show and season information
show: Show = get_show_of_torrent(db=self.db, torrent_id=torrent.id)
show_file_path = (
BasicConfig().tv_directory
/ f"{show.name} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
)
season_files: list[SeasonFile] = get_seasons_files_of_torrent(
db=self.db, torrent_id=torrent.id
)
log.info(
f"Found {len(season_files)} season files associated with torrent {torrent.title}"
)
# creating directories and hard linking files
for season_file in season_files:
season = media_manager.tv.service.get_season(
db=self.db, season_id=season_file.season_id
)
season_path = show_file_path / Path(f"Season {season.number}")
try:
season_path.mkdir(parents=True)
except FileExistsError:
log.warning(f"Path already exists: {season_path}")
for episode in season.episodes:
episode_file_name = (
f"{show.name} S{season.number:02d}E{episode.number:02d}"
)
if season_file.file_path_suffix != "":
episode_file_name += f" - {season_file.file_path_suffix}"
pattern = (
r".*[.]S0?"
+ str(season.number)
+ r"E0?"
+ str(episode.number)
+ r"[.].*"
)
subtitle_pattern = pattern + r"[.]([A-Za-z]{2})[.]srt"
target_file_name = season_path / episode_file_name
# import subtitles
for subtitle_file in subtitle_files:
log.debug(
f"Searching for pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
regex_result = re.search(subtitle_pattern, subtitle_file.name)
if regex_result:
language_code = regex_result.group(1)
log.debug(
f"Found matching pattern: {subtitle_pattern} in subtitle file: {subtitle_file.name},"
+ f" extracted language code: {language_code}"
)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(
target_file=target_subtitle_file, source_file=subtitle_file
)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
# import episode videos
for file in video_files:
log.debug(
f"Searching for pattern {pattern} in video file: {file.name}"
)
if re.search(pattern, file.name):
log.debug(
f"Found matching pattern: {pattern} in file {file.name}"
)
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
break
else:
log.warning(
f"S{season.number}E{episode.number} in Torrent {torrent.title}'s files not found."
)
torrent.imported = True
return self.get_torrent_status(torrent=torrent)
def get_all_torrents(self) -> list[Torrent]:
return [
self.get_torrent_status(x)
@@ -302,9 +189,8 @@ class TorrentService:
torrent_id=torrent_id, db=self.db
)
if not t.imported:
media_manager.tv.repository.remove_season_files_by_torrent_id(
db=self.db, torrent_id=torrent_id
)
from media_manager.tv.repository import remove_season_files_by_torrent_id
remove_season_files_by_torrent_id(db=self.db, torrent_id=torrent_id)
media_manager.torrent.repository.delete_torrent(db=self.db, torrent_id=t.id)
def import_all_torrents(self) -> list[Torrent]:

View File

@@ -44,3 +44,36 @@ def import_file(target_file: Path, source_file: Path):
if target_file.exists():
target_file.unlink()
target_file.hardlink_to(source_file)
def import_torrent(torrent: Torrent) -> (list[Path], list[Path], list[Path]):
"""
Extracts all files from the torrent download directory, including extracting archives.
Returns a tuple containing: seperated video files, subtitle files, and all files found in the torrent directory.
"""
log.info(f"Importing torrent {torrent}")
all_files = list_files_recursively(path=get_torrent_filepath(torrent=torrent))
log.debug(f"Found {len(all_files)} files downloaded by the torrent")
extract_archives(all_files)
all_files = list_files_recursively(path=get_torrent_filepath(torrent=torrent))
video_files = []
subtitle_files = []
for file in all_files:
file_type, _ = mimetypes.guess_type(str(file))
if file_type is not None:
if file_type.startswith("video"):
video_files.append(file)
log.debug(f"File is a video, it will be imported: {file}")
elif file_type.startswith("text") and Path(file).suffix == ".srt":
subtitle_files.append(file)
log.debug(f"File is a subtitle, it will be imported: {file}")
else:
log.debug(
f"File is neither a video nor a subtitle, will not be imported: {file}"
)
log.info(
f"Found {len(all_files)} files ({len(video_files)} video files, {len(subtitle_files)} subtitle files) for further processing."
)
return video_files, subtitle_files, all_files

View File

@@ -9,6 +9,8 @@ from media_manager.tv.service import TvService
from media_manager.tv.exceptions import NotFoundError
from fastapi import HTTPException
from media_manager.torrent.dependencies import TorrentServiceDependency
def get_tv_repository(db_session: DbSessionDependency) -> TvRepository:
return TvRepository(db_session)
@@ -19,8 +21,9 @@ tv_repository_dep = Annotated[TvRepository, Depends(get_tv_repository)]
def get_tv_service(
tv_repository: tv_repository_dep,
torrent_service: TorrentServiceDependency
) -> TvService:
return TvService(tv_repository)
return TvService(tv_repository, torrent_service)
tv_service_dep = Annotated[TvService, Depends(get_tv_service)]

View File

@@ -1,3 +1,5 @@
import re
from sqlalchemy.orm import Session
import media_manager.indexer.service
@@ -7,7 +9,6 @@ from media_manager.database import SessionLocal
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.indexer.schemas import IndexerQueryResultId
from media_manager.metadataProvider.schemas import MetaDataProviderShowSearchResult
from media_manager.torrent.repository import get_seasons_files_of_torrent
from media_manager.torrent.schemas import Torrent
from media_manager.torrent.service import TorrentService
from media_manager.tv import log
@@ -29,11 +30,18 @@ from media_manager.tv.schemas import (
from media_manager.torrent.schemas import QualityStrings
from media_manager.tv.repository import TvRepository
from media_manager.tv.exceptions import NotFoundError
import mimetypes
import pprint
from pathlib import Path
from media_manager.config import BasicConfig
from media_manager.torrent.repository import TorrentRepository
from media_manager.torrent.utils import import_file, import_torrent
class TvService:
def __init__(self, tv_repository: TvRepository):
def __init__(self, tv_repository: TvRepository, torrent_service: TorrentService):
self.tv_repository = tv_repository
self.torrent_service = torrent_service
def add_show(self, external_id: int, metadata_provider: str) -> Show | None:
"""
@@ -278,10 +286,11 @@ class TvService:
if season_file.torrent_id is None:
return True
else:
torrent_file = media_manager.torrent.repository.get_torrent_by_id(
db=self.tv_repository.db, torrent_id=season_file.torrent_id
torrent_file = self.torrent_service.get_torrent_by_id(
torrent_id=season_file.torrent_id
)
if torrent_file.imported:
print("Servas")
return True
return False
@@ -329,8 +338,8 @@ class TvService:
seasons = self.tv_repository.get_seasons_by_torrent_id(
torrent_id=show_torrent.id
)
season_files = get_seasons_files_of_torrent(
db=self.tv_repository.db, torrent_id=show_torrent.id
season_files = self.torrent_service.get_season_files_of_torrent(
torrent=show_torrent
)
file_path_suffix = season_files[0].file_path_suffix if season_files else ""
season_torrent = RichSeasonTorrent(
@@ -377,9 +386,7 @@ class TvService:
indexer_result = media_manager.indexer.service.get_indexer_query_result(
db=self.tv_repository.db, result_id=public_indexer_result_id
)
show_torrent = TorrentService(db=self.tv_repository.db).download(
indexer_result=indexer_result
)
show_torrent = self.torrent_service.download(indexer_result=indexer_result)
for season_number in indexer_result.season:
season = self.tv_repository.get_season_by_number(
@@ -401,7 +408,7 @@ class TvService:
Download an approved season request.
:param season_request: The season request to download.
:param show_id: The ID of the show.
:param show: The Show object.
:return: True if the download was successful, False otherwise.
:raises ValueError: If the season request is not authorized.
"""
@@ -448,9 +455,7 @@ class TvService:
available_torrents.sort()
torrent = TorrentService(db=self.tv_repository.db).download(
indexer_result=available_torrents[0]
)
torrent = self.torrent_service.download(indexer_result=available_torrents[0])
season_file = SeasonFile(
season_id=season.id,
quality=torrent.quality,
@@ -460,6 +465,93 @@ class TvService:
self.tv_repository.add_season_file(season_file=season_file)
return True
def import_torrent_files(self, torrent: Torrent, show: Show) -> None:
"""
Organizes files from a torrent into the TV directory structure, mapping them to seasons and episodes.
:param torrent: The Torrent object
:param show: The Show object
"""
video_files, subtitle_files = import_torrent(torrent=torrent)
log.info(
f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files)
)
show_file_path = (
BasicConfig().tv_directory
/ f"{show.name} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
)
season_files = self.torrent_service.get_season_files_of_torrent(torrent=torrent)
log.info(
f"Found {len(season_files)} season files associated with torrent {torrent.title}"
)
for season_file in season_files:
season = self.get_season(season_id=season_file.season_id)
season_path = show_file_path / Path(f"Season {season.number}")
try:
season_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
log.warning(f"Could not create path {season_path}: {e}")
for episode in season.episodes:
episode_file_name = (
f"{show.name} S{season.number:02d}E{episode.number:02d}"
)
if season_file.file_path_suffix != "":
episode_file_name += f" - {season_file.file_path_suffix}"
pattern = (
r".*[.]S0?"
+ str(season.number)
+ r"E0?"
+ str(episode.number)
+ r"[.].*"
)
subtitle_pattern = pattern + r"[.]([A-Za-z]{2})[.]srt"
target_file_name = season_path / episode_file_name
# import subtitles
for subtitle_file in subtitle_files:
log.debug(
f"Searching for pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
regex_result = re.search(subtitle_pattern, subtitle_file.name)
if regex_result:
language_code = regex_result.group(1)
log.debug(
f"Found matching pattern: {subtitle_pattern} in subtitle file: {subtitle_file.name},"
+ f" extracted language code: {language_code}"
)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(
target_file=target_subtitle_file, source_file=subtitle_file
)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
# import episode videos
for file in video_files:
log.debug(
f"Searching for pattern {pattern} in video file: {file.name}"
)
if re.search(pattern, file.name):
log.debug(
f"Found matching pattern: {pattern} in file {file.name}"
)
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
break
else:
# TODO: notify admin that no video file was found for this episode
log.warning(
f"S{season.number}E{episode.number} in Torrent {torrent.title}'s files not found."
)
log.info(f"Finished organizing files for torrent {torrent.title}")
def auto_download_all_approved_season_requests() -> None:
"""
@@ -467,8 +559,9 @@ def auto_download_all_approved_season_requests() -> None:
This is a standalone function as it creates its own DB session.
"""
db: Session = SessionLocal()
tv_repository = TvRepository(db)
tv_service = TvService(tv_repository)
tv_repository = TvRepository(db=db)
torrent_service = TorrentService(torrent_repository=TorrentRepository(db=db))
tv_service = TvService(tv_repository=tv_repository, torrent_service=torrent_service)
log.info("Auto downloading all approved season requests")
season_requests = tv_repository.get_season_requests()
@@ -482,8 +575,7 @@ def auto_download_all_approved_season_requests() -> None:
season_id=season_request.season_id
)
if tv_service.download_approved_season_request(
season_request=season_request, show_id=show.id
):
season_request=season_request, show=show):
count += 1
else:
log.warning(

View File

@@ -17,11 +17,16 @@ def mock_tv_repository():
@pytest.fixture
def tv_service(mock_tv_repository):
return TvService(tv_repository=mock_tv_repository)
def mock_torrent_service():
return MagicMock()
def test_add_show(tv_service, mock_tv_repository):
@pytest.fixture
def tv_service(mock_tv_repository, mock_torrent_service):
return TvService(tv_repository=mock_tv_repository, torrent_service=mock_torrent_service)
def test_add_show(tv_service, mock_tv_repository, mock_torrent_service):
external_id = 123
metadata_provider = "tmdb"
show_data = Show(
@@ -49,7 +54,7 @@ def test_add_show(tv_service, mock_tv_repository):
assert result == show_data
def test_add_show_with_invalid_metadata(monkeypatch, tv_service, mock_tv_repository):
def test_add_show_with_invalid_metadata(monkeypatch, tv_service, mock_tv_repository, mock_torrent_service):
external_id = 123
metadata_provider = "tmdb"
# Simulate metadata provider returning None
@@ -63,7 +68,7 @@ def test_add_show_with_invalid_metadata(monkeypatch, tv_service, mock_tv_reposit
assert result is None
def test_check_if_show_exists_by_external_id(tv_service, mock_tv_repository):
def test_check_if_show_exists_by_external_id(tv_service, mock_tv_repository, mock_torrent_service):
external_id = 123
metadata_provider = "tmdb"
mock_tv_repository.get_show_by_external_id.return_value = "show_obj"
@@ -80,7 +85,7 @@ def test_check_if_show_exists_by_external_id(tv_service, mock_tv_repository):
)
def test_check_if_show_exists_by_show_id(tv_service, mock_tv_repository):
def test_check_if_show_exists_by_show_id(tv_service, mock_tv_repository, mock_torrent_service):
show_id = ShowId(uuid.uuid4())
mock_tv_repository.get_show_by_id.return_value = "show_obj"
assert tv_service.check_if_show_exists(show_id=show_id)
@@ -90,19 +95,19 @@ def test_check_if_show_exists_by_show_id(tv_service, mock_tv_repository):
assert not tv_service.check_if_show_exists(show_id=show_id)
def test_check_if_show_exists_with_invalid_uuid(tv_service, mock_tv_repository):
def test_check_if_show_exists_with_invalid_uuid(tv_service, mock_tv_repository, mock_torrent_service):
# Simulate NotFoundError for a random UUID
show_id = uuid.uuid4()
mock_tv_repository.get_show_by_id.side_effect = NotFoundError
assert not tv_service.check_if_show_exists(show_id=show_id)
def test_check_if_show_exists_raises_value_error(tv_service):
def test_check_if_show_exists_raises_value_error(tv_service, mock_torrent_service):
with pytest.raises(ValueError):
tv_service.check_if_show_exists()
def test_add_season_request(tv_service, mock_tv_repository):
def test_add_season_request(tv_service, mock_tv_repository, mock_torrent_service):
season_request = MagicMock()
mock_tv_repository.add_season_request.return_value = season_request
result = tv_service.add_season_request(season_request)
@@ -112,7 +117,7 @@ def test_add_season_request(tv_service, mock_tv_repository):
assert result == season_request
def test_get_season_request_by_id(tv_service, mock_tv_repository):
def test_get_season_request_by_id(tv_service, mock_tv_repository, mock_torrent_service):
season_request_id = MagicMock()
season_request = MagicMock()
mock_tv_repository.get_season_request.return_value = season_request
@@ -123,7 +128,7 @@ def test_get_season_request_by_id(tv_service, mock_tv_repository):
assert result == season_request
def test_update_season_request(tv_service, mock_tv_repository):
def test_update_season_request(tv_service, mock_tv_repository, mock_torrent_service):
season_request = MagicMock()
mock_tv_repository.add_season_request.return_value = season_request
result = tv_service.update_season_request(season_request)
@@ -136,7 +141,7 @@ def test_update_season_request(tv_service, mock_tv_repository):
assert result == season_request
def test_delete_season_request(tv_service, mock_tv_repository):
def test_delete_season_request(tv_service, mock_tv_repository, mock_torrent_service):
season_request_id = MagicMock()
tv_service.delete_season_request(season_request_id)
mock_tv_repository.delete_season_request.assert_called_once_with(
@@ -144,7 +149,7 @@ def test_delete_season_request(tv_service, mock_tv_repository):
)
def test_get_all_shows(tv_service, mock_tv_repository):
def test_get_all_shows(tv_service, mock_tv_repository, mock_torrent_service):
shows = [MagicMock(), MagicMock()]
mock_tv_repository.get_shows.return_value = shows
result = tv_service.get_all_shows()
@@ -152,7 +157,7 @@ def test_get_all_shows(tv_service, mock_tv_repository):
assert result == shows
def test_get_show_by_id(tv_service, mock_tv_repository):
def test_get_show_by_id(tv_service, mock_tv_repository, mock_torrent_service):
show_id = MagicMock()
show = MagicMock()
mock_tv_repository.get_show_by_id.return_value = show
@@ -161,7 +166,7 @@ def test_get_show_by_id(tv_service, mock_tv_repository):
assert result == show
def test_get_show_by_id_not_found(tv_service, mock_tv_repository):
def test_get_show_by_id_not_found(tv_service, mock_tv_repository, mock_torrent_service):
show_id = uuid.uuid4()
mock_tv_repository.get_show_by_id.side_effect = NotFoundError
try:
@@ -172,7 +177,7 @@ def test_get_show_by_id_not_found(tv_service, mock_tv_repository):
assert False
def test_get_show_by_external_id(tv_service, mock_tv_repository):
def test_get_show_by_external_id(tv_service, mock_tv_repository, mock_torrent_service):
external_id = 123
metadata_provider = "tmdb"
show = MagicMock()
@@ -184,7 +189,7 @@ def test_get_show_by_external_id(tv_service, mock_tv_repository):
assert result == show
def test_get_show_by_external_id_not_found(tv_service, mock_tv_repository):
def test_get_show_by_external_id_not_found(tv_service, mock_tv_repository, mock_torrent_service):
external_id = 123
metadata_provider = "tmdb"
mock_tv_repository.get_show_by_external_id.side_effect = NotFoundError
@@ -196,7 +201,7 @@ def test_get_show_by_external_id_not_found(tv_service, mock_tv_repository):
assert False
def test_get_season(tv_service, mock_tv_repository):
def test_get_season(tv_service, mock_tv_repository, mock_torrent_service):
season_id = MagicMock()
season = MagicMock()
mock_tv_repository.get_season.return_value = season
@@ -205,7 +210,7 @@ def test_get_season(tv_service, mock_tv_repository):
assert result == season
def test_get_season_not_found(tv_service, mock_tv_repository):
def test_get_season_not_found(tv_service, mock_tv_repository, mock_torrent_service):
season_id = uuid.uuid4()
mock_tv_repository.get_season.side_effect = NotFoundError
try:
@@ -216,7 +221,7 @@ def test_get_season_not_found(tv_service, mock_tv_repository):
assert False
def test_get_all_season_requests(tv_service, mock_tv_repository):
def test_get_all_season_requests(tv_service, mock_tv_repository, mock_torrent_service):
requests = [MagicMock(), MagicMock()]
mock_tv_repository.get_season_requests.return_value = requests
result = tv_service.get_all_season_requests()
@@ -225,7 +230,7 @@ def test_get_all_season_requests(tv_service, mock_tv_repository):
def test_get_public_season_files_by_season_id_downloaded(
monkeypatch, tv_service, mock_tv_repository
monkeypatch, tv_service, mock_tv_repository, mock_torrent_service
):
season_id = MagicMock()
season_file = MagicMock()
@@ -244,7 +249,7 @@ def test_get_public_season_files_by_season_id_downloaded(
def test_get_public_season_files_by_season_id_not_downloaded(
monkeypatch, tv_service, mock_tv_repository
monkeypatch, tv_service, mock_tv_repository, mock_torrent_service
):
season_id = MagicMock()
season_file = MagicMock()
@@ -262,14 +267,14 @@ def test_get_public_season_files_by_season_id_not_downloaded(
assert result[0].downloaded is False
def test_get_public_season_files_by_season_id_empty(tv_service, mock_tv_repository):
def test_get_public_season_files_by_season_id_empty(tv_service, mock_tv_repository, mock_torrent_service):
season_id = uuid.uuid4()
mock_tv_repository.get_season_files_by_season_id.return_value = []
result = tv_service.get_public_season_files_by_season_id(season_id)
assert result == []
def test_is_season_downloaded_true(monkeypatch, tv_service, mock_tv_repository):
def test_is_season_downloaded_true(monkeypatch, tv_service, mock_tv_repository, mock_torrent_service):
season_id = MagicMock()
season_file = MagicMock()
mock_tv_repository.get_season_files_by_season_id.return_value = [season_file]
@@ -279,7 +284,7 @@ def test_is_season_downloaded_true(monkeypatch, tv_service, mock_tv_repository):
assert tv_service.is_season_downloaded(season_id) is True
def test_is_season_downloaded_false(monkeypatch, tv_service, mock_tv_repository):
def test_is_season_downloaded_false(monkeypatch, tv_service, mock_tv_repository, mock_torrent_service):
season_id = MagicMock()
season_file = MagicMock()
mock_tv_repository.get_season_files_by_season_id.return_value = [season_file]
@@ -289,82 +294,67 @@ def test_is_season_downloaded_false(monkeypatch, tv_service, mock_tv_repository)
assert tv_service.is_season_downloaded(season_id) is False
def test_is_season_downloaded_with_no_files(tv_service, mock_tv_repository):
def test_is_season_downloaded_with_no_files(tv_service, mock_tv_repository, mock_torrent_service):
season_id = uuid.uuid4()
mock_tv_repository.get_season_files_by_season_id.return_value = []
assert tv_service.is_season_downloaded(season_id) is False
def test_season_file_exists_on_file_none(monkeypatch, tv_service):
def test_season_file_exists_on_file_none(monkeypatch, tv_service, mock_torrent_service):
season_file = MagicMock()
season_file.torrent_id = None
assert tv_service.season_file_exists_on_file(season_file) is True
def test_season_file_exists_on_file_imported(monkeypatch, tv_service):
def test_season_file_exists_on_file_imported(monkeypatch, tv_service, mock_torrent_service):
season_file = MagicMock()
season_file.torrent_id = "torrent_id"
torrent_file = MagicMock(imported=True)
monkeypatch.setattr(
"media_manager.torrent.repository.get_torrent_by_id",
lambda db, torrent_id: torrent_file,
)
tv_service.tv_repository.db = MagicMock()
# Patch the repository method on the torrent_service instance
tv_service.torrent_service.torrent_repository.get_torrent_by_id = MagicMock(return_value=torrent_file)
assert tv_service.season_file_exists_on_file(season_file) is True
def test_season_file_exists_on_file_not_imported(monkeypatch, tv_service):
def test_season_file_exists_on_file_not_imported(monkeypatch, tv_service, mock_torrent_service):
season_file = MagicMock()
season_file.torrent_id = "torrent_id"
torrent_file = MagicMock(imported=False)
monkeypatch.setattr(
"media_manager.torrent.repository.get_torrent_by_id",
lambda db, torrent_id: torrent_file,
)
tv_service.tv_repository.db = MagicMock()
torrent_file = MagicMock()
torrent_file.torrent_id = "torrent_id"
torrent_file.imported = False
tv_service.torrent_service.get_torrent_by_id = MagicMock(return_value=torrent_file)
assert tv_service.season_file_exists_on_file(season_file) is False
def test_season_file_exists_on_file_with_none_imported(monkeypatch, tv_service):
def test_season_file_exists_on_file_with_none_imported(monkeypatch, tv_service, mock_torrent_service):
class DummyFile:
def __init__(self):
self.torrent_id = uuid.uuid4()
dummy_file = DummyFile()
# Simulate a torrent object with imported=True
class DummyTorrent:
imported = True
monkeypatch.setattr(
"media_manager.torrent.repository.get_torrent_by_id",
lambda db, torrent_id: DummyTorrent(),
)
tv_service.tv_repository.db = MagicMock()
tv_service.torrent_service.torrent_repository.get_torrent_by_id = MagicMock(return_value=DummyTorrent())
assert tv_service.season_file_exists_on_file(dummy_file) is True
def test_season_file_exists_on_file_with_none_not_imported(monkeypatch, tv_service):
def test_season_file_exists_on_file_with_none_not_imported(monkeypatch, tv_service, mock_torrent_service):
class DummyFile:
def __init__(self):
self.torrent_id = uuid.uuid4()
dummy_file = DummyFile()
# Simulate a torrent object with imported=False
class DummyTorrent:
imported = False
monkeypatch.setattr(
"media_manager.torrent.repository.get_torrent_by_id",
lambda db, torrent_id: DummyTorrent(),
)
tv_service.tv_repository.db = MagicMock()
tv_service.torrent_service.get_torrent_by_id = MagicMock(return_value=DummyTorrent())
assert tv_service.season_file_exists_on_file(dummy_file) is False
def test_get_all_available_torrents_for_a_season_no_override(
tv_service, mock_tv_repository, monkeypatch
tv_service, mock_tv_repository, mock_torrent_service, monkeypatch
):
show_id = ShowId(uuid.uuid4())
season_number = 1
@@ -446,7 +436,7 @@ def test_get_all_available_torrents_for_a_season_no_override(
def test_get_all_available_torrents_for_a_season_with_override(
tv_service, mock_tv_repository, monkeypatch
tv_service, mock_tv_repository, mock_torrent_service, monkeypatch
):
show_id = ShowId(uuid.uuid4())
season_number = 1
@@ -485,7 +475,7 @@ def test_get_all_available_torrents_for_a_season_with_override(
def test_get_all_available_torrents_for_a_season_no_results(
tv_service, mock_tv_repository, monkeypatch
tv_service, mock_tv_repository, mock_torrent_service, monkeypatch
):
show_id = ShowId(uuid.uuid4())
season_number = 1
@@ -509,7 +499,7 @@ def test_get_all_available_torrents_for_a_season_no_results(
assert results == []
def test_search_for_show_no_existing(tv_service, monkeypatch):
def test_search_for_show_no_existing(tv_service, mock_torrent_service, monkeypatch):
query = "Test Show"
metadata_provider = "tmdb"
search_result_item = MetaDataProviderShowSearchResult(
@@ -540,7 +530,7 @@ def test_search_for_show_no_existing(tv_service, monkeypatch):
assert results[0].added is False # Should not be marked as added
def test_search_for_show_with_existing(tv_service, monkeypatch):
def test_search_for_show_with_existing(tv_service, mock_torrent_service, monkeypatch):
query = "Test Show"
metadata_provider = "tmdb"
search_result_item = MetaDataProviderShowSearchResult(
@@ -569,7 +559,7 @@ def test_search_for_show_with_existing(tv_service, monkeypatch):
assert results[0].added is True # Should be marked as added
def test_search_for_show_empty_results(tv_service, monkeypatch):
def test_search_for_show_empty_results(tv_service, mock_torrent_service, monkeypatch):
query = "NonExistent Show"
metadata_provider = "tmdb"
mock_search_show = MagicMock(return_value=[])
@@ -581,7 +571,7 @@ def test_search_for_show_empty_results(tv_service, monkeypatch):
assert results == []
def test_get_popular_shows_none_added(tv_service, monkeypatch):
def test_get_popular_shows_none_added(tv_service, mock_torrent_service, monkeypatch):
metadata_provider = "tmdb"
popular_show1 = MetaDataProviderShowSearchResult(
external_id=123,
@@ -614,7 +604,7 @@ def test_get_popular_shows_none_added(tv_service, monkeypatch):
assert popular_show2 in results
def test_get_popular_shows_all_added(tv_service, monkeypatch):
def test_get_popular_shows_all_added(tv_service, mock_torrent_service, monkeypatch):
metadata_provider = "tmdb"
popular_show1 = MetaDataProviderShowSearchResult(
external_id=123,
@@ -635,7 +625,7 @@ def test_get_popular_shows_all_added(tv_service, monkeypatch):
assert results == []
def test_get_popular_shows_empty_from_provider(tv_service, monkeypatch):
def test_get_popular_shows_empty_from_provider(tv_service, mock_torrent_service, monkeypatch):
metadata_provider = "tmdb"
mock_search_show = MagicMock(return_value=[])
monkeypatch.setattr("media_manager.metadataProvider.search_show", mock_search_show)