Download history refactor pt.2 (#703)

Two-phase download history: downloads are now recorded in the DB at
queue time (not just at terminal time), eliminating the need to
reconstruct metadata in the terminal hook and removing the
`_is_graduated_request_download()` request-scan mess.
This commit is contained in:
Alex
2026-03-05 13:06:34 +00:00
committed by GitHub
parent fbe25725d3
commit cce2c10704
22 changed files with 776 additions and 570 deletions

View File

@@ -7,22 +7,22 @@ from typing import Any, Callable, NamedTuple
from flask import Flask, jsonify, request, session
from shelfmark.core.download_history_service import DownloadHistoryService
from shelfmark.core.download_history_service import ACTIVE_DOWNLOAD_STATUS, DownloadHistoryService, VALID_TERMINAL_STATUSES
from shelfmark.core.logger import setup_logger
from shelfmark.core.models import ACTIVE_QUEUE_STATUSES, QueueStatus, TERMINAL_QUEUE_STATUSES
from shelfmark.core.request_validation import RequestStatus
from shelfmark.core.request_helpers import (
emit_ws_event,
extract_release_source_id,
normalize_optional_text,
normalize_positive_int,
now_utc_iso,
populate_request_usernames,
)
from shelfmark.core.user_db import UserDB
logger = setup_logger(__name__)
# Offset added to request row IDs so they don't collide with download_history
# row IDs when both types are merged into a single sorted list.
_REQUEST_ID_OFFSET = 1_000_000_000
def _parse_timestamp(value: Any) -> float:
if not isinstance(value, str) or not value.strip():
@@ -152,16 +152,20 @@ def _activity_ws_room(*, is_no_auth: bool, actor_db_user_id: int | None) -> str:
return "admins"
def _check_item_ownership(actor: _ActorContext, row: dict[str, Any]) -> Any | None:
"""Return a 403 response if the actor doesn't own the item, else None."""
if actor.is_admin:
return None
owner_user_id = normalize_positive_int(row.get("user_id"))
if owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
return None
def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int | None) -> list[dict[str, Any]]:
if is_admin:
request_rows = user_db.list_requests()
user_cache: dict[int, str] = {}
for row in request_rows:
requester_id = row["user_id"]
if requester_id not in user_cache:
requester = user_db.get_user(user_id=requester_id)
user_cache[requester_id] = requester.get("username", "") if requester else ""
row["username"] = user_cache[requester_id]
populate_request_usernames(request_rows, user_db)
return request_rows
if db_user_id is None:
@@ -169,51 +173,79 @@ def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int |
return user_db.list_requests(user_id=db_user_id)
def _parse_download_item_key(item_key: Any) -> str | None:
if not isinstance(item_key, str) or not item_key.startswith("download:"):
def _parse_item_key(item_key: Any, prefix: str) -> str | None:
"""Extract the value after 'prefix:' from an item_key string."""
if not isinstance(item_key, str) or not item_key.startswith(f"{prefix}:"):
return None
task_id = item_key.split(":", 1)[1].strip()
return task_id or None
value = item_key.split(":", 1)[1].strip()
return value or None
def _parse_request_item_key(item_key: Any) -> int | None:
if not isinstance(item_key, str) or not item_key.startswith("request:"):
return None
raw_id = item_key.split(":", 1)[1].strip()
return normalize_positive_int(raw_id)
_ALL_BUCKET_KEYS = (*ACTIVE_QUEUE_STATUSES, *TERMINAL_QUEUE_STATUSES)
def _build_download_status_from_db(
    *,
    db_rows: list[dict[str, Any]],
    queue_status: dict[str, dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    """Build the download status dict from DB rows, overlaying live queue data.

    Active DB rows are matched against the queue for live progress.
    Terminal DB rows go directly into their final bucket.
    Stale active rows (no queue entry) are treated as interrupted errors.
    """
    status: dict[str, dict[str, Any]] = {key: {} for key in _ALL_BUCKET_KEYS}

    # Index queue items by task_id for fast lookup: task_id -> (bucket_key, payload)
    queue_index: dict[str, tuple[str, dict[str, Any]]] = {}
    for bucket_key in _ALL_BUCKET_KEYS:
        bucket = queue_status.get(bucket_key)
        if not isinstance(bucket, dict):
            continue
        for task_id, payload in bucket.items():
            queue_index[str(task_id)] = (bucket_key, payload)

    for row in db_rows:
        task_id = str(row.get("task_id") or "").strip()
        if not task_id:
            continue
        final_status = row.get("final_status")
        if final_status == ACTIVE_DOWNLOAD_STATUS:
            queue_entry = queue_index.pop(task_id, None)
            if queue_entry is not None:
                bucket_key, queue_payload = queue_entry
                status[bucket_key][task_id] = queue_payload
            else:
                # Stale active row — no queue entry means it was interrupted
                # before reaching a terminal state.
                download_payload = DownloadHistoryService.to_download_payload(row)
                download_payload["status_message"] = "Interrupted"
                status[QueueStatus.ERROR][task_id] = download_payload
        elif final_status in VALID_TERMINAL_STATUSES:
            download_payload = DownloadHistoryService.to_download_payload(row)
            # For complete/cancelled the saved status_message is a stale
            # progress string (e.g. "Fetching download sources") — clear it
            # so the frontend only shows its own status label. Error rows
            # keep theirs since the message describes the failure.
            if final_status in ("complete", "cancelled"):
                download_payload["status_message"] = None
            status[final_status][task_id] = download_payload

    # Include any queue items that don't have a DB row yet (race condition safety).
    # Only include active items — terminal items without a DB row are orphans
    # (e.g. admin cleared history) and should not be shown.
    for task_id, (bucket_key, queue_payload) in queue_index.items():
        if bucket_key in ACTIVE_QUEUE_STATUSES and task_id not in status.get(bucket_key, {}):
            status[bucket_key][task_id] = queue_payload
    return status
def _collect_active_download_task_ids(status: dict[str, dict[str, Any]]) -> set[str]:
active_task_ids: set[str] = set()
for bucket_key in ("queued", "resolving", "locating", "downloading"):
for bucket_key in ACTIVE_QUEUE_STATUSES:
bucket = status.get(bucket_key)
if not isinstance(bucket, dict):
continue
@@ -226,19 +258,19 @@ def _collect_active_download_task_ids(status: dict[str, dict[str, Any]]) -> set[
def _request_terminal_status(row: dict[str, Any]) -> str | None:
    """Map a request row to a terminal activity status, or None if still live.

    Pending requests have no terminal status. Rejected/cancelled requests map
    to their own status. Fulfilled requests derive their status from the
    delivery_state of the resulting download (error/cancelled pass through,
    anything else counts as complete).
    """
    request_status = row.get("status")
    if request_status == RequestStatus.PENDING:
        return None
    if request_status == RequestStatus.REJECTED:
        return RequestStatus.REJECTED
    if request_status == RequestStatus.CANCELLED:
        return RequestStatus.CANCELLED
    if request_status != RequestStatus.FULFILLED:
        return None
    delivery_state = str(row.get("delivery_state") or "").strip().lower()
    if delivery_state in {QueueStatus.ERROR, QueueStatus.CANCELLED}:
        return delivery_state
    return QueueStatus.COMPLETE
def _minimal_request_snapshot(request_row: dict[str, Any], request_id: int) -> dict[str, Any]:
@@ -273,11 +305,12 @@ def _request_history_entry(request_row: dict[str, Any]) -> dict[str, Any] | None
if request_id is None:
return None
final_status = _request_terminal_status(request_row)
item_key = f"request:{request_id}"
return {
"id": _REQUEST_ID_OFFSET + request_id,
"id": item_key,
"user_id": request_row.get("user_id"),
"item_type": "request",
"item_key": f"request:{request_id}",
"item_key": item_key,
"dismissed_at": request_row.get("dismissed_at"),
"snapshot": _minimal_request_snapshot(request_row, request_id),
"origin": "request",
@@ -338,7 +371,22 @@ def register_activity_routes(
owner_user_scope = None if is_admin else db_user_id
status = queue_status(user_id=owner_user_scope)
live_queue = queue_status(user_id=owner_user_scope)
try:
db_rows = download_history_service.get_undismissed(
user_id=owner_user_scope,
limit=200,
)
except Exception as exc:
logger.warning("Failed to load undismissed download rows: %s", exc)
db_rows = []
status = _build_download_status_from_db(
db_rows=db_rows,
queue_status=live_queue,
)
updated_requests = sync_request_delivery_states(
user_db,
queue_status=status,
@@ -347,15 +395,6 @@ def register_activity_routes(
emit_request_updates(updated_requests)
request_rows = _list_visible_requests(user_db, is_admin=is_admin, db_user_id=db_user_id)
try:
terminal_rows = download_history_service.get_undismissed_terminal(
user_id=owner_user_scope,
limit=200,
)
_merge_terminal_snapshot_backfill(status=status, terminal_rows=terminal_rows)
except Exception as exc:
logger.warning("Failed to merge terminal download history rows: %s", exc)
dismissed: list[dict[str, str]] = []
dismissed_task_ids: list[str] = []
try:
@@ -431,29 +470,27 @@ def register_activity_routes(
dismissal_item: dict[str, str] | None = None
if item_type == "download":
task_id = _parse_download_item_key(item_key)
task_id = _parse_item_key(item_key, "download")
if task_id is None:
return jsonify({"error": "item_key must be in the format download:<task_id>"}), 400
existing = download_history_service.get_by_task_id(task_id)
if existing is None:
return jsonify({"error": "Activity item not found"}), 404
# Row already gone (e.g. admin cleared history) — treat as success
dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"}
else:
ownership_gate = _check_item_ownership(actor, existing)
if ownership_gate is not None:
return ownership_gate
owner_user_id = normalize_positive_int(existing.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
dismissed_count = download_history_service.dismiss(
task_id=task_id,
user_id=actor.owner_scope,
)
if dismissed_count < 1:
return jsonify({"error": "Activity item not found"}), 404
dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"}
download_history_service.dismiss(
task_id=task_id,
user_id=actor.owner_scope,
)
dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"}
elif item_type == "request":
request_id = _parse_request_item_key(item_key)
request_id = normalize_positive_int(_parse_item_key(item_key, "request"))
if request_id is None:
return jsonify({"error": "item_key must be in the format request:<id>"}), 400
@@ -461,9 +498,9 @@ def register_activity_routes(
if request_row is None:
return jsonify({"error": "Request not found"}), 404
owner_user_id = normalize_positive_int(request_row.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
ownership_gate = _check_item_ownership(actor, request_row)
if ownership_gate is not None:
return ownership_gate
user_db.update_request(request_id, dismissed_at=now_utc_iso())
dismissal_item = {"item_type": "request", "item_key": f"request:{request_id}"}
@@ -515,28 +552,28 @@ def register_activity_routes(
item_key = item.get("item_key")
if item_type == "download":
task_id = _parse_download_item_key(item_key)
task_id = _parse_item_key(item_key, "download")
if task_id is None:
return jsonify({"error": "download item_key must be in the format download:<task_id>"}), 400
existing = download_history_service.get_by_task_id(task_id)
if existing is None:
continue
owner_user_id = normalize_positive_int(existing.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
ownership_gate = _check_item_ownership(actor, existing)
if ownership_gate is not None:
return ownership_gate
download_task_ids.append(task_id)
continue
if item_type == "request":
request_id = _parse_request_item_key(item_key)
request_id = normalize_positive_int(_parse_item_key(item_key, "request"))
if request_id is None:
return jsonify({"error": "request item_key must be in the format request:<id>"}), 400
request_row = user_db.get_request(request_id)
if request_row is None:
continue
owner_user_id = normalize_positive_int(request_row.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
ownership_gate = _check_item_ownership(actor, request_row)
if ownership_gate is not None:
return ownership_gate
request_ids.append(request_id)
continue
@@ -591,14 +628,14 @@ def register_activity_routes(
if offset < 0:
return jsonify({"error": "offset must be a non-negative integer"}), 400
# We combine download + request history and apply pagination over the merged list.
max_rows = min(limit + offset + 500, 5000)
# Fetch enough from each source to fill the requested page after merging.
merge_limit = offset + limit
download_history_rows = download_history_service.get_history(
user_id=actor.owner_scope,
limit=max_rows,
limit=merge_limit,
offset=0,
)
dismissed_request_rows = user_db.list_dismissed_requests(user_id=actor.owner_scope, limit=max_rows)
dismissed_request_rows = user_db.list_dismissed_requests(user_id=actor.owner_scope, limit=merge_limit)
request_history_rows = [
entry
for entry in (_request_history_entry(row) for row in dismissed_request_rows)
@@ -609,7 +646,7 @@ def register_activity_routes(
combined.sort(
key=lambda row: (
_parse_timestamp(row.get("dismissed_at")),
int(row.get("id") or 0),
str(row.get("id") or ""),
),
reverse=True,
)

View File

@@ -5,13 +5,19 @@ from __future__ import annotations
import os
import sqlite3
import threading
from datetime import datetime
from typing import Any
from shelfmark.core.logger import setup_logger
from shelfmark.core.models import TERMINAL_QUEUE_STATUSES
from shelfmark.core.request_helpers import normalize_optional_positive_int, normalize_optional_text, now_utc_iso
logger = setup_logger(__name__)
VALID_FINAL_STATUSES = frozenset({"complete", "error", "cancelled"})
VALID_ORIGINS = frozenset({"direct", "request", "requested"})
VALID_TERMINAL_STATUSES = frozenset(s.value for s in TERMINAL_QUEUE_STATUSES)
ACTIVE_DOWNLOAD_STATUS = "active"
VALID_ORIGINS = frozenset({"direct", "requested"})
def _normalize_task_id(task_id: Any) -> str:
@@ -27,7 +33,7 @@ def _normalize_origin(origin: Any) -> str:
return "direct"
lowered = normalized.lower()
if lowered not in VALID_ORIGINS:
raise ValueError("origin must be one of: direct, request, requested")
raise ValueError("origin must be one of: direct, requested")
return lowered
@@ -36,7 +42,7 @@ def _normalize_final_status(final_status: Any) -> str:
if normalized is None:
raise ValueError("final_status must be a non-empty string")
lowered = normalized.lower()
if lowered not in VALID_FINAL_STATUSES:
if lowered not in VALID_TERMINAL_STATUSES:
raise ValueError("final_status must be one of: complete, error, cancelled")
return lowered
@@ -109,23 +115,39 @@ class DownloadHistoryService:
"source_display_name": row.get("source_display_name"),
"status_message": row.get("status_message"),
"download_path": DownloadHistoryService._resolve_existing_download_path(row.get("download_path")),
"added_time": DownloadHistoryService._iso_to_epoch(row.get("queued_at")),
"user_id": row.get("user_id"),
"username": row.get("username"),
"request_id": row.get("request_id"),
}
@staticmethod
def _iso_to_epoch(value: Any) -> float | None:
if not isinstance(value, str) or not value.strip():
return None
normalized = value.strip().replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
@classmethod
def _to_history_row(cls, row: dict[str, Any]) -> dict[str, Any]:
task_id = str(row.get("task_id") or "").strip()
item_key = cls._to_item_key(task_id)
download_payload = cls.to_download_payload(row)
# Clear stale progress messages for non-error terminal states.
if row.get("final_status") in ("complete", "cancelled"):
download_payload["status_message"] = None
return {
"id": row.get("id"),
"id": item_key,
"user_id": row.get("user_id"),
"item_type": "download",
"item_key": cls._to_item_key(task_id),
"item_key": item_key,
"dismissed_at": row.get("dismissed_at"),
"snapshot": {
"kind": "download",
"download": cls.to_download_payload(row),
"download": download_payload,
},
"origin": row.get("origin"),
"final_status": row.get("final_status"),
@@ -134,7 +156,7 @@ class DownloadHistoryService:
"source_id": task_id or None,
}
def record_terminal(
def record_download(
self,
*,
task_id: str,
@@ -150,11 +172,13 @@ class DownloadHistoryService:
preview: str | None,
content_type: str | None,
origin: str,
final_status: str,
status_message: str | None,
download_path: str | None,
terminal_at: str | None = None,
) -> None:
"""Record a download at queue time with final_status='active'.
On first queue: inserts a new row.
On retry (row already exists): resets the row back to 'active'
so the normal finalize path works when the retry completes.
"""
normalized_task_id = _normalize_task_id(task_id)
normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
normalized_request_id = normalize_optional_positive_int(request_id, "request_id")
@@ -165,12 +189,6 @@ class DownloadHistoryService:
if normalized_title is None:
raise ValueError("title must be a non-empty string")
normalized_origin = _normalize_origin(origin)
normalized_final_status = _normalize_final_status(final_status)
effective_terminal_at = (
terminal_at
if isinstance(terminal_at, str) and terminal_at.strip()
else now_utc_iso()
)
with self._lock:
conn = self._connect()
@@ -178,44 +196,20 @@ class DownloadHistoryService:
conn.execute(
"""
INSERT INTO download_history (
task_id,
user_id,
username,
request_id,
source,
source_display_name,
title,
author,
format,
size,
preview,
content_type,
origin,
final_status,
status_message,
download_path,
terminal_at
task_id, user_id, username, request_id,
source, source_display_name,
title, author, format, size, preview, content_type,
origin, final_status,
status_message, download_path,
queued_at, terminal_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', NULL, NULL, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT(task_id) DO UPDATE SET
user_id = excluded.user_id,
username = excluded.username,
request_id = excluded.request_id,
source = excluded.source,
source_display_name = excluded.source_display_name,
title = excluded.title,
author = excluded.author,
format = excluded.format,
size = excluded.size,
preview = excluded.preview,
content_type = excluded.content_type,
origin = excluded.origin,
final_status = excluded.final_status,
status_message = excluded.status_message,
download_path = excluded.download_path,
terminal_at = excluded.terminal_at,
dismissed_at = NULL
""",
final_status = 'active',
status_message = NULL,
download_path = NULL,
terminal_at = CURRENT_TIMESTAMP
""",
(
normalized_task_id,
normalized_user_id,
@@ -230,16 +224,57 @@ class DownloadHistoryService:
normalize_optional_text(preview),
normalize_optional_text(content_type),
normalized_origin,
normalized_final_status,
normalize_optional_text(status_message),
normalize_optional_text(download_path),
effective_terminal_at,
),
)
conn.commit()
finally:
conn.close()
def finalize_download(
self,
*,
task_id: str,
final_status: str,
status_message: str | None = None,
download_path: str | None = None,
) -> None:
"""Update an existing download row to its terminal state."""
normalized_task_id = _normalize_task_id(task_id)
normalized_final_status = _normalize_final_status(final_status)
normalized_status_message = normalize_optional_text(status_message)
normalized_download_path = normalize_optional_text(download_path)
effective_terminal_at = now_utc_iso()
with self._lock:
conn = self._connect()
try:
cursor = conn.execute(
"""
UPDATE download_history
SET final_status = ?,
status_message = ?,
download_path = ?,
terminal_at = ?
WHERE task_id = ? AND final_status = 'active'
""",
(
normalized_final_status,
normalized_status_message,
normalized_download_path,
effective_terminal_at,
normalized_task_id,
),
)
rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
if rowcount < 1:
logger.warning(
"finalize_download: no active row found for task_id=%s (may have been missed at queue time)",
normalized_task_id,
)
conn.commit()
finally:
conn.close()
def get_by_task_id(self, task_id: str) -> dict[str, Any] | None:
normalized_task_id = _normalize_task_id(task_id)
conn = self._connect()
@@ -252,7 +287,7 @@ class DownloadHistoryService:
finally:
conn.close()
def get_undismissed_terminal(
def get_undismissed(
self,
*,
user_id: int | None,

View File

@@ -38,12 +38,19 @@ class QueueStatus(str, Enum):
LOCATING = "locating"
DOWNLOADING = "downloading"
COMPLETE = "complete"
AVAILABLE = "available"
ERROR = "error"
DONE = "done"
CANCELLED = "cancelled"
# Statuses that mark a download as finished (used for terminal hooks and
# queue cleanup).
TERMINAL_QUEUE_STATUSES: frozenset[QueueStatus] = frozenset({
    QueueStatus.COMPLETE, QueueStatus.ERROR, QueueStatus.CANCELLED,
})
# Statuses for downloads that are still in flight (queued or progressing).
ACTIVE_QUEUE_STATUSES: frozenset[QueueStatus] = frozenset({
    QueueStatus.QUEUED, QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING,
})
class SearchMode(str, Enum):
DIRECT = "direct"
UNIVERSAL = "universal"

View File

@@ -8,7 +8,7 @@ from threading import Lock, Event
from typing import Dict, List, Optional, Tuple, Any, Callable
from shelfmark.core.config import config as app_config
from shelfmark.core.models import QueueStatus, QueueItem, DownloadTask
from shelfmark.core.models import QueueStatus, QueueItem, DownloadTask, TERMINAL_QUEUE_STATUSES
class BookQueue:
@@ -25,6 +25,7 @@ class BookQueue:
self._terminal_status_hook: Optional[
Callable[[str, QueueStatus, DownloadTask], None]
] = None
self._queue_hook: Optional[Callable[[str, DownloadTask], None]] = None
@property
def _status_timeout(self) -> timedelta:
@@ -33,11 +34,12 @@ class BookQueue:
def add(self, task: DownloadTask) -> bool:
"""Add a download task to the queue. Returns False if already exists."""
hook: Optional[Callable[[str, DownloadTask], None]] = None
with self._lock:
task_id = task.task_id
# Don't add if already exists and not in error/done state
if task_id in self._status and self._status[task_id] not in [QueueStatus.ERROR, QueueStatus.DONE, QueueStatus.CANCELLED]:
# Don't add if already exists and not in error/cancelled state
if task_id in self._status and self._status[task_id] not in [QueueStatus.ERROR, QueueStatus.CANCELLED]:
return False
# Ensure added_time is set
@@ -48,7 +50,14 @@ class BookQueue:
self._queue.put(queue_item)
self._task_data[task_id] = task
self._update_status(task_id, QueueStatus.QUEUED)
return True
hook = self._queue_hook
if hook is not None:
try:
hook(task_id, task)
except Exception:
pass
return True
def get_next(self) -> Optional[Tuple[str, Event]]:
"""Get next task ID from queue with cancellation flag."""
@@ -95,6 +104,14 @@ class BookQueue:
with self._lock:
self._terminal_status_hook = hook
    def set_queue_hook(
        self,
        hook: Optional[Callable[[str, DownloadTask], None]],
    ) -> None:
        """Register a callback invoked when a task is added to the queue.

        Pass None to clear a previously registered hook. The reference is
        swapped under the queue lock so concurrent readers see a consistent
        value.
        """
        with self._lock:
            self._queue_hook = hook
def update_status(self, book_id: str, status: QueueStatus) -> None:
"""Update status of a book in the queue."""
hook: Optional[Callable[[str, QueueStatus, DownloadTask], None]] = None
@@ -103,15 +120,8 @@ class BookQueue:
previous_status = self._status.get(book_id)
self._update_status(book_id, status)
terminal_statuses = {
QueueStatus.COMPLETE,
QueueStatus.AVAILABLE,
QueueStatus.ERROR,
QueueStatus.DONE,
QueueStatus.CANCELLED,
}
if (
status in terminal_statuses
status in TERMINAL_QUEUE_STATUSES
and previous_status != status
and self._terminal_status_hook is not None
):
@@ -121,7 +131,7 @@ class BookQueue:
hook_task = current_task
# Clean up active download tracking when finished
if status in [QueueStatus.COMPLETE, QueueStatus.AVAILABLE, QueueStatus.ERROR, QueueStatus.DONE, QueueStatus.CANCELLED]:
if status in TERMINAL_QUEUE_STATUSES:
self._active_downloads.pop(book_id, None)
self._cancel_flags.pop(book_id, None)
@@ -196,29 +206,20 @@ class BookQueue:
return sorted(queue_items, key=lambda x: (x['priority'], x['added_time']))
def cancel_download(self, task_id: str) -> bool:
"""Cancel a download or clear a completed/errored item."""
"""Cancel an active or queued download."""
with self._lock:
current_status = self._status.get(task_id)
# Allow cancellation during any active state
if current_status in [QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING]:
# Signal active download to stop
if task_id in self._cancel_flags:
self._cancel_flags[task_id].set()
if current_status in [QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.AVAILABLE, QueueStatus.ERROR, QueueStatus.CANCELLED]:
# Clear completed/errored/cancelled items from tracking
self._status.pop(task_id, None)
self._status_timestamps.pop(task_id, None)
self._task_data.pop(task_id, None)
self._cancel_flags.pop(task_id, None)
self._active_downloads.pop(task_id, None)
return True
elif current_status not in [QueueStatus.QUEUED]:
# Not in a cancellable state
return False
if current_status in [QueueStatus.RESOLVING, QueueStatus.LOCATING, QueueStatus.DOWNLOADING, QueueStatus.QUEUED]:
self.update_status(task_id, QueueStatus.CANCELLED)
return True
return False
self.update_status(task_id, QueueStatus.CANCELLED)
return True
def set_priority(self, task_id: str, new_priority: int) -> bool:
"""Change the priority of a queued task (lower = higher priority)."""
@@ -257,6 +258,8 @@ class BookQueue:
This is used for retries where task metadata should be preserved.
"""
hook: Optional[Callable[[str, DownloadTask], None]] = None
hook_task: Optional[DownloadTask] = None
with self._lock:
task = self._task_data.get(task_id)
if task is None:
@@ -285,7 +288,15 @@ class BookQueue:
queue_item = QueueItem(task_id, task.priority, time.time())
self._queue.put(queue_item)
self._update_status(task_id, QueueStatus.QUEUED)
return True
hook = self._queue_hook
hook_task = task
if hook is not None and hook_task is not None:
try:
hook(task_id, hook_task)
except Exception:
pass
return True
def reorder_queue(self, task_priorities: Dict[str, int]) -> bool:
"""Bulk reorder queue by mapping task_id to new priority."""
@@ -327,7 +338,7 @@ class BookQueue:
def refresh(self) -> None:
"""Remove any tasks that are done downloading or have stale status."""
terminal_statuses = {QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.ERROR, QueueStatus.AVAILABLE, QueueStatus.CANCELLED}
terminal_statuses = TERMINAL_QUEUE_STATUSES
with self._lock:
current_time = datetime.now()
to_remove = []
@@ -341,10 +352,6 @@ class BookQueue:
if task.download_path and not Path(task.download_path).exists():
task.download_path = None
# Mark available downloads as done if file is gone
if status == QueueStatus.AVAILABLE and not task.download_path:
self._update_status(task_id, QueueStatus.DONE)
# Check for stale status entries
last_update = self._status_timestamps.get(task_id)
if last_update and (current_time - last_update) > self._status_timeout:

View File

@@ -104,6 +104,17 @@ def normalize_optional_positive_int(value: Any, field_name: str = "value") -> in
return parsed
def populate_request_usernames(rows: list[dict[str, Any]], user_db: Any) -> None:
    """Add 'username' to each request row by looking up user_id.

    Lookups are memoized per call so each distinct user_id hits the DB once;
    unknown users get an empty-string username.
    """
    seen: dict[int, str] = {}
    for entry in rows:
        uid = entry["user_id"]
        if uid in seen:
            entry["username"] = seen[uid]
            continue
        record = user_db.get_user(user_id=uid)
        name = record.get("username", "") if record else ""
        seen[uid] = name
        entry["username"] = name
def extract_release_source_id(release_data: Any) -> str | None:
"""Extract and normalize release_data.source_id."""
if not isinstance(release_data, dict):

View File

@@ -17,6 +17,7 @@ from shelfmark.core.request_policy import (
parse_policy_mode,
resolve_policy_mode,
)
from shelfmark.core.request_validation import RequestStatus
from shelfmark.core.requests_service import (
RequestServiceError,
cancel_request,
@@ -36,6 +37,8 @@ from shelfmark.core.request_helpers import (
emit_ws_event,
load_users_request_policy_settings,
normalize_optional_text,
normalize_positive_int,
populate_request_usernames,
)
from shelfmark.core.user_db import UserDB
@@ -88,6 +91,18 @@ def _require_db_user_id() -> tuple[int | None, Any | None]:
)
def _require_admin_user_id() -> tuple[int | None, Any | None]:
    """Resolve the acting admin's DB user id from the session.

    Returns (admin_user_id, None) on success, or (None, error_response)
    when the session is not an admin or carries no usable identity. A
    missing db_user_id falls through int() as a TypeError, yielding the
    same "identity unavailable" response.
    """
    if not session.get("is_admin", False):
        return None, (jsonify({"error": "Admin access required"}), 403)
    raw_admin_id = session.get("db_user_id")
    try:
        admin_id = int(raw_admin_id)
    except (TypeError, ValueError):
        return None, (jsonify({"error": "Admin user identity unavailable"}), 403)
    return admin_id, None
def _resolve_effective_policy(
user_db: UserDB,
*,
@@ -177,23 +192,16 @@ def _format_user_label(username: str | None, user_id: int | None = None) -> str:
return "unknown user"
def _resolve_request_username(
user_db: UserDB,
*,
request_row: dict[str, Any],
fallback_username: str | None = None,
) -> str | None:
normalized_fallback = normalize_optional_text(fallback_username)
raw_user_id = request_row.get("user_id")
try:
request_user_id = int(raw_user_id)
except (TypeError, ValueError):
return normalized_fallback
requester = user_db.get_user(user_id=request_user_id)
if not isinstance(requester, dict):
return normalized_fallback
return normalize_optional_text(requester.get("username")) or normalized_fallback
def _format_requester_label(user_db: UserDB, request_row: dict[str, Any]) -> str:
    """Resolve a display label for the user who created a request."""
    user_id = normalize_positive_int(request_row.get("user_id"))
    fallback = lambda: _format_user_label(None, user_id)  # noqa: E731
    if user_id is None:
        return fallback()
    requester = user_db.get_user(user_id=user_id)
    if not isinstance(requester, dict):
        return fallback()
    username = normalize_optional_text(requester.get("username"))
    return username if username is not None else fallback()
def _resolve_request_source_and_format(request_row: dict[str, Any]) -> tuple[str, str | None]:
@@ -209,13 +217,6 @@ def _resolve_request_source_and_format(request_row: dict[str, Any]) -> tuple[str
return normalize_source(request_row.get("source_hint")), None
def _resolve_request_user_id(request_row: dict[str, Any]) -> int | None:
raw_user_id = request_row.get("user_id")
try:
user_id = int(raw_user_id)
except (TypeError, ValueError):
return None
return user_id if user_id > 0 else None
def _notify_admin_for_request_event(
@@ -223,7 +224,6 @@ def _notify_admin_for_request_event(
*,
event: NotificationEvent,
request_row: dict[str, Any],
fallback_username: str | None = None,
) -> None:
book_data = request_row.get("book_data")
if not isinstance(book_data, dict):
@@ -234,11 +234,7 @@ def _notify_admin_for_request_event(
event=event,
title=str(book_data.get("title") or "Unknown title"),
author=str(book_data.get("author") or "Unknown author"),
username=_resolve_request_username(
user_db,
request_row=request_row,
fallback_username=fallback_username,
),
username=_format_requester_label(user_db, request_row),
content_type=normalize_content_type(
request_row.get("content_type") or book_data.get("content_type")
),
@@ -248,7 +244,7 @@ def _notify_admin_for_request_event(
error_message=None,
)
owner_user_id = _resolve_request_user_id(request_row)
owner_user_id = normalize_positive_int(request_row.get("user_id"))
try:
notify_admin(event, context)
except Exception as exc:
@@ -513,7 +509,6 @@ def register_request_routes(
user_db,
event=NotificationEvent.REQUEST_CREATED,
request_row=created,
fallback_username=actor_username,
)
return jsonify(created), 201
@@ -606,13 +601,7 @@ def register_request_routes(
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
user_cache: dict[int, str] = {}
for row in rows:
requester_id = row["user_id"]
if requester_id not in user_cache:
requester = user_db.get_user(user_id=requester_id)
user_cache[requester_id] = requester.get("username", "") if requester else ""
row["username"] = user_cache[requester_id]
populate_request_usernames(rows, user_db)
return jsonify(rows)
@@ -626,11 +615,11 @@ def register_request_routes(
by_status = {
status: len(user_db.list_requests(status=status))
for status in ("pending", "fulfilled", "rejected", "cancelled")
for status in RequestStatus
}
return jsonify(
{
"pending": by_status["pending"],
"pending": by_status[RequestStatus.PENDING],
"total": sum(by_status.values()),
"by_status": by_status,
}
@@ -641,16 +630,10 @@ def register_request_routes(
auth_gate = _require_request_endpoints_available(resolve_auth_mode)
if auth_gate is not None:
return auth_gate
if not session.get("is_admin", False):
return jsonify({"error": "Admin access required"}), 403
raw_admin_id = session.get("db_user_id")
if raw_admin_id is None:
return jsonify({"error": "Admin user identity unavailable"}), 403
try:
admin_user_id = int(raw_admin_id)
except (TypeError, ValueError):
return jsonify({"error": "Admin user identity unavailable"}), 403
admin_user_id, admin_gate = _require_admin_user_id()
if admin_gate is not None:
return admin_gate
data = request.get_json(silent=True) or {}
if not isinstance(data, dict):
@@ -675,10 +658,7 @@ def register_request_routes(
"title": _resolve_request_title(updated),
}
admin_label = _format_user_label(normalize_optional_text(session.get("user_id")), admin_user_id)
requester_label = _format_user_label(
_resolve_request_username(user_db, request_row=updated),
_resolve_request_user_id(updated),
)
requester_label = _format_requester_label(user_db, updated)
logger.info(
"Request fulfilled #%s for '%s' by %s (requested by %s)",
updated["id"],
@@ -712,16 +692,10 @@ def register_request_routes(
auth_gate = _require_request_endpoints_available(resolve_auth_mode)
if auth_gate is not None:
return auth_gate
if not session.get("is_admin", False):
return jsonify({"error": "Admin access required"}), 403
raw_admin_id = session.get("db_user_id")
if raw_admin_id is None:
return jsonify({"error": "Admin user identity unavailable"}), 403
try:
admin_user_id = int(raw_admin_id)
except (TypeError, ValueError):
return jsonify({"error": "Admin user identity unavailable"}), 403
admin_user_id, admin_gate = _require_admin_user_id()
if admin_gate is not None:
return admin_gate
data = request.get_json(silent=True) or {}
if not isinstance(data, dict):
@@ -743,10 +717,7 @@ def register_request_routes(
"title": _resolve_request_title(updated),
}
admin_label = _format_user_label(normalize_optional_text(session.get("user_id")), admin_user_id)
requester_label = _format_user_label(
_resolve_request_username(user_db, request_row=updated),
_resolve_request_user_id(updated),
)
requester_label = _format_requester_label(user_db, updated)
logger.info(
"Request rejected #%s for '%s' by %s (requested by %s)",
updated["id"],

View File

@@ -2,27 +2,29 @@
from __future__ import annotations
from enum import Enum
from typing import Any
from shelfmark.core.models import QueueStatus
from shelfmark.core.request_policy import parse_policy_mode
VALID_REQUEST_STATUSES = frozenset({"pending", "fulfilled", "rejected", "cancelled"})
TERMINAL_REQUEST_STATUSES = frozenset({"fulfilled", "rejected", "cancelled"})
class RequestStatus(str, Enum):
"""Enum for request lifecycle statuses."""
PENDING = "pending"
FULFILLED = "fulfilled"
REJECTED = "rejected"
CANCELLED = "cancelled"
DELIVERY_STATE_NONE = "none"
VALID_REQUEST_STATUSES = frozenset(RequestStatus)
TERMINAL_REQUEST_STATUSES = frozenset({
RequestStatus.FULFILLED, RequestStatus.REJECTED, RequestStatus.CANCELLED,
})
VALID_REQUEST_LEVELS = frozenset({"book", "release"})
VALID_DELIVERY_STATES = frozenset(
{
"none",
"unknown",
"queued",
"resolving",
"locating",
"downloading",
"complete",
"error",
"cancelled",
}
)
VALID_DELIVERY_STATES = frozenset({DELIVERY_STATE_NONE} | set(QueueStatus))
def normalize_request_status(status: Any) -> str:
@@ -63,14 +65,6 @@ def normalize_delivery_state(state: Any) -> str:
return normalized
def safe_delivery_state(state: Any, default: str = "none") -> str:
"""Normalize delivery-state with a fallback for invalid/missing values."""
if not isinstance(state, str):
return default
normalized = state.strip().lower()
return normalized if normalized in VALID_DELIVERY_STATES else default
def validate_request_level_payload(request_level: Any, release_data: Any) -> str:
"""Validate request_level and release_data shape coupling."""
normalized_level = normalize_request_level(request_level)

View File

@@ -7,11 +7,13 @@ import json
from typing import Any, Callable, TYPE_CHECKING
from shelfmark.core.request_policy import normalize_content_type
from shelfmark.core.models import QueueStatus
from shelfmark.core.request_validation import (
DELIVERY_STATE_NONE,
RequestStatus,
normalize_policy_mode,
normalize_request_level,
normalize_request_status,
safe_delivery_state,
validate_request_level_payload,
validate_status_transition,
)
@@ -102,7 +104,7 @@ def _find_duplicate_pending_request(
author: str,
content_type: str,
) -> dict[str, Any] | None:
pending_rows = user_db.list_requests(user_id=user_id, status="pending")
pending_rows = user_db.list_requests(user_id=user_id, status=RequestStatus.PENDING)
for row in pending_rows:
row_book_data = row.get("book_data") or {}
if not isinstance(row_book_data, dict):
@@ -122,8 +124,12 @@ def _now_timestamp() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _existing_delivery_state(request_row: dict[str, Any]) -> str:
return safe_delivery_state(request_row.get("delivery_state"))
def _normalize_admin_note(admin_note: Any) -> str | None:
if admin_note is None:
return None
if not isinstance(admin_note, str):
raise RequestServiceError("admin_note must be a string", status_code=400)
return admin_note.strip() or None
def sync_delivery_states_from_queue_status(
@@ -134,7 +140,7 @@ def sync_delivery_states_from_queue_status(
) -> list[dict[str, Any]]:
"""Persist delivery-state transitions for fulfilled requests based on queue status."""
source_delivery_states: dict[str, str] = {}
for status_key in ("queued", "resolving", "locating", "downloading", "complete", "error", "cancelled"):
for status_key in QueueStatus:
status_bucket = queue_status.get(status_key)
if not isinstance(status_bucket, dict):
continue
@@ -144,7 +150,7 @@ def sync_delivery_states_from_queue_status(
if not source_delivery_states:
return []
fulfilled_rows = user_db.list_requests(user_id=user_id, status="fulfilled")
fulfilled_rows = user_db.list_requests(user_id=user_id, status=RequestStatus.FULFILLED)
updated: list[dict[str, Any]] = []
for row in fulfilled_rows:
@@ -156,7 +162,7 @@ def sync_delivery_states_from_queue_status(
if delivery_state is None:
continue
if _existing_delivery_state(row) == delivery_state:
if row.get("delivery_state", DELIVERY_STATE_NONE) == delivery_state:
continue
updated.append(
@@ -257,6 +263,15 @@ def ensure_request_access(
return request_row
def _require_pending(request_row: dict[str, Any]) -> None:
if request_row["status"] != RequestStatus.PENDING:
raise RequestServiceError(
"Request is already in a terminal state",
status_code=409,
code="stale_transition",
)
def cancel_request(
user_db: "UserDB",
*,
@@ -270,18 +285,13 @@ def cancel_request(
actor_user_id=actor_user_id,
is_admin=False,
)
if request_row["status"] != "pending":
raise RequestServiceError(
"Request is already in a terminal state",
status_code=409,
code="stale_transition",
)
_require_pending(request_row)
try:
return user_db.update_request(
request_id,
expected_current_status="pending",
status="cancelled",
expected_current_status=RequestStatus.PENDING,
status=RequestStatus.CANCELLED,
)
except ValueError as exc:
raise RequestServiceError(str(exc), status_code=409, code="stale_transition") from exc
@@ -301,24 +311,15 @@ def reject_request(
actor_user_id=admin_user_id,
is_admin=True,
)
if request_row["status"] != "pending":
raise RequestServiceError(
"Request is already in a terminal state",
status_code=409,
code="stale_transition",
)
_require_pending(request_row)
normalized_admin_note = None
if admin_note is not None:
if not isinstance(admin_note, str):
raise RequestServiceError("admin_note must be a string", status_code=400)
normalized_admin_note = admin_note.strip() or None
normalized_admin_note = _normalize_admin_note(admin_note)
try:
return user_db.update_request(
request_id,
expected_current_status="pending",
status="rejected",
expected_current_status=RequestStatus.PENDING,
status=RequestStatus.REJECTED,
admin_note=normalized_admin_note,
reviewed_by=admin_user_id,
reviewed_at=_now_timestamp(),
@@ -344,18 +345,9 @@ def fulfil_request(
actor_user_id=admin_user_id,
is_admin=True,
)
if request_row["status"] != "pending":
raise RequestServiceError(
"Request is already in a terminal state",
status_code=409,
code="stale_transition",
)
_require_pending(request_row)
normalized_admin_note = None
if admin_note is not None:
if not isinstance(admin_note, str):
raise RequestServiceError("admin_note must be a string", status_code=400)
normalized_admin_note = admin_note.strip() or None
normalized_admin_note = _normalize_admin_note(admin_note)
if not isinstance(manual_approval, bool):
raise RequestServiceError("manual_approval must be a boolean", status_code=400)
@@ -368,10 +360,10 @@ def fulfil_request(
try:
return user_db.update_request(
request_id,
expected_current_status="pending",
status="fulfilled",
expected_current_status=RequestStatus.PENDING,
status=RequestStatus.FULFILLED,
release_data=None,
delivery_state="complete",
delivery_state=QueueStatus.COMPLETE,
delivery_updated_at=_now_timestamp(),
last_failure_reason=None,
admin_note=normalized_admin_note,
@@ -381,14 +373,9 @@ def fulfil_request(
except ValueError as exc:
raise RequestServiceError(str(exc), status_code=409, code="stale_transition") from exc
if request_row["request_level"] == "book" and selected_release_data is None:
if selected_release_data is None:
raise RequestServiceError(
"release_data is required to fulfil book-level requests",
status_code=400,
)
if request_row["request_level"] == "release" and selected_release_data is None:
raise RequestServiceError(
"release_data is required to fulfil release-level requests",
"release_data is required to fulfil requests",
status_code=400,
)
@@ -417,10 +404,10 @@ def fulfil_request(
try:
return user_db.update_request(
request_id,
expected_current_status="pending",
status="fulfilled",
expected_current_status=RequestStatus.PENDING,
status=RequestStatus.FULFILLED,
release_data=selected_release_data,
delivery_state="queued",
delivery_state=QueueStatus.QUEUED,
delivery_updated_at=_now_timestamp(),
last_failure_reason=None,
admin_note=normalized_admin_note,

View File

@@ -9,12 +9,14 @@ from typing import Any, Dict, List, Optional
from shelfmark.core.auth_modes import AUTH_SOURCE_BUILTIN, AUTH_SOURCE_SET
from shelfmark.core.logger import setup_logger
from shelfmark.core.request_helpers import normalize_optional_positive_int
from shelfmark.core.models import QueueStatus
from shelfmark.core.request_validation import (
DELIVERY_STATE_NONE,
RequestStatus,
normalize_delivery_state,
normalize_policy_mode,
normalize_request_level,
normalize_request_status,
safe_delivery_state,
validate_request_level_payload,
validate_status_transition,
)
@@ -83,6 +85,7 @@ CREATE TABLE IF NOT EXISTS download_history (
final_status TEXT NOT NULL,
status_message TEXT,
download_path TEXT,
queued_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
terminal_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
dismissed_at TIMESTAMP
);
@@ -92,6 +95,10 @@ ON download_history (user_id, final_status, terminal_at DESC);
CREATE INDEX IF NOT EXISTS idx_download_history_dismissed
ON download_history (dismissed_at) WHERE dismissed_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_download_history_undismissed
ON download_history (user_id, terminal_at DESC, id DESC)
WHERE dismissed_at IS NULL;
"""
@@ -162,6 +169,7 @@ class UserDB:
self._migrate_auth_source_column(conn)
self._migrate_request_delivery_columns(conn)
self._migrate_download_requests_dismissed_at(conn)
self._migrate_download_history_queued_at(conn)
conn.commit()
# WAL mode must be changed outside an open transaction.
conn.execute("PRAGMA journal_mode=WAL")
@@ -201,18 +209,11 @@ class UserDB:
if "last_failure_reason" not in column_names:
conn.execute("ALTER TABLE download_requests ADD COLUMN last_failure_reason TEXT")
conn.execute(
"""
UPDATE download_requests
SET delivery_state = 'unknown'
WHERE status = 'fulfilled' AND (delivery_state IS NULL OR TRIM(delivery_state) = '' OR delivery_state = 'none')
"""
)
conn.execute(
"""
UPDATE download_requests
SET delivery_state = 'none'
WHERE status != 'fulfilled' AND (delivery_state IS NULL OR TRIM(delivery_state) = '')
WHERE delivery_state IS NULL OR TRIM(delivery_state) = '' OR delivery_state IN ('unknown', 'available', 'done')
"""
)
conn.execute(
@@ -222,13 +223,6 @@ class UserDB:
WHERE delivery_state != 'none' AND delivery_updated_at IS NULL
"""
)
conn.execute(
"""
UPDATE download_requests
SET delivery_state = 'complete'
WHERE delivery_state = 'cleared'
"""
)
def _migrate_download_requests_dismissed_at(self, conn: sqlite3.Connection) -> None:
"""Ensure download_requests.dismissed_at exists for request dismissal history."""
@@ -236,10 +230,20 @@ class UserDB:
column_names = {str(col["name"]) for col in columns}
if "dismissed_at" not in column_names:
conn.execute("ALTER TABLE download_requests ADD COLUMN dismissed_at TIMESTAMP")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_download_requests_dismissed "
"ON download_requests (dismissed_at) WHERE dismissed_at IS NOT NULL"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_download_requests_dismissed "
"ON download_requests (dismissed_at) WHERE dismissed_at IS NOT NULL"
)
def _migrate_download_history_queued_at(self, conn: sqlite3.Connection) -> None:
"""Ensure download_history.queued_at exists for queue-time recording."""
columns = conn.execute("PRAGMA table_info(download_history)").fetchall()
column_names = {str(col["name"]) for col in columns}
if "queued_at" not in column_names:
conn.execute("ALTER TABLE download_history ADD COLUMN queued_at TIMESTAMP")
conn.execute(
"UPDATE download_history SET queued_at = CURRENT_TIMESTAMP WHERE queued_at IS NULL"
)
def create_user(
self,
@@ -447,13 +451,13 @@ class UserDB:
policy_mode: str,
book_data: Dict[str, Any],
release_data: Optional[Dict[str, Any]] = None,
status: str = "pending",
status: str = RequestStatus.PENDING,
source_hint: Optional[str] = None,
note: Optional[str] = None,
admin_note: Optional[str] = None,
reviewed_by: Optional[int] = None,
reviewed_at: Optional[str] = None,
delivery_state: str = "none",
delivery_state: str = DELIVERY_STATE_NONE,
delivery_updated_at: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a download request row and return the created record."""
@@ -664,24 +668,8 @@ class UserDB:
if "content_type" in updates and not updates["content_type"]:
raise ValueError("content_type is required")
candidate_request_level = updates.get("request_level", current["request_level"])
candidate_release_data = (
updates["release_data"] if "release_data" in updates else current["release_data"]
)
candidate_status = updates.get("status", current["status"])
normalized_request_level = normalize_request_level(candidate_request_level)
normalized_candidate_status = normalize_request_status(candidate_status)
if normalized_request_level == "release" and candidate_release_data is None:
raise ValueError("request_level=release requires non-null release_data")
if (
normalized_request_level == "book"
and candidate_release_data is not None
and normalized_candidate_status != "fulfilled"
):
raise ValueError("request_level=book requires null release_data")
if "request_level" in updates:
updates["request_level"] = normalized_request_level
updates["request_level"] = normalize_request_level(updates["request_level"])
if "book_data" in updates:
if not isinstance(updates["book_data"], dict):
@@ -737,19 +725,17 @@ class UserDB:
if current_request is None:
return None
if current_request.get("status") != "fulfilled":
if current_request.get("status") != RequestStatus.FULFILLED:
return None
current_delivery_state = safe_delivery_state(
current_request.get("delivery_state"),
)
current_delivery_state = current_request.get("delivery_state", DELIVERY_STATE_NONE)
# Terminal hook callbacks can run before delivery-state sync persists "error".
# Allow reopening fulfilled requests unless they are already complete.
if current_delivery_state == "complete":
if current_delivery_state == QueueStatus.COMPLETE:
return None
if (
current_delivery_state not in {"error", "cancelled"}
current_delivery_state not in {QueueStatus.ERROR, QueueStatus.CANCELLED}
and normalized_failure_reason is None
):
return None
@@ -843,25 +829,6 @@ class UserDB:
finally:
conn.close()
def clear_request_dismissals(self, *, user_id: int | None) -> int:
"""Clear dismissed_at on requests, optionally scoped by owner user_id."""
normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
params: list[Any] = []
query = "UPDATE download_requests SET dismissed_at = NULL WHERE dismissed_at IS NOT NULL"
if normalized_user_id is not None:
query += " AND user_id = ?"
params.append(normalized_user_id)
with self._lock:
conn = self._connect()
try:
cursor = conn.execute(query, params)
conn.commit()
rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
return max(rowcount, 0)
finally:
conn.close()
def delete_dismissed_requests(self, *, user_id: int | None) -> int:
"""Delete dismissed terminal requests, optionally scoped by owner user_id."""
normalized_user_id = normalize_optional_positive_int(user_id, "user_id")

View File

@@ -411,6 +411,14 @@ def _download_task(task_id: str, cancel_flag: Event) -> Optional[str]:
exc_type="StatusCallbackError",
)
return
# Don't propagate terminal statuses to the queue here. Output modules
# call status_callback("complete") before returning the download path,
# but _process_single_download needs to set download_path on the task
# first so the terminal hook captures it for history persistence.
if status_key in ("complete", "cancelled"):
if message is not None:
book_queue.update_status_message(task_id, message)
return
update_download_status(task_id, status, message)
# Get the download handler based on the task's source
@@ -548,22 +556,10 @@ def update_download_progress(book_id: str, progress: float) -> None:
def update_download_status(book_id: str, status: str, message: Optional[str] = None) -> None:
"""Update download status with optional message for UI display."""
# Map string status to QueueStatus enum
status_map = {
'queued': QueueStatus.QUEUED,
'resolving': QueueStatus.RESOLVING,
'locating': QueueStatus.LOCATING,
'downloading': QueueStatus.DOWNLOADING,
'complete': QueueStatus.COMPLETE,
'available': QueueStatus.AVAILABLE,
'error': QueueStatus.ERROR,
'done': QueueStatus.DONE,
'cancelled': QueueStatus.CANCELLED,
}
status_key = status.lower()
queue_status_enum = status_map.get(status_key)
if not queue_status_enum:
try:
queue_status_enum = QueueStatus(status_key)
except ValueError:
return
# Always update activity timestamp (used by stall detection) even if the status
@@ -598,16 +594,20 @@ def cancel_download(book_id: str) -> bool:
def retry_download(book_id: str) -> Tuple[bool, Optional[str]]:
"""Retry a failed standalone download."""
"""Retry a failed or cancelled download.
Request-linked downloads can only be retried when cancelled (errors
reopen the request for admin re-approval instead).
"""
task = book_queue.get_task(book_id)
if task is None:
return False, "Download not found"
status = book_queue.get_task_status(book_id)
if status != QueueStatus.ERROR:
return False, "Download is not in an error state"
if status not in (QueueStatus.ERROR, QueueStatus.CANCELLED):
return False, "Download is not in an error or cancelled state"
if task.request_id:
if task.request_id and status != QueueStatus.CANCELLED:
return False, "Request-linked downloads must be retried from requests"
task.last_error_message = None

View File

@@ -18,6 +18,7 @@ from werkzeug.security import check_password_hash
from werkzeug.wrappers import Response
from shelfmark.download import orchestrator as backend
from shelfmark.release_sources import get_source_display_name
from shelfmark.release_sources.direct_download import SearchUnavailable
from shelfmark.config.settings import _SUPPORTED_BOOK_LANGUAGE
from shelfmark.config.env import (
@@ -27,7 +28,7 @@ from shelfmark.config.env import (
)
from shelfmark.core.config import config as app_config
from shelfmark.core.logger import setup_logger
from shelfmark.core.models import SearchFilters, QueueStatus
from shelfmark.core.models import SearchFilters, QueueStatus, TERMINAL_QUEUE_STATUSES
from shelfmark.core.prefix_middleware import PrefixMiddleware
from shelfmark.core.auth_modes import (
get_auth_check_admin_status,
@@ -53,9 +54,9 @@ from shelfmark.core.download_history_service import DownloadHistoryService
from shelfmark.core.notifications import NotificationContext, NotificationEvent, notify_admin, notify_user
from shelfmark.core.request_helpers import (
coerce_bool,
extract_release_source_id,
load_users_request_policy_settings,
normalize_optional_text,
normalize_positive_int,
)
from shelfmark.core.utils import normalize_base_path
from shelfmark.api.websocket import ws_manager
@@ -1104,16 +1105,10 @@ def _resolve_status_scope(*, require_authenticated: bool = True) -> tuple[bool,
def _queue_status_to_final_activity_status(status: QueueStatus) -> str | None:
if status == QueueStatus.COMPLETE:
return "complete"
if status == QueueStatus.ERROR:
return "error"
if status == QueueStatus.CANCELLED:
return "cancelled"
return None
return status.value if status in TERMINAL_QUEUE_STATUSES else None
def _queue_status_to_notification_event(status: QueueStatus) -> NotificationEvent | None:
if status in {QueueStatus.COMPLETE, QueueStatus.AVAILABLE, QueueStatus.DONE}:
if status == QueueStatus.COMPLETE:
return NotificationEvent.DOWNLOAD_COMPLETE
if status == QueueStatus.ERROR:
return NotificationEvent.DOWNLOAD_FAILED
@@ -1169,6 +1164,41 @@ def _notify_admin_for_terminal_download_status(*, task_id: str, status: QueueSta
)
def _record_download_queued(task_id: str, task: Any) -> None:
"""Persist initial download record when a task enters the queue."""
if download_history_service is None:
return
owner_user_id = normalize_positive_int(getattr(task, "user_id", None))
request_id = normalize_positive_int(getattr(task, "request_id", None))
origin = "requested" if request_id else "direct"
source_name = normalize_source(getattr(task, "source", None))
try:
source_display = get_source_display_name(source_name)
except Exception:
source_display = None
try:
download_history_service.record_download(
task_id=task_id,
user_id=owner_user_id,
username=normalize_optional_text(getattr(task, "username", None)),
request_id=request_id,
source=source_name,
source_display_name=source_display,
title=str(getattr(task, "title", "Unknown title") or "Unknown title"),
author=normalize_optional_text(getattr(task, "author", None)),
format=normalize_optional_text(getattr(task, "format", None)),
size=normalize_optional_text(getattr(task, "size", None)),
preview=normalize_optional_text(getattr(task, "preview", None)),
content_type=normalize_optional_text(getattr(task, "content_type", None)),
origin=origin,
)
except Exception as exc:
logger.warning("Failed to record download at queue time for task %s: %s", task_id, exc)
def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task: Any) -> None:
_notify_admin_for_terminal_download_status(task_id=task_id, status=status, task=task)
@@ -1176,68 +1206,22 @@ def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task:
if final_status is None:
return
raw_owner_user_id = getattr(task, "user_id", None)
try:
owner_user_id = int(raw_owner_user_id) if raw_owner_user_id is not None else None
except (TypeError, ValueError):
owner_user_id = None
linked_request: dict[str, Any] | None = None
request_id: int | None = None
origin = "direct"
if user_db is not None and owner_user_id is not None:
fulfilled_rows = user_db.list_requests(user_id=owner_user_id, status="fulfilled")
for row in fulfilled_rows:
source_id = extract_release_source_id(row.get("release_data"))
if source_id == task_id:
linked_request = row
origin = "requested"
try:
request_id = int(row.get("id"))
except (TypeError, ValueError):
request_id = None
break
try:
download_payload = backend._task_to_dict(task)
except Exception as exc:
logger.warning("Failed to serialize task payload for terminal snapshot: %s", exc)
download_payload = {
"id": task_id,
"title": getattr(task, "title", "Unknown title"),
"author": getattr(task, "author", "Unknown author"),
"source": getattr(task, "source", "direct_download"),
"added_time": getattr(task, "added_time", 0),
"status_message": getattr(task, "status_message", None),
"download_path": getattr(task, "download_path", None),
"user_id": getattr(task, "user_id", None),
"username": getattr(task, "username", None),
}
if download_history_service is not None:
try:
download_history_service.record_terminal(
user_id=owner_user_id,
download_history_service.finalize_download(
task_id=task_id,
username=normalize_optional_text(getattr(task, "username", None)),
request_id=request_id,
source=normalize_source(getattr(task, "source", None)),
source_display_name=download_payload.get("source_display_name"),
title=str(download_payload.get("title") or getattr(task, "title", "Unknown title")),
author=normalize_optional_text(download_payload.get("author")),
format=normalize_optional_text(download_payload.get("format")),
size=normalize_optional_text(download_payload.get("size")),
preview=normalize_optional_text(download_payload.get("preview")),
content_type=normalize_optional_text(download_payload.get("content_type")),
origin=origin,
final_status=final_status,
status_message=normalize_optional_text(download_payload.get("status_message")),
download_path=normalize_optional_text(download_payload.get("download_path")),
status_message=normalize_optional_text(getattr(task, "status_message", None)),
download_path=normalize_optional_text(getattr(task, "download_path", None)),
)
except Exception as exc:
logger.warning("Failed to record terminal download history for task %s: %s", task_id, exc)
logger.warning("Failed to finalize download history for task %s: %s", task_id, exc)
if user_db is None or linked_request is None or request_id is None or status != QueueStatus.ERROR:
if user_db is None or status != QueueStatus.ERROR:
return
request_id = normalize_positive_int(getattr(task, "request_id", None))
if request_id is None:
return
raw_error_message = getattr(task, "status_message", None)
@@ -1280,18 +1264,7 @@ def _task_owned_by_actor(task: Any, *, actor_user_id: int | None, actor_username
return False
def _is_graduated_request_download(task_id: str, *, user_id: int) -> bool:
if user_db is None:
return False
fulfilled_rows = user_db.list_requests(user_id=user_id, status="fulfilled")
for row in fulfilled_rows:
source_id = extract_release_source_id(row.get("release_data"))
if source_id == task_id:
return True
return False
backend.book_queue.set_queue_hook(_record_download_queued)
backend.book_queue.set_terminal_status_hook(_record_download_terminal_snapshot)
@@ -1502,7 +1475,7 @@ def api_cancel_download(book_id: str) -> Union[Response, Tuple[Response, int]]:
):
return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403
if _is_graduated_request_download(book_id, user_id=db_user_id):
if getattr(task, "request_id", None) is not None:
return jsonify({"error": "Forbidden", "code": "requested_download_cancel_forbidden"}), 403
success = backend.cancel_download(book_id)
@@ -1537,16 +1510,8 @@ def api_retry_download(book_id: str) -> Union[Response, Tuple[Response, int]]:
):
return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403
raw_owner_user_id = getattr(task, "user_id", None)
try:
owner_user_id = int(raw_owner_user_id) if raw_owner_user_id is not None else None
except (TypeError, ValueError):
owner_user_id = None
is_request_linked = bool(getattr(task, "request_id", None))
if not is_request_linked and owner_user_id is not None:
is_request_linked = _is_graduated_request_download(book_id, user_id=owner_user_id)
if is_request_linked:
task_status = backend.book_queue.get_task_status(book_id)
if getattr(task, "request_id", None) is not None and task_status != QueueStatus.CANCELLED:
return jsonify({"error": "Forbidden", "code": "requested_download_retry_forbidden"}), 403
success, error = backend.retry_download(book_id)
@@ -2465,6 +2430,7 @@ def api_settings_get_all() -> Union[Response, Tuple[Response, int]]:
# This triggers the @register_settings decorators
import shelfmark.config.settings # noqa: F401
import shelfmark.config.security # noqa: F401
import shelfmark.config.users_settings # noqa: F401
import shelfmark.config.notifications_settings # noqa: F401
data = serialize_all_settings(include_values=True)
@@ -2495,6 +2461,7 @@ def api_settings_get_tab(tab_name: str) -> Union[Response, Tuple[Response, int]]
# Ensure settings are registered
import shelfmark.config.settings # noqa: F401
import shelfmark.config.security # noqa: F401
import shelfmark.config.users_settings # noqa: F401
import shelfmark.config.notifications_settings # noqa: F401
tab = get_settings_tab(tab_name)
@@ -2531,6 +2498,7 @@ def api_settings_update_tab(tab_name: str) -> Union[Response, Tuple[Response, in
# Ensure settings are registered
import shelfmark.config.settings # noqa: F401
import shelfmark.config.security # noqa: F401
import shelfmark.config.users_settings # noqa: F401
import shelfmark.config.notifications_settings # noqa: F401
tab = get_settings_tab(tab_name)
@@ -2578,6 +2546,7 @@ def api_settings_execute_action(tab_name: str, action_key: str) -> Union[Respons
# Ensure settings are registered
import shelfmark.config.settings # noqa: F401
import shelfmark.config.security # noqa: F401
import shelfmark.config.users_settings # noqa: F401
import shelfmark.config.notifications_settings # noqa: F401
# Get current form values if provided (for testing with unsaved values)

View File

@@ -299,16 +299,29 @@ function App() {
}, [currentStatus, dismissedDownloadTaskIds]);
// Use real-time buckets for active work and merge persisted terminal buckets
// so completed/errored entries survive restarts.
const activitySidebarStatus = useMemo<StatusData>(() => ({
// so completed/errored entries survive restarts. Filter out dismissed items
// so the sidebar counts stay consistent with the activity panel.
const activitySidebarStatus = useMemo<StatusData>(() => {
const filterDismissed = (
bucket: Record<string, Book> | undefined
): Record<string, Book> | undefined => {
if (!bucket || dismissedDownloadTaskIds.size === 0) return bucket;
const filtered = Object.fromEntries(
Object.entries(bucket).filter(([taskId]) => !dismissedDownloadTaskIds.has(taskId))
) as Record<string, Book>;
return Object.keys(filtered).length > 0 ? filtered : undefined;
};
return {
queued: currentStatus.queued,
resolving: currentStatus.resolving,
locating: currentStatus.locating,
downloading: currentStatus.downloading,
complete: mergeTerminalBucket(activityStatus.complete, currentStatus.complete),
error: mergeTerminalBucket(activityStatus.error, currentStatus.error),
cancelled: mergeTerminalBucket(activityStatus.cancelled, currentStatus.cancelled),
}), [activityStatus, currentStatus]);
complete: filterDismissed(mergeTerminalBucket(activityStatus.complete, currentStatus.complete)),
error: filterDismissed(mergeTerminalBucket(activityStatus.error, currentStatus.error)),
cancelled: filterDismissed(mergeTerminalBucket(activityStatus.cancelled, currentStatus.cancelled)),
};
}, [activityStatus, currentStatus, dismissedDownloadTaskIds]);
const showRequestsTab = useMemo(() => {
if (requestRoleIsAdmin) {

View File

@@ -175,6 +175,19 @@ const buildActions = (item: ActivityItem, isAdmin: boolean): ActivityCardAction[
},
];
}
if (item.visualStatus === 'cancelled') {
return [
{
kind: 'download-retry',
bookId: item.downloadBookId,
},
{
kind: 'download-dismiss',
bookId: item.downloadBookId,
linkedRequestId: item.requestId,
},
];
}
return [
{
kind: 'download-dismiss',

View File

@@ -407,6 +407,7 @@ export const useActivity = ({
void clearActivityHistory()
.then(() => {
void refreshActivitySnapshot();
void refreshActivityHistory();
})
.catch((error) => {
console.error('Clear history failed:', error);

View File

@@ -430,7 +430,7 @@ export interface ActivityDismissPayload {
}
export interface ActivityHistoryItem {
id: number;
id: string;
user_id: number;
item_type: 'download' | 'request';
item_key: string;

View File

@@ -207,7 +207,7 @@ export interface RequestRecord {
id: number;
user_id: number;
status: 'pending' | 'fulfilled' | 'rejected' | 'cancelled';
delivery_state?: 'none' | 'unknown' | 'queued' | 'resolving' | 'locating' | 'downloading' | 'complete' | 'error' | 'cancelled';
delivery_state?: 'none' | 'queued' | 'resolving' | 'locating' | 'downloading' | 'complete' | 'error' | 'cancelled';
delivery_updated_at?: string | null;
last_failure_reason?: string | null;
source_hint: string | null;

View File

@@ -53,8 +53,10 @@ def _record_terminal_download(
final_status: str = "complete",
request_id: int | None = None,
status_message: str | None = None,
download_path: str | None = None,
) -> None:
main_module.download_history_service.record_terminal(
svc = main_module.download_history_service
svc.record_download(
task_id=task_id,
user_id=user_id,
username=username,
@@ -68,9 +70,12 @@ def _record_terminal_download(
preview=None,
content_type="ebook",
origin=origin,
)
svc.finalize_download(
task_id=task_id,
final_status=final_status,
status_message=status_message,
download_path=None,
download_path=download_path,
)
@@ -162,6 +167,36 @@ class TestActivityRoutes:
assert history_after_clear.status_code == 200
assert history_after_clear.json == []
def test_dismiss_preserves_terminal_snapshot_without_live_queue_merge(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
task_id = "dismiss-preserve-task"
_record_terminal_download(
main_module,
task_id=task_id,
user_id=user["id"],
username=user["username"],
title="Recorded Title",
author="Recorded Author",
status_message="Complete",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
dismiss_response = client.post(
"/api/activity/dismiss",
json={"item_type": "download", "item_key": f"download:{task_id}"},
)
history_response = client.get("/api/activity/history?limit=10&offset=0")
assert dismiss_response.status_code == 200
assert history_response.status_code == 200
assert history_response.json[0]["item_key"] == f"download:{task_id}"
snapshot_download = history_response.json[0]["snapshot"]["download"]
assert snapshot_download["title"] == "Recorded Title"
assert snapshot_download["author"] == "Recorded Author"
assert snapshot_download["status_message"] is None
def test_clear_history_deletes_dismissed_requests_from_snapshot(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
@@ -248,27 +283,6 @@ class TestActivityRoutes:
user_id=user["id"],
username=user["username"],
title="History Local Download",
)
row = main_module.download_history_service.get_by_task_id(task_id)
assert row is not None
assert main_module.download_history_service is not None
main_module.download_history_service.record_terminal(
task_id=task_id,
user_id=user["id"],
username=user["username"],
request_id=row.get("request_id"),
source=row.get("source") or "direct_download",
source_display_name=row.get("source_display_name"),
title=row.get("title") or "History Local Download",
author=row.get("author"),
format=row.get("format"),
size=row.get("size"),
preview=row.get("preview"),
content_type=row.get("content_type"),
origin=row.get("origin") or "direct",
final_status=row.get("final_status") or "complete",
status_message=row.get("status_message"),
download_path=str(file_path),
)
@@ -295,7 +309,7 @@ class TestActivityRoutes:
"provider_id": "legacy-fulfilled-1",
},
status="fulfilled",
delivery_state="unknown",
delivery_state="none",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
@@ -355,6 +369,51 @@ class TestActivityRoutes:
to=f"user_{user['id']}",
)
def test_dismiss_many_preserves_terminal_snapshots_without_live_queue_merge(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
first_task_id = "dismiss-many-preserve-1"
second_task_id = "dismiss-many-preserve-2"
_record_terminal_download(
main_module,
task_id=first_task_id,
user_id=user["id"],
username=user["username"],
title="First Title",
author="First Author",
)
_record_terminal_download(
main_module,
task_id=second_task_id,
user_id=user["id"],
username=user["username"],
title="Second Title",
author="Second Author",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
dismiss_many_response = client.post(
"/api/activity/dismiss-many",
json={
"items": [
{"item_type": "download", "item_key": f"download:{first_task_id}"},
{"item_type": "download", "item_key": f"download:{second_task_id}"},
]
},
)
history_response = client.get("/api/activity/history?limit=20&offset=0")
assert dismiss_many_response.status_code == 200
assert dismiss_many_response.json["count"] == 2
assert history_response.status_code == 200
rows_by_key = {row["item_key"]: row for row in history_response.json}
assert rows_by_key[f"download:{first_task_id}"]["snapshot"]["download"]["title"] == "First Title"
assert rows_by_key[f"download:{first_task_id}"]["snapshot"]["download"]["author"] == "First Author"
assert rows_by_key[f"download:{second_task_id}"]["snapshot"]["download"]["title"] == "Second Title"
assert rows_by_key[f"download:{second_task_id}"]["snapshot"]["download"]["author"] == "Second Author"
def test_no_auth_dismiss_many_and_history_use_shared_identity(self, main_module):
task_id = f"no-auth-{uuid.uuid4().hex[:10]}"
item_key = f"download:{task_id}"
@@ -513,6 +572,77 @@ class TestActivityRoutes:
assert "cross-user-expired-task" in response.json["status"]["complete"]
assert response.json["status"]["complete"]["cross-user-expired-task"]["id"] == "cross-user-expired-task"
def test_snapshot_shows_stale_active_download_as_interrupted_error(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
# Record a download at queue time (active status) but don't put it in the queue
main_module.download_history_service.record_download(
task_id="stale-active-task",
user_id=user["id"],
username=user["username"],
request_id=None,
source="direct_download",
source_display_name="Direct Download",
title="Stale Active Task",
author="Stale Author",
format="epub",
size="1 MB",
preview=None,
content_type="ebook",
origin="direct",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.backend, "queue_status", return_value=_sample_status_payload()):
response = client.get("/api/activity/snapshot")
assert response.status_code == 200
assert "stale-active-task" in response.json["status"]["error"]
assert response.json["status"]["error"]["stale-active-task"]["status_message"] == "Interrupted"
def test_snapshot_active_download_with_queue_entry_shows_in_correct_bucket(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
# Record a download at queue time
main_module.download_history_service.record_download(
task_id="active-downloading-task",
user_id=user["id"],
username=user["username"],
request_id=None,
source="direct_download",
source_display_name="Direct Download",
title="Active Downloading Task",
author="Active Author",
format="epub",
size="2 MB",
preview=None,
content_type="ebook",
origin="direct",
)
# Simulate it being active in the queue
active_status = _sample_status_payload()
active_status["downloading"] = {
"active-downloading-task": {
"id": "active-downloading-task",
"title": "Active Downloading Task",
"author": "Active Author",
"source": "direct_download",
"progress": 0.5,
"status_message": "Downloading 50%",
}
}
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.backend, "queue_status", return_value=active_status):
response = client.get("/api/activity/snapshot")
assert response.status_code == 200
assert "active-downloading-task" in response.json["status"]["downloading"]
assert response.json["status"]["downloading"]["active-downloading-task"]["progress"] == 0.5
def test_snapshot_clears_stale_download_dismissal_when_same_task_is_active(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)

View File

@@ -93,6 +93,7 @@ class TestTerminalSnapshotCapture:
title="Requested Snapshot",
user_id=user["id"],
username=user["username"],
request_id=request_row["id"],
)
assert main_module.backend.book_queue.add(task) is True
@@ -193,6 +194,103 @@ class TestTerminalSnapshotCapture:
finally:
main_module.backend.book_queue.cancel_download(task_id)
def test_queue_hook_records_active_row_at_queue_time(self, main_module):
user = _create_user(main_module, prefix="snap-queue")
task_id = f"queue-{uuid.uuid4().hex[:8]}"
task = DownloadTask(
task_id=task_id,
source="direct_download",
title="Queue Time Snapshot",
author="Queue Author",
user_id=user["id"],
username=user["username"],
)
assert main_module.backend.book_queue.add(task) is True
try:
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["final_status"] == "active"
assert row["user_id"] == user["id"]
assert row["task_id"] == task_id
assert row["origin"] == "direct"
assert row["title"] == "Queue Time Snapshot"
assert row["author"] == "Queue Author"
assert row["queued_at"] is not None
finally:
main_module.backend.book_queue.cancel_download(task_id)
def test_queue_hook_records_requested_origin_for_request_linked_task(self, main_module):
user = _create_user(main_module, prefix="snap-queue-req")
task_id = f"queue-req-{uuid.uuid4().hex[:8]}"
request_row = main_module.user_db.create_request(
user_id=user["id"],
content_type="ebook",
request_level="release",
policy_mode="request_release",
book_data={
"title": "Requested Queue",
"author": "Request Author",
"provider": "openlibrary",
"provider_id": "queue-req-1",
},
release_data={
"source": "prowlarr",
"source_id": task_id,
"title": "Requested Queue.epub",
},
status="fulfilled",
delivery_state="queued",
)
task = DownloadTask(
task_id=task_id,
source="prowlarr",
title="Requested Queue",
user_id=user["id"],
username=user["username"],
request_id=request_row["id"],
)
assert main_module.backend.book_queue.add(task) is True
try:
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["final_status"] == "active"
assert row["origin"] == "requested"
assert row["request_id"] == request_row["id"]
finally:
main_module.backend.book_queue.cancel_download(task_id)
def test_finalize_updates_active_row_to_terminal(self, main_module):
user = _create_user(main_module, prefix="snap-finalize")
task_id = f"finalize-{uuid.uuid4().hex[:8]}"
task = DownloadTask(
task_id=task_id,
source="direct_download",
title="Finalize Snapshot",
user_id=user["id"],
username=user["username"],
)
assert main_module.backend.book_queue.add(task) is True
try:
# Verify active row exists
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["final_status"] == "active"
# Transition to complete
main_module.backend.book_queue.update_status(task_id, QueueStatus.COMPLETE)
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["final_status"] == "complete"
# Metadata from queue-time should be preserved
assert row["title"] == "Finalize Snapshot"
assert row["user_id"] == user["id"]
finally:
main_module.backend.book_queue.cancel_download(task_id)
def test_cancelled_transition_does_not_trigger_notification(self, main_module):
user = _create_user(main_module, prefix="snap-notify-cancel")
task_id = f"notify-cancel-{uuid.uuid4().hex[:8]}"

View File

@@ -535,7 +535,7 @@ class TestCancelDownloadEndpointGuardrails:
db_user_id=user["id"],
is_admin=False,
)
main_module.user_db.create_request(
request_row = main_module.user_db.create_request(
user_id=user["id"],
content_type="ebook",
request_level="release",
@@ -560,6 +560,7 @@ class TestCancelDownloadEndpointGuardrails:
title="Requested Book",
user_id=user["id"],
username=user["username"],
request_id=request_row["id"],
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
@@ -721,7 +722,7 @@ class TestRetryDownloadEndpointGuardrails:
db_user_id=user["id"],
is_admin=False,
)
main_module.user_db.create_request(
request_row = main_module.user_db.create_request(
user_id=user["id"],
content_type="ebook",
request_level="release",
@@ -746,6 +747,7 @@ class TestRetryDownloadEndpointGuardrails:
title="Requested Book",
user_id=user["id"],
username=user["username"],
request_id=request_row["id"],
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):

View File

@@ -922,7 +922,7 @@ class TestRequestRoutes:
fulfil_resp = client.post(f"/api/admin/requests/{request_id}/fulfil", json={})
assert fulfil_resp.status_code == 400
assert "release_data is required to fulfil book-level requests" in fulfil_resp.json["error"]
assert "release_data is required to fulfil requests" in fulfil_resp.json["error"]
def test_admin_fulfil_book_level_request_manual_approval(self, main_module, client):
user = _create_user(main_module, prefix="reader")

View File

@@ -307,7 +307,7 @@ def test_fulfil_request_requires_release_data_for_book_level(user_db):
book_data=_book_data(),
)
with pytest.raises(RequestServiceError, match="release_data is required to fulfil book-level requests"):
with pytest.raises(RequestServiceError, match="release_data is required to fulfil requests"):
fulfil_request(
user_db,
request_id=created["id"],

View File

@@ -818,9 +818,6 @@ class TestDownloadRequests:
book_data=self._book_data(),
)
with pytest.raises(ValueError, match="request_level=release requires non-null release_data"):
user_db.update_request(created["id"], request_level="release")
updated = user_db.update_request(
created["id"],
request_level="release",
@@ -1015,36 +1012,6 @@ class TestDownloadRequests:
bob_rows = user_db.list_dismissed_requests(user_id=bob["id"])
assert [row["id"] for row in bob_rows] == [second["id"]]
def test_clear_request_dismissals_scopes_by_user(self, user_db):
alice = user_db.create_user(username="alice")
bob = user_db.create_user(username="bob")
alice_request = user_db.create_request(
user_id=alice["id"],
content_type="ebook",
request_level="book",
policy_mode="request_book",
book_data=self._book_data(),
)
bob_request = user_db.create_request(
user_id=bob["id"],
content_type="ebook",
request_level="book",
policy_mode="request_book",
book_data=self._book_data(),
)
user_db.update_request(alice_request["id"], dismissed_at="2026-01-01T10:00:00+00:00")
user_db.update_request(bob_request["id"], dismissed_at="2026-01-01T11:00:00+00:00")
cleared_alice = user_db.clear_request_dismissals(user_id=alice["id"])
assert cleared_alice == 1
assert user_db.get_request(alice_request["id"])["dismissed_at"] is None
assert user_db.get_request(bob_request["id"])["dismissed_at"] is not None
cleared_all = user_db.clear_request_dismissals(user_id=None)
assert cleared_all == 1
assert user_db.get_request(bob_request["id"])["dismissed_at"] is None
def test_delete_dismissed_requests_scopes_by_user_and_only_deletes_terminal(self, user_db):
alice = user_db.create_user(username="alice")
bob = user_db.create_user(username="bob")
@@ -1092,8 +1059,5 @@ class TestDownloadRequests:
with pytest.raises(ValueError, match="user_id must be a positive integer"):
user_db.list_dismissed_requests(user_id=0)
with pytest.raises(ValueError, match="user_id must be a positive integer"):
user_db.clear_request_dismissals(user_id=-1)
with pytest.raises(ValueError, match="user_id must be a positive integer"):
user_db.delete_dismissed_requests(user_id=0)