Files
NetAlertX/scripts/generate-device-inventory.py
2025-12-08 22:32:16 +00:00

364 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Generate a synthetic NetAlertX device CSV using the same column order as the shipped
sample inventory. This is intended for test data and keeps a simple parent/child
topology: one router, a few switches, a few APs, then leaf nodes. MACs, IPs, names,
and timestamps are random but reproducible with --seed.
"""
import argparse
import csv
import datetime as dt
import random
import sys
import uuid
from pathlib import Path
import ipaddress
# Default header copied from the sample inventory CSV to preserve column order.
DEFAULT_HEADER = [
"devMac",
"devName",
"devOwner",
"devType",
"devVendor",
"devFavorite",
"devGroup",
"devComments",
"devFirstConnection",
"devLastConnection",
"devLastIP",
"devStaticIP",
"devScan",
"devLogEvents",
"devAlertEvents",
"devAlertDown",
"devSkipRepeated",
"devLastNotification",
"devPresentLastScan",
"devIsNew",
"devLocation",
"devIsArchived",
"devParentPort",
"devParentMAC",
"devIcon",
"devGUID",
"devSyncHubNode",
"devSite",
"devSSID",
"devSourcePlugin",
"devCustomProps",
"devFQDN",
"devParentRelType",
"devReqNicsOnline",
]
ICON_DEFAULT = "PGkgY2xhc3M9J2ZhIGZhLWFuY2hvci1ub2RlJz48L2k+" # simple placeholder icon
VENDORS = [
"Raspberry Pi Trading Ltd",
"Dell Inc.",
"Intel Corporate",
"Espressif Inc.",
"Micro-Star INTL CO., LTD.",
"Google, Inc.",
"Hewlett Packard",
"ASUSTek COMPUTER INC.",
"TP-LINK TECHNOLOGIES CO.,LTD.",
]
LOCATIONS = [
"Com Closet",
"Office",
"Garage",
"Living Room",
"Master Bedroom",
"Kitchen",
"Attic",
"Outside",
]
DEVICE_TYPES = [
"Server",
"Laptop",
"NAS",
"Phone",
"TV Decoder",
"Printer",
"IoT",
"Camera",
]
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate a synthetic device CSV for NetAlertX")
parser.add_argument("--output", "-o", type=Path, default=Path("generated-devices.csv"), help="Output CSV path")
parser.add_argument("--seed", type=int, default=None, help="Seed for reproducible output")
parser.add_argument("--devices", type=int, default=40, help="Number of leaf nodes to generate")
parser.add_argument("--switches", type=int, default=2, help="Number of switches under the router")
parser.add_argument("--aps", type=int, default=3, help="Number of APs under switches")
parser.add_argument("--site", default="default", help="Site name")
parser.add_argument("--ssid", default="lab", help="SSID placeholder")
parser.add_argument("--owner", default="Test Lab", help="Owner name for devices")
parser.add_argument(
"--network",
default="192.168.50.0/22",
help="IPv4 network to draw addresses from (must have enough hosts for requested devices)",
)
parser.add_argument(
"--template",
type=Path,
help="Optional CSV to pull header from; defaults to the sample inventory layout",
)
return parser.parse_args(argv)
def load_header(template_path: Path | None) -> list[str]:
if not template_path:
return DEFAULT_HEADER
try:
with template_path.open(newline="", encoding="utf-8") as handle:
reader = csv.reader(handle)
header = next(reader)
return header if header else DEFAULT_HEADER
except FileNotFoundError:
return DEFAULT_HEADER
def random_mac(existing: set[str]) -> str:
while True:
mac = ":".join(f"{random.randint(0, 255):02x}" for _ in range(6))
if mac not in existing:
existing.add(mac)
return mac
def prepare_ip_pool(network_cidr: str) -> list[str]:
network = ipaddress.ip_network(network_cidr, strict=False)
hosts = list(network.hosts())
if not hosts:
raise ValueError(f"Network {network} has no usable hosts")
return [str(host) for host in hosts]
def random_time(now: dt.datetime) -> str:
delta_days = random.randint(0, 180)
delta_seconds = random.randint(0, 86400)
ts = now - dt.timedelta(days=delta_days, seconds=delta_seconds)
return ts.strftime("%Y-%m-%d %H:%M:%S")
def build_row(
name: str,
dev_type: str,
vendor: str,
mac: str,
parent_mac: str,
ip: str,
header: list[str],
owner: str,
site: str,
ssid: str,
now: dt.datetime,
) -> dict[str, str]:
comments = "Synthetic device generated for testing."
t1 = random_time(now)
t2 = random_time(now)
first_seen, last_seen = (t1, t2) if t1 <= t2 else (t2, t1)
fqdn = f"{name.lower().replace(' ', '-')}.{site}" if name else ""
# Minimal fields set; missing ones default to empty string for CSV compatibility.
base = {
"devMac": mac,
"devName": name,
"devOwner": owner,
"devType": dev_type,
"devVendor": vendor,
"devFavorite": "0",
"devGroup": "Always on" if dev_type in {"Router", "Switch", "AP", "Firewall"} else "",
"devComments": comments,
"devFirstConnection": first_seen,
"devLastConnection": last_seen,
"devLastIP": ip,
"devStaticIP": "1",
"devScan": "1",
"devLogEvents": "1",
"devAlertEvents": "1",
"devAlertDown": "0",
"devSkipRepeated": "0",
"devLastNotification": "",
"devPresentLastScan": "0",
"devIsNew": "0",
"devLocation": random.choice(LOCATIONS),
"devIsArchived": "0",
"devParentPort": "0",
"devParentMAC": parent_mac,
"devIcon": ICON_DEFAULT,
"devGUID": str(uuid.uuid4()),
"devSyncHubNode": "",
"devSite": site,
"devSSID": ssid,
"devSourcePlugin": "GENERATOR",
"devCustomProps": "",
"devFQDN": fqdn,
"devParentRelType": "None",
"devReqNicsOnline": "0",
}
# Ensure all header columns exist; extra columns are ignored by writer.
return {key: base.get(key, "") for key in header}
def generate_rows(args: argparse.Namespace, header: list[str]) -> list[dict[str, str]]:
now = dt.datetime.utcnow()
macs: set[str] = set()
ip_pool = prepare_ip_pool(args.network)
rows: list[dict[str, str]] = []
# Include one Internet root device that anchors the tree; it does not consume an IP.
required_devices = 1 + args.switches + args.aps + args.devices
if required_devices > len(ip_pool):
raise ValueError(
f"Not enough IPs in {args.network}: need {required_devices}, available {len(ip_pool)}. "
"Use --network with a larger range (e.g., 192.168.50.0/21)."
)
def take_ip() -> str:
choice = random.choice(ip_pool)
ip_pool.remove(choice)
return choice
# Root "Internet" device (no parent, no IP) so the topology has a defined root.
root_row = build_row(
name="Internet",
dev_type="Gateway",
vendor="NetAlertX",
mac="Internet",
parent_mac="",
ip="",
header=header,
owner=args.owner,
site=args.site,
ssid=args.ssid,
now=now,
)
root_row["devComments"] = "Synthetic root device representing the Internet."
root_row["devParentRelType"] = "Root"
root_row["devStaticIP"] = "0"
root_row["devScan"] = "0"
root_row["devAlertEvents"] = "0"
root_row["devAlertDown"] = "0"
root_row["devLogEvents"] = "0"
root_row["devPresentLastScan"] = "0"
rows.append(root_row)
router_mac = random_mac(macs)
router_ip = take_ip()
rows.append(
build_row(
name="Router-1",
dev_type="Firewall",
vendor=random.choice(VENDORS),
mac=router_mac,
parent_mac="Internet",
ip=router_ip,
header=header,
owner=args.owner,
site=args.site,
ssid=args.ssid,
now=now,
)
)
switch_macs: list[str] = []
for idx in range(1, args.switches + 1):
mac = random_mac(macs)
ip = take_ip()
switch_macs.append(mac)
rows.append(
build_row(
name=f"Switch-{idx}",
dev_type="Switch",
vendor=random.choice(VENDORS),
mac=mac,
parent_mac=router_mac,
ip=ip,
header=header,
owner=args.owner,
site=args.site,
ssid=args.ssid,
now=now,
)
)
ap_macs: list[str] = []
for idx in range(1, args.aps + 1):
mac = random_mac(macs)
ip = take_ip()
parent_mac = random.choice(switch_macs) if switch_macs else router_mac
ap_macs.append(mac)
rows.append(
build_row(
name=f"AP-{idx}",
dev_type="AP",
vendor=random.choice(VENDORS),
mac=mac,
parent_mac=parent_mac,
ip=ip,
header=header,
owner=args.owner,
site=args.site,
ssid=args.ssid,
now=now,
)
)
for idx in range(1, args.devices + 1):
mac = random_mac(macs)
ip = take_ip()
parent_pool = ap_macs or switch_macs or [router_mac]
parent_mac = random.choice(parent_pool)
dev_type = random.choice(DEVICE_TYPES)
name_prefix = "Node" if dev_type == "Server" else "Node"
name = f"{name_prefix}-{idx:02d}"
rows.append(
build_row(
name=name,
dev_type=dev_type,
vendor=random.choice(VENDORS),
mac=mac,
parent_mac=parent_mac,
ip=ip,
header=header,
owner=args.owner,
site=args.site,
ssid=args.ssid,
now=now,
)
)
return rows
def main(argv: list[str]) -> int:
args = parse_args(argv)
if args.seed is not None:
random.seed(args.seed)
header = load_header(args.template)
rows = generate_rows(args, header)
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=header, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
writer.writerows(rows)
print(f"Wrote {len(rows)} devices to {args.output}")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))