Pipelining and performance optimisations (#3199)

* Pipelining and performance optimisations

* Refactor to remove handle_remainder and add an on_response callback to allow inspection of NNTP messages

* Logic fix if there are sockets but nothing to read/write

* Fix logic errors for failed article requests

* Fix logic for reconfiguring servers

* Add guard_restart callback to pipelining_requests

* Fix article download stats

* Fix current article request shown via api

* Removal of DecodingStatus

* Fix circular reference

* Cleanup imports

* Handle reset_nw and hard_reset for inflight requests

* Improve __request_article behaviour using discard helper

* Article should be None here (before auth) but just in case

* Remove command_queue_condition, which is unnecessary with the pull (rather than push) queue system

* During reset discard any data received prior to sending quit request

* Circular references again

* Revert to using bytearray

* Revert "During reset discard any data received prior to sending quit request"

This reverts commit ed522e3e80.

* Simpler interaction with sabctools

* Temporarily use the sabctools streaming decoder branch

* Fix most uu tests

* Reduce maximum pipelining requests

* Fix the squiggly line

* Remove some LOG_ALL debug code

* Make the get_articles return value consistent (None) - it now populates the server deque

* Reduce NNTP_BUFFER_SIZE

* Rename PIPELINING_REQUESTS to DEF_PIPELINING_REQUESTS

* A little refactoring

* Reduce default pipelining until it is dynamic

* Use BoundedSemaphore and fix the unacquired release

* Use crc from sabctools for uu and make filename logic consistent with yenc

* Use sabctools 9.0.0

* Fix Check Before Download

* Move lock to NzbFile

* Use sabctools 9.1.0

* Minor change

* Fix 430 on check before download

* Update sabnews to work reliably with pipelining

* Minor tidy up

* Why does only Linux complain about this

* Leave this as it was

* Remove unused import

* Compare enum by identity

* Remove command_queue and just prepare a single request
Check if it should be sent and discard when paused

* Kick-start idle connections

* Modify the events that sockets are monitored for
This commit is contained in:
mnightingale
2025-12-05 12:33:35 +00:00
committed by GitHub
parent e8e8fff5bf
commit 44d94226ec
13 changed files with 580 additions and 527 deletions

View File

@@ -1,7 +1,7 @@
# Main requirements
# Note that not all sub-dependencies are listed, but only ones we know could cause trouble
apprise==1.9.5
sabctools==8.2.6
sabctools==9.1.0
CT3==3.4.0.post5
cffi==2.0.0
pycparser==2.23

View File

@@ -269,6 +269,7 @@ def initialize(pause_downloader=False, clean_up=False, repair=0):
cfg.language.callback(cfg.guard_language)
cfg.enable_https_verification.callback(cfg.guard_https_ver)
cfg.guard_https_ver()
cfg.pipelining_requests.callback(cfg.guard_restart)
# Set language files
lang.set_locale_info("SABnzbd", DIR_LANGUAGE)

View File

@@ -30,6 +30,8 @@ import cherrypy
from threading import Thread
from typing import Optional, Any, Union
import sabctools
# For json.dumps, orjson is magnitudes faster than ujson, but it is harder to
# compile due to Rust dependency. Since the output is the same, we support all modules.
try:
@@ -1387,12 +1389,20 @@ def test_nntp_server_dict(kwargs: dict[str, Union[str, list[str]]]) -> tuple[boo
# Sorry, no clever analysis:
return False, T('Server address "%s:%s" is not valid.') % (host, port)
nw = NewsWrapper(server=test_server, thrdnum=-1, block=True)
nntp_code: int = 0
nntp_message: str = ""
def on_response(code: int, message: str):
nonlocal nntp_code, nntp_message
nntp_code = code
nntp_message = message
try:
nw = NewsWrapper(server=test_server, thrdnum=-1, block=True)
nw.init_connect()
while not nw.connected:
nw.recv_chunk()
nw.finish_connect(nw.status_code)
nw.write()
nw.read(on_response=on_response)
except socket.timeout:
if port != 119 and not ssl:
@@ -1414,30 +1424,30 @@ def test_nntp_server_dict(kwargs: dict[str, Union[str, list[str]]]) -> tuple[boo
return False, str(err)
if not username or not password:
nw.nntp.sock.sendall(b"ARTICLE <test@home>\r\n")
nw.queue_command(b"ARTICLE <test@home>\r\n")
try:
nw.reset_data_buffer()
nw.recv_chunk()
nw.write()
nw.read(on_response=on_response)
except Exception as err:
# Some internal error, not always safe to close connection
return False, str(err)
# Parse result
return_status = ()
if nw.status_code:
if nw.status_code == 480:
if nntp_code:
if nntp_code == 480:
return_status = (False, T("Server requires username and password."))
elif nw.status_code < 300 or nw.status_code in (411, 423, 430):
elif nntp_code < 300 or nntp_code in (411, 423, 430):
# If no username/password set and we requested fake-article, it will return 430 Not Found
return_status = (True, T("Connection Successful!"))
elif nw.status_code == 502 or sabnzbd.downloader.clues_login(nw.nntp_msg):
elif nntp_code == 502 or sabnzbd.downloader.clues_login(nntp_message):
return_status = (False, T("Authentication failed, check username/password."))
elif sabnzbd.downloader.clues_too_many(nw.nntp_msg):
elif sabnzbd.downloader.clues_too_many(nntp_message):
return_status = (False, T("Too many connections, please pause downloading or try again later"))
# Fallback in case no data was received or unknown status
if not return_status:
return_status = (False, T("Could not determine connection result (%s)") % nw.nntp_msg)
return_status = (False, T("Could not determine connection result (%s)") % nntp_message)
# Close the connection and return result
nw.hard_reset()
@@ -1502,13 +1512,13 @@ def build_status(calculate_performance: bool = False, skip_dashboard: bool = Fal
for nw in server.busy_threads.copy():
if nw.connected:
activeconn += 1
if nw.article:
if article := nw.article:
serverconnections.append(
{
"thrdnum": nw.thrdnum,
"art_name": nw.article.article,
"nzf_name": nw.article.nzf.filename,
"nzo_name": nw.article.nzf.nzo.final_name,
"art_name": article.article,
"nzf_name": article.nzf.filename,
"nzo_name": article.nzf.nzo.final_name,
}
)

View File

@@ -53,6 +53,7 @@ from sabnzbd.constants import (
DEF_HTTPS_CERT_FILE,
DEF_HTTPS_KEY_FILE,
DEF_MAX_ASSEMBLER_QUEUE,
DEF_PIPELINING_REQUESTS,
)
from sabnzbd.filesystem import same_directory, real_path, is_valid_script, is_network_path
@@ -534,6 +535,7 @@ ssdp_broadcast_interval = OptionNumber("misc", "ssdp_broadcast_interval", 15, mi
ext_rename_ignore = OptionList("misc", "ext_rename_ignore", validation=lower_case_ext)
unrar_parameters = OptionStr("misc", "unrar_parameters", validation=supported_unrar_parameters)
outgoing_nntp_ip = OptionStr("misc", "outgoing_nntp_ip")
pipelining_requests = OptionNumber("misc", "pipelining_requests", DEF_PIPELINING_REQUESTS, minval=1, maxval=10)
##############################################################################

View File

@@ -50,7 +50,7 @@ RENAMES_FILE = "__renames__"
ATTRIB_FILE = "SABnzbd_attrib"
REPAIR_REQUEST = "repair-all.sab"
SABCTOOLS_VERSION_REQUIRED = "8.2.6"
SABCTOOLS_VERSION_REQUIRED = "9.1.0"
DB_HISTORY_VERSION = 1
DB_HISTORY_NAME = "history%s.db" % DB_HISTORY_VERSION
@@ -101,8 +101,9 @@ DEF_MAX_ASSEMBLER_QUEUE = 12
SOFT_ASSEMBLER_QUEUE_LIMIT = 0.5
# Percentage of cache to use before adding file to assembler
ASSEMBLER_WRITE_THRESHOLD = 5
NNTP_BUFFER_SIZE = int(800 * KIBI)
NNTP_BUFFER_SIZE = int(256 * KIBI)
NTTP_MAX_BUFFER_SIZE = int(10 * MEBI)
DEF_PIPELINING_REQUESTS = 2
REPAIR_PRIORITY = 3
FORCE_PRIORITY = 2

View File

@@ -21,13 +21,10 @@ sabnzbd.decoder - article decoder
import logging
import hashlib
import binascii
from io import BytesIO
from zlib import crc32
from typing import Optional
import sabnzbd
from sabnzbd.constants import SABCTOOLS_VERSION_REQUIRED
from sabnzbd.encoding import ubtou
from sabnzbd.nzbstuff import Article
from sabnzbd.misc import match_str
@@ -50,7 +47,7 @@ except Exception:
class BadData(Exception):
def __init__(self, data: bytes):
def __init__(self, data: bytearray):
super().__init__()
self.data = data
@@ -63,8 +60,8 @@ class BadUu(Exception):
pass
def decode(article: Article, data_view: memoryview):
decoded_data = None
def decode(article: Article, decoder: sabctools.NNTPResponse):
decoded_data: Optional[bytearray] = None
nzo = article.nzf.nzo
art_id = article.article
@@ -78,10 +75,10 @@ def decode(article: Article, data_view: memoryview):
if sabnzbd.LOG_ALL:
logging.debug("Decoding %s", art_id)
if article.nzf.type == "uu":
decoded_data = decode_uu(article, bytes(data_view))
if decoder.format is sabctools.EncodingFormat.UU:
decoded_data = decode_uu(article, decoder)
else:
decoded_data = decode_yenc(article, data_view)
decoded_data = decode_yenc(article, decoder)
article_success = True
@@ -112,28 +109,18 @@ def decode(article: Article, data_view: memoryview):
except (BadYenc, ValueError):
# Handles precheck and badly formed articles
if nzo.precheck and data_view and data_view[:4] == b"223 ":
if nzo.precheck and decoder.status_code == 223:
# STAT was used, so we only get a status code
article_success = True
else:
# Try uu-decoding
if not nzo.precheck and article.nzf.type != "yenc":
try:
decoded_data = decode_uu(article, bytes(data_view))
logging.debug("Found uu-encoded article %s in job %s", art_id, nzo.final_name)
article_success = True
except Exception:
pass
# Only bother with further checks if uu-decoding didn't work out
if not article_success:
# Convert the first 2000 bytes of raw socket data to article lines,
# and examine the headers (for precheck) or body (for download).
for line in bytes(data_view[:2000]).split(b"\r\n"):
# Examine the headers (for precheck) or body (for download).
if lines := decoder.lines:
for line in lines:
lline = line.lower()
if lline.startswith(b"message-id:"):
if lline.startswith("message-id:"):
article_success = True
# Look for DMCA clues (while skipping "X-" headers)
if not lline.startswith(b"x-") and match_str(lline, (b"dmca", b"removed", b"cancel", b"blocked")):
if not lline.startswith("x-") and match_str(lline, ("dmca", "removed", "cancel", "blocked")):
article_success = False
logging.info("Article removed from server (%s)", art_id)
break
@@ -170,164 +157,63 @@ def decode(article: Article, data_view: memoryview):
sabnzbd.NzbQueue.register_article(article, article_success)
def decode_yenc(article: Article, data_view: memoryview) -> bytearray:
def decode_yenc(article: Article, response: sabctools.NNTPResponse) -> bytearray:
# Let SABCTools do all the heavy lifting
(
decoded_data,
yenc_filename,
article.file_size,
article.data_begin,
article.data_size,
crc_correct,
) = sabctools.yenc_decode(data_view)
decoded_data = response.data
article.file_size = response.file_size
article.data_begin = response.part_begin
article.data_size = response.part_size
nzf = article.nzf
# Assume it is yenc
nzf.type = "yenc"
# Only set the name if it was found and not obfuscated
if not nzf.filename_checked and yenc_filename:
if not nzf.filename_checked and (file_name := response.file_name):
# Set the md5-of-16k if this is the first article
if article.lowest_partnum:
nzf.md5of16k = hashlib.md5(decoded_data[:16384]).digest()
nzf.md5of16k = hashlib.md5(memoryview(decoded_data)[:16384]).digest()
# Try the rename, even if it's not the first article
# For example when the first article was missing
nzf.nzo.verify_nzf_filename(nzf, yenc_filename)
nzf.nzo.verify_nzf_filename(nzf, file_name)
# CRC check
if crc_correct is None:
if (crc := response.crc) is None:
logging.info("CRC Error in %s", article.article)
raise BadData(decoded_data)
article.crc32 = crc_correct
article.crc32 = crc
return decoded_data
def decode_uu(article: Article, raw_data: bytes) -> bytes:
"""Try to uu-decode an article. The raw_data may or may not contain headers.
If there are headers, they will be separated from the body by at least one
empty line. In case of no headers, the first line seems to always be the nntp
response code (220/222) directly followed by the msg body."""
if not raw_data:
def decode_uu(article: Article, response: sabctools.NNTPResponse) -> bytearray:
"""Process a uu-decoded response"""
if not response.bytes_decoded:
logging.debug("No data to decode")
raise BadUu
# Line up the raw_data
raw_data = raw_data.split(b"\r\n")
if response.baddata:
raise BadData(response.data)
# Index of the uu payload start in raw_data
uu_start = 0
# Limit the number of lines to check for the onset of uu data
limit = min(len(raw_data), 32) - 1
if limit < 3:
logging.debug("Article too short to contain valid uu-encoded data")
raise BadUu
# Try to find an empty line separating the body from headers or response
# code and set the expected payload start to the next line.
try:
uu_start = raw_data[:limit].index(b"") + 1
except ValueError:
# No empty line, look for a response code instead
if raw_data[0].startswith(b"220 ") or raw_data[0].startswith(b"222 "):
uu_start = 1
else:
# Invalid data?
logging.debug("Failed to locate start of uu payload")
raise BadUu
def is_uu_junk(line: bytes) -> bool:
"""Determine if the line is empty or contains known junk data"""
return (not line) or line == b"-- " or line.startswith(b"Posted via ")
# Check the uu 'begin' line
if article.lowest_partnum:
try:
# Make sure the line after the uu_start one isn't empty as well or
# detection of the 'begin' line won't work. For articles other than
# lowest_partnum, filtering out empty lines (and other junk) can
# wait until the actual decoding step.
for index in range(uu_start, limit):
if is_uu_junk(raw_data[index]):
uu_start = index + 1
else:
# Bingo
break
else:
# Search reached the limit
raise IndexError
uu_begin_data = raw_data[uu_start].split(b" ")
# Filename may contain spaces
uu_filename = ubtou(b" ".join(uu_begin_data[2:]).strip())
# Sanity check the 'begin' line
if (
len(uu_begin_data) < 3
or uu_begin_data[0].lower() != b"begin"
or (not int(uu_begin_data[1], 8))
or (not uu_filename)
):
raise ValueError
# Consider this enough proof to set the type, avoiding further
# futile attempts at decoding articles in this nzf as yenc.
article.nzf.type = "uu"
# Bump the pointer for the payload to the next line
uu_start += 1
except Exception:
logging.debug("Missing or invalid uu 'begin' line: %s", raw_data[uu_start] if uu_start < limit else None)
raise BadUu
# Do the actual decoding
with BytesIO() as decoded_data:
for line in raw_data[uu_start:]:
# Ignore junk
if is_uu_junk(line):
continue
# End of the article
if line in (b"`", b"end", b"."):
break
# Remove dot stuffing
if line.startswith(b".."):
line = line[1:]
try:
decoded_line = binascii.a2b_uu(line)
except binascii.Error as msg:
try:
# Workaround for broken uuencoders by Fredrik Lundh
nbytes = (((line[0] - 32) & 63) * 4 + 5) // 3
decoded_line = binascii.a2b_uu(line[:nbytes])
except Exception as msg2:
logging.info(
"Error while uu-decoding %s: %s (line: %s; workaround: %s)", article.article, msg, line, msg2
)
raise BadData(decoded_data.getvalue())
# Store the decoded data
decoded_data.write(decoded_line)
# Set the type to uu; the latter is still needed in
# case the lowest_partnum article was damaged or slow to download.
article.nzf.type = "uu"
decoded_data = response.data
nzf = article.nzf
nzf.type = "uu"
# Only set the name if it was found and not obfuscated
if not nzf.filename_checked and (file_name := response.file_name):
# Set the md5-of-16k if this is the first article
if article.lowest_partnum:
decoded_data.seek(0)
article.nzf.md5of16k = hashlib.md5(decoded_data.read(16384)).digest()
# Handle the filename
if not article.nzf.filename_checked and uu_filename:
article.nzf.nzo.verify_nzf_filename(article.nzf, uu_filename)
nzf.md5of16k = hashlib.md5(memoryview(decoded_data)[:16384]).digest()
data = decoded_data.getvalue()
article.crc32 = crc32(data)
return data
# Try the rename, even if it's not the first article
# For example when the first article was missing
nzf.nzo.verify_nzf_filename(nzf, file_name)
article.crc32 = response.crc
return decoded_data
def search_new_server(article: Article) -> bool:

View File

@@ -19,15 +19,18 @@
sabnzbd.downloader - download engine
"""
import select
import logging
import selectors
from collections import deque
from threading import Thread, RLock, current_thread
import socket
import sys
import ssl
import time
from datetime import date
from typing import Optional, Union
from typing import Optional, Union, Deque
import sabctools
import sabnzbd
from sabnzbd.decorators import synchronized, NzbQueueLocker, DOWNLOADER_CV, DOWNLOADER_LOCK
@@ -148,7 +151,7 @@ class Server:
self.request: bool = False # True if a getaddrinfo() request is pending
self.have_body: bool = True # Assume server has "BODY", until proven otherwise
self.have_stat: bool = True # Assume server has "STAT", until proven otherwise
self.article_queue: list[sabnzbd.nzbstuff.Article] = []
self.article_queue: Deque[sabnzbd.nzbstuff.Article] = deque()
# Skip during server testing
if threads:
@@ -173,19 +176,19 @@ class Server:
self.reset_article_queue()
@synchronized(DOWNLOADER_LOCK)
def get_article(self):
def get_article(self, peek: bool = False):
"""Get article from pre-fetched and pre-fetch new ones if necessary.
Articles that are too old for this server are immediately marked as tried"""
if self.article_queue:
return self.article_queue.pop(0)
return self.article_queue[0] if peek else self.article_queue.popleft()
if self.next_article_search < time.time():
# Pre-fetch new articles
self.article_queue = sabnzbd.NzbQueue.get_articles(self, sabnzbd.Downloader.servers, _ARTICLE_PREFETCH)
sabnzbd.NzbQueue.get_articles(self, sabnzbd.Downloader.servers, _ARTICLE_PREFETCH)
if self.article_queue:
article = self.article_queue.pop(0)
article = self.article_queue[0] if peek else self.article_queue.popleft()
# Mark expired articles as tried on this server
if self.retention and article.nzf.nzo.avg_stamp < time.time() - self.retention:
if not peek and self.retention and article.nzf.nzo.avg_stamp < time.time() - self.retention:
sabnzbd.Downloader.decode(article)
while self.article_queue:
sabnzbd.Downloader.decode(self.article_queue.pop())
@@ -201,9 +204,12 @@ class Server:
"""Reset articles queued for the Server. Locked to prevent
articles getting stuck in the Server when enabled/disabled"""
logging.debug("Resetting article queue for %s (%s)", self, self.article_queue)
for article in self.article_queue:
article.allow_new_fetcher()
self.article_queue = []
while self.article_queue:
try:
article = self.article_queue.popleft()
article.allow_new_fetcher()
except IndexError:
pass
def request_addrinfo(self):
"""Launch async request to resolve server address and select the fastest.
@@ -250,7 +256,7 @@ class Downloader(Thread):
"shutdown",
"server_restarts",
"force_disconnect",
"read_fds",
"selector",
"servers",
"timers",
"last_max_chunk_size",
@@ -290,7 +296,7 @@ class Downloader(Thread):
self.force_disconnect: bool = False
self.read_fds: dict[int, NewsWrapper] = {}
self.selector: selectors.DefaultSelector = selectors.DefaultSelector()
self.servers: list[Server] = []
self.timers: dict[str, list[float]] = {}
@@ -361,15 +367,34 @@ class Downloader(Thread):
self.servers.sort(key=lambda svr: "%02d%s" % (svr.priority, svr.displayname.lower()))
@synchronized(DOWNLOADER_LOCK)
def add_socket(self, fileno: int, nw: NewsWrapper):
"""Add a socket ready to be used to the list to be watched"""
self.read_fds[fileno] = nw
def add_socket(self, nw: NewsWrapper):
"""Add a socket to be watched for read or write availability"""
if nw.nntp:
try:
self.selector.register(nw.nntp.fileno, selectors.EVENT_READ | selectors.EVENT_WRITE, nw)
nw.selector_events = selectors.EVENT_READ | selectors.EVENT_WRITE
except KeyError:
pass
@synchronized(DOWNLOADER_LOCK)
def modify_socket(self, nw: NewsWrapper, events: int):
"""Modify the events socket are watched for"""
if nw.nntp and nw.selector_events != events:
try:
self.selector.modify(nw.nntp.fileno, events, nw)
nw.selector_events = events
except KeyError:
pass
@synchronized(DOWNLOADER_LOCK)
def remove_socket(self, nw: NewsWrapper):
"""Remove a socket to be watched"""
if nw.nntp:
self.read_fds.pop(nw.nntp.fileno, None)
try:
self.selector.unregister(nw.nntp.fileno)
nw.selector_events = 0
except KeyError:
pass
@NzbQueueLocker
def set_paused_state(self, state: bool):
@@ -509,26 +534,30 @@ class Downloader(Thread):
# Remove all connections to server
for nw in server.idle_threads | server.busy_threads:
self.__reset_nw(nw, "Forcing disconnect", warn=False, wait=False, retry_article=False)
self.reset_nw(nw, "Forcing disconnect", warn=False, wait=False, retry_article=False)
# Make sure server address resolution is refreshed
server.addrinfo = None
@staticmethod
def decode(article, data_view: Optional[memoryview] = None):
def decode(article: "sabnzbd.nzbstuff.Article", response: Optional[sabctools.NNTPResponse] = None):
"""Decode article"""
# Need a better way of draining requests
if article.nzf.nzo.removed_from_queue:
return
# Article was requested and fetched, update article stats for the server
sabnzbd.BPSMeter.register_server_article_tried(article.fetcher.id)
# Handle broken articles directly
if not data_view:
if not response or not response.bytes_decoded and not article.nzf.nzo.precheck:
if not article.search_new_server():
article.nzf.nzo.increase_bad_articles_counter("missing_articles")
sabnzbd.NzbQueue.register_article(article, success=False)
return
# Decode and send to article cache
sabnzbd.decoder.decode(article, data_view)
sabnzbd.decoder.decode(article, response)
def run(self):
# Warn if there are servers defined, but none are valid
@@ -548,7 +577,7 @@ class Downloader(Thread):
for _ in range(cfg.receive_threads()):
# Started as daemon, so we don't need any shutdown logic in the worker
# The Downloader code will make sure shutdown is handled gracefully
Thread(target=self.process_nw_worker, args=(self.read_fds, process_nw_queue), daemon=True).start()
Thread(target=self.process_nw_worker, args=(process_nw_queue,), daemon=True).start()
# Catch all errors, just in case
try:
@@ -570,9 +599,9 @@ class Downloader(Thread):
if (nw.nntp and nw.nntp.error_msg) or (nw.timeout and now > nw.timeout):
if nw.nntp and nw.nntp.error_msg:
# Already showed error
self.__reset_nw(nw)
self.reset_nw(nw)
else:
self.__reset_nw(nw, "Timed out", warn=True)
self.reset_nw(nw, "Timed out", warn=True)
server.bad_cons += 1
self.maybe_block_server(server)
@@ -612,15 +641,14 @@ class Downloader(Thread):
server.request_addrinfo()
break
nw.article = server.get_article()
if not nw.article:
if not server.get_article(peek=True):
break
server.idle_threads.remove(nw)
server.busy_threads.add(nw)
if nw.connected:
self.__request_article(nw)
self.add_socket(nw)
else:
try:
logging.info("%s@%s: Initiating connection", nw.thrdnum, server.host)
@@ -632,14 +660,14 @@ class Downloader(Thread):
server.host,
sys.exc_info()[1],
)
self.__reset_nw(nw, "Failed to initialize", warn=True)
self.reset_nw(nw, "Failed to initialize", warn=True)
if self.force_disconnect or self.shutdown:
for server in self.servers:
for nw in server.idle_threads | server.busy_threads:
# Send goodbye if we have open socket
if nw.nntp:
self.__reset_nw(nw, "Forcing disconnect", wait=False, count_article_try=False)
self.reset_nw(nw, "Forcing disconnect", wait=False, count_article_try=False)
# Make sure server address resolution is refreshed
server.addrinfo = None
server.reset_article_queue()
@@ -663,10 +691,12 @@ class Downloader(Thread):
self.last_max_chunk_size = 0
# Use select to find sockets ready for reading/writing
if readkeys := self.read_fds.keys():
read, _, _ = select.select(readkeys, (), (), 1.0)
if self.selector.get_map():
if events := self.selector.select(timeout=1.0):
for key, ev in events:
process_nw_queue.put((key.data, ev))
else:
read = []
events = []
BPSMeter.reset()
time.sleep(0.1)
self.max_chunk_size = _DEFAULT_CHUNK_SIZE
@@ -685,58 +715,65 @@ class Downloader(Thread):
next_bpsmeter_update = now + _BPSMETER_UPDATE_DELAY
self.check_assembler_levels()
if not read:
if not events:
continue
# Submit all readable sockets to be processed and wait for completion
process_nw_queue.put_multiple(read)
# Wait for socket operation completion
process_nw_queue.join()
except Exception:
logging.error(T("Fatal error in Downloader"), exc_info=True)
def process_nw_worker(self, read_fds: dict[int, NewsWrapper], nw_queue: MultiAddQueue):
def process_nw_worker(self, nw_queue: MultiAddQueue):
"""Worker for the daemon thread to process results.
Wrapped in try/except because in case of an exception, logging
might get lost and the queue.join() would block forever."""
try:
logging.debug("Starting Downloader receive thread: %s", current_thread().name)
while True:
# The read_fds is passed by reference, so we can access its items!
self.process_nw(read_fds[nw_queue.get()])
self.process_nw(*nw_queue.get())
nw_queue.task_done()
except Exception:
# We cannot break out of the Downloader from here, so just pause
logging.error(T("Fatal error in Downloader"), exc_info=True)
self.pause()
def process_nw(self, nw: NewsWrapper):
def process_nw(self, nw: NewsWrapper, event: int):
"""Receive data from a NewsWrapper and handle the response"""
try:
bytes_received, end_of_line, article_done = nw.recv_chunk()
except ssl.SSLWantReadError:
return
except (ConnectionError, ConnectionAbortedError):
# The ConnectionAbortedError is also thrown by sabctools in case of fatal SSL-layer problems
self.__reset_nw(nw, "Server closed connection", wait=False)
return
except BufferError:
# The BufferError is thrown when exceeding maximum buffer size
# Make sure to discard the article
self.__reset_nw(nw, "Maximum data buffer size exceeded", wait=False, retry_article=False)
return
if event & selectors.EVENT_READ:
self.process_nw_read(nw)
if event & selectors.EVENT_WRITE:
nw.write()
def process_nw_read(self, nw: NewsWrapper) -> None:
bytes_received: int = 0
bytes_pending: int = 0
while True:
try:
n, bytes_pending = nw.read(nbytes=bytes_pending)
bytes_received += n
except ssl.SSLWantReadError:
return
except (ConnectionError, ConnectionAbortedError):
# The ConnectionAbortedError is also thrown by sabctools in case of fatal SSL-layer problems
self.reset_nw(nw, "Server closed connection", wait=False)
return
except BufferError:
# The BufferError is thrown when exceeding maximum buffer size
# Make sure to discard the article
self.reset_nw(nw, "Maximum data buffer size exceeded", wait=False, retry_article=False)
return
if not bytes_pending:
break
article = nw.article
server = nw.server
with DOWNLOADER_LOCK:
sabnzbd.BPSMeter.update(server.id, bytes_received)
if bytes_received > self.last_max_chunk_size:
self.last_max_chunk_size = bytes_received
# Update statistics only when we fetched a whole article
# The side effect is that we don't count things like article-not-available messages
if article_done:
article.nzf.nzo.update_download_stats(sabnzbd.BPSMeter.bps, server.id, nw.data_position)
# Check speedlimit
if (
self.bandwidth_limit
@@ -747,99 +784,6 @@ class Downloader(Thread):
time.sleep(0.01)
sabnzbd.BPSMeter.update()
# If we are not at the end of a line, more data will follow
if not end_of_line:
return
# Response code depends on request command:
# 220 = ARTICLE, 222 = BODY
if nw.status_code not in (220, 222) and not article_done:
if not nw.connected or nw.status_code == 480:
if not self.__finish_connect_nw(nw):
return
if nw.connected:
logging.info("Connecting %s@%s finished", nw.thrdnum, nw.server.host)
self.__request_article(nw)
elif nw.status_code == 223:
article_done = True
logging.debug("Article <%s> is present on %s", article.article, nw.server.host)
elif nw.status_code in (411, 423, 430, 451):
article_done = True
logging.debug(
"Thread %s@%s: Article %s missing (error=%s)",
nw.thrdnum,
nw.server.host,
article.article,
nw.status_code,
)
nw.reset_data_buffer()
elif nw.status_code == 500:
if article.nzf.nzo.precheck:
# Did we try "STAT" already?
if not server.have_stat:
# Hopless server, just discard
logging.info("Server %s does not support STAT or HEAD, precheck not possible", server.host)
article_done = True
else:
# Assume "STAT" command is not supported
server.have_stat = False
logging.debug("Server %s does not support STAT, trying HEAD", server.host)
else:
# Assume "BODY" command is not supported
server.have_body = False
logging.debug("Server %s does not support BODY", server.host)
nw.reset_data_buffer()
self.__request_article(nw)
else:
# Don't warn for (internal) server errors during downloading
if nw.status_code not in (400, 502, 503):
logging.warning(
T("%s@%s: Received unknown status code %s for article %s"),
nw.thrdnum,
nw.server.host,
nw.status_code,
article.article,
)
# Ditch this thread, we don't know what data we got now so the buffer can be bad
self.__reset_nw(nw, f"Server error or unknown status code: {nw.status_code}", wait=False)
return
if article_done:
# Successful data, clear "bad" counter
server.bad_cons = 0
server.errormsg = server.warning = ""
# Decode
self.decode(article, nw.data_view[: nw.data_position])
if sabnzbd.LOG_ALL:
logging.debug("Thread %s@%s: %s done", nw.thrdnum, server.host, article.article)
# Reset connection for new activity
nw.soft_reset()
# Request a new article immediately if possible
if (
nw.connected
and server.active
and not server.restart
and not (self.paused or self.shutdown or self.paused_for_postproc)
):
nw.article = server.get_article()
if nw.article:
self.__request_article(nw)
return
# Make socket available again
server.busy_threads.discard(nw)
server.idle_threads.add(nw)
self.remove_socket(nw)
def check_assembler_levels(self):
"""Check the Assembler queue to see if we need to delay, depending on queue size"""
if (assembler_level := sabnzbd.Assembler.queue_level()) > SOFT_ASSEMBLER_QUEUE_LIMIT:
@@ -865,13 +809,12 @@ class Downloader(Thread):
logged_counter += 1
@synchronized(DOWNLOADER_LOCK)
def __finish_connect_nw(self, nw: NewsWrapper) -> bool:
def finish_connect_nw(self, nw: NewsWrapper, response: sabctools.NNTPResponse) -> bool:
server = nw.server
try:
nw.finish_connect(nw.status_code)
nw.finish_connect(response.status_code, response.message)
if sabnzbd.LOG_ALL:
logging.debug("%s@%s last message -> %s", nw.thrdnum, server.host, nw.nntp_msg)
nw.reset_data_buffer()
logging.debug("%s@%s last message -> %d", nw.thrdnum, server.host, response.status_code)
except NNTPPermanentError as error:
# Handle login problems
block = False
@@ -884,7 +827,7 @@ class Downloader(Thread):
errormsg = T("Too many connections to server %s [%s]") % (server.host, error.msg)
if server.active:
# Don't count this for the tries (max_art_tries) on this server
self.__reset_nw(nw)
self.reset_nw(nw)
self.plan_server(server, _PENALTY_TOOMANY)
elif error.code in (502, 481, 482) and clues_too_many_ip(error.msg):
# Login from (too many) different IP addresses
@@ -934,7 +877,7 @@ class Downloader(Thread):
if penalty and (block or server.optional):
self.plan_server(server, penalty)
# Note that the article is discard for this server if the server is not required
self.__reset_nw(nw, retry_article=retry_article)
self.reset_nw(nw, retry_article=retry_article)
return False
except Exception as err:
logging.error(
@@ -945,11 +888,11 @@ class Downloader(Thread):
)
logging.info("Traceback: ", exc_info=True)
# No reset-warning needed, above logging is sufficient
self.__reset_nw(nw, retry_article=False)
self.reset_nw(nw, retry_article=False)
return True
@synchronized(DOWNLOADER_LOCK)
def __reset_nw(
def reset_nw(
self,
nw: NewsWrapper,
reset_msg: Optional[str] = None,
@@ -957,6 +900,7 @@ class Downloader(Thread):
wait: bool = True,
count_article_try: bool = True,
retry_article: bool = True,
article: Optional["sabnzbd.nzbstuff.Article"] = None,
):
# Some warnings are errors, and not added as server.warning
if warn and reset_msg:
@@ -972,20 +916,8 @@ class Downloader(Thread):
# Make sure it is not in the readable sockets
self.remove_socket(nw)
if nw.article and not nw.article.nzf.nzo.removed_from_queue:
# Only some errors should count towards the total tries for each server
if count_article_try:
nw.article.tries += 1
# Do we discard, or try again for this server
if not retry_article or (not nw.server.required and nw.article.tries > cfg.max_art_tries()):
# Too many tries on this server, consider article missing
self.decode(nw.article)
nw.article.tries = 0
else:
# Allow all servers again for this article
# Do not use the article_queue, as the server could already have been disabled when we get here!
nw.article.allow_new_fetcher()
# Discard the article request which failed
nw.discard(article, count_article_try=count_article_try, retry_article=retry_article)
# Reset connection object
nw.hard_reset(wait)
@@ -993,21 +925,6 @@ class Downloader(Thread):
# Empty SSL info, it might change on next connect
nw.server.ssl_info = ""
def __request_article(self, nw: NewsWrapper):
try:
if sabnzbd.LOG_ALL:
logging.debug("Thread %s@%s: BODY %s", nw.thrdnum, nw.server.host, nw.article.article)
nw.body()
# Mark as ready to be read
self.add_socket(nw.nntp.fileno, nw)
except socket.error as err:
logging.info("Looks like server closed connection: %s", err)
self.__reset_nw(nw, "Server broke off connection", warn=True)
except Exception:
logging.error(T("Suspect error in downloader"))
logging.info("Traceback: ", exc_info=True)
self.__reset_nw(nw, "Server broke off connection", warn=True)
# ------------------------------------------------------------------------------
# Timed restart of servers admin.
# For each server all planned events are kept in a list.

View File

@@ -913,6 +913,7 @@ SPECIAL_VALUE_LIST = (
"ssdp_broadcast_interval",
"unrar_parameters",
"outgoing_nntp_ip",
"pipelining_requests",
)
SPECIAL_LIST_LIST = (
"rss_odd_titles",

View File

@@ -21,20 +21,22 @@ sabnzbd.newswrapper
import errno
import socket
import threading
from collections import deque
from selectors import EVENT_READ, EVENT_WRITE
from threading import Thread
import time
import logging
import ssl
import sabctools
from typing import Optional, Union
from typing import Optional, Tuple, Union, Callable
import sabctools
import sabnzbd
import sabnzbd.cfg
from sabnzbd.constants import DEF_NETWORKING_TIMEOUT, NNTP_BUFFER_SIZE, NTTP_MAX_BUFFER_SIZE
from sabnzbd.encoding import utob, ubtou
from sabnzbd.constants import DEF_NETWORKING_TIMEOUT, NNTP_BUFFER_SIZE, Status, FORCE_PRIORITY
from sabnzbd.encoding import utob
from sabnzbd.get_addrinfo import AddrInfo
from sabnzbd.decorators import synchronized, DOWNLOADER_LOCK
from sabnzbd.misc import int_conv
# Set pre-defined socket timeout
socket.setdefaulttimeout(DEF_NETWORKING_TIMEOUT)
@@ -57,10 +59,8 @@ class NewsWrapper:
"thrdnum",
"blocking",
"timeout",
"article",
"data",
"data_view",
"data_position",
"decoder",
"send_buffer",
"nntp",
"connected",
"user_sent",
@@ -69,6 +69,11 @@ class NewsWrapper:
"user_ok",
"pass_ok",
"force_login",
"next_request",
"concurrent_requests",
"_response_queue",
"selector_events",
"lock",
)
def __init__(self, server, thrdnum, block=False):
@@ -77,11 +82,9 @@ class NewsWrapper:
self.blocking: bool = block
self.timeout: Optional[float] = None
self.article: Optional[sabnzbd.nzbstuff.Article] = None
self.data: Optional[bytearray] = None
self.data_view: Optional[memoryview] = None
self.data_position: int = 0
self.decoder: Optional[sabctools.Decoder] = None
self.send_buffer = b""
self.nntp: Optional[NNTP] = None
@@ -93,14 +96,22 @@ class NewsWrapper:
self.force_login: bool = False
self.group: Optional[str] = None
@property
def status_code(self) -> Optional[int]:
if self.data_position >= 3:
return int_conv(self.data[:3])
# Command queue and concurrency
self.next_request: Optional[tuple[bytes, Optional["sabnzbd.nzbstuff.Article"]]] = None
self.concurrent_requests: threading.BoundedSemaphore = threading.BoundedSemaphore(
sabnzbd.cfg.pipelining_requests()
)
self._response_queue: deque[Optional[sabnzbd.nzbstuff.Article]] = deque()
self.selector_events = 0
self.lock: threading.Lock = threading.Lock()
@property
def nntp_msg(self) -> str:
return ubtou(self.data[: self.data_position]).strip()
def article(self) -> Optional["sabnzbd.nzbstuff.Article"]:
    """Peek at the entry at the head of the response queue.

    The queue is read under ``self.lock`` and is not modified.

    :return: the first queued entry, or None when the queue is empty
    """
    with self.lock:
        pending = self._response_queue
        return pending[0] if pending else None
def init_connect(self):
"""Setup the connection in NNTP object"""
@@ -109,13 +120,15 @@ class NewsWrapper:
raise socket.error(errno.EADDRNOTAVAIL, T("Invalid server address."))
# Construct buffer and NNTP object
self.data = sabctools.bytearray_malloc(NNTP_BUFFER_SIZE)
self.data_view = memoryview(self.data)
self.reset_data_buffer()
self.decoder = sabctools.Decoder(NNTP_BUFFER_SIZE)
self.nntp = NNTP(self, self.server.addrinfo)
self.timeout = time.time() + self.server.timeout
def finish_connect(self, code: int):
# On connect the first "response" will be 200 Welcome
self._response_queue.append(None)
self.concurrent_requests.acquire()
def finish_connect(self, code: int, message: str) -> None:
"""Perform login options"""
if not (self.server.username or self.server.password or self.force_login):
self.connected = True
@@ -133,11 +146,10 @@ class NewsWrapper:
self.pass_ok = False
if code in (400, 500, 502):
raise NNTPPermanentError(self.nntp_msg, code)
raise NNTPPermanentError(message, code)
elif not self.user_sent:
command = utob("authinfo user %s\r\n" % self.server.username)
self.nntp.sock.sendall(command)
self.reset_data_buffer()
self.queue_command(command)
self.user_sent = True
elif not self.user_ok:
if code == 381:
@@ -151,98 +163,254 @@ class NewsWrapper:
if self.user_ok and not self.pass_sent:
command = utob("authinfo pass %s\r\n" % self.server.password)
self.nntp.sock.sendall(command)
self.reset_data_buffer()
self.queue_command(command)
self.pass_sent = True
elif self.user_ok and not self.pass_ok:
if code != 281:
# Assume that login failed (code 481 or other)
raise NNTPPermanentError(self.nntp_msg, code)
raise NNTPPermanentError(message, code)
else:
self.connected = True
self.timeout = time.time() + self.server.timeout
def body(self):
def queue_command(
    self,
    command: bytes,
    article: Optional["sabnzbd.nzbstuff.Article"] = None,
) -> None:
    """Stage a single request to be sent next.

    Only one request is held at a time: the (command, article) pair
    overwrites whatever was stored in ``next_request`` before.

    :param command: raw bytes of the NNTP command
    :param article: article the command belongs to, if any
    """
    staged = (command, article)
    self.next_request = staged
def body(self, article: "sabnzbd.nzbstuff.Article") -> tuple[bytes, "sabnzbd.nzbstuff.Article"]:
"""Request the body of the article"""
self.timeout = time.time() + self.server.timeout
if self.article.nzf.nzo.precheck:
if article.nzf.nzo.precheck:
if self.server.have_stat:
command = utob("STAT <%s>\r\n" % self.article.article)
command = utob("STAT <%s>\r\n" % article.article)
else:
command = utob("HEAD <%s>\r\n" % self.article.article)
command = utob("HEAD <%s>\r\n" % article.article)
elif self.server.have_body:
command = utob("BODY <%s>\r\n" % self.article.article)
command = utob("BODY <%s>\r\n" % article.article)
else:
command = utob("ARTICLE <%s>\r\n" % self.article.article)
self.nntp.sock.sendall(command)
self.reset_data_buffer()
command = utob("ARTICLE <%s>\r\n" % article.article)
return command, article
def recv_chunk(self) -> tuple[int, bool, bool]:
"""Receive data, return #bytes, end-of-line, end-of-article"""
# Resize the buffer in the extremely unlikely case that it got full
if self.data_position == len(self.data):
self.nntp.nw.increase_data_buffer()
def on_response(self, response: sabctools.NNTPResponse, article: Optional["sabnzbd.nzbstuff.Article"]) -> None:
"""A response to a NNTP request is received"""
self.concurrent_requests.release()
sabnzbd.Downloader.modify_socket(self, EVENT_READ | EVENT_WRITE)
server = self.server
article_done = response.status_code in (220, 222) and article
# Receive data into the pre-allocated buffer
if self.nntp.nw.server.ssl and not self.nntp.nw.blocking and sabctools.openssl_linked:
if article_done:
with DOWNLOADER_LOCK:
# Update statistics only when we fetched a whole article
# The side effect is that we don't count things like article-not-available messages
article.nzf.nzo.update_download_stats(sabnzbd.BPSMeter.bps, server.id, response.bytes_read)
# Response code depends on request command:
# 220 = ARTICLE, 222 = BODY
if not article_done:
if not self.connected or not article or response.status_code in (281, 381, 480, 481, 482):
self.discard(article, count_article_try=False)
if not sabnzbd.Downloader.finish_connect_nw(self, response):
return
if self.connected:
logging.info("Connecting %s@%s finished", self.thrdnum, server.host)
elif response.status_code == 223:
article_done = True
logging.debug("Article <%s> is present on %s", article.article, server.host)
elif response.status_code in (411, 423, 430, 451):
article_done = True
logging.debug(
"Thread %s@%s: Article %s missing (error=%s)",
self.thrdnum,
server.host,
article.article,
response.status_code,
)
elif response.status_code == 500:
if article.nzf.nzo.precheck:
# Did we try "STAT" already?
if not server.have_stat:
# Hopeless server, just discard
logging.info("Server %s does not support STAT or HEAD, precheck not possible", server.host)
article_done = True
else:
# Assume "STAT" command is not supported
server.have_stat = False
logging.debug("Server %s does not support STAT, trying HEAD", server.host)
else:
# Assume "BODY" command is not supported
server.have_body = False
logging.debug("Server %s does not support BODY", server.host)
self.discard(article, count_article_try=False)
else:
# Don't warn for (internal) server errors during downloading
if response.status_code not in (400, 502, 503):
logging.warning(
T("%s@%s: Received unknown status code %s for article %s"),
self.thrdnum,
server.host,
response.status_code,
article.article,
)
# Ditch this thread, we don't know what data we got now so the buffer can be bad
sabnzbd.Downloader.reset_nw(
self, f"Server error or unknown status code: {response.status_code}", wait=False, article=article
)
return
if article_done:
# Successful data, clear "bad" counter
server.bad_cons = 0
server.errormsg = server.warning = ""
# Decode
sabnzbd.Downloader.decode(article, response)
if sabnzbd.LOG_ALL:
logging.debug("Thread %s@%s: %s done", self.thrdnum, server.host, article.article)
def read(
self,
nbytes: int = 0,
on_response: Optional[Callable[[int, str], None]] = None,
) -> Tuple[int, Optional[int]]:
"""Receive data, return #bytes, #pendingbytes
:param nbytes: maximum number of bytes to read
:param on_response: callback for each complete response received
:return: #bytes, #pendingbytes
"""
# Receive data into the decoder pre-allocated buffer
if not nbytes and self.nntp.nw.server.ssl and not self.nntp.nw.blocking and sabctools.openssl_linked:
# Use patched version when downloading
bytes_recv = sabctools.unlocked_ssl_recv_into(self.nntp.sock, self.data_view[self.data_position :])
bytes_recv = sabctools.unlocked_ssl_recv_into(self.nntp.sock, self.decoder)
else:
bytes_recv = self.nntp.sock.recv_into(self.data_view[self.data_position :])
bytes_recv = self.nntp.sock.recv_into(self.decoder, nbytes=nbytes)
# No data received
if bytes_recv == 0:
raise ConnectionError("Server closed connection")
# Success, move timeout and internal data position
# Success, move timeout
self.timeout = time.time() + self.server.timeout
self.data_position += bytes_recv
self.decoder.process(bytes_recv)
for response in self.decoder:
with self.lock:
article = self._response_queue.popleft()
if on_response:
on_response(response.status_code, response.message)
self.on_response(response, article)
# The SSL-layer might still contain data even though the socket does not. Another Downloader-loop would
# not identify this socket anymore as it is not returned by select(). So, we have to forcefully trigger
# another recv_chunk so the buffer is increased and the data from the SSL-layer is read. See #2752.
if self.nntp.nw.server.ssl and self.data_position == len(self.data) and self.nntp.sock.pending() > 0:
# We do not perform error-handling, as we know there is data available to read
additional_bytes_recv, additional_end_of_line, additional_end_of_article = self.recv_chunk()
return bytes_recv + additional_bytes_recv, additional_end_of_line, additional_end_of_article
if self.server.ssl and self.nntp and (pending := self.nntp.sock.pending()):
return bytes_recv, pending
return bytes_recv, None
# Check for end of line
# Using the data directly seems faster than the memoryview
if self.data[self.data_position - 2 : self.data_position] == b"\r\n":
# Official end-of-article is "\r\n.\r\n"
if self.data[self.data_position - 5 : self.data_position] == b"\r\n.\r\n":
return bytes_recv, True, True
return bytes_recv, True, False
def write(self):
"""Send data to server"""
server = self.server
# Still in middle of data, so continue!
return bytes_recv, False, False
try:
# First, try to flush any remaining data
if self.send_buffer:
sent = self.nntp.sock.send(self.send_buffer)
self.send_buffer = self.send_buffer[sent:]
if self.send_buffer:
# Still unsent data, wait for next EVENT_WRITE
return
def soft_reset(self):
"""Reset for the next article"""
self.timeout = None
self.article = None
self.reset_data_buffer()
if self.connected:
if (
server.active
and not server.restart
and not (
sabnzbd.Downloader.paused
or sabnzbd.Downloader.shutdown
or sabnzbd.Downloader.paused_for_postproc
)
):
# Prepare the next request
if not self.next_request and (article := server.get_article()):
self.next_request = self.body(article)
elif self.next_request and self.next_request[1]:
# Discard the next request
self.discard(self.next_request[1], count_article_try=False, retry_article=True)
self.next_request = None
def reset_data_buffer(self):
"""Reset the data position"""
self.data_position = 0
# If no pending buffer, try to send new command
if not self.send_buffer and self.next_request:
if self.concurrent_requests.acquire(blocking=False):
command, article = self.next_request
self.next_request = None
if article:
nzo = article.nzf.nzo
if nzo.removed_from_queue or nzo.status is Status.PAUSED and nzo.priority is not FORCE_PRIORITY:
self.discard(article, count_article_try=False, retry_article=True)
self.concurrent_requests.release()
return
self._response_queue.append(article)
if sabnzbd.LOG_ALL:
logging.debug("Thread %s@%s: %s", self.thrdnum, server.host, command)
try:
sent = self.nntp.sock.send(command)
if sent < len(command):
# Partial send, store remainder
self.send_buffer = command[sent:]
except (BlockingIOError, ssl.SSLWantWriteError):
# Can't send now, store full command
self.send_buffer = command
else:
# Concurrency limit reached
sabnzbd.Downloader.modify_socket(self, EVENT_READ)
else:
# Is it safe to shut down this socket?
if (
not self.send_buffer
and not self.next_request
and not self._response_queue
and (not server.active or server.restart or time.time() > self.timeout)
):
# Make socket available again
server.busy_threads.discard(self)
server.idle_threads.add(self)
sabnzbd.Downloader.remove_socket(self)
def increase_data_buffer(self):
"""Resize the buffer in the extremely unlikely case that it overflows"""
# Sanity check before we go any further
if len(self.data) > NTTP_MAX_BUFFER_SIZE:
raise BufferError("Maximum data buffer size exceeded")
# Input needs to be integer, floats don't work
new_buffer = sabctools.bytearray_malloc(len(self.data) + NNTP_BUFFER_SIZE // 2)
new_buffer[: len(self.data)] = self.data
logging.info("Increased buffer from %d to %d for %s", len(self.data), len(new_buffer), str(self))
self.data = new_buffer
self.data_view = memoryview(self.data)
except (BlockingIOError, ssl.SSLWantWriteError):
# Socket not currently writable — just try again later
return
except socket.error as err:
logging.info("Looks like server closed connection: %s", err)
sabnzbd.Downloader.reset_nw(self, "Server broke off connection", warn=True)
except Exception:
logging.error(T("Suspect error in downloader"))
logging.info("Traceback: ", exc_info=True)
sabnzbd.Downloader.reset_nw(self, "Server broke off connection", warn=True)
def hard_reset(self, wait: bool = True):
"""Destroy and restart"""
with self.lock:
# Drain unsent requests
if self.next_request:
_, article = self.next_request
if article:
self.discard(article, count_article_try=False, retry_article=True)
self.next_request = None
# Drain responses
while self._response_queue:
if article := self._response_queue.popleft():
self.discard(article, count_article_try=False, retry_article=True)
if self.nntp:
self.nntp.close(send_quit=self.connected)
self.nntp = None
@@ -258,6 +426,28 @@ class NewsWrapper:
# Reset for internal reasons, just wait 5 sec
self.timeout = time.time() + 5
def discard(
    self,
    article: Optional["sabnzbd.nzbstuff.Article"],
    count_article_try: bool = True,
    retry_article: bool = True,
) -> None:
    """Give up on an article request made on this connection.

    Nothing happens when there is no article or when its job was already
    removed from the queue.

    :param article: the article whose request is abandoned, may be None
    :param count_article_try: increase the article's try counter first
    :param retry_article: when False, the article is never offered for retry
    """
    if not article or article.nzf.nzo.removed_from_queue:
        return

    # Only some errors should count towards the total tries for each server
    if count_article_try:
        article.tries += 1

    # Retry as long as retrying is permitted and either the server is required
    # or the configured maximum number of tries is not yet exceeded
    if retry_article and (self.server.required or article.tries <= sabnzbd.cfg.max_art_tries()):
        # Allow all servers again for this article
        # Do not use the article_queue, as the server could already have been disabled when we get here!
        article.allow_new_fetcher()
        return

    # Too many tries on this server (or no retry wanted), consider article missing
    sabnzbd.Downloader.decode(article)
    article.tries = 0
def __repr__(self):
return "<NewsWrapper: server=%s:%s, thread=%s, connected=%s>" % (
self.server.host,
@@ -379,7 +569,7 @@ class NNTP:
# Locked, so it can't interleave with any of the Downloader "__nw" actions
with DOWNLOADER_LOCK:
if not self.closed:
sabnzbd.Downloader.add_socket(self.fileno, self.nw)
sabnzbd.Downloader.add_socket(self.nw)
except OSError as e:
self.error(e)

View File

@@ -692,7 +692,7 @@ class NzbQueue:
return False
return False
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> None:
"""Get next article for jobs in the queue
Not locked for performance, since it only reads the queue
"""
@@ -705,12 +705,12 @@ class NzbQueue:
and not nzo.propagation_delay_left
) or nzo.priority == FORCE_PRIORITY:
if not nzo.server_in_try_list(server):
if articles := nzo.get_articles(server, servers, fetch_limit):
return articles
nzo.get_articles(server, servers, fetch_limit)
if server.article_queue:
break
# Stop after first job that wasn't paused/propagating/etc
if self.__top_only:
return []
return []
break
def register_article(self, article: Article, success: bool = True):
"""Register the articles we tried
@@ -893,11 +893,14 @@ class NzbQueue:
if nzf.all_servers_in_try_list(active_servers):
# Check for articles where all active servers have already been tried
for article in nzf.articles[:]:
if article.all_servers_in_try_list(active_servers):
logging.debug("Removing article %s with bad trylist in file %s", article, nzf.filename)
nzo.increase_bad_articles_counter("missing_articles")
sabnzbd.NzbQueue.register_article(article, success=False)
with nzf:
for article in nzf.articles:
if article.all_servers_in_try_list(active_servers):
logging.debug(
"Removing article %s with bad trylist in file %s", article, nzf.filename
)
nzo.increase_bad_articles_counter("missing_articles")
sabnzbd.NzbQueue.register_article(article, success=False)
logging.info("Resetting bad trylist for file %s in job %s", nzf.filename, nzo.final_name)
nzf.reset_try_list()

View File

@@ -26,7 +26,7 @@ import datetime
import threading
import functools
import difflib
from typing import Any, Optional, Union, BinaryIO
from typing import Any, Optional, Union, BinaryIO, Deque
# SABnzbd modules
import sabnzbd
@@ -328,11 +328,12 @@ class NzbFile(TryList):
"""Representation of one file consisting of multiple articles"""
# Pre-define attributes to save memory
__slots__ = NzbFileSaver
__slots__ = NzbFileSaver + ("lock",)
def __init__(self, date, subject, raw_article_db, file_bytes, nzo):
"""Setup object"""
super().__init__()
self.lock = threading.RLock()
self.date: datetime.datetime = date
self.type: Optional[str] = None
@@ -347,7 +348,7 @@ class NzbFile(TryList):
self.setname: Optional[str] = None
# Articles are removed from "articles" after being fetched
self.articles: list[Article] = []
self.articles: dict[Article, Article] = {}
self.decodetable: list[Article] = []
self.bytes: int = file_bytes
@@ -402,17 +403,18 @@ class NzbFile(TryList):
def add_article(self, article_info):
"""Add article to object database and return article object"""
article = Article(article_info[0], article_info[1], self)
self.articles.append(article)
self.decodetable.append(article)
with self.lock:
self.articles[article] = article
self.decodetable.append(article)
return article
def remove_article(self, article: Article, success: bool) -> int:
"""Handle completed article, possibly end of file"""
if article in self.articles:
self.articles.remove(article)
if success:
self.bytes_left -= article.bytes
return len(self.articles)
with self.lock:
if self.articles.pop(article, None) is not None:
if success:
self.bytes_left -= article.bytes
return len(self.articles)
def set_par2(self, setname, vol, blocks):
"""Designate this file as a par2 file"""
@@ -427,24 +429,25 @@ class NzbFile(TryList):
else:
self.crc32 = sabctools.crc32_combine(self.crc32, crc32, length)
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
"""Get next articles to be downloaded"""
articles = []
for article in self.articles:
if article := article.get_article(server, servers):
articles.append(article)
if len(articles) >= fetch_limit:
return articles
articles = server.article_queue
with self.lock:
for article in self.articles:
if article := article.get_article(server, servers):
articles.append(article)
if len(articles) >= fetch_limit:
return
self.add_to_try_list(server)
return articles
@synchronized(TRYLIST_LOCK)
def reset_all_try_lists(self):
"""Reset all try lists. Locked so reset is performed
for all items at the same time without chance of another
thread changing any of the items while we are resetting"""
for art in self.articles:
art.reset_try_list()
with self.lock:
for art in self.articles:
art.reset_try_list()
self.reset_try_list()
def first_article_processed(self) -> bool:
@@ -474,7 +477,10 @@ class NzbFile(TryList):
@property
def completed(self):
"""Is this file completed?"""
return self.import_finished and not bool(self.articles)
if not self.import_finished:
return False
with self.lock:
return not self.articles
def remove_admin(self):
"""Remove article database from disk (sabnzbd_nzf_<id>)"""
@@ -484,6 +490,12 @@ class NzbFile(TryList):
except Exception:
pass
def __enter__(self):
    """Acquire the file's lock on entering a ``with`` block.

    :return: this object, so that ``with nzf as f:`` binds the file itself
        instead of None
    """
    self.lock.acquire()
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Release the file's lock on leaving a ``with`` block.

    The exception arguments are ignored and nothing is returned, so an
    exception raised inside the ``with`` body keeps propagating.
    """
    held_lock = self.lock
    held_lock.release()
def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
@@ -501,6 +513,10 @@ class NzbFile(TryList):
# Handle new attributes
setattr(self, item, None)
super().__setstate__(dict_.get("try_list", []))
self.lock = threading.RLock()
if isinstance(self.articles, list):
# Converted from list to dict
self.articles = {x: x for x in self.articles}
def __eq__(self, other: "NzbFile"):
"""Assume it's the same file if the number bytes and first article
@@ -1640,8 +1656,9 @@ class NzbObject(TryList):
self.nzo_info[bad_article_type] += 1
self.bad_articles += 1
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
articles = []
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
"""Assign articles server up to the fetch_limit"""
articles: Deque[Article] = server.article_queue
nzf_remove_list = []
# Did we go through all first-articles?
@@ -1676,7 +1693,8 @@ class NzbObject(TryList):
else:
break
if articles := nzf.get_articles(server, servers, fetch_limit):
nzf.get_articles(server, servers, fetch_limit)
if articles:
break
# Remove all files for which admin could not be read
@@ -1691,7 +1709,6 @@ class NzbObject(TryList):
if not articles:
# No articles for this server, block for next time
self.add_to_try_list(server)
return articles
@synchronized(NZO_LOCK)
def move_top_bulk(self, nzf_ids: list[str]):

View File

@@ -45,32 +45,38 @@ ARTICLE_INFO = re.compile(
YENC_ESCAPE = [0x00, 0x0A, 0x0D, ord("="), ord(".")]
class NewsServerProtocol(asyncio.Protocol):
def __init__(self):
self.transport = None
self.connected = False
self.in_article = False
super().__init__()
class NewsServerSession:
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Remember the stream pair of one client connection"""
    self.reader, self.writer = reader, writer
def connection_made(self, transport):
logging.info("Connection from %s", transport.get_extra_info("peername"))
self.transport = transport
self.connected = True
self.transport.write(b"200 Welcome (SABNews)\r\n")
async def run(self):
self.writer.write(b"200 Welcome (SABNews)\r\n")
await self.writer.drain()
def data_received(self, message):
logging.debug("Data received: %s", message.strip())
try:
while not self.reader.at_eof():
message = await self.reader.readuntil(b"\r\n")
logging.debug("Data received: %s", message.strip())
await self.handle_command(message)
except (ConnectionResetError, asyncio.IncompleteReadError):
logging.debug("Client closed connection")
# Handle basic commands
async def handle_command(self, message: bytes):
"""Handle basic NNTP commands, \r\n is already stripped."""
if message.startswith(b"QUIT"):
self.close_connection()
elif message.startswith((b"ARTICLE", b"BODY")):
await self.close_connection()
return
if message.startswith((b"ARTICLE", b"BODY")):
parsed_message = ARTICLE_INFO.search(message)
self.serve_article(parsed_message)
await self.serve_article(parsed_message)
return
# self.transport.write(data)
self.writer.write(b"500 Unknown command\r\n")
await self.writer.drain()
def serve_article(self, parsed_message):
async def serve_article(self, parsed_message):
# Check if we parsed everything
try:
message_id = parsed_message.group("message_id")
@@ -81,34 +87,37 @@ class NewsServerProtocol(asyncio.Protocol):
size = int(parsed_message.group("size"))
except (AttributeError, ValueError):
logging.warning("Can't parse article information")
self.transport.write(b"430 No Such Article Found (bad message-id)\r\n")
self.writer.write(b"430 No Such Article Found (bad message-id)\r\n")
await self.writer.drain()
return
# Check if file exists
if not os.path.exists(file):
logging.warning("File not found: %s", file)
self.transport.write(b"430 No Such Article Found (no file on disk)\r\n")
self.writer.write(b"430 No Such Article Found (no file on disk)\r\n")
await self.writer.drain()
return
# Check if sizes are valid
file_size = os.path.getsize(file)
if start + size > file_size:
logging.warning("Invalid start/size attributes")
self.transport.write(b"430 No Such Article Found (invalid start/size attributes)\r\n")
self.writer.write(b"430 No Such Article Found (invalid start/size attributes)\r\n")
await self.writer.drain()
return
logging.debug("Serving %s" % message_id)
# File is found, send headers
self.transport.write(b"222 0 %s\r\n" % message_id)
self.transport.write(b"Message-ID: %s\r\n" % message_id)
self.transport.write(b'Subject: "%s"\r\n\r\n' % file_base.encode("utf-8"))
self.writer.write(b"222 0 %s\r\n" % message_id)
self.writer.write(b"Message-ID: %s\r\n" % message_id)
self.writer.write(b'Subject: "%s"\r\n\r\n' % file_base.encode("utf-8"))
# Write yEnc headers
self.transport.write(
self.writer.write(
b"=ybegin part=%d line=128 size=%d name=%s\r\n" % (part, file_size, file_base.encode("utf-8"))
)
self.transport.write(b"=ypart begin=%d end=%d\r\n" % (start + 1, start + size))
self.writer.write(b"=ypart begin=%d end=%d\r\n" % (start + 1, start + size))
with open(file, "rb") as inp_file:
inp_file.seek(start)
@@ -116,24 +125,31 @@ class NewsServerProtocol(asyncio.Protocol):
# Encode data
output_string, crc = sabctools.yenc_encode(inp_buffer)
self.transport.write(output_string)
self.writer.write(output_string)
# Write footer
self.transport.write(b"\r\n=yend size=%d part=%d pcrc32=%08x\r\n" % (size, part, crc))
self.transport.write(b".\r\n")
self.writer.write(b"\r\n=yend size=%d part=%d pcrc32=%08x\r\n" % (size, part, crc))
self.writer.write(b".\r\n")
await self.writer.drain()
def close_connection(self):
async def close_connection(self):
logging.debug("Closing connection")
self.transport.write(b"205 Connection closing\r\n")
self.transport.close()
self.writer.write(b"205 Connection closing\r\n")
await self.writer.drain()
self.writer.close()
await self.writer.wait_closed()
async def connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Callback for asyncio.start_server: run one session on the given streams"""
    await NewsServerSession(reader, writer).run()
async def serve_sabnews(hostname, port):
# Start server
logging.info("Starting SABNews on %s:%d", hostname, port)
loop = asyncio.get_running_loop()
server = await loop.create_server(lambda: NewsServerProtocol(), hostname, port)
server = await asyncio.start_server(connection_handler, hostname, port)
async with server:
await server.serve_forever()

View File

@@ -21,10 +21,12 @@ tests.test_decoder- Testing functions in decoder.py
import binascii
import os
import pytest
from io import BytesIO
from random import randint
from unittest import mock
import sabctools
import sabnzbd.decoder as decoder
from sabnzbd.nzbstuff import Article
@@ -111,7 +113,7 @@ class TestUuDecoder:
result.append(END_DATA)
# Signal the end of the message with a dot on a line of its own
data.append(b".")
data.append(b".\r\n")
# Join the data with \r\n line endings, just like we get from socket reads
data = b"\r\n".join(data)
@@ -120,22 +122,26 @@ class TestUuDecoder:
return article, bytearray(data), result
def test_no_data(self):
with pytest.raises(decoder.BadUu):
assert decoder.decode_uu(None, None)
@staticmethod
def _response(raw_data: bytes) -> sabctools.NNTPResponse:
    """Push raw_data through a new sabctools.Decoder and return the first response it yields"""
    size = len(raw_data)
    dec = sabctools.Decoder(size)
    # Copy the bytes into the decoder's buffer, then have it process them
    BytesIO(raw_data).readinto(dec)
    dec.process(size)
    return next(dec)
@pytest.mark.parametrize(
"raw_data",
[
b"",
b"\r\n\r\n",
b"foobar\r\n", # Plenty of list items, but (too) few actual lines
b"222 0 <artid@woteva>\r\nX-Too-Short: yup\r\n",
b"222 0 <foo@bar>\r\n.\r\n",
b"222 0 <foo@bar>\r\n\r\n.\r\n",
b"222 0 <foo@bar>\r\nfoobar\r\n.\r\n", # Plenty of list items, but (too) few actual lines
b"222 0 <foo@bar>\r\nX-Too-Short: yup\r\n.\r\n",
],
)
def test_short_data(self, raw_data):
with pytest.raises(decoder.BadUu):
assert decoder.decode_uu(None, bytearray(raw_data))
assert decoder.decode_uu(Article("foo@bar", 4321, None), self._response(raw_data))
@pytest.mark.parametrize(
"raw_data",
@@ -158,7 +164,8 @@ class TestUuDecoder:
with pytest.raises(decoder.BadUu):
raw_data = bytearray(raw_data)
raw_data.extend(filler)
assert decoder.decode_uu(article, raw_data)
raw_data.extend(b".\r\n")
assert decoder.decode_uu(article, self._response(raw_data))
@pytest.mark.parametrize("insert_empty_line", [True, False])
@pytest.mark.parametrize("insert_excess_empty_lines", [True, False])
@@ -194,7 +201,7 @@ class TestUuDecoder:
insert_dot_stuffing_line,
begin_line,
)
assert decoder.decode_uu(article, raw_data) == expected_result
assert decoder.decode_uu(article, self._response(raw_data)) == expected_result
assert article.nzf.filename_checked
@pytest.mark.parametrize("insert_empty_line", [True, False])
@@ -205,7 +212,7 @@ class TestUuDecoder:
decoded_data = expected_data = b""
for part in ("begin", "middle", "middle", "end"):
article, data, result = self._generate_msg_part(part, insert_empty_line, False, False, True)
decoded_data += decoder.decode_uu(article, data)
decoded_data += decoder.decode_uu(article, self._response(data))
expected_data += result
# Verify results
@@ -223,4 +230,6 @@ class TestUuDecoder:
article.lowest_partnum = False
filler = b"\r\n".join(VALID_UU_LINES[:4]) + b"\r\n"
with pytest.raises(decoder.BadData):
assert decoder.decode_uu(article, bytearray(b"222 0 <foo@bar>\r\n" + filler + bad_data + b"\r\n"))
assert decoder.decode_uu(
article, self._response(bytearray(b"222 0 <foo@bar>\r\n" + filler + bad_data + b"\r\n.\r\n"))
)