Harden startup and refactor web/API logic

### Fixed
- Improves OpenAI parsing for fenced code blocks.
- Adds clearer admin actions with separated handlers for user and request management.

### Security
- Runs the application as a non-root user by default and tightens ownership of writable configuration paths.
This commit is contained in:
Beda Schmid
2025-10-13 14:30:36 -03:00
parent 98674635f7
commit 22ea024b94
8 changed files with 482 additions and 331 deletions

View File

@@ -4,7 +4,9 @@ ARG RELEASE_VERSION
ENV RELEASE_VERSION=${RELEASE_VERSION}
ENV PYTHONPATH="/sonobarr/src"
RUN apk update && apk add --no-cache su-exec
RUN apk update && apk add --no-cache su-exec \
&& addgroup -S -g 1000 sonobarr \
&& adduser -S -G sonobarr -u 1000 sonobarr
# Copy only requirements first
COPY requirements.txt /sonobarr/
@@ -20,6 +22,10 @@ COPY migrations/ /sonobarr/migrations/
COPY gunicorn_config.py /sonobarr/
COPY init.sh /sonobarr/
RUN chmod +x init.sh
RUN chmod 755 init.sh \
&& mkdir -p /sonobarr/config \
&& chown -R sonobarr:sonobarr /sonobarr/config
USER sonobarr
ENTRYPOINT ["./init.sh"]

45
init.sh
View File

@@ -13,19 +13,32 @@ cat << 'EOF'
██ ██ ██ ██ ██ |_______/ \______/ |__| \__| \______/ |______/ /__/ \__\ | _| `._____| | _| `._____|
EOF
PUID=${PUID:-1000}
PGID=${PGID:-1000}
CURRENT_UID="$(id -u)"
CURRENT_GID="$(id -g)"
PUID=${PUID:-}
PGID=${PGID:-}
APP_DIR=/sonobarr
SRC_DIR="${APP_DIR}/src"
CONFIG_DIR="${APP_DIR}/config"
CONFIG_MIGRATIONS_DIR="${CONFIG_DIR}/migrations"
MIGRATIONS_DIR=${MIGRATIONS_DIR:-${APP_DIR}/migrations}
WRITABLE_PATHS="${CONFIG_DIR}"
export PYTHONPATH=${PYTHONPATH:-${SRC_DIR}}
export FLASK_APP=${FLASK_APP:-src.Sonobarr}
export FLASK_ENV=${FLASK_ENV:-production}
export FLASK_RUN_FROM_CLI=${FLASK_RUN_FROM_CLI:-true}
# If we're not running as root, make sure UID/GID align with the current user
if [ "${CURRENT_UID}" -eq 0 ]; then
PUID=${PUID:-1000}
PGID=${PGID:-1000}
else
PUID=${PUID:-${CURRENT_UID}}
PGID=${PGID:-${CURRENT_GID}}
fi
echo "-----------------"
echo -e "\033[1mRunning with:\033[0m"
echo "PUID=${PUID}"
@@ -35,7 +48,12 @@ echo "-----------------"
# Create the required directories with the correct permissions
echo "Setting up directories.."
mkdir -p "${CONFIG_DIR}"
chown -R ${PUID}:${PGID} "${APP_DIR}"
if [ "${CURRENT_UID}" -eq 0 ]; then
for path in ${WRITABLE_PATHS}; do
chown -R ${PUID}:${PGID} "${path}"
done
fi
if [ -d "${CONFIG_MIGRATIONS_DIR}" ]; then
echo "Removing legacy migrations directory at ${CONFIG_MIGRATIONS_DIR}..."
@@ -47,8 +65,25 @@ if [ ! -d "${MIGRATIONS_DIR}" ]; then
exit 1
fi
if [ "${CURRENT_UID}" -eq 0 ]; then
RUNNER="su-exec ${PUID}:${PGID}"
else
if [ "${PUID}" != "${CURRENT_UID}" ] || [ "${PGID}" != "${CURRENT_GID}" ]; then
echo "Warning: running as UID ${CURRENT_UID} but PUID=${PUID}; ignoring PUID/PGID overrides because process is not root." >&2
fi
RUNNER=""
fi
echo "Applying database migrations..."
SONOBARR_SKIP_PROFILE_BACKFILL=1 su-exec ${PUID}:${PGID} flask db upgrade --directory "${MIGRATIONS_DIR}"
if [ -n "${RUNNER}" ]; then
SONOBARR_SKIP_PROFILE_BACKFILL=1 ${RUNNER} flask db upgrade --directory "${MIGRATIONS_DIR}"
else
SONOBARR_SKIP_PROFILE_BACKFILL=1 flask db upgrade --directory "${MIGRATIONS_DIR}"
fi
echo "Starting app..."
exec su-exec ${PUID}:${PGID} gunicorn src.Sonobarr:app -c gunicorn_config.py
if [ -n "${RUNNER}" ]; then
exec ${RUNNER} gunicorn src.Sonobarr:app -c gunicorn_config.py
else
exec gunicorn src.Sonobarr:app -c gunicorn_config.py
fi

View File

@@ -1041,40 +1041,6 @@ class DataHandler:
return ""
return str(value).strip()
def _coerce_bool(value: Any, current: bool) -> bool:
if isinstance(value, bool):
return value
if value is None:
return current
if isinstance(value, (int, float)):
return value != 0
value_str = str(value).strip().lower()
if value_str == "":
return False
return value_str in {"1", "true", "yes", "on"}
def _coerce_int(value: Any, current: int, minimum: int | None = None) -> int:
if value in (None, ""):
return current
try:
parsed = int(value)
except (TypeError, ValueError):
return current
if minimum is not None and parsed < minimum:
return minimum
return parsed
def _coerce_float(value: Any, current: float, minimum: float | None = None) -> float:
if value in (None, ""):
return current
try:
parsed = float(value)
except (TypeError, ValueError):
return current
if minimum is not None and parsed < minimum:
return minimum
return parsed
if "lidarr_address" in data:
self.lidarr_address = _clean_str(data.get("lidarr_address"))
if "lidarr_api_key" in data:
@@ -1547,7 +1513,7 @@ class DataHandler:
def load_environ_or_config_settings(self) -> None:
default_settings = {
"lidarr_address": "http://192.168.1.1:8686",
"lidarr_address": "",
"lidarr_api_key": "",
"root_folder_path": "/data/media/music/",
"fallback_to_top_result": False,
@@ -1558,7 +1524,7 @@ class DataHandler:
"dry_run_adding_to_lidarr": False,
"app_name": "Sonobarr",
"app_rev": "0.10",
"app_url": "http://" + "".join(random.choices(string.ascii_lowercase, k=10)) + ".com",
"app_url": "https://" + "".join(random.choices(string.ascii_lowercase, k=10)) + ".com", # NOSONAR(S2245)
"last_fm_api_key": "",
"last_fm_api_secret": "",
"auto_start": False,

View File

@@ -27,22 +27,49 @@ class OpenAIRecommender:
model: str | None = None,
max_seed_artists: int = DEFAULT_MAX_SEED_ARTISTS,
timeout: float | None = DEFAULT_OPENAI_TIMEOUT,
temperature: float | None = 0.7,
) -> None:
self.timeout = timeout
self.client = OpenAI(api_key=api_key, timeout=timeout)
self.model = model or DEFAULT_OPENAI_MODEL
self.max_seed_artists = max_seed_artists
self.temperature = temperature
def _extract_array_fragment(self, content: str) -> Optional[str]:
if not content:
return None
@staticmethod
def _iter_fenced_code_blocks(text: str):
start = 0
text_length = len(text)
while start < text_length:
open_idx = text.find("```", start)
if open_idx == -1:
return
label_start = open_idx + 3
label_end = label_start
while label_end < text_length and text[label_end] not in ("\n", "\r"):
label_end += 1
label = text[label_start:label_end].strip().lower()
content_start = label_end
if content_start < text_length and text[content_start] == "\r":
content_start += 1
if content_start < text_length and text[content_start] == "\n":
content_start += 1
close_idx = text.find("```", content_start)
if close_idx == -1:
return
yield label, text[content_start:close_idx]
start = close_idx + 3
# Prefer fenced code blocks labelled json (e.g. ```json ... ```)
for match in re.finditer(r"```(?:json)?\s*(.*?)```", content, flags=re.IGNORECASE | re.DOTALL):
candidate = match.group(1).strip()
def _extract_from_fenced_blocks(self, content: str) -> Optional[str]:
for label, block in self._iter_fenced_code_blocks(content):
if label and label != "json":
continue
candidate = block.strip()
if candidate.startswith("["):
return candidate
return None
@staticmethod
def _find_first_json_array(content: str) -> Optional[str]:
content_stripped = content.strip()
if content_stripped.startswith("["):
return content_stripped
@@ -51,32 +78,41 @@ class OpenAIRecommender:
text_length = len(content)
idx = 0
while idx < text_length:
char = content[idx]
if char == "[":
try:
parsed, end = decoder.raw_decode(content[idx:])
except json.JSONDecodeError:
idx += 1
continue
if isinstance(parsed, list):
return content[idx : idx + end]
if content[idx] != "[":
idx += 1
continue
try:
parsed, end = decoder.raw_decode(content[idx:])
except json.JSONDecodeError:
idx += 1
continue
if isinstance(parsed, list):
return content[idx : idx + end]
idx += 1
return None
def generate_seed_artists(
self,
prompt: str,
existing_artists: Sequence[str] | None = None,
) -> List[str]:
existing_artists = existing_artists or []
def _extract_array_fragment(self, content: str) -> Optional[str]:
if not content:
return None
candidate = self._extract_from_fenced_blocks(content)
if candidate:
return candidate
return self._find_first_json_array(content)
def _build_prompts(self, prompt: str, existing_artists: Sequence[str]) -> tuple[str, str]:
system_prompt = _SYSTEM_PROMPT.format(max_artists=self.max_seed_artists)
existing_preview = ", ".join(existing_artists[:50]) if existing_artists else "None provided."
user_prompt = (
"User request:\n"
f"{prompt.strip()}\n\n"
"Artists already in the library:\n"
f"{', '.join(existing_artists[:50]) if existing_artists else 'None provided.'}"
f"{existing_preview}"
)
return system_prompt, user_prompt
def _prepare_request(self, system_prompt: str, user_prompt: str) -> dict:
request_kwargs = {
"model": self.model,
"messages": [
@@ -84,17 +120,16 @@ class OpenAIRecommender:
{"role": "user", "content": user_prompt},
],
}
if self.temperature is not None:
request_kwargs["temperature"] = self.temperature
return request_kwargs
temperature_value = 0.7
if temperature_value is not None:
request_kwargs["temperature"] = temperature_value
def _execute_request(self, request_kwargs: dict):
attempts = 2
last_exc: Optional[Exception] = None
for attempt in range(attempts):
try:
response = self.client.chat.completions.create(**request_kwargs)
break
return self.client.chat.completions.create(**request_kwargs)
except OpenAIError as exc: # pragma: no cover - network failure path
message = str(exc)
last_exc = exc
@@ -107,20 +142,79 @@ class OpenAIRecommender:
if "timed out" in message.lower() and attempt + 1 < attempts:
continue
raise RuntimeError(message) from exc
else: # pragma: no cover - defensive
if last_exc is not None:
raise RuntimeError(str(last_exc)) from last_exc
raise RuntimeError("OpenAI request failed without response")
if last_exc is not None: # pragma: no cover - defensive
raise RuntimeError(str(last_exc)) from last_exc
raise RuntimeError("OpenAI request failed without response") # pragma: no cover - defensive
@staticmethod
def _extract_response_content(response) -> str:
try:
content = response.choices[0].message.content
except (AttributeError, IndexError, KeyError) as exc:
raise RuntimeError("Unexpected response format from OpenAI.") from exc
return content or ""
def _load_json_payload(self, array_fragment: str):
try:
return json.loads(array_fragment)
except json.JSONDecodeError as exc:
raise RuntimeError(
"OpenAI response was not valid JSON. "
"Please try rephrasing your request."
) from exc
@staticmethod
def _coerce_artist_entries(raw_data):
if isinstance(raw_data, list):
return raw_data
if isinstance(raw_data, dict):
candidate_list = raw_data.get("artists") or raw_data.get("seeds")
if isinstance(candidate_list, list):
return candidate_list
raise RuntimeError("OpenAI response JSON was not a list of artists.")
@staticmethod
def _normalize_artist_entry(item) -> Optional[str]:
if isinstance(item, str):
candidate = item.strip()
return candidate or None
if isinstance(item, dict):
name = item.get("name")
if isinstance(name, str):
candidate = name.strip()
return candidate or None
return None
def _dedupe_and_limit(self, items: Sequence) -> List[str]:
seeds: List[str] = []
seen: set[str] = set()
for item in items:
artist = self._normalize_artist_entry(item)
if not artist:
continue
lower_name = artist.lower()
if lower_name in seen:
continue
seeds.append(artist)
seen.add(lower_name)
if len(seeds) >= self.max_seed_artists:
break
return seeds
def generate_seed_artists(
self,
prompt: str,
existing_artists: Sequence[str] | None = None,
) -> List[str]:
catalog_artists = existing_artists or []
system_prompt, user_prompt = self._build_prompts(prompt, catalog_artists)
request_kwargs = self._prepare_request(system_prompt, user_prompt)
response = self._execute_request(request_kwargs)
content = self._extract_response_content(response).strip()
if not content:
return []
content = content.strip()
array_fragment = self._extract_array_fragment(content)
if not array_fragment:
raise RuntimeError(
@@ -128,42 +222,6 @@ class OpenAIRecommender:
"Please try rephrasing your request."
)
try:
raw_data = json.loads(array_fragment)
except json.JSONDecodeError as exc:
raise RuntimeError(
"OpenAI response was not valid JSON. "
"Please try rephrasing your request."
) from exc
if not isinstance(raw_data, list):
if isinstance(raw_data, dict):
candidate_list = raw_data.get("artists") or raw_data.get("seeds")
if isinstance(candidate_list, list):
raw_data = candidate_list
else:
raise RuntimeError("OpenAI response JSON was not a list of artists.")
else:
raise RuntimeError("OpenAI response JSON was not a list of artists.")
seeds: List[str] = []
for item in raw_data:
if isinstance(item, str):
artist = item.strip()
if artist and artist.lower() not in {
artist_name.lower() for artist_name in seeds
}:
seeds.append(artist)
elif isinstance(item, dict):
name = item.get("name") if isinstance(item.get("name"), str) else None
if name:
artist = name.strip()
if artist and artist.lower() not in {
artist_name.lower() for artist_name in seeds
}:
seeds.append(artist)
if len(seeds) > self.max_seed_artists:
seeds = seeds[: self.max_seed_artists]
return seeds
raw_payload = self._load_json_payload(array_fragment)
normalized_items = self._coerce_artist_entries(raw_payload)
return self._dedupe_and_limit(normalized_items)

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
import datetime
from datetime import datetime, timezone
from functools import wraps
@@ -24,136 +24,165 @@ def admin_required(view):
return wrapped
@bp.route("/users", methods=["GET", "POST"])
def _create_user_from_form(form):
username = (form.get("username") or "").strip()
password = form.get("password") or ""
confirm_password = (form.get("confirm_password") or "").strip()
display_name = (form.get("display_name") or "").strip()
avatar_url = (form.get("avatar_url") or "").strip()
is_admin = form.get("is_admin") == "on"
if not username or not password:
flash("Username and password are required.", "danger")
return
if password != confirm_password:
flash("Password confirmation does not match.", "danger")
return
if User.query.filter_by(username=username).first():
flash("Username already exists.", "danger")
return
user = User(
username=username,
display_name=display_name or None,
avatar_url=avatar_url or None,
is_admin=is_admin,
)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash(f"User '{username}' created.", "success")
def _delete_user_from_form(form):
try:
user_id = int(form.get("user_id", "0"))
except ValueError:
flash("Invalid user id.", "danger")
return
user = User.query.get(user_id)
if not user:
flash("User not found.", "danger")
return
if user.id == current_user.id:
flash("You cannot delete your own account.", "warning")
return
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
flash("At least one administrator must remain.", "warning")
return
# Delete associated artist requests first
ArtistRequest.query.filter_by(requested_by_id=user_id).delete()
ArtistRequest.query.filter_by(approved_by_id=user_id).delete()
db.session.delete(user)
db.session.commit()
flash(f"User '{user.username}' deleted.", "success")
def _resolve_artist_request(form):
request_id = form.get("request_id")
if not request_id:
flash("Invalid request ID.", "danger")
return None
try:
request_id_int = int(request_id)
except ValueError:
flash("Invalid request ID.", "danger")
return None
artist_request = ArtistRequest.query.get(request_id_int)
if not artist_request:
flash("Artist request not found.", "danger")
return None
if artist_request.status != "pending":
flash("Request has already been processed.", "warning")
return None
return artist_request
def _approve_artist_request(artist_request: ArtistRequest):
data_handler = current_app.extensions.get("data_handler")
if not data_handler:
flash(f"Failed to add '{artist_request.artist_name}' to Lidarr. Request not approved.", "danger")
return
session_key = f"admin_{current_user.id}"
data_handler.ensure_session(session_key, current_user.id, True)
result_status = data_handler.add_artists(session_key, artist_request.artist_name)
if result_status != "Added":
flash(f"Failed to add '{artist_request.artist_name}' to Lidarr. Request not approved.", "danger")
return
artist_request.status = "approved"
artist_request.approved_by_id = current_user.id
artist_request.approved_at = datetime.now(timezone.utc)
db.session.commit()
approved_artist = {"Name": artist_request.artist_name, "Status": "Added"}
data_handler.socketio.emit("refresh_artist", approved_artist)
flash(f"Request for '{artist_request.artist_name}' approved and added to Lidarr.", "success")
def _reject_artist_request(artist_request: ArtistRequest):
artist_request.status = "rejected"
artist_request.approved_by_id = current_user.id
artist_request.approved_at = datetime.now(timezone.utc)
db.session.commit()
data_handler = current_app.extensions.get("data_handler")
if data_handler:
rejected_artist = {"Name": artist_request.artist_name, "Status": "Rejected"}
data_handler.socketio.emit("refresh_artist", rejected_artist)
flash(f"Request for '{artist_request.artist_name}' rejected.", "success")
@bp.get("/users")
@login_required
@admin_required
def users():
if request.method == "POST":
action = request.form.get("action")
if action == "create":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
confirm_password = (request.form.get("confirm_password") or "").strip()
display_name = (request.form.get("display_name") or "").strip()
avatar_url = (request.form.get("avatar_url") or "").strip()
is_admin = request.form.get("is_admin") == "on"
if not username or not password:
flash("Username and password are required.", "danger")
elif password != confirm_password:
flash("Password confirmation does not match.", "danger")
elif User.query.filter_by(username=username).first():
flash("Username already exists.", "danger")
else:
user = User(
username=username,
display_name=display_name or None,
avatar_url=avatar_url or None,
is_admin=is_admin,
)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash(f"User '{username}' created.", "success")
elif action == "delete":
try:
user_id = int(request.form.get("user_id", "0"))
except ValueError:
flash("Invalid user id.", "danger")
else:
user = User.query.get(user_id)
if not user:
flash("User not found.", "danger")
elif user.id == current_user.id:
flash("You cannot delete your own account.", "warning")
elif user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
flash("At least one administrator must remain.", "warning")
else:
# Delete associated artist requests first
ArtistRequest.query.filter_by(requested_by_id=user_id).delete()
ArtistRequest.query.filter_by(approved_by_id=user_id).delete()
db.session.delete(user)
db.session.commit()
flash(f"User '{user.username}' deleted.", "success")
return redirect(url_for("admin.users"))
users = User.query.order_by(User.username.asc()).all()
return render_template("admin_users.html", users=users)
users_list = User.query.order_by(User.username.asc()).all()
return render_template("admin_users.html", users=users_list)
@bp.route("/artist-requests", methods=["GET", "POST"])
@bp.post("/users")
@login_required
@admin_required
def modify_users():
action = request.form.get("action")
if action == "create":
_create_user_from_form(request.form)
elif action == "delete":
_delete_user_from_form(request.form)
else:
flash("Invalid action.", "danger")
return redirect(url_for("admin.users"))
@bp.get("/artist-requests")
@login_required
@admin_required
def artist_requests():
if request.method == "POST":
action = request.form.get("action")
request_id = request.form.get("request_id")
if not request_id:
flash("Invalid request ID.", "danger")
return redirect(url_for("admin.artist_requests"))
try:
request_id = int(request_id)
except ValueError:
flash("Invalid request ID.", "danger")
return redirect(url_for("admin.artist_requests"))
artist_request = ArtistRequest.query.get(request_id)
if not artist_request:
flash("Artist request not found.", "danger")
return redirect(url_for("admin.artist_requests"))
if artist_request.status != "pending":
flash("Request has already been processed.", "warning")
return redirect(url_for("admin.artist_requests"))
if action == "approve":
# Add to Lidarr first
data_handler = current_app.extensions.get("data_handler")
success = False
if data_handler:
# Create a dummy session for the admin
admin_session = data_handler.ensure_session(f"admin_{current_user.id}", current_user.id, True)
# Add the artist to Lidarr
result_status = data_handler.add_artists(f"admin_{current_user.id}", artist_request.artist_name)
success = result_status == "Added"
if success:
artist_request.status = "approved"
artist_request.approved_by_id = current_user.id
artist_request.approved_at = datetime.datetime.utcnow()
db.session.commit()
# Notify all connected clients about the approval
approved_artist = {"Name": artist_request.artist_name, "Status": "Added"}
data_handler.socketio.emit("refresh_artist", approved_artist)
flash(f"Request for '{artist_request.artist_name}' approved and added to Lidarr.", "success")
else:
flash(f"Failed to add '{artist_request.artist_name}' to Lidarr. Request not approved.", "danger")
elif action == "reject":
artist_request.status = "rejected"
artist_request.approved_by_id = current_user.id
artist_request.approved_at = datetime.datetime.utcnow()
db.session.commit()
# Notify all connected clients about the rejection
data_handler = current_app.extensions.get("data_handler")
if data_handler:
# Emit refresh_artist event to all connected clients
rejected_artist = {"Name": artist_request.artist_name, "Status": "Rejected"}
data_handler.socketio.emit("refresh_artist", rejected_artist)
flash(f"Request for '{artist_request.artist_name}' rejected.", "success")
else:
flash("Invalid action.", "danger")
return redirect(url_for("admin.artist_requests"))
# GET request - show pending requests
pending_requests = ArtistRequest.query.filter_by(status="pending").order_by(
ArtistRequest.created_at.desc()
).all()
return render_template("admin_artist_requests.html", requests=pending_requests)
@bp.post("/artist-requests")
@login_required
@admin_required
def modify_artist_requests():
action = request.form.get("action")
artist_request = _resolve_artist_request(request.form)
if not artist_request:
return redirect(url_for("admin.artist_requests"))
if action == "approve":
_approve_artist_request(artist_request)
elif action == "reject":
_reject_artist_request(artist_request)
else:
flash("Invalid action.", "danger")
return redirect(url_for("admin.artist_requests"))

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from flask import Blueprint, current_app, jsonify, request
from flask_login import current_user
@@ -9,31 +11,53 @@ from ..models import ArtistRequest, User
bp = Blueprint("api", __name__, url_prefix="/api")
_ERROR_KEY_INVALID = {"error": "Invalid API key"}
_ERROR_INTERNAL = {"error": "Internal server error"}
def _normalize_api_key(key_value):
if key_value is None:
return None
return str(key_value).strip()
def _configured_api_key():
configured_key = current_app.config.get("API_KEY")
if configured_key:
return _normalize_api_key(configured_key)
data_handler = current_app.extensions.get("data_handler")
if data_handler is not None:
derived_key = getattr(data_handler, "api_key", None)
return _normalize_api_key(derived_key)
return None
def _resolve_request_api_key():
header_key = request.headers.get("X-API-Key")
if header_key is not None:
return _normalize_api_key(header_key)
header_key_alt = request.headers.get("X-Api-Key")
if header_key_alt is not None:
return _normalize_api_key(header_key_alt)
query_key = request.args.get("api_key") or request.args.get("key")
return _normalize_api_key(query_key)
def api_key_required(view):
"""Decorator to require API key for API endpoints."""
def wrapped(*args, **kwargs):
# Extract API key from headers or query params
api_key = request.headers.get("X-API-Key")
if api_key is None:
api_key = request.headers.get("X-Api-Key")
if api_key is None:
api_key = request.args.get("api_key") or request.args.get("key")
if api_key is not None:
api_key = str(api_key).strip()
api_key = _resolve_request_api_key()
configured_key = _configured_api_key()
configured_key = current_app.config.get("API_KEY")
if not configured_key:
data_handler = current_app.extensions.get("data_handler")
if data_handler is not None:
configured_key = getattr(data_handler, "api_key", None)
if configured_key:
configured_key = str(configured_key).strip()
if configured_key and api_key != configured_key:
return jsonify({"error": "Invalid API key"}), 401
if configured_key and configured_key != api_key:
return jsonify(_ERROR_KEY_INVALID), 401
return view(*args, **kwargs)
wrapped.__name__ = view.__name__
return wrapped
@@ -72,7 +96,7 @@ def status():
})
except Exception as e:
current_app.logger.error(f"API status error: {e}")
return jsonify({"error": "Internal server error"}), 500
return jsonify(_ERROR_INTERNAL), 500
@bp.route("/artist-requests")
@@ -108,7 +132,7 @@ def artist_requests():
})
except Exception as e:
current_app.logger.error(f"API artist-requests error: {e}")
return jsonify({"error": "Internal server error"}), 500
return jsonify(_ERROR_INTERNAL), 500
@bp.route("/stats")
@@ -128,8 +152,7 @@ def stats():
rejected_requests = ArtistRequest.query.filter_by(status="rejected").count()
# Recent activity (last 7 days)
from datetime import datetime, timedelta
week_ago = datetime.utcnow() - timedelta(days=7)
week_ago = datetime.now(timezone.utc) - timedelta(days=7)
recent_requests = ArtistRequest.query.filter(ArtistRequest.created_at >= week_ago).count()
# Top requesters
@@ -162,4 +185,4 @@ def stats():
})
except Exception as e:
current_app.logger.error(f"API stats error: {e}")
return jsonify({"error": "Internal server error"}), 500
return jsonify(_ERROR_INTERNAL), 500

View File

@@ -10,38 +10,55 @@ from ..extensions import db
bp = Blueprint("auth", __name__)
_HOME_ENDPOINT = "main.home"
@bp.route("/login", methods=["GET", "POST"])
def _authenticate(username: str, password: str):
if not username or not password:
flash("Username and password are required.", "danger")
return None
try:
user = User.query.filter_by(username=username).first()
except (OperationalError, ProgrammingError) as exc:
current_app.logger.warning(
"Database schema not ready during login attempt for username %s: %s",
username,
exc,
)
db.session.rollback()
flash("Database upgrade in progress. Please try again in a moment.", "warning")
return None
if not user or not user.check_password(password):
flash("Invalid username or password.", "danger")
return None
if not user.is_active:
flash("Account is disabled.", "danger")
return None
login_user(user)
flash("Welcome to Sonobarr!", "success")
return redirect(url_for(_HOME_ENDPOINT))
@bp.get("/login")
def login():
if current_user.is_authenticated:
return redirect(url_for("main.home"))
return redirect(url_for(_HOME_ENDPOINT))
return render_template("login.html")
if request.method == "POST":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
if not username or not password:
flash("Username and password are required.", "danger")
else:
try:
user = User.query.filter_by(username=username).first()
except (OperationalError, ProgrammingError) as exc:
current_app.logger.warning(
"Database schema not ready during login attempt for username %s: %s",
username,
exc,
)
db.session.rollback()
flash("Database upgrade in progress. Please try again in a moment.", "warning")
else:
if not user or not user.check_password(password):
flash("Invalid username or password.", "danger")
elif not user.is_active:
flash("Account is disabled.", "danger")
else:
login_user(user)
flash("Welcome to Sonobarr!", "success")
return redirect(url_for("main.home"))
@bp.post("/login")
def login_submit():
if current_user.is_authenticated:
return redirect(url_for(_HOME_ENDPOINT))
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
response = _authenticate(username, password)
if response is not None:
return response
return render_template("login.html")

View File

@@ -15,50 +15,67 @@ def home():
return render_template("base.html")
@bp.route("/profile", methods=["GET", "POST"])
def _update_user_profile(form_data, user):
display_name = (form_data.get("display_name") or "").strip()
avatar_url = (form_data.get("avatar_url") or "").strip()
lastfm_username = (form_data.get("lastfm_username") or "").strip()
user.display_name = display_name or None
user.avatar_url = avatar_url or None
user.lastfm_username = lastfm_username or None
new_password = form_data.get("new_password", "")
confirm_password = form_data.get("confirm_password", "")
current_password = form_data.get("current_password", "")
errors: list[str] = []
password_changed = False
if not new_password:
return errors, password_changed
if new_password != confirm_password:
errors.append("New password and confirmation do not match.")
elif len(new_password) < 8:
errors.append("New password must be at least 8 characters long.")
elif not user.check_password(current_password):
errors.append("Current password is incorrect.")
else:
user.set_password(new_password)
password_changed = True
return errors, password_changed
def _refresh_personal_sources(user):
data_handler = current_app.extensions.get("data_handler")
if not data_handler or user.id is None:
return
try:
data_handler.refresh_personal_sources_for_user(int(user.id))
except Exception as exc: # pragma: no cover - defensive logging
current_app.logger.error("Failed to refresh personal discovery state: %s", exc)
@bp.get("/profile")
@login_required
def profile():
if request.method == "POST":
display_name = (request.form.get("display_name") or "").strip()
avatar_url = (request.form.get("avatar_url") or "").strip()
lastfm_username = (request.form.get("lastfm_username") or "").strip()
current_user.display_name = display_name or None
current_user.avatar_url = avatar_url or None
current_user.lastfm_username = lastfm_username or None
new_password = request.form.get("new_password", "")
confirm_password = request.form.get("confirm_password", "")
current_password = request.form.get("current_password", "")
password_changed = False
errors: list[str] = []
if new_password:
if new_password != confirm_password:
errors.append("New password and confirmation do not match.")
elif len(new_password) < 8:
errors.append("New password must be at least 8 characters long.")
elif not current_user.check_password(current_password):
errors.append("Current password is incorrect.")
else:
current_user.set_password(new_password)
password_changed = True
if errors:
for message in errors:
flash(message, "danger")
db.session.rollback()
else:
db.session.commit()
flash("Profile updated.", "success")
if password_changed:
flash("Password updated.", "success")
data_handler = current_app.extensions.get("data_handler")
if data_handler and current_user.id is not None:
try:
data_handler.refresh_personal_sources_for_user(int(current_user.id))
except Exception as exc: # pragma: no cover - defensive logging
current_app.logger.error("Failed to refresh personal discovery state: %s", exc)
return redirect(url_for("main.profile"))
return render_template("profile.html")
@bp.post("/profile")
@login_required
def update_profile():
errors, password_changed = _update_user_profile(request.form, current_user)
if errors:
for message in errors:
flash(message, "danger")
db.session.rollback()
else:
db.session.commit()
flash("Profile updated.", "success")
if password_changed:
flash("Password updated.", "success")
_refresh_personal_sources(current_user)
return redirect(url_for("main.profile"))