refactor: update type hints for better clarity and consistency across multiple files

This commit is contained in:
Matthieu B
2025-10-23 19:49:58 +02:00
parent b7916658a3
commit 6a3632ec66
26 changed files with 88 additions and 180 deletions

View File

@@ -9,7 +9,7 @@ from .logging_config import configure_logging
from .middleware import require_onboarding
def create_app(config_object=DevelopmentConfig): # noqa: C901, PLR0912, PLR0915
def create_app(config_object=DevelopmentConfig):
"""Create and configure Flask application with clean startup sequence."""
from .logging_helpers import AppLogger, should_show_startup

View File

@@ -6,6 +6,7 @@ for managing and viewing media playback activity data.
"""
from datetime import UTC, datetime, timedelta
from typing import Any
import structlog
from flask import (
@@ -158,7 +159,7 @@ def _settings_action_response(
if error:
flash(error, "error")
extra_params: dict[str, object] = {}
extra_params: dict[str, Any] = {}
if selected_days_back is not None:
extra_params["days_back"] = selected_days_back
return redirect(url_for("activity.activity_settings", **extra_params))

View File

@@ -1,5 +1,7 @@
"""Helper utilities for activity blueprint routes."""
from typing import Any
import structlog
from flask import current_app, flash, redirect, render_template, request, url_for
from sqlalchemy.orm import joinedload
@@ -127,7 +129,7 @@ def settings_action_response(
if error:
flash(error, "error")
extra_params: dict[str, object] = {}
extra_params: dict[str, Any] = {}
if selected_days_back is not None:
extra_params["days_back"] = selected_days_back
return redirect(url_for("activity.activity_settings", **extra_params))

View File

@@ -213,7 +213,7 @@ def invites():
@admin_bp.route("/invite/table", methods=["POST"])
@login_required
def invite_table(): # noqa: C901, PLR0912, PLR0915
def invite_table():
"""
HTMX partial that renders the invitation cards grid.

View File

@@ -337,7 +337,7 @@ def password_prompt(code):
try:
if srv.server_type in ("jellyfin", "emby"):
uid = client.create_user(username, pw)
elif srv.server_type == "audiobookshelf" or srv.server_type == "romm":
elif srv.server_type in ("audiobookshelf", "romm"):
uid = client.create_user(username, pw, email=email)
else:
continue # unknown server type

View File

@@ -2,6 +2,7 @@ import logging.config
import os
import sys
from pathlib import Path
from typing import Any, cast
LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING").upper()
@@ -68,8 +69,10 @@ LOGGING_CONFIG = {
def configure_logging() -> None:
"""Call this once at start-up."""
if "file" in LOGGING_CONFIG.get("handlers", {}):
log_path = Path(LOGGING_CONFIG["handlers"]["file"]["filename"])
handlers = cast(dict[str, Any], LOGGING_CONFIG.get("handlers", {}))
if "file" in handlers:
file_handler = cast(dict[str, Any], handlers["file"])
log_path = Path(cast(str, file_handler["filename"]))
log_path.parent.mkdir(parents=True, exist_ok=True)
logging.config.dictConfig(LOGGING_CONFIG)

View File

@@ -188,7 +188,6 @@ class User(db.Model, UserMixin):
# Legacy metadata caching fields (will be phased out)
library_access_json = db.Column(db.Text, nullable=True)
raw_policies_json = db.Column(db.Text, nullable=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -228,31 +227,9 @@ class User(db.Model, UserMixin):
]
self.library_access_json = json.dumps(library_access)
def get_raw_policies(self):
"""Get deserialized raw policies data."""
import json
if not self.raw_policies_json:
return {}
try:
return json.loads(self.raw_policies_json)
except (json.JSONDecodeError, TypeError):
return {}
def set_raw_policies(self, policies):
"""Set raw policies data, serializing to JSON."""
import json
if policies is None:
self.raw_policies_json = None
else:
self.raw_policies_json = json.dumps(policies)
def has_cached_metadata(self):
"""Check if user has cached metadata available."""
return (
self.library_access_json is not None or self.raw_policies_json is not None
)
return self.library_access_json is not None
def get_accessible_libraries(self):
"""Get list of accessible library names."""

View File

@@ -15,7 +15,7 @@ Usage:
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import Any
from typing import Any, cast
import structlog
@@ -129,23 +129,18 @@ def cleanup_unknown_activity(
stats["by_status"]["inactive"] += 1 # type: ignore
# Track date range
date_range = cast(dict[str, Any], stats["date_range"])
if (
stats["date_range"]["oldest"] is None
or session.started_at
< stats[ # type: ignore
"date_range"
]["oldest"]
date_range["oldest"] is None
or session.started_at < date_range["oldest"]
):
stats["date_range"]["oldest"] = session.started_at # type: ignore
date_range["oldest"] = session.started_at
if (
stats["date_range"]["newest"] is None
or session.started_at
> stats[ # type: ignore
"date_range"
]["newest"]
date_range["newest"] is None
or session.started_at > date_range["newest"]
):
stats["date_range"]["newest"] = session.started_at # type: ignore
date_range["newest"] = session.started_at
# Log report
logger.info("=" * 70)
@@ -154,30 +149,35 @@ def cleanup_unknown_activity(
logger.info(f"Total sessions with Unknown values: {stats['total_unknown']}")
logger.info("")
logger.info("Breakdown by field:")
logger.info(f" - Unknown user: {stats['by_field']['unknown_user']}")
logger.info(f" - Unknown title: {stats['by_field']['unknown_title']}")
logger.info(f" - Unknown device: {stats['by_field']['unknown_device']}")
by_field = cast(dict[str, int], stats["by_field"])
logger.info(f" - Unknown user: {by_field['unknown_user']}")
logger.info(f" - Unknown title: {by_field['unknown_title']}")
logger.info(f" - Unknown device: {by_field['unknown_device']}")
logger.info("")
logger.info("Breakdown by duration:")
logger.info(f" - Very short (<30s): {stats['by_duration']['very_short']}")
logger.info(f" - Short (30s-5min): {stats['by_duration']['short']}")
logger.info(f" - Medium (5min-30min): {stats['by_duration']['medium']}")
logger.info(f" - Long (>30min): {stats['by_duration']['long']}")
by_duration = cast(dict[str, int], stats["by_duration"])
logger.info(f" - Very short (<30s): {by_duration['very_short']}")
logger.info(f" - Short (30s-5min): {by_duration['short']}")
logger.info(f" - Medium (5min-30min): {by_duration['medium']}")
logger.info(f" - Long (>30min): {by_duration['long']}")
logger.info("")
logger.info("Breakdown by status:")
logger.info(f" - Active: {stats['by_status']['active']}")
logger.info(f" - Inactive: {stats['by_status']['inactive']}")
by_status = cast(dict[str, int], stats["by_status"])
logger.info(f" - Active: {by_status['active']}")
logger.info(f" - Inactive: {by_status['inactive']}")
logger.info("")
if stats["date_range"]["oldest"]:
date_range = cast(dict[str, Any], stats["date_range"])
if date_range["oldest"]:
logger.info("Date range:")
logger.info(f" - Oldest: {stats['date_range']['oldest']}")
logger.info(f" - Newest: {stats['date_range']['newest']}")
logger.info(f" - Oldest: {date_range['oldest']}")
logger.info(f" - Newest: {date_range['newest']}")
logger.info("")
# Perform deletion if requested
if mode == "delete":
if stats["total_unknown"] > 0:
total_unknown = cast(int, stats["total_unknown"])
if total_unknown > 0:
logger.warning(
f"🗑️ DELETING {stats['total_unknown']} sessions with Unknown values..."
)

View File

@@ -42,7 +42,7 @@ class HistoricalDataService:
db.session.add(job)
db.session.commit()
app = current_app._get_current_object()
app = current_app._get_current_object() # type: ignore[attr-defined]
worker = threading.Thread(
target=self._run_import_job,

View File

@@ -108,7 +108,7 @@ class ImageProxyService:
return token
@classmethod
def validate_token(cls, token: str) -> dict | None: # noqa: PLR0911
def validate_token(cls, token: str) -> dict | None:
"""
Validate a stateless signed token and return the URL mapping.

View File

@@ -72,10 +72,12 @@ class InviteCodeManager:
# Check if invitation is expired
import datetime
if invitation.expires and invitation.expires <= datetime.datetime.now(
datetime.UTC
):
return False, None
if invitation.expires:
# Make database datetime timezone-aware (assumes UTC) for comparison
expires_aware = invitation.expires.replace(tzinfo=datetime.UTC)
now = datetime.datetime.now(datetime.UTC)
if expires_aware <= now:
return False, None
# Check if invitation is fully used (limited invitations only)
if not invitation.unlimited and invitation.used:

View File

@@ -41,8 +41,11 @@ def is_invite_valid(code: str) -> tuple[bool, str]:
if not invitation:
return False, "Invalid code"
now = datetime.datetime.now(datetime.UTC)
if invitation.expires and invitation.expires <= now:
return False, "Invitation has expired."
# Make database datetime timezone-aware (assumes UTC) for comparison
if invitation.expires:
expires_aware = invitation.expires.replace(tzinfo=datetime.UTC)
if expires_aware <= now:
return False, "Invitation has expired."
if invitation.used is True and invitation.unlimited is not True:
return False, "Invitation has already been used."
return True, "okay"

View File

@@ -254,7 +254,6 @@ class MediaClient(ABC):
user_id=str(user_identifier),
username=raw_details.get("username", "Unknown"),
email=raw_details.get("email"),
raw_policies=raw_details,
)
def _cache_user_metadata_batch(self, users: list[User]) -> None:

View File

@@ -143,19 +143,6 @@ class DropClient(RestApiMixin):
# Get the corresponding Drop user data
drop_user = remote_by_id.get(user.token, {})
drop_policies = {
# Server-specific data
"enabled": drop_user.get("enabled", True),
"admin": drop_user.get("admin", False),
"displayName": drop_user.get("displayName", user.username),
"profilePictureObjectId": drop_user.get("profilePictureObjectId"),
# Standardized permission keys for UI display
"allow_downloads": True, # Drop supports downloads by default
"allow_live_tv": False, # Drop doesn't support live TV
"allow_camera_upload": False, # Drop doesn't support camera upload
}
user.set_raw_policies(drop_policies)
# Update standardized User model columns
user.allow_downloads = True # Drop supports downloads by default
user.allow_live_tv = False # Drop doesn't support live TV
@@ -326,7 +313,6 @@ class DropClient(RestApiMixin):
created_at=None, # Would need to parse if available
last_active=None, # Not available in API
library_access=None, # Drop doesn't have traditional libraries - indicates full access
raw_policies=raw_user,
)
except Exception as exc:
logging.error("Drop: failed to get user details %s", exc)

View File

@@ -182,7 +182,7 @@ class JellyfinClient(RestApiMixin):
policy["IsDisabled"] = False
response = self.post(f"/Users/{user_id}/Policy", json=policy)
return response.status_code == 204 or response.status_code == 200
return response.status_code in {204, 200}
except Exception as e:
structlog.get_logger().error(f"Failed to enable Jellyfin user: {e}")
return False
@@ -205,7 +205,7 @@ class JellyfinClient(RestApiMixin):
policy["IsDisabled"] = True
response = self.post(f"/Users/{user_id}/Policy", json=policy)
return response.status_code == 204 or response.status_code == 200
return response.status_code in {204, 200}
except Exception as e:
structlog.get_logger().error(f"Failed to disable Jellyfin user: {e}")
return False

View File

@@ -438,7 +438,6 @@ class KavitaClient(RestApiMixin):
email=raw_user.get("email"),
permissions=permissions,
library_access=library_access,
raw_policies=raw_user,
created_at=created_at,
last_active=last_active,
is_enabled=True, # Kavita doesn't seem to have disabled users concept
@@ -579,17 +578,6 @@ class KavitaClient(RestApiMixin):
allow_downloads=True, # Reading app allows downloads by default
)
# Store both server-specific and standardized keys in policies dict
kavita_policies = {
# Server-specific data (Kavita user info would go here)
"enabled": True, # Kavita users are enabled by default
# Standardized permission keys for UI display
"allow_downloads": permissions.allow_downloads,
"allow_live_tv": permissions.allow_live_tv,
"allow_sync": permissions.allow_downloads, # Use downloads setting for sync
}
user.set_raw_policies(kavita_policies)
# Update standardized User model columns
user.allow_downloads = permissions.allow_downloads
user.allow_live_tv = permissions.allow_live_tv

View File

@@ -133,7 +133,7 @@ class KomgaClient(RestApiMixin):
# We can remove all library access to effectively disable the user
user_data = {"sharedLibrariesIds": []}
response = self.patch(f"/api/v2/users/{user_id}", json=user_data)
return response.status_code == 204 or response.status_code == 200
return response.status_code in {204, 200}
except Exception as e:
structlog.get_logger().error(f"Failed to disable Komga user: {e}")
return False
@@ -207,7 +207,6 @@ class KomgaClient(RestApiMixin):
email=raw_user.get("email"),
permissions=permissions,
library_access=library_access,
raw_policies=raw_user,
created_at=created_at,
last_active=last_active,
is_enabled=True, # Komga doesn't have a disabled state in API
@@ -254,22 +253,6 @@ class KomgaClient(RestApiMixin):
# Check for FILE_DOWNLOAD role to determine download permission
allow_downloads = "FILE_DOWNLOAD" in roles
# Store both server-specific and standardized keys in policies dict
komga_policies = {
# Server-specific data (Komga user info)
"enabled": True, # Komga users are enabled by default
"sharedAllLibraries": komga_user_data.get(
"sharedAllLibraries", False
),
"sharedLibrariesIds": komga_user_data.get("sharedLibrariesIds", []),
"roles": roles,
# Standardized permission keys for UI display
"allow_downloads": allow_downloads,
"allow_live_tv": False, # Komga doesn't have Live TV
"allow_sync": True, # Default to True for reading apps
}
user.set_raw_policies(komga_policies)
# Update standardized User model columns
user.allow_downloads = allow_downloads
user.allow_live_tv = False # Komga doesn't have Live TV
@@ -366,7 +349,6 @@ class KomgaClient(RestApiMixin):
email=raw_user.get("email"),
permissions=permissions,
library_access=library_access,
raw_policies=raw_user,
created_at=created_at,
last_active=last_active,
is_enabled=True, # Komga doesn't have a disabled state

View File

@@ -189,20 +189,6 @@ class NavidromeClient(RestApiMixin):
# Use standardized permissions helper for consistency
permissions = StandardizedPermissions.for_navidrome(navidrome_user)
# Store both server-specific and standardized keys in policies dict
navidrome_policies = {
# Server-specific data (navidrome user info)
"name": navidrome_user.get("name", ""),
"userName": navidrome_user.get("userName", ""),
"isAdmin": navidrome_user.get("isAdmin", False),
"lastFMApiKey": navidrome_user.get("lastFMApiKey", ""),
"listenBrainzToken": navidrome_user.get("listenBrainzToken", ""),
# Standardized permission keys for UI display
"allow_downloads": permissions.allow_downloads,
"allow_live_tv": permissions.allow_live_tv,
}
user.set_raw_policies(navidrome_policies)
# Update standardized User model columns
user.allow_downloads = permissions.allow_downloads
user.allow_live_tv = permissions.allow_live_tv
@@ -213,17 +199,6 @@ class NavidromeClient(RestApiMixin):
"navidrome"
)
default_policies = {
"name": "",
"userName": "",
"isAdmin": False,
"lastFMApiKey": "",
"listenBrainzToken": "",
"allow_downloads": default_permissions.allow_downloads,
"allow_live_tv": default_permissions.allow_live_tv,
}
user.set_raw_policies(default_policies)
# Update standardized User model columns with defaults
user.allow_downloads = default_permissions.allow_downloads
user.allow_live_tv = default_permissions.allow_live_tv
@@ -375,22 +350,12 @@ class NavidromeClient(RestApiMixin):
# Navidrome gives full access to all libraries
library_access = LibraryAccessHelper.create_full_access()
# Extract policies information
filtered_policies = {
"adminRole": permissions.is_admin,
"downloadRole": permissions.allow_downloads,
"uploadRole": raw_user.get("uploadRole", False),
"playlistRole": raw_user.get("playlistRole", True),
"streamRole": raw_user.get("streamRole", True),
}
return create_standardized_user_details(
user_id=username,
username=raw_user.get("username", username),
email=raw_user.get("email"),
permissions=permissions,
library_access=library_access,
raw_policies=filtered_policies,
created_at=None, # Navidrome doesn't expose creation date
last_active=None, # Navidrome doesn't expose last seen
is_enabled=True, # Navidrome doesn't have disabled users concept

View File

@@ -1,7 +1,7 @@
import logging
import re
import threading
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
import structlog
from cachetools import TTLCache, cached
@@ -187,7 +187,7 @@ class PlexClient(MediaClient):
"allow_camera_upload": getattr(plex_user, "allowCameraUpload", False),
}
def _filter_users_for_server(self, admin_users, server_id: str) -> dict[str, any]:
def _filter_users_for_server(self, admin_users, server_id: str) -> dict[str, Any]:
"""Filter Plex users who have access to this specific server."""
users_by_email = {}
for plex_user in admin_users:

View File

@@ -184,16 +184,6 @@ class RommClient(RestApiMixin):
# Add default policy attributes (RomM doesn't have specific download/live TV policies)
for user in users:
# Store both server-specific and standardized keys in policies dict
romm_policies = {
# Server-specific data (RomM user info would go here)
"enabled": True, # RomM users are enabled by default
# Standardized permission keys for UI display
"allow_downloads": True, # Default to True for gaming apps
"allow_live_tv": False, # RomM doesn't have Live TV
}
user.set_raw_policies(romm_policies)
# Update standardized User model columns
user.allow_downloads = True # Default for gaming apps
user.allow_live_tv = False # RomM doesn't have Live TV
@@ -357,7 +347,6 @@ class RommClient(RestApiMixin):
email=raw_user.get("email"),
permissions=permissions,
library_access=library_access,
raw_policies=raw_user,
created_at=created_at,
last_active=None, # RomM doesn't track last active time
is_enabled=raw_user.get("enabled", True),

View File

@@ -2,7 +2,6 @@
import datetime
import logging
from typing import Any
from app.models import Library
from app.services.media.user_details import MediaUserDetails, UserLibraryAccess
@@ -195,7 +194,6 @@ def create_standardized_user_details(
email: str | None,
permissions: StandardizedPermissions,
library_access: list[UserLibraryAccess] | None,
raw_policies: dict[str, Any],
created_at: datetime.datetime | None = None,
last_active: datetime.datetime | None = None,
is_enabled: bool = True,
@@ -213,5 +211,4 @@ def create_standardized_user_details(
allow_live_tv=permissions.allow_live_tv,
allow_camera_upload=permissions.allow_camera_upload,
library_access=library_access,
raw_policies=raw_policies,
)

View File

@@ -79,7 +79,6 @@ def upgrade():
except Exception as e:
print(f"Error updating wizard_step table: {e}")
# Don't fail the migration for this, as it's not critical
pass
def downgrade():
@@ -118,4 +117,3 @@ def downgrade():
except Exception as e:
print(f"Error reverting wizard_step table: {e}")
pass

View File

@@ -109,11 +109,26 @@ lint.ignore = [
"INP001", # implicit namespace packages - will create __init__.py files separately
# Phase 2 ignores - Too noisy or opinionated
"PLR2004", # magic-value-comparison - too noisy
# Complexity rules - too strict for media server integrations
"PLR0911", # too-many-return-statements
"PLR0912", # too-many-branches
"PLR0913", # too-many-arguments
"PLR0915", # too-many-statements
"C901", # complex-structure
]
[tool.ruff.lint.mccabe]
max-complexity = 15 # Aligns with CLAUDE.md "15 logical lines" guideline
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = ["S106", "S105", "S113", "S110", "S701", "RUF015", "RUF059", "ARG001", "ARG002", "ARG005", "PERF401", "RUF005", "DTZ005", "PTH110", "PTH108", "PTH118", "PTH120", "PTH207", "PTH119", "PTH107"] # Allow test patterns
"plus/tests/**/*.py" = ["S106", "S105", "S701"] # Allow Plus test patterns
"dev.py" = ["S603", "S607"] # Allow subprocess in dev script
"migrations/**/*.py" = ["ERA001", "ARG001", "S608", "S110"] # Allow migration patterns
"plus/**/*.py" = ["RUF012", "S110", "PERF403", "ARG001", "ARG002", "ARG004", "F841", "RUF022", "RUF059", "ERA001", "SIM102"] # Allow Plus feature patterns
"scripts/**/*.py" = ["S603", "RUF005", "PTH100", "PTH120"] # Allow script patterns
"setup_plus.py" = ["S603", "RUF005"] # Allow subprocess in setup script
[tool.ruff.lint.isort]
known-first-party = ["app"]
known-third-party = ["flask_session"]

View File

@@ -19,14 +19,14 @@ from .media_server_mocks import (
)
__all__ = [
"create_mock_client",
"setup_mock_servers",
"simulate_server_failure",
"simulate_auth_failure",
"simulate_user_creation_failure",
"get_mock_state",
"mock_state",
"MockAudiobookshelfClient",
"MockJellyfinClient",
"MockPlexClient",
"MockAudiobookshelfClient",
"create_mock_client",
"get_mock_state",
"mock_state",
"setup_mock_servers",
"simulate_auth_failure",
"simulate_server_failure",
"simulate_user_creation_failure",
]

View File

@@ -351,7 +351,7 @@ class MockPlexClient:
return True, ""
except Exception as e:
return False, f"Plex error: {str(e)}"
return False, f"Plex error: {e!s}"
def join(
self, username: str, password: str, confirm: str, email: str, code: str
@@ -437,7 +437,7 @@ class MockAudiobookshelfClient:
return True, ""
except Exception as e:
return False, f"Audiobookshelf error: {str(e)}"
return False, f"Audiobookshelf error: {e!s}"
def join(
self, username: str, password: str, confirm: str, email: str, code: str

View File

@@ -593,10 +593,11 @@ class TestInvitationExpiry:
assert db_user.expires is not None
# Should expire in approximately 7 days
expected_expiry = datetime.now() + timedelta(
days=7
) # Use naive datetime like the database
time_diff = abs((db_user.expires - expected_expiry).total_seconds())
# Database stores naive UTC, so compare with UTC time
expected_expiry = datetime.now(UTC) + timedelta(days=7)
time_diff = abs(
(db_user.expires - expected_expiry.replace(tzinfo=None)).total_seconds()
)
assert time_diff < 60 # Within 1 minute of expected