mirror of
https://github.com/exo-explore/exo.git
synced 2026-02-18 14:55:13 -05:00
Compare commits
1 Commits
handle-mes
...
fix/mp-rec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
faac462d9a |
@@ -126,37 +126,11 @@ final class ExoProcessController: ObservableObject {
|
||||
return
|
||||
}
|
||||
process.terminationHandler = nil
|
||||
status = .stopped
|
||||
|
||||
guard process.isRunning else {
|
||||
self.process = nil
|
||||
return
|
||||
if process.isRunning {
|
||||
process.terminate()
|
||||
}
|
||||
|
||||
let proc = process
|
||||
self.process = nil
|
||||
|
||||
Task.detached {
|
||||
proc.interrupt()
|
||||
|
||||
for _ in 0..<50 {
|
||||
if !proc.isRunning { return }
|
||||
try? await Task.sleep(nanoseconds: 100_000_000)
|
||||
}
|
||||
|
||||
if proc.isRunning {
|
||||
proc.terminate()
|
||||
}
|
||||
|
||||
for _ in 0..<30 {
|
||||
if !proc.isRunning { return }
|
||||
try? await Task.sleep(nanoseconds: 100_000_000)
|
||||
}
|
||||
|
||||
if proc.isRunning {
|
||||
kill(proc.processIdentifier, SIGKILL)
|
||||
}
|
||||
}
|
||||
status = .stopped
|
||||
}
|
||||
|
||||
func restart() {
|
||||
|
||||
@@ -136,8 +136,6 @@ class Node:
|
||||
|
||||
async def run(self):
|
||||
async with self._tg as tg:
|
||||
signal.signal(signal.SIGINT, lambda _, __: self.shutdown())
|
||||
signal.signal(signal.SIGTERM, lambda _, __: self.shutdown())
|
||||
tg.start_soon(self.router.run)
|
||||
tg.start_soon(self.election.run)
|
||||
if self.download_coordinator:
|
||||
@@ -149,6 +147,8 @@ class Node:
|
||||
if self.api:
|
||||
tg.start_soon(self.api.run)
|
||||
tg.start_soon(self._elect_loop)
|
||||
signal.signal(signal.SIGINT, lambda _, __: self.shutdown())
|
||||
signal.signal(signal.SIGTERM, lambda _, __: self.shutdown())
|
||||
|
||||
def shutdown(self):
|
||||
# if this is our second call to shutdown, just sys.exit
|
||||
|
||||
@@ -211,14 +211,6 @@ class Router:
|
||||
pass
|
||||
except AllQueuesFullError:
|
||||
logger.warning(f"All peer queues full, dropping message on {topic}")
|
||||
except RuntimeError as e:
|
||||
if "MessageTooLarge" in str(e):
|
||||
logger.error(
|
||||
f"Message too large for gossipsub on topic {topic} "
|
||||
f"({len(data)} bytes), dropping message"
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def get_node_id_keypair(
|
||||
|
||||
@@ -204,6 +204,10 @@ class MpReceiver[T]:
|
||||
def close(self) -> None:
|
||||
if not self._state.closed.is_set():
|
||||
self._state.closed.set()
|
||||
try: # noqa: SIM105
|
||||
self._state.buffer.put_nowait(_MpEndOfStream())
|
||||
except Exception:
|
||||
pass
|
||||
self._state.buffer.close()
|
||||
|
||||
# == unique to Mp channels ==
|
||||
|
||||
Reference in New Issue
Block a user