Compare commits

...

3 Commits

Author SHA1 Message Date
Safihre
efe17ca3bb Changes to fix the tests 2025-10-10 15:10:38 +02:00
Safihre
d4995e3120 Only clean up files created by the specific download 2025-10-10 14:15:47 +02:00
Safihre
90989b374a Let Sorter track file changes 2025-10-10 13:44:47 +02:00
3 changed files with 152 additions and 58 deletions

View File

@@ -488,7 +488,7 @@ def process_job(nzo: NzbObject) -> bool:
if all_ok:
# Remove files matching the cleanup list
cleanup_list(tmp_workdir_complete, skip_nzb=True)
newfiles = cleanup_list(newfiles, skip_nzb=True)
# Check if this is an NZB-only download, if so redirect to queue
# except when PP was Download-only
@@ -501,7 +501,7 @@ def process_job(nzo: NzbObject) -> bool:
cleanup_empty_directories(tmp_workdir_complete)
else:
# Full cleanup including nzb's
cleanup_list(tmp_workdir_complete, skip_nzb=False)
newfiles = cleanup_list(newfiles, skip_nzb=False)
script_ret = 0
script_error = False
@@ -536,7 +536,7 @@ def process_job(nzo: NzbObject) -> bool:
# TV/Movie/Date Renaming code part 2 - rename and move files to parent folder
if all_ok and file_sorter.sorter_active:
if newfiles:
workdir_complete, ok = file_sorter.rename(newfiles, workdir_complete)
workdir_complete, ok, newfiles = file_sorter.rename(newfiles, workdir_complete)
if not ok:
nzo.set_unpack_info("Unpack", T("Failed to move files"))
nzo.fail_msg = T("Failed to move files")
@@ -607,9 +607,9 @@ def process_job(nzo: NzbObject) -> bool:
unique=True,
)
# Cleanup again, including NZB files
# Cleanup again, any changes made by the script will not be handled
if all_ok and os.path.isdir(workdir_complete):
cleanup_list(workdir_complete, False)
newfiles = cleanup_list(newfiles, False)
# Force error for empty result
all_ok = all_ok and not empty
@@ -1101,27 +1101,34 @@ def handle_empty_queue():
sabnzbd.LIBC.malloc_trim(0)
def cleanup_list(wdir: str, skip_nzb: bool):
def cleanup_list(file_paths: List[str], skip_nzb: bool) -> List[str]:
"""Remove all files whose extension matches the cleanup list,
optionally ignoring the nzb extension
optionally ignoring the nzb extension.
Returns the updated list of files (excluding removed files).
"""
if cfg.cleanup_list():
try:
with os.scandir(wdir) as files:
for entry in files:
if entry.is_dir():
cleanup_list(entry.path, skip_nzb)
cleanup_empty_directories(entry.path)
else:
if on_cleanup_list(entry.name, skip_nzb):
try:
logging.info("Removing unwanted file %s", entry.path)
remove_file(entry.path)
except Exception:
logging.error(T("Removing %s failed"), clip_path(entry.path))
logging.info("Traceback: ", exc_info=True)
except Exception:
logging.info("Traceback: ", exc_info=True)
if not cfg.cleanup_list():
return file_paths
logging.info("Checking for extensions to clean up: %s", cfg.cleanup_list.get_string())
remaining_files = []
for file_path in file_paths:
filename = os.path.basename(file_path)
if on_cleanup_list(filename, skip_nzb):
try:
logging.info("Removing unwanted file %s", file_path)
remove_file(file_path)
# File was removed, don't add to remaining_files
except Exception:
logging.error(T("Removing %s failed"), clip_path(file_path))
logging.info("Traceback: ", exc_info=True)
# File removal failed, keep it in the list
remaining_files.append(file_path)
else:
# File not on cleanup list, keep it
remaining_files.append(file_path)
return remaining_files
def prefix(path: str, pre: str) -> str:
@@ -1274,6 +1281,7 @@ def del_marker(path: str):
def remove_from_list(name: Optional[str], lst: List[str]):
"""Removes item from list, modifies list in place"""
if name:
for n in range(len(lst)):
if lst[n].endswith(name):

View File

@@ -501,6 +501,39 @@ class Sorter:
logging.info("Traceback: ", exc_info=True)
return success
def _update_files_after_renames(self, base_path: str, original_files: List[str]) -> List[str]:
"""Update files list to reflect any renames that may have occurred in the base_path"""
updated_files = []
renamed_files = set() # Track files that no longer exist at their original paths
for file_path in original_files:
# Convert to absolute path for checking
if os.path.isabs(file_path):
abs_file_path = file_path
else:
abs_file_path = os.path.join(base_path, file_path)
abs_file_path = os.path.normpath(abs_file_path)
# If the original file still exists, keep it
if os.path.exists(abs_file_path):
updated_files.append(file_path)
else:
renamed_files.add(os.path.basename(file_path))
# If any files were renamed, add all current files in the base_path (excluding originals)
if renamed_files:
try:
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isfile(item_path) and item not in renamed_files:
# Only add if not already in the list (to avoid duplicates)
if item_path not in updated_files:
updated_files.append(item_path)
except (OSError, FileNotFoundError):
pass
return updated_files
def _to_filepath(self, f: str, base_path: str) -> str:
if not is_full_path(f):
f = os.path.join(base_path, f)
@@ -515,23 +548,24 @@ class Sorter:
and os.stat(filepath).st_size >= self.rename_limit
)
def rename(self, files: List[str], base_path: str) -> Tuple[str, bool]:
def rename(self, files: List[str], base_path: str) -> Tuple[str, bool, List[str]]:
if not self.rename_files:
return move_to_parent_directory(base_path)
return move_to_parent_directory(base_path, files)
# Log the minimum filesize for renaming
if self.rename_limit > 0:
logging.debug("Minimum filesize for renaming set to %s bytes", self.rename_limit)
# Store the list of all files for later use
all_files = files
all_files = files[:]
updated_files = files[:]
# Filter files to remove nonexistent, undersized, samples, and excluded extensions
files = [f for f in files if self._filter_files(f, base_path)]
if len(files) == 0:
logging.debug("No files left to rename after applying filter")
return move_to_parent_directory(base_path)
return move_to_parent_directory(base_path, updated_files)
# Check for season packs or sequential filenames and handle their renaming separately;
# if neither applies or succeeds, fall back to going with the single largest file instead.
@@ -541,7 +575,9 @@ class Sorter:
logging.debug("Trying to rename season pack files %s", files)
if self._rename_season_pack(files, base_path, all_files):
cleanup_empty_directories(base_path)
return move_to_parent_directory(base_path)
# Update the files list to reflect any renames that happened
updated_files = self._update_files_after_renames(base_path, updated_files)
return move_to_parent_directory(base_path, updated_files)
else:
logging.debug("Season pack sorting didn´t rename any files")
@@ -550,7 +586,9 @@ class Sorter:
logging.debug("Trying to rename sequential files %s", sequential_files)
if self._rename_sequential(sequential_files, base_path):
cleanup_empty_directories(base_path)
return move_to_parent_directory(base_path)
# Update the files list to reflect any renames that happened
updated_files = self._update_files_after_renames(base_path, updated_files)
return move_to_parent_directory(base_path, updated_files)
else:
logging.debug("Sequential file handling didn't rename any files")
@@ -575,14 +613,16 @@ class Sorter:
renamer(filepath, new_filepath)
renamed_files.append(new_filepath)
except Exception:
logging.error(T("Failed to rename %s to %s"), clip_path(base_path), clip_path(new_filepath))
logging.error(T("Failed to rename %s to %s"), clip_path(filepath), clip_path(new_filepath))
logging.info("Traceback: ", exc_info=True)
rename_similar(base_path, f_ext, self.filename_set, renamed_files)
else:
logging.debug("Cannot rename %s, new path %s already exists.", largest_file.get("name"), new_filepath)
return move_to_parent_directory(base_path)
# Update the files list to reflect any renames that happened
updated_files = self._update_files_after_renames(base_path, updated_files)
return move_to_parent_directory(base_path, updated_files)
class BasicAnalyzer(Sorter):
@@ -607,29 +647,55 @@ def ends_in_file(path: str) -> bool:
return bool(RE_ENDEXT.search(path) or RE_ENDFN.search(path))
def move_to_parent_directory(workdir: str) -> Tuple[str, bool]:
"""Move all files under 'workdir' into 'workdir/..'"""
# Determine 'folder'/..
def move_to_parent_directory(workdir: str, files: List[str]) -> Tuple[str, bool, List[str]]:
"""Move specified files from workdir to workdir's parent directory and track file movements"""
if not files:
return workdir, True, []
# Determine 'workdir/..' as destination
workdir = os.path.abspath(os.path.normpath(workdir))
dest = os.path.abspath(os.path.normpath(os.path.join(workdir, "..")))
logging.debug("Moving files from %s to parent directory: %s", workdir, dest)
logging.debug("Moving all files from %s to %s", workdir, dest)
updated_files = []
# Check for DVD folders and bail out if found
for item in os.listdir(workdir):
if item.lower() in IGNORED_MOVIE_FOLDERS:
return workdir, True
try:
for item in os.listdir(workdir):
if os.path.isdir(os.path.join(workdir, item)) and item.lower() in IGNORED_MOVIE_FOLDERS:
return workdir, True, files
except (OSError, FileNotFoundError):
# Skip directory listing if directory doesn't exist
pass
for root, dirs, files in os.walk(workdir):
for _file in files:
path = os.path.join(root, _file)
new_path = path.replace(workdir, dest)
ok, new_path = move_to_path(path, new_path)
if not ok:
return dest, False
# Move each file to the parent directory
for file_path in files:
# Convert relative paths to absolute paths within workdir
if os.path.isabs(file_path):
abs_file_path = file_path
else:
abs_file_path = os.path.join(workdir, file_path)
abs_file_path = os.path.normpath(abs_file_path)
if not os.path.exists(abs_file_path):
# Skip files that don't exist
continue
filename = os.path.basename(abs_file_path)
new_path = os.path.join(dest, filename)
ok, new_path = move_to_path(abs_file_path, new_path)
if not ok:
return dest, False, files
# Track this file as it was moved
updated_files.append(new_path)
# Clean up empty directories in the workdir
cleanup_empty_directories(workdir)
return dest, True
# Return the parent directory and list of files that were actually moved
return dest, True, updated_files
def guess_what(name: str) -> MatchesDict:

View File

@@ -351,13 +351,15 @@ class TestSortingFunctions:
ffs.fs.create_file(base_dir + "/" + test_file, int("0644", 8))
assert os.path.exists(base_dir + "/" + test_file) is True
return_path, return_status = sorting.move_to_parent_directory(base_dir + "/TEST")
# Create the file list to move
files_to_move = [base_dir + "/TEST/DIR/FILE"]
return_path, return_status, return_files = sorting.move_to_parent_directory(base_dir + "/TEST", files_to_move)
# Affected by move
assert not os.path.exists(base_dir + "/TEST/DIR/FILE") # Moved to subdir
assert not os.path.exists(base_dir + "/TEST/DIR2") # Deleted empty directory
assert not os.path.exists(base_dir + "/DIR2") # Dirs don't get moved, only their file content
assert os.path.exists(base_dir + "/DIR/FILE") # Moved file
assert os.path.exists(base_dir + "/FILE") # Moved file
# Not moved
assert not os.path.exists(base_dir + "/some.file")
assert not os.path.exists(base_dir + "/2")
@@ -366,6 +368,8 @@ class TestSortingFunctions:
# Function return values
assert (return_path) == base_dir
assert (return_status) is True
assert len(return_files) == 1
assert return_files[0] == base_dir + "/FILE"
# Exception for DVD directories
with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
@@ -380,13 +384,15 @@ class TestSortingFunctions:
ffs.fs.create_file(base_dir + "/" + test_file, int("0644", 8))
assert os.path.exists(base_dir + "/" + test_file) is True
return_path, return_status = sorting.move_to_parent_directory(base_dir + "/TEST")
# Create the file list to move (includes file in DVD directory)
files_to_move = [base_dir + "/TEST/" + dvd + "/FILE"]
return_path, return_status, return_files = sorting.move_to_parent_directory(base_dir + "/TEST", files_to_move)
# Nothing should move in the presence of a DVD directory structure
assert os.path.exists(base_dir + "/TEST/" + dvd + "/FILE")
assert os.path.exists(base_dir + "/TEST/DIR2")
assert not os.path.exists(base_dir + "/DIR2")
assert not os.path.exists(base_dir + "/DIR/FILE")
assert not os.path.exists(base_dir + "/FILE")
assert not os.path.exists(base_dir + "/some.file")
assert not os.path.exists(base_dir + "/2")
assert os.path.exists(base_dir + "/dir/some.file")
@@ -394,6 +400,8 @@ class TestSortingFunctions:
# Function return values
assert (return_path) == base_dir + "/TEST"
assert (return_status) is True
# Files should be returned as-is when DVD structure prevents moving
assert return_files == files_to_move
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows tests")
def test_move_to_parent_directory_win(self):
@@ -409,13 +417,15 @@ class TestSortingFunctions:
ffs.fs.create_file(base_dir + "\\" + test_file, int("0644", 8))
assert os.path.exists(base_dir + "\\" + test_file) is True
return_path, return_status = sorting.move_to_parent_directory(base_dir + "\\TEST")
# Create the file list to move
files_to_move = [base_dir + "\\TEST\\DIR\\FILE"]
return_path, return_status, return_files = sorting.move_to_parent_directory(base_dir + "\\TEST", files_to_move)
# Affected by move
assert not os.path.exists(base_dir + "\\TEST\\DIR\\FILE") # Moved to subdir
assert not os.path.exists(base_dir + "\\TEST\\DIR2") # Deleted empty directory
assert not os.path.exists(base_dir + "\\DIR2") # Dirs don't get moved, only their file content
assert os.path.exists(base_dir + "\\DIR\\FILE") # Moved file
assert os.path.exists(base_dir + "\\FILE") # Moved file
# Not moved
assert not os.path.exists(base_dir + "\\some.file")
assert not os.path.exists(base_dir + "\\2")
@@ -424,6 +434,8 @@ class TestSortingFunctions:
# Function return values
assert (return_path) == base_dir
assert (return_status) is True
assert len(return_files) == 1
assert return_files[0] == base_dir + "\\FILE"
# Exception for DVD directories
with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
@@ -438,20 +450,24 @@ class TestSortingFunctions:
ffs.fs.create_file(base_dir + "\\" + test_file, int("0644", 8))
assert os.path.exists(base_dir + "\\" + test_file) is True
return_path, return_status = sorting.move_to_parent_directory(base_dir + "\\TEST")
# Create the file list to move (includes file in DVD directory)
files_to_move = [base_dir + "\\TEST\\" + dvd + "\\FILE"]
return_path, return_status, return_files = sorting.move_to_parent_directory(base_dir + "\\TEST", files_to_move)
# Nothing should move in the presence of a DVD directory structure
assert os.path.exists(base_dir + "\\TEST\\" + dvd + "\\FILE")
assert os.path.exists(base_dir + "\\TEST\\DIR2")
assert not os.path.exists(base_dir + "\\DIR2")
assert not os.path.exists(base_dir + "\\DIR\\FILE")
assert not os.path.exists(base_dir + "\\FILE")
assert not os.path.exists(base_dir + "\\some.file")
assert not os.path.exists(base_dir + "\\2")
assert os.path.exists(base_dir + "\\dir\\some.file")
assert os.path.exists(base_dir + "\\dir\\2")
# Function return values
# Function return values - should return original directory when DVD structure found
assert (return_path) == base_dir + "\\TEST"
assert (return_status) is True
# Files should be returned as-is when DVD structure prevents moving
assert return_files == files_to_move
@pytest.mark.usefixtures("clean_cache_dir")
@@ -766,6 +782,10 @@ class TestSortingSorter:
):
"""Test the file renaming of the Sorter class"""
with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
# Add guessit package directory to real paths so it can access its config files
import guessit
guessit_path = os.path.dirname(guessit.__file__)
ffs.fs.add_real_paths([guessit_path])
# Make up a job name
job_name = "Simulated.Job." + job_tag + ".2160p.Web.x264-SAB"
@@ -816,7 +836,7 @@ class TestSortingSorter:
)
sorter.get_values()
sorter.construct_path()
sort_dest, is_ok = sorter.rename(all_files, job_dir)
sort_dest, is_ok, updated_files = sorter.rename(all_files, job_dir)
# Check the result
try:
@@ -1314,7 +1334,7 @@ class TestSortingSorter:
sorted_path = sorter.construct_path()
# Check season pack status again after constructing the path
assert sorter.is_season_pack is result_is_season_pack_later
sorted_dest, sorted_ok = sorter.rename(globber(job_dir), job_dir)
sorted_dest, sorted_ok, updated_files = sorter.rename(globber(job_dir), job_dir)
# Verify the results
for pattern, number in result_globs.items():