diff --git a/get_blacklisted_ip.py b/get_blacklisted_ip.py index 8f06a1f..0ce9948 100644 --- a/get_blacklisted_ip.py +++ b/get_blacklisted_ip.py @@ -13,107 +13,49 @@ blocklist_sources = { "Greensnow": "https://blocklist.greensnow.co/greensnow.txt", } -# --- Tor Exit Node Source (Testing) --- -tor_exit_nodes_url = "https://check.torproject.org/exit-addresses" # Testing +# Tor Exit Node Source +tor_exit_nodes_url = "https://check.torproject.org/exit-addresses" def extract_ips(source_name, url): - """Fetches data from the given URL and extracts IP addresses.""" + """Fetches data from the given URL and extracts IP addresses in CIDR format.""" ips = set() try: response = requests.get(url, timeout=10) - response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + response.raise_for_status() content = response.text except requests.exceptions.RequestException as e: print(f"Error fetching {source_name} from {url}: {e}") return ips - if source_name == "Talos Intelligence": - print(f"Skipping {source_name} due to webpage format, needs manual parsing.") - return ips - elif source_name == "TOR Exit Nodes": - for line in content.splitlines(): - if line.startswith("ExitAddress"): - parts = line.split(" ") - if len(parts) > 1: - try: - ipaddress.ip_address(parts[1].strip()) - ips.add(parts[1].strip()) - except ValueError: - continue - return ips - elif source_name == "Spamhaus DROP" or source_name == "Spamhaus EDROP": - for line in content.splitlines(): - line = line.strip() - if not line or line.startswith(";"): + for line in content.splitlines(): + line = line.strip() + if not line or line.startswith("#") or line.startswith(";"): + continue + + # MODIFIED: Preserve CIDR notation if it already exists + if "/" in line: + try: + # Validate it's a real network and add it + net = ipaddress.ip_network(line, strict=False) + ips.add(net.with_prefixlen) + except ValueError: continue - if "/" in line: - try: - for ip in ipaddress.ip_network(line, strict=False): - ips.add(str(ip)) - except ValueError: - continue - else: - try: - ipaddress.ip_address(line) - ips.add(line) - except ValueError: - continue - return ips - elif source_name == "MaxMind GeoIP2 Anonymous IP Database": - # Requires a license key, skipping for now. - print(f"Skipping {source_name} because it requires a license key.") - return ips - else: - # Default parsing for normal text file blocklists - for line in content.splitlines(): - line = line.strip() - if not line or line.startswith("#"): + else: + # MODIFIED: Convert single IPs to CIDR notation + try: + ip_obj = ipaddress.ip_address(line) + if ip_obj.version == 4: + ips.add(f"{line}/32") + else: + ips.add(f"{line}/128") + except ValueError: continue - # Normalize ranges to single IPs - if "/" in line: - try: - for ip in ipaddress.ip_network(line, strict=False): - ips.add(str(ip)) - except ValueError: - continue - elif "-" in line: - try: - start, end = line.split('-') - start_ip = ipaddress.ip_address(start.strip()) - end_ip = ipaddress.ip_address(end.strip()) - if start_ip.version == end_ip.version: - for ip_int in range(int(start_ip), int(end_ip) + 1): - ips.add(str(ipaddress.ip_address(ip_int))) - except ValueError: - continue - else: - try: - ipaddress.ip_address(line) - ips.add(line) - except ValueError: - continue - return ips + return ips -def is_valid_ip(ip_str): - """Helper function to check if an IP address is valid.""" - try: - ipaddress.ip_address(ip_str) - return True - except ValueError: - return False - - -def ip_to_int(ip_str): - """Converts an IP address string to its integer representation.""" - try: - return int(ipaddress.ip_address(ip_str)) - except ValueError: - return None - def extract_tor_exit_nodes(url): - """Fetches data from the given URL and extracts Tor exit node IPs.""" + """Fetches Tor exit node IPs and returns them in CIDR format.""" ips = set() try: response = requests.get(url, timeout=10) @@ -127,9 +69,14 @@ def extract_tor_exit_nodes(url): if line.startswith("ExitAddress"): parts = line.split(" ") if len(parts) > 1: + ip_str = parts[1].strip() + # MODIFIED: Convert single IPs to CIDR notation try: - ipaddress.ip_address(parts[1].strip()) - ips.add(parts[1].strip()) + ip_obj = ipaddress.ip_address(ip_str) + if ip_obj.version == 4: + ips.add(f"{ip_str}/32") + else: + ips.add(f"{ip_str}/128") except ValueError: continue return ips @@ -140,31 +87,21 @@ def main(): for source_name, url in tqdm(blocklist_sources.items(), desc="Processing Blocklists"): print(f"Processing {source_name} from {url}") ips = extract_ips(source_name, url) - print(f" Found {len(ips)} IPs in {source_name}") + print(f" Found {len(ips)} IPs/CIDRs in {source_name}") combined_ips.update(ips) - # --- Tor Exit Node Processing (Testing) --- + # Tor Exit Node Processing tor_exit_ips = extract_tor_exit_nodes(tor_exit_nodes_url) - print(f"Total Tor exit node IPs: {len(tor_exit_ips)}") - valid_tor_ips = [ip for ip in tor_exit_ips if is_valid_ip(ip)] - print(f"Total Valid Tor IPs after filtering: {len(valid_tor_ips)}") + print(f"Total Tor exit node IPs/CIDRs: {len(tor_exit_ips)}") + combined_ips.update(tor_exit_ips) - # Add Tor exit IPs to the combined IPs - combined_ips.update(valid_tor_ips) - - print(f"Total IPs before filtering and deduplication: {len(combined_ips)}") - - # Filter out invalid IPs before sorting. - valid_ips = [ip for ip in combined_ips if is_valid_ip(ip)] - print(f"Total Valid IPs after filtering: {len(valid_ips)}") - - # Remove duplicates by converting to a set before sorting - unique_ips = set(valid_ips) + print(f"Total Unique IPs/CIDRs after deduplication: {len(combined_ips)}") + # MODIFIED: The final write loop is simpler. The sorting key is removed as sorting CIDRs as integers is incorrect. + # A simple lexicographical sort is sufficient here. with open("ip_blacklist.txt", "w") as f: - # Sort using the integer representation and write each IP to the file - for ip in sorted(unique_ips, key=ip_to_int): - f.write(f"{ip}\n") + for ip_cidr in sorted(list(combined_ips)): + f.write(f"{ip_cidr}\n") print("IP blacklist saved to ip_blacklist.txt")