Replace ProcessPoolExecutor with multiprocessing.Pool

There seems to be no reasonable way to handle Ctrl-C with a
ProcessPoolExecutor, or at least you have to press it several times to
actually kill the run. multiprocessing.Pool does the job.
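
To illustrate the pattern this switches to, here is a minimal, self-contained
sketch (illustrative names only, not the OCRmyPDF code): workers ignore SIGINT
so only the parent sees KeyboardInterrupt, and the parent tears the pool down
with terminate():

    import multiprocessing
    import signal

    def init_worker():
        # Workers ignore SIGINT; the parent decides when to terminate them.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def square(n):
        # Stand-in for the real per-page work.
        return n * n

    def main():
        with multiprocessing.Pool(processes=4, initializer=init_worker) as pool:
            try:
                for result in pool.imap_unordered(square, range(100)):
                    print(result)
            except KeyboardInterrupt:
                pool.terminate()  # one Ctrl-C is enough: stop the workers and exit
                raise

    if __name__ == '__main__':
        main()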
James R. Barlow
2019-05-17 00:48:06 -07:00
parent 13ab23ba54
commit 0cb4e854e5


@@ -18,6 +18,12 @@
 import os
 import atexit
 import concurrent.futures
+import logging
+import logging.handlers
+import multiprocessing
+import threading
+import sys
+import signal
 from collections import namedtuple
 from tempfile import mkdtemp
 from ._jobcontext import PDFContext, get_logger, cleanup_working_files
@@ -141,37 +147,84 @@ def post_process(pdf_file, context):
     return optimize_pdf(pdf_out, context)
 
 
+def worker_init(queue):
+    """Initialize a process pool worker"""
+
+    # Ignore SIGINT (our parent process will kill us gracefully)
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    # Reconfigure the root logger for this process to send all messages to a queue
+    h = logging.handlers.QueueHandler(queue)
+    root = logging.getLogger()
+    root.handlers = []
+    root.addHandler(h)
+
+
+def log_listener(queue):
+    """Listen to the worker processes and forward the messages to logging
+
+    For simplicity this is a thread rather than a process. Only one process
+    should actually write to sys.stderr or whatever we're using, so if this is
+    made into a process the main application needs to be directed to it.
+
+    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+    """
+    while True:
+        try:
+            record = queue.get()
+            if record is None:
+                break
+            logger = logging.getLogger(record.name)
+            logger.handle(record)
+        except Exception:
+            import sys, traceback
+            print("Logging problem", file=sys.stderr)
+            traceback.print_exc(file=sys.stderr)
+
+
 def exec_concurrent(context):
-    """Execute the pipeline concurrent"""
+    """Execute the pipeline concurrently"""
     # Run exec_page_sync on every page context
     max_workers = min(len(context.pdfinfo), context.options.jobs)
     if max_workers > 1:
         context.log.info("Start processing %d pages concurrent" % max_workers)
 
-    sidecars = {}
-    layers = []
+    sidecars = [None] * len(context.pdfinfo)
     ocrgraft = OcrGrafter(context)
 
+    log_queue = multiprocessing.Queue(-1)
+    listener = threading.Thread(target=log_listener, args=(log_queue,))
+    listener.start()
+
     with tqdm(
         total=(2 * len(context.pdfinfo)), desc='OCR', unit='page', unit_scale=0.5
-    ) as pbar, concurrent.futures.ProcessPoolExecutor(
-        max_workers=max_workers
-    ) as executor:
-        # layers = executor.map(exec_page_sync, context.get_page_contexts())
-        futures = [
-            executor.submit(exec_page_sync, ctx) for ctx in context.get_page_contexts()
-        ]
-        for future in concurrent.futures.as_completed(futures):
-            page_result = future.result()
-            sidecars[page_result.pageno] = page_result.text
-            pbar.update()
-            ocrgraft.graft_page(page_result)
-            pbar.update()
+    ) as pbar, multiprocessing.Pool(
+        processes=max_workers, initializer=worker_init, initargs=(log_queue,)
+    ) as pool:
+        results = pool.imap_unordered(exec_page_sync, context.get_page_contexts())
+        while True:
+            try:
+                page_result = results.next()
+                sidecars[page_result.pageno] = page_result.text
+                pbar.update()
+                ocrgraft.graft_page(page_result)
+                pbar.update()
+            except StopIteration:
+                break
+            except (Exception, KeyboardInterrupt):
+                pool.terminate()
+                log_queue.put_nowait(None)  # Terminate log listener
+                # Don't try listener.join() here, will deadlock
+                raise
+
+    log_queue.put_nowait(None)
+    listener.join()
 
     # Output sidecar text
     if context.options.sidecar:
-        ordered_sidecars = [sidecars[pageno] for pageno in sorted(sidecars)]
-        text = merge_sidecars(ordered_sidecars, context)
+        text = merge_sidecars(sidecars, context)
         # Copy text file to destination
         copy_final(text, context.options.sidecar, context)
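
The worker_init()/log_listener() pair above implements the queue-based logging
hand-off from the logging cookbook: each worker's root logger forwards records
to a multiprocessing.Queue via QueueHandler, and a listener thread in the
parent replays them through the normal logging machinery. A standalone sketch
(illustrative names, not the project's code):

    import logging
    import logging.handlers
    import multiprocessing
    import threading

    def worker_init(queue):
        # Route every log record from this worker process into the shared queue.
        handler = logging.handlers.QueueHandler(queue)
        root = logging.getLogger()
        root.handlers = [handler]
        root.setLevel(logging.DEBUG)

    def do_work(n):
        logging.getLogger('worker').info("processing item %d", n)
        return n

    def listener(queue):
        # Runs in a thread in the parent; replays records from all workers.
        while True:
            record = queue.get()
            if record is None:  # sentinel: shut the listener down
                break
            logging.getLogger(record.name).handle(record)

    def main():
        logging.basicConfig(level=logging.INFO)
        queue = multiprocessing.Queue(-1)
        thread = threading.Thread(target=listener, args=(queue,))
        thread.start()
        with multiprocessing.Pool(2, initializer=worker_init, initargs=(queue,)) as pool:
            pool.map(do_work, range(5))
        queue.put_nowait(None)  # tell the listener to exit
        thread.join()

    if __name__ == '__main__':
        main()
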
@@ -233,6 +286,9 @@ def run_pipeline(options):
         # Execute the pipeline
         exec_concurrent(context)
+    except KeyboardInterrupt as e:
+        log.error("KeyboardInterrupt")
+        return ExitCode.ctrl_c
     except ExitCodeException as e:
         log.error("%s: %s" % (type(e).__name__, str(e)))
         return e.exit_code
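
The last hunk makes run_pipeline() return an exit code on Ctrl-C instead of
letting KeyboardInterrupt propagate as a traceback. A rough sketch of that
translation pattern, with a stand-in ExitCode enum (names and values here are
illustrative, not necessarily the project's):

    import enum
    import sys

    class ExitCode(enum.IntEnum):
        ok = 0
        other_error = 15  # illustrative value
        ctrl_c = 130      # conventional exit code for SIGINT

    def run(work):
        # Translate exceptions into exit codes so the CLI never dumps a traceback.
        try:
            work()
        except KeyboardInterrupt:
            return ExitCode.ctrl_c
        except Exception as e:
            print("%s: %s" % (type(e).__name__, str(e)), file=sys.stderr)
            return ExitCode.other_error
        return ExitCode.ok

    if __name__ == '__main__':
        sys.exit(run(lambda: None))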