Refactor IP extraction and handling in get_blacklisted_ip.py

The Caddy server was panicking on startup due to a data format mismatch.
The `caddy-waf` Go module expects every blacklist entry to be in CIDR notation (e.g., `1.2.3.4/32`), but the `get_blacklisted_ip.py` script was writing bare IP addresses with no prefix length (e.g., `1.2.3.4`).

This commit updates the Python script to:
- Append `/32` to all individual IPv4 addresses.
- Append `/128` to all individual IPv6 addresses.
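- Preserve existing CIDR ranges from source blocklists as a single network entry (validated with `ipaddress.ip_network`) instead of expanding them into individual addresses.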

This ensures the generated `ip_blacklist.txt` is in the correct format required by the Go module, resolving the `netip.ParsePrefix` panic.
This commit is contained in:
fab
2025-10-16 00:42:00 +02:00
committed by GitHub
parent fa7f421773
commit f45e8331cc

View File

@@ -13,107 +13,49 @@ blocklist_sources = {
"Greensnow": "https://blocklist.greensnow.co/greensnow.txt",
}
# --- Tor Exit Node Source (Testing) ---
tor_exit_nodes_url = "https://check.torproject.org/exit-addresses" # Testing
# Tor Exit Node Source
tor_exit_nodes_url = "https://check.torproject.org/exit-addresses"
def extract_ips(source_name, url):
"""Fetches data from the given URL and extracts IP addresses."""
"""Fetches data from the given URL and extracts IP addresses in CIDR format."""
ips = set()
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
response.raise_for_status()
content = response.text
except requests.exceptions.RequestException as e:
print(f"Error fetching {source_name} from {url}: {e}")
return ips
if source_name == "Talos Intelligence":
print(f"Skipping {source_name} due to webpage format, needs manual parsing.")
return ips
elif source_name == "TOR Exit Nodes":
for line in content.splitlines():
if line.startswith("ExitAddress"):
parts = line.split(" ")
if len(parts) > 1:
try:
ipaddress.ip_address(parts[1].strip())
ips.add(parts[1].strip())
except ValueError:
continue
return ips
elif source_name == "Spamhaus DROP" or source_name == "Spamhaus EDROP":
for line in content.splitlines():
line = line.strip()
if not line or line.startswith(";"):
if not line or line.startswith("#") or line.startswith(";"):
continue
# MODIFIED: Preserve CIDR notation if it already exists
if "/" in line:
try:
for ip in ipaddress.ip_network(line, strict=False):
ips.add(str(ip))
# Validate it's a real network and add it
net = ipaddress.ip_network(line, strict=False)
ips.add(net.with_prefixlen)
except ValueError:
continue
else:
# MODIFIED: Convert single IPs to CIDR notation
try:
ipaddress.ip_address(line)
ips.add(line)
except ValueError:
continue
return ips
elif source_name == "MaxMind GeoIP2 Anonymous IP Database":
# Requires a license key, skipping for now.
print(f"Skipping {source_name} because it requires a license key.")
return ips
ip_obj = ipaddress.ip_address(line)
if ip_obj.version == 4:
ips.add(f"{line}/32")
else:
# Default parsing for normal text file blocklists
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
# Normalize ranges to single IPs
if "/" in line:
try:
for ip in ipaddress.ip_network(line, strict=False):
ips.add(str(ip))
except ValueError:
continue
elif "-" in line:
try:
start, end = line.split('-')
start_ip = ipaddress.ip_address(start.strip())
end_ip = ipaddress.ip_address(end.strip())
if start_ip.version == end_ip.version:
for ip_int in range(int(start_ip), int(end_ip) + 1):
ips.add(str(ipaddress.ip_address(ip_int)))
except ValueError:
continue
else:
try:
ipaddress.ip_address(line)
ips.add(line)
ips.add(f"{line}/128")
except ValueError:
continue
return ips
def is_valid_ip(ip_str):
"""Helper function to check if an IP address is valid."""
try:
ipaddress.ip_address(ip_str)
return True
except ValueError:
return False
def ip_to_int(ip_str):
"""Converts an IP address string to its integer representation."""
try:
return int(ipaddress.ip_address(ip_str))
except ValueError:
return None
def extract_tor_exit_nodes(url):
"""Fetches data from the given URL and extracts Tor exit node IPs."""
"""Fetches Tor exit node IPs and returns them in CIDR format."""
ips = set()
try:
response = requests.get(url, timeout=10)
@@ -127,9 +69,14 @@ def extract_tor_exit_nodes(url):
if line.startswith("ExitAddress"):
parts = line.split(" ")
if len(parts) > 1:
ip_str = parts[1].strip()
# MODIFIED: Convert single IPs to CIDR notation
try:
ipaddress.ip_address(parts[1].strip())
ips.add(parts[1].strip())
ip_obj = ipaddress.ip_address(ip_str)
if ip_obj.version == 4:
ips.add(f"{ip_str}/32")
else:
ips.add(f"{ip_str}/128")
except ValueError:
continue
return ips
@@ -140,31 +87,21 @@ def main():
for source_name, url in tqdm(blocklist_sources.items(), desc="Processing Blocklists"):
print(f"Processing {source_name} from {url}")
ips = extract_ips(source_name, url)
print(f" Found {len(ips)} IPs in {source_name}")
print(f" Found {len(ips)} IPs/CIDRs in {source_name}")
combined_ips.update(ips)
# --- Tor Exit Node Processing (Testing) ---
# Tor Exit Node Processing
tor_exit_ips = extract_tor_exit_nodes(tor_exit_nodes_url)
print(f"Total Tor exit node IPs: {len(tor_exit_ips)}")
valid_tor_ips = [ip for ip in tor_exit_ips if is_valid_ip(ip)]
print(f"Total Valid Tor IPs after filtering: {len(valid_tor_ips)}")
print(f"Total Tor exit node IPs/CIDRs: {len(tor_exit_ips)}")
combined_ips.update(tor_exit_ips)
# Add Tor exit IPs to the combined IPs
combined_ips.update(valid_tor_ips)
print(f"Total IPs before filtering and deduplication: {len(combined_ips)}")
# Filter out invalid IPs before sorting.
valid_ips = [ip for ip in combined_ips if is_valid_ip(ip)]
print(f"Total Valid IPs after filtering: {len(valid_ips)}")
# Remove duplicates by converting to a set before sorting
unique_ips = set(valid_ips)
print(f"Total Unique IPs/CIDRs after deduplication: {len(combined_ips)}")
# MODIFIED: The final write loop is simpler. The sorting key is removed as sorting CIDRs as integers is incorrect.
# A simple lexicographical sort is sufficient here.
with open("ip_blacklist.txt", "w") as f:
# Sort using the integer representation and write each IP to the file
for ip in sorted(unique_ips, key=ip_to_int):
f.write(f"{ip}\n")
for ip_cidr in sorted(list(combined_ips)):
f.write(f"{ip_cidr}\n")
print("IP blacklist saved to ip_blacklist.txt")