mirror of
https://github.com/calibrain/shelfmark.git
synced 2026-04-19 13:28:16 -04:00
- Adds `uv`, `ruff`, `pyright`, `vulture` and `pytest-xdist` - Move project, lockfile, docker build etc to uv - Align python tooling on 3.14 - Huge bulk of ruff linter fixes applied. Still in progress but all the core types are now enforced - Update CI and test helpers
3218 lines
116 KiB
Python
3218 lines
116 KiB
Python
"""Flask app - routes, WebSocket handlers, and middleware."""
|
|
|
|
import io
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
from contextlib import suppress
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from importlib import import_module
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, Callable, NoReturn
|
|
|
|
from flask import Flask, jsonify, request, send_file, send_from_directory, session
|
|
from flask_cors import CORS
|
|
from flask_socketio import SocketIO, emit
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.security import check_password_hash
|
|
from werkzeug.wrappers import Response
|
|
|
|
from shelfmark.api.websocket import ws_manager
|
|
from shelfmark.config.env import (
|
|
BUILD_VERSION,
|
|
CONFIG_DIR,
|
|
CWA_DB_PATH,
|
|
DEBUG,
|
|
FLASK_HOST,
|
|
FLASK_PORT,
|
|
HIDE_LOCAL_AUTH,
|
|
OIDC_AUTO_REDIRECT,
|
|
RELEASE_VERSION,
|
|
_is_config_dir_writable,
|
|
)
|
|
from shelfmark.config.settings import _SUPPORTED_BOOK_LANGUAGE
|
|
from shelfmark.core.activity_view_state_service import ActivityViewStateService
|
|
from shelfmark.core.auth_modes import (
|
|
get_auth_check_admin_status,
|
|
is_settings_or_onboarding_path,
|
|
load_active_auth_mode,
|
|
requires_admin_for_settings_access,
|
|
)
|
|
from shelfmark.core.config import config as app_config
|
|
from shelfmark.core.cwa_user_sync import upsert_cwa_user
|
|
from shelfmark.core.download_history_service import DownloadHistoryService
|
|
from shelfmark.core.external_user_linking import upsert_external_user
|
|
from shelfmark.core.logger import setup_logger
|
|
from shelfmark.core.models import TERMINAL_QUEUE_STATUSES, QueueStatus, SearchFilters
|
|
from shelfmark.core.notifications import (
|
|
NotificationContext,
|
|
NotificationEvent,
|
|
notify_admin,
|
|
notify_user,
|
|
)
|
|
from shelfmark.core.prefix_middleware import PrefixMiddleware
|
|
from shelfmark.core.request_helpers import (
|
|
coerce_bool,
|
|
emit_ws_event,
|
|
get_session_db_user_id,
|
|
load_users_request_policy_settings,
|
|
normalize_optional_text,
|
|
normalize_positive_int,
|
|
)
|
|
from shelfmark.core.request_policy import (
|
|
PolicyMode,
|
|
get_source_content_type_capabilities,
|
|
merge_request_policy_settings,
|
|
normalize_content_type,
|
|
normalize_source,
|
|
resolve_policy_mode,
|
|
)
|
|
from shelfmark.core.requests_service import (
|
|
reopen_failed_request,
|
|
sync_delivery_states_from_queue_status,
|
|
)
|
|
from shelfmark.core.utils import normalize_base_path
|
|
from shelfmark.download import orchestrator as backend
|
|
from shelfmark.release_sources import (
|
|
BrowseRecord,
|
|
Release,
|
|
SourceUnavailableError,
|
|
get_source_display_name,
|
|
)
|
|
|
|
if TYPE_CHECKING:
    # Imported for annotations only; avoids a runtime import (and any cycle).
    from shelfmark.metadata_providers import BookMetadata, MetadataProvider

# Module-level logger shared by all handlers below.
logger = setup_logger(__name__)
|
|
|
|
|
|
def _raise_runtime_error(message: str) -> NoReturn:
|
|
raise RuntimeError(message)
|
|
|
|
|
|
# Project root is the repository root above the package directory.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Built frontend bundle served by the static routes below.
FRONTEND_DIST = PROJECT_ROOT / "frontend-dist"

# Optional URL prefix (e.g. "/shelfmark") for subpath deployments.
BASE_PATH = normalize_base_path(app_config.get("URL_BASE", ""))

app = Flask(__name__)
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0  # Disable caching
app.config["APPLICATION_ROOT"] = BASE_PATH or "/"
# Trust X-Forwarded-* headers when running behind a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app)  # type: ignore
if BASE_PATH:
    # Strip the subpath prefix before routing; /api/health stays reachable at
    # the root so container healthchecks work without the prefix.
    app.wsgi_app = PrefixMiddleware(app.wsgi_app, BASE_PATH, bypass_paths={"/api/health"})

# Socket.IO async mode.
# We run this app under Gunicorn with a gevent websocket worker (even when DEBUG=true),
# so Socket.IO should always use gevent here.
async_mode = "gevent"
socketio_cors_allowed_origins = "*"

# Initialize Flask-SocketIO with reverse proxy support
socketio_path = f"{BASE_PATH}/socket.io" if BASE_PATH else "/socket.io"
|
|
socketio = SocketIO(
    app,
    cors_allowed_origins=socketio_cors_allowed_origins,
    async_mode=async_mode,
    logger=False,
    engineio_logger=False,
    # Reverse proxy / Traefik compatibility settings
    path=socketio_path,
    ping_timeout=60,  # Time to wait for pong response
    ping_interval=25,  # Send ping every 25 seconds
    # Allow both websocket and polling for better compatibility
    transports=["websocket", "polling"],
    # Enable CORS for all origins (you can restrict this in production)
    allow_upgrades=True,
    # Important for proxies that buffer
    http_compression=True,
)

# Initialize WebSocket manager
ws_manager.init_app(app, socketio)
# Hand the manager a callable so it reads queue state lazily at emit time.
ws_manager.set_queue_status_fn(backend.queue_status)
logger.info("Flask-SocketIO initialized with async_mode='%s'", async_mode)
logger.info("Socket.IO CORS allowed origins: %s", socketio_cors_allowed_origins)
|
|
|
|
# Ensure all plugins are loaded before starting the download coordinator.
# This prevents a race condition where the download loop could try to process
# a queued task before its handler (e.g., prowlarr) is registered.
try:
    import_module("shelfmark.metadata_providers")
    import_module("shelfmark.release_sources")
    logger.debug("Plugin modules loaded successfully")
except ImportError as e:
    # Non-fatal: the app runs with whatever plugins did load.
    logger.warning("Failed to import plugin modules: %s", e)

# Migrate legacy security settings if needed
from shelfmark.config.security import _migrate_security_settings

_migrate_security_settings()
|
|
|
|
# Initialize user database and register multi-user routes
# If CONFIG_DIR doesn't exist or is read-only, multi-user features will be disabled
import os as _os

from shelfmark.core.user_db import UserDB

_user_db_path = str(Path(_os.environ.get("CONFIG_DIR", "/config")) / "users.db")
user_db: UserDB | None = None
download_history_service: DownloadHistoryService | None = None
activity_view_state_service: ActivityViewStateService | None = None
try:
    user_db = UserDB(_user_db_path)
    user_db.initialize()
    download_history_service = DownloadHistoryService(_user_db_path)
    activity_view_state_service = ActivityViewStateService(_user_db_path)
    # Imported for its side effects (registers the users settings module).
    import_module("shelfmark.config.users_settings")
    from shelfmark.core.admin_routes import register_admin_routes
    from shelfmark.core.oidc_routes import register_oidc_routes
    from shelfmark.core.self_user_routes import register_self_user_routes

    register_oidc_routes(app, user_db)
    register_admin_routes(app, user_db)
    register_self_user_routes(app, user_db)
except (sqlite3.OperationalError, OSError) as e:
    logger.warning(
        "User database initialization failed: %s. Multi-user authentication features will be disabled. Ensure CONFIG_DIR (%s) exists and is writable.",
        e,
        _os.environ.get("CONFIG_DIR", "/config"),
    )
    # Degrade gracefully: everything that depends on users.db stays disabled.
    user_db = None
    download_history_service = None
    activity_view_state_service = None

# Start download coordinator
backend.start()
|
|
|
|
# Rate limiting for login attempts (in-memory, per-process).
# Structure: {username: {'count': int, 'lockout_until': datetime}}
failed_login_attempts: dict[str, dict[str, Any]] = {}
# Attempts allowed before a lockout is applied.
MAX_LOGIN_ATTEMPTS = 10
# How long a locked account stays locked.
LOCKOUT_DURATION_MINUTES = 30
|
|
|
|
|
|
def cleanup_old_lockouts() -> None:
    """Drop lockout entries whose window has passed, preventing memory buildup."""
    now = datetime.now()
    # Snapshot items so we can delete while iterating.
    for username, data in list(failed_login_attempts.items()):
        locked_until = data.get("lockout_until")
        if locked_until is not None and locked_until < now:
            logger.info("Lockout expired for user: %s", username)
            del failed_login_attempts[username]
|
|
|
|
|
|
def is_account_locked(username: str) -> bool:
    """Return True while *username* is still inside a login-lockout window."""
    cleanup_old_lockouts()

    entry = failed_login_attempts.get(username)
    if entry is None:
        return False

    locked_until = entry.get("lockout_until")
    if locked_until is None:
        return False
    return datetime.now() < locked_until
|
|
|
|
|
|
def record_failed_login(username: str, ip_address: str) -> bool:
    """Record a failed login attempt and lock account if threshold is reached.

    Returns True if account is now locked, False otherwise.
    """
    entry = failed_login_attempts.setdefault(username, {"count": 0})
    entry["count"] += 1
    count = entry["count"]

    logger.warning(
        "Failed login attempt %s/%s for user '%s' from IP %s",
        count,
        MAX_LOGIN_ATTEMPTS,
        username,
        ip_address,
    )

    if count < MAX_LOGIN_ATTEMPTS:
        return False

    # Threshold reached: stamp the lockout expiry on the entry.
    lockout_until = datetime.now() + timedelta(minutes=LOCKOUT_DURATION_MINUTES)
    entry["lockout_until"] = lockout_until
    logger.warning(
        "Account locked for user '%s' until %s due to %s failed login attempts",
        username,
        lockout_until.strftime("%Y-%m-%d %H:%M:%S"),
        count,
    )
    return True
|
|
|
|
|
|
def clear_failed_logins(username: str) -> None:
    """Reset failed-login tracking for *username* after a successful login."""
    if failed_login_attempts.pop(username, None) is not None:
        logger.debug("Cleared failed login attempts for user: %s", username)
|
|
|
|
|
|
def get_client_ip() -> str:
    """Extract client IP address from request, handling reverse proxy forwarding."""
    forwarded = request.headers.get("X-Forwarded-For", request.remote_addr) or "unknown"
    # X-Forwarded-For may hold a comma-separated hop chain; the client is first.
    if "," in forwarded:
        return forwarded.split(",")[0].strip()
    return forwarded
|
|
|
|
|
|
def get_auth_mode() -> str:
    """Determine which authentication mode is active.

    Delegates to load_active_auth_mode, which combines the configured
    AUTH_METHOD with runtime prerequisites and yields "none" when config is
    invalid or unavailable.
    """
    return load_active_auth_mode(CWA_DB_PATH, user_db=user_db)
|
|
|
|
|
|
_AUDIOBOOK_CATEGORY_RANGE = (3030, 3049)
|
|
_AUDIOBOOK_FORMAT_HINTS = frozenset(
|
|
{
|
|
"m4b",
|
|
"mp3",
|
|
"m4a",
|
|
"flac",
|
|
"ogg",
|
|
"wma",
|
|
"aac",
|
|
"wav",
|
|
"opus",
|
|
}
|
|
)
|
|
|
|
|
|
def _contains_audiobook_format_hint(value: Any) -> bool:
|
|
if not isinstance(value, str):
|
|
return False
|
|
|
|
normalized = value.strip().lower()
|
|
if not normalized:
|
|
return False
|
|
|
|
tokens = [token for token in re.split(r"[^a-z0-9]+", normalized) if token]
|
|
return any(token in _AUDIOBOOK_FORMAT_HINTS for token in tokens)
|
|
|
|
|
|
def _resolve_release_content_type(data: dict[str, Any], source: Any) -> tuple[str, bool]:
|
|
"""Resolve release content type for policy checks and queue payload normalization."""
|
|
extra = data.get("extra")
|
|
if not isinstance(extra, dict):
|
|
extra = {}
|
|
|
|
explicit_content_type = data.get("content_type")
|
|
if explicit_content_type is None:
|
|
explicit_content_type = extra.get("content_type")
|
|
if explicit_content_type is not None:
|
|
return normalize_content_type(explicit_content_type), False
|
|
|
|
categories = extra.get("categories")
|
|
if isinstance(categories, list):
|
|
min_cat, max_cat = _AUDIOBOOK_CATEGORY_RANGE
|
|
for raw_category in categories:
|
|
try:
|
|
category_id = int(raw_category)
|
|
except TypeError, ValueError:
|
|
continue
|
|
if min_cat <= category_id <= max_cat:
|
|
return "audiobook", True
|
|
|
|
candidates: list[Any] = [
|
|
data.get("format"),
|
|
extra.get("format"),
|
|
extra.get("formats_display"),
|
|
data.get("title"),
|
|
]
|
|
formats = extra.get("formats")
|
|
if isinstance(formats, list):
|
|
candidates.extend(formats)
|
|
else:
|
|
candidates.append(formats)
|
|
|
|
if any(_contains_audiobook_format_hint(candidate) for candidate in candidates):
|
|
return "audiobook", True
|
|
|
|
capabilities = get_source_content_type_capabilities()
|
|
supported = capabilities.get(normalize_source(source))
|
|
if supported and len(supported) == 1:
|
|
return normalize_content_type(next(iter(supported))), True
|
|
|
|
return "ebook", False
|
|
|
|
|
|
def _resolve_policy_mode_for_current_user(*, source: Any, content_type: Any) -> PolicyMode | None:
    """Resolve policy mode for current session, or None when policy guard is bypassed.

    The guard is bypassed when auth is disabled, the session is an admin, the
    user DB is unavailable, or requests are globally disabled.
    """
    auth_mode = get_auth_mode()
    if auth_mode == "none":
        return None
    # FIX: default False (was True). Every other is_admin lookup in this file
    # defaults to False; defaulting True let sessions without an explicit admin
    # flag bypass the download policy entirely.
    if session.get("is_admin", False):
        return None
    if user_db is None:
        return None

    global_settings = load_users_request_policy_settings()
    db_user_id = session.get("db_user_id")
    user_settings: dict[str, Any] | None = None
    if db_user_id is not None:
        try:
            user_settings = user_db.get_user_settings(int(db_user_id))
        except (TypeError, ValueError):  # FIX: tuple form; Py2 syntax before
            user_settings = None

    # Per-user settings overlay the global defaults.
    effective = merge_request_policy_settings(global_settings, user_settings)
    if not coerce_bool(effective.get("REQUESTS_ENABLED"), default=False):
        return None

    resolved_mode = resolve_policy_mode(
        source=source,
        content_type=content_type,
        global_settings=global_settings,
        user_settings=user_settings,
    )
    logger.debug(
        "download policy resolve user=%s db_user_id=%s is_admin=%s source=%s content_type=%s mode=%s",
        session.get("user_id"),
        db_user_id,
        bool(session.get("is_admin", False)),
        source,
        content_type,
        resolved_mode.value,
    )
    return resolved_mode
|
|
|
|
|
|
def _policy_block_response(mode: PolicyMode) -> tuple[Response, int]:
    """Build the 403 JSON response for a download rejected by policy."""
    logger.debug(
        "download policy guard user=%s db_user_id=%s mode=%s",
        session.get("user_id"),
        session.get("db_user_id"),
        mode.value,
    )
    blocked = mode == PolicyMode.BLOCKED
    payload = {
        "error": "Download not allowed by policy",
        "code": "policy_blocked" if blocked else "policy_requires_request",
        "required_mode": PolicyMode.BLOCKED.value if blocked else mode.value,
    }
    return jsonify(payload), 403
|
|
|
|
|
|
def _resolve_download_user_context(
    db_user_id: Any,
    username: Any,
    on_behalf_of_user_id: Any,
) -> tuple[Any, Any, tuple[Response, int] | None]:
    """Resolve download queue user context, including optional admin on-behalf overrides.

    Returns (db_user_id, username, error_response). *error_response* is None on
    success, otherwise a (json, status) pair the caller should return as-is.
    """
    # No override requested: keep the session identity.
    if on_behalf_of_user_id in (None, ""):
        return db_user_id, username, None

    # Only admins may queue downloads on behalf of another user.
    if not session.get("is_admin", False):
        return db_user_id, username, (jsonify({"error": "Admin required"}), 403)

    if user_db is None:
        return db_user_id, username, (jsonify({"error": "User database unavailable"}), 503)

    try:
        target_user_id = int(on_behalf_of_user_id)
    # FIX: `except TypeError, ValueError:` was Python-2 syntax (a SyntaxError
    # in Python 3); the exception types must be parenthesized as a tuple.
    except (TypeError, ValueError):
        return db_user_id, username, (jsonify({"error": "Invalid on_behalf_of_user_id"}), 400)

    if target_user_id <= 0:
        return db_user_id, username, (jsonify({"error": "Invalid on_behalf_of_user_id"}), 400)

    target_user = user_db.get_user(user_id=target_user_id)
    if not target_user:
        return db_user_id, username, (jsonify({"error": "User not found"}), 404)

    return target_user["id"], target_user["username"], None
|
|
|
|
|
|
def _emit_request_updates(rows: list[dict[str, Any]]) -> None:
    """Defer request update emission until the runtime hook is available."""
    # _emit_request_update_events is resolved at call time, not import time.
    _emit_request_update_events(rows)
|
|
|
|
|
|
def _resolve_auth_mode_for_routes() -> str:
    """Resolve auth mode lazily so tests and runtime patches still take effect."""
    return get_auth_mode()
|
|
|
|
|
|
def _queue_release_for_routes(*args: Any, **kwargs: Any) -> Any:
    """Forward to the backend's queue_release, resolved at call time."""
    return backend.queue_release(*args, **kwargs)
|
|
|
|
|
|
def _queue_status_for_routes(user_id: int | None = None) -> dict[str, dict[str, Any]]:
    """Forward to the backend's queue_status, resolved at call time."""
    return backend.queue_status(user_id=user_id)
|
|
|
|
|
|
# Register request/activity routes only when the user DB initialized above.
if user_db is not None:
    try:
        from shelfmark.core.activity_routes import register_activity_routes
        from shelfmark.core.request_routes import register_request_routes

        register_request_routes(
            app,
            user_db,
            resolve_auth_mode=_resolve_auth_mode_for_routes,
            queue_release=_queue_release_for_routes,
            ws_manager=ws_manager,
        )
        # Activity routes additionally need the history/view-state services.
        if download_history_service is not None and activity_view_state_service is not None:
            register_activity_routes(
                app,
                user_db,
                activity_view_state_service=activity_view_state_service,
                download_history_service=download_history_service,
                resolve_auth_mode=_resolve_auth_mode_for_routes,
                queue_status=_queue_status_for_routes,
                sync_request_delivery_states=sync_delivery_states_from_queue_status,
                emit_request_updates=_emit_request_updates,
                ws_manager=ws_manager,
            )
    except Exception as e:
        # Best-effort: the app still serves core routes without request features.
        logger.warning("Failed to register request routes: %s", e)
|
|
|
|
|
|
# Enable CORS in development mode for local frontend development
# (the Vite dev server origins listed below).
if DEBUG:
    CORS(
        app,
        resources={
            r"/*": {
                "origins": ["http://localhost:5173", "http://127.0.0.1:5173"],
                "supports_credentials": True,
                "allow_headers": ["Content-Type", "Authorization"],
                "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
            }
        },
    )
|
|
|
|
|
|
# Custom log filter to exclude routine status endpoint polling and WebSocket noise
class LogNoiseFilter(logging.Filter):
    """Filter out routine status endpoint requests and WebSocket upgrade errors to reduce log noise.

    WebSocket upgrade errors are benign - Flask-SocketIO automatically falls back to polling transport.
    The error occurs because Werkzeug's built-in server doesn't fully support WebSocket upgrades.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage() if hasattr(record, "getMessage") else str(record.msg)

        # Routine polling of the status endpoint.
        if "GET /api/status" in message:
            return False

        # Benign WebSocket upgrade failure (client falls back to polling).
        if "write() before start_response" in message:
            return False

        if record.levelno == logging.ERROR:
            # The generic error line that precedes WebSocket tracebacks.
            if "Error on request:" in message:
                return False
            # WebSocket-related AssertionError tracebacks.
            exc_info = getattr(record, "exc_info", None)
            if exc_info:
                exc_type, exc_value = exc_info[0], exc_info[1]
                is_ws_assertion = (
                    exc_type is not None
                    and exc_type.__name__ == "AssertionError"
                    and exc_value
                    and "write() before start_response" in str(exc_value)
                )
                if is_ws_assertion:
                    return False

        return True
|
|
|
|
|
|
# Flask logger: reuse the application logger's handlers and level.
app.logger.handlers = logger.handlers
app.logger.setLevel(logger.level)
# Also handle Werkzeug's logger
werkzeug_logger = logging.getLogger("werkzeug")
werkzeug_logger.handlers = logger.handlers
werkzeug_logger.setLevel(logger.level)
# Add filter to suppress routine status endpoint polling logs and WebSocket upgrade errors
werkzeug_logger.addFilter(LogNoiseFilter())

# Set up authentication defaults
from shelfmark.config.env import SESSION_COOKIE_NAME, SESSION_COOKIE_SECURE_ENV, string_to_bool

# Whether session cookies require HTTPS, driven by environment config.
SESSION_COOKIE_SECURE = string_to_bool(SESSION_COOKIE_SECURE_ENV)
|
|
|
|
|
|
def _load_or_create_secret_key() -> bytes:
    """Load a persisted Flask secret key from config, or create one.

    Keys shorter than 32 bytes are treated as invalid and regenerated.
    Failure to persist is non-fatal: sessions simply reset on restart.
    """
    secret_path = CONFIG_DIR / ".flask_secret"

    # Prefer a previously persisted key when it looks valid.
    try:
        if secret_path.exists():
            existing = secret_path.read_bytes()
            if len(existing) >= 32:
                return existing
            logger.warning(
                "Invalid persisted Flask secret key at %s (length=%s). Regenerating.",
                secret_path,
                len(existing),
            )
    except OSError as exc:
        logger.warning("Failed to read Flask secret key at %s: %s", secret_path, exc)

    fresh_key = os.urandom(64)
    try:
        secret_path.parent.mkdir(parents=True, exist_ok=True)
        secret_path.write_bytes(fresh_key)
        # Owner read/write only.
        secret_path.chmod(0o600)
    except OSError as exc:
        logger.warning(
            "Failed to persist Flask secret key at %s. Sessions may reset on restart: %s",
            secret_path,
            exc,
        )

    return fresh_key
|
|
|
|
|
|
# Session/cookie hardening defaults.
app.config.update(
    SECRET_KEY=_load_or_create_secret_key(),
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Lax",
    SESSION_COOKIE_SECURE=SESSION_COOKIE_SECURE,
    SESSION_COOKIE_NAME=SESSION_COOKIE_NAME,
    PERMANENT_SESSION_LIFETIME=604800,  # 7 days in seconds
)

logger.info(
    "Session cookie secure setting: %s (from env: %s)",
    SESSION_COOKIE_SECURE,
    SESSION_COOKIE_SECURE_ENV,
)
logger.info("Session cookie name: %s", SESSION_COOKIE_NAME)
|
|
|
|
|
|
@app.before_request
def proxy_auth_middleware() -> Response | tuple[Response, int] | None:
    """Middleware to handle proxy authentication.

    When AUTH_METHOD is set to "proxy", this middleware automatically
    authenticates users based on headers set by the reverse proxy.

    Returns None to continue normal request handling, or a JSON 401/500
    response when proxy authentication fails.
    """
    auth_mode = get_auth_mode()

    # Only run for proxy auth mode
    if auth_mode != "proxy":
        return None

    # Skip for public endpoints that don't need auth
    if request.path == "/api/health":
        return None

    def get_proxy_header(header_name: str) -> str | None:
        """Resolve proxy auth values from headers with WSGI env fallbacks."""
        value = request.headers.get(header_name)
        if value:
            return value

        env_key = f"HTTP_{header_name.upper().replace('-', '_')}"
        value = request.environ.get(env_key)
        if value:
            return value

        # Some proxies set authenticated username in REMOTE_USER (not as a header).
        if header_name.lower().replace("_", "-") == "remote-user":
            return request.environ.get("REMOTE_USER")

        return None

    try:
        user_header = app_config.get("PROXY_AUTH_USER_HEADER", "X-Auth-User")

        # Extract username from proxy header
        username = get_proxy_header(user_header)

        if not username:
            # Let the auth endpoints respond so the frontend can show state.
            if request.path.startswith("/api/auth/"):
                return None

            logger.warning("Proxy auth enabled but no username found in header '%s'", user_header)
            return jsonify({"error": "Authentication required. Proxy header not set."}), 401

        # Resolve admin role for proxy sessions.
        # If an admin group is configured, derive from groups header.
        # Otherwise preserve existing DB role for known users and default
        # first-time users to admin (to avoid lockouts).
        admin_group_header = app_config.get("PROXY_AUTH_ADMIN_GROUP_HEADER", "X-Auth-Groups")
        admin_group_name = str(app_config.get("PROXY_AUTH_ADMIN_GROUP_NAME", "") or "").strip()
        is_admin = True

        if admin_group_name:
            groups_header = get_proxy_header(admin_group_header) or ""
            # Groups may be comma- or pipe-delimited depending on the proxy.
            user_groups_delimiter = "," if "," in groups_header else "|"
            user_groups = [
                g.strip() for g in groups_header.split(user_groups_delimiter) if g.strip()
            ]
            is_admin = admin_group_name in user_groups
        elif user_db is not None:
            existing_db_user = user_db.get_user(username=username)
            if existing_db_user:
                is_admin = existing_db_user.get("role") == "admin"

        # Create or update session
        previous_username = session.get("user_id")
        if previous_username and previous_username != username:
            # Header identity changed mid-session; force reprovision for the new user.
            session.pop("db_user_id", None)

        session["user_id"] = username
        session["is_admin"] = is_admin

        # Provision proxy-authenticated users into users.db for multi-user features.
        # Re-provision when db_user_id is missing/stale/mismatched to avoid broken
        # sessions after DB resets or auth-mode transitions.
        if user_db is not None:
            raw_db_user_id = session.get("db_user_id")
            session_db_user = None

            if raw_db_user_id is not None:
                try:
                    session_db_user = user_db.get_user(user_id=int(raw_db_user_id))
                # FIX: `except TypeError, ValueError:` was Python-2 syntax (a
                # SyntaxError in Python 3); must be a parenthesized tuple.
                except (TypeError, ValueError):
                    session_db_user = None

            session_db_username = (
                str(session_db_user.get("username") or "").strip() if session_db_user else ""
            )
            needs_db_user_sync = (
                raw_db_user_id is None or session_db_user is None or session_db_username != username
            )

            if needs_db_user_sync:
                role = "admin" if is_admin else "user"
                db_user, _ = upsert_external_user(
                    user_db,
                    auth_source="proxy",
                    username=username,
                    role=role,
                    collision_strategy="takeover",
                    context="proxy_request",
                )
                if db_user is None:
                    _raise_runtime_error("Unexpected proxy user sync result: no user returned")

                session["db_user_id"] = db_user["id"]

        # Proxy sessions are per-request; the proxy re-authenticates every time.
        session.permanent = False
    except Exception:
        logger.exception("Proxy auth middleware error")
        return jsonify({"error": "Authentication error"}), 500
    else:
        return None
|
|
|
|
|
|
@app.after_request
def set_security_headers(response: Response) -> Response:
    """Add baseline security headers to every response."""
    # setdefault keeps any header a route already set explicitly.
    baseline = {
        "X-Content-Type-Options": "nosniff",
        "Content-Security-Policy": "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; connect-src 'self' ws: wss:",
        "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
        "Cross-Origin-Embedder-Policy": "credentialless",
    }
    for header, value in baseline.items():
        response.headers.setdefault(header, value)
    return response
|
|
|
|
|
|
def login_required(
    f: Callable[..., Response | tuple[Response, int]],
) -> Callable[..., Response | tuple[Response, int]]:
    """Decorator gating a route on an authenticated session per the active auth mode."""

    @wraps(f)
    def decorated_function(*args, **kwargs) -> Response | tuple[Response, int]:
        auth_mode = get_auth_mode()

        # No authentication configured: every route is public.
        if auth_mode == "none":
            return f(*args, **kwargs)

        # CWA mode but its database vanished after startup: hard failure.
        if auth_mode == "cwa" and CWA_DB_PATH and not CWA_DB_PATH.exists():
            logger.error("CWA database at %s is no longer accessible", CWA_DB_PATH)
            return jsonify({"error": "Internal Server Error"}), 500

        # From here on a logged-in session is required.
        if "user_id" not in session:
            return jsonify({"error": "Unauthorized"}), 401

        # Settings/onboarding endpoints may additionally require admin rights.
        if is_settings_or_onboarding_path(request.path):
            try:
                needs_admin = requires_admin_for_settings_access(request.path, {})
                if needs_admin and not session.get("is_admin", False):
                    return jsonify({"error": "Admin access required"}), 403
            except Exception:
                logger.exception("Admin access check error")
                return jsonify({"error": "Internal Server Error"}), 500

        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
|
|
# Marker tag emitted by the frontend build; rewritten for subpath deployments.
_BASE_TAG = '<base href="/" data-shelfmark-base />'


def _base_href() -> str:
    """Return the document base href: "/" at the root, "<BASE_PATH>/" otherwise."""
    return f"{BASE_PATH}/" if BASE_PATH else "/"
|
|
|
|
|
|
def _serve_index_html() -> Response:
    """Serve index.html with an adjusted base tag for subpath deployments."""
    index_path = FRONTEND_DIST / "index.html"
    try:
        html = index_path.read_text(encoding="utf-8")
    except OSError:
        # Fall back to plain static serving if the file can't be read directly.
        return send_from_directory(FRONTEND_DIST, "index.html")

    if BASE_PATH and _BASE_TAG in html:
        adjusted_tag = f'<base href="{_base_href()}" data-shelfmark-base />'
        html = html.replace(_BASE_TAG, adjusted_tag, 1)

    return Response(html, mimetype="text/html")
|
|
|
|
|
|
# Serve frontend static files
@app.route("/assets/<path:filename>")
def serve_frontend_assets(filename: str) -> Response:
    """Serve a built frontend asset (JS/CSS/fonts) from the dist folder."""
    assets_dir = FRONTEND_DIST / "assets"
    return send_from_directory(assets_dir, filename)
|
|
|
|
|
|
@app.route("/")
|
|
def index() -> Response:
|
|
"""Serve the React frontend application.
|
|
Authentication is handled by the React app itself.
|
|
"""
|
|
return _serve_index_html()
|
|
|
|
|
|
@app.route("/theme-init.js")
|
|
def theme_init_js() -> Response:
|
|
"""Serve the blocking theme-init script."""
|
|
return send_from_directory(FRONTEND_DIST, "theme-init.js", mimetype="application/javascript")
|
|
|
|
|
|
@app.route("/logo.png")
|
|
def logo() -> Response:
|
|
"""Serve logo from built frontend assets."""
|
|
return send_from_directory(FRONTEND_DIST, "logo.png", mimetype="image/png")
|
|
|
|
|
|
@app.route("/favicon.ico")
|
|
@app.route("/favico<path:_>")
|
|
def favicon(_: Any = None) -> Response:
|
|
"""Serve favicon from built frontend assets."""
|
|
return send_from_directory(FRONTEND_DIST, "favicon.ico", mimetype="image/vnd.microsoft.icon")
|
|
|
|
|
|
# Debug-only tooling: bundles logs into a downloadable zip via genDebug.sh.
if DEBUG:
    import subprocess

    def _stop_gui() -> None:
        # No-op default; replaced below when the internal bypasser is in use.
        return None

    if app_config.get("USING_EXTERNAL_BYPASSER", False):
        pass
    else:
        from shelfmark.bypass.internal_bypasser import _cleanup_orphan_processes as _stop_gui

    @app.route("/api/debug", methods=["GET"])
    @login_required
    def debug() -> Response | tuple[Response, int]:
        """This will run the /app/genDebug.sh script, which will generate a debug zip with all the logs

        The file will be named /tmp/shelfmark-debug.zip
        And then return it to the user
        """
        try:
            logger.info("Debug endpoint called, stopping GUI and generating debug info...")
            _stop_gui()
            time.sleep(1)
            result = subprocess.run(
                ["/app/genDebug.sh"], capture_output=True, text=True, check=True
            )
            # NOTE(review): check=True already raises CalledProcessError on a
            # non-zero exit, so this branch is defensive/unreachable.
            if result.returncode != 0:
                _raise_runtime_error(f"Debug script failed: {result.stderr}")
            logger.info("Debug script executed: %s", result.stdout)
            # The script prints the zip path as its last stdout line.
            debug_file_path = result.stdout.strip().split("\n")[-1]
            if not Path(debug_file_path).exists():
                logger.error("Debug zip file not found at: %s", debug_file_path)
                return jsonify({"error": "Failed to generate debug information"}), 500

            logger.info("Sending debug file: %s", debug_file_path)
            return send_file(
                debug_file_path,
                mimetype="application/zip",
                download_name=Path(debug_file_path).name,
                as_attachment=True,
            )
        except subprocess.CalledProcessError as e:
            # error_trace is a custom method added by setup_logger.
            logger.error_trace(f"Debug script error: {e}, stdout: {e.stdout}, stderr: {e.stderr}")
            return jsonify({"error": f"Debug script failed: {e.stderr}"}), 500
        except Exception as e:
            logger.error_trace(f"Debug endpoint error: {e}")
            return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/restart", methods=["GET"])
|
|
@login_required
|
|
def restart() -> Response | tuple[Response, int]:
|
|
"""Restart the application"""
|
|
os._exit(0)
|
|
|
|
|
|
def _parse_search_filters_from_request() -> SearchFilters:
    """Parse direct/source browse filters from query parameters."""
    args = request.args
    return SearchFilters(
        isbn=args.getlist("isbn"),
        author=args.getlist("author"),
        title=args.getlist("title"),
        lang=args.getlist("lang"),
        sort=args.get("sort"),
        content=args.getlist("content"),
        format=args.getlist("format"),
    )
|
|
|
|
|
|
def _build_source_query_book(query_text: str, filters: SearchFilters) -> BookMetadata:
    """Build a synthetic book context for source-native browse searches."""
    from shelfmark.metadata_providers import BookMetadata

    def _clean(values: Any) -> list[str]:
        # Keep only entries that are non-blank after stripping.
        return [value.strip() for value in (values or []) if str(value).strip()]

    author_values = _clean(filters.author)
    title_values = _clean(filters.title)
    isbn_values = _clean(filters.isbn)

    if title_values:
        title = title_values[0]
    else:
        # Fallback chain: free-text query -> first ISBN -> first author -> placeholder.
        title = (
            query_text
            or (isbn_values[0] if isbn_values else "")
            or (author_values[0] if author_values else "Direct Search")
        )
    author = author_values[0] if author_values else ""

    return BookMetadata(
        provider="manual",
        provider_id=query_text or title,
        provider_display_name="Manual Search",
        title=title,
        search_title=title,
        search_author=author or None,
        authors=author_values,
    )
|
|
|
|
|
|
def _serialize_browse_record(record: BrowseRecord) -> dict:
    """Convert a source-native browse record into a JSON-friendly dict.

    None-valued attributes are omitted, and preview URLs are rewritten to
    go through the local cover proxy endpoint.
    """
    payload = {}
    for key, value in record.__dict__.items():
        if value is not None:
            payload[key] = value

    preview_url = payload.get("preview")
    if isinstance(preview_url, str) and preview_url:
        from shelfmark.core.utils import transform_cover_url

        payload["preview"] = transform_cover_url(preview_url, record.id)

    return payload
|
|
|
|
|
|
def _serialize_release(release: Release) -> dict:
    """Serialize a release for the frontend, normalizing preview URLs."""
    from dataclasses import asdict

    from shelfmark.core.utils import transform_cover_url

    payload = asdict(release)
    extra_data = payload.get("extra")
    if isinstance(extra_data, dict):
        preview_url = extra_data.get("preview")
        if isinstance(preview_url, str) and preview_url:
            # Copy before mutating so the original mapping is left untouched.
            payload["extra"] = {
                **extra_data,
                "preview": transform_cover_url(preview_url, release.source_id),
            }

    return payload
|
|
|
|
|
|
@app.route("/api/releases/download", methods=["POST"])
@login_required
def api_download_release() -> Response | tuple[Response, int]:
    """Queue a release for download.

    This endpoint is used when downloading from the ReleaseModal where the
    frontend already has all the release data from the search results.

    Request Body (JSON):
        source (str): Release source (e.g., "direct_download")
        source_id (str): ID within the source (e.g., AA MD5 hash)
        title (str): Book title
        format (str, optional): File format
        size (str, optional): Human-readable size
        extra (dict, optional): Additional metadata
        priority (int, optional): Queue priority, defaults to 0
        on_behalf_of_user_id (int, optional): queue for another user

    Returns:
        flask.Response: JSON status object indicating success or failure.

    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "No data provided"}), 400

        # Both identifiers are mandatory; reject early with a clear message.
        if "source_id" not in data:
            return jsonify({"error": "source_id is required"}), 400
        if "source" not in data:
            return jsonify({"error": "source is required"}), 400

        source = data["source"]
        # Resolve the effective content type (explicit value or inferred
        # from the source) before consulting the download policy.
        resolved_content_type, inferred_content_type = _resolve_release_content_type(data, source)
        policy_mode = _resolve_policy_mode_for_current_user(
            source=source,
            content_type=resolved_content_type,
        )
        # Any policy mode other than DOWNLOAD blocks direct queuing here.
        if policy_mode is not None and policy_mode != PolicyMode.DOWNLOAD:
            return _policy_block_response(policy_mode)

        release_payload = data
        if inferred_content_type and data.get("content_type") is None:
            # Copy before mutating so the request's parsed dict is untouched.
            release_payload = dict(data)
            release_payload["content_type"] = resolved_content_type

        priority = data.get("priority", 0)
        # Per-user download overrides
        db_user_id = session.get("db_user_id")
        _username = session.get("user_id")
        # The download may be queued on behalf of another user; the helper
        # validates that and swaps in the target user's identity.
        db_user_id, _username, on_behalf_error = _resolve_download_user_context(
            db_user_id,
            _username,
            data.get("on_behalf_of_user_id"),
        )
        if on_behalf_error:
            return on_behalf_error
        success, error_msg = backend.queue_release(
            release_payload,
            priority,
            user_id=db_user_id,
            username=_username,
        )

        if success:
            return jsonify({"status": "queued", "priority": priority})
        return jsonify({"error": error_msg or "Failed to queue release"}), 500
    except Exception as e:
        logger.error_trace(f"Release download error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/config", methods=["GET"])
@login_required
def api_config() -> Response | tuple[Response, int]:
    """Get application configuration for frontend.

    Uses the dynamic config singleton to ensure settings changes
    are reflected without requiring a container restart.

    Returns:
        flask.Response: JSON object with frontend configuration, or a
        500 JSON error payload on failure.

    """
    try:
        from shelfmark.config.env import _is_config_dir_writable
        from shelfmark.core.onboarding import is_onboarding_complete as _get_onboarding_complete
        from shelfmark.metadata_providers import (
            get_provider_default_sort,
            get_provider_search_fields,
            get_provider_sort_options,
        )

        # Per-user settings are resolved against the session's DB user id.
        db_user_id = get_session_db_user_id(session)

        search_mode = app_config.get("SEARCH_MODE", "direct", user_id=db_user_id)
        default_release_source = app_config.get(
            "DEFAULT_RELEASE_SOURCE",
            "direct_download",
            user_id=db_user_id,
        )
        default_release_source_audiobook = app_config.get(
            "DEFAULT_RELEASE_SOURCE_AUDIOBOOK",
            "",
            user_id=db_user_id,
        )
        configured_metadata_provider = app_config.get(
            "METADATA_PROVIDER",
            "",
            user_id=db_user_id,
        )
        _configured_metadata_provider_audiobook = app_config.get(
            "METADATA_PROVIDER_AUDIOBOOK",
            "",
            user_id=db_user_id,
        )
        # The ebook provider takes precedence; fall back to the audiobook
        # provider when no ebook metadata provider is configured.
        metadata_ui_provider = (
            configured_metadata_provider or _configured_metadata_provider_audiobook
        )

        config = {
            "calibre_web_url": app_config.get("CALIBRE_WEB_URL", ""),
            "audiobook_library_url": app_config.get("AUDIOBOOK_LIBRARY_URL", ""),
            "debug": app_config.get("DEBUG", False),
            "build_version": BUILD_VERSION,
            "release_version": RELEASE_VERSION,
            "book_languages": _SUPPORTED_BOOK_LANGUAGE,
            "default_language": app_config.BOOK_LANGUAGE,
            "supported_formats": app_config.SUPPORTED_FORMATS,
            "supported_audiobook_formats": app_config.SUPPORTED_AUDIOBOOK_FORMATS,
            "search_mode": search_mode,
            "metadata_sort_options": get_provider_sort_options(metadata_ui_provider),
            "metadata_search_fields": get_provider_search_fields(metadata_ui_provider),
            "default_release_source": default_release_source,
            "default_release_source_audiobook": default_release_source_audiobook,
            "show_release_source_links": app_config.get("SHOW_RELEASE_SOURCE_LINKS", True),
            "show_combined_selector": app_config.get(
                "SHOW_COMBINED_SELECTOR", True, user_id=db_user_id
            ),
            "books_output_mode": app_config.get("BOOKS_OUTPUT_MODE", "folder"),
            "auto_open_downloads_sidebar": app_config.get("AUTO_OPEN_DOWNLOADS_SIDEBAR", True),
            "hardcover_auto_remove_on_download": app_config.get(
                "HARDCOVER_AUTO_REMOVE_ON_DOWNLOAD", True
            ),
            "download_to_browser_content_types": app_config.get(
                "DOWNLOAD_TO_BROWSER_CONTENT_TYPES",
                [],
                user_id=db_user_id,
            ),
            # Settings UI is disabled when the config dir is read-only.
            "settings_enabled": _is_config_dir_writable(),
            "onboarding_complete": _get_onboarding_complete(),
            # Default sort orders
            "default_sort": app_config.get(
                "AA_DEFAULT_SORT", "relevance"
            ),  # For direct mode (Anna's Archive)
            "metadata_default_sort": get_provider_default_sort(
                metadata_ui_provider
            ),  # For universal mode
        }
        return jsonify(config)
    except Exception as e:
        logger.error_trace(f"Config error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/health", methods=["GET"])
def api_health() -> Response | tuple[Response, int]:
    """Health check endpoint for container orchestration.

    No authentication required.

    Returns:
        flask.Response: JSON with status "ok" and optional degraded features.

    """
    payload = {"status": "ok"}

    # Surface partially-broken functionality so callers can alert on it.
    if not backend.WEBSOCKET_AVAILABLE:
        payload["degraded"] = {
            "websocket": "WebSocket unavailable - real-time updates disabled"
        }

    return jsonify(payload)
|
|
|
|
|
|
def _resolve_status_scope(*, require_authenticated: bool = True) -> tuple[bool, int | None, bool]:
    """Resolve queue-status visibility from session state.

    Returns:
        (is_admin, db_user_id, can_access_status)

    """
    # With authentication disabled everyone gets the admin-wide view.
    if get_auth_mode() == "none":
        return (True, None, True)

    if require_authenticated and "user_id" not in session:
        return (False, None, False)

    if session.get("is_admin", False):
        # Admins see the full queue, so no per-user id is needed.
        return (True, None, True)

    db_user_id = get_session_db_user_id(session)
    if db_user_id is None:
        # Non-admin without a resolvable DB identity gets no access.
        return (False, None, False)

    return (False, db_user_id, True)
|
|
|
|
|
|
def _queue_status_to_final_activity_status(status: QueueStatus) -> str | None:
    """Map a terminal queue status to its activity-history value, else None."""
    if status in TERMINAL_QUEUE_STATUSES:
        return status.value
    return None
|
|
|
|
|
|
def _queue_status_to_notification_event(status: QueueStatus) -> NotificationEvent | None:
    """Map a queue status to the notification event it triggers, if any."""
    # Only completed and failed downloads produce notifications.
    event_by_status = {
        QueueStatus.COMPLETE: NotificationEvent.DOWNLOAD_COMPLETE,
        QueueStatus.ERROR: NotificationEvent.DOWNLOAD_FAILED,
    }
    return event_by_status.get(status)
|
|
|
|
|
|
def _notify_admin_for_terminal_download_status(
    *, task_id: str, status: QueueStatus, task: Any
) -> None:
    """Fire notifications when a download reaches a terminal status.

    Sends an admin notification for every COMPLETE/ERROR transition and,
    when the task carries a usable owning user id, a matching user
    notification. Notification failures are logged and never propagate.

    Args:
        task_id: Queue task identifier (used in log messages only).
        status: Terminal queue status that triggered the notification.
        task: Queue task object; fields are read defensively via getattr.
    """
    event = _queue_status_to_notification_event(status)
    if event is None:
        # Only COMPLETE and ERROR map to notification events.
        return

    raw_owner_user_id = getattr(task, "user_id", None)
    try:
        owner_user_id = int(raw_owner_user_id) if raw_owner_user_id is not None else None
    # BUGFIX: was `except TypeError, ValueError:` (Python 2 syntax), which
    # is a SyntaxError in Python 3; tuple form catches both exceptions.
    except (TypeError, ValueError):
        owner_user_id = None

    content_type = normalize_optional_text(getattr(task, "content_type", None))
    context = NotificationContext(
        event=event,
        title=str(getattr(task, "title", "Unknown title") or "Unknown title"),
        author=str(getattr(task, "author", "Unknown author") or "Unknown author"),
        username=normalize_optional_text(getattr(task, "username", None)),
        content_type=normalize_content_type(content_type) if content_type is not None else None,
        format=normalize_optional_text(getattr(task, "format", None)),
        source=normalize_source(getattr(task, "source", None)),
        # Only failed downloads carry an error message in the notification.
        error_message=(
            normalize_optional_text(getattr(task, "status_message", None))
            if event == NotificationEvent.DOWNLOAD_FAILED
            else None
        ),
    )
    try:
        notify_admin(event, context)
    except Exception as exc:
        logger.warning(
            "Failed to trigger admin notification for download %s (%s): %s",
            task_id,
            status.value,
            exc,
        )
    if owner_user_id is None:
        return
    try:
        notify_user(owner_user_id, event, context)
    except Exception as exc:
        logger.warning(
            "Failed to trigger user notification for download %s (%s, user_id=%s): %s",
            task_id,
            status.value,
            owner_user_id,
            exc,
        )
|
|
|
|
|
|
def _emit_activity_update_for_task(*, payload: dict[str, Any], task: Any) -> None:
    """Broadcast an activity_update to admins and, when known, the task owner."""
    # Admins always receive activity updates.
    emit_ws_event(
        ws_manager,
        event_name="activity_update",
        room="admins",
        payload=payload,
    )

    owner_id = normalize_positive_int(getattr(task, "user_id", None))
    if owner_id is not None:
        # Mirror the event into the owning user's private room.
        emit_ws_event(
            ws_manager,
            event_name="activity_update",
            room=f"user_{owner_id}",
            payload=payload,
        )
|
|
|
|
|
|
def _record_download_queued(task_id: str, task: Any) -> None:
    """Persist initial download record when a task enters the queue.

    Best-effort: if the history row cannot be written, the failure is
    logged and the per-viewer activity-state reset below is skipped.
    Also clears any stale "dismissed" viewer state for the same
    download/request keys and broadcasts an activity reset when needed.
    """
    if download_history_service is None:
        return

    owner_user_id = normalize_positive_int(getattr(task, "user_id", None))
    request_id = normalize_positive_int(getattr(task, "request_id", None))
    # Tasks tied to a request row originate from the request workflow.
    origin = "requested" if request_id else "direct"

    source_name = normalize_source(getattr(task, "source", None))
    try:
        source_display = get_source_display_name(source_name)
    except Exception:
        # Display name is cosmetic; fall back to None on any failure.
        source_display = None

    try:
        download_history_service.record_download(
            task_id=task_id,
            user_id=owner_user_id,
            username=normalize_optional_text(getattr(task, "username", None)),
            request_id=request_id,
            source=source_name,
            source_display_name=source_display,
            title=str(getattr(task, "title", "Unknown title") or "Unknown title"),
            author=normalize_optional_text(getattr(task, "author", None)),
            format=normalize_optional_text(getattr(task, "format", None)),
            size=normalize_optional_text(getattr(task, "size", None)),
            preview=normalize_optional_text(getattr(task, "preview", None)),
            content_type=normalize_optional_text(getattr(task, "content_type", None)),
            origin=origin,
            # Snapshot of the task so it can be re-queued later if needed.
            retry_payload=backend.serialize_task_for_retry(task),
        )
    except Exception as exc:
        logger.warning("Failed to record download at queue time for task %s: %s", task_id, exc)
        return

    if activity_view_state_service is None:
        return

    try:
        # Un-dismiss this item for every viewer so the fresh queue entry
        # becomes visible again in the activity view.
        cleared_view_state = 0
        cleared_view_state += activity_view_state_service.clear_item_for_all_viewers(
            item_type="download",
            item_key=f"download:{task_id}",
        )
        if request_id is not None:
            cleared_view_state += activity_view_state_service.clear_item_for_all_viewers(
                item_type="request",
                item_key=f"request:{request_id}",
            )
        if cleared_view_state > 0:
            # Only broadcast when something was actually reset.
            _emit_activity_update_for_task(
                task=task,
                payload={
                    "kind": "activity_reset",
                    "task_id": task_id,
                },
            )
    except Exception as exc:
        logger.warning("Failed to reset activity viewer state for task %s: %s", task_id, exc)
|
|
|
|
|
|
def _record_download_terminal_snapshot(task_id: str, status: QueueStatus, task: Any) -> None:
    """Handle a download task reaching a terminal queue status.

    Fires notifications, finalizes the persisted download-history row,
    broadcasts an activity update, and - for non-retryable failed
    request-backed downloads - reopens the originating request.
    All steps are best-effort; failures are logged and do not propagate.
    """
    _notify_admin_for_terminal_download_status(task_id=task_id, status=status, task=task)

    final_status = _queue_status_to_final_activity_status(status)
    if final_status is None:
        # Not actually terminal for activity-history purposes; stop here.
        return

    finalized_download = False
    if download_history_service is not None:
        try:
            download_history_service.finalize_download(
                task_id=task_id,
                final_status=final_status,
                status_message=normalize_optional_text(getattr(task, "status_message", None)),
                download_path=normalize_optional_text(getattr(task, "download_path", None)),
                retry_payload=backend.serialize_task_for_retry(task),
            )
            finalized_download = True
        except Exception as exc:
            logger.warning("Failed to finalize download history for task %s: %s", task_id, exc)

    if finalized_download:
        # Tell connected clients the download has settled into a final state.
        _emit_activity_update_for_task(
            task=task,
            payload={
                "kind": "download_terminal",
                "task_id": task_id,
                "status": final_status,
            },
        )

    # Request reopening only applies to errored, request-backed downloads.
    if user_db is None or status != QueueStatus.ERROR:
        return

    request_id = normalize_positive_int(getattr(task, "request_id", None))
    if request_id is None:
        return
    if backend.can_retry_download_task(task, status):
        # A retry is still possible; leave the request as-is.
        return

    raw_error_message = getattr(task, "status_message", None)
    fallback_reason = (
        raw_error_message.strip()
        if isinstance(raw_error_message, str) and raw_error_message.strip()
        else "Download failed"
    )
    try:
        reopened_request = reopen_failed_request(
            user_db,
            request_id=request_id,
            failure_reason=fallback_reason,
        )
        if reopened_request is not None:
            if activity_view_state_service is not None:
                # Make the reopened request visible to every viewer again.
                activity_view_state_service.clear_item_for_all_viewers(
                    item_type="request",
                    item_key=f"request:{request_id}",
                )
            _emit_request_update_events([reopened_request])
    except Exception as exc:
        logger.warning(
            "Failed to reopen request %s after terminal download error %s: %s",
            request_id,
            task_id,
            exc,
        )
|
|
|
|
|
|
def _task_owned_by_actor(
|
|
task: Any, *, actor_user_id: int | None, actor_username: str | None
|
|
) -> bool:
|
|
raw_task_user_id = getattr(task, "user_id", None)
|
|
try:
|
|
task_user_id = int(raw_task_user_id) if raw_task_user_id is not None else None
|
|
except TypeError, ValueError:
|
|
task_user_id = None
|
|
|
|
if actor_user_id is not None and task_user_id is not None:
|
|
return task_user_id == actor_user_id
|
|
|
|
task_username = getattr(task, "username", None)
|
|
if isinstance(task_username, str) and task_username.strip() and isinstance(actor_username, str):
|
|
return task_username.strip() == actor_username.strip()
|
|
|
|
return False
|
|
|
|
|
|
def _download_row_owned_by_actor(
    row: dict[str, Any],
    *,
    actor_user_id: int | None,
    actor_username: str | None,
) -> bool:
    """Return True when a persisted download row belongs to the acting user."""
    row_user_id = normalize_positive_int(row.get("user_id"))
    if actor_user_id is not None and row_user_id is not None:
        # Prefer the numeric id comparison when both sides carry one.
        return row_user_id == actor_user_id

    row_username = normalize_optional_text(row.get("username"))
    if row_username is not None and isinstance(actor_username, str):
        # Fall back to a trimmed username match.
        return row_username == actor_username.strip()

    return False
|
|
|
|
|
|
# Wire queue lifecycle hooks: persist a history row when a task enters the
# queue, and finalize it (history, notifications, request reopening) when
# the task reaches a terminal status.
backend.book_queue.set_queue_hook(_record_download_queued)
backend.book_queue.set_terminal_status_hook(_record_download_terminal_snapshot)
|
|
|
|
|
|
def _emit_request_update_events(updated_requests: list[dict[str, Any]]) -> None:
    """Broadcast request_update events for rows changed by delivery-state sync."""
    if not updated_requests or ws_manager is None:
        return

    try:
        sio = getattr(ws_manager, "socketio", None)
        enabled_check = getattr(ws_manager, "is_enabled", None)
        # Bail out quietly when the WebSocket layer is missing or disabled.
        if sio is None or not callable(enabled_check) or not enabled_check():
            return

        for row in updated_requests:
            book_data = row.get("book_data") or {}
            event_payload = {
                "request_id": row["id"],
                "status": row["status"],
                "delivery_state": row.get("delivery_state"),
                "title": book_data.get("title") or "Unknown title",
            }
            # Notify both the request owner and all connected admins.
            sio.emit("request_update", event_payload, to=f"user_{row['user_id']}")
            sio.emit("request_update", event_payload, to="admins")
    except Exception as exc:
        logger.warning("Failed to emit delivery request_update events: %s", exc)
|
|
|
|
|
|
@app.route("/api/status", methods=["GET"])
@login_required
def api_status() -> Response | tuple[Response, int]:
    """Get current download queue status.

    Admins see the whole queue; regular users only their own tasks.
    As a side effect, request delivery states are synced from the queue
    snapshot and any resulting changes are broadcast over WebSocket.

    Returns:
        flask.Response: JSON object with queue status.

    """
    try:
        is_admin, db_user_id, can_access_status = _resolve_status_scope()
        if not can_access_status:
            # No visibility: return an empty status object rather than 403.
            return jsonify({})

        # None means "unscoped" (admin view); otherwise filter by owner.
        user_id = None if is_admin else db_user_id
        status = backend.queue_status(user_id=user_id)
        if user_db is not None:
            updated_requests = sync_delivery_states_from_queue_status(
                user_db,
                queue_status=status,
                user_id=user_id,
            )
            _emit_request_update_events(updated_requests)
        return jsonify(status)
    except Exception as e:
        logger.error_trace(f"Status error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/localdownload", methods=["GET"])
@login_required
def api_local_download() -> Response | tuple[Response, int]:
    """Download an EPUB file from local storage if available.

    Falls back to the persisted download-history record (with an ownership
    check) when the in-memory queue task no longer exists.

    Query Parameters:
        id (str): Book identifier (MD5 hash)

    Returns:
        flask.Response: The EPUB file if found, otherwise an error response.

    """
    book_id = request.args.get("id", "")
    if not book_id:
        return jsonify({"error": "No book ID provided"}), 400

    try:
        file_data, book_info = backend.get_book_data(book_id)
        if file_data is None:
            # Fallback for dismissed/history entries where queue task may no longer exist.
            if download_history_service is not None:
                is_admin, db_user_id, can_access_status = _resolve_status_scope()
                if can_access_status:
                    history_row = download_history_service.get_by_task_id(book_id)
                    if history_row is not None:
                        # Only admins or the row's owner may fetch the file.
                        owner_user_id = history_row.get("user_id")
                        if is_admin or owner_user_id == db_user_id:
                            # NOTE(review): reaches into a private classmethod of
                            # DownloadHistoryService; consider a public accessor.
                            download_path = DownloadHistoryService._resolve_existing_download_path(
                                history_row.get("download_path")
                            )
                            if download_path:
                                # Stream straight from disk for history entries.
                                return send_file(
                                    download_path,
                                    download_name=Path(download_path).name,
                                    as_attachment=True,
                                )

            # Book data not found or not available
            return jsonify({"error": "File not found"}), 404
        file_name = book_info.get_filename() if book_info is not None else Path(book_id).name
        # Prepare the file for sending to the client
        data = io.BytesIO(file_data)
        return send_file(data, download_name=file_name, as_attachment=True)

    except Exception as e:
        logger.error_trace(f"Local download error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/covers/<cover_id>", methods=["GET"])
def api_cover(cover_id: str) -> Response | tuple[Response, int]:
    """Serve a cached book cover image.

    This endpoint proxies and caches cover images from external sources.
    Images are cached to disk for faster subsequent requests.

    Path Parameters:
        cover_id (str): Cover identifier (book ID or composite key for universal mode)

    Query Parameters:
        url (str): Base64-encoded original image URL (required on first request)

    Returns:
        flask.Response: Binary image data with appropriate Content-Type, or 404.

    """
    try:
        import base64

        from shelfmark.config.env import is_covers_cache_enabled
        from shelfmark.core.image_cache import get_image_cache

        # Check if caching is enabled
        if not is_covers_cache_enabled():
            return jsonify({"error": "Cover caching is disabled"}), 404

        cache = get_image_cache()

        # Try to get from cache first
        cached = cache.get(cover_id)
        if cached:
            image_data, content_type = cached
            response = app.response_class(response=image_data, status=200, mimetype=content_type)
            # Let browsers/CDNs cache for a day; X-Cache marks the disk hit.
            response.headers["Cache-Control"] = "public, max-age=86400"
            response.headers["X-Cache"] = "HIT"
            return response

        # Cache miss - get URL from query parameter
        encoded_url = request.args.get("url")
        if not encoded_url:
            # Without the original URL there is nothing to fetch.
            return jsonify({"error": "Cover URL not provided"}), 404

        try:
            # URL arrives urlsafe-base64 encoded to survive query-string transport.
            original_url = base64.urlsafe_b64decode(encoded_url).decode()
        except Exception as e:
            logger.warning("Failed to decode cover URL: %s", e)
            return jsonify({"error": "Invalid cover URL encoding"}), 400

        # Fetch and cache the image
        result = cache.fetch_and_cache(cover_id, original_url)
        if not result:
            return jsonify({"error": "Failed to fetch cover image"}), 404

        image_data, content_type = result
        response = app.response_class(response=image_data, status=200, mimetype=content_type)
        response.headers["Cache-Control"] = "public, max-age=86400"
        response.headers["X-Cache"] = "MISS"
    except Exception as e:
        logger.error_trace(f"Cover fetch error: {e}")
        return jsonify({"error": str(e)}), 500
    # try/except/else: the response is only returned when no exception fired.
    else:
        return response
|
|
|
|
|
|
@app.route("/api/download/<path:book_id>/cancel", methods=["DELETE"])
@login_required
def api_cancel_download(book_id: str) -> Response | tuple[Response, int]:
    """Cancel a download.

    Non-admin callers may only cancel tasks they own, and may never cancel
    downloads that originate from an approved request.

    Path Parameters:
        book_id (str): Book identifier to cancel

    Returns:
        flask.Response: JSON status indicating success or failure.

    """
    try:
        task = backend.book_queue.get_task(book_id)
        if task is None:
            return jsonify({"error": "Failed to cancel download or book not found"}), 404

        is_admin, db_user_id, can_access_status = _resolve_status_scope()
        if not is_admin:
            if not can_access_status or db_user_id is None:
                return jsonify(
                    {"error": "User identity unavailable", "code": "user_identity_unavailable"}
                ), 403

            actor_username = session.get("user_id")
            normalized_actor_username = actor_username if isinstance(actor_username, str) else None
            # Ownership check: non-admins can only cancel their own tasks.
            if not _task_owned_by_actor(
                task,
                actor_user_id=db_user_id,
                actor_username=normalized_actor_username,
            ):
                return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403

            # Downloads tied to a request row cannot be cancelled by users.
            if getattr(task, "request_id", None) is not None:
                return jsonify(
                    {"error": "Forbidden", "code": "requested_download_cancel_forbidden"}
                ), 403

        success = backend.cancel_download(book_id)
        if success:
            return jsonify({"status": "cancelled", "book_id": book_id})
        return jsonify({"error": "Failed to cancel download or book not found"}), 404
    except Exception as e:
        logger.error_trace(f"Cancel download error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/download/<path:book_id>/retry", methods=["POST"])
@login_required
def api_retry_download(book_id: str) -> Response | tuple[Response, int]:
    """Retry a failed download.

    Resolves the task from the live queue first, then falls back to the
    persisted download-history row. Non-admins may only retry downloads
    they own, and request-backed downloads only when a retry is allowed.

    Path Parameters:
        book_id (str): Task identifier of the download to retry.

    Returns:
        flask.Response: JSON status indicating success or failure.
    """
    try:
        task = backend.book_queue.get_task(book_id)
        history_row = None
        if task is None and download_history_service is not None:
            # Queue task is gone (e.g. dismissed); consult persisted history.
            history_row = download_history_service.get_by_task_id(book_id)
        if task is None and history_row is None:
            return jsonify({"error": "Download not found"}), 404

        is_admin, db_user_id, can_access_status = _resolve_status_scope()
        actor_username = session.get("user_id")
        normalized_actor_username = actor_username if isinstance(actor_username, str) else None
        if not is_admin:
            if not can_access_status or db_user_id is None:
                return jsonify(
                    {"error": "User identity unavailable", "code": "user_identity_unavailable"}
                ), 403

            # Ownership check against whichever record we found.
            if task is not None:
                if not _task_owned_by_actor(
                    task,
                    actor_user_id=db_user_id,
                    actor_username=normalized_actor_username,
                ):
                    return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403
            elif history_row is None or not _download_row_owned_by_actor(
                history_row,
                actor_user_id=db_user_id,
                actor_username=normalized_actor_username,
            ):
                return jsonify({"error": "Forbidden", "code": "download_not_owned"}), 403

        if task is not None:
            task_status = backend.book_queue.get_task_status(book_id)
            # Request-backed downloads may only be retried when allowed.
            if getattr(
                task, "request_id", None
            ) is not None and not backend.can_retry_download_task(task, task_status):
                return jsonify(
                    {"error": "Forbidden", "code": "requested_download_retry_forbidden"}
                ), 403
            success, error = backend.retry_download(book_id)
        else:
            # history_row is guaranteed non-None here by the earlier guard.
            assert history_row is not None
            request_id = normalize_positive_int(history_row.get("request_id"))
            retry_payload = history_row.get("retry_payload")
            final_status = history_row.get("final_status")
            if request_id is not None and not download_history_service.is_retry_available(
                history_row
            ):
                return jsonify(
                    {"error": "Forbidden", "code": "requested_download_retry_forbidden"}
                ), 403
            # Re-queue from the serialized snapshot stored in history.
            success, error = backend.retry_persisted_download(
                retry_payload,
                final_status=final_status,
            )

        if success:
            return jsonify({"status": "queued", "book_id": book_id})

        if error == "Download not found":
            return jsonify({"error": error}), 404

        # Anything else is a state conflict (e.g. task not in a retryable state).
        return jsonify({"error": error or "Download cannot be retried"}), 409
    except Exception as e:
        logger.error_trace(f"Retry download error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/queue/<path:book_id>/priority", methods=["PUT"])
@login_required
def api_set_priority(book_id: str) -> Response | tuple[Response, int]:
    """Set priority for a queued book.

    Path Parameters:
        book_id (str): Book identifier

    Request Body:
        priority (int): New priority level (lower number = higher priority)

    Returns:
        flask.Response: JSON status indicating success or failure.

    """
    try:
        payload = request.get_json()
        if not payload or "priority" not in payload:
            return jsonify({"error": "Priority not provided"}), 400

        new_priority = int(payload["priority"])
        if backend.set_book_priority(book_id, new_priority):
            return jsonify(
                {"status": "updated", "book_id": book_id, "priority": new_priority}
            )
        return jsonify({"error": "Failed to update priority or book not found"}), 404
    except ValueError:
        # int() failed: the client sent a non-numeric priority value.
        return jsonify({"error": "Invalid priority value"}), 400
    except Exception as e:
        logger.error_trace(f"Set priority error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/queue/reorder", methods=["POST"])
@login_required
def api_reorder_queue() -> Response | tuple[Response, int]:
    """Bulk reorder queue by setting new priorities.

    Request Body:
        book_priorities (dict): Mapping of book_id to new priority

    Returns:
        flask.Response: JSON status indicating success or failure.

    """
    try:
        payload = request.get_json()
        if not payload or "book_priorities" not in payload:
            return jsonify({"error": "book_priorities not provided"}), 400

        mapping = payload["book_priorities"]
        if not isinstance(mapping, dict):
            return jsonify({"error": "book_priorities must be a dictionary"}), 400

        # Reject the whole request if any priority is not an integer.
        for item_id, item_priority in mapping.items():
            if not isinstance(item_priority, int):
                return jsonify({"error": f"Invalid priority for book {item_id}"}), 400

        if backend.reorder_queue(mapping):
            return jsonify({"status": "reordered", "updated_count": len(mapping)})
        return jsonify({"error": "Failed to reorder queue"}), 500
    except Exception as e:
        logger.error_trace(f"Reorder queue error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/queue/order", methods=["GET"])
@login_required
def api_queue_order() -> Response | tuple[Response, int]:
    """Get current queue order for display.

    Returns:
        flask.Response: JSON array of queued books with their order and priorities.

    """
    try:
        return jsonify({"queue": backend.get_queue_order()})
    except Exception as e:
        logger.error_trace(f"Queue order error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/downloads/active", methods=["GET"])
@login_required
def api_active_downloads() -> Response | tuple[Response, int]:
    """Get list of currently active downloads.

    Returns:
        flask.Response: JSON array of active download book IDs.

    """
    try:
        return jsonify({"active_downloads": backend.get_active_downloads()})
    except Exception as e:
        logger.error_trace(f"Active downloads error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.errorhandler(404)
def not_found_error(error: Exception) -> Response | tuple[Response, int]:
    """Handle 404 (Not Found) errors.

    Args:
        error (HTTPException): The 404 error raised by Flask.

    Returns:
        flask.Response: JSON error message with 404 status.

    """
    # Log the offending URL to help track down broken frontend links.
    logger.warning("404 error: %s : %s", request.url, error)
    return jsonify({"error": "Resource not found"}), 404
|
|
|
|
|
|
@app.errorhandler(500)
def internal_error(error: Exception) -> Response | tuple[Response, int]:
    """Handle 500 (Internal Server) errors.

    Args:
        error (HTTPException): The 500 error raised by Flask.

    Returns:
        flask.Response: JSON error message with 500 status.

    """
    # Full traceback goes to the log; clients only see a generic message.
    logger.error_trace(f"500 error: {error}")
    return jsonify({"error": "Internal server error"}), 500
|
|
|
|
|
|
def _failed_login_response(username: str, ip_address: str) -> tuple[Response, int]:
    """Record a failed login attempt and build the matching error response."""
    locked_out = record_failed_login(username, ip_address)

    if locked_out:
        # This attempt tripped the lockout threshold.
        return jsonify(
            {
                "error": f"Account locked due to {MAX_LOGIN_ATTEMPTS} failed login attempts. Try again in {LOCKOUT_DURATION_MINUTES} minutes."
            }
        ), 429

    remaining = MAX_LOGIN_ATTEMPTS - failed_login_attempts[username]["count"]
    if remaining <= 5:
        # Warn the user once they are getting close to lockout.
        return jsonify(
            {"error": f"Invalid username or password. {remaining} attempts remaining."}
        ), 401

    return jsonify({"error": "Invalid username or password."}), 401
|
|
|
|
|
|
@app.route("/api/auth/login", methods=["POST"])
|
|
def api_login() -> Response | tuple[Response, int]:
|
|
"""Login endpoint that validates credentials and creates a session.
|
|
Supports both built-in credentials and CWA database authentication.
|
|
Includes rate limiting: 10 failed attempts = 30 minute lockout.
|
|
|
|
Request Body:
|
|
username (str): Username
|
|
password (str): Password
|
|
remember_me (bool): Whether to extend session duration
|
|
|
|
Returns:
|
|
flask.Response: JSON with success status or error message.
|
|
|
|
"""
|
|
try:
|
|
ip_address = get_client_ip()
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({"error": "No data provided"}), 400
|
|
|
|
auth_mode = get_auth_mode()
|
|
if auth_mode == "proxy":
|
|
return jsonify({"error": "Proxy authentication is enabled"}), 401
|
|
|
|
if auth_mode == "oidc" and HIDE_LOCAL_AUTH:
|
|
return jsonify({"error": "Local authentication is disabled"}), 403
|
|
|
|
username = data.get("username", "").strip()
|
|
password = data.get("password", "")
|
|
remember_me = data.get("remember_me", False)
|
|
|
|
if not username or not password:
|
|
return jsonify({"error": "Username and password are required"}), 400
|
|
|
|
# Check if account is locked due to failed login attempts
|
|
if is_account_locked(username):
|
|
lockout_until = failed_login_attempts[username].get("lockout_until")
|
|
remaining_time = (lockout_until - datetime.now()).total_seconds() / 60
|
|
logger.warning(
|
|
"Login attempt blocked for locked account '%s' from IP %s", username, ip_address
|
|
)
|
|
return jsonify(
|
|
{
|
|
"error": f"Account temporarily locked due to multiple failed login attempts. Try again in {int(remaining_time)} minutes."
|
|
}
|
|
), 429
|
|
|
|
# If no authentication is configured, authentication always succeeds
|
|
if auth_mode == "none":
|
|
session["user_id"] = username
|
|
session.permanent = remember_me
|
|
clear_failed_logins(username)
|
|
logger.info(
|
|
"Login successful for user '%s' from IP %s (no auth configured)",
|
|
username,
|
|
ip_address,
|
|
)
|
|
return jsonify({"success": True})
|
|
|
|
# Password authentication (builtin and OIDC modes)
|
|
# OIDC mode also allows password login as a fallback so admins don't get locked out
|
|
if auth_mode in ("builtin", "oidc"):
|
|
if user_db is None:
|
|
logger.error("User database not available for %s auth", auth_mode)
|
|
return jsonify({"error": "Authentication service unavailable"}), 503
|
|
try:
|
|
db_user = user_db.get_user(username=username)
|
|
|
|
if not db_user:
|
|
return _failed_login_response(username, ip_address)
|
|
|
|
# Authenticate against DB user
|
|
if db_user:
|
|
if not db_user.get("password_hash") or not check_password_hash(
|
|
db_user["password_hash"], password
|
|
):
|
|
return _failed_login_response(username, ip_address)
|
|
|
|
is_admin = db_user["role"] == "admin"
|
|
session["user_id"] = username
|
|
session["db_user_id"] = db_user["id"]
|
|
session["is_admin"] = is_admin
|
|
session.permanent = remember_me
|
|
clear_failed_logins(username)
|
|
logger.info(
|
|
"Login successful for user '%s' from IP %s (%s auth, is_admin=%s, remember_me=%s)",
|
|
username,
|
|
ip_address,
|
|
auth_mode,
|
|
is_admin,
|
|
remember_me,
|
|
)
|
|
return jsonify({"success": True})
|
|
|
|
return _failed_login_response(username, ip_address)
|
|
|
|
except Exception as e:
|
|
logger.error_trace(f"Built-in auth error: {e}")
|
|
return jsonify({"error": "Authentication system error"}), 500
|
|
|
|
# CWA database authentication mode
|
|
if auth_mode == "cwa":
|
|
# Verify database still exists (it was validated at startup)
|
|
if not CWA_DB_PATH or not CWA_DB_PATH.exists():
|
|
logger.error("CWA database at %s is no longer accessible", CWA_DB_PATH)
|
|
return jsonify({"error": "Database configuration error"}), 500
|
|
|
|
try:
|
|
db_path = os.fspath(CWA_DB_PATH)
|
|
db_uri = f"file:{db_path}?mode=ro&immutable=1"
|
|
conn = sqlite3.connect(db_uri, uri=True)
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT password, role, email FROM user WHERE name = ?", (username,))
|
|
row = cur.fetchone()
|
|
conn.close()
|
|
|
|
# Check if user exists and password is correct
|
|
if not row or not row[0] or not check_password_hash(row[0], password):
|
|
return _failed_login_response(username, ip_address)
|
|
|
|
# Check if user has admin role (ROLE_ADMIN = 1, bit flag)
|
|
user_role = row[1] if row[1] is not None else 0
|
|
is_admin = (user_role & 1) == 1
|
|
cwa_email = row[2] or None
|
|
|
|
db_user_id = None
|
|
if user_db is not None:
|
|
role = "admin" if is_admin else "user"
|
|
db_user, _ = upsert_cwa_user(
|
|
user_db,
|
|
cwa_username=username,
|
|
cwa_email=cwa_email,
|
|
role=role,
|
|
context="cwa_login",
|
|
)
|
|
db_user_id = db_user["id"]
|
|
|
|
# Successful authentication - create session and clear failed attempts
|
|
session["user_id"] = username
|
|
session["is_admin"] = is_admin
|
|
if db_user_id is not None:
|
|
session["db_user_id"] = db_user_id
|
|
session.permanent = remember_me
|
|
clear_failed_logins(username)
|
|
logger.info(
|
|
"Login successful for user '%s' from IP %s (CWA auth, is_admin=%s, remember_me=%s)",
|
|
username,
|
|
ip_address,
|
|
is_admin,
|
|
remember_me,
|
|
)
|
|
return jsonify({"success": True})
|
|
|
|
except Exception as e:
|
|
logger.error_trace(f"CWA database error during login: {e}")
|
|
return jsonify({"error": "Authentication system error"}), 500
|
|
|
|
# Should not reach here, but handle gracefully
|
|
return jsonify({"error": "Unknown authentication mode"}), 500
|
|
|
|
except Exception as e:
|
|
logger.error_trace(f"Login error: {e}")
|
|
return jsonify({"error": "Login failed"}), 500
|
|
|
|
|
|
@app.route("/api/auth/logout", methods=["POST"])
|
|
def api_logout() -> Response | tuple[Response, int]:
|
|
"""Logout endpoint that clears the session.
|
|
For proxy auth, returns the logout URL if configured.
|
|
|
|
Returns:
|
|
flask.Response: JSON with success status and optional logout_url.
|
|
|
|
"""
|
|
try:
|
|
auth_mode = get_auth_mode()
|
|
ip_address = get_client_ip()
|
|
username = session.get("user_id", "unknown")
|
|
session.clear()
|
|
logger.info("Logout successful for user '%s' from IP %s", username, ip_address)
|
|
|
|
# For proxy auth, include logout URL if configured
|
|
if auth_mode == "proxy":
|
|
logout_url = app_config.get("PROXY_AUTH_LOGOUT_URL", "")
|
|
if logout_url:
|
|
return jsonify({"success": True, "logout_url": logout_url})
|
|
|
|
return jsonify({"success": True})
|
|
except Exception as e:
|
|
logger.error_trace(f"Logout error: {e}")
|
|
return jsonify({"error": "Logout failed"}), 500
|
|
|
|
|
|
@app.route("/api/auth/check", methods=["GET"])
|
|
def api_auth_check() -> Response | tuple[Response, int]:
|
|
"""Check if user has a valid session.
|
|
|
|
Returns:
|
|
flask.Response: JSON with authentication status, whether auth is required,
|
|
which auth mode is active, and whether user has admin privileges.
|
|
|
|
"""
|
|
try:
|
|
auth_mode = get_auth_mode()
|
|
|
|
# If no authentication is configured, access is allowed (full admin)
|
|
if auth_mode == "none":
|
|
return jsonify(
|
|
{
|
|
"authenticated": True,
|
|
"auth_required": False,
|
|
"auth_mode": "none",
|
|
"is_admin": True,
|
|
}
|
|
)
|
|
|
|
# Check if user has a valid session
|
|
is_authenticated = "user_id" in session
|
|
|
|
is_admin = get_auth_check_admin_status(auth_mode, {}, session)
|
|
|
|
display_name = None
|
|
if is_authenticated and session.get("db_user_id") and user_db is not None:
|
|
try:
|
|
db_user = user_db.get_user(user_id=session["db_user_id"])
|
|
if db_user:
|
|
display_name = db_user.get("display_name") or None
|
|
except Exception:
|
|
pass
|
|
|
|
response_data = {
|
|
"authenticated": is_authenticated,
|
|
"auth_required": True,
|
|
"auth_mode": auth_mode,
|
|
"is_admin": is_admin if is_authenticated else False,
|
|
"username": session.get("user_id") if is_authenticated else None,
|
|
"display_name": display_name,
|
|
}
|
|
|
|
# Add logout URL for proxy auth if configured
|
|
if auth_mode == "proxy" and app_config.get("PROXY_AUTH_USER_HEADER", ""):
|
|
logout_url = app_config.get("PROXY_AUTH_LOGOUT_URL", "")
|
|
if logout_url:
|
|
response_data["logout_url"] = logout_url
|
|
|
|
# Add custom OIDC button label and SSO enforcement flags if configured
|
|
if auth_mode == "oidc":
|
|
oidc_button_label = app_config.get("OIDC_BUTTON_LABEL", "")
|
|
if oidc_button_label:
|
|
response_data["oidc_button_label"] = oidc_button_label
|
|
if HIDE_LOCAL_AUTH:
|
|
response_data["hide_local_auth"] = True
|
|
if OIDC_AUTO_REDIRECT:
|
|
response_data["oidc_auto_redirect"] = True
|
|
|
|
return jsonify(response_data)
|
|
except Exception as e:
|
|
logger.error_trace(f"Auth check error: {e}")
|
|
return jsonify(
|
|
{
|
|
"authenticated": False,
|
|
"auth_required": True,
|
|
"auth_mode": "unknown",
|
|
"is_admin": False,
|
|
}
|
|
)
|
|
|
|
|
|
@app.route("/api/metadata/providers", methods=["GET"])
|
|
@login_required
|
|
def api_metadata_providers() -> Response | tuple[Response, int]:
|
|
"""Get list of available metadata providers.
|
|
|
|
Returns:
|
|
flask.Response: JSON with list of providers and their status.
|
|
|
|
"""
|
|
try:
|
|
from shelfmark.metadata_providers import (
|
|
get_configured_provider_name,
|
|
get_provider,
|
|
get_provider_kwargs,
|
|
list_providers,
|
|
)
|
|
|
|
app_config.refresh()
|
|
db_user_id = get_session_db_user_id(session)
|
|
|
|
configured_metadata_provider = get_configured_provider_name(
|
|
content_type="ebook",
|
|
user_id=db_user_id,
|
|
fallback_to_main=True,
|
|
)
|
|
configured_audiobook_metadata_provider = get_configured_provider_name(
|
|
content_type="audiobook",
|
|
user_id=db_user_id,
|
|
fallback_to_main=False,
|
|
)
|
|
configured_combined_metadata_provider = get_configured_provider_name(
|
|
content_type="combined",
|
|
user_id=db_user_id,
|
|
fallback_to_main=False,
|
|
)
|
|
providers = []
|
|
for info in list_providers():
|
|
enabled_key = f"{info['name'].upper()}_ENABLED"
|
|
provider_info = {
|
|
"name": info["name"],
|
|
"display_name": info["display_name"],
|
|
"requires_auth": info["requires_auth"],
|
|
"enabled": app_config.get(enabled_key, False) is True,
|
|
"available": False,
|
|
}
|
|
|
|
try:
|
|
kwargs = get_provider_kwargs(info["name"])
|
|
provider = get_provider(info["name"], **kwargs)
|
|
provider_info["available"] = provider.is_available()
|
|
except Exception:
|
|
pass
|
|
|
|
providers.append(provider_info)
|
|
|
|
return jsonify(
|
|
{
|
|
"providers": providers,
|
|
"configured_provider": configured_metadata_provider or None,
|
|
"configured_provider_audiobook": configured_audiobook_metadata_provider or None,
|
|
"configured_provider_combined": configured_combined_metadata_provider or None,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error_trace(f"Metadata providers error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/metadata/config", methods=["GET"])
|
|
@login_required
|
|
def api_metadata_config() -> Response | tuple[Response, int]:
|
|
"""Return provider-specific metadata search config for the active session."""
|
|
try:
|
|
from shelfmark.metadata_providers import (
|
|
get_configured_provider_name,
|
|
get_provider,
|
|
get_provider_capabilities,
|
|
get_provider_default_sort,
|
|
get_provider_kwargs,
|
|
get_provider_search_fields,
|
|
get_provider_sort_options,
|
|
is_provider_registered,
|
|
)
|
|
|
|
app_config.refresh()
|
|
content_type = request.args.get("content_type", "ebook").strip()
|
|
provider_name = request.args.get("provider", "").strip()
|
|
|
|
db_user_id = get_session_db_user_id(session)
|
|
|
|
if not provider_name:
|
|
provider_name = get_configured_provider_name(
|
|
content_type=content_type,
|
|
user_id=db_user_id,
|
|
fallback_to_main=True,
|
|
)
|
|
|
|
if not provider_name:
|
|
return jsonify(
|
|
{
|
|
"provider": None,
|
|
"display_name": None,
|
|
"enabled": False,
|
|
"available": False,
|
|
"search_fields": [],
|
|
"capabilities": [],
|
|
"sort_options": [{"value": "relevance", "label": "Most relevant"}],
|
|
"default_sort": "relevance",
|
|
}
|
|
)
|
|
|
|
if not is_provider_registered(provider_name):
|
|
return jsonify({"error": f"Unknown metadata provider: {provider_name}"}), 400
|
|
|
|
kwargs = get_provider_kwargs(provider_name)
|
|
provider = get_provider(provider_name, **kwargs)
|
|
enabled_key = f"{provider_name.upper()}_ENABLED"
|
|
provider_enabled = app_config.get(enabled_key, False) is True
|
|
provider_available = provider.is_available()
|
|
|
|
return jsonify(
|
|
{
|
|
"provider": provider_name,
|
|
"display_name": provider.display_name,
|
|
"enabled": provider_enabled,
|
|
"available": provider_available,
|
|
"search_fields": get_provider_search_fields(provider_name),
|
|
"capabilities": get_provider_capabilities(provider_name),
|
|
"sort_options": get_provider_sort_options(provider_name),
|
|
"default_sort": get_provider_default_sort(provider_name, user_id=db_user_id),
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error_trace(f"Metadata config error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/metadata/search", methods=["GET"])
|
|
@login_required
|
|
def api_metadata_search() -> Response | tuple[Response, int]:
|
|
"""Search for books using the configured metadata provider.
|
|
|
|
Query Parameters:
|
|
query (str): Search query (required)
|
|
limit (int): Maximum number of results (default: 40, max: 100)
|
|
sort (str): Sort order - relevance, popularity, rating, newest, oldest (default: relevance)
|
|
[dynamic fields]: Provider-specific search fields passed as query params
|
|
|
|
Returns:
|
|
flask.Response: JSON with list of books from metadata provider.
|
|
|
|
"""
|
|
try:
|
|
from dataclasses import asdict
|
|
|
|
from shelfmark.metadata_providers import (
|
|
CheckboxSearchField,
|
|
MetadataSearchOptions,
|
|
NumberSearchField,
|
|
SortOrder,
|
|
get_configured_provider,
|
|
get_provider,
|
|
get_provider_kwargs,
|
|
is_provider_enabled,
|
|
is_provider_registered,
|
|
)
|
|
|
|
query = request.args.get("query", "").strip()
|
|
content_type = request.args.get("content_type", "ebook").strip()
|
|
provider_name = request.args.get("provider", "").strip()
|
|
|
|
try:
|
|
limit = min(int(request.args.get("limit", 40)), 100)
|
|
except ValueError:
|
|
limit = 40
|
|
|
|
try:
|
|
page = max(1, int(request.args.get("page", 1)))
|
|
except ValueError:
|
|
page = 1
|
|
|
|
# Parse sort parameter
|
|
sort_value = request.args.get("sort", "relevance").lower()
|
|
try:
|
|
sort_order = SortOrder(sort_value)
|
|
except ValueError:
|
|
sort_order = SortOrder.RELEVANCE
|
|
|
|
db_user_id = get_session_db_user_id(session)
|
|
|
|
if provider_name:
|
|
if not is_provider_registered(provider_name):
|
|
return jsonify(
|
|
{
|
|
"error": f"Unknown metadata provider: {provider_name}",
|
|
"message": f"Unknown metadata provider: {provider_name}",
|
|
}
|
|
), 400
|
|
if not is_provider_enabled(provider_name):
|
|
return jsonify(
|
|
{
|
|
"error": f"Metadata provider '{provider_name}' is not enabled",
|
|
"message": f"{provider_name} is not enabled. Enable it in Settings first.",
|
|
}
|
|
), 503
|
|
|
|
kwargs = get_provider_kwargs(provider_name)
|
|
provider = get_provider(provider_name, **kwargs)
|
|
else:
|
|
provider = get_configured_provider(content_type=content_type, user_id=db_user_id)
|
|
|
|
if not provider:
|
|
return jsonify(
|
|
{
|
|
"error": "No metadata provider configured",
|
|
"message": "No metadata provider configured. Enable one in Settings.",
|
|
}
|
|
), 503
|
|
|
|
if not provider.is_available():
|
|
return jsonify(
|
|
{
|
|
"error": f"Metadata provider '{provider.name}' is not available",
|
|
"message": f"{provider.display_name} is not available. Check configuration in Settings.",
|
|
}
|
|
), 503
|
|
|
|
# Extract custom search field values from query params
|
|
fields: dict[str, Any] = {}
|
|
for search_field in provider.search_fields:
|
|
value = request.args.get(search_field.key)
|
|
if value is not None:
|
|
# Strip string values to handle whitespace-only input
|
|
value = value.strip()
|
|
if value != "":
|
|
# Parse value based on field type
|
|
if isinstance(search_field, CheckboxSearchField):
|
|
fields[search_field.key] = value.lower() in ("true", "1", "yes", "on")
|
|
elif isinstance(search_field, NumberSearchField):
|
|
with suppress(ValueError):
|
|
fields[search_field.key] = int(value)
|
|
else:
|
|
fields[search_field.key] = value
|
|
|
|
# Require either a query or at least one field value
|
|
if not query and not fields:
|
|
return jsonify({"error": "Either 'query' or search field values are required"}), 400
|
|
|
|
options = MetadataSearchOptions(
|
|
query=query, limit=limit, page=page, sort=sort_order, fields=fields
|
|
)
|
|
search_result = provider.search_paginated(options)
|
|
|
|
# Convert BookMetadata objects to dicts
|
|
books_data = [asdict(book) for book in search_result.books]
|
|
|
|
# Transform cover_url to local proxy URLs when caching is enabled
|
|
from shelfmark.core.utils import transform_cover_url
|
|
|
|
for book_dict in books_data:
|
|
if book_dict.get("cover_url"):
|
|
cache_id = f"{book_dict['provider']}_{book_dict['provider_id']}"
|
|
book_dict["cover_url"] = transform_cover_url(book_dict["cover_url"], cache_id)
|
|
|
|
response_data = {
|
|
"books": books_data,
|
|
"provider": provider.name,
|
|
"query": query,
|
|
"page": search_result.page,
|
|
"total_found": search_result.total_found,
|
|
"has_more": search_result.has_more,
|
|
}
|
|
if search_result.source_url:
|
|
response_data["source_url"] = search_result.source_url
|
|
if search_result.source_title:
|
|
response_data["source_title"] = search_result.source_title
|
|
return jsonify(response_data)
|
|
except Exception as e:
|
|
logger.error_trace(f"Metadata search error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/metadata/field-options", methods=["GET"])
|
|
@login_required
|
|
def api_metadata_field_options() -> Response:
|
|
"""Return dynamic search-field options for a metadata provider."""
|
|
try:
|
|
from shelfmark.metadata_providers import (
|
|
get_configured_provider,
|
|
get_provider,
|
|
get_provider_kwargs,
|
|
is_provider_registered,
|
|
)
|
|
|
|
field_key = request.args.get("field", "").strip()
|
|
provider_name = request.args.get("provider", "").strip()
|
|
content_type = request.args.get("content_type", "ebook").strip()
|
|
query_text = request.args.get("query", "").strip()
|
|
|
|
if not field_key:
|
|
return jsonify({"options": []})
|
|
|
|
db_user_id = get_session_db_user_id(session)
|
|
|
|
provider = None
|
|
if provider_name:
|
|
if not is_provider_registered(provider_name):
|
|
return jsonify({"options": []})
|
|
kwargs = get_provider_kwargs(provider_name)
|
|
provider = get_provider(provider_name, **kwargs)
|
|
else:
|
|
provider = get_configured_provider(content_type=content_type, user_id=db_user_id)
|
|
|
|
if not provider or not provider.is_available():
|
|
return jsonify({"options": []})
|
|
|
|
options = provider.get_search_field_options(field_key, query=query_text or None)
|
|
return jsonify({"options": options})
|
|
except Exception as e:
|
|
logger.warning("Metadata field options endpoint error: %s", e)
|
|
return jsonify({"options": []})
|
|
|
|
|
|
def _resolve_metadata_provider(provider_name: str) -> MetadataProvider:
    """Validate, instantiate and return a ready metadata provider.

    Raises:
        ValueError: If ``provider_name`` is not a registered provider.
        RuntimeError: If the provider exists but is not currently available.

    """
    from shelfmark.metadata_providers import (
        get_provider,
        get_provider_kwargs,
        is_provider_registered,
    )

    if not is_provider_registered(provider_name):
        raise ValueError(f"Unknown metadata provider: {provider_name}")

    instance = get_provider(provider_name, **get_provider_kwargs(provider_name))

    if not instance.is_available():
        raise RuntimeError(f"Provider '{provider_name}' is not available")

    return instance
|
|
|
|
|
|
@app.route("/api/metadata/book/<provider>/<book_id>", methods=["GET"])
|
|
@login_required
|
|
def api_metadata_book(provider: str, book_id: str) -> Response | tuple[Response, int]:
|
|
"""Get detailed book information from a metadata provider.
|
|
|
|
Path Parameters:
|
|
provider (str): Provider name (e.g., "hardcover", "openlibrary")
|
|
book_id (str): Book ID in the provider's system
|
|
|
|
Returns:
|
|
flask.Response: JSON with book details.
|
|
|
|
"""
|
|
try:
|
|
from dataclasses import asdict
|
|
|
|
prov = _resolve_metadata_provider(provider)
|
|
|
|
book = prov.get_book(book_id)
|
|
if not book:
|
|
return jsonify({"error": "Book not found"}), 404
|
|
|
|
book_dict = asdict(book)
|
|
|
|
# Transform cover_url to local proxy URL when caching is enabled
|
|
from shelfmark.core.utils import transform_cover_url
|
|
|
|
if book_dict.get("cover_url"):
|
|
cache_id = f"{provider}_{book_id}"
|
|
book_dict["cover_url"] = transform_cover_url(book_dict["cover_url"], cache_id)
|
|
|
|
return jsonify(book_dict)
|
|
except ValueError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
except RuntimeError as e:
|
|
return jsonify({"error": str(e)}), 503
|
|
except Exception as e:
|
|
logger.error_trace(f"Metadata book error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
def _handle_target_errors(
    fallback_message: str,
) -> Callable[
    [Callable[..., Response | tuple[Response, int]]], Callable[..., Response | tuple[Response, int]]
]:
    """Build a decorator giving metadata-target routes uniform error handling.

    Client-input problems (``NotImplementedError``/``ValueError``) map to 400,
    upstream failures (``RuntimeError``) map to 502, and anything else is
    logged and answered with a 500 carrying ``fallback_message``.
    """

    def _decorate(
        route: Callable[..., Response | tuple[Response, int]],
    ) -> Callable[..., Response | tuple[Response, int]]:
        @wraps(route)
        def _guarded(*args, **kwargs) -> Response | tuple[Response, int]:
            try:
                return route(*args, **kwargs)
            except (NotImplementedError, ValueError) as e:
                return jsonify({"error": str(e)}), 400
            except RuntimeError as e:
                return jsonify({"error": str(e)}), 502
            except Exception as e:
                logger.error_trace(f"{fallback_message}: {e}")
                return jsonify({"error": fallback_message}), 500

        return _guarded

    return _decorate
|
|
|
|
|
|
@app.route("/api/metadata/book/<provider>/<book_id>/targets", methods=["GET"])
|
|
@login_required
|
|
@_handle_target_errors("Failed to load book targets")
|
|
def api_metadata_book_targets(provider: str, book_id: str) -> Response | tuple[Response, int]:
|
|
"""Get provider-managed list/status targets for a specific book."""
|
|
prov = _resolve_metadata_provider(provider)
|
|
return jsonify({"options": prov.get_book_targets(book_id)})
|
|
|
|
|
|
@app.route("/api/metadata/book/<provider>/targets/batch", methods=["POST"])
|
|
@login_required
|
|
@_handle_target_errors("Failed to load book targets")
|
|
def api_metadata_book_targets_batch(provider: str) -> Response | tuple[Response, int]:
|
|
"""Get provider-managed list/status targets for multiple books."""
|
|
prov = _resolve_metadata_provider(provider)
|
|
|
|
payload = request.get_json(silent=True) or {}
|
|
raw_ids = payload.get("book_ids", []) if isinstance(payload, dict) else []
|
|
if not isinstance(raw_ids, list) or not raw_ids:
|
|
return jsonify({"error": "book_ids must be a non-empty array"}), 400
|
|
|
|
book_ids = [str(bid) for bid in raw_ids[:50]]
|
|
|
|
return jsonify({"results": prov.get_book_targets_batch(book_ids)})
|
|
|
|
|
|
@app.route("/api/metadata/book/<provider>/<book_id>/targets", methods=["PUT"])
|
|
@login_required
|
|
@_handle_target_errors("Failed to update book targets")
|
|
def api_metadata_book_targets_update(
|
|
provider: str, book_id: str
|
|
) -> Response | tuple[Response, int]:
|
|
"""Set whether a book belongs to a provider-managed list or shelf."""
|
|
prov = _resolve_metadata_provider(provider)
|
|
|
|
payload = request.get_json(silent=True) or {}
|
|
target = str(payload.get("target", "")).strip() if isinstance(payload, dict) else ""
|
|
selected = payload.get("selected") if isinstance(payload, dict) else None
|
|
if not target:
|
|
return jsonify({"error": "target is required"}), 400
|
|
if not isinstance(selected, bool):
|
|
return jsonify({"error": "selected must be a boolean"}), 400
|
|
|
|
result = prov.set_book_target_state(book_id, target, selected)
|
|
response: dict = {
|
|
"success": True,
|
|
"changed": bool(result.get("changed", True)),
|
|
"selected": selected,
|
|
}
|
|
deselected = result.get("deselected_target")
|
|
if isinstance(deselected, str) and deselected:
|
|
response["deselected_target"] = deselected
|
|
return jsonify(response)
|
|
|
|
|
|
@app.route("/api/releases", methods=["GET"])
|
|
@login_required
|
|
def api_releases() -> Response | tuple[Response, int]:
|
|
"""Search for downloadable releases of a book.
|
|
|
|
This endpoint takes book metadata and searches available release sources
|
|
(e.g., Anna's Archive, Libgen) for downloadable files.
|
|
|
|
Query Parameters:
|
|
provider (str): Metadata provider name (required)
|
|
book_id (str): Book ID from metadata provider (required)
|
|
source (str): Release source to search (optional, default: all)
|
|
|
|
Returns:
|
|
flask.Response: JSON with list of available releases.
|
|
|
|
"""
|
|
try:
|
|
from dataclasses import asdict
|
|
|
|
from shelfmark.core.search_plan import build_release_search_plan
|
|
from shelfmark.metadata_providers import (
|
|
BookMetadata,
|
|
get_provider,
|
|
get_provider_kwargs,
|
|
is_provider_registered,
|
|
)
|
|
from shelfmark.release_sources import (
|
|
browse_record_to_book_metadata,
|
|
get_source,
|
|
list_available_sources,
|
|
serialize_column_config,
|
|
source_results_are_releases,
|
|
)
|
|
|
|
def _search_source_releases(source_name: str) -> tuple[Any | None, list[Any], str | None]:
|
|
"""Search one source and return any error message instead of raising."""
|
|
try:
|
|
source = get_source(source_name)
|
|
|
|
plan = build_release_search_plan(
|
|
book,
|
|
languages=browse_filters.lang
|
|
if source_query_filters is not None
|
|
else languages,
|
|
manual_query=query_text if source_query_filters is not None else manual_query,
|
|
indexers=indexers,
|
|
source_filters=source_query_filters,
|
|
)
|
|
|
|
if plan.source_filters is not None:
|
|
planned_query = plan.manual_query or plan.primary_query
|
|
planned_query_type = "query"
|
|
elif plan.manual_query:
|
|
planned_query = plan.manual_query
|
|
planned_query_type = "manual"
|
|
elif not expand_search and plan.isbn_candidates:
|
|
planned_query = plan.isbn_candidates[0]
|
|
planned_query_type = "isbn"
|
|
else:
|
|
planned_query = plan.primary_query
|
|
planned_query_type = "title_author"
|
|
|
|
logger.debug(
|
|
"Searching %s: %s='%s' (title='%s', authors=%s, expand=%s, content_type=%s)",
|
|
source_name,
|
|
planned_query_type,
|
|
planned_query,
|
|
book.title,
|
|
book.authors,
|
|
expand_search,
|
|
content_type,
|
|
)
|
|
|
|
releases = source.search(
|
|
book, plan, expand_search=expand_search, content_type=content_type
|
|
)
|
|
except ValueError:
|
|
return None, [], f"Unknown source: {source_name}"
|
|
except Exception as e:
|
|
logger.warning("Release search failed for source %s: %s", source_name, e)
|
|
return None, [], f"{source_name}: {e!s}"
|
|
else:
|
|
return source, releases, None
|
|
|
|
provider = request.args.get("provider", "").strip()
|
|
book_id = request.args.get("book_id", "").strip()
|
|
source_filter = request.args.get("source", "").strip()
|
|
query_text = request.args.get("query", "").strip()
|
|
# Accept title/author from frontend to avoid re-fetching metadata
|
|
title_param = request.args.get("title", "").strip()
|
|
author_param = request.args.get("author", "").strip()
|
|
expand_search = request.args.get("expand_search", "").lower() == "true"
|
|
# Accept language codes for filtering (comma-separated)
|
|
languages_param = request.args.get("languages", "").strip()
|
|
languages = (
|
|
[lang.strip() for lang in languages_param.split(",") if lang.strip()]
|
|
if languages_param
|
|
else None
|
|
)
|
|
# Content type for audiobook vs ebook search
|
|
content_type = request.args.get("content_type", "ebook").strip()
|
|
|
|
manual_query = request.args.get("manual_query", "").strip()
|
|
|
|
# Accept indexer names for Prowlarr filtering (comma-separated)
|
|
indexers_param = request.args.get("indexers", "").strip()
|
|
indexers = (
|
|
[idx.strip() for idx in indexers_param.split(",") if idx.strip()]
|
|
if indexers_param
|
|
else None
|
|
)
|
|
browse_filters = _parse_search_filters_from_request()
|
|
has_browse_filters = bool(query_text or any(vars(browse_filters).values()))
|
|
|
|
source_query_filters = None
|
|
is_source_provider = bool(provider) and source_results_are_releases(provider)
|
|
|
|
if not provider or not book_id:
|
|
if not source_filter or not has_browse_filters:
|
|
return jsonify({"error": "Parameters 'provider' and 'book_id' are required"}), 400
|
|
if not source_results_are_releases(source_filter):
|
|
return jsonify(
|
|
{"error": f"Source does not support browse release search: {source_filter}"}
|
|
), 400
|
|
|
|
book = _build_source_query_book(query_text, browse_filters)
|
|
source_query_filters = browse_filters
|
|
elif is_source_provider:
|
|
# Source-backed browse flows can reopen the release modal with provider=<source name>.
|
|
# In that flow, treat the source-native record as release-search context instead of
|
|
# requiring a metadata provider registration.
|
|
source = get_source(provider)
|
|
direct_record = source.get_record(book_id)
|
|
if direct_record is None:
|
|
return jsonify({"error": "Book not found in release source"}), 404
|
|
|
|
book = browse_record_to_book_metadata(
|
|
direct_record,
|
|
title_override=title_param or None,
|
|
author_override=author_param or None,
|
|
)
|
|
elif provider == "manual":
|
|
resolved_title = title_param or manual_query or "Manual Search"
|
|
resolved_author = author_param or ""
|
|
authors = [a.strip() for a in resolved_author.split(",") if a.strip()]
|
|
|
|
book = BookMetadata(
|
|
provider="manual",
|
|
provider_id=book_id,
|
|
provider_display_name="Manual Search",
|
|
title=resolved_title,
|
|
search_title=resolved_title,
|
|
search_author=resolved_author or None,
|
|
authors=authors,
|
|
)
|
|
else:
|
|
if not is_provider_registered(provider):
|
|
return jsonify({"error": f"Unknown metadata provider: {provider}"}), 400
|
|
|
|
# Get book metadata from provider
|
|
kwargs = get_provider_kwargs(provider)
|
|
prov = get_provider(provider, **kwargs)
|
|
book = prov.get_book(book_id)
|
|
|
|
if not book:
|
|
return jsonify({"error": "Book not found in metadata provider"}), 404
|
|
|
|
# Override title from frontend if available (search results may have better data)
|
|
# Note: We intentionally DON'T override authors here - get_book() now returns
|
|
# filtered authors (primary authors only, excluding translators/narrators),
|
|
# which gives better release search results than the unfiltered search data
|
|
if title_param:
|
|
book.title = title_param
|
|
|
|
# Determine which release sources to search
|
|
if source_query_filters is not None or source_filter:
|
|
sources_to_search = [source_filter]
|
|
elif is_source_provider:
|
|
# Source-backed browse flows stay within the source that produced the record.
|
|
sources_to_search = [provider]
|
|
else:
|
|
# Search only enabled sources
|
|
sources_to_search = [src["name"] for src in list_available_sources() if src["enabled"]]
|
|
|
|
# Search each source for releases
|
|
all_releases = []
|
|
errors = []
|
|
source_instances = {} # Keep source instances for column config
|
|
|
|
for source_name in sources_to_search:
|
|
source, releases, error = _search_source_releases(source_name)
|
|
if source is not None:
|
|
source_instances[source_name] = source
|
|
all_releases.extend(releases)
|
|
if error is not None:
|
|
errors.append(error)
|
|
|
|
# Convert Release objects to dicts
|
|
releases_data = [_serialize_release(release) for release in all_releases]
|
|
|
|
# Get column config from the first source searched
|
|
# Reuse the same instance to get any dynamic data (e.g., online_servers for IRC)
|
|
column_config = None
|
|
if sources_to_search and sources_to_search[0] in source_instances:
|
|
try:
|
|
first_source = source_instances[sources_to_search[0]]
|
|
column_config = serialize_column_config(first_source.get_column_config())
|
|
except Exception as e:
|
|
logger.warning("Failed to get column config: %s", e)
|
|
|
|
# Convert book to dict and transform cover_url
|
|
book_dict = asdict(book)
|
|
from shelfmark.core.utils import transform_cover_url
|
|
|
|
if book_dict.get("cover_url"):
|
|
cache_id = f"{provider}_{book_id}"
|
|
book_dict["cover_url"] = transform_cover_url(book_dict["cover_url"], cache_id)
|
|
|
|
search_info = {}
|
|
for source_name, source_instance in source_instances.items():
|
|
if hasattr(source_instance, "last_search_type") and source_instance.last_search_type:
|
|
search_info[source_name] = {"search_type": source_instance.last_search_type}
|
|
|
|
response = {
|
|
"releases": releases_data,
|
|
"book": book_dict,
|
|
"sources_searched": sources_to_search,
|
|
"column_config": column_config,
|
|
"search_info": search_info,
|
|
}
|
|
|
|
if errors:
|
|
response["errors"] = errors
|
|
|
|
# If no releases found and there were errors, return 503 with the first
|
|
# source failure message so direct-mode source query searches surface the
|
|
# same unavailable-state messaging as release modal searches.
|
|
if not releases_data and errors:
|
|
# Use the first error message (typically the most relevant)
|
|
error_message = errors[0]
|
|
# Strip the source prefix if present (e.g., "direct_download: message" -> "message")
|
|
if ": " in error_message:
|
|
error_message = error_message.split(": ", 1)[1]
|
|
return jsonify({"error": error_message}), 503
|
|
|
|
return jsonify(response)
|
|
except SourceUnavailableError as e:
|
|
logger.warning("Release search unavailable: %s", e)
|
|
return jsonify({"error": str(e)}), 503
|
|
except Exception as e:
|
|
logger.error_trace(f"Releases search error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/release-sources", methods=["GET"])
|
|
@login_required
|
|
def api_release_sources() -> Response | tuple[Response, int]:
|
|
"""Get available release sources from the plugin registry.
|
|
|
|
Returns:
|
|
flask.Response: JSON list of available release sources.
|
|
|
|
"""
|
|
try:
|
|
from shelfmark.release_sources import list_available_sources
|
|
|
|
sources = list_available_sources()
|
|
return jsonify(sources)
|
|
except Exception as e:
|
|
logger.error_trace(f"Release sources error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/release-sources/<source_name>/records/<path:record_id>", methods=["GET"])
|
|
@login_required
|
|
def api_release_source_record(source_name: str, record_id: str) -> Response | tuple[Response, int]:
|
|
"""Resolve a source-native browse record for a release source."""
|
|
try:
|
|
from shelfmark.release_sources import get_source
|
|
|
|
source = get_source(source_name)
|
|
record = source.get_record(record_id)
|
|
if record is None:
|
|
return jsonify({"error": "Record not found"}), 404
|
|
return jsonify(_serialize_browse_record(record))
|
|
except ValueError:
|
|
return jsonify({"error": f"Unknown release source: {source_name}"}), 400
|
|
except SourceUnavailableError as e:
|
|
logger.warning("Release source record unavailable: %s", e)
|
|
return jsonify({"error": str(e)}), 503
|
|
except Exception as e:
|
|
logger.error_trace(f"Release source record error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/settings", methods=["GET"])
|
|
@login_required
|
|
def api_settings_get_all() -> Response | tuple[Response, int]:
|
|
"""Get all settings tabs with their fields and current values.
|
|
|
|
Returns:
|
|
flask.Response: JSON with all settings tabs.
|
|
|
|
"""
|
|
try:
|
|
import_module("shelfmark.config.notifications_settings")
|
|
import_module("shelfmark.config.security")
|
|
|
|
# Ensure settings are registered by importing settings modules
|
|
# This triggers the @register_settings decorators
|
|
import_module("shelfmark.config.settings")
|
|
import_module("shelfmark.config.users_settings")
|
|
from shelfmark.core.settings_registry import serialize_all_settings
|
|
|
|
data = serialize_all_settings(include_values=True)
|
|
return jsonify(data)
|
|
except Exception as e:
|
|
logger.error_trace(f"Settings get error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/settings/<tab_name>", methods=["GET"])
|
|
@login_required
|
|
def api_settings_get_tab(tab_name: str) -> Response | tuple[Response, int]:
|
|
"""Get settings for a specific tab.
|
|
|
|
Path Parameters:
|
|
tab_name (str): Settings tab name (e.g., "general", "hardcover")
|
|
|
|
Returns:
|
|
flask.Response: JSON with tab settings and values.
|
|
|
|
"""
|
|
try:
|
|
import_module("shelfmark.config.notifications_settings")
|
|
import_module("shelfmark.config.security")
|
|
|
|
# Ensure settings are registered
|
|
import_module("shelfmark.config.settings")
|
|
import_module("shelfmark.config.users_settings")
|
|
from shelfmark.core.settings_registry import (
|
|
get_settings_tab,
|
|
serialize_tab,
|
|
)
|
|
|
|
tab = get_settings_tab(tab_name)
|
|
if not tab:
|
|
return jsonify({"error": f"Unknown settings tab: {tab_name}"}), 404
|
|
|
|
return jsonify(serialize_tab(tab, include_values=True))
|
|
except Exception as e:
|
|
logger.error_trace(f"Settings get tab error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/settings/<tab_name>", methods=["PUT"])
|
|
@login_required
|
|
def api_settings_update_tab(tab_name: str) -> Response | tuple[Response, int]:
|
|
"""Update settings for a specific tab.
|
|
|
|
Path Parameters:
|
|
tab_name (str): Settings tab name
|
|
|
|
Request Body:
|
|
JSON object with setting keys and values to update.
|
|
|
|
Returns:
|
|
flask.Response: JSON with update result.
|
|
|
|
"""
|
|
try:
|
|
import_module("shelfmark.config.notifications_settings")
|
|
import_module("shelfmark.config.security")
|
|
|
|
# Ensure settings are registered
|
|
import_module("shelfmark.config.settings")
|
|
import_module("shelfmark.config.users_settings")
|
|
from shelfmark.core.settings_registry import (
|
|
get_settings_tab,
|
|
update_settings,
|
|
)
|
|
|
|
tab = get_settings_tab(tab_name)
|
|
if not tab:
|
|
return jsonify({"error": f"Unknown settings tab: {tab_name}"}), 404
|
|
|
|
values = request.get_json()
|
|
if values is None or not isinstance(values, dict):
|
|
return jsonify({"error": "Request body must be a JSON object"}), 400
|
|
|
|
# If no values to update, return success with empty updated list
|
|
if not values:
|
|
return jsonify({"success": True, "message": "No changes to save", "updated": []})
|
|
|
|
result = update_settings(tab_name, values)
|
|
|
|
if result["success"]:
|
|
return jsonify(result)
|
|
return jsonify(result), 400
|
|
except Exception as e:
|
|
logger.error_trace(f"Settings update error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/settings/<tab_name>/action/<action_key>", methods=["POST"])
|
|
@login_required
|
|
def api_settings_execute_action(tab_name: str, action_key: str) -> Response | tuple[Response, int]:
|
|
"""Execute a settings action (e.g., test connection).
|
|
|
|
Path Parameters:
|
|
tab_name (str): Settings tab name
|
|
action_key (str): Action key to execute
|
|
|
|
Request Body (optional):
|
|
JSON object with current form values (unsaved)
|
|
|
|
Returns:
|
|
flask.Response: JSON with action result.
|
|
|
|
"""
|
|
try:
|
|
import_module("shelfmark.config.notifications_settings")
|
|
import_module("shelfmark.config.security")
|
|
|
|
# Ensure settings are registered
|
|
import_module("shelfmark.config.settings")
|
|
import_module("shelfmark.config.users_settings")
|
|
from shelfmark.core.settings_registry import execute_action
|
|
|
|
# Get current form values if provided (for testing with unsaved values)
|
|
current_values = request.get_json(silent=True) or {}
|
|
|
|
result = execute_action(tab_name, action_key, current_values)
|
|
|
|
if result["success"]:
|
|
return jsonify(result)
|
|
return jsonify(result), 400
|
|
except Exception as e:
|
|
logger.error_trace(f"Settings action error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
# =============================================================================
|
|
# Onboarding API
|
|
# =============================================================================
|
|
|
|
|
|
@app.route("/api/onboarding", methods=["GET"])
|
|
@login_required
|
|
def api_onboarding_get() -> Response | tuple[Response, int]:
|
|
"""Get onboarding configuration including steps, fields, and current values.
|
|
|
|
Returns:
|
|
flask.Response: JSON with onboarding steps and values.
|
|
|
|
"""
|
|
try:
|
|
# Ensure settings are registered
|
|
import_module("shelfmark.config.settings")
|
|
from shelfmark.core.onboarding import get_onboarding_config
|
|
|
|
config = get_onboarding_config()
|
|
return jsonify(config)
|
|
except Exception as e:
|
|
logger.error_trace(f"Onboarding get error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/onboarding", methods=["POST"])
|
|
@login_required
|
|
def api_onboarding_save() -> Response | tuple[Response, int]:
|
|
"""Save onboarding settings and mark as complete.
|
|
|
|
Request Body:
|
|
JSON object with all onboarding field values
|
|
|
|
Returns:
|
|
flask.Response: JSON with success/error status.
|
|
|
|
"""
|
|
try:
|
|
# Ensure settings are registered
|
|
import_module("shelfmark.config.settings")
|
|
from shelfmark.core.onboarding import save_onboarding_settings
|
|
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({"success": False, "message": "No data provided"}), 400
|
|
|
|
result = save_onboarding_settings(data)
|
|
|
|
if result["success"]:
|
|
return jsonify(result)
|
|
return jsonify(result), 400
|
|
except Exception as e:
|
|
logger.error_trace(f"Onboarding save error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/onboarding/skip", methods=["POST"])
|
|
@login_required
|
|
def api_onboarding_skip() -> Response | tuple[Response, int]:
|
|
"""Skip onboarding and mark as complete without saving any settings.
|
|
|
|
Returns:
|
|
flask.Response: JSON with success status.
|
|
|
|
"""
|
|
try:
|
|
from shelfmark.core.onboarding import mark_onboarding_complete
|
|
|
|
mark_onboarding_complete()
|
|
return jsonify({"success": True, "message": "Onboarding skipped"})
|
|
except Exception as e:
|
|
logger.error_trace(f"Onboarding skip error: {e}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
# Catch-all route for React Router (must be last)
|
|
# This handles client-side routing by serving index.html for any unmatched routes
|
|
@app.route("/<path:path>")
|
|
def catch_all(path: str) -> Response:
|
|
"""Serve the React app for any route not matched by API endpoints.
|
|
This allows React Router to handle client-side routing.
|
|
Authentication is handled by the React app itself.
|
|
"""
|
|
# If the request is for an API endpoint or static file, let it 404
|
|
if path.startswith(("api/", "assets/")):
|
|
return jsonify({"error": "Resource not found"}), 404
|
|
# Otherwise serve the React app
|
|
return _serve_index_html()
|
|
|
|
|
|
# WebSocket event handlers
|
|
@socketio.on("connect")
|
|
def handle_connect() -> None:
|
|
"""Handle client connection."""
|
|
logger.info("WebSocket client connected")
|
|
|
|
# Track the connection (triggers warmup callbacks on first connect)
|
|
ws_manager.client_connected()
|
|
|
|
# Join appropriate room based on authenticated user session
|
|
is_admin, db_user_id, can_access_status = _resolve_status_scope()
|
|
ws_manager.join_user_room(request.sid, is_admin, db_user_id)
|
|
|
|
# Send initial status to the newly connected client (filtered)
|
|
try:
|
|
if not can_access_status:
|
|
emit("status_update", {})
|
|
return
|
|
|
|
user_id = None if is_admin else db_user_id
|
|
status = backend.queue_status(user_id=user_id)
|
|
emit("status_update", status)
|
|
except Exception:
|
|
logger.exception("Error sending initial status")
|
|
|
|
|
|
@socketio.on("disconnect")
|
|
def handle_disconnect() -> None:
|
|
"""Handle client disconnection."""
|
|
logger.info("WebSocket client disconnected")
|
|
|
|
# Leave room
|
|
ws_manager.leave_user_room(request.sid)
|
|
|
|
# Track the disconnection
|
|
ws_manager.client_disconnected()
|
|
|
|
|
|
@socketio.on("request_status")
|
|
def handle_status_request() -> None:
|
|
"""Handle manual status request from client."""
|
|
try:
|
|
is_admin, db_user_id, can_access_status = _resolve_status_scope()
|
|
ws_manager.sync_user_room(request.sid, is_admin, db_user_id)
|
|
|
|
if not can_access_status:
|
|
emit("status_update", {})
|
|
return
|
|
|
|
user_id = None if is_admin else db_user_id
|
|
status = backend.queue_status(user_id=user_id)
|
|
emit("status_update", status)
|
|
except Exception:
|
|
logger.exception("Error handling status request")
|
|
emit("error", {"message": "Failed to get status"})
|
|
|
|
|
|
# Module-level startup side effects (run once at import time).
logger.log_resource_usage()

# Warn if config directory is not writable (settings won't persist)
if not _is_config_dir_writable():
    logger.warning(
        "Config directory %s is not writable. Settings will not persist. Mount a config volume to enable settings persistence (see docs for details).",
        CONFIG_DIR,
    )
|
|
|
|
if __name__ == "__main__":
|
|
logger.info(
|
|
"Starting Flask application with WebSocket support on %s:%s (debug=%s)",
|
|
FLASK_HOST,
|
|
FLASK_PORT,
|
|
DEBUG,
|
|
)
|
|
socketio.run(
|
|
app,
|
|
host=FLASK_HOST,
|
|
port=FLASK_PORT,
|
|
debug=DEBUG,
|
|
allow_unsafe_werkzeug=True, # For development only
|
|
)
|