Anthias/tests/test_messaging.py
Viktor Petersson 133ec78ff0 refactor(packaging): adopt src/ layout with split server/viewer packages (#2817)
* refactor(packaging): adopt src/ layout with split server/viewer packages

Move all Python source under src/ following modern packaging conventions.
Server, viewer, host-agent, and shared common code now live as four
top-level packages with clear excision boundaries — anthias_viewer can
be removed wholesale when the rewrite-out-of-Python lands without
touching the server.

  src/anthias_common/         shared: errors, utils, internal_auth, device_helper
  src/anthias_server/         Django app, REST API, Celery tasks, manage.py
    lib/                      server-only: auth, backup_helper, diagnostics, github, telemetry
  src/anthias_viewer/         player runtime (was viewer/)
  src/anthias_host_agent/     systemd-driven host shim (was host_agent.py)
  tools/raspberry_pi_imager/  moved from repo root
  tests/conftest.py           moved from repo root

pyproject.toml gets [build-system], setuptools src/ discovery, and an
anthias-manage console script. Django AppConfigs keep label='anthias_app'
and label='api' so existing migration dependency tuples don't move.
BASE_DIR computed from parents[3] to keep templates/static at repo root.
mypy_path set to ["src", "stubs"] with explicit_package_bases.
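
A minimal sketch of the parents[3] arithmetic, assuming the settings module
sits at src/anthias_server/django_project/settings.py under the container
checkout /usr/src/app. The helper name compute_base_dir is hypothetical and
only illustrates the index, it is not the project's code:

```python
from pathlib import PurePosixPath

# Assumed location of the settings module inside the container image.
SETTINGS_FILE = PurePosixPath(
    '/usr/src/app/src/anthias_server/django_project/settings.py'
)


def compute_base_dir(settings_file: PurePosixPath) -> PurePosixPath:
    """Walk up four directory levels from the settings module.

    parents[0] -> django_project, parents[1] -> anthias_server,
    parents[2] -> src, parents[3] -> repository root.
    """
    return settings_file.parents[3]


BASE_DIR = compute_base_dir(SETTINGS_FILE)
TEMPLATES_DIR = BASE_DIR / 'templates'
```

With one fewer level (parents[2]) the result would be the src/ directory,
and templates/static would no longer resolve at the repo root.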

Dockerfile templates set PYTHONPATH=/usr/src/app/src; bin/start_*.sh
and CI workflows use python -m anthias_server.manage / python -m
anthias_viewer instead of bare ./manage.py and python -m viewer.
Ansible host-agent unit invokes python -m anthias_host_agent.

Verified end-to-end in the docker test container:
  - 430 unit tests pass (matches baseline)
  - 7 integration tests pass, 5 skipped (matches baseline)
  - ruff, mypy clean

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: ruff format the new src/ tree

The longer post-rename module paths (anthias_common.internal_auth vs
lib.internal_auth, etc.) pushed several import lines past 79 chars, so
ruff format had to wrap them. Apply that formatting and split the one
multi-import in anthias_viewer/__init__.py into per-symbol lines so the
existing # noqa: E402 sits on the `from` line where ruff expects it,
without needing a re-anchor when format wraps the parens.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: realign sonar + gitignore comment to src/ layout

sonar-project.properties still pointed at the pre-refactor top-level
packages (anthias_app, anthias_django, api, lib, viewer, ...) and
their old per-file coverage.exclusions paths, which would have
produced empty Sonar runs and stale exclusions. Collapse sources to
`src` and rewrite the exclusions to the new src/anthias_*/ paths.

Also fix the stale path reference in .gitignore's comment for the
test DB (now src/anthias_server/django_project/settings.py).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: gitignore .claude/ and untrack the lock file I just leaked

Previous commit accidentally pulled in .claude/scheduled_tasks.lock
because .claude was in .dockerignore but not .gitignore. Add the
pattern to .gitignore and drop the file from the index.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(dockerignore): exclude pytest cache, __pycache__ dirs, and the local test DB

Three entries that were missing relative to the new src/ layout:

- .anthias-test.db (and -journal/-wal/-shm siblings) — created at the
  repo root by src/anthias_server/django_project/settings.py when a
  developer runs the host pytest suite. Without this exclude, the
  next docker build COPY . bakes the file into /usr/src/app/.
- **/__pycache__ — *.py[co] only matched the .pyc/.pyo files, leaving
  the empty cache directories to ship.
- .pytest_cache — host-side, regenerable.
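
To illustrate why *.py[co] alone leaves the cache directories behind, here is
a rough sketch using Python's fnmatch as a stand-in for the ignore-file
matcher. Docker applies Go's filepath.Match rules, so this is an
approximation for this one pattern, and the file names are made up:

```python
from fnmatch import fnmatchcase

BYTECODE_PATTERN = '*.py[co]'


def matches_bytecode_pattern(name: str) -> bool:
    """Return True when a single path component matches *.py[co]."""
    return fnmatchcase(name, BYTECODE_PATTERN)


# Compiled files match, but the directory that holds them does not.
compiled_file_matches = matches_bytecode_pattern('settings.cpython-312.pyc')
cache_dir_matches = matches_bytecode_pattern('__pycache__')
```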

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(urls): preserve 'anthias_app' URL namespace, not just the app label

Copilot caught that the import-rewrite swept up the URL namespace too:
app_name in src/anthias_server/app/urls.py changed from 'anthias_app'
to 'anthias_server.app', which leaves templates/login.html's
{% url 'anthias_app:login' %} pointing at a namespace that no longer
exists — NoReverseMatch at render time when an unauthenticated request
hits the login page.

The namespace is the same kind of stable user-facing identifier as the
AppConfig label (which we already kept as 'anthias_app'). Restore it,
and revert the two reverse() callers in lib/auth.py and app/views.py
that the rewrite changed in lockstep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(ci): update --confcutdir to the new tools/raspberry_pi_imager path

Copilot caught that the earlier sweep missed --confcutdir=raspberry_pi_imager
(no trailing slash) — replace_all of "raspberry_pi_imager/" only matched
path-with-slash forms. Without confcutdir, pytest walks back up looking
for conftests and discovers the repo-root tests/conftest.py, which
applies the Anthias-specific Django/Redis stubs to the rpi-imager test
run on the website-deploy workflow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 08:08:32 +01:00

301 lines
9.9 KiB
Python

import json
import logging
from typing import Any, cast
from unittest import mock
from unittest.mock import MagicMock

import pytest
import redis

from anthias_common.errors import ReplyTimeoutError
from anthias_server import settings as settings_module
from anthias_server.settings import (
    REPLY_KEY_PREFIX,
    ReplyCollector,
    ReplySender,
    ViewerPublisher,
)
from anthias_viewer.messaging import ViewerSubscriber

logging.disable(logging.CRITICAL)


# ---------------------------------------------------------------------------
# settings.ViewerPublisher / ReplySender / ReplyCollector
# ---------------------------------------------------------------------------


def test_viewer_publisher_send_publishes_correct_payload() -> None:
    fake_redis = MagicMock()
    publisher = ViewerPublisher.__new__(ViewerPublisher)
    publisher._redis = fake_redis
    publisher.send_to_viewer('next')
    fake_redis.publish.assert_called_once_with(
        settings_module.VIEWER_CHANNEL, 'viewer next'
    )


def test_viewer_publisher_singleton_rejects_second_init() -> None:
    sentinel = cast(ViewerPublisher, object())
    original = ViewerPublisher.INSTANCE
    try:
        ViewerPublisher.INSTANCE = sentinel
        with pytest.raises(ValueError, match='instance already exists'):
            ViewerPublisher()
    finally:
        ViewerPublisher.INSTANCE = original


def test_viewer_publisher_get_instance_creates_once() -> None:
    original = ViewerPublisher.INSTANCE
    ViewerPublisher.INSTANCE = None
    try:
        # connect_to_redis is called inside __init__; mock it.
        with mock.patch(
            'anthias_common.utils.connect_to_redis', return_value=MagicMock()
        ):
            inst = ViewerPublisher.get_instance()
        assert inst is ViewerPublisher.INSTANCE
        # Calling get_instance again returns the cached instance.
        with mock.patch(
            'anthias_common.utils.connect_to_redis', return_value=MagicMock()
        ):
            assert ViewerPublisher.get_instance() is inst
    finally:
        ViewerPublisher.INSTANCE = original


def test_reply_sender_pushes_json_and_sets_ttl() -> None:
    fake_redis = MagicMock()
    sender = ReplySender(fake_redis)
    sender.send('correlation-1', {'asset_id': 'abc'})
    expected_key = f'{REPLY_KEY_PREFIX}correlation-1'
    fake_redis.rpush.assert_called_once_with(
        expected_key, json.dumps({'asset_id': 'abc'})
    )
    fake_redis.expire.assert_called_once_with(expected_key, 30)


def test_reply_collector_recv_blocking_returns_decoded_payload() -> None:
    fake_redis = MagicMock()
    fake_redis.blpop.return_value = (
        f'{REPLY_KEY_PREFIX}cid'.encode(),
        json.dumps({'ok': True}),
    )
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    result = collector.recv_json('cid', timeout_ms=2000)
    assert result == {'ok': True}
    fake_redis.blpop.assert_called_once_with(
        f'{REPLY_KEY_PREFIX}cid', timeout=2
    )


def test_reply_collector_blocking_rounds_up_to_next_second() -> None:
    fake_redis = MagicMock()
    fake_redis.blpop.return_value = (b'k', json.dumps(1))
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    collector.recv_json('cid', timeout_ms=1500)
    # 1500ms → 2 seconds.
    fake_redis.blpop.assert_called_once_with(
        f'{REPLY_KEY_PREFIX}cid', timeout=2
    )


def test_reply_collector_blocking_timeout() -> None:
    fake_redis = MagicMock()
    fake_redis.blpop.return_value = None
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    with pytest.raises(ReplyTimeoutError):
        collector.recv_json('cid', timeout_ms=100)


def test_reply_collector_non_blocking_uses_lpop() -> None:
    fake_redis = MagicMock()
    fake_redis.lpop.return_value = json.dumps({'value': 42})
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    assert collector.recv_json('cid', timeout_ms=0) == {'value': 42}
    fake_redis.lpop.assert_called_once_with(f'{REPLY_KEY_PREFIX}cid')
    fake_redis.blpop.assert_not_called()


def test_reply_collector_non_blocking_negative_timeout_uses_lpop() -> None:
    fake_redis = MagicMock()
    fake_redis.lpop.return_value = json.dumps([1, 2, 3])
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    assert collector.recv_json('cid', timeout_ms=-1) == [1, 2, 3]
    fake_redis.lpop.assert_called_once()


def test_reply_collector_non_blocking_empty_raises_timeout() -> None:
    fake_redis = MagicMock()
    fake_redis.lpop.return_value = None
    collector = ReplyCollector.__new__(ReplyCollector)
    collector._redis = fake_redis
    with pytest.raises(ReplyTimeoutError):
        collector.recv_json('cid', timeout_ms=0)


def test_reply_collector_singleton_rejects_second_init() -> None:
    sentinel = cast(ReplyCollector, object())
    original = ReplyCollector.INSTANCE
    try:
        ReplyCollector.INSTANCE = sentinel
        with pytest.raises(ValueError, match='instance already exists'):
            ReplyCollector()
    finally:
        ReplyCollector.INSTANCE = original


def test_reply_collector_get_instance_creates_once() -> None:
    original = ReplyCollector.INSTANCE
    ReplyCollector.INSTANCE = None
    try:
        with mock.patch(
            'anthias_common.utils.connect_to_redis', return_value=MagicMock()
        ):
            inst = ReplyCollector.get_instance()
        assert inst is ReplyCollector.INSTANCE
        with mock.patch(
            'anthias_common.utils.connect_to_redis', return_value=MagicMock()
        ):
            assert ReplyCollector.get_instance() is inst
    finally:
        ReplyCollector.INSTANCE = original


# ---------------------------------------------------------------------------
# anthias_viewer.messaging.ViewerSubscriber._consume
# ---------------------------------------------------------------------------


def _make_subscriber(commands: dict[str, Any]) -> ViewerSubscriber:
    return ViewerSubscriber(MagicMock(), commands, topic='viewer')


def test_subscriber_dispatches_command_with_parameter() -> None:
    handler = MagicMock()
    sub = _make_subscriber({'next': handler})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'viewer next&5'}])
    sub._consume(pubsub)
    handler.assert_called_once_with('5')


def test_subscriber_dispatches_command_without_parameter() -> None:
    handler = MagicMock()
    sub = _make_subscriber({'reload': handler})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'viewer reload'}])
    sub._consume(pubsub)
    # No '&' in payload → parameter is None.
    handler.assert_called_once_with(None)


def test_subscriber_skips_messages_with_wrong_topic() -> None:
    handler = MagicMock()
    unknown_handler = MagicMock()
    sub = _make_subscriber({'next': handler, 'unknown': unknown_handler})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'somethingelse next'}])
    sub._consume(pubsub)
    handler.assert_not_called()
    unknown_handler.assert_not_called()


def test_subscriber_skips_messages_with_empty_body() -> None:
    handler = MagicMock()
    sub = _make_subscriber({'next': handler})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'viewer'}])
    sub._consume(pubsub)
    handler.assert_not_called()


def test_subscriber_skips_non_string_data() -> None:
    handler = MagicMock()
    sub = _make_subscriber({'next': handler})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter(
        [{'data': 1}, {'data': b'bytes'}, {'data': None}]
    )
    sub._consume(pubsub)
    handler.assert_not_called()


def test_subscriber_falls_back_to_unknown_handler() -> None:
    unknown = MagicMock()
    sub = _make_subscriber({'unknown': unknown})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'viewer mystery&x'}])
    sub._consume(pubsub)
    unknown.assert_called_once_with('x')


def test_subscriber_no_handler_when_unknown_missing() -> None:
    sub = _make_subscriber({'next': MagicMock()})
    pubsub = MagicMock()
    pubsub.listen.return_value = iter([{'data': 'viewer mystery'}])
    # Should not raise.
    sub._consume(pubsub)


def test_subscriber_run_signals_ready_then_exits_on_loop_break(
    monkeypatch: Any,
) -> None:
    """run() should call subscribe(), set the readiness flag, and consume.

    _consume is faked to raise ConnectionError on its first call, and the
    retry sleep is patched to raise SystemExit, which breaks out of the
    otherwise endless while loop in run().
    """
    redis_conn = MagicMock()
    pubsub = MagicMock()
    redis_conn.pubsub.return_value = pubsub
    sub = ViewerSubscriber(redis_conn, {'next': MagicMock()})
    consume_calls: list[int] = []

    def fake_consume(pubsub_arg: Any) -> None:
        consume_calls.append(1)
        # Returning normally would let run() loop forever, so simulate a
        # dropped connection. The patched sleep below then raises
        # SystemExit to end the retry cycle.
        raise redis.ConnectionError()

    monkeypatch.setattr(sub, '_consume', fake_consume)
    # After the first ConnectionError, the sleep+retry cycle would
    # spin forever. Patch sleep to raise so the test exits cleanly.
    sleep_calls: list[float] = []

    def fake_sleep(delay: float) -> None:
        sleep_calls.append(delay)
        raise SystemExit  # break out of while True

    with mock.patch('anthias_viewer.messaging.sleep', side_effect=fake_sleep):
        with pytest.raises(SystemExit):
            sub.run()
    # First subscribe succeeded → readiness signalled True before
    # disconnect, then False on connection loss.
    pubsub.subscribe.assert_called_once_with(settings_module.VIEWER_CHANNEL)
    set_calls = redis_conn.set.call_args_list
    keys_set = [call.args[0] for call in set_calls]
    assert 'viewer-subscriber-ready' in keys_set
    assert len(consume_calls) == 1
    assert sleep_calls == [ViewerSubscriber.INITIAL_RETRY_DELAY_S]
    pubsub.close.assert_called_once()
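

The tests above pin down two behaviours without showing the code under test:
how a millisecond timeout becomes a whole-second blocking timeout, and how a
pub/sub payload such as 'viewer next&5' is split into a command and a
parameter. The following is a rough sketch that is consistent with those
assertions. It is not the Anthias implementation, and the names
blocking_timeout_seconds and dispatch are hypothetical:

```python
import math
from typing import Callable, Optional

Handler = Callable[[Optional[str]], None]


def blocking_timeout_seconds(timeout_ms: int) -> Optional[int]:
    """Round a millisecond timeout up to whole seconds.

    Returns None for zero or negative values, which the tests treat as a
    non-blocking pop (lpop instead of blpop).
    """
    if timeout_ms <= 0:
        return None
    return math.ceil(timeout_ms / 1000)


def dispatch(
    data: object, commands: dict[str, Handler], topic: str = 'viewer'
) -> bool:
    """Parse '<topic> <command>[&<parameter>]' and call the handler.

    Returns True if a handler was called, False if the message was skipped.
    """
    if not isinstance(data, str):
        return False  # ints, bytes and None are ignored
    prefix, _, body = data.partition(' ')
    if prefix != topic or not body:
        return False  # wrong topic or empty body
    command, separator, parameter = body.partition('&')
    # Unrecognised commands fall back to the 'unknown' handler, if any.
    handler = commands.get(command, commands.get('unknown'))
    if handler is None:
        return False
    handler(parameter if separator else None)
    return True
```

Rounding up rather than down matters for short timeouts: in this sketch a
100 ms request maps to 1 second, whereas truncating would give 0, and Redis
treats a BLPOP timeout of 0 as "block indefinitely".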