Restrict SABnzbd NZB prefetch origins (#967)

This commit is contained in:
Alex
2026-05-09 13:44:47 +01:00
committed by GitHub
parent eee8ba0e83
commit 81b448bc9f
2 changed files with 75 additions and 6 deletions

View File

@@ -33,6 +33,24 @@ _SABNZBD_CLIENT_ERRORS = (
_SabnzbdRequestParam = str | int | float | bool
def _url_origin(value: str) -> tuple[str, str, int] | None:
try:
parsed = urlparse(value)
port = parsed.port
except ValueError:
return None
scheme = parsed.scheme.lower()
hostname = (parsed.hostname or "").lower()
if scheme not in {"http", "https"} or not hostname:
return None
if port is None:
port = 443 if scheme == "https" else 80
return scheme, hostname, port
def _parse_eta(eta_str: str) -> int | None:
"""Parse SABnzbd ETA string (format: 'H:MM:SS') to seconds."""
if not eta_str or eta_str == "0:00:00":
@@ -220,6 +238,18 @@ class SABnzbdClient(DownloadClient):
response.raise_for_status()
return response.content
def _can_prefetch_nzb_url(self, url: str) -> bool:
target_origin = _url_origin(url)
if target_origin is None:
return False
for key in ("PROWLARR_URL", "NEWZNAB_URL"):
trusted_url = normalize_http_config_url(config.get(key, ""))
if trusted_url and _url_origin(trusted_url) == target_origin:
return True
return False
def _get_prowlarr_headers(self, url: str) -> dict:
# TODO(shelfmark): Move this source-specific Prowlarr auth handling into a source hook.
api_key = str(config.get("PROWLARR_API_KEY", "") or "").strip()
@@ -326,15 +356,20 @@ class SABnzbdClient(DownloadClient):
try:
logger.debug("Adding NZB to SABnzbd: %s", name)
nzb_filename = self._build_nzb_filename(name, url)
nzb_content = self._fetch_nzb_content(url)
result = self._api_post_file(nzb_content, nzb_filename, name, resolved_category)
nzo_id = self._extract_nzo_id(result)
logger.info("Added NZB to SABnzbd: %s", nzo_id)
if self._can_prefetch_nzb_url(url):
nzb_filename = self._build_nzb_filename(name, url)
nzb_content = self._fetch_nzb_content(url)
result = self._api_post_file(nzb_content, nzb_filename, name, resolved_category)
nzo_id = self._extract_nzo_id(result)
logger.info("Added NZB to SABnzbd: %s", nzo_id)
else:
logger.info("Skipping SABnzbd addfile prefetch for untrusted NZB URL")
nzo_id = ""
except _SABNZBD_CLIENT_ERRORS as e:
logger.warning("SABnzbd addfile failed, falling back to addurl: %s", e)
else:
return nzo_id
if nzo_id:
return nzo_id
try:
result = self._api_call(

View File

@@ -476,6 +476,7 @@ class TestSABnzbdClientAddDownload:
"SABNZBD_URL": "http://localhost:8080",
"SABNZBD_API_KEY": "abc123",
"SABNZBD_CATEGORY": "books",
"PROWLARR_URL": "https://example.com",
}
monkeypatch.setattr(
"shelfmark.download.clients.sabnzbd.config.get",
@@ -512,6 +513,7 @@ class TestSABnzbdClientAddDownload:
"SABNZBD_URL": "http://localhost:8080",
"SABNZBD_API_KEY": "abc123",
"SABNZBD_CATEGORY": "books",
"PROWLARR_URL": "https://example.com",
}
monkeypatch.setattr(
"shelfmark.download.clients.sabnzbd.config.get",
@@ -551,6 +553,7 @@ class TestSABnzbdClientAddDownload:
"SABNZBD_URL": "http://localhost:8080",
"SABNZBD_API_KEY": "abc123",
"SABNZBD_CATEGORY": "books",
"PROWLARR_URL": "https://example.com",
}
monkeypatch.setattr(
"shelfmark.download.clients.sabnzbd.config.get",
@@ -592,6 +595,7 @@ class TestSABnzbdClientAddDownload:
"SABNZBD_URL": "http://localhost:8080",
"SABNZBD_API_KEY": "abc123",
"SABNZBD_CATEGORY": "books",
"PROWLARR_URL": "https://example.com",
}
monkeypatch.setattr(
"shelfmark.download.clients.sabnzbd.config.get",
@@ -620,6 +624,36 @@ class TestSABnzbdClientAddDownload:
assert result == "SABnzbd_nzo_fallback"
assert mock_api_call.call_args[0][0] == "addurl"
def test_add_download_does_not_prefetch_untrusted_nzb_url(self, monkeypatch):
"""Untrusted NZB URLs should be handed to SABnzbd without backend prefetch."""
config_values = {
"SABNZBD_URL": "http://localhost:8080",
"SABNZBD_API_KEY": "abc123",
"SABNZBD_CATEGORY": "books",
"PROWLARR_URL": "https://prowlarr.example",
}
monkeypatch.setattr(
"shelfmark.download.clients.sabnzbd.config.get",
lambda key, default="": config_values.get(key, default),
)
mock_response = MagicMock()
mock_response.json.return_value = {"status": True, "nzo_ids": ["SABnzbd_nzo_addurl"]}
with patch(
"shelfmark.download.clients.sabnzbd.requests.get",
return_value=mock_response,
) as mock_get:
from shelfmark.download.clients.sabnzbd import SABnzbdClient
client = SABnzbdClient()
result = client.add_download("https://attacker.example/download.nzb", "Test Book")
assert result == "SABnzbd_nzo_addurl"
called_urls = [call.args[0] for call in mock_get.call_args_list]
assert "https://attacker.example/download.nzb" not in called_urls
assert called_urls == ["http://localhost:8080/api"]
class TestSABnzbdClientRemove:
"""Tests for SABnzbdClient.remove()."""