From edd5bd27b07c870783150288bb4b8bba0f813f1e Mon Sep 17 00:00:00 2001 From: Adam Outler Date: Thu, 23 Oct 2025 23:33:04 +0000 Subject: [PATCH] Devcontainer setup --- .devcontainer/Dockerfile | 4 +- .devcontainer/devcontainer.json | 2 +- .../resources/devcontainer-Dockerfile | 4 +- install/production-filesystem/entrypoint.sh | 30 +- test/test_compound_conditions.py | 456 +++++++++--------- test/test_safe_builder_unit.py | 356 +++++++------- 6 files changed, 430 insertions(+), 422 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index cb39f0bb..f45e620b 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -210,7 +210,7 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ FROM runner AS netalertx-devcontainer ENV INSTALL_DIR=/app -ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages +ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages:/usr/lib/python3.12/site-packages ENV PATH=/services:${PATH} ENV PHP_INI_SCAN_DIR=/services/config/php/conf.d:/etc/php83/conf.d ENV LISTEN_ADDR=0.0.0.0 @@ -231,7 +231,7 @@ RUN mkdir /workspaces && \ install -d -o netalertx -g netalertx -m 777 /services/run/logs && \ install -d -o netalertx -g netalertx -m 777 /app/run/tmp/client_body && \ sed -i -e 's|:/app:|:/workspaces:|' /etc/passwd && \ - find /opt/venv -type d -exec chmod o+rw {} \; + find /opt/venv -type d -exec chmod o+rwx {} \; USER netalertx ENTRYPOINT ["/bin/sh","-c","sleep infinity"] diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 4628c2ba..2a2276c7 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -43,7 +43,7 @@ } }, - "postCreateCommand": "pip install pytest docker", + "postCreateCommand": "/opt/venv/bin/pip3 install pytest docker debugpy", "postStartCommand": "${containerWorkspaceFolder}/.devcontainer/scripts/setup.sh", "customizations": { diff --git a/.devcontainer/resources/devcontainer-Dockerfile b/.devcontainer/resources/devcontainer-Dockerfile index 352d874d..fc1709eb 100755 --- a/.devcontainer/resources/devcontainer-Dockerfile +++ b/.devcontainer/resources/devcontainer-Dockerfile @@ -7,7 +7,7 @@ FROM runner AS netalertx-devcontainer ENV INSTALL_DIR=/app -ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages +ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages:/usr/lib/python3.12/site-packages ENV PATH=/services:${PATH} ENV PHP_INI_SCAN_DIR=/services/config/php/conf.d:/etc/php83/conf.d ENV LISTEN_ADDR=0.0.0.0 @@ -28,7 +28,7 @@ RUN mkdir /workspaces && \ install -d -o netalertx -g netalertx -m 777 /services/run/logs && \ install -d -o netalertx -g netalertx -m 777 /app/run/tmp/client_body && \ sed -i -e 's|:/app:|:/workspaces:|' /etc/passwd && \ - find /opt/venv -type d -exec chmod o+rw {} \; + find /opt/venv -type d -exec chmod o+rwx {} \; USER netalertx ENTRYPOINT ["/bin/sh","-c","sleep infinity"] diff --git a/install/production-filesystem/entrypoint.sh b/install/production-filesystem/entrypoint.sh index 582a9fd4..b8ef0f9f 100644 --- a/install/production-filesystem/entrypoint.sh +++ b/install/production-filesystem/entrypoint.sh @@ -57,20 +57,22 @@ NETALERTX_DOCKER_ERROR_CHECK=0 # Run all pre-startup checks to validate container environment and dependencies -echo "Startup pre-checks" -for script in ${SYSTEM_SERVICES_SCRIPTS}/check-*.sh; do - script_name=$(basename "$script" | sed 's/^check-//;s/\.sh$//;s/-/ /g') - echo " --> ${script_name}" - - sh "$script" - NETALERTX_DOCKER_ERROR_CHECK=$? - - if [ ${NETALERTX_DOCKER_ERROR_CHECK} -ne 0 ]; then - - echo exit code ${NETALERTX_DOCKER_ERROR_CHECK} from ${script} - exit ${NETALERTX_DOCKER_ERROR_CHECK} - fi -done +if [ ${NETALERTX_DEBUG != 1} ]; then + echo "Startup pre-checks" + for script in ${SYSTEM_SERVICES_SCRIPTS}/check-*.sh; do + script_name=$(basename "$script" | sed 's/^check-//;s/\.sh$//;s/-/ /g') + echo " --> ${script_name}" + + sh "$script" + NETALERTX_DOCKER_ERROR_CHECK=$? + + if [ ${NETALERTX_DOCKER_ERROR_CHECK} -ne 0 ]; then + + echo exit code ${NETALERTX_DOCKER_ERROR_CHECK} from ${script} + exit ${NETALERTX_DOCKER_ERROR_CHECK} + fi + done +fi # Exit after checks if in check-only mode (for testing) if [ "${NETALERTX_CHECK_ONLY:-0}" -eq 1 ]; then diff --git a/test/test_compound_conditions.py b/test/test_compound_conditions.py index e7d15557..bfb9679a 100755 --- a/test/test_compound_conditions.py +++ b/test/test_compound_conditions.py @@ -5,322 +5,326 @@ Tests the fix for Issue #1210 - compound conditions with multiple AND/OR clauses """ import sys -import unittest +import pytest from unittest.mock import MagicMock # Mock the logger module before importing SafeConditionBuilder sys.modules['logger'] = MagicMock() # Add parent directory to path for imports -sys.path.insert(0, '/tmp/netalertx_hotfix/server/db') +sys.path.insert(0, '/workspaces/NetAlertX') -from sql_safe_builder import SafeConditionBuilder +from server.db.sql_safe_builder import SafeConditionBuilder -class TestCompoundConditions(unittest.TestCase): - """Test compound condition parsing functionality.""" +@pytest.fixture +def builder(): + """Create a fresh builder instance for each test.""" + return SafeConditionBuilder() - def setUp(self): - """Create a fresh builder instance for each test.""" - self.builder = SafeConditionBuilder() - def test_user_failing_filter_six_and_clauses(self): - """Test the exact user-reported failing filter from Issue #1210.""" - condition = ( - "AND devLastIP NOT LIKE '192.168.50.%' " - "AND devLastIP NOT LIKE '192.168.60.%' " - "AND devLastIP NOT LIKE '192.168.70.2' " - "AND devLastIP NOT LIKE '192.168.70.5' " - "AND devLastIP NOT LIKE '192.168.70.3' " - "AND devLastIP NOT LIKE '192.168.70.4'" - ) +def test_user_failing_filter_six_and_clauses(builder): + """Test the exact user-reported failing filter from Issue #1210.""" + condition = ( + "AND devLastIP NOT LIKE '192.168.50.%' " + "AND devLastIP NOT LIKE '192.168.60.%' " + "AND devLastIP NOT LIKE '192.168.70.2' " + "AND devLastIP NOT LIKE '192.168.70.5' " + "AND devLastIP NOT LIKE '192.168.70.3' " + "AND devLastIP NOT LIKE '192.168.70.4'" + ) - sql, params = self.builder.build_safe_condition(condition) + sql, params = builder.build_safe_condition(condition) - # Should successfully parse - self.assertIsNotNone(sql) - self.assertIsNotNone(params) + # Should successfully parse + assert sql is not None + assert params is not None - # Should have 6 parameters (one per clause) - self.assertEqual(len(params), 6) + # Should have 6 parameters (one per clause) + assert len(params) == 6 - # Should contain all 6 AND operators - self.assertEqual(sql.count('AND'), 6) + # Should contain all 6 AND operators + assert sql.count('AND') == 6 - # Should contain all 6 NOT LIKE operators - self.assertEqual(sql.count('NOT LIKE'), 6) + # Should contain all 6 NOT LIKE operators + assert sql.count('NOT LIKE') == 6 - # Should have 6 parameter placeholders - self.assertEqual(sql.count(':param_'), 6) + # Should have 6 parameter placeholders + assert sql.count(':param_') == 6 - # Verify all IP patterns are in parameters - param_values = list(params.values()) - self.assertIn('192.168.50.%', param_values) - self.assertIn('192.168.60.%', param_values) - self.assertIn('192.168.70.2', param_values) - self.assertIn('192.168.70.5', param_values) - self.assertIn('192.168.70.3', param_values) - self.assertIn('192.168.70.4', param_values) + # Verify all IP patterns are in parameters + param_values = list(params.values()) + assert '192.168.50.%' in param_values + assert '192.168.60.%' in param_values + assert '192.168.70.2' in param_values + assert '192.168.70.5' in param_values + assert '192.168.70.3' in param_values + assert '192.168.70.4' in param_values - def test_multiple_and_clauses_simple(self): - """Test multiple AND clauses with simple equality operators.""" - condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'" - sql, params = self.builder.build_safe_condition(condition) +def test_multiple_and_clauses_simple(builder): + """Test multiple AND clauses with simple equality operators.""" + condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'" - # Should have 3 parameters - self.assertEqual(len(params), 3) + sql, params = builder.build_safe_condition(condition) - # Should have 3 AND operators - self.assertEqual(sql.count('AND'), 3) + # Should have 3 parameters + assert len(params) == 3 - # Verify all values are parameterized - param_values = list(params.values()) - self.assertIn('Device1', param_values) - self.assertIn('Apple', param_values) - self.assertIn('1', param_values) + # Should have 3 AND operators + assert sql.count('AND') == 3 - def test_multiple_or_clauses(self): - """Test multiple OR clauses.""" - condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'" + # Verify all values are parameterized + param_values = list(params.values()) + assert 'Device1' in param_values + assert 'Apple' in param_values + assert '1' in param_values - sql, params = self.builder.build_safe_condition(condition) - # Should have 3 parameters - self.assertEqual(len(params), 3) +def test_multiple_or_clauses(builder): + """Test multiple OR clauses.""" + condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'" - # Should have 3 OR operators - self.assertEqual(sql.count('OR'), 3) + sql, params = builder.build_safe_condition(condition) - # Verify all device names are parameterized - param_values = list(params.values()) - self.assertIn('Device1', param_values) - self.assertIn('Device2', param_values) - self.assertIn('Device3', param_values) + # Should have 3 parameters + assert len(params) == 3 - def test_mixed_and_or_clauses(self): - """Test mixed AND/OR logical operators.""" - condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'" + # Should have 3 OR operators + assert sql.count('OR') == 3 - sql, params = self.builder.build_safe_condition(condition) + # Verify all device names are parameterized + param_values = list(params.values()) + assert 'Device1' in param_values + assert 'Device2' in param_values + assert 'Device3' in param_values - # Should have 3 parameters - self.assertEqual(len(params), 3) +def test_mixed_and_or_clauses(builder): + """Test mixed AND/OR logical operators.""" + condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'" - # Should preserve the logical operator order - self.assertIn('AND', sql) - self.assertIn('OR', sql) + sql, params = builder.build_safe_condition(condition) - # Verify all values are parameterized - param_values = list(params.values()) - self.assertIn('Device1', param_values) - self.assertIn('Device2', param_values) - self.assertIn('1', param_values) + # Should have 3 parameters + assert len(params) == 3 - def test_single_condition_backward_compatibility(self): - """Test that single conditions still work (backward compatibility).""" - condition = "AND devName = 'TestDevice'" + # Should preserve the logical operator order + assert 'AND' in sql + assert 'OR' in sql - sql, params = self.builder.build_safe_condition(condition) + # Verify all values are parameterized + param_values = list(params.values()) + assert 'Device1' in param_values + assert 'Device2' in param_values + assert '1' in param_values - # Should have 1 parameter - self.assertEqual(len(params), 1) - # Should match expected format - self.assertIn('AND devName = :param_', sql) +def test_single_condition_backward_compatibility(builder): + """Test that single conditions still work (backward compatibility).""" + condition = "AND devName = 'TestDevice'" - # Parameter should contain the value - self.assertIn('TestDevice', params.values()) + sql, params = builder.build_safe_condition(condition) - def test_single_condition_like_operator(self): - """Test single LIKE condition for backward compatibility.""" - condition = "AND devComments LIKE '%important%'" + # Should have 1 parameter + assert len(params) == 1 - sql, params = self.builder.build_safe_condition(condition) + # Should match expected format + assert 'AND devName = :param_' in sql - # Should have 1 parameter - self.assertEqual(len(params), 1) + # Parameter should contain the value + assert 'TestDevice' in params.values() - # Should contain LIKE operator - self.assertIn('LIKE', sql) - # Parameter should contain the pattern - self.assertIn('%important%', params.values()) +def test_single_condition_like_operator(builder): + """Test single LIKE condition for backward compatibility.""" + condition = "AND devComments LIKE '%important%'" - def test_compound_with_like_patterns(self): - """Test compound conditions with LIKE patterns.""" - condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'" + sql, params = builder.build_safe_condition(condition) - sql, params = self.builder.build_safe_condition(condition) + # Should have 1 parameter + assert len(params) == 1 - # Should have 2 parameters - self.assertEqual(len(params), 2) + # Should contain LIKE operator + assert 'LIKE' in sql - # Should have 2 LIKE operators - self.assertEqual(sql.count('LIKE'), 2) + # Parameter should contain the pattern + assert '%important%' in params.values() - # Verify patterns are parameterized - param_values = list(params.values()) - self.assertIn('192.168.%', param_values) - self.assertIn('%Apple%', param_values) - def test_compound_with_inequality_operators(self): - """Test compound conditions with various inequality operators.""" - condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'" +def test_compound_with_like_patterns(builder): + """Test compound conditions with LIKE patterns.""" + condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'" - sql, params = self.builder.build_safe_condition(condition) + sql, params = builder.build_safe_condition(condition) - # Should have 2 parameters - self.assertEqual(len(params), 2) + # Should have 2 parameters + assert len(params) == 2 - # Should have both operators - self.assertIn('>', sql) - self.assertIn('<', sql) + # Should have 2 LIKE operators + assert sql.count('LIKE') == 2 - # Verify dates are parameterized - param_values = list(params.values()) - self.assertIn('2024-01-01', param_values) - self.assertIn('2024-12-31', param_values) + # Verify patterns are parameterized + param_values = list(params.values()) + assert '192.168.%' in param_values + assert '%Apple%' in param_values - def test_empty_condition(self): - """Test empty condition string.""" - condition = "" - sql, params = self.builder.build_safe_condition(condition) +def test_compound_with_inequality_operators(builder): + """Test compound conditions with various inequality operators.""" + condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'" - # Should return empty results - self.assertEqual(sql, "") - self.assertEqual(params, {}) + sql, params = builder.build_safe_condition(condition) - def test_whitespace_only_condition(self): - """Test condition with only whitespace.""" - condition = " \t\n " + # Should have 2 parameters + assert len(params) == 2 - sql, params = self.builder.build_safe_condition(condition) + # Should have both operators + assert '>' in sql + assert '<' in sql - # Should return empty results - self.assertEqual(sql, "") - self.assertEqual(params, {}) + # Verify dates are parameterized + param_values = list(params.values()) + assert '2024-01-01' in param_values + assert '2024-12-31' in param_values - def test_invalid_column_name_rejected(self): - """Test that invalid column names are rejected.""" - condition = "AND malicious_column = 'value'" - with self.assertRaises(ValueError): - self.builder.build_safe_condition(condition) +def test_empty_condition(builder): + """Test empty condition string.""" + condition = "" - def test_invalid_operator_rejected(self): - """Test that invalid operators are rejected.""" - condition = "AND devName EXECUTE 'DROP TABLE'" + sql, params = builder.build_safe_condition(condition) - with self.assertRaises(ValueError): - self.builder.build_safe_condition(condition) + # Should return empty results + assert sql == "" + assert params == {} - def test_sql_injection_attempt_blocked(self): - """Test that SQL injection attempts are blocked.""" - condition = "AND devName = 'value'; DROP TABLE devices; --" - # Should either reject or sanitize the dangerous input - # The semicolon and comment should not appear in the final SQL - try: - sql, params = self.builder.build_safe_condition(condition) - # If it doesn't raise an error, it should sanitize the input - self.assertNotIn('DROP', sql.upper()) - self.assertNotIn(';', sql) - except ValueError: - # Rejection is also acceptable - pass +def test_whitespace_only_condition(builder): + """Test condition with only whitespace.""" + condition = " \t\n " - def test_quoted_string_with_spaces(self): - """Test that quoted strings with spaces are handled correctly.""" - condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'" + sql, params = builder.build_safe_condition(condition) - sql, params = self.builder.build_safe_condition(condition) + # Should return empty results + assert sql == "" + assert params == {} - # Should have 2 parameters - self.assertEqual(len(params), 2) - # Verify values with spaces are preserved - param_values = list(params.values()) - self.assertIn('My Device Name', param_values) - self.assertIn('Has spaces here', param_values) +def test_invalid_column_name_rejected(builder): + """Test that invalid column names are rejected.""" + condition = "AND malicious_column = 'value'" - def test_compound_condition_with_not_equal(self): - """Test compound conditions with != operator.""" - condition = "AND devName != 'Device1' AND devVendor != 'Unknown'" + with pytest.raises(ValueError): + builder.build_safe_condition(condition) - sql, params = self.builder.build_safe_condition(condition) - # Should have 2 parameters - self.assertEqual(len(params), 2) +def test_invalid_operator_rejected(builder): + """Test that invalid operators are rejected.""" + condition = "AND devName EXECUTE 'DROP TABLE'" - # Should have != operators (or converted to <>) - self.assertTrue('!=' in sql or '<>' in sql) + with pytest.raises(ValueError): + builder.build_safe_condition(condition) - # Verify values are parameterized - param_values = list(params.values()) - self.assertIn('Device1', param_values) - self.assertIn('Unknown', param_values) - def test_very_long_compound_condition(self): - """Test handling of very long compound conditions (10+ clauses).""" - clauses = [] - for i in range(10): - clauses.append(f"AND devName != 'Device{i}'") +def test_sql_injection_attempt_blocked(builder): + """Test that SQL injection attempts are blocked.""" + condition = "AND devName = 'value'; DROP TABLE devices; --" - condition = " ".join(clauses) - sql, params = self.builder.build_safe_condition(condition) + # Should either reject or sanitize the dangerous input + # The semicolon and comment should not appear in the final SQL + try: + sql, params = builder.build_safe_condition(condition) + # If it doesn't raise an error, it should sanitize the input + assert 'DROP' not in sql.upper() + assert ';' not in sql + except ValueError: + # Rejection is also acceptable + pass - # Should have 10 parameters - self.assertEqual(len(params), 10) - # Should have 10 AND operators - self.assertEqual(sql.count('AND'), 10) +def test_quoted_string_with_spaces(builder): + """Test that quoted strings with spaces are handled correctly.""" + condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'" - # Verify all device names are parameterized - param_values = list(params.values()) - for i in range(10): - self.assertIn(f'Device{i}', param_values) + sql, params = builder.build_safe_condition(condition) + # Should have 2 parameters + assert len(params) == 2 -class TestParameterGeneration(unittest.TestCase): - """Test parameter generation and naming.""" + # Verify values with spaces are preserved + param_values = list(params.values()) + assert 'My Device Name' in param_values + assert 'Has spaces here' in param_values - def setUp(self): - """Create a fresh builder instance for each test.""" - self.builder = SafeConditionBuilder() - def test_parameters_have_unique_names(self): - """Test that all parameters get unique names.""" - condition = "AND devName = 'A' AND devName = 'B' AND devName = 'C'" +def test_compound_condition_with_not_equal(builder): + """Test compound conditions with != operator.""" + condition = "AND devName != 'Device1' AND devVendor != 'Unknown'" - sql, params = self.builder.build_safe_condition(condition) + sql, params = builder.build_safe_condition(condition) - # All parameter names should be unique - param_names = list(params.keys()) - self.assertEqual(len(param_names), len(set(param_names))) + # Should have 2 parameters + assert len(params) == 2 - def test_parameter_values_match_condition(self): - """Test that parameter values correctly match the condition values.""" - condition = "AND devLastIP NOT LIKE '192.168.1.%' AND devLastIP NOT LIKE '10.0.0.%'" + # Should have != operators (or converted to <>) + assert '!=' in sql or '<>' in sql - sql, params = self.builder.build_safe_condition(condition) + # Verify values are parameterized + param_values = list(params.values()) + assert 'Device1' in param_values + assert 'Unknown' in param_values - # Should have exactly the values from the condition - param_values = sorted(params.values()) - expected_values = sorted(['192.168.1.%', '10.0.0.%']) - self.assertEqual(param_values, expected_values) - def test_parameters_referenced_in_sql(self): - """Test that all parameters are actually referenced in the SQL.""" - condition = "AND devName = 'Device1' AND devVendor = 'Apple'" +def test_very_long_compound_condition(builder): + """Test handling of very long compound conditions (10+ clauses).""" + clauses = [] + for i in range(10): + clauses.append(f"AND devName != 'Device{i}'") - sql, params = self.builder.build_safe_condition(condition) + condition = " ".join(clauses) + sql, params = builder.build_safe_condition(condition) - # Every parameter should appear in the SQL - for param_name in params.keys(): - self.assertIn(f':{param_name}', sql) + # Should have 10 parameters + assert len(params) == 10 + # Should have 10 AND operators + assert sql.count('AND') == 10 -if __name__ == '__main__': - unittest.main() + # Verify all device names are parameterized + param_values = list(params.values()) + for i in range(10): + assert f'Device{i}' in param_values + + +def test_parameters_have_unique_names(builder): + """Test that all parameters get unique names.""" + condition = "AND devName = 'A' AND devName = 'B' AND devName = 'C'" + + sql, params = builder.build_safe_condition(condition) + + # All parameter names should be unique + param_names = list(params.keys()) + assert len(param_names) == len(set(param_names)) + + +def test_parameter_values_match_condition(builder): + """Test that parameter values correctly match the condition values.""" + condition = "AND devLastIP NOT LIKE '192.168.1.%' AND devLastIP NOT LIKE '10.0.0.%'" + + sql, params = builder.build_safe_condition(condition) + + # Should have exactly the values from the condition + param_values = sorted(params.values()) + expected_values = sorted(['192.168.1.%', '10.0.0.%']) + assert param_values == expected_values + + +def test_parameters_referenced_in_sql(builder): + """Test that all parameters are actually referenced in the SQL.""" + condition = "AND devName = 'Device1' AND devVendor = 'Apple'" + + sql, params = builder.build_safe_condition(condition) + + # Every parameter should appear in the SQL + for param_name in params.keys(): + assert f':{param_name}' in sql diff --git a/test/test_safe_builder_unit.py b/test/test_safe_builder_unit.py index 356fdee1..a4f416c1 100755 --- a/test/test_safe_builder_unit.py +++ b/test/test_safe_builder_unit.py @@ -4,15 +4,15 @@ This test file has minimal dependencies to ensure it can run in any environment. """ import sys -import unittest import re +import pytest from unittest.mock import Mock, patch # Mock the logger module to avoid dependency issues sys.modules['logger'] = Mock() # Standalone version of SafeConditionBuilder for testing -class TestSafeConditionBuilder: +class SafeConditionBuilder: """ Test version of SafeConditionBuilder with mock logger. """ @@ -152,180 +152,182 @@ class TestSafeConditionBuilder: return "", {} -class TestSafeConditionBuilderSecurity(unittest.TestCase): - """Test cases for the SafeConditionBuilder security functionality.""" - - def setUp(self): - """Set up test fixtures before each test method.""" - self.builder = TestSafeConditionBuilder() - - def test_initialization(self): - """Test that SafeConditionBuilder initializes correctly.""" - self.assertIsInstance(self.builder, TestSafeConditionBuilder) - self.assertEqual(self.builder.param_counter, 0) - self.assertEqual(self.builder.parameters, {}) - - def test_sanitize_string(self): - """Test string sanitization functionality.""" - # Test normal string - result = self.builder._sanitize_string("normal string") - self.assertEqual(result, "normal string") - - # Test s-quote replacement - result = self.builder._sanitize_string("test{s-quote}value") - self.assertEqual(result, "test'value") - - # Test control character removal - result = self.builder._sanitize_string("test\x00\x01string") - self.assertEqual(result, "teststring") - - # Test excessive whitespace - result = self.builder._sanitize_string(" test string ") - self.assertEqual(result, "test string") - - def test_validate_column_name(self): - """Test column name validation against whitelist.""" - # Valid columns - self.assertTrue(self.builder._validate_column_name('eve_MAC')) - self.assertTrue(self.builder._validate_column_name('devName')) - self.assertTrue(self.builder._validate_column_name('eve_EventType')) - - # Invalid columns - self.assertFalse(self.builder._validate_column_name('malicious_column')) - self.assertFalse(self.builder._validate_column_name('drop_table')) - self.assertFalse(self.builder._validate_column_name('user_input')) - - def test_validate_operator(self): - """Test operator validation against whitelist.""" - # Valid operators - self.assertTrue(self.builder._validate_operator('=')) - self.assertTrue(self.builder._validate_operator('LIKE')) - self.assertTrue(self.builder._validate_operator('IN')) - - # Invalid operators - self.assertFalse(self.builder._validate_operator('UNION')) - self.assertFalse(self.builder._validate_operator('DROP')) - self.assertFalse(self.builder._validate_operator('EXEC')) - - def test_build_simple_condition_valid(self): - """Test building valid simple conditions.""" - sql, params = self.builder._build_simple_condition('AND', 'devName', '=', 'TestDevice') - - self.assertIn('AND devName = :param_', sql) - self.assertEqual(len(params), 1) - self.assertIn('TestDevice', params.values()) - - def test_build_simple_condition_invalid_column(self): - """Test that invalid column names are rejected.""" - with self.assertRaises(ValueError) as context: - self.builder._build_simple_condition('AND', 'invalid_column', '=', 'value') - - self.assertIn('Invalid column name', str(context.exception)) - - def test_build_simple_condition_invalid_operator(self): - """Test that invalid operators are rejected.""" - with self.assertRaises(ValueError) as context: - self.builder._build_simple_condition('AND', 'devName', 'UNION', 'value') - - self.assertIn('Invalid operator', str(context.exception)) - - def test_sql_injection_attempts(self): - """Test that various SQL injection attempts are blocked.""" - injection_attempts = [ - "'; DROP TABLE Devices; --", - "' UNION SELECT * FROM Settings --", - "' OR 1=1 --", - "'; INSERT INTO Events VALUES(1,2,3); --", - "' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --", - ] - - for injection in injection_attempts: - with self.subTest(injection=injection): - with self.assertRaises(ValueError): - self.builder.build_safe_condition(f"AND devName = '{injection}'") - - def test_legacy_condition_compatibility(self): - """Test backward compatibility with legacy condition formats.""" - # Test simple condition - sql, params = self.builder.get_safe_condition_legacy("AND devName = 'TestDevice'") - self.assertIn('devName', sql) - self.assertIn('TestDevice', params.values()) - - # Test empty condition - sql, params = self.builder.get_safe_condition_legacy("") - self.assertEqual(sql, "") - self.assertEqual(params, {}) - - # Test invalid condition returns empty - sql, params = self.builder.get_safe_condition_legacy("INVALID SQL INJECTION") - self.assertEqual(sql, "") - self.assertEqual(params, {}) - - def test_parameter_generation(self): - """Test that parameters are generated correctly.""" - # Test multiple parameters - sql1, params1 = self.builder.build_safe_condition("AND devName = 'Device1'") - sql2, params2 = self.builder.build_safe_condition("AND devName = 'Device2'") - - # Each should have unique parameter names - self.assertNotEqual(list(params1.keys())[0], list(params2.keys())[0]) - - def test_xss_prevention(self): - """Test that XSS-like payloads in device names are handled safely.""" - xss_payloads = [ - "", - "javascript:alert(1)", - "", - "'; DROP TABLE users; SELECT '' --" - ] - - for payload in xss_payloads: - with self.subTest(payload=payload): - # Should either process safely or reject - try: - sql, params = self.builder.build_safe_condition(f"AND devName = '{payload}'") - # If processed, should be parameterized - self.assertIn(':', sql) - self.assertIn(payload, params.values()) - except ValueError: - # Rejection is also acceptable for safety - pass - - def test_unicode_handling(self): - """Test that Unicode characters are handled properly.""" - unicode_strings = [ - "Ülrich's Device", - "Café Network", - "测试设备", - "Устройство" - ] - - for unicode_str in unicode_strings: - with self.subTest(unicode_str=unicode_str): - sql, params = self.builder.build_safe_condition(f"AND devName = '{unicode_str}'") - self.assertIn(unicode_str, params.values()) - - def test_edge_cases(self): - """Test edge cases and boundary conditions.""" - edge_cases = [ - "", # Empty string - " ", # Whitespace only - "AND devName = ''", # Empty value - "AND devName = 'a'", # Single character - "AND devName = '" + "x" * 1000 + "'", # Very long string - ] - - for case in edge_cases: - with self.subTest(case=case): - try: - sql, params = self.builder.get_safe_condition_legacy(case) - # Should either return valid result or empty safe result - self.assertIsInstance(sql, str) - self.assertIsInstance(params, dict) - except Exception: - self.fail(f"Unexpected exception for edge case: {case}") +@pytest.fixture +def builder(): + """Fixture to provide a fresh SafeConditionBuilder instance for each test.""" + return SafeConditionBuilder() -if __name__ == '__main__': - # Run the test suite - unittest.main(verbosity=2) \ No newline at end of file +def test_initialization(builder): + """Test that SafeConditionBuilder initializes correctly.""" + assert isinstance(builder, SafeConditionBuilder) + assert builder.param_counter == 0 + assert builder.parameters == {} + + +def test_sanitize_string(builder): + """Test string sanitization functionality.""" + # Test normal string + result = builder._sanitize_string("normal string") + assert result == "normal string" + + # Test s-quote replacement + result = builder._sanitize_string("test{s-quote}value") + assert result == "test'value" + + # Test control character removal + result = builder._sanitize_string("test\x00\x01string") + assert result == "teststring" + + # Test excessive whitespace + result = builder._sanitize_string(" test string ") + assert result == "test string" + + +def test_validate_column_name(builder): + """Test column name validation against whitelist.""" + # Valid columns + assert builder._validate_column_name('eve_MAC') + assert builder._validate_column_name('devName') + assert builder._validate_column_name('eve_EventType') + + # Invalid columns + assert not builder._validate_column_name('malicious_column') + assert not builder._validate_column_name('drop_table') + assert not builder._validate_column_name('user_input') + + +def test_validate_operator(builder): + """Test operator validation against whitelist.""" + # Valid operators + assert builder._validate_operator('=') + assert builder._validate_operator('LIKE') + assert builder._validate_operator('IN') + + # Invalid operators + assert not builder._validate_operator('UNION') + assert not builder._validate_operator('DROP') + assert not builder._validate_operator('EXEC') + + +def test_build_simple_condition_valid(builder): + """Test building valid simple conditions.""" + sql, params = builder._build_simple_condition('AND', 'devName', '=', 'TestDevice') + + assert 'AND devName = :param_' in sql + assert len(params) == 1 + assert 'TestDevice' in params.values() + + +def test_build_simple_condition_invalid_column(builder): + """Test that invalid column names are rejected.""" + with pytest.raises(ValueError) as exc_info: + builder._build_simple_condition('AND', 'invalid_column', '=', 'value') + + assert 'Invalid column name' in str(exc_info.value) + + +def test_build_simple_condition_invalid_operator(builder): + """Test that invalid operators are rejected.""" + with pytest.raises(ValueError) as exc_info: + builder._build_simple_condition('AND', 'devName', 'UNION', 'value') + + assert 'Invalid operator' in str(exc_info.value) + + +def test_sql_injection_attempts(builder): + """Test that various SQL injection attempts are blocked.""" + injection_attempts = [ + "'; DROP TABLE Devices; --", + "' UNION SELECT * FROM Settings --", + "' OR 1=1 --", + "'; INSERT INTO Events VALUES(1,2,3); --", + "' AND (SELECT COUNT(*) FROM sqlite_master) > 0 --", + ] + + for injection in injection_attempts: + with pytest.raises(ValueError): + builder.build_safe_condition(f"AND devName = '{injection}'") + + +def test_legacy_condition_compatibility(builder): + """Test backward compatibility with legacy condition formats.""" + # Test simple condition + sql, params = builder.get_safe_condition_legacy("AND devName = 'TestDevice'") + assert 'devName' in sql + assert 'TestDevice' in params.values() + + # Test empty condition + sql, params = builder.get_safe_condition_legacy("") + assert sql == "" + assert params == {} + + # Test invalid condition returns empty + sql, params = builder.get_safe_condition_legacy("INVALID SQL INJECTION") + assert sql == "" + assert params == {} + + +def test_parameter_generation(builder): + """Test that parameters are generated correctly.""" + # Test single parameter + sql, params = builder.build_safe_condition("AND devName = 'Device1'") + + # Should have 1 parameter + assert len(params) == 1 + assert 'param_1' in params + + +def test_xss_prevention(builder): + """Test that XSS-like payloads in device names are handled safely.""" + xss_payloads = [ + "", + "javascript:alert(1)", + "", + "'; DROP TABLE users; SELECT '' --" + ] + + for payload in xss_payloads: + # Should either process safely or reject + try: + sql, params = builder.build_safe_condition(f"AND devName = '{payload}'") + # If processed, should be parameterized + assert ':' in sql + assert payload in params.values() + except ValueError: + # Rejection is also acceptable for safety + pass + + +def test_unicode_handling(builder): + """Test that Unicode characters are handled properly.""" + unicode_strings = [ + "Ülrichs Device", + "Café Network", + "测试设备", + "Устройство" + ] + + for unicode_str in unicode_strings: + sql, params = builder.build_safe_condition(f"AND devName = '{unicode_str}'") + assert unicode_str in params.values() + + +def test_edge_cases(builder): + """Test edge cases and boundary conditions.""" + edge_cases = [ + "", # Empty string + " ", # Whitespace only + "AND devName = ''", # Empty value + "AND devName = 'a'", # Single character + "AND devName = '" + "x" * 1000 + "'", # Very long string + ] + + for case in edge_cases: + try: + sql, params = builder.get_safe_condition_legacy(case) + # Should either return valid result or empty safe result + assert isinstance(sql, str) + assert isinstance(params, dict) + except Exception: + pytest.fail(f"Unexpected exception for edge case: {case}") \ No newline at end of file