Compare commits

...

1 Commits

Author SHA1 Message Date
Safihre
8d21033568 Use blocking writes instead of buffering 2025-12-29 12:43:58 +01:00
2 changed files with 11 additions and 20 deletions

View File

@@ -16,7 +16,8 @@
5. **Request scheduling & pipelining**
- `write()` chooses the next article command (`STAT/HEAD` for precheck, `BODY` or `ARTICLE` otherwise).
- Concurrency is limited by `server.pipelining_requests`; commands are queued and sent; partial writes are buffered.
- Concurrency is limited by `server.pipelining_requests`; commands are queued and sent with `sock.sendall`, so there is no local send buffer. A `BlockingIOError/SSLWantWriteError` simply requeues the same request for the next selector cycle.
- Sockets stay registered for `EVENT_WRITE`: without write readiness events, a temporarily full kernel send buffer could stall queued commands when there is nothing to read, so WRITE interest is needed to resume sending promptly.
6. **Receiving data**
- Selector events route to `process_nw_read`; `NewsWrapper.read` pulls bytes (SSL optimized via sabctools), parses NNTP responses, and calls `on_response`.

View File

@@ -60,7 +60,6 @@ class NewsWrapper:
"blocking",
"timeout",
"decoder",
"send_buffer",
"nntp",
"connected",
"user_sent",
@@ -86,7 +85,6 @@ class NewsWrapper:
self.timeout: Optional[float] = None
self.decoder: Optional[sabctools.Decoder] = None
self.send_buffer = b""
self.nntp: Optional[NNTP] = None
@@ -338,13 +336,6 @@ class NewsWrapper:
server = self.server
try:
# First, try to flush any remaining data
if self.send_buffer:
sent = self.nntp.sock.send(self.send_buffer)
self.send_buffer = self.send_buffer[sent:]
if self.send_buffer:
# Still unsent data, wait for next EVENT_WRITE
return
if self.connected:
if (
@@ -365,7 +356,7 @@ class NewsWrapper:
self.next_request = None
# If no pending buffer, try to send new command
if not self.send_buffer and self.next_request:
if self.next_request:
if self.concurrent_requests.acquire(blocking=False):
command, article = self.next_request
self.next_request = None
@@ -375,25 +366,24 @@ class NewsWrapper:
self.discard(article, count_article_try=False, retry_article=True)
self.concurrent_requests.release()
return
self._response_queue.append(article)
if sabnzbd.LOG_ALL:
logging.debug("Thread %s@%s: %s", self.thrdnum, server.host, command)
try:
sent = self.nntp.sock.send(command)
if sent < len(command):
# Partial send, store remainder
self.send_buffer = command[sent:]
self.nntp.sock.sendall(command)
self._response_queue.append(article)
except (BlockingIOError, ssl.SSLWantWriteError):
# Can't send now, store full command
self.send_buffer = command
# Couldn't send now, try again later
self.concurrent_requests.release()
self.next_request = (command, article)
return
else:
# Concurrency limit reached
sabnzbd.Downloader.modify_socket(self, EVENT_READ)
else:
# Is it safe to shut down this socket?
if (
not self.send_buffer
and not self.next_request
not self.next_request
and not self._response_queue
and (not server.active or server.restart or not self.timeout or time.time() > self.timeout)
):