mirror of
https://github.com/ocrmypdf/OCRmyPDF.git
synced 2026-06-10 23:17:00 -04:00
When Ghostscript reports a DeviceN colorspace with an inappropriate alternate, the resulting PDF/A may render blank in viewers such as Adobe Reader (#1187). The error is gated on that Ghostscript warning, which is the authoritative signal that the *output* is broken. Previously the error message always told the user to "use --color-conversion-strategy", which is confusing when they already set one and it didn't help. Crucially, the warning persists for strategies that don't actually normalize the colorspace -- notably UseDeviceIndependentColor (confirmed in #1187) -- so silencing the error for any non-default strategy would emit a silently-broken PDF/A. Keep raising whenever Ghostscript still reports the warning, regardless of strategy, but tailor the guidance: if no conversion was requested, suggest RGB/CMYK/Gray; if a conversion was requested but the warning persisted, say so and point at strategies that work or --output-type pdf. Add unit tests (mocked Ghostscript) covering the default case, the warning-persists-despite-strategy case for both an ineffective strategy and a normally-effective one, and the no-warning happy path.
678 lines
25 KiB
Python
678 lines
25 KiB
Python
# SPDX-FileCopyrightText: 2022 James R. Barlow
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
from decimal import Decimal
|
|
from unittest.mock import patch
|
|
|
|
import pikepdf
|
|
import pytest
|
|
from packaging.version import Version
|
|
from PIL import Image, UnidentifiedImageError
|
|
|
|
from ocrmypdf._exec import ghostscript
|
|
from ocrmypdf._exec.ghostscript import DuplicateFilter, rasterize_pdf
|
|
from ocrmypdf.builtin_plugins.ghostscript import _repair_gs106_jpeg_corruption
|
|
from ocrmypdf.exceptions import ColorConversionNeededError, ExitCode, InputFileError
|
|
from ocrmypdf.helpers import Resolution
|
|
from ocrmypdf.pluginspec import GhostscriptRasterDevice
|
|
|
|
from .conftest import check_ocrmypdf, run_ocrmypdf_api
|
|
|
|
# pylint: disable=redefined-outer-name
|
|
|
|
|
|
@pytest.fixture
|
|
def francais(resources):
|
|
path = resources / 'francais.pdf'
|
|
return path, pikepdf.open(path)
|
|
|
|
|
|
def test_rasterize_size(francais, outdir):
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (page_size_pts[0] / Decimal(72), page_size_pts[1] / Decimal(72))
|
|
target_size = Decimal('50.0'), Decimal('30.0')
|
|
forced_dpi = Resolution(42.0, 4242.0)
|
|
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
)
|
|
|
|
with Image.open(outdir / 'out.png') as im:
|
|
assert im.size == target_size
|
|
assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_rasterize_rotated(francais, outdir, caplog):
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (page_size_pts[0] / Decimal(72), page_size_pts[1] / Decimal(72))
|
|
target_size = Decimal('50.0'), Decimal('30.0')
|
|
forced_dpi = Resolution(42.0, 4242.0)
|
|
|
|
caplog.set_level(logging.DEBUG)
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
rotation=90,
|
|
)
|
|
|
|
with Image.open(outdir / 'out.png') as im:
|
|
assert im.size == (target_size[1], target_size[0])
|
|
assert im.info['dpi'] == forced_dpi.flip_axis()
|
|
|
|
|
|
def test_rasterize_low_dpi(francais, outdir):
|
|
"""Test that very low DPI values (below 10) produce correctly sized output.
|
|
|
|
Ghostscript may fail with DPI values below 10. The workaround renders at
|
|
a minimum of 10 DPI and resizes the output to match the expected dimensions.
|
|
"""
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (float(page_size_pts[0]) / 72, float(page_size_pts[1]) / 72)
|
|
|
|
# Request a very small output (DPI below 10 on both axes)
|
|
target_size = (5, 3)
|
|
forced_dpi = Resolution(72.0, 72.0)
|
|
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out_low_dpi.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
)
|
|
|
|
with Image.open(outdir / 'out_low_dpi.png') as im:
|
|
assert im.size == target_size
|
|
assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_rasterize_low_dpi_one_axis(francais, outdir):
|
|
"""Test low DPI on only one axis produces correctly sized output."""
|
|
path, pdf = francais
|
|
page_size_pts = (pdf.pages[0].mediabox[2], pdf.pages[0].mediabox[3])
|
|
assert pdf.pages[0].mediabox[0] == pdf.pages[0].mediabox[1] == 0
|
|
page_size = (float(page_size_pts[0]) / 72, float(page_size_pts[1]) / 72)
|
|
|
|
# Request low DPI on X axis only (below 10), normal on Y axis
|
|
target_size = (5, 50)
|
|
forced_dpi = Resolution(72.0, 72.0)
|
|
|
|
rasterize_pdf(
|
|
path,
|
|
outdir / 'out_low_dpi_x.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(
|
|
target_size[0] / page_size[0], target_size[1] / page_size[1]
|
|
),
|
|
page_dpi=forced_dpi,
|
|
)
|
|
|
|
with Image.open(outdir / 'out_low_dpi_x.png') as im:
|
|
assert im.size == target_size
|
|
assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_generate_pdfa_default_jpeg_quality(outdir):
|
|
"""When jpeg_quality is None, Ghostscript receives -dJPEGQ=95 (default)."""
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'], returncode=0, stdout='', stderr=''
|
|
)
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='auto',
|
|
color_conversion_strategy='LeaveColorUnchanged',
|
|
)
|
|
|
|
args = run_mock.call_args.args[0]
|
|
assert '-dJPEGQ=95' in args
|
|
# No downsample switches when jpeg_maxdpi is not set
|
|
assert not any(a.startswith('-dDownsampleColorImages') for a in args)
|
|
assert not any(a.startswith('-dColorImageResolution') for a in args)
|
|
|
|
|
|
def test_generate_pdfa_uses_user_jpeg_quality(outdir):
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'], returncode=0, stdout='', stderr=''
|
|
)
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='jpeg',
|
|
color_conversion_strategy='RGB',
|
|
jpeg_quality=72,
|
|
)
|
|
|
|
args = run_mock.call_args.args[0]
|
|
assert '-dJPEGQ=72' in args
|
|
assert '-dJPEGQ=95' not in args
|
|
|
|
|
|
def test_generate_pdfa_jpeg_quality_zero_is_max_compression(outdir):
|
|
"""Explicit jpeg_quality=0 must reach Ghostscript as -dJPEGQ=0.
|
|
|
|
Ghostscript accepts 0 as a valid quality value (maximum compression);
|
|
it must not be silently replaced by the default 95.
|
|
"""
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'], returncode=0, stdout='', stderr=''
|
|
)
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='jpeg',
|
|
color_conversion_strategy='RGB',
|
|
jpeg_quality=0,
|
|
)
|
|
|
|
args = run_mock.call_args.args[0]
|
|
assert '-dJPEGQ=0' in args
|
|
assert '-dJPEGQ=95' not in args
|
|
|
|
|
|
def test_generate_pdfa_honors_jpeg_maxdpi(outdir):
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'], returncode=0, stdout='', stderr=''
|
|
)
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='auto',
|
|
color_conversion_strategy='LeaveColorUnchanged',
|
|
jpeg_maxdpi=300,
|
|
)
|
|
|
|
args = run_mock.call_args.args[0]
|
|
assert '-dJPEGQ=95' in args
|
|
assert '-dDownsampleColorImages=true' in args
|
|
assert '-dColorImageDownsampleThreshold=1.0' in args
|
|
assert '-dDownsampleGrayImages=true' in args
|
|
assert '-dGrayImageDownsampleThreshold=1.0' in args
|
|
assert '-dDownsampleMonoImages=true' in args
|
|
assert '-dMonoImageDownsampleThreshold=1.0' in args
|
|
assert '-dColorImageResolution=300' in args
|
|
assert '-dGrayImageResolution=300' in args
|
|
assert '-dMonoImageResolution=300' in args
|
|
|
|
|
|
def test_ghostscript_jpeg_options_via_cli(resources, outpdf):
|
|
"""End-to-end: CLI flags reach the ghostscript plugin namespace."""
|
|
with patch(
|
|
'ocrmypdf._exec.ghostscript.generate_pdfa',
|
|
wraps=ghostscript.generate_pdfa,
|
|
) as gen_mock:
|
|
run_ocrmypdf_api(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--output-type',
|
|
'pdfa',
|
|
'--ghostscript-jpeg-quality',
|
|
'60',
|
|
'--ghostscript-jpeg-maxdpi',
|
|
'150',
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
)
|
|
assert gen_mock.called
|
|
call_kwargs = gen_mock.call_args.kwargs
|
|
assert call_kwargs['jpeg_quality'] == 60
|
|
assert call_kwargs['jpeg_maxdpi'] == 150
|
|
|
|
|
|
def test_gs_render_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'blank.pdf',
|
|
outpdf,
|
|
'--output-type',
|
|
'pdfa', # Required to trigger Ghostscript PDF/A generation
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_render_failure.py',
|
|
)
|
|
assert 'TEST ERROR: gs_render_failure.py' in caplog.text
|
|
assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_gs_raster_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_raster_failure.py',
|
|
)
|
|
assert 'TEST ERROR: gs_raster_failure.py' in caplog.text
|
|
assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_ghostscript_pdfa_failure(resources, outpdf, caplog):
|
|
exitcode = run_ocrmypdf_api(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--output-type',
|
|
'pdfa', # Required to trigger Ghostscript PDF/A generation
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_pdfa_failure.py',
|
|
)
|
|
assert exitcode == ExitCode.pdfa_conversion_failed, (
|
|
"Unexpected return when PDF/A fails"
|
|
)
|
|
|
|
|
|
def test_ghostscript_feature_elision(resources, outpdf):
|
|
check_ocrmypdf(
|
|
resources / 'francais.pdf',
|
|
outpdf,
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
'--plugin',
|
|
'tests/plugins/gs_feature_elision.py',
|
|
)
|
|
|
|
|
|
def test_ghostscript_mandatory_color_conversion(resources, outpdf):
|
|
with pytest.raises(ColorConversionNeededError):
|
|
check_ocrmypdf(
|
|
resources / 'jbig2_baddevicen.pdf',
|
|
outpdf,
|
|
'--output-type',
|
|
'pdfa', # Required to trigger Ghostscript PDF/A generation
|
|
'--plugin',
|
|
'tests/plugins/tesseract_noop.py',
|
|
)
|
|
|
|
|
|
def _run_generate_pdfa_with_devicen_warning(outdir, color_conversion_strategy):
|
|
"""Invoke generate_pdfa with Ghostscript mocked to emit the DeviceN warning.
|
|
|
|
Ghostscript emits this warning when it writes a DeviceN colorspace with an
|
|
inappropriate alternate, i.e. when it could not normalize the colorspace for
|
|
PDF/A. The output is then liable to render blank in viewers such as Adobe
|
|
Reader (see issue #1187), regardless of which conversion strategy was
|
|
requested.
|
|
"""
|
|
(outdir / 'input.pdf').write_bytes(b'%PDF-1.5\n%fake\n')
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'],
|
|
returncode=0,
|
|
stdout='',
|
|
stderr='Attempting to write a DeviceN space with an inappropriate '
|
|
'alternate, reverting to the alternate color space.',
|
|
)
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='auto',
|
|
color_conversion_strategy=color_conversion_strategy,
|
|
)
|
|
|
|
|
|
def test_devicen_warning_default_strategy_raises_with_guidance(outdir):
|
|
"""Default (no conversion): raise and tell the user to pick a strategy."""
|
|
with pytest.raises(ColorConversionNeededError) as exc_info:
|
|
_run_generate_pdfa_with_devicen_warning(outdir, 'LeaveColorUnchanged')
|
|
message = str(exc_info.value)
|
|
assert '--color-conversion-strategy' in message
|
|
assert 'RGB' in message
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'strategy',
|
|
[
|
|
# A strategy that genuinely cannot fix the colorspace; confirmed in #1187.
|
|
'UseDeviceIndependentColor',
|
|
# A normally-effective strategy that nonetheless failed on this input:
|
|
# if Ghostscript still warns, the output is still broken and we must not
|
|
# silently pass it through (the behaviour PR #1692 would have introduced).
|
|
'RGB',
|
|
],
|
|
)
|
|
def test_devicen_warning_persists_despite_strategy_still_raises(outdir, strategy):
|
|
"""If the warning survives the requested conversion, the output is broken.
|
|
|
|
We must still raise rather than silently emit a PDF/A that may render blank.
|
|
The guidance should acknowledge that the chosen strategy did not work and
|
|
point at strategies that do (or --output-type pdf).
|
|
"""
|
|
with pytest.raises(ColorConversionNeededError) as exc_info:
|
|
_run_generate_pdfa_with_devicen_warning(outdir, strategy)
|
|
message = str(exc_info.value)
|
|
assert strategy in message
|
|
assert '--output-type pdf' in message
|
|
|
|
|
|
def test_no_devicen_warning_does_not_raise(outdir):
|
|
"""When Ghostscript does not warn, conversion succeeded; never raise."""
|
|
(outdir / 'input.pdf').write_bytes(b'%PDF-1.5\n%fake\n')
|
|
with (
|
|
patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
|
|
patch('ocrmypdf._exec.ghostscript.run_polling_stderr') as run_mock,
|
|
):
|
|
run_mock.return_value = subprocess.CompletedProcess(
|
|
['gs'], returncode=0, stdout='', stderr=''
|
|
)
|
|
# Must not raise for any strategy when there is no DeviceN warning.
|
|
ghostscript.generate_pdfa(
|
|
pdf_pages=[outdir / 'input.pdf'],
|
|
output_file=outdir / 'out.pdf',
|
|
compression='auto',
|
|
color_conversion_strategy='RGB',
|
|
)
|
|
|
|
|
|
def test_rasterize_pdf_errors(resources, no_outpdf, caplog):
|
|
with patch('ocrmypdf._exec.ghostscript.run') as mock:
|
|
# ghostscript can produce empty files with return code 0
|
|
mock.return_value = subprocess.CompletedProcess(
|
|
['fakegs'], returncode=0, stdout=b'', stderr=b'error this is an error'
|
|
)
|
|
with pytest.raises(UnidentifiedImageError):
|
|
rasterize_pdf(
|
|
resources / 'francais.pdf',
|
|
no_outpdf,
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(100, 100),
|
|
)
|
|
assert "this is an error" in caplog.text
|
|
assert "invalid page image file" in caplog.text
|
|
|
|
|
|
class TestDuplicateFilter:
|
|
@pytest.fixture(scope='function')
|
|
def duplicate_filter_logger(self):
|
|
# token_urlsafe: ensure the logger has a unique name so tests are isolated
|
|
logger = logging.getLogger(__name__ + secrets.token_urlsafe(8))
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addFilter(DuplicateFilter(logger))
|
|
return logger
|
|
|
|
@pytest.mark.xfail(
|
|
(3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
|
|
reason="https://github.com/python/cpython/pull/135858",
|
|
)
|
|
def test_filter_duplicate_messages(self, duplicate_filter_logger, caplog):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 5
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "(suppressed 2 repeated lines)"
|
|
assert caplog.records[2].msg == "another error message"
|
|
assert caplog.records[3].msg == "(suppressed 1 repeated lines)"
|
|
assert caplog.records[4].msg == "yet another error message"
|
|
|
|
def test_filter_does_not_affect_unique_messages(
|
|
self, duplicate_filter_logger, caplog
|
|
):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 3
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "another error message"
|
|
assert caplog.records[2].msg == "yet another error message"
|
|
|
|
@pytest.mark.xfail(
|
|
(3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
|
|
reason="https://github.com/python/cpython/pull/135858",
|
|
)
|
|
def test_filter_alt_messages(self, duplicate_filter_logger, caplog):
|
|
log = duplicate_filter_logger
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("test error message")
|
|
log.error("test error message")
|
|
log.error("another error message")
|
|
log.error("yet another error message")
|
|
|
|
assert len(caplog.records) == 4
|
|
assert caplog.records[0].msg == "test error message"
|
|
assert caplog.records[1].msg == "another error message"
|
|
assert caplog.records[2].msg == "(suppressed 5 repeated lines)"
|
|
assert caplog.records[3].msg == "yet another error message"
|
|
|
|
|
|
@pytest.fixture
|
|
def pdf_with_invalid_image(outdir):
|
|
# issue 1451
|
|
Name = pikepdf.Name
|
|
pdf = pikepdf.new()
|
|
pdf.add_blank_page()
|
|
pdf.pages[0].Contents = pdf.make_stream(b'612 0 0 612 0 0 cm /Image Do')
|
|
# Create an invalid image object that has both ColorSpace and ImageMask set
|
|
pdf.pages[0].Resources = pikepdf.Dictionary(
|
|
XObject=pdf.make_indirect(
|
|
pikepdf.Dictionary(
|
|
Image=pdf.make_stream(
|
|
b"\xf0\x0f" * 8,
|
|
ColorSpace=Name.DeviceGray,
|
|
BitsPerComponent=1,
|
|
Width=8,
|
|
Height=8,
|
|
ImageMask=True,
|
|
Subtype=Name.Image,
|
|
Type=Name.XObject,
|
|
)
|
|
)
|
|
)
|
|
)
|
|
pdf.save(outdir / 'invalid_image.pdf')
|
|
pdf.save('invalid_image.pdf')
|
|
return outdir / 'invalid_image.pdf'
|
|
|
|
|
|
@pytest.mark.xfail(
|
|
ghostscript.version() < Version('10.04.0'),
|
|
reason="Older Ghostscript behavior is different",
|
|
)
|
|
def test_recoverable_image_error(pdf_with_invalid_image, outdir, caplog):
|
|
# When stop_on_error is False, we expect Ghostscript to print an error
|
|
# but continue
|
|
rasterize_pdf(
|
|
outdir / 'invalid_image.pdf',
|
|
outdir / 'out.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(10, 10),
|
|
stop_on_error=False,
|
|
)
|
|
assert 'Image has both ImageMask and ColorSpace' in caplog.text
|
|
|
|
|
|
@pytest.mark.xfail(
|
|
ghostscript.version() < Version('10.04.0'),
|
|
reason="Older Ghostscript behavior is different",
|
|
)
|
|
def test_recoverable_image_error_with_stop(pdf_with_invalid_image, outdir, caplog):
|
|
# When stop_on_error is True, Ghostscript will print an error and exit
|
|
# but still produce a viable image. We intercept this case and raise
|
|
# InputFileError because it will contain an image of the whole page minus
|
|
# the image we are rendering.
|
|
with pytest.raises(
|
|
InputFileError, match="Try using --continue-on-soft-render-error"
|
|
):
|
|
rasterize_pdf(
|
|
outdir / 'invalid_image.pdf',
|
|
outdir / 'out.png',
|
|
raster_device=GhostscriptRasterDevice.PNGMONO,
|
|
raster_dpi=Resolution(100, 100),
|
|
stop_on_error=True,
|
|
)
|
|
# out2.png will not be created; if it were it would be blank.
|
|
|
|
|
|
class TestGs106JpegCorruptionRepair:
|
|
"""Test the Ghostscript 10.6 JPEG corruption repair function."""
|
|
|
|
@pytest.fixture
|
|
def create_damaged_pdf(self, resources, outdir):
|
|
"""Create a damaged PDF by truncating JPEG data by 2 bytes."""
|
|
|
|
def _create_damaged(source_pdf_name='francais.pdf', truncate_bytes=2):
|
|
source_path = resources / source_pdf_name
|
|
damaged_path = outdir / 'damaged.pdf'
|
|
|
|
with pikepdf.open(source_path) as pdf:
|
|
# Find and truncate DCTDecode images
|
|
Name = pikepdf.Name
|
|
damaged_count = 0
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
# Truncate the JPEG data
|
|
original_bytes = obj.read_raw_bytes()
|
|
truncated_bytes = original_bytes[:-truncate_bytes]
|
|
obj.write(truncated_bytes, filter=Name.DCTDecode)
|
|
damaged_count += 1
|
|
|
|
pdf.save(damaged_path)
|
|
return source_path, damaged_path, damaged_count
|
|
|
|
return _create_damaged
|
|
|
|
def test_repair_truncated_jpeg(self, create_damaged_pdf, caplog):
|
|
"""Test that truncated JPEG images are repaired."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path, damaged_path, damaged_count = create_damaged_pdf()
|
|
|
|
assert damaged_count > 0, "Test PDF should have DCTDecode images"
|
|
|
|
# Get original image bytes for comparison
|
|
with pikepdf.open(source_path) as pdf:
|
|
Name = pikepdf.Name
|
|
original_bytes_list = []
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
original_bytes_list.append(obj.read_raw_bytes())
|
|
|
|
# Run the repair function
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
|
|
assert repaired is True, "Repair should have been performed"
|
|
|
|
# Verify the repaired PDF has correct image bytes
|
|
with pikepdf.open(damaged_path) as pdf:
|
|
Name = pikepdf.Name
|
|
repaired_bytes_list = []
|
|
for page in pdf.pages:
|
|
if Name.Resources not in page:
|
|
continue
|
|
resources_dict = page[Name.Resources]
|
|
if Name.XObject not in resources_dict:
|
|
continue
|
|
for key in resources_dict[Name.XObject].keys():
|
|
obj = resources_dict[Name.XObject][key]
|
|
if obj.get(Name.Subtype) != Name.Image:
|
|
continue
|
|
if obj.get(Name.Filter) != Name.DCTDecode:
|
|
continue
|
|
repaired_bytes_list.append(obj.read_raw_bytes())
|
|
|
|
assert len(repaired_bytes_list) == len(original_bytes_list)
|
|
for orig, repaired_bytes in zip(
|
|
original_bytes_list, repaired_bytes_list, strict=False
|
|
):
|
|
assert orig == repaired_bytes, "Repaired bytes should match original"
|
|
|
|
# Check that error/warning was logged
|
|
assert "JPEG corruption detected" in caplog.text
|
|
|
|
def test_no_repair_when_not_truncated(self, resources, outdir, caplog):
|
|
"""Test that no repair is done when images are not truncated."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path = resources / 'francais.pdf'
|
|
|
|
# Copy source to output (no damage)
|
|
output_path = outdir / 'undamaged.pdf'
|
|
with pikepdf.open(source_path) as pdf:
|
|
pdf.save(output_path)
|
|
|
|
# Run the repair function - should not repair anything
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, output_path)
|
|
assert repaired is False, "No repair should have been performed"
|
|
assert "JPEG corruption detected" not in caplog.text
|
|
|
|
def test_no_repair_when_truncation_too_large(self, create_damaged_pdf, caplog):
|
|
"""Test that images truncated by more than 15 bytes are not repaired."""
|
|
caplog.set_level(logging.DEBUG)
|
|
source_path, damaged_path, _ = create_damaged_pdf(truncate_bytes=20)
|
|
|
|
repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
|
|
assert repaired is False, "Should not repair truncation > 15 bytes"
|
|
assert "JPEG corruption detected" not in caplog.text
|