mirror of
https://github.com/ocrmypdf/OCRmyPDF.git
synced 2025-12-23 22:28:05 -05:00
Ghostscript 10.6 has a bug that truncates JPEG data by 1-15 bytes. This adds detection and repair by comparing output images to input images and restoring the original bytes when truncation is detected. - Add warning when GS 10.6+ is used with PDF/A output - Add _repair_gs106_jpeg_corruption() to fix damaged JPEGs after Ghostscript processing - Add unit tests for the repair function
408 lines
15 KiB
Python
408 lines
15 KiB
Python
# SPDX-FileCopyrightText: 2022 James R. Barlow
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
from decimal import Decimal
|
|
from unittest.mock import patch
|
|
|
|
import pikepdf
|
|
import pytest
|
|
from packaging.version import Version
|
|
from PIL import Image, UnidentifiedImageError
|
|
|
|
from ocrmypdf._exec import ghostscript
|
|
from ocrmypdf._exec.ghostscript import DuplicateFilter, rasterize_pdf
|
|
from ocrmypdf.builtin_plugins.ghostscript import _repair_gs106_jpeg_corruption
|
|
from ocrmypdf.exceptions import ColorConversionNeededError, ExitCode, InputFileError
|
|
from ocrmypdf.helpers import Resolution
|
|
|
|
from .conftest import check_ocrmypdf, run_ocrmypdf_api
|
|
|
|
# pylint: disable=redefined-outer-name
|
|
|
|
|
|
@pytest.fixture
|
|
def francais(resources):
|
|
path = resources / 'francais.pdf'
|
|
return path, pikepdf.open(path)
|
|
|
|
|
|
def test_rasterize_size(francais, outdir):
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (page_size_pts[0] / Decimal(72), page_size_pts[1] / Decimal(72))
|
|
target_size = Decimal('50.0'), Decimal('30.0')
|
|
forced_dpi = Resolution(42.0, 4242.0)
|
|
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out.png',
|
|
raster_device='pngmono',
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
)
|
|
|
|
with Image.open(outdir / 'out.png') as im:
|
|
assert im.size == target_size
|
|
assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_rasterize_rotated(francais, outdir, caplog):
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (page_size_pts[0] / Decimal(72), page_size_pts[1] / Decimal(72))
|
|
target_size = Decimal('50.0'), Decimal('30.0')
|
|
forced_dpi = Resolution(42.0, 4242.0)
|
|
|
|
caplog.set_level(logging.DEBUG)
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out.png',
|
|
raster_device='pngmono',
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
rotation=90,
|
|
)
|
|
|
|
with Image.open(outdir / 'out.png') as im:
|
|
assert im.size == (target_size[1], target_size[0])
|
|
assert im.info['dpi'] == forced_dpi.flip_axis()
|
|
|
|
|
|
def test_gs_render_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'blank.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_render_failure.py',
|
|
)
|
|
assert 'TEST ERROR: gs_render_failure.py' in caplog.text
|
|
assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_gs_raster_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_raster_failure.py',
|
|
)
|
|
assert 'TEST ERROR: gs_raster_failure.py' in caplog.text
|
|
assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_ghostscript_pdfa_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_pdfa_failure.py',
|
|
)
|
|
assert (
|
|
exitcode == ExitCode.pdfa_conversion_failed
|
|
), "Unexpected return when PDF/A fails"
|
|
|
|
|
|
def test_ghostscript_feature_elision(resources, outpdf):
|
|
check_ocrmypdf(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_feature_elision.py',
|
|
)
|
|
|
|
|
|
def test_ghostscript_mandatory_color_conversion(resources, outpdf):
|
|
with pytest.raises(ColorConversionNeededError):
|
|
check_ocrmypdf(
|
|
resources / 'jbig2_baddevicen.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
)
|
|
|
|
|
|
def test_rasterize_pdf_errors(resources, no_outpdf, caplog):
|
|
with patch('ocrmypdf._exec.ghostscript.run') as mock:
|
|
# ghostscript can produce empty files with return code 0
|
|
mock.return_value = subprocess.CompletedProcess(
|
|
['fakegs'], returncode=0, stdout=b'', stderr=b'error this is an error'
|
|
)
|
|
with pytest.raises(UnidentifiedImageError):
|
|
rasterize_pdf(
|
|
resources / 'francais.pdf',
|
|
no_outpdf,
|
|
raster_device='pngmono',
|
|
raster_dpi=Resolution(100, 100),
|
|
)
|
|
assert "this is an error" in caplog.text
|
|
assert "invalid page image file" in caplog.text
|
|
|
|
|
|
class TestDuplicateFilter:
|
|
@pytest.fixture(scope='function')
|
|
def duplicate_filter_logger(self):
|
|
# token_urlsafe: ensure the logger has a unique name so tests are isolated
|
|
logger = logging.getLogger(__name__ + secrets.token_urlsafe(8))
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addFilter(DuplicateFilter(logger))
|
|
return logger
|
|
|
|
@pytest.mark.xfail(
|
|
(3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
|
|
reason="https://github.com/python/cpython/pull/135858",
|
|
)
|
|
def test_filter_duplicate_messages(self, duplicate_filter_logger, caplog):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 5
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "(suppressed 2 repeated lines)"
|
|
assert caplog.records[2].msg == "another error message"
|
|
assert caplog.records[3].msg == "(suppressed 1 repeated lines)"
|
|
assert caplog.records[4].msg == "yet another error message"
|
|
|
|
def test_filter_does_not_affect_unique_messages(
|
|
self, duplicate_filter_logger, caplog
|
|
):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 3
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "another error message"
|
|
assert caplog.records[2].msg == "yet another error message"
|
|
|
|
@pytest.mark.xfail(
|
|
(3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
|
|
reason="https://github.com/python/cpython/pull/135858",
|
|
)
|
|
def test_filter_alt_messages(self, duplicate_filter_logger, caplog):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 4
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "another error message"
|
|
assert caplog.records[2].msg == "(suppressed 5 repeated lines)"
|
|
assert caplog.records[3].msg == "yet another error message"
|
|
|
|
|
|
@pytest.fixture
|
|
def pdf_with_invalid_image(outdir):
|
|
# issue 1451
|
|
Name = pikepdf.Name
|
|
pdf = pikepdf.new()
|
|
pdf.add_blank_page()
|
|
pdf.pages[0].Contents = pdf.make_stream(b'612 0 0 612 0 0 cm /Image Do')
|
|
# Create an invalid image object that has both ColorSpace and ImageMask set
|
|
pdf.pages[0].Resources = pikepdf.Dictionary(
|
|
XObject=pdf.make_indirect(
|
|
pikepdf.Dictionary(
|
|
Image=pdf.make_stream(
|
|
b"\xf0\x0f" * 8,
|
|
ColorSpace=Name.DeviceGray,
|
|
BitsPerComponent=1,
|
|
Width=8,
|
|
Height=8,
|
|
ImageMask=True,
|
|
Subtype=Name.Image,
|
|
Type=Name.XObject,
|
|
)
|
|
)
|
|
)
|
|
)
|
|
pdf.save(outdir / 'invalid_image.pdf')
|
|
pdf.save('invalid_image.pdf')
|
|
return outdir / 'invalid_image.pdf'
|
|
|
|
|
|
@pytest.mark.xfail(
|
|
ghostscript.version() < Version('10.04.0'),
|
|
reason="Older Ghostscript behavior is different",
|
|
)
|
|
def test_recoverable_image_error(pdf_with_invalid_image, outdir, caplog):
|
|
# When stop_on_error is False, we expect Ghostscript to print an error
|
|
# but continue
|
|
rasterize_pdf(
|
|
outdir / 'invalid_image.pdf',
|
|
outdir / 'out.png',
|
|
raster_device='pngmono',
|
|
raster_dpi=Resolution(10, 10),
|
|
stop_on_error=False,
|
|
)
|
|
assert 'Image has both ImageMask and ColorSpace' in caplog.text
|
|
|
|
|
|
@pytest.mark.xfail(
|
|
ghostscript.version() < Version('10.04.0'),
|
|
reason="Older Ghostscript behavior is different",
|
|
)
|
|
def test_recoverable_image_error_with_stop(pdf_with_invalid_image, outdir, caplog):
|
|
# When stop_on_error is True, Ghostscript will print an error and exit
|
|
# but still produce a viable image. We intercept this case and raise
|
|
# InputFileError because it will contain an image of the whole page minus
|
|
# the image we are rendering.
|
|
with pytest.raises(
|
|
InputFileError, match="Try using --continue-on-soft-render-error"
|
|
):
|
|
rasterize_pdf(
|
|
outdir / 'invalid_image.pdf',
|
|
outdir / 'out.png',
|
|
raster_device='pngmono',
|
|
raster_dpi=Resolution(100, 100),
|
|
stop_on_error=True,
|
|
)
|
|
# out2.png will not be created; if it were it would be blank.
|
|
|
|
|
|
class TestGs106JpegCorruptionRepair:
|
|
"""Test the Ghostscript 10.6 JPEG corruption repair function."""
|
|
|
|
@pytest.fixture
|
|
def create_damaged_pdf(self, resources, outdir):
|
|
"""Create a damaged PDF by truncating JPEG data by 2 bytes."""
|
|
|
|
def _create_damaged(source_pdf_name='francais.pdf', truncate_bytes=2):
|
|
source_path = resources / source_pdf_name
|
|
damaged_path = outdir / 'damaged.pdf'
|
|
|
|
with pikepdf.open(source_path) as pdf:
|
|
# Find and truncate DCTDecode images
|
|
Name = pikepdf.Name
|
|
damaged_count = 0
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
# Truncate the JPEG data
|
|
original_bytes = obj.read_raw_bytes()
|
|
truncated_bytes = original_bytes[:-truncate_bytes]
|
|
obj.write(truncated_bytes, filter=Name.DCTDecode)
|
|
damaged_count += 1
|
|
|
|
pdf.save(damaged_path)
|
|
return source_path, damaged_path, damaged_count
|
|
|
|
return _create_damaged
|
|
|
|
def test_repair_truncated_jpeg(self, create_damaged_pdf, caplog):
|
|
"""Test that truncated JPEG images are repaired."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path, damaged_path, damaged_count = create_damaged_pdf()
|
|
|
|
assert damaged_count > 0, "Test PDF should have DCTDecode images"
|
|
|
|
# Get original image bytes for comparison
|
|
with pikepdf.open(source_path) as pdf:
|
|
Name = pikepdf.Name
|
|
original_bytes_list = []
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
original_bytes_list.append(obj.read_raw_bytes())
|
|
|
|
# Run the repair function
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
|
|
assert repaired is True, "Repair should have been performed"
|
|
|
|
# Verify the repaired PDF has correct image bytes
|
|
with pikepdf.open(damaged_path) as pdf:
|
|
Name = pikepdf.Name
|
|
repaired_bytes_list = []
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
repaired_bytes_list.append(obj.read_raw_bytes())
|
|
|
|
assert len(repaired_bytes_list) == len(original_bytes_list)
|
|
for orig, repaired_bytes in zip(original_bytes_list, repaired_bytes_list):
|
|
assert orig == repaired_bytes, "Repaired bytes should match original"
|
|
|
|
# Check that error/warning was logged
|
|
assert "JPEG corruption detected" in caplog.text
|
|
|
|
def test_no_repair_when_not_truncated(self, resources, outdir, caplog):
|
|
"""Test that no repair is done when images are not truncated."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path = resources / 'francais.pdf'
|
|
|
|
# Copy source to output (no damage)
|
|
output_path = outdir / 'undamaged.pdf'
|
|
with pikepdf.open(source_path) as pdf:
|
|
pdf.save(output_path)
|
|
|
|
# Run the repair function - should not repair anything
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, output_path)
|
|
assert repaired is False, "No repair should have been performed"
|
|
assert "JPEG corruption detected" not in caplog.text
|
|
|
|
def test_no_repair_when_truncation_too_large(self, create_damaged_pdf, caplog):
|
|
"""Test that images truncated by more than 15 bytes are not repaired."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path, damaged_path, _ = create_damaged_pdf(truncate_bytes=20)
|
|
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
|
|
assert repaired is False, "Should not repair truncation > 15 bytes"
|
|
assert "JPEG corruption detected" not in caplog.text
|