Merge remote-tracking branch 'root/ble-logging' into pr-fixbluetooth

This commit is contained in:
Kevin Hester
2024-06-29 09:31:28 -07:00
9 changed files with 145 additions and 59 deletions

View File

@@ -76,7 +76,7 @@ from typing import *
import google.protobuf.json_format
import serial # type: ignore[import-untyped]
import timeago # type: ignore[import-untyped]
from dotmap import DotMap # type: ignore[import-untyped]
from google.protobuf.json_format import MessageToJson
from pubsub import pub # type: ignore[import-untyped]
from tabulate import tabulate

View File

@@ -6,6 +6,7 @@ import struct
import time
from threading import Event, Thread
from typing import Optional
from print_color import print
from bleak import BleakClient, BleakScanner, BLEDevice
@@ -16,6 +17,8 @@ SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
FROMRADIO_UUID = "2c55e69e-4993-11ed-b878-0242ac120002"
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2"
class BLEInterface(MeshInterface):
@@ -75,12 +78,28 @@ class BLEInterface(MeshInterface):
logging.debug("Register FROMNUM notify callback")
self.client.start_notify(FROMNUM_UUID, self.from_num_handler)
self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)
async def from_num_handler(self, _, b): # pylint: disable=C0116
from_num = struct.unpack("<I", bytes(b))[0]
logging.debug(f"FROMNUM notify: {from_num}")
self.should_read = True
async def log_radio_handler(self, _, b): # pylint: disable=C0116
log_radio = b.decode('utf-8').replace('\n', '')
if log_radio.startswith("DEBUG"):
print(log_radio, color="cyan", end=None)
elif log_radio.startswith("INFO"):
print(log_radio, color="white", end=None)
elif log_radio.startswith("WARN"):
print(log_radio, color="yellow", end=None)
elif log_radio.startswith("ERROR"):
print(log_radio, color="red", end=None)
else:
print(log_radio, end=None)
self.should_read = False
@staticmethod
def scan() -> list[BLEDevice]:
"""Scan for available BLE devices."""

View File

@@ -14,7 +14,6 @@ from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional, Union
import google.protobuf.json_format
import timeago # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
from tabulate import tabulate
@@ -42,6 +41,29 @@ from meshtastic.util import (
)
def _timeago(delta_secs: int) -> str:
"""Convert a number of seconds in the past into a short, friendly string
e.g. "now", "30 sec ago", "1 hour ago"
Zero or negative intervals simply return "now"
"""
intervals = (
("year", 60 * 60 * 24 * 365),
("month", 60 * 60 * 24 * 30),
("day", 60 * 60 * 24),
("hour", 60 * 60),
("min", 60),
("sec", 1),
)
for name, interval_duration in intervals:
if delta_secs < interval_duration:
continue
x = delta_secs // interval_duration
plur = "s" if x > 1 else ""
return f"{x} {name}{plur} ago"
return "now"
class MeshInterface: # pylint: disable=R0902
"""Interface class for meshtastic devices
@@ -158,11 +180,13 @@ class MeshInterface: # pylint: disable=R0902
def getTimeAgo(ts) -> Optional[str]:
"""Format how long ago have we heard from this node (aka timeago)."""
return (
timeago.format(datetime.fromtimestamp(ts), datetime.now())
if ts
else None
)
if ts is None:
return None
delta = datetime.now() - datetime.fromtimestamp(ts)
delta_secs = int(delta.total_seconds())
if delta_secs < 0:
return None # not handling a timestamp from the future
return _timeago(delta_secs)
rows: List[Dict[str, Any]] = []
if self.nodesByNum:

View File

@@ -5,9 +5,10 @@ import re
from unittest.mock import MagicMock, patch
import pytest
from hypothesis import given, strategies as st
from .. import mesh_pb2, config_pb2, BROADCAST_ADDR, LOCAL_ADDR
from ..mesh_interface import MeshInterface
from ..mesh_interface import MeshInterface, _timeago
from ..node import Node
# TODO
@@ -684,3 +685,21 @@ def test_waitConnected_isConnected_timeout(capsys):
out, err = capsys.readouterr()
assert re.search(r"warn about something", err, re.MULTILINE)
assert out == ""
@pytest.mark.unit
def test_timeago():
"""Test that the _timeago function returns sane values"""
assert _timeago(0) == "now"
assert _timeago(1) == "1 sec ago"
assert _timeago(15) == "15 secs ago"
assert _timeago(333) == "5 mins ago"
assert _timeago(99999) == "1 day ago"
assert _timeago(9999999) == "3 months ago"
assert _timeago(-999) == "now"
@given(seconds=st.integers())
def test_timeago_fuzz(seconds):
"""Fuzz _timeago to ensure it works with any integer"""
val = _timeago(seconds)
assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val)

View File

@@ -24,8 +24,16 @@ import serial.tools.list_ports # type: ignore[import-untyped]
from meshtastic.supported_device import supported_devices
from meshtastic.version import get_active_version
"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])
"""Some devices such as a seger jlink or st-link we never want to accidentally open
0x1915 NordicSemi (PPK2)
"""
blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915])
"""Some devices are highly likely to be meshtastic.
0x239a RAK4631
0x303a Heltec tracker"""
whitelistVids = dict.fromkeys([0x239a, 0x303a])
def quoteBooleans(a_string):
"""Quote booleans
@@ -130,19 +138,35 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]:
Returns:
list -- a list of device paths
"""
l = list(
all_ports = serial.tools.list_ports.comports()
# look for 'likely' meshtastic devices
ports = list(
map(
lambda port: port.device,
filter(
lambda port: port.vid is not None and port.vid not in blacklistVids,
serial.tools.list_ports.comports(),
lambda port: port.vid is not None and port.vid in whitelistVids,
all_ports,
),
)
)
l.sort()
# if no likely devices, just list everything not blacklisted
if len(ports) == 0:
ports = list(
map(
lambda port: port.device,
filter(
lambda port: port.vid is not None and port.vid not in blacklistVids,
all_ports,
),
)
)
ports.sort()
if eliminate_duplicates:
l = eliminate_duplicate_port(l)
return l
ports = eliminate_duplicate_port(ports)
return ports
class dotdict(dict):