mirror of
https://github.com/sabnzbd/sabnzbd.git
synced 2026-06-07 07:16:34 -04:00
Pipelining and performance optimisations (#3199)
* Pipelining and performance optimisations
* Refactor to remove handle_remainder and add on_response callback to allow inspecting of nntp messages
* Logic fix if there are sockets but nothing to read/write
* Fix logic errors for failed article requests
* Fix logic for reconfiguring servers
* Add guard_restart callback to pipelining_requests
* Fix article download stats
* Fix current article request shown via api
* Removal of DecodingStatus
* Fix circular reference
* Cleanup imports
* Handle reset_nw and hard_reset for inflight requests
* Improve __request_article behaviour using discard helper
* Article should be None here (before auth) but just in case
* Remove command_queue_condition unnecessary with the pull rather than push queue system
* During reset discard any data received prior to sending quit request
* Circular references again
* Revert to using bytearray
* Revert "During reset discard any data received prior to sending quit request"
This reverts commit ed522e3e80.
* Simpler interaction with sabctools
* Temporarily use the sabctools streaming decoder branch
* Fix most uu tests
* Reduce maximum pipelining requests
* Fix the squiggly line
* Remove some LOG_ALL debug code
* Make get_articles return consistent (None) - it now populates the server deque
* Reduce NNTP_BUFFER_SIZE
* Rename PIPELINING_REQUESTS to DEF_PIPELINING_REQUESTS
* A little refactoring
* Reduce default pipelining until it is dynamic
* Use BoundedSemaphore and fix the unacquired release
* Use crc from sabctools for uu and make filename logic consistent wit yenc
* Use sabctools 9.0.0
* Fix Check Before Download
* Move lock to NzbFile
* Use sabctools 9.1.0
* Minor change
* Fix 430 on check before download
* Update sabnews to work reliably with pipelining
* Minor tidy up
* Why does only Linux complain about this
* Leave this as it was
* Remove unused import
* Compare enum by identity
* Remove command_queue and just prepare a single request
Check if it should be sent and discard when paused
* Kick-start idle connections
* Modify events sockets are monitored for
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
# Main requirements
|
||||
# Note that not all sub-dependencies are listed, but only ones we know could cause trouble
|
||||
apprise==1.9.5
|
||||
sabctools==8.2.6
|
||||
sabctools==9.1.0
|
||||
CT3==3.4.0.post5
|
||||
cffi==2.0.0
|
||||
pycparser==2.23
|
||||
|
||||
@@ -269,6 +269,7 @@ def initialize(pause_downloader=False, clean_up=False, repair=0):
|
||||
cfg.language.callback(cfg.guard_language)
|
||||
cfg.enable_https_verification.callback(cfg.guard_https_ver)
|
||||
cfg.guard_https_ver()
|
||||
cfg.pipelining_requests.callback(cfg.guard_restart)
|
||||
|
||||
# Set language files
|
||||
lang.set_locale_info("SABnzbd", DIR_LANGUAGE)
|
||||
|
||||
@@ -30,6 +30,8 @@ import cherrypy
|
||||
from threading import Thread
|
||||
from typing import Optional, Any, Union
|
||||
|
||||
import sabctools
|
||||
|
||||
# For json.dumps, orjson is magnitudes faster than ujson, but it is harder to
|
||||
# compile due to Rust dependency. Since the output is the same, we support all modules.
|
||||
try:
|
||||
@@ -1387,12 +1389,20 @@ def test_nntp_server_dict(kwargs: dict[str, Union[str, list[str]]]) -> tuple[boo
|
||||
# Sorry, no clever analysis:
|
||||
return False, T('Server address "%s:%s" is not valid.') % (host, port)
|
||||
|
||||
nw = NewsWrapper(server=test_server, thrdnum=-1, block=True)
|
||||
nntp_code: int = 0
|
||||
nntp_message: str = ""
|
||||
|
||||
def on_response(code: int, message: str):
|
||||
nonlocal nntp_code, nntp_message
|
||||
nntp_code = code
|
||||
nntp_message = message
|
||||
|
||||
try:
|
||||
nw = NewsWrapper(server=test_server, thrdnum=-1, block=True)
|
||||
nw.init_connect()
|
||||
while not nw.connected:
|
||||
nw.recv_chunk()
|
||||
nw.finish_connect(nw.status_code)
|
||||
nw.write()
|
||||
nw.read(on_response=on_response)
|
||||
|
||||
except socket.timeout:
|
||||
if port != 119 and not ssl:
|
||||
@@ -1414,30 +1424,30 @@ def test_nntp_server_dict(kwargs: dict[str, Union[str, list[str]]]) -> tuple[boo
|
||||
return False, str(err)
|
||||
|
||||
if not username or not password:
|
||||
nw.nntp.sock.sendall(b"ARTICLE <test@home>\r\n")
|
||||
nw.queue_command(b"ARTICLE <test@home>\r\n")
|
||||
try:
|
||||
nw.reset_data_buffer()
|
||||
nw.recv_chunk()
|
||||
nw.write()
|
||||
nw.read(on_response=on_response)
|
||||
except Exception as err:
|
||||
# Some internal error, not always safe to close connection
|
||||
return False, str(err)
|
||||
|
||||
# Parse result
|
||||
return_status = ()
|
||||
if nw.status_code:
|
||||
if nw.status_code == 480:
|
||||
if nntp_code:
|
||||
if nntp_code == 480:
|
||||
return_status = (False, T("Server requires username and password."))
|
||||
elif nw.status_code < 300 or nw.status_code in (411, 423, 430):
|
||||
elif nntp_code < 300 or nntp_code in (411, 423, 430):
|
||||
# If no username/password set and we requested fake-article, it will return 430 Not Found
|
||||
return_status = (True, T("Connection Successful!"))
|
||||
elif nw.status_code == 502 or sabnzbd.downloader.clues_login(nw.nntp_msg):
|
||||
elif nntp_code == 502 or sabnzbd.downloader.clues_login(nntp_message):
|
||||
return_status = (False, T("Authentication failed, check username/password."))
|
||||
elif sabnzbd.downloader.clues_too_many(nw.nntp_msg):
|
||||
elif sabnzbd.downloader.clues_too_many(nntp_message):
|
||||
return_status = (False, T("Too many connections, please pause downloading or try again later"))
|
||||
|
||||
# Fallback in case no data was received or unknown status
|
||||
if not return_status:
|
||||
return_status = (False, T("Could not determine connection result (%s)") % nw.nntp_msg)
|
||||
return_status = (False, T("Could not determine connection result (%s)") % nntp_message)
|
||||
|
||||
# Close the connection and return result
|
||||
nw.hard_reset()
|
||||
@@ -1502,13 +1512,13 @@ def build_status(calculate_performance: bool = False, skip_dashboard: bool = Fal
|
||||
for nw in server.busy_threads.copy():
|
||||
if nw.connected:
|
||||
activeconn += 1
|
||||
if nw.article:
|
||||
if article := nw.article:
|
||||
serverconnections.append(
|
||||
{
|
||||
"thrdnum": nw.thrdnum,
|
||||
"art_name": nw.article.article,
|
||||
"nzf_name": nw.article.nzf.filename,
|
||||
"nzo_name": nw.article.nzf.nzo.final_name,
|
||||
"art_name": article.article,
|
||||
"nzf_name": article.nzf.filename,
|
||||
"nzo_name": article.nzf.nzo.final_name,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ from sabnzbd.constants import (
|
||||
DEF_HTTPS_CERT_FILE,
|
||||
DEF_HTTPS_KEY_FILE,
|
||||
DEF_MAX_ASSEMBLER_QUEUE,
|
||||
DEF_PIPELINING_REQUESTS,
|
||||
)
|
||||
from sabnzbd.filesystem import same_directory, real_path, is_valid_script, is_network_path
|
||||
|
||||
@@ -534,6 +535,7 @@ ssdp_broadcast_interval = OptionNumber("misc", "ssdp_broadcast_interval", 15, mi
|
||||
ext_rename_ignore = OptionList("misc", "ext_rename_ignore", validation=lower_case_ext)
|
||||
unrar_parameters = OptionStr("misc", "unrar_parameters", validation=supported_unrar_parameters)
|
||||
outgoing_nntp_ip = OptionStr("misc", "outgoing_nntp_ip")
|
||||
pipelining_requests = OptionNumber("misc", "pipelining_requests", DEF_PIPELINING_REQUESTS, minval=1, maxval=10)
|
||||
|
||||
|
||||
##############################################################################
|
||||
|
||||
@@ -50,7 +50,7 @@ RENAMES_FILE = "__renames__"
|
||||
ATTRIB_FILE = "SABnzbd_attrib"
|
||||
REPAIR_REQUEST = "repair-all.sab"
|
||||
|
||||
SABCTOOLS_VERSION_REQUIRED = "8.2.6"
|
||||
SABCTOOLS_VERSION_REQUIRED = "9.1.0"
|
||||
|
||||
DB_HISTORY_VERSION = 1
|
||||
DB_HISTORY_NAME = "history%s.db" % DB_HISTORY_VERSION
|
||||
@@ -101,8 +101,9 @@ DEF_MAX_ASSEMBLER_QUEUE = 12
|
||||
SOFT_ASSEMBLER_QUEUE_LIMIT = 0.5
|
||||
# Percentage of cache to use before adding file to assembler
|
||||
ASSEMBLER_WRITE_THRESHOLD = 5
|
||||
NNTP_BUFFER_SIZE = int(800 * KIBI)
|
||||
NNTP_BUFFER_SIZE = int(256 * KIBI)
|
||||
NTTP_MAX_BUFFER_SIZE = int(10 * MEBI)
|
||||
DEF_PIPELINING_REQUESTS = 2
|
||||
|
||||
REPAIR_PRIORITY = 3
|
||||
FORCE_PRIORITY = 2
|
||||
|
||||
@@ -21,13 +21,10 @@ sabnzbd.decoder - article decoder
|
||||
|
||||
import logging
|
||||
import hashlib
|
||||
import binascii
|
||||
from io import BytesIO
|
||||
from zlib import crc32
|
||||
from typing import Optional
|
||||
|
||||
import sabnzbd
|
||||
from sabnzbd.constants import SABCTOOLS_VERSION_REQUIRED
|
||||
from sabnzbd.encoding import ubtou
|
||||
from sabnzbd.nzbstuff import Article
|
||||
from sabnzbd.misc import match_str
|
||||
|
||||
@@ -50,7 +47,7 @@ except Exception:
|
||||
|
||||
|
||||
class BadData(Exception):
|
||||
def __init__(self, data: bytes):
|
||||
def __init__(self, data: bytearray):
|
||||
super().__init__()
|
||||
self.data = data
|
||||
|
||||
@@ -63,8 +60,8 @@ class BadUu(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def decode(article: Article, data_view: memoryview):
|
||||
decoded_data = None
|
||||
def decode(article: Article, decoder: sabctools.NNTPResponse):
|
||||
decoded_data: Optional[bytearray] = None
|
||||
nzo = article.nzf.nzo
|
||||
art_id = article.article
|
||||
|
||||
@@ -78,10 +75,10 @@ def decode(article: Article, data_view: memoryview):
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Decoding %s", art_id)
|
||||
|
||||
if article.nzf.type == "uu":
|
||||
decoded_data = decode_uu(article, bytes(data_view))
|
||||
if decoder.format is sabctools.EncodingFormat.UU:
|
||||
decoded_data = decode_uu(article, decoder)
|
||||
else:
|
||||
decoded_data = decode_yenc(article, data_view)
|
||||
decoded_data = decode_yenc(article, decoder)
|
||||
|
||||
article_success = True
|
||||
|
||||
@@ -112,28 +109,18 @@ def decode(article: Article, data_view: memoryview):
|
||||
|
||||
except (BadYenc, ValueError):
|
||||
# Handles precheck and badly formed articles
|
||||
if nzo.precheck and data_view and data_view[:4] == b"223 ":
|
||||
if nzo.precheck and decoder.status_code == 223:
|
||||
# STAT was used, so we only get a status code
|
||||
article_success = True
|
||||
else:
|
||||
# Try uu-decoding
|
||||
if not nzo.precheck and article.nzf.type != "yenc":
|
||||
try:
|
||||
decoded_data = decode_uu(article, bytes(data_view))
|
||||
logging.debug("Found uu-encoded article %s in job %s", art_id, nzo.final_name)
|
||||
article_success = True
|
||||
except Exception:
|
||||
pass
|
||||
# Only bother with further checks if uu-decoding didn't work out
|
||||
if not article_success:
|
||||
# Convert the first 2000 bytes of raw socket data to article lines,
|
||||
# and examine the headers (for precheck) or body (for download).
|
||||
for line in bytes(data_view[:2000]).split(b"\r\n"):
|
||||
# Examine the headers (for precheck) or body (for download).
|
||||
if lines := decoder.lines:
|
||||
for line in lines:
|
||||
lline = line.lower()
|
||||
if lline.startswith(b"message-id:"):
|
||||
if lline.startswith("message-id:"):
|
||||
article_success = True
|
||||
# Look for DMCA clues (while skipping "X-" headers)
|
||||
if not lline.startswith(b"x-") and match_str(lline, (b"dmca", b"removed", b"cancel", b"blocked")):
|
||||
if not lline.startswith("x-") and match_str(lline, ("dmca", "removed", "cancel", "blocked")):
|
||||
article_success = False
|
||||
logging.info("Article removed from server (%s)", art_id)
|
||||
break
|
||||
@@ -170,164 +157,63 @@ def decode(article: Article, data_view: memoryview):
|
||||
sabnzbd.NzbQueue.register_article(article, article_success)
|
||||
|
||||
|
||||
def decode_yenc(article: Article, data_view: memoryview) -> bytearray:
|
||||
def decode_yenc(article: Article, response: sabctools.NNTPResponse) -> bytearray:
|
||||
# Let SABCTools do all the heavy lifting
|
||||
(
|
||||
decoded_data,
|
||||
yenc_filename,
|
||||
article.file_size,
|
||||
article.data_begin,
|
||||
article.data_size,
|
||||
crc_correct,
|
||||
) = sabctools.yenc_decode(data_view)
|
||||
decoded_data = response.data
|
||||
article.file_size = response.file_size
|
||||
article.data_begin = response.part_begin
|
||||
article.data_size = response.part_size
|
||||
|
||||
nzf = article.nzf
|
||||
# Assume it is yenc
|
||||
nzf.type = "yenc"
|
||||
|
||||
# Only set the name if it was found and not obfuscated
|
||||
if not nzf.filename_checked and yenc_filename:
|
||||
if not nzf.filename_checked and (file_name := response.file_name):
|
||||
# Set the md5-of-16k if this is the first article
|
||||
if article.lowest_partnum:
|
||||
nzf.md5of16k = hashlib.md5(decoded_data[:16384]).digest()
|
||||
nzf.md5of16k = hashlib.md5(memoryview(decoded_data)[:16384]).digest()
|
||||
|
||||
# Try the rename, even if it's not the first article
|
||||
# For example when the first article was missing
|
||||
nzf.nzo.verify_nzf_filename(nzf, yenc_filename)
|
||||
nzf.nzo.verify_nzf_filename(nzf, file_name)
|
||||
|
||||
# CRC check
|
||||
if crc_correct is None:
|
||||
if (crc := response.crc) is None:
|
||||
logging.info("CRC Error in %s", article.article)
|
||||
raise BadData(decoded_data)
|
||||
|
||||
article.crc32 = crc_correct
|
||||
article.crc32 = crc
|
||||
|
||||
return decoded_data
|
||||
|
||||
|
||||
def decode_uu(article: Article, raw_data: bytes) -> bytes:
|
||||
"""Try to uu-decode an article. The raw_data may or may not contain headers.
|
||||
If there are headers, they will be separated from the body by at least one
|
||||
empty line. In case of no headers, the first line seems to always be the nntp
|
||||
response code (220/222) directly followed by the msg body."""
|
||||
if not raw_data:
|
||||
def decode_uu(article: Article, response: sabctools.NNTPResponse) -> bytearray:
|
||||
"""Process a uu-decoded response"""
|
||||
if not response.bytes_decoded:
|
||||
logging.debug("No data to decode")
|
||||
raise BadUu
|
||||
|
||||
# Line up the raw_data
|
||||
raw_data = raw_data.split(b"\r\n")
|
||||
if response.baddata:
|
||||
raise BadData(response.data)
|
||||
|
||||
# Index of the uu payload start in raw_data
|
||||
uu_start = 0
|
||||
|
||||
# Limit the number of lines to check for the onset of uu data
|
||||
limit = min(len(raw_data), 32) - 1
|
||||
if limit < 3:
|
||||
logging.debug("Article too short to contain valid uu-encoded data")
|
||||
raise BadUu
|
||||
|
||||
# Try to find an empty line separating the body from headers or response
|
||||
# code and set the expected payload start to the next line.
|
||||
try:
|
||||
uu_start = raw_data[:limit].index(b"") + 1
|
||||
except ValueError:
|
||||
# No empty line, look for a response code instead
|
||||
if raw_data[0].startswith(b"220 ") or raw_data[0].startswith(b"222 "):
|
||||
uu_start = 1
|
||||
else:
|
||||
# Invalid data?
|
||||
logging.debug("Failed to locate start of uu payload")
|
||||
raise BadUu
|
||||
|
||||
def is_uu_junk(line: bytes) -> bool:
|
||||
"""Determine if the line is empty or contains known junk data"""
|
||||
return (not line) or line == b"-- " or line.startswith(b"Posted via ")
|
||||
|
||||
# Check the uu 'begin' line
|
||||
if article.lowest_partnum:
|
||||
try:
|
||||
# Make sure the line after the uu_start one isn't empty as well or
|
||||
# detection of the 'begin' line won't work. For articles other than
|
||||
# lowest_partnum, filtering out empty lines (and other junk) can
|
||||
# wait until the actual decoding step.
|
||||
for index in range(uu_start, limit):
|
||||
if is_uu_junk(raw_data[index]):
|
||||
uu_start = index + 1
|
||||
else:
|
||||
# Bingo
|
||||
break
|
||||
else:
|
||||
# Search reached the limit
|
||||
raise IndexError
|
||||
|
||||
uu_begin_data = raw_data[uu_start].split(b" ")
|
||||
# Filename may contain spaces
|
||||
uu_filename = ubtou(b" ".join(uu_begin_data[2:]).strip())
|
||||
|
||||
# Sanity check the 'begin' line
|
||||
if (
|
||||
len(uu_begin_data) < 3
|
||||
or uu_begin_data[0].lower() != b"begin"
|
||||
or (not int(uu_begin_data[1], 8))
|
||||
or (not uu_filename)
|
||||
):
|
||||
raise ValueError
|
||||
|
||||
# Consider this enough proof to set the type, avoiding further
|
||||
# futile attempts at decoding articles in this nzf as yenc.
|
||||
article.nzf.type = "uu"
|
||||
|
||||
# Bump the pointer for the payload to the next line
|
||||
uu_start += 1
|
||||
except Exception:
|
||||
logging.debug("Missing or invalid uu 'begin' line: %s", raw_data[uu_start] if uu_start < limit else None)
|
||||
raise BadUu
|
||||
|
||||
# Do the actual decoding
|
||||
with BytesIO() as decoded_data:
|
||||
for line in raw_data[uu_start:]:
|
||||
# Ignore junk
|
||||
if is_uu_junk(line):
|
||||
continue
|
||||
|
||||
# End of the article
|
||||
if line in (b"`", b"end", b"."):
|
||||
break
|
||||
|
||||
# Remove dot stuffing
|
||||
if line.startswith(b".."):
|
||||
line = line[1:]
|
||||
|
||||
try:
|
||||
decoded_line = binascii.a2b_uu(line)
|
||||
except binascii.Error as msg:
|
||||
try:
|
||||
# Workaround for broken uuencoders by Fredrik Lundh
|
||||
nbytes = (((line[0] - 32) & 63) * 4 + 5) // 3
|
||||
decoded_line = binascii.a2b_uu(line[:nbytes])
|
||||
except Exception as msg2:
|
||||
logging.info(
|
||||
"Error while uu-decoding %s: %s (line: %s; workaround: %s)", article.article, msg, line, msg2
|
||||
)
|
||||
raise BadData(decoded_data.getvalue())
|
||||
|
||||
# Store the decoded data
|
||||
decoded_data.write(decoded_line)
|
||||
|
||||
# Set the type to uu; the latter is still needed in
|
||||
# case the lowest_partnum article was damaged or slow to download.
|
||||
article.nzf.type = "uu"
|
||||
decoded_data = response.data
|
||||
nzf = article.nzf
|
||||
nzf.type = "uu"
|
||||
|
||||
# Only set the name if it was found and not obfuscated
|
||||
if not nzf.filename_checked and (file_name := response.file_name):
|
||||
# Set the md5-of-16k if this is the first article
|
||||
if article.lowest_partnum:
|
||||
decoded_data.seek(0)
|
||||
article.nzf.md5of16k = hashlib.md5(decoded_data.read(16384)).digest()
|
||||
# Handle the filename
|
||||
if not article.nzf.filename_checked and uu_filename:
|
||||
article.nzf.nzo.verify_nzf_filename(article.nzf, uu_filename)
|
||||
nzf.md5of16k = hashlib.md5(memoryview(decoded_data)[:16384]).digest()
|
||||
|
||||
data = decoded_data.getvalue()
|
||||
article.crc32 = crc32(data)
|
||||
return data
|
||||
# Try the rename, even if it's not the first article
|
||||
# For example when the first article was missing
|
||||
nzf.nzo.verify_nzf_filename(nzf, file_name)
|
||||
|
||||
article.crc32 = response.crc
|
||||
|
||||
return decoded_data
|
||||
|
||||
|
||||
def search_new_server(article: Article) -> bool:
|
||||
|
||||
@@ -19,15 +19,18 @@
|
||||
sabnzbd.downloader - download engine
|
||||
"""
|
||||
|
||||
import select
|
||||
import logging
|
||||
import selectors
|
||||
from collections import deque
|
||||
from threading import Thread, RLock, current_thread
|
||||
import socket
|
||||
import sys
|
||||
import ssl
|
||||
import time
|
||||
from datetime import date
|
||||
from typing import Optional, Union
|
||||
from typing import Optional, Union, Deque
|
||||
|
||||
import sabctools
|
||||
|
||||
import sabnzbd
|
||||
from sabnzbd.decorators import synchronized, NzbQueueLocker, DOWNLOADER_CV, DOWNLOADER_LOCK
|
||||
@@ -148,7 +151,7 @@ class Server:
|
||||
self.request: bool = False # True if a getaddrinfo() request is pending
|
||||
self.have_body: bool = True # Assume server has "BODY", until proven otherwise
|
||||
self.have_stat: bool = True # Assume server has "STAT", until proven otherwise
|
||||
self.article_queue: list[sabnzbd.nzbstuff.Article] = []
|
||||
self.article_queue: Deque[sabnzbd.nzbstuff.Article] = deque()
|
||||
|
||||
# Skip during server testing
|
||||
if threads:
|
||||
@@ -173,19 +176,19 @@ class Server:
|
||||
self.reset_article_queue()
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def get_article(self):
|
||||
def get_article(self, peek: bool = False):
|
||||
"""Get article from pre-fetched and pre-fetch new ones if necessary.
|
||||
Articles that are too old for this server are immediately marked as tried"""
|
||||
if self.article_queue:
|
||||
return self.article_queue.pop(0)
|
||||
return self.article_queue[0] if peek else self.article_queue.popleft()
|
||||
|
||||
if self.next_article_search < time.time():
|
||||
# Pre-fetch new articles
|
||||
self.article_queue = sabnzbd.NzbQueue.get_articles(self, sabnzbd.Downloader.servers, _ARTICLE_PREFETCH)
|
||||
sabnzbd.NzbQueue.get_articles(self, sabnzbd.Downloader.servers, _ARTICLE_PREFETCH)
|
||||
if self.article_queue:
|
||||
article = self.article_queue.pop(0)
|
||||
article = self.article_queue[0] if peek else self.article_queue.popleft()
|
||||
# Mark expired articles as tried on this server
|
||||
if self.retention and article.nzf.nzo.avg_stamp < time.time() - self.retention:
|
||||
if not peek and self.retention and article.nzf.nzo.avg_stamp < time.time() - self.retention:
|
||||
sabnzbd.Downloader.decode(article)
|
||||
while self.article_queue:
|
||||
sabnzbd.Downloader.decode(self.article_queue.pop())
|
||||
@@ -201,9 +204,12 @@ class Server:
|
||||
"""Reset articles queued for the Server. Locked to prevent
|
||||
articles getting stuck in the Server when enabled/disabled"""
|
||||
logging.debug("Resetting article queue for %s (%s)", self, self.article_queue)
|
||||
for article in self.article_queue:
|
||||
article.allow_new_fetcher()
|
||||
self.article_queue = []
|
||||
while self.article_queue:
|
||||
try:
|
||||
article = self.article_queue.popleft()
|
||||
article.allow_new_fetcher()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
def request_addrinfo(self):
|
||||
"""Launch async request to resolve server address and select the fastest.
|
||||
@@ -250,7 +256,7 @@ class Downloader(Thread):
|
||||
"shutdown",
|
||||
"server_restarts",
|
||||
"force_disconnect",
|
||||
"read_fds",
|
||||
"selector",
|
||||
"servers",
|
||||
"timers",
|
||||
"last_max_chunk_size",
|
||||
@@ -290,7 +296,7 @@ class Downloader(Thread):
|
||||
|
||||
self.force_disconnect: bool = False
|
||||
|
||||
self.read_fds: dict[int, NewsWrapper] = {}
|
||||
self.selector: selectors.DefaultSelector = selectors.DefaultSelector()
|
||||
|
||||
self.servers: list[Server] = []
|
||||
self.timers: dict[str, list[float]] = {}
|
||||
@@ -361,15 +367,34 @@ class Downloader(Thread):
|
||||
self.servers.sort(key=lambda svr: "%02d%s" % (svr.priority, svr.displayname.lower()))
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def add_socket(self, fileno: int, nw: NewsWrapper):
|
||||
"""Add a socket ready to be used to the list to be watched"""
|
||||
self.read_fds[fileno] = nw
|
||||
def add_socket(self, nw: NewsWrapper):
|
||||
"""Add a socket to be watched for read or write availability"""
|
||||
if nw.nntp:
|
||||
try:
|
||||
self.selector.register(nw.nntp.fileno, selectors.EVENT_READ | selectors.EVENT_WRITE, nw)
|
||||
nw.selector_events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def modify_socket(self, nw: NewsWrapper, events: int):
|
||||
"""Modify the events socket are watched for"""
|
||||
if nw.nntp and nw.selector_events != events:
|
||||
try:
|
||||
self.selector.modify(nw.nntp.fileno, events, nw)
|
||||
nw.selector_events = events
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def remove_socket(self, nw: NewsWrapper):
|
||||
"""Remove a socket to be watched"""
|
||||
if nw.nntp:
|
||||
self.read_fds.pop(nw.nntp.fileno, None)
|
||||
try:
|
||||
self.selector.unregister(nw.nntp.fileno)
|
||||
nw.selector_events = 0
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@NzbQueueLocker
|
||||
def set_paused_state(self, state: bool):
|
||||
@@ -509,26 +534,30 @@ class Downloader(Thread):
|
||||
|
||||
# Remove all connections to server
|
||||
for nw in server.idle_threads | server.busy_threads:
|
||||
self.__reset_nw(nw, "Forcing disconnect", warn=False, wait=False, retry_article=False)
|
||||
self.reset_nw(nw, "Forcing disconnect", warn=False, wait=False, retry_article=False)
|
||||
|
||||
# Make sure server address resolution is refreshed
|
||||
server.addrinfo = None
|
||||
|
||||
@staticmethod
|
||||
def decode(article, data_view: Optional[memoryview] = None):
|
||||
def decode(article: "sabnzbd.nzbstuff.Article", response: Optional[sabctools.NNTPResponse] = None):
|
||||
"""Decode article"""
|
||||
# Need a better way of draining requests
|
||||
if article.nzf.nzo.removed_from_queue:
|
||||
return
|
||||
|
||||
# Article was requested and fetched, update article stats for the server
|
||||
sabnzbd.BPSMeter.register_server_article_tried(article.fetcher.id)
|
||||
|
||||
# Handle broken articles directly
|
||||
if not data_view:
|
||||
if not response or not response.bytes_decoded and not article.nzf.nzo.precheck:
|
||||
if not article.search_new_server():
|
||||
article.nzf.nzo.increase_bad_articles_counter("missing_articles")
|
||||
sabnzbd.NzbQueue.register_article(article, success=False)
|
||||
return
|
||||
|
||||
# Decode and send to article cache
|
||||
sabnzbd.decoder.decode(article, data_view)
|
||||
sabnzbd.decoder.decode(article, response)
|
||||
|
||||
def run(self):
|
||||
# Warn if there are servers defined, but none are valid
|
||||
@@ -548,7 +577,7 @@ class Downloader(Thread):
|
||||
for _ in range(cfg.receive_threads()):
|
||||
# Started as daemon, so we don't need any shutdown logic in the worker
|
||||
# The Downloader code will make sure shutdown is handled gracefully
|
||||
Thread(target=self.process_nw_worker, args=(self.read_fds, process_nw_queue), daemon=True).start()
|
||||
Thread(target=self.process_nw_worker, args=(process_nw_queue,), daemon=True).start()
|
||||
|
||||
# Catch all errors, just in case
|
||||
try:
|
||||
@@ -570,9 +599,9 @@ class Downloader(Thread):
|
||||
if (nw.nntp and nw.nntp.error_msg) or (nw.timeout and now > nw.timeout):
|
||||
if nw.nntp and nw.nntp.error_msg:
|
||||
# Already showed error
|
||||
self.__reset_nw(nw)
|
||||
self.reset_nw(nw)
|
||||
else:
|
||||
self.__reset_nw(nw, "Timed out", warn=True)
|
||||
self.reset_nw(nw, "Timed out", warn=True)
|
||||
server.bad_cons += 1
|
||||
self.maybe_block_server(server)
|
||||
|
||||
@@ -612,15 +641,14 @@ class Downloader(Thread):
|
||||
server.request_addrinfo()
|
||||
break
|
||||
|
||||
nw.article = server.get_article()
|
||||
if not nw.article:
|
||||
if not server.get_article(peek=True):
|
||||
break
|
||||
|
||||
server.idle_threads.remove(nw)
|
||||
server.busy_threads.add(nw)
|
||||
|
||||
if nw.connected:
|
||||
self.__request_article(nw)
|
||||
self.add_socket(nw)
|
||||
else:
|
||||
try:
|
||||
logging.info("%s@%s: Initiating connection", nw.thrdnum, server.host)
|
||||
@@ -632,14 +660,14 @@ class Downloader(Thread):
|
||||
server.host,
|
||||
sys.exc_info()[1],
|
||||
)
|
||||
self.__reset_nw(nw, "Failed to initialize", warn=True)
|
||||
self.reset_nw(nw, "Failed to initialize", warn=True)
|
||||
|
||||
if self.force_disconnect or self.shutdown:
|
||||
for server in self.servers:
|
||||
for nw in server.idle_threads | server.busy_threads:
|
||||
# Send goodbye if we have open socket
|
||||
if nw.nntp:
|
||||
self.__reset_nw(nw, "Forcing disconnect", wait=False, count_article_try=False)
|
||||
self.reset_nw(nw, "Forcing disconnect", wait=False, count_article_try=False)
|
||||
# Make sure server address resolution is refreshed
|
||||
server.addrinfo = None
|
||||
server.reset_article_queue()
|
||||
@@ -663,10 +691,12 @@ class Downloader(Thread):
|
||||
self.last_max_chunk_size = 0
|
||||
|
||||
# Use select to find sockets ready for reading/writing
|
||||
if readkeys := self.read_fds.keys():
|
||||
read, _, _ = select.select(readkeys, (), (), 1.0)
|
||||
if self.selector.get_map():
|
||||
if events := self.selector.select(timeout=1.0):
|
||||
for key, ev in events:
|
||||
process_nw_queue.put((key.data, ev))
|
||||
else:
|
||||
read = []
|
||||
events = []
|
||||
BPSMeter.reset()
|
||||
time.sleep(0.1)
|
||||
self.max_chunk_size = _DEFAULT_CHUNK_SIZE
|
||||
@@ -685,58 +715,65 @@ class Downloader(Thread):
|
||||
next_bpsmeter_update = now + _BPSMETER_UPDATE_DELAY
|
||||
self.check_assembler_levels()
|
||||
|
||||
if not read:
|
||||
if not events:
|
||||
continue
|
||||
|
||||
# Submit all readable sockets to be processed and wait for completion
|
||||
process_nw_queue.put_multiple(read)
|
||||
# Wait for socket operation completion
|
||||
process_nw_queue.join()
|
||||
|
||||
except Exception:
|
||||
logging.error(T("Fatal error in Downloader"), exc_info=True)
|
||||
|
||||
def process_nw_worker(self, read_fds: dict[int, NewsWrapper], nw_queue: MultiAddQueue):
|
||||
def process_nw_worker(self, nw_queue: MultiAddQueue):
|
||||
"""Worker for the daemon thread to process results.
|
||||
Wrapped in try/except because in case of an exception, logging
|
||||
might get lost and the queue.join() would block forever."""
|
||||
try:
|
||||
logging.debug("Starting Downloader receive thread: %s", current_thread().name)
|
||||
while True:
|
||||
# The read_fds is passed by reference, so we can access its items!
|
||||
self.process_nw(read_fds[nw_queue.get()])
|
||||
self.process_nw(*nw_queue.get())
|
||||
nw_queue.task_done()
|
||||
except Exception:
|
||||
# We cannot break out of the Downloader from here, so just pause
|
||||
logging.error(T("Fatal error in Downloader"), exc_info=True)
|
||||
self.pause()
|
||||
|
||||
def process_nw(self, nw: NewsWrapper):
|
||||
def process_nw(self, nw: NewsWrapper, event: int):
|
||||
"""Receive data from a NewsWrapper and handle the response"""
|
||||
try:
|
||||
bytes_received, end_of_line, article_done = nw.recv_chunk()
|
||||
except ssl.SSLWantReadError:
|
||||
return
|
||||
except (ConnectionError, ConnectionAbortedError):
|
||||
# The ConnectionAbortedError is also thrown by sabctools in case of fatal SSL-layer problems
|
||||
self.__reset_nw(nw, "Server closed connection", wait=False)
|
||||
return
|
||||
except BufferError:
|
||||
# The BufferError is thrown when exceeding maximum buffer size
|
||||
# Make sure to discard the article
|
||||
self.__reset_nw(nw, "Maximum data buffer size exceeded", wait=False, retry_article=False)
|
||||
return
|
||||
if event & selectors.EVENT_READ:
|
||||
self.process_nw_read(nw)
|
||||
if event & selectors.EVENT_WRITE:
|
||||
nw.write()
|
||||
|
||||
def process_nw_read(self, nw: NewsWrapper) -> None:
|
||||
bytes_received: int = 0
|
||||
bytes_pending: int = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
n, bytes_pending = nw.read(nbytes=bytes_pending)
|
||||
bytes_received += n
|
||||
except ssl.SSLWantReadError:
|
||||
return
|
||||
except (ConnectionError, ConnectionAbortedError):
|
||||
# The ConnectionAbortedError is also thrown by sabctools in case of fatal SSL-layer problems
|
||||
self.reset_nw(nw, "Server closed connection", wait=False)
|
||||
return
|
||||
except BufferError:
|
||||
# The BufferError is thrown when exceeding maximum buffer size
|
||||
# Make sure to discard the article
|
||||
self.reset_nw(nw, "Maximum data buffer size exceeded", wait=False, retry_article=False)
|
||||
return
|
||||
|
||||
if not bytes_pending:
|
||||
break
|
||||
|
||||
article = nw.article
|
||||
server = nw.server
|
||||
|
||||
with DOWNLOADER_LOCK:
|
||||
sabnzbd.BPSMeter.update(server.id, bytes_received)
|
||||
if bytes_received > self.last_max_chunk_size:
|
||||
self.last_max_chunk_size = bytes_received
|
||||
# Update statistics only when we fetched a whole article
|
||||
# The side effect is that we don't count things like article-not-available messages
|
||||
if article_done:
|
||||
article.nzf.nzo.update_download_stats(sabnzbd.BPSMeter.bps, server.id, nw.data_position)
|
||||
# Check speedlimit
|
||||
if (
|
||||
self.bandwidth_limit
|
||||
@@ -747,99 +784,6 @@ class Downloader(Thread):
|
||||
time.sleep(0.01)
|
||||
sabnzbd.BPSMeter.update()
|
||||
|
||||
# If we are not at the end of a line, more data will follow
|
||||
if not end_of_line:
|
||||
return
|
||||
|
||||
# Response code depends on request command:
|
||||
# 220 = ARTICLE, 222 = BODY
|
||||
if nw.status_code not in (220, 222) and not article_done:
|
||||
if not nw.connected or nw.status_code == 480:
|
||||
if not self.__finish_connect_nw(nw):
|
||||
return
|
||||
if nw.connected:
|
||||
logging.info("Connecting %s@%s finished", nw.thrdnum, nw.server.host)
|
||||
self.__request_article(nw)
|
||||
|
||||
elif nw.status_code == 223:
|
||||
article_done = True
|
||||
logging.debug("Article <%s> is present on %s", article.article, nw.server.host)
|
||||
|
||||
elif nw.status_code in (411, 423, 430, 451):
|
||||
article_done = True
|
||||
logging.debug(
|
||||
"Thread %s@%s: Article %s missing (error=%s)",
|
||||
nw.thrdnum,
|
||||
nw.server.host,
|
||||
article.article,
|
||||
nw.status_code,
|
||||
)
|
||||
nw.reset_data_buffer()
|
||||
|
||||
elif nw.status_code == 500:
|
||||
if article.nzf.nzo.precheck:
|
||||
# Did we try "STAT" already?
|
||||
if not server.have_stat:
|
||||
# Hopless server, just discard
|
||||
logging.info("Server %s does not support STAT or HEAD, precheck not possible", server.host)
|
||||
article_done = True
|
||||
else:
|
||||
# Assume "STAT" command is not supported
|
||||
server.have_stat = False
|
||||
logging.debug("Server %s does not support STAT, trying HEAD", server.host)
|
||||
else:
|
||||
# Assume "BODY" command is not supported
|
||||
server.have_body = False
|
||||
logging.debug("Server %s does not support BODY", server.host)
|
||||
nw.reset_data_buffer()
|
||||
self.__request_article(nw)
|
||||
|
||||
else:
|
||||
# Don't warn for (internal) server errors during downloading
|
||||
if nw.status_code not in (400, 502, 503):
|
||||
logging.warning(
|
||||
T("%s@%s: Received unknown status code %s for article %s"),
|
||||
nw.thrdnum,
|
||||
nw.server.host,
|
||||
nw.status_code,
|
||||
article.article,
|
||||
)
|
||||
|
||||
# Ditch this thread, we don't know what data we got now so the buffer can be bad
|
||||
self.__reset_nw(nw, f"Server error or unknown status code: {nw.status_code}", wait=False)
|
||||
return
|
||||
|
||||
if article_done:
|
||||
# Successful data, clear "bad" counter
|
||||
server.bad_cons = 0
|
||||
server.errormsg = server.warning = ""
|
||||
|
||||
# Decode
|
||||
self.decode(article, nw.data_view[: nw.data_position])
|
||||
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Thread %s@%s: %s done", nw.thrdnum, server.host, article.article)
|
||||
|
||||
# Reset connection for new activity
|
||||
nw.soft_reset()
|
||||
|
||||
# Request a new article immediately if possible
|
||||
if (
|
||||
nw.connected
|
||||
and server.active
|
||||
and not server.restart
|
||||
and not (self.paused or self.shutdown or self.paused_for_postproc)
|
||||
):
|
||||
nw.article = server.get_article()
|
||||
if nw.article:
|
||||
self.__request_article(nw)
|
||||
return
|
||||
|
||||
# Make socket available again
|
||||
server.busy_threads.discard(nw)
|
||||
server.idle_threads.add(nw)
|
||||
self.remove_socket(nw)
|
||||
|
||||
def check_assembler_levels(self):
|
||||
"""Check the Assembler queue to see if we need to delay, depending on queue size"""
|
||||
if (assembler_level := sabnzbd.Assembler.queue_level()) > SOFT_ASSEMBLER_QUEUE_LIMIT:
|
||||
@@ -865,13 +809,12 @@ class Downloader(Thread):
|
||||
logged_counter += 1
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def __finish_connect_nw(self, nw: NewsWrapper) -> bool:
|
||||
def finish_connect_nw(self, nw: NewsWrapper, response: sabctools.NNTPResponse) -> bool:
|
||||
server = nw.server
|
||||
try:
|
||||
nw.finish_connect(nw.status_code)
|
||||
nw.finish_connect(response.status_code, response.message)
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("%s@%s last message -> %s", nw.thrdnum, server.host, nw.nntp_msg)
|
||||
nw.reset_data_buffer()
|
||||
logging.debug("%s@%s last message -> %d", nw.thrdnum, server.host, response.status_code)
|
||||
except NNTPPermanentError as error:
|
||||
# Handle login problems
|
||||
block = False
|
||||
@@ -884,7 +827,7 @@ class Downloader(Thread):
|
||||
errormsg = T("Too many connections to server %s [%s]") % (server.host, error.msg)
|
||||
if server.active:
|
||||
# Don't count this for the tries (max_art_tries) on this server
|
||||
self.__reset_nw(nw)
|
||||
self.reset_nw(nw)
|
||||
self.plan_server(server, _PENALTY_TOOMANY)
|
||||
elif error.code in (502, 481, 482) and clues_too_many_ip(error.msg):
|
||||
# Login from (too many) different IP addresses
|
||||
@@ -934,7 +877,7 @@ class Downloader(Thread):
|
||||
if penalty and (block or server.optional):
|
||||
self.plan_server(server, penalty)
|
||||
# Note that the article is discard for this server if the server is not required
|
||||
self.__reset_nw(nw, retry_article=retry_article)
|
||||
self.reset_nw(nw, retry_article=retry_article)
|
||||
return False
|
||||
except Exception as err:
|
||||
logging.error(
|
||||
@@ -945,11 +888,11 @@ class Downloader(Thread):
|
||||
)
|
||||
logging.info("Traceback: ", exc_info=True)
|
||||
# No reset-warning needed, above logging is sufficient
|
||||
self.__reset_nw(nw, retry_article=False)
|
||||
self.reset_nw(nw, retry_article=False)
|
||||
return True
|
||||
|
||||
@synchronized(DOWNLOADER_LOCK)
|
||||
def __reset_nw(
|
||||
def reset_nw(
|
||||
self,
|
||||
nw: NewsWrapper,
|
||||
reset_msg: Optional[str] = None,
|
||||
@@ -957,6 +900,7 @@ class Downloader(Thread):
|
||||
wait: bool = True,
|
||||
count_article_try: bool = True,
|
||||
retry_article: bool = True,
|
||||
article: Optional["sabnzbd.nzbstuff.Article"] = None,
|
||||
):
|
||||
# Some warnings are errors, and not added as server.warning
|
||||
if warn and reset_msg:
|
||||
@@ -972,20 +916,8 @@ class Downloader(Thread):
|
||||
# Make sure it is not in the readable sockets
|
||||
self.remove_socket(nw)
|
||||
|
||||
if nw.article and not nw.article.nzf.nzo.removed_from_queue:
|
||||
# Only some errors should count towards the total tries for each server
|
||||
if count_article_try:
|
||||
nw.article.tries += 1
|
||||
|
||||
# Do we discard, or try again for this server
|
||||
if not retry_article or (not nw.server.required and nw.article.tries > cfg.max_art_tries()):
|
||||
# Too many tries on this server, consider article missing
|
||||
self.decode(nw.article)
|
||||
nw.article.tries = 0
|
||||
else:
|
||||
# Allow all servers again for this article
|
||||
# Do not use the article_queue, as the server could already have been disabled when we get here!
|
||||
nw.article.allow_new_fetcher()
|
||||
# Discard the article request which failed
|
||||
nw.discard(article, count_article_try=count_article_try, retry_article=retry_article)
|
||||
|
||||
# Reset connection object
|
||||
nw.hard_reset(wait)
|
||||
@@ -993,21 +925,6 @@ class Downloader(Thread):
|
||||
# Empty SSL info, it might change on next connect
|
||||
nw.server.ssl_info = ""
|
||||
|
||||
def __request_article(self, nw: NewsWrapper):
|
||||
try:
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Thread %s@%s: BODY %s", nw.thrdnum, nw.server.host, nw.article.article)
|
||||
nw.body()
|
||||
# Mark as ready to be read
|
||||
self.add_socket(nw.nntp.fileno, nw)
|
||||
except socket.error as err:
|
||||
logging.info("Looks like server closed connection: %s", err)
|
||||
self.__reset_nw(nw, "Server broke off connection", warn=True)
|
||||
except Exception:
|
||||
logging.error(T("Suspect error in downloader"))
|
||||
logging.info("Traceback: ", exc_info=True)
|
||||
self.__reset_nw(nw, "Server broke off connection", warn=True)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Timed restart of servers admin.
|
||||
# For each server all planned events are kept in a list.
|
||||
|
||||
@@ -913,6 +913,7 @@ SPECIAL_VALUE_LIST = (
|
||||
"ssdp_broadcast_interval",
|
||||
"unrar_parameters",
|
||||
"outgoing_nntp_ip",
|
||||
"pipelining_requests",
|
||||
)
|
||||
SPECIAL_LIST_LIST = (
|
||||
"rss_odd_titles",
|
||||
|
||||
@@ -21,20 +21,22 @@ sabnzbd.newswrapper
|
||||
|
||||
import errno
|
||||
import socket
|
||||
import threading
|
||||
from collections import deque
|
||||
from selectors import EVENT_READ, EVENT_WRITE
|
||||
from threading import Thread
|
||||
import time
|
||||
import logging
|
||||
import ssl
|
||||
import sabctools
|
||||
from typing import Optional, Union
|
||||
from typing import Optional, Tuple, Union, Callable
|
||||
|
||||
import sabctools
|
||||
import sabnzbd
|
||||
import sabnzbd.cfg
|
||||
from sabnzbd.constants import DEF_NETWORKING_TIMEOUT, NNTP_BUFFER_SIZE, NTTP_MAX_BUFFER_SIZE
|
||||
from sabnzbd.encoding import utob, ubtou
|
||||
from sabnzbd.constants import DEF_NETWORKING_TIMEOUT, NNTP_BUFFER_SIZE, Status, FORCE_PRIORITY
|
||||
from sabnzbd.encoding import utob
|
||||
from sabnzbd.get_addrinfo import AddrInfo
|
||||
from sabnzbd.decorators import synchronized, DOWNLOADER_LOCK
|
||||
from sabnzbd.misc import int_conv
|
||||
|
||||
# Set pre-defined socket timeout
|
||||
socket.setdefaulttimeout(DEF_NETWORKING_TIMEOUT)
|
||||
@@ -57,10 +59,8 @@ class NewsWrapper:
|
||||
"thrdnum",
|
||||
"blocking",
|
||||
"timeout",
|
||||
"article",
|
||||
"data",
|
||||
"data_view",
|
||||
"data_position",
|
||||
"decoder",
|
||||
"send_buffer",
|
||||
"nntp",
|
||||
"connected",
|
||||
"user_sent",
|
||||
@@ -69,6 +69,11 @@ class NewsWrapper:
|
||||
"user_ok",
|
||||
"pass_ok",
|
||||
"force_login",
|
||||
"next_request",
|
||||
"concurrent_requests",
|
||||
"_response_queue",
|
||||
"selector_events",
|
||||
"lock",
|
||||
)
|
||||
|
||||
def __init__(self, server, thrdnum, block=False):
|
||||
@@ -77,11 +82,9 @@ class NewsWrapper:
|
||||
self.blocking: bool = block
|
||||
|
||||
self.timeout: Optional[float] = None
|
||||
self.article: Optional[sabnzbd.nzbstuff.Article] = None
|
||||
|
||||
self.data: Optional[bytearray] = None
|
||||
self.data_view: Optional[memoryview] = None
|
||||
self.data_position: int = 0
|
||||
self.decoder: Optional[sabctools.Decoder] = None
|
||||
self.send_buffer = b""
|
||||
|
||||
self.nntp: Optional[NNTP] = None
|
||||
|
||||
@@ -93,14 +96,22 @@ class NewsWrapper:
|
||||
self.force_login: bool = False
|
||||
self.group: Optional[str] = None
|
||||
|
||||
@property
|
||||
def status_code(self) -> Optional[int]:
|
||||
if self.data_position >= 3:
|
||||
return int_conv(self.data[:3])
|
||||
# Command queue and concurrency
|
||||
self.next_request: Optional[tuple[bytes, Optional["sabnzbd.nzbstuff.Article"]]] = None
|
||||
self.concurrent_requests: threading.BoundedSemaphore = threading.BoundedSemaphore(
|
||||
sabnzbd.cfg.pipelining_requests()
|
||||
)
|
||||
self._response_queue: deque[Optional[sabnzbd.nzbstuff.Article]] = deque()
|
||||
self.selector_events = 0
|
||||
self.lock: threading.Lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def nntp_msg(self) -> str:
|
||||
return ubtou(self.data[: self.data_position]).strip()
|
||||
def article(self) -> Optional["sabnzbd.nzbstuff.Article"]:
|
||||
"""The article currently being downloaded"""
|
||||
with self.lock:
|
||||
if self._response_queue:
|
||||
return self._response_queue[0]
|
||||
return None
|
||||
|
||||
def init_connect(self):
|
||||
"""Setup the connection in NNTP object"""
|
||||
@@ -109,13 +120,15 @@ class NewsWrapper:
|
||||
raise socket.error(errno.EADDRNOTAVAIL, T("Invalid server address."))
|
||||
|
||||
# Construct buffer and NNTP object
|
||||
self.data = sabctools.bytearray_malloc(NNTP_BUFFER_SIZE)
|
||||
self.data_view = memoryview(self.data)
|
||||
self.reset_data_buffer()
|
||||
self.decoder = sabctools.Decoder(NNTP_BUFFER_SIZE)
|
||||
self.nntp = NNTP(self, self.server.addrinfo)
|
||||
self.timeout = time.time() + self.server.timeout
|
||||
|
||||
def finish_connect(self, code: int):
|
||||
# On connect the first "response" will be 200 Welcome
|
||||
self._response_queue.append(None)
|
||||
self.concurrent_requests.acquire()
|
||||
|
||||
def finish_connect(self, code: int, message: str) -> None:
|
||||
"""Perform login options"""
|
||||
if not (self.server.username or self.server.password or self.force_login):
|
||||
self.connected = True
|
||||
@@ -133,11 +146,10 @@ class NewsWrapper:
|
||||
self.pass_ok = False
|
||||
|
||||
if code in (400, 500, 502):
|
||||
raise NNTPPermanentError(self.nntp_msg, code)
|
||||
raise NNTPPermanentError(message, code)
|
||||
elif not self.user_sent:
|
||||
command = utob("authinfo user %s\r\n" % self.server.username)
|
||||
self.nntp.sock.sendall(command)
|
||||
self.reset_data_buffer()
|
||||
self.queue_command(command)
|
||||
self.user_sent = True
|
||||
elif not self.user_ok:
|
||||
if code == 381:
|
||||
@@ -151,98 +163,254 @@ class NewsWrapper:
|
||||
|
||||
if self.user_ok and not self.pass_sent:
|
||||
command = utob("authinfo pass %s\r\n" % self.server.password)
|
||||
self.nntp.sock.sendall(command)
|
||||
self.reset_data_buffer()
|
||||
self.queue_command(command)
|
||||
self.pass_sent = True
|
||||
elif self.user_ok and not self.pass_ok:
|
||||
if code != 281:
|
||||
# Assume that login failed (code 481 or other)
|
||||
raise NNTPPermanentError(self.nntp_msg, code)
|
||||
raise NNTPPermanentError(message, code)
|
||||
else:
|
||||
self.connected = True
|
||||
|
||||
self.timeout = time.time() + self.server.timeout
|
||||
|
||||
def body(self):
|
||||
def queue_command(
|
||||
self,
|
||||
command: bytes,
|
||||
article: Optional["sabnzbd.nzbstuff.Article"] = None,
|
||||
) -> None:
|
||||
"""Add a command to the command queue"""
|
||||
self.next_request = command, article
|
||||
|
||||
def body(self, article: "sabnzbd.nzbstuff.Article") -> tuple[bytes, "sabnzbd.nzbstuff.Article"]:
|
||||
"""Request the body of the article"""
|
||||
self.timeout = time.time() + self.server.timeout
|
||||
if self.article.nzf.nzo.precheck:
|
||||
if article.nzf.nzo.precheck:
|
||||
if self.server.have_stat:
|
||||
command = utob("STAT <%s>\r\n" % self.article.article)
|
||||
command = utob("STAT <%s>\r\n" % article.article)
|
||||
else:
|
||||
command = utob("HEAD <%s>\r\n" % self.article.article)
|
||||
command = utob("HEAD <%s>\r\n" % article.article)
|
||||
elif self.server.have_body:
|
||||
command = utob("BODY <%s>\r\n" % self.article.article)
|
||||
command = utob("BODY <%s>\r\n" % article.article)
|
||||
else:
|
||||
command = utob("ARTICLE <%s>\r\n" % self.article.article)
|
||||
self.nntp.sock.sendall(command)
|
||||
self.reset_data_buffer()
|
||||
command = utob("ARTICLE <%s>\r\n" % article.article)
|
||||
return command, article
|
||||
|
||||
def recv_chunk(self) -> tuple[int, bool, bool]:
|
||||
"""Receive data, return #bytes, end-of-line, end-of-article"""
|
||||
# Resize the buffer in the extremely unlikely case that it got full
|
||||
if self.data_position == len(self.data):
|
||||
self.nntp.nw.increase_data_buffer()
|
||||
def on_response(self, response: sabctools.NNTPResponse, article: Optional["sabnzbd.nzbstuff.Article"]) -> None:
|
||||
"""A response to a NNTP request is received"""
|
||||
self.concurrent_requests.release()
|
||||
sabnzbd.Downloader.modify_socket(self, EVENT_READ | EVENT_WRITE)
|
||||
server = self.server
|
||||
article_done = response.status_code in (220, 222) and article
|
||||
|
||||
# Receive data into the pre-allocated buffer
|
||||
if self.nntp.nw.server.ssl and not self.nntp.nw.blocking and sabctools.openssl_linked:
|
||||
if article_done:
|
||||
with DOWNLOADER_LOCK:
|
||||
# Update statistics only when we fetched a whole article
|
||||
# The side effect is that we don't count things like article-not-available messages
|
||||
article.nzf.nzo.update_download_stats(sabnzbd.BPSMeter.bps, server.id, response.bytes_read)
|
||||
|
||||
# Response code depends on request command:
|
||||
# 220 = ARTICLE, 222 = BODY
|
||||
if not article_done:
|
||||
if not self.connected or not article or response.status_code in (281, 381, 480, 481, 482):
|
||||
self.discard(article, count_article_try=False)
|
||||
if not sabnzbd.Downloader.finish_connect_nw(self, response):
|
||||
return
|
||||
if self.connected:
|
||||
logging.info("Connecting %s@%s finished", self.thrdnum, server.host)
|
||||
|
||||
elif response.status_code == 223:
|
||||
article_done = True
|
||||
logging.debug("Article <%s> is present on %s", article.article, server.host)
|
||||
|
||||
elif response.status_code in (411, 423, 430, 451):
|
||||
article_done = True
|
||||
logging.debug(
|
||||
"Thread %s@%s: Article %s missing (error=%s)",
|
||||
self.thrdnum,
|
||||
server.host,
|
||||
article.article,
|
||||
response.status_code,
|
||||
)
|
||||
|
||||
elif response.status_code == 500:
|
||||
if article.nzf.nzo.precheck:
|
||||
# Did we try "STAT" already?
|
||||
if not server.have_stat:
|
||||
# Hopless server, just discard
|
||||
logging.info("Server %s does not support STAT or HEAD, precheck not possible", server.host)
|
||||
article_done = True
|
||||
else:
|
||||
# Assume "STAT" command is not supported
|
||||
server.have_stat = False
|
||||
logging.debug("Server %s does not support STAT, trying HEAD", server.host)
|
||||
else:
|
||||
# Assume "BODY" command is not supported
|
||||
server.have_body = False
|
||||
logging.debug("Server %s does not support BODY", server.host)
|
||||
self.discard(article, count_article_try=False)
|
||||
|
||||
else:
|
||||
# Don't warn for (internal) server errors during downloading
|
||||
if response.status_code not in (400, 502, 503):
|
||||
logging.warning(
|
||||
T("%s@%s: Received unknown status code %s for article %s"),
|
||||
self.thrdnum,
|
||||
server.host,
|
||||
response.status_code,
|
||||
article.article,
|
||||
)
|
||||
|
||||
# Ditch this thread, we don't know what data we got now so the buffer can be bad
|
||||
sabnzbd.Downloader.reset_nw(
|
||||
self, f"Server error or unknown status code: {response.status_code}", wait=False, article=article
|
||||
)
|
||||
return
|
||||
|
||||
if article_done:
|
||||
# Successful data, clear "bad" counter
|
||||
server.bad_cons = 0
|
||||
server.errormsg = server.warning = ""
|
||||
|
||||
# Decode
|
||||
sabnzbd.Downloader.decode(article, response)
|
||||
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Thread %s@%s: %s done", self.thrdnum, server.host, article.article)
|
||||
|
||||
def read(
|
||||
self,
|
||||
nbytes: int = 0,
|
||||
on_response: Optional[Callable[[int, str], None]] = None,
|
||||
) -> Tuple[int, Optional[int]]:
|
||||
"""Receive data, return #bytes, #pendingbytes
|
||||
:param nbytes: maximum number of bytes to read
|
||||
:param on_response: callback for each complete response received
|
||||
:return: #bytes, #pendingbytes
|
||||
"""
|
||||
# Receive data into the decoder pre-allocated buffer
|
||||
if not nbytes and self.nntp.nw.server.ssl and not self.nntp.nw.blocking and sabctools.openssl_linked:
|
||||
# Use patched version when downloading
|
||||
bytes_recv = sabctools.unlocked_ssl_recv_into(self.nntp.sock, self.data_view[self.data_position :])
|
||||
bytes_recv = sabctools.unlocked_ssl_recv_into(self.nntp.sock, self.decoder)
|
||||
else:
|
||||
bytes_recv = self.nntp.sock.recv_into(self.data_view[self.data_position :])
|
||||
bytes_recv = self.nntp.sock.recv_into(self.decoder, nbytes=nbytes)
|
||||
|
||||
# No data received
|
||||
if bytes_recv == 0:
|
||||
raise ConnectionError("Server closed connection")
|
||||
|
||||
# Success, move timeout and internal data position
|
||||
# Success, move timeout
|
||||
self.timeout = time.time() + self.server.timeout
|
||||
self.data_position += bytes_recv
|
||||
|
||||
self.decoder.process(bytes_recv)
|
||||
for response in self.decoder:
|
||||
with self.lock:
|
||||
article = self._response_queue.popleft()
|
||||
if on_response:
|
||||
on_response(response.status_code, response.message)
|
||||
self.on_response(response, article)
|
||||
|
||||
# The SSL-layer might still contain data even though the socket does not. Another Downloader-loop would
|
||||
# not identify this socket anymore as it is not returned by select(). So, we have to forcefully trigger
|
||||
# another recv_chunk so the buffer is increased and the data from the SSL-layer is read. See #2752.
|
||||
if self.nntp.nw.server.ssl and self.data_position == len(self.data) and self.nntp.sock.pending() > 0:
|
||||
# We do not perform error-handling, as we know there is data available to read
|
||||
additional_bytes_recv, additional_end_of_line, additional_end_of_article = self.recv_chunk()
|
||||
return bytes_recv + additional_bytes_recv, additional_end_of_line, additional_end_of_article
|
||||
if self.server.ssl and self.nntp and (pending := self.nntp.sock.pending()):
|
||||
return bytes_recv, pending
|
||||
return bytes_recv, None
|
||||
|
||||
# Check for end of line
|
||||
# Using the data directly seems faster than the memoryview
|
||||
if self.data[self.data_position - 2 : self.data_position] == b"\r\n":
|
||||
# Official end-of-article is "\r\n.\r\n"
|
||||
if self.data[self.data_position - 5 : self.data_position] == b"\r\n.\r\n":
|
||||
return bytes_recv, True, True
|
||||
return bytes_recv, True, False
|
||||
def write(self):
|
||||
"""Send data to server"""
|
||||
server = self.server
|
||||
|
||||
# Still in middle of data, so continue!
|
||||
return bytes_recv, False, False
|
||||
try:
|
||||
# First, try to flush any remaining data
|
||||
if self.send_buffer:
|
||||
sent = self.nntp.sock.send(self.send_buffer)
|
||||
self.send_buffer = self.send_buffer[sent:]
|
||||
if self.send_buffer:
|
||||
# Still unsent data, wait for next EVENT_WRITE
|
||||
return
|
||||
|
||||
def soft_reset(self):
|
||||
"""Reset for the next article"""
|
||||
self.timeout = None
|
||||
self.article = None
|
||||
self.reset_data_buffer()
|
||||
if self.connected:
|
||||
if (
|
||||
server.active
|
||||
and not server.restart
|
||||
and not (
|
||||
sabnzbd.Downloader.paused
|
||||
or sabnzbd.Downloader.shutdown
|
||||
or sabnzbd.Downloader.paused_for_postproc
|
||||
)
|
||||
):
|
||||
# Prepare the next request
|
||||
if not self.next_request and (article := server.get_article()):
|
||||
self.next_request = self.body(article)
|
||||
elif self.next_request and self.next_request[1]:
|
||||
# Discard the next request
|
||||
self.discard(self.next_request[1], count_article_try=False, retry_article=True)
|
||||
self.next_request = None
|
||||
|
||||
def reset_data_buffer(self):
|
||||
"""Reset the data position"""
|
||||
self.data_position = 0
|
||||
# If no pending buffer, try to send new command
|
||||
if not self.send_buffer and self.next_request:
|
||||
if self.concurrent_requests.acquire(blocking=False):
|
||||
command, article = self.next_request
|
||||
self.next_request = None
|
||||
if article:
|
||||
nzo = article.nzf.nzo
|
||||
if nzo.removed_from_queue or nzo.status is Status.PAUSED and nzo.priority is not FORCE_PRIORITY:
|
||||
self.discard(article, count_article_try=False, retry_article=True)
|
||||
self.concurrent_requests.release()
|
||||
return
|
||||
self._response_queue.append(article)
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Thread %s@%s: %s", self.thrdnum, server.host, command)
|
||||
try:
|
||||
sent = self.nntp.sock.send(command)
|
||||
if sent < len(command):
|
||||
# Partial send, store remainder
|
||||
self.send_buffer = command[sent:]
|
||||
except (BlockingIOError, ssl.SSLWantWriteError):
|
||||
# Can't send now, store full command
|
||||
self.send_buffer = command
|
||||
else:
|
||||
# Concurrency limit reached
|
||||
sabnzbd.Downloader.modify_socket(self, EVENT_READ)
|
||||
else:
|
||||
# Is it safe to shut down this socket?
|
||||
if (
|
||||
not self.send_buffer
|
||||
and not self.next_request
|
||||
and not self._response_queue
|
||||
and (not server.active or server.restart or time.time() > self.timeout)
|
||||
):
|
||||
# Make socket available again
|
||||
server.busy_threads.discard(self)
|
||||
server.idle_threads.add(self)
|
||||
sabnzbd.Downloader.remove_socket(self)
|
||||
|
||||
def increase_data_buffer(self):
|
||||
"""Resize the buffer in the extremely unlikely case that it overflows"""
|
||||
# Sanity check before we go any further
|
||||
if len(self.data) > NTTP_MAX_BUFFER_SIZE:
|
||||
raise BufferError("Maximum data buffer size exceeded")
|
||||
|
||||
# Input needs to be integer, floats don't work
|
||||
new_buffer = sabctools.bytearray_malloc(len(self.data) + NNTP_BUFFER_SIZE // 2)
|
||||
new_buffer[: len(self.data)] = self.data
|
||||
logging.info("Increased buffer from %d to %d for %s", len(self.data), len(new_buffer), str(self))
|
||||
self.data = new_buffer
|
||||
self.data_view = memoryview(self.data)
|
||||
except (BlockingIOError, ssl.SSLWantWriteError):
|
||||
# Socket not currently writable — just try again later
|
||||
return
|
||||
except socket.error as err:
|
||||
logging.info("Looks like server closed connection: %s", err)
|
||||
sabnzbd.Downloader.reset_nw(self, "Server broke off connection", warn=True)
|
||||
except Exception:
|
||||
logging.error(T("Suspect error in downloader"))
|
||||
logging.info("Traceback: ", exc_info=True)
|
||||
sabnzbd.Downloader.reset_nw(self, "Server broke off connection", warn=True)
|
||||
|
||||
def hard_reset(self, wait: bool = True):
|
||||
"""Destroy and restart"""
|
||||
with self.lock:
|
||||
# Drain unsent requests
|
||||
if self.next_request:
|
||||
_, article = self.next_request
|
||||
if article:
|
||||
self.discard(article, count_article_try=False, retry_article=True)
|
||||
self.next_request = None
|
||||
# Drain responses
|
||||
while self._response_queue:
|
||||
if article := self._response_queue.popleft():
|
||||
self.discard(article, count_article_try=False, retry_article=True)
|
||||
|
||||
if self.nntp:
|
||||
self.nntp.close(send_quit=self.connected)
|
||||
self.nntp = None
|
||||
@@ -258,6 +426,28 @@ class NewsWrapper:
|
||||
# Reset for internal reasons, just wait 5 sec
|
||||
self.timeout = time.time() + 5
|
||||
|
||||
def discard(
|
||||
self,
|
||||
article: Optional["sabnzbd.nzbstuff.Article"],
|
||||
count_article_try: bool = True,
|
||||
retry_article: bool = True,
|
||||
) -> None:
|
||||
"""Discard an article back to the queue"""
|
||||
if article and not article.nzf.nzo.removed_from_queue:
|
||||
# Only some errors should count towards the total tries for each server
|
||||
if count_article_try:
|
||||
article.tries += 1
|
||||
|
||||
# Do we discard, or try again for this server
|
||||
if not retry_article or (not self.server.required and article.tries > sabnzbd.cfg.max_art_tries()):
|
||||
# Too many tries on this server, consider article missing
|
||||
sabnzbd.Downloader.decode(article)
|
||||
article.tries = 0
|
||||
else:
|
||||
# Allow all servers again for this article
|
||||
# Do not use the article_queue, as the server could already have been disabled when we get here!
|
||||
article.allow_new_fetcher()
|
||||
|
||||
def __repr__(self):
|
||||
return "<NewsWrapper: server=%s:%s, thread=%s, connected=%s>" % (
|
||||
self.server.host,
|
||||
@@ -379,7 +569,7 @@ class NNTP:
|
||||
# Locked, so it can't interleave with any of the Downloader "__nw" actions
|
||||
with DOWNLOADER_LOCK:
|
||||
if not self.closed:
|
||||
sabnzbd.Downloader.add_socket(self.fileno, self.nw)
|
||||
sabnzbd.Downloader.add_socket(self.nw)
|
||||
except OSError as e:
|
||||
self.error(e)
|
||||
|
||||
|
||||
@@ -692,7 +692,7 @@ class NzbQueue:
|
||||
return False
|
||||
return False
|
||||
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> None:
|
||||
"""Get next article for jobs in the queue
|
||||
Not locked for performance, since it only reads the queue
|
||||
"""
|
||||
@@ -705,12 +705,12 @@ class NzbQueue:
|
||||
and not nzo.propagation_delay_left
|
||||
) or nzo.priority == FORCE_PRIORITY:
|
||||
if not nzo.server_in_try_list(server):
|
||||
if articles := nzo.get_articles(server, servers, fetch_limit):
|
||||
return articles
|
||||
nzo.get_articles(server, servers, fetch_limit)
|
||||
if server.article_queue:
|
||||
break
|
||||
# Stop after first job that wasn't paused/propagating/etc
|
||||
if self.__top_only:
|
||||
return []
|
||||
return []
|
||||
break
|
||||
|
||||
def register_article(self, article: Article, success: bool = True):
|
||||
"""Register the articles we tried
|
||||
@@ -893,11 +893,14 @@ class NzbQueue:
|
||||
|
||||
if nzf.all_servers_in_try_list(active_servers):
|
||||
# Check for articles where all active servers have already been tried
|
||||
for article in nzf.articles[:]:
|
||||
if article.all_servers_in_try_list(active_servers):
|
||||
logging.debug("Removing article %s with bad trylist in file %s", article, nzf.filename)
|
||||
nzo.increase_bad_articles_counter("missing_articles")
|
||||
sabnzbd.NzbQueue.register_article(article, success=False)
|
||||
with nzf:
|
||||
for article in nzf.articles:
|
||||
if article.all_servers_in_try_list(active_servers):
|
||||
logging.debug(
|
||||
"Removing article %s with bad trylist in file %s", article, nzf.filename
|
||||
)
|
||||
nzo.increase_bad_articles_counter("missing_articles")
|
||||
sabnzbd.NzbQueue.register_article(article, success=False)
|
||||
|
||||
logging.info("Resetting bad trylist for file %s in job %s", nzf.filename, nzo.final_name)
|
||||
nzf.reset_try_list()
|
||||
|
||||
@@ -26,7 +26,7 @@ import datetime
|
||||
import threading
|
||||
import functools
|
||||
import difflib
|
||||
from typing import Any, Optional, Union, BinaryIO
|
||||
from typing import Any, Optional, Union, BinaryIO, Deque
|
||||
|
||||
# SABnzbd modules
|
||||
import sabnzbd
|
||||
@@ -328,11 +328,12 @@ class NzbFile(TryList):
|
||||
"""Representation of one file consisting of multiple articles"""
|
||||
|
||||
# Pre-define attributes to save memory
|
||||
__slots__ = NzbFileSaver
|
||||
__slots__ = NzbFileSaver + ("lock",)
|
||||
|
||||
def __init__(self, date, subject, raw_article_db, file_bytes, nzo):
|
||||
"""Setup object"""
|
||||
super().__init__()
|
||||
self.lock = threading.RLock()
|
||||
|
||||
self.date: datetime.datetime = date
|
||||
self.type: Optional[str] = None
|
||||
@@ -347,7 +348,7 @@ class NzbFile(TryList):
|
||||
self.setname: Optional[str] = None
|
||||
|
||||
# Articles are removed from "articles" after being fetched
|
||||
self.articles: list[Article] = []
|
||||
self.articles: dict[Article, Article] = {}
|
||||
self.decodetable: list[Article] = []
|
||||
|
||||
self.bytes: int = file_bytes
|
||||
@@ -402,17 +403,18 @@ class NzbFile(TryList):
|
||||
def add_article(self, article_info):
|
||||
"""Add article to object database and return article object"""
|
||||
article = Article(article_info[0], article_info[1], self)
|
||||
self.articles.append(article)
|
||||
self.decodetable.append(article)
|
||||
with self.lock:
|
||||
self.articles[article] = article
|
||||
self.decodetable.append(article)
|
||||
return article
|
||||
|
||||
def remove_article(self, article: Article, success: bool) -> int:
|
||||
"""Handle completed article, possibly end of file"""
|
||||
if article in self.articles:
|
||||
self.articles.remove(article)
|
||||
if success:
|
||||
self.bytes_left -= article.bytes
|
||||
return len(self.articles)
|
||||
with self.lock:
|
||||
if self.articles.pop(article, None) is not None:
|
||||
if success:
|
||||
self.bytes_left -= article.bytes
|
||||
return len(self.articles)
|
||||
|
||||
def set_par2(self, setname, vol, blocks):
|
||||
"""Designate this file as a par2 file"""
|
||||
@@ -427,24 +429,25 @@ class NzbFile(TryList):
|
||||
else:
|
||||
self.crc32 = sabctools.crc32_combine(self.crc32, crc32, length)
|
||||
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
|
||||
"""Get next articles to be downloaded"""
|
||||
articles = []
|
||||
for article in self.articles:
|
||||
if article := article.get_article(server, servers):
|
||||
articles.append(article)
|
||||
if len(articles) >= fetch_limit:
|
||||
return articles
|
||||
articles = server.article_queue
|
||||
with self.lock:
|
||||
for article in self.articles:
|
||||
if article := article.get_article(server, servers):
|
||||
articles.append(article)
|
||||
if len(articles) >= fetch_limit:
|
||||
return
|
||||
self.add_to_try_list(server)
|
||||
return articles
|
||||
|
||||
@synchronized(TRYLIST_LOCK)
|
||||
def reset_all_try_lists(self):
|
||||
"""Reset all try lists. Locked so reset is performed
|
||||
for all items at the same time without chance of another
|
||||
thread changing any of the items while we are resetting"""
|
||||
for art in self.articles:
|
||||
art.reset_try_list()
|
||||
with self.lock:
|
||||
for art in self.articles:
|
||||
art.reset_try_list()
|
||||
self.reset_try_list()
|
||||
|
||||
def first_article_processed(self) -> bool:
|
||||
@@ -474,7 +477,10 @@ class NzbFile(TryList):
|
||||
@property
|
||||
def completed(self):
|
||||
"""Is this file completed?"""
|
||||
return self.import_finished and not bool(self.articles)
|
||||
if not self.import_finished:
|
||||
return False
|
||||
with self.lock:
|
||||
return not self.articles
|
||||
|
||||
def remove_admin(self):
|
||||
"""Remove article database from disk (sabnzbd_nzf_<id>)"""
|
||||
@@ -484,6 +490,12 @@ class NzbFile(TryList):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
self.lock.acquire()
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.lock.release()
|
||||
|
||||
def __getstate__(self):
|
||||
"""Save to pickle file, selecting attributes"""
|
||||
dict_ = {}
|
||||
@@ -501,6 +513,10 @@ class NzbFile(TryList):
|
||||
# Handle new attributes
|
||||
setattr(self, item, None)
|
||||
super().__setstate__(dict_.get("try_list", []))
|
||||
self.lock = threading.RLock()
|
||||
if isinstance(self.articles, list):
|
||||
# Converted from list to dict
|
||||
self.articles = {x: x for x in self.articles}
|
||||
|
||||
def __eq__(self, other: "NzbFile"):
|
||||
"""Assume it's the same file if the number bytes and first article
|
||||
@@ -1640,8 +1656,9 @@ class NzbObject(TryList):
|
||||
self.nzo_info[bad_article_type] += 1
|
||||
self.bad_articles += 1
|
||||
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int) -> list[Article]:
|
||||
articles = []
|
||||
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
|
||||
"""Assign articles server up to the fetch_limit"""
|
||||
articles: Deque[Article] = server.article_queue
|
||||
nzf_remove_list = []
|
||||
|
||||
# Did we go through all first-articles?
|
||||
@@ -1676,7 +1693,8 @@ class NzbObject(TryList):
|
||||
else:
|
||||
break
|
||||
|
||||
if articles := nzf.get_articles(server, servers, fetch_limit):
|
||||
nzf.get_articles(server, servers, fetch_limit)
|
||||
if articles:
|
||||
break
|
||||
|
||||
# Remove all files for which admin could not be read
|
||||
@@ -1691,7 +1709,6 @@ class NzbObject(TryList):
|
||||
if not articles:
|
||||
# No articles for this server, block for next time
|
||||
self.add_to_try_list(server)
|
||||
return articles
|
||||
|
||||
@synchronized(NZO_LOCK)
|
||||
def move_top_bulk(self, nzf_ids: list[str]):
|
||||
|
||||
@@ -45,32 +45,38 @@ ARTICLE_INFO = re.compile(
|
||||
YENC_ESCAPE = [0x00, 0x0A, 0x0D, ord("="), ord(".")]
|
||||
|
||||
|
||||
class NewsServerProtocol(asyncio.Protocol):
|
||||
def __init__(self):
|
||||
self.transport = None
|
||||
self.connected = False
|
||||
self.in_article = False
|
||||
super().__init__()
|
||||
class NewsServerSession:
|
||||
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
|
||||
def connection_made(self, transport):
|
||||
logging.info("Connection from %s", transport.get_extra_info("peername"))
|
||||
self.transport = transport
|
||||
self.connected = True
|
||||
self.transport.write(b"200 Welcome (SABNews)\r\n")
|
||||
async def run(self):
|
||||
self.writer.write(b"200 Welcome (SABNews)\r\n")
|
||||
await self.writer.drain()
|
||||
|
||||
def data_received(self, message):
|
||||
logging.debug("Data received: %s", message.strip())
|
||||
try:
|
||||
while not self.reader.at_eof():
|
||||
message = await self.reader.readuntil(b"\r\n")
|
||||
logging.debug("Data received: %s", message.strip())
|
||||
await self.handle_command(message)
|
||||
except (ConnectionResetError, asyncio.IncompleteReadError):
|
||||
logging.debug("Client closed connection")
|
||||
|
||||
# Handle basic commands
|
||||
async def handle_command(self, message: bytes):
|
||||
"""Handle basic NNTP commands, \r\n is already stripped."""
|
||||
if message.startswith(b"QUIT"):
|
||||
self.close_connection()
|
||||
elif message.startswith((b"ARTICLE", b"BODY")):
|
||||
await self.close_connection()
|
||||
return
|
||||
|
||||
if message.startswith((b"ARTICLE", b"BODY")):
|
||||
parsed_message = ARTICLE_INFO.search(message)
|
||||
self.serve_article(parsed_message)
|
||||
await self.serve_article(parsed_message)
|
||||
return
|
||||
|
||||
# self.transport.write(data)
|
||||
self.writer.write(b"500 Unknown command\r\n")
|
||||
await self.writer.drain()
|
||||
|
||||
def serve_article(self, parsed_message):
|
||||
async def serve_article(self, parsed_message):
|
||||
# Check if we parsed everything
|
||||
try:
|
||||
message_id = parsed_message.group("message_id")
|
||||
@@ -81,34 +87,37 @@ class NewsServerProtocol(asyncio.Protocol):
|
||||
size = int(parsed_message.group("size"))
|
||||
except (AttributeError, ValueError):
|
||||
logging.warning("Can't parse article information")
|
||||
self.transport.write(b"430 No Such Article Found (bad message-id)\r\n")
|
||||
self.writer.write(b"430 No Such Article Found (bad message-id)\r\n")
|
||||
await self.writer.drain()
|
||||
return
|
||||
|
||||
# Check if file exists
|
||||
if not os.path.exists(file):
|
||||
logging.warning("File not found: %s", file)
|
||||
self.transport.write(b"430 No Such Article Found (no file on disk)\r\n")
|
||||
self.writer.write(b"430 No Such Article Found (no file on disk)\r\n")
|
||||
await self.writer.drain()
|
||||
return
|
||||
|
||||
# Check if sizes are valid
|
||||
file_size = os.path.getsize(file)
|
||||
if start + size > file_size:
|
||||
logging.warning("Invalid start/size attributes")
|
||||
self.transport.write(b"430 No Such Article Found (invalid start/size attributes)\r\n")
|
||||
self.writer.write(b"430 No Such Article Found (invalid start/size attributes)\r\n")
|
||||
await self.writer.drain()
|
||||
return
|
||||
|
||||
logging.debug("Serving %s" % message_id)
|
||||
|
||||
# File is found, send headers
|
||||
self.transport.write(b"222 0 %s\r\n" % message_id)
|
||||
self.transport.write(b"Message-ID: %s\r\n" % message_id)
|
||||
self.transport.write(b'Subject: "%s"\r\n\r\n' % file_base.encode("utf-8"))
|
||||
self.writer.write(b"222 0 %s\r\n" % message_id)
|
||||
self.writer.write(b"Message-ID: %s\r\n" % message_id)
|
||||
self.writer.write(b'Subject: "%s"\r\n\r\n' % file_base.encode("utf-8"))
|
||||
|
||||
# Write yEnc headers
|
||||
self.transport.write(
|
||||
self.writer.write(
|
||||
b"=ybegin part=%d line=128 size=%d name=%s\r\n" % (part, file_size, file_base.encode("utf-8"))
|
||||
)
|
||||
self.transport.write(b"=ypart begin=%d end=%d\r\n" % (start + 1, start + size))
|
||||
self.writer.write(b"=ypart begin=%d end=%d\r\n" % (start + 1, start + size))
|
||||
|
||||
with open(file, "rb") as inp_file:
|
||||
inp_file.seek(start)
|
||||
@@ -116,24 +125,31 @@ class NewsServerProtocol(asyncio.Protocol):
|
||||
|
||||
# Encode data
|
||||
output_string, crc = sabctools.yenc_encode(inp_buffer)
|
||||
self.transport.write(output_string)
|
||||
self.writer.write(output_string)
|
||||
|
||||
# Write footer
|
||||
self.transport.write(b"\r\n=yend size=%d part=%d pcrc32=%08x\r\n" % (size, part, crc))
|
||||
self.transport.write(b".\r\n")
|
||||
self.writer.write(b"\r\n=yend size=%d part=%d pcrc32=%08x\r\n" % (size, part, crc))
|
||||
self.writer.write(b".\r\n")
|
||||
await self.writer.drain()
|
||||
|
||||
def close_connection(self):
|
||||
async def close_connection(self):
|
||||
logging.debug("Closing connection")
|
||||
self.transport.write(b"205 Connection closing\r\n")
|
||||
self.transport.close()
|
||||
self.writer.write(b"205 Connection closing\r\n")
|
||||
await self.writer.drain()
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
|
||||
|
||||
async def connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
session = NewsServerSession(reader, writer)
|
||||
await session.run()
|
||||
|
||||
|
||||
async def serve_sabnews(hostname, port):
|
||||
# Start server
|
||||
logging.info("Starting SABNews on %s:%d", hostname, port)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
server = await loop.create_server(lambda: NewsServerProtocol(), hostname, port)
|
||||
server = await asyncio.start_server(connection_handler, hostname, port)
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
|
||||
|
||||
@@ -21,10 +21,12 @@ tests.test_decoder- Testing functions in decoder.py
|
||||
import binascii
|
||||
import os
|
||||
import pytest
|
||||
from io import BytesIO
|
||||
|
||||
from random import randint
|
||||
from unittest import mock
|
||||
|
||||
import sabctools
|
||||
import sabnzbd.decoder as decoder
|
||||
from sabnzbd.nzbstuff import Article
|
||||
|
||||
@@ -111,7 +113,7 @@ class TestUuDecoder:
|
||||
result.append(END_DATA)
|
||||
|
||||
# Signal the end of the message with a dot on a line of its own
|
||||
data.append(b".")
|
||||
data.append(b".\r\n")
|
||||
|
||||
# Join the data with \r\n line endings, just like we get from socket reads
|
||||
data = b"\r\n".join(data)
|
||||
@@ -120,22 +122,26 @@ class TestUuDecoder:
|
||||
|
||||
return article, bytearray(data), result
|
||||
|
||||
def test_no_data(self):
|
||||
with pytest.raises(decoder.BadUu):
|
||||
assert decoder.decode_uu(None, None)
|
||||
@staticmethod
|
||||
def _response(raw_data: bytes) -> sabctools.NNTPResponse:
|
||||
dec = sabctools.Decoder(len(raw_data))
|
||||
reader = BytesIO(raw_data)
|
||||
reader.readinto(dec)
|
||||
dec.process(len(raw_data))
|
||||
return next(dec)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"raw_data",
|
||||
[
|
||||
b"",
|
||||
b"\r\n\r\n",
|
||||
b"foobar\r\n", # Plenty of list items, but (too) few actual lines
|
||||
b"222 0 <artid@woteva>\r\nX-Too-Short: yup\r\n",
|
||||
b"222 0 <foo@bar>\r\n.\r\n",
|
||||
b"222 0 <foo@bar>\r\n\r\n.\r\n",
|
||||
b"222 0 <foo@bar>\r\nfoobar\r\n.\r\n", # Plenty of list items, but (too) few actual lines
|
||||
b"222 0 <foo@bar>\r\nX-Too-Short: yup\r\n.\r\n",
|
||||
],
|
||||
)
|
||||
def test_short_data(self, raw_data):
|
||||
with pytest.raises(decoder.BadUu):
|
||||
assert decoder.decode_uu(None, bytearray(raw_data))
|
||||
assert decoder.decode_uu(Article("foo@bar", 4321, None), self._response(raw_data))
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"raw_data",
|
||||
@@ -158,7 +164,8 @@ class TestUuDecoder:
|
||||
with pytest.raises(decoder.BadUu):
|
||||
raw_data = bytearray(raw_data)
|
||||
raw_data.extend(filler)
|
||||
assert decoder.decode_uu(article, raw_data)
|
||||
raw_data.extend(b".\r\n")
|
||||
assert decoder.decode_uu(article, self._response(raw_data))
|
||||
|
||||
@pytest.mark.parametrize("insert_empty_line", [True, False])
|
||||
@pytest.mark.parametrize("insert_excess_empty_lines", [True, False])
|
||||
@@ -194,7 +201,7 @@ class TestUuDecoder:
|
||||
insert_dot_stuffing_line,
|
||||
begin_line,
|
||||
)
|
||||
assert decoder.decode_uu(article, raw_data) == expected_result
|
||||
assert decoder.decode_uu(article, self._response(raw_data)) == expected_result
|
||||
assert article.nzf.filename_checked
|
||||
|
||||
@pytest.mark.parametrize("insert_empty_line", [True, False])
|
||||
@@ -205,7 +212,7 @@ class TestUuDecoder:
|
||||
decoded_data = expected_data = b""
|
||||
for part in ("begin", "middle", "middle", "end"):
|
||||
article, data, result = self._generate_msg_part(part, insert_empty_line, False, False, True)
|
||||
decoded_data += decoder.decode_uu(article, data)
|
||||
decoded_data += decoder.decode_uu(article, self._response(data))
|
||||
expected_data += result
|
||||
|
||||
# Verify results
|
||||
@@ -223,4 +230,6 @@ class TestUuDecoder:
|
||||
article.lowest_partnum = False
|
||||
filler = b"\r\n".join(VALID_UU_LINES[:4]) + b"\r\n"
|
||||
with pytest.raises(decoder.BadData):
|
||||
assert decoder.decode_uu(article, bytearray(b"222 0 <foo@bar>\r\n" + filler + bad_data + b"\r\n"))
|
||||
assert decoder.decode_uu(
|
||||
article, self._response(bytearray(b"222 0 <foo@bar>\r\n" + filler + bad_data + b"\r\n.\r\n"))
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user