Download history refactor (#700)

- Much simpler handling of downloads in the activity sidebar, and
improved storage, persistence and UI behavior.
- Replace `ActivityService` with direct storage on
`DownloadHistoryService` and `download_requests`, and remove the
activity_log/activity_dismissals tables
- Simplify no-auth mode by removing the fake user row pattern, handled
internally
- Add local download fallback so history entries can still serve files
after tasks leave the queue
- Downloads, requests and history are now entirely persistent between
updates / restarts, and correctly tied to each user.
This commit is contained in:
Alex
2026-03-04 19:10:06 +00:00
committed by GitHub
parent bd65bccf52
commit fbe25725d3
26 changed files with 1682 additions and 1891 deletions

View File

@@ -2,17 +2,37 @@
from __future__ import annotations
from typing import Any, Callable
from datetime import datetime, timezone
from typing import Any, Callable, NamedTuple
from flask import Flask, jsonify, request, session
from shelfmark.core.activity_service import ActivityService
from shelfmark.core.download_history_service import DownloadHistoryService
from shelfmark.core.logger import setup_logger
from shelfmark.core.request_helpers import extract_release_source_id
from shelfmark.core.user_db import NO_AUTH_ACTIVITY_USERNAME, UserDB
from shelfmark.core.request_helpers import (
emit_ws_event,
extract_release_source_id,
normalize_positive_int,
now_utc_iso,
)
from shelfmark.core.user_db import UserDB
logger = setup_logger(__name__)
# Offset added to request row IDs so they don't collide with download_history
# row IDs when both types are merged into a single sorted list.
_REQUEST_ID_OFFSET = 1_000_000_000
def _parse_timestamp(value: Any) -> float:
if not isinstance(value, str) or not value.strip():
return 0.0
normalized = value.strip().replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return 0.0
def _require_authenticated(resolve_auth_mode: Callable[[], str]):
auth_mode = resolve_auth_mode()
@@ -91,95 +111,45 @@ def _resolve_db_user_id(
return parsed_db_user_id, None
def _ensure_no_auth_activity_user_id(user_db: UserDB) -> int | None:
    """Resolve a stable users.db identity for no-auth activity state."""
    try:
        record = user_db.get_user(username=NO_AUTH_ACTIVITY_USERNAME)
        if record is None:
            # First caller creates the row; a concurrent request may win the
            # race, in which case create_user raises ValueError and we simply
            # re-read the row it inserted.
            try:
                user_db.create_user(
                    username=NO_AUTH_ACTIVITY_USERNAME,
                    display_name="No-auth Activity",
                    role="admin",
                )
            except ValueError:
                pass
            record = user_db.get_user(username=NO_AUTH_ACTIVITY_USERNAME)
        if record is None:
            return None
        resolved_id = int(record.get("id"))
    except Exception as exc:
        logger.warning("Failed to resolve no-auth activity identity: %s", exc)
        return None
    return resolved_id if resolved_id > 0 else None
class _ActorContext(NamedTuple):
    """Resolved identity and scoping for the user performing an activity mutation."""

    # users.db row id of the actor; None in no-auth mode.
    db_user_id: int | None
    # True when the server runs with auth disabled (actor treated as admin).
    is_no_auth: bool
    # Whether the actor holds the admin role.
    is_admin: bool
    # user_id filter for storage queries: None means unscoped (admin view),
    # otherwise restricted to the actor's own rows.
    owner_scope: int | None
def _resolve_activity_actor_user_id(
def _resolve_activity_actor(
*,
user_db: UserDB,
resolve_auth_mode: Callable[[], str],
) -> tuple[int | None, Any | None]:
"""Resolve acting user identity for activity mutations."""
) -> tuple[_ActorContext | None, Any | None]:
"""Resolve acting user identity for activity mutations.
Returns (actor, error_response). On success actor is non-None.
"""
if resolve_auth_mode() == "none":
no_auth_user_id = _ensure_no_auth_activity_user_id(user_db)
if no_auth_user_id is not None:
return no_auth_user_id, None
return None, (
jsonify(
{
"error": "User identity unavailable for activity workflow",
"code": "user_identity_unavailable",
}
),
403,
)
return _ActorContext(db_user_id=None, is_no_auth=True, is_admin=True, owner_scope=None), None
db_user_id, db_gate = _resolve_db_user_id(user_db=user_db)
if db_user_id is not None:
return db_user_id, None
if db_user_id is None:
return None, db_gate
return None, db_gate
is_admin = bool(session.get("is_admin"))
return _ActorContext(
db_user_id=db_user_id,
is_no_auth=False,
is_admin=is_admin,
owner_scope=None if is_admin else db_user_id,
), None
def _emit_activity_event(ws_manager: Any | None, *, room: str, payload: dict[str, Any]) -> None:
if ws_manager is None:
return
try:
socketio = getattr(ws_manager, "socketio", None)
is_enabled = getattr(ws_manager, "is_enabled", None)
if socketio is None or not callable(is_enabled) or not is_enabled():
return
socketio.emit("activity_update", payload, to=room)
except Exception as exc:
logger.warning("Failed to emit activity_update event: %s", exc)
def _list_admin_user_ids(user_db: UserDB) -> list[int]:
    """Return sorted ids of real admin users (excludes the no-auth pseudo user)."""
    try:
        all_users = user_db.list_users()
    except Exception as exc:
        logger.warning("Failed to list users while resolving admin dismissal scope: %s", exc)
        return []
    admin_ids: set[int] = set()
    for record in all_users:
        if not isinstance(record, dict):
            continue
        # The no-auth pseudo account is an implementation detail, not an admin.
        if str(record.get("username") or "").strip() == NO_AUTH_ACTIVITY_USERNAME:
            continue
        if str(record.get("role") or "").strip().lower() != "admin":
            continue
        try:
            candidate = int(record.get("id"))
        except (TypeError, ValueError):
            continue
        if candidate > 0:
            admin_ids.add(candidate)
    return sorted(admin_ids)
def _activity_ws_room(*, is_no_auth: bool, actor_db_user_id: int | None) -> str:
"""Resolve the WebSocket room for activity events."""
if is_no_auth:
return "admins"
if actor_db_user_id is not None:
return f"user_{actor_db_user_id}"
return "admins"
def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int | None) -> list[dict[str, Any]]:
@@ -199,29 +169,18 @@ def _list_visible_requests(user_db: UserDB, *, is_admin: bool, db_user_id: int |
return user_db.list_requests(user_id=db_user_id)
def _parse_download_item_key(item_key: str) -> str | None:
def _parse_download_item_key(item_key: Any) -> str | None:
if not isinstance(item_key, str) or not item_key.startswith("download:"):
return None
task_id = item_key.split(":", 1)[1].strip()
return task_id or None
def _parse_request_item_key(item_key: str) -> int | None:
def _parse_request_item_key(item_key: Any) -> int | None:
if not isinstance(item_key, str) or not item_key.startswith("request:"):
return None
raw_id = item_key.split(":", 1)[1].strip()
try:
parsed = int(raw_id)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _task_id_from_download_item_key(item_key: str) -> str | None:
task_id = _parse_download_item_key(item_key)
if task_id is None:
return None
return task_id
return normalize_positive_int(raw_id)
def _merge_terminal_snapshot_backfill(
@@ -237,10 +196,7 @@ def _merge_terminal_snapshot_backfill(
existing_task_ids.update(str(task_id) for task_id in bucket.keys())
for row in terminal_rows:
item_key = row.get("item_key")
if not isinstance(item_key, str):
continue
task_id = _task_id_from_download_item_key(item_key)
task_id = str(row.get("task_id") or "").strip()
if not task_id or task_id in existing_task_ids:
continue
@@ -248,39 +204,24 @@ def _merge_terminal_snapshot_backfill(
if final_status not in {"complete", "error", "cancelled"}:
continue
snapshot = row.get("snapshot")
if not isinstance(snapshot, dict):
continue
raw_download = snapshot.get("download")
if not isinstance(raw_download, dict):
continue
download_payload = dict(raw_download)
if not isinstance(download_payload.get("id"), str):
download_payload["id"] = task_id
download_payload = DownloadHistoryService.to_download_payload(row)
if final_status not in status or not isinstance(status.get(final_status), dict):
status[final_status] = {}
status[final_status][task_id] = download_payload
existing_task_ids.add(task_id)
def _collect_active_download_item_keys(status: dict[str, dict[str, Any]]) -> set[str]:
active_keys: set[str] = set()
def _collect_active_download_task_ids(status: dict[str, dict[str, Any]]) -> set[str]:
active_task_ids: set[str] = set()
for bucket_key in ("queued", "resolving", "locating", "downloading"):
bucket = status.get(bucket_key)
if not isinstance(bucket, dict):
continue
for task_id in bucket.keys():
normalized_task_id = str(task_id).strip()
if not normalized_task_id:
continue
active_keys.add(f"download:{normalized_task_id}")
return active_keys
def _extract_request_source_id(row: dict[str, Any]) -> str | None:
return extract_release_source_id(row.get("release_data"))
if normalized_task_id:
active_task_ids.add(normalized_task_id)
return active_task_ids
def _request_terminal_status(row: dict[str, Any]) -> str | None:
@@ -319,7 +260,7 @@ def _minimal_request_snapshot(request_row: dict[str, Any], request_id: int) -> d
"note": request_row.get("note"),
"admin_note": request_row.get("admin_note"),
"created_at": request_row.get("created_at"),
"updated_at": request_row.get("updated_at"),
"updated_at": request_row.get("reviewed_at") or request_row.get("created_at"),
}
username = request_row.get("username")
if isinstance(username, str):
@@ -327,55 +268,47 @@ def _minimal_request_snapshot(request_row: dict[str, Any], request_id: int) -> d
return {"kind": "request", "request": minimal_request}
def _get_existing_activity_log_id_for_item(
*,
activity_service: ActivityService,
user_db: UserDB,
item_type: str,
item_key: str,
) -> int | None:
if item_type not in {"request", "download"}:
return None
if not isinstance(item_key, str) or not item_key.strip():
return None
existing_log_id = activity_service.get_latest_activity_log_id(
item_type=item_type,
item_key=item_key,
)
if existing_log_id is not None or item_type != "request":
return existing_log_id
request_id = _parse_request_item_key(item_key)
def _request_history_entry(request_row: dict[str, Any]) -> dict[str, Any] | None:
    """Shape a dismissed request row like a download-history row.

    This lets request and download history be merged into one sorted list.
    Returns None when the row has no valid positive id.

    Fix: removed stray leftover lines that re-fetched the row via an
    undefined ``user_db`` name (would raise NameError at runtime).
    """
    request_id = normalize_positive_int(request_row.get("id"))
    if request_id is None:
        return None
    final_status = _request_terminal_status(request_row)
    return {
        # Offset keeps synthetic request ids from colliding with download
        # history row ids in the merged list.
        "id": _REQUEST_ID_OFFSET + request_id,
        "user_id": request_row.get("user_id"),
        "item_type": "request",
        "item_key": f"request:{request_id}",
        "dismissed_at": request_row.get("dismissed_at"),
        "snapshot": _minimal_request_snapshot(request_row, request_id),
        "origin": "request",
        "final_status": final_status,
        # Prefer the review time; fall back to creation time.
        "terminal_at": request_row.get("reviewed_at") or request_row.get("created_at"),
        "request_id": request_id,
        "source_id": extract_release_source_id(request_row.get("release_data")),
    }
final_status = _request_terminal_status(row)
if final_status is None:
return None
source_id = _extract_request_source_id(row)
payload = activity_service.record_terminal_snapshot(
user_id=row.get("user_id"),
item_type="request",
item_key=item_key,
origin="request",
final_status=final_status,
snapshot=_minimal_request_snapshot(row, request_id),
request_id=request_id,
source_id=source_id,
)
return int(payload["id"])
def _dedupe_dismissed_entries(entries: list[dict[str, str]]) -> list[dict[str, str]]:
seen: set[tuple[str, str]] = set()
result: list[dict[str, str]] = []
for entry in entries:
item_type = str(entry.get("item_type") or "").strip().lower()
item_key = str(entry.get("item_key") or "").strip()
if item_type not in {"download", "request"} or not item_key:
continue
marker = (item_type, item_key)
if marker in seen:
continue
seen.add(marker)
result.append({"item_type": item_type, "item_key": item_key})
return result
def register_activity_routes(
app: Flask,
user_db: UserDB,
*,
activity_service: ActivityService,
download_history_service: DownloadHistoryService,
resolve_auth_mode: Callable[[], str],
resolve_status_scope: Callable[[], tuple[bool, int | None, bool]],
queue_status: Callable[..., dict[str, dict[str, Any]]],
@@ -403,53 +336,69 @@ def register_activity_routes(
403,
)
if resolve_auth_mode() == "none":
viewer_db_user_id = _ensure_no_auth_activity_user_id(user_db)
else:
viewer_db_user_id, _ = _resolve_db_user_id(
require_in_auth_mode=False,
user_db=user_db,
)
scoped_user_id = None if is_admin else db_user_id
status = queue_status(user_id=scoped_user_id)
owner_user_scope = None if is_admin else db_user_id
status = queue_status(user_id=owner_user_scope)
updated_requests = sync_request_delivery_states(
user_db,
queue_status=status,
user_id=scoped_user_id,
user_id=owner_user_scope,
)
emit_request_updates(updated_requests)
request_rows = _list_visible_requests(user_db, is_admin=is_admin, db_user_id=db_user_id)
if viewer_db_user_id is not None:
owner_user_scope = None if is_admin else db_user_id
if not is_admin and owner_user_scope is None:
owner_user_scope = viewer_db_user_id
try:
terminal_rows = activity_service.get_undismissed_terminal_downloads(
viewer_db_user_id,
owner_user_id=owner_user_scope,
limit=200,
)
_merge_terminal_snapshot_backfill(status=status, terminal_rows=terminal_rows)
except Exception as exc:
logger.warning("Failed to merge terminal snapshot backfill rows: %s", exc)
if viewer_db_user_id is not None:
active_download_keys = _collect_active_download_item_keys(status)
if active_download_keys:
try:
activity_service.clear_dismissals_for_item_keys(
user_id=viewer_db_user_id,
item_type="download",
item_keys=active_download_keys,
)
except Exception as exc:
logger.warning("Failed to clear stale download dismissals for active tasks: %s", exc)
try:
terminal_rows = download_history_service.get_undismissed_terminal(
user_id=owner_user_scope,
limit=200,
)
_merge_terminal_snapshot_backfill(status=status, terminal_rows=terminal_rows)
except Exception as exc:
logger.warning("Failed to merge terminal download history rows: %s", exc)
dismissed: list[dict[str, str]] = []
# Admins can view unscoped queue status, but dismissals remain per-viewer.
if viewer_db_user_id is not None:
dismissed = activity_service.get_dismissal_set(viewer_db_user_id)
dismissed_task_ids: list[str] = []
try:
dismissed_task_ids = download_history_service.get_dismissed_keys(
user_id=owner_user_scope,
)
except Exception as exc:
logger.warning("Failed to load dismissed download keys: %s", exc)
# Only clear stale dismissals when active downloads overlap dismissed keys.
active_task_ids = _collect_active_download_task_ids(status)
stale_dismissed = active_task_ids & set(dismissed_task_ids) if active_task_ids else set()
if stale_dismissed:
try:
download_history_service.clear_dismissals_for_active(
task_ids=stale_dismissed,
user_id=owner_user_scope,
)
dismissed_task_ids = [tid for tid in dismissed_task_ids if tid not in stale_dismissed]
except Exception as exc:
logger.warning("Failed to clear stale download dismissals for active tasks: %s", exc)
dismissed.extend(
{"item_type": "download", "item_key": f"download:{task_id}"}
for task_id in dismissed_task_ids
)
# Keep request dismissal state on the request rows directly.
try:
dismissed_request_rows = user_db.list_dismissed_requests(user_id=owner_user_scope)
for request_row in dismissed_request_rows:
request_id = normalize_positive_int(request_row.get("id"))
if request_id is None:
continue
dismissed.append({"item_type": "request", "item_key": f"request:{request_id}"})
except Exception as exc:
logger.warning("Failed to load dismissed request keys: %s", exc)
if not is_admin and db_user_id is None:
# In auth mode, if we can't identify a non-admin viewer, don't show dismissals.
dismissed = []
else:
dismissed = _dedupe_dismissed_entries(dismissed)
return jsonify(
{
@@ -465,68 +414,75 @@ def register_activity_routes(
if auth_gate is not None:
return auth_gate
db_user_id, db_gate = _resolve_activity_actor_user_id(
actor, actor_error = _resolve_activity_actor(
user_db=user_db,
resolve_auth_mode=resolve_auth_mode,
)
if db_gate is not None or db_user_id is None:
return db_gate
if actor_error is not None:
return actor_error
data = request.get_json(silent=True)
if not isinstance(data, dict):
return jsonify({"error": "Invalid payload"}), 400
activity_log_id = data.get("activity_log_id")
if activity_log_id is None:
try:
activity_log_id = _get_existing_activity_log_id_for_item(
activity_service=activity_service,
user_db=user_db,
item_type=data.get("item_type"),
item_key=data.get("item_key"),
)
except Exception as exc:
logger.warning("Failed to resolve activity snapshot id for dismiss payload: %s", exc)
activity_log_id = None
item_type = str(data.get("item_type") or "").strip().lower()
target_user_ids = [db_user_id]
if bool(session.get("is_admin")) and item_type == "request":
admin_ids = _list_admin_user_ids(user_db)
if db_user_id not in admin_ids:
admin_ids.append(db_user_id)
target_user_ids = sorted(set(admin_ids))
item_key = data.get("item_key")
dismissal = None
try:
for target_user_id in target_user_ids:
target_dismissal = activity_service.dismiss_item(
user_id=target_user_id,
item_type=data.get("item_type"),
item_key=data.get("item_key"),
activity_log_id=activity_log_id,
)
if target_user_id == db_user_id:
dismissal = target_dismissal
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
dismissal_item: dict[str, str] | None = None
if dismissal is None:
return jsonify({"error": "Failed to persist dismissal"}), 500
if item_type == "download":
task_id = _parse_download_item_key(item_key)
if task_id is None:
return jsonify({"error": "item_key must be in the format download:<task_id>"}), 400
for target_user_id in target_user_ids:
_emit_activity_event(
ws_manager,
room=f"user_{target_user_id}",
payload={
"kind": "dismiss",
"user_id": target_user_id,
"item_type": dismissal["item_type"],
"item_key": dismissal["item_key"],
},
existing = download_history_service.get_by_task_id(task_id)
if existing is None:
return jsonify({"error": "Activity item not found"}), 404
owner_user_id = normalize_positive_int(existing.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
dismissed_count = download_history_service.dismiss(
task_id=task_id,
user_id=actor.owner_scope,
)
if dismissed_count < 1:
return jsonify({"error": "Activity item not found"}), 404
return jsonify({"status": "dismissed", "item": dismissal})
dismissal_item = {"item_type": "download", "item_key": f"download:{task_id}"}
elif item_type == "request":
request_id = _parse_request_item_key(item_key)
if request_id is None:
return jsonify({"error": "item_key must be in the format request:<id>"}), 400
request_row = user_db.get_request(request_id)
if request_row is None:
return jsonify({"error": "Request not found"}), 404
owner_user_id = normalize_positive_int(request_row.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
user_db.update_request(request_id, dismissed_at=now_utc_iso())
dismissal_item = {"item_type": "request", "item_key": f"request:{request_id}"}
else:
return jsonify({"error": "item_type must be one of: download, request"}), 400
room = _activity_ws_room(is_no_auth=actor.is_no_auth, actor_db_user_id=actor.db_user_id)
emit_ws_event(
ws_manager,
event_name="activity_update",
room=room,
payload={
"kind": "dismiss",
"item_type": dismissal_item["item_type"],
"item_key": dismissal_item["item_key"],
},
)
return jsonify({"status": "dismissed", "item": dismissal_item})
@app.route("/api/activity/dismiss-many", methods=["POST"])
def api_activity_dismiss_many():
@@ -534,12 +490,12 @@ def register_activity_routes(
if auth_gate is not None:
return auth_gate
db_user_id, db_gate = _resolve_activity_actor_user_id(
actor, actor_error = _resolve_activity_actor(
user_db=user_db,
resolve_auth_mode=resolve_auth_mode,
)
if db_gate is not None or db_user_id is None:
return db_gate
if actor_error is not None:
return actor_error
data = request.get_json(silent=True)
if not isinstance(data, dict):
@@ -548,66 +504,66 @@ def register_activity_routes(
if not isinstance(items, list):
return jsonify({"error": "items must be an array"}), 400
normalized_items: list[dict[str, Any]] = []
download_task_ids: list[str] = []
request_ids: list[int] = []
for item in items:
if not isinstance(item, dict):
return jsonify({"error": "items must contain objects"}), 400
activity_log_id = item.get("activity_log_id")
if activity_log_id is None:
try:
activity_log_id = _get_existing_activity_log_id_for_item(
activity_service=activity_service,
user_db=user_db,
item_type=item.get("item_type"),
item_key=item.get("item_key"),
)
except Exception as exc:
logger.warning("Failed to resolve activity snapshot id for dismiss-many item: %s", exc)
activity_log_id = None
item_type = str(item.get("item_type") or "").strip().lower()
item_key = item.get("item_key")
normalized_payload = {
"item_type": item.get("item_type"),
"item_key": item.get("item_key"),
}
if activity_log_id is not None:
normalized_payload["activity_log_id"] = activity_log_id
normalized_items.append(normalized_payload)
if item_type == "download":
task_id = _parse_download_item_key(item_key)
if task_id is None:
return jsonify({"error": "download item_key must be in the format download:<task_id>"}), 400
existing = download_history_service.get_by_task_id(task_id)
if existing is None:
continue
owner_user_id = normalize_positive_int(existing.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
download_task_ids.append(task_id)
continue
request_items = [
item
for item in normalized_items
if str(item.get("item_type") or "").strip().lower() == "request"
]
actor_is_admin = bool(session.get("is_admin"))
target_user_ids = [db_user_id]
if actor_is_admin and request_items:
admin_ids = _list_admin_user_ids(user_db)
if db_user_id not in admin_ids:
admin_ids.append(db_user_id)
target_user_ids = sorted(set(admin_ids))
if item_type == "request":
request_id = _parse_request_item_key(item_key)
if request_id is None:
return jsonify({"error": "request item_key must be in the format request:<id>"}), 400
request_row = user_db.get_request(request_id)
if request_row is None:
continue
owner_user_id = normalize_positive_int(request_row.get("user_id"))
if not actor.is_admin and owner_user_id != actor.db_user_id:
return jsonify({"error": "Forbidden"}), 403
request_ids.append(request_id)
continue
try:
dismissed_count = activity_service.dismiss_many(user_id=db_user_id, items=normalized_items)
if actor_is_admin and request_items:
for target_user_id in target_user_ids:
if target_user_id == db_user_id:
continue
activity_service.dismiss_many(user_id=target_user_id, items=request_items)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
return jsonify({"error": "item_type must be one of: download, request"}), 400
for target_user_id in target_user_ids:
target_count = dismissed_count if target_user_id == db_user_id else len(request_items)
_emit_activity_event(
ws_manager,
room=f"user_{target_user_id}",
payload={
"kind": "dismiss_many",
"user_id": target_user_id,
"count": target_count,
},
)
dismissed_download_count = download_history_service.dismiss_many(
task_ids=download_task_ids,
user_id=actor.owner_scope,
)
dismissed_request_count = user_db.dismiss_requests_batch(
request_ids=request_ids,
dismissed_at=now_utc_iso(),
)
dismissed_count = dismissed_download_count + dismissed_request_count
room = _activity_ws_room(is_no_auth=actor.is_no_auth, actor_db_user_id=actor.db_user_id)
emit_ws_event(
ws_manager,
event_name="activity_update",
room=room,
payload={
"kind": "dismiss_many",
"count": dismissed_count,
},
)
return jsonify({"status": "dismissed", "count": dismissed_count})
@@ -617,21 +573,48 @@ def register_activity_routes(
if auth_gate is not None:
return auth_gate
db_user_id, db_gate = _resolve_activity_actor_user_id(
actor, actor_error = _resolve_activity_actor(
user_db=user_db,
resolve_auth_mode=resolve_auth_mode,
)
if db_gate is not None or db_user_id is None:
return db_gate
if actor_error is not None:
return actor_error
limit = request.args.get("limit", type=int, default=50) or 50
offset = request.args.get("offset", type=int, default=0) or 0
limit = request.args.get("limit", type=int, default=50)
offset = request.args.get("offset", type=int, default=0)
if limit is None:
limit = 50
if offset is None:
offset = 0
if limit < 1:
return jsonify({"error": "limit must be a positive integer"}), 400
if offset < 0:
return jsonify({"error": "offset must be a non-negative integer"}), 400
try:
history = activity_service.get_history(db_user_id, limit=limit, offset=offset)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
return jsonify(history)
# We combine download + request history and apply pagination over the merged list.
max_rows = min(limit + offset + 500, 5000)
download_history_rows = download_history_service.get_history(
user_id=actor.owner_scope,
limit=max_rows,
offset=0,
)
dismissed_request_rows = user_db.list_dismissed_requests(user_id=actor.owner_scope, limit=max_rows)
request_history_rows = [
entry
for entry in (_request_history_entry(row) for row in dismissed_request_rows)
if entry is not None
]
combined = [*download_history_rows, *request_history_rows]
combined.sort(
key=lambda row: (
_parse_timestamp(row.get("dismissed_at")),
int(row.get("id") or 0),
),
reverse=True,
)
paged = combined[offset:offset + limit]
return jsonify(paged)
@app.route("/api/activity/history", methods=["DELETE"])
def api_activity_history_clear():
@@ -639,20 +622,24 @@ def register_activity_routes(
if auth_gate is not None:
return auth_gate
db_user_id, db_gate = _resolve_activity_actor_user_id(
actor, actor_error = _resolve_activity_actor(
user_db=user_db,
resolve_auth_mode=resolve_auth_mode,
)
if db_gate is not None or db_user_id is None:
return db_gate
if actor_error is not None:
return actor_error
deleted_count = activity_service.clear_history(db_user_id)
_emit_activity_event(
deleted_downloads = download_history_service.clear_dismissed(user_id=actor.owner_scope)
deleted_requests = user_db.delete_dismissed_requests(user_id=actor.owner_scope)
deleted_count = deleted_downloads + deleted_requests
room = _activity_ws_room(is_no_auth=actor.is_no_auth, actor_db_user_id=actor.db_user_id)
emit_ws_event(
ws_manager,
room=f"user_{db_user_id}",
event_name="activity_update",
room=room,
payload={
"kind": "history_cleared",
"user_id": db_user_id,
"count": deleted_count,
},
)

View File

@@ -1,639 +0,0 @@
"""Persistence helpers for Activity dismissals and terminal snapshots."""
from __future__ import annotations
from datetime import datetime, timezone
import json
import sqlite3
from typing import Any, Iterable
VALID_ITEM_TYPES = frozenset({"download", "request"})
VALID_ORIGINS = frozenset({"direct", "request", "requested"})
VALID_FINAL_STATUSES = frozenset({"complete", "error", "cancelled", "rejected"})
def _now_timestamp() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _normalize_item_type(item_type: Any) -> str:
if not isinstance(item_type, str):
raise ValueError("item_type must be a string")
normalized = item_type.strip().lower()
if normalized not in VALID_ITEM_TYPES:
raise ValueError("item_type must be one of: download, request")
return normalized
def _normalize_item_key(item_key: Any) -> str:
if not isinstance(item_key, str):
raise ValueError("item_key must be a string")
normalized = item_key.strip()
if not normalized:
raise ValueError("item_key must not be empty")
return normalized
def _normalize_origin(origin: Any) -> str:
if not isinstance(origin, str):
raise ValueError("origin must be a string")
normalized = origin.strip().lower()
if normalized not in VALID_ORIGINS:
raise ValueError("origin must be one of: direct, request, requested")
return normalized
def _normalize_final_status(final_status: Any) -> str:
if not isinstance(final_status, str):
raise ValueError("final_status must be a string")
normalized = final_status.strip().lower()
if normalized not in VALID_FINAL_STATUSES:
raise ValueError("final_status must be one of: complete, error, cancelled, rejected")
return normalized
def build_item_key(item_type: str, raw_id: Any) -> str:
    """Build a stable item key used by dismiss/history APIs.

    Request keys are "request:<positive int>"; download keys are
    "download:<non-empty task id>". Raises ValueError on invalid input.
    """
    kind = _normalize_item_type(item_type)
    if kind == "request":
        try:
            parsed = int(raw_id)
        except (TypeError, ValueError) as exc:
            raise ValueError("request item IDs must be integers") from exc
        if parsed < 1:
            raise ValueError("request item IDs must be positive integers")
        return f"request:{parsed}"
    # Otherwise this is a download key; task ids are opaque strings.
    if not isinstance(raw_id, str):
        raise ValueError("download item IDs must be strings")
    stripped = raw_id.strip()
    if not stripped:
        raise ValueError("download item IDs must not be empty")
    return f"download:{stripped}"
def build_request_item_key(request_id: int) -> str:
    """Build a "request:<id>" item key; raises ValueError for non-positive ids."""
    return build_item_key("request", request_id)
def build_download_item_key(task_id: str) -> str:
    """Build a "download:<task_id>" item key; raises ValueError for empty ids."""
    return build_item_key("download", task_id)
def _parse_request_id_from_item_key(item_key: Any) -> int | None:
if not isinstance(item_key, str) or not item_key.startswith("request:"):
return None
raw_value = item_key.split(":", 1)[1].strip()
try:
parsed = int(raw_value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _request_final_status(request_status: Any, delivery_state: Any) -> str | None:
status = str(request_status or "").strip().lower()
if status == "pending":
return None
if status == "rejected":
return "rejected"
if status == "cancelled":
return "cancelled"
if status != "fulfilled":
return None
delivery = str(delivery_state or "").strip().lower()
if delivery in {"error", "cancelled"}:
return delivery
return "complete"
class ActivityService:
"""Service for per-user activity dismissals and terminal history snapshots."""
def __init__(self, db_path: str):
    """Store the SQLite database path; a connection is opened per operation."""
    self._db_path = db_path
def _connect(self) -> sqlite3.Connection:
    """Open a new connection with dict-like row access and FK enforcement on."""
    conn = sqlite3.connect(self._db_path)
    # sqlite3.Row lets callers address columns by name.
    conn.row_factory = sqlite3.Row
    # Foreign keys are off by default in SQLite; enable per-connection.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
@staticmethod
def _coerce_positive_int(value: Any, field: str) -> int:
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{field} must be an integer") from exc
if parsed < 1:
raise ValueError(f"{field} must be a positive integer")
return parsed
@staticmethod
def _row_to_dict(row: sqlite3.Row | None) -> dict[str, Any] | None:
return dict(row) if row is not None else None
@staticmethod
def _parse_json_column(value: Any) -> Any:
if not isinstance(value, str):
return None
try:
return json.loads(value)
except (ValueError, TypeError):
return None
def _build_legacy_request_snapshot(
    self,
    conn: sqlite3.Connection,
    request_id: int,
) -> tuple[dict[str, Any] | None, str | None]:
    """Load a download_requests row and shape it into a request snapshot.

    Returns (snapshot, final_status); (None, None) when the row is missing.
    """
    fetched = conn.execute(
        """
        SELECT
            id,
            user_id,
            status,
            delivery_state,
            request_level,
            book_data,
            release_data,
            note,
            admin_note,
            created_at,
            reviewed_at
        FROM download_requests
        WHERE id = ?
        """,
        (request_id,),
    ).fetchone()
    if fetched is None:
        return None, None
    record = dict(fetched)
    # JSON columns may be NULL or corrupt; fall back to empty objects.
    book_data = self._parse_json_column(record.get("book_data"))
    release_data = self._parse_json_column(record.get("release_data"))
    book_data = book_data if isinstance(book_data, dict) else {}
    release_data = release_data if isinstance(release_data, dict) else {}
    snapshot = {
        "kind": "request",
        "request": {
            "id": int(record["id"]),
            "user_id": record.get("user_id"),
            "status": record.get("status"),
            "delivery_state": record.get("delivery_state"),
            "request_level": record.get("request_level"),
            "book_data": book_data,
            "release_data": release_data,
            "note": record.get("note"),
            "admin_note": record.get("admin_note"),
            "created_at": record.get("created_at"),
            # Prefer the review time; fall back to creation time.
            "updated_at": record.get("reviewed_at") or record.get("created_at"),
        },
    }
    final_status = _request_final_status(record.get("status"), record.get("delivery_state"))
    return snapshot, final_status
def record_terminal_snapshot(
    self,
    *,
    user_id: int | None,
    item_type: str,
    item_key: str,
    origin: str,
    final_status: str,
    snapshot: dict[str, Any],
    request_id: int | None = None,
    source_id: str | None = None,
    terminal_at: str | None = None,
) -> dict[str, Any]:
    """Record a durable terminal-state snapshot for an activity item.

    Normalizes and validates every field, inserts one ``activity_log``
    row, and returns that row read back as a dict.

    Raises:
        ValueError: on invalid inputs or if the inserted row cannot be
            read back.
    """
    normalized_item_type = _normalize_item_type(item_type)
    normalized_item_key = _normalize_item_key(item_key)
    normalized_origin = _normalize_origin(origin)
    normalized_final_status = _normalize_final_status(final_status)
    if not isinstance(snapshot, dict):
        raise ValueError("snapshot must be an object")
    if user_id is not None:
        user_id = self._coerce_positive_int(user_id, "user_id")
    if request_id is not None:
        request_id = self._coerce_positive_int(request_id, "request_id")
    if source_id is not None and not isinstance(source_id, str):
        raise ValueError("source_id must be a string when provided")
    if source_id is not None:
        # Blank source ids are stored as NULL rather than empty strings.
        source_id = source_id.strip() or None
    # A non-blank caller-supplied timestamp wins; otherwise record "now".
    effective_terminal_at = terminal_at if isinstance(terminal_at, str) and terminal_at.strip() else _now_timestamp()
    # Compact separators keep the snapshot column small; ensure_ascii=False
    # keeps non-ASCII titles human-readable in the database.
    serialized_snapshot = json.dumps(snapshot, separators=(",", ":"), ensure_ascii=False)
    conn = self._connect()
    try:
        cursor = conn.execute(
            """
            INSERT INTO activity_log (
                user_id,
                item_type,
                item_key,
                request_id,
                source_id,
                origin,
                final_status,
                snapshot_json,
                terminal_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                user_id,
                normalized_item_type,
                normalized_item_key,
                request_id,
                source_id,
                normalized_origin,
                normalized_final_status,
                serialized_snapshot,
                effective_terminal_at,
            ),
        )
        snapshot_id = int(cursor.lastrowid)
        conn.commit()
        # Read back so the caller gets DB-assigned values (id, defaults).
        row = conn.execute(
            "SELECT * FROM activity_log WHERE id = ?",
            (snapshot_id,),
        ).fetchone()
        payload = self._row_to_dict(row)
        if payload is None:
            raise ValueError("Failed to read back recorded activity snapshot")
        return payload
    finally:
        conn.close()
def get_latest_activity_log_id(self, *, item_type: str, item_key: str) -> int | None:
    """Get the newest snapshot ID for an item key.

    Returns ``None`` when no snapshot has been recorded for the item.
    """
    normalized_item_type = _normalize_item_type(item_type)
    normalized_item_key = _normalize_item_key(item_key)
    conn = self._connect()
    try:
        # Newest by terminal time; id is a deterministic tie-breaker.
        row = conn.execute(
            """
            SELECT id
            FROM activity_log
            WHERE item_type = ? AND item_key = ?
            ORDER BY terminal_at DESC, id DESC
            LIMIT 1
            """,
            (normalized_item_type, normalized_item_key),
        ).fetchone()
        if row is None:
            return None
        return int(row["id"])
    finally:
        conn.close()
def dismiss_item(
    self,
    *,
    user_id: int,
    item_type: str,
    item_key: str,
    activity_log_id: int | None = None,
) -> dict[str, Any]:
    """Dismiss an item for a specific user (upsert).

    When ``activity_log_id`` is omitted, the newest recorded snapshot for
    the item (possibly ``None``) is linked instead.  Returns the stored
    dismissal row.

    Raises:
        ValueError: on invalid inputs or if the upserted row cannot be
            read back.
    """
    normalized_user_id = self._coerce_positive_int(user_id, "user_id")
    normalized_item_type = _normalize_item_type(item_type)
    normalized_item_key = _normalize_item_key(item_key)
    normalized_log_id = (
        self._coerce_positive_int(activity_log_id, "activity_log_id")
        if activity_log_id is not None
        else self.get_latest_activity_log_id(
            item_type=normalized_item_type,
            item_key=normalized_item_key,
        )
    )
    conn = self._connect()
    try:
        # One dismissal per (user, item); re-dismissing refreshes the
        # snapshot link and timestamp instead of inserting a duplicate.
        conn.execute(
            """
            INSERT INTO activity_dismissals (
                user_id,
                item_type,
                item_key,
                activity_log_id,
                dismissed_at
            )
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(user_id, item_type, item_key)
            DO UPDATE SET
                activity_log_id = excluded.activity_log_id,
                dismissed_at = excluded.dismissed_at
            """,
            (
                normalized_user_id,
                normalized_item_type,
                normalized_item_key,
                normalized_log_id,
                _now_timestamp(),
            ),
        )
        conn.commit()
        row = conn.execute(
            """
            SELECT *
            FROM activity_dismissals
            WHERE user_id = ? AND item_type = ? AND item_key = ?
            """,
            (normalized_user_id, normalized_item_type, normalized_item_key),
        ).fetchone()
        payload = self._row_to_dict(row)
        if payload is None:
            raise ValueError("Failed to read back dismissal row")
        return payload
    finally:
        conn.close()
def dismiss_many(self, *, user_id: int, items: Iterable[dict[str, Any]]) -> int:
    """Dismiss many items for one user.

    Each item dict carries ``item_type``, ``item_key`` and optionally
    ``activity_log_id``.  Returns the number of items processed.

    Raises:
        ValueError: when any item is not a dict or fails normalization.
    """
    normalized_user_id = self._coerce_positive_int(user_id, "user_id")
    # Validate everything up front so we never half-apply a bad batch.
    normalized_items: list[tuple[str, str, int | None]] = []
    for item in items:
        if not isinstance(item, dict):
            raise ValueError("items must contain objects")
        normalized_item_type = _normalize_item_type(item.get("item_type"))
        normalized_item_key = _normalize_item_key(item.get("item_key"))
        raw_log_id = item.get("activity_log_id")
        # NOTE(review): get_latest_activity_log_id opens its own connection
        # per item — acceptable for small batches; confirm batch sizes stay small.
        normalized_log_id = (
            self._coerce_positive_int(raw_log_id, "activity_log_id")
            if raw_log_id is not None
            else self.get_latest_activity_log_id(
                item_type=normalized_item_type,
                item_key=normalized_item_key,
            )
        )
        normalized_items.append((normalized_item_type, normalized_item_key, normalized_log_id))
    if not normalized_items:
        return 0
    conn = self._connect()
    try:
        # One shared timestamp so the whole batch sorts together.
        timestamp = _now_timestamp()
        for item_type, item_key, activity_log_id in normalized_items:
            conn.execute(
                """
                INSERT INTO activity_dismissals (
                    user_id,
                    item_type,
                    item_key,
                    activity_log_id,
                    dismissed_at
                )
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(user_id, item_type, item_key)
                DO UPDATE SET
                    activity_log_id = excluded.activity_log_id,
                    dismissed_at = excluded.dismissed_at
                """,
                (
                    normalized_user_id,
                    item_type,
                    item_key,
                    activity_log_id,
                    timestamp,
                ),
            )
        conn.commit()
        return len(normalized_items)
    finally:
        conn.close()
def get_dismissal_set(self, user_id: int) -> list[dict[str, str]]:
    """Return dismissed item keys for one user, most recent first."""
    uid = self._coerce_positive_int(user_id, "user_id")
    conn = self._connect()
    try:
        fetched = conn.execute(
            """
            SELECT item_type, item_key
            FROM activity_dismissals
            WHERE user_id = ?
            ORDER BY dismissed_at DESC, id DESC
            """,
            (uid,),
        ).fetchall()
        entries: list[dict[str, str]] = []
        for record in fetched:
            entries.append(
                {
                    "item_type": str(record["item_type"]),
                    "item_key": str(record["item_key"]),
                }
            )
        return entries
    finally:
        conn.close()
def clear_dismissals_for_item_keys(
    self,
    *,
    user_id: int,
    item_type: str,
    item_keys: Iterable[str],
) -> int:
    """Clear dismissals for one user + item type + item keys.

    Non-string and blank keys are silently ignored.  Returns the number
    of dismissal rows deleted.
    """
    normalized_user_id = self._coerce_positive_int(user_id, "user_id")
    normalized_item_type = _normalize_item_type(item_type)
    # Set comprehension de-duplicates keys before issuing deletes.
    normalized_keys = {
        _normalize_item_key(item_key)
        for item_key in item_keys
        if isinstance(item_key, str) and item_key.strip()
    }
    if not normalized_keys:
        return 0
    conn = self._connect()
    try:
        cursor = conn.executemany(
            """
            DELETE FROM activity_dismissals
            WHERE user_id = ? AND item_type = ? AND item_key = ?
            """,
            (
                (normalized_user_id, normalized_item_type, item_key)
                for item_key in normalized_keys
            ),
        )
        conn.commit()
        # executemany accumulates rowcount across all parameter sets.
        return int(cursor.rowcount or 0)
    finally:
        conn.close()
def get_history(self, user_id: int, *, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
    """Return paged dismissal history for one user.

    Each entry joins the dismissal with its linked ``activity_log``
    snapshot; legacy request dismissals with no stored snapshot are
    reconstructed from the live ``download_requests`` row.
    """
    normalized_user_id = self._coerce_positive_int(user_id, "user_id")
    # Clamp paging inputs: limit to [1, 200], offset to >= 0.
    normalized_limit = max(1, min(int(limit), 200))
    normalized_offset = max(0, int(offset))
    conn = self._connect()
    try:
        rows = conn.execute(
            """
            SELECT
                d.id,
                d.user_id,
                d.item_type,
                d.item_key,
                d.activity_log_id,
                d.dismissed_at,
                l.snapshot_json,
                l.origin,
                l.final_status,
                l.terminal_at,
                l.request_id,
                l.source_id
            FROM activity_dismissals d
            LEFT JOIN activity_log l ON l.id = d.activity_log_id
            WHERE d.user_id = ?
            ORDER BY d.dismissed_at DESC, d.id DESC
            LIMIT ? OFFSET ?
            """,
            (normalized_user_id, normalized_limit, normalized_offset),
        ).fetchall()
        payload: list[dict[str, Any]] = []
        for row in rows:
            row_dict = dict(row)
            # Raw JSON never leaves this method; the parsed form is
            # exposed under the "snapshot" key instead.
            raw_snapshot_json = row_dict.pop("snapshot_json", None)
            snapshot_payload = None
            if isinstance(raw_snapshot_json, str):
                try:
                    snapshot_payload = json.loads(raw_snapshot_json)
                except (ValueError, TypeError):
                    snapshot_payload = None
            # Legacy fallback: request dismissals recorded before snapshots
            # existed are rebuilt from the download_requests table.
            if snapshot_payload is None and row_dict.get("item_type") == "request":
                request_id = row_dict.get("request_id")
                if request_id is None:
                    request_id = _parse_request_id_from_item_key(row_dict.get("item_key"))
                try:
                    normalized_request_id = int(request_id) if request_id is not None else None
                except (TypeError, ValueError):
                    normalized_request_id = None
                if normalized_request_id and normalized_request_id > 0:
                    fallback_snapshot, fallback_final_status = self._build_legacy_request_snapshot(
                        conn,
                        normalized_request_id,
                    )
                    if fallback_snapshot is not None:
                        snapshot_payload = fallback_snapshot
                        # Backfill metadata the legacy row never stored.
                        if not row_dict.get("origin"):
                            row_dict["origin"] = "request"
                        if not row_dict.get("final_status") and fallback_final_status is not None:
                            row_dict["final_status"] = fallback_final_status
            row_dict["snapshot"] = snapshot_payload
            payload.append(row_dict)
        return payload
    finally:
        conn.close()
def get_undismissed_terminal_downloads(
    self,
    viewer_user_id: int,
    *,
    owner_user_id: int | None,
    limit: int = 200,
) -> list[dict[str, Any]]:
    """Return latest undismissed terminal download snapshots for a viewer.

    `viewer_user_id` controls which dismissals are applied.
    `owner_user_id` scopes activity rows to one owner when provided; when
    omitted, rows across all owners are considered.
    """
    normalized_viewer_user_id = self._coerce_positive_int(viewer_user_id, "viewer_user_id")
    normalized_owner_user_id = (
        self._coerce_positive_int(owner_user_id, "owner_user_id")
        if owner_user_id is not None
        else None
    )
    # Clamp limit to [1, 500].
    normalized_limit = max(1, min(int(limit), 500))
    conn = self._connect()
    try:
        # The LEFT JOIN filters out items this viewer has dismissed
        # (d.id IS NULL keeps only rows with no matching dismissal).
        rows = conn.execute(
            """
            SELECT
                l.id,
                l.user_id,
                l.item_type,
                l.item_key,
                l.request_id,
                l.source_id,
                l.origin,
                l.final_status,
                l.snapshot_json,
                l.terminal_at
            FROM activity_log l
            LEFT JOIN activity_dismissals d
                ON d.user_id = ?
                AND d.item_type = l.item_type
                AND d.item_key = l.item_key
            WHERE (? IS NULL OR l.user_id = ?)
              AND l.item_type = 'download'
              AND l.final_status IN ('complete', 'error', 'cancelled')
              AND d.id IS NULL
            ORDER BY l.terminal_at DESC, l.id DESC
            LIMIT ?
            """,
            (
                normalized_viewer_user_id,
                normalized_owner_user_id,
                normalized_owner_user_id,
                # Over-fetch 2x: duplicates per item_key are dropped below,
                # so extra rows keep the final page close to `limit`.
                normalized_limit * 2,
            ),
        ).fetchall()
        payload: list[dict[str, Any]] = []
        seen_item_keys: set[str] = set()
        for row in rows:
            row_dict = dict(row)
            item_key = str(row_dict.get("item_key") or "")
            # Keep only the newest snapshot per item key (rows are sorted
            # newest-first above).
            if not item_key or item_key in seen_item_keys:
                continue
            seen_item_keys.add(item_key)
            raw_snapshot_json = row_dict.pop("snapshot_json", None)
            snapshot_payload = None
            if isinstance(raw_snapshot_json, str):
                try:
                    snapshot_payload = json.loads(raw_snapshot_json)
                except (ValueError, TypeError):
                    snapshot_payload = None
            row_dict["snapshot"] = snapshot_payload
            payload.append(row_dict)
            if len(payload) >= normalized_limit:
                break
        return payload
    finally:
        conn.close()
def clear_history(self, user_id: int) -> int:
    """Delete every dismissal row for *user_id* and return how many were removed."""
    uid = self._coerce_positive_int(user_id, "user_id")
    conn = self._connect()
    try:
        deleted = conn.execute(
            "DELETE FROM activity_dismissals WHERE user_id = ?",
            (uid,),
        )
        conn.commit()
        return int(deleted.rowcount or 0)
    finally:
        conn.close()

View File

@@ -33,7 +33,7 @@ from shelfmark.core.auth_modes import (
from shelfmark.core.cwa_user_sync import sync_cwa_users_from_rows
from shelfmark.core.logger import setup_logger
from shelfmark.core.settings_registry import load_config_file
from shelfmark.core.user_db import NO_AUTH_ACTIVITY_USERNAME, UserDB
from shelfmark.core.user_db import UserDB
logger = setup_logger(__name__)
@@ -104,9 +104,6 @@ def _serialize_user(
return payload
def _is_internal_system_user(user: dict[str, Any]) -> bool:
username = str(user.get("username") or "").strip()
return username == NO_AUTH_ACTIVITY_USERNAME
def _sync_all_cwa_users(user_db: UserDB) -> dict[str, int]:
@@ -153,7 +150,7 @@ def register_admin_routes(app: Flask, user_db: UserDB) -> None:
@_require_admin
def admin_list_users():
"""List all users."""
users = [u for u in user_db.list_users() if not _is_internal_system_user(u)]
users = user_db.list_users()
auth_mode = g.auth_mode
security_config = load_config_file("security")
return jsonify([
@@ -191,8 +188,8 @@ def register_admin_routes(app: Flask, user_db: UserDB) -> None:
return jsonify({"error": "Role must be 'admin' or 'user'"}), 400
# First user is always admin
real_users = [u for u in user_db.list_users() if not _is_internal_system_user(u)]
if not real_users:
existing_users = user_db.list_users()
if not existing_users:
role = "admin"
# Check if username already exists

View File

@@ -16,7 +16,7 @@ from shelfmark.core.user_settings_overrides import (
get_ordered_user_overridable_fields as _get_ordered_user_overridable_fields,
get_settings_registry as _get_settings_registry,
)
from shelfmark.core.user_db import NO_AUTH_ACTIVITY_USERNAME, UserDB
from shelfmark.core.user_db import UserDB
from shelfmark.core.request_policy import parse_policy_mode, validate_policy_rules
@@ -205,8 +205,6 @@ def register_admin_settings_routes(
keys_payload: dict[str, dict[str, Any]] = {}
for user_record in user_db.list_users():
if str(user_record.get("username") or "").strip() == NO_AUTH_ACTIVITY_USERNAME:
continue
user_settings = user_db.get_user_settings(user_record["id"])
if not isinstance(user_settings, dict):
continue

View File

@@ -0,0 +1,406 @@
"""Persistence helpers for flat download terminal history."""
from __future__ import annotations
import os
import sqlite3
import threading
from typing import Any
from shelfmark.core.request_helpers import normalize_optional_positive_int, normalize_optional_text, now_utc_iso
# Terminal states a download may be persisted with.
VALID_FINAL_STATUSES = frozenset({"complete", "error", "cancelled"})
# Accepted provenance markers for a download.
# NOTE(review): "requested" appears to be an alias of "request" — confirm
# whether it is a legacy spelling that can be normalized away.
VALID_ORIGINS = frozenset({"direct", "request", "requested"})
def _normalize_task_id(task_id: Any) -> str:
    """Trim *task_id* and require the result to be non-empty."""
    candidate = normalize_optional_text(task_id)
    if candidate is None:
        raise ValueError("task_id must be a non-empty string")
    return candidate
def _normalize_origin(origin: Any) -> str:
    """Normalize an origin marker; missing/blank values default to "direct"."""
    candidate = normalize_optional_text(origin)
    if candidate is None:
        return "direct"
    folded = candidate.lower()
    if folded in VALID_ORIGINS:
        return folded
    raise ValueError("origin must be one of: direct, request, requested")
def _normalize_final_status(final_status: Any) -> str:
    """Normalize a terminal status; must fall within VALID_FINAL_STATUSES."""
    candidate = normalize_optional_text(final_status)
    if candidate is None:
        raise ValueError("final_status must be a non-empty string")
    folded = candidate.lower()
    if folded in VALID_FINAL_STATUSES:
        return folded
    raise ValueError("final_status must be one of: complete, error, cancelled")
def _normalize_limit(value: Any, *, default: int, minimum: int, maximum: int) -> int:
if value is None:
return default
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError("limit must be an integer") from exc
if parsed < minimum:
return minimum
if parsed > maximum:
return maximum
return parsed
def _normalize_offset(value: Any, *, default: int) -> int:
if value is None:
return default
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError("offset must be an integer") from exc
if parsed < 0:
return 0
return parsed
class DownloadHistoryService:
    """Service for persisted terminal download history and dismissals.

    Rows live in the ``download_history`` table keyed by ``task_id``; a row
    counts as dismissed once its ``dismissed_at`` column is non-NULL.  All
    writes are serialized behind an in-process lock, and every call opens
    a short-lived connection against ``db_path``.
    """

    def __init__(self, db_path: str):
        # Path to the SQLite database backing the history table.
        self._db_path = db_path
        # Serializes write transactions issued from this process.
        self._lock = threading.Lock()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection with named-column rows and FK enforcement."""
        conn = sqlite3.connect(self._db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    @staticmethod
    def _row_to_dict(row: sqlite3.Row | None) -> dict[str, Any] | None:
        """Convert a sqlite3.Row into a plain dict; ``None`` passes through."""
        return dict(row) if row is not None else None

    @staticmethod
    def _to_item_key(task_id: str) -> str:
        """Build the composite item key used for download entries."""
        return f"download:{task_id}"

    @staticmethod
    def _resolve_existing_download_path(value: Any) -> str | None:
        """Return the stored download path only while the file still exists."""
        normalized = normalize_optional_text(value)
        if normalized is None:
            return None
        # Hide paths whose files have since been pruned from disk.
        return normalized if os.path.exists(normalized) else None

    @staticmethod
    def to_download_payload(row: dict[str, Any]) -> dict[str, Any]:
        """Shape a ``download_history`` row into the API download payload."""
        return {
            "id": row.get("task_id"),
            "title": row.get("title"),
            "author": row.get("author"),
            "format": row.get("format"),
            "size": row.get("size"),
            "preview": row.get("preview"),
            "content_type": row.get("content_type"),
            "source": row.get("source"),
            "source_display_name": row.get("source_display_name"),
            "status_message": row.get("status_message"),
            # Re-checked at read time so stale files are never offered.
            "download_path": DownloadHistoryService._resolve_existing_download_path(row.get("download_path")),
            "user_id": row.get("user_id"),
            "username": row.get("username"),
            "request_id": row.get("request_id"),
        }

    @classmethod
    def _to_history_row(cls, row: dict[str, Any]) -> dict[str, Any]:
        """Shape a ``download_history`` row into the unified history entry form."""
        task_id = str(row.get("task_id") or "").strip()
        return {
            "id": row.get("id"),
            "user_id": row.get("user_id"),
            "item_type": "download",
            "item_key": cls._to_item_key(task_id),
            "dismissed_at": row.get("dismissed_at"),
            "snapshot": {
                "kind": "download",
                "download": cls.to_download_payload(row),
            },
            "origin": row.get("origin"),
            "final_status": row.get("final_status"),
            "terminal_at": row.get("terminal_at"),
            "request_id": row.get("request_id"),
            "source_id": task_id or None,
        }

    def record_terminal(
        self,
        *,
        task_id: str,
        user_id: int | None,
        username: str | None,
        request_id: int | None,
        source: str,
        source_display_name: str | None,
        title: str,
        author: str | None,
        format: str | None,
        size: str | None,
        preview: str | None,
        content_type: str | None,
        origin: str,
        final_status: str,
        status_message: str | None,
        download_path: str | None,
        terminal_at: str | None = None,
    ) -> None:
        """Insert or refresh the terminal record for a download task.

        Upserts on ``task_id``; re-recording a task also resets
        ``dismissed_at`` to NULL so the entry resurfaces in the sidebar.

        Raises:
            ValueError: when task_id/source/title are blank, or origin /
                final_status fall outside their allowed vocabularies.
        """
        normalized_task_id = _normalize_task_id(task_id)
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_request_id = normalize_optional_positive_int(request_id, "request_id")
        normalized_source = normalize_optional_text(source)
        if normalized_source is None:
            raise ValueError("source must be a non-empty string")
        normalized_title = normalize_optional_text(title)
        if normalized_title is None:
            raise ValueError("title must be a non-empty string")
        normalized_origin = _normalize_origin(origin)
        normalized_final_status = _normalize_final_status(final_status)
        # A non-blank caller-supplied timestamp wins; otherwise record "now".
        effective_terminal_at = (
            terminal_at
            if isinstance(terminal_at, str) and terminal_at.strip()
            else now_utc_iso()
        )
        with self._lock:
            conn = self._connect()
            try:
                conn.execute(
                    """
                    INSERT INTO download_history (
                        task_id,
                        user_id,
                        username,
                        request_id,
                        source,
                        source_display_name,
                        title,
                        author,
                        format,
                        size,
                        preview,
                        content_type,
                        origin,
                        final_status,
                        status_message,
                        download_path,
                        terminal_at
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(task_id) DO UPDATE SET
                        user_id = excluded.user_id,
                        username = excluded.username,
                        request_id = excluded.request_id,
                        source = excluded.source,
                        source_display_name = excluded.source_display_name,
                        title = excluded.title,
                        author = excluded.author,
                        format = excluded.format,
                        size = excluded.size,
                        preview = excluded.preview,
                        content_type = excluded.content_type,
                        origin = excluded.origin,
                        final_status = excluded.final_status,
                        status_message = excluded.status_message,
                        download_path = excluded.download_path,
                        terminal_at = excluded.terminal_at,
                        dismissed_at = NULL
                    """,
                    (
                        normalized_task_id,
                        normalized_user_id,
                        normalize_optional_text(username),
                        normalized_request_id,
                        normalized_source,
                        normalize_optional_text(source_display_name),
                        normalized_title,
                        normalize_optional_text(author),
                        normalize_optional_text(format),
                        normalize_optional_text(size),
                        normalize_optional_text(preview),
                        normalize_optional_text(content_type),
                        normalized_origin,
                        normalized_final_status,
                        normalize_optional_text(status_message),
                        normalize_optional_text(download_path),
                        effective_terminal_at,
                    ),
                )
                conn.commit()
            finally:
                conn.close()

    def get_by_task_id(self, task_id: str) -> dict[str, Any] | None:
        """Return the raw history row for *task_id*, or None when absent."""
        normalized_task_id = _normalize_task_id(task_id)
        conn = self._connect()
        try:
            row = conn.execute(
                "SELECT * FROM download_history WHERE task_id = ?",
                (normalized_task_id,),
            ).fetchone()
            return self._row_to_dict(row)
        finally:
            conn.close()

    def get_undismissed_terminal(
        self,
        *,
        user_id: int | None,
        limit: int = 200,
    ) -> list[dict[str, Any]]:
        """Return undismissed terminal rows newest-first, optionally per user."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_limit = _normalize_limit(limit, default=200, minimum=1, maximum=1000)
        query = "SELECT * FROM download_history WHERE dismissed_at IS NULL"
        params: list[Any] = []
        # user_id=None means "all users" (e.g. admin / no-auth mode).
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        query += " ORDER BY terminal_at DESC, id DESC LIMIT ?"
        params.append(normalized_limit)
        conn = self._connect()
        try:
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def get_dismissed_keys(self, *, user_id: int | None, limit: int = 5000) -> list[str]:
        """Return task ids of dismissed rows, most recently dismissed first."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_limit = _normalize_limit(limit, default=5000, minimum=1, maximum=10000)
        query = "SELECT task_id FROM download_history WHERE dismissed_at IS NOT NULL"
        params: list[Any] = []
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        query += " ORDER BY dismissed_at DESC, id DESC LIMIT ?"
        params.append(normalized_limit)
        conn = self._connect()
        try:
            rows = conn.execute(query, params).fetchall()
            keys: list[str] = []
            for row in rows:
                # Skip rows with blank task ids defensively.
                task_id = normalize_optional_text(row["task_id"])
                if task_id is not None:
                    keys.append(task_id)
            return keys
        finally:
            conn.close()

    def dismiss(self, *, task_id: str, user_id: int | None) -> int:
        """Mark one row dismissed; returns the number of rows updated (0 or 1).

        When *user_id* is given, only a row owned by that user is touched.
        """
        normalized_task_id = _normalize_task_id(task_id)
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        query = "UPDATE download_history SET dismissed_at = ? WHERE task_id = ?"
        params: list[Any] = [now_utc_iso(), normalized_task_id]
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        with self._lock:
            conn = self._connect()
            try:
                cursor = conn.execute(query, params)
                conn.commit()
                rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
                return max(rowcount, 0)
            finally:
                conn.close()

    def dismiss_many(self, *, task_ids: list[str], user_id: int | None) -> int:
        """Mark many rows dismissed in one statement; returns rows updated."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_task_ids = [_normalize_task_id(task_id) for task_id in task_ids]
        if not normalized_task_ids:
            return 0
        placeholders = ",".join("?" for _ in normalized_task_ids)
        query = f"UPDATE download_history SET dismissed_at = ? WHERE task_id IN ({placeholders})"
        params: list[Any] = [now_utc_iso(), *normalized_task_ids]
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        with self._lock:
            conn = self._connect()
            try:
                cursor = conn.execute(query, params)
                conn.commit()
                rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
                return max(rowcount, 0)
            finally:
                conn.close()

    def get_history(self, *, user_id: int | None, limit: int, offset: int) -> list[dict[str, Any]]:
        """Return dismissed rows as paged history entries, newest dismissal first."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_limit = _normalize_limit(limit, default=50, minimum=1, maximum=5000)
        normalized_offset = _normalize_offset(offset, default=0)
        query = "SELECT * FROM download_history WHERE dismissed_at IS NOT NULL"
        params: list[Any] = []
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        query += " ORDER BY dismissed_at DESC, id DESC LIMIT ? OFFSET ?"
        params.extend([normalized_limit, normalized_offset])
        conn = self._connect()
        try:
            rows = conn.execute(query, params).fetchall()
            payload: list[dict[str, Any]] = []
            for row in rows:
                row_dict = dict(row)
                payload.append(self._to_history_row(row_dict))
            return payload
        finally:
            conn.close()

    def clear_dismissed(self, *, user_id: int | None) -> int:
        """Permanently delete dismissed rows; returns the deleted row count."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        query = "DELETE FROM download_history WHERE dismissed_at IS NOT NULL"
        params: list[Any] = []
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        with self._lock:
            conn = self._connect()
            try:
                cursor = conn.execute(query, params)
                conn.commit()
                rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
                return max(rowcount, 0)
            finally:
                conn.close()

    def clear_dismissals_for_active(self, *, task_ids: set[str], user_id: int | None) -> int:
        """Un-dismiss rows whose tasks are active again; returns rows updated."""
        normalized_user_id = normalize_optional_positive_int(user_id, "user_id")
        normalized_task_ids = [_normalize_task_id(task_id) for task_id in task_ids]
        if not normalized_task_ids:
            return 0
        placeholders = ",".join("?" for _ in normalized_task_ids)
        query = f"UPDATE download_history SET dismissed_at = NULL WHERE task_id IN ({placeholders})"
        params: list[Any] = [*normalized_task_ids]
        if normalized_user_id is not None:
            query += " AND user_id = ?"
            params.append(normalized_user_id)
        with self._lock:
            conn = self._connect()
            try:
                cursor = conn.execute(query, params)
                conn.commit()
                rowcount = int(cursor.rowcount) if cursor.rowcount is not None else 0
                return max(rowcount, 0)
            finally:
                conn.close()

View File

@@ -325,40 +325,6 @@ class BookQueue:
return True
return any(status == QueueStatus.QUEUED for status in self._status.values())
def clear_completed(self, user_id: Optional[int] = None) -> int:
    """Remove terminal tasks from tracking, optionally scoped to one user.

    Args:
        user_id: If provided, only clear tasks belonging to this user.
                 If None, clear all.

    Returns:
        Number of tasks removed from tracking.
    """
    terminal_statuses = {QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.AVAILABLE, QueueStatus.ERROR, QueueStatus.CANCELLED}
    with self._lock:
        # Collect first, then mutate: avoids changing dicts mid-iteration.
        to_remove: list[str] = []
        for task_id, status in self._status.items():
            if status not in terminal_statuses:
                continue
            if user_id is None:
                to_remove.append(task_id)
                continue
            task = self._task_data.get(task_id)
            if task is None:
                # Without task ownership metadata we cannot safely scope removal.
                continue
            if task.user_id == user_id:
                to_remove.append(task_id)
        # Drop every per-task tracking structure for the selected tasks.
        for task_id in to_remove:
            self._status.pop(task_id, None)
            self._status_timestamps.pop(task_id, None)
            self._task_data.pop(task_id, None)
            self._cancel_flags.pop(task_id, None)
            self._active_downloads.pop(task_id, None)
        return len(to_remove)
def refresh(self) -> None:
"""Remove any tasks that are done downloading or have stale status."""
terminal_statuses = {QueueStatus.COMPLETE, QueueStatus.DONE, QueueStatus.ERROR, QueueStatus.AVAILABLE, QueueStatus.CANCELLED}

View File

@@ -2,10 +2,39 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from shelfmark.core.logger import setup_logger
from shelfmark.core.settings_registry import load_config_file
_logger = setup_logger(__name__)
def now_utc_iso() -> str:
    """Return the current UTC time as a seconds-precision ISO 8601 string."""
    current = datetime.now(timezone.utc).replace(microsecond=0)
    return current.isoformat()
def emit_ws_event(
    ws_manager: Any,
    *,
    event_name: str,
    payload: dict[str, Any],
    room: str,
) -> None:
    """Emit a WebSocket event via the shared manager, swallowing failures."""
    if ws_manager is None:
        return
    try:
        socket = getattr(ws_manager, "socketio", None)
        enabled_check = getattr(ws_manager, "is_enabled", None)
        can_emit = (
            socket is not None
            and callable(enabled_check)
            and enabled_check()
        )
        if not can_emit:
            return
        socket.emit(event_name, payload, to=room)
    except Exception as exc:
        _logger.warning("Failed to emit WebSocket event '%s' to room '%s': %s", event_name, room, exc)
def load_users_request_policy_settings() -> dict[str, Any]:
"""Load global request-policy settings from the users config file."""
@@ -36,13 +65,45 @@ def coerce_int(value: Any, default: int) -> int:
def normalize_optional_text(value: Any) -> str | None:
"""Return a trimmed string or None for empty/non-string input."""
if not isinstance(value, str):
"""Return a trimmed string or None for empty/missing input.
Non-string values are coerced via ``str()`` before stripping;
``None`` short-circuits to ``None``.
"""
if value is None:
return None
if not isinstance(value, str):
value = str(value)
normalized = value.strip()
return normalized or None
def normalize_positive_int(value: Any) -> int | None:
"""Parse *value* as a positive integer, returning ``None`` on failure."""
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def normalize_optional_positive_int(value: Any, field_name: str = "value") -> int | None:
"""Parse *value* as a positive integer or ``None``.
Raises ``ValueError`` when *value* is present but not a valid
positive integer.
"""
if value is None:
return None
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{field_name} must be a positive integer when provided") from exc
if parsed < 1:
raise ValueError(f"{field_name} must be a positive integer when provided")
return parsed
def extract_release_source_id(release_data: Any) -> str | None:
"""Extract and normalize release_data.source_id."""
if not isinstance(release_data, dict):

View File

@@ -24,7 +24,6 @@ from shelfmark.core.requests_service import (
fulfil_request,
reject_request,
)
from shelfmark.core.activity_service import ActivityService, build_request_item_key
from shelfmark.core.notifications import (
NotificationContext,
NotificationEvent,
@@ -34,7 +33,7 @@ from shelfmark.core.notifications import (
from shelfmark.core.request_helpers import (
coerce_bool,
coerce_int,
extract_release_source_id,
emit_ws_event,
load_users_request_policy_settings,
normalize_optional_text,
)
@@ -101,68 +100,6 @@ def _resolve_effective_policy(
return global_settings, user_settings, effective, requests_enabled
def _emit_request_event(
ws_manager: Any,
*,
event_name: str,
payload: dict[str, Any],
room: str,
) -> None:
if ws_manager is None:
return
try:
socketio = getattr(ws_manager, "socketio", None)
is_enabled = getattr(ws_manager, "is_enabled", None)
if socketio is None or not callable(is_enabled) or not is_enabled():
return
socketio.emit(event_name, payload, to=room)
except Exception as exc:
logger.warning(f"Failed to emit WebSocket event '{event_name}' to room '{room}': {exc}")
def _record_terminal_request_snapshot(
activity_service: ActivityService | None,
*,
request_row: dict[str, Any],
) -> None:
if activity_service is None:
return
request_status = request_row.get("status")
if request_status not in {"rejected", "cancelled"}:
return
raw_request_id = request_row.get("id")
try:
request_id = int(raw_request_id)
except (TypeError, ValueError):
return
if request_id < 1:
return
raw_user_id = request_row.get("user_id")
try:
user_id = int(raw_user_id)
except (TypeError, ValueError):
user_id = None
source_id = extract_release_source_id(request_row.get("release_data"))
try:
activity_service.record_terminal_snapshot(
user_id=user_id,
item_type="request",
item_key=build_request_item_key(request_id),
origin="request",
final_status=request_status,
snapshot={"kind": "request", "request": request_row},
request_id=request_id,
source_id=source_id,
)
except Exception as exc:
logger.warning("Failed to record terminal request snapshot for request %s: %s", request_id, exc)
def _resolve_title_from_book_data(book_data: Any) -> str:
if isinstance(book_data, dict):
title = normalize_optional_text(book_data.get("title"))
@@ -339,7 +276,6 @@ def register_request_routes(
*,
resolve_auth_mode: Callable[[], str],
queue_release: Callable[..., tuple[bool, str | None]],
activity_service: ActivityService | None = None,
ws_manager: Any | None = None,
) -> None:
"""Register request policy and request lifecycle routes."""
@@ -560,13 +496,13 @@ def register_request_routes(
event_payload["title"],
actor_label,
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="new_request",
payload=event_payload,
room="admins",
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
@@ -626,8 +562,6 @@ def register_request_routes(
except RequestServiceError as exc:
return _error_response(str(exc), exc.status_code, code=exc.code)
_record_terminal_request_snapshot(activity_service, request_row=updated)
event_payload = {
"request_id": updated["id"],
"status": updated["status"],
@@ -640,13 +574,13 @@ def register_request_routes(
event_payload["title"],
actor_label,
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
room=f"user_{db_user_id}",
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
@@ -752,13 +686,13 @@ def register_request_routes(
admin_label,
requester_label,
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
room=f"user_{updated['user_id']}",
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
@@ -803,8 +737,6 @@ def register_request_routes(
except RequestServiceError as exc:
return _error_response(str(exc), exc.status_code, code=exc.code)
_record_terminal_request_snapshot(activity_service, request_row=updated)
event_payload = {
"request_id": updated["id"],
"status": updated["status"],
@@ -822,13 +754,13 @@ def register_request_routes(
admin_label,
requester_label,
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,
room=f"user_{updated['user_id']}",
)
_emit_request_event(
emit_ws_event(
ws_manager,
event_name="request_update",
payload=event_payload,

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional
from shelfmark.core.auth_modes import AUTH_SOURCE_BUILTIN, AUTH_SOURCE_SET
from shelfmark.core.logger import setup_logger
from shelfmark.core.request_helpers import normalize_optional_positive_int
from shelfmark.core.request_validation import (
normalize_delivery_state,
normalize_policy_mode,
@@ -18,8 +19,6 @@ from shelfmark.core.request_validation import (
validate_status_transition,
)
NO_AUTH_ACTIVITY_USERNAME = "__shelfmark_noauth_activity__"
logger = setup_logger(__name__)
_CREATE_TABLES_SQL = """
@@ -56,7 +55,8 @@ CREATE TABLE IF NOT EXISTS download_requests (
reviewed_by INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
reviewed_at TIMESTAMP,
delivery_updated_at TIMESTAMP
delivery_updated_at TIMESTAMP,
dismissed_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_download_requests_user_status_created_at
@@ -65,38 +65,33 @@ ON download_requests (user_id, status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_download_requests_status_created_at
ON download_requests (status, created_at DESC);
CREATE TABLE IF NOT EXISTS activity_log (
CREATE TABLE IF NOT EXISTS download_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
item_type TEXT NOT NULL,
item_key TEXT NOT NULL,
task_id TEXT UNIQUE NOT NULL,
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
username TEXT,
request_id INTEGER,
source_id TEXT,
origin TEXT NOT NULL,
source TEXT NOT NULL,
source_display_name TEXT,
title TEXT NOT NULL,
author TEXT,
format TEXT,
size TEXT,
preview TEXT,
content_type TEXT,
origin TEXT NOT NULL DEFAULT 'direct',
final_status TEXT NOT NULL,
snapshot_json TEXT NOT NULL,
terminal_at TIMESTAMP NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
status_message TEXT,
download_path TEXT,
terminal_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
dismissed_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_activity_log_user_terminal
ON activity_log (user_id, terminal_at DESC);
CREATE INDEX IF NOT EXISTS idx_download_history_user_status
ON download_history (user_id, final_status, terminal_at DESC);
CREATE INDEX IF NOT EXISTS idx_activity_log_lookup
ON activity_log (user_id, item_type, item_key, id DESC);
CREATE TABLE IF NOT EXISTS activity_dismissals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
item_type TEXT NOT NULL,
item_key TEXT NOT NULL,
activity_log_id INTEGER REFERENCES activity_log(id) ON DELETE SET NULL,
dismissed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, item_type, item_key)
);
CREATE INDEX IF NOT EXISTS idx_activity_dismissals_user_dismissed_at
ON activity_dismissals (user_id, dismissed_at DESC);
CREATE INDEX IF NOT EXISTS idx_download_history_dismissed
ON download_history (dismissed_at) WHERE dismissed_at IS NOT NULL;
"""
@@ -166,7 +161,7 @@ class UserDB:
conn.executescript(_CREATE_TABLES_SQL)
self._migrate_auth_source_column(conn)
self._migrate_request_delivery_columns(conn)
self._migrate_activity_tables(conn)
self._migrate_download_requests_dismissed_at(conn)
conn.commit()
# WAL mode must be changed outside an open transaction.
conn.execute("PRAGMA journal_mode=WAL")
@@ -235,50 +230,17 @@ class UserDB:
"""
)
def _migrate_activity_tables(self, conn: sqlite3.Connection) -> None:
"""Ensure activity log and dismissal tables exist with current columns/indexes."""
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS activity_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
item_type TEXT NOT NULL,
item_key TEXT NOT NULL,
request_id INTEGER,
source_id TEXT,
origin TEXT NOT NULL,
final_status TEXT NOT NULL,
snapshot_json TEXT NOT NULL,
terminal_at TIMESTAMP NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_activity_log_user_terminal
ON activity_log (user_id, terminal_at DESC);
CREATE INDEX IF NOT EXISTS idx_activity_log_lookup
ON activity_log (user_id, item_type, item_key, id DESC);
CREATE TABLE IF NOT EXISTS activity_dismissals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
item_type TEXT NOT NULL,
item_key TEXT NOT NULL,
activity_log_id INTEGER REFERENCES activity_log(id) ON DELETE SET NULL,
dismissed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, item_type, item_key)
);
CREATE INDEX IF NOT EXISTS idx_activity_dismissals_user_dismissed_at
ON activity_dismissals (user_id, dismissed_at DESC);
"""
def _migrate_download_requests_dismissed_at(self, conn: sqlite3.Connection) -> None:
"""Ensure download_requests.dismissed_at exists for request dismissal history."""
columns = conn.execute("PRAGMA table_info(download_requests)").fetchall()
column_names = {str(col["name"]) for col in columns}
if "dismissed_at" not in column_names:
conn.execute("ALTER TABLE download_requests ADD COLUMN dismissed_at TIMESTAMP")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_download_requests_dismissed "
"ON download_requests (dismissed_at) WHERE dismissed_at IS NOT NULL"
)
dismissal_columns = conn.execute("PRAGMA table_info(activity_dismissals)").fetchall()
dismissal_column_names = {str(col["name"]) for col in dismissal_columns}
if "activity_log_id" not in dismissal_column_names:
conn.execute("ALTER TABLE activity_dismissals ADD COLUMN activity_log_id INTEGER")
def create_user(
self,
username: str,
@@ -634,6 +596,7 @@ class UserDB:
"delivery_state",
"delivery_updated_at",
"last_failure_reason",
"dismissed_at",
}
def update_request(
@@ -693,6 +656,11 @@ class UserDB:
if delivery_updated_at is not None and not isinstance(delivery_updated_at, str):
raise ValueError("delivery_updated_at must be a string when provided")
if "dismissed_at" in updates:
dismissed_at = updates["dismissed_at"]
if dismissed_at is not None and not isinstance(dismissed_at, str):
raise ValueError("dismissed_at must be a string when provided")
if "content_type" in updates and not updates["content_type"]:
raise ValueError("content_type is required")
@@ -831,3 +799,88 @@ class UserDB:
return int(row["count"]) if row else 0
finally:
conn.close()
def list_dismissed_requests(self, *, user_id: int | None, limit: int | None = None) -> List[Dict[str, Any]]:
    """List dismissed requests, optionally scoped by owner user_id."""
    owner_id = normalize_optional_positive_int(user_id, "user_id")
    sql = "SELECT * FROM download_requests WHERE dismissed_at IS NOT NULL"
    bindings: list[Any] = []
    if owner_id is not None:
        sql += " AND user_id = ?"
        bindings.append(owner_id)
    # Most recently dismissed first; id breaks ties deterministically.
    sql += " ORDER BY dismissed_at DESC, id DESC"
    if limit is not None and limit > 0:
        sql += " LIMIT ?"
        bindings.append(limit)
    conn = self._connect()
    try:
        fetched = conn.execute(sql, bindings).fetchall()
        # _parse_request_row may reject malformed rows; drop those silently.
        parsed_rows = (self._parse_request_row(row) for row in fetched)
        return [parsed for parsed in parsed_rows if parsed is not None]
    finally:
        conn.close()
def dismiss_requests_batch(self, *, request_ids: list[int], dismissed_at: str) -> int:
    """Set dismissed_at on multiple requests in a single UPDATE."""
    if not request_ids:
        return 0
    # One placeholder per id keeps the statement fully parameterized.
    marks = ",".join(["?"] * len(request_ids))
    sql = f"UPDATE download_requests SET dismissed_at = ? WHERE id IN ({marks})"
    with self._lock:
        conn = self._connect()
        try:
            cursor = conn.execute(sql, [dismissed_at, *request_ids])
            conn.commit()
            # rowcount can be None or -1 depending on the driver state;
            # normalize both to 0.
            affected = cursor.rowcount or 0
            return max(int(affected), 0)
        finally:
            conn.close()
def clear_request_dismissals(self, *, user_id: int | None) -> int:
    """Clear dismissed_at on requests, optionally scoped by owner user_id."""
    owner_id = normalize_optional_positive_int(user_id, "user_id")
    sql = "UPDATE download_requests SET dismissed_at = NULL WHERE dismissed_at IS NOT NULL"
    bindings: list[Any] = []
    if owner_id is not None:
        sql += " AND user_id = ?"
        bindings.append(owner_id)
    with self._lock:
        conn = self._connect()
        try:
            cursor = conn.execute(sql, bindings)
            conn.commit()
            # rowcount may be None/-1; coerce to a non-negative count.
            affected = cursor.rowcount or 0
            return max(int(affected), 0)
        finally:
            conn.close()
def delete_dismissed_requests(self, *, user_id: int | None) -> int:
    """Delete dismissed terminal requests, optionally scoped by owner user_id."""
    owner_id = normalize_optional_positive_int(user_id, "user_id")
    # Only terminal statuses are eligible for permanent deletion; pending
    # or in-flight requests stay even when dismissed.
    sql = """
        DELETE FROM download_requests
        WHERE dismissed_at IS NOT NULL
        AND status IN ('fulfilled', 'rejected', 'cancelled')
    """
    bindings: list[Any] = []
    if owner_id is not None:
        sql += " AND user_id = ?"
        bindings.append(owner_id)
    with self._lock:
        conn = self._connect()
        try:
            cursor = conn.execute(sql, bindings)
            conn.commit()
            # rowcount may be None/-1; coerce to a non-negative count.
            affected = cursor.rowcount or 0
            return max(int(affected), 0)
        finally:
            conn.close()

View File

@@ -640,10 +640,6 @@ def get_active_downloads() -> List[str]:
"""Get list of currently active downloads."""
return book_queue.get_active_downloads()
def clear_completed(user_id: Optional[int] = None) -> int:
    """Clear completed downloads from tracking (optionally user-scoped).

    Thin facade over the queue. When ``user_id`` is given, only that user's
    terminal entries are cleared; ``None`` appears to clear all users'
    entries (callers pass None for admins — confirm in book_queue).

    Returns:
        Number of entries removed from tracking.
    """
    return book_queue.clear_completed(user_id=user_id)
def _cleanup_progress_tracking(task_id: str) -> None:
"""Clean up progress tracking data for a completed/cancelled download."""
with _progress_lock:

View File

@@ -49,7 +49,7 @@ from shelfmark.core.requests_service import (
reopen_failed_request,
sync_delivery_states_from_queue_status,
)
from shelfmark.core.activity_service import ActivityService, build_download_item_key
from shelfmark.core.download_history_service import DownloadHistoryService
from shelfmark.core.notifications import NotificationContext, NotificationEvent, notify_admin, notify_user
from shelfmark.core.request_helpers import (
coerce_bool,
@@ -127,11 +127,11 @@ import os as _os
from shelfmark.core.user_db import UserDB
_user_db_path = _os.path.join(_os.environ.get("CONFIG_DIR", "/config"), "users.db")
user_db: UserDB | None = None
activity_service: ActivityService | None = None
download_history_service: DownloadHistoryService | None = None
try:
user_db = UserDB(_user_db_path)
user_db.initialize()
activity_service = ActivityService(_user_db_path)
download_history_service = DownloadHistoryService(_user_db_path)
import shelfmark.config.users_settings as _ # noqa: F401 - registers users tab
from shelfmark.core.oidc_routes import register_oidc_routes
from shelfmark.core.admin_routes import register_admin_routes
@@ -146,6 +146,7 @@ except (sqlite3.OperationalError, OSError) as e:
f"Ensure CONFIG_DIR ({_os.environ.get('CONFIG_DIR', '/config')}) exists and is writable."
)
user_db = None
download_history_service = None
# Start download coordinator
backend.start()
@@ -404,14 +405,13 @@ if user_db is not None:
user_db,
resolve_auth_mode=lambda: get_auth_mode(),
queue_release=lambda *args, **kwargs: backend.queue_release(*args, **kwargs),
activity_service=activity_service,
ws_manager=ws_manager,
)
if activity_service is not None:
if download_history_service is not None:
register_activity_routes(
app,
user_db,
activity_service=activity_service,
download_history_service=download_history_service,
resolve_auth_mode=lambda: get_auth_mode(),
resolve_status_scope=lambda: _resolve_status_scope(),
queue_status=lambda user_id=None: backend.queue_status(user_id=user_id),
@@ -1214,24 +1214,28 @@ def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task:
"username": getattr(task, "username", None),
}
snapshot: dict[str, Any] = {"kind": "download", "download": download_payload}
if linked_request is not None:
snapshot["request"] = linked_request
if activity_service is not None:
if download_history_service is not None:
try:
activity_service.record_terminal_snapshot(
download_history_service.record_terminal(
user_id=owner_user_id,
item_type="download",
item_key=build_download_item_key(task_id),
task_id=task_id,
username=normalize_optional_text(getattr(task, "username", None)),
request_id=request_id,
source=normalize_source(getattr(task, "source", None)),
source_display_name=download_payload.get("source_display_name"),
title=str(download_payload.get("title") or getattr(task, "title", "Unknown title")),
author=normalize_optional_text(download_payload.get("author")),
format=normalize_optional_text(download_payload.get("format")),
size=normalize_optional_text(download_payload.get("size")),
preview=normalize_optional_text(download_payload.get("preview")),
content_type=normalize_optional_text(download_payload.get("content_type")),
origin=origin,
final_status=final_status,
snapshot=snapshot,
request_id=request_id,
source_id=task_id,
status_message=normalize_optional_text(download_payload.get("status_message")),
download_path=normalize_optional_text(download_payload.get("download_path")),
)
except Exception as exc:
logger.warning("Failed to record terminal download snapshot for task %s: %s", task_id, exc)
logger.warning("Failed to record terminal download history for task %s: %s", task_id, exc)
if user_db is None or linked_request is None or request_id is None or status != QueueStatus.ERROR:
return
@@ -1362,9 +1366,27 @@ def api_local_download() -> Union[Response, Tuple[Response, int]]:
try:
file_data, book_info = backend.get_book_data(book_id)
if file_data is None:
# Fallback for dismissed/history entries where queue task may no longer exist.
if download_history_service is not None:
is_admin, db_user_id, can_access_status = _resolve_status_scope()
if can_access_status:
history_row = download_history_service.get_by_task_id(book_id)
if history_row is not None:
owner_user_id = history_row.get("user_id")
if is_admin or owner_user_id == db_user_id:
download_path = DownloadHistoryService._resolve_existing_download_path(
history_row.get("download_path")
)
if download_path:
return send_file(
download_path,
download_name=os.path.basename(download_path),
as_attachment=True,
)
# Book data not found or not available
return jsonify({"error": "File not found"}), 404
file_name = book_info.get_filename()
file_name = book_info.get_filename() if book_info is not None else os.path.basename(book_id)
# Prepare the file for sending to the client
data = io.BytesIO(file_data)
return send_file(
@@ -1639,32 +1661,6 @@ def api_active_downloads() -> Union[Response, Tuple[Response, int]]:
logger.error_trace(f"Active downloads error: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/queue/clear', methods=['DELETE'])
@login_required
def api_clear_completed() -> Union[Response, Tuple[Response, int]]:
    """
    Clear all completed, errored, or cancelled books from tracking.
    Returns:
        flask.Response: JSON with count of removed books.
    """
    try:
        is_admin, db_user_id, can_access_status = _resolve_status_scope()
        if not can_access_status:
            return jsonify({"error": "User identity unavailable", "code": "user_identity_unavailable"}), 403
        # Admins clear every user's terminal entries; others only their own.
        removed_count = backend.clear_completed(user_id=None if is_admin else db_user_id)
        if ws_manager:
            # Push the post-clear queue state so connected clients refresh.
            ws_manager.broadcast_status_update(backend.queue_status())
        return jsonify({"status": "cleared", "removed_count": removed_count})
    except Exception as e:
        logger.error_trace(f"Clear completed error: {e}")
        return jsonify({"error": str(e)}), 500
@app.errorhandler(404)
def not_found_error(error: Exception) -> Union[Response, Tuple[Response, int]]:
"""

View File

@@ -135,6 +135,17 @@ type PendingOnBehalfDownload =
actingAsUser: ActingAsUserSelection;
};
const mergeTerminalBucket = (
  persistedBucket: Record<string, Book> | undefined,
  realtimeBucket: Record<string, Book> | undefined
): Record<string, Book> | undefined => {
  // Realtime entries win on key collisions; persisted entries fill the rest.
  const combined: Record<string, Book> = {
    ...(persistedBucket || {}),
    ...(realtimeBucket || {}),
  };
  if (Object.keys(combined).length === 0) {
    // Callers treat "no bucket" and "empty bucket" differently; collapse
    // the empty case to undefined.
    return undefined;
  }
  return combined;
};
function App() {
const { toasts, showToast, removeToast } = useToast();
const { socket } = useSocket();
@@ -227,6 +238,7 @@ function App() {
});
const {
activityStatus,
requestItems,
dismissedActivityKeys,
historyItems,
@@ -286,6 +298,18 @@ function App() {
};
}, [currentStatus, dismissedDownloadTaskIds]);
// Use real-time buckets for active work and merge persisted terminal buckets
// so completed/errored entries survive restarts.
const activitySidebarStatus = useMemo<StatusData>(() => ({
queued: currentStatus.queued,
resolving: currentStatus.resolving,
locating: currentStatus.locating,
downloading: currentStatus.downloading,
complete: mergeTerminalBucket(activityStatus.complete, currentStatus.complete),
error: mergeTerminalBucket(activityStatus.error, currentStatus.error),
cancelled: mergeTerminalBucket(activityStatus.cancelled, currentStatus.cancelled),
}), [activityStatus, currentStatus]);
const showRequestsTab = useMemo(() => {
if (requestRoleIsAdmin) {
return true;
@@ -425,14 +449,14 @@ function App() {
};
const ongoing = [
currentStatus.queued,
currentStatus.resolving,
currentStatus.locating,
currentStatus.downloading,
activitySidebarStatus.queued,
activitySidebarStatus.resolving,
activitySidebarStatus.locating,
activitySidebarStatus.downloading,
].reduce((sum, status) => sum + countVisibleDownloads(status, { filterDismissed: false }), 0);
const completed = countVisibleDownloads(currentStatus.complete, { filterDismissed: true });
const errored = countVisibleDownloads(currentStatus.error, { filterDismissed: true });
const completed = countVisibleDownloads(activitySidebarStatus.complete, { filterDismissed: true });
const errored = countVisibleDownloads(activitySidebarStatus.error, { filterDismissed: true });
const pendingVisibleRequests = requestItems.filter((item) => {
const requestId = item.requestId;
if (!requestId || item.requestRecord?.status !== 'pending') {
@@ -447,7 +471,7 @@ function App() {
errored,
pendingRequests: pendingVisibleRequests,
};
}, [currentStatus, dismissedActivityKeys, requestItems]);
}, [activitySidebarStatus, dismissedActivityKeys, requestItems]);
// Compute visibility states
@@ -1602,7 +1626,7 @@ function App() {
<ActivitySidebar
isOpen={downloadsSidebarOpen}
onClose={() => setDownloadsSidebarOpen(false)}
status={currentStatus}
status={activitySidebarStatus}
isAdmin={requestRoleIsAdmin}
onClearCompleted={handleClearCompleted}
onCancel={handleCancel}

View File

@@ -510,11 +510,14 @@ export const ActivityCard = ({
? 'text-sm leading-tight min-w-0 whitespace-normal break-words'
: 'text-sm truncate leading-tight min-w-0';
const titleNode =
const canShowDownloadLink =
item.kind === 'download' &&
item.visualStatus === 'complete' &&
item.downloadPath &&
item.downloadBookId ? (
Boolean(item.downloadBookId) &&
Boolean(item.downloadPath);
const titleNode =
canShowDownloadLink && item.downloadBookId ? (
<a
href={withBasePath(`/api/localdownload?id=${encodeURIComponent(item.downloadBookId)}`)}
className="text-sky-600 hover:underline"

View File

@@ -28,6 +28,19 @@ const toNumber = (value: unknown, fallback = 0): number => {
return fallback;
};
const toEpochMillis = (value: unknown): number => {
  const numeric = toNumber(value, 0);
  if (numeric <= 0) {
    return 0;
  }
  // Queue `added_time` values are epoch seconds; request/history timestamps are ms.
  // Values in [1e9, 1e12) are treated as seconds and scaled so merged activity
  // items sort consistently.
  const looksLikeSeconds = numeric >= 1_000_000_000 && numeric < 1_000_000_000_000;
  return looksLikeSeconds ? numeric * 1000 : numeric;
};
const toSourceLabel = (value: unknown): string | undefined => {
const text = toOptionalText(value);
if (!text) {
@@ -94,7 +107,7 @@ export const downloadToActivityItem = (book: Book, statusKey: DownloadStatusKey)
statusDetail,
progress,
progressAnimated: isActiveDownloadStatus(visualStatus),
timestamp: toNumber(book.added_time, 0),
timestamp: toEpochMillis(book.added_time),
username: toOptionalText(book.username),
downloadBookId: book.id,
downloadPath: toOptionalText(book.download_path),

View File

@@ -282,20 +282,35 @@ export const useActivity = ({
.map((row) => mapHistoryRowToActivityItem(row, isAdmin ? 'admin' : 'user'))
.sort((left, right) => right.timestamp - left.timestamp);
// Request history entries with attached release metadata are approval artifacts,
// not actual download outcomes. Keep history focused on concrete download results.
const nonAttachedReleaseRequestItems = mappedItems.filter((item) => {
if (item.kind !== 'request') {
return true;
}
const releaseData = item.requestRecord?.release_data;
if (!releaseData || typeof releaseData !== 'object') {
return true;
}
return Object.keys(releaseData as Record<string, unknown>).length === 0;
});
// Download dismissals already carry linked request context; hide redundant
// fulfilled-request history rows that would otherwise appear as "Approved".
const requestIdsWithDownloadRows = new Set<number>();
mappedItems.forEach((item) => {
nonAttachedReleaseRequestItems.forEach((item) => {
if (item.kind === 'download' && typeof item.requestId === 'number') {
requestIdsWithDownloadRows.add(item.requestId);
}
});
if (!requestIdsWithDownloadRows.size) {
return mappedItems;
return nonAttachedReleaseRequestItems;
}
return mappedItems.filter((item) => {
return nonAttachedReleaseRequestItems.filter((item) => {
if (item.kind !== 'request' || typeof item.requestId !== 'number') {
return true;
}
@@ -389,12 +404,16 @@ export const useActivity = ({
const handleClearHistory = useCallback(() => {
resetActivityHistory();
void clearActivityHistory().catch((error) => {
console.error('Clear history failed:', error);
void refreshActivityHistory();
showToast('Failed to clear history', 'error');
});
}, [refreshActivityHistory, resetActivityHistory, showToast]);
void clearActivityHistory()
.then(() => {
void refreshActivitySnapshot();
})
.catch((error) => {
console.error('Clear history failed:', error);
void refreshActivityHistory();
showToast('Failed to clear history', 'error');
});
}, [refreshActivityHistory, refreshActivitySnapshot, resetActivityHistory, showToast]);
return {
activityStatus,

View File

@@ -35,7 +35,6 @@ const API = {
cancelDownload: `${API_BASE}/download`,
retryDownload: `${API_BASE}/download`,
setPriority: `${API_BASE}/queue`,
clearCompleted: `${API_BASE}/queue/clear`,
config: `${API_BASE}/config`,
login: `${API_BASE}/auth/login`,
logout: `${API_BASE}/auth/logout`,
@@ -402,10 +401,6 @@ export const retryDownload = async (id: string): Promise<void> => {
await fetchJSON(`${API.retryDownload}/${encodeURIComponent(id)}/retry`, { method: 'POST' });
};
export const clearCompleted = async (): Promise<void> => {
await fetchJSON(`${API_BASE}/queue/clear`, { method: 'DELETE' });
};
export const getConfig = async (): Promise<AppConfig> => {
return fetchJSON<AppConfig>(API.config);
};
@@ -432,7 +427,6 @@ export interface ActivitySnapshotResponse {
export interface ActivityDismissPayload {
item_type: 'download' | 'request';
item_key: string;
activity_log_id?: number;
}
export interface ActivityHistoryItem {
@@ -440,7 +434,6 @@ export interface ActivityHistoryItem {
user_id: number;
item_type: 'download' | 'request';
item_key: string;
activity_log_id: number | null;
dismissed_at: string;
snapshot: Record<string, unknown> | null;
origin: 'direct' | 'request' | 'requested' | null;

View File

@@ -80,6 +80,17 @@ describe('activityMappers.downloadToActivityItem', () => {
assert.equal(item.timestamp, 123);
});
it('normalizes epoch-second added_time values to milliseconds', () => {
const item = downloadToActivityItem(
makeBook({
added_time: 1700000000,
}),
'complete'
);
assert.equal(item.timestamp, 1700000000000);
});
it('maps downloading progress using 20 + progress*0.8', () => {
const item = downloadToActivityItem(makeBook({ progress: 60 }), 'downloading');
assert.equal(item.visualStatus, 'downloading');

View File

@@ -254,10 +254,8 @@ docker ps | grep -E "qbittorrent|transmission|deluge|nzbget|sabnzbd"
```
### Stale test data
Restart the container to reset the in-memory queue between test runs:
```bash
# Clear the queue between test runs
docker exec test-cwabd python3 -c "
from shelfmark.core.queue import book_queue
book_queue.clear()
"
docker restart test-cwabd
```

View File

@@ -39,6 +39,41 @@ def _create_user(main_module, *, prefix: str, role: str = "user") -> dict:
return main_module.user_db.create_user(username=username, role=role)
def _record_terminal_download(
    main_module,
    *,
    task_id: str,
    user_id: int | None,
    username: str | None,
    title: str = "Recorded Download",
    author: str = "Recorded Author",
    source: str = "direct_download",
    source_display_name: str = "Direct Download",
    origin: str = "direct",
    final_status: str = "complete",
    request_id: int | None = None,
    status_message: str | None = None,
) -> None:
    """Insert a terminal download-history row via the app's history service.

    Fixed format/size/content_type values keep individual tests focused on
    the fields they actually vary.
    """
    record_kwargs = dict(
        task_id=task_id,
        user_id=user_id,
        username=username,
        request_id=request_id,
        source=source,
        source_display_name=source_display_name,
        title=title,
        author=author,
        format="epub",
        size="1 MB",
        preview=None,
        content_type="ebook",
        origin=origin,
        final_status=final_status,
        status_message=status_message,
        download_path=None,
    )
    main_module.download_history_service.record_terminal(**record_kwargs)
def _sample_status_payload() -> dict:
return {
"queued": {},
@@ -87,14 +122,15 @@ class TestActivityRoutes:
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
main_module.activity_service.record_terminal_snapshot(
_record_terminal_download(
main_module,
task_id="test-task",
user_id=user["id"],
item_type="download",
item_key="download:test-task",
username=user["username"],
title="Dismiss Me",
origin="requested",
final_status="complete",
source_id="test-task",
snapshot={"title": "Dismiss Me"},
request_id=12,
status_message="Complete",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
@@ -116,7 +152,8 @@ class TestActivityRoutes:
assert history_response.status_code == 200
assert len(history_response.json) == 1
assert history_response.json[0]["item_key"] == "download:test-task"
assert history_response.json[0]["snapshot"] == {"title": "Dismiss Me"}
assert history_response.json[0]["snapshot"]["kind"] == "download"
assert history_response.json[0]["snapshot"]["download"]["title"] == "Dismiss Me"
assert clear_history_response.status_code == 200
assert clear_history_response.json["status"] == "cleared"
@@ -125,10 +162,62 @@ class TestActivityRoutes:
assert history_after_clear.status_code == 200
assert history_after_clear.json == []
def test_clear_history_deletes_dismissed_requests_from_snapshot(self, main_module, client):
    """A dismissed terminal request is removed by DELETE /api/activity/history
    and no longer appears in the activity snapshot afterwards."""
    user = _create_user(main_module, prefix="reader")
    _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
    # Terminal (rejected) request: eligible for dismissal and history cleanup.
    request_row = main_module.user_db.create_request(
        user_id=user["id"],
        content_type="ebook",
        request_level="book",
        policy_mode="request_book",
        book_data={
            "title": "Dismissed Request",
            "author": "Request Author",
            "provider": "openlibrary",
            "provider_id": "dismissed-request",
        },
        status="rejected",
    )
    request_key = f"request:{request_row['id']}"
    with patch.object(main_module, "get_auth_mode", return_value="builtin"):
        dismiss_response = client.post(
            "/api/activity/dismiss",
            json={"item_type": "request", "item_key": request_key},
        )
        history_before_clear = client.get("/api/activity/history?limit=10&offset=0")
        clear_history_response = client.delete("/api/activity/history")
        history_after_clear = client.get("/api/activity/history?limit=10&offset=0")
        # Snapshot endpoint consults the live queue; stub it with a static payload.
        with patch.object(main_module.backend, "queue_status", return_value=_sample_status_payload()):
            snapshot_after_clear = client.get("/api/activity/snapshot")
    assert dismiss_response.status_code == 200
    assert history_before_clear.status_code == 200
    assert any(row["item_key"] == request_key for row in history_before_clear.json)
    assert clear_history_response.status_code == 200
    assert clear_history_response.json["status"] == "cleared"
    assert history_after_clear.status_code == 200
    assert history_after_clear.json == []
    assert snapshot_after_clear.status_code == 200
    # The cleared request must be gone from both the requests list and the
    # dismissed-keys list of the snapshot.
    assert all(row["id"] != request_row["id"] for row in snapshot_after_clear.json["requests"])
    assert {"item_type": "request", "item_key": request_key} not in snapshot_after_clear.json["dismissed"]
def test_admin_snapshot_includes_admin_viewer_dismissals(self, main_module, client):
admin = _create_user(main_module, prefix="admin", role="admin")
_set_session(client, user_id=admin["username"], db_user_id=admin["id"], is_admin=True)
_record_terminal_download(
main_module,
task_id="admin-visible-task",
user_id=admin["id"],
username=admin["username"],
title="Admin Visible",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
dismiss_response = client.post(
"/api/activity/dismiss",
@@ -144,6 +233,52 @@ class TestActivityRoutes:
"item_key": "download:admin-visible-task",
} in snapshot_response.json["dismissed"]
def test_localdownload_falls_back_to_download_history_file(self, main_module, client, tmp_path):
    """/api/localdownload serves the file from download_history when the
    queue no longer has the task (the history fallback path)."""
    user = _create_user(main_module, prefix="reader")
    _set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
    task_id = "history-localdownload-task"
    file_path = tmp_path / "history-fallback.epub"
    file_bytes = b"history download payload"
    file_path.write_bytes(file_bytes)
    _record_terminal_download(
        main_module,
        task_id=task_id,
        user_id=user["id"],
        username=user["username"],
        title="History Local Download",
    )
    row = main_module.download_history_service.get_by_task_id(task_id)
    assert row is not None
    assert main_module.download_history_service is not None
    # Re-record the same task_id with download_path set — presumably
    # record_terminal upserts on the unique task_id (confirm in service).
    main_module.download_history_service.record_terminal(
        task_id=task_id,
        user_id=user["id"],
        username=user["username"],
        request_id=row.get("request_id"),
        source=row.get("source") or "direct_download",
        source_display_name=row.get("source_display_name"),
        title=row.get("title") or "History Local Download",
        author=row.get("author"),
        format=row.get("format"),
        size=row.get("size"),
        preview=row.get("preview"),
        content_type=row.get("content_type"),
        origin=row.get("origin") or "direct",
        final_status=row.get("final_status") or "complete",
        status_message=row.get("status_message"),
        download_path=str(file_path),
    )
    with patch.object(main_module, "get_auth_mode", return_value="builtin"):
        response = client.get(f"/api/localdownload?id={task_id}")
    assert response.status_code == 200
    assert response.data == file_bytes
    # The fallback must still deliver as an attachment, like queue-served files.
    assert "attachment" in response.headers.get("Content-Disposition", "").lower()
def test_dismiss_legacy_fulfilled_request_creates_minimal_history_snapshot(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
@@ -198,13 +333,19 @@ class TestActivityRoutes:
def test_dismiss_emits_activity_update_to_user_room(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
_record_terminal_download(
main_module,
task_id="emit-task",
user_id=user["id"],
username=user["username"],
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.ws_manager, "is_enabled", return_value=True):
with patch.object(main_module.ws_manager.socketio, "emit") as mock_emit:
response = client.post(
"/api/activity/dismiss",
json={"item_type": "download", "item_key": "download:test-task"},
json={"item_type": "download", "item_key": "download:emit-task"},
)
assert response.status_code == 200
@@ -215,7 +356,15 @@ class TestActivityRoutes:
)
def test_no_auth_dismiss_many_and_history_use_shared_identity(self, main_module):
item_key = f"download:no-auth-{uuid.uuid4().hex[:10]}"
task_id = f"no-auth-{uuid.uuid4().hex[:10]}"
item_key = f"download:{task_id}"
_record_terminal_download(
main_module,
task_id=task_id,
user_id=None,
username=None,
title="No Auth",
)
client_one = main_module.app.test_client()
client_two = main_module.app.test_client()
@@ -247,7 +396,16 @@ class TestActivityRoutes:
stale_db_user_id = 999999999
_set_session(client, user_id="stale-session-user", db_user_id=stale_db_user_id, is_admin=False)
item_key = f"download:no-auth-stale-{uuid.uuid4().hex[:8]}"
task_id = f"no-auth-stale-{uuid.uuid4().hex[:8]}"
item_key = f"download:{task_id}"
_record_terminal_download(
main_module,
task_id=task_id,
user_id=None,
username=None,
title="No Auth Stale",
)
with patch.object(main_module, "get_auth_mode", return_value="none"):
response = client.post(
"/api/activity/dismiss-many",
@@ -257,10 +415,8 @@ class TestActivityRoutes:
assert response.status_code == 200
assert response.json["status"] == "dismissed"
no_auth_user = main_module.user_db.get_user(username="__shelfmark_noauth_activity__")
assert no_auth_user is not None
dismissals = main_module.activity_service.get_dismissal_set(no_auth_user["id"])
assert {"item_type": "download", "item_key": item_key} in dismissals
dismissals = main_module.download_history_service.get_dismissed_keys(user_id=None)
assert task_id in dismissals
def test_no_auth_dismiss_many_uses_shared_identity_even_with_valid_session_db_user(
self,
@@ -275,7 +431,15 @@ class TestActivityRoutes:
is_admin=False,
)
item_key = f"download:no-auth-valid-{uuid.uuid4().hex[:8]}"
task_id = f"no-auth-valid-{uuid.uuid4().hex[:8]}"
item_key = f"download:{task_id}"
_record_terminal_download(
main_module,
task_id=task_id,
user_id=None,
username=None,
title="No Auth Valid",
)
other_client = main_module.app.test_client()
with patch.object(main_module, "get_auth_mode", return_value="none"):
@@ -302,61 +466,18 @@ class TestActivityRoutes:
assert response.status_code == 403
assert response.json["code"] == "user_identity_unavailable"
def test_queue_clear_does_not_set_request_delivery_state_to_cleared(self, main_module, client):
def test_snapshot_backfills_undismissed_terminal_download_from_download_history(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
request_row = main_module.user_db.create_request(
_record_terminal_download(
main_module,
task_id="expired-task-1",
user_id=user["id"],
content_type="ebook",
request_level="release",
policy_mode="request_release",
book_data={
"title": "Queue Clear Book",
"author": "Queue Clear Author",
"provider": "openlibrary",
"provider_id": "clear-1",
},
release_data={
"source": "prowlarr",
"source_id": "clear-task-1",
"title": "Queue Clear Book.epub",
},
status="fulfilled",
delivery_state="complete",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.backend, "clear_completed", return_value=1):
response = client.delete("/api/queue/clear")
assert response.status_code == 200
updated_request = main_module.user_db.get_request(request_row["id"])
assert updated_request is not None
assert updated_request["delivery_state"] == "complete"
def test_snapshot_backfills_undismissed_terminal_download_from_activity_log(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
main_module.activity_service.record_terminal_snapshot(
user_id=user["id"],
item_type="download",
item_key="download:expired-task-1",
origin="direct",
final_status="complete",
source_id="expired-task-1",
snapshot={
"kind": "download",
"download": {
"id": "expired-task-1",
"title": "Expired Task",
"author": "Expired Author",
"added_time": 123,
"status_message": "Finished",
"source": "direct_download",
},
},
username=user["username"],
title="Expired Task",
author="Expired Author",
status_message="Finished",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
@@ -372,25 +493,16 @@ class TestActivityRoutes:
request_owner = _create_user(main_module, prefix="reader")
_set_session(client, user_id=admin["username"], db_user_id=admin["id"], is_admin=True)
main_module.activity_service.record_terminal_snapshot(
_record_terminal_download(
main_module,
task_id="cross-user-expired-task",
user_id=request_owner["id"],
item_type="download",
item_key="download:cross-user-expired-task",
username=request_owner["username"],
title="Cross User Task",
author="Another User",
origin="requested",
final_status="complete",
source_id="cross-user-expired-task",
snapshot={
"kind": "download",
"download": {
"id": "cross-user-expired-task",
"title": "Cross User Task",
"author": "Another User",
"added_time": 123,
"status_message": "Finished",
"source": "direct_download",
"user_id": request_owner["id"],
},
},
request_id=123,
status_message="Finished",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
@@ -405,6 +517,14 @@ class TestActivityRoutes:
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
_record_terminal_download(
main_module,
task_id="task-reused-1",
user_id=user["id"],
username=user["username"],
title="Reused Task",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
dismiss_response = client.post(
"/api/activity/dismiss",
@@ -431,12 +551,20 @@ class TestActivityRoutes:
"item_type": "download",
"item_key": "download:task-reused-1",
} not in snapshot_response.json["dismissed"]
assert main_module.activity_service.get_dismissal_set(user["id"]) == []
assert "task-reused-1" not in main_module.download_history_service.get_dismissed_keys(user_id=user["id"])
def test_dismiss_state_is_isolated_per_user(self, main_module, client):
user_one = _create_user(main_module, prefix="reader-one")
user_two = _create_user(main_module, prefix="reader-two")
_record_terminal_download(
main_module,
task_id="shared-task",
user_id=user_one["id"],
username=user_one["username"],
title="Shared Task",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
_set_session(client, user_id=user_one["username"], db_user_id=user_one["id"], is_admin=False)
dismiss_response = client.post(
@@ -457,12 +585,26 @@ class TestActivityRoutes:
def test_admin_request_dismissal_is_shared_across_admin_users(self, main_module, client):
admin_one = _create_user(main_module, prefix="admin-one", role="admin")
admin_two = _create_user(main_module, prefix="admin-two", role="admin")
request_owner = _create_user(main_module, prefix="request-owner")
request_row = main_module.user_db.create_request(
user_id=request_owner["id"],
content_type="ebook",
request_level="book",
policy_mode="request_book",
book_data={
"title": "Dismiss Me Request",
"author": "Request Author",
"provider": "openlibrary",
"provider_id": "dismiss-request-1",
},
status="rejected",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
_set_session(client, user_id=admin_one["username"], db_user_id=admin_one["id"], is_admin=True)
dismiss_response = client.post(
"/api/activity/dismiss",
json={"item_type": "request", "item_key": "request:999999"},
json={"item_type": "request", "item_key": f"request:{request_row['id']}"},
)
assert dismiss_response.status_code == 200
@@ -472,31 +614,25 @@ class TestActivityRoutes:
history_response = client.get("/api/activity/history?limit=50&offset=0")
assert snapshot_response.status_code == 200
assert {"item_type": "request", "item_key": "request:999999"} in snapshot_response.json["dismissed"]
assert {"item_type": "request", "item_key": f"request:{request_row['id']}"} in snapshot_response.json["dismissed"]
assert history_response.status_code == 200
assert any(row["item_key"] == "request:999999" for row in history_response.json)
assert any(row["item_key"] == f"request:{request_row['id']}" for row in history_response.json)
def test_history_paging_is_stable_and_non_overlapping(self, main_module, client):
user = _create_user(main_module, prefix="history-user")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
for index in range(5):
item_key = f"download:history-task-{index}"
main_module.activity_service.record_terminal_snapshot(
task_id = f"history-task-{index}"
_record_terminal_download(
main_module,
task_id=task_id,
user_id=user["id"],
item_type="download",
item_key=item_key,
origin="direct",
final_status="complete",
source_id=f"history-task-{index}",
snapshot={"kind": "download", "download": {"id": f"history-task-{index}"}},
)
main_module.activity_service.dismiss_item(
user_id=user["id"],
item_type="download",
item_key=item_key,
username=user["username"],
title=f"History Task {index}",
)
main_module.download_history_service.dismiss(task_id=task_id, user_id=user["id"])
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
page_one = client.get("/api/activity/history?limit=2&offset=0")
@@ -523,6 +659,12 @@ class TestActivityRoutes:
def test_dismiss_many_emits_activity_update_only_to_acting_user_room(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
_record_terminal_download(
main_module,
task_id="test-task-many",
user_id=user["id"],
username=user["username"],
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.ws_manager, "is_enabled", return_value=True):
@@ -546,11 +688,13 @@ class TestActivityRoutes:
def test_clear_history_emits_activity_update_only_to_acting_user_room(self, main_module, client):
user = _create_user(main_module, prefix="reader")
_set_session(client, user_id=user["username"], db_user_id=user["id"], is_admin=False)
main_module.activity_service.dismiss_item(
_record_terminal_download(
main_module,
task_id="history-clear-task",
user_id=user["id"],
item_type="download",
item_key="download:history-clear-task",
username=user["username"],
)
main_module.download_history_service.dismiss(task_id="history-clear-task", user_id=user["id"])
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.ws_manager, "is_enabled", return_value=True):

View File

@@ -1,295 +0,0 @@
"""Tests for activity service persistence helpers."""
from __future__ import annotations
import os
import tempfile
import pytest
from shelfmark.core.activity_service import (
ActivityService,
build_download_item_key,
build_item_key,
build_request_item_key,
)
from shelfmark.core.user_db import UserDB
@pytest.fixture
def db_path():
with tempfile.TemporaryDirectory() as tmpdir:
yield os.path.join(tmpdir, "users.db")
@pytest.fixture
def user_db(db_path):
db = UserDB(db_path)
db.initialize()
return db
@pytest.fixture
def activity_service(db_path):
return ActivityService(db_path)
class TestItemKeys:
def test_build_request_item_key(self):
assert build_request_item_key(42) == "request:42"
assert build_item_key("request", 7) == "request:7"
def test_build_download_item_key(self):
assert build_download_item_key("abc123") == "download:abc123"
assert build_item_key("download", "xyz") == "download:xyz"
def test_build_item_key_validation(self):
with pytest.raises(ValueError):
build_item_key("bad", "x")
with pytest.raises(ValueError):
build_item_key("request", "nope")
with pytest.raises(ValueError):
build_item_key("download", "")
class TestActivityService:
def test_record_snapshot_and_dismiss_and_history(self, user_db, activity_service):
user = user_db.create_user(username="activity-user")
snapshot = activity_service.record_terminal_snapshot(
user_id=user["id"],
item_type="download",
item_key="download:task-1",
origin="requested",
final_status="complete",
request_id=12,
source_id="task-1",
snapshot={"title": "My Book", "status": "complete"},
)
assert snapshot["item_type"] == "download"
assert snapshot["item_key"] == "download:task-1"
assert snapshot["origin"] == "requested"
assert snapshot["final_status"] == "complete"
dismissal = activity_service.dismiss_item(
user_id=user["id"],
item_type="download",
item_key="download:task-1",
)
assert dismissal["item_type"] == "download"
assert dismissal["item_key"] == "download:task-1"
assert dismissal["activity_log_id"] == snapshot["id"]
dismissed_set = activity_service.get_dismissal_set(user["id"])
assert dismissed_set == [{"item_type": "download", "item_key": "download:task-1"}]
history = activity_service.get_history(user["id"], limit=10, offset=0)
assert len(history) == 1
assert history[0]["item_type"] == "download"
assert history[0]["item_key"] == "download:task-1"
assert history[0]["origin"] == "requested"
assert history[0]["final_status"] == "complete"
assert history[0]["snapshot"] == {"title": "My Book", "status": "complete"}
def test_history_hydrates_legacy_request_dismissals_without_snapshot(self, user_db, activity_service):
user = user_db.create_user(username="legacy-reader")
request_row = user_db.create_request(
user_id=user["id"],
content_type="ebook",
request_level="book",
policy_mode="request_book",
book_data={
"title": "Legacy Request",
"author": "Legacy Author",
"provider": "openlibrary",
"provider_id": "legacy-hydrate-1",
},
status="fulfilled",
delivery_state="unknown",
)
activity_service.dismiss_item(
user_id=user["id"],
item_type="request",
item_key=f"request:{request_row['id']}",
)
history = activity_service.get_history(user["id"], limit=10, offset=0)
assert len(history) == 1
assert history[0]["item_type"] == "request"
assert history[0]["item_key"] == f"request:{request_row['id']}"
assert history[0]["origin"] == "request"
assert history[0]["final_status"] == "complete"
assert history[0]["snapshot"] == {
"kind": "request",
"request": {
"id": request_row["id"],
"user_id": user["id"],
"status": "fulfilled",
"delivery_state": "unknown",
"request_level": "book",
"book_data": {
"title": "Legacy Request",
"author": "Legacy Author",
"provider": "openlibrary",
"provider_id": "legacy-hydrate-1",
},
"release_data": {},
"note": None,
"admin_note": None,
"created_at": request_row["created_at"],
"updated_at": request_row["created_at"],
},
}
def test_dismiss_many_and_clear_history(self, user_db, activity_service):
alice = user_db.create_user(username="alice")
bob = user_db.create_user(username="bob")
activity_service.record_terminal_snapshot(
user_id=alice["id"],
item_type="request",
item_key="request:10",
origin="request",
final_status="rejected",
request_id=10,
snapshot={"title": "Rejected Book"},
)
activity_service.record_terminal_snapshot(
user_id=alice["id"],
item_type="download",
item_key="download:task-2",
origin="direct",
final_status="error",
source_id="task-2",
snapshot={"title": "Failed Download"},
)
dismissed_count = activity_service.dismiss_many(
user_id=alice["id"],
items=[
{"item_type": "request", "item_key": "request:10"},
{"item_type": "download", "item_key": "download:task-2"},
],
)
assert dismissed_count == 2
# Bob has independent dismiss state.
activity_service.dismiss_item(
user_id=bob["id"],
item_type="request",
item_key="request:10",
)
alice_history = activity_service.get_history(alice["id"])
bob_history = activity_service.get_history(bob["id"])
assert len(alice_history) == 2
assert len(bob_history) == 1
cleared = activity_service.clear_history(alice["id"])
assert cleared == 2
assert activity_service.get_history(alice["id"]) == []
assert len(activity_service.get_history(bob["id"])) == 1
def test_get_undismissed_terminal_downloads_returns_latest_per_item_and_excludes_dismissed(
self,
user_db,
activity_service,
):
user = user_db.create_user(username="snapshot-user")
activity_service.record_terminal_snapshot(
user_id=user["id"],
item_type="download",
item_key="download:task-1",
origin="direct",
final_status="error",
source_id="task-1",
terminal_at="2026-01-01T10:00:00+00:00",
snapshot={"kind": "download", "download": {"id": "task-1", "status_message": "failed"}},
)
activity_service.record_terminal_snapshot(
user_id=user["id"],
item_type="download",
item_key="download:task-1",
origin="direct",
final_status="complete",
source_id="task-1",
terminal_at="2026-01-01T11:00:00+00:00",
snapshot={"kind": "download", "download": {"id": "task-1", "status_message": "done"}},
)
activity_service.record_terminal_snapshot(
user_id=user["id"],
item_type="download",
item_key="download:task-2",
origin="direct",
final_status="cancelled",
source_id="task-2",
terminal_at="2026-01-01T09:00:00+00:00",
snapshot={"kind": "download", "download": {"id": "task-2", "status_message": "stopped"}},
)
activity_service.dismiss_item(
user_id=user["id"],
item_type="download",
item_key="download:task-2",
)
rows = activity_service.get_undismissed_terminal_downloads(
user["id"],
owner_user_id=user["id"],
)
assert len(rows) == 1
assert rows[0]["item_key"] == "download:task-1"
assert rows[0]["final_status"] == "complete"
assert rows[0]["snapshot"] == {
"kind": "download",
"download": {"id": "task-1", "status_message": "done"},
}
def test_get_undismissed_terminal_downloads_can_span_owners_for_admin_viewer(
self,
user_db,
activity_service,
):
viewer = user_db.create_user(username="admin-viewer", role="admin")
owner_one = user_db.create_user(username="owner-one")
owner_two = user_db.create_user(username="owner-two")
activity_service.record_terminal_snapshot(
user_id=owner_one["id"],
item_type="download",
item_key="download:owner-one-task",
origin="direct",
final_status="complete",
source_id="owner-one-task",
terminal_at="2026-01-01T10:00:00+00:00",
snapshot={"kind": "download", "download": {"id": "owner-one-task"}},
)
activity_service.record_terminal_snapshot(
user_id=owner_two["id"],
item_type="download",
item_key="download:owner-two-task",
origin="direct",
final_status="complete",
source_id="owner-two-task",
terminal_at="2026-01-01T11:00:00+00:00",
snapshot={"kind": "download", "download": {"id": "owner-two-task"}},
)
activity_service.dismiss_item(
user_id=viewer["id"],
item_type="download",
item_key="download:owner-two-task",
)
all_owner_rows = activity_service.get_undismissed_terminal_downloads(
viewer["id"],
owner_user_id=None,
)
assert [row["item_key"] for row in all_owner_rows] == ["download:owner-one-task"]
owner_one_rows = activity_service.get_undismissed_terminal_downloads(
viewer["id"],
owner_user_id=owner_one["id"],
)
assert [row["item_key"] for row in owner_one_rows] == ["download:owner-one-task"]

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import importlib
import json
import uuid
from unittest.mock import patch
@@ -28,19 +27,19 @@ def _create_user(main_module, *, prefix: str) -> dict:
return main_module.user_db.create_user(username=username, role="user")
def _read_activity_log_row(main_module, snapshot_id: int):
def _read_download_history_row(main_module, task_id: str):
conn = main_module.user_db._connect()
try:
return conn.execute(
"SELECT * FROM activity_log WHERE id = ?",
(snapshot_id,),
"SELECT * FROM download_history WHERE task_id = ?",
(task_id,),
).fetchone()
finally:
conn.close()
class TestTerminalSnapshotCapture:
def test_complete_transition_records_direct_snapshot_and_survives_queue_clear(self, main_module):
def test_complete_transition_records_direct_snapshot(self, main_module):
user = _create_user(main_module, prefix="snap-direct")
task_id = f"direct-{uuid.uuid4().hex[:8]}"
task = DownloadTask(
@@ -54,25 +53,15 @@ class TestTerminalSnapshotCapture:
try:
main_module.backend.book_queue.update_status(task_id, QueueStatus.COMPLETE)
item_key = f"download:{task_id}"
snapshot_id = main_module.activity_service.get_latest_activity_log_id(
item_type="download",
item_key=item_key,
)
assert snapshot_id is not None
row = _read_download_history_row(main_module, task_id)
assert row is not None
removed = main_module.backend.book_queue.clear_completed(user_id=user["id"])
assert removed >= 1
row = _read_activity_log_row(main_module, snapshot_id)
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["user_id"] == user["id"]
assert row["item_key"] == item_key
assert row["task_id"] == task_id
assert row["origin"] == "direct"
assert row["final_status"] == "complete"
snapshot = json.loads(row["snapshot_json"])
assert snapshot["kind"] == "download"
assert snapshot["download"]["id"] == task_id
finally:
main_module.backend.book_queue.cancel_download(task_id)
@@ -109,20 +98,11 @@ class TestTerminalSnapshotCapture:
try:
main_module.backend.book_queue.update_status(task_id, QueueStatus.COMPLETE)
snapshot_id = main_module.activity_service.get_latest_activity_log_id(
item_type="download",
item_key=f"download:{task_id}",
)
assert snapshot_id is not None
row = _read_activity_log_row(main_module, snapshot_id)
row = _read_download_history_row(main_module, task_id)
assert row is not None
assert row["origin"] == "requested"
assert row["request_id"] == request_row["id"]
assert row["source_id"] == task_id
snapshot = json.loads(row["snapshot_json"])
assert snapshot["download"]["id"] == task_id
assert snapshot["request"]["id"] == request_row["id"]
assert row["task_id"] == task_id
finally:
main_module.backend.book_queue.cancel_download(task_id)
@@ -143,16 +123,9 @@ class TestTerminalSnapshotCapture:
main_module.backend.book_queue.update_status_message(task_id, "Moving file")
main_module.backend.update_download_status(task_id, "complete", "Complete")
snapshot_id = main_module.activity_service.get_latest_activity_log_id(
item_type="download",
item_key=f"download:{task_id}",
)
assert snapshot_id is not None
row = _read_activity_log_row(main_module, snapshot_id)
row = _read_download_history_row(main_module, task_id)
assert row is not None
snapshot = json.loads(row["snapshot_json"])
assert snapshot["download"]["status_message"] == "Complete"
assert row["status_message"] == "Complete"
finally:
main_module.backend.book_queue.cancel_download(task_id)

View File

@@ -106,20 +106,6 @@ class TestAdminUsersListEndpoint:
users = resp.json
assert "password_hash" not in users[0]
def test_list_users_hides_internal_no_auth_activity_user(self, admin_client, user_db):
user_db.create_user(
username="__shelfmark_noauth_activity__",
display_name="No-auth Activity",
role="admin",
)
user_db.create_user(username="alice", email="alice@example.com")
resp = admin_client.get("/api/admin/users")
assert resp.status_code == 200
usernames = [u["username"] for u in resp.json]
assert "__shelfmark_noauth_activity__" not in usernames
assert "alice" in usernames
def test_list_users_includes_auth_source_and_is_active(self, admin_client, user_db):
user_db.create_user(username="local_user", auth_source="builtin")
user_db.create_user(

View File

@@ -117,47 +117,6 @@ class TestQueueFilterByUser:
assert "book-1" not in all_tasks
assert "book-2" in all_tasks
def test_clear_completed_for_user_only_removes_user_terminal_tasks(self):
q = BookQueue()
q.add(self._make_task("book-1", user_id=1))
q.add(self._make_task("book-2", user_id=2))
q.add(self._make_task("book-3", user_id=1))
q.update_status("book-1", QueueStatus.COMPLETE)
q.update_status("book-2", QueueStatus.ERROR)
q.update_status("book-3", QueueStatus.QUEUED)
removed = q.clear_completed(user_id=1)
assert removed == 1
status = q.get_status()
all_tasks = {}
for tasks_by_status in status.values():
all_tasks.update(tasks_by_status)
assert "book-1" not in all_tasks
assert "book-2" in all_tasks
assert "book-3" in all_tasks
def test_clear_completed_for_user_excludes_legacy_tasks(self):
q = BookQueue()
q.add(self._make_task("legacy-book", user_id=None))
q.add(self._make_task("user-book", user_id=1))
q.update_status("legacy-book", QueueStatus.COMPLETE)
q.update_status("user-book", QueueStatus.COMPLETE)
removed = q.clear_completed(user_id=1)
assert removed == 1
status = q.get_status()
all_tasks = {}
for tasks_by_status in status.values():
all_tasks.update(tasks_by_status)
assert "legacy-book" in all_tasks
assert "user-book" not in all_tasks
def test_enqueue_existing_deduplicates_queue_entries(self):
q = BookQueue()
q.add(self._make_task("book-1", user_id=1))

View File

@@ -59,14 +59,6 @@ def _policy(
}
def _read_activity_log_row(main_module, snapshot_id: int):
conn = main_module.user_db._connect()
try:
return conn.execute("SELECT * FROM activity_log WHERE id = ?", (snapshot_id,)).fetchone()
finally:
conn.close()
class TestDownloadPolicyGuards:
def test_download_endpoint_blocks_before_queue_when_policy_requires_request(self, main_module, client):
user = _create_user(main_module, prefix="reader")
@@ -221,17 +213,10 @@ class TestRequestRoutes:
assert cancel_resp.status_code == 200
assert cancel_resp.json["status"] == "cancelled"
snapshot_id = main_module.activity_service.get_latest_activity_log_id(
item_type="request",
item_key=f"request:{request_id}",
)
assert snapshot_id is not None
log_row = _read_activity_log_row(main_module, snapshot_id)
assert log_row is not None
assert log_row["user_id"] == user["id"]
assert log_row["final_status"] == "cancelled"
assert log_row["origin"] == "request"
updated = main_module.user_db.get_request(request_id)
assert updated is not None
assert updated["user_id"] == user["id"]
assert updated["status"] == "cancelled"
def test_create_request_emits_websocket_events(self, main_module, client):
user = _create_user(main_module, prefix="reader")
@@ -652,17 +637,10 @@ class TestRequestRoutes:
assert reject_resp.json["status"] == "rejected"
assert reject_again_resp.status_code == 409
assert reject_again_resp.json["code"] == "stale_transition"
snapshot_id = main_module.activity_service.get_latest_activity_log_id(
item_type="request",
item_key=f"request:{request_id}",
)
assert snapshot_id is not None
log_row = _read_activity_log_row(main_module, snapshot_id)
assert log_row is not None
assert log_row["user_id"] == user["id"]
assert log_row["final_status"] == "rejected"
assert log_row["origin"] == "request"
updated = main_module.user_db.get_request(request_id)
assert updated is not None
assert updated["user_id"] == user["id"]
assert updated["status"] == "rejected"
def test_admin_reject_emits_update_to_user_and_admin_rooms(self, main_module, client):
user = _create_user(main_module, prefix="reader")
@@ -1986,122 +1964,3 @@ class TestDownloadPolicyGuardsExtended:
assert resp.status_code == 200
def test_clear_queue_does_not_mutate_fulfilled_request_delivery_state(main_module, client):
user = _create_user(main_module, prefix="reader")
admin = _create_user(main_module, prefix="admin", role="admin")
_set_session(client, user_id=admin["username"], db_user_id=admin["id"], is_admin=True)
created = main_module.user_db.create_request(
user_id=user["id"],
content_type="ebook",
request_level="release",
policy_mode="request_release",
book_data={
"title": "Clear Delivery State",
"author": "QA",
"content_type": "ebook",
"provider": "openlibrary",
"provider_id": "ol-clear-delivery",
},
release_data={
"source": "prowlarr",
"source_id": "clear-delivery-source-id",
"title": "Clear Delivery State.epub",
},
status="fulfilled",
delivery_state="complete",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.ws_manager, "is_enabled", return_value=False):
with patch.object(main_module.ws_manager, "broadcast_status_update"):
with patch.object(main_module.backend, "queue_status", return_value={}) as mock_queue_status:
with patch.object(main_module.backend, "clear_completed", return_value=1) as mock_clear_completed:
resp = client.delete("/api/queue/clear")
assert resp.status_code == 200
assert resp.json["status"] == "cleared"
assert resp.json["removed_count"] == 1
assert mock_queue_status.call_args_list[0].kwargs == {}
mock_clear_completed.assert_called_once_with(user_id=None)
refreshed = main_module.user_db.get_request(created["id"])
assert refreshed["delivery_state"] == "complete"
def test_non_admin_clear_queue_is_scoped_without_mutating_request_delivery_state(main_module, client):
alice = _create_user(main_module, prefix="alice")
bob = _create_user(main_module, prefix="bob")
_set_session(client, user_id=alice["username"], db_user_id=alice["id"], is_admin=False)
alice_request = main_module.user_db.create_request(
user_id=alice["id"],
content_type="ebook",
request_level="release",
policy_mode="request_release",
book_data={
"title": "Alice Clear Scope",
"author": "QA",
"content_type": "ebook",
"provider": "openlibrary",
"provider_id": "ol-alice-scope",
},
release_data={
"source": "prowlarr",
"source_id": "shared-clear-scope-source-id",
"title": "Alice Scope.epub",
},
status="fulfilled",
delivery_state="complete",
)
bob_request = main_module.user_db.create_request(
user_id=bob["id"],
content_type="ebook",
request_level="release",
policy_mode="request_release",
book_data={
"title": "Bob Clear Scope",
"author": "QA",
"content_type": "ebook",
"provider": "openlibrary",
"provider_id": "ol-bob-scope",
},
release_data={
"source": "prowlarr",
"source_id": "shared-clear-scope-source-id",
"title": "Bob Scope.epub",
},
status="fulfilled",
delivery_state="complete",
)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.ws_manager, "is_enabled", return_value=False):
with patch.object(main_module.ws_manager, "broadcast_status_update"):
with patch.object(main_module.backend, "queue_status", return_value={}) as mock_queue_status:
with patch.object(main_module.backend, "clear_completed", return_value=1) as mock_clear_completed:
resp = client.delete("/api/queue/clear")
assert resp.status_code == 200
assert resp.json["status"] == "cleared"
assert resp.json["removed_count"] == 1
assert mock_queue_status.call_args_list[0].kwargs == {}
mock_clear_completed.assert_called_once_with(user_id=alice["id"])
refreshed_alice = main_module.user_db.get_request(alice_request["id"])
refreshed_bob = main_module.user_db.get_request(bob_request["id"])
assert refreshed_alice["delivery_state"] == "complete"
assert refreshed_bob["delivery_state"] == "complete"
def test_non_admin_clear_queue_without_db_user_id_returns_403(main_module, client):
_set_session(client, user_id="reader-no-db", db_user_id=None, is_admin=False)
with patch.object(main_module, "get_auth_mode", return_value="builtin"):
with patch.object(main_module.backend, "clear_completed") as mock_clear_completed:
resp = client.delete("/api/queue/clear")
assert resp.status_code == 403
assert resp.json["code"] == "user_identity_unavailable"
mock_clear_completed.assert_not_called()

View File

@@ -62,7 +62,15 @@ class TestUserDBInitialization:
assert cursor.fetchone() is not None
conn.close()
def test_initialize_creates_activity_tables(self, user_db, db_path):
def test_initialize_creates_download_history_table(self, user_db, db_path):
conn = sqlite3.connect(db_path)
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='download_history'"
)
assert cursor.fetchone() is not None
conn.close()
def test_initialize_does_not_create_legacy_activity_tables(self, user_db, db_path):
conn = sqlite3.connect(db_path)
activity_log = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='activity_log'"
@@ -70,8 +78,8 @@ class TestUserDBInitialization:
dismissals = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='activity_dismissals'"
).fetchone()
assert activity_log is not None
assert dismissals is not None
assert activity_log is None
assert dismissals is None
conn.close()
def test_initialize_creates_download_requests_indexes(self, user_db, db_path):
@@ -84,20 +92,30 @@ class TestUserDBInitialization:
assert "idx_download_requests_status_created_at" in index_names
conn.close()
def test_initialize_creates_activity_indexes(self, user_db, db_path):
def test_initialize_does_not_create_legacy_activity_indexes(self, user_db, db_path):
conn = sqlite3.connect(db_path)
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_log'"
).fetchall()
log_index_names = {row[0] for row in rows}
assert "idx_activity_log_user_terminal" in log_index_names
assert "idx_activity_log_lookup" in log_index_names
assert "idx_activity_log_user_terminal" not in log_index_names
assert "idx_activity_log_lookup" not in log_index_names
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='activity_dismissals'"
).fetchall()
dismissal_index_names = {row[0] for row in rows}
assert "idx_activity_dismissals_user_dismissed_at" in dismissal_index_names
assert "idx_activity_dismissals_user_dismissed_at" not in dismissal_index_names
conn.close()
def test_initialize_creates_download_history_indexes(self, user_db, db_path):
    """Both expected indexes must exist on the download_history table."""
    connection = sqlite3.connect(db_path)
    found = {
        name
        for (name,) in connection.execute(
            "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='download_history'"
        )
    }
    assert "idx_download_history_user_status" in found
    assert "idx_download_history_dismissed" in found
    connection.close()
def test_initialize_enables_wal_mode(self, user_db, db_path):
@@ -226,6 +244,226 @@ class TestUserDBInitialization:
assert "REQUESTS_ALLOW_NOTES" not in column_names
conn.close()
def test_initialize_migrates_download_requests_dismissed_at_column(self, db_path):
    """Upgrade path: initialize() must add download_requests.dismissed_at.

    Builds a pre-refactor schema by hand (download_requests without the
    dismissed_at column), runs UserDB.initialize() against it, then checks
    the column was added by the migration.
    """
    # Recreate the legacy schema exactly as older installs had it.
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT,
            display_name TEXT,
            password_hash TEXT,
            oidc_subject TEXT UNIQUE,
            auth_source TEXT NOT NULL DEFAULT 'builtin',
            role TEXT NOT NULL DEFAULT 'user',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE user_settings (
            user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
            settings_json TEXT NOT NULL DEFAULT '{}'
        );
        CREATE TABLE download_requests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            status TEXT NOT NULL DEFAULT 'pending',
            delivery_state TEXT NOT NULL DEFAULT 'none',
            source_hint TEXT,
            content_type TEXT NOT NULL,
            request_level TEXT NOT NULL,
            policy_mode TEXT NOT NULL,
            book_data TEXT NOT NULL,
            release_data TEXT,
            note TEXT,
            admin_note TEXT,
            reviewed_by INTEGER REFERENCES users(id),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            reviewed_at TIMESTAMP,
            delivery_updated_at TIMESTAMP
        );
        """
    )
    conn.commit()
    conn.close()

    # Running initialize() on the legacy database performs the migration.
    from shelfmark.core.user_db import UserDB
    db = UserDB(db_path)
    db.initialize()

    # Verify the migrated table now carries the dismissed_at column.
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    columns = conn.execute("PRAGMA table_info(download_requests)").fetchall()
    column_names = {str(col["name"]) for col in columns}
    assert "dismissed_at" in column_names
    conn.close()
def test_initialize_migrates_existing_install_without_backfill(self, db_path):
    """Upgrade path: preserve existing rows and add new schema without retroactive history backfill."""
    # Recreate a pre-refactor database: download_requests without
    # dismissed_at, plus the legacy activity_log / activity_dismissals tables.
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT,
            display_name TEXT,
            password_hash TEXT,
            oidc_subject TEXT UNIQUE,
            auth_source TEXT NOT NULL DEFAULT 'builtin',
            role TEXT NOT NULL DEFAULT 'user',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE user_settings (
            user_id INTEGER PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
            settings_json TEXT NOT NULL DEFAULT '{}'
        );
        CREATE TABLE download_requests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            status TEXT NOT NULL DEFAULT 'pending',
            delivery_state TEXT NOT NULL DEFAULT 'none',
            source_hint TEXT,
            content_type TEXT NOT NULL,
            request_level TEXT NOT NULL,
            policy_mode TEXT NOT NULL,
            book_data TEXT NOT NULL,
            release_data TEXT,
            note TEXT,
            admin_note TEXT,
            reviewed_by INTEGER REFERENCES users(id),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            reviewed_at TIMESTAMP,
            delivery_updated_at TIMESTAMP
        );
        CREATE TABLE activity_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
            item_type TEXT NOT NULL,
            item_key TEXT NOT NULL,
            request_id INTEGER,
            source_id TEXT,
            origin TEXT NOT NULL,
            final_status TEXT NOT NULL,
            snapshot_json TEXT NOT NULL,
            terminal_at TIMESTAMP NOT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE activity_dismissals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            item_type TEXT NOT NULL,
            item_key TEXT NOT NULL,
            activity_log_id INTEGER REFERENCES activity_log(id) ON DELETE SET NULL,
            dismissed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(user_id, item_type, item_key)
        );
        """
    )
    # Seed one legacy user with a fulfilled request and a matching
    # activity-log entry plus its dismissal, to prove they survive upgrade.
    conn.execute("INSERT INTO users (id, username, role) VALUES (?, ?, ?)", (1, "legacy-user", "user"))
    conn.execute(
        """
        INSERT INTO download_requests (
            id,
            user_id,
            status,
            delivery_state,
            content_type,
            request_level,
            policy_mode,
            book_data
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (11, 1, "fulfilled", "complete", "ebook", "book", "request_book", '{"title":"Legacy Book"}'),
    )
    conn.execute(
        """
        INSERT INTO activity_log (
            id,
            user_id,
            item_type,
            item_key,
            request_id,
            source_id,
            origin,
            final_status,
            snapshot_json,
            terminal_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            21,
            1,
            "download",
            "download:legacy-task",
            11,
            "legacy-task",
            "request",
            "complete",
            '{"kind":"download","download":{"id":"legacy-task","title":"Legacy Book"}}',
            "2026-01-01T00:00:00+00:00",
        ),
    )
    conn.execute(
        """
        INSERT INTO activity_dismissals (
            user_id,
            item_type,
            item_key,
            activity_log_id,
            dismissed_at
        )
        VALUES (?, ?, ?, ?, ?)
        """,
        (1, "download", "download:legacy-task", 21, "2026-01-02T00:00:00+00:00"),
    )
    conn.commit()
    conn.close()

    # Upgrade the database in place.
    from shelfmark.core.user_db import UserDB
    db = UserDB(db_path)
    db.initialize()

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    # Existing request row must be preserved verbatim.
    request_row = conn.execute(
        "SELECT id, user_id, status FROM download_requests WHERE id = 11"
    ).fetchone()
    assert request_row is not None
    assert request_row["user_id"] == 1
    assert request_row["status"] == "fulfilled"
    # The migration must have added the new dismissed_at column.
    request_columns = conn.execute("PRAGMA table_info(download_requests)").fetchall()
    request_column_names = {str(col["name"]) for col in request_columns}
    assert "dismissed_at" in request_column_names
    # The new download_history table must exist, but stay empty:
    history_table = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='download_history'"
    ).fetchone()
    assert history_table is not None
    # No retroactive copy from legacy activity tables in the no-backfill plan.
    history_count = conn.execute("SELECT COUNT(*) AS count FROM download_history").fetchone()["count"]
    assert history_count == 0
    # Legacy rows are left untouched rather than migrated or dropped.
    legacy_activity_rows = conn.execute("SELECT COUNT(*) AS count FROM activity_log").fetchone()["count"]
    legacy_dismissal_rows = conn.execute(
        "SELECT COUNT(*) AS count FROM activity_dismissals"
    ).fetchone()["count"]
    assert legacy_activity_rows == 1
    assert legacy_dismissal_rows == 1
    conn.close()
class TestUserCRUD:
"""Tests for user create, read, update, delete operations."""
@@ -738,3 +976,124 @@ class TestDownloadRequests:
user_db.delete_user(user["id"])
assert user_db.get_request(created["id"]) is None
def test_list_dismissed_requests_scopes_by_user(self, user_db):
    """Dismissed-request listings are newest-first and scoped to one user when asked."""
    alice = user_db.create_user(username="alice")
    bob = user_db.create_user(username="bob")

    def _make_request(owner_id):
        # All three requests are identical apart from their owner.
        return user_db.create_request(
            user_id=owner_id,
            content_type="ebook",
            request_level="book",
            policy_mode="request_book",
            book_data=self._book_data(),
        )

    first = _make_request(alice["id"])
    second = _make_request(bob["id"])
    third = _make_request(alice["id"])
    # Dismissal times deliberately differ from creation order, so the
    # expected ordering proves sorting happens on dismissed_at.
    for row, stamp in (
        (first, "2026-01-01T10:00:00+00:00"),
        (second, "2026-01-01T12:00:00+00:00"),
        (third, "2026-01-01T11:00:00+00:00"),
    ):
        user_db.update_request(row["id"], dismissed_at=stamp)

    assert [row["id"] for row in user_db.list_dismissed_requests(user_id=None)] == [
        second["id"],
        third["id"],
        first["id"],
    ]
    assert [row["id"] for row in user_db.list_dismissed_requests(user_id=alice["id"])] == [
        third["id"],
        first["id"],
    ]
    assert [row["id"] for row in user_db.list_dismissed_requests(user_id=bob["id"])] == [
        second["id"]
    ]
def test_clear_request_dismissals_scopes_by_user(self, user_db):
    """Clearing dismissals only resets the targeted user's rows; None clears all."""
    alice = user_db.create_user(username="alice")
    bob = user_db.create_user(username="bob")
    alice_request = user_db.create_request(
        user_id=alice["id"],
        content_type="ebook",
        request_level="book",
        policy_mode="request_book",
        book_data=self._book_data(),
    )
    bob_request = user_db.create_request(
        user_id=bob["id"],
        content_type="ebook",
        request_level="book",
        policy_mode="request_book",
        book_data=self._book_data(),
    )
    user_db.update_request(alice_request["id"], dismissed_at="2026-01-01T10:00:00+00:00")
    user_db.update_request(bob_request["id"], dismissed_at="2026-01-01T11:00:00+00:00")

    # Scoped clear: only Alice's dismissal is reset; Bob's survives.
    assert user_db.clear_request_dismissals(user_id=alice["id"]) == 1
    assert user_db.get_request(alice_request["id"])["dismissed_at"] is None
    assert user_db.get_request(bob_request["id"])["dismissed_at"] is not None

    # Unscoped clear picks up the one dismissal that remains.
    assert user_db.clear_request_dismissals(user_id=None) == 1
    assert user_db.get_request(bob_request["id"])["dismissed_at"] is None
def test_delete_dismissed_requests_scopes_by_user_and_only_deletes_terminal(self, user_db):
    """Purging dismissed requests is user-scoped and never removes non-terminal rows."""
    alice = user_db.create_user(username="alice")
    bob = user_db.create_user(username="bob")

    def _request_with_status(owner_id, status):
        # Requests differ only by owner and status.
        return user_db.create_request(
            user_id=owner_id,
            content_type="ebook",
            request_level="book",
            policy_mode="request_book",
            book_data=self._book_data(),
            status=status,
        )

    alice_rejected = _request_with_status(alice["id"], "rejected")
    bob_fulfilled = _request_with_status(bob["id"], "fulfilled")
    alice_pending = _request_with_status(alice["id"], "pending")
    user_db.update_request(alice_rejected["id"], dismissed_at="2026-01-01T10:00:00+00:00")
    user_db.update_request(bob_fulfilled["id"], dismissed_at="2026-01-01T11:00:00+00:00")
    user_db.update_request(alice_pending["id"], dismissed_at="2026-01-01T12:00:00+00:00")

    # Alice-scoped purge deletes only her terminal (rejected) request.
    assert user_db.delete_dismissed_requests(user_id=alice["id"]) == 1
    assert user_db.get_request(alice_rejected["id"]) is None
    assert user_db.get_request(alice_pending["id"]) is not None
    assert user_db.get_request(bob_fulfilled["id"]) is not None

    # Unscoped purge removes Bob's terminal request but still spares the pending one.
    assert user_db.delete_dismissed_requests(user_id=None) == 1
    assert user_db.get_request(bob_fulfilled["id"]) is None
    assert user_db.get_request(alice_pending["id"]) is not None
def test_request_dismissal_helpers_validate_user_scope(self, user_db):
    """Every dismissal helper must reject a non-positive user_id."""
    for invoke, bad_user_id in (
        (user_db.list_dismissed_requests, 0),
        (user_db.clear_request_dismissals, -1),
        (user_db.delete_dismissed_requests, 0),
    ):
        with pytest.raises(ValueError, match="user_id must be a positive integer"):
            invoke(user_id=bad_user_id)

View File

@@ -207,14 +207,6 @@ class TestQueueEndpoint:
else:
assert isinstance(data, list)
def test_clear_queue(self, api_client: APIClient, download_tracker: DownloadTracker):
    """Test clearing the queue."""
    response = api_client.delete("/api/queue/clear")
    # Implementations may answer 200 (with a body) or 204 (no content).
    assert response.status_code in [200, 204]
@pytest.mark.e2e
class TestSettingsEndpoint:
"""Tests for settings endpoints."""