Merge pull request #681 from william-stearns/wls_add_types2

Wls add types2
This commit is contained in:
Ian McEwen
2024-10-29 06:50:52 -07:00
committed by GitHub
11 changed files with 183 additions and 150 deletions

View File

@@ -111,13 +111,13 @@ from . import (
LOCAL_ADDR = "^local"
"""A special ID that means the local node"""
BROADCAST_NUM = 0xFFFFFFFF
BROADCAST_NUM: int = 0xFFFFFFFF
"""if using 8 bit nodenums this will be shortened on the target"""
BROADCAST_ADDR = "^all"
"""A special ID that means broadcast"""
OUR_APP_VERSION = 20300
OUR_APP_VERSION: int = 20300
"""The numeric buildnumber (shared with android apps) specifying the
level of device code we are guaranteed to understand

View File

@@ -11,7 +11,7 @@ import os
import platform
import sys
import time
from typing import Optional
from typing import List, Optional
import pyqrcode # type: ignore[import-untyped]
import yaml
@@ -42,7 +42,7 @@ except ImportError as e:
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2
from meshtastic.version import get_active_version
def onReceive(packet, interface):
def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
args = mt_config.args
try:
@@ -73,7 +73,7 @@ def onReceive(packet, interface):
print(f"Warning: There is no field {ex} in the packet.")
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
def onConnection(interface, topic=pub.AUTO_TOPIC) -> None: # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
@@ -85,7 +85,7 @@ def checkChannel(interface: MeshInterface, channelIndex: int) -> bool:
return ch and ch.role != channel_pb2.Channel.Role.DISABLED
def getPref(node, comp_name):
def getPref(node, comp_name) -> bool:
"""Get a channel or preferences value"""
def _printSetting(config_type, uni_name, pref_value, repeated):
"""Pretty print the setting"""
@@ -109,11 +109,11 @@ def getPref(node, comp_name):
# First validate the input
localConfig = node.localConfig
moduleConfig = node.moduleConfig
found = False
found: bool = False
for config in [localConfig, moduleConfig]:
objDesc = config.DESCRIPTOR
config_type = objDesc.fields_by_name.get(name[0])
pref = False
pref = "" #FIXME - is this correct to leave as an empty string if not found?
if config_type:
pref = config_type.message_type.fields_by_name.get(snake_name)
if pref or wholeField:
@@ -130,7 +130,7 @@ def getPref(node, comp_name):
return False
# Check if we need to request the config
if len(config.ListFields()) != 0:
if len(config.ListFields()) != 0 and not isinstance(pref, str): # if str, it's still the empty string, I think
# read the value
config_values = getattr(config, config_type.name)
if not wholeField:
@@ -148,16 +148,16 @@ def getPref(node, comp_name):
return True
def splitCompoundName(comp_name):
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name = comp_name.split(".")
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def traverseConfig(config_root, config, interface_config):
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = meshtastic.util.camel_to_snake(config_root)
for pref in config:
@@ -958,7 +958,7 @@ def onConnected(interface):
sys.exit(1)
def printConfig(config):
def printConfig(config) -> None:
"""print configuration"""
objDesc = config.DESCRIPTOR
for config_section in objDesc.fields:
@@ -975,12 +975,12 @@ def printConfig(config):
print(f" {temp_name}")
def onNode(node):
def onNode(node) -> None:
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
def subscribe() -> None:
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
@@ -991,7 +991,7 @@ def subscribe():
# pub.subscribe(onNode, "meshtastic.node")
def export_config(interface):
def export_config(interface) -> str:
"""used in --export-config"""
configObj = {}
@@ -1023,7 +1023,7 @@ def export_config(interface):
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig)
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
@@ -1042,7 +1042,7 @@ def export_config(interface):
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
@@ -1058,10 +1058,11 @@ def export_config(interface):
else:
configObj["module_config"] = prefs
config = "# start of Meshtastic configure yaml\n"
config += yaml.dump(configObj)
print(config)
return config
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
print(config_txt)
return config_txt
def create_power_meter():

View File

@@ -5,6 +5,7 @@ import atexit
import logging
import struct
import time
import io
from threading import Thread
from typing import List, Optional
@@ -34,9 +35,9 @@ class BLEInterface(MeshInterface):
self,
address: Optional[str],
noProto: bool = False,
debugOut=None,
debugOut: Optional[io.TextIOWrapper]=None,
noNodes: bool = False,
):
) -> None:
MeshInterface.__init__(
self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
)
@@ -82,7 +83,7 @@ class BLEInterface(MeshInterface):
# Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
self._exit_handler = atexit.register(self.client.disconnect)
def from_num_handler(self, _, b): # pylint: disable=C0116
def from_num_handler(self, _, b: bytes) -> None: # pylint: disable=C0116
"""Handle callbacks for fromnum notify.
Note: this method does not need to be async because it is just setting a bool.
"""
@@ -150,9 +151,12 @@ class BLEInterface(MeshInterface):
)
return addressed_devices[0]
def _sanitize_address(address): # pylint: disable=E0213
def _sanitize_address(self, address: Optional[str]) -> Optional[str]: # pylint: disable=E0213
"Standardize BLE address by removing extraneous characters and lowercasing."
return address.replace("-", "").replace("_", "").replace(":", "").lower()
if address is None:
return None
else:
return address.replace("-", "").replace("_", "").replace(":", "").lower()
def connect(self, address: Optional[str] = None) -> "BLEClient":
"Connect to a device by address."
@@ -164,12 +168,16 @@ class BLEInterface(MeshInterface):
client.discover()
return client
def _receiveFromRadioImpl(self):
def _receiveFromRadioImpl(self) -> None:
while self._want_receive:
if self.should_read:
self.should_read = False
retries = 0
retries: int = 0
while self._want_receive:
if self.client is None:
logging.debug(f"BLE client is None, shutting down")
self._want_receive = False
continue
try:
b = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
except BleakDBusError as e:
@@ -194,8 +202,8 @@ class BLEInterface(MeshInterface):
else:
time.sleep(0.01)
def _sendToRadioImpl(self, toRadio):
b = toRadio.SerializeToString()
def _sendToRadioImpl(self, toRadio) -> None:
b: bytes = toRadio.SerializeToString()
if b and self.client: # we silently ignore writes while we are shutting down
logging.debug(f"TORADIO write: {b.hex()}")
try:
@@ -211,7 +219,7 @@ class BLEInterface(MeshInterface):
time.sleep(0.01)
self.should_read = True
def close(self):
def close(self) -> None:
try:
MeshInterface.close(self)
except Exception as e:
@@ -236,7 +244,7 @@ class BLEInterface(MeshInterface):
class BLEClient:
"""Client for managing connection to a BLE device"""
def __init__(self, address=None, **kwargs):
def __init__(self, address=None, **kwargs) -> None:
self._eventLoop = asyncio.new_event_loop()
self._eventThread = Thread(
target=self._run_event_loop, name="BLEClient", daemon=True

View File

@@ -13,6 +13,8 @@ with rather more easily once the code is simplified by this change.
"""
from typing import Any, Optional
def reset():
"""
Restore the namespace to pristine condition.
@@ -33,5 +35,5 @@ args = None
parser = None
channel_index = None
logfile = None
tunnelInstance = None
tunnelInstance: Optional[Any] = None
camel_case = False

View File

@@ -8,7 +8,7 @@ from meshtastic.protobuf import portnums_pb2, remote_hardware_pb2
from meshtastic.util import our_exit
def onGPIOreceive(packet, interface):
def onGPIOreceive(packet, interface) -> None:
"""Callback for received GPIO responses"""
logging.debug(f"packet:{packet} interface:{interface}")
gpioValue = 0
@@ -37,7 +37,7 @@ class RemoteHardwareClient:
code for how you can connect to your own custom meshtastic services
"""
def __init__(self, iface):
def __init__(self, iface) -> None:
"""
Constructor

View File

@@ -5,7 +5,7 @@ import logging
import platform
import time
from typing import Optional
from typing import List, Optional
import serial # type: ignore[import-untyped]
@@ -19,7 +19,7 @@ if platform.system() != "Windows":
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto=False, connectNow=True, noNodes: bool=False):
def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None:
"""Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
@@ -32,13 +32,13 @@ class SerialInterface(StreamInterface):
self.devPath: Optional[str] = devPath
if self.devPath is None:
ports = meshtastic.util.findPorts(True)
ports: List[str] = meshtastic.util.findPorts(True)
logging.debug(f"ports:{ports}")
if len(ports) == 0:
print("No Serial Meshtastic device detected, attempting TCP connection on localhost.")
return
elif len(ports) > 1:
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message: str = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message += f" Ports detected:{ports}"
meshtastic.util.our_exit(message)
else:
@@ -59,14 +59,14 @@ class SerialInterface(StreamInterface):
self.stream = serial.Serial(
self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0
)
self.stream.flush()
self.stream.flush() # type: ignore[attr-defined]
time.sleep(0.1)
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes
)
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
if self.stream: # Stream can be null if we were already closed
self.stream.flush() # FIXME: why are there these two flushes with 100ms sleeps? This shouldn't be necessary

View File

@@ -1,10 +1,13 @@
"""Stream Interface base class
"""
import io
import logging
import threading
import time
import traceback
from typing import Optional, cast
import serial # type: ignore[import-untyped]
from meshtastic.mesh_interface import MeshInterface
@@ -19,7 +22,7 @@ MAX_TO_FROM_RADIO_SIZE = 512
class StreamInterface(MeshInterface):
"""Interface class for meshtastic devices over a stream link (serial, TCP, etc)"""
def __init__(self, debugOut=None, noProto=False, connectNow=True, noNodes=False):
def __init__(self, debugOut: Optional[io.TextIOWrapper]=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None:
"""Constructor, opens a connection to self.stream
Keyword Arguments:
@@ -35,6 +38,7 @@ class StreamInterface(MeshInterface):
raise Exception( # pylint: disable=W0719
"StreamInterface is now abstract (to update existing code create SerialInterface instead)"
)
self.stream: Optional[serial.Serial] # only serial uses this, TCPInterface overrides the relevant methods instead
self._rxBuf = bytes() # empty
self._wantExit = False
@@ -52,7 +56,7 @@ class StreamInterface(MeshInterface):
if not noProto:
self.waitForConfig()
def connect(self):
def connect(self) -> None:
"""Connect to our radio
Normally this is called automatically by the constructor, but if you
@@ -63,7 +67,7 @@ class StreamInterface(MeshInterface):
# if the reading statemachine was parsing a bad packet make sure
# we write enough start bytes to force it to resync (we don't use START1
# because we want to ensure it is looking for START1)
p = bytearray([START2] * 32)
p: bytes = bytearray([START2] * 32)
self._writeBytes(p)
time.sleep(0.1) # wait 100ms to give device time to start running
@@ -74,7 +78,7 @@ class StreamInterface(MeshInterface):
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
def _disconnected(self) -> None:
"""We override the superclass implementation to close our port"""
MeshInterface._disconnected(self)
@@ -86,7 +90,7 @@ class StreamInterface(MeshInterface):
# pylint: disable=W0201
self.stream = None
def _writeBytes(self, b):
def _writeBytes(self, b: bytes) -> None:
"""Write an array of bytes to our stream and flush"""
if self.stream: # ignore writes when stream is closed
self.stream.write(b)
@@ -98,24 +102,24 @@ class StreamInterface(MeshInterface):
# we sleep here to give the TBeam a chance to work
time.sleep(0.1)
def _readBytes(self, length):
def _readBytes(self, length) -> Optional[bytes]:
"""Read an array of bytes from our stream"""
if self.stream:
return self.stream.read(length)
else:
return None
def _sendToRadioImpl(self, toRadio):
def _sendToRadioImpl(self, toRadio) -> None:
"""Send a ToRadio protobuf to the device"""
logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
bufLen = len(b)
b: bytes = toRadio.SerializeToString()
bufLen: int = len(b)
# We convert into a string, because the TCP code doesn't work with byte arrays
header = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF])
logging.debug(f"sending header:{header} b:{b}")
header: bytes = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF])
logging.debug(f"sending header:{header!r} b:{b!r}")
self._writeBytes(header + b)
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
logging.debug("Closing stream")
MeshInterface.close(self)
@@ -142,7 +146,7 @@ class StreamInterface(MeshInterface):
else:
self.cur_log_line += utf
def __reader(self):
def __reader(self) -> None:
"""The reader thread that reads bytes from our stream"""
logging.debug("in __reader()")
empty = bytes()
@@ -150,13 +154,13 @@ class StreamInterface(MeshInterface):
try:
while not self._wantExit:
# logging.debug("reading character")
b = self._readBytes(1)
b: Optional[bytes] = self._readBytes(1)
# logging.debug("In reader loop")
# logging.debug(f"read returned {b}")
if len(b) > 0:
c = b[0]
if b is not None and len(cast(bytes, b)) > 0:
c: int = b[0]
# logging.debug(f'c:{c}')
ptr = len(self._rxBuf)
ptr: int = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b

View File

@@ -3,7 +3,7 @@
# pylint: disable=R0917
import logging
import socket
from typing import Optional
from typing import Optional, cast
from meshtastic.stream_interface import StreamInterface
@@ -16,9 +16,9 @@ class TCPInterface(StreamInterface):
self,
hostname: str,
debugOut=None,
noProto=False,
connectNow=True,
portNumber=DEFAULT_TCP_PORT,
noProto: bool=False,
connectNow: bool=True,
portNumber: int=DEFAULT_TCP_PORT,
noNodes:bool=False,
):
"""Constructor, opens a connection to a specified IP address/hostname
@@ -29,14 +29,16 @@ class TCPInterface(StreamInterface):
self.stream = None
self.hostname = hostname
self.portNumber = portNumber
self.hostname: str = hostname
self.portNumber: int = portNumber
self.socket: Optional[socket.socket] = None
if connectNow:
logging.debug(f"Connecting to {hostname}") # type: ignore[str-bytes-safe]
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
self.socket: Optional[socket.socket] = sock
server_address: tuple[str, int] = (hostname, portNumber)
sock: Optional[socket.socket] = socket.create_connection(server_address)
self.socket = sock
else:
self.socket = None
@@ -44,25 +46,26 @@ class TCPInterface(StreamInterface):
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes
)
def _socket_shutdown(self):
def _socket_shutdown(self) -> None:
"""Shutdown the socket.
Note: Broke out this line so the exception could be unit tested.
"""
self.socket.shutdown(socket.SHUT_RDWR)
if self.socket: #mian: please check that this should be "if self.socket:"
cast(socket.socket, self.socket).shutdown(socket.SHUT_RDWR)
def myConnect(self):
def myConnect(self) -> None:
"""Connect to socket"""
server_address = (self.hostname, self.portNumber)
sock = socket.create_connection(server_address)
server_address: tuple[str, int] = (self.hostname, self.portNumber)
sock: Optional[socket.socket] = socket.create_connection(server_address)
self.socket = sock
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
logging.debug("Closing TCP stream")
StreamInterface.close(self)
# Sometimes the socket read might be blocked in the reader thread.
# Therefore we force the shutdown by closing the socket here
self._wantExit = True
self._wantExit: bool = True
if not self.socket is None:
try:
self._socket_shutdown()
@@ -70,10 +73,14 @@ class TCPInterface(StreamInterface):
pass # Ignore errors in shutdown, because we might have a race with the server
self.socket.close()
def _writeBytes(self, b):
def _writeBytes(self, b: bytes) -> None:
"""Write an array of bytes to our stream and flush"""
self.socket.send(b)
if self.socket:
self.socket.send(b)
def _readBytes(self, length):
def _readBytes(self, length) -> Optional[bytes]:
"""Read an array of bytes from our stream"""
return self.socket.recv(length)
if self.socket:
return self.socket.recv(length)
else:
return None

View File

@@ -5,6 +5,9 @@ import logging
import sys
import time
import traceback
import io
from typing import List, Optional
from dotmap import DotMap # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
@@ -15,19 +18,19 @@ from meshtastic.serial_interface import SerialInterface
from meshtastic.tcp_interface import TCPInterface
"""The interfaces we are using for our tests"""
interfaces = None
interfaces: List = []
"""A list of all packets we received while the current test was running"""
receivedPackets = None
receivedPackets: Optional[List] = None
testsRunning = False
testsRunning: bool = False
testNumber = 0
testNumber: int = 0
sendingInterface = None
def onReceive(packet, interface):
def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
if sendingInterface == interface:
pass
@@ -42,20 +45,20 @@ def onReceive(packet, interface):
receivedPackets.append(p)
def onNode(node):
def onNode(node) -> None:
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
def subscribe() -> None:
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onNode, "meshtastic.node")
def testSend(
fromInterface, toInterface, isBroadcast=False, asBinary=False, wantAck=False
):
fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False
) -> bool:
"""
Sends one test packet between two nodes and then returns success or failure
@@ -93,16 +96,16 @@ def testSend(
return False # Failed to send
def runTests(numTests=50, wantAck=False, maxFailures=0):
def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool:
"""Run the tests."""
logging.info(f"Running {numTests} tests with wantAck={wantAck}")
numFail = 0
numSuccess = 0
numFail: int = 0
numSuccess: int = 0
for _ in range(numTests):
# pylint: disable=W0603
global testNumber
testNumber = testNumber + 1
isBroadcast = True
isBroadcast:bool = True
# asBinary=(i % 2 == 0)
success = testSend(
interfaces[0], interfaces[1], isBroadcast, asBinary=False, wantAck=wantAck
@@ -126,10 +129,10 @@ def runTests(numTests=50, wantAck=False, maxFailures=0):
return True
def testThread(numTests=50):
def testThread(numTests=50) -> bool:
"""Test thread"""
logging.info("Found devices, starting tests...")
result = runTests(numTests, wantAck=True)
result: bool = runTests(numTests, wantAck=True)
if result:
# Run another test
# Allow a few dropped packets
@@ -137,25 +140,25 @@ def testThread(numTests=50):
return result
def onConnection(topic=pub.AUTO_TOPIC):
def onConnection(topic=pub.AUTO_TOPIC) -> None:
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def openDebugLog(portName):
def openDebugLog(portName) -> io.TextIOWrapper:
"""Open the debug log file"""
debugname = "log" + portName.replace("/", "_")
logging.info(f"Writing serial debugging to {debugname}")
return open(debugname, "w+", buffering=1, encoding="utf8")
def testAll(numTests=5):
def testAll(numTests: int=5) -> bool:
"""
Run a series of tests using devices we can find.
This is called from the cli with the "--test" option.
"""
ports = meshtastic.util.findPorts(True)
ports: List[str] = meshtastic.util.findPorts(True)
if len(ports) < 2:
meshtastic.util.our_exit(
"Warning: Must have at least two devices connected to USB."
@@ -175,7 +178,7 @@ def testAll(numTests=5):
)
logging.info("Ports opened, starting test")
result = testThread(numTests)
result: bool = testThread(numTests)
for i in interfaces:
i.close()
@@ -183,7 +186,7 @@ def testAll(numTests=5):
return result
def testSimulator():
def testSimulator() -> None:
"""
Assume that someone has launched meshtastic-native as a simulated node.
Talk to that node over TCP, do some operations and if they are successful
@@ -195,7 +198,7 @@ def testSimulator():
logging.basicConfig(level=logging.DEBUG)
logging.info("Connecting to simulator on localhost!")
try:
iface = TCPInterface("localhost")
iface: meshtastic.tcp_interface.TCPInterface = TCPInterface("localhost")
iface.showInfo()
iface.localNode.showInfo()
iface.localNode.exitSimulator()

View File

@@ -43,7 +43,7 @@ class Tunnel:
self.message = message
super().__init__(self.message)
def __init__(self, iface, subnet="10.115", netmask="255.255.0.0"):
def __init__(self, iface, subnet: str="10.115", netmask: str="255.255.0.0") -> None:
"""
Constructor

View File

@@ -11,7 +11,7 @@ import threading
import time
import traceback
from queue import Queue
from typing import List, NoReturn, Union
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
@@ -31,7 +31,7 @@ from meshtastic.version import get_active_version
0925 Lakeview Research Saleae Logic (logic analyzer)
04b4:602a Cypress Semiconductor Corp. Hantek DSO-6022BL (oscilloscope)
"""
blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
blacklistVids: Dict = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
"""Some devices are highly likely to be meshtastic.
0x239a RAK4631
@@ -39,21 +39,21 @@ blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
whitelistVids = dict.fromkeys([0x239a, 0x303a])
def quoteBooleans(a_string):
def quoteBooleans(a_string: str) -> str:
"""Quote booleans
given a string that contains ": true", replace with ": 'true'" (or false)
"""
tmp = a_string.replace(": true", ": 'true'")
tmp: str = a_string.replace(": true", ": 'true'")
tmp = tmp.replace(": false", ": 'false'")
return tmp
def genPSK256():
def genPSK256() -> bytes:
"""Generate a random preshared key"""
return os.urandom(32)
def fromPSK(valstr):
def fromPSK(valstr: str) -> Any:
"""A special version of fromStr that assumes the user is trying to set a PSK.
In that case we also allow "none", "default" or "random" (to have python generate one), or simpleN
"""
@@ -70,7 +70,7 @@ def fromPSK(valstr):
return fromStr(valstr)
def fromStr(valstr):
def fromStr(valstr: str) -> Any:
"""Try to parse as int, float or bool (and fallback to a string as last resort)
Returns: an int, bool, float, str or byte array (for strings of hex digits)
@@ -78,6 +78,7 @@ def fromStr(valstr):
Args:
valstr (string): A user provided string
"""
val: Any
if len(valstr) == 0: # Treat an emptystring as an empty bytes
val = bytes()
elif valstr.startswith("0x"):
@@ -100,6 +101,7 @@ def fromStr(valstr):
return val
def toStr(raw_value):
"""Convert a value to a string that can be used in a config file"""
if isinstance(raw_value, bytes):
@@ -107,7 +109,7 @@ def toStr(raw_value):
return str(raw_value)
def pskToString(psk: bytes):
def pskToString(psk: bytes) -> str:
"""Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
if len(psk) == 0:
return "unencrypted"
@@ -129,12 +131,12 @@ def stripnl(s) -> str:
return " ".join(s.split())
def fixme(message):
def fixme(message: str) -> None:
"""Raise an exception for things that needs to be fixed"""
raise Exception(f"FIXME: {message}") # pylint: disable=W0719
def catchAndIgnore(reason, closure):
def catchAndIgnore(reason: str, closure) -> None:
"""Call a closure but if it throws an exception print it and continue"""
try:
closure()
@@ -152,7 +154,7 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]:
all_ports = serial.tools.list_ports.comports()
# look for 'likely' meshtastic devices
ports = list(
ports: List = list(
map(
lambda port: port.device,
filter(
@@ -191,12 +193,12 @@ class dotdict(dict):
class Timeout:
"""Timeout class"""
def __init__(self, maxSecs: int=20):
def __init__(self, maxSecs: int=20) -> None:
self.expireTime: Union[int, float] = 0
self.sleepInterval: float = 0.1
self.expireTimeout: int = maxSecs
def reset(self):
def reset(self) -> None:
"""Restart the waitForSet timer"""
self.expireTime = time.time() + self.expireTimeout
@@ -255,7 +257,7 @@ class Timeout:
class Acknowledgment:
"A class that records which type of acknowledgment was just received, if any."
def __init__(self):
def __init__(self) -> None:
"""initialize"""
self.receivedAck = False
self.receivedNak = False
@@ -264,7 +266,7 @@ class Acknowledgment:
self.receivedTelemetry = False
self.receivedPosition = False
def reset(self):
def reset(self) -> None:
"""reset"""
self.receivedAck = False
self.receivedNak = False
@@ -277,18 +279,18 @@ class Acknowledgment:
class DeferredExecution:
"""A thread that accepts closures to run, and runs them as they are received"""
def __init__(self, name):
self.queue = Queue()
def __init__(self, name) -> None:
self.queue: Queue = Queue()
# this thread must be marked as daemon, otherwise it will prevent clients from exiting
self.thread = threading.Thread(target=self._run, args=(), name=name, daemon=True)
self.thread.daemon = True
self.thread.start()
def queueWork(self, runnable):
def queueWork(self, runnable) -> None:
"""Queue up the work"""
self.queue.put(runnable)
def _run(self):
def _run(self) -> None:
while True:
try:
o = self.queue.get()
@@ -308,7 +310,7 @@ def our_exit(message, return_value=1) -> NoReturn:
sys.exit(return_value)
def support_info():
def support_info() -> None:
"""Print out info that helps troubleshooting of the cli."""
print("")
print("If having issues with meshtastic cli or python library")
@@ -337,7 +339,7 @@ def support_info():
print("Please add the output from the command: meshtastic --info")
def remove_keys_from_dict(keys, adict):
def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict:
"""Return a dictionary without some keys in it.
Will removed nested keys.
"""
@@ -352,33 +354,33 @@ def remove_keys_from_dict(keys, adict):
return adict
def hexstr(barray):
def hexstr(barray: bytes) -> str:
"""Print a string of hex digits"""
return ":".join(f"{x:02x}" for x in barray)
def ipstr(barray):
def ipstr(barray: bytes) -> str:
"""Print a string of ip digits"""
return ".".join(f"{x}" for x in barray)
def readnet_u16(p, offset):
def readnet_u16(p, offset: int) -> int:
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
def convert_mac_addr(val):
def convert_mac_addr(val: str) -> Union[str, bytes]:
"""Convert the base 64 encoded value to a mac address
val - base64 encoded value (ex: '/c0gFyhb'))
returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b')
"""
if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val):
val_as_bytes = base64.b64decode(val)
val_as_bytes: bytes = base64.b64decode(val)
return hexstr(val_as_bytes)
return val
def snake_to_camel(a_string):
def snake_to_camel(a_string: str) -> str:
"""convert snake_case to camelCase"""
# split underscore using split
temp = a_string.split("_")
@@ -387,16 +389,16 @@ def snake_to_camel(a_string):
return result
def camel_to_snake(a_string):
def camel_to_snake(a_string: str) -> str:
"""convert camelCase to snake_case"""
return "".join(["_" + i.lower() if i.isupper() else i for i in a_string]).lstrip(
"_"
)
def detect_supported_devices():
def detect_supported_devices() -> Set:
"""detect supported devices based on vendor id"""
system = platform.system()
system: str = platform.system()
# print(f'system:{system}')
possible_devices = set()
@@ -454,9 +456,9 @@ def detect_supported_devices():
return possible_devices
def detect_windows_needs_driver(sd, print_reason=False):
def detect_windows_needs_driver(sd, print_reason=False) -> bool:
"""detect if Windows user needs to install driver for a supported device"""
need_to_install_driver = False
need_to_install_driver: bool = False
if sd:
system = platform.system()
@@ -482,7 +484,7 @@ def detect_windows_needs_driver(sd, print_reason=False):
return need_to_install_driver
def eliminate_duplicate_port(ports):
def eliminate_duplicate_port(ports: List) -> List:
"""Sometimes we detect 2 serial ports, but we really only need to use one of the ports.
ports is a list of ports
@@ -515,9 +517,9 @@ def eliminate_duplicate_port(ports):
return new_ports
def is_windows11():
def is_windows11() -> bool:
"""Detect if Windows 11"""
is_win11 = False
is_win11: bool = False
if platform.system() == "Windows":
if float(platform.release()) >= 10.0:
patch = platform.version().split(".")[2]
@@ -531,7 +533,7 @@ def is_windows11():
return is_win11
def get_unique_vendor_ids():
def get_unique_vendor_ids() -> Set[str]:
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
@@ -540,7 +542,7 @@ def get_unique_vendor_ids():
return vids
def get_devices_with_vendor_id(vid):
def get_devices_with_vendor_id(vid: str) -> Set: #Set[SupportedDevice]
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
@@ -549,11 +551,11 @@ def get_devices_with_vendor_id(vid):
return sd
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
def active_ports_on_supported_devices(sds, eliminate_duplicates=False) -> Set[str]:
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
ports: Set = set()
baseports: Set = set()
system: str = platform.system()
# figure out what possible base ports there are
for d in sds:
@@ -611,13 +613,13 @@ def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
for com_port in com_ports:
ports.add(com_port)
if eliminate_duplicates:
ports = eliminate_duplicate_port(list(ports))
ports.sort()
ports = set(ports)
portlist: List = eliminate_duplicate_port(list(ports))
portlist.sort()
ports = set(portlist)
return ports
def detect_windows_port(sd):
def detect_windows_port(sd) -> Set[str]: #"sd" is a SupportedDevice from meshtastic.supported_device
"""detect if Windows port"""
ports = set()
@@ -642,20 +644,26 @@ def detect_windows_port(sd):
return ports
def check_if_newer_version():
def check_if_newer_version() -> Optional[str]:
"""Check pip to see if we are running the latest version."""
pypi_version = None
pypi_version: Optional[str] = None
try:
url = "https://pypi.org/pypi/meshtastic/json"
url: str = "https://pypi.org/pypi/meshtastic/json"
data = requests.get(url, timeout=5).json()
pypi_version = data["info"]["version"]
except Exception:
pass
act_version = get_active_version()
if pypi_version is None:
return None
try:
parsed_act_version = pkg_version.parse(act_version)
parsed_pypi_version = pkg_version.parse(pypi_version)
#Note: if handed "None" when we can't download the pypi_version,
#this gets a TypeError:
#"TypeError: expected string or bytes-like object, got 'NoneType'"
#Handle that below?
except pkg_version.InvalidVersion:
return pypi_version