diff --git a/meshtastic/__init__.py b/meshtastic/__init__.py index 8c7aeae..ce810cc 100644 --- a/meshtastic/__init__.py +++ b/meshtastic/__init__.py @@ -111,13 +111,13 @@ from . import ( LOCAL_ADDR = "^local" """A special ID that means the local node""" -BROADCAST_NUM = 0xFFFFFFFF +BROADCAST_NUM: int = 0xFFFFFFFF """if using 8 bit nodenums this will be shortened on the target""" BROADCAST_ADDR = "^all" """A special ID that means broadcast""" -OUR_APP_VERSION = 20300 +OUR_APP_VERSION: int = 20300 """The numeric buildnumber (shared with android apps) specifying the level of device code we are guaranteed to understand diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 4603822..9aeef2e 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -11,7 +11,7 @@ import os import platform import sys import time -from typing import Optional +from typing import List, Optional import pyqrcode # type: ignore[import-untyped] import yaml @@ -42,7 +42,7 @@ except ImportError as e: from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2 from meshtastic.version import get_active_version -def onReceive(packet, interface): +def onReceive(packet, interface) -> None: """Callback invoked when a packet arrives""" args = mt_config.args try: @@ -73,7 +73,7 @@ def onReceive(packet, interface): print(f"Warning: There is no field {ex} in the packet.") -def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613 +def onConnection(interface, topic=pub.AUTO_TOPIC) -> None: # pylint: disable=W0613 """Callback invoked when we connect/disconnect from a radio""" print(f"Connection changed: {topic.getName()}") @@ -85,7 +85,7 @@ def checkChannel(interface: MeshInterface, channelIndex: int) -> bool: return ch and ch.role != channel_pb2.Channel.Role.DISABLED -def getPref(node, comp_name): +def getPref(node, comp_name) -> bool: """Get a channel or preferences value""" def _printSetting(config_type, uni_name, pref_value, repeated): """Pretty print the setting""" @@ -109,11 +109,11 @@ def getPref(node, comp_name): # First validate the input localConfig = node.localConfig moduleConfig = node.moduleConfig - found = False + found: bool = False for config in [localConfig, moduleConfig]: objDesc = config.DESCRIPTOR config_type = objDesc.fields_by_name.get(name[0]) - pref = False + pref = "" #FIXME - is this correct to leave as an empty string if not found? if config_type: pref = config_type.message_type.fields_by_name.get(snake_name) if pref or wholeField: @@ -130,7 +130,7 @@ def getPref(node, comp_name): return False # Check if we need to request the config - if len(config.ListFields()) != 0: + if len(config.ListFields()) != 0 and not isinstance(pref, str): # if str, it's still the empty string, I think # read the value config_values = getattr(config, config_type.name) if not wholeField: @@ -148,16 +148,16 @@ def getPref(node, comp_name): return True -def splitCompoundName(comp_name): +def splitCompoundName(comp_name: str) -> List[str]: """Split compound (dot separated) preference name into parts""" - name = comp_name.split(".") + name: List[str] = comp_name.split(".") if len(name) < 2: name[0] = comp_name name.append(comp_name) return name -def traverseConfig(config_root, config, interface_config): +def traverseConfig(config_root, config, interface_config) -> bool: """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" snake_name = meshtastic.util.camel_to_snake(config_root) for pref in config: @@ -958,7 +958,7 @@ def onConnected(interface): sys.exit(1) -def printConfig(config): +def printConfig(config) -> None: """print configuration""" objDesc = config.DESCRIPTOR for config_section in objDesc.fields: @@ -975,12 +975,12 @@ def printConfig(config): print(f" {temp_name}") -def onNode(node): +def onNode(node) -> None: """Callback invoked when the node DB changes""" print(f"Node changed: {node}") -def subscribe(): +def subscribe() -> None: """Subscribe to the topics the user probably wants to see, prints output to stdout""" pub.subscribe(onReceive, "meshtastic.receive") # pub.subscribe(onConnection, "meshtastic.connection") @@ -991,7 +991,7 @@ def subscribe(): # pub.subscribe(onNode, "meshtastic.node") -def export_config(interface): +def export_config(interface) -> str: """used in --export-config""" configObj = {} @@ -1023,7 +1023,7 @@ def export_config(interface): if alt: configObj["location"]["alt"] = alt - config = MessageToDict(interface.localNode.localConfig) + config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below if config: # Convert inner keys to correct snake/camelCase prefs = {} @@ -1042,7 +1042,7 @@ def export_config(interface): for i in range(len(prefs[pref]['adminKey'])): prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i] if mt_config.camel_case: - configObj["config"] = config + configObj["config"] = config #Identical command here and 2 lines below? else: configObj["config"] = config @@ -1058,10 +1058,11 @@ def export_config(interface): else: configObj["module_config"] = prefs - config = "# start of Meshtastic configure yaml\n" - config += yaml.dump(configObj) - print(config) - return config + config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out) + #was used as a string here and a Dictionary above + config_txt += yaml.dump(configObj) + print(config_txt) + return config_txt def create_power_meter(): diff --git a/meshtastic/ble_interface.py b/meshtastic/ble_interface.py index 12dbd07..76e5dc3 100644 --- a/meshtastic/ble_interface.py +++ b/meshtastic/ble_interface.py @@ -5,6 +5,7 @@ import atexit import logging import struct import time +import io from threading import Thread from typing import List, Optional @@ -34,9 +35,9 @@ class BLEInterface(MeshInterface): self, address: Optional[str], noProto: bool = False, - debugOut=None, + debugOut: Optional[io.TextIOWrapper]=None, noNodes: bool = False, - ): + ) -> None: MeshInterface.__init__( self, debugOut=debugOut, noProto=noProto, noNodes=noNodes ) @@ -82,7 +83,7 @@ class BLEInterface(MeshInterface): # Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit self._exit_handler = atexit.register(self.client.disconnect) - def from_num_handler(self, _, b): # pylint: disable=C0116 + def from_num_handler(self, _, b: bytes) -> None: # pylint: disable=C0116 """Handle callbacks for fromnum notify. Note: this method does not need to be async because it is just setting a bool. """ @@ -150,9 +151,12 @@ class BLEInterface(MeshInterface): ) return addressed_devices[0] - def _sanitize_address(address): # pylint: disable=E0213 + def _sanitize_address(self, address: Optional[str]) -> Optional[str]: # pylint: disable=E0213 "Standardize BLE address by removing extraneous characters and lowercasing." - return address.replace("-", "").replace("_", "").replace(":", "").lower() + if address is None: + return None + else: + return address.replace("-", "").replace("_", "").replace(":", "").lower() def connect(self, address: Optional[str] = None) -> "BLEClient": "Connect to a device by address." @@ -164,12 +168,16 @@ class BLEInterface(MeshInterface): client.discover() return client - def _receiveFromRadioImpl(self): + def _receiveFromRadioImpl(self) -> None: while self._want_receive: if self.should_read: self.should_read = False - retries = 0 + retries: int = 0 while self._want_receive: + if self.client is None: + logging.debug(f"BLE client is None, shutting down") + self._want_receive = False + continue try: b = bytes(self.client.read_gatt_char(FROMRADIO_UUID)) except BleakDBusError as e: @@ -194,8 +202,8 @@ class BLEInterface(MeshInterface): else: time.sleep(0.01) - def _sendToRadioImpl(self, toRadio): - b = toRadio.SerializeToString() + def _sendToRadioImpl(self, toRadio) -> None: + b: bytes = toRadio.SerializeToString() if b and self.client: # we silently ignore writes while we are shutting down logging.debug(f"TORADIO write: {b.hex()}") try: @@ -211,7 +219,7 @@ class BLEInterface(MeshInterface): time.sleep(0.01) self.should_read = True - def close(self): + def close(self) -> None: try: MeshInterface.close(self) except Exception as e: @@ -236,7 +244,7 @@ class BLEInterface(MeshInterface): class BLEClient: """Client for managing connection to a BLE device""" - def __init__(self, address=None, **kwargs): + def __init__(self, address=None, **kwargs) -> None: self._eventLoop = asyncio.new_event_loop() self._eventThread = Thread( target=self._run_event_loop, name="BLEClient", daemon=True diff --git a/meshtastic/mt_config.py b/meshtastic/mt_config.py index 662a10d..3b40294 100644 --- a/meshtastic/mt_config.py +++ b/meshtastic/mt_config.py @@ -13,6 +13,8 @@ with rather more easily once the code is simplified by this change. """ +from typing import Any, Optional + def reset(): """ Restore the namespace to pristine condition. @@ -33,5 +35,5 @@ args = None parser = None channel_index = None logfile = None -tunnelInstance = None +tunnelInstance: Optional[Any] = None camel_case = False diff --git a/meshtastic/remote_hardware.py b/meshtastic/remote_hardware.py index 73836f3..c3d873a 100644 --- a/meshtastic/remote_hardware.py +++ b/meshtastic/remote_hardware.py @@ -8,7 +8,7 @@ from meshtastic.protobuf import portnums_pb2, remote_hardware_pb2 from meshtastic.util import our_exit -def onGPIOreceive(packet, interface): +def onGPIOreceive(packet, interface) -> None: """Callback for received GPIO responses""" logging.debug(f"packet:{packet} interface:{interface}") gpioValue = 0 @@ -37,7 +37,7 @@ class RemoteHardwareClient: code for how you can connect to your own custom meshtastic services """ - def __init__(self, iface): + def __init__(self, iface) -> None: """ Constructor diff --git a/meshtastic/serial_interface.py b/meshtastic/serial_interface.py index fcca331..39f2648 100644 --- a/meshtastic/serial_interface.py +++ b/meshtastic/serial_interface.py @@ -5,7 +5,7 @@ import logging import platform import time -from typing import Optional +from typing import List, Optional import serial # type: ignore[import-untyped] @@ -19,7 +19,7 @@ if platform.system() != "Windows": class SerialInterface(StreamInterface): """Interface class for meshtastic devices over a serial link""" - def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto=False, connectNow=True, noNodes: bool=False): + def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None: """Constructor, opens a connection to a specified serial port, or if unspecified try to find one Meshtastic device by probing @@ -32,13 +32,13 @@ class SerialInterface(StreamInterface): self.devPath: Optional[str] = devPath if self.devPath is None: - ports = meshtastic.util.findPorts(True) + ports: List[str] = meshtastic.util.findPorts(True) logging.debug(f"ports:{ports}") if len(ports) == 0: print("No Serial Meshtastic device detected, attempting TCP connection on localhost.") return elif len(ports) > 1: - message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n" + message: str = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n" message += f" Ports detected:{ports}" meshtastic.util.our_exit(message) else: @@ -59,14 +59,14 @@ class SerialInterface(StreamInterface): self.stream = serial.Serial( self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0 ) - self.stream.flush() + self.stream.flush() # type: ignore[attr-defined] time.sleep(0.1) StreamInterface.__init__( self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes ) - def close(self): + def close(self) -> None: """Close a connection to the device""" if self.stream: # Stream can be null if we were already closed self.stream.flush() # FIXME: why are there these two flushes with 100ms sleeps? This shouldn't be necessary diff --git a/meshtastic/stream_interface.py b/meshtastic/stream_interface.py index 55c1691..c1fecab 100644 --- a/meshtastic/stream_interface.py +++ b/meshtastic/stream_interface.py @@ -1,10 +1,13 @@ """Stream Interface base class """ +import io import logging import threading import time import traceback +from typing import Optional, cast + import serial # type: ignore[import-untyped] from meshtastic.mesh_interface import MeshInterface @@ -19,7 +22,7 @@ MAX_TO_FROM_RADIO_SIZE = 512 class StreamInterface(MeshInterface): """Interface class for meshtastic devices over a stream link (serial, TCP, etc)""" - def __init__(self, debugOut=None, noProto=False, connectNow=True, noNodes=False): + def __init__(self, debugOut: Optional[io.TextIOWrapper]=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None: """Constructor, opens a connection to self.stream Keyword Arguments: @@ -35,6 +38,7 @@ class StreamInterface(MeshInterface): raise Exception( # pylint: disable=W0719 "StreamInterface is now abstract (to update existing code create SerialInterface instead)" ) + self.stream: Optional[serial.Serial] # only serial uses this, TCPInterface overrides the relevant methods instead self._rxBuf = bytes() # empty self._wantExit = False @@ -52,7 +56,7 @@ class StreamInterface(MeshInterface): if not noProto: self.waitForConfig() - def connect(self): + def connect(self) -> None: """Connect to our radio Normally this is called automatically by the constructor, but if you @@ -63,7 +67,7 @@ class StreamInterface(MeshInterface): # if the reading statemachine was parsing a bad packet make sure # we write enough start bytes to force it to resync (we don't use START1 # because we want to ensure it is looking for START1) - p = bytearray([START2] * 32) + p: bytes = bytearray([START2] * 32) self._writeBytes(p) time.sleep(0.1) # wait 100ms to give device time to start running @@ -74,7 +78,7 @@ class StreamInterface(MeshInterface): if not self.noProto: # Wait for the db download if using the protocol self._waitConnected() - def _disconnected(self): + def _disconnected(self) -> None: """We override the superclass implementation to close our port""" MeshInterface._disconnected(self) @@ -86,7 +90,7 @@ class StreamInterface(MeshInterface): # pylint: disable=W0201 self.stream = None - def _writeBytes(self, b): + def _writeBytes(self, b: bytes) -> None: """Write an array of bytes to our stream and flush""" if self.stream: # ignore writes when stream is closed self.stream.write(b) @@ -98,24 +102,24 @@ class StreamInterface(MeshInterface): # we sleep here to give the TBeam a chance to work time.sleep(0.1) - def _readBytes(self, length): + def _readBytes(self, length) -> Optional[bytes]: """Read an array of bytes from our stream""" if self.stream: return self.stream.read(length) else: return None - def _sendToRadioImpl(self, toRadio): + def _sendToRadioImpl(self, toRadio) -> None: """Send a ToRadio protobuf to the device""" logging.debug(f"Sending: {stripnl(toRadio)}") - b = toRadio.SerializeToString() - bufLen = len(b) + b: bytes = toRadio.SerializeToString() + bufLen: int = len(b) # We convert into a string, because the TCP code doesn't work with byte arrays - header = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF]) - logging.debug(f"sending header:{header} b:{b}") + header: bytes = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF]) + logging.debug(f"sending header:{header!r} b:{b!r}") self._writeBytes(header + b) - def close(self): + def close(self) -> None: """Close a connection to the device""" logging.debug("Closing stream") MeshInterface.close(self) @@ -142,7 +146,7 @@ class StreamInterface(MeshInterface): else: self.cur_log_line += utf - def __reader(self): + def __reader(self) -> None: """The reader thread that reads bytes from our stream""" logging.debug("in __reader()") empty = bytes() @@ -150,13 +154,13 @@ class StreamInterface(MeshInterface): try: while not self._wantExit: # logging.debug("reading character") - b = self._readBytes(1) + b: Optional[bytes] = self._readBytes(1) # logging.debug("In reader loop") # logging.debug(f"read returned {b}") - if len(b) > 0: - c = b[0] + if b is not None and len(cast(bytes, b)) > 0: + c: int = b[0] # logging.debug(f'c:{c}') - ptr = len(self._rxBuf) + ptr: int = len(self._rxBuf) # Assume we want to append this byte, fixme use bytearray instead self._rxBuf = self._rxBuf + b diff --git a/meshtastic/tcp_interface.py b/meshtastic/tcp_interface.py index 8ccb021..66ac1f2 100644 --- a/meshtastic/tcp_interface.py +++ b/meshtastic/tcp_interface.py @@ -3,7 +3,7 @@ # pylint: disable=R0917 import logging import socket -from typing import Optional +from typing import Optional, cast from meshtastic.stream_interface import StreamInterface @@ -16,9 +16,9 @@ class TCPInterface(StreamInterface): self, hostname: str, debugOut=None, - noProto=False, - connectNow=True, - portNumber=DEFAULT_TCP_PORT, + noProto: bool=False, + connectNow: bool=True, + portNumber: int=DEFAULT_TCP_PORT, noNodes:bool=False, ): """Constructor, opens a connection to a specified IP address/hostname @@ -29,14 +29,16 @@ class TCPInterface(StreamInterface): self.stream = None - self.hostname = hostname - self.portNumber = portNumber + self.hostname: str = hostname + self.portNumber: int = portNumber + + self.socket: Optional[socket.socket] = None if connectNow: logging.debug(f"Connecting to {hostname}") # type: ignore[str-bytes-safe] - server_address = (hostname, portNumber) - sock = socket.create_connection(server_address) - self.socket: Optional[socket.socket] = sock + server_address: tuple[str, int] = (hostname, portNumber) + sock: Optional[socket.socket] = socket.create_connection(server_address) + self.socket = sock else: self.socket = None @@ -44,25 +46,26 @@ class TCPInterface(StreamInterface): self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes ) - def _socket_shutdown(self): + def _socket_shutdown(self) -> None: """Shutdown the socket. Note: Broke out this line so the exception could be unit tested. """ - self.socket.shutdown(socket.SHUT_RDWR) + if self.socket: #mian: please check that this should be "if self.socket:" + cast(socket.socket, self.socket).shutdown(socket.SHUT_RDWR) - def myConnect(self): + def myConnect(self) -> None: """Connect to socket""" - server_address = (self.hostname, self.portNumber) - sock = socket.create_connection(server_address) + server_address: tuple[str, int] = (self.hostname, self.portNumber) + sock: Optional[socket.socket] = socket.create_connection(server_address) self.socket = sock - def close(self): + def close(self) -> None: """Close a connection to the device""" logging.debug("Closing TCP stream") StreamInterface.close(self) # Sometimes the socket read might be blocked in the reader thread. # Therefore we force the shutdown by closing the socket here - self._wantExit = True + self._wantExit: bool = True if not self.socket is None: try: self._socket_shutdown() @@ -70,10 +73,14 @@ class TCPInterface(StreamInterface): pass # Ignore errors in shutdown, because we might have a race with the server self.socket.close() - def _writeBytes(self, b): + def _writeBytes(self, b: bytes) -> None: """Write an array of bytes to our stream and flush""" - self.socket.send(b) + if self.socket: + self.socket.send(b) - def _readBytes(self, length): + def _readBytes(self, length) -> Optional[bytes]: """Read an array of bytes from our stream""" - return self.socket.recv(length) + if self.socket: + return self.socket.recv(length) + else: + return None diff --git a/meshtastic/test.py b/meshtastic/test.py index 97947d4..96d6c09 100644 --- a/meshtastic/test.py +++ b/meshtastic/test.py @@ -5,6 +5,9 @@ import logging import sys import time import traceback +import io + +from typing import List, Optional from dotmap import DotMap # type: ignore[import-untyped] from pubsub import pub # type: ignore[import-untyped] @@ -15,19 +18,19 @@ from meshtastic.serial_interface import SerialInterface from meshtastic.tcp_interface import TCPInterface """The interfaces we are using for our tests""" -interfaces = None +interfaces: List = [] """A list of all packets we received while the current test was running""" -receivedPackets = None +receivedPackets: Optional[List] = None -testsRunning = False +testsRunning: bool = False -testNumber = 0 +testNumber: int = 0 sendingInterface = None -def onReceive(packet, interface): +def onReceive(packet, interface) -> None: """Callback invoked when a packet arrives""" if sendingInterface == interface: pass @@ -42,20 +45,20 @@ def onReceive(packet, interface): receivedPackets.append(p) -def onNode(node): +def onNode(node) -> None: """Callback invoked when the node DB changes""" print(f"Node changed: {node}") -def subscribe(): +def subscribe() -> None: """Subscribe to the topics the user probably wants to see, prints output to stdout""" pub.subscribe(onNode, "meshtastic.node") def testSend( - fromInterface, toInterface, isBroadcast=False, asBinary=False, wantAck=False -): + fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False +) -> bool: """ Sends one test packet between two nodes and then returns success or failure @@ -93,16 +96,16 @@ def testSend( return False # Failed to send -def runTests(numTests=50, wantAck=False, maxFailures=0): +def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool: """Run the tests.""" logging.info(f"Running {numTests} tests with wantAck={wantAck}") - numFail = 0 - numSuccess = 0 + numFail: int = 0 + numSuccess: int = 0 for _ in range(numTests): # pylint: disable=W0603 global testNumber testNumber = testNumber + 1 - isBroadcast = True + isBroadcast:bool = True # asBinary=(i % 2 == 0) success = testSend( interfaces[0], interfaces[1], isBroadcast, asBinary=False, wantAck=wantAck @@ -126,10 +129,10 @@ def runTests(numTests=50, wantAck=False, maxFailures=0): return True -def testThread(numTests=50): +def testThread(numTests=50) -> bool: """Test thread""" logging.info("Found devices, starting tests...") - result = runTests(numTests, wantAck=True) + result: bool = runTests(numTests, wantAck=True) if result: # Run another test # Allow a few dropped packets @@ -137,25 +140,25 @@ def testThread(numTests=50): return result -def onConnection(topic=pub.AUTO_TOPIC): +def onConnection(topic=pub.AUTO_TOPIC) -> None: """Callback invoked when we connect/disconnect from a radio""" print(f"Connection changed: {topic.getName()}") -def openDebugLog(portName): +def openDebugLog(portName) -> io.TextIOWrapper: """Open the debug log file""" debugname = "log" + portName.replace("/", "_") logging.info(f"Writing serial debugging to {debugname}") return open(debugname, "w+", buffering=1, encoding="utf8") -def testAll(numTests=5): +def testAll(numTests: int=5) -> bool: """ Run a series of tests using devices we can find. This is called from the cli with the "--test" option. """ - ports = meshtastic.util.findPorts(True) + ports: List[str] = meshtastic.util.findPorts(True) if len(ports) < 2: meshtastic.util.our_exit( "Warning: Must have at least two devices connected to USB." @@ -175,7 +178,7 @@ def testAll(numTests=5): ) logging.info("Ports opened, starting test") - result = testThread(numTests) + result: bool = testThread(numTests) for i in interfaces: i.close() @@ -183,7 +186,7 @@ def testAll(numTests=5): return result -def testSimulator(): +def testSimulator() -> None: """ Assume that someone has launched meshtastic-native as a simulated node. Talk to that node over TCP, do some operations and if they are successful @@ -195,7 +198,7 @@ def testSimulator(): logging.basicConfig(level=logging.DEBUG) logging.info("Connecting to simulator on localhost!") try: - iface = TCPInterface("localhost") + iface: meshtastic.tcp_interface.TCPInterface = TCPInterface("localhost") iface.showInfo() iface.localNode.showInfo() iface.localNode.exitSimulator() diff --git a/meshtastic/tunnel.py b/meshtastic/tunnel.py index 409e3d4..d1294f3 100644 --- a/meshtastic/tunnel.py +++ b/meshtastic/tunnel.py @@ -43,7 +43,7 @@ class Tunnel: self.message = message super().__init__(self.message) - def __init__(self, iface, subnet="10.115", netmask="255.255.0.0"): + def __init__(self, iface, subnet: str="10.115", netmask: str="255.255.0.0") -> None: """ Constructor diff --git a/meshtastic/util.py b/meshtastic/util.py index 45527fa..2e0a989 100644 --- a/meshtastic/util.py +++ b/meshtastic/util.py @@ -11,7 +11,7 @@ import threading import time import traceback from queue import Queue -from typing import List, NoReturn, Union +from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union from google.protobuf.json_format import MessageToJson from google.protobuf.message import Message @@ -31,7 +31,7 @@ from meshtastic.version import get_active_version 0925 Lakeview Research Saleae Logic (logic analyzer) 04b4:602a Cypress Semiconductor Corp. Hantek DSO-6022BL (oscilloscope) """ -blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4]) +blacklistVids: Dict = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4]) """Some devices are highly likely to be meshtastic. 0x239a RAK4631 @@ -39,21 +39,21 @@ blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4]) whitelistVids = dict.fromkeys([0x239a, 0x303a]) -def quoteBooleans(a_string): +def quoteBooleans(a_string: str) -> str: """Quote booleans given a string that contains ": true", replace with ": 'true'" (or false) """ - tmp = a_string.replace(": true", ": 'true'") + tmp: str = a_string.replace(": true", ": 'true'") tmp = tmp.replace(": false", ": 'false'") return tmp -def genPSK256(): +def genPSK256() -> bytes: """Generate a random preshared key""" return os.urandom(32) -def fromPSK(valstr): +def fromPSK(valstr: str) -> Any: """A special version of fromStr that assumes the user is trying to set a PSK. In that case we also allow "none", "default" or "random" (to have python generate one), or simpleN """ @@ -70,7 +70,7 @@ def fromPSK(valstr): return fromStr(valstr) -def fromStr(valstr): +def fromStr(valstr: str) -> Any: """Try to parse as int, float or bool (and fallback to a string as last resort) Returns: an int, bool, float, str or byte array (for strings of hex digits) @@ -78,6 +78,7 @@ def fromStr(valstr): Args: valstr (string): A user provided string """ + val: Any if len(valstr) == 0: # Treat an emptystring as an empty bytes val = bytes() elif valstr.startswith("0x"): @@ -100,6 +101,7 @@ def fromStr(valstr): return val + def toStr(raw_value): """Convert a value to a string that can be used in a config file""" if isinstance(raw_value, bytes): @@ -107,7 +109,7 @@ def toStr(raw_value): return str(raw_value) -def pskToString(psk: bytes): +def pskToString(psk: bytes) -> str: """Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string""" if len(psk) == 0: return "unencrypted" @@ -129,12 +131,12 @@ def stripnl(s) -> str: return " ".join(s.split()) -def fixme(message): +def fixme(message: str) -> None: """Raise an exception for things that needs to be fixed""" raise Exception(f"FIXME: {message}") # pylint: disable=W0719 -def catchAndIgnore(reason, closure): +def catchAndIgnore(reason: str, closure) -> None: """Call a closure but if it throws an exception print it and continue""" try: closure() @@ -152,7 +154,7 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]: all_ports = serial.tools.list_ports.comports() # look for 'likely' meshtastic devices - ports = list( + ports: List = list( map( lambda port: port.device, filter( @@ -191,12 +193,12 @@ class dotdict(dict): class Timeout: """Timeout class""" - def __init__(self, maxSecs: int=20): + def __init__(self, maxSecs: int=20) -> None: self.expireTime: Union[int, float] = 0 self.sleepInterval: float = 0.1 self.expireTimeout: int = maxSecs - def reset(self): + def reset(self) -> None: """Restart the waitForSet timer""" self.expireTime = time.time() + self.expireTimeout @@ -255,7 +257,7 @@ class Timeout: class Acknowledgment: "A class that records which type of acknowledgment was just received, if any." - def __init__(self): + def __init__(self) -> None: """initialize""" self.receivedAck = False self.receivedNak = False @@ -264,7 +266,7 @@ class Acknowledgment: self.receivedTelemetry = False self.receivedPosition = False - def reset(self): + def reset(self) -> None: """reset""" self.receivedAck = False self.receivedNak = False @@ -277,18 +279,18 @@ class Acknowledgment: class DeferredExecution: """A thread that accepts closures to run, and runs them as they are received""" - def __init__(self, name): - self.queue = Queue() + def __init__(self, name) -> None: + self.queue: Queue = Queue() # this thread must be marked as daemon, otherwise it will prevent clients from exiting self.thread = threading.Thread(target=self._run, args=(), name=name, daemon=True) self.thread.daemon = True self.thread.start() - def queueWork(self, runnable): + def queueWork(self, runnable) -> None: """Queue up the work""" self.queue.put(runnable) - def _run(self): + def _run(self) -> None: while True: try: o = self.queue.get() @@ -308,7 +310,7 @@ def our_exit(message, return_value=1) -> NoReturn: sys.exit(return_value) -def support_info(): +def support_info() -> None: """Print out info that helps troubleshooting of the cli.""" print("") print("If having issues with meshtastic cli or python library") @@ -337,7 +339,7 @@ def support_info(): print("Please add the output from the command: meshtastic --info") -def remove_keys_from_dict(keys, adict): +def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict: """Return a dictionary without some keys in it. Will removed nested keys. """ @@ -352,33 +354,33 @@ def remove_keys_from_dict(keys, adict): return adict -def hexstr(barray): +def hexstr(barray: bytes) -> str: """Print a string of hex digits""" return ":".join(f"{x:02x}" for x in barray) -def ipstr(barray): +def ipstr(barray: bytes) -> str: """Print a string of ip digits""" return ".".join(f"{x}" for x in barray) -def readnet_u16(p, offset): +def readnet_u16(p, offset: int) -> int: """Read big endian u16 (network byte order)""" return p[offset] * 256 + p[offset + 1] -def convert_mac_addr(val): +def convert_mac_addr(val: str) -> Union[str, bytes]: """Convert the base 64 encoded value to a mac address val - base64 encoded value (ex: '/c0gFyhb')) returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b') """ if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val): - val_as_bytes = base64.b64decode(val) + val_as_bytes: bytes = base64.b64decode(val) return hexstr(val_as_bytes) return val -def snake_to_camel(a_string): +def snake_to_camel(a_string: str) -> str: """convert snake_case to camelCase""" # split underscore using split temp = a_string.split("_") @@ -387,16 +389,16 @@ def snake_to_camel(a_string): return result -def camel_to_snake(a_string): +def camel_to_snake(a_string: str) -> str: """convert camelCase to snake_case""" return "".join(["_" + i.lower() if i.isupper() else i for i in a_string]).lstrip( "_" ) -def detect_supported_devices(): +def detect_supported_devices() -> Set: """detect supported devices based on vendor id""" - system = platform.system() + system: str = platform.system() # print(f'system:{system}') possible_devices = set() @@ -454,9 +456,9 @@ def detect_supported_devices(): return possible_devices -def detect_windows_needs_driver(sd, print_reason=False): +def detect_windows_needs_driver(sd, print_reason=False) -> bool: """detect if Windows user needs to install driver for a supported device""" - need_to_install_driver = False + need_to_install_driver: bool = False if sd: system = platform.system() @@ -482,7 +484,7 @@ def detect_windows_needs_driver(sd, print_reason=False): return need_to_install_driver -def eliminate_duplicate_port(ports): +def eliminate_duplicate_port(ports: List) -> List: """Sometimes we detect 2 serial ports, but we really only need to use one of the ports. ports is a list of ports @@ -515,9 +517,9 @@ def eliminate_duplicate_port(ports): return new_ports -def is_windows11(): +def is_windows11() -> bool: """Detect if Windows 11""" - is_win11 = False + is_win11: bool = False if platform.system() == "Windows": if float(platform.release()) >= 10.0: patch = platform.version().split(".")[2] @@ -531,7 +533,7 @@ def is_windows11(): return is_win11 -def get_unique_vendor_ids(): +def get_unique_vendor_ids() -> Set[str]: """Return a set of unique vendor ids""" vids = set() for d in supported_devices: @@ -540,7 +542,7 @@ def get_unique_vendor_ids(): return vids -def get_devices_with_vendor_id(vid): +def get_devices_with_vendor_id(vid: str) -> Set: #Set[SupportedDevice] """Return a set of unique devices with the vendor id""" sd = set() for d in supported_devices: @@ -549,11 +551,11 @@ def get_devices_with_vendor_id(vid): return sd -def active_ports_on_supported_devices(sds, eliminate_duplicates=False): +def active_ports_on_supported_devices(sds, eliminate_duplicates=False) -> Set[str]: """Return a set of active ports based on the supplied supported devices""" - ports = set() - baseports = set() - system = platform.system() + ports: Set = set() + baseports: Set = set() + system: str = platform.system() # figure out what possible base ports there are for d in sds: @@ -611,13 +613,13 @@ def active_ports_on_supported_devices(sds, eliminate_duplicates=False): for com_port in com_ports: ports.add(com_port) if eliminate_duplicates: - ports = eliminate_duplicate_port(list(ports)) - ports.sort() - ports = set(ports) + portlist: List = eliminate_duplicate_port(list(ports)) + portlist.sort() + ports = set(portlist) return ports -def detect_windows_port(sd): +def detect_windows_port(sd) -> Set[str]: #"sd" is a SupportedDevice from meshtastic.supported_device """detect if Windows port""" ports = set() @@ -642,20 +644,26 @@ def detect_windows_port(sd): return ports -def check_if_newer_version(): +def check_if_newer_version() -> Optional[str]: """Check pip to see if we are running the latest version.""" - pypi_version = None + pypi_version: Optional[str] = None try: - url = "https://pypi.org/pypi/meshtastic/json" + url: str = "https://pypi.org/pypi/meshtastic/json" data = requests.get(url, timeout=5).json() pypi_version = data["info"]["version"] except Exception: pass act_version = get_active_version() + if pypi_version is None: + return None try: parsed_act_version = pkg_version.parse(act_version) parsed_pypi_version = pkg_version.parse(pypi_version) + #Note: if handed "None" when we can't download the pypi_version, + #this gets a TypeError: + #"TypeError: expected string or bytes-like object, got 'NoneType'" + #Handle that below? except pkg_version.InvalidVersion: return pypi_version