Merge pull request #297 from maxdorninger/improve-importing

Improve importing
This commit is contained in:
Maximilian Dorninger
2025-12-20 21:39:22 +01:00
committed by GitHub
9 changed files with 173 additions and 76 deletions

View File

@@ -1,11 +1,10 @@
# Importing existing media
In order for MediaManager to be able to import existing media (e.g. downloaded by Sonarr or Radarr)
3 conditions have to be met:
2 conditions have to be met:
1. The folder's name must not contain `[tmdbid-xxxxx]` or `[tvdbid-xxxxx]`.
2. The folder's name must not start with a dot.
3. The media must be in the root tv/movie library
1. The folder's name must not start with a dot.
2. The media must be in the root tv/movie library
Here is an example, using these rules:
@@ -14,9 +13,8 @@ Here is an example, using these rules:
└── data/
├── tv/
│ ├── Rick and Morty # WILL be imported
│ ├── Stranger Things (2016) # WILL be imported
│ ├── Breaking Bad (2008) [tmdbid-1396] # WILL NOT be imported
│ ├── Stranger Things (2016) {tvdb_12345} [x265] # WILL be imported
├── Breaking Bad (2008) [tmdbid-1396] # WILL be imported
│ ├── .The Office (2013) # WILL NOT
│ │
│ └── my-custom-library/
@@ -44,16 +42,18 @@ So after importing, the directory would look like this (using the above director
│ ├── .Rick and Morty # RENAMED
│ ├── Rick and Morty (2013) [tmdbid-60625] # IMPORTED
│ │
│ ├── .Stranger Things (2016) # RENAMED
│ ├── .Stranger Things (2016) {tvdb_12345} [x265]
│ ├── Stranger Things (2016) [tmdbid-66732] # IMPORTED
│ │
│ ├── .The Office (2013) # IGNORED
├── Breaking Bad (2008) [tmdbid-1396] # IGNORED
│ ├── .Breaking Bad (2008) [tmdbid-1396]
│ ├── Breaking Bad (2008) [tmdbid-1396] # IMPORTED
│ │
│ └── my-custom-library/
│ └── The Simpsons # IGNORED
└── movie/
├── .Oppenheimer (2023) # RENAMED
├── .Oppenheimer (2023)
└── Oppenheimer (2023) [tmdbid-872585] # IMPORTED
```

View File

@@ -8,6 +8,7 @@ from media_manager.metadataProvider.abstractMetaDataProvider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.torrent.utils import remove_special_chars_and_parentheses
from media_manager.tv.schemas import Episode, Season, Show, SeasonNumber, EpisodeNumber
from media_manager.movies.schemas import Movie
from media_manager.notification.manager import notification_manager
@@ -60,7 +61,11 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
def __search_tv(self, query: str, page: int) -> dict:
try:
response = requests.get(
url=f"{self.url}/tv/search", params={"query": query, "page": page}
url=f"{self.url}/tv/search",
params={
"query": remove_special_chars_and_parentheses(query),
"page": page,
},
)
response.raise_for_status()
return response.json()
@@ -104,7 +109,11 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
def __search_movie(self, query: str, page: int) -> dict:
try:
response = requests.get(
url=f"{self.url}/movies/search", params={"query": query, "page": page}
url=f"{self.url}/movies/search",
params={
"query": remove_special_chars_and_parentheses(query),
"page": page,
},
)
response.raise_for_status()
return response.json()

View File

@@ -8,6 +8,7 @@ from media_manager.metadataProvider.abstractMetaDataProvider import (
AbstractMetadataProvider,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.torrent.utils import remove_special_chars_and_parentheses
from media_manager.tv.schemas import Episode, Season, Show, SeasonNumber
from media_manager.movies.schemas import Movie
@@ -29,7 +30,10 @@ class TvdbMetadataProvider(AbstractMetadataProvider):
return requests.get(f"{self.url}/tv/seasons/{id}").json()
def __search_tv(self, query: str) -> dict:
return requests.get(f"{self.url}/tv/search", params={"query": query}).json()
return requests.get(
f"{self.url}/tv/search",
params={"query": remove_special_chars_and_parentheses(query)},
).json()
def __get_trending_tv(self) -> dict:
return requests.get(f"{self.url}/tv/trending").json()
@@ -38,7 +42,10 @@ class TvdbMetadataProvider(AbstractMetadataProvider):
return requests.get(f"{self.url}/movies/{id}").json()
def __search_movie(self, query: str) -> dict:
return requests.get(f"{self.url}/movies/search", params={"query": query}).json()
return requests.get(
f"{self.url}/movies/search",
params={"query": remove_special_chars_and_parentheses(query)},
).json()
def __get_trending_movies(self) -> dict:
return requests.get(f"{self.url}/movies/trending").json()

View File

@@ -12,7 +12,7 @@ from media_manager.indexer.schemas import (
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.utils import detect_unknown_media
from media_manager.torrent.utils import get_importable_media_directories
from media_manager.torrent.schemas import Torrent
from media_manager.movies import log
from media_manager.movies.schemas import (
@@ -104,15 +104,7 @@ def get_all_importable_movies(
"""
get a list of unknown movies that were detected in the movie directory and are importable
"""
directories = detect_unknown_media(AllEncompassingConfig().misc.movie_directory)
movies = []
for directory in directories:
movies.append(
movie_service.get_import_candidates(
movie=directory, metadata_provider=metadata_provider
)
)
return movies
return movie_service.get_importable_movies(metadata_provider=metadata_provider)
@router.post(
@@ -127,7 +119,7 @@ def import_detected_movie(
get a list of unknown movies that were detected in the movie directory and are importable
"""
source_directory = Path(directory)
if source_directory not in detect_unknown_media(
if source_directory not in get_importable_media_directories(
AllEncompassingConfig().misc.movie_directory
):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")

View File

@@ -37,7 +37,9 @@ from media_manager.torrent.utils import (
import_file,
get_files_for_import,
remove_special_characters,
strip_trailing_year,
get_importable_media_directories,
extract_external_id_from_string,
remove_special_chars_and_parentheses,
)
from media_manager.indexer.service import IndexerService
from media_manager.metadataProvider.abstractMetaDataProvider import (
@@ -641,7 +643,7 @@ class MovieService:
self, movie: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_movie(
strip_trailing_year(movie.name), metadata_provider
remove_special_chars_and_parentheses(movie.name), metadata_provider
)
import_candidates = MediaImportSuggestion(
directory=movie, candidates=search_result
@@ -652,8 +654,17 @@ class MovieService:
return import_candidates
def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.error(
f"Failed to rename directory '{source_directory}' to '{new_source_path}': {e}"
)
raise Exception("Failed to rename directory") from e
video_files, subtitle_files, all_files = get_files_for_import(
directory=source_directory
directory=new_source_path
)
success = self.import_movie(
@@ -672,15 +683,6 @@ class MovieService:
)
)
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.error(
f"Failed to rename directory '{source_directory}' to '{new_source_path}': {e}"
)
return False
return success
def update_movie_metadata(
@@ -716,6 +718,37 @@ class MovieService:
metadata_provider.download_movie_poster_image(movie=updated_movie)
return updated_movie
def get_importable_movies(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
movie_root_path = AllEncompassingConfig().misc.movie_directory
importable_movies: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(movie_root_path)
for movie_dir in candidate_dirs:
metadata, external_id = extract_external_id_from_string(movie_dir.name)
if metadata is not None and external_id is not None:
try:
self.movie_repository.get_movie_by_external_id(
external_id=external_id, metadata_provider=metadata
)
log.debug(
f"Movie {movie_dir.name} already exists in the database, skipping."
)
continue
except NotFoundError:
log.debug(
f"Movie {movie_dir.name} not found in database, checking for import candidates."
)
import_candidates = self.get_import_candidates(
movie=movie_dir, metadata_provider=metadata_provider
)
importable_movies.append(import_candidates)
log.debug(f"Found {len(importable_movies)} importable movies.")
return importable_movies
def auto_download_all_approved_movie_requests() -> None:
"""

View File

@@ -190,31 +190,60 @@ def remove_special_characters(filename: str) -> str:
return sanitized
def strip_trailing_year(title: str) -> str:
def remove_special_chars_and_parentheses(title: str) -> str:
"""
Removes a trailing space + (4-digit year) at end of string
Removes special characters and bracketed information from the title.
:param title: The original title.
:return: A sanitized version of the title.
"""
return re.sub(r"\s*\(\d{4}\)\s*$", "", title).strip()
# Remove content within brackets
sanitized = re.sub(r"\[.*?\]", "", title)
# Remove content within curly brackets
sanitized = re.sub(r"\{.*?\}", "", sanitized)
# Remove year within parentheses
sanitized = re.sub(r"\(\d{4}\)", "", sanitized)
# Remove special characters
sanitized = remove_special_characters(sanitized)
# Collapse multiple whitespace characters and trim the result
sanitized = re.sub(r"\s+", " ", sanitized).strip()
return sanitized
def detect_unknown_media(path: Path) -> list[Path]:
def get_importable_media_directories(path: Path) -> list[Path]:
libraries = []
libraries.extend(AllEncompassingConfig().misc.movie_libraries)
libraries.extend(AllEncompassingConfig().misc.tv_libraries)
show_dirs = path.glob("*")
log.debug(f"Using Directory {path}")
unknown_tv_shows = []
for media_dir in show_dirs:
# check if directory is one created by MediaManager (contains [tmdbd/tvdbid-0000) or if it is a library
if (
re.search(r"\[(?:tmdbid|tvdbid)-\d+]", media_dir.name, re.IGNORECASE)
or media_dir.absolute()
in [Path(library.path).absolute() for library in libraries]
or media_dir.name.startswith(".")
library_paths = {Path(library.path).absolute() for library in libraries}
unfiltered_dirs = [d for d in path.glob("*") if d.is_dir()]
media_dirs = []
for media_dir in unfiltered_dirs:
if media_dir.absolute() not in library_paths and not media_dir.name.startswith(
"."
):
log.debug(f"MediaManager directory detected: {media_dir.name}")
else:
log.info(f"Detected unknown media directory: {media_dir.name}")
unknown_tv_shows.append(media_dir)
return unknown_tv_shows
media_dirs.append(media_dir)
return media_dirs
def extract_external_id_from_string(input_string: str) -> tuple[str | None, int | None]:
"""
Extracts an external ID (tmdb/tvdb ID) from the given string.
:param input_string: The string to extract the ID from.
:return: The extracted Metadata Provider and ID or None if not found.
"""
match = re.search(
r"\b(tmdb|tvdb)(?:id)?[-_]?([0-9]+)\b", input_string, re.IGNORECASE
)
if match:
return match.group(1).lower(), int(match.group(2))
return None, None

View File

@@ -12,7 +12,7 @@ from media_manager.indexer.schemas import (
IndexerQueryResult,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.torrent.utils import detect_unknown_media
from media_manager.torrent.utils import get_importable_media_directories
from media_manager.torrent.schemas import Torrent
from media_manager.tv import log
from media_manager.exceptions import MediaAlreadyExists
@@ -125,15 +125,7 @@ def get_all_importable_shows(
"""
get a list of unknown shows that were detected in the tv directory and are importable
"""
directories = detect_unknown_media(AllEncompassingConfig().misc.tv_directory)
shows = []
for directory in directories:
shows.append(
tv_service.get_import_candidates(
tv_show=directory, metadata_provider=metadata_provider
)
)
return shows
return tv_service.get_importable_tv_shows(metadata_provider=metadata_provider)
@router.post(
@@ -146,7 +138,7 @@ def import_detected_show(tv_service: tv_service_dep, tv_show: show_dep, director
Import a detected show from the specified directory into the library.
"""
source_directory = Path(directory)
if source_directory not in detect_unknown_media(
if source_directory not in get_importable_media_directories(
AllEncompassingConfig().misc.tv_directory
):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")

View File

@@ -42,7 +42,9 @@ from media_manager.torrent.utils import (
import_file,
get_files_for_import,
remove_special_characters,
strip_trailing_year,
get_importable_media_directories,
extract_external_id_from_string,
remove_special_chars_and_parentheses,
)
from media_manager.indexer.service import IndexerService
from media_manager.metadataProvider.abstractMetaDataProvider import (
@@ -875,7 +877,7 @@ class TvService:
self, tv_show: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_show(
strip_trailing_year(tv_show.name), metadata_provider
remove_special_chars_and_parentheses(tv_show.name), metadata_provider
)
import_candidates = MediaImportSuggestion(
directory=tv_show, candidates=search_result
@@ -886,8 +888,15 @@ class TvService:
return import_candidates
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None:
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.error(f"Failed to rename {source_directory} to {new_source_path}: {e}")
raise Exception("Failed to rename source directory") from e
video_files, subtitle_files, all_files = get_files_for_import(
directory=source_directory
directory=new_source_path
)
for season in tv_show.seasons:
success, imported_episode_count = self.import_season(
@@ -906,11 +915,37 @@ class TvService:
if success or imported_episode_count > (len(season.episodes) / 2):
self.tv_repository.add_season_file(season_file=season_file)
new_source_path = source_directory.parent / ("." + source_directory.name)
try:
source_directory.rename(new_source_path)
except Exception as e:
log.error(f"Failed to rename {source_directory} to {new_source_path}: {e}")
def get_importable_tv_shows(
self, metadata_provider: AbstractMetadataProvider
) -> list[MediaImportSuggestion]:
tv_directory = AllEncompassingConfig().misc.tv_directory
import_suggestions: list[MediaImportSuggestion] = []
candidate_dirs = get_importable_media_directories(tv_directory)
for item in candidate_dirs:
metadata, external_id = extract_external_id_from_string(item.name)
if metadata is not None and external_id is not None:
try:
self.tv_repository.get_show_by_external_id(
external_id=external_id,
metadata_provider=metadata,
)
log.debug(
f"Show {item.name} already exists in the database, skipping import suggestion."
)
continue
except NotFoundError:
log.debug(
f"Show {item.name} not found in database, checking for import candidates."
)
import_suggestion = self.get_import_candidates(
tv_show=item, metadata_provider=metadata_provider
)
import_suggestions.append(import_suggestion)
log.debug(f"Detected {len(import_suggestions)} importable TV shows.")
return import_suggestions
def auto_download_all_approved_season_requests() -> None:

View File

@@ -29,7 +29,7 @@
});
client.GET('/api/v1/tv/importable').then(({ data, error }) => {
if (!error) {
importableMovies = data;
importableShows = data;
}
});
}