perf(celery,viewer): four hardening fixes so the player survives an upgrade

Live testing on Pi 4 / Pi 5 / Rock Pi 4 surfaced four scenarios
where a single ``docker compose pull && up -d`` (or any upgrade
that invalidates the playback envelope) wedges the device. These
aren't test-harness flakes; production operators on the same
hardware would hit them. All four belong in this PR alongside the
features that exposed them.

1. **Walker drip-feed** — ``regenerate_for_envelope_change``
   previously queued every stale ``normalize_video_asset`` in one
   beat tick. ``--concurrency=1`` serialises *execution* but the
   celery worker fetches the next task the instant the previous
   finishes, so a 100-asset catalog turns into hours of back-to-
   back libx265 with zero recovery windows between encodes.
   Switch to ``apply_async(args=..., countdown=N * 60)`` so
   each subsequent normalize starts at least 60 s after the
   previous was queued. Operator can flip ``is_processing=False``
   on a row mid-window to cancel its turn.
2. **``mem_limit`` on celery container** — cgroup CPU isolation
   alone doesn't stop libx265-4K from allocating ~1.5 GB resident
   memory, which on a 4 GB SBC pushes the system into swap and
   starves sshd + the viewer. Match the cpus cap with a memory
   cap (60% of host RAM, computed in ``bin/upgrade_containers.sh``).
3. **``stop_grace_period: 3s`` + ``stop_signal: SIGKILL`` on
   viewer** — cage doesn't reliably release DRM master on
   SIGTERM (its libinput shutdown path hangs on certain kernels)
   and the kernel's GPU driver leaves dangling references that
   prevent the next ``up`` from acquiring DRM master. Skipping the
   SIGTERM-then-wait dance on intentional restarts gets the
   device past cage's bug deterministically.
4. **libx265 / libx264 ``-preset superfast``** — was ``medium``.
   Asset processing is upload-time and only runs once per asset,
   so the 5-10× wallclock speedup is operator-facing throughput.
   The ~10-20% bitrate increase is invisible on typical signage
   content. Viewer decode is HW regardless of preset.

Tests:
* Walker test mocks switched from ``.delay`` to ``.apply_async``;
  signatures updated for ``args=(...,)`` + ``countdown=`` kwarg.
* New ``test_regenerate_walker_spaces_dispatches_via_countdown``
  asserts the countdowns are ``[0, 60, 120, ...]`` across a
  5-asset catalog so the drip-feed contract is pinned.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Viktor Petersson
2026-05-14 16:56:24 +00:00
parent 4d0184f515
commit 0340b4f444
5 changed files with 130 additions and 13 deletions

View File

@@ -28,6 +28,15 @@ export SHM_SIZE_KB="$(echo "$TOTAL_MEMORY_KB" \* 0.3 | bc | cut -d'.' -f1)"
# starves sshd through banner exchange and drops mpv frames.
CELERY_CPU_LIMIT_RAW=$(echo "$(nproc) * 0.5" | bc -l)
export CELERY_CPU_LIMIT=$(awk -v v="$CELERY_CPU_LIMIT_RAW" 'BEGIN { printf "%.1f", (v < 1.0 ? 1.0 : v) }')
# Hard cgroup memory limit for anthias-celery. 60% of host RAM
# keeps libx265 (~1.5 GB resident on 4K HEVC encodes) from pushing
# the system into swap, which is what actually starves sshd + the
# viewer on 4 GB SBCs. Without this cap a single 4K transcode on
# the Rock Pi 4 made the box unresponsive even with the CPU quota
# in place — cgroup CPU isolation doesn't help if libx265 can
# allocate all available RAM. 60% leaves 40% for the viewer +
# server + redis + system, matching the CPU 50/50 split.
export CELERY_MEMORY_LIMIT_KB=$(echo "$TOTAL_MEMORY_KB * 0.6" | bc | cut -d'.' -f1)
GIT_BRANCH="${GIT_BRANCH:-master}"
MODE="${MODE:-pull}"

View File

@@ -61,6 +61,21 @@ services:
- LC_ALL=${LC_ALL}
privileged: true
restart: always
# cage (the wlroots kiosk compositor used on Pi 5 / x86 /
# arm64 viewers) doesn't always release the DRM master on
# SIGTERM — its event loop blocks on a libinput shutdown
# path that hangs on certain kernels, and the kernel's GPU
# driver leaves dangling references that prevent the next
# container start from acquiring DRM master. ``stop_grace_period``
# caps how long ``docker compose down/restart`` waits before
# SIGKILLing the container; 3s is short enough that an
# upgrade rolls in a reasonable time even if cage hangs.
# ``stop_signal: SIGKILL`` skips the SIGTERM-then-wait dance
# entirely on intentional stops — same effect as a kernel
# OOM killer landing on the viewer, which the next ``up``
# cleans up from automatically.
stop_grace_period: 3s
stop_signal: SIGKILL
shm_size: ${SHM_SIZE_KB}kb
volumes:
- resin-data:/data
@@ -120,6 +135,17 @@ services:
# bigger machines finish encodes faster without ever
# compromising playback responsiveness.
cpus: ${CELERY_CPU_LIMIT}
# ``mem_limit`` partners with the cpus cap above. cgroup CPU
# isolation alone doesn't stop libx265 from allocating every
# available byte at 4K (~1.5 GB resident), which on a 4 GB
# SBC pushes the system into swap and starves the viewer +
# sshd. 60% of host RAM (computed in
# bin/upgrade_containers.sh) keeps libx265's working set
# comfortably inside the cap and leaves enough headroom for
# everything else. Live-confirmed on the Rock Pi 4 that
# without this cap a single 4K transcode wedged the device
# despite the cpus cap.
mem_limit: ${CELERY_MEMORY_LIMIT_KB}k
command: >
nice -n 19 ionice -c 3
celery -A anthias_server.celery_tasks.celery worker -B -n worker@anthias

View File

@@ -1026,6 +1026,12 @@ from anthias_server import processing # noqa: E402
NORMALIZE_VIDEO_TIME_LIMIT_S = processing.NORMALIZE_VIDEO_TIME_LIMIT_S
# Spacing between normalize_video_asset tasks queued by
# ``regenerate_for_envelope_change``. See the comment at the
# queueing site for the rationale (operator-overridable
# cancellation window + box recovery time between encodes).
_WALKER_DRIP_INTERVAL_S = 60
@celery.task(
base=processing._NormalizeAssetTask,
@@ -1182,7 +1188,26 @@ def regenerate_for_envelope_change(force: bool = False) -> int:
is_processing=True,
)
processing.stamp_processing_start(asset.asset_id)
normalize_video_asset.delay(asset.asset_id)
# Drip-feed: stagger queued tasks so an upgrade that
# invalidates every asset in the catalog doesn't fire
# all libx265 encodes back-to-back. ``--concurrency=1``
# already executes them serially, but without
# ``countdown`` the celery worker fetches the next
# task the instant the previous one finishes, which on
# a Pi 4 / Pi 5 / Rock Pi means continuous CPU
# saturation for as long as the catalog takes to re-
# render (potentially hours). The countdown gives the
# operator a chance to ``is_processing=False`` a row
# they want to skip and lets the box breathe between
# encodes. ``WALKER_DRIP_INTERVAL_S`` (default 60 s)
# is a compromise: fast enough to finish a 100-asset
# catalog overnight, slow enough that two consecutive
# encodes don't queue up before the box has recovered
# from the previous one.
normalize_video_asset.apply_async(
args=(asset.asset_id,),
countdown=queued * _WALKER_DRIP_INTERVAL_S,
)
queued += 1
except Exception:
logging.exception(

View File

@@ -171,11 +171,23 @@ _MP4_FAMILY_CONTAINERS = frozenset(
# CRF values are chosen to roughly match perceived quality across
# codecs: libx264 CRF 23 ≈ libx265 CRF 28. Both leave plenty of
# headroom for a fleet's typical image-and-text signage content.
#
# ``-preset superfast`` is a deliberate Anthias-specific choice: the
# upload-time walker runs once per asset, so a 5-10× speedup vs
# ``medium`` saves the operator real time on every upload, and the
# slight increase in bitrate (typically 10-20%) is invisible on
# typical signage content (logos, photos, slow-motion product
# shots). On low-end SBCs the speedup is the difference between
# "operator sees the asset in rotation within 30 s" and "operator
# waits 5+ minutes per 4K clip" — measured live on the Rock Pi 4
# during this PR's validation. The viewer-side decode is HW
# regardless of encoder preset, so playback latency / smoothness
# is unaffected.
_H264_VIDEO_ARGS = [
'-c:v',
'libx264',
'-preset',
'medium',
'superfast',
'-crf',
'23',
]
@@ -184,7 +196,7 @@ _HEVC_VIDEO_ARGS = [
'-c:v',
'libx265',
'-preset',
'medium',
'superfast',
'-crf',
'28',
'-tag:v',

View File

@@ -1367,7 +1367,7 @@ def test_regenerate_walker_skips_in_envelope_assets(
_make_video_asset('in-envelope', envelope_dict=current.as_dict())
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
@@ -1386,12 +1386,12 @@ def test_regenerate_walker_queues_stale_assets(
_make_video_asset('stale', envelope_dict=stale)
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
assert n == 1
queued.assert_called_once_with('stale')
queued.assert_called_once_with(args=('stale',), countdown=0)
assert Asset.objects.get(asset_id='stale').is_processing is True
@@ -1407,12 +1407,12 @@ def test_regenerate_walker_queues_assets_with_no_envelope(
_make_video_asset('legacy', envelope_dict=None)
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
assert n == 1
queued.assert_called_once_with('legacy')
queued.assert_called_once_with(args=('legacy',), countdown=0)
@pytest.mark.django_db
@@ -1435,7 +1435,7 @@ def test_regenerate_walker_skips_image_assets(
)
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
@@ -1456,7 +1456,7 @@ def test_regenerate_walker_force_requeues_everything(
_make_video_asset('in-envelope-b', envelope_dict=current)
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change(force=True)
@@ -1475,12 +1475,12 @@ def test_regenerate_walker_handles_malformed_envelope(
_make_video_asset('malformed', envelope_dict={'codec': 'vp9'})
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay'
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
assert n == 1
queued.assert_called_once_with('malformed')
queued.assert_called_once_with(args=('malformed',), countdown=0)
@pytest.mark.django_db
@@ -1496,7 +1496,7 @@ def test_regenerate_walker_continues_after_per_row_failure(
# First call raises, second succeeds.
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.delay',
'anthias_server.celery_tasks.normalize_video_asset.apply_async',
side_effect=[RuntimeError('redis hiccup'), None],
) as queued:
n = regenerate_for_envelope_change()
@@ -1504,3 +1504,48 @@ def test_regenerate_walker_continues_after_per_row_failure(
assert queued.call_count == 2
# Only the successful call counts as "queued".
assert n == 1
@pytest.mark.django_db
def test_regenerate_walker_spaces_dispatches_via_countdown(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""An upgrade that invalidates the entire catalog (envelope key
change → every recorded envelope is stale → every asset needs
re-render) must NOT queue every normalize task back-to-back.
The walker drip-feeds via ``apply_async(countdown=N*60)`` so
each normalize starts at least 60 s after the previous one was
queued. ``--concurrency=1`` already serialises execution, but
without countdown the celery worker fetches the next task the
instant the previous finishes — same wall-clock effect as
queuing them all at once, leaving no breathing room between
encodes. Live test on the Rock Pi 4 showed sshd starving
through the burst even with cgroup CPU caps in place; the
countdown gives the box a recovery window between encodes
AND lets the operator cancel a row mid-flight (``is_processing
= False`` on the row before its countdown elapses)."""
monkeypatch.setenv('DEVICE_TYPE', 'pi5')
for i in range(5):
_make_video_asset(f'asset-{i}', envelope_dict=None)
with mock.patch(
'anthias_server.celery_tasks.normalize_video_asset.apply_async'
) as queued:
n = regenerate_for_envelope_change()
assert n == 5
# Countdown grows by ``_WALKER_DRIP_INTERVAL_S`` per asset.
# We don't pin the exact order (Asset queryset ordering is
# implementation-defined), just that the set of countdowns is
# 0, 60, 120, 180, 240 (or whatever the interval is).
countdowns = sorted(
call.kwargs['countdown'] for call in queued.call_args_list
)
interval = celery_tasks_module._WALKER_DRIP_INTERVAL_S
assert countdowns == [
0,
interval,
2 * interval,
3 * interval,
4 * interval,
]