Files
sabnzbd/tests/test_assembler.py
2026-03-23 11:41:30 +01:00

453 lines
20 KiB
Python

#!/usr/bin/python3 -OO
# Copyright 2007-2026 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_assembler - Testing functions in assembler.py
"""
from types import SimpleNamespace
from zlib import crc32
from sabnzbd.assembler import Assembler
from sabnzbd.constants import GIGI
from sabnzbd.filesystem import Diskspace
from sabnzbd.nzb import Article, NzbFile, NzbObject
from tests.testhelper import *
class TestAssembler:
@pytest.fixture
def assembler(self, tmp_path):
"""Prepare a sabnzbd assembler, tmp_path is used because C libraries require a real filesystem."""
try:
sabnzbd.Downloader = SimpleNamespace(paused=False)
sabnzbd.ArticleCache = SimpleNamespace()
sabnzbd.Assembler = Assembler()
# Create a minimal NzbObject / NzbFile
self.nzo = NzbObject("test.nzb")
admin_path = str(tmp_path / "admin")
with mock.patch.object(
NzbObject,
"admin_path",
new_callable=mock.PropertyMock,
) as admin_path_mock:
admin_path_mock.return_value = admin_path
self.nzo.download_path = str(tmp_path / "download")
os.mkdir(self.nzo.download_path)
os.mkdir(self.nzo.admin_path)
# NzbFile requires some constructor args; use dummy but valid values
self.nzf = NzbFile(
date=self.nzo.avg_date,
subject="test-file",
raw_article_db=[[None, None]],
file_bytes=0,
nzo=self.nzo,
)
self.nzo.files.append(self.nzf)
self.nzf.type = "yenc" # for writes from article cache
assert self.nzf.prepare_filepath() is not None
# Clear the state after prepare_filepath
self.nzf.articles.clear()
self.nzf.decodetable.clear()
with mock.patch.object(Assembler, "write", wraps=Assembler.write) as mocked_assembler_write:
yield mocked_assembler_write
# All articles should be marked on_disk
for article in self.nzf.decodetable:
assert article.on_disk is True
# File should be marked assembled
assert self.nzf.assembled is True
finally:
# Reset values after test
del sabnzbd.Downloader
del sabnzbd.ArticleCache
del sabnzbd.Assembler
def _make_article(
self, nzf: NzbFile, offset: int, data: bytearray, decoded: bool = True, can_direct_write: bool = True
) -> tuple[Article, bytearray]:
article = Article("msgid", len(data), nzf)
article.decoded = decoded
article.data_begin = offset
article.data_size = len(data) if can_direct_write else None
article.file_size = nzf.bytes
article.decoded_size = len(data)
article.crc32 = crc32(data)
article.tries = 1 # force aborts if never tried
return article, data
def _make_request(
self,
nzf: NzbFile,
articles: list[tuple[Article, bytearray]],
):
article_data = {}
for article, raw in articles:
nzf.decodetable.append(article)
article_data[article] = raw
expected = b"".join(article_data.values())
nzf.bytes = len(expected)
sabnzbd.ArticleCache.load_article = mock.Mock(side_effect=lambda article: article_data.get(article))
for article, _ in articles:
article.file_size = nzf.bytes
return article_data.values(), expected
@staticmethod
def _assert_expected_content(nzf: NzbFile, expected: bytes):
with open(nzf.filepath, "rb") as f:
content = f.read()
assert content == expected
assert nzf.assembler_next_index == len(nzf.decodetable)
assert nzf.contiguous_offset() == nzf.decodetable[0].file_size
def test_assemble_direct_write(self, assembler):
"""Pure direct write mode"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello"), can_direct_write=True),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), can_direct_write=True),
],
)
assert self.nzf.contiguous_offset() == 0
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
self._assert_expected_content(self.nzf, expected)
def test_assemble_direct_write_aborted_to_append(self, assembler):
"""
Start in direct_write, but encounter an article that cannot be direct-written.
Assembler should abort direct_write and switch to append mode.
"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello"), can_direct_write=True),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), can_direct_write=False),
self._make_article(self.nzf, offset=10, data=bytearray(b"12345"), can_direct_write=True),
],
)
# [0] direct_write, [1] append, [2] append
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
self._assert_expected_content(self.nzf, expected)
def test_assemble_direct_append_direct_append(self, assembler):
"""Out-of-order direct write via cache, append fills the gap."""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello"), can_direct_write=True),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), can_direct_write=False),
self._make_article(
self.nzf, offset=10, data=bytearray(b"12345"), decoded=False, can_direct_write=False
),
self._make_article(
self.nzf, offset=15, data=bytearray(b"abcde"), decoded=False, can_direct_write=True
), # Cache direct
],
)
# [0] direct_write, [1] append
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=True)
assert assembler.call_count == 2
assert self.nzf.contiguous_offset() == 10
# [3] direct_write
article = self.nzf.decodetable[3]
article.decoded = True
Assembler.assemble_article(article, sabnzbd.ArticleCache.load_article(article))
assert assembler.call_count == 3
assert self.nzf.contiguous_offset() == 10 # was not a sequential write
# [3] append
article = self.nzf.decodetable[2]
article.decoded = True
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
assert assembler.call_count == 4
self._assert_expected_content(self.nzf, expected)
def test_assemble_direct_write_aborted_to_append_second_attempt(self, assembler):
"""Second attempt after initial partial assemble, including revert to append mode."""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello"), can_direct_write=True),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), can_direct_write=False),
self._make_article(
self.nzf, offset=10, data=bytearray(b"12345"), decoded=False, can_direct_write=False
),
],
)
# [0] direct_write, [1] append
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=True)
assert self.nzf.decodetable[2].on_disk is False
self.nzf.decodetable[2].decoded = True
# [2] append
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
self._assert_expected_content(self.nzf, expected)
def test_assemble_append_direct_second_attempt(self, assembler):
"""Second attempt after initial partial assemble"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello"), can_direct_write=False),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), decoded=False, can_direct_write=True),
],
)
# [0] append
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=False)
self.nzf.decodetable[1].decoded = True
# [1] append
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
self._assert_expected_content(self.nzf, expected)
def test_assemble_append_only(self, assembler):
"""Pure append mode"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"abcd"), can_direct_write=False),
self._make_article(self.nzf, offset=0, data=bytearray(b"efg"), can_direct_write=False),
],
)
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=False)
self._assert_expected_content(self.nzf, expected)
def test_assemble_append_second_attempt(self, assembler):
"""Pure append mode, second attempt"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"abcd"), can_direct_write=False),
self._make_article(self.nzf, offset=0, data=bytearray(b"efg"), decoded=False, can_direct_write=False),
],
)
# [0] append
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=False)
assert self.nzf.assembled is False
self.nzf.decodetable[1].decoded = True
# [1] append
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=False)
self._assert_expected_content(self.nzf, expected)
def test_assemble_append_first_not_decoded(self, assembler):
"""Pure append mode, second attempt"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"abcd"), decoded=False, can_direct_write=False),
self._make_article(self.nzf, offset=0, data=bytearray(b"efg"), can_direct_write=False),
],
)
# Nothing written
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=False)
assert not os.path.exists(self.nzf.filepath)
self.nzf.decodetable[0].decoded = True
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=False)
self._assert_expected_content(self.nzf, expected)
def test_force_append(self, assembler):
"""Force in direct_write mode, then fill in gaps in append mode"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello")),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), decoded=False, can_direct_write=False),
self._make_article(self.nzf, offset=10, data=bytearray(b"12345")),
self._make_article(self.nzf, offset=15, data=bytearray(b"abcd"), decoded=False, can_direct_write=False),
self._make_article(self.nzf, offset=19, data=bytearray(b"efg")),
],
)
# [0] direct, [2] direct, [4], direct
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=True, direct_write=True)
assert assembler.call_count == 3
assert self.nzf.assembled is False
# [1] append, [3], append
self.nzf.decodetable[1].decoded = True
self.nzf.decodetable[3].decoded = True
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=False)
assert assembler.call_count == 5
self._assert_expected_content(self.nzf, expected)
def test_force_force_direct(self, assembler):
"""Force the first, then force the last, then direct the gap"""
data, expected = self._make_request(
self.nzf,
[
self._make_article(self.nzf, offset=0, data=bytearray(b"hello")),
self._make_article(self.nzf, offset=5, data=bytearray(b"world"), decoded=False),
self._make_article(self.nzf, offset=10, data=bytearray(b"12345"), decoded=False),
],
)
# [0] direct
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=False, direct_write=True)
assert assembler.call_count == 1
assert self.nzf.assembler_next_index == 1
# Client restart
self.nzf.assembler_next_index = 0
# force: [2] direct
self.nzf.decodetable[2].decoded = True
Assembler.assemble(self.nzo, self.nzf, file_done=False, allow_non_contiguous=True, direct_write=True)
assert assembler.call_count == 2
assert self.nzf.assembler_next_index == 1
# [1] direct
self.nzf.decodetable[1].decoded = True
Assembler.assemble(self.nzo, self.nzf, file_done=True, allow_non_contiguous=False, direct_write=True)
assert assembler.call_count == 3
self._assert_expected_content(self.nzf, expected)
class TestDiskspaceCheck:
"""Tests for Assembler.diskspace_check"""
@pytest.fixture(autouse=True)
def setup_mocks(self):
self.nzo = mock.Mock()
self.nzo.bytes = int(2 * GIGI)
self.nzo.bytes_tried = 0
self.nzo.bytes_par2 = 0
self.nzf = mock.Mock()
self.nzf.bytes = int(0.5 * GIGI)
self.mock_downloader = mock.Mock()
self.mock_scheduler = mock.Mock()
self.mock_notifier = mock.Mock()
self.mock_emailer = mock.Mock()
try:
sabnzbd.Downloader = self.mock_downloader
sabnzbd.Scheduler = self.mock_scheduler
sabnzbd.notifier = self.mock_notifier
sabnzbd.emailer = self.mock_emailer
with (
mock.patch("sabnzbd.assembler.diskspace") as self.mock_diskspace,
mock.patch("sabnzbd.assembler.get_complete_directory") as self.mock_get_complete_dir,
mock.patch("sabnzbd.assembler.cfg") as self.mock_cfg,
):
# Defaults: plenty of space, no direct_unpack, autoresume on
self.mock_get_complete_dir.return_value = ("/complete", None, True)
self.mock_cfg.download_free.get_float.return_value = 1 * GIGI
self.mock_cfg.complete_free.get_float.return_value = 2 * GIGI
self.mock_cfg.direct_unpack.return_value = False
self.mock_cfg.fulldisk_autoresume.return_value = True
self.mock_cfg.download_dir.get_path.return_value = "/download"
yield
finally:
del sabnzbd.Downloader
del sabnzbd.Scheduler
del sabnzbd.notifier
del sabnzbd.emailer
def _set_diskspace(self, download_free_gb: float, complete_free_gb: float, complete_path: str = "/complete"):
self.mock_diskspace.return_value = (
Diskspace(path="/download", free=download_free_gb),
Diskspace(path=complete_path, free=complete_free_gb),
)
def test_download_dir_full(self):
"""Pause when download_dir has insufficient space"""
# download_free=1GiB, nzf.bytes=0.5GiB => required = 1.5 GiB, free = 1.0 GiB
self._set_diskspace(download_free_gb=1.0, complete_free_gb=50.0)
Assembler.diskspace_check(self.nzo, self.nzf)
expected_required = (1 * GIGI + self.nzf.bytes) / GIGI
self.mock_downloader.pause.assert_called_once()
self.mock_scheduler.plan_diskspace_resume.assert_called_once_with("/download", expected_required)
def test_complete_dir_full_direct_unpack(self):
"""Pause when complete_dir is full during direct_unpack"""
self._set_diskspace(download_free_gb=50.0, complete_free_gb=1.0)
self.mock_cfg.direct_unpack.return_value = True
Assembler.diskspace_check(self.nzo, self.nzf)
expected_required = (2 * GIGI) / GIGI
self.mock_downloader.pause.assert_called_once()
self.mock_scheduler.plan_diskspace_resume.assert_called_once_with("/complete", expected_required)
def test_complete_dir_full_near_completion(self):
"""Pause when complete_dir is full and download is >95% done"""
self.nzo.bytes_tried = int(self.nzo.bytes * 0.96)
self.nzo.bytes_par2 = 0
self._set_diskspace(download_free_gb=50.0, complete_free_gb=1.0)
Assembler.diskspace_check(self.nzo, self.nzf)
expected_required = (2 * GIGI + self.nzo.bytes) / GIGI # (complete_free + nzo.bytes)
self.mock_downloader.pause.assert_called_once()
self.mock_scheduler.plan_diskspace_resume.assert_called_once_with("/complete", expected_required)
def test_complete_dir_no_check_below_95_percent(self):
"""No complete_dir check when download is below 95% and not direct_unpack"""
self.nzo.bytes_tried = int(self.nzo.bytes * 0.50)
self._set_diskspace(download_free_gb=50.0, complete_free_gb=0.1)
Assembler.diskspace_check(self.nzo, self.nzf)
self.mock_downloader.pause.assert_not_called()
self.mock_scheduler.plan_diskspace_resume.assert_not_called()
def test_complete_dir_custom_path(self):
"""full_dir is the actual path when complete_dir differs from default"""
custom_path = "/custom/complete"
self.mock_get_complete_dir.return_value = (custom_path, None, True)
self._set_diskspace(download_free_gb=50.0, complete_free_gb=1.0, complete_path=custom_path)
self.mock_cfg.direct_unpack.return_value = True
Assembler.diskspace_check(self.nzo, self.nzf)
self.mock_downloader.pause.assert_called_once()
self.mock_scheduler.plan_diskspace_resume.assert_called_once_with(custom_path, mock.ANY)
def test_enough_space(self):
"""No action when both dirs have sufficient space"""
self._set_diskspace(download_free_gb=50.0, complete_free_gb=50.0)
Assembler.diskspace_check(self.nzo, self.nzf)
self.mock_downloader.pause.assert_not_called()
self.mock_scheduler.plan_diskspace_resume.assert_not_called()
self.mock_notifier.send_notification.assert_not_called()
self.mock_emailer.diskfull_mail.assert_not_called()
def test_autoresume_disabled(self):
"""plan_diskspace_resume not called when fulldisk_autoresume is off"""
self._set_diskspace(download_free_gb=1.0, complete_free_gb=50.0)
self.mock_cfg.fulldisk_autoresume.return_value = False
Assembler.diskspace_check(self.nzo, self.nzf)
self.mock_downloader.pause.assert_called_once()
self.mock_scheduler.plan_diskspace_resume.assert_not_called()
def test_download_dir_full_notifications(self):
"""Verify notifications and email are sent on disk full"""
self._set_diskspace(download_free_gb=1.0, complete_free_gb=50.0)
Assembler.diskspace_check(self.nzo, self.nzf)
self.mock_notifier.send_notification.assert_called_once()
self.mock_emailer.diskfull_mail.assert_called_once()