diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index 08b2a958..0daa3be6 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -785,6 +785,13 @@ NORMALIZE_VIDEO_TIME_LIMIT_S = processing.NORMALIZE_VIDEO_TIME_LIMIT_S base=processing._NormalizeAssetTask, time_limit=300, autoretry_for=(OSError,), + # ``FileNotFoundError`` is-a ``OSError`` so the autoretry_for + # filter would catch it — but a missing source file isn't a + # transient condition; retrying just keeps the row in + # ``is_processing=True`` longer before the inevitable + # ``on_failure`` lands. Excluding it here lets the on_failure + # path write ``metadata.error_message`` immediately. + dont_autoretry_for=(FileNotFoundError,), retry_backoff=10, retry_backoff_max=300, retry_jitter=True, @@ -800,7 +807,10 @@ def normalize_image_asset(asset_id: str) -> None: Retries on OSError covers transient disk pressure / a temporary libheif read hiccup; Pillow's UnidentifiedImageError is permanent - and lands directly on on_failure. + and lands directly on on_failure. ``FileNotFoundError`` (source + file gone between row creation and pickup) is excluded from + autoretry — see the ``dont_autoretry_for`` rationale on the + decorator. """ asset = processing._row_or_none(asset_id) if asset is None: @@ -812,6 +822,9 @@ def normalize_image_asset(asset_id: str) -> None: base=processing._NormalizeAssetTask, time_limit=NORMALIZE_VIDEO_TIME_LIMIT_S, autoretry_for=(OSError,), + # Same rationale as normalize_image_asset above: a missing source + # file is permanent and should land on on_failure right away. 
+ dont_autoretry_for=(FileNotFoundError,), retry_backoff=15, retry_backoff_max=300, retry_jitter=True, diff --git a/src/anthias_server/processing.py b/src/anthias_server/processing.py index 8c088fb4..a51b5def 100644 --- a/src/anthias_server/processing.py +++ b/src/anthias_server/processing.py @@ -554,8 +554,16 @@ def _run_image_normalisation(asset: Asset) -> None: # Atomic rename within the same dir — POSIX guarantees this is # observed as a single inode swap. os.replace overwrites an # existing .webp (e.g. a re-run of the task on the same asset), - # which is the right semantics here. - os.replace(staging, final_uri) + # which is the right semantics here. A rename failure + # (filesystem full, permissions, cross-device link mid-pipeline) + # must still drop the staging file so the "no leftover .tmp" + # contract holds — without the guard a stale .webp.tmp would + # only get cleaned up by the cleanup() sweep an hour later. + try: + os.replace(staging, final_uri) + except OSError: + _drop_image_staging() + raise # Drop the original now that the WebP has landed. cleanup() would # eventually sweep it as an orphan once the row's uri is updated, @@ -882,7 +890,16 @@ def _run_video_normalisation(asset: Asset) -> None: _drop_staging() raise RuntimeError(f'ffmpeg produced no output for {src_uri!r}') - os.replace(staging, final_uri) + # Same rename-failure cleanup as the image pipeline: the atomic + # rename normally succeeds in <1ms, but a filesystem-full / + # permissions / cross-device error here would otherwise leave + # the staging file hanging around. Mirror the contract by + # dropping it on any OSError. + try: + os.replace(staging, final_uri) + except OSError: + _drop_staging() + raise # Drop the original if it lived under a different name (e.g. a # ProRes .mov whose transcoded H.264 lands at the same base.mp4). 
diff --git a/tests/test_processing.py b/tests/test_processing.py
index 4d70ec8e..c5c67f88 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -370,6 +370,34 @@ def test_image_partial_write_cleans_staging(asset_dir: str) -> None:
     assert not leftover, f'image staging leftover: {leftover}'
 
 
+@pytest.mark.django_db
+def test_image_rename_failure_cleans_staging(asset_dir: str) -> None:
+    """The atomic ``os.replace(staging, final_uri)`` normally succeeds
+    in <1ms, but a filesystem-full / permissions / cross-device error
+    there would otherwise leave the .webp.tmp behind. Wrap-the-rename
+    contract: any OSError on rename drops the staging file before
+    propagating, matching the timeout/error/zero-byte branches."""
+    src = path.join(asset_dir, 'fixture.tiff')
+    _write_image(src, 'TIFF')
+    asset = _make_processing_asset('img-rename-fail', src)
+
+    def boom(staging: str, final_uri: str) -> None:
+        # Verify the staging file actually exists before we explode —
+        # otherwise the test would also pass if Pillow never wrote.
+        assert path.isfile(staging), 'precondition: staging must exist'
+        raise OSError('simulated cross-device rename failure')
+
+    with (
+        mock.patch.object(processing, '_notify'),
+        mock.patch('anthias_server.processing.os.replace', side_effect=boom),
+    ):
+        with pytest.raises(OSError, match='cross-device'):
+            processing._run_image_normalisation(asset)
+
+    leftover = [n for n in os.listdir(asset_dir) if n.endswith('.webp.tmp')]
+    assert not leftover, f'image staging leftover after rename: {leftover}'
+
+
 @pytest.mark.django_db
 def test_image_missing_file_raises_filenotfound(asset_dir: str) -> None:
     """Source file disappeared between row creation and task
@@ -899,6 +927,48 @@ def test_video_zero_byte_output_fails_clean(asset_dir: str) -> None:
     assert not leftover, f'staging leftover after empty output: {leftover}'
 
 
+@pytest.mark.django_db
+def test_video_rename_failure_cleans_staging(asset_dir: str) -> None:
+    """Video pipeline mirrors the image-pipeline contract: an OSError
+    on the post-transcode ``os.replace(staging, final_uri)`` (disk
+    full, permissions, cross-device) drops the .staging.mp4 file
+    before propagating."""
+    src = path.join(asset_dir, 'odd.mov')
+    with open(src, 'wb') as fh:
+        fh.write(b'\x00' * 16)
+    asset = _make_processing_asset('vid-rename-fail', src, mimetype='video')
+
+    summary = {
+        'container': 'mov',
+        'video_codec': 'prores',
+        'audio_codec': 'aac',
+    }
+
+    def good_transcode(_in: str, staging: str, _profile: Any = None) -> None:
+        with open(staging, 'wb') as fh:
+            fh.write(b'\x00\x00\x00\x18ftypmp42')
+
+    def boom(staging: str, final_uri: str) -> None:
+        assert path.isfile(staging), 'precondition: staging must exist'
+        raise OSError('simulated rename failure')
+
+    with (
+        mock.patch.object(processing, '_notify'),
+        mock.patch.object(
+            processing, '_ffprobe_summary', return_value=summary
+        ),
+        mock.patch.object(
+            processing, '_transcode_to_target', side_effect=good_transcode
+        ),
+        mock.patch('anthias_server.processing.os.replace', side_effect=boom),
+    ):
+        with pytest.raises(OSError, match='rename failure'):
+            processing._run_video_normalisation(asset)
+
+    leftover = [n for n in os.listdir(asset_dir) if n.endswith('.staging.mp4')]
+    assert not leftover, f'video staging leftover after rename: {leftover}'
+
+
 # ---------------------------------------------------------------------------
 # ffprobe summary parsing — tested independently of the runner
 # ---------------------------------------------------------------------------
@@ -1331,6 +1401,39 @@ def test_normalize_image_asset_celery_no_op_when_row_missing() -> None:
     run.assert_not_called()
 
 
+def test_normalize_tasks_exclude_filenotfounderror_from_autoretry() -> None:
+    """Both normalisation tasks have ``autoretry_for=(OSError,)`` so a
+    transient disk hiccup is retried automatically. ``FileNotFoundError``
+    is-a ``OSError`` so the autoretry filter would otherwise catch it
+    too — but a missing source file is permanent, and retrying just
+    delays the on_failure that writes ``metadata.error_message``.
+    Confirm both tasks were registered with
+    ``dont_autoretry_for=(FileNotFoundError,)`` so the exclusion is
+    in effect at celery-config time.
+    """
+    from anthias_server.celery_tasks import (
+        normalize_image_asset,
+        normalize_video_asset,
+    )
+
+    for task in (normalize_image_asset, normalize_video_asset):
+        # Celery copies the ``@task(...)`` keyword options onto the
+        # generated task class, and ``add_autoretry_behaviour`` reads
+        # ``autoretry_for`` / ``dont_autoretry_for`` back from there.
+        # The base ``Task`` type doesn't declare them, and the
+        # ``celery-types`` stubs we use don't model these dynamic
+        # options, so mypy needs a getattr to see them.
+        autoretry_for = tuple(getattr(task, 'autoretry_for', ()))
+        dont_autoretry_for = tuple(getattr(task, 'dont_autoretry_for', ()))
+        assert OSError in autoretry_for, (
+            f'{task.name} expected autoretry_for=(OSError,)'
+        )
+        assert FileNotFoundError in dont_autoretry_for, (
+            f'{task.name} expected dont_autoretry_for to include '
+            f'FileNotFoundError so missing-source raises immediately'
+        )
+
+
 @pytest.mark.django_db
 def test_normalize_on_failure_writes_error_metadata(
     asset_dir: str,