add forwarder supervisor

Co-authored-by: Gelu Vrabie <gelu@exolabs.net>
This commit is contained in:
Gelu Vrabie
2025-07-21 20:21:43 +01:00
committed by GitHub
parent bae58dd368
commit 54efd01d77
6 changed files with 636 additions and 1 deletions

View File

@@ -0,0 +1,24 @@
from logging import Logger
from master.forwarder_supervisor import ForwarderRole, ForwarderSupervisor
class ElectionCallbacks:
"""
Simple callbacks for the Rust election system to invoke.
No event system involvement - just direct forwarder control.
"""
def __init__(self, forwarder_supervisor: ForwarderSupervisor, logger: Logger):
self._forwarder_supervisor = forwarder_supervisor
self._logger = logger
async def on_became_master(self) -> None:
"""Called when this node is elected as master"""
self._logger.info("Node elected as master")
await self._forwarder_supervisor.notify_role_change(ForwarderRole.MASTER)
async def on_became_replica(self) -> None:
"""Called when this node becomes a replica"""
self._logger.info("Node demoted to replica")
await self._forwarder_supervisor.notify_role_change(ForwarderRole.REPLICA)

View File

@@ -1,5 +1,8 @@
from pathlib import Path
from shared.env import BaseEnv
class MasterEnvironmentSchema(BaseEnv):
pass
# Master-specific: forwarder configuration
FORWARDER_BINARY_PATH: Path | None = None

View File

@@ -0,0 +1,186 @@
import asyncio
import contextlib
from enum import Enum
from logging import Logger
from pathlib import Path
from shared.constants import (
EXO_GLOBAL_EVENT_DB,
EXO_WORKER_EVENT_DB,
LIBP2P_GLOBAL_EVENTS_TOPIC,
LIBP2P_WORKER_EVENTS_TOPIC,
)
class ForwarderRole(str, Enum):
"""Role determines which forwarding pairs to use"""
MASTER = "master"
REPLICA = "replica"
class ForwarderSupervisor:
"""
Manages the forwarder subprocess for SQLite ↔ libp2p event forwarding.
The forwarder is a single process that handles multiple forwarding pairs.
Master mode forwards:
- sqlite:worker_events.db:events → libp2p:worker_events (share local worker events)
- libp2p:worker_events → sqlite:global_events.db:events (collect network worker events)
- sqlite:global_events.db:events → libp2p:global_events (broadcast merged global log)
Replica mode forwards:
- sqlite:worker_events.db:events → libp2p:worker_events (share local worker events)
- libp2p:global_events → sqlite:global_events.db:events (receive global log from master)
"""
def __init__(
self,
forwarder_binary_path: Path,
logger: Logger,
health_check_interval: float = 5.0
):
self._binary_path = forwarder_binary_path
self._logger = logger
self._health_check_interval = health_check_interval
self._current_role: ForwarderRole | None = None
self._process: asyncio.subprocess.Process | None = None
self._health_check_task: asyncio.Task[None] | None = None
async def notify_role_change(self, new_role: ForwarderRole) -> None:
"""
Called by external systems (e.g., election handler) when role changes.
This is the main public interface.
"""
if self._current_role == new_role:
self._logger.debug(f"Role unchanged: {new_role}")
return
self._logger.info(f"Role changing from {self._current_role} to {new_role}")
self._current_role = new_role
await self._restart_with_role(new_role)
async def start_as_replica(self) -> None:
"""Convenience method to start in replica mode"""
await self.notify_role_change(ForwarderRole.REPLICA)
async def stop(self) -> None:
"""Stop forwarder and cleanup"""
await self._stop_process()
self._current_role = None
def _get_forwarding_pairs(self, role: ForwarderRole) -> str:
"""
Generate forwarding pairs based on role.
Returns list of "source,sink" strings.
"""
pairs: list[str] = []
# Both master and replica forward local worker events to network
pairs.append(
f"sqlite:{EXO_WORKER_EVENT_DB}:events|libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}"
)
if role == ForwarderRole.MASTER:
# Master: collect worker events from network into global log
pairs.append(
f"libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
)
# Master: broadcast global events to network
pairs.append(
f"sqlite:{EXO_GLOBAL_EVENT_DB}:events|libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}"
)
else: # REPLICA
# Replica: receive global events from master
pairs.append(
f"libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
)
return ','.join(pairs)
async def _restart_with_role(self, role: ForwarderRole) -> None:
"""Internal method to restart forwarder with new role"""
await self._stop_process()
pairs: str = self._get_forwarding_pairs(role)
self._process = await asyncio.create_subprocess_exec(
str(self._binary_path),
f'{pairs}',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self._logger.info(f"Starting forwarder with forwarding pairs: {pairs}")
# Start health monitoring
self._health_check_task = asyncio.create_task(
self._monitor_health()
)
async def _stop_process(self) -> None:
"""Stop the forwarder process gracefully"""
if self._health_check_task:
self._health_check_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._health_check_task
self._health_check_task = None
if self._process:
# Check if process is already dead
if self._process.returncode is None:
# Process is still alive, terminate it
try:
self._process.terminate()
await asyncio.wait_for(self._process.wait(), timeout=5.0)
except asyncio.TimeoutError:
self._logger.warning("Forwarder didn't terminate, killing")
self._process.kill()
await self._process.wait()
except ProcessLookupError:
# Process already dead
pass
self._process = None
async def _monitor_health(self) -> None:
"""Monitor process health and restart if it crashes"""
while self._process and self._current_role:
try:
# Check if process is still alive
retcode = await asyncio.wait_for(
self._process.wait(),
timeout=self._health_check_interval
)
# Process exited
self._logger.error(f"Forwarder exited with code {retcode}")
# Auto-restart
await asyncio.sleep(0.2) # Brief delay before restart
if self._current_role: # Still have a role
await self._restart_with_role(self._current_role)
break
except asyncio.TimeoutError:
# Process still running, continue monitoring
continue
except asyncio.CancelledError:
break
@property
def is_running(self) -> bool:
"""Check if forwarder process is running"""
return self._process is not None and self._process.returncode is None
@property
def current_role(self) -> ForwarderRole | None:
"""Get current forwarder role (for testing)"""
return self._current_role
@property
def process_pid(self) -> int | None:
"""Get current process PID (for testing)"""
return self._process.pid if self._process else None
@property
def process(self) -> asyncio.subprocess.Process | None:
"""Get current process (for testing)"""
return self._process

View File

@@ -106,6 +106,45 @@ async def lifespan(app: FastAPI):
metrics_listener.start()
cluster_listener.start()
# # Get validated environment
# env = get_validated_env(MasterEnvironmentSchema, logger)
# # Initialize event log manager (creates both worker and global event DBs)
# event_log_config = EventLogConfig() # Uses default config
# event_log_manager = EventLogManager(
# config=event_log_config,
# logger=logger
# )
# await event_log_manager.initialize()
# # Store for use in API handlers
# app.state.event_log_manager = event_log_manager
# # Initialize forwarder if configured
# if env.FORWARDER_BINARY_PATH:
# forwarder_supervisor = ForwarderSupervisor(
# forwarder_binary_path=env.FORWARDER_BINARY_PATH,
# logger=logger
# )
# # Start as replica by default (until elected)
# await forwarder_supervisor.start_as_replica()
# # Create election callbacks for Rust election system
# election_callbacks = ElectionCallbacks(
# forwarder_supervisor=forwarder_supervisor,
# logger=logger
# )
# # Make callbacks available for Rust code to invoke
# app.state.election_callbacks = election_callbacks
# # Log status
# logger.info(
# f"Forwarder supervisor initialized. Running: {forwarder_supervisor.is_running}"
# )
# else:
# logger.warning("No forwarder binary path configured")
# forwarder_supervisor = None
# initial_state = get_master_state(logger)
# app.state.master_event_loop = MasterEventLoop()
# await app.state.master_event_loop.start()

View File

@@ -0,0 +1,379 @@
"""
Comprehensive unit tests for Forwardersupervisor.
Tests basic functionality, process management, and edge cases.
"""
import asyncio
import logging
import os
import tempfile
from pathlib import Path
from typing import AsyncGenerator, Callable, Generator
from unittest.mock import AsyncMock, MagicMock
import pytest
import pytest_asyncio
from master.election_callback import ElectionCallbacks
from master.forwarder_supervisor import (
ForwarderRole,
ForwarderSupervisor,
)
from shared.constants import (
EXO_GLOBAL_EVENT_DB,
EXO_WORKER_EVENT_DB,
LIBP2P_GLOBAL_EVENTS_TOPIC,
LIBP2P_WORKER_EVENTS_TOPIC,
)
# Mock forwarder script content
MOCK_FORWARDER_SCRIPT = '''#!/usr/bin/env python3
"""Mock forwarder for testing."""
import os
import sys
import time
import signal
from pathlib import Path
def log(message: str) -> None:
"""Write to both stdout and a log file for test verification"""
print(message, flush=True)
# Also write to a file for test verification
log_file = os.environ.get("MOCK_LOG_FILE")
if log_file:
with open(log_file, "a") as f:
f.write(f"{time.time()}: {message}\\n")
def handle_signal(signum: int, frame: object) -> None:
"""Handle termination signals gracefully"""
log(f"Received signal {signum}")
sys.exit(0)
def main() -> None:
# Register signal handlers
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
# Log startup with arguments
args = sys.argv[1:] if len(sys.argv) > 1 else []
log(f"Mock forwarder started with args: {args}")
# Write PID file if requested (for testing process management)
pid_file = os.environ.get("MOCK_PID_FILE")
if pid_file:
Path(pid_file).write_text(str(os.getpid()))
# Check for test control environment variables
exit_after = os.environ.get("MOCK_EXIT_AFTER")
exit_code = int(os.environ.get("MOCK_EXIT_CODE", "0"))
hang_mode = os.environ.get("MOCK_HANG_MODE", "false").lower() == "true"
ignore_signals = os.environ.get("MOCK_IGNORE_SIGNALS", "false").lower() == "true"
if ignore_signals:
# Ignore SIGTERM for testing force kill scenarios
signal.signal(signal.SIGTERM, signal.SIG_IGN)
log("Ignoring SIGTERM signal")
# Simulate work
start_time = time.time()
while True:
if exit_after and (time.time() - start_time) >= float(exit_after):
log(f"Exiting after {exit_after} seconds with code {exit_code}")
sys.exit(exit_code)
if hang_mode:
# Simulate a hanging process (no CPU usage but not responding)
time.sleep(3600) # Sleep for an hour
else:
# Normal operation - small sleep to not consume CPU
time.sleep(0.1)
if __name__ == "__main__":
main()
'''
@pytest.fixture
def temp_dir() -> Generator[Path, None, None]:
"""Create a temporary directory and clean it up after test."""
temp_path = Path(tempfile.mkdtemp(prefix="exo_test_"))
yield temp_path
# Clean up
import shutil
shutil.rmtree(temp_path, ignore_errors=True)
@pytest.fixture
def mock_forwarder_script(temp_dir: Path) -> Path:
"""Create the mock forwarder executable."""
mock_script = temp_dir / "mock_forwarder.py"
mock_script.write_text(MOCK_FORWARDER_SCRIPT)
mock_script.chmod(0o755)
return mock_script
@pytest.fixture
def test_logger() -> logging.Logger:
"""Create a test logger."""
logger = logging.getLogger("test_forwarder")
logger.setLevel(logging.DEBUG)
# Add console handler for debugging
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
@pytest.fixture
def mock_env_vars(temp_dir: Path) -> dict[str, str]:
"""Environment variables for controlling mock forwarder behavior."""
return {
"MOCK_LOG_FILE": str(temp_dir / "mock_forwarder.log"),
"MOCK_PID_FILE": str(temp_dir / "mock_forwarder.pid"),
}
@pytest_asyncio.fixture
async def cleanup_processes() -> AsyncGenerator[set[int], None]:
"""Track and cleanup any processes created during tests."""
tracked_pids: set[int] = set()
yield tracked_pids
# Cleanup any remaining processes - simplified to avoid psutil dependency
import contextlib
import subprocess
for pid in tracked_pids:
with contextlib.suppress(Exception):
subprocess.run(["kill", str(pid)], check=False, timeout=1)
@pytest.fixture
def track_subprocess(cleanup_processes: set[int]) -> Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]:
"""Function to track subprocess PIDs for cleanup."""
def track(process: asyncio.subprocess.Process) -> asyncio.subprocess.Process:
if process.pid:
cleanup_processes.add(process.pid)
return process
return track
class TestForwardersupervisorBasic:
"""Basic functionality tests for Forwardersupervisor."""
@pytest.mark.asyncio
async def test_start_as_replica(
self,
mock_forwarder_script: Path,
mock_env_vars: dict[str, str],
test_logger: logging.Logger,
track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
) -> None:
"""Test starting forwarder in replica mode."""
# Set environment
os.environ.update(mock_env_vars)
supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
await supervisor.start_as_replica()
# Track the process for cleanup
if supervisor.process:
track_subprocess(supervisor.process)
try:
# Verify process is running
assert supervisor.is_running
assert supervisor.current_role == ForwarderRole.REPLICA
# Wait a bit for log file to be written
await asyncio.sleep(0.5)
# Verify forwarding pairs in log
log_content = Path(mock_env_vars["MOCK_LOG_FILE"]).read_text()
# Expected replica forwarding pairs
expected_pairs = [
f"sqlite:{EXO_WORKER_EVENT_DB}:events|libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}",
f"libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
]
# Check that the forwarder received the correct arguments
assert all(pair in log_content for pair in expected_pairs)
finally:
await supervisor.stop()
assert not supervisor.is_running
@pytest.mark.asyncio
async def test_role_change_replica_to_master(
self,
mock_forwarder_script: Path,
mock_env_vars: dict[str, str],
test_logger: logging.Logger,
track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
) -> None:
"""Test changing role from replica to master."""
os.environ.update(mock_env_vars)
supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
await supervisor.start_as_replica()
if supervisor.process:
track_subprocess(supervisor.process)
try:
# Change to master
await supervisor.notify_role_change(ForwarderRole.MASTER)
if supervisor.process:
track_subprocess(supervisor.process)
# Wait for restart
await asyncio.sleep(0.5)
assert supervisor.is_running
assert supervisor.current_role == ForwarderRole.MASTER
# Verify new forwarding pairs
log_content = Path(mock_env_vars["MOCK_LOG_FILE"]).read_text()
# Expected master forwarding pairs
master_pairs = [
f"libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events",
f"sqlite:{EXO_GLOBAL_EVENT_DB}:events|libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}"
]
assert all(pair in log_content for pair in master_pairs)
finally:
await supervisor.stop()
@pytest.mark.asyncio
async def test_idempotent_role_change(
self,
mock_forwarder_script: Path,
mock_env_vars: dict[str, str],
test_logger: logging.Logger,
track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process],
) -> None:
"""Test that setting the same role twice doesn't restart the process."""
os.environ.update(mock_env_vars)
supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
await supervisor.start_as_replica()
original_pid = supervisor.process_pid
if supervisor.process:
track_subprocess(supervisor.process)
try:
# Try to change to the same role
await supervisor.notify_role_change(ForwarderRole.REPLICA)
# Should not restart (same PID)
assert supervisor.process_pid == original_pid
finally:
await supervisor.stop()
@pytest.mark.asyncio
async def test_process_crash_and_restart(
self,
mock_forwarder_script: Path,
mock_env_vars: dict[str, str],
test_logger: logging.Logger,
track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
) -> None:
"""Test that Forwardersupervisor restarts the process if it crashes."""
# Configure mock to exit after 1 second
mock_env_vars["MOCK_EXIT_AFTER"] = "1"
mock_env_vars["MOCK_EXIT_CODE"] = "1"
os.environ.update(mock_env_vars)
supervisor = ForwarderSupervisor(
mock_forwarder_script,
test_logger,
health_check_interval=0.5 # Faster health checks for testing
)
await supervisor.start_as_replica()
original_pid = supervisor.process_pid
if supervisor.process:
track_subprocess(supervisor.process)
try:
# Wait for first crash
await asyncio.sleep(1.5)
# Process should have crashed
assert not supervisor.is_running or supervisor.process_pid != original_pid
# Clear the crash-inducing environment variables so restart works
if "MOCK_EXIT_AFTER" in os.environ:
del os.environ["MOCK_EXIT_AFTER"]
if "MOCK_EXIT_CODE" in os.environ:
del os.environ["MOCK_EXIT_CODE"]
# Wait for restart
await asyncio.sleep(1.0)
# Process should have restarted with new PID
assert supervisor.is_running
assert supervisor.process_pid != original_pid
# Track new process
if supervisor.process:
track_subprocess(supervisor.process)
finally:
await supervisor.stop()
@pytest.mark.asyncio
async def test_nonexistent_binary(
self,
test_logger: logging.Logger,
temp_dir: Path
) -> None:
"""Test behavior when forwarder binary doesn't exist."""
nonexistent_path = temp_dir / "nonexistent_forwarder"
supervisor = ForwarderSupervisor(nonexistent_path, test_logger)
# Should raise FileNotFoundError
with pytest.raises(FileNotFoundError):
await supervisor.start_as_replica()
class TestElectionCallbacks:
"""Test suite for ElectionCallbacks."""
@pytest.mark.asyncio
async def test_on_became_master(self, test_logger: logging.Logger) -> None:
"""Test callback when becoming master."""
mock_supervisor = MagicMock(spec=ForwarderSupervisor)
mock_supervisor.notify_role_change = AsyncMock()
callbacks = ElectionCallbacks(mock_supervisor, test_logger)
await callbacks.on_became_master()
mock_supervisor.notify_role_change.assert_called_once_with(ForwarderRole.MASTER) # type: ignore
@pytest.mark.asyncio
async def test_on_became_replica(self, test_logger: logging.Logger) -> None:
"""Test callback when becoming replica."""
mock_supervisor = MagicMock(spec=ForwarderSupervisor)
mock_supervisor.notify_role_change = AsyncMock()
callbacks = ElectionCallbacks(mock_supervisor, test_logger)
await callbacks.on_became_replica()
mock_supervisor.notify_role_change.assert_called_once_with(ForwarderRole.REPLICA) # type: ignore

View File

@@ -12,6 +12,10 @@ EXO_WORKER_LOG = EXO_HOME / "worker.log"
EXO_WORKER_KEYRING_FILE = EXO_HOME / "worker_keyring"
EXO_MASTER_KEYRING_FILE = EXO_HOME / "master_keyring"
# libp2p topics for event forwarding
LIBP2P_WORKER_EVENTS_TOPIC = "worker_events"
LIBP2P_GLOBAL_EVENTS_TOPIC = "global_events"
# little helper function to get the name of the module that raised the error
def get_caller_module_name() -> str: