mirror of
https://github.com/ocrmypdf/OCRmyPDF.git
synced 2026-06-11 07:28:45 -04:00
Expose Ghostscript's -dJPEGQ and image downsampling switches as advanced, plugin-scoped options for tuning PDF/A output, without polluting the central OcrOptions registry. The optimizer's existing --jpeg-quality remains the recommended JPEG quality control. - GhostscriptOptions gains jpeg_quality and jpeg_maxdpi fields and CLI args (advanced help text). jpeg_quality=0 is honored as Ghostscript's maximum compression rather than being silently coerced to the default. - _exec.ghostscript.generate_pdfa() forwards both values; when jpeg_maxdpi is set, downsample threshold is pinned at 1.0. - _get_plugin_options falls back to extra_attrs for namespaced fields so plugins can own their options without registering them centrally. - Documentation explains the rationale: Ghostscript is the legacy path (pypdfium + verapdf is preferred in v17+), the optimizer is the supported file-size lever, and lowering quality is almost always a better trade than downsampling.
596 lines
22 KiB
Python
596 lines
22 KiB
Python
# SPDX-FileCopyrightText: 2022 James R. Barlow
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
from decimal import Decimal
|
|
from unittest.mock import patch
|
|
|
|
import pikepdf
|
|
import pytest
|
|
from packaging.version import Version
|
|
from PIL import Image, UnidentifiedImageError
|
|
|
|
from ocrmypdf._exec import ghostscript
|
|
from ocrmypdf._exec.ghostscript import DuplicateFilter, rasterize_pdf
|
|
from ocrmypdf.builtin_plugins.ghostscript import _repair_gs106_jpeg_corruption
|
|
from ocrmypdf.exceptions import ColorConversionNeededError, ExitCode, InputFileError
|
|
from ocrmypdf.helpers import Resolution
|
|
from ocrmypdf.pluginspec import GhostscriptRasterDevice
|
|
|
|
from .conftest import check_ocrmypdf, run_ocrmypdf_api
|
|
|
|
# pylint: disable=redefined-outer-name
|
|
|
|
|
|
@pytest.fixture
def francais(resources):
    """Provide ``francais.pdf`` as a ``(path, pdf)`` pair.

    The document is opened with a context manager so the underlying file
    handle is released when the requesting test finishes, instead of being
    left open until garbage collection.

    Yields:
        A tuple of the path to ``francais.pdf`` and the opened
        ``pikepdf.Pdf`` object.
    """
    path = resources / 'francais.pdf'
    with pikepdf.open(path) as pdf:
        yield path, pdf
|
|
|
|
|
|
def test_rasterize_size(francais, outdir):
    """Rasterize at a DPI chosen to hit a 50x30 image and check size and DPI tag."""
    path, pdf = francais
    mediabox = pdf.pages[0].mediabox
    width_pts, height_pts = mediabox[2], mediabox[3]
    # The size math below assumes the media box origin is at (0, 0)
    assert mediabox[0] == mediabox[1] == 0

    points_per_inch = Decimal(72)
    width_in = width_pts / points_per_inch
    height_in = height_pts / points_per_inch
    target_size = Decimal('50.0'), Decimal('30.0')
    forced_dpi = Resolution(42.0, 4242.0)
    output_png = outdir / 'out.png'

    # DPI per axis = desired size / page size in inches
    render_dpi = Resolution(target_size[0] / width_in, target_size[1] / height_in)
    rasterize_pdf(
        path,
        output_png,
        raster_device=GhostscriptRasterDevice.PNGMONO,
        raster_dpi=render_dpi,
        page_dpi=forced_dpi,
    )

    with Image.open(output_png) as im:
        assert im.size == target_size
        assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_rasterize_rotated(francais, outdir, caplog):
    """Rasterize with ``rotation=90`` and expect swapped size and flipped DPI."""
    path, pdf = francais
    mediabox = pdf.pages[0].mediabox
    width_pts, height_pts = mediabox[2], mediabox[3]
    # The size math below assumes the media box origin is at (0, 0)
    assert mediabox[0] == mediabox[1] == 0

    points_per_inch = Decimal(72)
    width_in = width_pts / points_per_inch
    height_in = height_pts / points_per_inch
    target_size = Decimal('50.0'), Decimal('30.0')
    forced_dpi = Resolution(42.0, 4242.0)
    output_png = outdir / 'out.png'

    caplog.set_level(logging.DEBUG)
    render_dpi = Resolution(target_size[0] / width_in, target_size[1] / height_in)
    rasterize_pdf(
        path,
        output_png,
        raster_device=GhostscriptRasterDevice.PNGMONO,
        raster_dpi=render_dpi,
        page_dpi=forced_dpi,
        rotation=90,
    )

    with Image.open(output_png) as im:
        # A quarter turn swaps width/height and the per-axis DPI
        target_width, target_height = target_size
        assert im.size == (target_height, target_width)
        assert im.info['dpi'] == forced_dpi.flip_axis()
|
|
|
|
|
|
def test_rasterize_low_dpi(francais, outdir):
    """Check that DPI values below 10 on both axes still give the right size.

    Ghostscript may fail with DPI values below 10. The workaround renders at
    a minimum of 10 DPI and resizes the output to match the expected dimensions.
    """
    path, pdf = francais
    mediabox = pdf.pages[0].mediabox
    width_pts, height_pts = mediabox[2], mediabox[3]
    assert mediabox[0] == mediabox[1] == 0
    width_in = float(width_pts) / 72
    height_in = float(height_pts) / 72

    # A tiny target forces the requested DPI below 10 on both axes
    target_size = (5, 3)
    forced_dpi = Resolution(72.0, 72.0)
    output_png = outdir / 'out_low_dpi.png'

    render_dpi = Resolution(target_size[0] / width_in, target_size[1] / height_in)
    rasterize_pdf(
        path,
        output_png,
        raster_device=GhostscriptRasterDevice.PNGMONO,
        raster_dpi=render_dpi,
        page_dpi=forced_dpi,
    )

    with Image.open(output_png) as im:
        assert im.size == target_size
        assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_rasterize_low_dpi_one_axis(francais, outdir):
    """Check output size when only the X axis is requested below 10 DPI."""
    path, pdf = francais
    mediabox = pdf.pages[0].mediabox
    width_pts, height_pts = mediabox[2], mediabox[3]
    assert mediabox[0] == mediabox[1] == 0
    width_in = float(width_pts) / 72
    height_in = float(height_pts) / 72

    # Narrow but tall target: X drops below 10 DPI while Y stays normal
    target_size = (5, 50)
    forced_dpi = Resolution(72.0, 72.0)
    output_png = outdir / 'out_low_dpi_x.png'

    render_dpi = Resolution(target_size[0] / width_in, target_size[1] / height_in)
    rasterize_pdf(
        path,
        output_png,
        raster_device=GhostscriptRasterDevice.PNGMONO,
        raster_dpi=render_dpi,
        page_dpi=forced_dpi,
    )

    with Image.open(output_png) as im:
        assert im.size == target_size
        assert im.info['dpi'] == forced_dpi
|
|
|
|
|
|
def test_generate_pdfa_default_jpeg_quality(outdir):
    """When jpeg_quality is None, Ghostscript receives -dJPEGQ=95 (default)."""
    fake_result = subprocess.CompletedProcess(
        ['gs'], returncode=0, stdout='', stderr=''
    )
    with (
        patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
        patch(
            'ocrmypdf._exec.ghostscript.run_polling_stderr',
            return_value=fake_result,
        ) as run_mock,
    ):
        ghostscript.generate_pdfa(
            pdf_pages=[outdir / 'input.pdf'],
            output_file=outdir / 'out.pdf',
            compression='auto',
            color_conversion_strategy='LeaveColorUnchanged',
        )

    # First positional argument of the mocked runner is the gs command line
    gs_args = run_mock.call_args.args[0]
    assert '-dJPEGQ=95' in gs_args
    # No downsample switches when jpeg_maxdpi is not set
    downsample_prefixes = ('-dDownsampleColorImages', '-dColorImageResolution')
    assert not any(arg.startswith(downsample_prefixes) for arg in gs_args)
|
|
|
|
|
|
def test_generate_pdfa_uses_user_jpeg_quality(outdir):
    """An explicit jpeg_quality replaces the default -dJPEGQ=95 switch."""
    fake_result = subprocess.CompletedProcess(
        ['gs'], returncode=0, stdout='', stderr=''
    )
    with (
        patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
        patch(
            'ocrmypdf._exec.ghostscript.run_polling_stderr',
            return_value=fake_result,
        ) as run_mock,
    ):
        ghostscript.generate_pdfa(
            pdf_pages=[outdir / 'input.pdf'],
            output_file=outdir / 'out.pdf',
            compression='jpeg',
            color_conversion_strategy='RGB',
            jpeg_quality=72,
        )

    # First positional argument of the mocked runner is the gs command line
    gs_args = run_mock.call_args.args[0]
    assert '-dJPEGQ=72' in gs_args
    assert '-dJPEGQ=95' not in gs_args
|
|
|
|
|
|
def test_generate_pdfa_jpeg_quality_zero_is_max_compression(outdir):
    """Explicit jpeg_quality=0 must reach Ghostscript as -dJPEGQ=0.

    Ghostscript accepts 0 as a valid quality value (maximum compression);
    it must not be silently replaced by the default 95.
    """
    fake_result = subprocess.CompletedProcess(
        ['gs'], returncode=0, stdout='', stderr=''
    )
    with (
        patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
        patch(
            'ocrmypdf._exec.ghostscript.run_polling_stderr',
            return_value=fake_result,
        ) as run_mock,
    ):
        ghostscript.generate_pdfa(
            pdf_pages=[outdir / 'input.pdf'],
            output_file=outdir / 'out.pdf',
            compression='jpeg',
            color_conversion_strategy='RGB',
            jpeg_quality=0,
        )

    # First positional argument of the mocked runner is the gs command line
    gs_args = run_mock.call_args.args[0]
    assert '-dJPEGQ=0' in gs_args
    assert '-dJPEGQ=95' not in gs_args
|
|
|
|
|
|
def test_generate_pdfa_honors_jpeg_maxdpi(outdir):
    """Setting jpeg_maxdpi adds downsample switches for color, gray and mono."""
    fake_result = subprocess.CompletedProcess(
        ['gs'], returncode=0, stdout='', stderr=''
    )
    with (
        patch('ocrmypdf._exec.ghostscript.version', return_value=Version('10.05.1')),
        patch(
            'ocrmypdf._exec.ghostscript.run_polling_stderr',
            return_value=fake_result,
        ) as run_mock,
    ):
        ghostscript.generate_pdfa(
            pdf_pages=[outdir / 'input.pdf'],
            output_file=outdir / 'out.pdf',
            compression='auto',
            color_conversion_strategy='LeaveColorUnchanged',
            jpeg_maxdpi=300,
        )

    # First positional argument of the mocked runner is the gs command line
    gs_args = run_mock.call_args.args[0]
    expected_switches = (
        '-dJPEGQ=95',
        '-dDownsampleColorImages=true',
        '-dColorImageDownsampleThreshold=1.0',
        '-dDownsampleGrayImages=true',
        '-dGrayImageDownsampleThreshold=1.0',
        '-dDownsampleMonoImages=true',
        '-dMonoImageDownsampleThreshold=1.0',
        '-dColorImageResolution=300',
        '-dGrayImageResolution=300',
        '-dMonoImageResolution=300',
    )
    for switch in expected_switches:
        assert switch in gs_args
|
|
|
|
|
|
def test_ghostscript_jpeg_options_via_cli(resources, outpdf):
    """End-to-end: CLI flags reach the ghostscript plugin namespace."""
    cli_args = [
        '--output-type',
        'pdfa',
        '--ghostscript-jpeg-quality',
        '60',
        '--ghostscript-jpeg-maxdpi',
        '150',
        '--plugin',
        'tests/plugins/tesseract_noop.py',
    ]
    # wraps= keeps the real generate_pdfa running while recording its call
    spy = patch(
        'ocrmypdf._exec.ghostscript.generate_pdfa',
        wraps=ghostscript.generate_pdfa,
    )
    with spy as gen_mock:
        run_ocrmypdf_api(resources / 'francais.pdf', outpdf, *cli_args)

    assert gen_mock.called
    forwarded = gen_mock.call_args.kwargs
    assert forwarded['jpeg_quality'] == 60
    assert forwarded['jpeg_maxdpi'] == 150
|
|
|
|
|
|
def test_gs_render_failure(resources, outpdf, caplog):
    """The gs_render_failure plugin's error is logged and maps to child_process_error."""
    cli_args = [
        '--output-type',
        'pdfa',  # Required to trigger Ghostscript PDF/A generation
        '--plugin',
        'tests/plugins/tesseract_noop.py',
        '--plugin',
        'tests/plugins/gs_render_failure.py',
    ]
    exitcode = run_ocrmypdf_api(resources / 'blank.pdf', outpdf, *cli_args)

    assert 'TEST ERROR: gs_render_failure.py' in caplog.text
    assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_gs_raster_failure(resources, outpdf, caplog):
    """The gs_raster_failure plugin's error is logged and maps to child_process_error."""
    cli_args = [
        '--plugin',
        'tests/plugins/tesseract_noop.py',
        '--plugin',
        'tests/plugins/gs_raster_failure.py',
    ]
    exitcode = run_ocrmypdf_api(resources / 'francais.pdf', outpdf, *cli_args)

    assert 'TEST ERROR: gs_raster_failure.py' in caplog.text
    assert exitcode == ExitCode.child_process_error
|
|
|
|
|
|
def test_ghostscript_pdfa_failure(resources, outpdf, caplog):
    """With the gs_pdfa_failure plugin, the run exits with pdfa_conversion_failed."""
    cli_args = [
        '--output-type',
        'pdfa',  # Required to trigger Ghostscript PDF/A generation
        '--plugin',
        'tests/plugins/tesseract_noop.py',
        '--plugin',
        'tests/plugins/gs_pdfa_failure.py',
    ]
    exitcode = run_ocrmypdf_api(resources / 'francais.pdf', outpdf, *cli_args)

    assert (
        exitcode == ExitCode.pdfa_conversion_failed
    ), "Unexpected return when PDF/A fails"
|
|
|
|
|
|
def test_ghostscript_feature_elision(resources, outpdf):
    """Run francais.pdf through ``check_ocrmypdf`` with the gs_feature_elision plugin."""
    plugin_args = [
        '--plugin',
        'tests/plugins/tesseract_noop.py',
        '--plugin',
        'tests/plugins/gs_feature_elision.py',
    ]
    check_ocrmypdf(resources / 'francais.pdf', outpdf, *plugin_args)
|
|
|
|
|
|
def test_ghostscript_mandatory_color_conversion(resources, outpdf):
    """PDF/A output of jbig2_baddevicen.pdf must raise ColorConversionNeededError."""
    cli_args = [
        '--output-type',
        'pdfa',  # Required to trigger Ghostscript PDF/A generation
        '--plugin',
        'tests/plugins/tesseract_noop.py',
    ]
    input_pdf = resources / 'jbig2_baddevicen.pdf'
    with pytest.raises(ColorConversionNeededError):
        check_ocrmypdf(input_pdf, outpdf, *cli_args)
|
|
|
|
|
|
def test_rasterize_pdf_errors(resources, no_outpdf, caplog):
    """A zero-exit Ghostscript run with no usable image raises and logs stderr."""
    # ghostscript can produce empty files with return code 0
    fake_result = subprocess.CompletedProcess(
        ['fakegs'], returncode=0, stdout=b'', stderr=b'error this is an error'
    )
    with patch('ocrmypdf._exec.ghostscript.run', return_value=fake_result):
        with pytest.raises(UnidentifiedImageError):
            rasterize_pdf(
                resources / 'francais.pdf',
                no_outpdf,
                raster_device=GhostscriptRasterDevice.PNGMONO,
                raster_dpi=Resolution(100, 100),
            )

    logged = caplog.text
    assert "this is an error" in logged
    assert "invalid page image file" in logged
|
|
|
|
|
|
class TestDuplicateFilter:
    """Exercise ``DuplicateFilter`` on a throwaway logger and inspect ``caplog``."""

    @pytest.fixture(scope='function')
    def duplicate_filter_logger(self):
        """Return a DEBUG-level logger with a ``DuplicateFilter`` attached."""
        # token_urlsafe: ensure the logger has a unique name so tests are isolated
        unique_name = __name__ + secrets.token_urlsafe(8)
        logger = logging.getLogger(unique_name)
        logger.setLevel(logging.DEBUG)
        logger.addFilter(DuplicateFilter(logger))
        return logger

    @pytest.mark.xfail(
        (3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
        reason="https://github.com/python/cpython/pull/135858",
    )
    def test_filter_duplicate_messages(self, duplicate_filter_logger, caplog):
        """Consecutive repeats collapse into a single 'suppressed' summary record."""
        log = duplicate_filter_logger
        emitted = (
            ["test error message"] * 3
            + ["another error message"] * 2
            + ["yet another error message"]
        )
        for message in emitted:
            log.error(message)

        assert [record.msg for record in caplog.records] == [
            "test error message",
            "(suppressed 2 repeated lines)",
            "another error message",
            "(suppressed 1 repeated lines)",
            "yet another error message",
        ]

    def test_filter_does_not_affect_unique_messages(
        self, duplicate_filter_logger, caplog
    ):
        """Distinct messages are recorded unchanged."""
        log = duplicate_filter_logger
        emitted = [
            "test error message",
            "another error message",
            "yet another error message",
        ]
        for message in emitted:
            log.error(message)

        assert [record.msg for record in caplog.records] == emitted

    @pytest.mark.xfail(
        (3, 13, 3) <= sys.version_info[:3] <= (3, 13, 5),
        reason="https://github.com/python/cpython/pull/135858",
    )
    def test_filter_alt_messages(self, duplicate_filter_logger, caplog):
        """Alternating repeats of two messages are suppressed and counted together."""
        log = duplicate_filter_logger
        emitted = [
            "test error message",
            "another error message",
            "test error message",
            "another error message",
            "test error message",
            "test error message",
            "another error message",
            "yet another error message",
        ]
        for message in emitted:
            log.error(message)

        assert [record.msg for record in caplog.records] == [
            "test error message",
            "another error message",
            "(suppressed 5 repeated lines)",
            "yet another error message",
        ]
|
|
|
|
|
|
@pytest.fixture
def pdf_with_invalid_image(outdir):
    """Write a one-page PDF whose only image is deliberately invalid.

    The image XObject sets both ``ColorSpace`` and ``ImageMask`` (issue 1451).
    The file is saved only under ``outdir``; an extra save into the current
    working directory used to leave a stray ``invalid_image.pdf`` behind.

    Returns:
        Path to the generated ``invalid_image.pdf`` inside ``outdir``.
    """
    # issue 1451
    Name = pikepdf.Name
    pdf = pikepdf.new()
    pdf.add_blank_page()
    pdf.pages[0].Contents = pdf.make_stream(b'612 0 0 612 0 0 cm /Image Do')
    # Create an invalid image object that has both ColorSpace and ImageMask set
    pdf.pages[0].Resources = pikepdf.Dictionary(
        XObject=pdf.make_indirect(
            pikepdf.Dictionary(
                Image=pdf.make_stream(
                    b"\xf0\x0f" * 8,
                    ColorSpace=Name.DeviceGray,
                    BitsPerComponent=1,
                    Width=8,
                    Height=8,
                    ImageMask=True,
                    Subtype=Name.Image,
                    Type=Name.XObject,
                )
            )
        )
    )
    invalid_pdf_path = outdir / 'invalid_image.pdf'
    pdf.save(invalid_pdf_path)
    return invalid_pdf_path
|
|
|
|
|
|
@pytest.mark.xfail(
    ghostscript.version() < Version('10.04.0'),
    reason="Older Ghostscript behavior is different",
)
def test_recoverable_image_error(pdf_with_invalid_image, outdir, caplog):
    """With ``stop_on_error=False`` the invalid image is reported in the log.

    The input path comes from the ``pdf_with_invalid_image`` fixture rather
    than being rebuilt by hand, so the test does not depend on the file name
    the fixture happens to use.
    """
    # When stop_on_error is False, we expect Ghostscript to print an error
    # but continue
    rasterize_pdf(
        pdf_with_invalid_image,
        outdir / 'out.png',
        raster_device=GhostscriptRasterDevice.PNGMONO,
        raster_dpi=Resolution(10, 10),
        stop_on_error=False,
    )
    assert 'Image has both ImageMask and ColorSpace' in caplog.text
|
|
|
|
|
|
@pytest.mark.xfail(
    ghostscript.version() < Version('10.04.0'),
    reason="Older Ghostscript behavior is different",
)
def test_recoverable_image_error_with_stop(pdf_with_invalid_image, outdir, caplog):
    """With ``stop_on_error=True`` the invalid image raises ``InputFileError``.

    The input path comes from the ``pdf_with_invalid_image`` fixture rather
    than being rebuilt by hand.
    """
    # When stop_on_error is True, Ghostscript will print an error and exit
    # but still produce a viable image. We intercept this case and raise
    # InputFileError because it will contain an image of the whole page minus
    # the image we are rendering.
    with pytest.raises(
        InputFileError, match="Try using --continue-on-soft-render-error"
    ):
        rasterize_pdf(
            pdf_with_invalid_image,
            outdir / 'out.png',
            raster_device=GhostscriptRasterDevice.PNGMONO,
            raster_dpi=Resolution(100, 100),
            stop_on_error=True,
        )
    # out.png will not be created; if it were it would be blank.
|
|
|
|
|
|
class TestGs106JpegCorruptionRepair:
    """Test the Ghostscript 10.6 JPEG corruption repair function."""

    @staticmethod
    def _iter_dct_images(pdf):
        """Yield each DCTDecode image XObject found in the pages' resources.

        Pages without ``/Resources`` or ``/XObject`` are skipped, as are
        XObjects that are not images or do not use the DCTDecode filter.
        """
        Name = pikepdf.Name
        for page in pdf.pages:
            if Name.Resources not in page:
                continue
            resources_dict = page[Name.Resources]
            if Name.XObject not in resources_dict:
                continue
            xobjects = resources_dict[Name.XObject]
            for key in xobjects.keys():
                obj = xobjects[key]
                if obj.get(Name.Subtype) != Name.Image:
                    continue
                if obj.get(Name.Filter) != Name.DCTDecode:
                    continue
                yield obj

    @classmethod
    def _read_dct_bytes(cls, pdf_path):
        """Return the raw stream bytes of every DCTDecode image in *pdf_path*."""
        with pikepdf.open(pdf_path) as pdf:
            return [obj.read_raw_bytes() for obj in cls._iter_dct_images(pdf)]

    @pytest.fixture
    def create_damaged_pdf(self, resources, outdir):
        """Return a factory that writes a copy of a PDF with truncated JPEG data.

        The factory accepts ``source_pdf_name`` (default ``'francais.pdf'``)
        and ``truncate_bytes`` (default 2), saves the damaged copy to
        ``outdir / 'damaged.pdf'`` and returns
        ``(source_path, damaged_path, damaged_count)``.
        """

        def _create_damaged(source_pdf_name='francais.pdf', truncate_bytes=2):
            if truncate_bytes <= 0:
                # data[:-0] is empty, which would wipe the stream, not trim it
                raise ValueError("truncate_bytes must be a positive integer")

            source_path = resources / source_pdf_name
            damaged_path = outdir / 'damaged.pdf'

            with pikepdf.open(source_path) as pdf:
                damaged_count = 0
                for obj in self._iter_dct_images(pdf):
                    # Truncate the JPEG data
                    original_bytes = obj.read_raw_bytes()
                    truncated_bytes = original_bytes[:-truncate_bytes]
                    obj.write(truncated_bytes, filter=pikepdf.Name.DCTDecode)
                    damaged_count += 1

                pdf.save(damaged_path)
            return source_path, damaged_path, damaged_count

        return _create_damaged

    def test_repair_truncated_jpeg(self, create_damaged_pdf, caplog):
        """Test that truncated JPEG images are repaired."""
        caplog.set_level(logging.DEBUG)
        source_path, damaged_path, damaged_count = create_damaged_pdf()

        assert damaged_count > 0, "Test PDF should have DCTDecode images"

        # Get original image bytes for comparison
        original_bytes_list = self._read_dct_bytes(source_path)

        # Run the repair function
        repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
        assert repaired is True, "Repair should have been performed"

        # Verify the repaired PDF has correct image bytes
        repaired_bytes_list = self._read_dct_bytes(damaged_path)

        assert len(repaired_bytes_list) == len(original_bytes_list)
        # strict=True: a length mismatch must fail loudly, never be truncated
        for orig, repaired_bytes in zip(
            original_bytes_list, repaired_bytes_list, strict=True
        ):
            assert orig == repaired_bytes, "Repaired bytes should match original"

        # Check that error/warning was logged
        assert "JPEG corruption detected" in caplog.text

    def test_no_repair_when_not_truncated(self, resources, outdir, caplog):
        """Test that no repair is done when images are not truncated."""
        caplog.set_level(logging.DEBUG)
        source_path = resources / 'francais.pdf'

        # Copy source to output (no damage)
        output_path = outdir / 'undamaged.pdf'
        with pikepdf.open(source_path) as pdf:
            pdf.save(output_path)

        # Run the repair function - should not repair anything
        repaired = _repair_gs106_jpeg_corruption(source_path, output_path)
        assert repaired is False, "No repair should have been performed"
        assert "JPEG corruption detected" not in caplog.text

    def test_no_repair_when_truncation_too_large(self, create_damaged_pdf, caplog):
        """Test that images truncated by more than 15 bytes are not repaired."""
        caplog.set_level(logging.DEBUG)
        source_path, damaged_path, damaged_count = create_damaged_pdf(
            truncate_bytes=20
        )

        # Without at least one damaged image this test would pass vacuously
        assert damaged_count > 0, "Test PDF should have DCTDecode images"

        repaired = _repair_gs106_jpeg_corruption(source_path, damaged_path)
        assert repaired is False, "Should not repair truncation > 15 bytes"
        assert "JPEG corruption detected" not in caplog.text
|