diff --git a/shelfmark/core/activity_routes.py b/shelfmark/core/activity_routes.py index c0d3d96..5a1262a 100644 --- a/shelfmark/core/activity_routes.py +++ b/shelfmark/core/activity_routes.py @@ -7,22 +7,22 @@ from typing import Any, Callable, NamedTuple from flask import Flask, jsonify, request, session -from shelfmark.core.download_history_service import DownloadHistoryService +from shelfmark.core.download_history_service import ACTIVE_DOWNLOAD_STATUS, DownloadHistoryService, VALID_TERMINAL_STATUSES from shelfmark.core.logger import setup_logger +from shelfmark.core.models import ACTIVE_QUEUE_STATUSES, QueueStatus, TERMINAL_QUEUE_STATUSES +from shelfmark.core.request_validation import RequestStatus from shelfmark.core.request_helpers import ( emit_ws_event, extract_release_source_id, + normalize_optional_text, normalize_positive_int, now_utc_iso, + populate_request_usernames, ) from shelfmark.core.user_db import UserDB logger = setup_logger(__name__) -# Offset added to request row IDs so they don't collide with download_history -# row IDs when both types are merged into a single sorted list. -_REQUEST_ID_OFFSET = 1_000_000_000 - def _parse_timestamp(value: Any) -> float: if not isinstance(value, str) or not value.strip(): @@ -152,16 +152,20 @@ def _activity_ws_room(*, is_no_auth: bool, actor_db_user_id: int | None) -> str: return "admins" +def _check_item_ownership(actor: _ActorContext, row: dict[str, Any]) -> Any | None: + """Return a 403 response if the actor doesn't own the item, else None.""" + if actor.is_admin: + return None + owner_user_id = normalize_positive_int(row.get("user_id")) + if owner_user_id != actor.db_user_id: + return jsonify({"error": "Forbidden"}), 403 + return None + + def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int | None) -> list[dict[str, Any]]: if is_admin: request_rows = user_db.list_requests() - user_cache: dict[int, str] = {} - for row in request_rows: - requester_id = row["user_id"] - if requester_id not in user_cache: - requester = user_db.get_user(user_id=requester_id) - user_cache[requester_id] = requester.get("username", "") if requester else "" - row["username"] = user_cache[requester_id] + populate_request_usernames(request_rows, user_db) return request_rows if db_user_id is None: @@ -169,51 +173,79 @@ def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int | return user_db.list_requests(user_id=db_user_id) -def _parse_download_item_key(item_key: Any) -> str | None: - if not isinstance(item_key, str) or not item_key.startswith("download:"): +def _parse_item_key(item_key: Any, prefix: str) -> str | None: + """Extract the value after 'prefix:' from an item_key string.""" + if not isinstance(item_key, str) or not item_key.startswith(f"{prefix}:"): return None - task_id = item_key.split(":", 1)[1].strip() - return task_id or None + value = item_key.split(":", 1)[1].strip() + return value or None -def _parse_request_item_key(item_key: Any) -> int | None: - if not isinstance(item_key, str) or not item_key.startswith("request:"): - return None - raw_id = item_key.split(":", 1)[1].strip() - return normalize_positive_int(raw_id) +_ALL_BUCKET_KEYS = (*ACTIVE_QUEUE_STATUSES, *TERMINAL_QUEUE_STATUSES) -def _merge_terminal_snapshot_backfill( +def _build_download_status_from_db( *, - status: dict[str, dict[str, Any]], - terminal_rows: list[dict[str, Any]], -) -> None: - existing_task_ids: set[str] = set() - for bucket_key in ("queued", "resolving", "locating", "downloading", "complete", "error", "cancelled"): - bucket = status.get(bucket_key) + db_rows: list[dict[str, Any]], + queue_status: dict[str, dict[str, Any]], +) -> dict[str, dict[str, Any]]: + """Build the download status dict from DB rows, overlaying live queue data. + + Active DB rows are matched against the queue for live progress. + Terminal DB rows go directly into their final bucket. + Stale active rows (no queue entry) are treated as interrupted errors. + """ + status: dict[str, dict[str, Any]] = {key: {} for key in _ALL_BUCKET_KEYS} + + # Index queue items by task_id for fast lookup: task_id -> (bucket_key, payload) + queue_index: dict[str, tuple[str, dict[str, Any]]] = {} + for bucket_key in _ALL_BUCKET_KEYS: + bucket = queue_status.get(bucket_key) if not isinstance(bucket, dict): continue - existing_task_ids.update(str(task_id) for task_id in bucket.keys()) + for task_id, payload in bucket.items(): + queue_index[str(task_id)] = (bucket_key, payload) - for row in terminal_rows: + for row in db_rows: task_id = str(row.get("task_id") or "").strip() - if not task_id or task_id in existing_task_ids: + if not task_id: continue final_status = row.get("final_status") - if final_status not in {"complete", "error", "cancelled"}: - continue - download_payload = DownloadHistoryService.to_download_payload(row) - if final_status not in status or not isinstance(status.get(final_status), dict): - status[final_status] = {} - status[final_status][task_id] = download_payload - existing_task_ids.add(task_id) + if final_status == ACTIVE_DOWNLOAD_STATUS: + queue_entry = queue_index.pop(task_id, None) + if queue_entry is not None: + bucket_key, queue_payload = queue_entry + status[bucket_key][task_id] = queue_payload + else: + # Stale active row — no queue entry means it was interrupted + download_payload = DownloadHistoryService.to_download_payload(row) + download_payload["status_message"] = "Interrupted" + status[QueueStatus.ERROR][task_id] = download_payload + elif final_status in VALID_TERMINAL_STATUSES: + download_payload = DownloadHistoryService.to_download_payload(row) + # For complete/cancelled the saved status_message is a stale + # progress string (e.g. "Fetching download sources") — clear it + # so the frontend only shows its own status label. Error rows + # keep theirs since the message describes the failure. + if final_status in ("complete", "cancelled"): + download_payload["status_message"] = None + status[final_status][task_id] = download_payload + + # Include any queue items that don't have a DB row yet (race condition safety). + # Only include active items — terminal items without a DB row are orphans + # (e.g. admin cleared history) and should not be shown. + for task_id, (bucket_key, queue_payload) in queue_index.items(): + if bucket_key in ACTIVE_QUEUE_STATUSES and task_id not in status.get(bucket_key, {}): + status[bucket_key][task_id] = queue_payload + + return status def _collect_active_download_task_ids(status: dict[str, dict[str, Any]]) -> set[str]: active_task_ids: set[str] = set() - for bucket_key in ("queued", "resolving", "locating", "downloading"): + for bucket_key in ACTIVE_QUEUE_STATUSES: bucket = status.get(bucket_key) if not isinstance(bucket, dict): continue @@ -226,19 +258,19 @@ def _collect_active_download_task_ids(status: dict[str, dict[str, Any]]) -> set[ def _request_terminal_status(row: dict[str, Any]) -> str | None: request_status = row.get("status") - if request_status == "pending": + if request_status == RequestStatus.PENDING: return None - if request_status == "rejected": - return "rejected" - if request_status == "cancelled": - return "cancelled" - if request_status != "fulfilled": + if request_status == RequestStatus.REJECTED: + return RequestStatus.REJECTED + if request_status == RequestStatus.CANCELLED: + return RequestStatus.CANCELLED + if request_status != RequestStatus.FULFILLED: return None delivery_state = str(row.get("delivery_state") or "").strip().lower() - if delivery_state in {"error", "cancelled"}: + if delivery_state in {QueueStatus.ERROR, QueueStatus.CANCELLED}: return delivery_state - return "complete" + return QueueStatus.COMPLETE def _minimal_request_snapshot(request_row: dict[str, Any], request_id: int) -> dict[str, Any]: @@ -273,11 +305,12 @@ def _request_history_entry(request_row: dict[str, Any]) -> dict[str, Any] | None if request_id is None: return None final_status = _request_terminal_status(request_row) + item_key = f"request:{request_id}" return { - "id": _REQUEST_ID_OFFSET + request_id, + "id": item_key, "user_id": request_row.get("user_id"), "item_type": "request", - "item_key": f"request:{request_id}", + "item_key": item_key, "dismissed_at": request_row.get("dismissed_at"), "snapshot": _minimal_request_snapshot(request_row, request_id), "origin": "request", @@ -338,7 +371,22 @@ def register_activity_routes( owner_user_scope = None if is_admin else db_user_id - status = queue_status(user_id=owner_user_scope) + live_queue = queue_status(user_id=owner_user_scope) + + try: + db_rows = download_history_service.get_undismissed( + user_id=owner_user_scope, + limit=200, + ) + except Exception as exc: + logger.warning("Failed to load undismissed download rows: %s", exc) + db_rows = [] + + status = _build_download_status_from_db( + db_rows=db_rows, + queue_status=live_queue, + ) + updated_requests = sync_request_delivery_states( user_db, queue_status=status, @@ -347,15 +395,6 @@ def register_activity_routes( emit_request_updates(updated_requests) request_rows = _list_visible_requests(user_db, is_admin=is_admin, db_user_id=db_user_id) - try: - terminal_rows = download_history_service.get_undismissed_terminal( - user_id=owner_user_scope, - limit=200, - ) - _merge_terminal_snapshot_backfill(status=status, terminal_rows=terminal_rows) - except Exception as exc: - logger.warning("Failed to merge terminal download history rows: %s", exc) - dismissed: list[dict[str, str]] = [] dismissed_task_ids: list[str] = [] try: @@ -431,29 +470,27 @@ def register_activity_routes( dismissal_item: dict[str, str] | None = None if item_type == "download": - task_id = _parse_download_item_key(item_key) + task_id = _parse_item_key(item_key, "download") if task_id is None: return jsonify({"error": "item_key must be in the format download:"}), 400 existing = download_history_service.get_by_task_id(task_id) if existing is None: - return jsonify({"error": "Activity item not found"}), 404 + # Row already gone (e.g. admin cleared history) — treat as success + dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"} + else: + ownership_gate = _check_item_ownership(actor, existing) + if ownership_gate is not None: + return ownership_gate - owner_user_id = normalize_positive_int(existing.get("user_id")) - if not actor.is_admin and owner_user_id != actor.db_user_id: - return jsonify({"error": "Forbidden"}), 403 - - dismissed_count = download_history_service.dismiss( - task_id=task_id, - user_id=actor.owner_scope, - ) - if dismissed_count < 1: - return jsonify({"error": "Activity item not found"}), 404 - - dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"} + download_history_service.dismiss( + task_id=task_id, + user_id=actor.owner_scope, + ) + dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"} elif item_type == "request": - request_id = _parse_request_item_key(item_key) + request_id = normalize_positive_int(_parse_item_key(item_key, "request")) if request_id is None: return jsonify({"error": "item_key must be in the format request:"}), 400 @@ -461,9 +498,9 @@ def register_activity_routes( if request_row is None: return jsonify({"error": "Request not found"}), 404 - owner_user_id = normalize_positive_int(request_row.get("user_id")) - if not actor.is_admin and owner_user_id != actor.db_user_id: - return jsonify({"error": "Forbidden"}), 403 + ownership_gate = _check_item_ownership(actor, request_row) + if ownership_gate is not None: + return ownership_gate user_db.update_request(request_id, dismissed_at=now_utc_iso()) dismissal_item = {"item_type": "request", "item_key": f"request:{request_id}"} @@ -515,28 +552,28 @@ def register_activity_routes( item_key = item.get("item_key") if item_type == "download": - task_id = _parse_download_item_key(item_key) + task_id = _parse_item_key(item_key, "download") if task_id is None: return jsonify({"error": "download item_key must be in the format download:"}), 400 existing = download_history_service.get_by_task_id(task_id) if existing is None: continue - owner_user_id = normalize_positive_int(existing.get("user_id")) - if not actor.is_admin and owner_user_id != actor.db_user_id: - return jsonify({"error": "Forbidden"}), 403 + ownership_gate = _check_item_ownership(actor, existing) + if ownership_gate is not None: + return ownership_gate download_task_ids.append(task_id) continue if item_type == "request": - request_id = _parse_request_item_key(item_key) + request_id = normalize_positive_int(_parse_item_key(item_key, "request")) if request_id is None: return jsonify({"error": "request item_key must be in the format request:"}), 400 request_row = user_db.get_request(request_id) if request_row is None: continue - owner_user_id = normalize_positive_int(request_row.get("user_id")) - if not actor.is_admin and owner_user_id != actor.db_user_id: - return jsonify({"error": "Forbidden"}), 403 + ownership_gate = _check_item_ownership(actor, request_row) + if ownership_gate is not None: + return ownership_gate request_ids.append(request_id) continue @@ -591,14 +628,14 @@ def register_activity_routes( if offset < 0: return jsonify({"error": "offset must be a non-negative integer"}), 400 - # We combine download + request history and apply pagination over the merged list. - max_rows = min(limit + offset + 500, 5000) + # Fetch enough from each source to fill the requested page after merging. + merge_limit = offset + limit download_history_rows = download_history_service.get_history( user_id=actor.owner_scope, - limit=max_rows, + limit=merge_limit, offset=0, ) - dismissed_request_rows = user_db.list_dismissed_requests(user_id=actor.owner_scope, limit=max_rows) + dismissed_request_rows = user_db.list_dismissed_requests(user_id=actor.owner_scope, limit=merge_limit) request_history_rows = [ entry for entry in (_request_history_entry(row) for row in dismissed_request_rows) @@ -609,7 +646,7 @@ def register_activity_routes( combined.sort( key=lambda row: ( _parse_timestamp(row.get("dismissed_at")), - int(row.get("id") or 0), + str(row.get("id") or ""), ), reverse=True, ) diff --git a/shelfmark/core/download_history_service.py b/shelfmark/core/download_history_service.py index 2b06112..93020fa 100644 --- a/shelfmark/core/download_history_service.py +++ b/shelfmark/core/download_history_service.py @@ -5,13 +5,19 @@ from __future__ import annotations import os import sqlite3 import threading +from datetime import datetime from typing import Any +from shelfmark.core.logger import setup_logger +from shelfmark.core.models import TERMINAL_QUEUE_STATUSES from shelfmark.core.request_helpers import normalize_optional_positive_int, normalize_optional_text, now_utc_iso +logger = setup_logger(__name__) -VALID_FINAL_STATUSES = frozenset({"complete", "error", "cancelled"}) -VALID_ORIGINS = frozenset({"direct", "request", "requested"}) + +VALID_TERMINAL_STATUSES = frozenset(s.value for s in TERMINAL_QUEUE_STATUSES) +ACTIVE_DOWNLOAD_STATUS = "active" +VALID_ORIGINS = frozenset({"direct", "requested"}) def _normalize_task_id(task_id: Any) -> str: @@ -27,7 +33,7 @@ def _normalize_origin(origin: Any) -> str: return "direct" lowered = normalized.lower() if lowered not in VALID_ORIGINS: - raise ValueError("origin must be one of: direct, request, requested") + raise ValueError("origin must be one of: direct, requested") return lowered @@ -36,7 +42,7 @@ def _normalize_final_status(final_status: Any) -> str: if normalized is None: raise ValueError("final_status must be a non-empty string") lowered = normalized.lower() - if lowered not in VALID_FINAL_STATUSES: + if lowered not in VALID_TERMINAL_STATUSES: raise ValueError("final_status must be one of: complete, error, cancelled") return lowered @@ -109,23 +115,39 @@ class DownloadHistoryService: "source_display_name": row.get("source_display_name"), "status_message": row.get("status_message"), "download_path": DownloadHistoryService._resolve_existing_download_path(row.get("download_path")), + "added_time": DownloadHistoryService._iso_to_epoch(row.get("queued_at")), "user_id": row.get("user_id"), "username": row.get("username"), "request_id": row.get("request_id"), } + @staticmethod + def _iso_to_epoch(value: Any) -> float | None: + if not isinstance(value, str) or not value.strip(): + return None + normalized = value.strip().replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized).timestamp() + except ValueError: + return None + @classmethod def _to_history_row(cls, row: dict[str, Any]) -> dict[str, Any]: task_id = str(row.get("task_id") or "").strip() + item_key = cls._to_item_key(task_id) + download_payload = cls.to_download_payload(row) + # Clear stale progress messages for non-error terminal states. + if row.get("final_status") in ("complete", "cancelled"): + download_payload["status_message"] = None return { - "id": row.get("id"), + "id": item_key, "user_id": row.get("user_id"), "item_type": "download", - "item_key": cls._to_item_key(task_id), + "item_key": item_key, "dismissed_at": row.get("dismissed_at"), "snapshot": { "kind": "download", - "download": cls.to_download_payload(row), + "download": download_payload, }, "origin": row.get("origin"), "final_status": row.get("final_status"), @@ -134,7 +156,7 @@ class DownloadHistoryService: "source_id": task_id or None, } - def record_terminal( + def record_download( self, *, task_id: str, @@ -150,11 +172,13 @@ class DownloadHistoryService: preview: str | None, content_type: str | None, origin: str, - final_status: str, - status_message: str | None, - download_path: str | None, - terminal_at: str | None = None, ) -> None: + """Record a download at queue time with final_status='active'. + + On first queue: inserts a new row. + On retry (row already exists): resets the row back to 'active' + so the normal finalize path works when the retry completes. + """ normalized_task_id = _normalize_task_id(task_id) normalized_user_id = normalize_optional_positive_int(user_id, "user_id") normalized_request_id = normalize_optional_positive_int(request_id, "request_id") @@ -165,12 +189,6 @@ class DownloadHistoryService: if normalized_title is None: raise ValueError("title must be a non-empty string") normalized_origin = _normalize_origin(origin) - normalized_final_status = _normalize_final_status(final_status) - effective_terminal_at = ( - terminal_at - if isinstance(terminal_at, str) and terminal_at.strip() - else now_utc_iso() - ) with self._lock: conn = self._connect() @@ -178,44 +196,20 @@ class DownloadHistoryService: conn.execute( """ INSERT INTO download_history ( - task_id, - user_id, - username, - request_id, - source, - source_display_name, - title, - author, - format, - size, - preview, - content_type, - origin, - final_status, - status_message, - download_path, - terminal_at + task_id, user_id, username, request_id, + source, source_display_name, + title, author, format, size, preview, content_type, + origin, final_status, + status_message, download_path, + queued_at, terminal_at ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', NULL, NULL, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(task_id) DO UPDATE SET - user_id = excluded.user_id, - username = excluded.username, - request_id = excluded.request_id, - source = excluded.source, - source_display_name = excluded.source_display_name, - title = excluded.title, - author = excluded.author, - format = excluded.format, - size = excluded.size, - preview = excluded.preview, - content_type = excluded.content_type, - origin = excluded.origin, - final_status = excluded.final_status, - status_message = excluded.status_message, - download_path = excluded.download_path, - terminal_at = excluded.terminal_at, - dismissed_at = NULL - """, + final_status = 'active', + status_message = NULL, + download_path = NULL, + terminal_at = CURRENT_TIMESTAMP + """, ( normalized_task_id, normalized_user_id, @@ -230,16 +224,57 @@ class DownloadHistoryService: normalize_optional_text(preview), normalize_optional_text(content_type), normalized_origin, - normalized_final_status, - normalize_optional_text(status_message), - normalize_optional_text(download_path), - effective_terminal_at, ), ) conn.commit() finally: conn.close() + def finalize_download( + self, + *, + task_id: str, + final_status: str, + status_message: str | None = None, + download_path: str | None = None, + ) -> None: + """Update an existing download row to its terminal state.""" + normalized_task_id = _normalize_task_id(task_id) + normalized_final_status = _normalize_final_status(final_status) + normalized_status_message = normalize_optional_text(status_message) + normalized_download_path = normalize_optional_text(download_path) + effective_terminal_at = now_utc_iso() + + with self._lock: + conn = self._connect() + try: + cursor = conn.execute( + """ + UPDATE download_history + SET final_status = ?, + status_message = ?, + download_path = ?, + terminal_at = ? + WHERE task_id = ? AND final_status = 'active' + """, + ( + normalized_final_status, + normalized_status_message, + normalized_download_path, + effective_terminal_at, + normalized_task_id, + ), + ) + rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0 + if rowcount < 1: + logger.warning( + "finalize_download: no active row found for task_id=%s (may have been missed at queue time)", + normalized_task_id, + ) + conn.commit() + finally: + conn.close() + def get_by_task_id(self, task_id: str) -> dict[str, Any] | None: normalized_task_id = _normalize_task_id(task_id) conn = self._connect() @@ -252,7 +287,7 @@ class DownloadHistoryService: finally: conn.close() - def get_undismissed_terminal( + def get_undismissed( self, *, user_id: int | None, diff --git a/shelfmark/core/models.py b/shelfmark/core/models.py index b3ae3be..1fcd01b 100644 --- a/shelfmark/core/models.py +++ b/shelfmark/core/models.py @@ -38,12 +38,19 @@ class QueueStatus(str, Enum): LOCATING = "locating" DOWNLOADING = "downloading" COMPLETE = "complete" - AVAILABLE = "available" ERROR = "error" - DONE = "done" CANCELLED = "cancelled" +TERMINAL_QUEUE_STATUSES: frozenset[QueueStatus] = frozenset({ + QueueStatus.COMPLETE, QueueStatus.ERROR, QueueStatus.CANCELLED, +}) + +ACTIVE_QUEUE_STATUSES: frozenset[QueueStatus] = frozenset({ + QueueStatus.QUEUED, QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING, +}) + + class SearchMode(str, Enum): DIRECT = "direct" UNIVERSAL = "universal" diff --git a/shelfmark/core/queue.py b/shelfmark/core/queue.py index 1306b90..c2725f0 100644 --- a/shelfmark/core/queue.py +++ b/shelfmark/core/queue.py @@ -8,7 +8,7 @@ from threading import Lock, Event from typing import Dict, List, Optional, Tuple, Any, Callable from shelfmark.core.config import config as app_config -from shelfmark.core.models import QueueStatus, QueueItem, DownloadTask +from shelfmark.core.models import QueueStatus, QueueItem, DownloadTask, TERMINAL_QUEUE_STATUSES class BookQueue: @@ -25,6 +25,7 @@ class BookQueue: self._terminal_status_hook: Optional[ Callable[[str, QueueStatus, DownloadTask], None] ] = None + self._queue_hook: Optional[Callable[[str, DownloadTask], None]] = None @property def _status_timeout(self) -> timedelta: @@ -33,11 +34,12 @@ class BookQueue: def add(self, task: DownloadTask) -> bool: """Add a download task to the queue. Returns False if already exists.""" + hook: Optional[Callable[[str, DownloadTask], None]] = None with self._lock: task_id = task.task_id - # Don't add if already exists and not in error/done state - if task_id in self._status and self._status[task_id] not in [QueueStatus.ERROR, QueueStatus.DONE, QueueStatus.CANCELLED]: + # Don't add if already exists and not in error/cancelled state + if task_id in self._status and self._status[task_id] not in [QueueStatus.ERROR, QueueStatus.CANCELLED]: return False # Ensure added_time is set @@ -48,7 +50,14 @@ class BookQueue: self._queue.put(queue_item) self._task_data[task_id] = task self._update_status(task_id, QueueStatus.QUEUED) - return True + hook = self._queue_hook + + if hook is not None: + try: + hook(task_id, task) + except Exception: + pass + return True def get_next(self) -> Optional[Tuple[str, Event]]: """Get next task ID from queue with cancellation flag.""" @@ -95,6 +104,14 @@ class BookQueue: with self._lock: self._terminal_status_hook = hook + def set_queue_hook( + self, + hook: Optional[Callable[[str, DownloadTask], None]], + ) -> None: + """Register a callback invoked when a task is added to the queue.""" + with self._lock: + self._queue_hook = hook + def update_status(self, book_id: str, status: QueueStatus) -> None: """Update status of a book in the queue.""" hook: Optional[Callable[[str, QueueStatus, DownloadTask], None]] = None @@ -103,15 +120,8 @@ class BookQueue: previous_status = self._status.get(book_id) self._update_status(book_id, status) - terminal_statuses = { - QueueStatus.COMPLETE, - QueueStatus.AVAILABLE, - QueueStatus.ERROR, - QueueStatus.DONE, - QueueStatus.CANCELLED, - } if ( - status in terminal_statuses + status in TERMINAL_QUEUE_STATUSES and previous_status != status and self._terminal_status_hook is not None ): @@ -121,7 +131,7 @@ class BookQueue: hook_task = current_task # Clean up active download tracking when finished - if status in [QueueStatus.COMPLETE, QueueStatus.AVAILABLE, QueueStatus.ERROR, QueueStatus.DONE, QueueStatus.CANCELLED]: + if status in TERMINAL_QUEUE_STATUSES: self._active_downloads.pop(book_id, None) self._cancel_flags.pop(book_id, None) @@ -196,29 +206,20 @@ class BookQueue: return sorted(queue_items, key=lambda x: (x['priority'], x['added_time'])) def cancel_download(self, task_id: str) -> bool: - """Cancel a download or clear a completed/errored item.""" + """Cancel an active or queued download.""" with self._lock: current_status = self._status.get(task_id) - # Allow cancellation during any active state if current_status in [QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING]: # Signal active download to stop if task_id in self._cancel_flags: self._cancel_flags[task_id].set() - if current_status in [QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.AVAILABLE, QueueStatus.ERROR, QueueStatus.CANCELLED]: - # Clear completed/errored/cancelled items from tracking - self._status.pop(task_id, None) - self._status_timestamps.pop(task_id, None) - self._task_data.pop(task_id, None) - self._cancel_flags.pop(task_id, None) - self._active_downloads.pop(task_id, None) - return True + elif current_status not in [QueueStatus.QUEUED]: + # Not in a cancellable state + return False - if current_status in [QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING, QueueStatus.QUEUED]: - self.update_status(task_id, QueueStatus.CANCELLED) - return True - - return False + self.update_status(task_id, QueueStatus.CANCELLED) + return True def set_priority(self, task_id: str, new_priority: int) -> bool: """Change the priority of a queued task (lower = higher priority).""" @@ -257,6 +258,8 @@ class BookQueue: This is used for retries where task metadata should be preserved. """ + hook: Optional[Callable[[str, DownloadTask], None]] = None + hook_task: Optional[DownloadTask] = None with self._lock: task = self._task_data.get(task_id) if task is None: @@ -285,7 +288,15 @@ class BookQueue: queue_item = QueueItem(task_id, task.priority, time.time()) self._queue.put(queue_item) self._update_status(task_id, QueueStatus.QUEUED) - return True + hook = self._queue_hook + hook_task = task + + if hook is not None and hook_task is not None: + try: + hook(task_id, hook_task) + except Exception: + pass + return True def reorder_queue(self, task_priorities: Dict[str, int]) -> bool: """Bulk reorder queue by mapping task_id to new priority.""" @@ -327,7 +338,7 @@ class BookQueue: def refresh(self) -> None: """Remove any tasks that are done downloading or have stale status.""" - terminal_statuses = {QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.ERROR, QueueStatus.AVAILABLE, QueueStatus.CANCELLED} + terminal_statuses = TERMINAL_QUEUE_STATUSES with self._lock: current_time = datetime.now() to_remove = [] @@ -341,10 +352,6 @@ class BookQueue: if task.download_path and not Path(task.download_path).exists(): task.download_path = None - # Mark available downloads as done if file is gone - if status == QueueStatus.AVAILABLE and not task.download_path: - self._update_status(task_id, QueueStatus.DONE) - # Check for stale status entries last_update = self._status_timestamps.get(task_id) if last_update and (current_time - last_update) > self._status_timeout: diff --git a/shelfmark/core/request_helpers.py b/shelfmark/core/request_helpers.py index 1894414..9cb54db 100644 --- a/shelfmark/core/request_helpers.py +++ b/shelfmark/core/request_helpers.py @@ -104,6 +104,17 @@ def normalize_optional_positive_int(value: Any, field_name: str = "value") -> in return parsed +def populate_request_usernames(rows: list[dict[str, Any]], user_db: Any) -> None: + """Add 'username' to each request row by looking up user_id.""" + cache: dict[int, str] = {} + for row in rows: + requester_id = row["user_id"] + if requester_id not in cache: + requester = user_db.get_user(user_id=requester_id) + cache[requester_id] = requester.get("username", "") if requester else "" + row["username"] = cache[requester_id] + + def extract_release_source_id(release_data: Any) -> str | None: """Extract and normalize release_data.source_id.""" if not isinstance(release_data, dict): diff --git a/shelfmark/core/request_routes.py b/shelfmark/core/request_routes.py index a54434e..4a97dc8 100644 --- a/shelfmark/core/request_routes.py +++ b/shelfmark/core/request_routes.py @@ -17,6 +17,7 @@ from shelfmark.core.request_policy import ( parse_policy_mode, resolve_policy_mode, ) +from shelfmark.core.request_validation import RequestStatus from shelfmark.core.requests_service import ( RequestServiceError, cancel_request, @@ -36,6 +37,8 @@ from shelfmark.core.request_helpers import ( emit_ws_event, load_users_request_policy_settings, normalize_optional_text, + normalize_positive_int, + populate_request_usernames, ) from shelfmark.core.user_db import UserDB @@ -88,6 +91,18 @@ def _require_db_user_id() -> tuple[int | None, Any | None]: ) +def _require_admin_user_id() -> tuple[int | None, Any | None]: + if not session.get("is_admin", False): + return None, (jsonify({"error": "Admin access required"}), 403) + raw_admin_id = session.get("db_user_id") + if raw_admin_id is None: + return None, (jsonify({"error": "Admin user identity unavailable"}), 403) + try: + return int(raw_admin_id), None + except (TypeError, ValueError): + return None, (jsonify({"error": "Admin user identity unavailable"}), 403) + + def _resolve_effective_policy( user_db: UserDB, *, @@ -177,23 +192,16 @@ def _format_user_label(username: str | None, user_id: int | None = None) -> str: return "unknown user" -def _resolve_request_username( - user_db: UserDB, - *, - request_row: dict[str, Any], - fallback_username: str | None = None, -) -> str | None: - normalized_fallback = normalize_optional_text(fallback_username) - raw_user_id = request_row.get("user_id") - try: - request_user_id = int(raw_user_id) - except (TypeError, ValueError): - return normalized_fallback - - requester = user_db.get_user(user_id=request_user_id) - if not isinstance(requester, dict): - return normalized_fallback - return normalize_optional_text(requester.get("username")) or normalized_fallback +def _format_requester_label(user_db: UserDB, request_row: dict[str, Any]) -> str: + """Resolve a display label for the user who created a request.""" + user_id = normalize_positive_int(request_row.get("user_id")) + if user_id is not None: + requester = user_db.get_user(user_id=user_id) + if isinstance(requester, dict): + username = normalize_optional_text(requester.get("username")) + if username is not None: + return username + return _format_user_label(None, user_id) def _resolve_request_source_and_format(request_row: dict[str, Any]) -> tuple[str, str | None]: @@ -209,13 +217,6 @@ def _resolve_request_source_and_format(request_row: dict[str, Any]) -> tuple[str return normalize_source(request_row.get("source_hint")), None -def _resolve_request_user_id(request_row: dict[str, Any]) -> int | None: - raw_user_id = request_row.get("user_id") - try: - user_id = int(raw_user_id) - except (TypeError, ValueError): - return None - return user_id if user_id > 0 else None def _notify_admin_for_request_event( @@ -223,7 +224,6 @@ def _notify_admin_for_request_event( *, event: NotificationEvent, request_row: dict[str, Any], - fallback_username: str | None = None, ) -> None: book_data = request_row.get("book_data") if not isinstance(book_data, dict): @@ -234,11 +234,7 @@ def _notify_admin_for_request_event( event=event, title=str(book_data.get("title") or "Unknown title"), author=str(book_data.get("author") or "Unknown author"), - username=_resolve_request_username( - user_db, - request_row=request_row, - fallback_username=fallback_username, - ), + username=_format_requester_label(user_db, request_row), content_type=normalize_content_type( request_row.get("content_type") or book_data.get("content_type") ), @@ -248,7 +244,7 @@ def _notify_admin_for_request_event( error_message=None, ) - owner_user_id = _resolve_request_user_id(request_row) + owner_user_id = normalize_positive_int(request_row.get("user_id")) try: notify_admin(event, context) except Exception as exc: @@ -513,7 +509,6 @@ def register_request_routes( user_db, event=NotificationEvent.REQUEST_CREATED, request_row=created, - fallback_username=actor_username, ) return jsonify(created), 201 @@ -606,13 +601,7 @@ def register_request_routes( except ValueError as exc: return jsonify({"error": str(exc)}), 400 - user_cache: dict[int, str] = {} - for row in rows: - requester_id = row["user_id"] - if requester_id not in user_cache: - requester = user_db.get_user(user_id=requester_id) - user_cache[requester_id] = requester.get("username", "") if requester else "" - row["username"] = user_cache[requester_id] + populate_request_usernames(rows, user_db) return jsonify(rows) @@ -626,11 +615,11 @@ def register_request_routes( by_status = { status: len(user_db.list_requests(status=status)) - for status in ("pending", "fulfilled", "rejected", "cancelled") + for status in RequestStatus } return jsonify( { - "pending": by_status["pending"], + "pending": by_status[RequestStatus.PENDING], "total": sum(by_status.values()), "by_status": by_status, } @@ -641,16 +630,10 @@ def register_request_routes( auth_gate = _require_request_endpoints_available(resolve_auth_mode) if auth_gate is not None: return auth_gate - if not session.get("is_admin", False): - return jsonify({"error": "Admin access required"}), 403 - raw_admin_id = session.get("db_user_id") - if raw_admin_id is None: - return jsonify({"error": "Admin user identity unavailable"}), 403 - try: - admin_user_id = int(raw_admin_id) - except (TypeError, ValueError): - return jsonify({"error": "Admin user identity unavailable"}), 403 + admin_user_id, admin_gate = _require_admin_user_id() + if admin_gate is not None: + return admin_gate data = request.get_json(silent=True) or {} if not isinstance(data, dict): @@ -675,10 +658,7 @@ def register_request_routes( "title": _resolve_request_title(updated), } admin_label = _format_user_label(normalize_optional_text(session.get("user_id")), admin_user_id) - requester_label = _format_user_label( - _resolve_request_username(user_db, request_row=updated), - _resolve_request_user_id(updated), - ) + requester_label = _format_requester_label(user_db, updated) logger.info( "Request fulfilled #%s for '%s' by %s (requested by %s)", updated["id"], @@ -712,16 +692,10 @@ def register_request_routes( auth_gate = _require_request_endpoints_available(resolve_auth_mode) if auth_gate is not None: return auth_gate - if not session.get("is_admin", False): - return jsonify({"error": "Admin access required"}), 403 - raw_admin_id = session.get("db_user_id") - if raw_admin_id is None: - return jsonify({"error": "Admin user identity unavailable"}), 403 - try: - admin_user_id = int(raw_admin_id) - except (TypeError, ValueError): - return jsonify({"error": "Admin user identity unavailable"}), 403 + admin_user_id, admin_gate = _require_admin_user_id() + if admin_gate is not None: + return admin_gate data = request.get_json(silent=True) or {} if not isinstance(data, dict): @@ -743,10 +717,7 @@ def register_request_routes( "title": _resolve_request_title(updated), } admin_label = _format_user_label(normalize_optional_text(session.get("user_id")), admin_user_id) - requester_label = _format_user_label( - _resolve_request_username(user_db, request_row=updated), - _resolve_request_user_id(updated), - ) + requester_label = _format_requester_label(user_db, updated) logger.info( "Request rejected #%s for '%s' by %s (requested by %s)", updated["id"], diff --git a/shelfmark/core/request_validation.py b/shelfmark/core/request_validation.py index 713183e..af707e0 100644 --- a/shelfmark/core/request_validation.py +++ b/shelfmark/core/request_validation.py @@ -2,27 +2,29 @@ from __future__ import annotations +from enum import Enum from typing import Any +from shelfmark.core.models import QueueStatus from shelfmark.core.request_policy import parse_policy_mode -VALID_REQUEST_STATUSES = frozenset({"pending", "fulfilled", "rejected", "cancelled"}) -TERMINAL_REQUEST_STATUSES = frozenset({"fulfilled", "rejected", "cancelled"}) +class RequestStatus(str, Enum): + """Enum for request lifecycle statuses.""" + PENDING = "pending" + FULFILLED = "fulfilled" + REJECTED = "rejected" + CANCELLED = "cancelled" + + +DELIVERY_STATE_NONE = "none" + +VALID_REQUEST_STATUSES = frozenset(RequestStatus) +TERMINAL_REQUEST_STATUSES = frozenset({ + RequestStatus.FULFILLED, RequestStatus.REJECTED, RequestStatus.CANCELLED, +}) VALID_REQUEST_LEVELS = frozenset({"book", "release"}) -VALID_DELIVERY_STATES = frozenset( - { - "none", - "unknown", - "queued", - "resolving", - "locating", - "downloading", - "complete", - "error", - "cancelled", - } -) +VALID_DELIVERY_STATES = frozenset({DELIVERY_STATE_NONE} | set(QueueStatus)) def normalize_request_status(status: Any) -> str: @@ -63,14 +65,6 @@ def normalize_delivery_state(state: Any) -> str: return normalized -def safe_delivery_state(state: Any, default: str = "none") -> str: - """Normalize delivery-state with a fallback for invalid/missing values.""" - if not isinstance(state, str): - return default - normalized = state.strip().lower() - return normalized if normalized in VALID_DELIVERY_STATES else default - - def validate_request_level_payload(request_level: Any, release_data: Any) -> str: """Validate request_level and release_data shape coupling.""" normalized_level = normalize_request_level(request_level) diff --git a/shelfmark/core/requests_service.py b/shelfmark/core/requests_service.py index 5b71418..e183056 100644 --- a/shelfmark/core/requests_service.py +++ b/shelfmark/core/requests_service.py @@ -7,11 +7,13 @@ import json from typing import Any, Callable, TYPE_CHECKING from shelfmark.core.request_policy import normalize_content_type +from shelfmark.core.models import QueueStatus from shelfmark.core.request_validation import ( + DELIVERY_STATE_NONE, + RequestStatus, normalize_policy_mode, normalize_request_level, normalize_request_status, - safe_delivery_state, validate_request_level_payload, validate_status_transition, ) @@ -102,7 +104,7 @@ def _find_duplicate_pending_request( author: str, content_type: str, ) -> dict[str, Any] | None: - pending_rows = user_db.list_requests(user_id=user_id, status="pending") + pending_rows = user_db.list_requests(user_id=user_id, status=RequestStatus.PENDING) for row in pending_rows: row_book_data = row.get("book_data") or {} if not isinstance(row_book_data, dict): @@ -122,8 +124,12 @@ def _now_timestamp() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") -def _existing_delivery_state(request_row: dict[str, Any]) -> str: - return safe_delivery_state(request_row.get("delivery_state")) +def _normalize_admin_note(admin_note: Any) -> str | None: + if admin_note is None: + return None + if not isinstance(admin_note, str): + raise RequestServiceError("admin_note must be a string", status_code=400) + return admin_note.strip() or None def sync_delivery_states_from_queue_status( @@ -134,7 +140,7 @@ def sync_delivery_states_from_queue_status( ) -> list[dict[str, Any]]: """Persist delivery-state transitions for fulfilled requests based on queue status.""" source_delivery_states: dict[str, str] = {} - for status_key in ("queued", "resolving", "locating", "downloading", "complete", "error", "cancelled"): + for status_key in QueueStatus: status_bucket = queue_status.get(status_key) if not isinstance(status_bucket, dict): continue @@ -144,7 +150,7 @@ def sync_delivery_states_from_queue_status( if not source_delivery_states: return [] - fulfilled_rows = user_db.list_requests(user_id=user_id, status="fulfilled") + fulfilled_rows = user_db.list_requests(user_id=user_id, status=RequestStatus.FULFILLED) updated: list[dict[str, Any]] = [] for row in fulfilled_rows: @@ -156,7 +162,7 @@ def sync_delivery_states_from_queue_status( if delivery_state is None: continue - if _existing_delivery_state(row) == delivery_state: + if row.get("delivery_state", DELIVERY_STATE_NONE) == delivery_state: continue updated.append( @@ -257,6 +263,15 @@ def ensure_request_access( return request_row +def _require_pending(request_row: dict[str, Any]) -> None: + if request_row["status"] != RequestStatus.PENDING: + raise RequestServiceError( + "Request is already in a terminal state", + status_code=409, + code="stale_transition", + ) + + def cancel_request( user_db: "UserDB", *, @@ -270,18 +285,13 @@ def cancel_request( actor_user_id=actor_user_id, is_admin=False, ) - if request_row["status"] != "pending": - raise RequestServiceError( - "Request is already in a terminal state", - status_code=409, - code="stale_transition", - ) + _require_pending(request_row) try: return user_db.update_request( request_id, - expected_current_status="pending", - status="cancelled", + expected_current_status=RequestStatus.PENDING, + status=RequestStatus.CANCELLED, ) except ValueError as exc: raise RequestServiceError(str(exc), status_code=409, code="stale_transition") from exc @@ -301,24 +311,15 @@ def reject_request( actor_user_id=admin_user_id, is_admin=True, ) - if request_row["status"] != "pending": - raise RequestServiceError( - "Request is already in a terminal state", - status_code=409, - code="stale_transition", - ) + _require_pending(request_row) - normalized_admin_note = None - if admin_note is not None: - if not isinstance(admin_note, str): - raise RequestServiceError("admin_note must be a string", status_code=400) - normalized_admin_note = admin_note.strip() or None + normalized_admin_note = _normalize_admin_note(admin_note) try: return user_db.update_request( request_id, - expected_current_status="pending", - status="rejected", + expected_current_status=RequestStatus.PENDING, + status=RequestStatus.REJECTED, admin_note=normalized_admin_note, reviewed_by=admin_user_id, reviewed_at=_now_timestamp(), @@ -344,18 +345,9 @@ def fulfil_request( actor_user_id=admin_user_id, is_admin=True, ) - if request_row["status"] != "pending": - raise RequestServiceError( - "Request is already in a terminal state", - status_code=409, - code="stale_transition", - ) + _require_pending(request_row) - normalized_admin_note = None - if admin_note is not None: - if not isinstance(admin_note, str): - raise RequestServiceError("admin_note must be a string", status_code=400) - normalized_admin_note = admin_note.strip() or None + normalized_admin_note = _normalize_admin_note(admin_note) if not isinstance(manual_approval, bool): raise RequestServiceError("manual_approval must be a boolean", status_code=400) @@ -368,10 +360,10 @@ def fulfil_request( try: return user_db.update_request( request_id, - expected_current_status="pending", - status="fulfilled", + expected_current_status=RequestStatus.PENDING, + status=RequestStatus.FULFILLED, release_data=None, - delivery_state="complete", + delivery_state=QueueStatus.COMPLETE, delivery_updated_at=_now_timestamp(), last_failure_reason=None, admin_note=normalized_admin_note, @@ -381,14 +373,9 @@ def fulfil_request( except ValueError as exc: raise RequestServiceError(str(exc), status_code=409, code="stale_transition") from exc - if request_row["request_level"] == "book" and selected_release_data is None: + if selected_release_data is None: raise RequestServiceError( - "release_data is required to fulfil book-level requests", - status_code=400, - ) - if request_row["request_level"] == "release" and selected_release_data is None: - raise RequestServiceError( - "release_data is required to fulfil release-level requests", + "release_data is required to fulfil requests", status_code=400, ) @@ -417,10 +404,10 @@ def fulfil_request( try: return user_db.update_request( request_id, - expected_current_status="pending", - status="fulfilled", + expected_current_status=RequestStatus.PENDING, + status=RequestStatus.FULFILLED, release_data=selected_release_data, - delivery_state="queued", + delivery_state=QueueStatus.QUEUED, delivery_updated_at=_now_timestamp(), last_failure_reason=None, admin_note=normalized_admin_note, diff --git a/shelfmark/core/user_db.py b/shelfmark/core/user_db.py index 2a21d2c..3edf6c5 100644 --- a/shelfmark/core/user_db.py +++ b/shelfmark/core/user_db.py @@ -9,12 +9,14 @@ from typing import Any, Dict, List, Optional from shelfmark.core.auth_modes import AUTH_SOURCE_BUILTIN, AUTH_SOURCE_SET from shelfmark.core.logger import setup_logger from shelfmark.core.request_helpers import normalize_optional_positive_int +from shelfmark.core.models import QueueStatus from shelfmark.core.request_validation import ( + DELIVERY_STATE_NONE, + RequestStatus, normalize_delivery_state, normalize_policy_mode, normalize_request_level, normalize_request_status, - safe_delivery_state, validate_request_level_payload, validate_status_transition, ) @@ -83,6 +85,7 @@ CREATE TABLE IF NOT EXISTS download_history ( final_status TEXT NOT NULL, status_message TEXT, download_path TEXT, + queued_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, terminal_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, dismissed_at TIMESTAMP ); @@ -92,6 +95,10 @@ ON download_history (user_id, final_status, terminal_at DESC); CREATE INDEX IF NOT EXISTS idx_download_history_dismissed ON download_history (dismissed_at) WHERE dismissed_at IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_download_history_undismissed +ON download_history (user_id, terminal_at DESC, id DESC) +WHERE dismissed_at IS NULL; """ @@ -162,6 +169,7 @@ class UserDB: self._migrate_auth_source_column(conn) self._migrate_request_delivery_columns(conn) self._migrate_download_requests_dismissed_at(conn) + self._migrate_download_history_queued_at(conn) conn.commit() # WAL mode must be changed outside an open transaction. conn.execute("PRAGMA journal_mode=WAL") @@ -201,18 +209,11 @@ class UserDB: if "last_failure_reason" not in column_names: conn.execute("ALTER TABLE download_requests ADD COLUMN last_failure_reason TEXT") - conn.execute( - """ - UPDATE download_requests - SET delivery_state = 'unknown' - WHERE status = 'fulfilled' AND (delivery_state IS NULL OR TRIM(delivery_state) = '' OR delivery_state = 'none') - """ - ) conn.execute( """ UPDATE download_requests SET delivery_state = 'none' - WHERE status != 'fulfilled' AND (delivery_state IS NULL OR TRIM(delivery_state) = '') + WHERE delivery_state IS NULL OR TRIM(delivery_state) = '' OR delivery_state IN ('unknown', 'available', 'done') """ ) conn.execute( @@ -222,13 +223,6 @@ class UserDB: WHERE delivery_state != 'none' AND delivery_updated_at IS NULL """ ) - conn.execute( - """ - UPDATE download_requests - SET delivery_state = 'complete' - WHERE delivery_state = 'cleared' - """ - ) def _migrate_download_requests_dismissed_at(self, conn: sqlite3.Connection) -> None: """Ensure download_requests.dismissed_at exists for request dismissal history.""" @@ -236,10 +230,20 @@ class UserDB: column_names = {str(col["name"]) for col in columns} if "dismissed_at" not in column_names: conn.execute("ALTER TABLE download_requests ADD COLUMN dismissed_at TIMESTAMP") - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_download_requests_dismissed " - "ON download_requests (dismissed_at) WHERE dismissed_at IS NOT NULL" - ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_download_requests_dismissed " + "ON download_requests (dismissed_at) WHERE dismissed_at IS NOT NULL" + ) + + def _migrate_download_history_queued_at(self, conn: sqlite3.Connection) -> None: + """Ensure download_history.queued_at exists for queue-time recording.""" + columns = conn.execute("PRAGMA table_info(download_history)").fetchall() + column_names = {str(col["name"]) for col in columns} + if "queued_at" not in column_names: + conn.execute("ALTER TABLE download_history ADD COLUMN queued_at TIMESTAMP") + conn.execute( + "UPDATE download_history SET queued_at = CURRENT_TIMESTAMP WHERE queued_at IS NULL" + ) def create_user( self, @@ -447,13 +451,13 @@ class UserDB: policy_mode: str, book_data: Dict[str, Any], release_data: Optional[Dict[str, Any]] = None, - status: str = "pending", + status: str = RequestStatus.PENDING, source_hint: Optional[str] = None, note: Optional[str] = None, admin_note: Optional[str] = None, reviewed_by: Optional[int] = None, reviewed_at: Optional[str] = None, - delivery_state: str = "none", + delivery_state: str = DELIVERY_STATE_NONE, delivery_updated_at: Optional[str] = None, ) -> Dict[str, Any]: """Create a download request row and return the created record.""" @@ -664,24 +668,8 @@ class UserDB: if "content_type" in updates and not updates["content_type"]: raise ValueError("content_type is required") - candidate_request_level = updates.get("request_level", current["request_level"]) - candidate_release_data = ( - updates["release_data"] if "release_data" in updates else current["release_data"] - ) - candidate_status = updates.get("status", current["status"]) - normalized_request_level = normalize_request_level(candidate_request_level) - normalized_candidate_status = normalize_request_status(candidate_status) - - if normalized_request_level == "release" and candidate_release_data is None: - raise ValueError("request_level=release requires non-null release_data") - if ( - normalized_request_level == "book" - and candidate_release_data is not None - and normalized_candidate_status != "fulfilled" - ): - raise ValueError("request_level=book requires null release_data") if "request_level" in updates: - updates["request_level"] = normalized_request_level + updates["request_level"] = normalize_request_level(updates["request_level"]) if "book_data" in updates: if not isinstance(updates["book_data"], dict): @@ -737,19 +725,17 @@ class UserDB: if current_request is None: return None - if current_request.get("status") != "fulfilled": + if current_request.get("status") != RequestStatus.FULFILLED: return None - current_delivery_state = safe_delivery_state( - current_request.get("delivery_state"), - ) + current_delivery_state = current_request.get("delivery_state", DELIVERY_STATE_NONE) # Terminal hook callbacks can run before delivery-state sync persists "error". # Allow reopening fulfilled requests unless they are already complete. - if current_delivery_state == "complete": + if current_delivery_state == QueueStatus.COMPLETE: return None if ( - current_delivery_state not in {"error", "cancelled"} + current_delivery_state not in {QueueStatus.ERROR, QueueStatus.CANCELLED} and normalized_failure_reason is None ): return None @@ -843,25 +829,6 @@ class UserDB: finally: conn.close() - def clear_request_dismissals(self, *, user_id: int | None) -> int: - """Clear dismissed_at on requests, optionally scoped by owner user_id.""" - normalized_user_id = normalize_optional_positive_int(user_id, "user_id") - params: list[Any] = [] - query = "UPDATE download_requests SET dismissed_at = NULL WHERE dismissed_at IS NOT NULL" - if normalized_user_id is not None: - query += " AND user_id = ?" - params.append(normalized_user_id) - - with self._lock: - conn = self._connect() - try: - cursor = conn.execute(query, params) - conn.commit() - rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0 - return max(rowcount, 0) - finally: - conn.close() - def delete_dismissed_requests(self, *, user_id: int | None) -> int: """Delete dismissed terminal requests, optionally scoped by owner user_id.""" normalized_user_id = normalize_optional_positive_int(user_id, "user_id") diff --git a/shelfmark/download/orchestrator.py b/shelfmark/download/orchestrator.py index 70e6406..381b471 100644 --- a/shelfmark/download/orchestrator.py +++ b/shelfmark/download/orchestrator.py @@ -411,6 +411,14 @@ def _download_task(task_id: str, cancel_flag: Event) -> Optional[str]: exc_type="StatusCallbackError", ) return + # Don't propagate terminal statuses to the queue here. Output modules + # call status_callback("complete") before returning the download path, + # but _process_single_download needs to set download_path on the task + # first so the terminal hook captures it for history persistence. + if status_key in ("complete", "cancelled"): + if message is not None: + book_queue.update_status_message(task_id, message) + return update_download_status(task_id, status, message) # Get the download handler based on the task's source @@ -548,22 +556,10 @@ def update_download_progress(book_id: str, progress: float) -> None: def update_download_status(book_id: str, status: str, message: Optional[str] = None) -> None: """Update download status with optional message for UI display.""" - # Map string status to QueueStatus enum - status_map = { - 'queued': QueueStatus.QUEUED, - 'resolving': QueueStatus.RESOLVING, - 'locating': QueueStatus.LOCATING, - 'downloading': QueueStatus.DOWNLOADING, - 'complete': QueueStatus.COMPLETE, - 'available': QueueStatus.AVAILABLE, - 'error': QueueStatus.ERROR, - 'done': QueueStatus.DONE, - 'cancelled': QueueStatus.CANCELLED, - } - status_key = status.lower() - queue_status_enum = status_map.get(status_key) - if not queue_status_enum: + try: + queue_status_enum = QueueStatus(status_key) + except ValueError: return # Always update activity timestamp (used by stall detection) even if the status @@ -598,16 +594,20 @@ def cancel_download(book_id: str) -> bool: def retry_download(book_id: str) -> Tuple[bool, Optional[str]]: - """Retry a failed standalone download.""" + """Retry a failed or cancelled download. + + Request-linked downloads can only be retried when cancelled (errors + reopen the request for admin re-approval instead). + """ task = book_queue.get_task(book_id) if task is None: return False, "Download not found" status = book_queue.get_task_status(book_id) - if status != QueueStatus.ERROR: - return False, "Download is not in an error state" + if status not in (QueueStatus.ERROR, QueueStatus.CANCELLED): + return False, "Download is not in an error or cancelled state" - if task.request_id: + if task.request_id and status != QueueStatus.CANCELLED: return False, "Request-linked downloads must be retried from requests" task.last_error_message = None diff --git a/shelfmark/main.py b/shelfmark/main.py index 80852b8..d20e8e2 100644 --- a/shelfmark/main.py +++ b/shelfmark/main.py @@ -18,6 +18,7 @@ from werkzeug.security import check_password_hash from werkzeug.wrappers import Response from shelfmark.download import orchestrator as backend +from shelfmark.release_sources import get_source_display_name from shelfmark.release_sources.direct_download import SearchUnavailable from shelfmark.config.settings import _SUPPORTED_BOOK_LANGUAGE from shelfmark.config.env import ( @@ -27,7 +28,7 @@ from shelfmark.config.env import ( ) from shelfmark.core.config import config as app_config from shelfmark.core.logger import setup_logger -from shelfmark.core.models import SearchFilters, QueueStatus +from shelfmark.core.models import SearchFilters, QueueStatus, TERMINAL_QUEUE_STATUSES from shelfmark.core.prefix_middleware import PrefixMiddleware from shelfmark.core.auth_modes import ( get_auth_check_admin_status, @@ -53,9 +54,9 @@ from shelfmark.core.download_history_service import DownloadHistoryService from shelfmark.core.notifications import NotificationContext, NotificationEvent, notify_admin, notify_user from shelfmark.core.request_helpers import ( coerce_bool, - extract_release_source_id, load_users_request_policy_settings, normalize_optional_text, + normalize_positive_int, ) from shelfmark.core.utils import normalize_base_path from shelfmark.api.websocket import ws_manager @@ -1104,16 +1105,10 @@ def _resolve_status_scope(*, require_authenticated: bool = True) -> tuple[bool, def _queue_status_to_final_activity_status(status: QueueStatus) -> str | None: - if status == QueueStatus.COMPLETE: - return "complete" - if status == QueueStatus.ERROR: - return "error" - if status == QueueStatus.CANCELLED: - return "cancelled" - return None + return status.value if status in TERMINAL_QUEUE_STATUSES else None def _queue_status_to_notification_event(status: QueueStatus) -> NotificationEvent | None: - if status in {QueueStatus.COMPLETE, QueueStatus.AVAILABLE, QueueStatus.DONE}: + if status == QueueStatus.COMPLETE: return NotificationEvent.DOWNLOAD_COMPLETE if status == QueueStatus.ERROR: return NotificationEvent.DOWNLOAD_FAILED @@ -1169,6 +1164,41 @@ def _notify_admin_for_terminal_download_status(*, task_id: str, status: QueueSta ) +def _record_download_queued(task_id: str, task: Any) -> None: + """Persist initial download record when a task enters the queue.""" + if download_history_service is None: + return + + owner_user_id = normalize_positive_int(getattr(task, "user_id", None)) + request_id = normalize_positive_int(getattr(task, "request_id", None)) + origin = "requested" if request_id else "direct" + + source_name = normalize_source(getattr(task, "source", None)) + try: + source_display = get_source_display_name(source_name) + except Exception: + source_display = None + + try: + download_history_service.record_download( + task_id=task_id, + user_id=owner_user_id, + username=normalize_optional_text(getattr(task, "username", None)), + request_id=request_id, + source=source_name, + source_display_name=source_display, + title=str(getattr(task, "title", "Unknown title") or "Unknown title"), + author=normalize_optional_text(getattr(task, "author", None)), + format=normalize_optional_text(getattr(task, "format", None)), + size=normalize_optional_text(getattr(task, "size", None)), + preview=normalize_optional_text(getattr(task, "preview", None)), + content_type=normalize_optional_text(getattr(task, "content_type", None)), + origin=origin, + ) + except Exception as exc: + logger.warning("Failed to record download at queue time for task %s: %s", task_id, exc) + + def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task: Any) -> None: _notify_admin_for_terminal_download_status(task_id=task_id, status=status, task=task) @@ -1176,68 +1206,22 @@ def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task: if final_status is None: return - raw_owner_user_id = getattr(task, "user_id", None) - try: - owner_user_id = int(raw_owner_user_id) if raw_owner_user_id is not None else None - except (TypeError, ValueError): - owner_user_id = None - - linked_request: dict[str, Any] | None = None - request_id: int | None = None - origin = "direct" - if user_db is not None and owner_user_id is not None: - fulfilled_rows = user_db.list_requests(user_id=owner_user_id, status="fulfilled") - for row in fulfilled_rows: - source_id = extract_release_source_id(row.get("release_data")) - if source_id == task_id: - linked_request = row - origin = "requested" - try: - request_id = int(row.get("id")) - except (TypeError, ValueError): - request_id = None - break - - try: - download_payload = backend._task_to_dict(task) - except Exception as exc: - logger.warning("Failed to serialize task payload for terminal snapshot: %s", exc) - download_payload = { - "id": task_id, - "title": getattr(task, "title", "Unknown title"), - "author": getattr(task, "author", "Unknown author"), - "source": getattr(task, "source", "direct_download"), - "added_time": getattr(task, "added_time", 0), - "status_message": getattr(task, "status_message", None), - "download_path": getattr(task, "download_path", None), - "user_id": getattr(task, "user_id", None), - "username": getattr(task, "username", None), - } - if download_history_service is not None: try: - download_history_service.record_terminal( - user_id=owner_user_id, + download_history_service.finalize_download( task_id=task_id, - username=normalize_optional_text(getattr(task, "username", None)), - request_id=request_id, - source=normalize_source(getattr(task, "source", None)), - source_display_name=download_payload.get("source_display_name"), - title=str(download_payload.get("title") or getattr(task, "title", "Unknown title")), - author=normalize_optional_text(download_payload.get("author")), - format=normalize_optional_text(download_payload.get("format")), - size=normalize_optional_text(download_payload.get("size")), - preview=normalize_optional_text(download_payload.get("preview")), - content_type=normalize_optional_text(download_payload.get("content_type")), - origin=origin, final_status=final_status, - status_message=normalize_optional_text(download_payload.get("status_message")), - download_path=normalize_optional_text(download_payload.get("download_path")), + status_message=normalize_optional_text(getattr(task, "status_message", None)), + download_path=normalize_optional_text(getattr(task, "download_path", None)), ) except Exception as exc: - logger.warning("Failed to record terminal download history for task %s: %s", task_id, exc) + logger.warning("Failed to finalize download history for task %s: %s", task_id, exc) - if user_db is None or linked_request is None or request_id is None or status != QueueStatus.ERROR: + if user_db is None or status != QueueStatus.ERROR: + return + + request_id = normalize_positive_int(getattr(task, "request_id", None)) + if request_id is None: return raw_error_message = getattr(task, "status_message", None) @@ -1280,18 +1264,7 @@ def _task_owned_by_actor(task: Any, *, actor_user_id: int | None, actor_username return False -def _is_graduated_request_download(task_id: str, *, user_id: int) -> bool: - if user_db is None: - return False - - fulfilled_rows = user_db.list_requests(user_id=user_id, status="fulfilled") - for row in fulfilled_rows: - source_id = extract_release_source_id(row.get("release_data")) - if source_id == task_id: - return True - return False - - +backend.book_queue.set_queue_hook(_record_download_queued) backend.book_queue.set_terminal_status_hook(_record_download_terminal_snapshot) @@ -1502,7 +1475,7 @@ def api_cancel_download(book_id: str) -> Union[Response, Tuple[Response, int]]: ): return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403 - if _is_graduated_request_download(book_id, user_id=db_user_id): + if getattr(task, "request_id", None) is not None: return jsonify({"error": "Forbidden", "code": "requested_download_cancel_forbidden"}), 403 success = backend.cancel_download(book_id) @@ -1537,16 +1510,8 @@ def api_retry_download(book_id: str) -> Union[Response, Tuple[Response, int]]: ): return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403 - raw_owner_user_id = getattr(task, "user_id", None) - try: - owner_user_id = int(raw_owner_user_id) if raw_owner_user_id is not None else None - except (TypeError, ValueError): - owner_user_id = None - - is_request_linked = bool(getattr(task, "request_id", None)) - if not is_request_linked and owner_user_id is not None: - is_request_linked = _is_graduated_request_download(book_id, user_id=owner_user_id) - if is_request_linked: + task_status = backend.book_queue.get_task_status(book_id) + if getattr(task, "request_id", None) is not None and task_status != QueueStatus.CANCELLED: return jsonify({"error": "Forbidden", "code": "requested_download_retry_forbidden"}), 403 success, error = backend.retry_download(book_id) @@ -2465,6 +2430,7 @@ def api_settings_get_all() -> Union[Response, Tuple[Response, int]]: # This triggers the @register_settings decorators import shelfmark.config.settings # noqa: F401 import shelfmark.config.security # noqa: F401 + import shelfmark.config.users_settings # noqa: F401 import shelfmark.config.notifications_settings # noqa: F401 data = serialize_all_settings(include_values=True) @@ -2495,6 +2461,7 @@ def api_settings_get_tab(tab_name: str) -> Union[Response, Tuple[Response, int]] # Ensure settings are registered import shelfmark.config.settings # noqa: F401 import shelfmark.config.security # noqa: F401 + import shelfmark.config.users_settings # noqa: F401 import shelfmark.config.notifications_settings # noqa: F401 tab = get_settings_tab(tab_name) @@ -2531,6 +2498,7 @@ def api_settings_update_tab(tab_name: str) -> Union[Response, Tuple[Response, in # Ensure settings are registered import shelfmark.config.settings # noqa: F401 import shelfmark.config.security # noqa: F401 + import shelfmark.config.users_settings # noqa: F401 import shelfmark.config.notifications_settings # noqa: F401 tab = get_settings_tab(tab_name) @@ -2578,6 +2546,7 @@ def api_settings_execute_action(tab_name: str, action_key: str) -> Union[Respons # Ensure settings are registered import shelfmark.config.settings # noqa: F401 import shelfmark.config.security # noqa: F401 + import shelfmark.config.users_settings # noqa: F401 import shelfmark.config.notifications_settings # noqa: F401 # Get current form values if provided (for testing with unsaved values) diff --git a/src/frontend/src/App.tsx b/src/frontend/src/App.tsx index 2e96194..dacd30e 100644 --- a/src/frontend/src/App.tsx +++ b/src/frontend/src/App.tsx @@ -299,16 +299,29 @@ function App() { }, [currentStatus, dismissedDownloadTaskIds]); // Use real-time buckets for active work and merge persisted terminal buckets - // so completed/errored entries survive restarts. - const activitySidebarStatus = useMemo(() => ({ + // so completed/errored entries survive restarts. Filter out dismissed items + // so the sidebar counts stay consistent with the activity panel. + const activitySidebarStatus = useMemo(() => { + const filterDismissed = ( + bucket: Record | undefined + ): Record | undefined => { + if (!bucket || dismissedDownloadTaskIds.size === 0) return bucket; + const filtered = Object.fromEntries( + Object.entries(bucket).filter(([taskId]) => !dismissedDownloadTaskIds.has(taskId)) + ) as Record; + return Object.keys(filtered).length > 0 ? filtered : undefined; + }; + + return { queued: currentStatus.queued, resolving: currentStatus.resolving, locating: currentStatus.locating, downloading: currentStatus.downloading, - complete: mergeTerminalBucket(activityStatus.complete, currentStatus.complete), - error: mergeTerminalBucket(activityStatus.error, currentStatus.error), - cancelled: mergeTerminalBucket(activityStatus.cancelled, currentStatus.cancelled), - }), [activityStatus, currentStatus]); + complete: filterDismissed(mergeTerminalBucket(activityStatus.complete, currentStatus.complete)), + error: filterDismissed(mergeTerminalBucket(activityStatus.error, currentStatus.error)), + cancelled: filterDismissed(mergeTerminalBucket(activityStatus.cancelled, currentStatus.cancelled)), + }; + }, [activityStatus, currentStatus, dismissedDownloadTaskIds]); const showRequestsTab = useMemo(() => { if (requestRoleIsAdmin) { diff --git a/src/frontend/src/components/activity/activityCardModel.ts b/src/frontend/src/components/activity/activityCardModel.ts index a27c3c3..4ee0ceb 100644 --- a/src/frontend/src/components/activity/activityCardModel.ts +++ b/src/frontend/src/components/activity/activityCardModel.ts @@ -175,6 +175,19 @@ const buildActions = (item: ActivityItem, isAdmin: boolean): ActivityCardAction[ }, ]; } + if (item.visualStatus === 'cancelled') { + return [ + { + kind: 'download-retry', + bookId: item.downloadBookId, + }, + { + kind: 'download-dismiss', + bookId: item.downloadBookId, + linkedRequestId: item.requestId, + }, + ]; + } return [ { kind: 'download-dismiss', diff --git a/src/frontend/src/hooks/useActivity.ts b/src/frontend/src/hooks/useActivity.ts index f303d28..39cc89f 100644 --- a/src/frontend/src/hooks/useActivity.ts +++ b/src/frontend/src/hooks/useActivity.ts @@ -407,6 +407,7 @@ export const useActivity = ({ void clearActivityHistory() .then(() => { void refreshActivitySnapshot(); + void refreshActivityHistory(); }) .catch((error) => { console.error('Clear history failed:', error); diff --git a/src/frontend/src/services/api.ts b/src/frontend/src/services/api.ts index 590be07..8956a05 100644 --- a/src/frontend/src/services/api.ts +++ b/src/frontend/src/services/api.ts @@ -430,7 +430,7 @@ export interface ActivityDismissPayload { } export interface ActivityHistoryItem { - id: number; + id: string; user_id: number; item_type: 'download' | 'request'; item_key: string; diff --git a/src/frontend/src/types/index.ts b/src/frontend/src/types/index.ts index 29fe566..b3a82c2 100644 --- a/src/frontend/src/types/index.ts +++ b/src/frontend/src/types/index.ts @@ -207,7 +207,7 @@ export interface RequestRecord { id: number; user_id: number; status: 'pending' | 'fulfilled' | 'rejected' | 'cancelled'; - delivery_state?: 'none' | 'unknown' | 'queued' | 'resolving' | 'locating' | 'downloading' | 'complete' | 'error' | 'cancelled'; + delivery_state?: 'none' | 'queued' | 'resolving' | 'locating' | 'downloading' | 'complete' | 'error' | 'cancelled'; delivery_updated_at?: string | null; last_failure_reason?: string | null; source_hint: string | null; diff --git a/tests/core/test_activity_routes_api.py b/tests/core/test_activity_routes_api.py index 4f5d6fe..1f3b7a2 100644 --- a/tests/core/test_activity_routes_api.py +++ b/tests/core/test_activity_routes_api.py @@ -53,8 +53,10 @@ def _record_terminal_download( final_status: str = "complete", request_id: int | None = None, status_message: str | None = None, + download_path: str | None = None, ) -> None: - main_module.download_history_service.record_terminal( + svc = main_module.download_history_service + svc.record_download( task_id=task_id, user_id=user_id, username=username, @@ -68,9 +70,12 @@ def _record_terminal_download( preview=None, content_type="ebook", origin=origin, + ) + svc.finalize_download( + task_id=task_id, final_status=final_status, status_message=status_message, - download_path=None, + download_path=download_path, ) @@ -162,6 +167,36 @@ class TestActivityRoutes: assert history_after_clear.status_code == 200 assert history_after_clear.json == [] + def test_dismiss_preserves_terminal_snapshot_without_live_queue_merge(self, main_module, client): + user = _create_user(main_module, prefix="reader") + _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) + + task_id = "dismiss-preserve-task" + _record_terminal_download( + main_module, + task_id=task_id, + user_id=user["id"], + username=user["username"], + title="Recorded Title", + author="Recorded Author", + status_message="Complete", + ) + + with patch.object(main_module, "get_auth_mode", return_value="builtin"): + dismiss_response = client.post( + "/api/activity/dismiss", + json={"item_type": "download", "item_key": f"download:{task_id}"}, + ) + history_response = client.get("/api/activity/history?limit=10&offset=0") + + assert dismiss_response.status_code == 200 + assert history_response.status_code == 200 + assert history_response.json[0]["item_key"] == f"download:{task_id}" + snapshot_download = history_response.json[0]["snapshot"]["download"] + assert snapshot_download["title"] == "Recorded Title" + assert snapshot_download["author"] == "Recorded Author" + assert snapshot_download["status_message"] is None + def test_clear_history_deletes_dismissed_requests_from_snapshot(self, main_module, client): user = _create_user(main_module, prefix="reader") _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) @@ -248,27 +283,6 @@ class TestActivityRoutes: user_id=user["id"], username=user["username"], title="History Local Download", - ) - - row = main_module.download_history_service.get_by_task_id(task_id) - assert row is not None - assert main_module.download_history_service is not None - main_module.download_history_service.record_terminal( - task_id=task_id, - user_id=user["id"], - username=user["username"], - request_id=row.get("request_id"), - source=row.get("source") or "direct_download", - source_display_name=row.get("source_display_name"), - title=row.get("title") or "History Local Download", - author=row.get("author"), - format=row.get("format"), - size=row.get("size"), - preview=row.get("preview"), - content_type=row.get("content_type"), - origin=row.get("origin") or "direct", - final_status=row.get("final_status") or "complete", - status_message=row.get("status_message"), download_path=str(file_path), ) @@ -295,7 +309,7 @@ class TestActivityRoutes: "provider_id": "legacy-fulfilled-1", }, status="fulfilled", - delivery_state="unknown", + delivery_state="none", ) with patch.object(main_module, "get_auth_mode", return_value="builtin"): @@ -355,6 +369,51 @@ class TestActivityRoutes: to=f"user_{user['id']}", ) + def test_dismiss_many_preserves_terminal_snapshots_without_live_queue_merge(self, main_module, client): + user = _create_user(main_module, prefix="reader") + _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) + + first_task_id = "dismiss-many-preserve-1" + second_task_id = "dismiss-many-preserve-2" + _record_terminal_download( + main_module, + task_id=first_task_id, + user_id=user["id"], + username=user["username"], + title="First Title", + author="First Author", + ) + _record_terminal_download( + main_module, + task_id=second_task_id, + user_id=user["id"], + username=user["username"], + title="Second Title", + author="Second Author", + ) + + with patch.object(main_module, "get_auth_mode", return_value="builtin"): + dismiss_many_response = client.post( + "/api/activity/dismiss-many", + json={ + "items": [ + {"item_type": "download", "item_key": f"download:{first_task_id}"}, + {"item_type": "download", "item_key": f"download:{second_task_id}"}, + ] + }, + ) + history_response = client.get("/api/activity/history?limit=20&offset=0") + + assert dismiss_many_response.status_code == 200 + assert dismiss_many_response.json["count"] == 2 + assert history_response.status_code == 200 + + rows_by_key = {row["item_key"]: row for row in history_response.json} + assert rows_by_key[f"download:{first_task_id}"]["snapshot"]["download"]["title"] == "First Title" + assert rows_by_key[f"download:{first_task_id}"]["snapshot"]["download"]["author"] == "First Author" + assert rows_by_key[f"download:{second_task_id}"]["snapshot"]["download"]["title"] == "Second Title" + assert rows_by_key[f"download:{second_task_id}"]["snapshot"]["download"]["author"] == "Second Author" + def test_no_auth_dismiss_many_and_history_use_shared_identity(self, main_module): task_id = f"no-auth-{uuid.uuid4().hex[:10]}" item_key = f"download:{task_id}" @@ -513,6 +572,77 @@ class TestActivityRoutes: assert "cross-user-expired-task" in response.json["status"]["complete"] assert response.json["status"]["complete"]["cross-user-expired-task"]["id"] == "cross-user-expired-task" + def test_snapshot_shows_stale_active_download_as_interrupted_error(self, main_module, client): + user = _create_user(main_module, prefix="reader") + _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) + + # Record a download at queue time (active status) but don't put it in the queue + main_module.download_history_service.record_download( + task_id="stale-active-task", + user_id=user["id"], + username=user["username"], + request_id=None, + source="direct_download", + source_display_name="Direct Download", + title="Stale Active Task", + author="Stale Author", + format="epub", + size="1 MB", + preview=None, + content_type="ebook", + origin="direct", + ) + + with patch.object(main_module, "get_auth_mode", return_value="builtin"): + with patch.object(main_module.backend, "queue_status", return_value=_sample_status_payload()): + response = client.get("/api/activity/snapshot") + + assert response.status_code == 200 + assert "stale-active-task" in response.json["status"]["error"] + assert response.json["status"]["error"]["stale-active-task"]["status_message"] == "Interrupted" + + def test_snapshot_active_download_with_queue_entry_shows_in_correct_bucket(self, main_module, client): + user = _create_user(main_module, prefix="reader") + _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) + + # Record a download at queue time + main_module.download_history_service.record_download( + task_id="active-downloading-task", + user_id=user["id"], + username=user["username"], + request_id=None, + source="direct_download", + source_display_name="Direct Download", + title="Active Downloading Task", + author="Active Author", + format="epub", + size="2 MB", + preview=None, + content_type="ebook", + origin="direct", + ) + + # Simulate it being active in the queue + active_status = _sample_status_payload() + active_status["downloading"] = { + "active-downloading-task": { + "id": "active-downloading-task", + "title": "Active Downloading Task", + "author": "Active Author", + "source": "direct_download", + "progress": 0.5, + "status_message": "Downloading 50%", + } + } + + with patch.object(main_module, "get_auth_mode", return_value="builtin"): + with patch.object(main_module.backend, "queue_status", return_value=active_status): + response = client.get("/api/activity/snapshot") + + assert response.status_code == 200 + assert "active-downloading-task" in response.json["status"]["downloading"] + assert response.json["status"]["downloading"]["active-downloading-task"]["progress"] == 0.5 + def test_snapshot_clears_stale_download_dismissal_when_same_task_is_active(self, main_module, client): user = _create_user(main_module, prefix="reader") _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False) diff --git a/tests/core/test_activity_terminal_snapshots.py b/tests/core/test_activity_terminal_snapshots.py index df79107..6af35d9 100644 --- a/tests/core/test_activity_terminal_snapshots.py +++ b/tests/core/test_activity_terminal_snapshots.py @@ -93,6 +93,7 @@ class TestTerminalSnapshotCapture: title="Requested Snapshot", user_id=user["id"], username=user["username"], + request_id=request_row["id"], ) assert main_module.backend.book_queue.add(task) is True @@ -193,6 +194,103 @@ class TestTerminalSnapshotCapture: finally: main_module.backend.book_queue.cancel_download(task_id) + def test_queue_hook_records_active_row_at_queue_time(self, main_module): + user = _create_user(main_module, prefix="snap-queue") + task_id = f"queue-{uuid.uuid4().hex[:8]}" + task = DownloadTask( + task_id=task_id, + source="direct_download", + title="Queue Time Snapshot", + author="Queue Author", + user_id=user["id"], + username=user["username"], + ) + assert main_module.backend.book_queue.add(task) is True + + try: + row = _read_download_history_row(main_module, task_id) + assert row is not None + assert row["final_status"] == "active" + assert row["user_id"] == user["id"] + assert row["task_id"] == task_id + assert row["origin"] == "direct" + assert row["title"] == "Queue Time Snapshot" + assert row["author"] == "Queue Author" + assert row["queued_at"] is not None + finally: + main_module.backend.book_queue.cancel_download(task_id) + + def test_queue_hook_records_requested_origin_for_request_linked_task(self, main_module): + user = _create_user(main_module, prefix="snap-queue-req") + task_id = f"queue-req-{uuid.uuid4().hex[:8]}" + request_row = main_module.user_db.create_request( + user_id=user["id"], + content_type="ebook", + request_level="release", + policy_mode="request_release", + book_data={ + "title": "Requested Queue", + "author": "Request Author", + "provider": "openlibrary", + "provider_id": "queue-req-1", + }, + release_data={ + "source": "prowlarr", + "source_id": task_id, + "title": "Requested Queue.epub", + }, + status="fulfilled", + delivery_state="queued", + ) + task = DownloadTask( + task_id=task_id, + source="prowlarr", + title="Requested Queue", + user_id=user["id"], + username=user["username"], + request_id=request_row["id"], + ) + assert main_module.backend.book_queue.add(task) is True + + try: + row = _read_download_history_row(main_module, task_id) + assert row is not None + assert row["final_status"] == "active" + assert row["origin"] == "requested" + assert row["request_id"] == request_row["id"] + finally: + main_module.backend.book_queue.cancel_download(task_id) + + def test_finalize_updates_active_row_to_terminal(self, main_module): + user = _create_user(main_module, prefix="snap-finalize") + task_id = f"finalize-{uuid.uuid4().hex[:8]}" + task = DownloadTask( + task_id=task_id, + source="direct_download", + title="Finalize Snapshot", + user_id=user["id"], + username=user["username"], + ) + assert main_module.backend.book_queue.add(task) is True + + try: + # Verify active row exists + row = _read_download_history_row(main_module, task_id) + assert row is not None + assert row["final_status"] == "active" + + # Transition to complete + main_module.backend.book_queue.update_status(task_id, QueueStatus.COMPLETE) + + row = _read_download_history_row(main_module, task_id) + assert row is not None + assert row["final_status"] == "complete" + # Metadata from queue-time should be preserved + assert row["title"] == "Finalize Snapshot" + assert row["user_id"] == user["id"] + finally: + main_module.backend.book_queue.cancel_download(task_id) + def test_cancelled_transition_does_not_trigger_notification(self, main_module): user = _create_user(main_module, prefix="snap-notify-cancel") task_id = f"notify-cancel-{uuid.uuid4().hex[:8]}" diff --git a/tests/core/test_download_api_guardrails.py b/tests/core/test_download_api_guardrails.py index 4c1dd6d..95ad067 100644 --- a/tests/core/test_download_api_guardrails.py +++ b/tests/core/test_download_api_guardrails.py @@ -535,7 +535,7 @@ class TestCancelDownloadEndpointGuardrails: db_user_id=user["id"], is_admin=False, ) - main_module.user_db.create_request( + request_row = main_module.user_db.create_request( user_id=user["id"], content_type="ebook", request_level="release", @@ -560,6 +560,7 @@ class TestCancelDownloadEndpointGuardrails: title="Requested Book", user_id=user["id"], username=user["username"], + request_id=request_row["id"], ) with patch.object(main_module, "get_auth_mode", return_value="builtin"): @@ -721,7 +722,7 @@ class TestRetryDownloadEndpointGuardrails: db_user_id=user["id"], is_admin=False, ) - main_module.user_db.create_request( + request_row = main_module.user_db.create_request( user_id=user["id"], content_type="ebook", request_level="release", @@ -746,6 +747,7 @@ class TestRetryDownloadEndpointGuardrails: title="Requested Book", user_id=user["id"], username=user["username"], + request_id=request_row["id"], ) with patch.object(main_module, "get_auth_mode", return_value="builtin"): diff --git a/tests/core/test_request_routes_api.py b/tests/core/test_request_routes_api.py index d6ab289..0117af6 100644 --- a/tests/core/test_request_routes_api.py +++ b/tests/core/test_request_routes_api.py @@ -922,7 +922,7 @@ class TestRequestRoutes: fulfil_resp = client.post(f"/api/admin/requests/{request_id}/fulfil", json={}) assert fulfil_resp.status_code == 400 - assert "release_data is required to fulfil book-level requests" in fulfil_resp.json["error"] + assert "release_data is required to fulfil requests" in fulfil_resp.json["error"] def test_admin_fulfil_book_level_request_manual_approval(self, main_module, client): user = _create_user(main_module, prefix="reader") diff --git a/tests/core/test_requests_service.py b/tests/core/test_requests_service.py index 9a1b831..cd10497 100644 --- a/tests/core/test_requests_service.py +++ b/tests/core/test_requests_service.py @@ -307,7 +307,7 @@ def test_fulfil_request_requires_release_data_for_book_level(user_db): book_data=_book_data(), ) - with pytest.raises(RequestServiceError, match="release_data is required to fulfil book-level requests"): + with pytest.raises(RequestServiceError, match="release_data is required to fulfil requests"): fulfil_request( user_db, request_id=created["id"], diff --git a/tests/core/test_user_db.py b/tests/core/test_user_db.py index f28947d..57f3694 100644 --- a/tests/core/test_user_db.py +++ b/tests/core/test_user_db.py @@ -818,9 +818,6 @@ class TestDownloadRequests: book_data=self._book_data(), ) - with pytest.raises(ValueError, match="request_level=release requires non-null release_data"): - user_db.update_request(created["id"], request_level="release") - updated = user_db.update_request( created["id"], request_level="release", @@ -1015,36 +1012,6 @@ class TestDownloadRequests: bob_rows = user_db.list_dismissed_requests(user_id=bob["id"]) assert [row["id"] for row in bob_rows] == [second["id"]] - def test_clear_request_dismissals_scopes_by_user(self, user_db): - alice = user_db.create_user(username="alice") - bob = user_db.create_user(username="bob") - alice_request = user_db.create_request( - user_id=alice["id"], - content_type="ebook", - request_level="book", - policy_mode="request_book", - book_data=self._book_data(), - ) - bob_request = user_db.create_request( - user_id=bob["id"], - content_type="ebook", - request_level="book", - policy_mode="request_book", - book_data=self._book_data(), - ) - - user_db.update_request(alice_request["id"], dismissed_at="2026-01-01T10:00:00+00:00") - user_db.update_request(bob_request["id"], dismissed_at="2026-01-01T11:00:00+00:00") - - cleared_alice = user_db.clear_request_dismissals(user_id=alice["id"]) - assert cleared_alice == 1 - assert user_db.get_request(alice_request["id"])["dismissed_at"] is None - assert user_db.get_request(bob_request["id"])["dismissed_at"] is not None - - cleared_all = user_db.clear_request_dismissals(user_id=None) - assert cleared_all == 1 - assert user_db.get_request(bob_request["id"])["dismissed_at"] is None - def test_delete_dismissed_requests_scopes_by_user_and_only_deletes_terminal(self, user_db): alice = user_db.create_user(username="alice") bob = user_db.create_user(username="bob") @@ -1092,8 +1059,5 @@ class TestDownloadRequests: with pytest.raises(ValueError, match="user_id must be a positive integer"): user_db.list_dismissed_requests(user_id=0) - with pytest.raises(ValueError, match="user_id must be a positive integer"): - user_db.clear_request_dismissals(user_id=-1) - with pytest.raises(ValueError, match="user_id must be a positive integer"): user_db.delete_dismissed_requests(user_id=0)