diff --git a/master/election_callback.py b/master/election_callback.py
new file mode 100644
index 00000000..a3cba9b4
--- /dev/null
+++ b/master/election_callback.py
@@ -0,0 +1,24 @@
+from logging import Logger
+
+from master.forwarder_supervisor import ForwarderRole, ForwarderSupervisor
+
+
+class ElectionCallbacks:
+    """
+    Simple callbacks for the Rust election system to invoke.
+    No event system involvement - just direct forwarder control.
+    """
+
+    def __init__(self, forwarder_supervisor: ForwarderSupervisor, logger: Logger):
+        self._forwarder_supervisor = forwarder_supervisor
+        self._logger = logger
+
+    async def on_became_master(self) -> None:
+        """Called when this node is elected as master"""
+        self._logger.info("Node elected as master")
+        await self._forwarder_supervisor.notify_role_change(ForwarderRole.MASTER)
+
+    async def on_became_replica(self) -> None:
+        """Called when this node becomes a replica"""
+        self._logger.info("Node demoted to replica")
+        await self._forwarder_supervisor.notify_role_change(ForwarderRole.REPLICA)
\ No newline at end of file
diff --git a/master/env.py b/master/env.py
index dadeee5f..284bf585 100644
--- a/master/env.py
+++ b/master/env.py
@@ -1,5 +1,8 @@
+from pathlib import Path
+
 from shared.env import BaseEnv
 
 
 class MasterEnvironmentSchema(BaseEnv):
-    pass
+    # Master-specific: forwarder configuration
+    FORWARDER_BINARY_PATH: Path | None = None
diff --git a/master/forwarder_supervisor.py b/master/forwarder_supervisor.py
new file mode 100644
index 00000000..bdec1f7e
--- /dev/null
+++ b/master/forwarder_supervisor.py
@@ -0,0 +1,202 @@
+import asyncio
+import contextlib
+from enum import Enum
+from logging import Logger
+from pathlib import Path
+
+from shared.constants import (
+    EXO_GLOBAL_EVENT_DB,
+    EXO_WORKER_EVENT_DB,
+    LIBP2P_GLOBAL_EVENTS_TOPIC,
+    LIBP2P_WORKER_EVENTS_TOPIC,
+)
+
+
+class ForwarderRole(str, Enum):
+    """Role determines which forwarding pairs to use"""
+    MASTER = "master"
+    REPLICA = "replica"
+
+
+class ForwarderSupervisor:
+    """
+    Manages the forwarder subprocess for SQLite ↔ libp2p event forwarding.
+    The forwarder is a single process that handles multiple forwarding pairs.
+
+    Master mode forwards:
+    - sqlite:worker_events.db:events → libp2p:worker_events (share local worker events)
+    - libp2p:worker_events → sqlite:global_events.db:events (collect network worker events)
+    - sqlite:global_events.db:events → libp2p:global_events (broadcast merged global log)
+
+    Replica mode forwards:
+    - sqlite:worker_events.db:events → libp2p:worker_events (share local worker events)
+    - libp2p:global_events → sqlite:global_events.db:events (receive global log from master)
+    """
+
+    def __init__(
+        self,
+        forwarder_binary_path: Path,
+        logger: Logger,
+        health_check_interval: float = 5.0
+    ):
+        self._binary_path = forwarder_binary_path
+        self._logger = logger
+        self._health_check_interval = health_check_interval
+        self._current_role: ForwarderRole | None = None
+        self._process: asyncio.subprocess.Process | None = None
+        self._health_check_task: asyncio.Task[None] | None = None
+
+    async def notify_role_change(self, new_role: ForwarderRole) -> None:
+        """
+        Called by external systems (e.g., election handler) when role changes.
+        This is the main public interface.
+
+        If the forwarder cannot be started for the new role, the recorded role
+        is cleared and the error is re-raised, so that a later call with the
+        same role retries instead of being treated as "role unchanged".
+        """
+        if self._current_role == new_role:
+            self._logger.debug(f"Role unchanged: {new_role}")
+            return
+
+        self._logger.info(f"Role changing from {self._current_role} to {new_role}")
+        self._current_role = new_role
+        try:
+            await self._restart_with_role(new_role)
+        except Exception:
+            # The restart attempt stops the previous process before spawning.
+            self._current_role = None
+            raise
+
+    async def start_as_replica(self) -> None:
+        """Convenience method to start in replica mode"""
+        await self.notify_role_change(ForwarderRole.REPLICA)
+
+    async def stop(self) -> None:
+        """Stop forwarder and cleanup"""
+        await self._stop_process()
+        self._current_role = None
+
+    def _get_forwarding_pairs(self, role: ForwarderRole) -> str:
+        """
+        Generate forwarding pairs based on role.
+        Returns a single comma-separated string of "source|sink" pairs.
+        """
+        pairs: list[str] = []
+
+        # Both master and replica forward local worker events to network
+        pairs.append(
+            f"sqlite:{EXO_WORKER_EVENT_DB}:events|libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}"
+        )
+
+        if role == ForwarderRole.MASTER:
+            # Master: collect worker events from network into global log
+            pairs.append(
+                f"libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
+            )
+            # Master: broadcast global events to network
+            pairs.append(
+                f"sqlite:{EXO_GLOBAL_EVENT_DB}:events|libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}"
+            )
+        else:  # REPLICA
+            # Replica: receive global events from master
+            pairs.append(
+                f"libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
+            )
+
+        return ','.join(pairs)
+
+    async def _restart_with_role(self, role: ForwarderRole) -> None:
+        """Internal method to restart forwarder with new role"""
+        await self._stop_process()
+
+        pairs: str = self._get_forwarding_pairs(role)
+        self._logger.info(f"Starting forwarder with forwarding pairs: {pairs}")
+
+        # stdout/stderr are inherited rather than piped: nothing here reads the
+        # pipes, and an undrained pipe blocks the child once its buffer fills.
+        self._process = await asyncio.create_subprocess_exec(
+            str(self._binary_path),
+            pairs,
+        )
+
+        # Start health monitoring
+        self._health_check_task = asyncio.create_task(
+            self._monitor_health()
+        )
+
+    async def _stop_process(self) -> None:
+        """Stop the forwarder process gracefully"""
+        health_check_task = self._health_check_task
+        self._health_check_task = None
+        # The health monitor calls this itself when restarting a crashed
+        # forwarder; a task must not cancel and then await itself.
+        if health_check_task and health_check_task is not asyncio.current_task():
+            health_check_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await health_check_task
+
+        if self._process:
+            # Check if process is already dead
+            if self._process.returncode is None:
+                # Process is still alive, terminate it
+                try:
+                    self._process.terminate()
+                    await asyncio.wait_for(self._process.wait(), timeout=5.0)
+                except asyncio.TimeoutError:
+                    self._logger.warning("Forwarder didn't terminate, killing")
+                    self._process.kill()
+                    await self._process.wait()
+                except ProcessLookupError:
+                    # Process already dead
+                    pass
+            self._process = None
+
+    async def _monitor_health(self) -> None:
+        """Monitor process health and restart if it crashes"""
+        while self._process and self._current_role:
+            try:
+                # Check if process is still alive
+                retcode = await asyncio.wait_for(
+                    self._process.wait(),
+                    timeout=self._health_check_interval
+                )
+                # Process exited
+                self._logger.error(f"Forwarder exited with code {retcode}")
+
+                # Auto-restart
+                await asyncio.sleep(0.2)  # Brief delay before restart
+                if self._current_role:  # Still have a role
+                    await self._restart_with_role(self._current_role)
+                break
+
+            except asyncio.TimeoutError:
+                # Process still running, continue monitoring
+                continue
+            except asyncio.CancelledError:
+                break
+            except Exception:
+                # Restart failed; log here because nothing awaits this task.
+                self._logger.exception("Failed to restart forwarder")
+                self._current_role = None
+                break
+
+    @property
+    def is_running(self) -> bool:
+        """Check if forwarder process is running"""
+        return self._process is not None and self._process.returncode is None
+
+    @property
+    def current_role(self) -> ForwarderRole | None:
+        """Get current forwarder role (for testing)"""
+        return self._current_role
+
+    @property
+    def process_pid(self) -> int | None:
+        """Get current process PID (for testing)"""
+        return self._process.pid if self._process else None
+
+    @property
+    def process(self) -> asyncio.subprocess.Process | None:
+        """Get current process (for testing)"""
+        return self._process
diff --git a/master/main.py b/master/main.py
index 730289ac..a8fd53ca 100644
--- a/master/main.py
+++ b/master/main.py
@@ -106,6 +106,45 @@ async def lifespan(app: FastAPI):
     metrics_listener.start()
     cluster_listener.start()
 
+    # # Get validated environment
+    # env = get_validated_env(MasterEnvironmentSchema, logger)
+
+    # # Initialize event log manager (creates both worker and global event DBs)
+    # event_log_config = EventLogConfig()  # Uses default config
+    # event_log_manager = EventLogManager(
+    #     config=event_log_config,
+    #     logger=logger
+    # )
+    # await event_log_manager.initialize()
+
+    # # Store for use in API handlers
+    # app.state.event_log_manager = event_log_manager
+
+    # # Initialize forwarder if configured
+    # if env.FORWARDER_BINARY_PATH:
+    #     forwarder_supervisor = ForwarderSupervisor(
+    #         forwarder_binary_path=env.FORWARDER_BINARY_PATH,
+    #         logger=logger
+    #     )
+    #     # Start as replica by default (until elected)
+    #     await forwarder_supervisor.start_as_replica()
+
+    #     # Create election callbacks for Rust election system
+    #     election_callbacks = ElectionCallbacks(
+    #         forwarder_supervisor=forwarder_supervisor,
+    #         logger=logger
+    #     )
+
+    #     # Make callbacks available for Rust code to invoke
+    #     app.state.election_callbacks = election_callbacks
+
+    #     # Log status
+    #     logger.info(
+    #         f"Forwarder supervisor initialized. Running: {forwarder_supervisor.is_running}"
+    #     )
+    # else:
+    #     logger.warning("No forwarder binary path configured")
+    #     forwarder_supervisor = None
     # initial_state = get_master_state(logger)
     # app.state.master_event_loop = MasterEventLoop()
     # await app.state.master_event_loop.start()
diff --git a/master/tests/test_forwarder_manager.py b/master/tests/test_forwarder_manager.py
new file mode 100644
index 00000000..0160362b
--- /dev/null
+++ b/master/tests/test_forwarder_manager.py
@@ -0,0 +1,383 @@
+"""
+Comprehensive unit tests for ForwarderSupervisor.
+Tests basic functionality, process management, and edge cases.
+"""
+import asyncio
+import logging
+import os
+import tempfile
+from pathlib import Path
+from typing import AsyncGenerator, Callable, Generator
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+import pytest_asyncio
+
+from master.election_callback import ElectionCallbacks
+from master.forwarder_supervisor import (
+    ForwarderRole,
+    ForwarderSupervisor,
+)
+from shared.constants import (
+    EXO_GLOBAL_EVENT_DB,
+    EXO_WORKER_EVENT_DB,
+    LIBP2P_GLOBAL_EVENTS_TOPIC,
+    LIBP2P_WORKER_EVENTS_TOPIC,
+)
+
+# Mock forwarder script content
+MOCK_FORWARDER_SCRIPT = '''#!/usr/bin/env python3
+"""Mock forwarder for testing."""
+import os
+import sys
+import time
+import signal
+from pathlib import Path
+
+
+def log(message: str) -> None:
+    """Write to both stdout and a log file for test verification"""
+    print(message, flush=True)
+
+    # Also write to a file for test verification
+    log_file = os.environ.get("MOCK_LOG_FILE")
+    if log_file:
+        with open(log_file, "a") as f:
+            f.write(f"{time.time()}: {message}\\n")
+
+
+def handle_signal(signum: int, frame: object) -> None:
+    """Handle termination signals gracefully"""
+    log(f"Received signal {signum}")
+    sys.exit(0)
+
+
+def main() -> None:
+    # Register signal handlers
+    signal.signal(signal.SIGTERM, handle_signal)
+    signal.signal(signal.SIGINT, handle_signal)
+
+    # Log startup with arguments
+    args = sys.argv[1:] if len(sys.argv) > 1 else []
+    log(f"Mock forwarder started with args: {args}")
+
+    # Write PID file if requested (for testing process management)
+    pid_file = os.environ.get("MOCK_PID_FILE")
+    if pid_file:
+        Path(pid_file).write_text(str(os.getpid()))
+
+    # Check for test control environment variables
+    exit_after = os.environ.get("MOCK_EXIT_AFTER")
+    exit_code = int(os.environ.get("MOCK_EXIT_CODE", "0"))
+    hang_mode = os.environ.get("MOCK_HANG_MODE", "false").lower() == "true"
+    ignore_signals = os.environ.get("MOCK_IGNORE_SIGNALS", "false").lower() == "true"
+
+    if ignore_signals:
+        # Ignore SIGTERM for testing force kill scenarios
+        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+        log("Ignoring SIGTERM signal")
+
+    # Simulate work
+    start_time = time.time()
+    while True:
+        if exit_after and (time.time() - start_time) >= float(exit_after):
+            log(f"Exiting after {exit_after} seconds with code {exit_code}")
+            sys.exit(exit_code)
+
+        if hang_mode:
+            # Simulate a hanging process (no CPU usage but not responding)
+            time.sleep(3600)  # Sleep for an hour
+        else:
+            # Normal operation - small sleep to not consume CPU
+            time.sleep(0.1)
+
+
+if __name__ == "__main__":
+    main()
+'''
+
+
+@pytest.fixture
+def temp_dir() -> Generator[Path, None, None]:
+    """Create a temporary directory and clean it up after test."""
+    temp_path = Path(tempfile.mkdtemp(prefix="exo_test_"))
+    yield temp_path
+    # Clean up
+    import shutil
+    shutil.rmtree(temp_path, ignore_errors=True)
+
+
+@pytest.fixture
+def mock_forwarder_script(temp_dir: Path) -> Path:
+    """Create the mock forwarder executable."""
+    mock_script = temp_dir / "mock_forwarder.py"
+    mock_script.write_text(MOCK_FORWARDER_SCRIPT)
+    mock_script.chmod(0o755)
+    return mock_script
+
+
+@pytest.fixture
+def test_logger() -> logging.Logger:
+    """Create a test logger."""
+    logger = logging.getLogger("test_forwarder")
+    logger.setLevel(logging.DEBUG)
+
+    # Add console handler for debugging
+    if not logger.handlers:
+        handler = logging.StreamHandler()
+        handler.setLevel(logging.DEBUG)
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+
+    return logger
+
+
+@pytest.fixture
+def mock_env_vars(temp_dir: Path) -> dict[str, str]:
+    """Environment variables for controlling mock forwarder behavior."""
+    return {
+        "MOCK_LOG_FILE": str(temp_dir / "mock_forwarder.log"),
+        "MOCK_PID_FILE": str(temp_dir / "mock_forwarder.pid"),
+    }
+
+
+@pytest_asyncio.fixture
+async def cleanup_processes() -> AsyncGenerator[set[int], None]:
+    """Track and cleanup any processes created during tests."""
+    tracked_pids: set[int] = set()
+
+    yield tracked_pids
+
+    # Cleanup any remaining processes - simplified to avoid psutil dependency
+    import contextlib
+    import subprocess
+    for pid in tracked_pids:
+        with contextlib.suppress(Exception):
+            subprocess.run(["kill", str(pid)], check=False, timeout=1)
+
+
+@pytest.fixture
+def track_subprocess(cleanup_processes: set[int]) -> Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]:
+    """Function to track subprocess PIDs for cleanup."""
+    def track(process: asyncio.subprocess.Process) -> asyncio.subprocess.Process:
+        if process.pid:
+            cleanup_processes.add(process.pid)
+        return process
+    return track
+
+
+class TestForwardersupervisorBasic:
+    """Basic functionality tests for ForwarderSupervisor."""
+
+    @pytest.mark.asyncio
+    async def test_start_as_replica(
+        self,
+        mock_forwarder_script: Path,
+        mock_env_vars: dict[str, str],
+        test_logger: logging.Logger,
+        track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
+    ) -> None:
+        """Test starting forwarder in replica mode."""
+        # Set environment
+        os.environ.update(mock_env_vars)
+
+        supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
+        await supervisor.start_as_replica()
+
+        # Track the process for cleanup
+        if supervisor.process:
+            track_subprocess(supervisor.process)
+
+        try:
+            # Verify process is running
+            assert supervisor.is_running
+            assert supervisor.current_role == ForwarderRole.REPLICA
+
+            # Wait a bit for log file to be written
+            await asyncio.sleep(0.5)
+
+            # Verify forwarding pairs in log
+            log_content = Path(mock_env_vars["MOCK_LOG_FILE"]).read_text()
+
+            # Expected replica forwarding pairs
+            expected_pairs = [
+                f"sqlite:{EXO_WORKER_EVENT_DB}:events|libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}",
+                f"libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events"
+            ]
+
+            # Check that the forwarder received the correct arguments
+            assert all(pair in log_content for pair in expected_pairs)
+
+        finally:
+            await supervisor.stop()
+            assert not supervisor.is_running
+
+    @pytest.mark.asyncio
+    async def test_role_change_replica_to_master(
+        self,
+        mock_forwarder_script: Path,
+        mock_env_vars: dict[str, str],
+        test_logger: logging.Logger,
+        track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
+    ) -> None:
+        """Test changing role from replica to master."""
+        os.environ.update(mock_env_vars)
+
+        supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
+        await supervisor.start_as_replica()
+
+        if supervisor.process:
+            track_subprocess(supervisor.process)
+
+        try:
+            # Change to master
+            await supervisor.notify_role_change(ForwarderRole.MASTER)
+
+            if supervisor.process:
+                track_subprocess(supervisor.process)
+
+            # Wait for restart
+            await asyncio.sleep(0.5)
+
+            assert supervisor.is_running
+            assert supervisor.current_role == ForwarderRole.MASTER
+
+            # Verify new forwarding pairs
+            log_content = Path(mock_env_vars["MOCK_LOG_FILE"]).read_text()
+
+            # Expected master forwarding pairs
+            master_pairs = [
+                f"libp2p:{LIBP2P_WORKER_EVENTS_TOPIC}|sqlite:{EXO_GLOBAL_EVENT_DB}:events",
+                f"sqlite:{EXO_GLOBAL_EVENT_DB}:events|libp2p:{LIBP2P_GLOBAL_EVENTS_TOPIC}"
+            ]
+
+            assert all(pair in log_content for pair in master_pairs)
+
+        finally:
+            await supervisor.stop()
+
+    @pytest.mark.asyncio
+    async def test_idempotent_role_change(
+        self,
+        mock_forwarder_script: Path,
+        mock_env_vars: dict[str, str],
+        test_logger: logging.Logger,
+        track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process],
+    ) -> None:
+        """Test that setting the same role twice doesn't restart the process."""
+        os.environ.update(mock_env_vars)
+
+        supervisor = ForwarderSupervisor(mock_forwarder_script, test_logger)
+        await supervisor.start_as_replica()
+
+        original_pid = supervisor.process_pid
+        if supervisor.process:
+            track_subprocess(supervisor.process)
+
+        try:
+            # Try to change to the same role
+            await supervisor.notify_role_change(ForwarderRole.REPLICA)
+
+            # Should not restart (same PID)
+            assert supervisor.process_pid == original_pid
+
+        finally:
+            await supervisor.stop()
+
+    @pytest.mark.asyncio
+    async def test_process_crash_and_restart(
+        self,
+        mock_forwarder_script: Path,
+        mock_env_vars: dict[str, str],
+        test_logger: logging.Logger,
+        track_subprocess: Callable[[asyncio.subprocess.Process], asyncio.subprocess.Process]
+    ) -> None:
+        """Test that ForwarderSupervisor restarts the process if it crashes."""
+        # Configure mock to exit after 1 second
+        mock_env_vars["MOCK_EXIT_AFTER"] = "1"
+        mock_env_vars["MOCK_EXIT_CODE"] = "1"
+        os.environ.update(mock_env_vars)
+
+        supervisor = ForwarderSupervisor(
+            mock_forwarder_script,
+            test_logger,
+            health_check_interval=0.5  # Faster health checks for testing
+        )
+        await supervisor.start_as_replica()
+
+        original_pid = supervisor.process_pid
+        if supervisor.process:
+            track_subprocess(supervisor.process)
+
+        try:
+            # Wait for first crash
+            await asyncio.sleep(1.5)
+
+            # Process should have crashed
+            assert not supervisor.is_running or supervisor.process_pid != original_pid
+
+            # Clear the crash-inducing environment variables so restart works
+            if "MOCK_EXIT_AFTER" in os.environ:
+                del os.environ["MOCK_EXIT_AFTER"]
+            if "MOCK_EXIT_CODE" in os.environ:
+                del os.environ["MOCK_EXIT_CODE"]
+
+            # Wait for restart
+            await asyncio.sleep(1.0)
+
+            # Process should have restarted with new PID
+            assert supervisor.is_running
+            assert supervisor.process_pid != original_pid
+
+            # Track new process
+            if supervisor.process:
+                track_subprocess(supervisor.process)
+
+        finally:
+            await supervisor.stop()
+
+    @pytest.mark.asyncio
+    async def test_nonexistent_binary(
+        self,
+        test_logger: logging.Logger,
+        temp_dir: Path
+    ) -> None:
+        """Test behavior when forwarder binary doesn't exist."""
+        nonexistent_path = temp_dir / "nonexistent_forwarder"
+
+        supervisor = ForwarderSupervisor(nonexistent_path, test_logger)
+
+        # Should raise FileNotFoundError
+        with pytest.raises(FileNotFoundError):
+            await supervisor.start_as_replica()
+
+        # A failed start must not leave a stale role or process behind
+        assert supervisor.current_role is None
+        assert not supervisor.is_running
+
+
+class TestElectionCallbacks:
+    """Test suite for ElectionCallbacks."""
+
+    @pytest.mark.asyncio
+    async def test_on_became_master(self, test_logger: logging.Logger) -> None:
+        """Test callback when becoming master."""
+        mock_supervisor = MagicMock(spec=ForwarderSupervisor)
+        mock_supervisor.notify_role_change = AsyncMock()
+
+        callbacks = ElectionCallbacks(mock_supervisor, test_logger)
+        await callbacks.on_became_master()
+
+        mock_supervisor.notify_role_change.assert_called_once_with(ForwarderRole.MASTER)  # type: ignore
+
+    @pytest.mark.asyncio
+    async def test_on_became_replica(self, test_logger: logging.Logger) -> None:
+        """Test callback when becoming replica."""
+        mock_supervisor = MagicMock(spec=ForwarderSupervisor)
+        mock_supervisor.notify_role_change = AsyncMock()
+
+        callbacks = ElectionCallbacks(mock_supervisor, test_logger)
+        await callbacks.on_became_replica()
+
+        mock_supervisor.notify_role_change.assert_called_once_with(ForwarderRole.REPLICA)  # type: ignore
\ No newline at end of file
diff --git a/shared/constants.py b/shared/constants.py
index 8172da3a..d187de03 100644
--- a/shared/constants.py
+++ b/shared/constants.py
@@ -12,6 +12,10 @@ EXO_WORKER_LOG = EXO_HOME / "worker.log"
 EXO_WORKER_KEYRING_FILE = EXO_HOME / "worker_keyring"
 EXO_MASTER_KEYRING_FILE = EXO_HOME / "master_keyring"
 
+# libp2p topics for event forwarding
+LIBP2P_WORKER_EVENTS_TOPIC = "worker_events"
+LIBP2P_GLOBAL_EVENTS_TOPIC = "global_events"
+
 
 # little helper function to get the name of the module that raised the error
 def get_caller_module_name() -> str: