mirror of
https://github.com/wizarrrr/wizarr.git
synced 2025-12-23 23:59:23 -05:00
451 lines
16 KiB
Python
451 lines
16 KiB
Python
"""
|
|
Performance and load tests for invitation system.
|
|
|
|
These tests ensure the invitation system can handle concurrent users
|
|
and process invitations efficiently.
|
|
"""
|
|
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from statistics import mean, median
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from app.extensions import db
|
|
from app.models import Invitation, MediaServer, User
|
|
from app.services.invitation_manager import InvitationManager
|
|
from tests.mocks.media_server_mocks import (
|
|
create_mock_client,
|
|
get_mock_state,
|
|
setup_mock_servers,
|
|
)
|
|
|
|
|
|
class TestInvitationPerformance:
|
|
"""Test invitation processing performance."""
|
|
|
|
def setup_method(self):
|
|
"""Setup for each test."""
|
|
setup_mock_servers()
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_single_invitation_processing_time(self, mock_get_client, app):
|
|
"""Test time to process a single invitation."""
|
|
with app.app_context():
|
|
# Setup
|
|
server = MediaServer(
|
|
name="Performance Test Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="perf-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
invitation = Invitation(code="PERF123", unlimited=True)
|
|
invitation.servers = [server]
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
mock_client = create_mock_client("jellyfin", server_id=server.id)
|
|
mock_get_client.return_value = mock_client
|
|
|
|
# Measure processing time
|
|
start_time = time.time()
|
|
|
|
success, redirect_code, errors = InvitationManager.process_invitation(
|
|
code="PERF123",
|
|
username="perfuser",
|
|
password="testpass123",
|
|
confirm_password="testpass123",
|
|
email="perf@example.com",
|
|
)
|
|
|
|
end_time = time.time()
|
|
processing_time = end_time - start_time
|
|
|
|
# Verify success
|
|
assert success
|
|
assert len(errors) == 0
|
|
|
|
# Performance assertion - should complete in under 1 second
|
|
assert processing_time < 1.0, (
|
|
f"Processing took {processing_time:.3f}s, expected < 1.0s"
|
|
)
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_concurrent_invitation_processing(self, mock_get_client, app):
|
|
"""Test processing multiple invitations concurrently."""
|
|
with app.app_context():
|
|
# Setup server and unlimited invitation
|
|
server = MediaServer(
|
|
name="Concurrent Test Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="concurrent-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
invitation = Invitation(
|
|
code="CONCURRENT123",
|
|
unlimited=True, # Allow multiple uses
|
|
)
|
|
invitation.servers = [server]
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
mock_client = create_mock_client("jellyfin", server_id=server.id)
|
|
mock_get_client.return_value = mock_client
|
|
|
|
# Function to process invitation
|
|
def process_invitation(user_id):
|
|
# Small stagger to reduce SQLite concurrency conflicts
|
|
time.sleep(user_id * 0.01) # 0-90ms stagger
|
|
with app.app_context():
|
|
success, redirect_code, errors = (
|
|
InvitationManager.process_invitation(
|
|
code="CONCURRENT123",
|
|
username=f"user{user_id}",
|
|
password="testpass123",
|
|
confirm_password="testpass123",
|
|
email=f"user{user_id}@example.com",
|
|
)
|
|
)
|
|
return success, redirect_code, errors, user_id
|
|
|
|
# Process 10 invitations concurrently
|
|
num_users = 10
|
|
start_time = time.time()
|
|
|
|
with ThreadPoolExecutor(max_workers=5) as executor:
|
|
futures = [
|
|
executor.submit(process_invitation, i) for i in range(num_users)
|
|
]
|
|
results = [future.result() for future in futures]
|
|
|
|
end_time = time.time()
|
|
total_time = end_time - start_time
|
|
|
|
# Verify all succeeded
|
|
successful_count = sum(1 for success, _, _, _ in results if success)
|
|
assert successful_count == num_users, (
|
|
f"Only {successful_count}/{num_users} invitations succeeded"
|
|
)
|
|
|
|
# Performance assertion - should handle 10 concurrent users in reasonable time
|
|
assert total_time < 5.0, (
|
|
f"Concurrent processing took {total_time:.3f}s, expected < 5.0s"
|
|
)
|
|
|
|
# Verify users were created
|
|
created_users = get_mock_state().users
|
|
assert len(created_users) == num_users
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_multi_server_invitation_performance(self, mock_get_client, app):
|
|
"""Test performance of multi-server invitation processing."""
|
|
with app.app_context():
|
|
setup_mock_servers()
|
|
|
|
# Create multiple servers
|
|
servers = []
|
|
for i in range(3): # 3 servers
|
|
server = MediaServer(
|
|
name=f"Server {i + 1}",
|
|
server_type="jellyfin",
|
|
url=f"http://localhost:809{6 + i}",
|
|
api_key=f"key{i + 1}",
|
|
)
|
|
servers.append(server)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
# Create multi-server invitation
|
|
invitation = Invitation(code="MULTIPERFORMANCE123", unlimited=False)
|
|
invitation.servers = servers
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
# Setup mock clients
|
|
def get_client_side_effect(server):
|
|
return create_mock_client("jellyfin", server_id=server.id)
|
|
|
|
mock_get_client.side_effect = get_client_side_effect
|
|
|
|
# Measure processing time
|
|
start_time = time.time()
|
|
|
|
success, redirect_code, errors = InvitationManager.process_invitation(
|
|
code="MULTIPERFORMANCE123",
|
|
username="multiuser",
|
|
password="testpass123",
|
|
confirm_password="testpass123",
|
|
email="multi@example.com",
|
|
)
|
|
|
|
end_time = time.time()
|
|
processing_time = end_time - start_time
|
|
|
|
# Verify success
|
|
assert success
|
|
assert len(errors) == 0
|
|
|
|
# Should process 3 servers in reasonable time
|
|
assert processing_time < 3.0, (
|
|
f"Multi-server processing took {processing_time:.3f}s, expected < 3.0s"
|
|
)
|
|
|
|
# Verify users created on all servers
|
|
created_users = get_mock_state().users
|
|
assert len(created_users) == 3
|
|
|
|
|
|
class TestInvitationLoadTesting:
|
|
"""Load testing for invitation system."""
|
|
|
|
def setup_method(self):
|
|
"""Setup for each test."""
|
|
setup_mock_servers()
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_invitation_validation_performance(self, mock_get_client, app):
|
|
"""Test performance of invitation validation under load."""
|
|
with app.app_context():
|
|
# Create multiple invitations
|
|
server = MediaServer(
|
|
name="Load Test Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="load-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
# Create 100 invitations
|
|
invitations = []
|
|
for i in range(100):
|
|
invitation = Invitation(code=f"LOAD{i:03d}", unlimited=True)
|
|
invitation.servers = [server]
|
|
invitations.append(invitation)
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
# Test validation performance
|
|
from app.services.invites import is_invite_valid
|
|
|
|
validation_times = []
|
|
|
|
for invitation in invitations[:20]: # Test first 20
|
|
start_time = time.time()
|
|
is_valid, message = is_invite_valid(invitation.code)
|
|
end_time = time.time()
|
|
|
|
validation_times.append(end_time - start_time)
|
|
assert is_valid # Should all be valid
|
|
|
|
# Performance analysis
|
|
avg_time = mean(validation_times)
|
|
median_time = median(validation_times)
|
|
max_time = max(validation_times)
|
|
|
|
# Assertions with reasonable tolerance for system variance
|
|
# Increased thresholds to account for CI/system load variations
|
|
assert avg_time < 0.05, f"Average validation time {avg_time:.4f}s too slow"
|
|
assert median_time < 0.05, (
|
|
f"Median validation time {median_time:.4f}s too slow"
|
|
)
|
|
assert max_time < 0.15, f"Max validation time {max_time:.4f}s too slow"
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_database_performance_under_load(self, mock_get_client, app):
|
|
"""Test database performance with many invitation records."""
|
|
with app.app_context():
|
|
# Create server
|
|
server = MediaServer(
|
|
name="DB Load Test Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="db-load-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
# Create many invitations and users to simulate realistic load
|
|
for i in range(500): # 500 invitations
|
|
invitation = Invitation(
|
|
code=f"DBLOAD{i:04d}",
|
|
unlimited=False,
|
|
used=(i % 3 == 0), # Every 3rd invitation is used
|
|
)
|
|
invitation.servers = [server]
|
|
db.session.add(invitation)
|
|
|
|
if invitation.used:
|
|
# Create corresponding user
|
|
user = User(
|
|
username=f"user{i}",
|
|
email=f"user{i}@example.com",
|
|
token=f"token{i}",
|
|
code=invitation.code,
|
|
server_id=server.id,
|
|
)
|
|
db.session.add(user)
|
|
|
|
db.session.commit()
|
|
|
|
# Test query performance
|
|
start_time = time.time()
|
|
|
|
# Simulate typical queries during invitation processing
|
|
for i in range(10):
|
|
code = f"DBLOAD{i:04d}"
|
|
invitation = Invitation.query.filter_by(code=code).first()
|
|
assert invitation is not None
|
|
|
|
# Check if user exists (typical validation)
|
|
user = User.query.filter_by(code=code, server_id=server.id).first()
|
|
|
|
end_time = time.time()
|
|
query_time = end_time - start_time
|
|
|
|
# Should handle queries efficiently even with large dataset
|
|
assert query_time < 1.0, (
|
|
f"Database queries took {query_time:.3f}s, expected < 1.0s"
|
|
)
|
|
|
|
|
|
class TestInvitationMemoryUsage:
|
|
"""Test memory usage during invitation processing."""
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_memory_efficient_processing(self, mock_get_client, app):
|
|
"""Test that invitation processing doesn't leak memory."""
|
|
import gc
|
|
import tracemalloc
|
|
|
|
with app.app_context():
|
|
# Setup
|
|
server = MediaServer(
|
|
name="Memory Test Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="memory-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
invitation = Invitation(code="MEMORY123", unlimited=True)
|
|
invitation.servers = [server]
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
mock_client = create_mock_client("jellyfin", server_id=server.id)
|
|
mock_get_client.return_value = mock_client
|
|
|
|
# Start memory tracking
|
|
tracemalloc.start()
|
|
|
|
# Process multiple invitations
|
|
for i in range(50):
|
|
success, redirect_code, errors = InvitationManager.process_invitation(
|
|
code="MEMORY123",
|
|
username=f"memuser{i}",
|
|
password="testpass123",
|
|
confirm_password="testpass123",
|
|
email=f"memuser{i}@example.com",
|
|
)
|
|
assert success
|
|
|
|
# Force garbage collection periodically
|
|
if i % 10 == 0:
|
|
gc.collect()
|
|
|
|
# Check memory usage
|
|
current, peak = tracemalloc.get_traced_memory()
|
|
tracemalloc.stop()
|
|
|
|
# Memory should be reasonable (less than 50MB peak)
|
|
peak_mb = peak / 1024 / 1024
|
|
assert peak_mb < 50, f"Peak memory usage {peak_mb:.2f}MB too high"
|
|
|
|
|
|
class TestInvitationErrorRecovery:
|
|
"""Test system recovery from errors during high load."""
|
|
|
|
def setup_method(self):
|
|
"""Setup for each test."""
|
|
setup_mock_servers()
|
|
|
|
@patch("app.services.invitation_manager.get_client_for_media_server")
|
|
def test_recovery_from_server_failures(self, mock_get_client, app):
|
|
"""Test system recovery when servers fail under load."""
|
|
with app.app_context():
|
|
# Setup server
|
|
server = MediaServer(
|
|
name="Failure Recovery Server",
|
|
server_type="jellyfin",
|
|
url="http://localhost:8096",
|
|
api_key="recovery-key",
|
|
)
|
|
db.session.add(server)
|
|
db.session.flush()
|
|
|
|
invitation = Invitation(code="RECOVERY123", unlimited=True)
|
|
invitation.servers = [server]
|
|
db.session.add(invitation)
|
|
db.session.commit()
|
|
|
|
# Setup client that fails intermittently
|
|
success_count = 0
|
|
failure_count = 0
|
|
|
|
def intermittent_client(server):
|
|
nonlocal success_count, failure_count
|
|
client = create_mock_client("jellyfin", server_id=server.id)
|
|
|
|
original_join = client._do_join
|
|
|
|
def failing_join(*args, **kwargs):
|
|
nonlocal success_count, failure_count
|
|
# Fail every 3rd request
|
|
if (success_count + failure_count) % 3 == 2:
|
|
failure_count += 1
|
|
return False, "Temporary server error"
|
|
success_count += 1
|
|
return original_join(*args, **kwargs)
|
|
|
|
client._do_join = failing_join
|
|
return client
|
|
|
|
mock_get_client.side_effect = intermittent_client
|
|
|
|
# Process invitations - some should fail, others succeed
|
|
results = []
|
|
for i in range(20):
|
|
success, redirect_code, errors = InvitationManager.process_invitation(
|
|
code="RECOVERY123",
|
|
username=f"recoveryuser{i}",
|
|
password="testpass123",
|
|
confirm_password="testpass123",
|
|
email=f"recovery{i}@example.com",
|
|
)
|
|
results.append(success)
|
|
|
|
# Should have both successes and failures
|
|
successful_count = sum(results)
|
|
failed_count = len(results) - successful_count
|
|
|
|
assert successful_count > 0, "No invitations succeeded"
|
|
assert failed_count > 0, "No invitations failed (test setup issue)"
|
|
|
|
# System should remain stable
|
|
assert successful_count >= failed_count, (
|
|
"More failures than successes indicates system instability"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v", "-s"]) # -s to see print output
|