mirror of
https://github.com/sabnzbd/sabnzbd.git
synced 2025-12-30 11:09:22 -05:00
Compare commits
1 Commits
develop
...
feature/bl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d21033568 |
@@ -16,7 +16,8 @@
|
||||
|
||||
5. **Request scheduling & pipelining**
|
||||
- `write()` chooses the next article command (`STAT/HEAD` for precheck, `BODY` or `ARTICLE` otherwise).
|
||||
- Concurrency is limited by `server.pipelining_requests`; commands are queued and sent; partial writes are buffered.
|
||||
- Concurrency is limited by `server.pipelining_requests`; commands are queued and sent with `sock.sendall`, so there is no local send buffer. A `BlockingIOError/SSLWantWriteError` simply requeues the same request for the next selector cycle.
|
||||
- Sockets stay registered for `EVENT_WRITE`: without write readiness events, a temporarily full kernel send buffer could stall queued commands when there is nothing to read, so WRITE interest is needed to resume sending promptly.
|
||||
|
||||
6. **Receiving data**
|
||||
- Selector events route to `process_nw_read`; `NewsWrapper.read` pulls bytes (SSL optimized via sabctools), parses NNTP responses, and calls `on_response`.
|
||||
|
||||
@@ -60,7 +60,6 @@ class NewsWrapper:
|
||||
"blocking",
|
||||
"timeout",
|
||||
"decoder",
|
||||
"send_buffer",
|
||||
"nntp",
|
||||
"connected",
|
||||
"user_sent",
|
||||
@@ -86,7 +85,6 @@ class NewsWrapper:
|
||||
self.timeout: Optional[float] = None
|
||||
|
||||
self.decoder: Optional[sabctools.Decoder] = None
|
||||
self.send_buffer = b""
|
||||
|
||||
self.nntp: Optional[NNTP] = None
|
||||
|
||||
@@ -338,13 +336,6 @@ class NewsWrapper:
|
||||
server = self.server
|
||||
|
||||
try:
|
||||
# First, try to flush any remaining data
|
||||
if self.send_buffer:
|
||||
sent = self.nntp.sock.send(self.send_buffer)
|
||||
self.send_buffer = self.send_buffer[sent:]
|
||||
if self.send_buffer:
|
||||
# Still unsent data, wait for next EVENT_WRITE
|
||||
return
|
||||
|
||||
if self.connected:
|
||||
if (
|
||||
@@ -365,7 +356,7 @@ class NewsWrapper:
|
||||
self.next_request = None
|
||||
|
||||
# If no pending buffer, try to send new command
|
||||
if not self.send_buffer and self.next_request:
|
||||
if self.next_request:
|
||||
if self.concurrent_requests.acquire(blocking=False):
|
||||
command, article = self.next_request
|
||||
self.next_request = None
|
||||
@@ -375,25 +366,24 @@ class NewsWrapper:
|
||||
self.discard(article, count_article_try=False, retry_article=True)
|
||||
self.concurrent_requests.release()
|
||||
return
|
||||
self._response_queue.append(article)
|
||||
|
||||
if sabnzbd.LOG_ALL:
|
||||
logging.debug("Thread %s@%s: %s", self.thrdnum, server.host, command)
|
||||
try:
|
||||
sent = self.nntp.sock.send(command)
|
||||
if sent < len(command):
|
||||
# Partial send, store remainder
|
||||
self.send_buffer = command[sent:]
|
||||
self.nntp.sock.sendall(command)
|
||||
self._response_queue.append(article)
|
||||
except (BlockingIOError, ssl.SSLWantWriteError):
|
||||
# Can't send now, store full command
|
||||
self.send_buffer = command
|
||||
# Couldn't send now, try again later
|
||||
self.concurrent_requests.release()
|
||||
self.next_request = (command, article)
|
||||
return
|
||||
else:
|
||||
# Concurrency limit reached
|
||||
sabnzbd.Downloader.modify_socket(self, EVENT_READ)
|
||||
else:
|
||||
# Is it safe to shut down this socket?
|
||||
if (
|
||||
not self.send_buffer
|
||||
and not self.next_request
|
||||
not self.next_request
|
||||
and not self._response_queue
|
||||
and (not server.active or server.restart or not self.timeout or time.time() > self.timeout)
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user