mirror of
https://github.com/sabnzbd/sabnzbd.git
synced 2025-12-24 00:00:12 -05:00
Make behaviour after reset more robust (#3229)
* Make behaviour after reset more robust * Remove use of hasattr and rename to generation * I had a feeling this would be a circular reference * Reset and increment generation under lock
This commit is contained in:
@@ -694,7 +694,8 @@ class Downloader(Thread):
|
||||
if self.selector.get_map():
|
||||
if events := self.selector.select(timeout=1.0):
|
||||
for key, ev in events:
|
||||
process_nw_queue.put((key.data, ev))
|
||||
nw = key.data
|
||||
process_nw_queue.put((nw, ev, nw.generation))
|
||||
else:
|
||||
events = []
|
||||
BPSMeter.reset()
|
||||
@@ -738,20 +739,26 @@ class Downloader(Thread):
|
||||
logging.error(T("Fatal error in Downloader"), exc_info=True)
|
||||
self.pause()
|
||||
|
||||
def process_nw(self, nw: NewsWrapper, event: int):
|
||||
def process_nw(self, nw: NewsWrapper, event: int, generation: int):
|
||||
"""Receive data from a NewsWrapper and handle the response"""
|
||||
# Drop stale items
|
||||
if nw.generation != generation:
|
||||
return
|
||||
if event & selectors.EVENT_READ:
|
||||
self.process_nw_read(nw)
|
||||
self.process_nw_read(nw, generation)
|
||||
# If read caused a reset, don't proceed to write
|
||||
if nw.generation != generation:
|
||||
return
|
||||
if event & selectors.EVENT_WRITE:
|
||||
nw.write()
|
||||
|
||||
def process_nw_read(self, nw: NewsWrapper) -> None:
|
||||
def process_nw_read(self, nw: NewsWrapper, generation: int) -> None:
|
||||
bytes_received: int = 0
|
||||
bytes_pending: int = 0
|
||||
|
||||
while nw.decoder:
|
||||
while nw.decoder and nw.generation == generation:
|
||||
try:
|
||||
n, bytes_pending = nw.read(nbytes=bytes_pending)
|
||||
n, bytes_pending = nw.read(nbytes=bytes_pending, generation=generation)
|
||||
bytes_received += n
|
||||
except ssl.SSLWantReadError:
|
||||
return
|
||||
@@ -768,6 +775,10 @@ class Downloader(Thread):
|
||||
if not bytes_pending:
|
||||
break
|
||||
|
||||
# Ignore metrics for reset connections
|
||||
if nw.generation != generation:
|
||||
return
|
||||
|
||||
server = nw.server
|
||||
|
||||
with DOWNLOADER_LOCK:
|
||||
|
||||
@@ -74,12 +74,14 @@ class NewsWrapper:
|
||||
"_response_queue",
|
||||
"selector_events",
|
||||
"lock",
|
||||
"generation",
|
||||
)
|
||||
|
||||
def __init__(self, server, thrdnum, block=False):
|
||||
def __init__(self, server: "sabnzbd.downloader.Server", thrdnum: int, block: bool = False, generation: int = 0):
|
||||
self.server: sabnzbd.downloader.Server = server
|
||||
self.thrdnum: int = thrdnum
|
||||
self.blocking: bool = block
|
||||
self.generation: int = generation
|
||||
|
||||
self.timeout: Optional[float] = None
|
||||
|
||||
@@ -282,12 +284,17 @@ class NewsWrapper:
|
||||
self,
|
||||
nbytes: int = 0,
|
||||
on_response: Optional[Callable[[int, str], None]] = None,
|
||||
generation: Optional[int] = None,
|
||||
) -> Tuple[int, Optional[int]]:
|
||||
"""Receive data, return #bytes, #pendingbytes
|
||||
:param nbytes: maximum number of bytes to read
|
||||
:param on_response: callback for each complete response received
|
||||
:param generation: expected reset generation
|
||||
:return: #bytes, #pendingbytes
|
||||
"""
|
||||
if generation is None:
|
||||
generation = self.generation
|
||||
|
||||
# NewsWrapper is being reset
|
||||
if not self.decoder:
|
||||
return 0, None
|
||||
@@ -308,7 +315,12 @@ class NewsWrapper:
|
||||
|
||||
self.decoder.process(bytes_recv)
|
||||
for response in self.decoder:
|
||||
if self.generation != generation:
|
||||
break
|
||||
with self.lock:
|
||||
# Re-check under lock to avoid racing with hard_reset
|
||||
if self.generation != generation or not self._response_queue:
|
||||
break
|
||||
article = self._response_queue.popleft()
|
||||
if on_response:
|
||||
on_response(response.status_code, response.message)
|
||||
@@ -419,8 +431,9 @@ class NewsWrapper:
|
||||
self.nntp.close(send_quit=self.connected)
|
||||
self.nntp = None
|
||||
|
||||
# Reset all variables (including the NNTP connection)
|
||||
self.__init__(self.server, self.thrdnum)
|
||||
with self.lock:
|
||||
# Reset all variables (including the NNTP connection) and increment the generation counter
|
||||
self.__init__(self.server, self.thrdnum, generation=self.generation + 1)
|
||||
|
||||
# Wait before re-using this newswrapper
|
||||
if wait:
|
||||
|
||||
Reference in New Issue
Block a user