Fix: Orchestrator timeout and exception handling (#832)

Fixes #823
This commit is contained in:
Alex
2026-04-03 09:38:58 +01:00
committed by GitHub
parent c1143f808a
commit ff094bed56
3 changed files with 267 additions and 57 deletions

View File

@@ -56,9 +56,11 @@ _progress_lock = Lock()
# Stall detection - track last activity time per download
_last_activity: Dict[str, float] = {}
_last_progress_value: Dict[str, float] = {}
# De-duplicate status updates (keep-alive updates shouldn't spam clients)
_last_status_event: Dict[str, Tuple[str, Optional[str]]] = {}
STALL_TIMEOUT = 300 # 5 minutes without progress/status update = stalled
COORDINATOR_LOOP_ERROR_RETRY_DELAY = 1.0
def _is_plain_email_address(value: str) -> bool:
parsed = parseaddr(value or "")[1]
@@ -687,9 +689,13 @@ def update_download_progress(book_id: str, progress: float) -> None:
"""Update download progress with throttled WebSocket broadcasts."""
book_queue.update_progress(book_id, progress)
# Track activity for stall detection
# Only real progress changes should reset stall detection. Repeated keep-alive
# polls at the same percentage must not hide a stuck download forever.
with _progress_lock:
_last_activity[book_id] = time.time()
last_progress = _last_progress_value.get(book_id)
if last_progress is None or progress != last_progress:
_last_activity[book_id] = time.time()
_last_progress_value[book_id] = progress
# Broadcast progress via WebSocket with throttling
if ws_manager:
@@ -728,13 +734,11 @@ def update_download_status(book_id: str, status: str, message: Optional[str] = N
except ValueError:
return
# Always update activity timestamp (used by stall detection) even if the status
# event is a duplicate keep-alive update.
with _progress_lock:
_last_activity[book_id] = time.time()
status_event = (status_key, message)
if _last_status_event.get(book_id) == status_event:
return
_last_activity[book_id] = time.time()
_last_status_event[book_id] = status_event
# Update status message first so terminal snapshots capture the final message
@@ -812,6 +816,7 @@ def _cleanup_progress_tracking(task_id: str) -> None:
_progress_last_broadcast.pop(task_id, None)
_progress_last_broadcast.pop(f"{task_id}_progress", None)
_last_activity.pop(task_id, None)
_last_progress_value.pop(task_id, None)
_last_status_event.pop(task_id, None)
@@ -892,70 +897,77 @@ def concurrent_download_loop() -> None:
stalled_tasks: set[str] = set() # Track tasks already cancelled due to stall
while True:
# Clean up completed futures
completed_futures = [f for f in active_futures if f.done()]
for future in completed_futures:
task_id = active_futures.pop(future)
stalled_tasks.discard(task_id)
try:
future.result() # This will raise any exceptions from the worker
except Exception as e:
logger.error_trace(f"Future exception for {task_id}: {e}")
try:
# Clean up completed futures
completed_futures = [f for f in active_futures if f.done()]
for future in completed_futures:
task_id = active_futures.pop(future)
stalled_tasks.discard(task_id)
try:
future.result() # This will raise any exceptions from the worker
except Exception as e:
logger.error_trace(f"Future exception for {task_id}: {e}")
# Check for stalled downloads (no activity in STALL_TIMEOUT seconds)
current_time = time.time()
with _progress_lock:
for future, task_id in list(active_futures.items()):
if task_id in stalled_tasks:
continue
last_active = _last_activity.get(task_id, current_time)
if current_time - last_active > STALL_TIMEOUT:
logger.warning(f"Download stalled for {task_id}, cancelling")
book_queue.cancel_download(task_id)
book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)")
stalled_tasks.add(task_id)
# Check for stalled downloads (no activity in STALL_TIMEOUT seconds)
current_time = time.time()
with _progress_lock:
for future, task_id in list(active_futures.items()):
if task_id in stalled_tasks:
continue
last_active = _last_activity.get(task_id, current_time)
if current_time - last_active > STALL_TIMEOUT:
logger.warning(f"Download stalled for {task_id}, cancelling")
book_queue.cancel_download(task_id)
book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)")
stalled_tasks.add(task_id)
# Start new downloads if we have capacity
while len(active_futures) < max_workers:
next_download = book_queue.get_next()
if not next_download:
break
# Start new downloads if we have capacity
while len(active_futures) < max_workers:
next_download = book_queue.get_next()
if not next_download:
break
# Stagger concurrent downloads to avoid rate limiting on shared download servers
# Only delay if other downloads are already active
if active_futures:
stagger_delay = random.uniform(2, 5)
logger.debug(f"Staggering download start by {stagger_delay:.1f}s")
time.sleep(stagger_delay)
# Stagger concurrent downloads to avoid rate limiting on shared download servers
# Only delay if other downloads are already active
if active_futures:
stagger_delay = random.uniform(2, 5)
logger.debug(f"Staggering download start by {stagger_delay:.1f}s")
time.sleep(stagger_delay)
task_id, cancel_flag = next_download
task_id, cancel_flag = next_download
# Submit download job to thread pool
future = executor.submit(_process_single_download, task_id, cancel_flag)
active_futures[future] = task_id
# Submit download job to thread pool
future = executor.submit(_process_single_download, task_id, cancel_flag)
active_futures[future] = task_id
# Brief sleep to prevent busy waiting
time.sleep(config.MAIN_LOOP_SLEEP_TIME)
# Brief sleep to prevent busy waiting
time.sleep(config.MAIN_LOOP_SLEEP_TIME)
except Exception as e:
logger.error_trace("Download coordinator loop error: %s", e)
time.sleep(COORDINATOR_LOOP_ERROR_RETRY_DELAY)
# Download coordinator thread (started explicitly via start())
_coordinator_thread: Optional[threading.Thread] = None
_started = False
_coordinator_lock = Lock()
def start() -> None:
    """Start the download coordinator thread. Safe to call multiple times.

    If a previous coordinator thread has died (e.g. it crashed before the
    loop-level exception handling existed), a replacement thread is spawned
    instead of silently doing nothing. The check-and-start sequence is guarded
    by ``_coordinator_lock`` so concurrent callers cannot start two threads.
    """
    global _coordinator_thread
    with _coordinator_lock:
        # A live coordinator is already running; nothing to do.
        if _coordinator_thread is not None and _coordinator_thread.is_alive():
            logger.debug("Download coordinator already started")
            return
        # A thread object exists but is no longer alive — replace it rather
        # than leaving downloads permanently unserviced.
        if _coordinator_thread is not None:
            logger.warning("Download coordinator thread is not alive; starting a new one")
        _coordinator_thread = threading.Thread(
            target=concurrent_download_loop,
            daemon=True,
            name="DownloadCoordinator"
        )
        _coordinator_thread.start()
        logger.info(f"Download coordinator started with {config.MAX_CONCURRENT_DOWNLOADS} concurrent workers")

View File

@@ -0,0 +1,151 @@
from __future__ import annotations
from unittest.mock import ANY, MagicMock
import pytest
class _StopLoop(BaseException):
"""Sentinel used to stop the infinite coordinator loop during tests."""
class _FakeExecutor:
def __init__(self, *args, **kwargs) -> None:
self.args = args
self.kwargs = kwargs
def __enter__(self) -> _FakeExecutor:
return self
def __exit__(self, exc_type, exc, tb) -> bool:
return False
def submit(self, *args, **kwargs): # pragma: no cover - not expected in these tests
raise AssertionError("submit() should not be called in this test")
class _StopCoordinator(BaseException):
"""Sentinel used to stop a real coordinator thread cleanly in tests."""
def test_concurrent_download_loop_logs_and_recovers_after_loop_error(monkeypatch):
    """The coordinator logs a loop error, waits the retry delay, then keeps polling."""
    import shelfmark.download.orchestrator as orchestrator

    attempts = 0

    def flaky_get_next():
        # First poll blows up; every later poll reports an empty queue.
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise RuntimeError("boom")
        return None

    observed_sleeps: list[float] = []

    def recording_sleep(delay: float) -> None:
        # Capture each sleep, then bail out of the infinite loop on the second one.
        observed_sleeps.append(delay)
        if len(observed_sleeps) >= 2:
            raise _StopLoop()

    queue_mock = MagicMock()
    queue_mock.get_next.side_effect = flaky_get_next
    trace_mock = MagicMock()

    monkeypatch.setattr(orchestrator, "book_queue", queue_mock)
    monkeypatch.setattr(orchestrator, "ThreadPoolExecutor", _FakeExecutor)
    monkeypatch.setattr(orchestrator.time, "sleep", recording_sleep)
    monkeypatch.setattr(orchestrator.logger, "error_trace", trace_mock)

    with pytest.raises(_StopLoop):
        orchestrator.concurrent_download_loop()

    # One failed poll plus one successful (empty) poll.
    assert queue_mock.get_next.call_count == 2
    trace_mock.assert_called_once_with("Download coordinator loop error: %s", ANY)
    # First sleep is the error back-off, second is the normal idle sleep.
    assert observed_sleeps == [
        orchestrator.COORDINATOR_LOOP_ERROR_RETRY_DELAY,
        orchestrator.config.MAIN_LOOP_SLEEP_TIME,
    ]
def test_concurrent_download_loop_recovers_and_processes_task_after_transient_loop_error(monkeypatch):
    """After a transient queue error the coordinator keeps running and still processes work."""
    import threading

    import shelfmark.download.orchestrator as orchestrator

    processed = threading.Event()

    class FlakyQueue:
        # State machine: call 1 raises, call 2 yields a task,
        # later calls stop the loop once the task was processed.
        def __init__(self) -> None:
            self.calls = 0

        def get_next(self):
            self.calls += 1
            if self.calls == 1:
                raise RuntimeError("boom")
            if self.calls == 2:
                return ("task-1", threading.Event())
            if processed.is_set():
                raise _StopCoordinator()
            return None

        def cancel_download(self, task_id: str) -> None:  # pragma: no cover - unused
            raise AssertionError(f"cancel_download unexpectedly called for {task_id}")

        def update_status_message(self, task_id: str, message: str) -> None:  # pragma: no cover - unused
            raise AssertionError(f"update_status_message unexpectedly called for {task_id}: {message}")

    flaky_queue = FlakyQueue()
    trace_mock = MagicMock()

    monkeypatch.setattr(orchestrator, "book_queue", flaky_queue)
    monkeypatch.setattr(
        orchestrator,
        "_process_single_download",
        lambda task_id, cancel_flag: processed.set(),
    )
    monkeypatch.setattr(orchestrator.logger, "error_trace", trace_mock)
    # Shrink all delays so the loop iterates quickly under test.
    monkeypatch.setattr(orchestrator, "COORDINATOR_LOOP_ERROR_RETRY_DELAY", 0.01)
    monkeypatch.setattr(orchestrator.config, "MAX_CONCURRENT_DOWNLOADS", 1, raising=False)
    monkeypatch.setattr(orchestrator.config, "MAIN_LOOP_SLEEP_TIME", 0.01, raising=False)

    def run_coordinator() -> None:
        try:
            orchestrator.concurrent_download_loop()
        except _StopCoordinator:
            pass

    worker = threading.Thread(target=run_coordinator, daemon=True, name="TestDownloadCoordinator")
    worker.start()

    assert processed.wait(timeout=1.0) is True
    worker.join(timeout=1.0)
    assert worker.is_alive() is False
    # Error poll + task poll + at least one poll after processing.
    assert flaky_queue.calls >= 3
    trace_mock.assert_called_once_with("Download coordinator loop error: %s", ANY)
def test_start_replaces_dead_coordinator_thread(monkeypatch):
    """start() must spawn a fresh coordinator when the previous thread has died."""
    import shelfmark.download.orchestrator as orchestrator

    stale_thread = MagicMock()
    stale_thread.is_alive.return_value = False
    replacement = MagicMock()
    replacement.is_alive.return_value = True
    thread_factory = MagicMock(return_value=replacement)

    monkeypatch.setattr(orchestrator, "_coordinator_thread", stale_thread)
    monkeypatch.setattr(orchestrator.threading, "Thread", thread_factory)

    orchestrator.start()

    thread_factory.assert_called_once_with(
        target=orchestrator.concurrent_download_loop,
        daemon=True,
        name="DownloadCoordinator",
    )
    replacement.start.assert_called_once_with()
    assert orchestrator._coordinator_thread is replacement

View File

@@ -8,6 +8,7 @@ def test_update_download_status_dedupes_identical_events(monkeypatch):
# Ensure clean module-level state
orchestrator._last_activity.clear()
orchestrator._last_progress_value.clear()
orchestrator._last_status_event.clear()
mock_queue = MagicMock()
@@ -28,6 +29,52 @@ def test_update_download_status_dedupes_identical_events(monkeypatch):
assert mock_queue.update_status_message.call_count == 1
assert mock_ws.broadcast_status_update.call_count == 1
# Activity timestamp should still be updated on the duplicate keep-alive call.
assert orchestrator._last_activity[book_id] == 2.0
# Duplicate keep-alives should not refresh stall activity.
assert orchestrator._last_activity[book_id] == 1.0
def test_update_download_progress_dedupes_identical_progress_for_activity(monkeypatch):
    """Repeated identical progress values must not refresh the stall-activity clock."""
    import shelfmark.download.orchestrator as orchestrator

    book_id = "test-progress-book"
    # Reset module-level tracking state so earlier tests can't leak into this one.
    orchestrator._last_activity.clear()
    orchestrator._last_progress_value.clear()
    orchestrator._last_status_event.clear()

    queue_mock = MagicMock()
    monkeypatch.setattr(orchestrator, "book_queue", queue_mock)
    monkeypatch.setattr(orchestrator, "ws_manager", None)

    clock = iter([10.0, 20.0])
    monkeypatch.setattr(orchestrator.time, "time", lambda: next(clock))

    orchestrator.update_download_progress(book_id, 0.0)
    orchestrator.update_download_progress(book_id, 0.0)

    # Both calls reach the queue, but only the first one stamps activity.
    assert queue_mock.update_progress.call_count == 2
    assert orchestrator._last_activity[book_id] == 10.0
    assert orchestrator._last_progress_value[book_id] == 0.0
def test_update_download_progress_refreshes_activity_when_progress_changes(monkeypatch):
    """A genuine progress change must refresh the stall-activity timestamp."""
    import shelfmark.download.orchestrator as orchestrator

    book_id = "test-progress-change"
    # Reset module-level tracking state so earlier tests can't leak into this one.
    orchestrator._last_activity.clear()
    orchestrator._last_progress_value.clear()
    orchestrator._last_status_event.clear()

    queue_mock = MagicMock()
    monkeypatch.setattr(orchestrator, "book_queue", queue_mock)
    monkeypatch.setattr(orchestrator, "ws_manager", None)

    clock = iter([30.0, 40.0])
    monkeypatch.setattr(orchestrator.time, "time", lambda: next(clock))

    orchestrator.update_download_progress(book_id, 0.0)
    orchestrator.update_download_progress(book_id, 0.5)

    # The second (changed) progress value stamps the later time.
    assert orchestrator._last_activity[book_id] == 40.0
    assert orchestrator._last_progress_value[book_id] == 0.5