server-side remember-me

This commit is contained in:
Jokob @NetAlertX
2026-02-22 03:47:29 +00:00
parent 0e94dcb091
commit 70645e7ef3
5 changed files with 901 additions and 11 deletions

View File

@@ -84,6 +84,52 @@ function is_https_request(): bool {
return false;
}
function call_api(string $endpoint, array $data = []): ?array {
/*
Call NetAlertX API endpoint (for login page endpoints that don't require auth).
Returns: JSON response as array, or null on failure
*/
try {
// Determine API host (assume localhost on same port as frontend)
$api_host = $_SERVER['HTTP_HOST'] ?? 'localhost';
$api_scheme = is_https_request() ? 'https' : 'http';
$api_url = $api_scheme . '://' . $api_host;
$url = $api_url . $endpoint;
$ch = curl_init($url);
if (!$ch) return null;
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 5,
CURLOPT_FOLLOWLOCATION => false,
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Accept: application/json'
]
]);
if (!empty($data)) {
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
}
$response = curl_exec($ch);
$httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpcode !== 200 || !$response) {
return null;
}
return json_decode($response, true);
} catch (Exception $e) {
return null;
}
}
function logout_user(): void {
$_SESSION = [];
@@ -127,18 +173,26 @@ if (!empty($_POST['loginpassword'])) {
login_user();
// Handle "Remember Me" if checked
if (!empty($_POST['PWRemember'])) {
// Generate random token (64-byte hex = 128 chars, use 64 chars)
$token = bin2hex(random_bytes(32));
$_SESSION['remember_token'] = hash('sha256',$token);
setcookie(COOKIE_NAME,$token,[
'expires'=>time()+604800,
'path'=>'/',
'secure'=>is_https_request(),
'httponly'=>true,
'samesite'=>'Strict'
// Call API to save token hash to Parameters table
$save_response = call_api('/auth/remember-me/save', [
'token' => $token
]);
// If API call successful, set persistent cookie
if ($save_response && isset($save_response['success']) && $save_response['success']) {
setcookie(COOKIE_NAME, $token, [
'expires' => time() + 604800,
'path' => '/',
'secure' => is_https_request(),
'httponly' => true,
'samesite' => 'Strict'
]);
}
}
safe_redirect(append_hash($redirectTo));
@@ -149,9 +203,15 @@ if (!empty($_POST['loginpassword'])) {
Remember Me Validation
===================================================== */
if (!is_authenticated() && !empty($_COOKIE[COOKIE_NAME]) && !empty($_SESSION['remember_token'])) {
if (!is_authenticated() && !empty($_COOKIE[COOKIE_NAME])) {
if (hash_equals($_SESSION['remember_token'], hash('sha256',$_COOKIE[COOKIE_NAME]))) {
// Call API to validate token against stored hash
$validate_response = call_api('/auth/validate-remember', [
'token' => $_COOKIE[COOKIE_NAME]
]);
// If API returns valid token, authenticate and redirect
if ($validate_response && isset($validate_response['valid']) && $validate_response['valid'] === true) {
login_user();
safe_redirect(append_hash($redirectTo));
}

View File

@@ -6,6 +6,7 @@ import os
from flask import Flask, redirect, request, jsonify, url_for, Response
from models.device_instance import DeviceInstance # noqa: E402
from models.parameters_instance import ParametersInstance # noqa: E402
from flask_cors import CORS
from werkzeug.exceptions import HTTPException
@@ -94,7 +95,9 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
DbQueryRequest, DbQueryResponse,
DbQueryUpdateRequest, DbQueryDeleteRequest,
AddToQueueRequest, GetSettingResponse,
RecentEventsRequest, SetDeviceAliasRequest
RecentEventsRequest, SetDeviceAliasRequest,
ValidateRememberRequest, ValidateRememberResponse,
SaveRememberRequest, SaveRememberResponse
)
from .sse_endpoint import ( # noqa: E402 [flake8 lint suppression]
@@ -1933,6 +1936,146 @@ def check_auth(payload=None):
return jsonify({"success": True, "message": "Authentication check successful"}), 200
# --------------------------
# Remember Me Validation endpoint
# --------------------------
@app.route("/auth/validate-remember", methods=["POST"])
@validate_request(
operation_id="validate_remember",
summary="Validate Remember Me Token",
description="Validate a persistent Remember Me token against stored hash. Called from login page (no auth required).",
request_model=ValidateRememberRequest,
response_model=ValidateRememberResponse,
tags=["auth"],
auth_callable=None # No auth required - used on login page
)
def validate_remember(payload=None):
"""
Validate a Remember Me token from persistent cookie.
Security: Uses timing-safe hash comparison to prevent timing attacks.
Token format: hex-encoded 32 random bytes (64 chars) from bin2hex(random_bytes(32))
"""
try:
# Extract token from request
data = request.get_json() or {}
token = data.get("token")
if not token:
mylog("verbose", ["[auth/validate-remember] Missing token in request"])
return jsonify({
"success": True,
"valid": False,
"message": "Token validation failed: missing token"
}), 200
# Validate token against stored hash
params_instance = ParametersInstance()
result = params_instance.validate_token(token)
if result['valid']:
mylog("verbose", ["[auth/validate-remember] Token validation successful"])
return jsonify({
"success": True,
"valid": True,
"message": "Token validation successful"
}), 200
else:
mylog("verbose", ["[auth/validate-remember] Token validation failed"])
return jsonify({
"success": True,
"valid": False,
"message": "Token validation failed"
}), 200
except Exception as e:
mylog("verbose", [f"[auth/validate-remember] Unexpected error: {e}"])
return jsonify({
"success": False,
"valid": False,
"error": "Internal server error",
"message": "An unexpected error occurred during token validation"
}), 500
# --------------------------
# Remember Me Save endpoint
# --------------------------
@app.route("/auth/remember-me/save", methods=["POST"])
@validate_request(
operation_id="save_remember",
summary="Save Remember Me Token",
description="Save a Remember Me token to the database. Called after successful login to enable persistent authentication.",
request_model=SaveRememberRequest,
response_model=SaveRememberResponse,
tags=["auth"],
auth_callable=None # No auth required - used on login page
)
def save_remember(payload=None):
"""
Save a Remember Me token.
Flow:
1. User logs in with "Remember Me" checkbox
2. Password validated successfully
3. Token generated: bin2hex(random_bytes(32))
4. This endpoint called: saves hash(token) to Parameters table
5. Token (unhashed) set in persistent cookie
6. Session created and user redirected
Security: Only the HASH is stored in the database, not the token itself.
If database is compromised, attacker cannot use stolen hashes without the original token.
"""
try:
import uuid
import hashlib
# Extract token from request
data = request.get_json() or {}
token = data.get("token")
if not token or len(token) < 64:
mylog("verbose", ["[auth/remember-me/save] Invalid or missing token"])
return jsonify({
"success": False,
"error": "Invalid token",
"message": "Token must be 64+ hex characters"
}), 400
# Hash the token
token_hash = hashlib.sha256(token.encode('utf-8')).hexdigest()
# Generate UUID-based parameter ID
token_id = f"remember_me_token_{uuid.uuid4()}"
# Store hash in Parameters table
params_instance = ParametersInstance()
success = params_instance.set_parameter(token_id, token_hash)
if success:
mylog("verbose", [f"[auth/remember-me/save] Token saved successfully: {token_id}"])
return jsonify({
"success": True,
"message": "Remember Me token saved successfully",
"token_id": token_id
}), 200
else:
mylog("verbose", ["[auth/remember-me/save] Failed to save token to database"])
return jsonify({
"success": False,
"error": "Database error",
"message": "Failed to save Remember Me token"
}), 500
except Exception as e:
mylog("verbose", [f"[auth/remember-me/save] Unexpected error: {e}"])
return jsonify({
"success": False,
"error": "Internal server error",
"message": "An unexpected error occurred while saving Remember Me token"
}), 500
# --------------------------
# Health endpoint
# --------------------------

View File

@@ -1031,6 +1031,89 @@ class GetSettingResponse(BaseResponse):
value: Any = Field(None, description="The setting value")
# =============================================================================
# AUTH SCHEMAS (Remember Me)
# =============================================================================
class ValidateRememberRequest(BaseModel):
"""Request to validate a Remember Me token."""
model_config = ConfigDict(
json_schema_extra={
"examples": [{
"token": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2"
}]
}
)
token: str = Field(
...,
min_length=64,
max_length=128,
description="The unhashed Remember Me token from persistent cookie (hex-encoded binary)"
)
class ValidateRememberResponse(BaseResponse):
"""Response from Remember Me token validation."""
model_config = ConfigDict(
extra="allow",
json_schema_extra={
"examples": [{
"success": True,
"valid": True,
"message": "Token validation successful"
}, {
"success": True,
"valid": False,
"message": "Token validation failed"
}]
}
)
valid: bool = Field(
...,
description="Whether the token is valid and matches stored hash"
)
class SaveRememberRequest(BaseModel):
"""Request to save a Remember Me token."""
model_config = ConfigDict(
json_schema_extra={
"examples": [{
"token": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2"
}]
}
)
token: str = Field(
...,
min_length=64,
max_length=128,
description="The unhashed Remember Me token to save (hex-encoded binary from bin2hex(random_bytes(32)))"
)
class SaveRememberResponse(BaseResponse):
"""Response from Remember Me token save operation."""
model_config = ConfigDict(
extra="allow",
json_schema_extra={
"examples": [{
"success": True,
"message": "Token saved successfully",
"token_id": "remember_me_token_550e8400-e29b-41d4-a716-446655440000"
}]
}
)
token_id: Optional[str] = Field(
None,
description="The parameter ID where token hash was stored (UUID-based)"
)
# =============================================================================
# GRAPHQL SCHEMAS
# =============================================================================

View File

@@ -0,0 +1,204 @@
"""
Parameters Instance - Handles Parameters table operations for Remember Me tokens and other system parameters.
The Parameters table is used for temporary, ephemeral settings like Remember Me tokens.
Structure:
parID: TEXT PRIMARY KEY (e.g., "remember_me_token_{uuid}")
parValue: TEXT (e.g., hashed token value)
"""
import hashlib
import sqlite3
from database import get_temp_db_connection
from logger import mylog
class ParametersInstance:
"""Handler for Parameters table operations."""
# --- helper methods (DRY pattern from DeviceInstance) ----------------------
def _fetchall(self, query, params=()):
"""Fetch all rows and return as list of dicts."""
conn = get_temp_db_connection()
rows = conn.execute(query, params).fetchall()
conn.close()
return [dict(r) for r in rows]
def _fetchone(self, query, params=()):
"""Fetch single row and return as dict or None."""
conn = get_temp_db_connection()
row = conn.execute(query, params).fetchone()
conn.close()
return dict(row) if row else None
def _execute(self, query, params=()):
"""Execute write query (INSERT/UPDATE/DELETE)."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(query, params)
conn.commit()
conn.close()
# --- public API -----------------------------------------------------------
def get_parameter(self, par_id):
"""
Retrieve a parameter value by ID.
Args:
par_id (str): The parameter ID to retrieve
Returns:
str: The parameter value, or None if not found
"""
try:
# Try with quoted column names in case they're reserved or have special chars
row = self._fetchone(
'SELECT "parValue" FROM "Parameters" WHERE "parID" = ?',
(par_id,)
)
return row['parValue'] if row else None
except Exception as e:
mylog("verbose", [f"[ParametersInstance] Error retrieving parameter {par_id}: {e}"])
return None
def set_parameter(self, par_id, par_value):
"""
Store or update a parameter (INSERT OR REPLACE).
Args:
par_id (str): The parameter ID
par_value (str): The parameter value
Returns:
bool: True if successful, False otherwise
"""
try:
# Try with quoted column names in case they're reserved or have special chars
self._execute(
'INSERT OR REPLACE INTO "Parameters" ("parID", "parValue") VALUES (?, ?)',
(par_id, par_value)
)
mylog("verbose", [f"[ParametersInstance] Parameter {par_id} stored successfully"])
return True
except Exception as e:
mylog("verbose", [f"[ParametersInstance] Error storing parameter {par_id}: {e}"])
return False
def delete_parameter(self, par_id):
"""
Delete a parameter by ID.
Args:
par_id (str): The parameter ID to delete
Returns:
bool: True if successful, False otherwise
"""
try:
# Try with quoted column names in case they're reserved or have special chars
self._execute(
'DELETE FROM "Parameters" WHERE "parID" = ?',
(par_id,)
)
mylog("verbose", [f"[ParametersInstance] Parameter {par_id} deleted successfully"])
return True
except Exception as e:
mylog("verbose", [f"[ParametersInstance] Error deleting parameter {par_id}: {e}"])
return False
def delete_parameters_by_prefix(self, prefix):
"""
Delete all parameters matching a prefix pattern (for cleanup).
Args:
prefix (str): The prefix pattern (e.g., "remember_me_token_")
Returns:
int: Number of parameters deleted
"""
try:
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute('DELETE FROM "Parameters" WHERE "parID" LIKE ?', (f"{prefix}%",))
deleted_count = cur.rowcount
conn.commit()
conn.close()
mylog("verbose", [f"[ParametersInstance] Deleted {deleted_count} parameters with prefix '{prefix}'"])
return deleted_count
except Exception as e:
mylog("verbose", [f"[ParametersInstance] Error deleting parameters with prefix '{prefix}': {e}"])
return 0
def validate_token(self, token):
"""
Validate a Remember Me token against stored hash.
Security: Compares hash(token) against stored hashes using hash_equals (timing-safe).
Args:
token (str): The unhashed token (from cookie)
Returns:
dict: {
'valid': bool,
'par_id': str or None # The matching parameter ID if valid
}
Note:
Returns immediately on first match. Use hash_equals() to prevent timing attacks.
"""
if not token:
return {'valid': False, 'par_id': None}
try:
# Compute hash of provided token
computed_hash = hashlib.sha256(token.encode('utf-8')).hexdigest()
# Retrieve all remember_me tokens from Parameters table
remember_tokens = self._fetchall(
'SELECT "parID", "parValue" FROM "Parameters" WHERE "parID" LIKE ?',
("remember_me_token_%",)
)
# Check each stored token using timing-safe comparison
for token_record in remember_tokens:
stored_hash = token_record['parValue']
stored_id = token_record['parID']
# Use hash_equals() to prevent timing attacks
if self._hash_equals(stored_hash, computed_hash):
mylog("verbose", [f"[ParametersInstance] Token validation successful for {stored_id}"])
return {'valid': True, 'par_id': stored_id}
mylog("verbose", ["[ParametersInstance] Token validation failed: no matching token found"])
return {'valid': False, 'par_id': None}
except Exception as e:
mylog("verbose", [f"[ParametersInstance] Error validating token: {e}"])
return {'valid': False, 'par_id': None}
@staticmethod
def _hash_equals(known_string, user_string):
"""
Timing-safe string comparison to prevent timing attacks.
Args:
known_string (str): The known value (stored hash)
user_string (str): The user-supplied value (computed hash)
Returns:
bool: True if strings match, False otherwise
"""
if not isinstance(known_string, str) or not isinstance(user_string, str):
return False
if len(known_string) != len(user_string):
return False
# Compare all characters regardless of match (timing-safe)
result = 0
for x, y in zip(known_string, user_string):
result |= ord(x) ^ ord(y)
return result == 0

View File

@@ -0,0 +1,400 @@
"""
Remember Me Token Tests - Security & Functionality
Tests the secure Remember Me feature:
- Token generation and storage
- Token validation (including timing-safe comparison)
- Security: Tampered token rejection
- API endpoint validation
"""
import sys
import os
import hashlib
import json
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import pytest
from api_server.api_server_start import app # noqa: E402
from models.parameters_instance import ParametersInstance # noqa: E402
@pytest.fixture
def client():
"""Flask test client."""
with app.test_client() as client:
yield client
@pytest.fixture
def params_instance():
"""ParametersInstance for direct database access."""
return ParametersInstance()
@pytest.fixture
def test_token():
"""Generate a valid test token (64-hex characters)."""
import os
return os.urandom(32).hex() # 32 bytes = 64 hex chars
# ============================================================================
# REMEMBER ME SAVE ENDPOINT TESTS
# ============================================================================
def test_save_remember_success(client, test_token):
"""POST /auth/remember-me/save - Valid token should save successfully."""
resp = client.post("/auth/remember-me/save", json={"token": test_token})
assert resp.status_code == 200
data = resp.get_json()
assert data is not None
assert data["success"] is True
assert "saved successfully" in data.get("message", "").lower()
assert data.get("token_id") is not None
assert data["token_id"].startswith("remember_me_token_")
def test_save_remember_missing_token(client):
"""POST /auth/remember-me/save - Missing token should fail with 422 (validation error)."""
resp = client.post("/auth/remember-me/save", json={})
assert resp.status_code == 422 # Pydantic validation error, not 400
data = resp.get_json()
assert data is not None
assert data["success"] is False
def test_save_remember_empty_token(client):
"""POST /auth/remember-me/save - Empty token should fail with 422."""
resp = client.post("/auth/remember-me/save", json={"token": ""})
assert resp.status_code == 422 # Pydantic validation error
data = resp.get_json()
assert data is not None
assert data["success"] is False
def test_save_remember_short_token(client):
"""POST /auth/remember-me/save - Token too short (< 64 chars) should fail with 422."""
resp = client.post("/auth/remember-me/save", json={"token": "a" * 32})
assert resp.status_code == 422 # Pydantic validation error
data = resp.get_json()
assert data is not None
assert data["success"] is False
def test_save_remember_null_token(client):
"""POST /auth/remember-me/save - Null token should fail with 422."""
resp = client.post("/auth/remember-me/save", json={"token": None})
assert resp.status_code == 422 # Pydantic validation error
data = resp.get_json()
assert data is not None
assert data["success"] is False
# ============================================================================
# REMEMBER ME VALIDATION ENDPOINT TESTS
# ============================================================================
def test_validate_remember_valid_token(client, test_token, params_instance):
"""POST /auth/validate-remember - Valid token should validate successfully."""
# First save the token
save_resp = client.post("/auth/remember-me/save", json={"token": test_token})
assert save_resp.status_code == 200
# Now validate it
validate_resp = client.post("/auth/validate-remember", json={"token": test_token})
assert validate_resp.status_code == 200
data = validate_resp.get_json()
assert data is not None
assert data["success"] is True
assert data["valid"] is True
assert "successful" in data.get("message", "").lower()
def test_validate_remember_tampered_token(client, test_token):
"""
POST /auth/validate-remember - SECURITY TEST: Tampered token should be rejected.
This test verifies the security of the timing-safe hash comparison.
Even a single-character modification to the token should fail validation.
"""
# Save the original token
save_resp = client.post("/auth/remember-me/save", json={"token": test_token})
assert save_resp.status_code == 200
# Tamper with the token by modifying last character
tampered_token = test_token[:-1] + ("0" if test_token[-1] != "0" else "1")
# Attempt to validate with tampered token
validate_resp = client.post("/auth/validate-remember", json={"token": tampered_token})
assert validate_resp.status_code == 200
data = validate_resp.get_json()
assert data is not None
assert data["success"] is True # API doesn't error
assert data["valid"] is False # But validation fails
assert "failed" in data.get("message", "").lower()
def test_validate_remember_nonexistent_token(client):
"""POST /auth/validate-remember - Non-existent token should return invalid."""
import os
random_token = os.urandom(32).hex()
resp = client.post("/auth/validate-remember", json={"token": random_token})
assert resp.status_code == 200
data = resp.get_json()
assert data is not None
assert data["success"] is True
assert data["valid"] is False
def test_validate_remember_missing_token(client):
"""POST /auth/validate-remember - Missing token should fail with 422."""
resp = client.post("/auth/validate-remember", json={})
# Pydantic validation catches this before handler
assert resp.status_code == 422
data = resp.get_json()
assert data is not None
assert data["success"] is False
def test_validate_remember_empty_token(client):
"""POST /auth/validate-remember - Empty token should fail with 422."""
resp = client.post("/auth/validate-remember", json={"token": ""})
# Pydantic validation catches this before handler
assert resp.status_code == 422
data = resp.get_json()
assert data is not None
assert data["success"] is False
def test_validate_remember_null_token(client):
"""POST /auth/validate-remember - Null token should fail with 422."""
resp = client.post("/auth/validate-remember", json={"token": None})
# Pydantic validation catches this before handler
assert resp.status_code == 422
data = resp.get_json()
assert data is not None
assert data["success"] is False
# ============================================================================
# COMPLETE WORKFLOW TESTS
# ============================================================================
def test_remember_me_complete_workflow(client, test_token):
"""
Save and validate: Complete Remember Me workflow.
Simulates:
1. User logs in, "Remember Me" checked
2. System saves token via API
3. User closes browser, session expires
4. User returns, system validates token from cookie
5. User authenticated without re-entering password
"""
# Step 1: Save token (happens at login with "Remember Me")
save_resp = client.post("/auth/remember-me/save", json={"token": test_token})
assert save_resp.status_code == 200
save_data = save_resp.get_json()
assert save_data["success"] is True
token_id = save_data["token_id"]
# Step 2: Validate token (happens on return visit from cookie)
validate_resp = client.post("/auth/validate-remember", json={"token": test_token})
assert validate_resp.status_code == 200
validate_data = validate_resp.get_json()
assert validate_data["success"] is True
assert validate_data["valid"] is True
# Step 3: Verify token was actually stored in Parameters table
stored_value = ParametersInstance().get_parameter(token_id)
assert stored_value is not None
expected_hash = hashlib.sha256(test_token.encode('utf-8')).hexdigest()
assert stored_value == expected_hash
def test_remember_me_multiple_tokens(client):
"""Multiple tokens can coexist (for multi-device Remember Me in future)."""
import os
token1 = os.urandom(32).hex()
token2 = os.urandom(32).hex()
# Save both tokens
resp1 = client.post("/auth/remember-me/save", json={"token": token1})
resp2 = client.post("/auth/remember-me/save", json={"token": token2})
assert resp1.status_code == 200
assert resp2.status_code == 200
# Both should validate
validate1 = client.post("/auth/validate-remember", json={"token": token1})
validate2 = client.post("/auth/validate-remember", json={"token": token2})
assert validate1.get_json()["valid"] is True
assert validate2.get_json()["valid"] is True
# Each token should only match itself, not the other
cross_validate = client.post("/auth/validate-remember", json={"token": token1 + token2[:32]})
assert cross_validate.get_json()["valid"] is False
# ============================================================================
# SECURITY TESTS
# ============================================================================
def test_timing_attack_prevention(client, test_token):
"""
Verify timing-safe hash comparison prevents timing attacks.
The _hash_equals() method should take roughly equal time regardless of
where the mismatch occurs in the string.
"""
# Save token
client.post("/auth/remember-me/save", json={"token": test_token})
# Create tampered versions at different positions
tamper_positions = [0, 10, 32, 60, 63] # Various positions
for pos in tamper_positions:
tampered_token = list(test_token)
tampered_token[pos] = "F" if tampered_token[pos] != "F" else "0"
tampered_token = "".join(tampered_token)
resp = client.post("/auth/validate-remember", json={"token": tampered_token})
data = resp.get_json()
# All tampered attempts should fail validation
assert data["valid"] is False, f"Tamper at position {pos} was not detected!"
def test_hash_not_stored_on_disk(client, test_token):
"""Verify that the HASH (not the token) is stored on disk."""
save_resp = client.post("/auth/remember-me/save", json={"token": test_token})
token_id = save_resp.get_json()["token_id"]
# Retrieve the stored value
stored_hash = ParametersInstance().get_parameter(token_id)
# Verify it's a hash, not the original token
assert stored_hash != test_token, "Token hash should not match original token!"
assert len(stored_hash) == 64, "SHA256 hash should be 64 hex characters"
assert stored_hash == hashlib.sha256(test_token.encode('utf-8')).hexdigest()
def test_database_compromise_mitigation(client, test_token):
"""
If database is compromised, stolen hash should not authenticate.
This verifies the security model: attacker gets hash, but can't use it
without the original token (which only exists in the user's cookie).
"""
# Save token
save_resp = client.post("/auth/remember-me/save", json={"token": test_token})
token_id = save_resp.get_json()["token_id"]
# Simulate database breach: attacker gets the stored hash
stored_hash = ParametersInstance().get_parameter(token_id)
# Attacker tries to use the stolen hash as a token
breach_test_resp = client.post("/auth/validate-remember", json={"token": stored_hash})
breach_test_data = breach_test_resp.get_json()
# Validation should fail because hash doesn't match hash(hash(token))
assert breach_test_data["valid"] is False, "Stolen hash should not authenticate!"
# ============================================================================
# MALFORMED REQUEST TESTS
# ============================================================================
def test_save_remember_no_json_body(client):
"""POST /auth/remember-me/save - Missing JSON body should fail."""
resp = client.post(
"/auth/remember-me/save",
data="not json",
content_type="application/json"
)
assert resp.status_code in [400, 500]
def test_validate_remember_no_json_body(client):
"""POST /auth/validate-remember - Missing JSON body should handle gracefully."""
resp = client.post(
"/auth/validate-remember",
data="not json",
content_type="application/json"
)
assert resp.status_code in [200, 400, 500]
def test_save_remember_extra_fields(client, test_token):
"""POST /auth/remember-me/save - Extra fields should be ignored."""
resp = client.post("/auth/remember-me/save", json={
"token": test_token,
"extra_field": "should be ignored",
"another": 123
})
assert resp.status_code == 200
data = resp.get_json()
assert data["success"] is True
# ============================================================================
# CLEANUP/MAINTENANCE TESTS
# ============================================================================
def test_delete_parameter(params_instance, test_token):
"""ParametersInstance.delete_parameter() should clean up tokens."""
# Save a token
test_id = "test_token_cleanup"
test_hash = hashlib.sha256(test_token.encode('utf-8')).hexdigest()
params_instance.set_parameter(test_id, test_hash)
assert params_instance.get_parameter(test_id) is not None
# Delete it
success = params_instance.delete_parameter(test_id)
assert success is True
assert params_instance.get_parameter(test_id) is None
def test_delete_parameters_by_prefix(params_instance):
"""ParametersInstance.delete_parameters_by_prefix() should batch delete tokens."""
import os
# Create multiple tokens with same prefix
prefix = "remember_me_token_test_"
for i in range(3):
token_id = f"{prefix}{i}"
params_instance.set_parameter(token_id, f"hash_{i}")
# Verify they exist
assert params_instance.get_parameter(f"{prefix}0") is not None
assert params_instance.get_parameter(f"{prefix}1") is not None
assert params_instance.get_parameter(f"{prefix}2") is not None
# Delete by prefix
deleted_count = params_instance.delete_parameters_by_prefix(prefix)
assert deleted_count >= 3
# Verify they're gone
assert params_instance.get_parameter(f"{prefix}0") is None
assert params_instance.get_parameter(f"{prefix}1") is None
assert params_instance.get_parameter(f"{prefix}2") is None