diff --git a/meshtastic/__init__.py b/meshtastic/__init__.py index a5075bc..313cb42 100644 --- a/meshtastic/__init__.py +++ b/meshtastic/__init__.py @@ -76,7 +76,7 @@ from typing import * import google.protobuf.json_format import serial # type: ignore[import-untyped] -import timeago # type: ignore[import-untyped] +from dotmap import DotMap # type: ignore[import-untyped] from google.protobuf.json_format import MessageToJson from pubsub import pub # type: ignore[import-untyped] from tabulate import tabulate diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 5b3403c..66ce7f2 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -14,7 +14,6 @@ from decimal import Decimal from typing import Any, Callable, Dict, List, Optional, Union import google.protobuf.json_format -import timeago # type: ignore[import-untyped] from pubsub import pub # type: ignore[import-untyped] from tabulate import tabulate @@ -42,6 +41,29 @@ from meshtastic.util import ( ) +def _timeago(delta_secs: int) -> str: + """Convert a number of seconds in the past into a short, friendly string + e.g. "now", "30 sec ago", "1 hour ago" + Zero or negative intervals simply return "now" + """ + intervals = ( + ("year", 60 * 60 * 24 * 365), + ("month", 60 * 60 * 24 * 30), + ("day", 60 * 60 * 24), + ("hour", 60 * 60), + ("min", 60), + ("sec", 1), + ) + for name, interval_duration in intervals: + if delta_secs < interval_duration: + continue + x = delta_secs // interval_duration + plur = "s" if x > 1 else "" + return f"{x} {name}{plur} ago" + + return "now" + + class MeshInterface: # pylint: disable=R0902 """Interface class for meshtastic devices @@ -158,11 +180,13 @@ class MeshInterface: # pylint: disable=R0902 def getTimeAgo(ts) -> Optional[str]: """Format how long ago have we heard from this node (aka timeago).""" - return ( - timeago.format(datetime.fromtimestamp(ts), datetime.now()) - if ts - else None - ) + if ts is None: + return None + delta = datetime.now() - datetime.fromtimestamp(ts) + delta_secs = int(delta.total_seconds()) + if delta_secs < 0: + return None # not handling a timestamp from the future + return _timeago(delta_secs) rows: List[Dict[str, Any]] = [] if self.nodesByNum: diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 037e0bf..5e8441c 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -5,9 +5,10 @@ import re from unittest.mock import MagicMock, patch import pytest +from hypothesis import given, strategies as st from .. import mesh_pb2, config_pb2, BROADCAST_ADDR, LOCAL_ADDR -from ..mesh_interface import MeshInterface +from ..mesh_interface import MeshInterface, _timeago from ..node import Node # TODO @@ -684,3 +685,21 @@ def test_waitConnected_isConnected_timeout(capsys): out, err = capsys.readouterr() assert re.search(r"warn about something", err, re.MULTILINE) assert out == "" + + +@pytest.mark.unit +def test_timeago(): + """Test that the _timeago function returns sane values""" + assert _timeago(0) == "now" + assert _timeago(1) == "1 sec ago" + assert _timeago(15) == "15 secs ago" + assert _timeago(333) == "5 mins ago" + assert _timeago(99999) == "1 day ago" + assert _timeago(9999999) == "3 months ago" + assert _timeago(-999) == "now" + +@given(seconds=st.integers()) +def test_timeago_fuzz(seconds): + """Fuzz _timeago to ensure it works with any integer""" + val = _timeago(seconds) + assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val) diff --git a/meshtastic/util.py b/meshtastic/util.py index 14f6a54..3193802 100644 --- a/meshtastic/util.py +++ b/meshtastic/util.py @@ -24,8 +24,16 @@ import serial.tools.list_ports # type: ignore[import-untyped] from meshtastic.supported_device import supported_devices from meshtastic.version import get_active_version -"""Some devices such as a seger jlink we never want to accidentally open""" -blacklistVids = dict.fromkeys([0x1366]) +"""Some devices such as a seger jlink or st-link we never want to accidentally open +0x1915 NordicSemi (PPK2) +""" +blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915]) + +"""Some devices are highly likely to be meshtastic. +0x239a RAK4631 +0x303a Heltec tracker""" +whitelistVids = dict.fromkeys([0x239a, 0x303a]) + def quoteBooleans(a_string): """Quote booleans @@ -130,19 +138,35 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]: Returns: list -- a list of device paths """ - l = list( + all_ports = serial.tools.list_ports.comports() + + # look for 'likely' meshtastic devices + ports = list( map( lambda port: port.device, filter( - lambda port: port.vid is not None and port.vid not in blacklistVids, - serial.tools.list_ports.comports(), + lambda port: port.vid is not None and port.vid in whitelistVids, + all_ports, ), ) ) - l.sort() + + # if no likely devices, just list everything not blacklisted + if len(ports) == 0: + ports = list( + map( + lambda port: port.device, + filter( + lambda port: port.vid is not None and port.vid not in blacklistVids, + all_ports, + ), + ) + ) + + ports.sort() if eliminate_duplicates: - l = eliminate_duplicate_port(l) - return l + ports = eliminate_duplicate_port(ports) + return ports class dotdict(dict): diff --git a/tests/hello_world.py b/tests/hello_world.py index 8fd8f79..8c25f8f 100644 --- a/tests/hello_world.py +++ b/tests/hello_world.py @@ -1,9 +1,7 @@ -import time - -import meshtastic +import meshtastic.serial_interface interface = ( - meshtastic.SerialInterface() + meshtastic.serial_interface.SerialInterface() ) # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 interface.sendText("hello mesh") interface.close()