mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-01-17 19:28:27 -05:00
364 lines
10 KiB
Python
364 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Generate a synthetic NetAlertX device CSV using the same column order as the shipped
|
|
sample inventory. This is intended for test data and keeps a simple parent/child
|
|
topology: one router, a few switches, a few APs, then leaf nodes. MACs, IPs, names,
|
|
and timestamps are random but reproducible with --seed.
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import datetime as dt
|
|
import random
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
import ipaddress
|
|
|
|
# Default header copied from the sample inventory CSV to preserve column order.
|
|
DEFAULT_HEADER = [
|
|
"devMac",
|
|
"devName",
|
|
"devOwner",
|
|
"devType",
|
|
"devVendor",
|
|
"devFavorite",
|
|
"devGroup",
|
|
"devComments",
|
|
"devFirstConnection",
|
|
"devLastConnection",
|
|
"devLastIP",
|
|
"devStaticIP",
|
|
"devScan",
|
|
"devLogEvents",
|
|
"devAlertEvents",
|
|
"devAlertDown",
|
|
"devSkipRepeated",
|
|
"devLastNotification",
|
|
"devPresentLastScan",
|
|
"devIsNew",
|
|
"devLocation",
|
|
"devIsArchived",
|
|
"devParentPort",
|
|
"devParentMAC",
|
|
"devIcon",
|
|
"devGUID",
|
|
"devSyncHubNode",
|
|
"devSite",
|
|
"devSSID",
|
|
"devSourcePlugin",
|
|
"devCustomProps",
|
|
"devFQDN",
|
|
"devParentRelType",
|
|
"devReqNicsOnline",
|
|
]
|
|
|
|
ICON_DEFAULT = "PGkgY2xhc3M9J2ZhIGZhLWFuY2hvci1ub2RlJz48L2k+" # simple placeholder icon
|
|
|
|
VENDORS = [
|
|
"Raspberry Pi Trading Ltd",
|
|
"Dell Inc.",
|
|
"Intel Corporate",
|
|
"Espressif Inc.",
|
|
"Micro-Star INTL CO., LTD.",
|
|
"Google, Inc.",
|
|
"Hewlett Packard",
|
|
"ASUSTek COMPUTER INC.",
|
|
"TP-LINK TECHNOLOGIES CO.,LTD.",
|
|
]
|
|
|
|
LOCATIONS = [
|
|
"Com Closet",
|
|
"Office",
|
|
"Garage",
|
|
"Living Room",
|
|
"Master Bedroom",
|
|
"Kitchen",
|
|
"Attic",
|
|
"Outside",
|
|
]
|
|
|
|
DEVICE_TYPES = [
|
|
"Server",
|
|
"Laptop",
|
|
"NAS",
|
|
"Phone",
|
|
"TV Decoder",
|
|
"Printer",
|
|
"IoT",
|
|
"Camera",
|
|
]
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Generate a synthetic device CSV for NetAlertX")
|
|
parser.add_argument("--output", "-o", type=Path, default=Path("generated-devices.csv"), help="Output CSV path")
|
|
parser.add_argument("--seed", type=int, default=None, help="Seed for reproducible output")
|
|
parser.add_argument("--devices", type=int, default=40, help="Number of leaf nodes to generate")
|
|
parser.add_argument("--switches", type=int, default=2, help="Number of switches under the router")
|
|
parser.add_argument("--aps", type=int, default=3, help="Number of APs under switches")
|
|
parser.add_argument("--site", default="default", help="Site name")
|
|
parser.add_argument("--ssid", default="lab", help="SSID placeholder")
|
|
parser.add_argument("--owner", default="Test Lab", help="Owner name for devices")
|
|
parser.add_argument(
|
|
"--network",
|
|
default="192.168.50.0/22",
|
|
help="IPv4 network to draw addresses from (must have enough hosts for requested devices)",
|
|
)
|
|
parser.add_argument(
|
|
"--template",
|
|
type=Path,
|
|
help="Optional CSV to pull header from; defaults to the sample inventory layout",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def load_header(template_path: Path | None) -> list[str]:
|
|
if not template_path:
|
|
return DEFAULT_HEADER
|
|
try:
|
|
with template_path.open(newline="", encoding="utf-8") as handle:
|
|
reader = csv.reader(handle)
|
|
header = next(reader)
|
|
return header if header else DEFAULT_HEADER
|
|
except FileNotFoundError:
|
|
return DEFAULT_HEADER
|
|
|
|
|
|
def random_mac(existing: set[str]) -> str:
|
|
while True:
|
|
mac = ":".join(f"{random.randint(0, 255):02x}" for _ in range(6))
|
|
if mac not in existing:
|
|
existing.add(mac)
|
|
return mac
|
|
|
|
|
|
def prepare_ip_pool(network_cidr: str) -> list[str]:
|
|
network = ipaddress.ip_network(network_cidr, strict=False)
|
|
hosts = list(network.hosts())
|
|
if not hosts:
|
|
raise ValueError(f"Network {network} has no usable hosts")
|
|
return [str(host) for host in hosts]
|
|
|
|
|
|
def random_time(now: dt.datetime) -> str:
|
|
delta_days = random.randint(0, 180)
|
|
delta_seconds = random.randint(0, 86400)
|
|
ts = now - dt.timedelta(days=delta_days, seconds=delta_seconds)
|
|
return ts.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def build_row(
|
|
name: str,
|
|
dev_type: str,
|
|
vendor: str,
|
|
mac: str,
|
|
parent_mac: str,
|
|
ip: str,
|
|
header: list[str],
|
|
owner: str,
|
|
site: str,
|
|
ssid: str,
|
|
now: dt.datetime,
|
|
) -> dict[str, str]:
|
|
comments = "Synthetic device generated for testing."
|
|
t1 = random_time(now)
|
|
t2 = random_time(now)
|
|
first_seen, last_seen = (t1, t2) if t1 <= t2 else (t2, t1)
|
|
fqdn = f"{name.lower().replace(' ', '-')}.{site}" if name else ""
|
|
|
|
# Minimal fields set; missing ones default to empty string for CSV compatibility.
|
|
base = {
|
|
"devMac": mac,
|
|
"devName": name,
|
|
"devOwner": owner,
|
|
"devType": dev_type,
|
|
"devVendor": vendor,
|
|
"devFavorite": "0",
|
|
"devGroup": "Always on" if dev_type in {"Router", "Switch", "AP", "Firewall"} else "",
|
|
"devComments": comments,
|
|
"devFirstConnection": first_seen,
|
|
"devLastConnection": last_seen,
|
|
"devLastIP": ip,
|
|
"devStaticIP": "1",
|
|
"devScan": "1",
|
|
"devLogEvents": "1",
|
|
"devAlertEvents": "1",
|
|
"devAlertDown": "0",
|
|
"devSkipRepeated": "0",
|
|
"devLastNotification": "",
|
|
"devPresentLastScan": "0",
|
|
"devIsNew": "0",
|
|
"devLocation": random.choice(LOCATIONS),
|
|
"devIsArchived": "0",
|
|
"devParentPort": "0",
|
|
"devParentMAC": parent_mac,
|
|
"devIcon": ICON_DEFAULT,
|
|
"devGUID": str(uuid.uuid4()),
|
|
"devSyncHubNode": "",
|
|
"devSite": site,
|
|
"devSSID": ssid,
|
|
"devSourcePlugin": "GENERATOR",
|
|
"devCustomProps": "",
|
|
"devFQDN": fqdn,
|
|
"devParentRelType": "None",
|
|
"devReqNicsOnline": "0",
|
|
}
|
|
|
|
# Ensure all header columns exist; extra columns are ignored by writer.
|
|
return {key: base.get(key, "") for key in header}
|
|
|
|
|
|
def generate_rows(args: argparse.Namespace, header: list[str]) -> list[dict[str, str]]:
|
|
now = dt.datetime.utcnow()
|
|
macs: set[str] = set()
|
|
ip_pool = prepare_ip_pool(args.network)
|
|
|
|
rows: list[dict[str, str]] = []
|
|
|
|
# Include one Internet root device that anchors the tree; it does not consume an IP.
|
|
required_devices = 1 + args.switches + args.aps + args.devices
|
|
if required_devices > len(ip_pool):
|
|
raise ValueError(
|
|
f"Not enough IPs in {args.network}: need {required_devices}, available {len(ip_pool)}. "
|
|
"Use --network with a larger range (e.g., 192.168.50.0/21)."
|
|
)
|
|
|
|
def take_ip() -> str:
|
|
choice = random.choice(ip_pool)
|
|
ip_pool.remove(choice)
|
|
return choice
|
|
|
|
# Root "Internet" device (no parent, no IP) so the topology has a defined root.
|
|
root_row = build_row(
|
|
name="Internet",
|
|
dev_type="Gateway",
|
|
vendor="NetAlertX",
|
|
mac="Internet",
|
|
parent_mac="",
|
|
ip="",
|
|
header=header,
|
|
owner=args.owner,
|
|
site=args.site,
|
|
ssid=args.ssid,
|
|
now=now,
|
|
)
|
|
root_row["devComments"] = "Synthetic root device representing the Internet."
|
|
root_row["devParentRelType"] = "Root"
|
|
root_row["devStaticIP"] = "0"
|
|
root_row["devScan"] = "0"
|
|
root_row["devAlertEvents"] = "0"
|
|
root_row["devAlertDown"] = "0"
|
|
root_row["devLogEvents"] = "0"
|
|
root_row["devPresentLastScan"] = "0"
|
|
rows.append(root_row)
|
|
|
|
router_mac = random_mac(macs)
|
|
router_ip = take_ip()
|
|
rows.append(
|
|
build_row(
|
|
name="Router-1",
|
|
dev_type="Firewall",
|
|
vendor=random.choice(VENDORS),
|
|
mac=router_mac,
|
|
parent_mac="Internet",
|
|
ip=router_ip,
|
|
header=header,
|
|
owner=args.owner,
|
|
site=args.site,
|
|
ssid=args.ssid,
|
|
now=now,
|
|
)
|
|
)
|
|
|
|
switch_macs: list[str] = []
|
|
for idx in range(1, args.switches + 1):
|
|
mac = random_mac(macs)
|
|
ip = take_ip()
|
|
switch_macs.append(mac)
|
|
rows.append(
|
|
build_row(
|
|
name=f"Switch-{idx}",
|
|
dev_type="Switch",
|
|
vendor=random.choice(VENDORS),
|
|
mac=mac,
|
|
parent_mac=router_mac,
|
|
ip=ip,
|
|
header=header,
|
|
owner=args.owner,
|
|
site=args.site,
|
|
ssid=args.ssid,
|
|
now=now,
|
|
)
|
|
)
|
|
|
|
ap_macs: list[str] = []
|
|
for idx in range(1, args.aps + 1):
|
|
mac = random_mac(macs)
|
|
ip = take_ip()
|
|
parent_mac = random.choice(switch_macs) if switch_macs else router_mac
|
|
ap_macs.append(mac)
|
|
rows.append(
|
|
build_row(
|
|
name=f"AP-{idx}",
|
|
dev_type="AP",
|
|
vendor=random.choice(VENDORS),
|
|
mac=mac,
|
|
parent_mac=parent_mac,
|
|
ip=ip,
|
|
header=header,
|
|
owner=args.owner,
|
|
site=args.site,
|
|
ssid=args.ssid,
|
|
now=now,
|
|
)
|
|
)
|
|
|
|
for idx in range(1, args.devices + 1):
|
|
mac = random_mac(macs)
|
|
ip = take_ip()
|
|
parent_pool = ap_macs or switch_macs or [router_mac]
|
|
parent_mac = random.choice(parent_pool)
|
|
dev_type = random.choice(DEVICE_TYPES)
|
|
name_prefix = "Node" if dev_type == "Server" else "Node"
|
|
name = f"{name_prefix}-{idx:02d}"
|
|
rows.append(
|
|
build_row(
|
|
name=name,
|
|
dev_type=dev_type,
|
|
vendor=random.choice(VENDORS),
|
|
mac=mac,
|
|
parent_mac=parent_mac,
|
|
ip=ip,
|
|
header=header,
|
|
owner=args.owner,
|
|
site=args.site,
|
|
ssid=args.ssid,
|
|
now=now,
|
|
)
|
|
)
|
|
|
|
return rows
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
args = parse_args(argv)
|
|
if args.seed is not None:
|
|
random.seed(args.seed)
|
|
|
|
header = load_header(args.template)
|
|
rows = generate_rows(args, header)
|
|
|
|
args.output.parent.mkdir(parents=True, exist_ok=True)
|
|
with args.output.open("w", newline="", encoding="utf-8") as handle:
|
|
writer = csv.DictWriter(handle, fieldnames=header, quoting=csv.QUOTE_MINIMAL)
|
|
writer.writeheader()
|
|
writer.writerows(rows)
|
|
|
|
print(f"Wrote {len(rows)} devices to {args.output}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main(sys.argv[1:]))
|