From 70645e7ef3972c6750ec3ff521d1fb902ee05874 Mon Sep 17 00:00:00 2001 From: "Jokob @NetAlertX" <96159884+jokob-sk@users.noreply.github.com> Date: Sun, 22 Feb 2026 03:47:29 +0000 Subject: [PATCH] server-side remember-me --- front/index.php | 80 +++- server/api_server/api_server_start.py | 145 ++++++- server/api_server/openapi/schemas.py | 83 ++++ server/models/parameters_instance.py | 204 +++++++++ .../test_remember_me_endpoints.py | 400 ++++++++++++++++++ 5 files changed, 901 insertions(+), 11 deletions(-) create mode 100644 server/models/parameters_instance.py create mode 100644 test/api_endpoints/test_remember_me_endpoints.py diff --git a/front/index.php b/front/index.php index 261ab93c..fee811ff 100755 --- a/front/index.php +++ b/front/index.php @@ -84,6 +84,52 @@ function is_https_request(): bool { return false; } +function call_api(string $endpoint, array $data = []): ?array { + /* + Call NetAlertX API endpoint (for login page endpoints that don't require auth). + + Returns: JSON response as array, or null on failure + */ + try { + // Determine API host (assume localhost on same port as frontend) + $api_host = $_SERVER['HTTP_HOST'] ?? 'localhost'; + $api_scheme = is_https_request() ? 'https' : 'http'; + $api_url = $api_scheme . '://' . $api_host; + + $url = $api_url . $endpoint; + + $ch = curl_init($url); + if (!$ch) return null; + + curl_setopt_array($ch, [ + CURLOPT_RETURNTRANSFER => true, + CURLOPT_TIMEOUT => 5, + CURLOPT_FOLLOWLOCATION => false, + CURLOPT_HTTPHEADER => [ + 'Content-Type: application/json', + 'Accept: application/json' + ] + ]); + + if (!empty($data)) { + curl_setopt($ch, CURLOPT_POST, true); + curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data)); + } + + $response = curl_exec($ch); + $httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE); + curl_close($ch); + + if ($httpcode !== 200 || !$response) { + return null; + } + + return json_decode($response, true); + } catch (Exception $e) { + return null; + } +} + function logout_user(): void { $_SESSION = []; @@ -127,18 +173,26 @@ if (!empty($_POST['loginpassword'])) { login_user(); + // Handle "Remember Me" if checked if (!empty($_POST['PWRemember'])) { + // Generate random token (64-byte hex = 128 chars, use 64 chars) $token = bin2hex(random_bytes(32)); - $_SESSION['remember_token'] = hash('sha256',$token); - - setcookie(COOKIE_NAME,$token,[ - 'expires'=>time()+604800, - 'path'=>'/', - 'secure'=>is_https_request(), - 'httponly'=>true, - 'samesite'=>'Strict' + // Call API to save token hash to Parameters table + $save_response = call_api('/auth/remember-me/save', [ + 'token' => $token ]); + + // If API call successful, set persistent cookie + if ($save_response && isset($save_response['success']) && $save_response['success']) { + setcookie(COOKIE_NAME, $token, [ + 'expires' => time() + 604800, + 'path' => '/', + 'secure' => is_https_request(), + 'httponly' => true, + 'samesite' => 'Strict' + ]); + } } safe_redirect(append_hash($redirectTo)); @@ -149,9 +203,15 @@ if (!empty($_POST['loginpassword'])) { Remember Me Validation ===================================================== */ -if (!is_authenticated() && !empty($_COOKIE[COOKIE_NAME]) && !empty($_SESSION['remember_token'])) { +if (!is_authenticated() && !empty($_COOKIE[COOKIE_NAME])) { - if (hash_equals($_SESSION['remember_token'], hash('sha256',$_COOKIE[COOKIE_NAME]))) { + // Call API to validate token against stored hash + $validate_response = call_api('/auth/validate-remember', [ + 'token' => $_COOKIE[COOKIE_NAME] + ]); + + // If API returns valid token, authenticate and redirect + if ($validate_response && isset($validate_response['valid']) && $validate_response['valid'] === true) { login_user(); safe_redirect(append_hash($redirectTo)); } diff --git a/server/api_server/api_server_start.py b/server/api_server/api_server_start.py index b70a4d36..82f01e1d 100755 --- a/server/api_server/api_server_start.py +++ b/server/api_server/api_server_start.py @@ -6,6 +6,7 @@ import os from flask import Flask, redirect, request, jsonify, url_for, Response from models.device_instance import DeviceInstance # noqa: E402 +from models.parameters_instance import ParametersInstance # noqa: E402 from flask_cors import CORS from werkzeug.exceptions import HTTPException @@ -94,7 +95,9 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression] DbQueryRequest, DbQueryResponse, DbQueryUpdateRequest, DbQueryDeleteRequest, AddToQueueRequest, GetSettingResponse, - RecentEventsRequest, SetDeviceAliasRequest + RecentEventsRequest, SetDeviceAliasRequest, + ValidateRememberRequest, ValidateRememberResponse, + SaveRememberRequest, SaveRememberResponse ) from .sse_endpoint import ( # noqa: E402 [flake8 lint suppression] @@ -1933,6 +1936,146 @@ def check_auth(payload=None): return jsonify({"success": True, "message": "Authentication check successful"}), 200 +# -------------------------- +# Remember Me Validation endpoint +# -------------------------- +@app.route("/auth/validate-remember", methods=["POST"]) +@validate_request( + operation_id="validate_remember", + summary="Validate Remember Me Token", + description="Validate a persistent Remember Me token against stored hash. Called from login page (no auth required).", + request_model=ValidateRememberRequest, + response_model=ValidateRememberResponse, + tags=["auth"], + auth_callable=None # No auth required - used on login page +) +def validate_remember(payload=None): + """ + Validate a Remember Me token from persistent cookie. + + Security: Uses timing-safe hash comparison to prevent timing attacks. + Token format: hex-encoded 32 random bytes (64 chars) from bin2hex(random_bytes(32)) + """ + try: + # Extract token from request + data = request.get_json() or {} + token = data.get("token") + + if not token: + mylog("verbose", ["[auth/validate-remember] Missing token in request"]) + return jsonify({ + "success": True, + "valid": False, + "message": "Token validation failed: missing token" + }), 200 + + # Validate token against stored hash + params_instance = ParametersInstance() + result = params_instance.validate_token(token) + + if result['valid']: + mylog("verbose", ["[auth/validate-remember] Token validation successful"]) + return jsonify({ + "success": True, + "valid": True, + "message": "Token validation successful" + }), 200 + else: + mylog("verbose", ["[auth/validate-remember] Token validation failed"]) + return jsonify({ + "success": True, + "valid": False, + "message": "Token validation failed" + }), 200 + + except Exception as e: + mylog("verbose", [f"[auth/validate-remember] Unexpected error: {e}"]) + return jsonify({ + "success": False, + "valid": False, + "error": "Internal server error", + "message": "An unexpected error occurred during token validation" + }), 500 + + +# -------------------------- +# Remember Me Save endpoint +# -------------------------- +@app.route("/auth/remember-me/save", methods=["POST"]) +@validate_request( + operation_id="save_remember", + summary="Save Remember Me Token", + description="Save a Remember Me token to the database. Called after successful login to enable persistent authentication.", + request_model=SaveRememberRequest, + response_model=SaveRememberResponse, + tags=["auth"], + auth_callable=None # No auth required - used on login page +) +def save_remember(payload=None): + """ + Save a Remember Me token. + + Flow: + 1. User logs in with "Remember Me" checkbox + 2. Password validated successfully + 3. Token generated: bin2hex(random_bytes(32)) + 4. This endpoint called: saves hash(token) to Parameters table + 5. Token (unhashed) set in persistent cookie + 6. Session created and user redirected + + Security: Only the HASH is stored in the database, not the token itself. + If database is compromised, attacker cannot use stolen hashes without the original token. + """ + try: + import uuid + import hashlib + + # Extract token from request + data = request.get_json() or {} + token = data.get("token") + + if not token or len(token) < 64: + mylog("verbose", ["[auth/remember-me/save] Invalid or missing token"]) + return jsonify({ + "success": False, + "error": "Invalid token", + "message": "Token must be 64+ hex characters" + }), 400 + + # Hash the token + token_hash = hashlib.sha256(token.encode('utf-8')).hexdigest() + + # Generate UUID-based parameter ID + token_id = f"remember_me_token_{uuid.uuid4()}" + + # Store hash in Parameters table + params_instance = ParametersInstance() + success = params_instance.set_parameter(token_id, token_hash) + + if success: + mylog("verbose", [f"[auth/remember-me/save] Token saved successfully: {token_id}"]) + return jsonify({ + "success": True, + "message": "Remember Me token saved successfully", + "token_id": token_id + }), 200 + else: + mylog("verbose", ["[auth/remember-me/save] Failed to save token to database"]) + return jsonify({ + "success": False, + "error": "Database error", + "message": "Failed to save Remember Me token" + }), 500 + + except Exception as e: + mylog("verbose", [f"[auth/remember-me/save] Unexpected error: {e}"]) + return jsonify({ + "success": False, + "error": "Internal server error", + "message": "An unexpected error occurred while saving Remember Me token" + }), 500 + + # -------------------------- # Health endpoint # -------------------------- diff --git a/server/api_server/openapi/schemas.py b/server/api_server/openapi/schemas.py index 7adf76f4..df01ec0c 100644 --- a/server/api_server/openapi/schemas.py +++ b/server/api_server/openapi/schemas.py @@ -1031,6 +1031,89 @@ class GetSettingResponse(BaseResponse): value: Any = Field(None, description="The setting value") +# ============================================================================= +# AUTH SCHEMAS (Remember Me) +# ============================================================================= + + +class ValidateRememberRequest(BaseModel): + """Request to validate a Remember Me token.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [{ + "token": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2" + }] + } + ) + + token: str = Field( + ..., + min_length=64, + max_length=128, + description="The unhashed Remember Me token from persistent cookie (hex-encoded binary)" + ) + + +class ValidateRememberResponse(BaseResponse): + """Response from Remember Me token validation.""" + model_config = ConfigDict( + extra="allow", + json_schema_extra={ + "examples": [{ + "success": True, + "valid": True, + "message": "Token validation successful" + }, { + "success": True, + "valid": False, + "message": "Token validation failed" + }] + } + ) + + valid: bool = Field( + ..., + description="Whether the token is valid and matches stored hash" + ) + + +class SaveRememberRequest(BaseModel): + """Request to save a Remember Me token.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [{ + "token": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2" + }] + } + ) + + token: str = Field( + ..., + min_length=64, + max_length=128, + description="The unhashed Remember Me token to save (hex-encoded binary from bin2hex(random_bytes(32)))" + ) + + +class SaveRememberResponse(BaseResponse): + """Response from Remember Me token save operation.""" + model_config = ConfigDict( + extra="allow", + json_schema_extra={ + "examples": [{ + "success": True, + "message": "Token saved successfully", + "token_id": "remember_me_token_550e8400-e29b-41d4-a716-446655440000" + }] + } + ) + + token_id: Optional[str] = Field( + None, + description="The parameter ID where token hash was stored (UUID-based)" + ) + + # ============================================================================= # GRAPHQL SCHEMAS # ============================================================================= diff --git a/server/models/parameters_instance.py b/server/models/parameters_instance.py new file mode 100644 index 00000000..2e681ce7 --- /dev/null +++ b/server/models/parameters_instance.py @@ -0,0 +1,204 @@ +""" +Parameters Instance - Handles Parameters table operations for Remember Me tokens and other system parameters. + +The Parameters table is used for temporary, ephemeral settings like Remember Me tokens. +Structure: + parID: TEXT PRIMARY KEY (e.g., "remember_me_token_{uuid}") + parValue: TEXT (e.g., hashed token value) +""" + +import hashlib +import sqlite3 +from database import get_temp_db_connection +from logger import mylog + + +class ParametersInstance: + """Handler for Parameters table operations.""" + + # --- helper methods (DRY pattern from DeviceInstance) ---------------------- + def _fetchall(self, query, params=()): + """Fetch all rows and return as list of dicts.""" + conn = get_temp_db_connection() + rows = conn.execute(query, params).fetchall() + conn.close() + return [dict(r) for r in rows] + + def _fetchone(self, query, params=()): + """Fetch single row and return as dict or None.""" + conn = get_temp_db_connection() + row = conn.execute(query, params).fetchone() + conn.close() + return dict(row) if row else None + + def _execute(self, query, params=()): + """Execute write query (INSERT/UPDATE/DELETE).""" + conn = get_temp_db_connection() + cur = conn.cursor() + cur.execute(query, params) + conn.commit() + conn.close() + + # --- public API ----------------------------------------------------------- + + def get_parameter(self, par_id): + """ + Retrieve a parameter value by ID. + + Args: + par_id (str): The parameter ID to retrieve + + Returns: + str: The parameter value, or None if not found + """ + try: + # Try with quoted column names in case they're reserved or have special chars + row = self._fetchone( + 'SELECT "parValue" FROM "Parameters" WHERE "parID" = ?', + (par_id,) + ) + return row['parValue'] if row else None + except Exception as e: + mylog("verbose", [f"[ParametersInstance] Error retrieving parameter {par_id}: {e}"]) + return None + + def set_parameter(self, par_id, par_value): + """ + Store or update a parameter (INSERT OR REPLACE). + + Args: + par_id (str): The parameter ID + par_value (str): The parameter value + + Returns: + bool: True if successful, False otherwise + """ + try: + # Try with quoted column names in case they're reserved or have special chars + self._execute( + 'INSERT OR REPLACE INTO "Parameters" ("parID", "parValue") VALUES (?, ?)', + (par_id, par_value) + ) + mylog("verbose", [f"[ParametersInstance] Parameter {par_id} stored successfully"]) + return True + except Exception as e: + mylog("verbose", [f"[ParametersInstance] Error storing parameter {par_id}: {e}"]) + return False + + def delete_parameter(self, par_id): + """ + Delete a parameter by ID. + + Args: + par_id (str): The parameter ID to delete + + Returns: + bool: True if successful, False otherwise + """ + try: + # Try with quoted column names in case they're reserved or have special chars + self._execute( + 'DELETE FROM "Parameters" WHERE "parID" = ?', + (par_id,) + ) + mylog("verbose", [f"[ParametersInstance] Parameter {par_id} deleted successfully"]) + return True + except Exception as e: + mylog("verbose", [f"[ParametersInstance] Error deleting parameter {par_id}: {e}"]) + return False + + def delete_parameters_by_prefix(self, prefix): + """ + Delete all parameters matching a prefix pattern (for cleanup). + + Args: + prefix (str): The prefix pattern (e.g., "remember_me_token_") + + Returns: + int: Number of parameters deleted + """ + try: + conn = get_temp_db_connection() + cur = conn.cursor() + cur.execute('DELETE FROM "Parameters" WHERE "parID" LIKE ?', (f"{prefix}%",)) + deleted_count = cur.rowcount + conn.commit() + conn.close() + mylog("verbose", [f"[ParametersInstance] Deleted {deleted_count} parameters with prefix '{prefix}'"]) + return deleted_count + except Exception as e: + mylog("verbose", [f"[ParametersInstance] Error deleting parameters with prefix '{prefix}': {e}"]) + return 0 + + def validate_token(self, token): + """ + Validate a Remember Me token against stored hash. + + Security: Compares hash(token) against stored hashes using hash_equals (timing-safe). + + Args: + token (str): The unhashed token (from cookie) + + Returns: + dict: { + 'valid': bool, + 'par_id': str or None # The matching parameter ID if valid + } + + Note: + Returns immediately on first match. Use hash_equals() to prevent timing attacks. + """ + if not token: + return {'valid': False, 'par_id': None} + + try: + # Compute hash of provided token + computed_hash = hashlib.sha256(token.encode('utf-8')).hexdigest() + + # Retrieve all remember_me tokens from Parameters table + remember_tokens = self._fetchall( + 'SELECT "parID", "parValue" FROM "Parameters" WHERE "parID" LIKE ?', + ("remember_me_token_%",) + ) + + # Check each stored token using timing-safe comparison + for token_record in remember_tokens: + stored_hash = token_record['parValue'] + stored_id = token_record['parID'] + + # Use hash_equals() to prevent timing attacks + if self._hash_equals(stored_hash, computed_hash): + mylog("verbose", [f"[ParametersInstance] Token validation successful for {stored_id}"]) + return {'valid': True, 'par_id': stored_id} + + mylog("verbose", ["[ParametersInstance] Token validation failed: no matching token found"]) + return {'valid': False, 'par_id': None} + + except Exception as e: + mylog("verbose", [f"[ParametersInstance] Error validating token: {e}"]) + return {'valid': False, 'par_id': None} + + @staticmethod + def _hash_equals(known_string, user_string): + """ + Timing-safe string comparison to prevent timing attacks. + + Args: + known_string (str): The known value (stored hash) + user_string (str): The user-supplied value (computed hash) + + Returns: + bool: True if strings match, False otherwise + """ + if not isinstance(known_string, str) or not isinstance(user_string, str): + return False + + if len(known_string) != len(user_string): + return False + + # Compare all characters regardless of match (timing-safe) + result = 0 + for x, y in zip(known_string, user_string): + result |= ord(x) ^ ord(y) + + return result == 0 diff --git a/test/api_endpoints/test_remember_me_endpoints.py b/test/api_endpoints/test_remember_me_endpoints.py new file mode 100644 index 00000000..b18894ee --- /dev/null +++ b/test/api_endpoints/test_remember_me_endpoints.py @@ -0,0 +1,400 @@ +""" +Remember Me Token Tests - Security & Functionality + +Tests the secure Remember Me feature: +- Token generation and storage +- Token validation (including timing-safe comparison) +- Security: Tampered token rejection +- API endpoint validation +""" + +import sys +import os +import hashlib +import json + +# Register NetAlertX directories +INSTALL_PATH = os.getenv("NETALERTX_APP", "/app") +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +import pytest +from api_server.api_server_start import app # noqa: E402 +from models.parameters_instance import ParametersInstance # noqa: E402 + + +@pytest.fixture +def client(): + """Flask test client.""" + with app.test_client() as client: + yield client + + +@pytest.fixture +def params_instance(): + """ParametersInstance for direct database access.""" + return ParametersInstance() + + +@pytest.fixture +def test_token(): + """Generate a valid test token (64-hex characters).""" + import os + return os.urandom(32).hex() # 32 bytes = 64 hex chars + + +# ============================================================================ +# REMEMBER ME SAVE ENDPOINT TESTS +# ============================================================================ + +def test_save_remember_success(client, test_token): + """POST /auth/remember-me/save - Valid token should save successfully.""" + resp = client.post("/auth/remember-me/save", json={"token": test_token}) + + assert resp.status_code == 200 + data = resp.get_json() + assert data is not None + assert data["success"] is True + assert "saved successfully" in data.get("message", "").lower() + assert data.get("token_id") is not None + assert data["token_id"].startswith("remember_me_token_") + + +def test_save_remember_missing_token(client): + """POST /auth/remember-me/save - Missing token should fail with 422 (validation error).""" + resp = client.post("/auth/remember-me/save", json={}) + + assert resp.status_code == 422 # Pydantic validation error, not 400 + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +def test_save_remember_empty_token(client): + """POST /auth/remember-me/save - Empty token should fail with 422.""" + resp = client.post("/auth/remember-me/save", json={"token": ""}) + + assert resp.status_code == 422 # Pydantic validation error + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +def test_save_remember_short_token(client): + """POST /auth/remember-me/save - Token too short (< 64 chars) should fail with 422.""" + resp = client.post("/auth/remember-me/save", json={"token": "a" * 32}) + + assert resp.status_code == 422 # Pydantic validation error + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +def test_save_remember_null_token(client): + """POST /auth/remember-me/save - Null token should fail with 422.""" + resp = client.post("/auth/remember-me/save", json={"token": None}) + + assert resp.status_code == 422 # Pydantic validation error + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +# ============================================================================ +# REMEMBER ME VALIDATION ENDPOINT TESTS +# ============================================================================ + +def test_validate_remember_valid_token(client, test_token, params_instance): + """POST /auth/validate-remember - Valid token should validate successfully.""" + # First save the token + save_resp = client.post("/auth/remember-me/save", json={"token": test_token}) + assert save_resp.status_code == 200 + + # Now validate it + validate_resp = client.post("/auth/validate-remember", json={"token": test_token}) + + assert validate_resp.status_code == 200 + data = validate_resp.get_json() + assert data is not None + assert data["success"] is True + assert data["valid"] is True + assert "successful" in data.get("message", "").lower() + + +def test_validate_remember_tampered_token(client, test_token): + """ + POST /auth/validate-remember - SECURITY TEST: Tampered token should be rejected. + + This test verifies the security of the timing-safe hash comparison. + Even a single-character modification to the token should fail validation. + """ + # Save the original token + save_resp = client.post("/auth/remember-me/save", json={"token": test_token}) + assert save_resp.status_code == 200 + + # Tamper with the token by modifying last character + tampered_token = test_token[:-1] + ("0" if test_token[-1] != "0" else "1") + + # Attempt to validate with tampered token + validate_resp = client.post("/auth/validate-remember", json={"token": tampered_token}) + + assert validate_resp.status_code == 200 + data = validate_resp.get_json() + assert data is not None + assert data["success"] is True # API doesn't error + assert data["valid"] is False # But validation fails + assert "failed" in data.get("message", "").lower() + + +def test_validate_remember_nonexistent_token(client): + """POST /auth/validate-remember - Non-existent token should return invalid.""" + import os + random_token = os.urandom(32).hex() + + resp = client.post("/auth/validate-remember", json={"token": random_token}) + + assert resp.status_code == 200 + data = resp.get_json() + assert data is not None + assert data["success"] is True + assert data["valid"] is False + + +def test_validate_remember_missing_token(client): + """POST /auth/validate-remember - Missing token should fail with 422.""" + resp = client.post("/auth/validate-remember", json={}) + + # Pydantic validation catches this before handler + assert resp.status_code == 422 + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +def test_validate_remember_empty_token(client): + """POST /auth/validate-remember - Empty token should fail with 422.""" + resp = client.post("/auth/validate-remember", json={"token": ""}) + + # Pydantic validation catches this before handler + assert resp.status_code == 422 + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +def test_validate_remember_null_token(client): + """POST /auth/validate-remember - Null token should fail with 422.""" + resp = client.post("/auth/validate-remember", json={"token": None}) + + # Pydantic validation catches this before handler + assert resp.status_code == 422 + data = resp.get_json() + assert data is not None + assert data["success"] is False + + +# ============================================================================ +# COMPLETE WORKFLOW TESTS +# ============================================================================ + +def test_remember_me_complete_workflow(client, test_token): + """ + Save and validate: Complete Remember Me workflow. + + Simulates: + 1. User logs in, "Remember Me" checked + 2. System saves token via API + 3. User closes browser, session expires + 4. User returns, system validates token from cookie + 5. User authenticated without re-entering password + """ + # Step 1: Save token (happens at login with "Remember Me") + save_resp = client.post("/auth/remember-me/save", json={"token": test_token}) + assert save_resp.status_code == 200 + save_data = save_resp.get_json() + assert save_data["success"] is True + token_id = save_data["token_id"] + + # Step 2: Validate token (happens on return visit from cookie) + validate_resp = client.post("/auth/validate-remember", json={"token": test_token}) + assert validate_resp.status_code == 200 + validate_data = validate_resp.get_json() + assert validate_data["success"] is True + assert validate_data["valid"] is True + + # Step 3: Verify token was actually stored in Parameters table + stored_value = ParametersInstance().get_parameter(token_id) + assert stored_value is not None + expected_hash = hashlib.sha256(test_token.encode('utf-8')).hexdigest() + assert stored_value == expected_hash + + +def test_remember_me_multiple_tokens(client): + """Multiple tokens can coexist (for multi-device Remember Me in future).""" + import os + + token1 = os.urandom(32).hex() + token2 = os.urandom(32).hex() + + # Save both tokens + resp1 = client.post("/auth/remember-me/save", json={"token": token1}) + resp2 = client.post("/auth/remember-me/save", json={"token": token2}) + + assert resp1.status_code == 200 + assert resp2.status_code == 200 + + # Both should validate + validate1 = client.post("/auth/validate-remember", json={"token": token1}) + validate2 = client.post("/auth/validate-remember", json={"token": token2}) + + assert validate1.get_json()["valid"] is True + assert validate2.get_json()["valid"] is True + + # Each token should only match itself, not the other + cross_validate = client.post("/auth/validate-remember", json={"token": token1 + token2[:32]}) + assert cross_validate.get_json()["valid"] is False + + +# ============================================================================ +# SECURITY TESTS +# ============================================================================ + +def test_timing_attack_prevention(client, test_token): + """ + Verify timing-safe hash comparison prevents timing attacks. + + The _hash_equals() method should take roughly equal time regardless of + where the mismatch occurs in the string. + """ + # Save token + client.post("/auth/remember-me/save", json={"token": test_token}) + + # Create tampered versions at different positions + tamper_positions = [0, 10, 32, 60, 63] # Various positions + + for pos in tamper_positions: + tampered_token = list(test_token) + tampered_token[pos] = "F" if tampered_token[pos] != "F" else "0" + tampered_token = "".join(tampered_token) + + resp = client.post("/auth/validate-remember", json={"token": tampered_token}) + data = resp.get_json() + + # All tampered attempts should fail validation + assert data["valid"] is False, f"Tamper at position {pos} was not detected!" + + +def test_hash_not_stored_on_disk(client, test_token): + """Verify that the HASH (not the token) is stored on disk.""" + save_resp = client.post("/auth/remember-me/save", json={"token": test_token}) + token_id = save_resp.get_json()["token_id"] + + # Retrieve the stored value + stored_hash = ParametersInstance().get_parameter(token_id) + + # Verify it's a hash, not the original token + assert stored_hash != test_token, "Token hash should not match original token!" + assert len(stored_hash) == 64, "SHA256 hash should be 64 hex characters" + assert stored_hash == hashlib.sha256(test_token.encode('utf-8')).hexdigest() + + +def test_database_compromise_mitigation(client, test_token): + """ + If database is compromised, stolen hash should not authenticate. + + This verifies the security model: attacker gets hash, but can't use it + without the original token (which only exists in the user's cookie). + """ + # Save token + save_resp = client.post("/auth/remember-me/save", json={"token": test_token}) + token_id = save_resp.get_json()["token_id"] + + # Simulate database breach: attacker gets the stored hash + stored_hash = ParametersInstance().get_parameter(token_id) + + # Attacker tries to use the stolen hash as a token + breach_test_resp = client.post("/auth/validate-remember", json={"token": stored_hash}) + breach_test_data = breach_test_resp.get_json() + + # Validation should fail because hash doesn't match hash(hash(token)) + assert breach_test_data["valid"] is False, "Stolen hash should not authenticate!" + + +# ============================================================================ +# MALFORMED REQUEST TESTS +# ============================================================================ + +def test_save_remember_no_json_body(client): + """POST /auth/remember-me/save - Missing JSON body should fail.""" + resp = client.post( + "/auth/remember-me/save", + data="not json", + content_type="application/json" + ) + assert resp.status_code in [400, 500] + + +def test_validate_remember_no_json_body(client): + """POST /auth/validate-remember - Missing JSON body should handle gracefully.""" + resp = client.post( + "/auth/validate-remember", + data="not json", + content_type="application/json" + ) + assert resp.status_code in [200, 400, 500] + + +def test_save_remember_extra_fields(client, test_token): + """POST /auth/remember-me/save - Extra fields should be ignored.""" + resp = client.post("/auth/remember-me/save", json={ + "token": test_token, + "extra_field": "should be ignored", + "another": 123 + }) + + assert resp.status_code == 200 + data = resp.get_json() + assert data["success"] is True + + +# ============================================================================ +# CLEANUP/MAINTENANCE TESTS +# ============================================================================ + +def test_delete_parameter(params_instance, test_token): + """ParametersInstance.delete_parameter() should clean up tokens.""" + # Save a token + test_id = "test_token_cleanup" + test_hash = hashlib.sha256(test_token.encode('utf-8')).hexdigest() + + params_instance.set_parameter(test_id, test_hash) + assert params_instance.get_parameter(test_id) is not None + + # Delete it + success = params_instance.delete_parameter(test_id) + assert success is True + assert params_instance.get_parameter(test_id) is None + + +def test_delete_parameters_by_prefix(params_instance): + """ParametersInstance.delete_parameters_by_prefix() should batch delete tokens.""" + import os + + # Create multiple tokens with same prefix + prefix = "remember_me_token_test_" + for i in range(3): + token_id = f"{prefix}{i}" + params_instance.set_parameter(token_id, f"hash_{i}") + + # Verify they exist + assert params_instance.get_parameter(f"{prefix}0") is not None + assert params_instance.get_parameter(f"{prefix}1") is not None + assert params_instance.get_parameter(f"{prefix}2") is not None + + # Delete by prefix + deleted_count = params_instance.delete_parameters_by_prefix(prefix) + assert deleted_count >= 3 + + # Verify they're gone + assert params_instance.get_parameter(f"{prefix}0") is None + assert params_instance.get_parameter(f"{prefix}1") is None + assert params_instance.get_parameter(f"{prefix}2") is None