mirror of
https://github.com/Screenly/Anthias.git
synced 2026-06-10 09:08:09 -04:00
The upload codec/resolution gate raises ``UnsupportedVideoCodecError`` as a deliberate, operator-facing rejection — it's surfaced in the UI as a "Failed" pill plus a copy-pasteable ffmpeg re-encode recipe via ``_NormalizeAssetTask.on_failure``. It's an expected outcome (e.g. Pi 5 has no H.264 HW decode block, an unknown arm64 board can't certify any codec), not a fault, so it shouldn't reach Sentry — but every rejection was landing there as an unhandled task error (Sentry ANTHIAS-1J, ANTHIAS-20). List it in the video task's ``throws``: Celery then logs it at INFO without a traceback, and sentry-sdk's CeleryIntegration skips ``task.throws`` exceptions (``_capture_exception`` returns early on ``isinstance(exc, task.throws)``), so the gate stops flooding Sentry. ``on_failure`` still runs, so the operator-facing error pill and recipe are unchanged. Regression test asserts the video task declares it in ``throws`` and the image task (which never raises it) does not. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2123 lines
77 KiB
Python
2123 lines
77 KiB
Python
"""Unit tests for the upload-time normalisation pipeline.
|
||
|
||
Two tasks under test:
|
||
|
||
* ``normalize_image_asset`` — every extension in
|
||
``NORMALIZE_IMAGE_EXTS`` (HEIC / HEIF / TIFF / BMP / ICO / TGA /
|
||
JPEG 2000 family / AVIF) → lossless WebP. JPEG / PNG / WebP / GIF
|
||
/ SVG short-circuit through the no-op branch.
|
||
* ``normalize_video_asset`` — runs ffprobe and writes codec / dims /
|
||
fps / audio codec / container / duration into ``metadata``. The
|
||
asset file is never rewritten; the operator's UI uses the metadata
|
||
fields to identify clips the board can't decode in hardware.
|
||
|
||
Fixtures are generated programmatically (Pillow + ffmpeg) so the test
|
||
suite is self-contained — no checked-in binary blobs to drift, and
|
||
the matrix of formats can grow without a fixture-file dance. The
|
||
ffmpeg-driven video fixtures need a real ffmpeg in PATH; the host CI
|
||
image already has it (it's already a runtime dep for
|
||
``get_video_duration``), and the local-dev path matches once the
|
||
``ffmpeg`` apt package is installed.
|
||
|
||
Tests deliberately exercise the underlying helper functions
|
||
(``_run_image_normalisation``, ``_run_video_normalisation``) rather
|
||
than the celery wrappers — the wrappers are thin enough that calling
|
||
``.run()`` would just retest the same code path while bringing
|
||
celery's eager-mode plumbing into scope. The wrapper-side guarantees
|
||
(``Task.on_failure`` clearing ``is_processing``, autoretry config)
|
||
get their own dedicated tests below.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import io
|
||
import json
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
from collections.abc import Iterator
|
||
from os import path
|
||
from typing import Any
|
||
from unittest import mock
|
||
|
||
import pytest
|
||
import sh
|
||
from PIL import Image, UnidentifiedImageError
|
||
|
||
from anthias_server import processing
|
||
from anthias_server.app.models import Asset
|
||
from anthias_server.settings import settings as anthias_settings
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Skip markers — keep the suite green on hosts without optional deps
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
_FFMPEG_AVAILABLE = shutil.which('ffmpeg') is not None
|
||
_FFPROBE_AVAILABLE = shutil.which('ffprobe') is not None
|
||
|
||
try:
|
||
import pillow_heif
|
||
|
||
pillow_heif.register_heif_opener()
|
||
_HEIF_AVAILABLE = True
|
||
except Exception:
|
||
_HEIF_AVAILABLE = False
|
||
|
||
|
||
pytest_ffmpeg = pytest.mark.skipif(
|
||
not (_FFMPEG_AVAILABLE and _FFPROBE_AVAILABLE),
|
||
reason='ffmpeg / ffprobe not on PATH',
|
||
)
|
||
pytest_heif = pytest.mark.skipif(
|
||
not _HEIF_AVAILABLE,
|
||
reason='pillow-heif not installed (libheif1 missing?)',
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Asset / asset-dir fixtures
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.fixture
|
||
def asset_dir(tmp_path: Any) -> Iterator[str]:
|
||
"""Point ``settings['assetdir']`` at a fresh per-test tempdir.
|
||
|
||
Each test that writes a fixture file lands it under here, and
|
||
asserts on what survives. ``Asset.objects.all().delete()`` clears
|
||
DB rows between tests because ``django_db`` rollback covers
|
||
persisted writes but the celery tasks above call
|
||
``Asset.objects.update`` directly, which doesn't always observe
|
||
the wrap.
|
||
"""
|
||
Asset.objects.all().delete()
|
||
original = anthias_settings['assetdir']
|
||
anthias_settings['assetdir'] = str(tmp_path)
|
||
try:
|
||
yield str(tmp_path)
|
||
finally:
|
||
anthias_settings['assetdir'] = original
|
||
Asset.objects.all().delete()
|
||
|
||
|
||
def _make_processing_asset(
|
||
asset_id: str,
|
||
uri: str,
|
||
mimetype: str = 'image',
|
||
metadata: dict[str, Any] | None = None,
|
||
) -> Asset:
|
||
"""Persist a row in the state the upload path leaves behind:
|
||
``is_processing=True``, mimetype set, uri pointing at the upload
|
||
file. The normalisation task takes it from there.
|
||
"""
|
||
return Asset.objects.create(
|
||
asset_id=asset_id,
|
||
name=asset_id,
|
||
uri=uri,
|
||
mimetype=mimetype,
|
||
duration=0,
|
||
is_enabled=True,
|
||
is_processing=True,
|
||
play_order=0,
|
||
metadata=metadata or {},
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Image-fixture builders (HEIC / HEIF / TIFF / corrupt)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _write_image(out_path: str, fmt: str, **save_kwargs: Any) -> str:
|
||
"""Synthesise a ~16×16 image in *fmt* and save it to *out_path*.
|
||
|
||
Variety: alpha vs no-alpha, RGB vs RGBA, multi-frame TIFF. The
|
||
point is that the conversion path's ``convert('RGBA')`` call
|
||
handles every common Pillow input shape; tests don't need
|
||
photorealistic content, just plausibly-shaped bytes.
|
||
"""
|
||
mode = save_kwargs.pop('mode', 'RGBA' if fmt != 'JPEG' else 'RGB')
|
||
colour = save_kwargs.pop(
|
||
'colour',
|
||
(255, 0, 0, 200) if mode == 'RGBA' else (255, 0, 0),
|
||
)
|
||
size = save_kwargs.pop('size', (16, 16))
|
||
image = Image.new(mode, size, colour)
|
||
image.save(out_path, fmt, **save_kwargs)
|
||
return out_path
|
||
|
||
|
||
def _write_corrupt(out_path: str, suffix: str) -> str:
|
||
"""A file that decoders will reject — header bytes from one
|
||
format, body cut short. Pillow raises ``UnidentifiedImageError``
|
||
on these; the normalisation task surfaces the failure via the
|
||
metadata.error_message contract."""
|
||
with open(out_path, 'wb') as fh:
|
||
# 'GIF89a' is enough to fool extension sniffing but Pillow
|
||
# rejects it as an invalid GIF mid-decode.
|
||
fh.write(b'GIF89a' + b'\x00' * 8)
|
||
return out_path
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Video-fixture builders (h264 mp4, hevc mkv, mpeg2 mpg, prores mov, ...)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _make_video(
|
||
out_path: str,
|
||
*,
|
||
codec: str = 'libx264',
|
||
container: str | None = None,
|
||
audio: str | None = 'aac',
|
||
extra_args: tuple[str, ...] = (),
|
||
duration_s: float = 0.5,
|
||
) -> str:
|
||
"""Synthesise a tiny clip with a chosen codec / container / audio.
|
||
|
||
Why ffmpeg instead of a binary fixture: the matrix of codecs the
|
||
task branches on (h264 vs hevc vs mpeg2 vs prores vs mjpeg, with
|
||
or without audio, mp4 vs mkv vs mov vs mpg) would grow into ~20
|
||
MB of binary blobs in-tree. A few seconds of ffmpeg per test is
|
||
cheaper. Each clip is half a second of solid-colour video at
|
||
32×32 — enough for ffprobe to identify codecs and for libx264 to
|
||
produce a valid output, small enough that even the worst
|
||
transcode finishes in well under a second.
|
||
"""
|
||
args = [
|
||
'ffmpeg',
|
||
'-hide_banner',
|
||
'-y',
|
||
'-loglevel',
|
||
'error',
|
||
# ``lavfi`` source: synthesised SMPTE colour bars, deterministic.
|
||
'-f',
|
||
'lavfi',
|
||
'-i',
|
||
f'color=c=blue:s=32x32:d={duration_s}:r=10',
|
||
]
|
||
if audio:
|
||
args += [
|
||
'-f',
|
||
'lavfi',
|
||
'-i',
|
||
f'sine=f=440:d={duration_s}',
|
||
'-c:a',
|
||
audio,
|
||
]
|
||
args += ['-c:v', codec]
|
||
args += list(extra_args)
|
||
if container:
|
||
args += ['-f', container]
|
||
args += [out_path]
|
||
subprocess.run(args, check=True, timeout=60)
|
||
return out_path
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# IMAGE normalisation tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.django_db
|
||
@pytest.mark.parametrize(
|
||
('fmt', 'ext'),
|
||
[
|
||
# TIFF — both extension spellings must route through the
|
||
# same path; covers the multi-page-flatten case implicitly
|
||
# (Pillow opens the first frame).
|
||
('TIFF', '.tif'),
|
||
('TIFF', '.tiff'),
|
||
# BMP — uncompressed source, dramatic size win after WebP.
|
||
('BMP', '.bmp'),
|
||
# ICO — Windows icon. Pillow saves+reloads as a single
|
||
# frame, which is what we want for a signage asset.
|
||
('ICO', '.ico'),
|
||
# TGA — Truevision Targa, common in screenshot tools.
|
||
('TGA', '.tga'),
|
||
# JPEG 2000 family — every Pillow-recognised extension we
|
||
# accept must round-trip.
|
||
('JPEG2000', '.jp2'),
|
||
('JPEG2000', '.j2k'),
|
||
('JPEG2000', '.jpx'),
|
||
# AVIF — modern phone camera output. Pillow 12+ supports
|
||
# AVIF natively; round-trip proves the libavif binding is
|
||
# actually linked at runtime, not just registered.
|
||
('AVIF', '.avif'),
|
||
],
|
||
)
|
||
def test_image_normalises_to_lossless_webp_across_formats(
|
||
asset_dir: str, fmt: str, ext: str
|
||
) -> None:
|
||
"""Every entry in ``NORMALIZE_IMAGE_EXTS`` produces a valid WebP
|
||
output: the original is removed, the row's URI is swapped to the
|
||
new ``.webp`` path, ``metadata.original_ext`` carries the source
|
||
extension, and the resulting file actually decodes back as WebP
|
||
(not a stub or a bytewise-renamed source).
|
||
|
||
Parametrising over the full grid catches a future change to
|
||
``_convert_image_to_webp`` that breaks one decoder while leaving
|
||
the others intact — e.g. a move to a non-RGBA convert mode would
|
||
crash JPEG2000 (which Pillow opens in mode 'RGB' by default for
|
||
some files), and a Pillow version drop that loses libavif would
|
||
fail AVIF specifically. Each case is one assertion per
|
||
invariant; failures point at one row in the matrix."""
|
||
src = path.join(asset_dir, f'fixture{ext}')
|
||
# AVIF needs RGB (Pillow's libavif binding doesn't accept RGBA on
|
||
# write); ICO/TGA/JP2/BMP all accept RGBA. The conversion target
|
||
# (``_convert_image_to_webp``'s internal ``.convert('RGBA')``) is
|
||
# what unifies — just make sure the SOURCE encodes happily.
|
||
if fmt in ('AVIF', 'JPEG2000'):
|
||
_write_image(src, fmt, mode='RGB', colour=(0, 200, 0))
|
||
else:
|
||
_write_image(src, fmt)
|
||
asset = _make_processing_asset(f'img-{fmt.lower()}', src)
|
||
|
||
with mock.patch.object(processing, '_notify') as notify:
|
||
processing._run_image_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
expected_uri = path.join(asset_dir, 'fixture.webp')
|
||
assert asset.uri == expected_uri
|
||
assert path.isfile(expected_uri)
|
||
assert not path.exists(src), f'original {ext} must be removed'
|
||
assert asset.is_processing is False
|
||
assert asset.mimetype == 'image'
|
||
assert asset.metadata['original_ext'] == ext
|
||
assert asset.metadata['converted'] is True
|
||
notify.assert_called_once_with(f'img-{fmt.lower()}')
|
||
|
||
# Round-trip the WebP — proves the file isn't a stub.
|
||
with Image.open(expected_uri) as im:
|
||
assert im.format == 'WEBP'
|
||
assert im.size == (16, 16)
|
||
|
||
|
||
@pytest_heif
|
||
@pytest.mark.django_db
|
||
@pytest.mark.parametrize('ext', ['.heic', '.heif', '.HEIC'])
|
||
def test_image_heif_converts_to_lossless_webp(
|
||
asset_dir: str, ext: str
|
||
) -> None:
|
||
"""HEIC / HEIF (case-insensitive) → WebP. RGBA so the alpha
|
||
handling on the WebP write path is exercised even though HEIF
|
||
sources commonly arrive as RGB."""
|
||
src = path.join(asset_dir, f'fixture{ext}')
|
||
_write_image(src, 'HEIF', mode='RGB', colour=(0, 200, 0))
|
||
asset = _make_processing_asset('img-heif', src)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
expected_uri = path.join(asset_dir, 'fixture.webp')
|
||
assert asset.uri == expected_uri
|
||
assert path.isfile(expected_uri)
|
||
assert not path.exists(src)
|
||
# ``original_ext`` carries the lowercased extension regardless of
|
||
# the case the file landed with.
|
||
assert asset.metadata['original_ext'] == ext.lower()
|
||
assert asset.metadata['converted'] is True
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_corrupt_input_raises_clean_error(asset_dir: str) -> None:
|
||
"""Pillow's UnidentifiedImageError must bubble out so the
|
||
on_failure hook can write metadata.error_message — never leave a
|
||
half-written staging file behind."""
|
||
src = path.join(asset_dir, 'broken.tiff')
|
||
_write_corrupt(src, '.tiff')
|
||
asset = _make_processing_asset('img-bad', src)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(UnidentifiedImageError):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
# No webp produced; the source file is left in place for the
|
||
# operator to inspect / re-upload. Staging .webp.tmp must also
|
||
# be gone — same contract as the video pipeline's _drop_staging.
|
||
assert not path.exists(path.join(asset_dir, 'broken.webp'))
|
||
assert not path.exists(path.join(asset_dir, 'broken.webp.tmp'))
|
||
assert path.exists(src)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_decompression_bomb_is_rejected(asset_dir: str) -> None:
|
||
"""A malicious image (or a misclassified scan) advertising
|
||
enormous dimensions must be rejected *before* any pixel decode
|
||
happens. The check reads ``image.size`` from the format header
|
||
and raises a ValueError that on_failure surfaces via
|
||
``metadata.error_message`` — same contract as a corrupt input.
|
||
|
||
Mocks ``Image.open`` to return a stub whose ``.size`` exceeds
|
||
the cap, so the test runs without writing a real billion-pixel
|
||
fixture (which would itself need GBs of memory at create time)."""
|
||
src = path.join(asset_dir, 'bomb.tiff')
|
||
_write_image(src, 'TIFF') # real fixture that would otherwise decode
|
||
asset = _make_processing_asset('img-bomb', src)
|
||
|
||
bomb_size = (processing._MAX_IMAGE_PIXELS // 8 + 1, 8)
|
||
|
||
class _FakeImage:
|
||
size = bomb_size
|
||
format = 'TIFF'
|
||
|
||
def __enter__(self) -> '_FakeImage':
|
||
return self
|
||
|
||
def __exit__(self, *_: object) -> None:
|
||
return None
|
||
|
||
def convert(self, _mode: str) -> 'mock.MagicMock':
|
||
# Should never be reached — the size check raises first.
|
||
raise AssertionError(
|
||
'convert() called on a bomb input — size check missed'
|
||
)
|
||
|
||
def save(self, *_: object, **__: object) -> None:
|
||
raise AssertionError('save() called on a bomb input')
|
||
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch(
|
||
'anthias_server.processing.Image.open',
|
||
return_value=_FakeImage(),
|
||
),
|
||
):
|
||
with pytest.raises(ValueError, match='exceed cap'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
# Staging cleanup contract still holds for the bomb path.
|
||
leftover = [n for n in os.listdir(asset_dir) if n.endswith('.webp.tmp')]
|
||
assert not leftover, f'image staging leftover: {leftover}'
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_partial_write_cleans_staging(asset_dir: str) -> None:
|
||
"""If Pillow writes some bytes to the staging file and *then*
|
||
raises mid-encode (disk pressure, codec crash), the runner must
|
||
clean up the partial .webp.tmp before propagating. Mocking
|
||
``_convert_image_to_webp`` to half-write + raise is the cheapest
|
||
way to exercise that path deterministically — a real OSError
|
||
mid-WebP-encode is hard to provoke from userspace."""
|
||
src = path.join(asset_dir, 'fixture.tiff')
|
||
_write_image(src, 'TIFF')
|
||
asset = _make_processing_asset('img-partial', src)
|
||
|
||
def half_write(_in: str, staging: str) -> None:
|
||
with open(staging, 'wb') as fh:
|
||
fh.write(b'partial WebP bytes')
|
||
raise OSError('disk full mid-encode')
|
||
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_convert_image_to_webp', side_effect=half_write
|
||
),
|
||
):
|
||
with pytest.raises(OSError, match='disk full'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
# No leftover .webp.tmp in the asset dir — the runner removed it
|
||
# before the raise propagated.
|
||
leftover = [n for n in os.listdir(asset_dir) if n.endswith('.webp.tmp')]
|
||
assert not leftover, f'image staging leftover: {leftover}'
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_rename_failure_cleans_staging(asset_dir: str) -> None:
|
||
"""The atomic ``os.replace(staging, final_uri)`` normally succeeds
|
||
in <1ms, but a filesystem-full / permissions / cross-device error
|
||
there would otherwise leave the .webp.tmp behind. Wrap-the-rename
|
||
contract: any OSError on rename drops the staging file before
|
||
propagating, matching the timeout/error/zero-byte branches."""
|
||
src = path.join(asset_dir, 'fixture.tiff')
|
||
_write_image(src, 'TIFF')
|
||
asset = _make_processing_asset('img-rename-fail', src)
|
||
|
||
real_replace = os.replace
|
||
|
||
def boom(staging: str, final_uri: str) -> None:
|
||
# Verify the staging file actually exists before we explode —
|
||
# otherwise the test would also pass if Pillow never wrote.
|
||
assert path.isfile(staging), 'precondition: staging must exist'
|
||
raise OSError('simulated cross-device rename failure')
|
||
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch('anthias_server.processing.os.replace', side_effect=boom),
|
||
):
|
||
with pytest.raises(OSError, match='cross-device'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
leftover = [n for n in os.listdir(asset_dir) if n.endswith('.webp.tmp')]
|
||
assert not leftover, f'image staging leftover after rename: {leftover}'
|
||
# The real replace was never called.
|
||
del real_replace
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_missing_file_raises_filenotfound(asset_dir: str) -> None:
|
||
"""Source file disappeared between row creation and task
|
||
pickup (cleanup raced operator, disk pressure). Fail clean so
|
||
on_failure writes the error and clears the flag."""
|
||
src = path.join(asset_dir, 'gone.tiff')
|
||
asset = _make_processing_asset('img-gone', src)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(FileNotFoundError):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_jpeg_routes_no_op(asset_dir: str) -> None:
|
||
"""A caller that mis-routed a JPEG (or .png, .webp) through this
|
||
task must not re-encode it — the row already plays. Just clear
|
||
is_processing and move on. Defensive guard for future call sites."""
|
||
src = path.join(asset_dir, 'photo.jpg')
|
||
_write_image(src, 'JPEG', mode='RGB')
|
||
asset = _make_processing_asset('img-jpeg', src)
|
||
|
||
with mock.patch.object(processing, '_notify') as notify:
|
||
processing._run_image_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.uri == src # untouched
|
||
assert path.exists(src)
|
||
notify.assert_called_once_with('img-jpeg')
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_no_op_path_clears_stale_error_message(asset_dir: str) -> None:
|
||
"""A row being re-uploaded after a previous failed conversion
|
||
carries ``metadata.error_message``. When the new upload is a
|
||
format the pipeline doesn't convert (JPEG/PNG/etc.), the no-op
|
||
branch must still wipe the stale error so the operator's table
|
||
doesn't keep showing the "Failed" pill on a row that's now
|
||
fine."""
|
||
src = path.join(asset_dir, 'photo.jpg')
|
||
_write_image(src, 'JPEG', mode='RGB')
|
||
asset = _make_processing_asset(
|
||
'img-retry-jpeg',
|
||
src,
|
||
metadata={'error_message': 'previous attempt: libheif crashed'},
|
||
)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert 'error_message' not in asset.metadata
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_row_already_finalized_no_op(asset_dir: str) -> None:
|
||
"""Duplicate task fire on an already-finalised row → no-op. Same
|
||
contract download_youtube_asset enforces."""
|
||
src = path.join(asset_dir, 'fixture.tiff')
|
||
_write_image(src, 'TIFF')
|
||
Asset.objects.create(
|
||
asset_id='img-done',
|
||
name='img-done',
|
||
uri=src,
|
||
mimetype='image',
|
||
duration=10,
|
||
is_processing=False,
|
||
)
|
||
|
||
asset = processing._row_or_none('img-done')
|
||
assert asset is None # task body would short-circuit here
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_row_missing_no_op() -> None:
|
||
"""Row deleted between dispatch and pickup → no-op."""
|
||
assert processing._row_or_none('does-not-exist') is None
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_image_pipeline_clears_prior_error(asset_dir: str) -> None:
|
||
"""A re-uploaded asset whose previous attempt left an error
|
||
message in metadata must clear that on success — the operator's
|
||
next refresh shouldn't show a stale failure on a now-good row."""
|
||
src = path.join(asset_dir, 'fixture.tiff')
|
||
_write_image(src, 'TIFF')
|
||
asset = _make_processing_asset(
|
||
'img-retry',
|
||
src,
|
||
metadata={'error_message': 'previous run failed'},
|
||
)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
processing._run_image_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert 'error_message' not in asset.metadata
|
||
assert asset.metadata['converted'] is True
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_set_processing_error_writes_metadata(asset_dir: str) -> None:
|
||
"""Direct test of the failure-state contract: error message
|
||
persisted, is_processing cleared, prior metadata preserved,
|
||
and the row disabled so the viewer's scheduler can't pick up
|
||
a known-bad asset."""
|
||
asset = Asset.objects.create(
|
||
asset_id='img-err',
|
||
name='img-err',
|
||
uri=path.join(asset_dir, 'broken.tiff'),
|
||
mimetype='image',
|
||
duration=10,
|
||
# Row was active before the failure (the operator enabled it
|
||
# at create time). The error path must flip is_enabled too.
|
||
is_enabled=True,
|
||
is_processing=True,
|
||
play_order=0,
|
||
metadata={'original_ext': '.tiff'},
|
||
)
|
||
processing._set_processing_error(asset.asset_id, 'libheif: bad input')
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.metadata['error_message'] == 'libheif: bad input'
|
||
# Earlier metadata keys are merged, not stomped on.
|
||
assert asset.metadata['original_ext'] == '.tiff'
|
||
# Row disabled: the viewer's scheduling.generate_asset_list
|
||
# filters on is_enabled+date, not metadata.error_message — so
|
||
# without this flip a failed conversion would still get queued
|
||
# for playback even though the file at uri is unplayable.
|
||
assert asset.is_enabled is False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# VIDEO normalisation tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_missing_file_raises_filenotfound(asset_dir: str) -> None:
|
||
src = path.join(asset_dir, 'gone.mp4')
|
||
asset = _make_processing_asset('vid-gone', src, mimetype='video')
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(FileNotFoundError):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
|
||
@pytest_ffmpeg
|
||
@pytest.mark.django_db
|
||
def test_video_supported_codec_writes_metadata_and_clears_processing(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""H.264 upload on a Pi 4 (which HW-decodes H.264) is accepted:
|
||
the row gets codec / dims / fps written into ``metadata`` and
|
||
``is_processing`` is cleared."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi4-64')
|
||
src = path.join(asset_dir, 'sample.mp4')
|
||
_make_video(src, codec='libx264', container='mp4', audio='aac')
|
||
asset = _make_processing_asset('vid-h264-pi4', src, mimetype='video')
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.metadata.get('video_codec') == 'h264'
|
||
assert asset.metadata.get('video_width') == 32
|
||
assert asset.metadata.get('video_height') == 32
|
||
# The asset file itself is untouched — no transcode.
|
||
assert path.exists(src)
|
||
|
||
|
||
@pytest_ffmpeg
|
||
@pytest.mark.django_db
|
||
def test_video_unsupported_codec_raises_with_ffmpeg_recipe(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""An H.264 upload on a Pi 5 (HEVC only — Pi 5's mpv has no v4l2-
|
||
request H.264 hwdec) is rejected. The exception's message names
|
||
the rejected codec and supported set; its ``recipe`` attribute
|
||
carries an ffmpeg command pre-filled with the upload's filename
|
||
(taken from ``metadata.upload_name``, which the upload view
|
||
stashes at create time) so the operator can copy-paste it
|
||
verbatim."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi5')
|
||
src = path.join(asset_dir, 'sample.mp4')
|
||
_make_video(src, codec='libx264', container='mp4', audio='aac')
|
||
asset = _make_processing_asset(
|
||
'vid-h264-pi5',
|
||
src,
|
||
mimetype='video',
|
||
metadata={'upload_name': 'beach-clip.mp4'},
|
||
)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(processing.UnsupportedVideoCodecError) as excinfo:
|
||
processing._run_video_normalisation(asset)
|
||
|
||
import shlex as _shlex
|
||
|
||
msg = str(excinfo.value)
|
||
assert "'h264'" in msg
|
||
assert 'hevc' in msg
|
||
# Recipe is on the exception, not in the message body.
|
||
recipe = excinfo.value.recipe
|
||
assert 'libx265' in recipe # Pi 5 supports HEVC only.
|
||
assert '-tag:v hvc1' in recipe
|
||
# The upload's filename appears in the recipe's input slot —
|
||
# operator can copy and paste it without hand-editing INPUT.
|
||
tokens = _shlex.split(recipe)
|
||
assert tokens[1] == '-i'
|
||
assert tokens[2] == 'beach-clip.mp4'
|
||
# Output filename carries a ``.hevc.`` suffix so the recipe
|
||
# doesn't ask the operator to overwrite their source file.
|
||
assert tokens[-1] == 'beach-clip.hevc.mp4'
|
||
|
||
# Metadata was still written so the operator can see *what* they
|
||
# uploaded next to the error message in the asset list.
|
||
asset.refresh_from_db()
|
||
assert asset.metadata.get('video_codec') == 'h264'
|
||
assert asset.metadata.get('video_width') == 32
|
||
|
||
|
||
@pytest_ffmpeg
|
||
@pytest.mark.django_db
|
||
def test_video_unsupported_codec_recipe_falls_back_to_upload_placeholder(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""When the row has no ``metadata.upload_name`` (YouTube
|
||
downloads, pre-rebrand rows), the recipe uses a stable
|
||
``upload<ext>`` placeholder so the operator still sees the
|
||
correct input extension to substitute."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi5')
|
||
src = path.join(asset_dir, 'noname.mp4')
|
||
_make_video(src, codec='libx264', container='mp4', audio='aac')
|
||
asset = _make_processing_asset('vid-noname', src, mimetype='video')
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(processing.UnsupportedVideoCodecError) as excinfo:
|
||
processing._run_video_normalisation(asset)
|
||
|
||
import shlex as _shlex
|
||
|
||
recipe = excinfo.value.recipe
|
||
tokens = _shlex.split(recipe)
|
||
assert tokens[1] == '-i'
|
||
assert tokens[2] == 'upload.mp4'
|
||
assert tokens[-1] == 'upload.hevc.mp4'
|
||
|
||
|
||
@pytest_ffmpeg
|
||
@pytest.mark.django_db
|
||
def test_video_unsupported_codec_h264_board_recipe(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""A board that supports H.264 (Pi 4) gets a libx264 recipe —
|
||
libx264 is significantly faster than libx265 for the operator."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi4-64')
|
||
src = path.join(asset_dir, 'sample.mpg')
|
||
_make_video(src, codec='mpeg2video', container='mpeg', audio=None)
|
||
asset = _make_processing_asset('vid-mpeg2', src, mimetype='video')
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
with pytest.raises(processing.UnsupportedVideoCodecError) as excinfo:
|
||
processing._run_video_normalisation(asset)
|
||
|
||
recipe = excinfo.value.recipe
|
||
assert 'libx264' in recipe
|
||
assert 'libx265' not in recipe
|
||
|
||
|
||
def test_pi3_64_hw_decode_set_is_h264_only(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
"""The 64-bit Qt6 Pi 3 board (``pi3-64``) runs the same VideoCore IV
|
||
silicon as the 32-bit ``pi3`` — H.264-only HW decode, no HEVC. The
|
||
gate must reject HEVC for it just like the armhf stream."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi3-64')
|
||
assert processing._hw_decoded_codecs() == frozenset({'h264'})
|
||
|
||
|
||
@pytest.mark.parametrize(
|
||
'filename',
|
||
[
|
||
"O'Brien.mp4",
|
||
'two words.mov',
|
||
'evil; rm -rf $HOME.mp4',
|
||
'tick`uname`.mp4',
|
||
'sub$(whoami).mp4',
|
||
],
|
||
)
|
||
def test_ffmpeg_recipe_quotes_hostile_filenames(filename: str) -> None:
|
||
"""``_ffmpeg_reencode_recipe`` must round-trip any filename through
|
||
``shlex`` so a user-supplied ``upload_name`` can't break out of the
|
||
recipe's quoting and inject commands the operator copy-pastes.
|
||
|
||
Round-trip means: ``shlex.split(recipe)`` recovers the *original*
|
||
filename byte-for-byte in the input slot. If the recipe still
|
||
interpolated raw (the pre-fix ``f"'{filename}'"`` path), the
|
||
embedded quote / metachar would either truncate the token or shell-
|
||
interpret on paste."""
|
||
import shlex as _shlex
|
||
|
||
recipe = processing._ffmpeg_reencode_recipe(frozenset({'h264'}), filename)
|
||
tokens = _shlex.split(recipe)
|
||
# ffmpeg -i <input> -c:v libx264 ... <output>
|
||
assert tokens[0] == 'ffmpeg'
|
||
assert tokens[1] == '-i'
|
||
assert tokens[2] == filename
|
||
# Output filename ends with .h264.mp4 and is the recipe's last token.
|
||
assert tokens[-1].endswith('.h264.mp4')
|
||
|
||
|
||
@pytest_ffmpeg
|
||
@pytest.mark.django_db
|
||
def test_video_unknown_codec_is_rejected(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""ffprobe failure (codec reported as 'unknown') must reject the
|
||
upload — we won't pass through a clip we can't certify against
|
||
the board's HW decode set."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi4-64')
|
||
src = path.join(asset_dir, 'sample.mp4')
|
||
_make_video(src, codec='libx264', container='mp4', audio='aac')
|
||
asset = _make_processing_asset('vid-unknown', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'unknown',
|
||
'video_codec': 'unknown',
|
||
'video_pixels': None,
|
||
'video_width': None,
|
||
'video_height': None,
|
||
'video_fps': None,
|
||
'audio_codec': 'unknown',
|
||
'duration_seconds': None,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
pytest.raises(processing.UnsupportedVideoCodecError) as excinfo,
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
assert 'unknown' in str(excinfo.value)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_arm64_catch_all_rejects_everything(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""The catch-all ``arm64`` DEVICE_TYPE has no entry in the HW
|
||
decode map (an unknown aarch64 SBC isn't guaranteed to expose a
|
||
v4l2-request decoder mpv can address). Without a host_agent
|
||
subtype publish, every video upload is rejected — operator has
|
||
to install a board-specific image to get HW decode."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'arm64')
|
||
src = path.join(asset_dir, 'sample.mp4')
|
||
# Create an empty placeholder file so the FileNotFoundError check
|
||
# passes; we mock ffprobe below.
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset('vid-arm64', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'mp4',
|
||
'video_codec': 'h264',
|
||
'video_pixels': 32 * 32,
|
||
'video_width': 32,
|
||
'video_height': 32,
|
||
'video_fps': 10.0,
|
||
'audio_codec': 'aac',
|
||
'duration_seconds': 1,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
# Subtype absent — Redis returns None.
|
||
mock.patch(
|
||
'anthias_common.board.get_board_subtype', return_value=None
|
||
),
|
||
pytest.raises(processing.UnsupportedVideoCodecError) as excinfo,
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
msg = str(excinfo.value)
|
||
# Catch-all branch must explain the board-subtype gap rather
|
||
# than the misleading "Supported: none." that earlier revisions
|
||
# surfaced.
|
||
assert 'subtype' in msg.lower()
|
||
assert 'board-specific image' in msg.lower()
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_arm64_with_rockpi4_subtype_accepts_h264(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""A Rock Pi 4 running the catch-all arm64 image gets its
|
||
``{h264, hevc}`` set once ``host:board_subtype=rockpi4`` is
|
||
published by anthias_host_agent."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'arm64')
|
||
src = path.join(asset_dir, 'sample.mp4')
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset('vid-rockpi', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'mp4',
|
||
'video_codec': 'h264',
|
||
'video_pixels': 32 * 32,
|
||
'video_width': 32,
|
||
'video_height': 32,
|
||
'video_fps': 10.0,
|
||
'audio_codec': 'aac',
|
||
'duration_seconds': 1,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
mock.patch(
|
||
'anthias_common.board.get_board_subtype', return_value='rockpi4'
|
||
),
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.metadata.get('video_codec') == 'h264'
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Low-RAM resolution gate (boards with < 1.5 GiB MemTotal). On-device
|
||
# measurement on a 1 GB Rock Pi 4 showed 4K HEVC OOM-kills the viewer
|
||
# container (dmesg ``global_oom`` on the docker container's bash); the
|
||
# codec gate rejects above-1080p uploads on those boards so an operator
|
||
# gets a clear failure pill + downscale recipe instead of a wedged
|
||
# device.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_low_ram_rejects_4k_with_resolution_message(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""A 4K HEVC upload on a low-RAM board (codec is in the supported
|
||
set, but resolution exceeds 1920×1080) is rejected with a
|
||
resolution-specific message and a recipe that includes the
|
||
downscale ``-vf scale=`` clause. Validates the gate fires *only*
|
||
on the resolution leg — codec is otherwise fine."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'arm64')
|
||
src = path.join(asset_dir, 'big.mp4')
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset(
|
||
'vid-4k-lowram',
|
||
src,
|
||
mimetype='video',
|
||
metadata={'upload_name': 'beach-4k.mp4'},
|
||
)
|
||
|
||
fake_summary = {
|
||
'container': 'mp4',
|
||
'video_codec': 'hevc',
|
||
'video_pixels': 3840 * 2160,
|
||
'video_width': 3840,
|
||
'video_height': 2160,
|
||
'video_fps': 30.0,
|
||
'audio_codec': 'aac',
|
||
'duration_seconds': 60,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
mock.patch(
|
||
'anthias_common.board.get_board_subtype', return_value='rockpi4'
|
||
),
|
||
mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=True
|
||
),
|
||
pytest.raises(processing.UnsupportedVideoCodecError) as excinfo,
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
msg = str(excinfo.value)
|
||
assert '3840x2160' in msg
|
||
assert '1080p' in msg
|
||
assert '1.5 GiB' in msg
|
||
# Recipe carries the downscale clause + a board-appropriate
|
||
# codec. rockpi4 supports H.264 so libx264 is preferred.
|
||
recipe = excinfo.value.recipe
|
||
assert '-vf scale=1920:1080:force_original_aspect_ratio=decrease' in recipe
|
||
assert 'libx264' in recipe
|
||
# Metadata still captured (operator sees what they uploaded).
|
||
asset.refresh_from_db()
|
||
assert asset.metadata.get('video_width') == 3840
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_low_ram_accepts_1080p(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""1080p HEVC on the same low-RAM board passes the gate — exactly
|
||
the boundary the on-device measurement said survives."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'arm64')
|
||
src = path.join(asset_dir, 'hd.mp4')
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset('vid-1080-lowram', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'mp4',
|
||
'video_codec': 'hevc',
|
||
'video_pixels': 1920 * 1080,
|
||
'video_width': 1920,
|
||
'video_height': 1080,
|
||
'video_fps': 30.0,
|
||
'audio_codec': 'aac',
|
||
'duration_seconds': 15,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
mock.patch(
|
||
'anthias_common.board.get_board_subtype', return_value='rockpi4'
|
||
),
|
||
mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=True
|
||
),
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.metadata.get('video_codec') == 'hevc'
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_high_ram_accepts_4k(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""A 4K upload on a high-RAM board (Pi 5, Pi 4 4GB) is NOT gated
|
||
by resolution — the low-RAM cap only fires when MemTotal sits
|
||
below the threshold."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'pi5')
|
||
src = path.join(asset_dir, 'big.mp4')
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset('vid-4k-pi5', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'mp4',
|
||
'video_codec': 'hevc',
|
||
'video_pixels': 3840 * 2160,
|
||
'video_width': 3840,
|
||
'video_height': 2160,
|
||
'video_fps': 30.0,
|
||
'audio_codec': 'aac',
|
||
'duration_seconds': 60,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
# High-RAM device — gate inactive.
|
||
mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=False
|
||
),
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert asset.metadata.get('video_width') == 3840
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_video_low_ram_unsupported_codec_recipe_includes_downscale(
|
||
asset_dir: str, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
"""When the upload fails BOTH the codec gate and the resolution
|
||
gate (e.g. 4K MPEG-2 on Rock Pi 4 1GB), the codec message wins
|
||
(codec is the strictly stronger rejection — operator has to
|
||
re-encode either way) but the recipe folds in the downscale so
|
||
the operator's re-encode also lands within the 1080p envelope.
|
||
Saves them an iteration of "re-encoded the codec, now it fails
|
||
the resolution gate too"."""
|
||
monkeypatch.setenv('DEVICE_TYPE', 'arm64')
|
||
src = path.join(asset_dir, 'old.mpg')
|
||
with open(src, 'wb') as f:
|
||
f.write(b'\x00')
|
||
asset = _make_processing_asset('vid-mpeg2-4k', src, mimetype='video')
|
||
|
||
fake_summary = {
|
||
'container': 'mpeg',
|
||
'video_codec': 'mpeg2video', # Not in rockpi4's HW set.
|
||
'video_pixels': 3840 * 2160,
|
||
'video_width': 3840,
|
||
'video_height': 2160,
|
||
'video_fps': 30.0,
|
||
'audio_codec': 'mp2',
|
||
'duration_seconds': 60,
|
||
}
|
||
with (
|
||
mock.patch.object(processing, '_notify'),
|
||
mock.patch.object(
|
||
processing, '_ffprobe_summary', return_value=fake_summary
|
||
),
|
||
mock.patch(
|
||
'anthias_common.board.get_board_subtype', return_value='rockpi4'
|
||
),
|
||
mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=True
|
||
),
|
||
pytest.raises(processing.UnsupportedVideoCodecError) as excinfo,
|
||
):
|
||
processing._run_video_normalisation(asset)
|
||
|
||
msg = str(excinfo.value)
|
||
# Codec message wins (strictly stronger rejection).
|
||
assert 'codec' in msg.lower()
|
||
assert "'mpeg2video'" in msg
|
||
# Recipe folds in the downscale so a single re-encode satisfies
|
||
# both gates.
|
||
recipe = excinfo.value.recipe
|
||
assert '-vf scale=1920:1080:force_original_aspect_ratio=decrease' in recipe
|
||
|
||
|
||
def test_ffmpeg_recipe_omits_scale_clause_by_default() -> None:
|
||
"""The codec-only rejection path passes ``cap_to_1080p=False``
|
||
(the default). Recipe must NOT include a scale clause then — we
|
||
don't want to suggest a needless downscale when an HD codec
|
||
swap is all that's wanted."""
|
||
recipe = processing._ffmpeg_reencode_recipe(frozenset({'h264'}), 'foo.mkv')
|
||
assert '-vf scale' not in recipe
|
||
assert '-c:v libx264' in recipe
|
||
|
||
|
||
def test_ffmpeg_recipe_includes_scale_clause_when_capping() -> None:
|
||
"""``cap_to_1080p=True`` injects the
|
||
``-vf scale=1920:1080:force_original_aspect_ratio=decrease`` clause
|
||
between the input and the codec arguments, so the operator's
|
||
re-encode lands inside the 1920×1080 envelope. The
|
||
``force_original_aspect_ratio=decrease`` keeps the aspect ratio
|
||
intact for landscape, ultrawide, and portrait sources alike."""
|
||
recipe = processing._ffmpeg_reencode_recipe(
|
||
frozenset({'h264'}), 'foo.mkv', cap_to_1080p=True
|
||
)
|
||
assert '-vf scale=1920:1080:force_original_aspect_ratio=decrease' in recipe
|
||
# Order matters — scale must be before the encoder so the encoder
|
||
# sees the downscaled frames.
|
||
scale_idx = recipe.index('-vf')
|
||
encoder_idx = recipe.index('-c:v')
|
||
assert scale_idx < encoder_idx
|
||
|
||
|
||
def test_exceeds_low_ram_pixel_cap_unknown_dims_returns_false() -> None:
|
||
"""ffprobe-failed uploads (``video_width=None``) skip the
|
||
resolution gate — the codec gate already collapsed to ``unknown``
|
||
and will reject them; double-rejecting on dimensions we can't
|
||
measure would just be noise."""
|
||
with mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=True
|
||
):
|
||
assert processing._exceeds_low_ram_pixel_cap(None, None) is False
|
||
assert processing._exceeds_low_ram_pixel_cap(0, 0) is False
|
||
assert processing._exceeds_low_ram_pixel_cap(1920, None) is False
|
||
|
||
|
||
def test_exceeds_low_ram_pixel_cap_high_ram_returns_false() -> None:
|
||
"""High-RAM devices never hit the cap, even for >1080p sources."""
|
||
with mock.patch(
|
||
'anthias_server.processing.is_low_ram_device', return_value=False
|
||
):
|
||
assert processing._exceeds_low_ram_pixel_cap(3840, 2160) is False
|
||
assert processing._exceeds_low_ram_pixel_cap(1920, 1080) is False
|
||
|
||
|
||
def test_format_subprocess_stderr_decodes_and_trims() -> None:
|
||
"""``_format_subprocess_stderr`` must produce operator-readable
|
||
text: bytes decoded as UTF-8 (with replacement for malformed
|
||
bytes), no ``b'...'`` wrapper, very long output trimmed to its
|
||
tail (where ffmpeg's actual diagnostic lives)."""
|
||
# 1) Plain bytes decode cleanly.
|
||
exc = sh.ErrorReturnCode_1(
|
||
full_cmd='ffmpeg ...',
|
||
stdout=b'',
|
||
stderr=b'Invalid data found in input\n',
|
||
truncate=False,
|
||
)
|
||
out = processing._format_subprocess_stderr(exc)
|
||
assert out == 'Invalid data found in input'
|
||
assert "b'" not in out
|
||
|
||
# 2) Malformed UTF-8 doesn't crash — replacement char is fine.
|
||
exc = sh.ErrorReturnCode_1(
|
||
full_cmd='ffmpeg ...',
|
||
stdout=b'',
|
||
stderr=b'broken\xff byte',
|
||
truncate=False,
|
||
)
|
||
out = processing._format_subprocess_stderr(exc)
|
||
assert 'broken' in out and 'byte' in out
|
||
|
||
# 3) Long stderr is tail-trimmed with an ellipsis prefix so the
|
||
# operator sees the diagnostic, not 4 KB of build-info preamble.
|
||
long_tail = 'final-error-line'
|
||
big = b'x' * 2000 + long_tail.encode()
|
||
exc = sh.ErrorReturnCode_1(
|
||
full_cmd='ffmpeg ...',
|
||
stdout=b'',
|
||
stderr=big,
|
||
truncate=False,
|
||
)
|
||
out = processing._format_subprocess_stderr(exc)
|
||
assert out.startswith('…')
|
||
assert long_tail in out
|
||
assert len(out) <= processing._STDERR_TAIL_BYTES + 1
|
||
|
||
# 4) Empty stderr returns the empty string, not "b''".
|
||
exc = sh.ErrorReturnCode_1(
|
||
full_cmd='ffmpeg ...',
|
||
stdout=b'',
|
||
stderr=b'',
|
||
truncate=False,
|
||
)
|
||
assert processing._format_subprocess_stderr(exc) == ''
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ffprobe summary parsing — tested independently of the runner
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_ffprobe_summary_handles_missing_streams() -> None:
|
||
"""A probe response missing audio/video streams is reported as
|
||
'none' (audio absent — passthrough OK if the rest matches) or
|
||
'unknown' (video absent — never passthrough). Defends the
|
||
passthrough decision against ffprobe schema drift."""
|
||
fake_probe_payload = {
|
||
'format': {},
|
||
'streams': [
|
||
# No video stream in this payload — the file is audio-only
|
||
# for the purpose of this test (a podcast-style .m4a was
|
||
# mis-routed to the video pipeline).
|
||
{'codec_type': 'audio', 'codec_name': 'aac'},
|
||
],
|
||
}
|
||
with mock.patch.object(
|
||
processing, '_ffprobe_streams', return_value=fake_probe_payload
|
||
):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary['audio_codec'] == 'aac'
|
||
assert summary['video_codec'] == 'unknown'
|
||
|
||
|
||
def test_ffprobe_summary_handles_no_audio_track() -> None:
|
||
fake = {
|
||
'format': {},
|
||
'streams': [
|
||
{'codec_type': 'video', 'codec_name': 'h264'},
|
||
],
|
||
}
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary['video_codec'] == 'h264'
|
||
assert summary['audio_codec'] == 'none'
|
||
|
||
|
||
@pytest.mark.parametrize(
|
||
('r_frame_rate', 'expected_fps'),
|
||
[
|
||
# Integer rates land cleanly.
|
||
('30/1', 30.0),
|
||
('60/1', 60.0),
|
||
('25/1', 25.0),
|
||
# NTSC drop-frame: 30000/1001 ≈ 29.97.
|
||
('30000/1001', 29.97002997002997),
|
||
# 60000/1001 ≈ 59.94 (NTSC 60).
|
||
('60000/1001', 59.94005994005994),
|
||
# Garbage values collapse to None so the envelope cap
|
||
# treats the source as "we can't tell" and skips the fps
|
||
# gate — codec / resolution gates still fire.
|
||
('bogus', None),
|
||
('60', None), # no slash → no rational, drop to None
|
||
('0/0', None), # denominator 0 → no fps
|
||
],
|
||
)
|
||
def test_ffprobe_summary_parses_video_fps(
|
||
r_frame_rate: str, expected_fps: float | None
|
||
) -> None:
|
||
"""``video_fps`` is the average frame rate parsed from
|
||
ffprobe's ``r_frame_rate`` rational. The envelope transcode
|
||
uses it to decide when to emit ``-r envelope.max_fps`` — only
|
||
when source fps > cap. Garbage / zero-denominator → ``None``."""
|
||
fake = {
|
||
'format': {},
|
||
'streams': [
|
||
{
|
||
'codec_type': 'video',
|
||
'codec_name': 'h264',
|
||
'r_frame_rate': r_frame_rate,
|
||
},
|
||
],
|
||
}
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
if expected_fps is None:
|
||
assert summary['video_fps'] is None
|
||
else:
|
||
assert summary['video_fps'] == pytest.approx(expected_fps)
|
||
|
||
|
||
def test_ffprobe_summary_prefers_extension_match_in_synonym_list() -> None:
|
||
"""ffprobe's ``format_name`` for the QuickTime family is a
|
||
synonym list (e.g. ``mov,mp4,m4a,3gp,3g2,mj2``). Operator-facing
|
||
metadata should match what the operator uploaded: an ``.mp4``
|
||
file surfaces as ``mp4`` (not ``mov``, the first token), and an
|
||
``.m4v`` file surfaces as ``m4v`` if ffprobe includes it. Falls
|
||
back to the first ffprobe token only when the extension doesn't
|
||
appear in the list at all (extension-less URI / genuinely exotic
|
||
container)."""
|
||
mp4_format_name = 'mov,mp4,m4a,3gp,3g2,mj2'
|
||
fake = {
|
||
'format': {'format_name': mp4_format_name},
|
||
'streams': [{'codec_type': 'video', 'codec_name': 'h264'}],
|
||
}
|
||
# .mp4 → operator sees 'mp4', not 'mov'.
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary['container'] == 'mp4'
|
||
|
||
# .m4a → 'm4a' (also in the synonym list).
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.m4a')
|
||
assert summary['container'] == 'm4a'
|
||
|
||
# No extension match (.bin hides mp4 bytes) → first ffprobe
|
||
# token wins so the metadata still reflects something concrete.
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.bin')
|
||
assert summary['container'] == 'mov'
|
||
|
||
# Made-up format name with a misleading filename extension →
|
||
# reported verbatim, no extension fallback (the file genuinely
|
||
# isn't mp4 despite the name).
|
||
fake = {
|
||
'format': {'format_name': 'unsupported_format'},
|
||
'streams': [{'codec_type': 'video', 'codec_name': 'h264'}],
|
||
}
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary['container'] == 'unsupported_format'
|
||
|
||
|
||
def test_ffprobe_summary_falls_back_to_extension_when_format_missing() -> None:
|
||
"""When ffprobe doesn't populate ``format.format_name`` (older
|
||
ffprobe builds, malformed input), fall back to the filename
|
||
extension so we still get a deterministic answer rather than
|
||
raising."""
|
||
fake: dict[str, Any] = {'format': {}, 'streams': []}
|
||
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
|
||
summary = processing._ffprobe_summary('fixture.mkv')
|
||
assert summary['container'] == 'mkv'
|
||
|
||
|
||
def test_ffprobe_summary_handles_probe_failure() -> None:
|
||
"""Probe errors (corrupt file, ffprobe missing) must not crash
|
||
the task — they downgrade to 'unknown' so the caller falls
|
||
through to transcode."""
|
||
with mock.patch.object(
|
||
processing,
|
||
'_ffprobe_streams',
|
||
side_effect=sh.ErrorReturnCode_1(
|
||
full_cmd='ffprobe ...',
|
||
stdout=b'',
|
||
stderr=b'invalid',
|
||
truncate=False,
|
||
),
|
||
):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary == {
|
||
'container': 'unknown',
|
||
'video_codec': 'unknown',
|
||
'video_pixels': None,
|
||
'video_width': None,
|
||
'video_height': None,
|
||
'video_fps': None,
|
||
'audio_codec': 'unknown',
|
||
'duration_seconds': None,
|
||
}
|
||
|
||
|
||
def test_ffprobe_summary_extracts_duration_from_probe_payload() -> None:
|
||
"""The runner reuses ``summary['duration_seconds']`` on the
|
||
passthrough path so we don't shell ffprobe twice. Confirm the
|
||
helper extracts ``format.duration`` correctly: floors to 1s
|
||
(sub-second clips can't slot a 0s rotation entry), returns
|
||
None when missing or unparseable."""
|
||
payload: dict[str, Any] = {
|
||
'format': {
|
||
'format_name': 'mov,mp4,m4a,3gp,3g2,mj2',
|
||
'duration': '12.7',
|
||
},
|
||
'streams': [{'codec_type': 'video', 'codec_name': 'h264'}],
|
||
}
|
||
with mock.patch.object(
|
||
processing, '_ffprobe_streams', return_value=payload
|
||
):
|
||
summary = processing._ffprobe_summary('clip.mp4')
|
||
assert summary['duration_seconds'] == 12
|
||
|
||
# Sub-second clip floors to 1 (matches the YouTube-task rule).
|
||
payload['format']['duration'] = '0.4'
|
||
with mock.patch.object(
|
||
processing, '_ffprobe_streams', return_value=payload
|
||
):
|
||
summary = processing._ffprobe_summary('clip.mp4')
|
||
assert summary['duration_seconds'] == 1
|
||
|
||
# Missing duration → None.
|
||
del payload['format']['duration']
|
||
with mock.patch.object(
|
||
processing, '_ffprobe_streams', return_value=payload
|
||
):
|
||
summary = processing._ffprobe_summary('clip.mp4')
|
||
assert summary['duration_seconds'] is None
|
||
|
||
# Unparseable string → None (rather than crashing the task).
|
||
payload['format']['duration'] = 'N/A'
|
||
with mock.patch.object(
|
||
processing, '_ffprobe_streams', return_value=payload
|
||
):
|
||
summary = processing._ffprobe_summary('clip.mp4')
|
||
assert summary['duration_seconds'] is None
|
||
|
||
|
||
def test_format_subprocess_stderr_byte_trim_handles_multibyte_utf8() -> None:
|
||
"""The trim is documented as a byte limit; multibyte characters
|
||
in the keep window must not push the decoded string over the
|
||
limit. Edge case: trimming mid-multibyte produces a replacement
|
||
character rather than a UnicodeDecodeError."""
|
||
tail = '— ffmpeg final error: invalid bitstream — '
|
||
big = b'x' * 2000 + tail.encode('utf-8')
|
||
exc = sh.ErrorReturnCode_1(
|
||
full_cmd='ffmpeg ...',
|
||
stdout=b'',
|
||
stderr=big,
|
||
truncate=False,
|
||
)
|
||
out = processing._format_subprocess_stderr(exc)
|
||
# The tail end (which has the diagnostic) is preserved.
|
||
assert 'invalid bitstream' in out
|
||
# The decoded string never exceeds the byte budget plus a small
|
||
# margin for the leading ellipsis character.
|
||
assert len(out.encode('utf-8')) <= processing._STDERR_TAIL_BYTES + 4
|
||
|
||
|
||
def test_ffprobe_summary_handles_missing_ffprobe_binary() -> None:
|
||
"""A stripped-down container or dev box without ffprobe in PATH
|
||
must not crash the normalisation task. ``sh.CommandNotFound``
|
||
is raised before any subprocess starts; the helper collapses
|
||
it to the same all-'unknown' summary as a probe-runtime error,
|
||
so the caller falls through to the transcode branch (which
|
||
will then itself fail clean if ffmpeg is also missing)."""
|
||
with mock.patch.object(
|
||
processing,
|
||
'_ffprobe_streams',
|
||
side_effect=sh.CommandNotFound('ffprobe not on PATH'),
|
||
):
|
||
summary = processing._ffprobe_summary('fixture.mp4')
|
||
assert summary == {
|
||
'container': 'unknown',
|
||
'video_codec': 'unknown',
|
||
'video_pixels': None,
|
||
'video_width': None,
|
||
'video_height': None,
|
||
'video_fps': None,
|
||
'audio_codec': 'unknown',
|
||
'duration_seconds': None,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Celery wrapper tests — task-level behaviour (no_op guards, on_failure)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_image_asset_celery_no_op_when_row_finalized(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""The celery task body must short-circuit on a row that's
|
||
already cleared is_processing — duplicate dispatch can't
|
||
re-convert and stomp on operator-edited state."""
|
||
Asset.objects.create(
|
||
asset_id='img-final',
|
||
name='img-final',
|
||
uri=path.join(asset_dir, 'fixture.webp'),
|
||
mimetype='image',
|
||
duration=10,
|
||
is_processing=False,
|
||
)
|
||
from anthias_server.celery_tasks import normalize_image_asset
|
||
|
||
with mock.patch.object(processing, '_run_image_normalisation') as run:
|
||
normalize_image_asset('img-final')
|
||
run.assert_not_called()
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_video_asset_celery_no_op_when_row_finalized(
|
||
asset_dir: str,
|
||
) -> None:
|
||
Asset.objects.create(
|
||
asset_id='vid-final',
|
||
name='vid-final',
|
||
uri=path.join(asset_dir, 'fixture.mp4'),
|
||
mimetype='video',
|
||
duration=10,
|
||
is_processing=False,
|
||
)
|
||
from anthias_server.celery_tasks import normalize_video_asset
|
||
|
||
with mock.patch.object(processing, '_run_video_normalisation') as run:
|
||
normalize_video_asset('vid-final')
|
||
run.assert_not_called()
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_image_asset_celery_no_op_when_row_missing() -> None:
|
||
from anthias_server.celery_tasks import normalize_image_asset
|
||
|
||
with mock.patch.object(processing, '_run_image_normalisation') as run:
|
||
normalize_image_asset('does-not-exist')
|
||
run.assert_not_called()
|
||
|
||
|
||
def test_normalize_tasks_exclude_permanent_oserrors_from_autoretry() -> None:
|
||
"""Both normalisation tasks have ``autoretry_for=(OSError,)`` so a
|
||
transient disk hiccup is retried automatically. Several OSError
|
||
subclasses are permanent and must be excluded so they land on
|
||
on_failure immediately:
|
||
|
||
* ``FileNotFoundError`` — source file disappeared between row
|
||
creation and pickup.
|
||
* ``UnidentifiedImageError`` (image task only) — Pillow refused
|
||
to decode the file. Inherits from OSError, so without the
|
||
explicit exclusion the autoretry filter would sweep it up.
|
||
|
||
Confirms the exclusions are in effect at celery-config time so a
|
||
future change to the decorators can't silently regress the
|
||
immediate-fail contract.
|
||
"""
|
||
from anthias_server.celery_tasks import (
|
||
normalize_image_asset,
|
||
normalize_video_asset,
|
||
)
|
||
|
||
for task in (normalize_image_asset, normalize_video_asset):
|
||
# ``autoretry_for`` and ``dont_autoretry_for`` are read off
|
||
# the celery Task instance via the per-task options dict that
|
||
# ``add_autoretry_behaviour`` populates at registration. They
|
||
# are not declared as class attributes on the Task type, so
|
||
# mypy needs a getattr to see them; the ``celery-types``
|
||
# stubs we use don't model these dynamic options.
|
||
autoretry_for = tuple(getattr(task, 'autoretry_for', ()))
|
||
dont_autoretry_for = tuple(getattr(task, 'dont_autoretry_for', ()))
|
||
assert OSError in autoretry_for, (
|
||
f'{task.name} expected autoretry_for=(OSError,)'
|
||
)
|
||
assert FileNotFoundError in dont_autoretry_for, (
|
||
f'{task.name} expected dont_autoretry_for to include '
|
||
f'FileNotFoundError so missing-source raises immediately'
|
||
)
|
||
|
||
# Image task additionally excludes UnidentifiedImageError (Pillow
|
||
# only — video has no Pillow path).
|
||
assert UnidentifiedImageError in tuple(
|
||
getattr(normalize_image_asset, 'dont_autoretry_for', ())
|
||
), (
|
||
'normalize_image_asset expected dont_autoretry_for to include '
|
||
'UnidentifiedImageError so corrupt-image raises immediately '
|
||
'(it inherits from OSError)'
|
||
)
|
||
|
||
|
||
def test_video_task_declares_codec_rejection_as_expected() -> None:
|
||
"""The codec/resolution gate raises ``UnsupportedVideoCodecError``
|
||
as a deliberate, operator-facing rejection — not a fault. The video
|
||
task must list it in ``throws`` so Celery logs it at INFO without a
|
||
traceback and sentry-sdk's CeleryIntegration skips it (it returns
|
||
early on ``isinstance(exc, task.throws)``), keeping the gate from
|
||
flooding Sentry. The image task never raises it, so it must not be
|
||
swept into the image task's ``throws``.
|
||
"""
|
||
from anthias_server.celery_tasks import (
|
||
normalize_image_asset,
|
||
normalize_video_asset,
|
||
)
|
||
|
||
assert processing.UnsupportedVideoCodecError in tuple(
|
||
getattr(normalize_video_asset, 'throws', ())
|
||
), (
|
||
'normalize_video_asset expected throws to include '
|
||
'UnsupportedVideoCodecError so the by-design codec rejection '
|
||
'is not reported to Sentry'
|
||
)
|
||
assert processing.UnsupportedVideoCodecError not in tuple(
|
||
getattr(normalize_image_asset, 'throws', ())
|
||
)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_on_failure_writes_error_metadata(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""The custom Task.on_failure path must persist the error message
|
||
and clear is_processing — operator must never see a row stuck
|
||
forever in 'Processing' after a crash."""
|
||
asset = _make_processing_asset(
|
||
'img-onfail',
|
||
path.join(asset_dir, 'fixture.tiff'),
|
||
mimetype='image',
|
||
metadata={'original_ext': '.tiff'},
|
||
)
|
||
task = processing._NormalizeAssetTask()
|
||
|
||
# Args[0] is the asset_id: matches the celery task signature.
|
||
with mock.patch.object(processing, '_notify') as notify:
|
||
task.on_failure(
|
||
UnidentifiedImageError('cannot decode'),
|
||
'task-id',
|
||
(asset.asset_id,),
|
||
{},
|
||
None,
|
||
)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
assert (
|
||
'cannot decode' in asset.metadata['error_message']
|
||
and 'UnidentifiedImageError' in asset.metadata['error_message']
|
||
)
|
||
# Earlier metadata keys are preserved.
|
||
assert asset.metadata['original_ext'] == '.tiff'
|
||
notify.assert_called_once_with(asset.asset_id)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_on_failure_no_args_is_safe() -> None:
|
||
"""on_failure called with empty args (e.g. a queueing crash
|
||
before the task body ran) must not raise."""
|
||
task = processing._NormalizeAssetTask()
|
||
# Should not raise.
|
||
task.on_failure(RuntimeError('boom'), 'task-id', (), {}, None)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_on_failure_unsupported_codec_persists_recipe(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""``UnsupportedVideoCodecError`` is the gate's user-facing
|
||
exception. on_failure must:
|
||
|
||
* write the bare message into ``metadata.error_message`` (no
|
||
``UnsupportedVideoCodecError:`` class-name prefix — that's the
|
||
P1 review finding), and
|
||
* mirror the exception's ``recipe`` attribute into
|
||
``metadata.error_recipe`` so the Edit modal can render it in a
|
||
copyable ``<code>`` block.
|
||
"""
|
||
asset = _make_processing_asset(
|
||
'vid-onfail',
|
||
path.join(asset_dir, 'fixture.mpg'),
|
||
mimetype='video',
|
||
)
|
||
task = processing._NormalizeAssetTask()
|
||
exc = processing.UnsupportedVideoCodecError(
|
||
"Video codec 'mpeg2video' is not hardware-decoded on this "
|
||
'device. Supported: h264, hevc.',
|
||
recipe="ffmpeg -i 'fixture.mpg' -c:v libx264 'fixture.mp4'",
|
||
)
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
task.on_failure(exc, 'task-id', (asset.asset_id,), {}, None)
|
||
|
||
asset.refresh_from_db()
|
||
assert asset.is_processing is False
|
||
msg = asset.metadata['error_message']
|
||
assert 'mpeg2video' in msg
|
||
assert 'UnsupportedVideoCodecError' not in msg
|
||
assert asset.metadata['error_recipe'] == (
|
||
"ffmpeg -i 'fixture.mpg' -c:v libx264 'fixture.mp4'"
|
||
)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_normalize_on_failure_clears_stale_error_recipe(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""A subsequent non-recipe failure must clear any stale
|
||
``error_recipe`` from a previous run, otherwise the modal would
|
||
show an outdated recipe alongside the new error message."""
|
||
asset = _make_processing_asset(
|
||
'img-clears',
|
||
path.join(asset_dir, 'fixture.tiff'),
|
||
mimetype='image',
|
||
metadata={
|
||
'error_recipe': "ffmpeg -i 'old.mpg' -c:v libx264 'old.mp4'",
|
||
},
|
||
)
|
||
task = processing._NormalizeAssetTask()
|
||
|
||
with mock.patch.object(processing, '_notify'):
|
||
task.on_failure(
|
||
UnidentifiedImageError('cannot decode'),
|
||
'task-id',
|
||
(asset.asset_id,),
|
||
{},
|
||
None,
|
||
)
|
||
|
||
asset.refresh_from_db()
|
||
assert 'error_recipe' not in asset.metadata
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helper / dispatch tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.mark.parametrize(
|
||
('filename', 'expected'),
|
||
[
|
||
# Every entry in NORMALIZE_IMAGE_EXTS routes through; case
|
||
# variants exercise the case-insensitive lstrip-and-lower
|
||
# path in ``_ext``.
|
||
('foo.heic', True),
|
||
('FOO.HEIC', True),
|
||
('foo.heif', True),
|
||
('foo.tif', True),
|
||
('foo.tiff', True),
|
||
('foo.bmp', True),
|
||
('foo.BMP', True),
|
||
('foo.ico', True),
|
||
('foo.tga', True),
|
||
('foo.jp2', True),
|
||
('foo.j2k', True),
|
||
('foo.jpx', True),
|
||
('foo.jpc', True),
|
||
('foo.jpf', True),
|
||
('foo.avif', True),
|
||
# Already-friendly formats stay untouched — no Celery hop.
|
||
('foo.jpg', False),
|
||
('foo.jpeg', False),
|
||
('foo.png', False),
|
||
('foo.webp', False),
|
||
('foo.gif', False),
|
||
('foo.svg', False),
|
||
# No extension and unknown extensions also fall through to
|
||
# the no-op branch.
|
||
('foo', False),
|
||
('foo.psd', False),
|
||
],
|
||
)
|
||
def test_needs_image_normalisation(filename: str, expected: bool) -> None:
|
||
assert processing.needs_image_normalisation(filename) is expected
|
||
|
||
|
||
def test_dispatch_normalize_image_invokes_celery_task() -> None:
|
||
# ``stamp_processing_start`` writes ``metadata.processing_started_at``
|
||
# for the reconciler — irrelevant to this delay-was-called check, so
|
||
# mocked out to keep the test off the DB.
|
||
with (
|
||
mock.patch(
|
||
'anthias_server.celery_tasks.normalize_image_asset.delay'
|
||
) as delay,
|
||
mock.patch('anthias_server.processing.stamp_processing_start'),
|
||
):
|
||
processing.dispatch_normalize_image('asset-1')
|
||
delay.assert_called_once_with('asset-1')
|
||
|
||
|
||
def test_dispatch_normalize_video_invokes_celery_task() -> None:
|
||
with (
|
||
mock.patch(
|
||
'anthias_server.celery_tasks.normalize_video_asset.delay'
|
||
) as delay,
|
||
mock.patch('anthias_server.processing.stamp_processing_start'),
|
||
):
|
||
processing.dispatch_normalize_video('asset-2')
|
||
delay.assert_called_once_with('asset-2')
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_dispatch_normalize_video_stamps_processing_started_at() -> None:
|
||
"""``dispatch_normalize_video`` writes
|
||
``metadata.processing_started_at`` so the periodic reconciler can
|
||
age the row. Regression guard for GH #2870's second fix line —
|
||
without the stamp, a stuck row has no signal the reconciler can
|
||
use to decide whether enough time has elapsed to re-dispatch."""
|
||
Asset.objects.create(
|
||
asset_id='stamped-vid',
|
||
name='Test',
|
||
uri='/data/anthias_assets/stamped-vid.mp4',
|
||
mimetype='video',
|
||
duration=10,
|
||
is_enabled=True,
|
||
is_processing=True,
|
||
metadata={'foo': 'bar'},
|
||
)
|
||
with mock.patch('anthias_server.celery_tasks.normalize_video_asset.delay'):
|
||
processing.dispatch_normalize_video('stamped-vid')
|
||
a = Asset.objects.get(asset_id='stamped-vid')
|
||
assert 'processing_started_at' in a.metadata
|
||
# Existing metadata keys preserved.
|
||
assert a.metadata['foo'] == 'bar'
|
||
# The stamp is a valid ISO-8601 string.
|
||
from datetime import datetime
|
||
|
||
datetime.fromisoformat(a.metadata['processing_started_at'])
|
||
|
||
|
||
class _FakeSerializer:
|
||
"""Stand-in for the four API serializer classes — the dispatch
|
||
helper only reads ``_pending_normalize``, so a minimal duck type
|
||
is enough to test the branch logic without spinning up DRF."""
|
||
|
||
def __init__(self, pending: str | None) -> None:
|
||
self._pending_normalize = pending
|
||
|
||
|
||
def test_dispatch_pending_normalize_routes_video() -> None:
|
||
with (
|
||
mock.patch('anthias_server.processing.dispatch_normalize_video') as v,
|
||
mock.patch('anthias_server.processing.dispatch_normalize_image') as i,
|
||
):
|
||
processing.dispatch_pending_normalize(
|
||
_FakeSerializer('video'), 'asset-vid'
|
||
)
|
||
v.assert_called_once_with('asset-vid')
|
||
i.assert_not_called()
|
||
|
||
|
||
def test_dispatch_pending_normalize_routes_image() -> None:
|
||
with (
|
||
mock.patch('anthias_server.processing.dispatch_normalize_video') as v,
|
||
mock.patch('anthias_server.processing.dispatch_normalize_image') as i,
|
||
):
|
||
processing.dispatch_pending_normalize(
|
||
_FakeSerializer('image'), 'asset-img'
|
||
)
|
||
i.assert_called_once_with('asset-img')
|
||
v.assert_not_called()
|
||
|
||
|
||
def test_dispatch_pending_normalize_noop_when_unset() -> None:
|
||
"""A serializer that didn't flag normalisation (most uploads:
|
||
JPEG/PNG/H.264) leaves the helper as a no-op — no spurious
|
||
Celery dispatch."""
|
||
with (
|
||
mock.patch('anthias_server.processing.dispatch_normalize_video') as v,
|
||
mock.patch('anthias_server.processing.dispatch_normalize_image') as i,
|
||
):
|
||
processing.dispatch_pending_normalize(
|
||
_FakeSerializer(None), 'asset-plain'
|
||
)
|
||
v.assert_not_called()
|
||
i.assert_not_called()
|
||
|
||
|
||
def test_dispatch_pending_normalize_handles_missing_attribute() -> None:
|
||
"""A serializer class that doesn't carry ``_pending_normalize``
|
||
at all must not crash — defends against a future serializer
|
||
that doesn't route through the shared mixin."""
|
||
|
||
class _Bare:
|
||
pass
|
||
|
||
with (
|
||
mock.patch('anthias_server.processing.dispatch_normalize_video') as v,
|
||
mock.patch('anthias_server.processing.dispatch_normalize_image') as i,
|
||
):
|
||
processing.dispatch_pending_normalize(_Bare(), 'asset-bare')
|
||
v.assert_not_called()
|
||
i.assert_not_called()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# notify() — smoke-test the publish/notify wiring without spinning up
|
||
# a real Channels stack.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_notify_swallows_publish_errors() -> None:
|
||
"""Redis flake during the viewer reload publish must not block
|
||
the browser-side notify (or vice-versa). Both are best-effort."""
|
||
fake_redis = mock.MagicMock()
|
||
fake_redis.publish.side_effect = RuntimeError('redis flake')
|
||
with (
|
||
mock.patch(
|
||
'anthias_common.utils.connect_to_redis', return_value=fake_redis
|
||
),
|
||
mock.patch(
|
||
'anthias_server.app.consumers.notify_asset_update'
|
||
) as notify,
|
||
):
|
||
processing._notify('asset-1')
|
||
notify.assert_called_once_with('asset-1')
|
||
|
||
|
||
def test_notify_swallows_notify_errors() -> None:
|
||
fake_redis = mock.MagicMock()
|
||
with (
|
||
mock.patch(
|
||
'anthias_common.utils.connect_to_redis', return_value=fake_redis
|
||
),
|
||
mock.patch(
|
||
'anthias_server.app.consumers.notify_asset_update',
|
||
side_effect=RuntimeError('channels flake'),
|
||
),
|
||
):
|
||
# Should not raise.
|
||
processing._notify('asset-1')
|
||
# The viewer-reload publish ran first and succeeded; the
|
||
# subsequent notify failure was caught.
|
||
fake_redis.publish.assert_called_once()
|
||
|
||
|
||
def test_notify_browser_only_skips_viewer_reload() -> None:
|
||
"""Intermediate hops that leave is_processing=True (e.g. the
|
||
YouTube task before chaining into normalize_video) call
|
||
``_notify(..., reload_viewer=False)`` so the on-device viewer
|
||
doesn't reload its playlist for a row that's still mid-flight.
|
||
The browser-side update still fires so the dashboard picks up
|
||
the new title/duration immediately."""
|
||
fake_redis = mock.MagicMock()
|
||
with (
|
||
mock.patch(
|
||
'anthias_common.utils.connect_to_redis', return_value=fake_redis
|
||
),
|
||
mock.patch(
|
||
'anthias_server.app.consumers.notify_asset_update'
|
||
) as browser_notify,
|
||
):
|
||
processing._notify('asset-1', reload_viewer=False)
|
||
# Viewer publish never happened.
|
||
fake_redis.publish.assert_not_called()
|
||
# Browser nudge still went out.
|
||
browser_notify.assert_called_once_with('asset-1')
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# JSON probe payload — ffprobe output parsing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_ffprobe_streams_parses_json() -> None:
|
||
payload = json.dumps(
|
||
{'streams': [{'codec_type': 'video', 'codec_name': 'h264'}]}
|
||
)
|
||
|
||
class _FakeBuf:
|
||
def __init__(self, body: str) -> None:
|
||
self._body = body
|
||
|
||
def __str__(self) -> str:
|
||
return self._body
|
||
|
||
with mock.patch.object(
|
||
sh, 'ffprobe', create=True, return_value=_FakeBuf(payload)
|
||
):
|
||
result = processing._ffprobe_streams('fixture.mp4')
|
||
assert result['streams'][0]['codec_name'] == 'h264'
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Static webp fixture for upload-path tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _stage_temp_upload(asset_dir: str, content: bytes) -> str:
|
||
"""Mirror ``FileAssetViewMixin.post`` byte-for-byte: write
|
||
``<assetdir>/<uuid>.tmp`` and return the path. Used by the
|
||
upload-path wiring tests to drive ``prepare_asset`` without a
|
||
real HTTP roundtrip."""
|
||
import uuid as _uuid
|
||
|
||
p = path.join(asset_dir, f'{_uuid.uuid4().hex}.tmp')
|
||
with open(p, 'wb') as fh:
|
||
fh.write(content)
|
||
return p
|
||
|
||
|
||
def _serialised_image_bytes() -> bytes:
|
||
"""Return a raw-PNG byte buffer — compatible with the URL
|
||
reachability probe (which only checks for a local file's
|
||
existence on schemeless paths)."""
|
||
buf = io.BytesIO()
|
||
Image.new('RGB', (4, 4), (10, 20, 30)).save(buf, 'PNG')
|
||
return buf.getvalue()
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_prepare_asset_routes_heic_through_image_pipeline(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""End-to-end-ish: simulate a HEIC upload and verify
|
||
prepare_asset stamps is_processing=True and stashes the pending
|
||
flag the view dispatches on."""
|
||
from datetime import datetime, timezone as _tz
|
||
|
||
from anthias_server.api.serializers.v2 import CreateAssetSerializerV2
|
||
|
||
upload_path = _stage_temp_upload(asset_dir, _serialised_image_bytes())
|
||
|
||
serializer = CreateAssetSerializerV2(
|
||
data={
|
||
'name': 'pic',
|
||
'uri': upload_path,
|
||
'ext': '.heic',
|
||
'mimetype': 'image',
|
||
'duration': 10,
|
||
'start_date': datetime(2026, 1, 1, tzinfo=_tz.utc),
|
||
'end_date': datetime(2030, 1, 1, tzinfo=_tz.utc),
|
||
'is_enabled': False,
|
||
},
|
||
unique_name=False,
|
||
)
|
||
with (
|
||
# We don't need an actual reachability probe for this test.
|
||
mock.patch(
|
||
'anthias_server.api.serializers.mixins.url_fails',
|
||
return_value=False,
|
||
),
|
||
):
|
||
assert serializer.is_valid(), serializer.errors
|
||
|
||
asset_dict = serializer.validated_data
|
||
assert asset_dict['is_processing'] is True
|
||
assert asset_dict['mimetype'] == 'image'
|
||
assert serializer._pending_normalize == 'image'
|
||
# The renamed file lives at <assetdir>/<asset_id>.heic
|
||
assert asset_dict['uri'].endswith('.heic')
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_prepare_asset_routes_video_through_video_pipeline(
|
||
asset_dir: str,
|
||
) -> None:
|
||
from datetime import datetime, timezone as _tz
|
||
|
||
from anthias_server.api.serializers.v2 import CreateAssetSerializerV2
|
||
|
||
upload_path = _stage_temp_upload(asset_dir, b'fake mp4 bytes ' * 16)
|
||
|
||
serializer = CreateAssetSerializerV2(
|
||
data={
|
||
'name': 'clip',
|
||
'uri': upload_path,
|
||
'ext': '.mp4',
|
||
'mimetype': 'video',
|
||
'duration': 0,
|
||
'start_date': datetime(2026, 1, 1, tzinfo=_tz.utc),
|
||
'end_date': datetime(2030, 1, 1, tzinfo=_tz.utc),
|
||
'is_enabled': False,
|
||
},
|
||
unique_name=False,
|
||
)
|
||
with (
|
||
mock.patch(
|
||
'anthias_server.api.serializers.mixins.url_fails',
|
||
return_value=False,
|
||
),
|
||
mock.patch(
|
||
'anthias_server.api.serializers.mixins.get_video_duration',
|
||
return_value=None,
|
||
),
|
||
):
|
||
assert serializer.is_valid(), serializer.errors
|
||
|
||
asset_dict = serializer.validated_data
|
||
assert asset_dict['is_processing'] is True
|
||
assert serializer._pending_normalize == 'video'
|
||
# Duration left at 0 — task fills it in on completion.
|
||
assert asset_dict['duration'] == 0
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_prepare_asset_skips_pipeline_for_remote_url(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""A webpage / RTSP / HTTP video URL must not get flagged for
|
||
normalisation — only locally-uploaded files do."""
|
||
from datetime import datetime, timezone as _tz
|
||
|
||
from anthias_server.api.serializers.v2 import CreateAssetSerializerV2
|
||
|
||
serializer = CreateAssetSerializerV2(
|
||
data={
|
||
'name': 'web',
|
||
'uri': 'https://example.com/page',
|
||
'mimetype': 'webpage',
|
||
'duration': 30,
|
||
'start_date': datetime(2026, 1, 1, tzinfo=_tz.utc),
|
||
'end_date': datetime(2030, 1, 1, tzinfo=_tz.utc),
|
||
'is_enabled': False,
|
||
},
|
||
unique_name=False,
|
||
)
|
||
with mock.patch(
|
||
'anthias_server.api.serializers.mixins.url_fails',
|
||
return_value=False,
|
||
):
|
||
assert serializer.is_valid(), serializer.errors
|
||
|
||
assert serializer._pending_normalize is None
|
||
assert serializer.validated_data.get('is_processing') in (False, 0, None)
|
||
|
||
|
||
@pytest.mark.django_db
|
||
def test_prepare_asset_skips_pipeline_for_jpeg_upload(
|
||
asset_dir: str,
|
||
) -> None:
|
||
"""Common JPEG / PNG / WebP uploads land ready-to-play — never
|
||
enqueue a normalisation task for them."""
|
||
from datetime import datetime, timezone as _tz
|
||
|
||
from anthias_server.api.serializers.v2 import CreateAssetSerializerV2
|
||
|
||
upload = _stage_temp_upload(asset_dir, _serialised_image_bytes())
|
||
serializer = CreateAssetSerializerV2(
|
||
data={
|
||
'name': 'photo',
|
||
'uri': upload,
|
||
'ext': '.jpg',
|
||
'mimetype': 'image',
|
||
'duration': 10,
|
||
'start_date': datetime(2026, 1, 1, tzinfo=_tz.utc),
|
||
'end_date': datetime(2030, 1, 1, tzinfo=_tz.utc),
|
||
'is_enabled': False,
|
||
},
|
||
unique_name=False,
|
||
)
|
||
with mock.patch(
|
||
'anthias_server.api.serializers.mixins.url_fails',
|
||
return_value=False,
|
||
):
|
||
assert serializer.is_valid(), serializer.errors
|
||
assert serializer._pending_normalize is None
|