From e5a1e5dadbfec051ab23fc7a5cb66c8fc788ad5d Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 8 May 2026 18:50:18 +0100 Subject: [PATCH] Create PID file locking for EXO (#2072) ## Motivation EXO should be PID file locked, to prevent duplicate processes from clobbering the log, right now this isn't the case. ## Changes I added a wrapper around a Rust PID file lock library, and used it to implement PID locking for EXO, with the PID file being in exo cache directory. ## Test Plan ### Manual Testing Tested on e11, trying to spawn duplicate EXO processes prevented. --- Cargo.lock | 42 +++++++++- rust/exo_pyo3_bindings/Cargo.toml | 3 + rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi | 44 ++++++++++ rust/exo_pyo3_bindings/src/lib.rs | 3 + rust/exo_pyo3_bindings/src/pidfile.rs | 87 ++++++++++++++++++++ rust/exo_pyo3_bindings/tests/test_python.py | 13 +++ src/exo/main.py | 11 +++ src/exo/shared/constants.py | 1 + src/exo/utils/pidfile.py | 26 ++++++ src/exo/utils/tests/conftest.py | 8 ++ src/exo/utils/tests/test_pidfile.py | 84 +++++++++++++++++++ 11 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 rust/exo_pyo3_bindings/src/pidfile.rs create mode 100644 src/exo/utils/pidfile.py create mode 100644 src/exo/utils/tests/conftest.py create mode 100644 src/exo/utils/tests/test_pidfile.py diff --git a/Cargo.lock b/Cargo.lock index 96819c821..d0ab25d7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -916,11 +916,13 @@ dependencies = [ "libp2p", "log", "networking", + "pidfile-rs", "pin-project", "pyo3", "pyo3-async-runtimes", "pyo3-log", "pyo3-stub-gen", + "thiserror 2.0.17", "tokio", "util", ] @@ -964,6 +966,16 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +[[package]] +name = "flopen" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fbfb8b5fbd1f27929f216650081a07b6ceb0741f0542c8c43ff7ef8e93a35a5d" +dependencies = [ + "libc", + "nix 0.31.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1789,9 +1801,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.178" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libp2p" @@ -2807,6 +2819,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -3060,6 +3084,18 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pidfile-rs" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1a8aa9a30b1b65ef48b333931b80f2324a14e00208eb2b8f5788f1180791bcc" +dependencies = [ + "flopen", + "libc", + "log", + "thiserror 1.0.69", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -3668,7 +3704,7 @@ dependencies = [ "netlink-packet-utils", "netlink-proto", "netlink-sys", - "nix", + "nix 0.26.4", "thiserror 1.0.69", "tokio", ] diff --git a/rust/exo_pyo3_bindings/Cargo.toml b/rust/exo_pyo3_bindings/Cargo.toml index e7577ab79..143e4d1fd 100644 --- a/rust/exo_pyo3_bindings/Cargo.toml +++ b/rust/exo_pyo3_bindings/Cargo.toml @@ -46,9 +46,12 @@ pyo3-async-runtimes = { version = "0.27.0", features = [ ] } pyo3-log = "0.13.2" +pidfile-rs = "0.3" + # macro dependencies extend = { workspace = true } delegate = { workspace = true } +thiserror = "2.0" # async runtime tokio = { workspace = true, features = ["full", "tracing"] } diff --git 
a/rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi b/rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi index bfd8978af..e7c423f03 100644 --- a/rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi +++ b/rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi @@ -2,6 +2,8 @@ # ruff: noqa: E501, F401 import builtins +import os +import pathlib import typing @typing.final @@ -69,6 +71,48 @@ class NoPeersSubscribedToTopicError(builtins.Exception): def __repr__(self) -> builtins.str: ... def __str__(self) -> builtins.str: ... +@typing.final +class Pidfile: + r""" + A PID file protected with a lock. + + An instance of `Pidfile` can be used to manage a PID file: create it, + lock it, detect already running daemons. It is backed by [`pidfile`][] + functions of `libbsd`/`libutil` which use `flopen` to lock the PID + file. + + When a PID file is created, the process ID of the current process is + *not* written there, making it possible to lock the PID file before + forking and only write the ID of the forked process when it is ready. + + The PID file is deleted automatically when the `Pidfile` comes out of + the scope. To close the PID file without deleting it, for example, in + the parent process of a forked daemon, call `close()`. + + [`exit`]: https://doc.rust-lang.org/std/process/fn.exit.html + [`pidfile`]: https://linux.die.net/man/3/pidfile + [`daemon`(3)]: https://linux.die.net/man/3/daemon + """ + def __new__(cls, path: builtins.str | os.PathLike | pathlib.Path, mode: builtins.int) -> Pidfile: + r""" + Creates a new PID file and locks it. + + If the PID file cannot be locked, returns `PidfileError::AlreadyRunning` with + a PID of the already running process, or `None` if no PID has been written to + the PID file yet. + """ + def write(self) -> None: + r""" + Writes the current process ID to the PID file. + + The file is truncated before writing. + """ + +@typing.final +class PidfileError(builtins.Exception): + def __repr__(self) -> builtins.str: ... + def __str__(self) -> builtins.str: ... 
+ class PyFromSwarm: @typing.final class Connection(PyFromSwarm): diff --git a/rust/exo_pyo3_bindings/src/lib.rs b/rust/exo_pyo3_bindings/src/lib.rs index e22afdeb2..18a147f4c 100644 --- a/rust/exo_pyo3_bindings/src/lib.rs +++ b/rust/exo_pyo3_bindings/src/lib.rs @@ -7,9 +7,11 @@ mod allow_threading; mod ident; mod networking; +mod pidfile; use crate::ident::PyKeypair; use crate::networking::networking_submodule; +use crate::pidfile::pidfile_submodule; use pyo3::prelude::PyModule; use pyo3::types::PyModuleMethods; use pyo3::{Bound, PyResult, pyclass, pymodule}; @@ -164,6 +166,7 @@ fn main_module(m: &Bound<'_, PyModule>) -> PyResult<()> { // too many importing issues... m.add_class::()?; networking_submodule(m)?; + pidfile_submodule(m)?; // top-level constructs // TODO: ... diff --git a/rust/exo_pyo3_bindings/src/pidfile.rs b/rust/exo_pyo3_bindings/src/pidfile.rs new file mode 100644 index 000000000..32e8d7f79 --- /dev/null +++ b/rust/exo_pyo3_bindings/src/pidfile.rs @@ -0,0 +1,87 @@ +use pidfile_rs::{Pidfile, PidfileError}; +use pyo3::exceptions::PyException; +use pyo3::prelude::{PyModule, PyModuleMethods}; +use pyo3::{Bound, PyErr, PyResult, Python, pyclass, pymethods}; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +use std::fs::Permissions; +use std::os::unix::prelude::PermissionsExt; +use std::path::PathBuf; + +#[gen_stub_pyclass] +#[pyclass(frozen, extends=PyException, name="PidfileError")] +pub struct PyPidfileError(PidfileError); + +impl PyPidfileError { + // TODO: I actually like this pattern a LOT more but how to abstract?? + fn into_pyerr(self, py: Python) -> PyErr { + match Bound::new(py, self) { + Ok(err) => PyErr::from_value(err.into_any()), + Err(err) => err, + } + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl PyPidfileError { + fn __repr__(&self) -> String { + format!("PidfileError(\"{}\")", self.0) + } + + fn __str__(&self) -> String { + self.0.to_string() + } +} + +/// A PID file protected with a lock. 
+/// +/// An instance of `Pidfile` can be used to manage a PID file: create it, +/// lock it, detect already running daemons. It is backed by [`pidfile`][] +/// functions of `libbsd`/`libutil` which use `flopen` to lock the PID +/// file. +/// +/// When a PID file is created, the process ID of the current process is +/// *not* written there, making it possible to lock the PID file before +/// forking and only write the ID of the forked process when it is ready. +/// +/// The PID file is deleted automatically when the `Pidfile` comes out of +/// the scope. To close the PID file without deleting it, for example, in +/// the parent process of a forked daemon, call `close()`. +/// +/// [`exit`]: https://doc.rust-lang.org/std/process/fn.exit.html +/// [`pidfile`]: https://linux.die.net/man/3/pidfile +/// [`daemon`(3)]: https://linux.die.net/man/3/daemon +#[gen_stub_pyclass] +#[pyclass(name = "Pidfile")] +pub struct PyPidfile(Pidfile); + +#[gen_stub_pymethods] +#[pymethods] +impl PyPidfile { + /// Creates a new PID file and locks it. + /// + /// If the PID file cannot be locked, returns `PidfileError::AlreadyRunning` with + /// a PID of the already running process, or `None` if no PID has been written to + /// the PID file yet. + #[new] + fn py_new(py: Python, path: PathBuf, mode: u32) -> PyResult { + Ok(Self( + Pidfile::new(&path, Permissions::from_mode(mode)) + .map_err(|e| PyPidfileError(e).into_pyerr(py))?, + )) + } + + /// Writes the current process ID to the PID file. + /// + /// The file is truncated before writing. 
+ fn write<'py>(&mut self, py: Python<'py>) -> PyResult<()> { + self.0.write().map_err(|e| PyPidfileError(e).into_pyerr(py)) + } +} + +pub fn pidfile_submodule(m: &Bound) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + + Ok(()) +} diff --git a/rust/exo_pyo3_bindings/tests/test_python.py b/rust/exo_pyo3_bindings/tests/test_python.py index a653103d1..ed65f4298 100644 --- a/rust/exo_pyo3_bindings/tests/test_python.py +++ b/rust/exo_pyo3_bindings/tests/test_python.py @@ -1,10 +1,12 @@ import asyncio import pytest +from _pytest.capture import CaptureFixture from exo_pyo3_bindings import ( Keypair, NetworkingHandle, NoPeersSubscribedToTopicError, + Pidfile, PyFromSwarm, ) @@ -26,6 +28,13 @@ async def test_sleep_on_multiple_items() -> None: print("caught it", e) +def test_pidfile(capsys: CaptureFixture[str]): + with capsys.disabled(): + print("\nbefore python") + scoped_lock_file() + print("after python") + + async def _await_recv(h: NetworkingHandle): while True: event = await h.recv() @@ -34,3 +43,7 @@ async def _await_recv(h: NetworkingHandle): print(f"PYTHON: connection update: {c}") case PyFromSwarm.Message() as m: print(f"PYTHON: message: {m}") + + +def scoped_lock_file(): + a = Pidfile("/tmp/lock.pid", 0o0600) diff --git a/src/exo/main.py b/src/exo/main.py index 30c54e292..520f2e2fb 100644 --- a/src/exo/main.py +++ b/src/exo/main.py @@ -3,6 +3,7 @@ import multiprocessing as mp import os import resource import signal +import sys from dataclasses import dataclass, field from typing import Self @@ -22,6 +23,7 @@ from exo.shared.election import Election, ElectionResult from exo.shared.logging import logger_cleanup, logger_setup from exo.shared.types.common import NodeId, SessionId from exo.utils.channels import Receiver, channel +from exo.utils.pidfile import PidfileLockError, acquire_exo_pidfile from exo.utils.pydantic_ext import FrozenModel from exo.utils.task_group import TaskGroup from exo.worker.main import Worker @@ -264,12 +266,20 @@ class Node: def 
main(): + # Exit early if no PID file (not compatible with double-fork daemonization yet) + try: + pidfile = acquire_exo_pidfile() + except PidfileLockError as exception: + print(exception, file=sys.stderr) + raise SystemExit(1) from exception + args = Args.parse() soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) target = min(max(soft, 65535), hard) resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard)) mp.set_start_method("spawn", force=True) + # TODO: Refactor the current verbosity system logger_setup(EXO_LOG, args.verbosity) logger.info(f"{'=' * 40}") @@ -306,6 +316,7 @@ def main(): finally: logger.info("EXO Shutdown complete") logger_cleanup() + del pidfile class Args(FrozenModel): diff --git a/src/exo/shared/constants.py b/src/exo/shared/constants.py index d79354184..bd1c537bf 100644 --- a/src/exo/shared/constants.py +++ b/src/exo/shared/constants.py @@ -69,6 +69,7 @@ DASHBOARD_DIR = ( EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log" EXO_LOG = EXO_LOG_DIR / "exo.log" EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log" +EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid" # Identity (config) EXO_NODE_ID_KEYPAIR = EXO_CONFIG_HOME / "node_id.keypair" diff --git a/src/exo/utils/pidfile.py b/src/exo/utils/pidfile.py new file mode 100644 index 000000000..54b76e940 --- /dev/null +++ b/src/exo/utils/pidfile.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Final + +from exo_pyo3_bindings import Pidfile, PidfileError + +from exo.shared.constants import EXO_PID_FILE + +_PIDFILE_MODE: Final = 0o600 + + +class PidfileLockError(RuntimeError): + pass + + +def acquire_exo_pidfile() -> Pidfile: + path = EXO_PID_FILE + try: + pidfile = Pidfile(path, _PIDFILE_MODE) + pidfile.write() + except (OSError, PidfileError) as exception: + raise PidfileLockError( + f"Failed to acquire EXO pidfile at {path}: {exception}" + ) from exception + + return pidfile diff --git a/src/exo/utils/tests/conftest.py b/src/exo/utils/tests/conftest.py new file mode 100644 index 
000000000..a4cae26cc --- /dev/null +++ b/src/exo/utils/tests/conftest.py @@ -0,0 +1,8 @@ +import multiprocessing as mp + +import pytest + + +@pytest.fixture(scope="session", autouse=True) +def mp_force_spawn(): + mp.set_start_method("spawn", force=True) diff --git a/src/exo/utils/tests/test_pidfile.py b/src/exo/utils/tests/test_pidfile.py new file mode 100644 index 000000000..c4fa86698 --- /dev/null +++ b/src/exo/utils/tests/test_pidfile.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import gc +import os +import subprocess +import sys +import textwrap +from pathlib import Path +from typing import Final + +import pytest + +import exo.utils.pidfile as pidfile +from exo.utils.pidfile import acquire_exo_pidfile + +_CHILD_ACQUIRE_PIDFILE_SCRIPT: Final = textwrap.dedent( + """ + import sys + from pathlib import Path + from unittest.mock import patch + + import exo.utils.pidfile as pidfile + from exo.utils.pidfile import PidfileLockError, acquire_exo_pidfile + + with patch.object(pidfile, "EXO_PID_FILE", Path(sys.argv[1])): + try: + handle = acquire_exo_pidfile() + except PidfileLockError as exception: + print(str(exception)) + raise SystemExit(73) from exception + + del handle + """ +) + + +def _use_pidfile_path(monkeypatch: pytest.MonkeyPatch, path: Path) -> None: + monkeypatch.setattr(pidfile, "EXO_PID_FILE", path) + + +def _run_child_acquire_pidfile(path: Path) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, "-c", _CHILD_ACQUIRE_PIDFILE_SCRIPT, str(path)], + check=False, + capture_output=True, + text=True, + ) + + +def test_acquire_exo_pidfile_writes_current_pid_and_removes_on_drop( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + path = tmp_path / "exo.pid" + _use_pidfile_path(monkeypatch, path) + + handle = acquire_exo_pidfile() + assert path.read_text() == str(os.getpid()) + + del handle + gc.collect() + + assert not path.exists() + + +def test_acquire_exo_pidfile_rejects_second_process( + tmp_path: 
Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + path = tmp_path / "exo.pid" + _use_pidfile_path(monkeypatch, path) + + handle = acquire_exo_pidfile() + try: + blocked_child = _run_child_acquire_pidfile(path) + assert blocked_child.returncode == 73 + assert "Failed to acquire EXO pidfile" in blocked_child.stdout + finally: + del handle + gc.collect() + + unblocked_child = _run_child_acquire_pidfile(path) + assert unblocked_child.returncode == 0 + assert unblocked_child.stdout == ""