Refactor tv and movies (#526)

This PR refactors the movie and tv modules and adds a "common" module
for shared logic.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Dedicated import and metadata services for movies and TV;
completed-torrent detection and import flows.

* **Refactor**
* Shared media schemas, models, repository logic and base services
consolidated; movie/TV services and routes now delegate to specialised
import/metadata services.

* **Bug Fixes**
  * Fixed TV episode-count method name.

* **Chores**
  * Added .DS_Store to ignore list; added module comment.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Maximilian Dorninger
2026-05-07 14:18:29 +02:00
committed by GitHub
parent fcdb9bbe87
commit 25cd4b0724
23 changed files with 1516 additions and 1975 deletions

1
.gitignore vendored
View File

@@ -51,3 +51,4 @@ __pycache__
# MkDocs # MkDocs
site/ site/
.DS_Store

View File

@@ -0,0 +1 @@
# Common base classes for media modules

View File

@@ -0,0 +1,34 @@
from uuid import UUID
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from media_manager.torrent.models import Quality
class MediaMixin:
"""
Mixin for common media fields used by both Movies and TV Shows.
"""
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
library: Mapped[str] = mapped_column(default="Default")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
class MediaFileMixin:
"""
Mixin for common media file fields used by both Movie files and Episode files.
"""
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)

View File

@@ -0,0 +1,175 @@
import logging
from typing import Any, TypeVar
from uuid import UUID
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session
from media_manager.exceptions import ConflictError, NotFoundError
log = logging.getLogger(__name__)
T = TypeVar("T")
S = TypeVar("S")
EntityId = UUID | int | str
class BaseRepository[T, S]:
"""
Base repository providing common CRUD operations for media models.
"""
def __init__(self, db: Session, model: type[T], schema: type[S]) -> None:
self.db = db
self.model = model
self.schema = schema
def get_by_id(self, entity_id: EntityId) -> S:
result = self.db.get(self.model, entity_id)
if not result:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
return self.schema.model_validate(result)
def get_by_external_id(self, external_id: int, metadata_provider: str) -> S:
stmt = select(self.model).where(
self.model.external_id == external_id,
self.model.metadata_provider == metadata_provider,
)
result = self.db.execute(stmt).scalar_one_or_none()
if not result:
msg = f"{self.model.__name__} with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg)
return self.schema.model_validate(result)
def get_all(self) -> list[S]:
stmt = select(self.model)
results = self.db.execute(stmt).scalars().unique().all()
return [self.schema.model_validate(r) for r in results]
def delete(self, entity_id: EntityId) -> None:
obj = self.db.get(self.model, entity_id)
if not obj:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
self.db.delete(obj)
self.db.commit()
def set_library(self, entity_id: EntityId, library: str) -> None:
obj = self.db.get(self.model, entity_id)
if not obj:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
obj.library = library
self.db.commit()
def save_media_base(
self,
media_schema: S,
model_class: type[T],
exclude: set[str] | None = None,
) -> S:
"""
Generic save method for media models.
"""
if exclude is None:
exclude = set()
db_obj = self.db.get(model_class, media_schema.id) if media_schema.id else None
if db_obj:
# Update existing
# Always exclude "id" from updates
update_exclude = exclude | {"id"}
for key, value in media_schema.model_dump(exclude=update_exclude).items():
if hasattr(db_obj, key):
setattr(db_obj, key, value)
else:
# Insert new
db_obj = model_class(**media_schema.model_dump(exclude=exclude))
self.db.add(db_obj)
try:
self.db.commit()
self.db.refresh(db_obj)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error while saving {model_class.__name__}: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while saving {model_class.__name__}")
raise
else:
return self.schema.model_validate(db_obj)
def update_media_attributes_base(
self,
media_id: EntityId,
model_class: type[T],
**attributes: Any, # noqa: ANN401
) -> S:
"""
Generic update method for media attributes.
"""
db_obj = self.db.get(model_class, media_id)
if not db_obj:
msg = f"{model_class.__name__} with id {media_id} not found."
raise NotFoundError(msg)
updated = False
for key, value in attributes.items():
if (
value is not None
and hasattr(db_obj, key)
and getattr(db_obj, key) != value
):
setattr(db_obj, key, value)
updated = True
if updated:
try:
self.db.commit()
self.db.refresh(db_obj)
except SQLAlchemyError:
self.db.rollback()
raise
return self.schema.model_validate(db_obj)
def add_media_file_base(
self, file_schema: S, model_class: type[T], schema_class: type[S]
) -> S:
"""
Generic method to add a media file record.
"""
db_model = model_class(**file_schema.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
except IntegrityError:
self.db.rollback()
raise
except SQLAlchemyError:
self.db.rollback()
raise
else:
return schema_class.model_validate(db_model)
def remove_files_by_torrent_id_base(
self, torrent_id: EntityId, model_class: type[T]
) -> int:
"""
Generic method to remove media files by torrent ID.
"""
try:
stmt = delete(model_class).where(model_class.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
raise
else:
return result.rowcount

View File

@@ -0,0 +1,33 @@
import uuid
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from media_manager.torrent.models import Quality
class BaseMedia(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID = Field(default_factory=uuid.uuid4)
name: str
overview: str
year: int | None
external_id: int
metadata_provider: str
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
class BaseMediaFile(BaseModel):
model_config = ConfigDict(from_attributes=True)
quality: Quality
torrent_id: UUID | None = None
file_path_suffix: str
class PublicMediaFile(BaseMediaFile):
downloaded: bool = False
imported: bool = False

View File

@@ -0,0 +1,256 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, TypeVar
from media_manager.common.repository import BaseRepository
from media_manager.exceptions import InvalidConfigError, NotFoundError
from media_manager.indexer.service import IndexerService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_importable_media_directories,
remove_special_characters,
)
log = logging.getLogger(__name__)
T = TypeVar("T")
S = TypeVar("S")
class BaseMediaService[T, S]:
"""
Base service providing common logic for media modules.
"""
def __init__(
self,
repository: BaseRepository[T, S],
torrent_service: TorrentService,
indexer_service: IndexerService,
notification_service: NotificationService,
) -> None:
self.repository = repository
self.torrent_service = torrent_service
self.indexer_service = indexer_service
self.notification_service = notification_service
def get_all_media(self) -> list[S]:
return self.repository.get_all()
def get_root_directory(
self, media: S, default_dir: Path, libraries: list[Any]
) -> Path:
"""
Determines the root directory for a media item.
"""
if hasattr(media, "library") and media.library:
for library in libraries:
if library.name == media.library:
return Path(library.path) / Path(
remove_special_characters(media.name)
)
return default_dir / Path(remove_special_characters(media.name))
def get_media_root_path(self, media: S) -> Path:
"""
To be implemented by subclasses if they have specific directory logic.
"""
raise NotImplementedError
def notify_import_success(self, media_name: str, media_type: str) -> None:
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title=f"{media_type.capitalize()} Downloaded",
message=f"{media_type.capitalize()} {media_name} has been successfully downloaded and imported.",
)
def notify_import_failure(
self, media_name: str, media_type: str, error_msg: str = ""
) -> None:
if self.notification_service:
msg = f"Failed to import files for {media_type} {media_name}."
if error_msg:
msg += f" Error: {error_msg}"
self.notification_service.send_notification_to_all_providers(
title="Import Failed",
message=msg,
)
def get_import_candidates(
self,
directory: Path,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[
[str, AbstractMetadataProvider], list[MetaDataProviderSearchResult]
],
) -> MediaImportSuggestion:
# Implementation from previous turn
name, _ = self._extract_name_and_year(directory.name)
candidates = search_func(name, metadata_provider)
return MediaImportSuggestion(
directory=str(directory),
candidates=candidates,
)
def _extract_name_and_year(self, directory_name: str) -> tuple[str, int | None]:
import re
match = re.search(r"^(.*)\s\((\d{4})\)$", directory_name)
if match:
return match.group(1), int(match.group(2))
return directory_name, None
def get_importable_media(
self,
root_path: Path,
metadata_provider: AbstractMetadataProvider,
get_candidates_func: Callable[
[Path, AbstractMetadataProvider], MediaImportSuggestion
],
) -> list[MediaImportSuggestion]:
importable_dirs = get_importable_media_directories(root_path)
return [
get_candidates_func(directory, metadata_provider)
for directory in importable_dirs
]
def import_existing_media(
self,
media: S,
source_directory: Path,
import_func: Callable[[S, Path, Callable[[Any], None]], bool],
add_file_record_func: Callable[[Any], Any],
) -> bool:
success = import_func(media, source_directory, add_file_record_func)
if success:
log.info(f"Successfully imported {media.name} from {source_directory}")
return success
def import_all_torrents_base(
self,
get_media_func: Callable[[Any], S],
import_torrent_func: Callable[[Any, S], None],
media_type_name: str,
) -> None:
log.info(f"Importing all torrents for {media_type_name}")
torrents = self.torrent_service.get_completed_torrents()
for t in torrents:
if t.imported:
continue
try:
media = get_media_func(t)
if media:
import_torrent_func(t, media)
except Exception:
log.exception(f"Error importing torrent {t.title}")
log.info(f"Finished importing all torrents for {media_type_name}")
class BaseMetadataService[T, S]:
"""
Base service for metadata operations.
"""
def __init__(self, repository: BaseRepository[T, S]) -> None:
self.repository = repository
def check_if_exists(self, external_id: int, metadata_provider: str) -> bool:
try:
self.repository.get_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
else:
return True
def add_media_base(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider, # noqa: ARG002
get_metadata_func: Callable[..., S],
save_func: Callable[[S], S],
download_poster_func: Callable[[S], bool],
language: str | None = None,
) -> S:
media_with_metadata = get_metadata_func(external_id, language=language)
if not media_with_metadata:
raise NotFoundError
saved_media = save_func(media_with_metadata)
download_poster_func(saved_media)
return saved_media
def search_for_media_base(
self,
query: str,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[[str | None], list[MetaDataProviderSearchResult]],
get_by_external_id_func: Callable[..., S],
) -> list[MetaDataProviderSearchResult]:
results = search_func(query)
for result in results:
if self.check_if_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
try:
media = get_by_external_id_func(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = media.id
except Exception:
log.exception(
f"Unable to find internal ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_media_base(
self,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[[str | None], list[MetaDataProviderSearchResult]],
) -> list[MetaDataProviderSearchResult]:
results = search_func(None)
return [
result
for result in results
if not self.check_if_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def update_all_metadata_base(
self,
get_all_to_update_func: Callable[[], list[S]],
update_single_func: Callable[[S, AbstractMetadataProvider], S | None],
tmdb_provider_class: Callable[[], AbstractMetadataProvider],
tvdb_provider_class: Callable[[], AbstractMetadataProvider],
media_type_name: str,
) -> None:
log.info(f"Updating metadata for all {media_type_name}")
media_list = get_all_to_update_func()
log.info(f"Found {len(media_list)} {media_type_name} to update")
for item in media_list:
try:
if item.metadata_provider == "tmdb":
provider = tmdb_provider_class()
elif item.metadata_provider == "tvdb":
provider = tvdb_provider_class()
else:
log.error(
f"Unsupported provider {item.metadata_provider} for {item.name}"
)
continue
update_single_func(item, provider)
except InvalidConfigError:
log.exception(f"Config error for {item.name}")
except Exception:
log.exception(f"Error updating {item.name}")

View File

@@ -5,6 +5,8 @@ from fastapi import Depends, HTTPException, Path
from media_manager.database import DbSessionDependency from media_manager.database import DbSessionDependency
from media_manager.exceptions import NotFoundError from media_manager.exceptions import NotFoundError
from media_manager.indexer.dependencies import indexer_service_dep from media_manager.indexer.dependencies import indexer_service_dep
from media_manager.movies.importer import MovieImportService
from media_manager.movies.metadata import MovieMetadataService
from media_manager.movies.repository import MovieRepository from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie, MovieId from media_manager.movies.schemas import Movie, MovieId
from media_manager.movies.service import MovieService from media_manager.movies.service import MovieService
@@ -19,17 +21,51 @@ def get_movie_repository(db_session: DbSessionDependency) -> MovieRepository:
movie_repository_dep = Annotated[MovieRepository, Depends(get_movie_repository)] movie_repository_dep = Annotated[MovieRepository, Depends(get_movie_repository)]
def get_movie_metadata_service(
movie_repository: movie_repository_dep,
) -> MovieMetadataService:
return MovieMetadataService(movie_repository=movie_repository)
movie_metadata_service_dep = Annotated[
MovieMetadataService, Depends(get_movie_metadata_service)
]
def get_movie_import_service(
movie_repository: movie_repository_dep,
torrent_service: torrent_service_dep,
notification_service: notification_service_dep,
movie_metadata_service: movie_metadata_service_dep,
) -> MovieImportService:
return MovieImportService(
movie_repository=movie_repository,
torrent_service=torrent_service,
notification_service=notification_service,
movie_metadata_service=movie_metadata_service,
)
movie_import_service_dep = Annotated[
MovieImportService, Depends(get_movie_import_service)
]
def get_movie_service( def get_movie_service(
movie_repository: movie_repository_dep, movie_repository: movie_repository_dep,
torrent_service: torrent_service_dep, torrent_service: torrent_service_dep,
indexer_service: indexer_service_dep, indexer_service: indexer_service_dep,
notification_service: notification_service_dep, notification_service: notification_service_dep,
movie_import_service: movie_import_service_dep,
movie_metadata_service: movie_metadata_service_dep,
) -> MovieService: ) -> MovieService:
return MovieService( return MovieService(
movie_repository=movie_repository, movie_repository=movie_repository,
torrent_service=torrent_service, torrent_service=torrent_service,
indexer_service=indexer_service, indexer_service=indexer_service,
notification_service=notification_service, notification_service=notification_service,
movie_import_service=movie_import_service,
movie_metadata_service=movie_metadata_service,
) )

View File

@@ -0,0 +1,167 @@
import logging
import re
from collections.abc import Callable
from pathlib import Path
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.movies.metadata import MovieMetadataService
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie, MovieFile
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Quality, Torrent
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_files_for_import,
import_file,
remove_special_characters,
)
log = logging.getLogger(__name__)
class MovieImportService(BaseMediaService[Movie, Movie]):
def __init__(
self,
movie_repository: MovieRepository,
torrent_service: TorrentService,
notification_service: NotificationService,
movie_metadata_service: MovieMetadataService,
) -> None:
super().__init__(
repository=movie_repository,
torrent_service=torrent_service,
indexer_service=None, # type: ignore[arg-type]
notification_service=notification_service,
)
self.movie_repository = movie_repository
self.movie_metadata_service = movie_metadata_service
def get_media_root_path(self, media: Movie) -> Path:
misc_config = MediaManagerConfig().misc
return self.get_root_directory(
media=media,
default_dir=misc_config.movie_directory,
libraries=misc_config.movie_libraries,
)
def import_movie(
self,
movie: Movie,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
if not video_files and not subtitle_files:
log.error(f"No video or subtitle files found for movie {movie.name}")
return False
movie_file_name = f"{remove_special_characters(movie.name)} ({movie.year})"
movie_root_path = self.get_media_root_path(media=movie)
if file_path_suffix:
movie_file_name += f" - {file_path_suffix}"
imported_any = False
try:
movie_root_path.mkdir(parents=True, exist_ok=True)
if video_files:
target_video_file = (
movie_root_path / f"{movie_file_name}{video_files[0].suffix}"
)
import_file(target_file=target_video_file, source_file=video_files[0])
imported_any = True
for subtitle_file in subtitle_files:
match = re.search(
r"[. ]([a-z]{2})\.srt$", subtitle_file.name, re.IGNORECASE
)
if match:
lang = match.group(1)
target = movie_root_path / f"{movie_file_name}.{lang}.srt"
import_file(target_file=target, source_file=subtitle_file)
imported_any = True
except Exception:
log.exception(f"Failed to import movie {movie.name}")
return False
else:
return imported_any
def import_torrent_files(self, torrent: Torrent, movie: Movie) -> None:
video_files, subtitle_files, _ = get_files_for_import(torrent=torrent)
if len(video_files) != 1:
self.notify_import_failure(
movie.name,
"movie",
"Multiple video files found. Manual import required.",
)
return
movie_files = self.torrent_service.get_movie_files_of_torrent(torrent=torrent)
if not movie_files:
torrent.imported = False
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_failure(movie.name, "movie")
return
success = [
self.import_movie(movie, video_files, subtitle_files, mf.file_path_suffix)
for mf in movie_files
]
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_success(movie.name, "movie")
else:
self.notify_import_failure(movie.name, "movie")
def get_import_candidates(
self, movie_path: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
return super().get_import_candidates(
directory=movie_path,
metadata_provider=metadata_provider,
search_func=self.movie_metadata_service.search_for_movie,
)
def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
def _logic(m: Movie, path: Path, add_cb: Callable[[MovieFile], None]) -> bool:
v, s, _ = get_files_for_import(directory=path)
res = self.import_movie(m, v, s, "IMPORTED")
if res:
add_cb(
MovieFile(
movie_id=m.id,
file_path_suffix="IMPORTED",
torrent_id=None,
quality=Quality.unknown,
)
)
return res
return self.import_existing_media(
media=movie,
source_directory=source_directory,
import_func=_logic,
add_file_record_func=self.movie_repository.add_movie_file,
)
def get_importable_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
return self.get_importable_media(
root_path=MediaManagerConfig().misc.movie_directory,
metadata_provider=metadata_provider,
get_candidates_func=self.get_import_candidates,
)
def import_all_torrents(self) -> None:
self.import_all_torrents_base(
get_media_func=self.torrent_service.get_movie_of_torrent,
import_torrent_func=self.import_torrent_files,
media_type_name="movie",
)

View File

@@ -0,0 +1,86 @@
import logging
from media_manager.common.service import BaseMetadataService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie
log = logging.getLogger(__name__)
class MovieMetadataService(BaseMetadataService[Movie, Movie]):
def __init__(self, movie_repository: MovieRepository) -> None:
super().__init__(repository=movie_repository)
self.movie_repository = movie_repository
def add_movie(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Movie:
return self.add_media_base(
external_id=external_id,
metadata_provider=metadata_provider,
get_metadata_func=metadata_provider.get_movie_metadata,
save_func=self.movie_repository.save_movie,
download_poster_func=metadata_provider.download_movie_poster_image,
language=language,
)
def search_for_movie(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.search_for_media_base(
query=query,
metadata_provider=metadata_provider,
search_func=metadata_provider.search_movie,
get_by_external_id_func=self.movie_repository.get_movie_by_external_id,
)
def get_popular_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.get_popular_media_base(
metadata_provider=metadata_provider,
search_func=metadata_provider.search_movie,
)
def update_movie_metadata(
self, db_movie: Movie, metadata_provider: AbstractMetadataProvider
) -> Movie | None:
"""
Updates the metadata of a movie.
"""
log.debug(f"Found movie: {db_movie.name} for metadata update.")
fresh_movie_data = metadata_provider.get_movie_metadata(
movie_id=db_movie.external_id, language=db_movie.original_language
)
if not fresh_movie_data:
log.warning(f"Could not fetch fresh metadata for movie: {db_movie.name}")
return None
self.movie_repository.update_movie_attributes(
movie_id=db_movie.id,
name=fresh_movie_data.name,
overview=fresh_movie_data.overview,
year=fresh_movie_data.year,
imdb_id=fresh_movie_data.imdb_id,
)
updated_movie = self.movie_repository.get_movie_by_id(db_movie.id)
metadata_provider.download_movie_poster_image(movie=updated_movie)
return updated_movie
def update_all_metadata(self) -> None:
self.update_all_metadata_base(
get_all_to_update_func=self.movie_repository.get_movies,
update_single_func=self.update_movie_metadata,
tmdb_provider_class=TmdbMetadataProvider,
tvdb_provider_class=TvdbMetadataProvider,
media_type_name="movie",
)

View File

@@ -3,37 +3,21 @@ from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from media_manager.common.models import MediaFileMixin, MediaMixin
from media_manager.database import Base from media_manager.database import Base
from media_manager.torrent.models import Quality
class Movie(Base): class Movie(Base, MediaMixin):
__tablename__ = "movie" __tablename__ = "movie"
__table_args__ = (UniqueConstraint("external_id", "metadata_provider"),) __table_args__ = (UniqueConstraint("external_id", "metadata_provider"),)
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
library: Mapped[str] = mapped_column(default="")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
class MovieFile(Base, MediaFileMixin):
class MovieFile(Base):
__tablename__ = "movie_file" __tablename__ = "movie_file"
__table_args__ = (PrimaryKeyConstraint("movie_id", "file_path_suffix"),) __table_args__ = (PrimaryKeyConstraint("movie_id", "file_path_suffix"),)
movie_id: Mapped[UUID] = mapped_column( movie_id: Mapped[UUID] = mapped_column(
ForeignKey(column="movie.id", ondelete="CASCADE"), ForeignKey(column="movie.id", ondelete="CASCADE"),
) )
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)
torrent = relationship("Torrent", back_populates="movie_files", uselist=False) torrent = relationship("Torrent", back_populates="movie_files", uselist=False)

View File

@@ -1,13 +1,11 @@
import logging import logging
from sqlalchemy import delete, select from sqlalchemy import select
from sqlalchemy.exc import ( from sqlalchemy.exc import SQLAlchemyError
IntegrityError,
SQLAlchemyError,
)
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from media_manager.exceptions import ConflictError, NotFoundError from media_manager.common.repository import BaseRepository
from media_manager.exceptions import NotFoundError
from media_manager.movies.models import Movie, MovieFile from media_manager.movies.models import Movie, MovieFile
from media_manager.movies.schemas import ( from media_manager.movies.schemas import (
Movie as MovieSchema, Movie as MovieSchema,
@@ -27,219 +25,48 @@ from media_manager.torrent.schemas import TorrentId
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class MovieRepository: class MovieRepository(BaseRepository[Movie, MovieSchema]):
""" """
Repository for managing movies in the database. Repository for managing movies in the database.
Provides methods to retrieve, save, and delete movies. Provides methods to retrieve, save, and delete movies.
""" """
def __init__(self, db: Session) -> None: def __init__(self, db: Session) -> None:
self.db = db super().__init__(db, Movie, MovieSchema)
def get_movie_by_id(self, movie_id: MovieId) -> MovieSchema: def get_movie_by_id(self, movie_id: MovieId) -> MovieSchema:
""" return self.get_by_id(entity_id=movie_id)
Retrieve a movie by its ID.
:param movie_id: The ID of the movie to retrieve.
:return: A Movie object if found.
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(Movie).where(Movie.id == movie_id)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error while retrieving movie {movie_id}")
raise
def get_movie_by_external_id( def get_movie_by_external_id(
self, external_id: int, metadata_provider: str self, external_id: int, metadata_provider: str
) -> MovieSchema: ) -> MovieSchema:
""" return self.get_by_external_id(
Retrieve a movie by its external ID. external_id=external_id, metadata_provider=metadata_provider
)
:param external_id: The ID of the movie to retrieve.
:param metadata_provider: The metadata provider associated with the ID.
:return: A Movie object if found.
:raises NotFoundError: If the movie with the given external ID and provider is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Movie)
.where(Movie.external_id == external_id)
.where(Movie.metadata_provider == metadata_provider)
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Movie with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError:
log.exception(
f"Database error while retrieving movie by external_id {external_id}"
)
raise
def get_movies(self) -> list[MovieSchema]: def get_movies(self) -> list[MovieSchema]:
""" return self.get_all()
Retrieve all movies from the database.
:return: A list of Movie objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(Movie)
results = self.db.execute(stmt).scalars().unique().all()
return [MovieSchema.model_validate(movie) for movie in results]
except SQLAlchemyError:
log.exception("Database error while retrieving all movies")
raise
def save_movie(self, movie: MovieSchema) -> MovieSchema:
"""
Save a new movie or update an existing one in the database.
:param movie: The Movie object to save.
:return: The saved Movie object.
:raises ValueError: If a movie with the same primary key already exists (on insert).
:raises SQLAlchemyError: If a database error occurs.
"""
log.debug(f"Attempting to save movie: {movie.name} (ID: {movie.id})")
db_movie = self.db.get(Movie, movie.id) if movie.id else None
if db_movie: # Update existing movie
log.debug(f"Updating existing movie with ID: {movie.id}")
db_movie.external_id = movie.external_id
db_movie.metadata_provider = movie.metadata_provider
db_movie.name = movie.name
db_movie.overview = movie.overview
db_movie.year = movie.year
db_movie.original_language = movie.original_language
db_movie.imdb_id = movie.imdb_id
else: # Insert new movie
log.debug(f"Creating new movie: {movie.name}")
db_movie = Movie(**movie.model_dump())
self.db.add(db_movie)
try:
self.db.commit()
self.db.refresh(db_movie)
log.info(f"Successfully saved movie: {db_movie.name} (ID: {db_movie.id})")
return MovieSchema.model_validate(db_movie)
except IntegrityError as e:
self.db.rollback()
log.exception(f"Integrity error while saving movie {movie.name}")
msg = (
f"Movie with this primary key or unique constraint violation: {e.orig}"
)
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while saving movie {movie.name}")
raise
def delete_movie(self, movie_id: MovieId) -> None: def delete_movie(self, movie_id: MovieId) -> None:
""" self.delete(entity_id=movie_id)
Delete a movie by its ID.
:param movie_id: The ID of the movie to delete.
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
log.debug(f"Attempting to delete movie with id: {movie_id}")
try:
movie = self.db.get(Movie, movie_id)
if not movie:
log.warning(f"Movie with id {movie_id} not found for deletion.")
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
self.db.delete(movie)
self.db.commit()
log.info(f"Successfully deleted movie with id: {movie_id}")
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while deleting movie {movie_id}")
raise
def set_movie_library(self, movie_id: MovieId, library: str) -> None: def set_movie_library(self, movie_id: MovieId, library: str) -> None:
""" self.set_library(entity_id=movie_id, library=library)
Sets the library for a movie.
:param movie_id: The ID of the movie to update. def save_movie(self, movie: MovieSchema) -> MovieSchema:
:param library: The library path to set for the movie. return self.save_media_base(media_schema=movie, model_class=Movie)
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
movie = self.db.get(Movie, movie_id)
if not movie:
msg = f"movie with id {movie_id} not found."
raise NotFoundError(msg)
movie.library = library
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error setting library for movie {movie_id}")
raise
def add_movie_file(self, movie_file: MovieFileSchema) -> MovieFileSchema: def add_movie_file(self, movie_file: MovieFileSchema) -> MovieFileSchema:
""" return self.add_media_file_base(
Adds a movie file record to the database. file_schema=movie_file, model_class=MovieFile, schema_class=MovieFileSchema
)
:param movie_file: The MovieFile object to add.
:return: The added MovieFile object.
:raises IntegrityError: If the record violates constraints.
:raises SQLAlchemyError: If a database error occurs.
"""
db_model = MovieFile(**movie_file.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
return MovieFileSchema.model_validate(db_model)
except IntegrityError:
self.db.rollback()
log.exception("Integrity error while adding movie file")
raise
except SQLAlchemyError:
self.db.rollback()
log.exception("Database error while adding movie file")
raise
def remove_movie_files_by_torrent_id(self, torrent_id: TorrentId) -> int: def remove_movie_files_by_torrent_id(self, torrent_id: TorrentId) -> int:
""" return self.remove_files_by_torrent_id_base(
Removes movie file records associated with a given torrent ID. torrent_id=torrent_id, model_class=MovieFile
)
:param torrent_id: The ID of the torrent whose movie files are to be removed.
:return: The number of movie files removed.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = delete(MovieFile).where(MovieFile.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(
f"Database error removing movie files for torrent_id {torrent_id}"
)
raise
return result.rowcount
def get_movie_files_by_movie_id(self, movie_id: MovieId) -> list[MovieFileSchema]: def get_movie_files_by_movie_id(self, movie_id: MovieId) -> list[MovieFileSchema]:
"""
Retrieve all movie files for a given movie ID.
:param movie_id: The ID of the movie.
:return: A list of MovieFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = select(MovieFile).where(MovieFile.movie_id == movie_id) stmt = select(MovieFile).where(MovieFile.movie_id == movie_id)
results = self.db.execute(stmt).scalars().all() results = self.db.execute(stmt).scalars().all()
@@ -251,13 +78,6 @@ class MovieRepository:
raise raise
def get_torrents_by_movie_id(self, movie_id: MovieId) -> list[MovieTorrentSchema]: def get_torrents_by_movie_id(self, movie_id: MovieId) -> list[MovieTorrentSchema]:
"""
Retrieve all torrents associated with a given movie ID.
:param movie_id: The ID of the movie.
:return: A list of Torrent objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = ( stmt = (
select(Torrent, MovieFile.file_path_suffix) select(Torrent, MovieFile.file_path_suffix)
@@ -278,20 +98,13 @@ class MovieRepository:
usenet=torrent.usenet, usenet=torrent.usenet,
) )
formatted_results.append(movie_torrent) formatted_results.append(movie_torrent)
except SQLAlchemyError: except SQLAlchemyError:
log.exception(f"Database error retrieving torrents for movie_id {movie_id}") log.exception(f"Database error retrieving torrents for movie_id {movie_id}")
raise raise
else:
return formatted_results return formatted_results
def get_all_movies_with_torrents(self) -> list[MovieSchema]: def get_all_movies_with_torrents(self) -> list[MovieSchema]:
"""
Retrieve all movies that are associated with a torrent, ordered alphabetically by movie name.
:return: A list of Movie objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = ( stmt = (
select(Movie) select(Movie)
@@ -307,14 +120,6 @@ class MovieRepository:
raise raise
def get_movie_by_torrent_id(self, torrent_id: TorrentId) -> MovieSchema: def get_movie_by_torrent_id(self, torrent_id: TorrentId) -> MovieSchema:
"""
Retrieve a movie by a torrent ID.
:param torrent_id: The ID of the torrent to retrieve the movie for.
:return: A Movie object.
:raises NotFoundError: If the movie for the given torrent ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = ( stmt = (
select(Movie) select(Movie)
@@ -325,10 +130,11 @@ class MovieRepository:
if not result: if not result:
msg = f"Movie for torrent_id {torrent_id} not found." msg = f"Movie for torrent_id {torrent_id} not found."
raise NotFoundError(msg) raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError: except SQLAlchemyError:
log.exception(f"Database error retrieving movie by torrent_id {torrent_id}") log.exception(f"Database error retrieving movie by torrent_id {torrent_id}")
raise raise
else:
return MovieSchema.model_validate(result)
def update_movie_attributes( def update_movie_attributes(
self, self,
@@ -338,36 +144,11 @@ class MovieRepository:
year: int | None = None, year: int | None = None,
imdb_id: str | None = None, imdb_id: str | None = None,
) -> MovieSchema: ) -> MovieSchema:
""" return self.update_media_attributes_base(
Update attributes of an existing movie. media_id=movie_id,
model_class=Movie,
:param imdb_id: The new IMDb ID for the movie. name=name,
:param movie_id: The ID of the movie to update. overview=overview,
:param name: The new name for the movie. year=year,
:param overview: The new overview for the movie. imdb_id=imdb_id,
:param year: The new year for the movie. )
:return: The updated MovieSchema object.
"""
db_movie = self.db.get(Movie, movie_id)
if not db_movie:
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
updated = False
if name is not None and db_movie.name != name:
db_movie.name = name
updated = True
if overview is not None and db_movie.overview != overview:
db_movie.overview = overview
updated = True
if year is not None and db_movie.year != year:
db_movie.year = year
updated = True
if imdb_id is not None and db_movie.imdb_id != imdb_id:
db_movie.imdb_id = imdb_id
updated = True
if updated:
self.db.commit()
self.db.refresh(db_movie)
return MovieSchema.model_validate(db_movie)

View File

@@ -13,6 +13,8 @@ from media_manager.metadataProvider.dependencies import metadata_provider_dep
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.movies.dependencies import ( from media_manager.movies.dependencies import (
movie_dep, movie_dep,
movie_import_service_dep,
movie_metadata_service_dep,
movie_service_dep, movie_service_dep,
) )
from media_manager.movies.schemas import ( from media_manager.movies.schemas import (
@@ -38,13 +40,13 @@ router = APIRouter()
) )
def search_for_movie( def search_for_movie(
query: str, query: str,
movie_service: movie_service_dep, movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep, metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]: ) -> list[MetaDataProviderSearchResult]:
""" """
Search for a movie on the configured metadata provider. Search for a movie on the configured metadata provider.
""" """
return movie_service.search_for_movie( return movie_metadata_service.search_for_movie(
query=query, metadata_provider=metadata_provider query=query, metadata_provider=metadata_provider
) )
@@ -54,13 +56,15 @@ def search_for_movie(
dependencies=[Depends(current_active_user)], dependencies=[Depends(current_active_user)],
) )
def get_popular_movies( def get_popular_movies(
movie_service: movie_service_dep, movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep, metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]: ) -> list[MetaDataProviderSearchResult]:
""" """
Get a list of recommended/popular movies from the metadata provider. Get a list of recommended/popular movies from the metadata provider.
""" """
return movie_service.get_popular_movies(metadata_provider=metadata_provider) return movie_metadata_service.get_popular_movies(
metadata_provider=metadata_provider
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -74,12 +78,15 @@ def get_popular_movies(
dependencies=[Depends(current_superuser)], dependencies=[Depends(current_superuser)],
) )
def get_all_importable_movies( def get_all_importable_movies(
movie_service: movie_service_dep, metadata_provider: metadata_provider_dep movie_import_service: movie_import_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MediaImportSuggestion]: ) -> list[MediaImportSuggestion]:
""" """
Get a list of unknown movies that were detected in the movie directory and are importable. Get a list of unknown movies that were detected in the movie directory and are importable.
""" """
return movie_service.get_importable_movies(metadata_provider=metadata_provider) return movie_import_service.get_importable_movies(
metadata_provider=metadata_provider
)
@router.post( @router.post(
@@ -88,7 +95,7 @@ def get_all_importable_movies(
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
) )
def import_detected_movie( def import_detected_movie(
movie_service: movie_service_dep, movie: movie_dep, directory: str movie_import_service: movie_import_service_dep, movie: movie_dep, directory: str
) -> None: ) -> None:
""" """
Import a detected movie from the specified directory into the library. Import a detected movie from the specified directory into the library.
@@ -98,7 +105,7 @@ def import_detected_movie(
MediaManagerConfig().misc.movie_directory MediaManagerConfig().misc.movie_directory
): ):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory") raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")
success = movie_service.import_existing_movie( success = movie_import_service.import_existing_movie(
movie=movie, source_directory=source_directory movie=movie, source_directory=source_directory
) )
if not success: if not success:
@@ -133,7 +140,7 @@ def get_all_movies(movie_service: movie_service_dep) -> list[Movie]:
}, },
) )
def add_a_movie( def add_a_movie(
movie_service: movie_service_dep, movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep, metadata_provider: metadata_provider_dep,
movie_id: int, movie_id: int,
language: str | None = None, language: str | None = None,
@@ -142,13 +149,13 @@ def add_a_movie(
Add a new movie to the library. Add a new movie to the library.
""" """
try: try:
movie = movie_service.add_movie( movie = movie_metadata_service.add_movie(
external_id=movie_id, external_id=movie_id,
metadata_provider=metadata_provider, metadata_provider=metadata_provider,
language=language, language=language,
) )
except ConflictError: except ConflictError:
movie = movie_service.get_movie_by_external_id( movie = movie_metadata_service.movie_repository.get_movie_by_external_id(
external_id=movie_id, metadata_provider=metadata_provider.name external_id=movie_id, metadata_provider=metadata_provider.name
) )
if not movie: if not movie:

View File

@@ -4,34 +4,19 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from media_manager.common.schemas import BaseMedia, BaseMediaFile
from media_manager.torrent.models import Quality from media_manager.torrent.models import Quality
from media_manager.torrent.schemas import TorrentId, TorrentStatus from media_manager.torrent.schemas import TorrentId, TorrentStatus
MovieId = typing.NewType("MovieId", UUID) MovieId = typing.NewType("MovieId", UUID)
class Movie(BaseModel): class Movie(BaseMedia):
model_config = ConfigDict(from_attributes=True)
id: MovieId = Field(default_factory=lambda: MovieId(uuid.uuid4())) id: MovieId = Field(default_factory=lambda: MovieId(uuid.uuid4()))
name: str
overview: str
year: int | None
external_id: int
metadata_provider: str
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
class MovieFile(BaseModel): class MovieFile(BaseMediaFile):
model_config = ConfigDict(from_attributes=True)
movie_id: MovieId movie_id: MovieId
file_path_suffix: str
quality: Quality
torrent_id: TorrentId | None = None
class PublicMovieFile(MovieFile): class PublicMovieFile(MovieFile):
@@ -52,7 +37,7 @@ class MovieTorrent(BaseModel):
class PublicMovie(Movie): class PublicMovie(Movie):
downloaded: bool = False downloaded: bool = False
torrents: list[MovieTorrent] = [] torrents: list[MovieTorrent] = Field(default_factory=list)
class RichMovieTorrent(BaseModel): class RichMovieTorrent(BaseModel):

View File

@@ -1,22 +1,16 @@
import re import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import overload
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig from media_manager.config import MediaManagerConfig
from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
from media_manager.indexer.service import IndexerService from media_manager.indexer.service import IndexerService
from media_manager.indexer.utils import evaluate_indexer_query_results from media_manager.indexer.utils import evaluate_indexer_query_results
from media_manager.metadataProvider.abstract_metadata_provider import ( from media_manager.movies.importer import MovieImportService
AbstractMetadataProvider, from media_manager.movies.metadata import MovieMetadataService
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.movies import log
from media_manager.movies.repository import MovieRepository from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import ( from media_manager.movies.schemas import (
Movie, Movie,
@@ -27,58 +21,33 @@ from media_manager.movies.schemas import (
RichMovieTorrent, RichMovieTorrent,
) )
from media_manager.notification.service import NotificationService from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import ( from media_manager.torrent.schemas import (
Quality,
Torrent, Torrent,
TorrentStatus,
) )
from media_manager.torrent.service import TorrentService from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
extract_external_id_from_string, log = logging.getLogger(__name__)
get_files_for_import,
get_importable_media_directories,
import_file,
remove_special_characters,
remove_special_chars_and_parentheses,
)
class MovieService: class MovieService(BaseMediaService[Movie, Movie]):
def __init__( def __init__(
self, self,
movie_repository: MovieRepository, movie_repository: MovieRepository,
torrent_service: TorrentService, torrent_service: TorrentService,
indexer_service: IndexerService, indexer_service: IndexerService,
notification_service: NotificationService, notification_service: NotificationService,
movie_import_service: MovieImportService,
movie_metadata_service: MovieMetadataService,
) -> None: ) -> None:
self.movie_repository = movie_repository super().__init__(
self.torrent_service = torrent_service repository=movie_repository,
self.indexer_service = indexer_service torrent_service=torrent_service,
self.notification_service = notification_service indexer_service=indexer_service,
notification_service=notification_service,
def add_movie(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Movie:
"""
Add a new movie to the database.
:param external_id: The ID of the movie in the metadata provider's system.
:param metadata_provider: The name of the metadata provider.
:param language: Optional language code (ISO 639-1) to fetch metadata in.
"""
movie_with_metadata = metadata_provider.get_movie_metadata(
movie_id=external_id, language=language
) )
if not movie_with_metadata: self.movie_repository = movie_repository
raise NotFoundError self.movie_import_service = movie_import_service
self.movie_metadata_service = movie_metadata_service
saved_movie = self.movie_repository.save_movie(movie=movie_with_metadata)
metadata_provider.download_movie_poster_image(movie=saved_movie)
return saved_movie
def delete_movie( def delete_movie(
self, self,
@@ -121,12 +90,10 @@ class MovieService:
) )
log.info(f"Deleted torrent: {torrent.torrent_title}") log.info(f"Deleted torrent: {torrent.torrent_title}")
except Exception: except Exception:
log.warning( log.exception(f"Failed to delete torrent {torrent.hash}")
f"Failed to delete torrent {torrent.hash}", exc_info=True
)
# Delete from database # Delete from database
self.movie_repository.delete_movie(movie_id=movie.id) self.movie_repository.delete_movie(movie.id)
def get_public_movie_files(self, movie: Movie) -> list[PublicMovieFile]: def get_public_movie_files(self, movie: Movie) -> list[PublicMovieFile]:
""" """
@@ -145,62 +112,6 @@ class MovieService:
result.append(movie_file) result.append(movie_file)
return result return result
@overload
def check_if_movie_exists(
self, *, external_id: int, metadata_provider: str
) -> bool:
"""
Check if a movie exists in the database.
:param external_id: The external ID of the movie.
:param metadata_provider: The metadata provider.
:return: True if the movie exists, False otherwise.
"""
@overload
def check_if_movie_exists(self, *, movie_id: MovieId) -> bool:
"""
Check if a movie exists in the database.
:param movie_id: The ID of the movie.
:return: True if the movie exists, False otherwise.
"""
def check_if_movie_exists(
self,
*,
external_id=None,
metadata_provider=None,
movie_id=None,
) -> bool:
"""
Check if a movie exists in the database.
:param external_id: The external ID of the movie.
:param metadata_provider: The metadata provider.
:param movie_id: The ID of the movie.
:return: True if the movie exists, False otherwise.
:raises ValueError: If neither external ID and metadata provider nor movie ID are provided.
"""
if not (external_id is None or metadata_provider is None):
try:
self.movie_repository.get_movie_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
elif movie_id is not None:
try:
self.movie_repository.get_movie_by_id(movie_id=movie_id)
except NotFoundError:
return False
else:
msg = "Use one of the provided overloads for this function!"
raise ValueError(msg)
return True
def get_all_available_torrents_for_movie( def get_all_available_torrents_for_movie(
self, movie: Movie, search_query_override: str | None = None self, movie: Movie, search_query_override: str | None = None
) -> list[IndexerQueryResult]: ) -> list[IndexerQueryResult]:
@@ -220,63 +131,6 @@ class MovieService:
is_tv=False, query_results=torrents, media=movie is_tv=False, query_results=torrents, media=movie
) )
def get_all_movies(self) -> list[Movie]:
"""
Get all movies.
:return: A list of all movies.
"""
return self.movie_repository.get_movies()
def search_for_movie(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Search for movies using a given query.
:param query: The search query.
:param metadata_provider: The metadata provider to search.
:return: A list of metadata provider movie search results.
"""
results = metadata_provider.search_movie(query)
for result in results:
if self.check_if_movie_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
# Fetch the internal movie ID.
try:
movie = self.movie_repository.get_movie_by_external_id(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = movie.id
except Exception:
log.error(
f"Unable to find internal movie ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Get popular movies from a given metadata provider.
:param metadata_provider: The metadata provider to use.
:return: A list of metadata provider movie search results.
"""
results = metadata_provider.search_movie()
return [
result
for result in results
if not self.check_if_movie_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def get_public_movie_by_id(self, movie: Movie) -> PublicMovie: def get_public_movie_by_id(self, movie: Movie) -> PublicMovie:
""" """
Get a public movie from a Movie object. Get a public movie from a Movie object.
@@ -297,7 +151,7 @@ class MovieService:
:param movie_id: The ID of the movie. :param movie_id: The ID of the movie.
:return: The movie. :return: The movie.
""" """
return self.movie_repository.get_movie_by_id(movie_id=movie_id) return self.movie_repository.get_movie_by_id(movie_id)
def is_movie_downloaded(self, movie: Movie) -> bool: def is_movie_downloaded(self, movie: Movie) -> bool:
""" """
@@ -326,9 +180,7 @@ class MovieService:
torrent_file = self.torrent_service.get_torrent_by_id( torrent_file = self.torrent_service.get_torrent_by_id(
torrent_id=movie_file.torrent_id torrent_id=movie_file.torrent_id
) )
if torrent_file.imported: return bool(torrent_file.imported)
return True
return False
def get_movie_by_external_id( def get_movie_by_external_id(
self, external_id: int, metadata_provider: str self, external_id: int, metadata_provider: str
@@ -345,7 +197,13 @@ class MovieService:
) )
def set_movie_library(self, movie: Movie, library: str) -> None: def set_movie_library(self, movie: Movie, library: str) -> None:
self.movie_repository.set_movie_library(movie_id=movie.id, library=library) self.movie_repository.set_movie_library(movie.id, library)
def get_all_movies(self) -> list[Movie]:
"""
Get all movies in the library.
"""
return self.get_all_media()
def get_torrents_for_movie(self, movie: Movie) -> RichMovieTorrent: def get_torrents_for_movie(self, movie: Movie) -> RichMovieTorrent:
""" """
@@ -418,288 +276,20 @@ class MovieService:
def get_movie_root_path(self, movie: Movie) -> Path: def get_movie_root_path(self, movie: Movie) -> Path:
misc_config = MediaManagerConfig().misc misc_config = MediaManagerConfig().misc
movie_file_path = ( return self.get_root_directory(
misc_config.movie_directory media=movie,
/ f"{remove_special_characters(movie.name)} ({movie.year}) [{movie.metadata_provider}id-{movie.external_id}]" default_dir=misc_config.movie_directory,
libraries=misc_config.movie_libraries,
) )
log.debug(
f"Movie {movie.name} without special characters: {remove_special_characters(movie.name)}"
)
if movie.library != "Default":
for library in misc_config.movie_libraries:
if library.name == movie.library:
log.debug(f"Using library {library.name} for movie {movie.name}")
return (
Path(library.path)
/ f"{remove_special_characters(movie.name)} ({movie.year}) [{movie.metadata_provider}id-{movie.external_id}]"
)
else:
log.warning(
f"Library {movie.library} not found in config, using default library"
)
return movie_file_path
def import_movie(
self,
movie: Movie,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
movie_file_name = f"{remove_special_characters(movie.name)} ({movie.year})"
movie_root_path = self.get_movie_root_path(movie=movie)
success: bool = False
if file_path_suffix != "":
movie_file_name += f" - {file_path_suffix}"
try:
movie_root_path.mkdir(parents=True, exist_ok=True)
except Exception:
log.exception("Failed to create directory {movie_root_path}")
return False
# import movie video
if video_files:
target_video_file = (
movie_root_path / f"{movie_file_name}{video_files[0].suffix}"
)
import_file(target_file=target_video_file, source_file=video_files[0])
success = True
# import subtitles
for subtitle_file in subtitle_files:
language_code_match = re.search(
r"[. ]([a-z]{2})\.srt$", subtitle_file.name, re.IGNORECASE
)
if not language_code_match:
log.warning(
f"Subtitle file {subtitle_file.name} does not match expected format, can't extract language code, skipping."
)
continue
language_code = language_code_match.group(1)
target_subtitle_file = (
movie_root_path / f"{movie_file_name}.{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
return success
def import_torrent_files(self, torrent: Torrent, movie: Movie) -> None:
"""
Organizes files from a torrent into the movie directory structure.
:param torrent: The Torrent object
:param movie: The Movie object
"""
video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent)
if len(video_files) != 1:
# Send notification about multiple video files found
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Manual Import Required",
message=f"Multiple video files found for movie {movie.name}. Please import manually.",
)
log.error(
f"Found {len(video_files)} video files for movie {movie.name}, expected 1. Skipping auto import."
)
return
log.debug(
f"Importing these {len(video_files)} video files and {len(subtitle_files)} subtitle files"
)
movie_files: list[MovieFile] = self.torrent_service.get_movie_files_of_torrent(
torrent=torrent
)
log.info(
f"Found {len(movie_files)} movie files associated with torrent {torrent.title}"
)
success = [
self.import_movie(
movie, video_files, subtitle_files, movie_file.file_path_suffix
)
for movie_file in movie_files
]
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Movie Downloaded",
message=f"Movie {movie.name} has been successfully downloaded and imported.",
)
else:
log.error(
f"Failed to import files for torrent {torrent.title}. Check logs for details."
)
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Import Failed",
message=f"Failed to import files for movie {movie.name}. Please check logs.",
)
log.info(f"Finished importing files for torrent {torrent.title}")
def get_import_candidates(
self, movie: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_movie(
query=remove_special_chars_and_parentheses(movie.name),
metadata_provider=metadata_provider,
)
import_candidates = MediaImportSuggestion(
directory=movie,
candidates=search_result,
)
log.debug(
f"Found {len(search_result)} candidates for {movie.name} in {movie.parent}"
)
return import_candidates
def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.exception(f"Failed to rename {source_directory} to {new_source_path}")
raise RenameError from e
video_files, subtitle_files, _all_files = get_files_for_import(
directory=new_source_path
)
success = self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
)
if success:
self.movie_repository.add_movie_file(
MovieFile(
movie_id=movie.id,
file_path_suffix="IMPORTED",
torrent_id=None,
quality=Quality.unknown,
)
)
return success
def update_movie_metadata(
self, db_movie: Movie, metadata_provider: AbstractMetadataProvider
) -> Movie | None:
"""
Updates the metadata of a movie.
:param metadata_provider: The metadata provider object to fetch fresh data from.
:param db_movie: The Movie to update
:return: The updated Movie object, or None if the movie is not found or an error occurs.
"""
log.debug(f"Found movie: {db_movie.name} for metadata update.")
# Use stored original_language preference for metadata fetching
fresh_movie_data = metadata_provider.get_movie_metadata(
movie_id=db_movie.external_id, language=db_movie.original_language
)
if not fresh_movie_data:
log.warning(
f"Could not fetch fresh metadata for movie: {db_movie.name} ({db_movie.year})"
)
return None
log.debug(f"Fetched fresh metadata for movie: {fresh_movie_data.name}")
self.movie_repository.update_movie_attributes(
movie_id=db_movie.id,
name=fresh_movie_data.name,
overview=fresh_movie_data.overview,
year=fresh_movie_data.year,
imdb_id=fresh_movie_data.imdb_id,
)
updated_movie = self.movie_repository.get_movie_by_id(movie_id=db_movie.id)
log.info(
f"Successfully updated metadata for movie: {db_movie.name} ({db_movie.year})"
)
metadata_provider.download_movie_poster_image(movie=updated_movie)
return updated_movie
def get_importable_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
movie_root_path = MediaManagerConfig().misc.movie_directory
importable_movies: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(movie_root_path)
for movie_dir in candidate_dirs:
metadata, external_id = extract_external_id_from_string(movie_dir.name)
if metadata is not None and external_id is not None:
try:
self.movie_repository.get_movie_by_external_id(
external_id=external_id, metadata_provider=metadata
)
log.debug(
f"Movie {movie_dir.name} already exists in the database, skipping."
)
continue
except NotFoundError:
log.debug(
f"Movie {movie_dir.name} not found in database, checking for import candidates."
)
import_candidates = self.get_import_candidates(
movie=movie_dir, metadata_provider=metadata_provider
)
importable_movies.append(import_candidates)
log.debug(f"Found {len(importable_movies)} importable movies.")
return importable_movies
def import_all_torrents(self) -> None: def import_all_torrents(self) -> None:
log.info("Importing all torrents") """
torrents = self.torrent_service.get_all_torrents() Delegate to MovieImportService.
log.info("Found %d torrents to import", len(torrents)) """
for t in torrents: self.movie_import_service.import_all_torrents()
try:
if not t.imported and t.status == TorrentStatus.finished:
movie = self.torrent_service.get_movie_of_torrent(torrent=t)
if movie is None:
log.warning(
f"torrent {t.title} is not a movie torrent, skipping import."
)
continue
self.import_torrent_files(torrent=t, movie=movie)
except RuntimeError:
log.exception(f"Failed to import torrent {t.title}")
log.info("Finished importing all torrents")
def update_all_metadata(self) -> None: def update_all_metadata(self) -> None:
"""Updates the metadata of all movies.""" """
log.info("Updating metadata for all movies") Delegate to MovieMetadataService.
movies = self.movie_repository.get_movies() """
log.info(f"Found {len(movies)} movies to update") self.movie_metadata_service.update_all_metadata()
for movie in movies:
try:
if movie.metadata_provider == "tmdb":
metadata_provider = TmdbMetadataProvider()
elif movie.metadata_provider == "tvdb":
metadata_provider = TvdbMetadataProvider()
else:
log.error(
f"Unsupported metadata provider {movie.metadata_provider} for movie {movie.name}, skipping update."
)
continue
except InvalidConfigError:
log.exception(
f"Error initializing metadata provider {movie.metadata_provider} for movie {movie.name}",
)
continue
self.update_movie_metadata(
db_movie=movie, metadata_provider=metadata_provider
)

View File

@@ -4,7 +4,7 @@ from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.movies.schemas import Movie, MovieFile from media_manager.movies.schemas import Movie, MovieFile
from media_manager.torrent.manager import DownloadManager from media_manager.torrent.manager import DownloadManager
from media_manager.torrent.repository import TorrentRepository from media_manager.torrent.repository import TorrentRepository
from media_manager.torrent.schemas import Torrent, TorrentId from media_manager.torrent.schemas import Torrent, TorrentId, TorrentStatus
from media_manager.tv.schemas import EpisodeFile, Show from media_manager.tv.schemas import EpisodeFile, Show
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -96,6 +96,13 @@ class TorrentService:
log.exception(f"Error fetching status for torrent {x.title}") log.exception(f"Error fetching status for torrent {x.title}")
return torrents return torrents
def get_completed_torrents(self) -> list[Torrent]:
return [
t
for t in self.get_all_torrents()
if t.status == TorrentStatus.finished and not t.imported
]
def get_torrent_by_id(self, torrent_id: TorrentId) -> Torrent: def get_torrent_by_id(self, torrent_id: TorrentId) -> Torrent:
return self.get_torrent_status( return self.get_torrent_status(
self.torrent_repository.get_torrent_by_id(torrent_id=torrent_id) self.torrent_repository.get_torrent_by_id(torrent_id=torrent_id)

View File

@@ -7,6 +7,8 @@ from media_manager.exceptions import NotFoundError
from media_manager.indexer.dependencies import indexer_service_dep from media_manager.indexer.dependencies import indexer_service_dep
from media_manager.notification.dependencies import notification_service_dep from media_manager.notification.dependencies import notification_service_dep
from media_manager.torrent.dependencies import torrent_service_dep from media_manager.torrent.dependencies import torrent_service_dep
from media_manager.tv.importer import TvImportService
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import Season, SeasonId, Show, ShowId from media_manager.tv.schemas import Season, SeasonId, Show, ShowId
from media_manager.tv.service import TvService from media_manager.tv.service import TvService
@@ -19,17 +21,47 @@ def get_tv_repository(db_session: DbSessionDependency) -> TvRepository:
tv_repository_dep = Annotated[TvRepository, Depends(get_tv_repository)] tv_repository_dep = Annotated[TvRepository, Depends(get_tv_repository)]
def get_tv_metadata_service(
tv_repository: tv_repository_dep,
) -> TvMetadataService:
return TvMetadataService(tv_repository=tv_repository)
tv_metadata_service_dep = Annotated[TvMetadataService, Depends(get_tv_metadata_service)]
def get_tv_import_service(
tv_repository: tv_repository_dep,
torrent_service: torrent_service_dep,
notification_service: notification_service_dep,
tv_metadata_service: tv_metadata_service_dep,
) -> TvImportService:
return TvImportService(
tv_repository=tv_repository,
torrent_service=torrent_service,
notification_service=notification_service,
tv_metadata_service=tv_metadata_service,
)
tv_import_service_dep = Annotated[TvImportService, Depends(get_tv_import_service)]
def get_tv_service( def get_tv_service(
tv_repository: tv_repository_dep, tv_repository: tv_repository_dep,
torrent_service: torrent_service_dep, torrent_service: torrent_service_dep,
indexer_service: indexer_service_dep, indexer_service: indexer_service_dep,
notification_service: notification_service_dep, notification_service: notification_service_dep,
tv_import_service: tv_import_service_dep,
tv_metadata_service: tv_metadata_service_dep,
) -> TvService: ) -> TvService:
return TvService( return TvService(
tv_repository=tv_repository, tv_repository=tv_repository,
torrent_service=torrent_service, torrent_service=torrent_service,
indexer_service=indexer_service, indexer_service=indexer_service,
notification_service=notification_service, notification_service=notification_service,
tv_import_service=tv_import_service,
tv_metadata_service=tv_metadata_service,
) )

View File

@@ -0,0 +1,154 @@
import logging
import re
from collections.abc import Callable
from pathlib import Path
from typing import Any
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Quality, Torrent
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_files_for_import,
get_torrent_filepath,
import_file,
remove_special_characters,
)
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import EpisodeFile, Show
log = logging.getLogger(__name__)
class TvImportService(BaseMediaService[Show, Show]):
def __init__(
self,
tv_repository: TvRepository,
torrent_service: TorrentService,
notification_service: NotificationService,
tv_metadata_service: TvMetadataService,
) -> None:
super().__init__(
repository=tv_repository,
torrent_service=torrent_service,
indexer_service=None, # type: ignore[arg-type]
notification_service=notification_service,
)
self.tv_repository = tv_repository
self.tv_metadata_service = tv_metadata_service
def get_media_root_path(self, media: Show) -> Path:
misc_config = MediaManagerConfig().misc
return self.get_root_directory(
media=media,
default_dir=misc_config.tv_directory,
libraries=misc_config.tv_libraries,
)
def import_tv_show(
self,
show: Show,
source_directory: Path,
quality: Quality = Quality.unknown,
torrent_id: str | None = None,
file_path_suffix: str = "",
) -> bool:
video_files, _, _ = get_files_for_import(directory=source_directory)
if not video_files:
return False
any_imported = False
for video_file in video_files:
# Simple heuristic for season/episode from filename
match = re.search(r"S(\d+)E(\d+)", video_file.name, re.IGNORECASE)
if match:
s_num, e_num = int(match.group(1)), int(match.group(2))
season_dir = self.get_media_root_path(show) / f"Season {s_num}"
season_dir.mkdir(parents=True, exist_ok=True)
target_name = (
f"{remove_special_characters(show.name)} - S{s_num:02d}E{e_num:02d}"
)
if file_path_suffix:
target_name += f" - {file_path_suffix}"
target_file = season_dir / f"{target_name}{video_file.suffix}"
import_file(target_file=target_file, source_file=video_file)
any_imported = True
# Update DB
try:
season = self.tv_repository.get_season_by_number(s_num, show.id)
episode = next(
(e for e in season.episodes if e.number == e_num), None
)
if episode:
self.tv_repository.add_episode_file(
EpisodeFile(
episode_id=episode.id,
quality=quality,
torrent_id=torrent_id,
file_path_suffix=file_path_suffix,
)
)
except Exception:
log.exception(f"Could not update DB for {video_file.name}")
return any_imported
def import_torrent_files(self, torrent: Torrent, show: Show) -> None:
success = self.import_tv_show(
show=show,
source_directory=get_torrent_filepath(torrent),
quality=torrent.quality,
torrent_id=torrent.id,
)
if success:
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_success(show.name, "TV show")
else:
self.notify_import_failure(show.name, "TV show")
def get_import_candidates(
self, tv_path: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
return super().get_import_candidates(
directory=tv_path,
metadata_provider=metadata_provider,
search_func=self.tv_metadata_service.search_for_show,
)
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> bool:
def _logic(s: Show, path: Path, _: Callable[[Any], Any]) -> bool:
return self.import_tv_show(s, path, file_path_suffix="IMPORTED")
return self.import_existing_media(
media=tv_show,
source_directory=source_directory,
import_func=_logic,
add_file_record_func=lambda _: (
None
), # Handled inside import_tv_show for now
)
def get_importable_tv_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
return self.get_importable_media(
root_path=MediaManagerConfig().misc.tv_directory,
metadata_provider=metadata_provider,
get_candidates_func=self.get_import_candidates,
)
def import_all_torrents(self) -> None:
self.import_all_torrents_base(
get_media_func=self.torrent_service.get_show_of_torrent,
import_torrent_func=self.import_torrent_files,
media_type_name="tv show",
)

View File

@@ -0,0 +1,151 @@
import logging
from media_manager.common.service import BaseMetadataService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import Episode, EpisodeId, Season, SeasonId, Show
log = logging.getLogger(__name__)
class TvMetadataService(BaseMetadataService[Show, Show]):
def __init__(self, tv_repository: TvRepository) -> None:
super().__init__(repository=tv_repository)
self.tv_repository = tv_repository
def add_show(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Show:
return self.add_media_base(
external_id=external_id,
metadata_provider=metadata_provider,
get_metadata_func=metadata_provider.get_show_metadata,
save_func=self.tv_repository.save_show,
download_poster_func=metadata_provider.download_show_poster_image,
language=language,
)
def search_for_show(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.search_for_media_base(
query=query,
metadata_provider=metadata_provider,
search_func=metadata_provider.search_show,
get_by_external_id_func=self.tv_repository.get_show_by_external_id,
)
def get_popular_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.get_popular_media_base(
metadata_provider=metadata_provider,
search_func=metadata_provider.search_show,
)
def update_show_metadata(
self, db_show: Show, metadata_provider: AbstractMetadataProvider
) -> Show | None:
"""
Updates the metadata of a show.
"""
log.debug(f"Found show: {db_show.name} for metadata update.")
fresh_show_data = metadata_provider.get_show_metadata(
show_id=db_show.external_id, language=db_show.original_language
)
if not fresh_show_data:
log.warning(f"Could not fetch fresh metadata for show {db_show.name}")
return db_show
self.tv_repository.update_show_attributes(
show_id=db_show.id,
name=fresh_show_data.name,
overview=fresh_show_data.overview,
year=fresh_show_data.year,
ended=fresh_show_data.ended,
imdb_id=fresh_show_data.imdb_id,
continuous_download=db_show.continuous_download
if fresh_show_data.ended is False
else False,
)
existing_season_external_ids = {s.external_id: s for s in db_show.seasons}
for fresh_season_data in fresh_show_data.seasons:
if fresh_season_data.external_id in existing_season_external_ids:
existing_season = existing_season_external_ids[
fresh_season_data.external_id
]
self.tv_repository.update_season_attributes(
season_id=existing_season.id,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
)
existing_episode_external_ids = {
ep.external_id: ep for ep in existing_season.episodes
}
for fresh_episode_data in fresh_season_data.episodes:
if fresh_episode_data.external_id in existing_episode_external_ids:
existing_episode = existing_episode_external_ids[
fresh_episode_data.external_id
]
self.tv_repository.update_episode_attributes(
episode_id=existing_episode.id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
else:
episode_schema = Episode(
id=EpisodeId(fresh_episode_data.id),
number=fresh_episode_data.number,
external_id=fresh_episode_data.external_id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
self.tv_repository.add_episode_to_season(
season_id=existing_season.id, episode_data=episode_schema
)
else:
season_schema = Season(
id=SeasonId(fresh_season_data.id),
number=fresh_season_data.number,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
external_id=fresh_season_data.external_id,
episodes=[
Episode(
id=EpisodeId(ep_data.id),
number=ep_data.number,
external_id=ep_data.external_id,
title=ep_data.title,
overview=ep_data.overview,
)
for ep_data in fresh_season_data.episodes
],
)
self.tv_repository.add_season_to_show(
show_id=db_show.id, season_data=season_schema
)
updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id)
metadata_provider.download_show_poster_image(show=updated_show)
return updated_show
def update_all_non_ended_shows_metadata(self) -> None:
def get_non_ended_shows() -> list[Show]:
return [show for show in self.tv_repository.get_shows() if not show.ended]
self.update_all_metadata_base(
get_all_to_update_func=get_non_ended_shows,
update_single_func=self.update_show_metadata,
tmdb_provider_class=TmdbMetadataProvider,
tvdb_provider_class=TvdbMetadataProvider,
media_type_name="tv show",
)

View File

@@ -3,26 +3,16 @@ from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from media_manager.common.models import MediaFileMixin, MediaMixin
from media_manager.database import Base from media_manager.database import Base
from media_manager.torrent.models import Quality
class Show(Base): class Show(Base, MediaMixin):
__tablename__ = "show" __tablename__ = "show"
__table_args__ = (UniqueConstraint("external_id", "metadata_provider"),) __table_args__ = (UniqueConstraint("external_id", "metadata_provider"),)
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
ended: Mapped[bool] = mapped_column(default=False) ended: Mapped[bool] = mapped_column(default=False)
continuous_download: Mapped[bool] = mapped_column(default=False) continuous_download: Mapped[bool] = mapped_column(default=False)
library: Mapped[str] = mapped_column(default="")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
seasons: Mapped[list["Season"]] = relationship( seasons: Mapped[list["Season"]] = relationship(
back_populates="show", cascade="all, delete" back_populates="show", cascade="all, delete"
@@ -66,17 +56,12 @@ class Episode(Base):
) )
class EpisodeFile(Base): class EpisodeFile(Base, MediaFileMixin):
__tablename__ = "episode_file" __tablename__ = "episode_file"
__table_args__ = (PrimaryKeyConstraint("episode_id", "file_path_suffix"),) __table_args__ = (PrimaryKeyConstraint("episode_id", "file_path_suffix"),)
episode_id: Mapped[UUID] = mapped_column( episode_id: Mapped[UUID] = mapped_column(
ForeignKey(column="episode.id", ondelete="CASCADE"), ForeignKey(column="episode.id", ondelete="CASCADE"),
) )
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent = relationship("Torrent", back_populates="episode_files", uselist=False) torrent = relationship("Torrent", back_populates="episode_files", uselist=False)
episode = relationship("Episode", back_populates="episode_files", uselist=False) episode = relationship("Episode", back_populates="episode_files", uselist=False)

View File

@@ -1,9 +1,10 @@
from sqlalchemy import delete, func, select from sqlalchemy import distinct, func, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session, joinedload from sqlalchemy.orm import Session, joinedload
from media_manager.common.repository import BaseRepository
from media_manager.exceptions import ConflictError, NotFoundError from media_manager.exceptions import ConflictError, NotFoundError
from media_manager.torrent.models import Torrent from media_manager.torrent.models import Torrent as TorrentModel
from media_manager.torrent.schemas import Torrent as TorrentSchema from media_manager.torrent.schemas import Torrent as TorrentSchema
from media_manager.torrent.schemas import TorrentId from media_manager.torrent.schemas import TorrentId
from media_manager.tv import log from media_manager.tv import log
@@ -21,24 +22,16 @@ from media_manager.tv.schemas import Season as SeasonSchema
from media_manager.tv.schemas import Show as ShowSchema from media_manager.tv.schemas import Show as ShowSchema
class TvRepository: class TvRepository(BaseRepository[Show, ShowSchema]):
""" """
Repository for managing TV shows, seasons, and episodes in the database. Repository for managing TV shows, seasons, and episodes in the database.
Provides methods to retrieve, save, and delete shows and seasons. Provides methods to retrieve, save, and delete shows and seasons.
""" """
def __init__(self, db: Session) -> None: def __init__(self, db: Session) -> None:
self.db = db super().__init__(db, Show, ShowSchema)
def get_show_by_id(self, show_id: ShowId) -> ShowSchema: def get_show_by_id(self, show_id: ShowId) -> ShowSchema:
"""
Retrieve a show by its ID, including seasons and episodes.
:param show_id: The ID of the show to retrieve.
:return: A Show object if found.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = ( stmt = (
select(Show) select(Show)
@@ -49,23 +42,15 @@ class TvRepository:
if not result: if not result:
msg = f"Show with id {show_id} not found." msg = f"Show with id {show_id} not found."
raise NotFoundError(msg) raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError: except SQLAlchemyError:
log.exception(f"Database error while retrieving show {show_id}") log.exception(f"Database error while retrieving show {show_id}")
raise raise
else:
return ShowSchema.model_validate(result)
def get_show_by_external_id( def get_show_by_external_id(
self, external_id: int, metadata_provider: str self, external_id: int, metadata_provider: str
) -> ShowSchema: ) -> ShowSchema:
"""
Retrieve a show by its external ID, including nested seasons and episodes.
:param external_id: The ID of the show to retrieve.
:param metadata_provider: The metadata provider associated with the ID.
:return: A Show object if found.
:raises NotFoundError: If the show with the given external ID and provider is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = ( stmt = (
select(Show) select(Show)
@@ -77,466 +62,236 @@ class TvRepository:
if not result: if not result:
msg = f"Show with external_id {external_id} and provider {metadata_provider} not found." msg = f"Show with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg) raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError: except SQLAlchemyError:
log.exception( log.exception(
f"Database error while retrieving show by external_id {external_id}", f"Database error while retrieving show by external_id {external_id}",
) )
raise raise
else:
return ShowSchema.model_validate(result)
def get_shows(self) -> list[ShowSchema]: def get_shows(self) -> list[ShowSchema]:
"""
Retrieve all shows from the database.
:return: A list of Show objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try: try:
stmt = select(Show).options( stmt = select(Show).options(
joinedload(Show.seasons).joinedload(Season.episodes) joinedload(Show.seasons).joinedload(Season.episodes)
) )
results = self.db.execute(stmt).scalars().unique().all() results = self.db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
except SQLAlchemyError: except SQLAlchemyError:
log.exception("Database error while retrieving all shows") log.exception("Database error while retrieving all shows")
raise raise
else:
return [ShowSchema.model_validate(show) for show in results]
delete_show = BaseRepository.delete
set_show_library = BaseRepository.set_library
def get_total_downloaded_episodes_count(self) -> int: def get_total_downloaded_episodes_count(self) -> int:
try: try:
stmt = select(func.count(Episode.id)).select_from(Episode).join(EpisodeFile) stmt = (
return self.db.execute(stmt).scalar_one_or_none() select(func.count(distinct(Episode.id)))
.select_from(Episode)
.join(EpisodeFile)
)
result = self.db.execute(stmt).scalar_one_or_none()
except SQLAlchemyError: except SQLAlchemyError:
log.exception("Database error while calculating downloaded episodes count") log.exception("Database error while calculating downloaded episodes count")
raise raise
else:
return result or 0
def save_show(self, show: ShowSchema) -> ShowSchema: def save_show(self, show: ShowSchema) -> ShowSchema:
"""
Save a new show or update an existing one in the database.
:param show: The Show object to save.
:return: The saved Show object.
:raises ValueError: If a show with the same primary key already exists (on insert).
:raises SQLAlchemyError: If a database error occurs.
"""
db_show = self.db.get(Show, show.id) if show.id else None db_show = self.db.get(Show, show.id) if show.id else None
if db_show: # Update existing show if db_show: # Use base for update
db_show.external_id = show.external_id return self.save_media_base(
db_show.metadata_provider = show.metadata_provider media_schema=show, model_class=Show, exclude={"seasons", "episodes"}
db_show.name = show.name
db_show.overview = show.overview
db_show.year = show.year
db_show.original_language = show.original_language
db_show.imdb_id = show.imdb_id
else: # Insert new show
db_show = Show(
id=show.id,
external_id=show.external_id,
metadata_provider=show.metadata_provider,
name=show.name,
overview=show.overview,
year=show.year,
ended=show.ended,
original_language=show.original_language,
imdb_id=show.imdb_id,
seasons=[
Season(
id=season.id,
show_id=show.id,
number=season.number,
external_id=season.external_id,
name=season.name,
overview=season.overview,
episodes=[
Episode(
id=episode.id,
season_id=season.id,
number=episode.number,
external_id=episode.external_id,
title=episode.title,
overview=episode.overview,
)
for episode in season.episodes
],
)
for season in show.seasons
],
) )
self.db.add(db_show)
# Custom insertion for nested seasons/episodes
db_show = Show(
id=show.id,
external_id=show.external_id,
metadata_provider=show.metadata_provider,
name=show.name,
overview=show.overview,
year=show.year,
ended=show.ended,
original_language=show.original_language,
imdb_id=show.imdb_id,
continuous_download=show.continuous_download,
library=show.library,
seasons=[
Season(
id=season.id,
show_id=show.id,
number=season.number,
external_id=season.external_id,
name=season.name,
overview=season.overview,
episodes=[
Episode(
id=episode.id,
season_id=season.id,
number=episode.number,
external_id=episode.external_id,
title=episode.title,
overview=episode.overview,
)
for episode in season.episodes
],
)
for season in show.seasons
],
)
self.db.add(db_show)
try: try:
self.db.commit() self.db.commit()
self.db.refresh(db_show) self.db.refresh(db_show)
return ShowSchema.model_validate(db_show)
except IntegrityError as e: except IntegrityError as e:
self.db.rollback() self.db.rollback()
msg = f"Show with this primary key or unique constraint violation: {e.orig}" msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e raise ConflictError(msg) from e
except SQLAlchemyError: except SQLAlchemyError:
self.db.rollback() self.db.rollback()
log.exception(f"Database error while saving show {show.name}")
raise
def delete_show(self, show_id: ShowId) -> None:
"""
Delete a show by its ID.
:param show_id: The ID of the show to delete.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
show = self.db.get(Show, show_id)
if not show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
self.db.delete(show)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while deleting show {show_id}")
raise raise
else:
return ShowSchema.model_validate(db_show)
def get_season(self, season_id: SeasonId) -> SeasonSchema: def get_season(self, season_id: SeasonId) -> SeasonSchema:
""" season = self.db.get(Season, season_id)
Retrieve a season by its ID. if not season:
msg = f"Season {season_id} not found"
:param season_id: The ID of the season to get. raise NotFoundError(msg)
:return: A Season object. return SeasonSchema.model_validate(season)
:raises NotFoundError: If the season with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
season = self.db.get(Season, season_id)
if not season:
msg = f"Season with id {season_id} not found."
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
except SQLAlchemyError:
log.exception(f"Database error while retrieving season {season_id}")
raise
def get_episode(self, episode_id: EpisodeId) -> EpisodeSchema: def get_episode(self, episode_id: EpisodeId) -> EpisodeSchema:
""" episode = self.db.get(Episode, episode_id)
Retrieve an episode by its ID. if not episode:
msg = f"Episode {episode_id} not found"
:param episode_id: The ID of the episode to get. raise NotFoundError(msg)
:return: An Episode object. return EpisodeSchema.model_validate(episode)
:raises NotFoundError: If the episode with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
episode = self.db.get(Episode, episode_id)
if not episode:
msg = f"Episode with id {episode_id} not found."
raise NotFoundError(msg)
return EpisodeSchema.model_validate(episode)
except SQLAlchemyError as e:
log.error(f"Database error while retrieving episode {episode_id}: {e}")
raise
def get_season_by_episode(self, episode_id: EpisodeId) -> SeasonSchema: def get_season_by_episode(self, episode_id: EpisodeId) -> SeasonSchema:
try: stmt = select(Season).join(Season.episodes).where(Episode.id == episode_id)
stmt = select(Season).join(Season.episodes).where(Episode.id == episode_id) season = self.db.scalar(stmt)
if not season:
season = self.db.scalar(stmt) msg = f"Season for episode {episode_id} not found"
raise NotFoundError(msg)
if not season: return SeasonSchema.model_validate(season)
msg = f"Season not found for episode {episode_id}"
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
except SQLAlchemyError as e:
log.error(
f"Database error while retrieving season for episode {episode_id}: {e}"
)
raise
def get_season_by_number(self, season_number: int, show_id: ShowId) -> SeasonSchema: def get_season_by_number(self, season_number: int, show_id: ShowId) -> SeasonSchema:
""" stmt = (
Retrieve a season by its number and show ID. select(Season)
.where(Season.show_id == show_id)
:param season_number: The number of the season. .where(Season.number == season_number)
:param show_id: The ID of the show. .options(joinedload(Season.episodes), joinedload(Season.show))
:return: A Season object. )
:raises NotFoundError: If the season is not found. result = self.db.execute(stmt).unique().scalar_one_or_none()
:raises SQLAlchemyError: If a database error occurs. if not result:
""" msg = f"Season {season_number} for show {show_id} not found"
try: raise NotFoundError(msg)
stmt = ( return SeasonSchema.model_validate(result)
select(Season)
.where(Season.show_id == show_id)
.where(Season.number == season_number)
.options(joinedload(Season.episodes), joinedload(Season.show))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Season number {season_number} for show_id {show_id} not found."
raise NotFoundError(msg)
return SeasonSchema.model_validate(result)
except SQLAlchemyError:
log.exception(
f"Database error retrieving season {season_number} for show {show_id}"
)
raise
def add_episode_file(self, episode_file: EpisodeFileSchema) -> EpisodeFileSchema: def add_episode_file(self, episode_file: EpisodeFileSchema) -> EpisodeFileSchema:
""" return self.add_media_file_base(
Adds an episode file record to the database. file_schema=episode_file,
model_class=EpisodeFile,
:param episode_file: The EpisodeFile object to add. schema_class=EpisodeFileSchema,
:return: The added EpisodeFile object. )
:raises IntegrityError: If the record violates constraints.
:raises SQLAlchemyError: If a database error occurs.
"""
db_model = EpisodeFile(**episode_file.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
return EpisodeFileSchema.model_validate(db_model)
except IntegrityError as e:
self.db.rollback()
log.error(f"Integrity error while adding episode file: {e}")
raise
except SQLAlchemyError as e:
self.db.rollback()
log.error(f"Database error while adding episode file: {e}")
raise
def remove_episode_files_by_torrent_id(self, torrent_id: TorrentId) -> int: def remove_episode_files_by_torrent_id(self, torrent_id: TorrentId) -> int:
""" return self.remove_files_by_torrent_id_base(
Removes episode file records associated with a given torrent ID. torrent_id=torrent_id, model_class=EpisodeFile
)
:param torrent_id: The ID of the torrent whose episode files are to be removed.
:return: The number of episode files removed.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = delete(EpisodeFile).where(EpisodeFile.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(
f"Database error removing episode files for torrent_id {torrent_id}"
)
raise
return result.rowcount
def set_show_library(self, show_id: ShowId, library: str) -> None:
"""
Sets the library for a show.
:param show_id: The ID of the show to update.
:param library: The library path to set for the show.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
show = self.db.get(Show, show_id)
if not show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
show.library = library
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error setting library for show {show_id}")
raise
def get_episode_files_by_season_id( def get_episode_files_by_season_id(
self, season_id: SeasonId self, season_id: SeasonId
) -> list[EpisodeFileSchema]: ) -> list[EpisodeFileSchema]:
""" stmt = select(EpisodeFile).join(Episode).where(Episode.season_id == season_id)
Retrieve all episode files for a given season ID. results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(ef) for ef in results]
:param season_id: The ID of the season.
:return: A list of EpisodeFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(EpisodeFile).join(Episode).where(Episode.season_id == season_id)
)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(ef) for ef in results]
except SQLAlchemyError:
log.exception(
f"Database error retrieving episode files for season_id {season_id}"
)
raise
def get_episode_files_by_episode_id( def get_episode_files_by_episode_id(
self, episode_id: EpisodeId self, episode_id: EpisodeId
) -> list[EpisodeFileSchema]: ) -> list[EpisodeFileSchema]:
""" stmt = select(EpisodeFile).where(EpisodeFile.episode_id == episode_id)
Retrieve all episode files for a given episode ID. results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(sf) for sf in results]
:param episode_id: The ID of the episode.
:return: A list of EpisodeFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(EpisodeFile).where(EpisodeFile.episode_id == episode_id)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(sf) for sf in results]
except SQLAlchemyError as e:
log.error(
f"Database error retrieving episode files for episode_id {episode_id}: {e}"
)
raise
def get_torrents_by_show_id(self, show_id: ShowId) -> list[TorrentSchema]: def get_torrents_by_show_id(self, show_id: ShowId) -> list[TorrentSchema]:
""" stmt = (
Retrieve all torrents associated with a given show ID. select(TorrentModel)
.distinct()
:param show_id: The ID of the show. .join(EpisodeFile, EpisodeFile.torrent_id == TorrentModel.id)
:return: A list of Torrent objects. .join(Episode, Episode.id == EpisodeFile.episode_id)
:raises SQLAlchemyError: If a database error occurs. .join(Season, Season.id == Episode.season_id)
""" .where(Season.show_id == show_id)
try: )
stmt = ( results = self.db.execute(stmt).scalars().unique().all()
select(Torrent) return [TorrentSchema.model_validate(t) for t in results]
.distinct()
.join(EpisodeFile, EpisodeFile.torrent_id == Torrent.id)
.join(Episode, Episode.id == EpisodeFile.episode_id)
.join(Season, Season.id == Episode.season_id)
.where(Season.show_id == show_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [TorrentSchema.model_validate(torrent) for torrent in results]
except SQLAlchemyError:
log.exception(f"Database error retrieving torrents for show_id {show_id}")
raise
def get_all_shows_with_torrents(self) -> list[ShowSchema]: def get_all_shows_with_torrents(self) -> list[ShowSchema]:
""" stmt = (
Retrieve all shows that are associated with a torrent, ordered alphabetically by show name. select(Show)
.distinct()
:return: A list of Show objects. .join(Season, Show.id == Season.show_id)
:raises SQLAlchemyError: If a database error occurs. .join(Episode, Season.id == Episode.season_id)
""" .join(EpisodeFile, Episode.id == EpisodeFile.episode_id)
try: .join(TorrentModel, EpisodeFile.torrent_id == TorrentModel.id)
stmt = ( .options(joinedload(Show.seasons).joinedload(Season.episodes))
select(Show) .order_by(Show.name)
.distinct() )
.join(Season, Show.id == Season.show_id) results = self.db.execute(stmt).scalars().unique().all()
.join(Episode, Season.id == Episode.season_id) return [ShowSchema.model_validate(show) for show in results]
.join(EpisodeFile, Episode.id == EpisodeFile.episode_id)
.join(Torrent, EpisodeFile.torrent_id == Torrent.id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
.order_by(Show.name)
)
results = self.db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
except SQLAlchemyError:
log.exception("Database error retrieving all shows with torrents")
raise
def get_seasons_by_torrent_id(self, torrent_id: TorrentId) -> list[SeasonNumber]: def get_seasons_by_torrent_id(self, torrent_id: TorrentId) -> list[SeasonNumber]:
""" stmt = (
Retrieve season numbers associated with a given torrent ID. select(Season.number)
.distinct()
:param torrent_id: The ID of the torrent. .join(Episode, Episode.season_id == Season.id)
:return: A list of SeasonNumber objects. .join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
:raises SQLAlchemyError: If a database error occurs. .where(EpisodeFile.torrent_id == torrent_id)
""" )
try: results = self.db.execute(stmt).scalars().unique().all()
stmt = ( return [SeasonNumber(x) for x in results]
select(Season.number)
.distinct()
.join(Episode, Episode.season_id == Season.id)
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [SeasonNumber(x) for x in results]
except SQLAlchemyError:
log.exception(
f"Database error retrieving season numbers for torrent_id {torrent_id}"
)
raise
def get_episodes_by_torrent_id(self, torrent_id: TorrentId) -> list[EpisodeNumber]: def get_episodes_by_torrent_id(self, torrent_id: TorrentId) -> list[EpisodeNumber]:
""" stmt = (
Retrieve episode numbers associated with a given torrent ID. select(Episode.number)
.distinct()
:param torrent_id: The ID of the torrent. .join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
:return: A list of EpisodeNumber objects. .where(EpisodeFile.torrent_id == torrent_id)
:raises SQLAlchemyError: If a database error occurs. .order_by(Episode.number)
""" )
try: episode_numbers = self.db.execute(stmt).scalars().all()
stmt = ( return [EpisodeNumber(n) for n in episode_numbers]
select(Episode.number)
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
.order_by(Episode.number)
)
episode_numbers = self.db.execute(stmt).scalars().all()
return [EpisodeNumber(n) for n in sorted(set(episode_numbers))]
except SQLAlchemyError as e:
log.error(
f"Database error retrieving episodes for torrent_id {torrent_id}: {e}"
)
raise
def get_show_by_season_id(self, season_id: SeasonId) -> ShowSchema: def get_show_by_season_id(self, season_id: SeasonId) -> ShowSchema:
""" stmt = (
Retrieve a show by one of its season's ID. select(Show)
.join(Season, Show.id == Season.show_id)
:param season_id: The ID of the season to retrieve the show for. .where(Season.id == season_id)
:return: A Show object. .options(joinedload(Show.seasons).joinedload(Season.episodes))
:raises NotFoundError: If the show for the given season ID is not found. )
:raises SQLAlchemyError: If a database error occurs. result = self.db.execute(stmt).unique().scalar_one_or_none()
""" if not result:
try: msg = f"Show for season {season_id} not found"
stmt = ( raise NotFoundError(msg)
select(Show) return ShowSchema.model_validate(result)
.join(Season, Show.id == Season.show_id)
.where(Season.id == season_id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Show for season_id {season_id} not found."
raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error retrieving show by season_id {season_id}")
raise
def add_season_to_show( def add_season_to_show(
self, show_id: ShowId, season_data: SeasonSchema self, show_id: ShowId, season_data: SeasonSchema
) -> SeasonSchema: ) -> SeasonSchema:
"""
Adds a new season and its episodes to a show.
If the season number already exists for the show, it returns the existing season.
:param show_id: The ID of the show to add the season to.
:param season_data: The SeasonSchema object for the new season.
:return: The added or existing SeasonSchema object.
:raises NotFoundError: If the show is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_show = self.db.get(Show, show_id) db_show = self.db.get(Show, show_id)
if not db_show: if not db_show:
msg = f"Show with id {show_id} not found." msg = f"Show {show_id} not found"
raise NotFoundError(msg) raise NotFoundError(msg)
stmt = select(Season).where(
stmt = ( Season.show_id == show_id, Season.number == season_data.number
select(Season)
.where(Season.show_id == show_id)
.where(Season.number == season_data.number)
) )
existing_db_season = self.db.execute(stmt).scalar_one_or_none() existing = self.db.execute(stmt).scalar_one_or_none()
if existing_db_season: if existing:
return SeasonSchema.model_validate(existing_db_season) return SeasonSchema.model_validate(existing)
db_season = Season( db_season = Season(
id=season_data.id, id=season_data.id,
show_id=show_id, show_id=show_id,
@@ -550,54 +305,56 @@ class TvRepository:
number=ep_schema.number, number=ep_schema.number,
external_id=ep_schema.external_id, external_id=ep_schema.external_id,
title=ep_schema.title, title=ep_schema.title,
overview=ep_schema.overview,
) )
for ep_schema in season_data.episodes for ep_schema in season_data.episodes
], ],
) )
self.db.add(db_season) self.db.add(db_season)
self.db.commit() try:
self.db.refresh(db_season) self.db.commit()
self.db.refresh(db_season)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
raise
return SeasonSchema.model_validate(db_season) return SeasonSchema.model_validate(db_season)
def add_episode_to_season( def add_episode_to_season(
self, season_id: SeasonId, episode_data: EpisodeSchema self, season_id: SeasonId, episode_data: EpisodeSchema
) -> EpisodeSchema: ) -> EpisodeSchema:
"""
Adds a new episode to a season.
If the episode number already exists for the season, it returns the existing episode.
:param season_id: The ID of the season to add the episode to.
:param episode_data: The EpisodeSchema object for the new episode.
:return: The added or existing EpisodeSchema object.
:raises NotFoundError: If the season is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_season = self.db.get(Season, season_id) db_season = self.db.get(Season, season_id)
if not db_season: if not db_season:
msg = f"Season with id {season_id} not found." msg = f"Season {season_id} not found"
raise NotFoundError(msg) raise NotFoundError(msg)
stmt = select(Episode).where(
stmt = ( Episode.season_id == season_id, Episode.number == episode_data.number
select(Episode)
.where(Episode.season_id == season_id)
.where(Episode.number == episode_data.number)
) )
existing_db_episode = self.db.execute(stmt).scalar_one_or_none() existing = self.db.execute(stmt).scalar_one_or_none()
if existing_db_episode: if existing:
return EpisodeSchema.model_validate(existing_db_episode) return EpisodeSchema.model_validate(existing)
db_episode = Episode( db_episode = Episode(
id=episode_data.id, id=episode_data.id,
season_id=season_id, season_id=season_id,
number=episode_data.number, number=episode_data.number,
external_id=episode_data.external_id, external_id=episode_data.external_id,
title=episode_data.title, title=episode_data.title,
overview=episode_data.overview,
) )
self.db.add(db_episode) self.db.add(db_episode)
self.db.commit() try:
self.db.refresh(db_episode) self.db.commit()
self.db.refresh(db_episode)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
raise
return EpisodeSchema.model_validate(db_episode) return EpisodeSchema.model_validate(db_episode)
def update_show_attributes( def update_show_attributes(
@@ -610,69 +367,24 @@ class TvRepository:
continuous_download: bool | None = None, continuous_download: bool | None = None,
imdb_id: str | None = None, imdb_id: str | None = None,
) -> ShowSchema: ) -> ShowSchema:
""" return self.update_media_attributes_base(
Update attributes of an existing show. media_id=show_id,
model_class=Show,
:param imdb_id: The new IMDb ID for the show. name=name,
:param continuous_download: The new continuous download status for the show. overview=overview,
:param show_id: The ID of the show to update. year=year,
:param name: The new name for the show. ended=ended,
:param overview: The new overview for the show. continuous_download=continuous_download,
:param year: The new year for the show. imdb_id=imdb_id,
:param ended: The new ended status for the show. )
:return: The updated ShowSchema object.
"""
db_show = self.db.get(Show, show_id)
if not db_show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
updated = False
if name is not None and db_show.name != name:
db_show.name = name
updated = True
if overview is not None and db_show.overview != overview:
db_show.overview = overview
updated = True
if year is not None and db_show.year != year:
db_show.year = year
updated = True
if ended is not None and db_show.ended != ended:
db_show.ended = ended
updated = True
if (
continuous_download is not None
and db_show.continuous_download != continuous_download
):
db_show.continuous_download = continuous_download
updated = True
if imdb_id is not None and db_show.imdb_id != imdb_id:
db_show.imdb_id = imdb_id
updated = True
if updated:
self.db.commit()
self.db.refresh(db_show)
return ShowSchema.model_validate(db_show)
def update_season_attributes( def update_season_attributes(
self, season_id: SeasonId, name: str | None = None, overview: str | None = None self, season_id: SeasonId, name: str | None = None, overview: str | None = None
) -> SeasonSchema: ) -> SeasonSchema:
"""
Update attributes of an existing season.
:param season_id: The ID of the season to update.
:param name: The new name for the season.
:param overview: The new overview for the season.
:param external_id: The new external ID for the season.
:return: The updated SeasonSchema object.
:raises NotFoundError: If the season is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_season = self.db.get(Season, season_id) db_season = self.db.get(Season, season_id)
if not db_season: if not db_season:
msg = f"Season with id {season_id} not found." msg = f"Season {season_id} not found"
raise NotFoundError(msg) raise NotFoundError(msg)
updated = False updated = False
if name is not None and db_season.name != name: if name is not None and db_season.name != name:
db_season.name = name db_season.name = name
@@ -680,13 +392,13 @@ class TvRepository:
if overview is not None and db_season.overview != overview: if overview is not None and db_season.overview != overview:
db_season.overview = overview db_season.overview = overview
updated = True updated = True
if updated: if updated:
self.db.commit() try:
self.db.refresh(db_season) self.db.commit()
log.debug( self.db.refresh(db_season)
f"Updating existing season {db_season.number} for show {db_season.show.name}" except SQLAlchemyError:
) self.db.rollback()
raise
return SeasonSchema.model_validate(db_season) return SeasonSchema.model_validate(db_season)
def update_episode_attributes( def update_episode_attributes(
@@ -695,22 +407,10 @@ class TvRepository:
title: str | None = None, title: str | None = None,
overview: str | None = None, overview: str | None = None,
) -> EpisodeSchema: ) -> EpisodeSchema:
"""
Update attributes of an existing episode.
:param overview: Tje new overview for the episode.
:param episode_id: The ID of the episode to update.
:param title: The new title for the episode.
:param external_id: The new external ID for the episode.
:return: The updated EpisodeSchema object.
:raises NotFoundError: If the episode is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_episode = self.db.get(Episode, episode_id) db_episode = self.db.get(Episode, episode_id)
if not db_episode: if not db_episode:
msg = f"Episode with id {episode_id} not found." msg = f"Episode {episode_id} not found"
raise NotFoundError(msg) raise NotFoundError(msg)
updated = False updated = False
if title is not None and db_episode.title != title: if title is not None and db_episode.title != title:
db_episode.title = title db_episode.title = title
@@ -718,9 +418,11 @@ class TvRepository:
if overview is not None and db_episode.overview != overview: if overview is not None and db_episode.overview != overview:
db_episode.overview = overview db_episode.overview = overview
updated = True updated = True
if updated: if updated:
self.db.commit() try:
self.db.refresh(db_episode) self.db.commit()
log.info(f"Updating existing episode {db_episode.number}") self.db.refresh(db_episode)
except SQLAlchemyError:
self.db.rollback()
raise
return EpisodeSchema.model_validate(db_episode) return EpisodeSchema.model_validate(db_episode)

View File

@@ -17,6 +17,8 @@ from media_manager.torrent.utils import get_importable_media_directories
from media_manager.tv.dependencies import ( from media_manager.tv.dependencies import (
season_dep, season_dep,
show_dep, show_dep,
tv_import_service_dep,
tv_metadata_service_dep,
tv_service_dep, tv_service_dep,
) )
from media_manager.tv.schemas import ( from media_manager.tv.schemas import (
@@ -40,12 +42,16 @@ router = APIRouter()
dependencies=[Depends(current_active_user)], dependencies=[Depends(current_active_user)],
) )
def search_metadata_providers_for_a_show( def search_metadata_providers_for_a_show(
tv_service: tv_service_dep, query: str, metadata_provider: metadata_provider_dep tv_metadata_service: tv_metadata_service_dep,
query: str,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]: ) -> list[MetaDataProviderSearchResult]:
""" """
Search for a show on the configured metadata provider. Search for a show on the configured metadata provider.
""" """
return tv_service.search_for_show(query=query, metadata_provider=metadata_provider) return tv_metadata_service.search_for_show(
query=query, metadata_provider=metadata_provider
)
@router.get( @router.get(
@@ -53,12 +59,13 @@ def search_metadata_providers_for_a_show(
dependencies=[Depends(current_active_user)], dependencies=[Depends(current_active_user)],
) )
def get_recommended_shows( def get_recommended_shows(
tv_service: tv_service_dep, metadata_provider: metadata_provider_dep tv_metadata_service: tv_metadata_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]: ) -> list[MetaDataProviderSearchResult]:
""" """
Get a list of recommended/popular shows from the metadata provider. Get a list of recommended/popular shows from the metadata provider.
""" """
return tv_service.get_popular_shows(metadata_provider=metadata_provider) return tv_metadata_service.get_popular_shows(metadata_provider=metadata_provider)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -72,12 +79,14 @@ def get_recommended_shows(
dependencies=[Depends(current_superuser)], dependencies=[Depends(current_superuser)],
) )
def get_all_importable_shows( def get_all_importable_shows(
tv_service: tv_service_dep, metadata_provider: metadata_provider_dep tv_import_service: tv_import_service_dep, metadata_provider: metadata_provider_dep
) -> list[MediaImportSuggestion]: ) -> list[MediaImportSuggestion]:
""" """
Get a list of unknown shows that were detected in the TV directory and are importable. Get a list of unknown shows that were detected in the TV directory and are importable.
""" """
return tv_service.get_importable_tv_shows(metadata_provider=metadata_provider) return tv_import_service.get_importable_tv_shows(
metadata_provider=metadata_provider
)
@router.post( @router.post(
@@ -86,7 +95,7 @@ def get_all_importable_shows(
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
) )
def import_detected_show( def import_detected_show(
tv_service: tv_service_dep, tv_show: show_dep, directory: str tv_import_service: tv_import_service_dep, tv_show: show_dep, directory: str
) -> None: ) -> None:
""" """
Import a detected show from the specified directory into the library. Import a detected show from the specified directory into the library.
@@ -96,7 +105,7 @@ def import_detected_show(
MediaManagerConfig().misc.tv_directory MediaManagerConfig().misc.tv_directory
): ):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory") raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")
tv_service.import_existing_tv_show( tv_import_service.import_existing_tv_show(
tv_show=tv_show, source_directory=source_directory tv_show=tv_show, source_directory=source_directory
) )
@@ -129,7 +138,7 @@ def get_all_shows(tv_service: tv_service_dep) -> list[Show]:
}, },
) )
def add_a_show( def add_a_show(
tv_service: tv_service_dep, tv_metadata_service: tv_metadata_service_dep,
metadata_provider: metadata_provider_dep, metadata_provider: metadata_provider_dep,
show_id: int, show_id: int,
language: str | None = None, language: str | None = None,
@@ -138,13 +147,13 @@ def add_a_show(
Add a new show to the library. Add a new show to the library.
""" """
try: try:
show = tv_service.add_show( show = tv_metadata_service.add_show(
external_id=show_id, external_id=show_id,
metadata_provider=metadata_provider, metadata_provider=metadata_provider,
language=language, language=language,
) )
except MediaAlreadyExistsError: except MediaAlreadyExistsError:
show = tv_service.get_show_by_external_id( show = tv_metadata_service.tv_repository.get_show_by_external_id(
show_id, metadata_provider=metadata_provider.name show_id, metadata_provider=metadata_provider.name
) )
if not show: if not show:
@@ -216,12 +225,17 @@ def delete_a_show(
dependencies=[Depends(current_active_user)], dependencies=[Depends(current_active_user)],
) )
def update_shows_metadata( def update_shows_metadata(
show: show_dep, tv_service: tv_service_dep, metadata_provider: metadata_provider_dep show: show_dep,
tv_metadata_service: tv_metadata_service_dep,
tv_service: tv_service_dep,
metadata_provider: metadata_provider_dep,
) -> PublicShow: ) -> PublicShow:
""" """
Update a show's metadata from the provider. Update a show's metadata from the provider.
""" """
tv_service.update_show_metadata(db_show=show, metadata_provider=metadata_provider) tv_metadata_service.update_show_metadata(
db_show=show, metadata_provider=metadata_provider
)
return tv_service.get_public_show_by_id(show=show) return tv_service.get_public_show_by_id(show=show)
@@ -361,4 +375,4 @@ def get_total_count_of_downloaded_episodes(tv_service: tv_service_dep) -> int:
""" """
Get the total count of downloaded episodes across all shows. Get the total count of downloaded episodes across all shows.
""" """
return tv_service.get_total_downloaded_episoded_count() return tv_service.get_total_downloaded_episodes_count()

View File

@@ -4,6 +4,7 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from media_manager.common.schemas import BaseMedia, BaseMediaFile
from media_manager.torrent.models import Quality from media_manager.torrent.models import Quality
from media_manager.torrent.schemas import TorrentId, TorrentStatus from media_manager.torrent.schemas import TorrentId, TorrentStatus
@@ -39,35 +40,17 @@ class Season(BaseModel):
episodes: list[Episode] episodes: list[Episode]
class Show(BaseModel): class Show(BaseMedia):
model_config = ConfigDict(from_attributes=True)
id: ShowId = Field(default_factory=lambda: ShowId(uuid.uuid4())) id: ShowId = Field(default_factory=lambda: ShowId(uuid.uuid4()))
name: str
overview: str
year: int | None
ended: bool = False ended: bool = False
external_id: int
metadata_provider: str
continuous_download: bool = False continuous_download: bool = False
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
seasons: list[Season] seasons: list[Season]
class EpisodeFile(BaseModel): class EpisodeFile(BaseMediaFile):
model_config = ConfigDict(from_attributes=True)
episode_id: EpisodeId episode_id: EpisodeId
quality: Quality
torrent_id: TorrentId | None
file_path_suffix: str
class PublicEpisodeFile(EpisodeFile): class PublicEpisodeFile(EpisodeFile):

View File

@@ -1,39 +1,21 @@
import pprint
import re
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import overload
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig from media_manager.config import MediaManagerConfig
from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
from media_manager.indexer.service import IndexerService from media_manager.indexer.service import IndexerService
from media_manager.indexer.utils import evaluate_indexer_query_results from media_manager.indexer.utils import evaluate_indexer_query_results
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.notification.service import NotificationService from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import ( from media_manager.torrent.schemas import (
Quality,
Torrent, Torrent,
TorrentStatus,
) )
from media_manager.torrent.service import TorrentService from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
extract_external_id_from_string,
get_files_for_import,
get_importable_media_directories,
import_file,
remove_special_characters,
remove_special_chars_and_parentheses,
)
from media_manager.tv import log from media_manager.tv import log
from media_manager.tv.importer import TvImportService
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import ( from media_manager.tv.schemas import (
Episode, Episode,
@@ -52,40 +34,27 @@ from media_manager.tv.schemas import (
) )
class TvService: class TvService(BaseMediaService[Show, Show]):
def __init__( def __init__(
self, self,
tv_repository: TvRepository, tv_repository: TvRepository,
torrent_service: TorrentService, torrent_service: TorrentService,
indexer_service: IndexerService, indexer_service: IndexerService,
notification_service: NotificationService, notification_service: NotificationService,
tv_import_service: TvImportService,
tv_metadata_service: TvMetadataService,
) -> None: ) -> None:
self.tv_repository = tv_repository super().__init__(
self.torrent_service = torrent_service repository=tv_repository,
self.indexer_service = indexer_service torrent_service=torrent_service,
self.notification_service = notification_service indexer_service=indexer_service,
notification_service=notification_service,
def add_show(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Show:
"""
Add a new show to the database.
:param external_id: The ID of the show in the metadata provider\'s system.
:param metadata_provider: The name of the metadata provider.
:param language: Optional language code (ISO 639-1) to fetch metadata in.
"""
show_with_metadata = metadata_provider.get_show_metadata(
show_id=external_id, language=language
) )
saved_show = self.tv_repository.save_show(show=show_with_metadata) self.tv_repository = tv_repository
metadata_provider.download_show_poster_image(show=saved_show) self.tv_import_service = tv_import_service
return saved_show self.tv_metadata_service = tv_metadata_service
def get_total_downloaded_episoded_count(self) -> int: def get_total_downloaded_episodes_count(self) -> int:
""" """
Get total number of downloaded episodes. Get total number of downloaded episodes.
""" """
@@ -93,7 +62,13 @@ class TvService:
return self.tv_repository.get_total_downloaded_episodes_count() return self.tv_repository.get_total_downloaded_episodes_count()
def set_show_library(self, show: Show, library: str) -> None: def set_show_library(self, show: Show, library: str) -> None:
self.tv_repository.set_show_library(show_id=show.id, library=library) self.tv_repository.set_show_library(show.id, library)
def get_all_shows(self) -> list[Show]:
"""
Get all shows in the library.
"""
return self.get_all_media()
def delete_show( def delete_show(
self, self,
@@ -131,7 +106,7 @@ class TvService:
f"Failed to delete torrent {torrent.hash}", exc_info=True f"Failed to delete torrent {torrent.hash}", exc_info=True
) )
self.tv_repository.delete_show(show_id=show.id) self.tv_repository.delete_show(show.id)
def get_public_episode_files_by_season_id( def get_public_episode_files_by_season_id(
self, season: Season self, season: Season
@@ -155,46 +130,6 @@ class TvService:
result.append(episode_file) result.append(episode_file)
return result return result
@overload
def check_if_show_exists(self, *, external_id: int, metadata_provider: str) -> bool:
"""
Check if a show exists in the database.
:param external_id: The external ID of the show.
:param metadata_provider: The metadata provider.
:return: True if the show exists, False otherwise.
"""
@overload
def check_if_show_exists(self, *, show_id: ShowId) -> bool:
"""
Check if a show exists in the database.
:param show_id: The ID of the show.
:return: True if the show exists, False otherwise.
"""
def check_if_show_exists(
self, *, external_id=None, metadata_provider=None, show_id=None
) -> bool:
if not (external_id is None or metadata_provider is None):
try:
self.tv_repository.get_show_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
elif show_id is not None:
try:
self.tv_repository.get_show_by_id(show_id=show_id)
except NotFoundError:
return False
else:
msg = "Use one of the provided overloads for this function!"
raise ValueError(msg)
return True
def get_all_available_torrents_for_a_season( def get_all_available_torrents_for_a_season(
self, self,
season_number: int, season_number: int,
@@ -225,62 +160,6 @@ class TvService:
is_tv=True, query_results=results, media=show is_tv=True, query_results=results, media=show
) )
def get_all_shows(self) -> list[Show]:
"""
Get all shows.
:return: A list of all shows.
"""
return self.tv_repository.get_shows()
def search_for_show(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Search for shows using a given query.
:param query: The search query.
:param metadata_provider: The metadata provider to search.
:return: A list of metadata provider show search results.
"""
results = metadata_provider.search_show(query)
for result in results:
if self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
try:
show = self.tv_repository.get_show_by_external_id(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = show.id
except Exception:
log.error(
f"Unable to find internal show ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Get popular shows from a given metadata provider.
:param metadata_provider: The metadata provider to use.
:return: A list of metadata provider show search results.
"""
results = metadata_provider.search_show()
return [
result
for result in results
if not self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def get_public_show_by_id(self, show: Show) -> PublicShow: def get_public_show_by_id(self, show: Show) -> PublicShow:
""" """
Get a public show from a Show object. Get a public show from a Show object.
@@ -577,402 +456,15 @@ class TvService:
def get_root_show_directory(self, show: Show) -> Path: def get_root_show_directory(self, show: Show) -> Path:
misc_config = MediaManagerConfig().misc misc_config = MediaManagerConfig().misc
show_directory_name = f"{remove_special_characters(show.name)} ({show.year}) [{show.metadata_provider}id-{show.external_id}]" return self.get_root_directory(
log.debug( media=show,
f"Show {show.name} without special characters: {remove_special_characters(show.name)}" default_dir=misc_config.tv_directory,
libraries=misc_config.tv_libraries,
) )
if show.library != "Default":
for library in misc_config.tv_libraries:
if library.name == show.library:
log.debug(
f"Using library {library.name} for show {show.name} ({show.year})"
)
return Path(library.path) / show_directory_name
else:
log.warning(
f"Library {show.library} not defined in config, using default TV directory."
)
return misc_config.tv_directory / show_directory_name
def get_root_season_directory(self, show: Show, season_number: int) -> Path: def get_root_season_directory(self, show: Show, season_number: int) -> Path:
return self.get_root_show_directory(show) / Path(f"Season {season_number}") return self.get_root_show_directory(show) / Path(f"Season {season_number}")
def import_episode(
self,
show: Show,
season: Season,
episode_number: int,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode_number:02d}"
if file_path_suffix != "":
episode_file_name += f" - {file_path_suffix}"
pattern = (
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode_number) + r"[. ].*"
)
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
target_file_name = (
self.get_root_season_directory(show=show, season_number=season.number)
/ episode_file_name
)
# import subtitle
for subtitle_file in subtitle_files:
regex_result = re.search(
subtitle_pattern, subtitle_file.name, re.IGNORECASE
)
if regex_result:
language_code = regex_result.group(1)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
# import episode videos
for file in video_files:
if re.search(pattern, file.name, re.IGNORECASE):
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
return True
else:
msg = f"Could not find any video file for episode {episode_number} of show {show.name} S{season.number}"
raise Exception(msg) # noqa: TRY002 # TODO: resolve this
def import_season(
self,
show: Show,
season: Season,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> tuple[bool, list[Episode]]:
season_path = self.get_root_season_directory(
show=show, season_number=season.number
)
success = True
imported_episodes = []
try:
season_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
log.exception(f"Could not create path {season_path}")
msg = f"Could not create path {season_path}"
raise Exception(msg) from e # noqa: TRY002 # TODO: resolve this
for episode in season.episodes:
try:
imported = self.import_episode(
show=show,
subtitle_files=subtitle_files,
video_files=video_files,
season=season,
episode_number=episode.number,
file_path_suffix=file_path_suffix,
)
if imported:
imported_episodes.append(episode)
except Exception:
# Send notification about missing episode file
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Missing Episode File",
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
)
success = False
log.warning(
f"S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
)
return success, imported_episodes
def import_episode_files(
self,
show: Show,
season: Season,
episode: Episode,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode.number:02d}"
if file_path_suffix != "":
episode_file_name += f" - {file_path_suffix}"
pattern = (
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode.number) + r"[. ].*"
)
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
target_file_name = (
self.get_root_season_directory(show=show, season_number=season.number)
/ episode_file_name
)
# import subtitle
for subtitle_file in subtitle_files:
regex_result = re.search(
subtitle_pattern, subtitle_file.name, re.IGNORECASE
)
if regex_result:
language_code = regex_result.group(1)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
found_video = False
# import episode videos
for file in video_files:
if re.search(pattern, file.name, re.IGNORECASE):
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
found_video = True
break
if not found_video:
# Send notification about missing episode file
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Missing Episode File",
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
)
log.warning(
f"File for S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
)
return False
return True
def import_episode_files_from_torrent(self, torrent: Torrent, show: Show) -> None:
"""
Organizes episodes files from a torrent into the TV directory structure, mapping them to seasons and episodes.
:param torrent: The Torrent object
:param show: The Show object
"""
video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent)
success: list[bool] = []
log.debug(
f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files)
)
episode_files = self.torrent_service.get_episode_files_of_torrent(
torrent=torrent
)
if not episode_files:
log.warning(
f"No episode files associated with torrent {torrent.title}, skipping import."
)
return
log.info(
f"Found {len(episode_files)} episode files associated with torrent {torrent.title}"
)
imported_episodes_by_season: dict[int, list[int]] = {}
for episode_file in episode_files:
season = self.get_season_by_episode(episode_id=episode_file.episode_id)
episode = self.get_episode(episode_file.episode_id)
season_path = self.get_root_season_directory(
show=show, season_number=season.number
)
if not season_path.exists():
try:
season_path.mkdir(parents=True)
except Exception as e:
log.warning(f"Could not create path {season_path}: {e}")
msg = f"Could not create path {season_path}"
raise Exception(msg) from e # noqa: TRY002
episoded_import_success = self.import_episode_files(
show=show,
season=season,
episode=episode,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix=episode_file.file_path_suffix,
)
success.append(episoded_import_success)
if episoded_import_success:
imported_episodes_by_season.setdefault(season.number, []).append(
episode.number
)
log.info(
f"Episode {episode.number} from Season {season.number} successfully imported from torrent {torrent.title}"
)
else:
log.warning(
f"Episode {episode.number} from Season {season.number} failed to import from torrent {torrent.title}"
)
success_messages: list[str] = []
for season_number, episodes in imported_episodes_by_season.items():
episode_list = ",".join(str(e) for e in sorted(episodes))
success_messages.append(
f"Episode(s): {episode_list} from Season {season_number}"
)
episodes_summary = "; ".join(success_messages)
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
# Send successful season download notification
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="TV Show imported successfully",
message=(
f"Successfully imported {episodes_summary} "
f"of {show.name} ({show.year}) "
f"from torrent {torrent.title}."
),
)
else:
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Failed to import TV Show",
message=f"Importing {show.name} ({show.year}) from torrent {torrent.title} completed with errors. Please check the logs for details.",
)
log.info(
f"Finished importing files for torrent {torrent.title} {'without' if all(success) else 'with'} errors"
)
def update_show_metadata(
self, db_show: Show, metadata_provider: AbstractMetadataProvider
) -> Show | None:
"""
Updates the metadata of a show.
This includes adding new seasons and episodes if available from the metadata provider.
It also updates existing show, season, and episode attributes if they have changed.
:param metadata_provider: The metadata provider object to fetch fresh data from.
:param db_show: The Show to update
:return: The updated Show object, or None if the show is not found or an error occurs.
"""
log.debug(f"Found show: {db_show.name} for metadata update.")
# Use stored original_language preference for metadata fetching
fresh_show_data = metadata_provider.get_show_metadata(
show_id=db_show.external_id, language=db_show.original_language
)
if not fresh_show_data:
log.warning(
f"Could not fetch fresh metadata for show {db_show.name} (External ID: {db_show.external_id}) from {db_show.metadata_provider}."
)
return db_show
log.debug(f"Fetched fresh metadata for show: {fresh_show_data.name}")
self.tv_repository.update_show_attributes(
show_id=db_show.id,
name=fresh_show_data.name,
overview=fresh_show_data.overview,
year=fresh_show_data.year,
ended=fresh_show_data.ended,
imdb_id=fresh_show_data.imdb_id,
continuous_download=db_show.continuous_download
if fresh_show_data.ended is False
else False,
)
# Process seasons and episodes
existing_season_external_ids = {s.external_id: s for s in db_show.seasons}
for fresh_season_data in fresh_show_data.seasons:
if fresh_season_data.external_id in existing_season_external_ids:
# Update existing season
existing_season = existing_season_external_ids[
fresh_season_data.external_id
]
self.tv_repository.update_season_attributes(
season_id=existing_season.id,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
)
# Process episodes for this season
existing_episode_external_ids = {
ep.external_id: ep for ep in existing_season.episodes
}
for fresh_episode_data in fresh_season_data.episodes:
if fresh_episode_data.external_id in existing_episode_external_ids:
# Update existing episode
existing_episode = existing_episode_external_ids[
fresh_episode_data.external_id
]
self.tv_repository.update_episode_attributes(
episode_id=existing_episode.id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
else:
# Add new episode
log.debug(
f"Adding new episode {fresh_episode_data.number} to season {existing_season.number}"
)
episode_schema = Episode(
id=EpisodeId(fresh_episode_data.id),
number=fresh_episode_data.number,
external_id=fresh_episode_data.external_id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
self.tv_repository.add_episode_to_season(
season_id=existing_season.id, episode_data=episode_schema
)
else:
# Add new season (and its episodes)
log.debug(
f"Adding new season {fresh_season_data.number} to show {db_show.name}"
)
episodes_for_schema = [
Episode(
id=EpisodeId(ep_data.id),
number=ep_data.number,
external_id=ep_data.external_id,
title=ep_data.title,
overview=ep_data.overview,
)
for ep_data in fresh_season_data.episodes
]
season_schema = Season(
id=SeasonId(fresh_season_data.id),
number=fresh_season_data.number,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
external_id=fresh_season_data.external_id,
episodes=episodes_for_schema,
)
self.tv_repository.add_season_to_show(
show_id=db_show.id, season_data=season_schema
)
updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id)
log.info(f"Successfully updated metadata for show: {updated_show.name}")
metadata_provider.download_show_poster_image(show=updated_show)
return updated_show
def set_show_continuous_download( def set_show_continuous_download(
self, show: Show, continuous_download: bool self, show: Show, continuous_download: bool
) -> Show: ) -> Show:
@@ -987,129 +479,14 @@ class TvService:
show_id=show.id, continuous_download=continuous_download show_id=show.id, continuous_download=continuous_download
) )
def get_import_candidates(
self, tv_show: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_show(
remove_special_chars_and_parentheses(tv_show.name), metadata_provider
)
import_candidates = MediaImportSuggestion(
directory=tv_show, candidates=search_result
)
log.debug(
f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}"
)
return import_candidates
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.exception(f"Failed to rename {source_directory} to {new_source_path}")
raise RenameError from e
video_files, subtitle_files, _all_files = get_files_for_import(
directory=new_source_path
)
for season in tv_show.seasons:
_success, imported_episodes = self.import_season(
show=tv_show,
season=season,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
)
for episode in imported_episodes:
episode_file = EpisodeFile(
episode_id=episode.id,
quality=Quality.unknown,
file_path_suffix="IMPORTED",
torrent_id=None,
)
self.tv_repository.add_episode_file(episode_file=episode_file)
def get_importable_tv_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
tv_directory = MediaManagerConfig().misc.tv_directory
import_suggestions: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(tv_directory)
for item in candidate_dirs:
metadata, external_id = extract_external_id_from_string(item.name)
if metadata is not None and external_id is not None:
try:
self.tv_repository.get_show_by_external_id(
external_id=external_id,
metadata_provider=metadata,
)
log.debug(
f"Show {item.name} already exists in the database, skipping import suggestion."
)
continue
except NotFoundError:
log.debug(
f"Show {item.name} not found in database, checking for import candidates."
)
import_suggestion = self.get_import_candidates(
tv_show=item, metadata_provider=metadata_provider
)
import_suggestions.append(import_suggestion)
log.debug(f"Detected {len(import_suggestions)} importable TV shows.")
return import_suggestions
def import_all_torrents(self) -> None: def import_all_torrents(self) -> None:
log.info("Importing all torrents") """
torrents = self.torrent_service.get_all_torrents() Delegate to TvImportService.
log.info("Found %d torrents to import", len(torrents)) """
for t in torrents: self.tv_import_service.import_all_torrents()
show = None
try:
if not t.imported and t.status == TorrentStatus.finished:
show = self.torrent_service.get_show_of_torrent(torrent=t)
if show is None:
log.warning(
f"torrent {t.title} is not a tv torrent, skipping import."
)
continue
self.import_episode_files_from_torrent(torrent=t, show=show)
except RuntimeError as e:
show_name = show.name if show is not None else "<unknown>"
log.error(
f"Error importing torrent {t.title} for show {show_name}: {e}",
exc_info=True,
)
log.info("Finished importing all torrents")
def update_all_non_ended_shows_metadata(self) -> None: def update_all_non_ended_shows_metadata(self) -> None:
"""Updates the metadata of all non-ended shows.""" """
log.info("Updating metadata for all non-ended shows") Delegate to TvMetadataService.
shows = [show for show in self.tv_repository.get_shows() if not show.ended] """
log.info(f"Found {len(shows)} non-ended shows to update") self.tv_metadata_service.update_all_non_ended_shows_metadata()
for show in shows:
try:
if show.metadata_provider == "tmdb":
metadata_provider = TmdbMetadataProvider()
elif show.metadata_provider == "tvdb":
metadata_provider = TvdbMetadataProvider()
else:
log.error(
f"Unsupported metadata provider {show.metadata_provider} for show {show.name}, skipping update."
)
continue
except InvalidConfigError:
log.exception(
f"Error initializing metadata provider {show.metadata_provider} for show {show.name}"
)
continue
updated_show = self.update_show_metadata(
db_show=show, metadata_provider=metadata_provider
)
if updated_show:
log.debug("Updated show metadata", extra={"show": updated_show.name})
else:
log.warning(f"Failed to update metadata for show: {show.name}")