diff --git a/front/plugins/nmap_scan/script.py b/front/plugins/nmap_scan/script.py index 1e8e4a03..fab672ce 100755 --- a/front/plugins/nmap_scan/script.py +++ b/front/plugins/nmap_scan/script.py @@ -9,7 +9,7 @@ import subprocess INSTALL_PATH = os.getenv('NETALERTX_APP', '/app') sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) -from plugin_helper import Plugin_Objects, decodeBase64 +from plugin_helper import Plugin_Objects from logger import mylog, Logger, append_line_to_file from utils.datetime_utils import timeNowDB from helper import get_setting_value @@ -29,33 +29,59 @@ LOG_PATH = logPath + '/plugins' LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log') RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log') +# Initialize the Plugin obj output file +plugin_objects = Plugin_Objects(RESULT_FILE) + #------------------------------------------------------------------------------- def main(): - parser = argparse.ArgumentParser(description='Scan ports of devices specified by IP addresses') - parser.add_argument('ips', nargs='+', help="list of IPs to scan") - parser.add_argument('macs', nargs='+', help="list of MACs related to the supplied IPs in the same order") - parser.add_argument('timeout', nargs='+', help="timeout") - parser.add_argument('args', nargs='+', help="args") - values = parser.parse_args() + parser = argparse.ArgumentParser( + description='Scan ports of devices specified by IP addresses' + ) - # Plugin_Objects is a class that reads data from the RESULT_FILE - # and returns a list of results. - plugin_objects = Plugin_Objects(RESULT_FILE) + # Accept ANY key=value pairs + parser.add_argument('params', nargs='+', help="key=value style params") - # Print a message to indicate that the script is starting. - mylog('debug', [f'[{pluginName}] In script ']) + raw = parser.parse_args() - # Printing the params list to check its content. - mylog('debug', [f'[{pluginName}] values.ips: ', values.ips]) - mylog('debug', [f'[{pluginName}] values.macs: ', values.macs]) - mylog('debug', [f'[{pluginName}] values.timeout: ', values.timeout]) - mylog('debug', [f'[{pluginName}] values.args: ', values.args]) + try: + args = parse_kv_args(raw.params) + except ValueError as e: + mylog('error', [f"[{pluginName}] Argument error: {e}"]) + sys.exit(1) - argsDecoded = decodeBase64(values.args[0].split('=b')[1]) + # Required keys + required = ['ips', 'macs'] + for key in required: + if key not in args: + mylog('error', [f"[{pluginName}] Missing required parameter: {key}"]) + sys.exit(1) - mylog('debug', [f'[{pluginName}] argsDecoded: ', argsDecoded]) + # Parse lists + ip_list = safe_split_list(args['ips'], "ips") + mac_list = safe_split_list(args['macs'], "macs") - entries = performNmapScan(values.ips[0].split('=')[1].split(','), values.macs[0].split('=')[1].split(',') , values.timeout[0].split('=')[1], argsDecoded) + if len(ip_list) != len(mac_list): + mylog('error', [ + f"[{pluginName}] Mismatch: {len(ip_list)} IPs but {len(mac_list)} MACs" + ]) + sys.exit(1) + + # Optional + timeout = int(args.get("timeout", get_setting_value("NMAP_RUN_TIMEOUT"))) + + NMAP_ARGS = get_setting_value("NMAP_ARGS") + + mylog('debug', [f'[{pluginName}] Parsed IPs: {ip_list}']) + mylog('debug', [f'[{pluginName}] Parsed MACs: {mac_list}']) + mylog('debug', [f'[{pluginName}] Timeout: {timeout}']) + mylog('debug', [f'[{pluginName}] NMAP_ARGS: {NMAP_ARGS}']) + + entries = performNmapScan( + ip_list, + mac_list, + timeout, + NMAP_ARGS + ) mylog('verbose', [f'[{pluginName}] Total number of ports found by NMAP: ', len(entries)]) @@ -89,6 +115,35 @@ class nmap_entry: self.hash = str(mac) + str(port)+ str(state)+ str(service) +#------------------------------------------------------------------------------- +def parse_kv_args(raw_args): + """ + Converts ['ips=a,b,c', 'macs=x,y,z', 'timeout=5'] to a dict. + Ignores unknown keys. + """ + parsed = {} + + for item in raw_args: + if '=' not in item: + mylog('none', [f"[{pluginName}] Scan: Invalid parameter (missing '='): {item}"]) + + key, value = item.split('=', 1) + + if key in parsed: + mylog('none', [f"[{pluginName}] Scan: Duplicate parameter supplied: {key}"]) + + parsed[key] = value + + return parsed + +#------------------------------------------------------------------------------- +def safe_split_list(value, keyname): + """Split comma list safely and ensure no empty items.""" + items = [x.strip() for x in value.split(',') if x.strip()] + if not items: + mylog('none', [f"[{pluginName}] Scan: {keyname} list is empty or invalid"]) + return items + #------------------------------------------------------------------------------- def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args): """