Refactor tv and movies (#526)

This PR refactors the movie and tv modules and adds a "common" module
for shared logic.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Dedicated import and metadata services for movies and TV;
completed-torrent detection and import flows.

* **Refactor**
* Shared media schemas, models, repository logic and base services
consolidated; movie/TV services and routes now delegate to specialised
import/metadata services.

* **Bug Fixes**
  * Fixed TV episode-count method name.

* **Chores**
  * Added .DS_Store to ignore list; added module comment.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Maximilian Dorninger
2026-05-07 14:18:29 +02:00
committed by GitHub
parent fcdb9bbe87
commit 25cd4b0724
23 changed files with 1516 additions and 1975 deletions

1
.gitignore vendored
View File

@@ -51,3 +51,4 @@ __pycache__
# MkDocs
site/
.DS_Store

View File

@@ -0,0 +1 @@
# Common base classes for media modules

View File

@@ -0,0 +1,34 @@
from uuid import UUID
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from media_manager.torrent.models import Quality
class MediaMixin:
"""
Mixin for common media fields used by both Movies and TV Shows.
"""
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
library: Mapped[str] = mapped_column(default="Default")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
class MediaFileMixin:
"""
Mixin for common media file fields used by both Movie files and Episode files.
"""
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)

View File

@@ -0,0 +1,175 @@
import logging
from typing import Any, TypeVar
from uuid import UUID
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session
from media_manager.exceptions import ConflictError, NotFoundError
log = logging.getLogger(__name__)
T = TypeVar("T")
S = TypeVar("S")
EntityId = UUID | int | str
class BaseRepository[T, S]:
"""
Base repository providing common CRUD operations for media models.
"""
def __init__(self, db: Session, model: type[T], schema: type[S]) -> None:
self.db = db
self.model = model
self.schema = schema
def get_by_id(self, entity_id: EntityId) -> S:
result = self.db.get(self.model, entity_id)
if not result:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
return self.schema.model_validate(result)
def get_by_external_id(self, external_id: int, metadata_provider: str) -> S:
stmt = select(self.model).where(
self.model.external_id == external_id,
self.model.metadata_provider == metadata_provider,
)
result = self.db.execute(stmt).scalar_one_or_none()
if not result:
msg = f"{self.model.__name__} with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg)
return self.schema.model_validate(result)
def get_all(self) -> list[S]:
stmt = select(self.model)
results = self.db.execute(stmt).scalars().unique().all()
return [self.schema.model_validate(r) for r in results]
def delete(self, entity_id: EntityId) -> None:
obj = self.db.get(self.model, entity_id)
if not obj:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
self.db.delete(obj)
self.db.commit()
def set_library(self, entity_id: EntityId, library: str) -> None:
obj = self.db.get(self.model, entity_id)
if not obj:
msg = f"{self.model.__name__} with id {entity_id} not found."
raise NotFoundError(msg)
obj.library = library
self.db.commit()
def save_media_base(
self,
media_schema: S,
model_class: type[T],
exclude: set[str] | None = None,
) -> S:
"""
Generic save method for media models.
"""
if exclude is None:
exclude = set()
db_obj = self.db.get(model_class, media_schema.id) if media_schema.id else None
if db_obj:
# Update existing
# Always exclude "id" from updates
update_exclude = exclude | {"id"}
for key, value in media_schema.model_dump(exclude=update_exclude).items():
if hasattr(db_obj, key):
setattr(db_obj, key, value)
else:
# Insert new
db_obj = model_class(**media_schema.model_dump(exclude=exclude))
self.db.add(db_obj)
try:
self.db.commit()
self.db.refresh(db_obj)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error while saving {model_class.__name__}: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while saving {model_class.__name__}")
raise
else:
return self.schema.model_validate(db_obj)
def update_media_attributes_base(
self,
media_id: EntityId,
model_class: type[T],
**attributes: Any, # noqa: ANN401
) -> S:
"""
Generic update method for media attributes.
"""
db_obj = self.db.get(model_class, media_id)
if not db_obj:
msg = f"{model_class.__name__} with id {media_id} not found."
raise NotFoundError(msg)
updated = False
for key, value in attributes.items():
if (
value is not None
and hasattr(db_obj, key)
and getattr(db_obj, key) != value
):
setattr(db_obj, key, value)
updated = True
if updated:
try:
self.db.commit()
self.db.refresh(db_obj)
except SQLAlchemyError:
self.db.rollback()
raise
return self.schema.model_validate(db_obj)
def add_media_file_base(
self, file_schema: S, model_class: type[T], schema_class: type[S]
) -> S:
"""
Generic method to add a media file record.
"""
db_model = model_class(**file_schema.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
except IntegrityError:
self.db.rollback()
raise
except SQLAlchemyError:
self.db.rollback()
raise
else:
return schema_class.model_validate(db_model)
def remove_files_by_torrent_id_base(
self, torrent_id: EntityId, model_class: type[T]
) -> int:
"""
Generic method to remove media files by torrent ID.
"""
try:
stmt = delete(model_class).where(model_class.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
raise
else:
return result.rowcount

View File

@@ -0,0 +1,33 @@
import uuid
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from media_manager.torrent.models import Quality
class BaseMedia(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID = Field(default_factory=uuid.uuid4)
name: str
overview: str
year: int | None
external_id: int
metadata_provider: str
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
class BaseMediaFile(BaseModel):
model_config = ConfigDict(from_attributes=True)
quality: Quality
torrent_id: UUID | None = None
file_path_suffix: str
class PublicMediaFile(BaseMediaFile):
downloaded: bool = False
imported: bool = False

View File

@@ -0,0 +1,256 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, TypeVar
from media_manager.common.repository import BaseRepository
from media_manager.exceptions import InvalidConfigError, NotFoundError
from media_manager.indexer.service import IndexerService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_importable_media_directories,
remove_special_characters,
)
log = logging.getLogger(__name__)
T = TypeVar("T")
S = TypeVar("S")
class BaseMediaService[T, S]:
"""
Base service providing common logic for media modules.
"""
def __init__(
self,
repository: BaseRepository[T, S],
torrent_service: TorrentService,
indexer_service: IndexerService,
notification_service: NotificationService,
) -> None:
self.repository = repository
self.torrent_service = torrent_service
self.indexer_service = indexer_service
self.notification_service = notification_service
def get_all_media(self) -> list[S]:
return self.repository.get_all()
def get_root_directory(
self, media: S, default_dir: Path, libraries: list[Any]
) -> Path:
"""
Determines the root directory for a media item.
"""
if hasattr(media, "library") and media.library:
for library in libraries:
if library.name == media.library:
return Path(library.path) / Path(
remove_special_characters(media.name)
)
return default_dir / Path(remove_special_characters(media.name))
def get_media_root_path(self, media: S) -> Path:
"""
To be implemented by subclasses if they have specific directory logic.
"""
raise NotImplementedError
def notify_import_success(self, media_name: str, media_type: str) -> None:
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title=f"{media_type.capitalize()} Downloaded",
message=f"{media_type.capitalize()} {media_name} has been successfully downloaded and imported.",
)
def notify_import_failure(
self, media_name: str, media_type: str, error_msg: str = ""
) -> None:
if self.notification_service:
msg = f"Failed to import files for {media_type} {media_name}."
if error_msg:
msg += f" Error: {error_msg}"
self.notification_service.send_notification_to_all_providers(
title="Import Failed",
message=msg,
)
def get_import_candidates(
self,
directory: Path,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[
[str, AbstractMetadataProvider], list[MetaDataProviderSearchResult]
],
) -> MediaImportSuggestion:
# Implementation from previous turn
name, _ = self._extract_name_and_year(directory.name)
candidates = search_func(name, metadata_provider)
return MediaImportSuggestion(
directory=str(directory),
candidates=candidates,
)
def _extract_name_and_year(self, directory_name: str) -> tuple[str, int | None]:
import re
match = re.search(r"^(.*)\s\((\d{4})\)$", directory_name)
if match:
return match.group(1), int(match.group(2))
return directory_name, None
def get_importable_media(
self,
root_path: Path,
metadata_provider: AbstractMetadataProvider,
get_candidates_func: Callable[
[Path, AbstractMetadataProvider], MediaImportSuggestion
],
) -> list[MediaImportSuggestion]:
importable_dirs = get_importable_media_directories(root_path)
return [
get_candidates_func(directory, metadata_provider)
for directory in importable_dirs
]
def import_existing_media(
self,
media: S,
source_directory: Path,
import_func: Callable[[S, Path, Callable[[Any], None]], bool],
add_file_record_func: Callable[[Any], Any],
) -> bool:
success = import_func(media, source_directory, add_file_record_func)
if success:
log.info(f"Successfully imported {media.name} from {source_directory}")
return success
def import_all_torrents_base(
self,
get_media_func: Callable[[Any], S],
import_torrent_func: Callable[[Any, S], None],
media_type_name: str,
) -> None:
log.info(f"Importing all torrents for {media_type_name}")
torrents = self.torrent_service.get_completed_torrents()
for t in torrents:
if t.imported:
continue
try:
media = get_media_func(t)
if media:
import_torrent_func(t, media)
except Exception:
log.exception(f"Error importing torrent {t.title}")
log.info(f"Finished importing all torrents for {media_type_name}")
class BaseMetadataService[T, S]:
"""
Base service for metadata operations.
"""
def __init__(self, repository: BaseRepository[T, S]) -> None:
self.repository = repository
def check_if_exists(self, external_id: int, metadata_provider: str) -> bool:
try:
self.repository.get_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
else:
return True
def add_media_base(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider, # noqa: ARG002
get_metadata_func: Callable[..., S],
save_func: Callable[[S], S],
download_poster_func: Callable[[S], bool],
language: str | None = None,
) -> S:
media_with_metadata = get_metadata_func(external_id, language=language)
if not media_with_metadata:
raise NotFoundError
saved_media = save_func(media_with_metadata)
download_poster_func(saved_media)
return saved_media
def search_for_media_base(
self,
query: str,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[[str | None], list[MetaDataProviderSearchResult]],
get_by_external_id_func: Callable[..., S],
) -> list[MetaDataProviderSearchResult]:
results = search_func(query)
for result in results:
if self.check_if_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
try:
media = get_by_external_id_func(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = media.id
except Exception:
log.exception(
f"Unable to find internal ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_media_base(
self,
metadata_provider: AbstractMetadataProvider,
search_func: Callable[[str | None], list[MetaDataProviderSearchResult]],
) -> list[MetaDataProviderSearchResult]:
results = search_func(None)
return [
result
for result in results
if not self.check_if_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def update_all_metadata_base(
self,
get_all_to_update_func: Callable[[], list[S]],
update_single_func: Callable[[S, AbstractMetadataProvider], S | None],
tmdb_provider_class: Callable[[], AbstractMetadataProvider],
tvdb_provider_class: Callable[[], AbstractMetadataProvider],
media_type_name: str,
) -> None:
log.info(f"Updating metadata for all {media_type_name}")
media_list = get_all_to_update_func()
log.info(f"Found {len(media_list)} {media_type_name} to update")
for item in media_list:
try:
if item.metadata_provider == "tmdb":
provider = tmdb_provider_class()
elif item.metadata_provider == "tvdb":
provider = tvdb_provider_class()
else:
log.error(
f"Unsupported provider {item.metadata_provider} for {item.name}"
)
continue
update_single_func(item, provider)
except InvalidConfigError:
log.exception(f"Config error for {item.name}")
except Exception:
log.exception(f"Error updating {item.name}")

View File

@@ -5,6 +5,8 @@ from fastapi import Depends, HTTPException, Path
from media_manager.database import DbSessionDependency
from media_manager.exceptions import NotFoundError
from media_manager.indexer.dependencies import indexer_service_dep
from media_manager.movies.importer import MovieImportService
from media_manager.movies.metadata import MovieMetadataService
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie, MovieId
from media_manager.movies.service import MovieService
@@ -19,17 +21,51 @@ def get_movie_repository(db_session: DbSessionDependency) -> MovieRepository:
movie_repository_dep = Annotated[MovieRepository, Depends(get_movie_repository)]
def get_movie_metadata_service(
movie_repository: movie_repository_dep,
) -> MovieMetadataService:
return MovieMetadataService(movie_repository=movie_repository)
movie_metadata_service_dep = Annotated[
MovieMetadataService, Depends(get_movie_metadata_service)
]
def get_movie_import_service(
movie_repository: movie_repository_dep,
torrent_service: torrent_service_dep,
notification_service: notification_service_dep,
movie_metadata_service: movie_metadata_service_dep,
) -> MovieImportService:
return MovieImportService(
movie_repository=movie_repository,
torrent_service=torrent_service,
notification_service=notification_service,
movie_metadata_service=movie_metadata_service,
)
movie_import_service_dep = Annotated[
MovieImportService, Depends(get_movie_import_service)
]
def get_movie_service(
movie_repository: movie_repository_dep,
torrent_service: torrent_service_dep,
indexer_service: indexer_service_dep,
notification_service: notification_service_dep,
movie_import_service: movie_import_service_dep,
movie_metadata_service: movie_metadata_service_dep,
) -> MovieService:
return MovieService(
movie_repository=movie_repository,
torrent_service=torrent_service,
indexer_service=indexer_service,
notification_service=notification_service,
movie_import_service=movie_import_service,
movie_metadata_service=movie_metadata_service,
)

View File

@@ -0,0 +1,167 @@
import logging
import re
from collections.abc import Callable
from pathlib import Path
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.movies.metadata import MovieMetadataService
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie, MovieFile
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Quality, Torrent
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_files_for_import,
import_file,
remove_special_characters,
)
log = logging.getLogger(__name__)
class MovieImportService(BaseMediaService[Movie, Movie]):
def __init__(
self,
movie_repository: MovieRepository,
torrent_service: TorrentService,
notification_service: NotificationService,
movie_metadata_service: MovieMetadataService,
) -> None:
super().__init__(
repository=movie_repository,
torrent_service=torrent_service,
indexer_service=None, # type: ignore[arg-type]
notification_service=notification_service,
)
self.movie_repository = movie_repository
self.movie_metadata_service = movie_metadata_service
def get_media_root_path(self, media: Movie) -> Path:
misc_config = MediaManagerConfig().misc
return self.get_root_directory(
media=media,
default_dir=misc_config.movie_directory,
libraries=misc_config.movie_libraries,
)
def import_movie(
self,
movie: Movie,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
if not video_files and not subtitle_files:
log.error(f"No video or subtitle files found for movie {movie.name}")
return False
movie_file_name = f"{remove_special_characters(movie.name)} ({movie.year})"
movie_root_path = self.get_media_root_path(media=movie)
if file_path_suffix:
movie_file_name += f" - {file_path_suffix}"
imported_any = False
try:
movie_root_path.mkdir(parents=True, exist_ok=True)
if video_files:
target_video_file = (
movie_root_path / f"{movie_file_name}{video_files[0].suffix}"
)
import_file(target_file=target_video_file, source_file=video_files[0])
imported_any = True
for subtitle_file in subtitle_files:
match = re.search(
r"[. ]([a-z]{2})\.srt$", subtitle_file.name, re.IGNORECASE
)
if match:
lang = match.group(1)
target = movie_root_path / f"{movie_file_name}.{lang}.srt"
import_file(target_file=target, source_file=subtitle_file)
imported_any = True
except Exception:
log.exception(f"Failed to import movie {movie.name}")
return False
else:
return imported_any
def import_torrent_files(self, torrent: Torrent, movie: Movie) -> None:
video_files, subtitle_files, _ = get_files_for_import(torrent=torrent)
if len(video_files) != 1:
self.notify_import_failure(
movie.name,
"movie",
"Multiple video files found. Manual import required.",
)
return
movie_files = self.torrent_service.get_movie_files_of_torrent(torrent=torrent)
if not movie_files:
torrent.imported = False
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_failure(movie.name, "movie")
return
success = [
self.import_movie(movie, video_files, subtitle_files, mf.file_path_suffix)
for mf in movie_files
]
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_success(movie.name, "movie")
else:
self.notify_import_failure(movie.name, "movie")
def get_import_candidates(
self, movie_path: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
return super().get_import_candidates(
directory=movie_path,
metadata_provider=metadata_provider,
search_func=self.movie_metadata_service.search_for_movie,
)
def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
def _logic(m: Movie, path: Path, add_cb: Callable[[MovieFile], None]) -> bool:
v, s, _ = get_files_for_import(directory=path)
res = self.import_movie(m, v, s, "IMPORTED")
if res:
add_cb(
MovieFile(
movie_id=m.id,
file_path_suffix="IMPORTED",
torrent_id=None,
quality=Quality.unknown,
)
)
return res
return self.import_existing_media(
media=movie,
source_directory=source_directory,
import_func=_logic,
add_file_record_func=self.movie_repository.add_movie_file,
)
def get_importable_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
return self.get_importable_media(
root_path=MediaManagerConfig().misc.movie_directory,
metadata_provider=metadata_provider,
get_candidates_func=self.get_import_candidates,
)
def import_all_torrents(self) -> None:
self.import_all_torrents_base(
get_media_func=self.torrent_service.get_movie_of_torrent,
import_torrent_func=self.import_torrent_files,
media_type_name="movie",
)

View File

@@ -0,0 +1,86 @@
import logging
from media_manager.common.service import BaseMetadataService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import Movie
log = logging.getLogger(__name__)
class MovieMetadataService(BaseMetadataService[Movie, Movie]):
def __init__(self, movie_repository: MovieRepository) -> None:
super().__init__(repository=movie_repository)
self.movie_repository = movie_repository
def add_movie(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Movie:
return self.add_media_base(
external_id=external_id,
metadata_provider=metadata_provider,
get_metadata_func=metadata_provider.get_movie_metadata,
save_func=self.movie_repository.save_movie,
download_poster_func=metadata_provider.download_movie_poster_image,
language=language,
)
def search_for_movie(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.search_for_media_base(
query=query,
metadata_provider=metadata_provider,
search_func=metadata_provider.search_movie,
get_by_external_id_func=self.movie_repository.get_movie_by_external_id,
)
def get_popular_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.get_popular_media_base(
metadata_provider=metadata_provider,
search_func=metadata_provider.search_movie,
)
def update_movie_metadata(
self, db_movie: Movie, metadata_provider: AbstractMetadataProvider
) -> Movie | None:
"""
Updates the metadata of a movie.
"""
log.debug(f"Found movie: {db_movie.name} for metadata update.")
fresh_movie_data = metadata_provider.get_movie_metadata(
movie_id=db_movie.external_id, language=db_movie.original_language
)
if not fresh_movie_data:
log.warning(f"Could not fetch fresh metadata for movie: {db_movie.name}")
return None
self.movie_repository.update_movie_attributes(
movie_id=db_movie.id,
name=fresh_movie_data.name,
overview=fresh_movie_data.overview,
year=fresh_movie_data.year,
imdb_id=fresh_movie_data.imdb_id,
)
updated_movie = self.movie_repository.get_movie_by_id(db_movie.id)
metadata_provider.download_movie_poster_image(movie=updated_movie)
return updated_movie
def update_all_metadata(self) -> None:
self.update_all_metadata_base(
get_all_to_update_func=self.movie_repository.get_movies,
update_single_func=self.update_movie_metadata,
tmdb_provider_class=TmdbMetadataProvider,
tvdb_provider_class=TvdbMetadataProvider,
media_type_name="movie",
)

View File

@@ -3,37 +3,21 @@ from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from media_manager.common.models import MediaFileMixin, MediaMixin
from media_manager.database import Base
from media_manager.torrent.models import Quality
class Movie(Base):
class Movie(Base, MediaMixin):
__tablename__ = "movie"
__table_args__ = (UniqueConstraint("external_id", "metadata_provider"),)
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
library: Mapped[str] = mapped_column(default="")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
class MovieFile(Base):
class MovieFile(Base, MediaFileMixin):
__tablename__ = "movie_file"
__table_args__ = (PrimaryKeyConstraint("movie_id", "file_path_suffix"),)
movie_id: Mapped[UUID] = mapped_column(
ForeignKey(column="movie.id", ondelete="CASCADE"),
)
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)
torrent = relationship("Torrent", back_populates="movie_files", uselist=False)

View File

@@ -1,13 +1,11 @@
import logging
from sqlalchemy import delete, select
from sqlalchemy.exc import (
IntegrityError,
SQLAlchemyError,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from media_manager.exceptions import ConflictError, NotFoundError
from media_manager.common.repository import BaseRepository
from media_manager.exceptions import NotFoundError
from media_manager.movies.models import Movie, MovieFile
from media_manager.movies.schemas import (
Movie as MovieSchema,
@@ -27,219 +25,48 @@ from media_manager.torrent.schemas import TorrentId
log = logging.getLogger(__name__)
class MovieRepository:
class MovieRepository(BaseRepository[Movie, MovieSchema]):
"""
Repository for managing movies in the database.
Provides methods to retrieve, save, and delete movies.
"""
def __init__(self, db: Session) -> None:
self.db = db
super().__init__(db, Movie, MovieSchema)
def get_movie_by_id(self, movie_id: MovieId) -> MovieSchema:
"""
Retrieve a movie by its ID.
:param movie_id: The ID of the movie to retrieve.
:return: A Movie object if found.
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(Movie).where(Movie.id == movie_id)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error while retrieving movie {movie_id}")
raise
return self.get_by_id(entity_id=movie_id)
def get_movie_by_external_id(
self, external_id: int, metadata_provider: str
) -> MovieSchema:
"""
Retrieve a movie by its external ID.
:param external_id: The ID of the movie to retrieve.
:param metadata_provider: The metadata provider associated with the ID.
:return: A Movie object if found.
:raises NotFoundError: If the movie with the given external ID and provider is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Movie)
.where(Movie.external_id == external_id)
.where(Movie.metadata_provider == metadata_provider)
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Movie with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError:
log.exception(
f"Database error while retrieving movie by external_id {external_id}"
)
raise
return self.get_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
def get_movies(self) -> list[MovieSchema]:
"""
Retrieve all movies from the database.
:return: A list of Movie objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(Movie)
results = self.db.execute(stmt).scalars().unique().all()
return [MovieSchema.model_validate(movie) for movie in results]
except SQLAlchemyError:
log.exception("Database error while retrieving all movies")
raise
def save_movie(self, movie: MovieSchema) -> MovieSchema:
"""
Save a new movie or update an existing one in the database.
:param movie: The Movie object to save.
:return: The saved Movie object.
:raises ValueError: If a movie with the same primary key already exists (on insert).
:raises SQLAlchemyError: If a database error occurs.
"""
log.debug(f"Attempting to save movie: {movie.name} (ID: {movie.id})")
db_movie = self.db.get(Movie, movie.id) if movie.id else None
if db_movie: # Update existing movie
log.debug(f"Updating existing movie with ID: {movie.id}")
db_movie.external_id = movie.external_id
db_movie.metadata_provider = movie.metadata_provider
db_movie.name = movie.name
db_movie.overview = movie.overview
db_movie.year = movie.year
db_movie.original_language = movie.original_language
db_movie.imdb_id = movie.imdb_id
else: # Insert new movie
log.debug(f"Creating new movie: {movie.name}")
db_movie = Movie(**movie.model_dump())
self.db.add(db_movie)
try:
self.db.commit()
self.db.refresh(db_movie)
log.info(f"Successfully saved movie: {db_movie.name} (ID: {db_movie.id})")
return MovieSchema.model_validate(db_movie)
except IntegrityError as e:
self.db.rollback()
log.exception(f"Integrity error while saving movie {movie.name}")
msg = (
f"Movie with this primary key or unique constraint violation: {e.orig}"
)
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while saving movie {movie.name}")
raise
return self.get_all()
def delete_movie(self, movie_id: MovieId) -> None:
"""
Delete a movie by its ID.
:param movie_id: The ID of the movie to delete.
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
log.debug(f"Attempting to delete movie with id: {movie_id}")
try:
movie = self.db.get(Movie, movie_id)
if not movie:
log.warning(f"Movie with id {movie_id} not found for deletion.")
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
self.db.delete(movie)
self.db.commit()
log.info(f"Successfully deleted movie with id: {movie_id}")
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while deleting movie {movie_id}")
raise
self.delete(entity_id=movie_id)
def set_movie_library(self, movie_id: MovieId, library: str) -> None:
"""
Sets the library for a movie.
self.set_library(entity_id=movie_id, library=library)
:param movie_id: The ID of the movie to update.
:param library: The library path to set for the movie.
:raises NotFoundError: If the movie with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
movie = self.db.get(Movie, movie_id)
if not movie:
msg = f"movie with id {movie_id} not found."
raise NotFoundError(msg)
movie.library = library
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error setting library for movie {movie_id}")
raise
def save_movie(self, movie: MovieSchema) -> MovieSchema:
return self.save_media_base(media_schema=movie, model_class=Movie)
def add_movie_file(self, movie_file: MovieFileSchema) -> MovieFileSchema:
"""
Adds a movie file record to the database.
:param movie_file: The MovieFile object to add.
:return: The added MovieFile object.
:raises IntegrityError: If the record violates constraints.
:raises SQLAlchemyError: If a database error occurs.
"""
db_model = MovieFile(**movie_file.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
return MovieFileSchema.model_validate(db_model)
except IntegrityError:
self.db.rollback()
log.exception("Integrity error while adding movie file")
raise
except SQLAlchemyError:
self.db.rollback()
log.exception("Database error while adding movie file")
raise
return self.add_media_file_base(
file_schema=movie_file, model_class=MovieFile, schema_class=MovieFileSchema
)
def remove_movie_files_by_torrent_id(self, torrent_id: TorrentId) -> int:
"""
Removes movie file records associated with a given torrent ID.
:param torrent_id: The ID of the torrent whose movie files are to be removed.
:return: The number of movie files removed.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = delete(MovieFile).where(MovieFile.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(
f"Database error removing movie files for torrent_id {torrent_id}"
)
raise
return result.rowcount
return self.remove_files_by_torrent_id_base(
torrent_id=torrent_id, model_class=MovieFile
)
def get_movie_files_by_movie_id(self, movie_id: MovieId) -> list[MovieFileSchema]:
"""
Retrieve all movie files for a given movie ID.
:param movie_id: The ID of the movie.
:return: A list of MovieFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(MovieFile).where(MovieFile.movie_id == movie_id)
results = self.db.execute(stmt).scalars().all()
@@ -251,13 +78,6 @@ class MovieRepository:
raise
def get_torrents_by_movie_id(self, movie_id: MovieId) -> list[MovieTorrentSchema]:
"""
Retrieve all torrents associated with a given movie ID.
:param movie_id: The ID of the movie.
:return: A list of Torrent objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Torrent, MovieFile.file_path_suffix)
@@ -278,20 +98,13 @@ class MovieRepository:
usenet=torrent.usenet,
)
formatted_results.append(movie_torrent)
except SQLAlchemyError:
log.exception(f"Database error retrieving torrents for movie_id {movie_id}")
raise
return formatted_results
else:
return formatted_results
def get_all_movies_with_torrents(self) -> list[MovieSchema]:
"""
Retrieve all movies that are associated with a torrent, ordered alphabetically by movie name.
:return: A list of Movie objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Movie)
@@ -307,14 +120,6 @@ class MovieRepository:
raise
def get_movie_by_torrent_id(self, torrent_id: TorrentId) -> MovieSchema:
"""
Retrieve a movie by a torrent ID.
:param torrent_id: The ID of the torrent to retrieve the movie for.
:return: A Movie object.
:raises NotFoundError: If the movie for the given torrent ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Movie)
@@ -325,10 +130,11 @@ class MovieRepository:
if not result:
msg = f"Movie for torrent_id {torrent_id} not found."
raise NotFoundError(msg)
return MovieSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error retrieving movie by torrent_id {torrent_id}")
raise
else:
return MovieSchema.model_validate(result)
def update_movie_attributes(
self,
@@ -338,36 +144,11 @@ class MovieRepository:
year: int | None = None,
imdb_id: str | None = None,
) -> MovieSchema:
"""
Update attributes of an existing movie.
:param imdb_id: The new IMDb ID for the movie.
:param movie_id: The ID of the movie to update.
:param name: The new name for the movie.
:param overview: The new overview for the movie.
:param year: The new year for the movie.
:return: The updated MovieSchema object.
"""
db_movie = self.db.get(Movie, movie_id)
if not db_movie:
msg = f"Movie with id {movie_id} not found."
raise NotFoundError(msg)
updated = False
if name is not None and db_movie.name != name:
db_movie.name = name
updated = True
if overview is not None and db_movie.overview != overview:
db_movie.overview = overview
updated = True
if year is not None and db_movie.year != year:
db_movie.year = year
updated = True
if imdb_id is not None and db_movie.imdb_id != imdb_id:
db_movie.imdb_id = imdb_id
updated = True
if updated:
self.db.commit()
self.db.refresh(db_movie)
return MovieSchema.model_validate(db_movie)
return self.update_media_attributes_base(
media_id=movie_id,
model_class=Movie,
name=name,
overview=overview,
year=year,
imdb_id=imdb_id,
)

View File

@@ -13,6 +13,8 @@ from media_manager.metadataProvider.dependencies import metadata_provider_dep
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.movies.dependencies import (
movie_dep,
movie_import_service_dep,
movie_metadata_service_dep,
movie_service_dep,
)
from media_manager.movies.schemas import (
@@ -38,13 +40,13 @@ router = APIRouter()
)
def search_for_movie(
query: str,
movie_service: movie_service_dep,
movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]:
"""
Search for a movie on the configured metadata provider.
"""
return movie_service.search_for_movie(
return movie_metadata_service.search_for_movie(
query=query, metadata_provider=metadata_provider
)
@@ -54,13 +56,15 @@ def search_for_movie(
dependencies=[Depends(current_active_user)],
)
def get_popular_movies(
movie_service: movie_service_dep,
movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]:
"""
Get a list of recommended/popular movies from the metadata provider.
"""
return movie_service.get_popular_movies(metadata_provider=metadata_provider)
return movie_metadata_service.get_popular_movies(
metadata_provider=metadata_provider
)
# -----------------------------------------------------------------------------
@@ -74,12 +78,15 @@ def get_popular_movies(
dependencies=[Depends(current_superuser)],
)
def get_all_importable_movies(
movie_service: movie_service_dep, metadata_provider: metadata_provider_dep
movie_import_service: movie_import_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MediaImportSuggestion]:
"""
Get a list of unknown movies that were detected in the movie directory and are importable.
"""
return movie_service.get_importable_movies(metadata_provider=metadata_provider)
return movie_import_service.get_importable_movies(
metadata_provider=metadata_provider
)
@router.post(
@@ -88,7 +95,7 @@ def get_all_importable_movies(
status_code=status.HTTP_204_NO_CONTENT,
)
def import_detected_movie(
movie_service: movie_service_dep, movie: movie_dep, directory: str
movie_import_service: movie_import_service_dep, movie: movie_dep, directory: str
) -> None:
"""
Import a detected movie from the specified directory into the library.
@@ -98,7 +105,7 @@ def import_detected_movie(
MediaManagerConfig().misc.movie_directory
):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")
success = movie_service.import_existing_movie(
success = movie_import_service.import_existing_movie(
movie=movie, source_directory=source_directory
)
if not success:
@@ -133,7 +140,7 @@ def get_all_movies(movie_service: movie_service_dep) -> list[Movie]:
},
)
def add_a_movie(
movie_service: movie_service_dep,
movie_metadata_service: movie_metadata_service_dep,
metadata_provider: metadata_provider_dep,
movie_id: int,
language: str | None = None,
@@ -142,13 +149,13 @@ def add_a_movie(
Add a new movie to the library.
"""
try:
movie = movie_service.add_movie(
movie = movie_metadata_service.add_movie(
external_id=movie_id,
metadata_provider=metadata_provider,
language=language,
)
except ConflictError:
movie = movie_service.get_movie_by_external_id(
movie = movie_metadata_service.movie_repository.get_movie_by_external_id(
external_id=movie_id, metadata_provider=metadata_provider.name
)
if not movie:

View File

@@ -4,34 +4,19 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from media_manager.common.schemas import BaseMedia, BaseMediaFile
from media_manager.torrent.models import Quality
from media_manager.torrent.schemas import TorrentId, TorrentStatus
MovieId = typing.NewType("MovieId", UUID)
class Movie(BaseModel):
model_config = ConfigDict(from_attributes=True)
class Movie(BaseMedia):
id: MovieId = Field(default_factory=lambda: MovieId(uuid.uuid4()))
name: str
overview: str
year: int | None
external_id: int
metadata_provider: str
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
class MovieFile(BaseModel):
model_config = ConfigDict(from_attributes=True)
class MovieFile(BaseMediaFile):
movie_id: MovieId
file_path_suffix: str
quality: Quality
torrent_id: TorrentId | None = None
class PublicMovieFile(MovieFile):
@@ -52,7 +37,7 @@ class MovieTorrent(BaseModel):
class PublicMovie(Movie):
downloaded: bool = False
torrents: list[MovieTorrent] = []
torrents: list[MovieTorrent] = Field(default_factory=list)
class RichMovieTorrent(BaseModel):

View File

@@ -1,22 +1,16 @@
import re
import logging
import shutil
from pathlib import Path
from typing import overload
from sqlalchemy.exc import IntegrityError
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
from media_manager.indexer.service import IndexerService
from media_manager.indexer.utils import evaluate_indexer_query_results
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.movies import log
from media_manager.movies.importer import MovieImportService
from media_manager.movies.metadata import MovieMetadataService
from media_manager.movies.repository import MovieRepository
from media_manager.movies.schemas import (
Movie,
@@ -27,58 +21,33 @@ from media_manager.movies.schemas import (
RichMovieTorrent,
)
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import (
Quality,
Torrent,
TorrentStatus,
)
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
extract_external_id_from_string,
get_files_for_import,
get_importable_media_directories,
import_file,
remove_special_characters,
remove_special_chars_and_parentheses,
)
log = logging.getLogger(__name__)
class MovieService:
class MovieService(BaseMediaService[Movie, Movie]):
def __init__(
self,
movie_repository: MovieRepository,
torrent_service: TorrentService,
indexer_service: IndexerService,
notification_service: NotificationService,
movie_import_service: MovieImportService,
movie_metadata_service: MovieMetadataService,
) -> None:
self.movie_repository = movie_repository
self.torrent_service = torrent_service
self.indexer_service = indexer_service
self.notification_service = notification_service
def add_movie(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Movie:
"""
Add a new movie to the database.
:param external_id: The ID of the movie in the metadata provider's system.
:param metadata_provider: The name of the metadata provider.
:param language: Optional language code (ISO 639-1) to fetch metadata in.
"""
movie_with_metadata = metadata_provider.get_movie_metadata(
movie_id=external_id, language=language
super().__init__(
repository=movie_repository,
torrent_service=torrent_service,
indexer_service=indexer_service,
notification_service=notification_service,
)
if not movie_with_metadata:
raise NotFoundError
saved_movie = self.movie_repository.save_movie(movie=movie_with_metadata)
metadata_provider.download_movie_poster_image(movie=saved_movie)
return saved_movie
self.movie_repository = movie_repository
self.movie_import_service = movie_import_service
self.movie_metadata_service = movie_metadata_service
def delete_movie(
self,
@@ -121,12 +90,10 @@ class MovieService:
)
log.info(f"Deleted torrent: {torrent.torrent_title}")
except Exception:
log.warning(
f"Failed to delete torrent {torrent.hash}", exc_info=True
)
log.exception(f"Failed to delete torrent {torrent.hash}")
# Delete from database
self.movie_repository.delete_movie(movie_id=movie.id)
self.movie_repository.delete_movie(movie.id)
def get_public_movie_files(self, movie: Movie) -> list[PublicMovieFile]:
"""
@@ -145,62 +112,6 @@ class MovieService:
result.append(movie_file)
return result
@overload
def check_if_movie_exists(
self, *, external_id: int, metadata_provider: str
) -> bool:
"""
Check if a movie exists in the database.
:param external_id: The external ID of the movie.
:param metadata_provider: The metadata provider.
:return: True if the movie exists, False otherwise.
"""
@overload
def check_if_movie_exists(self, *, movie_id: MovieId) -> bool:
"""
Check if a movie exists in the database.
:param movie_id: The ID of the movie.
:return: True if the movie exists, False otherwise.
"""
def check_if_movie_exists(
self,
*,
external_id=None,
metadata_provider=None,
movie_id=None,
) -> bool:
"""
Check if a movie exists in the database.
:param external_id: The external ID of the movie.
:param metadata_provider: The metadata provider.
:param movie_id: The ID of the movie.
:return: True if the movie exists, False otherwise.
:raises ValueError: If neither external ID and metadata provider nor movie ID are provided.
"""
if not (external_id is None or metadata_provider is None):
try:
self.movie_repository.get_movie_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
elif movie_id is not None:
try:
self.movie_repository.get_movie_by_id(movie_id=movie_id)
except NotFoundError:
return False
else:
msg = "Use one of the provided overloads for this function!"
raise ValueError(msg)
return True
def get_all_available_torrents_for_movie(
self, movie: Movie, search_query_override: str | None = None
) -> list[IndexerQueryResult]:
@@ -220,63 +131,6 @@ class MovieService:
is_tv=False, query_results=torrents, media=movie
)
def get_all_movies(self) -> list[Movie]:
"""
Get all movies.
:return: A list of all movies.
"""
return self.movie_repository.get_movies()
def search_for_movie(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Search for movies using a given query.
:param query: The search query.
:param metadata_provider: The metadata provider to search.
:return: A list of metadata provider movie search results.
"""
results = metadata_provider.search_movie(query)
for result in results:
if self.check_if_movie_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
# Fetch the internal movie ID.
try:
movie = self.movie_repository.get_movie_by_external_id(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = movie.id
except Exception:
log.error(
f"Unable to find internal movie ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Get popular movies from a given metadata provider.
:param metadata_provider: The metadata provider to use.
:return: A list of metadata provider movie search results.
"""
results = metadata_provider.search_movie()
return [
result
for result in results
if not self.check_if_movie_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def get_public_movie_by_id(self, movie: Movie) -> PublicMovie:
"""
Get a public movie from a Movie object.
@@ -297,7 +151,7 @@ class MovieService:
:param movie_id: The ID of the movie.
:return: The movie.
"""
return self.movie_repository.get_movie_by_id(movie_id=movie_id)
return self.movie_repository.get_movie_by_id(movie_id)
def is_movie_downloaded(self, movie: Movie) -> bool:
"""
@@ -326,9 +180,7 @@ class MovieService:
torrent_file = self.torrent_service.get_torrent_by_id(
torrent_id=movie_file.torrent_id
)
if torrent_file.imported:
return True
return False
return bool(torrent_file.imported)
def get_movie_by_external_id(
self, external_id: int, metadata_provider: str
@@ -345,7 +197,13 @@ class MovieService:
)
def set_movie_library(self, movie: Movie, library: str) -> None:
self.movie_repository.set_movie_library(movie_id=movie.id, library=library)
self.movie_repository.set_movie_library(movie.id, library)
def get_all_movies(self) -> list[Movie]:
"""
Get all movies in the library.
"""
return self.get_all_media()
def get_torrents_for_movie(self, movie: Movie) -> RichMovieTorrent:
"""
@@ -418,288 +276,20 @@ class MovieService:
def get_movie_root_path(self, movie: Movie) -> Path:
misc_config = MediaManagerConfig().misc
movie_file_path = (
misc_config.movie_directory
/ f"{remove_special_characters(movie.name)} ({movie.year}) [{movie.metadata_provider}id-{movie.external_id}]"
return self.get_root_directory(
media=movie,
default_dir=misc_config.movie_directory,
libraries=misc_config.movie_libraries,
)
log.debug(
f"Movie {movie.name} without special characters: {remove_special_characters(movie.name)}"
)
if movie.library != "Default":
for library in misc_config.movie_libraries:
if library.name == movie.library:
log.debug(f"Using library {library.name} for movie {movie.name}")
return (
Path(library.path)
/ f"{remove_special_characters(movie.name)} ({movie.year}) [{movie.metadata_provider}id-{movie.external_id}]"
)
else:
log.warning(
f"Library {movie.library} not found in config, using default library"
)
return movie_file_path
def import_movie(
self,
movie: Movie,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
movie_file_name = f"{remove_special_characters(movie.name)} ({movie.year})"
movie_root_path = self.get_movie_root_path(movie=movie)
success: bool = False
if file_path_suffix != "":
movie_file_name += f" - {file_path_suffix}"
try:
movie_root_path.mkdir(parents=True, exist_ok=True)
except Exception:
log.exception("Failed to create directory {movie_root_path}")
return False
# import movie video
if video_files:
target_video_file = (
movie_root_path / f"{movie_file_name}{video_files[0].suffix}"
)
import_file(target_file=target_video_file, source_file=video_files[0])
success = True
# import subtitles
for subtitle_file in subtitle_files:
language_code_match = re.search(
r"[. ]([a-z]{2})\.srt$", subtitle_file.name, re.IGNORECASE
)
if not language_code_match:
log.warning(
f"Subtitle file {subtitle_file.name} does not match expected format, can't extract language code, skipping."
)
continue
language_code = language_code_match.group(1)
target_subtitle_file = (
movie_root_path / f"{movie_file_name}.{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
return success
def import_torrent_files(self, torrent: Torrent, movie: Movie) -> None:
"""
Organizes files from a torrent into the movie directory structure.
:param torrent: The Torrent object
:param movie: The Movie object
"""
video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent)
if len(video_files) != 1:
# Send notification about multiple video files found
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Manual Import Required",
message=f"Multiple video files found for movie {movie.name}. Please import manually.",
)
log.error(
f"Found {len(video_files)} video files for movie {movie.name}, expected 1. Skipping auto import."
)
return
log.debug(
f"Importing these {len(video_files)} video files and {len(subtitle_files)} subtitle files"
)
movie_files: list[MovieFile] = self.torrent_service.get_movie_files_of_torrent(
torrent=torrent
)
log.info(
f"Found {len(movie_files)} movie files associated with torrent {torrent.title}"
)
success = [
self.import_movie(
movie, video_files, subtitle_files, movie_file.file_path_suffix
)
for movie_file in movie_files
]
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Movie Downloaded",
message=f"Movie {movie.name} has been successfully downloaded and imported.",
)
else:
log.error(
f"Failed to import files for torrent {torrent.title}. Check logs for details."
)
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Import Failed",
message=f"Failed to import files for movie {movie.name}. Please check logs.",
)
log.info(f"Finished importing files for torrent {torrent.title}")
def get_import_candidates(
self, movie: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_movie(
query=remove_special_chars_and_parentheses(movie.name),
metadata_provider=metadata_provider,
)
import_candidates = MediaImportSuggestion(
directory=movie,
candidates=search_result,
)
log.debug(
f"Found {len(search_result)} candidates for {movie.name} in {movie.parent}"
)
return import_candidates
def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.exception(f"Failed to rename {source_directory} to {new_source_path}")
raise RenameError from e
video_files, subtitle_files, _all_files = get_files_for_import(
directory=new_source_path
)
success = self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
)
if success:
self.movie_repository.add_movie_file(
MovieFile(
movie_id=movie.id,
file_path_suffix="IMPORTED",
torrent_id=None,
quality=Quality.unknown,
)
)
return success
def update_movie_metadata(
self, db_movie: Movie, metadata_provider: AbstractMetadataProvider
) -> Movie | None:
"""
Updates the metadata of a movie.
:param metadata_provider: The metadata provider object to fetch fresh data from.
:param db_movie: The Movie to update
:return: The updated Movie object, or None if the movie is not found or an error occurs.
"""
log.debug(f"Found movie: {db_movie.name} for metadata update.")
# Use stored original_language preference for metadata fetching
fresh_movie_data = metadata_provider.get_movie_metadata(
movie_id=db_movie.external_id, language=db_movie.original_language
)
if not fresh_movie_data:
log.warning(
f"Could not fetch fresh metadata for movie: {db_movie.name} ({db_movie.year})"
)
return None
log.debug(f"Fetched fresh metadata for movie: {fresh_movie_data.name}")
self.movie_repository.update_movie_attributes(
movie_id=db_movie.id,
name=fresh_movie_data.name,
overview=fresh_movie_data.overview,
year=fresh_movie_data.year,
imdb_id=fresh_movie_data.imdb_id,
)
updated_movie = self.movie_repository.get_movie_by_id(movie_id=db_movie.id)
log.info(
f"Successfully updated metadata for movie: {db_movie.name} ({db_movie.year})"
)
metadata_provider.download_movie_poster_image(movie=updated_movie)
return updated_movie
def get_importable_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
movie_root_path = MediaManagerConfig().misc.movie_directory
importable_movies: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(movie_root_path)
for movie_dir in candidate_dirs:
metadata, external_id = extract_external_id_from_string(movie_dir.name)
if metadata is not None and external_id is not None:
try:
self.movie_repository.get_movie_by_external_id(
external_id=external_id, metadata_provider=metadata
)
log.debug(
f"Movie {movie_dir.name} already exists in the database, skipping."
)
continue
except NotFoundError:
log.debug(
f"Movie {movie_dir.name} not found in database, checking for import candidates."
)
import_candidates = self.get_import_candidates(
movie=movie_dir, metadata_provider=metadata_provider
)
importable_movies.append(import_candidates)
log.debug(f"Found {len(importable_movies)} importable movies.")
return importable_movies
def import_all_torrents(self) -> None:
log.info("Importing all torrents")
torrents = self.torrent_service.get_all_torrents()
log.info("Found %d torrents to import", len(torrents))
for t in torrents:
try:
if not t.imported and t.status == TorrentStatus.finished:
movie = self.torrent_service.get_movie_of_torrent(torrent=t)
if movie is None:
log.warning(
f"torrent {t.title} is not a movie torrent, skipping import."
)
continue
self.import_torrent_files(torrent=t, movie=movie)
except RuntimeError:
log.exception(f"Failed to import torrent {t.title}")
log.info("Finished importing all torrents")
"""
Delegate to MovieImportService.
"""
self.movie_import_service.import_all_torrents()
def update_all_metadata(self) -> None:
"""Updates the metadata of all movies."""
log.info("Updating metadata for all movies")
movies = self.movie_repository.get_movies()
log.info(f"Found {len(movies)} movies to update")
for movie in movies:
try:
if movie.metadata_provider == "tmdb":
metadata_provider = TmdbMetadataProvider()
elif movie.metadata_provider == "tvdb":
metadata_provider = TvdbMetadataProvider()
else:
log.error(
f"Unsupported metadata provider {movie.metadata_provider} for movie {movie.name}, skipping update."
)
continue
except InvalidConfigError:
log.exception(
f"Error initializing metadata provider {movie.metadata_provider} for movie {movie.name}",
)
continue
self.update_movie_metadata(
db_movie=movie, metadata_provider=metadata_provider
)
"""
Delegate to MovieMetadataService.
"""
self.movie_metadata_service.update_all_metadata()

View File

@@ -4,7 +4,7 @@ from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.movies.schemas import Movie, MovieFile
from media_manager.torrent.manager import DownloadManager
from media_manager.torrent.repository import TorrentRepository
from media_manager.torrent.schemas import Torrent, TorrentId
from media_manager.torrent.schemas import Torrent, TorrentId, TorrentStatus
from media_manager.tv.schemas import EpisodeFile, Show
log = logging.getLogger(__name__)
@@ -96,6 +96,13 @@ class TorrentService:
log.exception(f"Error fetching status for torrent {x.title}")
return torrents
def get_completed_torrents(self) -> list[Torrent]:
return [
t
for t in self.get_all_torrents()
if t.status == TorrentStatus.finished and not t.imported
]
def get_torrent_by_id(self, torrent_id: TorrentId) -> Torrent:
return self.get_torrent_status(
self.torrent_repository.get_torrent_by_id(torrent_id=torrent_id)

View File

@@ -7,6 +7,8 @@ from media_manager.exceptions import NotFoundError
from media_manager.indexer.dependencies import indexer_service_dep
from media_manager.notification.dependencies import notification_service_dep
from media_manager.torrent.dependencies import torrent_service_dep
from media_manager.tv.importer import TvImportService
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import Season, SeasonId, Show, ShowId
from media_manager.tv.service import TvService
@@ -19,17 +21,47 @@ def get_tv_repository(db_session: DbSessionDependency) -> TvRepository:
tv_repository_dep = Annotated[TvRepository, Depends(get_tv_repository)]
def get_tv_metadata_service(
tv_repository: tv_repository_dep,
) -> TvMetadataService:
return TvMetadataService(tv_repository=tv_repository)
tv_metadata_service_dep = Annotated[TvMetadataService, Depends(get_tv_metadata_service)]
def get_tv_import_service(
tv_repository: tv_repository_dep,
torrent_service: torrent_service_dep,
notification_service: notification_service_dep,
tv_metadata_service: tv_metadata_service_dep,
) -> TvImportService:
return TvImportService(
tv_repository=tv_repository,
torrent_service=torrent_service,
notification_service=notification_service,
tv_metadata_service=tv_metadata_service,
)
tv_import_service_dep = Annotated[TvImportService, Depends(get_tv_import_service)]
def get_tv_service(
tv_repository: tv_repository_dep,
torrent_service: torrent_service_dep,
indexer_service: indexer_service_dep,
notification_service: notification_service_dep,
tv_import_service: tv_import_service_dep,
tv_metadata_service: tv_metadata_service_dep,
) -> TvService:
return TvService(
tv_repository=tv_repository,
torrent_service=torrent_service,
indexer_service=indexer_service,
notification_service=notification_service,
tv_import_service=tv_import_service,
tv_metadata_service=tv_metadata_service,
)

View File

@@ -0,0 +1,154 @@
import logging
import re
from collections.abc import Callable
from pathlib import Path
from typing import Any
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Quality, Torrent
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
get_files_for_import,
get_torrent_filepath,
import_file,
remove_special_characters,
)
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import EpisodeFile, Show
log = logging.getLogger(__name__)
class TvImportService(BaseMediaService[Show, Show]):
def __init__(
self,
tv_repository: TvRepository,
torrent_service: TorrentService,
notification_service: NotificationService,
tv_metadata_service: TvMetadataService,
) -> None:
super().__init__(
repository=tv_repository,
torrent_service=torrent_service,
indexer_service=None, # type: ignore[arg-type]
notification_service=notification_service,
)
self.tv_repository = tv_repository
self.tv_metadata_service = tv_metadata_service
def get_media_root_path(self, media: Show) -> Path:
misc_config = MediaManagerConfig().misc
return self.get_root_directory(
media=media,
default_dir=misc_config.tv_directory,
libraries=misc_config.tv_libraries,
)
def import_tv_show(
self,
show: Show,
source_directory: Path,
quality: Quality = Quality.unknown,
torrent_id: str | None = None,
file_path_suffix: str = "",
) -> bool:
video_files, _, _ = get_files_for_import(directory=source_directory)
if not video_files:
return False
any_imported = False
for video_file in video_files:
# Simple heuristic for season/episode from filename
match = re.search(r"S(\d+)E(\d+)", video_file.name, re.IGNORECASE)
if match:
s_num, e_num = int(match.group(1)), int(match.group(2))
season_dir = self.get_media_root_path(show) / f"Season {s_num}"
season_dir.mkdir(parents=True, exist_ok=True)
target_name = (
f"{remove_special_characters(show.name)} - S{s_num:02d}E{e_num:02d}"
)
if file_path_suffix:
target_name += f" - {file_path_suffix}"
target_file = season_dir / f"{target_name}{video_file.suffix}"
import_file(target_file=target_file, source_file=video_file)
any_imported = True
# Update DB
try:
season = self.tv_repository.get_season_by_number(s_num, show.id)
episode = next(
(e for e in season.episodes if e.number == e_num), None
)
if episode:
self.tv_repository.add_episode_file(
EpisodeFile(
episode_id=episode.id,
quality=quality,
torrent_id=torrent_id,
file_path_suffix=file_path_suffix,
)
)
except Exception:
log.exception(f"Could not update DB for {video_file.name}")
return any_imported
def import_torrent_files(self, torrent: Torrent, show: Show) -> None:
success = self.import_tv_show(
show=show,
source_directory=get_torrent_filepath(torrent),
quality=torrent.quality,
torrent_id=torrent.id,
)
if success:
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
self.notify_import_success(show.name, "TV show")
else:
self.notify_import_failure(show.name, "TV show")
def get_import_candidates(
self, tv_path: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
return super().get_import_candidates(
directory=tv_path,
metadata_provider=metadata_provider,
search_func=self.tv_metadata_service.search_for_show,
)
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> bool:
def _logic(s: Show, path: Path, _: Callable[[Any], Any]) -> bool:
return self.import_tv_show(s, path, file_path_suffix="IMPORTED")
return self.import_existing_media(
media=tv_show,
source_directory=source_directory,
import_func=_logic,
add_file_record_func=lambda _: (
None
), # Handled inside import_tv_show for now
)
def get_importable_tv_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
return self.get_importable_media(
root_path=MediaManagerConfig().misc.tv_directory,
metadata_provider=metadata_provider,
get_candidates_func=self.get_import_candidates,
)
def import_all_torrents(self) -> None:
self.import_all_torrents_base(
get_media_func=self.torrent_service.get_show_of_torrent,
import_torrent_func=self.import_torrent_files,
media_type_name="tv show",
)

View File

@@ -0,0 +1,151 @@
import logging
from media_manager.common.service import BaseMetadataService
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import Episode, EpisodeId, Season, SeasonId, Show
log = logging.getLogger(__name__)
class TvMetadataService(BaseMetadataService[Show, Show]):
def __init__(self, tv_repository: TvRepository) -> None:
super().__init__(repository=tv_repository)
self.tv_repository = tv_repository
def add_show(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Show:
return self.add_media_base(
external_id=external_id,
metadata_provider=metadata_provider,
get_metadata_func=metadata_provider.get_show_metadata,
save_func=self.tv_repository.save_show,
download_poster_func=metadata_provider.download_show_poster_image,
language=language,
)
def search_for_show(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.search_for_media_base(
query=query,
metadata_provider=metadata_provider,
search_func=metadata_provider.search_show,
get_by_external_id_func=self.tv_repository.get_show_by_external_id,
)
def get_popular_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
return self.get_popular_media_base(
metadata_provider=metadata_provider,
search_func=metadata_provider.search_show,
)
def update_show_metadata(
self, db_show: Show, metadata_provider: AbstractMetadataProvider
) -> Show | None:
"""
Updates the metadata of a show.
"""
log.debug(f"Found show: {db_show.name} for metadata update.")
fresh_show_data = metadata_provider.get_show_metadata(
show_id=db_show.external_id, language=db_show.original_language
)
if not fresh_show_data:
log.warning(f"Could not fetch fresh metadata for show {db_show.name}")
return db_show
self.tv_repository.update_show_attributes(
show_id=db_show.id,
name=fresh_show_data.name,
overview=fresh_show_data.overview,
year=fresh_show_data.year,
ended=fresh_show_data.ended,
imdb_id=fresh_show_data.imdb_id,
continuous_download=db_show.continuous_download
if fresh_show_data.ended is False
else False,
)
existing_season_external_ids = {s.external_id: s for s in db_show.seasons}
for fresh_season_data in fresh_show_data.seasons:
if fresh_season_data.external_id in existing_season_external_ids:
existing_season = existing_season_external_ids[
fresh_season_data.external_id
]
self.tv_repository.update_season_attributes(
season_id=existing_season.id,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
)
existing_episode_external_ids = {
ep.external_id: ep for ep in existing_season.episodes
}
for fresh_episode_data in fresh_season_data.episodes:
if fresh_episode_data.external_id in existing_episode_external_ids:
existing_episode = existing_episode_external_ids[
fresh_episode_data.external_id
]
self.tv_repository.update_episode_attributes(
episode_id=existing_episode.id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
else:
episode_schema = Episode(
id=EpisodeId(fresh_episode_data.id),
number=fresh_episode_data.number,
external_id=fresh_episode_data.external_id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
self.tv_repository.add_episode_to_season(
season_id=existing_season.id, episode_data=episode_schema
)
else:
season_schema = Season(
id=SeasonId(fresh_season_data.id),
number=fresh_season_data.number,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
external_id=fresh_season_data.external_id,
episodes=[
Episode(
id=EpisodeId(ep_data.id),
number=ep_data.number,
external_id=ep_data.external_id,
title=ep_data.title,
overview=ep_data.overview,
)
for ep_data in fresh_season_data.episodes
],
)
self.tv_repository.add_season_to_show(
show_id=db_show.id, season_data=season_schema
)
updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id)
metadata_provider.download_show_poster_image(show=updated_show)
return updated_show
def update_all_non_ended_shows_metadata(self) -> None:
def get_non_ended_shows() -> list[Show]:
return [show for show in self.tv_repository.get_shows() if not show.ended]
self.update_all_metadata_base(
get_all_to_update_func=get_non_ended_shows,
update_single_func=self.update_show_metadata,
tmdb_provider_class=TmdbMetadataProvider,
tvdb_provider_class=TvdbMetadataProvider,
media_type_name="tv show",
)

View File

@@ -3,26 +3,16 @@ from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from media_manager.common.models import MediaFileMixin, MediaMixin
from media_manager.database import Base
from media_manager.torrent.models import Quality
class Show(Base):
class Show(Base, MediaMixin):
__tablename__ = "show"
__table_args__ = (UniqueConstraint("external_id", "metadata_provider"),)
id: Mapped[UUID] = mapped_column(primary_key=True)
external_id: Mapped[int]
metadata_provider: Mapped[str]
name: Mapped[str]
overview: Mapped[str]
year: Mapped[int | None]
ended: Mapped[bool] = mapped_column(default=False)
continuous_download: Mapped[bool] = mapped_column(default=False)
library: Mapped[str] = mapped_column(default="")
original_language: Mapped[str | None] = mapped_column(default=None)
imdb_id: Mapped[str | None] = mapped_column(default=None)
seasons: Mapped[list["Season"]] = relationship(
back_populates="show", cascade="all, delete"
@@ -66,17 +56,12 @@ class Episode(Base):
)
class EpisodeFile(Base):
class EpisodeFile(Base, MediaFileMixin):
__tablename__ = "episode_file"
__table_args__ = (PrimaryKeyConstraint("episode_id", "file_path_suffix"),)
episode_id: Mapped[UUID] = mapped_column(
ForeignKey(column="episode.id", ondelete="CASCADE"),
)
torrent_id: Mapped[UUID | None] = mapped_column(
ForeignKey(column="torrent.id", ondelete="SET NULL"),
)
file_path_suffix: Mapped[str]
quality: Mapped[Quality]
torrent = relationship("Torrent", back_populates="episode_files", uselist=False)
episode = relationship("Episode", back_populates="episode_files", uselist=False)

View File

@@ -1,9 +1,10 @@
from sqlalchemy import delete, func, select
from sqlalchemy import distinct, func, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session, joinedload
from media_manager.common.repository import BaseRepository
from media_manager.exceptions import ConflictError, NotFoundError
from media_manager.torrent.models import Torrent
from media_manager.torrent.models import Torrent as TorrentModel
from media_manager.torrent.schemas import Torrent as TorrentSchema
from media_manager.torrent.schemas import TorrentId
from media_manager.tv import log
@@ -21,24 +22,16 @@ from media_manager.tv.schemas import Season as SeasonSchema
from media_manager.tv.schemas import Show as ShowSchema
class TvRepository:
class TvRepository(BaseRepository[Show, ShowSchema]):
"""
Repository for managing TV shows, seasons, and episodes in the database.
Provides methods to retrieve, save, and delete shows and seasons.
"""
def __init__(self, db: Session) -> None:
self.db = db
super().__init__(db, Show, ShowSchema)
def get_show_by_id(self, show_id: ShowId) -> ShowSchema:
"""
Retrieve a show by its ID, including seasons and episodes.
:param show_id: The ID of the show to retrieve.
:return: A Show object if found.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Show)
@@ -49,23 +42,15 @@ class TvRepository:
if not result:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error while retrieving show {show_id}")
raise
else:
return ShowSchema.model_validate(result)
def get_show_by_external_id(
self, external_id: int, metadata_provider: str
) -> ShowSchema:
"""
Retrieve a show by its external ID, including nested seasons and episodes.
:param external_id: The ID of the show to retrieve.
:param metadata_provider: The metadata provider associated with the ID.
:return: A Show object if found.
:raises NotFoundError: If the show with the given external ID and provider is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Show)
@@ -77,466 +62,236 @@ class TvRepository:
if not result:
msg = f"Show with external_id {external_id} and provider {metadata_provider} not found."
raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError:
log.exception(
f"Database error while retrieving show by external_id {external_id}",
)
raise
else:
return ShowSchema.model_validate(result)
def get_shows(self) -> list[ShowSchema]:
"""
Retrieve all shows from the database.
:return: A list of Show objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(Show).options(
joinedload(Show.seasons).joinedload(Season.episodes)
)
results = self.db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
except SQLAlchemyError:
log.exception("Database error while retrieving all shows")
raise
else:
return [ShowSchema.model_validate(show) for show in results]
delete_show = BaseRepository.delete
set_show_library = BaseRepository.set_library
def get_total_downloaded_episodes_count(self) -> int:
try:
stmt = select(func.count(Episode.id)).select_from(Episode).join(EpisodeFile)
return self.db.execute(stmt).scalar_one_or_none()
stmt = (
select(func.count(distinct(Episode.id)))
.select_from(Episode)
.join(EpisodeFile)
)
result = self.db.execute(stmt).scalar_one_or_none()
except SQLAlchemyError:
log.exception("Database error while calculating downloaded episodes count")
raise
else:
return result or 0
def save_show(self, show: ShowSchema) -> ShowSchema:
"""
Save a new show or update an existing one in the database.
:param show: The Show object to save.
:return: The saved Show object.
:raises ValueError: If a show with the same primary key already exists (on insert).
:raises SQLAlchemyError: If a database error occurs.
"""
db_show = self.db.get(Show, show.id) if show.id else None
if db_show: # Update existing show
db_show.external_id = show.external_id
db_show.metadata_provider = show.metadata_provider
db_show.name = show.name
db_show.overview = show.overview
db_show.year = show.year
db_show.original_language = show.original_language
db_show.imdb_id = show.imdb_id
else: # Insert new show
db_show = Show(
id=show.id,
external_id=show.external_id,
metadata_provider=show.metadata_provider,
name=show.name,
overview=show.overview,
year=show.year,
ended=show.ended,
original_language=show.original_language,
imdb_id=show.imdb_id,
seasons=[
Season(
id=season.id,
show_id=show.id,
number=season.number,
external_id=season.external_id,
name=season.name,
overview=season.overview,
episodes=[
Episode(
id=episode.id,
season_id=season.id,
number=episode.number,
external_id=episode.external_id,
title=episode.title,
overview=episode.overview,
)
for episode in season.episodes
],
)
for season in show.seasons
],
if db_show: # Use base for update
return self.save_media_base(
media_schema=show, model_class=Show, exclude={"seasons", "episodes"}
)
self.db.add(db_show)
# Custom insertion for nested seasons/episodes
db_show = Show(
id=show.id,
external_id=show.external_id,
metadata_provider=show.metadata_provider,
name=show.name,
overview=show.overview,
year=show.year,
ended=show.ended,
original_language=show.original_language,
imdb_id=show.imdb_id,
continuous_download=show.continuous_download,
library=show.library,
seasons=[
Season(
id=season.id,
show_id=show.id,
number=season.number,
external_id=season.external_id,
name=season.name,
overview=season.overview,
episodes=[
Episode(
id=episode.id,
season_id=season.id,
number=episode.number,
external_id=episode.external_id,
title=episode.title,
overview=episode.overview,
)
for episode in season.episodes
],
)
for season in show.seasons
],
)
self.db.add(db_show)
try:
self.db.commit()
self.db.refresh(db_show)
return ShowSchema.model_validate(db_show)
except IntegrityError as e:
self.db.rollback()
msg = f"Show with this primary key or unique constraint violation: {e.orig}"
msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while saving show {show.name}")
raise
def delete_show(self, show_id: ShowId) -> None:
"""
Delete a show by its ID.
:param show_id: The ID of the show to delete.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
show = self.db.get(Show, show_id)
if not show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
self.db.delete(show)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error while deleting show {show_id}")
raise
else:
return ShowSchema.model_validate(db_show)
def get_season(self, season_id: SeasonId) -> SeasonSchema:
"""
Retrieve a season by its ID.
:param season_id: The ID of the season to get.
:return: A Season object.
:raises NotFoundError: If the season with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
season = self.db.get(Season, season_id)
if not season:
msg = f"Season with id {season_id} not found."
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
except SQLAlchemyError:
log.exception(f"Database error while retrieving season {season_id}")
raise
season = self.db.get(Season, season_id)
if not season:
msg = f"Season {season_id} not found"
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
def get_episode(self, episode_id: EpisodeId) -> EpisodeSchema:
"""
Retrieve an episode by its ID.
:param episode_id: The ID of the episode to get.
:return: An Episode object.
:raises NotFoundError: If the episode with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
episode = self.db.get(Episode, episode_id)
if not episode:
msg = f"Episode with id {episode_id} not found."
raise NotFoundError(msg)
return EpisodeSchema.model_validate(episode)
except SQLAlchemyError as e:
log.error(f"Database error while retrieving episode {episode_id}: {e}")
raise
episode = self.db.get(Episode, episode_id)
if not episode:
msg = f"Episode {episode_id} not found"
raise NotFoundError(msg)
return EpisodeSchema.model_validate(episode)
def get_season_by_episode(self, episode_id: EpisodeId) -> SeasonSchema:
try:
stmt = select(Season).join(Season.episodes).where(Episode.id == episode_id)
season = self.db.scalar(stmt)
if not season:
msg = f"Season not found for episode {episode_id}"
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
except SQLAlchemyError as e:
log.error(
f"Database error while retrieving season for episode {episode_id}: {e}"
)
raise
stmt = select(Season).join(Season.episodes).where(Episode.id == episode_id)
season = self.db.scalar(stmt)
if not season:
msg = f"Season for episode {episode_id} not found"
raise NotFoundError(msg)
return SeasonSchema.model_validate(season)
def get_season_by_number(self, season_number: int, show_id: ShowId) -> SeasonSchema:
"""
Retrieve a season by its number and show ID.
:param season_number: The number of the season.
:param show_id: The ID of the show.
:return: A Season object.
:raises NotFoundError: If the season is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Season)
.where(Season.show_id == show_id)
.where(Season.number == season_number)
.options(joinedload(Season.episodes), joinedload(Season.show))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Season number {season_number} for show_id {show_id} not found."
raise NotFoundError(msg)
return SeasonSchema.model_validate(result)
except SQLAlchemyError:
log.exception(
f"Database error retrieving season {season_number} for show {show_id}"
)
raise
stmt = (
select(Season)
.where(Season.show_id == show_id)
.where(Season.number == season_number)
.options(joinedload(Season.episodes), joinedload(Season.show))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Season {season_number} for show {show_id} not found"
raise NotFoundError(msg)
return SeasonSchema.model_validate(result)
def add_episode_file(self, episode_file: EpisodeFileSchema) -> EpisodeFileSchema:
"""
Adds an episode file record to the database.
:param episode_file: The EpisodeFile object to add.
:return: The added EpisodeFile object.
:raises IntegrityError: If the record violates constraints.
:raises SQLAlchemyError: If a database error occurs.
"""
db_model = EpisodeFile(**episode_file.model_dump())
try:
self.db.add(db_model)
self.db.commit()
self.db.refresh(db_model)
return EpisodeFileSchema.model_validate(db_model)
except IntegrityError as e:
self.db.rollback()
log.error(f"Integrity error while adding episode file: {e}")
raise
except SQLAlchemyError as e:
self.db.rollback()
log.error(f"Database error while adding episode file: {e}")
raise
return self.add_media_file_base(
file_schema=episode_file,
model_class=EpisodeFile,
schema_class=EpisodeFileSchema,
)
def remove_episode_files_by_torrent_id(self, torrent_id: TorrentId) -> int:
"""
Removes episode file records associated with a given torrent ID.
:param torrent_id: The ID of the torrent whose episode files are to be removed.
:return: The number of episode files removed.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = delete(EpisodeFile).where(EpisodeFile.torrent_id == torrent_id)
result = self.db.execute(stmt)
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(
f"Database error removing episode files for torrent_id {torrent_id}"
)
raise
return result.rowcount
def set_show_library(self, show_id: ShowId, library: str) -> None:
"""
Sets the library for a show.
:param show_id: The ID of the show to update.
:param library: The library path to set for the show.
:raises NotFoundError: If the show with the given ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
show = self.db.get(Show, show_id)
if not show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
show.library = library
self.db.commit()
except SQLAlchemyError:
self.db.rollback()
log.exception(f"Database error setting library for show {show_id}")
raise
return self.remove_files_by_torrent_id_base(
torrent_id=torrent_id, model_class=EpisodeFile
)
def get_episode_files_by_season_id(
self, season_id: SeasonId
) -> list[EpisodeFileSchema]:
"""
Retrieve all episode files for a given season ID.
:param season_id: The ID of the season.
:return: A list of EpisodeFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(EpisodeFile).join(Episode).where(Episode.season_id == season_id)
)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(ef) for ef in results]
except SQLAlchemyError:
log.exception(
f"Database error retrieving episode files for season_id {season_id}"
)
raise
stmt = select(EpisodeFile).join(Episode).where(Episode.season_id == season_id)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(ef) for ef in results]
def get_episode_files_by_episode_id(
self, episode_id: EpisodeId
) -> list[EpisodeFileSchema]:
"""
Retrieve all episode files for a given episode ID.
:param episode_id: The ID of the episode.
:return: A list of EpisodeFile objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = select(EpisodeFile).where(EpisodeFile.episode_id == episode_id)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(sf) for sf in results]
except SQLAlchemyError as e:
log.error(
f"Database error retrieving episode files for episode_id {episode_id}: {e}"
)
raise
stmt = select(EpisodeFile).where(EpisodeFile.episode_id == episode_id)
results = self.db.execute(stmt).scalars().all()
return [EpisodeFileSchema.model_validate(sf) for sf in results]
def get_torrents_by_show_id(self, show_id: ShowId) -> list[TorrentSchema]:
"""
Retrieve all torrents associated with a given show ID.
:param show_id: The ID of the show.
:return: A list of Torrent objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Torrent)
.distinct()
.join(EpisodeFile, EpisodeFile.torrent_id == Torrent.id)
.join(Episode, Episode.id == EpisodeFile.episode_id)
.join(Season, Season.id == Episode.season_id)
.where(Season.show_id == show_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [TorrentSchema.model_validate(torrent) for torrent in results]
except SQLAlchemyError:
log.exception(f"Database error retrieving torrents for show_id {show_id}")
raise
stmt = (
select(TorrentModel)
.distinct()
.join(EpisodeFile, EpisodeFile.torrent_id == TorrentModel.id)
.join(Episode, Episode.id == EpisodeFile.episode_id)
.join(Season, Season.id == Episode.season_id)
.where(Season.show_id == show_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [TorrentSchema.model_validate(t) for t in results]
def get_all_shows_with_torrents(self) -> list[ShowSchema]:
"""
Retrieve all shows that are associated with a torrent, ordered alphabetically by show name.
:return: A list of Show objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Show)
.distinct()
.join(Season, Show.id == Season.show_id)
.join(Episode, Season.id == Episode.season_id)
.join(EpisodeFile, Episode.id == EpisodeFile.episode_id)
.join(Torrent, EpisodeFile.torrent_id == Torrent.id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
.order_by(Show.name)
)
results = self.db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
except SQLAlchemyError:
log.exception("Database error retrieving all shows with torrents")
raise
stmt = (
select(Show)
.distinct()
.join(Season, Show.id == Season.show_id)
.join(Episode, Season.id == Episode.season_id)
.join(EpisodeFile, Episode.id == EpisodeFile.episode_id)
.join(TorrentModel, EpisodeFile.torrent_id == TorrentModel.id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
.order_by(Show.name)
)
results = self.db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
def get_seasons_by_torrent_id(self, torrent_id: TorrentId) -> list[SeasonNumber]:
"""
Retrieve season numbers associated with a given torrent ID.
:param torrent_id: The ID of the torrent.
:return: A list of SeasonNumber objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Season.number)
.distinct()
.join(Episode, Episode.season_id == Season.id)
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [SeasonNumber(x) for x in results]
except SQLAlchemyError:
log.exception(
f"Database error retrieving season numbers for torrent_id {torrent_id}"
)
raise
stmt = (
select(Season.number)
.distinct()
.join(Episode, Episode.season_id == Season.id)
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
)
results = self.db.execute(stmt).scalars().unique().all()
return [SeasonNumber(x) for x in results]
def get_episodes_by_torrent_id(self, torrent_id: TorrentId) -> list[EpisodeNumber]:
"""
Retrieve episode numbers associated with a given torrent ID.
:param torrent_id: The ID of the torrent.
:return: A list of EpisodeNumber objects.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Episode.number)
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
.order_by(Episode.number)
)
episode_numbers = self.db.execute(stmt).scalars().all()
return [EpisodeNumber(n) for n in sorted(set(episode_numbers))]
except SQLAlchemyError as e:
log.error(
f"Database error retrieving episodes for torrent_id {torrent_id}: {e}"
)
raise
stmt = (
select(Episode.number)
.distinct()
.join(EpisodeFile, EpisodeFile.episode_id == Episode.id)
.where(EpisodeFile.torrent_id == torrent_id)
.order_by(Episode.number)
)
episode_numbers = self.db.execute(stmt).scalars().all()
return [EpisodeNumber(n) for n in episode_numbers]
def get_show_by_season_id(self, season_id: SeasonId) -> ShowSchema:
"""
Retrieve a show by one of its season's ID.
:param season_id: The ID of the season to retrieve the show for.
:return: A Show object.
:raises NotFoundError: If the show for the given season ID is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
try:
stmt = (
select(Show)
.join(Season, Show.id == Season.show_id)
.where(Season.id == season_id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Show for season_id {season_id} not found."
raise NotFoundError(msg)
return ShowSchema.model_validate(result)
except SQLAlchemyError:
log.exception(f"Database error retrieving show by season_id {season_id}")
raise
stmt = (
select(Show)
.join(Season, Show.id == Season.show_id)
.where(Season.id == season_id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
)
result = self.db.execute(stmt).unique().scalar_one_or_none()
if not result:
msg = f"Show for season {season_id} not found"
raise NotFoundError(msg)
return ShowSchema.model_validate(result)
def add_season_to_show(
self, show_id: ShowId, season_data: SeasonSchema
) -> SeasonSchema:
"""
Adds a new season and its episodes to a show.
If the season number already exists for the show, it returns the existing season.
:param show_id: The ID of the show to add the season to.
:param season_data: The SeasonSchema object for the new season.
:return: The added or existing SeasonSchema object.
:raises NotFoundError: If the show is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_show = self.db.get(Show, show_id)
if not db_show:
msg = f"Show with id {show_id} not found."
msg = f"Show {show_id} not found"
raise NotFoundError(msg)
stmt = (
select(Season)
.where(Season.show_id == show_id)
.where(Season.number == season_data.number)
stmt = select(Season).where(
Season.show_id == show_id, Season.number == season_data.number
)
existing_db_season = self.db.execute(stmt).scalar_one_or_none()
if existing_db_season:
return SeasonSchema.model_validate(existing_db_season)
existing = self.db.execute(stmt).scalar_one_or_none()
if existing:
return SeasonSchema.model_validate(existing)
db_season = Season(
id=season_data.id,
show_id=show_id,
@@ -550,54 +305,56 @@ class TvRepository:
number=ep_schema.number,
external_id=ep_schema.external_id,
title=ep_schema.title,
overview=ep_schema.overview,
)
for ep_schema in season_data.episodes
],
)
self.db.add(db_season)
self.db.commit()
self.db.refresh(db_season)
try:
self.db.commit()
self.db.refresh(db_season)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
raise
return SeasonSchema.model_validate(db_season)
def add_episode_to_season(
self, season_id: SeasonId, episode_data: EpisodeSchema
) -> EpisodeSchema:
"""
Adds a new episode to a season.
If the episode number already exists for the season, it returns the existing episode.
:param season_id: The ID of the season to add the episode to.
:param episode_data: The EpisodeSchema object for the new episode.
:return: The added or existing EpisodeSchema object.
:raises NotFoundError: If the season is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_season = self.db.get(Season, season_id)
if not db_season:
msg = f"Season with id {season_id} not found."
msg = f"Season {season_id} not found"
raise NotFoundError(msg)
stmt = (
select(Episode)
.where(Episode.season_id == season_id)
.where(Episode.number == episode_data.number)
stmt = select(Episode).where(
Episode.season_id == season_id, Episode.number == episode_data.number
)
existing_db_episode = self.db.execute(stmt).scalar_one_or_none()
if existing_db_episode:
return EpisodeSchema.model_validate(existing_db_episode)
existing = self.db.execute(stmt).scalar_one_or_none()
if existing:
return EpisodeSchema.model_validate(existing)
db_episode = Episode(
id=episode_data.id,
season_id=season_id,
number=episode_data.number,
external_id=episode_data.external_id,
title=episode_data.title,
overview=episode_data.overview,
)
self.db.add(db_episode)
self.db.commit()
self.db.refresh(db_episode)
try:
self.db.commit()
self.db.refresh(db_episode)
except IntegrityError as e:
self.db.rollback()
msg = f"Integrity error: {e.orig}"
raise ConflictError(msg) from e
except SQLAlchemyError:
self.db.rollback()
raise
return EpisodeSchema.model_validate(db_episode)
def update_show_attributes(
@@ -610,69 +367,24 @@ class TvRepository:
continuous_download: bool | None = None,
imdb_id: str | None = None,
) -> ShowSchema:
"""
Update attributes of an existing show.
:param imdb_id: The new IMDb ID for the show.
:param continuous_download: The new continuous download status for the show.
:param show_id: The ID of the show to update.
:param name: The new name for the show.
:param overview: The new overview for the show.
:param year: The new year for the show.
:param ended: The new ended status for the show.
:return: The updated ShowSchema object.
"""
db_show = self.db.get(Show, show_id)
if not db_show:
msg = f"Show with id {show_id} not found."
raise NotFoundError(msg)
updated = False
if name is not None and db_show.name != name:
db_show.name = name
updated = True
if overview is not None and db_show.overview != overview:
db_show.overview = overview
updated = True
if year is not None and db_show.year != year:
db_show.year = year
updated = True
if ended is not None and db_show.ended != ended:
db_show.ended = ended
updated = True
if (
continuous_download is not None
and db_show.continuous_download != continuous_download
):
db_show.continuous_download = continuous_download
updated = True
if imdb_id is not None and db_show.imdb_id != imdb_id:
db_show.imdb_id = imdb_id
updated = True
if updated:
self.db.commit()
self.db.refresh(db_show)
return ShowSchema.model_validate(db_show)
return self.update_media_attributes_base(
media_id=show_id,
model_class=Show,
name=name,
overview=overview,
year=year,
ended=ended,
continuous_download=continuous_download,
imdb_id=imdb_id,
)
def update_season_attributes(
self, season_id: SeasonId, name: str | None = None, overview: str | None = None
) -> SeasonSchema:
"""
Update attributes of an existing season.
:param season_id: The ID of the season to update.
:param name: The new name for the season.
:param overview: The new overview for the season.
:param external_id: The new external ID for the season.
:return: The updated SeasonSchema object.
:raises NotFoundError: If the season is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_season = self.db.get(Season, season_id)
if not db_season:
msg = f"Season with id {season_id} not found."
msg = f"Season {season_id} not found"
raise NotFoundError(msg)
updated = False
if name is not None and db_season.name != name:
db_season.name = name
@@ -680,13 +392,13 @@ class TvRepository:
if overview is not None and db_season.overview != overview:
db_season.overview = overview
updated = True
if updated:
self.db.commit()
self.db.refresh(db_season)
log.debug(
f"Updating existing season {db_season.number} for show {db_season.show.name}"
)
try:
self.db.commit()
self.db.refresh(db_season)
except SQLAlchemyError:
self.db.rollback()
raise
return SeasonSchema.model_validate(db_season)
def update_episode_attributes(
@@ -695,22 +407,10 @@ class TvRepository:
title: str | None = None,
overview: str | None = None,
) -> EpisodeSchema:
"""
Update attributes of an existing episode.
:param overview: Tje new overview for the episode.
:param episode_id: The ID of the episode to update.
:param title: The new title for the episode.
:param external_id: The new external ID for the episode.
:return: The updated EpisodeSchema object.
:raises NotFoundError: If the episode is not found.
:raises SQLAlchemyError: If a database error occurs.
"""
db_episode = self.db.get(Episode, episode_id)
if not db_episode:
msg = f"Episode with id {episode_id} not found."
msg = f"Episode {episode_id} not found"
raise NotFoundError(msg)
updated = False
if title is not None and db_episode.title != title:
db_episode.title = title
@@ -718,9 +418,11 @@ class TvRepository:
if overview is not None and db_episode.overview != overview:
db_episode.overview = overview
updated = True
if updated:
self.db.commit()
self.db.refresh(db_episode)
log.info(f"Updating existing episode {db_episode.number}")
try:
self.db.commit()
self.db.refresh(db_episode)
except SQLAlchemyError:
self.db.rollback()
raise
return EpisodeSchema.model_validate(db_episode)

View File

@@ -17,6 +17,8 @@ from media_manager.torrent.utils import get_importable_media_directories
from media_manager.tv.dependencies import (
season_dep,
show_dep,
tv_import_service_dep,
tv_metadata_service_dep,
tv_service_dep,
)
from media_manager.tv.schemas import (
@@ -40,12 +42,16 @@ router = APIRouter()
dependencies=[Depends(current_active_user)],
)
def search_metadata_providers_for_a_show(
tv_service: tv_service_dep, query: str, metadata_provider: metadata_provider_dep
tv_metadata_service: tv_metadata_service_dep,
query: str,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]:
"""
Search for a show on the configured metadata provider.
"""
return tv_service.search_for_show(query=query, metadata_provider=metadata_provider)
return tv_metadata_service.search_for_show(
query=query, metadata_provider=metadata_provider
)
@router.get(
@@ -53,12 +59,13 @@ def search_metadata_providers_for_a_show(
dependencies=[Depends(current_active_user)],
)
def get_recommended_shows(
tv_service: tv_service_dep, metadata_provider: metadata_provider_dep
tv_metadata_service: tv_metadata_service_dep,
metadata_provider: metadata_provider_dep,
) -> list[MetaDataProviderSearchResult]:
"""
Get a list of recommended/popular shows from the metadata provider.
"""
return tv_service.get_popular_shows(metadata_provider=metadata_provider)
return tv_metadata_service.get_popular_shows(metadata_provider=metadata_provider)
# -----------------------------------------------------------------------------
@@ -72,12 +79,14 @@ def get_recommended_shows(
dependencies=[Depends(current_superuser)],
)
def get_all_importable_shows(
tv_service: tv_service_dep, metadata_provider: metadata_provider_dep
tv_import_service: tv_import_service_dep, metadata_provider: metadata_provider_dep
) -> list[MediaImportSuggestion]:
"""
Get a list of unknown shows that were detected in the TV directory and are importable.
"""
return tv_service.get_importable_tv_shows(metadata_provider=metadata_provider)
return tv_import_service.get_importable_tv_shows(
metadata_provider=metadata_provider
)
@router.post(
@@ -86,7 +95,7 @@ def get_all_importable_shows(
status_code=status.HTTP_204_NO_CONTENT,
)
def import_detected_show(
tv_service: tv_service_dep, tv_show: show_dep, directory: str
tv_import_service: tv_import_service_dep, tv_show: show_dep, directory: str
) -> None:
"""
Import a detected show from the specified directory into the library.
@@ -96,7 +105,7 @@ def import_detected_show(
MediaManagerConfig().misc.tv_directory
):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")
tv_service.import_existing_tv_show(
tv_import_service.import_existing_tv_show(
tv_show=tv_show, source_directory=source_directory
)
@@ -129,7 +138,7 @@ def get_all_shows(tv_service: tv_service_dep) -> list[Show]:
},
)
def add_a_show(
tv_service: tv_service_dep,
tv_metadata_service: tv_metadata_service_dep,
metadata_provider: metadata_provider_dep,
show_id: int,
language: str | None = None,
@@ -138,13 +147,13 @@ def add_a_show(
Add a new show to the library.
"""
try:
show = tv_service.add_show(
show = tv_metadata_service.add_show(
external_id=show_id,
metadata_provider=metadata_provider,
language=language,
)
except MediaAlreadyExistsError:
show = tv_service.get_show_by_external_id(
show = tv_metadata_service.tv_repository.get_show_by_external_id(
show_id, metadata_provider=metadata_provider.name
)
if not show:
@@ -216,12 +225,17 @@ def delete_a_show(
dependencies=[Depends(current_active_user)],
)
def update_shows_metadata(
show: show_dep, tv_service: tv_service_dep, metadata_provider: metadata_provider_dep
show: show_dep,
tv_metadata_service: tv_metadata_service_dep,
tv_service: tv_service_dep,
metadata_provider: metadata_provider_dep,
) -> PublicShow:
"""
Update a show's metadata from the provider.
"""
tv_service.update_show_metadata(db_show=show, metadata_provider=metadata_provider)
tv_metadata_service.update_show_metadata(
db_show=show, metadata_provider=metadata_provider
)
return tv_service.get_public_show_by_id(show=show)
@@ -361,4 +375,4 @@ def get_total_count_of_downloaded_episodes(tv_service: tv_service_dep) -> int:
"""
Get the total count of downloaded episodes across all shows.
"""
return tv_service.get_total_downloaded_episoded_count()
return tv_service.get_total_downloaded_episodes_count()

View File

@@ -4,6 +4,7 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from media_manager.common.schemas import BaseMedia, BaseMediaFile
from media_manager.torrent.models import Quality
from media_manager.torrent.schemas import TorrentId, TorrentStatus
@@ -39,35 +40,17 @@ class Season(BaseModel):
episodes: list[Episode]
class Show(BaseModel):
model_config = ConfigDict(from_attributes=True)
class Show(BaseMedia):
id: ShowId = Field(default_factory=lambda: ShowId(uuid.uuid4()))
name: str
overview: str
year: int | None
ended: bool = False
external_id: int
metadata_provider: str
continuous_download: bool = False
library: str = "Default"
original_language: str | None = None
imdb_id: str | None = None
seasons: list[Season]
class EpisodeFile(BaseModel):
model_config = ConfigDict(from_attributes=True)
class EpisodeFile(BaseMediaFile):
episode_id: EpisodeId
quality: Quality
torrent_id: TorrentId | None
file_path_suffix: str
class PublicEpisodeFile(EpisodeFile):

View File

@@ -1,39 +1,21 @@
import pprint
import re
import shutil
from pathlib import Path
from typing import overload
from sqlalchemy.exc import IntegrityError
from media_manager.common.service import BaseMediaService
from media_manager.config import MediaManagerConfig
from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
from media_manager.indexer.service import IndexerService
from media_manager.indexer.utils import evaluate_indexer_query_results
from media_manager.metadataProvider.abstract_metadata_provider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import (
Quality,
Torrent,
TorrentStatus,
)
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
extract_external_id_from_string,
get_files_for_import,
get_importable_media_directories,
import_file,
remove_special_characters,
remove_special_chars_and_parentheses,
)
from media_manager.tv import log
from media_manager.tv.importer import TvImportService
from media_manager.tv.metadata import TvMetadataService
from media_manager.tv.repository import TvRepository
from media_manager.tv.schemas import (
Episode,
@@ -52,40 +34,27 @@ from media_manager.tv.schemas import (
)
class TvService:
class TvService(BaseMediaService[Show, Show]):
def __init__(
self,
tv_repository: TvRepository,
torrent_service: TorrentService,
indexer_service: IndexerService,
notification_service: NotificationService,
tv_import_service: TvImportService,
tv_metadata_service: TvMetadataService,
) -> None:
self.tv_repository = tv_repository
self.torrent_service = torrent_service
self.indexer_service = indexer_service
self.notification_service = notification_service
def add_show(
self,
external_id: int,
metadata_provider: AbstractMetadataProvider,
language: str | None = None,
) -> Show:
"""
Add a new show to the database.
:param external_id: The ID of the show in the metadata provider\'s system.
:param metadata_provider: The name of the metadata provider.
:param language: Optional language code (ISO 639-1) to fetch metadata in.
"""
show_with_metadata = metadata_provider.get_show_metadata(
show_id=external_id, language=language
super().__init__(
repository=tv_repository,
torrent_service=torrent_service,
indexer_service=indexer_service,
notification_service=notification_service,
)
saved_show = self.tv_repository.save_show(show=show_with_metadata)
metadata_provider.download_show_poster_image(show=saved_show)
return saved_show
self.tv_repository = tv_repository
self.tv_import_service = tv_import_service
self.tv_metadata_service = tv_metadata_service
def get_total_downloaded_episoded_count(self) -> int:
def get_total_downloaded_episodes_count(self) -> int:
"""
Get total number of downloaded episodes.
"""
@@ -93,7 +62,13 @@ class TvService:
return self.tv_repository.get_total_downloaded_episodes_count()
def set_show_library(self, show: Show, library: str) -> None:
self.tv_repository.set_show_library(show_id=show.id, library=library)
self.tv_repository.set_show_library(show.id, library)
def get_all_shows(self) -> list[Show]:
"""
Get all shows in the library.
"""
return self.get_all_media()
def delete_show(
self,
@@ -131,7 +106,7 @@ class TvService:
f"Failed to delete torrent {torrent.hash}", exc_info=True
)
self.tv_repository.delete_show(show_id=show.id)
self.tv_repository.delete_show(show.id)
def get_public_episode_files_by_season_id(
self, season: Season
@@ -155,46 +130,6 @@ class TvService:
result.append(episode_file)
return result
@overload
def check_if_show_exists(self, *, external_id: int, metadata_provider: str) -> bool:
"""
Check if a show exists in the database.
:param external_id: The external ID of the show.
:param metadata_provider: The metadata provider.
:return: True if the show exists, False otherwise.
"""
@overload
def check_if_show_exists(self, *, show_id: ShowId) -> bool:
"""
Check if a show exists in the database.
:param show_id: The ID of the show.
:return: True if the show exists, False otherwise.
"""
def check_if_show_exists(
self, *, external_id=None, metadata_provider=None, show_id=None
) -> bool:
if not (external_id is None or metadata_provider is None):
try:
self.tv_repository.get_show_by_external_id(
external_id=external_id, metadata_provider=metadata_provider
)
except NotFoundError:
return False
elif show_id is not None:
try:
self.tv_repository.get_show_by_id(show_id=show_id)
except NotFoundError:
return False
else:
msg = "Use one of the provided overloads for this function!"
raise ValueError(msg)
return True
def get_all_available_torrents_for_a_season(
self,
season_number: int,
@@ -225,62 +160,6 @@ class TvService:
is_tv=True, query_results=results, media=show
)
def get_all_shows(self) -> list[Show]:
"""
Get all shows.
:return: A list of all shows.
"""
return self.tv_repository.get_shows()
def search_for_show(
self, query: str, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Search for shows using a given query.
:param query: The search query.
:param metadata_provider: The metadata provider to search.
:return: A list of metadata provider show search results.
"""
results = metadata_provider.search_show(query)
for result in results:
if self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
):
result.added = True
try:
show = self.tv_repository.get_show_by_external_id(
external_id=result.external_id,
metadata_provider=metadata_provider.name,
)
result.id = show.id
except Exception:
log.error(
f"Unable to find internal show ID for {result.external_id} on {metadata_provider.name}"
)
return results
def get_popular_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MetaDataProviderSearchResult]:
"""
Get popular shows from a given metadata provider.
:param metadata_provider: The metadata provider to use.
:return: A list of metadata provider show search results.
"""
results = metadata_provider.search_show()
return [
result
for result in results
if not self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider.name
)
]
def get_public_show_by_id(self, show: Show) -> PublicShow:
"""
Get a public show from a Show object.
@@ -577,402 +456,15 @@ class TvService:
def get_root_show_directory(self, show: Show) -> Path:
misc_config = MediaManagerConfig().misc
show_directory_name = f"{remove_special_characters(show.name)} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
log.debug(
f"Show {show.name} without special characters: {remove_special_characters(show.name)}"
return self.get_root_directory(
media=show,
default_dir=misc_config.tv_directory,
libraries=misc_config.tv_libraries,
)
if show.library != "Default":
for library in misc_config.tv_libraries:
if library.name == show.library:
log.debug(
f"Using library {library.name} for show {show.name} ({show.year})"
)
return Path(library.path) / show_directory_name
else:
log.warning(
f"Library {show.library} not defined in config, using default TV directory."
)
return misc_config.tv_directory / show_directory_name
def get_root_season_directory(self, show: Show, season_number: int) -> Path:
return self.get_root_show_directory(show) / Path(f"Season {season_number}")
def import_episode(
self,
show: Show,
season: Season,
episode_number: int,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode_number:02d}"
if file_path_suffix != "":
episode_file_name += f" - {file_path_suffix}"
pattern = (
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode_number) + r"[. ].*"
)
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
target_file_name = (
self.get_root_season_directory(show=show, season_number=season.number)
/ episode_file_name
)
# import subtitle
for subtitle_file in subtitle_files:
regex_result = re.search(
subtitle_pattern, subtitle_file.name, re.IGNORECASE
)
if regex_result:
language_code = regex_result.group(1)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
# import episode videos
for file in video_files:
if re.search(pattern, file.name, re.IGNORECASE):
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
return True
else:
msg = f"Could not find any video file for episode {episode_number} of show {show.name} S{season.number}"
raise Exception(msg) # noqa: TRY002 # TODO: resolve this
def import_season(
self,
show: Show,
season: Season,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> tuple[bool, list[Episode]]:
season_path = self.get_root_season_directory(
show=show, season_number=season.number
)
success = True
imported_episodes = []
try:
season_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
log.exception(f"Could not create path {season_path}")
msg = f"Could not create path {season_path}"
raise Exception(msg) from e # noqa: TRY002 # TODO: resolve this
for episode in season.episodes:
try:
imported = self.import_episode(
show=show,
subtitle_files=subtitle_files,
video_files=video_files,
season=season,
episode_number=episode.number,
file_path_suffix=file_path_suffix,
)
if imported:
imported_episodes.append(episode)
except Exception:
# Send notification about missing episode file
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Missing Episode File",
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
)
success = False
log.warning(
f"S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
)
return success, imported_episodes
def import_episode_files(
self,
show: Show,
season: Season,
episode: Episode,
video_files: list[Path],
subtitle_files: list[Path],
file_path_suffix: str = "",
) -> bool:
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode.number:02d}"
if file_path_suffix != "":
episode_file_name += f" - {file_path_suffix}"
pattern = (
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode.number) + r"[. ].*"
)
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
target_file_name = (
self.get_root_season_directory(show=show, season_number=season.number)
/ episode_file_name
)
# import subtitle
for subtitle_file in subtitle_files:
regex_result = re.search(
subtitle_pattern, subtitle_file.name, re.IGNORECASE
)
if regex_result:
language_code = regex_result.group(1)
target_subtitle_file = target_file_name.with_suffix(
f".{language_code}.srt"
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
else:
log.debug(
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
)
found_video = False
# import episode videos
for file in video_files:
if re.search(pattern, file.name, re.IGNORECASE):
target_video_file = target_file_name.with_suffix(file.suffix)
import_file(target_file=target_video_file, source_file=file)
found_video = True
break
if not found_video:
# Send notification about missing episode file
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Missing Episode File",
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
)
log.warning(
f"File for S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
)
return False
return True
def import_episode_files_from_torrent(self, torrent: Torrent, show: Show) -> None:
"""
Organizes episodes files from a torrent into the TV directory structure, mapping them to seasons and episodes.
:param torrent: The Torrent object
:param show: The Show object
"""
video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent)
success: list[bool] = []
log.debug(
f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files)
)
episode_files = self.torrent_service.get_episode_files_of_torrent(
torrent=torrent
)
if not episode_files:
log.warning(
f"No episode files associated with torrent {torrent.title}, skipping import."
)
return
log.info(
f"Found {len(episode_files)} episode files associated with torrent {torrent.title}"
)
imported_episodes_by_season: dict[int, list[int]] = {}
for episode_file in episode_files:
season = self.get_season_by_episode(episode_id=episode_file.episode_id)
episode = self.get_episode(episode_file.episode_id)
season_path = self.get_root_season_directory(
show=show, season_number=season.number
)
if not season_path.exists():
try:
season_path.mkdir(parents=True)
except Exception as e:
log.warning(f"Could not create path {season_path}: {e}")
msg = f"Could not create path {season_path}"
raise Exception(msg) from e # noqa: TRY002
episoded_import_success = self.import_episode_files(
show=show,
season=season,
episode=episode,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix=episode_file.file_path_suffix,
)
success.append(episoded_import_success)
if episoded_import_success:
imported_episodes_by_season.setdefault(season.number, []).append(
episode.number
)
log.info(
f"Episode {episode.number} from Season {season.number} successfully imported from torrent {torrent.title}"
)
else:
log.warning(
f"Episode {episode.number} from Season {season.number} failed to import from torrent {torrent.title}"
)
success_messages: list[str] = []
for season_number, episodes in imported_episodes_by_season.items():
episode_list = ",".join(str(e) for e in sorted(episodes))
success_messages.append(
f"Episode(s): {episode_list} from Season {season_number}"
)
episodes_summary = "; ".join(success_messages)
if all(success):
torrent.imported = True
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
# Send successful season download notification
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="TV Show imported successfully",
message=(
f"Successfully imported {episodes_summary} "
f"of {show.name} ({show.year}) "
f"from torrent {torrent.title}."
),
)
else:
if self.notification_service:
self.notification_service.send_notification_to_all_providers(
title="Failed to import TV Show",
message=f"Importing {show.name} ({show.year}) from torrent {torrent.title} completed with errors. Please check the logs for details.",
)
log.info(
f"Finished importing files for torrent {torrent.title} {'without' if all(success) else 'with'} errors"
)
def update_show_metadata(
self, db_show: Show, metadata_provider: AbstractMetadataProvider
) -> Show | None:
"""
Updates the metadata of a show.
This includes adding new seasons and episodes if available from the metadata provider.
It also updates existing show, season, and episode attributes if they have changed.
:param metadata_provider: The metadata provider object to fetch fresh data from.
:param db_show: The Show to update
:return: The updated Show object, or None if the show is not found or an error occurs.
"""
log.debug(f"Found show: {db_show.name} for metadata update.")
# Use stored original_language preference for metadata fetching
fresh_show_data = metadata_provider.get_show_metadata(
show_id=db_show.external_id, language=db_show.original_language
)
if not fresh_show_data:
log.warning(
f"Could not fetch fresh metadata for show {db_show.name} (External ID: {db_show.external_id}) from {db_show.metadata_provider}."
)
return db_show
log.debug(f"Fetched fresh metadata for show: {fresh_show_data.name}")
self.tv_repository.update_show_attributes(
show_id=db_show.id,
name=fresh_show_data.name,
overview=fresh_show_data.overview,
year=fresh_show_data.year,
ended=fresh_show_data.ended,
imdb_id=fresh_show_data.imdb_id,
continuous_download=db_show.continuous_download
if fresh_show_data.ended is False
else False,
)
# Process seasons and episodes
existing_season_external_ids = {s.external_id: s for s in db_show.seasons}
for fresh_season_data in fresh_show_data.seasons:
if fresh_season_data.external_id in existing_season_external_ids:
# Update existing season
existing_season = existing_season_external_ids[
fresh_season_data.external_id
]
self.tv_repository.update_season_attributes(
season_id=existing_season.id,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
)
# Process episodes for this season
existing_episode_external_ids = {
ep.external_id: ep for ep in existing_season.episodes
}
for fresh_episode_data in fresh_season_data.episodes:
if fresh_episode_data.external_id in existing_episode_external_ids:
# Update existing episode
existing_episode = existing_episode_external_ids[
fresh_episode_data.external_id
]
self.tv_repository.update_episode_attributes(
episode_id=existing_episode.id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
else:
# Add new episode
log.debug(
f"Adding new episode {fresh_episode_data.number} to season {existing_season.number}"
)
episode_schema = Episode(
id=EpisodeId(fresh_episode_data.id),
number=fresh_episode_data.number,
external_id=fresh_episode_data.external_id,
title=fresh_episode_data.title,
overview=fresh_episode_data.overview,
)
self.tv_repository.add_episode_to_season(
season_id=existing_season.id, episode_data=episode_schema
)
else:
# Add new season (and its episodes)
log.debug(
f"Adding new season {fresh_season_data.number} to show {db_show.name}"
)
episodes_for_schema = [
Episode(
id=EpisodeId(ep_data.id),
number=ep_data.number,
external_id=ep_data.external_id,
title=ep_data.title,
overview=ep_data.overview,
)
for ep_data in fresh_season_data.episodes
]
season_schema = Season(
id=SeasonId(fresh_season_data.id),
number=fresh_season_data.number,
name=fresh_season_data.name,
overview=fresh_season_data.overview,
external_id=fresh_season_data.external_id,
episodes=episodes_for_schema,
)
self.tv_repository.add_season_to_show(
show_id=db_show.id, season_data=season_schema
)
updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id)
log.info(f"Successfully updated metadata for show: {updated_show.name}")
metadata_provider.download_show_poster_image(show=updated_show)
return updated_show
def set_show_continuous_download(
self, show: Show, continuous_download: bool
) -> Show:
@@ -987,129 +479,14 @@ class TvService:
show_id=show.id, continuous_download=continuous_download
)
def get_import_candidates(
self, tv_show: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_show(
remove_special_chars_and_parentheses(tv_show.name), metadata_provider
)
import_candidates = MediaImportSuggestion(
directory=tv_show, candidates=search_result
)
log.debug(
f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}"
)
return import_candidates
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.exception(f"Failed to rename {source_directory} to {new_source_path}")
raise RenameError from e
video_files, subtitle_files, _all_files = get_files_for_import(
directory=new_source_path
)
for season in tv_show.seasons:
_success, imported_episodes = self.import_season(
show=tv_show,
season=season,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
)
for episode in imported_episodes:
episode_file = EpisodeFile(
episode_id=episode.id,
quality=Quality.unknown,
file_path_suffix="IMPORTED",
torrent_id=None,
)
self.tv_repository.add_episode_file(episode_file=episode_file)
def get_importable_tv_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
tv_directory = MediaManagerConfig().misc.tv_directory
import_suggestions: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(tv_directory)
for item in candidate_dirs:
metadata, external_id = extract_external_id_from_string(item.name)
if metadata is not None and external_id is not None:
try:
self.tv_repository.get_show_by_external_id(
external_id=external_id,
metadata_provider=metadata,
)
log.debug(
f"Show {item.name} already exists in the database, skipping import suggestion."
)
continue
except NotFoundError:
log.debug(
f"Show {item.name} not found in database, checking for import candidates."
)
import_suggestion = self.get_import_candidates(
tv_show=item, metadata_provider=metadata_provider
)
import_suggestions.append(import_suggestion)
log.debug(f"Detected {len(import_suggestions)} importable TV shows.")
return import_suggestions
def import_all_torrents(self) -> None:
log.info("Importing all torrents")
torrents = self.torrent_service.get_all_torrents()
log.info("Found %d torrents to import", len(torrents))
for t in torrents:
show = None
try:
if not t.imported and t.status == TorrentStatus.finished:
show = self.torrent_service.get_show_of_torrent(torrent=t)
if show is None:
log.warning(
f"torrent {t.title} is not a tv torrent, skipping import."
)
continue
self.import_episode_files_from_torrent(torrent=t, show=show)
except RuntimeError as e:
show_name = show.name if show is not None else "<unknown>"
log.error(
f"Error importing torrent {t.title} for show {show_name}: {e}",
exc_info=True,
)
log.info("Finished importing all torrents")
"""
Delegate to TvImportService.
"""
self.tv_import_service.import_all_torrents()
def update_all_non_ended_shows_metadata(self) -> None:
"""Updates the metadata of all non-ended shows."""
log.info("Updating metadata for all non-ended shows")
shows = [show for show in self.tv_repository.get_shows() if not show.ended]
log.info(f"Found {len(shows)} non-ended shows to update")
for show in shows:
try:
if show.metadata_provider == "tmdb":
metadata_provider = TmdbMetadataProvider()
elif show.metadata_provider == "tvdb":
metadata_provider = TvdbMetadataProvider()
else:
log.error(
f"Unsupported metadata provider {show.metadata_provider} for show {show.name}, skipping update."
)
continue
except InvalidConfigError:
log.exception(
f"Error initializing metadata provider {show.metadata_provider} for show {show.name}"
)
continue
updated_show = self.update_show_metadata(
db_show=show, metadata_provider=metadata_provider
)
if updated_show:
log.debug("Updated show metadata", extra={"show": updated_show.name})
else:
log.warning(f"Failed to update metadata for show: {show.name}")
"""
Delegate to TvMetadataService.
"""
self.tv_metadata_service.update_all_non_ended_shows_metadata()