Restrict torrent prefetch to trusted origins (#976)

This commit is contained in:
Alex
2026-05-10 10:22:49 +01:00
committed by GitHub
parent 0e120abfaf
commit 472aae608b
2 changed files with 159 additions and 1 deletions

View File

@@ -7,12 +7,13 @@ import hashlib
import re
from binascii import Error as BinasciiError
from dataclasses import dataclass
from urllib.parse import parse_qs, urljoin, urlparse
from urllib.parse import ParseResult, parse_qs, urljoin, urlparse
import requests
from shelfmark.core.config import config
from shelfmark.core.logger import setup_logger
from shelfmark.core.utils import normalize_http_url
from shelfmark.download.network import get_ssl_verify
logger = setup_logger(__name__)
@@ -32,6 +33,7 @@ _TORRENT_FETCH_ERRORS = (
ValueError,
)
_TORRENT_PARSE_ERRORS = (IndexError, KeyError, TypeError, ValueError)
_TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS = ("PROWLARR_URL", "NEWZNAB_URL")
type BencodeValue = dict[str | bytes, BencodeValue] | list[BencodeValue] | int | bytes | str
@@ -93,6 +95,9 @@ def extract_torrent_info(
# Not a magnet - try to fetch and parse the .torrent file
if not fetch_torrent:
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
if not _is_trusted_torrent_fetch_url(url):
logger.debug("Skipping torrent prefetch for untrusted URL: %s...", url[:80])
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
headers: dict[str, str] = {"Accept": "application/x-bittorrent"}
# TODO(shelfmark): Move this source-specific Prowlarr auth handling into a source hook.
@@ -133,6 +138,12 @@ def extract_torrent_info(
is_magnet=True,
magnet_url=redirect_url,
)
if not _is_trusted_torrent_fetch_url(redirect_url):
logger.debug(
"Skipping torrent prefetch redirect to untrusted URL: %s...",
redirect_url[:80],
)
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
# Not a magnet redirect, follow it manually
logger.debug("Following redirect to: %s...", redirect_url[:80])
resp = requests.get(
@@ -172,6 +183,36 @@ def extract_torrent_info(
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
def _is_trusted_torrent_fetch_url(url: str) -> bool:
    """Return True when *url* shares an origin with a configured trusted URL.

    The origin of *url* (scheme, host, port as computed by ``_url_origin``) is
    compared with the origin of every non-empty config value named in
    ``_TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS``. Configured values are passed
    through ``normalize_http_url`` before parsing.

    The check fails closed: a URL that cannot be parsed, or that has no
    http(s) origin, is reported as untrusted instead of raising.

    Args:
        url: Candidate URL the backend is about to fetch.

    Returns:
        True if the candidate's origin equals a configured origin, else False.
    """
    try:
        origin = _url_origin(urlparse(url))
    except ValueError:
        # urlparse() / ParseResult.port raise ValueError on malformed input
        # (e.g. an unbalanced IPv6 bracket or a non-numeric port). Such a URL
        # can never match a configured origin, so treat it as untrusted.
        return False
    if origin is None:
        return False

    for key in _TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS:
        configured_url = str(config.get(key, "") or "").strip()
        if not configured_url:
            continue
        try:
            configured_origin = _url_origin(urlparse(normalize_http_url(configured_url)))
        except ValueError:
            # One malformed configured URL must not abort the check for the
            # remaining trusted keys.
            continue
        if configured_origin == origin:
            return True
    return False
def _url_origin(parsed_url: ParseResult) -> tuple[str, str, int] | None:
scheme = parsed_url.scheme.lower()
if scheme not in {"http", "https"}:
return None
hostname = parsed_url.hostname
if not hostname:
return None
default_port = 443 if scheme == "https" else 80
return (scheme, hostname.lower(), parsed_url.port or default_port)
def parse_transmission_url(url: str) -> tuple[str, str, int, str]:
"""Parse Transmission URL into (protocol, host, port, path)."""
parsed = urlparse(url)

View File

@@ -10,6 +10,7 @@ Tests:
import base64
import hashlib
from unittest.mock import MagicMock
import pytest
@@ -18,6 +19,7 @@ from shelfmark.download.clients.torrent_utils import (
bencode_encode,
extract_hash_from_magnet,
extract_info_hash_from_torrent,
extract_torrent_info,
parse_transmission_url,
)
@@ -356,6 +358,121 @@ class TestExtractInfoHash:
assert extract_info_hash_from_torrent(torrent_bytes) == expected
class TestExtractTorrentInfo:
    """Tests for extracting torrent info from user-supplied URLs."""

    # Patch targets shared by every test in this class.
    _CONFIG_GET = "shelfmark.download.clients.torrent_utils.config.get"
    _REQUESTS_GET = "shelfmark.download.clients.torrent_utils.requests.get"

    @staticmethod
    def _build_torrent():
        """Return ``(torrent_bytes, info_hash_hex)`` for a tiny single-file torrent."""
        info = {
            b"name": b"trusted.txt",
            b"length": 100,
            b"piece length": 16384,
            b"pieces": b"\x00" * 20,
        }
        payload = bencode_encode({b"info": info})
        digest = hashlib.sha1(bencode_encode(info)).hexdigest().lower()
        return payload, digest

    @classmethod
    def _patch_config_from(cls, monkeypatch, values):
        """Make ``config.get`` answer from *values*, falling back to the default."""
        monkeypatch.setattr(
            cls._CONFIG_GET,
            lambda key, default="": values.get(key, default),
        )

    @classmethod
    def _patch_requests_get(cls, monkeypatch, **mock_kwargs):
        """Replace ``requests.get`` with a MagicMock and return that mock."""
        fake_get = MagicMock(**mock_kwargs)
        monkeypatch.setattr(cls._REQUESTS_GET, fake_get)
        return fake_get

    @staticmethod
    def _ok_response(body):
        """Build a mocked 200 response whose content is *body*."""
        reply = MagicMock(status_code=200, content=body)
        reply.raise_for_status = MagicMock()
        return reply

    def test_does_not_fetch_untrusted_http_torrent_url(self, monkeypatch):
        """With no trusted origin configured, an arbitrary URL is never fetched."""
        wanted_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"
        monkeypatch.setattr(self._CONFIG_GET, lambda key, default="": "")
        fake_get = self._patch_requests_get(monkeypatch)

        info = extract_torrent_info(
            "https://attacker.example/book.torrent",
            fetch_torrent=True,
            expected_hash=wanted_hash,
        )

        fake_get.assert_not_called()
        assert info.info_hash == wanted_hash
        assert info.torrent_data is None
        assert info.is_magnet is False

    def test_fetches_configured_prowlarr_torrent_url(self, monkeypatch):
        """A URL on the configured Prowlarr origin is fetched and parsed."""
        payload, digest = self._build_torrent()
        self._patch_config_from(
            monkeypatch,
            {
                "PROWLARR_URL": "https://prowlarr.example",
                "PROWLARR_API_KEY": "secret",
            },
        )
        fake_get = self._patch_requests_get(
            monkeypatch, return_value=self._ok_response(payload)
        )

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == digest
        assert info.torrent_data == payload
        assert info.is_magnet is False
        fake_get.assert_called_once()

    def test_normalizes_configured_origin_before_trusting_torrent_url(self, monkeypatch):
        """A scheme-less configured URL still matches the equivalent http origin."""
        payload, digest = self._build_torrent()
        self._patch_config_from(
            monkeypatch,
            {
                "PROWLARR_URL": "prowlarr.example:9696/",
                "PROWLARR_API_KEY": "secret",
            },
        )
        fake_get = self._patch_requests_get(
            monkeypatch, return_value=self._ok_response(payload)
        )

        info = extract_torrent_info(
            "http://prowlarr.example:9696/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == digest
        assert info.torrent_data == payload
        fake_get.assert_called_once()

    def test_does_not_follow_trusted_torrent_url_redirect_to_untrusted_host(self, monkeypatch):
        """A redirect from a trusted origin to an untrusted host is not followed."""
        wanted_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"
        monkeypatch.setattr(
            self._CONFIG_GET,
            lambda key, default="": "https://prowlarr.example" if key == "PROWLARR_URL" else "",
        )
        redirect = MagicMock(status_code=302)
        redirect.headers = {"Location": "https://attacker.example/book.torrent"}
        fake_get = self._patch_requests_get(monkeypatch, return_value=redirect)

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
            expected_hash=wanted_hash,
        )

        # Exactly one request: the initial trusted fetch, never the redirect target.
        fake_get.assert_called_once()
        assert info.info_hash == wanted_hash
        assert info.torrent_data is None
        assert info.is_magnet is False
class TestExtractHashFromMagnet:
"""Tests for extracting hash from magnet links."""