mirror of
https://github.com/fabriziosalmi/caddy-mib.git
synced 2025-12-23 22:17:43 -05:00
Update test.py
This commit is contained in:
45
test.py
45
test.py
@@ -22,6 +22,11 @@ LOGIN_BAN_DURATION = 15 # Matches ban_duration for /login (15 seconds)
|
||||
API_MAX_ERRORS = 8 # Matches max_error_count for /api
|
||||
API_BAN_DURATION = 20 # Matches ban_duration for /api (20 seconds)
|
||||
|
||||
def log(message):
|
||||
"""Log a message with a timestamp."""
|
||||
timestamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")[:-3]
|
||||
print(f"{timestamp} {message}")
|
||||
|
||||
def send_request(url):
|
||||
"""Send a request using curl and return the HTTP status code and response body."""
|
||||
try:
|
||||
@@ -31,29 +36,38 @@ def send_request(url):
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
# Extract HTTP status code
|
||||
status_code = int(result.stderr.split("HTTP/1.1 ")[1].split()[0])
|
||||
# Extract HTTP status code using regex to handle different HTTP versions
|
||||
import re
|
||||
match = re.search(r"HTTP/\d+\.\d+ (\d+)", result.stderr)
|
||||
status_code = int(match.group(1)) if match else 0
|
||||
match = re.search(r"HTTP/\d\.\d (\d+)", result.stderr)
|
||||
status_code = int(match.group(1)) if match else 0
|
||||
# Extract response body
|
||||
response_body = result.stdout
|
||||
return status_code, response_body
|
||||
except subprocess.CalledProcessError as e:
|
||||
# Handle cases where curl fails (e.g., banned IP)
|
||||
match = re.search(r"HTTP/\d+\.\d+ (\d+)", e.stderr)
|
||||
status_code = int(match.group(1)) if match else 0
|
||||
if "HTTP/1.1 " in e.stderr:
|
||||
status_code = int(e.stderr.split("HTTP/1.1 ")[1].split()[0])
|
||||
response_body = e.stdout
|
||||
return status_code, response_body
|
||||
return status_code, f"Server unreachable or other error: {e.stderr.strip()}"
|
||||
else:
|
||||
log(f"Error: {e.stderr}")
|
||||
return 0, "Server unreachable or other error"
|
||||
except re.error as e:
|
||||
log(f"Regex error: {type(e).__name__} - {e}")
|
||||
return 0, "Regex error"
|
||||
except ValueError as e:
|
||||
log(f"Value error: {type(e).__name__} - {e}")
|
||||
return 0, "Value error"
|
||||
except TypeError as e:
|
||||
log(f"Type error: {type(e).__name__} - {e}")
|
||||
return 0, "Type error"
|
||||
except Exception as e:
|
||||
log(f"Unexpected error: {e}")
|
||||
log(f"Unexpected error: {type(e).__name__} - {e}")
|
||||
return 0, "Unexpected error"
|
||||
|
||||
def log(message):
|
||||
"""Log a message with a timestamp."""
|
||||
timestamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")[:-3]
|
||||
print(f"{timestamp} {message}")
|
||||
|
||||
def test_global_ban():
|
||||
"""Test global error tracking and banning."""
|
||||
log("Starting global ban test...")
|
||||
@@ -67,11 +81,10 @@ def test_global_ban():
|
||||
if status_code == 429: # Custom status code for banned IPs
|
||||
log("IP has been banned globally.")
|
||||
log(f"Ban Response: {response_body.strip()}")
|
||||
break
|
||||
return True # Indicate success for this test
|
||||
elif status_code == 0:
|
||||
log("Failed to send request. Aborting test.")
|
||||
success = False
|
||||
return success
|
||||
return False
|
||||
|
||||
# Wait for the ban to expire
|
||||
log(f"Waiting for global ban to expire ({GLOBAL_BAN_DURATION} seconds)...")
|
||||
@@ -103,8 +116,7 @@ def test_login_ban():
|
||||
break
|
||||
elif status_code == 0:
|
||||
log("Failed to send request. Aborting test.")
|
||||
success = False
|
||||
return success
|
||||
return False
|
||||
|
||||
# Wait for the ban to expire
|
||||
log(f"Waiting for /login ban to expire ({LOGIN_BAN_DURATION} seconds)...")
|
||||
@@ -136,8 +148,7 @@ def test_api_ban():
|
||||
break
|
||||
elif status_code == 0:
|
||||
log("Failed to send request. Aborting test.")
|
||||
success = False
|
||||
return success
|
||||
return False
|
||||
|
||||
# Wait for the ban to expire
|
||||
log(f"Waiting for /api ban to expire ({API_BAN_DURATION} seconds)...")
|
||||
|
||||
Reference in New Issue
Block a user