From 70e0542488dd60ec8998a0444e5dfe4d7cd6f39f Mon Sep 17 00:00:00 2001 From: jokob-sk Date: Fri, 15 Nov 2024 23:27:26 +1100 Subject: [PATCH] GraphQl 0.125 - Threading issues --- front/php/templates/language/ca_ca.json | 0 front/php/templates/language/ru_ru.json | 0 server/graphql_server/graphql_server_start.py | 3 +- server/logger.py | 132 ++++++++---------- server/plugin.py | 30 ++-- 5 files changed, 80 insertions(+), 85 deletions(-) mode change 100644 => 100755 front/php/templates/language/ca_ca.json mode change 100644 => 100755 front/php/templates/language/ru_ru.json diff --git a/front/php/templates/language/ca_ca.json b/front/php/templates/language/ca_ca.json old mode 100644 new mode 100755 diff --git a/front/php/templates/language/ru_ru.json b/front/php/templates/language/ru_ru.json old mode 100644 new mode 100755 diff --git a/server/graphql_server/graphql_server_start.py b/server/graphql_server/graphql_server_start.py index 7efc59fa..fcb43360 100755 --- a/server/graphql_server/graphql_server_start.py +++ b/server/graphql_server/graphql_server_start.py @@ -41,12 +41,13 @@ def graphql_endpoint(): def start_server(): """Function to start the GraphQL server in a background thread.""" - mylog('verbose', [f'[graphql_server] Starting on port: {GRAPHQL_PORT}']) state = updateState("GraphQL: Starting", None, None, None, None) if state.graphQLServerStarted == 0: + mylog('verbose', [f'[graphql_server] Starting on port: {GRAPHQL_PORT}']) + # Start the Flask app in a separate thread thread = threading.Thread(target=lambda: app.run(host="0.0.0.0", port=GRAPHQL_PORT, debug=True, use_reloader=False)) thread.start() diff --git a/server/logger.py b/server/logger.py index fb1c42b7..2ddc1d3b 100755 --- a/server/logger.py +++ b/server/logger.py @@ -1,13 +1,11 @@ -""" Colection of functions to support all logging for NetAlertX """ import sys import io import datetime import threading +import queue import time - import conf from const import * -# from helper import get_setting_value #------------------------------------------------------------------------------- # duplication from helper to avoid circle @@ -18,17 +16,15 @@ def timeNowTZ(): else: return datetime.datetime.now().replace(microsecond=0) - #------------------------------------------------------------------------------- # More verbose as the numbers go up -debugLevels = [ - ('none', 0), ('minimal', 1), ('verbose', 2), ('debug', 3), ('trace', 4) - ] +debugLevels = [ + ('none', 0), ('minimal', 1), ('verbose', 2), ('debug', 3), ('trace', 4) +] currentLevel = 0 def mylog(requestedDebugLevel, n): - setLvl = 0 reqLvl = 0 @@ -43,100 +39,86 @@ def mylog(requestedDebugLevel, n): file_print (*n) #------------------------------------------------------------------------------- -def file_print (*args): +# Queue for log messages +log_queue = queue.Queue() - result = timeNowTZ().strftime ('%H:%M:%S') + ' ' - +# Dedicated thread for writing logs +log_thread = None # Will hold the thread reference + +def log_writer(): + while True: + log_entry = log_queue.get() + if log_entry is None: # Graceful exit signal + break + with open(logPath + "/app.log", 'a') as log_file: + log_file.write(log_entry + '\n') + +#------------------------------------------------------------------------------- +# Function to start the log writer thread if it doesn't exist +def start_log_writer_thread(): + global log_thread + if log_thread is None or not log_thread.is_alive(): + print("Starting log writer thread...") + log_thread = threading.Thread(target=log_writer, args=(), daemon=True) + log_thread.start() + +#------------------------------------------------------------------------------- +def file_print(*args): + result = timeNowTZ().strftime('%H:%M:%S') + ' ' + for arg in args: result += str(arg) print(result) - - append_to_file_with_timeout(logPath + "/app.log", result + '\n', 5) -#------------------------------------------------------------------------------- -# Function to append to the file -def append_to_file(file_path, data): - try: - # Open the file for appending - file = open(file_path, "a") - - # Write the data to the file - file.write(data) - - # Close the file - file.close() - except Exception as e: - print(f"Error appending to file: {e}") + # Ensure the log writer thread is running + start_log_writer_thread() + + # Queue the log entry for writing + append_to_file_with_timeout( result, 5) #------------------------------------------------------------------------------- # Function to append to the file with a timeout -def append_to_file_with_timeout(file_path, data, timeout): - # Create a thread for appending to the file - append_thread = threading.Thread(target=append_to_file, args=(file_path, data)) - - # Start the thread - append_thread.start() - - # Wait for the thread to complete or timeout - append_thread.join(timeout) - - # If the thread is still running, it has exceeded the timeout - if append_thread.is_alive(): - append_thread.join() # Optionally, you can force it to terminate - - # Handle the timeout here, e.g., log an error +def append_to_file_with_timeout(data, timeout): + try: + log_queue.put(data, timeout=timeout) + except queue.Full: print("Appending to file timed out") - - #------------------------------------------------------------------------------- -def print_log (pText): - - # Check LOG actived - if not conf.LOG_LEVEL == 'debug' : +def print_log(pText): + # Check if logging is active + if not conf.LOG_LEVEL == 'debug': return # Current Time log_timestamp2 = datetime.datetime.now(conf.tz).replace(microsecond=0) - # Print line + time + elapsed time + text - file_print ('[LOG_LEVEL=debug] ', - # log_timestamp2, ' ', - log_timestamp2.strftime ('%H:%M:%S'), ' ', - pText) - - + # Print line + time + text + file_print('[LOG_LEVEL=debug]', log_timestamp2.strftime('%H:%M:%S'), pText) return pText - - #------------------------------------------------------------------------------- -# textchars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f}) -# is_binary_string = lambda bytes: bool(bytes.translate(None, textchars)) - def append_file_binary(file_path, input_data): with open(file_path, 'ab') as file: if isinstance(input_data, str): input_data = input_data.encode('utf-8') # Encode string as bytes file.write(input_data) - +#------------------------------------------------------------------------------- +def logResult(stdout, stderr): + if stderr is not None: + append_file_binary(logPath + '/stderr.log', stderr) + if stdout is not None: + append_file_binary(logPath + '/stdout.log', stdout) #------------------------------------------------------------------------------- -def logResult (stdout, stderr): - if stderr != None: - append_file_binary (logPath + '/stderr.log', stderr) - if stdout != None: - append_file_binary (logPath + '/stdout.log', stdout) - -#------------------------------------------------------------------------------- -def append_line_to_file (pPath, pText): - # append the line depending using the correct python version +def append_line_to_file(pPath, pText): + # append the line using the correct python version if sys.version_info < (3, 0): - file = io.open (pPath , mode='a', encoding='utf-8') - file.write ( pText.decode('unicode_escape') ) + file = io.open(pPath, mode='a', encoding='utf-8') + file.write(pText.decode('unicode_escape')) file.close() else: - file = open (pPath, 'a', encoding='utf-8') - file.write (pText) - file.close() \ No newline at end of file + file = open(pPath, 'a', encoding='utf-8') + file.write(pText) + file.close() diff --git a/server/plugin.py b/server/plugin.py index bbdf5c30..27001f90 100755 --- a/server/plugin.py +++ b/server/plugin.py @@ -4,6 +4,7 @@ import json import subprocess import datetime import base64 +from concurrent.futures import ThreadPoolExecutor, as_completed from collections import namedtuple @@ -145,6 +146,18 @@ def run_plugin_scripts(db, all_plugins, runType, pluginsState = plugins_state()) +# Function to run a plugin command +def run_plugin(command, set_RUN_TIMEOUT): + try: + return subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=set_RUN_TIMEOUT) + except subprocess.CalledProcessError as e: + mylog('none', [e.output]) + mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs']) + return None + except subprocess.TimeoutExpired as timeErr: + mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.']) + return None + #------------------------------------------------------------------------------- # Executes the plugin command specified in the setting with the function specified as CMD @@ -209,15 +222,14 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): mylog('verbose', ['[Plugins] Executing: ', set_CMD]) mylog('debug', ['[Plugins] Resolved : ', command]) - try: - # try running a subprocess with a forced timeout in case the subprocess hangs - output = subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) - except subprocess.CalledProcessError as e: - # An error occurred, handle it - mylog('none', [e.output]) - mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs']) - except subprocess.TimeoutExpired as timeErr: - mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.']) + # Using ThreadPoolExecutor to handle concurrent subprocesses + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(run_plugin, command, set_RUN_TIMEOUT)] # Submit the command as a future + + for future in as_completed(futures): + output = future.result() # Get the output or error + if output is not None: + mylog('verbose', [f'[Plugins] Output: {output}']) # Initialize newLines newLines = []