mirror of
https://github.com/Screenly/Anthias.git
synced 2026-06-10 09:08:09 -04:00
fix(processing): address Copilot review on commit 42697452
Four contract gaps Copilot flagged:

* ``normalize_image_asset`` / ``normalize_video_asset`` use ``autoretry_for=(OSError,)`` to recover from transient disk pressure. ``FileNotFoundError`` is-a ``OSError``, so the filter was catching it too — but a missing source file is permanent, and retrying just delays the on_failure that writes ``metadata.error_message``. Adding ``dont_autoretry_for=(FileNotFoundError,)`` to both decorators makes the missing-source raise propagate immediately, so the operator sees the "Failed" pill and the error message at the next browser refresh instead of waiting through up to 3 exponential-backoff retry cycles.

* ``_run_image_normalisation`` and ``_run_video_normalisation`` both call ``os.replace(staging, final_uri)`` after a successful conversion / transcode. A rename failure (cross-device link, filesystem-full at the very last step, permissions) was outside the existing try/except, so the staging file would linger until cleanup()'s 1h sweep. Wrap both in a try/except that calls ``_drop_image_staging`` / ``_drop_staging`` on any OSError before propagating — the "no leftover staging artifacts on failure" contract now holds across every failure path.

Tests:

* ``test_image_rename_failure_cleans_staging`` and ``test_video_rename_failure_cleans_staging`` — patch ``os.replace`` to raise OSError; assert the staging file is gone before the exception reaches the runner's caller.

* ``test_normalize_tasks_exclude_filenotfounderror_from_autoretry`` — celery-config-time check that both tasks expose ``FileNotFoundError`` in their dont_autoretry_for tuple, so a future change to the decorator can't silently regress the immediate-fail contract.
This commit is contained in:
@@ -785,6 +785,13 @@ NORMALIZE_VIDEO_TIME_LIMIT_S = processing.NORMALIZE_VIDEO_TIME_LIMIT_S
|
||||
base=processing._NormalizeAssetTask,
|
||||
time_limit=300,
|
||||
autoretry_for=(OSError,),
|
||||
# ``FileNotFoundError`` is-a ``OSError`` so the autoretry_for
|
||||
# filter would catch it — but a missing source file isn't a
|
||||
# transient condition; retrying just keeps the row in
|
||||
# ``is_processing=True`` longer before the inevitable
|
||||
# ``on_failure`` lands. Excluding it here lets the on_failure
|
||||
# path write ``metadata.error_message`` immediately.
|
||||
dont_autoretry_for=(FileNotFoundError,),
|
||||
retry_backoff=10,
|
||||
retry_backoff_max=300,
|
||||
retry_jitter=True,
|
||||
@@ -800,7 +807,10 @@ def normalize_image_asset(asset_id: str) -> None:
|
||||
|
||||
Retries on OSError covers transient disk pressure / a temporary
|
||||
libheif read hiccup; Pillow's UnidentifiedImageError is permanent
|
||||
and lands directly on on_failure.
|
||||
and lands directly on on_failure. ``FileNotFoundError`` (source
|
||||
file gone between row creation and pickup) is excluded from
|
||||
autoretry — see the ``dont_autoretry_for`` rationale on the
|
||||
decorator.
|
||||
"""
|
||||
asset = processing._row_or_none(asset_id)
|
||||
if asset is None:
|
||||
@@ -812,6 +822,9 @@ def normalize_image_asset(asset_id: str) -> None:
|
||||
base=processing._NormalizeAssetTask,
|
||||
time_limit=NORMALIZE_VIDEO_TIME_LIMIT_S,
|
||||
autoretry_for=(OSError,),
|
||||
# Same rationale as normalize_image_asset above: a missing source
|
||||
# file is permanent and should land on on_failure right away.
|
||||
dont_autoretry_for=(FileNotFoundError,),
|
||||
retry_backoff=15,
|
||||
retry_backoff_max=300,
|
||||
retry_jitter=True,
|
||||
|
||||
@@ -554,8 +554,16 @@ def _run_image_normalisation(asset: Asset) -> None:
|
||||
# Atomic rename within the same dir — POSIX guarantees this is
|
||||
# observed as a single inode swap. os.replace overwrites an
|
||||
# existing .webp (e.g. a re-run of the task on the same asset),
|
||||
# which is the right semantics here.
|
||||
os.replace(staging, final_uri)
|
||||
# which is the right semantics here. A rename failure
|
||||
# (filesystem full, permissions, cross-device link mid-pipeline)
|
||||
# must still drop the staging file so the "no leftover .tmp"
|
||||
# contract holds — without the guard a stale .webp.tmp would
|
||||
# only get cleaned up by the cleanup() sweep an hour later.
|
||||
try:
|
||||
os.replace(staging, final_uri)
|
||||
except OSError:
|
||||
_drop_image_staging()
|
||||
raise
|
||||
|
||||
# Drop the original now that the WebP has landed. cleanup() would
|
||||
# eventually sweep it as an orphan once the row's uri is updated,
|
||||
@@ -882,7 +890,16 @@ def _run_video_normalisation(asset: Asset) -> None:
|
||||
_drop_staging()
|
||||
raise RuntimeError(f'ffmpeg produced no output for {src_uri!r}')
|
||||
|
||||
os.replace(staging, final_uri)
|
||||
# Same rename-failure cleanup as the image pipeline: the atomic
|
||||
# rename normally succeeds in <1ms, but a filesystem-full /
|
||||
# permissions / cross-device error here would otherwise leave
|
||||
# the staging file hanging around. Mirror the contract by
|
||||
# dropping it on any OSError.
|
||||
try:
|
||||
os.replace(staging, final_uri)
|
||||
except OSError:
|
||||
_drop_staging()
|
||||
raise
|
||||
|
||||
# Drop the original if it lived under a different name (e.g. a
|
||||
# ProRes .mov whose transcoded H.264 lands at the same base.mp4).
|
||||
|
||||
@@ -370,6 +370,38 @@ def test_image_partial_write_cleans_staging(asset_dir: str) -> None:
|
||||
assert not leftover, f'image staging leftover: {leftover}'
|
||||
|
||||
|
||||
@pytest.mark.django_db
def test_image_rename_failure_cleans_staging(asset_dir: str) -> None:
    """An ``OSError`` from the final rename must not leave a ``.webp.tmp``.

    The atomic ``os.replace(staging, final_uri)`` normally succeeds
    in <1ms, but a filesystem-full / permissions / cross-device error
    there would otherwise leave the .webp.tmp behind. Wrap-the-rename
    contract: any OSError on rename drops the staging file before
    propagating, matching the timeout/error/zero-byte branches.
    """
    src = path.join(asset_dir, 'fixture.tiff')
    _write_image(src, 'TIFF')
    asset = _make_processing_asset('img-rename-fail', src)

    def boom(staging: str, final_uri: str) -> None:
        # Verify the staging file actually exists before we explode —
        # otherwise the test would also pass if Pillow never wrote.
        assert path.isfile(staging), 'precondition: staging must exist'
        raise OSError('simulated cross-device rename failure')

    with (
        mock.patch.object(processing, '_notify'),
        mock.patch('anthias_server.processing.os.replace', side_effect=boom),
    ):
        # ``match`` ties the failure to the message raised by ``boom``,
        # so passing here shows the patched rename was actually reached.
        with pytest.raises(OSError, match='cross-device'):
            processing._run_image_normalisation(asset)

    leftover = [n for n in os.listdir(asset_dir) if n.endswith('.webp.tmp')]
    assert not leftover, f'image staging leftover after rename: {leftover}'
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_image_missing_file_raises_filenotfound(asset_dir: str) -> None:
|
||||
"""Source file disappeared between row creation and task
|
||||
@@ -899,6 +931,48 @@ def test_video_zero_byte_output_fails_clean(asset_dir: str) -> None:
|
||||
assert not leftover, f'staging leftover after empty output: {leftover}'
|
||||
|
||||
|
||||
@pytest.mark.django_db
def test_video_rename_failure_cleans_staging(asset_dir: str) -> None:
    """A failing post-transcode rename must not strand ``.staging.mp4``.

    Same contract as the image pipeline: when
    ``os.replace(staging, final_uri)`` raises ``OSError`` (disk full,
    permissions, cross-device), the staging file is dropped before the
    error propagates.
    """
    src = path.join(asset_dir, 'odd.mov')
    with open(src, 'wb') as source_file:
        source_file.write(bytes(16))
    asset = _make_processing_asset('vid-rename-fail', src, mimetype='video')

    def good_transcode(_in: str, staging: str, _profile: Any = None) -> None:
        # Stand-in for the transcoder: leave a small non-empty file at
        # the staging path so the runner proceeds to the rename step.
        with open(staging, 'wb') as staged_file:
            staged_file.write(b'\x00\x00\x00\x18ftypmp42')

    def boom(staging: str, final_uri: str) -> None:
        # The staging file must exist at rename time, otherwise the
        # "no leftover" assertion below would pass vacuously.
        assert path.isfile(staging), 'precondition: staging must exist'
        raise OSError('simulated rename failure')

    probe_result = dict(
        container='mov',
        video_codec='prores',
        audio_codec='aac',
    )

    notify_patch = mock.patch.object(processing, '_notify')
    probe_patch = mock.patch.object(
        processing, '_ffprobe_summary', return_value=probe_result
    )
    transcode_patch = mock.patch.object(
        processing, '_transcode_to_target', side_effect=good_transcode
    )
    replace_patch = mock.patch(
        'anthias_server.processing.os.replace', side_effect=boom
    )

    with notify_patch, probe_patch, transcode_patch, replace_patch:
        with pytest.raises(OSError, match='rename failure'):
            processing._run_video_normalisation(asset)

    leftover = [
        entry
        for entry in os.listdir(asset_dir)
        if entry.endswith('.staging.mp4')
    ]
    assert not leftover, f'video staging leftover after rename: {leftover}'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ffprobe summary parsing — tested independently of the runner
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1331,6 +1405,39 @@ def test_normalize_image_asset_celery_no_op_when_row_missing() -> None:
|
||||
run.assert_not_called()
|
||||
|
||||
|
||||
def test_normalize_tasks_exclude_filenotfounderror_from_autoretry() -> None:
    """Both normalisation tasks must skip autoretry for a missing source.

    The tasks are registered with ``autoretry_for=(OSError,)`` so a
    transient disk hiccup is retried automatically. Because
    ``FileNotFoundError`` is-a ``OSError`` it would be retried as well,
    yet a missing source file is permanent and retrying only delays
    the on_failure that writes ``metadata.error_message``. This check
    confirms that each task carries
    ``dont_autoretry_for=(FileNotFoundError,)`` at celery-config time.
    """
    from anthias_server.celery_tasks import (
        normalize_image_asset,
        normalize_video_asset,
    )

    def _option(task_obj: Any, option_name: str) -> tuple[Any, ...]:
        # Read the option with ``getattr`` and an empty default: these
        # options are not declared as class attributes on the Task
        # type, so the type checker cannot see them directly.
        return tuple(getattr(task_obj, option_name, ()))

    for task in (normalize_image_asset, normalize_video_asset):
        retried = _option(task, 'autoretry_for')
        excluded = _option(task, 'dont_autoretry_for')

        assert OSError in retried, (
            f'{task.name} expected autoretry_for=(OSError,)'
        )
        assert FileNotFoundError in excluded, (
            f'{task.name} expected dont_autoretry_for to include '
            f'FileNotFoundError so missing-source raises immediately'
        )
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_normalize_on_failure_writes_error_metadata(
|
||||
asset_dir: str,
|
||||
|
||||
Reference in New Issue
Block a user