Compare commits

..

1 Commits

Author SHA1 Message Date
Alex Cheema
faac462d9a fix: unblock MpReceiver.close() to prevent pytest hang on shutdown
MpReceiver.close() only set the closed flag and closed the buffer pipe,
but did not unblock a thread stuck on queue.get() in receive_async().
This caused abandoned threads (from abandon_on_cancel=True) to keep the
Python process alive indefinitely after all tests passed, leading to
6-hour CI timeouts on aarch64-darwin.

Send an _MpEndOfStream sentinel before closing the buffer, mirroring
what MpSender.close() already does, so the blocked get() returns and
the thread can exit cleanly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 10:10:42 -08:00
4 changed files with 9 additions and 39 deletions

View File

@@ -126,37 +126,11 @@ final class ExoProcessController: ObservableObject {
return
}
process.terminationHandler = nil
status = .stopped
guard process.isRunning else {
self.process = nil
return
if process.isRunning {
process.terminate()
}
let proc = process
self.process = nil
Task.detached {
proc.interrupt()
for _ in 0..<50 {
if !proc.isRunning { return }
try? await Task.sleep(nanoseconds: 100_000_000)
}
if proc.isRunning {
proc.terminate()
}
for _ in 0..<30 {
if !proc.isRunning { return }
try? await Task.sleep(nanoseconds: 100_000_000)
}
if proc.isRunning {
kill(proc.processIdentifier, SIGKILL)
}
}
status = .stopped
}
func restart() {

View File

@@ -136,8 +136,6 @@ class Node:
async def run(self):
async with self._tg as tg:
signal.signal(signal.SIGINT, lambda _, __: self.shutdown())
signal.signal(signal.SIGTERM, lambda _, __: self.shutdown())
tg.start_soon(self.router.run)
tg.start_soon(self.election.run)
if self.download_coordinator:
@@ -149,6 +147,8 @@ class Node:
if self.api:
tg.start_soon(self.api.run)
tg.start_soon(self._elect_loop)
signal.signal(signal.SIGINT, lambda _, __: self.shutdown())
signal.signal(signal.SIGTERM, lambda _, __: self.shutdown())
def shutdown(self):
# if this is our second call to shutdown, just sys.exit

View File

@@ -211,14 +211,6 @@ class Router:
pass
except AllQueuesFullError:
logger.warning(f"All peer queues full, dropping message on {topic}")
except RuntimeError as e:
if "MessageTooLarge" in str(e):
logger.error(
f"Message too large for gossipsub on topic {topic} "
f"({len(data)} bytes), dropping message"
)
else:
raise
def get_node_id_keypair(

View File

@@ -204,6 +204,10 @@ class MpReceiver[T]:
def close(self) -> None:
if not self._state.closed.is_set():
self._state.closed.set()
try: # noqa: SIM105
self._state.buffer.put_nowait(_MpEndOfStream())
except Exception:
pass
self._state.buffer.close()
# == unique to Mp channels ==