mirror of
https://github.com/sabnzbd/sabnzbd.git
synced 2026-01-14 02:20:23 -05:00
394 lines
16 KiB
Python
394 lines
16 KiB
Python
#!/usr/bin/python3 -OO
|
|
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
"""
|
|
tests.test_postproc - Tests of various functions in postproc, among which rar_renamer()
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
from unittest import mock
|
|
|
|
from sabnzbd import postproc
|
|
from sabnzbd.config import ConfigSorter, ConfigCat
|
|
from sabnzbd.filesystem import globber_full, clip_path
|
|
from sabnzbd.misc import sort_to_opts
|
|
|
|
from tests.testhelper import *
|
|
|
|
|
|
@pytest.mark.usefixtures("clean_cache_dir")
class TestPostProc:
    """Tests for postproc.rar_renamer() and postproc.prepare_extraction_path()"""

    # Helper function for rar_renamer tests
    def _deobfuscate_dir(self, sourcedir, expected_filename_matches):
        """Deobfuscate a scratch copy of one directory with rar_renamer()

        sourcedir: directory with the input files; it is copied, never modified.
        expected_filename_matches: dict mapping a glob pattern to the number of
            files that must match that pattern after rar_renamer() has run.
            A falsy value skips the filename check.

        Returns whatever rar_renamer() returned (the callers compare it against
        the expected number of renamed files). Any mismatch or filesystem
        problem is reported through pytest.fail().
        """
        # The filenames are really changed on disk, so work on a copy inside the cache dir
        workingdir = os.path.join(SAB_CACHE_DIR, "workingdir_test_rar_renamer")

        # if workingdir is still there from previous run, remove it:
        if os.path.isdir(workingdir):
            try:
                shutil.rmtree(workingdir)
            except PermissionError:
                pytest.fail("Could not remove existing workingdir %s for rar_renamer" % workingdir)

        # create a fresh copy
        try:
            shutil.copytree(sourcedir, workingdir)
        except Exception:
            pytest.fail("Could not create copy of files for rar_renamer")

        # And now let the magic happen:
        nzo = mock.Mock()
        nzo.final_name = "somedownloadname"
        nzo.download_path = workingdir
        number_renamed_files = postproc.rar_renamer(nzo)

        # run check on the resulting files
        if expected_filename_matches:
            for filename_match, expected_count in expected_filename_matches.items():
                if len(globber_full(workingdir, filename_match)) != expected_count:
                    pytest.fail("Failed filename_match %s in %s" % (filename_match, workingdir))

        # Remove workingdir again
        try:
            shutil.rmtree(workingdir)
        except Exception:
            pytest.fail("Could not remove existing workingdir %s for rar_renamer" % workingdir)

        return number_renamed_files

    def test_rar_renamer_obfuscated_single_rar_set(self):
        """Test rar_renamer with obfuscated single rar set"""
        sourcedir = os.path.join(SAB_DATA_DIR, "obfuscated_single_rar_set")
        # Now define the filematches we want to see, in which amount ("*-*-*-*-*" are the input files):
        expected_filename_matches = {"*part007.rar": 1, "*-*-*-*-*": 0}
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 7

    def test_rar_renamer_obfuscated_two_rar_sets(self):
        """Test rar_renamer with obfuscated two rar sets"""
        sourcedir = os.path.join(SAB_DATA_DIR, "obfuscated_two_rar_sets")
        expected_filename_matches = {"*part007.rar": 2, "*part009.rar": 1, "*-*-*-*-*": 0}
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 16

    def test_rar_renamer_obfuscated_but_no_rar(self):
        """Test rar_renamer with obfuscated files that are not rar sets"""
        sourcedir = os.path.join(SAB_DATA_DIR, "obfuscated_but_no_rar")
        expected_filename_matches = {"*.rar": 0, "*-*-*-*-*": 6}
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 0

    def test_rar_renamer_single_rar_set_missing_first_rar(self):
        """Test rar_renamer with single rar set missing first rar"""
        # One obfuscated rar set, but first rar (.part1.rar) is missing
        sourcedir = os.path.join(SAB_DATA_DIR, "obfuscated_single_rar_set_missing_first_rar")
        # single rar set (of 6 obfuscated rar files), so we expect renaming
        # thus result must be 6 rar files, and 0 obfuscated files
        expected_filename_matches = {"*.rar": 6, "*-*-*-*-*": 0}
        # 6 files should have been renamed
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 6

    def test_rar_renamer_double_rar_set_missing_rar(self):
        """Test rar_renamer with two rar sets where some rars are missing"""
        # Two obfuscated rar sets, but some rars are missing
        sourcedir = os.path.join(SAB_DATA_DIR, "obfuscated_double_rar_set_missing_rar")
        # Two sets, missing rar, so we expect no renaming
        # thus result should be 0 rar files, and still 8 obfuscated files
        expected_filename_matches = {"*.rar": 0, "*-*-*-*-*": 8}
        # 0 files should have been renamed
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 0

    def test_rar_renamer_fully_encrypted_and_obfuscated(self):
        """Test rar_renamer with fully encrypted and obfuscated rar set"""
        # fully encrypted rar-set, and obfuscated rar names
        sourcedir = os.path.join(SAB_DATA_DIR, "fully_encrypted_and_obfuscated_rars")
        # SABnzbd cannot do anything with this, so we expect no renaming
        expected_filename_matches = {"*.rar": 0, "*-*-*-*-*": 6}
        # 0 files should have been renamed
        assert self._deobfuscate_dir(sourcedir, expected_filename_matches) == 0

    @pytest.mark.parametrize("category", ["testcat", "Default", None])
    @pytest.mark.parametrize("has_jobdir", [True, False])  # With or without a job dir
    @pytest.mark.parametrize("has_catdir", [True, False])  # Complete directory is defined at category level
    @pytest.mark.parametrize("has_active_sorter", [True, False])  # Sorter active for the fake nzo
    @pytest.mark.parametrize("sort_string", ["%sn (%r)", "%sn (%r)/file.%ext", ""])  # Identical path result
    @pytest.mark.parametrize("marker_file", [None, ".marker"])
    @pytest.mark.parametrize("do_folder_rename", [True, False])
    def test_prepare_extraction_path(
        self, category, has_jobdir, has_catdir, has_active_sorter, sort_string, marker_file, do_folder_rename
    ):
        """Test prepare_extraction_path() for every combination of the parameters above

        A sorter and (optionally) a category are registered in the config, after
        which the five values returned by prepare_extraction_path() for a mocked
        nzo are checked: marker file, sorter state, not_create_job_dir, the
        complete work directory and the temporary work directory.
        """
        # Ensure global CFG_ vars are initialised
        sabnzbd.config.read_config(os.devnull)

        # Define a sorter and a category (as @set_config cannot handle those)
        ConfigSorter(
            "sorter__test_prepare_extraction_path",
            {
                "order": 0,
                "min_size": 42,
                "multipart_label": "",
                "sort_string": sort_string,
                "sort_cats": [category if category else "no_such_category"],
                "sort_type": [
                    sort_to_opts("all"),
                ],
                "is_active": int(has_active_sorter),
            },
        )
        assert sabnzbd.config.CFG_DATABASE["sorters"]["sorter__test_prepare_extraction_path"]

        if category:
            ConfigCat(
                category,
                {
                    "order": 0,
                    "pp": None,
                    "script": None,
                    # A trailing "*" on the category dir is what the has_jobdir=False case uses
                    "dir": (
                        os.path.join(SAB_CACHE_DIR, ("category_dir_for_" + category + ("*" if not has_jobdir else "")))
                        if has_catdir
                        else None
                    ),
                    "newzbin": "",
                    "priority": 0,
                },
            )
            assert sabnzbd.config.CFG_DATABASE["categories"][category]

        # Mock a minimal nzo, required as function input
        fake_nzo = mock.Mock()
        fake_nzo.final_name = "FOSS.Rules.S23E06.2160p-SABnzbd"
        fake_nzo.cat = category
        fake_nzo.nzo_info = {}  # Placeholder to prevent a crash in sorting.get_titles()

        @set_config(
            {
                "download_dir": os.path.join(SAB_CACHE_DIR, "incomplete"),
                "complete_dir": os.path.join(SAB_CACHE_DIR, "complete"),
                "marker_file": marker_file,
                "folder_rename": do_folder_rename,
            }
        )
        def _func():
            (
                tmp_workdir_complete,
                workdir_complete,
                file_sorter,
                not_create_job_dir,
                marker_file_result,
            ) = postproc.prepare_extraction_path(fake_nzo)

            tmp_workdir_complete = clip_path(tmp_workdir_complete)
            workdir_complete = clip_path(workdir_complete)

            # Verify marker file
            if marker_file and not not_create_job_dir:
                assert marker_file_result
            else:
                assert not marker_file_result

            # Verify sorter
            assert file_sorter
            if has_active_sorter and category and sort_string:
                assert file_sorter.sorter_active
            else:
                assert not file_sorter.sorter_active

            # Verify not_create_job_dir
            if category and has_catdir and not has_jobdir and not file_sorter.sorter_active:
                assert not_create_job_dir
            else:
                # Double negatives ftw
                assert not not_create_job_dir

            # Regex building blocks; everything taken from a path or a name is escaped,
            # so characters such as "." in the job name are matched literally
            path_prefix = re.escape(SAB_CACHE_DIR) + r".*" + re.escape(os.sep)
            job_dir_pattern = re.escape(fake_nzo.final_name) + r"(\.\d+)?"

            # Verify workdir_complete
            if not category or not has_catdir:
                # Using standard Complete directory as base
                assert workdir_complete.startswith(os.path.join(SAB_CACHE_DIR, "complete"))
            else:
                # Based on the category directory
                assert workdir_complete.startswith(os.path.join(SAB_CACHE_DIR, "category_dir_for_" + category))
            # Check the job directory part (or the lack thereof) as well
            if has_active_sorter and category and sort_string:
                # Sorter path, with an extra job name work directory inside
                assert re.fullmatch(
                    path_prefix + r"Foss Rules \(2160p\)" + re.escape(os.sep) + job_dir_pattern,
                    workdir_complete,
                )
            elif has_jobdir or not (category and has_catdir):
                # Standard job name directory
                assert re.fullmatch(
                    path_prefix + job_dir_pattern,
                    workdir_complete,
                )
            else:
                # No job directory at all
                assert re.fullmatch(
                    path_prefix + r"category_dir_for_([a-zA-Z]+)",
                    workdir_complete,
                )

            # Verify tmp_workdir_complete
            if do_folder_rename:
                if not not_create_job_dir:
                    assert tmp_workdir_complete != workdir_complete
                    assert tmp_workdir_complete.replace("_UNPACK_", "") == workdir_complete
            else:
                assert tmp_workdir_complete == workdir_complete

        _func()
|
|
|
|
|
|
class TestNzbOnlyDownload:
    """Tests for postproc.process_nzb_only_download() with listdir_full and process_single_nzb mocked"""

    @mock.patch("sabnzbd.postproc.process_single_nzb")
    @mock.patch("sabnzbd.postproc.listdir_full")
    def test_process_nzb_only_download_single_nzb(self, mock_listdir, mock_process_single_nzb):
        """A directory holding exactly one NZB is handed to process_single_nzb once"""
        # Fake job carrying the attributes that must be forwarded
        job = mock.Mock(
            final_name="TestDownload",
            pp=3,
            script="test_script.py",
            cat="movies",
            url="http://example.com/test.nzb",
            priority=0,
        )

        # The (mocked) directory listing contains one NZB only
        job_dir = os.path.join(SAB_CACHE_DIR, "test_workdir")
        only_nzb = os.path.join(job_dir, "test.nzb")
        mock_listdir.return_value = [only_nzb]

        # The function reports the NZB it found
        assert postproc.process_nzb_only_download(job_dir, job) == [only_nzb]

        # ... and the job settings were passed on, with the plain job name as nzbname
        mock_process_single_nzb.assert_called_once_with(
            "test.nzb",
            only_nzb,
            pp=3,
            script="test_script.py",
            cat="movies",
            url="http://example.com/test.nzb",
            priority=0,
            nzbname="TestDownload",
            dup_check=False,
        )

    @mock.patch("sabnzbd.postproc.process_single_nzb")
    @mock.patch("sabnzbd.postproc.listdir_full")
    def test_process_nzb_only_download_multiple_nzbs(self, mock_listdir, mock_process_single_nzb):
        """Every NZB of a multi-NZB directory is processed, each with its own nzbname"""
        # Fake job carrying the attributes that must be forwarded
        job = mock.Mock(
            final_name="TestDownload",
            pp=2,
            script=None,
            cat="tv",
            url="http://example.com/test.nzb",
            priority=1,
        )

        # The (mocked) directory listing contains two NZB files
        job_dir = os.path.join(SAB_CACHE_DIR, "test_workdir")
        first_nzb = os.path.join(job_dir, "first.nzb")
        second_nzb = os.path.join(job_dir, "second.nzb")
        mock_listdir.return_value = [first_nzb, second_nzb]

        # Both NZBs are reported back
        assert postproc.process_nzb_only_download(job_dir, job) == [first_nzb, second_nzb]

        # Exactly two calls, one per NZB, with the filename appended to the job name
        assert mock_process_single_nzb.call_count == 2
        for nzb_name, nzb_path in (("first.nzb", first_nzb), ("second.nzb", second_nzb)):
            mock_process_single_nzb.assert_any_call(
                nzb_name,
                nzb_path,
                pp=2,
                script=None,
                cat="tv",
                url="http://example.com/test.nzb",
                priority=1,
                nzbname="TestDownload - %s" % nzb_name,
                dup_check=False,
            )

    @mock.patch("sabnzbd.postproc.process_single_nzb")
    @mock.patch("sabnzbd.postproc.listdir_full")
    def test_process_nzb_only_download_mixed_files(self, mock_listdir, mock_process_single_nzb):
        """A directory with NZB and non-NZB files yields None and processes nothing"""
        job = mock.Mock(final_name="TestDownload")

        # The (mocked) directory listing mixes an NZB with another file type
        job_dir = os.path.join(SAB_CACHE_DIR, "test_workdir")
        mock_listdir.return_value = [os.path.join(job_dir, name) for name in ("test.nzb", "readme.txt")]

        # Not an NZB-only download: None is returned
        assert postproc.process_nzb_only_download(job_dir, job) is None

        # ... and nothing was handed to process_single_nzb
        mock_process_single_nzb.assert_not_called()

    @mock.patch("sabnzbd.postproc.process_single_nzb")
    @mock.patch("sabnzbd.postproc.listdir_full")
    def test_process_nzb_only_download_empty_directory(self, mock_listdir, mock_process_single_nzb):
        """An empty directory yields None and processes nothing"""
        job = mock.Mock(final_name="TestDownload")

        # The (mocked) directory listing is empty
        job_dir = os.path.join(SAB_CACHE_DIR, "test_workdir")
        mock_listdir.return_value = []

        # No files at all: None is returned
        assert postproc.process_nzb_only_download(job_dir, job) is None

        # ... and nothing was handed to process_single_nzb
        mock_process_single_nzb.assert_not_called()
|