From 0cb4e854e5db3cde270c2923bbaba4e72d167266 Mon Sep 17 00:00:00 2001 From: "James R. Barlow" Date: Fri, 17 May 2019 00:48:06 -0700 Subject: [PATCH] Replace ProcessPoolExecutor with multiprocessing.Pool There seems to be no reasonable way to handle Ctrl-C with a ProcessPoolExecutor. Or at least you have to press it several times to actually kill. Pool does the job. --- src/ocrmypdf/_sync.py | 92 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 18 deletions(-) diff --git a/src/ocrmypdf/_sync.py b/src/ocrmypdf/_sync.py index 2322bb52..a5cec90d 100644 --- a/src/ocrmypdf/_sync.py +++ b/src/ocrmypdf/_sync.py @@ -18,6 +18,12 @@ import os import atexit import concurrent.futures +import logging +import logging.handlers +import multiprocessing +import threading +import sys +import signal from collections import namedtuple from tempfile import mkdtemp from ._jobcontext import PDFContext, get_logger, cleanup_working_files @@ -141,37 +147,84 @@ def post_process(pdf_file, context): return optimize_pdf(pdf_out, context) +def worker_init(queue): + """Initialize a process pool worker""" + + # Ignore SIGINT (our parent process will kill us gracefully) + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Reconfigure the root logger for this process to send all messages to a queue + h = logging.handlers.QueueHandler(queue) + root = logging.getLogger() + root.handlers = [] + root.addHandler(h) + + +def log_listener(queue): + """Listen to the worker processes and forward the messages to logging + + For simplicity this is a thread rather than a process. Only one process + should actually write to sys.stderr or whatever we're using, so if this is + made into a process the main application needs to be directed to it. + + See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes + """ + + while True: + try: + record = queue.get() + if record is None: + break + logger = logging.getLogger(record.name) + logger.handle(record) + except Exception: + import sys, traceback + + print("Logging problem", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + + def exec_concurrent(context): - """Execute the pipeline concurrent""" + """Execute the pipeline concurrently""" # Run exec_page_sync on every page context max_workers = min(len(context.pdfinfo), context.options.jobs) if max_workers > 1: context.log.info("Start processing %d pages concurrent" % max_workers) - sidecars = {} - layers = [] + sidecars = [None] * len(context.pdfinfo) ocrgraft = OcrGrafter(context) + + log_queue = multiprocessing.Queue(-1) + listener = threading.Thread(target=log_listener, args=(log_queue,)) + listener.start() with tqdm( total=(2 * len(context.pdfinfo)), desc='OCR', unit='page', unit_scale=0.5 - ) as pbar, concurrent.futures.ProcessPoolExecutor( - max_workers=max_workers - ) as executor: - # layers = executor.map(exec_page_sync, context.get_page_contexts()) - futures = [ - executor.submit(exec_page_sync, ctx) for ctx in context.get_page_contexts() - ] - for future in concurrent.futures.as_completed(futures): - page_result = future.result() - sidecars[page_result.pageno] = page_result.text - pbar.update() - ocrgraft.graft_page(page_result) - pbar.update() + ) as pbar, multiprocessing.Pool( + processes=max_workers, initializer=worker_init, initargs=(log_queue,) + ) as pool: + results = pool.imap_unordered(exec_page_sync, context.get_page_contexts()) + while True: + try: + page_result = results.next() + sidecars[page_result.pageno] = page_result.text + pbar.update() + ocrgraft.graft_page(page_result) + pbar.update() + except StopIteration: + break + except (Exception, KeyboardInterrupt): + pool.terminate() + log_queue.put_nowait(None) # Terminate log listener + # Don't try listener.join() here, will deadlock + raise + + log_queue.put_nowait(None) + listener.join() # Output sidecar text if context.options.sidecar: - ordered_sidecars = [sidecars[pageno] for pageno in sorted(sidecars)] - text = merge_sidecars(ordered_sidecars, context) + text = merge_sidecars(sidecars, context) # Copy text file to destination copy_final(text, context.options.sidecar, context) @@ -233,6 +286,9 @@ def run_pipeline(options): # Execute the pipeline exec_concurrent(context) + except KeyboardInterrupt as e: + log.error("KeyboardInterrupt") + return ExitCode.ctrl_c except ExitCodeException as e: log.error("%s: %s" % (type(e).__name__, str(e))) return e.exit_code