mirror of
https://github.com/Screenly/Anthias.git
synced 2026-06-13 10:44:18 -04:00
* ci: enforce strict mypy across the codebase

Add a Python mypy job that runs on every PR (`.github/workflows/python-mypy.yaml`) and the supporting type annotations to make `strict = true` pass cleanly across all 88 source files.

Configuration choices:

* `strict = true` with no global relaxations (no `disallow_subclassing_any = false`, no `disallow_untyped_decorators = false`, no `disable_error_code`, no `ignore_missing_imports`).
* `follow_imports = "normal"`.
* django-stubs + djangorestframework-stubs plugins; django_stubs_ext.monkeypatch() in settings so generic Django classes are subscriptable at runtime.
* Local stubs in `stubs/` for libraries that ship incomplete type info (drf_spectacular's view methods, redis-py's sync API).
* A scoped `[[tool.mypy.overrides]]` block lists 7 third-party libs without any type info (cec, geventwebsocket, hurry.filesize, pydbus, sh, splinter, vlc) so future stub releases will be noticed instead of silently ignored.

The two `# type: ignore` escape hatches that previously existed are gone: `lib/utils.py` now imports `mplayer` from `sh` properly, and `tests/test_settings.py` patches `os.getenv` via `mock.patch.object` instead of direct assignment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ci: fix mypy CI runtime deps and unused HttpResponse import

* `lib/auth.py`: drop the local `HttpResponse` import; only the string-form annotation needs it and the runtime `isinstance` check only uses `HttpRequest` (caught by ruff's F401).
* Add a `mypy` dependency group (extends `dev-host` with the runtime deps that `anthias_django.settings` touches) so the django-stubs plugin can introspect the app registry on a fresh CI runner. Skips the heavy native-extension deps (cec, netifaces, etc.) that aren't needed for type checking.
* python-mypy workflow: install via `uv pip install --group mypy` (consistent with other workflows) and create a dummy `~/.screenly/screenly.conf` so the settings module's first-import side effect succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(security): migrate password hashing to PBKDF2 (CodeQL py/weak-sensitive-data-hashing)

CodeQL flagged the SHA256-based password hashing in lib/auth.py and api/views/v2.py as a weak password hash (SHA256 is a fast hash, unsuitable for password storage). Switch new passwords to Django's `make_password` (PBKDF2-SHA256, 600k iterations by default in Django 4.2).

Add `hash_password` / `verify_password` helpers in lib/auth.py that:

* hash with PBKDF2 for any new password write,
* verify both Django-format hashes and legacy bare-SHA256 hex digests so existing config files still authenticate,
* opportunistically re-hash legacy SHA256 entries to PBKDF2 on a successful login, phasing out the weak format over time.

`settings.py` auto-migration now uses `hash_password` for first-load plaintext passwords and recognises both legacy SHA256 and Django algorithm-prefixed formats so it doesn't double-hash an already-stored hash.

Also fixes the stale ruff format check (5 files reformatted by `ruff format`) that was breaking the python-lint job.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drop legacy SHA256 password verify and add docker-image-builder to mypy group

CodeQL kept flagging the SHA256 fallback path in `lib/auth.py:verify_password` as `py/weak-sensitive-data-hashing`, even though it was scoped to a one-time migration. Rather than suppress the alert, drop the legacy verify entirely so no password material ever flows into hashlib.sha256.

Migration of existing installs now happens at settings load time:

* `settings.py` detects a 64-char hex (legacy SHA256) password hash on read and clears both the `password` field and `auth_backend`.
The device stays reachable (no auth required) and the operator must re-set credentials via the web UI. A clear warning is logged so the change is visible.

Also fix the python-mypy CI job: include the `docker-image-builder` group in the `mypy` group so `pygit2` and `python_on_whales` (imported by `tools/image_builder/__main__.py`) resolve.

BREAKING: any Anthias install with a SHA256-format password in `screenly.conf` will have basic auth disabled on first start of this version. The operator must log in (no password required) and set a new password via the UI to re-enable basic auth.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address review feedback (Copilot + manual review)

Critical:

* settings.py: `_get` previously called `hash_password()` at module-import time when it found a plaintext password. That imports Django's password hashers, which raises `ImproperlyConfigured` if reached before `django.setup()` (e.g. when `viewer/__init__.py` imports `settings`). Drop the auto-hash entirely and treat plaintext the same as legacy SHA256: clear it, disable basic auth, log an `error`-level warning, and persist the cleaned state via `_needs_save_after_load` so the warning doesn't repeat on every load.
* lib/backup_helper.py: `recover()` called `tar.extractall()` on an uploaded archive. A crafted backup with `../` paths or symlinks could overwrite arbitrary files under HOME. Add `_safe_extract()` that validates each member's resolved path stays inside the destination and rejects unsafe symlinks/hardlinks.
* api/helpers.py: `custom_exception_handler()` discarded DRF's default response and always returned 500, breaking 4xx propagation for `ValidationError`/`NotFound`/etc. Return DRF's response when present; fall back to 500 only when it's None.
* tests/test_settings.py: `broken_settings_should_raise_value_error` was missing the `test_` prefix and never actually ran. Renamed.
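
The archive validation described for `lib/backup_helper.py` can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the project's `_safe_extract()`: the helper names `is_within_directory`, `unsafe_member_reason`, `build_archive`, and `check_archive` are hypothetical, and the real helper may apply different rules.

```python
import io
import os
import tarfile
from typing import Optional


def is_within_directory(directory: str, target: str) -> bool:
    """Return True if target resolves to a path inside directory."""
    base = os.path.realpath(directory)
    resolved = os.path.realpath(target)
    return os.path.commonpath([base, resolved]) == base


def unsafe_member_reason(member: tarfile.TarInfo, dest: str) -> Optional[str]:
    """Return why a member is unsafe, or None if it looks safe.

    Hypothetical policy: only regular files and directories are allowed,
    and every member must resolve to a path under dest.
    """
    if member.issym() or member.islnk():
        return 'link members are not allowed'
    if not (member.isfile() or member.isdir()):
        return 'only regular files and directories are allowed'
    if not is_within_directory(dest, os.path.join(dest, member.name)):
        return 'path escapes the destination directory'
    return None


def build_archive(names: list[str]) -> bytes:
    """Build an in-memory tar archive with one small file per name."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode='w') as tar:
        for name in names:
            data = b'demo'
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


def check_archive(raw: bytes, dest: str) -> dict[str, Optional[str]]:
    """Map each member name to its rejection reason (None means safe)."""
    results: dict[str, Optional[str]] = {}
    with tarfile.open(fileobj=io.BytesIO(raw), mode='r') as tar:
        for member in tar.getmembers():
            results[member.name] = unsafe_member_reason(member, dest)
    return results


# Nothing is extracted here; the archive is only inspected.
report = check_archive(
    build_archive(['ok.txt', '../evil.txt']), '/tmp/restore-demo'
)
```

Validating every member before writing anything means a single bad entry can be rejected or skipped without leaving a partially restored tree, which is one reason to prefer per-member checks over a bare `extractall()`.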
API correctness:

* api/views/mixins.py: replace bare `raise Exception(...)` for missing uploads / wrong file extensions / missing asset URI with DRF's `ValidationError` / `NotFound`, so callers get proper 4xx with a structured error body instead of a 500.
* anthias_app/helpers.py, api/serializers/{mixins,v1_1}.py: replace `assert video_duration is not None` with explicit raises. `assert` is stripped under `python -O`, which would silently turn the next call into an `AttributeError` on None.

Typing/stubs:

* host_agent.py: pass `decode_responses=True` to both `redis.Redis()` constructions and switch the channel name + command map keys from bytes to str. The local redis stub assumes decoded responses, and host_agent was the only caller violating that invariant.
* lib/auth.py: document the `R | HttpResponse` return-type collapse for DRF Response (mirrors Django's @login_required).
* api/serializers/__init__.py: comment why `UpdateAssetSerializer`'s fields are widened to `Field[Any, Any, Any, Any]` (so v2's overrides with different field types don't trip [assignment]).

Tests/CI:

* api/tests/test_v2_endpoints.py: also assert the stored hash starts with `pbkdf2_sha256$` so a regression to a weaker hasher is caught even if `verify_password()` itself were broken.
* .github/workflows/python-mypy.yaml: write a minimal valid `screenly.conf` (with section headers) instead of an empty file. The empty-file path only worked by accident of `_get`'s defensive try/except.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(test): force re-import of `settings` in fake_settings()

The `fake_settings()` test helper only deleted `sys.modules['settings']` *after* the yield (i.e. on clean exit). Once a prior test imported `settings` cleanly, subsequent `import settings` calls returned the cache without re-instantiating `AnthiasSettings`, so the fixture's config file was silently ignored.
This was hidden because `test_broken_settings_should_raise_value_error` was missing the `test_` prefix and never ran. After renaming it in the previous commit it now runs and exposes the bug.

Pop the module before import (and again in `finally`) so each test gets a fresh `AnthiasSettings()` instance bound to the fixture file.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review feedback

- backup_helper: use dedicated BackupRecoverError instead of bare Exception so callers can map archive failures to 4xx responses.
- mixins: catch BackupRecoverError / TarError in RecoverViewMixin and re-raise as DRF ValidationError so bad uploads return 400, not 500.
- utils: drop top-level `from sh import ffprobe, mplayer`; resolve binaries lazily with sh.Command at call sites so a missing tool doesn't break module import.
- host_agent: import redis ConnectionError explicitly (mypy strict).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(ci): unblock mypy and ruff format; sanitize recover error

- stubs/redis-stubs: export ConnectionError so host_agent's retry_if_exception_type(redis.ConnectionError) typechecks under the isolated mypy CI env (no real redis package installed).
- host_agent.py: switch to redis.ConnectionError (matches new stub).
- lib/utils.py: ruff format fix for the lazy sh.Command(...) call.
- api/views/mixins.py: don't echo backup-recover exception text into the API response (CodeQL py/stack-trace-exposure). Log server-side and return a generic 'Invalid backup archive.' message instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(security): generate server-side name for backup recovery upload

Don't pass the client-supplied `file_upload.name` into `path.join('static', ...)`. A crafted name (path separators, leading `../`, or an absolute path) could write outside the static directory before the tarball is parsed.
Use a UUID-named `.tar.gz` instead; the original filename is never needed again after the content-type check.

Addresses suppressed Copilot review comment on api/views/mixins.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review round 2

- lib/utils.py: catch sh.CommandNotFound in get_video_duration (return None) and url_fails (skip streaming probe) so missing ffprobe/mplayer don't surface as 500s.
- lib/auth.py: decode basic-auth credentials with partition(':') so passwords containing ':' are accepted (RFC 7617).
- lib/backup_helper.py: tighten _safe_extract: reject non-regular tar members (symlinks, hardlinks, device nodes, FIFOs) and extract members individually after validation instead of calling extractall().
- tests/test_backup_helper.py: add coverage for path traversal, absolute-path, symlink, and FIFO rejection in _safe_extract.
- settings.py: fail fast in AnthiasSettings.__init__ when HOME is unset instead of silently rooting all config/state paths at the cwd.
- api/serializers/v1_1.py: raise ValidationError when 'duration' is missing/invalid for non-video assets instead of an unhandled KeyError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(test): mark tar fixture/extract calls as NOSONAR

The new SafeExtractTest builds and opens single-member tar archives specifically to test the safe-extract guard. SonarCloud flags any tarfile.open(...) call (rule python:S5042). Annotate the two test-fixture call sites with NOSONAR, the same pattern used elsewhere in this repo (e.g. host_agent.py).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(ci): use 6-space step indentation in python-mypy.yaml

Match the majority style used by the other workflows in this repo. The previous 4-space indent under `steps:` was valid YAML and ran fine in CI, but consistency with build-webview.yaml, build-balena-disk-image.yaml, deploy-website.yaml, etc.
makes the file easier to read alongside the rest.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review round 3

- api/serializers/v1_1.py: raise DRF ValidationError instead of ValueError when video duration can't be determined, so clients get a 4xx with a field-level error rather than a 500.
- api/views/mixins.py: ensure the uploaded backup tarball is removed in all paths (success and failure). recover() only deletes on success; without explicit cleanup, rejected archives (including attacker-controlled ones) accumulate under static/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: ruff format tests/test_backup_helper.py

Single line that fit within 79 chars but I had broken across three lines during the merge resolution.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review round 4

* Add partial PEP 561 stubs in `stubs/channels-stubs/` covering the `channels` API surface we actually use: `AsyncWebsocketConsumer` (with `as_asgi`), `ProtocolTypeRouter`, `URLRouter`, `AllowedHostsOriginValidator`, and `get_channel_layer`. Drop the `# type: ignore[misc]` from `AssetConsumer` and remove `channels.*` from the mypy `ignore_missing_imports` overrides; the override block is back to listing only libs that genuinely ship no type info.
* Delete `_safe_extract` and `_is_within_directory` from `lib/backup_helper.py`, plus the corresponding `SafeExtractTest` and `_build_archive_with` from `tests/test_backup_helper.py`. After the master merge, `recover()` validates each member through `_safe_tar_member` (skip-on-fail) and extracts inside the loop, so the older raise-on-fail helper became dead code with no caller. `RecoverLegacyTarballTest::test_recover_skips_path_traversal_member` already exercises the path-traversal guard end-to-end.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Copilot review round 5

* `celery_tasks.py:cleanup()`: bail out with a logged error when HOME is unset instead of `path.join('', 'anthias_assets')` resolving to a relative path. The `find ... -delete` was never going to run on a Pi without HOME, but the silent fallback would have chewed through the celery worker's cwd if it ever did.
* `api/serializers/v1_1.py:CreateAssetSerializerV1_1.prepare_asset()`: fix the video duration handling so non-zero and omitted durations are no longer dropped on the floor. Previously only the magic `duration == 0` case set `asset['duration']`; a client posting `duration=30` for a video produced a serialized dict with no duration key, then the view did `Asset.objects.create(**data)` and the row got the field's default. Now: missing or 0 means "infer from the file via get_video_duration()"; any other integer is persisted as-is. Non-int values raise ValidationError up front.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
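
The duration rule from review round 5 (missing or 0 means infer from the file, any other integer is kept, non-integers are rejected up front) can be sketched as a small pure function. This is an illustration under assumptions rather than the serializer's code: `resolve_video_duration` and `fake_probe` are hypothetical names, `ValueError` stands in for DRF's `ValidationError` so the sketch needs only the standard library, and rejecting numeric strings is a choice made here that the commit message does not confirm.

```python
from datetime import timedelta
from typing import Any, Callable, Optional


def resolve_video_duration(
    data: dict[str, Any],
    probe: Callable[[str], Optional[timedelta]],
) -> int:
    """Return the duration in seconds to store for a video asset."""
    raw = data.get('duration')

    # Reject non-integers before doing any work. bool is a subclass of
    # int in Python, so it is excluded explicitly.
    if raw is not None and (isinstance(raw, bool) or not isinstance(raw, int)):
        raise ValueError('Duration must be an integer.')

    # Missing or 0: infer the duration from the file itself.
    if raw is None or raw == 0:
        inferred = probe(data['uri'])
        if inferred is None:
            raise ValueError('Unable to determine the video duration.')
        return int(inferred.total_seconds())

    # Any other integer is persisted as-is.
    return raw


def fake_probe(uri: str) -> Optional[timedelta]:
    """Stand-in for a real probe such as get_video_duration()."""
    return timedelta(seconds=42.7)


inferred_seconds = resolve_video_duration({'uri': 'clip.mp4'}, fake_probe)
explicit_seconds = resolve_video_duration(
    {'uri': 'clip.mp4', 'duration': 30}, fake_probe
)
```

Passing the probe in as a parameter keeps the rule testable on hosts where ffprobe is not installed.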
516 lines
14 KiB
Python
import json
import logging
import os
import random
import re
import string
from datetime import datetime, timedelta
from distutils.util import strtobool
from os import getenv, path, utime
from platform import machine
from subprocess import call, check_output
from threading import Thread
from time import sleep
from typing import Any
from urllib.parse import urlparse

import certifi
import pytz
import redis
import requests
import sh
from tenacity import (
    RetryError,
    Retrying,
    stop_after_attempt,
    wait_fixed,
)

from anthias_app.models import Asset
from settings import settings

arch = machine()


def string_to_bool(string: Any) -> bool:
    return bool(strtobool(str(string)))


def touch(path: str) -> None:
    with open(path, 'a'):
        utime(path, None)


def is_ci() -> bool:
    """
    Returns True when run on CI.
    """
    return string_to_bool(os.getenv('CI', False))


def validate_url(string: str) -> bool:
    """Simple URL verification.
    >>> validate_url("hello")
    False
    >>> validate_url("ftp://example.com")
    False
    >>> validate_url("http://")
    False
    >>> validate_url("http://wireload.net/logo.png")
    True
    >>> validate_url("https://wireload.net/logo.png")
    True
    """

    checker = urlparse(string)
    return bool(
        checker.scheme in ('http', 'https', 'rtsp', 'rtmp') and checker.netloc
    )


def get_balena_supervisor_api_response(
    method: str,
    action: str,
    **kwargs: Any,
) -> requests.Response:
    version = kwargs.get('version', 'v1')
    response: requests.Response = getattr(requests, method)(
        '{}/{}/{}?apikey={}'.format(
            os.getenv('BALENA_SUPERVISOR_ADDRESS'),
            version,
            action,
            os.getenv('BALENA_SUPERVISOR_API_KEY'),
        ),
        headers={'Content-Type': 'application/json'},
    )
    return response


def get_balena_device_info() -> requests.Response:
    return get_balena_supervisor_api_response(method='get', action='device')


def shutdown_via_balena_supervisor() -> requests.Response:
    return get_balena_supervisor_api_response(method='post', action='shutdown')


def reboot_via_balena_supervisor() -> requests.Response:
    return get_balena_supervisor_api_response(method='post', action='reboot')


def get_balena_supervisor_version() -> str:
    response = get_balena_supervisor_api_response(
        method='get', action='version', version='v2'
    )
    if response.ok:
        return str(response.json()['version'])
    else:
        return 'Error getting the Supervisor version'


def get_node_ip() -> str:
    """
    Returns the node's IP address.
    We're using an API call to the supervisor for this on Balena
    and an environment variable set by `install.sh` for other environments.
    The reason for this is because we can't retrieve the host IP from
    within Docker.
    """

    if is_balena_app():
        response = get_balena_device_info()
        if response.ok:
            return str(response.json()['ip_address'])
        return 'Unknown'
    else:
        r = connect_to_redis()
        max_retries = 60
        retries = 0

        while True:
            environment = getenv('ENVIRONMENT', None)
            if environment in ['development', 'test']:
                break

            is_ready = r.get('host_agent_ready') or 'false'

            if json.loads(is_ready):
                break

            if retries >= max_retries:
                logging.info(
                    'host_agent_service is not ready after %d retries',
                    max_retries,
                )
                break

            retries += 1
            sleep(1)

        r.publish('hostcmd', 'set_ip_addresses')

        try:
            for attempt in Retrying(
                stop=stop_after_attempt(20),
                wait=wait_fixed(1),
            ):
                environment = getenv('ENVIRONMENT', None)
                if environment in ['development', 'test']:
                    break

                with attempt:
                    ip_addresses_ready = r.get('ip_addresses_ready') or 'false'
                    if json.loads(ip_addresses_ready):
                        break
                    else:
                        raise Exception(
                            'Internet connection is not available.'
                        )
        except RetryError:
            logging.warning('Internet connection is not available. ')

        ip_addresses = r.get('ip_addresses')

        if ip_addresses:
            return ' '.join(json.loads(ip_addresses))
        elif os.getenv('MY_IP'):
            return os.getenv('MY_IP') or 'Unable to retrieve IP.'

    return 'Unable to retrieve IP.'


def get_node_mac_address() -> str:
    """
    Returns the MAC address.
    """
    if is_balena_app():
        balena_supervisor_address = os.getenv('BALENA_SUPERVISOR_ADDRESS')
        balena_supervisor_api_key = os.getenv('BALENA_SUPERVISOR_API_KEY')
        headers = {'Content-Type': 'application/json'}

        r = requests.get(
            '{}/v1/device?apikey={}'.format(
                balena_supervisor_address, balena_supervisor_api_key
            ),
            headers=headers,
        )

        if r.ok:
            return str(r.json()['mac_address'])
        return 'Unknown'

    return os.getenv('MAC_ADDRESS', 'Unable to retrieve MAC address.')


def get_active_connections(
    bus: Any,
    fields: list[str] | None = None,
) -> list[dict[str, Any]] | None:
    """

    :param bus: pydbus.bus.Bus
    :param fields: list
    :return: list
    """
    if not fields:
        fields = ['Id', 'Uuid', 'Type', 'Devices']

    connections = list()

    try:
        nm_proxy = bus.get(
            'org.freedesktop.NetworkManager',
            '/org/freedesktop/NetworkManager',
        )
    except Exception:
        return None

    nm_properties = nm_proxy['org.freedesktop.DBus.Properties']
    active_connections = nm_properties.Get(
        'org.freedesktop.NetworkManager', 'ActiveConnections'
    )
    for active_connection in active_connections:
        active_connection_proxy = bus.get(
            'org.freedesktop.NetworkManager', active_connection
        )
        active_connection_properties = active_connection_proxy[
            'org.freedesktop.DBus.Properties'
        ]

        connection = dict()
        for field in fields:
            field_value = active_connection_properties.Get(
                'org.freedesktop.NetworkManager.Connection.Active', field
            )

            if field == 'Devices':
                devices = list()
                for device_path in field_value:
                    device_proxy = bus.get(
                        'org.freedesktop.NetworkManager', device_path
                    )
                    device_properties = device_proxy[
                        'org.freedesktop.DBus.Properties'
                    ]
                    devices.append(
                        device_properties.Get(
                            'org.freedesktop.NetworkManager.Device',
                            'Interface',
                        )
                    )
                field_value = devices

            connection.update({field: field_value})
        connections.append(connection)

    return connections


def remove_connection(bus: Any, uuid: str) -> bool:
    """

    :param bus: pydbus.bus.Bus
    :param uuid: string
    :return: boolean
    """
    try:
        nm_proxy = bus.get(
            'org.freedesktop.NetworkManager',
            '/org/freedesktop/NetworkManager/Settings',
        )
    except Exception:
        return False

    nm_settings = nm_proxy['org.freedesktop.NetworkManager.Settings']

    connection_path = nm_settings.GetConnectionByUuid(uuid)
    connection_proxy = bus.get(
        'org.freedesktop.NetworkManager', connection_path
    )
    connection = connection_proxy[
        'org.freedesktop.NetworkManager.Settings.Connection'
    ]
    connection.Delete()

    return True


def get_video_duration(file: str) -> timedelta | None:
    """
    Returns the duration of a video file in timedelta.

    Returns None if ffprobe is not available on the host so callers can
    surface a clean validation error instead of a 500.
    """
    time = None

    try:
        run_player = sh.Command('ffprobe')('-i', file, _err_to_out=True)
    except sh.CommandNotFound:
        logging.warning('ffprobe is not installed; cannot determine duration')
        return None
    except sh.ErrorReturnCode_1 as err:
        raise Exception('Bad video format') from err

    for line in run_player.split('\n'):
        if 'Duration' in line:
            match = re.search(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', line)
            if match:
                time_input = match.group()
                time_split = time_input.split(':')
                hours = int(time_split[0])
                minutes = int(time_split[1])
                seconds = float(time_split[2])
                time = timedelta(hours=hours, minutes=minutes, seconds=seconds)
                break

    return time


def handler(obj: Any) -> str:
    # Set timezone as UTC if it's datetime and format as ISO
    if isinstance(obj, datetime):
        with_tz = obj.replace(tzinfo=pytz.utc)
        return with_tz.isoformat()
    else:
        raise TypeError(
            f'Object of type {type(obj)} with value of {repr(obj)} '
            'is not JSON serializable'
        )


def json_dump(obj: Any) -> str:
    return json.dumps(obj, default=handler)


def url_fails(url: str) -> bool:
    """
    Returns True when the availability check for the URL fails.
    """
    # Streaming URLs (rtsp/rtmp) are probed with mplayer.
    if urlparse(url).scheme in ('rtsp', 'rtmp'):
        try:
            run_mplayer = sh.Command('mplayer')(
                '-identify', '-frames', '0', '-nosound', url
            )
        except sh.CommandNotFound:
            logging.warning(
                'mplayer is not installed; skipping streaming URL probe'
            )
            return False
        for line in run_mplayer.split('\n'):
            if 'Clip info:' in line:
                return False
        return True

    # For everything else, try HEAD and GET for the URL availability check.

    # Use Certifi module and set to True as default so users stop
    # seeing InsecureRequestWarning in logs.
    verify: str | bool
    if settings['verify_ssl']:
        verify = certifi.where()
    else:
        verify = True

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux armv7l) AppleWebKit/538.15 (KHTML, like Gecko) Version/8.0 Safari/538.15'  # noqa: E501
    }
    try:
        if not validate_url(url):
            return False

        if requests.head(
            url,
            allow_redirects=True,
            headers=headers,
            timeout=10,
            verify=verify,
        ).ok:
            return False

        if requests.get(
            url,
            allow_redirects=True,
            headers=headers,
            timeout=10,
            verify=verify,
        ).ok:
            return False

    except (requests.ConnectionError, requests.exceptions.Timeout):
        pass

    return True


def download_video_from_youtube(
    uri: str,
    asset_id: str,
) -> tuple[str, str, int]:
    home = getenv('HOME') or ''
    name = check_output(['yt-dlp', '-O', 'title', uri])
    info = json.loads(check_output(['yt-dlp', '-j', uri]))
    duration = info['duration']

    location = path.join(home, 'anthias_assets', f'{asset_id}.mp4')
    thread = YoutubeDownloadThread(location, uri, asset_id)
    thread.daemon = True
    thread.start()

    return location, str(name.decode('utf-8')), duration


class YoutubeDownloadThread(Thread):
    def __init__(
        self,
        location: str,
        uri: str,
        asset_id: str,
    ) -> None:
        Thread.__init__(self)
        self.location = location
        self.uri = uri
        self.asset_id = asset_id

    def run(self) -> None:
        call(
            [
                'yt-dlp',
                '-S',
                'vcodec:h264,fps,res:1080,acodec:m4a',
                '-o',
                self.location,
                self.uri,
            ]
        )

        try:
            asset = Asset.objects.get(asset_id=self.asset_id)
            asset.is_processing = False
            asset.save()
        except Asset.DoesNotExist:
            logging.warning('Asset %s not found', self.asset_id)
            return

        # Imported lazily so the viewer container (which does not
        # ship channels/channels-redis) can still import lib.utils.
        from asgiref.sync import async_to_sync
        from channels.layers import get_channel_layer

        async_to_sync(get_channel_layer().group_send)(
            'ws_server',
            {'type': 'asset.update', 'asset_id': self.asset_id},
        )


def template_handle_unicode(value: Any) -> str:
    return str(value)


def is_demo_node() -> bool:
    """
    Check if the environment variable IS_DEMO_NODE is set to 1
    :return: bool
    """
    return string_to_bool(os.getenv('IS_DEMO_NODE', False))


def generate_perfect_paper_password(
    pw_length: int = 10,
    has_symbols: bool = True,
) -> str:
    """
    Generates a password using 64 characters from
    "Perfect Paper Password" system by Steve Gibson

    :param pw_length: int
    :param has_symbols: bool
    :return: string
    """
    ppp_letters = (
        '!#%+23456789:=?@ABCDEFGHJKLMNPRSTUVWXYZabcdefghjkmnopqrstuvwxyz'  # noqa: E501
    )
    if not has_symbols:
        ppp_letters = ''.join(set(ppp_letters) - set(string.punctuation))
    return ''.join(
        random.SystemRandom().choice(ppp_letters) for _ in range(pw_length)
    )


def connect_to_redis() -> 'redis.Redis':
    return redis.Redis(host='redis', decode_responses=True, port=6379, db=0)


def is_docker() -> bool:
    return os.path.isfile('/.dockerenv')


def is_balena_app() -> bool:
    """
    Checks the application is running on Balena Cloud
    :return: bool
    """
    return bool(getenv('BALENA', False))
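
The `Duration` parsing inside `get_video_duration()` can be exercised without ffprobe by feeding it captured text. The sketch below is a standalone approximation, not part of the module: `parse_ffprobe_duration` and `SAMPLE_OUTPUT` are hypothetical names, and the sample line only imitates the general shape of ffprobe's banner output.

```python
import re
from datetime import timedelta
from typing import Optional

# Same pattern the module uses: HH:MM:SS.fraction
DURATION_PATTERN = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')


def parse_ffprobe_duration(output: str) -> Optional[timedelta]:
    """Return the first parseable duration found in ffprobe-style text."""
    for line in output.split('\n'):
        if 'Duration' not in line:
            continue
        match = DURATION_PATTERN.search(line)
        if match:
            hours, minutes, seconds = match.group().split(':')
            return timedelta(
                hours=int(hours),
                minutes=int(minutes),
                seconds=float(seconds),
            )
    return None


# Assumed sample text, shaped like ffprobe's stderr banner.
SAMPLE_OUTPUT = (
    "Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'clip.mp4':\n"
    '  Duration: 00:01:23.45, start: 0.000000, bitrate: 1205 kb/s\n'
)

duration = parse_ffprobe_duration(SAMPLE_OUTPUT)
```

Separating the text parsing from the `sh.Command('ffprobe')` call in this way would let the parsing be unit-tested on machines where ffprobe is not installed.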