feat(server): introduce PlaybackEnvelope dataclass + matrix + cache

Foundation for the per-board playback envelope rollout (see
/home/ubuntu/.claude/plans/serene-munching-gem.md). No behaviour
change yet — wires up the canonical source of truth that
processing.py, celery_tasks.py's future re-render walker, and the
viewer's hwdec dispatch will all read from in the next commit.

src/anthias_server/playback_envelope.py (new)
---------------------------------------------

Frozen dataclass ``PlaybackEnvelope`` carrying codec / max_width /
max_height / max_fps plus a fixed ``container_ext = 'mp4'``.
``ENVELOPE_BY_DEVICE_TYPE`` maps every supported board:

* pi2 / pi3 / arm64 → H.264 1920x1080 30 (no HEVC silicon /
  no upstream mpv HW path)
* pi4-64 / pi5 / x86 → HEVC 3840x2160 60 (dedicated HEVC block
  or VAAPI; fleet uniformity so the same upload produces
  bit-identical variants on every board)

``compute_envelope()`` resolves the current process's envelope
from DEVICE_TYPE; unset / unknown / mixed-case / whitespace all
fall back to the conservative default (H.264 1080p30).

``load_cached()`` / ``save_cached()`` round-trip the envelope to
``~/.anthias/playback-envelope.json``. Cache corruption (missing
file, bad JSON, unsupported codec) returns ``None`` so the caller
recomputes and overwrites — a hand-edit that breaks the file
self-heals on next start. ``save_cached`` writes atomically via
temp-file + rename.

src/anthias_server/processing.py
--------------------------------

``_ffprobe_summary`` now returns ``video_fps`` alongside the
existing keys. The next commit (Phase 2) uses this to decide
whether to emit ``-r envelope.max_fps`` — the cap is one-way, so
sub-cap source rates pass through unchanged. r_frame_rate is
parsed as a rational ``num/den``; unparseable / zero-denominator
collapses to ``None`` so the caller treats source fps as
"unknown" and skips the gate.

tests
-----

* tests/test_playback_envelope.py (new): matrix coverage; unset /
  unknown / cased / whitespace inputs; cache round-trip; missing
  / corrupt JSON / invalid-payload recovery; atomic write
  (no leaked .tmp); container_ext invariant.
* tests/test_processing.py: positive video_fps cases (integer
  rates, NTSC drop-frame 30000/1001 + 60000/1001, bogus / no-slash
  / zero-denominator inputs); the two ``assert summary == { ... }``
  ffprobe-recovery tests now include the new ``video_fps: None``
  key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Viktor Petersson
2026-05-14 06:22:06 +00:00
parent bc37d989f2
commit 0b6bea0c91
4 changed files with 513 additions and 0 deletions

View File

@@ -0,0 +1,213 @@
"""Per-board playback envelope.
The asset processor renders every video upload into a single
"playback variant" sized to the board's hardware envelope (codec +
max resolution + max fps). The viewer never sees anything outside
that envelope, which gives us:
* one codec per rotation → one mpv hwdec path per board, no
per-clip dispatch surprises;
* uniform output mode → cage's / `--vo=drm`'s output stays fixed
for the rotation's lifetime, no mid-clip mode flips;
* deterministic playback — drop counts depend on the display, not
on whichever happens-to-be-uploaded codec/resolution mix.
This module is the canonical source of truth for that envelope.
Three things consume it:
* `anthias_server.processing._run_video_normalisation` picks the
target codec + scale + fps cap for the ffmpeg invocation;
* the new `regenerate_for_envelope_change` celery task in
`anthias_server.celery_tasks` walks the catalog and re-renders
variants whose recorded `metadata['envelope']` no longer matches;
* `anthias_viewer.media_player._pi_hwdec_for_uri` still ffprobes
at launch time as a runtime safety net — but with every variant
on disk inside the envelope, the dispatch resolves the same way
every time.
V1 is board-driven (no display probing). A follow-up PR replaces
the body of `compute_envelope()` with a runtime resolver (probe
mpv hwdec list + /dev/video* + vainfo) returning the same
``PlaybackEnvelope`` type — call sites won't change.
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import asdict, dataclass
from os import path
from typing import Any
logger = logging.getLogger(__name__)
# Cached envelope JSON lives next to ``anthias.conf`` so an
# operator with shell access can inspect / hand-edit it (same place
# they edit the rest of Anthias's persistent state).
_CACHE_FILENAME = 'playback-envelope.json'
@dataclass(frozen=True)
class PlaybackEnvelope:
"""The codec + dimensions + framerate ceiling for a board.
Every playback variant on disk must match this envelope exactly
on codec, and be ``<= max_width`` / ``<= max_height`` /
``<= max_fps`` on the rest. ``container_ext`` is always mp4 —
keeping the container fixed simplifies the variant filename
convention (`<id>.mp4`) and matches what every Anthias-supported
player handles natively.
"""
codec: str # 'h264' or 'hevc'
max_width: int
max_height: int
max_fps: int
@property
def container_ext(self) -> str:
return 'mp4'
def as_dict(self) -> dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PlaybackEnvelope:
"""Reconstruct from a JSON-loaded dict.
Raises ``ValueError`` if any required key is missing or the
codec is outside the supported set. Cache corruption is
handled at the call site by treating a load failure as
"no cache" → trigger a fresh compute.
"""
try:
codec = str(data['codec']).lower()
max_width = int(data['max_width'])
max_height = int(data['max_height'])
max_fps = int(data['max_fps'])
except (KeyError, TypeError, ValueError) as exc:
raise ValueError(f'malformed envelope payload: {exc}') from exc
if codec not in ('h264', 'hevc'):
raise ValueError(f'unsupported envelope codec: {codec!r}')
return cls(codec, max_width, max_height, max_fps)
# Board -> envelope. Keys must match the ``DEVICE_TYPE`` env var the
# image builder writes into every Anthias-managed container, NOT
# whatever ``get_device_type()`` infers from /proc/device-tree/model
# at runtime. Build-time and transcode-time decisions therefore
# always agree on a Balena / dev workflow that runs an amd64 build
# on x86 hardware while still claiming a Pi target.
#
# Pi 4 / Pi 5 / x86 collapse to one HEVC 4Kp60 envelope on purpose:
# fleet uniformity means a single upload produces bit-identical
# variants on all three boards (cross-device sha256 stays equal),
# and we never have to handle a per-clip codec switch in rotation.
# Pi 4's H.264 V3D M2M path technically reaches 1080p60 at HW, but
# its CPU-mediated copy is slower in practice than the dedicated
# HEVC block (see the drop-rate data in the PR description); the
# trade is a one-time libx265 re-encode at upload buying steady-
# state HW decode forever.
#
# Pi 2 / Pi 3 and arm64 land on H.264 1080p30: legacy Pi has no
# HEVC silicon at all (V3D-IV is H.264-only) and arm64's Rockchip
# / Cedrus / Amlogic decoders are not reachable through upstream
# mpv. Conservative SW fallback.
ENVELOPE_BY_DEVICE_TYPE: dict[str, PlaybackEnvelope] = {
'pi2': PlaybackEnvelope('h264', 1920, 1080, 30),
'pi3': PlaybackEnvelope('h264', 1920, 1080, 30),
'pi4-64': PlaybackEnvelope('hevc', 3840, 2160, 60),
'pi5': PlaybackEnvelope('hevc', 3840, 2160, 60),
'x86': PlaybackEnvelope('hevc', 3840, 2160, 60),
'arm64': PlaybackEnvelope('h264', 1920, 1080, 30),
}
# Fallback when ``DEVICE_TYPE`` is unset (host dev shell,
# ``ENVIRONMENT=test``) or names a board we haven't profiled yet.
# H.264 1080p30 is the most-compatible choice — every player Anthias
# ships with handles it, and any board's CPU can software-decode it
# at real time.
_DEFAULT = PlaybackEnvelope('h264', 1920, 1080, 30)
def compute_envelope() -> PlaybackEnvelope:
"""Resolve the envelope for the current process.
Reads ``DEVICE_TYPE`` from the environment (the image builder
writes it in at build time) and returns the matching matrix
entry. Unknown or empty values resolve to ``_DEFAULT``.
"""
key = os.environ.get('DEVICE_TYPE', '').strip().lower()
return ENVELOPE_BY_DEVICE_TYPE.get(key, _DEFAULT)
def _cache_path() -> str:
"""Absolute location of the persisted envelope JSON.
Lives alongside ``anthias.conf`` in the user's config dir (the
canonical Anthias persistent-state directory; an operator with
shell access can inspect / hand-edit). Resolved via the
``AnthiasSettings`` singleton so we pick up the same ``$HOME``
every other persistent-state path uses.
"""
from anthias_server.settings import settings
return path.join(settings.get_configdir(), _CACHE_FILENAME)
def load_cached() -> PlaybackEnvelope | None:
"""Read the envelope from disk.
Returns ``None`` when:
* the cache file doesn't exist (first start ever),
* the JSON fails to parse, OR
* the payload doesn't validate via ``PlaybackEnvelope.from_dict``.
Cache corruption is treated as "no cache" by design: the caller
will compute fresh and overwrite, so a hand-edit that breaks the
file self-heals on next start. The corrupting state never
propagates into the walker.
"""
cache_path = _cache_path()
try:
with open(cache_path) as f:
data = json.load(f)
except FileNotFoundError:
return None
except (OSError, json.JSONDecodeError) as exc:
logger.warning(
'playback envelope cache at %s is unreadable (%s); '
'treating as missing — fresh compute will overwrite',
cache_path,
exc,
)
return None
try:
return PlaybackEnvelope.from_dict(data)
except ValueError as exc:
logger.warning(
'playback envelope cache at %s contains invalid payload '
'(%s); treating as missing — fresh compute will overwrite',
cache_path,
exc,
)
return None
def save_cached(envelope: PlaybackEnvelope) -> None:
"""Persist the envelope to disk atomically.
Writes via the standard temp-file-then-rename idiom so a crash
mid-write never leaves a half-written JSON file the next
``load_cached`` would have to discard. ``os.replace`` is atomic
on every filesystem Anthias supports.
"""
cache_path = _cache_path()
tmp_path = f'{cache_path}.tmp'
payload = json.dumps(envelope.as_dict(), indent=2, sort_keys=True)
with open(tmp_path, 'w') as f:
f.write(payload)
f.write('\n')
os.replace(tmp_path, cache_path)

View File

@@ -930,6 +930,12 @@ def _ffprobe_summary(input_path: str) -> dict[str, Any]:
pixel cap that keeps Pi 4 from passthrough-ing 4K H.264
(V3D's H.264 envelope tops out around 1080p60; 4K H.264
would clear the codec gate but fall to software at play).
* ``video_fps`` — the first video stream's average frame rate
as a float, or ``None`` if no video stream or
``r_frame_rate`` was unparseable. Used by the playback-
envelope transcode to decide whether to emit
``-r envelope.max_fps`` (only when source > cap; the cap
is one-way and never up-converts sub-cap content).
* ``audio_codec`` — lowercase codec name, ``'none'`` when the
file genuinely carries no audio stream, or ``'unknown'`` if
the audio stream existed but ffprobe couldn't name its
@@ -957,6 +963,7 @@ def _ffprobe_summary(input_path: str) -> dict[str, Any]:
'container': 'unknown',
'video_codec': 'unknown',
'video_pixels': None,
'video_fps': None,
'audio_codec': 'unknown',
'duration_seconds': None,
}
@@ -1004,6 +1011,22 @@ def _ffprobe_summary(input_path: str) -> dict[str, Any]:
except (TypeError, ValueError):
vw = vh = 0
video_pixels: int | None = vw * vh if vw > 0 and vh > 0 else None
# Average frame rate, used by the envelope transcode to decide
# whether to emit ``-r``. ffprobe writes ``r_frame_rate`` as a
# rational ``num/den`` string (e.g. ``30000/1001`` for NTSC,
# ``60/1`` for true 60 fps). Anything unparseable collapses to
# ``None`` so the caller treats it as "we can't tell" and skips
# the fps gate (the codec / resolution gates still fire).
video_fps: float | None = None
raw_fps = (video or {}).get('r_frame_rate')
if raw_fps and isinstance(raw_fps, str) and '/' in raw_fps:
num_str, _, den_str = raw_fps.partition('/')
try:
num, den = float(num_str), float(den_str)
if den > 0:
video_fps = num / den
except ValueError:
video_fps = None
if audio is None:
audio_codec = 'none'
else:
@@ -1027,6 +1050,7 @@ def _ffprobe_summary(input_path: str) -> dict[str, Any]:
'container': container,
'video_codec': video_codec,
'video_pixels': video_pixels,
'video_fps': video_fps,
'audio_codec': audio_codec,
'duration_seconds': duration_seconds,
}

View File

@@ -0,0 +1,230 @@
"""Tests for ``anthias_server.playback_envelope``.
The envelope is the canonical "what every video on disk must
look like" per device. The matrix is the source of truth three
other modules read from (`processing.py`, `celery_tasks.py`'s
walker, `media_player.py`'s safety-net dispatch), so the tests
here lock in:
* every supported board → an envelope that matches the
documented matrix;
* unknown / unset ``DEVICE_TYPE`` → the conservative default;
* cache round-trip: ``save_cached(e); load_cached() == e``;
* cache corruption (missing file, bad JSON, malformed payload) →
``load_cached() == None`` so the caller computes fresh and
overwrites.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pytest
from anthias_server.playback_envelope import (
ENVELOPE_BY_DEVICE_TYPE,
PlaybackEnvelope,
compute_envelope,
load_cached,
save_cached,
)
@pytest.fixture
def cache_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Redirect the envelope cache to a tmpdir.
The module reads ``settings.get_configdir()`` to find its
persistent state. The simplest, side-effect-free override is to
point ``$HOME`` at a tmp dir + create the ``.anthias`` subdir
inside it. ``AnthiasSettings`` re-reads ``$HOME`` on every
method call.
"""
home = tmp_path / 'home'
(home / '.anthias').mkdir(parents=True)
monkeypatch.setenv('HOME', str(home))
return home / '.anthias'
@pytest.mark.parametrize(
('device_type', 'codec', 'max_w', 'max_h', 'max_fps'),
[
# H.264 1080p30 boards (no HEVC silicon / no mpv HW path).
('pi2', 'h264', 1920, 1080, 30),
('pi3', 'h264', 1920, 1080, 30),
('arm64', 'h264', 1920, 1080, 30),
# HEVC 4Kp60 boards (dedicated HEVC block or VAAPI).
('pi4-64', 'hevc', 3840, 2160, 60),
('pi5', 'hevc', 3840, 2160, 60),
('x86', 'hevc', 3840, 2160, 60),
],
)
def test_envelope_matrix_per_device_type(
device_type: str,
codec: str,
max_w: int,
max_h: int,
max_fps: int,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Every documented board resolves to its envelope exactly.
The matrix is the canonical source of truth — a silent drift
here would split codec policy between the asset processor and
the viewer's hwdec dispatch (which is exactly the kind of bug
`bb27b186` was meant to close once and for all). Pin every row.
"""
monkeypatch.setenv('DEVICE_TYPE', device_type)
e = compute_envelope()
assert e == PlaybackEnvelope(codec, max_w, max_h, max_fps)
# Also verify the dict carries the same value the test pinned.
assert ENVELOPE_BY_DEVICE_TYPE[device_type] == e
def test_envelope_unset_device_type_default(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Unset env → safe H.264 1080p30 default.
The dev host (no DEVICE_TYPE) and `ENVIRONMENT=test` both hit
this path. H.264 1080p is the lowest common denominator any
Anthias-supported board can play in software.
"""
monkeypatch.delenv('DEVICE_TYPE', raising=False)
assert compute_envelope() == PlaybackEnvelope('h264', 1920, 1080, 30)
def test_envelope_unknown_device_type_falls_back(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""An unrecognised board name (typo, future board) falls back
to the conservative default rather than crashing — Anthias
should keep working, just at the safe codec, until the matrix
learns the new key."""
monkeypatch.setenv('DEVICE_TYPE', 'weird-future-board-2030')
assert compute_envelope() == PlaybackEnvelope('h264', 1920, 1080, 30)
def test_envelope_device_type_case_and_whitespace(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""``compute_envelope`` lowercases + strips so a stray newline
in the env file or a mixed-case board key doesn't fall through
to the default by accident. Matches the same normalisation the
legacy `_resolve_board_profile` did."""
monkeypatch.setenv('DEVICE_TYPE', ' PI5\n')
assert compute_envelope() == ENVELOPE_BY_DEVICE_TYPE['pi5']
def test_envelope_dataclass_round_trip() -> None:
"""``PlaybackEnvelope.from_dict(e.as_dict()) == e`` is the
foundation the JSON cache and the per-asset
``metadata['envelope']`` field rely on for equality
comparison."""
e = PlaybackEnvelope('hevc', 3840, 2160, 60)
assert PlaybackEnvelope.from_dict(e.as_dict()) == e
@pytest.mark.parametrize(
'payload',
[
# Missing required key.
{'codec': 'hevc', 'max_width': 3840, 'max_height': 2160},
# Non-integer dimension.
{
'codec': 'hevc',
'max_width': 'big',
'max_height': 2160,
'max_fps': 60,
},
# Unsupported codec (would silently route to wrong hwdec).
{'codec': 'vp9', 'max_width': 1920, 'max_height': 1080, 'max_fps': 30},
],
)
def test_envelope_from_dict_rejects_malformed(payload: dict[str, Any]) -> None:
"""A corrupt cache must not yield a half-valid envelope — the
caller treats ``ValueError`` as "no cache" and recomputes from
the matrix. This is how a hand-edit that breaks the JSON
self-heals on next start."""
with pytest.raises(ValueError):
PlaybackEnvelope.from_dict(payload)
def test_cache_round_trip(cache_dir: Path) -> None:
"""``save_cached(e); load_cached() == e`` for every envelope
shape we'd realistically write. The serialisation format is
pinned (codec / max_width / max_height / max_fps) so an
operator hand-editing the JSON sees a predictable schema."""
e = PlaybackEnvelope('hevc', 3840, 2160, 60)
save_cached(e)
assert load_cached() == e
written = json.loads((cache_dir / 'playback-envelope.json').read_text())
assert written == {
'codec': 'hevc',
'max_width': 3840,
'max_height': 2160,
'max_fps': 60,
}
def test_load_cached_returns_none_when_file_missing(cache_dir: Path) -> None:
"""First-ever start has no cache file. The caller computes
fresh and writes — we don't want ``load_cached`` to raise."""
assert load_cached() is None
def test_load_cached_returns_none_on_corrupt_json(cache_dir: Path) -> None:
"""A hand-edit that breaks JSON parsing self-heals: we treat
it as missing and the caller recomputes + overwrites. No
operator intervention needed beyond a server restart."""
(cache_dir / 'playback-envelope.json').write_text('this is not json {{{')
assert load_cached() is None
def test_load_cached_returns_none_on_invalid_payload(cache_dir: Path) -> None:
"""Same recovery contract for parseable JSON whose payload
doesn't validate (e.g. someone added ``codec: vp9`` — outside
the supported set). Better to drop a malformed envelope than
have the walker re-render the entire catalog against it."""
(cache_dir / 'playback-envelope.json').write_text(
json.dumps(
{
'codec': 'vp9',
'max_width': 1920,
'max_height': 1080,
'max_fps': 30,
}
)
)
assert load_cached() is None
def test_save_cached_atomic(cache_dir: Path) -> None:
"""``save_cached`` uses temp-file + rename so a crash mid-write
can't leave a half-written file. We can't directly observe the
rename, but we can verify the tmp file isn't left behind after
a successful save."""
save_cached(PlaybackEnvelope('hevc', 3840, 2160, 60))
leftover = list(cache_dir.glob('*.tmp'))
assert not leftover, f'staging file leaked: {leftover}'
def test_save_cached_overwrites_existing(cache_dir: Path) -> None:
"""An envelope change on Anthias upgrade rewrites the cache.
Verify the second write replaces the first (no append, no
leftover state)."""
save_cached(PlaybackEnvelope('h264', 1920, 1080, 30))
save_cached(PlaybackEnvelope('hevc', 3840, 2160, 60))
assert load_cached() == PlaybackEnvelope('hevc', 3840, 2160, 60)
def test_container_ext_is_mp4() -> None:
"""The container stays fixed across every envelope so the
variant-on-disk filename convention (`<id>.mp4`) never gains a
per-board exception. Every Anthias-supported player handles
mp4 natively, so this is a deliberate one-way constraint."""
for envelope in ENVELOPE_BY_DEVICE_TYPE.values():
assert envelope.container_ext == 'mp4'

View File

@@ -1087,6 +1087,50 @@ def test_ffprobe_summary_handles_no_audio_track() -> None:
assert summary['audio_codec'] == 'none'
@pytest.mark.parametrize(
('r_frame_rate', 'expected_fps'),
[
# Integer rates land cleanly.
('30/1', 30.0),
('60/1', 60.0),
('25/1', 25.0),
# NTSC drop-frame: 30000/1001 ≈ 29.97.
('30000/1001', 29.97002997002997),
# 60000/1001 ≈ 59.94 (NTSC 60).
('60000/1001', 59.94005994005994),
# Garbage values collapse to None so the envelope cap
# treats the source as "we can't tell" and skips the fps
# gate — codec / resolution gates still fire.
('bogus', None),
('60', None), # no slash → no rational, drop to None
('0/0', None), # denominator 0 → no fps
],
)
def test_ffprobe_summary_parses_video_fps(
r_frame_rate: str, expected_fps: float | None
) -> None:
"""``video_fps`` is the average frame rate parsed from
ffprobe's ``r_frame_rate`` rational. The envelope transcode
uses it to decide when to emit ``-r envelope.max_fps`` — only
when source fps > cap. Garbage / zero-denominator → ``None``."""
fake = {
'format': {},
'streams': [
{
'codec_type': 'video',
'codec_name': 'h264',
'r_frame_rate': r_frame_rate,
},
],
}
with mock.patch.object(processing, '_ffprobe_streams', return_value=fake):
summary = processing._ffprobe_summary('fixture.mp4')
if expected_fps is None:
assert summary['video_fps'] is None
else:
assert summary['video_fps'] == pytest.approx(expected_fps)
def test_ffprobe_summary_prefers_format_name_over_filename_extension() -> None:
"""Defensive: ffprobe-reported ``format.format_name`` beats the
filename. A ``.bin`` file that's actually an MP4 must classify
@@ -1204,6 +1248,7 @@ def test_ffprobe_summary_handles_probe_failure() -> None:
'container': 'unknown',
'video_codec': 'unknown',
'video_pixels': None,
'video_fps': None,
'audio_codec': 'unknown',
'duration_seconds': None,
}
@@ -1345,6 +1390,7 @@ def test_ffprobe_summary_handles_missing_ffprobe_binary() -> None:
'container': 'unknown',
'video_codec': 'unknown',
'video_pixels': None,
'video_fps': None,
'audio_codec': 'unknown',
'duration_seconds': None,
}