Create PID file locking for EXO (#2072)

## Motivation

EXO should be PID file locked, to prevent duplicate processes from
clobbering the log, right now this isn't the case.

## Changes

I added a wrapper around a Rust PID file lock library, and used it to
implement PID locking for EXO, with the PID file being in exo cache
directory.

## Test Plan

### Manual Testing
Tested on e11, trying to spawn duplicate EXO processes prevented.
This commit is contained in:
Andrei Cravtov
2026-05-08 18:50:18 +01:00
committed by GitHub
parent fa57131374
commit e5a1e5dadb
11 changed files with 319 additions and 3 deletions

42
Cargo.lock generated
View File

@@ -916,11 +916,13 @@ dependencies = [
"libp2p",
"log",
"networking",
"pidfile-rs",
"pin-project",
"pyo3",
"pyo3-async-runtimes",
"pyo3-log",
"pyo3-stub-gen",
"thiserror 2.0.17",
"tokio",
"util",
]
@@ -964,6 +966,16 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]]
name = "flopen"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbfb8b5fbd1f27929f216650081a07b6ceb0741f0542c8c43ff7ef8e93a35a5d"
dependencies = [
"libc",
"nix 0.31.2",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -1789,9 +1801,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.178"
version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]]
name = "libp2p"
@@ -2807,6 +2819,18 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3"
dependencies = [
"bitflags 2.10.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nohash-hasher"
version = "0.2.0"
@@ -3060,6 +3084,18 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pidfile-rs"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1a8aa9a30b1b65ef48b333931b80f2324a14e00208eb2b8f5788f1180791bcc"
dependencies = [
"flopen",
"libc",
"log",
"thiserror 1.0.69",
]
[[package]]
name = "pin-project"
version = "1.1.10"
@@ -3668,7 +3704,7 @@ dependencies = [
"netlink-packet-utils",
"netlink-proto",
"netlink-sys",
"nix",
"nix 0.26.4",
"thiserror 1.0.69",
"tokio",
]

View File

@@ -46,9 +46,12 @@ pyo3-async-runtimes = { version = "0.27.0", features = [
] }
pyo3-log = "0.13.2"
pidfile-rs = "0.3"
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
thiserror = "2.0"
# async runtime
tokio = { workspace = true, features = ["full", "tracing"] }

View File

@@ -2,6 +2,8 @@
# ruff: noqa: E501, F401
import builtins
import os
import pathlib
import typing
@typing.final
@@ -69,6 +71,48 @@ class NoPeersSubscribedToTopicError(builtins.Exception):
def __repr__(self) -> builtins.str: ...
def __str__(self) -> builtins.str: ...
@typing.final
class Pidfile:
r"""
A PID file protected with a lock.
An instance of `Pidfile` can be used to manage a PID file: create it,
lock it, detect already running daemons. It is backed by [`pidfile`][]
functions of `libbsd`/`libutil` which use `flopen` to lock the PID
file.
When a PID file is created, the process ID of the current process is
*not* written there, making it possible to lock the PID file before
forking and only write the ID of the forked process when it is ready.
The PID file is deleted automatically when the `Pidfile` comes out of
the scope. To close the PID file without deleting it, for example, in
the parent process of a forked daemon, call `close()`.
[`exit`]: https://doc.rust-lang.org/std/process/fn.exit.html
[`pidfile`]: https://linux.die.net/man/3/pidfile
[`daemon`(3)]: https://linux.die.net/man/3/daemon
"""
def __new__(cls, path: builtins.str | os.PathLike | pathlib.Path, mode: builtins.int) -> Pidfile:
r"""
Creates a new PID file and locks it.
If the PID file cannot be locked, returns `PidfileError::AlreadyRunning` with
a PID of the already running process, or `None` if no PID has been written to
the PID file yet.
"""
def write(self) -> None:
r"""
Writes the current process ID to the PID file.
The file is truncated before writing.
"""
@typing.final
class PidfileError(builtins.Exception):
def __repr__(self) -> builtins.str: ...
def __str__(self) -> builtins.str: ...
class PyFromSwarm:
@typing.final
class Connection(PyFromSwarm):

View File

@@ -7,9 +7,11 @@
mod allow_threading;
mod ident;
mod networking;
mod pidfile;
use crate::ident::PyKeypair;
use crate::networking::networking_submodule;
use crate::pidfile::pidfile_submodule;
use pyo3::prelude::PyModule;
use pyo3::types::PyModuleMethods;
use pyo3::{Bound, PyResult, pyclass, pymodule};
@@ -164,6 +166,7 @@ fn main_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
// too many importing issues...
m.add_class::<PyKeypair>()?;
networking_submodule(m)?;
pidfile_submodule(m)?;
// top-level constructs
// TODO: ...

View File

@@ -0,0 +1,87 @@
use pidfile_rs::{Pidfile, PidfileError};
use pyo3::exceptions::PyException;
use pyo3::prelude::{PyModule, PyModuleMethods};
use pyo3::{Bound, PyErr, PyResult, Python, pyclass, pymethods};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::fs::Permissions;
use std::os::unix::prelude::PermissionsExt;
use std::path::PathBuf;
#[gen_stub_pyclass]
#[pyclass(frozen, extends=PyException, name="PidfileError")]
pub struct PyPidfileError(PidfileError);
impl PyPidfileError {
// TODO: I actually like this pattern a LOT more but how to abstract??
fn into_pyerr(self, py: Python) -> PyErr {
match Bound::new(py, self) {
Ok(err) => PyErr::from_value(err.into_any()),
Err(err) => err,
}
}
}
#[gen_stub_pymethods]
#[pymethods]
impl PyPidfileError {
fn __repr__(&self) -> String {
format!("PidfileError(\"{}\")", self.0)
}
fn __str__(&self) -> String {
self.0.to_string()
}
}
/// A PID file protected with a lock.
///
/// An instance of `Pidfile` can be used to manage a PID file: create it,
/// lock it, detect already running daemons. It is backed by [`pidfile`][]
/// functions of `libbsd`/`libutil` which use `flopen` to lock the PID
/// file.
///
/// When a PID file is created, the process ID of the current process is
/// *not* written there, making it possible to lock the PID file before
/// forking and only write the ID of the forked process when it is ready.
///
/// The PID file is deleted automatically when the `Pidfile` comes out of
/// the scope. To close the PID file without deleting it, for example, in
/// the parent process of a forked daemon, call `close()`.
///
/// [`exit`]: https://doc.rust-lang.org/std/process/fn.exit.html
/// [`pidfile`]: https://linux.die.net/man/3/pidfile
/// [`daemon`(3)]: https://linux.die.net/man/3/daemon
#[gen_stub_pyclass]
#[pyclass(name = "Pidfile")]
pub struct PyPidfile(Pidfile);
#[gen_stub_pymethods]
#[pymethods]
impl PyPidfile {
/// Creates a new PID file and locks it.
///
/// If the PID file cannot be locked, returns `PidfileError::AlreadyRunning` with
/// a PID of the already running process, or `None` if no PID has been written to
/// the PID file yet.
#[new]
fn py_new(py: Python, path: PathBuf, mode: u32) -> PyResult<Self> {
Ok(Self(
Pidfile::new(&path, Permissions::from_mode(mode))
.map_err(|e| PyPidfileError(e).into_pyerr(py))?,
))
}
/// Writes the current process ID to the PID file.
///
/// The file is truncated before writing.
fn write<'py>(&mut self, py: Python<'py>) -> PyResult<()> {
self.0.write().map_err(|e| PyPidfileError(e).into_pyerr(py))
}
}
pub fn pidfile_submodule(m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<PyPidfileError>()?;
m.add_class::<PyPidfile>()?;
Ok(())
}

View File

@@ -1,10 +1,12 @@
import asyncio
import pytest
from _pytest.capture import CaptureFixture
from exo_pyo3_bindings import (
Keypair,
NetworkingHandle,
NoPeersSubscribedToTopicError,
Pidfile,
PyFromSwarm,
)
@@ -26,6 +28,13 @@ async def test_sleep_on_multiple_items() -> None:
print("caught it", e)
def test_pidfile(capsys: CaptureFixture[str]):
with capsys.disabled():
print("\nbefore python")
scoped_lock_file()
print("after python")
async def _await_recv(h: NetworkingHandle):
while True:
event = await h.recv()
@@ -34,3 +43,7 @@ async def _await_recv(h: NetworkingHandle):
print(f"PYTHON: connection update: {c}")
case PyFromSwarm.Message() as m:
print(f"PYTHON: message: {m}")
def scoped_lock_file():
a = Pidfile("/tmp/lock.pid", 0o0600)

View File

@@ -3,6 +3,7 @@ import multiprocessing as mp
import os
import resource
import signal
import sys
from dataclasses import dataclass, field
from typing import Self
@@ -22,6 +23,7 @@ from exo.shared.election import Election, ElectionResult
from exo.shared.logging import logger_cleanup, logger_setup
from exo.shared.types.common import NodeId, SessionId
from exo.utils.channels import Receiver, channel
from exo.utils.pidfile import PidfileLockError, acquire_exo_pidfile
from exo.utils.pydantic_ext import FrozenModel
from exo.utils.task_group import TaskGroup
from exo.worker.main import Worker
@@ -264,12 +266,20 @@ class Node:
def main():
# Exit early if no PID file (not compatible with double-for daemonization yet)
try:
pidfile = acquire_exo_pidfile()
except PidfileLockError as exception:
print(exception, file=sys.stderr)
raise SystemExit(1) from exception
args = Args.parse()
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
target = min(max(soft, 65535), hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
mp.set_start_method("spawn", force=True)
# TODO: Refactor the current verbosity system
logger_setup(EXO_LOG, args.verbosity)
logger.info(f"{'=' * 40}")
@@ -306,6 +316,7 @@ def main():
finally:
logger.info("EXO Shutdown complete")
logger_cleanup()
del pidfile
class Args(FrozenModel):

View File

@@ -69,6 +69,7 @@ DASHBOARD_DIR = (
EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log"
EXO_LOG = EXO_LOG_DIR / "exo.log"
EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log"
EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid"
# Identity (config)
EXO_NODE_ID_KEYPAIR = EXO_CONFIG_HOME / "node_id.keypair"

26
src/exo/utils/pidfile.py Normal file
View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Final
from exo_pyo3_bindings import Pidfile, PidfileError
from exo.shared.constants import EXO_PID_FILE
_PIDFILE_MODE: Final = 0o600
class PidfileLockError(RuntimeError):
pass
def acquire_exo_pidfile() -> Pidfile:
path = EXO_PID_FILE
try:
pidfile = Pidfile(path, _PIDFILE_MODE)
pidfile.write()
except (OSError, PidfileError) as exception:
raise PidfileLockError(
f"Failed to acquire EXO pidfile at {path}: {exception}"
) from exception
return pidfile

View File

@@ -0,0 +1,8 @@
import multiprocessing as mp
import pytest
@pytest.fixture(scope="session", autouse=True)
def mp_force_spawn():
mp.set_start_method("spawn", force=True)

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import gc
import os
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import Final
import pytest
import exo.utils.pidfile as pidfile
from exo.utils.pidfile import acquire_exo_pidfile
_CHILD_ACQUIRE_PIDFILE_SCRIPT: Final = textwrap.dedent(
"""
import sys
from pathlib import Path
from unittest.mock import patch
import exo.utils.pidfile as pidfile
from exo.utils.pidfile import PidfileLockError, acquire_exo_pidfile
with patch.object(pidfile, "EXO_PID_FILE", Path(sys.argv[1])):
try:
handle = acquire_exo_pidfile()
except PidfileLockError as exception:
print(str(exception))
raise SystemExit(73) from exception
del handle
"""
)
def _use_pidfile_path(monkeypatch: pytest.MonkeyPatch, path: Path) -> None:
monkeypatch.setattr(pidfile, "EXO_PID_FILE", path)
def _run_child_acquire_pidfile(path: Path) -> subprocess.CompletedProcess[str]:
return subprocess.run(
[sys.executable, "-c", _CHILD_ACQUIRE_PIDFILE_SCRIPT, str(path)],
check=False,
capture_output=True,
text=True,
)
def test_acquire_exo_pidfile_writes_current_pid_and_removes_on_drop(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
path = tmp_path / "exo.pid"
_use_pidfile_path(monkeypatch, path)
handle = acquire_exo_pidfile()
assert path.read_text() == str(os.getpid())
del handle
gc.collect()
assert not path.exists()
def test_acquire_exo_pidfile_rejects_second_process(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
path = tmp_path / "exo.pid"
_use_pidfile_path(monkeypatch, path)
handle = acquire_exo_pidfile()
try:
blocked_child = _run_child_acquire_pidfile(path)
assert blocked_child.returncode == 73
assert "Failed to acquire EXO pidfile" in blocked_child.stdout
finally:
del handle
gc.collect()
unblocked_child = _run_child_acquire_pidfile(path)
assert unblocked_child.returncode == 0
assert unblocked_child.stdout == ""