Compare commits

...

2 Commits

Author SHA1 Message Date
Alex Cheema
21c363e997 fix: move suppress(ClosedResourceError) inside runner.shutdown() per review
Move the ClosedResourceError suppression from the two call sites in
worker/main.py into RunnerSupervisor.shutdown() itself, so each
close/send on already-closed channels is individually guarded.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:01:54 -08:00
Alex Cheema
b1c0e3116d fix: misc bug fixes (spawn force, download restart, shutdown guard)
Three independent fixes extracted from meta-instance branch (#1519):

- Use force=True for mp.set_start_method("spawn") to prevent errors
  when the start method was already set by another initialization path
- Detect already-complete downloads on restart instead of reporting them
  as DownloadPending (checks downloaded_bytes >= total_bytes)
- Guard runner.shutdown() with contextlib.suppress(ClosedResourceError)
  to handle already-closed resources during worker teardown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 05:37:11 -08:00
3 changed files with 22 additions and 7 deletions

View File

@@ -338,7 +338,17 @@ class DownloadCoordinator:
),
)
elif progress.status in ["in_progress", "not_started"]:
if progress.downloaded_bytes_this_session.in_bytes == 0:
if (
progress.downloaded_bytes.in_bytes
>= progress.total_bytes.in_bytes
> 0
):
status = DownloadCompleted(
node_id=self.node_id,
shard_metadata=progress.shard,
total_bytes=progress.total_bytes,
)
elif progress.downloaded_bytes_this_session.in_bytes == 0:
status = DownloadPending(
node_id=self.node_id,
shard_metadata=progress.shard,

View File

@@ -258,7 +258,7 @@ def main():
target = min(max(soft, 65535), hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
mp.set_start_method("spawn")
mp.set_start_method("spawn", force=True)
# TODO: Refactor the current verbosity system
logger_setup(EXO_LOG, args.verbosity)
logger.info("Starting EXO")

View File

@@ -98,11 +98,16 @@ class RunnerSupervisor:
def shutdown(self):
logger.info("Runner supervisor shutting down")
self._ev_recv.close()
self._task_sender.close()
self._event_sender.close()
self._cancel_sender.send(TaskId("CANCEL_CURRENT_TASK"))
self._cancel_sender.close()
with contextlib.suppress(ClosedResourceError):
self._ev_recv.close()
with contextlib.suppress(ClosedResourceError):
self._task_sender.close()
with contextlib.suppress(ClosedResourceError):
self._event_sender.close()
with contextlib.suppress(ClosedResourceError):
self._cancel_sender.send(TaskId("CANCEL_CURRENT_TASK"))
with contextlib.suppress(ClosedResourceError):
self._cancel_sender.close()
self.runner_process.join(5)
if not self.runner_process.is_alive():
logger.info("Runner process succesfully terminated")